story-kit: merge 161_bug_auto_assign_only_triggers_on_agent_completion_not_on_failure_or_periodically
This commit is contained in:
@@ -1658,14 +1658,24 @@ impl AgentPool {
|
|||||||
/// The watchdog runs every 30 seconds. It is a safety net for edge cases where the
|
/// The watchdog runs every 30 seconds. It is a safety net for edge cases where the
|
||||||
/// PTY read loop exits without updating the agent status (e.g. a panic in the
|
/// PTY read loop exits without updating the agent status (e.g. a panic in the
|
||||||
/// spawn_blocking task, or an external SIGKILL that closes the PTY fd immediately).
|
/// spawn_blocking task, or an external SIGKILL that closes the PTY fd immediately).
|
||||||
pub fn spawn_watchdog(&self) {
|
///
|
||||||
let agents = Arc::clone(&self.agents);
|
/// When orphaned agents are detected and a `project_root` is provided, auto-assign
|
||||||
|
/// is triggered so that free agents can pick up unassigned work.
|
||||||
|
pub fn spawn_watchdog(pool: Arc<AgentPool>, project_root: Option<PathBuf>) {
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let mut interval =
|
let mut interval =
|
||||||
tokio::time::interval(std::time::Duration::from_secs(30));
|
tokio::time::interval(std::time::Duration::from_secs(30));
|
||||||
loop {
|
loop {
|
||||||
interval.tick().await;
|
interval.tick().await;
|
||||||
check_orphaned_agents(&agents);
|
let found = check_orphaned_agents(&pool.agents);
|
||||||
|
if found > 0
|
||||||
|
&& let Some(ref root) = project_root
|
||||||
|
{
|
||||||
|
slog!(
|
||||||
|
"[watchdog] {found} orphaned agent(s) detected; triggering auto-assign."
|
||||||
|
);
|
||||||
|
pool.auto_assign_available_work(root).await;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@@ -1825,10 +1835,10 @@ fn find_free_agent_for_stage<'a>(
|
|||||||
/// without updating the agent status — for example when the process is killed
|
/// without updating the agent status — for example when the process is killed
|
||||||
/// externally and the PTY master fd returns EOF before our inactivity timeout
|
/// externally and the PTY master fd returns EOF before our inactivity timeout
|
||||||
/// fires, but some other edge case prevents the normal cleanup path from running.
|
/// fires, but some other edge case prevents the normal cleanup path from running.
|
||||||
fn check_orphaned_agents(agents: &Mutex<HashMap<String, StoryAgent>>) {
|
fn check_orphaned_agents(agents: &Mutex<HashMap<String, StoryAgent>>) -> usize {
|
||||||
let mut lock = match agents.lock() {
|
let mut lock = match agents.lock() {
|
||||||
Ok(l) => l,
|
Ok(l) => l,
|
||||||
Err(_) => return,
|
Err(_) => return 0,
|
||||||
};
|
};
|
||||||
|
|
||||||
// Collect orphaned entries: Running or Pending agents whose task handle is finished.
|
// Collect orphaned entries: Running or Pending agents whose task handle is finished.
|
||||||
@@ -1850,6 +1860,7 @@ fn check_orphaned_agents(agents: &Mutex<HashMap<String, StoryAgent>>) {
|
|||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
|
let count = orphaned.len();
|
||||||
for (key, story_id, tx, prev_status) in orphaned {
|
for (key, story_id, tx, prev_status) in orphaned {
|
||||||
if let Some(agent) = lock.get_mut(&key) {
|
if let Some(agent) = lock.get_mut(&key) {
|
||||||
agent.status = AgentStatus::Failed;
|
agent.status = AgentStatus::Failed;
|
||||||
@@ -1865,6 +1876,7 @@ fn check_orphaned_agents(agents: &Mutex<HashMap<String, StoryAgent>>) {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
count
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Server-owned completion: runs acceptance gates when an agent process exits
|
/// Server-owned completion: runs acceptance gates when an agent process exits
|
||||||
@@ -5917,4 +5929,78 @@ theirs
|
|||||||
"merge workspace should be cleaned up after failure"
|
"merge workspace should be cleaned up after failure"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── check_orphaned_agents return value tests (bug 161) ──────────────────
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn check_orphaned_agents_returns_count_of_orphaned_agents() {
|
||||||
|
let pool = AgentPool::new(3001);
|
||||||
|
|
||||||
|
// Spawn two tasks that finish immediately.
|
||||||
|
let h1 = tokio::spawn(async {});
|
||||||
|
let h2 = tokio::spawn(async {});
|
||||||
|
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
|
||||||
|
assert!(h1.is_finished());
|
||||||
|
assert!(h2.is_finished());
|
||||||
|
|
||||||
|
pool.inject_test_agent_with_handle("story_a", "coder", AgentStatus::Running, h1);
|
||||||
|
pool.inject_test_agent_with_handle("story_b", "coder", AgentStatus::Running, h2);
|
||||||
|
|
||||||
|
let found = check_orphaned_agents(&pool.agents);
|
||||||
|
assert_eq!(found, 2, "should detect both orphaned agents");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn check_orphaned_agents_returns_zero_when_no_orphans() {
|
||||||
|
let pool = AgentPool::new(3001);
|
||||||
|
// Inject agents in terminal states — not orphaned.
|
||||||
|
pool.inject_test_agent("story_a", "coder", AgentStatus::Completed);
|
||||||
|
pool.inject_test_agent("story_b", "qa", AgentStatus::Failed);
|
||||||
|
|
||||||
|
let found = check_orphaned_agents(&pool.agents);
|
||||||
|
assert_eq!(found, 0, "no orphans should be detected for terminal agents");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn watchdog_orphan_detection_returns_nonzero_enabling_auto_assign() {
|
||||||
|
// This test verifies the contract that `check_orphaned_agents` returns
|
||||||
|
// a non-zero count when orphans exist, which the watchdog uses to
|
||||||
|
// decide whether to trigger auto-assign (bug 161).
|
||||||
|
let pool = AgentPool::new(3001);
|
||||||
|
|
||||||
|
let handle = tokio::spawn(async {});
|
||||||
|
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
|
||||||
|
|
||||||
|
pool.inject_test_agent_with_handle(
|
||||||
|
"orphan_story",
|
||||||
|
"coder",
|
||||||
|
AgentStatus::Running,
|
||||||
|
handle,
|
||||||
|
);
|
||||||
|
|
||||||
|
// Before watchdog: agent is Running.
|
||||||
|
{
|
||||||
|
let agents = pool.agents.lock().unwrap();
|
||||||
|
let key = composite_key("orphan_story", "coder");
|
||||||
|
assert_eq!(agents.get(&key).unwrap().status, AgentStatus::Running);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run watchdog pass — should return 1 (orphan found).
|
||||||
|
let found = check_orphaned_agents(&pool.agents);
|
||||||
|
assert_eq!(
|
||||||
|
found, 1,
|
||||||
|
"watchdog must return 1 for a single orphaned agent"
|
||||||
|
);
|
||||||
|
|
||||||
|
// After watchdog: agent is Failed.
|
||||||
|
{
|
||||||
|
let agents = pool.agents.lock().unwrap();
|
||||||
|
let key = composite_key("orphan_story", "coder");
|
||||||
|
assert_eq!(
|
||||||
|
agents.get(&key).unwrap().status,
|
||||||
|
AgentStatus::Failed,
|
||||||
|
"orphaned agent must be marked Failed"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -58,7 +58,9 @@ async fn main() -> Result<(), std::io::Error> {
|
|||||||
let agents = Arc::new(AgentPool::new(port));
|
let agents = Arc::new(AgentPool::new(port));
|
||||||
|
|
||||||
// Start the background watchdog that detects and cleans up orphaned Running agents.
|
// Start the background watchdog that detects and cleans up orphaned Running agents.
|
||||||
agents.spawn_watchdog();
|
// When orphans are found, auto-assign is triggered to reassign free agents.
|
||||||
|
let watchdog_root: Option<PathBuf> = app_state.project_root.lock().unwrap().clone();
|
||||||
|
AgentPool::spawn_watchdog(Arc::clone(&agents), watchdog_root);
|
||||||
|
|
||||||
// Filesystem watcher: broadcast channel for work/ pipeline changes.
|
// Filesystem watcher: broadcast channel for work/ pipeline changes.
|
||||||
let (watcher_tx, _) = broadcast::channel::<io::watcher::WatcherEvent>(1024);
|
let (watcher_tx, _) = broadcast::channel::<io::watcher::WatcherEvent>(1024);
|
||||||
@@ -69,6 +71,33 @@ async fn main() -> Result<(), std::io::Error> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Subscribe to watcher events so that auto-assign triggers when a work item
|
||||||
|
// file is moved into an active pipeline stage (2_current/, 3_qa/, 4_merge/).
|
||||||
|
{
|
||||||
|
let watcher_auto_rx = watcher_tx.subscribe();
|
||||||
|
let watcher_auto_agents = Arc::clone(&agents);
|
||||||
|
let watcher_auto_root: Option<PathBuf> =
|
||||||
|
app_state.project_root.lock().unwrap().clone();
|
||||||
|
if let Some(root) = watcher_auto_root {
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let mut rx = watcher_auto_rx;
|
||||||
|
while let Ok(event) = rx.recv().await {
|
||||||
|
if let io::watcher::WatcherEvent::WorkItem { ref stage, .. } = event
|
||||||
|
&& matches!(stage.as_str(), "2_current" | "3_qa" | "4_merge")
|
||||||
|
{
|
||||||
|
slog!(
|
||||||
|
"[auto-assign] Watcher detected work item in {stage}/; \
|
||||||
|
triggering auto-assign."
|
||||||
|
);
|
||||||
|
watcher_auto_agents
|
||||||
|
.auto_assign_available_work(&root)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Reconciliation progress channel: startup reconciliation → WebSocket clients.
|
// Reconciliation progress channel: startup reconciliation → WebSocket clients.
|
||||||
let (reconciliation_tx, _) =
|
let (reconciliation_tx, _) =
|
||||||
broadcast::channel::<agents::ReconciliationEvent>(64);
|
broadcast::channel::<agents::ReconciliationEvent>(64);
|
||||||
|
|||||||
Reference in New Issue
Block a user