From 4216ced4935cc77d91773655e92ce5ee3ec36f8e Mon Sep 17 00:00:00 2001 From: dave Date: Fri, 15 May 2026 20:19:35 +0000 Subject: [PATCH] =?UTF-8?q?huskies:=20merge=201100=20bug=20Multiple=20LLM?= =?UTF-8?q?=20agents=20can=20run=20concurrently=20on=20the=20same=20story?= =?UTF-8?q?=20(coder=20+=20mergemaster=20+=20others)=20=E2=80=94=20enforce?= =?UTF-8?q?=20one-agent-per-story=20invariant?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/src/agents/mod.rs | 36 +++ server/src/agents/pool/start/mod.rs | 36 +++ .../agents/pool/start/tests_concurrency.rs | 260 ++++++++++++++++++ server/src/agents/pool/start/validation.rs | 23 +- server/src/agents/pool/stop.rs | 81 +++++- server/src/agents/pool/worktree.rs | 2 + server/src/startup/tick_loop.rs | 4 + 7 files changed, 429 insertions(+), 13 deletions(-) diff --git a/server/src/agents/mod.rs b/server/src/agents/mod.rs index e3326277..425abf92 100644 --- a/server/src/agents/mod.rs +++ b/server/src/agents/mod.rs @@ -161,6 +161,42 @@ pub fn pipeline_stage(agent_name: &str) -> PipelineStage { } } +/// Map a pipeline [`Stage`] to the canonical [`PipelineStage`] for LLM agent spawning. +/// +/// Returns `None` for stages where no LLM agent should be active (terminal states, +/// blocked, frozen, or unclassified merge failures requiring human intervention). +/// Returns `Some(stage)` naming the single LLM-agent type that may run on this story. +/// Used by `validate_agent_stage` and `reconcile_canonical_agents` to enforce the +/// one-agent-per-story invariant (story 1100). +pub fn canonical_pipeline_stage(s: &crate::pipeline_state::Stage) -> Option { + use crate::pipeline_state::{MergeFailureKind, Stage}; + match s { + Stage::Coding { .. } => Some(PipelineStage::Coder), + Stage::Qa => Some(PipelineStage::Qa), + Stage::Merge { .. } => Some(PipelineStage::Mergemaster), + Stage::MergeFailure { + kind: MergeFailureKind::ConflictDetected(_), + .. + } => Some(PipelineStage::Mergemaster), + Stage::MergeFailure { + kind: MergeFailureKind::GatesFailed(_), + .. + } => Some(PipelineStage::Coder), + Stage::MergeFailureFinal { .. } => Some(PipelineStage::Mergemaster), + Stage::Upcoming + | Stage::Backlog + | Stage::MergeFailure { .. } + | Stage::Done { .. } + | Stage::Blocked { .. } + | Stage::Archived { .. } + | Stage::Frozen { .. } + | Stage::ReviewHold { .. } + | Stage::Abandoned { .. } + | Stage::Superseded { .. } + | Stage::Rejected { .. } => None, + } +} + /// Determine the pipeline stage for a configured agent. /// /// Prefers the explicit `stage` config field (added in Bug 150) over the diff --git a/server/src/agents/pool/start/mod.rs b/server/src/agents/pool/start/mod.rs index 6a8b5a0b..820a9ab8 100644 --- a/server/src/agents/pool/start/mod.rs +++ b/server/src/agents/pool/start/mod.rs @@ -271,6 +271,42 @@ impl AgentPool { '{conflicting_name}' is already active at the same pipeline stage" )); } + // Cross-stage LLM agent guard: reject if any Coder/Qa/Mergemaster agent + // is already Running or Pending on this story at a *different* pipeline stage. + // These are stale agents left over from a previous stage transition that has + // since advanced. The periodic reconciler (reconcile_canonical_agents) stops + // them; here we surface the conflict so the caller waits for reconciliation. + if matches!( + resolved_stage, + PipelineStage::Coder | PipelineStage::Qa | PipelineStage::Mergemaster + ) && let Some(stale_name) = agents.iter().find_map(|(k, a)| { + let k_story = k.rsplit_once(':').map(|(s, _)| s).unwrap_or(k); + if k_story != story_id || a.agent_name == resolved_name { + return None; + } + if !matches!(a.status, AgentStatus::Running | AgentStatus::Pending) { + return None; + } + let a_stage = config + .find_agent(&a.agent_name) + .map(agent_config_stage) + .unwrap_or_else(|| pipeline_stage(&a.agent_name)); + if matches!( + a_stage, + PipelineStage::Coder | PipelineStage::Qa | PipelineStage::Mergemaster + ) && a_stage != resolved_stage + { + Some(a.agent_name.clone()) + } else { + None + } + }) { + return Err(format!( + "story '{story_id}' already has an active LLM agent '{stale_name}'; \ + refusing to spawn '{resolved_name}'" + )); + } + // Enforce single-instance concurrency for explicitly-named agents: // if this agent is already running on any other story, reject. // Auto-selected agents are already guaranteed idle by diff --git a/server/src/agents/pool/start/tests_concurrency.rs b/server/src/agents/pool/start/tests_concurrency.rs index 4f65a019..d34e49c1 100644 --- a/server/src/agents/pool/start/tests_concurrency.rs +++ b/server/src/agents/pool/start/tests_concurrency.rs @@ -602,6 +602,266 @@ async fn start_agent_allows_correct_stage_agent() { } } +// ── story-1100: cross-stage LLM agent rejection ───────────────────────── + +#[tokio::test] +async fn start_agent_rejects_mergemaster_when_coder_running_same_story() { + use std::fs; + + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + let sk_dir = root.join(".huskies"); + fs::create_dir_all(&sk_dir).unwrap(); + fs::write( + sk_dir.join("project.toml"), + "[[agent]]\nname = \"coder-1\"\nstage = \"coder\"\n\n\ + [[agent]]\nname = \"mergemaster\"\nstage = \"mergemaster\"\n", + ) + .unwrap(); + + let pool = AgentPool::new_test(3099); + pool.inject_test_agent("999_story_cross", "coder-1", AgentStatus::Running); + + let result = pool + .start_agent(root, "999_story_cross", Some("mergemaster"), None, None) + .await; + + assert!( + result.is_err(), + "mergemaster must be rejected when coder-1 is still running on same story" + ); + let err = result.unwrap_err(); + assert!( + err.contains("active LLM agent") || err.contains("stale agent"), + "error must mention active LLM agent conflict, got: '{err}'" + ); +} + +#[tokio::test] +async fn start_agent_rejects_coder_when_mergemaster_running_same_story() { + use std::fs; + + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + let sk_dir = root.join(".huskies"); + fs::create_dir_all(&sk_dir).unwrap(); + fs::write( + sk_dir.join("project.toml"), + "[[agent]]\nname = \"coder-1\"\nstage = \"coder\"\n\n\ + [[agent]]\nname = \"mergemaster\"\nstage = \"mergemaster\"\n", + ) + .unwrap(); + + let pool = AgentPool::new_test(3099); + pool.inject_test_agent("888_story_cross2", "mergemaster", AgentStatus::Running); + + let result = pool + .start_agent(root, "888_story_cross2", Some("coder-1"), None, None) + .await; + + assert!( + result.is_err(), + "coder-1 must be rejected when mergemaster is running on same story" + ); + let err = result.unwrap_err(); + assert!( + err.contains("active LLM agent") || err.contains("stale agent"), + "error must mention active LLM agent conflict, got: '{err}'" + ); +} + +#[tokio::test] +async fn start_agent_cross_stage_does_not_block_different_stories() { + use std::fs; + + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + let sk_dir = root.join(".huskies"); + fs::create_dir_all(sk_dir.join("work/1_backlog")).unwrap(); + fs::write( + root.join(".huskies/project.toml"), + "[[agent]]\nname = \"coder-1\"\nstage = \"coder\"\n\n\ + [[agent]]\nname = \"mergemaster\"\nstage = \"mergemaster\"\n", + ) + .unwrap(); + fs::write( + root.join(".huskies/work/1_backlog/777_story_other.md"), + "---\nname: Other\n---\n", + ) + .unwrap(); + + let pool = AgentPool::new_test(3099); + // mergemaster running on story-x should NOT block coder on story-y + pool.inject_test_agent("111_story_x", "mergemaster", AgentStatus::Running); + + let result = pool + .start_agent(root, "777_story_other", Some("coder-1"), None, None) + .await; + + if let Err(ref e) = result { + assert!( + !e.contains("active LLM agent") && !e.contains("stale agent"), + "cross-stage guard must not fire for agents on different stories, got: '{e}'" + ); + } +} + +#[tokio::test] +async fn reconcile_canonical_agents_stops_stale_coder_in_qa_stage() { + use std::fs; + + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + let sk_dir = root.join(".huskies"); + fs::create_dir_all(&sk_dir).unwrap(); + fs::write( + sk_dir.join("project.toml"), + "[[agent]]\nname = \"coder-1\"\nstage = \"coder\"\n", + ) + .unwrap(); + + // Write story to CRDT in QA stage: canonical = Qa, but coder-1 is Running. + crate::db::ensure_content_store(); + crate::db::write_item_with_content( + "777_story_reconcile", + "qa", + "---\nname: Reconcile Test\n---\n", + crate::db::ItemMeta::named("Reconcile Test"), + ); + + let pool = AgentPool::new_test(3099); + pool.inject_test_agent("777_story_reconcile", "coder-1", AgentStatus::Running); + + let before = pool.list_agents().unwrap(); + assert!( + before.iter().any(|a| a.agent_name == "coder-1" + && matches!(a.status, AgentStatus::Running | AgentStatus::Pending)), + "coder-1 should be Running before reconciliation" + ); + + pool.reconcile_canonical_agents(root).await; + + let after = pool.list_agents().unwrap(); + let still_active = after.iter().any(|a| { + a.story_id == "777_story_reconcile" + && a.agent_name == "coder-1" + && matches!(a.status, AgentStatus::Running | AgentStatus::Pending) + }); + assert!( + !still_active, + "reconciler must have stopped coder-1 (CRDT stage is QA, coder is wrong stage)" + ); +} + +#[tokio::test] +async fn reconcile_canonical_agents_leaves_correct_stage_agent_alone() { + use std::fs; + + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + let sk_dir = root.join(".huskies"); + fs::create_dir_all(&sk_dir).unwrap(); + fs::write( + sk_dir.join("project.toml"), + "[[agent]]\nname = \"coder-1\"\nstage = \"coder\"\n", + ) + .unwrap(); + + // Story is in coding stage: canonical = Coder. coder-1 is correct. + crate::db::ensure_content_store(); + crate::db::write_item_with_content( + "555_story_correct", + "coding", + "---\nname: Correct Stage\n---\n", + crate::db::ItemMeta::named("Correct Stage"), + ); + + let pool = AgentPool::new_test(3099); + pool.inject_test_agent("555_story_correct", "coder-1", AgentStatus::Running); + + pool.reconcile_canonical_agents(root).await; + + let after = pool.list_agents().unwrap(); + let still_active = after.iter().any(|a| { + a.story_id == "555_story_correct" + && a.agent_name == "coder-1" + && matches!(a.status, AgentStatus::Running | AgentStatus::Pending) + }); + assert!( + still_active, + "reconciler must NOT stop coder-1 when it matches the canonical stage" + ); +} + +/// Regression test for story 1100: a stale coder left running after a stage +/// transition blocks both a same-stage coder and a cross-stage mergemaster. +/// The periodic reconciler stops the stale coder, after which the pool no +/// longer has a cross-stage conflict. +#[tokio::test] +async fn regression_1100_stale_coder_blocks_mergemaster_then_reconciler_clears() { + use std::fs; + + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + let sk_dir = root.join(".huskies"); + fs::create_dir_all(&sk_dir).unwrap(); + fs::write( + sk_dir.join("project.toml"), + "[[agent]]\nname = \"coder-1\"\nstage = \"coder\"\n\n\ + [[agent]]\nname = \"coder-2\"\nstage = \"coder\"\n\n\ + [[agent]]\nname = \"mergemaster\"\nstage = \"mergemaster\"\n", + ) + .unwrap(); + + let pool = AgentPool::new_test(3099); + // Simulate coder-1 still Running after the story advanced past the coding stage. + pool.inject_test_agent("1100_reg", "coder-1", AgentStatus::Running); + + // coder-2 blocked by same-stage check (both are Coder stage) + let r1 = pool + .start_agent(root, "1100_reg", Some("coder-2"), None, None) + .await; + assert!(r1.is_err(), "coder-2 must be rejected by same-stage guard"); + assert!( + r1.unwrap_err().contains("same pipeline stage"), + "same-stage check must fire for coder-2" + ); + + // mergemaster blocked by cross-stage LLM guard (coder-1 is a different LLM stage) + let r2 = pool + .start_agent(root, "1100_reg", Some("mergemaster"), None, None) + .await; + assert!( + r2.is_err(), + "mergemaster must be rejected because coder-1 (different LLM stage) is still running" + ); + let r2_err = r2.unwrap_err(); + assert!( + r2_err.contains("active LLM agent") || r2_err.contains("stale agent"), + "cross-stage rejection expected, got: '{r2_err}'" + ); + + // Reconciler: story "1100_reg" has no CRDT entry → canonical = None → stop coder-1. + pool.reconcile_canonical_agents(root).await; + + // coder-1 must be gone from the active pool. + let remaining = pool.list_agents().unwrap(); + assert!( + !remaining.iter().any(|a| { + a.story_id == "1100_reg" + && a.agent_name == "coder-1" + && matches!(a.status, AgentStatus::Running | AgentStatus::Pending) + }), + "reconciler must have removed stale coder-1 from the active pool" + ); +} + /// Bug 502: when start_agent is called for a non-Coder agent (mergemaster /// or qa) on a story that's in 4_merge/, the unconditional /// move_story_to_current at the top of start_agent must NOT fire — even diff --git a/server/src/agents/pool/start/validation.rs b/server/src/agents/pool/start/validation.rs index a2748a6d..7d824379 100644 --- a/server/src/agents/pool/start/validation.rs +++ b/server/src/agents/pool/start/validation.rs @@ -2,11 +2,11 @@ use std::path::Path; -use crate::config::ProjectConfig; -use crate::pipeline_state::Stage; - -use super::super::super::{PipelineStage, agent_config_stage, pipeline_stage}; +use super::super::super::{ + PipelineStage, agent_config_stage, canonical_pipeline_stage, pipeline_stage, +}; use super::super::worktree::find_active_story_stage; +use crate::config::ProjectConfig; /// Validate that an explicit `agent_name` is allowed to attach to `story_id`'s /// current pipeline stage. @@ -34,16 +34,15 @@ pub(super) fn validate_agent_stage( let Some(story_stage) = find_active_story_stage(project_root, story_id) else { return Ok(()); }; - let expected_stage = match story_stage { - Stage::Coding { .. } => PipelineStage::Coder, - Stage::Qa => PipelineStage::Qa, - Stage::Merge { .. } => PipelineStage::Mergemaster, - _ => PipelineStage::Other, - }; - if expected_stage != PipelineStage::Other && expected_stage != agent_stage { + let canonical = canonical_pipeline_stage(&story_stage); + let is_llm = matches!( + agent_stage, + PipelineStage::Coder | PipelineStage::Qa | PipelineStage::Mergemaster + ); + if is_llm && (canonical.is_none() || canonical.as_ref() != Some(&agent_stage)) { return Err(format!( "Agent '{name}' (stage: {agent_stage:?}) cannot be assigned to \ - story '{story_id}' in {}/ (requires stage: {expected_stage:?})", + story '{story_id}' in {}/ (requires stage: {canonical:?})", story_stage.dir_name() )); } diff --git a/server/src/agents/pool/stop.rs b/server/src/agents/pool/stop.rs index f6aff2e6..64446f32 100644 --- a/server/src/agents/pool/stop.rs +++ b/server/src/agents/pool/stop.rs @@ -5,7 +5,10 @@ use crate::slog_error; use crate::slog_warn; use std::path::Path; -use super::super::{AgentEvent, AgentStatus}; +use super::super::{ + AgentEvent, AgentStatus, PipelineStage, agent_config_stage, canonical_pipeline_stage, + pipeline_stage, +}; use super::AgentPool; use super::types::composite_key; @@ -114,6 +117,82 @@ impl AgentPool { Ok(()) } + /// Stop LLM agents whose pipeline stage no longer matches the story's canonical stage. + /// + /// Called periodically by the tick loop (story 1100). For each Running or Pending + /// LLM agent (Coder, Qa, or Mergemaster) whose stage does not match the canonical + /// stage derived from the story's current CRDT state, the agent is stopped via the + /// existing SIGKILL path. Idempotent: agents already at the correct stage are left + /// untouched. Also stops LLM agents on stories that have no active pipeline stage + /// (terminal, blocked, or frozen), since no LLM agent should run there. + pub async fn reconcile_canonical_agents(&self, root: &std::path::Path) { + use crate::config::ProjectConfig; + + let config = match ProjectConfig::load(root) { + Ok(c) => c, + Err(e) => { + slog_warn!("[reconcile] Cannot load config for canonical reconcile: {e}"); + return; + } + }; + + // Snapshot active LLM agents without holding the lock during async stops. + let snapshot: Vec<(String, String, PipelineStage)> = { + let Ok(agents) = self.agents.lock() else { + return; + }; + agents + .iter() + .filter_map(|(key, a)| { + if !matches!(a.status, AgentStatus::Running | AgentStatus::Pending) { + return None; + } + let stage = config + .find_agent(&a.agent_name) + .map(agent_config_stage) + .unwrap_or_else(|| pipeline_stage(&a.agent_name)); + if !matches!( + stage, + PipelineStage::Coder | PipelineStage::Qa | PipelineStage::Mergemaster + ) { + return None; + } + let story_id = key + .rsplit_once(':') + .map(|(s, _)| s) + .unwrap_or(key) + .to_string(); + Some((story_id, a.agent_name.clone(), stage)) + }) + .collect() + }; + + for (story_id, agent_name, agent_stage) in snapshot { + let canonical = crate::pipeline_state::read_typed(&story_id) + .ok() + .flatten() + .and_then(|item| canonical_pipeline_stage(&item.stage)); + + let should_stop = match &canonical { + None => true, + Some(c) if *c != agent_stage => true, + _ => false, + }; + + if !should_stop { + continue; + } + + slog!( + "[reconcile] stopping '{agent_name}' on '{story_id}': \ + canonical={canonical:?} actual={agent_stage:?}" + ); + if let Err(e) = self.stop_agent(root, &story_id, &agent_name).await { + slog_warn!("[reconcile] failed to stop '{agent_name}' on '{story_id}': {e}"); + } + } + } + /// Remove all agent entries for a given story_id from the pool. /// /// Called when a story is archived so that stale entries don't accumulate. diff --git a/server/src/agents/pool/worktree.rs b/server/src/agents/pool/worktree.rs index cab9bb03..ca037edf 100644 --- a/server/src/agents/pool/worktree.rs +++ b/server/src/agents/pool/worktree.rs @@ -33,6 +33,8 @@ pub(super) fn find_active_story_stage( crate::pipeline_state::Stage::Coding { .. } | crate::pipeline_state::Stage::Qa | crate::pipeline_state::Stage::Merge { .. } + | crate::pipeline_state::Stage::MergeFailure { .. } + | crate::pipeline_state::Stage::MergeFailureFinal { .. } ) { return Some(item.stage); diff --git a/server/src/startup/tick_loop.rs b/server/src/startup/tick_loop.rs index 3fcd48f5..4621ed84 100644 --- a/server/src/startup/tick_loop.rs +++ b/server/src/startup/tick_loop.rs @@ -209,6 +209,10 @@ pub(crate) fn spawn_tick_loop( { crate::slog!("[reconcile] Running periodic reconcile pass."); run_reconcile_pass(r, &agents, done_retention).await; + // Stop LLM agents whose pipeline stage no longer matches the + // story's current canonical stage. Cleans up stale agents left + // behind after a stage transition (story 1100). + agents.reconcile_canonical_agents(r).await; } } });