diff --git a/server/src/agents/merge/mod.rs b/server/src/agents/merge/mod.rs index 2c017b43..bbac0663 100644 --- a/server/src/agents/merge/mod.rs +++ b/server/src/agents/merge/mod.rs @@ -1,6 +1,6 @@ //! Merge operations — rebases agent work onto master and runs post-merge validation. -use serde::Serialize; +use serde::{Deserialize, Serialize}; mod squash; @@ -28,7 +28,7 @@ pub struct MergeJob { } /// Result of a mergemaster merge operation. -#[derive(Debug, Serialize, Clone)] +#[derive(Debug, Serialize, Deserialize, Clone)] pub struct MergeReport { pub story_id: String, pub success: bool, diff --git a/server/src/agents/pool/auto_assign/auto_assign.rs b/server/src/agents/pool/auto_assign/auto_assign.rs index 2592bb0c..24f01459 100644 --- a/server/src/agents/pool/auto_assign/auto_assign.rs +++ b/server/src/agents/pool/auto_assign/auto_assign.rs @@ -311,14 +311,8 @@ impl AgentPool { // Skip if a merge job is already running for this story (e.g. triggered // by a previous auto-assign pass or by pipeline advancement). - let already_running = self - .merge_jobs - .lock() - .ok() - .and_then(|jobs| jobs.get(story_id.as_str()).cloned()) - .is_some_and(|job| { - matches!(job.status, crate::agents::merge::MergeJobStatus::Running) - }); + let already_running = crate::crdt_state::read_merge_job(story_id.as_str()) + .is_some_and(|job| job.status == "running"); if already_running { continue; } diff --git a/server/src/agents/pool/mod.rs b/server/src/agents/pool/mod.rs index f7adcbf6..d17a0e79 100644 --- a/server/src/agents/pool/mod.rs +++ b/server/src/agents/pool/mod.rs @@ -37,10 +37,6 @@ pub struct AgentPool { /// an `AgentStateChanged` event is emitted so the frontend can refresh the /// pipeline board without waiting for a filesystem event. watcher_tx: broadcast::Sender, - /// Tracks background merge jobs started by `merge_agent_work`, keyed by story_id. - /// The MCP tool returns immediately and the mergemaster agent polls - /// `get_merge_status` until the job reaches a terminal state. - merge_jobs: Arc>>, /// Project-scoped status broadcaster. Each agent session creates a /// [`crate::service::status::buffer::StatusEventBuffer`] subscribed to this /// broadcaster so it passively accumulates pipeline events without side effects. @@ -58,7 +54,6 @@ impl AgentPool { port, child_killers: Arc::new(Mutex::new(HashMap::new())), watcher_tx: watcher_tx.clone(), - merge_jobs: Arc::new(Mutex::new(HashMap::new())), status_broadcaster: Arc::new(StatusBroadcaster::new()), }; diff --git a/server/src/agents/pool/pipeline/advance/helpers.rs b/server/src/agents/pool/pipeline/advance/helpers.rs index 5ef01f2e..9a6c8db6 100644 --- a/server/src/agents/pool/pipeline/advance/helpers.rs +++ b/server/src/agents/pool/pipeline/advance/helpers.rs @@ -35,7 +35,6 @@ pub(crate) fn spawn_pipeline_advance( port, child_killers: Arc::new(Mutex::new(HashMap::new())), watcher_tx, - merge_jobs: Arc::new(Mutex::new(HashMap::new())), status_broadcaster: Arc::new(crate::service::status::StatusBroadcaster::new()), }; pool.run_pipeline_advance( diff --git a/server/src/agents/pool/pipeline/completion.rs b/server/src/agents/pool/pipeline/completion.rs index 4e05e7ca..ad1f2a97 100644 --- a/server/src/agents/pool/pipeline/completion.rs +++ b/server/src/agents/pool/pipeline/completion.rs @@ -119,7 +119,6 @@ impl AgentPool { port: self.port, child_killers: Arc::clone(&self.child_killers), watcher_tx: self.watcher_tx.clone(), - merge_jobs: Arc::clone(&self.merge_jobs), status_broadcaster: Arc::clone(&self.status_broadcaster), }; let sid = story_id.to_string(); diff --git a/server/src/agents/pool/pipeline/merge.rs b/server/src/agents/pool/pipeline/merge.rs index d44fb291..8ebe5f98 100644 --- a/server/src/agents/pool/pipeline/merge.rs +++ b/server/src/agents/pool/pipeline/merge.rs @@ -27,6 +27,28 @@ fn is_process_alive(_pid: u32) -> bool { true } +/// Encode a `pid` into the CRDT `error` field for a Running merge job. +fn encode_pid(pid: u32) -> String { + format!("{{\"pid\":{pid}}}") +} + +/// Decode a `pid` from the CRDT `error` field of a Running merge job. +fn decode_pid(error: Option<&str>) -> u32 { + error + .and_then(|e| serde_json::from_str::(e).ok()) + .and_then(|v| v["pid"].as_u64()) + .map(|p| p as u32) + .unwrap_or(0) +} + +/// Current Unix timestamp in seconds as `f64`. +fn unix_now() -> f64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs_f64() +} + impl AgentPool { /// Start the merge pipeline as a background task. /// @@ -45,60 +67,50 @@ impl AgentPool { // applying the double-start guard. This handles the case where the // server crashed mid-merge: the next attempt finds a Running entry // whose owning process is gone and clears it automatically. - { - let mut jobs = self.merge_jobs.lock().map_err(|e| e.to_string())?; - let stale_ids: Vec = jobs - .iter() - .filter_map(|(sid, job)| { - if matches!(job.status, crate::agents::merge::MergeJobStatus::Running) - && !is_process_alive(job.pid) - { - Some(sid.clone()) - } else { - None - } - }) - .collect(); - for sid in stale_ids { - let dead_pid = jobs[&sid].pid; - jobs.remove(&sid); - slog!("[merge] Cleared stale Running merge job for '{sid}' (dead pid {dead_pid})"); - } - } - - // Guard against double-starts; clear any completed/failed entry so the - // caller can retry without needing to call a separate cleanup step. - { - let mut jobs = self.merge_jobs.lock().map_err(|e| e.to_string())?; - if let Some(job) = jobs.get(story_id) { - match &job.status { - crate::agents::merge::MergeJobStatus::Running => { - return Err(format!( - "Merge already in progress for '{story_id}'. \ - Use get_merge_status to poll for completion." - )); - } - // Completed or Failed: clear stale entry so we can start fresh. - _ => { - jobs.remove(story_id); + if let Some(jobs) = crate::crdt_state::read_all_merge_jobs() { + for job in jobs { + if job.status == "running" { + let pid = decode_pid(job.error.as_deref()); + if pid > 0 && !is_process_alive(pid) { + slog!( + "[merge] Cleared stale Running merge job for '{}' (dead pid {})", + job.story_id, + pid + ); + crate::crdt_state::delete_merge_job(&job.story_id); } } } } - // Insert Running job. - { - let mut jobs = self.merge_jobs.lock().map_err(|e| e.to_string())?; - jobs.insert( - story_id.to_string(), - crate::agents::merge::MergeJob { - story_id: story_id.to_string(), - status: crate::agents::merge::MergeJobStatus::Running, - pid: std::process::id(), - }, - ); + // Guard against double-starts; clear any completed/failed entry so the + // caller can retry without needing to call a separate cleanup step. + if let Some(job) = crate::crdt_state::read_merge_job(story_id) { + match job.status.as_str() { + "running" => { + return Err(format!( + "Merge already in progress for '{story_id}'. \ + Use get_merge_status to poll for completion." + )); + } + // Completed or Failed: clear stale entry so we can start fresh. + _ => { + crate::crdt_state::delete_merge_job(story_id); + } + } } + // Insert Running job into CRDT. + let started_at = unix_now(); + let pid = std::process::id(); + crate::crdt_state::write_merge_job( + story_id, + "running", + started_at, + None, + Some(&encode_pid(pid)), + ); + let pool = Arc::clone(self); let root = project_root.to_path_buf(); let sid = story_id.to_string(); @@ -107,6 +119,8 @@ impl AgentPool { let report = pool.run_merge_pipeline(&root, &sid).await; let success = matches!(&report, Ok(r) if r.success); + let finished_at = unix_now(); + // On any failure: record merge_failure in CRDT and emit notification. if !success { let reason = match &report { @@ -154,15 +168,29 @@ impl AgentPool { } } - let status = match report { - Ok(r) => crate::agents::merge::MergeJobStatus::Completed(r), - Err(e) => crate::agents::merge::MergeJobStatus::Failed(e), - }; - if let Ok(mut jobs) = pool.merge_jobs.lock() - && let Some(job) = jobs.get_mut(&sid) - { - job.status = status; + // Update CRDT with terminal status. + match &report { + Ok(r) => { + let report_json = serde_json::to_string(r).unwrap_or_else(|_| String::new()); + crate::crdt_state::write_merge_job( + &sid, + "completed", + started_at, + Some(finished_at), + Some(&report_json), + ); + } + Err(e) => { + crate::crdt_state::write_merge_job( + &sid, + "failed", + started_at, + Some(finished_at), + Some(e), + ); + } } + if !success { pool.auto_assign_available_work(&root).await; } @@ -233,11 +261,46 @@ impl AgentPool { } /// Check the status of a background merge job. + /// + /// Reads from the CRDT `merge_jobs` collection and reconstructs the full + /// [`MergeJob`] struct. The CRDT `error` field encodes the `pid` for + /// Running jobs (as `{"pid":N}`) and the serialised [`MergeReport`] for + /// Completed jobs. pub fn get_merge_status(&self, story_id: &str) -> Option { - self.merge_jobs - .lock() - .ok() - .and_then(|jobs| jobs.get(story_id).cloned()) + let view = crate::crdt_state::read_merge_job(story_id)?; + let (status, pid) = match view.status.as_str() { + "running" => { + let pid = decode_pid(view.error.as_deref()); + (crate::agents::merge::MergeJobStatus::Running, pid) + } + "completed" => { + let report = view + .error + .as_deref() + .and_then(|e| serde_json::from_str::(e).ok()) + .unwrap_or_else(|| crate::agents::merge::MergeReport { + story_id: story_id.to_string(), + success: false, + had_conflicts: false, + conflicts_resolved: false, + conflict_details: None, + gates_passed: false, + gate_output: String::new(), + worktree_cleaned_up: false, + story_archived: false, + }); + (crate::agents::merge::MergeJobStatus::Completed(report), 0) + } + _ => { + let err = view.error.unwrap_or_else(|| "Unknown error".to_string()); + (crate::agents::merge::MergeJobStatus::Failed(err), 0) + } + }; + Some(crate::agents::merge::MergeJob { + story_id: story_id.to_string(), + status, + pid, + }) } /// Trigger a deterministic server-side merge for `story_id` without spawning @@ -253,7 +316,6 @@ impl AgentPool { port: self.port, child_killers: Arc::clone(&self.child_killers), watcher_tx: self.watcher_tx.clone(), - merge_jobs: Arc::clone(&self.merge_jobs), status_broadcaster: Arc::clone(&self.status_broadcaster), }); if let Err(e) = pool.start_merge_agent_work(project_root, story_id) { @@ -345,27 +407,25 @@ mod tests { async fn stale_running_merge_job_is_cleared_and_retry_succeeds() { use tempfile::tempdir; + crate::crdt_state::init_for_test(); + let tmp = tempdir().unwrap(); let repo = tmp.path(); init_git_repo(repo); let pool = Arc::new(AgentPool::new_test(3001)); - // Inject a stale Running entry, simulating a mergemaster that died - // before the merge pipeline completed. Use the current process PID so - // the stale-lock sweep (which checks whether the PID is alive) does NOT - // auto-remove it — this test verifies the double-start guard path. - { - let mut jobs = pool.merge_jobs.lock().unwrap(); - jobs.insert( - "77_story_stale".to_string(), - MergeJob { - story_id: "77_story_stale".to_string(), - status: MergeJobStatus::Running, - pid: std::process::id(), - }, - ); - } + // Inject a stale Running entry via CRDT, simulating a mergemaster that + // died before the merge pipeline completed. Use the current process PID + // so the stale-lock sweep does NOT auto-remove it — this test verifies + // the double-start guard path. + crate::crdt_state::write_merge_job( + "77_story_stale", + "running", + 1.0, + None, + Some(&encode_pid(std::process::id())), + ); // With a stale Running entry, start_merge_agent_work must be blocked. let blocked = pool.start_merge_agent_work(repo, "77_story_stale"); @@ -380,14 +440,7 @@ mod tests { ); // Simulate the mergemaster exit path: clear the stale Running entry. - { - let mut jobs = pool.merge_jobs.lock().unwrap(); - if let Some(job) = jobs.get("77_story_stale") - && matches!(job.status, MergeJobStatus::Running) - { - jobs.remove("77_story_stale"); - } - } + crate::crdt_state::delete_merge_job("77_story_stale"); // After clearing, start_merge_agent_work must succeed (it will fail // the pipeline because there's no feature branch, but it must not be @@ -410,6 +463,8 @@ mod tests { async fn stale_merge_job_with_dead_pid_is_swept_on_new_merge_attempt() { use tempfile::tempdir; + crate::crdt_state::init_for_test(); + let tmp = tempdir().unwrap(); let repo = tmp.path(); init_git_repo(repo); @@ -425,25 +480,18 @@ mod tests { pid }; - // Seed merge_jobs with a Running entry whose PID is dead. - { - let mut jobs = pool.merge_jobs.lock().unwrap(); - jobs.insert( - "719_stale_other".to_string(), - MergeJob { - story_id: "719_stale_other".to_string(), - status: MergeJobStatus::Running, - pid: dead_pid, - }, - ); - } + // Seed CRDT merge_jobs with a Running entry whose PID is dead. + crate::crdt_state::write_merge_job( + "719_stale_other", + "running", + 1.0, + None, + Some(&encode_pid(dead_pid)), + ); // Verify the entry is present before the sweep. assert!( - pool.merge_jobs - .lock() - .unwrap() - .contains_key("719_stale_other"), + crate::crdt_state::read_merge_job("719_stale_other").is_some(), "stale entry should exist before new merge attempt" ); @@ -453,11 +501,7 @@ mod tests { // The stale entry must have been cleared. assert!( - !pool - .merge_jobs - .lock() - .unwrap() - .contains_key("719_stale_other"), + crate::crdt_state::read_merge_job("719_stale_other").is_none(), "stale entry with dead pid must be removed when a new merge attempt starts" ); } @@ -485,6 +529,7 @@ mod tests { async fn merge_agent_work_returns_error_when_branch_not_found() { use tempfile::tempdir; + crate::crdt_state::init_for_test(); let tmp = tempdir().unwrap(); let repo = tmp.path(); init_git_repo(repo); @@ -509,6 +554,7 @@ mod tests { use std::fs; use tempfile::tempdir; + crate::crdt_state::init_for_test(); let tmp = tempdir().unwrap(); let repo = tmp.path(); init_git_repo(repo); @@ -686,6 +732,7 @@ mod tests { use std::fs; use tempfile::tempdir; + crate::crdt_state::init_for_test(); let tmp = tempdir().unwrap(); let repo = tmp.path(); init_git_repo(repo); @@ -802,6 +849,7 @@ mod tests { use std::fs; use tempfile::tempdir; + crate::crdt_state::init_for_test(); let tmp = tempdir().unwrap(); let repo = tmp.path(); init_git_repo(repo); @@ -884,6 +932,7 @@ mod tests { use std::fs; use tempfile::tempdir; + crate::crdt_state::init_for_test(); let tmp = tempdir().unwrap(); let repo = tmp.path(); init_git_repo(repo); @@ -993,6 +1042,7 @@ mod tests { use std::fs; use tempfile::tempdir; + crate::crdt_state::init_for_test(); let tmp = tempdir().unwrap(); let repo = tmp.path(); init_git_repo(repo); @@ -1111,6 +1161,7 @@ mod tests { use std::os::unix::fs::PermissionsExt; use tempfile::tempdir; + crate::crdt_state::init_for_test(); let tmp = tempdir().unwrap(); let repo = tmp.path(); init_git_repo(repo); @@ -1226,6 +1277,7 @@ mod tests { use std::fs; use tempfile::tempdir; + crate::crdt_state::init_for_test(); let tmp = tempdir().unwrap(); let repo = tmp.path(); init_git_repo(repo); diff --git a/server/src/agents/pool/query.rs b/server/src/agents/pool/query.rs index 78318298..6e76edc0 100644 --- a/server/src/agents/pool/query.rs +++ b/server/src/agents/pool/query.rs @@ -35,11 +35,11 @@ impl AgentPool { /// Used by `list_agents` and `get_pipeline_status` to surface in-flight /// deterministic merges that hold the merge lock but have no agent entry. pub fn list_running_merges(&self) -> Result, String> { - let jobs = self.merge_jobs.lock().map_err(|e| e.to_string())?; + let jobs = crate::crdt_state::read_all_merge_jobs().unwrap_or_default(); Ok(jobs - .values() - .filter(|job| matches!(job.status, crate::agents::merge::MergeJobStatus::Running)) - .map(|job| job.story_id.clone()) + .into_iter() + .filter(|job| job.status == "running") + .map(|job| job.story_id) .collect()) } diff --git a/server/src/agents/pool/start/mod.rs b/server/src/agents/pool/start/mod.rs index 17c8d972..9ead18dc 100644 --- a/server/src/agents/pool/start/mod.rs +++ b/server/src/agents/pool/start/mod.rs @@ -350,7 +350,6 @@ impl AgentPool { log_writer.clone(), self.child_killers.clone(), self.watcher_tx.clone(), - Arc::clone(&self.merge_jobs), inactivity_timeout_secs, prior_events, )); diff --git a/server/src/agents/pool/start/spawn.rs b/server/src/agents/pool/start/spawn.rs index ae998db6..d9dba8e3 100644 --- a/server/src/agents/pool/start/spawn.rs +++ b/server/src/agents/pool/start/spawn.rs @@ -16,7 +16,6 @@ use crate::config::ProjectConfig; use crate::io::watcher::WatcherEvent; use crate::slog_error; -use super::super::super::merge::MergeJob; use super::super::super::runtime::{ AgentRuntime, ClaudeCodeRuntime, GeminiRuntime, OpenAiRuntime, RuntimeContext, }; @@ -46,7 +45,6 @@ pub(super) async fn run_agent_spawn( log_writer: Option>>, child_killers: Arc>>>, watcher_tx: broadcast::Sender, - merge_jobs: Arc>>, inactivity_timeout_secs: u64, // Formatted `` block drained from the previous session's // buffer. Prepended to the first agent turn so the agent sees what @@ -70,7 +68,6 @@ pub(super) async fn run_agent_spawn( let log_writer_clone = log_writer; let child_killers_clone = child_killers; let watcher_tx_clone = watcher_tx; - let merge_jobs_clone = merge_jobs; let _ = inactivity_timeout_secs; // currently unused inside the closure body // Step 1: create the worktree (slow — git checkout, pnpm install, etc.) @@ -328,11 +325,10 @@ pub(super) async fn run_agent_spawn( // Clear any stale Running merge job so the next mergemaster // can call start_merge_agent_work without hitting "Merge // already in progress" (bug 498). - if let Ok(mut jobs) = merge_jobs_clone.lock() - && let Some(job) = jobs.get(&sid) - && matches!(job.status, crate::agents::merge::MergeJobStatus::Running) + if crate::crdt_state::read_merge_job(&sid) + .is_some_and(|job| job.status == "running") { - jobs.remove(&sid); + crate::crdt_state::delete_merge_job(&sid); } let _ = tx_done.send(AgentEvent::Done { story_id: sid.clone(),