diff --git a/server/src/agents/pool/mod.rs b/server/src/agents/pool/mod.rs index 739db792..722ab91d 100644 --- a/server/src/agents/pool/mod.rs +++ b/server/src/agents/pool/mod.rs @@ -51,6 +51,7 @@ impl AgentPool { // Spawn a background task (only when inside a tokio runtime) that // listens for RateLimitWarning and HardBlock events and updates the + // listens for RateLimitWarning and RateLimitHardBlock events and updates the // throttled flag on the relevant agent so status dots stay current. if tokio::runtime::Handle::try_current().is_ok() { let agents_clone = Arc::clone(&pool.agents); diff --git a/server/src/agents/pool/pipeline.rs b/server/src/agents/pool/pipeline.rs deleted file mode 100644 index 0d6f7334..00000000 --- a/server/src/agents/pool/pipeline.rs +++ /dev/null @@ -1,1789 +0,0 @@ -use crate::config::ProjectConfig; -use crate::slog; -use crate::slog_error; -use crate::slog_warn; -use crate::worktree; -use crate::io::watcher::WatcherEvent; -use std::collections::HashMap; -use std::path::{Path, PathBuf}; -use std::sync::{Arc, Mutex}; -use tokio::sync::broadcast; - -use super::super::{ - AgentEvent, AgentStatus, CompletionReport, PipelineStage, - agent_config_stage, pipeline_stage, -}; -use super::{AgentPool, StoryAgent, composite_key}; - -impl AgentPool { - /// Pipeline advancement: after an agent completes, move the story to - /// the next pipeline stage and start the appropriate agent. 
- pub(super) async fn run_pipeline_advance( - &self, - story_id: &str, - agent_name: &str, - completion: CompletionReport, - project_root: Option, - worktree_path: Option, - merge_failure_reported: bool, - ) { - let project_root = match project_root { - Some(p) => p, - None => { - slog_warn!("[pipeline] No project_root for '{story_id}:{agent_name}'"); - return; - } - }; - - let config = ProjectConfig::load(&project_root).unwrap_or_default(); - let stage = config - .find_agent(agent_name) - .map(agent_config_stage) - .unwrap_or_else(|| pipeline_stage(agent_name)); - - match stage { - PipelineStage::Other => { - // Supervisors and unknown agents do not advance the pipeline. - } - PipelineStage::Coder => { - if completion.gates_passed { - // Determine effective QA mode for this story. - let qa_mode = { - let item_type = super::super::lifecycle::item_type_from_id(story_id); - if item_type == "spike" { - crate::io::story_metadata::QaMode::Human - } else { - let default_qa = config.default_qa_mode(); - // Story is in 2_current/ when a coder completes. - let story_path = project_root - .join(".storkit/work/2_current") - .join(format!("{story_id}.md")); - crate::io::story_metadata::resolve_qa_mode(&story_path, default_qa) - } - }; - - match qa_mode { - crate::io::story_metadata::QaMode::Server => { - slog!( - "[pipeline] Coder '{agent_name}' passed gates for '{story_id}'. \ - qa: server — moving directly to merge." - ); - if let Err(e) = - super::super::lifecycle::move_story_to_merge(&project_root, story_id) - { - slog_error!( - "[pipeline] Failed to move '{story_id}' to 4_merge/: {e}" - ); - } else if let Err(e) = self - .start_agent(&project_root, story_id, Some("mergemaster"), None) - .await - { - slog_error!( - "[pipeline] Failed to start mergemaster for '{story_id}': {e}" - ); - } - } - crate::io::story_metadata::QaMode::Agent => { - slog!( - "[pipeline] Coder '{agent_name}' passed gates for '{story_id}'. \ - qa: agent — moving to QA." 
- ); - if let Err(e) = super::super::lifecycle::move_story_to_qa(&project_root, story_id) { - slog_error!("[pipeline] Failed to move '{story_id}' to 3_qa/: {e}"); - } else if let Err(e) = self - .start_agent(&project_root, story_id, Some("qa"), None) - .await - { - slog_error!("[pipeline] Failed to start qa agent for '{story_id}': {e}"); - } - } - crate::io::story_metadata::QaMode::Human => { - slog!( - "[pipeline] Coder '{agent_name}' passed gates for '{story_id}'. \ - qa: human — holding for human review." - ); - if let Err(e) = super::super::lifecycle::move_story_to_qa(&project_root, story_id) { - slog_error!("[pipeline] Failed to move '{story_id}' to 3_qa/: {e}"); - } else { - let qa_dir = project_root.join(".storkit/work/3_qa"); - let story_path = qa_dir.join(format!("{story_id}.md")); - if let Err(e) = - crate::io::story_metadata::write_review_hold(&story_path) - { - slog_error!( - "[pipeline] Failed to set review_hold on '{story_id}': {e}" - ); - } - } - } - } - } else { - // Increment retry count and check if blocked. - let story_path = project_root - .join(".storkit/work/2_current") - .join(format!("{story_id}.md")); - if let Some(reason) = should_block_story(&story_path, config.max_retries, story_id, "coder") { - // Story has exceeded retry limit — do not restart. - let _ = self.watcher_tx.send(WatcherEvent::StoryBlocked { - story_id: story_id.to_string(), - reason, - }); - } else { - slog!( - "[pipeline] Coder '{agent_name}' failed gates for '{story_id}'. Restarting." 
- ); - let context = format!( - "\n\n---\n## Previous Attempt Failed\n\ - The acceptance gates failed with the following output:\n{}\n\n\ - Please review the failures above, fix the issues, and try again.", - completion.gate_output - ); - if let Err(e) = self - .start_agent(&project_root, story_id, Some(agent_name), Some(&context)) - .await - { - slog_error!( - "[pipeline] Failed to restart coder '{agent_name}' for '{story_id}': {e}" - ); - } - } - } - } - PipelineStage::Qa => { - if completion.gates_passed { - // Run coverage gate in the QA worktree before advancing to merge. - let coverage_path = worktree_path - .clone() - .unwrap_or_else(|| project_root.clone()); - let cp = coverage_path.clone(); - let coverage_result = - tokio::task::spawn_blocking(move || super::super::gates::run_coverage_gate(&cp)) - .await - .unwrap_or_else(|e| { - slog_warn!("[pipeline] Coverage gate task panicked: {e}"); - Ok((false, format!("Coverage gate task panicked: {e}"))) - }); - let (coverage_passed, coverage_output) = match coverage_result { - Ok(pair) => pair, - Err(e) => (false, e), - }; - - if coverage_passed { - // Check whether this item needs human review before merging. - let needs_human_review = { - let item_type = super::super::lifecycle::item_type_from_id(story_id); - if item_type == "spike" { - true // Spikes always need human review. - } else { - let qa_dir = project_root.join(".storkit/work/3_qa"); - let story_path = qa_dir.join(format!("{story_id}.md")); - let default_qa = config.default_qa_mode(); - matches!( - crate::io::story_metadata::resolve_qa_mode(&story_path, default_qa), - crate::io::story_metadata::QaMode::Human - ) - } - }; - - if needs_human_review { - // Hold in 3_qa/ for human review. 
- let qa_dir = project_root.join(".storkit/work/3_qa"); - let story_path = qa_dir.join(format!("{story_id}.md")); - if let Err(e) = - crate::io::story_metadata::write_review_hold(&story_path) - { - slog_error!( - "[pipeline] Failed to set review_hold on '{story_id}': {e}" - ); - } - slog!( - "[pipeline] QA passed for '{story_id}'. \ - Holding for human review. \ - Worktree preserved at: {worktree_path:?}" - ); - } else { - slog!( - "[pipeline] QA passed gates and coverage for '{story_id}'. \ - Moving directly to merge." - ); - if let Err(e) = - super::super::lifecycle::move_story_to_merge(&project_root, story_id) - { - slog_error!( - "[pipeline] Failed to move '{story_id}' to 4_merge/: {e}" - ); - } else if let Err(e) = self - .start_agent(&project_root, story_id, Some("mergemaster"), None) - .await - { - slog_error!( - "[pipeline] Failed to start mergemaster for '{story_id}': {e}" - ); - } - } - } else { - let story_path = project_root - .join(".storkit/work/3_qa") - .join(format!("{story_id}.md")); - if let Some(reason) = should_block_story(&story_path, config.max_retries, story_id, "qa-coverage") { - // Story has exceeded retry limit — do not restart. - let _ = self.watcher_tx.send(WatcherEvent::StoryBlocked { - story_id: story_id.to_string(), - reason, - }); - } else { - slog!( - "[pipeline] QA coverage gate failed for '{story_id}'. Restarting QA." 
- ); - let context = format!( - "\n\n---\n## Coverage Gate Failed\n\ - The coverage gate (script/test_coverage) failed with the following output:\n{}\n\n\ - Please improve test coverage until the coverage gate passes.", - coverage_output - ); - if let Err(e) = self - .start_agent(&project_root, story_id, Some("qa"), Some(&context)) - .await - { - slog_error!("[pipeline] Failed to restart qa for '{story_id}': {e}"); - } - } - } - } else { - let story_path = project_root - .join(".storkit/work/3_qa") - .join(format!("{story_id}.md")); - if let Some(reason) = should_block_story(&story_path, config.max_retries, story_id, "qa") { - // Story has exceeded retry limit — do not restart. - let _ = self.watcher_tx.send(WatcherEvent::StoryBlocked { - story_id: story_id.to_string(), - reason, - }); - } else { - slog!("[pipeline] QA failed gates for '{story_id}'. Restarting."); - let context = format!( - "\n\n---\n## Previous QA Attempt Failed\n\ - The acceptance gates failed with the following output:\n{}\n\n\ - Please re-run and fix the issues.", - completion.gate_output - ); - if let Err(e) = self - .start_agent(&project_root, story_id, Some("qa"), Some(&context)) - .await - { - slog_error!("[pipeline] Failed to restart qa for '{story_id}': {e}"); - } - } - } - } - PipelineStage::Mergemaster => { - // Block advancement if the mergemaster explicitly reported a failure. - // The server-owned gate check runs in the feature-branch worktree (not - // master), so `gates_passed=true` is misleading when no code was merged. - if merge_failure_reported { - slog!( - "[pipeline] Pipeline advancement blocked for '{story_id}': \ - mergemaster explicitly reported a merge failure. \ - Story stays in 4_merge/ for human review." - ); - } else { - // Run script/test on master (project_root) as the post-merge verification. - slog!( - "[pipeline] Mergemaster completed for '{story_id}'. Running post-merge tests on master." 
- ); - let root = project_root.clone(); - let test_result = - tokio::task::spawn_blocking(move || super::super::gates::run_project_tests(&root)) - .await - .unwrap_or_else(|e| { - slog_warn!("[pipeline] Post-merge test task panicked: {e}"); - Ok((false, format!("Test task panicked: {e}"))) - }); - let (passed, output) = match test_result { - Ok(pair) => pair, - Err(e) => (false, e), - }; - - if passed { - slog!( - "[pipeline] Post-merge tests passed for '{story_id}'. Moving to done." - ); - if let Err(e) = - super::super::lifecycle::move_story_to_archived(&project_root, story_id) - { - slog_error!("[pipeline] Failed to move '{story_id}' to done: {e}"); - } - self.remove_agents_for_story(story_id); - // TODO: Re-enable worktree cleanup once we have persistent agent logs. - // Removing worktrees destroys evidence needed to debug empty-commit agents. - // let config = - // crate::config::ProjectConfig::load(&project_root).unwrap_or_default(); - // if let Err(e) = - // worktree::remove_worktree_by_story_id(&project_root, story_id, &config) - // .await - // { - // slog!( - // "[pipeline] Failed to remove worktree for '{story_id}': {e}" - // ); - // } - slog!( - "[pipeline] Story '{story_id}' done. Worktree preserved for inspection." - ); - } else { - let story_path = project_root - .join(".storkit/work/4_merge") - .join(format!("{story_id}.md")); - if let Some(reason) = should_block_story(&story_path, config.max_retries, story_id, "mergemaster") { - // Story has exceeded retry limit — do not restart. - let _ = self.watcher_tx.send(WatcherEvent::StoryBlocked { - story_id: story_id.to_string(), - reason, - }); - } else { - slog!( - "[pipeline] Post-merge tests failed for '{story_id}'. Restarting mergemaster." 
- ); - let context = format!( - "\n\n---\n## Post-Merge Test Failed\n\ - The tests on master failed with the following output:\n{}\n\n\ - Please investigate and resolve the failures, then call merge_agent_work again.", - output - ); - if let Err(e) = self - .start_agent( - &project_root, - story_id, - Some("mergemaster"), - Some(&context), - ) - .await - { - slog_error!( - "[pipeline] Failed to restart mergemaster for '{story_id}': {e}" - ); - } - } - } - } - } - } - - // Always scan for unassigned work after any agent completes, regardless - // of the outcome (success, failure, restart). This ensures stories that - // failed agent assignment due to busy agents are retried when agents - // become available (bug 295). - self.auto_assign_available_work(&project_root).await; - } - - /// Internal: report that an agent has finished work on a story. - /// - /// **Note:** This is no longer exposed as an MCP tool. The server now - /// automatically runs completion gates when an agent process exits - /// (see `run_server_owned_completion`). This method is retained for - /// backwards compatibility and testing. - /// - /// - Rejects with an error if the worktree has uncommitted changes. - /// - Runs acceptance gates (cargo clippy + cargo nextest run / cargo test). - /// - Stores the `CompletionReport` on the agent record. - /// - Transitions status to `Completed` (gates passed) or `Failed` (gates failed). - /// - Emits a `Done` event so `wait_for_agent` unblocks. - #[allow(dead_code)] - pub async fn report_completion( - &self, - story_id: &str, - agent_name: &str, - summary: &str, - ) -> Result { - let key = composite_key(story_id, agent_name); - - // Verify agent exists, is Running, and grab its worktree path. 
- let worktree_path = { - let agents = self.agents.lock().map_err(|e| e.to_string())?; - let agent = agents - .get(&key) - .ok_or_else(|| format!("No agent '{agent_name}' for story '{story_id}'"))?; - - if agent.status != AgentStatus::Running { - return Err(format!( - "Agent '{agent_name}' for story '{story_id}' is not running (status: {}). \ - report_completion can only be called by a running agent.", - agent.status - )); - } - - agent - .worktree_info - .as_ref() - .map(|wt| wt.path.clone()) - .ok_or_else(|| { - format!( - "Agent '{agent_name}' for story '{story_id}' has no worktree. \ - Cannot run acceptance gates." - ) - })? - }; - - let path = worktree_path.clone(); - - // Run gate checks in a blocking thread to avoid stalling the async runtime. - let (gates_passed, gate_output) = tokio::task::spawn_blocking(move || { - // Step 1: Reject if worktree is dirty. - super::super::gates::check_uncommitted_changes(&path)?; - // Step 2: Run clippy + tests and return (passed, output). - super::super::gates::run_acceptance_gates(&path) - }) - .await - .map_err(|e| format!("Gate check task panicked: {e}"))??; - - let report = CompletionReport { - summary: summary.to_string(), - gates_passed, - gate_output, - }; - - // Extract data for pipeline advance, then remove the entry so - // completed agents never appear in list_agents. 
- let ( - tx, - session_id, - project_root_for_advance, - wt_path_for_advance, - merge_failure_reported_for_advance, - ) = { - let mut agents = self.agents.lock().map_err(|e| e.to_string())?; - let agent = agents.get_mut(&key).ok_or_else(|| { - format!("Agent '{agent_name}' for story '{story_id}' disappeared during gate check") - })?; - agent.completion = Some(report.clone()); - let tx = agent.tx.clone(); - let sid = agent.session_id.clone(); - let pr = agent.project_root.clone(); - let wt = agent.worktree_info.as_ref().map(|w| w.path.clone()); - let mfr = agent.merge_failure_reported; - agents.remove(&key); - (tx, sid, pr, wt, mfr) - }; - - // Emit Done so wait_for_agent unblocks. - let _ = tx.send(AgentEvent::Done { - story_id: story_id.to_string(), - agent_name: agent_name.to_string(), - session_id, - }); - - // Notify WebSocket clients that the agent is gone. - Self::notify_agent_state_changed(&self.watcher_tx); - - // Advance the pipeline state machine in a background task. - let pool_clone = Self { - agents: Arc::clone(&self.agents), - port: self.port, - child_killers: Arc::clone(&self.child_killers), - watcher_tx: self.watcher_tx.clone(), - merge_jobs: Arc::clone(&self.merge_jobs), - }; - let sid = story_id.to_string(); - let aname = agent_name.to_string(); - let report_for_advance = report.clone(); - tokio::spawn(async move { - pool_clone - .run_pipeline_advance( - &sid, - &aname, - report_for_advance, - project_root_for_advance, - wt_path_for_advance, - merge_failure_reported_for_advance, - ) - .await; - }); - - Ok(report) - } - - /// Start the merge pipeline as a background task. - /// - /// Returns immediately so the MCP tool call doesn't time out (the full - /// pipeline — squash merge + quality gates — takes well over 60 seconds, - /// exceeding Claude Code's MCP tool-call timeout). - /// - /// The mergemaster agent should poll [`get_merge_status`](Self::get_merge_status) - /// until the job reaches a terminal state. 
- pub fn start_merge_agent_work( - self: &Arc, - project_root: &Path, - story_id: &str, - ) -> Result<(), String> { - // Guard against double-starts. - { - let jobs = self.merge_jobs.lock().map_err(|e| e.to_string())?; - if let Some(job) = jobs.get(story_id) - && matches!(job.status, super::super::merge::MergeJobStatus::Running) - { - return Err(format!( - "Merge already in progress for '{story_id}'. \ - Use get_merge_status to poll for completion." - )); - } - } - - // Insert Running job. - { - let mut jobs = self.merge_jobs.lock().map_err(|e| e.to_string())?; - jobs.insert( - story_id.to_string(), - super::super::merge::MergeJob { - story_id: story_id.to_string(), - status: super::super::merge::MergeJobStatus::Running, - }, - ); - } - - let pool = Arc::clone(self); - let root = project_root.to_path_buf(); - let sid = story_id.to_string(); - - tokio::spawn(async move { - let report = pool.run_merge_pipeline(&root, &sid).await; - let failed = report.is_err(); - let status = match report { - Ok(r) => super::super::merge::MergeJobStatus::Completed(r), - Err(e) => super::super::merge::MergeJobStatus::Failed(e), - }; - if let Ok(mut jobs) = pool.merge_jobs.lock() - && let Some(job) = jobs.get_mut(&sid) - { - job.status = status; - } - if failed { - pool.auto_assign_available_work(&root).await; - } - }); - - Ok(()) - } - - /// The actual merge pipeline, run inside a background task. 
- async fn run_merge_pipeline( - self: &Arc, - project_root: &Path, - story_id: &str, - ) -> Result { - let branch = format!("feature/story-{story_id}"); - let wt_path = worktree::worktree_path(project_root, story_id); - let root = project_root.to_path_buf(); - let sid = story_id.to_string(); - let br = branch.clone(); - - let merge_result = - tokio::task::spawn_blocking(move || super::super::merge::run_squash_merge(&root, &br, &sid)) - .await - .map_err(|e| format!("Merge task panicked: {e}"))??; - - if !merge_result.success { - return Ok(super::super::merge::MergeReport { - story_id: story_id.to_string(), - success: false, - had_conflicts: merge_result.had_conflicts, - conflicts_resolved: merge_result.conflicts_resolved, - conflict_details: merge_result.conflict_details, - gates_passed: merge_result.gates_passed, - gate_output: merge_result.output, - worktree_cleaned_up: false, - story_archived: false, - }); - } - - let story_archived = - super::super::lifecycle::move_story_to_archived(project_root, story_id).is_ok(); - if story_archived { - self.remove_agents_for_story(story_id); - } - - let worktree_cleaned_up = if wt_path.exists() { - let config = crate::config::ProjectConfig::load(project_root).unwrap_or_default(); - worktree::remove_worktree_by_story_id(project_root, story_id, &config) - .await - .is_ok() - } else { - false - }; - - self.auto_assign_available_work(project_root).await; - - Ok(super::super::merge::MergeReport { - story_id: story_id.to_string(), - success: true, - had_conflicts: merge_result.had_conflicts, - conflicts_resolved: merge_result.conflicts_resolved, - conflict_details: merge_result.conflict_details, - gates_passed: true, - gate_output: merge_result.output, - worktree_cleaned_up, - story_archived, - }) - } - - /// Check the status of a background merge job. 
- pub fn get_merge_status(&self, story_id: &str) -> Option { - self.merge_jobs - .lock() - .ok() - .and_then(|jobs| jobs.get(story_id).cloned()) - } - - /// Record that the mergemaster agent for `story_id` explicitly reported a - /// merge failure via the `report_merge_failure` MCP tool. - /// - /// Sets `merge_failure_reported = true` on the active mergemaster agent so - /// that `run_pipeline_advance` can block advancement to `5_done/` even when - /// the server-owned gate check returns `gates_passed=true` (those gates run - /// in the feature-branch worktree, not on master). - pub fn set_merge_failure_reported(&self, story_id: &str) { - match self.agents.lock() { - Ok(mut lock) => { - let found = lock.iter_mut().find(|(key, agent)| { - let key_story_id = key - .rsplit_once(':') - .map(|(sid, _)| sid) - .unwrap_or(key.as_str()); - key_story_id == story_id - && pipeline_stage(&agent.agent_name) == PipelineStage::Mergemaster - }); - match found { - Some((_, agent)) => { - agent.merge_failure_reported = true; - slog!( - "[pipeline] Merge failure flag set for '{story_id}:{}'", - agent.agent_name - ); - } - None => { - slog_warn!( - "[pipeline] set_merge_failure_reported: no running mergemaster found \ - for story '{story_id}' — flag not set" - ); - } - } - } - Err(e) => { - slog_error!("[pipeline] set_merge_failure_reported: could not lock agents: {e}"); - } - } - } -} - -/// Server-owned completion: runs acceptance gates when an agent process exits -/// normally, and advances the pipeline based on results. -/// -/// This is a **free function** (not a method on `AgentPool`) to break the -/// opaque type cycle that would otherwise arise: `start_agent` → spawned task -/// → server-owned completion → pipeline advance → `start_agent`. -/// -/// If the agent already has a completion report (e.g. from a legacy -/// `report_completion` call), this is a no-op to avoid double-running gates. 
-pub(super) async fn run_server_owned_completion( - agents: &Arc>>, - port: u16, - story_id: &str, - agent_name: &str, - session_id: Option, - watcher_tx: broadcast::Sender, -) { - let key = composite_key(story_id, agent_name); - - // Guard: skip if completion was already recorded (legacy path). - { - let lock = match agents.lock() { - Ok(a) => a, - Err(_) => return, - }; - match lock.get(&key) { - Some(agent) if agent.completion.is_some() => { - slog!( - "[agents] Completion already recorded for '{story_id}:{agent_name}'; \ - skipping server-owned gates." - ); - return; - } - Some(_) => {} - None => return, - } - } - - // Get worktree path for running gates. - let worktree_path = { - let lock = match agents.lock() { - Ok(a) => a, - Err(_) => return, - }; - lock.get(&key) - .and_then(|a| a.worktree_info.as_ref().map(|wt| wt.path.clone())) - }; - - // Run acceptance gates. - let (gates_passed, gate_output) = if let Some(wt_path) = worktree_path { - let path = wt_path; - match tokio::task::spawn_blocking(move || { - super::super::gates::check_uncommitted_changes(&path)?; - // AC5: Fail early if the coder finished with no commits on the feature branch. - // This prevents empty-diff stories from advancing through QA to merge. - if !super::super::gates::worktree_has_committed_work(&path) { - return Ok(( - false, - "Agent exited with no commits on the feature branch. \ - The agent did not produce any code changes." 
- .to_string(), - )); - } - super::super::gates::run_acceptance_gates(&path) - }) - .await - { - Ok(Ok(result)) => result, - Ok(Err(e)) => (false, e), - Err(e) => (false, format!("Gate check task panicked: {e}")), - } - } else { - ( - false, - "No worktree path available to run acceptance gates".to_string(), - ) - }; - - slog!( - "[agents] Server-owned completion for '{story_id}:{agent_name}': gates_passed={gates_passed}" - ); - - let report = CompletionReport { - summary: "Agent process exited normally".to_string(), - gates_passed, - gate_output, - }; - - // Store completion report, extract data for pipeline advance, then - // remove the entry so completed agents never appear in list_agents. - let (tx, project_root_for_advance, wt_path_for_advance, merge_failure_reported_for_advance) = { - let mut lock = match agents.lock() { - Ok(a) => a, - Err(_) => return, - }; - let agent = match lock.get_mut(&key) { - Some(a) => a, - None => return, - }; - agent.completion = Some(report.clone()); - agent.session_id = session_id.clone(); - let tx = agent.tx.clone(); - let pr = agent.project_root.clone(); - let wt = agent.worktree_info.as_ref().map(|w| w.path.clone()); - let mfr = agent.merge_failure_reported; - lock.remove(&key); - (tx, pr, wt, mfr) - }; - - // Emit Done so wait_for_agent unblocks. - let _ = tx.send(AgentEvent::Done { - story_id: story_id.to_string(), - agent_name: agent_name.to_string(), - session_id, - }); - - // Notify WebSocket clients that the agent is gone. - AgentPool::notify_agent_state_changed(&watcher_tx); - - // Advance the pipeline state machine in a background task. - spawn_pipeline_advance( - Arc::clone(agents), - port, - story_id, - agent_name, - report, - project_root_for_advance, - wt_path_for_advance, - watcher_tx, - merge_failure_reported_for_advance, - ); -} - -/// Spawn pipeline advancement as a background task. 
-/// -/// This is a **non-async** function so it does not participate in the opaque -/// type cycle between `start_agent` and `run_server_owned_completion`. -#[allow(clippy::too_many_arguments)] -fn spawn_pipeline_advance( - agents: Arc>>, - port: u16, - story_id: &str, - agent_name: &str, - completion: CompletionReport, - project_root: Option, - worktree_path: Option, - watcher_tx: broadcast::Sender, - merge_failure_reported: bool, -) { - let sid = story_id.to_string(); - let aname = agent_name.to_string(); - tokio::spawn(async move { - let pool = AgentPool { - agents, - port, - child_killers: Arc::new(Mutex::new(HashMap::new())), - watcher_tx, - merge_jobs: Arc::new(Mutex::new(HashMap::new())), - }; - pool.run_pipeline_advance( - &sid, - &aname, - completion, - project_root, - worktree_path, - merge_failure_reported, - ) - .await; - }); -} - -/// Increment retry_count and block the story if it exceeds `max_retries`. -/// -/// Returns `Some(reason)` if the story is now blocked (caller should NOT restart the agent). -/// Returns `None` if the story may be retried. -/// When `max_retries` is 0, retry limits are disabled. -fn should_block_story(story_path: &Path, max_retries: u32, story_id: &str, stage_label: &str) -> Option { - use crate::io::story_metadata::{increment_retry_count, write_blocked}; - - if max_retries == 0 { - // Retry limits disabled. - return None; - } - - match increment_retry_count(story_path) { - Ok(new_count) => { - if new_count >= max_retries { - slog_warn!( - "[pipeline] Story '{story_id}' reached retry limit ({new_count}/{max_retries}) \ - at {stage_label} stage. Marking as blocked." - ); - if let Err(e) = write_blocked(story_path) { - slog_error!("[pipeline] Failed to write blocked flag for '{story_id}': {e}"); - } - Some(format!( - "Retry limit exceeded ({new_count}/{max_retries}) at {stage_label} stage" - )) - } else { - slog!( - "[pipeline] Story '{story_id}' retry {new_count}/{max_retries} at {stage_label} stage." 
- ); - None - } - } - Err(e) => { - slog_error!("[pipeline] Failed to increment retry_count for '{story_id}': {e}"); - None // Don't block on error — allow retry. - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use super::super::AgentPool; - use crate::agents::merge::{MergeJob, MergeJobStatus}; - use crate::agents::{AgentEvent, AgentStatus, CompletionReport}; - use crate::io::watcher::WatcherEvent; - use std::path::PathBuf; - use std::process::Command; - - fn init_git_repo(repo: &std::path::Path) { - Command::new("git") - .args(["init"]) - .current_dir(repo) - .output() - .unwrap(); - Command::new("git") - .args(["config", "user.email", "test@test.com"]) - .current_dir(repo) - .output() - .unwrap(); - Command::new("git") - .args(["config", "user.name", "Test"]) - .current_dir(repo) - .output() - .unwrap(); - Command::new("git") - .args(["commit", "--allow-empty", "-m", "init"]) - .current_dir(repo) - .output() - .unwrap(); - } - - // ── report_completion tests ──────────────────────────────────── - - #[tokio::test] - async fn report_completion_rejects_nonexistent_agent() { - let pool = AgentPool::new_test(3001); - let result = pool.report_completion("no_story", "no_bot", "done").await; - assert!(result.is_err()); - let msg = result.unwrap_err(); - assert!(msg.contains("No agent"), "unexpected: {msg}"); - } - - #[tokio::test] - async fn report_completion_rejects_non_running_agent() { - let pool = AgentPool::new_test(3001); - pool.inject_test_agent("s6", "bot", AgentStatus::Completed); - - let result = pool.report_completion("s6", "bot", "done").await; - assert!(result.is_err()); - let msg = result.unwrap_err(); - assert!( - msg.contains("not running"), - "expected 'not running' in: {msg}" - ); - } - - #[tokio::test] - async fn report_completion_rejects_dirty_worktree() { - use std::fs; - use tempfile::tempdir; - - let tmp = tempdir().unwrap(); - let repo = tmp.path(); - - // Init a real git repo and make an initial commit - Command::new("git") - 
.args(["init"]) - .current_dir(repo) - .output() - .unwrap(); - Command::new("git") - .args(["commit", "--allow-empty", "-m", "init"]) - .current_dir(repo) - .output() - .unwrap(); - - // Write an uncommitted file - fs::write(repo.join("dirty.txt"), "not committed").unwrap(); - - let pool = AgentPool::new_test(3001); - pool.inject_test_agent_with_path("s7", "bot", AgentStatus::Running, repo.to_path_buf()); - - let result = pool.report_completion("s7", "bot", "done").await; - assert!(result.is_err()); - let msg = result.unwrap_err(); - assert!( - msg.contains("uncommitted"), - "expected 'uncommitted' in: {msg}" - ); - } - - // ── server-owned completion tests ─────────────────────────────────────────── - - #[tokio::test] - async fn server_owned_completion_skips_when_already_completed() { - let pool = AgentPool::new_test(3001); - let report = CompletionReport { - summary: "Already done".to_string(), - gates_passed: true, - gate_output: String::new(), - }; - pool.inject_test_agent_with_completion( - "s10", - "coder-1", - AgentStatus::Completed, - PathBuf::from("/tmp/nonexistent"), - report, - ); - - // Subscribe before calling so we can check if Done event was emitted. - let mut rx = pool.subscribe("s10", "coder-1").unwrap(); - - run_server_owned_completion( - &pool.agents, - pool.port, - "s10", - "coder-1", - Some("sess-1".to_string()), - pool.watcher_tx.clone(), - ) - .await; - - // Status should remain Completed (unchanged) — no gate re-run. - let agents = pool.agents.lock().unwrap(); - let key = composite_key("s10", "coder-1"); - let agent = agents.get(&key).unwrap(); - assert_eq!(agent.status, AgentStatus::Completed); - // Summary should still be the original, not overwritten. - assert_eq!(agent.completion.as_ref().unwrap().summary, "Already done"); - drop(agents); - - // No Done event should have been emitted. 
- assert!( - rx.try_recv().is_err(), - "should not emit Done when completion already exists" - ); - } - - #[tokio::test] - async fn server_owned_completion_runs_gates_on_clean_worktree() { - use tempfile::tempdir; - - let tmp = tempdir().unwrap(); - let repo = tmp.path(); - init_git_repo(repo); - - let pool = AgentPool::new_test(3001); - pool.inject_test_agent_with_path( - "s11", - "coder-1", - AgentStatus::Running, - repo.to_path_buf(), - ); - - let mut rx = pool.subscribe("s11", "coder-1").unwrap(); - - run_server_owned_completion( - &pool.agents, - pool.port, - "s11", - "coder-1", - Some("sess-2".to_string()), - pool.watcher_tx.clone(), - ) - .await; - - // Agent entry should be removed from the map after completion. - let agents = pool.agents.lock().unwrap(); - let key = composite_key("s11", "coder-1"); - assert!( - agents.get(&key).is_none(), - "agent should be removed from map after completion" - ); - drop(agents); - - // A Done event should have been emitted with the session_id. - let event = rx.try_recv().expect("should emit Done event"); - match &event { - AgentEvent::Done { session_id, .. } => { - assert_eq!(*session_id, Some("sess-2".to_string())); - } - other => panic!("expected Done event, got: {other:?}"), - } - } - - #[tokio::test] - async fn server_owned_completion_fails_on_dirty_worktree() { - use std::fs; - use tempfile::tempdir; - - let tmp = tempdir().unwrap(); - let repo = tmp.path(); - init_git_repo(repo); - // Create an uncommitted file. - fs::write(repo.join("dirty.txt"), "not committed").unwrap(); - - let pool = AgentPool::new_test(3001); - pool.inject_test_agent_with_path( - "s12", - "coder-1", - AgentStatus::Running, - repo.to_path_buf(), - ); - - let mut rx = pool.subscribe("s12", "coder-1").unwrap(); - - run_server_owned_completion( - &pool.agents, - pool.port, - "s12", - "coder-1", - None, - pool.watcher_tx.clone(), - ) - .await; - - // Agent entry should be removed from the map after completion (even on failure). 
- let agents = pool.agents.lock().unwrap(); - let key = composite_key("s12", "coder-1"); - assert!( - agents.get(&key).is_none(), - "agent should be removed from map after failed completion" - ); - drop(agents); - - // A Done event should have been emitted. - let event = rx.try_recv().expect("should emit Done event"); - assert!( - matches!(event, AgentEvent::Done { .. }), - "expected Done event, got: {event:?}" - ); - } - - #[tokio::test] - async fn server_owned_completion_nonexistent_agent_is_noop() { - let pool = AgentPool::new_test(3001); - // Should not panic or error — just silently return. - run_server_owned_completion( - &pool.agents, - pool.port, - "nonexistent", - "bot", - None, - pool.watcher_tx.clone(), - ) - .await; - } - - // ── pipeline advance tests ──────────────────────────────────────────────── - - #[tokio::test] - async fn pipeline_advance_coder_gates_pass_server_qa_moves_to_merge() { - use std::fs; - let tmp = tempfile::tempdir().unwrap(); - let root = tmp.path(); - - // Set up story in 2_current/ (no qa frontmatter → uses project default "server") - let current = root.join(".storkit/work/2_current"); - fs::create_dir_all(¤t).unwrap(); - fs::write(current.join("50_story_test.md"), "test").unwrap(); - - let pool = AgentPool::new_test(3001); - pool.run_pipeline_advance( - "50_story_test", - "coder-1", - CompletionReport { - summary: "done".to_string(), - gates_passed: true, - gate_output: String::new(), - }, - Some(root.to_path_buf()), - None, - false, - ) - .await; - - // With default qa: server, story skips QA and goes straight to 4_merge/ - assert!( - root.join(".storkit/work/4_merge/50_story_test.md") - .exists(), - "story should be in 4_merge/" - ); - assert!( - !current.join("50_story_test.md").exists(), - "story should not still be in 2_current/" - ); - } - - #[tokio::test] - async fn pipeline_advance_coder_gates_pass_agent_qa_moves_to_qa() { - use std::fs; - let tmp = tempfile::tempdir().unwrap(); - let root = tmp.path(); - - // Set up 
story in 2_current/ with qa: agent frontmatter - let current = root.join(".storkit/work/2_current"); - fs::create_dir_all(¤t).unwrap(); - fs::write( - current.join("50_story_test.md"), - "---\nname: Test\nqa: agent\n---\ntest", - ) - .unwrap(); - - let pool = AgentPool::new_test(3001); - pool.run_pipeline_advance( - "50_story_test", - "coder-1", - CompletionReport { - summary: "done".to_string(), - gates_passed: true, - gate_output: String::new(), - }, - Some(root.to_path_buf()), - None, - false, - ) - .await; - - // With qa: agent, story should move to 3_qa/ - assert!( - root.join(".storkit/work/3_qa/50_story_test.md").exists(), - "story should be in 3_qa/" - ); - assert!( - !current.join("50_story_test.md").exists(), - "story should not still be in 2_current/" - ); - } - - #[tokio::test] - async fn pipeline_advance_qa_gates_pass_moves_story_to_merge() { - use std::fs; - let tmp = tempfile::tempdir().unwrap(); - let root = tmp.path(); - - // Set up story in 3_qa/ - let qa_dir = root.join(".storkit/work/3_qa"); - fs::create_dir_all(&qa_dir).unwrap(); - // qa: server so the story skips human review and goes straight to merge. 
- fs::write( - qa_dir.join("51_story_test.md"), - "---\nname: Test\nqa: server\n---\ntest", - ) - .unwrap(); - - let pool = AgentPool::new_test(3001); - pool.run_pipeline_advance( - "51_story_test", - "qa", - CompletionReport { - summary: "QA done".to_string(), - gates_passed: true, - gate_output: String::new(), - }, - Some(root.to_path_buf()), - None, - false, - ) - .await; - - // Story should have moved to 4_merge/ - assert!( - root.join(".storkit/work/4_merge/51_story_test.md") - .exists(), - "story should be in 4_merge/" - ); - assert!( - !qa_dir.join("51_story_test.md").exists(), - "story should not still be in 3_qa/" - ); - } - - #[tokio::test] - async fn pipeline_advance_supervisor_does_not_advance() { - use std::fs; - let tmp = tempfile::tempdir().unwrap(); - let root = tmp.path(); - - let current = root.join(".storkit/work/2_current"); - fs::create_dir_all(¤t).unwrap(); - fs::write(current.join("52_story_test.md"), "test").unwrap(); - - let pool = AgentPool::new_test(3001); - pool.run_pipeline_advance( - "52_story_test", - "supervisor", - CompletionReport { - summary: "supervised".to_string(), - gates_passed: true, - gate_output: String::new(), - }, - Some(root.to_path_buf()), - None, - false, - ) - .await; - - // Story should NOT have moved (supervisors don't advance pipeline) - assert!( - current.join("52_story_test.md").exists(), - "story should still be in 2_current/ for supervisor" - ); - } - - #[tokio::test] - async fn pipeline_advance_sends_agent_state_changed_to_watcher_tx() { - use std::fs; - - let tmp = tempfile::tempdir().unwrap(); - let root = tmp.path(); - - // Set up story in 2_current/ - let current = root.join(".storkit/work/2_current"); - fs::create_dir_all(¤t).unwrap(); - fs::write(current.join("173_story_test.md"), "test").unwrap(); - // Ensure 3_qa/ exists for the move target - fs::create_dir_all(root.join(".storkit/work/3_qa")).unwrap(); - // Ensure 1_backlog/ exists (start_agent calls move_story_to_current) - 
fs::create_dir_all(root.join(".storkit/work/1_backlog")).unwrap(); - - // Write a project.toml with a qa agent so start_agent can resolve it. - fs::create_dir_all(root.join(".storkit")).unwrap(); - fs::write( - root.join(".storkit/project.toml"), - r#" -default_qa = "agent" - -[[agent]] -name = "coder-1" -role = "Coder" -command = "echo" -args = ["noop"] -prompt = "test" -stage = "coder" - -[[agent]] -name = "qa" -role = "QA" -command = "echo" -args = ["noop"] -prompt = "test" -stage = "qa" -"#, - ) - .unwrap(); - - let pool = AgentPool::new_test(3001); - // Subscribe to the watcher channel BEFORE the pipeline advance. - let mut rx = pool.watcher_tx.subscribe(); - - pool.run_pipeline_advance( - "173_story_test", - "coder-1", - CompletionReport { - summary: "done".to_string(), - gates_passed: true, - gate_output: String::new(), - }, - Some(root.to_path_buf()), - None, - false, - ) - .await; - - // The pipeline advance should have sent AgentStateChanged events via - // the pool's watcher_tx (not a dummy channel). Collect all events. - let mut got_agent_state_changed = false; - while let Ok(evt) = rx.try_recv() { - if matches!(evt, WatcherEvent::AgentStateChanged) { - got_agent_state_changed = true; - break; - } - } - - assert!( - got_agent_state_changed, - "pipeline advance should send AgentStateChanged through the real watcher_tx \ - (bug 173: lozenges must update when agents are assigned during pipeline advance)" - ); - } - - // ── merge_agent_work tests ──────────────────────────────────────────────── - - /// Helper: start a merge and poll until terminal state. 
- async fn run_merge_to_completion( - pool: &Arc, - repo: &std::path::Path, - story_id: &str, - ) -> MergeJob { - pool.start_merge_agent_work(repo, story_id).unwrap(); - loop { - tokio::time::sleep(std::time::Duration::from_millis(50)).await; - if let Some(job) = pool.get_merge_status(story_id) - && !matches!(job.status, MergeJobStatus::Running) - { - return job; - } - } - } - - #[tokio::test] - async fn merge_agent_work_returns_error_when_branch_not_found() { - use tempfile::tempdir; - - let tmp = tempdir().unwrap(); - let repo = tmp.path(); - init_git_repo(repo); - - let pool = Arc::new(AgentPool::new_test(3001)); - let job = run_merge_to_completion(&pool, repo, "99_nonexistent").await; - match &job.status { - MergeJobStatus::Completed(report) => { - assert!(!report.success, "should fail when branch missing"); - } - MergeJobStatus::Failed(_) => { - // Also acceptable — the pipeline errored out - } - MergeJobStatus::Running => { - panic!("should not still be running"); - } - } - } - - #[tokio::test] - async fn merge_agent_work_succeeds_on_clean_branch() { - use std::fs; - use tempfile::tempdir; - - let tmp = tempdir().unwrap(); - let repo = tmp.path(); - init_git_repo(repo); - - // Create a feature branch with a commit - Command::new("git") - .args(["checkout", "-b", "feature/story-23_test"]) - .current_dir(repo) - .output() - .unwrap(); - fs::write(repo.join("feature.txt"), "feature content").unwrap(); - Command::new("git") - .args(["add", "."]) - .current_dir(repo) - .output() - .unwrap(); - Command::new("git") - .args(["commit", "-m", "add feature"]) - .current_dir(repo) - .output() - .unwrap(); - - // Switch back to master (initial branch) - Command::new("git") - .args(["checkout", "master"]) - .current_dir(repo) - .output() - .unwrap(); - - // Create the story file in 4_merge/ so we can test archival - let merge_dir = repo.join(".storkit/work/4_merge"); - fs::create_dir_all(&merge_dir).unwrap(); - let story_file = merge_dir.join("23_test.md"); - 
fs::write(&story_file, "---\nname: Test\n---\n").unwrap(); - Command::new("git") - .args(["add", "."]) - .current_dir(repo) - .output() - .unwrap(); - Command::new("git") - .args(["commit", "-m", "add story in merge"]) - .current_dir(repo) - .output() - .unwrap(); - - let pool = Arc::new(AgentPool::new_test(3001)); - let job = run_merge_to_completion(&pool, repo, "23_test").await; - - match &job.status { - MergeJobStatus::Completed(report) => { - assert!(!report.had_conflicts, "should have no conflicts"); - assert!( - report.success - || report.gate_output.contains("Failed to run") - || !report.gates_passed, - "report should be coherent: {report:?}" - ); - if report.story_archived { - let done = repo.join(".storkit/work/5_done/23_test.md"); - assert!(done.exists(), "done file should exist"); - } - } - MergeJobStatus::Failed(e) => { - // Gate failures are acceptable in test env - assert!( - e.contains("Failed") || e.contains("failed"), - "unexpected failure: {e}" - ); - } - MergeJobStatus::Running => panic!("should not still be running"), - } - } - - // ── quality gate ordering test ──────────────────────────────── - - /// Regression test for bug 142: quality gates must run BEFORE the fast-forward - /// to master so that broken code never lands on master. - #[cfg(unix)] - #[test] - fn quality_gates_run_before_fast_forward_to_master() { - use std::fs; - use std::os::unix::fs::PermissionsExt; - use tempfile::tempdir; - - let tmp = tempdir().unwrap(); - let repo = tmp.path(); - init_git_repo(repo); - - // Add a failing script/test so quality gates will fail. 
- let script_dir = repo.join("script"); - fs::create_dir_all(&script_dir).unwrap(); - let script_test = script_dir.join("test"); - fs::write(&script_test, "#!/usr/bin/env bash\nexit 1\n").unwrap(); - let mut perms = fs::metadata(&script_test).unwrap().permissions(); - perms.set_mode(0o755); - fs::set_permissions(&script_test, perms).unwrap(); - Command::new("git") - .args(["add", "."]) - .current_dir(repo) - .output() - .unwrap(); - Command::new("git") - .args(["commit", "-m", "add failing script/test"]) - .current_dir(repo) - .output() - .unwrap(); - - // Create a feature branch with a commit. - Command::new("git") - .args(["checkout", "-b", "feature/story-142_test"]) - .current_dir(repo) - .output() - .unwrap(); - fs::write(repo.join("change.txt"), "feature change").unwrap(); - Command::new("git") - .args(["add", "."]) - .current_dir(repo) - .output() - .unwrap(); - Command::new("git") - .args(["commit", "-m", "feature work"]) - .current_dir(repo) - .output() - .unwrap(); - - // Switch back to master and record its HEAD. - Command::new("git") - .args(["checkout", "master"]) - .current_dir(repo) - .output() - .unwrap(); - let head_before = String::from_utf8( - Command::new("git") - .args(["rev-parse", "HEAD"]) - .current_dir(repo) - .output() - .unwrap() - .stdout, - ) - .unwrap() - .trim() - .to_string(); - - // Run the squash-merge. The failing script/test makes quality gates - // fail → fast-forward must NOT happen. - let result = - crate::agents::merge::run_squash_merge(repo, "feature/story-142_test", "142_test") - .unwrap(); - - let head_after = String::from_utf8( - Command::new("git") - .args(["rev-parse", "HEAD"]) - .current_dir(repo) - .output() - .unwrap() - .stdout, - ) - .unwrap() - .trim() - .to_string(); - - // Gates must have failed (script/test exits 1) so master should be untouched. 
- assert!( - !result.success, - "run_squash_merge must report failure when gates fail" - ); - assert_eq!( - head_before, head_after, - "master HEAD must not advance when quality gates fail (bug 142)" - ); - } - - #[tokio::test] - async fn merge_agent_work_conflict_does_not_break_master() { - use std::fs; - use tempfile::tempdir; - - let tmp = tempdir().unwrap(); - let repo = tmp.path(); - init_git_repo(repo); - - // Create a file on master. - fs::write( - repo.join("code.rs"), - "fn main() {\n println!(\"hello\");\n}\n", - ) - .unwrap(); - Command::new("git") - .args(["add", "."]) - .current_dir(repo) - .output() - .unwrap(); - Command::new("git") - .args(["commit", "-m", "initial code"]) - .current_dir(repo) - .output() - .unwrap(); - - // Feature branch: modify the same line differently. - Command::new("git") - .args(["checkout", "-b", "feature/story-42_story_foo"]) - .current_dir(repo) - .output() - .unwrap(); - fs::write( - repo.join("code.rs"), - "fn main() {\n println!(\"hello\");\n feature_fn();\n}\n", - ) - .unwrap(); - Command::new("git") - .args(["add", "."]) - .current_dir(repo) - .output() - .unwrap(); - Command::new("git") - .args(["commit", "-m", "feature: add fn call"]) - .current_dir(repo) - .output() - .unwrap(); - - // Master: add different line at same location. - Command::new("git") - .args(["checkout", "master"]) - .current_dir(repo) - .output() - .unwrap(); - fs::write( - repo.join("code.rs"), - "fn main() {\n println!(\"hello\");\n master_fn();\n}\n", - ) - .unwrap(); - Command::new("git") - .args(["add", "."]) - .current_dir(repo) - .output() - .unwrap(); - Command::new("git") - .args(["commit", "-m", "master: add fn call"]) - .current_dir(repo) - .output() - .unwrap(); - - // Create story file in 4_merge. 
- let merge_dir = repo.join(".storkit/work/4_merge"); - fs::create_dir_all(&merge_dir).unwrap(); - fs::write(merge_dir.join("42_story_foo.md"), "---\nname: Test\n---\n").unwrap(); - Command::new("git") - .args(["add", "."]) - .current_dir(repo) - .output() - .unwrap(); - Command::new("git") - .args(["commit", "-m", "add story"]) - .current_dir(repo) - .output() - .unwrap(); - - let pool = Arc::new(AgentPool::new_test(3001)); - let job = run_merge_to_completion(&pool, repo, "42_story_foo").await; - - // Master should NEVER have conflict markers, regardless of merge outcome. - let master_code = fs::read_to_string(repo.join("code.rs")).unwrap(); - assert!( - !master_code.contains("<<<<<<<"), - "master must never contain conflict markers:\n{master_code}" - ); - assert!( - !master_code.contains(">>>>>>>"), - "master must never contain conflict markers:\n{master_code}" - ); - - // The report should accurately reflect what happened. - match &job.status { - MergeJobStatus::Completed(report) => { - assert!(report.had_conflicts, "should report conflicts"); - } - MergeJobStatus::Failed(_) => { - // Acceptable — merge aborted due to conflicts - } - MergeJobStatus::Running => panic!("should not still be running"), - } - } - - // ── bug 295: pipeline advance picks up waiting QA stories ────────── - - #[tokio::test] - async fn pipeline_advance_picks_up_waiting_qa_stories_after_completion() { - use std::fs; - use super::super::auto_assign::is_agent_free; - - let tmp = tempfile::tempdir().unwrap(); - let root = tmp.path(); - - let sk = root.join(".storkit"); - let qa_dir = sk.join("work/3_qa"); - fs::create_dir_all(&qa_dir).unwrap(); - - // Configure a single QA agent. - fs::write( - sk.join("project.toml"), - r#" -[[agent]] -name = "qa" -stage = "qa" -"#, - ) - .unwrap(); - - // Story 292 is in QA with QA agent running (will "complete" via - // run_pipeline_advance below). Story 293 is in QA with NO agent — - // simulating the "stuck" state from bug 295. 
- fs::write( - qa_dir.join("292_story_first.md"), - "---\nname: First\nqa: human\n---\n", - ) - .unwrap(); - fs::write( - qa_dir.join("293_story_second.md"), - "---\nname: Second\nqa: human\n---\n", - ) - .unwrap(); - - let pool = AgentPool::new_test(3001); - // QA is currently running on story 292. - pool.inject_test_agent("292_story_first", "qa", AgentStatus::Running); - - // Verify that 293 cannot get a QA agent right now (QA is busy). - { - let agents = pool.agents.lock().unwrap(); - assert!( - !is_agent_free(&agents, "qa"), - "qa should be busy on story 292" - ); - } - - // Simulate QA completing on story 292: remove the agent from the pool - // (as run_server_owned_completion does) then run pipeline advance. - { - let mut agents = pool.agents.lock().unwrap(); - agents.remove(&composite_key("292_story_first", "qa")); - } - - pool.run_pipeline_advance( - "292_story_first", - "qa", - CompletionReport { - summary: "QA done".to_string(), - gates_passed: true, - gate_output: String::new(), - }, - Some(root.to_path_buf()), - None, - false, - ) - .await; - - // After pipeline advance, auto_assign should have started QA on story 293. - let agents = pool.agents.lock().unwrap(); - let qa_on_293 = agents.values().any(|a| { - a.agent_name == "qa" - && matches!(a.status, AgentStatus::Pending | AgentStatus::Running) - }); - assert!( - qa_on_293, - "auto_assign should have started qa for story 293 after 292's QA completed, \ - but no qa agent is pending/running. 
Pool: {:?}", - agents - .iter() - .map(|(k, a)| format!("{k}: {} ({})", a.agent_name, a.status)) - .collect::>() - ); - } -} diff --git a/server/src/agents/pool/pipeline/advance.rs b/server/src/agents/pool/pipeline/advance.rs new file mode 100644 index 00000000..8e75107c --- /dev/null +++ b/server/src/agents/pool/pipeline/advance.rs @@ -0,0 +1,785 @@ +use crate::config::ProjectConfig; +use crate::slog; +use crate::slog_error; +use crate::slog_warn; +use crate::io::watcher::WatcherEvent; +use std::collections::HashMap; +use std::path::{Path, PathBuf}; +use std::sync::{Arc, Mutex}; +use tokio::sync::broadcast; + +use super::super::super::{ + CompletionReport, PipelineStage, + agent_config_stage, pipeline_stage, +}; +use super::super::{AgentPool, StoryAgent}; + +impl AgentPool { + /// Pipeline advancement: after an agent completes, move the story to + /// the next pipeline stage and start the appropriate agent. + pub(super) async fn run_pipeline_advance( + &self, + story_id: &str, + agent_name: &str, + completion: CompletionReport, + project_root: Option, + worktree_path: Option, + merge_failure_reported: bool, + ) { + let project_root = match project_root { + Some(p) => p, + None => { + slog_warn!("[pipeline] No project_root for '{story_id}:{agent_name}'"); + return; + } + }; + + let config = ProjectConfig::load(&project_root).unwrap_or_default(); + let stage = config + .find_agent(agent_name) + .map(agent_config_stage) + .unwrap_or_else(|| pipeline_stage(agent_name)); + + match stage { + PipelineStage::Other => { + // Supervisors and unknown agents do not advance the pipeline. + } + PipelineStage::Coder => { + if completion.gates_passed { + // Determine effective QA mode for this story. + let qa_mode = { + let item_type = crate::agents::lifecycle::item_type_from_id(story_id); + if item_type == "spike" { + crate::io::story_metadata::QaMode::Human + } else { + let default_qa = config.default_qa_mode(); + // Story is in 2_current/ when a coder completes. 
+ let story_path = project_root + .join(".storkit/work/2_current") + .join(format!("{story_id}.md")); + crate::io::story_metadata::resolve_qa_mode(&story_path, default_qa) + } + }; + + match qa_mode { + crate::io::story_metadata::QaMode::Server => { + slog!( + "[pipeline] Coder '{agent_name}' passed gates for '{story_id}'. \ + qa: server — moving directly to merge." + ); + if let Err(e) = + crate::agents::lifecycle::move_story_to_merge(&project_root, story_id) + { + slog_error!( + "[pipeline] Failed to move '{story_id}' to 4_merge/: {e}" + ); + } else if let Err(e) = self + .start_agent(&project_root, story_id, Some("mergemaster"), None) + .await + { + slog_error!( + "[pipeline] Failed to start mergemaster for '{story_id}': {e}" + ); + } + } + crate::io::story_metadata::QaMode::Agent => { + slog!( + "[pipeline] Coder '{agent_name}' passed gates for '{story_id}'. \ + qa: agent — moving to QA." + ); + if let Err(e) = crate::agents::lifecycle::move_story_to_qa(&project_root, story_id) { + slog_error!("[pipeline] Failed to move '{story_id}' to 3_qa/: {e}"); + } else if let Err(e) = self + .start_agent(&project_root, story_id, Some("qa"), None) + .await + { + slog_error!("[pipeline] Failed to start qa agent for '{story_id}': {e}"); + } + } + crate::io::story_metadata::QaMode::Human => { + slog!( + "[pipeline] Coder '{agent_name}' passed gates for '{story_id}'. \ + qa: human — holding for human review." + ); + if let Err(e) = crate::agents::lifecycle::move_story_to_qa(&project_root, story_id) { + slog_error!("[pipeline] Failed to move '{story_id}' to 3_qa/: {e}"); + } else { + let qa_dir = project_root.join(".storkit/work/3_qa"); + let story_path = qa_dir.join(format!("{story_id}.md")); + if let Err(e) = + crate::io::story_metadata::write_review_hold(&story_path) + { + slog_error!( + "[pipeline] Failed to set review_hold on '{story_id}': {e}" + ); + } + } + } + } + } else { + // Increment retry count and check if blocked. 
+ let story_path = project_root + .join(".storkit/work/2_current") + .join(format!("{story_id}.md")); + if let Some(reason) = should_block_story(&story_path, config.max_retries, story_id, "coder") { + // Story has exceeded retry limit — do not restart. + let _ = self.watcher_tx.send(WatcherEvent::StoryBlocked { + story_id: story_id.to_string(), + reason, + }); + } else { + slog!( + "[pipeline] Coder '{agent_name}' failed gates for '{story_id}'. Restarting." + ); + let context = format!( + "\n\n---\n## Previous Attempt Failed\n\ + The acceptance gates failed with the following output:\n{}\n\n\ + Please review the failures above, fix the issues, and try again.", + completion.gate_output + ); + if let Err(e) = self + .start_agent(&project_root, story_id, Some(agent_name), Some(&context)) + .await + { + slog_error!( + "[pipeline] Failed to restart coder '{agent_name}' for '{story_id}': {e}" + ); + } + } + } + } + PipelineStage::Qa => { + if completion.gates_passed { + // Run coverage gate in the QA worktree before advancing to merge. + let coverage_path = worktree_path + .clone() + .unwrap_or_else(|| project_root.clone()); + let cp = coverage_path.clone(); + let coverage_result = + tokio::task::spawn_blocking(move || crate::agents::gates::run_coverage_gate(&cp)) + .await + .unwrap_or_else(|e| { + slog_warn!("[pipeline] Coverage gate task panicked: {e}"); + Ok((false, format!("Coverage gate task panicked: {e}"))) + }); + let (coverage_passed, coverage_output) = match coverage_result { + Ok(pair) => pair, + Err(e) => (false, e), + }; + + if coverage_passed { + // Check whether this item needs human review before merging. + let needs_human_review = { + let item_type = crate::agents::lifecycle::item_type_from_id(story_id); + if item_type == "spike" { + true // Spikes always need human review. 
+ } else { + let qa_dir = project_root.join(".storkit/work/3_qa"); + let story_path = qa_dir.join(format!("{story_id}.md")); + let default_qa = config.default_qa_mode(); + matches!( + crate::io::story_metadata::resolve_qa_mode(&story_path, default_qa), + crate::io::story_metadata::QaMode::Human + ) + } + }; + + if needs_human_review { + // Hold in 3_qa/ for human review. + let qa_dir = project_root.join(".storkit/work/3_qa"); + let story_path = qa_dir.join(format!("{story_id}.md")); + if let Err(e) = + crate::io::story_metadata::write_review_hold(&story_path) + { + slog_error!( + "[pipeline] Failed to set review_hold on '{story_id}': {e}" + ); + } + slog!( + "[pipeline] QA passed for '{story_id}'. \ + Holding for human review. \ + Worktree preserved at: {worktree_path:?}" + ); + } else { + slog!( + "[pipeline] QA passed gates and coverage for '{story_id}'. \ + Moving directly to merge." + ); + if let Err(e) = + crate::agents::lifecycle::move_story_to_merge(&project_root, story_id) + { + slog_error!( + "[pipeline] Failed to move '{story_id}' to 4_merge/: {e}" + ); + } else if let Err(e) = self + .start_agent(&project_root, story_id, Some("mergemaster"), None) + .await + { + slog_error!( + "[pipeline] Failed to start mergemaster for '{story_id}': {e}" + ); + } + } + } else { + let story_path = project_root + .join(".storkit/work/3_qa") + .join(format!("{story_id}.md")); + if let Some(reason) = should_block_story(&story_path, config.max_retries, story_id, "qa-coverage") { + // Story has exceeded retry limit — do not restart. + let _ = self.watcher_tx.send(WatcherEvent::StoryBlocked { + story_id: story_id.to_string(), + reason, + }); + } else { + slog!( + "[pipeline] QA coverage gate failed for '{story_id}'. Restarting QA." 
+ ); + let context = format!( + "\n\n---\n## Coverage Gate Failed\n\ + The coverage gate (script/test_coverage) failed with the following output:\n{}\n\n\ + Please improve test coverage until the coverage gate passes.", + coverage_output + ); + if let Err(e) = self + .start_agent(&project_root, story_id, Some("qa"), Some(&context)) + .await + { + slog_error!("[pipeline] Failed to restart qa for '{story_id}': {e}"); + } + } + } + } else { + let story_path = project_root + .join(".storkit/work/3_qa") + .join(format!("{story_id}.md")); + if let Some(reason) = should_block_story(&story_path, config.max_retries, story_id, "qa") { + // Story has exceeded retry limit — do not restart. + let _ = self.watcher_tx.send(WatcherEvent::StoryBlocked { + story_id: story_id.to_string(), + reason, + }); + } else { + slog!("[pipeline] QA failed gates for '{story_id}'. Restarting."); + let context = format!( + "\n\n---\n## Previous QA Attempt Failed\n\ + The acceptance gates failed with the following output:\n{}\n\n\ + Please re-run and fix the issues.", + completion.gate_output + ); + if let Err(e) = self + .start_agent(&project_root, story_id, Some("qa"), Some(&context)) + .await + { + slog_error!("[pipeline] Failed to restart qa for '{story_id}': {e}"); + } + } + } + } + PipelineStage::Mergemaster => { + // Block advancement if the mergemaster explicitly reported a failure. + // The server-owned gate check runs in the feature-branch worktree (not + // master), so `gates_passed=true` is misleading when no code was merged. + if merge_failure_reported { + slog!( + "[pipeline] Pipeline advancement blocked for '{story_id}': \ + mergemaster explicitly reported a merge failure. \ + Story stays in 4_merge/ for human review." + ); + } else { + // Run script/test on master (project_root) as the post-merge verification. + slog!( + "[pipeline] Mergemaster completed for '{story_id}'. Running post-merge tests on master." 
+ ); + let root = project_root.clone(); + let test_result = + tokio::task::spawn_blocking(move || crate::agents::gates::run_project_tests(&root)) + .await + .unwrap_or_else(|e| { + slog_warn!("[pipeline] Post-merge test task panicked: {e}"); + Ok((false, format!("Test task panicked: {e}"))) + }); + let (passed, output) = match test_result { + Ok(pair) => pair, + Err(e) => (false, e), + }; + + if passed { + slog!( + "[pipeline] Post-merge tests passed for '{story_id}'. Moving to done." + ); + if let Err(e) = + crate::agents::lifecycle::move_story_to_archived(&project_root, story_id) + { + slog_error!("[pipeline] Failed to move '{story_id}' to done: {e}"); + } + self.remove_agents_for_story(story_id); + // TODO: Re-enable worktree cleanup once we have persistent agent logs. + // Removing worktrees destroys evidence needed to debug empty-commit agents. + // let config = + // crate::config::ProjectConfig::load(&project_root).unwrap_or_default(); + // if let Err(e) = + // worktree::remove_worktree_by_story_id(&project_root, story_id, &config) + // .await + // { + // slog!( + // "[pipeline] Failed to remove worktree for '{story_id}': {e}" + // ); + // } + slog!( + "[pipeline] Story '{story_id}' done. Worktree preserved for inspection." + ); + } else { + let story_path = project_root + .join(".storkit/work/4_merge") + .join(format!("{story_id}.md")); + if let Some(reason) = should_block_story(&story_path, config.max_retries, story_id, "mergemaster") { + // Story has exceeded retry limit — do not restart. + let _ = self.watcher_tx.send(WatcherEvent::StoryBlocked { + story_id: story_id.to_string(), + reason, + }); + } else { + slog!( + "[pipeline] Post-merge tests failed for '{story_id}'. Restarting mergemaster." 
+ ); + let context = format!( + "\n\n---\n## Post-Merge Test Failed\n\ + The tests on master failed with the following output:\n{}\n\n\ + Please investigate and resolve the failures, then call merge_agent_work again.", + output + ); + if let Err(e) = self + .start_agent( + &project_root, + story_id, + Some("mergemaster"), + Some(&context), + ) + .await + { + slog_error!( + "[pipeline] Failed to restart mergemaster for '{story_id}': {e}" + ); + } + } + } + } + } + } + + // Always scan for unassigned work after any agent completes, regardless + // of the outcome (success, failure, restart). This ensures stories that + // failed agent assignment due to busy agents are retried when agents + // become available (bug 295). + self.auto_assign_available_work(&project_root).await; + } +} + +/// Spawn pipeline advancement as a background task. +/// +/// This is a **non-async** function so it does not participate in the opaque +/// type cycle between `start_agent` and `run_server_owned_completion`. +#[allow(clippy::too_many_arguments)] +pub(super) fn spawn_pipeline_advance( + agents: Arc>>, + port: u16, + story_id: &str, + agent_name: &str, + completion: CompletionReport, + project_root: Option, + worktree_path: Option, + watcher_tx: broadcast::Sender, + merge_failure_reported: bool, +) { + let sid = story_id.to_string(); + let aname = agent_name.to_string(); + tokio::spawn(async move { + let pool = AgentPool { + agents, + port, + child_killers: Arc::new(Mutex::new(HashMap::new())), + watcher_tx, + merge_jobs: Arc::new(Mutex::new(HashMap::new())), + }; + pool.run_pipeline_advance( + &sid, + &aname, + completion, + project_root, + worktree_path, + merge_failure_reported, + ) + .await; + }); +} + +/// Increment retry_count and block the story if it exceeds `max_retries`. +/// +/// Returns `Some(reason)` if the story is now blocked (caller should NOT restart the agent). +/// Returns `None` if the story may be retried. +/// When `max_retries` is 0, retry limits are disabled. 
+fn should_block_story(story_path: &Path, max_retries: u32, story_id: &str, stage_label: &str) -> Option { + use crate::io::story_metadata::{increment_retry_count, write_blocked}; + + if max_retries == 0 { + // Retry limits disabled. + return None; + } + + match increment_retry_count(story_path) { + Ok(new_count) => { + if new_count >= max_retries { + slog_warn!( + "[pipeline] Story '{story_id}' reached retry limit ({new_count}/{max_retries}) \ + at {stage_label} stage. Marking as blocked." + ); + if let Err(e) = write_blocked(story_path) { + slog_error!("[pipeline] Failed to write blocked flag for '{story_id}': {e}"); + } + Some(format!( + "Retry limit exceeded ({new_count}/{max_retries}) at {stage_label} stage" + )) + } else { + slog!( + "[pipeline] Story '{story_id}' retry {new_count}/{max_retries} at {stage_label} stage." + ); + None + } + } + Err(e) => { + slog_error!("[pipeline] Failed to increment retry_count for '{story_id}': {e}"); + None // Don't block on error — allow retry. + } + } +} + +#[cfg(test)] +mod tests { + use super::super::super::AgentPool; + use super::super::super::composite_key; + use crate::agents::{AgentStatus, CompletionReport}; + use crate::io::watcher::WatcherEvent; + + // ── pipeline advance tests ──────────────────────────────────────────────── + + #[tokio::test] + async fn pipeline_advance_coder_gates_pass_server_qa_moves_to_merge() { + use std::fs; + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + // Set up story in 2_current/ (no qa frontmatter → uses project default "server") + let current = root.join(".storkit/work/2_current"); + fs::create_dir_all(¤t).unwrap(); + fs::write(current.join("50_story_test.md"), "test").unwrap(); + + let pool = AgentPool::new_test(3001); + pool.run_pipeline_advance( + "50_story_test", + "coder-1", + CompletionReport { + summary: "done".to_string(), + gates_passed: true, + gate_output: String::new(), + }, + Some(root.to_path_buf()), + None, + false, + ) + .await; + + // With 
default qa: server, story skips QA and goes straight to 4_merge/
+ assert!(
+ root.join(".storkit/work/4_merge/50_story_test.md")
+ .exists(),
+ "story should be in 4_merge/"
+ );
+ assert!(
+ !current.join("50_story_test.md").exists(),
+ "story should not still be in 2_current/"
+ );
+ }
+
+ #[tokio::test]
+ async fn pipeline_advance_coder_gates_pass_agent_qa_moves_to_qa() {
+ use std::fs;
+ let tmp = tempfile::tempdir().unwrap();
+ let root = tmp.path();
+
+ // Set up story in 2_current/ with qa: agent frontmatter
+ let current = root.join(".storkit/work/2_current");
+ fs::create_dir_all(&current).unwrap();
+ fs::write(
+ current.join("50_story_test.md"),
+ "---\nname: Test\nqa: agent\n---\ntest",
+ )
+ .unwrap();
+
+ let pool = AgentPool::new_test(3001);
+ pool.run_pipeline_advance(
+ "50_story_test",
+ "coder-1",
+ CompletionReport {
+ summary: "done".to_string(),
+ gates_passed: true,
+ gate_output: String::new(),
+ },
+ Some(root.to_path_buf()),
+ None,
+ false,
+ )
+ .await;
+
+ // With qa: agent, story should move to 3_qa/
+ assert!(
+ root.join(".storkit/work/3_qa/50_story_test.md").exists(),
+ "story should be in 3_qa/"
+ );
+ assert!(
+ !current.join("50_story_test.md").exists(),
+ "story should not still be in 2_current/"
+ );
+ }
+
+ #[tokio::test]
+ async fn pipeline_advance_qa_gates_pass_moves_story_to_merge() {
+ use std::fs;
+ let tmp = tempfile::tempdir().unwrap();
+ let root = tmp.path();
+
+ // Set up story in 3_qa/
+ let qa_dir = root.join(".storkit/work/3_qa");
+ fs::create_dir_all(&qa_dir).unwrap();
+ // qa: server so the story skips human review and goes straight to merge.
+ fs::write(
+ qa_dir.join("51_story_test.md"),
+ "---\nname: Test\nqa: server\n---\ntest",
+ )
+ .unwrap();
+
+ let pool = AgentPool::new_test(3001);
+ pool.run_pipeline_advance(
+ "51_story_test",
+ "qa",
+ CompletionReport {
+ summary: "QA done".to_string(),
+ gates_passed: true,
+ gate_output: String::new(),
+ },
+ Some(root.to_path_buf()),
+ None,
+ false,
+ )
+ .await;
+
+ // Story should have moved to 4_merge/
+ assert!(
+ root.join(".storkit/work/4_merge/51_story_test.md")
+ .exists(),
+ "story should be in 4_merge/"
+ );
+ assert!(
+ !qa_dir.join("51_story_test.md").exists(),
+ "story should not still be in 3_qa/"
+ );
+ }
+
+ #[tokio::test]
+ async fn pipeline_advance_supervisor_does_not_advance() {
+ use std::fs;
+ let tmp = tempfile::tempdir().unwrap();
+ let root = tmp.path();
+
+ let current = root.join(".storkit/work/2_current");
+ fs::create_dir_all(&current).unwrap();
+ fs::write(current.join("52_story_test.md"), "test").unwrap();
+
+ let pool = AgentPool::new_test(3001);
+ pool.run_pipeline_advance(
+ "52_story_test",
+ "supervisor",
+ CompletionReport {
+ summary: "supervised".to_string(),
+ gates_passed: true,
+ gate_output: String::new(),
+ },
+ Some(root.to_path_buf()),
+ None,
+ false,
+ )
+ .await;
+
+ // Story should NOT have moved (supervisors don't advance pipeline)
+ assert!(
+ current.join("52_story_test.md").exists(),
+ "story should still be in 2_current/ for supervisor"
+ );
+ }
+
+ #[tokio::test]
+ async fn pipeline_advance_sends_agent_state_changed_to_watcher_tx() {
+ use std::fs;
+
+ let tmp = tempfile::tempdir().unwrap();
+ let root = tmp.path();
+
+ // Set up story in 2_current/
+ let current = root.join(".storkit/work/2_current");
+ fs::create_dir_all(&current).unwrap();
+ fs::write(current.join("173_story_test.md"), "test").unwrap();
+ // Ensure 3_qa/ exists for the move target
+ fs::create_dir_all(root.join(".storkit/work/3_qa")).unwrap();
+ // Ensure 1_backlog/ exists (start_agent calls move_story_to_current)
+
fs::create_dir_all(root.join(".storkit/work/1_backlog")).unwrap(); + + // Write a project.toml with a qa agent so start_agent can resolve it. + fs::create_dir_all(root.join(".storkit")).unwrap(); + fs::write( + root.join(".storkit/project.toml"), + r#" +default_qa = "agent" + +[[agent]] +name = "coder-1" +role = "Coder" +command = "echo" +args = ["noop"] +prompt = "test" +stage = "coder" + +[[agent]] +name = "qa" +role = "QA" +command = "echo" +args = ["noop"] +prompt = "test" +stage = "qa" +"#, + ) + .unwrap(); + + let pool = AgentPool::new_test(3001); + // Subscribe to the watcher channel BEFORE the pipeline advance. + let mut rx = pool.watcher_tx.subscribe(); + + pool.run_pipeline_advance( + "173_story_test", + "coder-1", + CompletionReport { + summary: "done".to_string(), + gates_passed: true, + gate_output: String::new(), + }, + Some(root.to_path_buf()), + None, + false, + ) + .await; + + // The pipeline advance should have sent AgentStateChanged events via + // the pool's watcher_tx (not a dummy channel). Collect all events. + let mut got_agent_state_changed = false; + while let Ok(evt) = rx.try_recv() { + if matches!(evt, WatcherEvent::AgentStateChanged) { + got_agent_state_changed = true; + break; + } + } + + assert!( + got_agent_state_changed, + "pipeline advance should send AgentStateChanged through the real watcher_tx \ + (bug 173: lozenges must update when agents are assigned during pipeline advance)" + ); + } + + // ── bug 295: pipeline advance picks up waiting QA stories ────────── + + #[tokio::test] + async fn pipeline_advance_picks_up_waiting_qa_stories_after_completion() { + use std::fs; + use super::super::super::auto_assign::is_agent_free; + + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + let sk = root.join(".storkit"); + let qa_dir = sk.join("work/3_qa"); + fs::create_dir_all(&qa_dir).unwrap(); + + // Configure a single QA agent. 
+ fs::write( + sk.join("project.toml"), + r#" +[[agent]] +name = "qa" +stage = "qa" +"#, + ) + .unwrap(); + + // Story 292 is in QA with QA agent running (will "complete" via + // run_pipeline_advance below). Story 293 is in QA with NO agent — + // simulating the "stuck" state from bug 295. + fs::write( + qa_dir.join("292_story_first.md"), + "---\nname: First\nqa: human\n---\n", + ) + .unwrap(); + fs::write( + qa_dir.join("293_story_second.md"), + "---\nname: Second\nqa: human\n---\n", + ) + .unwrap(); + + let pool = AgentPool::new_test(3001); + // QA is currently running on story 292. + pool.inject_test_agent("292_story_first", "qa", AgentStatus::Running); + + // Verify that 293 cannot get a QA agent right now (QA is busy). + { + let agents = pool.agents.lock().unwrap(); + assert!( + !is_agent_free(&agents, "qa"), + "qa should be busy on story 292" + ); + } + + // Simulate QA completing on story 292: remove the agent from the pool + // (as run_server_owned_completion does) then run pipeline advance. + { + let mut agents = pool.agents.lock().unwrap(); + agents.remove(&composite_key("292_story_first", "qa")); + } + + pool.run_pipeline_advance( + "292_story_first", + "qa", + CompletionReport { + summary: "QA done".to_string(), + gates_passed: true, + gate_output: String::new(), + }, + Some(root.to_path_buf()), + None, + false, + ) + .await; + + // After pipeline advance, auto_assign should have started QA on story 293. + let agents = pool.agents.lock().unwrap(); + let qa_on_293 = agents.values().any(|a| { + a.agent_name == "qa" + && matches!(a.status, AgentStatus::Pending | AgentStatus::Running) + }); + assert!( + qa_on_293, + "auto_assign should have started qa for story 293 after 292's QA completed, \ + but no qa agent is pending/running. 
Pool: {:?}",
+ agents
+ .iter()
+ .map(|(k, a)| format!("{k}: {} ({})", a.agent_name, a.status))
+ .collect::<Vec<_>>()
+ );
+ }
+}
diff --git a/server/src/agents/pool/pipeline/completion.rs b/server/src/agents/pool/pipeline/completion.rs
new file mode 100644
index 00000000..df88a9c1
--- /dev/null
+++ b/server/src/agents/pool/pipeline/completion.rs
@@ -0,0 +1,519 @@
+use crate::slog;
+use crate::io::watcher::WatcherEvent;
+use std::collections::HashMap;
+use std::sync::{Arc, Mutex};
+use tokio::sync::broadcast;
+
+use super::super::super::{AgentEvent, AgentStatus, CompletionReport};
+use super::super::{AgentPool, StoryAgent, composite_key};
+use super::advance::spawn_pipeline_advance;
+
+impl AgentPool {
+ /// Internal: report that an agent has finished work on a story.
+ ///
+ /// **Note:** This is no longer exposed as an MCP tool. The server now
+ /// automatically runs completion gates when an agent process exits
+ /// (see `run_server_owned_completion`). This method is retained for
+ /// backwards compatibility and testing.
+ ///
+ /// - Rejects with an error if the worktree has uncommitted changes.
+ /// - Runs acceptance gates (cargo clippy + cargo nextest run / cargo test).
+ /// - Stores the `CompletionReport` on the agent record.
+ /// - Transitions status to `Completed` (gates passed) or `Failed` (gates failed).
+ /// - Emits a `Done` event so `wait_for_agent` unblocks.
+ #[allow(dead_code)]
+ pub async fn report_completion(
+ &self,
+ story_id: &str,
+ agent_name: &str,
+ summary: &str,
+ ) -> Result<CompletionReport, String> {
+ let key = composite_key(story_id, agent_name);
+
+ // Verify agent exists, is Running, and grab its worktree path.
+ let worktree_path = {
+ let agents = self.agents.lock().map_err(|e| e.to_string())?;
+ let agent = agents
+ .get(&key)
+ .ok_or_else(|| format!("No agent '{agent_name}' for story '{story_id}'"))?;
+
+ if agent.status != AgentStatus::Running {
+ return Err(format!(
+ "Agent '{agent_name}' for story '{story_id}' is not running (status: {}).
\ + report_completion can only be called by a running agent.", + agent.status + )); + } + + agent + .worktree_info + .as_ref() + .map(|wt| wt.path.clone()) + .ok_or_else(|| { + format!( + "Agent '{agent_name}' for story '{story_id}' has no worktree. \ + Cannot run acceptance gates." + ) + })? + }; + + let path = worktree_path.clone(); + + // Run gate checks in a blocking thread to avoid stalling the async runtime. + let (gates_passed, gate_output) = tokio::task::spawn_blocking(move || { + // Step 1: Reject if worktree is dirty. + crate::agents::gates::check_uncommitted_changes(&path)?; + // Step 2: Run clippy + tests and return (passed, output). + crate::agents::gates::run_acceptance_gates(&path) + }) + .await + .map_err(|e| format!("Gate check task panicked: {e}"))??; + + let report = CompletionReport { + summary: summary.to_string(), + gates_passed, + gate_output, + }; + + // Extract data for pipeline advance, then remove the entry so + // completed agents never appear in list_agents. + let ( + tx, + session_id, + project_root_for_advance, + wt_path_for_advance, + merge_failure_reported_for_advance, + ) = { + let mut agents = self.agents.lock().map_err(|e| e.to_string())?; + let agent = agents.get_mut(&key).ok_or_else(|| { + format!("Agent '{agent_name}' for story '{story_id}' disappeared during gate check") + })?; + agent.completion = Some(report.clone()); + let tx = agent.tx.clone(); + let sid = agent.session_id.clone(); + let pr = agent.project_root.clone(); + let wt = agent.worktree_info.as_ref().map(|w| w.path.clone()); + let mfr = agent.merge_failure_reported; + agents.remove(&key); + (tx, sid, pr, wt, mfr) + }; + + // Emit Done so wait_for_agent unblocks. + let _ = tx.send(AgentEvent::Done { + story_id: story_id.to_string(), + agent_name: agent_name.to_string(), + session_id, + }); + + // Notify WebSocket clients that the agent is gone. + Self::notify_agent_state_changed(&self.watcher_tx); + + // Advance the pipeline state machine in a background task. 
+ let pool_clone = Self {
+ agents: Arc::clone(&self.agents),
+ port: self.port,
+ child_killers: Arc::clone(&self.child_killers),
+ watcher_tx: self.watcher_tx.clone(),
+ merge_jobs: Arc::clone(&self.merge_jobs),
+ };
+ let sid = story_id.to_string();
+ let aname = agent_name.to_string();
+ let report_for_advance = report.clone();
+ tokio::spawn(async move {
+ pool_clone
+ .run_pipeline_advance(
+ &sid,
+ &aname,
+ report_for_advance,
+ project_root_for_advance,
+ wt_path_for_advance,
+ merge_failure_reported_for_advance,
+ )
+ .await;
+ });
+
+ Ok(report)
+ }
+}
+
+/// Server-owned completion: runs acceptance gates when an agent process exits
+/// normally, and advances the pipeline based on results.
+///
+/// This is a **free function** (not a method on `AgentPool`) to break the
+/// opaque type cycle that would otherwise arise: `start_agent` → spawned task
+/// → server-owned completion → pipeline advance → `start_agent`.
+///
+/// If the agent already has a completion report (e.g. from a legacy
+/// `report_completion` call), this is a no-op to avoid double-running gates.
+pub(in crate::agents::pool) async fn run_server_owned_completion(
+ agents: &Arc<Mutex<HashMap<String, StoryAgent>>>,
+ port: u16,
+ story_id: &str,
+ agent_name: &str,
+ session_id: Option<String>,
+ watcher_tx: broadcast::Sender<WatcherEvent>,
+) {
+ let key = composite_key(story_id, agent_name);
+
+ // Guard: skip if completion was already recorded (legacy path).
+ {
+ let lock = match agents.lock() {
+ Ok(a) => a,
+ Err(_) => return,
+ };
+ match lock.get(&key) {
+ Some(agent) if agent.completion.is_some() => {
+ slog!(
+ "[agents] Completion already recorded for '{story_id}:{agent_name}'; \
+ skipping server-owned gates."
+ );
+ return;
+ }
+ Some(_) => {}
+ None => return,
+ }
+ }
+
+ // Get worktree path for running gates.
+ let worktree_path = {
+ let lock = match agents.lock() {
+ Ok(a) => a,
+ Err(_) => return,
+ };
+ lock.get(&key)
+ .and_then(|a| a.worktree_info.as_ref().map(|wt| wt.path.clone()))
+ };
+
+ // Run acceptance gates.
+ let (gates_passed, gate_output) = if let Some(wt_path) = worktree_path { + let path = wt_path; + match tokio::task::spawn_blocking(move || { + crate::agents::gates::check_uncommitted_changes(&path)?; + // AC5: Fail early if the coder finished with no commits on the feature branch. + // This prevents empty-diff stories from advancing through QA to merge. + if !crate::agents::gates::worktree_has_committed_work(&path) { + return Ok(( + false, + "Agent exited with no commits on the feature branch. \ + The agent did not produce any code changes." + .to_string(), + )); + } + crate::agents::gates::run_acceptance_gates(&path) + }) + .await + { + Ok(Ok(result)) => result, + Ok(Err(e)) => (false, e), + Err(e) => (false, format!("Gate check task panicked: {e}")), + } + } else { + ( + false, + "No worktree path available to run acceptance gates".to_string(), + ) + }; + + slog!( + "[agents] Server-owned completion for '{story_id}:{agent_name}': gates_passed={gates_passed}" + ); + + let report = CompletionReport { + summary: "Agent process exited normally".to_string(), + gates_passed, + gate_output, + }; + + // Store completion report, extract data for pipeline advance, then + // remove the entry so completed agents never appear in list_agents. + let (tx, project_root_for_advance, wt_path_for_advance, merge_failure_reported_for_advance) = { + let mut lock = match agents.lock() { + Ok(a) => a, + Err(_) => return, + }; + let agent = match lock.get_mut(&key) { + Some(a) => a, + None => return, + }; + agent.completion = Some(report.clone()); + agent.session_id = session_id.clone(); + let tx = agent.tx.clone(); + let pr = agent.project_root.clone(); + let wt = agent.worktree_info.as_ref().map(|w| w.path.clone()); + let mfr = agent.merge_failure_reported; + lock.remove(&key); + (tx, pr, wt, mfr) + }; + + // Emit Done so wait_for_agent unblocks. 
+ let _ = tx.send(AgentEvent::Done { + story_id: story_id.to_string(), + agent_name: agent_name.to_string(), + session_id, + }); + + // Notify WebSocket clients that the agent is gone. + AgentPool::notify_agent_state_changed(&watcher_tx); + + // Advance the pipeline state machine in a background task. + spawn_pipeline_advance( + Arc::clone(agents), + port, + story_id, + agent_name, + report, + project_root_for_advance, + wt_path_for_advance, + watcher_tx, + merge_failure_reported_for_advance, + ); +} + +#[cfg(test)] +mod tests { + use super::*; + use super::super::super::AgentPool; + use crate::agents::{AgentEvent, AgentStatus, CompletionReport}; + use std::path::PathBuf; + use std::process::Command; + + fn init_git_repo(repo: &std::path::Path) { + Command::new("git") + .args(["init"]) + .current_dir(repo) + .output() + .unwrap(); + Command::new("git") + .args(["config", "user.email", "test@test.com"]) + .current_dir(repo) + .output() + .unwrap(); + Command::new("git") + .args(["config", "user.name", "Test"]) + .current_dir(repo) + .output() + .unwrap(); + Command::new("git") + .args(["commit", "--allow-empty", "-m", "init"]) + .current_dir(repo) + .output() + .unwrap(); + } + + // ── report_completion tests ──────────────────────────────────── + + #[tokio::test] + async fn report_completion_rejects_nonexistent_agent() { + let pool = AgentPool::new_test(3001); + let result = pool.report_completion("no_story", "no_bot", "done").await; + assert!(result.is_err()); + let msg = result.unwrap_err(); + assert!(msg.contains("No agent"), "unexpected: {msg}"); + } + + #[tokio::test] + async fn report_completion_rejects_non_running_agent() { + let pool = AgentPool::new_test(3001); + pool.inject_test_agent("s6", "bot", AgentStatus::Completed); + + let result = pool.report_completion("s6", "bot", "done").await; + assert!(result.is_err()); + let msg = result.unwrap_err(); + assert!( + msg.contains("not running"), + "expected 'not running' in: {msg}" + ); + } + + #[tokio::test] + 
async fn report_completion_rejects_dirty_worktree() { + use std::fs; + use tempfile::tempdir; + + let tmp = tempdir().unwrap(); + let repo = tmp.path(); + + // Init a real git repo and make an initial commit + Command::new("git") + .args(["init"]) + .current_dir(repo) + .output() + .unwrap(); + Command::new("git") + .args(["commit", "--allow-empty", "-m", "init"]) + .current_dir(repo) + .output() + .unwrap(); + + // Write an uncommitted file + fs::write(repo.join("dirty.txt"), "not committed").unwrap(); + + let pool = AgentPool::new_test(3001); + pool.inject_test_agent_with_path("s7", "bot", AgentStatus::Running, repo.to_path_buf()); + + let result = pool.report_completion("s7", "bot", "done").await; + assert!(result.is_err()); + let msg = result.unwrap_err(); + assert!( + msg.contains("uncommitted"), + "expected 'uncommitted' in: {msg}" + ); + } + + // ── server-owned completion tests ─────────────────────────────────────────── + + #[tokio::test] + async fn server_owned_completion_skips_when_already_completed() { + let pool = AgentPool::new_test(3001); + let report = CompletionReport { + summary: "Already done".to_string(), + gates_passed: true, + gate_output: String::new(), + }; + pool.inject_test_agent_with_completion( + "s10", + "coder-1", + AgentStatus::Completed, + PathBuf::from("/tmp/nonexistent"), + report, + ); + + // Subscribe before calling so we can check if Done event was emitted. + let mut rx = pool.subscribe("s10", "coder-1").unwrap(); + + run_server_owned_completion( + &pool.agents, + pool.port, + "s10", + "coder-1", + Some("sess-1".to_string()), + pool.watcher_tx.clone(), + ) + .await; + + // Status should remain Completed (unchanged) — no gate re-run. + let agents = pool.agents.lock().unwrap(); + let key = composite_key("s10", "coder-1"); + let agent = agents.get(&key).unwrap(); + assert_eq!(agent.status, AgentStatus::Completed); + // Summary should still be the original, not overwritten. 
+ assert_eq!(agent.completion.as_ref().unwrap().summary, "Already done"); + drop(agents); + + // No Done event should have been emitted. + assert!( + rx.try_recv().is_err(), + "should not emit Done when completion already exists" + ); + } + + #[tokio::test] + async fn server_owned_completion_runs_gates_on_clean_worktree() { + use tempfile::tempdir; + + let tmp = tempdir().unwrap(); + let repo = tmp.path(); + init_git_repo(repo); + + let pool = AgentPool::new_test(3001); + pool.inject_test_agent_with_path( + "s11", + "coder-1", + AgentStatus::Running, + repo.to_path_buf(), + ); + + let mut rx = pool.subscribe("s11", "coder-1").unwrap(); + + run_server_owned_completion( + &pool.agents, + pool.port, + "s11", + "coder-1", + Some("sess-2".to_string()), + pool.watcher_tx.clone(), + ) + .await; + + // Agent entry should be removed from the map after completion. + let agents = pool.agents.lock().unwrap(); + let key = composite_key("s11", "coder-1"); + assert!( + agents.get(&key).is_none(), + "agent should be removed from map after completion" + ); + drop(agents); + + // A Done event should have been emitted with the session_id. + let event = rx.try_recv().expect("should emit Done event"); + match &event { + AgentEvent::Done { session_id, .. } => { + assert_eq!(*session_id, Some("sess-2".to_string())); + } + other => panic!("expected Done event, got: {other:?}"), + } + } + + #[tokio::test] + async fn server_owned_completion_fails_on_dirty_worktree() { + use std::fs; + use tempfile::tempdir; + + let tmp = tempdir().unwrap(); + let repo = tmp.path(); + init_git_repo(repo); + // Create an uncommitted file. 
+ fs::write(repo.join("dirty.txt"), "not committed").unwrap(); + + let pool = AgentPool::new_test(3001); + pool.inject_test_agent_with_path( + "s12", + "coder-1", + AgentStatus::Running, + repo.to_path_buf(), + ); + + let mut rx = pool.subscribe("s12", "coder-1").unwrap(); + + run_server_owned_completion( + &pool.agents, + pool.port, + "s12", + "coder-1", + None, + pool.watcher_tx.clone(), + ) + .await; + + // Agent entry should be removed from the map after completion (even on failure). + let agents = pool.agents.lock().unwrap(); + let key = composite_key("s12", "coder-1"); + assert!( + agents.get(&key).is_none(), + "agent should be removed from map after failed completion" + ); + drop(agents); + + // A Done event should have been emitted. + let event = rx.try_recv().expect("should emit Done event"); + assert!( + matches!(event, AgentEvent::Done { .. }), + "expected Done event, got: {event:?}" + ); + } + + #[tokio::test] + async fn server_owned_completion_nonexistent_agent_is_noop() { + let pool = AgentPool::new_test(3001); + // Should not panic or error — just silently return. + run_server_owned_completion( + &pool.agents, + pool.port, + "nonexistent", + "bot", + None, + pool.watcher_tx.clone(), + ) + .await; + } +} diff --git a/server/src/agents/pool/pipeline/merge.rs b/server/src/agents/pool/pipeline/merge.rs new file mode 100644 index 00000000..30b94ae8 --- /dev/null +++ b/server/src/agents/pool/pipeline/merge.rs @@ -0,0 +1,544 @@ +use crate::slog; +use crate::slog_error; +use crate::slog_warn; +use crate::worktree; +use std::path::Path; +use std::sync::Arc; + +use super::super::super::PipelineStage; +use super::super::super::pipeline_stage; +use super::super::AgentPool; + +impl AgentPool { + /// Start the merge pipeline as a background task. + /// + /// Returns immediately so the MCP tool call doesn't time out (the full + /// pipeline — squash merge + quality gates — takes well over 60 seconds, + /// exceeding Claude Code's MCP tool-call timeout). 
+ ///
+ /// The mergemaster agent should poll [`get_merge_status`](Self::get_merge_status)
+ /// until the job reaches a terminal state.
+ pub fn start_merge_agent_work(
+ self: &Arc<Self>,
+ project_root: &Path,
+ story_id: &str,
+ ) -> Result<(), String> {
+ // Guard against double-starts.
+ {
+ let jobs = self.merge_jobs.lock().map_err(|e| e.to_string())?;
+ if let Some(job) = jobs.get(story_id)
+ && matches!(job.status, crate::agents::merge::MergeJobStatus::Running)
+ {
+ return Err(format!(
+ "Merge already in progress for '{story_id}'. \
+ Use get_merge_status to poll for completion."
+ ));
+ }
+ }
+
+ // Insert Running job.
+ {
+ let mut jobs = self.merge_jobs.lock().map_err(|e| e.to_string())?;
+ jobs.insert(
+ story_id.to_string(),
+ crate::agents::merge::MergeJob {
+ story_id: story_id.to_string(),
+ status: crate::agents::merge::MergeJobStatus::Running,
+ },
+ );
+ }
+
+ let pool = Arc::clone(self);
+ let root = project_root.to_path_buf();
+ let sid = story_id.to_string();
+
+ tokio::spawn(async move {
+ let report = pool.run_merge_pipeline(&root, &sid).await;
+ let failed = report.is_err();
+ let status = match report {
+ Ok(r) => crate::agents::merge::MergeJobStatus::Completed(r),
+ Err(e) => crate::agents::merge::MergeJobStatus::Failed(e),
+ };
+ if let Ok(mut jobs) = pool.merge_jobs.lock()
+ && let Some(job) = jobs.get_mut(&sid)
+ {
+ job.status = status;
+ }
+ if failed {
+ pool.auto_assign_available_work(&root).await;
+ }
+ });
+
+ Ok(())
+ }
+
+ /// The actual merge pipeline, run inside a background task.
+ async fn run_merge_pipeline(
+ self: &Arc<Self>,
+ project_root: &Path,
+ story_id: &str,
+ ) -> Result<crate::agents::merge::MergeReport, String> {
+ let branch = format!("feature/story-{story_id}");
+ let wt_path = worktree::worktree_path(project_root, story_id);
+ let root = project_root.to_path_buf();
+ let sid = story_id.to_string();
+ let br = branch.clone();
+
+ let merge_result =
+ tokio::task::spawn_blocking(move || crate::agents::merge::run_squash_merge(&root, &br, &sid))
+ .await
+ .map_err(|e| format!("Merge task panicked: {e}"))??;
+
+ if !merge_result.success {
+ return Ok(crate::agents::merge::MergeReport {
+ story_id: story_id.to_string(),
+ success: false,
+ had_conflicts: merge_result.had_conflicts,
+ conflicts_resolved: merge_result.conflicts_resolved,
+ conflict_details: merge_result.conflict_details,
+ gates_passed: merge_result.gates_passed,
+ gate_output: merge_result.output,
+ worktree_cleaned_up: false,
+ story_archived: false,
+ });
+ }
+
+ let story_archived =
+ crate::agents::lifecycle::move_story_to_archived(project_root, story_id).is_ok();
+ if story_archived {
+ self.remove_agents_for_story(story_id);
+ }
+
+ let worktree_cleaned_up = if wt_path.exists() {
+ let config = crate::config::ProjectConfig::load(project_root).unwrap_or_default();
+ worktree::remove_worktree_by_story_id(project_root, story_id, &config)
+ .await
+ .is_ok()
+ } else {
+ false
+ };
+
+ self.auto_assign_available_work(project_root).await;
+
+ Ok(crate::agents::merge::MergeReport {
+ story_id: story_id.to_string(),
+ success: true,
+ had_conflicts: merge_result.had_conflicts,
+ conflicts_resolved: merge_result.conflicts_resolved,
+ conflict_details: merge_result.conflict_details,
+ gates_passed: true,
+ gate_output: merge_result.output,
+ worktree_cleaned_up,
+ story_archived,
+ })
+ }
+
+ /// Check the status of a background merge job.
+ pub fn get_merge_status(&self, story_id: &str) -> Option<crate::agents::merge::MergeJob> {
+ self.merge_jobs
+ .lock()
+ .ok()
+ .and_then(|jobs| jobs.get(story_id).cloned())
+ }
+
+ /// Record that the mergemaster agent for `story_id` explicitly reported a
+ /// merge failure via the `report_merge_failure` MCP tool.
+ ///
+ /// Sets `merge_failure_reported = true` on the active mergemaster agent so
+ /// that `run_pipeline_advance` can block advancement to `5_done/` even when
+ /// the server-owned gate check returns `gates_passed=true` (those gates run
+ /// in the feature-branch worktree, not on master).
+ pub fn set_merge_failure_reported(&self, story_id: &str) {
+ match self.agents.lock() {
+ Ok(mut lock) => {
+ let found = lock.iter_mut().find(|(key, agent)| {
+ let key_story_id = key
+ .rsplit_once(':')
+ .map(|(sid, _)| sid)
+ .unwrap_or(key.as_str());
+ key_story_id == story_id
+ && pipeline_stage(&agent.agent_name) == PipelineStage::Mergemaster
+ });
+ match found {
+ Some((_, agent)) => {
+ agent.merge_failure_reported = true;
+ slog!(
+ "[pipeline] Merge failure flag set for '{story_id}:{}'",
+ agent.agent_name
+ );
+ }
+ None => {
+ slog_warn!(
+ "[pipeline] set_merge_failure_reported: no running mergemaster found \
+ for story '{story_id}' — flag not set"
+ );
+ }
+ }
+ }
+ Err(e) => {
+ slog_error!("[pipeline] set_merge_failure_reported: could not lock agents: {e}");
+ }
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use super::super::super::AgentPool;
+ use crate::agents::merge::{MergeJob, MergeJobStatus};
+ use std::process::Command;
+
+ fn init_git_repo(repo: &std::path::Path) {
+ Command::new("git")
+ .args(["init"])
+ .current_dir(repo)
+ .output()
+ .unwrap();
+ Command::new("git")
+ .args(["config", "user.email", "test@test.com"])
+ .current_dir(repo)
+ .output()
+ .unwrap();
+ Command::new("git")
+ .args(["config", "user.name", "Test"])
+ .current_dir(repo)
+ .output()
+ .unwrap();
+ Command::new("git")
+ .args(["commit", "--allow-empty", "-m", "init"])
+
.current_dir(repo)
+ .output()
+ .unwrap();
+ }
+
+ // ── merge_agent_work tests ────────────────────────────────────────────────
+
+ /// Helper: start a merge and poll until terminal state.
+ async fn run_merge_to_completion(
+ pool: &Arc<AgentPool>,
+ repo: &std::path::Path,
+ story_id: &str,
+ ) -> MergeJob {
+ pool.start_merge_agent_work(repo, story_id).unwrap();
+ loop {
+ tokio::time::sleep(std::time::Duration::from_millis(50)).await;
+ if let Some(job) = pool.get_merge_status(story_id)
+ && !matches!(job.status, MergeJobStatus::Running)
+ {
+ return job;
+ }
+ }
+ }
+
+ #[tokio::test]
+ async fn merge_agent_work_returns_error_when_branch_not_found() {
+ use tempfile::tempdir;
+
+ let tmp = tempdir().unwrap();
+ let repo = tmp.path();
+ init_git_repo(repo);
+
+ let pool = Arc::new(AgentPool::new_test(3001));
+ let job = run_merge_to_completion(&pool, repo, "99_nonexistent").await;
+ match &job.status {
+ MergeJobStatus::Completed(report) => {
+ assert!(!report.success, "should fail when branch missing");
+ }
+ MergeJobStatus::Failed(_) => {
+ // Also acceptable — the pipeline errored out
+ }
+ MergeJobStatus::Running => {
+ panic!("should not still be running");
+ }
+ }
+ }
+
+ #[tokio::test]
+ async fn merge_agent_work_succeeds_on_clean_branch() {
+ use std::fs;
+ use tempfile::tempdir;
+
+ let tmp = tempdir().unwrap();
+ let repo = tmp.path();
+ init_git_repo(repo);
+
+ // Create a feature branch with a commit
+ Command::new("git")
+ .args(["checkout", "-b", "feature/story-23_test"])
+ .current_dir(repo)
+ .output()
+ .unwrap();
+ fs::write(repo.join("feature.txt"), "feature content").unwrap();
+ Command::new("git")
+ .args(["add", "."])
+ .current_dir(repo)
+ .output()
+ .unwrap();
+ Command::new("git")
+ .args(["commit", "-m", "add feature"])
+ .current_dir(repo)
+ .output()
+ .unwrap();
+
+ // Switch back to master (initial branch)
+ Command::new("git")
+ .args(["checkout", "master"])
+ .current_dir(repo)
+ .output()
+ .unwrap();
+
+ // Create the story file in
4_merge/ so we can test archival + let merge_dir = repo.join(".storkit/work/4_merge"); + fs::create_dir_all(&merge_dir).unwrap(); + let story_file = merge_dir.join("23_test.md"); + fs::write(&story_file, "---\nname: Test\n---\n").unwrap(); + Command::new("git") + .args(["add", "."]) + .current_dir(repo) + .output() + .unwrap(); + Command::new("git") + .args(["commit", "-m", "add story in merge"]) + .current_dir(repo) + .output() + .unwrap(); + + let pool = Arc::new(AgentPool::new_test(3001)); + let job = run_merge_to_completion(&pool, repo, "23_test").await; + + match &job.status { + MergeJobStatus::Completed(report) => { + assert!(!report.had_conflicts, "should have no conflicts"); + assert!( + report.success + || report.gate_output.contains("Failed to run") + || !report.gates_passed, + "report should be coherent: {report:?}" + ); + if report.story_archived { + let done = repo.join(".storkit/work/5_done/23_test.md"); + assert!(done.exists(), "done file should exist"); + } + } + MergeJobStatus::Failed(e) => { + // Gate failures are acceptable in test env + assert!( + e.contains("Failed") || e.contains("failed"), + "unexpected failure: {e}" + ); + } + MergeJobStatus::Running => panic!("should not still be running"), + } + } + + // ── quality gate ordering test ──────────────────────────────── + + /// Regression test for bug 142: quality gates must run BEFORE the fast-forward + /// to master so that broken code never lands on master. + #[cfg(unix)] + #[test] + fn quality_gates_run_before_fast_forward_to_master() { + use std::fs; + use std::os::unix::fs::PermissionsExt; + use tempfile::tempdir; + + let tmp = tempdir().unwrap(); + let repo = tmp.path(); + init_git_repo(repo); + + // Add a failing script/test so quality gates will fail. 
+ let script_dir = repo.join("script"); + fs::create_dir_all(&script_dir).unwrap(); + let script_test = script_dir.join("test"); + fs::write(&script_test, "#!/usr/bin/env bash\nexit 1\n").unwrap(); + let mut perms = fs::metadata(&script_test).unwrap().permissions(); + perms.set_mode(0o755); + fs::set_permissions(&script_test, perms).unwrap(); + Command::new("git") + .args(["add", "."]) + .current_dir(repo) + .output() + .unwrap(); + Command::new("git") + .args(["commit", "-m", "add failing script/test"]) + .current_dir(repo) + .output() + .unwrap(); + + // Create a feature branch with a commit. + Command::new("git") + .args(["checkout", "-b", "feature/story-142_test"]) + .current_dir(repo) + .output() + .unwrap(); + fs::write(repo.join("change.txt"), "feature change").unwrap(); + Command::new("git") + .args(["add", "."]) + .current_dir(repo) + .output() + .unwrap(); + Command::new("git") + .args(["commit", "-m", "feature work"]) + .current_dir(repo) + .output() + .unwrap(); + + // Switch back to master and record its HEAD. + Command::new("git") + .args(["checkout", "master"]) + .current_dir(repo) + .output() + .unwrap(); + let head_before = String::from_utf8( + Command::new("git") + .args(["rev-parse", "HEAD"]) + .current_dir(repo) + .output() + .unwrap() + .stdout, + ) + .unwrap() + .trim() + .to_string(); + + // Run the squash-merge. The failing script/test makes quality gates + // fail → fast-forward must NOT happen. + let result = + crate::agents::merge::run_squash_merge(repo, "feature/story-142_test", "142_test") + .unwrap(); + + let head_after = String::from_utf8( + Command::new("git") + .args(["rev-parse", "HEAD"]) + .current_dir(repo) + .output() + .unwrap() + .stdout, + ) + .unwrap() + .trim() + .to_string(); + + // Gates must have failed (script/test exits 1) so master should be untouched. 
+ assert!( + !result.success, + "run_squash_merge must report failure when gates fail" + ); + assert_eq!( + head_before, head_after, + "master HEAD must not advance when quality gates fail (bug 142)" + ); + } + + #[tokio::test] + async fn merge_agent_work_conflict_does_not_break_master() { + use std::fs; + use tempfile::tempdir; + + let tmp = tempdir().unwrap(); + let repo = tmp.path(); + init_git_repo(repo); + + // Create a file on master. + fs::write( + repo.join("code.rs"), + "fn main() {\n println!(\"hello\");\n}\n", + ) + .unwrap(); + Command::new("git") + .args(["add", "."]) + .current_dir(repo) + .output() + .unwrap(); + Command::new("git") + .args(["commit", "-m", "initial code"]) + .current_dir(repo) + .output() + .unwrap(); + + // Feature branch: modify the same line differently. + Command::new("git") + .args(["checkout", "-b", "feature/story-42_story_foo"]) + .current_dir(repo) + .output() + .unwrap(); + fs::write( + repo.join("code.rs"), + "fn main() {\n println!(\"hello\");\n feature_fn();\n}\n", + ) + .unwrap(); + Command::new("git") + .args(["add", "."]) + .current_dir(repo) + .output() + .unwrap(); + Command::new("git") + .args(["commit", "-m", "feature: add fn call"]) + .current_dir(repo) + .output() + .unwrap(); + + // Master: add different line at same location. + Command::new("git") + .args(["checkout", "master"]) + .current_dir(repo) + .output() + .unwrap(); + fs::write( + repo.join("code.rs"), + "fn main() {\n println!(\"hello\");\n master_fn();\n}\n", + ) + .unwrap(); + Command::new("git") + .args(["add", "."]) + .current_dir(repo) + .output() + .unwrap(); + Command::new("git") + .args(["commit", "-m", "master: add fn call"]) + .current_dir(repo) + .output() + .unwrap(); + + // Create story file in 4_merge. 
+ let merge_dir = repo.join(".storkit/work/4_merge"); + fs::create_dir_all(&merge_dir).unwrap(); + fs::write(merge_dir.join("42_story_foo.md"), "---\nname: Test\n---\n").unwrap(); + Command::new("git") + .args(["add", "."]) + .current_dir(repo) + .output() + .unwrap(); + Command::new("git") + .args(["commit", "-m", "add story"]) + .current_dir(repo) + .output() + .unwrap(); + + let pool = Arc::new(AgentPool::new_test(3001)); + let job = run_merge_to_completion(&pool, repo, "42_story_foo").await; + + // Master should NEVER have conflict markers, regardless of merge outcome. + let master_code = fs::read_to_string(repo.join("code.rs")).unwrap(); + assert!( + !master_code.contains("<<<<<<<"), + "master must never contain conflict markers:\n{master_code}" + ); + assert!( + !master_code.contains(">>>>>>>"), + "master must never contain conflict markers:\n{master_code}" + ); + + // The report should accurately reflect what happened. + match &job.status { + MergeJobStatus::Completed(report) => { + assert!(report.had_conflicts, "should report conflicts"); + } + MergeJobStatus::Failed(_) => { + // Acceptable — merge aborted due to conflicts + } + MergeJobStatus::Running => panic!("should not still be running"), + } + } +} diff --git a/server/src/agents/pool/pipeline/mod.rs b/server/src/agents/pool/pipeline/mod.rs new file mode 100644 index 00000000..14aa5390 --- /dev/null +++ b/server/src/agents/pool/pipeline/mod.rs @@ -0,0 +1,5 @@ +mod advance; +mod completion; +mod merge; + +pub(super) use completion::run_server_owned_completion; diff --git a/server/src/agents/pool/types.rs b/server/src/agents/pool/types.rs index 345ace0a..c579956a 100644 --- a/server/src/agents/pool/types.rs +++ b/server/src/agents/pool/types.rs @@ -81,6 +81,7 @@ pub(super) struct StoryAgent { /// though the code was never squash-merged onto master. pub(super) merge_failure_reported: bool, /// Set to `true` when a rate-limit throttle warning was received for this agent. 
+    /// Updated by the pool's rate-limit event listener task so status displays stay current.
     pub(super) throttled: bool,
 }