Files
storkit/server/src/agents.rs

1520 lines
51 KiB
Rust
Raw Normal View History

use crate::config::ProjectConfig;
use crate::worktree::{self, WorktreeInfo};
use portable_pty::{CommandBuilder, PtySize, native_pty_system};
use serde::Serialize;
use std::collections::HashMap;
use std::io::{BufRead, BufReader};
use std::path::{Path, PathBuf};
use std::process::Command;
use std::sync::{Arc, Mutex};
use tokio::sync::broadcast;
/// Compose the pool's lookup key for an agent: `"<story_id>:<agent_name>"`.
fn composite_key(story_id: &str, agent_name: &str) -> String {
    let mut key = String::with_capacity(story_id.len() + agent_name.len() + 1);
    key.push_str(story_id);
    key.push(':');
    key.push_str(agent_name);
    key
}
/// Events streamed from a running agent to SSE clients.
///
/// Serialized by serde as an internally tagged object: the `type` field holds
/// the variant name in snake_case (for example `agent_json`), and the variant's
/// fields sit alongside it.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum AgentEvent {
/// Agent status changed.
Status {
/// Story the agent is working on.
story_id: String,
/// Name of the agent the event belongs to.
agent_name: String,
/// Free-form status label; this file emits "pending", "running" and "stopped".
status: String,
},
/// Raw text output from the agent process.
Output {
/// Story the agent is working on.
story_id: String,
/// Name of the agent the event belongs to.
agent_name: String,
/// A non-JSON output line, or the text of an assistant content block.
text: String,
},
/// Agent produced a JSON event from `--output-format stream-json`.
AgentJson {
/// Story the agent is working on.
story_id: String,
/// Name of the agent the event belongs to.
agent_name: String,
/// The parsed JSON line, forwarded unchanged.
data: serde_json::Value,
},
/// Agent finished.
///
/// Sent when the agent process ends without error, and also by
/// `report_completion` (whether or not the gates passed).
Done {
/// Story the agent was working on.
story_id: String,
/// Name of the agent the event belongs to.
agent_name: String,
/// Session id taken from the agent's `system` stream event, if one was seen.
session_id: Option<String>,
},
/// Agent errored.
Error {
/// Story the agent was working on.
story_id: String,
/// Name of the agent the event belongs to.
agent_name: String,
/// Human-readable description of the failure.
message: String,
},
}
/// Lifecycle state of an agent tracked by the pool.
///
/// Serialized in snake_case (`pending`, `running`, `completed`, `failed`).
#[derive(Debug, Clone, Serialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum AgentStatus {
/// Registered in the pool; the agent process has not been spawned yet.
Pending,
/// The agent task has been spawned.
Running,
/// The agent process ended without error, or its acceptance gates passed.
Completed,
/// The agent errored, its acceptance gates failed, or it was stopped.
Failed,
}
/// Renders the status as its lowercase name (`pending`, `running`, ...).
impl std::fmt::Display for AgentStatus {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Pending => "pending",
            Self::Running => "running",
            Self::Completed => "completed",
            Self::Failed => "failed",
        };
        f.write_str(label)
    }
}
/// Report produced by an agent calling `report_completion`.
#[derive(Debug, Serialize, Clone)]
pub struct CompletionReport {
/// Summary text supplied by the caller of `report_completion`.
pub summary: String,
/// `true` only if every acceptance gate (clippy and tests) succeeded.
pub gates_passed: bool,
/// Combined output captured from the gate commands.
pub gate_output: String,
}
/// Serializable snapshot of one agent's state, as returned by the pool's
/// public methods.
#[derive(Debug, Serialize, Clone)]
pub struct AgentInfo {
/// Story the agent is (or was) working on.
pub story_id: String,
/// Name of the agent.
pub agent_name: String,
/// Status at the time the snapshot was taken.
pub status: AgentStatus,
/// Session id reported by the agent process, if known.
pub session_id: Option<String>,
/// Worktree directory as a (lossily converted) string, if a worktree is recorded.
pub worktree_path: Option<String>,
/// Base branch of the worktree, if a worktree is recorded.
pub base_branch: Option<String>,
/// Completion report, present once `report_completion` has stored one.
pub completion: Option<CompletionReport>,
}
/// Internal pool record for one agent, stored under its composite key.
struct StoryAgent {
/// Name of the agent (the second half of the composite key).
agent_name: String,
/// Current lifecycle status.
status: AgentStatus,
/// Worktree the agent runs in; `None` until the worktree has been created.
worktree_info: Option<WorktreeInfo>,
/// Session id recorded when the agent process finishes, if it reported one.
session_id: Option<String>,
/// Broadcast sender used to publish this agent's events to subscribers.
tx: broadcast::Sender<AgentEvent>,
/// Handle of the tokio task driving the agent process; taken by `stop_agent`.
task_handle: Option<tokio::task::JoinHandle<()>>,
/// Accumulated events for polling via get_agent_output.
event_log: Arc<Mutex<Vec<AgentEvent>>>,
/// Set when the agent calls report_completion.
completion: Option<CompletionReport>,
}
/// Produce the public `AgentInfo` view of a pool entry.
///
/// `story_id` is passed separately because the entry itself only stores the
/// agent name; worktree fields are `None` when no worktree is recorded.
fn agent_info_from_entry(story_id: &str, agent: &StoryAgent) -> AgentInfo {
    let worktree = agent.worktree_info.as_ref();
    let worktree_path = worktree.map(|wt| wt.path.to_string_lossy().to_string());
    let base_branch = worktree.map(|wt| wt.base_branch.clone());

    AgentInfo {
        story_id: story_id.to_owned(),
        agent_name: agent.agent_name.clone(),
        status: agent.status.clone(),
        session_id: agent.session_id.clone(),
        worktree_path,
        base_branch,
        completion: agent.completion.clone(),
    }
}
/// Manages concurrent story agents, each in its own worktree.
pub struct AgentPool {
/// Agent records keyed by `"<story_id>:<agent_name>"`; the `Arc` lets spawned
/// agent tasks update their own entry.
agents: Arc<Mutex<HashMap<String, StoryAgent>>>,
/// Server port, passed through to `worktree::create_worktree`.
port: u16,
}
impl AgentPool {
/// Create an empty pool that remembers the server `port`.
pub fn new(port: u16) -> Self {
    let agents = Arc::new(Mutex::new(HashMap::new()));
    Self { agents, port }
}
/// Start an agent for a story: load config, create worktree, spawn agent.
///
/// If `agent_name` is `None`, defaults to the first configured agent.
///
/// Steps, in order:
/// 1. Resolve the agent name against the project config.
/// 2. Atomically verify no agent with the same key is Pending/Running and
///    register a new `Pending` entry.
/// 3. Move the story into `current/`, create the worktree and render the
///    agent command line. If any of these fail, the entry is marked `Failed`
///    and an `Error` event is published, so the key is not left stuck in
///    `Pending`.
/// 4. Mark the entry `Running`, then spawn the task that drives the agent
///    process and records its terminal status.
///
/// # Errors
/// Returns a message if the config cannot be loaded, the agent is unknown,
/// the agent is already pending/running, any setup step fails, the agent was
/// stopped while it was still being set up, or the pool lock is poisoned.
pub async fn start_agent(
    &self,
    project_root: &Path,
    story_id: &str,
    agent_name: Option<&str>,
) -> Result<AgentInfo, String> {
    let config = ProjectConfig::load(project_root)?;

    // Resolve agent name from config.
    let resolved_name = match agent_name {
        Some(name) => {
            config
                .find_agent(name)
                .ok_or_else(|| format!("No agent named '{name}' in config"))?;
            name.to_string()
        }
        None => config
            .default_agent()
            .ok_or_else(|| "No agents configured".to_string())?
            .name
            .clone(),
    };
    let key = composite_key(story_id, &resolved_name);

    let (tx, _) = broadcast::channel::<AgentEvent>(1024);
    let event_log: Arc<Mutex<Vec<AgentEvent>>> = Arc::new(Mutex::new(Vec::new()));

    // Check "not already running" and register as pending under ONE lock so
    // two concurrent calls for the same key cannot both pass the check.
    {
        let mut agents = self.agents.lock().map_err(|e| e.to_string())?;
        if let Some(agent) = agents.get(&key)
            && (agent.status == AgentStatus::Running || agent.status == AgentStatus::Pending)
        {
            return Err(format!(
                "Agent '{resolved_name}' for story '{story_id}' is already {}",
                agent.status
            ));
        }
        agents.insert(
            key.clone(),
            StoryAgent {
                agent_name: resolved_name.clone(),
                status: AgentStatus::Pending,
                worktree_info: None,
                session_id: None,
                tx: tx.clone(),
                task_handle: None,
                event_log: event_log.clone(),
                completion: None,
            },
        );
    }
    let _ = tx.send(AgentEvent::Status {
        story_id: story_id.to_string(),
        agent_name: resolved_name.clone(),
        status: "pending".to_string(),
    });

    // On a setup failure: mark the registered entry Failed and publish an
    // Error event, then hand the message back so it can be returned with `?`.
    // Without this the entry would stay Pending and block every later start.
    let fail = |message: String| -> String {
        if let Ok(mut agents) = self.agents.lock()
            && let Some(agent) = agents.get_mut(&key)
        {
            agent.status = AgentStatus::Failed;
        }
        let _ = tx.send(AgentEvent::Error {
            story_id: story_id.to_string(),
            agent_name: resolved_name.clone(),
            message: message.clone(),
        });
        message
    };

    // Move story from upcoming/ to current/ and auto-commit before creating the worktree.
    move_story_to_current(project_root, story_id).map_err(&fail)?;

    // Create worktree.
    let wt_info = worktree::create_worktree(project_root, story_id, &config, self.port)
        .await
        .map_err(&fail)?;

    // Record the worktree right away so it is visible even if a later step fails.
    {
        let mut agents = self.agents.lock().map_err(|e| e.to_string())?;
        if let Some(agent) = agents.get_mut(&key) {
            agent.worktree_info = Some(wt_info.clone());
        }
    }

    let wt_path_str = wt_info.path.to_string_lossy().to_string();
    let (command, args, prompt) = config
        .render_agent_args(
            &wt_path_str,
            story_id,
            Some(&resolved_name),
            Some(&wt_info.base_branch),
        )
        .map_err(&fail)?;

    // Mark Running BEFORE spawning: the spawned task writes the terminal
    // status, and setting Running afterwards could overwrite it.
    {
        let mut agents = self.agents.lock().map_err(|e| e.to_string())?;
        let Some(agent) = agents.get_mut(&key) else {
            // stop_agent removed the entry during setup; do not spawn an
            // untracked process.
            return Err(format!(
                "Agent '{resolved_name}' for story '{story_id}' was stopped before it started"
            ));
        };
        agent.status = AgentStatus::Running;
    }

    // Spawn the agent process.
    let sid = story_id.to_string();
    let aname = resolved_name.clone();
    let tx_clone = tx.clone();
    let agents_ref = self.agents.clone();
    let cwd = wt_path_str.clone();
    let key_clone = key.clone();
    let log_clone = event_log.clone();
    let handle = tokio::spawn(async move {
        let _ = tx_clone.send(AgentEvent::Status {
            story_id: sid.clone(),
            agent_name: aname.clone(),
            status: "running".to_string(),
        });
        match run_agent_pty_streaming(
            &sid, &aname, &command, &args, &prompt, &cwd, &tx_clone, &log_clone,
        )
        .await
        {
            Ok(session_id) => {
                if let Ok(mut agents) = agents_ref.lock()
                    && let Some(agent) = agents.get_mut(&key_clone)
                {
                    agent.status = AgentStatus::Completed;
                    agent.session_id = session_id.clone();
                }
                let _ = tx_clone.send(AgentEvent::Done {
                    story_id: sid.clone(),
                    agent_name: aname.clone(),
                    session_id,
                });
            }
            Err(e) => {
                if let Ok(mut agents) = agents_ref.lock()
                    && let Some(agent) = agents.get_mut(&key_clone)
                {
                    agent.status = AgentStatus::Failed;
                }
                let _ = tx_clone.send(AgentEvent::Error {
                    story_id: sid.clone(),
                    agent_name: aname.clone(),
                    message: e,
                });
            }
        }
    });

    // Attach the task handle so stop_agent can abort the task.
    {
        let mut agents = self.agents.lock().map_err(|e| e.to_string())?;
        if let Some(agent) = agents.get_mut(&key) {
            agent.task_handle = Some(handle);
        }
    }

    Ok(AgentInfo {
        story_id: story_id.to_string(),
        agent_name: resolved_name,
        status: AgentStatus::Running,
        session_id: None,
        worktree_path: Some(wt_path_str),
        base_branch: Some(wt_info.base_branch.clone()),
        completion: None,
    })
}
/// Stop an agent and forget it; its worktree is left on disk for inspection.
///
/// Marks the entry `Failed`, aborts and awaits its task (if one is attached),
/// publishes a `"stopped"` status event, and finally removes the entry from
/// the pool.
///
/// # Errors
/// Returns a message if no such agent is registered or the pool lock is poisoned.
pub async fn stop_agent(
    &self,
    _project_root: &Path,
    story_id: &str,
    agent_name: &str,
) -> Result<(), String> {
    let key = composite_key(story_id, agent_name);

    // Pull everything we need out of the entry while holding the lock once.
    let (preserved_worktree, running_task, events) = {
        let mut pool = self.agents.lock().map_err(|e| e.to_string())?;
        match pool.get_mut(&key) {
            Some(entry) => {
                let worktree = entry.worktree_info.clone();
                let task = entry.task_handle.take();
                let sender = entry.tx.clone();
                entry.status = AgentStatus::Failed;
                (worktree, task, sender)
            }
            None => return Err(format!("No agent '{agent_name}' for story '{story_id}'")),
        }
    };

    // Abort the task and wait for the abort to take effect.
    if let Some(task) = running_task {
        task.abort();
        let _ = task.await;
    }

    // Preserve worktree for inspection — don't destroy agent's work on stop.
    if let Some(wt) = preserved_worktree.as_ref() {
        eprintln!(
            "[agents] Worktree preserved for {story_id}:{agent_name}: {}",
            wt.path.display()
        );
    }

    let stopped = AgentEvent::Status {
        story_id: story_id.to_string(),
        agent_name: agent_name.to_string(),
        status: "stopped".to_string(),
    };
    let _ = events.send(stopped);

    // Remove from map only after the event has been published.
    self.agents.lock().map_err(|e| e.to_string())?.remove(&key);
    Ok(())
}
/// Snapshot every agent currently registered in the pool.
///
/// The story id is recovered from the composite key by splitting at the last
/// `:`; a key without `:` is used as the story id unchanged.
pub fn list_agents(&self) -> Result<Vec<AgentInfo>, String> {
    let pool = self.agents.lock().map_err(|e| e.to_string())?;
    let mut infos = Vec::with_capacity(pool.len());
    for (key, entry) in pool.iter() {
        let story_id = match key.rsplit_once(':') {
            Some((sid, _)) => sid,
            None => key.as_str(),
        };
        infos.push(agent_info_from_entry(story_id, entry));
    }
    Ok(infos)
}
/// Open a new broadcast receiver on an agent's event channel.
///
/// # Errors
/// Returns a message if the agent is not registered or the pool lock is poisoned.
pub fn subscribe(
    &self,
    story_id: &str,
    agent_name: &str,
) -> Result<broadcast::Receiver<AgentEvent>, String> {
    let key = composite_key(story_id, agent_name);
    let pool = self.agents.lock().map_err(|e| e.to_string())?;
    match pool.get(&key) {
        Some(entry) => Ok(entry.tx.subscribe()),
        None => Err(format!("No agent '{agent_name}' for story '{story_id}'")),
    }
}
/// Take every event logged for an agent since the previous call, leaving the
/// log empty. Intended for polling clients.
///
/// # Errors
/// Returns a message if the agent is not registered or a lock is poisoned.
pub fn drain_events(
    &self,
    story_id: &str,
    agent_name: &str,
) -> Result<Vec<AgentEvent>, String> {
    let key = composite_key(story_id, agent_name);
    let pool = self.agents.lock().map_err(|e| e.to_string())?;
    let Some(entry) = pool.get(&key) else {
        return Err(format!("No agent '{agent_name}' for story '{story_id}'"));
    };
    let mut log = entry.event_log.lock().map_err(|e| e.to_string())?;
    let drained: Vec<AgentEvent> = log.drain(..).collect();
    Ok(drained)
}
/// Block until the agent reaches a terminal state (completed, failed, stopped).
/// Returns the agent's final `AgentInfo`.
/// `timeout_ms` caps how long to wait; returns an error if the deadline passes.
pub async fn wait_for_agent(
&self,
story_id: &str,
agent_name: &str,
timeout_ms: u64,
) -> Result<AgentInfo, String> {
// Subscribe before checking status so we don't miss the terminal event
// if the agent completes in the window between the two operations.
let mut rx = self.subscribe(story_id, agent_name)?;
// Return immediately if already in a terminal state.
{
let agents = self.agents.lock().map_err(|e| e.to_string())?;
let key = composite_key(story_id, agent_name);
if let Some(agent) = agents.get(&key)
&& matches!(agent.status, AgentStatus::Completed | AgentStatus::Failed)
{
return Ok(agent_info_from_entry(story_id, agent));
}
}
let deadline =
tokio::time::Instant::now() + std::time::Duration::from_millis(timeout_ms);
loop {
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
if remaining.is_zero() {
return Err(format!(
"Timed out after {timeout_ms}ms waiting for agent '{agent_name}' on story '{story_id}'"
));
}
match tokio::time::timeout(remaining, rx.recv()).await {
Ok(Ok(event)) => {
let is_terminal = match &event {
AgentEvent::Done { .. } | AgentEvent::Error { .. } => true,
AgentEvent::Status { status, .. } if status == "stopped" => true,
_ => false,
};
if is_terminal {
let agents = self.agents.lock().map_err(|e| e.to_string())?;
let key = composite_key(story_id, agent_name);
return Ok(if let Some(agent) = agents.get(&key) {
agent_info_from_entry(story_id, agent)
} else {
// Agent was removed from map (e.g. stop_agent removes it after
// the "stopped" status event is sent).
let (status, session_id) = match event {
AgentEvent::Done { session_id, .. } => {
(AgentStatus::Completed, session_id)
}
_ => (AgentStatus::Failed, None),
};
AgentInfo {
story_id: story_id.to_string(),
agent_name: agent_name.to_string(),
status,
session_id,
worktree_path: None,
base_branch: None,
completion: None,
}
});
}
}
Ok(Err(broadcast::error::RecvError::Lagged(_))) => {
// Missed some buffered events — check current status before resuming.
let agents = self.agents.lock().map_err(|e| e.to_string())?;
let key = composite_key(story_id, agent_name);
if let Some(agent) = agents.get(&key)
&& matches!(agent.status, AgentStatus::Completed | AgentStatus::Failed)
{
return Ok(agent_info_from_entry(story_id, agent));
}
// Still running — continue the loop.
}
Ok(Err(broadcast::error::RecvError::Closed)) => {
// Channel closed: no more events will arrive. Return current state.
let agents = self.agents.lock().map_err(|e| e.to_string())?;
let key = composite_key(story_id, agent_name);
if let Some(agent) = agents.get(&key) {
return Ok(agent_info_from_entry(story_id, agent));
}
return Err(format!(
"Agent '{agent_name}' for story '{story_id}' channel closed unexpectedly"
));
}
Err(_) => {
return Err(format!(
"Timed out after {timeout_ms}ms waiting for agent '{agent_name}' on story '{story_id}'"
));
}
}
}
}
/// Load the project config and create a worktree for `story_id`, passing the
/// pool's server port through to `worktree::create_worktree`.
pub async fn create_worktree(
    &self,
    project_root: &Path,
    story_id: &str,
) -> Result<worktree::WorktreeInfo, String> {
    let port = self.port;
    let config = ProjectConfig::load(project_root)?;
    let created = worktree::create_worktree(project_root, story_id, &config, port).await;
    created
}
/// Record that an agent considers its story finished and run the acceptance gates.
///
/// - Fails if the agent is unknown, not `Running`, or has no worktree.
/// - Fails if the worktree has uncommitted changes.
/// - Runs the acceptance gates (cargo clippy + cargo nextest run / cargo test)
///   on a blocking thread.
/// - Stores the resulting `CompletionReport` on the agent record and sets the
///   status to `Completed` (gates passed) or `Failed` (gates failed).
/// - Publishes a `Done` event in both cases so `wait_for_agent` unblocks.
pub async fn report_completion(
    &self,
    story_id: &str,
    agent_name: &str,
    summary: &str,
) -> Result<CompletionReport, String> {
    let key = composite_key(story_id, agent_name);

    // Validate the agent and copy out its worktree directory.
    let gate_dir = {
        let pool = self.agents.lock().map_err(|e| e.to_string())?;
        let Some(entry) = pool.get(&key) else {
            return Err(format!("No agent '{agent_name}' for story '{story_id}'"));
        };
        if entry.status != AgentStatus::Running {
            return Err(format!(
                "Agent '{agent_name}' for story '{story_id}' is not running (status: {}). \
                 report_completion can only be called by a running agent.",
                entry.status
            ));
        }
        match entry.worktree_info.as_ref() {
            Some(wt) => wt.path.clone(),
            None => {
                return Err(format!(
                    "Agent '{agent_name}' for story '{story_id}' has no worktree. \
                     Cannot run acceptance gates."
                ));
            }
        }
    };

    // The gates shell out to git/cargo, so keep them off the async runtime.
    let gate_task = tokio::task::spawn_blocking(move || {
        // Dirty worktrees are rejected before any gate runs.
        check_uncommitted_changes(&gate_dir)?;
        run_acceptance_gates(&gate_dir)
    });
    let (gates_passed, gate_output) = match gate_task.await {
        Ok(outcome) => outcome?,
        Err(e) => return Err(format!("Gate check task panicked: {e}")),
    };

    let report = CompletionReport {
        summary: summary.to_string(),
        gates_passed,
        gate_output,
    };

    // Persist the report and move the agent to its final status.
    let (events, session_id) = {
        let mut pool = self.agents.lock().map_err(|e| e.to_string())?;
        match pool.get_mut(&key) {
            Some(entry) => {
                entry.completion = Some(report.clone());
                entry.status = match gates_passed {
                    true => AgentStatus::Completed,
                    false => AgentStatus::Failed,
                };
                (entry.tx.clone(), entry.session_id.clone())
            }
            None => {
                return Err(format!(
                    "Agent '{agent_name}' for story '{story_id}' disappeared during gate check"
                ));
            }
        }
    };

    // Emit Done so wait_for_agent unblocks.
    let done = AgentEvent::Done {
        story_id: story_id.to_string(),
        agent_name: agent_name.to_string(),
        session_id,
    };
    let _ = events.send(done);
    Ok(report)
}
/// Return the port this server is running on.
///
/// This is the value given to [`AgentPool::new`].
// NOTE(review): the `dead_code` allow suggests there is currently no caller
// outside tests — confirm before removing the attribute.
#[allow(dead_code)]
pub fn port(&self) -> u16 {
self.port
}
/// Get project root helper.
///
/// Pure delegation to `state.get_project_root()`; no pool state is read.
/// Whatever error that call produces is returned unchanged.
pub fn get_project_root(
&self,
state: &crate::state::SessionState,
) -> Result<PathBuf, String> {
state.get_project_root()
}
/// Test helper: register a synthetic agent (no worktree, no task) with the
/// given status so wait/subscribe logic can be exercised without spawning a
/// real process. Returns the sender of the agent's event channel.
#[cfg(test)]
pub fn inject_test_agent(
    &self,
    story_id: &str,
    agent_name: &str,
    status: AgentStatus,
) -> broadcast::Sender<AgentEvent> {
    let (sender, _) = broadcast::channel::<AgentEvent>(64);
    let entry = StoryAgent {
        agent_name: agent_name.to_string(),
        status,
        worktree_info: None,
        session_id: None,
        tx: sender.clone(),
        task_handle: None,
        event_log: Arc::new(Mutex::new(Vec::new())),
        completion: None,
    };
    self.agents
        .lock()
        .unwrap()
        .insert(composite_key(story_id, agent_name), entry);
    sender
}
/// Test helper: register a synthetic agent whose worktree points at
/// `worktree_path`, for exercising gate-related logic. The worktree is given
/// branch `feature/story-<story_id>` and base branch `master`. Returns the
/// sender of the agent's event channel.
#[cfg(test)]
pub fn inject_test_agent_with_path(
    &self,
    story_id: &str,
    agent_name: &str,
    status: AgentStatus,
    worktree_path: PathBuf,
) -> broadcast::Sender<AgentEvent> {
    let (sender, _) = broadcast::channel::<AgentEvent>(64);
    let worktree = WorktreeInfo {
        path: worktree_path,
        branch: format!("feature/story-{story_id}"),
        base_branch: "master".to_string(),
    };
    let entry = StoryAgent {
        agent_name: agent_name.to_string(),
        status,
        worktree_info: Some(worktree),
        session_id: None,
        tx: sender.clone(),
        task_handle: None,
        event_log: Arc::new(Mutex::new(Vec::new())),
        completion: None,
    };
    self.agents
        .lock()
        .unwrap()
        .insert(composite_key(story_id, agent_name), entry);
    sender
}
}
/// Stage one or more file paths and create a commit in the given git root.
///
/// Pass deleted paths too so git stages their removal alongside any new files.
///
/// Paths are handed to git as raw OS strings (no lossy UTF-8 conversion) and
/// after a `--` separator, so a path is never misread as a command-line
/// option and non-UTF-8 file names are staged correctly.
///
/// # Errors
/// Returns a message if `git` cannot be spawned, or if `git add` or
/// `git commit` exits unsuccessfully (the message includes git's stderr).
pub fn git_stage_and_commit(
    git_root: &Path,
    paths: &[&Path],
    message: &str,
) -> Result<(), String> {
    let output = Command::new("git")
        .arg("add")
        // Everything after `--` is a pathspec, never an option.
        .arg("--")
        .args(paths)
        .current_dir(git_root)
        .output()
        .map_err(|e| format!("git add: {e}"))?;
    if !output.status.success() {
        return Err(format!(
            "git add failed: {}",
            String::from_utf8_lossy(&output.stderr)
        ));
    }

    let output = Command::new("git")
        .args(["commit", "-m", message])
        .current_dir(git_root)
        .output()
        .map_err(|e| format!("git commit: {e}"))?;
    if !output.status.success() {
        return Err(format!(
            "git commit failed: {}",
            String::from_utf8_lossy(&output.stderr)
        ));
    }
    Ok(())
}
/// Classify a work item by the prefix of its ID.
///
/// `bug-*` → `"bug"`, `spike-*` → `"spike"`, anything else → `"story"`.
fn item_type_from_id(item_id: &str) -> &'static str {
    // (ID prefix, item type) pairs, checked in order.
    const PREFIXED_KINDS: [(&str, &str); 2] = [("bug-", "bug"), ("spike-", "spike")];

    PREFIXED_KINDS
        .iter()
        .find(|(prefix, _)| item_id.starts_with(*prefix))
        .map_or("story", |&(_, kind)| kind)
}
/// Directory under `.story_kit/` where a not-yet-started work item lives:
/// `bugs/` for bugs, `spikes/` for spikes, `stories/upcoming/` otherwise.
fn item_source_dir(project_root: &Path, item_id: &str) -> PathBuf {
    let mut dir = project_root.join(".story_kit");
    match item_type_from_id(item_id) {
        "bug" => dir.push("bugs"),
        "spike" => dir.push("spikes"),
        _ => {
            dir.push("stories");
            dir.push("upcoming");
        }
    }
    dir
}
/// Directory under `.story_kit/` where a finished work item is archived:
/// `bugs/archive/` for bugs, `spikes/archive/` for spikes,
/// `stories/archived/` otherwise.
fn item_archive_dir(project_root: &Path, item_id: &str) -> PathBuf {
    let (category, archive) = match item_type_from_id(item_id) {
        "bug" => ("bugs", "archive"),
        "spike" => ("spikes", "archive"),
        _ => ("stories", "archived"),
    };
    let mut dir = project_root.join(".story_kit");
    dir.push(category);
    dir.push(archive);
    dir
}
/// Move a work item (story, bug, or spike) into `.story_kit/current/` and
/// commit the move.
///
/// * Already in `current/`: returns `Ok` without touching anything.
/// * Missing from its source directory: logs a warning and returns `Ok`.
/// * Otherwise the file is renamed and the move is committed with the message
///   `story-kit: start <id>`.
pub fn move_story_to_current(project_root: &Path, story_id: &str) -> Result<(), String> {
    let file_name = format!("{story_id}.md");
    let current_dir = project_root.join(".story_kit").join("current");
    let destination = current_dir.join(&file_name);
    if destination.exists() {
        // Nothing to do: the item was moved earlier.
        return Ok(());
    }

    let source_dir = item_source_dir(project_root, story_id);
    let origin = source_dir.join(&file_name);
    if !origin.exists() {
        eprintln!(
            "[lifecycle] Work item '{story_id}' not found in {}; skipping move to current/",
            source_dir.display()
        );
        return Ok(());
    }

    if let Err(e) = std::fs::create_dir_all(&current_dir) {
        return Err(format!(
            "Failed to create .story_kit/current/ directory: {e}"
        ));
    }
    if let Err(e) = std::fs::rename(&origin, &destination) {
        return Err(format!("Failed to move '{story_id}' to current/: {e}"));
    }
    eprintln!(
        "[lifecycle] Moved '{story_id}' from {} to current/",
        source_dir.display()
    );

    // Stage both the new location and the removal of the old one.
    let touched = [destination.as_path(), origin.as_path()];
    git_stage_and_commit(
        project_root,
        &touched,
        &format!("story-kit: start {story_id}"),
    )
}
/// Archive an accepted story: move it from `.story_kit/current/` to
/// `.story_kit/stories/archived/` and commit the move.
///
/// * Already present in `stories/archived/`: no-op (idempotent).
/// * Present in `current/`: renamed into `stories/archived/` and committed
///   with the message `story-kit: accept story <id>`.
/// * Found in neither place: an error is returned.
pub fn move_story_to_archived(project_root: &Path, story_id: &str) -> Result<(), String> {
    let file_name = format!("{story_id}.md");
    let story_kit = project_root.join(".story_kit");
    let origin = story_kit.join("current").join(&file_name);
    let archived_dir = story_kit.join("stories").join("archived");
    let destination = archived_dir.join(&file_name);

    if destination.exists() {
        // Archived on an earlier call.
        return Ok(());
    }
    if !origin.exists() {
        return Err(format!(
            "Story '{story_id}' not found in current/. Cannot accept story."
        ));
    }

    if let Err(e) = std::fs::create_dir_all(&archived_dir) {
        return Err(format!(
            "Failed to create stories/archived/ directory: {e}"
        ));
    }
    if let Err(e) = std::fs::rename(&origin, &destination) {
        return Err(format!(
            "Failed to move story '{story_id}' to archived/: {e}"
        ));
    }
    eprintln!("[lifecycle] Moved story '{story_id}' from current/ to stories/archived/");

    let touched = [destination.as_path(), origin.as_path()];
    git_stage_and_commit(
        project_root,
        &touched,
        &format!("story-kit: accept story {story_id}"),
    )
}
/// Close a bug by moving its file into the bug archive directory and
/// committing the move.
///
/// * Already in the archive: no-op (idempotent).
/// * In `current/`: moved from there.
/// * Otherwise, if still in `bugs/` (never started): moved from there.
/// * Found nowhere: an error is returned.
pub fn close_bug_to_archive(project_root: &Path, bug_id: &str) -> Result<(), String> {
    let file_name = format!("{bug_id}.md");
    let sk = project_root.join(".story_kit");
    let archive_dir = item_archive_dir(project_root, bug_id);
    let archive_path = archive_dir.join(&file_name);
    if archive_path.exists() {
        return Ok(());
    }

    // Candidate locations in priority order; the first that exists wins.
    let candidates = [
        sk.join("current").join(&file_name),
        sk.join("bugs").join(&file_name),
    ];
    let Some(source_path) = candidates.iter().find(|candidate| candidate.exists()) else {
        return Err(format!(
            "Bug '{bug_id}' not found in current/ or bugs/. Cannot close bug."
        ));
    };

    if let Err(e) = std::fs::create_dir_all(&archive_dir) {
        return Err(format!("Failed to create bugs/archive/ directory: {e}"));
    }
    if let Err(e) = std::fs::rename(source_path, &archive_path) {
        return Err(format!("Failed to move bug '{bug_id}' to archive: {e}"));
    }
    eprintln!(
        "[lifecycle] Closed bug '{bug_id}' → bugs/archive/"
    );

    let touched = [archive_path.as_path(), source_path.as_path()];
    git_stage_and_commit(
        project_root,
        &touched,
        &format!("story-kit: close bug {bug_id}"),
    )
}
// ── Acceptance-gate helpers ───────────────────────────────────────────────────
/// Check whether the given directory has any uncommitted git changes.
///
/// Runs `git status --porcelain` in `path`.
///
/// # Errors
/// Returns a descriptive message if:
/// * `git` cannot be spawned,
/// * `git status` exits unsuccessfully (e.g. `path` is not inside a usable
///   repository) — an empty stdout from a failed command must not be mistaken
///   for a clean worktree, or
/// * the porcelain output is non-empty, i.e. there are uncommitted changes.
fn check_uncommitted_changes(path: &Path) -> Result<(), String> {
    let output = Command::new("git")
        .args(["status", "--porcelain"])
        .current_dir(path)
        .output()
        .map_err(|e| format!("Failed to run git status: {e}"))?;

    // A failing `git status` prints nothing on stdout; treat it as an error
    // instead of letting it pass as "clean".
    if !output.status.success() {
        return Err(format!(
            "git status failed in {}: {}",
            path.display(),
            String::from_utf8_lossy(&output.stderr).trim()
        ));
    }

    let stdout = String::from_utf8_lossy(&output.stdout);
    if !stdout.trim().is_empty() {
        return Err(format!(
            "Worktree has uncommitted changes. Commit your work before calling \
             report_completion:\n{stdout}"
        ));
    }
    Ok(())
}
/// Run `cargo clippy` and `cargo nextest run` (falling back to `cargo test`) in
/// the given directory. Returns `(gates_passed, combined_output)`.
///
/// `gates_passed` is `true` only when both clippy and the test run exit
/// successfully. `combined_output` holds stdout followed by stderr of each
/// command, under `=== cargo clippy ===` and `=== tests ===` headers.
///
/// The fallback to `cargo test` is taken when `cargo nextest run` cannot be
/// spawned, or when cargo reports that the `nextest` subcommand is not
/// installed. In the latter case cargo itself runs fine and exits non-zero,
/// so checking only for a spawn error would never trigger the fallback.
///
/// # Errors
/// Returns a message if `cargo clippy` or the fallback `cargo test` cannot be
/// spawned at all.
fn run_acceptance_gates(path: &Path) -> Result<(bool, String), String> {
    let mut all_output = String::new();

    // ── cargo clippy ──────────────────────────────────────────────
    let clippy = Command::new("cargo")
        .args(["clippy", "--all-targets", "--all-features"])
        .current_dir(path)
        .output()
        .map_err(|e| format!("Failed to run cargo clippy: {e}"))?;
    all_output.push_str("=== cargo clippy ===\n");
    all_output.push_str(&String::from_utf8_lossy(&clippy.stdout));
    all_output.push_str(&String::from_utf8_lossy(&clippy.stderr));
    all_output.push('\n');
    let clippy_passed = clippy.status.success();

    // ── cargo nextest run (fallback: cargo test) ──────────────────
    all_output.push_str("=== tests ===\n");
    let nextest = Command::new("cargo")
        .args(["nextest", "run"])
        .current_dir(path)
        .output();
    let nextest_usable = match &nextest {
        Ok(out) => {
            let stderr = String::from_utf8_lossy(&out.stderr);
            // Cargo's wording for an unknown subcommand (current and older releases).
            let subcommand_missing =
                stderr.contains("no such command") || stderr.contains("no such subcommand");
            out.status.success() || !subcommand_missing
        }
        Err(_) => false,
    };
    let tests = match nextest {
        Ok(out) if nextest_usable => out,
        // nextest not available — fall back to cargo test
        _ => Command::new("cargo")
            .args(["test"])
            .current_dir(path)
            .output()
            .map_err(|e| format!("Failed to run cargo test: {e}"))?,
    };
    all_output.push_str(&String::from_utf8_lossy(&tests.stdout));
    all_output.push_str(&String::from_utf8_lossy(&tests.stderr));
    all_output.push('\n');

    Ok((clippy_passed && tests.status.success(), all_output))
}
/// Async wrapper around `run_agent_pty_blocking`: copies the borrowed
/// arguments into owned values and runs the blocking PTY loop on tokio's
/// blocking thread pool, streaming events through the broadcast channel.
///
/// Returns the blocking function's result, or an error if that task panicked.
#[allow(clippy::too_many_arguments)]
async fn run_agent_pty_streaming(
    story_id: &str,
    agent_name: &str,
    command: &str,
    args: &[String],
    prompt: &str,
    cwd: &str,
    tx: &broadcast::Sender<AgentEvent>,
    event_log: &Arc<Mutex<Vec<AgentEvent>>>,
) -> Result<Option<String>, String> {
    // The blocking closure must be 'static, so everything is cloned up front.
    let (sid, aname) = (story_id.to_owned(), agent_name.to_owned());
    let (program, owned_args) = (command.to_owned(), args.to_vec());
    let (owned_prompt, workdir) = (prompt.to_owned(), cwd.to_owned());
    let sender = tx.clone();
    let log = Arc::clone(event_log);

    let worker = tokio::task::spawn_blocking(move || {
        run_agent_pty_blocking(
            &sid,
            &aname,
            &program,
            &owned_args,
            &owned_prompt,
            &workdir,
            &sender,
            &log,
        )
    });
    match worker.await {
        Ok(result) => result,
        Err(e) => Err(format!("Agent task panicked: {e}")),
    }
}
/// Record `event` in the polling log, then publish it on the broadcast channel.
///
/// Both steps are best-effort: a poisoned log lock skips the recording, and a
/// send error is ignored.
fn emit_event(
    event: AgentEvent,
    tx: &broadcast::Sender<AgentEvent>,
    event_log: &Mutex<Vec<AgentEvent>>,
) {
    match event_log.lock() {
        Ok(mut entries) => entries.push(event.clone()),
        Err(_) => {}
    }
    let _ = tx.send(event);
}
/// Run the agent command inside a PTY and translate its output into events.
///
/// The command is invoked as `<command> -p <prompt> <args...> --output-format
/// stream-json --verbose --permission-mode bypassPermissions` in `cwd`. Each
/// non-empty output line is handled as follows:
/// * not valid JSON → emitted as an `Output` event (trimmed text);
/// * JSON with `type == "system"` → its `session_id` (if any) is remembered;
/// * JSON with `type == "assistant"` → each content block's `text` is emitted
///   as an `Output` event;
/// * every JSON line is additionally forwarded as an `AgentJson` event.
///
/// Reading stops at end of output or on the first read error; the child is
/// then killed and reaped. Returns the last session id seen, if any.
#[allow(clippy::too_many_arguments)]
fn run_agent_pty_blocking(
    story_id: &str,
    agent_name: &str,
    command: &str,
    args: &[String],
    prompt: &str,
    cwd: &str,
    tx: &broadcast::Sender<AgentEvent>,
    event_log: &Mutex<Vec<AgentEvent>>,
) -> Result<Option<String>, String> {
    let size = PtySize {
        rows: 50,
        cols: 200,
        pixel_width: 0,
        pixel_height: 0,
    };
    let pair = native_pty_system()
        .openpty(size)
        .map_err(|e| format!("Failed to open PTY: {e}"))?;

    let mut cmd = CommandBuilder::new(command);
    // -p <prompt> must come first
    cmd.arg("-p");
    cmd.arg(prompt);
    // Configured args next (e.g., --directory /path/to/worktree, --model, etc.)
    for configured in args {
        cmd.arg(configured);
    }
    // Fixed flags: JSON streaming output, plus no interactive permission
    // prompts because supervised agents don't need them.
    for flag in [
        "--output-format",
        "stream-json",
        "--verbose",
        "--permission-mode",
        "bypassPermissions",
    ] {
        cmd.arg(flag);
    }
    cmd.cwd(cwd);
    cmd.env("NO_COLOR", "1");
    // Allow spawning Claude Code from within a Claude Code session
    cmd.env_remove("CLAUDECODE");
    cmd.env_remove("CLAUDE_CODE_ENTRYPOINT");

    eprintln!("[agent:{story_id}:{agent_name}] Spawning {command} in {cwd} with args: {args:?}");

    let mut child = pair
        .slave
        .spawn_command(cmd)
        .map_err(|e| format!("Failed to spawn agent for {story_id}:{agent_name}: {e}"))?;
    drop(pair.slave);
    let reader = pair
        .master
        .try_clone_reader()
        .map_err(|e| format!("Failed to clone PTY reader: {e}"))?;
    drop(pair.master);

    // Builds an Output event carrying `text` for this agent.
    let output_event = |text: &str| AgentEvent::Output {
        story_id: story_id.to_string(),
        agent_name: agent_name.to_string(),
        text: text.to_string(),
    };

    let mut session_id: Option<String> = None;
    // `map_while` ends the loop on the first read error, like an explicit break.
    for line in BufReader::new(reader).lines().map_while(Result::ok) {
        let trimmed = line.trim();
        if trimmed.is_empty() {
            continue;
        }

        let Ok(json) = serde_json::from_str::<serde_json::Value>(trimmed) else {
            // Non-JSON output (terminal escapes etc.) — send as raw output
            emit_event(output_event(trimmed), tx, event_log);
            continue;
        };

        let event_type = json.get("type").and_then(|t| t.as_str()).unwrap_or("");
        if event_type == "system" {
            session_id = json
                .get("session_id")
                .and_then(|s| s.as_str())
                .map(str::to_owned);
        } else if event_type == "assistant" {
            let blocks = json
                .get("message")
                .and_then(|message| message.get("content"))
                .and_then(|content| content.as_array());
            if let Some(blocks) = blocks {
                let texts = blocks
                    .iter()
                    .filter_map(|block| block.get("text").and_then(|t| t.as_str()));
                for text in texts {
                    emit_event(output_event(text), tx, event_log);
                }
            }
        }

        // Forward all JSON events
        emit_event(
            AgentEvent::AgentJson {
                story_id: story_id.to_string(),
                agent_name: agent_name.to_string(),
                data: json,
            },
            tx,
            event_log,
        );
    }

    let _ = child.kill();
    let _ = child.wait();
    eprintln!(
        "[agent:{story_id}:{agent_name}] Done. Session: {:?}",
        session_id
    );
    Ok(session_id)
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn wait_for_agent_returns_immediately_if_completed() {
let pool = AgentPool::new(3001);
pool.inject_test_agent("s1", "bot", AgentStatus::Completed);
let info = pool.wait_for_agent("s1", "bot", 1000).await.unwrap();
assert_eq!(info.status, AgentStatus::Completed);
assert_eq!(info.story_id, "s1");
assert_eq!(info.agent_name, "bot");
}
#[tokio::test]
async fn wait_for_agent_returns_immediately_if_failed() {
let pool = AgentPool::new(3001);
pool.inject_test_agent("s2", "bot", AgentStatus::Failed);
let info = pool.wait_for_agent("s2", "bot", 1000).await.unwrap();
assert_eq!(info.status, AgentStatus::Failed);
}
#[tokio::test]
async fn wait_for_agent_completes_on_done_event() {
let pool = AgentPool::new(3001);
let tx = pool.inject_test_agent("s3", "bot", AgentStatus::Running);
// Send Done event after a short delay
let tx_clone = tx.clone();
tokio::spawn(async move {
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
// Mark status via event; real code also updates the map, but for
// this unit test the map entry stays Running — we verify the
// wait loop reacts to the event.
let _ = tx_clone.send(AgentEvent::Done {
story_id: "s3".to_string(),
agent_name: "bot".to_string(),
session_id: Some("sess-abc".to_string()),
});
});
let info = pool.wait_for_agent("s3", "bot", 2000).await.unwrap();
// Status comes from the map entry (Running in this unit test)
// — the important thing is that wait_for_agent returned without timing out.
assert_eq!(info.story_id, "s3");
}
#[tokio::test]
async fn wait_for_agent_times_out() {
let pool = AgentPool::new(3001);
pool.inject_test_agent("s4", "bot", AgentStatus::Running);
let result = pool.wait_for_agent("s4", "bot", 50).await;
assert!(result.is_err());
let msg = result.unwrap_err();
assert!(msg.contains("Timed out"), "unexpected message: {msg}");
}
#[tokio::test]
async fn wait_for_agent_errors_for_nonexistent() {
let pool = AgentPool::new(3001);
let result = pool.wait_for_agent("no_story", "no_bot", 100).await;
assert!(result.is_err());
}
#[tokio::test]
async fn wait_for_agent_completes_on_stopped_status_event() {
let pool = AgentPool::new(3001);
let tx = pool.inject_test_agent("s5", "bot", AgentStatus::Running);
let tx_clone = tx.clone();
tokio::spawn(async move {
tokio::time::sleep(std::time::Duration::from_millis(30)).await;
let _ = tx_clone.send(AgentEvent::Status {
story_id: "s5".to_string(),
agent_name: "bot".to_string(),
status: "stopped".to_string(),
});
});
let info = pool.wait_for_agent("s5", "bot", 2000).await.unwrap();
assert_eq!(info.story_id, "s5");
}
// ── report_completion tests ────────────────────────────────────
/// `report_completion` for an agent that is not in the pool must fail with
/// a "No agent" message.
#[tokio::test]
async fn report_completion_rejects_nonexistent_agent() {
    let pool = AgentPool::new(3001);

    let outcome = pool.report_completion("no_story", "no_bot", "done").await;
    let msg = outcome
        .err()
        .expect("report_completion should fail for an unknown agent");
    assert!(msg.contains("No agent"), "unexpected: {msg}");
}
/// `report_completion` must refuse an agent whose status is not Running
/// (here: already Completed) with a "not running" message.
#[tokio::test]
async fn report_completion_rejects_non_running_agent() {
    let pool = AgentPool::new(3001);
    pool.inject_test_agent("s6", "bot", AgentStatus::Completed);

    let outcome = pool.report_completion("s6", "bot", "done").await;
    if let Err(msg) = outcome {
        assert!(
            msg.contains("not running"),
            "expected 'not running' in: {msg}"
        );
    } else {
        panic!("expected report_completion to return an error");
    }
}
#[tokio::test]
async fn report_completion_rejects_dirty_worktree() {
use std::fs;
use tempfile::tempdir;
let tmp = tempdir().unwrap();
let repo = tmp.path();
// Init a real git repo and make an initial commit
Command::new("git")
.args(["init"])
.current_dir(repo)
.output()
.unwrap();
Command::new("git")
.args(["commit", "--allow-empty", "-m", "init"])
.current_dir(repo)
.output()
.unwrap();
// Write an uncommitted file
fs::write(repo.join("dirty.txt"), "not committed").unwrap();
let pool = AgentPool::new(3001);
pool.inject_test_agent_with_path("s7", "bot", AgentStatus::Running, repo.to_path_buf());
let result = pool.report_completion("s7", "bot", "done").await;
assert!(result.is_err());
let msg = result.unwrap_err();
assert!(
msg.contains("uncommitted"),
"expected 'uncommitted' in: {msg}"
);
}
// ── move_story_to_current tests ────────────────────────────────────────────
/// Initialise a throwaway git repository at `repo` for use in tests.
///
/// Runs, in order: `git init`, sets a repository-local `user.email` and
/// `user.name`, then creates an empty initial commit. The identity is set
/// locally so the commit does not rely on a globally configured one.
///
/// # Panics
///
/// Panics if `git` cannot be spawned or if any of the commands exits with a
/// non-zero status. Failing loudly here keeps later assertions from running
/// against a half-initialised repository.
fn init_git_repo(repo: &std::path::Path) {
    let steps: [&[&str]; 4] = [
        &["init"],
        &["config", "user.email", "test@test.com"],
        &["config", "user.name", "Test"],
        &["commit", "--allow-empty", "-m", "init"],
    ];

    for args in steps {
        let output = Command::new("git")
            .args(args)
            .current_dir(repo)
            .output()
            .unwrap_or_else(|e| panic!("failed to spawn `git {}`: {}", args.join(" "), e));

        // `output()` succeeding only means the process ran; check how it exited.
        assert!(
            output.status.success(),
            "`git {}` failed: {}",
            args.join(" "),
            String::from_utf8_lossy(&output.stderr)
        );
    }
}
/// A committed story file under `.story_kit/stories/upcoming` must be
/// relocated to `.story_kit/current` by `move_story_to_current`.
#[test]
fn move_story_to_current_moves_file_and_commits() {
    use std::fs;
    use tempfile::tempdir;

    let tmp = tempdir().unwrap();
    let repo = tmp.path();
    init_git_repo(repo);

    // Lay out both directories, then drop the story into upcoming/.
    let upcoming = repo.join(".story_kit/stories/upcoming");
    let current_dir = repo.join(".story_kit/current");
    for dir in [&upcoming, &current_dir] {
        fs::create_dir_all(dir).unwrap();
    }
    let story_file = upcoming.join("10_my_story.md");
    fs::write(&story_file, "---\nname: Test\ntest_plan: pending\n---\n").unwrap();

    // Commit the fixture so the move starts from a tracked file.
    let git_steps: [&[&str]; 2] = [&["add", "."], &["commit", "-m", "add story"]];
    for step in git_steps {
        Command::new("git")
            .args(step)
            .current_dir(repo)
            .output()
            .unwrap();
    }

    move_story_to_current(repo, "10_my_story").unwrap();

    let moved_to = current_dir.join("10_my_story.md");
    assert!(!story_file.exists(), "upcoming file should be gone");
    assert!(moved_to.exists(), "current/ file should exist");
}
/// If the story already lives in `.story_kit/current`, calling
/// `move_story_to_current` must succeed and leave the file where it is.
#[test]
fn move_story_to_current_is_idempotent_when_already_current() {
    use std::fs;
    use tempfile::tempdir;

    let tmp = tempdir().unwrap();
    let repo = tmp.path();
    init_git_repo(repo);

    let current_dir = repo.join(".story_kit/current");
    fs::create_dir_all(&current_dir).unwrap();
    let story_path = current_dir.join("11_my_story.md");
    fs::write(&story_path, "---\nname: Test\ntest_plan: pending\n---\n").unwrap();

    // Nothing needs moving, yet the call must not report an error.
    move_story_to_current(repo, "11_my_story").unwrap();

    assert!(story_path.exists());
}
/// A story id that exists nowhere in the repo is treated leniently:
/// `move_story_to_current` must return `Ok`.
#[test]
fn move_story_to_current_noop_when_not_in_upcoming() {
    use tempfile::tempdir;

    let tmp = tempdir().unwrap();
    let repo = tmp.path();
    init_git_repo(repo);

    // No story files were created at all.
    assert!(
        move_story_to_current(repo, "99_missing").is_ok(),
        "should return Ok when story is not found"
    );
}
/// A committed bug file under `.story_kit/bugs` must be relocated to
/// `.story_kit/current` by `move_story_to_current`.
#[test]
fn move_bug_to_current_moves_from_bugs_dir() {
    use std::fs;
    use tempfile::tempdir;

    let tmp = tempdir().unwrap();
    let repo = tmp.path();
    init_git_repo(repo);

    // Lay out both directories, then drop the bug into bugs/.
    let bugs_dir = repo.join(".story_kit/bugs");
    let current_dir = repo.join(".story_kit/current");
    for dir in [&bugs_dir, &current_dir] {
        fs::create_dir_all(dir).unwrap();
    }
    let bug_file = bugs_dir.join("bug-1-test.md");
    fs::write(&bug_file, "# Bug 1\n").unwrap();

    // Commit the fixture before exercising the move.
    let git_steps: [&[&str]; 2] = [&["add", "."], &["commit", "-m", "add bug"]];
    for step in git_steps {
        Command::new("git")
            .args(step)
            .current_dir(repo)
            .output()
            .unwrap();
    }

    move_story_to_current(repo, "bug-1-test").unwrap();

    let moved_to = current_dir.join("bug-1-test.md");
    assert!(!bug_file.exists(), "bugs/ file should be gone");
    assert!(moved_to.exists(), "current/ file should exist");
}
/// Closing a bug that sits in `.story_kit/current` must move its file to
/// `.story_kit/bugs/archive`.
#[test]
fn close_bug_moves_from_current_to_archive() {
    use std::fs;
    use tempfile::tempdir;

    let tmp = tempdir().unwrap();
    let repo = tmp.path();
    init_git_repo(repo);

    let current_dir = repo.join(".story_kit/current");
    fs::create_dir_all(&current_dir).unwrap();
    let bug_in_current = current_dir.join("bug-2-test.md");
    fs::write(&bug_in_current, "# Bug 2\n").unwrap();

    // Commit the fixture before closing the bug.
    let git_steps: [&[&str]; 2] = [&["add", "."], &["commit", "-m", "add bug to current"]];
    for step in git_steps {
        Command::new("git")
            .args(step)
            .current_dir(repo)
            .output()
            .unwrap();
    }

    close_bug_to_archive(repo, "bug-2-test").unwrap();

    let archive_path = repo.join(".story_kit/bugs/archive/bug-2-test.md");
    assert!(!bug_in_current.exists(), "current/ file should be gone");
    assert!(archive_path.exists(), "archive file should exist");
}
/// Closing a bug that still sits in `.story_kit/bugs` (never moved to
/// current) must move its file to `.story_kit/bugs/archive`.
#[test]
fn close_bug_moves_from_bugs_dir_when_not_started() {
    use std::fs;
    use tempfile::tempdir;

    let tmp = tempdir().unwrap();
    let repo = tmp.path();
    init_git_repo(repo);

    let bugs_dir = repo.join(".story_kit/bugs");
    fs::create_dir_all(&bugs_dir).unwrap();
    let bug_file = bugs_dir.join("bug-3-test.md");
    fs::write(&bug_file, "# Bug 3\n").unwrap();

    // Commit the fixture before closing the bug.
    let git_steps: [&[&str]; 2] = [&["add", "."], &["commit", "-m", "add bug"]];
    for step in git_steps {
        Command::new("git")
            .args(step)
            .current_dir(repo)
            .output()
            .unwrap();
    }

    close_bug_to_archive(repo, "bug-3-test").unwrap();

    let archive_path = repo.join(".story_kit/bugs/archive/bug-3-test.md");
    assert!(!bug_file.exists(), "bugs/ file should be gone");
    assert!(archive_path.exists(), "archive file should exist");
}
/// `item_type_from_id` must classify ids by prefix: `bug-…` → "bug",
/// `spike-…` → "spike", and the numeric-prefixed ids below → "story".
#[test]
fn item_type_from_id_detects_types() {
    let cases = [
        ("bug-1-test", "bug"),
        ("spike-1-research", "spike"),
        ("50_my_story", "story"),
        ("1_simple", "story"),
    ];
    for (id, expected) in cases {
        assert_eq!(item_type_from_id(id), expected);
    }
}
// ── git_stage_and_commit tests ─────────────────────────────────────────────
/// `git_stage_and_commit` must produce a commit whose message shows up as
/// the most recent entry in `git log`.
#[test]
fn git_stage_and_commit_creates_commit() {
    use std::fs;
    use tempfile::tempdir;

    let tmp = tempdir().unwrap();
    let repo = tmp.path();
    init_git_repo(repo);

    // Create one new file and hand exactly that path to the function under test.
    let file = repo.join("hello.txt");
    fs::write(&file, "hello").unwrap();
    let paths = [file.as_path()];
    git_stage_and_commit(repo, &paths, "story-kit: test commit").unwrap();

    // Read back the latest commit's one-line summary to verify it exists.
    let mut git_log = Command::new("git");
    git_log.args(["log", "--oneline", "-1"]).current_dir(repo);
    let output = git_log.output().unwrap();
    let log = String::from_utf8_lossy(&output.stdout);
    assert!(
        log.contains("story-kit: test commit"),
        "commit should appear in log: {log}"
    );
}
}