diff --git a/server/src/agents/pool/auto_assign/watchdog.rs b/server/src/agents/pool/auto_assign/watchdog.rs deleted file mode 100644 index c2c4b7db..00000000 --- a/server/src/agents/pool/auto_assign/watchdog.rs +++ /dev/null @@ -1,1069 +0,0 @@ -//! Watchdog task: detects orphaned agents, enforces turn/budget limits, and -//! triggers auto-assign. - -use std::collections::HashMap; -use std::path::Path; -use std::sync::Mutex; -use tokio::sync::broadcast; - -use crate::config::ProjectConfig; -use crate::slog; - -use super::super::super::{AgentEvent, AgentStatus, TerminationReason}; -use super::super::{AgentPool, StoryAgent}; - -/// Scan the agent pool for Running entries whose backing tokio task has already -/// finished and mark them as Failed. -/// -/// This handles the case where the PTY read loop or the spawned task exits -/// without updating the agent status — for example when the process is killed -/// externally and the PTY master fd returns EOF before our inactivity timeout -/// fires, but some other edge case prevents the normal cleanup path from running. -/// Snapshot of a running agent for limit checking: (composite_key, story_id, agent_name, event_tx, log_session_id). -type RunningAgentSnapshot = ( - String, - String, - String, - broadcast::Sender, - Option, -); - -pub(super) fn check_orphaned_agents(agents: &Mutex>) -> usize { - let mut lock = match agents.lock() { - Ok(l) => l, - Err(_) => return 0, - }; - - // Collect orphaned entries: Running or Pending agents whose task handle is finished. - // Pending agents can be orphaned if worktree creation panics before setting status. - let orphaned: Vec<(String, String, broadcast::Sender, AgentStatus)> = lock - .iter() - .filter_map(|(key, agent)| { - if matches!(agent.status, AgentStatus::Running | AgentStatus::Pending) - && let Some(handle) = &agent.task_handle - && handle.is_finished() - { - let story_id = key - .rsplit_once(':') - .map(|(s, _)| s.to_string()) - .unwrap_or_else(|| key.clone()); - return Some(( - key.clone(), - story_id, - agent.tx.clone(), - agent.status.clone(), - )); - } - None - }) - .collect(); - - let count = orphaned.len(); - for (key, story_id, tx, prev_status) in orphaned { - if let Some(agent) = lock.get_mut(&key) { - agent.status = AgentStatus::Failed; - slog!( - "[watchdog] Orphaned agent '{key}': task finished but status was {prev_status}. \ - Marking Failed." - ); - let _ = tx.send(AgentEvent::Error { - story_id, - agent_name: agent.agent_name.clone(), - message: "Agent process terminated unexpectedly (watchdog detected orphan)" - .to_string(), - }); - } - } - count -} - -/// Compute in-flight budget from per-message usage data in agent log files. -/// -/// Sums cost across **all** sessions for the given story+agent combination. -/// For completed sessions (containing a `result` event), uses the accurate -/// `total_cost_usd` from Claude Code. For running sessions (no `result` -/// event yet), estimates cost from per-turn token counts in `assistant` -/// events. -/// -/// This cumulative total is useful for cost analysis (metric tool) but is -/// NOT used by the watchdog for enforcement — the watchdog uses -/// [`compute_budget_from_single_log`] on the current session only. -pub(crate) fn compute_budget_from_logs( - project_root: &Path, - story_id: &str, - agent_name: &str, -) -> f64 { - let log_files = - crate::agent_log::list_story_log_files(project_root, story_id, Some(agent_name)); - let mut total_cost = 0.0; - - for path in &log_files { - total_cost += compute_budget_from_single_log(path); - } - - total_cost -} - -/// Compute budget from a single log file. -/// -/// If the log contains a `result` event with `total_cost_usd` (completed -/// session), returns that. Otherwise estimates cost from per-turn token -/// usage in `assistant` events (running session). -pub(crate) fn compute_budget_from_single_log(path: &Path) -> f64 { - use crate::agents::TokenUsage; - - let entries = match crate::agent_log::read_log(path) { - Ok(e) => e, - Err(_) => return 0.0, - }; - - // Check for a result event with accurate total_cost_usd (completed session). - let result_cost = entries.iter().rev().find_map(|entry| { - if entry.event.get("type").and_then(|v| v.as_str()) == Some("agent_json") - && let Some(data) = entry.event.get("data") - && data.get("type").and_then(|v| v.as_str()) == Some("result") - { - data.get("total_cost_usd").and_then(|v| v.as_f64()) - } else { - None - } - }); - - if let Some(cost) = result_cost { - return cost; - } - - // Running session — estimate from per-message token usage. - let mut cost = 0.0; - for entry in &entries { - if entry.event.get("type").and_then(|v| v.as_str()) == Some("agent_json") - && let Some(data) = entry.event.get("data") - && data.get("type").and_then(|v| v.as_str()) == Some("assistant") - && let Some(message) = data.get("message") - && let Some(usage) = message.get("usage") - { - let model = message.get("model").and_then(|v| v.as_str()); - let token_usage = TokenUsage { - input_tokens: usage - .get("input_tokens") - .and_then(|v| v.as_u64()) - .unwrap_or(0), - output_tokens: usage - .get("output_tokens") - .and_then(|v| v.as_u64()) - .unwrap_or(0), - cache_creation_input_tokens: usage - .get("cache_creation_input_tokens") - .and_then(|v| v.as_u64()) - .unwrap_or(0), - cache_read_input_tokens: usage - .get("cache_read_input_tokens") - .and_then(|v| v.as_u64()) - .unwrap_or(0), - total_cost_usd: 0.0, - }; - cost += token_usage.estimate_cost_usd(model); - } - } - cost -} - -/// Resolve the current session's log file for an agent. -/// -/// Uses `log_session_id` to construct the exact path when available; -/// otherwise falls back to the most recently modified log file for the -/// agent. -pub(crate) fn resolve_session_log( - project_root: &Path, - story_id: &str, - agent_name: &str, - log_session_id: &Option, -) -> Option { - if let Some(sid) = log_session_id { - let path = crate::agent_log::log_file_path(project_root, story_id, agent_name, sid); - if path.exists() { - return Some(path); - } - } - crate::agent_log::find_latest_log(project_root, story_id, agent_name) -} - -/// Count `assistant` events in a single log file. -pub(crate) fn count_turns_in_log(path: &Path) -> u64 { - let entries = match crate::agent_log::read_log(path) { - Ok(e) => e, - Err(_) => return 0, - }; - entries - .iter() - .filter(|entry| { - entry.event.get("type").and_then(|v| v.as_str()) == Some("agent_json") - && entry - .event - .get("data") - .and_then(|d| d.get("type")) - .and_then(|v| v.as_str()) - == Some("assistant") - }) - .count() as u64 -} - -/// Scan running agents for turn/budget limit violations and terminate offenders. -/// -/// Turns and budget are counted from the **current session's** log file -/// only — prior sessions are excluded so that restart counts from earlier -/// runs do not accumulate against the limits. -fn check_agent_limits( - agents: &Mutex>, - project_root: &Path, -) -> Vec<(String, TerminationReason)> { - let config = match ProjectConfig::load(project_root) { - Ok(c) => c, - Err(_) => return Vec::new(), - }; - - // Snapshot running agents: (key, story_id, agent_name, tx, log_session_id). - let running: Vec = { - let lock = match agents.lock() { - Ok(l) => l, - Err(_) => return Vec::new(), - }; - lock.iter() - .filter(|(_, agent)| agent.status == AgentStatus::Running) - .map(|(key, agent)| { - let story_id = key - .rsplit_once(':') - .map(|(s, _)| s.to_string()) - .unwrap_or_else(|| key.clone()); - ( - key.clone(), - story_id, - agent.agent_name.clone(), - agent.tx.clone(), - agent.log_session_id.clone(), - ) - }) - .collect() - }; - - let mut terminated = Vec::new(); - - for (key, story_id, agent_name, tx, log_session_id) in &running { - let agent_config = config.agent.iter().find(|a| a.name == *agent_name); - let max_turns = agent_config.and_then(|a| a.max_turns); - let max_budget_usd = agent_config.and_then(|a| a.max_budget_usd); - - // Skip agents with no limits configured. - if max_turns.is_none() && max_budget_usd.is_none() { - continue; - } - - // Resolve the current session's log file. - let session_log = resolve_session_log(project_root, story_id, agent_name, log_session_id); - - // Count turns from the current session's log file only. - let turns_used: u64 = if max_turns.is_some() { - session_log - .as_ref() - .map(|p| count_turns_in_log(p)) - .unwrap_or(0) - } else { - 0 - }; - - // Compute budget from the current session's log file only. - // Completed-session records from token_usage.jsonl are NOT included - // in the watchdog's enforcement — they belong to prior sessions. - let budget_used_usd: f64 = if max_budget_usd.is_some() { - session_log - .as_ref() - .map(|p| compute_budget_from_single_log(p)) - .unwrap_or(0.0) - } else { - 0.0 - }; - - // Determine if a limit is exceeded. - let reason = if let Some(max) = max_turns { - if turns_used >= max as u64 { - Some(TerminationReason::TurnLimit) - } else { - None - } - } else { - None - } - .or(if let Some(max) = max_budget_usd { - if budget_used_usd >= max { - Some(TerminationReason::BudgetLimit) - } else { - None - } - } else { - None - }); - - if let Some(reason) = reason { - let reason_str = match &reason { - TerminationReason::TurnLimit => { - format!("turn limit exceeded ({turns_used}/{})", max_turns.unwrap()) - } - TerminationReason::BudgetLimit => format!( - "budget limit exceeded (${budget_used_usd:.2}/${:.2})", - max_budget_usd.unwrap() - ), - }; - - // Mark agent as Failed with termination reason. - if let Ok(mut lock) = agents.lock() - && let Some(agent) = lock.get_mut(key) - { - agent.status = AgentStatus::Failed; - agent.termination_reason = Some(reason.clone()); - } - - slog!("[watchdog] Terminating agent '{key}': {reason_str}."); - - let _ = tx.send(AgentEvent::Error { - story_id: story_id.clone(), - agent_name: agent_name.clone(), - message: format!("Agent terminated by watchdog: {reason_str}"), - }); - - terminated.push((key.clone(), reason)); - } - } - - terminated -} - -impl AgentPool { - /// Run a single watchdog pass synchronously (test helper). - #[cfg(test)] - pub fn run_watchdog_once(&self) { - check_orphaned_agents(&self.agents); - } - - /// Run one watchdog pass: detect orphans, enforce limits, kill offenders. - /// - /// Called by the unified background tick loop every 30 ticks. - /// - /// When a limit is exceeded the agent's PTY child is killed and the - /// `should_block_story` retry mechanism is invoked. The story is marked - /// `blocked: true` only when `retry_count >= max_retries`; otherwise - /// `retry_count` is incremented and the story stays in `2_current/` for - /// re-attempt. This prevents the original kill-respawn loop (bug 646) - /// while restoring the `max_retries` semantic for turn/budget overruns. - pub fn run_watchdog_pass(&self, project_root: Option<&Path>) -> usize { - let orphaned = check_orphaned_agents(&self.agents); - - if let Some(root) = project_root { - let terminated = check_agent_limits(&self.agents, root); - let config = ProjectConfig::load(root).unwrap_or_default(); - for (key, _reason) in &terminated { - // Kill the PTY child and abort the task, same as stop_agent. - self.kill_child_for_key(key); - if let Ok(mut lock) = self.agents.lock() - && let Some(agent) = lock.get_mut(key) - && let Some(handle) = agent.task_handle.take() - { - handle.abort(); - } - - // Use the retry mechanism: increment retry_count and only block - // when the limit is exceeded, matching the pipeline's behaviour. - let story_id = key.rsplit_once(':').map(|(s, _)| s).unwrap_or(key); - if let Some(block_reason) = super::super::pipeline::should_block_story( - story_id, - config.max_retries, - "watchdog", - ) { - let _ = self - .watcher_tx - .send(crate::io::watcher::WatcherEvent::StoryBlocked { - story_id: story_id.to_string(), - reason: block_reason, - }); - slog!("[watchdog] Story '{story_id}' blocked after exceeding retry limit."); - } else { - slog!( - "[watchdog] Story '{story_id}' retry incremented after limit \ - termination; stays in 2_current/ for re-attempt." - ); - } - } - if !terminated.is_empty() { - Self::notify_agent_state_changed(&self.watcher_tx); - } - return orphaned + terminated.len(); - } - - orphaned - } -} - -// ── Tests ────────────────────────────────────────────────────────────────── - -#[cfg(test)] -mod tests { - use super::super::super::{AgentPool, composite_key}; - use super::*; - - /// Write a fake session log file with `n` assistant turn entries. - /// - /// The file is named `{agent_name}-{session_id}.log` to match the - /// real naming convention used by `AgentLogWriter`. - fn write_fake_session_log( - project_root: &Path, - story_id: &str, - agent_name: &str, - session_id: &str, - n_turns: u64, - ) { - let log_dir = project_root.join(".huskies").join("logs").join(story_id); - std::fs::create_dir_all(&log_dir).unwrap(); - let log_path = log_dir.join(format!("{agent_name}-{session_id}.log")); - let mut content = String::new(); - for _ in 0..n_turns { - content.push_str( - &serde_json::to_string(&serde_json::json!({ - "timestamp": "2026-04-25T00:00:00Z", - "type": "agent_json", - "story_id": story_id, - "agent_name": agent_name, - "data": { "type": "assistant", "message": {} } - })) - .unwrap(), - ); - content.push('\n'); - } - std::fs::write(log_path, content).unwrap(); - } - - /// Write a fake session log containing a `result` event with the given cost. - /// - /// Used to test budget enforcement via the watchdog's per-session log - /// reading (not `token_usage.jsonl`). - fn write_fake_budget_session_log( - project_root: &Path, - story_id: &str, - agent_name: &str, - session_id: &str, - cost_usd: f64, - ) { - let log_dir = project_root.join(".huskies").join("logs").join(story_id); - std::fs::create_dir_all(&log_dir).unwrap(); - let log_path = log_dir.join(format!("{agent_name}-{session_id}.log")); - let content = serde_json::to_string(&serde_json::json!({ - "timestamp": "2026-04-25T00:00:00Z", - "type": "agent_json", - "story_id": story_id, - "agent_name": agent_name, - "data": { "type": "result", "total_cost_usd": cost_usd } - })) - .unwrap() - + "\n"; - std::fs::write(log_path, content).unwrap(); - } - - /// Write a minimal project.toml with the given agent config. - fn write_project_config(project_root: &Path, config_toml: &str) { - let huskies_dir = project_root.join(".huskies"); - std::fs::create_dir_all(&huskies_dir).unwrap(); - std::fs::write(huskies_dir.join("project.toml"), config_toml).unwrap(); - } - - // ── check_orphaned_agents return value tests (bug 161) ────────────────── - - #[tokio::test] - async fn check_orphaned_agents_returns_count_of_orphaned_agents() { - let pool = AgentPool::new_test(3001); - - // Spawn two tasks that finish immediately. - let h1 = tokio::spawn(async {}); - let h2 = tokio::spawn(async {}); - tokio::time::sleep(std::time::Duration::from_millis(20)).await; - assert!(h1.is_finished()); - assert!(h2.is_finished()); - - pool.inject_test_agent_with_handle("story_a", "coder", AgentStatus::Running, h1); - pool.inject_test_agent_with_handle("story_b", "coder", AgentStatus::Running, h2); - - let found = check_orphaned_agents(&pool.agents); - assert_eq!(found, 2, "should detect both orphaned agents"); - } - - #[test] - fn check_orphaned_agents_returns_zero_when_no_orphans() { - let pool = AgentPool::new_test(3001); - // Inject agents in terminal states — not orphaned. - pool.inject_test_agent("story_a", "coder", AgentStatus::Completed); - pool.inject_test_agent("story_b", "qa", AgentStatus::Failed); - - let found = check_orphaned_agents(&pool.agents); - assert_eq!( - found, 0, - "no orphans should be detected for terminal agents" - ); - } - - #[tokio::test] - async fn watchdog_detects_orphaned_running_agent() { - let pool = AgentPool::new_test(3001); - - let handle = tokio::spawn(async {}); - tokio::time::sleep(std::time::Duration::from_millis(20)).await; - assert!( - handle.is_finished(), - "task should be finished before injection" - ); - - let tx = pool.inject_test_agent_with_handle( - "orphan_story", - "coder", - AgentStatus::Running, - handle, - ); - let mut rx = tx.subscribe(); - - pool.run_watchdog_once(); - - { - let agents = pool.agents.lock().unwrap(); - let key = composite_key("orphan_story", "coder"); - let agent = agents.get(&key).unwrap(); - assert_eq!( - agent.status, - AgentStatus::Failed, - "watchdog must mark an orphaned Running agent as Failed" - ); - } - - let event = rx.try_recv().expect("watchdog must emit an Error event"); - assert!( - matches!(event, AgentEvent::Error { .. }), - "expected AgentEvent::Error, got: {event:?}" - ); - } - - // ── Limit enforcement integration tests (bug 624) ──────────────────────── - - #[test] - fn watchdog_terminates_agent_exceeding_turn_limit() { - let tmp = tempfile::tempdir().unwrap(); - let root = tmp.path(); - - write_project_config( - root, - r#" -[[agent]] -name = "coder-1" -runtime = "claude-code" -max_turns = 10 -"#, - ); - - // Write 12 turns in the current session (exceeds limit of 10). - write_fake_session_log(root, "story_a", "coder-1", "sess-current", 12); - - let pool = AgentPool::new_test(3001); - let tx = pool.inject_test_agent_with_session( - "story_a", - "coder-1", - AgentStatus::Running, - "sess-current", - ); - let mut rx = tx.subscribe(); - - let found = pool.run_watchdog_pass(Some(root)); - assert!(found >= 1, "watchdog should detect the over-limit agent"); - - // Agent should now be Failed with TurnLimit reason. - { - let agents = pool.agents.lock().unwrap(); - let key = composite_key("story_a", "coder-1"); - let agent = agents.get(&key).unwrap(); - assert_eq!(agent.status, AgentStatus::Failed); - assert_eq!( - agent.termination_reason, - Some(TerminationReason::TurnLimit), - "termination reason must be TurnLimit" - ); - } - - let event = rx.try_recv().expect("watchdog must emit an Error event"); - assert!( - matches!(event, AgentEvent::Error { .. }), - "expected AgentEvent::Error, got: {event:?}" - ); - } - - #[test] - fn watchdog_terminates_agent_exceeding_budget_limit() { - let tmp = tempfile::tempdir().unwrap(); - let root = tmp.path(); - - write_project_config( - root, - r#" -[[agent]] -name = "coder-1" -runtime = "claude-code" -max_budget_usd = 5.00 -"#, - ); - - // Write $6.00 in the current session's log (exceeds limit of $5.00). - write_fake_budget_session_log(root, "story_b", "coder-1", "sess-budget", 6.00); - - let pool = AgentPool::new_test(3001); - let tx = pool.inject_test_agent_with_session( - "story_b", - "coder-1", - AgentStatus::Running, - "sess-budget", - ); - let mut rx = tx.subscribe(); - - let found = pool.run_watchdog_pass(Some(root)); - assert!(found >= 1, "watchdog should detect the over-budget agent"); - - { - let agents = pool.agents.lock().unwrap(); - let key = composite_key("story_b", "coder-1"); - let agent = agents.get(&key).unwrap(); - assert_eq!(agent.status, AgentStatus::Failed); - assert_eq!( - agent.termination_reason, - Some(TerminationReason::BudgetLimit), - "termination reason must be BudgetLimit" - ); - } - - let event = rx.try_recv().expect("watchdog must emit an Error event"); - assert!(matches!(event, AgentEvent::Error { .. })); - } - - #[test] - fn watchdog_does_not_terminate_agent_under_limits() { - let tmp = tempfile::tempdir().unwrap(); - let root = tmp.path(); - - write_project_config( - root, - r#" -[[agent]] -name = "coder-1" -runtime = "claude-code" -max_turns = 50 -max_budget_usd = 10.00 -"#, - ); - - // Agent is under both limits in the current session. - write_fake_session_log(root, "story_c", "coder-1", "sess-ok", 25); - write_fake_budget_session_log(root, "story_c", "coder-1", "sess-ok-budget", 3.00); - - let pool = AgentPool::new_test(3001); - // Use the turns session (the budget session is a separate file; - // resolve_session_log picks the specified one for turns, and - // find_latest_log for budget if needed — but here the turns file - // has 25 turns < 50 so no violation). - pool.inject_test_agent_with_session("story_c", "coder-1", AgentStatus::Running, "sess-ok"); - - let found = pool.run_watchdog_pass(Some(root)); - assert_eq!(found, 0, "agent under limits should not be terminated"); - - { - let agents = pool.agents.lock().unwrap(); - let key = composite_key("story_c", "coder-1"); - let agent = agents.get(&key).unwrap(); - assert_eq!( - agent.status, - AgentStatus::Running, - "agent under limits should stay Running" - ); - assert!(agent.termination_reason.is_none()); - } - } - - /// Regression test for the original bug 624 incident: - /// coder-1 with max_turns=50, max_budget_usd=5.00 ran 5.6× over the turn - /// limit (280 turns). The watchdog must terminate at the turn limit (turns - /// hit first in the observed trace), with reason TurnLimit. - #[test] - fn regression_bug624_coder1_story623_trajectory() { - let tmp = tempfile::tempdir().unwrap(); - let root = tmp.path(); - - write_project_config( - root, - r#" -[[agent]] -name = "coder-1" -runtime = "claude-code" -max_turns = 50 -max_budget_usd = 5.00 -"#, - ); - - // Simulate the trajectory: 55 turns in the current session (just past - // the limit) and $2.50 spent (under budget). Turns hit first, so - // reason should be TurnLimit. - write_fake_session_log(root, "story_623", "coder-1", "sess-624", 55); - - let pool = AgentPool::new_test(3001); - let tx = pool.inject_test_agent_with_session( - "story_623", - "coder-1", - AgentStatus::Running, - "sess-624", - ); - let mut rx = tx.subscribe(); - - let found = pool.run_watchdog_pass(Some(root)); - assert!(found >= 1, "watchdog must catch the turn-limit violation"); - - { - let agents = pool.agents.lock().unwrap(); - let key = composite_key("story_623", "coder-1"); - let agent = agents.get(&key).unwrap(); - assert_eq!(agent.status, AgentStatus::Failed); - assert_eq!( - agent.termination_reason, - Some(TerminationReason::TurnLimit), - "turns hit first in the observed trace, so reason must be TurnLimit" - ); - } - - // The error event should have been emitted. - let event = rx.try_recv().expect("watchdog must emit an Error event"); - if let AgentEvent::Error { message, .. } = &event { - assert!( - message.contains("turn limit"), - "error message should mention turn limit, got: {message}" - ); - } else { - panic!("expected AgentEvent::Error, got: {event:?}"); - } - } - - #[tokio::test] - async fn watchdog_orphan_detection_returns_nonzero_enabling_auto_assign() { - // This test verifies the contract that `check_orphaned_agents` returns - // a non-zero count when orphans exist, which the watchdog uses to - // decide whether to trigger auto-assign (bug 161). - let pool = AgentPool::new_test(3001); - - let handle = tokio::spawn(async {}); - tokio::time::sleep(std::time::Duration::from_millis(20)).await; - - pool.inject_test_agent_with_handle("orphan_story", "coder", AgentStatus::Running, handle); - - // Before watchdog: agent is Running. - { - let agents = pool.agents.lock().unwrap(); - let key = composite_key("orphan_story", "coder"); - assert_eq!(agents.get(&key).unwrap().status, AgentStatus::Running); - } - - // Run watchdog pass — should return 1 (orphan found). - let found = check_orphaned_agents(&pool.agents); - assert_eq!( - found, 1, - "watchdog must return 1 for a single orphaned agent" - ); - - // After watchdog: agent is Failed. - { - let agents = pool.agents.lock().unwrap(); - let key = composite_key("orphan_story", "coder"); - assert_eq!( - agents.get(&key).unwrap().status, - AgentStatus::Failed, - "orphaned agent must be marked Failed" - ); - } - } - - // ── Kill-respawn loop fix (bug 646), updated for per-session + retry ─── - - /// When the watchdog terminates an agent for limit-exceeded AND the story - /// has exhausted its retries, it must be marked `blocked: true` in CRDT - /// state so `auto_assign_available_work` won't re-spawn the agent. - /// - /// This test seeds a single session that legitimately exceeds the limit - /// and uses `max_retries = 1` so that the first violation blocks. - #[test] - fn watchdog_marks_story_blocked_after_limit_termination() { - crate::db::ensure_content_store(); - - let tmp = tempfile::tempdir().unwrap(); - let root = tmp.path(); - - write_project_config( - root, - r#" -max_retries = 1 - -[[agent]] -name = "coder-1" -runtime = "claude-code" -max_turns = 10 -"#, - ); - - // Write story content into the CRDT-backed content store so the - // watchdog's retry/block path has something to read+update. - let story_id = "42_story_runaway"; - let initial = "---\nname: Runaway Story\n---\n# Runaway Story\n"; - crate::db::write_content(story_id, initial); - - // 12 turns in a single session exceeds the configured max of 10. - write_fake_session_log(root, story_id, "coder-1", "sess-runaway", 12); - - let pool = AgentPool::new_test(3001); - let _tx = pool.inject_test_agent_with_session( - story_id, - "coder-1", - AgentStatus::Running, - "sess-runaway", - ); - - let found = pool.run_watchdog_pass(Some(root)); - assert!(found >= 1, "watchdog should detect the over-limit agent"); - - // With max_retries=1, the first violation blocks immediately. - let updated = crate::db::read_content(story_id) - .expect("story content must still exist after watchdog termination"); - assert!( - updated.contains("blocked: true"), - "story must be marked `blocked: true` after limit termination with max_retries=1 — got:\n{updated}" - ); - - // Sanity: the agent itself is also Failed with the right reason. - { - let agents = pool.agents.lock().unwrap(); - let key = composite_key(story_id, "coder-1"); - let agent = agents.get(&key).unwrap(); - assert_eq!(agent.status, AgentStatus::Failed); - assert_eq!( - agent.termination_reason, - Some(TerminationReason::TurnLimit), - "termination reason must be TurnLimit" - ); - } - } - - // ── Per-session counting (bug 650) ────────────────────────────────────── - - /// Seed multiple prior session log files whose combined assistant-event - /// count exceeds `max_turns`. Then inject a NEW running agent with a - /// fresh session_id whose log has fewer events than `max_turns`. - /// Assert the agent is NOT terminated (per-session count is under the - /// limit) AND the story is NOT marked blocked. - #[test] - fn per_session_counting_does_not_terminate_under_limit() { - let tmp = tempfile::tempdir().unwrap(); - let root = tmp.path(); - - write_project_config( - root, - r#" -[[agent]] -name = "coder-1" -runtime = "claude-code" -max_turns = 10 -"#, - ); - - // 3 prior sessions with 5 turns each = 15 total (above max_turns=10). - write_fake_session_log(root, "story_d", "coder-1", "old-sess-1", 5); - write_fake_session_log(root, "story_d", "coder-1", "old-sess-2", 5); - write_fake_session_log(root, "story_d", "coder-1", "old-sess-3", 5); - - // New running session has only 3 turns (under limit of 10). - write_fake_session_log(root, "story_d", "coder-1", "new-sess", 3); - - let pool = AgentPool::new_test(3001); - pool.inject_test_agent_with_session("story_d", "coder-1", AgentStatus::Running, "new-sess"); - - let found = pool.run_watchdog_pass(Some(root)); - assert_eq!( - found, 0, - "agent under per-session limit should NOT be terminated" - ); - - { - let agents = pool.agents.lock().unwrap(); - let key = composite_key("story_d", "coder-1"); - let agent = agents.get(&key).unwrap(); - assert_eq!( - agent.status, - AgentStatus::Running, - "agent under per-session limit should stay Running" - ); - assert!(agent.termination_reason.is_none()); - } - } - - /// Same setup as per_session_counting_does_not_terminate_under_limit, but - /// the new agent's own session log exceeds `max_turns`. Assert the agent - /// IS terminated AND (with max_retries=1) the story IS marked blocked. - #[test] - fn per_session_counting_terminates_over_limit() { - crate::db::ensure_content_store(); - - let tmp = tempfile::tempdir().unwrap(); - let root = tmp.path(); - - write_project_config( - root, - r#" -max_retries = 1 - -[[agent]] -name = "coder-1" -runtime = "claude-code" -max_turns = 10 -"#, - ); - - let story_id = "story_e_per_session"; - crate::db::write_content(story_id, "---\nname: Per-Session Test\n---\n"); - - // Prior session with 5 turns (under limit alone). - write_fake_session_log(root, story_id, "coder-1", "old-sess", 5); - - // Current session with 12 turns (over limit). - write_fake_session_log(root, story_id, "coder-1", "new-sess", 12); - - let pool = AgentPool::new_test(3001); - let tx = pool.inject_test_agent_with_session( - story_id, - "coder-1", - AgentStatus::Running, - "new-sess", - ); - let mut rx = tx.subscribe(); - - let found = pool.run_watchdog_pass(Some(root)); - assert!( - found >= 1, - "agent over per-session limit must be terminated" - ); - - { - let agents = pool.agents.lock().unwrap(); - let key = composite_key(story_id, "coder-1"); - let agent = agents.get(&key).unwrap(); - assert_eq!(agent.status, AgentStatus::Failed); - assert_eq!(agent.termination_reason, Some(TerminationReason::TurnLimit),); - } - - let event = rx.try_recv().expect("watchdog must emit an Error event"); - assert!(matches!(event, AgentEvent::Error { .. })); - - // With max_retries=1, the story is blocked. - let updated = crate::db::read_content(story_id).unwrap(); - assert!( - updated.contains("blocked: true"), - "story must be blocked after per-session overrun with max_retries=1" - ); - } - - // ── Retry semantic integration test (bug 650) ─────────────────────────── - - /// With `max_retries = 3`, simulate separate sessions each exceeding - /// `max_turns`. After session 1: retry_count=1, NOT blocked. After - /// session 2: retry_count=2, NOT blocked. After session 3: - /// retry_count=3 >= max_retries, story IS blocked. - #[test] - fn watchdog_retry_semantic_blocks_after_max_retries() { - crate::db::ensure_content_store(); - - let tmp = tempfile::tempdir().unwrap(); - let root = tmp.path(); - - write_project_config( - root, - r#" -max_retries = 3 - -[[agent]] -name = "coder-1" -runtime = "claude-code" -max_turns = 10 -"#, - ); - - let story_id = "88_story_retry_watchdog"; - let initial = "---\nname: Retry Test\n---\n"; - crate::db::write_content(story_id, initial); - - // Session 1: exceeds limit → retry_count=1, NOT blocked. - { - write_fake_session_log(root, story_id, "coder-1", "session-1", 12); - let pool = AgentPool::new_test(3001); - pool.inject_test_agent_with_session( - story_id, - "coder-1", - AgentStatus::Running, - "session-1", - ); - pool.run_watchdog_pass(Some(root)); - - let content = crate::db::read_content(story_id).unwrap(); - assert!( - content.contains("retry_count: 1"), - "after session 1, retry_count should be 1 — got:\n{content}" - ); - assert!( - !content.contains("blocked: true"), - "story should NOT be blocked after session 1" - ); - } - - // Session 2: exceeds limit → retry_count=2, NOT blocked. - { - write_fake_session_log(root, story_id, "coder-1", "session-2", 12); - let pool = AgentPool::new_test(3001); - pool.inject_test_agent_with_session( - story_id, - "coder-1", - AgentStatus::Running, - "session-2", - ); - pool.run_watchdog_pass(Some(root)); - - let content = crate::db::read_content(story_id).unwrap(); - assert!( - content.contains("retry_count: 2"), - "after session 2, retry_count should be 2 — got:\n{content}" - ); - assert!( - !content.contains("blocked: true"), - "story should NOT be blocked after session 2" - ); - } - - // Session 3: exceeds limit → retry_count=3 >= max_retries(3), IS blocked. - { - write_fake_session_log(root, story_id, "coder-1", "session-3", 12); - let pool = AgentPool::new_test(3001); - pool.inject_test_agent_with_session( - story_id, - "coder-1", - AgentStatus::Running, - "session-3", - ); - pool.run_watchdog_pass(Some(root)); - - let content = crate::db::read_content(story_id).unwrap(); - assert!( - content.contains("blocked: true"), - "story must be blocked after session 3 (retry_count=3 >= max_retries=3) — got:\n{content}" - ); - } - } -} diff --git a/server/src/agents/pool/auto_assign/watchdog/budget.rs b/server/src/agents/pool/auto_assign/watchdog/budget.rs new file mode 100644 index 00000000..5c11dc23 --- /dev/null +++ b/server/src/agents/pool/auto_assign/watchdog/budget.rs @@ -0,0 +1,94 @@ +//! Budget computation: estimates USD spend from agent log files. + +use std::path::Path; + +/// Compute in-flight budget from per-message usage data in agent log files. +/// +/// Sums cost across **all** sessions for the given story+agent combination. +/// For completed sessions (containing a `result` event), uses the accurate +/// `total_cost_usd` from Claude Code. For running sessions (no `result` +/// event yet), estimates cost from per-turn token counts in `assistant` +/// events. +/// +/// This cumulative total is useful for cost analysis (metric tool) but is +/// NOT used by the watchdog for enforcement — the watchdog uses +/// [`compute_budget_from_single_log`] on the current session only. +pub(crate) fn compute_budget_from_logs( + project_root: &Path, + story_id: &str, + agent_name: &str, +) -> f64 { + let log_files = + crate::agent_log::list_story_log_files(project_root, story_id, Some(agent_name)); + let mut total_cost = 0.0; + + for path in &log_files { + total_cost += compute_budget_from_single_log(path); + } + + total_cost +} + +/// Compute budget from a single log file. +/// +/// If the log contains a `result` event with `total_cost_usd` (completed +/// session), returns that. Otherwise estimates cost from per-turn token +/// usage in `assistant` events (running session). +pub(crate) fn compute_budget_from_single_log(path: &Path) -> f64 { + use crate::agents::TokenUsage; + + let entries = match crate::agent_log::read_log(path) { + Ok(e) => e, + Err(_) => return 0.0, + }; + + // Check for a result event with accurate total_cost_usd (completed session). + let result_cost = entries.iter().rev().find_map(|entry| { + if entry.event.get("type").and_then(|v| v.as_str()) == Some("agent_json") + && let Some(data) = entry.event.get("data") + && data.get("type").and_then(|v| v.as_str()) == Some("result") + { + data.get("total_cost_usd").and_then(|v| v.as_f64()) + } else { + None + } + }); + + if let Some(cost) = result_cost { + return cost; + } + + // Running session — estimate from per-message token usage. + let mut cost = 0.0; + for entry in &entries { + if entry.event.get("type").and_then(|v| v.as_str()) == Some("agent_json") + && let Some(data) = entry.event.get("data") + && data.get("type").and_then(|v| v.as_str()) == Some("assistant") + && let Some(message) = data.get("message") + && let Some(usage) = message.get("usage") + { + let model = message.get("model").and_then(|v| v.as_str()); + let token_usage = TokenUsage { + input_tokens: usage + .get("input_tokens") + .and_then(|v| v.as_u64()) + .unwrap_or(0), + output_tokens: usage + .get("output_tokens") + .and_then(|v| v.as_u64()) + .unwrap_or(0), + cache_creation_input_tokens: usage + .get("cache_creation_input_tokens") + .and_then(|v| v.as_u64()) + .unwrap_or(0), + cache_read_input_tokens: usage + .get("cache_read_input_tokens") + .and_then(|v| v.as_u64()) + .unwrap_or(0), + total_cost_usd: 0.0, + }; + cost += token_usage.estimate_cost_usd(model); + } + } + cost +} diff --git a/server/src/agents/pool/auto_assign/watchdog/limits.rs b/server/src/agents/pool/auto_assign/watchdog/limits.rs new file mode 100644 index 00000000..6124ef94 --- /dev/null +++ b/server/src/agents/pool/auto_assign/watchdog/limits.rs @@ -0,0 +1,191 @@ +//! Limit enforcement: checks agent turn/budget limits and terminates offenders. + +use std::collections::HashMap; +use std::path::Path; +use std::sync::Mutex; +use tokio::sync::broadcast; + +use crate::agents::pool::StoryAgent; +use crate::agents::{AgentEvent, AgentStatus, TerminationReason}; +use crate::config::ProjectConfig; +use crate::slog; + +use super::budget::compute_budget_from_single_log; + +/// Snapshot of a running agent for limit checking: (composite_key, story_id, agent_name, event_tx, log_session_id). +type RunningAgentSnapshot = ( + String, + String, + String, + broadcast::Sender, + Option, +); + +/// Resolve the current session's log file for an agent. +/// +/// Uses `log_session_id` to construct the exact path when available; +/// otherwise falls back to the most recently modified log file for the +/// agent. +pub(crate) fn resolve_session_log( + project_root: &Path, + story_id: &str, + agent_name: &str, + log_session_id: &Option, +) -> Option { + if let Some(sid) = log_session_id { + let path = crate::agent_log::log_file_path(project_root, story_id, agent_name, sid); + if path.exists() { + return Some(path); + } + } + crate::agent_log::find_latest_log(project_root, story_id, agent_name) +} + +/// Count `assistant` events in a single log file. +pub(crate) fn count_turns_in_log(path: &Path) -> u64 { + let entries = match crate::agent_log::read_log(path) { + Ok(e) => e, + Err(_) => return 0, + }; + entries + .iter() + .filter(|entry| { + entry.event.get("type").and_then(|v| v.as_str()) == Some("agent_json") + && entry + .event + .get("data") + .and_then(|d| d.get("type")) + .and_then(|v| v.as_str()) + == Some("assistant") + }) + .count() as u64 +} + +/// Scan running agents for turn/budget limit violations and terminate offenders. +/// +/// Turns and budget are counted from the **current session's** log file +/// only — prior sessions are excluded so that restart counts from earlier +/// runs do not accumulate against the limits. +pub(super) fn check_agent_limits( + agents: &Mutex>, + project_root: &Path, +) -> Vec<(String, TerminationReason)> { + let config = match ProjectConfig::load(project_root) { + Ok(c) => c, + Err(_) => return Vec::new(), + }; + + // Snapshot running agents: (key, story_id, agent_name, tx, log_session_id). + let running: Vec = { + let lock = match agents.lock() { + Ok(l) => l, + Err(_) => return Vec::new(), + }; + lock.iter() + .filter(|(_, agent)| agent.status == AgentStatus::Running) + .map(|(key, agent)| { + let story_id = key + .rsplit_once(':') + .map(|(s, _)| s.to_string()) + .unwrap_or_else(|| key.clone()); + ( + key.clone(), + story_id, + agent.agent_name.clone(), + agent.tx.clone(), + agent.log_session_id.clone(), + ) + }) + .collect() + }; + + let mut terminated = Vec::new(); + + for (key, story_id, agent_name, tx, log_session_id) in &running { + let agent_config = config.agent.iter().find(|a| a.name == *agent_name); + let max_turns = agent_config.and_then(|a| a.max_turns); + let max_budget_usd = agent_config.and_then(|a| a.max_budget_usd); + + // Skip agents with no limits configured. + if max_turns.is_none() && max_budget_usd.is_none() { + continue; + } + + // Resolve the current session's log file. + let session_log = resolve_session_log(project_root, story_id, agent_name, log_session_id); + + // Count turns from the current session's log file only. + let turns_used: u64 = if max_turns.is_some() { + session_log + .as_ref() + .map(|p| count_turns_in_log(p)) + .unwrap_or(0) + } else { + 0 + }; + + // Compute budget from the current session's log file only. + // Completed-session records from token_usage.jsonl are NOT included + // in the watchdog's enforcement — they belong to prior sessions. + let budget_used_usd: f64 = if max_budget_usd.is_some() { + session_log + .as_ref() + .map(|p| compute_budget_from_single_log(p)) + .unwrap_or(0.0) + } else { + 0.0 + }; + + // Determine if a limit is exceeded. + let reason = if let Some(max) = max_turns { + if turns_used >= max as u64 { + Some(TerminationReason::TurnLimit) + } else { + None + } + } else { + None + } + .or(if let Some(max) = max_budget_usd { + if budget_used_usd >= max { + Some(TerminationReason::BudgetLimit) + } else { + None + } + } else { + None + }); + + if let Some(reason) = reason { + let reason_str = match &reason { + TerminationReason::TurnLimit => { + format!("turn limit exceeded ({turns_used}/{})", max_turns.unwrap()) + } + TerminationReason::BudgetLimit => format!( + "budget limit exceeded (${budget_used_usd:.2}/${:.2})", + max_budget_usd.unwrap() + ), + }; + + // Mark agent as Failed with termination reason. + if let Ok(mut lock) = agents.lock() + && let Some(agent) = lock.get_mut(key) + { + agent.status = AgentStatus::Failed; + agent.termination_reason = Some(reason.clone()); + } + + slog!("[watchdog] Terminating agent '{key}': {reason_str}."); + + let _ = tx.send(AgentEvent::Error { + story_id: story_id.clone(), + agent_name: agent_name.clone(), + message: format!("Agent terminated by watchdog: {reason_str}"), + }); + + terminated.push((key.clone(), reason)); + } + } + + terminated +} diff --git a/server/src/agents/pool/auto_assign/watchdog/mod.rs b/server/src/agents/pool/auto_assign/watchdog/mod.rs new file mode 100644 index 00000000..3dfd405a --- /dev/null +++ b/server/src/agents/pool/auto_assign/watchdog/mod.rs @@ -0,0 +1,85 @@ +//! Watchdog task: detects orphaned agents, enforces turn/budget limits, and +//! triggers auto-assign. + +mod budget; +mod limits; +mod orphan; +#[cfg(test)] +mod tests; + +use std::path::Path; + +use crate::config::ProjectConfig; +use crate::slog; + +use super::super::AgentPool; +use limits::check_agent_limits; +use orphan::check_orphaned_agents; + +pub(crate) use budget::{compute_budget_from_logs, compute_budget_from_single_log}; +pub(crate) use limits::{count_turns_in_log, resolve_session_log}; + +impl AgentPool { + /// Run a single watchdog pass synchronously (test helper). + #[cfg(test)] + pub fn run_watchdog_once(&self) { + check_orphaned_agents(&self.agents); + } + + /// Run one watchdog pass: detect orphans, enforce limits, kill offenders. + /// + /// Called by the unified background tick loop every 30 ticks. + /// + /// When a limit is exceeded the agent's PTY child is killed and the + /// `should_block_story` retry mechanism is invoked. The story is marked + /// `blocked: true` only when `retry_count >= max_retries`; otherwise + /// `retry_count` is incremented and the story stays in `2_current/` for + /// re-attempt. This prevents the original kill-respawn loop (bug 646) + /// while restoring the `max_retries` semantic for turn/budget overruns. + pub fn run_watchdog_pass(&self, project_root: Option<&Path>) -> usize { + let orphaned = check_orphaned_agents(&self.agents); + + if let Some(root) = project_root { + let terminated = check_agent_limits(&self.agents, root); + let config = ProjectConfig::load(root).unwrap_or_default(); + for (key, _reason) in &terminated { + // Kill the PTY child and abort the task, same as stop_agent. + self.kill_child_for_key(key); + if let Ok(mut lock) = self.agents.lock() + && let Some(agent) = lock.get_mut(key) + && let Some(handle) = agent.task_handle.take() + { + handle.abort(); + } + + // Use the retry mechanism: increment retry_count and only block + // when the limit is exceeded, matching the pipeline's behaviour. + let story_id = key.rsplit_once(':').map(|(s, _)| s).unwrap_or(key); + if let Some(block_reason) = super::super::pipeline::should_block_story( + story_id, + config.max_retries, + "watchdog", + ) { + let _ = self + .watcher_tx + .send(crate::io::watcher::WatcherEvent::StoryBlocked { + story_id: story_id.to_string(), + reason: block_reason, + }); + slog!("[watchdog] Story '{story_id}' blocked after exceeding retry limit."); + } else { + slog!( + "[watchdog] Story '{story_id}' retry incremented after limit \ + termination; stays in 2_current/ for re-attempt." + ); + } + } + if !terminated.is_empty() { + Self::notify_agent_state_changed(&self.watcher_tx); + } + return orphaned + terminated.len(); + } + + orphaned + } +} diff --git a/server/src/agents/pool/auto_assign/watchdog/orphan.rs b/server/src/agents/pool/auto_assign/watchdog/orphan.rs new file mode 100644 index 00000000..5f7b3358 --- /dev/null +++ b/server/src/agents/pool/auto_assign/watchdog/orphan.rs @@ -0,0 +1,65 @@ +//! Orphan detection: marks running agents whose backing task has exited. + +use std::collections::HashMap; +use std::sync::Mutex; +use tokio::sync::broadcast; + +use crate::agents::pool::StoryAgent; +use crate::agents::{AgentEvent, AgentStatus}; +use crate::slog; + +/// Scan the agent pool for Running entries whose backing tokio task has already +/// finished and mark them as Failed. +/// +/// This handles the case where the PTY read loop or the spawned task exits +/// without updating the agent status — for example when the process is killed +/// externally and the PTY master fd returns EOF before our inactivity timeout +/// fires, but some other edge case prevents the normal cleanup path from running. +pub(super) fn check_orphaned_agents(agents: &Mutex>) -> usize { + let mut lock = match agents.lock() { + Ok(l) => l, + Err(_) => return 0, + }; + + // Collect orphaned entries: Running or Pending agents whose task handle is finished. + // Pending agents can be orphaned if worktree creation panics before setting status. + let orphaned: Vec<(String, String, broadcast::Sender, AgentStatus)> = lock + .iter() + .filter_map(|(key, agent)| { + if matches!(agent.status, AgentStatus::Running | AgentStatus::Pending) + && let Some(handle) = &agent.task_handle + && handle.is_finished() + { + let story_id = key + .rsplit_once(':') + .map(|(s, _)| s.to_string()) + .unwrap_or_else(|| key.clone()); + return Some(( + key.clone(), + story_id, + agent.tx.clone(), + agent.status.clone(), + )); + } + None + }) + .collect(); + + let count = orphaned.len(); + for (key, story_id, tx, prev_status) in orphaned { + if let Some(agent) = lock.get_mut(&key) { + agent.status = AgentStatus::Failed; + slog!( + "[watchdog] Orphaned agent '{key}': task finished but status was {prev_status}. \ + Marking Failed." + ); + let _ = tx.send(AgentEvent::Error { + story_id, + agent_name: agent.agent_name.clone(), + message: "Agent process terminated unexpectedly (watchdog detected orphan)" + .to_string(), + }); + } + } + count +} diff --git a/server/src/agents/pool/auto_assign/watchdog/tests/limits_tests.rs b/server/src/agents/pool/auto_assign/watchdog/tests/limits_tests.rs new file mode 100644 index 00000000..7583e5e6 --- /dev/null +++ b/server/src/agents/pool/auto_assign/watchdog/tests/limits_tests.rs @@ -0,0 +1,470 @@ +//! Limit-enforcement, kill-respawn, per-session counting, and retry tests +//! for the watchdog (bugs 624, 646, 650). + +use super::super::super::super::{AgentPool, composite_key}; +use super::{write_fake_budget_session_log, write_fake_session_log, write_project_config}; +use crate::agents::{AgentEvent, AgentStatus, TerminationReason}; + +// ── Limit enforcement integration tests (bug 624) ──────────────────────── + +#[test] +fn watchdog_terminates_agent_exceeding_turn_limit() { + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + write_project_config( + root, + r#" +[[agent]] +name = "coder-1" +runtime = "claude-code" +max_turns = 10 +"#, + ); + + // Write 12 turns in the current session (exceeds limit of 10). + write_fake_session_log(root, "story_a", "coder-1", "sess-current", 12); + + let pool = AgentPool::new_test(3001); + let tx = pool.inject_test_agent_with_session( + "story_a", + "coder-1", + AgentStatus::Running, + "sess-current", + ); + let mut rx = tx.subscribe(); + + let found = pool.run_watchdog_pass(Some(root)); + assert!(found >= 1, "watchdog should detect the over-limit agent"); + + // Agent should now be Failed with TurnLimit reason. + { + let agents = pool.agents.lock().unwrap(); + let key = composite_key("story_a", "coder-1"); + let agent = agents.get(&key).unwrap(); + assert_eq!(agent.status, AgentStatus::Failed); + assert_eq!( + agent.termination_reason, + Some(TerminationReason::TurnLimit), + "termination reason must be TurnLimit" + ); + } + + let event = rx.try_recv().expect("watchdog must emit an Error event"); + assert!( + matches!(event, AgentEvent::Error { .. }), + "expected AgentEvent::Error, got: {event:?}" + ); +} + +#[test] +fn watchdog_terminates_agent_exceeding_budget_limit() { + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + write_project_config( + root, + r#" +[[agent]] +name = "coder-1" +runtime = "claude-code" +max_budget_usd = 5.00 +"#, + ); + + // Write $6.00 in the current session's log (exceeds limit of $5.00). + write_fake_budget_session_log(root, "story_b", "coder-1", "sess-budget", 6.00); + + let pool = AgentPool::new_test(3001); + let tx = pool.inject_test_agent_with_session( + "story_b", + "coder-1", + AgentStatus::Running, + "sess-budget", + ); + let mut rx = tx.subscribe(); + + let found = pool.run_watchdog_pass(Some(root)); + assert!(found >= 1, "watchdog should detect the over-budget agent"); + + { + let agents = pool.agents.lock().unwrap(); + let key = composite_key("story_b", "coder-1"); + let agent = agents.get(&key).unwrap(); + assert_eq!(agent.status, AgentStatus::Failed); + assert_eq!( + agent.termination_reason, + Some(TerminationReason::BudgetLimit), + "termination reason must be BudgetLimit" + ); + } + + let event = rx.try_recv().expect("watchdog must emit an Error event"); + assert!(matches!(event, AgentEvent::Error { .. })); +} + +#[test] +fn watchdog_does_not_terminate_agent_under_limits() { + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + write_project_config( + root, + r#" +[[agent]] +name = "coder-1" +runtime = "claude-code" +max_turns = 50 +max_budget_usd = 10.00 +"#, + ); + + // Agent is under both limits in the current session. + write_fake_session_log(root, "story_c", "coder-1", "sess-ok", 25); + write_fake_budget_session_log(root, "story_c", "coder-1", "sess-ok-budget", 3.00); + + let pool = AgentPool::new_test(3001); + // Use the turns session (the budget session is a separate file; + // resolve_session_log picks the specified one for turns, and + // find_latest_log for budget if needed — but here the turns file + // has 25 turns < 50 so no violation). + pool.inject_test_agent_with_session("story_c", "coder-1", AgentStatus::Running, "sess-ok"); + + let found = pool.run_watchdog_pass(Some(root)); + assert_eq!(found, 0, "agent under limits should not be terminated"); + + { + let agents = pool.agents.lock().unwrap(); + let key = composite_key("story_c", "coder-1"); + let agent = agents.get(&key).unwrap(); + assert_eq!( + agent.status, + AgentStatus::Running, + "agent under limits should stay Running" + ); + assert!(agent.termination_reason.is_none()); + } +} + +/// Regression test for the original bug 624 incident: +/// coder-1 with max_turns=50, max_budget_usd=5.00 ran 5.6× over the turn +/// limit (280 turns). The watchdog must terminate at the turn limit (turns +/// hit first in the observed trace), with reason TurnLimit. +#[test] +fn regression_bug624_coder1_story623_trajectory() { + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + write_project_config( + root, + r#" +[[agent]] +name = "coder-1" +runtime = "claude-code" +max_turns = 50 +max_budget_usd = 5.00 +"#, + ); + + // Simulate the trajectory: 55 turns in the current session (just past + // the limit) and $2.50 spent (under budget). Turns hit first, so + // reason should be TurnLimit. + write_fake_session_log(root, "story_623", "coder-1", "sess-624", 55); + + let pool = AgentPool::new_test(3001); + let tx = pool.inject_test_agent_with_session( + "story_623", + "coder-1", + AgentStatus::Running, + "sess-624", + ); + let mut rx = tx.subscribe(); + + let found = pool.run_watchdog_pass(Some(root)); + assert!(found >= 1, "watchdog must catch the turn-limit violation"); + + { + let agents = pool.agents.lock().unwrap(); + let key = composite_key("story_623", "coder-1"); + let agent = agents.get(&key).unwrap(); + assert_eq!(agent.status, AgentStatus::Failed); + assert_eq!( + agent.termination_reason, + Some(TerminationReason::TurnLimit), + "turns hit first in the observed trace, so reason must be TurnLimit" + ); + } + + // The error event should have been emitted. + let event = rx.try_recv().expect("watchdog must emit an Error event"); + if let AgentEvent::Error { message, .. } = &event { + assert!( + message.contains("turn limit"), + "error message should mention turn limit, got: {message}" + ); + } else { + panic!("expected AgentEvent::Error, got: {event:?}"); + } +} + +// ── Kill-respawn loop fix (bug 646), updated for per-session + retry ─── + +/// When the watchdog terminates an agent for limit-exceeded AND the story +/// has exhausted its retries, it must be marked `blocked: true` in CRDT +/// state so `auto_assign_available_work` won't re-spawn the agent. +/// +/// This test seeds a single session that legitimately exceeds the limit +/// and uses `max_retries = 1` so that the first violation blocks. +#[test] +fn watchdog_marks_story_blocked_after_limit_termination() { + crate::db::ensure_content_store(); + + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + write_project_config( + root, + r#" +max_retries = 1 + +[[agent]] +name = "coder-1" +runtime = "claude-code" +max_turns = 10 +"#, + ); + + // Write story content into the CRDT-backed content store so the + // watchdog's retry/block path has something to read+update. + let story_id = "42_story_runaway"; + let initial = "---\nname: Runaway Story\n---\n# Runaway Story\n"; + crate::db::write_content(story_id, initial); + + // 12 turns in a single session exceeds the configured max of 10. + write_fake_session_log(root, story_id, "coder-1", "sess-runaway", 12); + + let pool = AgentPool::new_test(3001); + let _tx = pool.inject_test_agent_with_session( + story_id, + "coder-1", + AgentStatus::Running, + "sess-runaway", + ); + + let found = pool.run_watchdog_pass(Some(root)); + assert!(found >= 1, "watchdog should detect the over-limit agent"); + + // With max_retries=1, the first violation blocks immediately. + let updated = crate::db::read_content(story_id) + .expect("story content must still exist after watchdog termination"); + assert!( + updated.contains("blocked: true"), + "story must be marked `blocked: true` after limit termination with max_retries=1 — got:\n{updated}" + ); + + // Sanity: the agent itself is also Failed with the right reason. + { + let agents = pool.agents.lock().unwrap(); + let key = composite_key(story_id, "coder-1"); + let agent = agents.get(&key).unwrap(); + assert_eq!(agent.status, AgentStatus::Failed); + assert_eq!( + agent.termination_reason, + Some(TerminationReason::TurnLimit), + "termination reason must be TurnLimit" + ); + } +} + +// ── Per-session counting (bug 650) ────────────────────────────────────── + +/// Seed multiple prior session log files whose combined assistant-event +/// count exceeds `max_turns`. Then inject a NEW running agent with a +/// fresh session_id whose log has fewer events than `max_turns`. +/// Assert the agent is NOT terminated (per-session count is under the +/// limit) AND the story is NOT marked blocked. +#[test] +fn per_session_counting_does_not_terminate_under_limit() { + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + write_project_config( + root, + r#" +[[agent]] +name = "coder-1" +runtime = "claude-code" +max_turns = 10 +"#, + ); + + // 3 prior sessions with 5 turns each = 15 total (above max_turns=10). + write_fake_session_log(root, "story_d", "coder-1", "old-sess-1", 5); + write_fake_session_log(root, "story_d", "coder-1", "old-sess-2", 5); + write_fake_session_log(root, "story_d", "coder-1", "old-sess-3", 5); + + // New running session has only 3 turns (under limit of 10). + write_fake_session_log(root, "story_d", "coder-1", "new-sess", 3); + + let pool = AgentPool::new_test(3001); + pool.inject_test_agent_with_session("story_d", "coder-1", AgentStatus::Running, "new-sess"); + + let found = pool.run_watchdog_pass(Some(root)); + assert_eq!( + found, 0, + "agent under per-session limit should NOT be terminated" + ); + + { + let agents = pool.agents.lock().unwrap(); + let key = composite_key("story_d", "coder-1"); + let agent = agents.get(&key).unwrap(); + assert_eq!( + agent.status, + AgentStatus::Running, + "agent under per-session limit should stay Running" + ); + assert!(agent.termination_reason.is_none()); + } +} + +/// Same setup as per_session_counting_does_not_terminate_under_limit, but +/// the new agent's own session log exceeds `max_turns`. Assert the agent +/// IS terminated AND (with max_retries=1) the story IS marked blocked. +#[test] +fn per_session_counting_terminates_over_limit() { + crate::db::ensure_content_store(); + + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + write_project_config( + root, + r#" +max_retries = 1 + +[[agent]] +name = "coder-1" +runtime = "claude-code" +max_turns = 10 +"#, + ); + + let story_id = "story_e_per_session"; + crate::db::write_content(story_id, "---\nname: Per-Session Test\n---\n"); + + // Prior session with 5 turns (under limit alone). + write_fake_session_log(root, story_id, "coder-1", "old-sess", 5); + + // Current session with 12 turns (over limit). + write_fake_session_log(root, story_id, "coder-1", "new-sess", 12); + + let pool = AgentPool::new_test(3001); + let tx = + pool.inject_test_agent_with_session(story_id, "coder-1", AgentStatus::Running, "new-sess"); + let mut rx = tx.subscribe(); + + let found = pool.run_watchdog_pass(Some(root)); + assert!( + found >= 1, + "agent over per-session limit must be terminated" + ); + + { + let agents = pool.agents.lock().unwrap(); + let key = composite_key(story_id, "coder-1"); + let agent = agents.get(&key).unwrap(); + assert_eq!(agent.status, AgentStatus::Failed); + assert_eq!(agent.termination_reason, Some(TerminationReason::TurnLimit),); + } + + let event = rx.try_recv().expect("watchdog must emit an Error event"); + assert!(matches!(event, AgentEvent::Error { .. })); + + // With max_retries=1, the story is blocked. + let updated = crate::db::read_content(story_id).unwrap(); + assert!( + updated.contains("blocked: true"), + "story must be blocked after per-session overrun with max_retries=1" + ); +} + +// ── Retry semantic integration test (bug 650) ─────────────────────────── + +/// With `max_retries = 3`, simulate separate sessions each exceeding +/// `max_turns`. After session 1: retry_count=1, NOT blocked. After +/// session 2: retry_count=2, NOT blocked. After session 3: +/// retry_count=3 >= max_retries, story IS blocked. +#[test] +fn watchdog_retry_semantic_blocks_after_max_retries() { + crate::db::ensure_content_store(); + + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + write_project_config( + root, + r#" +max_retries = 3 + +[[agent]] +name = "coder-1" +runtime = "claude-code" +max_turns = 10 +"#, + ); + + let story_id = "88_story_retry_watchdog"; + let initial = "---\nname: Retry Test\n---\n"; + crate::db::write_content(story_id, initial); + + // Session 1: exceeds limit → retry_count=1, NOT blocked. + { + write_fake_session_log(root, story_id, "coder-1", "session-1", 12); + let pool = AgentPool::new_test(3001); + pool.inject_test_agent_with_session(story_id, "coder-1", AgentStatus::Running, "session-1"); + pool.run_watchdog_pass(Some(root)); + + let content = crate::db::read_content(story_id).unwrap(); + assert!( + content.contains("retry_count: 1"), + "after session 1, retry_count should be 1 — got:\n{content}" + ); + assert!( + !content.contains("blocked: true"), + "story should NOT be blocked after session 1" + ); + } + + // Session 2: exceeds limit → retry_count=2, NOT blocked. + { + write_fake_session_log(root, story_id, "coder-1", "session-2", 12); + let pool = AgentPool::new_test(3001); + pool.inject_test_agent_with_session(story_id, "coder-1", AgentStatus::Running, "session-2"); + pool.run_watchdog_pass(Some(root)); + + let content = crate::db::read_content(story_id).unwrap(); + assert!( + content.contains("retry_count: 2"), + "after session 2, retry_count should be 2 — got:\n{content}" + ); + assert!( + !content.contains("blocked: true"), + "story should NOT be blocked after session 2" + ); + } + + // Session 3: exceeds limit → retry_count=3 >= max_retries(3), IS blocked. + { + write_fake_session_log(root, story_id, "coder-1", "session-3", 12); + let pool = AgentPool::new_test(3001); + pool.inject_test_agent_with_session(story_id, "coder-1", AgentStatus::Running, "session-3"); + pool.run_watchdog_pass(Some(root)); + + let content = crate::db::read_content(story_id).unwrap(); + assert!( + content.contains("blocked: true"), + "story must be blocked after session 3 (retry_count=3 >= max_retries=3) — got:\n{content}" + ); + } +} diff --git a/server/src/agents/pool/auto_assign/watchdog/tests/mod.rs b/server/src/agents/pool/auto_assign/watchdog/tests/mod.rs new file mode 100644 index 00000000..318c55b8 --- /dev/null +++ b/server/src/agents/pool/auto_assign/watchdog/tests/mod.rs @@ -0,0 +1,70 @@ +//! Shared test helpers for the watchdog module. + +use std::path::Path; + +mod limits_tests; +mod orphan_tests; + +/// Write a fake session log file with `n` assistant turn entries. +/// +/// The file is named `{agent_name}-{session_id}.log` to match the +/// real naming convention used by `AgentLogWriter`. +pub(super) fn write_fake_session_log( + project_root: &Path, + story_id: &str, + agent_name: &str, + session_id: &str, + n_turns: u64, +) { + let log_dir = project_root.join(".huskies").join("logs").join(story_id); + std::fs::create_dir_all(&log_dir).unwrap(); + let log_path = log_dir.join(format!("{agent_name}-{session_id}.log")); + let mut content = String::new(); + for _ in 0..n_turns { + content.push_str( + &serde_json::to_string(&serde_json::json!({ + "timestamp": "2026-04-25T00:00:00Z", + "type": "agent_json", + "story_id": story_id, + "agent_name": agent_name, + "data": { "type": "assistant", "message": {} } + })) + .unwrap(), + ); + content.push('\n'); + } + std::fs::write(log_path, content).unwrap(); +} + +/// Write a fake session log containing a `result` event with the given cost. +/// +/// Used to test budget enforcement via the watchdog's per-session log +/// reading (not `token_usage.jsonl`). +pub(super) fn write_fake_budget_session_log( + project_root: &Path, + story_id: &str, + agent_name: &str, + session_id: &str, + cost_usd: f64, +) { + let log_dir = project_root.join(".huskies").join("logs").join(story_id); + std::fs::create_dir_all(&log_dir).unwrap(); + let log_path = log_dir.join(format!("{agent_name}-{session_id}.log")); + let content = serde_json::to_string(&serde_json::json!({ + "timestamp": "2026-04-25T00:00:00Z", + "type": "agent_json", + "story_id": story_id, + "agent_name": agent_name, + "data": { "type": "result", "total_cost_usd": cost_usd } + })) + .unwrap() + + "\n"; + std::fs::write(log_path, content).unwrap(); +} + +/// Write a minimal project.toml with the given agent config. +pub(super) fn write_project_config(project_root: &Path, config_toml: &str) { + let huskies_dir = project_root.join(".huskies"); + std::fs::create_dir_all(&huskies_dir).unwrap(); + std::fs::write(huskies_dir.join("project.toml"), config_toml).unwrap(); +} diff --git a/server/src/agents/pool/auto_assign/watchdog/tests/orphan_tests.rs b/server/src/agents/pool/auto_assign/watchdog/tests/orphan_tests.rs new file mode 100644 index 00000000..1a1d9f7a --- /dev/null +++ b/server/src/agents/pool/auto_assign/watchdog/tests/orphan_tests.rs @@ -0,0 +1,112 @@ +//! Orphan-detection tests for the watchdog (bug 161). + +use super::super::super::super::{AgentPool, composite_key}; +use super::super::orphan::check_orphaned_agents; +use crate::agents::{AgentEvent, AgentStatus}; + +// ── check_orphaned_agents return value tests (bug 161) ────────────────── + +#[tokio::test] +async fn check_orphaned_agents_returns_count_of_orphaned_agents() { + let pool = AgentPool::new_test(3001); + + // Spawn two tasks that finish immediately. + let h1 = tokio::spawn(async {}); + let h2 = tokio::spawn(async {}); + tokio::time::sleep(std::time::Duration::from_millis(20)).await; + assert!(h1.is_finished()); + assert!(h2.is_finished()); + + pool.inject_test_agent_with_handle("story_a", "coder", AgentStatus::Running, h1); + pool.inject_test_agent_with_handle("story_b", "coder", AgentStatus::Running, h2); + + let found = check_orphaned_agents(&pool.agents); + assert_eq!(found, 2, "should detect both orphaned agents"); +} + +#[test] +fn check_orphaned_agents_returns_zero_when_no_orphans() { + let pool = AgentPool::new_test(3001); + // Inject agents in terminal states — not orphaned. + pool.inject_test_agent("story_a", "coder", AgentStatus::Completed); + pool.inject_test_agent("story_b", "qa", AgentStatus::Failed); + + let found = check_orphaned_agents(&pool.agents); + assert_eq!( + found, 0, + "no orphans should be detected for terminal agents" + ); +} + +#[tokio::test] +async fn watchdog_detects_orphaned_running_agent() { + let pool = AgentPool::new_test(3001); + + let handle = tokio::spawn(async {}); + tokio::time::sleep(std::time::Duration::from_millis(20)).await; + assert!( + handle.is_finished(), + "task should be finished before injection" + ); + + let tx = + pool.inject_test_agent_with_handle("orphan_story", "coder", AgentStatus::Running, handle); + let mut rx = tx.subscribe(); + + pool.run_watchdog_once(); + + { + let agents = pool.agents.lock().unwrap(); + let key = composite_key("orphan_story", "coder"); + let agent = agents.get(&key).unwrap(); + assert_eq!( + agent.status, + AgentStatus::Failed, + "watchdog must mark an orphaned Running agent as Failed" + ); + } + + let event = rx.try_recv().expect("watchdog must emit an Error event"); + assert!( + matches!(event, AgentEvent::Error { .. }), + "expected AgentEvent::Error, got: {event:?}" + ); +} + +#[tokio::test] +async fn watchdog_orphan_detection_returns_nonzero_enabling_auto_assign() { + // This test verifies the contract that `check_orphaned_agents` returns + // a non-zero count when orphans exist, which the watchdog uses to + // decide whether to trigger auto-assign (bug 161). + let pool = AgentPool::new_test(3001); + + let handle = tokio::spawn(async {}); + tokio::time::sleep(std::time::Duration::from_millis(20)).await; + + pool.inject_test_agent_with_handle("orphan_story", "coder", AgentStatus::Running, handle); + + // Before watchdog: agent is Running. + { + let agents = pool.agents.lock().unwrap(); + let key = composite_key("orphan_story", "coder"); + assert_eq!(agents.get(&key).unwrap().status, AgentStatus::Running); + } + + // Run watchdog pass — should return 1 (orphan found). + let found = check_orphaned_agents(&pool.agents); + assert_eq!( + found, 1, + "watchdog must return 1 for a single orphaned agent" + ); + + // After watchdog: agent is Failed. + { + let agents = pool.agents.lock().unwrap(); + let key = composite_key("orphan_story", "coder"); + assert_eq!( + agents.get(&key).unwrap().status, + AgentStatus::Failed, + "orphaned agent must be marked Failed" + ); + } +}