From 560c731869b281d18e80ac54cf7d96c897448252 Mon Sep 17 00:00:00 2001 From: Dave Date: Tue, 24 Feb 2026 13:13:16 +0000 Subject: [PATCH] story-kit: merge 134_story_add_process_health_monitoring_and_timeout_to_agent_pty_sessions --- server/src/agents.rs | 247 +++++++++++++++++++++++- server/src/config.rs | 10 + server/src/llm/providers/claude_code.rs | 1 + server/src/main.rs | 3 + 4 files changed, 256 insertions(+), 5 deletions(-) diff --git a/server/src/agents.rs b/server/src/agents.rs index 06a5633..ea062dc 100644 --- a/server/src/agents.rs +++ b/server/src/agents.rs @@ -372,6 +372,12 @@ impl AgentPool { prompt.push_str(ctx); } + // Extract inactivity timeout from the agent config before moving config. + let inactivity_timeout_secs = config + .find_agent(&resolved_name) + .map(|a| a.inactivity_timeout_secs) + .unwrap_or(300); + let sid = story_id.to_string(); let aname = resolved_name.clone(); let tx_clone = tx.clone(); @@ -392,6 +398,7 @@ impl AgentPool { match run_agent_pty_streaming( &sid, &aname, &command, &args, &prompt, &cwd, &tx_clone, &log_clone, log_writer_clone, + inactivity_timeout_secs, ) .await { @@ -1507,6 +1514,62 @@ impl AgentPool { ); tx } + + /// Inject a Running agent with a pre-built (possibly finished) task handle. + /// Used by watchdog tests to simulate an orphaned agent. + #[cfg(test)] + pub fn inject_test_agent_with_handle( + &self, + story_id: &str, + agent_name: &str, + status: AgentStatus, + task_handle: tokio::task::JoinHandle<()>, + ) -> broadcast::Sender { + let (tx, _) = broadcast::channel::(64); + let key = composite_key(story_id, agent_name); + let mut agents = self.agents.lock().unwrap(); + agents.insert( + key, + StoryAgent { + agent_name: agent_name.to_string(), + status, + worktree_info: None, + session_id: None, + tx: tx.clone(), + task_handle: Some(task_handle), + event_log: Arc::new(Mutex::new(Vec::new())), + completion: None, + project_root: None, + log_session_id: None, + }, + ); + tx + } + + /// Run a single watchdog pass synchronously (test helper). + #[cfg(test)] + pub fn run_watchdog_once(&self) { + check_orphaned_agents(&self.agents); + } + + /// Spawn a background watchdog task that periodically checks for Running agents + /// whose underlying task has already finished (orphaned entries). Any such agent + /// is marked Failed and an Error event is emitted so that `wait_for_agent` unblocks. + /// + /// The watchdog runs every 30 seconds. It is a safety net for edge cases where the + /// PTY read loop exits without updating the agent status (e.g. a panic in the + /// spawn_blocking task, or an external SIGKILL that closes the PTY fd immediately). + pub fn spawn_watchdog(&self) { + let agents = Arc::clone(&self.agents); + tokio::spawn(async move { + let mut interval = + tokio::time::interval(std::time::Duration::from_secs(30)); + loop { + interval.tick().await; + check_orphaned_agents(&agents); + } + }); + } } /// Return the active pipeline stage directory name for `story_id`, or `None` if the @@ -1588,6 +1651,54 @@ fn find_free_agent_for_stage<'a>( None } +/// Scan the agent pool for Running entries whose backing tokio task has already +/// finished and mark them as Failed. +/// +/// This handles the case where the PTY read loop or the spawned task exits +/// without updating the agent status — for example when the process is killed +/// externally and the PTY master fd returns EOF before our inactivity timeout +/// fires, but some other edge case prevents the normal cleanup path from running. +fn check_orphaned_agents(agents: &Mutex>) { + let mut lock = match agents.lock() { + Ok(l) => l, + Err(_) => return, + }; + + // Collect orphaned entries: Running agents whose task handle is finished. + let orphaned: Vec<(String, String, broadcast::Sender)> = lock + .iter() + .filter_map(|(key, agent)| { + if agent.status == AgentStatus::Running + && let Some(handle) = &agent.task_handle + && handle.is_finished() + { + let story_id = key + .rsplit_once(':') + .map(|(s, _)| s.to_string()) + .unwrap_or_else(|| key.clone()); + return Some((key.clone(), story_id, agent.tx.clone())); + } + None + }) + .collect(); + + for (key, story_id, tx) in orphaned { + if let Some(agent) = lock.get_mut(&key) { + agent.status = AgentStatus::Failed; + slog!( + "[watchdog] Orphaned agent '{key}': task finished but status was Running. \ + Marking Failed." + ); + let _ = tx.send(AgentEvent::Error { + story_id, + agent_name: agent.agent_name.clone(), + message: "Agent process terminated unexpectedly (watchdog detected orphan)" + .to_string(), + }); + } + } +} + /// Server-owned completion: runs acceptance gates when an agent process exits /// normally, and advances the pipeline based on results. /// @@ -2629,6 +2740,7 @@ async fn run_agent_pty_streaming( tx: &broadcast::Sender, event_log: &Arc>>, log_writer: Option>>, + inactivity_timeout_secs: u64, ) -> Result, String> { let sid = story_id.to_string(); let aname = agent_name.to_string(); @@ -2650,6 +2762,7 @@ async fn run_agent_pty_streaming( &tx, &event_log, log_writer.as_deref(), + inactivity_timeout_secs, ) }) .await @@ -2686,6 +2799,7 @@ fn run_agent_pty_blocking( tx: &broadcast::Sender, event_log: &Mutex>, log_writer: Option<&Mutex>, + inactivity_timeout_secs: u64, ) -> Result, String> { let pty_system = native_pty_system(); @@ -2740,13 +2854,57 @@ fn run_agent_pty_blocking( drop(pair.master); - let buf_reader = BufReader::new(reader); + // Spawn a reader thread to collect PTY output lines. + // We use a channel so the main thread can apply an inactivity deadline + // via recv_timeout: if no output arrives within the configured window + // the process is killed and the agent is marked Failed. + let (line_tx, line_rx) = std::sync::mpsc::channel::>(); + std::thread::spawn(move || { + let buf_reader = BufReader::new(reader); + for line in buf_reader.lines() { + if line_tx.send(line).is_err() { + break; + } + } + }); + + let timeout_dur = if inactivity_timeout_secs > 0 { + Some(std::time::Duration::from_secs(inactivity_timeout_secs)) + } else { + None + }; + let mut session_id: Option = None; - for line in buf_reader.lines() { - let line = match line { - Ok(l) => l, - Err(_) => break, + loop { + let recv_result = match timeout_dur { + Some(dur) => line_rx.recv_timeout(dur), + None => line_rx + .recv() + .map_err(|_| std::sync::mpsc::RecvTimeoutError::Disconnected), + }; + + let line = match recv_result { + Ok(Ok(l)) => l, + Ok(Err(_)) => { + // IO error reading from PTY — treat as EOF. + break; + } + Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => { + // Reader thread exited (EOF from PTY). + break; + } + Err(std::sync::mpsc::RecvTimeoutError::Timeout) => { + slog!( + "[agent:{story_id}:{agent_name}] Inactivity timeout after \ + {inactivity_timeout_secs}s with no output. Killing process." + ); + let _ = child.kill(); + let _ = child.wait(); + return Err(format!( + "Agent inactivity timeout: no output received for {inactivity_timeout_secs}s" + )); + } }; let trimmed = line.trim(); @@ -4845,4 +5003,83 @@ theirs // The report should accurately reflect what happened. assert!(report.had_conflicts, "should report conflicts"); } + + // ── process health monitoring tests ────────────────────────────────────── + + /// Demonstrates that the PTY read-loop inactivity timeout fires when no output + /// is produced by the agent process within the configured window. + /// + /// A `HangingReader` simulates a hung agent process that never writes to the + /// PTY master. The test verifies that `recv_timeout` fires with a `Timeout` + /// error — the signal that causes `run_agent_pty_blocking` to kill the child + /// and return `Err("Agent inactivity timeout: …")`, which the error handler + /// in `start_agent` converts into `AgentStatus::Failed`. + #[test] + fn pty_inactivity_timeout_kills_hung_agent() { + struct HangingReader; + impl std::io::Read for HangingReader { + fn read(&mut self, _buf: &mut [u8]) -> std::io::Result { + std::thread::sleep(std::time::Duration::from_secs(300)); + Ok(0) + } + } + + let (line_tx, line_rx) = + std::sync::mpsc::channel::>(); + + std::thread::spawn(move || { + let buf_reader = BufReader::new(HangingReader); + for line in buf_reader.lines() { + if line_tx.send(line).is_err() { + break; + } + } + }); + + let timeout_dur = std::time::Duration::from_millis(100); + let result = line_rx.recv_timeout(timeout_dur); + + assert!( + matches!( + result, + Err(std::sync::mpsc::RecvTimeoutError::Timeout) + ), + "recv_timeout must fire when no PTY output arrives within the deadline" + ); + } + + /// Demonstrates that the background watchdog detects Running agents whose + /// backing tokio task has already finished (orphaned entries) and marks them + /// as Failed, emitting an Error event so that `wait_for_agent` unblocks. + #[tokio::test] + async fn watchdog_detects_orphaned_running_agent() { + let pool = AgentPool::new(3001); + + let handle = tokio::spawn(async {}); + tokio::time::sleep(std::time::Duration::from_millis(20)).await; + assert!(handle.is_finished(), "task should be finished before injection"); + + let tx = + pool.inject_test_agent_with_handle("orphan_story", "coder", AgentStatus::Running, handle); + let mut rx = tx.subscribe(); + + pool.run_watchdog_once(); + + { + let agents = pool.agents.lock().unwrap(); + let key = composite_key("orphan_story", "coder"); + let agent = agents.get(&key).unwrap(); + assert_eq!( + agent.status, + AgentStatus::Failed, + "watchdog must mark an orphaned Running agent as Failed" + ); + } + + let event = rx.try_recv().expect("watchdog must emit an Error event"); + assert!( + matches!(event, AgentEvent::Error { .. }), + "expected AgentEvent::Error, got: {event:?}" + ); + } } diff --git a/server/src/config.rs b/server/src/config.rs index b96c291..a9667e1 100644 --- a/server/src/config.rs +++ b/server/src/config.rs @@ -45,6 +45,11 @@ pub struct AgentConfig { pub max_budget_usd: Option, #[serde(default)] pub system_prompt: Option, + /// Inactivity timeout in seconds for the PTY read loop. + /// If no output is received within this duration, the agent process is killed + /// and marked as Failed. Default: 300 (5 minutes). Set to 0 to disable. + #[serde(default = "default_inactivity_timeout_secs")] + pub inactivity_timeout_secs: u64, } fn default_path() -> String { @@ -55,6 +60,10 @@ fn default_agent_name() -> String { "default".to_string() } +fn default_inactivity_timeout_secs() -> u64 { + 300 +} + fn default_agent_command() -> String { "claude".to_string() } @@ -90,6 +99,7 @@ impl Default for ProjectConfig { max_turns: None, max_budget_usd: None, system_prompt: None, + inactivity_timeout_secs: default_inactivity_timeout_secs(), }], } } diff --git a/server/src/llm/providers/claude_code.rs b/server/src/llm/providers/claude_code.rs index d00f194..3e81753 100644 --- a/server/src/llm/providers/claude_code.rs +++ b/server/src/llm/providers/claude_code.rs @@ -122,6 +122,7 @@ impl ClaudeCodeProvider { /// Permission handling is delegated to the MCP `prompt_permission` tool /// via `--permission-prompt-tool`. Claude Code calls the MCP tool when it /// needs user approval, and the server bridges the request to the frontend. +#[allow(clippy::too_many_arguments)] fn run_pty_session( user_message: &str, cwd: &str, diff --git a/server/src/main.rs b/server/src/main.rs index 824d9af..d3505c1 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -57,6 +57,9 @@ async fn main() -> Result<(), std::io::Error> { let port = resolve_port(); let agents = Arc::new(AgentPool::new(port)); + // Start the background watchdog that detects and cleans up orphaned Running agents. + agents.spawn_watchdog(); + // Filesystem watcher: broadcast channel for work/ pipeline changes. let (watcher_tx, _) = broadcast::channel::(1024); if let Some(ref root) = *app_state.project_root.lock().unwrap() {