fix(merge): use server-start-time, not pid, for stale-merge detection

The merge_jobs cleanup encoded the server's pid in the CRDT and checked
`kill(pid, 0)` to decide whether a "running" entry was stale. Two problems:

  1. The cleanup runs *inside* the server, so checking whether the
     server's own pid is alive is tautological — kill(self_pid, 0)
     always succeeds.
  2. `rebuild_and_restart` does an `execve()` re-exec, which keeps the
     same pid. After re-exec, merge_jobs from the previous server
     instance still encode "the current pid" — so the cleanup never
     fires, and stories like 799/800 sit forever with status="running"
     while no actual merge runs.

Switch to a per-process server-start-time captured lazily in a
`OnceLock<f64>` (reset by execve, so the new instance sees a fresh
boot-time). A merge_job's recorded start-time < current boot-time means
it came from a previous instance: stale, delete it.

Legacy pid-encoded entries decode to None and are also treated as stale.

MergeJob.pid → MergeJob.server_start_time. Tests updated.
This commit is contained in:
dave
2026-04-28 20:41:32 +00:00
parent 8f392f4fc7
commit 2a77f73ba4
2 changed files with 51 additions and 52 deletions
+6 -4
View File
@@ -19,12 +19,14 @@ pub enum MergeJobStatus {
pub struct MergeJob {
pub story_id: String,
pub status: MergeJobStatus,
/// PID of the server process that started this job.
/// Server start-time (Unix seconds) of the server instance that started
/// this job.
///
/// Used by stale-lock recovery: on a new merge attempt the system checks
/// every Running entry and removes any whose owning process is no longer
/// alive (e.g. the server crashed and restarted).
pub pid: u32,
/// every Running entry and removes any whose recorded start-time is older
/// than the current server's boot time. This survives `rebuild_and_restart`
/// (which re-execs and keeps the same PID).
pub server_start_time: f64,
}
/// Result of a mergemaster merge operation.
+45 -48
View File
@@ -15,30 +15,32 @@ use super::super::AgentPool;
/// On Unix this sends signal 0 to the PID (no actual signal delivered, but
/// the kernel validates whether the process exists and is reachable).
/// Returns `false` for any error, including ESRCH (no such process).
#[cfg(unix)]
fn is_process_alive(pid: u32) -> bool {
// SAFETY: kill(pid, 0) is a read-only signal check; no signal is sent.
unsafe { libc::kill(pid as libc::pid_t, 0) == 0 }
/// Wall-clock time captured the first time this server process touches the
/// merge subsystem. Used to detect merge_jobs left over from a previous
/// server instance: a re-exec on `rebuild_and_restart` keeps the same PID,
/// so PID alone cannot distinguish "current" vs "previous" server. This
/// timestamp is fresh per-process (the static is reset by execve) and is
/// the source of truth for stale-merge detection.
static SERVER_START_TIME: std::sync::OnceLock<f64> = std::sync::OnceLock::new();
/// Return this server process's start time (lazily captured on first call).
pub(crate) fn server_start_time() -> f64 {
*SERVER_START_TIME.get_or_init(unix_now)
}
/// Fallback for non-Unix platforms: assume the process is alive.
#[cfg(not(unix))]
fn is_process_alive(_pid: u32) -> bool {
true
/// Encode the current server's start-time into the CRDT `error` field for
/// a Running merge job.
fn encode_server_start_time(t: f64) -> String {
format!("{{\"server_start\":{t}}}")
}
/// Encode a `pid` into the CRDT `error` field for a Running merge job.
fn encode_pid(pid: u32) -> String {
format!("{{\"pid\":{pid}}}")
}
/// Decode a `pid` from the CRDT `error` field of a Running merge job.
fn decode_pid(error: Option<&str>) -> u32 {
/// Decode the server-start-time from a Running merge job's `error` field.
/// Returns `None` for legacy entries (which encoded `pid` instead) — those
/// are treated as stale by the cleanup pass.
fn decode_server_start_time(error: Option<&str>) -> Option<f64> {
error
.and_then(|e| serde_json::from_str::<serde_json::Value>(e).ok())
.and_then(|v| v["pid"].as_u64())
.map(|p| p as u32)
.unwrap_or(0)
.and_then(|v| v["server_start"].as_f64())
}
/// Current Unix timestamp in seconds as `f64`.
@@ -68,17 +70,21 @@ impl AgentPool {
// server crashed mid-merge: the next attempt finds a Running entry
// whose owning process is gone and clears it automatically.
if let Some(jobs) = crate::crdt_state::read_all_merge_jobs() {
let current_boot = server_start_time();
for job in jobs {
if job.status == "running" {
let pid = decode_pid(job.error.as_deref());
if pid > 0 && !is_process_alive(pid) {
slog!(
"[merge] Cleared stale Running merge job for '{}' (dead pid {})",
job.story_id,
pid
);
crate::crdt_state::delete_merge_job(&job.story_id);
}
if job.status != "running" {
continue;
}
let stale = match decode_server_start_time(job.error.as_deref()) {
Some(t) => t < current_boot,
None => true, // Legacy (pid-encoded) or malformed: stale
};
if stale {
slog!(
"[merge] Cleared stale Running merge job for '{}' (server restarted)",
job.story_id
);
crate::crdt_state::delete_merge_job(&job.story_id);
}
}
}
@@ -102,13 +108,12 @@ impl AgentPool {
// Insert Running job into CRDT.
let started_at = unix_now();
let pid = std::process::id();
crate::crdt_state::write_merge_job(
story_id,
"running",
started_at,
None,
Some(&encode_pid(pid)),
Some(&encode_server_start_time(server_start_time())),
);
let pool = Arc::clone(self);
@@ -268,10 +273,10 @@ impl AgentPool {
/// Completed jobs.
pub fn get_merge_status(&self, story_id: &str) -> Option<crate::agents::merge::MergeJob> {
let view = crate::crdt_state::read_merge_job(story_id)?;
let (status, pid) = match view.status.as_str() {
let (status, server_start_time) = match view.status.as_str() {
"running" => {
let pid = decode_pid(view.error.as_deref());
(crate::agents::merge::MergeJobStatus::Running, pid)
let t = decode_server_start_time(view.error.as_deref()).unwrap_or(0.0);
(crate::agents::merge::MergeJobStatus::Running, t)
}
"completed" => {
let report = view
@@ -289,17 +294,17 @@ impl AgentPool {
worktree_cleaned_up: false,
story_archived: false,
});
(crate::agents::merge::MergeJobStatus::Completed(report), 0)
(crate::agents::merge::MergeJobStatus::Completed(report), 0.0)
}
_ => {
let err = view.error.unwrap_or_else(|| "Unknown error".to_string());
(crate::agents::merge::MergeJobStatus::Failed(err), 0)
(crate::agents::merge::MergeJobStatus::Failed(err), 0.0)
}
};
Some(crate::agents::merge::MergeJob {
story_id: story_id.to_string(),
status,
pid,
server_start_time,
})
}
@@ -424,7 +429,7 @@ mod tests {
"running",
1.0,
None,
Some(&encode_pid(std::process::id())),
Some(&encode_server_start_time(server_start_time())),
);
// With a stale Running entry, start_merge_agent_work must be blocked.
@@ -471,22 +476,14 @@ mod tests {
let pool = Arc::new(AgentPool::new_test(3001));
// Obtain a PID that is guaranteed to be dead by spawning a short-lived
// process and waiting for it to exit.
let dead_pid = {
let mut child = Command::new("true").spawn().unwrap();
let pid = child.id();
child.wait().unwrap();
pid
};
// Seed CRDT merge_jobs with a Running entry whose PID is dead.
// Seed CRDT merge_jobs with a Running entry whose recorded server-start
// time is older than the current server (legacy / previous instance).
crate::crdt_state::write_merge_job(
"719_stale_other",
"running",
1.0,
None,
Some(&encode_pid(dead_pid)),
Some(&encode_server_start_time(0.0)), // legacy/older boot — should be cleaned up
);
// Verify the entry is present before the sweep.