diff --git a/server/src/agents/pool/lifecycle.rs b/server/src/agents/pool/lifecycle.rs new file mode 100644 index 00000000..b8fa5d15 --- /dev/null +++ b/server/src/agents/pool/lifecycle.rs @@ -0,0 +1,1763 @@ +use crate::agent_log::AgentLogWriter; +use crate::config::ProjectConfig; +use crate::slog; +use crate::slog_error; +use std::path::Path; +use std::sync::{Arc, Mutex}; +use tokio::sync::broadcast; + +use super::super::{ + AgentEvent, AgentInfo, AgentStatus, PipelineStage, agent_config_stage, + pipeline_stage, +}; +use super::types::{PendingGuard, StoryAgent, agent_info_from_entry, composite_key}; +use super::{AgentPool, auto_assign}; +use super::worktree::find_active_story_stage; +use super::super::runtime::{AgentRuntime, ClaudeCodeRuntime, GeminiRuntime, OpenAiRuntime, RuntimeContext}; + +impl AgentPool { + /// Start an agent for a story: load config, create worktree, spawn agent. + /// + /// When `agent_name` is `None`, automatically selects the first idle coder + /// agent (story 190). If all coders are busy the call fails with an error + /// indicating the story will be picked up when one becomes available. + /// + /// If `resume_context` is provided, it is appended to the rendered prompt + /// so the agent can pick up from a previous failed attempt. + pub async fn start_agent( + &self, + project_root: &Path, + story_id: &str, + agent_name: Option<&str>, + resume_context: Option<&str>, + ) -> Result { + let config = ProjectConfig::load(project_root)?; + + // Validate explicit agent name early (no lock needed). + if let Some(name) = agent_name { + config + .find_agent(name) + .ok_or_else(|| format!("No agent named '{name}' in config"))?; + } + + // Create name-independent shared resources before the lock so they are + // ready for the atomic check-and-insert (story 132). 
+ let (tx, _) = broadcast::channel::(1024); + let event_log: Arc>> = Arc::new(Mutex::new(Vec::new())); + let log_session_id = uuid::Uuid::new_v4().to_string(); + + // Move story from backlog/ to current/ before checking agent + // availability so that auto_assign_available_work can pick it up even + // when all coders are currently busy (story 203). This is idempotent: + // if the story is already in 2_current/ or a later stage, the call is + // a no-op. + crate::agents::lifecycle::move_story_to_current(project_root, story_id)?; + + // Validate that the agent's configured stage matches the story's + // pipeline stage. This prevents any caller (auto-assign, MCP tool, + // pipeline advance, supervisor) from starting a wrong-stage agent on + // a story — e.g. mergemaster on a coding-stage story (bug 312). + if let Some(name) = agent_name { + let agent_stage = config + .find_agent(name) + .map(agent_config_stage) + .unwrap_or_else(|| pipeline_stage(name)); + if agent_stage != PipelineStage::Other + && let Some(story_stage_dir) = find_active_story_stage(project_root, story_id) + { + let expected_stage = match story_stage_dir { + "2_current" => PipelineStage::Coder, + "3_qa" => PipelineStage::Qa, + "4_merge" => PipelineStage::Mergemaster, + _ => PipelineStage::Other, + }; + if expected_stage != PipelineStage::Other && expected_stage != agent_stage { + return Err(format!( + "Agent '{name}' (stage: {agent_stage:?}) cannot be assigned to \ + story '{story_id}' in {story_stage_dir}/ (requires stage: {expected_stage:?})" + )); + } + } + } + + // Read the preferred agent from the story's front matter before acquiring + // the lock. When no explicit agent_name is given, this lets start_agent + // honour `agent: coder-opus` written by the `assign` command — mirroring + // the auto_assign path (bug 379). 
+ let front_matter_agent: Option = if agent_name.is_none() { + find_active_story_stage(project_root, story_id).and_then(|stage_dir| { + let path = project_root + .join(".storkit") + .join("work") + .join(stage_dir) + .join(format!("{story_id}.md")); + let contents = std::fs::read_to_string(path).ok()?; + crate::io::story_metadata::parse_front_matter(&contents).ok()?.agent + }) + } else { + None + }; + + // Atomically resolve agent name, check availability, and register as + // Pending. When `agent_name` is `None` the first idle coder is + // selected inside the lock so no TOCTOU race can occur between the + // availability check and the Pending insert (story 132, story 190). + // + // The `PendingGuard` ensures that if any step below fails the entry is + // removed from the pool so it does not permanently block auto-assign + // (bug 118). + let resolved_name: String; + let key: String; + { + let mut agents = self.agents.lock().map_err(|e| e.to_string())?; + + resolved_name = match agent_name { + Some(name) => name.to_string(), + None => { + // Honour the `agent:` field in the story's front matter so that + // `start 368` after `assign 368 opus` picks the right agent + // (bug 379). Mirrors the auto_assign selection logic. + if let Some(ref pref) = front_matter_agent { + let stage_matches = config + .find_agent(pref) + .map(|cfg| agent_config_stage(cfg) == PipelineStage::Coder) + .unwrap_or(false); + if stage_matches { + if auto_assign::is_agent_free(&agents, pref) { + pref.clone() + } else { + return Err(format!( + "Preferred agent '{pref}' from story front matter is busy; \ + story '{story_id}' has been queued in work/2_current/ and will \ + be auto-assigned when it becomes available" + )); + } + } else { + // Stage mismatch — fall back to any free coder. 
+ auto_assign::find_free_agent_for_stage( + &config, + &agents, + &PipelineStage::Coder, + ) + .map(|s| s.to_string()) + .ok_or_else(|| { + if config + .agent + .iter() + .any(|a| agent_config_stage(a) == PipelineStage::Coder) + { + format!( + "All coder agents are busy; story '{story_id}' has been \ + queued in work/2_current/ and will be auto-assigned when \ + one becomes available" + ) + } else { + "No coder agent configured. Specify an agent_name explicitly." + .to_string() + } + })? + } + } else { + auto_assign::find_free_agent_for_stage( + &config, + &agents, + &PipelineStage::Coder, + ) + .map(|s| s.to_string()) + .ok_or_else(|| { + if config + .agent + .iter() + .any(|a| agent_config_stage(a) == PipelineStage::Coder) + { + format!( + "All coder agents are busy; story '{story_id}' has been \ + queued in work/2_current/ and will be auto-assigned when \ + one becomes available" + ) + } else { + "No coder agent configured. Specify an agent_name explicitly." + .to_string() + } + })? + } + } + }; + + key = composite_key(story_id, &resolved_name); + + // Check for duplicate assignment (same story + same agent already active). + if let Some(agent) = agents.get(&key) + && (agent.status == AgentStatus::Running || agent.status == AgentStatus::Pending) + { + return Err(format!( + "Agent '{resolved_name}' for story '{story_id}' is already {}", + agent.status + )); + } + // Enforce single-stage concurrency: reject if there is already a + // Running/Pending agent at the same pipeline stage for this story. + // This prevents two coders (or two QA/mergemaster agents) from + // corrupting each other's work in the same worktree. + // Applies to both explicit and auto-selected agents; the Other + // stage (supervisors, unknown agents) is exempt. 
+ let resolved_stage = config + .find_agent(&resolved_name) + .map(agent_config_stage) + .unwrap_or_else(|| pipeline_stage(&resolved_name)); + if resolved_stage != PipelineStage::Other + && let Some(conflicting_name) = agents.iter().find_map(|(k, a)| { + let k_story = k.rsplit_once(':').map(|(s, _)| s).unwrap_or(k); + if k_story == story_id + && a.agent_name != resolved_name + && matches!(a.status, AgentStatus::Running | AgentStatus::Pending) + { + let a_stage = config + .find_agent(&a.agent_name) + .map(agent_config_stage) + .unwrap_or_else(|| pipeline_stage(&a.agent_name)); + if a_stage == resolved_stage { + Some(a.agent_name.clone()) + } else { + None + } + } else { + None + } + }) + { + return Err(format!( + "Cannot start '{resolved_name}' on story '{story_id}': \ + '{conflicting_name}' is already active at the same pipeline stage" + )); + } + // Enforce single-instance concurrency for explicitly-named agents: + // if this agent is already running on any other story, reject. + // Auto-selected agents are already guaranteed idle by + // find_free_agent_for_stage, so this check is only needed for + // explicit requests. 
+ if agent_name.is_some() + && let Some(busy_story) = agents.iter().find_map(|(k, a)| { + if a.agent_name == resolved_name + && matches!(a.status, AgentStatus::Running | AgentStatus::Pending) + { + Some( + k.rsplit_once(':') + .map(|(sid, _)| sid) + .unwrap_or(k) + .to_string(), + ) + } else { + None + } + }) + { + return Err(format!( + "Agent '{resolved_name}' is already running on story '{busy_story}'; \ + story '{story_id}' will be picked up when the agent becomes available" + )); + } + agents.insert( + key.clone(), + StoryAgent { + agent_name: resolved_name.clone(), + status: AgentStatus::Pending, + worktree_info: None, + session_id: None, + tx: tx.clone(), + task_handle: None, + event_log: event_log.clone(), + completion: None, + project_root: Some(project_root.to_path_buf()), + log_session_id: Some(log_session_id.clone()), + merge_failure_reported: false, + }, + ); + } + let mut pending_guard = PendingGuard::new(self.agents.clone(), key.clone()); + + // Create persistent log writer (needs resolved_name, so must be after + // the atomic resolution above). + let log_writer = + match AgentLogWriter::new(project_root, story_id, &resolved_name, &log_session_id) { + Ok(w) => Some(Arc::new(Mutex::new(w))), + Err(e) => { + eprintln!( + "[agents] Failed to create log writer for {story_id}:{resolved_name}: {e}" + ); + None + } + }; + + // Notify WebSocket clients that a new agent is pending. + Self::notify_agent_state_changed(&self.watcher_tx); + + let _ = tx.send(AgentEvent::Status { + story_id: story_id.to_string(), + agent_name: resolved_name.clone(), + status: "pending".to_string(), + }); + + // Extract inactivity timeout from the agent config before cloning config. + let inactivity_timeout_secs = config + .find_agent(&resolved_name) + .map(|a| a.inactivity_timeout_secs) + .unwrap_or(300); + + // Clone all values needed inside the background spawn. 
+ let project_root_clone = project_root.to_path_buf(); + let config_clone = config.clone(); + let resume_context_owned = resume_context.map(str::to_string); + let sid = story_id.to_string(); + let aname = resolved_name.clone(); + let tx_clone = tx.clone(); + let agents_ref = self.agents.clone(); + let key_clone = key.clone(); + let log_clone = event_log.clone(); + let port_for_task = self.port; + let log_writer_clone = log_writer.clone(); + let child_killers_clone = self.child_killers.clone(); + let watcher_tx_clone = self.watcher_tx.clone(); + + // Spawn the background task. Worktree creation and agent launch happen here + // so `start_agent` returns immediately after registering the agent as + // Pending — non-blocking by design (story 157). + let handle = tokio::spawn(async move { + // Step 1: create the worktree (slow — git checkout, pnpm install, etc.) + let wt_info = match crate::worktree::create_worktree( + &project_root_clone, + &sid, + &config_clone, + port_for_task, + ) + .await + { + Ok(wt) => wt, + Err(e) => { + let error_msg = format!("Failed to create worktree: {e}"); + slog_error!("[agents] {error_msg}"); + let event = AgentEvent::Error { + story_id: sid.clone(), + agent_name: aname.clone(), + message: error_msg, + }; + if let Ok(mut log) = log_clone.lock() { + log.push(event.clone()); + } + let _ = tx_clone.send(event); + if let Ok(mut agents) = agents_ref.lock() + && let Some(agent) = agents.get_mut(&key_clone) + { + agent.status = AgentStatus::Failed; + } + AgentPool::notify_agent_state_changed(&watcher_tx_clone); + return; + } + }; + + // Step 2: store worktree info and render agent command/args/prompt. 
+ let wt_path_str = wt_info.path.to_string_lossy().to_string(); + { + if let Ok(mut agents) = agents_ref.lock() + && let Some(agent) = agents.get_mut(&key_clone) + { + agent.worktree_info = Some(wt_info.clone()); + } + } + + let (command, args, mut prompt) = match config_clone.render_agent_args( + &wt_path_str, + &sid, + Some(&aname), + Some(&wt_info.base_branch), + ) { + Ok(result) => result, + Err(e) => { + let error_msg = format!("Failed to render agent args: {e}"); + slog_error!("[agents] {error_msg}"); + let event = AgentEvent::Error { + story_id: sid.clone(), + agent_name: aname.clone(), + message: error_msg, + }; + if let Ok(mut log) = log_clone.lock() { + log.push(event.clone()); + } + let _ = tx_clone.send(event); + if let Ok(mut agents) = agents_ref.lock() + && let Some(agent) = agents.get_mut(&key_clone) + { + agent.status = AgentStatus::Failed; + } + AgentPool::notify_agent_state_changed(&watcher_tx_clone); + return; + } + }; + + // Append resume context if this is a restart with failure information. + if let Some(ctx) = resume_context_owned { + prompt.push_str(&ctx); + } + + // Step 3: transition to Running now that the worktree is ready. + { + if let Ok(mut agents) = agents_ref.lock() + && let Some(agent) = agents.get_mut(&key_clone) + { + agent.status = AgentStatus::Running; + } + } + let _ = tx_clone.send(AgentEvent::Status { + story_id: sid.clone(), + agent_name: aname.clone(), + status: "running".to_string(), + }); + AgentPool::notify_agent_state_changed(&watcher_tx_clone); + + // Step 4: launch the agent process via the configured runtime. 
+ let runtime_name = config_clone + .find_agent(&aname) + .and_then(|a| a.runtime.as_deref()) + .unwrap_or("claude-code"); + + let run_result = match runtime_name { + "claude-code" => { + let runtime = ClaudeCodeRuntime::new(child_killers_clone.clone(), watcher_tx_clone.clone()); + let ctx = RuntimeContext { + story_id: sid.clone(), + agent_name: aname.clone(), + command, + args, + prompt, + cwd: wt_path_str, + inactivity_timeout_secs, + mcp_port: port_for_task, + }; + runtime + .start(ctx, tx_clone.clone(), log_clone.clone(), log_writer_clone) + .await + } + "gemini" => { + let runtime = GeminiRuntime::new(); + let ctx = RuntimeContext { + story_id: sid.clone(), + agent_name: aname.clone(), + command, + args, + prompt, + cwd: wt_path_str, + inactivity_timeout_secs, + mcp_port: port_for_task, + }; + runtime + .start(ctx, tx_clone.clone(), log_clone.clone(), log_writer_clone) + .await + } + "openai" => { + let runtime = OpenAiRuntime::new(); + let ctx = RuntimeContext { + story_id: sid.clone(), + agent_name: aname.clone(), + command, + args, + prompt, + cwd: wt_path_str, + inactivity_timeout_secs, + mcp_port: port_for_task, + }; + runtime + .start(ctx, tx_clone.clone(), log_clone.clone(), log_writer_clone) + .await + } + other => Err(format!( + "Unknown agent runtime '{other}'; check the 'runtime' field in project.toml. \ + Supported: 'claude-code', 'gemini', 'openai'" + )), + }; + + match run_result { + Ok(result) => { + // Persist token usage if the agent reported it. 
+ if let Some(ref usage) = result.token_usage + && let Ok(agents) = agents_ref.lock() + && let Some(agent) = agents.get(&key_clone) + && let Some(ref pr) = agent.project_root + { + let model = config_clone + .find_agent(&aname) + .and_then(|a| a.model.clone()); + let record = crate::agents::token_usage::build_record( + &sid, &aname, model, usage.clone(), + ); + if let Err(e) = crate::agents::token_usage::append_record(pr, &record) { + slog_error!( + "[agents] Failed to persist token usage for \ + {sid}:{aname}: {e}" + ); + } + } + + // Server-owned completion: run acceptance gates automatically + // when the agent process exits normally. + super::pipeline::run_server_owned_completion( + &agents_ref, + port_for_task, + &sid, + &aname, + result.session_id, + watcher_tx_clone.clone(), + ) + .await; + AgentPool::notify_agent_state_changed(&watcher_tx_clone); + } + Err(e) => { + slog_error!("[agents] Agent process error for {aname} on {sid}: {e}"); + let event = AgentEvent::Error { + story_id: sid.clone(), + agent_name: aname.clone(), + message: e, + }; + if let Ok(mut log) = log_clone.lock() { + log.push(event.clone()); + } + let _ = tx_clone.send(event); + if let Ok(mut agents) = agents_ref.lock() + && let Some(agent) = agents.get_mut(&key_clone) + { + agent.status = AgentStatus::Failed; + } + AgentPool::notify_agent_state_changed(&watcher_tx_clone); + } + } + }); + + // Store the task handle while the agent is still Pending. + { + let mut agents = self.agents.lock().map_err(|e| e.to_string())?; + if let Some(agent) = agents.get_mut(&key) { + agent.task_handle = Some(handle); + } + } + + // Agent successfully spawned — prevent the guard from removing the entry. + pending_guard.disarm(); + + Ok(AgentInfo { + story_id: story_id.to_string(), + agent_name: resolved_name, + status: AgentStatus::Pending, + session_id: None, + worktree_path: None, + base_branch: None, + completion: None, + log_session_id: Some(log_session_id), + }) + } + + /// Stop a running agent. 
Worktree is preserved for inspection. + pub async fn stop_agent( + &self, + _project_root: &Path, + story_id: &str, + agent_name: &str, + ) -> Result<(), String> { + let key = composite_key(story_id, agent_name); + + let (worktree_info, task_handle, tx) = { + let mut agents = self.agents.lock().map_err(|e| e.to_string())?; + let agent = agents + .get_mut(&key) + .ok_or_else(|| format!("No agent '{agent_name}' for story '{story_id}'"))?; + + let wt = agent.worktree_info.clone(); + let handle = agent.task_handle.take(); + let tx = agent.tx.clone(); + agent.status = AgentStatus::Failed; + (wt, handle, tx) + }; + + // Abort the task and kill the PTY child process. + // Note: aborting a spawn_blocking task handle does not interrupt the blocking + // thread, so we must also kill the child process directly via the killer registry. + if let Some(handle) = task_handle { + handle.abort(); + let _ = handle.await; + } + self.kill_child_for_key(&key); + + // Preserve worktree for inspection — don't destroy agent's work on stop. + if let Some(ref wt) = worktree_info { + slog!( + "[agents] Worktree preserved for {story_id}:{agent_name}: {}", + wt.path.display() + ); + } + + let _ = tx.send(AgentEvent::Status { + story_id: story_id.to_string(), + agent_name: agent_name.to_string(), + status: "stopped".to_string(), + }); + + // Remove from map + { + let mut agents = self.agents.lock().map_err(|e| e.to_string())?; + agents.remove(&key); + } + + // Notify WebSocket clients so pipeline board and agent panel update. + Self::notify_agent_state_changed(&self.watcher_tx); + + Ok(()) + } + + /// Block until the agent reaches a terminal state (completed, failed, stopped). + /// Returns the agent's final `AgentInfo`. + /// `timeout_ms` caps how long to wait; returns an error if the deadline passes. 
+ pub async fn wait_for_agent( + &self, + story_id: &str, + agent_name: &str, + timeout_ms: u64, + ) -> Result { + // Subscribe before checking status so we don't miss the terminal event + // if the agent completes in the window between the two operations. + let mut rx = self.subscribe(story_id, agent_name)?; + + // Return immediately if already in a terminal state. + { + let agents = self.agents.lock().map_err(|e| e.to_string())?; + let key = composite_key(story_id, agent_name); + if let Some(agent) = agents.get(&key) + && matches!(agent.status, AgentStatus::Completed | AgentStatus::Failed) + { + return Ok(agent_info_from_entry(story_id, agent)); + } + } + + let deadline = tokio::time::Instant::now() + std::time::Duration::from_millis(timeout_ms); + + loop { + let remaining = deadline.saturating_duration_since(tokio::time::Instant::now()); + if remaining.is_zero() { + return Err(format!( + "Timed out after {timeout_ms}ms waiting for agent '{agent_name}' on story '{story_id}'" + )); + } + + match tokio::time::timeout(remaining, rx.recv()).await { + Ok(Ok(event)) => { + let is_terminal = match &event { + AgentEvent::Done { .. } | AgentEvent::Error { .. } => true, + AgentEvent::Status { status, .. } if status == "stopped" => true, + _ => false, + }; + if is_terminal { + let agents = self.agents.lock().map_err(|e| e.to_string())?; + let key = composite_key(story_id, agent_name); + return Ok(if let Some(agent) = agents.get(&key) { + agent_info_from_entry(story_id, agent) + } else { + // Agent was removed from map (e.g. stop_agent removes it after + // the "stopped" status event is sent). + let (status, session_id) = match event { + AgentEvent::Done { session_id, .. 
} => { + (AgentStatus::Completed, session_id) + } + _ => (AgentStatus::Failed, None), + }; + AgentInfo { + story_id: story_id.to_string(), + agent_name: agent_name.to_string(), + status, + session_id, + worktree_path: None, + base_branch: None, + completion: None, + log_session_id: None, + } + }); + } + } + Ok(Err(broadcast::error::RecvError::Lagged(_))) => { + // Missed some buffered events — check current status before resuming. + let agents = self.agents.lock().map_err(|e| e.to_string())?; + let key = composite_key(story_id, agent_name); + if let Some(agent) = agents.get(&key) + && matches!(agent.status, AgentStatus::Completed | AgentStatus::Failed) + { + return Ok(agent_info_from_entry(story_id, agent)); + } + // Still running — continue the loop. + } + Ok(Err(broadcast::error::RecvError::Closed)) => { + // Channel closed: no more events will arrive. Return current state. + let agents = self.agents.lock().map_err(|e| e.to_string())?; + let key = composite_key(story_id, agent_name); + if let Some(agent) = agents.get(&key) { + return Ok(agent_info_from_entry(story_id, agent)); + } + return Err(format!( + "Agent '{agent_name}' for story '{story_id}' channel closed unexpectedly" + )); + } + Err(_) => { + return Err(format!( + "Timed out after {timeout_ms}ms waiting for agent '{agent_name}' on story '{story_id}'" + )); + } + } + } + } + + /// Remove all agent entries for a given story_id from the pool. + /// + /// Called when a story is archived so that stale entries don't accumulate. + /// Returns the number of entries removed. 
+ pub fn remove_agents_for_story(&self, story_id: &str) -> usize { + let mut agents = match self.agents.lock() { + Ok(a) => a, + Err(e) => { + slog_error!("[agents] Failed to lock pool for cleanup of '{story_id}': {e}"); + return 0; + } + }; + let prefix = format!("{story_id}:"); + let keys_to_remove: Vec = agents + .keys() + .filter(|k| k.starts_with(&prefix)) + .cloned() + .collect(); + let count = keys_to_remove.len(); + for key in &keys_to_remove { + agents.remove(key); + } + if count > 0 { + slog!("[agents] Removed {count} agent entries for archived story '{story_id}'"); + } + count + } +} + +#[cfg(test)] +mod tests { + use super::super::AgentPool; + use crate::agents::{AgentEvent, AgentStatus}; + + #[tokio::test] + async fn wait_for_agent_returns_immediately_if_completed() { + let pool = AgentPool::new_test(3001); + pool.inject_test_agent("s1", "bot", AgentStatus::Completed); + + let info = pool.wait_for_agent("s1", "bot", 1000).await.unwrap(); + assert_eq!(info.status, AgentStatus::Completed); + assert_eq!(info.story_id, "s1"); + assert_eq!(info.agent_name, "bot"); + } + + #[tokio::test] + async fn wait_for_agent_returns_immediately_if_failed() { + let pool = AgentPool::new_test(3001); + pool.inject_test_agent("s2", "bot", AgentStatus::Failed); + + let info = pool.wait_for_agent("s2", "bot", 1000).await.unwrap(); + assert_eq!(info.status, AgentStatus::Failed); + } + + #[tokio::test] + async fn wait_for_agent_completes_on_done_event() { + let pool = AgentPool::new_test(3001); + let tx = pool.inject_test_agent("s3", "bot", AgentStatus::Running); + + // Send Done event after a short delay + let tx_clone = tx.clone(); + tokio::spawn(async move { + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + let _ = tx_clone.send(AgentEvent::Done { + story_id: "s3".to_string(), + agent_name: "bot".to_string(), + session_id: Some("sess-abc".to_string()), + }); + }); + + let info = pool.wait_for_agent("s3", "bot", 2000).await.unwrap(); + 
assert_eq!(info.story_id, "s3"); + } + + #[tokio::test] + async fn wait_for_agent_times_out() { + let pool = AgentPool::new_test(3001); + pool.inject_test_agent("s4", "bot", AgentStatus::Running); + + let result = pool.wait_for_agent("s4", "bot", 50).await; + assert!(result.is_err()); + let msg = result.unwrap_err(); + assert!(msg.contains("Timed out"), "unexpected message: {msg}"); + } + + #[tokio::test] + async fn wait_for_agent_errors_for_nonexistent() { + let pool = AgentPool::new_test(3001); + let result = pool.wait_for_agent("no_story", "no_bot", 100).await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn wait_for_agent_completes_on_stopped_status_event() { + let pool = AgentPool::new_test(3001); + let tx = pool.inject_test_agent("s5", "bot", AgentStatus::Running); + + let tx_clone = tx.clone(); + tokio::spawn(async move { + tokio::time::sleep(std::time::Duration::from_millis(30)).await; + let _ = tx_clone.send(AgentEvent::Status { + story_id: "s5".to_string(), + agent_name: "bot".to_string(), + status: "stopped".to_string(), + }); + }); + + let info = pool.wait_for_agent("s5", "bot", 2000).await.unwrap(); + assert_eq!(info.story_id, "s5"); + } + + #[tokio::test] + async fn start_agent_auto_selects_second_coder_when_first_busy() { + let tmp = tempfile::tempdir().unwrap(); + let sk = tmp.path().join(".storkit"); + std::fs::create_dir_all(&sk).unwrap(); + std::fs::write( + sk.join("project.toml"), + r#" +[[agent]] +name = "supervisor" +stage = "other" + +[[agent]] +name = "coder-1" +stage = "coder" + +[[agent]] +name = "coder-2" +stage = "coder" +"#, + ) + .unwrap(); + + let pool = AgentPool::new_test(3001); + pool.inject_test_agent("other-story", "coder-1", AgentStatus::Running); + + let result = pool + .start_agent(tmp.path(), "42_my_story", None, None) + .await; + match result { + Ok(info) => { + assert_eq!(info.agent_name, "coder-2"); + } + Err(err) => { + assert!( + !err.contains("All coder agents are busy"), + "should have selected coder-2 
but got: {err}" + ); + assert!( + !err.contains("No coder agent configured"), + "should not fail on agent selection, got: {err}" + ); + } + } + } + + #[tokio::test] + async fn start_agent_returns_busy_when_all_coders_occupied() { + let tmp = tempfile::tempdir().unwrap(); + let sk = tmp.path().join(".storkit"); + std::fs::create_dir_all(&sk).unwrap(); + std::fs::write( + sk.join("project.toml"), + r#" +[[agent]] +name = "coder-1" +stage = "coder" + +[[agent]] +name = "coder-2" +stage = "coder" +"#, + ) + .unwrap(); + + let pool = AgentPool::new_test(3001); + pool.inject_test_agent("story-1", "coder-1", AgentStatus::Running); + pool.inject_test_agent("story-2", "coder-2", AgentStatus::Pending); + + let result = pool.start_agent(tmp.path(), "story-3", None, None).await; + assert!(result.is_err()); + let err = result.unwrap_err(); + assert!( + err.contains("All coder agents are busy"), + "expected busy error, got: {err}" + ); + } + + #[tokio::test] + async fn start_agent_moves_story_to_current_when_coders_busy() { + let tmp = tempfile::tempdir().unwrap(); + let sk = tmp.path().join(".storkit"); + let backlog = sk.join("work/1_backlog"); + std::fs::create_dir_all(&backlog).unwrap(); + std::fs::write( + sk.join("project.toml"), + r#" +[[agent]] +name = "coder-1" +stage = "coder" +"#, + ) + .unwrap(); + std::fs::write(backlog.join("story-3.md"), "---\nname: Story 3\n---\n").unwrap(); + + let pool = AgentPool::new_test(3001); + pool.inject_test_agent("story-1", "coder-1", AgentStatus::Running); + + let result = pool.start_agent(tmp.path(), "story-3", None, None).await; + + assert!(result.is_err()); + let err = result.unwrap_err(); + assert!( + err.contains("All coder agents are busy"), + "expected busy error, got: {err}" + ); + assert!( + err.contains("queued in work/2_current/"), + "expected story-to-current message, got: {err}" + ); + + let current_path = sk.join("work/2_current/story-3.md"); + assert!( + current_path.exists(), + "story should be in 2_current/ after busy 
error, but was not" + ); + let backlog_path = backlog.join("story-3.md"); + assert!( + !backlog_path.exists(), + "story should no longer be in 1_backlog/" + ); + } + + #[tokio::test] + async fn start_agent_story_already_in_current_is_noop() { + let tmp = tempfile::tempdir().unwrap(); + let sk = tmp.path().join(".storkit"); + let current = sk.join("work/2_current"); + std::fs::create_dir_all(¤t).unwrap(); + std::fs::write( + sk.join("project.toml"), + "[[agent]]\nname = \"coder-1\"\nstage = \"coder\"\n", + ) + .unwrap(); + std::fs::write(current.join("story-5.md"), "---\nname: Story 5\n---\n").unwrap(); + + let pool = AgentPool::new_test(3001); + + let result = pool.start_agent(tmp.path(), "story-5", None, None).await; + match result { + Ok(_) => {} + Err(e) => { + assert!( + !e.contains("Failed to move"), + "should not fail on idempotent move, got: {e}" + ); + } + } + } + + #[tokio::test] + async fn start_agent_explicit_name_unchanged_when_busy() { + let tmp = tempfile::tempdir().unwrap(); + let sk = tmp.path().join(".storkit"); + std::fs::create_dir_all(&sk).unwrap(); + std::fs::write( + sk.join("project.toml"), + r#" +[[agent]] +name = "coder-1" +stage = "coder" + +[[agent]] +name = "coder-2" +stage = "coder" +"#, + ) + .unwrap(); + + let pool = AgentPool::new_test(3001); + pool.inject_test_agent("story-1", "coder-1", AgentStatus::Running); + + let result = pool + .start_agent(tmp.path(), "story-2", Some("coder-1"), None) + .await; + assert!(result.is_err()); + let err = result.unwrap_err(); + assert!( + err.contains("coder-1") && err.contains("already running"), + "expected explicit busy error, got: {err}" + ); + } + + // ── start_agent single-instance concurrency tests ───────────────────────── + + #[tokio::test] + async fn start_agent_rejects_when_same_agent_already_running_on_another_story() { + use std::fs; + + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + let sk_dir = root.join(".storkit"); + fs::create_dir_all(&sk_dir).unwrap(); + 
fs::write(sk_dir.join("project.toml"), "[[agent]]\nname = \"qa\"\n").unwrap(); + + let pool = AgentPool::new_test(3001); + pool.inject_test_agent("story-a", "qa", AgentStatus::Running); + + let result = pool.start_agent(root, "story-b", Some("qa"), None).await; + + assert!( + result.is_err(), + "start_agent should fail when qa is already running on another story" + ); + let err = result.unwrap_err(); + assert!( + err.contains("already running") || err.contains("becomes available"), + "error message should explain why: got '{err}'" + ); + } + + #[tokio::test] + async fn start_agent_allows_new_story_when_previous_run_is_completed() { + use std::fs; + + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + let sk_dir = root.join(".storkit"); + fs::create_dir_all(&sk_dir).unwrap(); + fs::write(sk_dir.join("project.toml"), "[[agent]]\nname = \"qa\"\n").unwrap(); + + let pool = AgentPool::new_test(3001); + pool.inject_test_agent("story-a", "qa", AgentStatus::Completed); + + let result = pool.start_agent(root, "story-b", Some("qa"), None).await; + + if let Err(ref e) = result { + assert!( + !e.contains("already running") && !e.contains("becomes available"), + "completed agent must not trigger the concurrency guard: got '{e}'" + ); + } + } + + // ── bug 118: pending entry cleanup on start_agent failure ──────────────── + + #[tokio::test] + async fn start_agent_cleans_up_pending_entry_on_failure() { + use std::fs; + + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + let sk_dir = root.join(".storkit"); + fs::create_dir_all(&sk_dir).unwrap(); + fs::write( + sk_dir.join("project.toml"), + "[[agent]]\nname = \"coder-1\"\nstage = \"coder\"\n", + ) + .unwrap(); + + let upcoming = root.join(".storkit/work/1_backlog"); + fs::create_dir_all(&upcoming).unwrap(); + fs::write(upcoming.join("50_story_test.md"), "---\nname: Test\n---\n").unwrap(); + + let pool = AgentPool::new_test(3099); + + let result = pool + .start_agent(root, "50_story_test", 
Some("coder-1"), None) + .await; + + assert!( + result.is_ok(), + "start_agent should return Ok(Pending) immediately: {:?}", + result.err() + ); + assert_eq!( + result.unwrap().status, + AgentStatus::Pending, + "initial status must be Pending" + ); + + let final_info = pool + .wait_for_agent("50_story_test", "coder-1", 5000) + .await + .expect("wait_for_agent should not time out"); + assert_eq!( + final_info.status, + AgentStatus::Failed, + "agent must transition to Failed after worktree creation error" + ); + + let agents = pool.agents.lock().unwrap(); + let failed_entry = agents + .values() + .find(|a| a.agent_name == "coder-1" && a.status == AgentStatus::Failed); + assert!( + failed_entry.is_some(), + "agent pool must retain a Failed entry so the UI can show the error state" + ); + drop(agents); + + let events = pool + .drain_events("50_story_test", "coder-1") + .expect("drain_events should succeed"); + let has_error_event = events.iter().any(|e| matches!(e, AgentEvent::Error { .. })); + assert!( + has_error_event, + "event_log must contain AgentEvent::Error after worktree creation fails" + ); + } + + #[tokio::test] + async fn start_agent_guard_does_not_remove_running_entry() { + use std::fs; + + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + let sk_dir = root.join(".storkit"); + fs::create_dir_all(&sk_dir).unwrap(); + fs::write(sk_dir.join("project.toml"), "[[agent]]\nname = \"qa\"\n").unwrap(); + + let pool = AgentPool::new_test(3099); + pool.inject_test_agent("story-x", "qa", AgentStatus::Running); + + let result = pool.start_agent(root, "story-y", Some("qa"), None).await; + + assert!(result.is_err()); + let err = result.unwrap_err(); + assert!( + err.contains("already running") || err.contains("becomes available"), + "running entry must survive: got '{err}'" + ); + } + + // ── TOCTOU race-condition regression tests (story 132) ─────────────────── + + #[tokio::test] + async fn toctou_pending_entry_blocks_same_agent_on_different_story() { 
+ use std::fs; + + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + let sk_dir = root.join(".storkit"); + fs::create_dir_all(&sk_dir).unwrap(); + fs::write( + sk_dir.join("project.toml"), + "[[agent]]\nname = \"coder-1\"\n", + ) + .unwrap(); + + let pool = AgentPool::new_test(3099); + pool.inject_test_agent("86_story_foo", "coder-1", AgentStatus::Pending); + + let result = pool + .start_agent(root, "130_story_bar", Some("coder-1"), None) + .await; + + assert!(result.is_err(), "second start_agent must be rejected"); + let err = result.unwrap_err(); + assert!( + err.contains("already running") || err.contains("becomes available"), + "expected concurrency-rejection message, got: '{err}'" + ); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn toctou_concurrent_start_agent_same_agent_exactly_one_concurrency_rejection() { + use std::fs; + use std::sync::Arc; + + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path().to_path_buf(); + + let sk_dir = root.join(".storkit"); + fs::create_dir_all(sk_dir.join("work/1_backlog")).unwrap(); + fs::write( + root.join(".storkit/project.toml"), + "[[agent]]\nname = \"coder-1\"\n", + ) + .unwrap(); + fs::write( + root.join(".storkit/work/1_backlog/86_story_foo.md"), + "---\nname: Foo\n---\n", + ) + .unwrap(); + fs::write( + root.join(".storkit/work/1_backlog/130_story_bar.md"), + "---\nname: Bar\n---\n", + ) + .unwrap(); + + let pool = Arc::new(AgentPool::new_test(3099)); + + let pool1 = pool.clone(); + let root1 = root.clone(); + let t1 = tokio::spawn(async move { + pool1 + .start_agent(&root1, "86_story_foo", Some("coder-1"), None) + .await + }); + + let pool2 = pool.clone(); + let root2 = root.clone(); + let t2 = tokio::spawn(async move { + pool2 + .start_agent(&root2, "130_story_bar", Some("coder-1"), None) + .await + }); + + let (r1, r2) = tokio::join!(t1, t2); + let r1 = r1.unwrap(); + let r2 = r2.unwrap(); + + let concurrency_rejections = [&r1, &r2] + .iter() + 
.filter(|r| { + r.as_ref().is_err_and(|e| { + e.contains("already running") || e.contains("becomes available") + }) + }) + .count(); + + assert_eq!( + concurrency_rejections, 1, + "exactly one call must be rejected by the concurrency check; \ + got r1={r1:?} r2={r2:?}" + ); + } + + // ── story-230: prevent duplicate stage agents on same story ─────────────── + + #[tokio::test] + async fn start_agent_rejects_second_coder_stage_on_same_story() { + use std::fs; + + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + let sk_dir = root.join(".storkit"); + fs::create_dir_all(&sk_dir).unwrap(); + fs::write( + sk_dir.join("project.toml"), + "[[agent]]\nname = \"coder-1\"\n\n[[agent]]\nname = \"coder-2\"\n", + ) + .unwrap(); + + let pool = AgentPool::new_test(3099); + pool.inject_test_agent("42_story_foo", "coder-1", AgentStatus::Running); + + let result = pool + .start_agent(root, "42_story_foo", Some("coder-2"), None) + .await; + + assert!( + result.is_err(), + "second coder on same story must be rejected" + ); + let err = result.unwrap_err(); + assert!( + err.contains("same pipeline stage"), + "error must mention same pipeline stage, got: '{err}'" + ); + assert!( + err.contains("coder-1") && err.contains("coder-2"), + "error must name both agents, got: '{err}'" + ); + } + + #[tokio::test] + async fn start_agent_rejects_second_qa_stage_on_same_story() { + use std::fs; + + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + let sk_dir = root.join(".storkit"); + fs::create_dir_all(&sk_dir).unwrap(); + fs::write( + sk_dir.join("project.toml"), + "[[agent]]\nname = \"qa-1\"\nstage = \"qa\"\n\n\ + [[agent]]\nname = \"qa-2\"\nstage = \"qa\"\n", + ) + .unwrap(); + + let pool = AgentPool::new_test(3099); + pool.inject_test_agent("55_story_bar", "qa-1", AgentStatus::Running); + + let result = pool + .start_agent(root, "55_story_bar", Some("qa-2"), None) + .await; + + assert!(result.is_err(), "second qa on same story must be rejected"); + let err = 
result.unwrap_err(); + assert!( + err.contains("same pipeline stage"), + "error must mention same pipeline stage, got: '{err}'" + ); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn start_agent_concurrent_two_coders_same_story_exactly_one_stage_rejection() { + use std::fs; + use std::sync::Arc; + + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path().to_path_buf(); + + let sk_dir = root.join(".storkit"); + fs::create_dir_all(sk_dir.join("work/2_current")).unwrap(); + fs::write( + root.join(".storkit/project.toml"), + "[[agent]]\nname = \"coder-1\"\n\n[[agent]]\nname = \"coder-2\"\n", + ) + .unwrap(); + fs::write( + root.join(".storkit/work/2_current/42_story_foo.md"), + "---\nname: Foo\n---\n", + ) + .unwrap(); + + let pool = Arc::new(AgentPool::new_test(3099)); + + let pool1 = pool.clone(); + let root1 = root.clone(); + let t1 = tokio::spawn(async move { + pool1 + .start_agent(&root1, "42_story_foo", Some("coder-1"), None) + .await + }); + + let pool2 = pool.clone(); + let root2 = root.clone(); + let t2 = tokio::spawn(async move { + pool2 + .start_agent(&root2, "42_story_foo", Some("coder-2"), None) + .await + }); + + let (r1, r2) = tokio::join!(t1, t2); + let r1 = r1.unwrap(); + let r2 = r2.unwrap(); + + let stage_rejections = [&r1, &r2] + .iter() + .filter(|r| r.as_ref().is_err_and(|e| e.contains("same pipeline stage"))) + .count(); + + assert_eq!( + stage_rejections, 1, + "exactly one call must be rejected by the stage-conflict check; \ + got r1={r1:?} r2={r2:?}" + ); + } + + #[tokio::test] + async fn start_agent_two_coders_different_stories_not_blocked_by_stage_check() { + use std::fs; + + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + let sk_dir = root.join(".storkit"); + fs::create_dir_all(sk_dir.join("work/1_backlog")).unwrap(); + fs::write( + root.join(".storkit/project.toml"), + "[[agent]]\nname = \"coder-1\"\n\n[[agent]]\nname = \"coder-2\"\n", + ) + .unwrap(); + fs::write( + 
root.join(".storkit/work/1_backlog/99_story_baz.md"), + "---\nname: Baz\n---\n", + ) + .unwrap(); + + let pool = AgentPool::new_test(3099); + pool.inject_test_agent("42_story_foo", "coder-1", AgentStatus::Running); + + let result = pool + .start_agent(root, "99_story_baz", Some("coder-2"), None) + .await; + + if let Err(ref e) = result { + assert!( + !e.contains("same pipeline stage"), + "stage-conflict guard must not fire for agents on different stories; \ + got: '{e}'" + ); + } + } + + // ── bug 312: stage-pipeline mismatch guard in start_agent ────────────── + + #[tokio::test] + async fn start_agent_rejects_mergemaster_on_coding_stage_story() { + use std::fs; + + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + let sk_dir = root.join(".storkit"); + fs::create_dir_all(sk_dir.join("work/2_current")).unwrap(); + fs::write( + sk_dir.join("project.toml"), + "[[agent]]\nname = \"coder-1\"\nstage = \"coder\"\n\n\ + [[agent]]\nname = \"mergemaster\"\nstage = \"mergemaster\"\n", + ) + .unwrap(); + fs::write( + sk_dir.join("work/2_current/310_story_foo.md"), + "---\nname: Foo\n---\n", + ) + .unwrap(); + + let pool = AgentPool::new_test(3099); + let result = pool + .start_agent(root, "310_story_foo", Some("mergemaster"), None) + .await; + + assert!( + result.is_err(), + "mergemaster must not be assigned to a story in 2_current/" + ); + let err = result.unwrap_err(); + assert!( + err.contains("stage") && err.contains("2_current"), + "error must mention stage mismatch, got: '{err}'" + ); + } + + #[tokio::test] + async fn start_agent_rejects_coder_on_qa_stage_story() { + use std::fs; + + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + let sk_dir = root.join(".storkit"); + fs::create_dir_all(sk_dir.join("work/3_qa")).unwrap(); + fs::write( + sk_dir.join("project.toml"), + "[[agent]]\nname = \"coder-1\"\nstage = \"coder\"\n\n\ + [[agent]]\nname = \"qa\"\nstage = \"qa\"\n", + ) + .unwrap(); + fs::write( + 
sk_dir.join("work/3_qa/42_story_bar.md"), + "---\nname: Bar\n---\n", + ) + .unwrap(); + + let pool = AgentPool::new_test(3099); + let result = pool + .start_agent(root, "42_story_bar", Some("coder-1"), None) + .await; + + assert!( + result.is_err(), + "coder must not be assigned to a story in 3_qa/" + ); + let err = result.unwrap_err(); + assert!( + err.contains("stage") && err.contains("3_qa"), + "error must mention stage mismatch, got: '{err}'" + ); + } + + #[tokio::test] + async fn start_agent_rejects_qa_on_merge_stage_story() { + use std::fs; + + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + let sk_dir = root.join(".storkit"); + fs::create_dir_all(sk_dir.join("work/4_merge")).unwrap(); + fs::write( + sk_dir.join("project.toml"), + "[[agent]]\nname = \"qa\"\nstage = \"qa\"\n\n\ + [[agent]]\nname = \"mergemaster\"\nstage = \"mergemaster\"\n", + ) + .unwrap(); + fs::write( + sk_dir.join("work/4_merge/55_story_baz.md"), + "---\nname: Baz\n---\n", + ) + .unwrap(); + + let pool = AgentPool::new_test(3099); + let result = pool + .start_agent(root, "55_story_baz", Some("qa"), None) + .await; + + assert!( + result.is_err(), + "qa must not be assigned to a story in 4_merge/" + ); + let err = result.unwrap_err(); + assert!( + err.contains("stage") && err.contains("4_merge"), + "error must mention stage mismatch, got: '{err}'" + ); + } + + #[tokio::test] + async fn start_agent_allows_supervisor_on_any_stage() { + use std::fs; + + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + let sk_dir = root.join(".storkit"); + fs::create_dir_all(sk_dir.join("work/2_current")).unwrap(); + fs::write( + sk_dir.join("project.toml"), + "[[agent]]\nname = \"supervisor\"\nstage = \"other\"\n", + ) + .unwrap(); + fs::write( + sk_dir.join("work/2_current/77_story_sup.md"), + "---\nname: Sup\n---\n", + ) + .unwrap(); + + let pool = AgentPool::new_test(3099); + let result = pool + .start_agent(root, "77_story_sup", Some("supervisor"), None) + .await; + 
+ match result { + Ok(_) => {} + Err(e) => { + assert!( + !e.contains("stage:") || !e.contains("cannot be assigned"), + "supervisor should not be rejected for stage mismatch, got: '{e}'" + ); + } + } + } + + #[tokio::test] + async fn start_agent_allows_correct_stage_agent() { + use std::fs; + + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + let sk_dir = root.join(".storkit"); + fs::create_dir_all(sk_dir.join("work/4_merge")).unwrap(); + fs::write( + sk_dir.join("project.toml"), + "[[agent]]\nname = \"mergemaster\"\nstage = \"mergemaster\"\n", + ) + .unwrap(); + fs::write( + sk_dir.join("work/4_merge/88_story_ok.md"), + "---\nname: OK\n---\n", + ) + .unwrap(); + + let pool = AgentPool::new_test(3099); + let result = pool + .start_agent(root, "88_story_ok", Some("mergemaster"), None) + .await; + + match result { + Ok(_) => {} + Err(e) => { + assert!( + !e.contains("cannot be assigned"), + "mergemaster on 4_merge/ story should not fail stage check, got: '{e}'" + ); + } + } + } + + // ── remove_agents_for_story tests ──────────────────────────────────────── + + #[test] + fn remove_agents_for_story_removes_all_entries() { + let pool = AgentPool::new_test(3001); + pool.inject_test_agent("story_a", "coder-1", AgentStatus::Completed); + pool.inject_test_agent("story_a", "qa", AgentStatus::Failed); + pool.inject_test_agent("story_b", "coder-1", AgentStatus::Running); + + let removed = pool.remove_agents_for_story("story_a"); + assert_eq!(removed, 2, "should remove both agents for story_a"); + + let agents = pool.list_agents().unwrap(); + assert_eq!(agents.len(), 1, "only story_b agent should remain"); + assert_eq!(agents[0].story_id, "story_b"); + } + + #[test] + fn remove_agents_for_story_returns_zero_when_no_match() { + let pool = AgentPool::new_test(3001); + pool.inject_test_agent("story_a", "coder-1", AgentStatus::Running); + + let removed = pool.remove_agents_for_story("nonexistent"); + assert_eq!(removed, 0); + + let agents = 
pool.list_agents().unwrap(); + assert_eq!(agents.len(), 1, "existing agents should not be affected"); + } + + // ── front matter agent preference (bug 379) ────────────────────────────── + + #[tokio::test] + async fn start_agent_honours_front_matter_agent_when_idle() { + let tmp = tempfile::tempdir().unwrap(); + let sk = tmp.path().join(".storkit"); + let backlog = sk.join("work/1_backlog"); + std::fs::create_dir_all(&backlog).unwrap(); + std::fs::write( + sk.join("project.toml"), + r#" +[[agent]] +name = "coder-sonnet" +stage = "coder" + +[[agent]] +name = "coder-opus" +stage = "coder" +"#, + ) + .unwrap(); + // Story file with agent preference in front matter. + std::fs::write( + backlog.join("368_story_test.md"), + "---\nname: Test Story\nagent: coder-opus\n---\n# Story 368\n", + ) + .unwrap(); + + let pool = AgentPool::new_test(3010); + // coder-sonnet is busy so without front matter the auto-selection + // would skip coder-opus and try something else. + pool.inject_test_agent("other-story", "coder-sonnet", AgentStatus::Running); + + let result = pool + .start_agent(tmp.path(), "368_story_test", None, None) + .await; + match result { + Ok(info) => { + assert_eq!( + info.agent_name, "coder-opus", + "should pick the front-matter preferred agent" + ); + } + Err(err) => { + // Allowed to fail for infrastructure reasons (no git repo), + // but NOT due to agent selection ignoring the preference. 
+ assert!( + !err.contains("All coder agents are busy"), + "should not report busy when coder-opus is idle: {err}" + ); + assert!( + !err.contains("coder-sonnet"), + "should not have picked coder-sonnet: {err}" + ); + } + } + } + + #[tokio::test] + async fn start_agent_returns_error_when_front_matter_agent_busy() { + let tmp = tempfile::tempdir().unwrap(); + let sk = tmp.path().join(".storkit"); + let backlog = sk.join("work/1_backlog"); + std::fs::create_dir_all(&backlog).unwrap(); + std::fs::write( + sk.join("project.toml"), + r#" +[[agent]] +name = "coder-sonnet" +stage = "coder" + +[[agent]] +name = "coder-opus" +stage = "coder" +"#, + ) + .unwrap(); + std::fs::write( + backlog.join("368_story_test.md"), + "---\nname: Test Story\nagent: coder-opus\n---\n# Story 368\n", + ) + .unwrap(); + + let pool = AgentPool::new_test(3011); + // Preferred agent is busy — should NOT fall back to coder-sonnet. + pool.inject_test_agent("other-story", "coder-opus", AgentStatus::Running); + + let result = pool + .start_agent(tmp.path(), "368_story_test", None, None) + .await; + assert!(result.is_err(), "expected error when preferred agent is busy"); + let err = result.unwrap_err(); + assert!( + err.contains("coder-opus"), + "error should mention the preferred agent: {err}" + ); + assert!( + err.contains("busy") || err.contains("queued"), + "error should say agent is busy or story is queued: {err}" + ); + } + + // ── archive + cleanup integration test ─────────────────────────────────── + + #[tokio::test] + async fn archiving_story_removes_agent_entries_from_pool() { + use crate::agents::lifecycle::move_story_to_archived; + use std::fs; + + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + let current = root.join(".storkit/work/2_current"); + fs::create_dir_all(&current).unwrap(); + fs::write(current.join("60_story_cleanup.md"), "test").unwrap(); + + let pool = AgentPool::new_test(3001); + pool.inject_test_agent("60_story_cleanup", "coder-1", AgentStatus::Completed); 
+ pool.inject_test_agent("60_story_cleanup", "qa", AgentStatus::Completed); + pool.inject_test_agent("61_story_other", "coder-1", AgentStatus::Running); + + assert_eq!(pool.list_agents().unwrap().len(), 3); + + move_story_to_archived(root, "60_story_cleanup").unwrap(); + pool.remove_agents_for_story("60_story_cleanup"); + + let remaining = pool.list_agents().unwrap(); + assert_eq!( + remaining.len(), + 1, + "only the other story's agent should remain" + ); + assert_eq!(remaining[0].story_id, "61_story_other"); + + assert!( + root.join(".storkit/work/5_done/60_story_cleanup.md") + .exists() + ); + } + +} diff --git a/server/src/agents/pool/mod.rs b/server/src/agents/pool/mod.rs index cc0b4147..a7610f11 100644 --- a/server/src/agents/pool/mod.rs +++ b/server/src/agents/pool/mod.rs @@ -1,118 +1,24 @@ mod auto_assign; +mod lifecycle; mod pipeline; +mod process; +mod query; +mod types; +mod worktree; + +#[cfg(test)] +mod test_helpers; -use crate::agent_log::AgentLogWriter; -use crate::config::ProjectConfig; use crate::io::watcher::WatcherEvent; -use crate::slog; -use crate::slog_error; -use crate::worktree::{self, WorktreeInfo}; use portable_pty::ChildKiller; use std::collections::HashMap; -use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; use tokio::sync::broadcast; -use super::{ - AgentEvent, AgentInfo, AgentStatus, CompletionReport, PipelineStage, agent_config_stage, - pipeline_stage, -}; -use super::runtime::{AgentRuntime, ClaudeCodeRuntime, GeminiRuntime, OpenAiRuntime, RuntimeContext}; - -/// Build the composite key used to track agents in the pool. -fn composite_key(story_id: &str, agent_name: &str) -> String { - format!("{story_id}:{agent_name}") -} - -/// RAII guard that removes a pending agent entry from the pool on drop. -/// -/// Created after inserting a `Pending` entry into the agent HashMap. 
-/// If `start_agent` succeeds (the agent process is spawned and status -/// transitions to `Running`), call [`disarm`](Self::disarm) to prevent -/// cleanup. If any intermediate step fails and the guard is dropped -/// without being disarmed, the pending entry is removed so it cannot -/// block future auto-assign dispatches. -struct PendingGuard { - agents: Arc<Mutex<HashMap<String, StoryAgent>>>, - key: String, - armed: bool, -} - -impl PendingGuard { - fn new(agents: Arc<Mutex<HashMap<String, StoryAgent>>>, key: String) -> Self { - Self { - agents, - key, - armed: true, - } - } - - /// Prevent the guard from cleaning up the entry (call after - /// successful spawn). - fn disarm(&mut self) { - self.armed = false; - } -} - -impl Drop for PendingGuard { - fn drop(&mut self) { - if self.armed - && let Ok(mut agents) = self.agents.lock() - && agents - .get(&self.key) - .is_some_and(|a| a.status == AgentStatus::Pending) - { - agents.remove(&self.key); - slog!( - "[agents] Cleaned up leaked Pending entry for '{}'", - self.key - ); - } - } -} - -struct StoryAgent { - agent_name: String, - status: AgentStatus, - worktree_info: Option<WorktreeInfo>, - session_id: Option<String>, - tx: broadcast::Sender<AgentEvent>, - task_handle: Option<tokio::task::JoinHandle<()>>, - /// Accumulated events for polling via get_agent_output. - event_log: Arc<Mutex<Vec<AgentEvent>>>, - /// Set when the agent calls report_completion. - completion: Option<CompletionReport>, - /// Project root, stored for pipeline advancement after completion. - project_root: Option<PathBuf>, - /// UUID identifying the log file for this session. - log_session_id: Option<String>, - /// Set to `true` when the agent calls `report_merge_failure`. - /// Prevents the pipeline from blindly advancing to `5_done/` after a - /// failed merge: the server-owned gate check runs in the feature-branch - /// worktree (which compiles fine) and returns `gates_passed=true` even - /// though the code was never squash-merged onto master. - merge_failure_reported: bool, -} - -/// Build an `AgentInfo` snapshot from a `StoryAgent` map entry. 
-fn agent_info_from_entry(story_id: &str, agent: &StoryAgent) -> AgentInfo { - AgentInfo { - story_id: story_id.to_string(), - agent_name: agent.agent_name.clone(), - status: agent.status.clone(), - session_id: agent.session_id.clone(), - worktree_path: agent - .worktree_info - .as_ref() - .map(|wt| wt.path.to_string_lossy().to_string()), - base_branch: agent - .worktree_info - .as_ref() - .map(|wt| wt.base_branch.clone()), - completion: agent.completion.clone(), - log_session_id: agent.log_session_id.clone(), - } -} +// Bring pool-internal types into pool's namespace so that sub-modules +// (auto_assign, pipeline, etc.) can access them via `use super::...`. +use types::{StoryAgent, composite_key}; +use worktree::find_active_story_stage; /// Manages concurrent story agents, each in its own worktree. pub struct AgentPool { @@ -160,2248 +66,4 @@ impl AgentPool { fn notify_agent_state_changed(watcher_tx: &broadcast::Sender<WatcherEvent>) { let _ = watcher_tx.send(WatcherEvent::AgentStateChanged); } - - /// Kill all active PTY child processes. - /// - /// Called on server shutdown to prevent orphaned Claude Code processes from - /// continuing to run after the server exits. Each registered killer is called - /// once, then the registry is cleared. - pub fn kill_all_children(&self) { - if let Ok(mut killers) = self.child_killers.lock() { - for (key, killer) in killers.iter_mut() { - slog!("[agents] Killing child process for {key} on shutdown"); - let _ = killer.kill(); - } - killers.clear(); - } - } - - /// Kill and deregister the child process for a specific agent key. - /// - /// Used by `stop_agent` to ensure the PTY child is terminated even though - /// aborting a `spawn_blocking` task handle does not interrupt the blocking thread. 
- fn kill_child_for_key(&self, key: &str) { - if let Ok(mut killers) = self.child_killers.lock() - && let Some(mut killer) = killers.remove(key) - { - slog!("[agents] Killing child process for {key} on stop"); - let _ = killer.kill(); - } - } - - /// Start an agent for a story: load config, create worktree, spawn agent. - /// - /// When `agent_name` is `None`, automatically selects the first idle coder - /// agent (story 190). If all coders are busy the call fails with an error - /// indicating the story will be picked up when one becomes available. - /// - /// If `resume_context` is provided, it is appended to the rendered prompt - /// so the agent can pick up from a previous failed attempt. - pub async fn start_agent( - &self, - project_root: &Path, - story_id: &str, - agent_name: Option<&str>, - resume_context: Option<&str>, - ) -> Result<AgentInfo, String> { - let config = ProjectConfig::load(project_root)?; - - // Validate explicit agent name early (no lock needed). - if let Some(name) = agent_name { - config - .find_agent(name) - .ok_or_else(|| format!("No agent named '{name}' in config"))?; - } - - // Create name-independent shared resources before the lock so they are - // ready for the atomic check-and-insert (story 132). - let (tx, _) = broadcast::channel::<AgentEvent>(1024); - let event_log: Arc<Mutex<Vec<AgentEvent>>> = Arc::new(Mutex::new(Vec::new())); - let log_session_id = uuid::Uuid::new_v4().to_string(); - - // Move story from backlog/ to current/ before checking agent - // availability so that auto_assign_available_work can pick it up even - // when all coders are currently busy (story 203). This is idempotent: - // if the story is already in 2_current/ or a later stage, the call is - // a no-op. - super::lifecycle::move_story_to_current(project_root, story_id)?; - - // Validate that the agent's configured stage matches the story's - // pipeline stage. This prevents any caller (auto-assign, MCP tool, - // pipeline advance, supervisor) from starting a wrong-stage agent on - // a story — e.g. 
mergemaster on a coding-stage story (bug 312). - if let Some(name) = agent_name { - let agent_stage = config - .find_agent(name) - .map(agent_config_stage) - .unwrap_or_else(|| pipeline_stage(name)); - if agent_stage != PipelineStage::Other - && let Some(story_stage_dir) = find_active_story_stage(project_root, story_id) - { - let expected_stage = match story_stage_dir { - "2_current" => PipelineStage::Coder, - "3_qa" => PipelineStage::Qa, - "4_merge" => PipelineStage::Mergemaster, - _ => PipelineStage::Other, - }; - if expected_stage != PipelineStage::Other && expected_stage != agent_stage { - return Err(format!( - "Agent '{name}' (stage: {agent_stage:?}) cannot be assigned to \ - story '{story_id}' in {story_stage_dir}/ (requires stage: {expected_stage:?})" - )); - } - } - } - - // Read the preferred agent from the story's front matter before acquiring - // the lock. When no explicit agent_name is given, this lets start_agent - // honour `agent: coder-opus` written by the `assign` command — mirroring - // the auto_assign path (bug 379). - let front_matter_agent: Option<String> = if agent_name.is_none() { - find_active_story_stage(project_root, story_id).and_then(|stage_dir| { - let path = project_root - .join(".storkit") - .join("work") - .join(stage_dir) - .join(format!("{story_id}.md")); - let contents = std::fs::read_to_string(path).ok()?; - crate::io::story_metadata::parse_front_matter(&contents).ok()?.agent - }) - } else { - None - }; - - // Atomically resolve agent name, check availability, and register as - // Pending. When `agent_name` is `None` the first idle coder is - // selected inside the lock so no TOCTOU race can occur between the - // availability check and the Pending insert (story 132, story 190). - // - // The `PendingGuard` ensures that if any step below fails the entry is - // removed from the pool so it does not permanently block auto-assign - // (bug 118). 
- let resolved_name: String; - let key: String; - { - let mut agents = self.agents.lock().map_err(|e| e.to_string())?; - - resolved_name = match agent_name { - Some(name) => name.to_string(), - None => { - // Honour the `agent:` field in the story's front matter so that - // `start 368` after `assign 368 opus` picks the right agent - // (bug 379). Mirrors the auto_assign selection logic. - if let Some(ref pref) = front_matter_agent { - let stage_matches = config - .find_agent(pref) - .map(|cfg| agent_config_stage(cfg) == PipelineStage::Coder) - .unwrap_or(false); - if stage_matches { - if auto_assign::is_agent_free(&agents, pref) { - pref.clone() - } else { - return Err(format!( - "Preferred agent '{pref}' from story front matter is busy; \ - story '{story_id}' has been queued in work/2_current/ and will \ - be auto-assigned when it becomes available" - )); - } - } else { - // Stage mismatch — fall back to any free coder. - auto_assign::find_free_agent_for_stage( - &config, - &agents, - &PipelineStage::Coder, - ) - .map(|s| s.to_string()) - .ok_or_else(|| { - if config - .agent - .iter() - .any(|a| agent_config_stage(a) == PipelineStage::Coder) - { - format!( - "All coder agents are busy; story '{story_id}' has been \ - queued in work/2_current/ and will be auto-assigned when \ - one becomes available" - ) - } else { - "No coder agent configured. Specify an agent_name explicitly." - .to_string() - } - })? - } - } else { - auto_assign::find_free_agent_for_stage( - &config, - &agents, - &PipelineStage::Coder, - ) - .map(|s| s.to_string()) - .ok_or_else(|| { - if config - .agent - .iter() - .any(|a| agent_config_stage(a) == PipelineStage::Coder) - { - format!( - "All coder agents are busy; story '{story_id}' has been \ - queued in work/2_current/ and will be auto-assigned when \ - one becomes available" - ) - } else { - "No coder agent configured. Specify an agent_name explicitly." - .to_string() - } - })? 
- } - } - }; - - key = composite_key(story_id, &resolved_name); - - // Check for duplicate assignment (same story + same agent already active). - if let Some(agent) = agents.get(&key) - && (agent.status == AgentStatus::Running || agent.status == AgentStatus::Pending) - { - return Err(format!( - "Agent '{resolved_name}' for story '{story_id}' is already {}", - agent.status - )); - } - // Enforce single-stage concurrency: reject if there is already a - // Running/Pending agent at the same pipeline stage for this story. - // This prevents two coders (or two QA/mergemaster agents) from - // corrupting each other's work in the same worktree. - // Applies to both explicit and auto-selected agents; the Other - // stage (supervisors, unknown agents) is exempt. - let resolved_stage = config - .find_agent(&resolved_name) - .map(agent_config_stage) - .unwrap_or_else(|| pipeline_stage(&resolved_name)); - if resolved_stage != PipelineStage::Other - && let Some(conflicting_name) = agents.iter().find_map(|(k, a)| { - let k_story = k.rsplit_once(':').map(|(s, _)| s).unwrap_or(k); - if k_story == story_id - && a.agent_name != resolved_name - && matches!(a.status, AgentStatus::Running | AgentStatus::Pending) - { - let a_stage = config - .find_agent(&a.agent_name) - .map(agent_config_stage) - .unwrap_or_else(|| pipeline_stage(&a.agent_name)); - if a_stage == resolved_stage { - Some(a.agent_name.clone()) - } else { - None - } - } else { - None - } - }) - { - return Err(format!( - "Cannot start '{resolved_name}' on story '{story_id}': \ - '{conflicting_name}' is already active at the same pipeline stage" - )); - } - // Enforce single-instance concurrency for explicitly-named agents: - // if this agent is already running on any other story, reject. - // Auto-selected agents are already guaranteed idle by - // find_free_agent_for_stage, so this check is only needed for - // explicit requests. 
- if agent_name.is_some() - && let Some(busy_story) = agents.iter().find_map(|(k, a)| { - if a.agent_name == resolved_name - && matches!(a.status, AgentStatus::Running | AgentStatus::Pending) - { - Some( - k.rsplit_once(':') - .map(|(sid, _)| sid) - .unwrap_or(k) - .to_string(), - ) - } else { - None - } - }) - { - return Err(format!( - "Agent '{resolved_name}' is already running on story '{busy_story}'; \ - story '{story_id}' will be picked up when the agent becomes available" - )); - } - agents.insert( - key.clone(), - StoryAgent { - agent_name: resolved_name.clone(), - status: AgentStatus::Pending, - worktree_info: None, - session_id: None, - tx: tx.clone(), - task_handle: None, - event_log: event_log.clone(), - completion: None, - project_root: Some(project_root.to_path_buf()), - log_session_id: Some(log_session_id.clone()), - merge_failure_reported: false, - }, - ); - } - let mut pending_guard = PendingGuard::new(self.agents.clone(), key.clone()); - - // Create persistent log writer (needs resolved_name, so must be after - // the atomic resolution above). - let log_writer = - match AgentLogWriter::new(project_root, story_id, &resolved_name, &log_session_id) { - Ok(w) => Some(Arc::new(Mutex::new(w))), - Err(e) => { - eprintln!( - "[agents] Failed to create log writer for {story_id}:{resolved_name}: {e}" - ); - None - } - }; - - // Notify WebSocket clients that a new agent is pending. - Self::notify_agent_state_changed(&self.watcher_tx); - - let _ = tx.send(AgentEvent::Status { - story_id: story_id.to_string(), - agent_name: resolved_name.clone(), - status: "pending".to_string(), - }); - - // Extract inactivity timeout from the agent config before cloning config. - let inactivity_timeout_secs = config - .find_agent(&resolved_name) - .map(|a| a.inactivity_timeout_secs) - .unwrap_or(300); - - // Clone all values needed inside the background spawn. 
- let project_root_clone = project_root.to_path_buf(); - let config_clone = config.clone(); - let resume_context_owned = resume_context.map(str::to_string); - let sid = story_id.to_string(); - let aname = resolved_name.clone(); - let tx_clone = tx.clone(); - let agents_ref = self.agents.clone(); - let key_clone = key.clone(); - let log_clone = event_log.clone(); - let port_for_task = self.port; - let log_writer_clone = log_writer.clone(); - let child_killers_clone = self.child_killers.clone(); - let watcher_tx_clone = self.watcher_tx.clone(); - - // Spawn the background task. Worktree creation and agent launch happen here - // so `start_agent` returns immediately after registering the agent as - // Pending — non-blocking by design (story 157). - let handle = tokio::spawn(async move { - // Step 1: create the worktree (slow — git checkout, pnpm install, etc.) - let wt_info = match worktree::create_worktree( - &project_root_clone, - &sid, - &config_clone, - port_for_task, - ) - .await - { - Ok(wt) => wt, - Err(e) => { - let error_msg = format!("Failed to create worktree: {e}"); - slog_error!("[agents] {error_msg}"); - let event = AgentEvent::Error { - story_id: sid.clone(), - agent_name: aname.clone(), - message: error_msg, - }; - if let Ok(mut log) = log_clone.lock() { - log.push(event.clone()); - } - let _ = tx_clone.send(event); - if let Ok(mut agents) = agents_ref.lock() - && let Some(agent) = agents.get_mut(&key_clone) - { - agent.status = AgentStatus::Failed; - } - Self::notify_agent_state_changed(&watcher_tx_clone); - return; - } - }; - - // Step 2: store worktree info and render agent command/args/prompt. 
- let wt_path_str = wt_info.path.to_string_lossy().to_string(); - { - if let Ok(mut agents) = agents_ref.lock() - && let Some(agent) = agents.get_mut(&key_clone) - { - agent.worktree_info = Some(wt_info.clone()); - } - } - - let (command, args, mut prompt) = match config_clone.render_agent_args( - &wt_path_str, - &sid, - Some(&aname), - Some(&wt_info.base_branch), - ) { - Ok(result) => result, - Err(e) => { - let error_msg = format!("Failed to render agent args: {e}"); - slog_error!("[agents] {error_msg}"); - let event = AgentEvent::Error { - story_id: sid.clone(), - agent_name: aname.clone(), - message: error_msg, - }; - if let Ok(mut log) = log_clone.lock() { - log.push(event.clone()); - } - let _ = tx_clone.send(event); - if let Ok(mut agents) = agents_ref.lock() - && let Some(agent) = agents.get_mut(&key_clone) - { - agent.status = AgentStatus::Failed; - } - Self::notify_agent_state_changed(&watcher_tx_clone); - return; - } - }; - - // Append resume context if this is a restart with failure information. - if let Some(ctx) = resume_context_owned { - prompt.push_str(&ctx); - } - - // Step 3: transition to Running now that the worktree is ready. - { - if let Ok(mut agents) = agents_ref.lock() - && let Some(agent) = agents.get_mut(&key_clone) - { - agent.status = AgentStatus::Running; - } - } - let _ = tx_clone.send(AgentEvent::Status { - story_id: sid.clone(), - agent_name: aname.clone(), - status: "running".to_string(), - }); - Self::notify_agent_state_changed(&watcher_tx_clone); - - // Step 4: launch the agent process via the configured runtime. 
- let runtime_name = config_clone - .find_agent(&aname) - .and_then(|a| a.runtime.as_deref()) - .unwrap_or("claude-code"); - - let run_result = match runtime_name { - "claude-code" => { - let runtime = ClaudeCodeRuntime::new(child_killers_clone.clone(), watcher_tx_clone.clone()); - let ctx = RuntimeContext { - story_id: sid.clone(), - agent_name: aname.clone(), - command, - args, - prompt, - cwd: wt_path_str, - inactivity_timeout_secs, - mcp_port: port_for_task, - }; - runtime - .start(ctx, tx_clone.clone(), log_clone.clone(), log_writer_clone) - .await - } - "gemini" => { - let runtime = GeminiRuntime::new(); - let ctx = RuntimeContext { - story_id: sid.clone(), - agent_name: aname.clone(), - command, - args, - prompt, - cwd: wt_path_str, - inactivity_timeout_secs, - mcp_port: port_for_task, - }; - runtime - .start(ctx, tx_clone.clone(), log_clone.clone(), log_writer_clone) - .await - } - "openai" => { - let runtime = OpenAiRuntime::new(); - let ctx = RuntimeContext { - story_id: sid.clone(), - agent_name: aname.clone(), - command, - args, - prompt, - cwd: wt_path_str, - inactivity_timeout_secs, - mcp_port: port_for_task, - }; - runtime - .start(ctx, tx_clone.clone(), log_clone.clone(), log_writer_clone) - .await - } - other => Err(format!( - "Unknown agent runtime '{other}'; check the 'runtime' field in project.toml. \ - Supported: 'claude-code', 'gemini', 'openai'" - )), - }; - - match run_result { - Ok(result) => { - // Persist token usage if the agent reported it. 
- if let Some(ref usage) = result.token_usage - && let Ok(agents) = agents_ref.lock() - && let Some(agent) = agents.get(&key_clone) - && let Some(ref pr) = agent.project_root - { - let model = config_clone - .find_agent(&aname) - .and_then(|a| a.model.clone()); - let record = super::token_usage::build_record( - &sid, &aname, model, usage.clone(), - ); - if let Err(e) = super::token_usage::append_record(pr, &record) { - slog_error!( - "[agents] Failed to persist token usage for \ - {sid}:{aname}: {e}" - ); - } - } - - // Server-owned completion: run acceptance gates automatically - // when the agent process exits normally. - pipeline::run_server_owned_completion( - &agents_ref, - port_for_task, - &sid, - &aname, - result.session_id, - watcher_tx_clone.clone(), - ) - .await; - Self::notify_agent_state_changed(&watcher_tx_clone); - } - Err(e) => { - slog_error!("[agents] Agent process error for {aname} on {sid}: {e}"); - let event = AgentEvent::Error { - story_id: sid.clone(), - agent_name: aname.clone(), - message: e, - }; - if let Ok(mut log) = log_clone.lock() { - log.push(event.clone()); - } - let _ = tx_clone.send(event); - if let Ok(mut agents) = agents_ref.lock() - && let Some(agent) = agents.get_mut(&key_clone) - { - agent.status = AgentStatus::Failed; - } - Self::notify_agent_state_changed(&watcher_tx_clone); - } - } - }); - - // Store the task handle while the agent is still Pending. - { - let mut agents = self.agents.lock().map_err(|e| e.to_string())?; - if let Some(agent) = agents.get_mut(&key) { - agent.task_handle = Some(handle); - } - } - - // Agent successfully spawned — prevent the guard from removing the entry. - pending_guard.disarm(); - - Ok(AgentInfo { - story_id: story_id.to_string(), - agent_name: resolved_name, - status: AgentStatus::Pending, - session_id: None, - worktree_path: None, - base_branch: None, - completion: None, - log_session_id: Some(log_session_id), - }) - } - - /// Stop a running agent. Worktree is preserved for inspection. 
- pub async fn stop_agent( - &self, - _project_root: &Path, - story_id: &str, - agent_name: &str, - ) -> Result<(), String> { - let key = composite_key(story_id, agent_name); - - let (worktree_info, task_handle, tx) = { - let mut agents = self.agents.lock().map_err(|e| e.to_string())?; - let agent = agents - .get_mut(&key) - .ok_or_else(|| format!("No agent '{agent_name}' for story '{story_id}'"))?; - - let wt = agent.worktree_info.clone(); - let handle = agent.task_handle.take(); - let tx = agent.tx.clone(); - agent.status = AgentStatus::Failed; - (wt, handle, tx) - }; - - // Abort the task and kill the PTY child process. - // Note: aborting a spawn_blocking task handle does not interrupt the blocking - // thread, so we must also kill the child process directly via the killer registry. - if let Some(handle) = task_handle { - handle.abort(); - let _ = handle.await; - } - self.kill_child_for_key(&key); - - // Preserve worktree for inspection — don't destroy agent's work on stop. - if let Some(ref wt) = worktree_info { - slog!( - "[agents] Worktree preserved for {story_id}:{agent_name}: {}", - wt.path.display() - ); - } - - let _ = tx.send(AgentEvent::Status { - story_id: story_id.to_string(), - agent_name: agent_name.to_string(), - status: "stopped".to_string(), - }); - - // Remove from map - { - let mut agents = self.agents.lock().map_err(|e| e.to_string())?; - agents.remove(&key); - } - - // Notify WebSocket clients so pipeline board and agent panel update. - Self::notify_agent_state_changed(&self.watcher_tx); - - Ok(()) - } - - /// Return the names of configured agents for `stage` that are not currently - /// running or pending. 
- pub fn available_agents_for_stage( - &self, - config: &ProjectConfig, - stage: &PipelineStage, - ) -> Result, String> { - let agents = self.agents.lock().map_err(|e| e.to_string())?; - Ok(config - .agent - .iter() - .filter(|cfg| agent_config_stage(cfg) == *stage) - .filter(|cfg| { - !agents.values().any(|a| { - a.agent_name == cfg.name - && matches!(a.status, AgentStatus::Running | AgentStatus::Pending) - }) - }) - .map(|cfg| cfg.name.clone()) - .collect()) - } - - /// List all agents with their status. - pub fn list_agents(&self) -> Result, String> { - let agents = self.agents.lock().map_err(|e| e.to_string())?; - Ok(agents - .iter() - .map(|(key, agent)| { - // Extract story_id from composite key "story_id:agent_name" - let story_id = key - .rsplit_once(':') - .map(|(sid, _)| sid.to_string()) - .unwrap_or_else(|| key.clone()); - agent_info_from_entry(&story_id, agent) - }) - .collect()) - } - - /// Subscribe to events for a story agent. - pub fn subscribe( - &self, - story_id: &str, - agent_name: &str, - ) -> Result, String> { - let key = composite_key(story_id, agent_name); - let agents = self.agents.lock().map_err(|e| e.to_string())?; - let agent = agents - .get(&key) - .ok_or_else(|| format!("No agent '{agent_name}' for story '{story_id}'"))?; - Ok(agent.tx.subscribe()) - } - - /// Drain accumulated events for polling. Returns all events since the last drain. - pub fn drain_events( - &self, - story_id: &str, - agent_name: &str, - ) -> Result, String> { - let key = composite_key(story_id, agent_name); - let agents = self.agents.lock().map_err(|e| e.to_string())?; - let agent = agents - .get(&key) - .ok_or_else(|| format!("No agent '{agent_name}' for story '{story_id}'"))?; - let mut log = agent.event_log.lock().map_err(|e| e.to_string())?; - Ok(log.drain(..).collect()) - } - - /// Block until the agent reaches a terminal state (completed, failed, stopped). - /// Returns the agent's final `AgentInfo`. 
- /// `timeout_ms` caps how long to wait; returns an error if the deadline passes. - pub async fn wait_for_agent( - &self, - story_id: &str, - agent_name: &str, - timeout_ms: u64, - ) -> Result { - // Subscribe before checking status so we don't miss the terminal event - // if the agent completes in the window between the two operations. - let mut rx = self.subscribe(story_id, agent_name)?; - - // Return immediately if already in a terminal state. - { - let agents = self.agents.lock().map_err(|e| e.to_string())?; - let key = composite_key(story_id, agent_name); - if let Some(agent) = agents.get(&key) - && matches!(agent.status, AgentStatus::Completed | AgentStatus::Failed) - { - return Ok(agent_info_from_entry(story_id, agent)); - } - } - - let deadline = tokio::time::Instant::now() + std::time::Duration::from_millis(timeout_ms); - - loop { - let remaining = deadline.saturating_duration_since(tokio::time::Instant::now()); - if remaining.is_zero() { - return Err(format!( - "Timed out after {timeout_ms}ms waiting for agent '{agent_name}' on story '{story_id}'" - )); - } - - match tokio::time::timeout(remaining, rx.recv()).await { - Ok(Ok(event)) => { - let is_terminal = match &event { - AgentEvent::Done { .. } | AgentEvent::Error { .. } => true, - AgentEvent::Status { status, .. } if status == "stopped" => true, - _ => false, - }; - if is_terminal { - let agents = self.agents.lock().map_err(|e| e.to_string())?; - let key = composite_key(story_id, agent_name); - return Ok(if let Some(agent) = agents.get(&key) { - agent_info_from_entry(story_id, agent) - } else { - // Agent was removed from map (e.g. stop_agent removes it after - // the "stopped" status event is sent). - let (status, session_id) = match event { - AgentEvent::Done { session_id, .. 
} => { - (AgentStatus::Completed, session_id) - } - _ => (AgentStatus::Failed, None), - }; - AgentInfo { - story_id: story_id.to_string(), - agent_name: agent_name.to_string(), - status, - session_id, - worktree_path: None, - base_branch: None, - completion: None, - log_session_id: None, - } - }); - } - } - Ok(Err(broadcast::error::RecvError::Lagged(_))) => { - // Missed some buffered events — check current status before resuming. - let agents = self.agents.lock().map_err(|e| e.to_string())?; - let key = composite_key(story_id, agent_name); - if let Some(agent) = agents.get(&key) - && matches!(agent.status, AgentStatus::Completed | AgentStatus::Failed) - { - return Ok(agent_info_from_entry(story_id, agent)); - } - // Still running — continue the loop. - } - Ok(Err(broadcast::error::RecvError::Closed)) => { - // Channel closed: no more events will arrive. Return current state. - let agents = self.agents.lock().map_err(|e| e.to_string())?; - let key = composite_key(story_id, agent_name); - if let Some(agent) = agents.get(&key) { - return Ok(agent_info_from_entry(story_id, agent)); - } - return Err(format!( - "Agent '{agent_name}' for story '{story_id}' channel closed unexpectedly" - )); - } - Err(_) => { - return Err(format!( - "Timed out after {timeout_ms}ms waiting for agent '{agent_name}' on story '{story_id}'" - )); - } - } - } - } - - /// Create a worktree for the given story using the server port (writes .mcp.json). - pub async fn create_worktree( - &self, - project_root: &Path, - story_id: &str, - ) -> Result { - let config = ProjectConfig::load(project_root)?; - worktree::create_worktree(project_root, story_id, &config, self.port).await - } - - /// Get project root helper. - pub fn get_project_root(&self, state: &crate::state::SessionState) -> Result { - state.get_project_root() - } - - /// Get the log session ID and project root for an agent, if available. - /// - /// Used by MCP tools to find the persistent log file for a completed agent. 
- pub fn get_log_info(&self, story_id: &str, agent_name: &str) -> Option<(String, PathBuf)> { - let key = composite_key(story_id, agent_name); - let agents = self.agents.lock().ok()?; - let agent = agents.get(&key)?; - let session_id = agent.log_session_id.clone()?; - let project_root = agent.project_root.clone()?; - Some((session_id, project_root)) - } - - /// Remove all agent entries for a given story_id from the pool. - /// - /// Called when a story is archived so that stale entries don't accumulate. - /// Returns the number of entries removed. - pub fn remove_agents_for_story(&self, story_id: &str) -> usize { - let mut agents = match self.agents.lock() { - Ok(a) => a, - Err(e) => { - slog_error!("[agents] Failed to lock pool for cleanup of '{story_id}': {e}"); - return 0; - } - }; - let prefix = format!("{story_id}:"); - let keys_to_remove: Vec = agents - .keys() - .filter(|k| k.starts_with(&prefix)) - .cloned() - .collect(); - let count = keys_to_remove.len(); - for key in &keys_to_remove { - agents.remove(key); - } - if count > 0 { - slog!("[agents] Removed {count} agent entries for archived story '{story_id}'"); - } - count - } - - /// Test helper: inject a pre-built agent entry so unit tests can exercise - /// wait/subscribe logic without spawning a real process. 
- #[cfg(test)] - pub fn inject_test_agent( - &self, - story_id: &str, - agent_name: &str, - status: AgentStatus, - ) -> broadcast::Sender { - let (tx, _) = broadcast::channel::(64); - let key = composite_key(story_id, agent_name); - let mut agents = self.agents.lock().unwrap(); - agents.insert( - key, - StoryAgent { - agent_name: agent_name.to_string(), - status, - worktree_info: None, - session_id: None, - tx: tx.clone(), - task_handle: None, - event_log: Arc::new(Mutex::new(Vec::new())), - completion: None, - project_root: None, - log_session_id: None, - merge_failure_reported: false, - }, - ); - tx - } - - /// Test helper: inject an agent with a specific worktree path for testing - /// gate-related logic. - #[cfg(test)] - pub fn inject_test_agent_with_path( - &self, - story_id: &str, - agent_name: &str, - status: AgentStatus, - worktree_path: PathBuf, - ) -> broadcast::Sender { - let (tx, _) = broadcast::channel::(64); - let key = composite_key(story_id, agent_name); - let mut agents = self.agents.lock().unwrap(); - agents.insert( - key, - StoryAgent { - agent_name: agent_name.to_string(), - status, - worktree_info: Some(WorktreeInfo { - path: worktree_path, - branch: format!("feature/story-{story_id}"), - base_branch: "master".to_string(), - }), - session_id: None, - tx: tx.clone(), - task_handle: None, - event_log: Arc::new(Mutex::new(Vec::new())), - completion: None, - project_root: None, - log_session_id: None, - merge_failure_reported: false, - }, - ); - tx - } - - /// Test helper: inject an agent with a completion report and project_root - /// for testing pipeline advance logic without spawning real agents. 
- #[cfg(test)] - pub fn inject_test_agent_with_completion( - &self, - story_id: &str, - agent_name: &str, - status: AgentStatus, - project_root: PathBuf, - completion: CompletionReport, - ) -> broadcast::Sender { - let (tx, _) = broadcast::channel::(64); - let key = composite_key(story_id, agent_name); - let mut agents = self.agents.lock().unwrap(); - agents.insert( - key, - StoryAgent { - agent_name: agent_name.to_string(), - status, - worktree_info: None, - session_id: None, - tx: tx.clone(), - task_handle: None, - event_log: Arc::new(Mutex::new(Vec::new())), - completion: Some(completion), - project_root: Some(project_root), - log_session_id: None, - merge_failure_reported: false, - }, - ); - tx - } - - /// Inject a Running agent with a pre-built (possibly finished) task handle. - /// Used by watchdog tests to simulate an orphaned agent. - #[cfg(test)] - pub fn inject_test_agent_with_handle( - &self, - story_id: &str, - agent_name: &str, - status: AgentStatus, - task_handle: tokio::task::JoinHandle<()>, - ) -> broadcast::Sender { - let (tx, _) = broadcast::channel::(64); - let key = composite_key(story_id, agent_name); - let mut agents = self.agents.lock().unwrap(); - agents.insert( - key, - StoryAgent { - agent_name: agent_name.to_string(), - status, - worktree_info: None, - session_id: None, - tx: tx.clone(), - task_handle: Some(task_handle), - event_log: Arc::new(Mutex::new(Vec::new())), - completion: None, - project_root: None, - log_session_id: None, - merge_failure_reported: false, - }, - ); - tx - } - - /// Test helper: inject a child killer into the registry. - #[cfg(test)] - pub fn inject_child_killer(&self, key: &str, killer: Box) { - let mut killers = self.child_killers.lock().unwrap(); - killers.insert(key.to_string(), killer); - } - - /// Test helper: return the number of registered child killers. 
- #[cfg(test)] - pub fn child_killer_count(&self) -> usize { - self.child_killers.lock().unwrap().len() - } -} - -/// Return the active pipeline stage directory name for `story_id`, or `None` if the -/// story is not in any active stage (`2_current/`, `3_qa/`, `4_merge/`). -fn find_active_story_stage(project_root: &Path, story_id: &str) -> Option<&'static str> { - const STAGES: [&str; 3] = ["2_current", "3_qa", "4_merge"]; - for stage in &STAGES { - let path = project_root - .join(".storkit") - .join("work") - .join(stage) - .join(format!("{story_id}.md")); - if path.exists() { - return Some(stage); - } - } - None -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::agents::{AgentEvent, AgentStatus, PipelineStage}; - use crate::config::ProjectConfig; - use portable_pty::{CommandBuilder, PtySize, native_pty_system}; - use std::process::Command; - - fn make_config(toml_str: &str) -> ProjectConfig { - ProjectConfig::parse(toml_str).unwrap() - } - - #[tokio::test] - async fn wait_for_agent_returns_immediately_if_completed() { - let pool = AgentPool::new_test(3001); - pool.inject_test_agent("s1", "bot", AgentStatus::Completed); - - let info = pool.wait_for_agent("s1", "bot", 1000).await.unwrap(); - assert_eq!(info.status, AgentStatus::Completed); - assert_eq!(info.story_id, "s1"); - assert_eq!(info.agent_name, "bot"); - } - - #[tokio::test] - async fn wait_for_agent_returns_immediately_if_failed() { - let pool = AgentPool::new_test(3001); - pool.inject_test_agent("s2", "bot", AgentStatus::Failed); - - let info = pool.wait_for_agent("s2", "bot", 1000).await.unwrap(); - assert_eq!(info.status, AgentStatus::Failed); - } - - #[tokio::test] - async fn wait_for_agent_completes_on_done_event() { - let pool = AgentPool::new_test(3001); - let tx = pool.inject_test_agent("s3", "bot", AgentStatus::Running); - - // Send Done event after a short delay - let tx_clone = tx.clone(); - tokio::spawn(async move { - tokio::time::sleep(std::time::Duration::from_millis(50)).await; - 
let _ = tx_clone.send(AgentEvent::Done { - story_id: "s3".to_string(), - agent_name: "bot".to_string(), - session_id: Some("sess-abc".to_string()), - }); - }); - - let info = pool.wait_for_agent("s3", "bot", 2000).await.unwrap(); - assert_eq!(info.story_id, "s3"); - } - - #[tokio::test] - async fn wait_for_agent_times_out() { - let pool = AgentPool::new_test(3001); - pool.inject_test_agent("s4", "bot", AgentStatus::Running); - - let result = pool.wait_for_agent("s4", "bot", 50).await; - assert!(result.is_err()); - let msg = result.unwrap_err(); - assert!(msg.contains("Timed out"), "unexpected message: {msg}"); - } - - #[tokio::test] - async fn wait_for_agent_errors_for_nonexistent() { - let pool = AgentPool::new_test(3001); - let result = pool.wait_for_agent("no_story", "no_bot", 100).await; - assert!(result.is_err()); - } - - #[tokio::test] - async fn wait_for_agent_completes_on_stopped_status_event() { - let pool = AgentPool::new_test(3001); - let tx = pool.inject_test_agent("s5", "bot", AgentStatus::Running); - - let tx_clone = tx.clone(); - tokio::spawn(async move { - tokio::time::sleep(std::time::Duration::from_millis(30)).await; - let _ = tx_clone.send(AgentEvent::Status { - story_id: "s5".to_string(), - agent_name: "bot".to_string(), - status: "stopped".to_string(), - }); - }); - - let info = pool.wait_for_agent("s5", "bot", 2000).await.unwrap(); - assert_eq!(info.story_id, "s5"); - } - - // ── kill_all_children tests ──────────────────────────────────── - - /// Returns true if a process with the given PID is currently running. 
- fn process_is_running(pid: u32) -> bool { - Command::new("ps") - .args(["-p", &pid.to_string()]) - .output() - .map(|o| o.status.success()) - .unwrap_or(false) - } - - #[test] - fn kill_all_children_is_safe_on_empty_pool() { - let pool = AgentPool::new_test(3001); - pool.kill_all_children(); - assert_eq!(pool.child_killer_count(), 0); - } - - #[test] - fn kill_all_children_kills_real_process() { - let pool = AgentPool::new_test(3001); - - let pty_system = native_pty_system(); - let pair = pty_system - .openpty(PtySize { - rows: 24, - cols: 80, - pixel_width: 0, - pixel_height: 0, - }) - .expect("failed to open pty"); - - let mut cmd = CommandBuilder::new("sleep"); - cmd.arg("100"); - let mut child = pair - .slave - .spawn_command(cmd) - .expect("failed to spawn sleep"); - let pid = child.process_id().expect("no pid"); - - pool.inject_child_killer("story:agent", child.clone_killer()); - - assert!( - process_is_running(pid), - "process {pid} should be running before kill_all_children" - ); - - pool.kill_all_children(); - let _ = child.wait(); - - assert!( - !process_is_running(pid), - "process {pid} should have been killed by kill_all_children" - ); - } - - #[test] - fn kill_all_children_clears_registry() { - let pool = AgentPool::new_test(3001); - - let pty_system = native_pty_system(); - let pair = pty_system - .openpty(PtySize { - rows: 24, - cols: 80, - pixel_width: 0, - pixel_height: 0, - }) - .expect("failed to open pty"); - - let mut cmd = CommandBuilder::new("sleep"); - cmd.arg("1"); - let mut child = pair - .slave - .spawn_command(cmd) - .expect("failed to spawn sleep"); - - pool.inject_child_killer("story:agent", child.clone_killer()); - assert_eq!(pool.child_killer_count(), 1); - - pool.kill_all_children(); - let _ = child.wait(); - - assert_eq!( - pool.child_killer_count(), - 0, - "child_killers should be cleared after kill_all_children" - ); - } - - // ── available_agents_for_stage tests (story 190) ────────────────────────── - - #[test] - fn 
available_agents_for_stage_returns_idle_agents() { - let config = make_config( - r#" -[[agent]] -name = "coder-1" -stage = "coder" - -[[agent]] -name = "coder-2" -stage = "coder" - -[[agent]] -name = "qa" -stage = "qa" -"#, - ); - let pool = AgentPool::new_test(3001); - pool.inject_test_agent("story-1", "coder-1", AgentStatus::Running); - - let available = pool - .available_agents_for_stage(&config, &PipelineStage::Coder) - .unwrap(); - assert_eq!(available, vec!["coder-2"]); - - let available_qa = pool - .available_agents_for_stage(&config, &PipelineStage::Qa) - .unwrap(); - assert_eq!(available_qa, vec!["qa"]); - } - - #[test] - fn available_agents_for_stage_returns_empty_when_all_busy() { - let config = make_config( - r#" -[[agent]] -name = "coder-1" -stage = "coder" -"#, - ); - let pool = AgentPool::new_test(3001); - pool.inject_test_agent("story-1", "coder-1", AgentStatus::Running); - - let available = pool - .available_agents_for_stage(&config, &PipelineStage::Coder) - .unwrap(); - assert!(available.is_empty()); - } - - #[test] - fn available_agents_for_stage_ignores_completed_agents() { - let config = make_config( - r#" -[[agent]] -name = "coder-1" -stage = "coder" -"#, - ); - let pool = AgentPool::new_test(3001); - pool.inject_test_agent("story-1", "coder-1", AgentStatus::Completed); - - let available = pool - .available_agents_for_stage(&config, &PipelineStage::Coder) - .unwrap(); - assert_eq!(available, vec!["coder-1"]); - } - - #[tokio::test] - async fn start_agent_auto_selects_second_coder_when_first_busy() { - let tmp = tempfile::tempdir().unwrap(); - let sk = tmp.path().join(".storkit"); - std::fs::create_dir_all(&sk).unwrap(); - std::fs::write( - sk.join("project.toml"), - r#" -[[agent]] -name = "supervisor" -stage = "other" - -[[agent]] -name = "coder-1" -stage = "coder" - -[[agent]] -name = "coder-2" -stage = "coder" -"#, - ) - .unwrap(); - - let pool = AgentPool::new_test(3001); - pool.inject_test_agent("other-story", "coder-1", 
AgentStatus::Running); - - let result = pool - .start_agent(tmp.path(), "42_my_story", None, None) - .await; - match result { - Ok(info) => { - assert_eq!(info.agent_name, "coder-2"); - } - Err(err) => { - assert!( - !err.contains("All coder agents are busy"), - "should have selected coder-2 but got: {err}" - ); - assert!( - !err.contains("No coder agent configured"), - "should not fail on agent selection, got: {err}" - ); - } - } - } - - #[tokio::test] - async fn start_agent_returns_busy_when_all_coders_occupied() { - let tmp = tempfile::tempdir().unwrap(); - let sk = tmp.path().join(".storkit"); - std::fs::create_dir_all(&sk).unwrap(); - std::fs::write( - sk.join("project.toml"), - r#" -[[agent]] -name = "coder-1" -stage = "coder" - -[[agent]] -name = "coder-2" -stage = "coder" -"#, - ) - .unwrap(); - - let pool = AgentPool::new_test(3001); - pool.inject_test_agent("story-1", "coder-1", AgentStatus::Running); - pool.inject_test_agent("story-2", "coder-2", AgentStatus::Pending); - - let result = pool.start_agent(tmp.path(), "story-3", None, None).await; - assert!(result.is_err()); - let err = result.unwrap_err(); - assert!( - err.contains("All coder agents are busy"), - "expected busy error, got: {err}" - ); - } - - #[tokio::test] - async fn start_agent_moves_story_to_current_when_coders_busy() { - let tmp = tempfile::tempdir().unwrap(); - let sk = tmp.path().join(".storkit"); - let backlog = sk.join("work/1_backlog"); - std::fs::create_dir_all(&backlog).unwrap(); - std::fs::write( - sk.join("project.toml"), - r#" -[[agent]] -name = "coder-1" -stage = "coder" -"#, - ) - .unwrap(); - std::fs::write(backlog.join("story-3.md"), "---\nname: Story 3\n---\n").unwrap(); - - let pool = AgentPool::new_test(3001); - pool.inject_test_agent("story-1", "coder-1", AgentStatus::Running); - - let result = pool.start_agent(tmp.path(), "story-3", None, None).await; - - assert!(result.is_err()); - let err = result.unwrap_err(); - assert!( - err.contains("All coder agents are busy"), 
- "expected busy error, got: {err}" - ); - assert!( - err.contains("queued in work/2_current/"), - "expected story-to-current message, got: {err}" - ); - - let current_path = sk.join("work/2_current/story-3.md"); - assert!( - current_path.exists(), - "story should be in 2_current/ after busy error, but was not" - ); - let backlog_path = backlog.join("story-3.md"); - assert!( - !backlog_path.exists(), - "story should no longer be in 1_backlog/" - ); - } - - #[tokio::test] - async fn start_agent_story_already_in_current_is_noop() { - let tmp = tempfile::tempdir().unwrap(); - let sk = tmp.path().join(".storkit"); - let current = sk.join("work/2_current"); - std::fs::create_dir_all(¤t).unwrap(); - std::fs::write( - sk.join("project.toml"), - "[[agent]]\nname = \"coder-1\"\nstage = \"coder\"\n", - ) - .unwrap(); - std::fs::write(current.join("story-5.md"), "---\nname: Story 5\n---\n").unwrap(); - - let pool = AgentPool::new_test(3001); - - let result = pool.start_agent(tmp.path(), "story-5", None, None).await; - match result { - Ok(_) => {} - Err(e) => { - assert!( - !e.contains("Failed to move"), - "should not fail on idempotent move, got: {e}" - ); - } - } - } - - #[tokio::test] - async fn start_agent_explicit_name_unchanged_when_busy() { - let tmp = tempfile::tempdir().unwrap(); - let sk = tmp.path().join(".storkit"); - std::fs::create_dir_all(&sk).unwrap(); - std::fs::write( - sk.join("project.toml"), - r#" -[[agent]] -name = "coder-1" -stage = "coder" - -[[agent]] -name = "coder-2" -stage = "coder" -"#, - ) - .unwrap(); - - let pool = AgentPool::new_test(3001); - pool.inject_test_agent("story-1", "coder-1", AgentStatus::Running); - - let result = pool - .start_agent(tmp.path(), "story-2", Some("coder-1"), None) - .await; - assert!(result.is_err()); - let err = result.unwrap_err(); - assert!( - err.contains("coder-1") && err.contains("already running"), - "expected explicit busy error, got: {err}" - ); - } - - // ── start_agent single-instance concurrency tests 
───────────────────────── - - #[tokio::test] - async fn start_agent_rejects_when_same_agent_already_running_on_another_story() { - use std::fs; - - let tmp = tempfile::tempdir().unwrap(); - let root = tmp.path(); - - let sk_dir = root.join(".storkit"); - fs::create_dir_all(&sk_dir).unwrap(); - fs::write(sk_dir.join("project.toml"), "[[agent]]\nname = \"qa\"\n").unwrap(); - - let pool = AgentPool::new_test(3001); - pool.inject_test_agent("story-a", "qa", AgentStatus::Running); - - let result = pool.start_agent(root, "story-b", Some("qa"), None).await; - - assert!( - result.is_err(), - "start_agent should fail when qa is already running on another story" - ); - let err = result.unwrap_err(); - assert!( - err.contains("already running") || err.contains("becomes available"), - "error message should explain why: got '{err}'" - ); - } - - #[tokio::test] - async fn start_agent_allows_new_story_when_previous_run_is_completed() { - use std::fs; - - let tmp = tempfile::tempdir().unwrap(); - let root = tmp.path(); - - let sk_dir = root.join(".storkit"); - fs::create_dir_all(&sk_dir).unwrap(); - fs::write(sk_dir.join("project.toml"), "[[agent]]\nname = \"qa\"\n").unwrap(); - - let pool = AgentPool::new_test(3001); - pool.inject_test_agent("story-a", "qa", AgentStatus::Completed); - - let result = pool.start_agent(root, "story-b", Some("qa"), None).await; - - if let Err(ref e) = result { - assert!( - !e.contains("already running") && !e.contains("becomes available"), - "completed agent must not trigger the concurrency guard: got '{e}'" - ); - } - } - - // ── bug 118: pending entry cleanup on start_agent failure ──────────────── - - #[tokio::test] - async fn start_agent_cleans_up_pending_entry_on_failure() { - use std::fs; - - let tmp = tempfile::tempdir().unwrap(); - let root = tmp.path(); - - let sk_dir = root.join(".storkit"); - fs::create_dir_all(&sk_dir).unwrap(); - fs::write( - sk_dir.join("project.toml"), - "[[agent]]\nname = \"coder-1\"\nstage = \"coder\"\n", - ) - 
.unwrap(); - - let upcoming = root.join(".storkit/work/1_backlog"); - fs::create_dir_all(&upcoming).unwrap(); - fs::write(upcoming.join("50_story_test.md"), "---\nname: Test\n---\n").unwrap(); - - let pool = AgentPool::new_test(3099); - - let result = pool - .start_agent(root, "50_story_test", Some("coder-1"), None) - .await; - - assert!( - result.is_ok(), - "start_agent should return Ok(Pending) immediately: {:?}", - result.err() - ); - assert_eq!( - result.unwrap().status, - AgentStatus::Pending, - "initial status must be Pending" - ); - - let final_info = pool - .wait_for_agent("50_story_test", "coder-1", 5000) - .await - .expect("wait_for_agent should not time out"); - assert_eq!( - final_info.status, - AgentStatus::Failed, - "agent must transition to Failed after worktree creation error" - ); - - let agents = pool.agents.lock().unwrap(); - let failed_entry = agents - .values() - .find(|a| a.agent_name == "coder-1" && a.status == AgentStatus::Failed); - assert!( - failed_entry.is_some(), - "agent pool must retain a Failed entry so the UI can show the error state" - ); - drop(agents); - - let events = pool - .drain_events("50_story_test", "coder-1") - .expect("drain_events should succeed"); - let has_error_event = events.iter().any(|e| matches!(e, AgentEvent::Error { .. 
})); - assert!( - has_error_event, - "event_log must contain AgentEvent::Error after worktree creation fails" - ); - } - - #[tokio::test] - async fn start_agent_guard_does_not_remove_running_entry() { - use std::fs; - - let tmp = tempfile::tempdir().unwrap(); - let root = tmp.path(); - - let sk_dir = root.join(".storkit"); - fs::create_dir_all(&sk_dir).unwrap(); - fs::write(sk_dir.join("project.toml"), "[[agent]]\nname = \"qa\"\n").unwrap(); - - let pool = AgentPool::new_test(3099); - pool.inject_test_agent("story-x", "qa", AgentStatus::Running); - - let result = pool.start_agent(root, "story-y", Some("qa"), None).await; - - assert!(result.is_err()); - let err = result.unwrap_err(); - assert!( - err.contains("already running") || err.contains("becomes available"), - "running entry must survive: got '{err}'" - ); - } - - // ── TOCTOU race-condition regression tests (story 132) ─────────────────── - - #[tokio::test] - async fn toctou_pending_entry_blocks_same_agent_on_different_story() { - use std::fs; - - let tmp = tempfile::tempdir().unwrap(); - let root = tmp.path(); - - let sk_dir = root.join(".storkit"); - fs::create_dir_all(&sk_dir).unwrap(); - fs::write( - sk_dir.join("project.toml"), - "[[agent]]\nname = \"coder-1\"\n", - ) - .unwrap(); - - let pool = AgentPool::new_test(3099); - pool.inject_test_agent("86_story_foo", "coder-1", AgentStatus::Pending); - - let result = pool - .start_agent(root, "130_story_bar", Some("coder-1"), None) - .await; - - assert!(result.is_err(), "second start_agent must be rejected"); - let err = result.unwrap_err(); - assert!( - err.contains("already running") || err.contains("becomes available"), - "expected concurrency-rejection message, got: '{err}'" - ); - } - - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - async fn toctou_concurrent_start_agent_same_agent_exactly_one_concurrency_rejection() { - use std::fs; - use std::sync::Arc; - - let tmp = tempfile::tempdir().unwrap(); - let root = tmp.path().to_path_buf(); - 
- let sk_dir = root.join(".storkit"); - fs::create_dir_all(sk_dir.join("work/1_backlog")).unwrap(); - fs::write( - root.join(".storkit/project.toml"), - "[[agent]]\nname = \"coder-1\"\n", - ) - .unwrap(); - fs::write( - root.join(".storkit/work/1_backlog/86_story_foo.md"), - "---\nname: Foo\n---\n", - ) - .unwrap(); - fs::write( - root.join(".storkit/work/1_backlog/130_story_bar.md"), - "---\nname: Bar\n---\n", - ) - .unwrap(); - - let pool = Arc::new(AgentPool::new_test(3099)); - - let pool1 = pool.clone(); - let root1 = root.clone(); - let t1 = tokio::spawn(async move { - pool1 - .start_agent(&root1, "86_story_foo", Some("coder-1"), None) - .await - }); - - let pool2 = pool.clone(); - let root2 = root.clone(); - let t2 = tokio::spawn(async move { - pool2 - .start_agent(&root2, "130_story_bar", Some("coder-1"), None) - .await - }); - - let (r1, r2) = tokio::join!(t1, t2); - let r1 = r1.unwrap(); - let r2 = r2.unwrap(); - - let concurrency_rejections = [&r1, &r2] - .iter() - .filter(|r| { - r.as_ref().is_err_and(|e| { - e.contains("already running") || e.contains("becomes available") - }) - }) - .count(); - - assert_eq!( - concurrency_rejections, 1, - "exactly one call must be rejected by the concurrency check; \ - got r1={r1:?} r2={r2:?}" - ); - } - - // ── story-230: prevent duplicate stage agents on same story ─────────────── - - #[tokio::test] - async fn start_agent_rejects_second_coder_stage_on_same_story() { - use std::fs; - - let tmp = tempfile::tempdir().unwrap(); - let root = tmp.path(); - - let sk_dir = root.join(".storkit"); - fs::create_dir_all(&sk_dir).unwrap(); - fs::write( - sk_dir.join("project.toml"), - "[[agent]]\nname = \"coder-1\"\n\n[[agent]]\nname = \"coder-2\"\n", - ) - .unwrap(); - - let pool = AgentPool::new_test(3099); - pool.inject_test_agent("42_story_foo", "coder-1", AgentStatus::Running); - - let result = pool - .start_agent(root, "42_story_foo", Some("coder-2"), None) - .await; - - assert!( - result.is_err(), - "second coder on same 
story must be rejected" - ); - let err = result.unwrap_err(); - assert!( - err.contains("same pipeline stage"), - "error must mention same pipeline stage, got: '{err}'" - ); - assert!( - err.contains("coder-1") && err.contains("coder-2"), - "error must name both agents, got: '{err}'" - ); - } - - #[tokio::test] - async fn start_agent_rejects_second_qa_stage_on_same_story() { - use std::fs; - - let tmp = tempfile::tempdir().unwrap(); - let root = tmp.path(); - - let sk_dir = root.join(".storkit"); - fs::create_dir_all(&sk_dir).unwrap(); - fs::write( - sk_dir.join("project.toml"), - "[[agent]]\nname = \"qa-1\"\nstage = \"qa\"\n\n\ - [[agent]]\nname = \"qa-2\"\nstage = \"qa\"\n", - ) - .unwrap(); - - let pool = AgentPool::new_test(3099); - pool.inject_test_agent("55_story_bar", "qa-1", AgentStatus::Running); - - let result = pool - .start_agent(root, "55_story_bar", Some("qa-2"), None) - .await; - - assert!(result.is_err(), "second qa on same story must be rejected"); - let err = result.unwrap_err(); - assert!( - err.contains("same pipeline stage"), - "error must mention same pipeline stage, got: '{err}'" - ); - } - - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - async fn start_agent_concurrent_two_coders_same_story_exactly_one_stage_rejection() { - use std::fs; - use std::sync::Arc; - - let tmp = tempfile::tempdir().unwrap(); - let root = tmp.path().to_path_buf(); - - let sk_dir = root.join(".storkit"); - fs::create_dir_all(sk_dir.join("work/2_current")).unwrap(); - fs::write( - root.join(".storkit/project.toml"), - "[[agent]]\nname = \"coder-1\"\n\n[[agent]]\nname = \"coder-2\"\n", - ) - .unwrap(); - fs::write( - root.join(".storkit/work/2_current/42_story_foo.md"), - "---\nname: Foo\n---\n", - ) - .unwrap(); - - let pool = Arc::new(AgentPool::new_test(3099)); - - let pool1 = pool.clone(); - let root1 = root.clone(); - let t1 = tokio::spawn(async move { - pool1 - .start_agent(&root1, "42_story_foo", Some("coder-1"), None) - .await - }); - - let pool2 
= pool.clone(); - let root2 = root.clone(); - let t2 = tokio::spawn(async move { - pool2 - .start_agent(&root2, "42_story_foo", Some("coder-2"), None) - .await - }); - - let (r1, r2) = tokio::join!(t1, t2); - let r1 = r1.unwrap(); - let r2 = r2.unwrap(); - - let stage_rejections = [&r1, &r2] - .iter() - .filter(|r| r.as_ref().is_err_and(|e| e.contains("same pipeline stage"))) - .count(); - - assert_eq!( - stage_rejections, 1, - "exactly one call must be rejected by the stage-conflict check; \ - got r1={r1:?} r2={r2:?}" - ); - } - - #[tokio::test] - async fn start_agent_two_coders_different_stories_not_blocked_by_stage_check() { - use std::fs; - - let tmp = tempfile::tempdir().unwrap(); - let root = tmp.path(); - - let sk_dir = root.join(".storkit"); - fs::create_dir_all(sk_dir.join("work/1_backlog")).unwrap(); - fs::write( - root.join(".storkit/project.toml"), - "[[agent]]\nname = \"coder-1\"\n\n[[agent]]\nname = \"coder-2\"\n", - ) - .unwrap(); - fs::write( - root.join(".storkit/work/1_backlog/99_story_baz.md"), - "---\nname: Baz\n---\n", - ) - .unwrap(); - - let pool = AgentPool::new_test(3099); - pool.inject_test_agent("42_story_foo", "coder-1", AgentStatus::Running); - - let result = pool - .start_agent(root, "99_story_baz", Some("coder-2"), None) - .await; - - if let Err(ref e) = result { - assert!( - !e.contains("same pipeline stage"), - "stage-conflict guard must not fire for agents on different stories; \ - got: '{e}'" - ); - } - } - - // ── bug 312: stage-pipeline mismatch guard in start_agent ────────────── - - #[tokio::test] - async fn start_agent_rejects_mergemaster_on_coding_stage_story() { - use std::fs; - - let tmp = tempfile::tempdir().unwrap(); - let root = tmp.path(); - - let sk_dir = root.join(".storkit"); - fs::create_dir_all(sk_dir.join("work/2_current")).unwrap(); - fs::write( - sk_dir.join("project.toml"), - "[[agent]]\nname = \"coder-1\"\nstage = \"coder\"\n\n\ - [[agent]]\nname = \"mergemaster\"\nstage = \"mergemaster\"\n", - ) - .unwrap(); 
- fs::write( - sk_dir.join("work/2_current/310_story_foo.md"), - "---\nname: Foo\n---\n", - ) - .unwrap(); - - let pool = AgentPool::new_test(3099); - let result = pool - .start_agent(root, "310_story_foo", Some("mergemaster"), None) - .await; - - assert!( - result.is_err(), - "mergemaster must not be assigned to a story in 2_current/" - ); - let err = result.unwrap_err(); - assert!( - err.contains("stage") && err.contains("2_current"), - "error must mention stage mismatch, got: '{err}'" - ); - } - - #[tokio::test] - async fn start_agent_rejects_coder_on_qa_stage_story() { - use std::fs; - - let tmp = tempfile::tempdir().unwrap(); - let root = tmp.path(); - - let sk_dir = root.join(".storkit"); - fs::create_dir_all(sk_dir.join("work/3_qa")).unwrap(); - fs::write( - sk_dir.join("project.toml"), - "[[agent]]\nname = \"coder-1\"\nstage = \"coder\"\n\n\ - [[agent]]\nname = \"qa\"\nstage = \"qa\"\n", - ) - .unwrap(); - fs::write( - sk_dir.join("work/3_qa/42_story_bar.md"), - "---\nname: Bar\n---\n", - ) - .unwrap(); - - let pool = AgentPool::new_test(3099); - let result = pool - .start_agent(root, "42_story_bar", Some("coder-1"), None) - .await; - - assert!( - result.is_err(), - "coder must not be assigned to a story in 3_qa/" - ); - let err = result.unwrap_err(); - assert!( - err.contains("stage") && err.contains("3_qa"), - "error must mention stage mismatch, got: '{err}'" - ); - } - - #[tokio::test] - async fn start_agent_rejects_qa_on_merge_stage_story() { - use std::fs; - - let tmp = tempfile::tempdir().unwrap(); - let root = tmp.path(); - - let sk_dir = root.join(".storkit"); - fs::create_dir_all(sk_dir.join("work/4_merge")).unwrap(); - fs::write( - sk_dir.join("project.toml"), - "[[agent]]\nname = \"qa\"\nstage = \"qa\"\n\n\ - [[agent]]\nname = \"mergemaster\"\nstage = \"mergemaster\"\n", - ) - .unwrap(); - fs::write( - sk_dir.join("work/4_merge/55_story_baz.md"), - "---\nname: Baz\n---\n", - ) - .unwrap(); - - let pool = AgentPool::new_test(3099); - let result = 
pool - .start_agent(root, "55_story_baz", Some("qa"), None) - .await; - - assert!( - result.is_err(), - "qa must not be assigned to a story in 4_merge/" - ); - let err = result.unwrap_err(); - assert!( - err.contains("stage") && err.contains("4_merge"), - "error must mention stage mismatch, got: '{err}'" - ); - } - - #[tokio::test] - async fn start_agent_allows_supervisor_on_any_stage() { - use std::fs; - - let tmp = tempfile::tempdir().unwrap(); - let root = tmp.path(); - - let sk_dir = root.join(".storkit"); - fs::create_dir_all(sk_dir.join("work/2_current")).unwrap(); - fs::write( - sk_dir.join("project.toml"), - "[[agent]]\nname = \"supervisor\"\nstage = \"other\"\n", - ) - .unwrap(); - fs::write( - sk_dir.join("work/2_current/77_story_sup.md"), - "---\nname: Sup\n---\n", - ) - .unwrap(); - - let pool = AgentPool::new_test(3099); - let result = pool - .start_agent(root, "77_story_sup", Some("supervisor"), None) - .await; - - match result { - Ok(_) => {} - Err(e) => { - assert!( - !e.contains("stage:") || !e.contains("cannot be assigned"), - "supervisor should not be rejected for stage mismatch, got: '{e}'" - ); - } - } - } - - #[tokio::test] - async fn start_agent_allows_correct_stage_agent() { - use std::fs; - - let tmp = tempfile::tempdir().unwrap(); - let root = tmp.path(); - - let sk_dir = root.join(".storkit"); - fs::create_dir_all(sk_dir.join("work/4_merge")).unwrap(); - fs::write( - sk_dir.join("project.toml"), - "[[agent]]\nname = \"mergemaster\"\nstage = \"mergemaster\"\n", - ) - .unwrap(); - fs::write( - sk_dir.join("work/4_merge/88_story_ok.md"), - "---\nname: OK\n---\n", - ) - .unwrap(); - - let pool = AgentPool::new_test(3099); - let result = pool - .start_agent(root, "88_story_ok", Some("mergemaster"), None) - .await; - - match result { - Ok(_) => {} - Err(e) => { - assert!( - !e.contains("cannot be assigned"), - "mergemaster on 4_merge/ story should not fail stage check, got: '{e}'" - ); - } - } - } - - // ── find_active_story_stage tests 
───────────────────────────────────────── - - #[test] - fn find_active_story_stage_detects_current() { - use std::fs; - let tmp = tempfile::tempdir().unwrap(); - let root = tmp.path(); - let current = root.join(".storkit/work/2_current"); - fs::create_dir_all(¤t).unwrap(); - fs::write(current.join("10_story_test.md"), "test").unwrap(); - - assert_eq!( - find_active_story_stage(root, "10_story_test"), - Some("2_current") - ); - } - - #[test] - fn find_active_story_stage_detects_qa() { - use std::fs; - let tmp = tempfile::tempdir().unwrap(); - let root = tmp.path(); - let qa = root.join(".storkit/work/3_qa"); - fs::create_dir_all(&qa).unwrap(); - fs::write(qa.join("11_story_test.md"), "test").unwrap(); - - assert_eq!(find_active_story_stage(root, "11_story_test"), Some("3_qa")); - } - - #[test] - fn find_active_story_stage_detects_merge() { - use std::fs; - let tmp = tempfile::tempdir().unwrap(); - let root = tmp.path(); - let merge = root.join(".storkit/work/4_merge"); - fs::create_dir_all(&merge).unwrap(); - fs::write(merge.join("12_story_test.md"), "test").unwrap(); - - assert_eq!( - find_active_story_stage(root, "12_story_test"), - Some("4_merge") - ); - } - - #[test] - fn find_active_story_stage_returns_none_for_unknown_story() { - let tmp = tempfile::tempdir().unwrap(); - assert_eq!(find_active_story_stage(tmp.path(), "99_nonexistent"), None); - } - - // ── remove_agents_for_story tests ──────────────────────────────────────── - - #[test] - fn remove_agents_for_story_removes_all_entries() { - let pool = AgentPool::new_test(3001); - pool.inject_test_agent("story_a", "coder-1", AgentStatus::Completed); - pool.inject_test_agent("story_a", "qa", AgentStatus::Failed); - pool.inject_test_agent("story_b", "coder-1", AgentStatus::Running); - - let removed = pool.remove_agents_for_story("story_a"); - assert_eq!(removed, 2, "should remove both agents for story_a"); - - let agents = pool.list_agents().unwrap(); - assert_eq!(agents.len(), 1, "only story_b agent should 
remain"); - assert_eq!(agents[0].story_id, "story_b"); - } - - #[test] - fn remove_agents_for_story_returns_zero_when_no_match() { - let pool = AgentPool::new_test(3001); - pool.inject_test_agent("story_a", "coder-1", AgentStatus::Running); - - let removed = pool.remove_agents_for_story("nonexistent"); - assert_eq!(removed, 0); - - let agents = pool.list_agents().unwrap(); - assert_eq!(agents.len(), 1, "existing agents should not be affected"); - } - - // ── front matter agent preference (bug 379) ────────────────────────────── - - #[tokio::test] - async fn start_agent_honours_front_matter_agent_when_idle() { - let tmp = tempfile::tempdir().unwrap(); - let sk = tmp.path().join(".storkit"); - let backlog = sk.join("work/1_backlog"); - std::fs::create_dir_all(&backlog).unwrap(); - std::fs::write( - sk.join("project.toml"), - r#" -[[agent]] -name = "coder-sonnet" -stage = "coder" - -[[agent]] -name = "coder-opus" -stage = "coder" -"#, - ) - .unwrap(); - // Story file with agent preference in front matter. - std::fs::write( - backlog.join("368_story_test.md"), - "---\nname: Test Story\nagent: coder-opus\n---\n# Story 368\n", - ) - .unwrap(); - - let pool = AgentPool::new_test(3010); - // coder-sonnet is busy so without front matter the auto-selection - // would skip coder-opus and try something else. - pool.inject_test_agent("other-story", "coder-sonnet", AgentStatus::Running); - - let result = pool - .start_agent(tmp.path(), "368_story_test", None, None) - .await; - match result { - Ok(info) => { - assert_eq!( - info.agent_name, "coder-opus", - "should pick the front-matter preferred agent" - ); - } - Err(err) => { - // Allowed to fail for infrastructure reasons (no git repo), - // but NOT due to agent selection ignoring the preference. 
- assert!( - !err.contains("All coder agents are busy"), - "should not report busy when coder-opus is idle: {err}" - ); - assert!( - !err.contains("coder-sonnet"), - "should not have picked coder-sonnet: {err}" - ); - } - } - } - - #[tokio::test] - async fn start_agent_returns_error_when_front_matter_agent_busy() { - let tmp = tempfile::tempdir().unwrap(); - let sk = tmp.path().join(".storkit"); - let backlog = sk.join("work/1_backlog"); - std::fs::create_dir_all(&backlog).unwrap(); - std::fs::write( - sk.join("project.toml"), - r#" -[[agent]] -name = "coder-sonnet" -stage = "coder" - -[[agent]] -name = "coder-opus" -stage = "coder" -"#, - ) - .unwrap(); - std::fs::write( - backlog.join("368_story_test.md"), - "---\nname: Test Story\nagent: coder-opus\n---\n# Story 368\n", - ) - .unwrap(); - - let pool = AgentPool::new_test(3011); - // Preferred agent is busy — should NOT fall back to coder-sonnet. - pool.inject_test_agent("other-story", "coder-opus", AgentStatus::Running); - - let result = pool - .start_agent(tmp.path(), "368_story_test", None, None) - .await; - assert!(result.is_err(), "expected error when preferred agent is busy"); - let err = result.unwrap_err(); - assert!( - err.contains("coder-opus"), - "error should mention the preferred agent: {err}" - ); - assert!( - err.contains("busy") || err.contains("queued"), - "error should say agent is busy or story is queued: {err}" - ); - } - - // ── archive + cleanup integration test ─────────────────────────────────── - - #[tokio::test] - async fn archiving_story_removes_agent_entries_from_pool() { - use crate::agents::lifecycle::move_story_to_archived; - use std::fs; - - let tmp = tempfile::tempdir().unwrap(); - let root = tmp.path(); - - let current = root.join(".storkit/work/2_current"); - fs::create_dir_all(¤t).unwrap(); - fs::write(current.join("60_story_cleanup.md"), "test").unwrap(); - - let pool = AgentPool::new_test(3001); - pool.inject_test_agent("60_story_cleanup", "coder-1", AgentStatus::Completed); 
- pool.inject_test_agent("60_story_cleanup", "qa", AgentStatus::Completed); - pool.inject_test_agent("61_story_other", "coder-1", AgentStatus::Running); - - assert_eq!(pool.list_agents().unwrap().len(), 3); - - move_story_to_archived(root, "60_story_cleanup").unwrap(); - pool.remove_agents_for_story("60_story_cleanup"); - - let remaining = pool.list_agents().unwrap(); - assert_eq!( - remaining.len(), - 1, - "only the other story's agent should remain" - ); - assert_eq!(remaining[0].story_id, "61_story_other"); - - assert!( - root.join(".storkit/work/5_done/60_story_cleanup.md") - .exists() - ); - } } diff --git a/server/src/agents/pool/process.rs b/server/src/agents/pool/process.rs new file mode 100644 index 00000000..4ead32b5 --- /dev/null +++ b/server/src/agents/pool/process.rs @@ -0,0 +1,141 @@ +use crate::slog; + +use super::AgentPool; + +impl AgentPool { + /// Kill all active PTY child processes. + /// + /// Called on server shutdown to prevent orphaned Claude Code processes from + /// continuing to run after the server exits. Each registered killer is called + /// once, then the registry is cleared. + pub fn kill_all_children(&self) { + if let Ok(mut killers) = self.child_killers.lock() { + for (key, killer) in killers.iter_mut() { + slog!("[agents] Killing child process for {key} on shutdown"); + let _ = killer.kill(); + } + killers.clear(); + } + } + + /// Kill and deregister the child process for a specific agent key. + /// + /// Used by `stop_agent` to ensure the PTY child is terminated even though + /// aborting a `spawn_blocking` task handle does not interrupt the blocking thread. + pub(super) fn kill_child_for_key(&self, key: &str) { + if let Ok(mut killers) = self.child_killers.lock() + && let Some(mut killer) = killers.remove(key) + { + slog!("[agents] Killing child process for {key} on stop"); + let _ = killer.kill(); + } + } + + /// Test helper: inject a child killer into the registry. 
+ #[cfg(test)] + pub fn inject_child_killer(&self, key: &str, killer: Box) { + let mut killers = self.child_killers.lock().unwrap(); + killers.insert(key.to_string(), killer); + } + + /// Test helper: return the number of registered child killers. + #[cfg(test)] + pub fn child_killer_count(&self) -> usize { + self.child_killers.lock().unwrap().len() + } +} + +#[cfg(test)] +mod tests { + use super::super::AgentPool; + use portable_pty::{CommandBuilder, PtySize, native_pty_system}; + use std::process::Command; + + /// Returns true if a process with the given PID is currently running. + fn process_is_running(pid: u32) -> bool { + Command::new("ps") + .args(["-p", &pid.to_string()]) + .output() + .map(|o| o.status.success()) + .unwrap_or(false) + } + + #[test] + fn kill_all_children_is_safe_on_empty_pool() { + let pool = AgentPool::new_test(3001); + pool.kill_all_children(); + assert_eq!(pool.child_killer_count(), 0); + } + + #[test] + fn kill_all_children_kills_real_process() { + let pool = AgentPool::new_test(3001); + + let pty_system = native_pty_system(); + let pair = pty_system + .openpty(PtySize { + rows: 24, + cols: 80, + pixel_width: 0, + pixel_height: 0, + }) + .expect("failed to open pty"); + + let mut cmd = CommandBuilder::new("sleep"); + cmd.arg("100"); + let mut child = pair + .slave + .spawn_command(cmd) + .expect("failed to spawn sleep"); + let pid = child.process_id().expect("no pid"); + + pool.inject_child_killer("story:agent", child.clone_killer()); + + assert!( + process_is_running(pid), + "process {pid} should be running before kill_all_children" + ); + + pool.kill_all_children(); + let _ = child.wait(); + + assert!( + !process_is_running(pid), + "process {pid} should have been killed by kill_all_children" + ); + } + + #[test] + fn kill_all_children_clears_registry() { + let pool = AgentPool::new_test(3001); + + let pty_system = native_pty_system(); + let pair = pty_system + .openpty(PtySize { + rows: 24, + cols: 80, + pixel_width: 0, + 
pixel_height: 0, + }) + .expect("failed to open pty"); + + let mut cmd = CommandBuilder::new("sleep"); + cmd.arg("1"); + let mut child = pair + .slave + .spawn_command(cmd) + .expect("failed to spawn sleep"); + + pool.inject_child_killer("story:agent", child.clone_killer()); + assert_eq!(pool.child_killer_count(), 1); + + pool.kill_all_children(); + let _ = child.wait(); + + assert_eq!( + pool.child_killer_count(), + 0, + "child_killers should be cleared after kill_all_children" + ); + } +} diff --git a/server/src/agents/pool/query.rs b/server/src/agents/pool/query.rs new file mode 100644 index 00000000..0b68e0d6 --- /dev/null +++ b/server/src/agents/pool/query.rs @@ -0,0 +1,166 @@ +use crate::config::ProjectConfig; +use std::path::PathBuf; +use tokio::sync::broadcast; + +use super::super::{AgentEvent, AgentInfo, AgentStatus, PipelineStage, agent_config_stage}; +use super::types::{agent_info_from_entry, composite_key}; +use super::AgentPool; + +impl AgentPool { + /// Return the names of configured agents for `stage` that are not currently + /// running or pending. + pub fn available_agents_for_stage( + &self, + config: &ProjectConfig, + stage: &PipelineStage, + ) -> Result, String> { + let agents = self.agents.lock().map_err(|e| e.to_string())?; + Ok(config + .agent + .iter() + .filter(|cfg| agent_config_stage(cfg) == *stage) + .filter(|cfg| { + !agents.values().any(|a| { + a.agent_name == cfg.name + && matches!(a.status, AgentStatus::Running | AgentStatus::Pending) + }) + }) + .map(|cfg| cfg.name.clone()) + .collect()) + } + + /// List all agents with their status. 
+ pub fn list_agents(&self) -> Result, String> { + let agents = self.agents.lock().map_err(|e| e.to_string())?; + Ok(agents + .iter() + .map(|(key, agent)| { + // Extract story_id from composite key "story_id:agent_name" + let story_id = key + .rsplit_once(':') + .map(|(sid, _)| sid.to_string()) + .unwrap_or_else(|| key.clone()); + agent_info_from_entry(&story_id, agent) + }) + .collect()) + } + + /// Subscribe to events for a story agent. + pub fn subscribe( + &self, + story_id: &str, + agent_name: &str, + ) -> Result, String> { + let key = composite_key(story_id, agent_name); + let agents = self.agents.lock().map_err(|e| e.to_string())?; + let agent = agents + .get(&key) + .ok_or_else(|| format!("No agent '{agent_name}' for story '{story_id}'"))?; + Ok(agent.tx.subscribe()) + } + + /// Drain accumulated events for polling. Returns all events since the last drain. + pub fn drain_events( + &self, + story_id: &str, + agent_name: &str, + ) -> Result, String> { + let key = composite_key(story_id, agent_name); + let agents = self.agents.lock().map_err(|e| e.to_string())?; + let agent = agents + .get(&key) + .ok_or_else(|| format!("No agent '{agent_name}' for story '{story_id}'"))?; + let mut log = agent.event_log.lock().map_err(|e| e.to_string())?; + Ok(log.drain(..).collect()) + } + + /// Get the log session ID and project root for an agent, if available. + /// + /// Used by MCP tools to find the persistent log file for a completed agent. 
+ pub fn get_log_info(&self, story_id: &str, agent_name: &str) -> Option<(String, PathBuf)> { + let key = composite_key(story_id, agent_name); + let agents = self.agents.lock().ok()?; + let agent = agents.get(&key)?; + let session_id = agent.log_session_id.clone()?; + let project_root = agent.project_root.clone()?; + Some((session_id, project_root)) + } +} + +#[cfg(test)] +mod tests { + use super::super::AgentPool; + use crate::agents::{AgentStatus, PipelineStage}; + use crate::config::ProjectConfig; + + fn make_config(toml_str: &str) -> ProjectConfig { + ProjectConfig::parse(toml_str).unwrap() + } + + #[test] + fn available_agents_for_stage_returns_idle_agents() { + let config = make_config( + r#" +[[agent]] +name = "coder-1" +stage = "coder" + +[[agent]] +name = "coder-2" +stage = "coder" + +[[agent]] +name = "qa" +stage = "qa" +"#, + ); + let pool = AgentPool::new_test(3001); + pool.inject_test_agent("story-1", "coder-1", AgentStatus::Running); + + let available = pool + .available_agents_for_stage(&config, &PipelineStage::Coder) + .unwrap(); + assert_eq!(available, vec!["coder-2"]); + + let available_qa = pool + .available_agents_for_stage(&config, &PipelineStage::Qa) + .unwrap(); + assert_eq!(available_qa, vec!["qa"]); + } + + #[test] + fn available_agents_for_stage_returns_empty_when_all_busy() { + let config = make_config( + r#" +[[agent]] +name = "coder-1" +stage = "coder" +"#, + ); + let pool = AgentPool::new_test(3001); + pool.inject_test_agent("story-1", "coder-1", AgentStatus::Running); + + let available = pool + .available_agents_for_stage(&config, &PipelineStage::Coder) + .unwrap(); + assert!(available.is_empty()); + } + + #[test] + fn available_agents_for_stage_ignores_completed_agents() { + let config = make_config( + r#" +[[agent]] +name = "coder-1" +stage = "coder" +"#, + ); + let pool = AgentPool::new_test(3001); + pool.inject_test_agent("story-1", "coder-1", AgentStatus::Completed); + + let available = pool + .available_agents_for_stage(&config, 
&PipelineStage::Coder) + .unwrap(); + assert_eq!(available, vec!["coder-1"]); + } +} diff --git a/server/src/agents/pool/test_helpers.rs b/server/src/agents/pool/test_helpers.rs new file mode 100644 index 00000000..d5ffeaa7 --- /dev/null +++ b/server/src/agents/pool/test_helpers.rs @@ -0,0 +1,138 @@ +use crate::worktree::WorktreeInfo; +use std::path::PathBuf; +use std::sync::{Arc, Mutex}; +use tokio::sync::broadcast; + +use super::super::{AgentEvent, AgentStatus, CompletionReport}; +use super::types::{StoryAgent, composite_key}; +use super::AgentPool; + +impl AgentPool { + /// Test helper: inject a pre-built agent entry so unit tests can exercise + /// wait/subscribe logic without spawning a real process. + pub fn inject_test_agent( + &self, + story_id: &str, + agent_name: &str, + status: AgentStatus, + ) -> broadcast::Sender { + let (tx, _) = broadcast::channel::(64); + let key = composite_key(story_id, agent_name); + let mut agents = self.agents.lock().unwrap(); + agents.insert( + key, + StoryAgent { + agent_name: agent_name.to_string(), + status, + worktree_info: None, + session_id: None, + tx: tx.clone(), + task_handle: None, + event_log: Arc::new(Mutex::new(Vec::new())), + completion: None, + project_root: None, + log_session_id: None, + merge_failure_reported: false, + }, + ); + tx + } + + /// Test helper: inject an agent with a specific worktree path for testing + /// gate-related logic. 
+ pub fn inject_test_agent_with_path( + &self, + story_id: &str, + agent_name: &str, + status: AgentStatus, + worktree_path: PathBuf, + ) -> broadcast::Sender { + let (tx, _) = broadcast::channel::(64); + let key = composite_key(story_id, agent_name); + let mut agents = self.agents.lock().unwrap(); + agents.insert( + key, + StoryAgent { + agent_name: agent_name.to_string(), + status, + worktree_info: Some(WorktreeInfo { + path: worktree_path, + branch: format!("feature/story-{story_id}"), + base_branch: "master".to_string(), + }), + session_id: None, + tx: tx.clone(), + task_handle: None, + event_log: Arc::new(Mutex::new(Vec::new())), + completion: None, + project_root: None, + log_session_id: None, + merge_failure_reported: false, + }, + ); + tx + } + + /// Test helper: inject an agent with a completion report and project_root + /// for testing pipeline advance logic without spawning real agents. + pub fn inject_test_agent_with_completion( + &self, + story_id: &str, + agent_name: &str, + status: AgentStatus, + project_root: PathBuf, + completion: CompletionReport, + ) -> broadcast::Sender { + let (tx, _) = broadcast::channel::(64); + let key = composite_key(story_id, agent_name); + let mut agents = self.agents.lock().unwrap(); + agents.insert( + key, + StoryAgent { + agent_name: agent_name.to_string(), + status, + worktree_info: None, + session_id: None, + tx: tx.clone(), + task_handle: None, + event_log: Arc::new(Mutex::new(Vec::new())), + completion: Some(completion), + project_root: Some(project_root), + log_session_id: None, + merge_failure_reported: false, + }, + ); + tx + } + + /// Inject a Running agent with a pre-built (possibly finished) task handle. + /// Used by watchdog tests to simulate an orphaned agent. 
+ pub fn inject_test_agent_with_handle( + &self, + story_id: &str, + agent_name: &str, + status: AgentStatus, + task_handle: tokio::task::JoinHandle<()>, + ) -> broadcast::Sender { + let (tx, _) = broadcast::channel::(64); + let key = composite_key(story_id, agent_name); + let mut agents = self.agents.lock().unwrap(); + agents.insert( + key, + StoryAgent { + agent_name: agent_name.to_string(), + status, + worktree_info: None, + session_id: None, + tx: tx.clone(), + task_handle: Some(task_handle), + event_log: Arc::new(Mutex::new(Vec::new())), + completion: None, + project_root: None, + log_session_id: None, + merge_failure_reported: false, + }, + ); + tx + } +} diff --git a/server/src/agents/pool/types.rs b/server/src/agents/pool/types.rs new file mode 100644 index 00000000..7d2d8367 --- /dev/null +++ b/server/src/agents/pool/types.rs @@ -0,0 +1,103 @@ +use crate::slog; +use crate::worktree::WorktreeInfo; +use std::collections::HashMap; +use std::path::PathBuf; +use std::sync::{Arc, Mutex}; +use tokio::sync::broadcast; + +use super::super::{AgentEvent, AgentInfo, AgentStatus, CompletionReport}; + +/// Build the composite key used to track agents in the pool. +pub(super) fn composite_key(story_id: &str, agent_name: &str) -> String { + format!("{story_id}:{agent_name}") +} + +/// RAII guard that removes a pending agent entry from the pool on drop. +/// +/// Created after inserting a `Pending` entry into the agent HashMap. +/// If `start_agent` succeeds (the agent process is spawned and status +/// transitions to `Running`), call [`disarm`](Self::disarm) to prevent +/// cleanup. If any intermediate step fails and the guard is dropped +/// without being disarmed, the pending entry is removed so it cannot +/// block future auto-assign dispatches. 
+pub(super) struct PendingGuard { + pub(super) agents: Arc>>, + pub(super) key: String, + pub(super) armed: bool, +} + +impl PendingGuard { + pub(super) fn new(agents: Arc>>, key: String) -> Self { + Self { + agents, + key, + armed: true, + } + } + + /// Prevent the guard from cleaning up the entry (call after + /// successful spawn). + pub(super) fn disarm(&mut self) { + self.armed = false; + } +} + +impl Drop for PendingGuard { + fn drop(&mut self) { + if self.armed + && let Ok(mut agents) = self.agents.lock() + && agents + .get(&self.key) + .is_some_and(|a| a.status == AgentStatus::Pending) + { + agents.remove(&self.key); + slog!( + "[agents] Cleaned up leaked Pending entry for '{}'", + self.key + ); + } + } +} + +pub(super) struct StoryAgent { + pub(super) agent_name: String, + pub(super) status: AgentStatus, + pub(super) worktree_info: Option, + pub(super) session_id: Option, + pub(super) tx: broadcast::Sender, + pub(super) task_handle: Option>, + /// Accumulated events for polling via get_agent_output. + pub(super) event_log: Arc>>, + /// Set when the agent calls report_completion. + pub(super) completion: Option, + /// Project root, stored for pipeline advancement after completion. + pub(super) project_root: Option, + /// UUID identifying the log file for this session. + pub(super) log_session_id: Option, + /// Set to `true` when the agent calls `report_merge_failure`. + /// Prevents the pipeline from blindly advancing to `5_done/` after a + /// failed merge: the server-owned gate check runs in the feature-branch + /// worktree (which compiles fine) and returns `gates_passed=true` even + /// though the code was never squash-merged onto master. + pub(super) merge_failure_reported: bool, +} + +/// Build an `AgentInfo` snapshot from a `StoryAgent` map entry. 
+pub(super) fn agent_info_from_entry(story_id: &str, agent: &StoryAgent) -> AgentInfo { + AgentInfo { + story_id: story_id.to_string(), + agent_name: agent.agent_name.clone(), + status: agent.status.clone(), + session_id: agent.session_id.clone(), + worktree_path: agent + .worktree_info + .as_ref() + .map(|wt| wt.path.to_string_lossy().to_string()), + base_branch: agent + .worktree_info + .as_ref() + .map(|wt| wt.base_branch.clone()), + completion: agent.completion.clone(), + log_session_id: agent.log_session_id.clone(), + } +} diff --git a/server/src/agents/pool/worktree.rs b/server/src/agents/pool/worktree.rs new file mode 100644 index 00000000..c118481f --- /dev/null +++ b/server/src/agents/pool/worktree.rs @@ -0,0 +1,91 @@ +use crate::config::ProjectConfig; +use std::path::{Path, PathBuf}; + +use super::AgentPool; + +impl AgentPool { + /// Create a worktree for the given story using the server port (writes .mcp.json). + pub async fn create_worktree( + &self, + project_root: &Path, + story_id: &str, + ) -> Result { + let config = ProjectConfig::load(project_root)?; + crate::worktree::create_worktree(project_root, story_id, &config, self.port).await + } + + /// Get project root helper. + pub fn get_project_root(&self, state: &crate::state::SessionState) -> Result { + state.get_project_root() + } +} + +/// Return the active pipeline stage directory name for `story_id`, or `None` if the +/// story is not in any active stage (`2_current/`, `3_qa/`, `4_merge/`). 
+pub(super) fn find_active_story_stage(project_root: &Path, story_id: &str) -> Option<&'static str> { + const STAGES: [&str; 3] = ["2_current", "3_qa", "4_merge"]; + for stage in &STAGES { + let path = project_root + .join(".storkit") + .join("work") + .join(stage) + .join(format!("{story_id}.md")); + if path.exists() { + return Some(stage); + } + } + None +} + +#[cfg(test)] +mod tests { + use super::find_active_story_stage; + + #[test] + fn find_active_story_stage_detects_current() { + use std::fs; + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + let current = root.join(".storkit/work/2_current"); + fs::create_dir_all(¤t).unwrap(); + fs::write(current.join("10_story_test.md"), "test").unwrap(); + + assert_eq!( + find_active_story_stage(root, "10_story_test"), + Some("2_current") + ); + } + + #[test] + fn find_active_story_stage_detects_qa() { + use std::fs; + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + let qa = root.join(".storkit/work/3_qa"); + fs::create_dir_all(&qa).unwrap(); + fs::write(qa.join("11_story_test.md"), "test").unwrap(); + + assert_eq!(find_active_story_stage(root, "11_story_test"), Some("3_qa")); + } + + #[test] + fn find_active_story_stage_detects_merge() { + use std::fs; + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + let merge = root.join(".storkit/work/4_merge"); + fs::create_dir_all(&merge).unwrap(); + fs::write(merge.join("12_story_test.md"), "test").unwrap(); + + assert_eq!( + find_active_story_stage(root, "12_story_test"), + Some("4_merge") + ); + } + + #[test] + fn find_active_story_stage_returns_none_for_unknown_story() { + let tmp = tempfile::tempdir().unwrap(); + assert_eq!(find_active_story_stage(tmp.path(), "99_nonexistent"), None); + } +}