//! Background async work spawned by `AgentPool::start_agent`.
//!
//! `start_agent` returns immediately after registering the agent as `Pending`;
//! this module runs the slow worktree creation, agent process launch, and
//! event streaming in the background (story 157).
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use portable_pty::ChildKiller;
use tokio::sync::broadcast;
use crate::agent_log::AgentLogWriter;
use crate::config::ProjectConfig;
use crate::http::context::AppContext;
use crate::io::watcher::WatcherEvent;
use crate::{slog, slog_error};
use super::super::super::runtime::{
AgentRuntime, ClaudeCodeRuntime, GeminiRuntime, OpenAiRuntime, RuntimeContext,
};
use super::super::super::{
AgentEvent, AgentStatus, PipelineStage, agent_config_stage, pipeline_stage,
};
use super::super::AgentPool;
use super::super::types::StoryAgent;
/// Maximum bytes of gate output to include in the `--append-system-prompt`
/// failure section. The tail is preserved (where errors appear).
const GATE_OUTPUT_PROMPT_BYTES: usize = 3_000;

/// Truncate `output` to at most [`GATE_OUTPUT_PROMPT_BYTES`], keeping the tail.
///
/// The cut point is walked forward to the next UTF-8 char boundary so the
/// returned slice never splits a multi-byte character.
fn truncate_for_system_prompt(output: &str) -> &str {
    let len = output.len();
    if len <= GATE_OUTPUT_PROMPT_BYTES {
        return output;
    }
    // First valid boundary at or after the ideal cut point; `len` itself is
    // always a char boundary, so the search cannot fail.
    let cut = (len - GATE_OUTPUT_PROMPT_BYTES..=len)
        .find(|&i| output.is_char_boundary(i))
        .unwrap_or(len);
    output.get(cut..).unwrap_or("")
}
/// Inject a gate-failure section into the `--append-system-prompt` CLI arg.
///
/// If a `--append-system-prompt` pair already exists, appends to its value.
/// Otherwise adds a new `--append-system-prompt <section>` pair.
fn inject_gate_failure_section(args: &mut Vec<String>, gate_output: &str) {
let section = format!(
"Your previous run's quality gates failed:\n{}",
truncate_for_system_prompt(gate_output)
);
if let Some(pos) = args.iter().position(|a| a == "--append-system-prompt")
&& let Some(val) = args.get_mut(pos + 1)
{
val.push_str("\n\n");
val.push_str(&section);
return;
}
args.push("--append-system-prompt".to_string());
args.push(section);
}
2026-05-13 13:54:27 +00:00
/// Cap `--max-turns` and `--max-budget-usd` for merge-gate fixup sessions (story 981).
///
/// When [`ContentKey::MergeFixupPending`] is set the fixup coder must not run
/// longer than 20 turns or spend more than $1. If the agent config already
/// specifies lower values those are preserved; otherwise the fixup caps take
/// precedence by overriding the args in place.
pub(super) fn maybe_cap_for_merge_fixup(args: &mut Vec<String>, story_id: &str) {
if crate::db::read_content(crate::db::ContentKey::MergeFixupPending(story_id)).is_none() {
return;
}
// Override --max-turns: set to 20 unless already lower.
const FIXUP_MAX_TURNS: u32 = 20;
if let Some(pos) = args.iter().position(|a| a == "--max-turns") {
if let Some(val) = args.get_mut(pos + 1) {
let current: u32 = val.parse().unwrap_or(u32::MAX);
if current > FIXUP_MAX_TURNS {
*val = FIXUP_MAX_TURNS.to_string();
}
}
} else {
args.push("--max-turns".to_string());
args.push(FIXUP_MAX_TURNS.to_string());
}
// Override --max-budget-usd: set to 1.0 unless already lower.
const FIXUP_MAX_BUDGET: f64 = 1.0;
if let Some(pos) = args.iter().position(|a| a == "--max-budget-usd") {
if let Some(val) = args.get_mut(pos + 1) {
let current: f64 = val.parse().unwrap_or(f64::MAX);
if current > FIXUP_MAX_BUDGET {
*val = FIXUP_MAX_BUDGET.to_string();
}
}
} else {
args.push("--max-budget-usd".to_string());
args.push(FIXUP_MAX_BUDGET.to_string());
}
}
/// On retry spawns (retry_count > 0), read the stored gate_output from the DB
/// and inject it into `--append-system-prompt` so the agent always sees the
/// prior failure context, even when session-resuming (story 881).
pub(super) fn maybe_inject_gate_failure(args: &mut Vec<String>, story_id: &str) {
let retry_count = crate::crdt_state::read_item(story_id)
2026-05-12 17:03:41 +00:00
.map(|item| item.retry_count())
.unwrap_or(0);
if retry_count > 0
2026-05-13 11:22:57 +00:00
&& let Some(gate_output) =
crate::db::read_content(crate::db::ContentKey::GateOutput(story_id))
{
inject_gate_failure_section(args, &gate_output);
}
}
/// Run the background worktree-creation + agent-launch flow.
///
/// Caller (`AgentPool::start_agent`) wraps this in `tokio::spawn` and stores
/// the resulting handle on the Pending entry so cancellation works.
///
/// High-level flow:
/// 1. Poll for the worktree created by the lifecycle subscriber (or fail).
/// 2. Render agent command/args/prompt; apply retry/fixup/epic-context tweaks.
/// 3. Flip the pool entry to `Running` and notify watchers.
/// 4. Launch the configured runtime and await its exit.
/// 5. Route the exit: crash-respawn, mergemaster-specific handling, or the
///    server-owned gate completion path.
#[allow(clippy::too_many_arguments)]
pub(super) async fn run_agent_spawn(
    project_root: PathBuf,
    config: ProjectConfig,
    resume_context: Option<String>,
    session_id_to_resume: Option<String>,
    story_id: String,
    agent_name: String,
    tx: broadcast::Sender<AgentEvent>,
    agents: Arc<Mutex<HashMap<String, StoryAgent>>>,
    key: String,
    event_log: Arc<Mutex<Vec<AgentEvent>>>,
    port: u16,
    log_writer: Option<Arc<Mutex<AgentLogWriter>>>,
    child_killers: Arc<Mutex<HashMap<String, Box<dyn ChildKiller + Send + Sync>>>>,
    watcher_tx: broadcast::Sender<WatcherEvent>,
    inactivity_timeout_secs: u64,
    // Formatted `<recent-events>` block drained from the previous session's
    // buffer. Prepended to the first agent turn so the agent sees what
    // happened while it was idle (story 736). `None` when there were no
    // buffered events.
    buffered_events_block: Option<String>,
    app_ctx: Option<Arc<AppContext>>,
) {
    // Re-bind to the legacy `_clone` / `_owned` names so the body below remains
    // a verbatim copy of the original closure (story 157).
    let project_root_clone = project_root;
    let config_clone = config;
    let resume_context_owned = resume_context;
    let session_id_to_resume_owned = session_id_to_resume;
    let sid = story_id;
    let aname = agent_name;
    let tx_clone = tx;
    let agents_ref = agents;
    let key_clone = key;
    let log_clone = event_log;
    let port_for_task = port;
    let log_writer_clone = log_writer;
    let child_killers_clone = child_killers;
    let watcher_tx_clone = watcher_tx;
    // NOTE(review): the trailing comment below looks stale — the timeout IS
    // read when building `RuntimeContext` further down (it is `Copy`, so this
    // `let _` binding is a no-op); confirm and drop if so.
    let _ = inactivity_timeout_secs; // currently unused inside the closure body
    // Step 1: wait for the worktree created by the worktree lifecycle subscriber.
    // The Coding transition fires before this task is spawned; the subscriber
    // creates the worktree asynchronously. Poll until it exists or the deadline.
    // Tests use a short deadline (1 s) so the failure path exercises quickly;
    // production uses 120 s to allow slow git operations to complete.
    #[cfg(not(test))]
    let worktree_wait_secs: u64 = 120;
    #[cfg(test)]
    let worktree_wait_secs: u64 = 1;
    let wt_info = {
        let wt_path = crate::worktree::worktree_path(&project_root_clone, &sid);
        let branch = format!("feature/story-{sid}");
        let base_branch = config_clone
            .base_branch
            .clone()
            .unwrap_or_else(|| crate::worktree::detect_base_branch(&project_root_clone));
        let deadline =
            tokio::time::Instant::now() + std::time::Duration::from_secs(worktree_wait_secs);
        loop {
            if wt_path.exists() {
                break crate::worktree::WorktreeInfo {
                    path: wt_path,
                    branch,
                    base_branch,
                };
            }
            if tokio::time::Instant::now() >= deadline {
                // Deadline hit: log, push an Error event, mark the pool entry
                // Failed, and abandon the spawn.
                let error_msg = format!(
                    "Worktree for story '{sid}' did not appear within {worktree_wait_secs} s; \
                     the lifecycle subscriber may have failed."
                );
                slog_error!("[agents] {error_msg}");
                let event = AgentEvent::Error {
                    story_id: sid.clone(),
                    agent_name: aname.clone(),
                    message: error_msg,
                };
                if let Ok(mut log) = log_clone.lock() {
                    log.push(event.clone());
                }
                let _ = tx_clone.send(event);
                if let Ok(mut agents) = agents_ref.lock()
                    && let Some(agent) = agents.get_mut(&key_clone)
                {
                    agent.status = AgentStatus::Failed;
                }
                AgentPool::notify_agent_state_changed(&watcher_tx_clone);
                return;
            }
            tokio::time::sleep(std::time::Duration::from_millis(500)).await;
        }
    };
    // Step 1.1: Install the pre-commit quality-gate hook in the worktree.
    // Non-fatal — if installation fails the agent can still run; the failure
    // is logged so the operator can investigate.
    if let Err(e) = crate::worktree::install_pre_commit_hook(&wt_info.path) {
        slog_error!("[agents] pre-commit hook install failed for {sid}: {e}");
    }
    // Step 1.5: Update the source map for changed files since master.
    // Non-blocking — failures are logged but do not gate the spawn.
    {
        let wt_path_for_map = wt_info.path.clone();
        let base_for_map = wt_info.base_branch.clone();
        let map_path = project_root_clone.join(".huskies").join("source-map.json");
        // spawn_blocking: the generator does synchronous git/file work.
        // A JoinError (panic/cancel) is folded into the Err arm below.
        match tokio::task::spawn_blocking(move || {
            source_map_gen::update_for_worktree(&wt_path_for_map, &base_for_map, &map_path)
        })
        .await
        .unwrap_or_else(|e| Err(e.to_string()))
        {
            Ok(()) => {}
            Err(e) => slog_error!("[agents] source map update for {sid}: {e}"),
        }
    }
    // Step 2: store worktree info and render agent command/args/prompt.
    let wt_path_str = wt_info.path.to_string_lossy().to_string();
    {
        if let Ok(mut agents) = agents_ref.lock()
            && let Some(agent) = agents.get_mut(&key_clone)
        {
            agent.worktree_info = Some(wt_info.clone());
        }
    }
    let (command, mut args, mut prompt) = match config_clone.render_agent_args(
        &wt_path_str,
        &sid,
        Some(&aname),
        Some(&wt_info.base_branch),
    ) {
        Ok(result) => result,
        Err(e) => {
            // Rendering failure is fatal for this spawn: emit Error, mark
            // the pool entry Failed, notify, and stop.
            let error_msg = format!("Failed to render agent args: {e}");
            slog_error!("[agents] {error_msg}");
            let event = AgentEvent::Error {
                story_id: sid.clone(),
                agent_name: aname.clone(),
                message: error_msg,
            };
            if let Ok(mut log) = log_clone.lock() {
                log.push(event.clone());
            }
            let _ = tx_clone.send(event);
            if let Ok(mut agents) = agents_ref.lock()
                && let Some(agent) = agents.get_mut(&key_clone)
            {
                agent.status = AgentStatus::Failed;
            }
            AgentPool::notify_agent_state_changed(&watcher_tx_clone);
            return;
        }
    };
    // On retry spawns (retry_count > 0), inject prior gate failure output into
    // --append-system-prompt so the agent always sees the failure context (story 881).
    maybe_inject_gate_failure(&mut args, &sid);
    // Cap turns and budget for merge-gate fixup sessions (story 981).
    maybe_cap_for_merge_fixup(&mut args, &sid);
    // Append project-local prompt content (.huskies/AGENT.md) to the
    // baked-in prompt so every agent role sees project-specific guidance
    // without any config changes. The file is read fresh each spawn;
    // if absent or empty, the prompt is unchanged and no warning is logged.
    if let Some(local) = crate::agents::local_prompt::read_project_local_prompt(&project_root_clone)
    {
        prompt.push_str("\n\n");
        prompt.push_str(&local);
    }
    // Prepend epic context when the story belongs to an epic (AC3, story 880).
    // Story 933: epic linkage is now a typed CRDT register on PipelineItemCrdt.
    if let Some(view) = crate::crdt_state::read_item(&sid)
        && let Some(epic_id) = view.epic()
        && let Some(epic_content) = {
            let epic_id_str = epic_id.to_string();
            crate::db::read_content(crate::db::ContentKey::Story(&epic_id_str))
        }
    {
        let block = format!(
            "# Epic Context\n\nThis work item belongs to epic `{epic_id}`.\
             The following is the authoritative epic context you must respect:\n\n\
             ---\n{epic_content}\n---"
        );
        prompt = format!("{block}\n\n{prompt}");
    }
    match &session_id_to_resume_owned {
        Some(sess_id) => slog!("[agent:{sid}:{aname}] spawn mode=warm session_id={sess_id}"),
        None => slog!("[agent:{sid}:{aname}] spawn mode=cold"),
    }
    // Build the effective prompt and determine resume session.
    //
    // When resuming a previous session, discard the full rendered prompt
    // (which would re-read CLAUDE.md and README) and send only the gate
    // failure context as a new message. On a fresh start, append the
    // failure context to the original prompt as before.
    let (effective_prompt, fresh_prompt) = match &session_id_to_resume_owned {
        Some(_) => {
            // Keep the full rendered prompt as fallback if resume fails.
            let fallback = prompt;
            // claude-code's --resume requires a non-empty -p prompt unless the
            // resumed session has a deferred-tool marker. Watchdog-killed
            // sessions don't have one, so an empty -p aborts the CLI with
            // "No deferred tool marker found... Provide a prompt to continue
            // the conversation." When resume succeeds, the agent already has
            // its full prior conversation restored (no need to point at
            // PLAN.md — that's the cold-start orientation path). A brief
            // "continue" nudge is all the CLI needs.
            let resume_msg = resume_context_owned
                .filter(|s| !s.is_empty())
                .unwrap_or_else(|| "Continue your prior work.".to_string());
            (resume_msg, Some(fallback))
        }
        None => {
            if let Some(ctx) = resume_context_owned {
                prompt.push_str(&ctx);
            }
            (prompt, None)
        }
    };
    // Prepend buffered pipeline events from the previous idle period so the
    // agent sees what happened while it was not running (story 736).
    let effective_prompt = match buffered_events_block {
        Some(block) if !block.is_empty() => format!("{block}\n\n{effective_prompt}"),
        _ => effective_prompt,
    };
    // Step 3: transition to Running now that the worktree is ready.
    {
        if let Ok(mut agents) = agents_ref.lock()
            && let Some(agent) = agents.get_mut(&key_clone)
        {
            agent.status = AgentStatus::Running;
        }
    }
    let _ = tx_clone.send(AgentEvent::Status {
        story_id: sid.clone(),
        agent_name: aname.clone(),
        status: "running".to_string(),
    });
    AgentPool::notify_agent_state_changed(&watcher_tx_clone);
    let _ = watcher_tx_clone.send(WatcherEvent::AgentStarted {
        story_id: sid.clone(),
        agent_name: aname.clone(),
    });
    // Step 4: launch the agent process via the configured runtime.
    let runtime_name = config_clone
        .find_agent(&aname)
        .and_then(|a| a.runtime.as_deref())
        .unwrap_or("claude-code");
    // Extract model once so it can be shared across all runtime branches and
    // passed to RuntimeContext for eager session recording (bug 967).
    let agent_model = config_clone
        .find_agent(&aname)
        .and_then(|a| a.model.clone());
    let run_result = match runtime_name {
        "claude-code" => {
            let runtime =
                ClaudeCodeRuntime::new(child_killers_clone.clone(), watcher_tx_clone.clone());
            let ctx = RuntimeContext {
                story_id: sid.clone(),
                agent_name: aname.clone(),
                command,
                args,
                prompt: effective_prompt,
                cwd: wt_path_str,
                inactivity_timeout_secs,
                app_ctx: app_ctx.clone(),
                session_id_to_resume: session_id_to_resume_owned.clone(),
                fresh_prompt: fresh_prompt.clone(),
                project_root: project_root_clone.clone(),
                model: agent_model.clone(),
            };
            runtime
                .start(ctx, tx_clone.clone(), log_clone.clone(), log_writer_clone)
                .await
        }
        "gemini" => {
            let runtime = GeminiRuntime::new();
            let ctx = RuntimeContext {
                story_id: sid.clone(),
                agent_name: aname.clone(),
                command,
                args,
                prompt: effective_prompt,
                cwd: wt_path_str,
                inactivity_timeout_secs,
                app_ctx: app_ctx.clone(),
                session_id_to_resume: session_id_to_resume_owned.clone(),
                fresh_prompt: fresh_prompt.clone(),
                project_root: project_root_clone.clone(),
                model: agent_model.clone(),
            };
            runtime
                .start(ctx, tx_clone.clone(), log_clone.clone(), log_writer_clone)
                .await
        }
        "openai" => {
            let runtime = OpenAiRuntime::new();
            let ctx = RuntimeContext {
                story_id: sid.clone(),
                agent_name: aname.clone(),
                command,
                args,
                prompt: effective_prompt,
                cwd: wt_path_str,
                inactivity_timeout_secs,
                app_ctx: app_ctx.clone(),
                // Last branch: move rather than clone.
                session_id_to_resume: session_id_to_resume_owned,
                fresh_prompt,
                project_root: project_root_clone.clone(),
                model: agent_model,
            };
            runtime
                .start(ctx, tx_clone.clone(), log_clone.clone(), log_writer_clone)
                .await
        }
        other => Err(format!(
            "Unknown agent runtime '{other}'; check the 'runtime' field in project.toml. \
             Supported: 'claude-code', 'gemini', 'openai'"
        )),
    };
    match run_result {
        Ok(result) => {
            // Persist token usage if the agent reported it.
            if let Some(ref usage) = result.token_usage
                && let Ok(agents) = agents_ref.lock()
                && let Some(agent) = agents.get(&key_clone)
                && let Some(ref pr) = agent.project_root
            {
                let model_for_record = config_clone
                    .find_agent(&aname)
                    .and_then(|a| a.model.clone());
                let record = crate::agents::token_usage::build_record(
                    &sid,
                    &aname,
                    model_for_record,
                    usage.clone(),
                );
                if let Err(e) = crate::agents::token_usage::append_record(pr, &record) {
                    slog_error!(
                        "[agents] Failed to persist token usage for \
                         {sid}:{aname}: {e}"
                    );
                }
            }
            // Persist session_id so respawns can resume prior reasoning.
            if let Some(ref sess_id) = result.session_id {
                let model = config_clone
                    .find_agent(&aname)
                    .and_then(|a| a.model.as_ref().map(|m| m.as_str()))
                    .unwrap_or("");
                crate::agents::session_store::record_session(
                    &project_root_clone,
                    &sid,
                    &aname,
                    model,
                    sess_id,
                );
            }
            // Mergemaster agents have their own completion path via
            // start_merge_agent_work / run_merge_pipeline and must NOT go
            // through server-owned gates. When a mergemaster exits early
            // (e.g. rate-limited before calling start_merge_agent_work) the
            // feature-branch worktree compiles fine and post-merge tests on
            // master pass (nothing changed), which would wrongly advance the
            // story to 5_done/ without any squash merge having occurred.
            // Instead: just remove the agent from the pool and let
            // auto-assign restart a new mergemaster for the story.
            let stage = config_clone
                .find_agent(&aname)
                .map(agent_config_stage)
                .unwrap_or_else(|| pipeline_stage(&aname));
            // AC1/AC2 (bug 882): CLI crashed (SIGABRT) before establishing a
            // session. Respawn immediately without running gates or incrementing
            // retry_count. Cap consecutive crash-respawns at 5 to avoid
            // infinite loops; after the cap, block the story with a clear reason.
            if result.aborted_signal && stage != PipelineStage::Mergemaster {
                const ABORT_RESPAWN_CAP: u32 = 5;
                let count = crate::db::read_content(crate::db::ContentKey::AbortRespawnCount(&sid))
                    .and_then(|s| s.trim().parse::<u32>().ok())
                    .unwrap_or(0)
                    + 1;
                crate::db::write_content(
                    crate::db::ContentKey::AbortRespawnCount(&sid),
                    &count.to_string(),
                );
                // Remove the agent entry from the pool and emit Done so that
                // any caller blocked on wait_for_agent is unblocked.
                let tx_done = {
                    let mut lock = match agents_ref.lock() {
                        Ok(a) => a,
                        // Poisoned pool lock: nothing sane to do; bail out.
                        Err(_) => return,
                    };
                    if let Some(agent) = lock.remove(&key_clone) {
                        agent.tx
                    } else {
                        tx_clone.clone()
                    }
                };
                let _ = tx_done.send(AgentEvent::Done {
                    story_id: sid.clone(),
                    agent_name: aname.clone(),
                    session_id: None,
                });
                AgentPool::notify_agent_state_changed(&watcher_tx_clone);
                if count >= ABORT_RESPAWN_CAP {
                    let reason = format!(
                        "CLI crashed before establishing a session (signal=Aborted, no session) \
                         {count} times in a row. Stopping to avoid an infinite respawn loop."
                    );
                    slog_error!(
                        "[agents] Story '{sid}' blocked after {count} consecutive CLI crashes."
                    );
                    if let Err(e) = crate::agents::lifecycle::transition_to_blocked(&sid, &reason) {
                        slog_error!("[agents] Failed to block '{sid}' after abort cap: {e}");
                    }
                    let _ = watcher_tx_clone.send(WatcherEvent::StoryBlocked {
                        story_id: sid.clone(),
                        reason,
                    });
                } else {
                    // Prune session_store entries for this story so the next
                    // spawn starts cold (no `--resume` flag). The crash likely
                    // came from claude-code choking on the bloated stdio
                    // replay; resuming again would re-trigger the same abort.
                    crate::agents::session_store::remove_sessions_for_story(
                        &project_root_clone,
                        &sid,
                    );
                    slog!(
                        "[agents] CLI crashed before session for '{sid}:{aname}' \
                         (abort respawn {count}/{ABORT_RESPAWN_CAP}). \
                         Pruned session_store and respawning cold without \
                         consuming a retry slot."
                    );
                    let agents_for_respawn = Arc::clone(&agents_ref);
                    let watcher_for_respawn = watcher_tx_clone.clone();
                    let sid_r = sid.clone();
                    let aname_r = aname.clone();
                    let root_r = project_root_clone.clone();
                    let port_r = port_for_task;
                    tokio::spawn(async move {
                        // NOTE(review): this ad-hoc pool is built with a FRESH
                        // (empty) child_killers map rather than the shared
                        // `child_killers_clone` — confirm the respawned
                        // child's killer is still reachable by whoever needs
                        // to cancel it. Same pattern appears in the two
                        // ConflictDetected-drain spawns below.
                        let pool = AgentPool {
                            agents: agents_for_respawn,
                            port: port_r,
                            child_killers: Arc::new(Mutex::new(HashMap::new())),
                            watcher_tx: watcher_for_respawn,
                            status_broadcaster: Arc::new(
                                crate::service::status::StatusBroadcaster::new(),
                            ),
                        };
                        if let Err(e) = pool
                            .start_agent(&root_r, &sid_r, Some(&aname_r), None, None)
                            .await
                        {
                            slog_error!(
                                "[agents] Failed to respawn '{aname_r}' for '{sid_r}' \
                                 after CLI crash: {e}"
                            );
                        }
                    });
                }
                return;
            }
            // Reset the abort-respawn counter on any non-aborted exit so that
            // a single successful run clears the consecutive-crash history.
            crate::db::delete_content(crate::db::ContentKey::AbortRespawnCount(&sid));
            if stage == PipelineStage::Mergemaster {
                // Snapshot what we need from the pool entry, then drop the
                // lock before any further work.
                let (tx_done, done_session_id, merge_failure_reported, merge_success_reported) = {
                    let mut lock = match agents_ref.lock() {
                        Ok(a) => a,
                        Err(_) => return,
                    };
                    if let Some(agent) = lock.remove(&key_clone) {
                        (
                            agent.tx,
                            agent.session_id.or(result.session_id),
                            agent.merge_failure_reported,
                            agent.merge_success_reported,
                        )
                    } else {
                        (tx_clone.clone(), result.session_id, false, false)
                    }
                };
                // AC1 (bug 1008): Check for a successful merge exit BEFORE the
                // transient/failure path. The merge runner sets the flag on the
                // agent record and writes a DB fallback key (for the race where
                // remove_agents_for_story ran first) before marking the job
                // "completed". A clean exit after success → no re-spawn, no
                // blocked transition, spawn count reset.
                let merge_succeeded = merge_success_reported
                    || crate::db::read_content(crate::db::ContentKey::MergeSuccess(&sid)).is_some();
                if merge_succeeded {
                    crate::db::delete_content(crate::db::ContentKey::MergeSuccess(&sid));
                    // AC3: reset spawn count so future re-entries start fresh.
                    crate::db::delete_content(crate::db::ContentKey::MergeMasterSpawnCount(&sid));
                    slog!(
                        "[agents] Mergemaster '{aname}' for '{sid}' exited after \
                         successful merge — no re-spawn."
                    );
                    let _ = tx_done.send(AgentEvent::Done {
                        story_id: sid.clone(),
                        agent_name: aname.clone(),
                        session_id: done_session_id,
                    });
                    AgentPool::notify_agent_state_changed(&watcher_tx_clone);
                    // Do NOT send WorkItem/reassign — story is already Done.
                    // Drain one queued ConflictDetected story now that this
                    // mergemaster slot is free (story 1044).
                    if let Some((candidate_id, candidate_agent)) =
                        crate::config::ProjectConfig::load(&project_root_clone)
                            .ok()
                            .and_then(|cfg| {
                                agents_ref.lock().ok().as_ref().and_then(|agts| {
                                    pick_queued_conflict_detected(&cfg, agts, &sid)
                                })
                            })
                    {
                        slog!(
                            "[agents] Mergemaster exit for '{sid}' (success): \
                             queued ConflictDetected story '{candidate_id}' found; \
                             spawning '{candidate_agent}'."
                        );
                        let agents_for_cd = Arc::clone(&agents_ref);
                        let watcher_for_cd = watcher_tx_clone.clone();
                        let root_for_cd = project_root_clone.clone();
                        let port_for_cd = port_for_task;
                        tokio::spawn(async move {
                            let pool = AgentPool {
                                agents: agents_for_cd,
                                port: port_for_cd,
                                child_killers: Arc::new(Mutex::new(HashMap::new())),
                                watcher_tx: watcher_for_cd,
                                status_broadcaster: Arc::new(
                                    crate::service::status::StatusBroadcaster::new(),
                                ),
                            };
                            if let Err(e) = pool
                                .start_agent(
                                    &root_for_cd,
                                    &candidate_id,
                                    Some(&candidate_agent),
                                    None,
                                    None,
                                )
                                .await
                            {
                                slog_error!(
                                    "[agents] Failed to spawn '{candidate_agent}' for queued \
                                     ConflictDetected story '{candidate_id}': {e}"
                                );
                            }
                        });
                    }
                    return;
                }
                // Clear any stale Running merge job so the next mergemaster
                // can call start_merge_agent_work without hitting "Merge
                // already in progress" (bug 498).
                if crate::crdt_state::read_merge_job(&sid)
                    .is_some_and(|job| job.status == "running")
                {
                    crate::crdt_state::delete_merge_job(&sid);
                }
                // Classify termination: genuine (report_merge_failure called, or
                // the transient-respawn budget is exhausted) vs transient
                // (watchdog / rate-limit / crash without an explicit give-up call).
                // Only mark mergemaster_attempted on a genuine give-up so that
                // transient exits can be re-spawned up to the cap (story 920).
                const MERGEMASTER_RESPAWN_CAP: u32 = 3;
                let is_genuine = if merge_failure_reported {
                    slog!(
                        "[agents] Mergemaster '{aname}' for '{sid}' gave up genuinely \
                         (report_merge_failure called)."
                    );
                    true
                } else {
                    let count =
                        crate::db::read_content(crate::db::ContentKey::MergeMasterSpawnCount(&sid))
                            .and_then(|s| s.trim().parse::<u32>().ok())
                            .unwrap_or(0)
                            + 1;
                    crate::db::write_content(
                        crate::db::ContentKey::MergeMasterSpawnCount(&sid),
                        &count.to_string(),
                    );
                    if count >= MERGEMASTER_RESPAWN_CAP {
                        slog!(
                            "[agents] Mergemaster '{aname}' for '{sid}' exhausted \
                             respawn budget ({count}/{MERGEMASTER_RESPAWN_CAP}); \
                             marking as permanently blocked."
                        );
                        true
                    } else {
                        slog!(
                            "[agents] Mergemaster '{aname}' for '{sid}' terminated \
                             transiently (spawn {count}/{MERGEMASTER_RESPAWN_CAP}); \
                             will re-spawn."
                        );
                        false
                    }
                };
                if is_genuine {
                    let _ = crate::pipeline_state::apply_transition_str(
                        &sid,
                        crate::pipeline_state::PipelineEvent::MergemasterAttempted,
                        None,
                    );
                }
                let _ = tx_done.send(AgentEvent::Done {
                    story_id: sid.clone(),
                    agent_name: aname.clone(),
                    session_id: done_session_id,
                });
                AgentPool::notify_agent_state_changed(&watcher_tx_clone);
                // Send a WorkItem event so the auto-assign watcher loop
                // re-dispatches a new mergemaster if the story still needs
                // merging. This avoids an async call to start_agent inside
                // a tokio::spawn (which would require Send).
                let _ = watcher_tx_clone.send(crate::io::watcher::WatcherEvent::WorkItem {
                    stage: "4_merge".to_string(),
                    item_id: sid.clone(),
                    action: "reassign".to_string(),
                    commit_msg: String::new(),
                    from_stage: None,
                });
                // Drain one queued ConflictDetected story now that this
                // mergemaster slot is free (story 1044).
                if let Some((candidate_id, candidate_agent)) =
                    crate::config::ProjectConfig::load(&project_root_clone)
                        .ok()
                        .and_then(|cfg| {
                            agents_ref
                                .lock()
                                .ok()
                                .as_ref()
                                .and_then(|agts| pick_queued_conflict_detected(&cfg, agts, &sid))
                        })
                {
                    slog!(
                        "[agents] Mergemaster exit for '{sid}': queued ConflictDetected \
                         story '{candidate_id}' found; spawning '{candidate_agent}'."
                    );
                    let agents_for_cd = Arc::clone(&agents_ref);
                    let watcher_for_cd = watcher_tx_clone.clone();
                    let root_for_cd = project_root_clone.clone();
                    let port_for_cd = port_for_task;
                    tokio::spawn(async move {
                        let pool = AgentPool {
                            agents: agents_for_cd,
                            port: port_for_cd,
                            child_killers: Arc::new(Mutex::new(HashMap::new())),
                            watcher_tx: watcher_for_cd,
                            status_broadcaster: Arc::new(
                                crate::service::status::StatusBroadcaster::new(),
                            ),
                        };
                        if let Err(e) = pool
                            .start_agent(
                                &root_for_cd,
                                &candidate_id,
                                Some(&candidate_agent),
                                None,
                                None,
                            )
                            .await
                        {
                            slog_error!(
                                "[agents] Failed to spawn '{candidate_agent}' for queued \
                                 ConflictDetected story '{candidate_id}': {e}"
                            );
                        }
                    });
                }
            } else {
                // Server-owned completion: run acceptance gates automatically
                // when the agent process exits normally.
                super::super::pipeline::run_server_owned_completion(
                    &agents_ref,
                    port_for_task,
                    &sid,
                    &aname,
                    result.session_id,
                    watcher_tx_clone.clone(),
                )
                .await;
                AgentPool::notify_agent_state_changed(&watcher_tx_clone);
            }
        }
        Err(e) => {
            // Runtime launch/stream failure: surface the error and mark the
            // pool entry Failed (entry is left in the pool for inspection).
            slog_error!("[agents] Agent process error for {aname} on {sid}: {e}");
            let event = AgentEvent::Error {
                story_id: sid.clone(),
                agent_name: aname.clone(),
                message: e,
            };
            if let Ok(mut log) = log_clone.lock() {
                log.push(event.clone());
            }
            let _ = tx_clone.send(event);
            if let Ok(mut agents) = agents_ref.lock()
                && let Some(agent) = agents.get_mut(&key_clone)
            {
                agent.status = AgentStatus::Failed;
            }
            AgentPool::notify_agent_state_changed(&watcher_tx_clone);
        }
    }
}
2026-05-14 14:48:55 +00:00
/// Find the first story in `Stage::MergeFailure { kind: ConflictDetected }` that
/// has no active mergemaster in the pool (excluding `exclude_story_id`), together
/// with a free mergemaster agent name. Returns `None` if no eligible story exists
/// or if all configured mergemaster agents are currently busy.
///
/// Called from the mergemaster exit handler to drain the queue of waiting
/// ConflictDetected stories one slot at a time (story 1044).
fn pick_queued_conflict_detected(
config: &ProjectConfig,
agents: &HashMap<String, StoryAgent>,
exclude_story_id: &str,
) -> Option<(String, String)> {
use crate::pipeline_state::{MergeFailureKind, Stage};
// Find a free mergemaster agent first; bail early if the pool is saturated.
let agent_name = config
.agent
.iter()
.find(|ac| {
agent_config_stage(ac) == PipelineStage::Mergemaster
&& !agents.values().any(|a| {
a.agent_name == ac.name
&& matches!(a.status, AgentStatus::Running | AgentStatus::Pending)
})
})?
.name
.clone();
// Find the first eligible ConflictDetected story with no active mergemaster.
for item in crate::pipeline_state::read_all_typed() {
if item.story_id.0 == exclude_story_id {
continue;
}
let Stage::MergeFailure {
kind: MergeFailureKind::ConflictDetected(_),
..
} = &item.stage
else {
continue;
};
let has_mergemaster = agents.iter().any(|(key, agt)| {
let key_sid = key.rsplit_once(':').map(|(s, _)| s).unwrap_or(key.as_str());
let agt_stage = config
.find_agent(&agt.agent_name)
.map(agent_config_stage)
.unwrap_or_else(|| pipeline_stage(&agt.agent_name));
key_sid == item.story_id.0
&& agt_stage == PipelineStage::Mergemaster
&& matches!(agt.status, AgentStatus::Running | AgentStatus::Pending)
});
if !has_mergemaster {
return Some((item.story_id.0.clone(), agent_name));
}
}
None
}
#[cfg(test)]
mod tests {
use super::*;
/// AC1 + AC3 (story 881): when retry_count=1 and gate_output is stored in
/// the DB, `maybe_inject_gate_failure` injects a failure section beginning
/// `Your previous run's quality gates failed:` into `--append-system-prompt`.
#[test]
fn gate_failure_injected_into_system_prompt_on_retry() {
crate::crdt_state::init_for_test();
crate::db::ensure_content_store();
let story_id = "9960_story_gate_injection_881";
2026-04-30 22:23:21 +00:00
crate::db::write_item_with_content(
story_id,
"2_current",
"---\nname: Test\n---\n",
crate::db::ItemMeta::named("Test"),
2026-04-30 22:23:21 +00:00
);
crate::crdt_state::set_retry_count(story_id, 1);
let gate_output =
"error[E0308]: mismatched types\n --> src/lib.rs:5:10\n = expected i32, found &str";
2026-05-13 11:22:57 +00:00
crate::db::write_content(crate::db::ContentKey::GateOutput(story_id), gate_output);
let mut args: Vec<String> = vec!["--verbose".to_string()];
maybe_inject_gate_failure(&mut args, story_id);
let pos = args
.iter()
.position(|a| a == "--append-system-prompt")
.expect("--append-system-prompt must be present after retry injection");
let value = &args[pos + 1];
assert!(
value.starts_with("Your previous run's quality gates failed:"),
"--append-system-prompt must begin with the failure marker; got: {value}"
);
assert!(
value.contains("mismatched types"),
"--append-system-prompt must contain a snippet of gate_output; got: {value}"
);
}
/// AC2 (story 881): first-attempt spawn (retry_count == 0) must NOT add the
/// failure section to `--append-system-prompt`.
#[test]
fn gate_failure_not_injected_on_first_attempt() {
    crate::crdt_state::init_for_test();
    crate::db::ensure_content_store();

    let story_id = "9961_story_no_gate_injection_881";
    crate::db::write_item_with_content(
        story_id,
        "2_current",
        "---\nname: Test\n---\n",
        crate::db::ItemMeta::named("Test"),
    );

    // retry_count is 0 (default — never bumped). Stale gate output alone must
    // not trigger injection.
    crate::db::write_content(
        crate::db::ContentKey::GateOutput(story_id),
        "some previous output",
    );

    let mut args = vec!["--verbose".to_string()];
    maybe_inject_gate_failure(&mut args, story_id);

    let injected = args.iter().any(|arg| arg == "--append-system-prompt");
    assert!(
        !injected,
        "no --append-system-prompt should be added when retry_count is 0; args: {args:?}"
    );
}
/// Injection appends to an existing `--append-system-prompt` value rather
/// than adding a duplicate flag.
#[test]
fn gate_failure_appends_to_existing_system_prompt() {
    let mut args = vec![
        "--append-system-prompt".to_string(),
        "base prompt".to_string(),
    ];
    inject_gate_failure_section(&mut args, "test failure output");

    // Only one --append-system-prompt flag.
    let flag_positions: Vec<usize> = args
        .iter()
        .enumerate()
        .filter(|(_, arg)| arg.as_str() == "--append-system-prompt")
        .map(|(idx, _)| idx)
        .collect();
    assert_eq!(
        flag_positions.len(),
        1,
        "must not duplicate --append-system-prompt"
    );

    // The merged value sits right after the (single) flag.
    let value = &args[flag_positions[0] + 1];
    assert!(
        value.contains("base prompt"),
        "original prompt must be preserved"
    );
    assert!(
        value.contains("Your previous run's quality gates failed:"),
        "failure section must be appended"
    );
    assert!(
        value.contains("test failure output"),
        "gate_output must appear in value"
    );
}
/// AC4 (story 1044): three ConflictDetected stories queued with no active mergemaster;
/// `pick_queued_conflict_detected` must return exactly one story per call.
#[test]
fn mergemaster_exit_picks_up_one_queued_conflict_detected() {
    use crate::agents::lifecycle::transition_to_merge_failure;
    use crate::pipeline_state::MergeFailureKind;

    crate::crdt_state::init_for_test();
    crate::db::ensure_content_store();

    let story_ids = ["1044_qcd_alpha", "1044_qcd_beta", "1044_qcd_gamma"];
    // Seed each story in 4_merge then transition to MergeFailure ConflictDetected.
    for sid in story_ids {
        crate::db::write_item_with_content(
            sid,
            "4_merge",
            "---\nname: Test\n---\n",
            crate::db::ItemMeta::named("Test"),
        );
        transition_to_merge_failure(
            sid,
            MergeFailureKind::ConflictDetected(Some(
                "CONFLICT (content): src/lib.rs".to_string(),
            )),
        )
        .expect("transition to ConflictDetected must succeed");
    }

    let config = crate::config::ProjectConfig::parse(
        "[[agent]]\nname = \"mergemaster\"\nstage = \"mergemaster\"\n",
    )
    .unwrap();

    // No active mergemaster in the pool.
    let idle_pool: HashMap<String, StoryAgent> = HashMap::new();
    let result = pick_queued_conflict_detected(&config, &idle_pool, "9999_finished");
    assert!(
        result.is_some(),
        "must find a queued ConflictDetected story when none have a mergemaster"
    );
    let (chosen_id, chosen_agent) = result.unwrap();
    assert!(
        story_ids.contains(&chosen_id.as_str()),
        "chosen story must be one of the three queued stories; got: {chosen_id}"
    );
    assert_eq!(
        chosen_agent, "mergemaster",
        "chosen agent must be the configured mergemaster"
    );

    // AC3: a second call (simulating the next mergemaster exit) must pick a
    // different story. The mergemaster for `chosen_id` has now exited and
    // freed its slot — agents map is empty again — so we exclude `chosen_id`
    // and expect the scan to return one of the remaining two.
    let result2 = pick_queued_conflict_detected(&config, &HashMap::new(), &chosen_id);
    assert!(
        result2.is_some(),
        "second exit must find another queued story"
    );
    let (chosen_id2, _) = result2.unwrap();
    assert_ne!(
        chosen_id2, chosen_id,
        "second pick must be a different story from the first"
    );
    assert!(
        story_ids.contains(&chosen_id2.as_str()),
        "second chosen story must be one of the three; got: {chosen_id2}"
    );
}
/// AC3 (bug 882): simulates the abort-respawn counter mechanism to verify that
/// retry_count is never bumped during consecutive aborted+no-session exits and
/// that the abort counter reaches the cap (5) before blocking.
#[test]
fn abort_respawn_leaves_retry_count_unchanged_and_caps_at_five() {
    crate::crdt_state::init_for_test();
    crate::db::ensure_content_store();

    let story_id = "9962_story_abort_respawn_882";
    crate::db::write_item_with_content(
        story_id,
        "2_current",
        "---\nname: Test\n---\n",
        crate::db::ItemMeta::named("Test"),
    );

    const CAP: u32 = 5;

    // Mirrors run_agent_spawn's read of the persisted abort counter: missing
    // or unparseable content is treated as 0.
    let read_counter = || {
        crate::db::read_content(crate::db::ContentKey::AbortRespawnCount(story_id))
            .and_then(|s| s.trim().parse::<u32>().ok())
            .unwrap_or(0)
    };

    // Simulate CAP consecutive abort-before-session exits.
    for expected_count in 1u32..=CAP {
        // This is exactly the counter logic in run_agent_spawn's abort path.
        let count = read_counter() + 1;
        crate::db::write_content(
            crate::db::ContentKey::AbortRespawnCount(story_id),
            &count.to_string(),
        );
        assert_eq!(
            count, expected_count,
            "abort counter must increment by 1 each time"
        );

        // retry_count must remain 0 — the abort path never calls bump_retry_count.
        let retry_count = crate::crdt_state::read_item(story_id)
            .map(|item| item.retry_count())
            .unwrap_or(0);
        assert_eq!(
            retry_count, 0,
            "retry_count must not be incremented by the abort-respawn path \
             (got {retry_count} on cycle {expected_count})"
        );
    }

    // After CAP cycles the counter equals the cap — the story would be blocked.
    let final_count = read_counter();
    assert_eq!(
        final_count, CAP,
        "counter must equal {CAP} after {CAP} abort cycles"
    );
    assert!(
        final_count >= CAP,
        "a count of {final_count} triggers blocking (>= {CAP})"
    );
}
}