use crate::config::ProjectConfig;
use crate::worktree::{self, WorktreeInfo};
use portable_pty::{CommandBuilder, PtySize, native_pty_system};
use serde::Serialize;
use std::collections::HashMap;
use std::io::{BufRead, BufReader};
use std::path::{Path, PathBuf};
use std::process::Command;
use std::sync::{Arc, Mutex};
use tokio::sync::broadcast;

/// Build the composite key used to track agents in the pool.
///
/// The key has the form `"{story_id}:{agent_name}"`.
fn composite_key(story_id: &str, agent_name: &str) -> String {
    format!("{story_id}:{agent_name}")
}

/// Events streamed from a running agent to SSE clients.
///
/// Serialized with an internal `type` tag whose value is the snake_case
/// variant name.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum AgentEvent {
    /// Agent status changed.
    Status {
        story_id: String,
        agent_name: String,
        status: String,
    },
    /// Raw text output from the agent process.
    Output {
        story_id: String,
        agent_name: String,
        text: String,
    },
    /// Agent produced a JSON event from `--output-format stream-json`.
    AgentJson {
        story_id: String,
        agent_name: String,
        data: serde_json::Value,
    },
    /// Agent finished.
    Done {
        story_id: String,
        agent_name: String,
        /// Session id captured from the agent's output, when one was seen.
        session_id: Option<String>,
    },
    /// Agent errored.
    Error {
        story_id: String,
        agent_name: String,
        message: String,
    },
}

/// Lifecycle state of an agent tracked by the pool.
#[derive(Debug, Clone, Serialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum AgentStatus {
    Pending,
    Running,
    Completed,
    Failed,
}

impl std::fmt::Display for AgentStatus {
    /// Writes the lowercase status name (`pending`, `running`, ...).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Pending => "pending",
            Self::Running => "running",
            Self::Completed => "completed",
            Self::Failed => "failed",
        };
        f.write_str(label)
    }
}

/// Report produced by an agent calling `report_completion`.
#[derive(Debug, Serialize, Clone)] pub struct CompletionReport { pub summary: String, pub gates_passed: bool, pub gate_output: String, } #[derive(Debug, Serialize, Clone)] pub struct AgentInfo { pub story_id: String, pub agent_name: String, pub status: AgentStatus, pub session_id: Option, pub worktree_path: Option, pub base_branch: Option, pub completion: Option, } struct StoryAgent { agent_name: String, status: AgentStatus, worktree_info: Option, session_id: Option, tx: broadcast::Sender, task_handle: Option>, /// Accumulated events for polling via get_agent_output. event_log: Arc>>, /// Set when the agent calls report_completion. completion: Option, } /// Build an `AgentInfo` snapshot from a `StoryAgent` map entry. fn agent_info_from_entry(story_id: &str, agent: &StoryAgent) -> AgentInfo { AgentInfo { story_id: story_id.to_string(), agent_name: agent.agent_name.clone(), status: agent.status.clone(), session_id: agent.session_id.clone(), worktree_path: agent .worktree_info .as_ref() .map(|wt| wt.path.to_string_lossy().to_string()), base_branch: agent .worktree_info .as_ref() .map(|wt| wt.base_branch.clone()), completion: agent.completion.clone(), } } /// Manages concurrent story agents, each in its own worktree. pub struct AgentPool { agents: Arc>>, port: u16, } impl AgentPool { pub fn new(port: u16) -> Self { Self { agents: Arc::new(Mutex::new(HashMap::new())), port, } } /// Start an agent for a story: load config, create worktree, spawn agent. /// If `agent_name` is None, defaults to the first configured agent. pub async fn start_agent( &self, project_root: &Path, story_id: &str, agent_name: Option<&str>, ) -> Result { let config = ProjectConfig::load(project_root)?; // Resolve agent name from config let resolved_name = match agent_name { Some(name) => { config .find_agent(name) .ok_or_else(|| format!("No agent named '{name}' in config"))?; name.to_string() } None => config .default_agent() .ok_or_else(|| "No agents configured".to_string())? 
.name .clone(), }; let key = composite_key(story_id, &resolved_name); // Check not already running { let agents = self.agents.lock().map_err(|e| e.to_string())?; if let Some(agent) = agents.get(&key) && (agent.status == AgentStatus::Running || agent.status == AgentStatus::Pending) { return Err(format!( "Agent '{resolved_name}' for story '{story_id}' is already {}", agent.status )); } } let (tx, _) = broadcast::channel::(1024); let event_log: Arc>> = Arc::new(Mutex::new(Vec::new())); // Register as pending { let mut agents = self.agents.lock().map_err(|e| e.to_string())?; agents.insert( key.clone(), StoryAgent { agent_name: resolved_name.clone(), status: AgentStatus::Pending, worktree_info: None, session_id: None, tx: tx.clone(), task_handle: None, event_log: event_log.clone(), completion: None, }, ); } let _ = tx.send(AgentEvent::Status { story_id: story_id.to_string(), agent_name: resolved_name.clone(), status: "pending".to_string(), }); // Create worktree let wt_info = worktree::create_worktree(project_root, story_id, &config, self.port).await?; // Update with worktree info { let mut agents = self.agents.lock().map_err(|e| e.to_string())?; if let Some(agent) = agents.get_mut(&key) { agent.worktree_info = Some(wt_info.clone()); } } // Spawn the agent process let wt_path_str = wt_info.path.to_string_lossy().to_string(); let (command, args, prompt) = config.render_agent_args(&wt_path_str, story_id, Some(&resolved_name), Some(&wt_info.base_branch))?; let sid = story_id.to_string(); let aname = resolved_name.clone(); let tx_clone = tx.clone(); let agents_ref = self.agents.clone(); let cwd = wt_path_str.clone(); let key_clone = key.clone(); let log_clone = event_log.clone(); let handle = tokio::spawn(async move { let _ = tx_clone.send(AgentEvent::Status { story_id: sid.clone(), agent_name: aname.clone(), status: "running".to_string(), }); match run_agent_pty_streaming( &sid, &aname, &command, &args, &prompt, &cwd, &tx_clone, &log_clone, ) .await { Ok(session_id) => { 
if let Ok(mut agents) = agents_ref.lock() && let Some(agent) = agents.get_mut(&key_clone) { agent.status = AgentStatus::Completed; agent.session_id = session_id.clone(); } let _ = tx_clone.send(AgentEvent::Done { story_id: sid.clone(), agent_name: aname.clone(), session_id, }); } Err(e) => { if let Ok(mut agents) = agents_ref.lock() && let Some(agent) = agents.get_mut(&key_clone) { agent.status = AgentStatus::Failed; } let _ = tx_clone.send(AgentEvent::Error { story_id: sid.clone(), agent_name: aname.clone(), message: e, }); } } }); // Update status to running with task handle { let mut agents = self.agents.lock().map_err(|e| e.to_string())?; if let Some(agent) = agents.get_mut(&key) { agent.status = AgentStatus::Running; agent.task_handle = Some(handle); } } Ok(AgentInfo { story_id: story_id.to_string(), agent_name: resolved_name, status: AgentStatus::Running, session_id: None, worktree_path: Some(wt_path_str), base_branch: Some(wt_info.base_branch.clone()), completion: None, }) } /// Stop a running agent. Worktree is preserved for inspection. pub async fn stop_agent( &self, _project_root: &Path, story_id: &str, agent_name: &str, ) -> Result<(), String> { let key = composite_key(story_id, agent_name); let (worktree_info, task_handle, tx) = { let mut agents = self.agents.lock().map_err(|e| e.to_string())?; let agent = agents .get_mut(&key) .ok_or_else(|| format!("No agent '{agent_name}' for story '{story_id}'"))?; let wt = agent.worktree_info.clone(); let handle = agent.task_handle.take(); let tx = agent.tx.clone(); agent.status = AgentStatus::Failed; (wt, handle, tx) }; // Abort the task if let Some(handle) = task_handle { handle.abort(); let _ = handle.await; } // Preserve worktree for inspection — don't destroy agent's work on stop. 
if let Some(ref wt) = worktree_info { eprintln!( "[agents] Worktree preserved for {story_id}:{agent_name}: {}", wt.path.display() ); } let _ = tx.send(AgentEvent::Status { story_id: story_id.to_string(), agent_name: agent_name.to_string(), status: "stopped".to_string(), }); // Remove from map { let mut agents = self.agents.lock().map_err(|e| e.to_string())?; agents.remove(&key); } Ok(()) } /// List all agents with their status. pub fn list_agents(&self) -> Result, String> { let agents = self.agents.lock().map_err(|e| e.to_string())?; Ok(agents .iter() .map(|(key, agent)| { // Extract story_id from composite key "story_id:agent_name" let story_id = key .rsplit_once(':') .map(|(sid, _)| sid.to_string()) .unwrap_or_else(|| key.clone()); agent_info_from_entry(&story_id, agent) }) .collect()) } /// Subscribe to events for a story agent. pub fn subscribe( &self, story_id: &str, agent_name: &str, ) -> Result, String> { let key = composite_key(story_id, agent_name); let agents = self.agents.lock().map_err(|e| e.to_string())?; let agent = agents .get(&key) .ok_or_else(|| format!("No agent '{agent_name}' for story '{story_id}'"))?; Ok(agent.tx.subscribe()) } /// Drain accumulated events for polling. Returns all events since the last drain. pub fn drain_events( &self, story_id: &str, agent_name: &str, ) -> Result, String> { let key = composite_key(story_id, agent_name); let agents = self.agents.lock().map_err(|e| e.to_string())?; let agent = agents .get(&key) .ok_or_else(|| format!("No agent '{agent_name}' for story '{story_id}'"))?; let mut log = agent.event_log.lock().map_err(|e| e.to_string())?; Ok(log.drain(..).collect()) } /// Block until the agent reaches a terminal state (completed, failed, stopped). /// Returns the agent's final `AgentInfo`. /// `timeout_ms` caps how long to wait; returns an error if the deadline passes. 
pub async fn wait_for_agent( &self, story_id: &str, agent_name: &str, timeout_ms: u64, ) -> Result { // Subscribe before checking status so we don't miss the terminal event // if the agent completes in the window between the two operations. let mut rx = self.subscribe(story_id, agent_name)?; // Return immediately if already in a terminal state. { let agents = self.agents.lock().map_err(|e| e.to_string())?; let key = composite_key(story_id, agent_name); if let Some(agent) = agents.get(&key) && matches!(agent.status, AgentStatus::Completed | AgentStatus::Failed) { return Ok(agent_info_from_entry(story_id, agent)); } } let deadline = tokio::time::Instant::now() + std::time::Duration::from_millis(timeout_ms); loop { let remaining = deadline.saturating_duration_since(tokio::time::Instant::now()); if remaining.is_zero() { return Err(format!( "Timed out after {timeout_ms}ms waiting for agent '{agent_name}' on story '{story_id}'" )); } match tokio::time::timeout(remaining, rx.recv()).await { Ok(Ok(event)) => { let is_terminal = match &event { AgentEvent::Done { .. } | AgentEvent::Error { .. } => true, AgentEvent::Status { status, .. } if status == "stopped" => true, _ => false, }; if is_terminal { let agents = self.agents.lock().map_err(|e| e.to_string())?; let key = composite_key(story_id, agent_name); return Ok(if let Some(agent) = agents.get(&key) { agent_info_from_entry(story_id, agent) } else { // Agent was removed from map (e.g. stop_agent removes it after // the "stopped" status event is sent). let (status, session_id) = match event { AgentEvent::Done { session_id, .. } => { (AgentStatus::Completed, session_id) } _ => (AgentStatus::Failed, None), }; AgentInfo { story_id: story_id.to_string(), agent_name: agent_name.to_string(), status, session_id, worktree_path: None, base_branch: None, completion: None, } }); } } Ok(Err(broadcast::error::RecvError::Lagged(_))) => { // Missed some buffered events — check current status before resuming. 
let agents = self.agents.lock().map_err(|e| e.to_string())?; let key = composite_key(story_id, agent_name); if let Some(agent) = agents.get(&key) && matches!(agent.status, AgentStatus::Completed | AgentStatus::Failed) { return Ok(agent_info_from_entry(story_id, agent)); } // Still running — continue the loop. } Ok(Err(broadcast::error::RecvError::Closed)) => { // Channel closed: no more events will arrive. Return current state. let agents = self.agents.lock().map_err(|e| e.to_string())?; let key = composite_key(story_id, agent_name); if let Some(agent) = agents.get(&key) { return Ok(agent_info_from_entry(story_id, agent)); } return Err(format!( "Agent '{agent_name}' for story '{story_id}' channel closed unexpectedly" )); } Err(_) => { return Err(format!( "Timed out after {timeout_ms}ms waiting for agent '{agent_name}' on story '{story_id}'" )); } } } } /// Create a worktree for the given story using the server port (writes .mcp.json). pub async fn create_worktree( &self, project_root: &Path, story_id: &str, ) -> Result { let config = ProjectConfig::load(project_root)?; worktree::create_worktree(project_root, story_id, &config, self.port).await } /// Report that an agent has finished work on a story. /// /// - Rejects with an error if the worktree has uncommitted changes. /// - Runs acceptance gates (cargo clippy + cargo nextest run / cargo test). /// - Stores the `CompletionReport` on the agent record. /// - Transitions status to `Completed` (gates passed) or `Failed` (gates failed). /// - Emits a `Done` event so `wait_for_agent` unblocks. pub async fn report_completion( &self, story_id: &str, agent_name: &str, summary: &str, ) -> Result { let key = composite_key(story_id, agent_name); // Verify agent exists, is Running, and grab its worktree path. 
let worktree_path = { let agents = self.agents.lock().map_err(|e| e.to_string())?; let agent = agents .get(&key) .ok_or_else(|| format!("No agent '{agent_name}' for story '{story_id}'"))?; if agent.status != AgentStatus::Running { return Err(format!( "Agent '{agent_name}' for story '{story_id}' is not running (status: {}). \ report_completion can only be called by a running agent.", agent.status )); } agent .worktree_info .as_ref() .map(|wt| wt.path.clone()) .ok_or_else(|| { format!( "Agent '{agent_name}' for story '{story_id}' has no worktree. \ Cannot run acceptance gates." ) })? }; let path = worktree_path.clone(); // Run gate checks in a blocking thread to avoid stalling the async runtime. let (gates_passed, gate_output) = tokio::task::spawn_blocking(move || { // Step 1: Reject if worktree is dirty. check_uncommitted_changes(&path)?; // Step 2: Run clippy + tests and return (passed, output). run_acceptance_gates(&path) }) .await .map_err(|e| format!("Gate check task panicked: {e}"))??; let report = CompletionReport { summary: summary.to_string(), gates_passed, gate_output, }; // Store the completion report and advance status. let (tx, session_id) = { let mut agents = self.agents.lock().map_err(|e| e.to_string())?; let agent = agents.get_mut(&key).ok_or_else(|| { format!("Agent '{agent_name}' for story '{story_id}' disappeared during gate check") })?; agent.completion = Some(report.clone()); agent.status = if gates_passed { AgentStatus::Completed } else { AgentStatus::Failed }; (agent.tx.clone(), agent.session_id.clone()) }; // Emit Done so wait_for_agent unblocks. let _ = tx.send(AgentEvent::Done { story_id: story_id.to_string(), agent_name: agent_name.to_string(), session_id, }); Ok(report) } /// Return the port this server is running on. pub fn port(&self) -> u16 { self.port } /// Get project root helper. 
pub fn get_project_root( &self, state: &crate::state::SessionState, ) -> Result { state.get_project_root() } /// Test helper: inject a pre-built agent entry so unit tests can exercise /// wait/subscribe logic without spawning a real process. #[cfg(test)] pub fn inject_test_agent( &self, story_id: &str, agent_name: &str, status: AgentStatus, ) -> broadcast::Sender { let (tx, _) = broadcast::channel::(64); let key = composite_key(story_id, agent_name); let mut agents = self.agents.lock().unwrap(); agents.insert( key, StoryAgent { agent_name: agent_name.to_string(), status, worktree_info: None, session_id: None, tx: tx.clone(), task_handle: None, event_log: Arc::new(Mutex::new(Vec::new())), completion: None, }, ); tx } /// Test helper: inject an agent with a specific worktree path for testing /// gate-related logic. #[cfg(test)] pub fn inject_test_agent_with_path( &self, story_id: &str, agent_name: &str, status: AgentStatus, worktree_path: PathBuf, ) -> broadcast::Sender { let (tx, _) = broadcast::channel::(64); let key = composite_key(story_id, agent_name); let mut agents = self.agents.lock().unwrap(); agents.insert( key, StoryAgent { agent_name: agent_name.to_string(), status, worktree_info: Some(WorktreeInfo { path: worktree_path, branch: format!("feature/story-{story_id}"), base_branch: "master".to_string(), }), session_id: None, tx: tx.clone(), task_handle: None, event_log: Arc::new(Mutex::new(Vec::new())), completion: None, }, ); tx } } /// Move a story file from current/ to archived/ (human accept action). /// /// * If the story is in current/, it is renamed to archived/. /// * If the story is already in archived/, this is a no-op (idempotent). /// * If the story is not found in current/ or archived/, an error is returned. 
pub fn move_story_to_archived(project_root: &Path, story_id: &str) -> Result<(), String> { let stories_dir = project_root.join(".story_kit").join("stories"); let current_path = stories_dir.join("current").join(format!("{story_id}.md")); let archived_path = stories_dir.join("archived").join(format!("{story_id}.md")); if archived_path.exists() { // Already archived — idempotent, nothing to do. return Ok(()); } if current_path.exists() { let archived_dir = stories_dir.join("archived"); std::fs::create_dir_all(&archived_dir) .map_err(|e| format!("Failed to create archived stories directory: {e}"))?; std::fs::rename(¤t_path, &archived_path) .map_err(|e| format!("Failed to move story '{story_id}' to archived/: {e}"))?; eprintln!("[lifecycle] Moved story '{story_id}' from current/ to archived/"); return Ok(()); } Err(format!( "Story '{story_id}' not found in current/. Cannot accept story." )) } // ── Acceptance-gate helpers ─────────────────────────────────────────────────── /// Check whether the given directory has any uncommitted git changes. /// Returns `Err` with a descriptive message if there are any. fn check_uncommitted_changes(path: &Path) -> Result<(), String> { let output = Command::new("git") .args(["status", "--porcelain"]) .current_dir(path) .output() .map_err(|e| format!("Failed to run git status: {e}"))?; let stdout = String::from_utf8_lossy(&output.stdout); if !stdout.trim().is_empty() { return Err(format!( "Worktree has uncommitted changes. Commit your work before calling \ report_completion:\n{stdout}" )); } Ok(()) } /// Run `cargo clippy` and `cargo nextest run` (falling back to `cargo test`) in /// the given directory. Returns `(gates_passed, combined_output)`. 
fn run_acceptance_gates(path: &Path) -> Result<(bool, String), String> { let mut all_output = String::new(); let mut all_passed = true; // ── cargo clippy ────────────────────────────────────────────── let clippy = Command::new("cargo") .args(["clippy", "--all-targets", "--all-features"]) .current_dir(path) .output() .map_err(|e| format!("Failed to run cargo clippy: {e}"))?; all_output.push_str("=== cargo clippy ===\n"); let clippy_stdout = String::from_utf8_lossy(&clippy.stdout); let clippy_stderr = String::from_utf8_lossy(&clippy.stderr); if !clippy_stdout.is_empty() { all_output.push_str(&clippy_stdout); } if !clippy_stderr.is_empty() { all_output.push_str(&clippy_stderr); } all_output.push('\n'); if !clippy.status.success() { all_passed = false; } // ── cargo nextest run (fallback: cargo test) ────────────────── all_output.push_str("=== tests ===\n"); let (test_success, test_out) = match Command::new("cargo") .args(["nextest", "run"]) .current_dir(path) .output() { Ok(o) => { let combined = format!( "{}{}", String::from_utf8_lossy(&o.stdout), String::from_utf8_lossy(&o.stderr) ); (o.status.success(), combined) } Err(_) => { // nextest not available — fall back to cargo test let o = Command::new("cargo") .args(["test"]) .current_dir(path) .output() .map_err(|e| format!("Failed to run cargo test: {e}"))?; let combined = format!( "{}{}", String::from_utf8_lossy(&o.stdout), String::from_utf8_lossy(&o.stderr) ); (o.status.success(), combined) } }; all_output.push_str(&test_out); all_output.push('\n'); if !test_success { all_passed = false; } Ok((all_passed, all_output)) } /// Spawn claude agent in a PTY and stream events through the broadcast channel. 
#[allow(clippy::too_many_arguments)] async fn run_agent_pty_streaming( story_id: &str, agent_name: &str, command: &str, args: &[String], prompt: &str, cwd: &str, tx: &broadcast::Sender, event_log: &Arc>>, ) -> Result, String> { let sid = story_id.to_string(); let aname = agent_name.to_string(); let cmd = command.to_string(); let args = args.to_vec(); let prompt = prompt.to_string(); let cwd = cwd.to_string(); let tx = tx.clone(); let event_log = event_log.clone(); tokio::task::spawn_blocking(move || { run_agent_pty_blocking(&sid, &aname, &cmd, &args, &prompt, &cwd, &tx, &event_log) }) .await .map_err(|e| format!("Agent task panicked: {e}"))? } /// Helper to send an event to both broadcast and event log. fn emit_event( event: AgentEvent, tx: &broadcast::Sender, event_log: &Mutex>, ) { if let Ok(mut log) = event_log.lock() { log.push(event.clone()); } let _ = tx.send(event); } #[allow(clippy::too_many_arguments)] fn run_agent_pty_blocking( story_id: &str, agent_name: &str, command: &str, args: &[String], prompt: &str, cwd: &str, tx: &broadcast::Sender, event_log: &Mutex>, ) -> Result, String> { let pty_system = native_pty_system(); let pair = pty_system .openpty(PtySize { rows: 50, cols: 200, pixel_width: 0, pixel_height: 0, }) .map_err(|e| format!("Failed to open PTY: {e}"))?; let mut cmd = CommandBuilder::new(command); // -p must come first cmd.arg("-p"); cmd.arg(prompt); // Add configured args (e.g., --directory /path/to/worktree, --model, etc.) 
for arg in args { cmd.arg(arg); } cmd.arg("--output-format"); cmd.arg("stream-json"); cmd.arg("--verbose"); // Supervised agents don't need interactive permission prompts cmd.arg("--permission-mode"); cmd.arg("bypassPermissions"); cmd.cwd(cwd); cmd.env("NO_COLOR", "1"); // Allow spawning Claude Code from within a Claude Code session cmd.env_remove("CLAUDECODE"); cmd.env_remove("CLAUDE_CODE_ENTRYPOINT"); eprintln!("[agent:{story_id}:{agent_name}] Spawning {command} in {cwd} with args: {args:?}"); let mut child = pair .slave .spawn_command(cmd) .map_err(|e| format!("Failed to spawn agent for {story_id}:{agent_name}: {e}"))?; drop(pair.slave); let reader = pair .master .try_clone_reader() .map_err(|e| format!("Failed to clone PTY reader: {e}"))?; drop(pair.master); let buf_reader = BufReader::new(reader); let mut session_id: Option = None; for line in buf_reader.lines() { let line = match line { Ok(l) => l, Err(_) => break, }; let trimmed = line.trim(); if trimmed.is_empty() { continue; } // Try to parse as JSON let json: serde_json::Value = match serde_json::from_str(trimmed) { Ok(j) => j, Err(_) => { // Non-JSON output (terminal escapes etc.) 
— send as raw output emit_event( AgentEvent::Output { story_id: story_id.to_string(), agent_name: agent_name.to_string(), text: trimmed.to_string(), }, tx, event_log, ); continue; } }; let event_type = json.get("type").and_then(|t| t.as_str()).unwrap_or(""); match event_type { "system" => { session_id = json .get("session_id") .and_then(|s| s.as_str()) .map(|s| s.to_string()); } "assistant" => { if let Some(message) = json.get("message") && let Some(content) = message.get("content").and_then(|c| c.as_array()) { for block in content { if let Some(text) = block.get("text").and_then(|t| t.as_str()) { emit_event( AgentEvent::Output { story_id: story_id.to_string(), agent_name: agent_name.to_string(), text: text.to_string(), }, tx, event_log, ); } } } } _ => {} } // Forward all JSON events emit_event( AgentEvent::AgentJson { story_id: story_id.to_string(), agent_name: agent_name.to_string(), data: json, }, tx, event_log, ); } let _ = child.kill(); let _ = child.wait(); eprintln!( "[agent:{story_id}:{agent_name}] Done. 
Session: {:?}", session_id ); Ok(session_id) } #[cfg(test)] mod tests { use super::*; #[tokio::test] async fn wait_for_agent_returns_immediately_if_completed() { let pool = AgentPool::new(3001); pool.inject_test_agent("s1", "bot", AgentStatus::Completed); let info = pool.wait_for_agent("s1", "bot", 1000).await.unwrap(); assert_eq!(info.status, AgentStatus::Completed); assert_eq!(info.story_id, "s1"); assert_eq!(info.agent_name, "bot"); } #[tokio::test] async fn wait_for_agent_returns_immediately_if_failed() { let pool = AgentPool::new(3001); pool.inject_test_agent("s2", "bot", AgentStatus::Failed); let info = pool.wait_for_agent("s2", "bot", 1000).await.unwrap(); assert_eq!(info.status, AgentStatus::Failed); } #[tokio::test] async fn wait_for_agent_completes_on_done_event() { let pool = AgentPool::new(3001); let tx = pool.inject_test_agent("s3", "bot", AgentStatus::Running); // Send Done event after a short delay let tx_clone = tx.clone(); tokio::spawn(async move { tokio::time::sleep(std::time::Duration::from_millis(50)).await; // Mark status via event; real code also updates the map, but for // this unit test the map entry stays Running — we verify the // wait loop reacts to the event. let _ = tx_clone.send(AgentEvent::Done { story_id: "s3".to_string(), agent_name: "bot".to_string(), session_id: Some("sess-abc".to_string()), }); }); let info = pool.wait_for_agent("s3", "bot", 2000).await.unwrap(); // Status comes from the map entry (Running in this unit test) // — the important thing is that wait_for_agent returned without timing out. 
assert_eq!(info.story_id, "s3"); } #[tokio::test] async fn wait_for_agent_times_out() { let pool = AgentPool::new(3001); pool.inject_test_agent("s4", "bot", AgentStatus::Running); let result = pool.wait_for_agent("s4", "bot", 50).await; assert!(result.is_err()); let msg = result.unwrap_err(); assert!(msg.contains("Timed out"), "unexpected message: {msg}"); } #[tokio::test] async fn wait_for_agent_errors_for_nonexistent() { let pool = AgentPool::new(3001); let result = pool.wait_for_agent("no_story", "no_bot", 100).await; assert!(result.is_err()); } #[tokio::test] async fn wait_for_agent_completes_on_stopped_status_event() { let pool = AgentPool::new(3001); let tx = pool.inject_test_agent("s5", "bot", AgentStatus::Running); let tx_clone = tx.clone(); tokio::spawn(async move { tokio::time::sleep(std::time::Duration::from_millis(30)).await; let _ = tx_clone.send(AgentEvent::Status { story_id: "s5".to_string(), agent_name: "bot".to_string(), status: "stopped".to_string(), }); }); let info = pool.wait_for_agent("s5", "bot", 2000).await.unwrap(); assert_eq!(info.story_id, "s5"); } // ── report_completion tests ──────────────────────────────────── #[tokio::test] async fn report_completion_rejects_nonexistent_agent() { let pool = AgentPool::new(3001); let result = pool .report_completion("no_story", "no_bot", "done") .await; assert!(result.is_err()); let msg = result.unwrap_err(); assert!(msg.contains("No agent"), "unexpected: {msg}"); } #[tokio::test] async fn report_completion_rejects_non_running_agent() { let pool = AgentPool::new(3001); pool.inject_test_agent("s6", "bot", AgentStatus::Completed); let result = pool.report_completion("s6", "bot", "done").await; assert!(result.is_err()); let msg = result.unwrap_err(); assert!( msg.contains("not running"), "expected 'not running' in: {msg}" ); } #[tokio::test] async fn report_completion_rejects_dirty_worktree() { use std::fs; use tempfile::tempdir; let tmp = tempdir().unwrap(); let repo = tmp.path(); // Init a real git 
repo and make an initial commit Command::new("git") .args(["init"]) .current_dir(repo) .output() .unwrap(); Command::new("git") .args(["commit", "--allow-empty", "-m", "init"]) .current_dir(repo) .output() .unwrap(); // Write an uncommitted file fs::write(repo.join("dirty.txt"), "not committed").unwrap(); let pool = AgentPool::new(3001); pool.inject_test_agent_with_path("s7", "bot", AgentStatus::Running, repo.to_path_buf()); let result = pool.report_completion("s7", "bot", "done").await; assert!(result.is_err()); let msg = result.unwrap_err(); assert!( msg.contains("uncommitted"), "expected 'uncommitted' in: {msg}" ); } }