diff --git a/server/src/agents/pool/mod.rs b/server/src/agents/pool/mod.rs index 995c9ab5..c897c302 100644 --- a/server/src/agents/pool/mod.rs +++ b/server/src/agents/pool/mod.rs @@ -104,6 +104,14 @@ impl AgentPool { self.port } + /// Return a clone of the broadcaster used for filesystem-watcher events. + /// + /// Used by sibling modules (e.g. the merge runner) that need to fire + /// re-dispatch events without holding a long-lived reference. + pub(crate) fn watcher_tx(&self) -> broadcast::Sender<crate::io::watcher::WatcherEvent> { + self.watcher_tx.clone() + } + /// Returns the project-scoped [`StatusBroadcaster`] owned by this pool. /// /// Callers that also construct a [`crate::services::Services`] bundle should diff --git a/server/src/agents/pool/pipeline/merge/runner.rs b/server/src/agents/pool/pipeline/merge/runner.rs index d0200cd4..47b8ee87 100644 --- a/server/src/agents/pool/pipeline/merge/runner.rs +++ b/server/src/agents/pool/pipeline/merge/runner.rs @@ -11,16 +11,27 @@ use super::time::{ }; impl AgentPool { - /// Sweep all Running merge jobs in the CRDT and delete any that were left - /// behind by a previous server instance. + /// Sweep all Running merge jobs in the CRDT and clear any left behind by + /// a previous server instance, re-dispatching them to the auto-assigner + /// so the merge resumes on the new boot. /// - /// A job is considered stale when its recorded `server_start` timestamp is - /// older than the current server's boot time, or when the `error` field - /// cannot be decoded (legacy / malformed entries). + /// A job is considered stale when its recorded `server_start` timestamp + /// is older than the current server's boot time, or when the `error` + /// field cannot be decoded (legacy / malformed entries). /// - /// Called at the top of [`start_merge_agent_work`] to unblock retries, and - /// also by the periodic background reaper in the tick loop so stale entries - /// are cleaned up even when no new merge is triggered. 
+ /// For every stale job we (a) delete the orphaned Running entry and (b) + /// emit a `WatcherEvent::WorkItem reassign` for the same story_id in + /// `4_merge/`, so the auto-assign watcher loop re-triggers + /// `start_merge_agent_work` on the fresh boot. Without (b), a mid-merge + /// server restart would leave the mergemaster polling `get_merge_status` + /// only to see "Merge job disappeared", burn a retry, and after three + /// such interruptions escalate to `MergeFailureFinal` — a process bug, + /// not a real merge failure (this is what happened to story 998 during + /// the bug 1001 fix iterations). + /// + /// Called at the top of [`start_merge_agent_work`] to unblock retries, + /// and also by the periodic background reaper in the tick loop so stale + /// entries are cleaned up even when no new merge is triggered. pub(crate) fn reap_stale_merge_jobs(&self) { if let Some(jobs) = crate::crdt_state::read_all_merge_jobs() { let current_boot = server_start_time(); @@ -34,10 +45,22 @@ impl AgentPool { }; if stale { slog!( - "[merge] Cleared stale Running merge job for '{}' (server restarted)", + "[merge] Cleared stale Running merge job for '{}' (server restarted) — re-dispatching", job.story_id ); crate::crdt_state::delete_merge_job(&job.story_id); + // Re-trigger the merge on the new boot. Auto-assign sees + // a story in 4_merge/ with no Running job and will call + // start_merge_agent_work again. 
+ let _ = self + .watcher_tx() + .send(crate::io::watcher::WatcherEvent::WorkItem { + stage: "4_merge".to_string(), + item_id: job.story_id.clone(), + action: "reassign".to_string(), + commit_msg: String::new(), + from_stage: None, + }); } } } diff --git a/server/src/agents/pool/pipeline/merge/tests.rs b/server/src/agents/pool/pipeline/merge/tests.rs index 86f9a379..51652ef3 100644 --- a/server/src/agents/pool/pipeline/merge/tests.rs +++ b/server/src/agents/pool/pipeline/merge/tests.rs @@ -157,6 +157,53 @@ async fn reap_stale_merge_jobs_removes_old_running_entry_without_merge() { ); } +/// Reaping a stale Running merge job must also fire a `WatcherEvent::WorkItem` +/// reassign so the auto-assign watcher loop re-triggers the merge on the +/// fresh boot. Without this, a mid-merge server restart causes mergemaster +/// to see "Merge job disappeared", burn a retry, and eventually escalate to +/// MergeFailureFinal even though the merge never actually failed. +#[tokio::test] +async fn reap_stale_merge_jobs_emits_reassign_watcher_event() { + let _serial = serial_test_lock(); + crate::crdt_state::init_for_test(); + + use tokio::sync::broadcast; + let (watcher_tx, mut rx) = broadcast::channel::<crate::io::watcher::WatcherEvent>(16); + let pool = Arc::new(AgentPool::new(3001, watcher_tx)); + + crate::crdt_state::write_merge_job( + "853_stale_reapdispatch", + "running", + 1.0, + None, + Some(&super::time::encode_server_start_time(0.0)), + ); + + pool.reap_stale_merge_jobs(); + + // Drain the channel and look for a matching reassign event. + let mut found = false; + while let Ok(event) = rx.try_recv() { + if let crate::io::watcher::WatcherEvent::WorkItem { + stage, + item_id, + action, + .. 
+ } = event + && stage == "4_merge" + && item_id == "853_stale_reapdispatch" + && action == "reassign" + { + found = true; + break; + } + } + assert!( + found, + "reap_stale_merge_jobs must emit a 4_merge reassign event for the cleared job" + ); +} + // ── story 719: stale-lock recovery on new merge attempts ───────────────── /// AC1/AC2/AC3: seeding merge_jobs with an entry whose PID is dead, then