huskies: merge 1100 bug Multiple LLM agents can run concurrently on the same story (coder + mergemaster + others) — enforce one-agent-per-story invariant

This commit is contained in:
dave
2026-05-15 20:19:35 +00:00
parent 9f4f493486
commit 4216ced493
7 changed files with 429 additions and 13 deletions
+80 -1
View File
@@ -5,7 +5,10 @@ use crate::slog_error;
use crate::slog_warn;
use std::path::Path;
use super::super::{AgentEvent, AgentStatus};
use super::super::{
AgentEvent, AgentStatus, PipelineStage, agent_config_stage, canonical_pipeline_stage,
pipeline_stage,
};
use super::AgentPool;
use super::types::composite_key;
@@ -114,6 +117,82 @@ impl AgentPool {
Ok(())
}
/// Stop LLM agents whose pipeline stage no longer matches the story's canonical stage.
///
/// Called periodically by the tick loop (story 1100). Every `Running` or `Pending`
/// agent whose stage resolves to `Coder`, `Qa`, or `Mergemaster` is compared against
/// the canonical stage derived from the story's current pipeline state:
///
/// * stage matches the canonical stage: the agent is left untouched, so repeated
///   calls do not disturb correctly placed agents;
/// * stage differs, or the story has no canonical stage (no pipeline item, or
///   `canonical_pipeline_stage` yields `None`, e.g. terminal, blocked, or frozen):
///   the agent is stopped through `stop_agent`;
/// * the pipeline state cannot be read: the agent is skipped for this tick and a
///   warning is logged. An unreadable state is *unknown*, not "no active stage",
///   so it must not trigger a kill; the next tick re-evaluates it.
///
/// `root` is the project root used both to load the project config and to stop
/// agents. The function never fails: config-load errors, a poisoned agent lock, and
/// individual stop failures are logged and reconciliation is abandoned or continued
/// as appropriate.
pub async fn reconcile_canonical_agents(&self, root: &std::path::Path) {
    use crate::config::ProjectConfig;

    let config = match ProjectConfig::load(root) {
        Ok(c) => c,
        Err(e) => {
            slog_warn!("[reconcile] Cannot load config for canonical reconcile: {e}");
            return;
        }
    };

    // Snapshot active LLM agents inside a nested scope so the mutex guard is
    // dropped before any `.await` on `stop_agent` below.
    let snapshot: Vec<(String, String, PipelineStage)> = {
        let agents = match self.agents.lock() {
            Ok(guard) => guard,
            Err(e) => {
                // A poisoned lock would otherwise disable reconciliation silently.
                slog_warn!("[reconcile] agent pool lock unavailable, skipping reconcile: {e}");
                return;
            }
        };
        agents
            .iter()
            .filter_map(|(key, a)| {
                if !matches!(a.status, AgentStatus::Running | AgentStatus::Pending) {
                    return None;
                }
                // Prefer the stage declared in the project config; fall back to
                // deriving it from the agent name when the agent is not configured.
                let stage = config
                    .find_agent(&a.agent_name)
                    .map(agent_config_stage)
                    .unwrap_or_else(|| pipeline_stage(&a.agent_name));
                if !matches!(
                    stage,
                    PipelineStage::Coder | PipelineStage::Qa | PipelineStage::Mergemaster
                ) {
                    return None;
                }
                // The story id is everything before the last ':' of the pool key;
                // a key without ':' is used as the story id verbatim.
                let story_id = key
                    .rsplit_once(':')
                    .map(|(s, _)| s)
                    .unwrap_or(key)
                    .to_string();
                Some((story_id, a.agent_name.clone(), stage))
            })
            .collect()
    };

    for (story_id, agent_name, agent_stage) in snapshot {
        let item = match crate::pipeline_state::read_typed(&story_id) {
            Ok(item) => item,
            Err(_) => {
                // NOTE(review): the error value is not formatted because its
                // Display/Debug impls are not visible from this method; include it
                // in the message if the error type supports it.
                slog_warn!(
                    "[reconcile] cannot read pipeline state for '{story_id}'; \
                     leaving '{agent_name}' untouched this tick"
                );
                continue;
            }
        };
        let canonical = item.and_then(|item| canonical_pipeline_stage(&item.stage));

        // Keep the agent only when the story has a canonical stage equal to the
        // agent's own stage; a missing canonical stage always means "stop".
        if canonical.as_ref() == Some(&agent_stage) {
            continue;
        }

        slog!(
            "[reconcile] stopping '{agent_name}' on '{story_id}': \
             canonical={canonical:?} actual={agent_stage:?}"
        );
        if let Err(e) = self.stop_agent(root, &story_id, &agent_name).await {
            slog_warn!("[reconcile] failed to stop '{agent_name}' on '{story_id}': {e}");
        }
    }
}
/// Remove all agent entries for a given story_id from the pool.
///
/// Called when a story is archived so that stale entries don't accumulate.