From fb91096b09a376ede8a4f86f95499f90dffc4e8f Mon Sep 17 00:00:00 2001 From: Dave Date: Tue, 24 Feb 2026 17:28:45 +0000 Subject: [PATCH] story-kit: merge 161_bug_auto_assign_only_triggers_on_agent_completion_not_on_failure_or_periodically --- server/src/agents.rs | 96 +++++++++++++++++++++++++++++++++++++++++--- server/src/main.rs | 31 +++++++++++++- 2 files changed, 121 insertions(+), 6 deletions(-) diff --git a/server/src/agents.rs b/server/src/agents.rs index c547cc9..12cacd4 100644 --- a/server/src/agents.rs +++ b/server/src/agents.rs @@ -1658,14 +1658,24 @@ impl AgentPool { /// The watchdog runs every 30 seconds. It is a safety net for edge cases where the /// PTY read loop exits without updating the agent status (e.g. a panic in the /// spawn_blocking task, or an external SIGKILL that closes the PTY fd immediately). - pub fn spawn_watchdog(&self) { - let agents = Arc::clone(&self.agents); + /// + /// When orphaned agents are detected and a `project_root` is provided, auto-assign + /// is triggered so that free agents can pick up unassigned work. + pub fn spawn_watchdog(pool: Arc, project_root: Option) { tokio::spawn(async move { let mut interval = tokio::time::interval(std::time::Duration::from_secs(30)); loop { interval.tick().await; - check_orphaned_agents(&agents); + let found = check_orphaned_agents(&pool.agents); + if found > 0 + && let Some(ref root) = project_root + { + slog!( + "[watchdog] {found} orphaned agent(s) detected; triggering auto-assign." + ); + pool.auto_assign_available_work(root).await; + } } }); } @@ -1825,10 +1835,10 @@ fn find_free_agent_for_stage<'a>( /// without updating the agent status — for example when the process is killed /// externally and the PTY master fd returns EOF before our inactivity timeout /// fires, but some other edge case prevents the normal cleanup path from running. -fn check_orphaned_agents(agents: &Mutex>) { +fn check_orphaned_agents(agents: &Mutex>) -> usize { let mut lock = match agents.lock() { Ok(l) => l, - Err(_) => return, + Err(_) => return 0, }; // Collect orphaned entries: Running or Pending agents whose task handle is finished. @@ -1850,6 +1860,7 @@ fn check_orphaned_agents(agents: &Mutex>) { }) .collect(); + let count = orphaned.len(); for (key, story_id, tx, prev_status) in orphaned { if let Some(agent) = lock.get_mut(&key) { agent.status = AgentStatus::Failed; @@ -1865,6 +1876,7 @@ fn check_orphaned_agents(agents: &Mutex>) { }); } } + count } /// Server-owned completion: runs acceptance gates when an agent process exits @@ -5917,4 +5929,78 @@ theirs "merge workspace should be cleaned up after failure" ); } + + // ── check_orphaned_agents return value tests (bug 161) ────────────────── + + #[tokio::test] + async fn check_orphaned_agents_returns_count_of_orphaned_agents() { + let pool = AgentPool::new(3001); + + // Spawn two tasks that finish immediately. + let h1 = tokio::spawn(async {}); + let h2 = tokio::spawn(async {}); + tokio::time::sleep(std::time::Duration::from_millis(20)).await; + assert!(h1.is_finished()); + assert!(h2.is_finished()); + + pool.inject_test_agent_with_handle("story_a", "coder", AgentStatus::Running, h1); + pool.inject_test_agent_with_handle("story_b", "coder", AgentStatus::Running, h2); + + let found = check_orphaned_agents(&pool.agents); + assert_eq!(found, 2, "should detect both orphaned agents"); + } + + #[test] + fn check_orphaned_agents_returns_zero_when_no_orphans() { + let pool = AgentPool::new(3001); + // Inject agents in terminal states — not orphaned. + pool.inject_test_agent("story_a", "coder", AgentStatus::Completed); + pool.inject_test_agent("story_b", "qa", AgentStatus::Failed); + + let found = check_orphaned_agents(&pool.agents); + assert_eq!(found, 0, "no orphans should be detected for terminal agents"); + } + + #[tokio::test] + async fn watchdog_orphan_detection_returns_nonzero_enabling_auto_assign() { + // This test verifies the contract that `check_orphaned_agents` returns + // a non-zero count when orphans exist, which the watchdog uses to + // decide whether to trigger auto-assign (bug 161). + let pool = AgentPool::new(3001); + + let handle = tokio::spawn(async {}); + tokio::time::sleep(std::time::Duration::from_millis(20)).await; + + pool.inject_test_agent_with_handle( + "orphan_story", + "coder", + AgentStatus::Running, + handle, + ); + + // Before watchdog: agent is Running. + { + let agents = pool.agents.lock().unwrap(); + let key = composite_key("orphan_story", "coder"); + assert_eq!(agents.get(&key).unwrap().status, AgentStatus::Running); + } + + // Run watchdog pass — should return 1 (orphan found). + let found = check_orphaned_agents(&pool.agents); + assert_eq!( + found, 1, + "watchdog must return 1 for a single orphaned agent" + ); + + // After watchdog: agent is Failed. + { + let agents = pool.agents.lock().unwrap(); + let key = composite_key("orphan_story", "coder"); + assert_eq!( + agents.get(&key).unwrap().status, + AgentStatus::Failed, + "orphaned agent must be marked Failed" + ); + } + } } diff --git a/server/src/main.rs b/server/src/main.rs index 48be123..efbe92a 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -58,7 +58,9 @@ async fn main() -> Result<(), std::io::Error> { let agents = Arc::new(AgentPool::new(port)); // Start the background watchdog that detects and cleans up orphaned Running agents. - agents.spawn_watchdog(); + // When orphans are found, auto-assign is triggered to reassign free agents. + let watchdog_root: Option = app_state.project_root.lock().unwrap().clone(); + AgentPool::spawn_watchdog(Arc::clone(&agents), watchdog_root); // Filesystem watcher: broadcast channel for work/ pipeline changes. let (watcher_tx, _) = broadcast::channel::(1024); @@ -69,6 +71,33 @@ async fn main() -> Result<(), std::io::Error> { } } + // Subscribe to watcher events so that auto-assign triggers when a work item + // file is moved into an active pipeline stage (2_current/, 3_qa/, 4_merge/). + { + let watcher_auto_rx = watcher_tx.subscribe(); + let watcher_auto_agents = Arc::clone(&agents); + let watcher_auto_root: Option = + app_state.project_root.lock().unwrap().clone(); + if let Some(root) = watcher_auto_root { + tokio::spawn(async move { + let mut rx = watcher_auto_rx; + while let Ok(event) = rx.recv().await { + if let io::watcher::WatcherEvent::WorkItem { ref stage, .. } = event + && matches!(stage.as_str(), "2_current" | "3_qa" | "4_merge") + { + slog!( + "[auto-assign] Watcher detected work item in {stage}/; \ + triggering auto-assign." + ); + watcher_auto_agents + .auto_assign_available_work(&root) + .await; + } + } + }); + } + } + // Reconciliation progress channel: startup reconciliation → WebSocket clients. let (reconciliation_tx, _) = broadcast::channel::(64);