//! Merge pipeline runner — `start_merge_agent_work` and `run_merge_pipeline`.

use crate::slog;
use crate::slog_error;
use crate::worktree;
use std::path::Path;
use std::sync::Arc;

use super::super::super::AgentPool;
use super::time::{
    decode_server_start_time, encode_server_start_time, server_start_time, unix_now,
};

impl AgentPool {
    /// Sweep all Running merge jobs in the CRDT and clear any left behind by
    /// a previous server instance, re-dispatching them to the auto-assigner
    /// so the merge resumes on the new boot.
    ///
    /// A job is considered stale when its recorded `server_start` timestamp
    /// is older than the current server's boot time, or when the `error`
    /// field cannot be decoded (legacy / malformed entries).
    ///
    /// For every stale job we (a) delete the orphaned Running entry and (b)
    /// emit a `WatcherEvent::WorkItem` reassign for the same story_id in
    /// `4_merge/`, so the auto-assign watcher loop re-triggers
    /// `start_merge_agent_work` on the fresh boot. Without (b), a mid-merge
    /// server restart would leave the mergemaster polling `get_merge_status`
    /// only to see "Merge job disappeared", burn a retry, and after three
    /// such interruptions escalate to `MergeFailureFinal` — a process bug,
    /// not a real merge failure (this is what happened to story 998 during
    /// the bug 1001 fix iterations).
    ///
    /// Called at the top of [`start_merge_agent_work`] to unblock retries,
    /// and also by the periodic background reaper in the tick loop so stale
    /// entries are cleaned up even when no new merge is triggered.
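    ///
    /// A minimal sketch of the encode/decode pair that carries the boot time
    /// through the job's `error` field (a Unix-seconds `u64` and the exact
    /// one-liner form are illustrative assumptions):
    ///
    /// ```ignore
    /// // Write side (start_merge_agent_work): stash this boot's start time
    /// // in the job's `error` slot.
    /// let tag = encode_server_start_time(server_start_time());
    ///
    /// // Read side (this reaper, possibly on a later boot): an older boot
    /// // time, or anything undecodable, classifies the job as stale.
    /// let stale = decode_server_start_time(Some(tag.as_str()))
    ///     .map(|boot| boot < server_start_time())
    ///     .unwrap_or(true);
    /// ```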
    pub(crate) fn reap_stale_merge_jobs(&self) {
        if let Some(jobs) = crate::crdt_state::read_all_merge_jobs() {
            let current_boot = server_start_time();
            for job in jobs {
                if job.status != "running" {
                    continue;
                }
                let stale = match decode_server_start_time(job.error.as_deref()) {
                    Some(t) => t < current_boot,
                    None => true, // Legacy (pid-encoded) or malformed: stale
                };
                if stale {
                    slog!(
                        "[merge] Cleared stale Running merge job for '{}' (server restarted) — re-dispatching",
                        job.story_id
                    );
                    crate::crdt_state::delete_merge_job(&job.story_id);
                    // Re-trigger the merge on the new boot. Auto-assign sees
                    // a story in 4_merge/ with no Running job and will call
                    // start_merge_agent_work again.
                    let _ = self
                        .watcher_tx()
                        .send(crate::io::watcher::WatcherEvent::WorkItem {
                            stage: "4_merge".to_string(),
                            item_id: job.story_id.clone(),
                            action: "reassign".to_string(),
                            commit_msg: String::new(),
                            from_stage: None,
                        });
                }
            }
        }
    }

    /// Start the merge pipeline as a background task.
    ///
    /// Returns immediately so the MCP tool call doesn't time out (the full
    /// pipeline — squash merge + quality gates — takes well over 60 seconds,
    /// exceeding Claude Code's MCP tool-call timeout).
    ///
    /// The mergemaster agent should poll [`get_merge_status`](Self::get_merge_status)
    /// until the job reaches a terminal state.
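    ///
    /// A sketch of the resulting contract from the caller's side (the story
    /// id, poll interval, and direct CRDT read are illustrative; real callers
    /// go through the `get_merge_status` MCP tool):
    ///
    /// ```ignore
    /// // "1042" is a hypothetical story id.
    /// pool.start_merge_agent_work(project_root, "1042")?;
    /// loop {
    ///     match crate::crdt_state::read_merge_job("1042") {
    ///         Some(job) if job.status == "running" => {
    ///             tokio::time::sleep(std::time::Duration::from_secs(5)).await;
    ///         }
    ///         // "completed", "failed", or reaped after a server restart.
    ///         _ => break,
    ///     }
    /// }
    /// ```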
    pub fn start_merge_agent_work(
        self: &Arc<Self>,
        project_root: &Path,
        story_id: &str,
    ) -> Result<(), String> {
        // Sweep stale Running entries left behind by dead processes before
        // applying the double-start guard.
        self.reap_stale_merge_jobs();

        // Guard against double-starts; clear any completed/failed entry so the
        // caller can retry without needing to call a separate cleanup step.
        if let Some(job) = crate::crdt_state::read_merge_job(story_id) {
            match job.status.as_str() {
                "running" => {
                    return Err(format!(
                        "Merge already in progress for '{story_id}'. \
                         Use get_merge_status to poll for completion."
                    ));
                }
                // Completed or Failed: clear the stale entry so we can start fresh.
                _ => {
                    crate::crdt_state::delete_merge_job(story_id);
                }
            }
        }

        // Story 1051: if the story is in MergeFailure, transition it to Merge
        // before starting the pipeline so the UI shows an active retry state
        // rather than the stale failure indicator (AC 1, AC 4).
        if let Err(e) = crate::agents::lifecycle::transition_merge_failure_to_retry(story_id) {
            slog!(
                "[merge] Could not transition '{story_id}' from MergeFailure to Merge: {e} \
                 (merge will proceed regardless)"
            );
        }

        // Insert the Running job into the CRDT.
        let started_at = unix_now();
        crate::crdt_state::write_merge_job(
            story_id,
            "running",
            started_at,
            None,
            Some(&encode_server_start_time(server_start_time())),
        );

        let pool = Arc::clone(self);
        let root = project_root.to_path_buf();
        let sid = story_id.to_string();
        tokio::spawn(async move {
            let report = pool.run_merge_pipeline(&root, &sid).await;

            // Story 973: if the story was aborted (Merge → Coding) while the git
            // operation was running, skip state-machine transitions and watcher
            // notifications — they would reference the wrong stage.
            if let Some(job) = crate::crdt_state::read_merge_job(&sid)
                && job.status == "cancelled"
            {
                crate::crdt_state::delete_merge_job(&sid);
                pool.auto_assign_available_work(&root).await;
                return;
            }

            // Story 1060: if the story was deleted while the merge was running,
            // discard the result silently — no MergeFailure notification, no
            // state-machine transition. Writing failure for a tombstoned story
            // would emit a spurious "merge failed" notification to chat.
            if crate::crdt_state::is_tombstoned(&sid) {
                slog!(
                    "[merge] Merge for '{}' completed but story is tombstoned; skipping result write",
                    sid
                );
                crate::crdt_state::delete_merge_job(&sid);
                return;
            }

            let success = matches!(
                &report,
                Ok(r) if matches!(r.result, crate::agents::merge::MergeResult::Success { .. })
            );
            let finished_at = unix_now();

            // On any failure: record merge_failure in the CRDT and emit a notification.
            if !success {
                let kind = match &report {
                    Ok(r) => r.result.to_merge_failure_kind(),
                    Err(e) => crate::pipeline_state::MergeFailureKind::Other(e.clone()),
                };
                let is_no_commits =
                    matches!(&kind, crate::pipeline_state::MergeFailureKind::NoCommits);
                // Self-evident fix: gate-only failure whose typed kind a fixup coder
                // can resolve in one short session (stories 981, 986).
                let is_fixup = !is_no_commits
                    && report
                        .as_ref()
                        .ok()
                        .and_then(|r| {
                            if let crate::agents::merge::MergeResult::GateFailure {
                                failure_kind: Some(k),
                                ..
                            } = &r.result
                            {
                                Some(k)
                            } else {
                                None
                            }
                        })
                        .map(|k| k.is_self_evident_fix())
                        .unwrap_or(false);

                if is_no_commits {
                    let reason = kind.display_reason();
                    if let Err(e) = crate::agents::lifecycle::transition_to_blocked(&sid, &reason) {
                        slog_error!("[merge] Failed to transition '{sid}' to Blocked: {e}");
                    }
                    let _ = pool
                        .watcher_tx
                        .send(crate::io::watcher::WatcherEvent::StoryBlocked {
                            story_id: sid.clone(),
                            reason,
                        });
                } else if is_fixup {
                    // Mark fixup pending before any state transition so a concurrent
                    // auto-assign that fires after the state change sees the key set.
                    crate::db::write_content(crate::db::ContentKey::MergeFixupPending(&sid), "1");
                    // Merge → MergeFailure → Coding. FixupRequested also sets
                    // retry_count=1 so maybe_inject_gate_failure injects gate output
                    // into --append-system-prompt on the fixup spawn.
                    // transition_to_merge_failure also writes ContentKey::GateOutput.
                    let display = kind.display_reason();
                    let _ =
                        crate::agents::lifecycle::transition_to_merge_failure(sid.as_str(), kind);
                    match crate::agents::lifecycle::move_story_to_stage(&sid, "current") {
                        Ok(_) => {
                            slog!(
                                "[merge] Self-evident gate fix for '{sid}'; spawning fixup coder"
                            );
                            let context = "\n\nYour task is to fix the merge gate failures \
                                           shown above (see --append-system-prompt). \
                                           Run run_tests then commit. Do not explore further.";
                            if let Err(e) = pool
                                .start_agent(&root, &sid, None, Some(context), None)
                                .await
                            {
                                slog_error!(
                                    "[merge] Fixup coder spawn failed for '{sid}': {e} \
                                     (auto-assign will retry when a slot opens)"
                                );
                            }
                        }
                        Err(e) => {
                            slog_error!(
                                "[merge] Failed to move '{sid}' back to current for fixup: {e}; \
                                 reverting to MergeFailure"
                            );
                            crate::db::delete_content(crate::db::ContentKey::MergeFixupPending(
                                &sid,
                            ));
                            let _ = pool.watcher_tx.send(
                                crate::io::watcher::WatcherEvent::MergeFailure {
                                    story_id: sid.clone(),
                                    reason: display,
                                },
                            );
                        }
                    }
                } else {
                    // Transition through the state machine (Merge → MergeFailure).
                    // Only send the notification when the stage actually changed; if the
                    // story was already in MergeFailure (self-loop), suppress the duplicate.
                    let display = kind.display_reason();
                    let should_notify = match crate::agents::lifecycle::transition_to_merge_failure(
                        sid.as_str(),
                        kind,
                    ) {
                        Ok(fired) => !matches!(
                            fired.before,
                            crate::pipeline_state::Stage::MergeFailure { .. }
                        ),
                        Err(e) => {
                            slog_error!(
                                "[merge] Failed to transition '{sid}' to MergeFailure: {e}"
                            );
                            true
                        }
                    };
                    if should_notify {
                        let _ = pool
                            .watcher_tx
                            .send(crate::io::watcher::WatcherEvent::MergeFailure {
                                story_id: sid.clone(),
                                reason: display,
                            });
                    }
                }
            }

            // AC1 (bug 1008): Before writing "completed" to the CRDT (which
            // unblocks the mergemaster agent's get_merge_status poll), record
            // that the merge succeeded. The exit handler in spawn.rs reads
            // this flag/key to skip the transient-respawn logic for a clean
            // successful exit. Must happen BEFORE the CRDT write below so the
            // flag is always present when the agent sees "completed" and exits.
            if success
                && let Ok(ref r) = report
                && r.story_archived
            {
                pool.set_merge_success_reported(&sid);
                crate::db::write_content(crate::db::ContentKey::MergeSuccess(&sid), "1");
            }

            // Update the CRDT with the terminal status.
            match &report {
                Ok(r) => {
                    let report_json = serde_json::to_string(r).unwrap_or_default();
                    crate::crdt_state::write_merge_job(
                        &sid,
                        "completed",
                        started_at,
                        Some(finished_at),
                        Some(&report_json),
                    );
                }
                Err(e) => {
                    crate::crdt_state::write_merge_job(
                        &sid,
                        "failed",
                        started_at,
                        Some(finished_at),
                        Some(e),
                    );
                }
            }

            if !success {
                pool.auto_assign_available_work(&root).await;
            }
        });

        Ok(())
    }

    /// The actual merge pipeline, run inside a background task.
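    ///
    /// A failed merge or gate run still comes back as `Ok(report)` with a
    /// non-`Success` result; `Err` covers task panics and errors bubbled out
    /// of `run_squash_merge`. A sketch of consuming the report (the story id
    /// is illustrative):
    ///
    /// ```ignore
    /// let report = pool.run_merge_pipeline(&root, "1042").await?;
    /// match report.result {
    ///     crate::agents::merge::MergeResult::Success { .. } => {
    ///         // Story archived and worktree removed; see the report flags.
    ///     }
    ///     _ => {
    ///         // Classified by start_merge_agent_work's failure handling.
    ///     }
    /// }
    /// ```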
    async fn run_merge_pipeline(
        self: &Arc<Self>,
        project_root: &Path,
        story_id: &str,
    ) -> Result<crate::agents::merge::MergeReport, String> {
        let branch = format!("feature/story-{story_id}");
        let wt_path = worktree::worktree_path(project_root, story_id);

        let root = project_root.to_path_buf();
        let sid = story_id.to_string();
        let br = branch.clone();
        let merge_result = tokio::task::spawn_blocking(move || {
            crate::agents::merge::run_squash_merge(&root, &br, &sid)
        })
        .await
        .map_err(|e| format!("Merge task panicked: {e}"))??;

        if !matches!(
            merge_result,
            crate::agents::merge::MergeResult::Success { .. }
        ) {
            return Ok(crate::agents::merge::MergeReport {
                story_id: story_id.to_string(),
                result: merge_result,
                worktree_cleaned_up: false,
                story_archived: false,
            });
        }

        let story_archived = crate::agents::lifecycle::move_story_to_done(story_id).is_ok();
        if story_archived {
            self.remove_agents_for_story(story_id);
        }

        let worktree_cleaned_up = if wt_path.exists() {
            let config = crate::config::ProjectConfig::load(project_root).unwrap_or_default();
            worktree::remove_worktree_by_story_id(project_root, story_id, &config)
                .await
                .is_ok()
        } else {
            false
        };

        self.auto_assign_available_work(project_root).await;

        Ok(crate::agents::merge::MergeReport {
            story_id: story_id.to_string(),
            result: merge_result,
            worktree_cleaned_up,
            story_archived,
        })
    }
}
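
// A minimal test sketch for the boot-time encoding that `reap_stale_merge_jobs`
// relies on. It assumes the boot time is a Unix-seconds `u64`, that the
// encode/decode pair round-trips, and that `decode_server_start_time` returns
// `None` for anything it cannot parse; adjust to the real signatures in
// `super::time`.
#[cfg(test)]
mod tests {
    use super::{decode_server_start_time, encode_server_start_time};

    #[test]
    fn boot_time_round_trips_through_the_error_field_encoding() {
        let encoded = encode_server_start_time(1_700_000_000);
        assert_eq!(decode_server_start_time(Some(&encoded)), Some(1_700_000_000));
    }

    #[test]
    fn unparseable_error_values_read_as_stale() {
        // Legacy pid-encoded and absent values must decode to None so the
        // reaper classifies the job as stale ("not-a-boot-time" stands in
        // for any malformed payload).
        assert_eq!(decode_server_start_time(Some("not-a-boot-time")), None);
        assert_eq!(decode_server_start_time(None), None);
    }
}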