story-kit: merge 159_bug_server_restart_leaves_orphaned_claude_code_pty_processes_running
This commit is contained in:
@@ -4,7 +4,7 @@ use crate::slog_error;
|
|||||||
use crate::slog_warn;
|
use crate::slog_warn;
|
||||||
use crate::config::{AgentConfig, ProjectConfig};
|
use crate::config::{AgentConfig, ProjectConfig};
|
||||||
use crate::worktree::{self, WorktreeInfo};
|
use crate::worktree::{self, WorktreeInfo};
|
||||||
use portable_pty::{CommandBuilder, PtySize, native_pty_system};
|
use portable_pty::{ChildKiller, CommandBuilder, PtySize, native_pty_system};
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::io::{BufRead, BufReader};
|
use std::io::{BufRead, BufReader};
|
||||||
@@ -243,6 +243,27 @@ fn agent_info_from_entry(story_id: &str, agent: &StoryAgent) -> AgentInfo {
|
|||||||
pub struct AgentPool {
|
pub struct AgentPool {
|
||||||
agents: Arc<Mutex<HashMap<String, StoryAgent>>>,
|
agents: Arc<Mutex<HashMap<String, StoryAgent>>>,
|
||||||
port: u16,
|
port: u16,
|
||||||
|
/// Registry of active PTY child process killers, keyed by "{story_id}:{agent_name}".
|
||||||
|
/// Used to terminate child processes on server shutdown or agent stop, preventing
|
||||||
|
/// orphaned Claude Code processes from running after the server exits.
|
||||||
|
child_killers: Arc<Mutex<HashMap<String, Box<dyn ChildKiller + Send + Sync>>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// RAII guard that removes a child killer from the registry on drop.
|
||||||
|
///
|
||||||
|
/// This ensures the killer is always cleaned up when `run_agent_pty_blocking`
|
||||||
|
/// returns, regardless of the exit path (normal completion, timeout, or error).
|
||||||
|
struct ChildKillerGuard {
|
||||||
|
killers: Arc<Mutex<HashMap<String, Box<dyn ChildKiller + Send + Sync>>>>,
|
||||||
|
key: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for ChildKillerGuard {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
if let Ok(mut killers) = self.killers.lock() {
|
||||||
|
killers.remove(&self.key);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl AgentPool {
|
impl AgentPool {
|
||||||
@@ -250,6 +271,35 @@ impl AgentPool {
|
|||||||
Self {
|
Self {
|
||||||
agents: Arc::new(Mutex::new(HashMap::new())),
|
agents: Arc::new(Mutex::new(HashMap::new())),
|
||||||
port,
|
port,
|
||||||
|
child_killers: Arc::new(Mutex::new(HashMap::new())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Kill all active PTY child processes.
|
||||||
|
///
|
||||||
|
/// Called on server shutdown to prevent orphaned Claude Code processes from
|
||||||
|
/// continuing to run after the server exits. Each registered killer is called
|
||||||
|
/// once, then the registry is cleared.
|
||||||
|
pub fn kill_all_children(&self) {
|
||||||
|
if let Ok(mut killers) = self.child_killers.lock() {
|
||||||
|
for (key, killer) in killers.iter_mut() {
|
||||||
|
slog!("[agents] Killing child process for {key} on shutdown");
|
||||||
|
let _ = killer.kill();
|
||||||
|
}
|
||||||
|
killers.clear();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Kill and deregister the child process for a specific agent key.
|
||||||
|
///
|
||||||
|
/// Used by `stop_agent` to ensure the PTY child is terminated even though
|
||||||
|
/// aborting a `spawn_blocking` task handle does not interrupt the blocking thread.
|
||||||
|
fn kill_child_for_key(&self, key: &str) {
|
||||||
|
if let Ok(mut killers) = self.child_killers.lock()
|
||||||
|
&& let Some(mut killer) = killers.remove(key)
|
||||||
|
{
|
||||||
|
slog!("[agents] Killing child process for {key} on stop");
|
||||||
|
let _ = killer.kill();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -394,6 +444,7 @@ impl AgentPool {
|
|||||||
let log_clone = event_log.clone();
|
let log_clone = event_log.clone();
|
||||||
let port_for_task = self.port;
|
let port_for_task = self.port;
|
||||||
let log_writer_clone = log_writer.clone();
|
let log_writer_clone = log_writer.clone();
|
||||||
|
let child_killers_clone = self.child_killers.clone();
|
||||||
|
|
||||||
// Spawn the background task. Worktree creation and agent launch happen here
|
// Spawn the background task. Worktree creation and agent launch happen here
|
||||||
// so `start_agent` returns immediately after registering the agent as
|
// so `start_agent` returns immediately after registering the agent as
|
||||||
@@ -490,6 +541,7 @@ impl AgentPool {
|
|||||||
&log_clone,
|
&log_clone,
|
||||||
log_writer_clone,
|
log_writer_clone,
|
||||||
inactivity_timeout_secs,
|
inactivity_timeout_secs,
|
||||||
|
child_killers_clone,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
@@ -566,11 +618,14 @@ impl AgentPool {
|
|||||||
(wt, handle, tx)
|
(wt, handle, tx)
|
||||||
};
|
};
|
||||||
|
|
||||||
// Abort the task
|
// Abort the task and kill the PTY child process.
|
||||||
|
// Note: aborting a spawn_blocking task handle does not interrupt the blocking
|
||||||
|
// thread, so we must also kill the child process directly via the killer registry.
|
||||||
if let Some(handle) = task_handle {
|
if let Some(handle) = task_handle {
|
||||||
handle.abort();
|
handle.abort();
|
||||||
let _ = handle.await;
|
let _ = handle.await;
|
||||||
}
|
}
|
||||||
|
self.kill_child_for_key(&key);
|
||||||
|
|
||||||
// Preserve worktree for inspection — don't destroy agent's work on stop.
|
// Preserve worktree for inspection — don't destroy agent's work on stop.
|
||||||
if let Some(ref wt) = worktree_info {
|
if let Some(ref wt) = worktree_info {
|
||||||
@@ -1076,6 +1131,7 @@ impl AgentPool {
|
|||||||
let pool_clone = Self {
|
let pool_clone = Self {
|
||||||
agents: Arc::clone(&self.agents),
|
agents: Arc::clone(&self.agents),
|
||||||
port: self.port,
|
port: self.port,
|
||||||
|
child_killers: Arc::clone(&self.child_killers),
|
||||||
};
|
};
|
||||||
let sid = story_id.to_string();
|
let sid = story_id.to_string();
|
||||||
let aname = agent_name.to_string();
|
let aname = agent_name.to_string();
|
||||||
@@ -1645,6 +1701,19 @@ impl AgentPool {
|
|||||||
tx
|
tx
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Test helper: inject a child killer into the registry.
|
||||||
|
#[cfg(test)]
|
||||||
|
pub fn inject_child_killer(&self, key: &str, killer: Box<dyn ChildKiller + Send + Sync>) {
|
||||||
|
let mut killers = self.child_killers.lock().unwrap();
|
||||||
|
killers.insert(key.to_string(), killer);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Test helper: return the number of registered child killers.
|
||||||
|
#[cfg(test)]
|
||||||
|
pub fn child_killer_count(&self) -> usize {
|
||||||
|
self.child_killers.lock().unwrap().len()
|
||||||
|
}
|
||||||
|
|
||||||
/// Run a single watchdog pass synchronously (test helper).
|
/// Run a single watchdog pass synchronously (test helper).
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
pub fn run_watchdog_once(&self) {
|
pub fn run_watchdog_once(&self) {
|
||||||
@@ -2002,7 +2071,11 @@ fn spawn_pipeline_advance(
|
|||||||
let sid = story_id.to_string();
|
let sid = story_id.to_string();
|
||||||
let aname = agent_name.to_string();
|
let aname = agent_name.to_string();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let pool = AgentPool { agents, port };
|
let pool = AgentPool {
|
||||||
|
agents,
|
||||||
|
port,
|
||||||
|
child_killers: Arc::new(Mutex::new(HashMap::new())),
|
||||||
|
};
|
||||||
pool.run_pipeline_advance_for_completed_agent(&sid, &aname)
|
pool.run_pipeline_advance_for_completed_agent(&sid, &aname)
|
||||||
.await;
|
.await;
|
||||||
});
|
});
|
||||||
@@ -3062,6 +3135,7 @@ async fn run_agent_pty_streaming(
|
|||||||
event_log: &Arc<Mutex<Vec<AgentEvent>>>,
|
event_log: &Arc<Mutex<Vec<AgentEvent>>>,
|
||||||
log_writer: Option<Arc<Mutex<AgentLogWriter>>>,
|
log_writer: Option<Arc<Mutex<AgentLogWriter>>>,
|
||||||
inactivity_timeout_secs: u64,
|
inactivity_timeout_secs: u64,
|
||||||
|
child_killers: Arc<Mutex<HashMap<String, Box<dyn ChildKiller + Send + Sync>>>>,
|
||||||
) -> Result<Option<String>, String> {
|
) -> Result<Option<String>, String> {
|
||||||
let sid = story_id.to_string();
|
let sid = story_id.to_string();
|
||||||
let aname = agent_name.to_string();
|
let aname = agent_name.to_string();
|
||||||
@@ -3084,6 +3158,7 @@ async fn run_agent_pty_streaming(
|
|||||||
&event_log,
|
&event_log,
|
||||||
log_writer.as_deref(),
|
log_writer.as_deref(),
|
||||||
inactivity_timeout_secs,
|
inactivity_timeout_secs,
|
||||||
|
&child_killers,
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
@@ -3121,6 +3196,7 @@ fn run_agent_pty_blocking(
|
|||||||
event_log: &Mutex<Vec<AgentEvent>>,
|
event_log: &Mutex<Vec<AgentEvent>>,
|
||||||
log_writer: Option<&Mutex<AgentLogWriter>>,
|
log_writer: Option<&Mutex<AgentLogWriter>>,
|
||||||
inactivity_timeout_secs: u64,
|
inactivity_timeout_secs: u64,
|
||||||
|
child_killers: &Arc<Mutex<HashMap<String, Box<dyn ChildKiller + Send + Sync>>>>,
|
||||||
) -> Result<Option<String>, String> {
|
) -> Result<Option<String>, String> {
|
||||||
let pty_system = native_pty_system();
|
let pty_system = native_pty_system();
|
||||||
|
|
||||||
@@ -3166,6 +3242,21 @@ fn run_agent_pty_blocking(
|
|||||||
.spawn_command(cmd)
|
.spawn_command(cmd)
|
||||||
.map_err(|e| format!("Failed to spawn agent for {story_id}:{agent_name}: {e}"))?;
|
.map_err(|e| format!("Failed to spawn agent for {story_id}:{agent_name}: {e}"))?;
|
||||||
|
|
||||||
|
// Register the child killer so that kill_all_children() / stop_agent() can
|
||||||
|
// terminate this process on server shutdown, even if the blocking thread
|
||||||
|
// cannot be interrupted. The ChildKillerGuard deregisters on function exit.
|
||||||
|
let killer_key = composite_key(story_id, agent_name);
|
||||||
|
{
|
||||||
|
let killer = child.clone_killer();
|
||||||
|
if let Ok(mut killers) = child_killers.lock() {
|
||||||
|
killers.insert(killer_key.clone(), killer);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let _killer_guard = ChildKillerGuard {
|
||||||
|
killers: Arc::clone(child_killers),
|
||||||
|
key: killer_key,
|
||||||
|
};
|
||||||
|
|
||||||
drop(pair.slave);
|
drop(pair.slave);
|
||||||
|
|
||||||
let reader = pair
|
let reader = pair
|
||||||
@@ -6003,4 +6094,107 @@ theirs
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── kill_all_children tests ────────────────────────────────────
|
||||||
|
|
||||||
|
/// Returns true if a process with the given PID is currently running.
|
||||||
|
fn process_is_running(pid: u32) -> bool {
|
||||||
|
std::process::Command::new("ps")
|
||||||
|
.arg("-p")
|
||||||
|
.arg(pid.to_string())
|
||||||
|
.stdout(std::process::Stdio::null())
|
||||||
|
.stderr(std::process::Stdio::null())
|
||||||
|
.status()
|
||||||
|
.map(|s| s.success())
|
||||||
|
.unwrap_or(false)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn kill_all_children_is_safe_on_empty_pool() {
|
||||||
|
let pool = AgentPool::new(3001);
|
||||||
|
// Should not panic or deadlock on an empty registry.
|
||||||
|
pool.kill_all_children();
|
||||||
|
assert_eq!(pool.child_killer_count(), 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn kill_all_children_kills_real_process() {
|
||||||
|
// GIVEN: a real PTY child process (sleep 100) with its killer registered.
|
||||||
|
let pool = AgentPool::new(3001);
|
||||||
|
|
||||||
|
let pty_system = native_pty_system();
|
||||||
|
let pair = pty_system
|
||||||
|
.openpty(PtySize {
|
||||||
|
rows: 24,
|
||||||
|
cols: 80,
|
||||||
|
pixel_width: 0,
|
||||||
|
pixel_height: 0,
|
||||||
|
})
|
||||||
|
.expect("failed to open pty");
|
||||||
|
|
||||||
|
let mut cmd = CommandBuilder::new("sleep");
|
||||||
|
cmd.arg("100");
|
||||||
|
let mut child = pair
|
||||||
|
.slave
|
||||||
|
.spawn_command(cmd)
|
||||||
|
.expect("failed to spawn sleep");
|
||||||
|
let pid = child.process_id().expect("no pid");
|
||||||
|
|
||||||
|
pool.inject_child_killer("story:agent", child.clone_killer());
|
||||||
|
|
||||||
|
// Verify the process is alive before we kill it.
|
||||||
|
assert!(
|
||||||
|
process_is_running(pid),
|
||||||
|
"process {pid} should be running before kill_all_children"
|
||||||
|
);
|
||||||
|
|
||||||
|
// WHEN: kill_all_children() is called.
|
||||||
|
pool.kill_all_children();
|
||||||
|
|
||||||
|
// Collect the exit status (prevents zombie; also ensures signal was sent).
|
||||||
|
let _ = child.wait();
|
||||||
|
|
||||||
|
// THEN: the process should be dead.
|
||||||
|
assert!(
|
||||||
|
!process_is_running(pid),
|
||||||
|
"process {pid} should have been killed by kill_all_children"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn kill_all_children_clears_registry() {
|
||||||
|
// GIVEN: a pool with one registered killer.
|
||||||
|
let pool = AgentPool::new(3001);
|
||||||
|
|
||||||
|
let pty_system = native_pty_system();
|
||||||
|
let pair = pty_system
|
||||||
|
.openpty(PtySize {
|
||||||
|
rows: 24,
|
||||||
|
cols: 80,
|
||||||
|
pixel_width: 0,
|
||||||
|
pixel_height: 0,
|
||||||
|
})
|
||||||
|
.expect("failed to open pty");
|
||||||
|
|
||||||
|
let mut cmd = CommandBuilder::new("sleep");
|
||||||
|
cmd.arg("1");
|
||||||
|
let mut child = pair
|
||||||
|
.slave
|
||||||
|
.spawn_command(cmd)
|
||||||
|
.expect("failed to spawn sleep");
|
||||||
|
|
||||||
|
pool.inject_child_killer("story:agent", child.clone_killer());
|
||||||
|
assert_eq!(pool.child_killer_count(), 1);
|
||||||
|
|
||||||
|
// WHEN: kill_all_children() is called.
|
||||||
|
pool.kill_all_children();
|
||||||
|
let _ = child.wait();
|
||||||
|
|
||||||
|
// THEN: the registry is empty.
|
||||||
|
assert_eq!(
|
||||||
|
pool.child_killer_count(),
|
||||||
|
0,
|
||||||
|
"child_killers should be cleared after kill_all_children"
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -110,6 +110,8 @@ async fn main() -> Result<(), std::io::Error> {
|
|||||||
let startup_root: Option<PathBuf> = app_state.project_root.lock().unwrap().clone();
|
let startup_root: Option<PathBuf> = app_state.project_root.lock().unwrap().clone();
|
||||||
let startup_agents = Arc::clone(&agents);
|
let startup_agents = Arc::clone(&agents);
|
||||||
let startup_reconciliation_tx = reconciliation_tx.clone();
|
let startup_reconciliation_tx = reconciliation_tx.clone();
|
||||||
|
// Clone for shutdown cleanup — kill orphaned PTY children before exiting.
|
||||||
|
let agents_for_shutdown = Arc::clone(&agents);
|
||||||
|
|
||||||
let ctx = AppContext {
|
let ctx = AppContext {
|
||||||
state: app_state,
|
state: app_state,
|
||||||
@@ -170,6 +172,10 @@ async fn main() -> Result<(), std::io::Error> {
|
|||||||
|
|
||||||
let result = Server::new(TcpListener::bind(&addr)).run(app).await;
|
let result = Server::new(TcpListener::bind(&addr)).run(app).await;
|
||||||
|
|
||||||
|
// Kill all active PTY child processes before exiting to prevent orphaned
|
||||||
|
// Claude Code processes from running after the server restarts.
|
||||||
|
agents_for_shutdown.kill_all_children();
|
||||||
|
|
||||||
if let Some(ref path) = port_file {
|
if let Some(ref path) = port_file {
|
||||||
remove_port_file(path);
|
remove_port_file(path);
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user