diff --git a/server/src/agents/pool/auto_assign/watchdog.rs b/server/src/agents/pool/auto_assign/watchdog.rs index 1a436893..c2c4b7db 100644 --- a/server/src/agents/pool/auto_assign/watchdog.rs +++ b/server/src/agents/pool/auto_assign/watchdog.rs @@ -19,6 +19,15 @@ use super::super::{AgentPool, StoryAgent}; /// without updating the agent status — for example when the process is killed /// externally and the PTY master fd returns EOF before our inactivity timeout /// fires, but some other edge case prevents the normal cleanup path from running. +/// Snapshot of a running agent for limit checking: (composite_key, story_id, agent_name, event_tx, log_session_id). +type RunningAgentSnapshot = ( + String, + String, + String, + broadcast::Sender, + Option, +); + pub(super) fn check_orphaned_agents(agents: &Mutex>) -> usize { let mut lock = match agents.lock() { Ok(l) => l, @@ -70,86 +79,140 @@ pub(super) fn check_orphaned_agents(agents: &Mutex>) /// Compute in-flight budget from per-message usage data in agent log files. /// +/// Sums cost across **all** sessions for the given story+agent combination. /// For completed sessions (containing a `result` event), uses the accurate /// `total_cost_usd` from Claude Code. For running sessions (no `result` /// event yet), estimates cost from per-turn token counts in `assistant` /// events. +/// +/// This cumulative total is useful for cost analysis (metric tool) but is +/// NOT used by the watchdog for enforcement — the watchdog uses +/// [`compute_budget_from_single_log`] on the current session only. 
pub(crate) fn compute_budget_from_logs( project_root: &Path, story_id: &str, agent_name: &str, ) -> f64 { - use crate::agents::TokenUsage; - let log_files = crate::agent_log::list_story_log_files(project_root, story_id, Some(agent_name)); let mut total_cost = 0.0; for path in &log_files { - let entries = match crate::agent_log::read_log(path) { - Ok(e) => e, - Err(_) => continue, - }; - - // Check for a result event with accurate total_cost_usd (completed session). - let result_cost = entries.iter().rev().find_map(|entry| { - if entry.event.get("type").and_then(|v| v.as_str()) == Some("agent_json") - && let Some(data) = entry.event.get("data") - && data.get("type").and_then(|v| v.as_str()) == Some("result") - { - data.get("total_cost_usd").and_then(|v| v.as_f64()) - } else { - None - } - }); - - if let Some(cost) = result_cost { - // Completed session — use accurate cost. - total_cost += cost; - } else { - // Running session — estimate from per-message token usage. - for entry in &entries { - if entry.event.get("type").and_then(|v| v.as_str()) == Some("agent_json") - && let Some(data) = entry.event.get("data") - && data.get("type").and_then(|v| v.as_str()) == Some("assistant") - && let Some(message) = data.get("message") - && let Some(usage) = message.get("usage") - { - let model = message.get("model").and_then(|v| v.as_str()); - let token_usage = TokenUsage { - input_tokens: usage - .get("input_tokens") - .and_then(|v| v.as_u64()) - .unwrap_or(0), - output_tokens: usage - .get("output_tokens") - .and_then(|v| v.as_u64()) - .unwrap_or(0), - cache_creation_input_tokens: usage - .get("cache_creation_input_tokens") - .and_then(|v| v.as_u64()) - .unwrap_or(0), - cache_read_input_tokens: usage - .get("cache_read_input_tokens") - .and_then(|v| v.as_u64()) - .unwrap_or(0), - total_cost_usd: 0.0, - }; - total_cost += token_usage.estimate_cost_usd(model); - } - } - } + total_cost += compute_budget_from_single_log(path); } total_cost } +/// Compute budget from a single log 
file. +/// +/// If the log contains a `result` event with `total_cost_usd` (completed +/// session), returns that. Otherwise estimates cost from per-turn token +/// usage in `assistant` events (running session). +pub(crate) fn compute_budget_from_single_log(path: &Path) -> f64 { + use crate::agents::TokenUsage; + + let entries = match crate::agent_log::read_log(path) { + Ok(e) => e, + Err(_) => return 0.0, + }; + + // Check for a result event with accurate total_cost_usd (completed session). + let result_cost = entries.iter().rev().find_map(|entry| { + if entry.event.get("type").and_then(|v| v.as_str()) == Some("agent_json") + && let Some(data) = entry.event.get("data") + && data.get("type").and_then(|v| v.as_str()) == Some("result") + { + data.get("total_cost_usd").and_then(|v| v.as_f64()) + } else { + None + } + }); + + if let Some(cost) = result_cost { + return cost; + } + + // Running session — estimate from per-message token usage. + let mut cost = 0.0; + for entry in &entries { + if entry.event.get("type").and_then(|v| v.as_str()) == Some("agent_json") + && let Some(data) = entry.event.get("data") + && data.get("type").and_then(|v| v.as_str()) == Some("assistant") + && let Some(message) = data.get("message") + && let Some(usage) = message.get("usage") + { + let model = message.get("model").and_then(|v| v.as_str()); + let token_usage = TokenUsage { + input_tokens: usage + .get("input_tokens") + .and_then(|v| v.as_u64()) + .unwrap_or(0), + output_tokens: usage + .get("output_tokens") + .and_then(|v| v.as_u64()) + .unwrap_or(0), + cache_creation_input_tokens: usage + .get("cache_creation_input_tokens") + .and_then(|v| v.as_u64()) + .unwrap_or(0), + cache_read_input_tokens: usage + .get("cache_read_input_tokens") + .and_then(|v| v.as_u64()) + .unwrap_or(0), + total_cost_usd: 0.0, + }; + cost += token_usage.estimate_cost_usd(model); + } + } + cost +} + +/// Resolve the current session's log file for an agent. 
+/// +/// Uses `log_session_id` to construct the exact path when available; +/// otherwise falls back to the most recently modified log file for the +/// agent. +pub(crate) fn resolve_session_log( + project_root: &Path, + story_id: &str, + agent_name: &str, + log_session_id: &Option, +) -> Option { + if let Some(sid) = log_session_id { + let path = crate::agent_log::log_file_path(project_root, story_id, agent_name, sid); + if path.exists() { + return Some(path); + } + } + crate::agent_log::find_latest_log(project_root, story_id, agent_name) +} + +/// Count `assistant` events in a single log file. +pub(crate) fn count_turns_in_log(path: &Path) -> u64 { + let entries = match crate::agent_log::read_log(path) { + Ok(e) => e, + Err(_) => return 0, + }; + entries + .iter() + .filter(|entry| { + entry.event.get("type").and_then(|v| v.as_str()) == Some("agent_json") + && entry + .event + .get("data") + .and_then(|d| d.get("type")) + .and_then(|v| v.as_str()) + == Some("assistant") + }) + .count() as u64 +} + /// Scan running agents for turn/budget limit violations and terminate offenders. /// -/// Turns are counted from `assistant` events in the log files. Budget is -/// computed from per-message token usage in the logs (for running sessions) -/// or from the `result` event's `total_cost_usd` (for completed sessions), -/// with `token_usage.jsonl` records as a fallback floor. +/// Turns and budget are counted from the **current session's** log file +/// only — prior sessions are excluded so that restart counts from earlier +/// runs do not accumulate against the limits. fn check_agent_limits( agents: &Mutex>, project_root: &Path, @@ -159,8 +222,8 @@ fn check_agent_limits( Err(_) => return Vec::new(), }; - // Snapshot running agents: (key, story_id, agent_name, tx). - let running: Vec<(String, String, String, broadcast::Sender)> = { + // Snapshot running agents: (key, story_id, agent_name, tx, log_session_id). 
+ let running: Vec = { let lock = match agents.lock() { Ok(l) => l, Err(_) => return Vec::new(), @@ -177,16 +240,15 @@ fn check_agent_limits( story_id, agent.agent_name.clone(), agent.tx.clone(), + agent.log_session_id.clone(), ) }) .collect() }; - let all_token_records = crate::agents::token_usage::read_all(project_root).unwrap_or_default(); - let mut terminated = Vec::new(); - for (key, story_id, agent_name, tx) in &running { + for (key, story_id, agent_name, tx, log_session_id) in &running { let agent_config = config.agent.iter().find(|a| a.name == *agent_name); let max_turns = agent_config.and_then(|a| a.max_turns); let max_budget_usd = agent_config.and_then(|a| a.max_budget_usd); @@ -196,37 +258,27 @@ fn check_agent_limits( continue; } - // Count turns from log files. - let mut turns_used: u64 = 0; - if max_turns.is_some() { - let log_files = - crate::agent_log::list_story_log_files(project_root, story_id, Some(agent_name)); - for path in &log_files { - if let Ok(entries) = crate::agent_log::read_log(path) { - for entry in &entries { - if entry.event.get("type").and_then(|v| v.as_str()) == Some("agent_json") - && let Some(data) = entry.event.get("data") - && data.get("type").and_then(|v| v.as_str()) == Some("assistant") - { - turns_used += 1; - } - } - } - } - } + // Resolve the current session's log file. + let session_log = resolve_session_log(project_root, story_id, agent_name, log_session_id); - // Compute budget from log-based per-message estimates (works for - // running agents) and completed-session records from token_usage.jsonl. - // Use whichever is higher — log estimates cover in-flight sessions, - // while token_usage.jsonl records are accurate for completed ones. + // Count turns from the current session's log file only. + let turns_used: u64 = if max_turns.is_some() { + session_log + .as_ref() + .map(|p| count_turns_in_log(p)) + .unwrap_or(0) + } else { + 0 + }; + + // Compute budget from the current session's log file only. 
+ // Completed-session records from token_usage.jsonl are NOT included + // in the watchdog's enforcement — they belong to prior sessions. let budget_used_usd: f64 = if max_budget_usd.is_some() { - let log_cost = compute_budget_from_logs(project_root, story_id, agent_name); - let record_cost: f64 = all_token_records - .iter() - .filter(|r| r.story_id == *story_id && r.agent_name == *agent_name) - .map(|r| r.usage.total_cost_usd) - .sum(); - log_cost.max(record_cost) + session_log + .as_ref() + .map(|p| compute_budget_from_single_log(p)) + .unwrap_or(0.0) } else { 0.0 }; @@ -296,17 +348,18 @@ impl AgentPool { /// /// Called by the unified background tick loop every 30 ticks. /// - /// When a limit is exceeded, the agent's PTY child is killed AND the story is - /// marked `blocked: true` in CRDT state. Marking blocked is essential — without - /// it, `auto_assign_available_work` immediately re-spawns the agent on the next - /// tick, the new session inherits the prior session's logs (so `turns_used` - /// keeps growing across sessions), and the agent burns through hundreds of - /// turns in a kill-respawn loop before any operator intervention. + /// When a limit is exceeded the agent's PTY child is killed and the + /// `should_block_story` retry mechanism is invoked: `retry_count` is + /// incremented, and the story is marked `blocked: true` once it reaches + /// `max_retries`; below that it is left unblocked for a re-attempt. This + /// caps the kill-respawn loop (bug 646) at `max_retries` sessions. With + /// `max_retries = 0` the limit is disabled and the story is never blocked. pub fn run_watchdog_pass(&self, project_root: Option<&Path>) -> usize { let orphaned = check_orphaned_agents(&self.agents); if let Some(root) = project_root { let terminated = check_agent_limits(&self.agents, root); + let config = ProjectConfig::load(root).unwrap_or_default(); for (key, _reason) in &terminated { // Kill the PTY child and abort the task, same as stop_agent. 
self.kill_child_for_key(key); @@ -317,21 +370,25 @@ impl AgentPool { handle.abort(); } - // Mark the story `blocked: true` so auto-assign skips it on the - // next tick. Without this, the kill-respawn loop described above - // burns the agent's budget across many short sessions. + // Use the retry mechanism: increment retry_count and only block + // when the limit is exceeded, matching the pipeline's behaviour. let story_id = key.rsplit_once(':').map(|(s, _)| s).unwrap_or(key); - if let Some(contents) = crate::db::read_content(story_id) { - let blocked = crate::io::story_metadata::write_blocked_in_content(&contents); - crate::db::write_content(story_id, &blocked); - let stage = crate::pipeline_state::read_typed(story_id) - .ok() - .flatten() - .map(|i| i.stage.dir_name().to_string()) - .unwrap_or_else(|| "2_current".to_string()); - crate::db::write_item_with_content(story_id, &stage, &blocked); + if let Some(block_reason) = super::super::pipeline::should_block_story( + story_id, + config.max_retries, + "watchdog", + ) { + let _ = self + .watcher_tx + .send(crate::io::watcher::WatcherEvent::StoryBlocked { + story_id: story_id.to_string(), + reason: block_reason, + }); + slog!("[watchdog] Story '{story_id}' blocked after exceeding retry limit."); + } else { slog!( - "[watchdog] Marked story '{story_id}' as blocked after limit termination." + "[watchdog] Story '{story_id}' retry incremented after limit \ + termination; stays in 2_current/ for re-attempt." ); } } @@ -352,11 +409,20 @@ mod tests { use super::super::super::{AgentPool, composite_key}; use super::*; - /// Write a fake agent log file with `n` assistant turn entries. - fn write_fake_log(project_root: &Path, story_id: &str, agent_name: &str, n_turns: u64) { + /// Write a fake session log file with `n` assistant turn entries. + /// + /// The file is named `{agent_name}-{session_id}.log` to match the + /// real naming convention used by `AgentLogWriter`. 
+ fn write_fake_session_log( + project_root: &Path, + story_id: &str, + agent_name: &str, + session_id: &str, + n_turns: u64, + ) { let log_dir = project_root.join(".huskies").join("logs").join(story_id); std::fs::create_dir_all(&log_dir).unwrap(); - let log_path = log_dir.join(format!("{agent_name}-test-session.log")); + let log_path = log_dir.join(format!("{agent_name}-{session_id}.log")); let mut content = String::new(); for _ in 0..n_turns { content.push_str( @@ -374,27 +440,30 @@ mod tests { std::fs::write(log_path, content).unwrap(); } - /// Write a fake token usage record with the given cost. - fn write_fake_token_usage( + /// Write a fake session log containing a `result` event with the given cost. + /// + /// Used to test budget enforcement via the watchdog's per-session log + /// reading (not `token_usage.jsonl`). + fn write_fake_budget_session_log( project_root: &Path, story_id: &str, agent_name: &str, + session_id: &str, cost_usd: f64, ) { - let record = crate::agents::token_usage::TokenUsageRecord { - story_id: story_id.to_string(), - agent_name: agent_name.to_string(), - timestamp: "2026-04-25T00:00:00Z".to_string(), - model: None, - usage: crate::agents::TokenUsage { - input_tokens: 1000, - output_tokens: 500, - cache_creation_input_tokens: 0, - cache_read_input_tokens: 0, - total_cost_usd: cost_usd, - }, - }; - crate::agents::token_usage::append_record(project_root, &record).unwrap(); + let log_dir = project_root.join(".huskies").join("logs").join(story_id); + std::fs::create_dir_all(&log_dir).unwrap(); + let log_path = log_dir.join(format!("{agent_name}-{session_id}.log")); + let content = serde_json::to_string(&serde_json::json!({ + "timestamp": "2026-04-25T00:00:00Z", + "type": "agent_json", + "story_id": story_id, + "agent_name": agent_name, + "data": { "type": "result", "total_cost_usd": cost_usd } + })) + .unwrap() + + "\n"; + std::fs::write(log_path, content).unwrap(); } /// Write a minimal project.toml with the given agent config. 
@@ -494,11 +563,16 @@ max_turns = 10 "#, ); - // Write 12 turns (exceeds limit of 10). - write_fake_log(root, "story_a", "coder-1", 12); + // Write 12 turns in the current session (exceeds limit of 10). + write_fake_session_log(root, "story_a", "coder-1", "sess-current", 12); let pool = AgentPool::new_test(3001); - let tx = pool.inject_test_agent("story_a", "coder-1", AgentStatus::Running); + let tx = pool.inject_test_agent_with_session( + "story_a", + "coder-1", + AgentStatus::Running, + "sess-current", + ); let mut rx = tx.subscribe(); let found = pool.run_watchdog_pass(Some(root)); @@ -539,11 +613,16 @@ max_budget_usd = 5.00 "#, ); - // Write $6.00 in usage (exceeds limit of $5.00). - write_fake_token_usage(root, "story_b", "coder-1", 6.00); + // Write $6.00 in the current session's log (exceeds limit of $5.00). + write_fake_budget_session_log(root, "story_b", "coder-1", "sess-budget", 6.00); let pool = AgentPool::new_test(3001); - let tx = pool.inject_test_agent("story_b", "coder-1", AgentStatus::Running); + let tx = pool.inject_test_agent_with_session( + "story_b", + "coder-1", + AgentStatus::Running, + "sess-budget", + ); let mut rx = tx.subscribe(); let found = pool.run_watchdog_pass(Some(root)); @@ -581,12 +660,16 @@ max_budget_usd = 10.00 "#, ); - // Agent is under both limits. - write_fake_log(root, "story_c", "coder-1", 25); - write_fake_token_usage(root, "story_c", "coder-1", 3.00); + // Agent is under both limits in the current session. + write_fake_session_log(root, "story_c", "coder-1", "sess-ok", 25); + write_fake_budget_session_log(root, "story_c", "coder-1", "sess-ok-budget", 3.00); let pool = AgentPool::new_test(3001); - pool.inject_test_agent("story_c", "coder-1", AgentStatus::Running); + // Bind the agent to `sess-ok`. check_agent_limits resolves one log + // per agent and uses it for both turns and budget, so the separate + // `sess-ok-budget` file is presumably never consulted; `sess-ok` + // has 25 turns < 50 so no violation. 
+ pool.inject_test_agent_with_session("story_c", "coder-1", AgentStatus::Running, "sess-ok"); let found = pool.run_watchdog_pass(Some(root)); assert_eq!(found, 0, "agent under limits should not be terminated"); @@ -624,14 +707,18 @@ max_budget_usd = 5.00 "#, ); - // Simulate the trajectory: 55 turns (just past the limit — the watchdog - // would have caught it here, not at 280) and $2.50 spent (under budget). - // Turns hit first, so reason should be TurnLimit. - write_fake_log(root, "story_623", "coder-1", 55); - write_fake_token_usage(root, "story_623", "coder-1", 2.50); + // Simulate the trajectory: 55 turns in the current session (just past + // the limit). No token-usage record is written here any more, so the + // turn limit is what should fire: reason should be TurnLimit. + write_fake_session_log(root, "story_623", "coder-1", "sess-624", 55); let pool = AgentPool::new_test(3001); - let tx = pool.inject_test_agent("story_623", "coder-1", AgentStatus::Running); + let tx = pool.inject_test_agent_with_session( + "story_623", + "coder-1", + AgentStatus::Running, + "sess-624", + ); let mut rx = tx.subscribe(); let found = pool.run_watchdog_pass(Some(root)); @@ -699,15 +786,14 @@ max_budget_usd = 5.00 } } - // ── Kill-respawn loop fix (bug 646) ────────────────────────────────────── + // ── Kill-respawn loop fix (bug 646), updated for per-session + retry ─── - /// When the watchdog terminates an agent for limit-exceeded, it must mark - /// the story `blocked: true` in CRDT state so `auto_assign_available_work` - /// won't immediately re-spawn the agent on the next tick. + /// When the watchdog terminates an agent for limit-exceeded AND the story + /// has exhausted its retries, it must be marked `blocked: true` in CRDT + /// state so `auto_assign_available_work` won't re-spawn the agent. /// - /// Without this, the kill-respawn loop produces the symptom observed in - /// production: the new session inherits the prior session's logs, so - /// `turns_used` keeps growing across sessions (e.g. 
991/50 turns observed). + /// This test seeds a single session that legitimately exceeds the limit + /// and uses `max_retries = 1` so that the first violation blocks. #[test] fn watchdog_marks_story_blocked_after_limit_termination() { crate::db::ensure_content_store(); @@ -718,6 +804,8 @@ max_budget_usd = 5.00 write_project_config( root, r#" +max_retries = 1 + [[agent]] name = "coder-1" runtime = "claude-code" @@ -726,28 +814,31 @@ max_turns = 10 ); // Write story content into the CRDT-backed content store so the - // watchdog's mark-blocked path has something to read+update. + // watchdog's retry/block path has something to read+update. let story_id = "42_story_runaway"; let initial = "---\nname: Runaway Story\n---\n# Runaway Story\n"; crate::db::write_content(story_id, initial); - // 12 turns exceeds the configured max of 10. - write_fake_log(root, story_id, "coder-1", 12); + // 12 turns in a single session exceeds the configured max of 10. + write_fake_session_log(root, story_id, "coder-1", "sess-runaway", 12); let pool = AgentPool::new_test(3001); - let _tx = pool.inject_test_agent(story_id, "coder-1", AgentStatus::Running); + let _tx = pool.inject_test_agent_with_session( + story_id, + "coder-1", + AgentStatus::Running, + "sess-runaway", + ); let found = pool.run_watchdog_pass(Some(root)); assert!(found >= 1, "watchdog should detect the over-limit agent"); - // CORE INVARIANT: after termination, the story must be marked blocked. - // Without this, auto_assign_available_work would immediately re-spawn - // the agent on its next tick. + // With max_retries=1, the first violation blocks immediately. 
let updated = crate::db::read_content(story_id) .expect("story content must still exist after watchdog termination"); assert!( updated.contains("blocked: true"), - "story must be marked `blocked: true` after limit termination — got:\n{updated}" + "story must be marked `blocked: true` after limit termination with max_retries=1 — got:\n{updated}" ); // Sanity: the agent itself is also Failed with the right reason. @@ -763,4 +854,216 @@ max_turns = 10 ); } } + + // ── Per-session counting (bug 650) ────────────────────────────────────── + + /// Seed multiple prior session log files whose combined assistant-event + /// count exceeds `max_turns`. Then inject a NEW running agent with a + /// fresh session_id whose log has fewer events than `max_turns`. + /// Assert the agent is NOT terminated (per-session count is under the + /// limit) AND the story is NOT marked blocked. + #[test] + fn per_session_counting_does_not_terminate_under_limit() { + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + write_project_config( + root, + r#" +[[agent]] +name = "coder-1" +runtime = "claude-code" +max_turns = 10 +"#, + ); + + // 3 prior sessions with 5 turns each = 15 total (above max_turns=10). + write_fake_session_log(root, "story_d", "coder-1", "old-sess-1", 5); + write_fake_session_log(root, "story_d", "coder-1", "old-sess-2", 5); + write_fake_session_log(root, "story_d", "coder-1", "old-sess-3", 5); + + // New running session has only 3 turns (under limit of 10). 
+ write_fake_session_log(root, "story_d", "coder-1", "new-sess", 3); + + let pool = AgentPool::new_test(3001); + pool.inject_test_agent_with_session("story_d", "coder-1", AgentStatus::Running, "new-sess"); + + let found = pool.run_watchdog_pass(Some(root)); + assert_eq!( + found, 0, + "agent under per-session limit should NOT be terminated" + ); + + { + let agents = pool.agents.lock().unwrap(); + let key = composite_key("story_d", "coder-1"); + let agent = agents.get(&key).unwrap(); + assert_eq!( + agent.status, + AgentStatus::Running, + "agent under per-session limit should stay Running" + ); + assert!(agent.termination_reason.is_none()); + } + } + + /// Same setup as per_session_counting_does_not_terminate_under_limit, but + /// the new agent's own session log exceeds `max_turns`. Assert the agent + /// IS terminated AND (with max_retries=1) the story IS marked blocked. + #[test] + fn per_session_counting_terminates_over_limit() { + crate::db::ensure_content_store(); + + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + write_project_config( + root, + r#" +max_retries = 1 + +[[agent]] +name = "coder-1" +runtime = "claude-code" +max_turns = 10 +"#, + ); + + let story_id = "story_e_per_session"; + crate::db::write_content(story_id, "---\nname: Per-Session Test\n---\n"); + + // Prior session with 5 turns (under limit alone). + write_fake_session_log(root, story_id, "coder-1", "old-sess", 5); + + // Current session with 12 turns (over limit). 
+ write_fake_session_log(root, story_id, "coder-1", "new-sess", 12); + + let pool = AgentPool::new_test(3001); + let tx = pool.inject_test_agent_with_session( + story_id, + "coder-1", + AgentStatus::Running, + "new-sess", + ); + let mut rx = tx.subscribe(); + + let found = pool.run_watchdog_pass(Some(root)); + assert!( + found >= 1, + "agent over per-session limit must be terminated" + ); + + { + let agents = pool.agents.lock().unwrap(); + let key = composite_key(story_id, "coder-1"); + let agent = agents.get(&key).unwrap(); + assert_eq!(agent.status, AgentStatus::Failed); + assert_eq!(agent.termination_reason, Some(TerminationReason::TurnLimit),); + } + + let event = rx.try_recv().expect("watchdog must emit an Error event"); + assert!(matches!(event, AgentEvent::Error { .. })); + + // With max_retries=1, the story is blocked. + let updated = crate::db::read_content(story_id).unwrap(); + assert!( + updated.contains("blocked: true"), + "story must be blocked after per-session overrun with max_retries=1" + ); + } + + // ── Retry semantic integration test (bug 650) ─────────────────────────── + + /// With `max_retries = 3`, simulate separate sessions each exceeding + /// `max_turns`. After session 1: retry_count=1, NOT blocked. After + /// session 2: retry_count=2, NOT blocked. After session 3: + /// retry_count=3 >= max_retries, story IS blocked. + #[test] + fn watchdog_retry_semantic_blocks_after_max_retries() { + crate::db::ensure_content_store(); + + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + write_project_config( + root, + r#" +max_retries = 3 + +[[agent]] +name = "coder-1" +runtime = "claude-code" +max_turns = 10 +"#, + ); + + let story_id = "88_story_retry_watchdog"; + let initial = "---\nname: Retry Test\n---\n"; + crate::db::write_content(story_id, initial); + + // Session 1: exceeds limit → retry_count=1, NOT blocked. 
+ { + write_fake_session_log(root, story_id, "coder-1", "session-1", 12); + let pool = AgentPool::new_test(3001); + pool.inject_test_agent_with_session( + story_id, + "coder-1", + AgentStatus::Running, + "session-1", + ); + pool.run_watchdog_pass(Some(root)); + + let content = crate::db::read_content(story_id).unwrap(); + assert!( + content.contains("retry_count: 1"), + "after session 1, retry_count should be 1 — got:\n{content}" + ); + assert!( + !content.contains("blocked: true"), + "story should NOT be blocked after session 1" + ); + } + + // Session 2: exceeds limit → retry_count=2, NOT blocked. + { + write_fake_session_log(root, story_id, "coder-1", "session-2", 12); + let pool = AgentPool::new_test(3001); + pool.inject_test_agent_with_session( + story_id, + "coder-1", + AgentStatus::Running, + "session-2", + ); + pool.run_watchdog_pass(Some(root)); + + let content = crate::db::read_content(story_id).unwrap(); + assert!( + content.contains("retry_count: 2"), + "after session 2, retry_count should be 2 — got:\n{content}" + ); + assert!( + !content.contains("blocked: true"), + "story should NOT be blocked after session 2" + ); + } + + // Session 3: exceeds limit → retry_count=3 >= max_retries(3), IS blocked. 
+ { + write_fake_session_log(root, story_id, "coder-1", "session-3", 12); + let pool = AgentPool::new_test(3001); + pool.inject_test_agent_with_session( + story_id, + "coder-1", + AgentStatus::Running, + "session-3", + ); + pool.run_watchdog_pass(Some(root)); + + let content = crate::db::read_content(story_id).unwrap(); + assert!( + content.contains("blocked: true"), + "story must be blocked after session 3 (retry_count=3 >= max_retries=3) — got:\n{content}" + ); + } + } } diff --git a/server/src/agents/pool/pipeline/advance.rs b/server/src/agents/pool/pipeline/advance.rs index 2a976f9f..e9ad24cf 100644 --- a/server/src/agents/pool/pipeline/advance.rs +++ b/server/src/agents/pool/pipeline/advance.rs @@ -553,7 +553,11 @@ fn write_review_hold_to_store(story_id: &str) { /// Returns `Some(reason)` if the story is now blocked (caller should NOT restart the agent). /// Returns `None` if the story may be retried. /// When `max_retries` is 0, retry limits are disabled. -fn should_block_story(story_id: &str, max_retries: u32, stage_label: &str) -> Option { +pub(crate) fn should_block_story( + story_id: &str, + max_retries: u32, + stage_label: &str, +) -> Option { use crate::io::story_metadata::{increment_retry_count_in_content, write_blocked_in_content}; if max_retries == 0 { diff --git a/server/src/agents/pool/pipeline/mod.rs b/server/src/agents/pool/pipeline/mod.rs index b20ef5d5..0588dcf0 100644 --- a/server/src/agents/pool/pipeline/mod.rs +++ b/server/src/agents/pool/pipeline/mod.rs @@ -3,4 +3,5 @@ mod advance; mod completion; mod merge; +pub(crate) use advance::should_block_story; pub(super) use completion::run_server_owned_completion; diff --git a/server/src/agents/pool/test_helpers.rs b/server/src/agents/pool/test_helpers.rs index 41dddc5a..a3e8983d 100644 --- a/server/src/agents/pool/test_helpers.rs +++ b/server/src/agents/pool/test_helpers.rs @@ -112,6 +112,39 @@ impl AgentPool { tx } + /// Test helper: inject an agent with a specific log session ID. 
+ /// Used by watchdog tests to simulate per-session counting. + pub fn inject_test_agent_with_session( + &self, + story_id: &str, + agent_name: &str, + status: AgentStatus, + log_session_id: &str, + ) -> broadcast::Sender { + let (tx, _) = broadcast::channel::(64); + let key = composite_key(story_id, agent_name); + let mut agents = self.agents.lock().unwrap(); + agents.insert( + key, + StoryAgent { + agent_name: agent_name.to_string(), + status, + worktree_info: None, + session_id: None, + tx: tx.clone(), + task_handle: None, + event_log: Arc::new(Mutex::new(Vec::new())), + completion: None, + project_root: None, + log_session_id: Some(log_session_id.to_string()), + merge_failure_reported: false, + throttled: false, + termination_reason: None, + }, + ); + tx + } + /// Inject a Running agent with a pre-built (possibly finished) task handle. /// Used by watchdog tests to simulate an orphaned agent. pub fn inject_test_agent_with_handle( diff --git a/server/src/http/mcp/agent_tools.rs b/server/src/http/mcp/agent_tools.rs index 4b13dc80..34b0d5ce 100644 --- a/server/src/http/mcp/agent_tools.rs +++ b/server/src/http/mcp/agent_tools.rs @@ -290,10 +290,10 @@ pub(super) fn tool_get_agent_remaining_turns_and_budget( let max_turns = agent_config.and_then(|a| a.max_turns); let max_budget_usd = agent_config.and_then(|a| a.max_budget_usd); - // Count turns by reading log files and counting assistant events. 
+ // ── Cumulative counters (all sessions) ───────────────────────────── let log_files = crate::agent_log::list_story_log_files(&project_root, story_id, Some(agent_name)); - let mut turns_used: u64 = 0; + let mut cumulative_turns: u64 = 0; for path in &log_files { if let Ok(entries) = crate::agent_log::read_log(path) { for entry in &entries { @@ -301,15 +301,13 @@ pub(super) fn tool_get_agent_remaining_turns_and_budget( && let Some(data) = entry.event.get("data") && data.get("type").and_then(|v| v.as_str()) == Some("assistant") { - turns_used += 1; + cumulative_turns += 1; } } } } - // Compute budget from log-based per-message estimates (works for running - // agents) and completed-session records from token_usage.jsonl. - let log_cost = crate::agents::pool::auto_assign::watchdog::compute_budget_from_logs( + let cumulative_log_cost = crate::agents::pool::auto_assign::watchdog::compute_budget_from_logs( &project_root, story_id, agent_name, @@ -320,19 +318,43 @@ pub(super) fn tool_get_agent_remaining_turns_and_budget( .filter(|r| r.story_id == story_id && r.agent_name == agent_name) .map(|r| r.usage.total_cost_usd) .sum(); - let budget_used_usd: f64 = log_cost.max(record_cost); + let cumulative_budget: f64 = cumulative_log_cost.max(record_cost); - let remaining_turns = max_turns.map(|max| (max as i64) - (turns_used as i64)); - let remaining_budget_usd = max_budget_usd.map(|max| max - budget_used_usd); + // ── Per-session counters (current session only — enforcement basis) ── + use crate::agents::pool::auto_assign::watchdog::{ + compute_budget_from_single_log, count_turns_in_log, resolve_session_log, + }; + let session_log = resolve_session_log( + &project_root, + story_id, + agent_name, + &agent_info.log_session_id, + ); + let session_turns: u64 = session_log + .as_ref() + .map(|p| count_turns_in_log(p)) + .unwrap_or(0); + let session_budget: f64 = session_log + .as_ref() + .map(|p| compute_budget_from_single_log(p)) + .unwrap_or(0.0); + + let remaining_turns = 
max_turns.map(|max| (max as i64) - (session_turns as i64)); + let remaining_budget_usd = max_budget_usd.map(|max| max - session_budget); serde_json::to_string_pretty(&json!({ "story_id": story_id, "agent_name": agent_name, "status": agent_info.status.to_string(), - "turns_used": turns_used, + // Per-session values (watchdog enforcement basis): + "turns_used": session_turns, + "budget_used_usd": session_budget, + // Cumulative values (all sessions, useful for cost analysis): + "cumulative_turns_used": cumulative_turns, + "cumulative_budget_used_usd": cumulative_budget, + // Limits and remaining (computed from per-session values): "max_turns": max_turns, "remaining_turns": remaining_turns, - "budget_used_usd": budget_used_usd, "max_budget_usd": max_budget_usd, "remaining_budget_usd": remaining_budget_usd, })) @@ -1038,9 +1060,13 @@ stage = "coder" assert_eq!(parsed["story_id"], "42_story"); assert_eq!(parsed["agent_name"], "coder-1"); assert_eq!(parsed["status"], "running"); + // Per-session values (enforcement basis). assert!(parsed.get("turns_used").is_some()); assert!(parsed.get("budget_used_usd").is_some()); - // max_turns and max_budget_usd may be null if not configured + // Cumulative values (all sessions, for cost analysis). + assert!(parsed.get("cumulative_turns_used").is_some()); + assert!(parsed.get("cumulative_budget_used_usd").is_some()); + // Limits and remaining. assert!(parsed.get("max_turns").is_some()); assert!(parsed.get("remaining_turns").is_some()); assert!(parsed.get("max_budget_usd").is_some());