diff --git a/server/src/agents/pool/pipeline/completion/server.rs b/server/src/agents/pool/pipeline/completion/server.rs index bf26c7ef..0e0697db 100644 --- a/server/src/agents/pool/pipeline/completion/server.rs +++ b/server/src/agents/pool/pipeline/completion/server.rs @@ -181,6 +181,13 @@ pub(in crate::agents::pool) async fn run_server_owned_completion( "[agents] Server-owned completion for '{story_id}:{agent_name}': gates_passed={gates_passed}" ); + // Notify chat transports of the agent completion result. + let _ = watcher_tx.send(WatcherEvent::AgentCompleted { + story_id: story_id.to_string(), + agent_name: agent_name.to_string(), + success: gates_passed, + }); + let report = CompletionReport { summary: "Agent process exited normally".to_string(), gates_passed, diff --git a/server/src/agents/pool/start/spawn.rs b/server/src/agents/pool/start/spawn.rs index 1680f3c7..b8e4c663 100644 --- a/server/src/agents/pool/start/spawn.rs +++ b/server/src/agents/pool/start/spawn.rs @@ -217,6 +217,10 @@ pub(super) async fn run_agent_spawn( status: "running".to_string(), }); AgentPool::notify_agent_state_changed(&watcher_tx_clone); + let _ = watcher_tx_clone.send(WatcherEvent::AgentStarted { + story_id: sid.clone(), + agent_name: aname.clone(), + }); // Step 4: launch the agent process via the configured runtime. let runtime_name = config_clone diff --git a/server/src/config/mod.rs b/server/src/config/mod.rs index fec185ed..6fb76078 100644 --- a/server/src/config/mod.rs +++ b/server/src/config/mod.rs @@ -125,6 +125,14 @@ pub struct ProjectConfig { /// Example: `gateway_project = "huskies"`. #[serde(default)] pub gateway_project: Option, + /// Whether the notification listener pushes status messages to chat rooms. + /// + /// Set to `false` for quiet mode: all WatcherEvent-driven chat pushes + /// (stage transitions, agent started/completed, blocked, merge failures) + /// are suppressed. Other bot functionality (commands, responses) is + /// unaffected. Default: `true`. + #[serde(default = "default_status_push_enabled")] + pub status_push_enabled: bool, } /// Configuration for the filesystem watcher's sweep behaviour. @@ -192,6 +200,10 @@ fn default_whatsapp_status_consumer() -> bool { true } +fn default_status_push_enabled() -> bool { + true +} + fn default_max_mesh_peers() -> usize { 3 } @@ -336,6 +348,7 @@ impl Default for ProjectConfig { max_mesh_peers: default_max_mesh_peers(), gateway_url: None, gateway_project: None, + status_push_enabled: default_status_push_enabled(), } } } @@ -424,6 +437,7 @@ impl ProjectConfig { max_mesh_peers: default_max_mesh_peers(), gateway_url: None, gateway_project: None, + status_push_enabled: default_status_push_enabled(), }; validate_agents(&config.agent)?; return Ok(config); @@ -463,6 +477,7 @@ impl ProjectConfig { max_mesh_peers: default_max_mesh_peers(), gateway_url: None, gateway_project: None, + status_push_enabled: default_status_push_enabled(), }; validate_agents(&config.agent)?; Ok(config) @@ -490,6 +505,7 @@ impl ProjectConfig { max_mesh_peers: default_max_mesh_peers(), gateway_url: None, gateway_project: None, + status_push_enabled: default_status_push_enabled(), }) } } diff --git a/server/src/io/watcher/events.rs b/server/src/io/watcher/events.rs index d2b0b08f..df55575f 100644 --- a/server/src/io/watcher/events.rs +++ b/server/src/io/watcher/events.rs @@ -72,4 +72,22 @@ pub enum WatcherEvent { /// Human-readable message describing when the earliest reset occurs. earliest_reset_msg: String, }, + /// An agent transitioned to the Running state for a story. + /// Triggers a status notification to configured chat rooms. + AgentStarted { + /// Work item ID the agent is working on. + story_id: String, + /// Name of the agent that started. + agent_name: String, + }, + /// An agent finished processing a story (gates passed or failed). + /// Triggers a status notification to configured chat rooms. + AgentCompleted { + /// Work item ID the agent was working on. + story_id: String, + /// Name of the agent that completed. + agent_name: String, + /// `true` if acceptance gates passed; `false` if they failed. + success: bool, + }, } diff --git a/server/src/service/notifications/events.rs b/server/src/service/notifications/events.rs index 67bcbc17..d52fd8e4 100644 --- a/server/src/service/notifications/events.rs +++ b/server/src/service/notifications/events.rs @@ -21,6 +21,13 @@ pub enum EventAction { OAuthAccountSwapped, /// Post an OAuth accounts-exhausted notification with the earliest reset time. OAuthAccountsExhausted, + /// Post an agent-started (running) notification. + AgentStarted, + /// Post an agent-completed notification with pass/fail result. + AgentCompleted { + /// `true` if acceptance gates passed. + success: bool, + }, /// Log server-side only; do not post to chat (e.g. hard rate-limit blocks). LogOnly, /// Reload the project configuration. @@ -48,6 +55,10 @@ pub fn classify(event: &WatcherEvent) -> EventAction { WatcherEvent::ConfigChanged => EventAction::ReloadConfig, WatcherEvent::OAuthAccountSwapped { .. } => EventAction::OAuthAccountSwapped, WatcherEvent::OAuthAccountsExhausted { .. } => EventAction::OAuthAccountsExhausted, + WatcherEvent::AgentStarted { .. } => EventAction::AgentStarted, + WatcherEvent::AgentCompleted { success, .. } => { + EventAction::AgentCompleted { success: *success } + } _ => EventAction::Skip, } } @@ -138,4 +149,39 @@ mod tests { }; assert_eq!(classify(&event), EventAction::OAuthAccountsExhausted); } + + #[test] + fn agent_started_is_classified_correctly() { + let event = WatcherEvent::AgentStarted { + story_id: "1_story_foo".to_string(), + agent_name: "coder-1".to_string(), + }; + assert_eq!(classify(&event), EventAction::AgentStarted); + } + + #[test] + fn agent_completed_success_is_classified_correctly() { + let event = WatcherEvent::AgentCompleted { + story_id: "1_story_foo".to_string(), + agent_name: "coder-1".to_string(), + success: true, + }; + assert_eq!( + classify(&event), + EventAction::AgentCompleted { success: true } + ); + } + + #[test] + fn agent_completed_failure_is_classified_correctly() { + let event = WatcherEvent::AgentCompleted { + story_id: "1_story_foo".to_string(), + agent_name: "coder-1".to_string(), + success: false, + }; + assert_eq!( + classify(&event), + EventAction::AgentCompleted { success: false } + ); + } } diff --git a/server/src/service/notifications/filter.rs b/server/src/service/notifications/filter.rs index 7f20f1d2..3adee368 100644 --- a/server/src/service/notifications/filter.rs +++ b/server/src/service/notifications/filter.rs @@ -12,6 +12,10 @@ pub const RATE_LIMIT_DEBOUNCE: Duration = Duration::from_secs(60); /// into a single notification (only the final stage is announced). pub const STAGE_TRANSITION_DEBOUNCE: Duration = Duration::from_millis(200); +/// Window during which repeated agent-status events for the same story are +/// coalesced into a single notification (only the latest is sent). +pub const AGENT_EVENT_DEBOUNCE: Duration = Duration::from_secs(5); + /// Returns `true` if a rate-limit notification should be sent. /// /// `last_notified` is the [`Instant`] of the last sent notification for this diff --git a/server/src/service/notifications/format.rs b/server/src/service/notifications/format.rs index 8beeb7fa..0923a13b 100644 --- a/server/src/service/notifications/format.rs +++ b/server/src/service/notifications/format.rs @@ -115,6 +115,65 @@ pub fn format_oauth_accounts_exhausted(earliest_reset_msg: &str) -> (String, Str (plain, html) } +/// Format an agent-started notification message. +/// +/// Sent when an agent transitions to the Running state. +/// Returns `(plain_text, html)` suitable for `ChatTransport::send_message`. +pub fn format_agent_started_notification( + item_id: &str, + story_name: Option<&str>, + agent_name: &str, +) -> (String, String) { + let number = extract_item_number(item_id).unwrap_or(item_id); + let name = story_name.unwrap_or(item_id); + let plain = format!("\u{1F916} #{number} {name} \u{2014} {agent_name} started"); + let html = format!( + "\u{1F916} #{number} {name} \u{2014} {agent_name} started" + ); + (plain, html) +} + +/// Format an agent-completed notification message. +/// +/// Sent when an agent finishes processing a story (gates passed or failed). +/// Returns `(plain_text, html)` suitable for `ChatTransport::send_message`. +pub fn format_agent_completed_notification( + item_id: &str, + story_name: Option<&str>, + agent_name: &str, + success: bool, +) -> (String, String) { + let number = extract_item_number(item_id).unwrap_or(item_id); + let name = story_name.unwrap_or(item_id); + let (emoji, result) = if success { + ("\u{2705}", "completed") // ✅ + } else { + ("\u{274C}", "failed") // ❌ + }; + let plain = format!("{emoji} #{number} {name} \u{2014} {agent_name} {result}"); + let html = format!( + "{emoji} #{number} {name} \u{2014} {agent_name} {result}" + ); + (plain, html) +} + +/// Extract the first non-empty line from a merge failure reason, truncated to `max_len` chars. +/// +/// Used to produce a compact snippet for chat notifications. +pub fn merge_failure_snippet(reason: &str, max_len: usize) -> String { + let line = reason + .lines() + .find(|l| !l.trim().is_empty()) + .unwrap_or(reason); + let mut chars = line.chars(); + let truncated: String = chars.by_ref().take(max_len).collect(); + if chars.next().is_some() { + format!("{truncated}\u{2026}") // append … + } else { + truncated + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/server/src/service/notifications/io/listener.rs b/server/src/service/notifications/io/listener.rs index 59b7bf57..2d056e5c 100644 --- a/server/src/service/notifications/io/listener.rs +++ b/server/src/service/notifications/io/listener.rs @@ -12,11 +12,14 @@ use std::time::Instant; use tokio::sync::broadcast; use super::super::events::classify; -use super::super::filter::{STAGE_TRANSITION_DEBOUNCE, should_send_rate_limit}; +use super::super::filter::{ + AGENT_EVENT_DEBOUNCE, STAGE_TRANSITION_DEBOUNCE, should_send_rate_limit, +}; use super::super::format::{ + format_agent_completed_notification, format_agent_started_notification, format_blocked_notification, format_error_notification, format_oauth_account_swapped, format_oauth_accounts_exhausted, format_rate_limit_notification, format_stage_notification, - stage_display_name, + merge_failure_snippet, stage_display_name, }; use super::super::route::rooms_for_notification; use super::{find_story_name_any_stage, read_story_name}; @@ -52,35 +55,73 @@ pub fn spawn_notification_listener( HashMap::new(); let mut flush_deadline: Option = None; + // Pending agent-status notifications, keyed by "{story_id}:{event_kind}". + // Value: (plain, html). Rapid successive events for the same story and + // event kind are coalesced: only the latest is sent after the debounce + // window expires (AGENT_EVENT_DEBOUNCE = 5 s). + let mut pending_agent_events: HashMap = HashMap::new(); + let mut agent_flush_deadline: Option = None; + loop { - // Wait for the next event, or flush pending transitions when the - // debounce window expires. - let recv_result = if let Some(deadline) = flush_deadline { + // Pick the earliest of the two debounce deadlines. + let earliest_deadline = match (flush_deadline, agent_flush_deadline) { + (Some(a), Some(b)) => Some(a.min(b)), + (Some(a), None) => Some(a), + (None, Some(b)) => Some(b), + (None, None) => None, + }; + + // Wait for the next event, or flush pending notifications when the + // earliest debounce window expires. + let recv_result = if let Some(deadline) = earliest_deadline { tokio::time::timeout_at(deadline, rx.recv()).await.ok() } else { Some(rx.recv().await) }; if recv_result.is_none() { - // Flush all coalesced stage-transition notifications. - for (item_id, (from_display, to_stage_key, story_name)) in - pending_transitions.drain() - { - let to_display = stage_display_name(&to_stage_key); - let (plain, html) = format_stage_notification( - &item_id, - story_name.as_deref(), - &from_display, - to_display, - ); - slog!("[bot] Sending stage notification: {plain}"); - for room_id in &rooms_for_notification(&get_room_ids) { - if let Err(e) = transport.send_message(room_id, &plain, &html).await { - slog!("[bot] Failed to send notification to {room_id}: {e}"); + let now = tokio::time::Instant::now(); + // Flush stage transitions if their deadline has passed. + if flush_deadline.is_some_and(|d| d <= now) { + for (item_id, (from_display, to_stage_key, story_name)) in + pending_transitions.drain() + { + let to_display = stage_display_name(&to_stage_key); + let (plain, html) = format_stage_notification( + &item_id, + story_name.as_deref(), + &from_display, + to_display, + ); + slog!("[bot] Sending stage notification: {plain}"); + if config.status_push_enabled { + for room_id in &rooms_for_notification(&get_room_ids) { + if let Err(e) = transport.send_message(room_id, &plain, &html).await + { + slog!("[bot] Failed to send notification to {room_id}: {e}"); + } + } } } + flush_deadline = None; + } + // Flush agent events if their deadline has passed. + if agent_flush_deadline.is_some_and(|d| d <= now) { + for (_key, (plain, html)) in pending_agent_events.drain() { + slog!("[bot] Sending agent notification: {plain}"); + if config.status_push_enabled { + for room_id in &rooms_for_notification(&get_room_ids) { + if let Err(e) = transport.send_message(room_id, &plain, &html).await + { + slog!( + "[bot] Failed to send agent notification to {room_id}: {e}" + ); + } + } + } + } + agent_flush_deadline = None; } - flush_deadline = None; continue; } @@ -93,20 +134,34 @@ pub fn spawn_notification_listener( Err(broadcast::error::RecvError::Closed) => { slog!("[bot] Watcher channel closed, stopping notification listener"); // Flush any coalesced transitions that haven't fired yet. - for (item_id, (from_display, to_stage_key, story_name)) in - pending_transitions.drain() - { - let to_display = stage_display_name(&to_stage_key); - let (plain, html) = format_stage_notification( - &item_id, - story_name.as_deref(), - &from_display, - to_display, - ); - slog!("[bot] Sending stage notification: {plain}"); - for room_id in &rooms_for_notification(&get_room_ids) { - if let Err(e) = transport.send_message(room_id, &plain, &html).await { - slog!("[bot] Failed to send notification to {room_id}: {e}"); + if config.status_push_enabled { + for (item_id, (from_display, to_stage_key, story_name)) in + pending_transitions.drain() + { + let to_display = stage_display_name(&to_stage_key); + let (plain, html) = format_stage_notification( + &item_id, + story_name.as_deref(), + &from_display, + to_display, + ); + slog!("[bot] Sending stage notification: {plain}"); + for room_id in &rooms_for_notification(&get_room_ids) { + if let Err(e) = transport.send_message(room_id, &plain, &html).await + { + slog!("[bot] Failed to send notification to {room_id}: {e}"); + } + } + } + for (_key, (plain, html)) in pending_agent_events.drain() { + slog!("[bot] Sending agent notification: {plain}"); + for room_id in &rooms_for_notification(&get_room_ids) { + if let Err(e) = transport.send_message(room_id, &plain, &html).await + { + slog!( + "[bot] Failed to send agent notification to {room_id}: {e}" + ); + } } } } @@ -117,6 +172,9 @@ pub fn spawn_notification_listener( use super::super::events::EventAction; match classify(&event) { EventAction::StageTransition => { + if !config.status_push_enabled { + continue; + } // WorkItem with a known from_stage — extract the fields. let WatcherEvent::WorkItem { ref stage, @@ -151,6 +209,9 @@ pub fn spawn_notification_listener( flush_deadline = Some(tokio::time::Instant::now() + STAGE_TRANSITION_DEBOUNCE); } EventAction::MergeFailure => { + if !config.status_push_enabled { + continue; + } let WatcherEvent::MergeFailure { ref story_id, ref reason, @@ -159,8 +220,11 @@ pub fn spawn_notification_listener( continue; }; let story_name = read_story_name(&project_root, "4_merge", story_id); + // AC3: include only the first non-empty line of the failure, + // truncated to ~120 chars. + let snippet = merge_failure_snippet(reason, 120); let (plain, html) = - format_error_notification(story_id, story_name.as_deref(), reason); + format_error_notification(story_id, story_name.as_deref(), &snippet); slog!("[bot] Sending error notification: {plain}"); for room_id in &rooms_for_notification(&get_room_ids) { if let Err(e) = transport.send_message(room_id, &plain, &html).await { @@ -169,6 +233,9 @@ pub fn spawn_notification_listener( } } EventAction::RateLimitWarning => { + if !config.status_push_enabled { + continue; + } let WatcherEvent::RateLimitWarning { ref story_id, ref agent_name, @@ -210,6 +277,9 @@ pub fn spawn_notification_listener( } } EventAction::StoryBlocked => { + if !config.status_push_enabled { + continue; + } let WatcherEvent::StoryBlocked { ref story_id, ref reason, @@ -228,6 +298,9 @@ pub fn spawn_notification_listener( } } EventAction::OAuthAccountSwapped => { + if !config.status_push_enabled { + continue; + } let WatcherEvent::OAuthAccountSwapped { ref new_email } = event else { continue; }; @@ -243,6 +316,9 @@ pub fn spawn_notification_listener( } } EventAction::OAuthAccountsExhausted => { + if !config.status_push_enabled { + continue; + } let WatcherEvent::OAuthAccountsExhausted { ref earliest_reset_msg, } = event @@ -260,6 +336,52 @@ pub fn spawn_notification_listener( } } } + EventAction::AgentStarted => { + if !config.status_push_enabled { + continue; + } + let WatcherEvent::AgentStarted { + ref story_id, + ref agent_name, + } = event + else { + continue; + }; + let story_name = find_story_name_any_stage(&project_root, story_id); + let (plain, html) = format_agent_started_notification( + story_id, + story_name.as_deref(), + agent_name, + ); + // Buffer with 5s debounce; later arrivals overwrite earlier ones. + let key = format!("{story_id}:started"); + pending_agent_events.insert(key, (plain, html)); + agent_flush_deadline = Some(tokio::time::Instant::now() + AGENT_EVENT_DEBOUNCE); + } + EventAction::AgentCompleted { success } => { + if !config.status_push_enabled { + continue; + } + let WatcherEvent::AgentCompleted { + ref story_id, + ref agent_name, + .. + } = event + else { + continue; + }; + let story_name = find_story_name_any_stage(&project_root, story_id); + let (plain, html) = format_agent_completed_notification( + story_id, + story_name.as_deref(), + agent_name, + success, + ); + // Buffer with 5s debounce; later arrivals overwrite earlier ones. + let key = format!("{story_id}:completed"); + pending_agent_events.insert(key, (plain, html)); + agent_flush_deadline = Some(tokio::time::Instant::now() + AGENT_EVENT_DEBOUNCE); + } EventAction::LogOnly => { // Hard-block: log server-side for debugging; do NOT post to chat. // Hard-block auto-resume is normal operation — the status command diff --git a/server/src/service/ws/message/convert.rs b/server/src/service/ws/message/convert.rs index 06b61d8f..f03af4bd 100644 --- a/server/src/service/ws/message/convert.rs +++ b/server/src/service/ws/message/convert.rs @@ -34,6 +34,9 @@ pub fn watcher_event_to_response(e: WatcherEvent) -> Option { // OAuth events are forwarded to chat transports only; no WebSocket message for the frontend. WatcherEvent::OAuthAccountSwapped { .. } => None, WatcherEvent::OAuthAccountsExhausted { .. } => None, + // Agent lifecycle events are forwarded to chat transports only; no WebSocket message. + WatcherEvent::AgentStarted { .. } => None, + WatcherEvent::AgentCompleted { .. } => None, } } diff --git a/server/src/worktree/cleanup.rs b/server/src/worktree/cleanup.rs index 6502fa1a..84645256 100644 --- a/server/src/worktree/cleanup.rs +++ b/server/src/worktree/cleanup.rs @@ -213,6 +213,7 @@ mod tests { max_mesh_peers: 3, gateway_url: None, gateway_project: None, + status_push_enabled: true, } } diff --git a/server/src/worktree/create.rs b/server/src/worktree/create.rs index 043e6472..61c8e35c 100644 --- a/server/src/worktree/create.rs +++ b/server/src/worktree/create.rs @@ -156,6 +156,7 @@ mod tests { max_mesh_peers: 3, gateway_url: None, gateway_project: None, + status_push_enabled: true, } } diff --git a/server/src/worktree/remove.rs b/server/src/worktree/remove.rs index e84fa8a2..cec5a723 100644 --- a/server/src/worktree/remove.rs +++ b/server/src/worktree/remove.rs @@ -91,6 +91,7 @@ mod tests { max_mesh_peers: 3, gateway_url: None, gateway_project: None, + status_push_enabled: true, } } diff --git a/server/src/worktree/sweep.rs b/server/src/worktree/sweep.rs index 464e1c26..9947c7d2 100644 --- a/server/src/worktree/sweep.rs +++ b/server/src/worktree/sweep.rs @@ -135,6 +135,7 @@ mod tests { max_mesh_peers: 3, gateway_url: None, gateway_project: None, + status_push_enabled: true, } }