huskies: merge 641_story_unified_status_update_delivery_across_chat_web_ui_and_top_level_agent_context

This commit is contained in:
dave
2026-04-26 02:23:23 +00:00
parent dc7ae3a23c
commit d8f9be5b23
11 changed files with 670 additions and 0 deletions
+1
View File
@@ -146,6 +146,7 @@ pub(super) fn call_sync(
perm_rx: Arc::new(tokio::sync::Mutex::new(perm_rx)),
pending_perm_replies: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
permission_timeout_secs: 120,
status: Arc::new(crate::service::status::StatusBroadcaster::new()),
});
let dispatch = CommandDispatch {
+1
View File
@@ -406,6 +406,7 @@ pub fn spawn_gateway_bot(
std::collections::HashMap::new(),
)),
permission_timeout_secs: 120,
status: std::sync::Arc::new(crate::service::status::StatusBroadcaster::new()),
});
crate::chat::transport::matrix::spawn_bot(
+1
View File
@@ -23,6 +23,7 @@ pub mod project;
pub mod qa;
pub mod settings;
pub mod shell;
pub mod status;
pub mod story;
pub mod timer;
pub mod wizard;
+203
View File
@@ -0,0 +1,203 @@
//! Pure status-event message formatter.
//!
//! A single [`format_status_event`] function converts any [`StatusEvent`] into
//! a human-readable string. Adding a new event type means adding one match arm
//! here — no per-transport duplication anywhere in the codebase.
use crate::service::common::item_id::extract_item_number;
use crate::service::notifications::format::stage_display_name;
use crate::service::status::StatusEvent;
/// Render a [`StatusEvent`] into a human-readable plain-text string.
///
/// This is the single formatter for all status event types. Every transport
/// (chat, Web UI, agent context) calls this function rather than duplicating
/// formatting logic.
// Not wired into any transport yet; stories 642/643/644 will call this.
#[allow(dead_code)]
pub fn format_status_event(event: &StatusEvent) -> String {
    match event {
        StatusEvent::StageTransition {
            story_id,
            story_name,
            from_stage,
            to_stage,
        } => {
            let (number, name) = number_and_name(story_id, story_name);
            let from = stage_display_name(from_stage);
            let to = stage_display_name(to_stage);
            // Celebrate completed stories with a party emoji prefix.
            let prefix = if to == "Done" { "\u{1f389} " } else { "" };
            format!("{prefix}#{number} {name} \u{2014} {from} \u{2192} {to}")
        }
        StatusEvent::MergeFailure {
            story_id,
            story_name,
            reason,
        } => {
            let (number, name) = number_and_name(story_id, story_name);
            format!("\u{274c} #{number} {name} \u{2014} {reason}")
        }
        StatusEvent::StoryBlocked {
            story_id,
            story_name,
            reason,
        } => {
            let (number, name) = number_and_name(story_id, story_name);
            format!("\u{1f6ab} #{number} {name} \u{2014} BLOCKED: {reason}")
        }
        StatusEvent::RateLimitWarning {
            story_id,
            story_name,
            agent_name,
        } => {
            let (number, name) = number_and_name(story_id, story_name);
            format!("\u{26a0}\u{fe0f} #{number} {name} \u{2014} {agent_name} hit an API rate limit")
        }
        StatusEvent::RateLimitHardBlock {
            story_id,
            story_name,
            agent_name,
            reset_at,
        } => {
            let (number, name) = number_and_name(story_id, story_name);
            let reset = reset_at.format("%H:%M UTC").to_string();
            format!(
                "\u{26d4} #{number} {name} \u{2014} {agent_name} hard rate-limited until {reset}"
            )
        }
    }
}

/// Resolve the `#number` and display name shown for a story.
///
/// Both fall back to the raw `story_id`: the number when it cannot be
/// extracted from the ID, the name when no human-readable name is available.
#[allow(dead_code)]
fn number_and_name<'a>(story_id: &'a str, story_name: &'a Option<String>) -> (&'a str, &'a str) {
    let number = extract_item_number(story_id).unwrap_or(story_id);
    let name = story_name.as_deref().unwrap_or(story_id);
    (number, name)
}
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;

    /// Build a `StageTransition` event from plain `&str` pieces.
    fn transition(id: &str, name: Option<&str>, from: &str, to: &str) -> StatusEvent {
        StatusEvent::StageTransition {
            story_id: id.to_owned(),
            story_name: name.map(str::to_owned),
            from_stage: from.to_owned(),
            to_stage: to.to_owned(),
        }
    }

    #[test]
    fn formats_stage_transition_to_done_with_emoji() {
        let text = format_status_event(&transition(
            "42_story_foo",
            Some("Foo Story"),
            "4_merge",
            "5_done",
        ));
        assert!(
            text.contains("\u{1f389}"),
            "done transition should include party emoji"
        );
        assert!(text.contains("#42"));
        assert!(text.contains("Foo Story"));
        assert!(text.contains("Merge \u{2192} Done"));
    }

    #[test]
    fn formats_stage_transition_no_emoji_for_non_done() {
        let text =
            format_status_event(&transition("10_story_bar", Some("Bar"), "1_backlog", "2_current"));
        assert!(!text.contains("\u{1f389}"));
        assert!(text.contains("Backlog \u{2192} Current"));
    }

    #[test]
    fn formats_stage_transition_falls_back_to_story_id_when_no_name() {
        let text = format_status_event(&transition("5_story_x", None, "2_current", "3_qa"));
        assert!(text.contains("5_story_x"));
    }

    #[test]
    fn formats_merge_failure() {
        let ev = StatusEvent::MergeFailure {
            story_id: String::from("7_story_fail"),
            story_name: Some(String::from("Failing Story")),
            reason: String::from("conflicts detected"),
        };
        let text = format_status_event(&ev);
        assert!(text.contains("\u{274c}"));
        assert!(text.contains("#7"));
        assert!(text.contains("conflicts detected"));
    }

    #[test]
    fn formats_story_blocked() {
        let ev = StatusEvent::StoryBlocked {
            story_id: String::from("8_story_blk"),
            story_name: Some(String::from("Blocked Story")),
            reason: String::from("retry limit exceeded"),
        };
        let text = format_status_event(&ev);
        assert!(text.contains("\u{1f6ab}"));
        assert!(text.contains("BLOCKED: retry limit exceeded"));
    }

    #[test]
    fn formats_rate_limit_warning() {
        let ev = StatusEvent::RateLimitWarning {
            story_id: String::from("9_story_rl"),
            story_name: Some(String::from("RL Story")),
            agent_name: String::from("coder-1"),
        };
        assert!(format_status_event(&ev).contains("coder-1 hit an API rate limit"));
    }

    #[test]
    fn formats_rate_limit_hard_block() {
        let reset_time = chrono::Utc
            .with_ymd_and_hms(2026, 4, 26, 15, 30, 0)
            .unwrap();
        let ev = StatusEvent::RateLimitHardBlock {
            story_id: String::from("3_story_hb"),
            story_name: Some(String::from("HB Story")),
            agent_name: String::from("coder-2"),
            reset_at: reset_time,
        };
        let text = format_status_event(&ev);
        for needle in ["\u{26d4}", "coder-2", "hard rate-limited", "15:30 UTC"] {
            assert!(text.contains(needle), "missing fragment: {needle}");
        }
    }

    #[test]
    fn single_formatter_no_per_transport_duplication() {
        // Each event type renders through the same single entry point — no
        // per-transport formatting variants exist.
        let samples = [
            transition("1_story_a", None, "1_backlog", "2_current"),
            StatusEvent::MergeFailure {
                story_id: String::from("2_story_b"),
                story_name: None,
                reason: String::from("test"),
            },
            StatusEvent::StoryBlocked {
                story_id: String::from("3_story_c"),
                story_name: None,
                reason: String::from("limit"),
            },
        ];
        assert!(samples.iter().all(|e| !format_status_event(e).is_empty()));
    }
}
+449
View File
@@ -0,0 +1,449 @@
//! Status broadcaster — unified pipeline-event fan-out for all consumers.
//!
//! [`StatusBroadcaster`] lives on the [`crate::services::Services`] bundle and
//! exposes two entry points:
//!
//! - [`StatusBroadcaster::publish`] — emit an event to every active subscriber.
//! - [`StatusBroadcaster::subscribe`] — obtain a [`Subscription`] handle that
//! receives events and can be independently enabled or disabled.
//!
//! # Multi-project isolation
//!
//! Each project has its own `Services` bundle and therefore its own
//! `StatusBroadcaster` instance. Events published to project A's broadcaster
//! are delivered only to subscribers of that same broadcaster; project B's
//! subscribers see nothing.
//!
//! # Plug-in consumer design
//!
//! Future stories add their transport-specific logic without touching this
//! module:
//!
//! - Story 642 (top-level agent context): calls `subscribe()` once at startup.
//! - Story 643 (Web UI): calls `subscribe()` once at startup.
//! - Story 644 (chat transports): calls `subscribe()` once per transport.
pub mod format;
use chrono::{DateTime, Utc};
use std::sync::{
Arc,
atomic::{AtomicBool, Ordering},
};
use tokio::sync::broadcast;
/// The capacity of the underlying broadcast channel.
///
/// Lagging subscribers lose events — callers should consume promptly.
// NOTE(review): 256 appears to be a heuristic sized for bursty pipeline
// activity — confirm against expected event rates before tuning.
const CHANNEL_CAPACITY: usize = 256;
// ── StatusEvent ───────────────────────────────────────────────────────────────
/// A pipeline lifecycle event that the [`StatusBroadcaster`] fans out to all
/// configured consumers.
///
/// Each variant carries enough context for [`format::format_status_event`] to
/// render a human-readable message without additional lookups.
// `PartialEq` lets consumers and tests compare events directly instead of
// pattern-matching field by field; all payload types support it.
#[derive(Clone, Debug, PartialEq)]
pub enum StatusEvent {
    /// A work item moved between pipeline stages.
    StageTransition {
        /// Work item ID (e.g. `"42_story_my_feature"`).
        story_id: String,
        /// Human-readable story name, if available.
        story_name: Option<String>,
        /// Pipeline stage directory the item moved FROM (e.g. `"2_current"`).
        from_stage: String,
        /// Pipeline stage directory the item moved TO (e.g. `"3_qa"`).
        to_stage: String,
    },
    /// A merge operation failed for a story.
    MergeFailure {
        /// Work item ID (e.g. `"42_story_my_feature"`).
        story_id: String,
        /// Human-readable story name, if available.
        story_name: Option<String>,
        /// Human-readable description of the failure.
        reason: String,
    },
    /// A story was blocked (e.g. retry limit exceeded, empty diff).
    StoryBlocked {
        /// Work item ID (e.g. `"42_story_my_feature"`).
        story_id: String,
        /// Human-readable story name, if available.
        story_name: Option<String>,
        /// Human-readable reason the story was blocked.
        reason: String,
    },
    /// An agent hit a soft API rate limit.
    RateLimitWarning {
        /// Work item ID the agent was working on.
        story_id: String,
        /// Human-readable story name, if available.
        story_name: Option<String>,
        /// Name of the agent that hit the limit.
        agent_name: String,
    },
    /// An agent hit a hard API rate limit and will be blocked until `reset_at`.
    RateLimitHardBlock {
        /// Work item ID the agent was working on.
        story_id: String,
        /// Human-readable story name, if available.
        story_name: Option<String>,
        /// Name of the agent that hit the hard limit.
        agent_name: String,
        /// UTC instant at which the rate limit resets.
        reset_at: DateTime<Utc>,
    },
}
// ── Subscription ──────────────────────────────────────────────────────────────
/// A per-consumer handle returned by [`StatusBroadcaster::subscribe`].
///
/// Each subscription holds its own [`broadcast::Receiver`] and an independent
/// `enabled` flag. When disabled, [`Subscription::recv`] silently discards
/// incoming events until the subscription is re-enabled.
///
/// Stories 642/643/644 own the lifecycle of their `Subscription` handles,
/// including when to call [`enable`](Subscription::enable) and
/// [`disable`](Subscription::disable).
pub struct Subscription {
    // This subscriber's private receive half of the broadcast channel.
    receiver: broadcast::Receiver<StatusEvent>,
    // Shared delivery gate; clones are handed out via `enabled_flag` so other
    // tasks can toggle delivery without owning the subscription.
    enabled: Arc<AtomicBool>,
}
impl Subscription {
/// Enable this subscription — future events will be delivered to the caller.
pub fn enable(&self) {
self.enabled.store(true, Ordering::Relaxed);
}
/// Disable this subscription — incoming events are silently discarded until
/// [`enable`](Subscription::enable) is called again.
pub fn disable(&self) {
self.enabled.store(false, Ordering::Relaxed);
}
/// Returns `true` if this subscription is currently enabled.
pub fn is_enabled(&self) -> bool {
self.enabled.load(Ordering::Relaxed)
}
/// Receive the next event, skipping any that arrive while the subscription
/// is disabled.
///
/// Returns `None` if the broadcaster has been dropped (channel closed).
/// If the receiver is lagging (events were dropped), the lag is cleared and
/// reception continues.
pub async fn recv(&mut self) -> Option<StatusEvent> {
loop {
match self.receiver.recv().await {
Ok(event) => {
if self.enabled.load(Ordering::Relaxed) {
return Some(event);
}
// Subscription disabled — discard and wait for next.
}
Err(broadcast::error::RecvError::Lagged(_)) => {
// Dropped some events; continue from current position.
continue;
}
Err(broadcast::error::RecvError::Closed) => return None,
}
}
}
/// Returns a clone of the `enabled` flag so the owner can share it with
/// another task (e.g. an HTTP handler that toggles the flag).
pub fn enabled_flag(&self) -> Arc<AtomicBool> {
Arc::clone(&self.enabled)
}
}
// ── StatusBroadcaster ─────────────────────────────────────────────────────────
/// A project-scoped pub-sub hub for pipeline status events.
///
/// Constructed once per project in `main.rs` and stored on
/// [`crate::services::Services`]. Downstream consumers (chat transports, Web
/// UI, agent context) call [`subscribe`](StatusBroadcaster::subscribe) to
/// obtain a [`Subscription`] handle; the broadcaster itself only needs to be
/// called via [`publish`](StatusBroadcaster::publish).
pub struct StatusBroadcaster {
    // Send half of the broadcast channel; `subscribe` derives receivers from it.
    tx: broadcast::Sender<StatusEvent>,
}
impl StatusBroadcaster {
    /// Create a new broadcaster with a channel capacity of [`CHANNEL_CAPACITY`].
    pub fn new() -> Self {
        // The initial receiver is dropped; consumers subscribe on demand.
        let (tx, _rx) = broadcast::channel(CHANNEL_CAPACITY);
        Self { tx }
    }

    /// Publish an event to all currently active, non-lagging subscribers.
    ///
    /// If there are no subscribers the send is silently dropped.
    pub fn publish(&self, event: StatusEvent) {
        // `send` only errs when there are no receivers — intentionally ignored.
        let _ = self.tx.send(event);
    }

    /// Create a new [`Subscription`] for a downstream consumer.
    ///
    /// The subscription starts **enabled** by default. Pass the returned
    /// handle to the consuming task; call [`Subscription::disable`] /
    /// [`Subscription::enable`] to gate delivery independently per consumer.
    pub fn subscribe(&self) -> Subscription {
        // Delegate so Subscription construction lives in exactly one place.
        self.subscribe_with_state(true)
    }

    /// Create a new [`Subscription`] with an explicit initial enabled state.
    ///
    /// Useful when a consumer is provisioned before its transport is ready.
    pub fn subscribe_with_state(&self, initial_enabled: bool) -> Subscription {
        Subscription {
            receiver: self.tx.subscribe(),
            enabled: Arc::new(AtomicBool::new(initial_enabled)),
        }
    }
}
impl Default for StatusBroadcaster {
    // Equivalent to `new()`; lets the broadcaster participate in
    // `..Default::default()`-style construction.
    fn default() -> Self {
        Self::new()
    }
}
// ── Tests ─────────────────────────────────────────────────────────────────────
#[cfg(test)]
mod tests {
    use super::*;
    // ── publish / subscribe basics ────────────────────────────────────────────
    // NOTE: every test subscribes BEFORE publishing — broadcast channels only
    // buffer events for receivers that already exist at send time.
    #[tokio::test]
    async fn publish_delivers_to_subscriber() {
        let broadcaster = StatusBroadcaster::new();
        let mut sub = broadcaster.subscribe();
        broadcaster.publish(StatusEvent::MergeFailure {
            story_id: "1_story_a".to_string(),
            story_name: None,
            reason: "conflict".to_string(),
        });
        let received = sub.recv().await.expect("should receive event");
        assert!(matches!(received, StatusEvent::MergeFailure { .. }));
    }
    #[tokio::test]
    async fn publish_fans_out_to_multiple_subscribers() {
        let broadcaster = StatusBroadcaster::new();
        let mut sub1 = broadcaster.subscribe();
        let mut sub2 = broadcaster.subscribe();
        broadcaster.publish(StatusEvent::StoryBlocked {
            story_id: "2_story_b".to_string(),
            story_name: None,
            reason: "retry limit".to_string(),
        });
        // Both subscribers get their own copy of the single published event.
        let e1 = sub1.recv().await.expect("sub1 should receive");
        let e2 = sub2.recv().await.expect("sub2 should receive");
        assert!(matches!(e1, StatusEvent::StoryBlocked { .. }));
        assert!(matches!(e2, StatusEvent::StoryBlocked { .. }));
    }
    #[tokio::test]
    async fn disabled_subscription_does_not_deliver_until_enabled() {
        use tokio::time::{Duration, timeout};
        let broadcaster = StatusBroadcaster::new();
        let mut sub = broadcaster.subscribe_with_state(false);
        // Publish an event while disabled.
        broadcaster.publish(StatusEvent::StoryBlocked {
            story_id: "3_story_c".to_string(),
            story_name: None,
            reason: "empty diff".to_string(),
        });
        // recv() internally discards the buffered StoryBlocked event (disabled),
        // then blocks waiting for the next one. The timeout proves it does not
        // return the discarded event.
        let blocked = timeout(Duration::from_millis(20), sub.recv()).await;
        assert!(
            blocked.is_err(),
            "disabled subscription must not deliver events"
        );
        // Enable and publish another event — should be delivered.
        sub.enable();
        broadcaster.publish(StatusEvent::MergeFailure {
            story_id: "4_story_d".to_string(),
            story_name: None,
            reason: "gate failed".to_string(),
        });
        let received = timeout(Duration::from_millis(100), sub.recv())
            .await
            .expect("should not time out after enabling")
            .expect("channel should be open");
        assert!(matches!(received, StatusEvent::MergeFailure { .. }));
    }
    #[tokio::test]
    async fn subscribe_with_state_false_starts_disabled() {
        let broadcaster = StatusBroadcaster::new();
        let sub = broadcaster.subscribe_with_state(false);
        assert!(!sub.is_enabled());
    }
    #[tokio::test]
    async fn subscribe_starts_enabled_by_default() {
        let broadcaster = StatusBroadcaster::new();
        let sub = broadcaster.subscribe();
        assert!(sub.is_enabled());
    }
    // ── Multi-project isolation ───────────────────────────────────────────────
    /// Events from project A's broadcaster must not reach project B's subscribers.
    #[tokio::test]
    async fn multi_project_isolation_events_do_not_cross_broadcasters() {
        // Two broadcasters stand in for two independent projects.
        let broadcaster_a = StatusBroadcaster::new();
        let broadcaster_b = StatusBroadcaster::new();
        let mut sub_a = broadcaster_a.subscribe();
        let mut sub_b = broadcaster_b.subscribe();
        // Publish an event only to project A.
        broadcaster_a.publish(StatusEvent::StageTransition {
            story_id: "10_story_project_a".to_string(),
            story_name: Some("Project A Story".to_string()),
            from_stage: "1_backlog".to_string(),
            to_stage: "2_current".to_string(),
        });
        // Publish a different event only to project B.
        broadcaster_b.publish(StatusEvent::MergeFailure {
            story_id: "20_story_project_b".to_string(),
            story_name: Some("Project B Story".to_string()),
            reason: "b conflict".to_string(),
        });
        // sub_a should receive project A's event only.
        let from_a = sub_a.recv().await.expect("sub_a should receive event");
        match &from_a {
            StatusEvent::StageTransition { story_id, .. } => {
                assert_eq!(story_id, "10_story_project_a");
            }
            other => panic!("sub_a got wrong event: {other:?}"),
        }
        // sub_b should receive project B's event only.
        let from_b = sub_b.recv().await.expect("sub_b should receive event");
        match &from_b {
            StatusEvent::MergeFailure { story_id, .. } => {
                assert_eq!(story_id, "20_story_project_b");
            }
            other => panic!("sub_b got wrong event: {other:?}"),
        }
        // Verify no cross-contamination: sub_a should have no further events.
        use tokio::time::{Duration, timeout};
        let cross = timeout(Duration::from_millis(10), sub_a.recv()).await;
        assert!(cross.is_err(), "sub_a should not receive project B's event");
    }
    /// Two projects firing events independently must not affect each other.
    #[tokio::test]
    async fn multi_project_integration_two_projects_independent() {
        let ba = StatusBroadcaster::new();
        let bb = StatusBroadcaster::new();
        let mut sub_a1 = ba.subscribe();
        let mut sub_a2 = ba.subscribe();
        let mut sub_b1 = bb.subscribe();
        // Project A fires two events.
        ba.publish(StatusEvent::StoryBlocked {
            story_id: "1_story_a_blocked".to_string(),
            story_name: None,
            reason: "retry".to_string(),
        });
        ba.publish(StatusEvent::MergeFailure {
            story_id: "2_story_a_fail".to_string(),
            story_name: None,
            reason: "conflict".to_string(),
        });
        // Project B fires one event.
        bb.publish(StatusEvent::StageTransition {
            story_id: "3_story_b_move".to_string(),
            story_name: None,
            from_stage: "1_backlog".to_string(),
            to_stage: "2_current".to_string(),
        });
        // sub_a1 sees both of project A's events, in publish order.
        let e1 = sub_a1.recv().await.unwrap();
        let e2 = sub_a1.recv().await.unwrap();
        assert!(matches!(e1, StatusEvent::StoryBlocked { .. }));
        assert!(matches!(e2, StatusEvent::MergeFailure { .. }));
        // sub_a2 also sees both of project A's events.
        let e3 = sub_a2.recv().await.unwrap();
        let e4 = sub_a2.recv().await.unwrap();
        assert!(matches!(e3, StatusEvent::StoryBlocked { .. }));
        assert!(matches!(e4, StatusEvent::MergeFailure { .. }));
        // sub_b1 sees only project B's event.
        let e5 = sub_b1.recv().await.unwrap();
        assert!(matches!(e5, StatusEvent::StageTransition { .. }));
        // No cross-contamination from A to B.
        use tokio::time::{Duration, timeout};
        assert!(
            timeout(Duration::from_millis(10), sub_b1.recv())
                .await
                .is_err(),
            "sub_b1 should not see project A events"
        );
    }
    // ── enable/disable flag sharing ───────────────────────────────────────────
    #[tokio::test]
    async fn enabled_flag_shared_between_tasks() {
        let broadcaster = StatusBroadcaster::new();
        let sub = broadcaster.subscribe();
        let flag = sub.enabled_flag();
        // Disable via the shared flag; the subscription observes the change
        // because both sides hold the same Arc<AtomicBool>.
        flag.store(false, std::sync::atomic::Ordering::Relaxed);
        assert!(!sub.is_enabled());
        // Re-enable via the subscription directly.
        sub.enable();
        assert!(sub.is_enabled());
        assert!(flag.load(std::sync::atomic::Ordering::Relaxed));
    }
    // ── no-op when no subscribers ─────────────────────────────────────────────
    #[test]
    fn publish_with_no_subscribers_does_not_panic() {
        let broadcaster = StatusBroadcaster::new();
        // No subscriber — send is silently dropped.
        broadcaster.publish(StatusEvent::MergeFailure {
            story_id: "99_story_nobody".to_string(),
            story_name: None,
            reason: "no one listening".to_string(),
        });
    }
}