2026-03-19 22:41:17 +00:00
|
|
|
mod auto_assign;
|
|
|
|
|
mod pipeline;
|
|
|
|
|
|
|
|
|
|
use crate::agent_log::AgentLogWriter;
|
|
|
|
|
use crate::config::ProjectConfig;
|
|
|
|
|
use crate::io::watcher::WatcherEvent;
|
|
|
|
|
use crate::slog;
|
|
|
|
|
use crate::slog_error;
|
|
|
|
|
use crate::worktree::{self, WorktreeInfo};
|
|
|
|
|
use portable_pty::ChildKiller;
|
|
|
|
|
use std::collections::HashMap;
|
|
|
|
|
use std::path::{Path, PathBuf};
|
|
|
|
|
use std::sync::{Arc, Mutex};
|
|
|
|
|
use tokio::sync::broadcast;
|
|
|
|
|
|
|
|
|
|
use super::{
|
|
|
|
|
AgentEvent, AgentInfo, AgentStatus, CompletionReport, PipelineStage, agent_config_stage,
|
|
|
|
|
pipeline_stage,
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
/// Build the composite `"{story_id}:{agent_name}"` key under which an agent
/// is tracked in the pool.
fn composite_key(story_id: &str, agent_name: &str) -> String {
    [story_id, agent_name].join(":")
}
|
|
|
|
|
|
|
|
|
|
/// RAII guard that removes a pending agent entry from the pool on drop.
|
|
|
|
|
///
|
|
|
|
|
/// Created after inserting a `Pending` entry into the agent HashMap.
|
|
|
|
|
/// If `start_agent` succeeds (the agent process is spawned and status
|
|
|
|
|
/// transitions to `Running`), call [`disarm`](Self::disarm) to prevent
|
|
|
|
|
/// cleanup. If any intermediate step fails and the guard is dropped
|
|
|
|
|
/// without being disarmed, the pending entry is removed so it cannot
|
|
|
|
|
/// block future auto-assign dispatches.
|
|
|
|
|
struct PendingGuard {
|
|
|
|
|
agents: Arc<Mutex<HashMap<String, StoryAgent>>>,
|
|
|
|
|
key: String,
|
|
|
|
|
armed: bool,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl PendingGuard {
|
|
|
|
|
fn new(agents: Arc<Mutex<HashMap<String, StoryAgent>>>, key: String) -> Self {
|
|
|
|
|
Self {
|
|
|
|
|
agents,
|
|
|
|
|
key,
|
|
|
|
|
armed: true,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Prevent the guard from cleaning up the entry (call after
|
|
|
|
|
/// successful spawn).
|
|
|
|
|
fn disarm(&mut self) {
|
|
|
|
|
self.armed = false;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl Drop for PendingGuard {
|
|
|
|
|
fn drop(&mut self) {
|
|
|
|
|
if self.armed
|
|
|
|
|
&& let Ok(mut agents) = self.agents.lock()
|
|
|
|
|
&& agents
|
|
|
|
|
.get(&self.key)
|
|
|
|
|
.is_some_and(|a| a.status == AgentStatus::Pending)
|
|
|
|
|
{
|
|
|
|
|
agents.remove(&self.key);
|
|
|
|
|
slog!(
|
|
|
|
|
"[agents] Cleaned up leaked Pending entry for '{}'",
|
|
|
|
|
self.key
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
struct StoryAgent {
|
|
|
|
|
agent_name: String,
|
|
|
|
|
status: AgentStatus,
|
|
|
|
|
worktree_info: Option<WorktreeInfo>,
|
|
|
|
|
session_id: Option<String>,
|
|
|
|
|
tx: broadcast::Sender<AgentEvent>,
|
|
|
|
|
task_handle: Option<tokio::task::JoinHandle<()>>,
|
|
|
|
|
/// Accumulated events for polling via get_agent_output.
|
|
|
|
|
event_log: Arc<Mutex<Vec<AgentEvent>>>,
|
|
|
|
|
/// Set when the agent calls report_completion.
|
|
|
|
|
completion: Option<CompletionReport>,
|
|
|
|
|
/// Project root, stored for pipeline advancement after completion.
|
|
|
|
|
project_root: Option<PathBuf>,
|
|
|
|
|
/// UUID identifying the log file for this session.
|
|
|
|
|
log_session_id: Option<String>,
|
|
|
|
|
/// Set to `true` when the agent calls `report_merge_failure`.
|
|
|
|
|
/// Prevents the pipeline from blindly advancing to `5_done/` after a
|
|
|
|
|
/// failed merge: the server-owned gate check runs in the feature-branch
|
|
|
|
|
/// worktree (which compiles fine) and returns `gates_passed=true` even
|
|
|
|
|
/// though the code was never squash-merged onto master.
|
|
|
|
|
merge_failure_reported: bool,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Build an `AgentInfo` snapshot from a `StoryAgent` map entry.
|
|
|
|
|
fn agent_info_from_entry(story_id: &str, agent: &StoryAgent) -> AgentInfo {
|
|
|
|
|
AgentInfo {
|
|
|
|
|
story_id: story_id.to_string(),
|
|
|
|
|
agent_name: agent.agent_name.clone(),
|
|
|
|
|
status: agent.status.clone(),
|
|
|
|
|
session_id: agent.session_id.clone(),
|
|
|
|
|
worktree_path: agent
|
|
|
|
|
.worktree_info
|
|
|
|
|
.as_ref()
|
|
|
|
|
.map(|wt| wt.path.to_string_lossy().to_string()),
|
|
|
|
|
base_branch: agent
|
|
|
|
|
.worktree_info
|
|
|
|
|
.as_ref()
|
|
|
|
|
.map(|wt| wt.base_branch.clone()),
|
|
|
|
|
completion: agent.completion.clone(),
|
|
|
|
|
log_session_id: agent.log_session_id.clone(),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Manages concurrent story agents, each in its own worktree.
|
|
|
|
|
pub struct AgentPool {
|
|
|
|
|
agents: Arc<Mutex<HashMap<String, StoryAgent>>>,
|
|
|
|
|
port: u16,
|
|
|
|
|
/// Registry of active PTY child process killers, keyed by "{story_id}:{agent_name}".
|
|
|
|
|
/// Used to terminate child processes on server shutdown or agent stop, preventing
|
|
|
|
|
/// orphaned Claude Code processes from running after the server exits.
|
|
|
|
|
child_killers: Arc<Mutex<HashMap<String, Box<dyn ChildKiller + Send + Sync>>>>,
|
|
|
|
|
/// Broadcast channel for notifying WebSocket clients of agent state changes.
|
|
|
|
|
/// When an agent transitions state (Pending, Running, Completed, Failed, Stopped),
|
|
|
|
|
/// an `AgentStateChanged` event is emitted so the frontend can refresh the
|
|
|
|
|
/// pipeline board without waiting for a filesystem event.
|
|
|
|
|
watcher_tx: broadcast::Sender<WatcherEvent>,
|
|
|
|
|
/// Tracks background merge jobs started by `merge_agent_work`, keyed by story_id.
|
|
|
|
|
/// The MCP tool returns immediately and the mergemaster agent polls
|
|
|
|
|
/// `get_merge_status` until the job reaches a terminal state.
|
|
|
|
|
merge_jobs: Arc<Mutex<HashMap<String, super::merge::MergeJob>>>,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl AgentPool {
|
|
|
|
|
pub fn new(port: u16, watcher_tx: broadcast::Sender<WatcherEvent>) -> Self {
|
|
|
|
|
Self {
|
|
|
|
|
agents: Arc::new(Mutex::new(HashMap::new())),
|
|
|
|
|
port,
|
|
|
|
|
child_killers: Arc::new(Mutex::new(HashMap::new())),
|
|
|
|
|
watcher_tx,
|
|
|
|
|
merge_jobs: Arc::new(Mutex::new(HashMap::new())),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Test-only constructor: builds a pool around a throwaway watcher channel.
///
/// The receiving half of the channel is discarded straight away, so nothing
/// listens for the state-change notifications the pool sends.
#[cfg(test)]
pub fn new_test(port: u16) -> Self {
    let (watcher_tx, _) = broadcast::channel(16);
    Self::new(port, watcher_tx)
}
|
|
|
|
|
|
|
|
|
|
/// Notify WebSocket clients that agent state has changed, so the pipeline
|
|
|
|
|
/// board and agent panel can refresh.
|
|
|
|
|
fn notify_agent_state_changed(watcher_tx: &broadcast::Sender<WatcherEvent>) {
|
|
|
|
|
let _ = watcher_tx.send(WatcherEvent::AgentStateChanged);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Kill all active PTY child processes.
|
|
|
|
|
///
|
|
|
|
|
/// Called on server shutdown to prevent orphaned Claude Code processes from
|
|
|
|
|
/// continuing to run after the server exits. Each registered killer is called
|
|
|
|
|
/// once, then the registry is cleared.
|
|
|
|
|
pub fn kill_all_children(&self) {
|
|
|
|
|
if let Ok(mut killers) = self.child_killers.lock() {
|
|
|
|
|
for (key, killer) in killers.iter_mut() {
|
|
|
|
|
slog!("[agents] Killing child process for {key} on shutdown");
|
|
|
|
|
let _ = killer.kill();
|
|
|
|
|
}
|
|
|
|
|
killers.clear();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Kill and deregister the child process for a specific agent key.
|
|
|
|
|
///
|
|
|
|
|
/// Used by `stop_agent` to ensure the PTY child is terminated even though
|
|
|
|
|
/// aborting a `spawn_blocking` task handle does not interrupt the blocking thread.
|
|
|
|
|
fn kill_child_for_key(&self, key: &str) {
|
|
|
|
|
if let Ok(mut killers) = self.child_killers.lock()
|
|
|
|
|
&& let Some(mut killer) = killers.remove(key)
|
|
|
|
|
{
|
|
|
|
|
slog!("[agents] Killing child process for {key} on stop");
|
|
|
|
|
let _ = killer.kill();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Start an agent for a story: load config, create worktree, spawn agent.
|
|
|
|
|
///
|
|
|
|
|
/// When `agent_name` is `None`, automatically selects the first idle coder
|
|
|
|
|
/// agent (story 190). If all coders are busy the call fails with an error
|
|
|
|
|
/// indicating the story will be picked up when one becomes available.
|
|
|
|
|
///
|
|
|
|
|
/// If `resume_context` is provided, it is appended to the rendered prompt
|
|
|
|
|
/// so the agent can pick up from a previous failed attempt.
|
|
|
|
|
pub async fn start_agent(
|
|
|
|
|
&self,
|
|
|
|
|
project_root: &Path,
|
|
|
|
|
story_id: &str,
|
|
|
|
|
agent_name: Option<&str>,
|
|
|
|
|
resume_context: Option<&str>,
|
|
|
|
|
) -> Result<AgentInfo, String> {
|
|
|
|
|
let config = ProjectConfig::load(project_root)?;
|
|
|
|
|
|
|
|
|
|
// Validate explicit agent name early (no lock needed).
|
|
|
|
|
if let Some(name) = agent_name {
|
|
|
|
|
config
|
|
|
|
|
.find_agent(name)
|
|
|
|
|
.ok_or_else(|| format!("No agent named '{name}' in config"))?;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Create name-independent shared resources before the lock so they are
|
|
|
|
|
// ready for the atomic check-and-insert (story 132).
|
|
|
|
|
let (tx, _) = broadcast::channel::<AgentEvent>(1024);
|
|
|
|
|
let event_log: Arc<Mutex<Vec<AgentEvent>>> = Arc::new(Mutex::new(Vec::new()));
|
|
|
|
|
let log_session_id = uuid::Uuid::new_v4().to_string();
|
|
|
|
|
|
|
|
|
|
// Move story from backlog/ to current/ before checking agent
|
|
|
|
|
// availability so that auto_assign_available_work can pick it up even
|
|
|
|
|
// when all coders are currently busy (story 203). This is idempotent:
|
|
|
|
|
// if the story is already in 2_current/ or a later stage, the call is
|
|
|
|
|
// a no-op.
|
|
|
|
|
super::lifecycle::move_story_to_current(project_root, story_id)?;
|
|
|
|
|
|
|
|
|
|
// Validate that the agent's configured stage matches the story's
|
|
|
|
|
// pipeline stage. This prevents any caller (auto-assign, MCP tool,
|
|
|
|
|
// pipeline advance, supervisor) from starting a wrong-stage agent on
|
|
|
|
|
// a story — e.g. mergemaster on a coding-stage story (bug 312).
|
|
|
|
|
if let Some(name) = agent_name {
|
|
|
|
|
let agent_stage = config
|
|
|
|
|
.find_agent(name)
|
|
|
|
|
.map(agent_config_stage)
|
|
|
|
|
.unwrap_or_else(|| pipeline_stage(name));
|
|
|
|
|
if agent_stage != PipelineStage::Other
|
|
|
|
|
&& let Some(story_stage_dir) = find_active_story_stage(project_root, story_id)
|
|
|
|
|
{
|
|
|
|
|
let expected_stage = match story_stage_dir {
|
|
|
|
|
"2_current" => PipelineStage::Coder,
|
|
|
|
|
"3_qa" => PipelineStage::Qa,
|
|
|
|
|
"4_merge" => PipelineStage::Mergemaster,
|
|
|
|
|
_ => PipelineStage::Other,
|
|
|
|
|
};
|
|
|
|
|
if expected_stage != PipelineStage::Other && expected_stage != agent_stage {
|
|
|
|
|
return Err(format!(
|
|
|
|
|
"Agent '{name}' (stage: {agent_stage:?}) cannot be assigned to \
|
|
|
|
|
story '{story_id}' in {story_stage_dir}/ (requires stage: {expected_stage:?})"
|
|
|
|
|
));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Atomically resolve agent name, check availability, and register as
|
|
|
|
|
// Pending. When `agent_name` is `None` the first idle coder is
|
|
|
|
|
// selected inside the lock so no TOCTOU race can occur between the
|
|
|
|
|
// availability check and the Pending insert (story 132, story 190).
|
|
|
|
|
//
|
|
|
|
|
// The `PendingGuard` ensures that if any step below fails the entry is
|
|
|
|
|
// removed from the pool so it does not permanently block auto-assign
|
|
|
|
|
// (bug 118).
|
|
|
|
|
let resolved_name: String;
|
|
|
|
|
let key: String;
|
|
|
|
|
{
|
|
|
|
|
let mut agents = self.agents.lock().map_err(|e| e.to_string())?;
|
|
|
|
|
|
|
|
|
|
resolved_name = match agent_name {
|
|
|
|
|
Some(name) => name.to_string(),
|
|
|
|
|
None => auto_assign::find_free_agent_for_stage(&config, &agents, &PipelineStage::Coder)
|
|
|
|
|
.map(|s| s.to_string())
|
|
|
|
|
.ok_or_else(|| {
|
|
|
|
|
if config
|
|
|
|
|
.agent
|
|
|
|
|
.iter()
|
|
|
|
|
.any(|a| agent_config_stage(a) == PipelineStage::Coder)
|
|
|
|
|
{
|
|
|
|
|
format!(
|
|
|
|
|
"All coder agents are busy; story '{story_id}' has been \
|
|
|
|
|
queued in work/2_current/ and will be auto-assigned when \
|
|
|
|
|
one becomes available"
|
|
|
|
|
)
|
|
|
|
|
} else {
|
|
|
|
|
"No coder agent configured. Specify an agent_name explicitly."
|
|
|
|
|
.to_string()
|
|
|
|
|
}
|
|
|
|
|
})?,
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
key = composite_key(story_id, &resolved_name);
|
|
|
|
|
|
|
|
|
|
// Check for duplicate assignment (same story + same agent already active).
|
|
|
|
|
if let Some(agent) = agents.get(&key)
|
|
|
|
|
&& (agent.status == AgentStatus::Running || agent.status == AgentStatus::Pending)
|
|
|
|
|
{
|
|
|
|
|
return Err(format!(
|
|
|
|
|
"Agent '{resolved_name}' for story '{story_id}' is already {}",
|
|
|
|
|
agent.status
|
|
|
|
|
));
|
|
|
|
|
}
|
|
|
|
|
// Enforce single-stage concurrency: reject if there is already a
|
|
|
|
|
// Running/Pending agent at the same pipeline stage for this story.
|
|
|
|
|
// This prevents two coders (or two QA/mergemaster agents) from
|
|
|
|
|
// corrupting each other's work in the same worktree.
|
|
|
|
|
// Applies to both explicit and auto-selected agents; the Other
|
|
|
|
|
// stage (supervisors, unknown agents) is exempt.
|
|
|
|
|
let resolved_stage = config
|
|
|
|
|
.find_agent(&resolved_name)
|
|
|
|
|
.map(agent_config_stage)
|
|
|
|
|
.unwrap_or_else(|| pipeline_stage(&resolved_name));
|
|
|
|
|
if resolved_stage != PipelineStage::Other
|
|
|
|
|
&& let Some(conflicting_name) = agents.iter().find_map(|(k, a)| {
|
|
|
|
|
let k_story = k.rsplit_once(':').map(|(s, _)| s).unwrap_or(k);
|
|
|
|
|
if k_story == story_id
|
|
|
|
|
&& a.agent_name != resolved_name
|
|
|
|
|
&& matches!(a.status, AgentStatus::Running | AgentStatus::Pending)
|
|
|
|
|
{
|
|
|
|
|
let a_stage = config
|
|
|
|
|
.find_agent(&a.agent_name)
|
|
|
|
|
.map(agent_config_stage)
|
|
|
|
|
.unwrap_or_else(|| pipeline_stage(&a.agent_name));
|
|
|
|
|
if a_stage == resolved_stage {
|
|
|
|
|
Some(a.agent_name.clone())
|
|
|
|
|
} else {
|
|
|
|
|
None
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
None
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
{
|
|
|
|
|
return Err(format!(
|
|
|
|
|
"Cannot start '{resolved_name}' on story '{story_id}': \
|
|
|
|
|
'{conflicting_name}' is already active at the same pipeline stage"
|
|
|
|
|
));
|
|
|
|
|
}
|
|
|
|
|
// Enforce single-instance concurrency for explicitly-named agents:
|
|
|
|
|
// if this agent is already running on any other story, reject.
|
|
|
|
|
// Auto-selected agents are already guaranteed idle by
|
|
|
|
|
// find_free_agent_for_stage, so this check is only needed for
|
|
|
|
|
// explicit requests.
|
|
|
|
|
if agent_name.is_some()
|
|
|
|
|
&& let Some(busy_story) = agents.iter().find_map(|(k, a)| {
|
|
|
|
|
if a.agent_name == resolved_name
|
|
|
|
|
&& matches!(a.status, AgentStatus::Running | AgentStatus::Pending)
|
|
|
|
|
{
|
|
|
|
|
Some(
|
|
|
|
|
k.rsplit_once(':')
|
|
|
|
|
.map(|(sid, _)| sid)
|
|
|
|
|
.unwrap_or(k)
|
|
|
|
|
.to_string(),
|
|
|
|
|
)
|
|
|
|
|
} else {
|
|
|
|
|
None
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
{
|
|
|
|
|
return Err(format!(
|
|
|
|
|
"Agent '{resolved_name}' is already running on story '{busy_story}'; \
|
|
|
|
|
story '{story_id}' will be picked up when the agent becomes available"
|
|
|
|
|
));
|
|
|
|
|
}
|
|
|
|
|
agents.insert(
|
|
|
|
|
key.clone(),
|
|
|
|
|
StoryAgent {
|
|
|
|
|
agent_name: resolved_name.clone(),
|
|
|
|
|
status: AgentStatus::Pending,
|
|
|
|
|
worktree_info: None,
|
|
|
|
|
session_id: None,
|
|
|
|
|
tx: tx.clone(),
|
|
|
|
|
task_handle: None,
|
|
|
|
|
event_log: event_log.clone(),
|
|
|
|
|
completion: None,
|
|
|
|
|
project_root: Some(project_root.to_path_buf()),
|
|
|
|
|
log_session_id: Some(log_session_id.clone()),
|
|
|
|
|
merge_failure_reported: false,
|
|
|
|
|
},
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
let mut pending_guard = PendingGuard::new(self.agents.clone(), key.clone());
|
|
|
|
|
|
|
|
|
|
// Create persistent log writer (needs resolved_name, so must be after
|
|
|
|
|
// the atomic resolution above).
|
|
|
|
|
let log_writer =
|
|
|
|
|
match AgentLogWriter::new(project_root, story_id, &resolved_name, &log_session_id) {
|
|
|
|
|
Ok(w) => Some(Arc::new(Mutex::new(w))),
|
|
|
|
|
Err(e) => {
|
|
|
|
|
eprintln!(
|
|
|
|
|
"[agents] Failed to create log writer for {story_id}:{resolved_name}: {e}"
|
|
|
|
|
);
|
|
|
|
|
None
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// Notify WebSocket clients that a new agent is pending.
|
|
|
|
|
Self::notify_agent_state_changed(&self.watcher_tx);
|
|
|
|
|
|
|
|
|
|
let _ = tx.send(AgentEvent::Status {
|
|
|
|
|
story_id: story_id.to_string(),
|
|
|
|
|
agent_name: resolved_name.clone(),
|
|
|
|
|
status: "pending".to_string(),
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
// Extract inactivity timeout from the agent config before cloning config.
|
|
|
|
|
let inactivity_timeout_secs = config
|
|
|
|
|
.find_agent(&resolved_name)
|
|
|
|
|
.map(|a| a.inactivity_timeout_secs)
|
|
|
|
|
.unwrap_or(300);
|
|
|
|
|
|
|
|
|
|
// Clone all values needed inside the background spawn.
|
|
|
|
|
let project_root_clone = project_root.to_path_buf();
|
|
|
|
|
let config_clone = config.clone();
|
|
|
|
|
let resume_context_owned = resume_context.map(str::to_string);
|
|
|
|
|
let sid = story_id.to_string();
|
|
|
|
|
let aname = resolved_name.clone();
|
|
|
|
|
let tx_clone = tx.clone();
|
|
|
|
|
let agents_ref = self.agents.clone();
|
|
|
|
|
let key_clone = key.clone();
|
|
|
|
|
let log_clone = event_log.clone();
|
|
|
|
|
let port_for_task = self.port;
|
|
|
|
|
let log_writer_clone = log_writer.clone();
|
|
|
|
|
let child_killers_clone = self.child_killers.clone();
|
|
|
|
|
let watcher_tx_clone = self.watcher_tx.clone();
|
|
|
|
|
|
|
|
|
|
// Spawn the background task. Worktree creation and agent launch happen here
|
|
|
|
|
// so `start_agent` returns immediately after registering the agent as
|
|
|
|
|
// Pending — non-blocking by design (story 157).
|
|
|
|
|
let handle = tokio::spawn(async move {
|
|
|
|
|
// Step 1: create the worktree (slow — git checkout, pnpm install, etc.)
|
|
|
|
|
let wt_info = match worktree::create_worktree(
|
|
|
|
|
&project_root_clone,
|
|
|
|
|
&sid,
|
|
|
|
|
&config_clone,
|
|
|
|
|
port_for_task,
|
|
|
|
|
)
|
|
|
|
|
.await
|
|
|
|
|
{
|
|
|
|
|
Ok(wt) => wt,
|
|
|
|
|
Err(e) => {
|
|
|
|
|
let error_msg = format!("Failed to create worktree: {e}");
|
|
|
|
|
slog_error!("[agents] {error_msg}");
|
|
|
|
|
let event = AgentEvent::Error {
|
|
|
|
|
story_id: sid.clone(),
|
|
|
|
|
agent_name: aname.clone(),
|
|
|
|
|
message: error_msg,
|
|
|
|
|
};
|
|
|
|
|
if let Ok(mut log) = log_clone.lock() {
|
|
|
|
|
log.push(event.clone());
|
|
|
|
|
}
|
|
|
|
|
let _ = tx_clone.send(event);
|
|
|
|
|
if let Ok(mut agents) = agents_ref.lock()
|
|
|
|
|
&& let Some(agent) = agents.get_mut(&key_clone)
|
|
|
|
|
{
|
|
|
|
|
agent.status = AgentStatus::Failed;
|
|
|
|
|
}
|
|
|
|
|
Self::notify_agent_state_changed(&watcher_tx_clone);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// Step 2: store worktree info and render agent command/args/prompt.
|
|
|
|
|
let wt_path_str = wt_info.path.to_string_lossy().to_string();
|
|
|
|
|
{
|
|
|
|
|
if let Ok(mut agents) = agents_ref.lock()
|
|
|
|
|
&& let Some(agent) = agents.get_mut(&key_clone)
|
|
|
|
|
{
|
|
|
|
|
agent.worktree_info = Some(wt_info.clone());
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let (command, args, mut prompt) = match config_clone.render_agent_args(
|
|
|
|
|
&wt_path_str,
|
|
|
|
|
&sid,
|
|
|
|
|
Some(&aname),
|
|
|
|
|
Some(&wt_info.base_branch),
|
|
|
|
|
) {
|
|
|
|
|
Ok(result) => result,
|
|
|
|
|
Err(e) => {
|
|
|
|
|
let error_msg = format!("Failed to render agent args: {e}");
|
|
|
|
|
slog_error!("[agents] {error_msg}");
|
|
|
|
|
let event = AgentEvent::Error {
|
|
|
|
|
story_id: sid.clone(),
|
|
|
|
|
agent_name: aname.clone(),
|
|
|
|
|
message: error_msg,
|
|
|
|
|
};
|
|
|
|
|
if let Ok(mut log) = log_clone.lock() {
|
|
|
|
|
log.push(event.clone());
|
|
|
|
|
}
|
|
|
|
|
let _ = tx_clone.send(event);
|
|
|
|
|
if let Ok(mut agents) = agents_ref.lock()
|
|
|
|
|
&& let Some(agent) = agents.get_mut(&key_clone)
|
|
|
|
|
{
|
|
|
|
|
agent.status = AgentStatus::Failed;
|
|
|
|
|
}
|
|
|
|
|
Self::notify_agent_state_changed(&watcher_tx_clone);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// Append resume context if this is a restart with failure information.
|
|
|
|
|
if let Some(ctx) = resume_context_owned {
|
|
|
|
|
prompt.push_str(&ctx);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Step 3: transition to Running now that the worktree is ready.
|
|
|
|
|
{
|
|
|
|
|
if let Ok(mut agents) = agents_ref.lock()
|
|
|
|
|
&& let Some(agent) = agents.get_mut(&key_clone)
|
|
|
|
|
{
|
|
|
|
|
agent.status = AgentStatus::Running;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
let _ = tx_clone.send(AgentEvent::Status {
|
|
|
|
|
story_id: sid.clone(),
|
|
|
|
|
agent_name: aname.clone(),
|
|
|
|
|
status: "running".to_string(),
|
|
|
|
|
});
|
|
|
|
|
Self::notify_agent_state_changed(&watcher_tx_clone);
|
|
|
|
|
|
|
|
|
|
// Step 4: launch the agent process.
|
|
|
|
|
match super::pty::run_agent_pty_streaming(
|
|
|
|
|
&sid,
|
|
|
|
|
&aname,
|
|
|
|
|
&command,
|
|
|
|
|
&args,
|
|
|
|
|
&prompt,
|
|
|
|
|
&wt_path_str,
|
|
|
|
|
&tx_clone,
|
|
|
|
|
&log_clone,
|
|
|
|
|
log_writer_clone,
|
|
|
|
|
inactivity_timeout_secs,
|
|
|
|
|
child_killers_clone,
|
|
|
|
|
)
|
|
|
|
|
.await
|
|
|
|
|
{
|
|
|
|
|
Ok(pty_result) => {
|
|
|
|
|
// Persist token usage if the agent reported it.
|
|
|
|
|
if let Some(ref usage) = pty_result.token_usage
|
|
|
|
|
&& let Ok(agents) = agents_ref.lock()
|
|
|
|
|
&& let Some(agent) = agents.get(&key_clone)
|
|
|
|
|
&& let Some(ref pr) = agent.project_root
|
|
|
|
|
{
|
|
|
|
|
let model = config_clone
|
|
|
|
|
.find_agent(&aname)
|
|
|
|
|
.and_then(|a| a.model.clone());
|
|
|
|
|
let record = super::token_usage::build_record(
|
|
|
|
|
&sid, &aname, model, usage.clone(),
|
|
|
|
|
);
|
|
|
|
|
if let Err(e) = super::token_usage::append_record(pr, &record) {
|
|
|
|
|
slog_error!(
|
|
|
|
|
"[agents] Failed to persist token usage for \
|
|
|
|
|
{sid}:{aname}: {e}"
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Server-owned completion: run acceptance gates automatically
|
|
|
|
|
// when the agent process exits normally.
|
|
|
|
|
pipeline::run_server_owned_completion(
|
|
|
|
|
&agents_ref,
|
|
|
|
|
port_for_task,
|
|
|
|
|
&sid,
|
|
|
|
|
&aname,
|
|
|
|
|
pty_result.session_id,
|
|
|
|
|
watcher_tx_clone.clone(),
|
|
|
|
|
)
|
|
|
|
|
.await;
|
|
|
|
|
Self::notify_agent_state_changed(&watcher_tx_clone);
|
|
|
|
|
}
|
|
|
|
|
Err(e) => {
|
|
|
|
|
slog_error!("[agents] Agent process error for {aname} on {sid}: {e}");
|
|
|
|
|
let event = AgentEvent::Error {
|
|
|
|
|
story_id: sid.clone(),
|
|
|
|
|
agent_name: aname.clone(),
|
|
|
|
|
message: e,
|
|
|
|
|
};
|
|
|
|
|
if let Ok(mut log) = log_clone.lock() {
|
|
|
|
|
log.push(event.clone());
|
|
|
|
|
}
|
|
|
|
|
let _ = tx_clone.send(event);
|
|
|
|
|
if let Ok(mut agents) = agents_ref.lock()
|
|
|
|
|
&& let Some(agent) = agents.get_mut(&key_clone)
|
|
|
|
|
{
|
|
|
|
|
agent.status = AgentStatus::Failed;
|
|
|
|
|
}
|
|
|
|
|
Self::notify_agent_state_changed(&watcher_tx_clone);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
// Store the task handle while the agent is still Pending.
|
|
|
|
|
{
|
|
|
|
|
let mut agents = self.agents.lock().map_err(|e| e.to_string())?;
|
|
|
|
|
if let Some(agent) = agents.get_mut(&key) {
|
|
|
|
|
agent.task_handle = Some(handle);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Agent successfully spawned — prevent the guard from removing the entry.
|
|
|
|
|
pending_guard.disarm();
|
|
|
|
|
|
|
|
|
|
Ok(AgentInfo {
|
|
|
|
|
story_id: story_id.to_string(),
|
|
|
|
|
agent_name: resolved_name,
|
|
|
|
|
status: AgentStatus::Pending,
|
|
|
|
|
session_id: None,
|
|
|
|
|
worktree_path: None,
|
|
|
|
|
base_branch: None,
|
|
|
|
|
completion: None,
|
|
|
|
|
log_session_id: Some(log_session_id),
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Stop a running agent. Worktree is preserved for inspection.
|
|
|
|
|
pub async fn stop_agent(
|
|
|
|
|
&self,
|
|
|
|
|
_project_root: &Path,
|
|
|
|
|
story_id: &str,
|
|
|
|
|
agent_name: &str,
|
|
|
|
|
) -> Result<(), String> {
|
|
|
|
|
let key = composite_key(story_id, agent_name);
|
|
|
|
|
|
|
|
|
|
let (worktree_info, task_handle, tx) = {
|
|
|
|
|
let mut agents = self.agents.lock().map_err(|e| e.to_string())?;
|
|
|
|
|
let agent = agents
|
|
|
|
|
.get_mut(&key)
|
|
|
|
|
.ok_or_else(|| format!("No agent '{agent_name}' for story '{story_id}'"))?;
|
|
|
|
|
|
|
|
|
|
let wt = agent.worktree_info.clone();
|
|
|
|
|
let handle = agent.task_handle.take();
|
|
|
|
|
let tx = agent.tx.clone();
|
|
|
|
|
agent.status = AgentStatus::Failed;
|
|
|
|
|
(wt, handle, tx)
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// Abort the task and kill the PTY child process.
|
|
|
|
|
// Note: aborting a spawn_blocking task handle does not interrupt the blocking
|
|
|
|
|
// thread, so we must also kill the child process directly via the killer registry.
|
|
|
|
|
if let Some(handle) = task_handle {
|
|
|
|
|
handle.abort();
|
|
|
|
|
let _ = handle.await;
|
|
|
|
|
}
|
|
|
|
|
self.kill_child_for_key(&key);
|
|
|
|
|
|
|
|
|
|
// Preserve worktree for inspection — don't destroy agent's work on stop.
|
|
|
|
|
if let Some(ref wt) = worktree_info {
|
|
|
|
|
slog!(
|
|
|
|
|
"[agents] Worktree preserved for {story_id}:{agent_name}: {}",
|
|
|
|
|
wt.path.display()
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let _ = tx.send(AgentEvent::Status {
|
|
|
|
|
story_id: story_id.to_string(),
|
|
|
|
|
agent_name: agent_name.to_string(),
|
|
|
|
|
status: "stopped".to_string(),
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
// Remove from map
|
|
|
|
|
{
|
|
|
|
|
let mut agents = self.agents.lock().map_err(|e| e.to_string())?;
|
|
|
|
|
agents.remove(&key);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Notify WebSocket clients so pipeline board and agent panel update.
|
|
|
|
|
Self::notify_agent_state_changed(&self.watcher_tx);
|
|
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Return the names of configured agents for `stage` that are not currently
|
|
|
|
|
/// running or pending.
|
|
|
|
|
pub fn available_agents_for_stage(
|
|
|
|
|
&self,
|
|
|
|
|
config: &ProjectConfig,
|
|
|
|
|
stage: &PipelineStage,
|
|
|
|
|
) -> Result<Vec<String>, String> {
|
|
|
|
|
let agents = self.agents.lock().map_err(|e| e.to_string())?;
|
|
|
|
|
Ok(config
|
|
|
|
|
.agent
|
|
|
|
|
.iter()
|
|
|
|
|
.filter(|cfg| agent_config_stage(cfg) == *stage)
|
|
|
|
|
.filter(|cfg| {
|
|
|
|
|
!agents.values().any(|a| {
|
|
|
|
|
a.agent_name == cfg.name
|
|
|
|
|
&& matches!(a.status, AgentStatus::Running | AgentStatus::Pending)
|
|
|
|
|
})
|
|
|
|
|
})
|
|
|
|
|
.map(|cfg| cfg.name.clone())
|
|
|
|
|
.collect())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// List all agents with their status.
|
|
|
|
|
pub fn list_agents(&self) -> Result<Vec<AgentInfo>, String> {
|
|
|
|
|
let agents = self.agents.lock().map_err(|e| e.to_string())?;
|
|
|
|
|
Ok(agents
|
|
|
|
|
.iter()
|
|
|
|
|
.map(|(key, agent)| {
|
|
|
|
|
// Extract story_id from composite key "story_id:agent_name"
|
|
|
|
|
let story_id = key
|
|
|
|
|
.rsplit_once(':')
|
|
|
|
|
.map(|(sid, _)| sid.to_string())
|
|
|
|
|
.unwrap_or_else(|| key.clone());
|
|
|
|
|
agent_info_from_entry(&story_id, agent)
|
|
|
|
|
})
|
|
|
|
|
.collect())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Subscribe to events for a story agent.
|
|
|
|
|
pub fn subscribe(
|
|
|
|
|
&self,
|
|
|
|
|
story_id: &str,
|
|
|
|
|
agent_name: &str,
|
|
|
|
|
) -> Result<broadcast::Receiver<AgentEvent>, String> {
|
|
|
|
|
let key = composite_key(story_id, agent_name);
|
|
|
|
|
let agents = self.agents.lock().map_err(|e| e.to_string())?;
|
|
|
|
|
let agent = agents
|
|
|
|
|
.get(&key)
|
|
|
|
|
.ok_or_else(|| format!("No agent '{agent_name}' for story '{story_id}'"))?;
|
|
|
|
|
Ok(agent.tx.subscribe())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Drain accumulated events for polling. Returns all events since the last drain.
|
|
|
|
|
pub fn drain_events(
|
|
|
|
|
&self,
|
|
|
|
|
story_id: &str,
|
|
|
|
|
agent_name: &str,
|
|
|
|
|
) -> Result<Vec<AgentEvent>, String> {
|
|
|
|
|
let key = composite_key(story_id, agent_name);
|
|
|
|
|
let agents = self.agents.lock().map_err(|e| e.to_string())?;
|
|
|
|
|
let agent = agents
|
|
|
|
|
.get(&key)
|
|
|
|
|
.ok_or_else(|| format!("No agent '{agent_name}' for story '{story_id}'"))?;
|
|
|
|
|
let mut log = agent.event_log.lock().map_err(|e| e.to_string())?;
|
|
|
|
|
Ok(log.drain(..).collect())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Block until the agent reaches a terminal state (completed, failed, stopped).
|
|
|
|
|
/// Returns the agent's final `AgentInfo`.
|
|
|
|
|
/// `timeout_ms` caps how long to wait; returns an error if the deadline passes.
|
|
|
|
|
pub async fn wait_for_agent(
|
|
|
|
|
&self,
|
|
|
|
|
story_id: &str,
|
|
|
|
|
agent_name: &str,
|
|
|
|
|
timeout_ms: u64,
|
|
|
|
|
) -> Result<AgentInfo, String> {
|
|
|
|
|
// Subscribe before checking status so we don't miss the terminal event
|
|
|
|
|
// if the agent completes in the window between the two operations.
|
|
|
|
|
let mut rx = self.subscribe(story_id, agent_name)?;
|
|
|
|
|
|
|
|
|
|
// Return immediately if already in a terminal state.
|
|
|
|
|
{
|
|
|
|
|
let agents = self.agents.lock().map_err(|e| e.to_string())?;
|
|
|
|
|
let key = composite_key(story_id, agent_name);
|
|
|
|
|
if let Some(agent) = agents.get(&key)
|
|
|
|
|
&& matches!(agent.status, AgentStatus::Completed | AgentStatus::Failed)
|
|
|
|
|
{
|
|
|
|
|
return Ok(agent_info_from_entry(story_id, agent));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let deadline = tokio::time::Instant::now() + std::time::Duration::from_millis(timeout_ms);
|
|
|
|
|
|
|
|
|
|
loop {
|
|
|
|
|
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
|
|
|
|
|
if remaining.is_zero() {
|
|
|
|
|
return Err(format!(
|
|
|
|
|
"Timed out after {timeout_ms}ms waiting for agent '{agent_name}' on story '{story_id}'"
|
|
|
|
|
));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
match tokio::time::timeout(remaining, rx.recv()).await {
|
|
|
|
|
Ok(Ok(event)) => {
|
|
|
|
|
let is_terminal = match &event {
|
|
|
|
|
AgentEvent::Done { .. } | AgentEvent::Error { .. } => true,
|
|
|
|
|
AgentEvent::Status { status, .. } if status == "stopped" => true,
|
|
|
|
|
_ => false,
|
|
|
|
|
};
|
|
|
|
|
if is_terminal {
|
|
|
|
|
let agents = self.agents.lock().map_err(|e| e.to_string())?;
|
|
|
|
|
let key = composite_key(story_id, agent_name);
|
|
|
|
|
return Ok(if let Some(agent) = agents.get(&key) {
|
|
|
|
|
agent_info_from_entry(story_id, agent)
|
|
|
|
|
} else {
|
|
|
|
|
// Agent was removed from map (e.g. stop_agent removes it after
|
|
|
|
|
// the "stopped" status event is sent).
|
|
|
|
|
let (status, session_id) = match event {
|
|
|
|
|
AgentEvent::Done { session_id, .. } => {
|
|
|
|
|
(AgentStatus::Completed, session_id)
|
|
|
|
|
}
|
|
|
|
|
_ => (AgentStatus::Failed, None),
|
|
|
|
|
};
|
|
|
|
|
AgentInfo {
|
|
|
|
|
story_id: story_id.to_string(),
|
|
|
|
|
agent_name: agent_name.to_string(),
|
|
|
|
|
status,
|
|
|
|
|
session_id,
|
|
|
|
|
worktree_path: None,
|
|
|
|
|
base_branch: None,
|
|
|
|
|
completion: None,
|
|
|
|
|
log_session_id: None,
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
Ok(Err(broadcast::error::RecvError::Lagged(_))) => {
|
|
|
|
|
// Missed some buffered events — check current status before resuming.
|
|
|
|
|
let agents = self.agents.lock().map_err(|e| e.to_string())?;
|
|
|
|
|
let key = composite_key(story_id, agent_name);
|
|
|
|
|
if let Some(agent) = agents.get(&key)
|
|
|
|
|
&& matches!(agent.status, AgentStatus::Completed | AgentStatus::Failed)
|
|
|
|
|
{
|
|
|
|
|
return Ok(agent_info_from_entry(story_id, agent));
|
|
|
|
|
}
|
|
|
|
|
// Still running — continue the loop.
|
|
|
|
|
}
|
|
|
|
|
Ok(Err(broadcast::error::RecvError::Closed)) => {
|
|
|
|
|
// Channel closed: no more events will arrive. Return current state.
|
|
|
|
|
let agents = self.agents.lock().map_err(|e| e.to_string())?;
|
|
|
|
|
let key = composite_key(story_id, agent_name);
|
|
|
|
|
if let Some(agent) = agents.get(&key) {
|
|
|
|
|
return Ok(agent_info_from_entry(story_id, agent));
|
|
|
|
|
}
|
|
|
|
|
return Err(format!(
|
|
|
|
|
"Agent '{agent_name}' for story '{story_id}' channel closed unexpectedly"
|
|
|
|
|
));
|
|
|
|
|
}
|
|
|
|
|
Err(_) => {
|
|
|
|
|
return Err(format!(
|
|
|
|
|
"Timed out after {timeout_ms}ms waiting for agent '{agent_name}' on story '{story_id}'"
|
|
|
|
|
));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Create a worktree for the given story using the server port (writes .mcp.json).
|
|
|
|
|
pub async fn create_worktree(
|
|
|
|
|
&self,
|
|
|
|
|
project_root: &Path,
|
|
|
|
|
story_id: &str,
|
|
|
|
|
) -> Result<worktree::WorktreeInfo, String> {
|
|
|
|
|
let config = ProjectConfig::load(project_root)?;
|
|
|
|
|
worktree::create_worktree(project_root, story_id, &config, self.port).await
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Get project root helper.
|
|
|
|
|
pub fn get_project_root(&self, state: &crate::state::SessionState) -> Result<PathBuf, String> {
|
|
|
|
|
state.get_project_root()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Get the log session ID and project root for an agent, if available.
|
|
|
|
|
///
|
|
|
|
|
/// Used by MCP tools to find the persistent log file for a completed agent.
|
|
|
|
|
pub fn get_log_info(&self, story_id: &str, agent_name: &str) -> Option<(String, PathBuf)> {
|
|
|
|
|
let key = composite_key(story_id, agent_name);
|
|
|
|
|
let agents = self.agents.lock().ok()?;
|
|
|
|
|
let agent = agents.get(&key)?;
|
|
|
|
|
let session_id = agent.log_session_id.clone()?;
|
|
|
|
|
let project_root = agent.project_root.clone()?;
|
|
|
|
|
Some((session_id, project_root))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Remove all agent entries for a given story_id from the pool.
|
|
|
|
|
///
|
|
|
|
|
/// Called when a story is archived so that stale entries don't accumulate.
|
|
|
|
|
/// Returns the number of entries removed.
|
|
|
|
|
pub fn remove_agents_for_story(&self, story_id: &str) -> usize {
|
|
|
|
|
let mut agents = match self.agents.lock() {
|
|
|
|
|
Ok(a) => a,
|
|
|
|
|
Err(e) => {
|
|
|
|
|
slog_error!("[agents] Failed to lock pool for cleanup of '{story_id}': {e}");
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
let prefix = format!("{story_id}:");
|
|
|
|
|
let keys_to_remove: Vec<String> = agents
|
|
|
|
|
.keys()
|
|
|
|
|
.filter(|k| k.starts_with(&prefix))
|
|
|
|
|
.cloned()
|
|
|
|
|
.collect();
|
|
|
|
|
let count = keys_to_remove.len();
|
|
|
|
|
for key in &keys_to_remove {
|
|
|
|
|
agents.remove(key);
|
|
|
|
|
}
|
|
|
|
|
if count > 0 {
|
|
|
|
|
slog!("[agents] Removed {count} agent entries for archived story '{story_id}'");
|
|
|
|
|
}
|
|
|
|
|
count
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Test helper: register a synthetic agent entry for `story_id`/`agent_name`
/// with the given `status`, so unit tests can exercise wait/subscribe logic
/// without spawning a real process.
///
/// Every optional field of the entry is left empty. Returns the broadcast
/// sender whose clone is stored in the entry. Panics if the pool mutex is
/// poisoned.
#[cfg(test)]
pub fn inject_test_agent(
    &self,
    story_id: &str,
    agent_name: &str,
    status: AgentStatus,
) -> broadcast::Sender<AgentEvent> {
    // The channel's initial receiver is dropped right away.
    let (sender, _) = broadcast::channel::<AgentEvent>(64);
    let entry = StoryAgent {
        agent_name: agent_name.to_string(),
        status,
        worktree_info: None,
        session_id: None,
        tx: sender.clone(),
        task_handle: None,
        event_log: Arc::new(Mutex::new(Vec::new())),
        completion: None,
        project_root: None,
        log_session_id: None,
        merge_failure_reported: false,
    };
    self.agents
        .lock()
        .unwrap()
        .insert(composite_key(story_id, agent_name), entry);
    sender
}
|
|
|
|
|
|
|
|
|
|
/// Test helper: register a synthetic agent entry that carries worktree
/// information, for testing gate-related logic.
///
/// The entry's worktree points at `worktree_path`, with branch
/// `feature/story-{story_id}` and base branch `master`; all other optional
/// fields are left empty. Returns the broadcast sender whose clone is stored
/// in the entry. Panics if the pool mutex is poisoned.
#[cfg(test)]
pub fn inject_test_agent_with_path(
    &self,
    story_id: &str,
    agent_name: &str,
    status: AgentStatus,
    worktree_path: PathBuf,
) -> broadcast::Sender<AgentEvent> {
    // The channel's initial receiver is dropped right away.
    let (sender, _) = broadcast::channel::<AgentEvent>(64);
    let worktree = WorktreeInfo {
        path: worktree_path,
        branch: format!("feature/story-{story_id}"),
        base_branch: "master".to_string(),
    };
    let entry = StoryAgent {
        agent_name: agent_name.to_string(),
        status,
        worktree_info: Some(worktree),
        session_id: None,
        tx: sender.clone(),
        task_handle: None,
        event_log: Arc::new(Mutex::new(Vec::new())),
        completion: None,
        project_root: None,
        log_session_id: None,
        merge_failure_reported: false,
    };
    self.agents
        .lock()
        .unwrap()
        .insert(composite_key(story_id, agent_name), entry);
    sender
}
|
|
|
|
|
|
|
|
|
|
/// Test helper: register a synthetic agent entry that already has a
/// completion report and a project root, for testing pipeline advance logic
/// without spawning real agents.
///
/// Returns the broadcast sender whose clone is stored in the entry. Panics if
/// the pool mutex is poisoned.
#[cfg(test)]
pub fn inject_test_agent_with_completion(
    &self,
    story_id: &str,
    agent_name: &str,
    status: AgentStatus,
    project_root: PathBuf,
    completion: CompletionReport,
) -> broadcast::Sender<AgentEvent> {
    // The channel's initial receiver is dropped right away.
    let (sender, _) = broadcast::channel::<AgentEvent>(64);
    let entry = StoryAgent {
        agent_name: agent_name.to_string(),
        status,
        worktree_info: None,
        session_id: None,
        tx: sender.clone(),
        task_handle: None,
        event_log: Arc::new(Mutex::new(Vec::new())),
        completion: Some(completion),
        project_root: Some(project_root),
        log_session_id: None,
        merge_failure_reported: false,
    };
    self.agents
        .lock()
        .unwrap()
        .insert(composite_key(story_id, agent_name), entry);
    sender
}
|
|
|
|
|
|
|
|
|
|
/// Test helper: register a synthetic agent entry that owns a pre-built
/// (possibly already finished) task handle.
///
/// Used by watchdog tests to simulate an orphaned agent. The entry takes the
/// caller-supplied `status`; all other optional fields are left empty.
/// Returns the broadcast sender whose clone is stored in the entry. Panics if
/// the pool mutex is poisoned.
#[cfg(test)]
pub fn inject_test_agent_with_handle(
    &self,
    story_id: &str,
    agent_name: &str,
    status: AgentStatus,
    task_handle: tokio::task::JoinHandle<()>,
) -> broadcast::Sender<AgentEvent> {
    // The channel's initial receiver is dropped right away.
    let (sender, _) = broadcast::channel::<AgentEvent>(64);
    let entry = StoryAgent {
        agent_name: agent_name.to_string(),
        status,
        worktree_info: None,
        session_id: None,
        tx: sender.clone(),
        task_handle: Some(task_handle),
        event_log: Arc::new(Mutex::new(Vec::new())),
        completion: None,
        project_root: None,
        log_session_id: None,
        merge_failure_reported: false,
    };
    self.agents
        .lock()
        .unwrap()
        .insert(composite_key(story_id, agent_name), entry);
    sender
}
|
|
|
|
|
|
|
|
|
|
/// Test helper: store `killer` in the child-killer registry under `key`,
/// replacing any killer previously stored under the same key.
///
/// Panics if the registry mutex is poisoned.
#[cfg(test)]
pub fn inject_child_killer(&self, key: &str, killer: Box<dyn ChildKiller + Send + Sync>) {
    self.child_killers
        .lock()
        .unwrap()
        .insert(key.to_string(), killer);
}
|
|
|
|
|
|
|
|
|
|
/// Test helper: report how many child killers are currently registered.
///
/// Panics if the registry mutex is poisoned.
#[cfg(test)]
pub fn child_killer_count(&self) -> usize {
    let registry = self.child_killers.lock().unwrap();
    registry.len()
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Locate the active pipeline stage directory that currently holds `story_id`.
///
/// Probes `2_current`, `3_qa` and `4_merge` (in that order) under
/// `<project_root>/.storkit/work/` for a file named `<story_id>.md`. Returns
/// the directory name of the first stage where the file exists, or `None` if
/// the story is not in any active stage.
fn find_active_story_stage(project_root: &Path, story_id: &str) -> Option<&'static str> {
    const STAGES: [&str; 3] = ["2_current", "3_qa", "4_merge"];

    let work_dir = project_root.join(".storkit").join("work");
    let file_name = format!("{story_id}.md");

    STAGES
        .iter()
        .copied()
        .find(|stage| work_dir.join(stage).join(&file_name).exists())
}
|
|
|
|
|
|
|
|
|
|
#[cfg(test)]
|
|
|
|
|
mod tests {
|
|
|
|
|
use super::*;
|
|
|
|
|
use crate::agents::{AgentEvent, AgentStatus, PipelineStage};
|
|
|
|
|
use crate::config::ProjectConfig;
|
|
|
|
|
use portable_pty::{CommandBuilder, PtySize, native_pty_system};
|
|
|
|
|
|
|
|
|
|
fn make_config(toml_str: &str) -> ProjectConfig {
|
|
|
|
|
ProjectConfig::parse(toml_str).unwrap()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|
async fn wait_for_agent_returns_immediately_if_completed() {
|
|
|
|
|
let pool = AgentPool::new_test(3001);
|
|
|
|
|
pool.inject_test_agent("s1", "bot", AgentStatus::Completed);
|
|
|
|
|
|
|
|
|
|
let info = pool.wait_for_agent("s1", "bot", 1000).await.unwrap();
|
|
|
|
|
assert_eq!(info.status, AgentStatus::Completed);
|
|
|
|
|
assert_eq!(info.story_id, "s1");
|
|
|
|
|
assert_eq!(info.agent_name, "bot");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|
async fn wait_for_agent_returns_immediately_if_failed() {
|
|
|
|
|
let pool = AgentPool::new_test(3001);
|
|
|
|
|
pool.inject_test_agent("s2", "bot", AgentStatus::Failed);
|
|
|
|
|
|
|
|
|
|
let info = pool.wait_for_agent("s2", "bot", 1000).await.unwrap();
|
|
|
|
|
assert_eq!(info.status, AgentStatus::Failed);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|
async fn wait_for_agent_completes_on_done_event() {
|
|
|
|
|
let pool = AgentPool::new_test(3001);
|
|
|
|
|
let tx = pool.inject_test_agent("s3", "bot", AgentStatus::Running);
|
|
|
|
|
|
|
|
|
|
// Send Done event after a short delay
|
|
|
|
|
let tx_clone = tx.clone();
|
|
|
|
|
tokio::spawn(async move {
|
|
|
|
|
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
|
|
|
|
|
let _ = tx_clone.send(AgentEvent::Done {
|
|
|
|
|
story_id: "s3".to_string(),
|
|
|
|
|
agent_name: "bot".to_string(),
|
|
|
|
|
session_id: Some("sess-abc".to_string()),
|
|
|
|
|
});
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
let info = pool.wait_for_agent("s3", "bot", 2000).await.unwrap();
|
|
|
|
|
assert_eq!(info.story_id, "s3");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|
async fn wait_for_agent_times_out() {
|
|
|
|
|
let pool = AgentPool::new_test(3001);
|
|
|
|
|
pool.inject_test_agent("s4", "bot", AgentStatus::Running);
|
|
|
|
|
|
|
|
|
|
let result = pool.wait_for_agent("s4", "bot", 50).await;
|
|
|
|
|
assert!(result.is_err());
|
|
|
|
|
let msg = result.unwrap_err();
|
|
|
|
|
assert!(msg.contains("Timed out"), "unexpected message: {msg}");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|
async fn wait_for_agent_errors_for_nonexistent() {
|
|
|
|
|
let pool = AgentPool::new_test(3001);
|
|
|
|
|
let result = pool.wait_for_agent("no_story", "no_bot", 100).await;
|
|
|
|
|
assert!(result.is_err());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|
async fn wait_for_agent_completes_on_stopped_status_event() {
|
|
|
|
|
let pool = AgentPool::new_test(3001);
|
|
|
|
|
let tx = pool.inject_test_agent("s5", "bot", AgentStatus::Running);
|
|
|
|
|
|
|
|
|
|
let tx_clone = tx.clone();
|
|
|
|
|
tokio::spawn(async move {
|
|
|
|
|
tokio::time::sleep(std::time::Duration::from_millis(30)).await;
|
|
|
|
|
let _ = tx_clone.send(AgentEvent::Status {
|
|
|
|
|
story_id: "s5".to_string(),
|
|
|
|
|
agent_name: "bot".to_string(),
|
|
|
|
|
status: "stopped".to_string(),
|
|
|
|
|
});
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
let info = pool.wait_for_agent("s5", "bot", 2000).await.unwrap();
|
|
|
|
|
assert_eq!(info.story_id, "s5");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ── kill_all_children tests ────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
/// Report whether a process with the given PID is currently running.
///
/// Runs `ps -p <pid>` with its output discarded and returns `true` only when
/// the command exits successfully. If `ps` cannot be launched at all the
/// answer is `false`.
fn process_is_running(pid: u32) -> bool {
    use std::process::{Command, Stdio};

    let exit = Command::new("ps")
        .arg("-p")
        .arg(pid.to_string())
        .stdout(Stdio::null())
        .stderr(Stdio::null())
        .status();
    matches!(exit, Ok(code) if code.success())
}
|
|
|
|
|
|
|
|
|
|
/// Calling `kill_all_children` on a pool with no registered killers must not
/// panic and must leave the registry empty.
#[test]
fn kill_all_children_is_safe_on_empty_pool() {
    let empty_pool = AgentPool::new_test(3001);
    empty_pool.kill_all_children();
    let remaining = empty_pool.child_killer_count();
    assert_eq!(remaining, 0);
}
|
|
|
|
|
|
|
|
|
|
/// Spawns a real `sleep 100` on a PTY, registers its killer with the pool,
/// and checks that `kill_all_children` actually terminates the process.
#[test]
fn kill_all_children_kills_real_process() {
    let pool = AgentPool::new_test(3001);

    let size = PtySize {
        rows: 24,
        cols: 80,
        pixel_width: 0,
        pixel_height: 0,
    };
    let pty = native_pty_system();
    let pty_pair = pty.openpty(size).expect("failed to open pty");

    let mut sleep_cmd = CommandBuilder::new("sleep");
    sleep_cmd.arg("100");
    let mut sleeper = pty_pair
        .slave
        .spawn_command(sleep_cmd)
        .expect("failed to spawn sleep");
    let pid = sleeper.process_id().expect("no pid");

    pool.inject_child_killer("story:agent", sleeper.clone_killer());

    assert!(
        process_is_running(pid),
        "process {pid} should be running before kill_all_children"
    );

    pool.kill_all_children();
    // Wait on the child before probing its PID again.
    let _ = sleeper.wait();

    assert!(
        !process_is_running(pid),
        "process {pid} should have been killed by kill_all_children"
    );
}
|
|
|
|
|
|
|
|
|
|
/// After `kill_all_children` runs, the child-killer registry must be empty.
#[test]
fn kill_all_children_clears_registry() {
    let pool = AgentPool::new_test(3001);

    let size = PtySize {
        rows: 24,
        cols: 80,
        pixel_width: 0,
        pixel_height: 0,
    };
    let pty = native_pty_system();
    let pty_pair = pty.openpty(size).expect("failed to open pty");

    let mut sleep_cmd = CommandBuilder::new("sleep");
    sleep_cmd.arg("1");
    let mut sleeper = pty_pair
        .slave
        .spawn_command(sleep_cmd)
        .expect("failed to spawn sleep");

    pool.inject_child_killer("story:agent", sleeper.clone_killer());
    assert_eq!(pool.child_killer_count(), 1);

    pool.kill_all_children();
    let _ = sleeper.wait();

    let remaining = pool.child_killer_count();
    assert_eq!(
        remaining, 0,
        "child_killers should be cleared after kill_all_children"
    );
}
|
|
|
|
|
|
|
|
|
|
// ── available_agents_for_stage tests (story 190) ──────────────────────────
|
|
|
|
|
|
|
|
|
|
/// With `coder-1` running, only `coder-2` is offered for the coder stage,
/// while the single `qa` agent remains available for the QA stage.
#[test]
fn available_agents_for_stage_returns_idle_agents() {
    let toml = r#"
[[agent]]
name = "coder-1"
stage = "coder"

[[agent]]
name = "coder-2"
stage = "coder"

[[agent]]
name = "qa"
stage = "qa"
"#;
    let config = make_config(toml);

    let pool = AgentPool::new_test(3001);
    pool.inject_test_agent("story-1", "coder-1", AgentStatus::Running);

    let idle_coders = pool
        .available_agents_for_stage(&config, &PipelineStage::Coder)
        .unwrap();
    assert_eq!(idle_coders, vec!["coder-2"]);

    let idle_qa = pool
        .available_agents_for_stage(&config, &PipelineStage::Qa)
        .unwrap();
    assert_eq!(idle_qa, vec!["qa"]);
}
|
|
|
|
|
|
|
|
|
|
/// When the only configured coder is running, no coder is available.
#[test]
fn available_agents_for_stage_returns_empty_when_all_busy() {
    let toml = r#"
[[agent]]
name = "coder-1"
stage = "coder"
"#;
    let config = make_config(toml);

    let pool = AgentPool::new_test(3001);
    pool.inject_test_agent("story-1", "coder-1", AgentStatus::Running);

    let idle_coders = pool
        .available_agents_for_stage(&config, &PipelineStage::Coder)
        .unwrap();
    assert!(idle_coders.is_empty());
}
|
|
|
|
|
|
|
|
|
|
/// A `Completed` entry does not count as busy: the agent is still offered.
#[test]
fn available_agents_for_stage_ignores_completed_agents() {
    let toml = r#"
[[agent]]
name = "coder-1"
stage = "coder"
"#;
    let config = make_config(toml);

    let pool = AgentPool::new_test(3001);
    pool.inject_test_agent("story-1", "coder-1", AgentStatus::Completed);

    let idle_coders = pool
        .available_agents_for_stage(&config, &PipelineStage::Coder)
        .unwrap();
    assert_eq!(idle_coders, vec!["coder-1"]);
}
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|
async fn start_agent_auto_selects_second_coder_when_first_busy() {
|
|
|
|
|
let tmp = tempfile::tempdir().unwrap();
|
2026-03-20 11:34:53 +00:00
|
|
|
let sk = tmp.path().join(".storkit");
|
2026-03-19 22:41:17 +00:00
|
|
|
std::fs::create_dir_all(&sk).unwrap();
|
|
|
|
|
std::fs::write(
|
|
|
|
|
sk.join("project.toml"),
|
|
|
|
|
r#"
|
|
|
|
|
[[agent]]
|
|
|
|
|
name = "supervisor"
|
|
|
|
|
stage = "other"
|
|
|
|
|
|
|
|
|
|
[[agent]]
|
|
|
|
|
name = "coder-1"
|
|
|
|
|
stage = "coder"
|
|
|
|
|
|
|
|
|
|
[[agent]]
|
|
|
|
|
name = "coder-2"
|
|
|
|
|
stage = "coder"
|
|
|
|
|
"#,
|
|
|
|
|
)
|
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
|
|
let pool = AgentPool::new_test(3001);
|
|
|
|
|
pool.inject_test_agent("other-story", "coder-1", AgentStatus::Running);
|
|
|
|
|
|
|
|
|
|
let result = pool
|
|
|
|
|
.start_agent(tmp.path(), "42_my_story", None, None)
|
|
|
|
|
.await;
|
|
|
|
|
match result {
|
|
|
|
|
Ok(info) => {
|
|
|
|
|
assert_eq!(info.agent_name, "coder-2");
|
|
|
|
|
}
|
|
|
|
|
Err(err) => {
|
|
|
|
|
assert!(
|
|
|
|
|
!err.contains("All coder agents are busy"),
|
|
|
|
|
"should have selected coder-2 but got: {err}"
|
|
|
|
|
);
|
|
|
|
|
assert!(
|
|
|
|
|
!err.contains("No coder agent configured"),
|
|
|
|
|
"should not fail on agent selection, got: {err}"
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|
async fn start_agent_returns_busy_when_all_coders_occupied() {
|
|
|
|
|
let tmp = tempfile::tempdir().unwrap();
|
2026-03-20 11:34:53 +00:00
|
|
|
let sk = tmp.path().join(".storkit");
|
2026-03-19 22:41:17 +00:00
|
|
|
std::fs::create_dir_all(&sk).unwrap();
|
|
|
|
|
std::fs::write(
|
|
|
|
|
sk.join("project.toml"),
|
|
|
|
|
r#"
|
|
|
|
|
[[agent]]
|
|
|
|
|
name = "coder-1"
|
|
|
|
|
stage = "coder"
|
|
|
|
|
|
|
|
|
|
[[agent]]
|
|
|
|
|
name = "coder-2"
|
|
|
|
|
stage = "coder"
|
|
|
|
|
"#,
|
|
|
|
|
)
|
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
|
|
let pool = AgentPool::new_test(3001);
|
|
|
|
|
pool.inject_test_agent("story-1", "coder-1", AgentStatus::Running);
|
|
|
|
|
pool.inject_test_agent("story-2", "coder-2", AgentStatus::Pending);
|
|
|
|
|
|
|
|
|
|
let result = pool.start_agent(tmp.path(), "story-3", None, None).await;
|
|
|
|
|
assert!(result.is_err());
|
|
|
|
|
let err = result.unwrap_err();
|
|
|
|
|
assert!(
|
|
|
|
|
err.contains("All coder agents are busy"),
|
|
|
|
|
"expected busy error, got: {err}"
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|
async fn start_agent_moves_story_to_current_when_coders_busy() {
|
|
|
|
|
let tmp = tempfile::tempdir().unwrap();
|
2026-03-20 11:34:53 +00:00
|
|
|
let sk = tmp.path().join(".storkit");
|
2026-03-19 22:41:17 +00:00
|
|
|
let backlog = sk.join("work/1_backlog");
|
|
|
|
|
std::fs::create_dir_all(&backlog).unwrap();
|
|
|
|
|
std::fs::write(
|
|
|
|
|
sk.join("project.toml"),
|
|
|
|
|
r#"
|
|
|
|
|
[[agent]]
|
|
|
|
|
name = "coder-1"
|
|
|
|
|
stage = "coder"
|
|
|
|
|
"#,
|
|
|
|
|
)
|
|
|
|
|
.unwrap();
|
|
|
|
|
std::fs::write(backlog.join("story-3.md"), "---\nname: Story 3\n---\n").unwrap();
|
|
|
|
|
|
|
|
|
|
let pool = AgentPool::new_test(3001);
|
|
|
|
|
pool.inject_test_agent("story-1", "coder-1", AgentStatus::Running);
|
|
|
|
|
|
|
|
|
|
let result = pool.start_agent(tmp.path(), "story-3", None, None).await;
|
|
|
|
|
|
|
|
|
|
assert!(result.is_err());
|
|
|
|
|
let err = result.unwrap_err();
|
|
|
|
|
assert!(
|
|
|
|
|
err.contains("All coder agents are busy"),
|
|
|
|
|
"expected busy error, got: {err}"
|
|
|
|
|
);
|
|
|
|
|
assert!(
|
|
|
|
|
err.contains("queued in work/2_current/"),
|
|
|
|
|
"expected story-to-current message, got: {err}"
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
let current_path = sk.join("work/2_current/story-3.md");
|
|
|
|
|
assert!(
|
|
|
|
|
current_path.exists(),
|
|
|
|
|
"story should be in 2_current/ after busy error, but was not"
|
|
|
|
|
);
|
|
|
|
|
let backlog_path = backlog.join("story-3.md");
|
|
|
|
|
assert!(
|
|
|
|
|
!backlog_path.exists(),
|
|
|
|
|
"story should no longer be in 1_backlog/"
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|
async fn start_agent_story_already_in_current_is_noop() {
|
|
|
|
|
let tmp = tempfile::tempdir().unwrap();
|
2026-03-20 11:34:53 +00:00
|
|
|
let sk = tmp.path().join(".storkit");
|
2026-03-19 22:41:17 +00:00
|
|
|
let current = sk.join("work/2_current");
|
|
|
|
|
std::fs::create_dir_all(¤t).unwrap();
|
|
|
|
|
std::fs::write(
|
|
|
|
|
sk.join("project.toml"),
|
|
|
|
|
"[[agent]]\nname = \"coder-1\"\nstage = \"coder\"\n",
|
|
|
|
|
)
|
|
|
|
|
.unwrap();
|
|
|
|
|
std::fs::write(current.join("story-5.md"), "---\nname: Story 5\n---\n").unwrap();
|
|
|
|
|
|
|
|
|
|
let pool = AgentPool::new_test(3001);
|
|
|
|
|
|
|
|
|
|
let result = pool.start_agent(tmp.path(), "story-5", None, None).await;
|
|
|
|
|
match result {
|
|
|
|
|
Ok(_) => {}
|
|
|
|
|
Err(e) => {
|
|
|
|
|
assert!(
|
|
|
|
|
!e.contains("Failed to move"),
|
|
|
|
|
"should not fail on idempotent move, got: {e}"
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|
async fn start_agent_explicit_name_unchanged_when_busy() {
|
|
|
|
|
let tmp = tempfile::tempdir().unwrap();
|
2026-03-20 11:34:53 +00:00
|
|
|
let sk = tmp.path().join(".storkit");
|
2026-03-19 22:41:17 +00:00
|
|
|
std::fs::create_dir_all(&sk).unwrap();
|
|
|
|
|
std::fs::write(
|
|
|
|
|
sk.join("project.toml"),
|
|
|
|
|
r#"
|
|
|
|
|
[[agent]]
|
|
|
|
|
name = "coder-1"
|
|
|
|
|
stage = "coder"
|
|
|
|
|
|
|
|
|
|
[[agent]]
|
|
|
|
|
name = "coder-2"
|
|
|
|
|
stage = "coder"
|
|
|
|
|
"#,
|
|
|
|
|
)
|
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
|
|
let pool = AgentPool::new_test(3001);
|
|
|
|
|
pool.inject_test_agent("story-1", "coder-1", AgentStatus::Running);
|
|
|
|
|
|
|
|
|
|
let result = pool
|
|
|
|
|
.start_agent(tmp.path(), "story-2", Some("coder-1"), None)
|
|
|
|
|
.await;
|
|
|
|
|
assert!(result.is_err());
|
|
|
|
|
let err = result.unwrap_err();
|
|
|
|
|
assert!(
|
|
|
|
|
err.contains("coder-1") && err.contains("already running"),
|
|
|
|
|
"expected explicit busy error, got: {err}"
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ── start_agent single-instance concurrency tests ─────────────────────────
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|
async fn start_agent_rejects_when_same_agent_already_running_on_another_story() {
|
|
|
|
|
use std::fs;
|
|
|
|
|
|
|
|
|
|
let tmp = tempfile::tempdir().unwrap();
|
|
|
|
|
let root = tmp.path();
|
|
|
|
|
|
2026-03-20 11:34:53 +00:00
|
|
|
let sk_dir = root.join(".storkit");
|
2026-03-19 22:41:17 +00:00
|
|
|
fs::create_dir_all(&sk_dir).unwrap();
|
|
|
|
|
fs::write(sk_dir.join("project.toml"), "[[agent]]\nname = \"qa\"\n").unwrap();
|
|
|
|
|
|
|
|
|
|
let pool = AgentPool::new_test(3001);
|
|
|
|
|
pool.inject_test_agent("story-a", "qa", AgentStatus::Running);
|
|
|
|
|
|
|
|
|
|
let result = pool.start_agent(root, "story-b", Some("qa"), None).await;
|
|
|
|
|
|
|
|
|
|
assert!(
|
|
|
|
|
result.is_err(),
|
|
|
|
|
"start_agent should fail when qa is already running on another story"
|
|
|
|
|
);
|
|
|
|
|
let err = result.unwrap_err();
|
|
|
|
|
assert!(
|
|
|
|
|
err.contains("already running") || err.contains("becomes available"),
|
|
|
|
|
"error message should explain why: got '{err}'"
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|
async fn start_agent_allows_new_story_when_previous_run_is_completed() {
|
|
|
|
|
use std::fs;
|
|
|
|
|
|
|
|
|
|
let tmp = tempfile::tempdir().unwrap();
|
|
|
|
|
let root = tmp.path();
|
|
|
|
|
|
2026-03-20 11:34:53 +00:00
|
|
|
let sk_dir = root.join(".storkit");
|
2026-03-19 22:41:17 +00:00
|
|
|
fs::create_dir_all(&sk_dir).unwrap();
|
|
|
|
|
fs::write(sk_dir.join("project.toml"), "[[agent]]\nname = \"qa\"\n").unwrap();
|
|
|
|
|
|
|
|
|
|
let pool = AgentPool::new_test(3001);
|
|
|
|
|
pool.inject_test_agent("story-a", "qa", AgentStatus::Completed);
|
|
|
|
|
|
|
|
|
|
let result = pool.start_agent(root, "story-b", Some("qa"), None).await;
|
|
|
|
|
|
|
|
|
|
if let Err(ref e) = result {
|
|
|
|
|
assert!(
|
|
|
|
|
!e.contains("already running") && !e.contains("becomes available"),
|
|
|
|
|
"completed agent must not trigger the concurrency guard: got '{e}'"
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ── bug 118: pending entry cleanup on start_agent failure ────────────────
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|
async fn start_agent_cleans_up_pending_entry_on_failure() {
|
|
|
|
|
use std::fs;
|
|
|
|
|
|
|
|
|
|
let tmp = tempfile::tempdir().unwrap();
|
|
|
|
|
let root = tmp.path();
|
|
|
|
|
|
2026-03-20 11:34:53 +00:00
|
|
|
let sk_dir = root.join(".storkit");
|
2026-03-19 22:41:17 +00:00
|
|
|
fs::create_dir_all(&sk_dir).unwrap();
|
|
|
|
|
fs::write(
|
|
|
|
|
sk_dir.join("project.toml"),
|
|
|
|
|
"[[agent]]\nname = \"coder-1\"\nstage = \"coder\"\n",
|
|
|
|
|
)
|
|
|
|
|
.unwrap();
|
|
|
|
|
|
2026-03-20 11:34:53 +00:00
|
|
|
let upcoming = root.join(".storkit/work/1_backlog");
|
2026-03-19 22:41:17 +00:00
|
|
|
fs::create_dir_all(&upcoming).unwrap();
|
|
|
|
|
fs::write(upcoming.join("50_story_test.md"), "---\nname: Test\n---\n").unwrap();
|
|
|
|
|
|
|
|
|
|
let pool = AgentPool::new_test(3099);
|
|
|
|
|
|
|
|
|
|
let result = pool
|
|
|
|
|
.start_agent(root, "50_story_test", Some("coder-1"), None)
|
|
|
|
|
.await;
|
|
|
|
|
|
|
|
|
|
assert!(
|
|
|
|
|
result.is_ok(),
|
|
|
|
|
"start_agent should return Ok(Pending) immediately: {:?}",
|
|
|
|
|
result.err()
|
|
|
|
|
);
|
|
|
|
|
assert_eq!(
|
|
|
|
|
result.unwrap().status,
|
|
|
|
|
AgentStatus::Pending,
|
|
|
|
|
"initial status must be Pending"
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
let final_info = pool
|
|
|
|
|
.wait_for_agent("50_story_test", "coder-1", 5000)
|
|
|
|
|
.await
|
|
|
|
|
.expect("wait_for_agent should not time out");
|
|
|
|
|
assert_eq!(
|
|
|
|
|
final_info.status,
|
|
|
|
|
AgentStatus::Failed,
|
|
|
|
|
"agent must transition to Failed after worktree creation error"
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
let agents = pool.agents.lock().unwrap();
|
|
|
|
|
let failed_entry = agents
|
|
|
|
|
.values()
|
|
|
|
|
.find(|a| a.agent_name == "coder-1" && a.status == AgentStatus::Failed);
|
|
|
|
|
assert!(
|
|
|
|
|
failed_entry.is_some(),
|
|
|
|
|
"agent pool must retain a Failed entry so the UI can show the error state"
|
|
|
|
|
);
|
|
|
|
|
drop(agents);
|
|
|
|
|
|
|
|
|
|
let events = pool
|
|
|
|
|
.drain_events("50_story_test", "coder-1")
|
|
|
|
|
.expect("drain_events should succeed");
|
|
|
|
|
let has_error_event = events.iter().any(|e| matches!(e, AgentEvent::Error { .. }));
|
|
|
|
|
assert!(
|
|
|
|
|
has_error_event,
|
|
|
|
|
"event_log must contain AgentEvent::Error after worktree creation fails"
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|
async fn start_agent_guard_does_not_remove_running_entry() {
|
|
|
|
|
use std::fs;
|
|
|
|
|
|
|
|
|
|
let tmp = tempfile::tempdir().unwrap();
|
|
|
|
|
let root = tmp.path();
|
|
|
|
|
|
2026-03-20 11:34:53 +00:00
|
|
|
let sk_dir = root.join(".storkit");
|
2026-03-19 22:41:17 +00:00
|
|
|
fs::create_dir_all(&sk_dir).unwrap();
|
|
|
|
|
fs::write(sk_dir.join("project.toml"), "[[agent]]\nname = \"qa\"\n").unwrap();
|
|
|
|
|
|
|
|
|
|
let pool = AgentPool::new_test(3099);
|
|
|
|
|
pool.inject_test_agent("story-x", "qa", AgentStatus::Running);
|
|
|
|
|
|
|
|
|
|
let result = pool.start_agent(root, "story-y", Some("qa"), None).await;
|
|
|
|
|
|
|
|
|
|
assert!(result.is_err());
|
|
|
|
|
let err = result.unwrap_err();
|
|
|
|
|
assert!(
|
|
|
|
|
err.contains("already running") || err.contains("becomes available"),
|
|
|
|
|
"running entry must survive: got '{err}'"
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ── TOCTOU race-condition regression tests (story 132) ───────────────────
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|
async fn toctou_pending_entry_blocks_same_agent_on_different_story() {
|
|
|
|
|
use std::fs;
|
|
|
|
|
|
|
|
|
|
let tmp = tempfile::tempdir().unwrap();
|
|
|
|
|
let root = tmp.path();
|
|
|
|
|
|
2026-03-20 11:34:53 +00:00
|
|
|
let sk_dir = root.join(".storkit");
|
2026-03-19 22:41:17 +00:00
|
|
|
fs::create_dir_all(&sk_dir).unwrap();
|
|
|
|
|
fs::write(
|
|
|
|
|
sk_dir.join("project.toml"),
|
|
|
|
|
"[[agent]]\nname = \"coder-1\"\n",
|
|
|
|
|
)
|
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
|
|
let pool = AgentPool::new_test(3099);
|
|
|
|
|
pool.inject_test_agent("86_story_foo", "coder-1", AgentStatus::Pending);
|
|
|
|
|
|
|
|
|
|
let result = pool
|
|
|
|
|
.start_agent(root, "130_story_bar", Some("coder-1"), None)
|
|
|
|
|
.await;
|
|
|
|
|
|
|
|
|
|
assert!(result.is_err(), "second start_agent must be rejected");
|
|
|
|
|
let err = result.unwrap_err();
|
|
|
|
|
assert!(
|
|
|
|
|
err.contains("already running") || err.contains("becomes available"),
|
|
|
|
|
"expected concurrency-rejection message, got: '{err}'"
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
|
|
|
async fn toctou_concurrent_start_agent_same_agent_exactly_one_concurrency_rejection() {
|
|
|
|
|
use std::fs;
|
|
|
|
|
use std::sync::Arc;
|
|
|
|
|
|
|
|
|
|
let tmp = tempfile::tempdir().unwrap();
|
|
|
|
|
let root = tmp.path().to_path_buf();
|
|
|
|
|
|
2026-03-20 11:34:53 +00:00
|
|
|
let sk_dir = root.join(".storkit");
|
2026-03-19 22:41:17 +00:00
|
|
|
fs::create_dir_all(sk_dir.join("work/1_backlog")).unwrap();
|
|
|
|
|
fs::write(
|
2026-03-20 11:34:53 +00:00
|
|
|
root.join(".storkit/project.toml"),
|
2026-03-19 22:41:17 +00:00
|
|
|
"[[agent]]\nname = \"coder-1\"\n",
|
|
|
|
|
)
|
|
|
|
|
.unwrap();
|
|
|
|
|
fs::write(
|
2026-03-20 11:34:53 +00:00
|
|
|
root.join(".storkit/work/1_backlog/86_story_foo.md"),
|
2026-03-19 22:41:17 +00:00
|
|
|
"---\nname: Foo\n---\n",
|
|
|
|
|
)
|
|
|
|
|
.unwrap();
|
|
|
|
|
fs::write(
|
2026-03-20 11:34:53 +00:00
|
|
|
root.join(".storkit/work/1_backlog/130_story_bar.md"),
|
2026-03-19 22:41:17 +00:00
|
|
|
"---\nname: Bar\n---\n",
|
|
|
|
|
)
|
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
|
|
let pool = Arc::new(AgentPool::new_test(3099));
|
|
|
|
|
|
|
|
|
|
let pool1 = pool.clone();
|
|
|
|
|
let root1 = root.clone();
|
|
|
|
|
let t1 = tokio::spawn(async move {
|
|
|
|
|
pool1
|
|
|
|
|
.start_agent(&root1, "86_story_foo", Some("coder-1"), None)
|
|
|
|
|
.await
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
let pool2 = pool.clone();
|
|
|
|
|
let root2 = root.clone();
|
|
|
|
|
let t2 = tokio::spawn(async move {
|
|
|
|
|
pool2
|
|
|
|
|
.start_agent(&root2, "130_story_bar", Some("coder-1"), None)
|
|
|
|
|
.await
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
let (r1, r2) = tokio::join!(t1, t2);
|
|
|
|
|
let r1 = r1.unwrap();
|
|
|
|
|
let r2 = r2.unwrap();
|
|
|
|
|
|
|
|
|
|
let concurrency_rejections = [&r1, &r2]
|
|
|
|
|
.iter()
|
|
|
|
|
.filter(|r| {
|
|
|
|
|
r.as_ref().is_err_and(|e| {
|
|
|
|
|
e.contains("already running") || e.contains("becomes available")
|
|
|
|
|
})
|
|
|
|
|
})
|
|
|
|
|
.count();
|
|
|
|
|
|
|
|
|
|
assert_eq!(
|
|
|
|
|
concurrency_rejections, 1,
|
|
|
|
|
"exactly one call must be rejected by the concurrency check; \
|
|
|
|
|
got r1={r1:?} r2={r2:?}"
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ── story-230: prevent duplicate stage agents on same story ───────────────
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|
async fn start_agent_rejects_second_coder_stage_on_same_story() {
|
|
|
|
|
use std::fs;
|
|
|
|
|
|
|
|
|
|
let tmp = tempfile::tempdir().unwrap();
|
|
|
|
|
let root = tmp.path();
|
|
|
|
|
|
2026-03-20 11:34:53 +00:00
|
|
|
let sk_dir = root.join(".storkit");
|
2026-03-19 22:41:17 +00:00
|
|
|
fs::create_dir_all(&sk_dir).unwrap();
|
|
|
|
|
fs::write(
|
|
|
|
|
sk_dir.join("project.toml"),
|
|
|
|
|
"[[agent]]\nname = \"coder-1\"\n\n[[agent]]\nname = \"coder-2\"\n",
|
|
|
|
|
)
|
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
|
|
let pool = AgentPool::new_test(3099);
|
|
|
|
|
pool.inject_test_agent("42_story_foo", "coder-1", AgentStatus::Running);
|
|
|
|
|
|
|
|
|
|
let result = pool
|
|
|
|
|
.start_agent(root, "42_story_foo", Some("coder-2"), None)
|
|
|
|
|
.await;
|
|
|
|
|
|
|
|
|
|
assert!(
|
|
|
|
|
result.is_err(),
|
|
|
|
|
"second coder on same story must be rejected"
|
|
|
|
|
);
|
|
|
|
|
let err = result.unwrap_err();
|
|
|
|
|
assert!(
|
|
|
|
|
err.contains("same pipeline stage"),
|
|
|
|
|
"error must mention same pipeline stage, got: '{err}'"
|
|
|
|
|
);
|
|
|
|
|
assert!(
|
|
|
|
|
err.contains("coder-1") && err.contains("coder-2"),
|
|
|
|
|
"error must name both agents, got: '{err}'"
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|
async fn start_agent_rejects_second_qa_stage_on_same_story() {
|
|
|
|
|
use std::fs;
|
|
|
|
|
|
|
|
|
|
let tmp = tempfile::tempdir().unwrap();
|
|
|
|
|
let root = tmp.path();
|
|
|
|
|
|
2026-03-20 11:34:53 +00:00
|
|
|
let sk_dir = root.join(".storkit");
|
2026-03-19 22:41:17 +00:00
|
|
|
fs::create_dir_all(&sk_dir).unwrap();
|
|
|
|
|
fs::write(
|
|
|
|
|
sk_dir.join("project.toml"),
|
|
|
|
|
"[[agent]]\nname = \"qa-1\"\nstage = \"qa\"\n\n\
|
|
|
|
|
[[agent]]\nname = \"qa-2\"\nstage = \"qa\"\n",
|
|
|
|
|
)
|
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
|
|
let pool = AgentPool::new_test(3099);
|
|
|
|
|
pool.inject_test_agent("55_story_bar", "qa-1", AgentStatus::Running);
|
|
|
|
|
|
|
|
|
|
let result = pool
|
|
|
|
|
.start_agent(root, "55_story_bar", Some("qa-2"), None)
|
|
|
|
|
.await;
|
|
|
|
|
|
|
|
|
|
assert!(result.is_err(), "second qa on same story must be rejected");
|
|
|
|
|
let err = result.unwrap_err();
|
|
|
|
|
assert!(
|
|
|
|
|
err.contains("same pipeline stage"),
|
|
|
|
|
"error must mention same pipeline stage, got: '{err}'"
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
|
|
|
async fn start_agent_concurrent_two_coders_same_story_exactly_one_stage_rejection() {
|
|
|
|
|
use std::fs;
|
|
|
|
|
use std::sync::Arc;
|
|
|
|
|
|
|
|
|
|
let tmp = tempfile::tempdir().unwrap();
|
|
|
|
|
let root = tmp.path().to_path_buf();
|
|
|
|
|
|
2026-03-20 11:34:53 +00:00
|
|
|
let sk_dir = root.join(".storkit");
|
2026-03-19 22:41:17 +00:00
|
|
|
fs::create_dir_all(sk_dir.join("work/2_current")).unwrap();
|
|
|
|
|
fs::write(
|
2026-03-20 11:34:53 +00:00
|
|
|
root.join(".storkit/project.toml"),
|
2026-03-19 22:41:17 +00:00
|
|
|
"[[agent]]\nname = \"coder-1\"\n\n[[agent]]\nname = \"coder-2\"\n",
|
|
|
|
|
)
|
|
|
|
|
.unwrap();
|
|
|
|
|
fs::write(
|
2026-03-20 11:34:53 +00:00
|
|
|
root.join(".storkit/work/2_current/42_story_foo.md"),
|
2026-03-19 22:41:17 +00:00
|
|
|
"---\nname: Foo\n---\n",
|
|
|
|
|
)
|
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
|
|
let pool = Arc::new(AgentPool::new_test(3099));
|
|
|
|
|
|
|
|
|
|
let pool1 = pool.clone();
|
|
|
|
|
let root1 = root.clone();
|
|
|
|
|
let t1 = tokio::spawn(async move {
|
|
|
|
|
pool1
|
|
|
|
|
.start_agent(&root1, "42_story_foo", Some("coder-1"), None)
|
|
|
|
|
.await
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
let pool2 = pool.clone();
|
|
|
|
|
let root2 = root.clone();
|
|
|
|
|
let t2 = tokio::spawn(async move {
|
|
|
|
|
pool2
|
|
|
|
|
.start_agent(&root2, "42_story_foo", Some("coder-2"), None)
|
|
|
|
|
.await
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
let (r1, r2) = tokio::join!(t1, t2);
|
|
|
|
|
let r1 = r1.unwrap();
|
|
|
|
|
let r2 = r2.unwrap();
|
|
|
|
|
|
|
|
|
|
let stage_rejections = [&r1, &r2]
|
|
|
|
|
.iter()
|
|
|
|
|
.filter(|r| r.as_ref().is_err_and(|e| e.contains("same pipeline stage")))
|
|
|
|
|
.count();
|
|
|
|
|
|
|
|
|
|
assert_eq!(
|
|
|
|
|
stage_rejections, 1,
|
|
|
|
|
"exactly one call must be rejected by the stage-conflict check; \
|
|
|
|
|
got r1={r1:?} r2={r2:?}"
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|
async fn start_agent_two_coders_different_stories_not_blocked_by_stage_check() {
|
|
|
|
|
use std::fs;
|
|
|
|
|
|
|
|
|
|
let tmp = tempfile::tempdir().unwrap();
|
|
|
|
|
let root = tmp.path();
|
|
|
|
|
|
2026-03-20 11:34:53 +00:00
|
|
|
let sk_dir = root.join(".storkit");
|
2026-03-19 22:41:17 +00:00
|
|
|
fs::create_dir_all(sk_dir.join("work/1_backlog")).unwrap();
|
|
|
|
|
fs::write(
|
2026-03-20 11:34:53 +00:00
|
|
|
root.join(".storkit/project.toml"),
|
2026-03-19 22:41:17 +00:00
|
|
|
"[[agent]]\nname = \"coder-1\"\n\n[[agent]]\nname = \"coder-2\"\n",
|
|
|
|
|
)
|
|
|
|
|
.unwrap();
|
|
|
|
|
fs::write(
|
2026-03-20 11:34:53 +00:00
|
|
|
root.join(".storkit/work/1_backlog/99_story_baz.md"),
|
2026-03-19 22:41:17 +00:00
|
|
|
"---\nname: Baz\n---\n",
|
|
|
|
|
)
|
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
|
|
let pool = AgentPool::new_test(3099);
|
|
|
|
|
pool.inject_test_agent("42_story_foo", "coder-1", AgentStatus::Running);
|
|
|
|
|
|
|
|
|
|
let result = pool
|
|
|
|
|
.start_agent(root, "99_story_baz", Some("coder-2"), None)
|
|
|
|
|
.await;
|
|
|
|
|
|
|
|
|
|
if let Err(ref e) = result {
|
|
|
|
|
assert!(
|
|
|
|
|
!e.contains("same pipeline stage"),
|
|
|
|
|
"stage-conflict guard must not fire for agents on different stories; \
|
|
|
|
|
got: '{e}'"
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ── bug 312: stage-pipeline mismatch guard in start_agent ──────────────
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|
async fn start_agent_rejects_mergemaster_on_coding_stage_story() {
|
|
|
|
|
use std::fs;
|
|
|
|
|
|
|
|
|
|
let tmp = tempfile::tempdir().unwrap();
|
|
|
|
|
let root = tmp.path();
|
|
|
|
|
|
2026-03-20 11:34:53 +00:00
|
|
|
let sk_dir = root.join(".storkit");
|
2026-03-19 22:41:17 +00:00
|
|
|
fs::create_dir_all(sk_dir.join("work/2_current")).unwrap();
|
|
|
|
|
fs::write(
|
|
|
|
|
sk_dir.join("project.toml"),
|
|
|
|
|
"[[agent]]\nname = \"coder-1\"\nstage = \"coder\"\n\n\
|
|
|
|
|
[[agent]]\nname = \"mergemaster\"\nstage = \"mergemaster\"\n",
|
|
|
|
|
)
|
|
|
|
|
.unwrap();
|
|
|
|
|
fs::write(
|
|
|
|
|
sk_dir.join("work/2_current/310_story_foo.md"),
|
|
|
|
|
"---\nname: Foo\n---\n",
|
|
|
|
|
)
|
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
|
|
let pool = AgentPool::new_test(3099);
|
|
|
|
|
let result = pool
|
|
|
|
|
.start_agent(root, "310_story_foo", Some("mergemaster"), None)
|
|
|
|
|
.await;
|
|
|
|
|
|
|
|
|
|
assert!(
|
|
|
|
|
result.is_err(),
|
|
|
|
|
"mergemaster must not be assigned to a story in 2_current/"
|
|
|
|
|
);
|
|
|
|
|
let err = result.unwrap_err();
|
|
|
|
|
assert!(
|
|
|
|
|
err.contains("stage") && err.contains("2_current"),
|
|
|
|
|
"error must mention stage mismatch, got: '{err}'"
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|
async fn start_agent_rejects_coder_on_qa_stage_story() {
|
|
|
|
|
use std::fs;
|
|
|
|
|
|
|
|
|
|
let tmp = tempfile::tempdir().unwrap();
|
|
|
|
|
let root = tmp.path();
|
|
|
|
|
|
2026-03-20 11:34:53 +00:00
|
|
|
let sk_dir = root.join(".storkit");
|
2026-03-19 22:41:17 +00:00
|
|
|
fs::create_dir_all(sk_dir.join("work/3_qa")).unwrap();
|
|
|
|
|
fs::write(
|
|
|
|
|
sk_dir.join("project.toml"),
|
|
|
|
|
"[[agent]]\nname = \"coder-1\"\nstage = \"coder\"\n\n\
|
|
|
|
|
[[agent]]\nname = \"qa\"\nstage = \"qa\"\n",
|
|
|
|
|
)
|
|
|
|
|
.unwrap();
|
|
|
|
|
fs::write(
|
|
|
|
|
sk_dir.join("work/3_qa/42_story_bar.md"),
|
|
|
|
|
"---\nname: Bar\n---\n",
|
|
|
|
|
)
|
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
|
|
let pool = AgentPool::new_test(3099);
|
|
|
|
|
let result = pool
|
|
|
|
|
.start_agent(root, "42_story_bar", Some("coder-1"), None)
|
|
|
|
|
.await;
|
|
|
|
|
|
|
|
|
|
assert!(
|
|
|
|
|
result.is_err(),
|
|
|
|
|
"coder must not be assigned to a story in 3_qa/"
|
|
|
|
|
);
|
|
|
|
|
let err = result.unwrap_err();
|
|
|
|
|
assert!(
|
|
|
|
|
err.contains("stage") && err.contains("3_qa"),
|
|
|
|
|
"error must mention stage mismatch, got: '{err}'"
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|
async fn start_agent_rejects_qa_on_merge_stage_story() {
|
|
|
|
|
use std::fs;
|
|
|
|
|
|
|
|
|
|
let tmp = tempfile::tempdir().unwrap();
|
|
|
|
|
let root = tmp.path();
|
|
|
|
|
|
2026-03-20 11:34:53 +00:00
|
|
|
let sk_dir = root.join(".storkit");
|
2026-03-19 22:41:17 +00:00
|
|
|
fs::create_dir_all(sk_dir.join("work/4_merge")).unwrap();
|
|
|
|
|
fs::write(
|
|
|
|
|
sk_dir.join("project.toml"),
|
|
|
|
|
"[[agent]]\nname = \"qa\"\nstage = \"qa\"\n\n\
|
|
|
|
|
[[agent]]\nname = \"mergemaster\"\nstage = \"mergemaster\"\n",
|
|
|
|
|
)
|
|
|
|
|
.unwrap();
|
|
|
|
|
fs::write(
|
|
|
|
|
sk_dir.join("work/4_merge/55_story_baz.md"),
|
|
|
|
|
"---\nname: Baz\n---\n",
|
|
|
|
|
)
|
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
|
|
let pool = AgentPool::new_test(3099);
|
|
|
|
|
let result = pool
|
|
|
|
|
.start_agent(root, "55_story_baz", Some("qa"), None)
|
|
|
|
|
.await;
|
|
|
|
|
|
|
|
|
|
assert!(
|
|
|
|
|
result.is_err(),
|
|
|
|
|
"qa must not be assigned to a story in 4_merge/"
|
|
|
|
|
);
|
|
|
|
|
let err = result.unwrap_err();
|
|
|
|
|
assert!(
|
|
|
|
|
err.contains("stage") && err.contains("4_merge"),
|
|
|
|
|
"error must mention stage mismatch, got: '{err}'"
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|
async fn start_agent_allows_supervisor_on_any_stage() {
|
|
|
|
|
use std::fs;
|
|
|
|
|
|
|
|
|
|
let tmp = tempfile::tempdir().unwrap();
|
|
|
|
|
let root = tmp.path();
|
|
|
|
|
|
2026-03-20 11:34:53 +00:00
|
|
|
let sk_dir = root.join(".storkit");
|
2026-03-19 22:41:17 +00:00
|
|
|
fs::create_dir_all(sk_dir.join("work/2_current")).unwrap();
|
|
|
|
|
fs::write(
|
|
|
|
|
sk_dir.join("project.toml"),
|
|
|
|
|
"[[agent]]\nname = \"supervisor\"\nstage = \"other\"\n",
|
|
|
|
|
)
|
|
|
|
|
.unwrap();
|
|
|
|
|
fs::write(
|
|
|
|
|
sk_dir.join("work/2_current/77_story_sup.md"),
|
|
|
|
|
"---\nname: Sup\n---\n",
|
|
|
|
|
)
|
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
|
|
let pool = AgentPool::new_test(3099);
|
|
|
|
|
let result = pool
|
|
|
|
|
.start_agent(root, "77_story_sup", Some("supervisor"), None)
|
|
|
|
|
.await;
|
|
|
|
|
|
|
|
|
|
match result {
|
|
|
|
|
Ok(_) => {}
|
|
|
|
|
Err(e) => {
|
|
|
|
|
assert!(
|
|
|
|
|
!e.contains("stage:") || !e.contains("cannot be assigned"),
|
|
|
|
|
"supervisor should not be rejected for stage mismatch, got: '{e}'"
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|
async fn start_agent_allows_correct_stage_agent() {
|
|
|
|
|
use std::fs;
|
|
|
|
|
|
|
|
|
|
let tmp = tempfile::tempdir().unwrap();
|
|
|
|
|
let root = tmp.path();
|
|
|
|
|
|
2026-03-20 11:34:53 +00:00
|
|
|
let sk_dir = root.join(".storkit");
|
2026-03-19 22:41:17 +00:00
|
|
|
fs::create_dir_all(sk_dir.join("work/4_merge")).unwrap();
|
|
|
|
|
fs::write(
|
|
|
|
|
sk_dir.join("project.toml"),
|
|
|
|
|
"[[agent]]\nname = \"mergemaster\"\nstage = \"mergemaster\"\n",
|
|
|
|
|
)
|
|
|
|
|
.unwrap();
|
|
|
|
|
fs::write(
|
|
|
|
|
sk_dir.join("work/4_merge/88_story_ok.md"),
|
|
|
|
|
"---\nname: OK\n---\n",
|
|
|
|
|
)
|
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
|
|
let pool = AgentPool::new_test(3099);
|
|
|
|
|
let result = pool
|
|
|
|
|
.start_agent(root, "88_story_ok", Some("mergemaster"), None)
|
|
|
|
|
.await;
|
|
|
|
|
|
|
|
|
|
match result {
|
|
|
|
|
Ok(_) => {}
|
|
|
|
|
Err(e) => {
|
|
|
|
|
assert!(
|
|
|
|
|
!e.contains("cannot be assigned"),
|
|
|
|
|
"mergemaster on 4_merge/ story should not fail stage check, got: '{e}'"
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ── find_active_story_stage tests ─────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
/// A story file placed in `.storkit/work/2_current` must be reported by
/// `find_active_story_stage` as `Some("2_current")`.
#[test]
fn find_active_story_stage_detects_current() {
    use std::fs;

    let tmp = tempfile::tempdir().unwrap();
    let root = tmp.path();
    let current = root.join(".storkit/work/2_current");
    // Borrow the path so `current` stays usable for the `join` below.
    fs::create_dir_all(&current).unwrap();
    fs::write(current.join("10_story_test.md"), "test").unwrap();

    assert_eq!(
        find_active_story_stage(root, "10_story_test"),
        Some("2_current")
    );
}
|
|
|
|
|
|
|
|
|
|
/// A story file placed in `.storkit/work/3_qa` must be reported by
/// `find_active_story_stage` as `Some("3_qa")`.
#[test]
fn find_active_story_stage_detects_qa() {
    let workspace = tempfile::tempdir().unwrap();
    let stage_dir = workspace.path().join(".storkit/work/3_qa");
    std::fs::create_dir_all(&stage_dir).unwrap();
    std::fs::write(stage_dir.join("11_story_test.md"), "test").unwrap();

    let detected = find_active_story_stage(workspace.path(), "11_story_test");
    assert_eq!(detected, Some("3_qa"));
}
|
|
|
|
|
|
|
|
|
|
/// A story file placed in `.storkit/work/4_merge` must be reported by
/// `find_active_story_stage` as `Some("4_merge")`.
#[test]
fn find_active_story_stage_detects_merge() {
    let workspace = tempfile::tempdir().unwrap();
    let stage_dir = workspace.path().join(".storkit/work/4_merge");
    std::fs::create_dir_all(&stage_dir).unwrap();
    std::fs::write(stage_dir.join("12_story_test.md"), "test").unwrap();

    let detected = find_active_story_stage(workspace.path(), "12_story_test");
    assert_eq!(detected, Some("4_merge"));
}
|
|
|
|
|
|
|
|
|
|
/// In an empty temporary directory, looking up a story id yields `None`.
#[test]
fn find_active_story_stage_returns_none_for_unknown_story() {
    let workspace = tempfile::tempdir().unwrap();
    let detected = find_active_story_stage(workspace.path(), "99_nonexistent");
    assert_eq!(detected, None);
}
|
|
|
|
|
|
|
|
|
|
// ── remove_agents_for_story tests ────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
/// `remove_agents_for_story` must drop every entry belonging to the given
/// story, report how many it removed, and leave other stories' agents alone.
#[test]
fn remove_agents_for_story_removes_all_entries() {
    let pool = AgentPool::new_test(3001);
    let seeded = [
        ("story_a", "coder-1", AgentStatus::Completed),
        ("story_a", "qa", AgentStatus::Failed),
        ("story_b", "coder-1", AgentStatus::Running),
    ];
    for (story, agent, status) in seeded {
        pool.inject_test_agent(story, agent, status);
    }

    let removed = pool.remove_agents_for_story("story_a");
    assert_eq!(removed, 2, "should remove both agents for story_a");

    let survivors = pool.list_agents().unwrap();
    assert_eq!(survivors.len(), 1, "only story_b agent should remain");
    assert_eq!(survivors[0].story_id, "story_b");
}
|
|
|
|
|
|
|
|
|
|
/// Removing agents for a story id that has no entries must return `0` and
/// leave the existing entry in place.
#[test]
fn remove_agents_for_story_returns_zero_when_no_match() {
    let pool = AgentPool::new_test(3001);
    pool.inject_test_agent("story_a", "coder-1", AgentStatus::Running);

    let removed = pool.remove_agents_for_story("nonexistent");
    assert_eq!(removed, 0);

    let survivors = pool.list_agents().unwrap();
    assert_eq!(
        survivors.len(),
        1,
        "existing agents should not be affected"
    );
}
|
|
|
|
|
|
|
|
|
|
// ── archive + cleanup integration test ───────────────────────────────────
|
|
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
|
async fn archiving_story_removes_agent_entries_from_pool() {
|
|
|
|
|
use crate::agents::lifecycle::move_story_to_archived;
|
|
|
|
|
use std::fs;
|
|
|
|
|
|
|
|
|
|
let tmp = tempfile::tempdir().unwrap();
|
|
|
|
|
let root = tmp.path();
|
|
|
|
|
|
2026-03-20 11:34:53 +00:00
|
|
|
let current = root.join(".storkit/work/2_current");
|
2026-03-19 22:41:17 +00:00
|
|
|
fs::create_dir_all(¤t).unwrap();
|
|
|
|
|
fs::write(current.join("60_story_cleanup.md"), "test").unwrap();
|
|
|
|
|
|
|
|
|
|
let pool = AgentPool::new_test(3001);
|
|
|
|
|
pool.inject_test_agent("60_story_cleanup", "coder-1", AgentStatus::Completed);
|
|
|
|
|
pool.inject_test_agent("60_story_cleanup", "qa", AgentStatus::Completed);
|
|
|
|
|
pool.inject_test_agent("61_story_other", "coder-1", AgentStatus::Running);
|
|
|
|
|
|
|
|
|
|
assert_eq!(pool.list_agents().unwrap().len(), 3);
|
|
|
|
|
|
|
|
|
|
move_story_to_archived(root, "60_story_cleanup").unwrap();
|
|
|
|
|
pool.remove_agents_for_story("60_story_cleanup");
|
|
|
|
|
|
|
|
|
|
let remaining = pool.list_agents().unwrap();
|
|
|
|
|
assert_eq!(
|
|
|
|
|
remaining.len(),
|
|
|
|
|
1,
|
|
|
|
|
"only the other story's agent should remain"
|
|
|
|
|
);
|
|
|
|
|
assert_eq!(remaining[0].story_id, "61_story_other");
|
|
|
|
|
|
|
|
|
|
assert!(
|
2026-03-20 11:34:53 +00:00
|
|
|
root.join(".storkit/work/5_done/60_story_cleanup.md")
|
2026-03-19 22:41:17 +00:00
|
|
|
.exists()
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
}
|