diff --git a/server/src/service/notifications/io.rs b/server/src/service/notifications/io.rs deleted file mode 100644 index d3b69395..00000000 --- a/server/src/service/notifications/io.rs +++ /dev/null @@ -1,1026 +0,0 @@ -//! I/O side of the notifications service. -//! -//! This is the **only** file inside `service/notifications/` that may perform -//! side effects: reading from the CRDT content store, loading configuration, -//! and spawning the background listener task. - -use crate::chat::ChatTransport; -use crate::config::ProjectConfig; -use crate::io::story_metadata::parse_front_matter; -use crate::io::watcher::WatcherEvent; -use crate::slog; -use std::collections::HashMap; -use std::path::{Path, PathBuf}; -use std::sync::Arc; -use std::time::Instant; -use tokio::sync::broadcast; - -use super::events::classify; -use super::filter::{STAGE_TRANSITION_DEBOUNCE, should_send_rate_limit}; -use super::format::{ - format_blocked_notification, format_error_notification, format_oauth_account_swapped, - format_oauth_accounts_exhausted, format_rate_limit_notification, format_stage_notification, - stage_display_name, -}; -use super::route::rooms_for_notification; - -/// Read the story name from the CRDT content store's YAML front matter. -/// -/// Returns `None` if the item is not in the content store or has no parseable name. -pub fn read_story_name(_project_root: &Path, _stage: &str, item_id: &str) -> Option { - let contents = crate::db::read_content(item_id)?; - let meta = parse_front_matter(&contents).ok()?; - meta.name -} - -/// Look up a story name from the CRDT content store regardless of stage. -/// -/// Used for events (like rate-limit warnings) that arrive without a known stage. -fn find_story_name_any_stage(project_root: &Path, item_id: &str) -> Option { - read_story_name(project_root, "", item_id) -} - -/// Spawn a background task that listens for watcher events and posts -/// stage-transition notifications to all configured rooms via the -/// [`ChatTransport`] abstraction. -/// -/// `get_room_ids` is called on each notification to obtain the current list of -/// destination room IDs. Pass a closure that returns a static list for Matrix -/// and Slack, or one that reads from a runtime `Arc>>` -/// for WhatsApp ambient senders. -pub fn spawn_notification_listener( - transport: Arc, - get_room_ids: impl Fn() -> Vec + Send + 'static, - watcher_rx: broadcast::Receiver, - project_root: PathBuf, -) { - tokio::spawn(async move { - let mut rx = watcher_rx; - // Load initial config; re-loaded on ConfigChanged events. - let mut config = ProjectConfig::load(&project_root).unwrap_or_default(); - // Tracks when a rate-limit notification was last sent for each - // "story_id:agent_name" key, to debounce repeated warnings. - let mut rate_limit_last_notified: HashMap = HashMap::new(); - - // Pending stage-transition notifications, keyed by item_id. - // Value: (from_display, to_stage_key, story_name). - // Rapid successive transitions for the same item are coalesced: the - // original from_display is kept while to_stage_key is updated to the - // latest destination, so only one notification fires for the final stage. - let mut pending_transitions: HashMap)> = - HashMap::new(); - let mut flush_deadline: Option = None; - - loop { - // Wait for the next event, or flush pending transitions when the - // debounce window expires. - let recv_result = if let Some(deadline) = flush_deadline { - tokio::time::timeout_at(deadline, rx.recv()).await.ok() - } else { - Some(rx.recv().await) - }; - - if recv_result.is_none() { - // Flush all coalesced stage-transition notifications. - for (item_id, (from_display, to_stage_key, story_name)) in - pending_transitions.drain() - { - let to_display = stage_display_name(&to_stage_key); - let (plain, html) = format_stage_notification( - &item_id, - story_name.as_deref(), - &from_display, - to_display, - ); - slog!("[bot] Sending stage notification: {plain}"); - for room_id in &rooms_for_notification(&get_room_ids) { - if let Err(e) = transport.send_message(room_id, &plain, &html).await { - slog!("[bot] Failed to send notification to {room_id}: {e}"); - } - } - } - flush_deadline = None; - continue; - } - - let event = match recv_result.unwrap() { - Ok(ev) => ev, - Err(broadcast::error::RecvError::Lagged(n)) => { - slog!("[bot] Notification listener lagged, skipped {n} events"); - continue; - } - Err(broadcast::error::RecvError::Closed) => { - slog!("[bot] Watcher channel closed, stopping notification listener"); - // Flush any coalesced transitions that haven't fired yet. - for (item_id, (from_display, to_stage_key, story_name)) in - pending_transitions.drain() - { - let to_display = stage_display_name(&to_stage_key); - let (plain, html) = format_stage_notification( - &item_id, - story_name.as_deref(), - &from_display, - to_display, - ); - slog!("[bot] Sending stage notification: {plain}"); - for room_id in &rooms_for_notification(&get_room_ids) { - if let Err(e) = transport.send_message(room_id, &plain, &html).await { - slog!("[bot] Failed to send notification to {room_id}: {e}"); - } - } - } - break; - } - }; - - use super::events::EventAction; - match classify(&event) { - EventAction::StageTransition => { - // WorkItem with a known from_stage — extract the fields. - let WatcherEvent::WorkItem { - ref stage, - ref item_id, - ref from_stage, - .. - } = event - else { - continue; - }; - let from_display = stage_display_name(from_stage.as_deref().unwrap_or("")); - - // Look up the story name in the expected stage directory; fall - // back to a full search so stale events still show the name. - let story_name = read_story_name(&project_root, stage, item_id) - .or_else(|| find_story_name_any_stage(&project_root, item_id)); - - // Buffer the transition. If this item_id is already pending (rapid - // succession), update to_stage_key to the latest destination while - // preserving the original from_display. - pending_transitions - .entry(item_id.clone()) - .and_modify(|e| { - e.1 = stage.clone(); - if story_name.is_some() { - e.2 = story_name.clone(); - } - }) - .or_insert_with(|| (from_display.to_string(), stage.clone(), story_name)); - - // Start or extend the debounce window. - flush_deadline = Some(tokio::time::Instant::now() + STAGE_TRANSITION_DEBOUNCE); - } - EventAction::MergeFailure => { - let WatcherEvent::MergeFailure { - ref story_id, - ref reason, - } = event - else { - continue; - }; - let story_name = read_story_name(&project_root, "4_merge", story_id); - let (plain, html) = - format_error_notification(story_id, story_name.as_deref(), reason); - slog!("[bot] Sending error notification: {plain}"); - for room_id in &rooms_for_notification(&get_room_ids) { - if let Err(e) = transport.send_message(room_id, &plain, &html).await { - slog!("[bot] Failed to send error notification to {room_id}: {e}"); - } - } - } - EventAction::RateLimitWarning => { - let WatcherEvent::RateLimitWarning { - ref story_id, - ref agent_name, - } = event - else { - continue; - }; - if !config.rate_limit_notifications { - slog!( - "[bot] RateLimitWarning suppressed by config for \ - {story_id}:{agent_name}" - ); - continue; - } - let debounce_key = format!("{story_id}:{agent_name}"); - let now = Instant::now(); - if !should_send_rate_limit( - rate_limit_last_notified.get(&debounce_key).copied(), - now, - ) { - slog!( - "[bot] Rate-limit notification debounced for \ - {story_id}:{agent_name}" - ); - continue; - } - rate_limit_last_notified.insert(debounce_key, now); - let story_name = find_story_name_any_stage(&project_root, story_id); - let (plain, html) = - format_rate_limit_notification(story_id, story_name.as_deref(), agent_name); - slog!("[bot] Sending rate-limit notification: {plain}"); - for room_id in &rooms_for_notification(&get_room_ids) { - if let Err(e) = transport.send_message(room_id, &plain, &html).await { - slog!( - "[bot] Failed to send rate-limit notification \ - to {room_id}: {e}" - ); - } - } - } - EventAction::StoryBlocked => { - let WatcherEvent::StoryBlocked { - ref story_id, - ref reason, - } = event - else { - continue; - }; - let story_name = find_story_name_any_stage(&project_root, story_id); - let (plain, html) = - format_blocked_notification(story_id, story_name.as_deref(), reason); - slog!("[bot] Sending blocked notification: {plain}"); - for room_id in &rooms_for_notification(&get_room_ids) { - if let Err(e) = transport.send_message(room_id, &plain, &html).await { - slog!("[bot] Failed to send blocked notification to {room_id}: {e}"); - } - } - } - EventAction::OAuthAccountSwapped => { - let WatcherEvent::OAuthAccountSwapped { ref new_email } = event else { - continue; - }; - let (plain, html) = format_oauth_account_swapped(new_email); - slog!("[bot] Sending OAuth account-swap notification: {plain}"); - for room_id in &rooms_for_notification(&get_room_ids) { - if let Err(e) = transport.send_message(room_id, &plain, &html).await { - slog!( - "[bot] Failed to send OAuth account-swap notification \ - to {room_id}: {e}" - ); - } - } - } - EventAction::OAuthAccountsExhausted => { - let WatcherEvent::OAuthAccountsExhausted { - ref earliest_reset_msg, - } = event - else { - continue; - }; - let (plain, html) = format_oauth_accounts_exhausted(earliest_reset_msg); - slog!("[bot] Sending OAuth accounts-exhausted notification: {plain}"); - for room_id in &rooms_for_notification(&get_room_ids) { - if let Err(e) = transport.send_message(room_id, &plain, &html).await { - slog!( - "[bot] Failed to send OAuth accounts-exhausted notification \ - to {room_id}: {e}" - ); - } - } - } - EventAction::LogOnly => { - // Hard-block: log server-side for debugging; do NOT post to chat. - // Hard-block auto-resume is normal operation — the status command - // already surfaces rate-limit state via emoji. - if let WatcherEvent::RateLimitHardBlock { - ref story_id, - ref agent_name, - reset_at, - } = event - { - slog!( - "[bot] Rate-limit hard block for {story_id}/{agent_name}, \ - auto-resume at {reset_at}" - ); - } - } - EventAction::ReloadConfig => { - if let Ok(new_cfg) = ProjectConfig::load(&project_root) { - config = new_cfg; - } - } - EventAction::Skip => {} - } - } - }); -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::chat::MessageId; - use async_trait::async_trait; - - // ── MockTransport ─────────────────────────────────────────────────────── - - type CallLog = Arc>>; - - /// Records every `send_message` call for inspection in tests. - struct MockTransport { - calls: CallLog, - } - - impl MockTransport { - fn new() -> (Arc, CallLog) { - let calls: CallLog = Arc::new(std::sync::Mutex::new(Vec::new())); - ( - Arc::new(Self { - calls: Arc::clone(&calls), - }), - calls, - ) - } - } - - #[async_trait] - impl crate::chat::ChatTransport for MockTransport { - async fn send_message( - &self, - room_id: &str, - plain: &str, - html: &str, - ) -> Result { - self.calls.lock().unwrap().push(( - room_id.to_string(), - plain.to_string(), - html.to_string(), - )); - Ok("mock-msg-id".to_string()) - } - - async fn edit_message( - &self, - _room_id: &str, - _id: &str, - _plain: &str, - _html: &str, - ) -> Result<(), String> { - Ok(()) - } - - async fn send_typing(&self, _room_id: &str, _typing: bool) -> Result<(), String> { - Ok(()) - } - } - - // ── spawn_notification_listener: RateLimitWarning ─────────────────────── - - /// AC2 + AC3: when a RateLimitWarning event arrives, send_message is called - /// with a notification that names the agent and story. - #[tokio::test] - async fn rate_limit_warning_sends_notification_with_agent_and_story() { - let tmp = tempfile::tempdir().unwrap(); - // Seed story via CRDT (the only source of truth). - crate::db::ensure_content_store(); - crate::db::write_item_with_content( - "365_story_rate_limit", - "2_current", - "---\nname: Rate Limit Test Story\n---\n", - ); - - let (watcher_tx, watcher_rx) = broadcast::channel::(16); - let (transport, calls) = MockTransport::new(); - - spawn_notification_listener( - transport, - || vec!["!room123:example.org".to_string()], - watcher_rx, - tmp.path().to_path_buf(), - ); - - watcher_tx - .send(WatcherEvent::RateLimitWarning { - story_id: "365_story_rate_limit".to_string(), - agent_name: "coder-1".to_string(), - }) - .unwrap(); - - // Give the spawned task time to process the event. - tokio::time::sleep(std::time::Duration::from_millis(100)).await; - - let calls = calls.lock().unwrap(); - assert_eq!(calls.len(), 1, "Expected exactly one notification"); - let (room_id, plain, _html) = &calls[0]; - assert_eq!(room_id, "!room123:example.org"); - assert!(plain.contains("365"), "plain should contain story number"); - assert!( - plain.contains("Rate Limit Test Story"), - "plain should contain story name" - ); - assert!(plain.contains("coder-1"), "plain should contain agent name"); - assert!( - plain.contains("rate limit"), - "plain should mention rate limit" - ); - } - - /// AC4: a second RateLimitWarning for the same agent within the debounce - /// window must NOT trigger a second notification. - #[tokio::test] - async fn rate_limit_warning_is_debounced() { - let tmp = tempfile::tempdir().unwrap(); - - let (watcher_tx, watcher_rx) = broadcast::channel::(16); - let (transport, calls) = MockTransport::new(); - - spawn_notification_listener( - transport, - || vec!["!room1:example.org".to_string()], - watcher_rx, - tmp.path().to_path_buf(), - ); - - // Send the same warning twice in rapid succession. - for _ in 0..2 { - watcher_tx - .send(WatcherEvent::RateLimitWarning { - story_id: "42_story_debounce".to_string(), - agent_name: "coder-2".to_string(), - }) - .unwrap(); - } - - tokio::time::sleep(std::time::Duration::from_millis(100)).await; - - let calls = calls.lock().unwrap(); - assert_eq!( - calls.len(), - 1, - "Debounce should suppress the second notification" - ); - } - - /// AC4 (corollary): warnings for different agents are NOT debounced against - /// each other — both should produce notifications. - #[tokio::test] - async fn rate_limit_warnings_for_different_agents_both_notify() { - let tmp = tempfile::tempdir().unwrap(); - - let (watcher_tx, watcher_rx) = broadcast::channel::(16); - let (transport, calls) = MockTransport::new(); - - spawn_notification_listener( - transport, - || vec!["!room1:example.org".to_string()], - watcher_rx, - tmp.path().to_path_buf(), - ); - - watcher_tx - .send(WatcherEvent::RateLimitWarning { - story_id: "42_story_foo".to_string(), - agent_name: "coder-1".to_string(), - }) - .unwrap(); - watcher_tx - .send(WatcherEvent::RateLimitWarning { - story_id: "42_story_foo".to_string(), - agent_name: "coder-2".to_string(), - }) - .unwrap(); - - tokio::time::sleep(std::time::Duration::from_millis(100)).await; - - let calls = calls.lock().unwrap(); - assert_eq!( - calls.len(), - 2, - "Different agents should each trigger a notification" - ); - } - - // ── dynamic room IDs (WhatsApp ambient_rooms pattern) ─────────────────── - - /// Notifications are sent to the rooms returned by the closure at - /// notification time, not at listener-spawn time. This verifies that a - /// closure backed by a runtime set (e.g. WhatsApp ambient_rooms) delivers - /// messages to the rooms present when the event fires. - #[tokio::test] - async fn stage_notification_uses_dynamic_room_ids() { - let tmp = tempfile::tempdir().unwrap(); - // Seed story via CRDT (the only source of truth). - crate::db::ensure_content_store(); - crate::db::write_item_with_content("10_story_foo", "3_qa", "---\nname: Foo Story\n---\n"); - - let (watcher_tx, watcher_rx) = broadcast::channel::(16); - let (transport, calls) = MockTransport::new(); - - let rooms: Arc>> = - Arc::new(std::sync::Mutex::new(std::collections::HashSet::new())); - let rooms_for_closure = Arc::clone(&rooms); - - spawn_notification_listener( - transport, - move || rooms_for_closure.lock().unwrap().iter().cloned().collect(), - watcher_rx, - tmp.path().to_path_buf(), - ); - - // Add a room after the listener is spawned (simulates a user messaging first). - rooms - .lock() - .unwrap() - .insert("phone:+15551234567".to_string()); - - watcher_tx - .send(WatcherEvent::WorkItem { - stage: "3_qa".to_string(), - item_id: "10_story_foo".to_string(), - action: "qa".to_string(), - commit_msg: "huskies: qa 10_story_foo".to_string(), - from_stage: Some("2_current".to_string()), - }) - .unwrap(); - - // Wait longer than STAGE_TRANSITION_DEBOUNCE (200ms) so the coalesced - // notification flushes. - tokio::time::sleep(std::time::Duration::from_millis(350)).await; - - let calls = calls.lock().unwrap(); - assert_eq!( - calls.len(), - 1, - "Should deliver to the dynamically added room" - ); - assert_eq!(calls[0].0, "phone:+15551234567"); - assert!( - calls[0].1.contains("10"), - "plain should contain story number" - ); - assert!( - calls[0].1.contains("Foo Story"), - "plain should contain story name" - ); - } - - /// When no rooms are registered (e.g. no WhatsApp users have messaged yet), - /// no notifications are sent and the listener does not panic. - #[tokio::test] - async fn stage_notification_with_no_rooms_is_silent() { - let tmp = tempfile::tempdir().unwrap(); - - let (watcher_tx, watcher_rx) = broadcast::channel::(16); - let (transport, calls) = MockTransport::new(); - - spawn_notification_listener(transport, Vec::new, watcher_rx, tmp.path().to_path_buf()); - - watcher_tx - .send(WatcherEvent::WorkItem { - stage: "3_qa".to_string(), - item_id: "10_story_foo".to_string(), - action: "qa".to_string(), - commit_msg: "huskies: qa 10_story_foo".to_string(), - from_stage: Some("2_current".to_string()), - }) - .unwrap(); - - tokio::time::sleep(std::time::Duration::from_millis(100)).await; - - let calls = calls.lock().unwrap(); - assert_eq!(calls.len(), 0, "No rooms means no notifications"); - } - - // ── read_story_name ───────────────────────────────────────────────────── - - #[test] - fn read_story_name_reads_from_front_matter() { - crate::db::ensure_content_store(); - crate::db::write_item_with_content( - "9942_story_my_feature", - "2_current", - "---\nname: My Cool Feature\n---\n# Story\n", - ); - - let tmp = tempfile::tempdir().unwrap(); - let name = read_story_name(tmp.path(), "2_current", "9942_story_my_feature"); - assert_eq!(name.as_deref(), Some("My Cool Feature")); - } - - #[test] - fn read_story_name_returns_none_for_missing_file() { - crate::db::ensure_content_store(); - let tmp = tempfile::tempdir().unwrap(); - let name = read_story_name(tmp.path(), "2_current", "99_story_missing_notif_test"); - assert_eq!(name, None); - } - - #[test] - fn read_story_name_returns_none_for_missing_name_field() { - crate::db::ensure_content_store(); - crate::db::write_item_with_content( - "9943_story_no_name", - "2_current", - "---\ncoverage_baseline: 50%\n---\n# Story\n", - ); - - let tmp = tempfile::tempdir().unwrap(); - let name = read_story_name(tmp.path(), "2_current", "9943_story_no_name"); - assert_eq!(name, None); - } - - // ── spawn_notification_listener: StoryBlocked ─────────────────────────── - - /// AC1: when a StoryBlocked event arrives, send_message is called with a - /// notification that includes the story number, name, and reason. - #[tokio::test] - async fn story_blocked_sends_notification_with_reason() { - let tmp = tempfile::tempdir().unwrap(); - // Seed story via CRDT (the only source of truth). - crate::db::ensure_content_store(); - crate::db::write_item_with_content( - "425_story_blocking_test", - "2_current", - "---\nname: Blocking Test Story\n---\n", - ); - - let (watcher_tx, watcher_rx) = broadcast::channel::(16); - let (transport, calls) = MockTransport::new(); - - spawn_notification_listener( - transport, - || vec!["!room123:example.org".to_string()], - watcher_rx, - tmp.path().to_path_buf(), - ); - - watcher_tx - .send(WatcherEvent::StoryBlocked { - story_id: "425_story_blocking_test".to_string(), - reason: "Retry limit exceeded (3/3) at coder stage".to_string(), - }) - .unwrap(); - - tokio::time::sleep(std::time::Duration::from_millis(100)).await; - - let calls = calls.lock().unwrap(); - assert_eq!(calls.len(), 1, "Expected exactly one notification"); - let (room_id, plain, html) = &calls[0]; - assert_eq!(room_id, "!room123:example.org"); - assert!(plain.contains("425"), "plain should contain story number"); - assert!( - plain.contains("Blocking Test Story"), - "plain should contain story name" - ); - assert!( - plain.contains("BLOCKED"), - "plain should contain BLOCKED label" - ); - assert!( - plain.contains("Retry limit exceeded"), - "plain should contain the reason" - ); - assert!( - html.contains("BLOCKED"), - "html should contain BLOCKED label" - ); - } - - /// StoryBlocked with no room registered should not panic. - #[tokio::test] - async fn story_blocked_with_no_rooms_is_silent() { - let tmp = tempfile::tempdir().unwrap(); - - let (watcher_tx, watcher_rx) = broadcast::channel::(16); - let (transport, calls) = MockTransport::new(); - - spawn_notification_listener(transport, Vec::new, watcher_rx, tmp.path().to_path_buf()); - - watcher_tx - .send(WatcherEvent::StoryBlocked { - story_id: "42_story_no_rooms".to_string(), - reason: "empty diff".to_string(), - }) - .unwrap(); - - tokio::time::sleep(std::time::Duration::from_millis(100)).await; - - let calls = calls.lock().unwrap(); - assert_eq!(calls.len(), 0, "No rooms means no notifications"); - } - - // ── rate_limit_notifications config flag ───────────────────────────────── - - /// AC1+AC2: when rate_limit_notifications = false in project.toml, - /// RateLimitWarning events are suppressed (no send_message call). - #[tokio::test] - async fn rate_limit_warning_suppressed_when_config_false() { - let tmp = tempfile::tempdir().unwrap(); - let sk_dir = tmp.path().join(".huskies"); - std::fs::create_dir_all(&sk_dir).unwrap(); - std::fs::write( - sk_dir.join("project.toml"), - "rate_limit_notifications = false\n", - ) - .unwrap(); - - let (watcher_tx, watcher_rx) = broadcast::channel::(16); - let (transport, calls) = MockTransport::new(); - - spawn_notification_listener( - transport, - || vec!["!room1:example.org".to_string()], - watcher_rx, - tmp.path().to_path_buf(), - ); - - watcher_tx - .send(WatcherEvent::RateLimitWarning { - story_id: "42_story_suppress".to_string(), - agent_name: "coder-1".to_string(), - }) - .unwrap(); - - tokio::time::sleep(std::time::Duration::from_millis(100)).await; - - let calls = calls.lock().unwrap(); - assert_eq!( - calls.len(), - 0, - "RateLimitWarning should be suppressed when rate_limit_notifications = false" - ); - } - - /// RateLimitHardBlock is never posted to Matrix — it is logged server-side only. - #[tokio::test] - async fn rate_limit_hard_block_never_sent_to_matrix() { - let tmp = tempfile::tempdir().unwrap(); - - let (watcher_tx, watcher_rx) = broadcast::channel::(16); - let (transport, calls) = MockTransport::new(); - - spawn_notification_listener( - transport, - || vec!["!room1:example.org".to_string()], - watcher_rx, - tmp.path().to_path_buf(), - ); - - let reset_at = chrono::Utc::now() + chrono::Duration::hours(1); - watcher_tx - .send(WatcherEvent::RateLimitHardBlock { - story_id: "42_story_hard_block".to_string(), - agent_name: "coder-1".to_string(), - reset_at, - }) - .unwrap(); - - tokio::time::sleep(std::time::Duration::from_millis(100)).await; - - let calls = calls.lock().unwrap(); - assert_eq!(calls.len(), 0, "RateLimitHardBlock must not post to Matrix"); - } - - /// AC3: StoryBlocked is always sent regardless of rate_limit_notifications. - #[tokio::test] - async fn story_blocked_always_sent_when_config_false() { - let tmp = tempfile::tempdir().unwrap(); - let sk_dir = tmp.path().join(".huskies"); - std::fs::create_dir_all(&sk_dir).unwrap(); - std::fs::write( - sk_dir.join("project.toml"), - "rate_limit_notifications = false\n", - ) - .unwrap(); - - let (watcher_tx, watcher_rx) = broadcast::channel::(16); - let (transport, calls) = MockTransport::new(); - - spawn_notification_listener( - transport, - || vec!["!room1:example.org".to_string()], - watcher_rx, - tmp.path().to_path_buf(), - ); - - watcher_tx - .send(WatcherEvent::StoryBlocked { - story_id: "42_story_blocked".to_string(), - reason: "retry limit exceeded".to_string(), - }) - .unwrap(); - - tokio::time::sleep(std::time::Duration::from_millis(100)).await; - - let calls = calls.lock().unwrap(); - assert_eq!(calls.len(), 1, "StoryBlocked should always be sent"); - } - - /// AC5: Config is hot-reloaded — disabling rate_limit_notifications after - /// startup suppresses subsequent RateLimitWarning events. - #[tokio::test] - async fn rate_limit_warning_suppressed_after_hot_reload() { - let tmp = tempfile::tempdir().unwrap(); - let sk_dir = tmp.path().join(".huskies"); - std::fs::create_dir_all(&sk_dir).unwrap(); - // Start with notifications enabled. - std::fs::write( - sk_dir.join("project.toml"), - "rate_limit_notifications = true\n", - ) - .unwrap(); - - let (watcher_tx, watcher_rx) = broadcast::channel::(16); - let (transport, calls) = MockTransport::new(); - - spawn_notification_listener( - transport, - || vec!["!room1:example.org".to_string()], - watcher_rx, - tmp.path().to_path_buf(), - ); - - // First warning is sent. - watcher_tx - .send(WatcherEvent::RateLimitWarning { - story_id: "42_story_reload".to_string(), - agent_name: "coder-1".to_string(), - }) - .unwrap(); - tokio::time::sleep(std::time::Duration::from_millis(100)).await; - - // Disable notifications and trigger hot-reload. - std::fs::write( - sk_dir.join("project.toml"), - "rate_limit_notifications = false\n", - ) - .unwrap(); - watcher_tx.send(WatcherEvent::ConfigChanged).unwrap(); - tokio::time::sleep(std::time::Duration::from_millis(100)).await; - - // Second warning (different agent to bypass debounce) should be suppressed. - watcher_tx - .send(WatcherEvent::RateLimitWarning { - story_id: "42_story_reload".to_string(), - agent_name: "coder-2".to_string(), - }) - .unwrap(); - tokio::time::sleep(std::time::Duration::from_millis(100)).await; - - let calls = calls.lock().unwrap(); - assert_eq!( - calls.len(), - 1, - "Only the first warning should be sent; second should be suppressed after hot-reload" - ); - } - - // ── Bug 549: synthetic events with from_stage=None must not notify ────── - - /// Synthetic events (reassign, creation) have from_stage=None and must - /// not produce stage-transition notifications. Before the fix, the - /// inferred_from_stage fallback would emit e.g. "QA → Merge" for a - /// reassign event within the merge stage. - #[tokio::test] - async fn synthetic_event_without_from_stage_does_not_notify() { - let tmp = tempfile::tempdir().unwrap(); - - let (watcher_tx, watcher_rx) = broadcast::channel::(16); - let (transport, calls) = MockTransport::new(); - - spawn_notification_listener( - transport, - || vec!["!room1:example.org".to_string()], - watcher_rx, - tmp.path().to_path_buf(), - ); - - // Synthetic reassign event within 4_merge — no actual stage change. - watcher_tx - .send(WatcherEvent::WorkItem { - stage: "4_merge".to_string(), - item_id: "549_story_skip_qa".to_string(), - action: "reassign".to_string(), - commit_msg: String::new(), - from_stage: None, - }) - .unwrap(); - - tokio::time::sleep(std::time::Duration::from_millis(350)).await; - - let calls = calls.lock().unwrap(); - assert_eq!( - calls.len(), - 0, - "Synthetic events with from_stage=None must not generate notifications" - ); - } - - // ── OAuthAccountSwapped / OAuthAccountsExhausted ──────────────────────── - - /// AC1: OAuthAccountSwapped fires a notification naming the new account. - #[tokio::test] - async fn oauth_account_swapped_sends_notification_with_new_email() { - let tmp = tempfile::tempdir().unwrap(); - - let (watcher_tx, watcher_rx) = broadcast::channel::(16); - let (transport, calls) = MockTransport::new(); - - spawn_notification_listener( - transport, - || vec!["!room1:example.org".to_string()], - watcher_rx, - tmp.path().to_path_buf(), - ); - - watcher_tx - .send(WatcherEvent::OAuthAccountSwapped { - new_email: "alice@example.com".to_string(), - }) - .unwrap(); - - tokio::time::sleep(std::time::Duration::from_millis(100)).await; - - let calls = calls.lock().unwrap(); - assert_eq!(calls.len(), 1, "Expected exactly one notification"); - let (room_id, plain, _html) = &calls[0]; - assert_eq!(room_id, "!room1:example.org"); - assert!( - plain.contains("alice@example.com"), - "notification should name the new account; got: {plain}" - ); - } - - /// AC2: OAuthAccountsExhausted fires a notification with the reset message. - #[tokio::test] - async fn oauth_accounts_exhausted_sends_notification_with_reset_msg() { - let tmp = tempfile::tempdir().unwrap(); - - let (watcher_tx, watcher_rx) = broadcast::channel::(16); - let (transport, calls) = MockTransport::new(); - - spawn_notification_listener( - transport, - || vec!["!room1:example.org".to_string()], - watcher_rx, - tmp.path().to_path_buf(), - ); - - watcher_tx - .send(WatcherEvent::OAuthAccountsExhausted { - earliest_reset_msg: "All OAuth accounts are rate-limited; earliest reset in 3h" - .to_string(), - }) - .unwrap(); - - tokio::time::sleep(std::time::Duration::from_millis(100)).await; - - let calls = calls.lock().unwrap(); - assert_eq!(calls.len(), 1, "Expected exactly one notification"); - let (room_id, plain, _html) = &calls[0]; - assert_eq!(room_id, "!room1:example.org"); - assert!( - plain.contains("rate-limited"), - "notification should contain reset message; got: {plain}" - ); - } - - /// Stories that skip QA (qa: server) move directly from Current to Merge. - /// The notification must say "Current → Merge", not "QA → Merge". - #[tokio::test] - async fn skip_qa_shows_current_to_merge_not_qa_to_merge() { - let tmp = tempfile::tempdir().unwrap(); - let stage_dir = tmp.path().join(".huskies").join("work").join("4_merge"); - std::fs::create_dir_all(&stage_dir).unwrap(); - std::fs::write( - stage_dir.join("549_story_skip_qa.md"), - "---\nname: Skip QA Story\n---\n", - ) - .unwrap(); - - let (watcher_tx, watcher_rx) = broadcast::channel::(16); - let (transport, calls) = MockTransport::new(); - - spawn_notification_listener( - transport, - || vec!["!room1:example.org".to_string()], - watcher_rx, - tmp.path().to_path_buf(), - ); - - // Story skips QA: from_stage is 2_current, not 3_qa. - watcher_tx - .send(WatcherEvent::WorkItem { - stage: "4_merge".to_string(), - item_id: "549_story_skip_qa".to_string(), - action: "merge".to_string(), - commit_msg: "huskies: merge 549_story_skip_qa".to_string(), - from_stage: Some("2_current".to_string()), - }) - .unwrap(); - - tokio::time::sleep(std::time::Duration::from_millis(350)).await; - - let calls = calls.lock().unwrap(); - assert_eq!(calls.len(), 1, "Should send exactly one notification"); - assert!( - calls[0].1.contains("Current \u{2192} Merge"), - "Notification should say 'Current → Merge', got: {}", - calls[0].1 - ); - assert!( - !calls[0].1.contains("QA \u{2192} Merge"), - "Must NOT say 'QA → Merge' when QA was skipped" - ); - } -} diff --git a/server/src/service/notifications/io/listener.rs b/server/src/service/notifications/io/listener.rs new file mode 100644 index 00000000..59b7bf57 --- /dev/null +++ b/server/src/service/notifications/io/listener.rs @@ -0,0 +1,288 @@ +//! Background listener task: receives [`WatcherEvent`]s and dispatches +//! stage-transition and alert notifications to all configured chat rooms. + +use crate::chat::ChatTransport; +use crate::config::ProjectConfig; +use crate::io::watcher::WatcherEvent; +use crate::slog; +use std::collections::HashMap; +use std::path::PathBuf; +use std::sync::Arc; +use std::time::Instant; +use tokio::sync::broadcast; + +use super::super::events::classify; +use super::super::filter::{STAGE_TRANSITION_DEBOUNCE, should_send_rate_limit}; +use super::super::format::{ + format_blocked_notification, format_error_notification, format_oauth_account_swapped, + format_oauth_accounts_exhausted, format_rate_limit_notification, format_stage_notification, + stage_display_name, +}; +use super::super::route::rooms_for_notification; +use super::{find_story_name_any_stage, read_story_name}; + +/// Spawn a background task that listens for watcher events and posts +/// stage-transition notifications to all configured rooms via the +/// [`ChatTransport`] abstraction. +/// +/// `get_room_ids` is called on each notification to obtain the current list of +/// destination room IDs. Pass a closure that returns a static list for Matrix +/// and Slack, or one that reads from a runtime `Arc>>` +/// for WhatsApp ambient senders. +pub fn spawn_notification_listener( + transport: Arc, + get_room_ids: impl Fn() -> Vec + Send + 'static, + watcher_rx: broadcast::Receiver, + project_root: PathBuf, +) { + tokio::spawn(async move { + let mut rx = watcher_rx; + // Load initial config; re-loaded on ConfigChanged events. + let mut config = ProjectConfig::load(&project_root).unwrap_or_default(); + // Tracks when a rate-limit notification was last sent for each + // "story_id:agent_name" key, to debounce repeated warnings. + let mut rate_limit_last_notified: HashMap = HashMap::new(); + + // Pending stage-transition notifications, keyed by item_id. + // Value: (from_display, to_stage_key, story_name). + // Rapid successive transitions for the same item are coalesced: the + // original from_display is kept while to_stage_key is updated to the + // latest destination, so only one notification fires for the final stage. + let mut pending_transitions: HashMap)> = + HashMap::new(); + let mut flush_deadline: Option = None; + + loop { + // Wait for the next event, or flush pending transitions when the + // debounce window expires. + let recv_result = if let Some(deadline) = flush_deadline { + tokio::time::timeout_at(deadline, rx.recv()).await.ok() + } else { + Some(rx.recv().await) + }; + + if recv_result.is_none() { + // Flush all coalesced stage-transition notifications. + for (item_id, (from_display, to_stage_key, story_name)) in + pending_transitions.drain() + { + let to_display = stage_display_name(&to_stage_key); + let (plain, html) = format_stage_notification( + &item_id, + story_name.as_deref(), + &from_display, + to_display, + ); + slog!("[bot] Sending stage notification: {plain}"); + for room_id in &rooms_for_notification(&get_room_ids) { + if let Err(e) = transport.send_message(room_id, &plain, &html).await { + slog!("[bot] Failed to send notification to {room_id}: {e}"); + } + } + } + flush_deadline = None; + continue; + } + + let event = match recv_result.unwrap() { + Ok(ev) => ev, + Err(broadcast::error::RecvError::Lagged(n)) => { + slog!("[bot] Notification listener lagged, skipped {n} events"); + continue; + } + Err(broadcast::error::RecvError::Closed) => { + slog!("[bot] Watcher channel closed, stopping notification listener"); + // Flush any coalesced transitions that haven't fired yet. + for (item_id, (from_display, to_stage_key, story_name)) in + pending_transitions.drain() + { + let to_display = stage_display_name(&to_stage_key); + let (plain, html) = format_stage_notification( + &item_id, + story_name.as_deref(), + &from_display, + to_display, + ); + slog!("[bot] Sending stage notification: {plain}"); + for room_id in &rooms_for_notification(&get_room_ids) { + if let Err(e) = transport.send_message(room_id, &plain, &html).await { + slog!("[bot] Failed to send notification to {room_id}: {e}"); + } + } + } + break; + } + }; + + use super::super::events::EventAction; + match classify(&event) { + EventAction::StageTransition => { + // WorkItem with a known from_stage — extract the fields. + let WatcherEvent::WorkItem { + ref stage, + ref item_id, + ref from_stage, + .. + } = event + else { + continue; + }; + let from_display = stage_display_name(from_stage.as_deref().unwrap_or("")); + + // Look up the story name in the expected stage directory; fall + // back to a full search so stale events still show the name. + let story_name = read_story_name(&project_root, stage, item_id) + .or_else(|| find_story_name_any_stage(&project_root, item_id)); + + // Buffer the transition. If this item_id is already pending (rapid + // succession), update to_stage_key to the latest destination while + // preserving the original from_display. + pending_transitions + .entry(item_id.clone()) + .and_modify(|e| { + e.1 = stage.clone(); + if story_name.is_some() { + e.2 = story_name.clone(); + } + }) + .or_insert_with(|| (from_display.to_string(), stage.clone(), story_name)); + + // Start or extend the debounce window. + flush_deadline = Some(tokio::time::Instant::now() + STAGE_TRANSITION_DEBOUNCE); + } + EventAction::MergeFailure => { + let WatcherEvent::MergeFailure { + ref story_id, + ref reason, + } = event + else { + continue; + }; + let story_name = read_story_name(&project_root, "4_merge", story_id); + let (plain, html) = + format_error_notification(story_id, story_name.as_deref(), reason); + slog!("[bot] Sending error notification: {plain}"); + for room_id in &rooms_for_notification(&get_room_ids) { + if let Err(e) = transport.send_message(room_id, &plain, &html).await { + slog!("[bot] Failed to send error notification to {room_id}: {e}"); + } + } + } + EventAction::RateLimitWarning => { + let WatcherEvent::RateLimitWarning { + ref story_id, + ref agent_name, + } = event + else { + continue; + }; + if !config.rate_limit_notifications { + slog!( + "[bot] RateLimitWarning suppressed by config for \ + {story_id}:{agent_name}" + ); + continue; + } + let debounce_key = format!("{story_id}:{agent_name}"); + let now = Instant::now(); + if !should_send_rate_limit( + rate_limit_last_notified.get(&debounce_key).copied(), + now, + ) { + slog!( + "[bot] Rate-limit notification debounced for \ + {story_id}:{agent_name}" + ); + continue; + } + rate_limit_last_notified.insert(debounce_key, now); + let story_name = find_story_name_any_stage(&project_root, story_id); + let (plain, html) = + format_rate_limit_notification(story_id, story_name.as_deref(), agent_name); + slog!("[bot] Sending rate-limit notification: {plain}"); + for room_id in &rooms_for_notification(&get_room_ids) { + if let Err(e) = transport.send_message(room_id, &plain, &html).await { + slog!( + "[bot] Failed to send rate-limit notification \ + to {room_id}: {e}" + ); + } + } + } + EventAction::StoryBlocked => { + let WatcherEvent::StoryBlocked { + ref story_id, + ref reason, + } = event + else { + continue; + }; + let story_name = find_story_name_any_stage(&project_root, story_id); + let (plain, html) = + format_blocked_notification(story_id, story_name.as_deref(), reason); + slog!("[bot] Sending blocked notification: {plain}"); + for room_id in &rooms_for_notification(&get_room_ids) { + if let Err(e) = transport.send_message(room_id, &plain, &html).await { + slog!("[bot] Failed to send blocked notification to {room_id}: {e}"); + } + } + } + EventAction::OAuthAccountSwapped => { + let WatcherEvent::OAuthAccountSwapped { ref new_email } = event else { + continue; + }; + let (plain, html) = format_oauth_account_swapped(new_email); + slog!("[bot] Sending OAuth account-swap notification: {plain}"); + for room_id in &rooms_for_notification(&get_room_ids) { + if let Err(e) = transport.send_message(room_id, &plain, &html).await { + slog!( + "[bot] Failed to send OAuth account-swap notification \ + to {room_id}: {e}" + ); + } + } + } + EventAction::OAuthAccountsExhausted => { + let WatcherEvent::OAuthAccountsExhausted { + ref earliest_reset_msg, + } = event + else { + continue; + }; + let (plain, html) = format_oauth_accounts_exhausted(earliest_reset_msg); + slog!("[bot] Sending OAuth accounts-exhausted notification: {plain}"); + for room_id in &rooms_for_notification(&get_room_ids) { + if let Err(e) = transport.send_message(room_id, &plain, &html).await { + slog!( + "[bot] Failed to send OAuth accounts-exhausted notification \ + to {room_id}: {e}" + ); + } + } + } + EventAction::LogOnly => { + // Hard-block: log server-side for debugging; do NOT post to chat. + // Hard-block auto-resume is normal operation — the status command + // already surfaces rate-limit state via emoji. + if let WatcherEvent::RateLimitHardBlock { + ref story_id, + ref agent_name, + reset_at, + } = event + { + slog!( + "[bot] Rate-limit hard block for {story_id}/{agent_name}, \ + auto-resume at {reset_at}" + ); + } + } + EventAction::ReloadConfig => { + if let Ok(new_cfg) = ProjectConfig::load(&project_root) { + config = new_cfg; + } + } + EventAction::Skip => {} + } + } + }); +} diff --git a/server/src/service/notifications/io/mock_transport.rs b/server/src/service/notifications/io/mock_transport.rs new file mode 100644 index 00000000..dfc017ed --- /dev/null +++ b/server/src/service/notifications/io/mock_transport.rs @@ -0,0 +1,54 @@ +//! Shared [`MockTransport`] for notification listener tests. + +use crate::chat::{ChatTransport, MessageId}; +use async_trait::async_trait; +use std::sync::Arc; + +pub(super) type CallLog = Arc>>; + +/// Records every `send_message` call for inspection in tests. +pub(super) struct MockTransport { + pub(super) calls: CallLog, +} + +impl MockTransport { + pub(super) fn new() -> (Arc, CallLog) { + let calls: CallLog = Arc::new(std::sync::Mutex::new(Vec::new())); + ( + Arc::new(Self { + calls: Arc::clone(&calls), + }), + calls, + ) + } +} + +#[async_trait] +impl ChatTransport for MockTransport { + async fn send_message( + &self, + room_id: &str, + plain: &str, + html: &str, + ) -> Result { + self.calls + .lock() + .unwrap() + .push((room_id.to_string(), plain.to_string(), html.to_string())); + Ok("mock-msg-id".to_string()) + } + + async fn edit_message( + &self, + _room_id: &str, + _id: &str, + _plain: &str, + _html: &str, + ) -> Result<(), String> { + Ok(()) + } + + async fn send_typing(&self, _room_id: &str, _typing: bool) -> Result<(), String> { + Ok(()) + } +} diff --git a/server/src/service/notifications/io/mod.rs b/server/src/service/notifications/io/mod.rs new file mode 100644 index 00000000..986b6d29 --- /dev/null +++ b/server/src/service/notifications/io/mod.rs @@ -0,0 +1,34 @@ +//! I/O side of the notifications service. +//! +//! This is the **only** file inside `service/notifications/` that may perform +//! side effects: reading from the CRDT content store, loading configuration, +//! and spawning the background listener task. + +use crate::io::story_metadata::parse_front_matter; +use std::path::Path; + +mod listener; +pub use listener::spawn_notification_listener; + +#[cfg(test)] +mod mock_transport; +#[cfg(test)] +mod tests_notifications; +#[cfg(test)] +mod tests_stage; + +/// Read the story name from the CRDT content store's YAML front matter. +/// +/// Returns `None` if the item is not in the content store or has no parseable name. +pub fn read_story_name(_project_root: &Path, _stage: &str, item_id: &str) -> Option { + let contents = crate::db::read_content(item_id)?; + let meta = parse_front_matter(&contents).ok()?; + meta.name +} + +/// Look up a story name from the CRDT content store regardless of stage. +/// +/// Used for events (like rate-limit warnings) that arrive without a known stage. +fn find_story_name_any_stage(project_root: &Path, item_id: &str) -> Option { + read_story_name(project_root, "", item_id) +} diff --git a/server/src/service/notifications/io/tests_notifications.rs b/server/src/service/notifications/io/tests_notifications.rs new file mode 100644 index 00000000..eed38e8f --- /dev/null +++ b/server/src/service/notifications/io/tests_notifications.rs @@ -0,0 +1,447 @@ +//! Tests for rate-limit, story-blocked, OAuth, and config-reload notifications. + +use super::mock_transport::MockTransport; +use super::spawn_notification_listener; +use crate::io::watcher::WatcherEvent; +use tokio::sync::broadcast; + +// ── spawn_notification_listener: RateLimitWarning ──────────────────────────── + +/// AC2 + AC3: when a RateLimitWarning event arrives, send_message is called +/// with a notification that names the agent and story. +#[tokio::test] +async fn rate_limit_warning_sends_notification_with_agent_and_story() { + let tmp = tempfile::tempdir().unwrap(); + // Seed story via CRDT (the only source of truth). + crate::db::ensure_content_store(); + crate::db::write_item_with_content( + "365_story_rate_limit", + "2_current", + "---\nname: Rate Limit Test Story\n---\n", + ); + + let (watcher_tx, watcher_rx) = broadcast::channel::(16); + let (transport, calls) = MockTransport::new(); + + spawn_notification_listener( + transport, + || vec!["!room123:example.org".to_string()], + watcher_rx, + tmp.path().to_path_buf(), + ); + + watcher_tx + .send(WatcherEvent::RateLimitWarning { + story_id: "365_story_rate_limit".to_string(), + agent_name: "coder-1".to_string(), + }) + .unwrap(); + + // Give the spawned task time to process the event. + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + let calls = calls.lock().unwrap(); + assert_eq!(calls.len(), 1, "Expected exactly one notification"); + let (room_id, plain, _html) = &calls[0]; + assert_eq!(room_id, "!room123:example.org"); + assert!(plain.contains("365"), "plain should contain story number"); + assert!( + plain.contains("Rate Limit Test Story"), + "plain should contain story name" + ); + assert!(plain.contains("coder-1"), "plain should contain agent name"); + assert!( + plain.contains("rate limit"), + "plain should mention rate limit" + ); +} + +/// AC4: a second RateLimitWarning for the same agent within the debounce +/// window must NOT trigger a second notification. +#[tokio::test] +async fn rate_limit_warning_is_debounced() { + let tmp = tempfile::tempdir().unwrap(); + + let (watcher_tx, watcher_rx) = broadcast::channel::(16); + let (transport, calls) = MockTransport::new(); + + spawn_notification_listener( + transport, + || vec!["!room1:example.org".to_string()], + watcher_rx, + tmp.path().to_path_buf(), + ); + + // Send the same warning twice in rapid succession. + for _ in 0..2 { + watcher_tx + .send(WatcherEvent::RateLimitWarning { + story_id: "42_story_debounce".to_string(), + agent_name: "coder-2".to_string(), + }) + .unwrap(); + } + + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + let calls = calls.lock().unwrap(); + assert_eq!( + calls.len(), + 1, + "Debounce should suppress the second notification" + ); +} + +/// AC4 (corollary): warnings for different agents are NOT debounced against +/// each other — both should produce notifications. +#[tokio::test] +async fn rate_limit_warnings_for_different_agents_both_notify() { + let tmp = tempfile::tempdir().unwrap(); + + let (watcher_tx, watcher_rx) = broadcast::channel::(16); + let (transport, calls) = MockTransport::new(); + + spawn_notification_listener( + transport, + || vec!["!room1:example.org".to_string()], + watcher_rx, + tmp.path().to_path_buf(), + ); + + watcher_tx + .send(WatcherEvent::RateLimitWarning { + story_id: "42_story_foo".to_string(), + agent_name: "coder-1".to_string(), + }) + .unwrap(); + watcher_tx + .send(WatcherEvent::RateLimitWarning { + story_id: "42_story_foo".to_string(), + agent_name: "coder-2".to_string(), + }) + .unwrap(); + + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + let calls = calls.lock().unwrap(); + assert_eq!( + calls.len(), + 2, + "Different agents should each trigger a notification" + ); +} + +// ── spawn_notification_listener: StoryBlocked ──────────────────────────────── + +/// AC1: when a StoryBlocked event arrives, send_message is called with a +/// notification that includes the story number, name, and reason. +#[tokio::test] +async fn story_blocked_sends_notification_with_reason() { + let tmp = tempfile::tempdir().unwrap(); + // Seed story via CRDT (the only source of truth). + crate::db::ensure_content_store(); + crate::db::write_item_with_content( + "425_story_blocking_test", + "2_current", + "---\nname: Blocking Test Story\n---\n", + ); + + let (watcher_tx, watcher_rx) = broadcast::channel::(16); + let (transport, calls) = MockTransport::new(); + + spawn_notification_listener( + transport, + || vec!["!room123:example.org".to_string()], + watcher_rx, + tmp.path().to_path_buf(), + ); + + watcher_tx + .send(WatcherEvent::StoryBlocked { + story_id: "425_story_blocking_test".to_string(), + reason: "Retry limit exceeded (3/3) at coder stage".to_string(), + }) + .unwrap(); + + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + let calls = calls.lock().unwrap(); + assert_eq!(calls.len(), 1, "Expected exactly one notification"); + let (room_id, plain, html) = &calls[0]; + assert_eq!(room_id, "!room123:example.org"); + assert!(plain.contains("425"), "plain should contain story number"); + assert!( + plain.contains("Blocking Test Story"), + "plain should contain story name" + ); + assert!( + plain.contains("BLOCKED"), + "plain should contain BLOCKED label" + ); + assert!( + plain.contains("Retry limit exceeded"), + "plain should contain the reason" + ); + assert!( + html.contains("BLOCKED"), + "html should contain BLOCKED label" + ); +} + +/// StoryBlocked with no room registered should not panic. +#[tokio::test] +async fn story_blocked_with_no_rooms_is_silent() { + let tmp = tempfile::tempdir().unwrap(); + + let (watcher_tx, watcher_rx) = broadcast::channel::(16); + let (transport, calls) = MockTransport::new(); + + spawn_notification_listener(transport, Vec::new, watcher_rx, tmp.path().to_path_buf()); + + watcher_tx + .send(WatcherEvent::StoryBlocked { + story_id: "42_story_no_rooms".to_string(), + reason: "empty diff".to_string(), + }) + .unwrap(); + + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + let calls = calls.lock().unwrap(); + assert_eq!(calls.len(), 0, "No rooms means no notifications"); +} + +// ── rate_limit_notifications config flag ───────────────────────────────────── + +/// AC1+AC2: when rate_limit_notifications = false in project.toml, +/// RateLimitWarning events are suppressed (no send_message call). +#[tokio::test] +async fn rate_limit_warning_suppressed_when_config_false() { + let tmp = tempfile::tempdir().unwrap(); + let sk_dir = tmp.path().join(".huskies"); + std::fs::create_dir_all(&sk_dir).unwrap(); + std::fs::write( + sk_dir.join("project.toml"), + "rate_limit_notifications = false\n", + ) + .unwrap(); + + let (watcher_tx, watcher_rx) = broadcast::channel::(16); + let (transport, calls) = MockTransport::new(); + + spawn_notification_listener( + transport, + || vec!["!room1:example.org".to_string()], + watcher_rx, + tmp.path().to_path_buf(), + ); + + watcher_tx + .send(WatcherEvent::RateLimitWarning { + story_id: "42_story_suppress".to_string(), + agent_name: "coder-1".to_string(), + }) + .unwrap(); + + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + let calls = calls.lock().unwrap(); + assert_eq!( + calls.len(), + 0, + "RateLimitWarning should be suppressed when rate_limit_notifications = false" + ); +} + +/// RateLimitHardBlock is never posted to Matrix — it is logged server-side only. +#[tokio::test] +async fn rate_limit_hard_block_never_sent_to_matrix() { + let tmp = tempfile::tempdir().unwrap(); + + let (watcher_tx, watcher_rx) = broadcast::channel::(16); + let (transport, calls) = MockTransport::new(); + + spawn_notification_listener( + transport, + || vec!["!room1:example.org".to_string()], + watcher_rx, + tmp.path().to_path_buf(), + ); + + let reset_at = chrono::Utc::now() + chrono::Duration::hours(1); + watcher_tx + .send(WatcherEvent::RateLimitHardBlock { + story_id: "42_story_hard_block".to_string(), + agent_name: "coder-1".to_string(), + reset_at, + }) + .unwrap(); + + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + let calls = calls.lock().unwrap(); + assert_eq!(calls.len(), 0, "RateLimitHardBlock must not post to Matrix"); +} + +/// AC3: StoryBlocked is always sent regardless of rate_limit_notifications. +#[tokio::test] +async fn story_blocked_always_sent_when_config_false() { + let tmp = tempfile::tempdir().unwrap(); + let sk_dir = tmp.path().join(".huskies"); + std::fs::create_dir_all(&sk_dir).unwrap(); + std::fs::write( + sk_dir.join("project.toml"), + "rate_limit_notifications = false\n", + ) + .unwrap(); + + let (watcher_tx, watcher_rx) = broadcast::channel::(16); + let (transport, calls) = MockTransport::new(); + + spawn_notification_listener( + transport, + || vec!["!room1:example.org".to_string()], + watcher_rx, + tmp.path().to_path_buf(), + ); + + watcher_tx + .send(WatcherEvent::StoryBlocked { + story_id: "42_story_blocked".to_string(), + reason: "retry limit exceeded".to_string(), + }) + .unwrap(); + + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + let calls = calls.lock().unwrap(); + assert_eq!(calls.len(), 1, "StoryBlocked should always be sent"); +} + +/// AC5: Config is hot-reloaded — disabling rate_limit_notifications after +/// startup suppresses subsequent RateLimitWarning events. +#[tokio::test] +async fn rate_limit_warning_suppressed_after_hot_reload() { + let tmp = tempfile::tempdir().unwrap(); + let sk_dir = tmp.path().join(".huskies"); + std::fs::create_dir_all(&sk_dir).unwrap(); + // Start with notifications enabled. + std::fs::write( + sk_dir.join("project.toml"), + "rate_limit_notifications = true\n", + ) + .unwrap(); + + let (watcher_tx, watcher_rx) = broadcast::channel::(16); + let (transport, calls) = MockTransport::new(); + + spawn_notification_listener( + transport, + || vec!["!room1:example.org".to_string()], + watcher_rx, + tmp.path().to_path_buf(), + ); + + // First warning is sent. + watcher_tx + .send(WatcherEvent::RateLimitWarning { + story_id: "42_story_reload".to_string(), + agent_name: "coder-1".to_string(), + }) + .unwrap(); + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + // Disable notifications and trigger hot-reload. + std::fs::write( + sk_dir.join("project.toml"), + "rate_limit_notifications = false\n", + ) + .unwrap(); + watcher_tx.send(WatcherEvent::ConfigChanged).unwrap(); + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + // Second warning (different agent to bypass debounce) should be suppressed. + watcher_tx + .send(WatcherEvent::RateLimitWarning { + story_id: "42_story_reload".to_string(), + agent_name: "coder-2".to_string(), + }) + .unwrap(); + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + let calls = calls.lock().unwrap(); + assert_eq!( + calls.len(), + 1, + "Only the first warning should be sent; second should be suppressed after hot-reload" + ); +} + +// ── OAuthAccountSwapped / OAuthAccountsExhausted ───────────────────────────── + +/// AC1: OAuthAccountSwapped fires a notification naming the new account. +#[tokio::test] +async fn oauth_account_swapped_sends_notification_with_new_email() { + let tmp = tempfile::tempdir().unwrap(); + + let (watcher_tx, watcher_rx) = broadcast::channel::(16); + let (transport, calls) = MockTransport::new(); + + spawn_notification_listener( + transport, + || vec!["!room1:example.org".to_string()], + watcher_rx, + tmp.path().to_path_buf(), + ); + + watcher_tx + .send(WatcherEvent::OAuthAccountSwapped { + new_email: "alice@example.com".to_string(), + }) + .unwrap(); + + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + let calls = calls.lock().unwrap(); + assert_eq!(calls.len(), 1, "Expected exactly one notification"); + let (room_id, plain, _html) = &calls[0]; + assert_eq!(room_id, "!room1:example.org"); + assert!( + plain.contains("alice@example.com"), + "notification should name the new account; got: {plain}" + ); +} + +/// AC2: OAuthAccountsExhausted fires a notification with the reset message. +#[tokio::test] +async fn oauth_accounts_exhausted_sends_notification_with_reset_msg() { + let tmp = tempfile::tempdir().unwrap(); + + let (watcher_tx, watcher_rx) = broadcast::channel::(16); + let (transport, calls) = MockTransport::new(); + + spawn_notification_listener( + transport, + || vec!["!room1:example.org".to_string()], + watcher_rx, + tmp.path().to_path_buf(), + ); + + watcher_tx + .send(WatcherEvent::OAuthAccountsExhausted { + earliest_reset_msg: "All OAuth accounts are rate-limited; earliest reset in 3h" + .to_string(), + }) + .unwrap(); + + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + let calls = calls.lock().unwrap(); + assert_eq!(calls.len(), 1, "Expected exactly one notification"); + let (room_id, plain, _html) = &calls[0]; + assert_eq!(room_id, "!room1:example.org"); + assert!( + plain.contains("rate-limited"), + "notification should contain reset message; got: {plain}" + ); +} diff --git a/server/src/service/notifications/io/tests_stage.rs b/server/src/service/notifications/io/tests_stage.rs new file mode 100644 index 00000000..bff6c8a4 --- /dev/null +++ b/server/src/service/notifications/io/tests_stage.rs @@ -0,0 +1,226 @@ +//! Tests for stage-transition notifications and `read_story_name`. + +use super::mock_transport::MockTransport; +use super::{read_story_name, spawn_notification_listener}; +use crate::io::watcher::WatcherEvent; +use std::sync::Arc; +use tokio::sync::broadcast; + +// ── dynamic room IDs (WhatsApp ambient_rooms pattern) ─────────────────────── + +/// Notifications are sent to the rooms returned by the closure at +/// notification time, not at listener-spawn time. This verifies that a +/// closure backed by a runtime set (e.g. WhatsApp ambient_rooms) delivers +/// messages to the rooms present when the event fires. +#[tokio::test] +async fn stage_notification_uses_dynamic_room_ids() { + let tmp = tempfile::tempdir().unwrap(); + // Seed story via CRDT (the only source of truth). + crate::db::ensure_content_store(); + crate::db::write_item_with_content("10_story_foo", "3_qa", "---\nname: Foo Story\n---\n"); + + let (watcher_tx, watcher_rx) = broadcast::channel::(16); + let (transport, calls) = MockTransport::new(); + + let rooms: Arc>> = + Arc::new(std::sync::Mutex::new(std::collections::HashSet::new())); + let rooms_for_closure = Arc::clone(&rooms); + + spawn_notification_listener( + transport, + move || rooms_for_closure.lock().unwrap().iter().cloned().collect(), + watcher_rx, + tmp.path().to_path_buf(), + ); + + // Add a room after the listener is spawned (simulates a user messaging first). + rooms + .lock() + .unwrap() + .insert("phone:+15551234567".to_string()); + + watcher_tx + .send(WatcherEvent::WorkItem { + stage: "3_qa".to_string(), + item_id: "10_story_foo".to_string(), + action: "qa".to_string(), + commit_msg: "huskies: qa 10_story_foo".to_string(), + from_stage: Some("2_current".to_string()), + }) + .unwrap(); + + // Wait longer than STAGE_TRANSITION_DEBOUNCE (200ms) so the coalesced + // notification flushes. + tokio::time::sleep(std::time::Duration::from_millis(350)).await; + + let calls = calls.lock().unwrap(); + assert_eq!( + calls.len(), + 1, + "Should deliver to the dynamically added room" + ); + assert_eq!(calls[0].0, "phone:+15551234567"); + assert!( + calls[0].1.contains("10"), + "plain should contain story number" + ); + assert!( + calls[0].1.contains("Foo Story"), + "plain should contain story name" + ); +} + +/// When no rooms are registered (e.g. no WhatsApp users have messaged yet), +/// no notifications are sent and the listener does not panic. +#[tokio::test] +async fn stage_notification_with_no_rooms_is_silent() { + let tmp = tempfile::tempdir().unwrap(); + + let (watcher_tx, watcher_rx) = broadcast::channel::(16); + let (transport, calls) = MockTransport::new(); + + spawn_notification_listener(transport, Vec::new, watcher_rx, tmp.path().to_path_buf()); + + watcher_tx + .send(WatcherEvent::WorkItem { + stage: "3_qa".to_string(), + item_id: "10_story_foo".to_string(), + action: "qa".to_string(), + commit_msg: "huskies: qa 10_story_foo".to_string(), + from_stage: Some("2_current".to_string()), + }) + .unwrap(); + + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + let calls = calls.lock().unwrap(); + assert_eq!(calls.len(), 0, "No rooms means no notifications"); +} + +// ── read_story_name ────────────────────────────────────────────────────────── + +#[test] +fn read_story_name_reads_from_front_matter() { + crate::db::ensure_content_store(); + crate::db::write_item_with_content( + "9942_story_my_feature", + "2_current", + "---\nname: My Cool Feature\n---\n# Story\n", + ); + + let tmp = tempfile::tempdir().unwrap(); + let name = read_story_name(tmp.path(), "2_current", "9942_story_my_feature"); + assert_eq!(name.as_deref(), Some("My Cool Feature")); +} + +#[test] +fn read_story_name_returns_none_for_missing_file() { + crate::db::ensure_content_store(); + let tmp = tempfile::tempdir().unwrap(); + let name = read_story_name(tmp.path(), "2_current", "99_story_missing_notif_test"); + assert_eq!(name, None); +} + +#[test] +fn read_story_name_returns_none_for_missing_name_field() { + crate::db::ensure_content_store(); + crate::db::write_item_with_content( + "9943_story_no_name", + "2_current", + "---\ncoverage_baseline: 50%\n---\n# Story\n", + ); + + let tmp = tempfile::tempdir().unwrap(); + let name = read_story_name(tmp.path(), "2_current", "9943_story_no_name"); + assert_eq!(name, None); +} + +// ── Bug 549: synthetic events with from_stage=None must not notify ─────────── + +/// Synthetic events (reassign, creation) have from_stage=None and must +/// not produce stage-transition notifications. Before the fix, the +/// inferred_from_stage fallback would emit e.g. "QA → Merge" for a +/// reassign event within the merge stage. +#[tokio::test] +async fn synthetic_event_without_from_stage_does_not_notify() { + let tmp = tempfile::tempdir().unwrap(); + + let (watcher_tx, watcher_rx) = broadcast::channel::(16); + let (transport, calls) = MockTransport::new(); + + spawn_notification_listener( + transport, + || vec!["!room1:example.org".to_string()], + watcher_rx, + tmp.path().to_path_buf(), + ); + + // Synthetic reassign event within 4_merge — no actual stage change. + watcher_tx + .send(WatcherEvent::WorkItem { + stage: "4_merge".to_string(), + item_id: "549_story_skip_qa".to_string(), + action: "reassign".to_string(), + commit_msg: String::new(), + from_stage: None, + }) + .unwrap(); + + tokio::time::sleep(std::time::Duration::from_millis(350)).await; + + let calls = calls.lock().unwrap(); + assert_eq!( + calls.len(), + 0, + "Synthetic events with from_stage=None must not generate notifications" + ); +} + +/// Stories that skip QA (qa: server) move directly from Current to Merge. +/// The notification must say "Current → Merge", not "QA → Merge". +#[tokio::test] +async fn skip_qa_shows_current_to_merge_not_qa_to_merge() { + let tmp = tempfile::tempdir().unwrap(); + let stage_dir = tmp.path().join(".huskies").join("work").join("4_merge"); + std::fs::create_dir_all(&stage_dir).unwrap(); + std::fs::write( + stage_dir.join("549_story_skip_qa.md"), + "---\nname: Skip QA Story\n---\n", + ) + .unwrap(); + + let (watcher_tx, watcher_rx) = broadcast::channel::(16); + let (transport, calls) = MockTransport::new(); + + spawn_notification_listener( + transport, + || vec!["!room1:example.org".to_string()], + watcher_rx, + tmp.path().to_path_buf(), + ); + + // Story skips QA: from_stage is 2_current, not 3_qa. + watcher_tx + .send(WatcherEvent::WorkItem { + stage: "4_merge".to_string(), + item_id: "549_story_skip_qa".to_string(), + action: "merge".to_string(), + commit_msg: "huskies: merge 549_story_skip_qa".to_string(), + from_stage: Some("2_current".to_string()), + }) + .unwrap(); + + tokio::time::sleep(std::time::Duration::from_millis(350)).await; + + let calls = calls.lock().unwrap(); + assert_eq!(calls.len(), 1, "Should send exactly one notification"); + assert!( + calls[0].1.contains("Current \u{2192} Merge"), + "Notification should say 'Current → Merge', got: {}", + calls[0].1 + ); + assert!( + !calls[0].1.contains("QA \u{2192} Merge"), + "Must NOT say 'QA → Merge' when QA was skipped" + ); +} diff --git a/server/src/service/notifications/mod.rs b/server/src/service/notifications/mod.rs index b9f1fe43..00a696cb 100644 --- a/server/src/service/notifications/mod.rs +++ b/server/src/service/notifications/mod.rs @@ -5,7 +5,7 @@ //! //! Follows service-module conventions: //! - `mod.rs` (this file) — public API, typed [`Error`] type, orchestration -//! - `io.rs` — the ONLY place that performs side effects (DB reads, config +//! - `io/` — the ONLY place that performs side effects (DB reads, config //! loads, `tokio::spawn`) //! - `format.rs` — pure: message formatting functions //! - `filter.rs` — pure: debounce constants and suppression predicates