Server drives pipeline as state machine

On agent completion, the server automatically runs script/test and
advances stories through the pipeline: coder → qa → mergemaster →
archive. Failed gates restart the agent with failure context. Agents
no longer need to call pipeline-advancing MCP tools.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Dave
2026-02-23 13:13:41 +00:00
parent 682c8f9b36
commit 00b212d7e3
3 changed files with 440 additions and 12 deletions

View File

@@ -70,6 +70,29 @@ impl std::fmt::Display for AgentStatus {
}
}
/// Pipeline stages for automatic story advancement.
///
/// Fieldless, so it is `Copy`; `Eq` and `Hash` make exact comparison
/// (e.g. in tests) and use as a map key possible at no cost.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PipelineStage {
    /// Coding agents (coder-1, coder-2, etc.)
    Coder,
    /// QA review agent
    Qa,
    /// Mergemaster agent
    Mergemaster,
    /// Supervisors and unknown agents — no automatic advancement.
    Other,
}

/// Determine the pipeline stage from an agent name.
///
/// "qa" and "mergemaster" match exactly; any name starting with "coder"
/// (e.g. "coder-1", "coder-2") is a coder; everything else is `Other`.
pub fn pipeline_stage(agent_name: &str) -> PipelineStage {
    match agent_name {
        "qa" => PipelineStage::Qa,
        "mergemaster" => PipelineStage::Mergemaster,
        name if name.starts_with("coder") => PipelineStage::Coder,
        _ => PipelineStage::Other,
    }
}
/// Report produced by an agent calling `report_completion`.
#[derive(Debug, Serialize, Clone)]
pub struct CompletionReport {
@@ -100,6 +123,8 @@ struct StoryAgent {
event_log: Arc<Mutex<Vec<AgentEvent>>>,
/// Set when the agent calls report_completion.
completion: Option<CompletionReport>,
/// Project root, stored for pipeline advancement after completion.
project_root: Option<PathBuf>,
}
/// Build an `AgentInfo` snapshot from a `StoryAgent` map entry.
@@ -137,11 +162,14 @@ impl AgentPool {
/// Start an agent for a story: load config, create worktree, spawn agent.
/// If `agent_name` is None, defaults to the first configured agent.
/// If `resume_context` is provided, it is appended to the rendered prompt
/// so the agent can pick up from a previous failed attempt.
pub async fn start_agent(
&self,
project_root: &Path,
story_id: &str,
agent_name: Option<&str>,
resume_context: Option<&str>,
) -> Result<AgentInfo, String> {
let config = ProjectConfig::load(project_root)?;
@@ -193,6 +221,7 @@ impl AgentPool {
task_handle: None,
event_log: event_log.clone(),
completion: None,
project_root: Some(project_root.to_path_buf()),
},
);
}
@@ -219,9 +248,14 @@ impl AgentPool {
// Spawn the agent process
let wt_path_str = wt_info.path.to_string_lossy().to_string();
let (command, args, prompt) =
let (command, args, mut prompt) =
config.render_agent_args(&wt_path_str, story_id, Some(&resolved_name), Some(&wt_info.base_branch))?;
// Append resume context if this is a restart with failure information.
if let Some(ctx) = resume_context {
prompt.push_str(ctx);
}
let sid = story_id.to_string();
let aname = resolved_name.clone();
let tx_clone = tx.clone();
@@ -495,6 +529,186 @@ impl AgentPool {
worktree::create_worktree(project_root, story_id, &config, self.port).await
}
/// Advance the pipeline after an agent completes.
///
/// Called internally by `report_completion` as a background task.
/// Reads the stored completion report and project_root from the agent,
/// then drives the next pipeline stage based on the agent's role:
///
/// - **Coder** + gates passed → move story to `work/3_qa/`, start `qa` agent.
/// - **Coder** + gates failed → restart the same coder agent with failure context.
/// - **QA** + gates passed → move story to `work/4_merge/`, start `mergemaster` agent.
/// - **QA** + gates failed → restart `qa` with failure context.
/// - **Mergemaster** → run `script/test` on master; if pass: archive + cleanup worktree;
/// if fail: restart `mergemaster` with failure context.
/// - **Other** (supervisor, unknown) → no automatic advancement.
///
/// All failures are logged to stderr and swallowed: this runs detached in the
/// background, so there is no caller to propagate an error to.
async fn run_pipeline_advance_for_completed_agent(&self, story_id: &str, agent_name: &str) {
let key = composite_key(story_id, agent_name);
// Snapshot what we need inside a short lock scope so the std Mutex guard is
// dropped before any of the `.await`s below.
let (completion, project_root) = {
let agents = match self.agents.lock() {
Ok(a) => a,
Err(e) => {
// Poisoned lock: log and bail rather than panic in a background task.
eprintln!("[pipeline] Failed to lock agents for '{story_id}:{agent_name}': {e}");
return;
}
};
let agent = match agents.get(&key) {
Some(a) => a,
// Agent entry already removed — nothing to advance.
None => return,
};
(agent.completion.clone(), agent.project_root.clone())
};
let completion = match completion {
Some(c) => c,
None => {
eprintln!("[pipeline] No completion report for '{story_id}:{agent_name}'");
return;
}
};
let project_root = match project_root {
Some(p) => p,
None => {
eprintln!("[pipeline] No project_root for '{story_id}:{agent_name}'");
return;
}
};
let stage = pipeline_stage(agent_name);
match stage {
PipelineStage::Other => {
// Supervisors and unknown agents do not advance the pipeline.
}
PipelineStage::Coder => {
if completion.gates_passed {
eprintln!(
"[pipeline] Coder '{agent_name}' passed gates for '{story_id}'. Moving to QA."
);
// Move the story file first; only start the QA agent if the move succeeded.
if let Err(e) = move_story_to_qa(&project_root, story_id) {
eprintln!("[pipeline] Failed to move '{story_id}' to 3_qa/: {e}");
return;
}
if let Err(e) = self
.start_agent(&project_root, story_id, Some("qa"), None)
.await
{
eprintln!("[pipeline] Failed to start qa agent for '{story_id}': {e}");
}
} else {
eprintln!(
"[pipeline] Coder '{agent_name}' failed gates for '{story_id}'. Restarting."
);
// Feed the gate output back to the same coder as resume context.
let context = format!(
"\n\n---\n## Previous Attempt Failed\n\
The acceptance gates failed with the following output:\n{}\n\n\
Please review the failures above, fix the issues, and try again.",
completion.gate_output
);
if let Err(e) = self
.start_agent(&project_root, story_id, Some(agent_name), Some(&context))
.await
{
eprintln!(
"[pipeline] Failed to restart coder '{agent_name}' for '{story_id}': {e}"
);
}
}
}
PipelineStage::Qa => {
if completion.gates_passed {
eprintln!(
"[pipeline] QA passed gates for '{story_id}'. Moving to merge."
);
if let Err(e) = move_story_to_merge(&project_root, story_id) {
eprintln!("[pipeline] Failed to move '{story_id}' to 4_merge/: {e}");
return;
}
if let Err(e) = self
.start_agent(&project_root, story_id, Some("mergemaster"), None)
.await
{
eprintln!("[pipeline] Failed to start mergemaster for '{story_id}': {e}");
}
} else {
eprintln!(
"[pipeline] QA failed gates for '{story_id}'. Restarting."
);
let context = format!(
"\n\n---\n## Previous QA Attempt Failed\n\
The acceptance gates failed with the following output:\n{}\n\n\
Please re-run and fix the issues.",
completion.gate_output
);
if let Err(e) = self
.start_agent(&project_root, story_id, Some("qa"), Some(&context))
.await
{
eprintln!("[pipeline] Failed to restart qa for '{story_id}': {e}");
}
}
}
PipelineStage::Mergemaster => {
// Run script/test on master (project_root) as the post-merge verification.
eprintln!(
"[pipeline] Mergemaster completed for '{story_id}'. Running post-merge tests on master."
);
// Tests are blocking work — run off the async executor.
let root = project_root.clone();
let test_result = tokio::task::spawn_blocking(move || run_project_tests(&root))
.await
.unwrap_or_else(|e| {
// A panic inside the test task is treated as a failed run, not propagated.
eprintln!("[pipeline] Post-merge test task panicked: {e}");
Ok((false, format!("Test task panicked: {e}")))
});
let (passed, output) = match test_result {
Ok(pair) => pair,
Err(e) => (false, e),
};
if passed {
eprintln!(
"[pipeline] Post-merge tests passed for '{story_id}'. Archiving."
);
if let Err(e) = move_story_to_archived(&project_root, story_id) {
eprintln!("[pipeline] Failed to archive '{story_id}': {e}");
}
// Best-effort worktree cleanup; a failure here leaves a stale worktree but
// does not undo the archive.
let config =
crate::config::ProjectConfig::load(&project_root).unwrap_or_default();
if let Err(e) =
worktree::remove_worktree_by_story_id(&project_root, story_id, &config)
.await
{
eprintln!(
"[pipeline] Failed to remove worktree for '{story_id}': {e}"
);
}
eprintln!(
"[pipeline] Story '{story_id}' archived and worktree cleaned up."
);
} else {
eprintln!(
"[pipeline] Post-merge tests failed for '{story_id}'. Restarting mergemaster."
);
let context = format!(
"\n\n---\n## Post-Merge Test Failed\n\
The tests on master failed with the following output:\n{}\n\n\
Please investigate and resolve the failures, then call merge_agent_work again.",
output
);
if let Err(e) = self
.start_agent(&project_root, story_id, Some("mergemaster"), Some(&context))
.await
{
eprintln!(
"[pipeline] Failed to restart mergemaster for '{story_id}': {e}"
);
}
}
}
}
}
/// Report that an agent has finished work on a story.
///
/// - Rejects with an error if the worktree has uncommitted changes.
@@ -577,6 +791,21 @@ impl AgentPool {
session_id,
});
// Advance the pipeline state machine in a background task.
// Only advance when the agent completed (not failed) to avoid spurious restarts
// from agents that never ran acceptance gates properly.
let pool_clone = Self {
agents: Arc::clone(&self.agents),
port: self.port,
};
let sid = story_id.to_string();
let aname = agent_name.to_string();
tokio::spawn(async move {
pool_clone
.run_pipeline_advance_for_completed_agent(&sid, &aname)
.await;
});
Ok(report)
}
@@ -701,6 +930,7 @@ impl AgentPool {
task_handle: None,
event_log: Arc::new(Mutex::new(Vec::new())),
completion: None,
project_root: None,
},
);
tx
@@ -734,6 +964,38 @@ impl AgentPool {
task_handle: None,
event_log: Arc::new(Mutex::new(Vec::new())),
completion: None,
project_root: None,
},
);
tx
}
/// Test helper: inject an agent with a completion report and project_root
/// for testing pipeline advance logic without spawning real agents.
#[cfg(test)]
pub fn inject_test_agent_with_completion(
&self,
story_id: &str,
agent_name: &str,
status: AgentStatus,
project_root: PathBuf,
completion: CompletionReport,
) -> broadcast::Sender<AgentEvent> {
let (tx, _) = broadcast::channel::<AgentEvent>(64);
let key = composite_key(story_id, agent_name);
let mut agents = self.agents.lock().unwrap();
agents.insert(
key,
StoryAgent {
agent_name: agent_name.to_string(),
status,
worktree_info: None,
session_id: None,
tx: tx.clone(),
task_handle: None,
event_log: Arc::new(Mutex::new(Vec::new())),
completion: Some(completion),
project_root: Some(project_root),
},
);
tx
@@ -861,13 +1123,14 @@ pub fn move_story_to_archived(project_root: &Path, story_id: &str) -> Result<(),
Ok(())
}
/// Move a story/bug from `work/2_current/` or `work/3_qa/` to `work/4_merge/`.
///
/// This stages a work item as ready for the mergemaster to pick up and merge into master.
/// Idempotent: if already in `4_merge/`, returns Ok without committing.
///
/// # Errors
/// Returns `Err` if the item exists in neither source directory, or if the
/// filesystem move / directory creation fails.
pub fn move_story_to_merge(project_root: &Path, story_id: &str) -> Result<(), String> {
    let sk = project_root.join(".story_kit").join("work");
    let current_path = sk.join("2_current").join(format!("{story_id}.md"));
    let qa_path = sk.join("3_qa").join(format!("{story_id}.md"));
    let merge_dir = sk.join("4_merge");
    let merge_path = merge_dir.join(format!("{story_id}.md"));

    // Idempotent: nothing to do if the item is already staged for merge.
    if merge_path.exists() {
        return Ok(());
    }

    // Accept from 2_current/ (manual trigger) or 3_qa/ (pipeline advancement from QA stage).
    let source_path = if current_path.exists() {
        current_path.clone()
    } else if qa_path.exists() {
        qa_path.clone()
    } else {
        return Err(format!(
            "Work item '{story_id}' not found in work/2_current/ or work/3_qa/. Cannot move to 4_merge/."
        ));
    };

    std::fs::create_dir_all(&merge_dir)
        .map_err(|e| format!("Failed to create work/4_merge/ directory: {e}"))?;
    std::fs::rename(&source_path, &merge_path)
        .map_err(|e| format!("Failed to move '{story_id}' to 4_merge/: {e}"))?;

    let from_dir = if source_path == current_path {
        "work/2_current/"
    } else {
        "work/3_qa/"
    };
    eprintln!("[lifecycle] Moved '{story_id}' from {from_dir} to work/4_merge/");
    Ok(())
}
@@ -1692,6 +1965,145 @@ mod tests {
assert_eq!(item_type_from_id("1_story_simple"), "story");
}
// ── pipeline_stage tests ──────────────────────────────────────────────────
/// Any agent name prefixed with "coder" maps to the coding stage.
#[test]
fn pipeline_stage_detects_coders() {
    for name in ["coder-1", "coder-2", "coder-3"] {
        assert_eq!(pipeline_stage(name), PipelineStage::Coder);
    }
}
/// The exact name "qa" maps to the QA review stage.
#[test]
fn pipeline_stage_detects_qa() {
    assert!(matches!(pipeline_stage("qa"), PipelineStage::Qa));
}
/// The exact name "mergemaster" maps to the merge stage.
#[test]
fn pipeline_stage_detects_mergemaster() {
    assert!(matches!(pipeline_stage("mergemaster"), PipelineStage::Mergemaster));
}
/// Names with no pipeline role — including "supervisor" — fall through to `Other`.
#[test]
fn pipeline_stage_supervisor_is_other() {
    for name in ["supervisor", "default", "unknown"] {
        assert_eq!(pipeline_stage(name), PipelineStage::Other);
    }
}
// ── pipeline advance tests ────────────────────────────────────────────────
/// Coder completion with passing gates must relocate the story file from
/// work/2_current/ to work/3_qa/, even though the follow-up qa agent cannot
/// actually be spawned in a test environment.
#[tokio::test]
async fn pipeline_advance_coder_gates_pass_moves_story_to_qa() {
use std::fs;
let tmp = tempfile::tempdir().unwrap();
let root = tmp.path();
// Set up story in 2_current/
let current = root.join(".story_kit/work/2_current");
fs::create_dir_all(&current).unwrap();
fs::write(current.join("50_story_test.md"), "test").unwrap();
// Inject a fake completed coder with gates_passed = true.
let pool = AgentPool::new(3001);
pool.inject_test_agent_with_completion(
"50_story_test",
"coder-1",
AgentStatus::Completed,
root.to_path_buf(),
CompletionReport {
summary: "done".to_string(),
gates_passed: true,
gate_output: String::new(),
},
);
// Call pipeline advance directly (bypasses background spawn for testing).
pool.run_pipeline_advance_for_completed_agent("50_story_test", "coder-1")
.await;
// Story should have moved to 3_qa/ (start_agent for qa will fail in tests but
// the file move happens before that).
assert!(
root.join(".story_kit/work/3_qa/50_story_test.md").exists(),
"story should be in 3_qa/"
);
assert!(
!current.join("50_story_test.md").exists(),
"story should not still be in 2_current/"
);
}
/// QA completion with passing gates must relocate the story file from
/// work/3_qa/ to work/4_merge/ (mergemaster startup failure in tests is
/// irrelevant — the move happens first).
#[tokio::test]
async fn pipeline_advance_qa_gates_pass_moves_story_to_merge() {
use std::fs;
let tmp = tempfile::tempdir().unwrap();
let root = tmp.path();
// Set up story in 3_qa/
let qa_dir = root.join(".story_kit/work/3_qa");
fs::create_dir_all(&qa_dir).unwrap();
fs::write(qa_dir.join("51_story_test.md"), "test").unwrap();
// Inject a fake completed qa agent with gates_passed = true.
let pool = AgentPool::new(3001);
pool.inject_test_agent_with_completion(
"51_story_test",
"qa",
AgentStatus::Completed,
root.to_path_buf(),
CompletionReport {
summary: "QA done".to_string(),
gates_passed: true,
gate_output: String::new(),
},
);
pool.run_pipeline_advance_for_completed_agent("51_story_test", "qa")
.await;
// Story should have moved to 4_merge/
assert!(
root.join(".story_kit/work/4_merge/51_story_test.md").exists(),
"story should be in 4_merge/"
);
assert!(
!qa_dir.join("51_story_test.md").exists(),
"story should not still be in 3_qa/"
);
}
/// Agents classified as `PipelineStage::Other` (e.g. "supervisor") must leave
/// the story file exactly where it is — no automatic advancement.
#[tokio::test]
async fn pipeline_advance_supervisor_does_not_advance() {
use std::fs;
let tmp = tempfile::tempdir().unwrap();
let root = tmp.path();
let current = root.join(".story_kit/work/2_current");
fs::create_dir_all(&current).unwrap();
fs::write(current.join("52_story_test.md"), "test").unwrap();
// Inject a fake completed supervisor; gates_passed is true but should be ignored.
let pool = AgentPool::new(3001);
pool.inject_test_agent_with_completion(
"52_story_test",
"supervisor",
AgentStatus::Completed,
root.to_path_buf(),
CompletionReport {
summary: "supervised".to_string(),
gates_passed: true,
gate_output: String::new(),
},
);
pool.run_pipeline_advance_for_completed_agent("52_story_test", "supervisor")
.await;
// Story should NOT have moved (supervisors don't advance pipeline)
assert!(
current.join("52_story_test.md").exists(),
"story should still be in 2_current/ for supervisor"
);
}
// ── move_story_to_merge tests ──────────────────────────────────────────────
#[test]
@@ -1709,6 +2121,21 @@ mod tests {
assert!(root.join(".story_kit/work/4_merge/20_story_foo.md").exists());
}
/// A story sitting only in work/3_qa/ is accepted and relocated to work/4_merge/.
#[test]
fn move_story_to_merge_from_qa_dir() {
    use std::fs;
    let tmp = tempfile::tempdir().unwrap();
    let project = tmp.path();
    // Seed the story file in the QA directory only.
    let qa = project.join(".story_kit/work/3_qa");
    fs::create_dir_all(&qa).unwrap();
    fs::write(qa.join("40_story_test.md"), "test").unwrap();

    move_story_to_merge(project, "40_story_test").unwrap();

    // The file was moved, not copied.
    assert!(!qa.join("40_story_test.md").exists());
    assert!(project.join(".story_kit/work/4_merge/40_story_test.md").exists());
}
#[test]
fn move_story_to_merge_idempotent_when_already_in_merge() {
use std::fs;
@@ -1723,10 +2150,10 @@ mod tests {
}
/// With no story file in 2_current/ or 3_qa/, the move fails with an error
/// naming both searched directories.
#[test]
fn move_story_to_merge_errors_when_not_in_current_or_qa() {
    let tmp = tempfile::tempdir().unwrap();
    let result = move_story_to_merge(tmp.path(), "99_nonexistent");
    assert!(result.unwrap_err().contains("not found in work/2_current/ or work/3_qa/"));
}
// ── move_story_to_qa tests ────────────────────────────────────────────────