From 148c88bd405b40204306109ab80611ae2309618c Mon Sep 17 00:00:00 2001 From: dave Date: Sun, 26 Apr 2026 13:07:02 +0000 Subject: [PATCH] huskies: merge 646_bug_watchdog_from_bug_624_is_not_actually_enforcing_max_turns_max_budget_usd_in_production --- server/src/agents/mod.rs | 23 ++- server/src/agents/pool/auto_assign/mod.rs | 2 +- .../src/agents/pool/auto_assign/watchdog.rs | 186 +++++++++++++++++- server/src/agents/pool/mod.rs | 2 +- server/src/http/mcp/agent_tools.rs | 11 +- 5 files changed, 212 insertions(+), 12 deletions(-) diff --git a/server/src/agents/mod.rs b/server/src/agents/mod.rs index 727123f5..7fd5e6d5 100644 --- a/server/src/agents/mod.rs +++ b/server/src/agents/mod.rs @@ -3,7 +3,7 @@ pub mod gates; pub mod lifecycle; pub mod local_prompt; pub mod merge; -mod pool; +pub(crate) mod pool; pub(crate) mod pty; pub mod runtime; pub mod token_usage; @@ -161,6 +161,27 @@ pub struct TokenUsage { } impl TokenUsage { + /// Estimate USD cost from token counts using approximate per-model pricing. + /// + /// Used by the watchdog to compute in-flight budget from per-message usage + /// data in the agent log (since `total_cost_usd` is only available in the + /// `result` event at session end). Uses conservative (high) pricing when + /// the model is unknown so budget limits are hit sooner rather than later. + pub fn estimate_cost_usd(&self, model: Option<&str>) -> f64 { + // Per-million-token pricing (input, output, cache_read, cache_create). + let (inp, out, cr, cc) = match model { + Some(m) if m.contains("haiku") => (0.80, 4.0, 0.08, 1.00), + Some(m) if m.contains("sonnet") => (3.0, 15.0, 0.30, 3.75), + // Opus or unknown → most expensive = conservative. + _ => (15.0, 75.0, 1.50, 18.75), + }; + (self.input_tokens as f64 * inp + + self.output_tokens as f64 * out + + self.cache_read_input_tokens as f64 * cr + + self.cache_creation_input_tokens as f64 * cc) + / 1_000_000.0 + } + /// Parse token usage from a Claude Code `result` JSON event. pub fn from_result_event(json: &serde_json::Value) -> Option { let usage = json.get("usage")?; diff --git a/server/src/agents/pool/auto_assign/mod.rs b/server/src/agents/pool/auto_assign/mod.rs index ff2daeb6..b2744941 100644 --- a/server/src/agents/pool/auto_assign/mod.rs +++ b/server/src/agents/pool/auto_assign/mod.rs @@ -5,7 +5,7 @@ mod auto_assign; mod reconcile; mod scan; mod story_checks; -mod watchdog; +pub(crate) mod watchdog; // Re-export items that were pub(super) in the original monolithic auto_assign.rs // so that pool::lifecycle and pool::pipeline continue to access them unchanged. diff --git a/server/src/agents/pool/auto_assign/watchdog.rs b/server/src/agents/pool/auto_assign/watchdog.rs index a2dcacbe..1a436893 100644 --- a/server/src/agents/pool/auto_assign/watchdog.rs +++ b/server/src/agents/pool/auto_assign/watchdog.rs @@ -68,11 +68,88 @@ pub(super) fn check_orphaned_agents(agents: &Mutex>) count } +/// Compute in-flight budget from per-message usage data in agent log files. +/// +/// For completed sessions (containing a `result` event), uses the accurate +/// `total_cost_usd` from Claude Code. For running sessions (no `result` +/// event yet), estimates cost from per-turn token counts in `assistant` +/// events. +pub(crate) fn compute_budget_from_logs( + project_root: &Path, + story_id: &str, + agent_name: &str, +) -> f64 { + use crate::agents::TokenUsage; + + let log_files = + crate::agent_log::list_story_log_files(project_root, story_id, Some(agent_name)); + let mut total_cost = 0.0; + + for path in &log_files { + let entries = match crate::agent_log::read_log(path) { + Ok(e) => e, + Err(_) => continue, + }; + + // Check for a result event with accurate total_cost_usd (completed session). + let result_cost = entries.iter().rev().find_map(|entry| { + if entry.event.get("type").and_then(|v| v.as_str()) == Some("agent_json") + && let Some(data) = entry.event.get("data") + && data.get("type").and_then(|v| v.as_str()) == Some("result") + { + data.get("total_cost_usd").and_then(|v| v.as_f64()) + } else { + None + } + }); + + if let Some(cost) = result_cost { + // Completed session — use accurate cost. + total_cost += cost; + } else { + // Running session — estimate from per-message token usage. + for entry in &entries { + if entry.event.get("type").and_then(|v| v.as_str()) == Some("agent_json") + && let Some(data) = entry.event.get("data") + && data.get("type").and_then(|v| v.as_str()) == Some("assistant") + && let Some(message) = data.get("message") + && let Some(usage) = message.get("usage") + { + let model = message.get("model").and_then(|v| v.as_str()); + let token_usage = TokenUsage { + input_tokens: usage + .get("input_tokens") + .and_then(|v| v.as_u64()) + .unwrap_or(0), + output_tokens: usage + .get("output_tokens") + .and_then(|v| v.as_u64()) + .unwrap_or(0), + cache_creation_input_tokens: usage + .get("cache_creation_input_tokens") + .and_then(|v| v.as_u64()) + .unwrap_or(0), + cache_read_input_tokens: usage + .get("cache_read_input_tokens") + .and_then(|v| v.as_u64()) + .unwrap_or(0), + total_cost_usd: 0.0, + }; + total_cost += token_usage.estimate_cost_usd(model); + } + } + } + } + + total_cost +} + /// Scan running agents for turn/budget limit violations and terminate offenders. /// -/// Uses the same accessor path as `tool_get_agent_remaining_turns_and_budget`: -/// log files are counted via `agent_log::list_story_log_files` + `read_log`, -/// and budget via `token_usage::read_all`. +/// Turns are counted from `assistant` events in the log files. Budget is +/// computed from per-message token usage in the logs (for running sessions) +/// or from the `result` event's `total_cost_usd` (for completed sessions), +/// with `token_usage.jsonl` records as a fallback floor. fn check_agent_limits( agents: &Mutex>, project_root: &Path, @@ -119,7 +196,7 @@ fn check_agent_limits( continue; } - // Count turns — same path as tool_get_agent_remaining_turns_and_budget. + // Count turns from log files. let mut turns_used: u64 = 0; if max_turns.is_some() { let log_files = @@ -138,13 +215,18 @@ fn check_agent_limits( } } - // Compute budget — same path as tool_get_agent_remaining_turns_and_budget. + // Compute budget from log-based per-message estimates (works for + // running agents) and completed-session records from token_usage.jsonl. + // Use whichever is higher — log estimates cover in-flight sessions, + // while token_usage.jsonl records are accurate for completed ones. let budget_used_usd: f64 = if max_budget_usd.is_some() { - all_token_records + let log_cost = compute_budget_from_logs(project_root, story_id, agent_name); + let record_cost: f64 = all_token_records .iter() .filter(|r| r.story_id == *story_id && r.agent_name == *agent_name) .map(|r| r.usage.total_cost_usd) - .sum() + .sum(); + log_cost.max(record_cost) } else { 0.0 }; @@ -213,6 +295,13 @@ impl AgentPool { /// Run one watchdog pass: detect orphans, enforce limits, kill offenders. /// /// Called by the unified background tick loop every 30 ticks. + /// + /// When a limit is exceeded, the agent's PTY child is killed AND the story is + /// marked `blocked: true` in CRDT state. Marking blocked is essential — without + /// it, `auto_assign_available_work` immediately re-spawns the agent on the next + /// tick, the new session inherits the prior session's logs (so `turns_used` + /// keeps growing across sessions), and the agent burns through hundreds of + /// turns in a kill-respawn loop before any operator intervention. pub fn run_watchdog_pass(&self, project_root: Option<&Path>) -> usize { let orphaned = check_orphaned_agents(&self.agents); @@ -227,6 +316,24 @@ impl AgentPool { { handle.abort(); } + + // Mark the story `blocked: true` so auto-assign skips it on the + // next tick. Without this, the kill-respawn loop described above + // burns the agent's budget across many short sessions. + let story_id = key.rsplit_once(':').map(|(s, _)| s).unwrap_or(key); + if let Some(contents) = crate::db::read_content(story_id) { + let blocked = crate::io::story_metadata::write_blocked_in_content(&contents); + crate::db::write_content(story_id, &blocked); + let stage = crate::pipeline_state::read_typed(story_id) + .ok() + .flatten() + .map(|i| i.stage.dir_name().to_string()) + .unwrap_or_else(|| "2_current".to_string()); + crate::db::write_item_with_content(story_id, &stage, &blocked); + slog!( + "[watchdog] Marked story '{story_id}' as blocked after limit termination." + ); + } } if !terminated.is_empty() { Self::notify_agent_state_changed(&self.watcher_tx); @@ -591,4 +698,69 @@ max_budget_usd = 5.00 ); } } + + // ── Kill-respawn loop fix (bug 646) ────────────────────────────────────── + + /// When the watchdog terminates an agent for limit-exceeded, it must mark + /// the story `blocked: true` in CRDT state so `auto_assign_available_work` + /// won't immediately re-spawn the agent on the next tick. + /// + /// Without this, the kill-respawn loop produces the symptom observed in + /// production: the new session inherits the prior session's logs, so + /// `turns_used` keeps growing across sessions (e.g. 991/50 turns observed). + #[test] + fn watchdog_marks_story_blocked_after_limit_termination() { + crate::db::ensure_content_store(); + + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + write_project_config( + root, + r#" +[[agent]] +name = "coder-1" +runtime = "claude-code" +max_turns = 10 +"#, + ); + + // Write story content into the CRDT-backed content store so the + // watchdog's mark-blocked path has something to read+update. + let story_id = "42_story_runaway"; + let initial = "---\nname: Runaway Story\n---\n# Runaway Story\n"; + crate::db::write_content(story_id, initial); + + // 12 turns exceeds the configured max of 10. + write_fake_log(root, story_id, "coder-1", 12); + + let pool = AgentPool::new_test(3001); + let _tx = pool.inject_test_agent(story_id, "coder-1", AgentStatus::Running); + + let found = pool.run_watchdog_pass(Some(root)); + assert!(found >= 1, "watchdog should detect the over-limit agent"); + + // CORE INVARIANT: after termination, the story must be marked blocked. + // Without this, auto_assign_available_work would immediately re-spawn + // the agent on its next tick. + let updated = crate::db::read_content(story_id) + .expect("story content must still exist after watchdog termination"); + assert!( + updated.contains("blocked: true"), + "story must be marked `blocked: true` after limit termination — got:\n{updated}" + ); + + // Sanity: the agent itself is also Failed with the right reason. + { + let agents = pool.agents.lock().unwrap(); + let key = composite_key(story_id, "coder-1"); + let agent = agents.get(&key).unwrap(); + assert_eq!(agent.status, AgentStatus::Failed); + assert_eq!( + agent.termination_reason, + Some(TerminationReason::TurnLimit), + "termination reason must be TurnLimit" + ); + } + } } diff --git a/server/src/agents/pool/mod.rs b/server/src/agents/pool/mod.rs index 7243f987..5e9d0585 100644 --- a/server/src/agents/pool/mod.rs +++ b/server/src/agents/pool/mod.rs @@ -1,5 +1,5 @@ //! Agent pool — manages the set of active agents across all pipeline stages. -mod auto_assign; +pub(crate) mod auto_assign; mod pipeline; mod process; mod query; diff --git a/server/src/http/mcp/agent_tools.rs b/server/src/http/mcp/agent_tools.rs index 3a342eb5..4b13dc80 100644 --- a/server/src/http/mcp/agent_tools.rs +++ b/server/src/http/mcp/agent_tools.rs @@ -307,13 +307,20 @@ pub(super) fn tool_get_agent_remaining_turns_and_budget( } } - // Compute budget used from completed-session token usage records. + // Compute budget from log-based per-message estimates (works for running + // agents) and completed-session records from token_usage.jsonl. + let log_cost = crate::agents::pool::auto_assign::watchdog::compute_budget_from_logs( + &project_root, + story_id, + agent_name, + ); let all_records = crate::agents::token_usage::read_all(&project_root).unwrap_or_default(); - let budget_used_usd: f64 = all_records + let record_cost: f64 = all_records .iter() .filter(|r| r.story_id == story_id && r.agent_name == agent_name) .map(|r| r.usage.total_cost_usd) .sum(); + let budget_used_usd: f64 = log_cost.max(record_cost); let remaining_turns = max_turns.map(|max| (max as i64) - (turns_used as i64)); let remaining_budget_usd = max_budget_usd.map(|max| max - budget_used_usd);