huskies: merge 856

This commit is contained in:
dave
2026-04-29 21:28:41 +00:00
parent db526bbdb2
commit a7b1572693
13 changed files with 319 additions and 36 deletions
@@ -181,6 +181,13 @@ pub(in crate::agents::pool) async fn run_server_owned_completion(
"[agents] Server-owned completion for '{story_id}:{agent_name}': gates_passed={gates_passed}" "[agents] Server-owned completion for '{story_id}:{agent_name}': gates_passed={gates_passed}"
); );
// Notify chat transports of the agent completion result.
let _ = watcher_tx.send(WatcherEvent::AgentCompleted {
story_id: story_id.to_string(),
agent_name: agent_name.to_string(),
success: gates_passed,
});
let report = CompletionReport { let report = CompletionReport {
summary: "Agent process exited normally".to_string(), summary: "Agent process exited normally".to_string(),
gates_passed, gates_passed,
+4
View File
@@ -217,6 +217,10 @@ pub(super) async fn run_agent_spawn(
status: "running".to_string(), status: "running".to_string(),
}); });
AgentPool::notify_agent_state_changed(&watcher_tx_clone); AgentPool::notify_agent_state_changed(&watcher_tx_clone);
let _ = watcher_tx_clone.send(WatcherEvent::AgentStarted {
story_id: sid.clone(),
agent_name: aname.clone(),
});
// Step 4: launch the agent process via the configured runtime. // Step 4: launch the agent process via the configured runtime.
let runtime_name = config_clone let runtime_name = config_clone
+16
View File
@@ -125,6 +125,14 @@ pub struct ProjectConfig {
/// Example: `gateway_project = "huskies"`. /// Example: `gateway_project = "huskies"`.
#[serde(default)] #[serde(default)]
pub gateway_project: Option<String>, pub gateway_project: Option<String>,
/// Whether the notification listener pushes status messages to chat rooms.
///
/// Set to `false` for quiet mode: all WatcherEvent-driven chat pushes
/// (stage transitions, agent started/completed, blocked, merge failures)
/// are suppressed. Other bot functionality (commands, responses) is
/// unaffected. Default: `true`.
#[serde(default = "default_status_push_enabled")]
pub status_push_enabled: bool,
} }
/// Configuration for the filesystem watcher's sweep behaviour. /// Configuration for the filesystem watcher's sweep behaviour.
@@ -192,6 +200,10 @@ fn default_whatsapp_status_consumer() -> bool {
true true
} }
fn default_status_push_enabled() -> bool {
true
}
fn default_max_mesh_peers() -> usize { fn default_max_mesh_peers() -> usize {
3 3
} }
@@ -336,6 +348,7 @@ impl Default for ProjectConfig {
max_mesh_peers: default_max_mesh_peers(), max_mesh_peers: default_max_mesh_peers(),
gateway_url: None, gateway_url: None,
gateway_project: None, gateway_project: None,
status_push_enabled: default_status_push_enabled(),
} }
} }
} }
@@ -424,6 +437,7 @@ impl ProjectConfig {
max_mesh_peers: default_max_mesh_peers(), max_mesh_peers: default_max_mesh_peers(),
gateway_url: None, gateway_url: None,
gateway_project: None, gateway_project: None,
status_push_enabled: default_status_push_enabled(),
}; };
validate_agents(&config.agent)?; validate_agents(&config.agent)?;
return Ok(config); return Ok(config);
@@ -463,6 +477,7 @@ impl ProjectConfig {
max_mesh_peers: default_max_mesh_peers(), max_mesh_peers: default_max_mesh_peers(),
gateway_url: None, gateway_url: None,
gateway_project: None, gateway_project: None,
status_push_enabled: default_status_push_enabled(),
}; };
validate_agents(&config.agent)?; validate_agents(&config.agent)?;
Ok(config) Ok(config)
@@ -490,6 +505,7 @@ impl ProjectConfig {
max_mesh_peers: default_max_mesh_peers(), max_mesh_peers: default_max_mesh_peers(),
gateway_url: None, gateway_url: None,
gateway_project: None, gateway_project: None,
status_push_enabled: default_status_push_enabled(),
}) })
} }
} }
+18
View File
@@ -72,4 +72,22 @@ pub enum WatcherEvent {
/// Human-readable message describing when the earliest reset occurs. /// Human-readable message describing when the earliest reset occurs.
earliest_reset_msg: String, earliest_reset_msg: String,
}, },
/// An agent transitioned to the Running state for a story.
/// Triggers a status notification to configured chat rooms.
AgentStarted {
/// Work item ID the agent is working on.
story_id: String,
/// Name of the agent that started.
agent_name: String,
},
/// An agent finished processing a story (gates passed or failed).
/// Triggers a status notification to configured chat rooms.
AgentCompleted {
/// Work item ID the agent was working on.
story_id: String,
/// Name of the agent that completed.
agent_name: String,
/// `true` if acceptance gates passed; `false` if they failed.
success: bool,
},
} }
@@ -21,6 +21,13 @@ pub enum EventAction {
OAuthAccountSwapped, OAuthAccountSwapped,
/// Post an OAuth accounts-exhausted notification with the earliest reset time. /// Post an OAuth accounts-exhausted notification with the earliest reset time.
OAuthAccountsExhausted, OAuthAccountsExhausted,
/// Post an agent-started (running) notification.
AgentStarted,
/// Post an agent-completed notification with pass/fail result.
AgentCompleted {
/// `true` if acceptance gates passed.
success: bool,
},
/// Log server-side only; do not post to chat (e.g. hard rate-limit blocks). /// Log server-side only; do not post to chat (e.g. hard rate-limit blocks).
LogOnly, LogOnly,
/// Reload the project configuration. /// Reload the project configuration.
@@ -48,6 +55,10 @@ pub fn classify(event: &WatcherEvent) -> EventAction {
WatcherEvent::ConfigChanged => EventAction::ReloadConfig, WatcherEvent::ConfigChanged => EventAction::ReloadConfig,
WatcherEvent::OAuthAccountSwapped { .. } => EventAction::OAuthAccountSwapped, WatcherEvent::OAuthAccountSwapped { .. } => EventAction::OAuthAccountSwapped,
WatcherEvent::OAuthAccountsExhausted { .. } => EventAction::OAuthAccountsExhausted, WatcherEvent::OAuthAccountsExhausted { .. } => EventAction::OAuthAccountsExhausted,
WatcherEvent::AgentStarted { .. } => EventAction::AgentStarted,
WatcherEvent::AgentCompleted { success, .. } => {
EventAction::AgentCompleted { success: *success }
}
_ => EventAction::Skip, _ => EventAction::Skip,
} }
} }
@@ -138,4 +149,39 @@ mod tests {
}; };
assert_eq!(classify(&event), EventAction::OAuthAccountsExhausted); assert_eq!(classify(&event), EventAction::OAuthAccountsExhausted);
} }
#[test]
fn agent_started_is_classified_correctly() {
let event = WatcherEvent::AgentStarted {
story_id: "1_story_foo".to_string(),
agent_name: "coder-1".to_string(),
};
assert_eq!(classify(&event), EventAction::AgentStarted);
}
#[test]
fn agent_completed_success_is_classified_correctly() {
let event = WatcherEvent::AgentCompleted {
story_id: "1_story_foo".to_string(),
agent_name: "coder-1".to_string(),
success: true,
};
assert_eq!(
classify(&event),
EventAction::AgentCompleted { success: true }
);
}
#[test]
fn agent_completed_failure_is_classified_correctly() {
let event = WatcherEvent::AgentCompleted {
story_id: "1_story_foo".to_string(),
agent_name: "coder-1".to_string(),
success: false,
};
assert_eq!(
classify(&event),
EventAction::AgentCompleted { success: false }
);
}
} }
@@ -12,6 +12,10 @@ pub const RATE_LIMIT_DEBOUNCE: Duration = Duration::from_secs(60);
/// into a single notification (only the final stage is announced). /// into a single notification (only the final stage is announced).
pub const STAGE_TRANSITION_DEBOUNCE: Duration = Duration::from_millis(200); pub const STAGE_TRANSITION_DEBOUNCE: Duration = Duration::from_millis(200);
/// Window during which repeated agent-status events for the same story are
/// coalesced into a single notification (only the latest is sent).
pub const AGENT_EVENT_DEBOUNCE: Duration = Duration::from_secs(5);
/// Returns `true` if a rate-limit notification should be sent. /// Returns `true` if a rate-limit notification should be sent.
/// ///
/// `last_notified` is the [`Instant`] of the last sent notification for this /// `last_notified` is the [`Instant`] of the last sent notification for this
@@ -115,6 +115,65 @@ pub fn format_oauth_accounts_exhausted(earliest_reset_msg: &str) -> (String, Str
(plain, html) (plain, html)
} }
/// Format an agent-started notification message.
///
/// Sent when an agent transitions to the Running state.
/// Returns `(plain_text, html)` suitable for `ChatTransport::send_message`.
pub fn format_agent_started_notification(
item_id: &str,
story_name: Option<&str>,
agent_name: &str,
) -> (String, String) {
let number = extract_item_number(item_id).unwrap_or(item_id);
let name = story_name.unwrap_or(item_id);
let plain = format!("\u{1F916} #{number} {name} \u{2014} {agent_name} started");
let html = format!(
"\u{1F916} <strong>#{number}</strong> <em>{name}</em> \u{2014} {agent_name} started"
);
(plain, html)
}
/// Format an agent-completed notification message.
///
/// Sent when an agent finishes processing a story (gates passed or failed).
/// Returns `(plain_text, html)` suitable for `ChatTransport::send_message`.
pub fn format_agent_completed_notification(
item_id: &str,
story_name: Option<&str>,
agent_name: &str,
success: bool,
) -> (String, String) {
let number = extract_item_number(item_id).unwrap_or(item_id);
let name = story_name.unwrap_or(item_id);
let (emoji, result) = if success {
("\u{2705}", "completed") // ✅
} else {
("\u{274C}", "failed") // ❌
};
let plain = format!("{emoji} #{number} {name} \u{2014} {agent_name} {result}");
let html = format!(
"{emoji} <strong>#{number}</strong> <em>{name}</em> \u{2014} {agent_name} {result}"
);
(plain, html)
}
/// Extract the first non-empty line from a merge failure reason, truncated to `max_len` chars.
///
/// Used to produce a compact snippet for chat notifications.
pub fn merge_failure_snippet(reason: &str, max_len: usize) -> String {
let line = reason
.lines()
.find(|l| !l.trim().is_empty())
.unwrap_or(reason);
let mut chars = line.chars();
let truncated: String = chars.by_ref().take(max_len).collect();
if chars.next().is_some() {
format!("{truncated}\u{2026}") // append …
} else {
truncated
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
+158 -36
View File
@@ -12,11 +12,14 @@ use std::time::Instant;
use tokio::sync::broadcast; use tokio::sync::broadcast;
use super::super::events::classify; use super::super::events::classify;
use super::super::filter::{STAGE_TRANSITION_DEBOUNCE, should_send_rate_limit}; use super::super::filter::{
AGENT_EVENT_DEBOUNCE, STAGE_TRANSITION_DEBOUNCE, should_send_rate_limit,
};
use super::super::format::{ use super::super::format::{
format_agent_completed_notification, format_agent_started_notification,
format_blocked_notification, format_error_notification, format_oauth_account_swapped, format_blocked_notification, format_error_notification, format_oauth_account_swapped,
format_oauth_accounts_exhausted, format_rate_limit_notification, format_stage_notification, format_oauth_accounts_exhausted, format_rate_limit_notification, format_stage_notification,
stage_display_name, merge_failure_snippet, stage_display_name,
}; };
use super::super::route::rooms_for_notification; use super::super::route::rooms_for_notification;
use super::{find_story_name_any_stage, read_story_name}; use super::{find_story_name_any_stage, read_story_name};
@@ -52,35 +55,73 @@ pub fn spawn_notification_listener(
HashMap::new(); HashMap::new();
let mut flush_deadline: Option<tokio::time::Instant> = None; let mut flush_deadline: Option<tokio::time::Instant> = None;
// Pending agent-status notifications, keyed by "{story_id}:{event_kind}".
// Value: (plain, html). Rapid successive events for the same story and
// event kind are coalesced: only the latest is sent after the debounce
// window expires (AGENT_EVENT_DEBOUNCE = 5 s).
let mut pending_agent_events: HashMap<String, (String, String)> = HashMap::new();
let mut agent_flush_deadline: Option<tokio::time::Instant> = None;
loop { loop {
// Wait for the next event, or flush pending transitions when the // Pick the earliest of the two debounce deadlines.
// debounce window expires. let earliest_deadline = match (flush_deadline, agent_flush_deadline) {
let recv_result = if let Some(deadline) = flush_deadline { (Some(a), Some(b)) => Some(a.min(b)),
(Some(a), None) => Some(a),
(None, Some(b)) => Some(b),
(None, None) => None,
};
// Wait for the next event, or flush pending notifications when the
// earliest debounce window expires.
let recv_result = if let Some(deadline) = earliest_deadline {
tokio::time::timeout_at(deadline, rx.recv()).await.ok() tokio::time::timeout_at(deadline, rx.recv()).await.ok()
} else { } else {
Some(rx.recv().await) Some(rx.recv().await)
}; };
if recv_result.is_none() { if recv_result.is_none() {
// Flush all coalesced stage-transition notifications. let now = tokio::time::Instant::now();
for (item_id, (from_display, to_stage_key, story_name)) in // Flush stage transitions if their deadline has passed.
pending_transitions.drain() if flush_deadline.is_some_and(|d| d <= now) {
{ for (item_id, (from_display, to_stage_key, story_name)) in
let to_display = stage_display_name(&to_stage_key); pending_transitions.drain()
let (plain, html) = format_stage_notification( {
&item_id, let to_display = stage_display_name(&to_stage_key);
story_name.as_deref(), let (plain, html) = format_stage_notification(
&from_display, &item_id,
to_display, story_name.as_deref(),
); &from_display,
slog!("[bot] Sending stage notification: {plain}"); to_display,
for room_id in &rooms_for_notification(&get_room_ids) { );
if let Err(e) = transport.send_message(room_id, &plain, &html).await { slog!("[bot] Sending stage notification: {plain}");
slog!("[bot] Failed to send notification to {room_id}: {e}"); if config.status_push_enabled {
for room_id in &rooms_for_notification(&get_room_ids) {
if let Err(e) = transport.send_message(room_id, &plain, &html).await
{
slog!("[bot] Failed to send notification to {room_id}: {e}");
}
}
} }
} }
flush_deadline = None;
}
// Flush agent events if their deadline has passed.
if agent_flush_deadline.is_some_and(|d| d <= now) {
for (_key, (plain, html)) in pending_agent_events.drain() {
slog!("[bot] Sending agent notification: {plain}");
if config.status_push_enabled {
for room_id in &rooms_for_notification(&get_room_ids) {
if let Err(e) = transport.send_message(room_id, &plain, &html).await
{
slog!(
"[bot] Failed to send agent notification to {room_id}: {e}"
);
}
}
}
}
agent_flush_deadline = None;
} }
flush_deadline = None;
continue; continue;
} }
@@ -93,20 +134,34 @@ pub fn spawn_notification_listener(
Err(broadcast::error::RecvError::Closed) => { Err(broadcast::error::RecvError::Closed) => {
slog!("[bot] Watcher channel closed, stopping notification listener"); slog!("[bot] Watcher channel closed, stopping notification listener");
// Flush any coalesced transitions that haven't fired yet. // Flush any coalesced transitions that haven't fired yet.
for (item_id, (from_display, to_stage_key, story_name)) in if config.status_push_enabled {
pending_transitions.drain() for (item_id, (from_display, to_stage_key, story_name)) in
{ pending_transitions.drain()
let to_display = stage_display_name(&to_stage_key); {
let (plain, html) = format_stage_notification( let to_display = stage_display_name(&to_stage_key);
&item_id, let (plain, html) = format_stage_notification(
story_name.as_deref(), &item_id,
&from_display, story_name.as_deref(),
to_display, &from_display,
); to_display,
slog!("[bot] Sending stage notification: {plain}"); );
for room_id in &rooms_for_notification(&get_room_ids) { slog!("[bot] Sending stage notification: {plain}");
if let Err(e) = transport.send_message(room_id, &plain, &html).await { for room_id in &rooms_for_notification(&get_room_ids) {
slog!("[bot] Failed to send notification to {room_id}: {e}"); if let Err(e) = transport.send_message(room_id, &plain, &html).await
{
slog!("[bot] Failed to send notification to {room_id}: {e}");
}
}
}
for (_key, (plain, html)) in pending_agent_events.drain() {
slog!("[bot] Sending agent notification: {plain}");
for room_id in &rooms_for_notification(&get_room_ids) {
if let Err(e) = transport.send_message(room_id, &plain, &html).await
{
slog!(
"[bot] Failed to send agent notification to {room_id}: {e}"
);
}
} }
} }
} }
@@ -117,6 +172,9 @@ pub fn spawn_notification_listener(
use super::super::events::EventAction; use super::super::events::EventAction;
match classify(&event) { match classify(&event) {
EventAction::StageTransition => { EventAction::StageTransition => {
if !config.status_push_enabled {
continue;
}
// WorkItem with a known from_stage — extract the fields. // WorkItem with a known from_stage — extract the fields.
let WatcherEvent::WorkItem { let WatcherEvent::WorkItem {
ref stage, ref stage,
@@ -151,6 +209,9 @@ pub fn spawn_notification_listener(
flush_deadline = Some(tokio::time::Instant::now() + STAGE_TRANSITION_DEBOUNCE); flush_deadline = Some(tokio::time::Instant::now() + STAGE_TRANSITION_DEBOUNCE);
} }
EventAction::MergeFailure => { EventAction::MergeFailure => {
if !config.status_push_enabled {
continue;
}
let WatcherEvent::MergeFailure { let WatcherEvent::MergeFailure {
ref story_id, ref story_id,
ref reason, ref reason,
@@ -159,8 +220,11 @@ pub fn spawn_notification_listener(
continue; continue;
}; };
let story_name = read_story_name(&project_root, "4_merge", story_id); let story_name = read_story_name(&project_root, "4_merge", story_id);
// AC3: include only the first non-empty line of the failure,
// truncated to ~120 chars.
let snippet = merge_failure_snippet(reason, 120);
let (plain, html) = let (plain, html) =
format_error_notification(story_id, story_name.as_deref(), reason); format_error_notification(story_id, story_name.as_deref(), &snippet);
slog!("[bot] Sending error notification: {plain}"); slog!("[bot] Sending error notification: {plain}");
for room_id in &rooms_for_notification(&get_room_ids) { for room_id in &rooms_for_notification(&get_room_ids) {
if let Err(e) = transport.send_message(room_id, &plain, &html).await { if let Err(e) = transport.send_message(room_id, &plain, &html).await {
@@ -169,6 +233,9 @@ pub fn spawn_notification_listener(
} }
} }
EventAction::RateLimitWarning => { EventAction::RateLimitWarning => {
if !config.status_push_enabled {
continue;
}
let WatcherEvent::RateLimitWarning { let WatcherEvent::RateLimitWarning {
ref story_id, ref story_id,
ref agent_name, ref agent_name,
@@ -210,6 +277,9 @@ pub fn spawn_notification_listener(
} }
} }
EventAction::StoryBlocked => { EventAction::StoryBlocked => {
if !config.status_push_enabled {
continue;
}
let WatcherEvent::StoryBlocked { let WatcherEvent::StoryBlocked {
ref story_id, ref story_id,
ref reason, ref reason,
@@ -228,6 +298,9 @@ pub fn spawn_notification_listener(
} }
} }
EventAction::OAuthAccountSwapped => { EventAction::OAuthAccountSwapped => {
if !config.status_push_enabled {
continue;
}
let WatcherEvent::OAuthAccountSwapped { ref new_email } = event else { let WatcherEvent::OAuthAccountSwapped { ref new_email } = event else {
continue; continue;
}; };
@@ -243,6 +316,9 @@ pub fn spawn_notification_listener(
} }
} }
EventAction::OAuthAccountsExhausted => { EventAction::OAuthAccountsExhausted => {
if !config.status_push_enabled {
continue;
}
let WatcherEvent::OAuthAccountsExhausted { let WatcherEvent::OAuthAccountsExhausted {
ref earliest_reset_msg, ref earliest_reset_msg,
} = event } = event
@@ -260,6 +336,52 @@ pub fn spawn_notification_listener(
} }
} }
} }
EventAction::AgentStarted => {
if !config.status_push_enabled {
continue;
}
let WatcherEvent::AgentStarted {
ref story_id,
ref agent_name,
} = event
else {
continue;
};
let story_name = find_story_name_any_stage(&project_root, story_id);
let (plain, html) = format_agent_started_notification(
story_id,
story_name.as_deref(),
agent_name,
);
// Buffer with 5s debounce; later arrivals overwrite earlier ones.
let key = format!("{story_id}:started");
pending_agent_events.insert(key, (plain, html));
agent_flush_deadline = Some(tokio::time::Instant::now() + AGENT_EVENT_DEBOUNCE);
}
EventAction::AgentCompleted { success } => {
if !config.status_push_enabled {
continue;
}
let WatcherEvent::AgentCompleted {
ref story_id,
ref agent_name,
..
} = event
else {
continue;
};
let story_name = find_story_name_any_stage(&project_root, story_id);
let (plain, html) = format_agent_completed_notification(
story_id,
story_name.as_deref(),
agent_name,
success,
);
// Buffer with 5s debounce; later arrivals overwrite earlier ones.
let key = format!("{story_id}:completed");
pending_agent_events.insert(key, (plain, html));
agent_flush_deadline = Some(tokio::time::Instant::now() + AGENT_EVENT_DEBOUNCE);
}
EventAction::LogOnly => { EventAction::LogOnly => {
// Hard-block: log server-side for debugging; do NOT post to chat. // Hard-block: log server-side for debugging; do NOT post to chat.
// Hard-block auto-resume is normal operation — the status command // Hard-block auto-resume is normal operation — the status command
+3
View File
@@ -34,6 +34,9 @@ pub fn watcher_event_to_response(e: WatcherEvent) -> Option<WsResponse> {
// OAuth events are forwarded to chat transports only; no WebSocket message for the frontend. // OAuth events are forwarded to chat transports only; no WebSocket message for the frontend.
WatcherEvent::OAuthAccountSwapped { .. } => None, WatcherEvent::OAuthAccountSwapped { .. } => None,
WatcherEvent::OAuthAccountsExhausted { .. } => None, WatcherEvent::OAuthAccountsExhausted { .. } => None,
// Agent lifecycle events are forwarded to chat transports only; no WebSocket message.
WatcherEvent::AgentStarted { .. } => None,
WatcherEvent::AgentCompleted { .. } => None,
} }
} }
+1
View File
@@ -213,6 +213,7 @@ mod tests {
max_mesh_peers: 3, max_mesh_peers: 3,
gateway_url: None, gateway_url: None,
gateway_project: None, gateway_project: None,
status_push_enabled: true,
} }
} }
+1
View File
@@ -156,6 +156,7 @@ mod tests {
max_mesh_peers: 3, max_mesh_peers: 3,
gateway_url: None, gateway_url: None,
gateway_project: None, gateway_project: None,
status_push_enabled: true,
} }
} }
+1
View File
@@ -91,6 +91,7 @@ mod tests {
max_mesh_peers: 3, max_mesh_peers: 3,
gateway_url: None, gateway_url: None,
gateway_project: None, gateway_project: None,
status_push_enabled: true,
} }
} }
+1
View File
@@ -135,6 +135,7 @@ mod tests {
max_mesh_peers: 3, max_mesh_peers: 3,
gateway_url: None, gateway_url: None,
gateway_project: None, gateway_project: None,
status_push_enabled: true,
} }
} }