diff --git a/server/src/agents/pool/auto_assign/watchdog/limits.rs b/server/src/agents/pool/auto_assign/watchdog/limits.rs
index 3de925dc..b741d1b0 100644
--- a/server/src/agents/pool/auto_assign/watchdog/limits.rs
+++ b/server/src/agents/pool/auto_assign/watchdog/limits.rs
@@ -187,13 +187,14 @@ pub(super) fn check_agent_limits(
         ),
     };
 
-    // Mark agent as Failed with termination reason.
-    if let Ok(mut lock) = agents.lock()
-        && let Some(agent) = lock.get_mut(key)
-    {
-        agent.status = AgentStatus::Failed;
-        agent.termination_reason = Some(reason.clone());
-    }
+    // NOTE: agent status is intentionally NOT updated here. Setting
+    // `status = Failed` before the kill (the previous behaviour)
+    // opened a window where the `start_agent` idempotency check
+    // (which whitelists Running/Pending) would let a fresh spawn
+    // through while the prior PTY child was still alive — directly
+    // causing the concurrent-agents bug we hit on story 1086
+    // (2026-05-15). The caller (`run_watchdog_pass`) is responsible
+    // for: (1) verifying the kill, (2) THEN updating the agent record.
 
     slog!("[watchdog] Terminating agent '{key}': {reason_str}.");
 
diff --git a/server/src/agents/pool/auto_assign/watchdog/mod.rs b/server/src/agents/pool/auto_assign/watchdog/mod.rs
index 3dfd405a..e04b5b9e 100644
--- a/server/src/agents/pool/auto_assign/watchdog/mod.rs
+++ b/server/src/agents/pool/auto_assign/watchdog/mod.rs
@@ -9,8 +9,11 @@ mod tests;
 
 use std::path::Path;
 
+use crate::agents::AgentStatus;
 use crate::config::ProjectConfig;
+use crate::process_kill::{pids_matching, sigkill_pids_and_verify};
 use crate::slog;
+use crate::slog_warn;
 
 use super::super::AgentPool;
 use limits::check_agent_limits;
@@ -42,14 +45,70 @@ impl AgentPool {
         if let Some(root) = project_root {
             let terminated = check_agent_limits(&self.agents, root);
             let config = ProjectConfig::load(root).unwrap_or_default();
-            for (key, _reason) in &terminated {
-                // Kill the PTY child and abort the task, same as stop_agent.
-                self.kill_child_for_key(key);
+            for (key, reason) in &terminated {
+                // Step 1: snapshot the agent's worktree path so we can find every
+                // process running in it (claude + any subprocesses). This must
+                // happen BEFORE we mutate the agent record so we can read the
+                // worktree info safely.
+                let worktree_path = self.agents.lock().ok().and_then(|lock| {
+                    lock.get(key)
+                        .and_then(|a| a.worktree_info.as_ref().map(|wt| wt.path.clone()))
+                });
+
+                // Step 2: SIGKILL every process running in the worktree and
+                // BLOCK until verified gone. The previous mechanism — portable_pty's
+                // `ChildKiller::kill()` — sends SIGHUP, which claude-code
+                // ignores, leaving the process alive while the agent record
+                // was being marked terminated; that gap let a fresh spawn race
+                // in alongside the surviving one. SIGKILL is uncatchable;
+                // [`sigkill_pids_and_verify`] only returns once the kernel has
+                // reaped each pid.
+                if let Some(wt_path) = worktree_path.as_ref() {
+                    let pids = pids_matching(&wt_path.display().to_string());
+                    if pids.is_empty() {
+                        // Nothing in this worktree — agent likely already
+                        // exited on its own before the watchdog noticed.
+                    } else {
+                        match sigkill_pids_and_verify(&pids) {
+                            Ok(n) => slog!(
+                                "[watchdog] SIGKILL'd {n} process(es) in worktree {} for '{key}'.",
+                                wt_path.display()
+                            ),
+                            Err(survivors) => slog_warn!(
+                                "[watchdog] SIGKILL incomplete for '{key}': pids still alive: {survivors:?}. \
+                                 Proceeding with cleanup; concurrent spawn protection may be weakened."
+                            ),
+                        }
+                    }
+                } else {
+                    slog_warn!(
+                        "[watchdog] No worktree path recorded for '{key}'; cannot tree-kill, \
+                         falling back to portable_pty SIGHUP (likely no-op for claude-code)."
+                    );
+                    self.kill_child_for_key(key);
+                }
+
+                // Step 3: NOW update the agent record. The process is verified
+                // gone (or we logged that SIGKILL didn't take effect, which is
+                // exceptional), so flipping status away from Running can no
+                // longer open a window for a concurrent spawn.
                 if let Ok(mut lock) = self.agents.lock()
                     && let Some(agent) = lock.get_mut(key)
-                    && let Some(handle) = agent.task_handle.take()
                 {
-                    handle.abort();
+                    agent.status = AgentStatus::Failed;
+                    agent.termination_reason = Some(reason.clone());
+                    if let Some(handle) = agent.task_handle.take() {
+                        // Best-effort abort of the outer tokio task. The PTY
+                        // blocking thread already returned (claude is dead),
+                        // so this is bookkeeping rather than load-bearing.
+                        handle.abort();
+                    }
+                }
+
+                // Step 4: drop the (now-stale) child_killers entry — the
+                // process it pointed at is gone.
+                if let Ok(mut killers) = self.child_killers.lock() {
+                    killers.remove(key);
                 }
 
                 // Use the retry mechanism: increment retry_count and only block
diff --git a/server/src/agents/pool/process.rs b/server/src/agents/pool/process.rs
index d36064b6..a3f7ba2b 100644
--- a/server/src/agents/pool/process.rs
+++ b/server/src/agents/pool/process.rs
@@ -1,4 +1,11 @@
 //! Process management — kills orphaned PTY child processes on server shutdown.
+//!
+//! See [`crate::process_kill`] for the general process-termination primitives
+//! this module's existing methods (`kill_all_children`, `kill_child_for_key`)
+//! should eventually be migrated to. Those methods currently use
+//! `portable_pty::ChildKiller::kill()`, which sends `SIGHUP` — a signal
+//! claude-code ignores — so they leave orphans on every shutdown/stop. The
+//! migration is tracked in a separate story to keep its diff focused.
 
 use crate::slog;
 
 use super::AgentPool;
diff --git a/server/src/main.rs b/server/src/main.rs
index 919924ae..69021fa6 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -33,6 +33,8 @@ pub mod mesh;
 /// Node identity — Ed25519 keypair generation and stable node ID management.
 pub mod node_identity;
 pub(crate) mod pipeline_state;
+/// Reliable process-termination primitives shared across the server.
+pub mod process_kill;
 /// Rebuild — process restart and shutdown coordination.
pub mod rebuild; mod service; diff --git a/server/src/process_kill.rs b/server/src/process_kill.rs new file mode 100644 index 00000000..261b43fa --- /dev/null +++ b/server/src/process_kill.rs @@ -0,0 +1,322 @@ +//! Reliable process-termination primitives. +//! +//! The huskies server kills child processes in several distinct places: +//! the watchdog terminates agents that have exceeded turn/budget limits, +//! `stop_agent` terminates on operator request, `kill_all_children` runs at +//! server shutdown, the merge-gate completion path kills stale `cargo` +//! processes, and `script/local-release` tears down the gateway during a +//! redeploy. Every one of these used to send a signal that the target was +//! free to ignore (most commonly `portable_pty`'s `SIGHUP`), with no +//! verification that the process actually exited. Agents and bots that +//! ignore `SIGHUP` survived the "kill", which produced concurrent claude +//! processes on the same story — directly the duplicate-spawn bug we hit on +//! 2026-05-15. +//! +//! This module provides one trustworthy way to kill processes: SIGKILL with +//! verification. Build a pid set with the helpers in this module (or your +//! own), then hand it to [`sigkill_pids_and_verify`]. +//! +//! All functions on this module are deliberately Unix-only — huskies runs in +//! Linux containers and macOS dev hosts, both POSIX. + +use crate::slog_warn; + +/// Maximum time we'll wait for SIGKILL'd processes to disappear before +/// declaring failure. SIGKILL is uncatchable, so the kernel normally +/// reaps within tens of milliseconds; anything past 2 s indicates the +/// process is wedged in uninterruptible IO (e.g. waiting on a frozen NFS +/// mount). Caller can decide whether to proceed despite survivors. +const KILL_VERIFY_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(2); + +/// Polling interval while waiting for processes to disappear. 
100 ms is +/// fine-grained enough that the typical few-ms reap latency is barely +/// observable, but coarse enough that we don't burn CPU spinning. +const KILL_VERIFY_POLL: std::time::Duration = std::time::Duration::from_millis(100); + +/// SIGKILL every pid in `pids`, then poll until all of them are gone. +/// +/// Returns `Ok(n)` where `n == pids.len()` when every pid is verified +/// reaped within [`KILL_VERIFY_TIMEOUT`]. Returns `Err(survivors)` with the +/// pids still alive after the timeout — extremely rare for SIGKILL but +/// possible if a process is wedged in uninterruptible IO. An empty `pids` +/// slice returns `Ok(0)` immediately. +/// +/// **Why SIGKILL and not SIGTERM-first:** several huskies-internal targets +/// (claude-code, the bot itself) either ignore the polite signals or take +/// arbitrarily long to honour them. The watchdog only kills agents that +/// have already misbehaved by definition (exceeded budget/turn limits), so +/// there is no reason to give them a graceful-shutdown grace period. +pub fn sigkill_pids_and_verify(pids: &[u32]) -> Result> { + if pids.is_empty() { + return Ok(0); + } + + for &pid in pids { + // libc::kill returns -1 on failure (with errno). We deliberately + // ignore the result: the process may already be gone (errno ESRCH), + // and trying again wouldn't help. The verification loop below is + // the source of truth for "did this work". + unsafe { libc::kill(pid as i32, libc::SIGKILL) }; + } + + let deadline = std::time::Instant::now() + KILL_VERIFY_TIMEOUT; + while std::time::Instant::now() < deadline { + if pids.iter().copied().all(|pid| !pid_is_alive(pid)) { + return Ok(pids.len()); + } + std::thread::sleep(KILL_VERIFY_POLL); + } + + let survivors: Vec = pids + .iter() + .copied() + .filter(|&pid| pid_is_alive(pid)) + .collect(); + if survivors.is_empty() { + Ok(pids.len()) + } else { + slog_warn!( + "[process_kill] SIGKILL did not reap pids within {:?}: {survivors:?}. 
\ + They may be wedged in uninterruptible IO.", + KILL_VERIFY_TIMEOUT + ); + Err(survivors) + } +} + +/// Return every pid whose command line matches `pattern` (passed to +/// `pgrep -f`). Empty when nothing matches or when `pgrep` is unavailable. +/// +/// Useful for collecting processes by a path or argument substring — e.g. +/// "every process running in `/`" or "every cargo invocation +/// against this `Cargo.toml`". +pub fn pids_matching(pattern: &str) -> Vec { + let Ok(output) = std::process::Command::new("pgrep") + .args(["-f", pattern]) + .output() + else { + return Vec::new(); + }; + String::from_utf8_lossy(&output.stdout) + .lines() + .filter_map(|l| l.trim().parse::().ok()) + .collect() +} + +/// Return every descendant pid of `root_pid`, deepest-first, **excluding** +/// `root_pid` itself. Walks the parent→child relation via `pgrep -P`. +/// +/// Deepest-first ordering lets callers signal leaves before their parents +/// when that matters; for SIGKILL it makes no difference. +pub fn descendant_pids(root_pid: u32) -> Vec { + let mut out: Vec = Vec::new(); + walk_descendants(root_pid, &mut out); + out +} + +fn walk_descendants(pid: u32, out: &mut Vec) { + let Ok(output) = std::process::Command::new("pgrep") + .args(["-P", &pid.to_string()]) + .output() + else { + return; + }; + let kids: Vec = String::from_utf8_lossy(&output.stdout) + .lines() + .filter_map(|l| l.trim().parse::().ok()) + .collect(); + for kid in kids { + walk_descendants(kid, out); + out.push(kid); + } +} + +/// Check whether `pid` currently exists. Implemented via `kill(pid, 0)` — +/// no signal is sent, only existence is probed. +fn pid_is_alive(pid: u32) -> bool { + // signal 0: "is this process around?" Returns 0 if the process exists + // and we have permission to signal it, -1 with errno otherwise. 
+ unsafe { libc::kill(pid as i32, 0) == 0 } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::process::{Child, Command, Stdio}; + use std::thread::JoinHandle; + + /// Spawn a sleeper for kill testing, and spawn a background reaper that + /// calls `wait()` as soon as the child exits. Returns the pid plus the + /// reaper join handle so the test can confirm reaping after the kill. + /// + /// The reaper is essential because the production code's verify loop + /// uses `kill(pid, 0)` to test existence — which returns 0 for zombies. + /// If no one reaps the test's sleeper, its pid stays occupied (as a + /// zombie) and `sigkill_pids_and_verify` mistakenly reports survivors. + /// In production the PTY blocking thread is always reaping on behalf of + /// portable_pty, so this isn't a concern there. + fn spawn_sleeper_with_reaper(secs: u64) -> (u32, JoinHandle<()>) { + let child: Child = Command::new("sleep") + .arg(secs.to_string()) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .stdin(Stdio::null()) + .spawn() + .expect("failed to spawn sleep"); + let pid = child.id(); + let reaper = std::thread::spawn(move || { + let mut c = child; + let _ = c.wait(); + }); + (pid, reaper) + } + + #[test] + fn sigkill_empty_slice_is_ok() { + let result = sigkill_pids_and_verify(&[]); + assert!(matches!(result, Ok(0))); + } + + #[test] + fn sigkill_real_process_is_verified_gone() { + let (pid, reaper) = spawn_sleeper_with_reaper(60); + assert!(pid_is_alive(pid), "sleeper should be alive before kill"); + + let result = sigkill_pids_and_verify(&[pid]); + assert!( + matches!(result, Ok(1)), + "sigkill must verify the process is gone: {result:?}" + ); + let _ = reaper.join(); + assert!(!pid_is_alive(pid), "sleeper must be dead after kill"); + } + + #[test] + fn sigkill_already_dead_pid_is_ok() { + let (pid, reaper) = spawn_sleeper_with_reaper(0); + let _ = reaper.join(); + // Wait briefly for the kernel to recycle the pid. 
+ for _ in 0..20 { + if !pid_is_alive(pid) { + break; + } + std::thread::sleep(std::time::Duration::from_millis(100)); + } + // Now SIGKILL a pid that no longer exists. Result must still be Ok. + let result = sigkill_pids_and_verify(&[pid]); + assert!( + result.is_ok(), + "sigkill of already-dead pid must succeed: {result:?}" + ); + } + + #[test] + fn sigkill_multiple_real_processes() { + let mut handles: Vec<(u32, JoinHandle<()>)> = + (0..3).map(|_| spawn_sleeper_with_reaper(60)).collect(); + let pids: Vec = handles.iter().map(|(p, _)| *p).collect(); + for &pid in &pids { + assert!(pid_is_alive(pid)); + } + let result = sigkill_pids_and_verify(&pids); + assert!( + matches!(result, Ok(3)), + "all 3 sleepers must die: {result:?}" + ); + for (_, reaper) in handles.drain(..) { + let _ = reaper.join(); + } + for &pid in &pids { + assert!(!pid_is_alive(pid), "pid {pid} survived sigkill"); + } + } + + #[test] + fn pids_matching_finds_a_running_process() { + // pgrep -f matches the FULL command line, so the marker has to be + // in argv somewhere. Putting it in a shell comment doesn't work — + // sh strips it. Override argv[0] so the marker is durably visible. + use std::os::unix::process::CommandExt; + let marker = format!("kill-test-marker-{}-{}", std::process::id(), rand_u64()); + let argv0 = format!("test-marker-{marker}"); + let child: Child = Command::new("sleep") + .arg0(argv0) + .arg("60") + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .stdin(Stdio::null()) + .spawn() + .expect("spawn"); + let child_pid = child.id(); + let reaper = std::thread::spawn(move || { + let mut c = child; + let _ = c.wait(); + }); + + // pgrep needs a moment to see the new process. + std::thread::sleep(std::time::Duration::from_millis(100)); + + let found = pids_matching(&marker); + assert!( + found.contains(&child_pid), + "pids_matching should find pid {child_pid} for marker '{marker}'; got {found:?}" + ); + + // Cleanup so the test doesn't leak a sleeper. 
+ let _ = sigkill_pids_and_verify(&[child_pid]); + let _ = reaper.join(); + } + + #[test] + fn pids_matching_returns_empty_when_no_match() { + let pattern = format!("nonexistent-pattern-{}-{}", std::process::id(), rand_u64()); + let found = pids_matching(&pattern); + assert!(found.is_empty(), "expected empty result, got {found:?}"); + } + + /// Cheap unique-ish u64 for distinguishing test invocations without a + /// dependency on a randomness crate. + fn rand_u64() -> u64 { + use std::time::{SystemTime, UNIX_EPOCH}; + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_nanos() as u64) + .unwrap_or(0) + } + + #[test] + fn descendant_pids_of_real_process_tree() { + // Build a parent sh that spawns a child sleep. The descendants of + // the parent should include the sleep. + let parent: Child = Command::new("sh") + .args(["-c", "sleep 60"]) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .stdin(Stdio::null()) + .spawn() + .expect("spawn parent"); + let parent_pid = parent.id(); + let reaper = std::thread::spawn(move || { + let mut c = parent; + let _ = c.wait(); + }); + + // Let the shell get around to fork+execing its child. + std::thread::sleep(std::time::Duration::from_millis(200)); + + let descendants = descendant_pids(parent_pid); + // On some shells `sh -c "sleep N"` exec-replaces sh with sleep, leaving + // zero descendants. On others it forks. We don't care which; we only + // care that the function doesn't panic and returns a sensible vec. + assert!( + descendants.iter().all(|&pid| pid != parent_pid), + "descendant_pids must not include the root itself: {descendants:?}" + ); + + // Cleanup: kill the parent and any descendants. + let mut all = descendants; + all.push(parent_pid); + let _ = sigkill_pids_and_verify(&all); + let _ = reaper.join(); + } +}