// File: huskies/server/src/agents/pool/pipeline/merge.rs
// (Rust · 545 lines · 19 KiB)

use crate::slog;
use crate::slog_error;
use crate::slog_warn;
use crate::worktree;
use std::path::Path;
use std::sync::Arc;
use super::super::super::PipelineStage;
use super::super::super::pipeline_stage;
use super::super::AgentPool;
impl AgentPool {
/// Start the merge pipeline as a background task.
///
/// Returns immediately so the MCP tool call doesn't time out (the full
/// pipeline — squash merge + quality gates — takes well over 60 seconds,
/// exceeding Claude Code's MCP tool-call timeout).
///
/// The mergemaster agent should poll [`get_merge_status`](Self::get_merge_status)
/// until the job reaches a terminal state.
pub fn start_merge_agent_work(
self: &Arc<Self>,
project_root: &Path,
story_id: &str,
) -> Result<(), String> {
// Guard against double-starts.
{
let jobs = self.merge_jobs.lock().map_err(|e| e.to_string())?;
if let Some(job) = jobs.get(story_id)
&& matches!(job.status, crate::agents::merge::MergeJobStatus::Running)
{
return Err(format!(
"Merge already in progress for '{story_id}'. \
Use get_merge_status to poll for completion."
));
}
}
// Insert Running job.
{
let mut jobs = self.merge_jobs.lock().map_err(|e| e.to_string())?;
jobs.insert(
story_id.to_string(),
crate::agents::merge::MergeJob {
story_id: story_id.to_string(),
status: crate::agents::merge::MergeJobStatus::Running,
},
);
}
let pool = Arc::clone(self);
let root = project_root.to_path_buf();
let sid = story_id.to_string();
tokio::spawn(async move {
let report = pool.run_merge_pipeline(&root, &sid).await;
let failed = report.is_err();
let status = match report {
Ok(r) => crate::agents::merge::MergeJobStatus::Completed(r),
Err(e) => crate::agents::merge::MergeJobStatus::Failed(e),
};
if let Ok(mut jobs) = pool.merge_jobs.lock()
&& let Some(job) = jobs.get_mut(&sid)
{
job.status = status;
}
if failed {
pool.auto_assign_available_work(&root).await;
}
});
Ok(())
}
/// Body of the background merge job for a single story.
///
/// Runs the squash merge of `feature/story-{story_id}` on the blocking
/// thread pool. If the merge reports failure, the report is returned with no
/// further side effects. Otherwise the story is archived (and, if that
/// worked, its agents are removed), the story's worktree is removed when it
/// exists on disk, and work auto-assignment is re-run.
///
/// # Errors
///
/// Returns `Err` when the blocking task cannot be joined or when the squash
/// merge itself returns an error.
async fn run_merge_pipeline(
    self: &Arc<Self>,
    project_root: &Path,
    story_id: &str,
) -> Result<crate::agents::merge::MergeReport, String> {
    use crate::agents::merge::{MergeReport, run_squash_merge};

    let worktree_dir = worktree::worktree_path(project_root, story_id);

    // `run_squash_merge` is a synchronous call, so it is moved onto the
    // blocking pool together with owned copies of its inputs.
    let squash_task = {
        let owned_root = project_root.to_path_buf();
        let owned_story = story_id.to_string();
        let feature_branch = format!("feature/story-{story_id}");
        tokio::task::spawn_blocking(move || {
            run_squash_merge(&owned_root, &feature_branch, &owned_story)
        })
    };
    let outcome = squash_task
        .await
        .map_err(|e| format!("Merge task panicked: {e}"))??;

    // Post-merge housekeeping only happens after a successful merge.
    let (story_archived, worktree_cleaned_up) = if outcome.success {
        let archived =
            crate::agents::lifecycle::move_story_to_archived(project_root, story_id).is_ok();
        if archived {
            self.remove_agents_for_story(story_id);
        }

        let cleaned = if worktree_dir.exists() {
            let config = crate::config::ProjectConfig::load(project_root).unwrap_or_default();
            let removal =
                worktree::remove_worktree_by_story_id(project_root, story_id, &config).await;
            removal.is_ok()
        } else {
            false
        };

        self.auto_assign_available_work(project_root).await;
        (archived, cleaned)
    } else {
        (false, false)
    };

    Ok(MergeReport {
        story_id: story_id.to_string(),
        success: outcome.success,
        had_conflicts: outcome.had_conflicts,
        conflicts_resolved: outcome.conflicts_resolved,
        conflict_details: outcome.conflict_details,
        // A successful merge is always reported with gates passed; a failed
        // one forwards whatever the squash merge recorded.
        gates_passed: outcome.success || outcome.gates_passed,
        gate_output: outcome.output,
        worktree_cleaned_up,
        story_archived,
    })
}
/// Look up the background merge job recorded for `story_id`.
///
/// Returns a clone of the stored job, or `None` when no job is recorded for
/// that story or the `merge_jobs` lock is poisoned.
pub fn get_merge_status(&self, story_id: &str) -> Option<crate::agents::merge::MergeJob> {
    let jobs = self.merge_jobs.lock().ok()?;
    jobs.get(story_id).cloned()
}
/// Flag that the mergemaster agent for `story_id` explicitly reported a merge
/// failure through the `report_merge_failure` MCP tool.
///
/// Sets `merge_failure_reported = true` on the first agent whose key belongs
/// to `story_id` and whose name maps to the mergemaster pipeline stage, so
/// that `run_pipeline_advance` can block advancement to `5_done/` even when
/// the server-owned gate check returns `gates_passed=true` (those gates run
/// in the feature-branch worktree, not on master).
///
/// Nothing is returned: a missing agent is logged as a warning and a
/// poisoned `agents` lock as an error.
pub fn set_merge_failure_reported(&self, story_id: &str) {
    let mut agents = match self.agents.lock() {
        Ok(guard) => guard,
        Err(e) => {
            slog_error!("[pipeline] set_merge_failure_reported: could not lock agents: {e}");
            return;
        }
    };

    let mergemaster = agents.iter_mut().find(|(key, agent)| {
        // The story id is everything before the LAST ':' in the key; a key
        // without ':' is compared as a whole.
        let owner = match key.rsplit_once(':') {
            Some((sid, _)) => sid,
            None => key.as_str(),
        };
        owner == story_id && pipeline_stage(&agent.agent_name) == PipelineStage::Mergemaster
    });

    if let Some((_, agent)) = mergemaster {
        agent.merge_failure_reported = true;
        slog!(
            "[pipeline] Merge failure flag set for '{story_id}:{}'",
            agent.agent_name
        );
    } else {
        slog_warn!(
            "[pipeline] set_merge_failure_reported: no running mergemaster found \
             for story '{story_id}' — flag not set"
        );
    }
}
}
#[cfg(test)]
mod tests {
use super::*;
use super::super::super::AgentPool;
use crate::agents::merge::{MergeJob, MergeJobStatus};
use std::process::Command;
/// Create a git repository in `repo` with a committer identity and one empty
/// initial commit on a branch named `master`.
///
/// The tests in this module run `git checkout master`, so the initial branch
/// name is pinned here instead of being inherited from the machine's
/// `init.defaultBranch` setting (which may be e.g. `main`).
///
/// # Panics
///
/// Panics if `git` cannot be spawned or if any step exits unsuccessfully, so
/// a broken fixture fails here rather than as a confusing assertion later.
fn init_git_repo(repo: &std::path::Path) {
    let steps: [&[&str]; 5] = [
        &["init"],
        // Re-point the still-unborn HEAD at `master` before the first commit.
        // Unlike `git init --initial-branch`, this also works on older gits.
        &["symbolic-ref", "HEAD", "refs/heads/master"],
        &["config", "user.email", "test@test.com"],
        &["config", "user.name", "Test"],
        &["commit", "--allow-empty", "-m", "init"],
    ];
    for args in steps {
        let output = Command::new("git")
            .args(args)
            .current_dir(repo)
            .output()
            .unwrap_or_else(|e| panic!("failed to spawn `git {}`: {e}", args.join(" ")));
        assert!(
            output.status.success(),
            "`git {}` failed: {}",
            args.join(" "),
            String::from_utf8_lossy(&output.stderr)
        );
    }
}
// ── merge_agent_work tests ────────────────────────────────────────────────
/// Test helper: start a background merge for `story_id` and wait for it to
/// leave the `Running` state.
///
/// Sleeps 50 ms before each check of `get_merge_status` and returns the first
/// job snapshot whose status is not `Running`. There is no timeout, so a job
/// that never finishes keeps this future pending forever.
///
/// Panics if `start_merge_agent_work` returns an error.
async fn run_merge_to_completion(
    pool: &Arc<AgentPool>,
    repo: &std::path::Path,
    story_id: &str,
) -> MergeJob {
    pool.start_merge_agent_work(repo, story_id).unwrap();

    let poll_interval = std::time::Duration::from_millis(50);
    loop {
        tokio::time::sleep(poll_interval).await;
        let finished = pool
            .get_merge_status(story_id)
            .filter(|job| !matches!(job.status, MergeJobStatus::Running));
        if let Some(job) = finished {
            return job;
        }
    }
}
#[tokio::test]
async fn merge_agent_work_returns_error_when_branch_not_found() {
use tempfile::tempdir;
let tmp = tempdir().unwrap();
let repo = tmp.path();
init_git_repo(repo);
let pool = Arc::new(AgentPool::new_test(3001));
let job = run_merge_to_completion(&pool, repo, "99_nonexistent").await;
match &job.status {
MergeJobStatus::Completed(report) => {
assert!(!report.success, "should fail when branch missing");
}
MergeJobStatus::Failed(_) => {
// Also acceptable — the pipeline errored out
}
MergeJobStatus::Running => {
panic!("should not still be running");
}
}
}
#[tokio::test]
async fn merge_agent_work_succeeds_on_clean_branch() {
use std::fs;
use tempfile::tempdir;
let tmp = tempdir().unwrap();
let repo = tmp.path();
init_git_repo(repo);
// Create a feature branch with a commit
Command::new("git")
.args(["checkout", "-b", "feature/story-23_test"])
.current_dir(repo)
.output()
.unwrap();
fs::write(repo.join("feature.txt"), "feature content").unwrap();
Command::new("git")
.args(["add", "."])
.current_dir(repo)
.output()
.unwrap();
Command::new("git")
.args(["commit", "-m", "add feature"])
.current_dir(repo)
.output()
.unwrap();
// Switch back to master (initial branch)
Command::new("git")
.args(["checkout", "master"])
.current_dir(repo)
.output()
.unwrap();
// Create the story file in 4_merge/ so we can test archival
let merge_dir = repo.join(".storkit/work/4_merge");
fs::create_dir_all(&merge_dir).unwrap();
let story_file = merge_dir.join("23_test.md");
fs::write(&story_file, "---\nname: Test\n---\n").unwrap();
Command::new("git")
.args(["add", "."])
.current_dir(repo)
.output()
.unwrap();
Command::new("git")
.args(["commit", "-m", "add story in merge"])
.current_dir(repo)
.output()
.unwrap();
let pool = Arc::new(AgentPool::new_test(3001));
let job = run_merge_to_completion(&pool, repo, "23_test").await;
match &job.status {
MergeJobStatus::Completed(report) => {
assert!(!report.had_conflicts, "should have no conflicts");
assert!(
report.success
|| report.gate_output.contains("Failed to run")
|| !report.gates_passed,
"report should be coherent: {report:?}"
);
if report.story_archived {
let done = repo.join(".storkit/work/5_done/23_test.md");
assert!(done.exists(), "done file should exist");
}
}
MergeJobStatus::Failed(e) => {
// Gate failures are acceptable in test env
assert!(
e.contains("Failed") || e.contains("failed"),
"unexpected failure: {e}"
);
}
MergeJobStatus::Running => panic!("should not still be running"),
}
}
// ── quality gate ordering test ────────────────────────────────
/// Regression test for bug 142: quality gates must run BEFORE the fast-forward
/// to master so that broken code never lands on master.
///
/// Master carries an executable `script/test` that exits 1; after the squash
/// merge of a feature branch, the merge must report failure and master's HEAD
/// must be unchanged.
#[cfg(unix)]
#[test]
fn quality_gates_run_before_fast_forward_to_master() {
    use std::fs;
    use std::os::unix::fs::PermissionsExt;

    let tmp = tempfile::tempdir().unwrap();
    let repo = tmp.path();
    init_git_repo(repo);

    // Run one git command in the repo; only a spawn failure panics.
    let git = |args: &[&str]| {
        Command::new("git")
            .args(args)
            .current_dir(repo)
            .output()
            .unwrap();
    };
    // Stage everything and commit it with `message`.
    let commit_all = |message: &str| {
        git(&["add", "."]);
        git(&["commit", "-m", message]);
    };
    // Commit id HEAD currently resolves to, as trimmed text.
    let current_head = || {
        let raw = Command::new("git")
            .args(["rev-parse", "HEAD"])
            .current_dir(repo)
            .output()
            .unwrap()
            .stdout;
        String::from_utf8(raw).unwrap().trim().to_string()
    };

    // An executable script/test that always exits 1, so the gates fail.
    let script_dir = repo.join("script");
    fs::create_dir_all(&script_dir).unwrap();
    let failing_script = script_dir.join("test");
    fs::write(&failing_script, "#!/usr/bin/env bash\nexit 1\n").unwrap();
    let mut mode = fs::metadata(&failing_script).unwrap().permissions();
    mode.set_mode(0o755);
    fs::set_permissions(&failing_script, mode).unwrap();
    commit_all("add failing script/test");

    // Feature branch with one commit on top.
    git(&["checkout", "-b", "feature/story-142_test"]);
    fs::write(repo.join("change.txt"), "feature change").unwrap();
    commit_all("feature work");

    // Return to master and remember where it points.
    git(&["checkout", "master"]);
    let head_before = current_head();

    // The failing script/test makes the quality gates fail, so the
    // fast-forward must NOT happen.
    let result =
        crate::agents::merge::run_squash_merge(repo, "feature/story-142_test", "142_test")
            .unwrap();
    let head_after = current_head();

    assert!(
        !result.success,
        "run_squash_merge must report failure when gates fail"
    );
    assert_eq!(
        head_before, head_after,
        "master HEAD must not advance when quality gates fail (bug 142)"
    );
}
#[tokio::test]
async fn merge_agent_work_conflict_does_not_break_master() {
use std::fs;
use tempfile::tempdir;
let tmp = tempdir().unwrap();
let repo = tmp.path();
init_git_repo(repo);
// Create a file on master.
fs::write(
repo.join("code.rs"),
"fn main() {\n println!(\"hello\");\n}\n",
)
.unwrap();
Command::new("git")
.args(["add", "."])
.current_dir(repo)
.output()
.unwrap();
Command::new("git")
.args(["commit", "-m", "initial code"])
.current_dir(repo)
.output()
.unwrap();
// Feature branch: modify the same line differently.
Command::new("git")
.args(["checkout", "-b", "feature/story-42_story_foo"])
.current_dir(repo)
.output()
.unwrap();
fs::write(
repo.join("code.rs"),
"fn main() {\n println!(\"hello\");\n feature_fn();\n}\n",
)
.unwrap();
Command::new("git")
.args(["add", "."])
.current_dir(repo)
.output()
.unwrap();
Command::new("git")
.args(["commit", "-m", "feature: add fn call"])
.current_dir(repo)
.output()
.unwrap();
// Master: add different line at same location.
Command::new("git")
.args(["checkout", "master"])
.current_dir(repo)
.output()
.unwrap();
fs::write(
repo.join("code.rs"),
"fn main() {\n println!(\"hello\");\n master_fn();\n}\n",
)
.unwrap();
Command::new("git")
.args(["add", "."])
.current_dir(repo)
.output()
.unwrap();
Command::new("git")
.args(["commit", "-m", "master: add fn call"])
.current_dir(repo)
.output()
.unwrap();
// Create story file in 4_merge.
let merge_dir = repo.join(".storkit/work/4_merge");
fs::create_dir_all(&merge_dir).unwrap();
fs::write(merge_dir.join("42_story_foo.md"), "---\nname: Test\n---\n").unwrap();
Command::new("git")
.args(["add", "."])
.current_dir(repo)
.output()
.unwrap();
Command::new("git")
.args(["commit", "-m", "add story"])
.current_dir(repo)
.output()
.unwrap();
let pool = Arc::new(AgentPool::new_test(3001));
let job = run_merge_to_completion(&pool, repo, "42_story_foo").await;
// Master should NEVER have conflict markers, regardless of merge outcome.
let master_code = fs::read_to_string(repo.join("code.rs")).unwrap();
assert!(
!master_code.contains("<<<<<<<"),
"master must never contain conflict markers:\n{master_code}"
);
assert!(
!master_code.contains(">>>>>>>"),
"master must never contain conflict markers:\n{master_code}"
);
// The report should accurately reflect what happened.
match &job.status {
MergeJobStatus::Completed(report) => {
assert!(report.had_conflicts, "should report conflicts");
}
MergeJobStatus::Failed(_) => {
// Acceptable — merge aborted due to conflicts
}
MergeJobStatus::Running => panic!("should not still be running"),
}
}
}