From 271f8ea6a8f2d2afd96703e938e21a45e3eee8a7 Mon Sep 17 00:00:00 2001 From: dave Date: Fri, 24 Apr 2026 18:01:34 +0000 Subject: [PATCH] huskies: merge 616_story_extract_notifications_service --- server/src/chat/transport/matrix/bot/run.rs | 2 +- server/src/chat/transport/matrix/mod.rs | 1 - server/src/gateway.rs | 4 +- server/src/main.rs | 6 +- server/src/service/mod.rs | 1 + server/src/service/notifications/events.rs | 119 ++++ server/src/service/notifications/filter.rs | 73 +++ server/src/service/notifications/format.rs | 344 +++++++++++ .../notifications/io.rs} | 569 +++++------------- server/src/service/notifications/mod.rs | 89 +++ server/src/service/notifications/route.rs | 42 ++ 11 files changed, 829 insertions(+), 421 deletions(-) create mode 100644 server/src/service/notifications/events.rs create mode 100644 server/src/service/notifications/filter.rs create mode 100644 server/src/service/notifications/format.rs rename server/src/{chat/transport/matrix/notifications.rs => service/notifications/io.rs} (70%) create mode 100644 server/src/service/notifications/mod.rs create mode 100644 server/src/service/notifications/route.rs diff --git a/server/src/chat/transport/matrix/bot/run.rs b/server/src/chat/transport/matrix/bot/run.rs index 525cc2e4..9ad2afce 100644 --- a/server/src/chat/transport/matrix/bot/run.rs +++ b/server/src/chat/transport/matrix/bot/run.rs @@ -272,7 +272,7 @@ pub async fn run_bot( // Spawn the stage-transition notification listener before entering the // sync loop so it starts receiving watcher events immediately. let notif_room_id_strings: Vec = notif_room_ids.iter().map(|r| r.to_string()).collect(); - super::super::notifications::spawn_notification_listener( + crate::service::notifications::spawn_notification_listener( Arc::clone(&transport), move || notif_room_id_strings.clone(), watcher_rx, diff --git a/server/src/chat/transport/matrix/mod.rs b/server/src/chat/transport/matrix/mod.rs index f12af56a..47640f84 100644 --- a/server/src/chat/transport/matrix/mod.rs +++ b/server/src/chat/transport/matrix/mod.rs @@ -21,7 +21,6 @@ pub mod commands; pub(crate) mod config; pub mod delete; pub mod htop; -pub mod notifications; pub mod rebuild; pub mod reset; pub mod rmtree; diff --git a/server/src/gateway.rs b/server/src/gateway.rs index e9628e60..8be508bd 100644 --- a/server/src/gateway.rs +++ b/server/src/gateway.rs @@ -2317,11 +2317,11 @@ fn format_gateway_event( project_name: &str, event: &crate::http::events::StoredEvent, ) -> (String, String) { - use crate::chat::transport::matrix::notifications::{ + use crate::http::events::StoredEvent; + use crate::service::notifications::{ format_blocked_notification, format_error_notification, format_stage_notification, stage_display_name, }; - use crate::http::events::StoredEvent; let prefix = format!("[{project_name}] "); diff --git a/server/src/main.rs b/server/src/main.rs index ad91cf4a..f8ab475c 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -893,7 +893,7 @@ async fn main() -> Result<(), std::io::Error> { // These mirror the listener that the Matrix bot spawns internally. if let (Some(ctx), Some(root)) = (&whatsapp_ctx, &startup_root) { let ambient_rooms = Arc::clone(&ctx.ambient_rooms); - chat::transport::matrix::notifications::spawn_notification_listener( + crate::service::notifications::spawn_notification_listener( Arc::clone(&ctx.transport), move || ambient_rooms.lock().unwrap().iter().cloned().collect(), watcher_rx_for_whatsapp, @@ -904,7 +904,7 @@ async fn main() -> Result<(), std::io::Error> { } if let (Some(ctx), Some(root)) = (&slack_ctx, &startup_root) { let channel_ids: Vec = ctx.channel_ids.iter().cloned().collect(); - chat::transport::matrix::notifications::spawn_notification_listener( + crate::service::notifications::spawn_notification_listener( Arc::clone(&ctx.transport) as Arc, move || channel_ids.clone(), watcher_rx_for_slack, @@ -919,7 +919,7 @@ async fn main() -> Result<(), std::io::Error> { // Spawn stage-transition notification listener for Discord. let channel_ids: Vec = ctx.channel_ids.iter().cloned().collect(); - chat::transport::matrix::notifications::spawn_notification_listener( + crate::service::notifications::spawn_notification_listener( Arc::clone(&ctx.transport) as Arc, move || channel_ids.clone(), watcher_rx_for_discord, diff --git a/server/src/service/mod.rs b/server/src/service/mod.rs index 27196758..4a7fa24c 100644 --- a/server/src/service/mod.rs +++ b/server/src/service/mod.rs @@ -11,6 +11,7 @@ pub mod bot_command; pub mod events; pub mod file_io; pub mod health; +pub mod notifications; pub mod oauth; pub mod project; pub mod settings; diff --git a/server/src/service/notifications/events.rs b/server/src/service/notifications/events.rs new file mode 100644 index 00000000..340582df --- /dev/null +++ b/server/src/service/notifications/events.rs @@ -0,0 +1,119 @@ +//! Event-to-notification mapping. +//! +//! Pure functions that classify [`WatcherEvent`] variants into notification +//! actions, deciding which events produce user-visible messages and which +//! are suppressed or logged server-side only. + +use crate::io::watcher::WatcherEvent; + +/// The notification action to take in response to a [`WatcherEvent`]. +#[derive(Debug, PartialEq)] +pub enum EventAction { + /// Post a stage-transition notification; the event carries a known source stage. + StageTransition, + /// Post a merge-failure error notification. + MergeFailure, + /// Post a rate-limit warning (subject to config/debounce suppression). + RateLimitWarning, + /// Post a story-blocked notification. + StoryBlocked, + /// Log server-side only; do not post to chat (e.g. hard rate-limit blocks). + LogOnly, + /// Reload the project configuration. + ReloadConfig, + /// Skip silently (synthetic events, unknown variants). + Skip, +} + +/// Classify a [`WatcherEvent`] into the action the notification listener should take. +pub fn classify(event: &WatcherEvent) -> EventAction { + match event { + WatcherEvent::WorkItem { from_stage, .. } => { + if from_stage.is_some() { + EventAction::StageTransition + } else { + // Synthetic events (creation, reassign) have no from_stage. + // Posting a notification for these would produce incorrect messages. + EventAction::Skip + } + } + WatcherEvent::MergeFailure { .. } => EventAction::MergeFailure, + WatcherEvent::RateLimitWarning { .. } => EventAction::RateLimitWarning, + WatcherEvent::StoryBlocked { .. } => EventAction::StoryBlocked, + WatcherEvent::RateLimitHardBlock { .. } => EventAction::LogOnly, + WatcherEvent::ConfigChanged => EventAction::ReloadConfig, + _ => EventAction::Skip, + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn work_item(from_stage: Option<&str>) -> WatcherEvent { + WatcherEvent::WorkItem { + stage: "3_qa".to_string(), + item_id: "1_story_foo".to_string(), + action: "qa".to_string(), + commit_msg: String::new(), + from_stage: from_stage.map(str::to_string), + } + } + + #[test] + fn work_item_with_from_stage_is_stage_transition() { + let event = work_item(Some("2_current")); + assert_eq!(classify(&event), EventAction::StageTransition); + } + + #[test] + fn work_item_without_from_stage_is_skip() { + let event = work_item(None); + assert_eq!(classify(&event), EventAction::Skip); + } + + #[test] + fn merge_failure_is_classified_correctly() { + let event = WatcherEvent::MergeFailure { + story_id: "1_story_foo".to_string(), + reason: "conflict".to_string(), + }; + assert_eq!(classify(&event), EventAction::MergeFailure); + } + + #[test] + fn rate_limit_warning_is_classified_correctly() { + let event = WatcherEvent::RateLimitWarning { + story_id: "1_story_foo".to_string(), + agent_name: "coder-1".to_string(), + }; + assert_eq!(classify(&event), EventAction::RateLimitWarning); + } + + #[test] + fn story_blocked_is_classified_correctly() { + let event = WatcherEvent::StoryBlocked { + story_id: "1_story_foo".to_string(), + reason: "empty diff".to_string(), + }; + assert_eq!(classify(&event), EventAction::StoryBlocked); + } + + #[test] + fn rate_limit_hard_block_is_log_only() { + let event = WatcherEvent::RateLimitHardBlock { + story_id: "1_story_foo".to_string(), + agent_name: "coder-1".to_string(), + reset_at: chrono::Utc::now(), + }; + assert_eq!(classify(&event), EventAction::LogOnly); + } + + #[test] + fn config_changed_triggers_reload() { + assert_eq!( + classify(&WatcherEvent::ConfigChanged), + EventAction::ReloadConfig + ); + } +} diff --git a/server/src/service/notifications/filter.rs b/server/src/service/notifications/filter.rs new file mode 100644 index 00000000..7f20f1d2 --- /dev/null +++ b/server/src/service/notifications/filter.rs @@ -0,0 +1,73 @@ +//! Pure filtering and debounce logic for notification suppression. +//! +//! Contains constants and predicates that decide whether a notification +//! should be sent, without performing any I/O. + +use std::time::{Duration, Instant}; + +/// Minimum time between rate-limit notifications for the same agent key. +pub const RATE_LIMIT_DEBOUNCE: Duration = Duration::from_secs(60); + +/// Window during which rapid stage transitions for the same item are coalesced +/// into a single notification (only the final stage is announced). +pub const STAGE_TRANSITION_DEBOUNCE: Duration = Duration::from_millis(200); + +/// Returns `true` if a rate-limit notification should be sent. +/// +/// `last_notified` is the [`Instant`] of the last sent notification for this +/// agent, or `None` if no notification has been sent yet. +pub fn should_send_rate_limit(last_notified: Option, now: Instant) -> bool { + match last_notified { + None => true, + Some(last) => now.duration_since(last) >= RATE_LIMIT_DEBOUNCE, + } +} + +#[cfg(test)] +mod tests { + use super::*; + + // ── should_send_rate_limit ──────────────────────────────────────────────── + + #[test] + fn should_send_when_never_notified() { + let now = Instant::now(); + assert!(should_send_rate_limit(None, now)); + } + + #[test] + fn should_not_send_within_debounce_window() { + let now = Instant::now(); + // Pretend last notification was 10 seconds ago — inside the 60s window. + let last = now - Duration::from_secs(10); + assert!(!should_send_rate_limit(Some(last), now)); + } + + #[test] + fn should_send_after_debounce_window_expires() { + let now = Instant::now(); + // Pretend last notification was 61 seconds ago — outside the 60s window. + let last = now - Duration::from_secs(61); + assert!(should_send_rate_limit(Some(last), now)); + } + + #[test] + fn should_not_send_at_exactly_debounce_boundary() { + let now = Instant::now(); + // Exactly at the boundary: duration_since == RATE_LIMIT_DEBOUNCE (>=, so allowed). + let last = now - RATE_LIMIT_DEBOUNCE; + assert!(should_send_rate_limit(Some(last), now)); + } + + // ── constants ───────────────────────────────────────────────────────────── + + #[test] + fn rate_limit_debounce_is_one_minute() { + assert_eq!(RATE_LIMIT_DEBOUNCE, Duration::from_secs(60)); + } + + #[test] + fn stage_transition_debounce_is_200ms() { + assert_eq!(STAGE_TRANSITION_DEBOUNCE, Duration::from_millis(200)); + } +} diff --git a/server/src/service/notifications/format.rs b/server/src/service/notifications/format.rs new file mode 100644 index 00000000..43ef4a8f --- /dev/null +++ b/server/src/service/notifications/format.rs @@ -0,0 +1,344 @@ +//! Pure message-formatting functions for pipeline-event notifications. +//! +//! All functions are pure (no I/O, no side effects) and accept only owned +//! or borrowed string data. They return `(plain_text, html)` pairs suitable +//! for `ChatTransport::send_message`. + +/// Human-readable display name for a pipeline stage directory. +pub fn stage_display_name(stage: &str) -> &'static str { + match stage { + "1_backlog" => "Backlog", + "2_current" => "Current", + "3_qa" => "QA", + "4_merge" => "Merge", + "5_done" => "Done", + "6_archived" => "Archived", + _ => "Unknown", + } +} + +/// Extract the numeric story number from an item ID like `"261_story_slug"`. +pub fn extract_story_number(item_id: &str) -> Option<&str> { + item_id + .split('_') + .next() + .filter(|s| !s.is_empty() && s.chars().all(|c| c.is_ascii_digit())) +} + +/// Format a stage transition notification message. +/// +/// Returns `(plain_text, html)` suitable for `ChatTransport::send_message`. +pub fn format_stage_notification( + item_id: &str, + story_name: Option<&str>, + from_stage: &str, + to_stage: &str, +) -> (String, String) { + let number = extract_story_number(item_id).unwrap_or(item_id); + let name = story_name.unwrap_or(item_id); + + let prefix = if to_stage == "Done" { "\u{1f389} " } else { "" }; + let plain = format!("{prefix}#{number} {name} \u{2014} {from_stage} \u{2192} {to_stage}"); + let html = format!( + "{prefix}#{number} {name} \u{2014} {from_stage} \u{2192} {to_stage}" + ); + (plain, html) +} + +/// Format an error notification message for a story merge failure. +/// +/// Returns `(plain_text, html)` suitable for `ChatTransport::send_message`. +pub fn format_error_notification( + item_id: &str, + story_name: Option<&str>, + reason: &str, +) -> (String, String) { + let number = extract_story_number(item_id).unwrap_or(item_id); + let name = story_name.unwrap_or(item_id); + + let plain = format!("\u{274c} #{number} {name} \u{2014} {reason}"); + let html = format!("\u{274c} #{number} {name} \u{2014} {reason}"); + (plain, html) +} + +/// Format a blocked-story notification message. +/// +/// Returns `(plain_text, html)` suitable for `ChatTransport::send_message`. +pub fn format_blocked_notification( + item_id: &str, + story_name: Option<&str>, + reason: &str, +) -> (String, String) { + let number = extract_story_number(item_id).unwrap_or(item_id); + let name = story_name.unwrap_or(item_id); + + let plain = format!("\u{1f6ab} #{number} {name} \u{2014} BLOCKED: {reason}"); + let html = + format!("\u{1f6ab} #{number} {name} \u{2014} BLOCKED: {reason}"); + (plain, html) +} + +/// Format a rate limit warning notification message. +/// +/// Returns `(plain_text, html)` suitable for `ChatTransport::send_message`. +pub fn format_rate_limit_notification( + item_id: &str, + story_name: Option<&str>, + agent_name: &str, +) -> (String, String) { + let number = extract_story_number(item_id).unwrap_or(item_id); + let name = story_name.unwrap_or(item_id); + + let plain = + format!("\u{26a0}\u{fe0f} #{number} {name} \u{2014} {agent_name} hit an API rate limit"); + let html = format!( + "\u{26a0}\u{fe0f} #{number} {name} \u{2014} \ + {agent_name} hit an API rate limit" + ); + (plain, html) +} + +#[cfg(test)] +mod tests { + use super::*; + + // ── stage_display_name ──────────────────────────────────────────────────── + + #[test] + fn stage_display_name_maps_all_known_stages() { + assert_eq!(stage_display_name("1_backlog"), "Backlog"); + assert_eq!(stage_display_name("2_current"), "Current"); + assert_eq!(stage_display_name("3_qa"), "QA"); + assert_eq!(stage_display_name("4_merge"), "Merge"); + assert_eq!(stage_display_name("5_done"), "Done"); + assert_eq!(stage_display_name("6_archived"), "Archived"); + assert_eq!(stage_display_name("unknown"), "Unknown"); + } + + #[test] + fn stage_display_name_unknown_slug_returns_unknown() { + assert_eq!(stage_display_name("99_future"), "Unknown"); + assert_eq!(stage_display_name(""), "Unknown"); + } + + // ── extract_story_number ────────────────────────────────────────────────── + + #[test] + fn extract_story_number_parses_numeric_prefix() { + assert_eq!( + extract_story_number("261_story_bot_notifications"), + Some("261") + ); + assert_eq!(extract_story_number("42_bug_fix_thing"), Some("42")); + assert_eq!(extract_story_number("1_spike_research"), Some("1")); + } + + #[test] + fn extract_story_number_returns_none_for_non_numeric() { + assert_eq!(extract_story_number("abc_story_thing"), None); + assert_eq!(extract_story_number(""), None); + } + + #[test] + fn extract_story_number_returns_none_for_empty_first_segment() { + // Leading underscore: first segment is "" + assert_eq!(extract_story_number("_story_thing"), None); + } + + // ── format_stage_notification ───────────────────────────────────────────── + + #[test] + fn format_notification_done_stage_includes_party_emoji() { + let (plain, html) = + format_stage_notification("353_story_done", Some("Done Story"), "Merge", "Done"); + assert_eq!( + plain, + "\u{1f389} #353 Done Story \u{2014} Merge \u{2192} Done" + ); + assert_eq!( + html, + "\u{1f389} #353 Done Story \u{2014} Merge \u{2192} Done" + ); + } + + #[test] + fn format_notification_non_done_stage_has_no_emoji() { + let (plain, _html) = + format_stage_notification("42_story_thing", Some("Some Story"), "Backlog", "Current"); + assert!(!plain.contains("\u{1f389}")); + } + + #[test] + fn format_notification_with_story_name() { + let (plain, html) = format_stage_notification( + "261_story_bot_notifications", + Some("Bot notifications"), + "Upcoming", + "Current", + ); + assert_eq!( + plain, + "#261 Bot notifications \u{2014} Upcoming \u{2192} Current" + ); + assert_eq!( + html, + "#261 Bot notifications \u{2014} Upcoming \u{2192} Current" + ); + } + + #[test] + fn format_notification_without_story_name_falls_back_to_item_id() { + let (plain, _html) = format_stage_notification("42_bug_fix_thing", None, "Current", "QA"); + assert_eq!(plain, "#42 42_bug_fix_thing \u{2014} Current \u{2192} QA"); + } + + #[test] + fn format_notification_non_numeric_id_uses_full_id() { + let (plain, _html) = + format_stage_notification("abc_story_thing", Some("Some Story"), "QA", "Merge"); + assert_eq!( + plain, + "#abc_story_thing Some Story \u{2014} QA \u{2192} Merge" + ); + } + + #[test] + fn format_stage_notification_long_name_is_preserved() { + let long_name = "A".repeat(300); + let (plain, _html) = + format_stage_notification("1_story_long", Some(&long_name), "Current", "QA"); + assert!(plain.contains(&long_name)); + } + + #[test] + fn format_stage_notification_empty_story_name_falls_back_to_id() { + // Some("") is a valid Some but empty — treat as missing? Currently we use it as-is. + let (plain, _html) = format_stage_notification("42_story_empty", Some(""), "Current", "QA"); + // The name slot is empty but the structure is still correct. + assert!(plain.contains("#42")); + assert!(plain.contains("Current \u{2192} QA")); + } + + #[test] + fn format_stage_notification_unicode_name() { + let (plain, html) = + format_stage_notification("7_story_i18n", Some("Ünïcödé Ñämé 🎉"), "QA", "Merge"); + assert!(plain.contains("Ünïcödé Ñämé 🎉")); + assert!(html.contains("Ünïcödé Ñämé 🎉")); + } + + // ── format_error_notification ───────────────────────────────────────────── + + #[test] + fn format_error_notification_with_story_name() { + let (plain, html) = format_error_notification( + "262_story_bot_errors", + Some("Bot error notifications"), + "merge conflict in src/main.rs", + ); + assert_eq!( + plain, + "\u{274c} #262 Bot error notifications \u{2014} merge conflict in src/main.rs" + ); + assert_eq!( + html, + "\u{274c} #262 Bot error notifications \u{2014} merge conflict in src/main.rs" + ); + } + + #[test] + fn format_error_notification_without_story_name_falls_back_to_item_id() { + let (plain, _html) = format_error_notification("42_bug_fix_thing", None, "tests failed"); + assert_eq!(plain, "\u{274c} #42 42_bug_fix_thing \u{2014} tests failed"); + } + + #[test] + fn format_error_notification_non_numeric_id_uses_full_id() { + let (plain, _html) = + format_error_notification("abc_story_thing", Some("Some Story"), "clippy errors"); + assert_eq!( + plain, + "\u{274c} #abc_story_thing Some Story \u{2014} clippy errors" + ); + } + + #[test] + fn format_error_notification_long_reason_preserved() { + let long_reason = "x".repeat(500); + let (plain, _html) = format_error_notification("1_story_foo", None, &long_reason); + assert!(plain.contains(&long_reason)); + } + + #[test] + fn format_error_notification_unicode_reason() { + let (plain, _html) = + format_error_notification("5_story_foo", Some("Foo"), "错误:合并冲突"); + assert!(plain.contains("错误:合并冲突")); + } + + // ── format_blocked_notification ─────────────────────────────────────────── + + #[test] + fn format_blocked_notification_with_story_name() { + let (plain, html) = format_blocked_notification( + "425_story_blocking_reason", + Some("Blocking Reason Story"), + "Retry limit exceeded (3/3) at coder stage", + ); + assert_eq!( + plain, + "\u{1f6ab} #425 Blocking Reason Story \u{2014} BLOCKED: Retry limit exceeded (3/3) at coder stage" + ); + assert_eq!( + html, + "\u{1f6ab} #425 Blocking Reason Story \u{2014} BLOCKED: Retry limit exceeded (3/3) at coder stage" + ); + } + + #[test] + fn format_blocked_notification_falls_back_to_item_id() { + let (plain, _html) = format_blocked_notification("42_story_thing", None, "empty diff"); + assert_eq!( + plain, + "\u{1f6ab} #42 42_story_thing \u{2014} BLOCKED: empty diff" + ); + } + + #[test] + fn format_blocked_notification_unicode_reason() { + let (plain, _html) = format_blocked_notification("3_story_x", Some("X"), "理由:空の差分"); + assert!(plain.contains("BLOCKED: 理由:空の差分")); + } + + // ── format_rate_limit_notification ──────────────────────────────────────── + + #[test] + fn format_rate_limit_notification_includes_agent_and_story() { + let (plain, html) = + format_rate_limit_notification("365_story_my_feature", Some("My Feature"), "coder-2"); + assert_eq!( + plain, + "\u{26a0}\u{fe0f} #365 My Feature \u{2014} coder-2 hit an API rate limit" + ); + assert_eq!( + html, + "\u{26a0}\u{fe0f} #365 My Feature \u{2014} coder-2 hit an API rate limit" + ); + } + + #[test] + fn format_rate_limit_notification_falls_back_to_item_id() { + let (plain, _html) = format_rate_limit_notification("42_story_thing", None, "coder-1"); + assert_eq!( + plain, + "\u{26a0}\u{fe0f} #42 42_story_thing \u{2014} coder-1 hit an API rate limit" + ); + } + + #[test] + fn format_rate_limit_notification_unicode_agent_name() { + let (plain, _html) = format_rate_limit_notification("9_story_foo", Some("Foo"), "агент-1"); + assert!(plain.contains("агент-1")); + assert!(plain.contains("hit an API rate limit")); + } +} diff --git a/server/src/chat/transport/matrix/notifications.rs b/server/src/service/notifications/io.rs similarity index 70% rename from server/src/chat/transport/matrix/notifications.rs rename to server/src/service/notifications/io.rs index 3e389627..04556172 100644 --- a/server/src/chat/transport/matrix/notifications.rs +++ b/server/src/service/notifications/io.rs @@ -1,7 +1,8 @@ -//! Stage transition notifications for Matrix rooms. +//! I/O side of the notifications service. //! -//! Subscribes to [`WatcherEvent`] broadcasts and posts a notification to all -//! configured Matrix rooms whenever a work item moves between pipeline stages. +//! This is the **only** file inside `service/notifications/` that may perform +//! side effects: reading from the CRDT content store, loading configuration, +//! and spawning the background listener task. use crate::chat::ChatTransport; use crate::config::ProjectConfig; @@ -11,29 +12,16 @@ use crate::slog; use std::collections::HashMap; use std::path::{Path, PathBuf}; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Instant; use tokio::sync::broadcast; -/// Human-readable display name for a pipeline stage directory. -pub fn stage_display_name(stage: &str) -> &'static str { - match stage { - "1_backlog" => "Backlog", - "2_current" => "Current", - "3_qa" => "QA", - "4_merge" => "Merge", - "5_done" => "Done", - "6_archived" => "Archived", - _ => "Unknown", - } -} - -/// Extract the numeric story number from an item ID like `"261_story_slug"`. -pub fn extract_story_number(item_id: &str) -> Option<&str> { - item_id - .split('_') - .next() - .filter(|s| !s.is_empty() && s.chars().all(|c| c.is_ascii_digit())) -} +use super::events::classify; +use super::filter::{STAGE_TRANSITION_DEBOUNCE, should_send_rate_limit}; +use super::format::{ + format_blocked_notification, format_error_notification, format_rate_limit_notification, + format_stage_notification, stage_display_name, +}; +use super::route::rooms_for_notification; /// Read the story name from the CRDT content store's YAML front matter. /// @@ -44,93 +32,13 @@ pub fn read_story_name(_project_root: &Path, _stage: &str, item_id: &str) -> Opt meta.name } -/// Format a stage transition notification message. -/// -/// Returns `(plain_text, html)` suitable for `RoomMessageEventContent::text_html`. -pub fn format_stage_notification( - item_id: &str, - story_name: Option<&str>, - from_stage: &str, - to_stage: &str, -) -> (String, String) { - let number = extract_story_number(item_id).unwrap_or(item_id); - let name = story_name.unwrap_or(item_id); - - let prefix = if to_stage == "Done" { "\u{1f389} " } else { "" }; - let plain = format!("{prefix}#{number} {name} \u{2014} {from_stage} \u{2192} {to_stage}"); - let html = format!( - "{prefix}#{number} {name} \u{2014} {from_stage} \u{2192} {to_stage}" - ); - (plain, html) -} - -/// Format an error notification message for a story failure. -/// -/// Returns `(plain_text, html)` suitable for `RoomMessageEventContent::text_html`. -pub fn format_error_notification( - item_id: &str, - story_name: Option<&str>, - reason: &str, -) -> (String, String) { - let number = extract_story_number(item_id).unwrap_or(item_id); - let name = story_name.unwrap_or(item_id); - - let plain = format!("\u{274c} #{number} {name} \u{2014} {reason}"); - let html = format!("\u{274c} #{number} {name} \u{2014} {reason}"); - (plain, html) -} - -/// Look up a story name from the CRDT content store. +/// Look up a story name from the CRDT content store regardless of stage. /// /// Used for events (like rate-limit warnings) that arrive without a known stage. fn find_story_name_any_stage(project_root: &Path, item_id: &str) -> Option { read_story_name(project_root, "", item_id) } -/// Format a blocked-story notification message. -/// -/// Returns `(plain_text, html)` suitable for `ChatTransport::send_message`. -pub fn format_blocked_notification( - item_id: &str, - story_name: Option<&str>, - reason: &str, -) -> (String, String) { - let number = extract_story_number(item_id).unwrap_or(item_id); - let name = story_name.unwrap_or(item_id); - - let plain = format!("\u{1f6ab} #{number} {name} \u{2014} BLOCKED: {reason}"); - let html = - format!("\u{1f6ab} #{number} {name} \u{2014} BLOCKED: {reason}"); - (plain, html) -} - -/// Minimum time between rate-limit notifications for the same agent. -const RATE_LIMIT_DEBOUNCE: Duration = Duration::from_secs(60); - -/// Window during which rapid stage transitions for the same item are coalesced -/// into a single notification (only the final stage is announced). -const STAGE_TRANSITION_DEBOUNCE: Duration = Duration::from_millis(200); - -/// Format a rate limit warning notification message. -/// -/// Returns `(plain_text, html)` suitable for `ChatTransport::send_message`. -pub fn format_rate_limit_notification( - item_id: &str, - story_name: Option<&str>, - agent_name: &str, -) -> (String, String) { - let number = extract_story_number(item_id).unwrap_or(item_id); - let name = story_name.unwrap_or(item_id); - - let plain = - format!("\u{26a0}\u{fe0f} #{number} {name} \u{2014} {agent_name} hit an API rate limit"); - let html = format!( - "\u{26a0}\u{fe0f} #{number} {name} \u{2014} \ - {agent_name} hit an API rate limit" - ); - (plain, html) -} - /// Spawn a background task that listens for watcher events and posts /// stage-transition notifications to all configured rooms via the /// [`ChatTransport`] abstraction. @@ -184,7 +92,7 @@ pub fn spawn_notification_listener( to_display, ); slog!("[bot] Sending stage notification: {plain}"); - for room_id in &get_room_ids() { + for room_id in &rooms_for_notification(&get_room_ids) { if let Err(e) = transport.send_message(room_id, &plain, &html).await { slog!("[bot] Failed to send notification to {room_id}: {e}"); } @@ -194,139 +102,11 @@ pub fn spawn_notification_listener( continue; } - match recv_result.unwrap() { - Ok(WatcherEvent::WorkItem { - ref stage, - ref item_id, - ref from_stage, - .. - }) => { - // Only notify for transitions with a known source stage. - // Synthetic events (reassign, creation) have from_stage=None - // and must be skipped — the old inferred_from_stage fallback - // produced wrong notifications for stories that skipped stages - // (e.g. "QA → Merge" when QA was never entered). - let from_display = from_stage.as_deref().map(stage_display_name); - let Some(from_display) = from_display else { - continue; // creation or unknown transition — skip - }; - - // Look up the story name in the expected stage directory; fall - // back to a full search so stale events still show the name (AC1). - let story_name = read_story_name(&project_root, stage, item_id) - .or_else(|| find_story_name_any_stage(&project_root, item_id)); - - // Buffer the transition. If this item_id is already pending (rapid - // succession), update to_stage_key to the latest destination while - // preserving the original from_display (AC2). - pending_transitions - .entry(item_id.clone()) - .and_modify(|e| { - e.1 = stage.clone(); - if story_name.is_some() { - e.2 = story_name.clone(); - } - }) - .or_insert_with(|| (from_display.to_string(), stage.clone(), story_name)); - - // Start or extend the debounce window. - flush_deadline = Some(tokio::time::Instant::now() + STAGE_TRANSITION_DEBOUNCE); - } - Ok(WatcherEvent::MergeFailure { - ref story_id, - ref reason, - }) => { - let story_name = read_story_name(&project_root, "4_merge", story_id); - let (plain, html) = - format_error_notification(story_id, story_name.as_deref(), reason); - - slog!("[bot] Sending error notification: {plain}"); - - for room_id in &get_room_ids() { - if let Err(e) = transport.send_message(room_id, &plain, &html).await { - slog!("[bot] Failed to send error notification to {room_id}: {e}"); - } - } - } - Ok(WatcherEvent::RateLimitWarning { - ref story_id, - ref agent_name, - }) => { - if !config.rate_limit_notifications { - slog!( - "[bot] RateLimitWarning suppressed by config for \ - {story_id}:{agent_name}" - ); - continue; - } - // Debounce: skip if we sent a notification for this agent - // within the last RATE_LIMIT_DEBOUNCE seconds. - let debounce_key = format!("{story_id}:{agent_name}"); - let now = Instant::now(); - if let Some(&last) = rate_limit_last_notified.get(&debounce_key) - && now.duration_since(last) < RATE_LIMIT_DEBOUNCE - { - slog!( - "[bot] Rate-limit notification debounced for \ - {story_id}:{agent_name}" - ); - continue; - } - rate_limit_last_notified.insert(debounce_key, now); - - let story_name = find_story_name_any_stage(&project_root, story_id); - let (plain, html) = - format_rate_limit_notification(story_id, story_name.as_deref(), agent_name); - - slog!("[bot] Sending rate-limit notification: {plain}"); - - for room_id in &get_room_ids() { - if let Err(e) = transport.send_message(room_id, &plain, &html).await { - slog!( - "[bot] Failed to send rate-limit notification \ - to {room_id}: {e}" - ); - } - } - } - Ok(WatcherEvent::StoryBlocked { - ref story_id, - ref reason, - }) => { - let story_name = find_story_name_any_stage(&project_root, story_id); - let (plain, html) = - format_blocked_notification(story_id, story_name.as_deref(), reason); - - slog!("[bot] Sending blocked notification: {plain}"); - - for room_id in &get_room_ids() { - if let Err(e) = transport.send_message(room_id, &plain, &html).await { - slog!("[bot] Failed to send blocked notification to {room_id}: {e}"); - } - } - } - Ok(WatcherEvent::RateLimitHardBlock { - ref story_id, - ref agent_name, - reset_at, - }) => { - // Log server-side for debugging; do NOT post to Matrix. - // Hard-block auto-resume is normal operation — the status - // command already surfaces rate-limit state via emoji. - slog!( - "[bot] Rate-limit hard block for {story_id}/{agent_name}, \ - auto-resume at {reset_at}" - ); - } - Ok(WatcherEvent::ConfigChanged) => { - // Hot-reload: pick up any changes to rate_limit_notifications. - if let Ok(new_cfg) = ProjectConfig::load(&project_root) { - config = new_cfg; - } - } - Ok(_) => {} // Ignore other events + let event = match recv_result.unwrap() { + Ok(ev) => ev, Err(broadcast::error::RecvError::Lagged(n)) => { slog!("[bot] Notification listener lagged, skipped {n} events"); + continue; } Err(broadcast::error::RecvError::Closed) => { slog!("[bot] Watcher channel closed, stopping notification listener"); @@ -342,7 +122,7 @@ pub fn spawn_notification_listener( to_display, ); slog!("[bot] Sending stage notification: {plain}"); - for room_id in &get_room_ids() { + for room_id in &rooms_for_notification(&get_room_ids) { if let Err(e) = transport.send_message(room_id, &plain, &html).await { slog!("[bot] Failed to send notification to {room_id}: {e}"); } @@ -350,6 +130,143 @@ pub fn spawn_notification_listener( } break; } + }; + + use super::events::EventAction; + match classify(&event) { + EventAction::StageTransition => { + // WorkItem with a known from_stage — extract the fields. + let WatcherEvent::WorkItem { + ref stage, + ref item_id, + ref from_stage, + .. + } = event + else { + continue; + }; + let from_display = stage_display_name(from_stage.as_deref().unwrap_or("")); + + // Look up the story name in the expected stage directory; fall + // back to a full search so stale events still show the name. + let story_name = read_story_name(&project_root, stage, item_id) + .or_else(|| find_story_name_any_stage(&project_root, item_id)); + + // Buffer the transition. If this item_id is already pending (rapid + // succession), update to_stage_key to the latest destination while + // preserving the original from_display. + pending_transitions + .entry(item_id.clone()) + .and_modify(|e| { + e.1 = stage.clone(); + if story_name.is_some() { + e.2 = story_name.clone(); + } + }) + .or_insert_with(|| (from_display.to_string(), stage.clone(), story_name)); + + // Start or extend the debounce window. + flush_deadline = Some(tokio::time::Instant::now() + STAGE_TRANSITION_DEBOUNCE); + } + EventAction::MergeFailure => { + let WatcherEvent::MergeFailure { + ref story_id, + ref reason, + } = event + else { + continue; + }; + let story_name = read_story_name(&project_root, "4_merge", story_id); + let (plain, html) = + format_error_notification(story_id, story_name.as_deref(), reason); + slog!("[bot] Sending error notification: {plain}"); + for room_id in &rooms_for_notification(&get_room_ids) { + if let Err(e) = transport.send_message(room_id, &plain, &html).await { + slog!("[bot] Failed to send error notification to {room_id}: {e}"); + } + } + } + EventAction::RateLimitWarning => { + let WatcherEvent::RateLimitWarning { + ref story_id, + ref agent_name, + } = event + else { + continue; + }; + if !config.rate_limit_notifications { + slog!( + "[bot] RateLimitWarning suppressed by config for \ + {story_id}:{agent_name}" + ); + continue; + } + let debounce_key = format!("{story_id}:{agent_name}"); + let now = Instant::now(); + if !should_send_rate_limit( + rate_limit_last_notified.get(&debounce_key).copied(), + now, + ) { + slog!( + "[bot] Rate-limit notification debounced for \ + {story_id}:{agent_name}" + ); + continue; + } + rate_limit_last_notified.insert(debounce_key, now); + let story_name = find_story_name_any_stage(&project_root, story_id); + let (plain, html) = + format_rate_limit_notification(story_id, story_name.as_deref(), agent_name); + slog!("[bot] Sending rate-limit notification: {plain}"); + for room_id in &rooms_for_notification(&get_room_ids) { + if let Err(e) = transport.send_message(room_id, &plain, &html).await { + slog!( + "[bot] Failed to send rate-limit notification \ + to {room_id}: {e}" + ); + } + } + } + EventAction::StoryBlocked => { + let WatcherEvent::StoryBlocked { + ref story_id, + ref reason, + } = event + else { + continue; + }; + let story_name = find_story_name_any_stage(&project_root, story_id); + let (plain, html) = + format_blocked_notification(story_id, story_name.as_deref(), reason); + slog!("[bot] Sending blocked notification: {plain}"); + for room_id in &rooms_for_notification(&get_room_ids) { + if let Err(e) = transport.send_message(room_id, &plain, &html).await { + slog!("[bot] Failed to send blocked notification to {room_id}: {e}"); + } + } + } + EventAction::LogOnly => { + // Hard-block: log server-side for debugging; do NOT post to chat. + // Hard-block auto-resume is normal operation — the status command + // already surfaces rate-limit state via emoji. + if let WatcherEvent::RateLimitHardBlock { + ref story_id, + ref agent_name, + reset_at, + } = event + { + slog!( + "[bot] Rate-limit hard block for {story_id}/{agent_name}, \ + auto-resume at {reset_at}" + ); + } + } + EventAction::ReloadConfig => { + if let Ok(new_cfg) = ProjectConfig::load(&project_root) { + config = new_cfg; + } + } + EventAction::Skip => {} } } }); @@ -630,37 +547,6 @@ mod tests { assert_eq!(calls.len(), 0, "No rooms means no notifications"); } - // ── stage_display_name ────────────────────────────────────────────────── - - #[test] - fn stage_display_name_maps_all_known_stages() { - assert_eq!(stage_display_name("1_backlog"), "Backlog"); - assert_eq!(stage_display_name("2_current"), "Current"); - assert_eq!(stage_display_name("3_qa"), "QA"); - assert_eq!(stage_display_name("4_merge"), "Merge"); - assert_eq!(stage_display_name("5_done"), "Done"); - assert_eq!(stage_display_name("6_archived"), "Archived"); - assert_eq!(stage_display_name("unknown"), "Unknown"); - } - - // ── extract_story_number ──────────────────────────────────────────────── - - #[test] - fn extract_story_number_parses_numeric_prefix() { - assert_eq!( - extract_story_number("261_story_bot_notifications"), - Some("261") - ); - assert_eq!(extract_story_number("42_bug_fix_thing"), Some("42")); - assert_eq!(extract_story_number("1_spike_research"), Some("1")); - } - - #[test] - fn extract_story_number_returns_none_for_non_numeric() { - assert_eq!(extract_story_number("abc_story_thing"), None); - assert_eq!(extract_story_number(""), None); - } - // ── read_story_name ───────────────────────────────────────────────────── #[test] @@ -699,69 +585,6 @@ mod tests { assert_eq!(name, None); } - // ── format_error_notification ──────────────────────────────────────────── - - #[test] - fn format_error_notification_with_story_name() { - let (plain, html) = format_error_notification( - "262_story_bot_errors", - Some("Bot error notifications"), - "merge conflict in src/main.rs", - ); - assert_eq!( - plain, - "\u{274c} #262 Bot error notifications \u{2014} merge conflict in src/main.rs" - ); - assert_eq!( - html, - "\u{274c} #262 Bot error notifications \u{2014} merge conflict in src/main.rs" - ); - } - - #[test] - fn format_error_notification_without_story_name_falls_back_to_item_id() { - let (plain, _html) = format_error_notification("42_bug_fix_thing", None, "tests failed"); - assert_eq!(plain, "\u{274c} #42 42_bug_fix_thing \u{2014} tests failed"); - } - - #[test] - fn format_error_notification_non_numeric_id_uses_full_id() { - let (plain, _html) = - format_error_notification("abc_story_thing", Some("Some Story"), "clippy errors"); - assert_eq!( - plain, - "\u{274c} #abc_story_thing Some Story \u{2014} clippy errors" - ); - } - - // ── format_blocked_notification ───────────────────────────────────────── - - #[test] - fn format_blocked_notification_with_story_name() { - let (plain, html) = format_blocked_notification( - "425_story_blocking_reason", - Some("Blocking Reason Story"), - "Retry limit exceeded (3/3) at coder stage", - ); - assert_eq!( - plain, - "\u{1f6ab} #425 Blocking Reason Story \u{2014} BLOCKED: Retry limit exceeded (3/3) at coder stage" - ); - assert_eq!( - html, - "\u{1f6ab} #425 Blocking Reason Story \u{2014} BLOCKED: Retry limit exceeded (3/3) at coder stage" - ); - } - - #[test] - fn format_blocked_notification_falls_back_to_item_id() { - let (plain, _html) = format_blocked_notification("42_story_thing", None, "empty diff"); - assert_eq!( - plain, - "\u{1f6ab} #42 42_story_thing \u{2014} BLOCKED: empty diff" - ); - } - // ── spawn_notification_listener: StoryBlocked ─────────────────────────── /// AC1: when a StoryBlocked event arrives, send_message is called with a @@ -842,88 +665,6 @@ mod tests { assert_eq!(calls.len(), 0, "No rooms means no notifications"); } - // ── format_rate_limit_notification ───────────────────────────────────── - - #[test] - fn format_rate_limit_notification_includes_agent_and_story() { - let (plain, html) = - format_rate_limit_notification("365_story_my_feature", Some("My Feature"), "coder-2"); - assert_eq!( - plain, - "\u{26a0}\u{fe0f} #365 My Feature \u{2014} coder-2 hit an API rate limit" - ); - assert_eq!( - html, - "\u{26a0}\u{fe0f} #365 My Feature \u{2014} coder-2 hit an API rate limit" - ); - } - - #[test] - fn format_rate_limit_notification_falls_back_to_item_id() { - let (plain, _html) = format_rate_limit_notification("42_story_thing", None, "coder-1"); - assert_eq!( - plain, - "\u{26a0}\u{fe0f} #42 42_story_thing \u{2014} coder-1 hit an API rate limit" - ); - } - - // ── format_stage_notification ─────────────────────────────────────────── - - #[test] - fn format_notification_done_stage_includes_party_emoji() { - let (plain, html) = - format_stage_notification("353_story_done", Some("Done Story"), "Merge", "Done"); - assert_eq!( - plain, - "\u{1f389} #353 Done Story \u{2014} Merge \u{2192} Done" - ); - assert_eq!( - html, - "\u{1f389} #353 Done Story \u{2014} Merge \u{2192} Done" - ); - } - - #[test] - fn format_notification_non_done_stage_has_no_emoji() { - let (plain, _html) = - format_stage_notification("42_story_thing", Some("Some Story"), "Backlog", "Current"); - assert!(!plain.contains("\u{1f389}")); - } - - #[test] - fn format_notification_with_story_name() { - let (plain, html) = format_stage_notification( - "261_story_bot_notifications", - Some("Bot notifications"), - "Upcoming", - "Current", - ); - assert_eq!( - plain, - "#261 Bot notifications \u{2014} Upcoming \u{2192} Current" - ); - assert_eq!( - html, - "#261 Bot notifications \u{2014} Upcoming \u{2192} Current" - ); - } - - #[test] - fn format_notification_without_story_name_falls_back_to_item_id() { - let (plain, _html) = format_stage_notification("42_bug_fix_thing", None, "Current", "QA"); - assert_eq!(plain, "#42 42_bug_fix_thing \u{2014} Current \u{2192} QA"); - } - - #[test] - fn format_notification_non_numeric_id_uses_full_id() { - let (plain, _html) = - format_stage_notification("abc_story_thing", Some("Some Story"), "QA", "Merge"); - assert_eq!( - plain, - "#abc_story_thing Some Story \u{2014} QA \u{2192} Merge" - ); - } - // ── rate_limit_notifications config flag ───────────────────────────────── /// AC1+AC2: when rate_limit_notifications = false in project.toml, diff --git a/server/src/service/notifications/mod.rs b/server/src/service/notifications/mod.rs new file mode 100644 index 00000000..b9f1fe43 --- /dev/null +++ b/server/src/service/notifications/mod.rs @@ -0,0 +1,89 @@ +//! Notifications service — pipeline-event fan-out to chat transports. +//! +//! Subscribes to [`WatcherEvent`] broadcasts and posts human-readable messages +//! to all configured chat rooms whenever a work item moves through the pipeline. +//! +//! Follows service-module conventions: +//! - `mod.rs` (this file) — public API, typed [`Error`] type, orchestration +//! - `io.rs` — the ONLY place that performs side effects (DB reads, config +//! loads, `tokio::spawn`) +//! - `format.rs` — pure: message formatting functions +//! - `filter.rs` — pure: debounce constants and suppression predicates +//! - `events.rs` — pure: WatcherEvent classification / event mapping +//! - `route.rs` — pure: room-routing decisions + +pub(super) mod events; +pub(super) mod filter; +pub(super) mod format; +pub(super) mod io; +pub(super) mod route; + +pub use format::{ + format_blocked_notification, format_error_notification, format_stage_notification, + stage_display_name, +}; +pub use io::spawn_notification_listener; + +// ── Error type ──────────────────────────────────────────────────────────────── + +/// Typed errors returned by `service::notifications` operations. +/// +/// HTTP handlers and bot commands may map these to user-facing messages. +#[derive(Debug)] +#[allow(dead_code)] +pub enum Error { + /// The incoming event type is not recognised or not supported. + UnknownEvent(String), + /// A message could not be formatted for delivery (e.g. malformed input). + RenderFailure(String), + /// The underlying chat transport rejected the send operation. + TransportSendFailure(String), + /// Required configuration (room IDs, credentials) is absent. + ConfigMissing(String), +} + +impl std::fmt::Display for Error { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::UnknownEvent(msg) => write!(f, "Unknown event: {msg}"), + Self::RenderFailure(msg) => write!(f, "Render failure: {msg}"), + Self::TransportSendFailure(msg) => write!(f, "Transport send failure: {msg}"), + Self::ConfigMissing(msg) => write!(f, "Config missing: {msg}"), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + // ── Error Display ───────────────────────────────────────────────────────── + + #[test] + fn error_unknown_event_display() { + let e = Error::UnknownEvent("bad_event_type".to_string()); + assert!(e.to_string().contains("Unknown event")); + assert!(e.to_string().contains("bad_event_type")); + } + + #[test] + fn error_render_failure_display() { + let e = Error::RenderFailure("malformed input".to_string()); + assert!(e.to_string().contains("Render failure")); + assert!(e.to_string().contains("malformed input")); + } + + #[test] + fn error_transport_send_failure_display() { + let e = Error::TransportSendFailure("connection refused".to_string()); + assert!(e.to_string().contains("Transport send failure")); + assert!(e.to_string().contains("connection refused")); + } + + #[test] + fn error_config_missing_display() { + let e = Error::ConfigMissing("room_id not set".to_string()); + assert!(e.to_string().contains("Config missing")); + assert!(e.to_string().contains("room_id not set")); + } +} diff --git a/server/src/service/notifications/route.rs b/server/src/service/notifications/route.rs new file mode 100644 index 00000000..10000074 --- /dev/null +++ b/server/src/service/notifications/route.rs @@ -0,0 +1,42 @@ +//! Room-routing decisions for notifications. +//! +//! Pure functions that determine which destination room IDs should receive +//! a given notification. Currently all notification kinds are broadcast to +//! all registered rooms; this module is the single location to change that +//! policy if per-event routing is needed in the future. + +/// Return the rooms that should receive a notification. +/// +/// `get_room_ids` is called once per notification to obtain the current list +/// of destination room IDs. Passing a closure (rather than a static slice) +/// allows callers to use a runtime-mutable set, e.g. WhatsApp ambient senders. +/// +/// All currently supported event kinds are broadcast to every room returned +/// by the closure. +pub fn rooms_for_notification(get_room_ids: &impl Fn() -> Vec) -> Vec { + get_room_ids() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn returns_all_rooms_from_closure() { + let rooms = rooms_for_notification(&|| vec!["room1".to_string(), "room2".to_string()]); + assert_eq!(rooms, vec!["room1".to_string(), "room2".to_string()]); + } + + #[test] + fn returns_empty_when_no_rooms_registered() { + let rooms = rooms_for_notification(&Vec::new); + assert!(rooms.is_empty()); + } + + #[test] + fn returns_single_room() { + let rooms = rooms_for_notification(&|| vec!["!abc:example.org".to_string()]); + assert_eq!(rooms.len(), 1); + assert_eq!(rooms[0], "!abc:example.org"); + } +}