mod auto_assign; mod pipeline; use crate::agent_log::AgentLogWriter; use crate::config::ProjectConfig; use crate::io::watcher::WatcherEvent; use crate::slog; use crate::slog_error; use crate::worktree::{self, WorktreeInfo}; use portable_pty::ChildKiller; use std::collections::HashMap; use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; use tokio::sync::broadcast; use super::{ AgentEvent, AgentInfo, AgentStatus, CompletionReport, PipelineStage, agent_config_stage, pipeline_stage, }; use super::runtime::{AgentRuntime, ClaudeCodeRuntime, GeminiRuntime, OpenAiRuntime, RuntimeContext}; /// Build the composite key used to track agents in the pool. fn composite_key(story_id: &str, agent_name: &str) -> String { format!("{story_id}:{agent_name}") } /// RAII guard that removes a pending agent entry from the pool on drop. /// /// Created after inserting a `Pending` entry into the agent HashMap. /// If `start_agent` succeeds (the agent process is spawned and status /// transitions to `Running`), call [`disarm`](Self::disarm) to prevent /// cleanup. If any intermediate step fails and the guard is dropped /// without being disarmed, the pending entry is removed so it cannot /// block future auto-assign dispatches. struct PendingGuard { agents: Arc>>, key: String, armed: bool, } impl PendingGuard { fn new(agents: Arc>>, key: String) -> Self { Self { agents, key, armed: true, } } /// Prevent the guard from cleaning up the entry (call after /// successful spawn). 
fn disarm(&mut self) { self.armed = false; } } impl Drop for PendingGuard { fn drop(&mut self) { if self.armed && let Ok(mut agents) = self.agents.lock() && agents .get(&self.key) .is_some_and(|a| a.status == AgentStatus::Pending) { agents.remove(&self.key); slog!( "[agents] Cleaned up leaked Pending entry for '{}'", self.key ); } } } struct StoryAgent { agent_name: String, status: AgentStatus, worktree_info: Option, session_id: Option, tx: broadcast::Sender, task_handle: Option>, /// Accumulated events for polling via get_agent_output. event_log: Arc>>, /// Set when the agent calls report_completion. completion: Option, /// Project root, stored for pipeline advancement after completion. project_root: Option, /// UUID identifying the log file for this session. log_session_id: Option, /// Set to `true` when the agent calls `report_merge_failure`. /// Prevents the pipeline from blindly advancing to `5_done/` after a /// failed merge: the server-owned gate check runs in the feature-branch /// worktree (which compiles fine) and returns `gates_passed=true` even /// though the code was never squash-merged onto master. merge_failure_reported: bool, } /// Build an `AgentInfo` snapshot from a `StoryAgent` map entry. fn agent_info_from_entry(story_id: &str, agent: &StoryAgent) -> AgentInfo { AgentInfo { story_id: story_id.to_string(), agent_name: agent.agent_name.clone(), status: agent.status.clone(), session_id: agent.session_id.clone(), worktree_path: agent .worktree_info .as_ref() .map(|wt| wt.path.to_string_lossy().to_string()), base_branch: agent .worktree_info .as_ref() .map(|wt| wt.base_branch.clone()), completion: agent.completion.clone(), log_session_id: agent.log_session_id.clone(), } } /// Manages concurrent story agents, each in its own worktree. pub struct AgentPool { agents: Arc>>, port: u16, /// Registry of active PTY child process killers, keyed by "{story_id}:{agent_name}". 
/// Used to terminate child processes on server shutdown or agent stop, preventing /// orphaned Claude Code processes from running after the server exits. child_killers: Arc>>>, /// Broadcast channel for notifying WebSocket clients of agent state changes. /// When an agent transitions state (Pending, Running, Completed, Failed, Stopped), /// an `AgentStateChanged` event is emitted so the frontend can refresh the /// pipeline board without waiting for a filesystem event. watcher_tx: broadcast::Sender, /// Tracks background merge jobs started by `merge_agent_work`, keyed by story_id. /// The MCP tool returns immediately and the mergemaster agent polls /// `get_merge_status` until the job reaches a terminal state. merge_jobs: Arc>>, } impl AgentPool { pub fn new(port: u16, watcher_tx: broadcast::Sender) -> Self { Self { agents: Arc::new(Mutex::new(HashMap::new())), port, child_killers: Arc::new(Mutex::new(HashMap::new())), watcher_tx, merge_jobs: Arc::new(Mutex::new(HashMap::new())), } } /// Create a pool with a dummy watcher channel for unit tests. #[cfg(test)] pub fn new_test(port: u16) -> Self { let (watcher_tx, _) = broadcast::channel(16); Self::new(port, watcher_tx) } /// Notify WebSocket clients that agent state has changed, so the pipeline /// board and agent panel can refresh. fn notify_agent_state_changed(watcher_tx: &broadcast::Sender) { let _ = watcher_tx.send(WatcherEvent::AgentStateChanged); } /// Kill all active PTY child processes. /// /// Called on server shutdown to prevent orphaned Claude Code processes from /// continuing to run after the server exits. Each registered killer is called /// once, then the registry is cleared. pub fn kill_all_children(&self) { if let Ok(mut killers) = self.child_killers.lock() { for (key, killer) in killers.iter_mut() { slog!("[agents] Killing child process for {key} on shutdown"); let _ = killer.kill(); } killers.clear(); } } /// Kill and deregister the child process for a specific agent key. 
/// /// Used by `stop_agent` to ensure the PTY child is terminated even though /// aborting a `spawn_blocking` task handle does not interrupt the blocking thread. fn kill_child_for_key(&self, key: &str) { if let Ok(mut killers) = self.child_killers.lock() && let Some(mut killer) = killers.remove(key) { slog!("[agents] Killing child process for {key} on stop"); let _ = killer.kill(); } } /// Start an agent for a story: load config, create worktree, spawn agent. /// /// When `agent_name` is `None`, automatically selects the first idle coder /// agent (story 190). If all coders are busy the call fails with an error /// indicating the story will be picked up when one becomes available. /// /// If `resume_context` is provided, it is appended to the rendered prompt /// so the agent can pick up from a previous failed attempt. pub async fn start_agent( &self, project_root: &Path, story_id: &str, agent_name: Option<&str>, resume_context: Option<&str>, ) -> Result { let config = ProjectConfig::load(project_root)?; // Validate explicit agent name early (no lock needed). if let Some(name) = agent_name { config .find_agent(name) .ok_or_else(|| format!("No agent named '{name}' in config"))?; } // Create name-independent shared resources before the lock so they are // ready for the atomic check-and-insert (story 132). let (tx, _) = broadcast::channel::(1024); let event_log: Arc>> = Arc::new(Mutex::new(Vec::new())); let log_session_id = uuid::Uuid::new_v4().to_string(); // Move story from backlog/ to current/ before checking agent // availability so that auto_assign_available_work can pick it up even // when all coders are currently busy (story 203). This is idempotent: // if the story is already in 2_current/ or a later stage, the call is // a no-op. super::lifecycle::move_story_to_current(project_root, story_id)?; // Validate that the agent's configured stage matches the story's // pipeline stage. 
This prevents any caller (auto-assign, MCP tool, // pipeline advance, supervisor) from starting a wrong-stage agent on // a story — e.g. mergemaster on a coding-stage story (bug 312). if let Some(name) = agent_name { let agent_stage = config .find_agent(name) .map(agent_config_stage) .unwrap_or_else(|| pipeline_stage(name)); if agent_stage != PipelineStage::Other && let Some(story_stage_dir) = find_active_story_stage(project_root, story_id) { let expected_stage = match story_stage_dir { "2_current" => PipelineStage::Coder, "3_qa" => PipelineStage::Qa, "4_merge" => PipelineStage::Mergemaster, _ => PipelineStage::Other, }; if expected_stage != PipelineStage::Other && expected_stage != agent_stage { return Err(format!( "Agent '{name}' (stage: {agent_stage:?}) cannot be assigned to \ story '{story_id}' in {story_stage_dir}/ (requires stage: {expected_stage:?})" )); } } } // Atomically resolve agent name, check availability, and register as // Pending. When `agent_name` is `None` the first idle coder is // selected inside the lock so no TOCTOU race can occur between the // availability check and the Pending insert (story 132, story 190). // // The `PendingGuard` ensures that if any step below fails the entry is // removed from the pool so it does not permanently block auto-assign // (bug 118). let resolved_name: String; let key: String; { let mut agents = self.agents.lock().map_err(|e| e.to_string())?; resolved_name = match agent_name { Some(name) => name.to_string(), None => auto_assign::find_free_agent_for_stage(&config, &agents, &PipelineStage::Coder) .map(|s| s.to_string()) .ok_or_else(|| { if config .agent .iter() .any(|a| agent_config_stage(a) == PipelineStage::Coder) { format!( "All coder agents are busy; story '{story_id}' has been \ queued in work/2_current/ and will be auto-assigned when \ one becomes available" ) } else { "No coder agent configured. Specify an agent_name explicitly." 
.to_string() } })?, }; key = composite_key(story_id, &resolved_name); // Check for duplicate assignment (same story + same agent already active). if let Some(agent) = agents.get(&key) && (agent.status == AgentStatus::Running || agent.status == AgentStatus::Pending) { return Err(format!( "Agent '{resolved_name}' for story '{story_id}' is already {}", agent.status )); } // Enforce single-stage concurrency: reject if there is already a // Running/Pending agent at the same pipeline stage for this story. // This prevents two coders (or two QA/mergemaster agents) from // corrupting each other's work in the same worktree. // Applies to both explicit and auto-selected agents; the Other // stage (supervisors, unknown agents) is exempt. let resolved_stage = config .find_agent(&resolved_name) .map(agent_config_stage) .unwrap_or_else(|| pipeline_stage(&resolved_name)); if resolved_stage != PipelineStage::Other && let Some(conflicting_name) = agents.iter().find_map(|(k, a)| { let k_story = k.rsplit_once(':').map(|(s, _)| s).unwrap_or(k); if k_story == story_id && a.agent_name != resolved_name && matches!(a.status, AgentStatus::Running | AgentStatus::Pending) { let a_stage = config .find_agent(&a.agent_name) .map(agent_config_stage) .unwrap_or_else(|| pipeline_stage(&a.agent_name)); if a_stage == resolved_stage { Some(a.agent_name.clone()) } else { None } } else { None } }) { return Err(format!( "Cannot start '{resolved_name}' on story '{story_id}': \ '{conflicting_name}' is already active at the same pipeline stage" )); } // Enforce single-instance concurrency for explicitly-named agents: // if this agent is already running on any other story, reject. // Auto-selected agents are already guaranteed idle by // find_free_agent_for_stage, so this check is only needed for // explicit requests. 
if agent_name.is_some() && let Some(busy_story) = agents.iter().find_map(|(k, a)| { if a.agent_name == resolved_name && matches!(a.status, AgentStatus::Running | AgentStatus::Pending) { Some( k.rsplit_once(':') .map(|(sid, _)| sid) .unwrap_or(k) .to_string(), ) } else { None } }) { return Err(format!( "Agent '{resolved_name}' is already running on story '{busy_story}'; \ story '{story_id}' will be picked up when the agent becomes available" )); } agents.insert( key.clone(), StoryAgent { agent_name: resolved_name.clone(), status: AgentStatus::Pending, worktree_info: None, session_id: None, tx: tx.clone(), task_handle: None, event_log: event_log.clone(), completion: None, project_root: Some(project_root.to_path_buf()), log_session_id: Some(log_session_id.clone()), merge_failure_reported: false, }, ); } let mut pending_guard = PendingGuard::new(self.agents.clone(), key.clone()); // Create persistent log writer (needs resolved_name, so must be after // the atomic resolution above). let log_writer = match AgentLogWriter::new(project_root, story_id, &resolved_name, &log_session_id) { Ok(w) => Some(Arc::new(Mutex::new(w))), Err(e) => { eprintln!( "[agents] Failed to create log writer for {story_id}:{resolved_name}: {e}" ); None } }; // Notify WebSocket clients that a new agent is pending. Self::notify_agent_state_changed(&self.watcher_tx); let _ = tx.send(AgentEvent::Status { story_id: story_id.to_string(), agent_name: resolved_name.clone(), status: "pending".to_string(), }); // Extract inactivity timeout from the agent config before cloning config. let inactivity_timeout_secs = config .find_agent(&resolved_name) .map(|a| a.inactivity_timeout_secs) .unwrap_or(300); // Clone all values needed inside the background spawn. 
let project_root_clone = project_root.to_path_buf(); let config_clone = config.clone(); let resume_context_owned = resume_context.map(str::to_string); let sid = story_id.to_string(); let aname = resolved_name.clone(); let tx_clone = tx.clone(); let agents_ref = self.agents.clone(); let key_clone = key.clone(); let log_clone = event_log.clone(); let port_for_task = self.port; let log_writer_clone = log_writer.clone(); let child_killers_clone = self.child_killers.clone(); let watcher_tx_clone = self.watcher_tx.clone(); // Spawn the background task. Worktree creation and agent launch happen here // so `start_agent` returns immediately after registering the agent as // Pending — non-blocking by design (story 157). let handle = tokio::spawn(async move { // Step 1: create the worktree (slow — git checkout, pnpm install, etc.) let wt_info = match worktree::create_worktree( &project_root_clone, &sid, &config_clone, port_for_task, ) .await { Ok(wt) => wt, Err(e) => { let error_msg = format!("Failed to create worktree: {e}"); slog_error!("[agents] {error_msg}"); let event = AgentEvent::Error { story_id: sid.clone(), agent_name: aname.clone(), message: error_msg, }; if let Ok(mut log) = log_clone.lock() { log.push(event.clone()); } let _ = tx_clone.send(event); if let Ok(mut agents) = agents_ref.lock() && let Some(agent) = agents.get_mut(&key_clone) { agent.status = AgentStatus::Failed; } Self::notify_agent_state_changed(&watcher_tx_clone); return; } }; // Step 2: store worktree info and render agent command/args/prompt. 
let wt_path_str = wt_info.path.to_string_lossy().to_string(); { if let Ok(mut agents) = agents_ref.lock() && let Some(agent) = agents.get_mut(&key_clone) { agent.worktree_info = Some(wt_info.clone()); } } let (command, args, mut prompt) = match config_clone.render_agent_args( &wt_path_str, &sid, Some(&aname), Some(&wt_info.base_branch), ) { Ok(result) => result, Err(e) => { let error_msg = format!("Failed to render agent args: {e}"); slog_error!("[agents] {error_msg}"); let event = AgentEvent::Error { story_id: sid.clone(), agent_name: aname.clone(), message: error_msg, }; if let Ok(mut log) = log_clone.lock() { log.push(event.clone()); } let _ = tx_clone.send(event); if let Ok(mut agents) = agents_ref.lock() && let Some(agent) = agents.get_mut(&key_clone) { agent.status = AgentStatus::Failed; } Self::notify_agent_state_changed(&watcher_tx_clone); return; } }; // Append resume context if this is a restart with failure information. if let Some(ctx) = resume_context_owned { prompt.push_str(&ctx); } // Step 3: transition to Running now that the worktree is ready. { if let Ok(mut agents) = agents_ref.lock() && let Some(agent) = agents.get_mut(&key_clone) { agent.status = AgentStatus::Running; } } let _ = tx_clone.send(AgentEvent::Status { story_id: sid.clone(), agent_name: aname.clone(), status: "running".to_string(), }); Self::notify_agent_state_changed(&watcher_tx_clone); // Step 4: launch the agent process via the configured runtime. 
let runtime_name = config_clone .find_agent(&aname) .and_then(|a| a.runtime.as_deref()) .unwrap_or("claude-code"); let run_result = match runtime_name { "claude-code" => { let runtime = ClaudeCodeRuntime::new(child_killers_clone.clone()); let ctx = RuntimeContext { story_id: sid.clone(), agent_name: aname.clone(), command, args, prompt, cwd: wt_path_str, inactivity_timeout_secs, mcp_port: port_for_task, }; runtime .start(ctx, tx_clone.clone(), log_clone.clone(), log_writer_clone) .await } "gemini" => { let runtime = GeminiRuntime::new(); let ctx = RuntimeContext { story_id: sid.clone(), agent_name: aname.clone(), command, args, prompt, cwd: wt_path_str, inactivity_timeout_secs, mcp_port: port_for_task, }; runtime .start(ctx, tx_clone.clone(), log_clone.clone(), log_writer_clone) .await } "openai" => { let runtime = OpenAiRuntime::new(); let ctx = RuntimeContext { story_id: sid.clone(), agent_name: aname.clone(), command, args, prompt, cwd: wt_path_str, inactivity_timeout_secs, mcp_port: port_for_task, }; runtime .start(ctx, tx_clone.clone(), log_clone.clone(), log_writer_clone) .await } other => Err(format!( "Unknown agent runtime '{other}'; check the 'runtime' field in project.toml. \ Supported: 'claude-code', 'gemini', 'openai'" )), }; match run_result { Ok(result) => { // Persist token usage if the agent reported it. if let Some(ref usage) = result.token_usage && let Ok(agents) = agents_ref.lock() && let Some(agent) = agents.get(&key_clone) && let Some(ref pr) = agent.project_root { let model = config_clone .find_agent(&aname) .and_then(|a| a.model.clone()); let record = super::token_usage::build_record( &sid, &aname, model, usage.clone(), ); if let Err(e) = super::token_usage::append_record(pr, &record) { slog_error!( "[agents] Failed to persist token usage for \ {sid}:{aname}: {e}" ); } } // Server-owned completion: run acceptance gates automatically // when the agent process exits normally. 
pipeline::run_server_owned_completion( &agents_ref, port_for_task, &sid, &aname, result.session_id, watcher_tx_clone.clone(), ) .await; Self::notify_agent_state_changed(&watcher_tx_clone); } Err(e) => { slog_error!("[agents] Agent process error for {aname} on {sid}: {e}"); let event = AgentEvent::Error { story_id: sid.clone(), agent_name: aname.clone(), message: e, }; if let Ok(mut log) = log_clone.lock() { log.push(event.clone()); } let _ = tx_clone.send(event); if let Ok(mut agents) = agents_ref.lock() && let Some(agent) = agents.get_mut(&key_clone) { agent.status = AgentStatus::Failed; } Self::notify_agent_state_changed(&watcher_tx_clone); } } }); // Store the task handle while the agent is still Pending. { let mut agents = self.agents.lock().map_err(|e| e.to_string())?; if let Some(agent) = agents.get_mut(&key) { agent.task_handle = Some(handle); } } // Agent successfully spawned — prevent the guard from removing the entry. pending_guard.disarm(); Ok(AgentInfo { story_id: story_id.to_string(), agent_name: resolved_name, status: AgentStatus::Pending, session_id: None, worktree_path: None, base_branch: None, completion: None, log_session_id: Some(log_session_id), }) } /// Stop a running agent. Worktree is preserved for inspection. pub async fn stop_agent( &self, _project_root: &Path, story_id: &str, agent_name: &str, ) -> Result<(), String> { let key = composite_key(story_id, agent_name); let (worktree_info, task_handle, tx) = { let mut agents = self.agents.lock().map_err(|e| e.to_string())?; let agent = agents .get_mut(&key) .ok_or_else(|| format!("No agent '{agent_name}' for story '{story_id}'"))?; let wt = agent.worktree_info.clone(); let handle = agent.task_handle.take(); let tx = agent.tx.clone(); agent.status = AgentStatus::Failed; (wt, handle, tx) }; // Abort the task and kill the PTY child process. 
// Note: aborting a spawn_blocking task handle does not interrupt the blocking // thread, so we must also kill the child process directly via the killer registry. if let Some(handle) = task_handle { handle.abort(); let _ = handle.await; } self.kill_child_for_key(&key); // Preserve worktree for inspection — don't destroy agent's work on stop. if let Some(ref wt) = worktree_info { slog!( "[agents] Worktree preserved for {story_id}:{agent_name}: {}", wt.path.display() ); } let _ = tx.send(AgentEvent::Status { story_id: story_id.to_string(), agent_name: agent_name.to_string(), status: "stopped".to_string(), }); // Remove from map { let mut agents = self.agents.lock().map_err(|e| e.to_string())?; agents.remove(&key); } // Notify WebSocket clients so pipeline board and agent panel update. Self::notify_agent_state_changed(&self.watcher_tx); Ok(()) } /// Return the names of configured agents for `stage` that are not currently /// running or pending. pub fn available_agents_for_stage( &self, config: &ProjectConfig, stage: &PipelineStage, ) -> Result, String> { let agents = self.agents.lock().map_err(|e| e.to_string())?; Ok(config .agent .iter() .filter(|cfg| agent_config_stage(cfg) == *stage) .filter(|cfg| { !agents.values().any(|a| { a.agent_name == cfg.name && matches!(a.status, AgentStatus::Running | AgentStatus::Pending) }) }) .map(|cfg| cfg.name.clone()) .collect()) } /// List all agents with their status. pub fn list_agents(&self) -> Result, String> { let agents = self.agents.lock().map_err(|e| e.to_string())?; Ok(agents .iter() .map(|(key, agent)| { // Extract story_id from composite key "story_id:agent_name" let story_id = key .rsplit_once(':') .map(|(sid, _)| sid.to_string()) .unwrap_or_else(|| key.clone()); agent_info_from_entry(&story_id, agent) }) .collect()) } /// Subscribe to events for a story agent. 
pub fn subscribe( &self, story_id: &str, agent_name: &str, ) -> Result, String> { let key = composite_key(story_id, agent_name); let agents = self.agents.lock().map_err(|e| e.to_string())?; let agent = agents .get(&key) .ok_or_else(|| format!("No agent '{agent_name}' for story '{story_id}'"))?; Ok(agent.tx.subscribe()) } /// Drain accumulated events for polling. Returns all events since the last drain. pub fn drain_events( &self, story_id: &str, agent_name: &str, ) -> Result, String> { let key = composite_key(story_id, agent_name); let agents = self.agents.lock().map_err(|e| e.to_string())?; let agent = agents .get(&key) .ok_or_else(|| format!("No agent '{agent_name}' for story '{story_id}'"))?; let mut log = agent.event_log.lock().map_err(|e| e.to_string())?; Ok(log.drain(..).collect()) } /// Block until the agent reaches a terminal state (completed, failed, stopped). /// Returns the agent's final `AgentInfo`. /// `timeout_ms` caps how long to wait; returns an error if the deadline passes. pub async fn wait_for_agent( &self, story_id: &str, agent_name: &str, timeout_ms: u64, ) -> Result { // Subscribe before checking status so we don't miss the terminal event // if the agent completes in the window between the two operations. let mut rx = self.subscribe(story_id, agent_name)?; // Return immediately if already in a terminal state. 
{ let agents = self.agents.lock().map_err(|e| e.to_string())?; let key = composite_key(story_id, agent_name); if let Some(agent) = agents.get(&key) && matches!(agent.status, AgentStatus::Completed | AgentStatus::Failed) { return Ok(agent_info_from_entry(story_id, agent)); } } let deadline = tokio::time::Instant::now() + std::time::Duration::from_millis(timeout_ms); loop { let remaining = deadline.saturating_duration_since(tokio::time::Instant::now()); if remaining.is_zero() { return Err(format!( "Timed out after {timeout_ms}ms waiting for agent '{agent_name}' on story '{story_id}'" )); } match tokio::time::timeout(remaining, rx.recv()).await { Ok(Ok(event)) => { let is_terminal = match &event { AgentEvent::Done { .. } | AgentEvent::Error { .. } => true, AgentEvent::Status { status, .. } if status == "stopped" => true, _ => false, }; if is_terminal { let agents = self.agents.lock().map_err(|e| e.to_string())?; let key = composite_key(story_id, agent_name); return Ok(if let Some(agent) = agents.get(&key) { agent_info_from_entry(story_id, agent) } else { // Agent was removed from map (e.g. stop_agent removes it after // the "stopped" status event is sent). let (status, session_id) = match event { AgentEvent::Done { session_id, .. } => { (AgentStatus::Completed, session_id) } _ => (AgentStatus::Failed, None), }; AgentInfo { story_id: story_id.to_string(), agent_name: agent_name.to_string(), status, session_id, worktree_path: None, base_branch: None, completion: None, log_session_id: None, } }); } } Ok(Err(broadcast::error::RecvError::Lagged(_))) => { // Missed some buffered events — check current status before resuming. let agents = self.agents.lock().map_err(|e| e.to_string())?; let key = composite_key(story_id, agent_name); if let Some(agent) = agents.get(&key) && matches!(agent.status, AgentStatus::Completed | AgentStatus::Failed) { return Ok(agent_info_from_entry(story_id, agent)); } // Still running — continue the loop. 
} Ok(Err(broadcast::error::RecvError::Closed)) => { // Channel closed: no more events will arrive. Return current state. let agents = self.agents.lock().map_err(|e| e.to_string())?; let key = composite_key(story_id, agent_name); if let Some(agent) = agents.get(&key) { return Ok(agent_info_from_entry(story_id, agent)); } return Err(format!( "Agent '{agent_name}' for story '{story_id}' channel closed unexpectedly" )); } Err(_) => { return Err(format!( "Timed out after {timeout_ms}ms waiting for agent '{agent_name}' on story '{story_id}'" )); } } } } /// Create a worktree for the given story using the server port (writes .mcp.json). pub async fn create_worktree( &self, project_root: &Path, story_id: &str, ) -> Result { let config = ProjectConfig::load(project_root)?; worktree::create_worktree(project_root, story_id, &config, self.port).await } /// Get project root helper. pub fn get_project_root(&self, state: &crate::state::SessionState) -> Result { state.get_project_root() } /// Get the log session ID and project root for an agent, if available. /// /// Used by MCP tools to find the persistent log file for a completed agent. pub fn get_log_info(&self, story_id: &str, agent_name: &str) -> Option<(String, PathBuf)> { let key = composite_key(story_id, agent_name); let agents = self.agents.lock().ok()?; let agent = agents.get(&key)?; let session_id = agent.log_session_id.clone()?; let project_root = agent.project_root.clone()?; Some((session_id, project_root)) } /// Remove all agent entries for a given story_id from the pool. /// /// Called when a story is archived so that stale entries don't accumulate. /// Returns the number of entries removed. 
pub fn remove_agents_for_story(&self, story_id: &str) -> usize { let mut agents = match self.agents.lock() { Ok(a) => a, Err(e) => { slog_error!("[agents] Failed to lock pool for cleanup of '{story_id}': {e}"); return 0; } }; let prefix = format!("{story_id}:"); let keys_to_remove: Vec = agents .keys() .filter(|k| k.starts_with(&prefix)) .cloned() .collect(); let count = keys_to_remove.len(); for key in &keys_to_remove { agents.remove(key); } if count > 0 { slog!("[agents] Removed {count} agent entries for archived story '{story_id}'"); } count } /// Test helper: inject a pre-built agent entry so unit tests can exercise /// wait/subscribe logic without spawning a real process. #[cfg(test)] pub fn inject_test_agent( &self, story_id: &str, agent_name: &str, status: AgentStatus, ) -> broadcast::Sender { let (tx, _) = broadcast::channel::(64); let key = composite_key(story_id, agent_name); let mut agents = self.agents.lock().unwrap(); agents.insert( key, StoryAgent { agent_name: agent_name.to_string(), status, worktree_info: None, session_id: None, tx: tx.clone(), task_handle: None, event_log: Arc::new(Mutex::new(Vec::new())), completion: None, project_root: None, log_session_id: None, merge_failure_reported: false, }, ); tx } /// Test helper: inject an agent with a specific worktree path for testing /// gate-related logic. 
#[cfg(test)] pub fn inject_test_agent_with_path( &self, story_id: &str, agent_name: &str, status: AgentStatus, worktree_path: PathBuf, ) -> broadcast::Sender { let (tx, _) = broadcast::channel::(64); let key = composite_key(story_id, agent_name); let mut agents = self.agents.lock().unwrap(); agents.insert( key, StoryAgent { agent_name: agent_name.to_string(), status, worktree_info: Some(WorktreeInfo { path: worktree_path, branch: format!("feature/story-{story_id}"), base_branch: "master".to_string(), }), session_id: None, tx: tx.clone(), task_handle: None, event_log: Arc::new(Mutex::new(Vec::new())), completion: None, project_root: None, log_session_id: None, merge_failure_reported: false, }, ); tx } /// Test helper: inject an agent with a completion report and project_root /// for testing pipeline advance logic without spawning real agents. #[cfg(test)] pub fn inject_test_agent_with_completion( &self, story_id: &str, agent_name: &str, status: AgentStatus, project_root: PathBuf, completion: CompletionReport, ) -> broadcast::Sender { let (tx, _) = broadcast::channel::(64); let key = composite_key(story_id, agent_name); let mut agents = self.agents.lock().unwrap(); agents.insert( key, StoryAgent { agent_name: agent_name.to_string(), status, worktree_info: None, session_id: None, tx: tx.clone(), task_handle: None, event_log: Arc::new(Mutex::new(Vec::new())), completion: Some(completion), project_root: Some(project_root), log_session_id: None, merge_failure_reported: false, }, ); tx } /// Inject a Running agent with a pre-built (possibly finished) task handle. /// Used by watchdog tests to simulate an orphaned agent. 
#[cfg(test)] pub fn inject_test_agent_with_handle( &self, story_id: &str, agent_name: &str, status: AgentStatus, task_handle: tokio::task::JoinHandle<()>, ) -> broadcast::Sender { let (tx, _) = broadcast::channel::(64); let key = composite_key(story_id, agent_name); let mut agents = self.agents.lock().unwrap(); agents.insert( key, StoryAgent { agent_name: agent_name.to_string(), status, worktree_info: None, session_id: None, tx: tx.clone(), task_handle: Some(task_handle), event_log: Arc::new(Mutex::new(Vec::new())), completion: None, project_root: None, log_session_id: None, merge_failure_reported: false, }, ); tx } /// Test helper: inject a child killer into the registry. #[cfg(test)] pub fn inject_child_killer(&self, key: &str, killer: Box) { let mut killers = self.child_killers.lock().unwrap(); killers.insert(key.to_string(), killer); } /// Test helper: return the number of registered child killers. #[cfg(test)] pub fn child_killer_count(&self) -> usize { self.child_killers.lock().unwrap().len() } } /// Return the active pipeline stage directory name for `story_id`, or `None` if the /// story is not in any active stage (`2_current/`, `3_qa/`, `4_merge/`). 
fn find_active_story_stage(project_root: &Path, story_id: &str) -> Option<&'static str> {
    // Probed in pipeline order; a story file is expected to live in at most
    // one of these stage directories at a time.
    const STAGES: [&str; 3] = ["2_current", "3_qa", "4_merge"];
    for stage in &STAGES {
        // Stage file layout: <root>/.storkit/work/<stage>/<story_id>.md
        let path = project_root
            .join(".storkit")
            .join("work")
            .join(stage)
            .join(format!("{story_id}.md"));
        if path.exists() {
            return Some(stage);
        }
    }
    None
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::agents::{AgentEvent, AgentStatus, PipelineStage};
    use crate::config::ProjectConfig;
    use portable_pty::{CommandBuilder, PtySize, native_pty_system};
    use std::process::Command;

    // Parse an inline TOML fragment into a ProjectConfig; panics on bad input.
    fn make_config(toml_str: &str) -> ProjectConfig {
        ProjectConfig::parse(toml_str).unwrap()
    }

    // A terminal status must short-circuit the wait without consuming the timeout.
    #[tokio::test]
    async fn wait_for_agent_returns_immediately_if_completed() {
        let pool = AgentPool::new_test(3001);
        pool.inject_test_agent("s1", "bot", AgentStatus::Completed);
        let info = pool.wait_for_agent("s1", "bot", 1000).await.unwrap();
        assert_eq!(info.status, AgentStatus::Completed);
        assert_eq!(info.story_id, "s1");
        assert_eq!(info.agent_name, "bot");
    }

    #[tokio::test]
    async fn wait_for_agent_returns_immediately_if_failed() {
        let pool = AgentPool::new_test(3001);
        pool.inject_test_agent("s2", "bot", AgentStatus::Failed);
        let info = pool.wait_for_agent("s2", "bot", 1000).await.unwrap();
        assert_eq!(info.status, AgentStatus::Failed);
    }

    #[tokio::test]
    async fn wait_for_agent_completes_on_done_event() {
        let pool = AgentPool::new_test(3001);
        let tx = pool.inject_test_agent("s3", "bot", AgentStatus::Running);
        // Send Done event after a short delay
        let tx_clone = tx.clone();
        tokio::spawn(async move {
            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
            let _ = tx_clone.send(AgentEvent::Done {
                story_id: "s3".to_string(),
                agent_name: "bot".to_string(),
                session_id: Some("sess-abc".to_string()),
            });
        });
        let info = pool.wait_for_agent("s3", "bot", 2000).await.unwrap();
        assert_eq!(info.story_id, "s3");
    }

    // A Running agent that never emits an event must hit the timeout path.
    #[tokio::test]
    async fn wait_for_agent_times_out() {
        let pool = AgentPool::new_test(3001);
        pool.inject_test_agent("s4", "bot", AgentStatus::Running);
        let result = pool.wait_for_agent("s4", "bot", 50).await;
        assert!(result.is_err());
        let msg = result.unwrap_err();
        assert!(msg.contains("Timed out"), "unexpected message: {msg}");
    }

    #[tokio::test]
    async fn wait_for_agent_errors_for_nonexistent() {
        let pool = AgentPool::new_test(3001);
        let result = pool.wait_for_agent("no_story", "no_bot", 100).await;
        assert!(result.is_err());
    }

    // A "stopped" Status event must also unblock the waiter, not only Done.
    #[tokio::test]
    async fn wait_for_agent_completes_on_stopped_status_event() {
        let pool = AgentPool::new_test(3001);
        let tx = pool.inject_test_agent("s5", "bot", AgentStatus::Running);
        let tx_clone = tx.clone();
        tokio::spawn(async move {
            tokio::time::sleep(std::time::Duration::from_millis(30)).await;
            let _ = tx_clone.send(AgentEvent::Status {
                story_id: "s5".to_string(),
                agent_name: "bot".to_string(),
                status: "stopped".to_string(),
            });
        });
        let info = pool.wait_for_agent("s5", "bot", 2000).await.unwrap();
        assert_eq!(info.story_id, "s5");
    }

    // ── kill_all_children tests ────────────────────────────────────

    /// Returns true if a process with the given PID is currently running.
fn process_is_running(pid: u32) -> bool { Command::new("ps") .args(["-p", &pid.to_string()]) .output() .map(|o| o.status.success()) .unwrap_or(false) } #[test] fn kill_all_children_is_safe_on_empty_pool() { let pool = AgentPool::new_test(3001); pool.kill_all_children(); assert_eq!(pool.child_killer_count(), 0); } #[test] fn kill_all_children_kills_real_process() { let pool = AgentPool::new_test(3001); let pty_system = native_pty_system(); let pair = pty_system .openpty(PtySize { rows: 24, cols: 80, pixel_width: 0, pixel_height: 0, }) .expect("failed to open pty"); let mut cmd = CommandBuilder::new("sleep"); cmd.arg("100"); let mut child = pair .slave .spawn_command(cmd) .expect("failed to spawn sleep"); let pid = child.process_id().expect("no pid"); pool.inject_child_killer("story:agent", child.clone_killer()); assert!( process_is_running(pid), "process {pid} should be running before kill_all_children" ); pool.kill_all_children(); let _ = child.wait(); assert!( !process_is_running(pid), "process {pid} should have been killed by kill_all_children" ); } #[test] fn kill_all_children_clears_registry() { let pool = AgentPool::new_test(3001); let pty_system = native_pty_system(); let pair = pty_system .openpty(PtySize { rows: 24, cols: 80, pixel_width: 0, pixel_height: 0, }) .expect("failed to open pty"); let mut cmd = CommandBuilder::new("sleep"); cmd.arg("1"); let mut child = pair .slave .spawn_command(cmd) .expect("failed to spawn sleep"); pool.inject_child_killer("story:agent", child.clone_killer()); assert_eq!(pool.child_killer_count(), 1); pool.kill_all_children(); let _ = child.wait(); assert_eq!( pool.child_killer_count(), 0, "child_killers should be cleared after kill_all_children" ); } // ── available_agents_for_stage tests (story 190) ────────────────────────── #[test] fn available_agents_for_stage_returns_idle_agents() { let config = make_config( r#" [[agent]] name = "coder-1" stage = "coder" [[agent]] name = "coder-2" stage = "coder" [[agent]] name = "qa" 
stage = "qa" "#, ); let pool = AgentPool::new_test(3001); pool.inject_test_agent("story-1", "coder-1", AgentStatus::Running); let available = pool .available_agents_for_stage(&config, &PipelineStage::Coder) .unwrap(); assert_eq!(available, vec!["coder-2"]); let available_qa = pool .available_agents_for_stage(&config, &PipelineStage::Qa) .unwrap(); assert_eq!(available_qa, vec!["qa"]); } #[test] fn available_agents_for_stage_returns_empty_when_all_busy() { let config = make_config( r#" [[agent]] name = "coder-1" stage = "coder" "#, ); let pool = AgentPool::new_test(3001); pool.inject_test_agent("story-1", "coder-1", AgentStatus::Running); let available = pool .available_agents_for_stage(&config, &PipelineStage::Coder) .unwrap(); assert!(available.is_empty()); } #[test] fn available_agents_for_stage_ignores_completed_agents() { let config = make_config( r#" [[agent]] name = "coder-1" stage = "coder" "#, ); let pool = AgentPool::new_test(3001); pool.inject_test_agent("story-1", "coder-1", AgentStatus::Completed); let available = pool .available_agents_for_stage(&config, &PipelineStage::Coder) .unwrap(); assert_eq!(available, vec!["coder-1"]); } #[tokio::test] async fn start_agent_auto_selects_second_coder_when_first_busy() { let tmp = tempfile::tempdir().unwrap(); let sk = tmp.path().join(".storkit"); std::fs::create_dir_all(&sk).unwrap(); std::fs::write( sk.join("project.toml"), r#" [[agent]] name = "supervisor" stage = "other" [[agent]] name = "coder-1" stage = "coder" [[agent]] name = "coder-2" stage = "coder" "#, ) .unwrap(); let pool = AgentPool::new_test(3001); pool.inject_test_agent("other-story", "coder-1", AgentStatus::Running); let result = pool .start_agent(tmp.path(), "42_my_story", None, None) .await; match result { Ok(info) => { assert_eq!(info.agent_name, "coder-2"); } Err(err) => { assert!( !err.contains("All coder agents are busy"), "should have selected coder-2 but got: {err}" ); assert!( !err.contains("No coder agent configured"), "should not fail 
on agent selection, got: {err}" ); } } } #[tokio::test] async fn start_agent_returns_busy_when_all_coders_occupied() { let tmp = tempfile::tempdir().unwrap(); let sk = tmp.path().join(".storkit"); std::fs::create_dir_all(&sk).unwrap(); std::fs::write( sk.join("project.toml"), r#" [[agent]] name = "coder-1" stage = "coder" [[agent]] name = "coder-2" stage = "coder" "#, ) .unwrap(); let pool = AgentPool::new_test(3001); pool.inject_test_agent("story-1", "coder-1", AgentStatus::Running); pool.inject_test_agent("story-2", "coder-2", AgentStatus::Pending); let result = pool.start_agent(tmp.path(), "story-3", None, None).await; assert!(result.is_err()); let err = result.unwrap_err(); assert!( err.contains("All coder agents are busy"), "expected busy error, got: {err}" ); } #[tokio::test] async fn start_agent_moves_story_to_current_when_coders_busy() { let tmp = tempfile::tempdir().unwrap(); let sk = tmp.path().join(".storkit"); let backlog = sk.join("work/1_backlog"); std::fs::create_dir_all(&backlog).unwrap(); std::fs::write( sk.join("project.toml"), r#" [[agent]] name = "coder-1" stage = "coder" "#, ) .unwrap(); std::fs::write(backlog.join("story-3.md"), "---\nname: Story 3\n---\n").unwrap(); let pool = AgentPool::new_test(3001); pool.inject_test_agent("story-1", "coder-1", AgentStatus::Running); let result = pool.start_agent(tmp.path(), "story-3", None, None).await; assert!(result.is_err()); let err = result.unwrap_err(); assert!( err.contains("All coder agents are busy"), "expected busy error, got: {err}" ); assert!( err.contains("queued in work/2_current/"), "expected story-to-current message, got: {err}" ); let current_path = sk.join("work/2_current/story-3.md"); assert!( current_path.exists(), "story should be in 2_current/ after busy error, but was not" ); let backlog_path = backlog.join("story-3.md"); assert!( !backlog_path.exists(), "story should no longer be in 1_backlog/" ); } #[tokio::test] async fn start_agent_story_already_in_current_is_noop() { let tmp = 
tempfile::tempdir().unwrap(); let sk = tmp.path().join(".storkit"); let current = sk.join("work/2_current"); std::fs::create_dir_all(¤t).unwrap(); std::fs::write( sk.join("project.toml"), "[[agent]]\nname = \"coder-1\"\nstage = \"coder\"\n", ) .unwrap(); std::fs::write(current.join("story-5.md"), "---\nname: Story 5\n---\n").unwrap(); let pool = AgentPool::new_test(3001); let result = pool.start_agent(tmp.path(), "story-5", None, None).await; match result { Ok(_) => {} Err(e) => { assert!( !e.contains("Failed to move"), "should not fail on idempotent move, got: {e}" ); } } } #[tokio::test] async fn start_agent_explicit_name_unchanged_when_busy() { let tmp = tempfile::tempdir().unwrap(); let sk = tmp.path().join(".storkit"); std::fs::create_dir_all(&sk).unwrap(); std::fs::write( sk.join("project.toml"), r#" [[agent]] name = "coder-1" stage = "coder" [[agent]] name = "coder-2" stage = "coder" "#, ) .unwrap(); let pool = AgentPool::new_test(3001); pool.inject_test_agent("story-1", "coder-1", AgentStatus::Running); let result = pool .start_agent(tmp.path(), "story-2", Some("coder-1"), None) .await; assert!(result.is_err()); let err = result.unwrap_err(); assert!( err.contains("coder-1") && err.contains("already running"), "expected explicit busy error, got: {err}" ); } // ── start_agent single-instance concurrency tests ───────────────────────── #[tokio::test] async fn start_agent_rejects_when_same_agent_already_running_on_another_story() { use std::fs; let tmp = tempfile::tempdir().unwrap(); let root = tmp.path(); let sk_dir = root.join(".storkit"); fs::create_dir_all(&sk_dir).unwrap(); fs::write(sk_dir.join("project.toml"), "[[agent]]\nname = \"qa\"\n").unwrap(); let pool = AgentPool::new_test(3001); pool.inject_test_agent("story-a", "qa", AgentStatus::Running); let result = pool.start_agent(root, "story-b", Some("qa"), None).await; assert!( result.is_err(), "start_agent should fail when qa is already running on another story" ); let err = result.unwrap_err(); 
assert!( err.contains("already running") || err.contains("becomes available"), "error message should explain why: got '{err}'" ); } #[tokio::test] async fn start_agent_allows_new_story_when_previous_run_is_completed() { use std::fs; let tmp = tempfile::tempdir().unwrap(); let root = tmp.path(); let sk_dir = root.join(".storkit"); fs::create_dir_all(&sk_dir).unwrap(); fs::write(sk_dir.join("project.toml"), "[[agent]]\nname = \"qa\"\n").unwrap(); let pool = AgentPool::new_test(3001); pool.inject_test_agent("story-a", "qa", AgentStatus::Completed); let result = pool.start_agent(root, "story-b", Some("qa"), None).await; if let Err(ref e) = result { assert!( !e.contains("already running") && !e.contains("becomes available"), "completed agent must not trigger the concurrency guard: got '{e}'" ); } } // ── bug 118: pending entry cleanup on start_agent failure ──────────────── #[tokio::test] async fn start_agent_cleans_up_pending_entry_on_failure() { use std::fs; let tmp = tempfile::tempdir().unwrap(); let root = tmp.path(); let sk_dir = root.join(".storkit"); fs::create_dir_all(&sk_dir).unwrap(); fs::write( sk_dir.join("project.toml"), "[[agent]]\nname = \"coder-1\"\nstage = \"coder\"\n", ) .unwrap(); let upcoming = root.join(".storkit/work/1_backlog"); fs::create_dir_all(&upcoming).unwrap(); fs::write(upcoming.join("50_story_test.md"), "---\nname: Test\n---\n").unwrap(); let pool = AgentPool::new_test(3099); let result = pool .start_agent(root, "50_story_test", Some("coder-1"), None) .await; assert!( result.is_ok(), "start_agent should return Ok(Pending) immediately: {:?}", result.err() ); assert_eq!( result.unwrap().status, AgentStatus::Pending, "initial status must be Pending" ); let final_info = pool .wait_for_agent("50_story_test", "coder-1", 5000) .await .expect("wait_for_agent should not time out"); assert_eq!( final_info.status, AgentStatus::Failed, "agent must transition to Failed after worktree creation error" ); let agents = pool.agents.lock().unwrap(); let 
failed_entry = agents .values() .find(|a| a.agent_name == "coder-1" && a.status == AgentStatus::Failed); assert!( failed_entry.is_some(), "agent pool must retain a Failed entry so the UI can show the error state" ); drop(agents); let events = pool .drain_events("50_story_test", "coder-1") .expect("drain_events should succeed"); let has_error_event = events.iter().any(|e| matches!(e, AgentEvent::Error { .. })); assert!( has_error_event, "event_log must contain AgentEvent::Error after worktree creation fails" ); } #[tokio::test] async fn start_agent_guard_does_not_remove_running_entry() { use std::fs; let tmp = tempfile::tempdir().unwrap(); let root = tmp.path(); let sk_dir = root.join(".storkit"); fs::create_dir_all(&sk_dir).unwrap(); fs::write(sk_dir.join("project.toml"), "[[agent]]\nname = \"qa\"\n").unwrap(); let pool = AgentPool::new_test(3099); pool.inject_test_agent("story-x", "qa", AgentStatus::Running); let result = pool.start_agent(root, "story-y", Some("qa"), None).await; assert!(result.is_err()); let err = result.unwrap_err(); assert!( err.contains("already running") || err.contains("becomes available"), "running entry must survive: got '{err}'" ); } // ── TOCTOU race-condition regression tests (story 132) ─────────────────── #[tokio::test] async fn toctou_pending_entry_blocks_same_agent_on_different_story() { use std::fs; let tmp = tempfile::tempdir().unwrap(); let root = tmp.path(); let sk_dir = root.join(".storkit"); fs::create_dir_all(&sk_dir).unwrap(); fs::write( sk_dir.join("project.toml"), "[[agent]]\nname = \"coder-1\"\n", ) .unwrap(); let pool = AgentPool::new_test(3099); pool.inject_test_agent("86_story_foo", "coder-1", AgentStatus::Pending); let result = pool .start_agent(root, "130_story_bar", Some("coder-1"), None) .await; assert!(result.is_err(), "second start_agent must be rejected"); let err = result.unwrap_err(); assert!( err.contains("already running") || err.contains("becomes available"), "expected concurrency-rejection message, got: 
'{err}'" ); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn toctou_concurrent_start_agent_same_agent_exactly_one_concurrency_rejection() { use std::fs; use std::sync::Arc; let tmp = tempfile::tempdir().unwrap(); let root = tmp.path().to_path_buf(); let sk_dir = root.join(".storkit"); fs::create_dir_all(sk_dir.join("work/1_backlog")).unwrap(); fs::write( root.join(".storkit/project.toml"), "[[agent]]\nname = \"coder-1\"\n", ) .unwrap(); fs::write( root.join(".storkit/work/1_backlog/86_story_foo.md"), "---\nname: Foo\n---\n", ) .unwrap(); fs::write( root.join(".storkit/work/1_backlog/130_story_bar.md"), "---\nname: Bar\n---\n", ) .unwrap(); let pool = Arc::new(AgentPool::new_test(3099)); let pool1 = pool.clone(); let root1 = root.clone(); let t1 = tokio::spawn(async move { pool1 .start_agent(&root1, "86_story_foo", Some("coder-1"), None) .await }); let pool2 = pool.clone(); let root2 = root.clone(); let t2 = tokio::spawn(async move { pool2 .start_agent(&root2, "130_story_bar", Some("coder-1"), None) .await }); let (r1, r2) = tokio::join!(t1, t2); let r1 = r1.unwrap(); let r2 = r2.unwrap(); let concurrency_rejections = [&r1, &r2] .iter() .filter(|r| { r.as_ref().is_err_and(|e| { e.contains("already running") || e.contains("becomes available") }) }) .count(); assert_eq!( concurrency_rejections, 1, "exactly one call must be rejected by the concurrency check; \ got r1={r1:?} r2={r2:?}" ); } // ── story-230: prevent duplicate stage agents on same story ─────────────── #[tokio::test] async fn start_agent_rejects_second_coder_stage_on_same_story() { use std::fs; let tmp = tempfile::tempdir().unwrap(); let root = tmp.path(); let sk_dir = root.join(".storkit"); fs::create_dir_all(&sk_dir).unwrap(); fs::write( sk_dir.join("project.toml"), "[[agent]]\nname = \"coder-1\"\n\n[[agent]]\nname = \"coder-2\"\n", ) .unwrap(); let pool = AgentPool::new_test(3099); pool.inject_test_agent("42_story_foo", "coder-1", AgentStatus::Running); let result = pool 
.start_agent(root, "42_story_foo", Some("coder-2"), None) .await; assert!( result.is_err(), "second coder on same story must be rejected" ); let err = result.unwrap_err(); assert!( err.contains("same pipeline stage"), "error must mention same pipeline stage, got: '{err}'" ); assert!( err.contains("coder-1") && err.contains("coder-2"), "error must name both agents, got: '{err}'" ); } #[tokio::test] async fn start_agent_rejects_second_qa_stage_on_same_story() { use std::fs; let tmp = tempfile::tempdir().unwrap(); let root = tmp.path(); let sk_dir = root.join(".storkit"); fs::create_dir_all(&sk_dir).unwrap(); fs::write( sk_dir.join("project.toml"), "[[agent]]\nname = \"qa-1\"\nstage = \"qa\"\n\n\ [[agent]]\nname = \"qa-2\"\nstage = \"qa\"\n", ) .unwrap(); let pool = AgentPool::new_test(3099); pool.inject_test_agent("55_story_bar", "qa-1", AgentStatus::Running); let result = pool .start_agent(root, "55_story_bar", Some("qa-2"), None) .await; assert!(result.is_err(), "second qa on same story must be rejected"); let err = result.unwrap_err(); assert!( err.contains("same pipeline stage"), "error must mention same pipeline stage, got: '{err}'" ); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn start_agent_concurrent_two_coders_same_story_exactly_one_stage_rejection() { use std::fs; use std::sync::Arc; let tmp = tempfile::tempdir().unwrap(); let root = tmp.path().to_path_buf(); let sk_dir = root.join(".storkit"); fs::create_dir_all(sk_dir.join("work/2_current")).unwrap(); fs::write( root.join(".storkit/project.toml"), "[[agent]]\nname = \"coder-1\"\n\n[[agent]]\nname = \"coder-2\"\n", ) .unwrap(); fs::write( root.join(".storkit/work/2_current/42_story_foo.md"), "---\nname: Foo\n---\n", ) .unwrap(); let pool = Arc::new(AgentPool::new_test(3099)); let pool1 = pool.clone(); let root1 = root.clone(); let t1 = tokio::spawn(async move { pool1 .start_agent(&root1, "42_story_foo", Some("coder-1"), None) .await }); let pool2 = pool.clone(); let root2 = 
root.clone(); let t2 = tokio::spawn(async move { pool2 .start_agent(&root2, "42_story_foo", Some("coder-2"), None) .await }); let (r1, r2) = tokio::join!(t1, t2); let r1 = r1.unwrap(); let r2 = r2.unwrap(); let stage_rejections = [&r1, &r2] .iter() .filter(|r| r.as_ref().is_err_and(|e| e.contains("same pipeline stage"))) .count(); assert_eq!( stage_rejections, 1, "exactly one call must be rejected by the stage-conflict check; \ got r1={r1:?} r2={r2:?}" ); } #[tokio::test] async fn start_agent_two_coders_different_stories_not_blocked_by_stage_check() { use std::fs; let tmp = tempfile::tempdir().unwrap(); let root = tmp.path(); let sk_dir = root.join(".storkit"); fs::create_dir_all(sk_dir.join("work/1_backlog")).unwrap(); fs::write( root.join(".storkit/project.toml"), "[[agent]]\nname = \"coder-1\"\n\n[[agent]]\nname = \"coder-2\"\n", ) .unwrap(); fs::write( root.join(".storkit/work/1_backlog/99_story_baz.md"), "---\nname: Baz\n---\n", ) .unwrap(); let pool = AgentPool::new_test(3099); pool.inject_test_agent("42_story_foo", "coder-1", AgentStatus::Running); let result = pool .start_agent(root, "99_story_baz", Some("coder-2"), None) .await; if let Err(ref e) = result { assert!( !e.contains("same pipeline stage"), "stage-conflict guard must not fire for agents on different stories; \ got: '{e}'" ); } } // ── bug 312: stage-pipeline mismatch guard in start_agent ────────────── #[tokio::test] async fn start_agent_rejects_mergemaster_on_coding_stage_story() { use std::fs; let tmp = tempfile::tempdir().unwrap(); let root = tmp.path(); let sk_dir = root.join(".storkit"); fs::create_dir_all(sk_dir.join("work/2_current")).unwrap(); fs::write( sk_dir.join("project.toml"), "[[agent]]\nname = \"coder-1\"\nstage = \"coder\"\n\n\ [[agent]]\nname = \"mergemaster\"\nstage = \"mergemaster\"\n", ) .unwrap(); fs::write( sk_dir.join("work/2_current/310_story_foo.md"), "---\nname: Foo\n---\n", ) .unwrap(); let pool = AgentPool::new_test(3099); let result = pool .start_agent(root, 
"310_story_foo", Some("mergemaster"), None) .await; assert!( result.is_err(), "mergemaster must not be assigned to a story in 2_current/" ); let err = result.unwrap_err(); assert!( err.contains("stage") && err.contains("2_current"), "error must mention stage mismatch, got: '{err}'" ); } #[tokio::test] async fn start_agent_rejects_coder_on_qa_stage_story() { use std::fs; let tmp = tempfile::tempdir().unwrap(); let root = tmp.path(); let sk_dir = root.join(".storkit"); fs::create_dir_all(sk_dir.join("work/3_qa")).unwrap(); fs::write( sk_dir.join("project.toml"), "[[agent]]\nname = \"coder-1\"\nstage = \"coder\"\n\n\ [[agent]]\nname = \"qa\"\nstage = \"qa\"\n", ) .unwrap(); fs::write( sk_dir.join("work/3_qa/42_story_bar.md"), "---\nname: Bar\n---\n", ) .unwrap(); let pool = AgentPool::new_test(3099); let result = pool .start_agent(root, "42_story_bar", Some("coder-1"), None) .await; assert!( result.is_err(), "coder must not be assigned to a story in 3_qa/" ); let err = result.unwrap_err(); assert!( err.contains("stage") && err.contains("3_qa"), "error must mention stage mismatch, got: '{err}'" ); } #[tokio::test] async fn start_agent_rejects_qa_on_merge_stage_story() { use std::fs; let tmp = tempfile::tempdir().unwrap(); let root = tmp.path(); let sk_dir = root.join(".storkit"); fs::create_dir_all(sk_dir.join("work/4_merge")).unwrap(); fs::write( sk_dir.join("project.toml"), "[[agent]]\nname = \"qa\"\nstage = \"qa\"\n\n\ [[agent]]\nname = \"mergemaster\"\nstage = \"mergemaster\"\n", ) .unwrap(); fs::write( sk_dir.join("work/4_merge/55_story_baz.md"), "---\nname: Baz\n---\n", ) .unwrap(); let pool = AgentPool::new_test(3099); let result = pool .start_agent(root, "55_story_baz", Some("qa"), None) .await; assert!( result.is_err(), "qa must not be assigned to a story in 4_merge/" ); let err = result.unwrap_err(); assert!( err.contains("stage") && err.contains("4_merge"), "error must mention stage mismatch, got: '{err}'" ); } #[tokio::test] async fn 
start_agent_allows_supervisor_on_any_stage() { use std::fs; let tmp = tempfile::tempdir().unwrap(); let root = tmp.path(); let sk_dir = root.join(".storkit"); fs::create_dir_all(sk_dir.join("work/2_current")).unwrap(); fs::write( sk_dir.join("project.toml"), "[[agent]]\nname = \"supervisor\"\nstage = \"other\"\n", ) .unwrap(); fs::write( sk_dir.join("work/2_current/77_story_sup.md"), "---\nname: Sup\n---\n", ) .unwrap(); let pool = AgentPool::new_test(3099); let result = pool .start_agent(root, "77_story_sup", Some("supervisor"), None) .await; match result { Ok(_) => {} Err(e) => { assert!( !e.contains("stage:") || !e.contains("cannot be assigned"), "supervisor should not be rejected for stage mismatch, got: '{e}'" ); } } } #[tokio::test] async fn start_agent_allows_correct_stage_agent() { use std::fs; let tmp = tempfile::tempdir().unwrap(); let root = tmp.path(); let sk_dir = root.join(".storkit"); fs::create_dir_all(sk_dir.join("work/4_merge")).unwrap(); fs::write( sk_dir.join("project.toml"), "[[agent]]\nname = \"mergemaster\"\nstage = \"mergemaster\"\n", ) .unwrap(); fs::write( sk_dir.join("work/4_merge/88_story_ok.md"), "---\nname: OK\n---\n", ) .unwrap(); let pool = AgentPool::new_test(3099); let result = pool .start_agent(root, "88_story_ok", Some("mergemaster"), None) .await; match result { Ok(_) => {} Err(e) => { assert!( !e.contains("cannot be assigned"), "mergemaster on 4_merge/ story should not fail stage check, got: '{e}'" ); } } } // ── find_active_story_stage tests ───────────────────────────────────────── #[test] fn find_active_story_stage_detects_current() { use std::fs; let tmp = tempfile::tempdir().unwrap(); let root = tmp.path(); let current = root.join(".storkit/work/2_current"); fs::create_dir_all(¤t).unwrap(); fs::write(current.join("10_story_test.md"), "test").unwrap(); assert_eq!( find_active_story_stage(root, "10_story_test"), Some("2_current") ); } #[test] fn find_active_story_stage_detects_qa() { use std::fs; let tmp = 
tempfile::tempdir().unwrap(); let root = tmp.path(); let qa = root.join(".storkit/work/3_qa"); fs::create_dir_all(&qa).unwrap(); fs::write(qa.join("11_story_test.md"), "test").unwrap(); assert_eq!(find_active_story_stage(root, "11_story_test"), Some("3_qa")); } #[test] fn find_active_story_stage_detects_merge() { use std::fs; let tmp = tempfile::tempdir().unwrap(); let root = tmp.path(); let merge = root.join(".storkit/work/4_merge"); fs::create_dir_all(&merge).unwrap(); fs::write(merge.join("12_story_test.md"), "test").unwrap(); assert_eq!( find_active_story_stage(root, "12_story_test"), Some("4_merge") ); } #[test] fn find_active_story_stage_returns_none_for_unknown_story() { let tmp = tempfile::tempdir().unwrap(); assert_eq!(find_active_story_stage(tmp.path(), "99_nonexistent"), None); } // ── remove_agents_for_story tests ──────────────────────────────────────── #[test] fn remove_agents_for_story_removes_all_entries() { let pool = AgentPool::new_test(3001); pool.inject_test_agent("story_a", "coder-1", AgentStatus::Completed); pool.inject_test_agent("story_a", "qa", AgentStatus::Failed); pool.inject_test_agent("story_b", "coder-1", AgentStatus::Running); let removed = pool.remove_agents_for_story("story_a"); assert_eq!(removed, 2, "should remove both agents for story_a"); let agents = pool.list_agents().unwrap(); assert_eq!(agents.len(), 1, "only story_b agent should remain"); assert_eq!(agents[0].story_id, "story_b"); } #[test] fn remove_agents_for_story_returns_zero_when_no_match() { let pool = AgentPool::new_test(3001); pool.inject_test_agent("story_a", "coder-1", AgentStatus::Running); let removed = pool.remove_agents_for_story("nonexistent"); assert_eq!(removed, 0); let agents = pool.list_agents().unwrap(); assert_eq!(agents.len(), 1, "existing agents should not be affected"); } // ── archive + cleanup integration test ─────────────────────────────────── #[tokio::test] async fn archiving_story_removes_agent_entries_from_pool() { use 
crate::agents::lifecycle::move_story_to_archived; use std::fs; let tmp = tempfile::tempdir().unwrap(); let root = tmp.path(); let current = root.join(".storkit/work/2_current"); fs::create_dir_all(¤t).unwrap(); fs::write(current.join("60_story_cleanup.md"), "test").unwrap(); let pool = AgentPool::new_test(3001); pool.inject_test_agent("60_story_cleanup", "coder-1", AgentStatus::Completed); pool.inject_test_agent("60_story_cleanup", "qa", AgentStatus::Completed); pool.inject_test_agent("61_story_other", "coder-1", AgentStatus::Running); assert_eq!(pool.list_agents().unwrap().len(), 3); move_story_to_archived(root, "60_story_cleanup").unwrap(); pool.remove_agents_for_story("60_story_cleanup"); let remaining = pool.list_agents().unwrap(); assert_eq!( remaining.len(), 1, "only the other story's agent should remain" ); assert_eq!(remaining[0].story_id, "61_story_other"); assert!( root.join(".storkit/work/5_done/60_story_cleanup.md") .exists() ); } }