diff --git a/server/src/agents/merge/mod.rs b/server/src/agents/merge/mod.rs index bbac0663..5108d703 100644 --- a/server/src/agents/merge/mod.rs +++ b/server/src/agents/merge/mod.rs @@ -19,12 +19,14 @@ pub enum MergeJobStatus { pub struct MergeJob { pub story_id: String, pub status: MergeJobStatus, - /// PID of the server process that started this job. + /// Server start-time (Unix seconds) of the server instance that started + /// this job. /// /// Used by stale-lock recovery: on a new merge attempt the system checks - /// every Running entry and removes any whose owning process is no longer - /// alive (e.g. the server crashed and restarted). - pub pid: u32, + /// every Running entry and removes any whose recorded start-time is older + /// than the current server's boot time. This survives `rebuild_and_restart` + /// (which re-execs and keeps the same PID). + pub server_start_time: f64, } /// Result of a mergemaster merge operation. diff --git a/server/src/agents/pool/pipeline/merge.rs b/server/src/agents/pool/pipeline/merge.rs index 8ebe5f98..6d9a65fb 100644 --- a/server/src/agents/pool/pipeline/merge.rs +++ b/server/src/agents/pool/pipeline/merge.rs @@ -15,30 +15,32 @@ use super::super::AgentPool; /// On Unix this sends signal 0 to the PID (no actual signal delivered, but /// the kernel validates whether the process exists and is reachable). /// Returns `false` for any error, including ESRCH (no such process). -#[cfg(unix)] -fn is_process_alive(pid: u32) -> bool { - // SAFETY: kill(pid, 0) is a read-only signal check; no signal is sent. - unsafe { libc::kill(pid as libc::pid_t, 0) == 0 } +/// Wall-clock time captured the first time this server process touches the +/// merge subsystem. Used to detect merge_jobs left over from a previous +/// server instance: a re-exec on `rebuild_and_restart` keeps the same PID, +/// so PID alone cannot distinguish "current" vs "previous" server. 
This +/// timestamp is fresh per-process (the static is reset by execve) and is +/// the source of truth for stale-merge detection. +static SERVER_START_TIME: std::sync::OnceLock<f64> = std::sync::OnceLock::new(); + +/// Return this server process's start time (lazily captured on first call). +pub(crate) fn server_start_time() -> f64 { + *SERVER_START_TIME.get_or_init(unix_now) } -/// Fallback for non-Unix platforms: assume the process is alive. -#[cfg(not(unix))] -fn is_process_alive(_pid: u32) -> bool { - true +/// Encode the current server's start-time into the CRDT `error` field for +/// a Running merge job. +fn encode_server_start_time(t: f64) -> String { + format!("{{\"server_start\":{t}}}") } -/// Encode a `pid` into the CRDT `error` field for a Running merge job. -fn encode_pid(pid: u32) -> String { - format!("{{\"pid\":{pid}}}") -} - -/// Decode a `pid` from the CRDT `error` field of a Running merge job. -fn decode_pid(error: Option<&str>) -> u32 { +/// Decode the server-start-time from a Running merge job's `error` field. +/// Returns `None` for legacy entries (which encoded `pid` instead) — those +/// are treated as stale by the cleanup pass. +fn decode_server_start_time(error: Option<&str>) -> Option<f64> { error .and_then(|e| serde_json::from_str::<serde_json::Value>(e).ok()) - .and_then(|v| v["pid"].as_u64()) - .map(|p| p as u32) - .unwrap_or(0) + .and_then(|v| v["server_start"].as_f64()) } /// Current Unix timestamp in seconds as `f64`. @@ -68,17 +70,21 @@ impl AgentPool { // server crashed mid-merge: the next attempt finds a Running entry // whose owning process is gone and clears it automatically. 
if let Some(jobs) = crate::crdt_state::read_all_merge_jobs() { + let current_boot = server_start_time(); for job in jobs { - if job.status == "running" { - let pid = decode_pid(job.error.as_deref()); - if pid > 0 && !is_process_alive(pid) { - slog!( - "[merge] Cleared stale Running merge job for '{}' (dead pid {})", - job.story_id, - pid - ); - crate::crdt_state::delete_merge_job(&job.story_id); - } + if job.status != "running" { + continue; + } + let stale = match decode_server_start_time(job.error.as_deref()) { + Some(t) => t < current_boot, + None => true, // Legacy (pid-encoded) or malformed: stale + }; + if stale { + slog!( + "[merge] Cleared stale Running merge job for '{}' (server restarted)", + job.story_id + ); + crate::crdt_state::delete_merge_job(&job.story_id); } } } @@ -102,13 +108,12 @@ impl AgentPool { // Insert Running job into CRDT. let started_at = unix_now(); - let pid = std::process::id(); crate::crdt_state::write_merge_job( story_id, "running", started_at, None, - Some(&encode_pid(pid)), + Some(&encode_server_start_time(server_start_time())), ); let pool = Arc::clone(self); @@ -268,10 +273,10 @@ impl AgentPool { /// Completed jobs. 
pub fn get_merge_status(&self, story_id: &str) -> Option<crate::agents::merge::MergeJob> { let view = crate::crdt_state::read_merge_job(story_id)?; - let (status, pid) = match view.status.as_str() { + let (status, server_start_time) = match view.status.as_str() { "running" => { - let pid = decode_pid(view.error.as_deref()); - (crate::agents::merge::MergeJobStatus::Running, pid) + let t = decode_server_start_time(view.error.as_deref()).unwrap_or(0.0); + (crate::agents::merge::MergeJobStatus::Running, t) } "completed" => { let report = view @@ -289,17 +294,17 @@ impl AgentPool { worktree_cleaned_up: false, story_archived: false, }); - (crate::agents::merge::MergeJobStatus::Completed(report), 0) + (crate::agents::merge::MergeJobStatus::Completed(report), 0.0) } _ => { let err = view.error.unwrap_or_else(|| "Unknown error".to_string()); - (crate::agents::merge::MergeJobStatus::Failed(err), 0) + (crate::agents::merge::MergeJobStatus::Failed(err), 0.0) } }; Some(crate::agents::merge::MergeJob { story_id: story_id.to_string(), status, - pid, + server_start_time, }) } @@ -424,7 +429,7 @@ mod tests { "running", 1.0, None, - Some(&encode_pid(std::process::id())), + Some(&encode_server_start_time(server_start_time())), ); // With a stale Running entry, start_merge_agent_work must be blocked. @@ -471,22 +476,14 @@ mod tests { let pool = Arc::new(AgentPool::new_test(3001)); - // Obtain a PID that is guaranteed to be dead by spawning a short-lived - // process and waiting for it to exit. - let dead_pid = { - let mut child = Command::new("true").spawn().unwrap(); - let pid = child.id(); - child.wait().unwrap(); - pid - }; - - // Seed CRDT merge_jobs with a Running entry whose PID is dead. + // Seed CRDT merge_jobs with a Running entry whose recorded server-start + // time is older than the current server (legacy / previous instance). 
crate::crdt_state::write_merge_job( "719_stale_other", "running", 1.0, None, - Some(&encode_pid(dead_pid)), + Some(&encode_server_start_time(0.0)), // legacy/older boot — should be cleaned up ); // Verify the entry is present before the sweep.