//! Pipeline advance — moves stories forward through pipeline stages after agent completion. use crate::config::ProjectConfig; use crate::slog; use crate::slog_error; use crate::slog_warn; use crate::io::watcher::WatcherEvent; use std::collections::HashMap; use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; use tokio::sync::broadcast; use super::super::super::{ CompletionReport, PipelineStage, agent_config_stage, pipeline_stage, }; use super::super::{AgentPool, StoryAgent}; impl AgentPool { /// Pipeline advancement: after an agent completes, move the story to /// the next pipeline stage and start the appropriate agent. #[allow(clippy::too_many_arguments)] pub(super) async fn run_pipeline_advance( &self, story_id: &str, agent_name: &str, completion: CompletionReport, project_root: Option, worktree_path: Option, merge_failure_reported: bool, previous_session_id: Option, ) { let project_root = match project_root { Some(p) => p, None => { slog_warn!("[pipeline] No project_root for '{story_id}:{agent_name}'"); return; } }; let config = ProjectConfig::load(&project_root).unwrap_or_default(); let stage = config .find_agent(agent_name) .map(agent_config_stage) .unwrap_or_else(|| pipeline_stage(agent_name)); match stage { PipelineStage::Other => { // Supervisors and unknown agents do not advance the pipeline. } PipelineStage::Coder => { if completion.gates_passed { // Determine effective QA mode for this story. let qa_mode = { let item_type = crate::agents::lifecycle::item_type_from_id(story_id); if item_type == "spike" { crate::io::story_metadata::QaMode::Human } else { let default_qa = config.default_qa_mode(); resolve_qa_mode_from_store(&project_root, story_id, default_qa) } }; match qa_mode { crate::io::story_metadata::QaMode::Server => { slog!( "[pipeline] Coder '{agent_name}' passed gates for '{story_id}'. \ qa: server — moving directly to merge." ); if let Err(e) = crate::agents::lifecycle::move_story_to_merge(&project_root, story_id) { slog_error!( "[pipeline] Failed to move '{story_id}' to 4_merge/: {e}" ); } else { self.start_mergemaster_or_block(&project_root, story_id).await; } } crate::io::story_metadata::QaMode::Agent => { slog!( "[pipeline] Coder '{agent_name}' passed gates for '{story_id}'. \ qa: agent — moving to QA." ); if let Err(e) = crate::agents::lifecycle::move_story_to_qa(&project_root, story_id) { slog_error!("[pipeline] Failed to move '{story_id}' to 3_qa/: {e}"); } else if let Err(e) = self .start_agent(&project_root, story_id, Some("qa"), None, None) .await { slog_error!("[pipeline] Failed to start qa agent for '{story_id}': {e}"); } } crate::io::story_metadata::QaMode::Human => { slog!( "[pipeline] Coder '{agent_name}' passed gates for '{story_id}'. \ qa: human — holding for human review." ); if let Err(e) = crate::agents::lifecycle::move_story_to_qa(&project_root, story_id) { slog_error!("[pipeline] Failed to move '{story_id}' to 3_qa/: {e}"); } else { write_review_hold_to_store(story_id); } } } } else { // Increment retry count and check if blocked. if let Some(reason) = should_block_story(story_id, config.max_retries, "coder") { // Story has exceeded retry limit — do not restart. let _ = self.watcher_tx.send(WatcherEvent::StoryBlocked { story_id: story_id.to_string(), reason, }); } else { slog!( "[pipeline] Coder '{agent_name}' failed gates for '{story_id}'. Restarting." ); let context = format!( "\n\n---\n## Previous Attempt Failed\n\ The acceptance gates failed with the following output:\n{}\n\n\ Please review the failures above, fix the issues, and try again.", completion.gate_output ); if let Err(e) = self .start_agent( &project_root, story_id, Some(agent_name), Some(&context), previous_session_id, ) .await { slog_error!( "[pipeline] Failed to restart coder '{agent_name}' for '{story_id}': {e}" ); } } } } PipelineStage::Qa => { if completion.gates_passed { // Run coverage gate in the QA worktree before advancing to merge. let coverage_path = worktree_path .clone() .unwrap_or_else(|| project_root.clone()); let cp = coverage_path.clone(); let coverage_result = tokio::task::spawn_blocking(move || crate::agents::gates::run_coverage_gate(&cp)) .await .unwrap_or_else(|e| { slog_warn!("[pipeline] Coverage gate task panicked: {e}"); Ok((false, format!("Coverage gate task panicked: {e}"))) }); let (coverage_passed, coverage_output) = match coverage_result { Ok(pair) => pair, Err(e) => (false, e), }; if coverage_passed { // Check whether this item needs human review before merging. let needs_human_review = { let item_type = crate::agents::lifecycle::item_type_from_id(story_id); if item_type == "spike" { true // Spikes always need human review. } else { let default_qa = config.default_qa_mode(); matches!( resolve_qa_mode_from_store(&project_root, story_id, default_qa), crate::io::story_metadata::QaMode::Human ) } }; if needs_human_review { // Hold in 3_qa/ for human review. write_review_hold_to_store(story_id); slog!( "[pipeline] QA passed for '{story_id}'. \ Holding for human review. \ Worktree preserved at: {worktree_path:?}" ); } else { slog!( "[pipeline] QA passed gates and coverage for '{story_id}'. \ Moving directly to merge." ); if let Err(e) = crate::agents::lifecycle::move_story_to_merge(&project_root, story_id) { slog_error!( "[pipeline] Failed to move '{story_id}' to 4_merge/: {e}" ); } else { self.start_mergemaster_or_block(&project_root, story_id).await; } } } else if let Some(reason) = should_block_story(story_id, config.max_retries, "qa-coverage") { // Story has exceeded retry limit — do not restart. let _ = self.watcher_tx.send(WatcherEvent::StoryBlocked { story_id: story_id.to_string(), reason, }); } else { slog!( "[pipeline] QA coverage gate failed for '{story_id}'. Restarting QA." ); let context = format!( "\n\n---\n## Coverage Gate Failed\n\ The coverage gate (script/test_coverage) failed with the following output:\n{}\n\n\ Please improve test coverage until the coverage gate passes.", coverage_output ); if let Err(e) = self .start_agent(&project_root, story_id, Some("qa"), Some(&context), None) .await { slog_error!("[pipeline] Failed to restart qa for '{story_id}': {e}"); } } } else if let Some(reason) = should_block_story(story_id, config.max_retries, "qa") { // Story has exceeded retry limit — do not restart. let _ = self.watcher_tx.send(WatcherEvent::StoryBlocked { story_id: story_id.to_string(), reason, }); } else { slog!("[pipeline] QA failed gates for '{story_id}'. Restarting."); let context = format!( "\n\n---\n## Previous QA Attempt Failed\n\ The acceptance gates failed with the following output:\n{}\n\n\ Please re-run and fix the issues.", completion.gate_output ); if let Err(e) = self .start_agent(&project_root, story_id, Some("qa"), Some(&context), None) .await { slog_error!("[pipeline] Failed to restart qa for '{story_id}': {e}"); } } } PipelineStage::Mergemaster => { // Bug 529: Guard against stale mergemaster advances. If the story // has already reached done or archived (e.g. a previous mergemaster // succeeded), this advance is a zombie — skip it entirely to avoid // phantom notifications and redundant post-merge test runs. if let Ok(Some(typed_item)) = crate::pipeline_state::read_typed(story_id) { let current_dir = typed_item.stage.dir_name(); if current_dir == "5_done" || current_dir == "6_archived" { slog!( "[pipeline] Skipping stale mergemaster advance for '{story_id}': \ story is already in work/{current_dir}/" ); // Skip pipeline advancement — do not run post-merge tests, // do not emit notifications, do not restart agents. return; } } // Block advancement if the mergemaster explicitly reported a failure. // The server-owned gate check runs in the feature-branch worktree (not // master), so `gates_passed=true` is misleading when no code was merged. if merge_failure_reported { slog!( "[pipeline] Pipeline advancement blocked for '{story_id}': \ mergemaster explicitly reported a merge failure. \ Story stays in 4_merge/ for human review." ); } else { // Run script/test on master (project_root) as the post-merge verification. slog!( "[pipeline] Mergemaster completed for '{story_id}'. Running post-merge tests on master." ); let root = project_root.clone(); let test_result = tokio::task::spawn_blocking(move || crate::agents::gates::run_project_tests(&root)) .await .unwrap_or_else(|e| { slog_warn!("[pipeline] Post-merge test task panicked: {e}"); Ok((false, format!("Test task panicked: {e}"))) }); let (passed, output) = match test_result { Ok(pair) => pair, Err(e) => (false, e), }; if passed { slog!( "[pipeline] Post-merge tests passed for '{story_id}'. Moving to done." ); if let Err(e) = crate::agents::lifecycle::move_story_to_done(&project_root, story_id) { slog_error!("[pipeline] Failed to move '{story_id}' to done: {e}"); } self.remove_agents_for_story(story_id); // TODO: Re-enable worktree cleanup once we have persistent agent logs. // Removing worktrees destroys evidence needed to debug empty-commit agents. // let config = // crate::config::ProjectConfig::load(&project_root).unwrap_or_default(); // if let Err(e) = // worktree::remove_worktree_by_story_id(&project_root, story_id, &config) // .await // { // slog!( // "[pipeline] Failed to remove worktree for '{story_id}': {e}" // ); // } slog!( "[pipeline] Story '{story_id}' done. Worktree preserved for inspection." ); } else if let Some(reason) = should_block_story(story_id, config.max_retries, "mergemaster") { // Story has exceeded retry limit — do not restart. let _ = self.watcher_tx.send(WatcherEvent::StoryBlocked { story_id: story_id.to_string(), reason, }); } else { slog!( "[pipeline] Post-merge tests failed for '{story_id}'. Restarting mergemaster." ); let context = format!( "\n\n---\n## Post-Merge Test Failed\n\ The tests on master failed with the following output:\n{}\n\n\ Please investigate and resolve the failures, then call merge_agent_work again.", output ); if let Err(e) = self .start_agent( &project_root, story_id, Some("mergemaster"), Some(&context), None, ) .await { slog_error!( "[pipeline] Failed to restart mergemaster for '{story_id}': {e}" ); } } } } } // Always scan for unassigned work after any agent completes, regardless // of the outcome (success, failure, restart). This ensures stories that // failed agent assignment due to busy agents are retried when agents // become available (bug 295). self.auto_assign_available_work(&project_root).await; } /// Start the mergemaster agent for `story_id`, but only if the feature /// branch has commits that are not yet on master. /// /// If the branch has zero commits ahead of master, this logs an error and /// sends a [`WatcherEvent::StoryBlocked`] instead of spawning a Claude /// session. A no-op merge session was observed spending $0.82 in the /// 2026-04-09 incident (story 519). async fn start_mergemaster_or_block(&self, project_root: &Path, story_id: &str) { let branch = format!("feature/story-{story_id}"); if !crate::agents::lifecycle::feature_branch_has_unmerged_changes(project_root, story_id) { slog_error!( "[mergemaster] Branch '{branch}' has no commits ahead of master — \ refusing to spawn merge session. \ Likely cause: the worktree was reset to master after the feature \ branch's commits were created. Investigate the worktree's git state \ before retrying. Story '{story_id}' stays in 4_merge/ for human review." ); let _ = self.watcher_tx.send(WatcherEvent::StoryBlocked { story_id: story_id.to_string(), reason: format!( "Feature branch '{branch}' has no commits ahead of master — nothing to merge. \ The worktree may have been reset to master. \ Check the worktree's git state and retry manually." ), }); return; } if let Err(e) = self .start_agent(project_root, story_id, Some("mergemaster"), None, None) .await { slog_error!("[pipeline] Failed to start mergemaster for '{story_id}': {e}"); } } } /// Spawn pipeline advancement as a background task. /// /// This is a **non-async** function so it does not participate in the opaque /// type cycle between `start_agent` and `run_server_owned_completion`. #[allow(clippy::too_many_arguments)] pub(super) fn spawn_pipeline_advance( agents: Arc>>, port: u16, story_id: &str, agent_name: &str, completion: CompletionReport, project_root: Option, worktree_path: Option, watcher_tx: broadcast::Sender, merge_failure_reported: bool, previous_session_id: Option, ) { let sid = story_id.to_string(); let aname = agent_name.to_string(); tokio::spawn(async move { let pool = AgentPool { agents, port, child_killers: Arc::new(Mutex::new(HashMap::new())), watcher_tx, merge_jobs: Arc::new(Mutex::new(HashMap::new())), }; pool.run_pipeline_advance( &sid, &aname, completion, project_root, worktree_path, merge_failure_reported, previous_session_id, ) .await; }); } /// Resolve QA mode from the content store. fn resolve_qa_mode_from_store( _project_root: &Path, story_id: &str, default: crate::io::story_metadata::QaMode, ) -> crate::io::story_metadata::QaMode { if let Some(contents) = crate::db::read_content(story_id) { return crate::io::story_metadata::resolve_qa_mode_from_content(&contents, default); } default } /// Write review_hold to the content store. fn write_review_hold_to_store(story_id: &str) { if let Some(contents) = crate::db::read_content(story_id) { let updated = crate::io::story_metadata::write_review_hold_in_content(&contents); crate::db::write_content(story_id, &updated); // Also persist to SQLite via shadow write. let stage = crate::pipeline_state::read_typed(story_id) .ok() .flatten() .map(|i| i.stage.dir_name().to_string()) .unwrap_or_else(|| "3_qa".to_string()); crate::db::write_item_with_content(story_id, &stage, &updated); } else { slog_error!("[pipeline] Cannot write review_hold for '{story_id}': no content in store"); } } /// Increment retry_count and block the story if it exceeds `max_retries`. /// /// Returns `Some(reason)` if the story is now blocked (caller should NOT restart the agent). /// Returns `None` if the story may be retried. /// When `max_retries` is 0, retry limits are disabled. fn should_block_story(story_id: &str, max_retries: u32, stage_label: &str) -> Option { use crate::io::story_metadata::{increment_retry_count_in_content, write_blocked_in_content}; if max_retries == 0 { return None; } if let Some(contents) = crate::db::read_content(story_id) { let (updated, new_count) = increment_retry_count_in_content(&contents); crate::db::write_content(story_id, &updated); let stage = crate::pipeline_state::read_typed(story_id) .ok() .flatten() .map(|i| i.stage.dir_name().to_string()) .unwrap_or_else(|| "2_current".to_string()); crate::db::write_item_with_content(story_id, &stage, &updated); if new_count >= max_retries { slog_warn!( "[pipeline] Story '{story_id}' reached retry limit ({new_count}/{max_retries}) \ at {stage_label} stage. Marking as blocked." ); let blocked = write_blocked_in_content(&updated); crate::db::write_content(story_id, &blocked); crate::db::write_item_with_content(story_id, &stage, &blocked); Some(format!( "Retry limit exceeded ({new_count}/{max_retries}) at {stage_label} stage" )) } else { slog!( "[pipeline] Story '{story_id}' retry {new_count}/{max_retries} at {stage_label} stage." ); None } } else { slog_error!("[pipeline] Failed to read content for '{story_id}' to increment retry_count"); None } } #[cfg(test)] mod tests { use super::super::super::AgentPool; use super::super::super::composite_key; use crate::agents::{AgentStatus, CompletionReport}; use crate::io::watcher::WatcherEvent; // ── pipeline advance tests ──────────────────────────────────────────────── #[tokio::test] async fn pipeline_advance_coder_gates_pass_server_qa_moves_to_merge() { use std::fs; let tmp = tempfile::tempdir().unwrap(); let root = tmp.path(); // Set up story in 2_current/ (no qa frontmatter → uses project default "server"). // Use a unique high-numbered ID to avoid collision with the agent_qa test. let current = root.join(".huskies/work/2_current"); fs::create_dir_all(¤t).unwrap(); fs::write(current.join("9908_story_server_qa.md"), "test").unwrap(); crate::db::ensure_content_store(); crate::db::write_content("9908_story_server_qa", "test"); let pool = AgentPool::new_test(3001); pool.run_pipeline_advance( "9908_story_server_qa", "coder-1", CompletionReport { summary: "done".to_string(), gates_passed: true, gate_output: String::new(), }, Some(root.to_path_buf()), None, false, None, ) .await; // With default qa: server, story skips QA and goes straight to 4_merge/ // Lifecycle moves now update the content store, not the filesystem. assert!( crate::db::read_content("9908_story_server_qa").is_some(), "story should still exist in content store after move to merge" ); } #[tokio::test] async fn pipeline_advance_coder_gates_pass_agent_qa_moves_to_qa() { use std::fs; let tmp = tempfile::tempdir().unwrap(); let root = tmp.path(); // Set up story in 2_current/ with qa: agent frontmatter. // Use a unique high-numbered ID to avoid collision with the server_qa test. let current = root.join(".huskies/work/2_current"); fs::create_dir_all(¤t).unwrap(); fs::write( current.join("9909_story_agent_qa.md"), "---\nname: Test\nqa: agent\n---\ntest", ) .unwrap(); crate::db::ensure_content_store(); crate::db::write_content("9909_story_agent_qa", "---\nname: Test\nqa: agent\n---\ntest"); let pool = AgentPool::new_test(3001); pool.run_pipeline_advance( "9909_story_agent_qa", "coder-1", CompletionReport { summary: "done".to_string(), gates_passed: true, gate_output: String::new(), }, Some(root.to_path_buf()), None, false, None, ) .await; // With qa: agent, story should move to 3_qa/ // Lifecycle moves now update the content store, not the filesystem. assert!( crate::db::read_content("9909_story_agent_qa").is_some(), "story should still exist in content store after move to qa" ); } #[tokio::test] async fn pipeline_advance_qa_gates_pass_moves_story_to_merge() { use std::fs; let tmp = tempfile::tempdir().unwrap(); let root = tmp.path(); // Set up story in 3_qa/ let qa_dir = root.join(".huskies/work/3_qa"); fs::create_dir_all(&qa_dir).unwrap(); // qa: server so the story skips human review and goes straight to merge. fs::write( qa_dir.join("51_story_test.md"), "---\nname: Test\nqa: server\n---\ntest", ) .unwrap(); crate::db::ensure_content_store(); crate::db::write_content("51_story_test", "---\nname: Test\nqa: server\n---\ntest"); let pool = AgentPool::new_test(3001); pool.run_pipeline_advance( "51_story_test", "qa", CompletionReport { summary: "QA done".to_string(), gates_passed: true, gate_output: String::new(), }, Some(root.to_path_buf()), None, false, None, ) .await; // Story should have moved to 4_merge/ // Lifecycle moves now update the content store, not the filesystem. assert!( crate::db::read_content("51_story_test").is_some(), "story should still exist in content store after move to merge" ); } #[tokio::test] async fn pipeline_advance_supervisor_does_not_advance() { use std::fs; let tmp = tempfile::tempdir().unwrap(); let root = tmp.path(); let current = root.join(".huskies/work/2_current"); fs::create_dir_all(¤t).unwrap(); fs::write(current.join("52_story_test.md"), "test").unwrap(); let pool = AgentPool::new_test(3001); pool.run_pipeline_advance( "52_story_test", "supervisor", CompletionReport { summary: "supervised".to_string(), gates_passed: true, gate_output: String::new(), }, Some(root.to_path_buf()), None, false, None, ) .await; // Story should NOT have moved (supervisors don't advance pipeline) assert!( current.join("52_story_test.md").exists(), "story should still be in 2_current/ for supervisor" ); } #[tokio::test] async fn pipeline_advance_sends_agent_state_changed_to_watcher_tx() { use std::fs; let tmp = tempfile::tempdir().unwrap(); let root = tmp.path(); // Set up story in 2_current/ let current = root.join(".huskies/work/2_current"); fs::create_dir_all(¤t).unwrap(); fs::write(current.join("173_story_test.md"), "test").unwrap(); // Ensure 3_qa/ exists for the move target fs::create_dir_all(root.join(".huskies/work/3_qa")).unwrap(); // Ensure 1_backlog/ exists (start_agent calls move_story_to_current) fs::create_dir_all(root.join(".huskies/work/1_backlog")).unwrap(); // Write a project.toml with a qa agent so start_agent can resolve it. fs::create_dir_all(root.join(".huskies")).unwrap(); fs::write( root.join(".huskies/project.toml"), r#" default_qa = "agent" [[agent]] name = "coder-1" role = "Coder" command = "echo" args = ["noop"] prompt = "test" stage = "coder" [[agent]] name = "qa" role = "QA" command = "echo" args = ["noop"] prompt = "test" stage = "qa" "#, ) .unwrap(); let pool = AgentPool::new_test(3001); // Subscribe to the watcher channel BEFORE the pipeline advance. let mut rx = pool.watcher_tx.subscribe(); pool.run_pipeline_advance( "173_story_test", "coder-1", CompletionReport { summary: "done".to_string(), gates_passed: true, gate_output: String::new(), }, Some(root.to_path_buf()), None, false, None, ) .await; // The pipeline advance should have sent AgentStateChanged events via // the pool's watcher_tx (not a dummy channel). Collect all events. let mut got_agent_state_changed = false; while let Ok(evt) = rx.try_recv() { if matches!(evt, WatcherEvent::AgentStateChanged) { got_agent_state_changed = true; break; } } assert!( got_agent_state_changed, "pipeline advance should send AgentStateChanged through the real watcher_tx \ (bug 173: lozenges must update when agents are assigned during pipeline advance)" ); } // ── story 519: mergemaster pre-flight blocks when no commits ahead ── /// Regression test for story 519: when the feature branch has zero commits /// ahead of master, mergemaster must not spawn a Claude session. A no-op /// session spent $0.82 in the 2026-04-09 incident because the worktree was /// reset to master before mergemaster ran. #[tokio::test] async fn mergemaster_blocks_and_sends_story_blocked_when_no_commits_ahead() { use std::fs; use std::process::Command; let tmp = tempfile::tempdir().unwrap(); let root = tmp.path(); // Init a bare git repo on master with one empty commit. Command::new("git").args(["init"]).current_dir(root).output().unwrap(); Command::new("git").args(["config", "user.email", "test@test.com"]).current_dir(root).output().unwrap(); Command::new("git").args(["config", "user.name", "Test"]).current_dir(root).output().unwrap(); Command::new("git").args(["commit", "--allow-empty", "-m", "init"]).current_dir(root).output().unwrap(); // Create a feature branch that points at master HEAD (zero commits ahead). // This replicates the incident where the worktree was reset to master. Command::new("git") .args(["branch", "feature/story-9919_story_no_commits"]) .current_dir(root) .output() .unwrap(); // Set up pipeline dirs and story file. let current = root.join(".huskies/work/2_current"); fs::create_dir_all(¤t).unwrap(); fs::create_dir_all(root.join(".huskies/work/4_merge")).unwrap(); fs::write(current.join("9919_story_no_commits.md"), "---\nname: Test\n---\n").unwrap(); crate::db::ensure_content_store(); crate::db::write_content("9919_story_no_commits", "---\nname: Test\n---\n"); let pool = AgentPool::new_test(3001); let mut rx = pool.watcher_tx.subscribe(); // Simulate coder completing with gates passed (qa: server → goes to merge). pool.run_pipeline_advance( "9919_story_no_commits", "coder-1", CompletionReport { summary: "done".to_string(), gates_passed: true, gate_output: String::new(), }, Some(root.to_path_buf()), None, false, None, ) .await; // Story should still exist in the content store after moving to merge. assert!( crate::db::read_content("9919_story_no_commits").is_some(), "story should remain in content store — not removed" ); // A StoryBlocked event must have been emitted (triggers chat failure notice, // not the success 🎉 emoji). let mut got_blocked = false; while let Ok(evt) = rx.try_recv() { if let WatcherEvent::StoryBlocked { story_id, .. } = &evt && story_id == "9919_story_no_commits" { got_blocked = true; break; } } assert!( got_blocked, "StoryBlocked event must be sent when feature branch has no commits ahead of master" ); // No mergemaster agent should have been started. let agents = pool.agents.lock().unwrap(); let mergemaster_started = agents .values() .any(|a| a.agent_name.contains("mergemaster")); assert!( !mergemaster_started, "mergemaster agent must NOT be started when no commits ahead of master" ); } // ── bug 295: pipeline advance picks up waiting QA stories ────────── #[tokio::test] async fn pipeline_advance_picks_up_waiting_qa_stories_after_completion() { use std::fs; use super::super::super::auto_assign::is_agent_free; let tmp = tempfile::tempdir().unwrap(); let root = tmp.path(); let sk = root.join(".huskies"); let qa_dir = sk.join("work/3_qa"); fs::create_dir_all(&qa_dir).unwrap(); // Configure a single QA agent. fs::write( sk.join("project.toml"), r#" [[agent]] name = "qa" stage = "qa" "#, ) .unwrap(); // Story 292 is in QA with QA agent running (will "complete" via // run_pipeline_advance below). Story 293 is in QA with NO agent — // simulating the "stuck" state from bug 295. fs::write( qa_dir.join("292_story_first.md"), "---\nname: First\nqa: human\n---\n", ) .unwrap(); fs::write( qa_dir.join("293_story_second.md"), "---\nname: Second\nqa: human\n---\n", ) .unwrap(); let pool = AgentPool::new_test(3001); // QA is currently running on story 292. pool.inject_test_agent("292_story_first", "qa", AgentStatus::Running); // Verify that 293 cannot get a QA agent right now (QA is busy). { let agents = pool.agents.lock().unwrap(); assert!( !is_agent_free(&agents, "qa"), "qa should be busy on story 292" ); } // Simulate QA completing on story 292: remove the agent from the pool // (as run_server_owned_completion does) then run pipeline advance. { let mut agents = pool.agents.lock().unwrap(); agents.remove(&composite_key("292_story_first", "qa")); } pool.run_pipeline_advance( "292_story_first", "qa", CompletionReport { summary: "QA done".to_string(), gates_passed: true, gate_output: String::new(), }, Some(root.to_path_buf()), None, false, None, ) .await; // After pipeline advance, auto_assign should have started QA on story 293. let agents = pool.agents.lock().unwrap(); let qa_on_293 = agents.values().any(|a| { a.agent_name == "qa" && matches!(a.status, AgentStatus::Pending | AgentStatus::Running) }); assert!( qa_on_293, "auto_assign should have started qa for story 293 after 292's QA completed, \ but no qa agent is pending/running. Pool: {:?}", agents .iter() .map(|(k, a)| format!("{k}: {} ({})", a.agent_name, a.status)) .collect::>() ); } // ── bug 529: stale mergemaster advance for a done story is a no-op ── /// Regression test for bug 529: when a stale mergemaster advance fires /// after the story has already reached 5_done, the advance must be a /// no-op — no post-merge tests, no notifications, no agent restarts. #[tokio::test] async fn stale_mergemaster_advance_for_done_story_is_noop() { use std::fs; use std::process::Command; // Initialise CRDT so read_typed works. crate::crdt_state::init_for_test(); crate::db::ensure_content_store(); let tmp = tempfile::tempdir().unwrap(); let root = tmp.path(); // Init a git repo so post-merge tests would pass if they ran. Command::new("git").args(["init"]).current_dir(root).output().unwrap(); Command::new("git").args(["config", "user.email", "test@test.com"]).current_dir(root).output().unwrap(); Command::new("git").args(["config", "user.name", "Test"]).current_dir(root).output().unwrap(); Command::new("git").args(["commit", "--allow-empty", "-m", "init"]).current_dir(root).output().unwrap(); // Set up pipeline dirs. fs::create_dir_all(root.join(".huskies/work/5_done")).unwrap(); // Seed the story in 5_done via the DB, which also writes to the CRDT. let story_id = "9929_story_zombie_merge"; let content = "---\nname: Zombie Merge Test\n---\n"; crate::db::write_content(story_id, content); crate::db::write_item_with_content(story_id, "5_done", content); let pool = AgentPool::new_test(3001); let mut rx = pool.watcher_tx.subscribe(); // Simulate a stale mergemaster advance firing for the already-done story. pool.run_pipeline_advance( story_id, "mergemaster", CompletionReport { summary: "stale advance".to_string(), gates_passed: true, gate_output: String::new(), }, Some(root.to_path_buf()), None, false, None, ) .await; // No agents should have been started. let agents = pool.agents.lock().unwrap(); assert!( agents.is_empty(), "No agents should be started for a stale advance on a done story. \ Pool: {:?}", agents.keys().collect::>() ); drop(agents); // No StoryBlocked or other events should have been emitted. let mut got_event = false; while let Ok(evt) = rx.try_recv() { // AgentStateChanged from auto_assign is acceptable only if the // advance didn't short-circuit. Since we return early, no events. if matches!(evt, WatcherEvent::StoryBlocked { .. }) { got_event = true; } } assert!( !got_event, "No StoryBlocked event should be emitted for a stale advance" ); // The story should still be in 5_done (not moved elsewhere). if let Ok(Some(item)) = crate::pipeline_state::read_typed(story_id) { assert_eq!( item.stage.dir_name(), "5_done", "Story should remain in 5_done after stale mergemaster advance" ); } } }