From f62012ee9ce7925e6e9db68e1d2461fe3c78f30c Mon Sep 17 00:00:00 2001 From: dave Date: Tue, 28 Apr 2026 15:16:05 +0000 Subject: [PATCH] huskies: merge 793 --- server/src/agents/pool/pipeline/completion.rs | 971 ------------------ .../agents/pool/pipeline/completion/legacy.rs | 136 +++ .../agents/pool/pipeline/completion/mod.rs | 8 + .../agents/pool/pipeline/completion/server.rs | 236 +++++ .../agents/pool/pipeline/completion/tests.rs | 583 +++++++++++ 5 files changed, 963 insertions(+), 971 deletions(-) delete mode 100644 server/src/agents/pool/pipeline/completion.rs create mode 100644 server/src/agents/pool/pipeline/completion/legacy.rs create mode 100644 server/src/agents/pool/pipeline/completion/mod.rs create mode 100644 server/src/agents/pool/pipeline/completion/server.rs create mode 100644 server/src/agents/pool/pipeline/completion/tests.rs diff --git a/server/src/agents/pool/pipeline/completion.rs b/server/src/agents/pool/pipeline/completion.rs deleted file mode 100644 index ad1f2a97..00000000 --- a/server/src/agents/pool/pipeline/completion.rs +++ /dev/null @@ -1,971 +0,0 @@ -//! Agent completion handling — processes exit results and triggers pipeline advancement. -use crate::io::watcher::WatcherEvent; -use crate::slog; -use std::collections::HashMap; -use std::sync::{Arc, Mutex}; -use tokio::sync::broadcast; - -use super::super::super::{ - AgentEvent, AgentStatus, CompletionReport, PipelineStage, pipeline_stage, -}; -use super::super::{AgentPool, StoryAgent, composite_key}; -use super::advance::spawn_pipeline_advance; - -impl AgentPool { - /// Internal: report that an agent has finished work on a story. - /// - /// **Note:** This is no longer exposed as an MCP tool. The server now - /// automatically runs completion gates when an agent process exits - /// (see `run_server_owned_completion`). This method is retained for - /// backwards compatibility and testing. - /// - /// - Rejects with an error if the worktree has uncommitted changes. - /// - Runs acceptance gates (cargo clippy + cargo nextest run / cargo test). - /// - Stores the `CompletionReport` on the agent record. - /// - Transitions status to `Completed` (gates passed) or `Failed` (gates failed). - /// - Emits a `Done` event so `wait_for_agent` unblocks. - #[allow(dead_code)] - pub async fn report_completion( - &self, - story_id: &str, - agent_name: &str, - summary: &str, - ) -> Result { - let key = composite_key(story_id, agent_name); - - // Verify agent exists, is Running, and grab its worktree path. - let worktree_path = { - let agents = self.agents.lock().map_err(|e| e.to_string())?; - let agent = agents - .get(&key) - .ok_or_else(|| format!("No agent '{agent_name}' for story '{story_id}'"))?; - - if agent.status != AgentStatus::Running { - return Err(format!( - "Agent '{agent_name}' for story '{story_id}' is not running (status: {}). \ - report_completion can only be called by a running agent.", - agent.status - )); - } - - agent - .worktree_info - .as_ref() - .map(|wt| wt.path.clone()) - .ok_or_else(|| { - format!( - "Agent '{agent_name}' for story '{story_id}' has no worktree. \ - Cannot run acceptance gates." - ) - })? - }; - - let path = worktree_path.clone(); - - // Run gate checks in a blocking thread to avoid stalling the async runtime. - let (gates_passed, gate_output) = tokio::task::spawn_blocking(move || { - // Step 1: Reject if worktree is dirty. - crate::agents::gates::check_uncommitted_changes(&path)?; - // Step 2: Run clippy + tests and return (passed, output). - crate::agents::gates::run_acceptance_gates(&path) - }) - .await - .map_err(|e| format!("Gate check task panicked: {e}"))??; - - let report = CompletionReport { - summary: summary.to_string(), - gates_passed, - gate_output, - }; - - // Extract data for pipeline advance, then remove the entry so - // completed agents never appear in list_agents. - let ( - tx, - session_id, - project_root_for_advance, - wt_path_for_advance, - merge_failure_reported_for_advance, - session_id_for_advance, - ) = { - let mut agents = self.agents.lock().map_err(|e| e.to_string())?; - let agent = agents.get_mut(&key).ok_or_else(|| { - format!("Agent '{agent_name}' for story '{story_id}' disappeared during gate check") - })?; - agent.completion = Some(report.clone()); - let tx = agent.tx.clone(); - let sid = agent.session_id.clone(); - let pr = agent.project_root.clone(); - let wt = agent.worktree_info.as_ref().map(|w| w.path.clone()); - let mfr = agent.merge_failure_reported; - let sid_advance = agent.session_id.clone(); - agents.remove(&key); - (tx, sid, pr, wt, mfr, sid_advance) - }; - - // Emit Done so wait_for_agent unblocks. - let _ = tx.send(AgentEvent::Done { - story_id: story_id.to_string(), - agent_name: agent_name.to_string(), - session_id, - }); - - // Notify WebSocket clients that the agent is gone. - Self::notify_agent_state_changed(&self.watcher_tx); - - // Advance the pipeline state machine in a background task. - let pool_clone = Self { - agents: Arc::clone(&self.agents), - port: self.port, - child_killers: Arc::clone(&self.child_killers), - watcher_tx: self.watcher_tx.clone(), - status_broadcaster: Arc::clone(&self.status_broadcaster), - }; - let sid = story_id.to_string(); - let aname = agent_name.to_string(); - let report_for_advance = report.clone(); - tokio::spawn(async move { - pool_clone - .run_pipeline_advance( - &sid, - &aname, - report_for_advance, - project_root_for_advance, - wt_path_for_advance, - merge_failure_reported_for_advance, - session_id_for_advance, - ) - .await; - }); - - Ok(report) - } -} - -/// Server-owned completion: runs acceptance gates when an agent process exits -/// normally, and advances the pipeline based on results. -/// -/// This is a **free function** (not a method on `AgentPool`) to break the -/// opaque type cycle that would otherwise arise: `start_agent` → spawned task -/// → server-owned completion → pipeline advance → `start_agent`. -/// -/// If the agent already has a completion report (e.g. from a legacy -/// `report_completion` call), this is a no-op to avoid double-running gates. -pub(in crate::agents::pool) async fn run_server_owned_completion( - agents: &Arc>>, - port: u16, - story_id: &str, - agent_name: &str, - session_id: Option, - watcher_tx: broadcast::Sender, -) { - let key = composite_key(story_id, agent_name); - - // Guard: mergemaster agents have their own completion path via - // start_merge_agent_work / run_merge_pipeline. Running server-owned gates - // for a mergemaster would wrongly advance the story to 5_done/ even when - // no squash merge has occurred (e.g. rate-limited exit before the agent - // called start_merge_agent_work). The lifecycle caller is responsible for - // cleaning up the agent entry and triggering auto-assign. - if pipeline_stage(agent_name) == PipelineStage::Mergemaster { - slog!( - "[agents] run_server_owned_completion skipped for mergemaster \ - '{story_id}:{agent_name}'; mergemaster completion is handled by \ - start_merge_agent_work." - ); - return; - } - - // Guard: skip if completion was already recorded (legacy path). - { - let lock = match agents.lock() { - Ok(a) => a, - Err(_) => return, - }; - match lock.get(&key) { - Some(agent) if agent.completion.is_some() => { - slog!( - "[agents] Completion already recorded for '{story_id}:{agent_name}'; \ - skipping server-owned gates." - ); - return; - } - Some(_) => {} - None => return, - } - } - - // Get worktree path for running gates. - let worktree_path = { - let lock = match agents.lock() { - Ok(a) => a, - Err(_) => return, - }; - lock.get(&key) - .and_then(|a| a.worktree_info.as_ref().map(|wt| wt.path.clone())) - }; - - // Kill any in-flight cargo test processes for this worktree so they don't - // hold the build lock while gates try to run. - if let Some(wt_path) = worktree_path.as_ref() - && let Ok(output) = std::process::Command::new("pgrep") - .args([ - "-f", - &format!("--manifest-path {}/Cargo.toml", wt_path.display()), - ]) - .output() - { - let pids = String::from_utf8_lossy(&output.stdout); - for pid_str in pids.lines() { - if let Ok(pid) = pid_str.trim().parse::() { - crate::slog!( - "[agents] Killing stale cargo process (pid {pid}) for '{story_id}' before running gates" - ); - unsafe { - libc::kill(pid, libc::SIGKILL); - } - } - } - } - - // Run acceptance gates. - let (gates_passed, gate_output) = if let Some(wt_path) = worktree_path { - let path = wt_path; - match tokio::task::spawn_blocking(move || { - // If the worktree is dirty, check whether committed work survived. - // An agent crash (e.g. Claude Code CLI's `output.write(&bytes).is_ok()` - // assertion — bug 645) can leave uncommitted files behind even though - // the agent already committed valid work. In that case, stash the - // dirty files and proceed with gates on the committed code. - // Uncommitted work is never junk — it may be the next agent session's - // starting point (bug 651). - let stashed = - if let Err(dirty_msg) = crate::agents::gates::check_uncommitted_changes(&path) { - if crate::agents::gates::worktree_has_committed_work(&path) { - crate::slog!( - "[agents] Worktree dirty but committed work exists — \ - stashing uncommitted changes and proceeding with gates. \ - Dirty state: {dirty_msg}" - ); - // Stash dirty files so gates run against committed code only. - // They will be restored after gates complete. - std::process::Command::new("git") - .args([ - "stash", - "push", - "--include-untracked", - "-m", - "server-completion-temp", - ]) - .current_dir(&path) - .output() - .map(|o| { - o.status.success() - && !String::from_utf8_lossy(&o.stdout) - .contains("No local changes to save") - }) - .unwrap_or(false) - } else { - return Ok((false, dirty_msg)); - } - } else { - false - }; - // AC5: Fail early if the coder finished with no commits on the feature branch. - // This prevents empty-diff stories from advancing through QA to merge. - if !crate::agents::gates::worktree_has_committed_work(&path) { - if stashed { - let _ = std::process::Command::new("git") - .args(["stash", "pop"]) - .current_dir(&path) - .output(); - } - return Ok(( - false, - "Agent exited with no commits on the feature branch. \ - The agent did not produce any code changes." - .to_string(), - )); - } - let result = crate::agents::gates::run_acceptance_gates(&path); - // Restore stashed uncommitted changes. - if stashed { - let _ = std::process::Command::new("git") - .args(["stash", "pop"]) - .current_dir(&path) - .output(); - } - result - }) - .await - { - Ok(Ok(result)) => result, - Ok(Err(e)) => (false, e), - Err(e) => (false, format!("Gate check task panicked: {e}")), - } - } else { - ( - false, - "No worktree path available to run acceptance gates".to_string(), - ) - }; - - slog!( - "[agents] Server-owned completion for '{story_id}:{agent_name}': gates_passed={gates_passed}" - ); - - let report = CompletionReport { - summary: "Agent process exited normally".to_string(), - gates_passed, - gate_output, - }; - - // Store completion report, extract data for pipeline advance, then - // remove the entry so completed agents never appear in list_agents. - let (tx, project_root_for_advance, wt_path_for_advance, merge_failure_reported_for_advance) = { - let mut lock = match agents.lock() { - Ok(a) => a, - Err(_) => return, - }; - let agent = match lock.get_mut(&key) { - Some(a) => a, - None => return, - }; - agent.completion = Some(report.clone()); - agent.session_id = session_id.clone(); - let tx = agent.tx.clone(); - let pr = agent.project_root.clone(); - let wt = agent.worktree_info.as_ref().map(|w| w.path.clone()); - let mfr = agent.merge_failure_reported; - lock.remove(&key); - (tx, pr, wt, mfr) - }; - // The completed session's ID is used to resume if gates fail. - let previous_session_id = session_id.clone(); - - // Emit Done so wait_for_agent unblocks. - let _ = tx.send(AgentEvent::Done { - story_id: story_id.to_string(), - agent_name: agent_name.to_string(), - session_id, - }); - - // Notify WebSocket clients that the agent is gone. - AgentPool::notify_agent_state_changed(&watcher_tx); - - // Advance the pipeline state machine in a background task. - spawn_pipeline_advance( - Arc::clone(agents), - port, - story_id, - agent_name, - report, - project_root_for_advance, - wt_path_for_advance, - watcher_tx, - merge_failure_reported_for_advance, - previous_session_id, - ); -} - -#[cfg(test)] -mod tests { - use super::super::super::AgentPool; - use super::*; - use crate::agents::{AgentEvent, AgentStatus, CompletionReport}; - use std::path::PathBuf; - use std::process::Command; - - fn init_git_repo(repo: &std::path::Path) { - Command::new("git") - .args(["init"]) - .current_dir(repo) - .output() - .unwrap(); - Command::new("git") - .args(["config", "user.email", "test@test.com"]) - .current_dir(repo) - .output() - .unwrap(); - Command::new("git") - .args(["config", "user.name", "Test"]) - .current_dir(repo) - .output() - .unwrap(); - Command::new("git") - .args(["commit", "--allow-empty", "-m", "init"]) - .current_dir(repo) - .output() - .unwrap(); - } - - // ── report_completion tests ──────────────────────────────────── - - #[tokio::test] - async fn report_completion_rejects_nonexistent_agent() { - let pool = AgentPool::new_test(3001); - let result = pool.report_completion("no_story", "no_bot", "done").await; - assert!(result.is_err()); - let msg = result.unwrap_err(); - assert!(msg.contains("No agent"), "unexpected: {msg}"); - } - - #[tokio::test] - async fn report_completion_rejects_non_running_agent() { - let pool = AgentPool::new_test(3001); - pool.inject_test_agent("s6", "bot", AgentStatus::Completed); - - let result = pool.report_completion("s6", "bot", "done").await; - assert!(result.is_err()); - let msg = result.unwrap_err(); - assert!( - msg.contains("not running"), - "expected 'not running' in: {msg}" - ); - } - - #[tokio::test] - async fn report_completion_rejects_dirty_worktree() { - use std::fs; - use tempfile::tempdir; - - let tmp = tempdir().unwrap(); - let repo = tmp.path(); - - // Init a real git repo and make an initial commit - Command::new("git") - .args(["init"]) - .current_dir(repo) - .output() - .unwrap(); - Command::new("git") - .args(["commit", "--allow-empty", "-m", "init"]) - .current_dir(repo) - .output() - .unwrap(); - - // Write an uncommitted file - fs::write(repo.join("dirty.txt"), "not committed").unwrap(); - - let pool = AgentPool::new_test(3001); - pool.inject_test_agent_with_path("s7", "bot", AgentStatus::Running, repo.to_path_buf()); - - let result = pool.report_completion("s7", "bot", "done").await; - assert!(result.is_err()); - let msg = result.unwrap_err(); - assert!( - msg.contains("uncommitted"), - "expected 'uncommitted' in: {msg}" - ); - } - - // ── server-owned completion tests ─────────────────────────────────────────── - - #[tokio::test] - async fn server_owned_completion_skips_when_already_completed() { - let pool = AgentPool::new_test(3001); - let report = CompletionReport { - summary: "Already done".to_string(), - gates_passed: true, - gate_output: String::new(), - }; - pool.inject_test_agent_with_completion( - "s10", - "coder-1", - AgentStatus::Completed, - PathBuf::from("/tmp/nonexistent"), - report, - ); - - // Subscribe before calling so we can check if Done event was emitted. - let mut rx = pool.subscribe("s10", "coder-1").unwrap(); - - run_server_owned_completion( - &pool.agents, - pool.port, - "s10", - "coder-1", - Some("sess-1".to_string()), - pool.watcher_tx.clone(), - ) - .await; - - // Status should remain Completed (unchanged) — no gate re-run. - let agents = pool.agents.lock().unwrap(); - let key = composite_key("s10", "coder-1"); - let agent = agents.get(&key).unwrap(); - assert_eq!(agent.status, AgentStatus::Completed); - // Summary should still be the original, not overwritten. - assert_eq!(agent.completion.as_ref().unwrap().summary, "Already done"); - drop(agents); - - // No Done event should have been emitted. - assert!( - rx.try_recv().is_err(), - "should not emit Done when completion already exists" - ); - } - - #[tokio::test] - async fn server_owned_completion_runs_gates_on_clean_worktree() { - use tempfile::tempdir; - - let tmp = tempdir().unwrap(); - let repo = tmp.path(); - init_git_repo(repo); - - let pool = AgentPool::new_test(3001); - pool.inject_test_agent_with_path( - "s11", - "coder-1", - AgentStatus::Running, - repo.to_path_buf(), - ); - - let mut rx = pool.subscribe("s11", "coder-1").unwrap(); - - run_server_owned_completion( - &pool.agents, - pool.port, - "s11", - "coder-1", - Some("sess-2".to_string()), - pool.watcher_tx.clone(), - ) - .await; - - // Agent entry should be removed from the map after completion. - let agents = pool.agents.lock().unwrap(); - let key = composite_key("s11", "coder-1"); - assert!( - agents.get(&key).is_none(), - "agent should be removed from map after completion" - ); - drop(agents); - - // A Done event should have been emitted with the session_id. - let event = rx.try_recv().expect("should emit Done event"); - match &event { - AgentEvent::Done { session_id, .. } => { - assert_eq!(*session_id, Some("sess-2".to_string())); - } - other => panic!("expected Done event, got: {other:?}"), - } - } - - #[tokio::test] - async fn server_owned_completion_fails_on_dirty_worktree() { - use std::fs; - use tempfile::tempdir; - - let tmp = tempdir().unwrap(); - let repo = tmp.path(); - init_git_repo(repo); - // Create an uncommitted file. - fs::write(repo.join("dirty.txt"), "not committed").unwrap(); - - let pool = AgentPool::new_test(3001); - pool.inject_test_agent_with_path( - "s12", - "coder-1", - AgentStatus::Running, - repo.to_path_buf(), - ); - - let mut rx = pool.subscribe("s12", "coder-1").unwrap(); - - run_server_owned_completion( - &pool.agents, - pool.port, - "s12", - "coder-1", - None, - pool.watcher_tx.clone(), - ) - .await; - - // Agent entry should be removed from the map after completion (even on failure). - let agents = pool.agents.lock().unwrap(); - let key = composite_key("s12", "coder-1"); - assert!( - agents.get(&key).is_none(), - "agent should be removed from map after failed completion" - ); - drop(agents); - - // A Done event should have been emitted. - let event = rx.try_recv().expect("should emit Done event"); - assert!( - matches!(event, AgentEvent::Done { .. }), - "expected Done event, got: {event:?}" - ); - } - - #[tokio::test] - async fn server_owned_completion_nonexistent_agent_is_noop() { - let pool = AgentPool::new_test(3001); - // Should not panic or error — just silently return. - run_server_owned_completion( - &pool.agents, - pool.port, - "nonexistent", - "bot", - None, - pool.watcher_tx.clone(), - ) - .await; - } - - /// Regression test for bug 445: a rate-limited mergemaster exits before - /// calling start_merge_agent_work. run_server_owned_completion must be a - /// no-op for mergemaster agents — it must not run acceptance gates and must - /// not advance the story to 5_done/ even when a passing script/test exists. - /// - /// Before the fix: run_server_owned_completion would call run_pipeline_advance - /// for the Mergemaster stage, which ran post-merge tests on master (they pass - /// because nothing changed), then called move_story_to_done — advancing the - /// story without any squash merge having occurred. - #[cfg(unix)] - #[tokio::test] - async fn server_owned_completion_is_noop_for_mergemaster() { - use std::fs; - use std::os::unix::fs::PermissionsExt; - use tempfile::tempdir; - - let tmp = tempdir().unwrap(); - let root = tmp.path(); - init_git_repo(root); - - // Create a passing script/test so post-merge tests would succeed if - // run_pipeline_advance were incorrectly called for this mergemaster. - let script_dir = root.join("script"); - fs::create_dir_all(&script_dir).unwrap(); - let script_test = script_dir.join("test"); - fs::write(&script_test, "#!/usr/bin/env sh\nexit 0\n").unwrap(); - let mut perms = fs::metadata(&script_test).unwrap().permissions(); - perms.set_mode(0o755); - fs::set_permissions(&script_test, perms).unwrap(); - - // Story in 4_merge/ — must NOT be moved to 5_done/. - let merge_dir = root.join(".huskies/work/4_merge"); - fs::create_dir_all(&merge_dir).unwrap(); - let story_path = merge_dir.join("99_story_merge445.md"); - fs::write(&story_path, "---\nname: Merge 445 Test\n---\n").unwrap(); - - let pool = AgentPool::new_test(3001); - pool.inject_test_agent_with_path( - "99_story_merge445", - "mergemaster", - AgentStatus::Running, - root.to_path_buf(), - ); - - run_server_owned_completion( - &pool.agents, - pool.port, - "99_story_merge445", - "mergemaster", - None, - pool.watcher_tx.clone(), - ) - .await; - - // Wait briefly in case any background task fires. - tokio::time::sleep(std::time::Duration::from_millis(150)).await; - - // Story must remain in 4_merge/ — not moved to 5_done/. - let done_path = root.join(".huskies/work/5_done/99_story_merge445.md"); - assert!( - !done_path.exists(), - "Story must NOT be moved to 5_done/ when run_server_owned_completion \ - is (incorrectly) called for a mergemaster agent" - ); - assert!( - story_path.exists(), - "Story must remain in 4_merge/ when mergemaster completion is a no-op" - ); - - // The agent entry should remain in the pool (lifecycle cleanup is the - // caller's responsibility, not run_server_owned_completion's). - let agents = pool.agents.lock().unwrap(); - let key = composite_key("99_story_merge445", "mergemaster"); - assert!( - agents.get(&key).is_some(), - "Agent must remain in pool — run_server_owned_completion is a no-op for mergemaster" - ); - } - - /// Bug 645 + 651: when an agent crashes leaving dirty files but committed - /// work, server-owned completion should stash the dirty files during gates - /// and restore them afterward. Uncommitted work is never junk. - #[tokio::test] - async fn server_owned_completion_preserves_dirty_worktree_with_committed_work() { - use std::fs; - use tempfile::tempdir; - - let tmp = tempdir().unwrap(); - let project_root = tmp.path().join("project"); - fs::create_dir_all(&project_root).unwrap(); - init_git_repo(&project_root); - - // Create a worktree on a feature branch with committed code. - let wt_path = tmp.path().join("wt"); - Command::new("git") - .args([ - "worktree", - "add", - &wt_path.to_string_lossy(), - "-b", - "feature/story-645_test", - ]) - .current_dir(&project_root) - .output() - .unwrap(); - - // Commit a valid file. - fs::write(wt_path.join("work.txt"), "done").unwrap(); - Command::new("git") - .args(["add", "."]) - .current_dir(&wt_path) - .output() - .unwrap(); - Command::new("git") - .args(["commit", "-m", "coder: add work"]) - .current_dir(&wt_path) - .output() - .unwrap(); - - // Now simulate crash leaving dirty files. - fs::write(wt_path.join("dirty.txt"), "crash residue").unwrap(); - - let pool = AgentPool::new_test(3001); - pool.inject_test_agent_with_path( - "645_test", - "coder-1", - AgentStatus::Running, - wt_path.clone(), - ); - - let mut rx = pool.subscribe("645_test", "coder-1").unwrap(); - - run_server_owned_completion( - &pool.agents, - pool.port, - "645_test", - "coder-1", - Some("sess-645".to_string()), - pool.watcher_tx.clone(), - ) - .await; - - // Bug 651: The dirty file must be PRESERVED — uncommitted work is never junk. - assert!( - wt_path.join("dirty.txt").exists(), - "dirty file should be preserved after server-owned completion (bug 651)" - ); - assert_eq!( - fs::read_to_string(wt_path.join("dirty.txt")).unwrap(), - "crash residue", - "dirty file contents should be unchanged" - ); - - // A Done event should have been emitted (completion ran, didn't fail - // on dirty worktree). - let event = rx.try_recv().expect("should emit Done event"); - assert!( - matches!(event, AgentEvent::Done { .. }), - "expected Done event, got: {event:?}" - ); - } - - /// AC3 (bug 651): simulate an agent killed by the watchdog with a - /// substantive uncommitted diff. After the orchestrator's full - /// post-termination handling (gates, completion, advance check), - /// `git status --short` must still show the same modified files. - #[tokio::test] - async fn watchdog_kill_preserves_uncommitted_diff() { - use std::fs; - use tempfile::tempdir; - - let tmp = tempdir().unwrap(); - let project_root = tmp.path().join("project"); - fs::create_dir_all(&project_root).unwrap(); - init_git_repo(&project_root); - - // Create a worktree on a feature branch with committed code. - let wt_path = tmp.path().join("wt"); - Command::new("git") - .args([ - "worktree", - "add", - &wt_path.to_string_lossy(), - "-b", - "feature/story-651_watchdog", - ]) - .current_dir(&project_root) - .output() - .unwrap(); - - // Commit some work. - fs::write(wt_path.join("committed.txt"), "committed work").unwrap(); - Command::new("git") - .args(["add", "."]) - .current_dir(&wt_path) - .output() - .unwrap(); - Command::new("git") - .args(["commit", "-m", "coder: add committed work"]) - .current_dir(&wt_path) - .output() - .unwrap(); - - // Simulate substantive uncommitted diff left by watchdog kill. - fs::write(wt_path.join("in_progress.rs"), "fn wip() {}").unwrap(); - fs::write(wt_path.join("committed.txt"), "modified after commit").unwrap(); - - // Snapshot git status before completion. - let status_before = Command::new("git") - .args(["status", "--short"]) - .current_dir(&wt_path) - .output() - .unwrap(); - let status_before = String::from_utf8_lossy(&status_before.stdout) - .trim() - .to_string(); - assert!( - !status_before.is_empty(), - "pre-condition: worktree should be dirty" - ); - - let pool = AgentPool::new_test(3001); - pool.inject_test_agent_with_path( - "651_watchdog", - "coder-1", - AgentStatus::Running, - wt_path.clone(), - ); - - run_server_owned_completion( - &pool.agents, - pool.port, - "651_watchdog", - "coder-1", - Some("sess-651".to_string()), - pool.watcher_tx.clone(), - ) - .await; - - // After full post-termination handling, git status must be unchanged. - let status_after = Command::new("git") - .args(["status", "--short"]) - .current_dir(&wt_path) - .output() - .unwrap(); - let status_after = String::from_utf8_lossy(&status_after.stdout) - .trim() - .to_string(); - - assert_eq!( - status_before, status_after, - "Bug 651: uncommitted diff must survive post-termination handling.\n\ - Before: {status_before}\nAfter: {status_after}" - ); - - // Verify file contents are intact. - assert_eq!( - fs::read_to_string(wt_path.join("in_progress.rs")).unwrap(), - "fn wip() {}", - ); - assert_eq!( - fs::read_to_string(wt_path.join("committed.txt")).unwrap(), - "modified after commit", - ); - } - - /// AC4 (bug 651 regression for 645): when an agent crashes with committed - /// work AND uncommitted noise, the auto-advance still picks up the - /// committed work. The committed-state check is authoritative; the - /// uncommitted state is just preserved on disk for the next agent. - #[tokio::test] - async fn committed_work_advances_despite_uncommitted_noise() { - use std::fs; - use tempfile::tempdir; - - let tmp = tempdir().unwrap(); - let project_root = tmp.path().join("project"); - fs::create_dir_all(&project_root).unwrap(); - init_git_repo(&project_root); - - // Create a minimal Cargo project so cargo check works. - fs::write( - project_root.join("Cargo.toml"), - "[package]\nname = \"test_proj\"\nversion = \"0.1.0\"\nedition = \"2021\"\n", - ) - .unwrap(); - fs::create_dir_all(project_root.join("src")).unwrap(); - fs::write(project_root.join("src/lib.rs"), "// empty\n").unwrap(); - Command::new("git") - .args(["add", "."]) - .current_dir(&project_root) - .output() - .unwrap(); - Command::new("git") - .args(["commit", "-m", "add cargo project"]) - .current_dir(&project_root) - .output() - .unwrap(); - - // Create a worktree on a feature branch. - let wt_path = tmp.path().join("wt"); - Command::new("git") - .args([ - "worktree", - "add", - &wt_path.to_string_lossy(), - "-b", - "feature/story-651_regression", - ]) - .current_dir(&project_root) - .output() - .unwrap(); - - // Commit valid code on the feature branch. - fs::write(wt_path.join("src/lib.rs"), "pub fn good() {}\n").unwrap(); - Command::new("git") - .args(["add", "."]) - .current_dir(&wt_path) - .output() - .unwrap(); - Command::new("git") - .args(["commit", "-m", "add good fn"]) - .current_dir(&wt_path) - .output() - .unwrap(); - - // Simulate crash leaving uncommitted noise (broken syntax in tracked file). - fs::write(wt_path.join("src/lib.rs"), "THIS IS BROKEN SYNTAX!!!\n").unwrap(); - fs::write(wt_path.join("crash_junk.tmp"), "untracked noise").unwrap(); - - // The "work survived" check should detect committed work and pass cargo check - // despite the dirty worktree, WITHOUT destroying the dirty files. - assert!( - crate::agents::gates::worktree_has_committed_work(&wt_path), - "committed work should be detected" - ); - assert!( - crate::agents::gates::cargo_check_in_worktree(&wt_path), - "cargo check should pass on committed code (stash/pop, not reset)" - ); - - // Dirty files must still exist after the check. - assert_eq!( - fs::read_to_string(wt_path.join("src/lib.rs")).unwrap(), - "THIS IS BROKEN SYNTAX!!!\n", - "uncommitted noise in tracked file must be preserved" - ); - assert!( - wt_path.join("crash_junk.tmp").exists(), - "untracked noise file must be preserved" - ); - } -} diff --git a/server/src/agents/pool/pipeline/completion/legacy.rs b/server/src/agents/pool/pipeline/completion/legacy.rs new file mode 100644 index 00000000..f75eaee1 --- /dev/null +++ b/server/src/agents/pool/pipeline/completion/legacy.rs @@ -0,0 +1,136 @@ +//! Legacy `report_completion` — retained for backwards compatibility and testing. +use std::sync::Arc; + +use super::super::super::super::{AgentEvent, AgentStatus, CompletionReport}; +use super::super::super::{AgentPool, composite_key}; + +impl AgentPool { + /// Internal: report that an agent has finished work on a story. + /// + /// **Note:** This is no longer exposed as an MCP tool. The server now + /// automatically runs completion gates when an agent process exits + /// (see `run_server_owned_completion`). This method is retained for + /// backwards compatibility and testing. + /// + /// - Rejects with an error if the worktree has uncommitted changes. + /// - Runs acceptance gates (cargo clippy + cargo nextest run / cargo test). + /// - Stores the `CompletionReport` on the agent record. + /// - Transitions status to `Completed` (gates passed) or `Failed` (gates failed). + /// - Emits a `Done` event so `wait_for_agent` unblocks. + #[allow(dead_code)] + pub async fn report_completion( + &self, + story_id: &str, + agent_name: &str, + summary: &str, + ) -> Result { + let key = composite_key(story_id, agent_name); + + // Verify agent exists, is Running, and grab its worktree path. + let worktree_path = { + let agents = self.agents.lock().map_err(|e| e.to_string())?; + let agent = agents + .get(&key) + .ok_or_else(|| format!("No agent '{agent_name}' for story '{story_id}'"))?; + + if agent.status != AgentStatus::Running { + return Err(format!( + "Agent '{agent_name}' for story '{story_id}' is not running (status: {}). \ + report_completion can only be called by a running agent.", + agent.status + )); + } + + agent + .worktree_info + .as_ref() + .map(|wt| wt.path.clone()) + .ok_or_else(|| { + format!( + "Agent '{agent_name}' for story '{story_id}' has no worktree. \ + Cannot run acceptance gates." + ) + })? + }; + + let path = worktree_path.clone(); + + // Run gate checks in a blocking thread to avoid stalling the async runtime. + let (gates_passed, gate_output) = tokio::task::spawn_blocking(move || { + // Step 1: Reject if worktree is dirty. + crate::agents::gates::check_uncommitted_changes(&path)?; + // Step 2: Run clippy + tests and return (passed, output). + crate::agents::gates::run_acceptance_gates(&path) + }) + .await + .map_err(|e| format!("Gate check task panicked: {e}"))??; + + let report = CompletionReport { + summary: summary.to_string(), + gates_passed, + gate_output, + }; + + // Extract data for pipeline advance, then remove the entry so + // completed agents never appear in list_agents. + let ( + tx, + session_id, + project_root_for_advance, + wt_path_for_advance, + merge_failure_reported_for_advance, + session_id_for_advance, + ) = { + let mut agents = self.agents.lock().map_err(|e| e.to_string())?; + let agent = agents.get_mut(&key).ok_or_else(|| { + format!("Agent '{agent_name}' for story '{story_id}' disappeared during gate check") + })?; + agent.completion = Some(report.clone()); + let tx = agent.tx.clone(); + let sid = agent.session_id.clone(); + let pr = agent.project_root.clone(); + let wt = agent.worktree_info.as_ref().map(|w| w.path.clone()); + let mfr = agent.merge_failure_reported; + let sid_advance = agent.session_id.clone(); + agents.remove(&key); + (tx, sid, pr, wt, mfr, sid_advance) + }; + + // Emit Done so wait_for_agent unblocks. + let _ = tx.send(AgentEvent::Done { + story_id: story_id.to_string(), + agent_name: agent_name.to_string(), + session_id, + }); + + // Notify WebSocket clients that the agent is gone. + Self::notify_agent_state_changed(&self.watcher_tx); + + // Advance the pipeline state machine in a background task. + let pool_clone = Self { + agents: Arc::clone(&self.agents), + port: self.port, + child_killers: Arc::clone(&self.child_killers), + watcher_tx: self.watcher_tx.clone(), + status_broadcaster: Arc::clone(&self.status_broadcaster), + }; + let sid = story_id.to_string(); + let aname = agent_name.to_string(); + let report_for_advance = report.clone(); + tokio::spawn(async move { + pool_clone + .run_pipeline_advance( + &sid, + &aname, + report_for_advance, + project_root_for_advance, + wt_path_for_advance, + merge_failure_reported_for_advance, + session_id_for_advance, + ) + .await; + }); + + Ok(report) + } +} diff --git a/server/src/agents/pool/pipeline/completion/mod.rs b/server/src/agents/pool/pipeline/completion/mod.rs new file mode 100644 index 00000000..dcd7f806 --- /dev/null +++ b/server/src/agents/pool/pipeline/completion/mod.rs @@ -0,0 +1,8 @@ +//! Agent completion handling — processes exit results and triggers pipeline advancement. + +mod legacy; +mod server; +#[cfg(test)] +mod tests; + +pub(in crate::agents::pool) use server::run_server_owned_completion; diff --git a/server/src/agents/pool/pipeline/completion/server.rs b/server/src/agents/pool/pipeline/completion/server.rs new file mode 100644 index 00000000..bf26c7ef --- /dev/null +++ b/server/src/agents/pool/pipeline/completion/server.rs @@ -0,0 +1,236 @@ +//! Server-owned completion: runs acceptance gates when an agent process exits normally. +use crate::io::watcher::WatcherEvent; +use crate::slog; +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; +use tokio::sync::broadcast; + +use super::super::super::super::{AgentEvent, CompletionReport, PipelineStage, pipeline_stage}; +use super::super::super::{AgentPool, StoryAgent, composite_key}; +use super::super::advance::spawn_pipeline_advance; + +/// Server-owned completion: runs acceptance gates when an agent process exits +/// normally, and advances the pipeline based on results. +/// +/// This is a **free function** (not a method on `AgentPool`) to break the +/// opaque type cycle that would otherwise arise: `start_agent` → spawned task +/// → server-owned completion → pipeline advance → `start_agent`. +/// +/// If the agent already has a completion report (e.g. from a legacy +/// `report_completion` call), this is a no-op to avoid double-running gates. +pub(in crate::agents::pool) async fn run_server_owned_completion( + agents: &Arc>>, + port: u16, + story_id: &str, + agent_name: &str, + session_id: Option, + watcher_tx: broadcast::Sender, +) { + let key = composite_key(story_id, agent_name); + + // Guard: mergemaster agents have their own completion path via + // start_merge_agent_work / run_merge_pipeline. Running server-owned gates + // for a mergemaster would wrongly advance the story to 5_done/ even when + // no squash merge has occurred (e.g. rate-limited exit before the agent + // called start_merge_agent_work). The lifecycle caller is responsible for + // cleaning up the agent entry and triggering auto-assign. + if pipeline_stage(agent_name) == PipelineStage::Mergemaster { + slog!( + "[agents] run_server_owned_completion skipped for mergemaster \ + '{story_id}:{agent_name}'; mergemaster completion is handled by \ + start_merge_agent_work." + ); + return; + } + + // Guard: skip if completion was already recorded (legacy path). + { + let lock = match agents.lock() { + Ok(a) => a, + Err(_) => return, + }; + match lock.get(&key) { + Some(agent) if agent.completion.is_some() => { + slog!( + "[agents] Completion already recorded for '{story_id}:{agent_name}'; \ + skipping server-owned gates." + ); + return; + } + Some(_) => {} + None => return, + } + } + + // Get worktree path for running gates. + let worktree_path = { + let lock = match agents.lock() { + Ok(a) => a, + Err(_) => return, + }; + lock.get(&key) + .and_then(|a| a.worktree_info.as_ref().map(|wt| wt.path.clone())) + }; + + // Kill any in-flight cargo test processes for this worktree so they don't + // hold the build lock while gates try to run. + if let Some(wt_path) = worktree_path.as_ref() + && let Ok(output) = std::process::Command::new("pgrep") + .args([ + "-f", + &format!("--manifest-path {}/Cargo.toml", wt_path.display()), + ]) + .output() + { + let pids = String::from_utf8_lossy(&output.stdout); + for pid_str in pids.lines() { + if let Ok(pid) = pid_str.trim().parse::() { + crate::slog!( + "[agents] Killing stale cargo process (pid {pid}) for '{story_id}' before running gates" + ); + unsafe { + libc::kill(pid, libc::SIGKILL); + } + } + } + } + + // Run acceptance gates. + let (gates_passed, gate_output) = if let Some(wt_path) = worktree_path { + let path = wt_path; + match tokio::task::spawn_blocking(move || { + // If the worktree is dirty, check whether committed work survived. + // An agent crash (e.g. Claude Code CLI's `output.write(&bytes).is_ok()` + // assertion — bug 645) can leave uncommitted files behind even though + // the agent already committed valid work. In that case, stash the + // dirty files and proceed with gates on the committed code. + // Uncommitted work is never junk — it may be the next agent session's + // starting point (bug 651). + let stashed = + if let Err(dirty_msg) = crate::agents::gates::check_uncommitted_changes(&path) { + if crate::agents::gates::worktree_has_committed_work(&path) { + crate::slog!( + "[agents] Worktree dirty but committed work exists — \ + stashing uncommitted changes and proceeding with gates. \ + Dirty state: {dirty_msg}" + ); + // Stash dirty files so gates run against committed code only. + // They will be restored after gates complete. + std::process::Command::new("git") + .args([ + "stash", + "push", + "--include-untracked", + "-m", + "server-completion-temp", + ]) + .current_dir(&path) + .output() + .map(|o| { + o.status.success() + && !String::from_utf8_lossy(&o.stdout) + .contains("No local changes to save") + }) + .unwrap_or(false) + } else { + return Ok((false, dirty_msg)); + } + } else { + false + }; + // AC5: Fail early if the coder finished with no commits on the feature branch. + // This prevents empty-diff stories from advancing through QA to merge. + if !crate::agents::gates::worktree_has_committed_work(&path) { + if stashed { + let _ = std::process::Command::new("git") + .args(["stash", "pop"]) + .current_dir(&path) + .output(); + } + return Ok(( + false, + "Agent exited with no commits on the feature branch. \ + The agent did not produce any code changes." + .to_string(), + )); + } + let result = crate::agents::gates::run_acceptance_gates(&path); + // Restore stashed uncommitted changes. + if stashed { + let _ = std::process::Command::new("git") + .args(["stash", "pop"]) + .current_dir(&path) + .output(); + } + result + }) + .await + { + Ok(Ok(result)) => result, + Ok(Err(e)) => (false, e), + Err(e) => (false, format!("Gate check task panicked: {e}")), + } + } else { + ( + false, + "No worktree path available to run acceptance gates".to_string(), + ) + }; + + slog!( + "[agents] Server-owned completion for '{story_id}:{agent_name}': gates_passed={gates_passed}" + ); + + let report = CompletionReport { + summary: "Agent process exited normally".to_string(), + gates_passed, + gate_output, + }; + + // Store completion report, extract data for pipeline advance, then + // remove the entry so completed agents never appear in list_agents. + let (tx, project_root_for_advance, wt_path_for_advance, merge_failure_reported_for_advance) = { + let mut lock = match agents.lock() { + Ok(a) => a, + Err(_) => return, + }; + let agent = match lock.get_mut(&key) { + Some(a) => a, + None => return, + }; + agent.completion = Some(report.clone()); + agent.session_id = session_id.clone(); + let tx = agent.tx.clone(); + let pr = agent.project_root.clone(); + let wt = agent.worktree_info.as_ref().map(|w| w.path.clone()); + let mfr = agent.merge_failure_reported; + lock.remove(&key); + (tx, pr, wt, mfr) + }; + // The completed session's ID is used to resume if gates fail. + let previous_session_id = session_id.clone(); + + // Emit Done so wait_for_agent unblocks. + let _ = tx.send(AgentEvent::Done { + story_id: story_id.to_string(), + agent_name: agent_name.to_string(), + session_id, + }); + + // Notify WebSocket clients that the agent is gone. + AgentPool::notify_agent_state_changed(&watcher_tx); + + // Advance the pipeline state machine in a background task. + spawn_pipeline_advance( + Arc::clone(agents), + port, + story_id, + agent_name, + report, + project_root_for_advance, + wt_path_for_advance, + watcher_tx, + merge_failure_reported_for_advance, + previous_session_id, + ); +} diff --git a/server/src/agents/pool/pipeline/completion/tests.rs b/server/src/agents/pool/pipeline/completion/tests.rs new file mode 100644 index 00000000..47045c9f --- /dev/null +++ b/server/src/agents/pool/pipeline/completion/tests.rs @@ -0,0 +1,583 @@ +use super::super::super::AgentPool; +use super::*; +use crate::agents::{AgentEvent, AgentStatus, CompletionReport}; +use std::path::PathBuf; +use std::process::Command; + +fn init_git_repo(repo: &std::path::Path) { + Command::new("git") + .args(["init"]) + .current_dir(repo) + .output() + .unwrap(); + Command::new("git") + .args(["config", "user.email", "test@test.com"]) + .current_dir(repo) + .output() + .unwrap(); + Command::new("git") + .args(["config", "user.name", "Test"]) + .current_dir(repo) + .output() + .unwrap(); + Command::new("git") + .args(["commit", "--allow-empty", "-m", "init"]) + .current_dir(repo) + .output() + .unwrap(); +} + +// ── report_completion tests ──────────────────────────────────── + +#[tokio::test] +async fn report_completion_rejects_nonexistent_agent() { + let pool = AgentPool::new_test(3001); + let result = pool.report_completion("no_story", "no_bot", "done").await; + assert!(result.is_err()); + let msg = result.unwrap_err(); + assert!(msg.contains("No agent"), "unexpected: {msg}"); +} + +#[tokio::test] +async fn report_completion_rejects_non_running_agent() { + let pool = AgentPool::new_test(3001); + pool.inject_test_agent("s6", "bot", AgentStatus::Completed); + + let result = pool.report_completion("s6", "bot", "done").await; + assert!(result.is_err()); + let msg = result.unwrap_err(); + assert!( + msg.contains("not running"), + "expected 'not running' in: {msg}" + ); +} + +#[tokio::test] +async fn report_completion_rejects_dirty_worktree() { + use std::fs; + use tempfile::tempdir; + + let tmp = tempdir().unwrap(); + let repo = tmp.path(); + + // Init a real git repo and make an initial commit + Command::new("git") + .args(["init"]) + .current_dir(repo) + .output() + .unwrap(); + Command::new("git") + .args(["commit", "--allow-empty", "-m", "init"]) + .current_dir(repo) + .output() + .unwrap(); + + // Write an uncommitted file + fs::write(repo.join("dirty.txt"), "not committed").unwrap(); + + let pool = AgentPool::new_test(3001); + pool.inject_test_agent_with_path("s7", "bot", AgentStatus::Running, repo.to_path_buf()); + + let result = pool.report_completion("s7", "bot", "done").await; + assert!(result.is_err()); + let msg = result.unwrap_err(); + assert!( + msg.contains("uncommitted"), + "expected 'uncommitted' in: {msg}" + ); +} + +// ── server-owned completion tests ─────────────────────────────────────────── + +#[tokio::test] +async fn server_owned_completion_skips_when_already_completed() { + let pool = AgentPool::new_test(3001); + let report = CompletionReport { + summary: "Already done".to_string(), + gates_passed: true, + gate_output: String::new(), + }; + pool.inject_test_agent_with_completion( + "s10", + "coder-1", + AgentStatus::Completed, + PathBuf::from("/tmp/nonexistent"), + report, + ); + + // Subscribe before calling so we can check if Done event was emitted. + let mut rx = pool.subscribe("s10", "coder-1").unwrap(); + + run_server_owned_completion( + &pool.agents, + pool.port, + "s10", + "coder-1", + Some("sess-1".to_string()), + pool.watcher_tx.clone(), + ) + .await; + + // Status should remain Completed (unchanged) — no gate re-run. + let agents = pool.agents.lock().unwrap(); + let key = super::super::super::composite_key("s10", "coder-1"); + let agent = agents.get(&key).unwrap(); + assert_eq!(agent.status, AgentStatus::Completed); + // Summary should still be the original, not overwritten. + assert_eq!(agent.completion.as_ref().unwrap().summary, "Already done"); + drop(agents); + + // No Done event should have been emitted. + assert!( + rx.try_recv().is_err(), + "should not emit Done when completion already exists" + ); +} + +#[tokio::test] +async fn server_owned_completion_runs_gates_on_clean_worktree() { + use tempfile::tempdir; + + let tmp = tempdir().unwrap(); + let repo = tmp.path(); + init_git_repo(repo); + + let pool = AgentPool::new_test(3001); + pool.inject_test_agent_with_path("s11", "coder-1", AgentStatus::Running, repo.to_path_buf()); + + let mut rx = pool.subscribe("s11", "coder-1").unwrap(); + + run_server_owned_completion( + &pool.agents, + pool.port, + "s11", + "coder-1", + Some("sess-2".to_string()), + pool.watcher_tx.clone(), + ) + .await; + + // Agent entry should be removed from the map after completion. + let agents = pool.agents.lock().unwrap(); + let key = super::super::super::composite_key("s11", "coder-1"); + assert!( + agents.get(&key).is_none(), + "agent should be removed from map after completion" + ); + drop(agents); + + // A Done event should have been emitted with the session_id. + let event = rx.try_recv().expect("should emit Done event"); + match &event { + AgentEvent::Done { session_id, .. } => { + assert_eq!(*session_id, Some("sess-2".to_string())); + } + other => panic!("expected Done event, got: {other:?}"), + } +} + +#[tokio::test] +async fn server_owned_completion_fails_on_dirty_worktree() { + use std::fs; + use tempfile::tempdir; + + let tmp = tempdir().unwrap(); + let repo = tmp.path(); + init_git_repo(repo); + // Create an uncommitted file. + fs::write(repo.join("dirty.txt"), "not committed").unwrap(); + + let pool = AgentPool::new_test(3001); + pool.inject_test_agent_with_path("s12", "coder-1", AgentStatus::Running, repo.to_path_buf()); + + let mut rx = pool.subscribe("s12", "coder-1").unwrap(); + + run_server_owned_completion( + &pool.agents, + pool.port, + "s12", + "coder-1", + None, + pool.watcher_tx.clone(), + ) + .await; + + // Agent entry should be removed from the map after completion (even on failure). + let agents = pool.agents.lock().unwrap(); + let key = super::super::super::composite_key("s12", "coder-1"); + assert!( + agents.get(&key).is_none(), + "agent should be removed from map after failed completion" + ); + drop(agents); + + // A Done event should have been emitted. + let event = rx.try_recv().expect("should emit Done event"); + assert!( + matches!(event, AgentEvent::Done { .. }), + "expected Done event, got: {event:?}" + ); +} + +#[tokio::test] +async fn server_owned_completion_nonexistent_agent_is_noop() { + let pool = AgentPool::new_test(3001); + // Should not panic or error — just silently return. + run_server_owned_completion( + &pool.agents, + pool.port, + "nonexistent", + "bot", + None, + pool.watcher_tx.clone(), + ) + .await; +} + +/// Regression test for bug 445: a rate-limited mergemaster exits before +/// calling start_merge_agent_work. run_server_owned_completion must be a +/// no-op for mergemaster agents — it must not run acceptance gates and must +/// not advance the story to 5_done/ even when a passing script/test exists. +/// +/// Before the fix: run_server_owned_completion would call run_pipeline_advance +/// for the Mergemaster stage, which ran post-merge tests on master (they pass +/// because nothing changed), then called move_story_to_done — advancing the +/// story without any squash merge having occurred. +#[cfg(unix)] +#[tokio::test] +async fn server_owned_completion_is_noop_for_mergemaster() { + use std::fs; + use std::os::unix::fs::PermissionsExt; + use tempfile::tempdir; + + let tmp = tempdir().unwrap(); + let root = tmp.path(); + init_git_repo(root); + + // Create a passing script/test so post-merge tests would succeed if + // run_pipeline_advance were incorrectly called for this mergemaster. + let script_dir = root.join("script"); + fs::create_dir_all(&script_dir).unwrap(); + let script_test = script_dir.join("test"); + fs::write(&script_test, "#!/usr/bin/env sh\nexit 0\n").unwrap(); + let mut perms = fs::metadata(&script_test).unwrap().permissions(); + perms.set_mode(0o755); + fs::set_permissions(&script_test, perms).unwrap(); + + // Story in 4_merge/ — must NOT be moved to 5_done/. + let merge_dir = root.join(".huskies/work/4_merge"); + fs::create_dir_all(&merge_dir).unwrap(); + let story_path = merge_dir.join("99_story_merge445.md"); + fs::write(&story_path, "---\nname: Merge 445 Test\n---\n").unwrap(); + + let pool = AgentPool::new_test(3001); + pool.inject_test_agent_with_path( + "99_story_merge445", + "mergemaster", + AgentStatus::Running, + root.to_path_buf(), + ); + + run_server_owned_completion( + &pool.agents, + pool.port, + "99_story_merge445", + "mergemaster", + None, + pool.watcher_tx.clone(), + ) + .await; + + // Wait briefly in case any background task fires. + tokio::time::sleep(std::time::Duration::from_millis(150)).await; + + // Story must remain in 4_merge/ — not moved to 5_done/. + let done_path = root.join(".huskies/work/5_done/99_story_merge445.md"); + assert!( + !done_path.exists(), + "Story must NOT be moved to 5_done/ when run_server_owned_completion \ + is (incorrectly) called for a mergemaster agent" + ); + assert!( + story_path.exists(), + "Story must remain in 4_merge/ when mergemaster completion is a no-op" + ); + + // The agent entry should remain in the pool (lifecycle cleanup is the + // caller's responsibility, not run_server_owned_completion's). + let agents = pool.agents.lock().unwrap(); + let key = super::super::super::composite_key("99_story_merge445", "mergemaster"); + assert!( + agents.get(&key).is_some(), + "Agent must remain in pool — run_server_owned_completion is a no-op for mergemaster" + ); +} + +/// Bug 645 + 651: when an agent crashes leaving dirty files but committed +/// work, server-owned completion should stash the dirty files during gates +/// and restore them afterward. Uncommitted work is never junk. +#[tokio::test] +async fn server_owned_completion_preserves_dirty_worktree_with_committed_work() { + use std::fs; + use tempfile::tempdir; + + let tmp = tempdir().unwrap(); + let project_root = tmp.path().join("project"); + fs::create_dir_all(&project_root).unwrap(); + init_git_repo(&project_root); + + // Create a worktree on a feature branch with committed code. + let wt_path = tmp.path().join("wt"); + Command::new("git") + .args([ + "worktree", + "add", + &wt_path.to_string_lossy(), + "-b", + "feature/story-645_test", + ]) + .current_dir(&project_root) + .output() + .unwrap(); + + // Commit a valid file. + fs::write(wt_path.join("work.txt"), "done").unwrap(); + Command::new("git") + .args(["add", "."]) + .current_dir(&wt_path) + .output() + .unwrap(); + Command::new("git") + .args(["commit", "-m", "coder: add work"]) + .current_dir(&wt_path) + .output() + .unwrap(); + + // Now simulate crash leaving dirty files. + fs::write(wt_path.join("dirty.txt"), "crash residue").unwrap(); + + let pool = AgentPool::new_test(3001); + pool.inject_test_agent_with_path("645_test", "coder-1", AgentStatus::Running, wt_path.clone()); + + let mut rx = pool.subscribe("645_test", "coder-1").unwrap(); + + run_server_owned_completion( + &pool.agents, + pool.port, + "645_test", + "coder-1", + Some("sess-645".to_string()), + pool.watcher_tx.clone(), + ) + .await; + + // Bug 651: The dirty file must be PRESERVED — uncommitted work is never junk. + assert!( + wt_path.join("dirty.txt").exists(), + "dirty file should be preserved after server-owned completion (bug 651)" + ); + assert_eq!( + fs::read_to_string(wt_path.join("dirty.txt")).unwrap(), + "crash residue", + "dirty file contents should be unchanged" + ); + + // A Done event should have been emitted (completion ran, didn't fail + // on dirty worktree). + let event = rx.try_recv().expect("should emit Done event"); + assert!( + matches!(event, AgentEvent::Done { .. }), + "expected Done event, got: {event:?}" + ); +} + +/// AC3 (bug 651): simulate an agent killed by the watchdog with a +/// substantive uncommitted diff. After the orchestrator's full +/// post-termination handling (gates, completion, advance check), +/// `git status --short` must still show the same modified files. +#[tokio::test] +async fn watchdog_kill_preserves_uncommitted_diff() { + use std::fs; + use tempfile::tempdir; + + let tmp = tempdir().unwrap(); + let project_root = tmp.path().join("project"); + fs::create_dir_all(&project_root).unwrap(); + init_git_repo(&project_root); + + // Create a worktree on a feature branch with committed code. + let wt_path = tmp.path().join("wt"); + Command::new("git") + .args([ + "worktree", + "add", + &wt_path.to_string_lossy(), + "-b", + "feature/story-651_watchdog", + ]) + .current_dir(&project_root) + .output() + .unwrap(); + + // Commit some work. + fs::write(wt_path.join("committed.txt"), "committed work").unwrap(); + Command::new("git") + .args(["add", "."]) + .current_dir(&wt_path) + .output() + .unwrap(); + Command::new("git") + .args(["commit", "-m", "coder: add committed work"]) + .current_dir(&wt_path) + .output() + .unwrap(); + + // Simulate substantive uncommitted diff left by watchdog kill. + fs::write(wt_path.join("in_progress.rs"), "fn wip() {}").unwrap(); + fs::write(wt_path.join("committed.txt"), "modified after commit").unwrap(); + + // Snapshot git status before completion. + let status_before = Command::new("git") + .args(["status", "--short"]) + .current_dir(&wt_path) + .output() + .unwrap(); + let status_before = String::from_utf8_lossy(&status_before.stdout) + .trim() + .to_string(); + assert!( + !status_before.is_empty(), + "pre-condition: worktree should be dirty" + ); + + let pool = AgentPool::new_test(3001); + pool.inject_test_agent_with_path( + "651_watchdog", + "coder-1", + AgentStatus::Running, + wt_path.clone(), + ); + + run_server_owned_completion( + &pool.agents, + pool.port, + "651_watchdog", + "coder-1", + Some("sess-651".to_string()), + pool.watcher_tx.clone(), + ) + .await; + + // After full post-termination handling, git status must be unchanged. + let status_after = Command::new("git") + .args(["status", "--short"]) + .current_dir(&wt_path) + .output() + .unwrap(); + let status_after = String::from_utf8_lossy(&status_after.stdout) + .trim() + .to_string(); + + assert_eq!( + status_before, status_after, + "Bug 651: uncommitted diff must survive post-termination handling.\n\ + Before: {status_before}\nAfter: {status_after}" + ); + + // Verify file contents are intact. + assert_eq!( + fs::read_to_string(wt_path.join("in_progress.rs")).unwrap(), + "fn wip() {}", + ); + assert_eq!( + fs::read_to_string(wt_path.join("committed.txt")).unwrap(), + "modified after commit", + ); +} + +/// AC4 (bug 651 regression for 645): when an agent crashes with committed +/// work AND uncommitted noise, the auto-advance still picks up the +/// committed work. The committed-state check is authoritative; the +/// uncommitted state is just preserved on disk for the next agent. +#[tokio::test] +async fn committed_work_advances_despite_uncommitted_noise() { + use std::fs; + use tempfile::tempdir; + + let tmp = tempdir().unwrap(); + let project_root = tmp.path().join("project"); + fs::create_dir_all(&project_root).unwrap(); + init_git_repo(&project_root); + + // Create a minimal Cargo project so cargo check works. + fs::write( + project_root.join("Cargo.toml"), + "[package]\nname = \"test_proj\"\nversion = \"0.1.0\"\nedition = \"2021\"\n", + ) + .unwrap(); + fs::create_dir_all(project_root.join("src")).unwrap(); + fs::write(project_root.join("src/lib.rs"), "// empty\n").unwrap(); + Command::new("git") + .args(["add", "."]) + .current_dir(&project_root) + .output() + .unwrap(); + Command::new("git") + .args(["commit", "-m", "add cargo project"]) + .current_dir(&project_root) + .output() + .unwrap(); + + // Create a worktree on a feature branch. + let wt_path = tmp.path().join("wt"); + Command::new("git") + .args([ + "worktree", + "add", + &wt_path.to_string_lossy(), + "-b", + "feature/story-651_regression", + ]) + .current_dir(&project_root) + .output() + .unwrap(); + + // Commit valid code on the feature branch. + fs::write(wt_path.join("src/lib.rs"), "pub fn good() {}\n").unwrap(); + Command::new("git") + .args(["add", "."]) + .current_dir(&wt_path) + .output() + .unwrap(); + Command::new("git") + .args(["commit", "-m", "add good fn"]) + .current_dir(&wt_path) + .output() + .unwrap(); + + // Simulate crash leaving uncommitted noise (broken syntax in tracked file). + fs::write(wt_path.join("src/lib.rs"), "THIS IS BROKEN SYNTAX!!!\n").unwrap(); + fs::write(wt_path.join("crash_junk.tmp"), "untracked noise").unwrap(); + + // The "work survived" check should detect committed work and pass cargo check + // despite the dirty worktree, WITHOUT destroying the dirty files. + assert!( + crate::agents::gates::worktree_has_committed_work(&wt_path), + "committed work should be detected" + ); + assert!( + crate::agents::gates::cargo_check_in_worktree(&wt_path), + "cargo check should pass on committed code (stash/pop, not reset)" + ); + + // Dirty files must still exist after the check. + assert_eq!( + fs::read_to_string(wt_path.join("src/lib.rs")).unwrap(), + "THIS IS BROKEN SYNTAX!!!\n", + "uncommitted noise in tracked file must be preserved" + ); + assert!( + wt_path.join("crash_junk.tmp").exists(), + "untracked noise file must be preserved" + ); +}