//! Pipeline completion: runs acceptance gates and advances the pipeline when
//! an agent finishes work on a story.
//! (Original path: huskies/server/src/agents/pool/pipeline/completion.rs)
use crate::slog;
use crate::io::watcher::WatcherEvent;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tokio::sync::broadcast;
use super::super::super::{AgentEvent, AgentStatus, CompletionReport, PipelineStage, pipeline_stage};
use super::super::{AgentPool, StoryAgent, composite_key};
use super::advance::spawn_pipeline_advance;
impl AgentPool {
    /// Internal: report that an agent has finished work on a story.
    ///
    /// **Note:** This is no longer exposed as an MCP tool. The server now
    /// automatically runs completion gates when an agent process exits
    /// (see `run_server_owned_completion`). This method is retained for
    /// backwards compatibility and testing.
    ///
    /// - Rejects with an error if the worktree has uncommitted changes.
    /// - Runs acceptance gates (cargo clippy + cargo nextest run / cargo test).
    /// - Stores the `CompletionReport` on the agent record.
    /// - Transitions status to `Completed` (gates passed) or `Failed` (gates failed).
    /// - Emits a `Done` event so `wait_for_agent` unblocks.
    ///
    /// Returns the `CompletionReport` on success, or a human-readable error
    /// string if the agent is missing, not running, has no worktree, or a
    /// gate check fails.
    #[allow(dead_code)]
    pub async fn report_completion(
        &self,
        story_id: &str,
        agent_name: &str,
        summary: &str,
    ) -> Result<CompletionReport, String> {
        let key = composite_key(story_id, agent_name);
        // Verify agent exists, is Running, and grab its worktree path.
        // The lock is confined to this scope so the slow gate checks below
        // never run while holding the agents mutex.
        let worktree_path = {
            let agents = self.agents.lock().map_err(|e| e.to_string())?;
            let agent = agents
                .get(&key)
                .ok_or_else(|| format!("No agent '{agent_name}' for story '{story_id}'"))?;
            if agent.status != AgentStatus::Running {
                return Err(format!(
                    "Agent '{agent_name}' for story '{story_id}' is not running (status: {}). \
                     report_completion can only be called by a running agent.",
                    agent.status
                ));
            }
            agent
                .worktree_info
                .as_ref()
                .map(|wt| wt.path.clone())
                .ok_or_else(|| {
                    format!(
                        "Agent '{agent_name}' for story '{story_id}' has no worktree. \
                         Cannot run acceptance gates."
                    )
                })?
        };
        let path = worktree_path.clone();
        // Run gate checks in a blocking thread to avoid stalling the async runtime.
        let (gates_passed, gate_output) = tokio::task::spawn_blocking(move || {
            // Step 1: Reject if worktree is dirty.
            crate::agents::gates::check_uncommitted_changes(&path)?;
            // Step 2: Run clippy + tests and return (passed, output).
            crate::agents::gates::run_acceptance_gates(&path)
        })
        .await
        // Outer `?` propagates a JoinError (the blocking task panicked);
        // inner `?` propagates a gate error returned by the closure.
        .map_err(|e| format!("Gate check task panicked: {e}"))??;
        let report = CompletionReport {
            summary: summary.to_string(),
            gates_passed,
            gate_output,
        };
        // Extract data for pipeline advance, then remove the entry so
        // completed agents never appear in list_agents.
        let (
            tx,
            session_id,
            project_root_for_advance,
            wt_path_for_advance,
            merge_failure_reported_for_advance,
        ) = {
            let mut agents = self.agents.lock().map_err(|e| e.to_string())?;
            let agent = agents.get_mut(&key).ok_or_else(|| {
                format!("Agent '{agent_name}' for story '{story_id}' disappeared during gate check")
            })?;
            // NOTE(review): the entry is removed a few lines below, so this
            // stored copy is dropped with it; the caller-visible report is the
            // one returned at the end. Presumably kept for symmetry with the
            // server-owned completion path — confirm before removing.
            agent.completion = Some(report.clone());
            let tx = agent.tx.clone();
            let sid = agent.session_id.clone();
            let pr = agent.project_root.clone();
            let wt = agent.worktree_info.as_ref().map(|w| w.path.clone());
            let mfr = agent.merge_failure_reported;
            agents.remove(&key);
            (tx, sid, pr, wt, mfr)
        };
        // Emit Done so wait_for_agent unblocks.
        let _ = tx.send(AgentEvent::Done {
            story_id: story_id.to_string(),
            agent_name: agent_name.to_string(),
            session_id,
        });
        // Notify WebSocket clients that the agent is gone.
        Self::notify_agent_state_changed(&self.watcher_tx);
        // Advance the pipeline state machine in a background task.
        // Shallow clone (Arc handles + cheap copies) lets the spawned task
        // outlive `&self`.
        let pool_clone = Self {
            agents: Arc::clone(&self.agents),
            port: self.port,
            child_killers: Arc::clone(&self.child_killers),
            watcher_tx: self.watcher_tx.clone(),
            merge_jobs: Arc::clone(&self.merge_jobs),
        };
        let sid = story_id.to_string();
        let aname = agent_name.to_string();
        let report_for_advance = report.clone();
        tokio::spawn(async move {
            pool_clone
                .run_pipeline_advance(
                    &sid,
                    &aname,
                    report_for_advance,
                    project_root_for_advance,
                    wt_path_for_advance,
                    merge_failure_reported_for_advance,
                )
                .await;
        });
        Ok(report)
    }
}
/// Server-owned completion: runs acceptance gates when an agent process exits
/// normally, and advances the pipeline based on results.
///
/// This is a **free function** (not a method on `AgentPool`) to break the
/// opaque type cycle that would otherwise arise: `start_agent` → spawned task
/// → server-owned completion → pipeline advance → `start_agent`.
///
/// If the agent already has a completion report (e.g. from a legacy
/// `report_completion` call), this is a no-op to avoid double-running gates.
///
/// Poisoned-lock and missing-agent cases return silently: there is nothing
/// useful to report here and the lifecycle caller owns any remaining cleanup.
pub(in crate::agents::pool) async fn run_server_owned_completion(
    agents: &Arc<Mutex<HashMap<String, StoryAgent>>>,
    port: u16,
    story_id: &str,
    agent_name: &str,
    session_id: Option<String>,
    watcher_tx: broadcast::Sender<WatcherEvent>,
) {
    let key = composite_key(story_id, agent_name);
    // Guard: mergemaster agents have their own completion path via
    // start_merge_agent_work / run_merge_pipeline. Running server-owned gates
    // for a mergemaster would wrongly advance the story to 5_done/ even when
    // no squash merge has occurred (e.g. rate-limited exit before the agent
    // called start_merge_agent_work). The lifecycle caller is responsible for
    // cleaning up the agent entry and triggering auto-assign.
    if pipeline_stage(agent_name) == PipelineStage::Mergemaster {
        slog!(
            "[agents] run_server_owned_completion skipped for mergemaster \
             '{story_id}:{agent_name}'; mergemaster completion is handled by \
             start_merge_agent_work."
        );
        return;
    }
    // Single critical section: skip if completion was already recorded
    // (legacy path), otherwise grab the worktree path for running gates.
    // Previously this took the agents lock twice (guard check, then path
    // read), leaving a window in which the entry could change or vanish
    // between the two reads.
    let worktree_path = {
        let lock = match agents.lock() {
            Ok(a) => a,
            Err(_) => return,
        };
        match lock.get(&key) {
            Some(agent) if agent.completion.is_some() => {
                slog!(
                    "[agents] Completion already recorded for '{story_id}:{agent_name}'; \
                     skipping server-owned gates."
                );
                return;
            }
            Some(agent) => agent.worktree_info.as_ref().map(|wt| wt.path.clone()),
            None => return,
        }
    };
    // Run acceptance gates on a blocking thread (they shell out to cargo),
    // so the async runtime is never stalled.
    let (gates_passed, gate_output) = if let Some(path) = worktree_path {
        match tokio::task::spawn_blocking(move || {
            crate::agents::gates::check_uncommitted_changes(&path)?;
            // AC5: Fail early if the coder finished with no commits on the feature branch.
            // This prevents empty-diff stories from advancing through QA to merge.
            if !crate::agents::gates::worktree_has_committed_work(&path) {
                return Ok((
                    false,
                    "Agent exited with no commits on the feature branch. \
                     The agent did not produce any code changes."
                        .to_string(),
                ));
            }
            crate::agents::gates::run_acceptance_gates(&path)
        })
        .await
        {
            Ok(Ok(result)) => result,
            // Gate helper returned an error (e.g. dirty worktree).
            Ok(Err(e)) => (false, e),
            // The blocking task itself panicked.
            Err(e) => (false, format!("Gate check task panicked: {e}")),
        }
    } else {
        (
            false,
            "No worktree path available to run acceptance gates".to_string(),
        )
    };
    slog!(
        "[agents] Server-owned completion for '{story_id}:{agent_name}': gates_passed={gates_passed}"
    );
    let report = CompletionReport {
        summary: "Agent process exited normally".to_string(),
        gates_passed,
        gate_output,
    };
    // Store completion report, extract data for pipeline advance, then
    // remove the entry so completed agents never appear in list_agents.
    let (tx, project_root_for_advance, wt_path_for_advance, merge_failure_reported_for_advance) = {
        let mut lock = match agents.lock() {
            Ok(a) => a,
            Err(_) => return,
        };
        let agent = match lock.get_mut(&key) {
            Some(a) => a,
            None => return,
        };
        agent.completion = Some(report.clone());
        agent.session_id = session_id.clone();
        let tx = agent.tx.clone();
        let pr = agent.project_root.clone();
        let wt = agent.worktree_info.as_ref().map(|w| w.path.clone());
        let mfr = agent.merge_failure_reported;
        lock.remove(&key);
        (tx, pr, wt, mfr)
    };
    // Emit Done so wait_for_agent unblocks.
    let _ = tx.send(AgentEvent::Done {
        story_id: story_id.to_string(),
        agent_name: agent_name.to_string(),
        session_id,
    });
    // Notify WebSocket clients that the agent is gone.
    AgentPool::notify_agent_state_changed(&watcher_tx);
    // Advance the pipeline state machine in a background task.
    spawn_pipeline_advance(
        Arc::clone(agents),
        port,
        story_id,
        agent_name,
        report,
        project_root_for_advance,
        wt_path_for_advance,
        watcher_tx,
        merge_failure_reported_for_advance,
    );
}
#[cfg(test)]
mod tests {
    use super::*;
    use super::super::super::AgentPool;
    use crate::agents::{AgentEvent, AgentStatus, CompletionReport};
    use std::path::PathBuf;
    use std::process::Command;

    /// Initialize a git repo at `repo` with a committer identity and one
    /// empty initial commit, so gate checks that inspect git state see a
    /// valid HEAD. The identity config is required: `git commit` fails on
    /// machines without a global user.name/user.email.
    fn init_git_repo(repo: &std::path::Path) {
        Command::new("git")
            .args(["init"])
            .current_dir(repo)
            .output()
            .unwrap();
        Command::new("git")
            .args(["config", "user.email", "test@test.com"])
            .current_dir(repo)
            .output()
            .unwrap();
        Command::new("git")
            .args(["config", "user.name", "Test"])
            .current_dir(repo)
            .output()
            .unwrap();
        Command::new("git")
            .args(["commit", "--allow-empty", "-m", "init"])
            .current_dir(repo)
            .output()
            .unwrap();
    }

    // ── report_completion tests ────────────────────────────────────

    #[tokio::test]
    async fn report_completion_rejects_nonexistent_agent() {
        let pool = AgentPool::new_test(3001);
        let result = pool.report_completion("no_story", "no_bot", "done").await;
        assert!(result.is_err());
        let msg = result.unwrap_err();
        assert!(msg.contains("No agent"), "unexpected: {msg}");
    }

    #[tokio::test]
    async fn report_completion_rejects_non_running_agent() {
        let pool = AgentPool::new_test(3001);
        pool.inject_test_agent("s6", "bot", AgentStatus::Completed);
        let result = pool.report_completion("s6", "bot", "done").await;
        assert!(result.is_err());
        let msg = result.unwrap_err();
        assert!(
            msg.contains("not running"),
            "expected 'not running' in: {msg}"
        );
    }

    #[tokio::test]
    async fn report_completion_rejects_dirty_worktree() {
        use std::fs;
        use tempfile::tempdir;
        let tmp = tempdir().unwrap();
        let repo = tmp.path();
        // Fix: use the shared helper instead of an inline `git init` +
        // `git commit` that omitted the user.email/user.name config. Without
        // an identity, the initial commit fails on machines with no global
        // git config, leaving an unborn HEAD and a fragile test.
        init_git_repo(repo);
        // Write an uncommitted file
        fs::write(repo.join("dirty.txt"), "not committed").unwrap();
        let pool = AgentPool::new_test(3001);
        pool.inject_test_agent_with_path("s7", "bot", AgentStatus::Running, repo.to_path_buf());
        let result = pool.report_completion("s7", "bot", "done").await;
        assert!(result.is_err());
        let msg = result.unwrap_err();
        assert!(
            msg.contains("uncommitted"),
            "expected 'uncommitted' in: {msg}"
        );
    }

    // ── server-owned completion tests ───────────────────────────────────────────

    #[tokio::test]
    async fn server_owned_completion_skips_when_already_completed() {
        let pool = AgentPool::new_test(3001);
        let report = CompletionReport {
            summary: "Already done".to_string(),
            gates_passed: true,
            gate_output: String::new(),
        };
        pool.inject_test_agent_with_completion(
            "s10",
            "coder-1",
            AgentStatus::Completed,
            PathBuf::from("/tmp/nonexistent"),
            report,
        );
        // Subscribe before calling so we can check if Done event was emitted.
        let mut rx = pool.subscribe("s10", "coder-1").unwrap();
        run_server_owned_completion(
            &pool.agents,
            pool.port,
            "s10",
            "coder-1",
            Some("sess-1".to_string()),
            pool.watcher_tx.clone(),
        )
        .await;
        // Status should remain Completed (unchanged) — no gate re-run.
        let agents = pool.agents.lock().unwrap();
        let key = composite_key("s10", "coder-1");
        let agent = agents.get(&key).unwrap();
        assert_eq!(agent.status, AgentStatus::Completed);
        // Summary should still be the original, not overwritten.
        assert_eq!(agent.completion.as_ref().unwrap().summary, "Already done");
        drop(agents);
        // No Done event should have been emitted.
        assert!(
            rx.try_recv().is_err(),
            "should not emit Done when completion already exists"
        );
    }

    #[tokio::test]
    async fn server_owned_completion_runs_gates_on_clean_worktree() {
        use tempfile::tempdir;
        let tmp = tempdir().unwrap();
        let repo = tmp.path();
        init_git_repo(repo);
        let pool = AgentPool::new_test(3001);
        pool.inject_test_agent_with_path(
            "s11",
            "coder-1",
            AgentStatus::Running,
            repo.to_path_buf(),
        );
        let mut rx = pool.subscribe("s11", "coder-1").unwrap();
        run_server_owned_completion(
            &pool.agents,
            pool.port,
            "s11",
            "coder-1",
            Some("sess-2".to_string()),
            pool.watcher_tx.clone(),
        )
        .await;
        // Agent entry should be removed from the map after completion.
        let agents = pool.agents.lock().unwrap();
        let key = composite_key("s11", "coder-1");
        assert!(
            agents.get(&key).is_none(),
            "agent should be removed from map after completion"
        );
        drop(agents);
        // A Done event should have been emitted with the session_id.
        let event = rx.try_recv().expect("should emit Done event");
        match &event {
            AgentEvent::Done { session_id, .. } => {
                assert_eq!(*session_id, Some("sess-2".to_string()));
            }
            other => panic!("expected Done event, got: {other:?}"),
        }
    }

    #[tokio::test]
    async fn server_owned_completion_fails_on_dirty_worktree() {
        use std::fs;
        use tempfile::tempdir;
        let tmp = tempdir().unwrap();
        let repo = tmp.path();
        init_git_repo(repo);
        // Create an uncommitted file.
        fs::write(repo.join("dirty.txt"), "not committed").unwrap();
        let pool = AgentPool::new_test(3001);
        pool.inject_test_agent_with_path(
            "s12",
            "coder-1",
            AgentStatus::Running,
            repo.to_path_buf(),
        );
        let mut rx = pool.subscribe("s12", "coder-1").unwrap();
        run_server_owned_completion(
            &pool.agents,
            pool.port,
            "s12",
            "coder-1",
            None,
            pool.watcher_tx.clone(),
        )
        .await;
        // Agent entry should be removed from the map after completion (even on failure).
        let agents = pool.agents.lock().unwrap();
        let key = composite_key("s12", "coder-1");
        assert!(
            agents.get(&key).is_none(),
            "agent should be removed from map after failed completion"
        );
        drop(agents);
        // A Done event should have been emitted.
        let event = rx.try_recv().expect("should emit Done event");
        assert!(
            matches!(event, AgentEvent::Done { .. }),
            "expected Done event, got: {event:?}"
        );
    }

    #[tokio::test]
    async fn server_owned_completion_nonexistent_agent_is_noop() {
        let pool = AgentPool::new_test(3001);
        // Should not panic or error — just silently return.
        run_server_owned_completion(
            &pool.agents,
            pool.port,
            "nonexistent",
            "bot",
            None,
            pool.watcher_tx.clone(),
        )
        .await;
    }

    /// Regression test for bug 445: a rate-limited mergemaster exits before
    /// calling start_merge_agent_work. run_server_owned_completion must be a
    /// no-op for mergemaster agents — it must not run acceptance gates and must
    /// not advance the story to 5_done/ even when a passing script/test exists.
    ///
    /// Before the fix: run_server_owned_completion would call run_pipeline_advance
    /// for the Mergemaster stage, which ran post-merge tests on master (they pass
    /// because nothing changed), then called move_story_to_done — advancing the
    /// story without any squash merge having occurred.
    #[cfg(unix)]
    #[tokio::test]
    async fn server_owned_completion_is_noop_for_mergemaster() {
        use std::fs;
        use std::os::unix::fs::PermissionsExt;
        use tempfile::tempdir;
        let tmp = tempdir().unwrap();
        let root = tmp.path();
        init_git_repo(root);
        // Create a passing script/test so post-merge tests would succeed if
        // run_pipeline_advance were incorrectly called for this mergemaster.
        let script_dir = root.join("script");
        fs::create_dir_all(&script_dir).unwrap();
        let script_test = script_dir.join("test");
        fs::write(&script_test, "#!/usr/bin/env sh\nexit 0\n").unwrap();
        let mut perms = fs::metadata(&script_test).unwrap().permissions();
        perms.set_mode(0o755);
        fs::set_permissions(&script_test, perms).unwrap();
        // Story in 4_merge/ — must NOT be moved to 5_done/.
        let merge_dir = root.join(".huskies/work/4_merge");
        fs::create_dir_all(&merge_dir).unwrap();
        let story_path = merge_dir.join("99_story_merge445.md");
        fs::write(&story_path, "---\nname: Merge 445 Test\n---\n").unwrap();
        let pool = AgentPool::new_test(3001);
        pool.inject_test_agent_with_path(
            "99_story_merge445",
            "mergemaster",
            AgentStatus::Running,
            root.to_path_buf(),
        );
        run_server_owned_completion(
            &pool.agents,
            pool.port,
            "99_story_merge445",
            "mergemaster",
            None,
            pool.watcher_tx.clone(),
        )
        .await;
        // Wait briefly in case any background task fires.
        tokio::time::sleep(std::time::Duration::from_millis(150)).await;
        // Story must remain in 4_merge/ — not moved to 5_done/.
        let done_path = root.join(".huskies/work/5_done/99_story_merge445.md");
        assert!(
            !done_path.exists(),
            "Story must NOT be moved to 5_done/ when run_server_owned_completion \
             is (incorrectly) called for a mergemaster agent"
        );
        assert!(
            story_path.exists(),
            "Story must remain in 4_merge/ when mergemaster completion is a no-op"
        );
        // The agent entry should remain in the pool (lifecycle cleanup is the
        // caller's responsibility, not run_server_owned_completion's).
        let agents = pool.agents.lock().unwrap();
        let key = composite_key("99_story_merge445", "mergemaster");
        assert!(
            agents.get(&key).is_some(),
            "Agent must remain in pool — run_server_owned_completion is a no-op for mergemaster"
        );
    }
}