huskies: merge 783
This commit is contained in:
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,94 @@
|
||||
//! Budget computation: estimates USD spend from agent log files.
|
||||
|
||||
use std::path::Path;
|
||||
|
||||
/// Compute in-flight budget from per-message usage data in agent log files.
|
||||
///
|
||||
/// Sums cost across **all** sessions for the given story+agent combination.
|
||||
/// For completed sessions (containing a `result` event), uses the accurate
|
||||
/// `total_cost_usd` from Claude Code. For running sessions (no `result`
|
||||
/// event yet), estimates cost from per-turn token counts in `assistant`
|
||||
/// events.
|
||||
///
|
||||
/// This cumulative total is useful for cost analysis (metric tool) but is
|
||||
/// NOT used by the watchdog for enforcement — the watchdog uses
|
||||
/// [`compute_budget_from_single_log`] on the current session only.
|
||||
pub(crate) fn compute_budget_from_logs(
|
||||
project_root: &Path,
|
||||
story_id: &str,
|
||||
agent_name: &str,
|
||||
) -> f64 {
|
||||
let log_files =
|
||||
crate::agent_log::list_story_log_files(project_root, story_id, Some(agent_name));
|
||||
let mut total_cost = 0.0;
|
||||
|
||||
for path in &log_files {
|
||||
total_cost += compute_budget_from_single_log(path);
|
||||
}
|
||||
|
||||
total_cost
|
||||
}
|
||||
|
||||
/// Compute budget from a single log file.
|
||||
///
|
||||
/// If the log contains a `result` event with `total_cost_usd` (completed
|
||||
/// session), returns that. Otherwise estimates cost from per-turn token
|
||||
/// usage in `assistant` events (running session).
|
||||
pub(crate) fn compute_budget_from_single_log(path: &Path) -> f64 {
|
||||
use crate::agents::TokenUsage;
|
||||
|
||||
let entries = match crate::agent_log::read_log(path) {
|
||||
Ok(e) => e,
|
||||
Err(_) => return 0.0,
|
||||
};
|
||||
|
||||
// Check for a result event with accurate total_cost_usd (completed session).
|
||||
let result_cost = entries.iter().rev().find_map(|entry| {
|
||||
if entry.event.get("type").and_then(|v| v.as_str()) == Some("agent_json")
|
||||
&& let Some(data) = entry.event.get("data")
|
||||
&& data.get("type").and_then(|v| v.as_str()) == Some("result")
|
||||
{
|
||||
data.get("total_cost_usd").and_then(|v| v.as_f64())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
});
|
||||
|
||||
if let Some(cost) = result_cost {
|
||||
return cost;
|
||||
}
|
||||
|
||||
// Running session — estimate from per-message token usage.
|
||||
let mut cost = 0.0;
|
||||
for entry in &entries {
|
||||
if entry.event.get("type").and_then(|v| v.as_str()) == Some("agent_json")
|
||||
&& let Some(data) = entry.event.get("data")
|
||||
&& data.get("type").and_then(|v| v.as_str()) == Some("assistant")
|
||||
&& let Some(message) = data.get("message")
|
||||
&& let Some(usage) = message.get("usage")
|
||||
{
|
||||
let model = message.get("model").and_then(|v| v.as_str());
|
||||
let token_usage = TokenUsage {
|
||||
input_tokens: usage
|
||||
.get("input_tokens")
|
||||
.and_then(|v| v.as_u64())
|
||||
.unwrap_or(0),
|
||||
output_tokens: usage
|
||||
.get("output_tokens")
|
||||
.and_then(|v| v.as_u64())
|
||||
.unwrap_or(0),
|
||||
cache_creation_input_tokens: usage
|
||||
.get("cache_creation_input_tokens")
|
||||
.and_then(|v| v.as_u64())
|
||||
.unwrap_or(0),
|
||||
cache_read_input_tokens: usage
|
||||
.get("cache_read_input_tokens")
|
||||
.and_then(|v| v.as_u64())
|
||||
.unwrap_or(0),
|
||||
total_cost_usd: 0.0,
|
||||
};
|
||||
cost += token_usage.estimate_cost_usd(model);
|
||||
}
|
||||
}
|
||||
cost
|
||||
}
|
||||
@@ -0,0 +1,191 @@
|
||||
//! Limit enforcement: checks agent turn/budget limits and terminates offenders.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::path::Path;
|
||||
use std::sync::Mutex;
|
||||
use tokio::sync::broadcast;
|
||||
|
||||
use crate::agents::pool::StoryAgent;
|
||||
use crate::agents::{AgentEvent, AgentStatus, TerminationReason};
|
||||
use crate::config::ProjectConfig;
|
||||
use crate::slog;
|
||||
|
||||
use super::budget::compute_budget_from_single_log;
|
||||
|
||||
/// Snapshot of a running agent for limit checking: (composite_key, story_id, agent_name, event_tx, log_session_id).
///
/// The values are cloned out of the agent map while its lock is held so the
/// limit checks can read log files afterwards without holding the lock.
type RunningAgentSnapshot = (
    // Key of the agent's entry in the agent map.
    String,
    // Story id: the part of the key before its last ':' (or the whole key
    // when it contains no ':').
    String,
    // Name of the agent, used to look up its configured limits.
    String,
    // Sender on which the termination `AgentEvent::Error` is emitted.
    broadcast::Sender<AgentEvent>,
    // Session id used to locate the current session's log file, if any.
    Option<String>,
);
|
||||
|
||||
/// Resolve the current session's log file for an agent.
|
||||
///
|
||||
/// Uses `log_session_id` to construct the exact path when available;
|
||||
/// otherwise falls back to the most recently modified log file for the
|
||||
/// agent.
|
||||
pub(crate) fn resolve_session_log(
|
||||
project_root: &Path,
|
||||
story_id: &str,
|
||||
agent_name: &str,
|
||||
log_session_id: &Option<String>,
|
||||
) -> Option<std::path::PathBuf> {
|
||||
if let Some(sid) = log_session_id {
|
||||
let path = crate::agent_log::log_file_path(project_root, story_id, agent_name, sid);
|
||||
if path.exists() {
|
||||
return Some(path);
|
||||
}
|
||||
}
|
||||
crate::agent_log::find_latest_log(project_root, story_id, agent_name)
|
||||
}
|
||||
|
||||
/// Count `assistant` events in a single log file.
|
||||
pub(crate) fn count_turns_in_log(path: &Path) -> u64 {
|
||||
let entries = match crate::agent_log::read_log(path) {
|
||||
Ok(e) => e,
|
||||
Err(_) => return 0,
|
||||
};
|
||||
entries
|
||||
.iter()
|
||||
.filter(|entry| {
|
||||
entry.event.get("type").and_then(|v| v.as_str()) == Some("agent_json")
|
||||
&& entry
|
||||
.event
|
||||
.get("data")
|
||||
.and_then(|d| d.get("type"))
|
||||
.and_then(|v| v.as_str())
|
||||
== Some("assistant")
|
||||
})
|
||||
.count() as u64
|
||||
}
|
||||
|
||||
/// Scan running agents for turn/budget limit violations and terminate offenders.
|
||||
///
|
||||
/// Turns and budget are counted from the **current session's** log file
|
||||
/// only — prior sessions are excluded so that restart counts from earlier
|
||||
/// runs do not accumulate against the limits.
|
||||
pub(super) fn check_agent_limits(
|
||||
agents: &Mutex<HashMap<String, StoryAgent>>,
|
||||
project_root: &Path,
|
||||
) -> Vec<(String, TerminationReason)> {
|
||||
let config = match ProjectConfig::load(project_root) {
|
||||
Ok(c) => c,
|
||||
Err(_) => return Vec::new(),
|
||||
};
|
||||
|
||||
// Snapshot running agents: (key, story_id, agent_name, tx, log_session_id).
|
||||
let running: Vec<RunningAgentSnapshot> = {
|
||||
let lock = match agents.lock() {
|
||||
Ok(l) => l,
|
||||
Err(_) => return Vec::new(),
|
||||
};
|
||||
lock.iter()
|
||||
.filter(|(_, agent)| agent.status == AgentStatus::Running)
|
||||
.map(|(key, agent)| {
|
||||
let story_id = key
|
||||
.rsplit_once(':')
|
||||
.map(|(s, _)| s.to_string())
|
||||
.unwrap_or_else(|| key.clone());
|
||||
(
|
||||
key.clone(),
|
||||
story_id,
|
||||
agent.agent_name.clone(),
|
||||
agent.tx.clone(),
|
||||
agent.log_session_id.clone(),
|
||||
)
|
||||
})
|
||||
.collect()
|
||||
};
|
||||
|
||||
let mut terminated = Vec::new();
|
||||
|
||||
for (key, story_id, agent_name, tx, log_session_id) in &running {
|
||||
let agent_config = config.agent.iter().find(|a| a.name == *agent_name);
|
||||
let max_turns = agent_config.and_then(|a| a.max_turns);
|
||||
let max_budget_usd = agent_config.and_then(|a| a.max_budget_usd);
|
||||
|
||||
// Skip agents with no limits configured.
|
||||
if max_turns.is_none() && max_budget_usd.is_none() {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Resolve the current session's log file.
|
||||
let session_log = resolve_session_log(project_root, story_id, agent_name, log_session_id);
|
||||
|
||||
// Count turns from the current session's log file only.
|
||||
let turns_used: u64 = if max_turns.is_some() {
|
||||
session_log
|
||||
.as_ref()
|
||||
.map(|p| count_turns_in_log(p))
|
||||
.unwrap_or(0)
|
||||
} else {
|
||||
0
|
||||
};
|
||||
|
||||
// Compute budget from the current session's log file only.
|
||||
// Completed-session records from token_usage.jsonl are NOT included
|
||||
// in the watchdog's enforcement — they belong to prior sessions.
|
||||
let budget_used_usd: f64 = if max_budget_usd.is_some() {
|
||||
session_log
|
||||
.as_ref()
|
||||
.map(|p| compute_budget_from_single_log(p))
|
||||
.unwrap_or(0.0)
|
||||
} else {
|
||||
0.0
|
||||
};
|
||||
|
||||
// Determine if a limit is exceeded.
|
||||
let reason = if let Some(max) = max_turns {
|
||||
if turns_used >= max as u64 {
|
||||
Some(TerminationReason::TurnLimit)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else {
|
||||
None
|
||||
}
|
||||
.or(if let Some(max) = max_budget_usd {
|
||||
if budget_used_usd >= max {
|
||||
Some(TerminationReason::BudgetLimit)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else {
|
||||
None
|
||||
});
|
||||
|
||||
if let Some(reason) = reason {
|
||||
let reason_str = match &reason {
|
||||
TerminationReason::TurnLimit => {
|
||||
format!("turn limit exceeded ({turns_used}/{})", max_turns.unwrap())
|
||||
}
|
||||
TerminationReason::BudgetLimit => format!(
|
||||
"budget limit exceeded (${budget_used_usd:.2}/${:.2})",
|
||||
max_budget_usd.unwrap()
|
||||
),
|
||||
};
|
||||
|
||||
// Mark agent as Failed with termination reason.
|
||||
if let Ok(mut lock) = agents.lock()
|
||||
&& let Some(agent) = lock.get_mut(key)
|
||||
{
|
||||
agent.status = AgentStatus::Failed;
|
||||
agent.termination_reason = Some(reason.clone());
|
||||
}
|
||||
|
||||
slog!("[watchdog] Terminating agent '{key}': {reason_str}.");
|
||||
|
||||
let _ = tx.send(AgentEvent::Error {
|
||||
story_id: story_id.clone(),
|
||||
agent_name: agent_name.clone(),
|
||||
message: format!("Agent terminated by watchdog: {reason_str}"),
|
||||
});
|
||||
|
||||
terminated.push((key.clone(), reason));
|
||||
}
|
||||
}
|
||||
|
||||
terminated
|
||||
}
|
||||
@@ -0,0 +1,85 @@
|
||||
//! Watchdog task: detects orphaned agents, enforces turn/budget limits, and
|
||||
//! triggers auto-assign.
|
||||
|
||||
mod budget;
|
||||
mod limits;
|
||||
mod orphan;
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
use std::path::Path;
|
||||
|
||||
use crate::config::ProjectConfig;
|
||||
use crate::slog;
|
||||
|
||||
use super::super::AgentPool;
|
||||
use limits::check_agent_limits;
|
||||
use orphan::check_orphaned_agents;
|
||||
|
||||
pub(crate) use budget::{compute_budget_from_logs, compute_budget_from_single_log};
|
||||
pub(crate) use limits::{count_turns_in_log, resolve_session_log};
|
||||
|
||||
impl AgentPool {
|
||||
/// Run a single watchdog pass synchronously (test helper).
|
||||
#[cfg(test)]
|
||||
pub fn run_watchdog_once(&self) {
|
||||
check_orphaned_agents(&self.agents);
|
||||
}
|
||||
|
||||
/// Run one watchdog pass: detect orphans, enforce limits, kill offenders.
|
||||
///
|
||||
/// Called by the unified background tick loop every 30 ticks.
|
||||
///
|
||||
/// When a limit is exceeded the agent's PTY child is killed and the
|
||||
/// `should_block_story` retry mechanism is invoked. The story is marked
|
||||
/// `blocked: true` only when `retry_count >= max_retries`; otherwise
|
||||
/// `retry_count` is incremented and the story stays in `2_current/` for
|
||||
/// re-attempt. This prevents the original kill-respawn loop (bug 646)
|
||||
/// while restoring the `max_retries` semantic for turn/budget overruns.
|
||||
pub fn run_watchdog_pass(&self, project_root: Option<&Path>) -> usize {
|
||||
let orphaned = check_orphaned_agents(&self.agents);
|
||||
|
||||
if let Some(root) = project_root {
|
||||
let terminated = check_agent_limits(&self.agents, root);
|
||||
let config = ProjectConfig::load(root).unwrap_or_default();
|
||||
for (key, _reason) in &terminated {
|
||||
// Kill the PTY child and abort the task, same as stop_agent.
|
||||
self.kill_child_for_key(key);
|
||||
if let Ok(mut lock) = self.agents.lock()
|
||||
&& let Some(agent) = lock.get_mut(key)
|
||||
&& let Some(handle) = agent.task_handle.take()
|
||||
{
|
||||
handle.abort();
|
||||
}
|
||||
|
||||
// Use the retry mechanism: increment retry_count and only block
|
||||
// when the limit is exceeded, matching the pipeline's behaviour.
|
||||
let story_id = key.rsplit_once(':').map(|(s, _)| s).unwrap_or(key);
|
||||
if let Some(block_reason) = super::super::pipeline::should_block_story(
|
||||
story_id,
|
||||
config.max_retries,
|
||||
"watchdog",
|
||||
) {
|
||||
let _ = self
|
||||
.watcher_tx
|
||||
.send(crate::io::watcher::WatcherEvent::StoryBlocked {
|
||||
story_id: story_id.to_string(),
|
||||
reason: block_reason,
|
||||
});
|
||||
slog!("[watchdog] Story '{story_id}' blocked after exceeding retry limit.");
|
||||
} else {
|
||||
slog!(
|
||||
"[watchdog] Story '{story_id}' retry incremented after limit \
|
||||
termination; stays in 2_current/ for re-attempt."
|
||||
);
|
||||
}
|
||||
}
|
||||
if !terminated.is_empty() {
|
||||
Self::notify_agent_state_changed(&self.watcher_tx);
|
||||
}
|
||||
return orphaned + terminated.len();
|
||||
}
|
||||
|
||||
orphaned
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,65 @@
|
||||
//! Orphan detection: marks running agents whose backing task has exited.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Mutex;
|
||||
use tokio::sync::broadcast;
|
||||
|
||||
use crate::agents::pool::StoryAgent;
|
||||
use crate::agents::{AgentEvent, AgentStatus};
|
||||
use crate::slog;
|
||||
|
||||
/// Scan the agent pool for Running entries whose backing tokio task has already
|
||||
/// finished and mark them as Failed.
|
||||
///
|
||||
/// This handles the case where the PTY read loop or the spawned task exits
|
||||
/// without updating the agent status — for example when the process is killed
|
||||
/// externally and the PTY master fd returns EOF before our inactivity timeout
|
||||
/// fires, but some other edge case prevents the normal cleanup path from running.
|
||||
pub(super) fn check_orphaned_agents(agents: &Mutex<HashMap<String, StoryAgent>>) -> usize {
|
||||
let mut lock = match agents.lock() {
|
||||
Ok(l) => l,
|
||||
Err(_) => return 0,
|
||||
};
|
||||
|
||||
// Collect orphaned entries: Running or Pending agents whose task handle is finished.
|
||||
// Pending agents can be orphaned if worktree creation panics before setting status.
|
||||
let orphaned: Vec<(String, String, broadcast::Sender<AgentEvent>, AgentStatus)> = lock
|
||||
.iter()
|
||||
.filter_map(|(key, agent)| {
|
||||
if matches!(agent.status, AgentStatus::Running | AgentStatus::Pending)
|
||||
&& let Some(handle) = &agent.task_handle
|
||||
&& handle.is_finished()
|
||||
{
|
||||
let story_id = key
|
||||
.rsplit_once(':')
|
||||
.map(|(s, _)| s.to_string())
|
||||
.unwrap_or_else(|| key.clone());
|
||||
return Some((
|
||||
key.clone(),
|
||||
story_id,
|
||||
agent.tx.clone(),
|
||||
agent.status.clone(),
|
||||
));
|
||||
}
|
||||
None
|
||||
})
|
||||
.collect();
|
||||
|
||||
let count = orphaned.len();
|
||||
for (key, story_id, tx, prev_status) in orphaned {
|
||||
if let Some(agent) = lock.get_mut(&key) {
|
||||
agent.status = AgentStatus::Failed;
|
||||
slog!(
|
||||
"[watchdog] Orphaned agent '{key}': task finished but status was {prev_status}. \
|
||||
Marking Failed."
|
||||
);
|
||||
let _ = tx.send(AgentEvent::Error {
|
||||
story_id,
|
||||
agent_name: agent.agent_name.clone(),
|
||||
message: "Agent process terminated unexpectedly (watchdog detected orphan)"
|
||||
.to_string(),
|
||||
});
|
||||
}
|
||||
}
|
||||
count
|
||||
}
|
||||
@@ -0,0 +1,470 @@
|
||||
//! Limit-enforcement, kill-respawn, per-session counting, and retry tests
|
||||
//! for the watchdog (bugs 624, 646, 650).
|
||||
|
||||
use super::super::super::super::{AgentPool, composite_key};
|
||||
use super::{write_fake_budget_session_log, write_fake_session_log, write_project_config};
|
||||
use crate::agents::{AgentEvent, AgentStatus, TerminationReason};
|
||||
|
||||
// ── Limit enforcement integration tests (bug 624) ────────────────────────
|
||||
|
||||
/// An agent whose current session has more turns than `max_turns` is marked
/// `Failed` with `TurnLimit` and an error event is emitted.
#[test]
fn watchdog_terminates_agent_exceeding_turn_limit() {
    const STORY: &str = "story_a";
    const AGENT: &str = "coder-1";
    const SESSION: &str = "sess-current";

    let workspace = tempfile::tempdir().unwrap();
    let project_root = workspace.path();

    write_project_config(
        project_root,
        r#"
[[agent]]
name = "coder-1"
runtime = "claude-code"
max_turns = 10
"#,
    );

    // The current session holds 12 turns, two more than the limit of 10.
    write_fake_session_log(project_root, STORY, AGENT, SESSION, 12);

    let pool = AgentPool::new_test(3001);
    let mut events = pool
        .inject_test_agent_with_session(STORY, AGENT, AgentStatus::Running, SESSION)
        .subscribe();

    let handled = pool.run_watchdog_pass(Some(project_root));
    assert!(handled >= 1, "watchdog should detect the over-limit agent");

    // Read the agent's state out of the pool, then release the lock.
    let (status, termination_reason) = {
        let guard = pool.agents.lock().unwrap();
        let agent = guard.get(&composite_key(STORY, AGENT)).unwrap();
        (agent.status.clone(), agent.termination_reason.clone())
    };
    assert_eq!(status, AgentStatus::Failed);
    assert_eq!(
        termination_reason,
        Some(TerminationReason::TurnLimit),
        "termination reason must be TurnLimit"
    );

    let event = events.try_recv().expect("watchdog must emit an Error event");
    assert!(
        matches!(event, AgentEvent::Error { .. }),
        "expected AgentEvent::Error, got: {event:?}"
    );
}
|
||||
|
||||
/// An agent whose current session cost exceeds `max_budget_usd` is marked
/// `Failed` with `BudgetLimit` and an error event is emitted.
#[test]
fn watchdog_terminates_agent_exceeding_budget_limit() {
    const STORY: &str = "story_b";
    const AGENT: &str = "coder-1";
    const SESSION: &str = "sess-budget";

    let workspace = tempfile::tempdir().unwrap();
    let project_root = workspace.path();

    write_project_config(
        project_root,
        r#"
[[agent]]
name = "coder-1"
runtime = "claude-code"
max_budget_usd = 5.00
"#,
    );

    // The current session's log reports $6.00, above the $5.00 limit.
    write_fake_budget_session_log(project_root, STORY, AGENT, SESSION, 6.00);

    let pool = AgentPool::new_test(3001);
    let mut events = pool
        .inject_test_agent_with_session(STORY, AGENT, AgentStatus::Running, SESSION)
        .subscribe();

    let handled = pool.run_watchdog_pass(Some(project_root));
    assert!(handled >= 1, "watchdog should detect the over-budget agent");

    // Read the agent's state out of the pool, then release the lock.
    let (status, termination_reason) = {
        let guard = pool.agents.lock().unwrap();
        let agent = guard.get(&composite_key(STORY, AGENT)).unwrap();
        (agent.status.clone(), agent.termination_reason.clone())
    };
    assert_eq!(status, AgentStatus::Failed);
    assert_eq!(
        termination_reason,
        Some(TerminationReason::BudgetLimit),
        "termination reason must be BudgetLimit"
    );

    let event = events.try_recv().expect("watchdog must emit an Error event");
    assert!(matches!(event, AgentEvent::Error { .. }));
}
|
||||
|
||||
/// An agent that stays below its configured limits is left `Running` and the
/// watchdog pass reports nothing handled.
#[test]
fn watchdog_does_not_terminate_agent_under_limits() {
    const STORY: &str = "story_c";
    const AGENT: &str = "coder-1";

    let workspace = tempfile::tempdir().unwrap();
    let project_root = workspace.path();

    write_project_config(
        project_root,
        r#"
[[agent]]
name = "coder-1"
runtime = "claude-code"
max_turns = 50
max_budget_usd = 10.00
"#,
    );

    // 25 turns under session `sess-ok`, and a $3.00 cost record under the
    // separate session `sess-ok-budget`.
    write_fake_session_log(project_root, STORY, AGENT, "sess-ok", 25);
    write_fake_budget_session_log(project_root, STORY, AGENT, "sess-ok-budget", 3.00);

    let pool = AgentPool::new_test(3001);
    // The agent runs as session `sess-ok`. The watchdog resolves a single log
    // per agent — the one for the injected session id when that file exists —
    // and uses it for both the turn and the budget check. The
    // `sess-ok-budget` file belongs to a different session id. The `sess-ok`
    // log has 25 turns < 50, so there is no violation.
    pool.inject_test_agent_with_session(STORY, AGENT, AgentStatus::Running, "sess-ok");

    let handled = pool.run_watchdog_pass(Some(project_root));
    assert_eq!(handled, 0, "agent under limits should not be terminated");

    // Read the agent's state out of the pool, then release the lock.
    let (status, termination_reason) = {
        let guard = pool.agents.lock().unwrap();
        let agent = guard.get(&composite_key(STORY, AGENT)).unwrap();
        (agent.status.clone(), agent.termination_reason.clone())
    };
    assert_eq!(
        status,
        AgentStatus::Running,
        "agent under limits should stay Running"
    );
    assert!(termination_reason.is_none());
}
|
||||
|
||||
/// Regression test for the original bug 624 incident.
///
/// coder-1, configured with max_turns=50 and max_budget_usd=5.00, ran 5.6×
/// over the turn limit (280 turns). Turns were the first limit hit in the
/// observed trace, so the watchdog must terminate with reason `TurnLimit`.
#[test]
fn regression_bug624_coder1_story623_trajectory() {
    const STORY: &str = "story_623";
    const AGENT: &str = "coder-1";
    const SESSION: &str = "sess-624";

    let workspace = tempfile::tempdir().unwrap();
    let project_root = workspace.path();

    write_project_config(
        project_root,
        r#"
[[agent]]
name = "coder-1"
runtime = "claude-code"
max_turns = 50
max_budget_usd = 5.00
"#,
    );

    // Reproduce the trajectory with 55 turns in the current session, just
    // past the limit. Only a turns log is written — no cost fixture is
    // created — so the turn limit is the one that trips and the reason must
    // be TurnLimit.
    write_fake_session_log(project_root, STORY, AGENT, SESSION, 55);

    let pool = AgentPool::new_test(3001);
    let mut events = pool
        .inject_test_agent_with_session(STORY, AGENT, AgentStatus::Running, SESSION)
        .subscribe();

    let handled = pool.run_watchdog_pass(Some(project_root));
    assert!(handled >= 1, "watchdog must catch the turn-limit violation");

    // Read the agent's state out of the pool, then release the lock.
    let (status, termination_reason) = {
        let guard = pool.agents.lock().unwrap();
        let agent = guard.get(&composite_key(STORY, AGENT)).unwrap();
        (agent.status.clone(), agent.termination_reason.clone())
    };
    assert_eq!(status, AgentStatus::Failed);
    assert_eq!(
        termination_reason,
        Some(TerminationReason::TurnLimit),
        "turns hit first in the observed trace, so reason must be TurnLimit"
    );

    // The emitted error must name the turn limit.
    let event = events.try_recv().expect("watchdog must emit an Error event");
    match &event {
        AgentEvent::Error { message, .. } => {
            assert!(
                message.contains("turn limit"),
                "error message should mention turn limit, got: {message}"
            );
        }
        _ => panic!("expected AgentEvent::Error, got: {event:?}"),
    }
}
|
||||
|
||||
// ── Kill-respawn loop fix (bug 646), updated for per-session + retry ───
|
||||
|
||||
/// A limit termination on a story that has no retries left must leave the
/// story marked `blocked: true` in CRDT state, so that
/// `auto_assign_available_work` does not spawn the agent again.
///
/// The fixture is one session that genuinely exceeds the turn limit, combined
/// with `max_retries = 1` so the very first violation blocks.
#[test]
fn watchdog_marks_story_blocked_after_limit_termination() {
    const STORY: &str = "42_story_runaway";
    const AGENT: &str = "coder-1";
    const SESSION: &str = "sess-runaway";

    crate::db::ensure_content_store();

    let workspace = tempfile::tempdir().unwrap();
    let project_root = workspace.path();

    write_project_config(
        project_root,
        r#"
max_retries = 1

[[agent]]
name = "coder-1"
runtime = "claude-code"
max_turns = 10
"#,
    );

    // Seed the CRDT-backed content store so the retry/block path has a story
    // to read and update.
    crate::db::write_content(STORY, "---\nname: Runaway Story\n---\n# Runaway Story\n");

    // One session with 12 turns, over the configured maximum of 10.
    write_fake_session_log(project_root, STORY, AGENT, SESSION, 12);

    let pool = AgentPool::new_test(3001);
    let _tx = pool.inject_test_agent_with_session(STORY, AGENT, AgentStatus::Running, SESSION);

    let handled = pool.run_watchdog_pass(Some(project_root));
    assert!(handled >= 1, "watchdog should detect the over-limit agent");

    // max_retries=1 means the first violation already blocks the story.
    let updated = crate::db::read_content(STORY)
        .expect("story content must still exist after watchdog termination");
    assert!(
        updated.contains("blocked: true"),
        "story must be marked `blocked: true` after limit termination with max_retries=1 — got:\n{updated}"
    );

    // Sanity check on the agent itself: Failed, with the expected reason.
    let (status, termination_reason) = {
        let guard = pool.agents.lock().unwrap();
        let agent = guard.get(&composite_key(STORY, AGENT)).unwrap();
        (agent.status.clone(), agent.termination_reason.clone())
    };
    assert_eq!(status, AgentStatus::Failed);
    assert_eq!(
        termination_reason,
        Some(TerminationReason::TurnLimit),
        "termination reason must be TurnLimit"
    );
}
|
||||
|
||||
// ── Per-session counting (bug 650) ──────────────────────────────────────
|
||||
|
||||
/// Turns from earlier sessions must not count against the current one.
///
/// Several prior session logs are seeded whose combined assistant-event count
/// is above `max_turns`, then a new running agent is injected with a fresh
/// session id whose own log is below `max_turns`. The agent must NOT be
/// terminated, because only its own session is counted.
#[test]
fn per_session_counting_does_not_terminate_under_limit() {
    const STORY: &str = "story_d";
    const AGENT: &str = "coder-1";

    let workspace = tempfile::tempdir().unwrap();
    let project_root = workspace.path();

    write_project_config(
        project_root,
        r#"
[[agent]]
name = "coder-1"
runtime = "claude-code"
max_turns = 10
"#,
    );

    // Three earlier sessions of 5 turns each: 15 in total, above max_turns=10.
    for old_session in ["old-sess-1", "old-sess-2", "old-sess-3"] {
        write_fake_session_log(project_root, STORY, AGENT, old_session, 5);
    }

    // The session that is actually running has just 3 turns.
    write_fake_session_log(project_root, STORY, AGENT, "new-sess", 3);

    let pool = AgentPool::new_test(3001);
    pool.inject_test_agent_with_session(STORY, AGENT, AgentStatus::Running, "new-sess");

    let handled = pool.run_watchdog_pass(Some(project_root));
    assert_eq!(
        handled, 0,
        "agent under per-session limit should NOT be terminated"
    );

    // Read the agent's state out of the pool, then release the lock.
    let (status, termination_reason) = {
        let guard = pool.agents.lock().unwrap();
        let agent = guard.get(&composite_key(STORY, AGENT)).unwrap();
        (agent.status.clone(), agent.termination_reason.clone())
    };
    assert_eq!(
        status,
        AgentStatus::Running,
        "agent under per-session limit should stay Running"
    );
    assert!(termination_reason.is_none());
}
|
||||
|
||||
/// Counterpart of `per_session_counting_does_not_terminate_under_limit`: here
/// the running agent's own session log is above `max_turns`. The agent must
/// be terminated and, because `max_retries = 1`, the story must end up
/// blocked.
#[test]
fn per_session_counting_terminates_over_limit() {
    const STORY: &str = "story_e_per_session";
    const AGENT: &str = "coder-1";

    crate::db::ensure_content_store();

    let workspace = tempfile::tempdir().unwrap();
    let project_root = workspace.path();

    write_project_config(
        project_root,
        r#"
max_retries = 1

[[agent]]
name = "coder-1"
runtime = "claude-code"
max_turns = 10
"#,
    );

    crate::db::write_content(STORY, "---\nname: Per-Session Test\n---\n");

    // An earlier session with 5 turns, which on its own is under the limit.
    write_fake_session_log(project_root, STORY, AGENT, "old-sess", 5);

    // The running session has 12 turns, which is over the limit.
    write_fake_session_log(project_root, STORY, AGENT, "new-sess", 12);

    let pool = AgentPool::new_test(3001);
    let mut events = pool
        .inject_test_agent_with_session(STORY, AGENT, AgentStatus::Running, "new-sess")
        .subscribe();

    let handled = pool.run_watchdog_pass(Some(project_root));
    assert!(
        handled >= 1,
        "agent over per-session limit must be terminated"
    );

    // Read the agent's state out of the pool, then release the lock.
    let (status, termination_reason) = {
        let guard = pool.agents.lock().unwrap();
        let agent = guard.get(&composite_key(STORY, AGENT)).unwrap();
        (agent.status.clone(), agent.termination_reason.clone())
    };
    assert_eq!(status, AgentStatus::Failed);
    assert_eq!(termination_reason, Some(TerminationReason::TurnLimit));

    let event = events.try_recv().expect("watchdog must emit an Error event");
    assert!(matches!(event, AgentEvent::Error { .. }));

    // max_retries=1, so this single overrun blocks the story.
    let updated = crate::db::read_content(STORY).unwrap();
    assert!(
        updated.contains("blocked: true"),
        "story must be blocked after per-session overrun with max_retries=1"
    );
}
|
||||
|
||||
// ── Retry semantic integration test (bug 650) ───────────────────────────
|
||||
|
||||
/// Retry semantic with `max_retries = 3`: three separate sessions each exceed
/// `max_turns`.
///
/// * after session 1 — `retry_count: 1`, story not blocked
/// * after session 2 — `retry_count: 2`, story not blocked
/// * after session 3 — retry_count reaches max_retries, story blocked
#[test]
fn watchdog_retry_semantic_blocks_after_max_retries() {
    crate::db::ensure_content_store();

    let workspace = tempfile::tempdir().unwrap();
    let root = workspace.path();

    write_project_config(
        root,
        r#"
max_retries = 3

[[agent]]
name = "coder-1"
runtime = "claude-code"
max_turns = 10
"#,
    );

    let story_id = "88_story_retry_watchdog";
    crate::db::write_content(story_id, "---\nname: Retry Test\n---\n");

    // Run one over-limit session (12 turns against a limit of 10) through a
    // fresh pool and return the story content as it stands afterwards.
    let overrun_session = |session_id: &str| {
        write_fake_session_log(root, story_id, "coder-1", session_id, 12);
        let pool = AgentPool::new_test(3001);
        pool.inject_test_agent_with_session(story_id, "coder-1", AgentStatus::Running, session_id);
        pool.run_watchdog_pass(Some(root));
        crate::db::read_content(story_id).unwrap()
    };

    // Sessions 1 and 2: the retry counter goes up, the story stays unblocked.
    for attempt in 1..=2 {
        let content = overrun_session(&format!("session-{attempt}"));
        assert!(
            content.contains(&format!("retry_count: {attempt}")),
            "after session {attempt}, retry_count should be {attempt} — got:\n{content}"
        );
        assert!(
            !content.contains("blocked: true"),
            "story should NOT be blocked after session {attempt}"
        );
    }

    // Session 3: retry_count=3 >= max_retries(3), so the story is blocked.
    let content = overrun_session("session-3");
    assert!(
        content.contains("blocked: true"),
        "story must be blocked after session 3 (retry_count=3 >= max_retries=3) — got:\n{content}"
    );
}
|
||||
@@ -0,0 +1,70 @@
|
||||
//! Shared test helpers for the watchdog module.
|
||||
|
||||
use std::path::Path;
|
||||
|
||||
mod limits_tests;
|
||||
mod orphan_tests;
|
||||
|
||||
/// Write a fake session log file with `n` assistant turn entries.
|
||||
///
|
||||
/// The file is named `{agent_name}-{session_id}.log` to match the
|
||||
/// real naming convention used by `AgentLogWriter`.
|
||||
pub(super) fn write_fake_session_log(
|
||||
project_root: &Path,
|
||||
story_id: &str,
|
||||
agent_name: &str,
|
||||
session_id: &str,
|
||||
n_turns: u64,
|
||||
) {
|
||||
let log_dir = project_root.join(".huskies").join("logs").join(story_id);
|
||||
std::fs::create_dir_all(&log_dir).unwrap();
|
||||
let log_path = log_dir.join(format!("{agent_name}-{session_id}.log"));
|
||||
let mut content = String::new();
|
||||
for _ in 0..n_turns {
|
||||
content.push_str(
|
||||
&serde_json::to_string(&serde_json::json!({
|
||||
"timestamp": "2026-04-25T00:00:00Z",
|
||||
"type": "agent_json",
|
||||
"story_id": story_id,
|
||||
"agent_name": agent_name,
|
||||
"data": { "type": "assistant", "message": {} }
|
||||
}))
|
||||
.unwrap(),
|
||||
);
|
||||
content.push('\n');
|
||||
}
|
||||
std::fs::write(log_path, content).unwrap();
|
||||
}
|
||||
|
||||
/// Write a fake session log containing a `result` event with the given cost.
|
||||
///
|
||||
/// Used to test budget enforcement via the watchdog's per-session log
|
||||
/// reading (not `token_usage.jsonl`).
|
||||
pub(super) fn write_fake_budget_session_log(
|
||||
project_root: &Path,
|
||||
story_id: &str,
|
||||
agent_name: &str,
|
||||
session_id: &str,
|
||||
cost_usd: f64,
|
||||
) {
|
||||
let log_dir = project_root.join(".huskies").join("logs").join(story_id);
|
||||
std::fs::create_dir_all(&log_dir).unwrap();
|
||||
let log_path = log_dir.join(format!("{agent_name}-{session_id}.log"));
|
||||
let content = serde_json::to_string(&serde_json::json!({
|
||||
"timestamp": "2026-04-25T00:00:00Z",
|
||||
"type": "agent_json",
|
||||
"story_id": story_id,
|
||||
"agent_name": agent_name,
|
||||
"data": { "type": "result", "total_cost_usd": cost_usd }
|
||||
}))
|
||||
.unwrap()
|
||||
+ "\n";
|
||||
std::fs::write(log_path, content).unwrap();
|
||||
}
|
||||
|
||||
/// Write a minimal project.toml with the given agent config.
|
||||
pub(super) fn write_project_config(project_root: &Path, config_toml: &str) {
|
||||
let huskies_dir = project_root.join(".huskies");
|
||||
std::fs::create_dir_all(&huskies_dir).unwrap();
|
||||
std::fs::write(huskies_dir.join("project.toml"), config_toml).unwrap();
|
||||
}
|
||||
@@ -0,0 +1,112 @@
|
||||
//! Orphan-detection tests for the watchdog (bug 161).
|
||||
|
||||
use super::super::super::super::{AgentPool, composite_key};
|
||||
use super::super::orphan::check_orphaned_agents;
|
||||
use crate::agents::{AgentEvent, AgentStatus};
|
||||
|
||||
// ── check_orphaned_agents return value tests (bug 161) ──────────────────
|
||||
|
||||
#[tokio::test]
|
||||
async fn check_orphaned_agents_returns_count_of_orphaned_agents() {
|
||||
let pool = AgentPool::new_test(3001);
|
||||
|
||||
// Spawn two tasks that finish immediately.
|
||||
let h1 = tokio::spawn(async {});
|
||||
let h2 = tokio::spawn(async {});
|
||||
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
|
||||
assert!(h1.is_finished());
|
||||
assert!(h2.is_finished());
|
||||
|
||||
pool.inject_test_agent_with_handle("story_a", "coder", AgentStatus::Running, h1);
|
||||
pool.inject_test_agent_with_handle("story_b", "coder", AgentStatus::Running, h2);
|
||||
|
||||
let found = check_orphaned_agents(&pool.agents);
|
||||
assert_eq!(found, 2, "should detect both orphaned agents");
|
||||
}
|
||||
|
||||
#[test]
fn check_orphaned_agents_returns_zero_when_no_orphans() {
    let pool = AgentPool::new_test(3001);

    // Both agents are already in a terminal state, so neither is an orphan.
    let terminal_agents = [
        ("story_a", "coder", AgentStatus::Completed),
        ("story_b", "qa", AgentStatus::Failed),
    ];
    for (story, agent, status) in terminal_agents {
        pool.inject_test_agent(story, agent, status);
    }

    let orphan_count = check_orphaned_agents(&pool.agents);
    assert_eq!(
        orphan_count, 0,
        "no orphans should be detected for terminal agents"
    );
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn watchdog_detects_orphaned_running_agent() {
|
||||
let pool = AgentPool::new_test(3001);
|
||||
|
||||
let handle = tokio::spawn(async {});
|
||||
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
|
||||
assert!(
|
||||
handle.is_finished(),
|
||||
"task should be finished before injection"
|
||||
);
|
||||
|
||||
let tx =
|
||||
pool.inject_test_agent_with_handle("orphan_story", "coder", AgentStatus::Running, handle);
|
||||
let mut rx = tx.subscribe();
|
||||
|
||||
pool.run_watchdog_once();
|
||||
|
||||
{
|
||||
let agents = pool.agents.lock().unwrap();
|
||||
let key = composite_key("orphan_story", "coder");
|
||||
let agent = agents.get(&key).unwrap();
|
||||
assert_eq!(
|
||||
agent.status,
|
||||
AgentStatus::Failed,
|
||||
"watchdog must mark an orphaned Running agent as Failed"
|
||||
);
|
||||
}
|
||||
|
||||
let event = rx.try_recv().expect("watchdog must emit an Error event");
|
||||
assert!(
|
||||
matches!(event, AgentEvent::Error { .. }),
|
||||
"expected AgentEvent::Error, got: {event:?}"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn watchdog_orphan_detection_returns_nonzero_enabling_auto_assign() {
|
||||
// This test verifies the contract that `check_orphaned_agents` returns
|
||||
// a non-zero count when orphans exist, which the watchdog uses to
|
||||
// decide whether to trigger auto-assign (bug 161).
|
||||
let pool = AgentPool::new_test(3001);
|
||||
|
||||
let handle = tokio::spawn(async {});
|
||||
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
|
||||
|
||||
pool.inject_test_agent_with_handle("orphan_story", "coder", AgentStatus::Running, handle);
|
||||
|
||||
// Before watchdog: agent is Running.
|
||||
{
|
||||
let agents = pool.agents.lock().unwrap();
|
||||
let key = composite_key("orphan_story", "coder");
|
||||
assert_eq!(agents.get(&key).unwrap().status, AgentStatus::Running);
|
||||
}
|
||||
|
||||
// Run watchdog pass — should return 1 (orphan found).
|
||||
let found = check_orphaned_agents(&pool.agents);
|
||||
assert_eq!(
|
||||
found, 1,
|
||||
"watchdog must return 1 for a single orphaned agent"
|
||||
);
|
||||
|
||||
// After watchdog: agent is Failed.
|
||||
{
|
||||
let agents = pool.agents.lock().unwrap();
|
||||
let key = composite_key("orphan_story", "coder");
|
||||
assert_eq!(
|
||||
agents.get(&key).unwrap().status,
|
||||
AgentStatus::Failed,
|
||||
"orphaned agent must be marked Failed"
|
||||
);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user