// huskies/server/src/agents/pool/pipeline/advance.rs
// (Web-viewer chrome from the scraped page removed; it is not part of the source.)
//! Pipeline advance — moves stories forward through pipeline stages after agent completion.
use crate::config::ProjectConfig;
use crate::io::watcher::WatcherEvent;
use crate::slog;
use crate::slog_error;
use crate::slog_warn;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use tokio::sync::broadcast;
use super::super::super::{CompletionReport, PipelineStage, agent_config_stage, pipeline_stage};
use super::super::{AgentPool, StoryAgent};
impl AgentPool {
    /// Pipeline advancement: after an agent completes, move the story to
    /// the next pipeline stage and start the appropriate agent.
    ///
    /// Stage routing (from the completing agent's configured stage):
    /// - `Other`       — supervisors / unknown agents: no pipeline movement.
    /// - `Coder`       — gates passed → merge / QA / human hold depending on
    ///                   the story's QA mode; gates failed → retry or block.
    /// - `Qa`          — gates passed → coverage gate, then merge or human
    ///                   hold; any failure → retry or block.
    /// - `Mergemaster` — post-merge tests on master, then done / retry / block.
    ///
    /// NOTE(review): `worktree_path` is only consulted in the QA branch (as the
    /// coverage-gate working dir); other stages ignore it.
    #[allow(clippy::too_many_arguments)]
    pub(super) async fn run_pipeline_advance(
        &self,
        story_id: &str,
        agent_name: &str,
        completion: CompletionReport,
        project_root: Option<PathBuf>,
        worktree_path: Option<PathBuf>,
        merge_failure_reported: bool,
        previous_session_id: Option<String>,
    ) {
        // Without a project root there is nothing to advance against.
        let project_root = match project_root {
            Some(p) => p,
            None => {
                slog_warn!("[pipeline] No project_root for '{story_id}:{agent_name}'");
                return;
            }
        };
        let config = ProjectConfig::load(&project_root).unwrap_or_default();
        // Prefer the stage declared in project config for this agent; fall
        // back to inferring the stage from the agent's name.
        let stage = config
            .find_agent(agent_name)
            .map(agent_config_stage)
            .unwrap_or_else(|| pipeline_stage(agent_name));
        match stage {
            PipelineStage::Other => {
                // Supervisors and unknown agents do not advance the pipeline.
            }
            PipelineStage::Coder => {
                if completion.gates_passed {
                    // Determine effective QA mode for this story.
                    let qa_mode = {
                        let item_type = crate::agents::lifecycle::item_type_from_id(story_id);
                        if item_type == "spike" {
                            // Spikes always go to a human, regardless of config.
                            crate::io::story_metadata::QaMode::Human
                        } else {
                            let default_qa = config.default_qa_mode();
                            resolve_qa_mode_from_store(&project_root, story_id, default_qa)
                        }
                    };
                    match qa_mode {
                        crate::io::story_metadata::QaMode::Server => {
                            slog!(
                                "[pipeline] Coder '{agent_name}' passed gates for '{story_id}'. \
                                 qa: server — moving directly to merge."
                            );
                            if let Err(e) = crate::agents::lifecycle::move_story_to_merge(
                                &project_root,
                                story_id,
                            ) {
                                slog_error!(
                                    "[pipeline] Failed to move '{story_id}' to 4_merge/: {e}"
                                );
                            } else {
                                self.start_mergemaster_or_block(&project_root, story_id)
                                    .await;
                            }
                        }
                        crate::io::story_metadata::QaMode::Agent => {
                            slog!(
                                "[pipeline] Coder '{agent_name}' passed gates for '{story_id}'. \
                                 qa: agent — moving to QA."
                            );
                            if let Err(e) =
                                crate::agents::lifecycle::move_story_to_qa(&project_root, story_id)
                            {
                                slog_error!("[pipeline] Failed to move '{story_id}' to 3_qa/: {e}");
                            } else if let Err(e) = self
                                .start_agent(&project_root, story_id, Some("qa"), None, None)
                                .await
                            {
                                slog_error!(
                                    "[pipeline] Failed to start qa agent for '{story_id}': {e}"
                                );
                            }
                        }
                        crate::io::story_metadata::QaMode::Human => {
                            slog!(
                                "[pipeline] Coder '{agent_name}' passed gates for '{story_id}'. \
                                 qa: human — holding for human review."
                            );
                            if let Err(e) =
                                crate::agents::lifecycle::move_story_to_qa(&project_root, story_id)
                            {
                                slog_error!("[pipeline] Failed to move '{story_id}' to 3_qa/: {e}");
                            } else {
                                // Park in 3_qa/ and flag the story for a human reviewer.
                                write_review_hold_to_store(story_id);
                            }
                        }
                    }
                } else {
                    // Increment retry count and check if blocked.
                    if let Some(reason) = should_block_story(story_id, config.max_retries, "coder")
                    {
                        // Story has exceeded retry limit — do not restart.
                        let _ = self.watcher_tx.send(WatcherEvent::StoryBlocked {
                            story_id: story_id.to_string(),
                            reason,
                        });
                    } else {
                        slog!(
                            "[pipeline] Coder '{agent_name}' failed gates for '{story_id}'. Restarting."
                        );
                        // Feed the gate output back to the coder so the retry
                        // can target the concrete failures.
                        let context = format!(
                            "\n\n---\n## Previous Attempt Failed\n\
                             The acceptance gates failed with the following output:\n{}\n\n\
                             Please review the failures above, fix the issues, and try again.",
                            completion.gate_output
                        );
                        if let Err(e) = self
                            .start_agent(
                                &project_root,
                                story_id,
                                Some(agent_name),
                                Some(&context),
                                previous_session_id,
                            )
                            .await
                        {
                            slog_error!(
                                "[pipeline] Failed to restart coder '{agent_name}' for '{story_id}': {e}"
                            );
                        }
                    }
                }
            }
            PipelineStage::Qa => {
                if completion.gates_passed {
                    // Run coverage gate in the QA worktree before advancing to merge.
                    let coverage_path = worktree_path
                        .clone()
                        .unwrap_or_else(|| project_root.clone());
                    let cp = coverage_path.clone();
                    // The coverage gate is blocking work; keep it off the async runtime.
                    let coverage_result = tokio::task::spawn_blocking(move || {
                        crate::agents::gates::run_coverage_gate(&cp)
                    })
                    .await
                    .unwrap_or_else(|e| {
                        // A panicked gate task is treated as a failed gate, not a crash.
                        slog_warn!("[pipeline] Coverage gate task panicked: {e}");
                        Ok((false, format!("Coverage gate task panicked: {e}")))
                    });
                    let (coverage_passed, coverage_output) = match coverage_result {
                        Ok(pair) => pair,
                        Err(e) => (false, e),
                    };
                    if coverage_passed {
                        // Check whether this item needs human review before merging.
                        let needs_human_review = {
                            let item_type = crate::agents::lifecycle::item_type_from_id(story_id);
                            if item_type == "spike" {
                                true // Spikes always need human review.
                            } else {
                                let default_qa = config.default_qa_mode();
                                matches!(
                                    resolve_qa_mode_from_store(&project_root, story_id, default_qa),
                                    crate::io::story_metadata::QaMode::Human
                                )
                            }
                        };
                        if needs_human_review {
                            // Hold in 3_qa/ for human review.
                            write_review_hold_to_store(story_id);
                            slog!(
                                "[pipeline] QA passed for '{story_id}'. \
                                 Holding for human review. \
                                 Worktree preserved at: {worktree_path:?}"
                            );
                        } else {
                            slog!(
                                "[pipeline] QA passed gates and coverage for '{story_id}'. \
                                 Moving directly to merge."
                            );
                            if let Err(e) = crate::agents::lifecycle::move_story_to_merge(
                                &project_root,
                                story_id,
                            ) {
                                slog_error!(
                                    "[pipeline] Failed to move '{story_id}' to 4_merge/: {e}"
                                );
                            } else {
                                self.start_mergemaster_or_block(&project_root, story_id)
                                    .await;
                            }
                        }
                    } else if let Some(reason) =
                        should_block_story(story_id, config.max_retries, "qa-coverage")
                    {
                        // Story has exceeded retry limit — do not restart.
                        let _ = self.watcher_tx.send(WatcherEvent::StoryBlocked {
                            story_id: story_id.to_string(),
                            reason,
                        });
                    } else {
                        slog!(
                            "[pipeline] QA coverage gate failed for '{story_id}'. Restarting QA."
                        );
                        let context = format!(
                            "\n\n---\n## Coverage Gate Failed\n\
                             The coverage gate (script/test_coverage) failed with the following output:\n{}\n\n\
                             Please improve test coverage until the coverage gate passes.",
                            coverage_output
                        );
                        if let Err(e) = self
                            .start_agent(&project_root, story_id, Some("qa"), Some(&context), None)
                            .await
                        {
                            slog_error!("[pipeline] Failed to restart qa for '{story_id}': {e}");
                        }
                    }
                } else if let Some(reason) = should_block_story(story_id, config.max_retries, "qa")
                {
                    // Story has exceeded retry limit — do not restart.
                    let _ = self.watcher_tx.send(WatcherEvent::StoryBlocked {
                        story_id: story_id.to_string(),
                        reason,
                    });
                } else {
                    slog!("[pipeline] QA failed gates for '{story_id}'. Restarting.");
                    let context = format!(
                        "\n\n---\n## Previous QA Attempt Failed\n\
                         The acceptance gates failed with the following output:\n{}\n\n\
                         Please re-run and fix the issues.",
                        completion.gate_output
                    );
                    if let Err(e) = self
                        .start_agent(&project_root, story_id, Some("qa"), Some(&context), None)
                        .await
                    {
                        slog_error!("[pipeline] Failed to restart qa for '{story_id}': {e}");
                    }
                }
            }
            PipelineStage::Mergemaster => {
                // Bug 529: Guard against stale mergemaster advances. If the story
                // has already reached done or archived (e.g. a previous mergemaster
                // succeeded), this advance is a zombie — skip it entirely to avoid
                // phantom notifications and redundant post-merge test runs.
                if let Ok(Some(typed_item)) = crate::pipeline_state::read_typed(story_id) {
                    let current_dir = typed_item.stage.dir_name();
                    if current_dir == "5_done" || current_dir == "6_archived" {
                        slog!(
                            "[pipeline] Skipping stale mergemaster advance for '{story_id}': \
                             story is already in work/{current_dir}/"
                        );
                        // Skip pipeline advancement — do not run post-merge tests,
                        // do not emit notifications, do not restart agents.
                        return;
                    }
                }
                // Block advancement if the mergemaster explicitly reported a failure.
                // The server-owned gate check runs in the feature-branch worktree (not
                // master), so `gates_passed=true` is misleading when no code was merged.
                if merge_failure_reported {
                    slog!(
                        "[pipeline] Pipeline advancement blocked for '{story_id}': \
                         mergemaster explicitly reported a merge failure. \
                         Story stays in 4_merge/ for human review."
                    );
                } else {
                    // Run script/test on master (project_root) as the post-merge verification.
                    slog!(
                        "[pipeline] Mergemaster completed for '{story_id}'. Running post-merge tests on master."
                    );
                    let root = project_root.clone();
                    let test_result = tokio::task::spawn_blocking(move || {
                        crate::agents::gates::run_project_tests(&root)
                    })
                    .await
                    .unwrap_or_else(|e| {
                        // As with the coverage gate: a panicked task counts as a failure.
                        slog_warn!("[pipeline] Post-merge test task panicked: {e}");
                        Ok((false, format!("Test task panicked: {e}")))
                    });
                    let (passed, output) = match test_result {
                        Ok(pair) => pair,
                        Err(e) => (false, e),
                    };
                    if passed {
                        slog!(
                            "[pipeline] Post-merge tests passed for '{story_id}'. Moving to done."
                        );
                        // A failed move is logged but does not stop agent cleanup below.
                        if let Err(e) =
                            crate::agents::lifecycle::move_story_to_done(&project_root, story_id)
                        {
                            slog_error!("[pipeline] Failed to move '{story_id}' to done: {e}");
                        }
                        self.remove_agents_for_story(story_id);
                        // TODO: Re-enable worktree cleanup once we have persistent agent logs.
                        // Removing worktrees destroys evidence needed to debug empty-commit agents.
                        // let config =
                        //     crate::config::ProjectConfig::load(&project_root).unwrap_or_default();
                        // if let Err(e) =
                        //     worktree::remove_worktree_by_story_id(&project_root, story_id, &config)
                        //         .await
                        // {
                        //     slog!(
                        //         "[pipeline] Failed to remove worktree for '{story_id}': {e}"
                        //     );
                        // }
                        slog!(
                            "[pipeline] Story '{story_id}' done. Worktree preserved for inspection."
                        );
                    } else if let Some(reason) =
                        should_block_story(story_id, config.max_retries, "mergemaster")
                    {
                        // Story has exceeded retry limit — do not restart.
                        let _ = self.watcher_tx.send(WatcherEvent::StoryBlocked {
                            story_id: story_id.to_string(),
                            reason,
                        });
                    } else {
                        slog!(
                            "[pipeline] Post-merge tests failed for '{story_id}'. Restarting mergemaster."
                        );
                        let context = format!(
                            "\n\n---\n## Post-Merge Test Failed\n\
                             The tests on master failed with the following output:\n{}\n\n\
                             Please investigate and resolve the failures, then call merge_agent_work again.",
                            output
                        );
                        if let Err(e) = self
                            .start_agent(
                                &project_root,
                                story_id,
                                Some("mergemaster"),
                                Some(&context),
                                None,
                            )
                            .await
                        {
                            slog_error!(
                                "[pipeline] Failed to restart mergemaster for '{story_id}': {e}"
                            );
                        }
                    }
                }
            }
        }
        // Always scan for unassigned work after any agent completes, regardless
        // of the outcome (success, failure, restart). This ensures stories that
        // failed agent assignment due to busy agents are retried when agents
        // become available (bug 295).
        self.auto_assign_available_work(&project_root).await;
    }
    /// Start the mergemaster agent for `story_id`, but only if the feature
    /// branch has commits that are not yet on master.
    ///
    /// If the branch has zero commits ahead of master, this logs an error and
    /// sends a [`WatcherEvent::StoryBlocked`] instead of spawning a Claude
    /// session. A no-op merge session was observed spending $0.82 in the
    /// 2026-04-09 incident (story 519).
    async fn start_mergemaster_or_block(&self, project_root: &Path, story_id: &str) {
        // NOTE(review): `branch` is only used in the log/reason text here; the
        // actual check delegates to `feature_branch_has_unmerged_changes`.
        // The `feature/story-<id>` naming presumably matches branch creation
        // elsewhere — confirm against the worktree code.
        let branch = format!("feature/story-{story_id}");
        if !crate::agents::lifecycle::feature_branch_has_unmerged_changes(project_root, story_id) {
            slog_error!(
                "[mergemaster] Branch '{branch}' has no commits ahead of master — \
                 refusing to spawn merge session. \
                 Likely cause: the worktree was reset to master after the feature \
                 branch's commits were created. Investigate the worktree's git state \
                 before retrying. Story '{story_id}' stays in 4_merge/ for human review."
            );
            let _ = self.watcher_tx.send(WatcherEvent::StoryBlocked {
                story_id: story_id.to_string(),
                reason: format!(
                    "Feature branch '{branch}' has no commits ahead of master — nothing to merge. \
                     The worktree may have been reset to master. \
                     Check the worktree's git state and retry manually."
                ),
            });
            return;
        }
        if let Err(e) = self
            .start_agent(project_root, story_id, Some("mergemaster"), None, None)
            .await
        {
            slog_error!("[pipeline] Failed to start mergemaster for '{story_id}': {e}");
        }
    }
}
/// Spawn pipeline advancement as a background task.
///
/// This is a **non-async** function so it does not participate in the opaque
/// type cycle between `start_agent` and `run_server_owned_completion`.
///
/// A lightweight [`AgentPool`] is rebuilt around the shared `agents` map and
/// `watcher_tx`; the killer/merge-job maps start empty because the advance
/// path never touches them.
#[allow(clippy::too_many_arguments)]
pub(super) fn spawn_pipeline_advance(
    agents: Arc<Mutex<HashMap<String, StoryAgent>>>,
    port: u16,
    story_id: &str,
    agent_name: &str,
    completion: CompletionReport,
    project_root: Option<PathBuf>,
    worktree_path: Option<PathBuf>,
    watcher_tx: broadcast::Sender<WatcherEvent>,
    merge_failure_reported: bool,
    previous_session_id: Option<String>,
) {
    // Assemble the pool up front; only owned data crosses into the task.
    let pool = AgentPool {
        agents,
        port,
        child_killers: Arc::new(Mutex::new(HashMap::new())),
        watcher_tx,
        merge_jobs: Arc::new(Mutex::new(HashMap::new())),
    };
    // Owned copies of the borrowed ids so the 'static task can hold them.
    let story_id = story_id.to_owned();
    let agent_name = agent_name.to_owned();
    tokio::spawn(async move {
        pool.run_pipeline_advance(
            &story_id,
            &agent_name,
            completion,
            project_root,
            worktree_path,
            merge_failure_reported,
            previous_session_id,
        )
        .await;
    });
}
/// Resolve QA mode from the content store.
///
/// Reads the story's stored content and lets the metadata layer pick the
/// mode; when the story has no stored content, `default` is returned as-is.
fn resolve_qa_mode_from_store(
    _project_root: &Path,
    story_id: &str,
    default: crate::io::story_metadata::QaMode,
) -> crate::io::story_metadata::QaMode {
    match crate::db::read_content(story_id) {
        Some(contents) => {
            crate::io::story_metadata::resolve_qa_mode_from_content(&contents, default)
        }
        None => default,
    }
}
/// Write review_hold to the content store.
///
/// Updates the story content with the review-hold marker and shadow-writes
/// the result to SQLite. Logs an error and does nothing when the story has
/// no stored content.
fn write_review_hold_to_store(story_id: &str) {
    let Some(contents) = crate::db::read_content(story_id) else {
        slog_error!("[pipeline] Cannot write review_hold for '{story_id}': no content in store");
        return;
    };
    let updated = crate::io::story_metadata::write_review_hold_in_content(&contents);
    crate::db::write_content(story_id, &updated);
    // Also persist to SQLite via shadow write. Fall back to 3_qa when the
    // pipeline state has no stage for this story.
    let stage = match crate::pipeline_state::read_typed(story_id) {
        Ok(Some(item)) => item.stage.dir_name().to_string(),
        _ => "3_qa".to_string(),
    };
    crate::db::write_item_with_content(story_id, &stage, &updated);
}
/// Increment retry_count and block the story if it exceeds `max_retries`.
///
/// Returns `Some(reason)` if the story is now blocked (caller should NOT restart the agent).
/// Returns `None` if the story may be retried.
/// When `max_retries` is 0, retry limits are disabled.
fn should_block_story(story_id: &str, max_retries: u32, stage_label: &str) -> Option<String> {
    use crate::io::story_metadata::{increment_retry_count_in_content, write_blocked_in_content};
    // A limit of zero disables retry accounting entirely.
    if max_retries == 0 {
        return None;
    }
    // Without stored content we cannot count retries; log and allow the retry.
    let Some(contents) = crate::db::read_content(story_id) else {
        slog_error!("[pipeline] Failed to read content for '{story_id}' to increment retry_count");
        return None;
    };
    // Bump the counter and persist it (content store + SQLite shadow write).
    let (updated, new_count) = increment_retry_count_in_content(&contents);
    crate::db::write_content(story_id, &updated);
    let stage = match crate::pipeline_state::read_typed(story_id) {
        Ok(Some(item)) => item.stage.dir_name().to_string(),
        _ => "2_current".to_string(),
    };
    crate::db::write_item_with_content(story_id, &stage, &updated);
    // Still under the limit: record the retry and let the caller restart.
    if new_count < max_retries {
        slog!(
            "[pipeline] Story '{story_id}' retry {new_count}/{max_retries} at {stage_label} stage."
        );
        return None;
    }
    // Limit reached: mark the story blocked and report why.
    slog_warn!(
        "[pipeline] Story '{story_id}' reached retry limit ({new_count}/{max_retries}) \
         at {stage_label} stage. Marking as blocked."
    );
    let blocked = write_blocked_in_content(&updated);
    crate::db::write_content(story_id, &blocked);
    crate::db::write_item_with_content(story_id, &stage, &blocked);
    Some(format!(
        "Retry limit exceeded ({new_count}/{max_retries}) at {stage_label} stage"
    ))
}
#[cfg(test)]
mod tests {
    //! Unit tests for `run_pipeline_advance` and the mergemaster pre-flight.
    //!
    //! NOTE(review): the content store (`crate::db`) appears to be
    //! process-global — tests deliberately use unique story IDs to avoid
    //! colliding with each other (see the per-test comments below).
    use super::super::super::AgentPool;
    use super::super::super::composite_key;
    use crate::agents::{AgentStatus, CompletionReport};
    use crate::io::watcher::WatcherEvent;
    // ── pipeline advance tests ────────────────────────────────────────────────
    #[tokio::test]
    async fn pipeline_advance_coder_gates_pass_server_qa_moves_to_merge() {
        use std::fs;
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        // Set up story in 2_current/ (no qa frontmatter → uses project default "server").
        // Use a unique high-numbered ID to avoid collision with the agent_qa test.
        let current = root.join(".huskies/work/2_current");
        fs::create_dir_all(&current).unwrap();
        fs::write(current.join("9908_story_server_qa.md"), "test").unwrap();
        crate::db::ensure_content_store();
        crate::db::write_content("9908_story_server_qa", "test");
        let pool = AgentPool::new_test(3001);
        pool.run_pipeline_advance(
            "9908_story_server_qa",
            "coder-1",
            CompletionReport {
                summary: "done".to_string(),
                gates_passed: true,
                gate_output: String::new(),
            },
            Some(root.to_path_buf()),
            None,
            false,
            None,
        )
        .await;
        // With default qa: server, story skips QA and goes straight to 4_merge/
        // Lifecycle moves now update the content store, not the filesystem.
        assert!(
            crate::db::read_content("9908_story_server_qa").is_some(),
            "story should still exist in content store after move to merge"
        );
    }
    #[tokio::test]
    async fn pipeline_advance_coder_gates_pass_agent_qa_moves_to_qa() {
        use std::fs;
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        // Set up story in 2_current/ with qa: agent frontmatter.
        // Use a unique high-numbered ID to avoid collision with the server_qa test.
        let current = root.join(".huskies/work/2_current");
        fs::create_dir_all(&current).unwrap();
        fs::write(
            current.join("9909_story_agent_qa.md"),
            "---\nname: Test\nqa: agent\n---\ntest",
        )
        .unwrap();
        crate::db::ensure_content_store();
        crate::db::write_content(
            "9909_story_agent_qa",
            "---\nname: Test\nqa: agent\n---\ntest",
        );
        let pool = AgentPool::new_test(3001);
        pool.run_pipeline_advance(
            "9909_story_agent_qa",
            "coder-1",
            CompletionReport {
                summary: "done".to_string(),
                gates_passed: true,
                gate_output: String::new(),
            },
            Some(root.to_path_buf()),
            None,
            false,
            None,
        )
        .await;
        // With qa: agent, story should move to 3_qa/
        // Lifecycle moves now update the content store, not the filesystem.
        assert!(
            crate::db::read_content("9909_story_agent_qa").is_some(),
            "story should still exist in content store after move to qa"
        );
    }
    #[tokio::test]
    async fn pipeline_advance_qa_gates_pass_moves_story_to_merge() {
        use std::fs;
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        // Set up story in 3_qa/
        let qa_dir = root.join(".huskies/work/3_qa");
        fs::create_dir_all(&qa_dir).unwrap();
        // qa: server so the story skips human review and goes straight to merge.
        fs::write(
            qa_dir.join("51_story_test.md"),
            "---\nname: Test\nqa: server\n---\ntest",
        )
        .unwrap();
        crate::db::ensure_content_store();
        crate::db::write_content("51_story_test", "---\nname: Test\nqa: server\n---\ntest");
        let pool = AgentPool::new_test(3001);
        pool.run_pipeline_advance(
            "51_story_test",
            "qa",
            CompletionReport {
                summary: "QA done".to_string(),
                gates_passed: true,
                gate_output: String::new(),
            },
            Some(root.to_path_buf()),
            None,
            false,
            None,
        )
        .await;
        // Story should have moved to 4_merge/
        // Lifecycle moves now update the content store, not the filesystem.
        assert!(
            crate::db::read_content("51_story_test").is_some(),
            "story should still exist in content store after move to merge"
        );
    }
    #[tokio::test]
    async fn pipeline_advance_supervisor_does_not_advance() {
        use std::fs;
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        let current = root.join(".huskies/work/2_current");
        fs::create_dir_all(&current).unwrap();
        fs::write(current.join("52_story_test.md"), "test").unwrap();
        let pool = AgentPool::new_test(3001);
        pool.run_pipeline_advance(
            "52_story_test",
            "supervisor",
            CompletionReport {
                summary: "supervised".to_string(),
                gates_passed: true,
                gate_output: String::new(),
            },
            Some(root.to_path_buf()),
            None,
            false,
            None,
        )
        .await;
        // Story should NOT have moved (supervisors don't advance pipeline)
        assert!(
            current.join("52_story_test.md").exists(),
            "story should still be in 2_current/ for supervisor"
        );
    }
    #[tokio::test]
    async fn pipeline_advance_sends_agent_state_changed_to_watcher_tx() {
        use std::fs;
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        // Seed story via CRDT (the only source of truth).
        crate::db::ensure_content_store();
        crate::db::write_item_with_content("173_story_test", "2_current", "---\nname: test\n---\n");
        // Write a project.toml with a qa agent so start_agent can resolve it.
        // NOTE: the raw-string TOML below is intentionally unindented — its
        // whitespace is part of the fixture content.
        fs::create_dir_all(root.join(".huskies")).unwrap();
        fs::write(
            root.join(".huskies/project.toml"),
            r#"
default_qa = "agent"
[[agent]]
name = "coder-1"
role = "Coder"
command = "echo"
args = ["noop"]
prompt = "test"
stage = "coder"
[[agent]]
name = "qa"
role = "QA"
command = "echo"
args = ["noop"]
prompt = "test"
stage = "qa"
"#,
        )
        .unwrap();
        let pool = AgentPool::new_test(3001);
        // Subscribe to the watcher channel BEFORE the pipeline advance.
        let mut rx = pool.watcher_tx.subscribe();
        pool.run_pipeline_advance(
            "173_story_test",
            "coder-1",
            CompletionReport {
                summary: "done".to_string(),
                gates_passed: true,
                gate_output: String::new(),
            },
            Some(root.to_path_buf()),
            None,
            false,
            None,
        )
        .await;
        // The pipeline advance should have sent AgentStateChanged events via
        // the pool's watcher_tx (not a dummy channel). Collect all events.
        let mut got_agent_state_changed = false;
        while let Ok(evt) = rx.try_recv() {
            if matches!(evt, WatcherEvent::AgentStateChanged) {
                got_agent_state_changed = true;
                break;
            }
        }
        assert!(
            got_agent_state_changed,
            "pipeline advance should send AgentStateChanged through the real watcher_tx \
             (bug 173: lozenges must update when agents are assigned during pipeline advance)"
        );
    }
    // ── story 519: mergemaster pre-flight blocks when no commits ahead ──
    /// Regression test for story 519: when the feature branch has zero commits
    /// ahead of master, mergemaster must not spawn a Claude session. A no-op
    /// session spent $0.82 in the 2026-04-09 incident because the worktree was
    /// reset to master before mergemaster ran.
    #[tokio::test]
    async fn mergemaster_blocks_and_sends_story_blocked_when_no_commits_ahead() {
        use std::fs;
        use std::process::Command;
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        // Init a bare git repo on master with one empty commit.
        Command::new("git")
            .args(["init"])
            .current_dir(root)
            .output()
            .unwrap();
        Command::new("git")
            .args(["config", "user.email", "test@test.com"])
            .current_dir(root)
            .output()
            .unwrap();
        Command::new("git")
            .args(["config", "user.name", "Test"])
            .current_dir(root)
            .output()
            .unwrap();
        Command::new("git")
            .args(["commit", "--allow-empty", "-m", "init"])
            .current_dir(root)
            .output()
            .unwrap();
        // Create a feature branch that points at master HEAD (zero commits ahead).
        // This replicates the incident where the worktree was reset to master.
        Command::new("git")
            .args(["branch", "feature/story-9919_story_no_commits"])
            .current_dir(root)
            .output()
            .unwrap();
        // Set up pipeline dirs and story file.
        let current = root.join(".huskies/work/2_current");
        fs::create_dir_all(&current).unwrap();
        fs::create_dir_all(root.join(".huskies/work/4_merge")).unwrap();
        fs::write(
            current.join("9919_story_no_commits.md"),
            "---\nname: Test\n---\n",
        )
        .unwrap();
        crate::db::ensure_content_store();
        crate::db::write_content("9919_story_no_commits", "---\nname: Test\n---\n");
        let pool = AgentPool::new_test(3001);
        let mut rx = pool.watcher_tx.subscribe();
        // Simulate coder completing with gates passed (qa: server → goes to merge).
        pool.run_pipeline_advance(
            "9919_story_no_commits",
            "coder-1",
            CompletionReport {
                summary: "done".to_string(),
                gates_passed: true,
                gate_output: String::new(),
            },
            Some(root.to_path_buf()),
            None,
            false,
            None,
        )
        .await;
        // Story should still exist in the content store after moving to merge.
        assert!(
            crate::db::read_content("9919_story_no_commits").is_some(),
            "story should remain in content store — not removed"
        );
        // A StoryBlocked event must have been emitted (triggers chat failure notice,
        // not the success 🎉 emoji).
        let mut got_blocked = false;
        while let Ok(evt) = rx.try_recv() {
            if let WatcherEvent::StoryBlocked { story_id, .. } = &evt
                && story_id == "9919_story_no_commits"
            {
                got_blocked = true;
                break;
            }
        }
        assert!(
            got_blocked,
            "StoryBlocked event must be sent when feature branch has no commits ahead of master"
        );
        // No mergemaster agent should have been started.
        let agents = pool.agents.lock().unwrap();
        let mergemaster_started = agents
            .values()
            .any(|a| a.agent_name.contains("mergemaster"));
        assert!(
            !mergemaster_started,
            "mergemaster agent must NOT be started when no commits ahead of master"
        );
    }
    // ── bug 295: pipeline advance picks up waiting QA stories ──────────
    #[tokio::test]
    async fn pipeline_advance_picks_up_waiting_qa_stories_after_completion() {
        use super::super::super::auto_assign::is_agent_free;
        use std::fs;
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        let sk = root.join(".huskies");
        fs::create_dir_all(&sk).unwrap();
        // Configure a single QA agent.
        fs::write(
            sk.join("project.toml"),
            r#"
[[agent]]
name = "qa"
stage = "qa"
"#,
        )
        .unwrap();
        // Seed stories via CRDT (the only source of truth).
        crate::db::ensure_content_store();
        // Story 292 is in QA with QA agent running (will "complete" via
        // run_pipeline_advance below). Story 293 is in QA with NO agent —
        // simulating the "stuck" state from bug 295.
        crate::db::write_item_with_content(
            "292_story_first",
            "3_qa",
            "---\nname: First\nqa: human\n---\n",
        );
        crate::db::write_item_with_content(
            "293_story_second",
            "3_qa",
            "---\nname: Second\nqa: human\n---\n",
        );
        let pool = AgentPool::new_test(3001);
        // QA is currently running on story 292.
        pool.inject_test_agent("292_story_first", "qa", AgentStatus::Running);
        // Verify that 293 cannot get a QA agent right now (QA is busy).
        {
            let agents = pool.agents.lock().unwrap();
            assert!(
                !is_agent_free(&agents, "qa"),
                "qa should be busy on story 292"
            );
        }
        // Simulate QA completing on story 292: remove the agent from the pool
        // (as run_server_owned_completion does) then run pipeline advance.
        {
            let mut agents = pool.agents.lock().unwrap();
            agents.remove(&composite_key("292_story_first", "qa"));
        }
        pool.run_pipeline_advance(
            "292_story_first",
            "qa",
            CompletionReport {
                summary: "QA done".to_string(),
                gates_passed: true,
                gate_output: String::new(),
            },
            Some(root.to_path_buf()),
            None,
            false,
            None,
        )
        .await;
        // After pipeline advance, auto_assign should have started QA on story 293.
        let agents = pool.agents.lock().unwrap();
        let qa_on_293 = agents.values().any(|a| {
            a.agent_name == "qa" && matches!(a.status, AgentStatus::Pending | AgentStatus::Running)
        });
        assert!(
            qa_on_293,
            "auto_assign should have started qa for story 293 after 292's QA completed, \
             but no qa agent is pending/running. Pool: {:?}",
            agents
                .iter()
                .map(|(k, a)| format!("{k}: {} ({})", a.agent_name, a.status))
                .collect::<Vec<_>>()
        );
    }
    // ── bug 529: stale mergemaster advance for a done story is a no-op ──
    /// Regression test for bug 529: when a stale mergemaster advance fires
    /// after the story has already reached 5_done, the advance must be a
    /// no-op — no post-merge tests, no notifications, no agent restarts.
    #[tokio::test]
    async fn stale_mergemaster_advance_for_done_story_is_noop() {
        use std::fs;
        use std::process::Command;
        // Initialise CRDT so read_typed works.
        crate::crdt_state::init_for_test();
        crate::db::ensure_content_store();
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        // Init a git repo so post-merge tests would pass if they ran.
        Command::new("git")
            .args(["init"])
            .current_dir(root)
            .output()
            .unwrap();
        Command::new("git")
            .args(["config", "user.email", "test@test.com"])
            .current_dir(root)
            .output()
            .unwrap();
        Command::new("git")
            .args(["config", "user.name", "Test"])
            .current_dir(root)
            .output()
            .unwrap();
        Command::new("git")
            .args(["commit", "--allow-empty", "-m", "init"])
            .current_dir(root)
            .output()
            .unwrap();
        // Set up pipeline dirs.
        fs::create_dir_all(root.join(".huskies/work/5_done")).unwrap();
        // Seed the story in 5_done via the DB, which also writes to the CRDT.
        let story_id = "9929_story_zombie_merge";
        let content = "---\nname: Zombie Merge Test\n---\n";
        crate::db::write_content(story_id, content);
        crate::db::write_item_with_content(story_id, "5_done", content);
        let pool = AgentPool::new_test(3001);
        let mut rx = pool.watcher_tx.subscribe();
        // Simulate a stale mergemaster advance firing for the already-done story.
        pool.run_pipeline_advance(
            story_id,
            "mergemaster",
            CompletionReport {
                summary: "stale advance".to_string(),
                gates_passed: true,
                gate_output: String::new(),
            },
            Some(root.to_path_buf()),
            None,
            false,
            None,
        )
        .await;
        // No agents should have been started.
        let agents = pool.agents.lock().unwrap();
        assert!(
            agents.is_empty(),
            "No agents should be started for a stale advance on a done story. \
             Pool: {:?}",
            agents.keys().collect::<Vec<_>>()
        );
        drop(agents);
        // No StoryBlocked or other events should have been emitted.
        let mut got_event = false;
        while let Ok(evt) = rx.try_recv() {
            // AgentStateChanged from auto_assign is acceptable only if the
            // advance didn't short-circuit. Since we return early, no events.
            if matches!(evt, WatcherEvent::StoryBlocked { .. }) {
                got_event = true;
            }
        }
        assert!(
            !got_event,
            "No StoryBlocked event should be emitted for a stale advance"
        );
        // The story should still be in 5_done (not moved elsewhere).
        if let Ok(Some(item)) = crate::pipeline_state::read_typed(story_id) {
            assert_eq!(
                item.stage.dir_name(),
                "5_done",
                "Story should remain in 5_done after stale mergemaster advance"
            );
        }
    }
}