From 00b212d7e33ae67a2bd6d9c52ef79ed90563f3db Mon Sep 17 00:00:00 2001 From: Dave Date: Mon, 23 Feb 2026 13:13:41 +0000 Subject: [PATCH] Server drives pipeline as state machine MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit On agent completion, the server automatically runs script/test and advances stories through the pipeline: coder → qa → mergemaster → archive. Failed gates restart the agent with failure context. Agents no longer need to call pipeline-advancing MCP tools. Co-Authored-By: Claude Opus 4.6 --- server/src/agents.rs | 445 +++++++++++++++++++++++++++++++++++++- server/src/http/agents.rs | 1 + server/src/http/mcp.rs | 6 +- 3 files changed, 440 insertions(+), 12 deletions(-) diff --git a/server/src/agents.rs b/server/src/agents.rs index 77d9432..5dc571b 100644 --- a/server/src/agents.rs +++ b/server/src/agents.rs @@ -70,6 +70,29 @@ impl std::fmt::Display for AgentStatus { } } +/// Pipeline stages for automatic story advancement. +#[derive(Debug, Clone, PartialEq)] +pub enum PipelineStage { + /// Coding agents (coder-1, coder-2, etc.) + Coder, + /// QA review agent + Qa, + /// Mergemaster agent + Mergemaster, + /// Supervisors and unknown agents — no automatic advancement. + Other, +} + +/// Determine the pipeline stage from an agent name. +pub fn pipeline_stage(agent_name: &str) -> PipelineStage { + match agent_name { + "qa" => PipelineStage::Qa, + "mergemaster" => PipelineStage::Mergemaster, + name if name.starts_with("coder") => PipelineStage::Coder, + _ => PipelineStage::Other, + } +} + /// Report produced by an agent calling `report_completion`. #[derive(Debug, Serialize, Clone)] pub struct CompletionReport { @@ -100,6 +123,8 @@ struct StoryAgent { event_log: Arc>>, /// Set when the agent calls report_completion. completion: Option, + /// Project root, stored for pipeline advancement after completion. + project_root: Option, } /// Build an `AgentInfo` snapshot from a `StoryAgent` map entry. @@ -137,11 +162,14 @@ impl AgentPool { /// Start an agent for a story: load config, create worktree, spawn agent. /// If `agent_name` is None, defaults to the first configured agent. + /// If `resume_context` is provided, it is appended to the rendered prompt + /// so the agent can pick up from a previous failed attempt. pub async fn start_agent( &self, project_root: &Path, story_id: &str, agent_name: Option<&str>, + resume_context: Option<&str>, ) -> Result { let config = ProjectConfig::load(project_root)?; @@ -193,6 +221,7 @@ impl AgentPool { task_handle: None, event_log: event_log.clone(), completion: None, + project_root: Some(project_root.to_path_buf()), }, ); } @@ -219,9 +248,14 @@ impl AgentPool { // Spawn the agent process let wt_path_str = wt_info.path.to_string_lossy().to_string(); - let (command, args, prompt) = + let (command, args, mut prompt) = config.render_agent_args(&wt_path_str, story_id, Some(&resolved_name), Some(&wt_info.base_branch))?; + // Append resume context if this is a restart with failure information. + if let Some(ctx) = resume_context { + prompt.push_str(ctx); + } + let sid = story_id.to_string(); let aname = resolved_name.clone(); let tx_clone = tx.clone(); @@ -495,6 +529,186 @@ impl AgentPool { worktree::create_worktree(project_root, story_id, &config, self.port).await } + /// Advance the pipeline after an agent completes. + /// + /// Called internally by `report_completion` as a background task. + /// Reads the stored completion report and project_root from the agent, + /// then drives the next pipeline stage based on the agent's role: + /// + /// - **Coder** + gates passed → move story to `work/3_qa/`, start `qa` agent. + /// - **Coder** + gates failed → restart the same coder agent with failure context. + /// - **QA** + gates passed → move story to `work/4_merge/`, start `mergemaster` agent. + /// - **QA** + gates failed → restart `qa` with failure context. + /// - **Mergemaster** → run `script/test` on master; if pass: archive + cleanup worktree; + /// if fail: restart `mergemaster` with failure context. + /// - **Other** (supervisor, unknown) → no automatic advancement. + async fn run_pipeline_advance_for_completed_agent(&self, story_id: &str, agent_name: &str) { + let key = composite_key(story_id, agent_name); + + let (completion, project_root) = { + let agents = match self.agents.lock() { + Ok(a) => a, + Err(e) => { + eprintln!("[pipeline] Failed to lock agents for '{story_id}:{agent_name}': {e}"); + return; + } + }; + let agent = match agents.get(&key) { + Some(a) => a, + None => return, + }; + (agent.completion.clone(), agent.project_root.clone()) + }; + + let completion = match completion { + Some(c) => c, + None => { + eprintln!("[pipeline] No completion report for '{story_id}:{agent_name}'"); + return; + } + }; + let project_root = match project_root { + Some(p) => p, + None => { + eprintln!("[pipeline] No project_root for '{story_id}:{agent_name}'"); + return; + } + }; + + let stage = pipeline_stage(agent_name); + + match stage { + PipelineStage::Other => { + // Supervisors and unknown agents do not advance the pipeline. + } + PipelineStage::Coder => { + if completion.gates_passed { + eprintln!( + "[pipeline] Coder '{agent_name}' passed gates for '{story_id}'. Moving to QA." + ); + if let Err(e) = move_story_to_qa(&project_root, story_id) { + eprintln!("[pipeline] Failed to move '{story_id}' to 3_qa/: {e}"); + return; + } + if let Err(e) = self + .start_agent(&project_root, story_id, Some("qa"), None) + .await + { + eprintln!("[pipeline] Failed to start qa agent for '{story_id}': {e}"); + } + } else { + eprintln!( + "[pipeline] Coder '{agent_name}' failed gates for '{story_id}'. Restarting." + ); + let context = format!( + "\n\n---\n## Previous Attempt Failed\n\ + The acceptance gates failed with the following output:\n{}\n\n\ + Please review the failures above, fix the issues, and try again.", + completion.gate_output + ); + if let Err(e) = self + .start_agent(&project_root, story_id, Some(agent_name), Some(&context)) + .await + { + eprintln!( + "[pipeline] Failed to restart coder '{agent_name}' for '{story_id}': {e}" + ); + } + } + } + PipelineStage::Qa => { + if completion.gates_passed { + eprintln!( + "[pipeline] QA passed gates for '{story_id}'. Moving to merge." + ); + if let Err(e) = move_story_to_merge(&project_root, story_id) { + eprintln!("[pipeline] Failed to move '{story_id}' to 4_merge/: {e}"); + return; + } + if let Err(e) = self + .start_agent(&project_root, story_id, Some("mergemaster"), None) + .await + { + eprintln!("[pipeline] Failed to start mergemaster for '{story_id}': {e}"); + } + } else { + eprintln!( + "[pipeline] QA failed gates for '{story_id}'. Restarting." + ); + let context = format!( + "\n\n---\n## Previous QA Attempt Failed\n\ + The acceptance gates failed with the following output:\n{}\n\n\ + Please re-run and fix the issues.", + completion.gate_output + ); + if let Err(e) = self + .start_agent(&project_root, story_id, Some("qa"), Some(&context)) + .await + { + eprintln!("[pipeline] Failed to restart qa for '{story_id}': {e}"); + } + } + } + PipelineStage::Mergemaster => { + // Run script/test on master (project_root) as the post-merge verification. + eprintln!( + "[pipeline] Mergemaster completed for '{story_id}'. Running post-merge tests on master." + ); + let root = project_root.clone(); + let test_result = tokio::task::spawn_blocking(move || run_project_tests(&root)) + .await + .unwrap_or_else(|e| { + eprintln!("[pipeline] Post-merge test task panicked: {e}"); + Ok((false, format!("Test task panicked: {e}"))) + }); + let (passed, output) = match test_result { + Ok(pair) => pair, + Err(e) => (false, e), + }; + + if passed { + eprintln!( + "[pipeline] Post-merge tests passed for '{story_id}'. Archiving." + ); + if let Err(e) = move_story_to_archived(&project_root, story_id) { + eprintln!("[pipeline] Failed to archive '{story_id}': {e}"); + } + let config = + crate::config::ProjectConfig::load(&project_root).unwrap_or_default(); + if let Err(e) = + worktree::remove_worktree_by_story_id(&project_root, story_id, &config) + .await + { + eprintln!( + "[pipeline] Failed to remove worktree for '{story_id}': {e}" + ); + } + eprintln!( + "[pipeline] Story '{story_id}' archived and worktree cleaned up." + ); + } else { + eprintln!( + "[pipeline] Post-merge tests failed for '{story_id}'. Restarting mergemaster." + ); + let context = format!( + "\n\n---\n## Post-Merge Test Failed\n\ + The tests on master failed with the following output:\n{}\n\n\ + Please investigate and resolve the failures, then call merge_agent_work again.", + output + ); + if let Err(e) = self + .start_agent(&project_root, story_id, Some("mergemaster"), Some(&context)) + .await + { + eprintln!( + "[pipeline] Failed to restart mergemaster for '{story_id}': {e}" + ); + } + } + } + } + } + /// Report that an agent has finished work on a story. /// /// - Rejects with an error if the worktree has uncommitted changes. @@ -577,6 +791,21 @@ impl AgentPool { session_id, }); + // Advance the pipeline state machine in a background task. + // Only advance when the agent completed (not failed) to avoid spurious restarts + // from agents that never ran acceptance gates properly. + let pool_clone = Self { + agents: Arc::clone(&self.agents), + port: self.port, + }; + let sid = story_id.to_string(); + let aname = agent_name.to_string(); + tokio::spawn(async move { + pool_clone + .run_pipeline_advance_for_completed_agent(&sid, &aname) + .await; + }); + Ok(report) } @@ -701,6 +930,7 @@ impl AgentPool { task_handle: None, event_log: Arc::new(Mutex::new(Vec::new())), completion: None, + project_root: None, }, ); tx @@ -734,6 +964,38 @@ impl AgentPool { task_handle: None, event_log: Arc::new(Mutex::new(Vec::new())), completion: None, + project_root: None, + }, + ); + tx + } + + /// Test helper: inject an agent with a completion report and project_root + /// for testing pipeline advance logic without spawning real agents. + #[cfg(test)] + pub fn inject_test_agent_with_completion( + &self, + story_id: &str, + agent_name: &str, + status: AgentStatus, + project_root: PathBuf, + completion: CompletionReport, + ) -> broadcast::Sender { + let (tx, _) = broadcast::channel::(64); + let key = composite_key(story_id, agent_name); + let mut agents = self.agents.lock().unwrap(); + agents.insert( + key, + StoryAgent { + agent_name: agent_name.to_string(), + status, + worktree_info: None, + session_id: None, + tx: tx.clone(), + task_handle: None, + event_log: Arc::new(Mutex::new(Vec::new())), + completion: Some(completion), + project_root: Some(project_root), }, ); tx @@ -861,13 +1123,14 @@ pub fn move_story_to_archived(project_root: &Path, story_id: &str) -> Result<(), Ok(()) } -/// Move a story/bug from `work/2_current/` to `work/4_merge/` and auto-commit. +/// Move a story/bug from `work/2_current/` or `work/3_qa/` to `work/4_merge/`. /// /// This stages a work item as ready for the mergemaster to pick up and merge into master. /// Idempotent: if already in `4_merge/`, returns Ok without committing. pub fn move_story_to_merge(project_root: &Path, story_id: &str) -> Result<(), String> { let sk = project_root.join(".story_kit").join("work"); let current_path = sk.join("2_current").join(format!("{story_id}.md")); + let qa_path = sk.join("3_qa").join(format!("{story_id}.md")); let merge_dir = sk.join("4_merge"); let merge_path = merge_dir.join(format!("{story_id}.md")); @@ -876,18 +1139,28 @@ pub fn move_story_to_merge(project_root: &Path, story_id: &str) -> Result<(), St return Ok(()); } - if !current_path.exists() { + // Accept from 2_current/ (manual trigger) or 3_qa/ (pipeline advancement from QA stage). + let source_path = if current_path.exists() { + current_path.clone() + } else if qa_path.exists() { + qa_path.clone() + } else { return Err(format!( - "Work item '{story_id}' not found in work/2_current/. Cannot move to 4_merge/." + "Work item '{story_id}' not found in work/2_current/ or work/3_qa/. Cannot move to 4_merge/." )); - } + }; std::fs::create_dir_all(&merge_dir) .map_err(|e| format!("Failed to create work/4_merge/ directory: {e}"))?; - std::fs::rename(¤t_path, &merge_path) + std::fs::rename(&source_path, &merge_path) .map_err(|e| format!("Failed to move '{story_id}' to 4_merge/: {e}"))?; - eprintln!("[lifecycle] Moved '{story_id}' from work/2_current/ to work/4_merge/"); + let from_dir = if source_path == current_path { + "work/2_current/" + } else { + "work/3_qa/" + }; + eprintln!("[lifecycle] Moved '{story_id}' from {from_dir} to work/4_merge/"); Ok(()) } @@ -1692,6 +1965,145 @@ mod tests { assert_eq!(item_type_from_id("1_story_simple"), "story"); } + // ── pipeline_stage tests ────────────────────────────────────────────────── + + #[test] + fn pipeline_stage_detects_coders() { + assert_eq!(pipeline_stage("coder-1"), PipelineStage::Coder); + assert_eq!(pipeline_stage("coder-2"), PipelineStage::Coder); + assert_eq!(pipeline_stage("coder-3"), PipelineStage::Coder); + } + + #[test] + fn pipeline_stage_detects_qa() { + assert_eq!(pipeline_stage("qa"), PipelineStage::Qa); + } + + #[test] + fn pipeline_stage_detects_mergemaster() { + assert_eq!(pipeline_stage("mergemaster"), PipelineStage::Mergemaster); + } + + #[test] + fn pipeline_stage_supervisor_is_other() { + assert_eq!(pipeline_stage("supervisor"), PipelineStage::Other); + assert_eq!(pipeline_stage("default"), PipelineStage::Other); + assert_eq!(pipeline_stage("unknown"), PipelineStage::Other); + } + + // ── pipeline advance tests ──────────────────────────────────────────────── + + #[tokio::test] + async fn pipeline_advance_coder_gates_pass_moves_story_to_qa() { + use std::fs; + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + // Set up story in 2_current/ + let current = root.join(".story_kit/work/2_current"); + fs::create_dir_all(¤t).unwrap(); + fs::write(current.join("50_story_test.md"), "test").unwrap(); + + let pool = AgentPool::new(3001); + pool.inject_test_agent_with_completion( + "50_story_test", + "coder-1", + AgentStatus::Completed, + root.to_path_buf(), + CompletionReport { + summary: "done".to_string(), + gates_passed: true, + gate_output: String::new(), + }, + ); + + // Call pipeline advance directly (bypasses background spawn for testing). + pool.run_pipeline_advance_for_completed_agent("50_story_test", "coder-1") + .await; + + // Story should have moved to 3_qa/ (start_agent for qa will fail in tests but + // the file move happens before that). + assert!( + root.join(".story_kit/work/3_qa/50_story_test.md").exists(), + "story should be in 3_qa/" + ); + assert!( + !current.join("50_story_test.md").exists(), + "story should not still be in 2_current/" + ); + } + + #[tokio::test] + async fn pipeline_advance_qa_gates_pass_moves_story_to_merge() { + use std::fs; + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + // Set up story in 3_qa/ + let qa_dir = root.join(".story_kit/work/3_qa"); + fs::create_dir_all(&qa_dir).unwrap(); + fs::write(qa_dir.join("51_story_test.md"), "test").unwrap(); + + let pool = AgentPool::new(3001); + pool.inject_test_agent_with_completion( + "51_story_test", + "qa", + AgentStatus::Completed, + root.to_path_buf(), + CompletionReport { + summary: "QA done".to_string(), + gates_passed: true, + gate_output: String::new(), + }, + ); + + pool.run_pipeline_advance_for_completed_agent("51_story_test", "qa") + .await; + + // Story should have moved to 4_merge/ + assert!( + root.join(".story_kit/work/4_merge/51_story_test.md").exists(), + "story should be in 4_merge/" + ); + assert!( + !qa_dir.join("51_story_test.md").exists(), + "story should not still be in 3_qa/" + ); + } + + #[tokio::test] + async fn pipeline_advance_supervisor_does_not_advance() { + use std::fs; + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + let current = root.join(".story_kit/work/2_current"); + fs::create_dir_all(¤t).unwrap(); + fs::write(current.join("52_story_test.md"), "test").unwrap(); + + let pool = AgentPool::new(3001); + pool.inject_test_agent_with_completion( + "52_story_test", + "supervisor", + AgentStatus::Completed, + root.to_path_buf(), + CompletionReport { + summary: "supervised".to_string(), + gates_passed: true, + gate_output: String::new(), + }, + ); + + pool.run_pipeline_advance_for_completed_agent("52_story_test", "supervisor") + .await; + + // Story should NOT have moved (supervisors don't advance pipeline) + assert!( + current.join("52_story_test.md").exists(), + "story should still be in 2_current/ for supervisor" + ); + } + // ── move_story_to_merge tests ────────────────────────────────────────────── #[test] @@ -1709,6 +2121,21 @@ mod tests { assert!(root.join(".story_kit/work/4_merge/20_story_foo.md").exists()); } + #[test] + fn move_story_to_merge_from_qa_dir() { + use std::fs; + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + let qa_dir = root.join(".story_kit/work/3_qa"); + fs::create_dir_all(&qa_dir).unwrap(); + fs::write(qa_dir.join("40_story_test.md"), "test").unwrap(); + + move_story_to_merge(root, "40_story_test").unwrap(); + + assert!(!qa_dir.join("40_story_test.md").exists()); + assert!(root.join(".story_kit/work/4_merge/40_story_test.md").exists()); + } + #[test] fn move_story_to_merge_idempotent_when_already_in_merge() { use std::fs; @@ -1723,10 +2150,10 @@ mod tests { } #[test] - fn move_story_to_merge_errors_when_not_in_current() { + fn move_story_to_merge_errors_when_not_in_current_or_qa() { let tmp = tempfile::tempdir().unwrap(); let result = move_story_to_merge(tmp.path(), "99_nonexistent"); - assert!(result.unwrap_err().contains("not found in work/2_current/")); + assert!(result.unwrap_err().contains("not found in work/2_current/ or work/3_qa/")); } // ── move_story_to_qa tests ──────────────────────────────────────────────── diff --git a/server/src/http/agents.rs b/server/src/http/agents.rs index 45117d1..8bdcab6 100644 --- a/server/src/http/agents.rs +++ b/server/src/http/agents.rs @@ -86,6 +86,7 @@ impl AgentsApi { &project_root, &payload.0.story_id, payload.0.agent_name.as_deref(), + None, ) .await .map_err(bad_request)?; diff --git a/server/src/http/mcp.rs b/server/src/http/mcp.rs index 82f182d..fdec178 100644 --- a/server/src/http/mcp.rs +++ b/server/src/http/mcp.rs @@ -983,7 +983,7 @@ async fn tool_start_agent(args: &Value, ctx: &AppContext) -> Result Result Result