fix: call auto_assign_available_work after every pipeline advance (bug 295)

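Stories got stuck in QA/merge when agents were busy at assignment time:
auto_assign_available_work was only called on some success branches, and
early returns on failure paths skipped it entirely, so nothing retried the
assignment once an agent became free. Consolidates auto_assign into a
single unconditional call at the end of run_pipeline_advance, so whenever
any agent completes, the system immediately scans for pending work and
assigns free agents.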

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dave
2026-03-19 09:53:41 +00:00
parent 28b29b55a8
commit 6c413e1fc7

View File

@@ -348,12 +348,8 @@ impl AgentPool {
// Create persistent log writer (needs resolved_name, so must be after // Create persistent log writer (needs resolved_name, so must be after
// the atomic resolution above). // the atomic resolution above).
let log_writer = match AgentLogWriter::new( let log_writer =
project_root, match AgentLogWriter::new(project_root, story_id, &resolved_name, &log_session_id) {
story_id,
&resolved_name,
&log_session_id,
) {
Ok(w) => Some(Arc::new(Mutex::new(w))), Ok(w) => Some(Arc::new(Mutex::new(w))),
Err(e) => { Err(e) => {
eprintln!( eprintln!(
@@ -420,7 +416,8 @@ impl AgentPool {
} }
let _ = tx_clone.send(event); let _ = tx_clone.send(event);
if let Ok(mut agents) = agents_ref.lock() if let Ok(mut agents) = agents_ref.lock()
&& let Some(agent) = agents.get_mut(&key_clone) { && let Some(agent) = agents.get_mut(&key_clone)
{
agent.status = AgentStatus::Failed; agent.status = AgentStatus::Failed;
} }
Self::notify_agent_state_changed(&watcher_tx_clone); Self::notify_agent_state_changed(&watcher_tx_clone);
@@ -458,7 +455,8 @@ impl AgentPool {
} }
let _ = tx_clone.send(event); let _ = tx_clone.send(event);
if let Ok(mut agents) = agents_ref.lock() if let Ok(mut agents) = agents_ref.lock()
&& let Some(agent) = agents.get_mut(&key_clone) { && let Some(agent) = agents.get_mut(&key_clone)
{
agent.status = AgentStatus::Failed; agent.status = AgentStatus::Failed;
} }
Self::notify_agent_state_changed(&watcher_tx_clone); Self::notify_agent_state_changed(&watcher_tx_clone);
@@ -528,7 +526,8 @@ impl AgentPool {
} }
let _ = tx_clone.send(event); let _ = tx_clone.send(event);
if let Ok(mut agents) = agents_ref.lock() if let Ok(mut agents) = agents_ref.lock()
&& let Some(agent) = agents.get_mut(&key_clone) { && let Some(agent) = agents.get_mut(&key_clone)
{
agent.status = AgentStatus::Failed; agent.status = AgentStatus::Failed;
} }
Self::notify_agent_state_changed(&watcher_tx_clone); Self::notify_agent_state_changed(&watcher_tx_clone);
@@ -707,8 +706,7 @@ impl AgentPool {
} }
} }
let deadline = let deadline = tokio::time::Instant::now() + std::time::Duration::from_millis(timeout_ms);
tokio::time::Instant::now() + std::time::Duration::from_millis(timeout_ms);
loop { loop {
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now()); let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
@@ -841,16 +839,12 @@ impl AgentPool {
); );
if let Err(e) = super::lifecycle::move_story_to_qa(&project_root, story_id) { if let Err(e) = super::lifecycle::move_story_to_qa(&project_root, story_id) {
slog_error!("[pipeline] Failed to move '{story_id}' to 3_qa/: {e}"); slog_error!("[pipeline] Failed to move '{story_id}' to 3_qa/: {e}");
return; } else if let Err(e) = self
}
if let Err(e) = self
.start_agent(&project_root, story_id, Some("qa"), None) .start_agent(&project_root, story_id, Some("qa"), None)
.await .await
{ {
slog_error!("[pipeline] Failed to start qa agent for '{story_id}': {e}"); slog_error!("[pipeline] Failed to start qa agent for '{story_id}': {e}");
} }
// Coder slot is now free — pick up any other unassigned work in 2_current/.
self.auto_assign_available_work(&project_root).await;
} else { } else {
slog!( slog!(
"[pipeline] Coder '{agent_name}' failed gates for '{story_id}'. Restarting." "[pipeline] Coder '{agent_name}' failed gates for '{story_id}'. Restarting."
@@ -874,7 +868,9 @@ impl AgentPool {
PipelineStage::Qa => { PipelineStage::Qa => {
if completion.gates_passed { if completion.gates_passed {
// Run coverage gate in the QA worktree before advancing to merge. // Run coverage gate in the QA worktree before advancing to merge.
let coverage_path = worktree_path.clone().unwrap_or_else(|| project_root.clone()); let coverage_path = worktree_path
.clone()
.unwrap_or_else(|| project_root.clone());
let cp = coverage_path.clone(); let cp = coverage_path.clone();
let coverage_result = let coverage_result =
tokio::task::spawn_blocking(move || super::gates::run_coverage_gate(&cp)) tokio::task::spawn_blocking(move || super::gates::run_coverage_gate(&cp))
@@ -906,33 +902,37 @@ impl AgentPool {
// Hold in 3_qa/ for human review. // Hold in 3_qa/ for human review.
let qa_dir = project_root.join(".story_kit/work/3_qa"); let qa_dir = project_root.join(".story_kit/work/3_qa");
let story_path = qa_dir.join(format!("{story_id}.md")); let story_path = qa_dir.join(format!("{story_id}.md"));
if let Err(e) = crate::io::story_metadata::write_review_hold(&story_path) { if let Err(e) =
slog_error!("[pipeline] Failed to set review_hold on '{story_id}': {e}"); crate::io::story_metadata::write_review_hold(&story_path)
{
slog_error!(
"[pipeline] Failed to set review_hold on '{story_id}': {e}"
);
} }
slog!( slog!(
"[pipeline] QA passed for '{story_id}'. \ "[pipeline] QA passed for '{story_id}'. \
Holding for human review. \ Holding for human review. \
Worktree preserved at: {worktree_path:?}" Worktree preserved at: {worktree_path:?}"
); );
// Free up the QA slot without advancing.
self.auto_assign_available_work(&project_root).await;
} else { } else {
slog!( slog!(
"[pipeline] QA passed gates and coverage for '{story_id}'. \ "[pipeline] QA passed gates and coverage for '{story_id}'. \
manual_qa: false — moving directly to merge." manual_qa: false — moving directly to merge."
); );
if let Err(e) = super::lifecycle::move_story_to_merge(&project_root, story_id) { if let Err(e) =
slog_error!("[pipeline] Failed to move '{story_id}' to 4_merge/: {e}"); super::lifecycle::move_story_to_merge(&project_root, story_id)
return; {
} slog_error!(
if let Err(e) = self "[pipeline] Failed to move '{story_id}' to 4_merge/: {e}"
);
} else if let Err(e) = self
.start_agent(&project_root, story_id, Some("mergemaster"), None) .start_agent(&project_root, story_id, Some("mergemaster"), None)
.await .await
{ {
slog_error!("[pipeline] Failed to start mergemaster for '{story_id}': {e}"); slog_error!(
"[pipeline] Failed to start mergemaster for '{story_id}': {e}"
);
} }
// QA slot is now free — pick up any other unassigned work in 3_qa/.
self.auto_assign_available_work(&project_root).await;
} }
} else { } else {
slog!( slog!(
@@ -952,9 +952,7 @@ impl AgentPool {
} }
} }
} else { } else {
slog!( slog!("[pipeline] QA failed gates for '{story_id}'. Restarting.");
"[pipeline] QA failed gates for '{story_id}'. Restarting."
);
let context = format!( let context = format!(
"\n\n---\n## Previous QA Attempt Failed\n\ "\n\n---\n## Previous QA Attempt Failed\n\
The acceptance gates failed with the following output:\n{}\n\n\ The acceptance gates failed with the following output:\n{}\n\n\
@@ -979,15 +977,14 @@ impl AgentPool {
mergemaster explicitly reported a merge failure. \ mergemaster explicitly reported a merge failure. \
Story stays in 4_merge/ for human review." Story stays in 4_merge/ for human review."
); );
return; } else {
}
// Run script/test on master (project_root) as the post-merge verification. // Run script/test on master (project_root) as the post-merge verification.
slog!( slog!(
"[pipeline] Mergemaster completed for '{story_id}'. Running post-merge tests on master." "[pipeline] Mergemaster completed for '{story_id}'. Running post-merge tests on master."
); );
let root = project_root.clone(); let root = project_root.clone();
let test_result = tokio::task::spawn_blocking(move || super::gates::run_project_tests(&root)) let test_result =
tokio::task::spawn_blocking(move || super::gates::run_project_tests(&root))
.await .await
.unwrap_or_else(|e| { .unwrap_or_else(|e| {
slog_warn!("[pipeline] Post-merge test task panicked: {e}"); slog_warn!("[pipeline] Post-merge test task panicked: {e}");
@@ -1002,12 +999,12 @@ impl AgentPool {
slog!( slog!(
"[pipeline] Post-merge tests passed for '{story_id}'. Moving to done." "[pipeline] Post-merge tests passed for '{story_id}'. Moving to done."
); );
if let Err(e) = super::lifecycle::move_story_to_archived(&project_root, story_id) { if let Err(e) =
super::lifecycle::move_story_to_archived(&project_root, story_id)
{
slog_error!("[pipeline] Failed to move '{story_id}' to done: {e}"); slog_error!("[pipeline] Failed to move '{story_id}' to done: {e}");
} }
self.remove_agents_for_story(story_id); self.remove_agents_for_story(story_id);
// Mergemaster slot is now free — pick up any other items in 4_merge/.
self.auto_assign_available_work(&project_root).await;
// TODO: Re-enable worktree cleanup once we have persistent agent logs. // TODO: Re-enable worktree cleanup once we have persistent agent logs.
// Removing worktrees destroys evidence needed to debug empty-commit agents. // Removing worktrees destroys evidence needed to debug empty-commit agents.
// let config = // let config =
@@ -1034,7 +1031,12 @@ impl AgentPool {
output output
); );
if let Err(e) = self if let Err(e) = self
.start_agent(&project_root, story_id, Some("mergemaster"), Some(&context)) .start_agent(
&project_root,
story_id,
Some("mergemaster"),
Some(&context),
)
.await .await
{ {
slog_error!( slog_error!(
@@ -1046,6 +1048,13 @@ impl AgentPool {
} }
} }
// Always scan for unassigned work after any agent completes, regardless
// of the outcome (success, failure, restart). This ensures stories that
// failed agent assignment due to busy agents are retried when agents
// become available (bug 295).
self.auto_assign_available_work(&project_root).await;
}
/// Internal: report that an agent has finished work on a story. /// Internal: report that an agent has finished work on a story.
/// ///
/// **Note:** This is no longer exposed as an MCP tool. The server now /// **Note:** This is no longer exposed as an MCP tool. The server now
@@ -1114,7 +1123,13 @@ impl AgentPool {
// Extract data for pipeline advance, then remove the entry so // Extract data for pipeline advance, then remove the entry so
// completed agents never appear in list_agents. // completed agents never appear in list_agents.
let (tx, session_id, project_root_for_advance, wt_path_for_advance, merge_failure_reported_for_advance) = { let (
tx,
session_id,
project_root_for_advance,
wt_path_for_advance,
merge_failure_reported_for_advance,
) = {
let mut agents = self.agents.lock().map_err(|e| e.to_string())?; let mut agents = self.agents.lock().map_err(|e| e.to_string())?;
let agent = agents.get_mut(&key).ok_or_else(|| { let agent = agents.get_mut(&key).ok_or_else(|| {
format!("Agent '{agent_name}' for story '{story_id}' disappeared during gate check") format!("Agent '{agent_name}' for story '{story_id}' disappeared during gate check")
@@ -1267,14 +1282,14 @@ impl AgentPool {
}); });
} }
let story_archived = super::lifecycle::move_story_to_archived(project_root, story_id).is_ok(); let story_archived =
super::lifecycle::move_story_to_archived(project_root, story_id).is_ok();
if story_archived { if story_archived {
self.remove_agents_for_story(story_id); self.remove_agents_for_story(story_id);
} }
let worktree_cleaned_up = if wt_path.exists() { let worktree_cleaned_up = if wt_path.exists() {
let config = crate::config::ProjectConfig::load(project_root) let config = crate::config::ProjectConfig::load(project_root).unwrap_or_default();
.unwrap_or_default();
worktree::remove_worktree_by_story_id(project_root, story_id, &config) worktree::remove_worktree_by_story_id(project_root, story_id, &config)
.await .await
.is_ok() .is_ok()
@@ -1306,21 +1321,14 @@ impl AgentPool {
} }
/// Get project root helper. /// Get project root helper.
pub fn get_project_root( pub fn get_project_root(&self, state: &crate::state::SessionState) -> Result<PathBuf, String> {
&self,
state: &crate::state::SessionState,
) -> Result<PathBuf, String> {
state.get_project_root() state.get_project_root()
} }
/// Get the log session ID and project root for an agent, if available. /// Get the log session ID and project root for an agent, if available.
/// ///
/// Used by MCP tools to find the persistent log file for a completed agent. /// Used by MCP tools to find the persistent log file for a completed agent.
pub fn get_log_info( pub fn get_log_info(&self, story_id: &str, agent_name: &str) -> Option<(String, PathBuf)> {
&self,
story_id: &str,
agent_name: &str,
) -> Option<(String, PathBuf)> {
let key = composite_key(story_id, agent_name); let key = composite_key(story_id, agent_name);
let agents = self.agents.lock().ok()?; let agents = self.agents.lock().ok()?;
let agent = agents.get(&key)?; let agent = agents.get(&key)?;
@@ -1364,9 +1372,7 @@ impl AgentPool {
} }
} }
Err(e) => { Err(e) => {
slog_error!( slog_error!("[pipeline] set_merge_failure_reported: could not lock agents: {e}");
"[pipeline] set_merge_failure_reported: could not lock agents: {e}"
);
} }
} }
} }
@@ -1678,9 +1684,7 @@ impl AgentPool {
continue; continue;
} }
Err(e) => { Err(e) => {
eprintln!( eprintln!("[startup:reconcile] Gate check task panicked for '{story_id}': {e}");
"[startup:reconcile] Gate check task panicked for '{story_id}': {e}"
);
let _ = progress_tx.send(ReconciliationEvent { let _ = progress_tx.send(ReconciliationEvent {
story_id: story_id.clone(), story_id: story_id.clone(),
status: "failed".to_string(), status: "failed".to_string(),
@@ -1703,9 +1707,7 @@ impl AgentPool {
continue; continue;
} }
eprintln!( eprintln!("[startup:reconcile] Gates passed for '{story_id}' (stage: {stage_dir}/).");
"[startup:reconcile] Gates passed for '{story_id}' (stage: {stage_dir}/)."
);
if stage_dir == "2_current" { if stage_dir == "2_current" {
// Coder stage → advance to QA. // Coder stage → advance to QA.
@@ -1727,16 +1729,15 @@ impl AgentPool {
} else if stage_dir == "3_qa" { } else if stage_dir == "3_qa" {
// QA stage → run coverage gate before advancing to merge. // QA stage → run coverage gate before advancing to merge.
let wt_path_for_cov = wt_path.clone(); let wt_path_for_cov = wt_path.clone();
let coverage_result = let coverage_result = tokio::task::spawn_blocking(move || {
tokio::task::spawn_blocking(move || super::gates::run_coverage_gate(&wt_path_for_cov)) super::gates::run_coverage_gate(&wt_path_for_cov)
})
.await; .await;
let (coverage_passed, coverage_output) = match coverage_result { let (coverage_passed, coverage_output) = match coverage_result {
Ok(Ok(pair)) => pair, Ok(Ok(pair)) => pair,
Ok(Err(e)) => { Ok(Err(e)) => {
eprintln!( eprintln!("[startup:reconcile] Coverage gate error for '{story_id}': {e}");
"[startup:reconcile] Coverage gate error for '{story_id}': {e}"
);
let _ = progress_tx.send(ReconciliationEvent { let _ = progress_tx.send(ReconciliationEvent {
story_id: story_id.clone(), story_id: story_id.clone(),
status: "failed".to_string(), status: "failed".to_string(),
@@ -1788,7 +1789,9 @@ impl AgentPool {
status: "review_hold".to_string(), status: "review_hold".to_string(),
message: "Passed QA — waiting for human review.".to_string(), message: "Passed QA — waiting for human review.".to_string(),
}); });
} else if let Err(e) = super::lifecycle::move_story_to_merge(project_root, story_id) { } else if let Err(e) =
super::lifecycle::move_story_to_merge(project_root, story_id)
{
eprintln!( eprintln!(
"[startup:reconcile] Failed to move '{story_id}' to 4_merge/: {e}" "[startup:reconcile] Failed to move '{story_id}' to 4_merge/: {e}"
); );
@@ -1923,17 +1926,14 @@ impl AgentPool {
/// is triggered so that free agents can pick up unassigned work. /// is triggered so that free agents can pick up unassigned work.
pub fn spawn_watchdog(pool: Arc<AgentPool>, project_root: Option<PathBuf>) { pub fn spawn_watchdog(pool: Arc<AgentPool>, project_root: Option<PathBuf>) {
tokio::spawn(async move { tokio::spawn(async move {
let mut interval = let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
tokio::time::interval(std::time::Duration::from_secs(30));
loop { loop {
interval.tick().await; interval.tick().await;
let found = check_orphaned_agents(&pool.agents); let found = check_orphaned_agents(&pool.agents);
if found > 0 if found > 0
&& let Some(ref root) = project_root && let Some(ref root) = project_root
{ {
slog!( slog!("[watchdog] {found} orphaned agent(s) detected; triggering auto-assign.");
"[watchdog] {found} orphaned agent(s) detected; triggering auto-assign."
);
pool.auto_assign_available_work(root).await; pool.auto_assign_available_work(root).await;
} }
} }
@@ -1992,7 +1992,11 @@ fn find_active_story_stage(project_root: &Path, story_id: &str) -> Option<&'stat
/// ///
/// Returns `Some(agent_name)` if the front matter specifies an agent, or `None` /// Returns `Some(agent_name)` if the front matter specifies an agent, or `None`
/// if the field is absent or the file cannot be read / parsed. /// if the field is absent or the file cannot be read / parsed.
fn read_story_front_matter_agent(project_root: &Path, stage_dir: &str, story_id: &str) -> Option<String> { fn read_story_front_matter_agent(
project_root: &Path,
stage_dir: &str,
story_id: &str,
) -> Option<String> {
use crate::io::story_metadata::parse_front_matter; use crate::io::story_metadata::parse_front_matter;
let path = project_root let path = project_root
.join(".story_kit") .join(".story_kit")
@@ -2030,10 +2034,7 @@ fn is_agent_free(agents: &HashMap<String, StoryAgent>, agent_name: &str) -> bool
} }
fn scan_stage_items(project_root: &Path, stage_dir: &str) -> Vec<String> { fn scan_stage_items(project_root: &Path, stage_dir: &str) -> Vec<String> {
let dir = project_root let dir = project_root.join(".story_kit").join("work").join(stage_dir);
.join(".story_kit")
.join("work")
.join(stage_dir);
if !dir.is_dir() { if !dir.is_dir() {
return Vec::new(); return Vec::new();
} }
@@ -2124,7 +2125,12 @@ fn check_orphaned_agents(agents: &Mutex<HashMap<String, StoryAgent>>) -> usize {
.rsplit_once(':') .rsplit_once(':')
.map(|(s, _)| s.to_string()) .map(|(s, _)| s.to_string())
.unwrap_or_else(|| key.clone()); .unwrap_or_else(|| key.clone());
return Some((key.clone(), story_id, agent.tx.clone(), agent.status.clone())); return Some((
key.clone(),
story_id,
agent.tx.clone(),
agent.status.clone(),
));
} }
None None
}) })
@@ -2440,9 +2446,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn report_completion_rejects_nonexistent_agent() { async fn report_completion_rejects_nonexistent_agent() {
let pool = AgentPool::new_test(3001); let pool = AgentPool::new_test(3001);
let result = pool let result = pool.report_completion("no_story", "no_bot", "done").await;
.report_completion("no_story", "no_bot", "done")
.await;
assert!(result.is_err()); assert!(result.is_err());
let msg = result.unwrap_err(); let msg = result.unwrap_err();
assert!(msg.contains("No agent"), "unexpected: {msg}"); assert!(msg.contains("No agent"), "unexpected: {msg}");
@@ -2518,7 +2522,14 @@ mod tests {
// Subscribe before calling so we can check if Done event was emitted. // Subscribe before calling so we can check if Done event was emitted.
let mut rx = pool.subscribe("s10", "coder-1").unwrap(); let mut rx = pool.subscribe("s10", "coder-1").unwrap();
run_server_owned_completion(&pool.agents, pool.port, "s10", "coder-1", Some("sess-1".to_string()), pool.watcher_tx.clone()) run_server_owned_completion(
&pool.agents,
pool.port,
"s10",
"coder-1",
Some("sess-1".to_string()),
pool.watcher_tx.clone(),
)
.await; .await;
// Status should remain Completed (unchanged) — no gate re-run. // Status should remain Completed (unchanged) — no gate re-run.
@@ -2527,10 +2538,7 @@ mod tests {
let agent = agents.get(&key).unwrap(); let agent = agents.get(&key).unwrap();
assert_eq!(agent.status, AgentStatus::Completed); assert_eq!(agent.status, AgentStatus::Completed);
// Summary should still be the original, not overwritten. // Summary should still be the original, not overwritten.
assert_eq!( assert_eq!(agent.completion.as_ref().unwrap().summary, "Already done");
agent.completion.as_ref().unwrap().summary,
"Already done"
);
drop(agents); drop(agents);
// No Done event should have been emitted. // No Done event should have been emitted.
@@ -2558,7 +2566,14 @@ mod tests {
let mut rx = pool.subscribe("s11", "coder-1").unwrap(); let mut rx = pool.subscribe("s11", "coder-1").unwrap();
run_server_owned_completion(&pool.agents, pool.port, "s11", "coder-1", Some("sess-2".to_string()), pool.watcher_tx.clone()) run_server_owned_completion(
&pool.agents,
pool.port,
"s11",
"coder-1",
Some("sess-2".to_string()),
pool.watcher_tx.clone(),
)
.await; .await;
// Agent entry should be removed from the map after completion. // Agent entry should be removed from the map after completion.
@@ -2601,7 +2616,14 @@ mod tests {
let mut rx = pool.subscribe("s12", "coder-1").unwrap(); let mut rx = pool.subscribe("s12", "coder-1").unwrap();
run_server_owned_completion(&pool.agents, pool.port, "s12", "coder-1", None, pool.watcher_tx.clone()) run_server_owned_completion(
&pool.agents,
pool.port,
"s12",
"coder-1",
None,
pool.watcher_tx.clone(),
)
.await; .await;
// Agent entry should be removed from the map after completion (even on failure). // Agent entry should be removed from the map after completion (even on failure).
@@ -2625,7 +2647,14 @@ mod tests {
async fn server_owned_completion_nonexistent_agent_is_noop() { async fn server_owned_completion_nonexistent_agent_is_noop() {
let pool = AgentPool::new_test(3001); let pool = AgentPool::new_test(3001);
// Should not panic or error — just silently return. // Should not panic or error — just silently return.
run_server_owned_completion(&pool.agents, pool.port, "nonexistent", "bot", None, pool.watcher_tx.clone()) run_server_owned_completion(
&pool.agents,
pool.port,
"nonexistent",
"bot",
None,
pool.watcher_tx.clone(),
)
.await; .await;
} }
@@ -2703,7 +2732,8 @@ mod tests {
// Story should have moved to 4_merge/ // Story should have moved to 4_merge/
assert!( assert!(
root.join(".story_kit/work/4_merge/51_story_test.md").exists(), root.join(".story_kit/work/4_merge/51_story_test.md")
.exists(),
"story should be in 4_merge/" "story should be in 4_merge/"
); );
assert!( assert!(
@@ -2916,12 +2946,7 @@ stage = "qa"
); );
// Should NOT appear as a coder // Should NOT appear as a coder
assert!( assert!(
!is_story_assigned_for_stage( !is_story_assigned_for_stage(&config, &agents, "42_story_foo", &PipelineStage::Coder),
&config,
&agents,
"42_story_foo",
&PipelineStage::Coder
),
"qa-2 should not be detected as a coder" "qa-2 should not be detected as a coder"
); );
} }
@@ -2967,7 +2992,11 @@ name = "coder-3"
let agents = pool.agents.lock().unwrap(); let agents = pool.agents.lock().unwrap();
let free = find_free_agent_for_stage(&config, &agents, &PipelineStage::Coder); let free = find_free_agent_for_stage(&config, &agents, &PipelineStage::Coder);
assert_eq!(free, Some("coder-2"), "coder-2 should be the first free coder"); assert_eq!(
free,
Some("coder-2"),
"coder-2 should be the first free coder"
);
} }
#[test] #[test]
@@ -3070,10 +3099,7 @@ stage = "coder"
fs::create_dir_all(&qa).unwrap(); fs::create_dir_all(&qa).unwrap();
fs::write(qa.join("11_story_test.md"), "test").unwrap(); fs::write(qa.join("11_story_test.md"), "test").unwrap();
assert_eq!( assert_eq!(find_active_story_stage(root, "11_story_test"), Some("3_qa"));
find_active_story_stage(root, "11_story_test"),
Some("3_qa")
);
} }
#[test] #[test]
@@ -3125,7 +3151,10 @@ stage = "coder"
pool.inject_test_agent("story_b", "qa", AgentStatus::Failed); pool.inject_test_agent("story_b", "qa", AgentStatus::Failed);
let found = check_orphaned_agents(&pool.agents); let found = check_orphaned_agents(&pool.agents);
assert_eq!(found, 0, "no orphans should be detected for terminal agents"); assert_eq!(
found, 0,
"no orphans should be detected for terminal agents"
);
} }
#[tokio::test] #[tokio::test]
@@ -3134,10 +3163,17 @@ stage = "coder"
let handle = tokio::spawn(async {}); let handle = tokio::spawn(async {});
tokio::time::sleep(std::time::Duration::from_millis(20)).await; tokio::time::sleep(std::time::Duration::from_millis(20)).await;
assert!(handle.is_finished(), "task should be finished before injection"); assert!(
handle.is_finished(),
"task should be finished before injection"
);
let tx = let tx = pool.inject_test_agent_with_handle(
pool.inject_test_agent_with_handle("orphan_story", "coder", AgentStatus::Running, handle); "orphan_story",
"coder",
AgentStatus::Running,
handle,
);
let mut rx = tx.subscribe(); let mut rx = tx.subscribe();
pool.run_watchdog_once(); pool.run_watchdog_once();
@@ -3170,12 +3206,7 @@ stage = "coder"
let handle = tokio::spawn(async {}); let handle = tokio::spawn(async {});
tokio::time::sleep(std::time::Duration::from_millis(20)).await; tokio::time::sleep(std::time::Duration::from_millis(20)).await;
pool.inject_test_agent_with_handle( pool.inject_test_agent_with_handle("orphan_story", "coder", AgentStatus::Running, handle);
"orphan_story",
"coder",
AgentStatus::Running,
handle,
);
// Before watchdog: agent is Running. // Before watchdog: agent is Running.
{ {
@@ -3260,11 +3291,18 @@ stage = "coder"
// Agent entries for the archived story should be gone. // Agent entries for the archived story should be gone.
let remaining = pool.list_agents().unwrap(); let remaining = pool.list_agents().unwrap();
assert_eq!(remaining.len(), 1, "only the other story's agent should remain"); assert_eq!(
remaining.len(),
1,
"only the other story's agent should remain"
);
assert_eq!(remaining[0].story_id, "61_story_other"); assert_eq!(remaining[0].story_id, "61_story_other");
// Story file should be in 5_done/ // Story file should be in 5_done/
assert!(root.join(".story_kit/work/5_done/60_story_cleanup.md").exists()); assert!(
root.join(".story_kit/work/5_done/60_story_cleanup.md")
.exists()
);
} }
// ── kill_all_children tests ──────────────────────────────────── // ── kill_all_children tests ────────────────────────────────────
@@ -3515,9 +3553,7 @@ stage = "coder"
pool.inject_test_agent("story-1", "coder-1", AgentStatus::Running); pool.inject_test_agent("story-1", "coder-1", AgentStatus::Running);
pool.inject_test_agent("story-2", "coder-2", AgentStatus::Pending); pool.inject_test_agent("story-2", "coder-2", AgentStatus::Pending);
let result = pool let result = pool.start_agent(tmp.path(), "story-3", None, None).await;
.start_agent(tmp.path(), "story-3", None, None)
.await;
assert!(result.is_err()); assert!(result.is_err());
let err = result.unwrap_err(); let err = result.unwrap_err();
assert!( assert!(
@@ -3545,18 +3581,12 @@ stage = "coder"
) )
.unwrap(); .unwrap();
// Place the story in 1_backlog/. // Place the story in 1_backlog/.
std::fs::write( std::fs::write(backlog.join("story-3.md"), "---\nname: Story 3\n---\n").unwrap();
backlog.join("story-3.md"),
"---\nname: Story 3\n---\n",
)
.unwrap();
let pool = AgentPool::new_test(3001); let pool = AgentPool::new_test(3001);
pool.inject_test_agent("story-1", "coder-1", AgentStatus::Running); pool.inject_test_agent("story-1", "coder-1", AgentStatus::Running);
let result = pool let result = pool.start_agent(tmp.path(), "story-3", None, None).await;
.start_agent(tmp.path(), "story-3", None, None)
.await;
// Should fail because all coders are busy. // Should fail because all coders are busy.
assert!(result.is_err()); assert!(result.is_err());
@@ -3597,11 +3627,7 @@ stage = "coder"
) )
.unwrap(); .unwrap();
// Place the story in 2_current/ (simulating the "queued" state). // Place the story in 2_current/ (simulating the "queued" state).
std::fs::write( std::fs::write(current.join("story-3.md"), "---\nname: Story 3\n---\n").unwrap();
current.join("story-3.md"),
"---\nname: Story 3\n---\n",
)
.unwrap();
let pool = AgentPool::new_test(3001); let pool = AgentPool::new_test(3001);
// No agents are running — coder-1 is free. // No agents are running — coder-1 is free.
@@ -3637,20 +3663,14 @@ stage = "coder"
) )
.unwrap(); .unwrap();
// Place the story directly in 2_current/. // Place the story directly in 2_current/.
std::fs::write( std::fs::write(current.join("story-5.md"), "---\nname: Story 5\n---\n").unwrap();
current.join("story-5.md"),
"---\nname: Story 5\n---\n",
)
.unwrap();
let pool = AgentPool::new_test(3001); let pool = AgentPool::new_test(3001);
// start_agent should attempt to assign coder-1 (no infra, so it will // start_agent should attempt to assign coder-1 (no infra, so it will
// fail for git reasons), but must NOT fail due to the story already // fail for git reasons), but must NOT fail due to the story already
// being in 2_current/. // being in 2_current/.
let result = pool let result = pool.start_agent(tmp.path(), "story-5", None, None).await;
.start_agent(tmp.path(), "story-5", None, None)
.await;
match result { match result {
Ok(_) => {} Ok(_) => {}
Err(e) => { Err(e) => {
@@ -3710,20 +3730,14 @@ stage = "coder"
// Write a minimal project.toml so ProjectConfig::load can find the "qa" agent. // Write a minimal project.toml so ProjectConfig::load can find the "qa" agent.
let sk_dir = root.join(".story_kit"); let sk_dir = root.join(".story_kit");
fs::create_dir_all(&sk_dir).unwrap(); fs::create_dir_all(&sk_dir).unwrap();
fs::write( fs::write(sk_dir.join("project.toml"), "[[agent]]\nname = \"qa\"\n").unwrap();
sk_dir.join("project.toml"),
"[[agent]]\nname = \"qa\"\n",
)
.unwrap();
let pool = AgentPool::new_test(3001); let pool = AgentPool::new_test(3001);
// Simulate qa already running on story-a. // Simulate qa already running on story-a.
pool.inject_test_agent("story-a", "qa", AgentStatus::Running); pool.inject_test_agent("story-a", "qa", AgentStatus::Running);
// Attempt to start qa on story-b — must be rejected. // Attempt to start qa on story-b — must be rejected.
let result = pool let result = pool.start_agent(root, "story-b", Some("qa"), None).await;
.start_agent(root, "story-b", Some("qa"), None)
.await;
assert!( assert!(
result.is_err(), result.is_err(),
@@ -3747,11 +3761,7 @@ stage = "coder"
let sk_dir = root.join(".story_kit"); let sk_dir = root.join(".story_kit");
fs::create_dir_all(&sk_dir).unwrap(); fs::create_dir_all(&sk_dir).unwrap();
fs::write( fs::write(sk_dir.join("project.toml"), "[[agent]]\nname = \"qa\"\n").unwrap();
sk_dir.join("project.toml"),
"[[agent]]\nname = \"qa\"\n",
)
.unwrap();
let pool = AgentPool::new_test(3001); let pool = AgentPool::new_test(3001);
// Previous run completed — should NOT block a new story. // Previous run completed — should NOT block a new story.
@@ -3761,9 +3771,7 @@ stage = "coder"
// NOT fail at the concurrency check. We detect the difference by inspecting // NOT fail at the concurrency check. We detect the difference by inspecting
// the error message: a concurrency rejection says "already running", while a // the error message: a concurrency rejection says "already running", while a
// later failure (missing story file, missing claude binary, etc.) says something else. // later failure (missing story file, missing claude binary, etc.) says something else.
let result = pool let result = pool.start_agent(root, "story-b", Some("qa"), None).await;
.start_agent(root, "story-b", Some("qa"), None)
.await;
if let Err(ref e) = result { if let Err(ref e) = result {
assert!( assert!(
@@ -3795,21 +3803,13 @@ stage = "coder"
// Minimal project.toml with a "qa" agent. // Minimal project.toml with a "qa" agent.
let sk_dir = root.join(".story_kit"); let sk_dir = root.join(".story_kit");
fs::create_dir_all(&sk_dir).unwrap(); fs::create_dir_all(&sk_dir).unwrap();
fs::write( fs::write(sk_dir.join("project.toml"), "[[agent]]\nname = \"qa\"\n").unwrap();
sk_dir.join("project.toml"),
"[[agent]]\nname = \"qa\"\n",
)
.unwrap();
// Create the story in upcoming so `move_story_to_current` succeeds, // Create the story in upcoming so `move_story_to_current` succeeds,
// but do NOT init a git repo — `create_worktree` will fail in the spawn. // but do NOT init a git repo — `create_worktree` will fail in the spawn.
let upcoming = root.join(".story_kit/work/1_backlog"); let upcoming = root.join(".story_kit/work/1_backlog");
fs::create_dir_all(&upcoming).unwrap(); fs::create_dir_all(&upcoming).unwrap();
fs::write( fs::write(upcoming.join("50_story_test.md"), "---\nname: Test\n---\n").unwrap();
upcoming.join("50_story_test.md"),
"---\nname: Test\n---\n",
)
.unwrap();
let pool = AgentPool::new_test(3099); let pool = AgentPool::new_test(3099);
@@ -3858,9 +3858,7 @@ stage = "coder"
let events = pool let events = pool
.drain_events("50_story_test", "qa") .drain_events("50_story_test", "qa")
.expect("drain_events should succeed"); .expect("drain_events should succeed");
let has_error_event = events let has_error_event = events.iter().any(|e| matches!(e, AgentEvent::Error { .. }));
.iter()
.any(|e| matches!(e, AgentEvent::Error { .. }));
assert!( assert!(
has_error_event, has_error_event,
"event_log must contain AgentEvent::Error after worktree creation fails" "event_log must contain AgentEvent::Error after worktree creation fails"
@@ -3880,11 +3878,7 @@ stage = "coder"
let sk_dir = root.join(".story_kit"); let sk_dir = root.join(".story_kit");
fs::create_dir_all(&sk_dir).unwrap(); fs::create_dir_all(&sk_dir).unwrap();
fs::write( fs::write(sk_dir.join("project.toml"), "[[agent]]\nname = \"qa\"\n").unwrap();
sk_dir.join("project.toml"),
"[[agent]]\nname = \"qa\"\n",
)
.unwrap();
let pool = AgentPool::new_test(3099); let pool = AgentPool::new_test(3099);
@@ -3893,9 +3887,7 @@ stage = "coder"
// Attempting to start the same agent on a different story must be // Attempting to start the same agent on a different story must be
// rejected — the Running entry must still be there. // rejected — the Running entry must still be there.
let result = pool let result = pool.start_agent(root, "story-y", Some("qa"), None).await;
.start_agent(root, "story-y", Some("qa"), None)
.await;
assert!(result.is_err()); assert!(result.is_err());
let err = result.unwrap_err(); let err = result.unwrap_err();
@@ -3920,7 +3912,11 @@ stage = "coder"
let sk_dir = root.join(".story_kit"); let sk_dir = root.join(".story_kit");
fs::create_dir_all(&sk_dir).unwrap(); fs::create_dir_all(&sk_dir).unwrap();
fs::write(sk_dir.join("project.toml"), "[[agent]]\nname = \"coder-1\"\n").unwrap(); fs::write(
sk_dir.join("project.toml"),
"[[agent]]\nname = \"coder-1\"\n",
)
.unwrap();
let pool = AgentPool::new_test(3099); let pool = AgentPool::new_test(3099);
@@ -4041,7 +4037,10 @@ stage = "coder"
.start_agent(root, "42_story_foo", Some("coder-2"), None) .start_agent(root, "42_story_foo", Some("coder-2"), None)
.await; .await;
assert!(result.is_err(), "second coder on same story must be rejected"); assert!(
result.is_err(),
"second coder on same story must be rejected"
);
let err = result.unwrap_err(); let err = result.unwrap_err();
assert!( assert!(
err.contains("same pipeline stage"), err.contains("same pipeline stage"),
@@ -4144,10 +4143,7 @@ stage = "coder"
// Exactly one call must be rejected with a stage-conflict error. // Exactly one call must be rejected with a stage-conflict error.
let stage_rejections = [&r1, &r2] let stage_rejections = [&r1, &r2]
.iter() .iter()
.filter(|r| { .filter(|r| r.as_ref().is_err_and(|e| e.contains("same pipeline stage")))
r.as_ref()
.is_err_and(|e| e.contains("same pipeline stage"))
})
.count(); .count();
assert_eq!( assert_eq!(
@@ -4359,7 +4355,9 @@ stage = "coder"
MergeJobStatus::Completed(report) => { MergeJobStatus::Completed(report) => {
assert!(!report.had_conflicts, "should have no conflicts"); assert!(!report.had_conflicts, "should have no conflicts");
assert!( assert!(
report.success || report.gate_output.contains("Failed to run") || !report.gates_passed, report.success
|| report.gate_output.contains("Failed to run")
|| !report.gates_passed,
"report should be coherent: {report:?}" "report should be coherent: {report:?}"
); );
if report.story_archived { if report.story_archived {
@@ -4454,7 +4452,9 @@ stage = "coder"
// Run the squash-merge. The failing script/test makes quality gates // Run the squash-merge. The failing script/test makes quality gates
// fail → fast-forward must NOT happen. // fail → fast-forward must NOT happen.
let result = crate::agents::merge::run_squash_merge(repo, "feature/story-142_test", "142_test").unwrap(); let result =
crate::agents::merge::run_squash_merge(repo, "feature/story-142_test", "142_test")
.unwrap();
let head_after = String::from_utf8( let head_after = String::from_utf8(
Command::new("git") Command::new("git")
@@ -4489,7 +4489,11 @@ stage = "coder"
init_git_repo(repo); init_git_repo(repo);
// Create a file on master. // Create a file on master.
fs::write(repo.join("code.rs"), "fn main() {\n println!(\"hello\");\n}\n").unwrap(); fs::write(
repo.join("code.rs"),
"fn main() {\n println!(\"hello\");\n}\n",
)
.unwrap();
Command::new("git") Command::new("git")
.args(["add", "."]) .args(["add", "."])
.current_dir(repo) .current_dir(repo)
@@ -4507,7 +4511,11 @@ stage = "coder"
.current_dir(repo) .current_dir(repo)
.output() .output()
.unwrap(); .unwrap();
fs::write(repo.join("code.rs"), "fn main() {\n println!(\"hello\");\n feature_fn();\n}\n").unwrap(); fs::write(
repo.join("code.rs"),
"fn main() {\n println!(\"hello\");\n feature_fn();\n}\n",
)
.unwrap();
Command::new("git") Command::new("git")
.args(["add", "."]) .args(["add", "."])
.current_dir(repo) .current_dir(repo)
@@ -4525,7 +4533,11 @@ stage = "coder"
.current_dir(repo) .current_dir(repo)
.output() .output()
.unwrap(); .unwrap();
fs::write(repo.join("code.rs"), "fn main() {\n println!(\"hello\");\n master_fn();\n}\n").unwrap(); fs::write(
repo.join("code.rs"),
"fn main() {\n println!(\"hello\");\n master_fn();\n}\n",
)
.unwrap();
Command::new("git") Command::new("git")
.args(["add", "."]) .args(["add", "."])
.current_dir(repo) .current_dir(repo)
@@ -4717,9 +4729,7 @@ stage = "coder"
// and the story stays in 2_current/. The important assertion is that // and the story stays in 2_current/. The important assertion is that
// reconcile ran without panicking and the story is in a consistent state. // reconcile ran without panicking and the story is in a consistent state.
let in_current = current.join("61_story_test.md").exists(); let in_current = current.join("61_story_test.md").exists();
let in_qa = root let in_qa = root.join(".story_kit/work/3_qa/61_story_test.md").exists();
.join(".story_kit/work/3_qa/61_story_test.md")
.exists();
assert!( assert!(
in_current || in_qa, in_current || in_qa,
"story should be in 2_current/ or 3_qa/ after reconciliation" "story should be in 2_current/ or 3_qa/ after reconciliation"
@@ -4746,11 +4756,7 @@ stage = "coder"
let qa_dir = tmp.path().join(".story_kit/work/3_qa"); let qa_dir = tmp.path().join(".story_kit/work/3_qa");
std::fs::create_dir_all(&qa_dir).unwrap(); std::fs::create_dir_all(&qa_dir).unwrap();
let spike_path = qa_dir.join("10_spike_research.md"); let spike_path = qa_dir.join("10_spike_research.md");
std::fs::write( std::fs::write(&spike_path, "---\nname: Research spike\n---\n# Spike\n").unwrap();
&spike_path,
"---\nname: Research spike\n---\n# Spike\n",
)
.unwrap();
assert!(!has_review_hold(tmp.path(), "3_qa", "10_spike_research")); assert!(!has_review_hold(tmp.path(), "3_qa", "10_spike_research"));
} }
@@ -4828,17 +4834,19 @@ stage = "coder"
let agents = pool.agents.lock().unwrap(); let agents = pool.agents.lock().unwrap();
// coder-1 must NOT have been assigned (wrong stage for 3_qa/). // coder-1 must NOT have been assigned (wrong stage for 3_qa/).
let coder_assigned = agents let coder_assigned = agents.values().any(|a| {
.values() a.agent_name == "coder-1"
.any(|a| a.agent_name == "coder-1" && matches!(a.status, AgentStatus::Pending | AgentStatus::Running)); && matches!(a.status, AgentStatus::Pending | AgentStatus::Running)
});
assert!( assert!(
!coder_assigned, !coder_assigned,
"coder-1 should not be assigned to a QA-stage story" "coder-1 should not be assigned to a QA-stage story"
); );
// qa-1 should have been assigned instead. // qa-1 should have been assigned instead.
let qa_assigned = agents let qa_assigned = agents.values().any(|a| {
.values() a.agent_name == "qa-1"
.any(|a| a.agent_name == "qa-1" && matches!(a.status, AgentStatus::Pending | AgentStatus::Running)); && matches!(a.status, AgentStatus::Pending | AgentStatus::Running)
});
assert!( assert!(
qa_assigned, qa_assigned,
"qa-1 should be assigned as fallback for the QA-stage story" "qa-1 should be assigned as fallback for the QA-stage story"
@@ -4873,17 +4881,19 @@ stage = "coder"
let agents = pool.agents.lock().unwrap(); let agents = pool.agents.lock().unwrap();
// coder-1 should have been picked (it matches the stage and is preferred). // coder-1 should have been picked (it matches the stage and is preferred).
let coder1_assigned = agents let coder1_assigned = agents.values().any(|a| {
.values() a.agent_name == "coder-1"
.any(|a| a.agent_name == "coder-1" && matches!(a.status, AgentStatus::Pending | AgentStatus::Running)); && matches!(a.status, AgentStatus::Pending | AgentStatus::Running)
});
assert!( assert!(
coder1_assigned, coder1_assigned,
"coder-1 should be assigned when it matches the stage and is preferred" "coder-1 should be assigned when it matches the stage and is preferred"
); );
// coder-2 must NOT be assigned (not preferred). // coder-2 must NOT be assigned (not preferred).
let coder2_assigned = agents let coder2_assigned = agents.values().any(|a| {
.values() a.agent_name == "coder-2"
.any(|a| a.agent_name == "coder-2" && matches!(a.status, AgentStatus::Pending | AgentStatus::Running)); && matches!(a.status, AgentStatus::Pending | AgentStatus::Running)
});
assert!( assert!(
!coder2_assigned, !coder2_assigned,
"coder-2 should not be assigned when coder-1 is explicitly preferred" "coder-2 should not be assigned when coder-1 is explicitly preferred"
@@ -4923,4 +4933,99 @@ stage = "coder"
"No agent should be started when no stage-appropriate agent is available" "No agent should be started when no stage-appropriate agent is available"
); );
} }
/// Bug 295: when a coder completes and QA is busy on another story,
/// the newly QA-queued story must be picked up when `run_pipeline_advance`
/// finishes for the busy QA agent's story (because auto_assign is now
/// called unconditionally at the end of pipeline advance).
///
/// The final assertion looks up the pool entry for story 293 specifically
/// (via `composite_key`), so a `qa` agent that ends up pending/running on any
/// other story cannot make the test pass by accident.
#[tokio::test]
async fn pipeline_advance_picks_up_waiting_qa_stories_after_completion() {
    use std::fs;

    let tmp = tempfile::tempdir().unwrap();
    let root = tmp.path();
    let sk = root.join(".story_kit");
    let qa_dir = sk.join("work/3_qa");
    fs::create_dir_all(&qa_dir).unwrap();

    // Configure a single QA agent.
    fs::write(
        sk.join("project.toml"),
        r#"
[[agent]]
name = "qa"
stage = "qa"
"#,
    )
    .unwrap();

    // Story 292 is in QA with QA agent running (will "complete" via
    // run_pipeline_advance below). Story 293 is in QA with NO agent —
    // simulating the "stuck" state from bug 295.
    fs::write(
        qa_dir.join("292_story_first.md"),
        "---\nname: First\nmanual_qa: true\n---\n",
    )
    .unwrap();
    fs::write(
        qa_dir.join("293_story_second.md"),
        "---\nname: Second\nmanual_qa: true\n---\n",
    )
    .unwrap();

    let pool = AgentPool::new_test(3001);

    // QA is currently running on story 292.
    pool.inject_test_agent("292_story_first", "qa", AgentStatus::Running);

    // Verify that 293 cannot get a QA agent right now (QA is busy).
    // Scoped so the lock guard is dropped before the pool is used again.
    {
        let agents = pool.agents.lock().unwrap();
        assert!(
            !is_agent_free(&agents, "qa"),
            "qa should be busy on story 292"
        );
    }

    // Simulate QA completing on story 292: remove the agent from the pool
    // (as run_server_owned_completion does) then run pipeline advance.
    {
        let mut agents = pool.agents.lock().unwrap();
        agents.remove(&composite_key("292_story_first", "qa"));
    }

    // Pipeline advance for QA with gates_passed=true will:
    // 1. Run coverage gate (will "pass" trivially in test — no script/test_coverage)
    // 2. Set review_hold on 292 (manual_qa: true)
    // 3. Call auto_assign_available_work (the fix from bug 295)
    // 4. auto_assign should find 293 in 3_qa/ with no agent and start qa on it
    pool.run_pipeline_advance(
        "292_story_first",
        "qa",
        CompletionReport {
            summary: "QA done".to_string(),
            gates_passed: true,
            gate_output: String::new(),
        },
        Some(root.to_path_buf()),
        None,
        false,
    )
    .await;

    // After pipeline advance, auto_assign should have started QA on story 293.
    // Look the entry up by its composite key rather than scanning for "any qa
    // agent": the 292 entry was removed above, and a qa agent re-assigned to
    // 292 (or to anything else) must not count as success.
    let agents = pool.agents.lock().unwrap();
    let qa_on_293 = agents
        .get(&composite_key("293_story_second", "qa"))
        .is_some_and(|a| {
            a.agent_name == "qa"
                && matches!(a.status, AgentStatus::Pending | AgentStatus::Running)
        });
    assert!(
        qa_on_293,
        "auto_assign should have started qa for story 293 after 292's QA completed, \
         but no qa agent is pending/running on 293_story_second. Pool: {:?}",
        agents
            .iter()
            .map(|(k, a)| format!("{k}: {} ({})", a.agent_name, a.status))
            .collect::<Vec<_>>()
    );
}
} }