From 4520e0e6f932a8373949b75886290b323b9c75c7 Mon Sep 17 00:00:00 2001 From: dave Date: Thu, 14 May 2026 07:51:16 +0000 Subject: [PATCH] huskies: merge 995 --- server/src/service/notifications/events.rs | 20 +- .../src/service/notifications/io/listener.rs | 120 +-------- server/src/service/notifications/io/mod.rs | 6 + .../notifications/io/stage_subscriber.rs | 119 ++++++++ .../service/notifications/io/tests_stage.rs | 197 +------------- .../notifications/io/tests_transition.rs | 255 ++++++++++++++++++ server/src/service/notifications/mod.rs | 1 + server/src/startup/bots.rs | 24 ++ 8 files changed, 423 insertions(+), 319 deletions(-) create mode 100644 server/src/service/notifications/io/stage_subscriber.rs create mode 100644 server/src/service/notifications/io/tests_transition.rs diff --git a/server/src/service/notifications/events.rs b/server/src/service/notifications/events.rs index d52fd8e4..fb12935a 100644 --- a/server/src/service/notifications/events.rs +++ b/server/src/service/notifications/events.rs @@ -9,8 +9,6 @@ use crate::io::watcher::WatcherEvent; /// The notification action to take in response to a [`WatcherEvent`]. #[derive(Debug, PartialEq)] pub enum EventAction { - /// Post a stage-transition notification; the event carries a known source stage. - StageTransition, /// Post a merge-failure error notification. MergeFailure, /// Post a rate-limit warning (subject to config/debounce suppression). @@ -39,15 +37,9 @@ pub enum EventAction { /// Classify a [`WatcherEvent`] into the action the notification listener should take. pub fn classify(event: &WatcherEvent) -> EventAction { match event { - WatcherEvent::WorkItem { from_stage, .. } => { - if from_stage.is_some() { - EventAction::StageTransition - } else { - // Synthetic events (creation, reassign) have no from_stage. - // Posting a notification for these would produce incorrect messages. - EventAction::Skip - } - } + // Stage-change notifications are now handled by the TransitionFired subscriber + // (story 995). WorkItem events are skipped regardless of from_stage. + WatcherEvent::WorkItem { .. } => EventAction::Skip, WatcherEvent::MergeFailure { .. } => EventAction::MergeFailure, WatcherEvent::RateLimitWarning { .. } => EventAction::RateLimitWarning, WatcherEvent::StoryBlocked { .. } => EventAction::StoryBlocked, @@ -77,10 +69,12 @@ mod tests { } } + // Stage-change notifications moved to TransitionFired subscriber (story 995). + // All WorkItem events are now classified as Skip regardless of from_stage. #[test] - fn work_item_with_from_stage_is_stage_transition() { + fn work_item_with_from_stage_is_skip() { let event = work_item(Some("2_current")); - assert_eq!(classify(&event), EventAction::StageTransition); + assert_eq!(classify(&event), EventAction::Skip); } #[test] diff --git a/server/src/service/notifications/io/listener.rs b/server/src/service/notifications/io/listener.rs index 4fb8d707..b614f4a8 100644 --- a/server/src/service/notifications/io/listener.rs +++ b/server/src/service/notifications/io/listener.rs @@ -4,7 +4,6 @@ use crate::chat::ChatTransport; use crate::config::ProjectConfig; use crate::io::watcher::WatcherEvent; -use crate::pipeline_state::Stage; use crate::slog; use std::collections::HashMap; use std::path::PathBuf; @@ -13,14 +12,11 @@ use std::time::Instant; use tokio::sync::broadcast; use super::super::events::classify; -use super::super::filter::{ - AGENT_EVENT_DEBOUNCE, STAGE_TRANSITION_DEBOUNCE, should_send_rate_limit, -}; +use super::super::filter::{AGENT_EVENT_DEBOUNCE, should_send_rate_limit}; use super::super::format::{ format_agent_completed_notification, format_agent_started_notification, format_blocked_notification, format_error_notification, format_oauth_account_swapped, - format_oauth_accounts_exhausted, format_rate_limit_notification, format_stage_notification, - merge_failure_snippet, + format_oauth_accounts_exhausted, format_rate_limit_notification, merge_failure_snippet, }; use super::super::route::rooms_for_notification; use super::{find_story_name_any_stage, read_story_name}; @@ -47,14 +43,6 @@ pub fn spawn_notification_listener( // "story_id:agent_name" key, to debounce repeated warnings. let mut rate_limit_last_notified: HashMap = HashMap::new(); - // Pending stage-transition notifications, keyed by item_id. - // Value: (from_stage, to_stage, story_name). - // Rapid successive transitions for the same item are coalesced: the - // original `from_stage` is kept while `to_stage` is updated to the - // latest destination, so only one notification fires for the final stage. - let mut pending_transitions: HashMap = HashMap::new(); - let mut flush_deadline: Option = None; - // Pending agent-status notifications, keyed by "{story_id}:{event_kind}". // Value: (plain, html). Rapid successive events for the same story and // event kind are coalesced: only the latest is sent after the debounce @@ -63,48 +51,17 @@ pub fn spawn_notification_listener( let mut agent_flush_deadline: Option = None; loop { - // Pick the earliest of the two debounce deadlines. - let earliest_deadline = match (flush_deadline, agent_flush_deadline) { - (Some(a), Some(b)) => Some(a.min(b)), - (Some(a), None) => Some(a), - (None, Some(b)) => Some(b), - (None, None) => None, - }; - - // Wait for the next event, or flush pending notifications when the - // earliest debounce window expires. - let recv_result = if let Some(deadline) = earliest_deadline { + // Wait for the next event, or flush pending agent notifications when + // the debounce window expires. + let recv_result = if let Some(deadline) = agent_flush_deadline { tokio::time::timeout_at(deadline, rx.recv()).await.ok() } else { Some(rx.recv().await) }; if recv_result.is_none() { - let now = tokio::time::Instant::now(); - // Flush stage transitions if their deadline has passed. - if flush_deadline.is_some_and(|d| d <= now) { - for (item_id, (from_stage, to_stage, story_name)) in pending_transitions.drain() - { - let (plain, html) = format_stage_notification( - &item_id, - &story_name, - &from_stage, - &to_stage, - ); - slog!("[bot] Sending stage notification: {plain}"); - if config.status_push_enabled { - for room_id in &rooms_for_notification(&get_room_ids) { - if let Err(e) = transport.send_message(room_id, &plain, &html).await - { - slog!("[bot] Failed to send notification to {room_id}: {e}"); - } - } - } - } - flush_deadline = None; - } // Flush agent events if their deadline has passed. - if agent_flush_deadline.is_some_and(|d| d <= now) { + if agent_flush_deadline.is_some_and(|d| d <= tokio::time::Instant::now()) { for (_key, (plain, html)) in pending_agent_events.drain() { slog!("[bot] Sending agent notification: {plain}"); if config.status_push_enabled { @@ -131,25 +88,7 @@ pub fn spawn_notification_listener( } Err(broadcast::error::RecvError::Closed) => { slog!("[bot] Watcher channel closed, stopping notification listener"); - // Flush any coalesced transitions that haven't fired yet. if config.status_push_enabled { - for (item_id, (from_stage, to_stage, story_name)) in - pending_transitions.drain() - { - let (plain, html) = format_stage_notification( - &item_id, - &story_name, - &from_stage, - &to_stage, - ); - slog!("[bot] Sending stage notification: {plain}"); - for room_id in &rooms_for_notification(&get_room_ids) { - if let Err(e) = transport.send_message(room_id, &plain, &html).await - { - slog!("[bot] Failed to send notification to {room_id}: {e}"); - } - } - } for (_key, (plain, html)) in pending_agent_events.drain() { slog!("[bot] Sending agent notification: {plain}"); for room_id in &rooms_for_notification(&get_room_ids) { @@ -168,53 +107,6 @@ pub fn spawn_notification_listener( use super::super::events::EventAction; match classify(&event) { - EventAction::StageTransition => { - if !config.status_push_enabled { - continue; - } - // WorkItem with a known from_stage — extract the fields. - let WatcherEvent::WorkItem { - ref stage, - ref item_id, - ref from_stage, - .. - } = event - else { - continue; - }; - let from_typed = from_stage - .as_deref() - .and_then(Stage::from_dir) - .unwrap_or(Stage::Upcoming); - let to_typed = Stage::from_dir(stage).unwrap_or(Stage::Upcoming); - - // Look up the story name in the expected stage directory; fall - // back to a full search so stale events still show the name. - let story_name = { - let n = read_story_name(&project_root, stage, item_id); - if n.is_empty() { - find_story_name_any_stage(&project_root, item_id) - } else { - n - } - }; - - // Buffer the transition. If this item_id is already pending (rapid - // succession), update the destination stage to the latest while - // preserving the original from_stage. - pending_transitions - .entry(item_id.clone()) - .and_modify(|e| { - e.1 = to_typed.clone(); - if !story_name.is_empty() { - e.2 = story_name.clone(); - } - }) - .or_insert_with(|| (from_typed, to_typed, story_name)); - - // Start or extend the debounce window. - flush_deadline = Some(tokio::time::Instant::now() + STAGE_TRANSITION_DEBOUNCE); - } EventAction::MergeFailure => { if !config.status_push_enabled { continue; diff --git a/server/src/service/notifications/io/mod.rs b/server/src/service/notifications/io/mod.rs index ed1087e0..94faffaa 100644 --- a/server/src/service/notifications/io/mod.rs +++ b/server/src/service/notifications/io/mod.rs @@ -9,12 +9,18 @@ use std::path::Path; mod listener; pub use listener::spawn_notification_listener; +/// Subscriber that fires stage-change notifications via the pipeline event bus. +pub(super) mod stage_subscriber; +pub use stage_subscriber::spawn_stage_notification_subscriber; + #[cfg(test)] mod mock_transport; #[cfg(test)] mod tests_notifications; #[cfg(test)] mod tests_stage; +#[cfg(test)] +mod tests_transition; /// Read the story name from the typed CRDT register. /// diff --git a/server/src/service/notifications/io/stage_subscriber.rs b/server/src/service/notifications/io/stage_subscriber.rs new file mode 100644 index 00000000..c453933e --- /dev/null +++ b/server/src/service/notifications/io/stage_subscriber.rs @@ -0,0 +1,119 @@ +//! Stage-transition notification subscriber. +//! +//! Subscribes to the [`TransitionFired`] broadcast channel and dispatches +//! chat notifications for each significant pipeline stage change. +//! This is the **single** emitter of stage-change notifications; all other +//! call sites have been removed (story 995). + +use crate::chat::ChatTransport; +use crate::config::ProjectConfig; +use crate::pipeline_state::Stage; +use crate::service::notifications::filter::STAGE_TRANSITION_DEBOUNCE; +use crate::service::notifications::format::format_stage_notification; +use crate::service::notifications::route::rooms_for_notification; +use crate::slog; +use std::collections::HashMap; +use std::path::PathBuf; +use std::sync::Arc; +use tokio::sync::broadcast; + +/// Spawn a background task that subscribes to typed [`TransitionFired`] events +/// and posts stage-change notifications to all configured chat rooms. +/// +/// Coalesces rapid successive transitions for the same item into a single +/// notification (200 ms debounce window) so only the final stage is announced. +/// Config is read once at startup; `status_push_enabled = false` suppresses all +/// notifications from this subscriber. +pub fn spawn_stage_notification_subscriber( + transport: Arc, + get_room_ids: impl Fn() -> Vec + Send + Sync + 'static, + project_root: PathBuf, +) { + let mut rx = crate::pipeline_state::subscribe_transitions(); + tokio::spawn(async move { + let config = ProjectConfig::load(&project_root).unwrap_or_default(); + + // Pending notifications keyed by story_id: (from_stage, to_stage, story_name). + // Rapid transitions are coalesced: original from preserved, to updated. + let mut pending: HashMap = HashMap::new(); + let mut flush_deadline: Option = None; + + loop { + let recv_result = if let Some(deadline) = flush_deadline { + tokio::time::timeout_at(deadline, rx.recv()).await.ok() + } else { + Some(rx.recv().await) + }; + + // Timeout → flush coalesced notifications. + if recv_result.is_none() { + if flush_deadline.is_some_and(|d| d <= tokio::time::Instant::now()) { + send_pending(&pending, &transport, &get_room_ids).await; + pending.clear(); + flush_deadline = None; + } + continue; + } + + let fired = match recv_result.unwrap() { + Ok(f) => f, + Err(broadcast::error::RecvError::Lagged(n)) => { + slog!("[bot/transition] Subscriber lagged; skipped {n} event(s)"); + continue; + } + Err(broadcast::error::RecvError::Closed) => { + send_pending(&pending, &transport, &get_room_ids).await; + break; + } + }; + + if !config.status_push_enabled { + continue; + } + + // Upcoming is the only stage we suppress; every other stage + // arrival produces a notification. + if matches!(&fired.after, Stage::Upcoming) { + continue; + } + + let story_name = + super::read_story_name(&project_root, fired.after.dir_name(), &fired.story_id.0); + + // Coalesce: keep original from_stage, update to_stage and name. + pending + .entry(fired.story_id.0.clone()) + .and_modify(|e| { + e.1 = fired.after.clone(); + if !story_name.is_empty() { + e.2 = story_name.clone(); + } + }) + .or_insert_with(|| (fired.before.clone(), fired.after.clone(), story_name)); + + // Set the deadline once from the first arriving event so that + // concurrent test broadcasts on the global channel do not keep + // pushing the window out and starving the flush. + if flush_deadline.is_none() { + flush_deadline = Some(tokio::time::Instant::now() + STAGE_TRANSITION_DEBOUNCE); + } + } + }); +} + +/// Send all pending stage-transition notifications and log each one. +async fn send_pending( + pending: &HashMap, + transport: &Arc, + get_room_ids: &(impl Fn() -> Vec + Sync), +) { + for (item_id, (from_stage, to_stage, story_name)) in pending { + let (plain, html) = format_stage_notification(item_id, story_name, from_stage, to_stage); + slog!("[bot/transition] Sending stage notification: {plain}"); + for room_id in &rooms_for_notification(get_room_ids) { + if let Err(e) = transport.send_message(room_id, &plain, &html).await { + slog!("[bot/transition] Failed to send notification to {room_id}: {e}"); + } + } + } +} diff --git a/server/src/service/notifications/io/tests_stage.rs b/server/src/service/notifications/io/tests_stage.rs index acf98112..2b765919 100644 --- a/server/src/service/notifications/io/tests_stage.rs +++ b/server/src/service/notifications/io/tests_stage.rs @@ -1,106 +1,9 @@ -//! Tests for stage-transition notifications and `read_story_name`. +//! Tests for `read_story_name`. +//! +//! Stage-transition notification tests have moved to `tests_transition.rs` +//! which uses `spawn_stage_notification_subscriber` and real CRDT transitions. -use super::mock_transport::MockTransport; -use super::{read_story_name, spawn_notification_listener}; -use crate::io::watcher::WatcherEvent; -use std::sync::Arc; -use tokio::sync::broadcast; - -// ── dynamic room IDs (WhatsApp ambient_rooms pattern) ─────────────────────── - -/// Notifications are sent to the rooms returned by the closure at -/// notification time, not at listener-spawn time. This verifies that a -/// closure backed by a runtime set (e.g. WhatsApp ambient_rooms) delivers -/// messages to the rooms present when the event fires. -#[tokio::test] -async fn stage_notification_uses_dynamic_room_ids() { - let tmp = tempfile::tempdir().unwrap(); - // Seed story via CRDT (the only source of truth). - crate::db::ensure_content_store(); - crate::db::write_item_with_content( - "10_story_foo", - "3_qa", - "---\nname: Foo Story\n---\n", - crate::db::ItemMeta::named("Foo Story"), - ); - - let (watcher_tx, watcher_rx) = broadcast::channel::(16); - let (transport, calls) = MockTransport::new(); - - let rooms: Arc>> = - Arc::new(std::sync::Mutex::new(std::collections::HashSet::new())); - let rooms_for_closure = Arc::clone(&rooms); - - spawn_notification_listener( - transport, - move || rooms_for_closure.lock().unwrap().iter().cloned().collect(), - watcher_rx, - tmp.path().to_path_buf(), - ); - - // Add a room after the listener is spawned (simulates a user messaging first). - rooms - .lock() - .unwrap() - .insert("phone:+15551234567".to_string()); - - watcher_tx - .send(WatcherEvent::WorkItem { - stage: "qa".to_string(), - item_id: "10_story_foo".to_string(), - action: "qa".to_string(), - commit_msg: "huskies: qa 10_story_foo".to_string(), - from_stage: Some("coding".to_string()), - }) - .unwrap(); - - // Wait longer than STAGE_TRANSITION_DEBOUNCE (200ms) so the coalesced - // notification flushes. - tokio::time::sleep(std::time::Duration::from_millis(350)).await; - - let calls = calls.lock().unwrap(); - assert_eq!( - calls.len(), - 1, - "Should deliver to the dynamically added room" - ); - assert_eq!(calls[0].0, "phone:+15551234567"); - assert!( - calls[0].1.contains("10"), - "plain should contain story number" - ); - assert!( - calls[0].1.contains("Foo Story"), - "plain should contain story name" - ); -} - -/// When no rooms are registered (e.g. no WhatsApp users have messaged yet), -/// no notifications are sent and the listener does not panic. -#[tokio::test] -async fn stage_notification_with_no_rooms_is_silent() { - let tmp = tempfile::tempdir().unwrap(); - - let (watcher_tx, watcher_rx) = broadcast::channel::(16); - let (transport, calls) = MockTransport::new(); - - spawn_notification_listener(transport, Vec::new, watcher_rx, tmp.path().to_path_buf()); - - watcher_tx - .send(WatcherEvent::WorkItem { - stage: "qa".to_string(), - item_id: "10_story_foo".to_string(), - action: "qa".to_string(), - commit_msg: "huskies: qa 10_story_foo".to_string(), - from_stage: Some("coding".to_string()), - }) - .unwrap(); - - tokio::time::sleep(std::time::Duration::from_millis(100)).await; - - let calls = calls.lock().unwrap(); - assert_eq!(calls.len(), 0, "No rooms means no notifications"); -} +use super::read_story_name; // ── read_story_name ────────────────────────────────────────────────────────── @@ -141,93 +44,3 @@ fn read_story_name_returns_none_for_missing_name_field() { let name = read_story_name(tmp.path(), "2_current", "9943_story_no_name"); assert!(name.is_empty()); } - -// ── Bug 549: synthetic events with from_stage=None must not notify ─────────── - -/// Synthetic events (reassign, creation) have from_stage=None and must -/// not produce stage-transition notifications. Before the fix, the -/// inferred_from_stage fallback would emit e.g. "QA → Merge" for a -/// reassign event within the merge stage. -#[tokio::test] -async fn synthetic_event_without_from_stage_does_not_notify() { - let tmp = tempfile::tempdir().unwrap(); - - let (watcher_tx, watcher_rx) = broadcast::channel::(16); - let (transport, calls) = MockTransport::new(); - - spawn_notification_listener( - transport, - || vec!["!room1:example.org".to_string()], - watcher_rx, - tmp.path().to_path_buf(), - ); - - // Synthetic reassign event within 4_merge — no actual stage change. - watcher_tx - .send(WatcherEvent::WorkItem { - stage: "merge".to_string(), - item_id: "549_story_skip_qa".to_string(), - action: "reassign".to_string(), - commit_msg: String::new(), - from_stage: None, - }) - .unwrap(); - - tokio::time::sleep(std::time::Duration::from_millis(350)).await; - - let calls = calls.lock().unwrap(); - assert_eq!( - calls.len(), - 0, - "Synthetic events with from_stage=None must not generate notifications" - ); -} - -/// Stories that skip QA (qa: server) move directly from Current to Merge. -/// The notification must say "Current → Merge", not "QA → Merge". -#[tokio::test] -async fn skip_qa_shows_current_to_merge_not_qa_to_merge() { - let tmp = tempfile::tempdir().unwrap(); - let stage_dir = tmp.path().join(".huskies").join("work").join("4_merge"); - std::fs::create_dir_all(&stage_dir).unwrap(); - std::fs::write( - stage_dir.join("549_story_skip_qa.md"), - "---\nname: Skip QA Story\n---\n", - ) - .unwrap(); - - let (watcher_tx, watcher_rx) = broadcast::channel::(16); - let (transport, calls) = MockTransport::new(); - - spawn_notification_listener( - transport, - || vec!["!room1:example.org".to_string()], - watcher_rx, - tmp.path().to_path_buf(), - ); - - // Story skips QA: from_stage is 2_current, not 3_qa. - watcher_tx - .send(WatcherEvent::WorkItem { - stage: "merge".to_string(), - item_id: "549_story_skip_qa".to_string(), - action: "merge".to_string(), - commit_msg: "huskies: merge 549_story_skip_qa".to_string(), - from_stage: Some("coding".to_string()), - }) - .unwrap(); - - tokio::time::sleep(std::time::Duration::from_millis(350)).await; - - let calls = calls.lock().unwrap(); - assert_eq!(calls.len(), 1, "Should send exactly one notification"); - assert!( - calls[0].1.contains("Current \u{2192} Merge"), - "Notification should say 'Current → Merge', got: {}", - calls[0].1 - ); - assert!( - !calls[0].1.contains("QA \u{2192} Merge"), - "Must NOT say 'QA → Merge' when QA was skipped" - ); -} diff --git a/server/src/service/notifications/io/tests_transition.rs b/server/src/service/notifications/io/tests_transition.rs new file mode 100644 index 00000000..5617ae7c --- /dev/null +++ b/server/src/service/notifications/io/tests_transition.rs @@ -0,0 +1,255 @@ +//! Tests for `spawn_stage_notification_subscriber`. +//! +//! Each test triggers real CRDT pipeline transitions so that `TransitionFired` +//! events propagate through the global broadcast channel to the subscriber. +//! Unique story IDs avoid interference between concurrently-running tests. + +use super::mock_transport::MockTransport; +use super::stage_subscriber::spawn_stage_notification_subscriber; +use std::path::PathBuf; + +fn tmp_root() -> PathBuf { + tempfile::tempdir().unwrap().keep() +} + +fn setup_story(item_id: &str, stage_dir: &str, name: &str) { + crate::db::ensure_content_store(); + crate::db::write_item_with_content( + item_id, + stage_dir, + &format!("---\nname: {name}\n---\n"), + crate::db::ItemMeta::named(name), + ); +} + +// ── Backlog → Coding ────────────────────────────────────────────────────────── + +/// Transitioning a story from Backlog to Coding must produce a notification +/// that names the story and says "Current". +#[tokio::test] +async fn backlog_to_coding_sends_notification() { + crate::crdt_state::init_for_test(); + setup_story("9951_story_b2c", "1_backlog", "Backlog to Coding"); + + let (transport, calls) = MockTransport::new(); + spawn_stage_notification_subscriber( + transport, + || vec!["!room-b2c:example.org".to_string()], + tmp_root(), + ); + + crate::agents::lifecycle::move_story_to_current("9951_story_b2c") + .expect("move to current must succeed"); + + tokio::time::sleep(std::time::Duration::from_millis(350)).await; + + let calls = calls.lock().unwrap(); + let hit = calls + .iter() + .any(|(_, plain, _)| plain.contains("9951") && plain.contains("Current")); + assert!( + hit, + "Expected a 'Current' notification for story 9951; got: {calls:?}" + ); +} + +// ── Coding → QA ────────────────────────────────────────────────────────────── + +/// Transitioning a story from Coding to QA must produce a notification +/// that names the story and says "QA". +#[tokio::test] +async fn coding_to_qa_sends_notification() { + crate::crdt_state::init_for_test(); + setup_story("9952_story_c2q", "2_current", "Coding to QA"); + + let (transport, calls) = MockTransport::new(); + spawn_stage_notification_subscriber( + transport, + || vec!["!room-c2q:example.org".to_string()], + tmp_root(), + ); + + crate::agents::lifecycle::move_story_to_qa("9952_story_c2q").expect("move to qa must succeed"); + + tokio::time::sleep(std::time::Duration::from_millis(350)).await; + + let calls = calls.lock().unwrap(); + let hit = calls + .iter() + .any(|(_, plain, _)| plain.contains("9952") && plain.contains("QA")); + assert!( + hit, + "Expected a 'QA' notification for story 9952; got: {calls:?}" + ); +} + +// ── Merge → Done ───────────────────────────────────────────────────────────── + +/// Transitioning a story to Done must produce a notification with the party +/// emoji and say "Done". +#[tokio::test] +async fn merge_to_done_sends_party_notification() { + crate::crdt_state::init_for_test(); + // Start in Merge stage so we can call move_story_to_done. + setup_story("9953_story_m2d", "4_merge", "Merge to Done"); + + let (transport, calls) = MockTransport::new(); + spawn_stage_notification_subscriber( + transport, + || vec!["!room-m2d:example.org".to_string()], + tmp_root(), + ); + + crate::agents::lifecycle::move_story_to_done("9953_story_m2d") + .expect("move to done must succeed"); + + tokio::time::sleep(std::time::Duration::from_millis(350)).await; + + let calls = calls.lock().unwrap(); + let hit = calls.iter().any(|(_, plain, _)| { + plain.contains("9953") && plain.contains("Done") && plain.contains('\u{1f389}') + }); + assert!( + hit, + "Expected a party-emoji Done notification for story 9953; got: {calls:?}" + ); +} + +// ── Coding → QA → Merge coalescing ─────────────────────────────────────────── + +/// Rapid successive transitions for the same item are coalesced so only the +/// final stage is announced in a single notification. +#[tokio::test] +async fn rapid_transitions_are_coalesced() { + crate::crdt_state::init_for_test(); + setup_story("9954_story_coalesce", "2_current", "Coalesce Test"); + + let (transport, calls) = MockTransport::new(); + spawn_stage_notification_subscriber( + transport, + || vec!["!room-coal:example.org".to_string()], + tmp_root(), + ); + + // Coding → QA → Merge in rapid succession (no sleep between). + crate::agents::lifecycle::move_story_to_qa("9954_story_coalesce") + .expect("move to qa must succeed"); + crate::agents::lifecycle::move_story_to_merge("9954_story_coalesce") + .expect("move to merge must succeed"); + + // Wait for the debounce to flush. + tokio::time::sleep(std::time::Duration::from_millis(350)).await; + + let calls = calls.lock().unwrap(); + // Filter to only this story's notifications. + let story_calls: Vec<_> = calls + .iter() + .filter(|(_, plain, _)| plain.contains("9954")) + .collect(); + + // Exactly one notification for this story (the coalesced final stage). + assert_eq!( + story_calls.len(), + 1, + "Rapid transitions must be coalesced into one notification; got: {story_calls:?}" + ); + // Final destination must be Merge. + assert!( + story_calls[0].1.contains("Merge"), + "Coalesced notification must mention the final stage (Merge); got: {}", + story_calls[0].1 + ); +} + +// ── Dynamic room IDs ────────────────────────────────────────────────────────── + +/// The subscriber calls the room-ID closure at notification time, so rooms +/// added after the subscriber is spawned are still reached. +#[tokio::test] +async fn dynamic_room_ids_are_resolved_at_notification_time() { + use std::sync::Arc; + crate::crdt_state::init_for_test(); + setup_story("9955_story_dynroom", "1_backlog", "Dynamic Room"); + + let (transport, calls) = MockTransport::new(); + let rooms: Arc>> = Arc::new(std::sync::Mutex::new(Vec::new())); + let rooms_for_closure = Arc::clone(&rooms); + + spawn_stage_notification_subscriber( + transport, + move || rooms_for_closure.lock().unwrap().clone(), + tmp_root(), + ); + + // Add a room AFTER spawning the subscriber (simulates WhatsApp first message). + rooms.lock().unwrap().push("phone:+15559990001".to_string()); + + crate::agents::lifecycle::move_story_to_current("9955_story_dynroom") + .expect("move to current must succeed"); + + tokio::time::sleep(std::time::Duration::from_millis(350)).await; + + let calls = calls.lock().unwrap(); + let hit = calls + .iter() + .any(|(room, plain, _)| room == "phone:+15559990001" && plain.contains("9955")); + assert!( + hit, + "Must deliver to the dynamically-added room; got: {calls:?}" + ); +} + +// ── No rooms → silent ──────────────────────────────────────────────────────── + +/// When no rooms are registered the subscriber must not panic and must send +/// nothing. +#[tokio::test] +async fn no_rooms_produces_no_notifications() { + crate::crdt_state::init_for_test(); + setup_story("9956_story_noroom", "1_backlog", "No Room Test"); + + let (transport, calls) = MockTransport::new(); + spawn_stage_notification_subscriber(transport, Vec::new, tmp_root()); + + crate::agents::lifecycle::move_story_to_current("9956_story_noroom") + .expect("move to current must succeed"); + + tokio::time::sleep(std::time::Duration::from_millis(350)).await; + + let calls = calls.lock().unwrap(); + let hits: Vec<_> = calls + .iter() + .filter(|(_, plain, _)| plain.contains("9956")) + .collect(); + assert!(hits.is_empty(), "No rooms means no notifications"); +} + +// ── story_name included ─────────────────────────────────────────────────────── + +/// The notification must include the story name read from the CRDT. +#[tokio::test] +async fn notification_includes_story_name() { + crate::crdt_state::init_for_test(); + setup_story("9957_story_named", "1_backlog", "Named Story Feature"); + + let (transport, calls) = MockTransport::new(); + spawn_stage_notification_subscriber( + transport, + || vec!["!room-name:example.org".to_string()], + tmp_root(), + ); + + crate::agents::lifecycle::move_story_to_current("9957_story_named") + .expect("move to current must succeed"); + + tokio::time::sleep(std::time::Duration::from_millis(350)).await; + + let calls = calls.lock().unwrap(); + let hit = calls + .iter() + .any(|(_, plain, _)| plain.contains("9957") && plain.contains("Named Story Feature")); + assert!( + hit, + "Notification must include the story name; got: {calls:?}" + ); +} diff --git a/server/src/service/notifications/mod.rs b/server/src/service/notifications/mod.rs index 69a7c36a..d0dced20 100644 --- a/server/src/service/notifications/mod.rs +++ b/server/src/service/notifications/mod.rs @@ -22,6 +22,7 @@ pub use format::{ format_blocked_notification, format_error_notification, format_stage_notification, }; pub use io::spawn_notification_listener; +pub use io::spawn_stage_notification_subscriber; // ── Error type ──────────────────────────────────────────────────────────────── diff --git a/server/src/startup/bots.rs b/server/src/startup/bots.rs index 6a9b13a4..95dd4c1b 100644 --- a/server/src/startup/bots.rs +++ b/server/src/startup/bots.rs @@ -226,6 +226,14 @@ pub(crate) fn spawn_notification_listeners( watcher_rx_for_whatsapp, root.clone(), ); + { + let ambient_rooms = Arc::clone(&ctx.services.ambient_rooms); + service::notifications::spawn_stage_notification_subscriber( + Arc::clone(&ctx.transport), + move || ambient_rooms.lock().unwrap().iter().cloned().collect(), + root.clone(), + ); + } { use crate::service::status::format::format_status_event; @@ -267,6 +275,14 @@ pub(crate) fn spawn_notification_listeners( watcher_rx_for_slack, root.clone(), ); + { + let channel_ids: Vec = ctx.channel_ids.iter().cloned().collect(); + service::notifications::spawn_stage_notification_subscriber( + Arc::clone(&ctx.transport) as Arc, + move || channel_ids.clone(), + root.clone(), + ); + } { use crate::service::status::format::format_status_event; @@ -308,6 +324,14 @@ pub(crate) fn spawn_notification_listeners( watcher_rx_for_discord, root.clone(), ); + { + let channel_ids: Vec = ctx.channel_ids.iter().cloned().collect(); + service::notifications::spawn_stage_notification_subscriber( + Arc::clone(&ctx.transport) as Arc, + move || channel_ids.clone(), + root.clone(), + ); + } { use crate::service::status::format::format_status_event;