From 5dd8feb75c1af21a15c24af4dcb0c08dec01d6cc Mon Sep 17 00:00:00 2001 From: dave Date: Wed, 25 Mar 2026 15:35:19 +0000 Subject: [PATCH] storkit: merge 393_story_pipeline_stage_notifications_for_whatsapp_and_slack_transports --- server/src/chat/transport/matrix/bot.rs | 2 +- .../chat/transport/matrix/notifications.rs | 116 +++++++++++++++--- server/src/main.rs | 29 +++++ 3 files changed, 130 insertions(+), 17 deletions(-) diff --git a/server/src/chat/transport/matrix/bot.rs b/server/src/chat/transport/matrix/bot.rs index 188c6d2a..631614a5 100644 --- a/server/src/chat/transport/matrix/bot.rs +++ b/server/src/chat/transport/matrix/bot.rs @@ -437,7 +437,7 @@ pub async fn run_bot( notif_room_ids.iter().map(|r| r.to_string()).collect(); super::notifications::spawn_notification_listener( Arc::clone(&transport), - notif_room_id_strings, + move || notif_room_id_strings.clone(), watcher_rx, notif_project_root, ); diff --git a/server/src/chat/transport/matrix/notifications.rs b/server/src/chat/transport/matrix/notifications.rs index cadaae60..ee1b51ff 100644 --- a/server/src/chat/transport/matrix/notifications.rs +++ b/server/src/chat/transport/matrix/notifications.rs @@ -142,9 +142,14 @@ pub fn format_rate_limit_notification( /// Spawn a background task that listens for watcher events and posts /// stage-transition notifications to all configured rooms via the /// [`ChatTransport`] abstraction. +/// +/// `get_room_ids` is called on each notification to obtain the current list of +/// destination room IDs. Pass a closure that returns a static list for Matrix +/// and Slack, or one that reads from a runtime `Arc>>` +/// for WhatsApp ambient senders. pub fn spawn_notification_listener( transport: Arc, - room_ids: Vec, + get_room_ids: impl Fn() -> Vec + Send + 'static, watcher_rx: broadcast::Receiver, project_root: PathBuf, ) { @@ -175,12 +180,12 @@ pub fn spawn_notification_listener( to_display, ); - slog!("[matrix-bot] Sending stage notification: {plain}"); + slog!("[bot] Sending stage notification: {plain}"); - for room_id in &room_ids { + for room_id in &get_room_ids() { if let Err(e) = transport.send_message(room_id, &plain, &html).await { slog!( - "[matrix-bot] Failed to send notification to {room_id}: {e}" + "[bot] Failed to send notification to {room_id}: {e}" ); } } @@ -197,12 +202,12 @@ pub fn spawn_notification_listener( reason, ); - slog!("[matrix-bot] Sending error notification: {plain}"); + slog!("[bot] Sending error notification: {plain}"); - for room_id in &room_ids { + for room_id in &get_room_ids() { if let Err(e) = transport.send_message(room_id, &plain, &html).await { slog!( - "[matrix-bot] Failed to send error notification to {room_id}: {e}" + "[bot] Failed to send error notification to {room_id}: {e}" ); } } @@ -219,7 +224,7 @@ pub fn spawn_notification_listener( && now.duration_since(last) < RATE_LIMIT_DEBOUNCE { slog!( - "[matrix-bot] Rate-limit notification debounced for \ + "[bot] Rate-limit notification debounced for \ {story_id}:{agent_name}" ); continue; @@ -233,12 +238,12 @@ pub fn spawn_notification_listener( agent_name, ); - slog!("[matrix-bot] Sending rate-limit notification: {plain}"); + slog!("[bot] Sending rate-limit notification: {plain}"); - for room_id in &room_ids { + for room_id in &get_room_ids() { if let Err(e) = transport.send_message(room_id, &plain, &html).await { slog!( - "[matrix-bot] Failed to send rate-limit notification \ + "[bot] Failed to send rate-limit notification \ to {room_id}: {e}" ); } @@ -247,12 +252,12 @@ pub fn spawn_notification_listener( Ok(_) => {} // Ignore non-work-item events Err(broadcast::error::RecvError::Lagged(n)) => { slog!( - "[matrix-bot] Notification listener lagged, skipped {n} events" + "[bot] Notification listener lagged, skipped {n} events" ); } Err(broadcast::error::RecvError::Closed) => { slog!( - "[matrix-bot] Watcher channel closed, stopping notification listener" + "[bot] Watcher channel closed, stopping notification listener" ); break; } @@ -319,7 +324,7 @@ mod tests { spawn_notification_listener( transport, - vec!["!room123:example.org".to_string()], + || vec!["!room123:example.org".to_string()], watcher_rx, tmp.path().to_path_buf(), ); @@ -353,7 +358,7 @@ mod tests { spawn_notification_listener( transport, - vec!["!room1:example.org".to_string()], + || vec!["!room1:example.org".to_string()], watcher_rx, tmp.path().to_path_buf(), ); @@ -383,7 +388,7 @@ mod tests { spawn_notification_listener( transport, - vec!["!room1:example.org".to_string()], + || vec!["!room1:example.org".to_string()], watcher_rx, tmp.path().to_path_buf(), ); @@ -403,6 +408,85 @@ mod tests { assert_eq!(calls.len(), 2, "Different agents should each trigger a notification"); } + // ── dynamic room IDs (WhatsApp ambient_rooms pattern) ─────────────────── + + /// Notifications are sent to the rooms returned by the closure at + /// notification time, not at listener-spawn time. This verifies that a + /// closure backed by a runtime set (e.g. WhatsApp ambient_rooms) delivers + /// messages to the rooms present when the event fires. + #[tokio::test] + async fn stage_notification_uses_dynamic_room_ids() { + let tmp = tempfile::tempdir().unwrap(); + let stage_dir = tmp.path().join(".storkit").join("work").join("3_qa"); + std::fs::create_dir_all(&stage_dir).unwrap(); + std::fs::write( + stage_dir.join("10_story_foo.md"), + "---\nname: Foo Story\n---\n", + ) + .unwrap(); + + let (watcher_tx, watcher_rx) = broadcast::channel::(16); + let (transport, calls) = MockTransport::new(); + + let rooms: Arc>> = + Arc::new(std::sync::Mutex::new(std::collections::HashSet::new())); + let rooms_for_closure = Arc::clone(&rooms); + + spawn_notification_listener( + transport, + move || rooms_for_closure.lock().unwrap().iter().cloned().collect(), + watcher_rx, + tmp.path().to_path_buf(), + ); + + // Add a room after the listener is spawned (simulates a user messaging first). + rooms.lock().unwrap().insert("phone:+15551234567".to_string()); + + watcher_tx.send(WatcherEvent::WorkItem { + stage: "3_qa".to_string(), + item_id: "10_story_foo".to_string(), + action: "qa".to_string(), + commit_msg: "storkit: qa 10_story_foo".to_string(), + }).unwrap(); + + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + let calls = calls.lock().unwrap(); + assert_eq!(calls.len(), 1, "Should deliver to the dynamically added room"); + assert_eq!(calls[0].0, "phone:+15551234567"); + assert!(calls[0].1.contains("10"), "plain should contain story number"); + assert!(calls[0].1.contains("Foo Story"), "plain should contain story name"); + } + + /// When no rooms are registered (e.g. no WhatsApp users have messaged yet), + /// no notifications are sent and the listener does not panic. + #[tokio::test] + async fn stage_notification_with_no_rooms_is_silent() { + let tmp = tempfile::tempdir().unwrap(); + + let (watcher_tx, watcher_rx) = broadcast::channel::(16); + let (transport, calls) = MockTransport::new(); + + spawn_notification_listener( + transport, + || vec![], + watcher_rx, + tmp.path().to_path_buf(), + ); + + watcher_tx.send(WatcherEvent::WorkItem { + stage: "3_qa".to_string(), + item_id: "10_story_foo".to_string(), + action: "qa".to_string(), + commit_msg: "storkit: qa 10_story_foo".to_string(), + }).unwrap(); + + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + let calls = calls.lock().unwrap(); + assert_eq!(calls.len(), 0, "No rooms means no notifications"); + } + // ── stage_display_name ────────────────────────────────────────────────── #[test] diff --git a/server/src/main.rs b/server/src/main.rs index 2d585805..44da66eb 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -250,6 +250,10 @@ async fn main() -> Result<(), std::io::Error> { // Clone watcher_tx for the Matrix bot before it is moved into AppContext. let watcher_tx_for_bot = watcher_tx.clone(); + // Subscribe to watcher events for WhatsApp/Slack notification listeners + // before watcher_tx is moved into AppContext. + let watcher_rx_for_whatsapp = watcher_tx.subscribe(); + let watcher_rx_for_slack = watcher_tx.subscribe(); // Wrap perm_rx in Arc so it can be shared with both the WebSocket // handler (via AppContext) and the Matrix bot. let perm_rx = Arc::new(tokio::sync::Mutex::new(perm_rx)); @@ -413,6 +417,31 @@ async fn main() -> Result<(), std::io::Error> { drop(matrix_shutdown_rx); } + // Spawn stage-transition notification listeners for WhatsApp and Slack. + // These mirror the listener that the Matrix bot spawns internally. + if let (Some(ctx), Some(root)) = (&whatsapp_ctx, &startup_root) { + let ambient_rooms = Arc::clone(&ctx.ambient_rooms); + chat::transport::matrix::notifications::spawn_notification_listener( + Arc::clone(&ctx.transport), + move || ambient_rooms.lock().unwrap().iter().cloned().collect(), + watcher_rx_for_whatsapp, + root.clone(), + ); + } else { + drop(watcher_rx_for_whatsapp); + } + if let (Some(ctx), Some(root)) = (&slack_ctx, &startup_root) { + let channel_ids: Vec = ctx.channel_ids.iter().cloned().collect(); + chat::transport::matrix::notifications::spawn_notification_listener( + Arc::clone(&ctx.transport) as Arc, + move || channel_ids.clone(), + watcher_rx_for_slack, + root.clone(), + ); + } else { + drop(watcher_rx_for_slack); + } + // On startup: // 1. Reconcile any stories whose agent work was committed while the server was // offline (worktree has commits ahead of master but pipeline didn't advance).