huskies: merge 624_bug_agent_turn_and_budget_limits_not_enforced_coder_1_ran_5_6x_over_max_turns
This commit is contained in:
@@ -149,6 +149,7 @@ mod tests {
|
||||
log_session_id: None,
|
||||
merge_failure_reported: false,
|
||||
throttled: false,
|
||||
termination_reason: None,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,12 +1,15 @@
|
||||
//! Watchdog task: detects orphaned agents and triggers auto-assign.
|
||||
//! Watchdog task: detects orphaned agents, enforces turn/budget limits, and
|
||||
//! triggers auto-assign.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::path::Path;
|
||||
use std::sync::Mutex;
|
||||
use tokio::sync::broadcast;
|
||||
|
||||
use crate::config::ProjectConfig;
|
||||
use crate::slog;
|
||||
|
||||
use super::super::super::{AgentEvent, AgentStatus};
|
||||
use super::super::super::{AgentEvent, AgentStatus, TerminationReason};
|
||||
use super::super::{AgentPool, StoryAgent};
|
||||
|
||||
/// Scan the agent pool for Running entries whose backing tokio task has already
|
||||
@@ -65,6 +68,141 @@ pub(super) fn check_orphaned_agents(agents: &Mutex<HashMap<String, StoryAgent>>)
|
||||
count
|
||||
}
|
||||
|
||||
/// Scan running agents for turn/budget limit violations and terminate offenders.
|
||||
///
|
||||
/// Uses the same accessor path as `tool_get_agent_remaining_turns_and_budget`:
|
||||
/// log files are counted via `agent_log::list_story_log_files` + `read_log`,
|
||||
/// and budget via `token_usage::read_all`.
|
||||
fn check_agent_limits(
|
||||
agents: &Mutex<HashMap<String, StoryAgent>>,
|
||||
project_root: &Path,
|
||||
) -> Vec<(String, TerminationReason)> {
|
||||
let config = match ProjectConfig::load(project_root) {
|
||||
Ok(c) => c,
|
||||
Err(_) => return Vec::new(),
|
||||
};
|
||||
|
||||
// Snapshot running agents: (key, story_id, agent_name, tx).
|
||||
let running: Vec<(String, String, String, broadcast::Sender<AgentEvent>)> = {
|
||||
let lock = match agents.lock() {
|
||||
Ok(l) => l,
|
||||
Err(_) => return Vec::new(),
|
||||
};
|
||||
lock.iter()
|
||||
.filter(|(_, agent)| agent.status == AgentStatus::Running)
|
||||
.map(|(key, agent)| {
|
||||
let story_id = key
|
||||
.rsplit_once(':')
|
||||
.map(|(s, _)| s.to_string())
|
||||
.unwrap_or_else(|| key.clone());
|
||||
(
|
||||
key.clone(),
|
||||
story_id,
|
||||
agent.agent_name.clone(),
|
||||
agent.tx.clone(),
|
||||
)
|
||||
})
|
||||
.collect()
|
||||
};
|
||||
|
||||
let all_token_records = crate::agents::token_usage::read_all(project_root).unwrap_or_default();
|
||||
|
||||
let mut terminated = Vec::new();
|
||||
|
||||
for (key, story_id, agent_name, tx) in &running {
|
||||
let agent_config = config.agent.iter().find(|a| a.name == *agent_name);
|
||||
let max_turns = agent_config.and_then(|a| a.max_turns);
|
||||
let max_budget_usd = agent_config.and_then(|a| a.max_budget_usd);
|
||||
|
||||
// Skip agents with no limits configured.
|
||||
if max_turns.is_none() && max_budget_usd.is_none() {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Count turns — same path as tool_get_agent_remaining_turns_and_budget.
|
||||
let mut turns_used: u64 = 0;
|
||||
if max_turns.is_some() {
|
||||
let log_files =
|
||||
crate::agent_log::list_story_log_files(project_root, story_id, Some(agent_name));
|
||||
for path in &log_files {
|
||||
if let Ok(entries) = crate::agent_log::read_log(path) {
|
||||
for entry in &entries {
|
||||
if entry.event.get("type").and_then(|v| v.as_str()) == Some("agent_json")
|
||||
&& let Some(data) = entry.event.get("data")
|
||||
&& data.get("type").and_then(|v| v.as_str()) == Some("assistant")
|
||||
{
|
||||
turns_used += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Compute budget — same path as tool_get_agent_remaining_turns_and_budget.
|
||||
let budget_used_usd: f64 = if max_budget_usd.is_some() {
|
||||
all_token_records
|
||||
.iter()
|
||||
.filter(|r| r.story_id == *story_id && r.agent_name == *agent_name)
|
||||
.map(|r| r.usage.total_cost_usd)
|
||||
.sum()
|
||||
} else {
|
||||
0.0
|
||||
};
|
||||
|
||||
// Determine if a limit is exceeded.
|
||||
let reason = if let Some(max) = max_turns {
|
||||
if turns_used >= max as u64 {
|
||||
Some(TerminationReason::TurnLimit)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else {
|
||||
None
|
||||
}
|
||||
.or(if let Some(max) = max_budget_usd {
|
||||
if budget_used_usd >= max {
|
||||
Some(TerminationReason::BudgetLimit)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else {
|
||||
None
|
||||
});
|
||||
|
||||
if let Some(reason) = reason {
|
||||
let reason_str = match &reason {
|
||||
TerminationReason::TurnLimit => {
|
||||
format!("turn limit exceeded ({turns_used}/{})", max_turns.unwrap())
|
||||
}
|
||||
TerminationReason::BudgetLimit => format!(
|
||||
"budget limit exceeded (${budget_used_usd:.2}/${:.2})",
|
||||
max_budget_usd.unwrap()
|
||||
),
|
||||
};
|
||||
|
||||
// Mark agent as Failed with termination reason.
|
||||
if let Ok(mut lock) = agents.lock()
|
||||
&& let Some(agent) = lock.get_mut(key)
|
||||
{
|
||||
agent.status = AgentStatus::Failed;
|
||||
agent.termination_reason = Some(reason.clone());
|
||||
}
|
||||
|
||||
slog!("[watchdog] Terminating agent '{key}': {reason_str}.");
|
||||
|
||||
let _ = tx.send(AgentEvent::Error {
|
||||
story_id: story_id.clone(),
|
||||
agent_name: agent_name.clone(),
|
||||
message: format!("Agent terminated by watchdog: {reason_str}"),
|
||||
});
|
||||
|
||||
terminated.push((key.clone(), reason));
|
||||
}
|
||||
}
|
||||
|
||||
terminated
|
||||
}
|
||||
|
||||
impl AgentPool {
|
||||
/// Run a single watchdog pass synchronously (test helper).
|
||||
#[cfg(test)]
|
||||
@@ -72,11 +210,31 @@ impl AgentPool {
|
||||
check_orphaned_agents(&self.agents);
|
||||
}
|
||||
|
||||
/// Run one watchdog pass and return the number of orphaned agents detected.
|
||||
/// Run one watchdog pass: detect orphans, enforce limits, kill offenders.
|
||||
///
|
||||
/// Called by the unified background tick loop every 30 ticks.
|
||||
pub fn run_watchdog_pass(&self) -> usize {
|
||||
check_orphaned_agents(&self.agents)
|
||||
pub fn run_watchdog_pass(&self, project_root: Option<&Path>) -> usize {
|
||||
let orphaned = check_orphaned_agents(&self.agents);
|
||||
|
||||
if let Some(root) = project_root {
|
||||
let terminated = check_agent_limits(&self.agents, root);
|
||||
for (key, _reason) in &terminated {
|
||||
// Kill the PTY child and abort the task, same as stop_agent.
|
||||
self.kill_child_for_key(key);
|
||||
if let Ok(mut lock) = self.agents.lock()
|
||||
&& let Some(agent) = lock.get_mut(key)
|
||||
&& let Some(handle) = agent.task_handle.take()
|
||||
{
|
||||
handle.abort();
|
||||
}
|
||||
}
|
||||
if !terminated.is_empty() {
|
||||
Self::notify_agent_state_changed(&self.watcher_tx);
|
||||
}
|
||||
return orphaned + terminated.len();
|
||||
}
|
||||
|
||||
orphaned
|
||||
}
|
||||
}
|
||||
|
||||
@@ -87,6 +245,58 @@ mod tests {
|
||||
use super::super::super::{AgentPool, composite_key};
|
||||
use super::*;
|
||||
|
||||
/// Write a fake agent log file with `n` assistant turn entries.
|
||||
fn write_fake_log(project_root: &Path, story_id: &str, agent_name: &str, n_turns: u64) {
|
||||
let log_dir = project_root.join(".huskies").join("logs").join(story_id);
|
||||
std::fs::create_dir_all(&log_dir).unwrap();
|
||||
let log_path = log_dir.join(format!("{agent_name}-test-session.log"));
|
||||
let mut content = String::new();
|
||||
for _ in 0..n_turns {
|
||||
content.push_str(
|
||||
&serde_json::to_string(&serde_json::json!({
|
||||
"timestamp": "2026-04-25T00:00:00Z",
|
||||
"type": "agent_json",
|
||||
"story_id": story_id,
|
||||
"agent_name": agent_name,
|
||||
"data": { "type": "assistant", "message": {} }
|
||||
}))
|
||||
.unwrap(),
|
||||
);
|
||||
content.push('\n');
|
||||
}
|
||||
std::fs::write(log_path, content).unwrap();
|
||||
}
|
||||
|
||||
/// Write a fake token usage record with the given cost.
|
||||
fn write_fake_token_usage(
|
||||
project_root: &Path,
|
||||
story_id: &str,
|
||||
agent_name: &str,
|
||||
cost_usd: f64,
|
||||
) {
|
||||
let record = crate::agents::token_usage::TokenUsageRecord {
|
||||
story_id: story_id.to_string(),
|
||||
agent_name: agent_name.to_string(),
|
||||
timestamp: "2026-04-25T00:00:00Z".to_string(),
|
||||
model: None,
|
||||
usage: crate::agents::TokenUsage {
|
||||
input_tokens: 1000,
|
||||
output_tokens: 500,
|
||||
cache_creation_input_tokens: 0,
|
||||
cache_read_input_tokens: 0,
|
||||
total_cost_usd: cost_usd,
|
||||
},
|
||||
};
|
||||
crate::agents::token_usage::append_record(project_root, &record).unwrap();
|
||||
}
|
||||
|
||||
/// Write `config_toml` to `<project_root>/.huskies/project.toml`.
///
/// The `.huskies` directory is created if it does not exist, and an existing
/// `project.toml` is overwritten. Panics if either step fails.
fn write_project_config(project_root: &Path, config_toml: &str) {
    let config_file = project_root.join(".huskies").join("project.toml");

    // The parent is always the `.huskies` directory joined above.
    if let Some(config_dir) = config_file.parent() {
        std::fs::create_dir_all(config_dir).unwrap();
    }

    std::fs::write(&config_file, config_toml).unwrap();
}
|
||||
|
||||
// ── check_orphaned_agents return value tests (bug 161) ──────────────────
|
||||
|
||||
#[tokio::test]
|
||||
@@ -160,6 +370,190 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
// ── Limit enforcement integration tests (bug 624) ────────────────────────
|
||||
|
||||
/// A running agent whose log holds more assistant turns than `max_turns`
/// must end up `Failed` with `TurnLimit`, and an error event must be broadcast.
#[test]
fn watchdog_terminates_agent_exceeding_turn_limit() {
    let project_dir = tempfile::tempdir().unwrap();
    let root = project_dir.path();

    let config = r#"
[[agent]]
name = "coder-1"
runtime = "claude-code"
max_turns = 10
"#;
    write_project_config(root, config);

    // Twelve recorded turns against a limit of ten.
    write_fake_log(root, "story_a", "coder-1", 12);

    let pool = AgentPool::new_test(3001);
    let sender = pool.inject_test_agent("story_a", "coder-1", AgentStatus::Running);
    let mut events = sender.subscribe();

    let detected = pool.run_watchdog_pass(Some(root));
    assert!(detected >= 1, "watchdog should detect the over-limit agent");

    // Agent should now be Failed with TurnLimit reason. The guard is scoped so
    // the pool lock is released before the event is read.
    {
        let guard = pool.agents.lock().unwrap();
        let entry = guard.get(&composite_key("story_a", "coder-1")).unwrap();
        assert_eq!(entry.status, AgentStatus::Failed);
        assert_eq!(
            entry.termination_reason,
            Some(TerminationReason::TurnLimit),
            "termination reason must be TurnLimit"
        );
    }

    let event = events
        .try_recv()
        .expect("watchdog must emit an Error event");
    assert!(
        matches!(event, AgentEvent::Error { .. }),
        "expected AgentEvent::Error, got: {event:?}"
    );
}
|
||||
|
||||
/// A running agent whose recorded spend is above `max_budget_usd` must end up
/// `Failed` with `BudgetLimit`, and an error event must be broadcast.
#[test]
fn watchdog_terminates_agent_exceeding_budget_limit() {
    let project_dir = tempfile::tempdir().unwrap();
    let root = project_dir.path();

    let config = r#"
[[agent]]
name = "coder-1"
runtime = "claude-code"
max_budget_usd = 5.00
"#;
    write_project_config(root, config);

    // $6.00 of recorded usage against a $5.00 budget.
    write_fake_token_usage(root, "story_b", "coder-1", 6.00);

    let pool = AgentPool::new_test(3001);
    let sender = pool.inject_test_agent("story_b", "coder-1", AgentStatus::Running);
    let mut events = sender.subscribe();

    let detected = pool.run_watchdog_pass(Some(root));
    assert!(detected >= 1, "watchdog should detect the over-budget agent");

    // Scoped so the pool lock is released before the event is read.
    {
        let guard = pool.agents.lock().unwrap();
        let entry = guard.get(&composite_key("story_b", "coder-1")).unwrap();
        assert_eq!(entry.status, AgentStatus::Failed);
        assert_eq!(
            entry.termination_reason,
            Some(TerminationReason::BudgetLimit),
            "termination reason must be BudgetLimit"
        );
    }

    let event = events
        .try_recv()
        .expect("watchdog must emit an Error event");
    assert!(matches!(event, AgentEvent::Error { .. }));
}
|
||||
|
||||
/// An agent below both its turn and budget limits must be left `Running`
/// with no termination reason, and the pass must report nothing.
#[test]
fn watchdog_does_not_terminate_agent_under_limits() {
    let project_dir = tempfile::tempdir().unwrap();
    let root = project_dir.path();

    let config = r#"
[[agent]]
name = "coder-1"
runtime = "claude-code"
max_turns = 50
max_budget_usd = 10.00
"#;
    write_project_config(root, config);

    // 25 of 50 turns and $3.00 of $10.00: under both limits.
    write_fake_log(root, "story_c", "coder-1", 25);
    write_fake_token_usage(root, "story_c", "coder-1", 3.00);

    let pool = AgentPool::new_test(3001);
    pool.inject_test_agent("story_c", "coder-1", AgentStatus::Running);

    let detected = pool.run_watchdog_pass(Some(root));
    assert_eq!(detected, 0, "agent under limits should not be terminated");

    let guard = pool.agents.lock().unwrap();
    let entry = guard.get(&composite_key("story_c", "coder-1")).unwrap();
    assert_eq!(
        entry.status,
        AgentStatus::Running,
        "agent under limits should stay Running"
    );
    assert!(entry.termination_reason.is_none());
}
|
||||
|
||||
/// Regression test for the original bug 624 incident.
///
/// coder-1, configured with max_turns=50 and max_budget_usd=5.00, ran 5.6×
/// over the turn limit (280 turns). Turns were exhausted first in the observed
/// trace, so the watchdog must terminate the agent with reason `TurnLimit`
/// even though the budget is still intact.
#[test]
fn regression_bug624_coder1_story623_trajectory() {
    let project_dir = tempfile::tempdir().unwrap();
    let root = project_dir.path();

    let config = r#"
[[agent]]
name = "coder-1"
runtime = "claude-code"
max_turns = 50
max_budget_usd = 5.00
"#;
    write_project_config(root, config);

    // Replay the trajectory at the point the watchdog should have stepped in:
    // 55 turns (just past the limit, not 280) with $2.50 spent (under budget).
    write_fake_log(root, "story_623", "coder-1", 55);
    write_fake_token_usage(root, "story_623", "coder-1", 2.50);

    let pool = AgentPool::new_test(3001);
    let sender = pool.inject_test_agent("story_623", "coder-1", AgentStatus::Running);
    let mut events = sender.subscribe();

    let detected = pool.run_watchdog_pass(Some(root));
    assert!(detected >= 1, "watchdog must catch the turn-limit violation");

    // Scoped so the pool lock is released before the event is read.
    {
        let guard = pool.agents.lock().unwrap();
        let entry = guard.get(&composite_key("story_623", "coder-1")).unwrap();
        assert_eq!(entry.status, AgentStatus::Failed);
        assert_eq!(
            entry.termination_reason,
            Some(TerminationReason::TurnLimit),
            "turns hit first in the observed trace, so reason must be TurnLimit"
        );
    }

    // The error event should have been emitted and must name the turn limit.
    let event = events
        .try_recv()
        .expect("watchdog must emit an Error event");
    let AgentEvent::Error { message, .. } = &event else {
        panic!("expected AgentEvent::Error, got: {event:?}");
    };
    assert!(
        message.contains("turn limit"),
        "error message should mention turn limit, got: {message}"
    );
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn watchdog_orphan_detection_returns_nonzero_enabling_auto_assign() {
|
||||
// This test verifies the contract that `check_orphaned_agents` returns
|
||||
|
||||
@@ -285,6 +285,7 @@ impl AgentPool {
|
||||
log_session_id: Some(log_session_id.clone()),
|
||||
merge_failure_reported: false,
|
||||
throttled: false,
|
||||
termination_reason: None,
|
||||
},
|
||||
);
|
||||
}
|
||||
@@ -652,6 +653,7 @@ impl AgentPool {
|
||||
completion: None,
|
||||
log_session_id: Some(log_session_id),
|
||||
throttled: false,
|
||||
termination_reason: None,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -35,6 +35,7 @@ impl AgentPool {
|
||||
log_session_id: None,
|
||||
merge_failure_reported: false,
|
||||
throttled: false,
|
||||
termination_reason: None,
|
||||
},
|
||||
);
|
||||
tx
|
||||
@@ -71,6 +72,7 @@ impl AgentPool {
|
||||
log_session_id: None,
|
||||
merge_failure_reported: false,
|
||||
throttled: false,
|
||||
termination_reason: None,
|
||||
},
|
||||
);
|
||||
tx
|
||||
@@ -104,6 +106,7 @@ impl AgentPool {
|
||||
log_session_id: None,
|
||||
merge_failure_reported: false,
|
||||
throttled: false,
|
||||
termination_reason: None,
|
||||
},
|
||||
);
|
||||
tx
|
||||
@@ -136,6 +139,7 @@ impl AgentPool {
|
||||
log_session_id: None,
|
||||
merge_failure_reported: false,
|
||||
throttled: false,
|
||||
termination_reason: None,
|
||||
},
|
||||
);
|
||||
tx
|
||||
|
||||
@@ -84,6 +84,8 @@ pub(super) struct StoryAgent {
|
||||
/// Set to `true` when a rate-limit throttle warning was received for this agent.
|
||||
/// True when a rate-limit throttle warning was received for this agent.
|
||||
pub(super) throttled: bool,
|
||||
/// Set when the watchdog terminates the agent for exceeding a limit.
|
||||
pub(super) termination_reason: Option<crate::agents::TerminationReason>,
|
||||
}
|
||||
|
||||
/// Build an `AgentInfo` snapshot from a `StoryAgent` map entry.
|
||||
@@ -104,5 +106,6 @@ pub(super) fn agent_info_from_entry(story_id: &str, agent: &StoryAgent) -> Agent
|
||||
completion: agent.completion.clone(),
|
||||
log_session_id: agent.log_session_id.clone(),
|
||||
throttled: agent.throttled,
|
||||
termination_reason: agent.termination_reason.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -71,6 +71,7 @@ impl AgentPool {
|
||||
completion: None,
|
||||
log_session_id: None,
|
||||
throttled: false,
|
||||
termination_reason: None,
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user