// huskies/server/src/agents/pool/pipeline/advance/mod.rs
//! Pipeline advance — moves stories forward through pipeline stages after agent completion.
#![allow(unused_imports, dead_code)]
use crate::config::ProjectConfig;
use crate::io::watcher::WatcherEvent;
use crate::slog;
use crate::slog_error;
use crate::slog_warn;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use tokio::sync::broadcast;
use super::super::super::{CompletionReport, PipelineStage, agent_config_stage, pipeline_stage};
use super::super::{AgentPool, StoryAgent};
/// Maximum number of bytes of gate output to include in the failure context
/// injected into the resumed session. Keeps the injected message focused —
/// the tail of the output (where errors appear) is always preserved.
const MAX_GATE_OUTPUT_BYTES: usize = 8_000;

/// Truncate gate output to [`MAX_GATE_OUTPUT_BYTES`], keeping the **tail**
/// (where compiler errors and test failures are reported).
fn truncate_gate_output(output: &str) -> &str {
    if output.len() <= MAX_GATE_OUTPUT_BYTES {
        return output;
    }
    // Slice from the first char boundary at or after the byte cut point so
    // the result stays valid UTF-8. `output.len()` is itself always a char
    // boundary, so the search cannot fail.
    let cut = output.len() - MAX_GATE_OUTPUT_BYTES;
    let boundary = (cut..=output.len())
        .find(|&i| output.is_char_boundary(i))
        .expect("string length is always a char boundary");
    &output[boundary..]
}
impl AgentPool {
    /// Pipeline advancement: after an agent completes, move the story to
    /// the next pipeline stage and start the appropriate agent.
    ///
    /// Stage routing (stage is taken from the agent's config entry, falling
    /// back to name-based lookup):
    /// * `Coder` — gates passed → QA or merge depending on QA mode; gates
    ///   failed → salvage committed work when passing-test evidence exists,
    ///   otherwise retry with injected failure context or block the story.
    /// * `Qa` — gates passed → coverage gate, then merge or human hold;
    ///   gates failed → retry or block.
    /// * `Mergemaster` — run post-merge tests on master → done, or retry/block.
    /// * `Other` — supervisors/unknown agents; no advancement.
    ///
    /// Advancement is suppressed entirely when `project_root` is `None` or
    /// the story is frozen. Regardless of outcome, finishes by scanning for
    /// unassigned work (bug 295).
    #[allow(clippy::too_many_arguments)]
    pub(super) async fn run_pipeline_advance(
        &self,
        story_id: &str,
        agent_name: &str,
        completion: CompletionReport,
        project_root: Option<PathBuf>,
        worktree_path: Option<PathBuf>,
        merge_failure_reported: bool,
        previous_session_id: Option<String>,
    ) {
        // Without a project root there is no config or git context to act on.
        let project_root = match project_root {
            Some(p) => p,
            None => {
                slog_warn!("[pipeline] No project_root for '{story_id}:{agent_name}'");
                return;
            }
        };
        let config = ProjectConfig::load(&project_root).unwrap_or_default();
        // Prefer the stage declared in the agent's config entry; fall back to
        // inferring it from the agent's name.
        let stage = config
            .find_agent(agent_name)
            .map(agent_config_stage)
            .unwrap_or_else(|| pipeline_stage(agent_name));
        // If the story is frozen, do not advance the pipeline. The agent's work
        // is done but the story stays at its current stage.
        if crate::io::story_metadata::is_story_frozen_in_store(story_id) {
            slog!("[pipeline] Story '{story_id}' is frozen; pipeline advancement suppressed.");
            return;
        }
        match stage {
            PipelineStage::Other => {
                // Supervisors and unknown agents do not advance the pipeline.
            }
            PipelineStage::Coder => {
                if completion.gates_passed {
                    // Determine effective QA mode for this story. Spikes are
                    // always human-reviewed; other item types resolve their
                    // per-story QA mode from the store with the project-wide
                    // default as fallback.
                    let qa_mode = {
                        let item_type = crate::agents::lifecycle::item_type_from_id(story_id);
                        if item_type == "spike" {
                            crate::io::story_metadata::QaMode::Human
                        } else {
                            let default_qa = config.default_qa_mode();
                            resolve_qa_mode_from_store(&project_root, story_id, default_qa)
                        }
                    };
                    match qa_mode {
                        crate::io::story_metadata::QaMode::Server => {
                            slog!(
                                "[pipeline] Coder '{agent_name}' passed gates for '{story_id}'. \
                                qa: server — moving directly to merge."
                            );
                            if let Err(e) = crate::agents::lifecycle::move_story_to_merge(story_id)
                            {
                                slog_error!(
                                    "[pipeline] Failed to move '{story_id}' to 4_merge/: {e}"
                                );
                            } else {
                                self.trigger_server_side_merge(&project_root, story_id);
                            }
                        }
                        crate::io::story_metadata::QaMode::Agent => {
                            slog!(
                                "[pipeline] Coder '{agent_name}' passed gates for '{story_id}'. \
                                qa: agent — moving to QA."
                            );
                            if let Err(e) = crate::agents::lifecycle::move_story_to_qa(story_id) {
                                slog_error!("[pipeline] Failed to move '{story_id}' to 3_qa/: {e}");
                            } else if let Err(e) = self
                                .start_agent(&project_root, story_id, Some("qa"), None, None)
                                .await
                            {
                                slog_error!(
                                    "[pipeline] Failed to start qa agent for '{story_id}': {e}"
                                );
                            }
                        }
                        crate::io::story_metadata::QaMode::Human => {
                            slog!(
                                "[pipeline] Coder '{agent_name}' passed gates for '{story_id}'. \
                                qa: human — holding for human review."
                            );
                            if let Err(e) = crate::agents::lifecycle::move_story_to_qa(story_id) {
                                slog_error!("[pipeline] Failed to move '{story_id}' to 3_qa/: {e}");
                            } else {
                                write_review_hold_to_store(story_id);
                            }
                        }
                    }
                } else {
                    // Bug 645 / 668: Before retry/block, check if the agent left committed
                    // work AND the agent had a passing run_tests result captured during its
                    // session. An agent may crash mid-output (e.g. Claude Code CLI PTY write
                    // assertion) after having already committed valid code and run tests.
                    // We require positive test evidence (not just cargo check) so that only
                    // stories with genuinely passing test suites are salvaged.
                    //
                    // The `run_tests` MCP tool writes `{story_id}:run_tests_ok` to the DB
                    // whenever script/test exits 0 inside a story worktree. Consume the
                    // evidence here so it does not persist to the next agent session.
                    let has_test_evidence =
                        crate::db::read_content(&format!("{story_id}:run_tests_ok")).is_some();
                    crate::db::delete_content(&format!("{story_id}:run_tests_ok"));
                    // Salvage requires BOTH the test evidence and committed
                    // work present in the worktree.
                    let work_survived = has_test_evidence
                        && worktree_path.as_ref().is_some_and(|wt_path| {
                            crate::agents::gates::worktree_has_committed_work(wt_path)
                        });
                    if work_survived {
                        slog!(
                            "[pipeline] Coder '{agent_name}' failed gates for '{story_id}' but \
                            committed work survives with captured passing tests. Advancing to QA \
                            instead of retrying (bug 645)."
                        );
                        // Same QA-mode resolution as the gates-passed path above.
                        let qa_mode = {
                            let item_type = crate::agents::lifecycle::item_type_from_id(story_id);
                            if item_type == "spike" {
                                crate::io::story_metadata::QaMode::Human
                            } else {
                                let default_qa = config.default_qa_mode();
                                resolve_qa_mode_from_store(&project_root, story_id, default_qa)
                            }
                        };
                        match qa_mode {
                            crate::io::story_metadata::QaMode::Server => {
                                if let Err(e) =
                                    crate::agents::lifecycle::move_story_to_merge(story_id)
                                {
                                    slog_error!(
                                        "[pipeline] Failed to move '{story_id}' to 4_merge/: {e}"
                                    );
                                } else {
                                    self.trigger_server_side_merge(&project_root, story_id);
                                }
                            }
                            crate::io::story_metadata::QaMode::Agent => {
                                if let Err(e) = crate::agents::lifecycle::move_story_to_qa(story_id)
                                {
                                    slog_error!(
                                        "[pipeline] Failed to move '{story_id}' to 3_qa/: {e}"
                                    );
                                } else if let Err(e) = self
                                    .start_agent(&project_root, story_id, Some("qa"), None, None)
                                    .await
                                {
                                    slog_error!(
                                        "[pipeline] Failed to start qa for '{story_id}': {e}"
                                    );
                                }
                            }
                            crate::io::story_metadata::QaMode::Human => {
                                if let Err(e) = crate::agents::lifecycle::move_story_to_qa(story_id)
                                {
                                    slog_error!(
                                        "[pipeline] Failed to move '{story_id}' to 3_qa/: {e}"
                                    );
                                } else {
                                    write_review_hold_to_store(story_id);
                                }
                            }
                        }
                    } else {
                        // Persist gate_output so the retry spawn can inject it into
                        // --append-system-prompt (story 881).
                        crate::db::write_content(
                            &format!("{story_id}:gate_output"),
                            &completion.gate_output,
                        );
                        // Increment retry count and check if blocked.
                        if let Some(reason) =
                            should_block_story(story_id, config.max_retries, "coder")
                        {
                            // Story has exceeded retry limit — do not restart.
                            let _ = self.watcher_tx.send(WatcherEvent::StoryBlocked {
                                story_id: story_id.to_string(),
                                reason,
                            });
                        } else {
                            slog!(
                                "[pipeline] Coder '{agent_name}' failed gates for '{story_id}'. Restarting."
                            );
                            // Inject the (tail-truncated) gate output into the
                            // resumed session so the agent sees what failed.
                            let context = format!(
                                "\n\n---\n## Previous Attempt Failed\n\
                                The acceptance gates failed with the following output:\n{}\n\n\
                                Please review the failures above, fix the issues, and try again.",
                                truncate_gate_output(&completion.gate_output)
                            );
                            if let Err(e) = self
                                .start_agent(
                                    &project_root,
                                    story_id,
                                    Some(agent_name),
                                    Some(&context),
                                    previous_session_id,
                                )
                                .await
                            {
                                slog_error!(
                                    "[pipeline] Failed to restart coder '{agent_name}' for '{story_id}': {e}"
                                );
                            }
                        }
                    }
                }
            }
            PipelineStage::Qa => {
                if completion.gates_passed {
                    // Run coverage gate in the QA worktree before advancing to merge.
                    // Falls back to project_root when no worktree was recorded.
                    let coverage_path = worktree_path
                        .clone()
                        .unwrap_or_else(|| project_root.clone());
                    let cp = coverage_path.clone();
                    // The coverage gate runs a subprocess; keep it off the
                    // async runtime via spawn_blocking.
                    let coverage_result = tokio::task::spawn_blocking(move || {
                        crate::agents::gates::run_coverage_gate(&cp)
                    })
                    .await
                    .unwrap_or_else(|e| {
                        slog_warn!("[pipeline] Coverage gate task panicked: {e}");
                        Ok((false, format!("Coverage gate task panicked: {e}")))
                    });
                    let (coverage_passed, coverage_output) = match coverage_result {
                        Ok(pair) => pair,
                        Err(e) => (false, e),
                    };
                    if coverage_passed {
                        // Check whether this item needs human review before merging.
                        let needs_human_review = {
                            let item_type = crate::agents::lifecycle::item_type_from_id(story_id);
                            if item_type == "spike" {
                                true // Spikes always need human review.
                            } else {
                                let default_qa = config.default_qa_mode();
                                matches!(
                                    resolve_qa_mode_from_store(&project_root, story_id, default_qa),
                                    crate::io::story_metadata::QaMode::Human
                                )
                            }
                        };
                        if needs_human_review {
                            // Hold in 3_qa/ for human review.
                            write_review_hold_to_store(story_id);
                            slog!(
                                "[pipeline] QA passed for '{story_id}'. \
                                Holding for human review. \
                                Worktree preserved at: {worktree_path:?}"
                            );
                        } else {
                            slog!(
                                "[pipeline] QA passed gates and coverage for '{story_id}'. \
                                Moving directly to merge."
                            );
                            if let Err(e) = crate::agents::lifecycle::move_story_to_merge(story_id)
                            {
                                slog_error!(
                                    "[pipeline] Failed to move '{story_id}' to 4_merge/: {e}"
                                );
                            } else {
                                self.trigger_server_side_merge(&project_root, story_id);
                            }
                        }
                    } else if let Some(reason) =
                        should_block_story(story_id, config.max_retries, "qa-coverage")
                    {
                        // Story has exceeded retry limit — do not restart.
                        let _ = self.watcher_tx.send(WatcherEvent::StoryBlocked {
                            story_id: story_id.to_string(),
                            reason,
                        });
                    } else {
                        slog!(
                            "[pipeline] QA coverage gate failed for '{story_id}'. Restarting QA."
                        );
                        // NOTE(review): coverage_output is injected untruncated
                        // here, unlike the coder path which tail-truncates —
                        // presumably coverage output is small; confirm.
                        let context = format!(
                            "\n\n---\n## Coverage Gate Failed\n\
                            The coverage gate (script/test_coverage) failed with the following output:\n{}\n\n\
                            Please improve test coverage until the coverage gate passes.",
                            coverage_output
                        );
                        if let Err(e) = self
                            .start_agent(&project_root, story_id, Some("qa"), Some(&context), None)
                            .await
                        {
                            slog_error!("[pipeline] Failed to restart qa for '{story_id}': {e}");
                        }
                    }
                } else {
                    // Persist gate_output so the retry spawn can inject it into
                    // --append-system-prompt (story 881).
                    crate::db::write_content(
                        &format!("{story_id}:gate_output"),
                        &completion.gate_output,
                    );
                    if let Some(reason) = should_block_story(story_id, config.max_retries, "qa") {
                        // Story has exceeded retry limit — do not restart.
                        let _ = self.watcher_tx.send(WatcherEvent::StoryBlocked {
                            story_id: story_id.to_string(),
                            reason,
                        });
                    } else {
                        slog!("[pipeline] QA failed gates for '{story_id}'. Restarting.");
                        let context = format!(
                            "\n\n---\n## Previous QA Attempt Failed\n\
                            The acceptance gates failed with the following output:\n{}\n\n\
                            Please re-run and fix the issues.",
                            completion.gate_output
                        );
                        if let Err(e) = self
                            .start_agent(&project_root, story_id, Some("qa"), Some(&context), None)
                            .await
                        {
                            slog_error!("[pipeline] Failed to restart qa for '{story_id}': {e}");
                        }
                    }
                }
            }
            PipelineStage::Mergemaster => {
                // Bug 529: Guard against stale mergemaster advances. If the story
                // has already reached done or archived (e.g. a previous mergemaster
                // succeeded), this advance is a zombie — skip it entirely to avoid
                // phantom notifications and redundant post-merge test runs.
                if let Ok(Some(typed_item)) = crate::pipeline_state::read_typed(story_id)
                    && matches!(
                        typed_item.stage,
                        crate::pipeline_state::Stage::Done { .. }
                            | crate::pipeline_state::Stage::Archived { .. }
                    )
                {
                    let current_dir = typed_item.stage.dir_name();
                    slog!(
                        "[pipeline] Skipping stale mergemaster advance for '{story_id}': \
                        story is already in work/{current_dir}/"
                    );
                    // Skip pipeline advancement — do not run post-merge tests,
                    // do not emit notifications, do not restart agents.
                    return;
                }
                // Block advancement if the mergemaster explicitly reported a failure.
                // The server-owned gate check runs in the feature-branch worktree (not
                // master), so `gates_passed=true` is misleading when no code was merged.
                if merge_failure_reported {
                    slog!(
                        "[pipeline] Pipeline advancement blocked for '{story_id}': \
                        mergemaster explicitly reported a merge failure. \
                        Story stays in 4_merge/ for human review."
                    );
                } else {
                    // Run script/test on master (project_root) as the post-merge verification.
                    slog!(
                        "[pipeline] Mergemaster completed for '{story_id}'. Running post-merge tests on master."
                    );
                    let root = project_root.clone();
                    let test_result = tokio::task::spawn_blocking(move || {
                        crate::agents::gates::run_project_tests(&root)
                    })
                    .await
                    .unwrap_or_else(|e| {
                        slog_warn!("[pipeline] Post-merge test task panicked: {e}");
                        Ok((false, format!("Test task panicked: {e}")))
                    });
                    let (passed, output) = match test_result {
                        Ok(pair) => pair,
                        Err(e) => (false, e),
                    };
                    if passed {
                        slog!(
                            "[pipeline] Post-merge tests passed for '{story_id}'. Moving to done."
                        );
                        if let Err(e) = crate::agents::lifecycle::move_story_to_done(story_id) {
                            slog_error!("[pipeline] Failed to move '{story_id}' to done: {e}");
                        }
                        self.remove_agents_for_story(story_id);
                        // TODO: Re-enable worktree cleanup once we have persistent agent logs.
                        // Removing worktrees destroys evidence needed to debug empty-commit agents.
                        // let config =
                        //     crate::config::ProjectConfig::load(&project_root).unwrap_or_default();
                        // if let Err(e) =
                        //     worktree::remove_worktree_by_story_id(&project_root, story_id, &config)
                        //         .await
                        // {
                        //     slog!(
                        //         "[pipeline] Failed to remove worktree for '{story_id}': {e}"
                        //     );
                        // }
                        slog!(
                            "[pipeline] Story '{story_id}' done. Worktree preserved for inspection."
                        );
                    } else if let Some(reason) =
                        should_block_story(story_id, config.max_retries, "mergemaster")
                    {
                        // Story has exceeded retry limit — do not restart.
                        let _ = self.watcher_tx.send(WatcherEvent::StoryBlocked {
                            story_id: story_id.to_string(),
                            reason,
                        });
                    } else {
                        slog!(
                            "[pipeline] Post-merge tests failed for '{story_id}'. Restarting mergemaster."
                        );
                        let context = format!(
                            "\n\n---\n## Post-Merge Test Failed\n\
                            The tests on master failed with the following output:\n{}\n\n\
                            Please investigate and resolve the failures, then call merge_agent_work again.",
                            output
                        );
                        if let Err(e) = self
                            .start_agent(
                                &project_root,
                                story_id,
                                Some("mergemaster"),
                                Some(&context),
                                None,
                            )
                            .await
                        {
                            slog_error!(
                                "[pipeline] Failed to restart mergemaster for '{story_id}': {e}"
                            );
                        }
                    }
                }
            }
        }
        // Always scan for unassigned work after any agent completes, regardless
        // of the outcome (success, failure, restart). This ensures stories that
        // failed agent assignment due to busy agents are retried when agents
        // become available (bug 295).
        self.auto_assign_available_work(&project_root).await;
    }
}
/// Internal helpers for pipeline advancement: QA-mode resolution, review
/// holds, retry accounting, and `spawn_pipeline_advance`.
///
/// `spawn_pipeline_advance` is a **non-async** function so it does not
/// participate in the opaque type cycle between `start_agent` and
/// `run_server_owned_completion`.
mod helpers;
use helpers::{resolve_qa_mode_from_store, write_review_hold_to_store};
pub(crate) use helpers::{should_block_story, spawn_pipeline_advance};
#[cfg(test)]
mod tests;
#[cfg(test)]
mod tests_regression;