//! I/O side of the notifications service. //! //! This is the **only** file inside `service/notifications/` that may perform //! side effects: reading from the CRDT content store, loading configuration, //! and spawning the background listener task. use crate::chat::ChatTransport; use crate::config::ProjectConfig; use crate::io::story_metadata::parse_front_matter; use crate::io::watcher::WatcherEvent; use crate::slog; use std::collections::HashMap; use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Instant; use tokio::sync::broadcast; use super::events::classify; use super::filter::{STAGE_TRANSITION_DEBOUNCE, should_send_rate_limit}; use super::format::{ format_blocked_notification, format_error_notification, format_rate_limit_notification, format_stage_notification, stage_display_name, }; use super::route::rooms_for_notification; /// Read the story name from the CRDT content store's YAML front matter. /// /// Returns `None` if the item is not in the content store or has no parseable name. pub fn read_story_name(_project_root: &Path, _stage: &str, item_id: &str) -> Option { let contents = crate::db::read_content(item_id)?; let meta = parse_front_matter(&contents).ok()?; meta.name } /// Look up a story name from the CRDT content store regardless of stage. /// /// Used for events (like rate-limit warnings) that arrive without a known stage. fn find_story_name_any_stage(project_root: &Path, item_id: &str) -> Option { read_story_name(project_root, "", item_id) } /// Spawn a background task that listens for watcher events and posts /// stage-transition notifications to all configured rooms via the /// [`ChatTransport`] abstraction. /// /// `get_room_ids` is called on each notification to obtain the current list of /// destination room IDs. Pass a closure that returns a static list for Matrix /// and Slack, or one that reads from a runtime `Arc>>` /// for WhatsApp ambient senders. pub fn spawn_notification_listener( transport: Arc, get_room_ids: impl Fn() -> Vec + Send + 'static, watcher_rx: broadcast::Receiver, project_root: PathBuf, ) { tokio::spawn(async move { let mut rx = watcher_rx; // Load initial config; re-loaded on ConfigChanged events. let mut config = ProjectConfig::load(&project_root).unwrap_or_default(); // Tracks when a rate-limit notification was last sent for each // "story_id:agent_name" key, to debounce repeated warnings. let mut rate_limit_last_notified: HashMap = HashMap::new(); // Pending stage-transition notifications, keyed by item_id. // Value: (from_display, to_stage_key, story_name). // Rapid successive transitions for the same item are coalesced: the // original from_display is kept while to_stage_key is updated to the // latest destination, so only one notification fires for the final stage. let mut pending_transitions: HashMap)> = HashMap::new(); let mut flush_deadline: Option = None; loop { // Wait for the next event, or flush pending transitions when the // debounce window expires. let recv_result = if let Some(deadline) = flush_deadline { tokio::time::timeout_at(deadline, rx.recv()).await.ok() } else { Some(rx.recv().await) }; if recv_result.is_none() { // Flush all coalesced stage-transition notifications. for (item_id, (from_display, to_stage_key, story_name)) in pending_transitions.drain() { let to_display = stage_display_name(&to_stage_key); let (plain, html) = format_stage_notification( &item_id, story_name.as_deref(), &from_display, to_display, ); slog!("[bot] Sending stage notification: {plain}"); for room_id in &rooms_for_notification(&get_room_ids) { if let Err(e) = transport.send_message(room_id, &plain, &html).await { slog!("[bot] Failed to send notification to {room_id}: {e}"); } } } flush_deadline = None; continue; } let event = match recv_result.unwrap() { Ok(ev) => ev, Err(broadcast::error::RecvError::Lagged(n)) => { slog!("[bot] Notification listener lagged, skipped {n} events"); continue; } Err(broadcast::error::RecvError::Closed) => { slog!("[bot] Watcher channel closed, stopping notification listener"); // Flush any coalesced transitions that haven't fired yet. for (item_id, (from_display, to_stage_key, story_name)) in pending_transitions.drain() { let to_display = stage_display_name(&to_stage_key); let (plain, html) = format_stage_notification( &item_id, story_name.as_deref(), &from_display, to_display, ); slog!("[bot] Sending stage notification: {plain}"); for room_id in &rooms_for_notification(&get_room_ids) { if let Err(e) = transport.send_message(room_id, &plain, &html).await { slog!("[bot] Failed to send notification to {room_id}: {e}"); } } } break; } }; use super::events::EventAction; match classify(&event) { EventAction::StageTransition => { // WorkItem with a known from_stage — extract the fields. let WatcherEvent::WorkItem { ref stage, ref item_id, ref from_stage, .. } = event else { continue; }; let from_display = stage_display_name(from_stage.as_deref().unwrap_or("")); // Look up the story name in the expected stage directory; fall // back to a full search so stale events still show the name. let story_name = read_story_name(&project_root, stage, item_id) .or_else(|| find_story_name_any_stage(&project_root, item_id)); // Buffer the transition. If this item_id is already pending (rapid // succession), update to_stage_key to the latest destination while // preserving the original from_display. pending_transitions .entry(item_id.clone()) .and_modify(|e| { e.1 = stage.clone(); if story_name.is_some() { e.2 = story_name.clone(); } }) .or_insert_with(|| (from_display.to_string(), stage.clone(), story_name)); // Start or extend the debounce window. flush_deadline = Some(tokio::time::Instant::now() + STAGE_TRANSITION_DEBOUNCE); } EventAction::MergeFailure => { let WatcherEvent::MergeFailure { ref story_id, ref reason, } = event else { continue; }; let story_name = read_story_name(&project_root, "4_merge", story_id); let (plain, html) = format_error_notification(story_id, story_name.as_deref(), reason); slog!("[bot] Sending error notification: {plain}"); for room_id in &rooms_for_notification(&get_room_ids) { if let Err(e) = transport.send_message(room_id, &plain, &html).await { slog!("[bot] Failed to send error notification to {room_id}: {e}"); } } } EventAction::RateLimitWarning => { let WatcherEvent::RateLimitWarning { ref story_id, ref agent_name, } = event else { continue; }; if !config.rate_limit_notifications { slog!( "[bot] RateLimitWarning suppressed by config for \ {story_id}:{agent_name}" ); continue; } let debounce_key = format!("{story_id}:{agent_name}"); let now = Instant::now(); if !should_send_rate_limit( rate_limit_last_notified.get(&debounce_key).copied(), now, ) { slog!( "[bot] Rate-limit notification debounced for \ {story_id}:{agent_name}" ); continue; } rate_limit_last_notified.insert(debounce_key, now); let story_name = find_story_name_any_stage(&project_root, story_id); let (plain, html) = format_rate_limit_notification(story_id, story_name.as_deref(), agent_name); slog!("[bot] Sending rate-limit notification: {plain}"); for room_id in &rooms_for_notification(&get_room_ids) { if let Err(e) = transport.send_message(room_id, &plain, &html).await { slog!( "[bot] Failed to send rate-limit notification \ to {room_id}: {e}" ); } } } EventAction::StoryBlocked => { let WatcherEvent::StoryBlocked { ref story_id, ref reason, } = event else { continue; }; let story_name = find_story_name_any_stage(&project_root, story_id); let (plain, html) = format_blocked_notification(story_id, story_name.as_deref(), reason); slog!("[bot] Sending blocked notification: {plain}"); for room_id in &rooms_for_notification(&get_room_ids) { if let Err(e) = transport.send_message(room_id, &plain, &html).await { slog!("[bot] Failed to send blocked notification to {room_id}: {e}"); } } } EventAction::LogOnly => { // Hard-block: log server-side for debugging; do NOT post to chat. // Hard-block auto-resume is normal operation — the status command // already surfaces rate-limit state via emoji. if let WatcherEvent::RateLimitHardBlock { ref story_id, ref agent_name, reset_at, } = event { slog!( "[bot] Rate-limit hard block for {story_id}/{agent_name}, \ auto-resume at {reset_at}" ); } } EventAction::ReloadConfig => { if let Ok(new_cfg) = ProjectConfig::load(&project_root) { config = new_cfg; } } EventAction::Skip => {} } } }); } #[cfg(test)] mod tests { use super::*; use crate::chat::MessageId; use async_trait::async_trait; // ── MockTransport ─────────────────────────────────────────────────────── type CallLog = Arc>>; /// Records every `send_message` call for inspection in tests. struct MockTransport { calls: CallLog, } impl MockTransport { fn new() -> (Arc, CallLog) { let calls: CallLog = Arc::new(std::sync::Mutex::new(Vec::new())); ( Arc::new(Self { calls: Arc::clone(&calls), }), calls, ) } } #[async_trait] impl crate::chat::ChatTransport for MockTransport { async fn send_message( &self, room_id: &str, plain: &str, html: &str, ) -> Result { self.calls.lock().unwrap().push(( room_id.to_string(), plain.to_string(), html.to_string(), )); Ok("mock-msg-id".to_string()) } async fn edit_message( &self, _room_id: &str, _id: &str, _plain: &str, _html: &str, ) -> Result<(), String> { Ok(()) } async fn send_typing(&self, _room_id: &str, _typing: bool) -> Result<(), String> { Ok(()) } } // ── spawn_notification_listener: RateLimitWarning ─────────────────────── /// AC2 + AC3: when a RateLimitWarning event arrives, send_message is called /// with a notification that names the agent and story. #[tokio::test] async fn rate_limit_warning_sends_notification_with_agent_and_story() { let tmp = tempfile::tempdir().unwrap(); // Seed story via CRDT (the only source of truth). crate::db::ensure_content_store(); crate::db::write_item_with_content( "365_story_rate_limit", "2_current", "---\nname: Rate Limit Test Story\n---\n", ); let (watcher_tx, watcher_rx) = broadcast::channel::(16); let (transport, calls) = MockTransport::new(); spawn_notification_listener( transport, || vec!["!room123:example.org".to_string()], watcher_rx, tmp.path().to_path_buf(), ); watcher_tx .send(WatcherEvent::RateLimitWarning { story_id: "365_story_rate_limit".to_string(), agent_name: "coder-1".to_string(), }) .unwrap(); // Give the spawned task time to process the event. tokio::time::sleep(std::time::Duration::from_millis(100)).await; let calls = calls.lock().unwrap(); assert_eq!(calls.len(), 1, "Expected exactly one notification"); let (room_id, plain, _html) = &calls[0]; assert_eq!(room_id, "!room123:example.org"); assert!(plain.contains("365"), "plain should contain story number"); assert!( plain.contains("Rate Limit Test Story"), "plain should contain story name" ); assert!(plain.contains("coder-1"), "plain should contain agent name"); assert!( plain.contains("rate limit"), "plain should mention rate limit" ); } /// AC4: a second RateLimitWarning for the same agent within the debounce /// window must NOT trigger a second notification. #[tokio::test] async fn rate_limit_warning_is_debounced() { let tmp = tempfile::tempdir().unwrap(); let (watcher_tx, watcher_rx) = broadcast::channel::(16); let (transport, calls) = MockTransport::new(); spawn_notification_listener( transport, || vec!["!room1:example.org".to_string()], watcher_rx, tmp.path().to_path_buf(), ); // Send the same warning twice in rapid succession. for _ in 0..2 { watcher_tx .send(WatcherEvent::RateLimitWarning { story_id: "42_story_debounce".to_string(), agent_name: "coder-2".to_string(), }) .unwrap(); } tokio::time::sleep(std::time::Duration::from_millis(100)).await; let calls = calls.lock().unwrap(); assert_eq!( calls.len(), 1, "Debounce should suppress the second notification" ); } /// AC4 (corollary): warnings for different agents are NOT debounced against /// each other — both should produce notifications. #[tokio::test] async fn rate_limit_warnings_for_different_agents_both_notify() { let tmp = tempfile::tempdir().unwrap(); let (watcher_tx, watcher_rx) = broadcast::channel::(16); let (transport, calls) = MockTransport::new(); spawn_notification_listener( transport, || vec!["!room1:example.org".to_string()], watcher_rx, tmp.path().to_path_buf(), ); watcher_tx .send(WatcherEvent::RateLimitWarning { story_id: "42_story_foo".to_string(), agent_name: "coder-1".to_string(), }) .unwrap(); watcher_tx .send(WatcherEvent::RateLimitWarning { story_id: "42_story_foo".to_string(), agent_name: "coder-2".to_string(), }) .unwrap(); tokio::time::sleep(std::time::Duration::from_millis(100)).await; let calls = calls.lock().unwrap(); assert_eq!( calls.len(), 2, "Different agents should each trigger a notification" ); } // ── dynamic room IDs (WhatsApp ambient_rooms pattern) ─────────────────── /// Notifications are sent to the rooms returned by the closure at /// notification time, not at listener-spawn time. This verifies that a /// closure backed by a runtime set (e.g. WhatsApp ambient_rooms) delivers /// messages to the rooms present when the event fires. #[tokio::test] async fn stage_notification_uses_dynamic_room_ids() { let tmp = tempfile::tempdir().unwrap(); // Seed story via CRDT (the only source of truth). crate::db::ensure_content_store(); crate::db::write_item_with_content("10_story_foo", "3_qa", "---\nname: Foo Story\n---\n"); let (watcher_tx, watcher_rx) = broadcast::channel::(16); let (transport, calls) = MockTransport::new(); let rooms: Arc>> = Arc::new(std::sync::Mutex::new(std::collections::HashSet::new())); let rooms_for_closure = Arc::clone(&rooms); spawn_notification_listener( transport, move || rooms_for_closure.lock().unwrap().iter().cloned().collect(), watcher_rx, tmp.path().to_path_buf(), ); // Add a room after the listener is spawned (simulates a user messaging first). rooms .lock() .unwrap() .insert("phone:+15551234567".to_string()); watcher_tx .send(WatcherEvent::WorkItem { stage: "3_qa".to_string(), item_id: "10_story_foo".to_string(), action: "qa".to_string(), commit_msg: "huskies: qa 10_story_foo".to_string(), from_stage: Some("2_current".to_string()), }) .unwrap(); // Wait longer than STAGE_TRANSITION_DEBOUNCE (200ms) so the coalesced // notification flushes. tokio::time::sleep(std::time::Duration::from_millis(350)).await; let calls = calls.lock().unwrap(); assert_eq!( calls.len(), 1, "Should deliver to the dynamically added room" ); assert_eq!(calls[0].0, "phone:+15551234567"); assert!( calls[0].1.contains("10"), "plain should contain story number" ); assert!( calls[0].1.contains("Foo Story"), "plain should contain story name" ); } /// When no rooms are registered (e.g. no WhatsApp users have messaged yet), /// no notifications are sent and the listener does not panic. #[tokio::test] async fn stage_notification_with_no_rooms_is_silent() { let tmp = tempfile::tempdir().unwrap(); let (watcher_tx, watcher_rx) = broadcast::channel::(16); let (transport, calls) = MockTransport::new(); spawn_notification_listener(transport, Vec::new, watcher_rx, tmp.path().to_path_buf()); watcher_tx .send(WatcherEvent::WorkItem { stage: "3_qa".to_string(), item_id: "10_story_foo".to_string(), action: "qa".to_string(), commit_msg: "huskies: qa 10_story_foo".to_string(), from_stage: Some("2_current".to_string()), }) .unwrap(); tokio::time::sleep(std::time::Duration::from_millis(100)).await; let calls = calls.lock().unwrap(); assert_eq!(calls.len(), 0, "No rooms means no notifications"); } // ── read_story_name ───────────────────────────────────────────────────── #[test] fn read_story_name_reads_from_front_matter() { crate::db::ensure_content_store(); crate::db::write_item_with_content( "9942_story_my_feature", "2_current", "---\nname: My Cool Feature\n---\n# Story\n", ); let tmp = tempfile::tempdir().unwrap(); let name = read_story_name(tmp.path(), "2_current", "9942_story_my_feature"); assert_eq!(name.as_deref(), Some("My Cool Feature")); } #[test] fn read_story_name_returns_none_for_missing_file() { crate::db::ensure_content_store(); let tmp = tempfile::tempdir().unwrap(); let name = read_story_name(tmp.path(), "2_current", "99_story_missing_notif_test"); assert_eq!(name, None); } #[test] fn read_story_name_returns_none_for_missing_name_field() { crate::db::ensure_content_store(); crate::db::write_item_with_content( "9943_story_no_name", "2_current", "---\ncoverage_baseline: 50%\n---\n# Story\n", ); let tmp = tempfile::tempdir().unwrap(); let name = read_story_name(tmp.path(), "2_current", "9943_story_no_name"); assert_eq!(name, None); } // ── spawn_notification_listener: StoryBlocked ─────────────────────────── /// AC1: when a StoryBlocked event arrives, send_message is called with a /// notification that includes the story number, name, and reason. #[tokio::test] async fn story_blocked_sends_notification_with_reason() { let tmp = tempfile::tempdir().unwrap(); // Seed story via CRDT (the only source of truth). crate::db::ensure_content_store(); crate::db::write_item_with_content( "425_story_blocking_test", "2_current", "---\nname: Blocking Test Story\n---\n", ); let (watcher_tx, watcher_rx) = broadcast::channel::(16); let (transport, calls) = MockTransport::new(); spawn_notification_listener( transport, || vec!["!room123:example.org".to_string()], watcher_rx, tmp.path().to_path_buf(), ); watcher_tx .send(WatcherEvent::StoryBlocked { story_id: "425_story_blocking_test".to_string(), reason: "Retry limit exceeded (3/3) at coder stage".to_string(), }) .unwrap(); tokio::time::sleep(std::time::Duration::from_millis(100)).await; let calls = calls.lock().unwrap(); assert_eq!(calls.len(), 1, "Expected exactly one notification"); let (room_id, plain, html) = &calls[0]; assert_eq!(room_id, "!room123:example.org"); assert!(plain.contains("425"), "plain should contain story number"); assert!( plain.contains("Blocking Test Story"), "plain should contain story name" ); assert!( plain.contains("BLOCKED"), "plain should contain BLOCKED label" ); assert!( plain.contains("Retry limit exceeded"), "plain should contain the reason" ); assert!( html.contains("BLOCKED"), "html should contain BLOCKED label" ); } /// StoryBlocked with no room registered should not panic. #[tokio::test] async fn story_blocked_with_no_rooms_is_silent() { let tmp = tempfile::tempdir().unwrap(); let (watcher_tx, watcher_rx) = broadcast::channel::(16); let (transport, calls) = MockTransport::new(); spawn_notification_listener(transport, Vec::new, watcher_rx, tmp.path().to_path_buf()); watcher_tx .send(WatcherEvent::StoryBlocked { story_id: "42_story_no_rooms".to_string(), reason: "empty diff".to_string(), }) .unwrap(); tokio::time::sleep(std::time::Duration::from_millis(100)).await; let calls = calls.lock().unwrap(); assert_eq!(calls.len(), 0, "No rooms means no notifications"); } // ── rate_limit_notifications config flag ───────────────────────────────── /// AC1+AC2: when rate_limit_notifications = false in project.toml, /// RateLimitWarning events are suppressed (no send_message call). #[tokio::test] async fn rate_limit_warning_suppressed_when_config_false() { let tmp = tempfile::tempdir().unwrap(); let sk_dir = tmp.path().join(".huskies"); std::fs::create_dir_all(&sk_dir).unwrap(); std::fs::write( sk_dir.join("project.toml"), "rate_limit_notifications = false\n", ) .unwrap(); let (watcher_tx, watcher_rx) = broadcast::channel::(16); let (transport, calls) = MockTransport::new(); spawn_notification_listener( transport, || vec!["!room1:example.org".to_string()], watcher_rx, tmp.path().to_path_buf(), ); watcher_tx .send(WatcherEvent::RateLimitWarning { story_id: "42_story_suppress".to_string(), agent_name: "coder-1".to_string(), }) .unwrap(); tokio::time::sleep(std::time::Duration::from_millis(100)).await; let calls = calls.lock().unwrap(); assert_eq!( calls.len(), 0, "RateLimitWarning should be suppressed when rate_limit_notifications = false" ); } /// RateLimitHardBlock is never posted to Matrix — it is logged server-side only. #[tokio::test] async fn rate_limit_hard_block_never_sent_to_matrix() { let tmp = tempfile::tempdir().unwrap(); let (watcher_tx, watcher_rx) = broadcast::channel::(16); let (transport, calls) = MockTransport::new(); spawn_notification_listener( transport, || vec!["!room1:example.org".to_string()], watcher_rx, tmp.path().to_path_buf(), ); let reset_at = chrono::Utc::now() + chrono::Duration::hours(1); watcher_tx .send(WatcherEvent::RateLimitHardBlock { story_id: "42_story_hard_block".to_string(), agent_name: "coder-1".to_string(), reset_at, }) .unwrap(); tokio::time::sleep(std::time::Duration::from_millis(100)).await; let calls = calls.lock().unwrap(); assert_eq!(calls.len(), 0, "RateLimitHardBlock must not post to Matrix"); } /// AC3: StoryBlocked is always sent regardless of rate_limit_notifications. #[tokio::test] async fn story_blocked_always_sent_when_config_false() { let tmp = tempfile::tempdir().unwrap(); let sk_dir = tmp.path().join(".huskies"); std::fs::create_dir_all(&sk_dir).unwrap(); std::fs::write( sk_dir.join("project.toml"), "rate_limit_notifications = false\n", ) .unwrap(); let (watcher_tx, watcher_rx) = broadcast::channel::(16); let (transport, calls) = MockTransport::new(); spawn_notification_listener( transport, || vec!["!room1:example.org".to_string()], watcher_rx, tmp.path().to_path_buf(), ); watcher_tx .send(WatcherEvent::StoryBlocked { story_id: "42_story_blocked".to_string(), reason: "retry limit exceeded".to_string(), }) .unwrap(); tokio::time::sleep(std::time::Duration::from_millis(100)).await; let calls = calls.lock().unwrap(); assert_eq!(calls.len(), 1, "StoryBlocked should always be sent"); } /// AC5: Config is hot-reloaded — disabling rate_limit_notifications after /// startup suppresses subsequent RateLimitWarning events. #[tokio::test] async fn rate_limit_warning_suppressed_after_hot_reload() { let tmp = tempfile::tempdir().unwrap(); let sk_dir = tmp.path().join(".huskies"); std::fs::create_dir_all(&sk_dir).unwrap(); // Start with notifications enabled. std::fs::write( sk_dir.join("project.toml"), "rate_limit_notifications = true\n", ) .unwrap(); let (watcher_tx, watcher_rx) = broadcast::channel::(16); let (transport, calls) = MockTransport::new(); spawn_notification_listener( transport, || vec!["!room1:example.org".to_string()], watcher_rx, tmp.path().to_path_buf(), ); // First warning is sent. watcher_tx .send(WatcherEvent::RateLimitWarning { story_id: "42_story_reload".to_string(), agent_name: "coder-1".to_string(), }) .unwrap(); tokio::time::sleep(std::time::Duration::from_millis(100)).await; // Disable notifications and trigger hot-reload. std::fs::write( sk_dir.join("project.toml"), "rate_limit_notifications = false\n", ) .unwrap(); watcher_tx.send(WatcherEvent::ConfigChanged).unwrap(); tokio::time::sleep(std::time::Duration::from_millis(100)).await; // Second warning (different agent to bypass debounce) should be suppressed. watcher_tx .send(WatcherEvent::RateLimitWarning { story_id: "42_story_reload".to_string(), agent_name: "coder-2".to_string(), }) .unwrap(); tokio::time::sleep(std::time::Duration::from_millis(100)).await; let calls = calls.lock().unwrap(); assert_eq!( calls.len(), 1, "Only the first warning should be sent; second should be suppressed after hot-reload" ); } // ── Bug 549: synthetic events with from_stage=None must not notify ────── /// Synthetic events (reassign, creation) have from_stage=None and must /// not produce stage-transition notifications. Before the fix, the /// inferred_from_stage fallback would emit e.g. "QA → Merge" for a /// reassign event within the merge stage. #[tokio::test] async fn synthetic_event_without_from_stage_does_not_notify() { let tmp = tempfile::tempdir().unwrap(); let (watcher_tx, watcher_rx) = broadcast::channel::(16); let (transport, calls) = MockTransport::new(); spawn_notification_listener( transport, || vec!["!room1:example.org".to_string()], watcher_rx, tmp.path().to_path_buf(), ); // Synthetic reassign event within 4_merge — no actual stage change. watcher_tx .send(WatcherEvent::WorkItem { stage: "4_merge".to_string(), item_id: "549_story_skip_qa".to_string(), action: "reassign".to_string(), commit_msg: String::new(), from_stage: None, }) .unwrap(); tokio::time::sleep(std::time::Duration::from_millis(350)).await; let calls = calls.lock().unwrap(); assert_eq!( calls.len(), 0, "Synthetic events with from_stage=None must not generate notifications" ); } /// Stories that skip QA (qa: server) move directly from Current to Merge. /// The notification must say "Current → Merge", not "QA → Merge". #[tokio::test] async fn skip_qa_shows_current_to_merge_not_qa_to_merge() { let tmp = tempfile::tempdir().unwrap(); let stage_dir = tmp.path().join(".huskies").join("work").join("4_merge"); std::fs::create_dir_all(&stage_dir).unwrap(); std::fs::write( stage_dir.join("549_story_skip_qa.md"), "---\nname: Skip QA Story\n---\n", ) .unwrap(); let (watcher_tx, watcher_rx) = broadcast::channel::(16); let (transport, calls) = MockTransport::new(); spawn_notification_listener( transport, || vec!["!room1:example.org".to_string()], watcher_rx, tmp.path().to_path_buf(), ); // Story skips QA: from_stage is 2_current, not 3_qa. watcher_tx .send(WatcherEvent::WorkItem { stage: "4_merge".to_string(), item_id: "549_story_skip_qa".to_string(), action: "merge".to_string(), commit_msg: "huskies: merge 549_story_skip_qa".to_string(), from_stage: Some("2_current".to_string()), }) .unwrap(); tokio::time::sleep(std::time::Duration::from_millis(350)).await; let calls = calls.lock().unwrap(); assert_eq!(calls.len(), 1, "Should send exactly one notification"); assert!( calls[0].1.contains("Current \u{2192} Merge"), "Notification should say 'Current → Merge', got: {}", calls[0].1 ); assert!( !calls[0].1.contains("QA \u{2192} Merge"), "Must NOT say 'QA → Merge' when QA was skipped" ); } }