storkit: merge 393_story_pipeline_stage_notifications_for_whatsapp_and_slack_transports

This commit is contained in:
dave
2026-03-25 15:35:19 +00:00
parent f5024b2648
commit 5dd8feb75c
3 changed files with 130 additions and 17 deletions
+1 -1
View File
@@ -437,7 +437,7 @@ pub async fn run_bot(
notif_room_ids.iter().map(|r| r.to_string()).collect();
super::notifications::spawn_notification_listener(
Arc::clone(&transport),
notif_room_id_strings,
move || notif_room_id_strings.clone(),
watcher_rx,
notif_project_root,
);
+100 -16
View File
@@ -142,9 +142,14 @@ pub fn format_rate_limit_notification(
/// Spawn a background task that listens for watcher events and posts
/// stage-transition notifications to all configured rooms via the
/// [`ChatTransport`] abstraction.
///
/// `get_room_ids` is called on each notification to obtain the current list of
/// destination room IDs. Pass a closure that returns a static list for Matrix
/// and Slack, or one that reads from a runtime `Arc<Mutex<HashSet<String>>>`
/// for WhatsApp ambient senders.
pub fn spawn_notification_listener(
transport: Arc<dyn ChatTransport>,
room_ids: Vec<String>,
get_room_ids: impl Fn() -> Vec<String> + Send + 'static,
watcher_rx: broadcast::Receiver<WatcherEvent>,
project_root: PathBuf,
) {
@@ -175,12 +180,12 @@ pub fn spawn_notification_listener(
to_display,
);
slog!("[matrix-bot] Sending stage notification: {plain}");
slog!("[bot] Sending stage notification: {plain}");
for room_id in &room_ids {
for room_id in &get_room_ids() {
if let Err(e) = transport.send_message(room_id, &plain, &html).await {
slog!(
"[matrix-bot] Failed to send notification to {room_id}: {e}"
"[bot] Failed to send notification to {room_id}: {e}"
);
}
}
@@ -197,12 +202,12 @@ pub fn spawn_notification_listener(
reason,
);
slog!("[matrix-bot] Sending error notification: {plain}");
slog!("[bot] Sending error notification: {plain}");
for room_id in &room_ids {
for room_id in &get_room_ids() {
if let Err(e) = transport.send_message(room_id, &plain, &html).await {
slog!(
"[matrix-bot] Failed to send error notification to {room_id}: {e}"
"[bot] Failed to send error notification to {room_id}: {e}"
);
}
}
@@ -219,7 +224,7 @@ pub fn spawn_notification_listener(
&& now.duration_since(last) < RATE_LIMIT_DEBOUNCE
{
slog!(
"[matrix-bot] Rate-limit notification debounced for \
"[bot] Rate-limit notification debounced for \
{story_id}:{agent_name}"
);
continue;
@@ -233,12 +238,12 @@ pub fn spawn_notification_listener(
agent_name,
);
slog!("[matrix-bot] Sending rate-limit notification: {plain}");
slog!("[bot] Sending rate-limit notification: {plain}");
for room_id in &room_ids {
for room_id in &get_room_ids() {
if let Err(e) = transport.send_message(room_id, &plain, &html).await {
slog!(
"[matrix-bot] Failed to send rate-limit notification \
"[bot] Failed to send rate-limit notification \
to {room_id}: {e}"
);
}
@@ -247,12 +252,12 @@ pub fn spawn_notification_listener(
Ok(_) => {} // Ignore non-work-item events
Err(broadcast::error::RecvError::Lagged(n)) => {
slog!(
"[matrix-bot] Notification listener lagged, skipped {n} events"
"[bot] Notification listener lagged, skipped {n} events"
);
}
Err(broadcast::error::RecvError::Closed) => {
slog!(
"[matrix-bot] Watcher channel closed, stopping notification listener"
"[bot] Watcher channel closed, stopping notification listener"
);
break;
}
@@ -319,7 +324,7 @@ mod tests {
spawn_notification_listener(
transport,
vec!["!room123:example.org".to_string()],
|| vec!["!room123:example.org".to_string()],
watcher_rx,
tmp.path().to_path_buf(),
);
@@ -353,7 +358,7 @@ mod tests {
spawn_notification_listener(
transport,
vec!["!room1:example.org".to_string()],
|| vec!["!room1:example.org".to_string()],
watcher_rx,
tmp.path().to_path_buf(),
);
@@ -383,7 +388,7 @@ mod tests {
spawn_notification_listener(
transport,
vec!["!room1:example.org".to_string()],
|| vec!["!room1:example.org".to_string()],
watcher_rx,
tmp.path().to_path_buf(),
);
@@ -403,6 +408,85 @@ mod tests {
assert_eq!(calls.len(), 2, "Different agents should each trigger a notification");
}
// ── dynamic room IDs (WhatsApp ambient_rooms pattern) ───────────────────
/// Notifications are sent to the rooms returned by the closure at
/// notification time, not at listener-spawn time. This verifies that a
/// closure backed by a runtime set (e.g. WhatsApp ambient_rooms) delivers
/// messages to the rooms present when the event fires.
#[tokio::test]
async fn stage_notification_uses_dynamic_room_ids() {
let tmp = tempfile::tempdir().unwrap();
let stage_dir = tmp.path().join(".storkit").join("work").join("3_qa");
std::fs::create_dir_all(&stage_dir).unwrap();
std::fs::write(
stage_dir.join("10_story_foo.md"),
"---\nname: Foo Story\n---\n",
)
.unwrap();
let (watcher_tx, watcher_rx) = broadcast::channel::<WatcherEvent>(16);
let (transport, calls) = MockTransport::new();
let rooms: Arc<std::sync::Mutex<std::collections::HashSet<String>>> =
Arc::new(std::sync::Mutex::new(std::collections::HashSet::new()));
let rooms_for_closure = Arc::clone(&rooms);
spawn_notification_listener(
transport,
move || rooms_for_closure.lock().unwrap().iter().cloned().collect(),
watcher_rx,
tmp.path().to_path_buf(),
);
// Add a room after the listener is spawned (simulates a user messaging first).
rooms.lock().unwrap().insert("phone:+15551234567".to_string());
watcher_tx.send(WatcherEvent::WorkItem {
stage: "3_qa".to_string(),
item_id: "10_story_foo".to_string(),
action: "qa".to_string(),
commit_msg: "storkit: qa 10_story_foo".to_string(),
}).unwrap();
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let calls = calls.lock().unwrap();
assert_eq!(calls.len(), 1, "Should deliver to the dynamically added room");
assert_eq!(calls[0].0, "phone:+15551234567");
assert!(calls[0].1.contains("10"), "plain should contain story number");
assert!(calls[0].1.contains("Foo Story"), "plain should contain story name");
}
/// When no rooms are registered (e.g. no WhatsApp users have messaged yet),
/// no notifications are sent and the listener does not panic.
#[tokio::test]
async fn stage_notification_with_no_rooms_is_silent() {
let tmp = tempfile::tempdir().unwrap();
let (watcher_tx, watcher_rx) = broadcast::channel::<WatcherEvent>(16);
let (transport, calls) = MockTransport::new();
spawn_notification_listener(
transport,
|| vec![],
watcher_rx,
tmp.path().to_path_buf(),
);
watcher_tx.send(WatcherEvent::WorkItem {
stage: "3_qa".to_string(),
item_id: "10_story_foo".to_string(),
action: "qa".to_string(),
commit_msg: "storkit: qa 10_story_foo".to_string(),
}).unwrap();
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let calls = calls.lock().unwrap();
assert_eq!(calls.len(), 0, "No rooms means no notifications");
}
// ── stage_display_name ──────────────────────────────────────────────────
#[test]
+29
View File
@@ -250,6 +250,10 @@ async fn main() -> Result<(), std::io::Error> {
// Clone watcher_tx for the Matrix bot before it is moved into AppContext.
let watcher_tx_for_bot = watcher_tx.clone();
// Subscribe to watcher events for WhatsApp/Slack notification listeners
// before watcher_tx is moved into AppContext.
let watcher_rx_for_whatsapp = watcher_tx.subscribe();
let watcher_rx_for_slack = watcher_tx.subscribe();
// Wrap perm_rx in Arc<Mutex> so it can be shared with both the WebSocket
// handler (via AppContext) and the Matrix bot.
let perm_rx = Arc::new(tokio::sync::Mutex::new(perm_rx));
@@ -413,6 +417,31 @@ async fn main() -> Result<(), std::io::Error> {
drop(matrix_shutdown_rx);
}
// Spawn stage-transition notification listeners for WhatsApp and Slack.
// These mirror the listener that the Matrix bot spawns internally.
if let (Some(ctx), Some(root)) = (&whatsapp_ctx, &startup_root) {
let ambient_rooms = Arc::clone(&ctx.ambient_rooms);
chat::transport::matrix::notifications::spawn_notification_listener(
Arc::clone(&ctx.transport),
move || ambient_rooms.lock().unwrap().iter().cloned().collect(),
watcher_rx_for_whatsapp,
root.clone(),
);
} else {
drop(watcher_rx_for_whatsapp);
}
if let (Some(ctx), Some(root)) = (&slack_ctx, &startup_root) {
let channel_ids: Vec<String> = ctx.channel_ids.iter().cloned().collect();
chat::transport::matrix::notifications::spawn_notification_listener(
Arc::clone(&ctx.transport) as Arc<dyn crate::chat::ChatTransport>,
move || channel_ids.clone(),
watcher_rx_for_slack,
root.clone(),
);
} else {
drop(watcher_rx_for_slack);
}
// On startup:
// 1. Reconcile any stories whose agent work was committed while the server was
// offline (worktree has commits ahead of master but pipeline didn't advance).