diff --git a/server/src/agents/pool/pipeline/merge/runner.rs b/server/src/agents/pool/pipeline/merge/runner.rs index c0efc4a9..0b847472 100644 --- a/server/src/agents/pool/pipeline/merge/runner.rs +++ b/server/src/agents/pool/pipeline/merge/runner.rs @@ -10,23 +10,17 @@ use super::time::{ }; impl AgentPool { - /// Start the merge pipeline as a background task. + /// Sweep all Running merge jobs in the CRDT and delete any that were left + /// behind by a previous server instance. /// - /// Returns immediately so the MCP tool call doesn't time out (the full - /// pipeline — squash merge + quality gates — takes well over 60 seconds, - /// exceeding Claude Code's MCP tool-call timeout). + /// A job is considered stale when its recorded `server_start` timestamp is + /// older than the current server's boot time, or when the `error` field + /// cannot be decoded (legacy / malformed entries). /// - /// The mergemaster agent should poll [`get_merge_status`](Self::get_merge_status) - /// until the job reaches a terminal state. - pub fn start_merge_agent_work( - self: &Arc, - project_root: &Path, - story_id: &str, - ) -> Result<(), String> { - // Sweep stale Running entries left behind by dead processes before - // applying the double-start guard. This handles the case where the - // server crashed mid-merge: the next attempt finds a Running entry - // whose owning process is gone and clears it automatically. + /// Called at the top of [`start_merge_agent_work`](Self::start_merge_agent_work) to unblock retries, and + /// also by the periodic background reaper in the tick loop so stale entries + /// are cleaned up even when no new merge is triggered. + pub(crate) fn reap_stale_merge_jobs(&self) { if let Some(jobs) = crate::crdt_state::read_all_merge_jobs() { let current_boot = server_start_time(); for job in jobs { @@ -46,6 +40,24 @@ impl AgentPool { } } } + } + + /// Start the merge pipeline as a background task. 
+ /// + /// Returns immediately so the MCP tool call doesn't time out (the full + /// pipeline — squash merge + quality gates — takes well over 60 seconds, + /// exceeding Claude Code's MCP tool-call timeout). + /// + /// The mergemaster agent should poll [`get_merge_status`](Self::get_merge_status) + /// until the job reaches a terminal state. + pub fn start_merge_agent_work( + self: &Arc, + project_root: &Path, + story_id: &str, + ) -> Result<(), String> { + // Sweep stale Running entries left behind by dead processes before + // applying the double-start guard. + self.reap_stale_merge_jobs(); // Guard against double-starts; clear any completed/failed entry so the // caller can retry without needing to call a separate cleanup step. diff --git a/server/src/agents/pool/pipeline/merge/tests.rs b/server/src/agents/pool/pipeline/merge/tests.rs index 44196771..07ab2a60 100644 --- a/server/src/agents/pool/pipeline/merge/tests.rs +++ b/server/src/agents/pool/pipeline/merge/tests.rs @@ -87,6 +87,47 @@ async fn stale_running_merge_job_is_cleared_and_retry_succeeds() { ); } +// ── story 852: periodic background reaper ──────────────────────────────── + +/// AC2: a Running merge_job whose `server_start` is older than the current +/// server's boot time must be deleted by `reap_stale_merge_jobs` without any +/// merge attempt being triggered. +#[tokio::test] +async fn reap_stale_merge_jobs_removes_old_running_entry_without_merge() { + crate::crdt_state::init_for_test(); + + let pool = Arc::new(AgentPool::new_test(3001)); + + // Inject a Running entry whose server_start predates the current server. + crate::crdt_state::write_merge_job( + "852_stale_reaper", + "running", + 1.0, + None, + Some(&super::time::encode_server_start_time(0.0)), // older boot → stale + ); + assert!( + crate::crdt_state::read_merge_job("852_stale_reaper").is_some(), + "stale entry must exist before reap" + ); + + // Reap: must remove the entry without triggering a merge pipeline. 
+ pool.reap_stale_merge_jobs(); + + assert!( + crate::crdt_state::read_merge_job("852_stale_reaper").is_none(), + "reap_stale_merge_jobs must delete the stale Running entry" + ); + + // No agents must have been spawned (no merge was triggered). + let agents = pool.agents.lock().unwrap(); + assert!( + agents.is_empty(), + "reap must not spawn any agents; got {} agent(s)", + agents.len() + ); +} + // ── story 719: stale-lock recovery on new merge attempts ───────────────── /// AC1/AC2/AC3: seeding merge_jobs with an entry whose PID is dead, then diff --git a/server/src/startup/tick_loop.rs b/server/src/startup/tick_loop.rs index 356d1217..73803d70 100644 --- a/server/src/startup/tick_loop.rs +++ b/server/src/startup/tick_loop.rs @@ -114,6 +114,7 @@ pub(crate) fn spawn_tick_loop( } // Watchdog: detect orphaned Running agents every 30 ticks. + // Also reap stale Running merge_jobs from previous server instances. if tick_count.is_multiple_of(30) { let found = agents.run_watchdog_pass(root.as_deref()); if found > 0 { @@ -124,6 +125,7 @@ pub(crate) fn spawn_tick_loop( agents.auto_assign_available_work(r).await; } } + agents.reap_stale_merge_jobs(); } // Sweep: promote done→archived every sweep_interval_secs ticks.