Server-owned agent completion: remove report_completion dependency
When an agent process exits normally, the server now automatically runs acceptance gates (uncommitted changes check + cargo clippy + tests) and advances the pipeline based on results. This replaces the previous model where agents had to explicitly call report_completion as an MCP tool. Changes: - Add run_server_owned_completion() free function in agents.rs that runs gates on process exit, stores a CompletionReport, and advances pipeline - Wire it into start_agent's spawned task (replaces simple status setting) - Remove report_completion from MCP tools list and handler (mcp.rs) - Update default_agent_prompt() to not reference report_completion - Update all agent prompts in project.toml (supervisor, coders, qa, mergemaster) to reflect server-owned completion - Add guard: skip gates if completion was already recorded (legacy path) - Add 4 new tests for server-owned completion behavior - Update tools_list test (26 tools, report_completion excluded) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -93,7 +93,10 @@ pub fn pipeline_stage(agent_name: &str) -> PipelineStage {
|
||||
}
|
||||
}
|
||||
|
||||
/// Report produced by an agent calling `report_completion`.
|
||||
/// Completion report produced when acceptance gates are run.
|
||||
///
|
||||
/// Created automatically by the server when an agent process exits normally,
|
||||
/// or via the internal `report_completion` method.
|
||||
#[derive(Debug, Serialize, Clone)]
|
||||
pub struct CompletionReport {
|
||||
pub summary: String,
|
||||
@@ -263,6 +266,7 @@ impl AgentPool {
|
||||
let cwd = wt_path_str.clone();
|
||||
let key_clone = key.clone();
|
||||
let log_clone = event_log.clone();
|
||||
let port_for_task = self.port;
|
||||
|
||||
let handle = tokio::spawn(async move {
|
||||
let _ = tx_clone.send(AgentEvent::Status {
|
||||
@@ -277,17 +281,16 @@ impl AgentPool {
|
||||
.await
|
||||
{
|
||||
Ok(session_id) => {
|
||||
if let Ok(mut agents) = agents_ref.lock()
|
||||
&& let Some(agent) = agents.get_mut(&key_clone)
|
||||
{
|
||||
agent.status = AgentStatus::Completed;
|
||||
agent.session_id = session_id.clone();
|
||||
}
|
||||
let _ = tx_clone.send(AgentEvent::Done {
|
||||
story_id: sid.clone(),
|
||||
agent_name: aname.clone(),
|
||||
// Server-owned completion: run acceptance gates automatically
|
||||
// when the agent process exits normally.
|
||||
run_server_owned_completion(
|
||||
&agents_ref,
|
||||
port_for_task,
|
||||
&sid,
|
||||
&aname,
|
||||
session_id,
|
||||
});
|
||||
)
|
||||
.await;
|
||||
}
|
||||
Err(e) => {
|
||||
if let Ok(mut agents) = agents_ref.lock()
|
||||
@@ -747,13 +750,19 @@ impl AgentPool {
|
||||
}
|
||||
}
|
||||
|
||||
/// Report that an agent has finished work on a story.
|
||||
/// Internal: report that an agent has finished work on a story.
|
||||
///
|
||||
/// **Note:** This is no longer exposed as an MCP tool. The server now
|
||||
/// automatically runs completion gates when an agent process exits
|
||||
/// (see `run_server_owned_completion`). This method is retained for
|
||||
/// backwards compatibility and testing.
|
||||
///
|
||||
/// - Rejects with an error if the worktree has uncommitted changes.
|
||||
/// - Runs acceptance gates (cargo clippy + cargo nextest run / cargo test).
|
||||
/// - Stores the `CompletionReport` on the agent record.
|
||||
/// - Transitions status to `Completed` (gates passed) or `Failed` (gates failed).
|
||||
/// - Emits a `Done` event so `wait_for_agent` unblocks.
|
||||
#[allow(dead_code)]
|
||||
pub async fn report_completion(
|
||||
&self,
|
||||
story_id: &str,
|
||||
@@ -1040,6 +1049,134 @@ impl AgentPool {
|
||||
}
|
||||
}
|
||||
|
||||
/// Server-owned completion: runs acceptance gates when an agent process exits
|
||||
/// normally, and advances the pipeline based on results.
|
||||
///
|
||||
/// This is a **free function** (not a method on `AgentPool`) to break the
|
||||
/// opaque type cycle that would otherwise arise: `start_agent` → spawned task
|
||||
/// → server-owned completion → pipeline advance → `start_agent`.
|
||||
///
|
||||
/// If the agent already has a completion report (e.g. from a legacy
|
||||
/// `report_completion` call), this is a no-op to avoid double-running gates.
|
||||
async fn run_server_owned_completion(
|
||||
agents: &Arc<Mutex<HashMap<String, StoryAgent>>>,
|
||||
port: u16,
|
||||
story_id: &str,
|
||||
agent_name: &str,
|
||||
session_id: Option<String>,
|
||||
) {
|
||||
let key = composite_key(story_id, agent_name);
|
||||
|
||||
// Guard: skip if completion was already recorded (legacy path).
|
||||
{
|
||||
let lock = match agents.lock() {
|
||||
Ok(a) => a,
|
||||
Err(_) => return,
|
||||
};
|
||||
match lock.get(&key) {
|
||||
Some(agent) if agent.completion.is_some() => {
|
||||
eprintln!(
|
||||
"[agents] Completion already recorded for '{story_id}:{agent_name}'; \
|
||||
skipping server-owned gates."
|
||||
);
|
||||
return;
|
||||
}
|
||||
Some(_) => {}
|
||||
None => return,
|
||||
}
|
||||
}
|
||||
|
||||
// Get worktree path for running gates.
|
||||
let worktree_path = {
|
||||
let lock = match agents.lock() {
|
||||
Ok(a) => a,
|
||||
Err(_) => return,
|
||||
};
|
||||
lock.get(&key)
|
||||
.and_then(|a| a.worktree_info.as_ref().map(|wt| wt.path.clone()))
|
||||
};
|
||||
|
||||
// Run acceptance gates.
|
||||
let (gates_passed, gate_output) = if let Some(wt_path) = worktree_path {
|
||||
let path = wt_path;
|
||||
match tokio::task::spawn_blocking(move || {
|
||||
check_uncommitted_changes(&path)?;
|
||||
run_acceptance_gates(&path)
|
||||
})
|
||||
.await
|
||||
{
|
||||
Ok(Ok(result)) => result,
|
||||
Ok(Err(e)) => (false, e),
|
||||
Err(e) => (false, format!("Gate check task panicked: {e}")),
|
||||
}
|
||||
} else {
|
||||
(
|
||||
false,
|
||||
"No worktree path available to run acceptance gates".to_string(),
|
||||
)
|
||||
};
|
||||
|
||||
eprintln!(
|
||||
"[agents] Server-owned completion for '{story_id}:{agent_name}': gates_passed={gates_passed}"
|
||||
);
|
||||
|
||||
let report = CompletionReport {
|
||||
summary: "Agent process exited normally".to_string(),
|
||||
gates_passed,
|
||||
gate_output,
|
||||
};
|
||||
|
||||
// Store completion report and set status.
|
||||
let tx = {
|
||||
let mut lock = match agents.lock() {
|
||||
Ok(a) => a,
|
||||
Err(_) => return,
|
||||
};
|
||||
let agent = match lock.get_mut(&key) {
|
||||
Some(a) => a,
|
||||
None => return,
|
||||
};
|
||||
agent.completion = Some(report);
|
||||
agent.session_id = session_id.clone();
|
||||
agent.status = if gates_passed {
|
||||
AgentStatus::Completed
|
||||
} else {
|
||||
AgentStatus::Failed
|
||||
};
|
||||
agent.tx.clone()
|
||||
};
|
||||
|
||||
// Emit Done so wait_for_agent unblocks.
|
||||
let _ = tx.send(AgentEvent::Done {
|
||||
story_id: story_id.to_string(),
|
||||
agent_name: agent_name.to_string(),
|
||||
session_id,
|
||||
});
|
||||
|
||||
// Advance the pipeline state machine in a background task.
|
||||
// Uses a non-async helper to break the opaque type cycle.
|
||||
spawn_pipeline_advance(Arc::clone(agents), port, story_id, agent_name);
|
||||
}
|
||||
|
||||
/// Spawn pipeline advancement as a background task.
|
||||
///
|
||||
/// This is a **non-async** function so it does not participate in the opaque
|
||||
/// type cycle between `start_agent` and `run_server_owned_completion`.
|
||||
fn spawn_pipeline_advance(
|
||||
agents: Arc<Mutex<HashMap<String, StoryAgent>>>,
|
||||
port: u16,
|
||||
story_id: &str,
|
||||
agent_name: &str,
|
||||
) {
|
||||
let sid = story_id.to_string();
|
||||
let aname = agent_name.to_string();
|
||||
tokio::spawn(async move {
|
||||
let pool = AgentPool { agents, port };
|
||||
pool.run_pipeline_advance_for_completed_agent(&sid, &aname)
|
||||
.await;
|
||||
});
|
||||
}
|
||||
|
||||
/// Result of a mergemaster merge operation.
|
||||
#[derive(Debug, Serialize, Clone)]
|
||||
pub struct MergeReport {
|
||||
@@ -1287,8 +1424,8 @@ fn check_uncommitted_changes(path: &Path) -> Result<(), String> {
|
||||
let stdout = String::from_utf8_lossy(&output.stdout);
|
||||
if !stdout.trim().is_empty() {
|
||||
return Err(format!(
|
||||
"Worktree has uncommitted changes. Commit your work before calling \
|
||||
report_completion:\n{stdout}"
|
||||
"Worktree has uncommitted changes. Please commit all work before \
|
||||
the agent exits:\n{stdout}"
|
||||
));
|
||||
}
|
||||
Ok(())
|
||||
@@ -1916,6 +2053,148 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
// ── server-owned completion tests ───────────────────────────────────────────
|
||||
|
||||
#[tokio::test]
|
||||
async fn server_owned_completion_skips_when_already_completed() {
|
||||
let pool = AgentPool::new(3001);
|
||||
let report = CompletionReport {
|
||||
summary: "Already done".to_string(),
|
||||
gates_passed: true,
|
||||
gate_output: String::new(),
|
||||
};
|
||||
pool.inject_test_agent_with_completion(
|
||||
"s10",
|
||||
"coder-1",
|
||||
AgentStatus::Completed,
|
||||
PathBuf::from("/tmp/nonexistent"),
|
||||
report,
|
||||
);
|
||||
|
||||
// Subscribe before calling so we can check if Done event was emitted.
|
||||
let mut rx = pool.subscribe("s10", "coder-1").unwrap();
|
||||
|
||||
run_server_owned_completion(&pool.agents, pool.port, "s10", "coder-1", Some("sess-1".to_string()))
|
||||
.await;
|
||||
|
||||
// Status should remain Completed (unchanged) — no gate re-run.
|
||||
let agents = pool.agents.lock().unwrap();
|
||||
let key = composite_key("s10", "coder-1");
|
||||
let agent = agents.get(&key).unwrap();
|
||||
assert_eq!(agent.status, AgentStatus::Completed);
|
||||
// Summary should still be the original, not overwritten.
|
||||
assert_eq!(
|
||||
agent.completion.as_ref().unwrap().summary,
|
||||
"Already done"
|
||||
);
|
||||
drop(agents);
|
||||
|
||||
// No Done event should have been emitted.
|
||||
assert!(
|
||||
rx.try_recv().is_err(),
|
||||
"should not emit Done when completion already exists"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn server_owned_completion_runs_gates_on_clean_worktree() {
|
||||
use tempfile::tempdir;
|
||||
|
||||
let tmp = tempdir().unwrap();
|
||||
let repo = tmp.path();
|
||||
init_git_repo(repo);
|
||||
|
||||
let pool = AgentPool::new(3001);
|
||||
pool.inject_test_agent_with_path(
|
||||
"s11",
|
||||
"coder-1",
|
||||
AgentStatus::Running,
|
||||
repo.to_path_buf(),
|
||||
);
|
||||
|
||||
let mut rx = pool.subscribe("s11", "coder-1").unwrap();
|
||||
|
||||
run_server_owned_completion(&pool.agents, pool.port, "s11", "coder-1", Some("sess-2".to_string()))
|
||||
.await;
|
||||
|
||||
// Completion report should exist (gates were run, though they may fail
|
||||
// because this is not a real Cargo project).
|
||||
let agents = pool.agents.lock().unwrap();
|
||||
let key = composite_key("s11", "coder-1");
|
||||
let agent = agents.get(&key).unwrap();
|
||||
assert!(
|
||||
agent.completion.is_some(),
|
||||
"completion report should be created"
|
||||
);
|
||||
assert_eq!(
|
||||
agent.completion.as_ref().unwrap().summary,
|
||||
"Agent process exited normally"
|
||||
);
|
||||
// Session ID should be stored.
|
||||
assert_eq!(agent.session_id, Some("sess-2".to_string()));
|
||||
// Status should be terminal (Completed or Failed depending on gate results).
|
||||
assert!(
|
||||
agent.status == AgentStatus::Completed || agent.status == AgentStatus::Failed,
|
||||
"status should be terminal, got: {:?}",
|
||||
agent.status
|
||||
);
|
||||
drop(agents);
|
||||
|
||||
// A Done event should have been emitted.
|
||||
let event = rx.try_recv().expect("should emit Done event");
|
||||
assert!(
|
||||
matches!(event, AgentEvent::Done { .. }),
|
||||
"expected Done event, got: {event:?}"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn server_owned_completion_fails_on_dirty_worktree() {
|
||||
use std::fs;
|
||||
use tempfile::tempdir;
|
||||
|
||||
let tmp = tempdir().unwrap();
|
||||
let repo = tmp.path();
|
||||
init_git_repo(repo);
|
||||
// Create an uncommitted file.
|
||||
fs::write(repo.join("dirty.txt"), "not committed").unwrap();
|
||||
|
||||
let pool = AgentPool::new(3001);
|
||||
pool.inject_test_agent_with_path(
|
||||
"s12",
|
||||
"coder-1",
|
||||
AgentStatus::Running,
|
||||
repo.to_path_buf(),
|
||||
);
|
||||
|
||||
run_server_owned_completion(&pool.agents, pool.port, "s12", "coder-1", None)
|
||||
.await;
|
||||
|
||||
let agents = pool.agents.lock().unwrap();
|
||||
let key = composite_key("s12", "coder-1");
|
||||
let agent = agents.get(&key).unwrap();
|
||||
assert!(agent.completion.is_some());
|
||||
assert!(!agent.completion.as_ref().unwrap().gates_passed);
|
||||
assert_eq!(agent.status, AgentStatus::Failed);
|
||||
assert!(
|
||||
agent
|
||||
.completion
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.gate_output
|
||||
.contains("uncommitted"),
|
||||
"gate_output should mention uncommitted changes"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn server_owned_completion_nonexistent_agent_is_noop() {
|
||||
let pool = AgentPool::new(3001);
|
||||
// Should not panic or error — just silently return.
|
||||
run_server_owned_completion(&pool.agents, pool.port, "nonexistent", "bot", None)
|
||||
.await;
|
||||
}
|
||||
|
||||
// ── move_story_to_current tests ────────────────────────────────────────────
|
||||
// No git repo needed: the watcher handles commits asynchronously.
|
||||
|
||||
|
||||
Reference in New Issue
Block a user