story-kit: merge 134_story_add_process_health_monitoring_and_timeout_to_agent_pty_sessions
This commit is contained in:
@@ -372,6 +372,12 @@ impl AgentPool {
|
||||
prompt.push_str(ctx);
|
||||
}
|
||||
|
||||
// Extract inactivity timeout from the agent config before moving config.
|
||||
let inactivity_timeout_secs = config
|
||||
.find_agent(&resolved_name)
|
||||
.map(|a| a.inactivity_timeout_secs)
|
||||
.unwrap_or(300);
|
||||
|
||||
let sid = story_id.to_string();
|
||||
let aname = resolved_name.clone();
|
||||
let tx_clone = tx.clone();
|
||||
@@ -392,6 +398,7 @@ impl AgentPool {
|
||||
match run_agent_pty_streaming(
|
||||
&sid, &aname, &command, &args, &prompt, &cwd, &tx_clone, &log_clone,
|
||||
log_writer_clone,
|
||||
inactivity_timeout_secs,
|
||||
)
|
||||
.await
|
||||
{
|
||||
@@ -1507,6 +1514,62 @@ impl AgentPool {
|
||||
);
|
||||
tx
|
||||
}
|
||||
|
||||
/// Inject a Running agent with a pre-built (possibly finished) task handle.
|
||||
/// Used by watchdog tests to simulate an orphaned agent.
|
||||
#[cfg(test)]
|
||||
pub fn inject_test_agent_with_handle(
|
||||
&self,
|
||||
story_id: &str,
|
||||
agent_name: &str,
|
||||
status: AgentStatus,
|
||||
task_handle: tokio::task::JoinHandle<()>,
|
||||
) -> broadcast::Sender<AgentEvent> {
|
||||
let (tx, _) = broadcast::channel::<AgentEvent>(64);
|
||||
let key = composite_key(story_id, agent_name);
|
||||
let mut agents = self.agents.lock().unwrap();
|
||||
agents.insert(
|
||||
key,
|
||||
StoryAgent {
|
||||
agent_name: agent_name.to_string(),
|
||||
status,
|
||||
worktree_info: None,
|
||||
session_id: None,
|
||||
tx: tx.clone(),
|
||||
task_handle: Some(task_handle),
|
||||
event_log: Arc::new(Mutex::new(Vec::new())),
|
||||
completion: None,
|
||||
project_root: None,
|
||||
log_session_id: None,
|
||||
},
|
||||
);
|
||||
tx
|
||||
}
|
||||
|
||||
/// Run a single watchdog pass synchronously (test helper).
|
||||
#[cfg(test)]
|
||||
pub fn run_watchdog_once(&self) {
|
||||
check_orphaned_agents(&self.agents);
|
||||
}
|
||||
|
||||
/// Spawn a background watchdog task that periodically checks for Running agents
|
||||
/// whose underlying task has already finished (orphaned entries). Any such agent
|
||||
/// is marked Failed and an Error event is emitted so that `wait_for_agent` unblocks.
|
||||
///
|
||||
/// The watchdog runs every 30 seconds. It is a safety net for edge cases where the
|
||||
/// PTY read loop exits without updating the agent status (e.g. a panic in the
|
||||
/// spawn_blocking task, or an external SIGKILL that closes the PTY fd immediately).
|
||||
pub fn spawn_watchdog(&self) {
|
||||
let agents = Arc::clone(&self.agents);
|
||||
tokio::spawn(async move {
|
||||
let mut interval =
|
||||
tokio::time::interval(std::time::Duration::from_secs(30));
|
||||
loop {
|
||||
interval.tick().await;
|
||||
check_orphaned_agents(&agents);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/// Return the active pipeline stage directory name for `story_id`, or `None` if the
|
||||
@@ -1588,6 +1651,54 @@ fn find_free_agent_for_stage<'a>(
|
||||
None
|
||||
}
|
||||
|
||||
/// Scan the agent pool for Running entries whose backing tokio task has already
|
||||
/// finished and mark them as Failed.
|
||||
///
|
||||
/// This handles the case where the PTY read loop or the spawned task exits
|
||||
/// without updating the agent status — for example when the process is killed
|
||||
/// externally and the PTY master fd returns EOF before our inactivity timeout
|
||||
/// fires, but some other edge case prevents the normal cleanup path from running.
|
||||
fn check_orphaned_agents(agents: &Mutex<HashMap<String, StoryAgent>>) {
|
||||
let mut lock = match agents.lock() {
|
||||
Ok(l) => l,
|
||||
Err(_) => return,
|
||||
};
|
||||
|
||||
// Collect orphaned entries: Running agents whose task handle is finished.
|
||||
let orphaned: Vec<(String, String, broadcast::Sender<AgentEvent>)> = lock
|
||||
.iter()
|
||||
.filter_map(|(key, agent)| {
|
||||
if agent.status == AgentStatus::Running
|
||||
&& let Some(handle) = &agent.task_handle
|
||||
&& handle.is_finished()
|
||||
{
|
||||
let story_id = key
|
||||
.rsplit_once(':')
|
||||
.map(|(s, _)| s.to_string())
|
||||
.unwrap_or_else(|| key.clone());
|
||||
return Some((key.clone(), story_id, agent.tx.clone()));
|
||||
}
|
||||
None
|
||||
})
|
||||
.collect();
|
||||
|
||||
for (key, story_id, tx) in orphaned {
|
||||
if let Some(agent) = lock.get_mut(&key) {
|
||||
agent.status = AgentStatus::Failed;
|
||||
slog!(
|
||||
"[watchdog] Orphaned agent '{key}': task finished but status was Running. \
|
||||
Marking Failed."
|
||||
);
|
||||
let _ = tx.send(AgentEvent::Error {
|
||||
story_id,
|
||||
agent_name: agent.agent_name.clone(),
|
||||
message: "Agent process terminated unexpectedly (watchdog detected orphan)"
|
||||
.to_string(),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Server-owned completion: runs acceptance gates when an agent process exits
|
||||
/// normally, and advances the pipeline based on results.
|
||||
///
|
||||
@@ -2629,6 +2740,7 @@ async fn run_agent_pty_streaming(
|
||||
tx: &broadcast::Sender<AgentEvent>,
|
||||
event_log: &Arc<Mutex<Vec<AgentEvent>>>,
|
||||
log_writer: Option<Arc<Mutex<AgentLogWriter>>>,
|
||||
inactivity_timeout_secs: u64,
|
||||
) -> Result<Option<String>, String> {
|
||||
let sid = story_id.to_string();
|
||||
let aname = agent_name.to_string();
|
||||
@@ -2650,6 +2762,7 @@ async fn run_agent_pty_streaming(
|
||||
&tx,
|
||||
&event_log,
|
||||
log_writer.as_deref(),
|
||||
inactivity_timeout_secs,
|
||||
)
|
||||
})
|
||||
.await
|
||||
@@ -2686,6 +2799,7 @@ fn run_agent_pty_blocking(
|
||||
tx: &broadcast::Sender<AgentEvent>,
|
||||
event_log: &Mutex<Vec<AgentEvent>>,
|
||||
log_writer: Option<&Mutex<AgentLogWriter>>,
|
||||
inactivity_timeout_secs: u64,
|
||||
) -> Result<Option<String>, String> {
|
||||
let pty_system = native_pty_system();
|
||||
|
||||
@@ -2740,13 +2854,57 @@ fn run_agent_pty_blocking(
|
||||
|
||||
drop(pair.master);
|
||||
|
||||
let buf_reader = BufReader::new(reader);
|
||||
// Spawn a reader thread to collect PTY output lines.
|
||||
// We use a channel so the main thread can apply an inactivity deadline
|
||||
// via recv_timeout: if no output arrives within the configured window
|
||||
// the process is killed and the agent is marked Failed.
|
||||
let (line_tx, line_rx) = std::sync::mpsc::channel::<std::io::Result<String>>();
|
||||
std::thread::spawn(move || {
|
||||
let buf_reader = BufReader::new(reader);
|
||||
for line in buf_reader.lines() {
|
||||
if line_tx.send(line).is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let timeout_dur = if inactivity_timeout_secs > 0 {
|
||||
Some(std::time::Duration::from_secs(inactivity_timeout_secs))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let mut session_id: Option<String> = None;
|
||||
|
||||
for line in buf_reader.lines() {
|
||||
let line = match line {
|
||||
Ok(l) => l,
|
||||
Err(_) => break,
|
||||
loop {
|
||||
let recv_result = match timeout_dur {
|
||||
Some(dur) => line_rx.recv_timeout(dur),
|
||||
None => line_rx
|
||||
.recv()
|
||||
.map_err(|_| std::sync::mpsc::RecvTimeoutError::Disconnected),
|
||||
};
|
||||
|
||||
let line = match recv_result {
|
||||
Ok(Ok(l)) => l,
|
||||
Ok(Err(_)) => {
|
||||
// IO error reading from PTY — treat as EOF.
|
||||
break;
|
||||
}
|
||||
Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
|
||||
// Reader thread exited (EOF from PTY).
|
||||
break;
|
||||
}
|
||||
Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
|
||||
slog!(
|
||||
"[agent:{story_id}:{agent_name}] Inactivity timeout after \
|
||||
{inactivity_timeout_secs}s with no output. Killing process."
|
||||
);
|
||||
let _ = child.kill();
|
||||
let _ = child.wait();
|
||||
return Err(format!(
|
||||
"Agent inactivity timeout: no output received for {inactivity_timeout_secs}s"
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
let trimmed = line.trim();
|
||||
@@ -4845,4 +5003,83 @@ theirs
|
||||
// The report should accurately reflect what happened.
|
||||
assert!(report.had_conflicts, "should report conflicts");
|
||||
}
|
||||
|
||||
// ── process health monitoring tests ──────────────────────────────────────
|
||||
|
||||
/// Demonstrates that the PTY read-loop inactivity timeout fires when no output
|
||||
/// is produced by the agent process within the configured window.
|
||||
///
|
||||
/// A `HangingReader` simulates a hung agent process that never writes to the
|
||||
/// PTY master. The test verifies that `recv_timeout` fires with a `Timeout`
|
||||
/// error — the signal that causes `run_agent_pty_blocking` to kill the child
|
||||
/// and return `Err("Agent inactivity timeout: …")`, which the error handler
|
||||
/// in `start_agent` converts into `AgentStatus::Failed`.
|
||||
#[test]
|
||||
fn pty_inactivity_timeout_kills_hung_agent() {
|
||||
struct HangingReader;
|
||||
impl std::io::Read for HangingReader {
|
||||
fn read(&mut self, _buf: &mut [u8]) -> std::io::Result<usize> {
|
||||
std::thread::sleep(std::time::Duration::from_secs(300));
|
||||
Ok(0)
|
||||
}
|
||||
}
|
||||
|
||||
let (line_tx, line_rx) =
|
||||
std::sync::mpsc::channel::<std::io::Result<String>>();
|
||||
|
||||
std::thread::spawn(move || {
|
||||
let buf_reader = BufReader::new(HangingReader);
|
||||
for line in buf_reader.lines() {
|
||||
if line_tx.send(line).is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let timeout_dur = std::time::Duration::from_millis(100);
|
||||
let result = line_rx.recv_timeout(timeout_dur);
|
||||
|
||||
assert!(
|
||||
matches!(
|
||||
result,
|
||||
Err(std::sync::mpsc::RecvTimeoutError::Timeout)
|
||||
),
|
||||
"recv_timeout must fire when no PTY output arrives within the deadline"
|
||||
);
|
||||
}
|
||||
|
||||
/// Demonstrates that the background watchdog detects Running agents whose
|
||||
/// backing tokio task has already finished (orphaned entries) and marks them
|
||||
/// as Failed, emitting an Error event so that `wait_for_agent` unblocks.
|
||||
#[tokio::test]
|
||||
async fn watchdog_detects_orphaned_running_agent() {
|
||||
let pool = AgentPool::new(3001);
|
||||
|
||||
let handle = tokio::spawn(async {});
|
||||
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
|
||||
assert!(handle.is_finished(), "task should be finished before injection");
|
||||
|
||||
let tx =
|
||||
pool.inject_test_agent_with_handle("orphan_story", "coder", AgentStatus::Running, handle);
|
||||
let mut rx = tx.subscribe();
|
||||
|
||||
pool.run_watchdog_once();
|
||||
|
||||
{
|
||||
let agents = pool.agents.lock().unwrap();
|
||||
let key = composite_key("orphan_story", "coder");
|
||||
let agent = agents.get(&key).unwrap();
|
||||
assert_eq!(
|
||||
agent.status,
|
||||
AgentStatus::Failed,
|
||||
"watchdog must mark an orphaned Running agent as Failed"
|
||||
);
|
||||
}
|
||||
|
||||
let event = rx.try_recv().expect("watchdog must emit an Error event");
|
||||
assert!(
|
||||
matches!(event, AgentEvent::Error { .. }),
|
||||
"expected AgentEvent::Error, got: {event:?}"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user