//! Merge pipeline runner: start_merge_agent_work and run_merge_pipeline.

use crate::slog;
use crate::worktree;
use std::path::Path;
use std::sync::Arc;

use super::super::super::AgentPool;
use super::time::{
    decode_server_start_time, encode_server_start_time, server_start_time, unix_now,
};

impl AgentPool {
    /// Sweep all Running merge jobs in the CRDT and delete any that were left
    /// behind by a previous server instance.
    ///
    /// A job is considered stale when its recorded `server_start` timestamp is
    /// older than the current server's boot time, or when the `error` field
    /// cannot be decoded (legacy / malformed entries).
    ///
    /// Called at the top of [`start_merge_agent_work`](Self::start_merge_agent_work)
    /// to unblock retries, and also by the periodic background reaper in the
    /// tick loop so stale entries are cleaned up even when no new merge is
    /// triggered.
    pub(crate) fn reap_stale_merge_jobs(&self) {
        if let Some(jobs) = crate::crdt_state::read_all_merge_jobs() {
            let current_boot = server_start_time();
            for job in jobs {
                if job.status != "running" {
                    continue;
                }
                let stale = match decode_server_start_time(job.error.as_deref()) {
                    Some(t) => t < current_boot,
                    None => true, // Legacy (pid-encoded) or malformed: stale.
                };
                if stale {
                    slog!(
                        "[merge] Cleared stale Running merge job for '{}' (server restarted)",
                        job.story_id
                    );
                    crate::crdt_state::delete_merge_job(&job.story_id);
                }
            }
        }
    }

    /// Start the merge pipeline as a background task.
    ///
    /// Returns immediately so the MCP tool call doesn't time out: the full
    /// pipeline (squash merge plus quality gates) takes well over 60 seconds,
    /// exceeding Claude Code's MCP tool-call timeout.
    ///
    /// The mergemaster agent should poll [`get_merge_status`](Self::get_merge_status)
    /// until the job reaches a terminal state.
    pub fn start_merge_agent_work(
        self: &Arc<Self>,
        project_root: &Path,
        story_id: &str,
    ) -> Result<(), String> {
        // Sweep stale Running entries left behind by dead processes before
        // applying the double-start guard.
        self.reap_stale_merge_jobs();

        // Guard against double-starts; clear any completed/failed entry so the
        // caller can retry without needing a separate cleanup step.
        if let Some(job) = crate::crdt_state::read_merge_job(story_id) {
            match job.status.as_str() {
                "running" => {
                    return Err(format!(
                        "Merge already in progress for '{story_id}'. \
                         Use get_merge_status to poll for completion."
                    ));
                }
                // Completed or Failed: clear the stale entry so we can start fresh.
                _ => {
                    crate::crdt_state::delete_merge_job(story_id);
                }
            }
        }

        // Insert the Running job into the CRDT, recording this server's boot
        // time so a future instance can detect the entry as stale.
        let started_at = unix_now();
        crate::crdt_state::write_merge_job(
            story_id,
            "running",
            started_at,
            None,
            Some(&encode_server_start_time(server_start_time())),
        );

        let pool = Arc::clone(self);
        let root = project_root.to_path_buf();
        let sid = story_id.to_string();
        tokio::spawn(async move {
            let report = pool.run_merge_pipeline(&root, &sid).await;
            let success = matches!(&report, Ok(r) if r.success);
            let finished_at = unix_now();

            // On any failure: record merge_failure in the CRDT and emit a
            // notification.
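            // Special case: a "no commits to merge" failure marks the story
            // blocked and emits StoryBlocked instead of MergeFailure, since
            // there is nothing on the branch to merge yet.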
            if !success {
                let reason = match &report {
                    Ok(r) => {
                        if r.had_conflicts {
                            format!(
                                "Merge conflict: {}",
                                r.conflict_details
                                    .as_deref()
                                    .unwrap_or("conflicts detected")
                            )
                        } else {
                            format!("Quality gates failed: {}", r.gate_output)
                        }
                    }
                    Err(e) => e.clone(),
                };
                let is_no_commits = reason.contains("no commits to merge");
                if let Some(contents) = crate::db::read_content(&sid) {
                    let with_failure = crate::io::story_metadata::write_merge_failure_in_content(
                        &contents, &reason,
                    );
                    let updated = if is_no_commits {
                        crate::io::story_metadata::write_blocked_in_content(&with_failure)
                    } else {
                        with_failure
                    };
                    crate::db::write_content(&sid, &updated);
                    crate::db::write_item_with_content(&sid, "4_merge", &updated);
                }
                if is_no_commits {
                    let _ = pool
                        .watcher_tx
                        .send(crate::io::watcher::WatcherEvent::StoryBlocked {
                            story_id: sid.clone(),
                            reason,
                        });
                } else {
                    let _ = pool
                        .watcher_tx
                        .send(crate::io::watcher::WatcherEvent::MergeFailure {
                            story_id: sid.clone(),
                            reason,
                        });
                }
            }

            // Update the CRDT with the terminal status.
            match &report {
                Ok(r) => {
                    let report_json =
                        serde_json::to_string(r).unwrap_or_else(|_| String::new());
                    crate::crdt_state::write_merge_job(
                        &sid,
                        "completed",
                        started_at,
                        Some(finished_at),
                        Some(&report_json),
                    );
                }
                Err(e) => {
                    crate::crdt_state::write_merge_job(
                        &sid,
                        "failed",
                        started_at,
                        Some(finished_at),
                        Some(e),
                    );
                }
            }

            // The success path reassigns work inside run_merge_pipeline; on
            // failure the pipeline may have bailed out early, so trigger
            // reassignment here.
            if !success {
                pool.auto_assign_available_work(&root).await;
            }
        });

        Ok(())
    }

    /// The actual merge pipeline, run inside a background task.
    async fn run_merge_pipeline(
        self: &Arc<Self>,
        project_root: &Path,
        story_id: &str,
    ) -> Result<crate::agents::merge::MergeReport, String> {
        let branch = format!("feature/story-{story_id}");
        let wt_path = worktree::worktree_path(project_root, story_id);

        let root = project_root.to_path_buf();
        let sid = story_id.to_string();
        let br = branch.clone();
        // Double `?`: the first propagates a panicked blocking task (mapped to
        // a String), the second propagates the merge's own error.
        let merge_result = tokio::task::spawn_blocking(move || {
            crate::agents::merge::run_squash_merge(&root, &br, &sid)
        })
        .await
        .map_err(|e| format!("Merge task panicked: {e}"))??;

        if !merge_result.success {
            return Ok(crate::agents::merge::MergeReport {
                story_id: story_id.to_string(),
                success: false,
                had_conflicts: merge_result.had_conflicts,
                conflicts_resolved: merge_result.conflicts_resolved,
                conflict_details: merge_result.conflict_details,
                gates_passed: merge_result.gates_passed,
                gate_output: merge_result.output,
                worktree_cleaned_up: false,
                story_archived: false,
            });
        }

        let story_archived = crate::agents::lifecycle::move_story_to_done(story_id).is_ok();
        if story_archived {
            self.remove_agents_for_story(story_id);
        }

        let worktree_cleaned_up = if wt_path.exists() {
            let config = crate::config::ProjectConfig::load(project_root).unwrap_or_default();
            worktree::remove_worktree_by_story_id(project_root, story_id, &config)
                .await
                .is_ok()
        } else {
            false
        };

        self.auto_assign_available_work(project_root).await;

        Ok(crate::agents::merge::MergeReport {
            story_id: story_id.to_string(),
            success: true,
            had_conflicts: merge_result.had_conflicts,
            conflicts_resolved: merge_result.conflicts_resolved,
            conflict_details: merge_result.conflict_details,
            gates_passed: true,
            gate_output: merge_result.output,
            worktree_cleaned_up,
            story_archived,
        })
    }
}