use crate::config::ProjectConfig;
use crate::worktree::{self, WorktreeInfo};
use portable_pty::{CommandBuilder, PtySize, native_pty_system};
use serde::Serialize;
use std::collections::HashMap;
use std::io::{BufRead, BufReader};
use std::path::{Path, PathBuf};
use std::process::Command;
use std::sync::{Arc, Mutex};
use tokio::sync::broadcast;

/// Build the composite key used to track agents in the pool.
fn composite_key(story_id: &str, agent_name: &str) -> String {
    format!("{story_id}:{agent_name}")
}

/// Events streamed from a running agent to SSE clients.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum AgentEvent {
    /// Agent status changed.
    Status {
        story_id: String,
        agent_name: String,
        status: String,
    },
    /// Raw text output from the agent process.
    Output {
        story_id: String,
        agent_name: String,
        text: String,
    },
    /// Agent produced a JSON event from `--output-format stream-json`.
    AgentJson {
        story_id: String,
        agent_name: String,
        data: serde_json::Value,
    },
    /// Agent finished.
    Done {
        story_id: String,
        agent_name: String,
        // Optional: the agent may exit before a session id is captured.
        session_id: Option<String>,
    },
    /// Agent errored.
    Error {
        story_id: String,
        agent_name: String,
        message: String,
    },
}

#[derive(Debug, Clone, Serialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum AgentStatus {
    Pending,
    Running,
    Completed,
    Failed,
}

impl std::fmt::Display for AgentStatus {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Pending => write!(f, "pending"),
            Self::Running => write!(f, "running"),
            Self::Completed => write!(f, "completed"),
            Self::Failed => write!(f, "failed"),
        }
    }
}

/// Pipeline stages for automatic story advancement.
#[derive(Debug, Clone, PartialEq)]
pub enum PipelineStage {
    /// Coding agents (coder-1, coder-2, etc.)
    Coder,
    /// QA review agent
    Qa,
    /// Mergemaster agent
    Mergemaster,
    /// Supervisors and unknown agents — no automatic advancement.
    Other,
}

/// Determine the pipeline stage from an agent name.
pub fn pipeline_stage(agent_name: &str) -> PipelineStage { match agent_name { "qa" => PipelineStage::Qa, "mergemaster" => PipelineStage::Mergemaster, name if name.starts_with("coder") => PipelineStage::Coder, _ => PipelineStage::Other, } } /// Completion report produced when acceptance gates are run. /// /// Created automatically by the server when an agent process exits normally, /// or via the internal `report_completion` method. #[derive(Debug, Serialize, Clone)] pub struct CompletionReport { pub summary: String, pub gates_passed: bool, pub gate_output: String, } #[derive(Debug, Serialize, Clone)] pub struct AgentInfo { pub story_id: String, pub agent_name: String, pub status: AgentStatus, pub session_id: Option, pub worktree_path: Option, pub base_branch: Option, pub completion: Option, } struct StoryAgent { agent_name: String, status: AgentStatus, worktree_info: Option, session_id: Option, tx: broadcast::Sender, task_handle: Option>, /// Accumulated events for polling via get_agent_output. event_log: Arc>>, /// Set when the agent calls report_completion. completion: Option, /// Project root, stored for pipeline advancement after completion. project_root: Option, } /// Build an `AgentInfo` snapshot from a `StoryAgent` map entry. fn agent_info_from_entry(story_id: &str, agent: &StoryAgent) -> AgentInfo { AgentInfo { story_id: story_id.to_string(), agent_name: agent.agent_name.clone(), status: agent.status.clone(), session_id: agent.session_id.clone(), worktree_path: agent .worktree_info .as_ref() .map(|wt| wt.path.to_string_lossy().to_string()), base_branch: agent .worktree_info .as_ref() .map(|wt| wt.base_branch.clone()), completion: agent.completion.clone(), } } /// Manages concurrent story agents, each in its own worktree. pub struct AgentPool { agents: Arc>>, port: u16, } impl AgentPool { pub fn new(port: u16) -> Self { Self { agents: Arc::new(Mutex::new(HashMap::new())), port, } } /// Start an agent for a story: load config, create worktree, spawn agent. 
/// If `agent_name` is None, defaults to the first configured agent. /// If `resume_context` is provided, it is appended to the rendered prompt /// so the agent can pick up from a previous failed attempt. pub async fn start_agent( &self, project_root: &Path, story_id: &str, agent_name: Option<&str>, resume_context: Option<&str>, ) -> Result { let config = ProjectConfig::load(project_root)?; // Resolve agent name from config let resolved_name = match agent_name { Some(name) => { config .find_agent(name) .ok_or_else(|| format!("No agent named '{name}' in config"))?; name.to_string() } None => config .default_agent() .ok_or_else(|| "No agents configured".to_string())? .name .clone(), }; let key = composite_key(story_id, &resolved_name); // Check not already running { let agents = self.agents.lock().map_err(|e| e.to_string())?; if let Some(agent) = agents.get(&key) && (agent.status == AgentStatus::Running || agent.status == AgentStatus::Pending) { return Err(format!( "Agent '{resolved_name}' for story '{story_id}' is already {}", agent.status )); } } let (tx, _) = broadcast::channel::(1024); let event_log: Arc>> = Arc::new(Mutex::new(Vec::new())); // Register as pending { let mut agents = self.agents.lock().map_err(|e| e.to_string())?; agents.insert( key.clone(), StoryAgent { agent_name: resolved_name.clone(), status: AgentStatus::Pending, worktree_info: None, session_id: None, tx: tx.clone(), task_handle: None, event_log: event_log.clone(), completion: None, project_root: Some(project_root.to_path_buf()), }, ); } let _ = tx.send(AgentEvent::Status { story_id: story_id.to_string(), agent_name: resolved_name.clone(), status: "pending".to_string(), }); // Move story from upcoming/ to current/ and auto-commit before creating the worktree. 
move_story_to_current(project_root, story_id)?; // Create worktree let wt_info = worktree::create_worktree(project_root, story_id, &config, self.port).await?; // Update with worktree info { let mut agents = self.agents.lock().map_err(|e| e.to_string())?; if let Some(agent) = agents.get_mut(&key) { agent.worktree_info = Some(wt_info.clone()); } } // Spawn the agent process let wt_path_str = wt_info.path.to_string_lossy().to_string(); let (command, args, mut prompt) = config.render_agent_args(&wt_path_str, story_id, Some(&resolved_name), Some(&wt_info.base_branch))?; // Append resume context if this is a restart with failure information. if let Some(ctx) = resume_context { prompt.push_str(ctx); } let sid = story_id.to_string(); let aname = resolved_name.clone(); let tx_clone = tx.clone(); let agents_ref = self.agents.clone(); let cwd = wt_path_str.clone(); let key_clone = key.clone(); let log_clone = event_log.clone(); let port_for_task = self.port; let handle = tokio::spawn(async move { let _ = tx_clone.send(AgentEvent::Status { story_id: sid.clone(), agent_name: aname.clone(), status: "running".to_string(), }); match run_agent_pty_streaming( &sid, &aname, &command, &args, &prompt, &cwd, &tx_clone, &log_clone, ) .await { Ok(session_id) => { // Server-owned completion: run acceptance gates automatically // when the agent process exits normally. 
run_server_owned_completion( &agents_ref, port_for_task, &sid, &aname, session_id, ) .await; } Err(e) => { if let Ok(mut agents) = agents_ref.lock() && let Some(agent) = agents.get_mut(&key_clone) { agent.status = AgentStatus::Failed; } let _ = tx_clone.send(AgentEvent::Error { story_id: sid.clone(), agent_name: aname.clone(), message: e, }); } } }); // Update status to running with task handle { let mut agents = self.agents.lock().map_err(|e| e.to_string())?; if let Some(agent) = agents.get_mut(&key) { agent.status = AgentStatus::Running; agent.task_handle = Some(handle); } } Ok(AgentInfo { story_id: story_id.to_string(), agent_name: resolved_name, status: AgentStatus::Running, session_id: None, worktree_path: Some(wt_path_str), base_branch: Some(wt_info.base_branch.clone()), completion: None, }) } /// Stop a running agent. Worktree is preserved for inspection. pub async fn stop_agent( &self, _project_root: &Path, story_id: &str, agent_name: &str, ) -> Result<(), String> { let key = composite_key(story_id, agent_name); let (worktree_info, task_handle, tx) = { let mut agents = self.agents.lock().map_err(|e| e.to_string())?; let agent = agents .get_mut(&key) .ok_or_else(|| format!("No agent '{agent_name}' for story '{story_id}'"))?; let wt = agent.worktree_info.clone(); let handle = agent.task_handle.take(); let tx = agent.tx.clone(); agent.status = AgentStatus::Failed; (wt, handle, tx) }; // Abort the task if let Some(handle) = task_handle { handle.abort(); let _ = handle.await; } // Preserve worktree for inspection — don't destroy agent's work on stop. 
if let Some(ref wt) = worktree_info { eprintln!( "[agents] Worktree preserved for {story_id}:{agent_name}: {}", wt.path.display() ); } let _ = tx.send(AgentEvent::Status { story_id: story_id.to_string(), agent_name: agent_name.to_string(), status: "stopped".to_string(), }); // Remove from map { let mut agents = self.agents.lock().map_err(|e| e.to_string())?; agents.remove(&key); } Ok(()) } /// List all agents with their status. pub fn list_agents(&self) -> Result, String> { let agents = self.agents.lock().map_err(|e| e.to_string())?; Ok(agents .iter() .map(|(key, agent)| { // Extract story_id from composite key "story_id:agent_name" let story_id = key .rsplit_once(':') .map(|(sid, _)| sid.to_string()) .unwrap_or_else(|| key.clone()); agent_info_from_entry(&story_id, agent) }) .collect()) } /// Subscribe to events for a story agent. pub fn subscribe( &self, story_id: &str, agent_name: &str, ) -> Result, String> { let key = composite_key(story_id, agent_name); let agents = self.agents.lock().map_err(|e| e.to_string())?; let agent = agents .get(&key) .ok_or_else(|| format!("No agent '{agent_name}' for story '{story_id}'"))?; Ok(agent.tx.subscribe()) } /// Drain accumulated events for polling. Returns all events since the last drain. pub fn drain_events( &self, story_id: &str, agent_name: &str, ) -> Result, String> { let key = composite_key(story_id, agent_name); let agents = self.agents.lock().map_err(|e| e.to_string())?; let agent = agents .get(&key) .ok_or_else(|| format!("No agent '{agent_name}' for story '{story_id}'"))?; let mut log = agent.event_log.lock().map_err(|e| e.to_string())?; Ok(log.drain(..).collect()) } /// Block until the agent reaches a terminal state (completed, failed, stopped). /// Returns the agent's final `AgentInfo`. /// `timeout_ms` caps how long to wait; returns an error if the deadline passes. 
pub async fn wait_for_agent( &self, story_id: &str, agent_name: &str, timeout_ms: u64, ) -> Result { // Subscribe before checking status so we don't miss the terminal event // if the agent completes in the window between the two operations. let mut rx = self.subscribe(story_id, agent_name)?; // Return immediately if already in a terminal state. { let agents = self.agents.lock().map_err(|e| e.to_string())?; let key = composite_key(story_id, agent_name); if let Some(agent) = agents.get(&key) && matches!(agent.status, AgentStatus::Completed | AgentStatus::Failed) { return Ok(agent_info_from_entry(story_id, agent)); } } let deadline = tokio::time::Instant::now() + std::time::Duration::from_millis(timeout_ms); loop { let remaining = deadline.saturating_duration_since(tokio::time::Instant::now()); if remaining.is_zero() { return Err(format!( "Timed out after {timeout_ms}ms waiting for agent '{agent_name}' on story '{story_id}'" )); } match tokio::time::timeout(remaining, rx.recv()).await { Ok(Ok(event)) => { let is_terminal = match &event { AgentEvent::Done { .. } | AgentEvent::Error { .. } => true, AgentEvent::Status { status, .. } if status == "stopped" => true, _ => false, }; if is_terminal { let agents = self.agents.lock().map_err(|e| e.to_string())?; let key = composite_key(story_id, agent_name); return Ok(if let Some(agent) = agents.get(&key) { agent_info_from_entry(story_id, agent) } else { // Agent was removed from map (e.g. stop_agent removes it after // the "stopped" status event is sent). let (status, session_id) = match event { AgentEvent::Done { session_id, .. } => { (AgentStatus::Completed, session_id) } _ => (AgentStatus::Failed, None), }; AgentInfo { story_id: story_id.to_string(), agent_name: agent_name.to_string(), status, session_id, worktree_path: None, base_branch: None, completion: None, } }); } } Ok(Err(broadcast::error::RecvError::Lagged(_))) => { // Missed some buffered events — check current status before resuming. 
let agents = self.agents.lock().map_err(|e| e.to_string())?; let key = composite_key(story_id, agent_name); if let Some(agent) = agents.get(&key) && matches!(agent.status, AgentStatus::Completed | AgentStatus::Failed) { return Ok(agent_info_from_entry(story_id, agent)); } // Still running — continue the loop. } Ok(Err(broadcast::error::RecvError::Closed)) => { // Channel closed: no more events will arrive. Return current state. let agents = self.agents.lock().map_err(|e| e.to_string())?; let key = composite_key(story_id, agent_name); if let Some(agent) = agents.get(&key) { return Ok(agent_info_from_entry(story_id, agent)); } return Err(format!( "Agent '{agent_name}' for story '{story_id}' channel closed unexpectedly" )); } Err(_) => { return Err(format!( "Timed out after {timeout_ms}ms waiting for agent '{agent_name}' on story '{story_id}'" )); } } } } /// Create a worktree for the given story using the server port (writes .mcp.json). pub async fn create_worktree( &self, project_root: &Path, story_id: &str, ) -> Result { let config = ProjectConfig::load(project_root)?; worktree::create_worktree(project_root, story_id, &config, self.port).await } /// Advance the pipeline after an agent completes. /// /// Called internally by `report_completion` as a background task. /// Reads the stored completion report and project_root from the agent, /// then drives the next pipeline stage based on the agent's role: /// /// - **Coder** + gates passed → move story to `work/3_qa/`, start `qa` agent. /// - **Coder** + gates failed → restart the same coder agent with failure context. /// - **QA** + gates passed + coverage passed → move story to `work/4_merge/`, start `mergemaster` agent. /// - **QA** + gates passed + coverage failed → restart `qa` with coverage failure context. /// - **QA** + gates failed → restart `qa` with failure context. 
/// - **Mergemaster** → run `script/test` on master; if pass: archive + cleanup worktree; /// if fail: restart `mergemaster` with failure context. /// - **Other** (supervisor, unknown) → no automatic advancement. async fn run_pipeline_advance_for_completed_agent(&self, story_id: &str, agent_name: &str) { let key = composite_key(story_id, agent_name); let (completion, project_root, worktree_path) = { let agents = match self.agents.lock() { Ok(a) => a, Err(e) => { eprintln!("[pipeline] Failed to lock agents for '{story_id}:{agent_name}': {e}"); return; } }; let agent = match agents.get(&key) { Some(a) => a, None => return, }; let wt_path = agent .worktree_info .as_ref() .map(|wt| wt.path.clone()); (agent.completion.clone(), agent.project_root.clone(), wt_path) }; let completion = match completion { Some(c) => c, None => { eprintln!("[pipeline] No completion report for '{story_id}:{agent_name}'"); return; } }; let project_root = match project_root { Some(p) => p, None => { eprintln!("[pipeline] No project_root for '{story_id}:{agent_name}'"); return; } }; let stage = pipeline_stage(agent_name); match stage { PipelineStage::Other => { // Supervisors and unknown agents do not advance the pipeline. } PipelineStage::Coder => { if completion.gates_passed { eprintln!( "[pipeline] Coder '{agent_name}' passed gates for '{story_id}'. Moving to QA." ); if let Err(e) = move_story_to_qa(&project_root, story_id) { eprintln!("[pipeline] Failed to move '{story_id}' to 3_qa/: {e}"); return; } if let Err(e) = self .start_agent(&project_root, story_id, Some("qa"), None) .await { eprintln!("[pipeline] Failed to start qa agent for '{story_id}': {e}"); } // Coder slot is now free — pick up any other unassigned work in 2_current/. self.auto_assign_available_work(&project_root).await; } else { eprintln!( "[pipeline] Coder '{agent_name}' failed gates for '{story_id}'. Restarting." 
); let context = format!( "\n\n---\n## Previous Attempt Failed\n\ The acceptance gates failed with the following output:\n{}\n\n\ Please review the failures above, fix the issues, and try again.", completion.gate_output ); if let Err(e) = self .start_agent(&project_root, story_id, Some(agent_name), Some(&context)) .await { eprintln!( "[pipeline] Failed to restart coder '{agent_name}' for '{story_id}': {e}" ); } } } PipelineStage::Qa => { if completion.gates_passed { // Run coverage gate in the QA worktree before advancing to merge. let coverage_path = worktree_path.clone().unwrap_or_else(|| project_root.clone()); let cp = coverage_path.clone(); let coverage_result = tokio::task::spawn_blocking(move || run_coverage_gate(&cp)) .await .unwrap_or_else(|e| { eprintln!("[pipeline] Coverage gate task panicked: {e}"); Ok((false, format!("Coverage gate task panicked: {e}"))) }); let (coverage_passed, coverage_output) = match coverage_result { Ok(pair) => pair, Err(e) => (false, e), }; if coverage_passed { eprintln!( "[pipeline] QA passed gates and coverage for '{story_id}'. Moving to merge." ); if let Err(e) = move_story_to_merge(&project_root, story_id) { eprintln!("[pipeline] Failed to move '{story_id}' to 4_merge/: {e}"); return; } if let Err(e) = self .start_agent(&project_root, story_id, Some("mergemaster"), None) .await { eprintln!("[pipeline] Failed to start mergemaster for '{story_id}': {e}"); } // QA slot is now free — pick up any other unassigned work in 3_qa/. self.auto_assign_available_work(&project_root).await; } else { eprintln!( "[pipeline] QA coverage gate failed for '{story_id}'. Restarting QA." 
); let context = format!( "\n\n---\n## Coverage Gate Failed\n\ The coverage gate (script/test_coverage) failed with the following output:\n{}\n\n\ Please improve test coverage until the coverage gate passes.", coverage_output ); if let Err(e) = self .start_agent(&project_root, story_id, Some("qa"), Some(&context)) .await { eprintln!("[pipeline] Failed to restart qa for '{story_id}': {e}"); } } } else { eprintln!( "[pipeline] QA failed gates for '{story_id}'. Restarting." ); let context = format!( "\n\n---\n## Previous QA Attempt Failed\n\ The acceptance gates failed with the following output:\n{}\n\n\ Please re-run and fix the issues.", completion.gate_output ); if let Err(e) = self .start_agent(&project_root, story_id, Some("qa"), Some(&context)) .await { eprintln!("[pipeline] Failed to restart qa for '{story_id}': {e}"); } } } PipelineStage::Mergemaster => { // Run script/test on master (project_root) as the post-merge verification. eprintln!( "[pipeline] Mergemaster completed for '{story_id}'. Running post-merge tests on master." ); let root = project_root.clone(); let test_result = tokio::task::spawn_blocking(move || run_project_tests(&root)) .await .unwrap_or_else(|e| { eprintln!("[pipeline] Post-merge test task panicked: {e}"); Ok((false, format!("Test task panicked: {e}"))) }); let (passed, output) = match test_result { Ok(pair) => pair, Err(e) => (false, e), }; if passed { eprintln!( "[pipeline] Post-merge tests passed for '{story_id}'. Archiving." ); if let Err(e) = move_story_to_archived(&project_root, story_id) { eprintln!("[pipeline] Failed to archive '{story_id}': {e}"); } // Mergemaster slot is now free — pick up any other items in 4_merge/. self.auto_assign_available_work(&project_root).await; // TODO: Re-enable worktree cleanup once we have persistent agent logs. // Removing worktrees destroys evidence needed to debug empty-commit agents. 
// let config = // crate::config::ProjectConfig::load(&project_root).unwrap_or_default(); // if let Err(e) = // worktree::remove_worktree_by_story_id(&project_root, story_id, &config) // .await // { // eprintln!( // "[pipeline] Failed to remove worktree for '{story_id}': {e}" // ); // } eprintln!( "[pipeline] Story '{story_id}' archived. Worktree preserved for inspection." ); } else { eprintln!( "[pipeline] Post-merge tests failed for '{story_id}'. Restarting mergemaster." ); let context = format!( "\n\n---\n## Post-Merge Test Failed\n\ The tests on master failed with the following output:\n{}\n\n\ Please investigate and resolve the failures, then call merge_agent_work again.", output ); if let Err(e) = self .start_agent(&project_root, story_id, Some("mergemaster"), Some(&context)) .await { eprintln!( "[pipeline] Failed to restart mergemaster for '{story_id}': {e}" ); } } } } } /// Internal: report that an agent has finished work on a story. /// /// **Note:** This is no longer exposed as an MCP tool. The server now /// automatically runs completion gates when an agent process exits /// (see `run_server_owned_completion`). This method is retained for /// backwards compatibility and testing. /// /// - Rejects with an error if the worktree has uncommitted changes. /// - Runs acceptance gates (cargo clippy + cargo nextest run / cargo test). /// - Stores the `CompletionReport` on the agent record. /// - Transitions status to `Completed` (gates passed) or `Failed` (gates failed). /// - Emits a `Done` event so `wait_for_agent` unblocks. #[allow(dead_code)] pub async fn report_completion( &self, story_id: &str, agent_name: &str, summary: &str, ) -> Result { let key = composite_key(story_id, agent_name); // Verify agent exists, is Running, and grab its worktree path. 
let worktree_path = { let agents = self.agents.lock().map_err(|e| e.to_string())?; let agent = agents .get(&key) .ok_or_else(|| format!("No agent '{agent_name}' for story '{story_id}'"))?; if agent.status != AgentStatus::Running { return Err(format!( "Agent '{agent_name}' for story '{story_id}' is not running (status: {}). \ report_completion can only be called by a running agent.", agent.status )); } agent .worktree_info .as_ref() .map(|wt| wt.path.clone()) .ok_or_else(|| { format!( "Agent '{agent_name}' for story '{story_id}' has no worktree. \ Cannot run acceptance gates." ) })? }; let path = worktree_path.clone(); // Run gate checks in a blocking thread to avoid stalling the async runtime. let (gates_passed, gate_output) = tokio::task::spawn_blocking(move || { // Step 1: Reject if worktree is dirty. check_uncommitted_changes(&path)?; // Step 2: Run clippy + tests and return (passed, output). run_acceptance_gates(&path) }) .await .map_err(|e| format!("Gate check task panicked: {e}"))??; let report = CompletionReport { summary: summary.to_string(), gates_passed, gate_output, }; // Store the completion report and advance status. let (tx, session_id) = { let mut agents = self.agents.lock().map_err(|e| e.to_string())?; let agent = agents.get_mut(&key).ok_or_else(|| { format!("Agent '{agent_name}' for story '{story_id}' disappeared during gate check") })?; agent.completion = Some(report.clone()); agent.status = if gates_passed { AgentStatus::Completed } else { AgentStatus::Failed }; (agent.tx.clone(), agent.session_id.clone()) }; // Emit Done so wait_for_agent unblocks. let _ = tx.send(AgentEvent::Done { story_id: story_id.to_string(), agent_name: agent_name.to_string(), session_id, }); // Advance the pipeline state machine in a background task. // Only advance when the agent completed (not failed) to avoid spurious restarts // from agents that never ran acceptance gates properly. 
let pool_clone = Self { agents: Arc::clone(&self.agents), port: self.port, }; let sid = story_id.to_string(); let aname = agent_name.to_string(); tokio::spawn(async move { pool_clone .run_pipeline_advance_for_completed_agent(&sid, &aname) .await; }); Ok(report) } /// Run the full mergemaster pipeline for a completed story: /// /// 1. Squash-merge the story's feature branch into the current branch (master). /// 2. If conflicts are found: abort the merge and report them. /// 3. If the merge succeeds: run quality gates (cargo clippy + tests + pnpm). /// 4. If all gates pass: archive the story and clean up the worktree. /// /// Returns a `MergeReport` with full details of what happened. pub async fn merge_agent_work( &self, project_root: &Path, story_id: &str, ) -> Result { let branch = format!("feature/story-{story_id}"); let wt_path = worktree::worktree_path(project_root, story_id); let root = project_root.to_path_buf(); let sid = story_id.to_string(); let br = branch.clone(); // Run blocking operations (git + cargo) off the async runtime. let (merge_success, had_conflicts, conflict_details, merge_output) = tokio::task::spawn_blocking(move || run_squash_merge(&root, &br, &sid)) .await .map_err(|e| format!("Merge task panicked: {e}"))??; if !merge_success { return Ok(MergeReport { story_id: story_id.to_string(), success: false, had_conflicts, conflict_details, gates_passed: false, gate_output: merge_output, worktree_cleaned_up: false, story_archived: false, }); } // Merge succeeded — run quality gates in the project root. 
let root2 = project_root.to_path_buf(); let (gates_passed, gate_output) = tokio::task::spawn_blocking(move || run_merge_quality_gates(&root2)) .await .map_err(|e| format!("Gate check task panicked: {e}"))??; if !gates_passed { return Ok(MergeReport { story_id: story_id.to_string(), success: true, had_conflicts: false, conflict_details: None, gates_passed: false, gate_output, worktree_cleaned_up: false, story_archived: false, }); } // Gates passed — archive the story. let story_archived = move_story_to_archived(project_root, story_id).is_ok(); // Clean up the worktree if it exists. let worktree_cleaned_up = if wt_path.exists() { let config = crate::config::ProjectConfig::load(project_root) .unwrap_or_default(); worktree::remove_worktree_by_story_id(project_root, story_id, &config) .await .is_ok() } else { false }; Ok(MergeReport { story_id: story_id.to_string(), success: true, had_conflicts: false, conflict_details: None, gates_passed: true, gate_output, worktree_cleaned_up, story_archived, }) } /// Return the port this server is running on. #[allow(dead_code)] pub fn port(&self) -> u16 { self.port } /// Get project root helper. pub fn get_project_root( &self, state: &crate::state::SessionState, ) -> Result { state.get_project_root() } /// Test helper: inject a pre-built agent entry so unit tests can exercise /// wait/subscribe logic without spawning a real process. #[cfg(test)] pub fn inject_test_agent( &self, story_id: &str, agent_name: &str, status: AgentStatus, ) -> broadcast::Sender { let (tx, _) = broadcast::channel::(64); let key = composite_key(story_id, agent_name); let mut agents = self.agents.lock().unwrap(); agents.insert( key, StoryAgent { agent_name: agent_name.to_string(), status, worktree_info: None, session_id: None, tx: tx.clone(), task_handle: None, event_log: Arc::new(Mutex::new(Vec::new())), completion: None, project_root: None, }, ); tx } /// Test helper: inject an agent with a specific worktree path for testing /// gate-related logic. 
#[cfg(test)] pub fn inject_test_agent_with_path( &self, story_id: &str, agent_name: &str, status: AgentStatus, worktree_path: PathBuf, ) -> broadcast::Sender { let (tx, _) = broadcast::channel::(64); let key = composite_key(story_id, agent_name); let mut agents = self.agents.lock().unwrap(); agents.insert( key, StoryAgent { agent_name: agent_name.to_string(), status, worktree_info: Some(WorktreeInfo { path: worktree_path, branch: format!("feature/story-{story_id}"), base_branch: "master".to_string(), }), session_id: None, tx: tx.clone(), task_handle: None, event_log: Arc::new(Mutex::new(Vec::new())), completion: None, project_root: None, }, ); tx } /// Automatically assign free agents to stories waiting in the active pipeline stages. /// /// Scans `work/2_current/`, `work/3_qa/`, and `work/4_merge/` for items that have no /// active agent and assigns the first free agent of the appropriate role. Items in /// `work/1_upcoming/` are never auto-started. /// /// Respects the configured agent roster: the maximum number of concurrently active agents /// per role is bounded by the count of agents of that role defined in `project.toml`. pub async fn auto_assign_available_work(&self, project_root: &Path) { let config = match ProjectConfig::load(project_root) { Ok(c) => c, Err(e) => { eprintln!("[auto-assign] Failed to load project config: {e}"); return; } }; // Process each active pipeline stage in order. let stages: [(&str, PipelineStage); 3] = [ ("2_current", PipelineStage::Coder), ("3_qa", PipelineStage::Qa), ("4_merge", PipelineStage::Mergemaster), ]; for (stage_dir, stage) in &stages { let items = scan_stage_items(project_root, stage_dir); if items.is_empty() { continue; } for story_id in &items { // Re-acquire the lock on each iteration to see state changes // from previous start_agent calls in the same pass. 
let (already_assigned, free_agent) = { let agents = match self.agents.lock() { Ok(a) => a, Err(e) => { eprintln!("[auto-assign] Failed to lock agents: {e}"); break; } }; let assigned = is_story_assigned_for_stage(&agents, story_id, stage); let free = if assigned { None } else { find_free_agent_for_stage(&config, &agents, stage) .map(|s| s.to_string()) }; (assigned, free) }; if already_assigned { // Story already has an active agent — skip silently. continue; } match free_agent { Some(agent_name) => { eprintln!( "[auto-assign] Assigning '{agent_name}' to '{story_id}' in {stage_dir}/" ); if let Err(e) = self .start_agent(project_root, story_id, Some(&agent_name), None) .await { eprintln!( "[auto-assign] Failed to start '{agent_name}' for '{story_id}': {e}" ); } } None => { // No free agents of this type — stop scanning this stage. eprintln!( "[auto-assign] All {:?} agents busy; remaining items in {stage_dir}/ will wait.", stage ); break; } } } } } /// Test helper: inject an agent with a completion report and project_root /// for testing pipeline advance logic without spawning real agents. #[cfg(test)] pub fn inject_test_agent_with_completion( &self, story_id: &str, agent_name: &str, status: AgentStatus, project_root: PathBuf, completion: CompletionReport, ) -> broadcast::Sender { let (tx, _) = broadcast::channel::(64); let key = composite_key(story_id, agent_name); let mut agents = self.agents.lock().unwrap(); agents.insert( key, StoryAgent { agent_name: agent_name.to_string(), status, worktree_info: None, session_id: None, tx: tx.clone(), task_handle: None, event_log: Arc::new(Mutex::new(Vec::new())), completion: Some(completion), project_root: Some(project_root), }, ); tx } } /// Scan a work pipeline stage directory and return story IDs, sorted alphabetically. /// Returns an empty `Vec` if the directory does not exist. 
fn scan_stage_items(project_root: &Path, stage_dir: &str) -> Vec<String> {
    let dir = project_root
        .join(".story_kit")
        .join("work")
        .join(stage_dir);
    if !dir.is_dir() {
        return Vec::new();
    }
    let mut items = Vec::new();
    if let Ok(entries) = std::fs::read_dir(&dir) {
        for entry in entries.flatten() {
            let path = entry.path();
            // Only `.md` files count as work items; the stem is the story id.
            if path.extension().and_then(|e| e.to_str()) == Some("md")
                && let Some(stem) = path.file_stem().and_then(|s| s.to_str())
            {
                items.push(stem.to_string());
            }
        }
    }
    items.sort();
    items
}

/// Return `true` if `story_id` has any active (pending/running) agent matching `stage`.
fn is_story_assigned_for_stage(
    agents: &HashMap<String, StoryAgent>,
    story_id: &str,
    stage: &PipelineStage,
) -> bool {
    agents.iter().any(|(key, agent)| {
        // Composite key format: "{story_id}:{agent_name}"
        let key_story_id = key.rsplit_once(':').map(|(sid, _)| sid).unwrap_or(key);
        key_story_id == story_id
            && pipeline_stage(&agent.agent_name) == *stage
            && matches!(agent.status, AgentStatus::Running | AgentStatus::Pending)
    })
}

/// Find the first configured agent for `stage` that has no active (pending/running) assignment.
/// Returns `None` if all agents for that stage are busy or none are configured.
fn find_free_agent_for_stage<'a>(
    config: &'a ProjectConfig,
    agents: &HashMap<String, StoryAgent>,
    stage: &PipelineStage,
) -> Option<&'a str> {
    for agent_config in &config.agent {
        if pipeline_stage(&agent_config.name) != *stage {
            continue;
        }
        let is_busy = agents.values().any(|a| {
            a.agent_name == agent_config.name
                && matches!(a.status, AgentStatus::Running | AgentStatus::Pending)
        });
        if !is_busy {
            return Some(&agent_config.name);
        }
    }
    None
}

/// Server-owned completion: runs acceptance gates when an agent process exits
/// normally, and advances the pipeline based on results.
///
/// This is a **free function** (not a method on `AgentPool`) to break the
/// opaque type cycle that would otherwise arise: `start_agent` → spawned task
/// → server-owned completion → pipeline advance → `start_agent`.
///
/// If the agent already has a completion report (e.g. from a legacy
/// `report_completion` call), this is a no-op to avoid double-running gates.
async fn run_server_owned_completion(
    agents: &Arc<Mutex<HashMap<String, StoryAgent>>>,
    port: u16,
    story_id: &str,
    agent_name: &str,
    session_id: Option<String>,
) {
    let key = composite_key(story_id, agent_name);

    // Guard check and worktree-path lookup under ONE lock acquisition.
    // (Previously these were two separate lock scopes, which paid for an
    // extra lock round-trip and left a window between the guard check and
    // the path read where the entry could change.)
    let worktree_path = {
        let lock = match agents.lock() {
            Ok(a) => a,
            // Poisoned lock — nothing sensible to do.
            Err(_) => return,
        };
        match lock.get(&key) {
            // Completion already recorded (legacy `report_completion` path):
            // do not run the gates a second time.
            Some(agent) if agent.completion.is_some() => {
                eprintln!(
                    "[agents] Completion already recorded for '{story_id}:{agent_name}'; \
                     skipping server-owned gates."
                );
                return;
            }
            Some(agent) => agent
                .worktree_info
                .as_ref()
                .map(|wt| wt.path.clone()),
            // Agent vanished from the pool — nothing to finalize.
            None => return,
        }
    };

    // Run acceptance gates off the async runtime: they spawn child processes
    // and block on their output.
    let (gates_passed, gate_output) = if let Some(path) = worktree_path {
        match tokio::task::spawn_blocking(move || {
            check_uncommitted_changes(&path)?;
            run_acceptance_gates(&path)
        })
        .await
        {
            Ok(Ok(result)) => result,
            Ok(Err(e)) => (false, e),
            Err(e) => (false, format!("Gate check task panicked: {e}")),
        }
    } else {
        (
            false,
            "No worktree path available to run acceptance gates".to_string(),
        )
    };

    eprintln!(
        "[agents] Server-owned completion for '{story_id}:{agent_name}': gates_passed={gates_passed}"
    );

    let report = CompletionReport {
        summary: "Agent process exited normally".to_string(),
        gates_passed,
        gate_output,
    };

    // Store the report, session id, and terminal status; grab the broadcast
    // sender so the Done event is emitted outside the lock.
    let tx = {
        let mut lock = match agents.lock() {
            Ok(a) => a,
            Err(_) => return,
        };
        let agent = match lock.get_mut(&key) {
            Some(a) => a,
            None => return,
        };
        agent.completion = Some(report);
        agent.session_id = session_id.clone();
        agent.status = if gates_passed {
            AgentStatus::Completed
        } else {
            AgentStatus::Failed
        };
        agent.tx.clone()
    };

    // Emit Done so wait_for_agent unblocks. Send errors (no subscribers) are
    // intentionally ignored.
    let _ = tx.send(AgentEvent::Done {
        story_id: story_id.to_string(),
        agent_name: agent_name.to_string(),
        session_id,
    });

    // Advance the pipeline state machine in a background task.
    // Uses a non-async helper to break the opaque type cycle.
    spawn_pipeline_advance(Arc::clone(agents), port, story_id, agent_name);
}

/// Spawn pipeline advancement as a background task.
///
/// This is a **non-async** function so it does not participate in the opaque
/// type cycle between `start_agent` and `run_server_owned_completion`.
fn spawn_pipeline_advance(
    agents: Arc<Mutex<HashMap<String, StoryAgent>>>,
    port: u16,
    story_id: &str,
    agent_name: &str,
) {
    let sid = story_id.to_string();
    let aname = agent_name.to_string();
    tokio::spawn(async move {
        // Rebuild a lightweight pool handle around the shared map; the real
        // state lives behind the Arc, so this is cheap.
        let pool = AgentPool { agents, port };
        pool.run_pipeline_advance_for_completed_agent(&sid, &aname)
            .await;
    });
}

/// Result of a mergemaster merge operation.
#[derive(Debug, Serialize, Clone)]
pub struct MergeReport {
    pub story_id: String,
    pub success: bool,
    pub had_conflicts: bool,
    pub conflict_details: Option<String>,
    pub gates_passed: bool,
    pub gate_output: String,
    pub worktree_cleaned_up: bool,
    pub story_archived: bool,
}

/// Determine the work item type from its ID (new naming: `{N}_{type}_{slug}`).
/// Returns "bug", "spike", or "story".
#[allow(dead_code)]
fn item_type_from_id(item_id: &str) -> &'static str {
    // New format: {digits}_{type}_{slug}
    let after_num = item_id.trim_start_matches(|c: char| c.is_ascii_digit());
    if after_num.starts_with("_bug_") {
        "bug"
    } else if after_num.starts_with("_spike_") {
        "spike"
    } else {
        // Anything not explicitly a bug or spike is treated as a story.
        "story"
    }
}

/// Return the source directory path for a work item (always work/1_upcoming/).
fn item_source_dir(project_root: &Path, _item_id: &str) -> PathBuf {
    project_root.join(".story_kit").join("work").join("1_upcoming")
}

/// Return the archive directory path for a work item (always work/5_archived/).
fn item_archive_dir(project_root: &Path, _item_id: &str) -> PathBuf { project_root.join(".story_kit").join("work").join("5_archived") } /// Move a work item (story, bug, or spike) from `work/1_upcoming/` to `work/2_current/`. /// /// Idempotent: if the item is already in `2_current/`, returns Ok without committing. /// If the item is not found in `1_upcoming/`, logs a warning and returns Ok. pub fn move_story_to_current(project_root: &Path, story_id: &str) -> Result<(), String> { let sk = project_root.join(".story_kit").join("work"); let current_dir = sk.join("2_current"); let current_path = current_dir.join(format!("{story_id}.md")); if current_path.exists() { // Already in 2_current/ — idempotent, nothing to do. return Ok(()); } let source_dir = item_source_dir(project_root, story_id); let source_path = source_dir.join(format!("{story_id}.md")); if !source_path.exists() { eprintln!( "[lifecycle] Work item '{story_id}' not found in {}; skipping move to 2_current/", source_dir.display() ); return Ok(()); } std::fs::create_dir_all(¤t_dir) .map_err(|e| format!("Failed to create work/2_current/ directory: {e}"))?; std::fs::rename(&source_path, ¤t_path) .map_err(|e| format!("Failed to move '{story_id}' to 2_current/: {e}"))?; eprintln!( "[lifecycle] Moved '{story_id}' from {} to work/2_current/", source_dir.display() ); Ok(()) } /// Move a story from `work/2_current/` to `work/5_archived/` and auto-commit. /// /// * If the story is in `2_current/`, it is moved to `5_archived/` and committed. /// * If the story is in `4_merge/`, it is moved to `5_archived/` and committed. /// * If the story is already in `5_archived/`, this is a no-op (idempotent). /// * If the story is not found in `2_current/`, `4_merge/`, or `5_archived/`, an error is returned. 
pub fn move_story_to_archived(project_root: &Path, story_id: &str) -> Result<(), String> { let sk = project_root.join(".story_kit").join("work"); let current_path = sk.join("2_current").join(format!("{story_id}.md")); let merge_path = sk.join("4_merge").join(format!("{story_id}.md")); let archived_dir = sk.join("5_archived"); let archived_path = archived_dir.join(format!("{story_id}.md")); if archived_path.exists() { // Already archived — idempotent, nothing to do. return Ok(()); } // Check 2_current/ first, then 4_merge/ let source_path = if current_path.exists() { current_path.clone() } else if merge_path.exists() { merge_path.clone() } else { return Err(format!( "Story '{story_id}' not found in work/2_current/ or work/4_merge/. Cannot accept story." )); }; std::fs::create_dir_all(&archived_dir) .map_err(|e| format!("Failed to create work/5_archived/ directory: {e}"))?; std::fs::rename(&source_path, &archived_path) .map_err(|e| format!("Failed to move story '{story_id}' to 5_archived/: {e}"))?; let from_dir = if source_path == current_path { "work/2_current/" } else { "work/4_merge/" }; eprintln!("[lifecycle] Moved story '{story_id}' from {from_dir} to work/5_archived/"); Ok(()) } /// Move a story/bug from `work/2_current/` or `work/3_qa/` to `work/4_merge/`. /// /// This stages a work item as ready for the mergemaster to pick up and merge into master. /// Idempotent: if already in `4_merge/`, returns Ok without committing. pub fn move_story_to_merge(project_root: &Path, story_id: &str) -> Result<(), String> { let sk = project_root.join(".story_kit").join("work"); let current_path = sk.join("2_current").join(format!("{story_id}.md")); let qa_path = sk.join("3_qa").join(format!("{story_id}.md")); let merge_dir = sk.join("4_merge"); let merge_path = merge_dir.join(format!("{story_id}.md")); if merge_path.exists() { // Already in 4_merge/ — idempotent, nothing to do. 
return Ok(()); } // Accept from 2_current/ (manual trigger) or 3_qa/ (pipeline advancement from QA stage). let source_path = if current_path.exists() { current_path.clone() } else if qa_path.exists() { qa_path.clone() } else { return Err(format!( "Work item '{story_id}' not found in work/2_current/ or work/3_qa/. Cannot move to 4_merge/." )); }; std::fs::create_dir_all(&merge_dir) .map_err(|e| format!("Failed to create work/4_merge/ directory: {e}"))?; std::fs::rename(&source_path, &merge_path) .map_err(|e| format!("Failed to move '{story_id}' to 4_merge/: {e}"))?; let from_dir = if source_path == current_path { "work/2_current/" } else { "work/3_qa/" }; eprintln!("[lifecycle] Moved '{story_id}' from {from_dir} to work/4_merge/"); Ok(()) } /// Move a story/bug from `work/2_current/` to `work/3_qa/` and auto-commit. /// /// This stages a work item for QA review before merging to master. /// Idempotent: if already in `3_qa/`, returns Ok without committing. pub fn move_story_to_qa(project_root: &Path, story_id: &str) -> Result<(), String> { let sk = project_root.join(".story_kit").join("work"); let current_path = sk.join("2_current").join(format!("{story_id}.md")); let qa_dir = sk.join("3_qa"); let qa_path = qa_dir.join(format!("{story_id}.md")); if qa_path.exists() { // Already in 3_qa/ — idempotent, nothing to do. return Ok(()); } if !current_path.exists() { return Err(format!( "Work item '{story_id}' not found in work/2_current/. Cannot move to 3_qa/." )); } std::fs::create_dir_all(&qa_dir) .map_err(|e| format!("Failed to create work/3_qa/ directory: {e}"))?; std::fs::rename(¤t_path, &qa_path) .map_err(|e| format!("Failed to move '{story_id}' to 3_qa/: {e}"))?; eprintln!("[lifecycle] Moved '{story_id}' from work/2_current/ to work/3_qa/"); Ok(()) } /// Move a bug from `work/2_current/` or `work/1_upcoming/` to `work/5_archived/` and auto-commit. /// /// * If the bug is in `2_current/`, it is moved to `5_archived/` and committed. 
/// * If the bug is still in `1_upcoming/` (never started), it is moved directly to `5_archived/`.
/// * If the bug is already in `5_archived/`, this is a no-op (idempotent).
/// * If the bug is not found anywhere, an error is returned.
pub fn close_bug_to_archive(project_root: &Path, bug_id: &str) -> Result<(), String> {
    let sk = project_root.join(".story_kit").join("work");
    let file_name = format!("{bug_id}.md");
    let current_path = sk.join("2_current").join(&file_name);
    let upcoming_path = sk.join("1_upcoming").join(&file_name);
    // Archive dir computed inline (the item_archive_dir helper returns this
    // same fixed path; computing it here matches move_story_to_archived).
    let archive_dir = sk.join("5_archived");
    let archive_path = archive_dir.join(&file_name);
    if archive_path.exists() {
        // Already archived — idempotent, nothing to do.
        return Ok(());
    }
    // Prefer 2_current/ (in-progress bug) over 1_upcoming/ (never started).
    let source_path = if current_path.exists() {
        current_path
    } else if upcoming_path.exists() {
        upcoming_path
    } else {
        return Err(format!(
            "Bug '{bug_id}' not found in work/2_current/ or work/1_upcoming/. Cannot close bug."
        ));
    };
    std::fs::create_dir_all(&archive_dir)
        .map_err(|e| format!("Failed to create work/5_archived/ directory: {e}"))?;
    std::fs::rename(&source_path, &archive_path)
        .map_err(|e| format!("Failed to move bug '{bug_id}' to 5_archived/: {e}"))?;
    eprintln!(
        "[lifecycle] Closed bug '{bug_id}' → work/5_archived/"
    );
    Ok(())
}

// ── Acceptance-gate helpers ───────────────────────────────────────────────────

/// Check whether the given directory has any uncommitted git changes.
///
/// Returns `Err` with a descriptive message if there are any uncommitted
/// changes — or if `git status` itself fails (e.g. `path` is not inside a
/// git repository).
fn check_uncommitted_changes(path: &Path) -> Result<(), String> {
    let output = Command::new("git")
        .args(["status", "--porcelain"])
        .current_dir(path)
        .output()
        .map_err(|e| format!("Failed to run git status: {e}"))?;
    // BUG FIX: the exit status was previously ignored, so a failing
    // `git status` (not a repo, broken git setup) produced empty stdout and
    // was silently treated as a clean worktree.
    if !output.status.success() {
        let stderr = String::from_utf8_lossy(&output.stderr);
        return Err(format!(
            "git status failed in {}: {stderr}",
            path.display()
        ));
    }
    let stdout = String::from_utf8_lossy(&output.stdout);
    if !stdout.trim().is_empty() {
        return Err(format!(
            "Worktree has uncommitted changes. Please commit all work before \
             the agent exits:\n{stdout}"
        ));
    }
    Ok(())
}

/// Run the project's test suite.
///
/// Uses `script/test` if present, treating it as the canonical single test entry point.
/// Falls back to `cargo nextest run` / `cargo test` when `script/test` is absent.
/// Returns `(tests_passed, output)`.
fn run_project_tests(path: &Path) -> Result<(bool, String), String> {
    let script_test = path.join("script").join("test");
    if script_test.exists() {
        let mut output = String::from("=== script/test ===\n");
        let result = Command::new(&script_test)
            .current_dir(path)
            .output()
            .map_err(|e| format!("Failed to run script/test: {e}"))?;
        output.push_str(&String::from_utf8_lossy(&result.stdout));
        output.push_str(&String::from_utf8_lossy(&result.stderr));
        output.push('\n');
        return Ok((result.status.success(), output));
    }

    // Fallback: cargo nextest run / cargo test
    let mut output = String::from("=== tests ===\n");
    let (success, test_out) = match Command::new("cargo")
        .args(["nextest", "run"])
        .current_dir(path)
        .output()
    {
        Ok(o) => {
            let combined = format!(
                "{}{}",
                String::from_utf8_lossy(&o.stdout),
                String::from_utf8_lossy(&o.stderr)
            );
            // BUG FIX: previously we only fell back to `cargo test` when the
            // `cargo` binary itself failed to spawn. When cargo exists but the
            // nextest subcommand is NOT installed, `cargo nextest run` exits
            // non-zero with a "no such command" error — which was wrongly
            // reported as a test failure. Detect that case and fall back.
            if !o.status.success()
                && (combined.contains("no such command")
                    || combined.contains("no such subcommand"))
            {
                run_cargo_test(path)?
            } else {
                (o.status.success(), combined)
            }
        }
        // cargo binary not available at all — try plain `cargo test` (which
        // will produce a descriptive spawn error of its own).
        Err(_) => run_cargo_test(path)?,
    };
    output.push_str(&test_out);
    output.push('\n');
    Ok((success, output))
}

/// Run `cargo test` in `path`, returning `(passed, combined stdout+stderr)`.
fn run_cargo_test(path: &Path) -> Result<(bool, String), String> {
    let o = Command::new("cargo")
        .args(["test"])
        .current_dir(path)
        .output()
        .map_err(|e| format!("Failed to run cargo test: {e}"))?;
    let combined = format!(
        "{}{}",
        String::from_utf8_lossy(&o.stdout),
        String::from_utf8_lossy(&o.stderr)
    );
    Ok((o.status.success(), combined))
}

/// Run `cargo clippy` and the project test suite (via `script/test` if present,
/// otherwise `cargo nextest run` / `cargo test`) in the given directory.
/// Returns `(gates_passed, combined_output)`.
fn run_acceptance_gates(path: &Path) -> Result<(bool, String), String> { let mut all_output = String::new(); let mut all_passed = true; // ── cargo clippy ────────────────────────────────────────────── let clippy = Command::new("cargo") .args(["clippy", "--all-targets", "--all-features"]) .current_dir(path) .output() .map_err(|e| format!("Failed to run cargo clippy: {e}"))?; all_output.push_str("=== cargo clippy ===\n"); let clippy_stdout = String::from_utf8_lossy(&clippy.stdout); let clippy_stderr = String::from_utf8_lossy(&clippy.stderr); if !clippy_stdout.is_empty() { all_output.push_str(&clippy_stdout); } if !clippy_stderr.is_empty() { all_output.push_str(&clippy_stderr); } all_output.push('\n'); if !clippy.status.success() { all_passed = false; } // ── tests (script/test if available, else cargo nextest/test) ─ let (test_success, test_out) = run_project_tests(path)?; all_output.push_str(&test_out); if !test_success { all_passed = false; } Ok((all_passed, all_output)) } /// Run `script/test_coverage` in the given directory if the script exists. /// /// Used as a QA gate before advancing a story from `3_qa/` to `4_merge/`. /// Returns `(passed, output)`. If the script does not exist, returns `(true, …)`. 
fn run_coverage_gate(path: &Path) -> Result<(bool, String), String> { let script = path.join("script").join("test_coverage"); if !script.exists() { return Ok(( true, "script/test_coverage not found; coverage gate skipped.\n".to_string(), )); } let mut output = String::from("=== script/test_coverage ===\n"); let result = Command::new(&script) .current_dir(path) .output() .map_err(|e| format!("Failed to run script/test_coverage: {e}"))?; let combined = format!( "{}{}", String::from_utf8_lossy(&result.stdout), String::from_utf8_lossy(&result.stderr) ); output.push_str(&combined); output.push('\n'); Ok((result.status.success(), output)) } // ── Mergemaster helpers ─────────────────────────────────────────────────────── /// Squash-merge a feature branch into the current branch in the project root. /// /// Returns `(success, had_conflicts, conflict_details, output)`. fn run_squash_merge( project_root: &Path, branch: &str, story_id: &str, ) -> Result<(bool, bool, Option, String), String> { let mut all_output = String::new(); // ── git merge --squash ──────────────────────────────────────── all_output.push_str(&format!("=== git merge --squash {branch} ===\n")); let merge = Command::new("git") .args(["merge", "--squash", branch]) .current_dir(project_root) .output() .map_err(|e| format!("Failed to run git merge: {e}"))?; let merge_stdout = String::from_utf8_lossy(&merge.stdout).to_string(); let merge_stderr = String::from_utf8_lossy(&merge.stderr).to_string(); all_output.push_str(&merge_stdout); all_output.push_str(&merge_stderr); all_output.push('\n'); if !merge.status.success() { // Conflicts detected — abort the merge and report. let conflict_details = format!( "Merge conflicts in branch '{branch}':\n{merge_stdout}{merge_stderr}" ); // Abort the merge to restore clean state. 
let _ = Command::new("git") .args(["merge", "--abort"]) .current_dir(project_root) .output(); all_output.push_str("=== Merge aborted due to conflicts ===\n"); return Ok((false, true, Some(conflict_details), all_output)); } // ── git commit ───────────────────────────────────────────── all_output.push_str("=== git commit ===\n"); let commit_msg = format!("story-kit: merge {story_id}"); let commit = Command::new("git") .args(["commit", "-m", &commit_msg]) .current_dir(project_root) .output() .map_err(|e| format!("Failed to run git commit: {e}"))?; let commit_stdout = String::from_utf8_lossy(&commit.stdout).to_string(); let commit_stderr = String::from_utf8_lossy(&commit.stderr).to_string(); all_output.push_str(&commit_stdout); all_output.push_str(&commit_stderr); all_output.push('\n'); if !commit.status.success() { // Nothing to commit (e.g. empty diff) — treat as success. if commit_stderr.contains("nothing to commit") || commit_stdout.contains("nothing to commit") { return Ok((true, false, None, all_output)); } return Ok((false, false, None, all_output)); } Ok((true, false, None, all_output)) } /// Run quality gates in the project root after a successful merge. /// /// Runs: cargo clippy, cargo nextest run / cargo test, and pnpm gates if frontend/ exists. /// Returns `(gates_passed, combined_output)`. 
fn run_merge_quality_gates(project_root: &Path) -> Result<(bool, String), String> { let mut all_output = String::new(); let mut all_passed = true; // ── cargo clippy ────────────────────────────────────────────── let clippy = Command::new("cargo") .args(["clippy", "--all-targets", "--all-features"]) .current_dir(project_root) .output() .map_err(|e| format!("Failed to run cargo clippy: {e}"))?; all_output.push_str("=== cargo clippy ===\n"); let clippy_out = format!( "{}{}", String::from_utf8_lossy(&clippy.stdout), String::from_utf8_lossy(&clippy.stderr) ); all_output.push_str(&clippy_out); all_output.push('\n'); if !clippy.status.success() { all_passed = false; } // ── tests (script/test if available, else cargo nextest/test) ─ let (test_success, test_out) = run_project_tests(project_root)?; all_output.push_str(&test_out); if !test_success { all_passed = false; } // ── pnpm build (if frontend/ directory exists) ──────────────── // pnpm test is handled by script/test when present; only run it here as // a standalone fallback when there is no script/test. let frontend_dir = project_root.join("frontend"); if frontend_dir.exists() { all_output.push_str("=== pnpm build ===\n"); let pnpm_build = Command::new("pnpm") .args(["run", "build"]) .current_dir(&frontend_dir) .output() .map_err(|e| format!("Failed to run pnpm build: {e}"))?; let build_out = format!( "{}{}", String::from_utf8_lossy(&pnpm_build.stdout), String::from_utf8_lossy(&pnpm_build.stderr) ); all_output.push_str(&build_out); all_output.push('\n'); if !pnpm_build.status.success() { all_passed = false; } // Only run pnpm test separately when script/test is absent (it would // already cover frontend tests in that case). 
let script_test = project_root.join("script").join("test"); if !script_test.exists() { all_output.push_str("=== pnpm test ===\n"); let pnpm_test = Command::new("pnpm") .args(["test", "--run"]) .current_dir(&frontend_dir) .output() .map_err(|e| format!("Failed to run pnpm test: {e}"))?; let pnpm_test_out = format!( "{}{}", String::from_utf8_lossy(&pnpm_test.stdout), String::from_utf8_lossy(&pnpm_test.stderr) ); all_output.push_str(&pnpm_test_out); all_output.push('\n'); if !pnpm_test.status.success() { all_passed = false; } } } Ok((all_passed, all_output)) } /// Spawn claude agent in a PTY and stream events through the broadcast channel. #[allow(clippy::too_many_arguments)] async fn run_agent_pty_streaming( story_id: &str, agent_name: &str, command: &str, args: &[String], prompt: &str, cwd: &str, tx: &broadcast::Sender, event_log: &Arc>>, ) -> Result, String> { let sid = story_id.to_string(); let aname = agent_name.to_string(); let cmd = command.to_string(); let args = args.to_vec(); let prompt = prompt.to_string(); let cwd = cwd.to_string(); let tx = tx.clone(); let event_log = event_log.clone(); tokio::task::spawn_blocking(move || { run_agent_pty_blocking(&sid, &aname, &cmd, &args, &prompt, &cwd, &tx, &event_log) }) .await .map_err(|e| format!("Agent task panicked: {e}"))? } /// Helper to send an event to both broadcast and event log. 
fn emit_event(
    event: AgentEvent,
    tx: &broadcast::Sender<AgentEvent>,
    event_log: &Mutex<Vec<AgentEvent>>,
) {
    // Append to the polling log first; a poisoned mutex silently skips
    // logging but the broadcast below is still attempted.
    if let Ok(mut log) = event_log.lock() {
        log.push(event.clone());
    }
    // Send errors (no live subscribers) are intentionally ignored.
    let _ = tx.send(event);
}

/// Blocking PTY driver: spawns `command` in a pseudo-terminal, feeds it the
/// prompt, and parses its `--output-format stream-json` lines into
/// `AgentEvent`s. Returns the claude session id, when one was reported.
#[allow(clippy::too_many_arguments)]
fn run_agent_pty_blocking(
    story_id: &str,
    agent_name: &str,
    command: &str,
    args: &[String],
    prompt: &str,
    cwd: &str,
    tx: &broadcast::Sender<AgentEvent>,
    event_log: &Mutex<Vec<AgentEvent>>,
) -> Result<Option<String>, String> {
    let pty_system = native_pty_system();
    let pair = pty_system
        .openpty(PtySize {
            rows: 50,
            cols: 200,
            pixel_width: 0,
            pixel_height: 0,
        })
        .map_err(|e| format!("Failed to open PTY: {e}"))?;

    let mut cmd = CommandBuilder::new(command);
    // -p must come first
    cmd.arg("-p");
    cmd.arg(prompt);
    // Add configured args (e.g., --directory /path/to/worktree, --model, etc.)
    for arg in args {
        cmd.arg(arg);
    }
    cmd.arg("--output-format");
    cmd.arg("stream-json");
    cmd.arg("--verbose");
    // Supervised agents don't need interactive permission prompts
    cmd.arg("--permission-mode");
    cmd.arg("bypassPermissions");
    cmd.cwd(cwd);
    cmd.env("NO_COLOR", "1");
    // Allow spawning Claude Code from within a Claude Code session
    cmd.env_remove("CLAUDECODE");
    cmd.env_remove("CLAUDE_CODE_ENTRYPOINT");

    eprintln!("[agent:{story_id}:{agent_name}] Spawning {command} in {cwd} with args: {args:?}");

    let mut child = pair
        .slave
        .spawn_command(cmd)
        .map_err(|e| format!("Failed to spawn agent for {story_id}:{agent_name}: {e}"))?;
    // Drop our copies of the PTY ends so the reader sees EOF when the child
    // closes its side.
    drop(pair.slave);
    let reader = pair
        .master
        .try_clone_reader()
        .map_err(|e| format!("Failed to clone PTY reader: {e}"))?;
    drop(pair.master);

    let buf_reader = BufReader::new(reader);
    let mut session_id: Option<String> = None;

    // Read the child's output line by line until EOF / read error.
    for line in buf_reader.lines() {
        let line = match line {
            Ok(l) => l,
            Err(_) => break,
        };
        let trimmed = line.trim();
        if trimmed.is_empty() {
            continue;
        }

        // Try to parse as JSON
        let json: serde_json::Value = match serde_json::from_str(trimmed) {
            Ok(j) => j,
            Err(_) => {
                // Non-JSON output (terminal escapes etc.) — send as raw output
                emit_event(
                    AgentEvent::Output {
                        story_id: story_id.to_string(),
                        agent_name: agent_name.to_string(),
                        text: trimmed.to_string(),
                    },
                    tx,
                    event_log,
                );
                continue;
            }
        };

        let event_type = json.get("type").and_then(|t| t.as_str()).unwrap_or("");
        match event_type {
            // The "system" event carries the claude session id.
            "system" => {
                session_id = json
                    .get("session_id")
                    .and_then(|s| s.as_str())
                    .map(|s| s.to_string());
            }
            // Assistant messages: forward each text block as raw output too.
            "assistant" => {
                if let Some(message) = json.get("message")
                    && let Some(content) = message.get("content").and_then(|c| c.as_array())
                {
                    for block in content {
                        if let Some(text) = block.get("text").and_then(|t| t.as_str()) {
                            emit_event(
                                AgentEvent::Output {
                                    story_id: story_id.to_string(),
                                    agent_name: agent_name.to_string(),
                                    text: text.to_string(),
                                },
                                tx,
                                event_log,
                            );
                        }
                    }
                }
            }
            _ => {}
        }

        // Forward all JSON events
        emit_event(
            AgentEvent::AgentJson {
                story_id: story_id.to_string(),
                agent_name: agent_name.to_string(),
                data: json,
            },
            tx,
            event_log,
        );
    }

    // Reader EOF usually means the child already exited; kill/wait here are
    // best-effort cleanup, so both results are ignored.
    let _ = child.kill();
    let _ = child.wait();

    eprintln!(
        "[agent:{story_id}:{agent_name}] Done. Session: {:?}",
        session_id
    );
    Ok(session_id)
}

#[cfg(test)]
mod tests {
    use super::*;

    // ── wait_for_agent tests ──────────────────────────────────────

    #[tokio::test]
    async fn wait_for_agent_returns_immediately_if_completed() {
        let pool = AgentPool::new(3001);
        pool.inject_test_agent("s1", "bot", AgentStatus::Completed);
        let info = pool.wait_for_agent("s1", "bot", 1000).await.unwrap();
        assert_eq!(info.status, AgentStatus::Completed);
        assert_eq!(info.story_id, "s1");
        assert_eq!(info.agent_name, "bot");
    }

    #[tokio::test]
    async fn wait_for_agent_returns_immediately_if_failed() {
        let pool = AgentPool::new(3001);
        pool.inject_test_agent("s2", "bot", AgentStatus::Failed);
        let info = pool.wait_for_agent("s2", "bot", 1000).await.unwrap();
        assert_eq!(info.status, AgentStatus::Failed);
    }

    #[tokio::test]
    async fn wait_for_agent_completes_on_done_event() {
        let pool = AgentPool::new(3001);
        let tx = pool.inject_test_agent("s3", "bot", AgentStatus::Running);

        // Send Done event after a short delay
        let tx_clone = tx.clone();
        tokio::spawn(async move {
            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
            // Mark status via event; real code also updates the map, but for
            // this unit test the map entry stays Running — we verify the
            // wait loop reacts to the event.
            let _ = tx_clone.send(AgentEvent::Done {
                story_id: "s3".to_string(),
                agent_name: "bot".to_string(),
                session_id: Some("sess-abc".to_string()),
            });
        });

        let info = pool.wait_for_agent("s3", "bot", 2000).await.unwrap();
        // Status comes from the map entry (Running in this unit test)
        // — the important thing is that wait_for_agent returned without timing out.
        assert_eq!(info.story_id, "s3");
    }

    #[tokio::test]
    async fn wait_for_agent_times_out() {
        let pool = AgentPool::new(3001);
        pool.inject_test_agent("s4", "bot", AgentStatus::Running);
        let result = pool.wait_for_agent("s4", "bot", 50).await;
        assert!(result.is_err());
        let msg = result.unwrap_err();
        assert!(msg.contains("Timed out"), "unexpected message: {msg}");
    }

    #[tokio::test]
    async fn wait_for_agent_errors_for_nonexistent() {
        let pool = AgentPool::new(3001);
        let result = pool.wait_for_agent("no_story", "no_bot", 100).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn wait_for_agent_completes_on_stopped_status_event() {
        let pool = AgentPool::new(3001);
        let tx = pool.inject_test_agent("s5", "bot", AgentStatus::Running);

        // A "stopped" Status event must also unblock the waiter.
        let tx_clone = tx.clone();
        tokio::spawn(async move {
            tokio::time::sleep(std::time::Duration::from_millis(30)).await;
            let _ = tx_clone.send(AgentEvent::Status {
                story_id: "s5".to_string(),
                agent_name: "bot".to_string(),
                status: "stopped".to_string(),
            });
        });

        let info = pool.wait_for_agent("s5", "bot", 2000).await.unwrap();
        assert_eq!(info.story_id, "s5");
    }

    // ── report_completion tests ────────────────────────────────────

    #[tokio::test]
    async fn report_completion_rejects_nonexistent_agent() {
        let pool = AgentPool::new(3001);
        let result = pool
            .report_completion("no_story", "no_bot", "done")
            .await;
        assert!(result.is_err());
        let msg = result.unwrap_err();
        assert!(msg.contains("No agent"), "unexpected: {msg}");
    }

    #[tokio::test]
    async fn report_completion_rejects_non_running_agent() {
        let pool = AgentPool::new(3001);
        pool.inject_test_agent("s6", "bot", AgentStatus::Completed);
        let result = pool.report_completion("s6", "bot", "done").await;
        assert!(result.is_err());
        let msg = result.unwrap_err();
        assert!(
            msg.contains("not running"),
            "expected 'not running' in: {msg}"
        );
    }

    #[tokio::test]
    async fn report_completion_rejects_dirty_worktree() {
        use std::fs;
        use tempfile::tempdir;

        let tmp = tempdir().unwrap();
        let repo = tmp.path();
        // Init a real git repo and make an initial commit
        // NOTE(review): unlike init_git_repo below, no user.email/user.name is
        // configured here, so the commit may fail on machines without global
        // git config — the test still passes because it only asserts the
        // dirty-file error. Confirm this is intentional.
        Command::new("git")
            .args(["init"])
            .current_dir(repo)
            .output()
            .unwrap();
        Command::new("git")
            .args(["commit", "--allow-empty", "-m", "init"])
            .current_dir(repo)
            .output()
            .unwrap();
        // Write an uncommitted file
        fs::write(repo.join("dirty.txt"), "not committed").unwrap();

        let pool = AgentPool::new(3001);
        pool.inject_test_agent_with_path("s7", "bot", AgentStatus::Running, repo.to_path_buf());
        let result = pool.report_completion("s7", "bot", "done").await;
        assert!(result.is_err());
        let msg = result.unwrap_err();
        assert!(
            msg.contains("uncommitted"),
            "expected 'uncommitted' in: {msg}"
        );
    }

    // ── server-owned completion tests ───────────────────────────────────────────

    #[tokio::test]
    async fn server_owned_completion_skips_when_already_completed() {
        let pool = AgentPool::new(3001);
        let report = CompletionReport {
            summary: "Already done".to_string(),
            gates_passed: true,
            gate_output: String::new(),
        };
        pool.inject_test_agent_with_completion(
            "s10",
            "coder-1",
            AgentStatus::Completed,
            PathBuf::from("/tmp/nonexistent"),
            report,
        );

        // Subscribe before calling so we can check if Done event was emitted.
        let mut rx = pool.subscribe("s10", "coder-1").unwrap();

        run_server_owned_completion(&pool.agents, pool.port, "s10", "coder-1", Some("sess-1".to_string()))
            .await;

        // Status should remain Completed (unchanged) — no gate re-run.
        let agents = pool.agents.lock().unwrap();
        let key = composite_key("s10", "coder-1");
        let agent = agents.get(&key).unwrap();
        assert_eq!(agent.status, AgentStatus::Completed);
        // Summary should still be the original, not overwritten.
        assert_eq!(
            agent.completion.as_ref().unwrap().summary,
            "Already done"
        );
        drop(agents);

        // No Done event should have been emitted.
        assert!(
            rx.try_recv().is_err(),
            "should not emit Done when completion already exists"
        );
    }

    #[tokio::test]
    async fn server_owned_completion_runs_gates_on_clean_worktree() {
        use tempfile::tempdir;

        let tmp = tempdir().unwrap();
        let repo = tmp.path();
        init_git_repo(repo);

        let pool = AgentPool::new(3001);
        pool.inject_test_agent_with_path(
            "s11",
            "coder-1",
            AgentStatus::Running,
            repo.to_path_buf(),
        );
        let mut rx = pool.subscribe("s11", "coder-1").unwrap();

        run_server_owned_completion(&pool.agents, pool.port, "s11", "coder-1", Some("sess-2".to_string()))
            .await;

        // Completion report should exist (gates were run, though they may fail
        // because this is not a real Cargo project).
        let agents = pool.agents.lock().unwrap();
        let key = composite_key("s11", "coder-1");
        let agent = agents.get(&key).unwrap();
        assert!(
            agent.completion.is_some(),
            "completion report should be created"
        );
        assert_eq!(
            agent.completion.as_ref().unwrap().summary,
            "Agent process exited normally"
        );
        // Session ID should be stored.
        assert_eq!(agent.session_id, Some("sess-2".to_string()));
        // Status should be terminal (Completed or Failed depending on gate results).
        assert!(
            agent.status == AgentStatus::Completed || agent.status == AgentStatus::Failed,
            "status should be terminal, got: {:?}",
            agent.status
        );
        drop(agents);

        // A Done event should have been emitted.
        let event = rx.try_recv().expect("should emit Done event");
        assert!(
            matches!(event, AgentEvent::Done { .. }),
            "expected Done event, got: {event:?}"
        );
    }

    #[tokio::test]
    async fn server_owned_completion_fails_on_dirty_worktree() {
        use std::fs;
        use tempfile::tempdir;

        let tmp = tempdir().unwrap();
        let repo = tmp.path();
        init_git_repo(repo);
        // Create an uncommitted file.
        fs::write(repo.join("dirty.txt"), "not committed").unwrap();

        let pool = AgentPool::new(3001);
        pool.inject_test_agent_with_path(
            "s12",
            "coder-1",
            AgentStatus::Running,
            repo.to_path_buf(),
        );

        run_server_owned_completion(&pool.agents, pool.port, "s12", "coder-1", None)
            .await;

        let agents = pool.agents.lock().unwrap();
        let key = composite_key("s12", "coder-1");
        let agent = agents.get(&key).unwrap();
        assert!(agent.completion.is_some());
        assert!(!agent.completion.as_ref().unwrap().gates_passed);
        assert_eq!(agent.status, AgentStatus::Failed);
        assert!(
            agent
                .completion
                .as_ref()
                .unwrap()
                .gate_output
                .contains("uncommitted"),
            "gate_output should mention uncommitted changes"
        );
    }

    #[tokio::test]
    async fn server_owned_completion_nonexistent_agent_is_noop() {
        let pool = AgentPool::new(3001);
        // Should not panic or error — just silently return.
        run_server_owned_completion(&pool.agents, pool.port, "nonexistent", "bot", None)
            .await;
    }

    // ── move_story_to_current tests ────────────────────────────────────────────
    // No git repo needed: the watcher handles commits asynchronously.

    // Shared fixture: init a git repo with user config + one empty commit.
    fn init_git_repo(repo: &std::path::Path) {
        Command::new("git")
            .args(["init"])
            .current_dir(repo)
            .output()
            .unwrap();
        Command::new("git")
            .args(["config", "user.email", "test@test.com"])
            .current_dir(repo)
            .output()
            .unwrap();
        Command::new("git")
            .args(["config", "user.name", "Test"])
            .current_dir(repo)
            .output()
            .unwrap();
        Command::new("git")
            .args(["commit", "--allow-empty", "-m", "init"])
            .current_dir(repo)
            .output()
            .unwrap();
    }

    #[test]
    fn move_story_to_current_moves_file() {
        use std::fs;
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        let upcoming = root.join(".story_kit/work/1_upcoming");
        let current = root.join(".story_kit/work/2_current");
        fs::create_dir_all(&upcoming).unwrap();
        fs::create_dir_all(&current).unwrap();
        fs::write(upcoming.join("10_story_foo.md"), "test").unwrap();

        move_story_to_current(root, "10_story_foo").unwrap();

        assert!(!upcoming.join("10_story_foo.md").exists());
        assert!(current.join("10_story_foo.md").exists());
    }

    #[test]
    fn move_story_to_current_is_idempotent_when_already_current() {
        use std::fs;
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        let current = root.join(".story_kit/work/2_current");
        fs::create_dir_all(&current).unwrap();
        fs::write(current.join("11_story_foo.md"), "test").unwrap();

        move_story_to_current(root, "11_story_foo").unwrap();

        assert!(current.join("11_story_foo.md").exists());
    }

    #[test]
    fn move_story_to_current_noop_when_not_in_upcoming() {
        let tmp = tempfile::tempdir().unwrap();
        assert!(move_story_to_current(tmp.path(), "99_missing").is_ok());
    }

    #[test]
    fn move_bug_to_current_moves_from_upcoming() {
        use std::fs;
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        let upcoming = root.join(".story_kit/work/1_upcoming");
        let current = root.join(".story_kit/work/2_current");
        fs::create_dir_all(&upcoming).unwrap();
        fs::create_dir_all(&current).unwrap();
        fs::write(upcoming.join("1_bug_test.md"), "# Bug 1\n").unwrap();

        move_story_to_current(root, "1_bug_test").unwrap();
assert!(!upcoming.join("1_bug_test.md").exists()); assert!(current.join("1_bug_test.md").exists()); } #[test] fn close_bug_moves_from_current_to_archive() { use std::fs; let tmp = tempfile::tempdir().unwrap(); let root = tmp.path(); let current = root.join(".story_kit/work/2_current"); fs::create_dir_all(¤t).unwrap(); fs::write(current.join("2_bug_test.md"), "# Bug 2\n").unwrap(); close_bug_to_archive(root, "2_bug_test").unwrap(); assert!(!current.join("2_bug_test.md").exists()); assert!(root.join(".story_kit/work/5_archived/2_bug_test.md").exists()); } #[test] fn close_bug_moves_from_upcoming_when_not_started() { use std::fs; let tmp = tempfile::tempdir().unwrap(); let root = tmp.path(); let upcoming = root.join(".story_kit/work/1_upcoming"); fs::create_dir_all(&upcoming).unwrap(); fs::write(upcoming.join("3_bug_test.md"), "# Bug 3\n").unwrap(); close_bug_to_archive(root, "3_bug_test").unwrap(); assert!(!upcoming.join("3_bug_test.md").exists()); assert!(root.join(".story_kit/work/5_archived/3_bug_test.md").exists()); } #[test] fn item_type_from_id_detects_types() { assert_eq!(item_type_from_id("1_bug_test"), "bug"); assert_eq!(item_type_from_id("1_spike_research"), "spike"); assert_eq!(item_type_from_id("50_story_my_story"), "story"); assert_eq!(item_type_from_id("1_story_simple"), "story"); } // ── pipeline_stage tests ────────────────────────────────────────────────── #[test] fn pipeline_stage_detects_coders() { assert_eq!(pipeline_stage("coder-1"), PipelineStage::Coder); assert_eq!(pipeline_stage("coder-2"), PipelineStage::Coder); assert_eq!(pipeline_stage("coder-3"), PipelineStage::Coder); } #[test] fn pipeline_stage_detects_qa() { assert_eq!(pipeline_stage("qa"), PipelineStage::Qa); } #[test] fn pipeline_stage_detects_mergemaster() { assert_eq!(pipeline_stage("mergemaster"), PipelineStage::Mergemaster); } #[test] fn pipeline_stage_supervisor_is_other() { assert_eq!(pipeline_stage("supervisor"), PipelineStage::Other); assert_eq!(pipeline_stage("default"), 
PipelineStage::Other); assert_eq!(pipeline_stage("unknown"), PipelineStage::Other); } // ── pipeline advance tests ──────────────────────────────────────────────── #[tokio::test] async fn pipeline_advance_coder_gates_pass_moves_story_to_qa() { use std::fs; let tmp = tempfile::tempdir().unwrap(); let root = tmp.path(); // Set up story in 2_current/ let current = root.join(".story_kit/work/2_current"); fs::create_dir_all(¤t).unwrap(); fs::write(current.join("50_story_test.md"), "test").unwrap(); let pool = AgentPool::new(3001); pool.inject_test_agent_with_completion( "50_story_test", "coder-1", AgentStatus::Completed, root.to_path_buf(), CompletionReport { summary: "done".to_string(), gates_passed: true, gate_output: String::new(), }, ); // Call pipeline advance directly (bypasses background spawn for testing). pool.run_pipeline_advance_for_completed_agent("50_story_test", "coder-1") .await; // Story should have moved to 3_qa/ (start_agent for qa will fail in tests but // the file move happens before that). 
assert!( root.join(".story_kit/work/3_qa/50_story_test.md").exists(), "story should be in 3_qa/" ); assert!( !current.join("50_story_test.md").exists(), "story should not still be in 2_current/" ); } #[tokio::test] async fn pipeline_advance_qa_gates_pass_moves_story_to_merge() { use std::fs; let tmp = tempfile::tempdir().unwrap(); let root = tmp.path(); // Set up story in 3_qa/ let qa_dir = root.join(".story_kit/work/3_qa"); fs::create_dir_all(&qa_dir).unwrap(); fs::write(qa_dir.join("51_story_test.md"), "test").unwrap(); let pool = AgentPool::new(3001); pool.inject_test_agent_with_completion( "51_story_test", "qa", AgentStatus::Completed, root.to_path_buf(), CompletionReport { summary: "QA done".to_string(), gates_passed: true, gate_output: String::new(), }, ); pool.run_pipeline_advance_for_completed_agent("51_story_test", "qa") .await; // Story should have moved to 4_merge/ assert!( root.join(".story_kit/work/4_merge/51_story_test.md").exists(), "story should be in 4_merge/" ); assert!( !qa_dir.join("51_story_test.md").exists(), "story should not still be in 3_qa/" ); } #[tokio::test] async fn pipeline_advance_supervisor_does_not_advance() { use std::fs; let tmp = tempfile::tempdir().unwrap(); let root = tmp.path(); let current = root.join(".story_kit/work/2_current"); fs::create_dir_all(¤t).unwrap(); fs::write(current.join("52_story_test.md"), "test").unwrap(); let pool = AgentPool::new(3001); pool.inject_test_agent_with_completion( "52_story_test", "supervisor", AgentStatus::Completed, root.to_path_buf(), CompletionReport { summary: "supervised".to_string(), gates_passed: true, gate_output: String::new(), }, ); pool.run_pipeline_advance_for_completed_agent("52_story_test", "supervisor") .await; // Story should NOT have moved (supervisors don't advance pipeline) assert!( current.join("52_story_test.md").exists(), "story should still be in 2_current/ for supervisor" ); } // ── move_story_to_merge tests ────────────────────────────────────────────── #[test] fn 
move_story_to_merge_moves_file() { use std::fs; let tmp = tempfile::tempdir().unwrap(); let root = tmp.path(); let current = root.join(".story_kit/work/2_current"); fs::create_dir_all(¤t).unwrap(); fs::write(current.join("20_story_foo.md"), "test").unwrap(); move_story_to_merge(root, "20_story_foo").unwrap(); assert!(!current.join("20_story_foo.md").exists()); assert!(root.join(".story_kit/work/4_merge/20_story_foo.md").exists()); } #[test] fn move_story_to_merge_from_qa_dir() { use std::fs; let tmp = tempfile::tempdir().unwrap(); let root = tmp.path(); let qa_dir = root.join(".story_kit/work/3_qa"); fs::create_dir_all(&qa_dir).unwrap(); fs::write(qa_dir.join("40_story_test.md"), "test").unwrap(); move_story_to_merge(root, "40_story_test").unwrap(); assert!(!qa_dir.join("40_story_test.md").exists()); assert!(root.join(".story_kit/work/4_merge/40_story_test.md").exists()); } #[test] fn move_story_to_merge_idempotent_when_already_in_merge() { use std::fs; let tmp = tempfile::tempdir().unwrap(); let root = tmp.path(); let merge_dir = root.join(".story_kit/work/4_merge"); fs::create_dir_all(&merge_dir).unwrap(); fs::write(merge_dir.join("21_story_test.md"), "test").unwrap(); move_story_to_merge(root, "21_story_test").unwrap(); assert!(merge_dir.join("21_story_test.md").exists()); } #[test] fn move_story_to_merge_errors_when_not_in_current_or_qa() { let tmp = tempfile::tempdir().unwrap(); let result = move_story_to_merge(tmp.path(), "99_nonexistent"); assert!(result.unwrap_err().contains("not found in work/2_current/ or work/3_qa/")); } // ── move_story_to_qa tests ──────────────────────────────────────────────── #[test] fn move_story_to_qa_moves_file() { use std::fs; let tmp = tempfile::tempdir().unwrap(); let root = tmp.path(); let current = root.join(".story_kit/work/2_current"); fs::create_dir_all(¤t).unwrap(); fs::write(current.join("30_story_qa.md"), "test").unwrap(); move_story_to_qa(root, "30_story_qa").unwrap(); 
assert!(!current.join("30_story_qa.md").exists()); assert!(root.join(".story_kit/work/3_qa/30_story_qa.md").exists()); } #[test] fn move_story_to_qa_idempotent_when_already_in_qa() { use std::fs; let tmp = tempfile::tempdir().unwrap(); let root = tmp.path(); let qa_dir = root.join(".story_kit/work/3_qa"); fs::create_dir_all(&qa_dir).unwrap(); fs::write(qa_dir.join("31_story_test.md"), "test").unwrap(); move_story_to_qa(root, "31_story_test").unwrap(); assert!(qa_dir.join("31_story_test.md").exists()); } #[test] fn move_story_to_qa_errors_when_not_in_current() { let tmp = tempfile::tempdir().unwrap(); let result = move_story_to_qa(tmp.path(), "99_nonexistent"); assert!(result.unwrap_err().contains("not found in work/2_current/")); } // ── move_story_to_archived tests ────────────────────────────────────────── #[test] fn move_story_to_archived_finds_in_merge_dir() { use std::fs; let tmp = tempfile::tempdir().unwrap(); let root = tmp.path(); let merge_dir = root.join(".story_kit/work/4_merge"); fs::create_dir_all(&merge_dir).unwrap(); fs::write(merge_dir.join("22_story_test.md"), "test").unwrap(); move_story_to_archived(root, "22_story_test").unwrap(); assert!(!merge_dir.join("22_story_test.md").exists()); assert!(root.join(".story_kit/work/5_archived/22_story_test.md").exists()); } #[test] fn move_story_to_archived_error_when_not_in_current_or_merge() { let tmp = tempfile::tempdir().unwrap(); let result = move_story_to_archived(tmp.path(), "99_nonexistent"); assert!(result.unwrap_err().contains("4_merge")); } // ── merge_agent_work tests ──────────────────────────────────────────────── #[tokio::test] async fn merge_agent_work_returns_error_when_branch_not_found() { use tempfile::tempdir; let tmp = tempdir().unwrap(); let repo = tmp.path(); init_git_repo(repo); let pool = AgentPool::new(3001); // branch feature/story-99_nonexistent does not exist let result = pool .merge_agent_work(repo, "99_nonexistent") .await .unwrap(); // Should fail (no branch) — not panic 
assert!(!result.success, "should fail when branch missing"); } #[tokio::test] async fn merge_agent_work_succeeds_on_clean_branch() { use std::fs; use tempfile::tempdir; let tmp = tempdir().unwrap(); let repo = tmp.path(); init_git_repo(repo); // Create a feature branch with a commit Command::new("git") .args(["checkout", "-b", "feature/story-23_test"]) .current_dir(repo) .output() .unwrap(); fs::write(repo.join("feature.txt"), "feature content").unwrap(); Command::new("git") .args(["add", "."]) .current_dir(repo) .output() .unwrap(); Command::new("git") .args(["commit", "-m", "add feature"]) .current_dir(repo) .output() .unwrap(); // Switch back to master (initial branch) Command::new("git") .args(["checkout", "master"]) .current_dir(repo) .output() .unwrap(); // Create the story file in 4_merge/ so we can test archival let merge_dir = repo.join(".story_kit/work/4_merge"); fs::create_dir_all(&merge_dir).unwrap(); let story_file = merge_dir.join("23_test.md"); fs::write(&story_file, "---\nname: Test\n---\n").unwrap(); Command::new("git") .args(["add", "."]) .current_dir(repo) .output() .unwrap(); Command::new("git") .args(["commit", "-m", "add story in merge"]) .current_dir(repo) .output() .unwrap(); let pool = AgentPool::new(3001); let report = pool.merge_agent_work(repo, "23_test").await.unwrap(); // Merge should succeed (gates will run but cargo/pnpm results will depend on env) // At minimum the merge itself should succeed assert!(!report.had_conflicts, "should have no conflicts"); // Note: gates_passed may be false in test env without Rust project, that's OK // The important thing is the merge itself ran assert!( report.success || report.gate_output.contains("Failed to run") || !report.gates_passed, "report should be coherent: {report:?}" ); // Story should be archived if gates passed if report.story_archived { let archived = repo.join(".story_kit/work/5_archived/23_test.md"); assert!(archived.exists(), "archived file should exist"); } } // ── run_project_tests 
tests ─────────────────────────────────── #[cfg(unix)] #[test] fn run_project_tests_uses_script_test_when_present_and_passes() { use std::fs; use std::os::unix::fs::PermissionsExt; use tempfile::tempdir; let tmp = tempdir().unwrap(); let path = tmp.path(); let script_dir = path.join("script"); fs::create_dir_all(&script_dir).unwrap(); let script_test = script_dir.join("test"); fs::write(&script_test, "#!/usr/bin/env bash\necho 'all tests passed'\nexit 0\n").unwrap(); let mut perms = fs::metadata(&script_test).unwrap().permissions(); perms.set_mode(0o755); fs::set_permissions(&script_test, perms).unwrap(); let (passed, output) = run_project_tests(path).unwrap(); assert!(passed, "script/test exiting 0 should pass"); assert!(output.contains("script/test"), "output should mention script/test"); } #[cfg(unix)] #[test] fn run_project_tests_reports_failure_when_script_test_exits_nonzero() { use std::fs; use std::os::unix::fs::PermissionsExt; use tempfile::tempdir; let tmp = tempdir().unwrap(); let path = tmp.path(); let script_dir = path.join("script"); fs::create_dir_all(&script_dir).unwrap(); let script_test = script_dir.join("test"); fs::write(&script_test, "#!/usr/bin/env bash\nexit 1\n").unwrap(); let mut perms = fs::metadata(&script_test).unwrap().permissions(); perms.set_mode(0o755); fs::set_permissions(&script_test, perms).unwrap(); let (passed, output) = run_project_tests(path).unwrap(); assert!(!passed, "script/test exiting 1 should fail"); assert!(output.contains("script/test"), "output should mention script/test"); } // ── run_coverage_gate tests ─────────────────────────────────────────────── #[cfg(unix)] #[test] fn coverage_gate_passes_when_script_absent() { use tempfile::tempdir; let tmp = tempdir().unwrap(); let (passed, output) = run_coverage_gate(tmp.path()).unwrap(); assert!(passed, "coverage gate should pass when script is absent"); assert!( output.contains("not found"), "output should mention script not found" ); } #[cfg(unix)] #[test] fn 
coverage_gate_passes_when_script_exits_zero() { use std::fs; use std::os::unix::fs::PermissionsExt; use tempfile::tempdir; let tmp = tempdir().unwrap(); let path = tmp.path(); let script_dir = path.join("script"); fs::create_dir_all(&script_dir).unwrap(); let script = script_dir.join("test_coverage"); fs::write( &script, "#!/usr/bin/env bash\necho 'Rust line coverage: 85%'\necho 'PASS: Coverage 85% meets threshold 0%'\nexit 0\n", ) .unwrap(); let mut perms = fs::metadata(&script).unwrap().permissions(); perms.set_mode(0o755); fs::set_permissions(&script, perms).unwrap(); let (passed, output) = run_coverage_gate(path).unwrap(); assert!(passed, "coverage gate should pass when script exits 0"); assert!( output.contains("script/test_coverage"), "output should mention script/test_coverage" ); } #[cfg(unix)] #[test] fn coverage_gate_fails_when_script_exits_nonzero() { use std::fs; use std::os::unix::fs::PermissionsExt; use tempfile::tempdir; let tmp = tempdir().unwrap(); let path = tmp.path(); let script_dir = path.join("script"); fs::create_dir_all(&script_dir).unwrap(); let script = script_dir.join("test_coverage"); fs::write( &script, "#!/usr/bin/env bash\necho 'FAIL: Coverage 40% is below threshold 80%'\nexit 1\n", ) .unwrap(); let mut perms = fs::metadata(&script).unwrap().permissions(); perms.set_mode(0o755); fs::set_permissions(&script, perms).unwrap(); let (passed, output) = run_coverage_gate(path).unwrap(); assert!(!passed, "coverage gate should fail when script exits 1"); assert!( output.contains("script/test_coverage"), "output should mention script/test_coverage" ); } // ── auto-assign helper tests ─────────────────────────────────── #[test] fn scan_stage_items_returns_empty_for_missing_dir() { let tmp = tempfile::tempdir().unwrap(); let items = scan_stage_items(tmp.path(), "2_current"); assert!(items.is_empty()); } #[test] fn scan_stage_items_returns_sorted_story_ids() { use std::fs; let tmp = tempfile::tempdir().unwrap(); let stage_dir = 
tmp.path().join(".story_kit").join("work").join("2_current"); fs::create_dir_all(&stage_dir).unwrap(); fs::write(stage_dir.join("42_story_foo.md"), "---\nname: foo\n---").unwrap(); fs::write(stage_dir.join("10_story_bar.md"), "---\nname: bar\n---").unwrap(); fs::write(stage_dir.join("5_story_baz.md"), "---\nname: baz\n---").unwrap(); // non-md file should be ignored fs::write(stage_dir.join("README.txt"), "ignore me").unwrap(); let items = scan_stage_items(tmp.path(), "2_current"); assert_eq!(items, vec!["10_story_bar", "42_story_foo", "5_story_baz"]); } #[test] fn is_story_assigned_returns_true_for_running_coder() { let pool = AgentPool::new(3001); pool.inject_test_agent("42_story_foo", "coder-1", AgentStatus::Running); let agents = pool.agents.lock().unwrap(); assert!(is_story_assigned_for_stage( &agents, "42_story_foo", &PipelineStage::Coder )); // Same story but wrong stage — should be false assert!(!is_story_assigned_for_stage( &agents, "42_story_foo", &PipelineStage::Qa )); // Different story — should be false assert!(!is_story_assigned_for_stage( &agents, "99_story_other", &PipelineStage::Coder )); } #[test] fn is_story_assigned_returns_false_for_completed_agent() { let pool = AgentPool::new(3001); pool.inject_test_agent("42_story_foo", "coder-1", AgentStatus::Completed); let agents = pool.agents.lock().unwrap(); // Completed agents don't count as assigned assert!(!is_story_assigned_for_stage( &agents, "42_story_foo", &PipelineStage::Coder )); } #[test] fn find_free_agent_returns_none_when_all_busy() { use crate::config::ProjectConfig; let config = ProjectConfig::parse( r#" [[agent]] name = "coder-1" [[agent]] name = "coder-2" "#, ) .unwrap(); let pool = AgentPool::new(3001); pool.inject_test_agent("s1", "coder-1", AgentStatus::Running); pool.inject_test_agent("s2", "coder-2", AgentStatus::Running); let agents = pool.agents.lock().unwrap(); let free = find_free_agent_for_stage(&config, &agents, &PipelineStage::Coder); assert!(free.is_none(), "no free coders 
should be available"); } #[test] fn find_free_agent_returns_first_free_coder() { use crate::config::ProjectConfig; let config = ProjectConfig::parse( r#" [[agent]] name = "coder-1" [[agent]] name = "coder-2" [[agent]] name = "coder-3" "#, ) .unwrap(); let pool = AgentPool::new(3001); // coder-1 is busy, coder-2 is free pool.inject_test_agent("s1", "coder-1", AgentStatus::Running); let agents = pool.agents.lock().unwrap(); let free = find_free_agent_for_stage(&config, &agents, &PipelineStage::Coder); assert_eq!(free, Some("coder-2"), "coder-2 should be the first free coder"); } #[test] fn find_free_agent_ignores_completed_agents() { use crate::config::ProjectConfig; let config = ProjectConfig::parse( r#" [[agent]] name = "coder-1" "#, ) .unwrap(); let pool = AgentPool::new(3001); // coder-1 completed its previous story — it's free for a new one pool.inject_test_agent("s1", "coder-1", AgentStatus::Completed); let agents = pool.agents.lock().unwrap(); let free = find_free_agent_for_stage(&config, &agents, &PipelineStage::Coder); assert_eq!(free, Some("coder-1"), "completed coder-1 should be free"); } #[test] fn find_free_agent_returns_none_for_wrong_stage() { use crate::config::ProjectConfig; let config = ProjectConfig::parse( r#" [[agent]] name = "qa" "#, ) .unwrap(); let agents: HashMap = HashMap::new(); // Looking for a Coder but only QA is configured let free = find_free_agent_for_stage(&config, &agents, &PipelineStage::Coder); assert!(free.is_none()); // Looking for QA should find it let free_qa = find_free_agent_for_stage(&config, &agents, &PipelineStage::Qa); assert_eq!(free_qa, Some("qa")); } }