//! Merge pipeline runner — start_merge_agent_work and run_merge_pipeline. use crate::slog; use crate::slog_error; use crate::worktree; use std::path::Path; use std::sync::Arc; /// Return `true` when `gate_output` matches a self-evident-fix class of failure /// that a short fixup coder session can resolve without human intervention. /// /// Patterns covered: fmt drift (`cargo fmt --check`), clippy warnings promoted /// to errors (`-D warnings`), and missing doc comments detected by clippy or /// the source-map-check gate. fn is_self_evident_fix(gate_output: &str) -> bool { let patterns: &[&str] = &[ "Diff in ", // cargo fmt --check output "would reformat", // rustfmt --check output "error[clippy::", // clippy error "warning[clippy::", // clippy warning (treated as error via -D warnings) "missing_doc_comments", // clippy missing-doc lint "missing-docs direction", // source-map-check gate ]; patterns.iter().any(|p| gate_output.contains(p)) } use super::super::super::AgentPool; use super::time::{ decode_server_start_time, encode_server_start_time, server_start_time, unix_now, }; impl AgentPool { /// Sweep all Running merge jobs in the CRDT and delete any that were left /// behind by a previous server instance. /// /// A job is considered stale when its recorded `server_start` timestamp is /// older than the current server's boot time, or when the `error` field /// cannot be decoded (legacy / malformed entries). /// /// Called at the top of [`start_merge_agent_work`] to unblock retries, and /// also by the periodic background reaper in the tick loop so stale entries /// are cleaned up even when no new merge is triggered. pub(crate) fn reap_stale_merge_jobs(&self) { if let Some(jobs) = crate::crdt_state::read_all_merge_jobs() { let current_boot = server_start_time(); for job in jobs { if job.status != "running" { continue; } let stale = match decode_server_start_time(job.error.as_deref()) { Some(t) => t < current_boot, None => true, // Legacy (pid-encoded) or malformed: stale }; if stale { slog!( "[merge] Cleared stale Running merge job for '{}' (server restarted)", job.story_id ); crate::crdt_state::delete_merge_job(&job.story_id); } } } } /// Start the merge pipeline as a background task. /// /// Returns immediately so the MCP tool call doesn't time out (the full /// pipeline — squash merge + quality gates — takes well over 60 seconds, /// exceeding Claude Code's MCP tool-call timeout). /// /// The mergemaster agent should poll [`get_merge_status`](Self::get_merge_status) /// until the job reaches a terminal state. pub fn start_merge_agent_work( self: &Arc, project_root: &Path, story_id: &str, ) -> Result<(), String> { // Sweep stale Running entries left behind by dead processes before // applying the double-start guard. self.reap_stale_merge_jobs(); // Guard against double-starts; clear any completed/failed entry so the // caller can retry without needing to call a separate cleanup step. if let Some(job) = crate::crdt_state::read_merge_job(story_id) { match job.status.as_str() { "running" => { return Err(format!( "Merge already in progress for '{story_id}'. \ Use get_merge_status to poll for completion." )); } // Completed or Failed: clear stale entry so we can start fresh. _ => { crate::crdt_state::delete_merge_job(story_id); } } } // Insert Running job into CRDT. let started_at = unix_now(); crate::crdt_state::write_merge_job( story_id, "running", started_at, None, Some(&encode_server_start_time(server_start_time())), ); let pool = Arc::clone(self); let root = project_root.to_path_buf(); let sid = story_id.to_string(); tokio::spawn(async move { let report = pool.run_merge_pipeline(&root, &sid).await; // Story 973: if the story was aborted (Merge → Coding) while the git // operation was running, skip state-machine transitions and watcher // notifications — they would reference the wrong stage. if let Some(job) = crate::crdt_state::read_merge_job(&sid) && job.status == "cancelled" { crate::crdt_state::delete_merge_job(&sid); pool.auto_assign_available_work(&root).await; return; } let success = matches!(&report, Ok(r) if r.success); let finished_at = unix_now(); // On any failure: record merge_failure in CRDT and emit notification. if !success { let reason = match &report { Ok(r) => { if r.had_conflicts { format!( "Merge conflict: {}", r.conflict_details .as_deref() .unwrap_or("conflicts detected") ) } else { format!("Quality gates failed: {}", r.gate_output) } } Err(e) => e.clone(), }; let is_no_commits = reason.contains("no commits to merge"); // Self-evident fix: gate-only failure (no conflicts) whose output matches // a pattern a fixup coder can resolve in one short session (story 981). let gate_output = match &report { Ok(r) if !r.had_conflicts => r.gate_output.clone(), _ => String::new(), }; let is_fixup = !is_no_commits && !gate_output.is_empty() && is_self_evident_fix(&gate_output); if is_no_commits { if let Err(e) = crate::agents::lifecycle::transition_to_blocked(&sid, &reason) { slog_error!("[merge] Failed to transition '{sid}' to Blocked: {e}"); } let _ = pool .watcher_tx .send(crate::io::watcher::WatcherEvent::StoryBlocked { story_id: sid.clone(), reason, }); } else if is_fixup { // Save gate output and mark fixup pending before any state transition // so that a concurrent auto-assign that fires after the state change // sees the keys already set. crate::db::write_content(crate::db::ContentKey::GateOutput(&sid), &gate_output); crate::db::write_content(crate::db::ContentKey::MergeFixupPending(&sid), "1"); // Merge → MergeFailure → Coding. FixupRequested also sets // retry_count=1 so maybe_inject_gate_failure injects the gate // output into --append-system-prompt on the fixup spawn. let _ = crate::agents::lifecycle::transition_to_merge_failure(&sid, &reason); match crate::agents::lifecycle::move_story_to_stage(&sid, "current") { Ok(_) => { slog!( "[merge] Self-evident gate fix for '{sid}'; spawning fixup coder" ); let context = "\n\nYour task is to fix the merge gate failures \ shown above (see --append-system-prompt). \ Run run_tests then commit. Do not explore further."; if let Err(e) = pool .start_agent(&root, &sid, None, Some(context), None) .await { slog_error!( "[merge] Fixup coder spawn failed for '{sid}': {e} \ (auto-assign will retry when a slot opens)" ); } } Err(e) => { slog_error!( "[merge] Failed to move '{sid}' back to current for fixup: {e}; \ reverting to MergeFailure" ); crate::db::delete_content(crate::db::ContentKey::MergeFixupPending( &sid, )); let _ = pool.watcher_tx.send( crate::io::watcher::WatcherEvent::MergeFailure { story_id: sid.clone(), reason, }, ); } } } else { // Transition through the state machine (Merge → MergeFailure). // Only send the notification when the stage actually changed; if the // story was already in MergeFailure (self-loop), suppress the duplicate. let should_notify = match crate::agents::lifecycle::transition_to_merge_failure( &sid, &reason, ) { Ok(fired) => !matches!( fired.before, crate::pipeline_state::Stage::MergeFailure { .. } ), Err(e) => { slog_error!( "[merge] Failed to transition '{sid}' to MergeFailure: {e}" ); true } }; if should_notify { let _ = pool.watcher_tx .send(crate::io::watcher::WatcherEvent::MergeFailure { story_id: sid.clone(), reason, }); } } } // Update CRDT with terminal status. match &report { Ok(r) => { let report_json = serde_json::to_string(r).unwrap_or_else(|_| String::new()); crate::crdt_state::write_merge_job( &sid, "completed", started_at, Some(finished_at), Some(&report_json), ); } Err(e) => { crate::crdt_state::write_merge_job( &sid, "failed", started_at, Some(finished_at), Some(e), ); } } if !success { pool.auto_assign_available_work(&root).await; } }); Ok(()) } /// The actual merge pipeline, run inside a background task. async fn run_merge_pipeline( self: &Arc, project_root: &Path, story_id: &str, ) -> Result { let branch = format!("feature/story-{story_id}"); let wt_path = worktree::worktree_path(project_root, story_id); let root = project_root.to_path_buf(); let sid = story_id.to_string(); let br = branch.clone(); let merge_result = tokio::task::spawn_blocking(move || { crate::agents::merge::run_squash_merge(&root, &br, &sid) }) .await .map_err(|e| format!("Merge task panicked: {e}"))??; if !merge_result.success { return Ok(crate::agents::merge::MergeReport { story_id: story_id.to_string(), success: false, had_conflicts: merge_result.had_conflicts, conflicts_resolved: merge_result.conflicts_resolved, conflict_details: merge_result.conflict_details, gates_passed: merge_result.gates_passed, gate_output: merge_result.output, worktree_cleaned_up: false, story_archived: false, }); } let story_archived = crate::agents::lifecycle::move_story_to_done(story_id).is_ok(); if story_archived { self.remove_agents_for_story(story_id); } let worktree_cleaned_up = if wt_path.exists() { let config = crate::config::ProjectConfig::load(project_root).unwrap_or_default(); worktree::remove_worktree_by_story_id(project_root, story_id, &config) .await .is_ok() } else { false }; self.auto_assign_available_work(project_root).await; Ok(crate::agents::merge::MergeReport { story_id: story_id.to_string(), success: true, had_conflicts: merge_result.had_conflicts, conflicts_resolved: merge_result.conflicts_resolved, conflict_details: merge_result.conflict_details, gates_passed: true, gate_output: merge_result.output, worktree_cleaned_up, story_archived, }) } }