huskies: merge 1090 refactor Migrate AgentPool::kill_all_children and kill_child_for_key to process_kill so server shutdown and stop_agent actually kill claude
This commit is contained in:
@@ -105,12 +105,6 @@ impl AgentPool {
|
||||
}
|
||||
}
|
||||
|
||||
// Step 4: drop the (now-stale) child_killers entry — the
|
||||
// process it pointed at is gone.
|
||||
if let Ok(mut killers) = self.child_killers.lock() {
|
||||
killers.remove(key);
|
||||
}
|
||||
|
||||
// Use the retry mechanism: increment retry_count and only block
|
||||
// when the limit is exceeded, matching the pipeline's behaviour.
|
||||
let story_id = key.rsplit_once(':').map(|(s, _)| s).unwrap_or(key);
|
||||
|
||||
@@ -18,7 +18,6 @@ mod test_helpers;
|
||||
|
||||
use crate::io::watcher::WatcherEvent;
|
||||
use crate::service::status::StatusBroadcaster;
|
||||
use portable_pty::ChildKiller;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use tokio::sync::broadcast;
|
||||
@@ -31,10 +30,6 @@ use types::{StoryAgent, composite_key};
|
||||
pub struct AgentPool {
|
||||
agents: Arc<Mutex<HashMap<String, StoryAgent>>>,
|
||||
port: u16,
|
||||
/// Registry of active PTY child process killers, keyed by "{story_id}:{agent_name}".
|
||||
/// Used to terminate child processes on server shutdown or agent stop, preventing
|
||||
/// orphaned Claude Code processes from running after the server exits.
|
||||
child_killers: Arc<Mutex<HashMap<String, Box<dyn ChildKiller + Send + Sync>>>>,
|
||||
/// Broadcast channel for notifying WebSocket clients of agent state changes.
|
||||
/// When an agent transitions state (Pending, Running, Completed, Failed, Stopped),
|
||||
/// an `AgentStateChanged` event is emitted so the frontend can refresh the
|
||||
@@ -56,7 +51,6 @@ impl AgentPool {
|
||||
let pool = Self {
|
||||
agents: Arc::new(Mutex::new(HashMap::new())),
|
||||
port,
|
||||
child_killers: Arc::new(Mutex::new(HashMap::new())),
|
||||
watcher_tx: watcher_tx.clone(),
|
||||
status_broadcaster: Arc::new(StatusBroadcaster::new()),
|
||||
};
|
||||
|
||||
@@ -33,7 +33,6 @@ pub(crate) fn spawn_pipeline_advance(
|
||||
let pool = AgentPool {
|
||||
agents,
|
||||
port,
|
||||
child_killers: Arc::new(Mutex::new(HashMap::new())),
|
||||
watcher_tx,
|
||||
status_broadcaster: Arc::new(crate::service::status::StatusBroadcaster::new()),
|
||||
};
|
||||
|
||||
@@ -111,7 +111,6 @@ impl AgentPool {
|
||||
let pool_clone = Self {
|
||||
agents: Arc::clone(&self.agents),
|
||||
port: self.port,
|
||||
child_killers: Arc::clone(&self.child_killers),
|
||||
watcher_tx: self.watcher_tx.clone(),
|
||||
status_broadcaster: Arc::clone(&self.status_broadcaster),
|
||||
};
|
||||
|
||||
@@ -18,7 +18,6 @@ impl AgentPool {
|
||||
let pool = Arc::new(Self {
|
||||
agents: Arc::clone(&self.agents),
|
||||
port: self.port,
|
||||
child_killers: Arc::clone(&self.child_killers),
|
||||
watcher_tx: self.watcher_tx.clone(),
|
||||
status_broadcaster: Arc::clone(&self.status_broadcaster),
|
||||
});
|
||||
|
||||
@@ -1,12 +1,20 @@
|
||||
//! Process management — terminates agent child processes on server shutdown and on agent stop so that none are left orphaned.
|
||||
//!
|
||||
//! See [`crate::process_kill`] for the general process-termination primitives
|
||||
//! this module's existing methods (`kill_all_children`, `kill_child_for_key`)
|
||||
//! should eventually be migrated to. Those methods currently use
|
||||
//! `portable_pty::ChildKiller::kill()`, which sends `SIGHUP` — a signal
|
||||
//! claude-code ignores — so they leave orphans on every shutdown/stop. The
|
||||
//! migration is tracked in a separate story to keep its diff focused.
|
||||
//! As of story 1090 (2026-05-15), all process termination in this module uses
|
||||
//! [`crate::process_kill::sigkill_pids_and_verify`] — SIGHUP-based killing via
|
||||
//! `portable_pty::ChildKiller` has been removed entirely from the server.
|
||||
//!
|
||||
//! ## History
|
||||
//!
|
||||
//! Prior to commit `fe9804b3`, the watchdog and all kill paths sent SIGHUP via
|
||||
//! `portable_pty::ChildKiller::kill()`. Claude Code ignores SIGHUP, so agents
|
||||
//! survived "kills" and ran concurrently with their replacements — the root cause
|
||||
//! of the 2026-05-15 duplicate-spawn incident. `fe9804b3` migrated the watchdog;
|
||||
//! story 1090 completes the migration by rewriting `kill_all_children` and
|
||||
//! `kill_child_for_key` (this file) to use `pids_matching` + `sigkill_pids_and_verify`.
|
||||
use crate::process_kill::{pids_matching, sigkill_pids_and_verify};
|
||||
use crate::slog;
|
||||
use crate::slog_warn;
|
||||
|
||||
use super::AgentPool;
|
||||
|
||||
@@ -14,53 +22,97 @@ impl AgentPool {
|
||||
/// Kill all active PTY child processes.
|
||||
///
|
||||
/// Called on server shutdown to prevent orphaned Claude Code processes from
|
||||
/// continuing to run after the server exits. Each registered killer is called
|
||||
/// once, then the registry is cleared.
|
||||
/// continuing to run after the server exits. Collects each agent's worktree
|
||||
/// path, then SIGKILLs every process running inside that path and verifies
|
||||
/// termination before returning.
|
||||
pub fn kill_all_children(&self) {
|
||||
if let Ok(mut killers) = self.child_killers.lock() {
|
||||
for (key, killer) in killers.iter_mut() {
|
||||
slog!("[agents] Killing child process for {key} on shutdown");
|
||||
let _ = killer.kill();
|
||||
let worktree_paths: Vec<(String, std::path::PathBuf)> = {
|
||||
let Ok(agents) = self.agents.lock() else {
|
||||
return;
|
||||
};
|
||||
agents
|
||||
.iter()
|
||||
.filter_map(|(key, agent)| {
|
||||
agent
|
||||
.worktree_info
|
||||
.as_ref()
|
||||
.map(|wt| (key.clone(), wt.path.clone()))
|
||||
})
|
||||
.collect()
|
||||
};
|
||||
|
||||
for (key, path) in worktree_paths {
|
||||
let pattern = path.display().to_string();
|
||||
let pids = pids_matching(&pattern);
|
||||
if pids.is_empty() {
|
||||
slog!(
|
||||
"[agents] No processes found in worktree {} for '{key}' on shutdown",
|
||||
path.display()
|
||||
);
|
||||
continue;
|
||||
}
|
||||
match sigkill_pids_and_verify(&pids) {
|
||||
Ok(n) => slog!(
|
||||
"[agents] SIGKILL'd {n} process(es) in worktree {} for '{key}' on shutdown",
|
||||
path.display()
|
||||
),
|
||||
Err(survivors) => slog_warn!(
|
||||
"[agents] SIGKILL incomplete for '{key}' on shutdown: \
|
||||
pids still alive: {survivors:?}"
|
||||
),
|
||||
}
|
||||
killers.clear();
|
||||
}
|
||||
}
|
||||
|
||||
/// Kill and deregister the child process for a specific agent key.
|
||||
///
|
||||
/// Used by `stop_agent` to ensure the PTY child is terminated even though
|
||||
/// aborting a `spawn_blocking` task handle does not interrupt the blocking thread.
|
||||
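/// Fallback used by `stop_agent` when no worktree path is recorded for the
|
||||
/// agent. Also the primary kill path for any caller that has only a composite
|
||||
/// key and not a worktree path directly.
|
||||
///
|
||||
/// The worktree path is resolved from the agent registered under `key`, and
|
||||
/// every process matching that path is SIGKILLed and verified dead. If the
|
||||
/// agent is unknown or has no recorded worktree path, this only logs a
|
||||
/// warning and returns without killing anything.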
|
||||
pub(super) fn kill_child_for_key(&self, key: &str) {
|
||||
if let Ok(mut killers) = self.child_killers.lock()
|
||||
&& let Some(mut killer) = killers.remove(key)
|
||||
{
|
||||
slog!("[agents] Killing child process for {key} on stop");
|
||||
let _ = killer.kill();
|
||||
let worktree_path = {
|
||||
let Ok(agents) = self.agents.lock() else {
|
||||
return;
|
||||
};
|
||||
agents
|
||||
.get(key)
|
||||
.and_then(|a| a.worktree_info.as_ref().map(|wt| wt.path.clone()))
|
||||
};
|
||||
|
||||
let Some(path) = worktree_path else {
|
||||
slog_warn!(
|
||||
"[agents] No worktree path recorded for '{key}'; \
|
||||
cannot SIGKILL via process_kill (no-op)"
|
||||
);
|
||||
return;
|
||||
};
|
||||
|
||||
let pattern = path.display().to_string();
|
||||
let pids = pids_matching(&pattern);
|
||||
if pids.is_empty() {
|
||||
slog!(
|
||||
"[agents] No processes found in worktree {} for '{key}' on stop",
|
||||
path.display()
|
||||
);
|
||||
return;
|
||||
}
|
||||
match sigkill_pids_and_verify(&pids) {
|
||||
Ok(n) => slog!(
|
||||
"[agents] SIGKILL'd {n} process(es) in worktree {} for '{key}' on stop",
|
||||
path.display()
|
||||
),
|
||||
Err(survivors) => slog_warn!(
|
||||
"[agents] SIGKILL incomplete for '{key}' on stop: \
|
||||
pids still alive: {survivors:?}"
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
/// Test helper: inject a child killer into the registry.
|
||||
#[cfg(test)]
|
||||
pub fn inject_child_killer(
|
||||
&self,
|
||||
key: &str,
|
||||
killer: Box<dyn portable_pty::ChildKiller + Send + Sync>,
|
||||
) {
|
||||
let mut killers = self.child_killers.lock().unwrap();
|
||||
killers.insert(key.to_string(), killer);
|
||||
}
|
||||
|
||||
/// Test helper: return the number of registered child killers.
|
||||
#[cfg(test)]
|
||||
pub fn child_killer_count(&self) -> usize {
|
||||
self.child_killers.lock().unwrap().len()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::super::AgentPool;
|
||||
use portable_pty::{CommandBuilder, PtySize, native_pty_system};
|
||||
use crate::agents::AgentStatus;
|
||||
use std::process::Command;
|
||||
|
||||
/// Returns true if a process with the given PID is currently running.
|
||||
@@ -75,79 +127,100 @@ mod tests {
|
||||
#[test]
|
||||
fn kill_all_children_is_safe_on_empty_pool() {
|
||||
let pool = AgentPool::new_test(3001);
|
||||
pool.kill_all_children();
|
||||
assert_eq!(pool.child_killer_count(), 0);
|
||||
pool.kill_all_children(); // must not panic
|
||||
}
|
||||
|
||||
/// AC 4 — `kill_child_for_key` SIGKILLs the single agent's process and
|
||||
/// verifies it is gone within 2 s. The sleeper has the worktree path in
|
||||
/// its argv[0] so `pgrep -f` can locate it, mirroring how claude-code is
|
||||
/// launched with `--directory <worktree>` in production.
|
||||
#[test]
|
||||
fn kill_all_children_kills_real_process() {
|
||||
let pool = AgentPool::new_test(3001);
|
||||
fn kill_child_for_key_kills_real_process() {
|
||||
use std::os::unix::process::CommandExt;
|
||||
|
||||
let pty_system = native_pty_system();
|
||||
let pair = pty_system
|
||||
.openpty(PtySize {
|
||||
rows: 24,
|
||||
cols: 80,
|
||||
pixel_width: 0,
|
||||
pixel_height: 0,
|
||||
})
|
||||
.expect("failed to open pty");
|
||||
let pool = AgentPool::new_test(3002);
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
let worktree = tmp.path();
|
||||
|
||||
let mut cmd = CommandBuilder::new("sleep");
|
||||
cmd.arg("100");
|
||||
let mut child = pair
|
||||
.slave
|
||||
.spawn_command(cmd)
|
||||
.expect("failed to spawn sleep");
|
||||
let pid = child.process_id().expect("no pid");
|
||||
// argv[0] = worktree path → pgrep -f <path> finds this process.
|
||||
let mut child = Command::new("sleep")
|
||||
.arg0(worktree.to_string_lossy().as_ref())
|
||||
.arg("100")
|
||||
.spawn()
|
||||
.expect("spawn sleeper");
|
||||
let pid = child.id();
|
||||
|
||||
pool.inject_child_killer("story:agent", child.clone_killer());
|
||||
// Give pgrep a moment to see the new process.
|
||||
std::thread::sleep(std::time::Duration::from_millis(100));
|
||||
|
||||
pool.inject_test_agent_with_path(
|
||||
"story-1090-kill",
|
||||
"coder",
|
||||
AgentStatus::Running,
|
||||
worktree.to_path_buf(),
|
||||
);
|
||||
|
||||
assert!(
|
||||
process_is_running(pid),
|
||||
"process {pid} should be running before kill_all_children"
|
||||
"sleeper pid {pid} should be running before kill_child_for_key"
|
||||
);
|
||||
|
||||
pool.kill_all_children();
|
||||
let _ = child.wait();
|
||||
pool.kill_child_for_key("story-1090-kill:coder");
|
||||
let _ = child.wait(); // reap zombie so ps -p returns false
|
||||
|
||||
assert!(
|
||||
!process_is_running(pid),
|
||||
"process {pid} should have been killed by kill_all_children"
|
||||
"sleeper pid {pid} should be dead after kill_child_for_key"
|
||||
);
|
||||
}
|
||||
|
||||
/// AC 5 — `kill_all_children` SIGKILLs all agents' processes. Two agents
|
||||
/// with distinct worktree paths are injected; both must be gone after the call.
|
||||
#[test]
|
||||
fn kill_all_children_clears_registry() {
|
||||
let pool = AgentPool::new_test(3001);
|
||||
fn kill_all_children_kills_multiple_real_processes() {
|
||||
use std::os::unix::process::CommandExt;
|
||||
|
||||
let pty_system = native_pty_system();
|
||||
let pair = pty_system
|
||||
.openpty(PtySize {
|
||||
rows: 24,
|
||||
cols: 80,
|
||||
pixel_width: 0,
|
||||
pixel_height: 0,
|
||||
let pool = AgentPool::new_test(3003);
|
||||
|
||||
let mut sleepers: Vec<(u32, std::process::Child, tempfile::TempDir)> = (0..2_u32)
|
||||
.map(|i| {
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
let worktree = tmp.path();
|
||||
// argv[0] = worktree path for pgrep discoverability.
|
||||
let child = Command::new("sleep")
|
||||
.arg0(worktree.to_string_lossy().as_ref())
|
||||
.arg("100")
|
||||
.spawn()
|
||||
.expect("spawn sleeper");
|
||||
let pid = child.id();
|
||||
pool.inject_test_agent_with_path(
|
||||
&format!("story-1090-all-{i}"),
|
||||
"coder",
|
||||
AgentStatus::Running,
|
||||
worktree.to_path_buf(),
|
||||
);
|
||||
(pid, child, tmp)
|
||||
})
|
||||
.expect("failed to open pty");
|
||||
.collect();
|
||||
|
||||
let mut cmd = CommandBuilder::new("sleep");
|
||||
cmd.arg("1");
|
||||
let mut child = pair
|
||||
.slave
|
||||
.spawn_command(cmd)
|
||||
.expect("failed to spawn sleep");
|
||||
// Give pgrep a moment to see the new processes.
|
||||
std::thread::sleep(std::time::Duration::from_millis(100));
|
||||
|
||||
pool.inject_child_killer("story:agent", child.clone_killer());
|
||||
assert_eq!(pool.child_killer_count(), 1);
|
||||
for (pid, _, _) in &sleepers {
|
||||
assert!(
|
||||
process_is_running(*pid),
|
||||
"pid {pid} should be running before kill_all_children"
|
||||
);
|
||||
}
|
||||
|
||||
pool.kill_all_children();
|
||||
let _ = child.wait();
|
||||
|
||||
assert_eq!(
|
||||
pool.child_killer_count(),
|
||||
0,
|
||||
"child_killers should be cleared after kill_all_children"
|
||||
);
|
||||
for (pid, child, _tmp) in &mut sleepers {
|
||||
let _ = child.wait(); // reap zombie
|
||||
assert!(
|
||||
!process_is_running(*pid),
|
||||
"pid {pid} should be dead after kill_all_children"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -392,7 +392,6 @@ impl AgentPool {
|
||||
event_log.clone(),
|
||||
self.port,
|
||||
log_writer.clone(),
|
||||
self.child_killers.clone(),
|
||||
self.watcher_tx.clone(),
|
||||
inactivity_timeout_secs,
|
||||
prior_events,
|
||||
|
||||
@@ -8,7 +8,6 @@ use std::collections::HashMap;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use portable_pty::ChildKiller;
|
||||
use tokio::sync::broadcast;
|
||||
|
||||
use crate::agent_log::AgentLogWriter;
|
||||
@@ -135,7 +134,6 @@ pub(super) async fn run_agent_spawn(
|
||||
event_log: Arc<Mutex<Vec<AgentEvent>>>,
|
||||
port: u16,
|
||||
log_writer: Option<Arc<Mutex<AgentLogWriter>>>,
|
||||
child_killers: Arc<Mutex<HashMap<String, Box<dyn ChildKiller + Send + Sync>>>>,
|
||||
watcher_tx: broadcast::Sender<WatcherEvent>,
|
||||
inactivity_timeout_secs: u64,
|
||||
// Formatted `<recent-events>` block drained from the previous session's
|
||||
@@ -159,7 +157,6 @@ pub(super) async fn run_agent_spawn(
|
||||
let log_clone = event_log;
|
||||
let port_for_task = port;
|
||||
let log_writer_clone = log_writer;
|
||||
let child_killers_clone = child_killers;
|
||||
let watcher_tx_clone = watcher_tx;
|
||||
let _ = inactivity_timeout_secs; // currently unused inside the closure body
|
||||
|
||||
@@ -371,8 +368,7 @@ pub(super) async fn run_agent_spawn(
|
||||
|
||||
let run_result = match runtime_name {
|
||||
"claude-code" => {
|
||||
let runtime =
|
||||
ClaudeCodeRuntime::new(child_killers_clone.clone(), watcher_tx_clone.clone());
|
||||
let runtime = ClaudeCodeRuntime::new(watcher_tx_clone.clone());
|
||||
let ctx = RuntimeContext {
|
||||
story_id: sid.clone(),
|
||||
agent_name: aname.clone(),
|
||||
@@ -566,7 +562,6 @@ pub(super) async fn run_agent_spawn(
|
||||
let pool = AgentPool {
|
||||
agents: agents_for_respawn,
|
||||
port: port_r,
|
||||
child_killers: Arc::new(Mutex::new(HashMap::new())),
|
||||
watcher_tx: watcher_for_respawn,
|
||||
status_broadcaster: Arc::new(
|
||||
crate::service::status::StatusBroadcaster::new(),
|
||||
@@ -654,7 +649,6 @@ pub(super) async fn run_agent_spawn(
|
||||
let pool = AgentPool {
|
||||
agents: agents_for_cd,
|
||||
port: port_for_cd,
|
||||
child_killers: Arc::new(Mutex::new(HashMap::new())),
|
||||
watcher_tx: watcher_for_cd,
|
||||
status_broadcaster: Arc::new(
|
||||
crate::service::status::StatusBroadcaster::new(),
|
||||
@@ -774,7 +768,6 @@ pub(super) async fn run_agent_spawn(
|
||||
let pool = AgentPool {
|
||||
agents: agents_for_cd,
|
||||
port: port_for_cd,
|
||||
child_killers: Arc::new(Mutex::new(HashMap::new())),
|
||||
watcher_tx: watcher_for_cd,
|
||||
status_broadcaster: Arc::new(
|
||||
crate::service::status::StatusBroadcaster::new(),
|
||||
@@ -862,7 +855,6 @@ pub(super) async fn run_agent_spawn(
|
||||
let pool = AgentPool {
|
||||
agents: agents_for_respawn,
|
||||
port: port_r,
|
||||
child_killers: Arc::new(Mutex::new(HashMap::new())),
|
||||
watcher_tx: watcher_for_respawn,
|
||||
status_broadcaster: Arc::new(
|
||||
crate::service::status::StatusBroadcaster::new(),
|
||||
|
||||
@@ -71,8 +71,7 @@ impl AgentPool {
|
||||
self.kill_child_for_key(&key);
|
||||
}
|
||||
|
||||
// Step 3: now safe to mutate. Status flip, handle abort, drop the
|
||||
// child_killers entry.
|
||||
// Step 3: now safe to mutate. Status flip and handle abort.
|
||||
let (task_handle, tx) = {
|
||||
let mut agents = self.agents.lock().map_err(|e| e.to_string())?;
|
||||
let agent = agents
|
||||
@@ -88,9 +87,6 @@ impl AgentPool {
|
||||
handle.abort();
|
||||
let _ = handle.await;
|
||||
}
|
||||
if let Ok(mut killers) = self.child_killers.lock() {
|
||||
killers.remove(&key);
|
||||
}
|
||||
|
||||
// Preserve worktree for inspection — don't destroy agent's work on stop.
|
||||
if let Some(ref wt) = worktree_info {
|
||||
|
||||
@@ -13,7 +13,6 @@ mod tests {
|
||||
use super::*;
|
||||
use crate::agents::AgentEvent;
|
||||
use crate::io::watcher::WatcherEvent;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use tokio::sync::broadcast;
|
||||
|
||||
@@ -41,7 +40,6 @@ mod tests {
|
||||
let (tx, _rx) = broadcast::channel::<AgentEvent>(64);
|
||||
let (watcher_tx, mut watcher_rx) = broadcast::channel::<WatcherEvent>(16);
|
||||
let event_log = Arc::new(Mutex::new(Vec::new()));
|
||||
let child_killers = Arc::new(Mutex::new(HashMap::new()));
|
||||
|
||||
// sh -p "--" <script>: -p = privileged mode, "--" = end options,
|
||||
// then the script path is the file operand.
|
||||
@@ -56,7 +54,6 @@ mod tests {
|
||||
&event_log,
|
||||
None,
|
||||
0,
|
||||
child_killers,
|
||||
watcher_tx,
|
||||
None,
|
||||
None,
|
||||
@@ -98,7 +95,6 @@ mod tests {
|
||||
let (tx, _rx) = broadcast::channel::<AgentEvent>(64);
|
||||
let (watcher_tx, mut watcher_rx) = broadcast::channel::<WatcherEvent>(16);
|
||||
let event_log = Arc::new(Mutex::new(Vec::new()));
|
||||
let child_killers = Arc::new(Mutex::new(HashMap::new()));
|
||||
|
||||
let result = run_agent_pty_streaming(
|
||||
"423_story_rate_limit",
|
||||
@@ -111,7 +107,6 @@ mod tests {
|
||||
&event_log,
|
||||
None,
|
||||
0,
|
||||
child_killers,
|
||||
watcher_tx,
|
||||
None,
|
||||
None,
|
||||
@@ -160,7 +155,6 @@ mod tests {
|
||||
let (tx, _rx) = broadcast::channel::<AgentEvent>(64);
|
||||
let (watcher_tx, mut watcher_rx) = broadcast::channel::<WatcherEvent>(16);
|
||||
let event_log = Arc::new(Mutex::new(Vec::new()));
|
||||
let child_killers = Arc::new(Mutex::new(HashMap::new()));
|
||||
|
||||
let before = chrono::Utc::now();
|
||||
let result = run_agent_pty_streaming(
|
||||
@@ -174,7 +168,6 @@ mod tests {
|
||||
&event_log,
|
||||
None,
|
||||
0,
|
||||
child_killers,
|
||||
watcher_tx,
|
||||
None,
|
||||
None,
|
||||
@@ -229,7 +222,6 @@ mod tests {
|
||||
let (tx, _rx) = broadcast::channel::<AgentEvent>(64);
|
||||
let (watcher_tx, _watcher_rx) = broadcast::channel::<WatcherEvent>(16);
|
||||
let event_log = Arc::new(Mutex::new(Vec::new()));
|
||||
let child_killers = Arc::new(Mutex::new(HashMap::new()));
|
||||
|
||||
let result = run_agent_pty_streaming(
|
||||
"916_story_rate_limit_extension",
|
||||
@@ -242,7 +234,6 @@ mod tests {
|
||||
&event_log,
|
||||
None,
|
||||
1, // inactivity_timeout_secs = 1s; would expire before the 3s sleep without the extension
|
||||
child_killers,
|
||||
watcher_tx,
|
||||
None,
|
||||
None,
|
||||
@@ -407,18 +398,16 @@ mod tests {
|
||||
let (tx, _rx) = broadcast::channel::<AgentEvent>(64);
|
||||
let (watcher_tx, _watcher_rx) = broadcast::channel::<WatcherEvent>(16);
|
||||
let event_log = Arc::new(Mutex::new(Vec::new()));
|
||||
let child_killers: Arc<
|
||||
Mutex<HashMap<String, Box<dyn portable_pty::ChildKiller + Send + Sync>>>,
|
||||
> = Arc::new(Mutex::new(HashMap::new()));
|
||||
let child_killers_for_kill = Arc::clone(&child_killers);
|
||||
|
||||
// Spawn a task to kill the child after a short delay (simulating watchdog).
|
||||
// Uses pids_matching on the script path — same mechanism as the production
|
||||
// watchdog after the process_kill migration (story 1090).
|
||||
let script_path_for_kill = script.to_string_lossy().to_string();
|
||||
tokio::spawn(async move {
|
||||
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
|
||||
if let Ok(mut killers) = child_killers_for_kill.lock() {
|
||||
for (_, killer) in killers.iter_mut() {
|
||||
let _ = killer.kill();
|
||||
}
|
||||
let pids = crate::process_kill::pids_matching(&script_path_for_kill);
|
||||
if !pids.is_empty() {
|
||||
let _ = crate::process_kill::sigkill_pids_and_verify(&pids);
|
||||
}
|
||||
});
|
||||
|
||||
@@ -435,7 +424,6 @@ mod tests {
|
||||
&event_log,
|
||||
None,
|
||||
0, // no inactivity timeout
|
||||
child_killers,
|
||||
watcher_tx,
|
||||
None, // no session to resume
|
||||
Some((project_root.clone(), "sonnet".to_string())),
|
||||
|
||||
@@ -1,10 +1,9 @@
|
||||
//! PTY process spawning and output loop: builds the command, drives the reader thread,
|
||||
//! and dispatches parsed JSON events to the broadcast channel.
|
||||
use std::collections::HashMap;
|
||||
use std::io::{BufRead, BufReader};
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use portable_pty::{ChildKiller, CommandBuilder, PtySize, native_pty_system};
|
||||
use portable_pty::{CommandBuilder, PtySize, native_pty_system};
|
||||
use tokio::sync::broadcast;
|
||||
|
||||
use crate::agent_log::AgentLogWriter;
|
||||
@@ -14,7 +13,7 @@ use crate::slog;
|
||||
use crate::slog_warn;
|
||||
|
||||
use super::events::{emit_event, handle_agent_stream_event};
|
||||
use super::types::{ChildKillerGuard, PtyResult, composite_key};
|
||||
use super::types::PtyResult;
|
||||
|
||||
/// Spawn claude agent in a PTY and stream events through the broadcast channel.
|
||||
///
|
||||
@@ -55,7 +54,6 @@ pub(in crate::agents) async fn run_agent_pty_streaming(
|
||||
event_log: &Arc<Mutex<Vec<AgentEvent>>>,
|
||||
log_writer: Option<Arc<Mutex<AgentLogWriter>>>,
|
||||
inactivity_timeout_secs: u64,
|
||||
child_killers: Arc<Mutex<HashMap<String, Box<dyn ChildKiller + Send + Sync>>>>,
|
||||
watcher_tx: broadcast::Sender<WatcherEvent>,
|
||||
session_id_to_resume: Option<&str>,
|
||||
eager_record: Option<(std::path::PathBuf, String)>,
|
||||
@@ -82,7 +80,6 @@ pub(in crate::agents) async fn run_agent_pty_streaming(
|
||||
&event_log,
|
||||
log_writer.as_deref(),
|
||||
inactivity_timeout_secs,
|
||||
&child_killers,
|
||||
&watcher_tx,
|
||||
resume_sid.as_deref(),
|
||||
eager_record,
|
||||
@@ -104,7 +101,6 @@ fn run_agent_pty_blocking(
|
||||
event_log: &Mutex<Vec<AgentEvent>>,
|
||||
log_writer: Option<&Mutex<AgentLogWriter>>,
|
||||
inactivity_timeout_secs: u64,
|
||||
child_killers: &Arc<Mutex<HashMap<String, Box<dyn ChildKiller + Send + Sync>>>>,
|
||||
watcher_tx: &broadcast::Sender<WatcherEvent>,
|
||||
session_id_to_resume: Option<&str>,
|
||||
eager_record: Option<(std::path::PathBuf, String)>,
|
||||
@@ -204,21 +200,6 @@ fn run_agent_pty_blocking(
|
||||
.spawn_command(cmd)
|
||||
.map_err(|e| format!("Failed to spawn agent for {story_id}:{agent_name}: {e}"))?;
|
||||
|
||||
// Register the child killer so that kill_all_children() / stop_agent() can
|
||||
// terminate this process on server shutdown, even if the blocking thread
|
||||
// cannot be interrupted. The ChildKillerGuard deregisters on function exit.
|
||||
let killer_key = composite_key(story_id, agent_name);
|
||||
{
|
||||
let killer = child.clone_killer();
|
||||
if let Ok(mut killers) = child_killers.lock() {
|
||||
killers.insert(killer_key.clone(), killer);
|
||||
}
|
||||
}
|
||||
let _killer_guard = ChildKillerGuard {
|
||||
killers: Arc::clone(child_killers),
|
||||
key: killer_key,
|
||||
};
|
||||
|
||||
drop(pair.slave);
|
||||
|
||||
let reader = pair
|
||||
|
||||
@@ -1,9 +1,4 @@
|
||||
//! Core types for the PTY runner: the result container for a PTY agent session.
|
||||
use std::collections::HashMap;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use portable_pty::ChildKiller;
|
||||
|
||||
use crate::agents::TokenUsage;
|
||||
|
||||
/// Result from a PTY agent session, containing the session ID and token usage.
|
||||
@@ -23,20 +18,3 @@ pub(in crate::agents) struct PtyResult {
|
||||
/// event was seen or when the `reset_at` field was absent from the event.
|
||||
pub rate_limit_reset_at: Option<chrono::DateTime<chrono::Utc>>,
|
||||
}
|
||||
|
||||
pub(super) fn composite_key(story_id: &str, agent_name: &str) -> String {
|
||||
format!("{story_id}:{agent_name}")
|
||||
}
|
||||
|
||||
pub(super) struct ChildKillerGuard {
|
||||
pub killers: Arc<Mutex<HashMap<String, Box<dyn ChildKiller + Send + Sync>>>>,
|
||||
pub key: String,
|
||||
}
|
||||
|
||||
impl Drop for ChildKillerGuard {
|
||||
fn drop(&mut self) {
|
||||
if let Ok(mut killers) = self.killers.lock() {
|
||||
killers.remove(&self.key);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
//! Claude Code runtime — launches Claude Code CLI sessions as agent backends.
|
||||
use std::collections::HashMap;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use portable_pty::ChildKiller;
|
||||
use tokio::sync::broadcast;
|
||||
|
||||
use crate::agent_log::AgentLogWriter;
|
||||
@@ -17,20 +15,13 @@ use super::{AgentEvent, AgentRuntime, RuntimeContext, RuntimeResult, RuntimeStat
|
||||
/// It wraps the existing PTY-based execution logic, preserving all streaming,
|
||||
/// token tracking, and inactivity timeout behaviour.
|
||||
pub struct ClaudeCodeRuntime {
|
||||
child_killers: Arc<Mutex<HashMap<String, Box<dyn ChildKiller + Send + Sync>>>>,
|
||||
watcher_tx: broadcast::Sender<WatcherEvent>,
|
||||
}
|
||||
|
||||
impl ClaudeCodeRuntime {
|
||||
/// Create a new Claude Code runtime with shared child-killer registry and event channel.
|
||||
pub fn new(
|
||||
child_killers: Arc<Mutex<HashMap<String, Box<dyn ChildKiller + Send + Sync>>>>,
|
||||
watcher_tx: broadcast::Sender<WatcherEvent>,
|
||||
) -> Self {
|
||||
Self {
|
||||
child_killers,
|
||||
watcher_tx,
|
||||
}
|
||||
/// Create a new Claude Code runtime with a shared event channel.
|
||||
pub fn new(watcher_tx: broadcast::Sender<WatcherEvent>) -> Self {
|
||||
Self { watcher_tx }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -57,7 +48,6 @@ impl AgentRuntime for ClaudeCodeRuntime {
|
||||
&event_log,
|
||||
log_writer.clone(),
|
||||
ctx.inactivity_timeout_secs,
|
||||
Arc::clone(&self.child_killers),
|
||||
self.watcher_tx.clone(),
|
||||
ctx.session_id_to_resume.as_deref(),
|
||||
eager_record.clone(),
|
||||
@@ -94,7 +84,6 @@ impl AgentRuntime for ClaudeCodeRuntime {
|
||||
&event_log,
|
||||
log_writer,
|
||||
ctx.inactivity_timeout_secs,
|
||||
Arc::clone(&self.child_killers),
|
||||
self.watcher_tx.clone(),
|
||||
None, // no --resume on fallback
|
||||
eager_record,
|
||||
@@ -115,7 +104,6 @@ impl AgentRuntime for ClaudeCodeRuntime {
|
||||
|
||||
fn stop(&self) {
|
||||
// Stopping is handled externally by the pool via kill_child_for_key().
|
||||
// The ChildKillerGuard in pty.rs deregisters automatically on process exit.
|
||||
}
|
||||
|
||||
fn get_status(&self) -> RuntimeStatus {
|
||||
|
||||
@@ -204,20 +204,16 @@ mod tests {
|
||||
#[test]
|
||||
fn claude_code_runtime_get_status_returns_idle() {
|
||||
use crate::io::watcher::WatcherEvent;
|
||||
use std::collections::HashMap;
|
||||
let killers = Arc::new(Mutex::new(HashMap::new()));
|
||||
let (watcher_tx, _) = broadcast::channel::<WatcherEvent>(16);
|
||||
let runtime = ClaudeCodeRuntime::new(killers, watcher_tx);
|
||||
let runtime = ClaudeCodeRuntime::new(watcher_tx);
|
||||
assert_eq!(runtime.get_status(), RuntimeStatus::Idle);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn claude_code_runtime_stream_events_empty() {
|
||||
use crate::io::watcher::WatcherEvent;
|
||||
use std::collections::HashMap;
|
||||
let killers = Arc::new(Mutex::new(HashMap::new()));
|
||||
let (watcher_tx, _) = broadcast::channel::<WatcherEvent>(16);
|
||||
let runtime = ClaudeCodeRuntime::new(killers, watcher_tx);
|
||||
let runtime = ClaudeCodeRuntime::new(watcher_tx);
|
||||
assert!(runtime.stream_events().is_empty());
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user