From 4aa76ce673e10996a7abe796822f5e57e9a202f4 Mon Sep 17 00:00:00 2001 From: dave Date: Fri, 15 May 2026 11:10:55 +0000 Subject: [PATCH] huskies: merge 1090 refactor Migrate `AgentPool::kill_all_children` and `kill_child_for_key` to `process_kill` so server shutdown and `stop_agent` actually kill claude --- .../agents/pool/auto_assign/watchdog/mod.rs | 6 - server/src/agents/pool/mod.rs | 6 - .../agents/pool/pipeline/advance/helpers.rs | 1 - .../agents/pool/pipeline/completion/legacy.rs | 1 - .../src/agents/pool/pipeline/merge/control.rs | 1 - server/src/agents/pool/process.rs | 247 ++++++++++++------ server/src/agents/pool/start/mod.rs | 1 - server/src/agents/pool/start/spawn.rs | 10 +- server/src/agents/pool/stop.rs | 6 +- server/src/agents/pty/mod.rs | 24 +- server/src/agents/pty/runner.rs | 23 +- server/src/agents/pty/types.rs | 22 -- server/src/agents/runtime/claude_code.rs | 18 +- server/src/agents/runtime/mod.rs | 8 +- 14 files changed, 175 insertions(+), 199 deletions(-) diff --git a/server/src/agents/pool/auto_assign/watchdog/mod.rs b/server/src/agents/pool/auto_assign/watchdog/mod.rs index e04b5b9e..e7d4709d 100644 --- a/server/src/agents/pool/auto_assign/watchdog/mod.rs +++ b/server/src/agents/pool/auto_assign/watchdog/mod.rs @@ -105,12 +105,6 @@ impl AgentPool { } } - // Step 4: drop the (now-stale) child_killers entry — the - // process it pointed at is gone. - if let Ok(mut killers) = self.child_killers.lock() { - killers.remove(key); - } - // Use the retry mechanism: increment retry_count and only block // when the limit is exceeded, matching the pipeline's behaviour. let story_id = key.rsplit_once(':').map(|(s, _)| s).unwrap_or(key); diff --git a/server/src/agents/pool/mod.rs b/server/src/agents/pool/mod.rs index c539273f..8c0cd43d 100644 --- a/server/src/agents/pool/mod.rs +++ b/server/src/agents/pool/mod.rs @@ -18,7 +18,6 @@ mod test_helpers; use crate::io::watcher::WatcherEvent; use crate::service::status::StatusBroadcaster; -use portable_pty::ChildKiller; use std::collections::HashMap; use std::sync::{Arc, Mutex}; use tokio::sync::broadcast; @@ -31,10 +30,6 @@ use types::{StoryAgent, composite_key}; pub struct AgentPool { agents: Arc>>, port: u16, - /// Registry of active PTY child process killers, keyed by "{story_id}:{agent_name}". - /// Used to terminate child processes on server shutdown or agent stop, preventing - /// orphaned Claude Code processes from running after the server exits. - child_killers: Arc>>>, /// Broadcast channel for notifying WebSocket clients of agent state changes. /// When an agent transitions state (Pending, Running, Completed, Failed, Stopped), /// an `AgentStateChanged` event is emitted so the frontend can refresh the @@ -56,7 +51,6 @@ impl AgentPool { let pool = Self { agents: Arc::new(Mutex::new(HashMap::new())), port, - child_killers: Arc::new(Mutex::new(HashMap::new())), watcher_tx: watcher_tx.clone(), status_broadcaster: Arc::new(StatusBroadcaster::new()), }; diff --git a/server/src/agents/pool/pipeline/advance/helpers.rs b/server/src/agents/pool/pipeline/advance/helpers.rs index 4ee340a8..2ac440a2 100644 --- a/server/src/agents/pool/pipeline/advance/helpers.rs +++ b/server/src/agents/pool/pipeline/advance/helpers.rs @@ -33,7 +33,6 @@ pub(crate) fn spawn_pipeline_advance( let pool = AgentPool { agents, port, - child_killers: Arc::new(Mutex::new(HashMap::new())), watcher_tx, status_broadcaster: Arc::new(crate::service::status::StatusBroadcaster::new()), }; diff --git a/server/src/agents/pool/pipeline/completion/legacy.rs b/server/src/agents/pool/pipeline/completion/legacy.rs index fd699ca0..6f3351fb 100644 --- a/server/src/agents/pool/pipeline/completion/legacy.rs +++ b/server/src/agents/pool/pipeline/completion/legacy.rs @@ -111,7 +111,6 @@ impl AgentPool { let pool_clone = Self { agents: Arc::clone(&self.agents), port: self.port, - child_killers: Arc::clone(&self.child_killers), watcher_tx: self.watcher_tx.clone(), status_broadcaster: Arc::clone(&self.status_broadcaster), }; diff --git a/server/src/agents/pool/pipeline/merge/control.rs b/server/src/agents/pool/pipeline/merge/control.rs index a99f0a64..f2155a63 100644 --- a/server/src/agents/pool/pipeline/merge/control.rs +++ b/server/src/agents/pool/pipeline/merge/control.rs @@ -18,7 +18,6 @@ impl AgentPool { let pool = Arc::new(Self { agents: Arc::clone(&self.agents), port: self.port, - child_killers: Arc::clone(&self.child_killers), watcher_tx: self.watcher_tx.clone(), status_broadcaster: Arc::clone(&self.status_broadcaster), }); diff --git a/server/src/agents/pool/process.rs b/server/src/agents/pool/process.rs index a3f7ba2b..fc767925 100644 --- a/server/src/agents/pool/process.rs +++ b/server/src/agents/pool/process.rs @@ -1,12 +1,20 @@ //! Process management — kills orphaned PTY child processes on server shutdown. //! -//! See [`crate::process_kill`] for the general process-termination primitives -//! this module's existing methods (`kill_all_children`, `kill_child_for_key`) -//! should eventually be migrated to. Those methods currently use -//! `portable_pty::ChildKiller::kill()`, which sends `SIGHUP` — a signal -//! claude-code ignores — so they leave orphans on every shutdown/stop. The -//! migration is tracked in a separate story to keep its diff focused. +//! As of story 1090 (2026-05-15), all process termination in this module uses +//! [`crate::process_kill::sigkill_pids_and_verify`] — SIGHUP-based killing via +//! `portable_pty::ChildKiller` has been removed entirely from the server. +//! +//! ## History +//! +//! Prior to commit `fe9804b3`, the watchdog and all kill paths sent SIGHUP via +//! `portable_pty::ChildKiller::kill()`. Claude Code ignores SIGHUP, so agents +//! survived "kills" and ran concurrently with their replacements — the root cause +//! of the 2026-05-15 duplicate-spawn incident. `fe9804b3` migrated the watchdog; +//! story 1090 completes the migration by rewriting `kill_all_children` and +//! `kill_child_for_key` (this file) to use `pids_matching` + `sigkill_pids_and_verify`. +use crate::process_kill::{pids_matching, sigkill_pids_and_verify}; use crate::slog; +use crate::slog_warn; use super::AgentPool; @@ -14,53 +22,97 @@ impl AgentPool { /// Kill all active PTY child processes. /// /// Called on server shutdown to prevent orphaned Claude Code processes from - /// continuing to run after the server exits. Each registered killer is called - /// once, then the registry is cleared. + /// continuing to run after the server exits. Collects each agent's worktree + /// path, then SIGKILLs every process running inside that path and verifies + /// termination before returning. pub fn kill_all_children(&self) { - if let Ok(mut killers) = self.child_killers.lock() { - for (key, killer) in killers.iter_mut() { - slog!("[agents] Killing child process for {key} on shutdown"); - let _ = killer.kill(); + let worktree_paths: Vec<(String, std::path::PathBuf)> = { + let Ok(agents) = self.agents.lock() else { + return; + }; + agents + .iter() + .filter_map(|(key, agent)| { + agent + .worktree_info + .as_ref() + .map(|wt| (key.clone(), wt.path.clone())) + }) + .collect() + }; + + for (key, path) in worktree_paths { + let pattern = path.display().to_string(); + let pids = pids_matching(&pattern); + if pids.is_empty() { + slog!( + "[agents] No processes found in worktree {} for '{key}' on shutdown", + path.display() + ); + continue; + } + match sigkill_pids_and_verify(&pids) { + Ok(n) => slog!( + "[agents] SIGKILL'd {n} process(es) in worktree {} for '{key}' on shutdown", + path.display() + ), + Err(survivors) => slog_warn!( + "[agents] SIGKILL incomplete for '{key}' on shutdown: \ + pids still alive: {survivors:?}" + ), } - killers.clear(); } } /// Kill and deregister the child process for a specific agent key. /// - /// Used by `stop_agent` to ensure the PTY child is terminated even though - /// aborting a `spawn_blocking` task handle does not interrupt the blocking thread. + /// Fallback used by `stop_agent` when no worktree path is recorded for the + /// agent. Also the primary kill path for any caller that has only a composite + /// key and not a worktree path directly. pub(super) fn kill_child_for_key(&self, key: &str) { - if let Ok(mut killers) = self.child_killers.lock() - && let Some(mut killer) = killers.remove(key) - { - slog!("[agents] Killing child process for {key} on stop"); - let _ = killer.kill(); + let worktree_path = { + let Ok(agents) = self.agents.lock() else { + return; + }; + agents + .get(key) + .and_then(|a| a.worktree_info.as_ref().map(|wt| wt.path.clone())) + }; + + let Some(path) = worktree_path else { + slog_warn!( + "[agents] No worktree path recorded for '{key}'; \ + cannot SIGKILL via process_kill (no-op)" + ); + return; + }; + + let pattern = path.display().to_string(); + let pids = pids_matching(&pattern); + if pids.is_empty() { + slog!( + "[agents] No processes found in worktree {} for '{key}' on stop", + path.display() + ); + return; + } + match sigkill_pids_and_verify(&pids) { + Ok(n) => slog!( + "[agents] SIGKILL'd {n} process(es) in worktree {} for '{key}' on stop", + path.display() + ), + Err(survivors) => slog_warn!( + "[agents] SIGKILL incomplete for '{key}' on stop: \ + pids still alive: {survivors:?}" + ), } - } - - /// Test helper: inject a child killer into the registry. - #[cfg(test)] - pub fn inject_child_killer( - &self, - key: &str, - killer: Box, - ) { - let mut killers = self.child_killers.lock().unwrap(); - killers.insert(key.to_string(), killer); - } - - /// Test helper: return the number of registered child killers. - #[cfg(test)] - pub fn child_killer_count(&self) -> usize { - self.child_killers.lock().unwrap().len() } } #[cfg(test)] mod tests { use super::super::AgentPool; - use portable_pty::{CommandBuilder, PtySize, native_pty_system}; + use crate::agents::AgentStatus; use std::process::Command; /// Returns true if a process with the given PID is currently running. @@ -75,79 +127,100 @@ mod tests { #[test] fn kill_all_children_is_safe_on_empty_pool() { let pool = AgentPool::new_test(3001); - pool.kill_all_children(); - assert_eq!(pool.child_killer_count(), 0); + pool.kill_all_children(); // must not panic } + /// AC 4 — `kill_child_for_key` SIGKILLs the single agent's process and + /// verifies it is gone within 2 s. The sleeper has the worktree path in + /// its argv[0] so `pgrep -f` can locate it, mirroring how claude-code is + /// launched with `--directory ` in production. #[test] - fn kill_all_children_kills_real_process() { - let pool = AgentPool::new_test(3001); + fn kill_child_for_key_kills_real_process() { + use std::os::unix::process::CommandExt; - let pty_system = native_pty_system(); - let pair = pty_system - .openpty(PtySize { - rows: 24, - cols: 80, - pixel_width: 0, - pixel_height: 0, - }) - .expect("failed to open pty"); + let pool = AgentPool::new_test(3002); + let tmp = tempfile::tempdir().unwrap(); + let worktree = tmp.path(); - let mut cmd = CommandBuilder::new("sleep"); - cmd.arg("100"); - let mut child = pair - .slave - .spawn_command(cmd) - .expect("failed to spawn sleep"); - let pid = child.process_id().expect("no pid"); + // argv[0] = worktree path → pgrep -f finds this process. + let mut child = Command::new("sleep") + .arg0(worktree.to_string_lossy().as_ref()) + .arg("100") + .spawn() + .expect("spawn sleeper"); + let pid = child.id(); - pool.inject_child_killer("story:agent", child.clone_killer()); + // Give pgrep a moment to see the new process. + std::thread::sleep(std::time::Duration::from_millis(100)); + + pool.inject_test_agent_with_path( + "story-1090-kill", + "coder", + AgentStatus::Running, + worktree.to_path_buf(), + ); assert!( process_is_running(pid), - "process {pid} should be running before kill_all_children" + "sleeper pid {pid} should be running before kill_child_for_key" ); - pool.kill_all_children(); - let _ = child.wait(); + pool.kill_child_for_key("story-1090-kill:coder"); + let _ = child.wait(); // reap zombie so ps -p returns false assert!( !process_is_running(pid), - "process {pid} should have been killed by kill_all_children" + "sleeper pid {pid} should be dead after kill_child_for_key" ); } + /// AC 5 — `kill_all_children` SIGKILLs all agents' processes. Two agents + /// with distinct worktree paths are injected; both must be gone after the call. #[test] - fn kill_all_children_clears_registry() { - let pool = AgentPool::new_test(3001); + fn kill_all_children_kills_multiple_real_processes() { + use std::os::unix::process::CommandExt; - let pty_system = native_pty_system(); - let pair = pty_system - .openpty(PtySize { - rows: 24, - cols: 80, - pixel_width: 0, - pixel_height: 0, + let pool = AgentPool::new_test(3003); + + let mut sleepers: Vec<(u32, std::process::Child, tempfile::TempDir)> = (0..2_u32) + .map(|i| { + let tmp = tempfile::tempdir().unwrap(); + let worktree = tmp.path(); + // argv[0] = worktree path for pgrep discoverability. + let child = Command::new("sleep") + .arg0(worktree.to_string_lossy().as_ref()) + .arg("100") + .spawn() + .expect("spawn sleeper"); + let pid = child.id(); + pool.inject_test_agent_with_path( + &format!("story-1090-all-{i}"), + "coder", + AgentStatus::Running, + worktree.to_path_buf(), + ); + (pid, child, tmp) }) - .expect("failed to open pty"); + .collect(); - let mut cmd = CommandBuilder::new("sleep"); - cmd.arg("1"); - let mut child = pair - .slave - .spawn_command(cmd) - .expect("failed to spawn sleep"); + // Give pgrep a moment to see the new processes. + std::thread::sleep(std::time::Duration::from_millis(100)); - pool.inject_child_killer("story:agent", child.clone_killer()); - assert_eq!(pool.child_killer_count(), 1); + for (pid, _, _) in &sleepers { + assert!( + process_is_running(*pid), + "pid {pid} should be running before kill_all_children" + ); + } pool.kill_all_children(); - let _ = child.wait(); - assert_eq!( - pool.child_killer_count(), - 0, - "child_killers should be cleared after kill_all_children" - ); + for (pid, child, _tmp) in &mut sleepers { + let _ = child.wait(); // reap zombie + assert!( + !process_is_running(*pid), + "pid {pid} should be dead after kill_all_children" + ); + } } } diff --git a/server/src/agents/pool/start/mod.rs b/server/src/agents/pool/start/mod.rs index 02daf7c6..6a8b5a0b 100644 --- a/server/src/agents/pool/start/mod.rs +++ b/server/src/agents/pool/start/mod.rs @@ -392,7 +392,6 @@ impl AgentPool { event_log.clone(), self.port, log_writer.clone(), - self.child_killers.clone(), self.watcher_tx.clone(), inactivity_timeout_secs, prior_events, diff --git a/server/src/agents/pool/start/spawn.rs b/server/src/agents/pool/start/spawn.rs index 6017df32..130e9f9e 100644 --- a/server/src/agents/pool/start/spawn.rs +++ b/server/src/agents/pool/start/spawn.rs @@ -8,7 +8,6 @@ use std::collections::HashMap; use std::path::PathBuf; use std::sync::{Arc, Mutex}; -use portable_pty::ChildKiller; use tokio::sync::broadcast; use crate::agent_log::AgentLogWriter; @@ -135,7 +134,6 @@ pub(super) async fn run_agent_spawn( event_log: Arc>>, port: u16, log_writer: Option>>, - child_killers: Arc>>>, watcher_tx: broadcast::Sender, inactivity_timeout_secs: u64, // Formatted `` block drained from the previous session's @@ -159,7 +157,6 @@ pub(super) async fn run_agent_spawn( let log_clone = event_log; let port_for_task = port; let log_writer_clone = log_writer; - let child_killers_clone = child_killers; let watcher_tx_clone = watcher_tx; let _ = inactivity_timeout_secs; // currently unused inside the closure body @@ -371,8 +368,7 @@ pub(super) async fn run_agent_spawn( let run_result = match runtime_name { "claude-code" => { - let runtime = - ClaudeCodeRuntime::new(child_killers_clone.clone(), watcher_tx_clone.clone()); + let runtime = ClaudeCodeRuntime::new(watcher_tx_clone.clone()); let ctx = RuntimeContext { story_id: sid.clone(), agent_name: aname.clone(), @@ -566,7 +562,6 @@ pub(super) async fn run_agent_spawn( let pool = AgentPool { agents: agents_for_respawn, port: port_r, - child_killers: Arc::new(Mutex::new(HashMap::new())), watcher_tx: watcher_for_respawn, status_broadcaster: Arc::new( crate::service::status::StatusBroadcaster::new(), @@ -654,7 +649,6 @@ pub(super) async fn run_agent_spawn( let pool = AgentPool { agents: agents_for_cd, port: port_for_cd, - child_killers: Arc::new(Mutex::new(HashMap::new())), watcher_tx: watcher_for_cd, status_broadcaster: Arc::new( crate::service::status::StatusBroadcaster::new(), @@ -774,7 +768,6 @@ pub(super) async fn run_agent_spawn( let pool = AgentPool { agents: agents_for_cd, port: port_for_cd, - child_killers: Arc::new(Mutex::new(HashMap::new())), watcher_tx: watcher_for_cd, status_broadcaster: Arc::new( crate::service::status::StatusBroadcaster::new(), @@ -862,7 +855,6 @@ pub(super) async fn run_agent_spawn( let pool = AgentPool { agents: agents_for_respawn, port: port_r, - child_killers: Arc::new(Mutex::new(HashMap::new())), watcher_tx: watcher_for_respawn, status_broadcaster: Arc::new( crate::service::status::StatusBroadcaster::new(), diff --git a/server/src/agents/pool/stop.rs b/server/src/agents/pool/stop.rs index 2ad1dfe1..f6aff2e6 100644 --- a/server/src/agents/pool/stop.rs +++ b/server/src/agents/pool/stop.rs @@ -71,8 +71,7 @@ impl AgentPool { self.kill_child_for_key(&key); } - // Step 3: now safe to mutate. Status flip, handle abort, drop the - // child_killers entry. + // Step 3: now safe to mutate. Status flip and handle abort. let (task_handle, tx) = { let mut agents = self.agents.lock().map_err(|e| e.to_string())?; let agent = agents @@ -88,9 +87,6 @@ impl AgentPool { handle.abort(); let _ = handle.await; } - if let Ok(mut killers) = self.child_killers.lock() { - killers.remove(&key); - } // Preserve worktree for inspection — don't destroy agent's work on stop. if let Some(ref wt) = worktree_info { diff --git a/server/src/agents/pty/mod.rs b/server/src/agents/pty/mod.rs index 627c89da..c44a956f 100644 --- a/server/src/agents/pty/mod.rs +++ b/server/src/agents/pty/mod.rs @@ -13,7 +13,6 @@ mod tests { use super::*; use crate::agents::AgentEvent; use crate::io::watcher::WatcherEvent; - use std::collections::HashMap; use std::sync::{Arc, Mutex}; use tokio::sync::broadcast; @@ -41,7 +40,6 @@ mod tests { let (tx, _rx) = broadcast::channel::(64); let (watcher_tx, mut watcher_rx) = broadcast::channel::(16); let event_log = Arc::new(Mutex::new(Vec::new())); - let child_killers = Arc::new(Mutex::new(HashMap::new())); // sh -p "--"