use crate::config::ProjectConfig;
use crate::slog;
use crate::slog_error;
use crate::slog_warn;
use crate::io::watcher::WatcherEvent;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use tokio::sync::broadcast;

use super::super::super::{
    CompletionReport, PipelineStage, agent_config_stage, pipeline_stage,
};
use super::super::{AgentPool, StoryAgent};

impl AgentPool {
    /// Pipeline advancement: after an agent completes, move the story to
    /// the next pipeline stage and start the appropriate agent.
    ///
    /// Stage flow (directories under `.storkit/work/`):
    /// `2_current/` (coder) → `3_qa/` (qa) → `4_merge/` (mergemaster) → done.
    pub(super) async fn run_pipeline_advance(
        &self,
        story_id: &str,
        agent_name: &str,
        completion: CompletionReport,
        project_root: Option<PathBuf>,
        worktree_path: Option<PathBuf>,
        merge_failure_reported: bool,
    ) {
        let project_root = match project_root {
            Some(p) => p,
            None => {
                slog_warn!("[pipeline] No project_root for '{story_id}:{agent_name}'");
                return;
            }
        };
        let config = ProjectConfig::load(&project_root).unwrap_or_default();
        let stage = config
            .find_agent(agent_name)
            .map(agent_config_stage)
            .unwrap_or_else(|| pipeline_stage(agent_name));
        match stage {
            PipelineStage::Other => {
                // Supervisors and unknown agents do not advance the pipeline.
            }
            PipelineStage::Coder => {
                if completion.gates_passed {
                    // Determine effective QA mode for this story.
                    let qa_mode = {
                        let item_type = crate::agents::lifecycle::item_type_from_id(story_id);
                        if item_type == "spike" {
                            crate::io::story_metadata::QaMode::Human
                        } else {
                            let default_qa = config.default_qa_mode();
                            // Story is in 2_current/ when a coder completes.
                            let story_path = project_root
                                .join(".storkit/work/2_current")
                                .join(format!("{story_id}.md"));
                            crate::io::story_metadata::resolve_qa_mode(&story_path, default_qa)
                        }
                    };
                    match qa_mode {
                        crate::io::story_metadata::QaMode::Server => {
                            slog!(
                                "[pipeline] Coder '{agent_name}' passed gates for '{story_id}'. \
                                 qa: server — moving directly to merge."
                            );
                            if let Err(e) = crate::agents::lifecycle::move_story_to_merge(
                                &project_root,
                                story_id,
                            ) {
                                slog_error!(
                                    "[pipeline] Failed to move '{story_id}' to 4_merge/: {e}"
                                );
                            } else if let Err(e) = self
                                .start_agent(&project_root, story_id, Some("mergemaster"), None)
                                .await
                            {
                                slog_error!(
                                    "[pipeline] Failed to start mergemaster for '{story_id}': {e}"
                                );
                            }
                        }
                        crate::io::story_metadata::QaMode::Agent => {
                            slog!(
                                "[pipeline] Coder '{agent_name}' passed gates for '{story_id}'. \
                                 qa: agent — moving to QA."
                            );
                            if let Err(e) =
                                crate::agents::lifecycle::move_story_to_qa(&project_root, story_id)
                            {
                                slog_error!("[pipeline] Failed to move '{story_id}' to 3_qa/: {e}");
                            } else if let Err(e) = self
                                .start_agent(&project_root, story_id, Some("qa"), None)
                                .await
                            {
                                slog_error!(
                                    "[pipeline] Failed to start qa agent for '{story_id}': {e}"
                                );
                            }
                        }
                        crate::io::story_metadata::QaMode::Human => {
                            slog!(
                                "[pipeline] Coder '{agent_name}' passed gates for '{story_id}'. \
                                 qa: human — holding for human review."
                            );
                            if let Err(e) =
                                crate::agents::lifecycle::move_story_to_qa(&project_root, story_id)
                            {
                                slog_error!("[pipeline] Failed to move '{story_id}' to 3_qa/: {e}");
                            } else {
                                let qa_dir = project_root.join(".storkit/work/3_qa");
                                let story_path = qa_dir.join(format!("{story_id}.md"));
                                if let Err(e) =
                                    crate::io::story_metadata::write_review_hold(&story_path)
                                {
                                    slog_error!(
                                        "[pipeline] Failed to set review_hold on '{story_id}': {e}"
                                    );
                                }
                            }
                        }
                    }
                } else {
                    // Increment retry count and check if blocked.
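                    // should_block_story (defined below) bumps retry_count in the
                    // story's frontmatter; once the limit is reached it marks the
                    // story blocked and the agent is not restarted.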
                    let story_path = project_root
                        .join(".storkit/work/2_current")
                        .join(format!("{story_id}.md"));
                    if let Some(reason) =
                        should_block_story(&story_path, config.max_retries, story_id, "coder")
                    {
                        // Story has exceeded retry limit — do not restart.
                        let _ = self.watcher_tx.send(WatcherEvent::StoryBlocked {
                            story_id: story_id.to_string(),
                            reason,
                        });
                    } else {
                        slog!(
                            "[pipeline] Coder '{agent_name}' failed gates for '{story_id}'. Restarting."
                        );
                        let context = format!(
                            "\n\n---\n## Previous Attempt Failed\n\
                             The acceptance gates failed with the following output:\n{}\n\n\
                             Please review the failures above, fix the issues, and try again.",
                            completion.gate_output
                        );
                        if let Err(e) = self
                            .start_agent(&project_root, story_id, Some(agent_name), Some(&context))
                            .await
                        {
                            slog_error!(
                                "[pipeline] Failed to restart coder '{agent_name}' for '{story_id}': {e}"
                            );
                        }
                    }
                }
            }
            PipelineStage::Qa => {
                if completion.gates_passed {
                    // Run coverage gate in the QA worktree before advancing to merge.
                    let coverage_path = worktree_path
                        .clone()
                        .unwrap_or_else(|| project_root.clone());
                    let cp = coverage_path.clone();
                    let coverage_result = tokio::task::spawn_blocking(move || {
                        crate::agents::gates::run_coverage_gate(&cp)
                    })
                    .await
                    .unwrap_or_else(|e| {
                        slog_warn!("[pipeline] Coverage gate task panicked: {e}");
                        Ok((false, format!("Coverage gate task panicked: {e}")))
                    });
                    let (coverage_passed, coverage_output) = match coverage_result {
                        Ok(pair) => pair,
                        Err(e) => (false, e),
                    };
                    if coverage_passed {
                        // Check whether this item needs human review before merging.
                        let needs_human_review = {
                            let item_type = crate::agents::lifecycle::item_type_from_id(story_id);
                            if item_type == "spike" {
                                true // Spikes always need human review.
                            } else {
                                let qa_dir = project_root.join(".storkit/work/3_qa");
                                let story_path = qa_dir.join(format!("{story_id}.md"));
                                let default_qa = config.default_qa_mode();
                                matches!(
                                    crate::io::story_metadata::resolve_qa_mode(&story_path, default_qa),
                                    crate::io::story_metadata::QaMode::Human
                                )
                            }
                        };
                        if needs_human_review {
                            // Hold in 3_qa/ for human review.
                            let qa_dir = project_root.join(".storkit/work/3_qa");
                            let story_path = qa_dir.join(format!("{story_id}.md"));
                            if let Err(e) =
                                crate::io::story_metadata::write_review_hold(&story_path)
                            {
                                slog_error!(
                                    "[pipeline] Failed to set review_hold on '{story_id}': {e}"
                                );
                            }
                            slog!(
                                "[pipeline] QA passed for '{story_id}'. \
                                 Holding for human review. \
                                 Worktree preserved at: {worktree_path:?}"
                            );
                        } else {
                            slog!(
                                "[pipeline] QA passed gates and coverage for '{story_id}'. \
                                 Moving directly to merge."
                            );
                            if let Err(e) = crate::agents::lifecycle::move_story_to_merge(
                                &project_root,
                                story_id,
                            ) {
                                slog_error!(
                                    "[pipeline] Failed to move '{story_id}' to 4_merge/: {e}"
                                );
                            } else if let Err(e) = self
                                .start_agent(&project_root, story_id, Some("mergemaster"), None)
                                .await
                            {
                                slog_error!(
                                    "[pipeline] Failed to start mergemaster for '{story_id}': {e}"
                                );
                            }
                        }
                    } else {
                        let story_path = project_root
                            .join(".storkit/work/3_qa")
                            .join(format!("{story_id}.md"));
                        if let Some(reason) = should_block_story(
                            &story_path,
                            config.max_retries,
                            story_id,
                            "qa-coverage",
                        ) {
                            // Story has exceeded retry limit — do not restart.
                            let _ = self.watcher_tx.send(WatcherEvent::StoryBlocked {
                                story_id: story_id.to_string(),
                                reason,
                            });
                        } else {
                            slog!(
                                "[pipeline] QA coverage gate failed for '{story_id}'. Restarting QA."
                            );
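                            // The failing coverage output is appended to the QA
                            // agent's prompt below so the retry can target the
                            // coverage gaps directly.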
                            let context = format!(
                                "\n\n---\n## Coverage Gate Failed\n\
                                 The coverage gate (script/test_coverage) failed with the following output:\n{}\n\n\
                                 Please improve test coverage until the coverage gate passes.",
                                coverage_output
                            );
                            if let Err(e) = self
                                .start_agent(&project_root, story_id, Some("qa"), Some(&context))
                                .await
                            {
                                slog_error!("[pipeline] Failed to restart qa for '{story_id}': {e}");
                            }
                        }
                    }
                } else {
                    let story_path = project_root
                        .join(".storkit/work/3_qa")
                        .join(format!("{story_id}.md"));
                    if let Some(reason) =
                        should_block_story(&story_path, config.max_retries, story_id, "qa")
                    {
                        // Story has exceeded retry limit — do not restart.
                        let _ = self.watcher_tx.send(WatcherEvent::StoryBlocked {
                            story_id: story_id.to_string(),
                            reason,
                        });
                    } else {
                        slog!("[pipeline] QA failed gates for '{story_id}'. Restarting.");
                        let context = format!(
                            "\n\n---\n## Previous QA Attempt Failed\n\
                             The acceptance gates failed with the following output:\n{}\n\n\
                             Please re-run and fix the issues.",
                            completion.gate_output
                        );
                        if let Err(e) = self
                            .start_agent(&project_root, story_id, Some("qa"), Some(&context))
                            .await
                        {
                            slog_error!("[pipeline] Failed to restart qa for '{story_id}': {e}");
                        }
                    }
                }
            }
            PipelineStage::Mergemaster => {
                // Block advancement if the mergemaster explicitly reported a failure.
                // The server-owned gate check runs in the feature-branch worktree (not
                // master), so `gates_passed=true` is misleading when no code was merged.
                if merge_failure_reported {
                    slog!(
                        "[pipeline] Pipeline advancement blocked for '{story_id}': \
                         mergemaster explicitly reported a merge failure. \
                         Story stays in 4_merge/ for human review."
                    );
                } else {
                    // Run script/test on master (project_root) as the post-merge verification.
                    slog!(
                        "[pipeline] Mergemaster completed for '{story_id}'. Running post-merge tests on master."
                    );
                    let root = project_root.clone();
                    let test_result = tokio::task::spawn_blocking(move || {
                        crate::agents::gates::run_project_tests(&root)
                    })
                    .await
                    .unwrap_or_else(|e| {
                        slog_warn!("[pipeline] Post-merge test task panicked: {e}");
                        Ok((false, format!("Test task panicked: {e}")))
                    });
                    let (passed, output) = match test_result {
                        Ok(pair) => pair,
                        Err(e) => (false, e),
                    };
                    if passed {
                        slog!(
                            "[pipeline] Post-merge tests passed for '{story_id}'. Moving to done."
                        );
                        if let Err(e) =
                            crate::agents::lifecycle::move_story_to_done(&project_root, story_id)
                        {
                            slog_error!("[pipeline] Failed to move '{story_id}' to done: {e}");
                        }
                        self.remove_agents_for_story(story_id);
                        // TODO: Re-enable worktree cleanup once we have persistent agent logs.
                        // Removing worktrees destroys evidence needed to debug empty-commit agents.
                        // let config =
                        //     crate::config::ProjectConfig::load(&project_root).unwrap_or_default();
                        // if let Err(e) =
                        //     worktree::remove_worktree_by_story_id(&project_root, story_id, &config)
                        //         .await
                        // {
                        //     slog!(
                        //         "[pipeline] Failed to remove worktree for '{story_id}': {e}"
                        //     );
                        // }
                        slog!(
                            "[pipeline] Story '{story_id}' done. Worktree preserved for inspection."
                        );
                    } else {
                        let story_path = project_root
                            .join(".storkit/work/4_merge")
                            .join(format!("{story_id}.md"));
                        if let Some(reason) = should_block_story(
                            &story_path,
                            config.max_retries,
                            story_id,
                            "mergemaster",
                        ) {
                            // Story has exceeded retry limit — do not restart.
                            let _ = self.watcher_tx.send(WatcherEvent::StoryBlocked {
                                story_id: story_id.to_string(),
                                reason,
                            });
                        } else {
                            slog!(
                                "[pipeline] Post-merge tests failed for '{story_id}'. Restarting mergemaster."
                            );
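                            // Same pattern as the coder and QA retries: the
                            // failing master test output becomes extra prompt
                            // context for the restarted mergemaster.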
                            let context = format!(
                                "\n\n---\n## Post-Merge Test Failed\n\
                                 The tests on master failed with the following output:\n{}\n\n\
                                 Please investigate and resolve the failures, then call merge_agent_work again.",
                                output
                            );
                            if let Err(e) = self
                                .start_agent(
                                    &project_root,
                                    story_id,
                                    Some("mergemaster"),
                                    Some(&context),
                                )
                                .await
                            {
                                slog_error!(
                                    "[pipeline] Failed to restart mergemaster for '{story_id}': {e}"
                                );
                            }
                        }
                    }
                }
            }
        }

        // Always scan for unassigned work after any agent completes, regardless
        // of the outcome (success, failure, restart). This ensures stories that
        // failed agent assignment due to busy agents are retried when agents
        // become available (bug 295).
        self.auto_assign_available_work(&project_root).await;
    }
}

/// Spawn pipeline advancement as a background task.
///
/// This is a **non-async** function so it does not participate in the opaque
/// type cycle between `start_agent` and `run_server_owned_completion`.
#[allow(clippy::too_many_arguments)]
pub(super) fn spawn_pipeline_advance(
    agents: Arc<Mutex<HashMap<String, StoryAgent>>>,
    port: u16,
    story_id: &str,
    agent_name: &str,
    completion: CompletionReport,
    project_root: Option<PathBuf>,
    worktree_path: Option<PathBuf>,
    watcher_tx: broadcast::Sender<WatcherEvent>,
    merge_failure_reported: bool,
) {
    let sid = story_id.to_string();
    let aname = agent_name.to_string();
    tokio::spawn(async move {
        let pool = AgentPool {
            agents,
            port,
            child_killers: Arc::new(Mutex::new(HashMap::new())),
            watcher_tx,
            merge_jobs: Arc::new(Mutex::new(HashMap::new())),
        };
        pool.run_pipeline_advance(
            &sid,
            &aname,
            completion,
            project_root,
            worktree_path,
            merge_failure_reported,
        )
        .await;
    });
}

/// Increment retry_count and block the story if it exceeds `max_retries`.
///
/// Returns `Some(reason)` if the story is now blocked (caller should NOT
/// restart the agent). Returns `None` if the story may be retried.
/// When `max_retries` is 0, retry limits are disabled.
fn should_block_story(
    story_path: &Path,
    max_retries: u32,
    story_id: &str,
    stage_label: &str,
) -> Option<String> {
    use crate::io::story_metadata::{increment_retry_count, write_blocked};
    if max_retries == 0 {
        // Retry limits disabled.
        return None;
    }
    match increment_retry_count(story_path) {
        Ok(new_count) => {
            if new_count >= max_retries {
                slog_warn!(
                    "[pipeline] Story '{story_id}' reached retry limit ({new_count}/{max_retries}) \
                     at {stage_label} stage. Marking as blocked."
                );
                if let Err(e) = write_blocked(story_path) {
                    slog_error!("[pipeline] Failed to write blocked flag for '{story_id}': {e}");
                }
                Some(format!(
                    "Retry limit exceeded ({new_count}/{max_retries}) at {stage_label} stage"
                ))
            } else {
                slog!(
                    "[pipeline] Story '{story_id}' retry {new_count}/{max_retries} at {stage_label} stage."
                );
                None
            }
        }
        Err(e) => {
            slog_error!("[pipeline] Failed to increment retry_count for '{story_id}': {e}");
            None // Don't block on error — allow retry.
        }
    }
}
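
// A sketch of the story frontmatter fields the pipeline touches, inferred from
// the helpers used above (the authoritative definitions live in
// crate::io::story_metadata):
//
//   retry_count: incremented by `increment_retry_count` on each gate failure
//   blocked: written by `write_blocked` once retry_count reaches max_retries
//   review_hold: written by `write_review_hold` when a `qa: human` story (or
//                any spike) is held in 3_qa/ for human review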

#[cfg(test)]
mod tests {
    use super::super::super::AgentPool;
    use super::super::super::composite_key;
    use crate::agents::{AgentStatus, CompletionReport};
    use crate::io::watcher::WatcherEvent;

    // ── pipeline advance tests ────────────────────────────────────────────

    #[tokio::test]
    async fn pipeline_advance_coder_gates_pass_server_qa_moves_to_merge() {
        use std::fs;
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();

        // Set up story in 2_current/ (no qa frontmatter → uses project default "server")
        let current = root.join(".storkit/work/2_current");
        fs::create_dir_all(&current).unwrap();
        fs::write(current.join("50_story_test.md"), "test").unwrap();

        let pool = AgentPool::new_test(3001);
        pool.run_pipeline_advance(
            "50_story_test",
            "coder-1",
            CompletionReport {
                summary: "done".to_string(),
                gates_passed: true,
                gate_output: String::new(),
            },
            Some(root.to_path_buf()),
            None,
            false,
        )
        .await;

        // With default qa: server, story skips QA and goes straight to 4_merge/
        assert!(
            root.join(".storkit/work/4_merge/50_story_test.md").exists(),
            "story should be in 4_merge/"
        );
        assert!(
            !current.join("50_story_test.md").exists(),
            "story should not still be in 2_current/"
        );
    }

    #[tokio::test]
    async fn pipeline_advance_coder_gates_pass_agent_qa_moves_to_qa() {
        use std::fs;
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();

        // Set up story in 2_current/ with qa: agent frontmatter
        let current = root.join(".storkit/work/2_current");
        fs::create_dir_all(&current).unwrap();
        fs::write(
            current.join("50_story_test.md"),
            "---\nname: Test\nqa: agent\n---\ntest",
        )
        .unwrap();

        let pool = AgentPool::new_test(3001);
        pool.run_pipeline_advance(
            "50_story_test",
            "coder-1",
            CompletionReport {
                summary: "done".to_string(),
                gates_passed: true,
                gate_output: String::new(),
            },
            Some(root.to_path_buf()),
            None,
            false,
        )
        .await;

        // With qa: agent, story should move to 3_qa/
        assert!(
            root.join(".storkit/work/3_qa/50_story_test.md").exists(),
            "story should be in 3_qa/"
        );
        assert!(
            !current.join("50_story_test.md").exists(),
            "story should not still be in 2_current/"
        );
    }

    #[tokio::test]
    async fn pipeline_advance_qa_gates_pass_moves_story_to_merge() {
        use std::fs;
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();

        // Set up story in 3_qa/ with qa: server so the story skips human
        // review and goes straight to merge.
        let qa_dir = root.join(".storkit/work/3_qa");
        fs::create_dir_all(&qa_dir).unwrap();
        fs::write(
            qa_dir.join("51_story_test.md"),
            "---\nname: Test\nqa: server\n---\ntest",
        )
        .unwrap();

        let pool = AgentPool::new_test(3001);
        pool.run_pipeline_advance(
            "51_story_test",
            "qa",
            CompletionReport {
                summary: "QA done".to_string(),
                gates_passed: true,
                gate_output: String::new(),
            },
            Some(root.to_path_buf()),
            None,
            false,
        )
        .await;

        // Story should have moved to 4_merge/
        assert!(
            root.join(".storkit/work/4_merge/51_story_test.md").exists(),
            "story should be in 4_merge/"
        );
        assert!(
            !qa_dir.join("51_story_test.md").exists(),
            "story should not still be in 3_qa/"
        );
    }

    #[tokio::test]
    async fn pipeline_advance_supervisor_does_not_advance() {
        use std::fs;
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        let current = root.join(".storkit/work/2_current");
        fs::create_dir_all(&current).unwrap();
        fs::write(current.join("52_story_test.md"), "test").unwrap();

        let pool = AgentPool::new_test(3001);
        pool.run_pipeline_advance(
            "52_story_test",
            "supervisor",
            CompletionReport {
                summary: "supervised".to_string(),
                gates_passed: true,
                gate_output: String::new(),
            },
            Some(root.to_path_buf()),
            None,
            false,
        )
        .await;

        // Story should NOT have moved (supervisors don't advance pipeline)
        assert!(
            current.join("52_story_test.md").exists(),
            "story should still be in 2_current/ for supervisor"
        );
    }

    #[tokio::test]
    async fn pipeline_advance_sends_agent_state_changed_to_watcher_tx() {
        use std::fs;
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();

        // Set up story in 2_current/
        let current = root.join(".storkit/work/2_current");
        fs::create_dir_all(&current).unwrap();
        fs::write(current.join("173_story_test.md"), "test").unwrap();
        // Ensure 3_qa/ exists for the move target
        fs::create_dir_all(root.join(".storkit/work/3_qa")).unwrap();
        // Ensure 1_backlog/ exists (start_agent calls move_story_to_current)
        fs::create_dir_all(root.join(".storkit/work/1_backlog")).unwrap();
        // Write a project.toml with a qa agent so start_agent can resolve it.
        fs::create_dir_all(root.join(".storkit")).unwrap();
        fs::write(
            root.join(".storkit/project.toml"),
            r#"
default_qa = "agent"

[[agent]]
name = "coder-1"
role = "Coder"
command = "echo"
args = ["noop"]
prompt = "test"
stage = "coder"

[[agent]]
name = "qa"
role = "QA"
command = "echo"
args = ["noop"]
prompt = "test"
stage = "qa"
"#,
        )
        .unwrap();

        let pool = AgentPool::new_test(3001);
        // Subscribe to the watcher channel BEFORE the pipeline advance.
        let mut rx = pool.watcher_tx.subscribe();
        pool.run_pipeline_advance(
            "173_story_test",
            "coder-1",
            CompletionReport {
                summary: "done".to_string(),
                gates_passed: true,
                gate_output: String::new(),
            },
            Some(root.to_path_buf()),
            None,
            false,
        )
        .await;

        // The pipeline advance should have sent AgentStateChanged events via
        // the pool's watcher_tx (not a dummy channel). Collect all events.
        let mut got_agent_state_changed = false;
        while let Ok(evt) = rx.try_recv() {
            if matches!(evt, WatcherEvent::AgentStateChanged) {
                got_agent_state_changed = true;
                break;
            }
        }
        assert!(
            got_agent_state_changed,
            "pipeline advance should send AgentStateChanged through the real watcher_tx \
             (bug 173: lozenges must update when agents are assigned during pipeline advance)"
        );
    }

    // ── bug 295: pipeline advance picks up waiting QA stories ──────────

    #[tokio::test]
    async fn pipeline_advance_picks_up_waiting_qa_stories_after_completion() {
        use std::fs;
        use super::super::super::auto_assign::is_agent_free;

        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        let sk = root.join(".storkit");
        let qa_dir = sk.join("work/3_qa");
        fs::create_dir_all(&qa_dir).unwrap();

        // Configure a single QA agent.
        fs::write(
            sk.join("project.toml"),
            r#"
[[agent]]
name = "qa"
stage = "qa"
"#,
        )
        .unwrap();

        // Story 292 is in QA with QA agent running (will "complete" via
        // run_pipeline_advance below). Story 293 is in QA with NO agent —
        // simulating the "stuck" state from bug 295.
        fs::write(
            qa_dir.join("292_story_first.md"),
            "---\nname: First\nqa: human\n---\n",
        )
        .unwrap();
        fs::write(
            qa_dir.join("293_story_second.md"),
            "---\nname: Second\nqa: human\n---\n",
        )
        .unwrap();

        let pool = AgentPool::new_test(3001);
        // QA is currently running on story 292.
        pool.inject_test_agent("292_story_first", "qa", AgentStatus::Running);

        // Verify that 293 cannot get a QA agent right now (QA is busy).
        {
            let agents = pool.agents.lock().unwrap();
            assert!(
                !is_agent_free(&agents, "qa"),
                "qa should be busy on story 292"
            );
        }

        // Simulate QA completing on story 292: remove the agent from the pool
        // (as run_server_owned_completion does) then run pipeline advance.
        {
            let mut agents = pool.agents.lock().unwrap();
            agents.remove(&composite_key("292_story_first", "qa"));
        }
        pool.run_pipeline_advance(
            "292_story_first",
            "qa",
            CompletionReport {
                summary: "QA done".to_string(),
                gates_passed: true,
                gate_output: String::new(),
            },
            Some(root.to_path_buf()),
            None,
            false,
        )
        .await;

        // After pipeline advance, auto_assign should have started QA on story 293.
        let agents = pool.agents.lock().unwrap();
        let qa_on_293 = agents.values().any(|a| {
            a.agent_name == "qa"
                && matches!(a.status, AgentStatus::Pending | AgentStatus::Running)
        });
        assert!(
            qa_on_293,
            "auto_assign should have started qa for story 293 after 292's QA completed, \
             but no qa agent is pending/running. Pool: {:?}",
            agents
                .iter()
                .map(|(k, a)| format!("{k}: {} ({})", a.agent_name, a.status))
                .collect::<Vec<_>>()
        );
    }
}