// File: huskies/server/src/agents/pool/pipeline.rs
// (Rust; 1790 lines, 67 KiB at the time this copy was taken)

use crate::config::ProjectConfig;
use crate::slog;
use crate::slog_error;
use crate::slog_warn;
use crate::worktree;
use crate::io::watcher::WatcherEvent;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use tokio::sync::broadcast;
use super::super::{
AgentEvent, AgentStatus, CompletionReport, PipelineStage,
agent_config_stage, pipeline_stage,
};
use super::{AgentPool, StoryAgent, composite_key};
impl AgentPool {
/// Pipeline advancement: after an agent completes, move the story to
/// the next pipeline stage and start the appropriate agent.
///
/// The stage is taken from the project config entry for `agent_name` when
/// one exists; otherwise it is derived from the agent name via
/// `pipeline_stage`. Per stage:
///
/// - `Other`: nothing is advanced.
/// - `Coder`: when gates passed, the effective QA mode decides the next step —
///   `Server` moves the story to merge and starts `mergemaster`, `Agent` moves
///   it to QA and starts the `qa` agent, `Human` (always used for spikes) moves
///   it to QA and writes a review hold. When gates failed, the same coder is
///   restarted with the gate output as extra context, unless
///   `should_block_story` reports that the retry limit was reached.
/// - `Qa`: when gates passed, the coverage gate is run in `worktree_path`
///   (or in `project_root` when no worktree path was supplied). On success the
///   story is either held for human review or moved to merge; on failure the
///   `qa` agent is restarted unless the story is blocked.
/// - `Mergemaster`: nothing is advanced when `merge_failure_reported` is set.
///   Otherwise the project tests run in `project_root`; on success the story
///   is archived and its agents are removed, on failure `mergemaster` is
///   restarted unless the story is blocked.
///
/// Returns early with a warning (and without scanning for unassigned work)
/// when `project_root` is `None`. In every other case the function ends by
/// calling `auto_assign_available_work`. Failures of individual steps are
/// logged, not returned.
pub(super) async fn run_pipeline_advance(
&self,
story_id: &str,
agent_name: &str,
completion: CompletionReport,
project_root: Option<PathBuf>,
worktree_path: Option<PathBuf>,
merge_failure_reported: bool,
) {
// Without a project root there is nowhere to move the story file.
let project_root = match project_root {
Some(p) => p,
None => {
slog_warn!("[pipeline] No project_root for '{story_id}:{agent_name}'");
return;
}
};
// A config that fails to load falls back to `ProjectConfig::default()`.
let config = ProjectConfig::load(&project_root).unwrap_or_default();
// Prefer the stage declared in the project config; otherwise derive it
// from the agent name.
let stage = config
.find_agent(agent_name)
.map(agent_config_stage)
.unwrap_or_else(|| pipeline_stage(agent_name));
match stage {
PipelineStage::Other => {
// Supervisors and unknown agents do not advance the pipeline.
}
PipelineStage::Coder => {
if completion.gates_passed {
// Determine effective QA mode for this story.
let qa_mode = {
let item_type = super::super::lifecycle::item_type_from_id(story_id);
// Spikes are always routed to human QA, whatever the story or
// project configuration says.
if item_type == "spike" {
crate::io::story_metadata::QaMode::Human
} else {
let default_qa = config.default_qa_mode();
// Story is in 2_current/ when a coder completes.
let story_path = project_root
.join(".storkit/work/2_current")
.join(format!("{story_id}.md"));
crate::io::story_metadata::resolve_qa_mode(&story_path, default_qa)
}
};
match qa_mode {
crate::io::story_metadata::QaMode::Server => {
slog!(
"[pipeline] Coder '{agent_name}' passed gates for '{story_id}'. \
qa: server — moving directly to merge."
);
// The mergemaster is only started when the move succeeded.
if let Err(e) =
super::super::lifecycle::move_story_to_merge(&project_root, story_id)
{
slog_error!(
"[pipeline] Failed to move '{story_id}' to 4_merge/: {e}"
);
} else if let Err(e) = self
.start_agent(&project_root, story_id, Some("mergemaster"), None)
.await
{
slog_error!(
"[pipeline] Failed to start mergemaster for '{story_id}': {e}"
);
}
}
crate::io::story_metadata::QaMode::Agent => {
slog!(
"[pipeline] Coder '{agent_name}' passed gates for '{story_id}'. \
qa: agent — moving to QA."
);
// The qa agent is only started when the move succeeded.
if let Err(e) = super::super::lifecycle::move_story_to_qa(&project_root, story_id) {
slog_error!("[pipeline] Failed to move '{story_id}' to 3_qa/: {e}");
} else if let Err(e) = self
.start_agent(&project_root, story_id, Some("qa"), None)
.await
{
slog_error!("[pipeline] Failed to start qa agent for '{story_id}': {e}");
}
}
crate::io::story_metadata::QaMode::Human => {
slog!(
"[pipeline] Coder '{agent_name}' passed gates for '{story_id}'. \
qa: human — holding for human review."
);
// No agent is started here: the story is parked in 3_qa/ with a
// review hold written to its file.
if let Err(e) = super::super::lifecycle::move_story_to_qa(&project_root, story_id) {
slog_error!("[pipeline] Failed to move '{story_id}' to 3_qa/: {e}");
} else {
let qa_dir = project_root.join(".storkit/work/3_qa");
let story_path = qa_dir.join(format!("{story_id}.md"));
if let Err(e) =
crate::io::story_metadata::write_review_hold(&story_path)
{
slog_error!(
"[pipeline] Failed to set review_hold on '{story_id}': {e}"
);
}
}
}
}
} else {
// Increment retry count and check if blocked.
let story_path = project_root
.join(".storkit/work/2_current")
.join(format!("{story_id}.md"));
if let Some(reason) = should_block_story(&story_path, config.max_retries, story_id, "coder") {
// Story has exceeded retry limit — do not restart.
// The send result is ignored on purpose (best-effort notification).
let _ = self.watcher_tx.send(WatcherEvent::StoryBlocked {
story_id: story_id.to_string(),
reason,
});
} else {
slog!(
"[pipeline] Coder '{agent_name}' failed gates for '{story_id}'. Restarting."
);
// The gate output is passed to the restarted agent as extra context.
let context = format!(
"\n\n---\n## Previous Attempt Failed\n\
The acceptance gates failed with the following output:\n{}\n\n\
Please review the failures above, fix the issues, and try again.",
completion.gate_output
);
if let Err(e) = self
.start_agent(&project_root, story_id, Some(agent_name), Some(&context))
.await
{
slog_error!(
"[pipeline] Failed to restart coder '{agent_name}' for '{story_id}': {e}"
);
}
}
}
}
PipelineStage::Qa => {
if completion.gates_passed {
// Run coverage gate in the QA worktree before advancing to merge.
// Falls back to the project root when no worktree path was supplied.
let coverage_path = worktree_path
.clone()
.unwrap_or_else(|| project_root.clone());
let cp = coverage_path.clone();
// The gate runs on a blocking thread; a failed join (e.g. a panic in
// the task) is converted into a failed gate instead of propagating.
let coverage_result =
tokio::task::spawn_blocking(move || super::super::gates::run_coverage_gate(&cp))
.await
.unwrap_or_else(|e| {
slog_warn!("[pipeline] Coverage gate task panicked: {e}");
Ok((false, format!("Coverage gate task panicked: {e}")))
});
// An `Err` from the gate itself is also treated as "not passed", with
// the error text used as the output.
let (coverage_passed, coverage_output) = match coverage_result {
Ok(pair) => pair,
Err(e) => (false, e),
};
if coverage_passed {
// Check whether this item needs human review before merging.
let needs_human_review = {
let item_type = super::super::lifecycle::item_type_from_id(story_id);
if item_type == "spike" {
true // Spikes always need human review.
} else {
let qa_dir = project_root.join(".storkit/work/3_qa");
let story_path = qa_dir.join(format!("{story_id}.md"));
let default_qa = config.default_qa_mode();
matches!(
crate::io::story_metadata::resolve_qa_mode(&story_path, default_qa),
crate::io::story_metadata::QaMode::Human
)
}
};
if needs_human_review {
// Hold in 3_qa/ for human review.
let qa_dir = project_root.join(".storkit/work/3_qa");
let story_path = qa_dir.join(format!("{story_id}.md"));
if let Err(e) =
crate::io::story_metadata::write_review_hold(&story_path)
{
slog_error!(
"[pipeline] Failed to set review_hold on '{story_id}': {e}"
);
}
// NOTE(review): this message is logged even when writing the review
// hold above failed.
slog!(
"[pipeline] QA passed for '{story_id}'. \
Holding for human review. \
Worktree preserved at: {worktree_path:?}"
);
} else {
slog!(
"[pipeline] QA passed gates and coverage for '{story_id}'. \
Moving directly to merge."
);
if let Err(e) =
super::super::lifecycle::move_story_to_merge(&project_root, story_id)
{
slog_error!(
"[pipeline] Failed to move '{story_id}' to 4_merge/: {e}"
);
} else if let Err(e) = self
.start_agent(&project_root, story_id, Some("mergemaster"), None)
.await
{
slog_error!(
"[pipeline] Failed to start mergemaster for '{story_id}': {e}"
);
}
}
} else {
// Coverage gate failed: count a retry under the "qa-coverage" label.
let story_path = project_root
.join(".storkit/work/3_qa")
.join(format!("{story_id}.md"));
if let Some(reason) = should_block_story(&story_path, config.max_retries, story_id, "qa-coverage") {
// Story has exceeded retry limit — do not restart.
let _ = self.watcher_tx.send(WatcherEvent::StoryBlocked {
story_id: story_id.to_string(),
reason,
});
} else {
slog!(
"[pipeline] QA coverage gate failed for '{story_id}'. Restarting QA."
);
let context = format!(
"\n\n---\n## Coverage Gate Failed\n\
The coverage gate (script/test_coverage) failed with the following output:\n{}\n\n\
Please improve test coverage until the coverage gate passes.",
coverage_output
);
if let Err(e) = self
.start_agent(&project_root, story_id, Some("qa"), Some(&context))
.await
{
slog_error!("[pipeline] Failed to restart qa for '{story_id}': {e}");
}
}
}
} else {
// Acceptance gates failed for the QA agent: count a retry under "qa".
let story_path = project_root
.join(".storkit/work/3_qa")
.join(format!("{story_id}.md"));
if let Some(reason) = should_block_story(&story_path, config.max_retries, story_id, "qa") {
// Story has exceeded retry limit — do not restart.
let _ = self.watcher_tx.send(WatcherEvent::StoryBlocked {
story_id: story_id.to_string(),
reason,
});
} else {
slog!("[pipeline] QA failed gates for '{story_id}'. Restarting.");
let context = format!(
"\n\n---\n## Previous QA Attempt Failed\n\
The acceptance gates failed with the following output:\n{}\n\n\
Please re-run and fix the issues.",
completion.gate_output
);
if let Err(e) = self
.start_agent(&project_root, story_id, Some("qa"), Some(&context))
.await
{
slog_error!("[pipeline] Failed to restart qa for '{story_id}': {e}");
}
}
}
}
PipelineStage::Mergemaster => {
// Block advancement if the mergemaster explicitly reported a failure.
// The server-owned gate check runs in the feature-branch worktree (not
// master), so `gates_passed=true` is misleading when no code was merged.
if merge_failure_reported {
slog!(
"[pipeline] Pipeline advancement blocked for '{story_id}': \
mergemaster explicitly reported a merge failure. \
Story stays in 4_merge/ for human review."
);
} else {
// Run script/test on master (project_root) as the post-merge verification.
slog!(
"[pipeline] Mergemaster completed for '{story_id}'. Running post-merge tests on master."
);
let root = project_root.clone();
// As with the coverage gate, a failed join is turned into a failed test
// run rather than propagated.
let test_result =
tokio::task::spawn_blocking(move || super::super::gates::run_project_tests(&root))
.await
.unwrap_or_else(|e| {
slog_warn!("[pipeline] Post-merge test task panicked: {e}");
Ok((false, format!("Test task panicked: {e}")))
});
let (passed, output) = match test_result {
Ok(pair) => pair,
Err(e) => (false, e),
};
if passed {
slog!(
"[pipeline] Post-merge tests passed for '{story_id}'. Moving to done."
);
if let Err(e) =
super::super::lifecycle::move_story_to_archived(&project_root, story_id)
{
slog_error!("[pipeline] Failed to move '{story_id}' to done: {e}");
}
// NOTE(review): the agents are removed and the "done" message below is
// logged even when the archive move above failed.
self.remove_agents_for_story(story_id);
// TODO: Re-enable worktree cleanup once we have persistent agent logs.
// Removing worktrees destroys evidence needed to debug empty-commit agents.
// let config =
// crate::config::ProjectConfig::load(&project_root).unwrap_or_default();
// if let Err(e) =
// worktree::remove_worktree_by_story_id(&project_root, story_id, &config)
// .await
// {
// slog!(
// "[pipeline] Failed to remove worktree for '{story_id}': {e}"
// );
// }
slog!(
"[pipeline] Story '{story_id}' done. Worktree preserved for inspection."
);
} else {
// Post-merge tests failed: count a retry under "mergemaster".
let story_path = project_root
.join(".storkit/work/4_merge")
.join(format!("{story_id}.md"));
if let Some(reason) = should_block_story(&story_path, config.max_retries, story_id, "mergemaster") {
// Story has exceeded retry limit — do not restart.
let _ = self.watcher_tx.send(WatcherEvent::StoryBlocked {
story_id: story_id.to_string(),
reason,
});
} else {
slog!(
"[pipeline] Post-merge tests failed for '{story_id}'. Restarting mergemaster."
);
let context = format!(
"\n\n---\n## Post-Merge Test Failed\n\
The tests on master failed with the following output:\n{}\n\n\
Please investigate and resolve the failures, then call merge_agent_work again.",
output
);
if let Err(e) = self
.start_agent(
&project_root,
story_id,
Some("mergemaster"),
Some(&context),
)
.await
{
slog_error!(
"[pipeline] Failed to restart mergemaster for '{story_id}': {e}"
);
}
}
}
}
}
}
// Always scan for unassigned work after any agent completes, regardless
// of the outcome (success, failure, restart). This ensures stories that
// failed agent assignment due to busy agents are retried when agents
// become available (bug 295).
self.auto_assign_available_work(&project_root).await;
}
/// Internal: report that an agent has finished work on a story.
///
/// **Note:** This is no longer exposed as an MCP tool. The server now
/// automatically runs completion gates when an agent process exits
/// (see `run_server_owned_completion`). This method is retained for
/// backwards compatibility and testing.
///
/// What it does, in order:
/// - Checks that the agent exists, is `Running`, and has a worktree.
/// - Rejects with an error if the worktree has uncommitted changes.
/// - Runs the acceptance gates on a blocking thread.
/// - Removes the agent record from the pool so completed agents never
///   appear in `list_agents`. The record is removed outright; it is not
///   moved to a `Completed`/`Failed` status.
/// - Emits a `Done` event so `wait_for_agent` unblocks, and notifies
///   watchers that the agent set changed.
/// - Spawns `run_pipeline_advance` in the background with the report.
///
/// # Errors
///
/// Returns `Err` when the agents lock is poisoned, the agent is unknown or
/// not running, the agent has no worktree, a gate helper returns an error
/// (e.g. uncommitted changes), the gate task panics, or the agent record
/// disappears while the gates are running.
#[allow(dead_code)]
pub async fn report_completion(
    &self,
    story_id: &str,
    agent_name: &str,
    summary: &str,
) -> Result<CompletionReport, String> {
    let key = composite_key(story_id, agent_name);

    // Verify agent exists, is Running, and grab its worktree path. The lock
    // is released at the end of this block, before any `.await`.
    let worktree_path = {
        let agents = self.agents.lock().map_err(|e| e.to_string())?;
        let agent = agents
            .get(&key)
            .ok_or_else(|| format!("No agent '{agent_name}' for story '{story_id}'"))?;
        if agent.status != AgentStatus::Running {
            return Err(format!(
                "Agent '{agent_name}' for story '{story_id}' is not running (status: {}). \
                 report_completion can only be called by a running agent.",
                agent.status
            ));
        }
        agent
            .worktree_info
            .as_ref()
            .map(|wt| wt.path.clone())
            .ok_or_else(|| {
                format!(
                    "Agent '{agent_name}' for story '{story_id}' has no worktree. \
                     Cannot run acceptance gates."
                )
            })?
    };

    // Run gate checks in a blocking thread to avoid stalling the async runtime.
    // The path is moved into the task; it is not needed afterwards.
    let (gates_passed, gate_output) = tokio::task::spawn_blocking(move || {
        // Step 1: Reject if worktree is dirty.
        super::super::gates::check_uncommitted_changes(&worktree_path)?;
        // Step 2: Run clippy + tests and return (passed, output).
        super::super::gates::run_acceptance_gates(&worktree_path)
    })
    .await
    .map_err(|e| format!("Gate check task panicked: {e}"))??;

    let report = CompletionReport {
        summary: summary.to_string(),
        gates_passed,
        gate_output,
    };

    // Extract data for pipeline advance, then remove the entry so
    // completed agents never appear in list_agents. The report is not
    // written to the record: the record is dropped under this same lock,
    // so the stored value could never be observed.
    let (
        tx,
        session_id,
        project_root_for_advance,
        wt_path_for_advance,
        merge_failure_reported_for_advance,
    ) = {
        let mut agents = self.agents.lock().map_err(|e| e.to_string())?;
        let agent = agents.get(&key).ok_or_else(|| {
            format!("Agent '{agent_name}' for story '{story_id}' disappeared during gate check")
        })?;
        let extracted = (
            agent.tx.clone(),
            agent.session_id.clone(),
            agent.project_root.clone(),
            agent.worktree_info.as_ref().map(|w| w.path.clone()),
            agent.merge_failure_reported,
        );
        agents.remove(&key);
        extracted
    };

    // Emit Done so wait_for_agent unblocks. A send error is ignored on
    // purpose (best-effort notification).
    let _ = tx.send(AgentEvent::Done {
        story_id: story_id.to_string(),
        agent_name: agent_name.to_string(),
        session_id,
    });

    // Notify WebSocket clients that the agent is gone.
    Self::notify_agent_state_changed(&self.watcher_tx);

    // Advance the pipeline state machine in a background task. The task
    // gets its own pool handle that shares all state with `self`.
    let pool_clone = Self {
        agents: Arc::clone(&self.agents),
        port: self.port,
        child_killers: Arc::clone(&self.child_killers),
        watcher_tx: self.watcher_tx.clone(),
        merge_jobs: Arc::clone(&self.merge_jobs),
    };
    let sid = story_id.to_string();
    let aname = agent_name.to_string();
    let report_for_advance = report.clone();
    tokio::spawn(async move {
        pool_clone
            .run_pipeline_advance(
                &sid,
                &aname,
                report_for_advance,
                project_root_for_advance,
                wt_path_for_advance,
                merge_failure_reported_for_advance,
            )
            .await;
    });

    Ok(report)
}
/// Start the merge pipeline as a background task.
///
/// Returns immediately so the MCP tool call doesn't time out (the full
/// pipeline — squash merge + quality gates — takes well over 60 seconds,
/// exceeding Claude Code's MCP tool-call timeout).
///
/// The mergemaster agent should poll [`get_merge_status`](Self::get_merge_status)
/// until the job reaches a terminal state.
pub fn start_merge_agent_work(
self: &Arc<Self>,
project_root: &Path,
story_id: &str,
) -> Result<(), String> {
// Guard against double-starts.
{
let jobs = self.merge_jobs.lock().map_err(|e| e.to_string())?;
if let Some(job) = jobs.get(story_id)
&& matches!(job.status, super::super::merge::MergeJobStatus::Running)
{
return Err(format!(
"Merge already in progress for '{story_id}'. \
Use get_merge_status to poll for completion."
));
}
}
// Insert Running job.
{
let mut jobs = self.merge_jobs.lock().map_err(|e| e.to_string())?;
jobs.insert(
story_id.to_string(),
super::super::merge::MergeJob {
story_id: story_id.to_string(),
status: super::super::merge::MergeJobStatus::Running,
},
);
}
let pool = Arc::clone(self);
let root = project_root.to_path_buf();
let sid = story_id.to_string();
tokio::spawn(async move {
let report = pool.run_merge_pipeline(&root, &sid).await;
let failed = report.is_err();
let status = match report {
Ok(r) => super::super::merge::MergeJobStatus::Completed(r),
Err(e) => super::super::merge::MergeJobStatus::Failed(e),
};
if let Ok(mut jobs) = pool.merge_jobs.lock()
&& let Some(job) = jobs.get_mut(&sid)
{
job.status = status;
}
if failed {
pool.auto_assign_available_work(&root).await;
}
});
Ok(())
}
/// Body of the background merge job started by `start_merge_agent_work`.
///
/// Runs the squash merge of `feature/story-{story_id}` on a blocking thread.
/// If the merge reports failure, a non-success report is returned right away.
/// Otherwise the story is archived (its agents are removed only when that
/// succeeded), the story worktree is removed if it exists, unassigned work is
/// rescanned, and a success report is returned.
///
/// # Errors
///
/// Returns `Err` when the merge task panics or the merge helper itself
/// returns an error.
async fn run_merge_pipeline(
    self: &Arc<Self>,
    project_root: &Path,
    story_id: &str,
) -> Result<super::super::merge::MergeReport, String> {
    use super::super::{lifecycle, merge};

    let branch = format!("feature/story-{story_id}");
    let wt_path = worktree::worktree_path(project_root, story_id);

    // The blocking task needs owned copies of everything it touches.
    let merge_result = {
        let root = project_root.to_path_buf();
        let sid = story_id.to_string();
        tokio::task::spawn_blocking(move || merge::run_squash_merge(&root, &branch, &sid))
            .await
            .map_err(|e| format!("Merge task panicked: {e}"))??
    };

    match merge_result.success {
        false => Ok(merge::MergeReport {
            story_id: story_id.to_string(),
            success: false,
            had_conflicts: merge_result.had_conflicts,
            conflicts_resolved: merge_result.conflicts_resolved,
            conflict_details: merge_result.conflict_details,
            gates_passed: merge_result.gates_passed,
            gate_output: merge_result.output,
            worktree_cleaned_up: false,
            story_archived: false,
        }),
        true => {
            // Archive failures are surfaced through the report flag only.
            let story_archived =
                lifecycle::move_story_to_archived(project_root, story_id).is_ok();
            if story_archived {
                self.remove_agents_for_story(story_id);
            }

            // A worktree that does not exist counts as "not cleaned up".
            let mut worktree_cleaned_up = false;
            if wt_path.exists() {
                let config =
                    crate::config::ProjectConfig::load(project_root).unwrap_or_default();
                worktree_cleaned_up =
                    worktree::remove_worktree_by_story_id(project_root, story_id, &config)
                        .await
                        .is_ok();
            }

            self.auto_assign_available_work(project_root).await;

            Ok(merge::MergeReport {
                story_id: story_id.to_string(),
                success: true,
                had_conflicts: merge_result.had_conflicts,
                conflicts_resolved: merge_result.conflicts_resolved,
                conflict_details: merge_result.conflict_details,
                gates_passed: true,
                gate_output: merge_result.output,
                worktree_cleaned_up,
                story_archived,
            })
        }
    }
}
/// Look up the background merge job recorded for `story_id`.
///
/// Returns a clone of the job, or `None` when no job is recorded for the
/// story or the merge-jobs lock is poisoned.
pub fn get_merge_status(&self, story_id: &str) -> Option<super::super::merge::MergeJob> {
    let jobs = self.merge_jobs.lock().ok()?;
    jobs.get(story_id).cloned()
}
/// Flag the mergemaster agent of `story_id` as having explicitly reported a
/// merge failure (via the `report_merge_failure` MCP tool).
///
/// The flag is read by `run_pipeline_advance`, which then refuses to advance
/// the story even when the server-owned gate check returned
/// `gates_passed=true` (those gates run in the feature-branch worktree, not
/// on master).
///
/// The first pool entry whose key belongs to `story_id` and whose agent name
/// maps to the `Mergemaster` stage is flagged. If there is no such entry, or
/// the agents lock cannot be taken, the problem is logged and nothing is set.
pub fn set_merge_failure_reported(&self, story_id: &str) {
    let mut agents = match self.agents.lock() {
        Ok(guard) => guard,
        Err(e) => {
            slog_error!("[pipeline] set_merge_failure_reported: could not lock agents: {e}");
            return;
        }
    };

    for (key, agent) in agents.iter_mut() {
        // The story id is everything before the last ':' of the key; a key
        // without ':' is compared as a whole.
        let key_story_id = match key.rsplit_once(':') {
            Some((sid, _)) => sid,
            None => key.as_str(),
        };
        if key_story_id != story_id
            || pipeline_stage(&agent.agent_name) != PipelineStage::Mergemaster
        {
            continue;
        }
        agent.merge_failure_reported = true;
        slog!(
            "[pipeline] Merge failure flag set for '{story_id}:{}'",
            agent.agent_name
        );
        return;
    }

    slog_warn!(
        "[pipeline] set_merge_failure_reported: no running mergemaster found \
         for story '{story_id}' — flag not set"
    );
}
}
/// Server-owned completion: runs acceptance gates when an agent process exits
/// normally, and advances the pipeline based on results.
///
/// This is a **free function** (not a method on `AgentPool`) to break the
/// opaque type cycle that would otherwise arise: `start_agent` → spawned task
/// → server-owned completion → pipeline advance → `start_agent`.
///
/// The function is a no-op when the agent is not in the pool, when the agents
/// lock is poisoned, or when the agent already has a completion report (e.g.
/// from a legacy `report_completion` call) — the latter avoids running the
/// gates twice.
///
/// Otherwise the gate outcome (a missing worktree, a gate error and a
/// panicked gate task all count as "not passed") is turned into a
/// `CompletionReport`, the agent entry is removed from the pool, a `Done`
/// event is emitted, watchers are notified, and pipeline advancement is
/// spawned in the background.
pub(super) async fn run_server_owned_completion(
    agents: &Arc<Mutex<HashMap<String, StoryAgent>>>,
    port: u16,
    story_id: &str,
    agent_name: &str,
    session_id: Option<String>,
    watcher_tx: broadcast::Sender<WatcherEvent>,
) {
    use super::super::gates;

    let key = composite_key(story_id, agent_name);

    // Guard: skip if completion was already recorded (legacy path), or if
    // the agent is unknown.
    {
        let Ok(pool) = agents.lock() else { return };
        let Some(agent) = pool.get(&key) else { return };
        if agent.completion.is_some() {
            slog!(
                "[agents] Completion already recorded for '{story_id}:{agent_name}'; \
                 skipping server-owned gates."
            );
            return;
        }
    }

    // Get worktree path for running gates.
    let worktree_path = {
        let Ok(pool) = agents.lock() else { return };
        let found = pool
            .get(&key)
            .and_then(|a| a.worktree_info.as_ref())
            .map(|wt| wt.path.clone());
        found
    };

    // Run acceptance gates on a blocking thread.
    let (gates_passed, gate_output) = match worktree_path {
        None => (
            false,
            "No worktree path available to run acceptance gates".to_string(),
        ),
        Some(path) => {
            let gate_task = tokio::task::spawn_blocking(move || {
                gates::check_uncommitted_changes(&path)?;
                // AC5: Fail early if the coder finished with no commits on the feature branch.
                // This prevents empty-diff stories from advancing through QA to merge.
                if !gates::worktree_has_committed_work(&path) {
                    return Ok((
                        false,
                        "Agent exited with no commits on the feature branch. \
                         The agent did not produce any code changes."
                            .to_string(),
                    ));
                }
                gates::run_acceptance_gates(&path)
            });
            match gate_task.await {
                Ok(Ok(result)) => result,
                Ok(Err(e)) => (false, e),
                Err(e) => (false, format!("Gate check task panicked: {e}")),
            }
        }
    };

    slog!(
        "[agents] Server-owned completion for '{story_id}:{agent_name}': gates_passed={gates_passed}"
    );

    let report = CompletionReport {
        summary: "Agent process exited normally".to_string(),
        gates_passed,
        gate_output,
    };

    // Store completion report, extract data for pipeline advance, then
    // remove the entry so completed agents never appear in list_agents.
    let (tx, project_root_for_advance, wt_path_for_advance, merge_failure_reported_for_advance) = {
        let Ok(mut pool) = agents.lock() else { return };
        let Some(agent) = pool.get_mut(&key) else { return };
        agent.completion = Some(report.clone());
        agent.session_id = session_id.clone();
        let extracted = (
            agent.tx.clone(),
            agent.project_root.clone(),
            agent.worktree_info.as_ref().map(|w| w.path.clone()),
            agent.merge_failure_reported,
        );
        pool.remove(&key);
        extracted
    };

    // Emit Done so wait_for_agent unblocks; a send error is ignored.
    let _ = tx.send(AgentEvent::Done {
        story_id: story_id.to_string(),
        agent_name: agent_name.to_string(),
        session_id,
    });

    // Notify WebSocket clients that the agent is gone.
    AgentPool::notify_agent_state_changed(&watcher_tx);

    // Advance the pipeline state machine in a background task.
    spawn_pipeline_advance(
        Arc::clone(agents),
        port,
        story_id,
        agent_name,
        report,
        project_root_for_advance,
        wt_path_for_advance,
        watcher_tx,
        merge_failure_reported_for_advance,
    );
}
/// Run `AgentPool::run_pipeline_advance` on a detached tokio task.
///
/// This is a **non-async** function so it does not participate in the opaque
/// type cycle between `start_agent` and `run_server_owned_completion`.
///
/// The task builds its own `AgentPool` handle around the shared `agents` map
/// and `watcher_tx`.
///
/// NOTE(review): `child_killers` and `merge_jobs` of that handle are fresh,
/// empty maps rather than the originating pool's — presumably acceptable for
/// pipeline advancement, but confirm that nothing started from here relies on
/// the shared instances.
#[allow(clippy::too_many_arguments)]
fn spawn_pipeline_advance(
    agents: Arc<Mutex<HashMap<String, StoryAgent>>>,
    port: u16,
    story_id: &str,
    agent_name: &str,
    completion: CompletionReport,
    project_root: Option<PathBuf>,
    worktree_path: Option<PathBuf>,
    watcher_tx: broadcast::Sender<WatcherEvent>,
    merge_failure_reported: bool,
) {
    // The spawned task must own its data, so the borrowed ids are copied.
    let owned_story = story_id.to_string();
    let owned_agent = agent_name.to_string();

    let advance = async move {
        let pool = AgentPool {
            agents,
            port,
            child_killers: Arc::new(Mutex::new(HashMap::new())),
            watcher_tx,
            merge_jobs: Arc::new(Mutex::new(HashMap::new())),
        };
        pool.run_pipeline_advance(
            &owned_story,
            &owned_agent,
            completion,
            project_root,
            worktree_path,
            merge_failure_reported,
        )
        .await;
    };

    tokio::spawn(advance);
}
/// Count one more retry for the story and decide whether it is now blocked.
///
/// - `max_retries == 0` disables retry limits: nothing is counted and `None`
///   is returned.
/// - If the counter cannot be incremented, the error is logged and `None` is
///   returned, so a metadata failure never blocks a retry.
/// - If the new count is still below `max_retries`, `None` is returned and the
///   caller may restart the agent.
/// - Otherwise the story file is marked as blocked (a failure to do so is only
///   logged) and `Some(reason)` is returned; the caller must NOT restart.
///
/// `stage_label` is only used in log messages and in the returned reason.
fn should_block_story(story_path: &Path, max_retries: u32, story_id: &str, stage_label: &str) -> Option<String> {
    use crate::io::story_metadata::{increment_retry_count, write_blocked};

    // Retry limits disabled.
    if max_retries == 0 {
        return None;
    }

    let new_count = match increment_retry_count(story_path) {
        Ok(count) => count,
        Err(e) => {
            slog_error!("[pipeline] Failed to increment retry_count for '{story_id}': {e}");
            // Don't block on error — allow retry.
            return None;
        }
    };

    if new_count < max_retries {
        slog!(
            "[pipeline] Story '{story_id}' retry {new_count}/{max_retries} at {stage_label} stage."
        );
        return None;
    }

    slog_warn!(
        "[pipeline] Story '{story_id}' reached retry limit ({new_count}/{max_retries}) \
         at {stage_label} stage. Marking as blocked."
    );
    if let Err(e) = write_blocked(story_path) {
        slog_error!("[pipeline] Failed to write blocked flag for '{story_id}': {e}");
    }
    Some(format!(
        "Retry limit exceeded ({new_count}/{max_retries}) at {stage_label} stage"
    ))
}
#[cfg(test)]
mod tests {
use super::*;
use super::super::AgentPool;
use crate::agents::merge::{MergeJob, MergeJobStatus};
use crate::agents::{AgentEvent, AgentStatus, CompletionReport};
use crate::io::watcher::WatcherEvent;
use std::path::PathBuf;
use std::process::Command;
/// Turn `repo` into a git repository with a local test identity and a single
/// empty "init" commit.
///
/// Panics if a `git` process cannot be spawned; the exit status of each
/// command is not checked.
fn init_git_repo(repo: &std::path::Path) {
    let steps: [&[&str]; 4] = [
        &["init"],
        &["config", "user.email", "test@test.com"],
        &["config", "user.name", "Test"],
        &["commit", "--allow-empty", "-m", "init"],
    ];
    for git_args in steps {
        Command::new("git")
            .args(git_args)
            .current_dir(repo)
            .output()
            .unwrap();
    }
}
// ── report_completion tests ────────────────────────────────────
/// Reporting completion for an agent that was never registered is an error
/// that mentions "No agent".
#[tokio::test]
async fn report_completion_rejects_nonexistent_agent() {
    let pool = AgentPool::new_test(3001);
    let outcome = pool.report_completion("no_story", "no_bot", "done").await;
    assert!(outcome.is_err());
    let msg = outcome.unwrap_err();
    assert!(msg.contains("No agent"), "unexpected: {msg}");
}
/// An agent that is registered but not in the `Running` state cannot report
/// completion; the error says so.
#[tokio::test]
async fn report_completion_rejects_non_running_agent() {
    let pool = AgentPool::new_test(3001);
    pool.inject_test_agent("s6", "bot", AgentStatus::Completed);

    let outcome = pool.report_completion("s6", "bot", "done").await;

    assert!(outcome.is_err());
    let msg = outcome.unwrap_err();
    let mentions_not_running = msg.contains("not running");
    assert!(mentions_not_running, "expected 'not running' in: {msg}");
}
#[tokio::test]
async fn report_completion_rejects_dirty_worktree() {
use std::fs;
use tempfile::tempdir;
let tmp = tempdir().unwrap();
let repo = tmp.path();
// Init a real git repo and make an initial commit
Command::new("git")
.args(["init"])
.current_dir(repo)
.output()
.unwrap();
Command::new("git")
.args(["commit", "--allow-empty", "-m", "init"])
.current_dir(repo)
.output()
.unwrap();
// Write an uncommitted file
fs::write(repo.join("dirty.txt"), "not committed").unwrap();
let pool = AgentPool::new_test(3001);
pool.inject_test_agent_with_path("s7", "bot", AgentStatus::Running, repo.to_path_buf());
let result = pool.report_completion("s7", "bot", "done").await;
assert!(result.is_err());
let msg = result.unwrap_err();
assert!(
msg.contains("uncommitted"),
"expected 'uncommitted' in: {msg}"
);
}
// ── server-owned completion tests ───────────────────────────────────────────
/// When an agent already carries a completion report, server-owned
/// completion leaves the record untouched and emits no `Done` event.
#[tokio::test]
async fn server_owned_completion_skips_when_already_completed() {
    let pool = AgentPool::new_test(3001);
    pool.inject_test_agent_with_completion(
        "s10",
        "coder-1",
        AgentStatus::Completed,
        PathBuf::from("/tmp/nonexistent"),
        CompletionReport {
            summary: "Already done".to_string(),
            gates_passed: true,
            gate_output: String::new(),
        },
    );

    // Subscribe before calling so we can check if Done event was emitted.
    let mut events = pool.subscribe("s10", "coder-1").unwrap();

    run_server_owned_completion(
        &pool.agents,
        pool.port,
        "s10",
        "coder-1",
        Some("sess-1".to_string()),
        pool.watcher_tx.clone(),
    )
    .await;

    // The lock is scoped so it is released before the event check.
    {
        let registry = pool.agents.lock().unwrap();
        let entry = registry.get(&composite_key("s10", "coder-1")).unwrap();
        // Status should remain Completed (unchanged) — no gate re-run.
        assert_eq!(entry.status, AgentStatus::Completed);
        // Summary should still be the original, not overwritten.
        assert_eq!(entry.completion.as_ref().unwrap().summary, "Already done");
    }

    // No Done event should have been emitted.
    assert!(
        events.try_recv().is_err(),
        "should not emit Done when completion already exists"
    );
}
/// For a running agent with a clean git worktree, server-owned completion
/// removes the agent from the pool and emits a `Done` event carrying the
/// session id it was given.
#[tokio::test]
async fn server_owned_completion_runs_gates_on_clean_worktree() {
    use tempfile::tempdir;

    let tmp = tempdir().unwrap();
    init_git_repo(tmp.path());

    let pool = AgentPool::new_test(3001);
    pool.inject_test_agent_with_path(
        "s11",
        "coder-1",
        AgentStatus::Running,
        tmp.path().to_path_buf(),
    );
    let mut events = pool.subscribe("s11", "coder-1").unwrap();

    run_server_owned_completion(
        &pool.agents,
        pool.port,
        "s11",
        "coder-1",
        Some("sess-2".to_string()),
        pool.watcher_tx.clone(),
    )
    .await;

    // Agent entry should be removed from the map after completion.
    {
        let registry = pool.agents.lock().unwrap();
        let still_present = registry.contains_key(&composite_key("s11", "coder-1"));
        assert!(
            !still_present,
            "agent should be removed from map after completion"
        );
    }

    // A Done event should have been emitted with the session_id.
    let event = events.try_recv().expect("should emit Done event");
    match &event {
        AgentEvent::Done { session_id, .. } => {
            assert_eq!(*session_id, Some("sess-2".to_string()));
        }
        other => panic!("expected Done event, got: {other:?}"),
    }
}
/// With an uncommitted file in the worktree, server-owned completion still
/// removes the agent from the pool and still emits a `Done` event.
#[tokio::test]
async fn server_owned_completion_fails_on_dirty_worktree() {
    use std::fs;
    use tempfile::tempdir;

    let tmp = tempdir().unwrap();
    init_git_repo(tmp.path());
    // Create an uncommitted file.
    fs::write(tmp.path().join("dirty.txt"), "not committed").unwrap();

    let pool = AgentPool::new_test(3001);
    pool.inject_test_agent_with_path(
        "s12",
        "coder-1",
        AgentStatus::Running,
        tmp.path().to_path_buf(),
    );
    let mut events = pool.subscribe("s12", "coder-1").unwrap();

    run_server_owned_completion(
        &pool.agents,
        pool.port,
        "s12",
        "coder-1",
        None,
        pool.watcher_tx.clone(),
    )
    .await;

    // Agent entry should be removed from the map after completion (even on failure).
    {
        let registry = pool.agents.lock().unwrap();
        let still_present = registry.contains_key(&composite_key("s12", "coder-1"));
        assert!(
            !still_present,
            "agent should be removed from map after failed completion"
        );
    }

    // A Done event should have been emitted.
    let event = events.try_recv().expect("should emit Done event");
    assert!(
        matches!(event, AgentEvent::Done { .. }),
        "expected Done event, got: {event:?}"
    );
}
/// Server-owned completion for an agent that is not in the pool returns
/// quietly; the test passes as long as the call does not panic.
#[tokio::test]
async fn server_owned_completion_nonexistent_agent_is_noop() {
    let pool = AgentPool::new_test(3001);
    let watcher = pool.watcher_tx.clone();
    // Should not panic or error — just silently return.
    run_server_owned_completion(&pool.agents, pool.port, "nonexistent", "bot", None, watcher)
        .await;
}
// ── pipeline advance tests ────────────────────────────────────────────────
/// A coder that passes its gates on a story without `qa` frontmatter (so the
/// project default applies) moves the story from `2_current/` to `4_merge/`.
#[tokio::test]
async fn pipeline_advance_coder_gates_pass_server_qa_moves_to_merge() {
    let tmp = tempfile::tempdir().unwrap();
    let root = tmp.path();

    // Set up story in 2_current/ (no qa frontmatter → uses project default "server")
    let current_dir = root.join(".storkit/work/2_current");
    let story_in_current = current_dir.join("50_story_test.md");
    std::fs::create_dir_all(&current_dir).unwrap();
    std::fs::write(&story_in_current, "test").unwrap();

    let passed = CompletionReport {
        summary: "done".to_string(),
        gates_passed: true,
        gate_output: String::new(),
    };
    AgentPool::new_test(3001)
        .run_pipeline_advance(
            "50_story_test",
            "coder-1",
            passed,
            Some(root.to_path_buf()),
            None,
            false,
        )
        .await;

    // With default qa: server, story skips QA and goes straight to 4_merge/
    let story_in_merge = root.join(".storkit/work/4_merge/50_story_test.md");
    assert!(story_in_merge.exists(), "story should be in 4_merge/");
    assert!(
        !story_in_current.exists(),
        "story should not still be in 2_current/"
    );
}
/// A coder that passes its gates on a story declaring `qa: agent` moves the
/// story from `2_current/` to `3_qa/`.
#[tokio::test]
async fn pipeline_advance_coder_gates_pass_agent_qa_moves_to_qa() {
    let tmp = tempfile::tempdir().unwrap();
    let root = tmp.path();

    // Set up story in 2_current/ with qa: agent frontmatter
    let current_dir = root.join(".storkit/work/2_current");
    let story_in_current = current_dir.join("50_story_test.md");
    std::fs::create_dir_all(&current_dir).unwrap();
    std::fs::write(&story_in_current, "---\nname: Test\nqa: agent\n---\ntest").unwrap();

    let passed = CompletionReport {
        summary: "done".to_string(),
        gates_passed: true,
        gate_output: String::new(),
    };
    AgentPool::new_test(3001)
        .run_pipeline_advance(
            "50_story_test",
            "coder-1",
            passed,
            Some(root.to_path_buf()),
            None,
            false,
        )
        .await;

    // With qa: agent, story should move to 3_qa/
    let story_in_qa = root.join(".storkit/work/3_qa/50_story_test.md");
    assert!(story_in_qa.exists(), "story should be in 3_qa/");
    assert!(
        !story_in_current.exists(),
        "story should not still be in 2_current/"
    );
}
#[tokio::test]
async fn pipeline_advance_qa_gates_pass_moves_story_to_merge() {
    use std::fs;

    let tmp = tempfile::tempdir().unwrap();
    let root = tmp.path();

    // Seed the story in 3_qa/. `qa: server` is set so that the story is not
    // held for human review and can go straight on to merge.
    let qa_dir = root.join(".storkit/work/3_qa");
    fs::create_dir_all(&qa_dir).unwrap();
    let source_file = qa_dir.join("51_story_test.md");
    fs::write(&source_file, "---\nname: Test\nqa: server\n---\ntest").unwrap();

    // The QA agent reporting that its gates passed.
    let report = CompletionReport {
        summary: String::from("QA done"),
        gates_passed: true,
        gate_output: String::new(),
    };
    let pool = AgentPool::new_test(3001);
    pool.run_pipeline_advance(
        "51_story_test",
        "qa",
        report,
        Some(root.to_path_buf()),
        None,
        false,
    )
    .await;

    // The story must have left 3_qa/ and arrived in 4_merge/.
    let merge_file = root.join(".storkit/work/4_merge/51_story_test.md");
    assert!(merge_file.exists(), "story should be in 4_merge/");
    assert!(!source_file.exists(), "story should not still be in 3_qa/");
}
#[tokio::test]
async fn pipeline_advance_supervisor_does_not_advance() {
    use std::fs;

    let tmp = tempfile::tempdir().unwrap();
    let root = tmp.path();

    // Seed a story in 2_current/.
    let current = root.join(".storkit/work/2_current");
    fs::create_dir_all(&current).unwrap();
    let story_file = current.join("52_story_test.md");
    fs::write(&story_file, "test").unwrap();

    // Completion reported by an agent named "supervisor", with gates passed.
    let report = CompletionReport {
        summary: String::from("supervised"),
        gates_passed: true,
        gate_output: String::new(),
    };
    let pool = AgentPool::new_test(3001);
    pool.run_pipeline_advance(
        "52_story_test",
        "supervisor",
        report,
        Some(root.to_path_buf()),
        None,
        false,
    )
    .await;

    // Supervisors do not advance the pipeline, so the file must not have moved.
    assert!(
        story_file.exists(),
        "story should still be in 2_current/ for supervisor"
    );
}
#[tokio::test]
async fn pipeline_advance_sends_agent_state_changed_to_watcher_tx() {
    use std::fs;

    let tmp = tempfile::tempdir().unwrap();
    let root = tmp.path();
    let work = root.join(".storkit/work");

    // Story waiting in 2_current/.
    let current = root.join(".storkit/work/2_current");
    fs::create_dir_all(&current).unwrap();
    fs::write(current.join("173_story_test.md"), "test").unwrap();
    // 3_qa/ is the move target; 1_backlog/ must exist because start_agent
    // calls move_story_to_current.
    fs::create_dir_all(work.join("3_qa")).unwrap();
    fs::create_dir_all(work.join("1_backlog")).unwrap();

    // project.toml declares a qa agent so that start_agent can resolve it.
    fs::create_dir_all(root.join(".storkit")).unwrap();
    let project_toml = r#"
default_qa = "agent"
[[agent]]
name = "coder-1"
role = "Coder"
command = "echo"
args = ["noop"]
prompt = "test"
stage = "coder"
[[agent]]
name = "qa"
role = "QA"
command = "echo"
args = ["noop"]
prompt = "test"
stage = "qa"
"#;
    fs::write(root.join(".storkit/project.toml"), project_toml).unwrap();

    let pool = AgentPool::new_test(3001);
    // The subscription has to exist BEFORE the advance runs, otherwise the
    // events it sends would not be observed.
    let mut rx = pool.watcher_tx.subscribe();

    let report = CompletionReport {
        summary: String::from("done"),
        gates_passed: true,
        gate_output: String::new(),
    };
    pool.run_pipeline_advance(
        "173_story_test",
        "coder-1",
        report,
        Some(root.to_path_buf()),
        None,
        false,
    )
    .await;

    // Drain whatever arrived on the pool's own watcher_tx (not a dummy
    // channel), stopping at the first AgentStateChanged or at the first
    // failed receive, whichever comes first.
    let got_agent_state_changed = std::iter::from_fn(|| rx.try_recv().ok())
        .any(|evt| matches!(evt, WatcherEvent::AgentStateChanged));
    assert!(
        got_agent_state_changed,
        "pipeline advance should send AgentStateChanged through the real watcher_tx \
         (bug 173: lozenges must update when agents are assigned during pipeline advance)"
    );
}
// ── merge_agent_work tests ────────────────────────────────────────────────
/// Helper: kick off a merge for `story_id` in `repo` and poll its status
/// every 50 ms, returning the job as soon as it is no longer `Running`.
///
/// Panics if the merge cannot be started. The poll loop has no upper bound,
/// so a job that never leaves `Running` keeps the caller waiting.
async fn run_merge_to_completion(
    pool: &Arc<AgentPool>,
    repo: &std::path::Path,
    story_id: &str,
) -> MergeJob {
    let poll_interval = std::time::Duration::from_millis(50);
    pool.start_merge_agent_work(repo, story_id).unwrap();
    loop {
        // Sleep first, then look: the job has only just been started.
        tokio::time::sleep(poll_interval).await;
        match pool.get_merge_status(story_id) {
            Some(job) if !matches!(job.status, MergeJobStatus::Running) => return job,
            // No status yet, or still running: poll again.
            _ => {}
        }
    }
}
#[tokio::test]
async fn merge_agent_work_returns_error_when_branch_not_found() {
use tempfile::tempdir;
let tmp = tempdir().unwrap();
let repo = tmp.path();
init_git_repo(repo);
let pool = Arc::new(AgentPool::new_test(3001));
let job = run_merge_to_completion(&pool, repo, "99_nonexistent").await;
match &job.status {
MergeJobStatus::Completed(report) => {
assert!(!report.success, "should fail when branch missing");
}
MergeJobStatus::Failed(_) => {
// Also acceptable — the pipeline errored out
}
MergeJobStatus::Running => {
panic!("should not still be running");
}
}
}
#[tokio::test]
async fn merge_agent_work_succeeds_on_clean_branch() {
use std::fs;
use tempfile::tempdir;
let tmp = tempdir().unwrap();
let repo = tmp.path();
init_git_repo(repo);
// Create a feature branch with a commit
Command::new("git")
.args(["checkout", "-b", "feature/story-23_test"])
.current_dir(repo)
.output()
.unwrap();
fs::write(repo.join("feature.txt"), "feature content").unwrap();
Command::new("git")
.args(["add", "."])
.current_dir(repo)
.output()
.unwrap();
Command::new("git")
.args(["commit", "-m", "add feature"])
.current_dir(repo)
.output()
.unwrap();
// Switch back to master (initial branch)
Command::new("git")
.args(["checkout", "master"])
.current_dir(repo)
.output()
.unwrap();
// Create the story file in 4_merge/ so we can test archival
let merge_dir = repo.join(".storkit/work/4_merge");
fs::create_dir_all(&merge_dir).unwrap();
let story_file = merge_dir.join("23_test.md");
fs::write(&story_file, "---\nname: Test\n---\n").unwrap();
Command::new("git")
.args(["add", "."])
.current_dir(repo)
.output()
.unwrap();
Command::new("git")
.args(["commit", "-m", "add story in merge"])
.current_dir(repo)
.output()
.unwrap();
let pool = Arc::new(AgentPool::new_test(3001));
let job = run_merge_to_completion(&pool, repo, "23_test").await;
match &job.status {
MergeJobStatus::Completed(report) => {
assert!(!report.had_conflicts, "should have no conflicts");
assert!(
report.success
|| report.gate_output.contains("Failed to run")
|| !report.gates_passed,
"report should be coherent: {report:?}"
);
if report.story_archived {
let done = repo.join(".storkit/work/5_done/23_test.md");
assert!(done.exists(), "done file should exist");
}
}
MergeJobStatus::Failed(e) => {
// Gate failures are acceptable in test env
assert!(
e.contains("Failed") || e.contains("failed"),
"unexpected failure: {e}"
);
}
MergeJobStatus::Running => panic!("should not still be running"),
}
}
// ── quality gate ordering test ────────────────────────────────
/// Regression test for bug 142: the quality gates have to run BEFORE master
/// is fast-forwarded, so that broken code never lands on master.
#[cfg(unix)]
#[test]
fn quality_gates_run_before_fast_forward_to_master() {
    use std::fs;
    use std::os::unix::fs::PermissionsExt;

    let tmp = tempfile::tempdir().unwrap();
    let repo = tmp.path();
    init_git_repo(repo);

    // Runs one git command inside the repo. Only a failure to spawn git
    // panics; the command's exit status is not inspected.
    let git = |args: &[&str]| {
        Command::new("git")
            .args(args)
            .current_dir(repo)
            .output()
            .unwrap();
    };
    // Trimmed stdout of `git rev-parse HEAD` for the checked-out branch.
    let current_head = || {
        let out = Command::new("git")
            .args(["rev-parse", "HEAD"])
            .current_dir(repo)
            .output()
            .unwrap();
        String::from_utf8(out.stdout).unwrap().trim().to_string()
    };

    // Commit an executable script/test that always exits 1, so the quality
    // gates are guaranteed to fail.
    let script_dir = repo.join("script");
    fs::create_dir_all(&script_dir).unwrap();
    let script_test = script_dir.join("test");
    fs::write(&script_test, "#!/usr/bin/env bash\nexit 1\n").unwrap();
    let mut perms = fs::metadata(&script_test).unwrap().permissions();
    perms.set_mode(0o755);
    fs::set_permissions(&script_test, perms).unwrap();
    git(&["add", "."]);
    git(&["commit", "-m", "add failing script/test"]);

    // Feature branch carrying a single commit.
    git(&["checkout", "-b", "feature/story-142_test"]);
    fs::write(repo.join("change.txt"), "feature change").unwrap();
    git(&["add", "."]);
    git(&["commit", "-m", "feature work"]);

    // Back on master: remember where HEAD is before the merge attempt.
    git(&["checkout", "master"]);
    let head_before = current_head();

    // Attempt the squash-merge. Because script/test fails, the gates fail
    // and the fast-forward must NOT take place.
    let result =
        crate::agents::merge::run_squash_merge(repo, "feature/story-142_test", "142_test")
            .unwrap();
    let head_after = current_head();

    // Failed gates (script/test exits 1) must leave master untouched.
    assert!(
        !result.success,
        "run_squash_merge must report failure when gates fail"
    );
    assert_eq!(
        head_before, head_after,
        "master HEAD must not advance when quality gates fail (bug 142)"
    );
}
#[tokio::test]
async fn merge_agent_work_conflict_does_not_break_master() {
use std::fs;
use tempfile::tempdir;
let tmp = tempdir().unwrap();
let repo = tmp.path();
init_git_repo(repo);
// Create a file on master.
fs::write(
repo.join("code.rs"),
"fn main() {\n println!(\"hello\");\n}\n",
)
.unwrap();
Command::new("git")
.args(["add", "."])
.current_dir(repo)
.output()
.unwrap();
Command::new("git")
.args(["commit", "-m", "initial code"])
.current_dir(repo)
.output()
.unwrap();
// Feature branch: modify the same line differently.
Command::new("git")
.args(["checkout", "-b", "feature/story-42_story_foo"])
.current_dir(repo)
.output()
.unwrap();
fs::write(
repo.join("code.rs"),
"fn main() {\n println!(\"hello\");\n feature_fn();\n}\n",
)
.unwrap();
Command::new("git")
.args(["add", "."])
.current_dir(repo)
.output()
.unwrap();
Command::new("git")
.args(["commit", "-m", "feature: add fn call"])
.current_dir(repo)
.output()
.unwrap();
// Master: add different line at same location.
Command::new("git")
.args(["checkout", "master"])
.current_dir(repo)
.output()
.unwrap();
fs::write(
repo.join("code.rs"),
"fn main() {\n println!(\"hello\");\n master_fn();\n}\n",
)
.unwrap();
Command::new("git")
.args(["add", "."])
.current_dir(repo)
.output()
.unwrap();
Command::new("git")
.args(["commit", "-m", "master: add fn call"])
.current_dir(repo)
.output()
.unwrap();
// Create story file in 4_merge.
let merge_dir = repo.join(".storkit/work/4_merge");
fs::create_dir_all(&merge_dir).unwrap();
fs::write(merge_dir.join("42_story_foo.md"), "---\nname: Test\n---\n").unwrap();
Command::new("git")
.args(["add", "."])
.current_dir(repo)
.output()
.unwrap();
Command::new("git")
.args(["commit", "-m", "add story"])
.current_dir(repo)
.output()
.unwrap();
let pool = Arc::new(AgentPool::new_test(3001));
let job = run_merge_to_completion(&pool, repo, "42_story_foo").await;
// Master should NEVER have conflict markers, regardless of merge outcome.
let master_code = fs::read_to_string(repo.join("code.rs")).unwrap();
assert!(
!master_code.contains("<<<<<<<"),
"master must never contain conflict markers:\n{master_code}"
);
assert!(
!master_code.contains(">>>>>>>"),
"master must never contain conflict markers:\n{master_code}"
);
// The report should accurately reflect what happened.
match &job.status {
MergeJobStatus::Completed(report) => {
assert!(report.had_conflicts, "should report conflicts");
}
MergeJobStatus::Failed(_) => {
// Acceptable — merge aborted due to conflicts
}
MergeJobStatus::Running => panic!("should not still be running"),
}
}
// ── bug 295: pipeline advance picks up waiting QA stories ──────────
#[tokio::test]
async fn pipeline_advance_picks_up_waiting_qa_stories_after_completion() {
    use super::super::auto_assign::is_agent_free;
    use std::fs;

    let tmp = tempfile::tempdir().unwrap();
    let root = tmp.path();
    let sk = root.join(".storkit");
    let qa_dir = sk.join("work/3_qa");
    fs::create_dir_all(&qa_dir).unwrap();
    // Configure a single QA agent.
    fs::write(
        sk.join("project.toml"),
        r#"
[[agent]]
name = "qa"
stage = "qa"
"#,
    )
    .unwrap();
    // Story 292 is in QA with the QA agent running (it will "complete" via
    // run_pipeline_advance below). Story 293 is in QA with NO agent,
    // simulating the "stuck" state from bug 295.
    fs::write(
        qa_dir.join("292_story_first.md"),
        "---\nname: First\nqa: human\n---\n",
    )
    .unwrap();
    fs::write(
        qa_dir.join("293_story_second.md"),
        "---\nname: Second\nqa: human\n---\n",
    )
    .unwrap();
    let pool = AgentPool::new_test(3001);
    // QA is currently running on story 292.
    pool.inject_test_agent("292_story_first", "qa", AgentStatus::Running);
    // Verify that 293 cannot get a QA agent right now (QA is busy).
    {
        let agents = pool.agents.lock().unwrap();
        assert!(
            !is_agent_free(&agents, "qa"),
            "qa should be busy on story 292"
        );
    }
    // Simulate QA completing on story 292: remove the agent from the pool
    // (as run_server_owned_completion does) then run pipeline advance.
    {
        let mut agents = pool.agents.lock().unwrap();
        agents.remove(&composite_key("292_story_first", "qa"));
    }
    pool.run_pipeline_advance(
        "292_story_first",
        "qa",
        CompletionReport {
            summary: "QA done".to_string(),
            gates_passed: true,
            gate_output: String::new(),
        },
        Some(root.to_path_buf()),
        None,
        false,
    )
    .await;
    // After pipeline advance, auto_assign should have started QA on story 293.
    // Look the entry up by its (story, agent) composite key so the check is
    // tied to story 293 specifically: a qa agent that was (re)started on any
    // other story must not satisfy this assertion.
    let agents = pool.agents.lock().unwrap();
    let qa_on_293 = agents
        .get(&composite_key("293_story_second", "qa"))
        .is_some_and(|a| {
            a.agent_name == "qa"
                && matches!(a.status, AgentStatus::Pending | AgentStatus::Running)
        });
    assert!(
        qa_on_293,
        "auto_assign should have started qa for story 293 after 292's QA completed, \
         but no qa agent is pending/running on it. Pool: {:?}",
        agents
            .iter()
            .map(|(k, a)| format!("{k}: {} ({})", a.agent_name, a.status))
            .collect::<Vec<_>>()
    );
}
}