2026-04-28 15:16:05 +00:00
|
|
|
//! Server-owned completion: runs acceptance gates when an agent process exits normally.
|
|
|
|
|
use crate::io::watcher::WatcherEvent;
|
|
|
|
|
use crate::slog;
|
|
|
|
|
use std::collections::HashMap;
|
|
|
|
|
use std::sync::{Arc, Mutex};
|
|
|
|
|
use tokio::sync::broadcast;
|
|
|
|
|
|
|
|
|
|
use super::super::super::super::{AgentEvent, CompletionReport, PipelineStage, pipeline_stage};
|
|
|
|
|
use super::super::super::{AgentPool, StoryAgent, composite_key};
|
|
|
|
|
use super::super::advance::spawn_pipeline_advance;
|
|
|
|
|
|
|
|
|
|
/// Server-owned completion: runs acceptance gates when an agent process exits
|
|
|
|
|
/// normally, and advances the pipeline based on results.
|
|
|
|
|
///
|
|
|
|
|
/// This is a **free function** (not a method on `AgentPool`) to break the
|
|
|
|
|
/// opaque type cycle that would otherwise arise: `start_agent` → spawned task
|
|
|
|
|
/// → server-owned completion → pipeline advance → `start_agent`.
|
|
|
|
|
///
|
|
|
|
|
/// If the agent already has a completion report (e.g. from a legacy
|
|
|
|
|
/// `report_completion` call), this is a no-op to avoid double-running gates.
|
|
|
|
|
pub(in crate::agents::pool) async fn run_server_owned_completion(
|
|
|
|
|
agents: &Arc<Mutex<HashMap<String, StoryAgent>>>,
|
|
|
|
|
port: u16,
|
|
|
|
|
story_id: &str,
|
|
|
|
|
agent_name: &str,
|
|
|
|
|
session_id: Option<String>,
|
|
|
|
|
watcher_tx: broadcast::Sender<WatcherEvent>,
|
|
|
|
|
) {
|
|
|
|
|
let key = composite_key(story_id, agent_name);
|
|
|
|
|
|
|
|
|
|
// Guard: mergemaster agents have their own completion path via
|
|
|
|
|
// start_merge_agent_work / run_merge_pipeline. Running server-owned gates
|
|
|
|
|
// for a mergemaster would wrongly advance the story to 5_done/ even when
|
|
|
|
|
// no squash merge has occurred (e.g. rate-limited exit before the agent
|
|
|
|
|
// called start_merge_agent_work). The lifecycle caller is responsible for
|
|
|
|
|
// cleaning up the agent entry and triggering auto-assign.
|
|
|
|
|
if pipeline_stage(agent_name) == PipelineStage::Mergemaster {
|
|
|
|
|
slog!(
|
|
|
|
|
"[agents] run_server_owned_completion skipped for mergemaster \
|
|
|
|
|
'{story_id}:{agent_name}'; mergemaster completion is handled by \
|
|
|
|
|
start_merge_agent_work."
|
|
|
|
|
);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Guard: skip if completion was already recorded (legacy path).
|
|
|
|
|
{
|
|
|
|
|
let lock = match agents.lock() {
|
|
|
|
|
Ok(a) => a,
|
|
|
|
|
Err(_) => return,
|
|
|
|
|
};
|
|
|
|
|
match lock.get(&key) {
|
|
|
|
|
Some(agent) if agent.completion.is_some() => {
|
|
|
|
|
slog!(
|
|
|
|
|
"[agents] Completion already recorded for '{story_id}:{agent_name}'; \
|
|
|
|
|
skipping server-owned gates."
|
|
|
|
|
);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
Some(_) => {}
|
|
|
|
|
None => return,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Get worktree path for running gates.
|
|
|
|
|
let worktree_path = {
|
|
|
|
|
let lock = match agents.lock() {
|
|
|
|
|
Ok(a) => a,
|
|
|
|
|
Err(_) => return,
|
|
|
|
|
};
|
|
|
|
|
lock.get(&key)
|
|
|
|
|
.and_then(|a| a.worktree_info.as_ref().map(|wt| wt.path.clone()))
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// Kill any in-flight cargo test processes for this worktree so they don't
|
|
|
|
|
// hold the build lock while gates try to run.
|
|
|
|
|
if let Some(wt_path) = worktree_path.as_ref()
|
|
|
|
|
&& let Ok(output) = std::process::Command::new("pgrep")
|
|
|
|
|
.args([
|
|
|
|
|
"-f",
|
|
|
|
|
&format!("--manifest-path {}/Cargo.toml", wt_path.display()),
|
|
|
|
|
])
|
|
|
|
|
.output()
|
|
|
|
|
{
|
|
|
|
|
let pids = String::from_utf8_lossy(&output.stdout);
|
|
|
|
|
for pid_str in pids.lines() {
|
|
|
|
|
if let Ok(pid) = pid_str.trim().parse::<i32>() {
|
|
|
|
|
crate::slog!(
|
|
|
|
|
"[agents] Killing stale cargo process (pid {pid}) for '{story_id}' before running gates"
|
|
|
|
|
);
|
|
|
|
|
unsafe {
|
|
|
|
|
libc::kill(pid, libc::SIGKILL);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Run acceptance gates.
|
|
|
|
|
let (gates_passed, gate_output) = if let Some(wt_path) = worktree_path {
|
|
|
|
|
let path = wt_path;
|
|
|
|
|
match tokio::task::spawn_blocking(move || {
|
|
|
|
|
// If the worktree is dirty, check whether committed work survived.
|
|
|
|
|
// An agent crash (e.g. Claude Code CLI's `output.write(&bytes).is_ok()`
|
|
|
|
|
// assertion — bug 645) can leave uncommitted files behind even though
|
|
|
|
|
// the agent already committed valid work. In that case, stash the
|
|
|
|
|
// dirty files and proceed with gates on the committed code.
|
|
|
|
|
// Uncommitted work is never junk — it may be the next agent session's
|
|
|
|
|
// starting point (bug 651).
|
|
|
|
|
let stashed =
|
|
|
|
|
if let Err(dirty_msg) = crate::agents::gates::check_uncommitted_changes(&path) {
|
|
|
|
|
if crate::agents::gates::worktree_has_committed_work(&path) {
|
|
|
|
|
crate::slog!(
|
|
|
|
|
"[agents] Worktree dirty but committed work exists — \
|
|
|
|
|
stashing uncommitted changes and proceeding with gates. \
|
|
|
|
|
Dirty state: {dirty_msg}"
|
|
|
|
|
);
|
|
|
|
|
// Stash dirty files so gates run against committed code only.
|
|
|
|
|
// They will be restored after gates complete.
|
|
|
|
|
std::process::Command::new("git")
|
|
|
|
|
.args([
|
|
|
|
|
"stash",
|
|
|
|
|
"push",
|
|
|
|
|
"--include-untracked",
|
|
|
|
|
"-m",
|
|
|
|
|
"server-completion-temp",
|
|
|
|
|
])
|
|
|
|
|
.current_dir(&path)
|
|
|
|
|
.output()
|
|
|
|
|
.map(|o| {
|
|
|
|
|
o.status.success()
|
|
|
|
|
&& !String::from_utf8_lossy(&o.stdout)
|
|
|
|
|
.contains("No local changes to save")
|
|
|
|
|
})
|
|
|
|
|
.unwrap_or(false)
|
|
|
|
|
} else {
|
|
|
|
|
return Ok((false, dirty_msg));
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
false
|
|
|
|
|
};
|
|
|
|
|
// AC5: Fail early if the coder finished with no commits on the feature branch.
|
|
|
|
|
// This prevents empty-diff stories from advancing through QA to merge.
|
|
|
|
|
if !crate::agents::gates::worktree_has_committed_work(&path) {
|
|
|
|
|
if stashed {
|
|
|
|
|
let _ = std::process::Command::new("git")
|
|
|
|
|
.args(["stash", "pop"])
|
|
|
|
|
.current_dir(&path)
|
|
|
|
|
.output();
|
|
|
|
|
}
|
|
|
|
|
return Ok((
|
|
|
|
|
false,
|
|
|
|
|
"Agent exited with no commits on the feature branch. \
|
|
|
|
|
The agent did not produce any code changes."
|
|
|
|
|
.to_string(),
|
|
|
|
|
));
|
|
|
|
|
}
|
|
|
|
|
let result = crate::agents::gates::run_acceptance_gates(&path);
|
|
|
|
|
// Restore stashed uncommitted changes.
|
|
|
|
|
if stashed {
|
|
|
|
|
let _ = std::process::Command::new("git")
|
|
|
|
|
.args(["stash", "pop"])
|
|
|
|
|
.current_dir(&path)
|
|
|
|
|
.output();
|
|
|
|
|
}
|
|
|
|
|
result
|
|
|
|
|
})
|
|
|
|
|
.await
|
|
|
|
|
{
|
|
|
|
|
Ok(Ok(result)) => result,
|
|
|
|
|
Ok(Err(e)) => (false, e),
|
|
|
|
|
Err(e) => (false, format!("Gate check task panicked: {e}")),
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
(
|
|
|
|
|
false,
|
|
|
|
|
"No worktree path available to run acceptance gates".to_string(),
|
|
|
|
|
)
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
slog!(
|
|
|
|
|
"[agents] Server-owned completion for '{story_id}:{agent_name}': gates_passed={gates_passed}"
|
|
|
|
|
);
|
|
|
|
|
|
2026-04-29 21:28:41 +00:00
|
|
|
// Notify chat transports of the agent completion result.
|
|
|
|
|
let _ = watcher_tx.send(WatcherEvent::AgentCompleted {
|
|
|
|
|
story_id: story_id.to_string(),
|
|
|
|
|
agent_name: agent_name.to_string(),
|
|
|
|
|
success: gates_passed,
|
|
|
|
|
});
|
|
|
|
|
|
2026-04-28 15:16:05 +00:00
|
|
|
let report = CompletionReport {
|
|
|
|
|
summary: "Agent process exited normally".to_string(),
|
|
|
|
|
gates_passed,
|
|
|
|
|
gate_output,
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// Store completion report, extract data for pipeline advance, then
|
|
|
|
|
// remove the entry so completed agents never appear in list_agents.
|
|
|
|
|
let (tx, project_root_for_advance, wt_path_for_advance, merge_failure_reported_for_advance) = {
|
|
|
|
|
let mut lock = match agents.lock() {
|
|
|
|
|
Ok(a) => a,
|
|
|
|
|
Err(_) => return,
|
|
|
|
|
};
|
|
|
|
|
let agent = match lock.get_mut(&key) {
|
|
|
|
|
Some(a) => a,
|
|
|
|
|
None => return,
|
|
|
|
|
};
|
|
|
|
|
agent.completion = Some(report.clone());
|
|
|
|
|
agent.session_id = session_id.clone();
|
|
|
|
|
let tx = agent.tx.clone();
|
|
|
|
|
let pr = agent.project_root.clone();
|
|
|
|
|
let wt = agent.worktree_info.as_ref().map(|w| w.path.clone());
|
|
|
|
|
let mfr = agent.merge_failure_reported;
|
|
|
|
|
lock.remove(&key);
|
|
|
|
|
(tx, pr, wt, mfr)
|
|
|
|
|
};
|
|
|
|
|
// The completed session's ID is used to resume if gates fail.
|
|
|
|
|
let previous_session_id = session_id.clone();
|
|
|
|
|
|
|
|
|
|
// Emit Done so wait_for_agent unblocks.
|
|
|
|
|
let _ = tx.send(AgentEvent::Done {
|
|
|
|
|
story_id: story_id.to_string(),
|
|
|
|
|
agent_name: agent_name.to_string(),
|
|
|
|
|
session_id,
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
// Notify WebSocket clients that the agent is gone.
|
|
|
|
|
AgentPool::notify_agent_state_changed(&watcher_tx);
|
|
|
|
|
|
|
|
|
|
// Advance the pipeline state machine in a background task.
|
|
|
|
|
spawn_pipeline_advance(
|
|
|
|
|
Arc::clone(agents),
|
|
|
|
|
port,
|
|
|
|
|
story_id,
|
|
|
|
|
agent_name,
|
|
|
|
|
report,
|
|
|
|
|
project_root_for_advance,
|
|
|
|
|
wt_path_for_advance,
|
|
|
|
|
watcher_tx,
|
|
|
|
|
merge_failure_reported_for_advance,
|
|
|
|
|
previous_session_id,
|
|
|
|
|
);
|
|
|
|
|
}
|