diff --git a/server/src/agents/pool/lifecycle.rs b/server/src/agents/pool/lifecycle.rs index c6584820..6a14b3d0 100644 --- a/server/src/agents/pool/lifecycle.rs +++ b/server/src/agents/pool/lifecycle.rs @@ -539,6 +539,7 @@ impl AgentPool { item_id: sid.clone(), action: "reassign".to_string(), commit_msg: String::new(), + from_stage: None, }, ); } else { diff --git a/server/src/chat/transport/matrix/notifications.rs b/server/src/chat/transport/matrix/notifications.rs index e9d152e4..064b1920 100644 --- a/server/src/chat/transport/matrix/notifications.rs +++ b/server/src/chat/transport/matrix/notifications.rs @@ -136,6 +136,10 @@ pub fn format_blocked_notification( /// Minimum time between rate-limit notifications for the same agent. const RATE_LIMIT_DEBOUNCE: Duration = Duration::from_secs(60); +/// Window during which rapid stage transitions for the same item are coalesced +/// into a single notification (only the final stage is announced). +const STAGE_TRANSITION_DEBOUNCE: Duration = Duration::from_millis(200); + /// Format a rate limit hard block notification message with scheduled resume time. /// /// Returns `(plain_text, html)` suitable for `ChatTransport::send_message`. @@ -202,36 +206,87 @@ pub fn spawn_notification_listener( // "story_id:agent_name" key, to debounce repeated warnings. let mut rate_limit_last_notified: HashMap = HashMap::new(); + // Pending stage-transition notifications, keyed by item_id. + // Value: (from_display, to_stage_key, story_name). + // Rapid successive transitions for the same item are coalesced: the + // original from_display is kept while to_stage_key is updated to the + // latest destination, so only one notification fires for the final stage. + let mut pending_transitions: HashMap)> = + HashMap::new(); + let mut flush_deadline: Option = None; + loop { - match rx.recv().await { + // Wait for the next event, or flush pending transitions when the + // debounce window expires. + let recv_result = if let Some(deadline) = flush_deadline { + tokio::time::timeout_at(deadline, rx.recv()).await.ok() + } else { + Some(rx.recv().await) + }; + + if recv_result.is_none() { + // Flush all coalesced stage-transition notifications. + for (item_id, (from_display, to_stage_key, story_name)) in + pending_transitions.drain() + { + let to_display = stage_display_name(&to_stage_key); + let (plain, html) = format_stage_notification( + &item_id, + story_name.as_deref(), + &from_display, + to_display, + ); + slog!("[bot] Sending stage notification: {plain}"); + for room_id in &get_room_ids() { + if let Err(e) = transport.send_message(room_id, &plain, &html).await { + slog!("[bot] Failed to send notification to {room_id}: {e}"); + } + } + } + flush_deadline = None; + continue; + } + + match recv_result.unwrap() { Ok(WatcherEvent::WorkItem { ref stage, ref item_id, + ref from_stage, .. }) => { - // Only notify on stage transitions, not creations. - let Some(from_display) = inferred_from_stage(stage) else { - continue; + // Determine from_display: prefer the actual from_stage recorded + // in the event (AC3); fall back to inference for synthetic events. + let from_display = from_stage + .as_deref() + .map(stage_display_name) + .or_else(|| inferred_from_stage(stage)); + let Some(from_display) = from_display else { + continue; // creation or unknown transition — skip }; - let to_display = stage_display_name(stage); - let story_name = read_story_name(&project_root, stage, item_id); - let (plain, html) = format_stage_notification( - item_id, - story_name.as_deref(), - from_display, - to_display, - ); + // Look up the story name in the expected stage directory; fall + // back to a full search so stale events still show the name (AC1). + let story_name = read_story_name(&project_root, stage, item_id) + .or_else(|| find_story_name_any_stage(&project_root, item_id)); - slog!("[bot] Sending stage notification: {plain}"); + // Buffer the transition. If this item_id is already pending (rapid + // succession), update to_stage_key to the latest destination while + // preserving the original from_display (AC2). + pending_transitions + .entry(item_id.clone()) + .and_modify(|e| { + e.1 = stage.clone(); + if story_name.is_some() { + e.2 = story_name.clone(); + } + }) + .or_insert_with(|| { + (from_display.to_string(), stage.clone(), story_name) + }); - for room_id in &get_room_ids() { - if let Err(e) = transport.send_message(room_id, &plain, &html).await { - slog!( - "[bot] Failed to send notification to {room_id}: {e}" - ); - } - } + // Start or extend the debounce window. + flush_deadline = + Some(tokio::time::Instant::now() + STAGE_TRANSITION_DEBOUNCE); } Ok(WatcherEvent::MergeFailure { ref story_id, @@ -362,6 +417,28 @@ pub fn spawn_notification_listener( slog!( "[bot] Watcher channel closed, stopping notification listener" ); + // Flush any coalesced transitions that haven't fired yet. + for (item_id, (from_display, to_stage_key, story_name)) in + pending_transitions.drain() + { + let to_display = stage_display_name(&to_stage_key); + let (plain, html) = format_stage_notification( + &item_id, + story_name.as_deref(), + &from_display, + to_display, + ); + slog!("[bot] Sending stage notification: {plain}"); + for room_id in &get_room_ids() { + if let Err(e) = + transport.send_message(room_id, &plain, &html).await + { + slog!( + "[bot] Failed to send notification to {room_id}: {e}" + ); + } + } + } break; } } @@ -550,9 +627,12 @@ mod tests { item_id: "10_story_foo".to_string(), action: "qa".to_string(), commit_msg: "storkit: qa 10_story_foo".to_string(), + from_stage: None, }).unwrap(); - tokio::time::sleep(std::time::Duration::from_millis(100)).await; + // Wait longer than STAGE_TRANSITION_DEBOUNCE (200ms) so the coalesced + // notification flushes. + tokio::time::sleep(std::time::Duration::from_millis(350)).await; let calls = calls.lock().unwrap(); assert_eq!(calls.len(), 1, "Should deliver to the dynamically added room"); @@ -582,6 +662,7 @@ mod tests { item_id: "10_story_foo".to_string(), action: "qa".to_string(), commit_msg: "storkit: qa 10_story_foo".to_string(), + from_stage: None, }).unwrap(); tokio::time::sleep(std::time::Duration::from_millis(100)).await; diff --git a/server/src/http/ws.rs b/server/src/http/ws.rs index ef899f09..d775a4c0 100644 --- a/server/src/http/ws.rs +++ b/server/src/http/ws.rs @@ -168,6 +168,7 @@ impl From for Option { item_id, action, commit_msg, + .. } => Some(WsResponse::WorkItemChanged { stage, item_id, @@ -901,6 +902,7 @@ mod tests { item_id: "42_story_foo".to_string(), action: "start".to_string(), commit_msg: "storkit: start 42_story_foo".to_string(), + from_stage: None, }; let ws_msg: Option = evt.into(); let ws_msg = ws_msg.expect("WorkItem should produce Some"); @@ -1380,6 +1382,7 @@ mod tests { item_id: "99_story_test".to_string(), action: "start".to_string(), commit_msg: "storkit: start 99_story_test".to_string(), + from_stage: None, }) .unwrap(); diff --git a/server/src/io/watcher.rs b/server/src/io/watcher.rs index b72b0cbb..9104da0f 100644 --- a/server/src/io/watcher.rs +++ b/server/src/io/watcher.rs @@ -44,6 +44,9 @@ pub enum WatcherEvent { action: String, /// The deterministic git commit message used (or that would have been used). commit_msg: String, + /// The pipeline stage the item moved FROM, populated for move operations. + /// `None` for creations, deletions, or synthetic events. + from_stage: Option, }, /// `.storkit/project.toml` was modified at the project root (not inside a worktree). ConfigChanged, @@ -276,12 +279,27 @@ fn flush_pending( slog!("[watcher] flush (broadcast-only): {commit_msg}"); } + // For move operations, find the source stage from deleted entries with matching item_id. + let from_stage: Option = if !additions.is_empty() { + pending + .iter() + .filter(|(path, _)| !path.exists()) + .find(|(path, _)| { + path.file_stem() + .and_then(|s| s.to_str()) == Some(item_id.as_str()) + }) + .map(|(_, stage)| stage.clone()) + } else { + None + }; + // Always broadcast the event so connected WebSocket clients stay in sync. let evt = WatcherEvent::WorkItem { stage: dest_stage.to_string(), item_id, action: action.to_string(), commit_msg, + from_stage, }; let _ = event_tx.send(evt); } @@ -623,6 +641,7 @@ mod tests { item_id, action, commit_msg, + .. } => { assert_eq!(stage, "1_backlog"); assert_eq!(item_id, "42_story_foo"); @@ -667,6 +686,7 @@ mod tests { item_id, action, commit_msg, + .. } => { assert_eq!(stage, "2_current"); assert_eq!(item_id, "42_story_foo"); @@ -922,6 +942,75 @@ mod tests { ); } + // ── flush_pending from_stage ───────────────────────────────────────────── + + /// AC3: when a pending map contains both a deletion (source stage) and a + /// creation (dest stage) for the same item_id, the broadcast event should + /// have `from_stage` set to the source stage key. + #[test] + fn flush_pending_sets_from_stage_for_move_operations() { + let tmp = TempDir::new().unwrap(); + init_git_repo(tmp.path()); + + // Destination exists (file moved here). + let merge_dir = make_stage_dir(tmp.path(), "4_merge"); + let merge_path = merge_dir.join("42_story_foo.md"); + fs::write(&merge_path, "---\nname: test\n---\n").unwrap(); + + // Source path does NOT exist (file was moved away). + make_stage_dir(tmp.path(), "3_qa"); + let qa_path = tmp + .path() + .join(".storkit") + .join("work") + .join("3_qa") + .join("42_story_foo.md"); + + let (tx, mut rx) = tokio::sync::broadcast::channel(16); + let mut pending = HashMap::new(); + pending.insert(merge_path, "4_merge".to_string()); // addition + pending.insert(qa_path, "3_qa".to_string()); // deletion + + flush_pending(&pending, tmp.path(), &tx); + + let evt = rx.try_recv().expect("expected event"); + match evt { + WatcherEvent::WorkItem { + stage, from_stage, .. + } => { + assert_eq!(stage, "4_merge"); + assert_eq!(from_stage, Some("3_qa".to_string())); + } + other => panic!("unexpected event: {other:?}"), + } + } + + /// AC3: when a pending map has only an addition (creation, not a move), + /// `from_stage` should be `None`. + #[test] + fn flush_pending_sets_from_stage_to_none_for_creations() { + let tmp = TempDir::new().unwrap(); + init_git_repo(tmp.path()); + + let stage_dir = make_stage_dir(tmp.path(), "2_current"); + let story_path = stage_dir.join("55_story_new.md"); + fs::write(&story_path, "---\nname: New Story\n---\n").unwrap(); + + let (tx, mut rx) = tokio::sync::broadcast::channel(16); + let mut pending = HashMap::new(); + pending.insert(story_path, "2_current".to_string()); + + flush_pending(&pending, tmp.path(), &tx); + + let evt = rx.try_recv().expect("expected event"); + match evt { + WatcherEvent::WorkItem { from_stage, .. } => { + assert_eq!(from_stage, None, "creation should have no from_stage"); + } + other => panic!("unexpected event: {other:?}"), + } + } + // ── stage_for_path (additional edge cases) ──────────────────────────────── #[test]