//! Auto-assign logic: scanning pipeline stages for unassigned stories and
//! dispatching free agents, startup reconciliation, and the watchdog task.
use crate::config::ProjectConfig;
use crate::slog;
use crate::slog_error;
use crate::slog_warn;
use crate::worktree;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use tokio::sync::broadcast;
use super::super::{
AgentEvent, AgentStatus, PipelineStage, ReconciliationEvent, agent_config_stage, pipeline_stage,
};
use super::{AgentPool, StoryAgent, find_active_story_stage};
impl AgentPool {
/// Scan the active pipeline stages for unassigned stories and start a free,
/// stage-appropriate agent for each one.
///
/// Stages are visited in order: `2_current/` (coder) → `3_qa/` (qa) →
/// `4_merge/` (mergemaster). A story is skipped when it is on review hold,
/// blocked, already has an active agent for its stage, or (in `4_merge/`)
/// carries a recorded merge failure. Per-story `agent:` front-matter
/// preferences and the `max_coders` pool limit are honored.
pub async fn auto_assign_available_work(&self, project_root: &Path) {
    let config = match ProjectConfig::load(project_root) {
        Ok(c) => c,
        Err(e) => {
            slog_warn!("[auto-assign] Failed to load project config: {e}");
            return;
        }
    };
    // Process each active pipeline stage in order.
    let stages: [(&str, PipelineStage); 3] = [
        ("2_current", PipelineStage::Coder),
        ("3_qa", PipelineStage::Qa),
        ("4_merge", PipelineStage::Mergemaster),
    ];
    for (stage_dir, stage) in &stages {
        let items = scan_stage_items(project_root, stage_dir);
        if items.is_empty() {
            continue;
        }
        for story_id in &items {
            // Items marked with review_hold (e.g. spikes after QA passes) stay
            // in their current stage for human review — don't auto-assign agents.
            if has_review_hold(project_root, stage_dir, story_id) {
                continue;
            }
            // Skip blocked stories (retry limit exceeded).
            if is_story_blocked(project_root, stage_dir, story_id) {
                continue;
            }
            // Skip stories in 4_merge/ that already have a reported merge failure.
            // These need human intervention — auto-assigning a new mergemaster
            // would just waste tokens on the same broken merge.
            if *stage == PipelineStage::Mergemaster
                && has_merge_failure(project_root, stage_dir, story_id)
            {
                continue;
            }
            // AC6: Detect empty-diff stories in 4_merge/ before starting a
            // mergemaster. If the worktree has no commits on the feature branch,
            // write a merge_failure and block the story immediately.
            if *stage == PipelineStage::Mergemaster
                && let Some(wt_path) = worktree::find_worktree_path(project_root, story_id)
                && !super::super::gates::worktree_has_committed_work(&wt_path)
            {
                slog_warn!(
                    "[auto-assign] Story '{story_id}' in 4_merge/ has no commits \
                     on feature branch. Writing merge_failure and blocking."
                );
                let story_path = project_root
                    .join(".story_kit/work")
                    .join(stage_dir)
                    .join(format!("{story_id}.md"));
                // Best-effort metadata writes: failure to record must not abort the pass.
                let _ = crate::io::story_metadata::write_merge_failure(
                    &story_path,
                    "Feature branch has no code changes — the coder agent \
                     did not produce any commits.",
                );
                let _ = crate::io::story_metadata::write_blocked(&story_path);
                continue;
            }
            // Re-acquire the lock on each iteration to see state changes
            // from previous start_agent calls in the same pass.
            let preferred_agent =
                read_story_front_matter_agent(project_root, stage_dir, story_id);
            // Check max_coders limit for the Coder stage before agent selection.
            // If the pool is full, all remaining items in this stage wait.
            if *stage == PipelineStage::Coder
                && let Some(max) = config.max_coders
            {
                let agents_lock = match self.agents.lock() {
                    Ok(a) => a,
                    Err(e) => {
                        slog_error!("[auto-assign] Failed to lock agents: {e}");
                        break;
                    }
                };
                let active = count_active_agents_for_stage(&config, &agents_lock, stage);
                if active >= max {
                    slog!(
                        "[auto-assign] Coder pool full ({active}/{max}); remaining items in {stage_dir}/ will wait."
                    );
                    break;
                }
            }
            // Outcome: (already_assigned, chosen_agent, preferred_busy, stage_mismatch)
            // preferred_busy=true means the story has a specific agent requested but it is
            // currently occupied — the story should wait rather than fall back.
            // stage_mismatch=true means the preferred agent's stage doesn't match the
            // pipeline stage, so we fell back to a generic stage agent.
            // (Lock scope is this block only — it is released before the await below.)
            let (already_assigned, free_agent, preferred_busy, stage_mismatch) = {
                let agents = match self.agents.lock() {
                    Ok(a) => a,
                    Err(e) => {
                        slog_error!("[auto-assign] Failed to lock agents: {e}");
                        break;
                    }
                };
                let assigned = is_story_assigned_for_stage(&config, &agents, story_id, stage);
                if assigned {
                    (true, None, false, false)
                } else if let Some(ref pref) = preferred_agent {
                    // Story has a front-matter agent preference.
                    // Verify the preferred agent's stage matches the current
                    // pipeline stage — a coder shouldn't be assigned to QA.
                    let pref_stage_matches = config
                        .find_agent(pref)
                        .map(|cfg| agent_config_stage(cfg) == *stage)
                        .unwrap_or(false);
                    if !pref_stage_matches {
                        // Stage mismatch — fall back to any free agent for this stage.
                        let free = find_free_agent_for_stage(&config, &agents, stage)
                            .map(|s| s.to_string());
                        (false, free, false, true)
                    } else if is_agent_free(&agents, pref) {
                        (false, Some(pref.clone()), false, false)
                    } else {
                        (false, None, true, false)
                    }
                } else {
                    let free = find_free_agent_for_stage(&config, &agents, stage)
                        .map(|s| s.to_string());
                    (false, free, false, false)
                }
            };
            if already_assigned {
                // Story already has an active agent — skip silently.
                continue;
            }
            if preferred_busy {
                // The story requests a specific agent that is currently busy.
                // Do not fall back to a different agent; let this story wait.
                slog!(
                    "[auto-assign] Preferred agent '{}' busy for '{story_id}'; story will wait.",
                    preferred_agent.as_deref().unwrap_or("?")
                );
                continue;
            }
            if stage_mismatch {
                slog!(
                    "[auto-assign] Preferred agent '{}' stage mismatch for '{story_id}' in {stage_dir}/; falling back to stage-appropriate agent.",
                    preferred_agent.as_deref().unwrap_or("?")
                );
            }
            match free_agent {
                Some(agent_name) => {
                    slog!(
                        "[auto-assign] Assigning '{agent_name}' to '{story_id}' in {stage_dir}/"
                    );
                    if let Err(e) = self
                        .start_agent(project_root, story_id, Some(&agent_name), None)
                        .await
                    {
                        slog!(
                            "[auto-assign] Failed to start '{agent_name}' for '{story_id}': {e}"
                        );
                    }
                }
                None => {
                    // No free agents of this type — stop scanning this stage.
                    slog!(
                        "[auto-assign] All {:?} agents busy; remaining items in {stage_dir}/ will wait.",
                        stage
                    );
                    break;
                }
            }
        }
    }
}
/// Reconcile stories whose agent work was committed while the server was offline.
///
/// On server startup the in-memory agent pool is empty, so any story that an agent
/// completed during a previous session is stuck: the worktree has committed work but
/// the pipeline never advanced. This method detects those stories, re-runs the
/// acceptance gates, and advances the pipeline stage so that `auto_assign_available_work`
/// (called immediately after) picks up the right next-stage agents.
///
/// Algorithm:
/// 1. List all worktree directories under `{project_root}/.story_kit/worktrees/`.
/// 2. For each worktree, check whether its feature branch has commits ahead of the
///    base branch (`master` / `main`).
/// 3. If committed work is found AND the story is in `2_current/` or `3_qa/`:
///    - Run acceptance gates (uncommitted-change check + clippy + tests).
///    - On pass + `2_current/`: move the story to `3_qa/`.
///    - On pass + `3_qa/`: run the coverage gate; if that also passes move to `4_merge/`.
///    - On failure: leave the story where it is so `auto_assign_available_work` can
///      start a fresh agent to retry.
/// 4. Stories in `4_merge/` are left for `auto_assign_available_work` to handle via a
///    fresh mergemaster (squash-merge must be re-executed by the mergemaster agent).
///
/// Progress is reported on `progress_tx`; a terminal event with status `"done"`
/// is always sent, even on early failure, so subscribers never wait forever.
pub async fn reconcile_on_startup(
    &self,
    project_root: &Path,
    progress_tx: &broadcast::Sender<ReconciliationEvent>,
) {
    let worktrees = match worktree::list_worktrees(project_root) {
        Ok(wt) => wt,
        Err(e) => {
            eprintln!("[startup:reconcile] Failed to list worktrees: {e}");
            let _ = progress_tx.send(ReconciliationEvent {
                story_id: String::new(),
                status: "done".to_string(),
                message: format!("Reconciliation failed: {e}"),
            });
            return;
        }
    };
    for wt_entry in &worktrees {
        let story_id = &wt_entry.story_id;
        let wt_path = wt_entry.path.clone();
        // Determine which active stage the story is in.
        let stage_dir = match find_active_story_stage(project_root, story_id) {
            Some(s) => s,
            None => continue, // Not in any active stage (backlog/archived or unknown).
        };
        // 4_merge/ is left for auto_assign to handle with a fresh mergemaster.
        if stage_dir == "4_merge" {
            continue;
        }
        let _ = progress_tx.send(ReconciliationEvent {
            story_id: story_id.clone(),
            status: "checking".to_string(),
            message: format!("Checking for committed work in {stage_dir}/"),
        });
        // Check whether the worktree has commits ahead of the base branch.
        // spawn_blocking: keep git subprocess work off the async runtime.
        let wt_path_for_check = wt_path.clone();
        let has_work = tokio::task::spawn_blocking(move || {
            super::super::gates::worktree_has_committed_work(&wt_path_for_check)
        })
        .await
        .unwrap_or(false);
        if !has_work {
            eprintln!(
                "[startup:reconcile] No committed work for '{story_id}' in {stage_dir}/; skipping."
            );
            let _ = progress_tx.send(ReconciliationEvent {
                story_id: story_id.clone(),
                status: "skipped".to_string(),
                message: "No committed work found; skipping.".to_string(),
            });
            continue;
        }
        eprintln!(
            "[startup:reconcile] Found committed work for '{story_id}' in {stage_dir}/. Running acceptance gates."
        );
        let _ = progress_tx.send(ReconciliationEvent {
            story_id: story_id.clone(),
            status: "gates_running".to_string(),
            message: "Running acceptance gates…".to_string(),
        });
        // Run acceptance gates on the worktree.
        let wt_path_for_gates = wt_path.clone();
        let gates_result = tokio::task::spawn_blocking(move || {
            super::super::gates::check_uncommitted_changes(&wt_path_for_gates)?;
            super::super::gates::run_acceptance_gates(&wt_path_for_gates)
        })
        .await;
        // Outer Result: task join (panic); inner Result: gate execution error.
        let (gates_passed, gate_output) = match gates_result {
            Ok(Ok(pair)) => pair,
            Ok(Err(e)) => {
                eprintln!("[startup:reconcile] Gate check error for '{story_id}': {e}");
                let _ = progress_tx.send(ReconciliationEvent {
                    story_id: story_id.clone(),
                    status: "failed".to_string(),
                    message: format!("Gate error: {e}"),
                });
                continue;
            }
            Err(e) => {
                eprintln!("[startup:reconcile] Gate check task panicked for '{story_id}': {e}");
                let _ = progress_tx.send(ReconciliationEvent {
                    story_id: story_id.clone(),
                    status: "failed".to_string(),
                    message: format!("Gate task panicked: {e}"),
                });
                continue;
            }
        };
        if !gates_passed {
            eprintln!(
                "[startup:reconcile] Gates failed for '{story_id}': {gate_output}\n\
                 Leaving in {stage_dir}/ for auto-assign to restart the agent."
            );
            let _ = progress_tx.send(ReconciliationEvent {
                story_id: story_id.clone(),
                status: "failed".to_string(),
                message: "Gates failed; will be retried by auto-assign.".to_string(),
            });
            continue;
        }
        eprintln!("[startup:reconcile] Gates passed for '{story_id}' (stage: {stage_dir}/).");
        if stage_dir == "2_current" {
            // Coder stage — determine qa mode to decide next step.
            // Spikes always require human QA; otherwise the story front matter
            // (falling back to the project default) decides.
            let qa_mode = {
                let item_type = super::super::lifecycle::item_type_from_id(story_id);
                if item_type == "spike" {
                    crate::io::story_metadata::QaMode::Human
                } else {
                    let default_qa = crate::config::ProjectConfig::load(project_root)
                        .unwrap_or_default()
                        .default_qa_mode();
                    let story_path = project_root
                        .join(".story_kit/work/2_current")
                        .join(format!("{story_id}.md"));
                    crate::io::story_metadata::resolve_qa_mode(&story_path, default_qa)
                }
            };
            match qa_mode {
                // Server-side QA already ran as part of the gates — skip 3_qa/.
                crate::io::story_metadata::QaMode::Server => {
                    if let Err(e) = super::super::lifecycle::move_story_to_merge(project_root, story_id) {
                        eprintln!("[startup:reconcile] Failed to move '{story_id}' to 4_merge/: {e}");
                        let _ = progress_tx.send(ReconciliationEvent {
                            story_id: story_id.clone(),
                            status: "failed".to_string(),
                            message: format!("Failed to advance to merge: {e}"),
                        });
                    } else {
                        eprintln!("[startup:reconcile] Moved '{story_id}' → 4_merge/ (qa: server).");
                        let _ = progress_tx.send(ReconciliationEvent {
                            story_id: story_id.clone(),
                            status: "advanced".to_string(),
                            message: "Gates passed — moved to merge (qa: server).".to_string(),
                        });
                    }
                }
                crate::io::story_metadata::QaMode::Agent => {
                    if let Err(e) = super::super::lifecycle::move_story_to_qa(project_root, story_id) {
                        eprintln!("[startup:reconcile] Failed to move '{story_id}' to 3_qa/: {e}");
                        let _ = progress_tx.send(ReconciliationEvent {
                            story_id: story_id.clone(),
                            status: "failed".to_string(),
                            message: format!("Failed to advance to QA: {e}"),
                        });
                    } else {
                        eprintln!("[startup:reconcile] Moved '{story_id}' → 3_qa/.");
                        let _ = progress_tx.send(ReconciliationEvent {
                            story_id: story_id.clone(),
                            status: "advanced".to_string(),
                            message: "Gates passed — moved to QA.".to_string(),
                        });
                    }
                }
                crate::io::story_metadata::QaMode::Human => {
                    if let Err(e) = super::super::lifecycle::move_story_to_qa(project_root, story_id) {
                        eprintln!("[startup:reconcile] Failed to move '{story_id}' to 3_qa/: {e}");
                        let _ = progress_tx.send(ReconciliationEvent {
                            story_id: story_id.clone(),
                            status: "failed".to_string(),
                            message: format!("Failed to advance to QA: {e}"),
                        });
                    } else {
                        // The story file now lives in 3_qa/ — write the hold flag there.
                        let story_path = project_root
                            .join(".story_kit/work/3_qa")
                            .join(format!("{story_id}.md"));
                        if let Err(e) = crate::io::story_metadata::write_review_hold(&story_path) {
                            eprintln!(
                                "[startup:reconcile] Failed to set review_hold on '{story_id}': {e}"
                            );
                        }
                        eprintln!("[startup:reconcile] Moved '{story_id}' → 3_qa/ (qa: human — holding for review).");
                        let _ = progress_tx.send(ReconciliationEvent {
                            story_id: story_id.clone(),
                            status: "review_hold".to_string(),
                            message: "Gates passed — holding for human review.".to_string(),
                        });
                    }
                }
            }
        } else if stage_dir == "3_qa" {
            // QA stage → run coverage gate before advancing to merge.
            let wt_path_for_cov = wt_path.clone();
            let coverage_result = tokio::task::spawn_blocking(move || {
                super::super::gates::run_coverage_gate(&wt_path_for_cov)
            })
            .await;
            let (coverage_passed, coverage_output) = match coverage_result {
                Ok(Ok(pair)) => pair,
                Ok(Err(e)) => {
                    eprintln!("[startup:reconcile] Coverage gate error for '{story_id}': {e}");
                    let _ = progress_tx.send(ReconciliationEvent {
                        story_id: story_id.clone(),
                        status: "failed".to_string(),
                        message: format!("Coverage gate error: {e}"),
                    });
                    continue;
                }
                Err(e) => {
                    eprintln!(
                        "[startup:reconcile] Coverage gate panicked for '{story_id}': {e}"
                    );
                    let _ = progress_tx.send(ReconciliationEvent {
                        story_id: story_id.clone(),
                        status: "failed".to_string(),
                        message: format!("Coverage gate panicked: {e}"),
                    });
                    continue;
                }
            };
            if coverage_passed {
                // Check whether this item needs human review before merging.
                let needs_human_review = {
                    let item_type = super::super::lifecycle::item_type_from_id(story_id);
                    if item_type == "spike" {
                        true
                    } else {
                        let story_path = project_root
                            .join(".story_kit/work/3_qa")
                            .join(format!("{story_id}.md"));
                        let default_qa = crate::config::ProjectConfig::load(project_root)
                            .unwrap_or_default()
                            .default_qa_mode();
                        matches!(
                            crate::io::story_metadata::resolve_qa_mode(&story_path, default_qa),
                            crate::io::story_metadata::QaMode::Human
                        )
                    }
                };
                if needs_human_review {
                    let story_path = project_root
                        .join(".story_kit/work/3_qa")
                        .join(format!("{story_id}.md"));
                    if let Err(e) = crate::io::story_metadata::write_review_hold(&story_path) {
                        eprintln!(
                            "[startup:reconcile] Failed to set review_hold on '{story_id}': {e}"
                        );
                    }
                    eprintln!(
                        "[startup:reconcile] '{story_id}' passed QA — holding for human review."
                    );
                    let _ = progress_tx.send(ReconciliationEvent {
                        story_id: story_id.clone(),
                        status: "review_hold".to_string(),
                        message: "Passed QA — waiting for human review.".to_string(),
                    });
                } else if let Err(e) =
                    super::super::lifecycle::move_story_to_merge(project_root, story_id)
                {
                    eprintln!(
                        "[startup:reconcile] Failed to move '{story_id}' to 4_merge/: {e}"
                    );
                    let _ = progress_tx.send(ReconciliationEvent {
                        story_id: story_id.clone(),
                        status: "failed".to_string(),
                        message: format!("Failed to advance to merge: {e}"),
                    });
                } else {
                    eprintln!("[startup:reconcile] Moved '{story_id}' → 4_merge/.");
                    let _ = progress_tx.send(ReconciliationEvent {
                        story_id: story_id.clone(),
                        status: "advanced".to_string(),
                        message: "Gates passed — moved to merge.".to_string(),
                    });
                }
            } else {
                eprintln!(
                    "[startup:reconcile] Coverage gate failed for '{story_id}': {coverage_output}\n\
                     Leaving in 3_qa/ for auto-assign to restart the QA agent."
                );
                let _ = progress_tx.send(ReconciliationEvent {
                    story_id: story_id.clone(),
                    status: "failed".to_string(),
                    message: "Coverage gate failed; will be retried.".to_string(),
                });
            }
        }
    }
    // Signal that reconciliation is complete.
    let _ = progress_tx.send(ReconciliationEvent {
        story_id: String::new(),
        status: "done".to_string(),
        message: "Startup reconciliation complete.".to_string(),
    });
}
/// Test-only helper: perform one synchronous watchdog sweep over the pool.
#[cfg(test)]
pub fn run_watchdog_once(&self) {
    // The orphan count is irrelevant here; tests inspect pool state directly.
    let _orphans = check_orphaned_agents(&self.agents);
}
/// Launch the background watchdog task.
///
/// Every 30 seconds the task sweeps the pool for Running/Pending agents whose
/// backing tokio task has already finished (orphaned entries), marking each as
/// Failed and emitting an Error event so that `wait_for_agent` unblocks. It is
/// a safety net for edge cases where the PTY read loop exits without updating
/// the agent status (e.g. a panic in the spawn_blocking task, or an external
/// SIGKILL that closes the PTY fd immediately).
///
/// When orphans are found and a `project_root` is provided, an auto-assign
/// pass is triggered so that freed agents can pick up unassigned work.
pub fn spawn_watchdog(pool: Arc<AgentPool>, project_root: Option<PathBuf>) {
    const SWEEP_PERIOD: std::time::Duration = std::time::Duration::from_secs(30);
    tokio::spawn(async move {
        let mut ticker = tokio::time::interval(SWEEP_PERIOD);
        loop {
            ticker.tick().await;
            let found = check_orphaned_agents(&pool.agents);
            if found == 0 {
                continue;
            }
            if let Some(ref root) = project_root {
                slog!("[watchdog] {found} orphaned agent(s) detected; triggering auto-assign.");
                pool.auto_assign_available_work(root).await;
            }
        }
    });
}
}
// ── Free helper functions ──────────────────────────────────────────────────
/// Look up the optional `agent:` front-matter field of a story file.
///
/// Yields the requested agent name when one is declared; `None` when the
/// field is absent or the file cannot be read / parsed.
fn read_story_front_matter_agent(
    project_root: &Path,
    stage_dir: &str,
    story_id: &str,
) -> Option<String> {
    use crate::io::story_metadata::parse_front_matter;
    let story_path = project_root
        .join(".story_kit")
        .join("work")
        .join(stage_dir)
        .join(format!("{story_id}.md"));
    std::fs::read_to_string(story_path)
        .ok()
        .and_then(|text| parse_front_matter(&text).ok())
        .and_then(|meta| meta.agent)
}
/// Check whether the story file in `stage_dir` carries `review_hold: true`
/// in its front matter. Unreadable or unparsable files count as not held.
fn has_review_hold(project_root: &Path, stage_dir: &str, story_id: &str) -> bool {
    use crate::io::story_metadata::parse_front_matter;
    let story_path = project_root
        .join(".story_kit")
        .join("work")
        .join(stage_dir)
        .join(format!("{story_id}.md"));
    let Ok(text) = std::fs::read_to_string(story_path) else {
        return false;
    };
    matches!(
        parse_front_matter(&text).ok().and_then(|meta| meta.review_hold),
        Some(true)
    )
}
/// Check whether the story file carries `blocked: true` in its front matter.
/// Unreadable or unparsable files count as not blocked.
fn is_story_blocked(project_root: &Path, stage_dir: &str, story_id: &str) -> bool {
    use crate::io::story_metadata::parse_front_matter;
    let story_path = project_root
        .join(".story_kit")
        .join("work")
        .join(stage_dir)
        .join(format!("{story_id}.md"));
    std::fs::read_to_string(story_path)
        .ok()
        .and_then(|text| parse_front_matter(&text).ok())
        .and_then(|meta| meta.blocked)
        .unwrap_or(false)
}
/// Check whether the story file records a `merge_failure` in its front matter.
/// Unreadable or unparsable files count as having no failure.
fn has_merge_failure(project_root: &Path, stage_dir: &str, story_id: &str) -> bool {
    use crate::io::story_metadata::parse_front_matter;
    let story_path = project_root
        .join(".story_kit")
        .join("work")
        .join(stage_dir)
        .join(format!("{story_id}.md"));
    let Ok(text) = std::fs::read_to_string(story_path) else {
        return false;
    };
    parse_front_matter(&text)
        .ok()
        .map_or(false, |meta| meta.merge_failure.is_some())
}
/// An agent is "free" when no pool entry for its name is Pending or Running.
pub(super) fn is_agent_free(agents: &HashMap<String, StoryAgent>, agent_name: &str) -> bool {
    agents.values().all(|entry| {
        entry.agent_name != agent_name
            || !matches!(entry.status, AgentStatus::Running | AgentStatus::Pending)
    })
}
/// Collect the story ids (file stems of `*.md` files) found in a work stage
/// directory, sorted lexicographically. Returns an empty list when the
/// directory is absent or unreadable.
fn scan_stage_items(project_root: &Path, stage_dir: &str) -> Vec<String> {
    let dir = project_root.join(".story_kit").join("work").join(stage_dir);
    if !dir.is_dir() {
        return Vec::new();
    }
    let mut items: Vec<String> = std::fs::read_dir(&dir)
        .map(|entries| {
            entries
                .flatten()
                .filter_map(|entry| {
                    let path = entry.path();
                    // Only markdown story files count; everything else is ignored.
                    if path.extension().and_then(|e| e.to_str()) != Some("md") {
                        return None;
                    }
                    path.file_stem().and_then(|s| s.to_str()).map(str::to_string)
                })
                .collect()
        })
        .unwrap_or_default();
    items.sort();
    items
}
/// True when `story_id` already has a Pending/Running agent whose pipeline
/// stage equals `stage`.
///
/// The agent's stage comes from its explicit `stage` config field when the
/// agent is listed in `config`; otherwise the legacy name-based heuristic
/// (`pipeline_stage`) applies.
fn is_story_assigned_for_stage(
    config: &ProjectConfig,
    agents: &HashMap<String, StoryAgent>,
    story_id: &str,
    stage: &PipelineStage,
) -> bool {
    for (key, agent) in agents {
        // Pool keys use the composite format "{story_id}:{agent_name}";
        // a key without ':' is treated as the story id itself.
        let entry_story = match key.rsplit_once(':') {
            Some((sid, _)) => sid,
            None => key.as_str(),
        };
        if entry_story != story_id {
            continue;
        }
        if !matches!(agent.status, AgentStatus::Running | AgentStatus::Pending) {
            continue;
        }
        let entry_stage = config
            .find_agent(&agent.agent_name)
            .map(agent_config_stage)
            .unwrap_or_else(|| pipeline_stage(&agent.agent_name));
        if entry_stage == *stage {
            return true;
        }
    }
    false
}
/// Number of Pending/Running pool entries whose agent belongs to `stage`.
///
/// Stage resolution mirrors `is_story_assigned_for_stage`: the config `stage`
/// field wins, the name-based heuristic is the fallback.
fn count_active_agents_for_stage(
    config: &ProjectConfig,
    agents: &HashMap<String, StoryAgent>,
    stage: &PipelineStage,
) -> usize {
    let mut active = 0usize;
    for agent in agents.values() {
        if !matches!(agent.status, AgentStatus::Running | AgentStatus::Pending) {
            continue;
        }
        let in_stage = match config.find_agent(&agent.agent_name) {
            Some(cfg) => agent_config_stage(cfg) == *stage,
            None => pipeline_stage(&agent.agent_name) == *stage,
        };
        if in_stage {
            active += 1;
        }
    }
    active
}
/// Find the first configured agent for `stage` that has no active (pending/running) assignment.
/// Returns `None` if all agents for that stage are busy, none are configured,
/// or the `max_coders` limit has been reached (for the Coder stage).
///
/// For the Coder stage, when `default_coder_model` is set, only considers agents whose
/// model matches the default. This ensures opus-class agents are reserved for explicit
/// front-matter requests.
pub(super) fn find_free_agent_for_stage<'a>(
config: &'a ProjectConfig,
agents: &HashMap<String, StoryAgent>,
stage: &PipelineStage,
) -> Option<&'a str> {
// Enforce max_coders limit for the Coder stage.
if *stage == PipelineStage::Coder
&& let Some(max) = config.max_coders
{
let active = count_active_agents_for_stage(config, agents, stage);
if active >= max {
return None;
}
}
for agent_config in &config.agent {
if agent_config_stage(agent_config) != *stage {
continue;
}
// When default_coder_model is set, only auto-assign coder agents whose
// model matches. This keeps opus agents reserved for explicit requests.
if *stage == PipelineStage::Coder
&& let Some(ref default_model) = config.default_coder_model
{
let agent_model = agent_config.model.as_deref().unwrap_or("");
if agent_model != default_model {
continue;
}
}
let is_busy = agents.values().any(|a| {
a.agent_name == agent_config.name
&& matches!(a.status, AgentStatus::Running | AgentStatus::Pending)
});
if !is_busy {
return Some(&agent_config.name);
}
}
None
}
/// Scan the agent pool for Running entries whose backing tokio task has already
/// finished and mark them as Failed.
///
/// This handles the case where the PTY read loop or the spawned task exits
/// without updating the agent status — for example when the process is killed
/// externally and the PTY master fd returns EOF before our inactivity timeout
/// fires, but some other edge case prevents the normal cleanup path from running.
fn check_orphaned_agents(agents: &Mutex<HashMap<String, StoryAgent>>) -> usize {
let mut lock = match agents.lock() {
Ok(l) => l,
Err(_) => return 0,
};
// Collect orphaned entries: Running or Pending agents whose task handle is finished.
// Pending agents can be orphaned if worktree creation panics before setting status.
let orphaned: Vec<(String, String, broadcast::Sender<AgentEvent>, AgentStatus)> = lock
.iter()
.filter_map(|(key, agent)| {
if matches!(agent.status, AgentStatus::Running | AgentStatus::Pending)
&& let Some(handle) = &agent.task_handle
&& handle.is_finished()
{
let story_id = key
.rsplit_once(':')
.map(|(s, _)| s.to_string())
.unwrap_or_else(|| key.clone());
return Some((
key.clone(),
story_id,
agent.tx.clone(),
agent.status.clone(),
));
}
None
})
.collect();
let count = orphaned.len();
for (key, story_id, tx, prev_status) in orphaned {
if let Some(agent) = lock.get_mut(&key) {
agent.status = AgentStatus::Failed;
slog!(
"[watchdog] Orphaned agent '{key}': task finished but status was {prev_status}. \
Marking Failed."
);
let _ = tx.send(AgentEvent::Error {
story_id,
agent_name: agent.agent_name.clone(),
message: "Agent process terminated unexpectedly (watchdog detected orphan)"
.to_string(),
});
}
}
count
}
// ── Tests ──────────────────────────────────────────────────────────────────
#[cfg(test)]
mod tests {
use super::*;
use crate::config::ProjectConfig;
use crate::io::watcher::WatcherEvent;
use std::process::Command;
use super::super::{AgentPool, StoryAgent, composite_key};
fn make_config(toml_str: &str) -> ProjectConfig {
ProjectConfig::parse(toml_str).unwrap()
}
fn init_git_repo(repo: &std::path::Path) {
Command::new("git")
.args(["init"])
.current_dir(repo)
.output()
.unwrap();
Command::new("git")
.args(["config", "user.email", "test@test.com"])
.current_dir(repo)
.output()
.unwrap();
Command::new("git")
.args(["config", "user.name", "Test"])
.current_dir(repo)
.output()
.unwrap();
// Create initial commit so master branch exists.
std::fs::write(repo.join("README.md"), "# test\n").unwrap();
Command::new("git")
.args(["add", "."])
.current_dir(repo)
.output()
.unwrap();
Command::new("git")
.args(["commit", "-m", "initial"])
.current_dir(repo)
.output()
.unwrap();
}
fn make_test_story_agent(agent_name: &str, status: AgentStatus) -> StoryAgent {
StoryAgent {
agent_name: agent_name.to_string(),
status,
worktree_info: None,
session_id: None,
tx: broadcast::channel(1).0,
task_handle: None,
event_log: Arc::new(Mutex::new(Vec::new())),
completion: None,
project_root: None,
log_session_id: None,
merge_failure_reported: false,
}
}
// ── auto-assign helper tests ───────────────────────────────────
#[test]
fn scan_stage_items_returns_empty_for_missing_dir() {
let tmp = tempfile::tempdir().unwrap();
let items = scan_stage_items(tmp.path(), "2_current");
assert!(items.is_empty());
}
#[test]
fn scan_stage_items_returns_sorted_story_ids() {
use std::fs;
let tmp = tempfile::tempdir().unwrap();
let stage_dir = tmp.path().join(".story_kit").join("work").join("2_current");
fs::create_dir_all(&stage_dir).unwrap();
fs::write(stage_dir.join("42_story_foo.md"), "---\nname: foo\n---").unwrap();
fs::write(stage_dir.join("10_story_bar.md"), "---\nname: bar\n---").unwrap();
fs::write(stage_dir.join("5_story_baz.md"), "---\nname: baz\n---").unwrap();
// non-md file should be ignored
fs::write(stage_dir.join("README.txt"), "ignore me").unwrap();
let items = scan_stage_items(tmp.path(), "2_current");
assert_eq!(items, vec!["10_story_bar", "42_story_foo", "5_story_baz"]);
}
#[test]
fn is_story_assigned_returns_true_for_running_coder() {
let config = ProjectConfig::default();
let pool = AgentPool::new_test(3001);
pool.inject_test_agent("42_story_foo", "coder-1", AgentStatus::Running);
let agents = pool.agents.lock().unwrap();
assert!(is_story_assigned_for_stage(
&config,
&agents,
"42_story_foo",
&PipelineStage::Coder
));
// Same story but wrong stage — should be false
assert!(!is_story_assigned_for_stage(
&config,
&agents,
"42_story_foo",
&PipelineStage::Qa
));
// Different story — should be false
assert!(!is_story_assigned_for_stage(
&config,
&agents,
"99_story_other",
&PipelineStage::Coder
));
}
#[test]
fn is_story_assigned_returns_false_for_completed_agent() {
let config = ProjectConfig::default();
let pool = AgentPool::new_test(3001);
pool.inject_test_agent("42_story_foo", "coder-1", AgentStatus::Completed);
let agents = pool.agents.lock().unwrap();
// Completed agents don't count as assigned
assert!(!is_story_assigned_for_stage(
&config,
&agents,
"42_story_foo",
&PipelineStage::Coder
));
}
#[test]
fn is_story_assigned_uses_config_stage_field_for_nonstandard_names() {
let config = ProjectConfig::parse(
r#"
[[agent]]
name = "qa-2"
stage = "qa"
"#,
)
.unwrap();
let pool = AgentPool::new_test(3001);
pool.inject_test_agent("42_story_foo", "qa-2", AgentStatus::Running);
let agents = pool.agents.lock().unwrap();
// qa-2 with stage=qa should be recognised as a QA agent
assert!(
is_story_assigned_for_stage(&config, &agents, "42_story_foo", &PipelineStage::Qa),
"qa-2 should be detected as assigned to QA stage"
);
// Should NOT appear as a coder
assert!(
!is_story_assigned_for_stage(&config, &agents, "42_story_foo", &PipelineStage::Coder),
"qa-2 should not be detected as a coder"
);
}
#[test]
fn find_free_agent_returns_none_when_all_busy() {
let config = ProjectConfig::parse(
r#"
[[agent]]
name = "coder-1"
[[agent]]
name = "coder-2"
"#,
)
.unwrap();
let pool = AgentPool::new_test(3001);
pool.inject_test_agent("s1", "coder-1", AgentStatus::Running);
pool.inject_test_agent("s2", "coder-2", AgentStatus::Running);
let agents = pool.agents.lock().unwrap();
let free = find_free_agent_for_stage(&config, &agents, &PipelineStage::Coder);
assert!(free.is_none(), "no free coders should be available");
}
#[test]
fn find_free_agent_returns_first_free_coder() {
let config = ProjectConfig::parse(
r#"
[[agent]]
name = "coder-1"
[[agent]]
name = "coder-2"
[[agent]]
name = "coder-3"
"#,
)
.unwrap();
let pool = AgentPool::new_test(3001);
// coder-1 is busy, coder-2 is free
pool.inject_test_agent("s1", "coder-1", AgentStatus::Running);
let agents = pool.agents.lock().unwrap();
let free = find_free_agent_for_stage(&config, &agents, &PipelineStage::Coder);
assert_eq!(
free,
Some("coder-2"),
"coder-2 should be the first free coder"
);
}
#[test]
fn find_free_agent_ignores_completed_agents() {
let config = ProjectConfig::parse(
r#"
[[agent]]
name = "coder-1"
"#,
)
.unwrap();
let pool = AgentPool::new_test(3001);
// coder-1 completed its previous story — it's free for a new one
pool.inject_test_agent("s1", "coder-1", AgentStatus::Completed);
let agents = pool.agents.lock().unwrap();
let free = find_free_agent_for_stage(&config, &agents, &PipelineStage::Coder);
assert_eq!(free, Some("coder-1"), "completed coder-1 should be free");
}
#[test]
fn find_free_agent_returns_none_for_wrong_stage() {
let config = ProjectConfig::parse(
r#"
[[agent]]
name = "qa"
"#,
)
.unwrap();
let agents: HashMap<String, StoryAgent> = HashMap::new();
// Looking for a Coder but only QA is configured
let free = find_free_agent_for_stage(&config, &agents, &PipelineStage::Coder);
assert!(free.is_none());
// Looking for QA should find it
let free_qa = find_free_agent_for_stage(&config, &agents, &PipelineStage::Qa);
assert_eq!(free_qa, Some("qa"));
}
#[test]
fn find_free_agent_uses_config_stage_field_not_name() {
    // Agents named "qa-2" and "coder-opus" don't match the legacy name heuristic
    // but should be picked up via their explicit stage field.
    let config = ProjectConfig::parse(
        r#"
[[agent]]
name = "qa-2"
stage = "qa"
[[agent]]
name = "coder-opus"
stage = "coder"
"#,
    )
    .unwrap();
    let agents: HashMap<String, StoryAgent> = HashMap::new();
    let lookup = |stage: &PipelineStage| find_free_agent_for_stage(&config, &agents, stage);
    // The explicit stage field, not the agent name, drives each lookup.
    assert_eq!(
        lookup(&PipelineStage::Qa),
        Some("qa-2"),
        "qa-2 with stage=qa should be found"
    );
    assert_eq!(
        lookup(&PipelineStage::Coder),
        Some("coder-opus"),
        "coder-opus with stage=coder should be found"
    );
    // Neither should match the other stage
    assert!(lookup(&PipelineStage::Mergemaster).is_none());
}
// ── check_orphaned_agents return value tests (bug 161) ──────────────────
#[tokio::test]
async fn check_orphaned_agents_returns_count_of_orphaned_agents() {
    let pool = AgentPool::new_test(3001);
    // Spawn two tasks that finish immediately.
    let h1 = tokio::spawn(async {});
    let h2 = tokio::spawn(async {});
    // Give the runtime a moment; the asserts below guard against a slow
    // scheduler so the orphan precondition is actually established.
    tokio::time::sleep(std::time::Duration::from_millis(20)).await;
    assert!(h1.is_finished());
    assert!(h2.is_finished());
    // An agent marked Running whose join handle has already finished is the
    // "orphan" condition: bookkeeping says busy, but no task is working.
    pool.inject_test_agent_with_handle("story_a", "coder", AgentStatus::Running, h1);
    pool.inject_test_agent_with_handle("story_b", "coder", AgentStatus::Running, h2);
    let found = check_orphaned_agents(&pool.agents);
    assert_eq!(found, 2, "should detect both orphaned agents");
}
#[test]
fn check_orphaned_agents_returns_zero_when_no_orphans() {
    // Agents already in terminal states (Completed/Failed) can never be
    // orphans, so the scan must report zero.
    let pool = AgentPool::new_test(3001);
    pool.inject_test_agent("story_a", "coder", AgentStatus::Completed);
    pool.inject_test_agent("story_b", "qa", AgentStatus::Failed);
    assert_eq!(
        check_orphaned_agents(&pool.agents),
        0,
        "no orphans should be detected for terminal agents"
    );
}
#[tokio::test]
async fn watchdog_detects_orphaned_running_agent() {
    let pool = AgentPool::new_test(3001);
    // A task that completes immediately; its finished handle plus a Running
    // status is exactly the orphan condition the watchdog must detect.
    let handle = tokio::spawn(async {});
    tokio::time::sleep(std::time::Duration::from_millis(20)).await;
    assert!(
        handle.is_finished(),
        "task should be finished before injection"
    );
    let tx = pool.inject_test_agent_with_handle(
        "orphan_story",
        "coder",
        AgentStatus::Running,
        handle,
    );
    // Subscribe before running the watchdog so its broadcast isn't missed.
    let mut rx = tx.subscribe();
    pool.run_watchdog_once();
    // Scope the agents lock so it is released before polling the channel.
    {
        let agents = pool.agents.lock().unwrap();
        let key = composite_key("orphan_story", "coder");
        let agent = agents.get(&key).unwrap();
        assert_eq!(
            agent.status,
            AgentStatus::Failed,
            "watchdog must mark an orphaned Running agent as Failed"
        );
    }
    let event = rx.try_recv().expect("watchdog must emit an Error event");
    assert!(
        matches!(event, AgentEvent::Error { .. }),
        "expected AgentEvent::Error, got: {event:?}"
    );
}
#[tokio::test]
async fn watchdog_orphan_detection_returns_nonzero_enabling_auto_assign() {
    // This test verifies the contract that `check_orphaned_agents` returns
    // a non-zero count when orphans exist, which the watchdog uses to
    // decide whether to trigger auto-assign (bug 161).
    let pool = AgentPool::new_test(3001);
    let handle = tokio::spawn(async {});
    tokio::time::sleep(std::time::Duration::from_millis(20)).await;
    // Guard against scheduler flakiness: the orphan condition only holds if
    // the task has actually finished before the agent is injected. Mirrors
    // the guard in watchdog_detects_orphaned_running_agent.
    assert!(
        handle.is_finished(),
        "task should be finished before injection"
    );
    pool.inject_test_agent_with_handle("orphan_story", "coder", AgentStatus::Running, handle);
    // Before watchdog: agent is Running.
    {
        let agents = pool.agents.lock().unwrap();
        let key = composite_key("orphan_story", "coder");
        assert_eq!(agents.get(&key).unwrap().status, AgentStatus::Running);
    }
    // Run watchdog pass — should return 1 (orphan found).
    let found = check_orphaned_agents(&pool.agents);
    assert_eq!(
        found, 1,
        "watchdog must return 1 for a single orphaned agent"
    );
    // After watchdog: agent is Failed.
    {
        let agents = pool.agents.lock().unwrap();
        let key = composite_key("orphan_story", "coder");
        assert_eq!(
            agents.get(&key).unwrap().status,
            AgentStatus::Failed,
            "orphaned agent must be marked Failed"
        );
    }
}
// ── auto_assign_available_work tests ──────────────────────────────────────
/// Story 203: auto_assign_available_work must detect a story in 2_current/
/// with no active agent and start an agent for it.
#[tokio::test]
async fn auto_assign_picks_up_story_queued_in_current() {
    let tmp = tempfile::tempdir().unwrap();
    let sk = tmp.path().join(".story_kit");
    let current = sk.join("work/2_current");
    std::fs::create_dir_all(&current).unwrap();
    // Minimal project config: a single free coder-stage agent.
    std::fs::write(
        sk.join("project.toml"),
        "[[agent]]\nname = \"coder-1\"\nstage = \"coder\"\n",
    )
    .unwrap();
    // Place the story in 2_current/ (simulating the "queued" state).
    std::fs::write(current.join("story-3.md"), "---\nname: Story 3\n---\n").unwrap();
    let pool = AgentPool::new_test(3001);
    // No agents are running — coder-1 is free.
    // auto_assign will try to call start_agent, which will attempt to create
    // a worktree (will fail without a git repo) — that is fine. We only need
    // to verify the agent is registered as Pending before the background
    // task eventually fails.
    pool.auto_assign_available_work(tmp.path()).await;
    let agents = pool.agents.lock().unwrap();
    // Accept either Pending or Running: the background task may have already
    // transitioned the entry by the time we observe it.
    let has_pending = agents.values().any(|a| {
        a.agent_name == "coder-1"
            && matches!(a.status, AgentStatus::Pending | AgentStatus::Running)
    });
    assert!(
        has_pending,
        "auto_assign should have started coder-1 for story-3, but pool is empty"
    );
}
/// Story 265: auto_assign_available_work must skip spikes in 3_qa/ that
/// have review_hold: true set in their front matter.
#[tokio::test]
async fn auto_assign_skips_spikes_with_review_hold() {
let tmp = tempfile::tempdir().unwrap();
let root = tmp.path();
// Create project.toml with a QA agent.
let sk = root.join(".story_kit");
std::fs::create_dir_all(&sk).unwrap();
std::fs::write(
sk.join("project.toml"),
"[[agents]]\nname = \"qa\"\nrole = \"qa\"\nmodel = \"test\"\nprompt = \"test\"\n",
)
.unwrap();
// Put a spike in 3_qa/ with review_hold: true.
let qa_dir = root.join(".story_kit/work/3_qa");
std::fs::create_dir_all(&qa_dir).unwrap();
std::fs::write(
qa_dir.join("20_spike_test.md"),
"---\nname: Test Spike\nreview_hold: true\n---\n# Spike\n",
)
.unwrap();
let (watcher_tx, _) = broadcast::channel::<WatcherEvent>(4);
let pool = AgentPool::new(3001, watcher_tx);
pool.auto_assign_available_work(root).await;
// No agent should have been started for the spike.
let agents = pool.agents.lock().unwrap();
assert!(
agents.is_empty(),
"No agents should be assigned to a spike with review_hold"
);
}
// ── Story 279: auto-assign respects agent stage from front matter ──────────
/// When a story in 3_qa/ has `agent: coder-1` in its front matter but
/// coder-1 is a coder-stage agent, auto-assign must NOT assign coder-1.
/// Instead it should fall back to a free QA-stage agent.
#[tokio::test]
async fn auto_assign_ignores_coder_preference_when_story_is_in_qa_stage() {
    let tmp = tempfile::tempdir().unwrap();
    let sk = tmp.path().join(".story_kit");
    let qa_dir = sk.join("work/3_qa");
    std::fs::create_dir_all(&qa_dir).unwrap();
    // One agent per stage so a stage-correct fallback (qa-1) exists.
    std::fs::write(
        sk.join("project.toml"),
        "[[agent]]\nname = \"coder-1\"\nstage = \"coder\"\n\n\
         [[agent]]\nname = \"qa-1\"\nstage = \"qa\"\n",
    )
    .unwrap();
    // Story in 3_qa/ with a preferred coder-stage agent.
    std::fs::write(
        qa_dir.join("story-qa1.md"),
        "---\nname: QA Story\nagent: coder-1\n---\n",
    )
    .unwrap();
    let pool = AgentPool::new_test(3001);
    pool.auto_assign_available_work(tmp.path()).await;
    let agents = pool.agents.lock().unwrap();
    // coder-1 must NOT have been assigned (wrong stage for 3_qa/).
    let coder_assigned = agents.values().any(|a| {
        a.agent_name == "coder-1"
            && matches!(a.status, AgentStatus::Pending | AgentStatus::Running)
    });
    assert!(
        !coder_assigned,
        "coder-1 should not be assigned to a QA-stage story"
    );
    // qa-1 should have been assigned instead.
    let qa_assigned = agents.values().any(|a| {
        a.agent_name == "qa-1"
            && matches!(a.status, AgentStatus::Pending | AgentStatus::Running)
    });
    assert!(
        qa_assigned,
        "qa-1 should be assigned as fallback for the QA-stage story"
    );
}
/// When a story in 2_current/ has `agent: coder-1` in its front matter and
/// coder-1 is a coder-stage agent, auto-assign must respect the preference
/// and assign coder-1 (not fall back to some other coder).
#[tokio::test]
async fn auto_assign_respects_coder_preference_when_story_is_in_current_stage() {
    let tmp = tempfile::tempdir().unwrap();
    let sk = tmp.path().join(".story_kit");
    let current_dir = sk.join("work/2_current");
    std::fs::create_dir_all(&current_dir).unwrap();
    // Two same-stage coders so an incorrect fallback would be observable.
    std::fs::write(
        sk.join("project.toml"),
        "[[agent]]\nname = \"coder-1\"\nstage = \"coder\"\n\n\
         [[agent]]\nname = \"coder-2\"\nstage = \"coder\"\n",
    )
    .unwrap();
    // Story in 2_current/ with a preferred coder-1 agent.
    std::fs::write(
        current_dir.join("story-pref.md"),
        "---\nname: Coder Story\nagent: coder-1\n---\n",
    )
    .unwrap();
    let pool = AgentPool::new_test(3001);
    pool.auto_assign_available_work(tmp.path()).await;
    let agents = pool.agents.lock().unwrap();
    // coder-1 should have been picked (it matches the stage and is preferred).
    let coder1_assigned = agents.values().any(|a| {
        a.agent_name == "coder-1"
            && matches!(a.status, AgentStatus::Pending | AgentStatus::Running)
    });
    assert!(
        coder1_assigned,
        "coder-1 should be assigned when it matches the stage and is preferred"
    );
    // coder-2 must NOT be assigned (not preferred).
    let coder2_assigned = agents.values().any(|a| {
        a.agent_name == "coder-2"
            && matches!(a.status, AgentStatus::Pending | AgentStatus::Running)
    });
    assert!(
        !coder2_assigned,
        "coder-2 should not be assigned when coder-1 is explicitly preferred"
    );
}
/// When the preferred agent's stage mismatches and no other agent of the
/// correct stage is available, auto-assign must not start any agent for that
/// story (no panic, no error).
#[tokio::test]
async fn auto_assign_stage_mismatch_with_no_fallback_starts_no_agent() {
    let tmp = tempfile::tempdir().unwrap();
    let kit = tmp.path().join(".story_kit");
    let qa_stage_dir = kit.join("work/3_qa");
    std::fs::create_dir_all(&qa_stage_dir).unwrap();
    // Only a coder agent is configured — no QA agent exists.
    let config_toml = "[[agent]]\nname = \"coder-1\"\nstage = \"coder\"\n";
    std::fs::write(kit.join("project.toml"), config_toml).unwrap();
    // Story in 3_qa/ requests coder-1 (wrong stage) and no QA agent exists.
    let story_body = "---\nname: QA Story No Agent\nagent: coder-1\n---\n";
    std::fs::write(qa_stage_dir.join("story-noqa.md"), story_body).unwrap();
    let pool = AgentPool::new_test(3001);
    // Must not panic.
    pool.auto_assign_available_work(tmp.path()).await;
    assert!(
        pool.agents.lock().unwrap().is_empty(),
        "No agent should be started when no stage-appropriate agent is available"
    );
}
/// Two concurrent auto_assign_available_work calls must not assign the same
/// agent to two stories simultaneously. After both complete, at most one
/// Pending/Running entry must exist per agent name.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn toctou_concurrent_auto_assign_no_duplicate_agent_assignments() {
    use std::fs;
    use std::sync::Arc;
    let tmp = tempfile::tempdir().unwrap();
    let root = tmp.path().to_path_buf();
    let sk_dir = root.join(".story_kit");
    // Two stories waiting in 2_current, one coder agent.
    fs::create_dir_all(sk_dir.join("work/2_current")).unwrap();
    fs::write(
        sk_dir.join("project.toml"),
        "[[agent]]\nname = \"coder-1\"\n",
    )
    .unwrap();
    fs::write(
        sk_dir.join("work/2_current/86_story_foo.md"),
        "---\nname: Foo\n---\n",
    )
    .unwrap();
    fs::write(
        sk_dir.join("work/2_current/130_story_bar.md"),
        "---\nname: Bar\n---\n",
    )
    .unwrap();
    let pool = Arc::new(AgentPool::new_test(3099));
    // Run two concurrent auto_assign calls. The multi_thread flavor lets the
    // two tasks genuinely interleave, exercising the check-then-assign
    // (TOCTOU) window this test is named for.
    let pool1 = pool.clone();
    let root1 = root.clone();
    let t1 = tokio::spawn(async move { pool1.auto_assign_available_work(&root1).await });
    let pool2 = pool.clone();
    let root2 = root.clone();
    let t2 = tokio::spawn(async move { pool2.auto_assign_available_work(&root2).await });
    let _ = tokio::join!(t1, t2);
    // At most one Pending/Running entry should exist for coder-1.
    let agents = pool.agents.lock().unwrap();
    let active_coder_count = agents
        .values()
        .filter(|a| {
            a.agent_name == "coder-1"
                && matches!(a.status, AgentStatus::Pending | AgentStatus::Running)
        })
        .count();
    assert!(
        active_coder_count <= 1,
        "coder-1 must not be assigned to more than one story simultaneously; \
         found {active_coder_count} active entries"
    );
}
// ── has_review_hold tests ────────────────────────────────────────────────
#[test]
fn has_review_hold_returns_true_when_set() {
    // Front matter explicitly carries review_hold: true.
    let tmp = tempfile::tempdir().unwrap();
    let qa_dir = tmp.path().join(".story_kit/work/3_qa");
    std::fs::create_dir_all(&qa_dir).unwrap();
    let body = "---\nname: Research spike\nreview_hold: true\n---\n# Spike\n";
    std::fs::write(qa_dir.join("10_spike_research.md"), body).unwrap();
    assert!(has_review_hold(tmp.path(), "3_qa", "10_spike_research"));
}
#[test]
fn has_review_hold_returns_false_when_not_set() {
    // Front matter with no review_hold key at all must not report a hold.
    let tmp = tempfile::tempdir().unwrap();
    let qa_dir = tmp.path().join(".story_kit/work/3_qa");
    std::fs::create_dir_all(&qa_dir).unwrap();
    let body = "---\nname: Research spike\n---\n# Spike\n";
    std::fs::write(qa_dir.join("10_spike_research.md"), body).unwrap();
    assert!(!has_review_hold(tmp.path(), "3_qa", "10_spike_research"));
}
#[test]
fn has_review_hold_returns_false_when_file_missing() {
    // A story file that doesn't exist cannot carry a hold flag.
    let tmp = tempfile::tempdir().unwrap();
    let held = has_review_hold(tmp.path(), "3_qa", "99_spike_missing");
    assert!(!held);
}
// ── find_free_agent_for_stage: default_coder_model filtering ─────────
#[test]
fn find_free_agent_skips_opus_when_default_coder_model_set() {
    // With default_coder_model set, only coders on that model are eligible
    // for auto-assignment; the opus agent must be passed over.
    let config = make_config(
        r#"
default_coder_model = "sonnet"
[[agent]]
name = "coder-1"
stage = "coder"
model = "sonnet"
[[agent]]
name = "coder-opus"
stage = "coder"
model = "opus"
"#,
    );
    let idle_agents = HashMap::new();
    assert_eq!(
        find_free_agent_for_stage(&config, &idle_agents, &PipelineStage::Coder),
        Some("coder-1")
    );
}
#[test]
fn find_free_agent_returns_opus_when_no_default_coder_model() {
    // Without a default_coder_model restriction, any free coder qualifies —
    // including one on the opus model.
    let config = make_config(
        r#"
[[agent]]
name = "coder-opus"
stage = "coder"
model = "opus"
"#,
    );
    let idle_agents = HashMap::new();
    assert_eq!(
        find_free_agent_for_stage(&config, &idle_agents, &PipelineStage::Coder),
        Some("coder-opus")
    );
}
#[test]
fn find_free_agent_returns_none_when_all_sonnet_coders_busy() {
    let config = make_config(
        r#"
default_coder_model = "sonnet"
[[agent]]
name = "coder-1"
stage = "coder"
model = "sonnet"
[[agent]]
name = "coder-opus"
stage = "coder"
model = "opus"
"#,
    );
    // The only sonnet coder is occupied; opus is filtered out by
    // default_coder_model, so nothing should be offered.
    let mut busy_agents = HashMap::new();
    busy_agents.insert(
        "story1:coder-1".to_string(),
        make_test_story_agent("coder-1", AgentStatus::Running),
    );
    let free = find_free_agent_for_stage(&config, &busy_agents, &PipelineStage::Coder);
    assert_eq!(free, None, "opus agent should not be auto-assigned");
}
// ── find_free_agent_for_stage: max_coders limit ─────────────────────
#[test]
fn find_free_agent_respects_max_coders() {
    let config = make_config(
        r#"
max_coders = 1
[[agent]]
name = "coder-1"
stage = "coder"
model = "sonnet"
[[agent]]
name = "coder-2"
stage = "coder"
model = "sonnet"
"#,
    );
    // coder-1 already holds the single allowed coder slot.
    let mut busy_agents = HashMap::new();
    busy_agents.insert(
        "story1:coder-1".to_string(),
        make_test_story_agent("coder-1", AgentStatus::Running),
    );
    let free = find_free_agent_for_stage(&config, &busy_agents, &PipelineStage::Coder);
    assert_eq!(free, None, "max_coders=1 should block second coder");
}
#[test]
fn find_free_agent_allows_within_max_coders() {
    let config = make_config(
        r#"
max_coders = 2
[[agent]]
name = "coder-1"
stage = "coder"
model = "sonnet"
[[agent]]
name = "coder-2"
stage = "coder"
model = "sonnet"
"#,
    );
    // One of two allowed slots is taken; the second coder must be offered.
    let mut busy_agents = HashMap::new();
    busy_agents.insert(
        "story1:coder-1".to_string(),
        make_test_story_agent("coder-1", AgentStatus::Running),
    );
    assert_eq!(
        find_free_agent_for_stage(&config, &busy_agents, &PipelineStage::Coder),
        Some("coder-2")
    );
}
#[test]
fn max_coders_does_not_affect_qa_stage() {
    // max_coders limits the coder stage only; QA assignment is unaffected.
    let config = make_config(
        r#"
max_coders = 1
[[agent]]
name = "qa"
stage = "qa"
model = "sonnet"
"#,
    );
    let idle_agents = HashMap::new();
    assert_eq!(
        find_free_agent_for_stage(&config, &idle_agents, &PipelineStage::Qa),
        Some("qa")
    );
}
// ── count_active_agents_for_stage ────────────────────────────────────
#[test]
fn count_active_agents_counts_running_and_pending() {
    // The test name promises Pending coverage, but the original fixture only
    // contained Running and Completed entries. Add a Pending entry so both
    // active states are actually exercised.
    let config = make_config(
        r#"
[[agent]]
name = "coder-1"
stage = "coder"
[[agent]]
name = "coder-2"
stage = "coder"
"#,
    );
    let mut agents = HashMap::new();
    // Running — counts as active.
    agents.insert(
        "s1:coder-1".to_string(),
        make_test_story_agent("coder-1", AgentStatus::Running),
    );
    // Pending — also active: the agent is reserved for a story.
    agents.insert(
        "s2:coder-2".to_string(),
        make_test_story_agent("coder-2", AgentStatus::Pending),
    );
    // Completed — terminal, must NOT be counted.
    agents.insert(
        "s3:coder-2".to_string(),
        make_test_story_agent("coder-2", AgentStatus::Completed),
    );
    let count = count_active_agents_for_stage(&config, &agents, &PipelineStage::Coder);
    assert_eq!(
        count, 2,
        "Running and Pending coders should be counted, Completed should not"
    );
}
// ── reconcile_on_startup tests ────────────────────────────────────────────
#[tokio::test]
async fn reconcile_on_startup_noop_when_no_worktrees() {
    // An empty project has nothing to reconcile; the call must simply
    // return without panicking.
    let tmp = tempfile::tempdir().unwrap();
    let pool = AgentPool::new_test(3001);
    let (tx, _rx) = broadcast::channel(16);
    pool.reconcile_on_startup(tmp.path(), &tx).await;
}
#[tokio::test]
async fn reconcile_on_startup_emits_done_event() {
    let tmp = tempfile::tempdir().unwrap();
    let pool = AgentPool::new_test(3001);
    let (tx, mut rx) = broadcast::channel::<ReconciliationEvent>(16);
    pool.reconcile_on_startup(tmp.path(), &tx).await;
    // Drain all buffered events; at least one must have status "done" — the
    // terminal marker clients use to know reconciliation has finished.
    let mut events: Vec<ReconciliationEvent> = Vec::new();
    while let Ok(evt) = rx.try_recv() {
        events.push(evt);
    }
    assert!(
        events.iter().any(|e| e.status == "done"),
        "reconcile_on_startup must emit a 'done' event; got: {:?}",
        events.iter().map(|e| &e.status).collect::<Vec<_>>()
    );
}
/// A worktree with no commits ahead of its base branch represents abandoned
/// or never-started work; reconciliation must leave the story untouched.
#[tokio::test]
async fn reconcile_on_startup_skips_story_without_committed_work() {
    use std::fs;
    let tmp = tempfile::tempdir().unwrap();
    let root = tmp.path();
    // Set up story in 2_current/.
    let current = root.join(".story_kit/work/2_current");
    fs::create_dir_all(&current).unwrap();
    fs::write(current.join("60_story_test.md"), "test").unwrap();
    // Create a worktree directory that is a fresh git repo with no commits
    // ahead of its own base branch (simulates a worktree where no work was done).
    let wt_dir = root.join(".story_kit/worktrees/60_story_test");
    fs::create_dir_all(&wt_dir).unwrap();
    init_git_repo(&wt_dir);
    let pool = AgentPool::new_test(3001);
    let (tx, _rx) = broadcast::channel(16);
    pool.reconcile_on_startup(root, &tx).await;
    // Story should still be in 2_current/ — nothing was reconciled.
    assert!(
        current.join("60_story_test.md").exists(),
        "story should stay in 2_current/ when worktree has no committed work"
    );
}
/// End-to-end reconcile fixture: a real git repo, a real worktree on a
/// feature branch, and a commit on that branch — reconciliation should run
/// the gates and must not panic regardless of their outcome.
#[tokio::test]
async fn reconcile_on_startup_runs_gates_on_worktree_with_committed_work() {
    use std::fs;
    let tmp = tempfile::tempdir().unwrap();
    let root = tmp.path();
    // Set up a git repo for the project root.
    init_git_repo(root);
    // Set up story in 2_current/ and commit it so the project root is clean.
    let current = root.join(".story_kit/work/2_current");
    fs::create_dir_all(&current).unwrap();
    fs::write(current.join("61_story_test.md"), "test").unwrap();
    Command::new("git")
        .args(["add", "."])
        .current_dir(root)
        .output()
        .unwrap();
    // Inline -c identity so the commit works without a global git config.
    Command::new("git")
        .args([
            "-c",
            "user.email=test@test.com",
            "-c",
            "user.name=Test",
            "commit",
            "-m",
            "add story",
        ])
        .current_dir(root)
        .output()
        .unwrap();
    // Create a real git worktree for the story.
    let wt_dir = root.join(".story_kit/worktrees/61_story_test");
    fs::create_dir_all(wt_dir.parent().unwrap()).unwrap();
    Command::new("git")
        .args([
            "worktree",
            "add",
            &wt_dir.to_string_lossy(),
            "-b",
            "feature/story-61_story_test",
        ])
        .current_dir(root)
        .output()
        .unwrap();
    // Add a commit to the feature branch (simulates coder completing work).
    fs::write(wt_dir.join("implementation.txt"), "done").unwrap();
    Command::new("git")
        .args(["add", "."])
        .current_dir(&wt_dir)
        .output()
        .unwrap();
    Command::new("git")
        .args([
            "-c",
            "user.email=test@test.com",
            "-c",
            "user.name=Test",
            "commit",
            "-m",
            "implement story",
        ])
        .current_dir(&wt_dir)
        .output()
        .unwrap();
    // Sanity-check the fixture before exercising reconcile.
    assert!(
        crate::agents::gates::worktree_has_committed_work(&wt_dir),
        "test setup: worktree should have committed work"
    );
    let pool = AgentPool::new_test(3001);
    let (tx, _rx) = broadcast::channel(16);
    pool.reconcile_on_startup(root, &tx).await;
    // In the test env, cargo clippy will fail (no Cargo.toml) so gates fail
    // and the story stays in 2_current/. The important assertion is that
    // reconcile ran without panicking and the story is in a consistent state.
    let in_current = current.join("61_story_test.md").exists();
    let in_qa = root.join(".story_kit/work/3_qa/61_story_test.md").exists();
    assert!(
        in_current || in_qa,
        "story should be in 2_current/ or 3_qa/ after reconciliation"
    );
}
}