huskies: merge 646_bug_watchdog_from_bug_624_is_not_actually_enforcing_max_turns_max_budget_usd_in_production
This commit is contained in:
@@ -3,7 +3,7 @@ pub mod gates;
|
|||||||
pub mod lifecycle;
|
pub mod lifecycle;
|
||||||
pub mod local_prompt;
|
pub mod local_prompt;
|
||||||
pub mod merge;
|
pub mod merge;
|
||||||
mod pool;
|
pub(crate) mod pool;
|
||||||
pub(crate) mod pty;
|
pub(crate) mod pty;
|
||||||
pub mod runtime;
|
pub mod runtime;
|
||||||
pub mod token_usage;
|
pub mod token_usage;
|
||||||
@@ -161,6 +161,27 @@ pub struct TokenUsage {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl TokenUsage {
|
impl TokenUsage {
|
||||||
|
/// Estimate USD cost from token counts using approximate per-model pricing.
|
||||||
|
///
|
||||||
|
/// Used by the watchdog to compute in-flight budget from per-message usage
|
||||||
|
/// data in the agent log (since `total_cost_usd` is only available in the
|
||||||
|
/// `result` event at session end). Uses conservative (high) pricing when
|
||||||
|
/// the model is unknown so budget limits are hit sooner rather than later.
|
||||||
|
pub fn estimate_cost_usd(&self, model: Option<&str>) -> f64 {
|
||||||
|
// Per-million-token pricing (input, output, cache_read, cache_create).
|
||||||
|
let (inp, out, cr, cc) = match model {
|
||||||
|
Some(m) if m.contains("haiku") => (0.80, 4.0, 0.08, 1.00),
|
||||||
|
Some(m) if m.contains("sonnet") => (3.0, 15.0, 0.30, 3.75),
|
||||||
|
// Opus or unknown → most expensive = conservative.
|
||||||
|
_ => (15.0, 75.0, 1.50, 18.75),
|
||||||
|
};
|
||||||
|
(self.input_tokens as f64 * inp
|
||||||
|
+ self.output_tokens as f64 * out
|
||||||
|
+ self.cache_read_input_tokens as f64 * cr
|
||||||
|
+ self.cache_creation_input_tokens as f64 * cc)
|
||||||
|
/ 1_000_000.0
|
||||||
|
}
|
||||||
|
|
||||||
/// Parse token usage from a Claude Code `result` JSON event.
|
/// Parse token usage from a Claude Code `result` JSON event.
|
||||||
pub fn from_result_event(json: &serde_json::Value) -> Option<Self> {
|
pub fn from_result_event(json: &serde_json::Value) -> Option<Self> {
|
||||||
let usage = json.get("usage")?;
|
let usage = json.get("usage")?;
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ mod auto_assign;
|
|||||||
mod reconcile;
|
mod reconcile;
|
||||||
mod scan;
|
mod scan;
|
||||||
mod story_checks;
|
mod story_checks;
|
||||||
mod watchdog;
|
pub(crate) mod watchdog;
|
||||||
|
|
||||||
// Re-export items that were pub(super) in the original monolithic auto_assign.rs
|
// Re-export items that were pub(super) in the original monolithic auto_assign.rs
|
||||||
// so that pool::lifecycle and pool::pipeline continue to access them unchanged.
|
// so that pool::lifecycle and pool::pipeline continue to access them unchanged.
|
||||||
|
|||||||
@@ -68,11 +68,88 @@ pub(super) fn check_orphaned_agents(agents: &Mutex<HashMap<String, StoryAgent>>)
|
|||||||
count
|
count
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Compute in-flight budget from per-message usage data in agent log files.
|
||||||
|
///
|
||||||
|
/// For completed sessions (containing a `result` event), uses the accurate
|
||||||
|
/// `total_cost_usd` from Claude Code. For running sessions (no `result`
|
||||||
|
/// event yet), estimates cost from per-turn token counts in `assistant`
|
||||||
|
/// events.
|
||||||
|
pub(crate) fn compute_budget_from_logs(
|
||||||
|
project_root: &Path,
|
||||||
|
story_id: &str,
|
||||||
|
agent_name: &str,
|
||||||
|
) -> f64 {
|
||||||
|
use crate::agents::TokenUsage;
|
||||||
|
|
||||||
|
let log_files =
|
||||||
|
crate::agent_log::list_story_log_files(project_root, story_id, Some(agent_name));
|
||||||
|
let mut total_cost = 0.0;
|
||||||
|
|
||||||
|
for path in &log_files {
|
||||||
|
let entries = match crate::agent_log::read_log(path) {
|
||||||
|
Ok(e) => e,
|
||||||
|
Err(_) => continue,
|
||||||
|
};
|
||||||
|
|
||||||
|
// Check for a result event with accurate total_cost_usd (completed session).
|
||||||
|
let result_cost = entries.iter().rev().find_map(|entry| {
|
||||||
|
if entry.event.get("type").and_then(|v| v.as_str()) == Some("agent_json")
|
||||||
|
&& let Some(data) = entry.event.get("data")
|
||||||
|
&& data.get("type").and_then(|v| v.as_str()) == Some("result")
|
||||||
|
{
|
||||||
|
data.get("total_cost_usd").and_then(|v| v.as_f64())
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
if let Some(cost) = result_cost {
|
||||||
|
// Completed session — use accurate cost.
|
||||||
|
total_cost += cost;
|
||||||
|
} else {
|
||||||
|
// Running session — estimate from per-message token usage.
|
||||||
|
for entry in &entries {
|
||||||
|
if entry.event.get("type").and_then(|v| v.as_str()) == Some("agent_json")
|
||||||
|
&& let Some(data) = entry.event.get("data")
|
||||||
|
&& data.get("type").and_then(|v| v.as_str()) == Some("assistant")
|
||||||
|
&& let Some(message) = data.get("message")
|
||||||
|
&& let Some(usage) = message.get("usage")
|
||||||
|
{
|
||||||
|
let model = message.get("model").and_then(|v| v.as_str());
|
||||||
|
let token_usage = TokenUsage {
|
||||||
|
input_tokens: usage
|
||||||
|
.get("input_tokens")
|
||||||
|
.and_then(|v| v.as_u64())
|
||||||
|
.unwrap_or(0),
|
||||||
|
output_tokens: usage
|
||||||
|
.get("output_tokens")
|
||||||
|
.and_then(|v| v.as_u64())
|
||||||
|
.unwrap_or(0),
|
||||||
|
cache_creation_input_tokens: usage
|
||||||
|
.get("cache_creation_input_tokens")
|
||||||
|
.and_then(|v| v.as_u64())
|
||||||
|
.unwrap_or(0),
|
||||||
|
cache_read_input_tokens: usage
|
||||||
|
.get("cache_read_input_tokens")
|
||||||
|
.and_then(|v| v.as_u64())
|
||||||
|
.unwrap_or(0),
|
||||||
|
total_cost_usd: 0.0,
|
||||||
|
};
|
||||||
|
total_cost += token_usage.estimate_cost_usd(model);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
total_cost
|
||||||
|
}
|
||||||
|
|
||||||
/// Scan running agents for turn/budget limit violations and terminate offenders.
|
/// Scan running agents for turn/budget limit violations and terminate offenders.
|
||||||
///
|
///
|
||||||
/// Uses the same accessor path as `tool_get_agent_remaining_turns_and_budget`:
|
/// Turns are counted from `assistant` events in the log files. Budget is
|
||||||
/// log files are counted via `agent_log::list_story_log_files` + `read_log`,
|
/// computed from per-message token usage in the logs (for running sessions)
|
||||||
/// and budget via `token_usage::read_all`.
|
/// or from the `result` event's `total_cost_usd` (for completed sessions),
|
||||||
|
/// with `token_usage.jsonl` records as a fallback floor.
|
||||||
fn check_agent_limits(
|
fn check_agent_limits(
|
||||||
agents: &Mutex<HashMap<String, StoryAgent>>,
|
agents: &Mutex<HashMap<String, StoryAgent>>,
|
||||||
project_root: &Path,
|
project_root: &Path,
|
||||||
@@ -119,7 +196,7 @@ fn check_agent_limits(
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Count turns — same path as tool_get_agent_remaining_turns_and_budget.
|
// Count turns from log files.
|
||||||
let mut turns_used: u64 = 0;
|
let mut turns_used: u64 = 0;
|
||||||
if max_turns.is_some() {
|
if max_turns.is_some() {
|
||||||
let log_files =
|
let log_files =
|
||||||
@@ -138,13 +215,18 @@ fn check_agent_limits(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Compute budget — same path as tool_get_agent_remaining_turns_and_budget.
|
// Compute budget from log-based per-message estimates (works for
|
||||||
|
// running agents) and completed-session records from token_usage.jsonl.
|
||||||
|
// Use whichever is higher — log estimates cover in-flight sessions,
|
||||||
|
// while token_usage.jsonl records are accurate for completed ones.
|
||||||
let budget_used_usd: f64 = if max_budget_usd.is_some() {
|
let budget_used_usd: f64 = if max_budget_usd.is_some() {
|
||||||
all_token_records
|
let log_cost = compute_budget_from_logs(project_root, story_id, agent_name);
|
||||||
|
let record_cost: f64 = all_token_records
|
||||||
.iter()
|
.iter()
|
||||||
.filter(|r| r.story_id == *story_id && r.agent_name == *agent_name)
|
.filter(|r| r.story_id == *story_id && r.agent_name == *agent_name)
|
||||||
.map(|r| r.usage.total_cost_usd)
|
.map(|r| r.usage.total_cost_usd)
|
||||||
.sum()
|
.sum();
|
||||||
|
log_cost.max(record_cost)
|
||||||
} else {
|
} else {
|
||||||
0.0
|
0.0
|
||||||
};
|
};
|
||||||
@@ -213,6 +295,13 @@ impl AgentPool {
|
|||||||
/// Run one watchdog pass: detect orphans, enforce limits, kill offenders.
|
/// Run one watchdog pass: detect orphans, enforce limits, kill offenders.
|
||||||
///
|
///
|
||||||
/// Called by the unified background tick loop every 30 ticks.
|
/// Called by the unified background tick loop every 30 ticks.
|
||||||
|
///
|
||||||
|
/// When a limit is exceeded, the agent's PTY child is killed AND the story is
|
||||||
|
/// marked `blocked: true` in CRDT state. Marking blocked is essential — without
|
||||||
|
/// it, `auto_assign_available_work` immediately re-spawns the agent on the next
|
||||||
|
/// tick, the new session inherits the prior session's logs (so `turns_used`
|
||||||
|
/// keeps growing across sessions), and the agent burns through hundreds of
|
||||||
|
/// turns in a kill-respawn loop before any operator intervention.
|
||||||
pub fn run_watchdog_pass(&self, project_root: Option<&Path>) -> usize {
|
pub fn run_watchdog_pass(&self, project_root: Option<&Path>) -> usize {
|
||||||
let orphaned = check_orphaned_agents(&self.agents);
|
let orphaned = check_orphaned_agents(&self.agents);
|
||||||
|
|
||||||
@@ -227,6 +316,24 @@ impl AgentPool {
|
|||||||
{
|
{
|
||||||
handle.abort();
|
handle.abort();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Mark the story `blocked: true` so auto-assign skips it on the
|
||||||
|
// next tick. Without this, the kill-respawn loop described above
|
||||||
|
// burns the agent's budget across many short sessions.
|
||||||
|
let story_id = key.rsplit_once(':').map(|(s, _)| s).unwrap_or(key);
|
||||||
|
if let Some(contents) = crate::db::read_content(story_id) {
|
||||||
|
let blocked = crate::io::story_metadata::write_blocked_in_content(&contents);
|
||||||
|
crate::db::write_content(story_id, &blocked);
|
||||||
|
let stage = crate::pipeline_state::read_typed(story_id)
|
||||||
|
.ok()
|
||||||
|
.flatten()
|
||||||
|
.map(|i| i.stage.dir_name().to_string())
|
||||||
|
.unwrap_or_else(|| "2_current".to_string());
|
||||||
|
crate::db::write_item_with_content(story_id, &stage, &blocked);
|
||||||
|
slog!(
|
||||||
|
"[watchdog] Marked story '{story_id}' as blocked after limit termination."
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if !terminated.is_empty() {
|
if !terminated.is_empty() {
|
||||||
Self::notify_agent_state_changed(&self.watcher_tx);
|
Self::notify_agent_state_changed(&self.watcher_tx);
|
||||||
@@ -591,4 +698,69 @@ max_budget_usd = 5.00
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── Kill-respawn loop fix (bug 646) ──────────────────────────────────────
|
||||||
|
|
||||||
|
/// When the watchdog terminates an agent for limit-exceeded, it must mark
|
||||||
|
/// the story `blocked: true` in CRDT state so `auto_assign_available_work`
|
||||||
|
/// won't immediately re-spawn the agent on the next tick.
|
||||||
|
///
|
||||||
|
/// Without this, the kill-respawn loop produces the symptom observed in
|
||||||
|
/// production: the new session inherits the prior session's logs, so
|
||||||
|
/// `turns_used` keeps growing across sessions (e.g. 991/50 turns observed).
|
||||||
|
#[test]
|
||||||
|
fn watchdog_marks_story_blocked_after_limit_termination() {
|
||||||
|
crate::db::ensure_content_store();
|
||||||
|
|
||||||
|
let tmp = tempfile::tempdir().unwrap();
|
||||||
|
let root = tmp.path();
|
||||||
|
|
||||||
|
write_project_config(
|
||||||
|
root,
|
||||||
|
r#"
|
||||||
|
[[agent]]
|
||||||
|
name = "coder-1"
|
||||||
|
runtime = "claude-code"
|
||||||
|
max_turns = 10
|
||||||
|
"#,
|
||||||
|
);
|
||||||
|
|
||||||
|
// Write story content into the CRDT-backed content store so the
|
||||||
|
// watchdog's mark-blocked path has something to read+update.
|
||||||
|
let story_id = "42_story_runaway";
|
||||||
|
let initial = "---\nname: Runaway Story\n---\n# Runaway Story\n";
|
||||||
|
crate::db::write_content(story_id, initial);
|
||||||
|
|
||||||
|
// 12 turns exceeds the configured max of 10.
|
||||||
|
write_fake_log(root, story_id, "coder-1", 12);
|
||||||
|
|
||||||
|
let pool = AgentPool::new_test(3001);
|
||||||
|
let _tx = pool.inject_test_agent(story_id, "coder-1", AgentStatus::Running);
|
||||||
|
|
||||||
|
let found = pool.run_watchdog_pass(Some(root));
|
||||||
|
assert!(found >= 1, "watchdog should detect the over-limit agent");
|
||||||
|
|
||||||
|
// CORE INVARIANT: after termination, the story must be marked blocked.
|
||||||
|
// Without this, auto_assign_available_work would immediately re-spawn
|
||||||
|
// the agent on its next tick.
|
||||||
|
let updated = crate::db::read_content(story_id)
|
||||||
|
.expect("story content must still exist after watchdog termination");
|
||||||
|
assert!(
|
||||||
|
updated.contains("blocked: true"),
|
||||||
|
"story must be marked `blocked: true` after limit termination — got:\n{updated}"
|
||||||
|
);
|
||||||
|
|
||||||
|
// Sanity: the agent itself is also Failed with the right reason.
|
||||||
|
{
|
||||||
|
let agents = pool.agents.lock().unwrap();
|
||||||
|
let key = composite_key(story_id, "coder-1");
|
||||||
|
let agent = agents.get(&key).unwrap();
|
||||||
|
assert_eq!(agent.status, AgentStatus::Failed);
|
||||||
|
assert_eq!(
|
||||||
|
agent.termination_reason,
|
||||||
|
Some(TerminationReason::TurnLimit),
|
||||||
|
"termination reason must be TurnLimit"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
//! Agent pool — manages the set of active agents across all pipeline stages.
|
//! Agent pool — manages the set of active agents across all pipeline stages.
|
||||||
mod auto_assign;
|
pub(crate) mod auto_assign;
|
||||||
mod pipeline;
|
mod pipeline;
|
||||||
mod process;
|
mod process;
|
||||||
mod query;
|
mod query;
|
||||||
|
|||||||
@@ -307,13 +307,20 @@ pub(super) fn tool_get_agent_remaining_turns_and_budget(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Compute budget used from completed-session token usage records.
|
// Compute budget from log-based per-message estimates (works for running
|
||||||
|
// agents) and completed-session records from token_usage.jsonl.
|
||||||
|
let log_cost = crate::agents::pool::auto_assign::watchdog::compute_budget_from_logs(
|
||||||
|
&project_root,
|
||||||
|
story_id,
|
||||||
|
agent_name,
|
||||||
|
);
|
||||||
let all_records = crate::agents::token_usage::read_all(&project_root).unwrap_or_default();
|
let all_records = crate::agents::token_usage::read_all(&project_root).unwrap_or_default();
|
||||||
let budget_used_usd: f64 = all_records
|
let record_cost: f64 = all_records
|
||||||
.iter()
|
.iter()
|
||||||
.filter(|r| r.story_id == story_id && r.agent_name == agent_name)
|
.filter(|r| r.story_id == story_id && r.agent_name == agent_name)
|
||||||
.map(|r| r.usage.total_cost_usd)
|
.map(|r| r.usage.total_cost_usd)
|
||||||
.sum();
|
.sum();
|
||||||
|
let budget_used_usd: f64 = log_cost.max(record_cost);
|
||||||
|
|
||||||
let remaining_turns = max_turns.map(|max| (max as i64) - (turns_used as i64));
|
let remaining_turns = max_turns.map(|max| (max as i64) - (turns_used as i64));
|
||||||
let remaining_budget_usd = max_budget_usd.map(|max| max - budget_used_usd);
|
let remaining_budget_usd = max_budget_usd.map(|max| max - budget_used_usd);
|
||||||
|
|||||||
Reference in New Issue
Block a user