Files
huskies/server/src/agents.rs
T
Dave 1539e52b19 Inject story content into agent prompts so coders know what to build
The worktree doesn't have .story_kit/work/ so agents had no access to
the story requirements. Read the story file from the project root and
prepend it to the prompt. Without this, coders would start, read
CLAUDE.md, have nothing to implement, and exit with no code.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-23 18:50:41 +00:00

3102 lines
112 KiB
Rust

use crate::config::ProjectConfig;
use crate::worktree::{self, WorktreeInfo};
use portable_pty::{CommandBuilder, PtySize, native_pty_system};
use serde::Serialize;
use std::collections::HashMap;
use std::io::{BufRead, BufReader};
use std::path::{Path, PathBuf};
use std::process::Command;
use std::sync::{Arc, Mutex};
use tokio::sync::broadcast;
/// Produce the lookup key under which an agent is stored in the pool.
///
/// The key is the story ID and the agent name joined by a single `:`.
fn composite_key(story_id: &str, agent_name: &str) -> String {
    [story_id, agent_name].join(":")
}
/// Events streamed from a running agent to SSE clients.
///
/// Serialized with an internal `type` tag holding the snake_case variant
/// name (see the `serde` attributes below). Every variant carries the
/// `story_id` and `agent_name` of the agent the event belongs to.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum AgentEvent {
/// Agent status changed. `status` is a lowercase label such as
/// `"pending"`, `"running"` or `"stopped"`.
Status {
story_id: String,
agent_name: String,
status: String,
},
/// Raw text output from the agent process.
Output {
story_id: String,
agent_name: String,
text: String,
},
/// Agent produced a JSON event from `--output-format stream-json`.
AgentJson {
story_id: String,
agent_name: String,
data: serde_json::Value,
},
/// Agent finished. `session_id` is the agent's session identifier when
/// one is known, otherwise `None`.
Done {
story_id: String,
agent_name: String,
session_id: Option<String>,
},
/// Agent errored. `message` is the human-readable error text.
Error {
story_id: String,
agent_name: String,
message: String,
},
}
/// Lifecycle state of an agent tracked by the pool.
///
/// Serialized as the snake_case variant name.
#[derive(Debug, Clone, Serialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum AgentStatus {
/// Registered in the pool; the agent task has not been attached yet.
Pending,
/// The agent task has been spawned and is considered active.
Running,
/// The agent finished and its acceptance gates passed.
Completed,
/// The agent run errored, its gates failed, or it was stopped.
Failed,
}
/// Renders the status as its lowercase name (`pending`, `running`,
/// `completed`, `failed`).
impl std::fmt::Display for AgentStatus {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Pending => "pending",
            Self::Running => "running",
            Self::Completed => "completed",
            Self::Failed => "failed",
        };
        f.write_str(label)
    }
}
/// Role an agent plays in the automatic story pipeline.
#[derive(Debug, Clone, PartialEq)]
pub enum PipelineStage {
    /// Coding agents (coder-1, coder-2, etc.)
    Coder,
    /// QA review agent
    Qa,
    /// Mergemaster agent
    Mergemaster,
    /// Supervisors and unknown agents — no automatic advancement.
    Other,
}

/// Classify an agent into its pipeline stage based on its name.
///
/// `"qa"` and `"mergemaster"` must match exactly; any name beginning with
/// `"coder"` is a coder; everything else is [`PipelineStage::Other`].
pub fn pipeline_stage(agent_name: &str) -> PipelineStage {
    if agent_name == "qa" {
        PipelineStage::Qa
    } else if agent_name == "mergemaster" {
        PipelineStage::Mergemaster
    } else if agent_name.starts_with("coder") {
        PipelineStage::Coder
    } else {
        PipelineStage::Other
    }
}
/// Completion report produced when acceptance gates are run.
///
/// Created automatically by the server when an agent process exits normally,
/// or via the internal `report_completion` method.
#[derive(Debug, Serialize, Clone)]
pub struct CompletionReport {
/// Free-text description of the work, as supplied by whoever reported completion.
pub summary: String,
/// `true` when the acceptance gates passed.
pub gates_passed: bool,
/// Output captured from the gate run; fed back to the agent on restart.
pub gate_output: String,
}
/// Serializable snapshot of one agent's state, as returned by the pool's
/// public methods.
#[derive(Debug, Serialize, Clone)]
pub struct AgentInfo {
/// Story the agent is working on.
pub story_id: String,
/// Configured name of the agent.
pub agent_name: String,
/// Lifecycle state at the time the snapshot was taken.
pub status: AgentStatus,
/// Agent session identifier, when one is known.
pub session_id: Option<String>,
/// Worktree directory (lossy UTF-8), if a worktree has been created.
pub worktree_path: Option<String>,
/// Base branch recorded for the worktree, if a worktree has been created.
pub base_branch: Option<String>,
/// Gate results, once completion has been reported.
pub completion: Option<CompletionReport>,
}
/// Internal pool record for one agent assigned to one story.
struct StoryAgent {
/// Configured name of the agent (the part after `:` in the pool key).
agent_name: String,
/// Current lifecycle state.
status: AgentStatus,
/// Worktree details; `None` until the worktree has been created.
worktree_info: Option<WorktreeInfo>,
/// Agent session identifier, when one is known.
session_id: Option<String>,
/// Broadcast channel used to publish this agent's events to subscribers.
tx: broadcast::Sender<AgentEvent>,
/// Handle of the spawned agent task; taken and aborted by `stop_agent`.
task_handle: Option<tokio::task::JoinHandle<()>>,
/// Accumulated events for polling via get_agent_output.
event_log: Arc<Mutex<Vec<AgentEvent>>>,
/// Set when the agent calls report_completion.
completion: Option<CompletionReport>,
/// Project root, stored for pipeline advancement after completion.
project_root: Option<PathBuf>,
}
/// Take a public `AgentInfo` snapshot of a pool entry.
///
/// The story ID is passed separately because a `StoryAgent` does not store
/// it. Worktree fields are `None` when the entry has no worktree yet.
fn agent_info_from_entry(story_id: &str, agent: &StoryAgent) -> AgentInfo {
    let worktree = agent.worktree_info.as_ref();
    let worktree_path = worktree.map(|wt| wt.path.to_string_lossy().to_string());
    let base_branch = worktree.map(|wt| wt.base_branch.clone());

    AgentInfo {
        story_id: story_id.to_string(),
        agent_name: agent.agent_name.clone(),
        status: agent.status.clone(),
        session_id: agent.session_id.clone(),
        worktree_path,
        base_branch,
        completion: agent.completion.clone(),
    }
}
/// Manages concurrent story agents, each in its own worktree.
pub struct AgentPool {
/// All tracked agents, keyed by `composite_key(story_id, agent_name)`.
/// Shared via `Arc` so spawned tasks can update entries.
agents: Arc<Mutex<HashMap<String, StoryAgent>>>,
/// Server port; forwarded to `worktree::create_worktree`.
port: u16,
}
impl AgentPool {
/// Create an empty pool that remembers the given server `port`.
pub fn new(port: u16) -> Self {
    let agents = Arc::new(Mutex::new(HashMap::new()));
    Self { agents, port }
}
/// Start an agent for a story: load config, create worktree, spawn agent.
///
/// If `agent_name` is `None`, the config's default agent is used.
/// If `resume_context` is provided, it is appended to the rendered prompt
/// so the agent can pick up from a previous failed attempt.
///
/// On success the returned snapshot reports the agent as `Running`.
///
/// # Errors
///
/// Returns an error if the config cannot be loaded, the requested agent is
/// not configured, an agent with the same story/agent key is already
/// pending or running, or a setup step (moving the story, creating the
/// worktree, rendering the agent arguments) fails. When a setup step fails
/// after the agent was registered, the entry is marked `Failed` and an
/// `Error` event is broadcast, so the slot is not left stuck in `Pending`
/// and anyone waiting on the agent is released.
pub async fn start_agent(
    &self,
    project_root: &Path,
    story_id: &str,
    agent_name: Option<&str>,
    resume_context: Option<&str>,
) -> Result<AgentInfo, String> {
    let config = ProjectConfig::load(project_root)?;

    // Resolve agent name from config.
    let resolved_name = match agent_name {
        Some(name) => {
            config
                .find_agent(name)
                .ok_or_else(|| format!("No agent named '{name}' in config"))?;
            name.to_string()
        }
        None => config
            .default_agent()
            .ok_or_else(|| "No agents configured".to_string())?
            .name
            .clone(),
    };
    let key = composite_key(story_id, &resolved_name);

    let (tx, _) = broadcast::channel::<AgentEvent>(1024);
    let event_log: Arc<Mutex<Vec<AgentEvent>>> = Arc::new(Mutex::new(Vec::new()));

    // Check for an active agent and register as pending under ONE lock
    // acquisition, so two concurrent callers cannot both pass the check.
    {
        let mut agents = self.agents.lock().map_err(|e| e.to_string())?;
        if let Some(agent) = agents.get(&key)
            && (agent.status == AgentStatus::Running || agent.status == AgentStatus::Pending)
        {
            return Err(format!(
                "Agent '{resolved_name}' for story '{story_id}' is already {}",
                agent.status
            ));
        }
        agents.insert(
            key.clone(),
            StoryAgent {
                agent_name: resolved_name.clone(),
                status: AgentStatus::Pending,
                worktree_info: None,
                session_id: None,
                tx: tx.clone(),
                task_handle: None,
                event_log: event_log.clone(),
                completion: None,
                project_root: Some(project_root.to_path_buf()),
            },
        );
    }
    let _ = tx.send(AgentEvent::Status {
        story_id: story_id.to_string(),
        agent_name: resolved_name.clone(),
        status: "pending".to_string(),
    });

    // Everything that can fail between registration and spawning runs inside
    // this block so a failure can be handled in one place below.
    let setup = async {
        // Move story from upcoming/ to current/ and auto-commit before creating the worktree.
        move_story_to_current(project_root, story_id)?;

        let wt_info =
            worktree::create_worktree(project_root, story_id, &config, self.port).await?;
        {
            let mut agents = self.agents.lock().map_err(|e| e.to_string())?;
            if let Some(agent) = agents.get_mut(&key) {
                agent.worktree_info = Some(wt_info.clone());
            }
        }

        // Read story content from the project root (the worktree may not have
        // .story_kit/work/) and inject it into the agent prompt so the coder
        // knows what to implement.
        let story_content = read_story_content(project_root, story_id);

        let wt_path_str = wt_info.path.to_string_lossy().to_string();
        let (command, args, mut prompt) = config.render_agent_args(
            &wt_path_str,
            story_id,
            Some(&resolved_name),
            Some(&wt_info.base_branch),
        )?;
        // Prepend story content so the agent sees the requirements first.
        if let Some(content) = story_content {
            prompt = format!("## Story Requirements\n\n{content}\n\n---\n\n{prompt}");
        }
        // Append resume context if this is a restart with failure information.
        if let Some(ctx) = resume_context {
            prompt.push_str(ctx);
        }
        Ok::<_, String>((wt_info, wt_path_str, command, args, prompt))
    }
    .await;

    let (wt_info, wt_path_str, command, args, prompt) = match setup {
        Ok(parts) => parts,
        Err(e) => {
            // Do not leave the entry in `Pending`: that would reject every
            // later start attempt for this story/agent pair.
            if let Ok(mut agents) = self.agents.lock()
                && let Some(agent) = agents.get_mut(&key)
            {
                agent.status = AgentStatus::Failed;
            }
            let _ = tx.send(AgentEvent::Error {
                story_id: story_id.to_string(),
                agent_name: resolved_name.clone(),
                message: e.clone(),
            });
            return Err(e);
        }
    };

    // Mark as running BEFORE spawning, so a task that finishes quickly cannot
    // have its terminal status overwritten by a late `Running` write.
    {
        let mut agents = self.agents.lock().map_err(|e| e.to_string())?;
        if let Some(agent) = agents.get_mut(&key) {
            agent.status = AgentStatus::Running;
        }
    }

    let sid = story_id.to_string();
    let aname = resolved_name.clone();
    let tx_clone = tx.clone();
    let agents_ref = self.agents.clone();
    let cwd = wt_path_str.clone();
    let key_clone = key.clone();
    let log_clone = event_log.clone();
    let port_for_task = self.port;
    let handle = tokio::spawn(async move {
        let _ = tx_clone.send(AgentEvent::Status {
            story_id: sid.clone(),
            agent_name: aname.clone(),
            status: "running".to_string(),
        });
        match run_agent_pty_streaming(
            &sid, &aname, &command, &args, &prompt, &cwd, &tx_clone, &log_clone,
        )
        .await
        {
            Ok(session_id) => {
                // Server-owned completion: run acceptance gates automatically
                // when the agent process exits normally.
                run_server_owned_completion(
                    &agents_ref,
                    port_for_task,
                    &sid,
                    &aname,
                    session_id,
                )
                .await;
            }
            Err(e) => {
                if let Ok(mut agents) = agents_ref.lock()
                    && let Some(agent) = agents.get_mut(&key_clone)
                {
                    agent.status = AgentStatus::Failed;
                }
                let _ = tx_clone.send(AgentEvent::Error {
                    story_id: sid.clone(),
                    agent_name: aname.clone(),
                    message: e,
                });
            }
        }
    });

    // Attach the task handle so `stop_agent` can abort the task.
    {
        let mut agents = self.agents.lock().map_err(|e| e.to_string())?;
        if let Some(agent) = agents.get_mut(&key) {
            agent.task_handle = Some(handle);
        }
    }

    Ok(AgentInfo {
        story_id: story_id.to_string(),
        agent_name: resolved_name,
        status: AgentStatus::Running,
        session_id: None,
        worktree_path: Some(wt_path_str),
        base_branch: Some(wt_info.base_branch.clone()),
        completion: None,
    })
}
/// Stop an agent and drop it from the pool, leaving its worktree on disk.
///
/// The entry is marked `Failed`, its task (if any) is aborted and awaited,
/// a `"stopped"` status event is broadcast, and finally the entry is removed.
///
/// # Errors
///
/// Fails if no agent is registered for the story/agent pair or the pool
/// lock is poisoned.
pub async fn stop_agent(
    &self,
    _project_root: &Path,
    story_id: &str,
    agent_name: &str,
) -> Result<(), String> {
    let key = composite_key(story_id, agent_name);

    // Grab everything needed from the entry while holding the lock briefly.
    let (worktree_info, task_handle, tx) = {
        let mut pool = self.agents.lock().map_err(|e| e.to_string())?;
        let Some(entry) = pool.get_mut(&key) else {
            return Err(format!("No agent '{agent_name}' for story '{story_id}'"));
        };
        entry.status = AgentStatus::Failed;
        (
            entry.worktree_info.clone(),
            entry.task_handle.take(),
            entry.tx.clone(),
        )
    };

    // Abort the task and wait for the abort to take effect.
    if let Some(handle) = task_handle {
        handle.abort();
        let _ = handle.await;
    }

    // The worktree is intentionally left in place so the agent's work can be inspected.
    if let Some(wt) = worktree_info.as_ref() {
        eprintln!(
            "[agents] Worktree preserved for {story_id}:{agent_name}: {}",
            wt.path.display()
        );
    }

    let _ = tx.send(AgentEvent::Status {
        story_id: story_id.to_string(),
        agent_name: agent_name.to_string(),
        status: "stopped".to_string(),
    });

    // Finally forget the agent.
    self.agents
        .lock()
        .map_err(|e| e.to_string())?
        .remove(&key);
    Ok(())
}
/// List all agents with their status.
///
/// The story ID of each entry is recovered from its composite key
/// (`"{story_id}:{agent_name}"`) by stripping the entry's own agent name,
/// so the result stays correct even when an agent name contains `:`.
///
/// # Errors
///
/// Fails only if the pool lock is poisoned.
pub fn list_agents(&self) -> Result<Vec<AgentInfo>, String> {
    let agents = self.agents.lock().map_err(|e| e.to_string())?;
    Ok(agents
        .iter()
        .map(|(key, agent)| {
            let story_id = key
                .strip_suffix(agent.agent_name.as_str())
                .and_then(|prefix| prefix.strip_suffix(':'))
                // Fallbacks for a key that does not end in this agent's name.
                .or_else(|| key.rsplit_once(':').map(|(sid, _)| sid))
                .unwrap_or(key.as_str());
            agent_info_from_entry(story_id, agent)
        })
        .collect())
}
/// Open a new receiver on an agent's broadcast channel.
///
/// # Errors
///
/// Fails if no agent is registered for the story/agent pair or the pool
/// lock is poisoned.
pub fn subscribe(
    &self,
    story_id: &str,
    agent_name: &str,
) -> Result<broadcast::Receiver<AgentEvent>, String> {
    let key = composite_key(story_id, agent_name);
    let pool = self.agents.lock().map_err(|e| e.to_string())?;
    match pool.get(&key) {
        Some(entry) => Ok(entry.tx.subscribe()),
        None => Err(format!("No agent '{agent_name}' for story '{story_id}'")),
    }
}
/// Take every event accumulated in the agent's log since the previous call,
/// leaving the log empty.
///
/// # Errors
///
/// Fails if no agent is registered for the story/agent pair, or if either
/// the pool lock or the log lock is poisoned.
pub fn drain_events(
    &self,
    story_id: &str,
    agent_name: &str,
) -> Result<Vec<AgentEvent>, String> {
    let key = composite_key(story_id, agent_name);
    let pool = self.agents.lock().map_err(|e| e.to_string())?;
    let Some(entry) = pool.get(&key) else {
        return Err(format!("No agent '{agent_name}' for story '{story_id}'"));
    };
    let mut log = entry.event_log.lock().map_err(|e| e.to_string())?;
    Ok(std::mem::take(&mut *log))
}
/// Block until the agent reaches a terminal state (completed, failed, stopped).
/// Returns the agent's final `AgentInfo`.
/// `timeout_ms` caps how long to wait; returns an error if the deadline passes.
pub async fn wait_for_agent(
&self,
story_id: &str,
agent_name: &str,
timeout_ms: u64,
) -> Result<AgentInfo, String> {
// Subscribe before checking status so we don't miss the terminal event
// if the agent completes in the window between the two operations.
let mut rx = self.subscribe(story_id, agent_name)?;
// Return immediately if already in a terminal state.
{
let agents = self.agents.lock().map_err(|e| e.to_string())?;
let key = composite_key(story_id, agent_name);
if let Some(agent) = agents.get(&key)
&& matches!(agent.status, AgentStatus::Completed | AgentStatus::Failed)
{
return Ok(agent_info_from_entry(story_id, agent));
}
}
let deadline =
tokio::time::Instant::now() + std::time::Duration::from_millis(timeout_ms);
loop {
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
if remaining.is_zero() {
return Err(format!(
"Timed out after {timeout_ms}ms waiting for agent '{agent_name}' on story '{story_id}'"
));
}
match tokio::time::timeout(remaining, rx.recv()).await {
Ok(Ok(event)) => {
let is_terminal = match &event {
AgentEvent::Done { .. } | AgentEvent::Error { .. } => true,
AgentEvent::Status { status, .. } if status == "stopped" => true,
_ => false,
};
if is_terminal {
let agents = self.agents.lock().map_err(|e| e.to_string())?;
let key = composite_key(story_id, agent_name);
return Ok(if let Some(agent) = agents.get(&key) {
agent_info_from_entry(story_id, agent)
} else {
// Agent was removed from map (e.g. stop_agent removes it after
// the "stopped" status event is sent).
let (status, session_id) = match event {
AgentEvent::Done { session_id, .. } => {
(AgentStatus::Completed, session_id)
}
_ => (AgentStatus::Failed, None),
};
AgentInfo {
story_id: story_id.to_string(),
agent_name: agent_name.to_string(),
status,
session_id,
worktree_path: None,
base_branch: None,
completion: None,
}
});
}
}
Ok(Err(broadcast::error::RecvError::Lagged(_))) => {
// Missed some buffered events — check current status before resuming.
let agents = self.agents.lock().map_err(|e| e.to_string())?;
let key = composite_key(story_id, agent_name);
if let Some(agent) = agents.get(&key)
&& matches!(agent.status, AgentStatus::Completed | AgentStatus::Failed)
{
return Ok(agent_info_from_entry(story_id, agent));
}
// Still running — continue the loop.
}
Ok(Err(broadcast::error::RecvError::Closed)) => {
// Channel closed: no more events will arrive. Return current state.
let agents = self.agents.lock().map_err(|e| e.to_string())?;
let key = composite_key(story_id, agent_name);
if let Some(agent) = agents.get(&key) {
return Ok(agent_info_from_entry(story_id, agent));
}
return Err(format!(
"Agent '{agent_name}' for story '{story_id}' channel closed unexpectedly"
));
}
Err(_) => {
return Err(format!(
"Timed out after {timeout_ms}ms waiting for agent '{agent_name}' on story '{story_id}'"
));
}
}
}
}
/// Load the project config and create the story's worktree, passing this
/// pool's server port along (writes .mcp.json).
///
/// # Errors
///
/// Propagates config-loading and worktree-creation errors.
pub async fn create_worktree(
    &self,
    project_root: &Path,
    story_id: &str,
) -> Result<worktree::WorktreeInfo, String> {
    let port = self.port;
    let config = ProjectConfig::load(project_root)?;
    worktree::create_worktree(project_root, story_id, &config, port).await
}
/// Advance the pipeline after an agent completes.
///
/// Called internally by `report_completion` as a background task.
/// Reads the stored completion report and project_root from the agent,
/// then drives the next pipeline stage based on the agent's role:
///
/// - **Coder** + gates passed → move story to `work/3_qa/`, start `qa` agent.
/// - **Coder** + gates failed → restart the same coder agent with failure context.
/// - **QA** + gates passed + coverage passed → move story to `work/4_merge/`, start `mergemaster` agent.
/// - **QA** + gates passed + coverage failed → restart `qa` with coverage failure context.
/// - **QA** + gates failed → restart `qa` with failure context.
/// - **Mergemaster** → run `script/test` on master; if pass: archive the story
///   (the worktree is currently preserved); if fail: restart `mergemaster`
///   with failure context.
/// - **Other** (supervisor, unknown) → no automatic advancement.
///
/// All failures are logged to stderr; nothing is returned to the caller.
async fn run_pipeline_advance_for_completed_agent(&self, story_id: &str, agent_name: &str) {
    let key = composite_key(story_id, agent_name);

    // Copy what we need out of the entry so the lock is not held across awaits.
    let (completion, project_root, worktree_path) = {
        let agents = match self.agents.lock() {
            Ok(a) => a,
            Err(e) => {
                eprintln!("[pipeline] Failed to lock agents for '{story_id}:{agent_name}': {e}");
                return;
            }
        };
        let Some(agent) = agents.get(&key) else {
            return;
        };
        let wt_path = agent.worktree_info.as_ref().map(|wt| wt.path.clone());
        (agent.completion.clone(), agent.project_root.clone(), wt_path)
    };
    let Some(completion) = completion else {
        eprintln!("[pipeline] No completion report for '{story_id}:{agent_name}'");
        return;
    };
    let Some(project_root) = project_root else {
        eprintln!("[pipeline] No project_root for '{story_id}:{agent_name}'");
        return;
    };

    match pipeline_stage(agent_name) {
        PipelineStage::Other => {
            // Supervisors and unknown agents do not advance the pipeline.
        }
        PipelineStage::Coder => {
            if completion.gates_passed {
                eprintln!(
                    "[pipeline] Coder '{agent_name}' passed gates for '{story_id}'. Moving to QA."
                );
                if let Err(e) = move_story_to_qa(&project_root, story_id) {
                    eprintln!("[pipeline] Failed to move '{story_id}' to 3_qa/: {e}");
                    return;
                }
                if let Err(e) = self
                    .start_agent(&project_root, story_id, Some("qa"), None)
                    .await
                {
                    eprintln!("[pipeline] Failed to start qa agent for '{story_id}': {e}");
                }
                // Coder slot is now free — pick up any other unassigned work in 2_current/.
                self.auto_assign_available_work(&project_root).await;
            } else {
                eprintln!(
                    "[pipeline] Coder '{agent_name}' failed gates for '{story_id}'. Restarting."
                );
                let context = format!(
                    "\n\n---\n## Previous Attempt Failed\n\
                     The acceptance gates failed with the following output:\n{}\n\n\
                     Please review the failures above, fix the issues, and try again.",
                    completion.gate_output
                );
                if let Err(e) = self
                    .start_agent(&project_root, story_id, Some(agent_name), Some(&context))
                    .await
                {
                    eprintln!(
                        "[pipeline] Failed to restart coder '{agent_name}' for '{story_id}': {e}"
                    );
                }
            }
        }
        PipelineStage::Qa => {
            if completion.gates_passed {
                // Run coverage gate in the QA worktree before advancing to merge;
                // fall back to the project root when no worktree is recorded.
                let coverage_path = worktree_path.unwrap_or_else(|| project_root.clone());
                let coverage_result =
                    tokio::task::spawn_blocking(move || run_coverage_gate(&coverage_path))
                        .await
                        .unwrap_or_else(|e| {
                            eprintln!("[pipeline] Coverage gate task panicked: {e}");
                            Ok((false, format!("Coverage gate task panicked: {e}")))
                        });
                // A gate that could not run at all counts as a failed gate.
                let (coverage_passed, coverage_output) = match coverage_result {
                    Ok(pair) => pair,
                    Err(e) => (false, e),
                };
                if coverage_passed {
                    eprintln!(
                        "[pipeline] QA passed gates and coverage for '{story_id}'. Moving to merge."
                    );
                    if let Err(e) = move_story_to_merge(&project_root, story_id) {
                        eprintln!("[pipeline] Failed to move '{story_id}' to 4_merge/: {e}");
                        return;
                    }
                    if let Err(e) = self
                        .start_agent(&project_root, story_id, Some("mergemaster"), None)
                        .await
                    {
                        eprintln!("[pipeline] Failed to start mergemaster for '{story_id}': {e}");
                    }
                    // QA slot is now free — pick up any other unassigned work in 3_qa/.
                    self.auto_assign_available_work(&project_root).await;
                } else {
                    eprintln!(
                        "[pipeline] QA coverage gate failed for '{story_id}'. Restarting QA."
                    );
                    let context = format!(
                        "\n\n---\n## Coverage Gate Failed\n\
                         The coverage gate (script/test_coverage) failed with the following output:\n{}\n\n\
                         Please improve test coverage until the coverage gate passes.",
                        coverage_output
                    );
                    if let Err(e) = self
                        .start_agent(&project_root, story_id, Some("qa"), Some(&context))
                        .await
                    {
                        eprintln!("[pipeline] Failed to restart qa for '{story_id}': {e}");
                    }
                }
            } else {
                eprintln!(
                    "[pipeline] QA failed gates for '{story_id}'. Restarting."
                );
                let context = format!(
                    "\n\n---\n## Previous QA Attempt Failed\n\
                     The acceptance gates failed with the following output:\n{}\n\n\
                     Please re-run and fix the issues.",
                    completion.gate_output
                );
                if let Err(e) = self
                    .start_agent(&project_root, story_id, Some("qa"), Some(&context))
                    .await
                {
                    eprintln!("[pipeline] Failed to restart qa for '{story_id}': {e}");
                }
            }
        }
        PipelineStage::Mergemaster => {
            // Run script/test on master (project_root) as the post-merge verification.
            eprintln!(
                "[pipeline] Mergemaster completed for '{story_id}'. Running post-merge tests on master."
            );
            let root = project_root.clone();
            let test_result = tokio::task::spawn_blocking(move || run_project_tests(&root))
                .await
                .unwrap_or_else(|e| {
                    eprintln!("[pipeline] Post-merge test task panicked: {e}");
                    Ok((false, format!("Test task panicked: {e}")))
                });
            let (passed, output) = match test_result {
                Ok(pair) => pair,
                Err(e) => (false, e),
            };
            if passed {
                eprintln!(
                    "[pipeline] Post-merge tests passed for '{story_id}'. Archiving."
                );
                let archived = match move_story_to_archived(&project_root, story_id) {
                    Ok(_) => true,
                    Err(e) => {
                        eprintln!("[pipeline] Failed to archive '{story_id}': {e}");
                        false
                    }
                };
                // Mergemaster slot is now free — pick up any other items in 4_merge/.
                self.auto_assign_available_work(&project_root).await;
                // TODO: Re-enable worktree cleanup once we have persistent agent logs.
                // Removing worktrees destroys evidence needed to debug empty-commit agents.
                // let config =
                //     crate::config::ProjectConfig::load(&project_root).unwrap_or_default();
                // if let Err(e) =
                //     worktree::remove_worktree_by_story_id(&project_root, story_id, &config)
                //         .await
                // {
                //     eprintln!(
                //         "[pipeline] Failed to remove worktree for '{story_id}': {e}"
                //     );
                // }
                // Only claim success when the archive step actually succeeded.
                if archived {
                    eprintln!(
                        "[pipeline] Story '{story_id}' archived. Worktree preserved for inspection."
                    );
                }
            } else {
                eprintln!(
                    "[pipeline] Post-merge tests failed for '{story_id}'. Restarting mergemaster."
                );
                let context = format!(
                    "\n\n---\n## Post-Merge Test Failed\n\
                     The tests on master failed with the following output:\n{}\n\n\
                     Please investigate and resolve the failures, then call merge_agent_work again.",
                    output
                );
                if let Err(e) = self
                    .start_agent(&project_root, story_id, Some("mergemaster"), Some(&context))
                    .await
                {
                    eprintln!(
                        "[pipeline] Failed to restart mergemaster for '{story_id}': {e}"
                    );
                }
            }
        }
    }
}
/// Internal: record that an agent has finished work on a story.
///
/// **Note:** This is no longer exposed as an MCP tool. The server now
/// automatically runs completion gates when an agent process exits
/// (see `run_server_owned_completion`). This method is retained for
/// backwards compatibility and testing.
///
/// Steps, in order:
///
/// 1. Require the agent to exist, be `Running`, and have a worktree.
/// 2. Reject with an error if the worktree has uncommitted changes.
/// 3. Run the acceptance gates (cargo clippy + cargo nextest run / cargo test).
/// 4. Store the `CompletionReport` and set the status to `Completed`
///    (gates passed) or `Failed` (gates failed).
/// 5. Emit a `Done` event so `wait_for_agent` unblocks.
/// 6. Spawn the pipeline-advance step in the background.
#[allow(dead_code)]
pub async fn report_completion(
    &self,
    story_id: &str,
    agent_name: &str,
    summary: &str,
) -> Result<CompletionReport, String> {
    let key = composite_key(story_id, agent_name);

    // Validate the agent and pick up the directory the gates must run in.
    let gate_dir = {
        let pool = self.agents.lock().map_err(|e| e.to_string())?;
        let Some(entry) = pool.get(&key) else {
            return Err(format!("No agent '{agent_name}' for story '{story_id}'"));
        };
        if entry.status != AgentStatus::Running {
            return Err(format!(
                "Agent '{agent_name}' for story '{story_id}' is not running (status: {}). \
                 report_completion can only be called by a running agent.",
                entry.status
            ));
        }
        match entry.worktree_info.as_ref() {
            Some(wt) => wt.path.clone(),
            None => {
                return Err(format!(
                    "Agent '{agent_name}' for story '{story_id}' has no worktree. \
                     Cannot run acceptance gates."
                ));
            }
        }
    };

    // The checks shell out and block, so keep them off the async runtime.
    let gate_task = tokio::task::spawn_blocking(move || {
        // A dirty worktree is rejected outright, before any gate runs.
        check_uncommitted_changes(&gate_dir)?;
        // Clippy + tests; yields (passed, output).
        run_acceptance_gates(&gate_dir)
    });
    let (gates_passed, gate_output) = gate_task
        .await
        .map_err(|e| format!("Gate check task panicked: {e}"))??;

    let report = CompletionReport {
        summary: summary.to_string(),
        gates_passed,
        gate_output,
    };

    // Persist the report and move the agent to its terminal status.
    let (tx, session_id) = {
        let mut pool = self.agents.lock().map_err(|e| e.to_string())?;
        let Some(entry) = pool.get_mut(&key) else {
            return Err(format!(
                "Agent '{agent_name}' for story '{story_id}' disappeared during gate check"
            ));
        };
        entry.status = match gates_passed {
            true => AgentStatus::Completed,
            false => AgentStatus::Failed,
        };
        entry.completion = Some(report.clone());
        (entry.tx.clone(), entry.session_id.clone())
    };

    // Emit Done so wait_for_agent unblocks.
    let _ = tx.send(AgentEvent::Done {
        story_id: story_id.to_string(),
        agent_name: agent_name.to_string(),
        session_id,
    });

    // Drive the pipeline state machine in a background task. The advance step
    // is spawned whether the gates passed or failed; it inspects the stored
    // report itself to decide between advancing and restarting.
    let background_pool = Self {
        agents: Arc::clone(&self.agents),
        port: self.port,
    };
    let owned_story = story_id.to_string();
    let owned_agent = agent_name.to_string();
    tokio::spawn(async move {
        background_pool
            .run_pipeline_advance_for_completed_agent(&owned_story, &owned_agent)
            .await;
    });

    Ok(report)
}
/// Run the full mergemaster pipeline for a completed story:
///
/// 1. Squash-merge the story's feature branch (`feature/story-{story_id}`)
///    into the current branch (master).
/// 2. If the merge does not succeed (e.g. conflicts): report that and stop.
/// 3. If the merge succeeds: run quality gates (cargo clippy + tests + pnpm).
/// 4. If all gates pass: archive the story and clean up the worktree.
///
/// Returns a `MergeReport` with full details of what happened.
///
/// # Errors
///
/// Returns `Err` only when the merge or gate step itself errors or its
/// blocking task panics; a failed merge or failed gates are reported through
/// the `MergeReport` instead.
pub async fn merge_agent_work(
    &self,
    project_root: &Path,
    story_id: &str,
) -> Result<MergeReport, String> {
    let feature_branch = format!("feature/story-{story_id}");
    let wt_path = worktree::worktree_path(project_root, story_id);

    // Git and cargo block, so each step runs on the blocking thread pool.
    let merge_root = project_root.to_path_buf();
    let merge_story = story_id.to_string();
    let merge_task = tokio::task::spawn_blocking(move || {
        run_squash_merge(&merge_root, &feature_branch, &merge_story)
    });
    let (merge_success, had_conflicts, conflict_details, merge_output) = merge_task
        .await
        .map_err(|e| format!("Merge task panicked: {e}"))??;

    if !merge_success {
        return Ok(MergeReport {
            story_id: story_id.to_string(),
            success: false,
            had_conflicts,
            conflict_details,
            gates_passed: false,
            gate_output: merge_output,
            worktree_cleaned_up: false,
            story_archived: false,
        });
    }

    // The merge landed; verify the merged result in the project root.
    let gate_root = project_root.to_path_buf();
    let (gates_passed, gate_output) =
        tokio::task::spawn_blocking(move || run_merge_quality_gates(&gate_root))
            .await
            .map_err(|e| format!("Gate check task panicked: {e}"))??;

    // Archiving and worktree cleanup only happen once the gates have passed.
    let (story_archived, worktree_cleaned_up) = if gates_passed {
        let archived = move_story_to_archived(project_root, story_id).is_ok();
        let cleaned = if wt_path.exists() {
            let config = crate::config::ProjectConfig::load(project_root).unwrap_or_default();
            worktree::remove_worktree_by_story_id(project_root, story_id, &config)
                .await
                .is_ok()
        } else {
            false
        };
        (archived, cleaned)
    } else {
        (false, false)
    };

    Ok(MergeReport {
        story_id: story_id.to_string(),
        success: true,
        had_conflicts: false,
        conflict_details: None,
        gates_passed,
        gate_output,
        worktree_cleaned_up,
        story_archived,
    })
}
/// Return the port this server is running on.
///
/// This is the value that was passed to `AgentPool::new`.
#[allow(dead_code)]
pub fn port(&self) -> u16 {
self.port
}
/// Get project root helper.
///
/// Simply delegates to `SessionState::get_project_root`; no pool state is
/// read. Any error returned by that call is passed through unchanged.
pub fn get_project_root(
&self,
state: &crate::state::SessionState,
) -> Result<PathBuf, String> {
state.get_project_root()
}
/// Test helper: register a synthetic agent with the given status so unit
/// tests can exercise wait/subscribe logic without spawning a real process.
///
/// The entry has no worktree, no completion report and no project root.
/// Returns the sender side of the agent's event channel.
#[cfg(test)]
pub fn inject_test_agent(
    &self,
    story_id: &str,
    agent_name: &str,
    status: AgentStatus,
) -> broadcast::Sender<AgentEvent> {
    let (tx, _) = broadcast::channel::<AgentEvent>(64);
    let entry = StoryAgent {
        agent_name: agent_name.to_string(),
        status,
        worktree_info: None,
        session_id: None,
        tx: tx.clone(),
        task_handle: None,
        event_log: Arc::new(Mutex::new(Vec::new())),
        completion: None,
        project_root: None,
    };
    self.agents
        .lock()
        .unwrap()
        .insert(composite_key(story_id, agent_name), entry);
    tx
}
/// Test helper: register a synthetic agent whose worktree points at
/// `worktree_path`, for testing gate-related logic.
///
/// The worktree is recorded with branch `feature/story-{story_id}` and base
/// branch `master`. Returns the sender side of the agent's event channel.
#[cfg(test)]
pub fn inject_test_agent_with_path(
    &self,
    story_id: &str,
    agent_name: &str,
    status: AgentStatus,
    worktree_path: PathBuf,
) -> broadcast::Sender<AgentEvent> {
    let (tx, _) = broadcast::channel::<AgentEvent>(64);
    let worktree = WorktreeInfo {
        path: worktree_path,
        branch: format!("feature/story-{story_id}"),
        base_branch: "master".to_string(),
    };
    let entry = StoryAgent {
        agent_name: agent_name.to_string(),
        status,
        worktree_info: Some(worktree),
        session_id: None,
        tx: tx.clone(),
        task_handle: None,
        event_log: Arc::new(Mutex::new(Vec::new())),
        completion: None,
        project_root: None,
    };
    self.agents
        .lock()
        .unwrap()
        .insert(composite_key(story_id, agent_name), entry);
    tx
}
/// Hand waiting stories in the active pipeline stages to free agents.
///
/// Walks `work/2_current/`, `work/3_qa/`, and `work/4_merge/` in that order.
/// For every story without an active agent of the stage's role, the first
/// free agent of that role is started on it. Items in `work/1_upcoming/`
/// are never auto-started.
///
/// Concurrency per role is bounded by the agent roster in `project.toml`:
/// once no agent of a role is free, the rest of that stage is left waiting.
/// Failures are logged to stderr and never returned.
pub async fn auto_assign_available_work(&self, project_root: &Path) {
    let config = match ProjectConfig::load(project_root) {
        Ok(loaded) => loaded,
        Err(e) => {
            eprintln!("[auto-assign] Failed to load project config: {e}");
            return;
        }
    };

    // Active pipeline stages, in processing order.
    let stages: [(&str, PipelineStage); 3] = [
        ("2_current", PipelineStage::Coder),
        ("3_qa", PipelineStage::Qa),
        ("4_merge", PipelineStage::Mergemaster),
    ];

    for (stage_dir, stage) in stages.iter() {
        for story_id in scan_stage_items(project_root, stage_dir).iter() {
            // Take the lock afresh for every story so that agents started
            // earlier in this pass are already counted as busy.
            let candidate = {
                let pool = match self.agents.lock() {
                    Ok(pool) => pool,
                    Err(e) => {
                        eprintln!("[auto-assign] Failed to lock agents: {e}");
                        break;
                    }
                };
                if is_story_assigned_for_stage(&pool, story_id, stage) {
                    // Story already has an active agent — skip silently.
                    continue;
                }
                find_free_agent_for_stage(&config, &pool, stage).map(str::to_string)
            };

            let Some(agent_name) = candidate else {
                // No free agents of this type — stop scanning this stage.
                eprintln!(
                    "[auto-assign] All {:?} agents busy; remaining items in {stage_dir}/ will wait.",
                    stage
                );
                break;
            };

            eprintln!("[auto-assign] Assigning '{agent_name}' to '{story_id}' in {stage_dir}/");
            if let Err(e) = self
                .start_agent(project_root, story_id, Some(&agent_name), None)
                .await
            {
                eprintln!("[auto-assign] Failed to start '{agent_name}' for '{story_id}': {e}");
            }
        }
    }
}
/// Test helper: register a synthetic agent that already carries a completion
/// report and a project root, so pipeline-advance logic can be tested
/// without spawning real agents.
///
/// The entry has no worktree. Returns the sender side of the agent's event
/// channel.
#[cfg(test)]
pub fn inject_test_agent_with_completion(
    &self,
    story_id: &str,
    agent_name: &str,
    status: AgentStatus,
    project_root: PathBuf,
    completion: CompletionReport,
) -> broadcast::Sender<AgentEvent> {
    let (tx, _) = broadcast::channel::<AgentEvent>(64);
    let entry = StoryAgent {
        agent_name: agent_name.to_string(),
        status,
        worktree_info: None,
        session_id: None,
        tx: tx.clone(),
        task_handle: None,
        event_log: Arc::new(Mutex::new(Vec::new())),
        completion: Some(completion),
        project_root: Some(project_root),
    };
    self.agents
        .lock()
        .unwrap()
        .insert(composite_key(story_id, agent_name), entry);
    tx
}
}
/// List the story IDs present in one pipeline stage directory.
///
/// Looks in `<project_root>/.story_kit/work/<stage_dir>` and returns the
/// file stem of every entry with an `md` extension, sorted alphabetically.
/// A missing or unreadable directory yields an empty `Vec`.
fn scan_stage_items(project_root: &Path, stage_dir: &str) -> Vec<String> {
    let dir = project_root
        .join(".story_kit")
        .join("work")
        .join(stage_dir);
    if !dir.is_dir() {
        return Vec::new();
    }
    let Ok(entries) = std::fs::read_dir(&dir) else {
        return Vec::new();
    };

    let mut items: Vec<String> = entries
        .flatten()
        .map(|entry| entry.path())
        .filter(|path| path.extension().and_then(|ext| ext.to_str()) == Some("md"))
        .filter_map(|path| {
            path.file_stem()
                .and_then(|stem| stem.to_str())
                .map(str::to_string)
        })
        .collect();
    items.sort();
    items
}
/// Report whether `story_id` already has an active agent working in `stage`.
///
/// An agent counts as active while its status is `Pending` or `Running`. The
/// story is recovered from the pool key, which has the form
/// `"{story_id}:{agent_name}"`; the split is taken at the last `:`.
fn is_story_assigned_for_stage(
    agents: &HashMap<String, StoryAgent>,
    story_id: &str,
    stage: &PipelineStage,
) -> bool {
    for (key, agent) in agents {
        let owner = match key.rsplit_once(':') {
            Some((sid, _)) => sid,
            None => key.as_str(),
        };
        if owner != story_id {
            continue;
        }
        if pipeline_stage(&agent.agent_name) != *stage {
            continue;
        }
        if matches!(agent.status, AgentStatus::Running | AgentStatus::Pending) {
            return true;
        }
    }
    false
}
/// Pick the first agent configured for `stage` that is not currently busy.
///
/// Agents are considered in configuration order. An agent is busy when any
/// pool entry with the same name is `Pending` or `Running`. Returns `None`
/// when no agent is configured for the stage or all of them are busy.
fn find_free_agent_for_stage<'a>(
    config: &'a ProjectConfig,
    agents: &HashMap<String, StoryAgent>,
    stage: &PipelineStage,
) -> Option<&'a str> {
    config
        .agent
        .iter()
        .filter(|candidate| pipeline_stage(&candidate.name) == *stage)
        .find(|candidate| {
            !agents.values().any(|active| {
                active.agent_name == candidate.name
                    && matches!(active.status, AgentStatus::Running | AgentStatus::Pending)
            })
        })
        .map(|candidate| candidate.name.as_str())
}
/// Server-owned completion: runs acceptance gates when an agent process exits
/// normally, and advances the pipeline based on results.
///
/// This is a **free function** (not a method on `AgentPool`) to break the
/// opaque type cycle that would otherwise arise: `start_agent` → spawned task
/// → server-owned completion → pipeline advance → `start_agent`.
///
/// If the agent already has a completion report (e.g. from a legacy
/// `report_completion` call), this is a no-op to avoid double-running gates.
/// It is also a no-op when the agent is no longer in the pool or the pool
/// mutex is poisoned.
///
/// On the normal path the function:
/// 1. checks for uncommitted changes and runs the acceptance gates in the
///    agent's worktree (on a blocking thread),
/// 2. stores a `CompletionReport`, the `session_id`, and a terminal status
///    (`Completed` if the gates passed, `Failed` otherwise),
/// 3. broadcasts `AgentEvent::Done`,
/// 4. spawns pipeline advancement in the background.
async fn run_server_owned_completion(
    agents: &Arc<Mutex<HashMap<String, StoryAgent>>>,
    port: u16,
    story_id: &str,
    agent_name: &str,
    session_id: Option<String>,
) {
    let key = composite_key(story_id, agent_name);

    // Single critical section: apply the "already completed" guard and read
    // the worktree path from the same snapshot of the agent entry, instead of
    // locking twice and risking the entry changing in between.
    let worktree_path = {
        let lock = match agents.lock() {
            Ok(guard) => guard,
            Err(_) => return,
        };
        let agent = match lock.get(&key) {
            Some(agent) => agent,
            None => return,
        };
        if agent.completion.is_some() {
            eprintln!(
                "[agents] Completion already recorded for '{story_id}:{agent_name}'; \
                 skipping server-owned gates."
            );
            return;
        }
        agent.worktree_info.as_ref().map(|wt| wt.path.clone())
    };

    // Run acceptance gates off the async runtime; they shell out to git/cargo.
    let (gates_passed, gate_output) = if let Some(wt_path) = worktree_path {
        let path = wt_path;
        match tokio::task::spawn_blocking(move || {
            check_uncommitted_changes(&path)?;
            run_acceptance_gates(&path)
        })
        .await
        {
            Ok(Ok(result)) => result,
            // A gate helper returned an error string: treat it as a failed gate.
            Ok(Err(e)) => (false, e),
            Err(e) => (false, format!("Gate check task panicked: {e}")),
        }
    } else {
        (
            false,
            "No worktree path available to run acceptance gates".to_string(),
        )
    };

    eprintln!(
        "[agents] Server-owned completion for '{story_id}:{agent_name}': gates_passed={gates_passed}"
    );

    let report = CompletionReport {
        summary: "Agent process exited normally".to_string(),
        gates_passed,
        gate_output,
    };

    // Store completion report and set status; keep the lock only long enough
    // to update the entry and grab a sender clone.
    let tx = {
        let mut lock = match agents.lock() {
            Ok(guard) => guard,
            Err(_) => return,
        };
        let agent = match lock.get_mut(&key) {
            Some(agent) => agent,
            None => return,
        };
        agent.completion = Some(report);
        agent.session_id = session_id.clone();
        agent.status = if gates_passed {
            AgentStatus::Completed
        } else {
            AgentStatus::Failed
        };
        agent.tx.clone()
    };

    // Emit Done so wait_for_agent unblocks.
    let _ = tx.send(AgentEvent::Done {
        story_id: story_id.to_string(),
        agent_name: agent_name.to_string(),
        session_id,
    });

    // Advance the pipeline state machine in a background task.
    // Uses a non-async helper to break the opaque type cycle.
    spawn_pipeline_advance(Arc::clone(agents), port, story_id, agent_name);
}
/// Kick off pipeline advancement for a finished agent on a background task.
///
/// Deliberately a **non-async** function: keeping it synchronous keeps it out
/// of the opaque type cycle between `start_agent` and
/// `run_server_owned_completion`. The spawned task is detached; its handle is
/// not kept.
fn spawn_pipeline_advance(
    agents: Arc<Mutex<HashMap<String, StoryAgent>>>,
    port: u16,
    story_id: &str,
    agent_name: &str,
) {
    // The task outlives the borrowed arguments, so it needs owned copies.
    let owned_story = story_id.to_owned();
    let owned_agent = agent_name.to_owned();
    let pool = AgentPool { agents, port };
    tokio::spawn(async move {
        pool.run_pipeline_advance_for_completed_agent(&owned_story, &owned_agent)
            .await;
    });
}
/// Result of a mergemaster merge operation.
///
/// Derives `Serialize`, so fields are emitted in declaration order under
/// their Rust names. The code that fills this struct is not in this part of
/// the file; field notes marked NOTE(review) are assumptions to confirm there.
#[derive(Debug, Serialize, Clone)]
pub struct MergeReport {
// ID of the work item the report is about.
pub story_id: String,
// Overall outcome flag.
// NOTE(review): how this combines merge result and gate result is decided by the builder — confirm.
pub success: bool,
// NOTE(review): presumably mirrors the `had_conflicts` element returned by `run_squash_merge` — confirm.
pub had_conflicts: bool,
// NOTE(review): presumably the conflict text from `run_squash_merge`, `None` when there were no conflicts — confirm.
pub conflict_details: Option<String>,
// NOTE(review): presumably the boolean from `run_merge_quality_gates` — confirm.
pub gates_passed: bool,
// NOTE(review): presumably the combined command output collected during merge/gates — confirm.
pub gate_output: String,
// NOTE(review): presumably whether the story's worktree was removed after the merge — confirm.
pub worktree_cleaned_up: bool,
// NOTE(review): presumably whether the story file was moved to `work/5_archived/` — confirm.
pub story_archived: bool,
}
/// Classify a work item by its ID, which follows `{N}_{type}_{slug}`.
///
/// Leading ASCII digits are skipped; if what follows starts with `_bug_` the
/// result is `"bug"`, if it starts with `_spike_` the result is `"spike"`,
/// and anything else (including an empty ID) is `"story"`.
#[allow(dead_code)]
fn item_type_from_id(item_id: &str) -> &'static str {
    // ASCII digits are single bytes, so the count is a valid slice boundary.
    let digit_len = item_id.bytes().take_while(u8::is_ascii_digit).count();
    let remainder = &item_id[digit_len..];
    match remainder {
        r if r.starts_with("_bug_") => "bug",
        r if r.starts_with("_spike_") => "spike",
        _ => "story",
    }
}
/// Load the markdown of a story/bug from whichever pipeline stage holds it.
///
/// The stage directories under `.story_kit/work/` are probed in the order
/// `2_current`, `1_upcoming`, `3_qa`, `4_merge`; the first `{story_id}.md`
/// that can be read wins. Returns `None` when the file cannot be read from
/// any of them.
fn read_story_content(project_root: &Path, story_id: &str) -> Option<String> {
    const SEARCH_ORDER: [&str; 4] = ["2_current", "1_upcoming", "3_qa", "4_merge"];
    let work_dir = project_root.join(".story_kit").join("work");
    let file_name = format!("{story_id}.md");
    SEARCH_ORDER
        .iter()
        .find_map(|stage| std::fs::read_to_string(work_dir.join(stage).join(&file_name)).ok())
}
/// Directory a work item is moved out of when it is started:
/// `<project_root>/.story_kit/work/1_upcoming`.
///
/// `_item_id` is not consulted; every item shares the same directory.
fn item_source_dir(project_root: &Path, _item_id: &str) -> PathBuf {
    let mut dir = project_root.to_path_buf();
    dir.extend([".story_kit", "work", "1_upcoming"]);
    dir
}
/// Directory finished work items end up in:
/// `<project_root>/.story_kit/work/5_archived`.
///
/// `_item_id` is not consulted; every item shares the same directory.
fn item_archive_dir(project_root: &Path, _item_id: &str) -> PathBuf {
    let mut dir = project_root.to_path_buf();
    dir.extend([".story_kit", "work", "5_archived"]);
    dir
}
/// Move a work item (story, bug, or spike) from `work/1_upcoming/` to
/// `work/2_current/`.
///
/// * Already in `2_current/`: nothing happens and `Ok(())` is returned.
/// * Not present in `1_upcoming/`: a warning is logged and `Ok(())` is returned.
/// * Otherwise the file is renamed into `2_current/` (created if missing).
///
/// Only the filesystem is touched; this function runs no git commands.
///
/// # Errors
/// Returns a message when `2_current/` cannot be created or the rename fails.
pub fn move_story_to_current(project_root: &Path, story_id: &str) -> Result<(), String> {
    let file_name = format!("{story_id}.md");
    let dest_dir = project_root
        .join(".story_kit")
        .join("work")
        .join("2_current");
    let dest = dest_dir.join(&file_name);
    if dest.exists() {
        // Idempotent: the item has already been started.
        return Ok(());
    }

    let origin_dir = item_source_dir(project_root, story_id);
    let origin = origin_dir.join(&file_name);
    if origin.exists() {
        std::fs::create_dir_all(&dest_dir)
            .map_err(|e| format!("Failed to create work/2_current/ directory: {e}"))?;
        std::fs::rename(&origin, &dest)
            .map_err(|e| format!("Failed to move '{story_id}' to 2_current/: {e}"))?;
        eprintln!(
            "[lifecycle] Moved '{story_id}' from {} to work/2_current/",
            origin_dir.display()
        );
    } else {
        eprintln!(
            "[lifecycle] Work item '{story_id}' not found in {}; skipping move to 2_current/",
            origin_dir.display()
        );
    }
    Ok(())
}
/// Move a story into `work/5_archived/`.
///
/// * Already in `5_archived/`: no-op, returns `Ok(())`.
/// * Found in `2_current/` (checked first) or `4_merge/`: the file is renamed
///   into `5_archived/`, which is created if missing.
/// * Found in neither: an error is returned.
///
/// Only the filesystem is touched; this function runs no git commands.
///
/// # Errors
/// Returns a message when the story cannot be found, `5_archived/` cannot be
/// created, or the rename fails.
pub fn move_story_to_archived(project_root: &Path, story_id: &str) -> Result<(), String> {
    let work_dir = project_root.join(".story_kit").join("work");
    let file_name = format!("{story_id}.md");
    let archive_dir = work_dir.join("5_archived");
    let destination = archive_dir.join(&file_name);
    if destination.exists() {
        // Idempotent: already archived.
        return Ok(());
    }

    let in_current = work_dir.join("2_current").join(&file_name);
    let in_merge = work_dir.join("4_merge").join(&file_name);
    let (origin, origin_label) = if in_current.exists() {
        (in_current, "work/2_current/")
    } else if in_merge.exists() {
        (in_merge, "work/4_merge/")
    } else {
        return Err(format!(
            "Story '{story_id}' not found in work/2_current/ or work/4_merge/. Cannot accept story."
        ));
    };

    std::fs::create_dir_all(&archive_dir)
        .map_err(|e| format!("Failed to create work/5_archived/ directory: {e}"))?;
    std::fs::rename(&origin, &destination)
        .map_err(|e| format!("Failed to move story '{story_id}' to 5_archived/: {e}"))?;
    eprintln!("[lifecycle] Moved story '{story_id}' from {origin_label} to work/5_archived/");
    Ok(())
}
/// Stage a story/bug for merging by moving it into `work/4_merge/`.
///
/// * Already in `4_merge/`: no-op, returns `Ok(())`.
/// * Found in `2_current/` (manual trigger, checked first) or `3_qa/`
///   (pipeline advancement from QA): the file is renamed into `4_merge/`,
///   which is created if missing.
/// * Found in neither: an error is returned.
///
/// Only the filesystem is touched; this function runs no git commands.
///
/// # Errors
/// Returns a message when the item cannot be found, `4_merge/` cannot be
/// created, or the rename fails.
pub fn move_story_to_merge(project_root: &Path, story_id: &str) -> Result<(), String> {
    let work_dir = project_root.join(".story_kit").join("work");
    let file_name = format!("{story_id}.md");
    let merge_dir = work_dir.join("4_merge");
    let destination = merge_dir.join(&file_name);
    if destination.exists() {
        // Idempotent: already staged for merge.
        return Ok(());
    }

    let in_current = work_dir.join("2_current").join(&file_name);
    let in_qa = work_dir.join("3_qa").join(&file_name);
    let (origin, origin_label) = if in_current.exists() {
        (in_current, "work/2_current/")
    } else if in_qa.exists() {
        (in_qa, "work/3_qa/")
    } else {
        return Err(format!(
            "Work item '{story_id}' not found in work/2_current/ or work/3_qa/. Cannot move to 4_merge/."
        ));
    };

    std::fs::create_dir_all(&merge_dir)
        .map_err(|e| format!("Failed to create work/4_merge/ directory: {e}"))?;
    std::fs::rename(&origin, &destination)
        .map_err(|e| format!("Failed to move '{story_id}' to 4_merge/: {e}"))?;
    eprintln!("[lifecycle] Moved '{story_id}' from {origin_label} to work/4_merge/");
    Ok(())
}
/// Stage a story/bug for QA review by moving it from `work/2_current/` to
/// `work/3_qa/`.
///
/// * Already in `3_qa/`: no-op, returns `Ok(())`.
/// * Present in `2_current/`: the file is renamed into `3_qa/`, which is
///   created if missing.
/// * Otherwise an error is returned.
///
/// Only the filesystem is touched; this function runs no git commands.
///
/// # Errors
/// Returns a message when the item is not in `2_current/`, `3_qa/` cannot be
/// created, or the rename fails.
pub fn move_story_to_qa(project_root: &Path, story_id: &str) -> Result<(), String> {
    let work_dir = project_root.join(".story_kit").join("work");
    let file_name = format!("{story_id}.md");
    let qa_dir = work_dir.join("3_qa");
    let destination = qa_dir.join(&file_name);
    if destination.exists() {
        // Idempotent: already in QA.
        return Ok(());
    }

    let origin = work_dir.join("2_current").join(&file_name);
    if origin.exists() {
        std::fs::create_dir_all(&qa_dir)
            .map_err(|e| format!("Failed to create work/3_qa/ directory: {e}"))?;
        std::fs::rename(&origin, &destination)
            .map_err(|e| format!("Failed to move '{story_id}' to 3_qa/: {e}"))?;
        eprintln!("[lifecycle] Moved '{story_id}' from work/2_current/ to work/3_qa/");
        Ok(())
    } else {
        Err(format!(
            "Work item '{story_id}' not found in work/2_current/. Cannot move to 3_qa/."
        ))
    }
}
/// Close a bug by moving its file into the archive directory
/// (`work/5_archived/`).
///
/// * Already archived: no-op, returns `Ok(())`.
/// * Found in `2_current/` (checked first) or still in `1_upcoming/` (never
///   started): the file is renamed into the archive directory, which is
///   created if missing.
/// * Found in neither: an error is returned.
///
/// Only the filesystem is touched; this function runs no git commands.
///
/// # Errors
/// Returns a message when the bug cannot be found, the archive directory
/// cannot be created, or the rename fails.
pub fn close_bug_to_archive(project_root: &Path, bug_id: &str) -> Result<(), String> {
    let file_name = format!("{bug_id}.md");
    let work_dir = project_root.join(".story_kit").join("work");
    let archive_dir = item_archive_dir(project_root, bug_id);
    let destination = archive_dir.join(&file_name);
    if destination.exists() {
        return Ok(());
    }

    // Probed lazily and in order: 2_current/ first, then 1_upcoming/.
    let candidates = [
        work_dir.join("2_current").join(&file_name),
        work_dir.join("1_upcoming").join(&file_name),
    ];
    let origin = candidates.iter().find(|path| path.exists()).ok_or_else(|| {
        format!("Bug '{bug_id}' not found in work/2_current/ or work/1_upcoming/. Cannot close bug.")
    })?;

    std::fs::create_dir_all(&archive_dir)
        .map_err(|e| format!("Failed to create work/5_archived/ directory: {e}"))?;
    std::fs::rename(origin, &destination)
        .map_err(|e| format!("Failed to move bug '{bug_id}' to 5_archived/: {e}"))?;
    eprintln!("[lifecycle] Closed bug '{bug_id}' → work/5_archived/");
    Ok(())
}
// ── Acceptance-gate helpers ───────────────────────────────────────────────────
/// Check whether the given directory has any uncommitted git changes.
///
/// Runs `git status --porcelain` in `path`.
///
/// # Errors
/// Returns `Err` with a descriptive message when:
/// * `git` cannot be started,
/// * `git status` exits unsuccessfully (for example because `path` is not
///   inside a git repository) — an unknown state is never reported as clean,
/// * the worktree has uncommitted changes (the porcelain listing is included).
fn check_uncommitted_changes(path: &Path) -> Result<(), String> {
    let output = Command::new("git")
        .args(["status", "--porcelain"])
        .current_dir(path)
        .output()
        .map_err(|e| format!("Failed to run git status: {e}"))?;

    // A failed `git status` prints nothing on stdout; without this check that
    // empty output would be mistaken for a clean worktree.
    if !output.status.success() {
        let stderr = String::from_utf8_lossy(&output.stderr);
        return Err(format!(
            "git status failed in {}: {}",
            path.display(),
            stderr.trim()
        ));
    }

    let stdout = String::from_utf8_lossy(&output.stdout);
    if !stdout.trim().is_empty() {
        return Err(format!(
            "Worktree has uncommitted changes. Please commit all work before \
             the agent exits:\n{stdout}"
        ));
    }
    Ok(())
}
/// Run the project's test suite.
///
/// Uses `script/test` if present, treating it as the canonical single test entry point.
/// Falls back to `cargo nextest run` when `script/test` is absent, and to
/// `cargo test` when nextest is not available — either because the command
/// could not be started or because cargo reports that it has no `nextest`
/// subcommand.
///
/// Returns `(tests_passed, output)`, where `output` is a section header
/// followed by the command's stdout and stderr.
///
/// # Errors
/// Returns a message when `script/test` or `cargo test` cannot be started.
fn run_project_tests(path: &Path) -> Result<(bool, String), String> {
    /// Concatenate a finished command's stdout and stderr.
    fn combined(run: &std::process::Output) -> String {
        format!(
            "{}{}",
            String::from_utf8_lossy(&run.stdout),
            String::from_utf8_lossy(&run.stderr)
        )
    }

    let script_test = path.join("script").join("test");
    if script_test.exists() {
        let result = Command::new(&script_test)
            .current_dir(path)
            .output()
            .map_err(|e| format!("Failed to run script/test: {e}"))?;
        let mut output = String::from("=== script/test ===\n");
        output.push_str(&combined(&result));
        output.push('\n');
        return Ok((result.status.success(), output));
    }

    // Fallback: cargo nextest run / cargo test
    let run_cargo_test = || {
        Command::new("cargo")
            .args(["test"])
            .current_dir(path)
            .output()
            .map_err(|e| format!("Failed to run cargo test: {e}"))
    };
    let nextest = Command::new("cargo")
        .args(["nextest", "run"])
        .current_dir(path)
        .output();
    let result = match nextest {
        Ok(run) => {
            // When cargo-nextest is not installed, cargo itself starts fine
            // but exits with "no such command"; that is "nextest unavailable",
            // not a test failure.
            let stderr = String::from_utf8_lossy(&run.stderr);
            let nextest_missing = !run.status.success()
                && (stderr.contains("no such command") || stderr.contains("no such subcommand"));
            if nextest_missing {
                run_cargo_test()?
            } else {
                run
            }
        }
        // The command could not be started at all — try plain cargo test.
        Err(_) => run_cargo_test()?,
    };

    let mut output = String::from("=== tests ===\n");
    output.push_str(&combined(&result));
    output.push('\n');
    Ok((result.status.success(), output))
}
/// Run the acceptance gates in `path`: `cargo clippy --all-targets
/// --all-features` followed by the project test suite (see
/// `run_project_tests`).
///
/// Both gates always run; a clippy failure does not skip the tests.
/// Returns `(gates_passed, combined_output)`, where `gates_passed` is `true`
/// only if both succeeded.
///
/// # Errors
/// Returns a message when `cargo clippy` cannot be started, or when
/// `run_project_tests` itself returns an error.
fn run_acceptance_gates(path: &Path) -> Result<(bool, String), String> {
    // ── cargo clippy ──────────────────────────────────────────────
    let clippy = Command::new("cargo")
        .args(["clippy", "--all-targets", "--all-features"])
        .current_dir(path)
        .output()
        .map_err(|e| format!("Failed to run cargo clippy: {e}"))?;
    let clippy_ok = clippy.status.success();

    let mut report = String::from("=== cargo clippy ===\n");
    report.push_str(&String::from_utf8_lossy(&clippy.stdout));
    report.push_str(&String::from_utf8_lossy(&clippy.stderr));
    report.push('\n');

    // ── tests (script/test if available, else cargo nextest/test) ─
    let (tests_ok, tests_report) = run_project_tests(path)?;
    report.push_str(&tests_report);

    Ok((clippy_ok && tests_ok, report))
}
/// Run `script/test_coverage` in `path`, if that script exists.
///
/// Used as a QA gate before advancing a story from `3_qa/` to `4_merge/`.
/// Returns `(passed, output)`: `passed` is the script's exit success and
/// `output` is a header followed by its stdout and stderr. When the script is
/// absent the gate is skipped and reported as passed.
///
/// # Errors
/// Returns a message when the script exists but cannot be started.
fn run_coverage_gate(path: &Path) -> Result<(bool, String), String> {
    let script = path.join("script").join("test_coverage");
    if script.exists() {
        let run = Command::new(&script)
            .current_dir(path)
            .output()
            .map_err(|e| format!("Failed to run script/test_coverage: {e}"))?;
        let mut report = String::from("=== script/test_coverage ===\n");
        report.push_str(&String::from_utf8_lossy(&run.stdout));
        report.push_str(&String::from_utf8_lossy(&run.stderr));
        report.push('\n');
        Ok((run.status.success(), report))
    } else {
        Ok((
            true,
            "script/test_coverage not found; coverage gate skipped.\n".to_string(),
        ))
    }
}
// ── Mergemaster helpers ───────────────────────────────────────────────────────
/// Squash-merge a feature branch into the current branch in the project root.
///
/// Runs `git merge --squash <branch>` followed by
/// `git commit -m "story-kit: merge <story_id>"`.
///
/// Returns `(success, had_conflicts, conflict_details, output)`:
/// * merge step failed → `(false, true, Some(details), output)` after the
///   half-applied merge has been rolled back (see below),
/// * commit succeeded, or there was nothing to commit → `(true, false, None, output)`,
/// * commit failed for another reason → `(false, false, None, output)`.
///
/// Rollback: a squash merge never records `MERGE_HEAD`, so `git merge --abort`
/// has nothing to abort. When the abort fails and the index contains unmerged
/// entries, `git reset --merge` is used to restore the pre-merge state. The
/// reset is limited to that case so that a merge which refused to start never
/// causes unrelated staged work to be reset.
///
/// # Errors
/// Returns a message when `git merge` or `git commit` cannot be started.
fn run_squash_merge(
    project_root: &Path,
    branch: &str,
    story_id: &str,
) -> Result<(bool, bool, Option<String>, String), String> {
    let mut all_output = String::new();

    // ── git merge --squash ────────────────────────────────────────
    all_output.push_str(&format!("=== git merge --squash {branch} ===\n"));
    let merge = Command::new("git")
        .args(["merge", "--squash", branch])
        .current_dir(project_root)
        .output()
        .map_err(|e| format!("Failed to run git merge: {e}"))?;
    let merge_stdout = String::from_utf8_lossy(&merge.stdout).to_string();
    let merge_stderr = String::from_utf8_lossy(&merge.stderr).to_string();
    all_output.push_str(&merge_stdout);
    all_output.push_str(&merge_stderr);
    all_output.push('\n');

    if !merge.status.success() {
        // Conflicts detected — roll the merge back and report.
        let conflict_details = format!(
            "Merge conflicts in branch '{branch}':\n{merge_stdout}{merge_stderr}"
        );

        // Try the regular abort first (works when MERGE_HEAD exists).
        let aborted = Command::new("git")
            .args(["merge", "--abort"])
            .current_dir(project_root)
            .output()
            .map(|out| out.status.success())
            .unwrap_or(false);

        if !aborted {
            // Only reset when the merge really left unmerged entries behind.
            let has_unmerged = Command::new("git")
                .args(["ls-files", "--unmerged"])
                .current_dir(project_root)
                .output()
                .map(|out| out.status.success() && !out.stdout.is_empty())
                .unwrap_or(false);
            if has_unmerged {
                match Command::new("git")
                    .args(["reset", "--merge"])
                    .current_dir(project_root)
                    .output()
                {
                    Ok(reset) if reset.status.success() => {}
                    Ok(reset) => {
                        all_output.push_str("=== git reset --merge failed ===\n");
                        all_output.push_str(&String::from_utf8_lossy(&reset.stderr));
                        all_output.push('\n');
                    }
                    Err(e) => {
                        all_output.push_str(&format!("=== git reset --merge failed ===\n{e}\n"));
                    }
                }
            }
        }

        all_output.push_str("=== Merge aborted due to conflicts ===\n");
        return Ok((false, true, Some(conflict_details), all_output));
    }

    // ── git commit ─────────────────────────────────────────────
    all_output.push_str("=== git commit ===\n");
    let commit_msg = format!("story-kit: merge {story_id}");
    let commit = Command::new("git")
        .args(["commit", "-m", &commit_msg])
        .current_dir(project_root)
        .output()
        .map_err(|e| format!("Failed to run git commit: {e}"))?;
    let commit_stdout = String::from_utf8_lossy(&commit.stdout).to_string();
    let commit_stderr = String::from_utf8_lossy(&commit.stderr).to_string();
    all_output.push_str(&commit_stdout);
    all_output.push_str(&commit_stderr);
    all_output.push('\n');

    if !commit.status.success() {
        // Nothing to commit (e.g. empty diff) — treat as success.
        let nothing_to_commit = commit_stderr.contains("nothing to commit")
            || commit_stdout.contains("nothing to commit");
        return Ok((nothing_to_commit, false, None, all_output));
    }
    Ok((true, false, None, all_output))
}
/// Run the post-merge quality gates in the project root.
///
/// Gates, in order:
/// 1. `cargo clippy --all-targets --all-features`
/// 2. the project test suite via `run_project_tests`
/// 3. if `frontend/` exists: `pnpm run build` in that directory
/// 4. if `frontend/` exists and there is no `script/test`: `pnpm test --run`
///    (when `script/test` exists it is expected to cover frontend tests)
///
/// A failing gate does not stop later gates. Returns
/// `(gates_passed, combined_output)`, with `gates_passed` true only when every
/// gate that ran succeeded.
///
/// # Errors
/// Returns a message when `cargo clippy`, `pnpm run build` or `pnpm test`
/// cannot be started, or when `run_project_tests` returns an error.
fn run_merge_quality_gates(project_root: &Path) -> Result<(bool, String), String> {
    // stdout + stderr of a finished command, terminated by a blank line.
    let render = |run: &std::process::Output| -> String {
        format!(
            "{}{}\n",
            String::from_utf8_lossy(&run.stdout),
            String::from_utf8_lossy(&run.stderr)
        )
    };

    let mut report = String::new();
    let mut passed = true;

    // ── cargo clippy ──────────────────────────────────────────────
    let clippy = Command::new("cargo")
        .args(["clippy", "--all-targets", "--all-features"])
        .current_dir(project_root)
        .output()
        .map_err(|e| format!("Failed to run cargo clippy: {e}"))?;
    report.push_str("=== cargo clippy ===\n");
    report.push_str(&render(&clippy));
    passed &= clippy.status.success();

    // ── tests (script/test if available, else cargo nextest/test) ─
    let (tests_ok, tests_report) = run_project_tests(project_root)?;
    report.push_str(&tests_report);
    passed &= tests_ok;

    // ── frontend gates (only if frontend/ directory exists) ───────
    let frontend_dir = project_root.join("frontend");
    if !frontend_dir.exists() {
        return Ok((passed, report));
    }

    report.push_str("=== pnpm build ===\n");
    let build = Command::new("pnpm")
        .args(["run", "build"])
        .current_dir(&frontend_dir)
        .output()
        .map_err(|e| format!("Failed to run pnpm build: {e}"))?;
    report.push_str(&render(&build));
    passed &= build.status.success();

    // script/test, when present, is expected to cover the frontend tests, so
    // pnpm test only runs standalone when that script is absent.
    if !project_root.join("script").join("test").exists() {
        report.push_str("=== pnpm test ===\n");
        let frontend_tests = Command::new("pnpm")
            .args(["test", "--run"])
            .current_dir(&frontend_dir)
            .output()
            .map_err(|e| format!("Failed to run pnpm test: {e}"))?;
        report.push_str(&render(&frontend_tests));
        passed &= frontend_tests.status.success();
    }

    Ok((passed, report))
}
/// Run the agent command in a PTY on a blocking thread and stream its events
/// through the broadcast channel.
///
/// All borrowed arguments are copied so they can move into the blocking task,
/// which delegates to `run_agent_pty_blocking`. Resolves to that function's
/// result (the session ID, if one was seen), or to an error if the blocking
/// task could not be joined.
#[allow(clippy::too_many_arguments)]
async fn run_agent_pty_streaming(
    story_id: &str,
    agent_name: &str,
    command: &str,
    args: &[String],
    prompt: &str,
    cwd: &str,
    tx: &broadcast::Sender<AgentEvent>,
    event_log: &Arc<Mutex<Vec<AgentEvent>>>,
) -> Result<Option<String>, String> {
    let owned_story = story_id.to_owned();
    let owned_agent = agent_name.to_owned();
    let owned_command = command.to_owned();
    let owned_args = args.to_vec();
    let owned_prompt = prompt.to_owned();
    let owned_cwd = cwd.to_owned();
    let sender = tx.clone();
    let log = Arc::clone(event_log);

    let worker = tokio::task::spawn_blocking(move || {
        run_agent_pty_blocking(
            &owned_story,
            &owned_agent,
            &owned_command,
            &owned_args,
            &owned_prompt,
            &owned_cwd,
            &sender,
            &log,
        )
    });

    match worker.await {
        Ok(outcome) => outcome,
        Err(e) => Err(format!("Agent task panicked: {e}")),
    }
}
/// Record an event in the agent's event log and broadcast it to subscribers.
///
/// Both steps are best-effort: a poisoned log mutex skips the log entry, and
/// a failed broadcast send is ignored.
fn emit_event(
    event: AgentEvent,
    tx: &broadcast::Sender<AgentEvent>,
    event_log: &Mutex<Vec<AgentEvent>>,
) {
    match event_log.lock() {
        Ok(mut entries) => entries.push(event.clone()),
        Err(_) => {}
    }
    let _ = tx.send(event);
}
/// Blocking worker: spawn the agent command inside a PTY, read its output
/// line by line, and forward it as `AgentEvent`s.
///
/// The command is invoked as
/// `<command> -p <prompt> <args…> --output-format stream-json --verbose
/// --permission-mode bypassPermissions` with `cwd` as working directory.
///
/// Per output line:
/// * blank lines are skipped,
/// * lines that are not valid JSON are emitted as `AgentEvent::Output`,
/// * JSON with `"type": "system"` updates the remembered session ID,
/// * JSON with `"type": "assistant"` has each text block of
///   `message.content` emitted as `AgentEvent::Output`,
/// * every JSON line is additionally forwarded as `AgentEvent::AgentJson`.
///
/// Returns the last session ID taken from a `system` event (or `None`).
/// Errors are returned only for PTY setup / spawn failures.
#[allow(clippy::too_many_arguments)]
fn run_agent_pty_blocking(
story_id: &str,
agent_name: &str,
command: &str,
args: &[String],
prompt: &str,
cwd: &str,
tx: &broadcast::Sender<AgentEvent>,
event_log: &Mutex<Vec<AgentEvent>>,
) -> Result<Option<String>, String> {
let pty_system = native_pty_system();
// Fixed 50x200 terminal; pixel dimensions are left at 0.
let pair = pty_system
.openpty(PtySize {
rows: 50,
cols: 200,
pixel_width: 0,
pixel_height: 0,
})
.map_err(|e| format!("Failed to open PTY: {e}"))?;
let mut cmd = CommandBuilder::new(command);
// -p <prompt> must come first
cmd.arg("-p");
cmd.arg(prompt);
// Add configured args (e.g., --directory /path/to/worktree, --model, etc.)
for arg in args {
cmd.arg(arg);
}
cmd.arg("--output-format");
cmd.arg("stream-json");
cmd.arg("--verbose");
// Supervised agents don't need interactive permission prompts
cmd.arg("--permission-mode");
cmd.arg("bypassPermissions");
cmd.cwd(cwd);
cmd.env("NO_COLOR", "1");
// Allow spawning Claude Code from within a Claude Code session
cmd.env_remove("CLAUDECODE");
cmd.env_remove("CLAUDE_CODE_ENTRYPOINT");
eprintln!("[agent:{story_id}:{agent_name}] Spawning {command} in {cwd} with args: {args:?}");
let mut child = pair
.slave
.spawn_command(cmd)
.map_err(|e| format!("Failed to spawn agent for {story_id}:{agent_name}: {e}"))?;
// The child has been spawned; this side's slave handle is released here.
// NOTE(review): presumably needed so the reader sees end-of-stream once the child exits — confirm.
drop(pair.slave);
let reader = pair
.master
.try_clone_reader()
.map_err(|e| format!("Failed to clone PTY reader: {e}"))?;
// Only the cloned reader is used from here on.
drop(pair.master);
let buf_reader = BufReader::new(reader);
// Overwritten by every "system" event, so the last one seen wins.
let mut session_id: Option<String> = None;
for line in buf_reader.lines() {
let line = match line {
Ok(l) => l,
// Any read error ends streaming.
// NOTE(review): `lines()` also returns an error for output that is not valid UTF-8,
// which would stop streaming here and reach the kill below — confirm this is intended.
Err(_) => break,
};
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}
// Try to parse as JSON
let json: serde_json::Value = match serde_json::from_str(trimmed) {
Ok(j) => j,
Err(_) => {
// Non-JSON output (terminal escapes etc.) — send as raw output
emit_event(
AgentEvent::Output {
story_id: story_id.to_string(),
agent_name: agent_name.to_string(),
text: trimmed.to_string(),
},
tx,
event_log,
);
continue;
}
};
// A missing or non-string "type" is treated as "" and matches no special case below.
let event_type = json.get("type").and_then(|t| t.as_str()).unwrap_or("");
match event_type {
"system" => {
// A system event without a string `session_id` resets this to `None`.
session_id = json
.get("session_id")
.and_then(|s| s.as_str())
.map(|s| s.to_string());
}
"assistant" => {
// Surface each text block of the assistant message as plain output.
if let Some(message) = json.get("message")
&& let Some(content) = message.get("content").and_then(|c| c.as_array())
{
for block in content {
if let Some(text) = block.get("text").and_then(|t| t.as_str()) {
emit_event(
AgentEvent::Output {
story_id: story_id.to_string(),
agent_name: agent_name.to_string(),
text: text.to_string(),
},
tx,
event_log,
);
}
}
}
}
_ => {}
}
// Forward all JSON events
emit_event(
AgentEvent::AgentJson {
story_id: story_id.to_string(),
agent_name: agent_name.to_string(),
data: json,
},
tx,
event_log,
);
}
// The read loop has ended: kill is best-effort (errors ignored), then wait on the child.
let _ = child.kill();
let _ = child.wait();
eprintln!(
"[agent:{story_id}:{agent_name}] Done. Session: {:?}",
session_id
);
Ok(session_id)
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn wait_for_agent_returns_immediately_if_completed() {
    // An agent that is already Completed must be reported without waiting
    // for any event.
    let pool = AgentPool::new(3001);
    pool.inject_test_agent("s1", "bot", AgentStatus::Completed);

    let outcome = pool.wait_for_agent("s1", "bot", 1000).await.unwrap();

    assert_eq!(outcome.status, AgentStatus::Completed);
    assert_eq!(outcome.story_id, "s1");
    assert_eq!(outcome.agent_name, "bot");
}
#[tokio::test]
async fn wait_for_agent_returns_immediately_if_failed() {
    // Failed is terminal too: the wait must return the stored status.
    let pool = AgentPool::new(3001);
    pool.inject_test_agent("s2", "bot", AgentStatus::Failed);

    let outcome = pool.wait_for_agent("s2", "bot", 1000).await.unwrap();

    assert_eq!(outcome.status, AgentStatus::Failed);
}
#[tokio::test]
async fn wait_for_agent_completes_on_done_event() {
    let pool = AgentPool::new(3001);
    let sender = pool.inject_test_agent("s3", "bot", AgentStatus::Running);

    // Broadcast Done after a short delay. Real code would also update the
    // pool entry; here it stays Running on purpose, because the point of the
    // test is that the wait loop reacts to the event itself.
    let delayed_sender = sender.clone();
    tokio::spawn(async move {
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        let done = AgentEvent::Done {
            story_id: "s3".to_string(),
            agent_name: "bot".to_string(),
            session_id: Some("sess-abc".to_string()),
        };
        let _ = delayed_sender.send(done);
    });

    let outcome = pool.wait_for_agent("s3", "bot", 2000).await.unwrap();

    // The status comes from the pool entry (still Running here); what matters
    // is that the wait returned instead of timing out.
    assert_eq!(outcome.story_id, "s3");
}
#[tokio::test]
async fn wait_for_agent_times_out() {
    // A Running agent that never emits an event must hit the 50 ms timeout.
    let pool = AgentPool::new(3001);
    pool.inject_test_agent("s4", "bot", AgentStatus::Running);

    let outcome = pool.wait_for_agent("s4", "bot", 50).await;

    assert!(outcome.is_err());
    let message = outcome.unwrap_err();
    assert!(message.contains("Timed out"), "unexpected message: {message}");
}
#[tokio::test]
async fn wait_for_agent_errors_for_nonexistent() {
    // Waiting on an agent that was never registered is an error.
    let pool = AgentPool::new(3001);

    let outcome = pool.wait_for_agent("no_story", "no_bot", 100).await;

    assert!(outcome.is_err());
}
#[tokio::test]
async fn wait_for_agent_completes_on_stopped_status_event() {
    let pool = AgentPool::new(3001);
    let sender = pool.inject_test_agent("s5", "bot", AgentStatus::Running);

    // A "stopped" status event, sent shortly after the wait begins, must
    // also unblock the waiter.
    let delayed_sender = sender.clone();
    tokio::spawn(async move {
        tokio::time::sleep(std::time::Duration::from_millis(30)).await;
        let stopped = AgentEvent::Status {
            story_id: "s5".to_string(),
            agent_name: "bot".to_string(),
            status: "stopped".to_string(),
        };
        let _ = delayed_sender.send(stopped);
    });

    let outcome = pool.wait_for_agent("s5", "bot", 2000).await.unwrap();

    assert_eq!(outcome.story_id, "s5");
}
// ── report_completion tests ────────────────────────────────────
#[tokio::test]
async fn report_completion_rejects_nonexistent_agent() {
    // Reporting completion for an unknown story/agent pair must fail with a
    // "No agent" message.
    let pool = AgentPool::new(3001);

    let outcome = pool.report_completion("no_story", "no_bot", "done").await;

    assert!(outcome.is_err());
    let message = outcome.unwrap_err();
    assert!(message.contains("No agent"), "unexpected: {message}");
}
#[tokio::test]
async fn report_completion_rejects_non_running_agent() {
    // Only a running agent may report completion; one that is already
    // Completed must be rejected.
    let pool = AgentPool::new(3001);
    pool.inject_test_agent("s6", "bot", AgentStatus::Completed);

    let outcome = pool.report_completion("s6", "bot", "done").await;

    assert!(outcome.is_err());
    let message = outcome.unwrap_err();
    assert!(
        message.contains("not running"),
        "expected 'not running' in: {message}"
    );
}
#[tokio::test]
async fn report_completion_rejects_dirty_worktree() {
    use std::fs;
    use tempfile::tempdir;

    let tmp = tempdir().unwrap();
    let repo = tmp.path();

    // Set up a real git repo with an initial commit.
    for git_args in [&["init"][..], &["commit", "--allow-empty", "-m", "init"][..]] {
        Command::new("git")
            .args(git_args)
            .current_dir(repo)
            .output()
            .unwrap();
    }

    // Leave an untracked file behind so the worktree is dirty.
    fs::write(repo.join("dirty.txt"), "not committed").unwrap();

    let pool = AgentPool::new(3001);
    pool.inject_test_agent_with_path("s7", "bot", AgentStatus::Running, repo.to_path_buf());

    let outcome = pool.report_completion("s7", "bot", "done").await;

    assert!(outcome.is_err());
    let message = outcome.unwrap_err();
    assert!(
        message.contains("uncommitted"),
        "expected 'uncommitted' in: {message}"
    );
}
// ── server-owned completion tests ───────────────────────────────────────────
#[tokio::test]
async fn server_owned_completion_skips_when_already_completed() {
    let pool = AgentPool::new(3001);
    let existing_report = CompletionReport {
        summary: "Already done".to_string(),
        gates_passed: true,
        gate_output: String::new(),
    };
    pool.inject_test_agent_with_completion(
        "s10",
        "coder-1",
        AgentStatus::Completed,
        PathBuf::from("/tmp/nonexistent"),
        existing_report,
    );

    // Subscribe up front so an unexpected Done event would be observable.
    let mut events = pool.subscribe("s10", "coder-1").unwrap();

    run_server_owned_completion(
        &pool.agents,
        pool.port,
        "s10",
        "coder-1",
        Some("sess-1".to_string()),
    )
    .await;

    {
        let agents = pool.agents.lock().unwrap();
        let agent = agents.get(&composite_key("s10", "coder-1")).unwrap();
        // Status stays Completed — the gates were not re-run.
        assert_eq!(agent.status, AgentStatus::Completed);
        // The original report must not be overwritten.
        assert_eq!(
            agent.completion.as_ref().unwrap().summary,
            "Already done"
        );
    }

    // No Done event should have been emitted.
    assert!(
        events.try_recv().is_err(),
        "should not emit Done when completion already exists"
    );
}
#[tokio::test]
async fn server_owned_completion_runs_gates_on_clean_worktree() {
    use tempfile::tempdir;

    let tmp = tempdir().unwrap();
    let repo = tmp.path();
    init_git_repo(repo);

    let pool = AgentPool::new(3001);
    pool.inject_test_agent_with_path(
        "s11",
        "coder-1",
        AgentStatus::Running,
        repo.to_path_buf(),
    );
    let mut events = pool.subscribe("s11", "coder-1").unwrap();

    run_server_owned_completion(
        &pool.agents,
        pool.port,
        "s11",
        "coder-1",
        Some("sess-2".to_string()),
    )
    .await;

    {
        let agents = pool.agents.lock().unwrap();
        let agent = agents.get(&composite_key("s11", "coder-1")).unwrap();

        // The gates ran, so a report exists. They may well have failed,
        // because the temp repo is not a real Cargo project.
        assert!(
            agent.completion.is_some(),
            "completion report should be created"
        );
        assert_eq!(
            agent.completion.as_ref().unwrap().summary,
            "Agent process exited normally"
        );
        // The session ID is recorded on the agent.
        assert_eq!(agent.session_id, Some("sess-2".to_string()));
        // Either terminal status is acceptable, depending on the gate result.
        assert!(
            agent.status == AgentStatus::Completed || agent.status == AgentStatus::Failed,
            "status should be terminal, got: {:?}",
            agent.status
        );
    }

    // A Done event should have been emitted.
    let event = events.try_recv().expect("should emit Done event");
    assert!(
        matches!(event, AgentEvent::Done { .. }),
        "expected Done event, got: {event:?}"
    );
}
#[tokio::test]
async fn server_owned_completion_fails_on_dirty_worktree() {
use std::fs;
use tempfile::tempdir;
let tmp = tempdir().unwrap();
let repo = tmp.path();
init_git_repo(repo);
// Create an uncommitted file.
fs::write(repo.join("dirty.txt"), "not committed").unwrap();
let pool = AgentPool::new(3001);
pool.inject_test_agent_with_path(
"s12",
"coder-1",
AgentStatus::Running,
repo.to_path_buf(),
);
run_server_owned_completion(&pool.agents, pool.port, "s12", "coder-1", None)
.await;
let agents = pool.agents.lock().unwrap();
let key = composite_key("s12", "coder-1");
let agent = agents.get(&key).unwrap();
assert!(agent.completion.is_some());
assert!(!agent.completion.as_ref().unwrap().gates_passed);
assert_eq!(agent.status, AgentStatus::Failed);
assert!(
agent
.completion
.as_ref()
.unwrap()
.gate_output
.contains("uncommitted"),
"gate_output should mention uncommitted changes"
);
}
/// Checks that server-owned completion for an agent that is not in the pool
/// returns without panicking.
#[tokio::test]
async fn server_owned_completion_nonexistent_agent_is_noop() {
    let empty_pool = AgentPool::new(3001);
    let (story, agent) = ("nonexistent", "bot");
    // Reaching the end of this test without a panic is the whole assertion.
    run_server_owned_completion(&empty_pool.agents, empty_pool.port, story, agent, None).await;
}
// ── move_story_to_current tests ────────────────────────────────────────────
// The move_story_* tests need no git repo: the watcher handles commits asynchronously.
// `init_git_repo` below is a shared helper for the tests that do need one.
/// Initialise a throwaway git repository at `repo` for use in tests.
///
/// Runs, in order: `git init`, points the still-unborn `HEAD` at
/// `refs/heads/master`, sets a repository-local `user.email` / `user.name`,
/// and creates an empty initial commit.
///
/// The branch is pinned because `git init` picks its initial branch name from
/// the user's `init.defaultBranch` setting, while tests in this module check
/// out `master` by name.
///
/// # Panics
///
/// Panics if `git` cannot be spawned or if any of the commands exits with a
/// non-zero status, so that a broken fixture fails loudly instead of letting
/// the calling test continue against a half-initialised repository.
fn init_git_repo(repo: &std::path::Path) {
    let steps: [&[&str]; 5] = [
        &["init"],
        // Works on an unborn HEAD and is a no-op when the default is already master.
        &["symbolic-ref", "HEAD", "refs/heads/master"],
        &["config", "user.email", "test@test.com"],
        &["config", "user.name", "Test"],
        &["commit", "--allow-empty", "-m", "init"],
    ];

    for args in steps.iter() {
        let out = Command::new("git")
            .args(*args)
            .current_dir(repo)
            .output()
            .expect("failed to spawn git");
        assert!(
            out.status.success(),
            "git {:?} failed: {}",
            args,
            String::from_utf8_lossy(&out.stderr)
        );
    }
}
/// Checks that a story file in `1_upcoming/` ends up in `2_current/`.
#[test]
fn move_story_to_current_moves_file() {
    let tmp = tempfile::tempdir().unwrap();
    let project = tmp.path();

    let upcoming_dir = project.join(".story_kit/work/1_upcoming");
    let current_dir = project.join(".story_kit/work/2_current");
    for dir in [&upcoming_dir, &current_dir] {
        std::fs::create_dir_all(dir).unwrap();
    }

    let before = upcoming_dir.join("10_story_foo.md");
    std::fs::write(&before, "test").unwrap();

    move_story_to_current(project, "10_story_foo").unwrap();

    let after = current_dir.join("10_story_foo.md");
    assert!(!before.exists());
    assert!(after.exists());
}
/// Checks that moving a story which already sits in `2_current/` succeeds and
/// leaves the file where it is.
#[test]
fn move_story_to_current_is_idempotent_when_already_current() {
    let tmp = tempfile::tempdir().unwrap();
    let project = tmp.path();

    let current_dir = project.join(".story_kit/work/2_current");
    std::fs::create_dir_all(&current_dir).unwrap();

    let story_path = current_dir.join("11_story_foo.md");
    std::fs::write(&story_path, "test").unwrap();

    move_story_to_current(project, "11_story_foo").unwrap();

    assert!(story_path.exists());
}
/// Checks that asking to move a story that exists nowhere returns `Ok`.
#[test]
fn move_story_to_current_noop_when_not_in_upcoming() {
    let empty_project = tempfile::tempdir().unwrap();
    let outcome = move_story_to_current(empty_project.path(), "99_missing");
    assert!(outcome.is_ok());
}
/// Checks that a bug item is moved from `1_upcoming/` to `2_current/` by
/// `move_story_to_current`, the same as a story.
#[test]
fn move_bug_to_current_moves_from_upcoming() {
    let tmp = tempfile::tempdir().unwrap();
    let project = tmp.path();

    let upcoming_dir = project.join(".story_kit/work/1_upcoming");
    let current_dir = project.join(".story_kit/work/2_current");
    std::fs::create_dir_all(&upcoming_dir).unwrap();
    std::fs::create_dir_all(&current_dir).unwrap();

    let file_name = "1_bug_test.md";
    std::fs::write(upcoming_dir.join(file_name), "# Bug 1\n").unwrap();

    move_story_to_current(project, "1_bug_test").unwrap();

    let still_upcoming = upcoming_dir.join(file_name).exists();
    let now_current = current_dir.join(file_name).exists();
    assert!(!still_upcoming);
    assert!(now_current);
}
/// Checks that closing a bug found in `2_current/` moves its file into
/// `5_archived/`.
#[test]
fn close_bug_moves_from_current_to_archive() {
    let tmp = tempfile::tempdir().unwrap();
    let project = tmp.path();

    let current_dir = project.join(".story_kit/work/2_current");
    std::fs::create_dir_all(&current_dir).unwrap();
    let open_bug = current_dir.join("2_bug_test.md");
    std::fs::write(&open_bug, "# Bug 2\n").unwrap();

    close_bug_to_archive(project, "2_bug_test").unwrap();

    let archived_bug = project.join(".story_kit/work/5_archived/2_bug_test.md");
    assert!(!open_bug.exists());
    assert!(archived_bug.exists());
}
/// Checks that closing a bug that is still in `1_upcoming/` moves its file
/// into `5_archived/`.
#[test]
fn close_bug_moves_from_upcoming_when_not_started() {
    let tmp = tempfile::tempdir().unwrap();
    let project = tmp.path();

    let upcoming_dir = project.join(".story_kit/work/1_upcoming");
    std::fs::create_dir_all(&upcoming_dir).unwrap();
    let pending_bug = upcoming_dir.join("3_bug_test.md");
    std::fs::write(&pending_bug, "# Bug 3\n").unwrap();

    close_bug_to_archive(project, "3_bug_test").unwrap();

    let archived_bug = project.join(".story_kit/work/5_archived/3_bug_test.md");
    assert!(!pending_bug.exists());
    assert!(archived_bug.exists());
}
/// Table-driven check of the item type reported for bug, spike and story ids.
#[test]
fn item_type_from_id_detects_types() {
    let cases = [
        ("1_bug_test", "bug"),
        ("1_spike_research", "spike"),
        ("50_story_my_story", "story"),
        ("1_story_simple", "story"),
    ];
    for (item_id, expected) in cases {
        assert_eq!(item_type_from_id(item_id), expected);
    }
}
// ── pipeline_stage tests ──────────────────────────────────────────────────
/// Checks that each numbered `coder-N` agent name maps to the coder stage.
#[test]
fn pipeline_stage_detects_coders() {
    for agent_name in ["coder-1", "coder-2", "coder-3"] {
        assert_eq!(pipeline_stage(agent_name), PipelineStage::Coder);
    }
}
/// Checks that the `qa` agent name maps to the QA stage.
#[test]
fn pipeline_stage_detects_qa() {
    let stage = pipeline_stage("qa");
    assert_eq!(stage, PipelineStage::Qa);
}
/// Checks that the `mergemaster` agent name maps to the mergemaster stage.
#[test]
fn pipeline_stage_detects_mergemaster() {
    let stage = pipeline_stage("mergemaster");
    assert_eq!(stage, PipelineStage::Mergemaster);
}
/// Checks that agent names which are neither coder, QA nor mergemaster fall
/// into the `Other` stage.
#[test]
fn pipeline_stage_supervisor_is_other() {
    for agent_name in ["supervisor", "default", "unknown"] {
        assert_eq!(pipeline_stage(agent_name), PipelineStage::Other);
    }
}
// ── pipeline advance tests ────────────────────────────────────────────────
/// Checks that advancing the pipeline for a completed coder whose gates
/// passed moves the story file from `2_current/` to `3_qa/`.
#[tokio::test]
async fn pipeline_advance_coder_gates_pass_moves_story_to_qa() {
    let tmp = tempfile::tempdir().unwrap();
    let project = tmp.path();

    // Seed the story in 2_current/.
    let current_dir = project.join(".story_kit/work/2_current");
    std::fs::create_dir_all(&current_dir).unwrap();
    let story_in_current = current_dir.join("50_story_test.md");
    std::fs::write(&story_in_current, "test").unwrap();

    let passing_report = CompletionReport {
        summary: "done".to_string(),
        gates_passed: true,
        gate_output: String::new(),
    };
    let pool = AgentPool::new(3001);
    pool.inject_test_agent_with_completion(
        "50_story_test",
        "coder-1",
        AgentStatus::Completed,
        project.to_path_buf(),
        passing_report,
    );

    // Invoke the advance step directly instead of going through the
    // background spawn.
    pool.run_pipeline_advance_for_completed_agent("50_story_test", "coder-1")
        .await;

    // Starting the qa agent is expected to fail under test, but the file move
    // is meant to happen before that point.
    let story_in_qa = project.join(".story_kit/work/3_qa/50_story_test.md");
    assert!(story_in_qa.exists(), "story should be in 3_qa/");
    assert!(
        !story_in_current.exists(),
        "story should not still be in 2_current/"
    );
}
/// Checks that advancing the pipeline for a completed QA agent whose gates
/// passed moves the story file from `3_qa/` to `4_merge/`.
#[tokio::test]
async fn pipeline_advance_qa_gates_pass_moves_story_to_merge() {
    let tmp = tempfile::tempdir().unwrap();
    let project = tmp.path();

    // Seed the story in 3_qa/.
    let qa_stage = project.join(".story_kit/work/3_qa");
    std::fs::create_dir_all(&qa_stage).unwrap();
    let story_in_qa = qa_stage.join("51_story_test.md");
    std::fs::write(&story_in_qa, "test").unwrap();

    let passing_report = CompletionReport {
        summary: "QA done".to_string(),
        gates_passed: true,
        gate_output: String::new(),
    };
    let pool = AgentPool::new(3001);
    pool.inject_test_agent_with_completion(
        "51_story_test",
        "qa",
        AgentStatus::Completed,
        project.to_path_buf(),
        passing_report,
    );

    pool.run_pipeline_advance_for_completed_agent("51_story_test", "qa")
        .await;

    let story_in_merge = project.join(".story_kit/work/4_merge/51_story_test.md");
    assert!(story_in_merge.exists(), "story should be in 4_merge/");
    assert!(
        !story_in_qa.exists(),
        "story should not still be in 3_qa/"
    );
}
/// Checks that a completed `supervisor` agent does not cause the story file
/// to leave `2_current/`.
#[tokio::test]
async fn pipeline_advance_supervisor_does_not_advance() {
    let tmp = tempfile::tempdir().unwrap();
    let project = tmp.path();

    let current_dir = project.join(".story_kit/work/2_current");
    std::fs::create_dir_all(&current_dir).unwrap();
    let story_path = current_dir.join("52_story_test.md");
    std::fs::write(&story_path, "test").unwrap();

    let passing_report = CompletionReport {
        summary: "supervised".to_string(),
        gates_passed: true,
        gate_output: String::new(),
    };
    let pool = AgentPool::new(3001);
    pool.inject_test_agent_with_completion(
        "52_story_test",
        "supervisor",
        AgentStatus::Completed,
        project.to_path_buf(),
        passing_report,
    );

    pool.run_pipeline_advance_for_completed_agent("52_story_test", "supervisor")
        .await;

    // Supervisors are not a pipeline stage, so the file must stay put.
    assert!(
        story_path.exists(),
        "story should still be in 2_current/ for supervisor"
    );
}
// ── move_story_to_merge tests ──────────────────────────────────────────────
/// Checks that a story in `2_current/` is moved into `4_merge/`.
#[test]
fn move_story_to_merge_moves_file() {
    let tmp = tempfile::tempdir().unwrap();
    let project = tmp.path();

    let current_dir = project.join(".story_kit/work/2_current");
    std::fs::create_dir_all(&current_dir).unwrap();
    let before = current_dir.join("20_story_foo.md");
    std::fs::write(&before, "test").unwrap();

    move_story_to_merge(project, "20_story_foo").unwrap();

    let after = project.join(".story_kit/work/4_merge/20_story_foo.md");
    assert!(!before.exists());
    assert!(after.exists());
}
/// Checks that a story in `3_qa/` is moved into `4_merge/`.
#[test]
fn move_story_to_merge_from_qa_dir() {
    let tmp = tempfile::tempdir().unwrap();
    let project = tmp.path();

    let qa_stage = project.join(".story_kit/work/3_qa");
    std::fs::create_dir_all(&qa_stage).unwrap();
    let before = qa_stage.join("40_story_test.md");
    std::fs::write(&before, "test").unwrap();

    move_story_to_merge(project, "40_story_test").unwrap();

    let after = project.join(".story_kit/work/4_merge/40_story_test.md");
    assert!(!before.exists());
    assert!(after.exists());
}
/// Checks that moving a story which already sits in `4_merge/` succeeds and
/// leaves the file in place.
#[test]
fn move_story_to_merge_idempotent_when_already_in_merge() {
    let tmp = tempfile::tempdir().unwrap();
    let project = tmp.path();

    let merge_stage = project.join(".story_kit/work/4_merge");
    std::fs::create_dir_all(&merge_stage).unwrap();
    let story_path = merge_stage.join("21_story_test.md");
    std::fs::write(&story_path, "test").unwrap();

    move_story_to_merge(project, "21_story_test").unwrap();

    assert!(story_path.exists());
}
/// Checks the error text returned when the story is in neither source stage.
#[test]
fn move_story_to_merge_errors_when_not_in_current_or_qa() {
    let empty_project = tempfile::tempdir().unwrap();
    let failure = move_story_to_merge(empty_project.path(), "99_nonexistent").unwrap_err();
    assert!(failure.contains("not found in work/2_current/ or work/3_qa/"));
}
// ── move_story_to_qa tests ────────────────────────────────────────────────
/// Checks that a story in `2_current/` is moved into `3_qa/`.
#[test]
fn move_story_to_qa_moves_file() {
    let tmp = tempfile::tempdir().unwrap();
    let project = tmp.path();

    let current_dir = project.join(".story_kit/work/2_current");
    std::fs::create_dir_all(&current_dir).unwrap();
    let before = current_dir.join("30_story_qa.md");
    std::fs::write(&before, "test").unwrap();

    move_story_to_qa(project, "30_story_qa").unwrap();

    let after = project.join(".story_kit/work/3_qa/30_story_qa.md");
    assert!(!before.exists());
    assert!(after.exists());
}
/// Checks that moving a story which already sits in `3_qa/` succeeds and
/// leaves the file in place.
#[test]
fn move_story_to_qa_idempotent_when_already_in_qa() {
    let tmp = tempfile::tempdir().unwrap();
    let project = tmp.path();

    let qa_stage = project.join(".story_kit/work/3_qa");
    std::fs::create_dir_all(&qa_stage).unwrap();
    let story_path = qa_stage.join("31_story_test.md");
    std::fs::write(&story_path, "test").unwrap();

    move_story_to_qa(project, "31_story_test").unwrap();

    assert!(story_path.exists());
}
/// Checks the error text returned when the story is not in `2_current/`.
#[test]
fn move_story_to_qa_errors_when_not_in_current() {
    let empty_project = tempfile::tempdir().unwrap();
    let failure = move_story_to_qa(empty_project.path(), "99_nonexistent").unwrap_err();
    assert!(failure.contains("not found in work/2_current/"));
}
// ── move_story_to_archived tests ──────────────────────────────────────────
/// Checks that a story in `4_merge/` is moved into `5_archived/`.
#[test]
fn move_story_to_archived_finds_in_merge_dir() {
    let tmp = tempfile::tempdir().unwrap();
    let project = tmp.path();

    let merge_stage = project.join(".story_kit/work/4_merge");
    std::fs::create_dir_all(&merge_stage).unwrap();
    let before = merge_stage.join("22_story_test.md");
    std::fs::write(&before, "test").unwrap();

    move_story_to_archived(project, "22_story_test").unwrap();

    let after = project.join(".story_kit/work/5_archived/22_story_test.md");
    assert!(!before.exists());
    assert!(after.exists());
}
/// Checks that the error for a story that cannot be found mentions `4_merge`.
#[test]
fn move_story_to_archived_error_when_not_in_current_or_merge() {
    let empty_project = tempfile::tempdir().unwrap();
    let failure = move_story_to_archived(empty_project.path(), "99_nonexistent").unwrap_err();
    assert!(failure.contains("4_merge"));
}
// ── merge_agent_work tests ────────────────────────────────────────────────
#[tokio::test]
async fn merge_agent_work_returns_error_when_branch_not_found() {
use tempfile::tempdir;
let tmp = tempdir().unwrap();
let repo = tmp.path();
init_git_repo(repo);
let pool = AgentPool::new(3001);
// branch feature/story-99_nonexistent does not exist
let result = pool
.merge_agent_work(repo, "99_nonexistent")
.await
.unwrap();
// Should fail (no branch) — not panic
assert!(!result.success, "should fail when branch missing");
}
/// Checks that `merge_agent_work` can merge a conflict-free feature branch
/// back into the branch the repository started on.
///
/// The base branch name is read from git instead of being assumed to be
/// `master`: `git init` takes its initial branch name from the user's
/// `init.defaultBranch` setting, and a failed `git checkout master` would
/// otherwise leave the test silently running on the feature branch.
///
/// Every git setup command is asserted to succeed so that a broken fixture
/// fails here instead of surfacing as a confusing merge result.
#[tokio::test]
async fn merge_agent_work_succeeds_on_clean_branch() {
    use std::fs;
    use tempfile::tempdir;

    /// Run `git <args>` in `dir` and return its trimmed stdout.
    /// Panics if git cannot be spawned or exits with a non-zero status.
    fn git(dir: &std::path::Path, args: &[&str]) -> String {
        let out = Command::new("git")
            .args(args)
            .current_dir(dir)
            .output()
            .expect("failed to spawn git");
        assert!(
            out.status.success(),
            "git {:?} failed: {}",
            args,
            String::from_utf8_lossy(&out.stderr)
        );
        String::from_utf8_lossy(&out.stdout).trim().to_string()
    }

    let tmp = tempdir().unwrap();
    let repo = tmp.path();
    init_git_repo(repo);

    // Remember whichever branch `git init` produced so we can return to it.
    let base_branch = git(repo, &["rev-parse", "--abbrev-ref", "HEAD"]);

    // Create a feature branch with a commit
    git(repo, &["checkout", "-b", "feature/story-23_test"]);
    fs::write(repo.join("feature.txt"), "feature content").unwrap();
    git(repo, &["add", "."]);
    git(repo, &["commit", "-m", "add feature"]);

    // Switch back to the initial branch
    git(repo, &["checkout", base_branch.as_str()]);

    // Create the story file in 4_merge/ so we can test archival
    let merge_dir = repo.join(".story_kit/work/4_merge");
    fs::create_dir_all(&merge_dir).unwrap();
    let story_file = merge_dir.join("23_test.md");
    fs::write(&story_file, "---\nname: Test\ntest_plan: approved\n---\n").unwrap();
    git(repo, &["add", "."]);
    git(repo, &["commit", "-m", "add story in merge"]);

    let pool = AgentPool::new(3001);
    let report = pool.merge_agent_work(repo, "23_test").await.unwrap();

    // Merge should succeed (gates will run but cargo/pnpm results will depend on env)
    // At minimum the merge itself should succeed
    assert!(!report.had_conflicts, "should have no conflicts");

    // Note: gates_passed may be false in test env without Rust project, that's OK
    // The important thing is the merge itself ran
    assert!(
        report.success || report.gate_output.contains("Failed to run") || !report.gates_passed,
        "report should be coherent: {report:?}"
    );

    // Story should be archived if gates passed
    if report.story_archived {
        let archived = repo.join(".story_kit/work/5_archived/23_test.md");
        assert!(archived.exists(), "archived file should exist");
    }
}
// ── run_project_tests tests ───────────────────────────────────
/// Checks that an executable `script/test` exiting 0 is reported as a pass
/// and that the captured output mentions `script/test`.
#[cfg(unix)]
#[test]
fn run_project_tests_uses_script_test_when_present_and_passes() {
    use std::fs;
    use std::os::unix::fs::PermissionsExt;

    let tmp = tempfile::tempdir().unwrap();
    let project = tmp.path();
    let script_dir = project.join("script");
    fs::create_dir_all(&script_dir).unwrap();

    // Write the runner and mark it rwxr-xr-x so it can be executed.
    let runner = script_dir.join("test");
    fs::write(&runner, "#!/usr/bin/env bash\necho 'all tests passed'\nexit 0\n").unwrap();
    fs::set_permissions(&runner, fs::Permissions::from_mode(0o755)).unwrap();

    let (ok, log) = run_project_tests(project).unwrap();
    assert!(ok, "script/test exiting 0 should pass");
    assert!(log.contains("script/test"), "output should mention script/test");
}
/// Checks that an executable `script/test` exiting 1 is reported as a failure
/// and that the captured output still mentions `script/test`.
#[cfg(unix)]
#[test]
fn run_project_tests_reports_failure_when_script_test_exits_nonzero() {
    use std::fs;
    use std::os::unix::fs::PermissionsExt;

    let tmp = tempfile::tempdir().unwrap();
    let project = tmp.path();
    let script_dir = project.join("script");
    fs::create_dir_all(&script_dir).unwrap();

    // Write the runner and mark it rwxr-xr-x so it can be executed.
    let runner = script_dir.join("test");
    fs::write(&runner, "#!/usr/bin/env bash\nexit 1\n").unwrap();
    fs::set_permissions(&runner, fs::Permissions::from_mode(0o755)).unwrap();

    let (ok, log) = run_project_tests(project).unwrap();
    assert!(!ok, "script/test exiting 1 should fail");
    assert!(log.contains("script/test"), "output should mention script/test");
}
// ── run_coverage_gate tests ───────────────────────────────────────────────
/// Checks that the coverage gate passes, and says the script was not found,
/// when the project has no coverage script.
#[cfg(unix)]
#[test]
fn coverage_gate_passes_when_script_absent() {
    let empty_project = tempfile::tempdir().unwrap();

    let (ok, log) = run_coverage_gate(empty_project.path()).unwrap();

    assert!(ok, "coverage gate should pass when script is absent");
    assert!(
        log.contains("not found"),
        "output should mention script not found"
    );
}
/// Checks that an executable `script/test_coverage` exiting 0 makes the
/// coverage gate pass and that the output mentions the script.
#[cfg(unix)]
#[test]
fn coverage_gate_passes_when_script_exits_zero() {
    use std::fs;
    use std::os::unix::fs::PermissionsExt;

    let tmp = tempfile::tempdir().unwrap();
    let project = tmp.path();
    let script_dir = project.join("script");
    fs::create_dir_all(&script_dir).unwrap();

    // Write the coverage script and mark it rwxr-xr-x so it can be executed.
    let coverage_script = script_dir.join("test_coverage");
    let script_body = "#!/usr/bin/env bash\necho 'Rust line coverage: 85%'\necho 'PASS: Coverage 85% meets threshold 0%'\nexit 0\n";
    fs::write(&coverage_script, script_body).unwrap();
    fs::set_permissions(&coverage_script, fs::Permissions::from_mode(0o755)).unwrap();

    let (ok, log) = run_coverage_gate(project).unwrap();
    assert!(ok, "coverage gate should pass when script exits 0");
    assert!(
        log.contains("script/test_coverage"),
        "output should mention script/test_coverage"
    );
}
/// Checks that an executable `script/test_coverage` exiting 1 makes the
/// coverage gate fail and that the output mentions the script.
#[cfg(unix)]
#[test]
fn coverage_gate_fails_when_script_exits_nonzero() {
    use std::fs;
    use std::os::unix::fs::PermissionsExt;

    let tmp = tempfile::tempdir().unwrap();
    let project = tmp.path();
    let script_dir = project.join("script");
    fs::create_dir_all(&script_dir).unwrap();

    // Write the coverage script and mark it rwxr-xr-x so it can be executed.
    let coverage_script = script_dir.join("test_coverage");
    let script_body =
        "#!/usr/bin/env bash\necho 'FAIL: Coverage 40% is below threshold 80%'\nexit 1\n";
    fs::write(&coverage_script, script_body).unwrap();
    fs::set_permissions(&coverage_script, fs::Permissions::from_mode(0o755)).unwrap();

    let (ok, log) = run_coverage_gate(project).unwrap();
    assert!(!ok, "coverage gate should fail when script exits 1");
    assert!(
        log.contains("script/test_coverage"),
        "output should mention script/test_coverage"
    );
}
// ── auto-assign helper tests ───────────────────────────────────
/// Checks that scanning a stage whose directory does not exist yields no items.
#[test]
fn scan_stage_items_returns_empty_for_missing_dir() {
    let empty_project = tempfile::tempdir().unwrap();
    let found = scan_stage_items(empty_project.path(), "2_current");
    assert!(found.is_empty());
}
/// Checks that scanning a stage returns the `.md` file stems in the expected
/// order and skips files with other extensions.
#[test]
fn scan_stage_items_returns_sorted_story_ids() {
    let tmp = tempfile::tempdir().unwrap();
    let stage_dir = tmp.path().join(".story_kit").join("work").join("2_current");
    std::fs::create_dir_all(&stage_dir).unwrap();

    let fixtures = [
        ("42_story_foo.md", "---\nname: foo\n---"),
        ("10_story_bar.md", "---\nname: bar\n---"),
        ("5_story_baz.md", "---\nname: baz\n---"),
        // Not a markdown file, so it should not show up in the result.
        ("README.txt", "ignore me"),
    ];
    for (file_name, contents) in fixtures {
        std::fs::write(stage_dir.join(file_name), contents).unwrap();
    }

    let found = scan_stage_items(tmp.path(), "2_current");
    assert_eq!(found, vec!["10_story_bar", "42_story_foo", "5_story_baz"]);
}
/// Checks that a running coder counts as an assignment only for its own story
/// and only for the coder stage.
#[test]
fn is_story_assigned_returns_true_for_running_coder() {
    let pool = AgentPool::new(3001);
    pool.inject_test_agent("42_story_foo", "coder-1", AgentStatus::Running);
    let guard = pool.agents.lock().unwrap();

    let same_story_same_stage =
        is_story_assigned_for_stage(&guard, "42_story_foo", &PipelineStage::Coder);
    assert!(same_story_same_stage);

    // Same story, different stage: not assigned.
    let same_story_other_stage =
        is_story_assigned_for_stage(&guard, "42_story_foo", &PipelineStage::Qa);
    assert!(!same_story_other_stage);

    // Different story, same stage: not assigned.
    let other_story_same_stage =
        is_story_assigned_for_stage(&guard, "99_story_other", &PipelineStage::Coder);
    assert!(!other_story_same_stage);
}
/// Checks that an agent in the `Completed` status is not treated as holding
/// its story.
#[test]
fn is_story_assigned_returns_false_for_completed_agent() {
    let pool = AgentPool::new(3001);
    pool.inject_test_agent("42_story_foo", "coder-1", AgentStatus::Completed);
    let guard = pool.agents.lock().unwrap();

    // A finished agent must not count as an active assignment.
    let assigned = is_story_assigned_for_stage(&guard, "42_story_foo", &PipelineStage::Coder);
    assert!(!assigned);
}
/// Checks that no coder is offered when every configured coder is running.
#[test]
fn find_free_agent_returns_none_when_all_busy() {
    let toml = r#"
[[agent]]
name = "coder-1"
[[agent]]
name = "coder-2"
"#;
    let config = crate::config::ProjectConfig::parse(toml).unwrap();

    let pool = AgentPool::new(3001);
    for (story, coder) in [("s1", "coder-1"), ("s2", "coder-2")] {
        pool.inject_test_agent(story, coder, AgentStatus::Running);
    }

    let guard = pool.agents.lock().unwrap();
    let candidate = find_free_agent_for_stage(&config, &guard, &PipelineStage::Coder);
    assert!(candidate.is_none(), "no free coders should be available");
}
/// Checks that, with `coder-1` running and the other configured coders idle,
/// `coder-2` is the one offered.
#[test]
fn find_free_agent_returns_first_free_coder() {
    let toml = r#"
[[agent]]
name = "coder-1"
[[agent]]
name = "coder-2"
[[agent]]
name = "coder-3"
"#;
    let config = crate::config::ProjectConfig::parse(toml).unwrap();

    let pool = AgentPool::new(3001);
    // Only coder-1 is occupied; coder-2 and coder-3 have no entry in the pool.
    pool.inject_test_agent("s1", "coder-1", AgentStatus::Running);

    let guard = pool.agents.lock().unwrap();
    let candidate = find_free_agent_for_stage(&config, &guard, &PipelineStage::Coder);
    assert_eq!(candidate, Some("coder-2"), "coder-2 should be the first free coder");
}
/// Checks that a coder whose previous run is `Completed` is offered again.
#[test]
fn find_free_agent_ignores_completed_agents() {
    let toml = r#"
[[agent]]
name = "coder-1"
"#;
    let config = crate::config::ProjectConfig::parse(toml).unwrap();

    let pool = AgentPool::new(3001);
    // coder-1 has finished its earlier story, so it should be available.
    pool.inject_test_agent("s1", "coder-1", AgentStatus::Completed);

    let guard = pool.agents.lock().unwrap();
    let candidate = find_free_agent_for_stage(&config, &guard, &PipelineStage::Coder);
    assert_eq!(candidate, Some("coder-1"), "completed coder-1 should be free");
}
/// Checks that, with only a `qa` agent configured and nothing running, a
/// coder request finds nobody while a QA request finds `qa`.
#[test]
fn find_free_agent_returns_none_for_wrong_stage() {
    let toml = r#"
[[agent]]
name = "qa"
"#;
    let config = crate::config::ProjectConfig::parse(toml).unwrap();
    let no_agents = HashMap::<String, StoryAgent>::new();

    // No coder is configured, so there is nothing to hand out for that stage.
    let coder_candidate = find_free_agent_for_stage(&config, &no_agents, &PipelineStage::Coder);
    assert!(coder_candidate.is_none());

    // The configured qa agent is idle and matches the QA stage.
    let qa_candidate = find_free_agent_for_stage(&config, &no_agents, &PipelineStage::Qa);
    assert_eq!(qa_candidate, Some("qa"));
}
}