diff --git a/server/src/agent_mode.rs b/server/src/agent_mode.rs index 0a8dfc4a..12d7a6af 100644 --- a/server/src/agent_mode.rs +++ b/server/src/agent_mode.rs @@ -631,6 +631,7 @@ fn build_agent_app_context( perm_rx: Arc::new(tokio::sync::Mutex::new(perm_rx)), pending_perm_replies: Arc::new(tokio::sync::Mutex::new(std::collections::HashMap::new())), permission_timeout_secs: 120, + status: Arc::new(crate::service::status::StatusBroadcaster::new()), }); crate::http::context::AppContext { state: Arc::new(state), diff --git a/server/src/chat/transport/matrix/bot/context.rs b/server/src/chat/transport/matrix/bot/context.rs index 0ef3f714..e475629b 100644 --- a/server/src/chat/transport/matrix/bot/context.rs +++ b/server/src/chat/transport/matrix/bot/context.rs @@ -140,6 +140,7 @@ mod tests { perm_rx: Arc::new(TokioMutex::new(perm_rx)), pending_perm_replies: Arc::new(TokioMutex::new(HashMap::new())), permission_timeout_secs: 120, + status: Arc::new(crate::service::status::StatusBroadcaster::new()), }) } diff --git a/server/src/chat/transport/whatsapp/commands.rs b/server/src/chat/transport/whatsapp/commands.rs index 4fd45e09..1338e0e5 100644 --- a/server/src/chat/transport/whatsapp/commands.rs +++ b/server/src/chat/transport/whatsapp/commands.rs @@ -491,6 +491,7 @@ mod tests { perm_rx: Arc::new(tokio::sync::Mutex::new(perm_rx)), pending_perm_replies: Arc::new(tokio::sync::Mutex::new(Default::default())), permission_timeout_secs: 120, + status: Arc::new(crate::service::status::StatusBroadcaster::new()), }); Arc::new(WhatsAppWebhookContext { services, diff --git a/server/src/http/context.rs b/server/src/http/context.rs index 84cb2865..dde1cd5e 100644 --- a/server/src/http/context.rs +++ b/server/src/http/context.rs @@ -128,6 +128,7 @@ impl AppContext { perm_rx: Arc::new(tokio::sync::Mutex::new(perm_rx)), pending_perm_replies: Arc::new(tokio::sync::Mutex::new(HashMap::new())), permission_timeout_secs: 120, + status: Arc::new(crate::service::status::StatusBroadcaster::new()), }); Self { state: Arc::new(state), diff --git a/server/src/main.rs b/server/src/main.rs index 9b511627..76095e92 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -623,6 +623,7 @@ async fn main() -> Result<(), std::io::Error> { .as_ref() .map(|c| c.permission_timeout_secs) .unwrap_or(120), + status: Arc::new(service::status::StatusBroadcaster::new()), }); // Build WhatsApp webhook context if bot.toml configures transport = "whatsapp". diff --git a/server/src/service/bot_command/io.rs b/server/src/service/bot_command/io.rs index 1cad4d93..43ba680b 100644 --- a/server/src/service/bot_command/io.rs +++ b/server/src/service/bot_command/io.rs @@ -146,6 +146,7 @@ pub(super) fn call_sync( perm_rx: Arc::new(tokio::sync::Mutex::new(perm_rx)), pending_perm_replies: Arc::new(tokio::sync::Mutex::new(HashMap::new())), permission_timeout_secs: 120, + status: Arc::new(crate::service::status::StatusBroadcaster::new()), }); let dispatch = CommandDispatch { diff --git a/server/src/service/gateway/io.rs b/server/src/service/gateway/io.rs index e7abfe35..cd0d3579 100644 --- a/server/src/service/gateway/io.rs +++ b/server/src/service/gateway/io.rs @@ -406,6 +406,7 @@ pub fn spawn_gateway_bot( std::collections::HashMap::new(), )), permission_timeout_secs: 120, + status: std::sync::Arc::new(crate::service::status::StatusBroadcaster::new()), }); crate::chat::transport::matrix::spawn_bot( diff --git a/server/src/service/mod.rs b/server/src/service/mod.rs index 3397e5d8..ed96a18b 100644 --- a/server/src/service/mod.rs +++ b/server/src/service/mod.rs @@ -23,6 +23,7 @@ pub mod project; pub mod qa; pub mod settings; pub mod shell; +pub mod status; pub mod story; pub mod timer; pub mod wizard; diff --git a/server/src/service/status/format.rs b/server/src/service/status/format.rs new file mode 100644 index 00000000..5f910fa9 --- /dev/null +++ b/server/src/service/status/format.rs @@ -0,0 +1,203 @@ +//! Pure status-event message formatter. +//! +//! A single [`format_status_event`] function converts any [`StatusEvent`] into +//! a human-readable string. Adding a new event type means adding one match arm +//! here — no per-transport duplication anywhere in the codebase. + +use crate::service::common::item_id::extract_item_number; +use crate::service::notifications::format::stage_display_name; +use crate::service::status::StatusEvent; + +/// Render a [`StatusEvent`] into a human-readable plain-text string. +#[allow(dead_code)] +/// +/// This is the single formatter for all status event types. Every transport +/// (chat, Web UI, agent context) calls this function rather than duplicating +/// formatting logic. +pub fn format_status_event(event: &StatusEvent) -> String { + match event { + StatusEvent::StageTransition { + story_id, + story_name, + from_stage, + to_stage, + } => { + let number = extract_item_number(story_id).unwrap_or(story_id.as_str()); + let name = story_name.as_deref().unwrap_or(story_id.as_str()); + let from = stage_display_name(from_stage); + let to = stage_display_name(to_stage); + let prefix = if to == "Done" { "\u{1f389} " } else { "" }; + format!("{prefix}#{number} {name} \u{2014} {from} \u{2192} {to}") + } + StatusEvent::MergeFailure { + story_id, + story_name, + reason, + } => { + let number = extract_item_number(story_id).unwrap_or(story_id.as_str()); + let name = story_name.as_deref().unwrap_or(story_id.as_str()); + format!("\u{274c} #{number} {name} \u{2014} {reason}") + } + StatusEvent::StoryBlocked { + story_id, + story_name, + reason, + } => { + let number = extract_item_number(story_id).unwrap_or(story_id.as_str()); + let name = story_name.as_deref().unwrap_or(story_id.as_str()); + format!("\u{1f6ab} #{number} {name} \u{2014} BLOCKED: {reason}") + } + StatusEvent::RateLimitWarning { + story_id, + story_name, + agent_name, + } => { + let number = extract_item_number(story_id).unwrap_or(story_id.as_str()); + let name = story_name.as_deref().unwrap_or(story_id.as_str()); + format!("\u{26a0}\u{fe0f} #{number} {name} \u{2014} {agent_name} hit an API rate limit") + } + StatusEvent::RateLimitHardBlock { + story_id, + story_name, + agent_name, + reset_at, + } => { + let number = extract_item_number(story_id).unwrap_or(story_id.as_str()); + let name = story_name.as_deref().unwrap_or(story_id.as_str()); + let reset = reset_at.format("%H:%M UTC").to_string(); + format!( + "\u{26d4} #{number} {name} \u{2014} {agent_name} hard rate-limited until {reset}" + ) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use chrono::TimeZone; + + #[test] + fn formats_stage_transition_to_done_with_emoji() { + let event = StatusEvent::StageTransition { + story_id: "42_story_foo".to_string(), + story_name: Some("Foo Story".to_string()), + from_stage: "4_merge".to_string(), + to_stage: "5_done".to_string(), + }; + let s = format_status_event(&event); + assert!( + s.contains("\u{1f389}"), + "done transition should include party emoji" + ); + assert!(s.contains("#42")); + assert!(s.contains("Foo Story")); + assert!(s.contains("Merge \u{2192} Done")); + } + + #[test] + fn formats_stage_transition_no_emoji_for_non_done() { + let event = StatusEvent::StageTransition { + story_id: "10_story_bar".to_string(), + story_name: Some("Bar".to_string()), + from_stage: "1_backlog".to_string(), + to_stage: "2_current".to_string(), + }; + let s = format_status_event(&event); + assert!(!s.contains("\u{1f389}")); + assert!(s.contains("Backlog \u{2192} Current")); + } + + #[test] + fn formats_stage_transition_falls_back_to_story_id_when_no_name() { + let event = StatusEvent::StageTransition { + story_id: "5_story_x".to_string(), + story_name: None, + from_stage: "2_current".to_string(), + to_stage: "3_qa".to_string(), + }; + let s = format_status_event(&event); + assert!(s.contains("5_story_x")); + } + + #[test] + fn formats_merge_failure() { + let event = StatusEvent::MergeFailure { + story_id: "7_story_fail".to_string(), + story_name: Some("Failing Story".to_string()), + reason: "conflicts detected".to_string(), + }; + let s = format_status_event(&event); + assert!(s.contains("\u{274c}")); + assert!(s.contains("#7")); + assert!(s.contains("conflicts detected")); + } + + #[test] + fn formats_story_blocked() { + let event = StatusEvent::StoryBlocked { + story_id: "8_story_blk".to_string(), + story_name: Some("Blocked Story".to_string()), + reason: "retry limit exceeded".to_string(), + }; + let s = format_status_event(&event); + assert!(s.contains("\u{1f6ab}")); + assert!(s.contains("BLOCKED: retry limit exceeded")); + } + + #[test] + fn formats_rate_limit_warning() { + let event = StatusEvent::RateLimitWarning { + story_id: "9_story_rl".to_string(), + story_name: Some("RL Story".to_string()), + agent_name: "coder-1".to_string(), + }; + let s = format_status_event(&event); + assert!(s.contains("coder-1 hit an API rate limit")); + } + + #[test] + fn formats_rate_limit_hard_block() { + let reset = chrono::Utc + .with_ymd_and_hms(2026, 4, 26, 15, 30, 0) + .unwrap(); + let event = StatusEvent::RateLimitHardBlock { + story_id: "3_story_hb".to_string(), + story_name: Some("HB Story".to_string()), + agent_name: "coder-2".to_string(), + reset_at: reset, + }; + let s = format_status_event(&event); + assert!(s.contains("\u{26d4}")); + assert!(s.contains("coder-2")); + assert!(s.contains("hard rate-limited")); + assert!(s.contains("15:30 UTC")); + } + + #[test] + fn single_formatter_no_per_transport_duplication() { + // Each event type produces output through a single call — no per-transport variants. + let events: Vec = vec![ + StatusEvent::StageTransition { + story_id: "1_story_a".to_string(), + story_name: None, + from_stage: "1_backlog".to_string(), + to_stage: "2_current".to_string(), + }, + StatusEvent::MergeFailure { + story_id: "2_story_b".to_string(), + story_name: None, + reason: "test".to_string(), + }, + StatusEvent::StoryBlocked { + story_id: "3_story_c".to_string(), + story_name: None, + reason: "limit".to_string(), + }, + ]; + for e in &events { + let s = format_status_event(e); + assert!(!s.is_empty()); + } + } +} diff --git a/server/src/service/status/mod.rs b/server/src/service/status/mod.rs new file mode 100644 index 00000000..76c124bd --- /dev/null +++ b/server/src/service/status/mod.rs @@ -0,0 +1,449 @@ +//! Status broadcaster — unified pipeline-event fan-out for all consumers. +//! +//! [`StatusBroadcaster`] lives on the [`crate::services::Services`] bundle and +//! exposes two entry points: +//! +//! - [`StatusBroadcaster::publish`] — emit an event to every active subscriber. +//! - [`StatusBroadcaster::subscribe`] — obtain a [`Subscription`] handle that +//! receives events and can be independently enabled or disabled. +//! +//! # Multi-project isolation +//! +//! Each project has its own `Services` bundle and therefore its own +//! `StatusBroadcaster` instance. Events published to project A's broadcaster +//! are delivered only to subscribers of that same broadcaster; project B's +//! subscribers see nothing. +//! +//! # Plug-in consumer design +//! +//! Future stories add their transport-specific logic without touching this +//! module: +//! +//! - Story 642 (top-level agent context): calls `subscribe()` once at startup. +//! - Story 643 (Web UI): calls `subscribe()` once at startup. +//! - Story 644 (chat transports): calls `subscribe()` once per transport. + +pub mod format; + +use chrono::{DateTime, Utc}; +use std::sync::{ + Arc, + atomic::{AtomicBool, Ordering}, +}; +use tokio::sync::broadcast; + +/// The capacity of the underlying broadcast channel. +/// +/// Lagging subscribers lose events — callers should consume promptly. +const CHANNEL_CAPACITY: usize = 256; + +// ── StatusEvent ─────────────────────────────────────────────────────────────── + +/// A pipeline lifecycle event that the [`StatusBroadcaster`] fans out to all +/// configured consumers. +/// +/// Each variant carries enough context for [`format_status_event`] to render a +/// human-readable message without additional lookups. +#[derive(Clone, Debug)] +pub enum StatusEvent { + /// A work item moved between pipeline stages. + StageTransition { + /// Work item ID (e.g. `"42_story_my_feature"`). + story_id: String, + /// Human-readable story name, if available. + story_name: Option, + /// Pipeline stage directory the item moved FROM (e.g. `"2_current"`). + from_stage: String, + /// Pipeline stage directory the item moved TO (e.g. `"3_qa"`). + to_stage: String, + }, + /// A merge operation failed for a story. + MergeFailure { + /// Work item ID (e.g. `"42_story_my_feature"`). + story_id: String, + /// Human-readable story name, if available. + story_name: Option, + /// Human-readable description of the failure. + reason: String, + }, + /// A story was blocked (e.g. retry limit exceeded, empty diff). + StoryBlocked { + /// Work item ID (e.g. `"42_story_my_feature"`). + story_id: String, + /// Human-readable story name, if available. + story_name: Option, + /// Human-readable reason the story was blocked. + reason: String, + }, + /// An agent hit a soft API rate limit. + RateLimitWarning { + /// Work item ID the agent was working on. + story_id: String, + /// Human-readable story name, if available. + story_name: Option, + /// Name of the agent that hit the limit. + agent_name: String, + }, + /// An agent hit a hard API rate limit and will be blocked until `reset_at`. + RateLimitHardBlock { + /// Work item ID the agent was working on. + story_id: String, + /// Human-readable story name, if available. + story_name: Option, + /// Name of the agent that hit the hard limit. + agent_name: String, + /// UTC instant at which the rate limit resets. + reset_at: DateTime, + }, +} + +// ── Subscription ────────────────────────────────────────────────────────────── + +/// A per-consumer handle returned by [`StatusBroadcaster::subscribe`]. +/// +/// Each subscription holds its own [`broadcast::Receiver`] and an independent +/// `enabled` flag. When disabled, [`Subscription::recv`] silently discards +/// incoming events until the subscription is re-enabled. +/// +/// Stories 642/643/644 own the lifecycle of their `Subscription` handles, +/// including when to call [`enable`](Subscription::enable) and +/// [`disable`](Subscription::disable). +pub struct Subscription { + receiver: broadcast::Receiver, + enabled: Arc, +} + +impl Subscription { + /// Enable this subscription — future events will be delivered to the caller. + pub fn enable(&self) { + self.enabled.store(true, Ordering::Relaxed); + } + + /// Disable this subscription — incoming events are silently discarded until + /// [`enable`](Subscription::enable) is called again. + pub fn disable(&self) { + self.enabled.store(false, Ordering::Relaxed); + } + + /// Returns `true` if this subscription is currently enabled. + pub fn is_enabled(&self) -> bool { + self.enabled.load(Ordering::Relaxed) + } + + /// Receive the next event, skipping any that arrive while the subscription + /// is disabled. + /// + /// Returns `None` if the broadcaster has been dropped (channel closed). + /// If the receiver is lagging (events were dropped), the lag is cleared and + /// reception continues. + pub async fn recv(&mut self) -> Option { + loop { + match self.receiver.recv().await { + Ok(event) => { + if self.enabled.load(Ordering::Relaxed) { + return Some(event); + } + // Subscription disabled — discard and wait for next. + } + Err(broadcast::error::RecvError::Lagged(_)) => { + // Dropped some events; continue from current position. + continue; + } + Err(broadcast::error::RecvError::Closed) => return None, + } + } + } + + /// Returns a clone of the `enabled` flag so the owner can share it with + /// another task (e.g. an HTTP handler that toggles the flag). + pub fn enabled_flag(&self) -> Arc { + Arc::clone(&self.enabled) + } +} + +// ── StatusBroadcaster ───────────────────────────────────────────────────────── + +/// A project-scoped pub-sub hub for pipeline status events. +/// +/// Constructed once per project in `main.rs` and stored on +/// [`crate::services::Services`]. Downstream consumers (chat transports, Web +/// UI, agent context) call [`subscribe`](StatusBroadcaster::subscribe) to +/// obtain a [`Subscription`] handle; the broadcaster itself only needs to be +/// called via [`publish`](StatusBroadcaster::publish). +pub struct StatusBroadcaster { + tx: broadcast::Sender, +} + +impl StatusBroadcaster { + /// Create a new broadcaster with a channel capacity of [`CHANNEL_CAPACITY`]. + pub fn new() -> Self { + let (tx, _) = broadcast::channel(CHANNEL_CAPACITY); + Self { tx } + } + + /// Publish an event to all currently active, non-lagging subscribers. + /// + /// If there are no subscribers the send is silently dropped. + pub fn publish(&self, event: StatusEvent) { + let _ = self.tx.send(event); + } + + /// Create a new [`Subscription`] for a downstream consumer. + /// + /// The subscription starts **enabled** by default. Pass the returned + /// handle to the consuming task; call [`Subscription::disable`] / + /// [`Subscription::enable`] to gate delivery independently per consumer. + pub fn subscribe(&self) -> Subscription { + Subscription { + receiver: self.tx.subscribe(), + enabled: Arc::new(AtomicBool::new(true)), + } + } + + /// Create a new [`Subscription`] with an explicit initial enabled state. + /// + /// Useful when a consumer is provisioned before its transport is ready. + pub fn subscribe_with_state(&self, initial_enabled: bool) -> Subscription { + Subscription { + receiver: self.tx.subscribe(), + enabled: Arc::new(AtomicBool::new(initial_enabled)), + } + } +} + +impl Default for StatusBroadcaster { + fn default() -> Self { + Self::new() + } +} + +// ── Tests ───────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + + // ── publish / subscribe basics ──────────────────────────────────────────── + + #[tokio::test] + async fn publish_delivers_to_subscriber() { + let broadcaster = StatusBroadcaster::new(); + let mut sub = broadcaster.subscribe(); + + broadcaster.publish(StatusEvent::MergeFailure { + story_id: "1_story_a".to_string(), + story_name: None, + reason: "conflict".to_string(), + }); + + let received = sub.recv().await.expect("should receive event"); + assert!(matches!(received, StatusEvent::MergeFailure { .. })); + } + + #[tokio::test] + async fn publish_fans_out_to_multiple_subscribers() { + let broadcaster = StatusBroadcaster::new(); + let mut sub1 = broadcaster.subscribe(); + let mut sub2 = broadcaster.subscribe(); + + broadcaster.publish(StatusEvent::StoryBlocked { + story_id: "2_story_b".to_string(), + story_name: None, + reason: "retry limit".to_string(), + }); + + let e1 = sub1.recv().await.expect("sub1 should receive"); + let e2 = sub2.recv().await.expect("sub2 should receive"); + assert!(matches!(e1, StatusEvent::StoryBlocked { .. })); + assert!(matches!(e2, StatusEvent::StoryBlocked { .. })); + } + + #[tokio::test] + async fn disabled_subscription_does_not_deliver_until_enabled() { + use tokio::time::{Duration, timeout}; + + let broadcaster = StatusBroadcaster::new(); + let mut sub = broadcaster.subscribe_with_state(false); + + // Publish an event while disabled. + broadcaster.publish(StatusEvent::StoryBlocked { + story_id: "3_story_c".to_string(), + story_name: None, + reason: "empty diff".to_string(), + }); + + // recv() internally discards the buffered StoryBlocked event (disabled), + // then blocks waiting for the next one. The timeout proves it does not + // return the discarded event. + let blocked = timeout(Duration::from_millis(20), sub.recv()).await; + assert!( + blocked.is_err(), + "disabled subscription must not deliver events" + ); + + // Enable and publish another event — should be delivered. + sub.enable(); + broadcaster.publish(StatusEvent::MergeFailure { + story_id: "4_story_d".to_string(), + story_name: None, + reason: "gate failed".to_string(), + }); + + let received = timeout(Duration::from_millis(100), sub.recv()) + .await + .expect("should not time out after enabling") + .expect("channel should be open"); + assert!(matches!(received, StatusEvent::MergeFailure { .. })); + } + + #[tokio::test] + async fn subscribe_with_state_false_starts_disabled() { + let broadcaster = StatusBroadcaster::new(); + let sub = broadcaster.subscribe_with_state(false); + assert!(!sub.is_enabled()); + } + + #[tokio::test] + async fn subscribe_starts_enabled_by_default() { + let broadcaster = StatusBroadcaster::new(); + let sub = broadcaster.subscribe(); + assert!(sub.is_enabled()); + } + + // ── Multi-project isolation ─────────────────────────────────────────────── + + /// Events from project A's broadcaster must not reach project B's subscribers. + #[tokio::test] + async fn multi_project_isolation_events_do_not_cross_broadcasters() { + let broadcaster_a = StatusBroadcaster::new(); + let broadcaster_b = StatusBroadcaster::new(); + + let mut sub_a = broadcaster_a.subscribe(); + let mut sub_b = broadcaster_b.subscribe(); + + // Publish an event only to project A. + broadcaster_a.publish(StatusEvent::StageTransition { + story_id: "10_story_project_a".to_string(), + story_name: Some("Project A Story".to_string()), + from_stage: "1_backlog".to_string(), + to_stage: "2_current".to_string(), + }); + + // Publish a different event only to project B. + broadcaster_b.publish(StatusEvent::MergeFailure { + story_id: "20_story_project_b".to_string(), + story_name: Some("Project B Story".to_string()), + reason: "b conflict".to_string(), + }); + + // sub_a should receive project A's event only. + let from_a = sub_a.recv().await.expect("sub_a should receive event"); + match &from_a { + StatusEvent::StageTransition { story_id, .. } => { + assert_eq!(story_id, "10_story_project_a"); + } + other => panic!("sub_a got wrong event: {other:?}"), + } + + // sub_b should receive project B's event only. + let from_b = sub_b.recv().await.expect("sub_b should receive event"); + match &from_b { + StatusEvent::MergeFailure { story_id, .. } => { + assert_eq!(story_id, "20_story_project_b"); + } + other => panic!("sub_b got wrong event: {other:?}"), + } + + // Verify no cross-contamination: sub_a should have no further events. + use tokio::time::{Duration, timeout}; + let cross = timeout(Duration::from_millis(10), sub_a.recv()).await; + assert!(cross.is_err(), "sub_a should not receive project B's event"); + } + + /// Two projects firing events independently must not affect each other. + #[tokio::test] + async fn multi_project_integration_two_projects_independent() { + let ba = StatusBroadcaster::new(); + let bb = StatusBroadcaster::new(); + + let mut sub_a1 = ba.subscribe(); + let mut sub_a2 = ba.subscribe(); + let mut sub_b1 = bb.subscribe(); + + // Project A fires two events. + ba.publish(StatusEvent::StoryBlocked { + story_id: "1_story_a_blocked".to_string(), + story_name: None, + reason: "retry".to_string(), + }); + ba.publish(StatusEvent::MergeFailure { + story_id: "2_story_a_fail".to_string(), + story_name: None, + reason: "conflict".to_string(), + }); + + // Project B fires one event. + bb.publish(StatusEvent::StageTransition { + story_id: "3_story_b_move".to_string(), + story_name: None, + from_stage: "1_backlog".to_string(), + to_stage: "2_current".to_string(), + }); + + // sub_a1 sees both of project A's events. + let e1 = sub_a1.recv().await.unwrap(); + let e2 = sub_a1.recv().await.unwrap(); + assert!(matches!(e1, StatusEvent::StoryBlocked { .. })); + assert!(matches!(e2, StatusEvent::MergeFailure { .. })); + + // sub_a2 also sees both of project A's events. + let e3 = sub_a2.recv().await.unwrap(); + let e4 = sub_a2.recv().await.unwrap(); + assert!(matches!(e3, StatusEvent::StoryBlocked { .. })); + assert!(matches!(e4, StatusEvent::MergeFailure { .. })); + + // sub_b1 sees only project B's event. + let e5 = sub_b1.recv().await.unwrap(); + assert!(matches!(e5, StatusEvent::StageTransition { .. })); + + // No cross-contamination from A to B. + use tokio::time::{Duration, timeout}; + assert!( + timeout(Duration::from_millis(10), sub_b1.recv()) + .await + .is_err(), + "sub_b1 should not see project A events" + ); + } + + // ── enable/disable flag sharing ─────────────────────────────────────────── + + #[tokio::test] + async fn enabled_flag_shared_between_tasks() { + let broadcaster = StatusBroadcaster::new(); + let sub = broadcaster.subscribe(); + let flag = sub.enabled_flag(); + + // Disable via the shared flag. + flag.store(false, std::sync::atomic::Ordering::Relaxed); + assert!(!sub.is_enabled()); + + // Re-enable via the subscription directly. + sub.enable(); + assert!(sub.is_enabled()); + assert!(flag.load(std::sync::atomic::Ordering::Relaxed)); + } + + // ── no-op when no subscribers ───────────────────────────────────────────── + + #[test] + fn publish_with_no_subscribers_does_not_panic() { + let broadcaster = StatusBroadcaster::new(); + // No subscriber — send is silently dropped. + broadcaster.publish(StatusEvent::MergeFailure { + story_id: "99_story_nobody".to_string(), + story_name: None, + reason: "no one listening".to_string(), + }); + } +} diff --git a/server/src/services.rs b/server/src/services.rs index d8fbde3e..2e4baf0c 100644 --- a/server/src/services.rs +++ b/server/src/services.rs @@ -7,6 +7,7 @@ use crate::agents::AgentPool; use crate::http::context::{PermissionDecision, PermissionForward}; +use crate::service::status::StatusBroadcaster; use std::collections::{HashMap, HashSet}; use std::path::PathBuf; use std::sync::Arc; @@ -35,6 +36,14 @@ pub struct Services { /// Seconds to wait for a user to respond to a permission prompt before /// auto-denying (fail-closed). pub permission_timeout_secs: u64, + /// Project-scoped status broadcaster. + /// + /// Consumers (chat transports, Web UI, agent context) call + /// [`StatusBroadcaster::subscribe`] to receive pipeline status events. + /// The broadcaster is project-scoped: events published here are delivered + /// only to subscribers of this instance, providing natural multi-project + /// isolation. + pub status: Arc, } #[cfg(test)] @@ -52,6 +61,7 @@ impl Services { perm_rx: std::sync::Arc::new(TokioMutex::new(perm_rx)), pending_perm_replies: std::sync::Arc::new(TokioMutex::new(HashMap::new())), permission_timeout_secs: 120, + status: std::sync::Arc::new(StatusBroadcaster::new()), }) } }