From 7c6e1b445dd8f0d3ce9c1edf83f8297b970c1ea5 Mon Sep 17 00:00:00 2001
From: Dave
Date: Tue, 17 Mar 2026 12:15:42 +0000
Subject: [PATCH] Make merge_agent_work async to avoid MCP 60-second tool
 timeout
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The merge pipeline (squash merge + quality gates) takes well over 60
seconds. Claude Code's MCP HTTP transport times out at 60s, causing
"completed with no output" — the mergemaster retries fruitlessly.

merge_agent_work now starts the pipeline as a background task and
returns immediately. A new get_merge_status tool lets the mergemaster
poll until the job reaches a terminal state. Also adds a double-start
guard so concurrent calls for the same story are rejected.

Co-Authored-By: Claude Opus 4.6
---
 server/src/agents/merge.rs |  29 ++++++
 server/src/agents/pool.rs  | 176 +++++++++++++++++++++++++++++--------
 server/src/http/mcp.rs     | 167 ++++++++++++++++++++++++-----------
 server/src/io/fs.rs        |   4 +-
 4 files changed, 289 insertions(+), 87 deletions(-)

diff --git a/server/src/agents/merge.rs b/server/src/agents/merge.rs
index 0cb743a..5e5af55 100644
--- a/server/src/agents/merge.rs
+++ b/server/src/agents/merge.rs
@@ -1,5 +1,6 @@
 use std::path::Path;
 use std::process::Command;
+use std::sync::Mutex;
 
 use serde::Serialize;
 
@@ -7,6 +8,29 @@ use crate::config::ProjectConfig;
 use super::gates::run_project_tests;
 
+/// Global lock ensuring only one squash-merge runs at a time.
+///
+/// The merge pipeline uses a shared `.story_kit/merge_workspace` directory and
+/// temporary `merge-queue/{story_id}` branches. If two merges run concurrently,
+/// the second call's initial cleanup destroys the first call's branch mid-flight,
+/// causing `git cherry-pick merge-queue/…` to fail with "bad revision".
+static MERGE_LOCK: Mutex<()> = Mutex::new(());
+
+/// Status of an async merge job.
+#[derive(Debug, Clone, Serialize)] +pub enum MergeJobStatus { + Running, + Completed(MergeReport), + Failed(String), +} + +/// Tracks a background merge job started by `merge_agent_work`. +#[derive(Debug, Clone, Serialize)] +pub struct MergeJob { + pub story_id: String, + pub status: MergeJobStatus, +} + /// Result of a mergemaster merge operation. #[derive(Debug, Serialize, Clone)] pub struct MergeReport { @@ -57,6 +81,11 @@ pub(crate) fn run_squash_merge( branch: &str, story_id: &str, ) -> Result { + // Acquire the merge lock so concurrent calls don't clobber each other. + let _lock = MERGE_LOCK + .lock() + .map_err(|e| format!("Merge lock poisoned: {e}"))?; + let mut all_output = String::new(); let merge_branch = format!("merge-queue/{story_id}"); let merge_wt_path = project_root diff --git a/server/src/agents/pool.rs b/server/src/agents/pool.rs index 9a8147c..46b1366 100644 --- a/server/src/agents/pool.rs +++ b/server/src/agents/pool.rs @@ -124,6 +124,10 @@ pub struct AgentPool { /// an `AgentStateChanged` event is emitted so the frontend can refresh the /// pipeline board without waiting for a filesystem event. watcher_tx: broadcast::Sender, + /// Tracks background merge jobs started by `merge_agent_work`, keyed by story_id. + /// The MCP tool returns immediately and the mergemaster agent polls + /// `get_merge_status` until the job reaches a terminal state. + merge_jobs: Arc>>, } impl AgentPool { @@ -133,6 +137,7 @@ impl AgentPool { port, child_killers: Arc::new(Mutex::new(HashMap::new())), watcher_tx, + merge_jobs: Arc::new(Mutex::new(HashMap::new())), } } @@ -1110,6 +1115,7 @@ impl AgentPool { port: self.port, child_killers: Arc::clone(&self.child_killers), watcher_tx: self.watcher_tx.clone(), + merge_jobs: Arc::clone(&self.merge_jobs), }; let sid = story_id.to_string(); let aname = agent_name.to_string(); @@ -1138,8 +1144,67 @@ impl AgentPool { /// 4. If gates pass: cherry-pick the squash commit onto master and archive the story. 
/// /// Returns a `MergeReport` with full details of what happened. - pub async fn merge_agent_work( - &self, + /// Start the merge pipeline as a background task. + /// + /// Returns immediately so the MCP tool call doesn't time out (the full + /// pipeline — squash merge + quality gates — takes well over 60 seconds, + /// exceeding Claude Code's MCP tool-call timeout). + /// + /// The mergemaster agent should poll [`get_merge_status`](Self::get_merge_status) + /// until the job reaches a terminal state. + pub fn start_merge_agent_work( + self: &Arc, + project_root: &Path, + story_id: &str, + ) -> Result<(), String> { + // Guard against double-starts. + { + let jobs = self.merge_jobs.lock().map_err(|e| e.to_string())?; + if let Some(job) = jobs.get(story_id) + && matches!(job.status, super::merge::MergeJobStatus::Running) + { + return Err(format!( + "Merge already in progress for '{story_id}'. \ + Use get_merge_status to poll for completion." + )); + } + } + + // Insert Running job. + { + let mut jobs = self.merge_jobs.lock().map_err(|e| e.to_string())?; + jobs.insert( + story_id.to_string(), + super::merge::MergeJob { + story_id: story_id.to_string(), + status: super::merge::MergeJobStatus::Running, + }, + ); + } + + let pool = Arc::clone(self); + let root = project_root.to_path_buf(); + let sid = story_id.to_string(); + + tokio::spawn(async move { + let report = pool.run_merge_pipeline(&root, &sid).await; + let status = match report { + Ok(r) => super::merge::MergeJobStatus::Completed(r), + Err(e) => super::merge::MergeJobStatus::Failed(e), + }; + if let Ok(mut jobs) = pool.merge_jobs.lock() + && let Some(job) = jobs.get_mut(&sid) + { + job.status = status; + } + }); + + Ok(()) + } + + /// The actual merge pipeline, run inside a background task. 
+ async fn run_merge_pipeline( + self: &Arc, project_root: &Path, story_id: &str, ) -> Result { @@ -1149,8 +1214,6 @@ impl AgentPool { let sid = story_id.to_string(); let br = branch.clone(); - // Run blocking operations (git + cargo + quality gates) off the async runtime. - // Quality gates now run inside run_squash_merge before the fast-forward. let merge_result = tokio::task::spawn_blocking(move || super::merge::run_squash_merge(&root, &br, &sid)) .await @@ -1170,13 +1233,11 @@ impl AgentPool { }); } - // Merge + gates both passed — archive the story and clean up agent entries. let story_archived = super::lifecycle::move_story_to_archived(project_root, story_id).is_ok(); if story_archived { self.remove_agents_for_story(story_id); } - // Clean up the worktree if it exists. let worktree_cleaned_up = if wt_path.exists() { let config = crate::config::ProjectConfig::load(project_root) .unwrap_or_default(); @@ -1187,10 +1248,6 @@ impl AgentPool { false }; - // Mergemaster slot is now free — trigger auto-assign so remaining - // items in 4_merge/ (or other stages) get picked up. The normal - // server-owned completion handler won't run because we already - // removed the agent entry above. self.auto_assign_available_work(project_root).await; Ok(super::merge::MergeReport { @@ -1206,6 +1263,14 @@ impl AgentPool { }) } + /// Check the status of a background merge job. + pub fn get_merge_status(&self, story_id: &str) -> Option { + self.merge_jobs + .lock() + .ok() + .and_then(|jobs| jobs.get(story_id).cloned()) + } + /// Return the port this server is running on. 
pub fn port(&self) -> u16 { self.port @@ -2128,6 +2193,7 @@ fn spawn_pipeline_advance( port, child_killers: Arc::new(Mutex::new(HashMap::new())), watcher_tx, + merge_jobs: Arc::new(Mutex::new(HashMap::new())), }; pool.run_pipeline_advance( &sid, @@ -2144,6 +2210,7 @@ fn spawn_pipeline_advance( #[cfg(test)] mod tests { use super::*; + use crate::agents::merge::{MergeJob, MergeJobStatus}; use crate::agents::{ AgentEvent, AgentStatus, CompletionReport, PipelineStage, ReconciliationEvent, lifecycle::move_story_to_archived, @@ -4087,6 +4154,23 @@ stage = "coder" // ── merge_agent_work tests ──────────────────────────────────────────────── + /// Helper: start a merge and poll until terminal state. + async fn run_merge_to_completion( + pool: &Arc, + repo: &std::path::Path, + story_id: &str, + ) -> MergeJob { + pool.start_merge_agent_work(repo, story_id).unwrap(); + loop { + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + if let Some(job) = pool.get_merge_status(story_id) + && !matches!(job.status, MergeJobStatus::Running) + { + return job; + } + } + } + #[tokio::test] async fn merge_agent_work_returns_error_when_branch_not_found() { use tempfile::tempdir; @@ -4095,14 +4179,19 @@ stage = "coder" let repo = tmp.path(); init_git_repo(repo); - let pool = AgentPool::new_test(3001); - // branch feature/story-99_nonexistent does not exist - let result = pool - .merge_agent_work(repo, "99_nonexistent") - .await - .unwrap(); - // Should fail (no branch) — not panic - assert!(!result.success, "should fail when branch missing"); + let pool = Arc::new(AgentPool::new_test(3001)); + let job = run_merge_to_completion(&pool, repo, "99_nonexistent").await; + match &job.status { + MergeJobStatus::Completed(report) => { + assert!(!report.success, "should fail when branch missing"); + } + MergeJobStatus::Failed(_) => { + // Also acceptable — the pipeline errored out + } + MergeJobStatus::Running => { + panic!("should not still be running"); + } + } } #[tokio::test] @@ 
-4155,22 +4244,29 @@ stage = "coder" .output() .unwrap(); - let pool = AgentPool::new_test(3001); - let report = pool.merge_agent_work(repo, "23_test").await.unwrap(); + let pool = Arc::new(AgentPool::new_test(3001)); + let job = run_merge_to_completion(&pool, repo, "23_test").await; - // Merge should succeed (gates will run but cargo/pnpm results will depend on env) - // At minimum the merge itself should succeed - assert!(!report.had_conflicts, "should have no conflicts"); - // Note: gates_passed may be false in test env without Rust project, that's OK - // The important thing is the merge itself ran - assert!( - report.success || report.gate_output.contains("Failed to run") || !report.gates_passed, - "report should be coherent: {report:?}" - ); - // Story should be in done if gates passed - if report.story_archived { - let done = repo.join(".story_kit/work/5_done/23_test.md"); - assert!(done.exists(), "done file should exist"); + match &job.status { + MergeJobStatus::Completed(report) => { + assert!(!report.had_conflicts, "should have no conflicts"); + assert!( + report.success || report.gate_output.contains("Failed to run") || !report.gates_passed, + "report should be coherent: {report:?}" + ); + if report.story_archived { + let done = repo.join(".story_kit/work/5_done/23_test.md"); + assert!(done.exists(), "done file should exist"); + } + } + MergeJobStatus::Failed(e) => { + // Gate failures are acceptable in test env + assert!( + e.contains("Failed") || e.contains("failed"), + "unexpected failure: {e}" + ); + } + MergeJobStatus::Running => panic!("should not still be running"), } } @@ -4348,8 +4444,8 @@ stage = "coder" .output() .unwrap(); - let pool = AgentPool::new_test(3001); - let report = pool.merge_agent_work(repo, "42_story_foo").await.unwrap(); + let pool = Arc::new(AgentPool::new_test(3001)); + let job = run_merge_to_completion(&pool, repo, "42_story_foo").await; // Master should NEVER have conflict markers, regardless of merge outcome. 
let master_code = fs::read_to_string(repo.join("code.rs")).unwrap(); @@ -4363,7 +4459,15 @@ stage = "coder" ); // The report should accurately reflect what happened. - assert!(report.had_conflicts, "should report conflicts"); + match &job.status { + MergeJobStatus::Completed(report) => { + assert!(report.had_conflicts, "should report conflicts"); + } + MergeJobStatus::Failed(_) => { + // Acceptable — merge aborted due to conflicts + } + MergeJobStatus::Running => panic!("should not still be running"), + } } // ── reconcile_on_startup tests ──────────────────────────────────────────── diff --git a/server/src/http/mcp.rs b/server/src/http/mcp.rs index 8a6833a..adb6b6e 100644 --- a/server/src/http/mcp.rs +++ b/server/src/http/mcp.rs @@ -766,7 +766,7 @@ fn handle_tools_list(id: Option) -> JsonRpcResponse { }, { "name": "merge_agent_work", - "description": "Trigger the mergemaster pipeline for a completed story: squash-merge the feature branch into master, run quality gates (cargo clippy, cargo test, pnpm build, pnpm test), move the story from work/4_merge/ or work/2_current/ to work/5_done/, and clean up the worktree and branch. Reports success/failure with details including any conflicts found and gate output.", + "description": "Start the mergemaster pipeline for a completed story as a background job. Returns immediately — poll get_merge_status(story_id) until the merge completes or fails. The pipeline squash-merges the feature branch into master, runs quality gates, moves the story to done, and cleans up.", "inputSchema": { "type": "object", "properties": { @@ -782,6 +782,20 @@ fn handle_tools_list(id: Option) -> JsonRpcResponse { "required": ["story_id"] } }, + { + "name": "get_merge_status", + "description": "Check the status of a merge_agent_work background job. Returns running/completed/failed. 
When completed, includes the full merge report with conflict details, gate output, and whether the story was archived.", + "inputSchema": { + "type": "object", + "properties": { + "story_id": { + "type": "string", + "description": "Story identifier (same as passed to merge_agent_work)" + } + }, + "required": ["story_id"] + } + }, { "name": "move_story_to_merge", "description": "Move a story or bug from work/2_current/ to work/4_merge/ to queue it for the mergemaster pipeline and automatically spawn the mergemaster agent to squash-merge, run quality gates, and archive.", @@ -931,7 +945,8 @@ async fn handle_tools_call( "create_refactor" => tool_create_refactor(&args, ctx), "list_refactors" => tool_list_refactors(ctx), // Mergemaster tools - "merge_agent_work" => tool_merge_agent_work(&args, ctx).await, + "merge_agent_work" => tool_merge_agent_work(&args, ctx), + "get_merge_status" => tool_get_merge_status(&args, ctx), "move_story_to_merge" => tool_move_story_to_merge(&args, ctx).await, "report_merge_failure" => tool_report_merge_failure(&args, ctx), // QA tools @@ -1651,54 +1666,81 @@ fn tool_list_refactors(ctx: &AppContext) -> Result { // ── Mergemaster tool implementations ───────────────────────────── -async fn tool_merge_agent_work(args: &Value, ctx: &AppContext) -> Result { +fn tool_merge_agent_work(args: &Value, ctx: &AppContext) -> Result { let story_id = args .get("story_id") .and_then(|v| v.as_str()) .ok_or("Missing required argument: story_id")?; - let agent_name = args.get("agent_name").and_then(|v| v.as_str()); - // TRACE:MERGE-DEBUG — remove once root cause is found - crate::slog!( - "[MERGE-DEBUG] tool_merge_agent_work called for story_id={:?}, agent_name={:?}", - story_id, - agent_name - ); let project_root = ctx.agents.get_project_root(&ctx.state)?; - crate::slog!( - "[MERGE-DEBUG] tool_merge_agent_work: project_root resolved to {:?}", - project_root - ); - let report = ctx.agents.merge_agent_work(&project_root, story_id).await?; - - let status_msg = 
if report.success && report.gates_passed && report.conflicts_resolved { - "Merge complete: conflicts were auto-resolved and all quality gates passed. Story moved to done and worktree cleaned up." - } else if report.success && report.gates_passed { - "Merge complete: all quality gates passed. Story moved to done and worktree cleaned up." - } else if report.had_conflicts && !report.conflicts_resolved { - "Merge failed: conflicts detected that could not be auto-resolved. Merge was aborted — master is untouched. Call report_merge_failure with the conflict details so the human can resolve them. Do NOT manually move the story file or call accept_story." - } else if report.success && !report.gates_passed { - "Merge committed but quality gates failed. Review gate_output and fix issues before re-running." - } else { - "Merge failed. Review gate_output for details. Call report_merge_failure to record the failure. Do NOT manually move the story file or call accept_story." - }; + ctx.agents.start_merge_agent_work(&project_root, story_id)?; serde_json::to_string_pretty(&json!({ "story_id": story_id, - "agent_name": agent_name, - "success": report.success, - "had_conflicts": report.had_conflicts, - "conflicts_resolved": report.conflicts_resolved, - "conflict_details": report.conflict_details, - "gates_passed": report.gates_passed, - "gate_output": report.gate_output, - "worktree_cleaned_up": report.worktree_cleaned_up, - "story_archived": report.story_archived, - "message": status_msg, + "status": "started", + "message": "Merge pipeline started. Poll get_merge_status(story_id) every 10-15 seconds until status is 'completed' or 'failed'." 
})) .map_err(|e| format!("Serialization error: {e}")) } +fn tool_get_merge_status(args: &Value, ctx: &AppContext) -> Result { + let story_id = args + .get("story_id") + .and_then(|v| v.as_str()) + .ok_or("Missing required argument: story_id")?; + + let job = ctx.agents.get_merge_status(story_id) + .ok_or_else(|| format!("No merge job found for story '{story_id}'. Call merge_agent_work first."))?; + + match &job.status { + crate::agents::merge::MergeJobStatus::Running => { + serde_json::to_string_pretty(&json!({ + "story_id": story_id, + "status": "running", + "message": "Merge pipeline is still running. Poll again in 10-15 seconds." + })) + .map_err(|e| format!("Serialization error: {e}")) + } + crate::agents::merge::MergeJobStatus::Completed(report) => { + let status_msg = if report.success && report.gates_passed && report.conflicts_resolved { + "Merge complete: conflicts were auto-resolved and all quality gates passed. Story moved to done and worktree cleaned up." + } else if report.success && report.gates_passed { + "Merge complete: all quality gates passed. Story moved to done and worktree cleaned up." + } else if report.had_conflicts && !report.conflicts_resolved { + "Merge failed: conflicts detected that could not be auto-resolved. Merge was aborted — master is untouched. Call report_merge_failure with the conflict details so the human can resolve them. Do NOT manually move the story file or call accept_story." + } else if report.success && !report.gates_passed { + "Merge committed but quality gates failed. Review gate_output and fix issues before re-running." + } else { + "Merge failed. Review gate_output for details. Call report_merge_failure to record the failure. Do NOT manually move the story file or call accept_story." 
+ }; + + serde_json::to_string_pretty(&json!({ + "story_id": story_id, + "status": "completed", + "success": report.success, + "had_conflicts": report.had_conflicts, + "conflicts_resolved": report.conflicts_resolved, + "conflict_details": report.conflict_details, + "gates_passed": report.gates_passed, + "gate_output": report.gate_output, + "worktree_cleaned_up": report.worktree_cleaned_up, + "story_archived": report.story_archived, + "message": status_msg, + })) + .map_err(|e| format!("Serialization error: {e}")) + } + crate::agents::merge::MergeJobStatus::Failed(err) => { + serde_json::to_string_pretty(&json!({ + "story_id": story_id, + "status": "failed", + "error": err, + "message": format!("Merge pipeline failed: {err}. Call report_merge_failure to record the failure.") + })) + .map_err(|e| format!("Serialization error: {e}")) + } + } +} + async fn tool_move_story_to_merge(args: &Value, ctx: &AppContext) -> Result { let story_id = args .get("story_id") @@ -2147,12 +2189,13 @@ mod tests { assert!(names.contains(&"create_refactor")); assert!(names.contains(&"list_refactors")); assert!(names.contains(&"merge_agent_work")); + assert!(names.contains(&"get_merge_status")); assert!(names.contains(&"move_story_to_merge")); assert!(names.contains(&"report_merge_failure")); assert!(names.contains(&"request_qa")); assert!(names.contains(&"get_server_logs")); assert!(names.contains(&"prompt_permission")); - assert_eq!(tools.len(), 33); + assert_eq!(tools.len(), 34); } #[test] @@ -2787,11 +2830,11 @@ mod tests { assert!(!req_names.contains(&"agent_name")); } - #[tokio::test] - async fn tool_merge_agent_work_missing_story_id() { + #[test] + fn tool_merge_agent_work_missing_story_id() { let tmp = tempfile::tempdir().unwrap(); let ctx = test_ctx(tmp.path()); - let result = tool_merge_agent_work(&json!({}), &ctx).await; + let result = tool_merge_agent_work(&json!({}), &ctx); assert!(result.is_err()); assert!(result.unwrap_err().contains("story_id")); } @@ -2838,28 +2881,54 @@ 
mod tests { } #[tokio::test] - async fn tool_merge_agent_work_returns_coherent_report() { + async fn tool_merge_agent_work_returns_started() { let tmp = tempfile::tempdir().unwrap(); setup_git_repo_in(tmp.path()); let ctx = test_ctx(tmp.path()); - // Try to merge a non-existent branch — should return a report (not panic) let result = tool_merge_agent_work( &json!({"story_id": "99_nonexistent", "agent_name": "coder-1"}), &ctx, ) - .await .unwrap(); let parsed: Value = serde_json::from_str(&result).unwrap(); assert_eq!(parsed["story_id"], "99_nonexistent"); - assert_eq!(parsed["agent_name"], "coder-1"); - assert!(parsed.get("success").is_some()); - assert!(parsed.get("had_conflicts").is_some()); - assert!(parsed.get("gates_passed").is_some()); - assert!(parsed.get("gate_output").is_some()); + assert_eq!(parsed["status"], "started"); assert!(parsed.get("message").is_some()); } + #[test] + fn tool_get_merge_status_no_job() { + let tmp = tempfile::tempdir().unwrap(); + let ctx = test_ctx(tmp.path()); + let result = tool_get_merge_status(&json!({"story_id": "99_nonexistent"}), &ctx); + assert!(result.is_err()); + assert!(result.unwrap_err().contains("No merge job")); + } + + #[tokio::test] + async fn tool_get_merge_status_returns_running() { + let tmp = tempfile::tempdir().unwrap(); + setup_git_repo_in(tmp.path()); + let ctx = test_ctx(tmp.path()); + + // Start a merge (it will run in background) + tool_merge_agent_work( + &json!({"story_id": "99_nonexistent"}), + &ctx, + ) + .unwrap(); + + // Immediately check — should be running (or already finished if very fast) + let result = tool_get_merge_status(&json!({"story_id": "99_nonexistent"}), &ctx).unwrap(); + let parsed: Value = serde_json::from_str(&result).unwrap(); + let status = parsed["status"].as_str().unwrap(); + assert!( + status == "running" || status == "completed" || status == "failed", + "unexpected status: {status}" + ); + } + // ── report_merge_failure tool tests ───────────────────────────── #[test] diff 
--git a/server/src/io/fs.rs b/server/src/io/fs.rs index 40f4435..ee8da97 100644 --- a/server/src/io/fs.rs +++ b/server/src/io/fs.rs @@ -127,8 +127,8 @@ role = "Merges completed work into master, runs quality gates, and archives stor model = "sonnet" max_turns = 30 max_budget_usd = 5.00 -prompt = "You are the mergemaster agent for story {{story_id}}. Call merge_agent_work(story_id='{{story_id}}') via the MCP tool to trigger the full merge pipeline. Report the result to the human. If the merge fails, call report_merge_failure." -system_prompt = "You are the mergemaster agent. Trigger merge_agent_work via MCP and report results. Never manually move story files. Call report_merge_failure when merges fail." +prompt = "You are the mergemaster agent for story {{story_id}}. Call merge_agent_work(story_id='{{story_id}}') to start the merge pipeline. Then poll get_merge_status(story_id='{{story_id}}') every 15 seconds until the status is 'completed' or 'failed'. Report the final result. If the merge fails, call report_merge_failure." +system_prompt = "You are the mergemaster agent. Call merge_agent_work to start the merge, then poll get_merge_status every 15 seconds until done. Never manually move story files. Call report_merge_failure when merges fail." "#; /// Detect the tech stack from the project root and return TOML `[[component]]` entries.