diff --git a/server/src/agents/pool/auto_assign.rs b/server/src/agents/pool/auto_assign.rs deleted file mode 100644 index a8c35af7..00000000 --- a/server/src/agents/pool/auto_assign.rs +++ /dev/null @@ -1,1813 +0,0 @@ -//! Auto-assign logic: scanning pipeline stages for unassigned stories and -//! dispatching free agents, startup reconciliation, and the watchdog task. - -use crate::config::ProjectConfig; -use crate::slog; -use crate::slog_error; -use crate::slog_warn; -use crate::worktree; -use std::collections::HashMap; -use std::path::{Path, PathBuf}; -use std::sync::{Arc, Mutex}; -use tokio::sync::broadcast; - -use super::super::{ - AgentEvent, AgentStatus, PipelineStage, ReconciliationEvent, agent_config_stage, pipeline_stage, -}; -use super::{AgentPool, StoryAgent, find_active_story_stage}; - -impl AgentPool { - pub async fn auto_assign_available_work(&self, project_root: &Path) { - let config = match ProjectConfig::load(project_root) { - Ok(c) => c, - Err(e) => { - slog_warn!("[auto-assign] Failed to load project config: {e}"); - return; - } - }; - - // Process each active pipeline stage in order. - let stages: [(&str, PipelineStage); 3] = [ - ("2_current", PipelineStage::Coder), - ("3_qa", PipelineStage::Qa), - ("4_merge", PipelineStage::Mergemaster), - ]; - - for (stage_dir, stage) in &stages { - let items = scan_stage_items(project_root, stage_dir); - if items.is_empty() { - continue; - } - - for story_id in &items { - // Items marked with review_hold (e.g. spikes after QA passes) stay - // in their current stage for human review — don't auto-assign agents. - if has_review_hold(project_root, stage_dir, story_id) { - continue; - } - - // Skip blocked stories (retry limit exceeded). - if is_story_blocked(project_root, stage_dir, story_id) { - continue; - } - - // Skip stories in 4_merge/ that already have a reported merge failure. - // These need human intervention — auto-assigning a new mergemaster - // would just waste tokens on the same broken merge. 
- if *stage == PipelineStage::Mergemaster - && has_merge_failure(project_root, stage_dir, story_id) - { - continue; - } - - // AC6: Detect empty-diff stories in 4_merge/ before starting a - // mergemaster. If the worktree has no commits on the feature branch, - // write a merge_failure and block the story immediately. - if *stage == PipelineStage::Mergemaster - && let Some(wt_path) = worktree::find_worktree_path(project_root, story_id) - && !super::super::gates::worktree_has_committed_work(&wt_path) - { - slog_warn!( - "[auto-assign] Story '{story_id}' in 4_merge/ has no commits \ - on feature branch. Writing merge_failure and blocking." - ); - let story_path = project_root - .join(".storkit/work") - .join(stage_dir) - .join(format!("{story_id}.md")); - let _ = crate::io::story_metadata::write_merge_failure( - &story_path, - "Feature branch has no code changes — the coder agent \ - did not produce any commits.", - ); - let _ = crate::io::story_metadata::write_blocked(&story_path); - continue; - } - - // Re-acquire the lock on each iteration to see state changes - // from previous start_agent calls in the same pass. - let preferred_agent = - read_story_front_matter_agent(project_root, stage_dir, story_id); - - // Check max_coders limit for the Coder stage before agent selection. - // If the pool is full, all remaining items in this stage wait. - if *stage == PipelineStage::Coder - && let Some(max) = config.max_coders - { - let agents_lock = match self.agents.lock() { - Ok(a) => a, - Err(e) => { - slog_error!("[auto-assign] Failed to lock agents: {e}"); - break; - } - }; - let active = count_active_agents_for_stage(&config, &agents_lock, stage); - if active >= max { - slog!( - "[auto-assign] Coder pool full ({active}/{max}); remaining items in {stage_dir}/ will wait." 
- ); - break; - } - } - - // Outcome: (already_assigned, chosen_agent, preferred_busy, stage_mismatch) - // preferred_busy=true means the story has a specific agent requested but it is - // currently occupied — the story should wait rather than fall back. - // stage_mismatch=true means the preferred agent's stage doesn't match the - // pipeline stage, so we fell back to a generic stage agent. - let (already_assigned, free_agent, preferred_busy, stage_mismatch) = { - let agents = match self.agents.lock() { - Ok(a) => a, - Err(e) => { - slog_error!("[auto-assign] Failed to lock agents: {e}"); - break; - } - }; - let assigned = is_story_assigned_for_stage(&config, &agents, story_id, stage); - if assigned { - (true, None, false, false) - } else if let Some(ref pref) = preferred_agent { - // Story has a front-matter agent preference. - // Verify the preferred agent's stage matches the current - // pipeline stage — a coder shouldn't be assigned to QA. - let pref_stage_matches = config - .find_agent(pref) - .map(|cfg| agent_config_stage(cfg) == *stage) - .unwrap_or(false); - if !pref_stage_matches { - // Stage mismatch — fall back to any free agent for this stage. - let free = find_free_agent_for_stage(&config, &agents, stage) - .map(|s| s.to_string()); - (false, free, false, true) - } else if is_agent_free(&agents, pref) { - (false, Some(pref.clone()), false, false) - } else { - (false, None, true, false) - } - } else { - let free = find_free_agent_for_stage(&config, &agents, stage) - .map(|s| s.to_string()); - (false, free, false, false) - } - }; - - if already_assigned { - // Story already has an active agent — skip silently. - continue; - } - - if preferred_busy { - // The story requests a specific agent that is currently busy. - // Do not fall back to a different agent; let this story wait. 
- slog!( - "[auto-assign] Preferred agent '{}' busy for '{story_id}'; story will wait.", - preferred_agent.as_deref().unwrap_or("?") - ); - continue; - } - - if stage_mismatch { - slog!( - "[auto-assign] Preferred agent '{}' stage mismatch for '{story_id}' in {stage_dir}/; falling back to stage-appropriate agent.", - preferred_agent.as_deref().unwrap_or("?") - ); - } - - match free_agent { - Some(agent_name) => { - slog!( - "[auto-assign] Assigning '{agent_name}' to '{story_id}' in {stage_dir}/" - ); - if let Err(e) = self - .start_agent(project_root, story_id, Some(&agent_name), None) - .await - { - slog!( - "[auto-assign] Failed to start '{agent_name}' for '{story_id}': {e}" - ); - } - } - None => { - // No free agents of this type — stop scanning this stage. - slog!( - "[auto-assign] All {:?} agents busy; remaining items in {stage_dir}/ will wait.", - stage - ); - break; - } - } - } - } - } - - /// Reconcile stories whose agent work was committed while the server was offline. - /// - /// On server startup the in-memory agent pool is empty, so any story that an agent - /// completed during a previous session is stuck: the worktree has committed work but - /// the pipeline never advanced. This method detects those stories, re-runs the - /// acceptance gates, and advances the pipeline stage so that `auto_assign_available_work` - /// (called immediately after) picks up the right next-stage agents. - /// - /// Algorithm: - /// 1. List all worktree directories under `{project_root}/.storkit/worktrees/`. - /// 2. For each worktree, check whether its feature branch has commits ahead of the - /// base branch (`master` / `main`). - /// 3. If committed work is found AND the story is in `2_current/` or `3_qa/`: - /// - Run acceptance gates (uncommitted-change check + clippy + tests). - /// - On pass + `2_current/`: move the story to `3_qa/`. - /// - On pass + `3_qa/`: run the coverage gate; if that also passes move to `4_merge/`. 
- /// - On failure: leave the story where it is so `auto_assign_available_work` can - /// start a fresh agent to retry. - /// 4. Stories in `4_merge/` are left for `auto_assign_available_work` to handle via a - /// fresh mergemaster (squash-merge must be re-executed by the mergemaster agent). - pub async fn reconcile_on_startup( - &self, - project_root: &Path, - progress_tx: &broadcast::Sender, - ) { - let worktrees = match worktree::list_worktrees(project_root) { - Ok(wt) => wt, - Err(e) => { - eprintln!("[startup:reconcile] Failed to list worktrees: {e}"); - let _ = progress_tx.send(ReconciliationEvent { - story_id: String::new(), - status: "done".to_string(), - message: format!("Reconciliation failed: {e}"), - }); - return; - } - }; - - for wt_entry in &worktrees { - let story_id = &wt_entry.story_id; - let wt_path = wt_entry.path.clone(); - - // Determine which active stage the story is in. - let stage_dir = match find_active_story_stage(project_root, story_id) { - Some(s) => s, - None => continue, // Not in any active stage (backlog/archived or unknown). - }; - - // 4_merge/ is left for auto_assign to handle with a fresh mergemaster. - if stage_dir == "4_merge" { - continue; - } - - let _ = progress_tx.send(ReconciliationEvent { - story_id: story_id.clone(), - status: "checking".to_string(), - message: format!("Checking for committed work in {stage_dir}/"), - }); - - // Check whether the worktree has commits ahead of the base branch. - let wt_path_for_check = wt_path.clone(); - let has_work = tokio::task::spawn_blocking(move || { - super::super::gates::worktree_has_committed_work(&wt_path_for_check) - }) - .await - .unwrap_or(false); - - if !has_work { - eprintln!( - "[startup:reconcile] No committed work for '{story_id}' in {stage_dir}/; skipping." 
- ); - let _ = progress_tx.send(ReconciliationEvent { - story_id: story_id.clone(), - status: "skipped".to_string(), - message: "No committed work found; skipping.".to_string(), - }); - continue; - } - - eprintln!( - "[startup:reconcile] Found committed work for '{story_id}' in {stage_dir}/. Running acceptance gates." - ); - let _ = progress_tx.send(ReconciliationEvent { - story_id: story_id.clone(), - status: "gates_running".to_string(), - message: "Running acceptance gates…".to_string(), - }); - - // Run acceptance gates on the worktree. - let wt_path_for_gates = wt_path.clone(); - let gates_result = tokio::task::spawn_blocking(move || { - super::super::gates::check_uncommitted_changes(&wt_path_for_gates)?; - super::super::gates::run_acceptance_gates(&wt_path_for_gates) - }) - .await; - - let (gates_passed, gate_output) = match gates_result { - Ok(Ok(pair)) => pair, - Ok(Err(e)) => { - eprintln!("[startup:reconcile] Gate check error for '{story_id}': {e}"); - let _ = progress_tx.send(ReconciliationEvent { - story_id: story_id.clone(), - status: "failed".to_string(), - message: format!("Gate error: {e}"), - }); - continue; - } - Err(e) => { - eprintln!("[startup:reconcile] Gate check task panicked for '{story_id}': {e}"); - let _ = progress_tx.send(ReconciliationEvent { - story_id: story_id.clone(), - status: "failed".to_string(), - message: format!("Gate task panicked: {e}"), - }); - continue; - } - }; - - if !gates_passed { - eprintln!( - "[startup:reconcile] Gates failed for '{story_id}': {gate_output}\n\ - Leaving in {stage_dir}/ for auto-assign to restart the agent." - ); - let _ = progress_tx.send(ReconciliationEvent { - story_id: story_id.clone(), - status: "failed".to_string(), - message: "Gates failed; will be retried by auto-assign.".to_string(), - }); - continue; - } - - eprintln!("[startup:reconcile] Gates passed for '{story_id}' (stage: {stage_dir}/)."); - - if stage_dir == "2_current" { - // Coder stage — determine qa mode to decide next step. 
- let qa_mode = { - let item_type = super::super::lifecycle::item_type_from_id(story_id); - if item_type == "spike" { - crate::io::story_metadata::QaMode::Human - } else { - let default_qa = crate::config::ProjectConfig::load(project_root) - .unwrap_or_default() - .default_qa_mode(); - let story_path = project_root - .join(".storkit/work/2_current") - .join(format!("{story_id}.md")); - crate::io::story_metadata::resolve_qa_mode(&story_path, default_qa) - } - }; - - match qa_mode { - crate::io::story_metadata::QaMode::Server => { - if let Err(e) = super::super::lifecycle::move_story_to_merge(project_root, story_id) { - eprintln!("[startup:reconcile] Failed to move '{story_id}' to 4_merge/: {e}"); - let _ = progress_tx.send(ReconciliationEvent { - story_id: story_id.clone(), - status: "failed".to_string(), - message: format!("Failed to advance to merge: {e}"), - }); - } else { - eprintln!("[startup:reconcile] Moved '{story_id}' → 4_merge/ (qa: server)."); - let _ = progress_tx.send(ReconciliationEvent { - story_id: story_id.clone(), - status: "advanced".to_string(), - message: "Gates passed — moved to merge (qa: server).".to_string(), - }); - } - } - crate::io::story_metadata::QaMode::Agent => { - if let Err(e) = super::super::lifecycle::move_story_to_qa(project_root, story_id) { - eprintln!("[startup:reconcile] Failed to move '{story_id}' to 3_qa/: {e}"); - let _ = progress_tx.send(ReconciliationEvent { - story_id: story_id.clone(), - status: "failed".to_string(), - message: format!("Failed to advance to QA: {e}"), - }); - } else { - eprintln!("[startup:reconcile] Moved '{story_id}' → 3_qa/."); - let _ = progress_tx.send(ReconciliationEvent { - story_id: story_id.clone(), - status: "advanced".to_string(), - message: "Gates passed — moved to QA.".to_string(), - }); - } - } - crate::io::story_metadata::QaMode::Human => { - if let Err(e) = super::super::lifecycle::move_story_to_qa(project_root, story_id) { - eprintln!("[startup:reconcile] Failed to move '{story_id}' to 
3_qa/: {e}"); - let _ = progress_tx.send(ReconciliationEvent { - story_id: story_id.clone(), - status: "failed".to_string(), - message: format!("Failed to advance to QA: {e}"), - }); - } else { - let story_path = project_root - .join(".storkit/work/3_qa") - .join(format!("{story_id}.md")); - if let Err(e) = crate::io::story_metadata::write_review_hold(&story_path) { - eprintln!( - "[startup:reconcile] Failed to set review_hold on '{story_id}': {e}" - ); - } - eprintln!("[startup:reconcile] Moved '{story_id}' → 3_qa/ (qa: human — holding for review)."); - let _ = progress_tx.send(ReconciliationEvent { - story_id: story_id.clone(), - status: "review_hold".to_string(), - message: "Gates passed — holding for human review.".to_string(), - }); - } - } - } - } else if stage_dir == "3_qa" { - // QA stage → run coverage gate before advancing to merge. - let wt_path_for_cov = wt_path.clone(); - let coverage_result = tokio::task::spawn_blocking(move || { - super::super::gates::run_coverage_gate(&wt_path_for_cov) - }) - .await; - - let (coverage_passed, coverage_output) = match coverage_result { - Ok(Ok(pair)) => pair, - Ok(Err(e)) => { - eprintln!("[startup:reconcile] Coverage gate error for '{story_id}': {e}"); - let _ = progress_tx.send(ReconciliationEvent { - story_id: story_id.clone(), - status: "failed".to_string(), - message: format!("Coverage gate error: {e}"), - }); - continue; - } - Err(e) => { - eprintln!( - "[startup:reconcile] Coverage gate panicked for '{story_id}': {e}" - ); - let _ = progress_tx.send(ReconciliationEvent { - story_id: story_id.clone(), - status: "failed".to_string(), - message: format!("Coverage gate panicked: {e}"), - }); - continue; - } - }; - - if coverage_passed { - // Check whether this item needs human review before merging. 
- let needs_human_review = { - let item_type = super::super::lifecycle::item_type_from_id(story_id); - if item_type == "spike" { - true - } else { - let story_path = project_root - .join(".storkit/work/3_qa") - .join(format!("{story_id}.md")); - let default_qa = crate::config::ProjectConfig::load(project_root) - .unwrap_or_default() - .default_qa_mode(); - matches!( - crate::io::story_metadata::resolve_qa_mode(&story_path, default_qa), - crate::io::story_metadata::QaMode::Human - ) - } - }; - - if needs_human_review { - let story_path = project_root - .join(".storkit/work/3_qa") - .join(format!("{story_id}.md")); - if let Err(e) = crate::io::story_metadata::write_review_hold(&story_path) { - eprintln!( - "[startup:reconcile] Failed to set review_hold on '{story_id}': {e}" - ); - } - eprintln!( - "[startup:reconcile] '{story_id}' passed QA — holding for human review." - ); - let _ = progress_tx.send(ReconciliationEvent { - story_id: story_id.clone(), - status: "review_hold".to_string(), - message: "Passed QA — waiting for human review.".to_string(), - }); - } else if let Err(e) = - super::super::lifecycle::move_story_to_merge(project_root, story_id) - { - eprintln!( - "[startup:reconcile] Failed to move '{story_id}' to 4_merge/: {e}" - ); - let _ = progress_tx.send(ReconciliationEvent { - story_id: story_id.clone(), - status: "failed".to_string(), - message: format!("Failed to advance to merge: {e}"), - }); - } else { - eprintln!("[startup:reconcile] Moved '{story_id}' → 4_merge/."); - let _ = progress_tx.send(ReconciliationEvent { - story_id: story_id.clone(), - status: "advanced".to_string(), - message: "Gates passed — moved to merge.".to_string(), - }); - } - } else { - eprintln!( - "[startup:reconcile] Coverage gate failed for '{story_id}': {coverage_output}\n\ - Leaving in 3_qa/ for auto-assign to restart the QA agent." 
- ); - let _ = progress_tx.send(ReconciliationEvent { - story_id: story_id.clone(), - status: "failed".to_string(), - message: "Coverage gate failed; will be retried.".to_string(), - }); - } - } - } - - // Signal that reconciliation is complete. - let _ = progress_tx.send(ReconciliationEvent { - story_id: String::new(), - status: "done".to_string(), - message: "Startup reconciliation complete.".to_string(), - }); - } - - /// Run a single watchdog pass synchronously (test helper). - #[cfg(test)] - pub fn run_watchdog_once(&self) { - check_orphaned_agents(&self.agents); - } - - /// Spawn a background watchdog task that periodically checks for Running agents - /// whose underlying task has already finished (orphaned entries). Any such agent - /// is marked Failed and an Error event is emitted so that `wait_for_agent` unblocks. - /// - /// The watchdog runs every 30 seconds. It is a safety net for edge cases where the - /// PTY read loop exits without updating the agent status (e.g. a panic in the - /// spawn_blocking task, or an external SIGKILL that closes the PTY fd immediately). - /// - /// When orphaned agents are detected and a `project_root` is provided, auto-assign - /// is triggered so that free agents can pick up unassigned work. - pub fn spawn_watchdog(pool: Arc, project_root: Option) { - tokio::spawn(async move { - let mut interval = tokio::time::interval(std::time::Duration::from_secs(30)); - loop { - interval.tick().await; - let found = check_orphaned_agents(&pool.agents); - if found > 0 - && let Some(ref root) = project_root - { - slog!("[watchdog] {found} orphaned agent(s) detected; triggering auto-assign."); - pool.auto_assign_available_work(root).await; - } - } - }); - } -} - -// ── Free helper functions ────────────────────────────────────────────────── - -/// Read the optional `agent:` field from the front matter of a story file. 
-/// -/// Returns `Some(agent_name)` if the front matter specifies an agent, or `None` -/// if the field is absent or the file cannot be read / parsed. -fn read_story_front_matter_agent( - project_root: &Path, - stage_dir: &str, - story_id: &str, -) -> Option { - use crate::io::story_metadata::parse_front_matter; - let path = project_root - .join(".storkit") - .join("work") - .join(stage_dir) - .join(format!("{story_id}.md")); - let contents = std::fs::read_to_string(path).ok()?; - parse_front_matter(&contents).ok()?.agent -} - -/// Return `true` if the story file in the given stage has `review_hold: true` in its front matter. -fn has_review_hold(project_root: &Path, stage_dir: &str, story_id: &str) -> bool { - use crate::io::story_metadata::parse_front_matter; - let path = project_root - .join(".storkit") - .join("work") - .join(stage_dir) - .join(format!("{story_id}.md")); - let contents = match std::fs::read_to_string(path) { - Ok(c) => c, - Err(_) => return false, - }; - parse_front_matter(&contents) - .ok() - .and_then(|m| m.review_hold) - .unwrap_or(false) -} - -/// Return `true` if the story file has `blocked: true` in its front matter. -fn is_story_blocked(project_root: &Path, stage_dir: &str, story_id: &str) -> bool { - use crate::io::story_metadata::parse_front_matter; - let path = project_root - .join(".storkit") - .join("work") - .join(stage_dir) - .join(format!("{story_id}.md")); - let contents = match std::fs::read_to_string(path) { - Ok(c) => c, - Err(_) => return false, - }; - parse_front_matter(&contents) - .ok() - .and_then(|m| m.blocked) - .unwrap_or(false) -} - -/// Return `true` if the story file has a `merge_failure` field in its front matter. 
-fn has_merge_failure(project_root: &Path, stage_dir: &str, story_id: &str) -> bool { - use crate::io::story_metadata::parse_front_matter; - let path = project_root - .join(".storkit") - .join("work") - .join(stage_dir) - .join(format!("{story_id}.md")); - let contents = match std::fs::read_to_string(path) { - Ok(c) => c, - Err(_) => return false, - }; - parse_front_matter(&contents) - .ok() - .and_then(|m| m.merge_failure) - .is_some() -} - -/// Return `true` if `agent_name` has no active (pending/running) entry in the pool. -pub(super) fn is_agent_free(agents: &HashMap, agent_name: &str) -> bool { - !agents.values().any(|a| { - a.agent_name == agent_name - && matches!(a.status, AgentStatus::Running | AgentStatus::Pending) - }) -} - -fn scan_stage_items(project_root: &Path, stage_dir: &str) -> Vec { - let dir = project_root.join(".storkit").join("work").join(stage_dir); - if !dir.is_dir() { - return Vec::new(); - } - let mut items = Vec::new(); - if let Ok(entries) = std::fs::read_dir(&dir) { - for entry in entries.flatten() { - let path = entry.path(); - if path.extension().and_then(|e| e.to_str()) == Some("md") - && let Some(stem) = path.file_stem().and_then(|s| s.to_str()) - { - items.push(stem.to_string()); - } - } - } - items.sort(); - items -} - -/// Return `true` if `story_id` has any active (pending/running) agent matching `stage`. -/// -/// Uses the explicit `stage` config field when the agent is found in `config`; -/// falls back to the legacy name-based heuristic for unlisted agents. 
-fn is_story_assigned_for_stage( - config: &ProjectConfig, - agents: &HashMap, - story_id: &str, - stage: &PipelineStage, -) -> bool { - agents.iter().any(|(key, agent)| { - // Composite key format: "{story_id}:{agent_name}" - let key_story_id = key.rsplit_once(':').map(|(sid, _)| sid).unwrap_or(key); - let agent_stage = config - .find_agent(&agent.agent_name) - .map(agent_config_stage) - .unwrap_or_else(|| pipeline_stage(&agent.agent_name)); - key_story_id == story_id - && agent_stage == *stage - && matches!(agent.status, AgentStatus::Running | AgentStatus::Pending) - }) -} - -/// Count active (pending/running) agents for a given pipeline stage. -fn count_active_agents_for_stage( - config: &ProjectConfig, - agents: &HashMap, - stage: &PipelineStage, -) -> usize { - agents - .values() - .filter(|a| { - matches!(a.status, AgentStatus::Running | AgentStatus::Pending) - && config - .find_agent(&a.agent_name) - .map(|cfg| agent_config_stage(cfg) == *stage) - .unwrap_or_else(|| pipeline_stage(&a.agent_name) == *stage) - }) - .count() -} - -/// Find the first configured agent for `stage` that has no active (pending/running) assignment. -/// Returns `None` if all agents for that stage are busy, none are configured, -/// or the `max_coders` limit has been reached (for the Coder stage). -/// -/// For the Coder stage, when `default_coder_model` is set, only considers agents whose -/// model matches the default. This ensures opus-class agents are reserved for explicit -/// front-matter requests. -pub(super) fn find_free_agent_for_stage<'a>( - config: &'a ProjectConfig, - agents: &HashMap, - stage: &PipelineStage, -) -> Option<&'a str> { - // Enforce max_coders limit for the Coder stage. 
- if *stage == PipelineStage::Coder - && let Some(max) = config.max_coders - { - let active = count_active_agents_for_stage(config, agents, stage); - if active >= max { - return None; - } - } - - for agent_config in &config.agent { - if agent_config_stage(agent_config) != *stage { - continue; - } - // When default_coder_model is set, only auto-assign coder agents whose - // model matches. This keeps opus agents reserved for explicit requests. - if *stage == PipelineStage::Coder - && let Some(ref default_model) = config.default_coder_model - { - let agent_model = agent_config.model.as_deref().unwrap_or(""); - if agent_model != default_model { - continue; - } - } - let is_busy = agents.values().any(|a| { - a.agent_name == agent_config.name - && matches!(a.status, AgentStatus::Running | AgentStatus::Pending) - }); - if !is_busy { - return Some(&agent_config.name); - } - } - None -} - -/// Scan the agent pool for Running entries whose backing tokio task has already -/// finished and mark them as Failed. -/// -/// This handles the case where the PTY read loop or the spawned task exits -/// without updating the agent status — for example when the process is killed -/// externally and the PTY master fd returns EOF before our inactivity timeout -/// fires, but some other edge case prevents the normal cleanup path from running. -fn check_orphaned_agents(agents: &Mutex>) -> usize { - let mut lock = match agents.lock() { - Ok(l) => l, - Err(_) => return 0, - }; - - // Collect orphaned entries: Running or Pending agents whose task handle is finished. - // Pending agents can be orphaned if worktree creation panics before setting status. 
- let orphaned: Vec<(String, String, broadcast::Sender, AgentStatus)> = lock - .iter() - .filter_map(|(key, agent)| { - if matches!(agent.status, AgentStatus::Running | AgentStatus::Pending) - && let Some(handle) = &agent.task_handle - && handle.is_finished() - { - let story_id = key - .rsplit_once(':') - .map(|(s, _)| s.to_string()) - .unwrap_or_else(|| key.clone()); - return Some(( - key.clone(), - story_id, - agent.tx.clone(), - agent.status.clone(), - )); - } - None - }) - .collect(); - - let count = orphaned.len(); - for (key, story_id, tx, prev_status) in orphaned { - if let Some(agent) = lock.get_mut(&key) { - agent.status = AgentStatus::Failed; - slog!( - "[watchdog] Orphaned agent '{key}': task finished but status was {prev_status}. \ - Marking Failed." - ); - let _ = tx.send(AgentEvent::Error { - story_id, - agent_name: agent.agent_name.clone(), - message: "Agent process terminated unexpectedly (watchdog detected orphan)" - .to_string(), - }); - } - } - count -} - -// ── Tests ────────────────────────────────────────────────────────────────── - -#[cfg(test)] -mod tests { - use super::*; - use crate::config::ProjectConfig; - use crate::io::watcher::WatcherEvent; - use std::process::Command; - - use super::super::{AgentPool, StoryAgent, composite_key}; - - fn make_config(toml_str: &str) -> ProjectConfig { - ProjectConfig::parse(toml_str).unwrap() - } - - fn init_git_repo(repo: &std::path::Path) { - Command::new("git") - .args(["init"]) - .current_dir(repo) - .output() - .unwrap(); - Command::new("git") - .args(["config", "user.email", "test@test.com"]) - .current_dir(repo) - .output() - .unwrap(); - Command::new("git") - .args(["config", "user.name", "Test"]) - .current_dir(repo) - .output() - .unwrap(); - // Create initial commit so master branch exists. 
- std::fs::write(repo.join("README.md"), "# test\n").unwrap(); - Command::new("git") - .args(["add", "."]) - .current_dir(repo) - .output() - .unwrap(); - Command::new("git") - .args(["commit", "-m", "initial"]) - .current_dir(repo) - .output() - .unwrap(); - } - - fn make_test_story_agent(agent_name: &str, status: AgentStatus) -> StoryAgent { - StoryAgent { - agent_name: agent_name.to_string(), - status, - worktree_info: None, - session_id: None, - tx: broadcast::channel(1).0, - task_handle: None, - event_log: Arc::new(Mutex::new(Vec::new())), - completion: None, - project_root: None, - log_session_id: None, - merge_failure_reported: false, - } - } - - // ── auto-assign helper tests ─────────────────────────────────── - - #[test] - fn scan_stage_items_returns_empty_for_missing_dir() { - let tmp = tempfile::tempdir().unwrap(); - let items = scan_stage_items(tmp.path(), "2_current"); - assert!(items.is_empty()); - } - - #[test] - fn scan_stage_items_returns_sorted_story_ids() { - use std::fs; - let tmp = tempfile::tempdir().unwrap(); - let stage_dir = tmp.path().join(".storkit").join("work").join("2_current"); - fs::create_dir_all(&stage_dir).unwrap(); - fs::write(stage_dir.join("42_story_foo.md"), "---\nname: foo\n---").unwrap(); - fs::write(stage_dir.join("10_story_bar.md"), "---\nname: bar\n---").unwrap(); - fs::write(stage_dir.join("5_story_baz.md"), "---\nname: baz\n---").unwrap(); - // non-md file should be ignored - fs::write(stage_dir.join("README.txt"), "ignore me").unwrap(); - - let items = scan_stage_items(tmp.path(), "2_current"); - assert_eq!(items, vec!["10_story_bar", "42_story_foo", "5_story_baz"]); - } - - #[test] - fn is_story_assigned_returns_true_for_running_coder() { - let config = ProjectConfig::default(); - let pool = AgentPool::new_test(3001); - pool.inject_test_agent("42_story_foo", "coder-1", AgentStatus::Running); - - let agents = pool.agents.lock().unwrap(); - assert!(is_story_assigned_for_stage( - &config, - &agents, - "42_story_foo", - 
&PipelineStage::Coder - )); - // Same story but wrong stage — should be false - assert!(!is_story_assigned_for_stage( - &config, - &agents, - "42_story_foo", - &PipelineStage::Qa - )); - // Different story — should be false - assert!(!is_story_assigned_for_stage( - &config, - &agents, - "99_story_other", - &PipelineStage::Coder - )); - } - - #[test] - fn is_story_assigned_returns_false_for_completed_agent() { - let config = ProjectConfig::default(); - let pool = AgentPool::new_test(3001); - pool.inject_test_agent("42_story_foo", "coder-1", AgentStatus::Completed); - - let agents = pool.agents.lock().unwrap(); - // Completed agents don't count as assigned - assert!(!is_story_assigned_for_stage( - &config, - &agents, - "42_story_foo", - &PipelineStage::Coder - )); - } - - #[test] - fn is_story_assigned_uses_config_stage_field_for_nonstandard_names() { - let config = ProjectConfig::parse( - r#" -[[agent]] -name = "qa-2" -stage = "qa" -"#, - ) - .unwrap(); - - let pool = AgentPool::new_test(3001); - pool.inject_test_agent("42_story_foo", "qa-2", AgentStatus::Running); - - let agents = pool.agents.lock().unwrap(); - // qa-2 with stage=qa should be recognised as a QA agent - assert!( - is_story_assigned_for_stage(&config, &agents, "42_story_foo", &PipelineStage::Qa), - "qa-2 should be detected as assigned to QA stage" - ); - // Should NOT appear as a coder - assert!( - !is_story_assigned_for_stage(&config, &agents, "42_story_foo", &PipelineStage::Coder), - "qa-2 should not be detected as a coder" - ); - } - - #[test] - fn find_free_agent_returns_none_when_all_busy() { - let config = ProjectConfig::parse( - r#" -[[agent]] -name = "coder-1" -[[agent]] -name = "coder-2" -"#, - ) - .unwrap(); - - let pool = AgentPool::new_test(3001); - pool.inject_test_agent("s1", "coder-1", AgentStatus::Running); - pool.inject_test_agent("s2", "coder-2", AgentStatus::Running); - - let agents = pool.agents.lock().unwrap(); - let free = find_free_agent_for_stage(&config, &agents, 
&PipelineStage::Coder); - assert!(free.is_none(), "no free coders should be available"); - } - - #[test] - fn find_free_agent_returns_first_free_coder() { - let config = ProjectConfig::parse( - r#" -[[agent]] -name = "coder-1" -[[agent]] -name = "coder-2" -[[agent]] -name = "coder-3" -"#, - ) - .unwrap(); - - let pool = AgentPool::new_test(3001); - // coder-1 is busy, coder-2 is free - pool.inject_test_agent("s1", "coder-1", AgentStatus::Running); - - let agents = pool.agents.lock().unwrap(); - let free = find_free_agent_for_stage(&config, &agents, &PipelineStage::Coder); - assert_eq!( - free, - Some("coder-2"), - "coder-2 should be the first free coder" - ); - } - - #[test] - fn find_free_agent_ignores_completed_agents() { - let config = ProjectConfig::parse( - r#" -[[agent]] -name = "coder-1" -"#, - ) - .unwrap(); - - let pool = AgentPool::new_test(3001); - // coder-1 completed its previous story — it's free for a new one - pool.inject_test_agent("s1", "coder-1", AgentStatus::Completed); - - let agents = pool.agents.lock().unwrap(); - let free = find_free_agent_for_stage(&config, &agents, &PipelineStage::Coder); - assert_eq!(free, Some("coder-1"), "completed coder-1 should be free"); - } - - #[test] - fn find_free_agent_returns_none_for_wrong_stage() { - let config = ProjectConfig::parse( - r#" -[[agent]] -name = "qa" -"#, - ) - .unwrap(); - - let agents: HashMap = HashMap::new(); - // Looking for a Coder but only QA is configured - let free = find_free_agent_for_stage(&config, &agents, &PipelineStage::Coder); - assert!(free.is_none()); - // Looking for QA should find it - let free_qa = find_free_agent_for_stage(&config, &agents, &PipelineStage::Qa); - assert_eq!(free_qa, Some("qa")); - } - - #[test] - fn find_free_agent_uses_config_stage_field_not_name() { - // Agents named "qa-2" and "coder-opus" don't match the legacy name heuristic - // but should be picked up via their explicit stage field. 
- let config = ProjectConfig::parse( - r#" -[[agent]] -name = "qa-2" -stage = "qa" - -[[agent]] -name = "coder-opus" -stage = "coder" -"#, - ) - .unwrap(); - - let agents: HashMap<String, StoryAgent> = HashMap::new(); - - // qa-2 should be found for PipelineStage::Qa via config stage field - let free_qa = find_free_agent_for_stage(&config, &agents, &PipelineStage::Qa); - assert_eq!(free_qa, Some("qa-2"), "qa-2 with stage=qa should be found"); - - // coder-opus should be found for PipelineStage::Coder via config stage field - let free_coder = find_free_agent_for_stage(&config, &agents, &PipelineStage::Coder); - assert_eq!( - free_coder, - Some("coder-opus"), - "coder-opus with stage=coder should be found" - ); - - // Neither should match the other stage - let free_merge = find_free_agent_for_stage(&config, &agents, &PipelineStage::Mergemaster); - assert!(free_merge.is_none()); - } - - // ── check_orphaned_agents return value tests (bug 161) ────────────────── - - #[tokio::test] - async fn check_orphaned_agents_returns_count_of_orphaned_agents() { - let pool = AgentPool::new_test(3001); - - // Spawn two tasks that finish immediately. - let h1 = tokio::spawn(async {}); - let h2 = tokio::spawn(async {}); - tokio::time::sleep(std::time::Duration::from_millis(20)).await; - assert!(h1.is_finished()); - assert!(h2.is_finished()); - - pool.inject_test_agent_with_handle("story_a", "coder", AgentStatus::Running, h1); - pool.inject_test_agent_with_handle("story_b", "coder", AgentStatus::Running, h2); - - let found = check_orphaned_agents(&pool.agents); - assert_eq!(found, 2, "should detect both orphaned agents"); - } - - #[test] - fn check_orphaned_agents_returns_zero_when_no_orphans() { - let pool = AgentPool::new_test(3001); - // Inject agents in terminal states — not orphaned. 
- pool.inject_test_agent("story_a", "coder", AgentStatus::Completed); - pool.inject_test_agent("story_b", "qa", AgentStatus::Failed); - - let found = check_orphaned_agents(&pool.agents); - assert_eq!( - found, 0, - "no orphans should be detected for terminal agents" - ); - } - - #[tokio::test] - async fn watchdog_detects_orphaned_running_agent() { - let pool = AgentPool::new_test(3001); - - let handle = tokio::spawn(async {}); - tokio::time::sleep(std::time::Duration::from_millis(20)).await; - assert!( - handle.is_finished(), - "task should be finished before injection" - ); - - let tx = pool.inject_test_agent_with_handle( - "orphan_story", - "coder", - AgentStatus::Running, - handle, - ); - let mut rx = tx.subscribe(); - - pool.run_watchdog_once(); - - { - let agents = pool.agents.lock().unwrap(); - let key = composite_key("orphan_story", "coder"); - let agent = agents.get(&key).unwrap(); - assert_eq!( - agent.status, - AgentStatus::Failed, - "watchdog must mark an orphaned Running agent as Failed" - ); - } - - let event = rx.try_recv().expect("watchdog must emit an Error event"); - assert!( - matches!(event, AgentEvent::Error { .. }), - "expected AgentEvent::Error, got: {event:?}" - ); - } - - #[tokio::test] - async fn watchdog_orphan_detection_returns_nonzero_enabling_auto_assign() { - // This test verifies the contract that `check_orphaned_agents` returns - // a non-zero count when orphans exist, which the watchdog uses to - // decide whether to trigger auto-assign (bug 161). - let pool = AgentPool::new_test(3001); - - let handle = tokio::spawn(async {}); - tokio::time::sleep(std::time::Duration::from_millis(20)).await; - - pool.inject_test_agent_with_handle("orphan_story", "coder", AgentStatus::Running, handle); - - // Before watchdog: agent is Running. 
- { - let agents = pool.agents.lock().unwrap(); - let key = composite_key("orphan_story", "coder"); - assert_eq!(agents.get(&key).unwrap().status, AgentStatus::Running); - } - - // Run watchdog pass — should return 1 (orphan found). - let found = check_orphaned_agents(&pool.agents); - assert_eq!( - found, 1, - "watchdog must return 1 for a single orphaned agent" - ); - - // After watchdog: agent is Failed. - { - let agents = pool.agents.lock().unwrap(); - let key = composite_key("orphan_story", "coder"); - assert_eq!( - agents.get(&key).unwrap().status, - AgentStatus::Failed, - "orphaned agent must be marked Failed" - ); - } - } - - // ── auto_assign_available_work tests ────────────────────────────────────── - - /// Story 203: auto_assign_available_work must detect a story in 2_current/ - /// with no active agent and start an agent for it. - #[tokio::test] - async fn auto_assign_picks_up_story_queued_in_current() { - let tmp = tempfile::tempdir().unwrap(); - let sk = tmp.path().join(".storkit"); - let current = sk.join("work/2_current"); - std::fs::create_dir_all(&current).unwrap(); - std::fs::write( - sk.join("project.toml"), - "[[agent]]\nname = \"coder-1\"\nstage = \"coder\"\n", - ) - .unwrap(); - // Place the story in 2_current/ (simulating the "queued" state). - std::fs::write(current.join("story-3.md"), "---\nname: Story 3\n---\n").unwrap(); - - let pool = AgentPool::new_test(3001); - // No agents are running — coder-1 is free. - - // auto_assign will try to call start_agent, which will attempt to create - // a worktree (will fail without a git repo) — that is fine. We only need - // to verify the agent is registered as Pending before the background - // task eventually fails. 
- pool.auto_assign_available_work(tmp.path()).await; - - let agents = pool.agents.lock().unwrap(); - let has_pending = agents.values().any(|a| { - a.agent_name == "coder-1" - && matches!(a.status, AgentStatus::Pending | AgentStatus::Running) - }); - assert!( - has_pending, - "auto_assign should have started coder-1 for story-3, but pool is empty" - ); - } - - /// Story 265: auto_assign_available_work must skip spikes in 3_qa/ that - /// have review_hold: true set in their front matter. - #[tokio::test] - async fn auto_assign_skips_spikes_with_review_hold() { - let tmp = tempfile::tempdir().unwrap(); - let root = tmp.path(); - - // Create project.toml with a QA agent. - let sk = root.join(".storkit"); - std::fs::create_dir_all(&sk).unwrap(); - std::fs::write( - sk.join("project.toml"), - "[[agents]]\nname = \"qa\"\nrole = \"qa\"\nmodel = \"test\"\nprompt = \"test\"\n", - ) - .unwrap(); - - // Put a spike in 3_qa/ with review_hold: true. - let qa_dir = root.join(".storkit/work/3_qa"); - std::fs::create_dir_all(&qa_dir).unwrap(); - std::fs::write( - qa_dir.join("20_spike_test.md"), - "---\nname: Test Spike\nreview_hold: true\n---\n# Spike\n", - ) - .unwrap(); - - let (watcher_tx, _) = broadcast::channel::<WatcherEvent>(4); - let pool = AgentPool::new(3001, watcher_tx); - - pool.auto_assign_available_work(root).await; - - // No agent should have been started for the spike. - let agents = pool.agents.lock().unwrap(); - assert!( - agents.is_empty(), - "No agents should be assigned to a spike with review_hold" - ); - } - - // ── Story 279: auto-assign respects agent stage from front matter ────────── - - /// When a story in 3_qa/ has `agent: coder-1` in its front matter but - /// coder-1 is a coder-stage agent, auto-assign must NOT assign coder-1. - /// Instead it should fall back to a free QA-stage agent. 
- #[tokio::test] - async fn auto_assign_ignores_coder_preference_when_story_is_in_qa_stage() { - let tmp = tempfile::tempdir().unwrap(); - let sk = tmp.path().join(".storkit"); - let qa_dir = sk.join("work/3_qa"); - std::fs::create_dir_all(&qa_dir).unwrap(); - std::fs::write( - sk.join("project.toml"), - "[[agent]]\nname = \"coder-1\"\nstage = \"coder\"\n\n\ - [[agent]]\nname = \"qa-1\"\nstage = \"qa\"\n", - ) - .unwrap(); - // Story in 3_qa/ with a preferred coder-stage agent. - std::fs::write( - qa_dir.join("story-qa1.md"), - "---\nname: QA Story\nagent: coder-1\n---\n", - ) - .unwrap(); - - let pool = AgentPool::new_test(3001); - - pool.auto_assign_available_work(tmp.path()).await; - - let agents = pool.agents.lock().unwrap(); - // coder-1 must NOT have been assigned (wrong stage for 3_qa/). - let coder_assigned = agents.values().any(|a| { - a.agent_name == "coder-1" - && matches!(a.status, AgentStatus::Pending | AgentStatus::Running) - }); - assert!( - !coder_assigned, - "coder-1 should not be assigned to a QA-stage story" - ); - // qa-1 should have been assigned instead. - let qa_assigned = agents.values().any(|a| { - a.agent_name == "qa-1" - && matches!(a.status, AgentStatus::Pending | AgentStatus::Running) - }); - assert!( - qa_assigned, - "qa-1 should be assigned as fallback for the QA-stage story" - ); - } - - /// When a story in 2_current/ has `agent: coder-1` in its front matter and - /// coder-1 is a coder-stage agent, auto-assign must respect the preference - /// and assign coder-1 (not fall back to some other coder). 
- #[tokio::test] - async fn auto_assign_respects_coder_preference_when_story_is_in_current_stage() { - let tmp = tempfile::tempdir().unwrap(); - let sk = tmp.path().join(".storkit"); - let current_dir = sk.join("work/2_current"); - std::fs::create_dir_all(&current_dir).unwrap(); - std::fs::write( - sk.join("project.toml"), - "[[agent]]\nname = \"coder-1\"\nstage = \"coder\"\n\n\ - [[agent]]\nname = \"coder-2\"\nstage = \"coder\"\n", - ) - .unwrap(); - // Story in 2_current/ with a preferred coder-1 agent. - std::fs::write( - current_dir.join("story-pref.md"), - "---\nname: Coder Story\nagent: coder-1\n---\n", - ) - .unwrap(); - - let pool = AgentPool::new_test(3001); - - pool.auto_assign_available_work(tmp.path()).await; - - let agents = pool.agents.lock().unwrap(); - // coder-1 should have been picked (it matches the stage and is preferred). - let coder1_assigned = agents.values().any(|a| { - a.agent_name == "coder-1" - && matches!(a.status, AgentStatus::Pending | AgentStatus::Running) - }); - assert!( - coder1_assigned, - "coder-1 should be assigned when it matches the stage and is preferred" - ); - // coder-2 must NOT be assigned (not preferred). - let coder2_assigned = agents.values().any(|a| { - a.agent_name == "coder-2" - && matches!(a.status, AgentStatus::Pending | AgentStatus::Running) - }); - assert!( - !coder2_assigned, - "coder-2 should not be assigned when coder-1 is explicitly preferred" - ); - } - - /// When the preferred agent's stage mismatches and no other agent of the - /// correct stage is available, auto-assign must not start any agent for that - /// story (no panic, no error). - #[tokio::test] - async fn auto_assign_stage_mismatch_with_no_fallback_starts_no_agent() { - let tmp = tempfile::tempdir().unwrap(); - let sk = tmp.path().join(".storkit"); - let qa_dir = sk.join("work/3_qa"); - std::fs::create_dir_all(&qa_dir).unwrap(); - // Only a coder agent is configured — no QA agent exists. 
- std::fs::write( - sk.join("project.toml"), - "[[agent]]\nname = \"coder-1\"\nstage = \"coder\"\n", - ) - .unwrap(); - // Story in 3_qa/ requests coder-1 (wrong stage) and no QA agent exists. - std::fs::write( - qa_dir.join("story-noqa.md"), - "---\nname: QA Story No Agent\nagent: coder-1\n---\n", - ) - .unwrap(); - - let pool = AgentPool::new_test(3001); - - // Must not panic. - pool.auto_assign_available_work(tmp.path()).await; - - let agents = pool.agents.lock().unwrap(); - assert!( - agents.is_empty(), - "No agent should be started when no stage-appropriate agent is available" - ); - } - - /// Two concurrent auto_assign_available_work calls must not assign the same - /// agent to two stories simultaneously. After both complete, at most one - /// Pending/Running entry must exist per agent name. - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - async fn toctou_concurrent_auto_assign_no_duplicate_agent_assignments() { - use std::fs; - use std::sync::Arc; - - let tmp = tempfile::tempdir().unwrap(); - let root = tmp.path().to_path_buf(); - - let sk_dir = root.join(".storkit"); - // Two stories waiting in 2_current, one coder agent. - fs::create_dir_all(sk_dir.join("work/2_current")).unwrap(); - fs::write( - sk_dir.join("project.toml"), - "[[agent]]\nname = \"coder-1\"\n", - ) - .unwrap(); - fs::write( - sk_dir.join("work/2_current/86_story_foo.md"), - "---\nname: Foo\n---\n", - ) - .unwrap(); - fs::write( - sk_dir.join("work/2_current/130_story_bar.md"), - "---\nname: Bar\n---\n", - ) - .unwrap(); - - let pool = Arc::new(AgentPool::new_test(3099)); - - // Run two concurrent auto_assign calls. 
- let pool1 = pool.clone(); - let root1 = root.clone(); - let t1 = tokio::spawn(async move { pool1.auto_assign_available_work(&root1).await }); - - let pool2 = pool.clone(); - let root2 = root.clone(); - let t2 = tokio::spawn(async move { pool2.auto_assign_available_work(&root2).await }); - - let _ = tokio::join!(t1, t2); - - // At most one Pending/Running entry should exist for coder-1. - let agents = pool.agents.lock().unwrap(); - let active_coder_count = agents - .values() - .filter(|a| { - a.agent_name == "coder-1" - && matches!(a.status, AgentStatus::Pending | AgentStatus::Running) - }) - .count(); - - assert!( - active_coder_count <= 1, - "coder-1 must not be assigned to more than one story simultaneously; \ - found {active_coder_count} active entries" - ); - } - - // ── has_review_hold tests ──────────────────────────────────────────────── - - #[test] - fn has_review_hold_returns_true_when_set() { - let tmp = tempfile::tempdir().unwrap(); - let qa_dir = tmp.path().join(".storkit/work/3_qa"); - std::fs::create_dir_all(&qa_dir).unwrap(); - let spike_path = qa_dir.join("10_spike_research.md"); - std::fs::write( - &spike_path, - "---\nname: Research spike\nreview_hold: true\n---\n# Spike\n", - ) - .unwrap(); - assert!(has_review_hold(tmp.path(), "3_qa", "10_spike_research")); - } - - #[test] - fn has_review_hold_returns_false_when_not_set() { - let tmp = tempfile::tempdir().unwrap(); - let qa_dir = tmp.path().join(".storkit/work/3_qa"); - std::fs::create_dir_all(&qa_dir).unwrap(); - let spike_path = qa_dir.join("10_spike_research.md"); - std::fs::write(&spike_path, "---\nname: Research spike\n---\n# Spike\n").unwrap(); - assert!(!has_review_hold(tmp.path(), "3_qa", "10_spike_research")); - } - - #[test] - fn has_review_hold_returns_false_when_file_missing() { - let tmp = tempfile::tempdir().unwrap(); - assert!(!has_review_hold(tmp.path(), "3_qa", "99_spike_missing")); - } - - // ── find_free_agent_for_stage: default_coder_model filtering ───────── - - #[test] - 
fn find_free_agent_skips_opus_when_default_coder_model_set() { - let config = make_config( - r#" -default_coder_model = "sonnet" - -[[agent]] -name = "coder-1" -stage = "coder" -model = "sonnet" - -[[agent]] -name = "coder-opus" -stage = "coder" -model = "opus" -"#, - ); - - let agents = HashMap::new(); - let free = find_free_agent_for_stage(&config, &agents, &PipelineStage::Coder); - assert_eq!(free, Some("coder-1")); - } - - #[test] - fn find_free_agent_returns_opus_when_no_default_coder_model() { - let config = make_config( - r#" -[[agent]] -name = "coder-opus" -stage = "coder" -model = "opus" -"#, - ); - - let agents = HashMap::new(); - let free = find_free_agent_for_stage(&config, &agents, &PipelineStage::Coder); - assert_eq!(free, Some("coder-opus")); - } - - #[test] - fn find_free_agent_returns_none_when_all_sonnet_coders_busy() { - let config = make_config( - r#" -default_coder_model = "sonnet" - -[[agent]] -name = "coder-1" -stage = "coder" -model = "sonnet" - -[[agent]] -name = "coder-opus" -stage = "coder" -model = "opus" -"#, - ); - - let mut agents = HashMap::new(); - agents.insert( - "story1:coder-1".to_string(), - make_test_story_agent("coder-1", AgentStatus::Running), - ); - - let free = find_free_agent_for_stage(&config, &agents, &PipelineStage::Coder); - assert_eq!(free, None, "opus agent should not be auto-assigned"); - } - - // ── find_free_agent_for_stage: max_coders limit ───────────────────── - - #[test] - fn find_free_agent_respects_max_coders() { - let config = make_config( - r#" -max_coders = 1 - -[[agent]] -name = "coder-1" -stage = "coder" -model = "sonnet" - -[[agent]] -name = "coder-2" -stage = "coder" -model = "sonnet" -"#, - ); - - let mut agents = HashMap::new(); - agents.insert( - "story1:coder-1".to_string(), - make_test_story_agent("coder-1", AgentStatus::Running), - ); - - let free = find_free_agent_for_stage(&config, &agents, &PipelineStage::Coder); - assert_eq!(free, None, "max_coders=1 should block second coder"); - } - - 
#[test] - fn find_free_agent_allows_within_max_coders() { - let config = make_config( - r#" -max_coders = 2 - -[[agent]] -name = "coder-1" -stage = "coder" -model = "sonnet" - -[[agent]] -name = "coder-2" -stage = "coder" -model = "sonnet" -"#, - ); - - let mut agents = HashMap::new(); - agents.insert( - "story1:coder-1".to_string(), - make_test_story_agent("coder-1", AgentStatus::Running), - ); - - let free = find_free_agent_for_stage(&config, &agents, &PipelineStage::Coder); - assert_eq!(free, Some("coder-2")); - } - - #[test] - fn max_coders_does_not_affect_qa_stage() { - let config = make_config( - r#" -max_coders = 1 - -[[agent]] -name = "qa" -stage = "qa" -model = "sonnet" -"#, - ); - - let agents = HashMap::new(); - let free = find_free_agent_for_stage(&config, &agents, &PipelineStage::Qa); - assert_eq!(free, Some("qa")); - } - - // ── count_active_agents_for_stage ──────────────────────────────────── - - #[test] - fn count_active_agents_counts_running_and_pending() { - let config = make_config( - r#" -[[agent]] -name = "coder-1" -stage = "coder" - -[[agent]] -name = "coder-2" -stage = "coder" -"#, - ); - - let mut agents = HashMap::new(); - agents.insert( - "s1:coder-1".to_string(), - make_test_story_agent("coder-1", AgentStatus::Running), - ); - agents.insert( - "s2:coder-2".to_string(), - make_test_story_agent("coder-2", AgentStatus::Completed), - ); - - let count = count_active_agents_for_stage(&config, &agents, &PipelineStage::Coder); - assert_eq!(count, 1, "Only Running coder should be counted, not Completed"); - } - - // ── reconcile_on_startup tests ──────────────────────────────────────────── - - #[tokio::test] - async fn reconcile_on_startup_noop_when_no_worktrees() { - let tmp = tempfile::tempdir().unwrap(); - let pool = AgentPool::new_test(3001); - let (tx, _rx) = broadcast::channel(16); - // Should not panic; no worktrees to reconcile. 
- pool.reconcile_on_startup(tmp.path(), &tx).await; - } - - #[tokio::test] - async fn reconcile_on_startup_emits_done_event() { - let tmp = tempfile::tempdir().unwrap(); - let pool = AgentPool::new_test(3001); - let (tx, mut rx) = broadcast::channel::<ReconciliationEvent>(16); - pool.reconcile_on_startup(tmp.path(), &tx).await; - - // Collect all events; the last must be "done". - let mut events: Vec<ReconciliationEvent> = Vec::new(); - while let Ok(evt) = rx.try_recv() { - events.push(evt); - } - assert!( - events.iter().any(|e| e.status == "done"), - "reconcile_on_startup must emit a 'done' event; got: {:?}", - events.iter().map(|e| &e.status).collect::<Vec<_>>() - ); - } - - #[tokio::test] - async fn reconcile_on_startup_skips_story_without_committed_work() { - use std::fs; - let tmp = tempfile::tempdir().unwrap(); - let root = tmp.path(); - - // Set up story in 2_current/. - let current = root.join(".storkit/work/2_current"); - fs::create_dir_all(&current).unwrap(); - fs::write(current.join("60_story_test.md"), "test").unwrap(); - - // Create a worktree directory that is a fresh git repo with no commits - // ahead of its own base branch (simulates a worktree where no work was done). - let wt_dir = root.join(".storkit/worktrees/60_story_test"); - fs::create_dir_all(&wt_dir).unwrap(); - init_git_repo(&wt_dir); - - let pool = AgentPool::new_test(3001); - let (tx, _rx) = broadcast::channel(16); - pool.reconcile_on_startup(root, &tx).await; - - // Story should still be in 2_current/ — nothing was reconciled. - assert!( - current.join("60_story_test.md").exists(), - "story should stay in 2_current/ when worktree has no committed work" - ); - } - - #[tokio::test] - async fn reconcile_on_startup_runs_gates_on_worktree_with_committed_work() { - use std::fs; - let tmp = tempfile::tempdir().unwrap(); - let root = tmp.path(); - - // Set up a git repo for the project root. - init_git_repo(root); - - // Set up story in 2_current/ and commit it so the project root is clean. 
- let current = root.join(".storkit/work/2_current"); - fs::create_dir_all(&current).unwrap(); - fs::write(current.join("61_story_test.md"), "test").unwrap(); - Command::new("git") - .args(["add", "."]) - .current_dir(root) - .output() - .unwrap(); - Command::new("git") - .args([ - "-c", - "user.email=test@test.com", - "-c", - "user.name=Test", - "commit", - "-m", - "add story", - ]) - .current_dir(root) - .output() - .unwrap(); - - // Create a real git worktree for the story. - let wt_dir = root.join(".storkit/worktrees/61_story_test"); - fs::create_dir_all(wt_dir.parent().unwrap()).unwrap(); - Command::new("git") - .args([ - "worktree", - "add", - &wt_dir.to_string_lossy(), - "-b", - "feature/story-61_story_test", - ]) - .current_dir(root) - .output() - .unwrap(); - - // Add a commit to the feature branch (simulates coder completing work). - fs::write(wt_dir.join("implementation.txt"), "done").unwrap(); - Command::new("git") - .args(["add", "."]) - .current_dir(&wt_dir) - .output() - .unwrap(); - Command::new("git") - .args([ - "-c", - "user.email=test@test.com", - "-c", - "user.name=Test", - "commit", - "-m", - "implement story", - ]) - .current_dir(&wt_dir) - .output() - .unwrap(); - - assert!( - crate::agents::gates::worktree_has_committed_work(&wt_dir), - "test setup: worktree should have committed work" - ); - - let pool = AgentPool::new_test(3001); - let (tx, _rx) = broadcast::channel(16); - pool.reconcile_on_startup(root, &tx).await; - - // In the test env, cargo clippy will fail (no Cargo.toml) so gates fail - // and the story stays in 2_current/. The important assertion is that - // reconcile ran without panicking and the story is in a consistent state. 
- let in_current = current.join("61_story_test.md").exists(); - let in_qa = root.join(".storkit/work/3_qa/61_story_test.md").exists(); - assert!( - in_current || in_qa, - "story should be in 2_current/ or 3_qa/ after reconciliation" - ); - } -} diff --git a/server/src/agents/pool/auto_assign/auto_assign.rs b/server/src/agents/pool/auto_assign/auto_assign.rs new file mode 100644 index 00000000..11ff2ecd --- /dev/null +++ b/server/src/agents/pool/auto_assign/auto_assign.rs @@ -0,0 +1,477 @@ +//! Auto-assign: scan pipeline stages and dispatch free agents to unassigned stories. + +use crate::config::ProjectConfig; +use crate::slog; +use crate::slog_error; +use crate::slog_warn; +use crate::worktree; +use std::path::Path; + +use super::super::super::PipelineStage; +use super::super::AgentPool; +use super::scan::{ + count_active_agents_for_stage, find_free_agent_for_stage, is_agent_free, + is_story_assigned_for_stage, scan_stage_items, +}; +use super::story_checks::{ + has_merge_failure, has_review_hold, is_story_blocked, read_story_front_matter_agent, +}; + +impl AgentPool { + pub async fn auto_assign_available_work(&self, project_root: &Path) { + let config = match ProjectConfig::load(project_root) { + Ok(c) => c, + Err(e) => { + slog_warn!("[auto-assign] Failed to load project config: {e}"); + return; + } + }; + + // Process each active pipeline stage in order. + let stages: [(&str, PipelineStage); 3] = [ + ("2_current", PipelineStage::Coder), + ("3_qa", PipelineStage::Qa), + ("4_merge", PipelineStage::Mergemaster), + ]; + + for (stage_dir, stage) in &stages { + let items = scan_stage_items(project_root, stage_dir); + if items.is_empty() { + continue; + } + + for story_id in &items { + // Items marked with review_hold (e.g. spikes after QA passes) stay + // in their current stage for human review — don't auto-assign agents. + if has_review_hold(project_root, stage_dir, story_id) { + continue; + } + + // Skip blocked stories (retry limit exceeded). 
+ if is_story_blocked(project_root, stage_dir, story_id) { + continue; + } + + // Skip stories in 4_merge/ that already have a reported merge failure. + // These need human intervention — auto-assigning a new mergemaster + // would just waste tokens on the same broken merge. + if *stage == PipelineStage::Mergemaster + && has_merge_failure(project_root, stage_dir, story_id) + { + continue; + } + + // AC6: Detect empty-diff stories in 4_merge/ before starting a + // mergemaster. If the worktree has no commits on the feature branch, + // write a merge_failure and block the story immediately. + if *stage == PipelineStage::Mergemaster + && let Some(wt_path) = worktree::find_worktree_path(project_root, story_id) + && !crate::agents::gates::worktree_has_committed_work(&wt_path) + { + slog_warn!( + "[auto-assign] Story '{story_id}' in 4_merge/ has no commits \ + on feature branch. Writing merge_failure and blocking." + ); + let story_path = project_root + .join(".storkit/work") + .join(stage_dir) + .join(format!("{story_id}.md")); + let _ = crate::io::story_metadata::write_merge_failure( + &story_path, + "Feature branch has no code changes — the coder agent \ + did not produce any commits.", + ); + let _ = crate::io::story_metadata::write_blocked(&story_path); + continue; + } + + // Re-acquire the lock on each iteration to see state changes + // from previous start_agent calls in the same pass. + let preferred_agent = + read_story_front_matter_agent(project_root, stage_dir, story_id); + + // Check max_coders limit for the Coder stage before agent selection. + // If the pool is full, all remaining items in this stage wait. 
+ if *stage == PipelineStage::Coder + && let Some(max) = config.max_coders + { + let agents_lock = match self.agents.lock() { + Ok(a) => a, + Err(e) => { + slog_error!("[auto-assign] Failed to lock agents: {e}"); + break; + } + }; + let active = count_active_agents_for_stage(&config, &agents_lock, stage); + if active >= max { + slog!( + "[auto-assign] Coder pool full ({active}/{max}); remaining items in {stage_dir}/ will wait." + ); + break; + } + } + + // Outcome: (already_assigned, chosen_agent, preferred_busy, stage_mismatch) + // preferred_busy=true means the story has a specific agent requested but it is + // currently occupied — the story should wait rather than fall back. + // stage_mismatch=true means the preferred agent's stage doesn't match the + // pipeline stage, so we fell back to a generic stage agent. + let (already_assigned, free_agent, preferred_busy, stage_mismatch) = { + let agents = match self.agents.lock() { + Ok(a) => a, + Err(e) => { + slog_error!("[auto-assign] Failed to lock agents: {e}"); + break; + } + }; + let assigned = is_story_assigned_for_stage(&config, &agents, story_id, stage); + if assigned { + (true, None, false, false) + } else if let Some(ref pref) = preferred_agent { + // Story has a front-matter agent preference. + // Verify the preferred agent's stage matches the current + // pipeline stage — a coder shouldn't be assigned to QA. + let pref_stage_matches = config + .find_agent(pref) + .map(|cfg| super::super::super::agent_config_stage(cfg) == *stage) + .unwrap_or(false); + if !pref_stage_matches { + // Stage mismatch — fall back to any free agent for this stage. 
+ let free = find_free_agent_for_stage(&config, &agents, stage) + .map(|s| s.to_string()); + (false, free, false, true) + } else if is_agent_free(&agents, pref) { + (false, Some(pref.clone()), false, false) + } else { + (false, None, true, false) + } + } else { + let free = find_free_agent_for_stage(&config, &agents, stage) + .map(|s| s.to_string()); + (false, free, false, false) + } + }; + + if already_assigned { + // Story already has an active agent — skip silently. + continue; + } + + if preferred_busy { + // The story requests a specific agent that is currently busy. + // Do not fall back to a different agent; let this story wait. + slog!( + "[auto-assign] Preferred agent '{}' busy for '{story_id}'; story will wait.", + preferred_agent.as_deref().unwrap_or("?") + ); + continue; + } + + if stage_mismatch { + slog!( + "[auto-assign] Preferred agent '{}' stage mismatch for '{story_id}' in {stage_dir}/; falling back to stage-appropriate agent.", + preferred_agent.as_deref().unwrap_or("?") + ); + } + + match free_agent { + Some(agent_name) => { + slog!( + "[auto-assign] Assigning '{agent_name}' to '{story_id}' in {stage_dir}/" + ); + if let Err(e) = self + .start_agent(project_root, story_id, Some(&agent_name), None) + .await + { + slog!( + "[auto-assign] Failed to start '{agent_name}' for '{story_id}': {e}" + ); + } + } + None => { + // No free agents of this type — stop scanning this stage. + slog!( + "[auto-assign] All {:?} agents busy; remaining items in {stage_dir}/ will wait.", + stage + ); + break; + } + } + } + } + } +} + +// ── Tests ────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::super::super::AgentPool; + use crate::agents::AgentStatus; + use crate::io::watcher::WatcherEvent; + use tokio::sync::broadcast; + + /// Story 203: auto_assign_available_work must detect a story in 2_current/ + /// with no active agent and start an agent for it. 
+ #[tokio::test] + async fn auto_assign_picks_up_story_queued_in_current() { + let tmp = tempfile::tempdir().unwrap(); + let sk = tmp.path().join(".storkit"); + let current = sk.join("work/2_current"); + std::fs::create_dir_all(&current).unwrap(); + std::fs::write( + sk.join("project.toml"), + "[[agent]]\nname = \"coder-1\"\nstage = \"coder\"\n", + ) + .unwrap(); + // Place the story in 2_current/ (simulating the "queued" state). + std::fs::write(current.join("story-3.md"), "---\nname: Story 3\n---\n").unwrap(); + + let pool = AgentPool::new_test(3001); + // No agents are running — coder-1 is free. + + // auto_assign will try to call start_agent, which will attempt to create + // a worktree (will fail without a git repo) — that is fine. We only need + // to verify the agent is registered as Pending before the background + // task eventually fails. + pool.auto_assign_available_work(tmp.path()).await; + + let agents = pool.agents.lock().unwrap(); + let has_pending = agents.values().any(|a| { + a.agent_name == "coder-1" + && matches!(a.status, AgentStatus::Pending | AgentStatus::Running) + }); + assert!( + has_pending, + "auto_assign should have started coder-1 for story-3, but pool is empty" + ); + } + + /// Story 265: auto_assign_available_work must skip spikes in 3_qa/ that + /// have review_hold: true set in their front matter. + #[tokio::test] + async fn auto_assign_skips_spikes_with_review_hold() { + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + // Create project.toml with a QA agent. + let sk = root.join(".storkit"); + std::fs::create_dir_all(&sk).unwrap(); + std::fs::write( + sk.join("project.toml"), + "[[agents]]\nname = \"qa\"\nrole = \"qa\"\nmodel = \"test\"\nprompt = \"test\"\n", + ) + .unwrap(); + + // Put a spike in 3_qa/ with review_hold: true. 
+ let qa_dir = root.join(".storkit/work/3_qa"); + std::fs::create_dir_all(&qa_dir).unwrap(); + std::fs::write( + qa_dir.join("20_spike_test.md"), + "---\nname: Test Spike\nreview_hold: true\n---\n# Spike\n", + ) + .unwrap(); + + let (watcher_tx, _) = broadcast::channel::<WatcherEvent>(4); + let pool = AgentPool::new(3001, watcher_tx); + + pool.auto_assign_available_work(root).await; + + // No agent should have been started for the spike. + let agents = pool.agents.lock().unwrap(); + assert!( + agents.is_empty(), + "No agents should be assigned to a spike with review_hold" + ); + } + + // ── Story 279: auto-assign respects agent stage from front matter ────────── + + /// When a story in 3_qa/ has `agent: coder-1` in its front matter but + /// coder-1 is a coder-stage agent, auto-assign must NOT assign coder-1. + /// Instead it should fall back to a free QA-stage agent. + #[tokio::test] + async fn auto_assign_ignores_coder_preference_when_story_is_in_qa_stage() { + let tmp = tempfile::tempdir().unwrap(); + let sk = tmp.path().join(".storkit"); + let qa_dir = sk.join("work/3_qa"); + std::fs::create_dir_all(&qa_dir).unwrap(); + std::fs::write( + sk.join("project.toml"), + "[[agent]]\nname = \"coder-1\"\nstage = \"coder\"\n\n\ + [[agent]]\nname = \"qa-1\"\nstage = \"qa\"\n", + ) + .unwrap(); + // Story in 3_qa/ with a preferred coder-stage agent. + std::fs::write( + qa_dir.join("story-qa1.md"), + "---\nname: QA Story\nagent: coder-1\n---\n", + ) + .unwrap(); + + let pool = AgentPool::new_test(3001); + + pool.auto_assign_available_work(tmp.path()).await; + + let agents = pool.agents.lock().unwrap(); + // coder-1 must NOT have been assigned (wrong stage for 3_qa/). + let coder_assigned = agents.values().any(|a| { + a.agent_name == "coder-1" + && matches!(a.status, AgentStatus::Pending | AgentStatus::Running) + }); + assert!( + !coder_assigned, + "coder-1 should not be assigned to a QA-stage story" + ); + // qa-1 should have been assigned instead. 
+ let qa_assigned = agents.values().any(|a| { + a.agent_name == "qa-1" + && matches!(a.status, AgentStatus::Pending | AgentStatus::Running) + }); + assert!( + qa_assigned, + "qa-1 should be assigned as fallback for the QA-stage story" + ); + } + + /// When a story in 2_current/ has `agent: coder-1` in its front matter and + /// coder-1 is a coder-stage agent, auto-assign must respect the preference + /// and assign coder-1 (not fall back to some other coder). + #[tokio::test] + async fn auto_assign_respects_coder_preference_when_story_is_in_current_stage() { + let tmp = tempfile::tempdir().unwrap(); + let sk = tmp.path().join(".storkit"); + let current_dir = sk.join("work/2_current"); + std::fs::create_dir_all(&current_dir).unwrap(); + std::fs::write( + sk.join("project.toml"), + "[[agent]]\nname = \"coder-1\"\nstage = \"coder\"\n\n\ + [[agent]]\nname = \"coder-2\"\nstage = \"coder\"\n", + ) + .unwrap(); + // Story in 2_current/ with a preferred coder-1 agent. + std::fs::write( + current_dir.join("story-pref.md"), + "---\nname: Coder Story\nagent: coder-1\n---\n", + ) + .unwrap(); + + let pool = AgentPool::new_test(3001); + + pool.auto_assign_available_work(tmp.path()).await; + + let agents = pool.agents.lock().unwrap(); + // coder-1 should have been picked (it matches the stage and is preferred). + let coder1_assigned = agents.values().any(|a| { + a.agent_name == "coder-1" + && matches!(a.status, AgentStatus::Pending | AgentStatus::Running) + }); + assert!( + coder1_assigned, + "coder-1 should be assigned when it matches the stage and is preferred" + ); + // coder-2 must NOT be assigned (not preferred). 
+ let coder2_assigned = agents.values().any(|a| { + a.agent_name == "coder-2" + && matches!(a.status, AgentStatus::Pending | AgentStatus::Running) + }); + assert!( + !coder2_assigned, + "coder-2 should not be assigned when coder-1 is explicitly preferred" + ); + } + + /// When the preferred agent's stage mismatches and no other agent of the + /// correct stage is available, auto-assign must not start any agent for that + /// story (no panic, no error). + #[tokio::test] + async fn auto_assign_stage_mismatch_with_no_fallback_starts_no_agent() { + let tmp = tempfile::tempdir().unwrap(); + let sk = tmp.path().join(".storkit"); + let qa_dir = sk.join("work/3_qa"); + std::fs::create_dir_all(&qa_dir).unwrap(); + // Only a coder agent is configured — no QA agent exists. + std::fs::write( + sk.join("project.toml"), + "[[agent]]\nname = \"coder-1\"\nstage = \"coder\"\n", + ) + .unwrap(); + // Story in 3_qa/ requests coder-1 (wrong stage) and no QA agent exists. + std::fs::write( + qa_dir.join("story-noqa.md"), + "---\nname: QA Story No Agent\nagent: coder-1\n---\n", + ) + .unwrap(); + + let pool = AgentPool::new_test(3001); + + // Must not panic. + pool.auto_assign_available_work(tmp.path()).await; + + let agents = pool.agents.lock().unwrap(); + assert!( + agents.is_empty(), + "No agent should be started when no stage-appropriate agent is available" + ); + } + + /// Two concurrent auto_assign_available_work calls must not assign the same + /// agent to two stories simultaneously. After both complete, at most one + /// Pending/Running entry must exist per agent name. + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn toctou_concurrent_auto_assign_no_duplicate_agent_assignments() { + use std::fs; + use std::sync::Arc; + + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path().to_path_buf(); + + let sk_dir = root.join(".storkit"); + // Two stories waiting in 2_current, one coder agent. 
+ fs::create_dir_all(sk_dir.join("work/2_current")).unwrap(); + fs::write( + sk_dir.join("project.toml"), + "[[agent]]\nname = \"coder-1\"\n", + ) + .unwrap(); + fs::write( + sk_dir.join("work/2_current/86_story_foo.md"), + "---\nname: Foo\n---\n", + ) + .unwrap(); + fs::write( + sk_dir.join("work/2_current/130_story_bar.md"), + "---\nname: Bar\n---\n", + ) + .unwrap(); + + let pool = Arc::new(AgentPool::new_test(3099)); + + // Run two concurrent auto_assign calls. + let pool1 = pool.clone(); + let root1 = root.clone(); + let t1 = tokio::spawn(async move { pool1.auto_assign_available_work(&root1).await }); + + let pool2 = pool.clone(); + let root2 = root.clone(); + let t2 = tokio::spawn(async move { pool2.auto_assign_available_work(&root2).await }); + + let _ = tokio::join!(t1, t2); + + // At most one Pending/Running entry should exist for coder-1. + let agents = pool.agents.lock().unwrap(); + let active_coder_count = agents + .values() + .filter(|a| { + a.agent_name == "coder-1" + && matches!(a.status, AgentStatus::Pending | AgentStatus::Running) + }) + .count(); + + assert!( + active_coder_count <= 1, + "coder-1 must not be assigned to more than one story simultaneously; \ + found {active_coder_count} active entries" + ); + } +} diff --git a/server/src/agents/pool/auto_assign/mod.rs b/server/src/agents/pool/auto_assign/mod.rs new file mode 100644 index 00000000..ff2daeb6 --- /dev/null +++ b/server/src/agents/pool/auto_assign/mod.rs @@ -0,0 +1,12 @@ +//! Auto-assign submodules: wires focused sub-files and re-exports public items. + +#[allow(clippy::module_inception)] +mod auto_assign; +mod reconcile; +mod scan; +mod story_checks; +mod watchdog; + +// Re-export items that were pub(super) in the original monolithic auto_assign.rs +// so that pool::lifecycle and pool::pipeline continue to access them unchanged. 
+pub(super) use scan::{find_free_agent_for_stage, is_agent_free}; diff --git a/server/src/agents/pool/auto_assign/reconcile.rs b/server/src/agents/pool/auto_assign/reconcile.rs new file mode 100644 index 00000000..b12296d8 --- /dev/null +++ b/server/src/agents/pool/auto_assign/reconcile.rs @@ -0,0 +1,527 @@ +//! Startup reconciliation: detect stories with committed work and advance the pipeline. + +use std::path::Path; +use tokio::sync::broadcast; + +use crate::worktree; + +use super::super::super::ReconciliationEvent; +use super::super::{AgentPool, find_active_story_stage}; + +impl AgentPool { + /// Reconcile stories whose agent work was committed while the server was offline. + /// + /// On server startup the in-memory agent pool is empty, so any story that an agent + /// completed during a previous session is stuck: the worktree has committed work but + /// the pipeline never advanced. This method detects those stories, re-runs the + /// acceptance gates, and advances the pipeline stage so that `auto_assign_available_work` + /// (called immediately after) picks up the right next-stage agents. + /// + /// Algorithm: + /// 1. List all worktree directories under `{project_root}/.storkit/worktrees/`. + /// 2. For each worktree, check whether its feature branch has commits ahead of the + /// base branch (`master` / `main`). + /// 3. If committed work is found AND the story is in `2_current/` or `3_qa/`: + /// - Run acceptance gates (uncommitted-change check + clippy + tests). + /// - On pass + `2_current/`: move the story to `3_qa/`. + /// - On pass + `3_qa/`: run the coverage gate; if that also passes move to `4_merge/`. + /// - On failure: leave the story where it is so `auto_assign_available_work` can + /// start a fresh agent to retry. + /// 4. Stories in `4_merge/` are left for `auto_assign_available_work` to handle via a + /// fresh mergemaster (squash-merge must be re-executed by the mergemaster agent). 
+ pub async fn reconcile_on_startup( + &self, + project_root: &Path, + progress_tx: &broadcast::Sender<ReconciliationEvent>, + ) { + let worktrees = match worktree::list_worktrees(project_root) { + Ok(wt) => wt, + Err(e) => { + eprintln!("[startup:reconcile] Failed to list worktrees: {e}"); + let _ = progress_tx.send(ReconciliationEvent { + story_id: String::new(), + status: "done".to_string(), + message: format!("Reconciliation failed: {e}"), + }); + return; + } + }; + + for wt_entry in &worktrees { + let story_id = &wt_entry.story_id; + let wt_path = wt_entry.path.clone(); + + // Determine which active stage the story is in. + let stage_dir = match find_active_story_stage(project_root, story_id) { + Some(s) => s, + None => continue, // Not in any active stage (backlog/archived or unknown). + }; + + // 4_merge/ is left for auto_assign to handle with a fresh mergemaster. + if stage_dir == "4_merge" { + continue; + } + + let _ = progress_tx.send(ReconciliationEvent { + story_id: story_id.clone(), + status: "checking".to_string(), + message: format!("Checking for committed work in {stage_dir}/"), + }); + + // Check whether the worktree has commits ahead of the base branch. + let wt_path_for_check = wt_path.clone(); + let has_work = tokio::task::spawn_blocking(move || { + crate::agents::gates::worktree_has_committed_work(&wt_path_for_check) + }) + .await + .unwrap_or(false); + + if !has_work { + eprintln!( + "[startup:reconcile] No committed work for '{story_id}' in {stage_dir}/; skipping." + ); + let _ = progress_tx.send(ReconciliationEvent { + story_id: story_id.clone(), + status: "skipped".to_string(), + message: "No committed work found; skipping.".to_string(), + }); + continue; + } + + eprintln!( + "[startup:reconcile] Found committed work for '{story_id}' in {stage_dir}/. Running acceptance gates." 
+ ); + let _ = progress_tx.send(ReconciliationEvent { + story_id: story_id.clone(), + status: "gates_running".to_string(), + message: "Running acceptance gates…".to_string(), + }); + + // Run acceptance gates on the worktree. + let wt_path_for_gates = wt_path.clone(); + let gates_result = tokio::task::spawn_blocking(move || { + crate::agents::gates::check_uncommitted_changes(&wt_path_for_gates)?; + crate::agents::gates::run_acceptance_gates(&wt_path_for_gates) + }) + .await; + + let (gates_passed, gate_output) = match gates_result { + Ok(Ok(pair)) => pair, + Ok(Err(e)) => { + eprintln!("[startup:reconcile] Gate check error for '{story_id}': {e}"); + let _ = progress_tx.send(ReconciliationEvent { + story_id: story_id.clone(), + status: "failed".to_string(), + message: format!("Gate error: {e}"), + }); + continue; + } + Err(e) => { + eprintln!("[startup:reconcile] Gate check task panicked for '{story_id}': {e}"); + let _ = progress_tx.send(ReconciliationEvent { + story_id: story_id.clone(), + status: "failed".to_string(), + message: format!("Gate task panicked: {e}"), + }); + continue; + } + }; + + if !gates_passed { + eprintln!( + "[startup:reconcile] Gates failed for '{story_id}': {gate_output}\n\ + Leaving in {stage_dir}/ for auto-assign to restart the agent." + ); + let _ = progress_tx.send(ReconciliationEvent { + story_id: story_id.clone(), + status: "failed".to_string(), + message: "Gates failed; will be retried by auto-assign.".to_string(), + }); + continue; + } + + eprintln!("[startup:reconcile] Gates passed for '{story_id}' (stage: {stage_dir}/)."); + + if stage_dir == "2_current" { + // Coder stage — determine qa mode to decide next step. 
+ let qa_mode = { + let item_type = crate::agents::lifecycle::item_type_from_id(story_id); + if item_type == "spike" { + crate::io::story_metadata::QaMode::Human + } else { + let default_qa = crate::config::ProjectConfig::load(project_root) + .unwrap_or_default() + .default_qa_mode(); + let story_path = project_root + .join(".storkit/work/2_current") + .join(format!("{story_id}.md")); + crate::io::story_metadata::resolve_qa_mode(&story_path, default_qa) + } + }; + + match qa_mode { + crate::io::story_metadata::QaMode::Server => { + if let Err(e) = + crate::agents::move_story_to_merge(project_root, story_id) + { + eprintln!("[startup:reconcile] Failed to move '{story_id}' to 4_merge/: {e}"); + let _ = progress_tx.send(ReconciliationEvent { + story_id: story_id.clone(), + status: "failed".to_string(), + message: format!("Failed to advance to merge: {e}"), + }); + } else { + eprintln!("[startup:reconcile] Moved '{story_id}' → 4_merge/ (qa: server)."); + let _ = progress_tx.send(ReconciliationEvent { + story_id: story_id.clone(), + status: "advanced".to_string(), + message: "Gates passed — moved to merge (qa: server).".to_string(), + }); + } + } + crate::io::story_metadata::QaMode::Agent => { + if let Err(e) = + crate::agents::move_story_to_qa(project_root, story_id) + { + eprintln!("[startup:reconcile] Failed to move '{story_id}' to 3_qa/: {e}"); + let _ = progress_tx.send(ReconciliationEvent { + story_id: story_id.clone(), + status: "failed".to_string(), + message: format!("Failed to advance to QA: {e}"), + }); + } else { + eprintln!("[startup:reconcile] Moved '{story_id}' → 3_qa/."); + let _ = progress_tx.send(ReconciliationEvent { + story_id: story_id.clone(), + status: "advanced".to_string(), + message: "Gates passed — moved to QA.".to_string(), + }); + } + } + crate::io::story_metadata::QaMode::Human => { + if let Err(e) = + crate::agents::move_story_to_qa(project_root, story_id) + { + eprintln!("[startup:reconcile] Failed to move '{story_id}' to 3_qa/: {e}"); + 
let _ = progress_tx.send(ReconciliationEvent { + story_id: story_id.clone(), + status: "failed".to_string(), + message: format!("Failed to advance to QA: {e}"), + }); + } else { + let story_path = project_root + .join(".storkit/work/3_qa") + .join(format!("{story_id}.md")); + if let Err(e) = + crate::io::story_metadata::write_review_hold(&story_path) + { + eprintln!( + "[startup:reconcile] Failed to set review_hold on '{story_id}': {e}" + ); + } + eprintln!("[startup:reconcile] Moved '{story_id}' → 3_qa/ (qa: human — holding for review)."); + let _ = progress_tx.send(ReconciliationEvent { + story_id: story_id.clone(), + status: "review_hold".to_string(), + message: "Gates passed — holding for human review.".to_string(), + }); + } + } + } + } else if stage_dir == "3_qa" { + // QA stage → run coverage gate before advancing to merge. + let wt_path_for_cov = wt_path.clone(); + let coverage_result = tokio::task::spawn_blocking(move || { + crate::agents::gates::run_coverage_gate(&wt_path_for_cov) + }) + .await; + + let (coverage_passed, coverage_output) = match coverage_result { + Ok(Ok(pair)) => pair, + Ok(Err(e)) => { + eprintln!("[startup:reconcile] Coverage gate error for '{story_id}': {e}"); + let _ = progress_tx.send(ReconciliationEvent { + story_id: story_id.clone(), + status: "failed".to_string(), + message: format!("Coverage gate error: {e}"), + }); + continue; + } + Err(e) => { + eprintln!( + "[startup:reconcile] Coverage gate panicked for '{story_id}': {e}" + ); + let _ = progress_tx.send(ReconciliationEvent { + story_id: story_id.clone(), + status: "failed".to_string(), + message: format!("Coverage gate panicked: {e}"), + }); + continue; + } + }; + + if coverage_passed { + // Check whether this item needs human review before merging. 
+ let needs_human_review = { + let item_type = crate::agents::lifecycle::item_type_from_id(story_id); + if item_type == "spike" { + true + } else { + let story_path = project_root + .join(".storkit/work/3_qa") + .join(format!("{story_id}.md")); + let default_qa = crate::config::ProjectConfig::load(project_root) + .unwrap_or_default() + .default_qa_mode(); + matches!( + crate::io::story_metadata::resolve_qa_mode(&story_path, default_qa), + crate::io::story_metadata::QaMode::Human + ) + } + }; + + if needs_human_review { + let story_path = project_root + .join(".storkit/work/3_qa") + .join(format!("{story_id}.md")); + if let Err(e) = + crate::io::story_metadata::write_review_hold(&story_path) + { + eprintln!( + "[startup:reconcile] Failed to set review_hold on '{story_id}': {e}" + ); + } + eprintln!( + "[startup:reconcile] '{story_id}' passed QA — holding for human review." + ); + let _ = progress_tx.send(ReconciliationEvent { + story_id: story_id.clone(), + status: "review_hold".to_string(), + message: "Passed QA — waiting for human review.".to_string(), + }); + } else if let Err(e) = + crate::agents::move_story_to_merge(project_root, story_id) + { + eprintln!( + "[startup:reconcile] Failed to move '{story_id}' to 4_merge/: {e}" + ); + let _ = progress_tx.send(ReconciliationEvent { + story_id: story_id.clone(), + status: "failed".to_string(), + message: format!("Failed to advance to merge: {e}"), + }); + } else { + eprintln!("[startup:reconcile] Moved '{story_id}' → 4_merge/."); + let _ = progress_tx.send(ReconciliationEvent { + story_id: story_id.clone(), + status: "advanced".to_string(), + message: "Gates passed — moved to merge.".to_string(), + }); + } + } else { + eprintln!( + "[startup:reconcile] Coverage gate failed for '{story_id}': {coverage_output}\n\ + Leaving in 3_qa/ for auto-assign to restart the QA agent." 
+ ); + let _ = progress_tx.send(ReconciliationEvent { + story_id: story_id.clone(), + status: "failed".to_string(), + message: "Coverage gate failed; will be retried.".to_string(), + }); + } + } + } + + // Signal that reconciliation is complete. + let _ = progress_tx.send(ReconciliationEvent { + story_id: String::new(), + status: "done".to_string(), + message: "Startup reconciliation complete.".to_string(), + }); + } +} + +// ── Tests ────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use std::process::Command; + use tokio::sync::broadcast; + + use super::super::super::AgentPool; + use crate::agents::ReconciliationEvent; + + fn init_git_repo(repo: &std::path::Path) { + Command::new("git") + .args(["init"]) + .current_dir(repo) + .output() + .unwrap(); + Command::new("git") + .args(["config", "user.email", "test@test.com"]) + .current_dir(repo) + .output() + .unwrap(); + Command::new("git") + .args(["config", "user.name", "Test"]) + .current_dir(repo) + .output() + .unwrap(); + // Create initial commit so master branch exists. + std::fs::write(repo.join("README.md"), "# test\n").unwrap(); + Command::new("git") + .args(["add", "."]) + .current_dir(repo) + .output() + .unwrap(); + Command::new("git") + .args(["commit", "-m", "initial"]) + .current_dir(repo) + .output() + .unwrap(); + } + + #[tokio::test] + async fn reconcile_on_startup_noop_when_no_worktrees() { + let tmp = tempfile::tempdir().unwrap(); + let pool = AgentPool::new_test(3001); + let (tx, _rx) = broadcast::channel(16); + // Should not panic; no worktrees to reconcile. + pool.reconcile_on_startup(tmp.path(), &tx).await; + } + + #[tokio::test] + async fn reconcile_on_startup_emits_done_event() { + let tmp = tempfile::tempdir().unwrap(); + let pool = AgentPool::new_test(3001); + let (tx, mut rx) = broadcast::channel::<ReconciliationEvent>(16); + pool.reconcile_on_startup(tmp.path(), &tx).await; + + // Collect all events; the last must be "done". 
+ let mut events: Vec<ReconciliationEvent> = Vec::new(); + while let Ok(evt) = rx.try_recv() { + events.push(evt); + } + assert!( + events.iter().any(|e| e.status == "done"), + "reconcile_on_startup must emit a 'done' event; got: {:?}", + events.iter().map(|e| &e.status).collect::<Vec<_>>() + ); + } + + #[tokio::test] + async fn reconcile_on_startup_skips_story_without_committed_work() { + use std::fs; + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + // Set up story in 2_current/. + let current = root.join(".storkit/work/2_current"); + fs::create_dir_all(&current).unwrap(); + fs::write(current.join("60_story_test.md"), "test").unwrap(); + + // Create a worktree directory that is a fresh git repo with no commits + // ahead of its own base branch (simulates a worktree where no work was done). + let wt_dir = root.join(".storkit/worktrees/60_story_test"); + fs::create_dir_all(&wt_dir).unwrap(); + init_git_repo(&wt_dir); + + let pool = AgentPool::new_test(3001); + let (tx, _rx) = broadcast::channel(16); + pool.reconcile_on_startup(root, &tx).await; + + // Story should still be in 2_current/ — nothing was reconciled. + assert!( + current.join("60_story_test.md").exists(), + "story should stay in 2_current/ when worktree has no committed work" + ); + } + + #[tokio::test] + async fn reconcile_on_startup_runs_gates_on_worktree_with_committed_work() { + use std::fs; + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + // Set up a git repo for the project root. + init_git_repo(root); + + // Set up story in 2_current/ and commit it so the project root is clean. 
+ let current = root.join(".storkit/work/2_current"); + fs::create_dir_all(&current).unwrap(); + fs::write(current.join("61_story_test.md"), "test").unwrap(); + Command::new("git") + .args(["add", "."]) + .current_dir(root) + .output() + .unwrap(); + Command::new("git") + .args([ + "-c", + "user.email=test@test.com", + "-c", + "user.name=Test", + "commit", + "-m", + "add story", + ]) + .current_dir(root) + .output() + .unwrap(); + + // Create a real git worktree for the story. + let wt_dir = root.join(".storkit/worktrees/61_story_test"); + fs::create_dir_all(wt_dir.parent().unwrap()).unwrap(); + Command::new("git") + .args([ + "worktree", + "add", + &wt_dir.to_string_lossy(), + "-b", + "feature/story-61_story_test", + ]) + .current_dir(root) + .output() + .unwrap(); + + // Add a commit to the feature branch (simulates coder completing work). + fs::write(wt_dir.join("implementation.txt"), "done").unwrap(); + Command::new("git") + .args(["add", "."]) + .current_dir(&wt_dir) + .output() + .unwrap(); + Command::new("git") + .args([ + "-c", + "user.email=test@test.com", + "-c", + "user.name=Test", + "commit", + "-m", + "implement story", + ]) + .current_dir(&wt_dir) + .output() + .unwrap(); + + assert!( + crate::agents::gates::worktree_has_committed_work(&wt_dir), + "test setup: worktree should have committed work" + ); + + let pool = AgentPool::new_test(3001); + let (tx, _rx) = broadcast::channel(16); + pool.reconcile_on_startup(root, &tx).await; + + // In the test env, cargo clippy will fail (no Cargo.toml) so gates fail + // and the story stays in 2_current/. The important assertion is that + // reconcile ran without panicking and the story is in a consistent state. 
+ let in_current = current.join("61_story_test.md").exists(); + let in_qa = root.join(".storkit/work/3_qa/61_story_test.md").exists(); + assert!( + in_current || in_qa, + "story should be in 2_current/ or 3_qa/ after reconciliation" + ); + } +} diff --git a/server/src/agents/pool/auto_assign/scan.rs b/server/src/agents/pool/auto_assign/scan.rs new file mode 100644 index 00000000..ac3a9d2b --- /dev/null +++ b/server/src/agents/pool/auto_assign/scan.rs @@ -0,0 +1,552 @@ +//! Scanning pipeline stages for work items and querying agent pool state. + +use crate::config::ProjectConfig; +use std::collections::HashMap; +use std::path::Path; + +use super::super::super::{AgentStatus, PipelineStage, agent_config_stage, pipeline_stage}; +use super::super::StoryAgent; + +/// Return `true` if `agent_name` has no active (pending/running) entry in the pool. +pub(in crate::agents::pool) fn is_agent_free( + agents: &HashMap<String, StoryAgent>, + agent_name: &str, +) -> bool { + !agents.values().any(|a| { + a.agent_name == agent_name + && matches!(a.status, AgentStatus::Running | AgentStatus::Pending) + }) +} + +pub(super) fn scan_stage_items(project_root: &Path, stage_dir: &str) -> Vec<String> { + let dir = project_root.join(".storkit").join("work").join(stage_dir); + if !dir.is_dir() { + return Vec::new(); + } + let mut items = Vec::new(); + if let Ok(entries) = std::fs::read_dir(&dir) { + for entry in entries.flatten() { + let path = entry.path(); + if path.extension().and_then(|e| e.to_str()) == Some("md") + && let Some(stem) = path.file_stem().and_then(|s| s.to_str()) + { + items.push(stem.to_string()); + } + } + } + items.sort(); + items +} + +/// Return `true` if `story_id` has any active (pending/running) agent matching `stage`. +/// +/// Uses the explicit `stage` config field when the agent is found in `config`; +/// falls back to the legacy name-based heuristic for unlisted agents. 
+pub(super) fn is_story_assigned_for_stage( + config: &ProjectConfig, + agents: &HashMap<String, StoryAgent>, + story_id: &str, + stage: &PipelineStage, +) -> bool { + agents.iter().any(|(key, agent)| { + // Composite key format: "{story_id}:{agent_name}" + let key_story_id = key.rsplit_once(':').map(|(sid, _)| sid).unwrap_or(key); + let agent_stage = config + .find_agent(&agent.agent_name) + .map(agent_config_stage) + .unwrap_or_else(|| pipeline_stage(&agent.agent_name)); + key_story_id == story_id + && agent_stage == *stage + && matches!(agent.status, AgentStatus::Running | AgentStatus::Pending) + }) +} + +/// Count active (pending/running) agents for a given pipeline stage. +pub(super) fn count_active_agents_for_stage( + config: &ProjectConfig, + agents: &HashMap<String, StoryAgent>, + stage: &PipelineStage, +) -> usize { + agents + .values() + .filter(|a| { + matches!(a.status, AgentStatus::Running | AgentStatus::Pending) + && config + .find_agent(&a.agent_name) + .map(|cfg| agent_config_stage(cfg) == *stage) + .unwrap_or_else(|| pipeline_stage(&a.agent_name) == *stage) + }) + .count() +} + +/// Find the first configured agent for `stage` that has no active (pending/running) assignment. +/// Returns `None` if all agents for that stage are busy, none are configured, +/// or the `max_coders` limit has been reached (for the Coder stage). +/// +/// For the Coder stage, when `default_coder_model` is set, only considers agents whose +/// model matches the default. This ensures opus-class agents are reserved for explicit +/// front-matter requests. +pub(in crate::agents::pool) fn find_free_agent_for_stage<'a>( + config: &'a ProjectConfig, + agents: &HashMap<String, StoryAgent>, + stage: &PipelineStage, +) -> Option<&'a str> { + // Enforce max_coders limit for the Coder stage. 
+ if *stage == PipelineStage::Coder + && let Some(max) = config.max_coders + { + let active = count_active_agents_for_stage(config, agents, stage); + if active >= max { + return None; + } + } + + for agent_config in &config.agent { + if agent_config_stage(agent_config) != *stage { + continue; + } + // When default_coder_model is set, only auto-assign coder agents whose + // model matches. This keeps opus agents reserved for explicit requests. + if *stage == PipelineStage::Coder + && let Some(ref default_model) = config.default_coder_model + { + let agent_model = agent_config.model.as_deref().unwrap_or(""); + if agent_model != default_model { + continue; + } + } + let is_busy = agents.values().any(|a| { + a.agent_name == agent_config.name + && matches!(a.status, AgentStatus::Running | AgentStatus::Pending) + }); + if !is_busy { + return Some(&agent_config.name); + } + } + None +} + +// ── Tests ────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + use crate::config::ProjectConfig; + use std::sync::{Arc, Mutex}; + use tokio::sync::broadcast; + + use super::super::super::AgentPool; + + fn make_config(toml_str: &str) -> ProjectConfig { + ProjectConfig::parse(toml_str).unwrap() + } + + fn make_test_story_agent(agent_name: &str, status: AgentStatus) -> StoryAgent { + StoryAgent { + agent_name: agent_name.to_string(), + status, + worktree_info: None, + session_id: None, + tx: broadcast::channel(1).0, + task_handle: None, + event_log: Arc::new(Mutex::new(Vec::new())), + completion: None, + project_root: None, + log_session_id: None, + merge_failure_reported: false, + } + } + + #[test] + fn scan_stage_items_returns_empty_for_missing_dir() { + let tmp = tempfile::tempdir().unwrap(); + let items = scan_stage_items(tmp.path(), "2_current"); + assert!(items.is_empty()); + } + + #[test] + fn scan_stage_items_returns_sorted_story_ids() { + use std::fs; + let tmp = tempfile::tempdir().unwrap(); + let stage_dir = 
tmp.path().join(".storkit").join("work").join("2_current"); + fs::create_dir_all(&stage_dir).unwrap(); + fs::write(stage_dir.join("42_story_foo.md"), "---\nname: foo\n---").unwrap(); + fs::write(stage_dir.join("10_story_bar.md"), "---\nname: bar\n---").unwrap(); + fs::write(stage_dir.join("5_story_baz.md"), "---\nname: baz\n---").unwrap(); + // non-md file should be ignored + fs::write(stage_dir.join("README.txt"), "ignore me").unwrap(); + + let items = scan_stage_items(tmp.path(), "2_current"); + assert_eq!(items, vec!["10_story_bar", "42_story_foo", "5_story_baz"]); + } + + #[test] + fn is_story_assigned_returns_true_for_running_coder() { + let config = ProjectConfig::default(); + let pool = AgentPool::new_test(3001); + pool.inject_test_agent("42_story_foo", "coder-1", AgentStatus::Running); + + let agents = pool.agents.lock().unwrap(); + assert!(is_story_assigned_for_stage( + &config, + &agents, + "42_story_foo", + &PipelineStage::Coder + )); + // Same story but wrong stage — should be false + assert!(!is_story_assigned_for_stage( + &config, + &agents, + "42_story_foo", + &PipelineStage::Qa + )); + // Different story — should be false + assert!(!is_story_assigned_for_stage( + &config, + &agents, + "99_story_other", + &PipelineStage::Coder + )); + } + + #[test] + fn is_story_assigned_returns_false_for_completed_agent() { + let config = ProjectConfig::default(); + let pool = AgentPool::new_test(3001); + pool.inject_test_agent("42_story_foo", "coder-1", AgentStatus::Completed); + + let agents = pool.agents.lock().unwrap(); + // Completed agents don't count as assigned + assert!(!is_story_assigned_for_stage( + &config, + &agents, + "42_story_foo", + &PipelineStage::Coder + )); + } + + #[test] + fn is_story_assigned_uses_config_stage_field_for_nonstandard_names() { + let config = ProjectConfig::parse( + r#" +[[agent]] +name = "qa-2" +stage = "qa" +"#, + ) + .unwrap(); + + let pool = AgentPool::new_test(3001); + pool.inject_test_agent("42_story_foo", "qa-2", 
AgentStatus::Running); + + let agents = pool.agents.lock().unwrap(); + // qa-2 with stage=qa should be recognised as a QA agent + assert!( + is_story_assigned_for_stage(&config, &agents, "42_story_foo", &PipelineStage::Qa), + "qa-2 should be detected as assigned to QA stage" + ); + // Should NOT appear as a coder + assert!( + !is_story_assigned_for_stage(&config, &agents, "42_story_foo", &PipelineStage::Coder), + "qa-2 should not be detected as a coder" + ); + } + + #[test] + fn find_free_agent_returns_none_when_all_busy() { + let config = ProjectConfig::parse( + r#" +[[agent]] +name = "coder-1" +[[agent]] +name = "coder-2" +"#, + ) + .unwrap(); + + let pool = AgentPool::new_test(3001); + pool.inject_test_agent("s1", "coder-1", AgentStatus::Running); + pool.inject_test_agent("s2", "coder-2", AgentStatus::Running); + + let agents = pool.agents.lock().unwrap(); + let free = find_free_agent_for_stage(&config, &agents, &PipelineStage::Coder); + assert!(free.is_none(), "no free coders should be available"); + } + + #[test] + fn find_free_agent_returns_first_free_coder() { + let config = ProjectConfig::parse( + r#" +[[agent]] +name = "coder-1" +[[agent]] +name = "coder-2" +[[agent]] +name = "coder-3" +"#, + ) + .unwrap(); + + let pool = AgentPool::new_test(3001); + // coder-1 is busy, coder-2 is free + pool.inject_test_agent("s1", "coder-1", AgentStatus::Running); + + let agents = pool.agents.lock().unwrap(); + let free = find_free_agent_for_stage(&config, &agents, &PipelineStage::Coder); + assert_eq!( + free, + Some("coder-2"), + "coder-2 should be the first free coder" + ); + } + + #[test] + fn find_free_agent_ignores_completed_agents() { + let config = ProjectConfig::parse( + r#" +[[agent]] +name = "coder-1" +"#, + ) + .unwrap(); + + let pool = AgentPool::new_test(3001); + // coder-1 completed its previous story — it's free for a new one + pool.inject_test_agent("s1", "coder-1", AgentStatus::Completed); + + let agents = pool.agents.lock().unwrap(); + let free = 
find_free_agent_for_stage(&config, &agents, &PipelineStage::Coder); + assert_eq!(free, Some("coder-1"), "completed coder-1 should be free"); + } + + #[test] + fn find_free_agent_returns_none_for_wrong_stage() { + let config = ProjectConfig::parse( + r#" +[[agent]] +name = "qa" +"#, + ) + .unwrap(); + + let agents: HashMap<String, StoryAgent> = HashMap::new(); + // Looking for a Coder but only QA is configured + let free = find_free_agent_for_stage(&config, &agents, &PipelineStage::Coder); + assert!(free.is_none()); + // Looking for QA should find it + let free_qa = find_free_agent_for_stage(&config, &agents, &PipelineStage::Qa); + assert_eq!(free_qa, Some("qa")); + } + + #[test] + fn find_free_agent_uses_config_stage_field_not_name() { + // Agents named "qa-2" and "coder-opus" don't match the legacy name heuristic + // but should be picked up via their explicit stage field. + let config = ProjectConfig::parse( + r#" +[[agent]] +name = "qa-2" +stage = "qa" + +[[agent]] +name = "coder-opus" +stage = "coder" +"#, + ) + .unwrap(); + + let agents: HashMap<String, StoryAgent> = HashMap::new(); + + // qa-2 should be found for PipelineStage::Qa via config stage field + let free_qa = find_free_agent_for_stage(&config, &agents, &PipelineStage::Qa); + assert_eq!(free_qa, Some("qa-2"), "qa-2 with stage=qa should be found"); + + // coder-opus should be found for PipelineStage::Coder via config stage field + let free_coder = find_free_agent_for_stage(&config, &agents, &PipelineStage::Coder); + assert_eq!( + free_coder, + Some("coder-opus"), + "coder-opus with stage=coder should be found" + ); + + // Neither should match the other stage + let free_merge = find_free_agent_for_stage(&config, &agents, &PipelineStage::Mergemaster); + assert!(free_merge.is_none()); + } + + // ── find_free_agent_for_stage: default_coder_model filtering ───────── + + #[test] + fn find_free_agent_skips_opus_when_default_coder_model_set() { + let config = make_config( + r#" +default_coder_model = "sonnet" + +[[agent]] +name = "coder-1" +stage = 
"coder" +model = "sonnet" + +[[agent]] +name = "coder-opus" +stage = "coder" +model = "opus" +"#, + ); + + let agents = HashMap::new(); + let free = find_free_agent_for_stage(&config, &agents, &PipelineStage::Coder); + assert_eq!(free, Some("coder-1")); + } + + #[test] + fn find_free_agent_returns_opus_when_no_default_coder_model() { + let config = make_config( + r#" +[[agent]] +name = "coder-opus" +stage = "coder" +model = "opus" +"#, + ); + + let agents = HashMap::new(); + let free = find_free_agent_for_stage(&config, &agents, &PipelineStage::Coder); + assert_eq!(free, Some("coder-opus")); + } + + #[test] + fn find_free_agent_returns_none_when_all_sonnet_coders_busy() { + let config = make_config( + r#" +default_coder_model = "sonnet" + +[[agent]] +name = "coder-1" +stage = "coder" +model = "sonnet" + +[[agent]] +name = "coder-opus" +stage = "coder" +model = "opus" +"#, + ); + + let mut agents = HashMap::new(); + agents.insert( + "story1:coder-1".to_string(), + make_test_story_agent("coder-1", AgentStatus::Running), + ); + + let free = find_free_agent_for_stage(&config, &agents, &PipelineStage::Coder); + assert_eq!(free, None, "opus agent should not be auto-assigned"); + } + + // ── find_free_agent_for_stage: max_coders limit ───────────────────── + + #[test] + fn find_free_agent_respects_max_coders() { + let config = make_config( + r#" +max_coders = 1 + +[[agent]] +name = "coder-1" +stage = "coder" +model = "sonnet" + +[[agent]] +name = "coder-2" +stage = "coder" +model = "sonnet" +"#, + ); + + let mut agents = HashMap::new(); + agents.insert( + "story1:coder-1".to_string(), + make_test_story_agent("coder-1", AgentStatus::Running), + ); + + let free = find_free_agent_for_stage(&config, &agents, &PipelineStage::Coder); + assert_eq!(free, None, "max_coders=1 should block second coder"); + } + + #[test] + fn find_free_agent_allows_within_max_coders() { + let config = make_config( + r#" +max_coders = 2 + +[[agent]] +name = "coder-1" +stage = "coder" +model = 
"sonnet" + +[[agent]] +name = "coder-2" +stage = "coder" +model = "sonnet" +"#, + ); + + let mut agents = HashMap::new(); + agents.insert( + "story1:coder-1".to_string(), + make_test_story_agent("coder-1", AgentStatus::Running), + ); + + let free = find_free_agent_for_stage(&config, &agents, &PipelineStage::Coder); + assert_eq!(free, Some("coder-2")); + } + + #[test] + fn max_coders_does_not_affect_qa_stage() { + let config = make_config( + r#" +max_coders = 1 + +[[agent]] +name = "qa" +stage = "qa" +model = "sonnet" +"#, + ); + + let agents = HashMap::new(); + let free = find_free_agent_for_stage(&config, &agents, &PipelineStage::Qa); + assert_eq!(free, Some("qa")); + } + + // ── count_active_agents_for_stage ──────────────────────────────────── + + #[test] + fn count_active_agents_counts_running_and_pending() { + let config = make_config( + r#" +[[agent]] +name = "coder-1" +stage = "coder" + +[[agent]] +name = "coder-2" +stage = "coder" +"#, + ); + + let mut agents = HashMap::new(); + agents.insert( + "s1:coder-1".to_string(), + make_test_story_agent("coder-1", AgentStatus::Running), + ); + agents.insert( + "s2:coder-2".to_string(), + make_test_story_agent("coder-2", AgentStatus::Completed), + ); + + let count = count_active_agents_for_stage(&config, &agents, &PipelineStage::Coder); + assert_eq!(count, 1, "Only Running coder should be counted, not Completed"); + } + +} diff --git a/server/src/agents/pool/auto_assign/story_checks.rs b/server/src/agents/pool/auto_assign/story_checks.rs new file mode 100644 index 00000000..f5133f2b --- /dev/null +++ b/server/src/agents/pool/auto_assign/story_checks.rs @@ -0,0 +1,113 @@ +//! Front-matter checks for story files: review holds, blocked state, and merge failures. + +use std::path::Path; + +/// Read the optional `agent:` field from the front matter of a story file. +/// +/// Returns `Some(agent_name)` if the front matter specifies an agent, or `None` +/// if the field is absent or the file cannot be read / parsed. 
+pub(super) fn read_story_front_matter_agent( + project_root: &Path, + stage_dir: &str, + story_id: &str, +) -> Option { + use crate::io::story_metadata::parse_front_matter; + let path = project_root + .join(".storkit") + .join("work") + .join(stage_dir) + .join(format!("{story_id}.md")); + let contents = std::fs::read_to_string(path).ok()?; + parse_front_matter(&contents).ok()?.agent +} + +/// Return `true` if the story file in the given stage has `review_hold: true` in its front matter. +pub(super) fn has_review_hold(project_root: &Path, stage_dir: &str, story_id: &str) -> bool { + use crate::io::story_metadata::parse_front_matter; + let path = project_root + .join(".storkit") + .join("work") + .join(stage_dir) + .join(format!("{story_id}.md")); + let contents = match std::fs::read_to_string(path) { + Ok(c) => c, + Err(_) => return false, + }; + parse_front_matter(&contents) + .ok() + .and_then(|m| m.review_hold) + .unwrap_or(false) +} + +/// Return `true` if the story file has `blocked: true` in its front matter. +pub(super) fn is_story_blocked(project_root: &Path, stage_dir: &str, story_id: &str) -> bool { + use crate::io::story_metadata::parse_front_matter; + let path = project_root + .join(".storkit") + .join("work") + .join(stage_dir) + .join(format!("{story_id}.md")); + let contents = match std::fs::read_to_string(path) { + Ok(c) => c, + Err(_) => return false, + }; + parse_front_matter(&contents) + .ok() + .and_then(|m| m.blocked) + .unwrap_or(false) +} + +/// Return `true` if the story file has a `merge_failure` field in its front matter. 
+pub(super) fn has_merge_failure(project_root: &Path, stage_dir: &str, story_id: &str) -> bool { + use crate::io::story_metadata::parse_front_matter; + let path = project_root + .join(".storkit") + .join("work") + .join(stage_dir) + .join(format!("{story_id}.md")); + let contents = match std::fs::read_to_string(path) { + Ok(c) => c, + Err(_) => return false, + }; + parse_front_matter(&contents) + .ok() + .and_then(|m| m.merge_failure) + .is_some() +} + +// ── Tests ────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn has_review_hold_returns_true_when_set() { + let tmp = tempfile::tempdir().unwrap(); + let qa_dir = tmp.path().join(".storkit/work/3_qa"); + std::fs::create_dir_all(&qa_dir).unwrap(); + let spike_path = qa_dir.join("10_spike_research.md"); + std::fs::write( + &spike_path, + "---\nname: Research spike\nreview_hold: true\n---\n# Spike\n", + ) + .unwrap(); + assert!(has_review_hold(tmp.path(), "3_qa", "10_spike_research")); + } + + #[test] + fn has_review_hold_returns_false_when_not_set() { + let tmp = tempfile::tempdir().unwrap(); + let qa_dir = tmp.path().join(".storkit/work/3_qa"); + std::fs::create_dir_all(&qa_dir).unwrap(); + let spike_path = qa_dir.join("10_spike_research.md"); + std::fs::write(&spike_path, "---\nname: Research spike\n---\n# Spike\n").unwrap(); + assert!(!has_review_hold(tmp.path(), "3_qa", "10_spike_research")); + } + + #[test] + fn has_review_hold_returns_false_when_file_missing() { + let tmp = tempfile::tempdir().unwrap(); + assert!(!has_review_hold(tmp.path(), "3_qa", "99_spike_missing")); + } +} diff --git a/server/src/agents/pool/auto_assign/watchdog.rs b/server/src/agents/pool/auto_assign/watchdog.rs new file mode 100644 index 00000000..c374c6ba --- /dev/null +++ b/server/src/agents/pool/auto_assign/watchdog.rs @@ -0,0 +1,220 @@ +//! Watchdog task: detects orphaned agents and triggers auto-assign. 
+ +use std::collections::HashMap; +use std::path::PathBuf; +use std::sync::{Arc, Mutex}; +use tokio::sync::broadcast; + +use crate::slog; + +use super::super::super::{AgentEvent, AgentStatus}; +use super::super::{AgentPool, StoryAgent}; + +/// Scan the agent pool for Running entries whose backing tokio task has already +/// finished and mark them as Failed. +/// +/// This handles the case where the PTY read loop or the spawned task exits +/// without updating the agent status — for example when the process is killed +/// externally and the PTY master fd returns EOF before our inactivity timeout +/// fires, but some other edge case prevents the normal cleanup path from running. +pub(super) fn check_orphaned_agents(agents: &Mutex>) -> usize { + let mut lock = match agents.lock() { + Ok(l) => l, + Err(_) => return 0, + }; + + // Collect orphaned entries: Running or Pending agents whose task handle is finished. + // Pending agents can be orphaned if worktree creation panics before setting status. + let orphaned: Vec<(String, String, broadcast::Sender, AgentStatus)> = lock + .iter() + .filter_map(|(key, agent)| { + if matches!(agent.status, AgentStatus::Running | AgentStatus::Pending) + && let Some(handle) = &agent.task_handle + && handle.is_finished() + { + let story_id = key + .rsplit_once(':') + .map(|(s, _)| s.to_string()) + .unwrap_or_else(|| key.clone()); + return Some(( + key.clone(), + story_id, + agent.tx.clone(), + agent.status.clone(), + )); + } + None + }) + .collect(); + + let count = orphaned.len(); + for (key, story_id, tx, prev_status) in orphaned { + if let Some(agent) = lock.get_mut(&key) { + agent.status = AgentStatus::Failed; + slog!( + "[watchdog] Orphaned agent '{key}': task finished but status was {prev_status}. \ + Marking Failed." 
+ ); + let _ = tx.send(AgentEvent::Error { + story_id, + agent_name: agent.agent_name.clone(), + message: "Agent process terminated unexpectedly (watchdog detected orphan)" + .to_string(), + }); + } + } + count +} + +impl AgentPool { + /// Run a single watchdog pass synchronously (test helper). + #[cfg(test)] + pub fn run_watchdog_once(&self) { + check_orphaned_agents(&self.agents); + } + + /// Spawn a background watchdog task that periodically checks for Running agents + /// whose underlying task has already finished (orphaned entries). Any such agent + /// is marked Failed and an Error event is emitted so that `wait_for_agent` unblocks. + /// + /// The watchdog runs every 30 seconds. It is a safety net for edge cases where the + /// PTY read loop exits without updating the agent status (e.g. a panic in the + /// spawn_blocking task, or an external SIGKILL that closes the PTY fd immediately). + /// + /// When orphaned agents are detected and a `project_root` is provided, auto-assign + /// is triggered so that free agents can pick up unassigned work. + pub fn spawn_watchdog(pool: Arc, project_root: Option) { + tokio::spawn(async move { + let mut interval = tokio::time::interval(std::time::Duration::from_secs(30)); + loop { + interval.tick().await; + let found = check_orphaned_agents(&pool.agents); + if found > 0 + && let Some(ref root) = project_root + { + slog!("[watchdog] {found} orphaned agent(s) detected; triggering auto-assign."); + pool.auto_assign_available_work(root).await; + } + } + }); + } +} + +// ── Tests ────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + use super::super::super::{AgentPool, composite_key}; + + // ── check_orphaned_agents return value tests (bug 161) ────────────────── + + #[tokio::test] + async fn check_orphaned_agents_returns_count_of_orphaned_agents() { + let pool = AgentPool::new_test(3001); + + // Spawn two tasks that finish immediately. 
+ let h1 = tokio::spawn(async {}); + let h2 = tokio::spawn(async {}); + tokio::time::sleep(std::time::Duration::from_millis(20)).await; + assert!(h1.is_finished()); + assert!(h2.is_finished()); + + pool.inject_test_agent_with_handle("story_a", "coder", AgentStatus::Running, h1); + pool.inject_test_agent_with_handle("story_b", "coder", AgentStatus::Running, h2); + + let found = check_orphaned_agents(&pool.agents); + assert_eq!(found, 2, "should detect both orphaned agents"); + } + + #[test] + fn check_orphaned_agents_returns_zero_when_no_orphans() { + let pool = AgentPool::new_test(3001); + // Inject agents in terminal states — not orphaned. + pool.inject_test_agent("story_a", "coder", AgentStatus::Completed); + pool.inject_test_agent("story_b", "qa", AgentStatus::Failed); + + let found = check_orphaned_agents(&pool.agents); + assert_eq!( + found, 0, + "no orphans should be detected for terminal agents" + ); + } + + #[tokio::test] + async fn watchdog_detects_orphaned_running_agent() { + let pool = AgentPool::new_test(3001); + + let handle = tokio::spawn(async {}); + tokio::time::sleep(std::time::Duration::from_millis(20)).await; + assert!( + handle.is_finished(), + "task should be finished before injection" + ); + + let tx = pool.inject_test_agent_with_handle( + "orphan_story", + "coder", + AgentStatus::Running, + handle, + ); + let mut rx = tx.subscribe(); + + pool.run_watchdog_once(); + + { + let agents = pool.agents.lock().unwrap(); + let key = composite_key("orphan_story", "coder"); + let agent = agents.get(&key).unwrap(); + assert_eq!( + agent.status, + AgentStatus::Failed, + "watchdog must mark an orphaned Running agent as Failed" + ); + } + + let event = rx.try_recv().expect("watchdog must emit an Error event"); + assert!( + matches!(event, AgentEvent::Error { .. 
}), + "expected AgentEvent::Error, got: {event:?}" + ); + } + + #[tokio::test] + async fn watchdog_orphan_detection_returns_nonzero_enabling_auto_assign() { + // This test verifies the contract that `check_orphaned_agents` returns + // a non-zero count when orphans exist, which the watchdog uses to + // decide whether to trigger auto-assign (bug 161). + let pool = AgentPool::new_test(3001); + + let handle = tokio::spawn(async {}); + tokio::time::sleep(std::time::Duration::from_millis(20)).await; + + pool.inject_test_agent_with_handle("orphan_story", "coder", AgentStatus::Running, handle); + + // Before watchdog: agent is Running. + { + let agents = pool.agents.lock().unwrap(); + let key = composite_key("orphan_story", "coder"); + assert_eq!(agents.get(&key).unwrap().status, AgentStatus::Running); + } + + // Run watchdog pass — should return 1 (orphan found). + let found = check_orphaned_agents(&pool.agents); + assert_eq!( + found, 1, + "watchdog must return 1 for a single orphaned agent" + ); + + // After watchdog: agent is Failed. + { + let agents = pool.agents.lock().unwrap(); + let key = composite_key("orphan_story", "coder"); + assert_eq!( + agents.get(&key).unwrap().status, + AgentStatus::Failed, + "orphaned agent must be marked Failed" + ); + } + } +}