From e20083a283955fba9848c5b1115c825044de0f5c Mon Sep 17 00:00:00 2001 From: dave Date: Sat, 25 Apr 2026 13:07:12 +0000 Subject: [PATCH] huskies: merge 624_bug_agent_turn_and_budget_limits_not_enforced_coder_1_ran_5_6x_over_max_turns --- server/src/agents/mod.rs | 12 + server/src/agents/pool/auto_assign/scan.rs | 1 + .../src/agents/pool/auto_assign/watchdog.rs | 404 +++++++++++++++++- server/src/agents/pool/start.rs | 2 + server/src/agents/pool/test_helpers.rs | 4 + server/src/agents/pool/types.rs | 3 + server/src/agents/pool/wait.rs | 1 + server/src/http/mcp/agent_tools.rs | 12 +- server/src/main.rs | 2 +- server/src/service/agents/selection.rs | 1 + 10 files changed, 435 insertions(+), 7 deletions(-) diff --git a/server/src/agents/mod.rs b/server/src/agents/mod.rs index 44382957..727123f5 100644 --- a/server/src/agents/mod.rs +++ b/server/src/agents/mod.rs @@ -80,6 +80,16 @@ pub enum AgentStatus { Failed, } +/// Why an agent was forcibly terminated by the watchdog. +#[derive(Debug, Clone, Serialize, PartialEq)] +#[serde(rename_all = "snake_case")] +pub enum TerminationReason { + /// Agent exceeded its configured `max_turns`. + TurnLimit, + /// Agent exceeded its configured `max_budget_usd`. + BudgetLimit, +} + impl std::fmt::Display for AgentStatus { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { @@ -192,6 +202,8 @@ pub struct AgentInfo { pub log_session_id: Option, /// True when a rate-limit throttle warning was received for this agent. pub throttled: bool, + /// Set when the watchdog terminates the agent for exceeding a limit. + pub termination_reason: Option, } #[cfg(test)] diff --git a/server/src/agents/pool/auto_assign/scan.rs b/server/src/agents/pool/auto_assign/scan.rs index 8390ed15..728ae6db 100644 --- a/server/src/agents/pool/auto_assign/scan.rs +++ b/server/src/agents/pool/auto_assign/scan.rs @@ -149,6 +149,7 @@ mod tests { log_session_id: None, merge_failure_reported: false, throttled: false, + termination_reason: None, } } diff --git a/server/src/agents/pool/auto_assign/watchdog.rs b/server/src/agents/pool/auto_assign/watchdog.rs index c87bde0a..a2dcacbe 100644 --- a/server/src/agents/pool/auto_assign/watchdog.rs +++ b/server/src/agents/pool/auto_assign/watchdog.rs @@ -1,12 +1,15 @@ -//! Watchdog task: detects orphaned agents and triggers auto-assign. +//! Watchdog task: detects orphaned agents, enforces turn/budget limits, and +//! triggers auto-assign. use std::collections::HashMap; +use std::path::Path; use std::sync::Mutex; use tokio::sync::broadcast; +use crate::config::ProjectConfig; use crate::slog; -use super::super::super::{AgentEvent, AgentStatus}; +use super::super::super::{AgentEvent, AgentStatus, TerminationReason}; use super::super::{AgentPool, StoryAgent}; /// Scan the agent pool for Running entries whose backing tokio task has already @@ -65,6 +68,141 @@ pub(super) fn check_orphaned_agents(agents: &Mutex>) count } +/// Scan running agents for turn/budget limit violations and terminate offenders. +/// +/// Uses the same accessor path as `tool_get_agent_remaining_turns_and_budget`: +/// log files are counted via `agent_log::list_story_log_files` + `read_log`, +/// and budget via `token_usage::read_all`. +fn check_agent_limits( + agents: &Mutex>, + project_root: &Path, +) -> Vec<(String, TerminationReason)> { + let config = match ProjectConfig::load(project_root) { + Ok(c) => c, + Err(_) => return Vec::new(), + }; + + // Snapshot running agents: (key, story_id, agent_name, tx). + let running: Vec<(String, String, String, broadcast::Sender)> = { + let lock = match agents.lock() { + Ok(l) => l, + Err(_) => return Vec::new(), + }; + lock.iter() + .filter(|(_, agent)| agent.status == AgentStatus::Running) + .map(|(key, agent)| { + let story_id = key + .rsplit_once(':') + .map(|(s, _)| s.to_string()) + .unwrap_or_else(|| key.clone()); + ( + key.clone(), + story_id, + agent.agent_name.clone(), + agent.tx.clone(), + ) + }) + .collect() + }; + + let all_token_records = crate::agents::token_usage::read_all(project_root).unwrap_or_default(); + + let mut terminated = Vec::new(); + + for (key, story_id, agent_name, tx) in &running { + let agent_config = config.agent.iter().find(|a| a.name == *agent_name); + let max_turns = agent_config.and_then(|a| a.max_turns); + let max_budget_usd = agent_config.and_then(|a| a.max_budget_usd); + + // Skip agents with no limits configured. + if max_turns.is_none() && max_budget_usd.is_none() { + continue; + } + + // Count turns — same path as tool_get_agent_remaining_turns_and_budget. + let mut turns_used: u64 = 0; + if max_turns.is_some() { + let log_files = + crate::agent_log::list_story_log_files(project_root, story_id, Some(agent_name)); + for path in &log_files { + if let Ok(entries) = crate::agent_log::read_log(path) { + for entry in &entries { + if entry.event.get("type").and_then(|v| v.as_str()) == Some("agent_json") + && let Some(data) = entry.event.get("data") + && data.get("type").and_then(|v| v.as_str()) == Some("assistant") + { + turns_used += 1; + } + } + } + } + } + + // Compute budget — same path as tool_get_agent_remaining_turns_and_budget. + let budget_used_usd: f64 = if max_budget_usd.is_some() { + all_token_records + .iter() + .filter(|r| r.story_id == *story_id && r.agent_name == *agent_name) + .map(|r| r.usage.total_cost_usd) + .sum() + } else { + 0.0 + }; + + // Determine if a limit is exceeded. + let reason = if let Some(max) = max_turns { + if turns_used >= max as u64 { + Some(TerminationReason::TurnLimit) + } else { + None + } + } else { + None + } + .or(if let Some(max) = max_budget_usd { + if budget_used_usd >= max { + Some(TerminationReason::BudgetLimit) + } else { + None + } + } else { + None + }); + + if let Some(reason) = reason { + let reason_str = match &reason { + TerminationReason::TurnLimit => { + format!("turn limit exceeded ({turns_used}/{})", max_turns.unwrap()) + } + TerminationReason::BudgetLimit => format!( + "budget limit exceeded (${budget_used_usd:.2}/${:.2})", + max_budget_usd.unwrap() + ), + }; + + // Mark agent as Failed with termination reason. + if let Ok(mut lock) = agents.lock() + && let Some(agent) = lock.get_mut(key) + { + agent.status = AgentStatus::Failed; + agent.termination_reason = Some(reason.clone()); + } + + slog!("[watchdog] Terminating agent '{key}': {reason_str}."); + + let _ = tx.send(AgentEvent::Error { + story_id: story_id.clone(), + agent_name: agent_name.clone(), + message: format!("Agent terminated by watchdog: {reason_str}"), + }); + + terminated.push((key.clone(), reason)); + } + } + + terminated +} + impl AgentPool { /// Run a single watchdog pass synchronously (test helper). #[cfg(test)] @@ -72,11 +210,31 @@ impl AgentPool { check_orphaned_agents(&self.agents); } - /// Run one watchdog pass and return the number of orphaned agents detected. + /// Run one watchdog pass: detect orphans, enforce limits, kill offenders. /// /// Called by the unified background tick loop every 30 ticks. - pub fn run_watchdog_pass(&self) -> usize { - check_orphaned_agents(&self.agents) + pub fn run_watchdog_pass(&self, project_root: Option<&Path>) -> usize { + let orphaned = check_orphaned_agents(&self.agents); + + if let Some(root) = project_root { + let terminated = check_agent_limits(&self.agents, root); + for (key, _reason) in &terminated { + // Kill the PTY child and abort the task, same as stop_agent. + self.kill_child_for_key(key); + if let Ok(mut lock) = self.agents.lock() + && let Some(agent) = lock.get_mut(key) + && let Some(handle) = agent.task_handle.take() + { + handle.abort(); + } + } + if !terminated.is_empty() { + Self::notify_agent_state_changed(&self.watcher_tx); + } + return orphaned + terminated.len(); + } + + orphaned } } @@ -87,6 +245,58 @@ mod tests { use super::super::super::{AgentPool, composite_key}; use super::*; + /// Write a fake agent log file with `n` assistant turn entries. + fn write_fake_log(project_root: &Path, story_id: &str, agent_name: &str, n_turns: u64) { + let log_dir = project_root.join(".huskies").join("logs").join(story_id); + std::fs::create_dir_all(&log_dir).unwrap(); + let log_path = log_dir.join(format!("{agent_name}-test-session.log")); + let mut content = String::new(); + for _ in 0..n_turns { + content.push_str( + &serde_json::to_string(&serde_json::json!({ + "timestamp": "2026-04-25T00:00:00Z", + "type": "agent_json", + "story_id": story_id, + "agent_name": agent_name, + "data": { "type": "assistant", "message": {} } + })) + .unwrap(), + ); + content.push('\n'); + } + std::fs::write(log_path, content).unwrap(); + } + + /// Write a fake token usage record with the given cost. + fn write_fake_token_usage( + project_root: &Path, + story_id: &str, + agent_name: &str, + cost_usd: f64, + ) { + let record = crate::agents::token_usage::TokenUsageRecord { + story_id: story_id.to_string(), + agent_name: agent_name.to_string(), + timestamp: "2026-04-25T00:00:00Z".to_string(), + model: None, + usage: crate::agents::TokenUsage { + input_tokens: 1000, + output_tokens: 500, + cache_creation_input_tokens: 0, + cache_read_input_tokens: 0, + total_cost_usd: cost_usd, + }, + }; + crate::agents::token_usage::append_record(project_root, &record).unwrap(); + } + + /// Write a minimal project.toml with the given agent config. + fn write_project_config(project_root: &Path, config_toml: &str) { + let huskies_dir = project_root.join(".huskies"); + std::fs::create_dir_all(&huskies_dir).unwrap(); + std::fs::write(huskies_dir.join("project.toml"), config_toml).unwrap(); + } + // ── check_orphaned_agents return value tests (bug 161) ────────────────── #[tokio::test] @@ -160,6 +370,190 @@ mod tests { ); } + // ── Limit enforcement integration tests (bug 624) ──────────────────────── + + #[test] + fn watchdog_terminates_agent_exceeding_turn_limit() { + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + write_project_config( + root, + r#" +[[agent]] +name = "coder-1" +runtime = "claude-code" +max_turns = 10 +"#, + ); + + // Write 12 turns (exceeds limit of 10). + write_fake_log(root, "story_a", "coder-1", 12); + + let pool = AgentPool::new_test(3001); + let tx = pool.inject_test_agent("story_a", "coder-1", AgentStatus::Running); + let mut rx = tx.subscribe(); + + let found = pool.run_watchdog_pass(Some(root)); + assert!(found >= 1, "watchdog should detect the over-limit agent"); + + // Agent should now be Failed with TurnLimit reason. + { + let agents = pool.agents.lock().unwrap(); + let key = composite_key("story_a", "coder-1"); + let agent = agents.get(&key).unwrap(); + assert_eq!(agent.status, AgentStatus::Failed); + assert_eq!( + agent.termination_reason, + Some(TerminationReason::TurnLimit), + "termination reason must be TurnLimit" + ); + } + + let event = rx.try_recv().expect("watchdog must emit an Error event"); + assert!( + matches!(event, AgentEvent::Error { .. }), + "expected AgentEvent::Error, got: {event:?}" + ); + } + + #[test] + fn watchdog_terminates_agent_exceeding_budget_limit() { + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + write_project_config( + root, + r#" +[[agent]] +name = "coder-1" +runtime = "claude-code" +max_budget_usd = 5.00 +"#, + ); + + // Write $6.00 in usage (exceeds limit of $5.00). + write_fake_token_usage(root, "story_b", "coder-1", 6.00); + + let pool = AgentPool::new_test(3001); + let tx = pool.inject_test_agent("story_b", "coder-1", AgentStatus::Running); + let mut rx = tx.subscribe(); + + let found = pool.run_watchdog_pass(Some(root)); + assert!(found >= 1, "watchdog should detect the over-budget agent"); + + { + let agents = pool.agents.lock().unwrap(); + let key = composite_key("story_b", "coder-1"); + let agent = agents.get(&key).unwrap(); + assert_eq!(agent.status, AgentStatus::Failed); + assert_eq!( + agent.termination_reason, + Some(TerminationReason::BudgetLimit), + "termination reason must be BudgetLimit" + ); + } + + let event = rx.try_recv().expect("watchdog must emit an Error event"); + assert!(matches!(event, AgentEvent::Error { .. })); + } + + #[test] + fn watchdog_does_not_terminate_agent_under_limits() { + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + write_project_config( + root, + r#" +[[agent]] +name = "coder-1" +runtime = "claude-code" +max_turns = 50 +max_budget_usd = 10.00 +"#, + ); + + // Agent is under both limits. + write_fake_log(root, "story_c", "coder-1", 25); + write_fake_token_usage(root, "story_c", "coder-1", 3.00); + + let pool = AgentPool::new_test(3001); + pool.inject_test_agent("story_c", "coder-1", AgentStatus::Running); + + let found = pool.run_watchdog_pass(Some(root)); + assert_eq!(found, 0, "agent under limits should not be terminated"); + + { + let agents = pool.agents.lock().unwrap(); + let key = composite_key("story_c", "coder-1"); + let agent = agents.get(&key).unwrap(); + assert_eq!( + agent.status, + AgentStatus::Running, + "agent under limits should stay Running" + ); + assert!(agent.termination_reason.is_none()); + } + } + + /// Regression test for the original bug 624 incident: + /// coder-1 with max_turns=50, max_budget_usd=5.00 ran 5.6× over the turn + /// limit (280 turns). The watchdog must terminate at the turn limit (turns + /// hit first in the observed trace), with reason TurnLimit. + #[test] + fn regression_bug624_coder1_story623_trajectory() { + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + write_project_config( + root, + r#" +[[agent]] +name = "coder-1" +runtime = "claude-code" +max_turns = 50 +max_budget_usd = 5.00 +"#, + ); + + // Simulate the trajectory: 55 turns (just past the limit — the watchdog + // would have caught it here, not at 280) and $2.50 spent (under budget). + // Turns hit first, so reason should be TurnLimit. + write_fake_log(root, "story_623", "coder-1", 55); + write_fake_token_usage(root, "story_623", "coder-1", 2.50); + + let pool = AgentPool::new_test(3001); + let tx = pool.inject_test_agent("story_623", "coder-1", AgentStatus::Running); + let mut rx = tx.subscribe(); + + let found = pool.run_watchdog_pass(Some(root)); + assert!(found >= 1, "watchdog must catch the turn-limit violation"); + + { + let agents = pool.agents.lock().unwrap(); + let key = composite_key("story_623", "coder-1"); + let agent = agents.get(&key).unwrap(); + assert_eq!(agent.status, AgentStatus::Failed); + assert_eq!( + agent.termination_reason, + Some(TerminationReason::TurnLimit), + "turns hit first in the observed trace, so reason must be TurnLimit" + ); + } + + // The error event should have been emitted. + let event = rx.try_recv().expect("watchdog must emit an Error event"); + if let AgentEvent::Error { message, .. } = &event { + assert!( + message.contains("turn limit"), + "error message should mention turn limit, got: {message}" + ); + } else { + panic!("expected AgentEvent::Error, got: {event:?}"); + } + } + #[tokio::test] async fn watchdog_orphan_detection_returns_nonzero_enabling_auto_assign() { // This test verifies the contract that `check_orphaned_agents` returns diff --git a/server/src/agents/pool/start.rs b/server/src/agents/pool/start.rs index de8f257d..f31481cb 100644 --- a/server/src/agents/pool/start.rs +++ b/server/src/agents/pool/start.rs @@ -285,6 +285,7 @@ impl AgentPool { log_session_id: Some(log_session_id.clone()), merge_failure_reported: false, throttled: false, + termination_reason: None, }, ); } @@ -652,6 +653,7 @@ impl AgentPool { completion: None, log_session_id: Some(log_session_id), throttled: false, + termination_reason: None, }) } } diff --git a/server/src/agents/pool/test_helpers.rs b/server/src/agents/pool/test_helpers.rs index c2dc8574..41dddc5a 100644 --- a/server/src/agents/pool/test_helpers.rs +++ b/server/src/agents/pool/test_helpers.rs @@ -35,6 +35,7 @@ impl AgentPool { log_session_id: None, merge_failure_reported: false, throttled: false, + termination_reason: None, }, ); tx @@ -71,6 +72,7 @@ impl AgentPool { log_session_id: None, merge_failure_reported: false, throttled: false, + termination_reason: None, }, ); tx @@ -104,6 +106,7 @@ impl AgentPool { log_session_id: None, merge_failure_reported: false, throttled: false, + termination_reason: None, }, ); tx @@ -136,6 +139,7 @@ impl AgentPool { log_session_id: None, merge_failure_reported: false, throttled: false, + termination_reason: None, }, ); tx diff --git a/server/src/agents/pool/types.rs b/server/src/agents/pool/types.rs index d168d4f3..1187170c 100644 --- a/server/src/agents/pool/types.rs +++ b/server/src/agents/pool/types.rs @@ -84,6 +84,8 @@ pub(super) struct StoryAgent { /// Set to `true` when a rate-limit throttle warning was received for this agent. /// True when a rate-limit throttle warning was received for this agent. pub(super) throttled: bool, + /// Set when the watchdog terminates the agent for exceeding a limit. + pub(super) termination_reason: Option, } /// Build an `AgentInfo` snapshot from a `StoryAgent` map entry. @@ -104,5 +106,6 @@ pub(super) fn agent_info_from_entry(story_id: &str, agent: &StoryAgent) -> Agent completion: agent.completion.clone(), log_session_id: agent.log_session_id.clone(), throttled: agent.throttled, + termination_reason: agent.termination_reason.clone(), } } diff --git a/server/src/agents/pool/wait.rs b/server/src/agents/pool/wait.rs index 52d899a0..bc81665c 100644 --- a/server/src/agents/pool/wait.rs +++ b/server/src/agents/pool/wait.rs @@ -71,6 +71,7 @@ impl AgentPool { completion: None, log_session_id: None, throttled: false, + termination_reason: None, } }); } diff --git a/server/src/http/mcp/agent_tools.rs b/server/src/http/mcp/agent_tools.rs index 8f6d0024..de8046ad 100644 --- a/server/src/http/mcp/agent_tools.rs +++ b/server/src/http/mcp/agent_tools.rs @@ -259,8 +259,18 @@ pub(super) fn tool_get_agent_remaining_turns_and_budget( agent_info.status, crate::agents::AgentStatus::Running | crate::agents::AgentStatus::Pending ) { + let reason_suffix = agent_info + .termination_reason + .as_ref() + .map(|r| { + format!( + ", termination_reason: {}", + serde_json::to_string(r).unwrap_or_default() + ) + }) + .unwrap_or_default(); return Err(format!( - "Agent '{agent_name}' for story '{story_id}' is not running (status: {})", + "Agent '{agent_name}' for story '{story_id}' is not running (status: {}{reason_suffix})", agent_info.status )); } diff --git a/server/src/main.rs b/server/src/main.rs index f8ab475c..459fee03 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -852,7 +852,7 @@ async fn main() -> Result<(), std::io::Error> { // Watchdog: detect orphaned Running agents every 30 ticks. if tick_count.is_multiple_of(30) { - let found = tick_agents.run_watchdog_pass(); + let found = tick_agents.run_watchdog_pass(tick_root.as_deref()); if found > 0 { crate::slog!( "[tick] {found} orphaned agent(s) detected; triggering auto-assign." diff --git a/server/src/service/agents/selection.rs b/server/src/service/agents/selection.rs index fb7bafa4..0b9d9cc1 100644 --- a/server/src/service/agents/selection.rs +++ b/server/src/service/agents/selection.rs @@ -55,6 +55,7 @@ mod tests { completion: None, log_session_id: None, throttled: false, + termination_reason: None, } }