fe9804b32c
Adds `crate::process_kill` — reliable SIGKILL-with-verify primitives used
across the server in place of the various ad-hoc kill paths that ignored
their kill-effective return values. The module exposes three pieces:
- `sigkill_pids_and_verify(pids)`: SIGKILL each pid and block (up to 2s)
until every pid is verified gone. Returns survivors if not.
- `pids_matching(pattern)`: pgrep -f wrapper.
- `descendant_pids(root)`: recursive pgrep -P walker for process trees.
Wires the watchdog's limit-termination path through it, and reorders the
protocol to fix the duplicate-coder bug observed on story 1086 (2026-05-15):
Before: check_agent_limits set status=Failed before the kill ran. The
kill itself was `portable_pty::ChildKiller::kill()`, which sends SIGHUP
on Unix — claude-code ignores SIGHUP, so the process kept running while
the agent record was already marked terminated. The idempotency check
in `start_agent` whitelists Running/Pending, so the next auto-assign
pass spawned a fresh agent alongside the still-alive prior one. Two
claude PIDs sharing one session_id, racing on the same worktree.
After: status update is moved OUT of check_agent_limits and into the
caller AFTER the kill is verified. The kill itself is now SIGKILL-the-
process-tree-in-the-worktree, with explicit verification that every pid
is gone. The idempotency window is closed.
The existing watchdog test suite (14 tests) still passes; 7 new tests
cover the process_kill primitives directly.
`agents/pool/process.rs`'s `kill_all_children` and `kill_child_for_key`
still use the old portable_pty SIGHUP path — they have the same bug but
in lower-impact code paths (shutdown, operator stop). They will be
migrated under a separate story to keep this commit focused.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
213 lines
7.6 KiB
Rust
//! Limit enforcement: checks agent turn/budget limits and terminates offenders.
|
||
|
||
use std::collections::HashMap;
|
||
use std::path::Path;
|
||
use std::sync::Mutex;
|
||
use tokio::sync::broadcast;
|
||
|
||
use crate::agents::pool::StoryAgent;
|
||
use crate::agents::{AgentEvent, AgentStatus, TerminationReason};
|
||
use crate::config::ProjectConfig;
|
||
use crate::slog;
|
||
|
||
use super::budget::compute_budget_from_single_log;
|
||
|
||
/// Snapshot of a running agent for limit checking: (composite_key, story_id, agent_name, event_tx, log_session_id).
///
/// Captured under the pool lock so the limit scan can do file I/O without
/// holding the lock.
type RunningAgentSnapshot = (
    String,                        // composite key; story_id is the part before the last ':'
    String,                        // story_id
    String,                        // agent_name
    broadcast::Sender<AgentEvent>, // per-agent event channel
    Option<String>,                // log_session_id, when known
);
|
||
|
||
/// Resolve the current session's log file for an agent.
|
||
///
|
||
/// Uses `log_session_id` to construct the exact path when available;
|
||
/// otherwise falls back to the most recently modified log file for the
|
||
/// agent.
|
||
pub(crate) fn resolve_session_log(
|
||
project_root: &Path,
|
||
story_id: &str,
|
||
agent_name: &str,
|
||
log_session_id: &Option<String>,
|
||
) -> Option<std::path::PathBuf> {
|
||
if let Some(sid) = log_session_id {
|
||
let path = crate::agent_log::log_file_path(project_root, story_id, agent_name, sid);
|
||
if path.exists() {
|
||
return Some(path);
|
||
}
|
||
}
|
||
crate::agent_log::find_latest_log(project_root, story_id, agent_name)
|
||
}
|
||
|
||
/// Count **tool-using** assistant turns in a single log file (story 923).
|
||
///
|
||
/// A turn counts only if its assistant message contains at least one
|
||
/// `content` block with `type == "tool_use"`. Narration-only turns
|
||
/// (text-only assistant messages such as "I'll read X next" preambles)
|
||
/// don't count against the watchdog limit — they make no progress, only
|
||
/// burn budget, and Sonnet emits them at roughly 30–55% of all turns.
|
||
pub(crate) fn count_turns_in_log(path: &Path) -> u64 {
|
||
let entries = match crate::agent_log::read_log(path) {
|
||
Ok(e) => e,
|
||
Err(_) => return 0,
|
||
};
|
||
entries
|
||
.iter()
|
||
.filter(|entry| {
|
||
if entry.event.get("type").and_then(|v| v.as_str()) != Some("agent_json") {
|
||
return false;
|
||
}
|
||
let data = match entry.event.get("data") {
|
||
Some(d) => d,
|
||
None => return false,
|
||
};
|
||
if data.get("type").and_then(|v| v.as_str()) != Some("assistant") {
|
||
return false;
|
||
}
|
||
// Require at least one tool_use content block.
|
||
data.pointer("/message/content")
|
||
.and_then(|c| c.as_array())
|
||
.map(|arr| {
|
||
arr.iter()
|
||
.any(|item| item.get("type").and_then(|v| v.as_str()) == Some("tool_use"))
|
||
})
|
||
.unwrap_or(false)
|
||
})
|
||
.count() as u64
|
||
}
|
||
|
||
/// Scan running agents for turn/budget limit violations and terminate offenders.
|
||
///
|
||
/// Turns and budget are counted from the **current session's** log file
|
||
/// only — prior sessions are excluded so that restart counts from earlier
|
||
/// runs do not accumulate against the limits.
|
||
pub(super) fn check_agent_limits(
|
||
agents: &Mutex<HashMap<String, StoryAgent>>,
|
||
project_root: &Path,
|
||
) -> Vec<(String, TerminationReason)> {
|
||
let config = match ProjectConfig::load(project_root) {
|
||
Ok(c) => c,
|
||
Err(_) => return Vec::new(),
|
||
};
|
||
|
||
// Snapshot running agents: (key, story_id, agent_name, tx, log_session_id).
|
||
let running: Vec<RunningAgentSnapshot> = {
|
||
let lock = match agents.lock() {
|
||
Ok(l) => l,
|
||
Err(_) => return Vec::new(),
|
||
};
|
||
lock.iter()
|
||
.filter(|(_, agent)| agent.status == AgentStatus::Running)
|
||
.map(|(key, agent)| {
|
||
let story_id = key
|
||
.rsplit_once(':')
|
||
.map(|(s, _)| s.to_string())
|
||
.unwrap_or_else(|| key.clone());
|
||
(
|
||
key.clone(),
|
||
story_id,
|
||
agent.agent_name.clone(),
|
||
agent.tx.clone(),
|
||
agent.log_session_id.clone(),
|
||
)
|
||
})
|
||
.collect()
|
||
};
|
||
|
||
let mut terminated = Vec::new();
|
||
|
||
for (key, story_id, agent_name, tx, log_session_id) in &running {
|
||
let agent_config = config.agent.iter().find(|a| a.name == *agent_name);
|
||
// The watchdog gates on max_tool_turns (counts only tool-using
|
||
// assistant turns) when set; otherwise falls back to max_turns for
|
||
// backwards compatibility with configs that haven't migrated yet.
|
||
let max_turns = agent_config.and_then(|a| a.max_tool_turns.or(a.max_turns));
|
||
let max_budget_usd = agent_config.and_then(|a| a.max_budget_usd);
|
||
|
||
// Skip agents with no limits configured.
|
||
if max_turns.is_none() && max_budget_usd.is_none() {
|
||
continue;
|
||
}
|
||
|
||
// Resolve the current session's log file.
|
||
let session_log = resolve_session_log(project_root, story_id, agent_name, log_session_id);
|
||
|
||
// Count turns from the current session's log file only.
|
||
let turns_used: u64 = if max_turns.is_some() {
|
||
session_log
|
||
.as_ref()
|
||
.map(|p| count_turns_in_log(p))
|
||
.unwrap_or(0)
|
||
} else {
|
||
0
|
||
};
|
||
|
||
// Compute budget from the current session's log file only.
|
||
// Completed-session records from token_usage.jsonl are NOT included
|
||
// in the watchdog's enforcement — they belong to prior sessions.
|
||
let budget_used_usd: f64 = if max_budget_usd.is_some() {
|
||
session_log
|
||
.as_ref()
|
||
.map(|p| compute_budget_from_single_log(p))
|
||
.unwrap_or(0.0)
|
||
} else {
|
||
0.0
|
||
};
|
||
|
||
// Determine if a limit is exceeded.
|
||
let reason = if let Some(max) = max_turns {
|
||
if turns_used >= max as u64 {
|
||
Some(TerminationReason::TurnLimit)
|
||
} else {
|
||
None
|
||
}
|
||
} else {
|
||
None
|
||
}
|
||
.or(if let Some(max) = max_budget_usd {
|
||
if budget_used_usd >= max {
|
||
Some(TerminationReason::BudgetLimit)
|
||
} else {
|
||
None
|
||
}
|
||
} else {
|
||
None
|
||
});
|
||
|
||
if let Some(reason) = reason {
|
||
let reason_str = match &reason {
|
||
TerminationReason::TurnLimit => {
|
||
format!("turn limit exceeded ({turns_used}/{})", max_turns.unwrap())
|
||
}
|
||
TerminationReason::BudgetLimit => format!(
|
||
"budget limit exceeded (${budget_used_usd:.2}/${:.2})",
|
||
max_budget_usd.unwrap()
|
||
),
|
||
};
|
||
|
||
// NOTE: agent status is intentionally NOT updated here. Setting
|
||
// `status = Failed` before the kill (the previous behaviour)
|
||
// opened a window where the `start_agent` idempotency check
|
||
// (which whitelists Running/Pending) would let a fresh spawn
|
||
// through while the prior PTY child was still alive — directly
|
||
// causing the concurrent-agents bug we hit on story 1086
|
||
// (2026-05-15). The caller (`run_watchdog_pass`) is responsible
|
||
// for: (1) verifying the kill, (2) THEN updating the agent record.
|
||
|
||
slog!("[watchdog] Terminating agent '{key}': {reason_str}.");
|
||
|
||
let _ = tx.send(AgentEvent::Error {
|
||
story_id: story_id.clone(),
|
||
agent_name: agent_name.clone(),
|
||
message: format!("Agent terminated by watchdog: {reason_str}"),
|
||
});
|
||
|
||
terminated.push((key.clone(), reason));
|
||
}
|
||
}
|
||
|
||
terminated
|
||
}
|