fix: reap_stale_merge_jobs re-dispatches instead of just deleting

A mid-merge server restart used to silently kill the merge: the
in-flight tokio task died with the process, reap_stale_merge_jobs ran
on the new boot, saw the Running entry from the previous boot, and
simply deleted it. Mergemaster polling `get_merge_status` then saw
"Merge job disappeared", treated it as a strike, and after three
restarts escalated the story to MergeFailureFinal — even though no
real merge failure ever happened (this is what trapped story 998
during the bug 1001 iteration cycle).
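
For context, a hypothetical, runnable model of the strike loop described
above. Mergemaster's real polling code is not part of this commit, so every
name below is illustrative:

    // Sketch only: three "Merge job disappeared" strikes escalate the story.
    #[derive(Debug, PartialEq)]
    enum StoryState {
        Merging { strikes: u8 },
        MergeFailureFinal,
    }

    // One poll tick: a vanished job burns a strike; the third escalates.
    fn on_merge_job_disappeared(state: StoryState) -> StoryState {
        match state {
            StoryState::Merging { strikes } if strikes + 1 >= 3 => StoryState::MergeFailureFinal,
            StoryState::Merging { strikes } => StoryState::Merging { strikes: strikes + 1 },
            done => done,
        }
    }

    fn main() {
        let mut state = StoryState::Merging { strikes: 0 };
        for restart in 1..=3 {
            state = on_merge_job_disappeared(state);
            println!("after restart {restart}: {state:?}");
        }
        assert_eq!(state, StoryState::MergeFailureFinal); // story 998's fate
    }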

The reaper now also fires a `WatcherEvent::WorkItem` reassign for the
cleared story, so the auto-assign watcher loop re-runs
start_merge_agent_work on the fresh boot. The story is still sitting in
4_merge/, and the merge resumes automatically. The change is confined
to the reap path; start_merge_agent_work's own behaviour is unchanged.
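
A condensed, self-contained model of the new reap path. The real
implementation is in the diff below; `WatcherEvent` here mirrors only the
variant used, `reap_and_redispatch` is an illustrative stand-in, and the
sketch assumes the tokio crate with the rt and macros features:

    // Sketch only: models (a) delete + (b) re-dispatch from the text above.
    // The real CRDT delete is elided; only the broadcast fan-out is shown.
    use tokio::sync::broadcast;

    #[derive(Clone, Debug)]
    enum WatcherEvent {
        WorkItem {
            stage: String,
            item_id: String,
            action: String,
            commit_msg: String,
            from_stage: Option<String>,
        },
    }

    fn reap_and_redispatch(stale_story_id: &str, tx: &broadcast::Sender<WatcherEvent>) {
        // (a) the orphaned Running entry would be deleted from the CRDT here;
        // (b) re-dispatch so the auto-assign loop re-runs the merge.
        let _ = tx.send(WatcherEvent::WorkItem {
            stage: "4_merge".to_string(),
            item_id: stale_story_id.to_string(),
            action: "reassign".to_string(),
            commit_msg: String::new(),
            from_stage: None,
        });
    }

    #[tokio::main]
    async fn main() {
        let (tx, mut rx) = broadcast::channel::<WatcherEvent>(16);
        reap_and_redispatch("998_example", &tx);
        let event = rx.recv().await.expect("reassign event");
        println!("auto-assign would now see: {event:?}");
    }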

Added regression test
reap_stale_merge_jobs_emits_reassign_watcher_event that asserts the
new event fires. Existing
reap_stale_merge_jobs_removes_old_running_entry_without_merge still
passes (the "without_merge" guarantee is about agent spawning, not
about absence of watcher events).

Also exposes AgentPool::watcher_tx() as pub(crate) so the merge
runner can fan out re-dispatch events.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Timmy
2026-05-13 21:28:10 +01:00
parent bbdee1239b
commit 2758f744f2
3 changed files with 87 additions and 9 deletions
@@ -104,6 +104,14 @@ impl AgentPool {
         self.port
     }
 
+    /// Return a clone of the broadcaster used for filesystem-watcher events.
+    ///
+    /// Used by sibling modules (e.g. the merge runner) that need to fire
+    /// re-dispatch events without holding a long-lived reference.
+    pub(crate) fn watcher_tx(&self) -> broadcast::Sender<WatcherEvent> {
+        self.watcher_tx.clone()
+    }
+
     /// Returns the project-scoped [`StatusBroadcaster`] owned by this pool.
     ///
     /// Callers that also construct a [`crate::services::Services`] bundle should
@@ -11,16 +11,27 @@ use super::time::{
 };
 
 impl AgentPool {
-    /// Sweep all Running merge jobs in the CRDT and delete any that were left
-    /// behind by a previous server instance.
+    /// Sweep all Running merge jobs in the CRDT and clear any left behind by
+    /// a previous server instance, re-dispatching them to the auto-assigner
+    /// so the merge resumes on the new boot.
     ///
-    /// A job is considered stale when its recorded `server_start` timestamp is
-    /// older than the current server's boot time, or when the `error` field
-    /// cannot be decoded (legacy / malformed entries).
+    /// A job is considered stale when its recorded `server_start` timestamp
+    /// is older than the current server's boot time, or when the `error`
+    /// field cannot be decoded (legacy / malformed entries).
     ///
-    /// Called at the top of [`start_merge_agent_work`] to unblock retries, and
-    /// also by the periodic background reaper in the tick loop so stale entries
-    /// are cleaned up even when no new merge is triggered.
+    /// For every stale job we (a) delete the orphaned Running entry and (b)
+    /// emit a `WatcherEvent::WorkItem reassign` for the same story_id in
+    /// `4_merge/`, so the auto-assign watcher loop re-triggers
+    /// `start_merge_agent_work` on the fresh boot. Without (b), a mid-merge
+    /// server restart would leave the mergemaster polling `get_merge_status`
+    /// only to see "Merge job disappeared", burn a retry, and after three
+    /// such interruptions escalate to `MergeFailureFinal` — a process bug,
+    /// not a real merge failure (this is what happened to story 998 during
+    /// the bug 1001 fix iterations).
+    ///
+    /// Called at the top of [`start_merge_agent_work`] to unblock retries,
+    /// and also by the periodic background reaper in the tick loop so stale
+    /// entries are cleaned up even when no new merge is triggered.
     pub(crate) fn reap_stale_merge_jobs(&self) {
         if let Some(jobs) = crate::crdt_state::read_all_merge_jobs() {
             let current_boot = server_start_time();
@@ -34,10 +45,22 @@
             };
             if stale {
                 slog!(
-                    "[merge] Cleared stale Running merge job for '{}' (server restarted)",
+                    "[merge] Cleared stale Running merge job for '{}' (server restarted) — re-dispatching",
                     job.story_id
                 );
                 crate::crdt_state::delete_merge_job(&job.story_id);
+                // Re-trigger the merge on the new boot. Auto-assign sees
+                // a story in 4_merge/ with no Running job and will call
+                // start_merge_agent_work again.
+                let _ = self
+                    .watcher_tx()
+                    .send(crate::io::watcher::WatcherEvent::WorkItem {
+                        stage: "4_merge".to_string(),
+                        item_id: job.story_id.clone(),
+                        action: "reassign".to_string(),
+                        commit_msg: String::new(),
+                        from_stage: None,
+                    });
             }
         }
     }
@@ -157,6 +157,53 @@ async fn reap_stale_merge_jobs_removes_old_running_entry_without_merge() {
     );
 }
 
+/// Reaping a stale Running merge job must also fire a `WatcherEvent::WorkItem`
+/// reassign so the auto-assign watcher loop re-triggers the merge on the
+/// fresh boot. Without this, a mid-merge server restart causes mergemaster
+/// to see "Merge job disappeared", burn a retry, and eventually escalate to
+/// MergeFailureFinal even though the merge never actually failed.
+#[tokio::test]
+async fn reap_stale_merge_jobs_emits_reassign_watcher_event() {
+    let _serial = serial_test_lock();
+    crate::crdt_state::init_for_test();
+
+    use tokio::sync::broadcast;
+    let (watcher_tx, mut rx) = broadcast::channel::<crate::io::watcher::WatcherEvent>(16);
+    let pool = Arc::new(AgentPool::new(3001, watcher_tx));
+
+    crate::crdt_state::write_merge_job(
+        "853_stale_reapdispatch",
+        "running",
+        1.0,
+        None,
+        Some(&super::time::encode_server_start_time(0.0)),
+    );
+
+    pool.reap_stale_merge_jobs();
+
+    // Drain the channel and look for a matching reassign event.
+    let mut found = false;
+    while let Ok(event) = rx.try_recv() {
+        if let crate::io::watcher::WatcherEvent::WorkItem {
+            stage,
+            item_id,
+            action,
+            ..
+        } = event
+            && stage == "4_merge"
+            && item_id == "853_stale_reapdispatch"
+            && action == "reassign"
+        {
+            found = true;
+            break;
+        }
+    }
+
+    assert!(
+        found,
+        "reap_stale_merge_jobs must emit a 4_merge reassign event for the cleared job"
+    );
+}
+
 // ── story 719: stale-lock recovery on new merge attempts ─────────────────
 
 /// AC1/AC2/AC3: seeding merge_jobs with an entry whose PID is dead, then