From 101f616346d930c52af27211c1499d70cd518ca3 Mon Sep 17 00:00:00 2001 From: dave Date: Mon, 27 Apr 2026 17:41:39 +0000 Subject: [PATCH] huskies: merge 719_refactor_stale_merge_job_lock_recovery_on_new_merge_attempts --- server/src/agents/merge/mod.rs | 6 ++ server/src/agents/pool/pipeline/merge.rs | 111 ++++++++++++++++++++++- 2 files changed, 116 insertions(+), 1 deletion(-) diff --git a/server/src/agents/merge/mod.rs b/server/src/agents/merge/mod.rs index 73aabb7f..b9fbb4ae 100644 --- a/server/src/agents/merge/mod.rs +++ b/server/src/agents/merge/mod.rs @@ -20,6 +20,12 @@ pub enum MergeJobStatus { pub struct MergeJob { pub story_id: String, pub status: MergeJobStatus, + /// PID of the server process that started this job. + /// + /// Used by stale-lock recovery: on a new merge attempt the system checks + /// every Running entry and removes any whose owning process is no longer + /// alive (e.g. the server crashed and restarted). + pub pid: u32, } /// Result of a mergemaster merge operation. diff --git a/server/src/agents/pool/pipeline/merge.rs b/server/src/agents/pool/pipeline/merge.rs index 27f32c39..dacb4afd 100644 --- a/server/src/agents/pool/pipeline/merge.rs +++ b/server/src/agents/pool/pipeline/merge.rs @@ -10,6 +10,23 @@ use super::super::super::PipelineStage; use super::super::super::pipeline_stage; use super::super::AgentPool; +/// Returns `true` if the process with the given PID is currently alive. +/// +/// On Unix this sends signal 0 to the PID (no actual signal delivered, but +/// the kernel validates whether the process exists and is reachable). +/// Returns `false` for any error, including ESRCH (no such process). +#[cfg(unix)] +fn is_process_alive(pid: u32) -> bool { + // SAFETY: kill(pid, 0) is a read-only signal check; no signal is sent. + unsafe { libc::kill(pid as libc::pid_t, 0) == 0 } +} + +/// Fallback for non-Unix platforms: assume the process is alive. +#[cfg(not(unix))] +fn is_process_alive(_pid: u32) -> bool { + true +} + impl AgentPool { /// Start the merge pipeline as a background task. /// @@ -24,6 +41,31 @@ impl AgentPool { project_root: &Path, story_id: &str, ) -> Result<(), String> { + // Sweep stale Running entries left behind by dead processes before + // applying the double-start guard. This handles the case where the + // server crashed mid-merge: the next attempt finds a Running entry + // whose owning process is gone and clears it automatically. + { + let mut jobs = self.merge_jobs.lock().map_err(|e| e.to_string())?; + let stale_ids: Vec = jobs + .iter() + .filter_map(|(sid, job)| { + if matches!(job.status, crate::agents::merge::MergeJobStatus::Running) + && !is_process_alive(job.pid) + { + Some(sid.clone()) + } else { + None + } + }) + .collect(); + for sid in stale_ids { + let dead_pid = jobs[&sid].pid; + jobs.remove(&sid); + slog!("[merge] Cleared stale Running merge job for '{sid}' (dead pid {dead_pid})"); + } + } + // Guard against double-starts; clear any completed/failed entry so the // caller can retry without needing to call a separate cleanup step. { @@ -52,6 +94,7 @@ impl AgentPool { crate::agents::merge::MergeJob { story_id: story_id.to_string(), status: crate::agents::merge::MergeJobStatus::Running, + pid: std::process::id(), }, ); } @@ -241,7 +284,9 @@ mod tests { let pool = Arc::new(AgentPool::new_test(3001)); // Inject a stale Running entry, simulating a mergemaster that died - // before the merge pipeline completed. + // before the merge pipeline completed. Use the current process PID so + // the stale-lock sweep (which checks whether the PID is alive) does NOT + // auto-remove it — this test verifies the double-start guard path. { let mut jobs = pool.merge_jobs.lock().unwrap(); jobs.insert( @@ -249,6 +294,7 @@ mod tests { MergeJob { story_id: "77_story_stale".to_string(), status: MergeJobStatus::Running, + pid: std::process::id(), }, ); } @@ -285,6 +331,69 @@ mod tests { ); } + // ── story 719: stale-lock recovery on new merge attempts ───────────────── + + /// AC1/AC2/AC3: seeding merge_jobs with an entry whose PID is dead, then + /// triggering a new merge for a *different* story, must automatically remove + /// the stale entry (AC1/AC3) and log at INFO (AC2 — verified structurally + /// because the log path is exercised when the entry is removed). + #[cfg(unix)] + #[tokio::test] + async fn stale_merge_job_with_dead_pid_is_swept_on_new_merge_attempt() { + use tempfile::tempdir; + + let tmp = tempdir().unwrap(); + let repo = tmp.path(); + init_git_repo(repo); + + let pool = Arc::new(AgentPool::new_test(3001)); + + // Obtain a PID that is guaranteed to be dead by spawning a short-lived + // process and waiting for it to exit. + let dead_pid = { + let mut child = Command::new("true").spawn().unwrap(); + let pid = child.id(); + child.wait().unwrap(); + pid + }; + + // Seed merge_jobs with a Running entry whose PID is dead. + { + let mut jobs = pool.merge_jobs.lock().unwrap(); + jobs.insert( + "719_stale_other".to_string(), + MergeJob { + story_id: "719_stale_other".to_string(), + status: MergeJobStatus::Running, + pid: dead_pid, + }, + ); + } + + // Verify the entry is present before the sweep. + assert!( + pool.merge_jobs + .lock() + .unwrap() + .contains_key("719_stale_other"), + "stale entry should exist before new merge attempt" + ); + + // Trigger a new merge for a *different* story. The sweep runs at the + // top of start_merge_agent_work and must remove the dead-PID entry. + let _ = pool.start_merge_agent_work(repo, "719_trigger_story"); + + // The stale entry must have been cleared. + assert!( + !pool + .merge_jobs + .lock() + .unwrap() + .contains_key("719_stale_other"), + "stale entry with dead pid must be removed when a new merge attempt starts" + ); + } + // ── merge_agent_work tests ──────────────────────────────────────────────── /// Helper: start a merge and poll until terminal state.