huskies: merge 719_refactor_stale_merge_job_lock_recovery_on_new_merge_attempts

This commit is contained in:
dave
2026-04-27 17:41:39 +00:00
parent 1ecb4dad55
commit 101f616346
2 changed files with 116 additions and 1 deletions
+6
View File
@@ -20,6 +20,12 @@ pub enum MergeJobStatus {
pub struct MergeJob {
/// Identifier of the story being merged. Matches the key under which this
/// job is stored in `merge_jobs` — presumably; verify against callers.
pub story_id: String,
/// Current lifecycle state of the job (`Running` while in flight;
/// completed/failed entries are cleared so the caller can retry).
pub status: MergeJobStatus,
/// PID of the server process that started this job.
///
/// Used by stale-lock recovery: on a new merge attempt the system checks
/// every Running entry and removes any whose owning process is no longer
/// alive (e.g. the server crashed and restarted).
pub pid: u32,
}
/// Result of a mergemaster merge operation.
+110 -1
View File
@@ -10,6 +10,23 @@ use super::super::super::PipelineStage;
use super::super::super::pipeline_stage;
use super::super::AgentPool;
/// Returns `true` if the process with the given PID is currently alive.
///
/// On Unix this sends signal 0 to the PID (no actual signal delivered, but
/// the kernel validates whether the process exists and is reachable).
/// Returns `false` for any error, including ESRCH (no such process).
#[cfg(unix)]
fn is_process_alive(pid: u32) -> bool {
// SAFETY: kill(pid, 0) is a read-only signal check; no signal is sent.
unsafe { libc::kill(pid as libc::pid_t, 0) == 0 }
}
/// Liveness probe stub for platforms without `kill(2)` support.
///
/// Conservatively reports every PID as alive, so the stale-lock sweep never
/// removes a Running entry on platforms where liveness cannot be checked.
#[cfg(not(unix))]
fn is_process_alive(_pid: u32) -> bool {
    true
}
impl AgentPool {
/// Start the merge pipeline as a background task.
///
@@ -24,6 +41,31 @@ impl AgentPool {
project_root: &Path,
story_id: &str,
) -> Result<(), String> {
// Sweep stale Running entries left behind by dead processes before
// applying the double-start guard. This handles the case where the
// server crashed mid-merge: the next attempt finds a Running entry
// whose owning process is gone and clears it automatically.
{
let mut jobs = self.merge_jobs.lock().map_err(|e| e.to_string())?;
let stale_ids: Vec<String> = jobs
.iter()
.filter_map(|(sid, job)| {
if matches!(job.status, crate::agents::merge::MergeJobStatus::Running)
&& !is_process_alive(job.pid)
{
Some(sid.clone())
} else {
None
}
})
.collect();
for sid in stale_ids {
let dead_pid = jobs[&sid].pid;
jobs.remove(&sid);
slog!("[merge] Cleared stale Running merge job for '{sid}' (dead pid {dead_pid})");
}
}
// Guard against double-starts; clear any completed/failed entry so the
// caller can retry without needing to call a separate cleanup step.
{
@@ -52,6 +94,7 @@ impl AgentPool {
crate::agents::merge::MergeJob {
story_id: story_id.to_string(),
status: crate::agents::merge::MergeJobStatus::Running,
pid: std::process::id(),
},
);
}
@@ -241,7 +284,9 @@ mod tests {
let pool = Arc::new(AgentPool::new_test(3001));
// Inject a stale Running entry, simulating a mergemaster that died
// before the merge pipeline completed.
// before the merge pipeline completed. Use the current process PID so
// the stale-lock sweep (which checks whether the PID is alive) does NOT
// auto-remove it — this test verifies the double-start guard path.
{
let mut jobs = pool.merge_jobs.lock().unwrap();
jobs.insert(
@@ -249,6 +294,7 @@ mod tests {
MergeJob {
story_id: "77_story_stale".to_string(),
status: MergeJobStatus::Running,
pid: std::process::id(),
},
);
}
@@ -285,6 +331,69 @@ mod tests {
);
}
// ── story 719: stale-lock recovery on new merge attempts ─────────────────
/// AC1/AC2/AC3: seeding merge_jobs with a Running entry owned by a dead PID,
/// then starting a merge for a *different* story, must automatically remove
/// the stale entry (AC1/AC3); the INFO log path is exercised structurally on
/// removal (AC2).
#[cfg(unix)]
#[tokio::test]
async fn stale_merge_job_with_dead_pid_is_swept_on_new_merge_attempt() {
    use tempfile::tempdir;

    let scratch = tempdir().unwrap();
    let repo_path = scratch.path();
    init_git_repo(repo_path);
    let pool = Arc::new(AgentPool::new_test(3001));

    // Spawn a short-lived `true` process and reap it, yielding a PID that is
    // guaranteed dead by the time the sweep inspects it.
    // NOTE(review): PID reuse between reap and sweep is theoretically
    // possible but vanishingly unlikely within a single test run.
    let mut reaped = Command::new("true").spawn().unwrap();
    let dead_pid = reaped.id();
    reaped.wait().unwrap();

    // Seed merge_jobs with a Running entry whose owning process is gone.
    pool.merge_jobs.lock().unwrap().insert(
        "719_stale_other".to_string(),
        MergeJob {
            story_id: "719_stale_other".to_string(),
            status: MergeJobStatus::Running,
            pid: dead_pid,
        },
    );

    // Sanity: the entry must be present before the new merge attempt runs.
    assert!(
        pool.merge_jobs
            .lock()
            .unwrap()
            .contains_key("719_stale_other"),
        "stale entry should exist before new merge attempt"
    );

    // Kick off a merge for a different story; the sweep at the top of
    // start_merge_agent_work must clear the dead-PID entry.
    let _ = pool.start_merge_agent_work(repo_path, "719_trigger_story");

    assert!(
        !pool
            .merge_jobs
            .lock()
            .unwrap()
            .contains_key("719_stale_other"),
        "stale entry with dead pid must be removed when a new merge attempt starts"
    );
}
// ── merge_agent_work tests ────────────────────────────────────────────────
/// Helper: start a merge and poll until terminal state.