storkit: merge 415_refactor_split_agents_pool_mod_rs_into_submodules

This commit is contained in:
dave
2026-03-27 15:53:32 +00:00
parent 5173bf4aef
commit 7c25aca39b
7 changed files with 2414 additions and 2350 deletions
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
+141
View File
@@ -0,0 +1,141 @@
use crate::slog;
use super::AgentPool;
impl AgentPool {
/// Kill all active PTY child processes.
///
/// Called on server shutdown to prevent orphaned Claude Code processes from
/// continuing to run after the server exits. Each registered killer is called
/// once, then the registry is cleared.
pub fn kill_all_children(&self) {
if let Ok(mut killers) = self.child_killers.lock() {
for (key, killer) in killers.iter_mut() {
slog!("[agents] Killing child process for {key} on shutdown");
let _ = killer.kill();
}
killers.clear();
}
}
/// Kill and deregister the child process for a specific agent key.
///
/// Used by `stop_agent` to ensure the PTY child is terminated even though
/// aborting a `spawn_blocking` task handle does not interrupt the blocking thread.
pub(super) fn kill_child_for_key(&self, key: &str) {
if let Ok(mut killers) = self.child_killers.lock()
&& let Some(mut killer) = killers.remove(key)
{
slog!("[agents] Killing child process for {key} on stop");
let _ = killer.kill();
}
}
/// Test helper: inject a child killer into the registry.
#[cfg(test)]
pub fn inject_child_killer(&self, key: &str, killer: Box<dyn portable_pty::ChildKiller + Send + Sync>) {
let mut killers = self.child_killers.lock().unwrap();
killers.insert(key.to_string(), killer);
}
/// Test helper: return the number of registered child killers.
#[cfg(test)]
pub fn child_killer_count(&self) -> usize {
self.child_killers.lock().unwrap().len()
}
}
#[cfg(test)]
mod tests {
use super::super::AgentPool;
use portable_pty::{CommandBuilder, PtySize, native_pty_system};
use std::process::Command;
/// Returns true if a process with the given PID is currently running.
fn process_is_running(pid: u32) -> bool {
Command::new("ps")
.args(["-p", &pid.to_string()])
.output()
.map(|o| o.status.success())
.unwrap_or(false)
}
#[test]
fn kill_all_children_is_safe_on_empty_pool() {
let pool = AgentPool::new_test(3001);
pool.kill_all_children();
assert_eq!(pool.child_killer_count(), 0);
}
#[test]
fn kill_all_children_kills_real_process() {
let pool = AgentPool::new_test(3001);
let pty_system = native_pty_system();
let pair = pty_system
.openpty(PtySize {
rows: 24,
cols: 80,
pixel_width: 0,
pixel_height: 0,
})
.expect("failed to open pty");
let mut cmd = CommandBuilder::new("sleep");
cmd.arg("100");
let mut child = pair
.slave
.spawn_command(cmd)
.expect("failed to spawn sleep");
let pid = child.process_id().expect("no pid");
pool.inject_child_killer("story:agent", child.clone_killer());
assert!(
process_is_running(pid),
"process {pid} should be running before kill_all_children"
);
pool.kill_all_children();
let _ = child.wait();
assert!(
!process_is_running(pid),
"process {pid} should have been killed by kill_all_children"
);
}
#[test]
fn kill_all_children_clears_registry() {
let pool = AgentPool::new_test(3001);
let pty_system = native_pty_system();
let pair = pty_system
.openpty(PtySize {
rows: 24,
cols: 80,
pixel_width: 0,
pixel_height: 0,
})
.expect("failed to open pty");
let mut cmd = CommandBuilder::new("sleep");
cmd.arg("1");
let mut child = pair
.slave
.spawn_command(cmd)
.expect("failed to spawn sleep");
pool.inject_child_killer("story:agent", child.clone_killer());
assert_eq!(pool.child_killer_count(), 1);
pool.kill_all_children();
let _ = child.wait();
assert_eq!(
pool.child_killer_count(),
0,
"child_killers should be cleared after kill_all_children"
);
}
}
+166
View File
@@ -0,0 +1,166 @@
use crate::config::ProjectConfig;
use std::path::PathBuf;
use tokio::sync::broadcast;
use super::super::{AgentEvent, AgentInfo, AgentStatus, PipelineStage, agent_config_stage};
use super::types::{agent_info_from_entry, composite_key};
use super::AgentPool;
impl AgentPool {
/// Return the names of configured agents for `stage` that are not currently
/// running or pending.
pub fn available_agents_for_stage(
&self,
config: &ProjectConfig,
stage: &PipelineStage,
) -> Result<Vec<String>, String> {
let agents = self.agents.lock().map_err(|e| e.to_string())?;
Ok(config
.agent
.iter()
.filter(|cfg| agent_config_stage(cfg) == *stage)
.filter(|cfg| {
!agents.values().any(|a| {
a.agent_name == cfg.name
&& matches!(a.status, AgentStatus::Running | AgentStatus::Pending)
})
})
.map(|cfg| cfg.name.clone())
.collect())
}
/// List all agents with their status.
pub fn list_agents(&self) -> Result<Vec<AgentInfo>, String> {
let agents = self.agents.lock().map_err(|e| e.to_string())?;
Ok(agents
.iter()
.map(|(key, agent)| {
// Extract story_id from composite key "story_id:agent_name"
let story_id = key
.rsplit_once(':')
.map(|(sid, _)| sid.to_string())
.unwrap_or_else(|| key.clone());
agent_info_from_entry(&story_id, agent)
})
.collect())
}
/// Subscribe to events for a story agent.
pub fn subscribe(
&self,
story_id: &str,
agent_name: &str,
) -> Result<broadcast::Receiver<AgentEvent>, String> {
let key = composite_key(story_id, agent_name);
let agents = self.agents.lock().map_err(|e| e.to_string())?;
let agent = agents
.get(&key)
.ok_or_else(|| format!("No agent '{agent_name}' for story '{story_id}'"))?;
Ok(agent.tx.subscribe())
}
/// Drain accumulated events for polling. Returns all events since the last drain.
pub fn drain_events(
&self,
story_id: &str,
agent_name: &str,
) -> Result<Vec<AgentEvent>, String> {
let key = composite_key(story_id, agent_name);
let agents = self.agents.lock().map_err(|e| e.to_string())?;
let agent = agents
.get(&key)
.ok_or_else(|| format!("No agent '{agent_name}' for story '{story_id}'"))?;
let mut log = agent.event_log.lock().map_err(|e| e.to_string())?;
Ok(log.drain(..).collect())
}
/// Get the log session ID and project root for an agent, if available.
///
/// Used by MCP tools to find the persistent log file for a completed agent.
pub fn get_log_info(&self, story_id: &str, agent_name: &str) -> Option<(String, PathBuf)> {
let key = composite_key(story_id, agent_name);
let agents = self.agents.lock().ok()?;
let agent = agents.get(&key)?;
let session_id = agent.log_session_id.clone()?;
let project_root = agent.project_root.clone()?;
Some((session_id, project_root))
}
}
#[cfg(test)]
mod tests {
use super::super::AgentPool;
use crate::agents::{AgentStatus, PipelineStage};
use crate::config::ProjectConfig;
fn make_config(toml_str: &str) -> ProjectConfig {
ProjectConfig::parse(toml_str).unwrap()
}
#[test]
fn available_agents_for_stage_returns_idle_agents() {
let config = make_config(
r#"
[[agent]]
name = "coder-1"
stage = "coder"
[[agent]]
name = "coder-2"
stage = "coder"
[[agent]]
name = "qa"
stage = "qa"
"#,
);
let pool = AgentPool::new_test(3001);
pool.inject_test_agent("story-1", "coder-1", AgentStatus::Running);
let available = pool
.available_agents_for_stage(&config, &PipelineStage::Coder)
.unwrap();
assert_eq!(available, vec!["coder-2"]);
let available_qa = pool
.available_agents_for_stage(&config, &PipelineStage::Qa)
.unwrap();
assert_eq!(available_qa, vec!["qa"]);
}
#[test]
fn available_agents_for_stage_returns_empty_when_all_busy() {
let config = make_config(
r#"
[[agent]]
name = "coder-1"
stage = "coder"
"#,
);
let pool = AgentPool::new_test(3001);
pool.inject_test_agent("story-1", "coder-1", AgentStatus::Running);
let available = pool
.available_agents_for_stage(&config, &PipelineStage::Coder)
.unwrap();
assert!(available.is_empty());
}
#[test]
fn available_agents_for_stage_ignores_completed_agents() {
let config = make_config(
r#"
[[agent]]
name = "coder-1"
stage = "coder"
"#,
);
let pool = AgentPool::new_test(3001);
pool.inject_test_agent("story-1", "coder-1", AgentStatus::Completed);
let available = pool
.available_agents_for_stage(&config, &PipelineStage::Coder)
.unwrap();
assert_eq!(available, vec!["coder-1"]);
}
}
+138
View File
@@ -0,0 +1,138 @@
use crate::worktree::WorktreeInfo;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use tokio::sync::broadcast;
use super::super::{AgentEvent, AgentStatus, CompletionReport};
use super::types::{StoryAgent, composite_key};
use super::AgentPool;
impl AgentPool {
/// Test helper: inject a pre-built agent entry so unit tests can exercise
/// wait/subscribe logic without spawning a real process.
pub fn inject_test_agent(
&self,
story_id: &str,
agent_name: &str,
status: AgentStatus,
) -> broadcast::Sender<AgentEvent> {
let (tx, _) = broadcast::channel::<AgentEvent>(64);
let key = composite_key(story_id, agent_name);
let mut agents = self.agents.lock().unwrap();
agents.insert(
key,
StoryAgent {
agent_name: agent_name.to_string(),
status,
worktree_info: None,
session_id: None,
tx: tx.clone(),
task_handle: None,
event_log: Arc::new(Mutex::new(Vec::new())),
completion: None,
project_root: None,
log_session_id: None,
merge_failure_reported: false,
},
);
tx
}
/// Test helper: inject an agent with a specific worktree path for testing
/// gate-related logic.
pub fn inject_test_agent_with_path(
&self,
story_id: &str,
agent_name: &str,
status: AgentStatus,
worktree_path: PathBuf,
) -> broadcast::Sender<AgentEvent> {
let (tx, _) = broadcast::channel::<AgentEvent>(64);
let key = composite_key(story_id, agent_name);
let mut agents = self.agents.lock().unwrap();
agents.insert(
key,
StoryAgent {
agent_name: agent_name.to_string(),
status,
worktree_info: Some(WorktreeInfo {
path: worktree_path,
branch: format!("feature/story-{story_id}"),
base_branch: "master".to_string(),
}),
session_id: None,
tx: tx.clone(),
task_handle: None,
event_log: Arc::new(Mutex::new(Vec::new())),
completion: None,
project_root: None,
log_session_id: None,
merge_failure_reported: false,
},
);
tx
}
/// Test helper: inject an agent with a completion report and project_root
/// for testing pipeline advance logic without spawning real agents.
pub fn inject_test_agent_with_completion(
&self,
story_id: &str,
agent_name: &str,
status: AgentStatus,
project_root: PathBuf,
completion: CompletionReport,
) -> broadcast::Sender<AgentEvent> {
let (tx, _) = broadcast::channel::<AgentEvent>(64);
let key = composite_key(story_id, agent_name);
let mut agents = self.agents.lock().unwrap();
agents.insert(
key,
StoryAgent {
agent_name: agent_name.to_string(),
status,
worktree_info: None,
session_id: None,
tx: tx.clone(),
task_handle: None,
event_log: Arc::new(Mutex::new(Vec::new())),
completion: Some(completion),
project_root: Some(project_root),
log_session_id: None,
merge_failure_reported: false,
},
);
tx
}
/// Inject a Running agent with a pre-built (possibly finished) task handle.
/// Used by watchdog tests to simulate an orphaned agent.
pub fn inject_test_agent_with_handle(
&self,
story_id: &str,
agent_name: &str,
status: AgentStatus,
task_handle: tokio::task::JoinHandle<()>,
) -> broadcast::Sender<AgentEvent> {
let (tx, _) = broadcast::channel::<AgentEvent>(64);
let key = composite_key(story_id, agent_name);
let mut agents = self.agents.lock().unwrap();
agents.insert(
key,
StoryAgent {
agent_name: agent_name.to_string(),
status,
worktree_info: None,
session_id: None,
tx: tx.clone(),
task_handle: Some(task_handle),
event_log: Arc::new(Mutex::new(Vec::new())),
completion: None,
project_root: None,
log_session_id: None,
merge_failure_reported: false,
},
);
tx
}
}
+103
View File
@@ -0,0 +1,103 @@
use crate::slog;
use crate::worktree::WorktreeInfo;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use tokio::sync::broadcast;
use super::super::{AgentEvent, AgentInfo, AgentStatus, CompletionReport};
/// Build the composite key used to track agents in the pool.
pub(super) fn composite_key(story_id: &str, agent_name: &str) -> String {
format!("{story_id}:{agent_name}")
}
/// RAII guard that removes a pending agent entry from the pool on drop.
///
/// Created after inserting a `Pending` entry into the agent HashMap.
/// If `start_agent` succeeds (the agent process is spawned and status
/// transitions to `Running`), call [`disarm`](Self::disarm) to prevent
/// cleanup. If any intermediate step fails and the guard is dropped
/// without being disarmed, the pending entry is removed so it cannot
/// block future auto-assign dispatches.
pub(super) struct PendingGuard {
pub(super) agents: Arc<Mutex<HashMap<String, StoryAgent>>>,
pub(super) key: String,
pub(super) armed: bool,
}
impl PendingGuard {
pub(super) fn new(agents: Arc<Mutex<HashMap<String, StoryAgent>>>, key: String) -> Self {
Self {
agents,
key,
armed: true,
}
}
/// Prevent the guard from cleaning up the entry (call after
/// successful spawn).
pub(super) fn disarm(&mut self) {
self.armed = false;
}
}
impl Drop for PendingGuard {
fn drop(&mut self) {
if self.armed
&& let Ok(mut agents) = self.agents.lock()
&& agents
.get(&self.key)
.is_some_and(|a| a.status == AgentStatus::Pending)
{
agents.remove(&self.key);
slog!(
"[agents] Cleaned up leaked Pending entry for '{}'",
self.key
);
}
}
}
pub(super) struct StoryAgent {
pub(super) agent_name: String,
pub(super) status: AgentStatus,
pub(super) worktree_info: Option<WorktreeInfo>,
pub(super) session_id: Option<String>,
pub(super) tx: broadcast::Sender<AgentEvent>,
pub(super) task_handle: Option<tokio::task::JoinHandle<()>>,
/// Accumulated events for polling via get_agent_output.
pub(super) event_log: Arc<Mutex<Vec<AgentEvent>>>,
/// Set when the agent calls report_completion.
pub(super) completion: Option<CompletionReport>,
/// Project root, stored for pipeline advancement after completion.
pub(super) project_root: Option<PathBuf>,
/// UUID identifying the log file for this session.
pub(super) log_session_id: Option<String>,
/// Set to `true` when the agent calls `report_merge_failure`.
/// Prevents the pipeline from blindly advancing to `5_done/` after a
/// failed merge: the server-owned gate check runs in the feature-branch
/// worktree (which compiles fine) and returns `gates_passed=true` even
/// though the code was never squash-merged onto master.
pub(super) merge_failure_reported: bool,
}
/// Build an `AgentInfo` snapshot from a `StoryAgent` map entry.
pub(super) fn agent_info_from_entry(story_id: &str, agent: &StoryAgent) -> AgentInfo {
AgentInfo {
story_id: story_id.to_string(),
agent_name: agent.agent_name.clone(),
status: agent.status.clone(),
session_id: agent.session_id.clone(),
worktree_path: agent
.worktree_info
.as_ref()
.map(|wt| wt.path.to_string_lossy().to_string()),
base_branch: agent
.worktree_info
.as_ref()
.map(|wt| wt.base_branch.clone()),
completion: agent.completion.clone(),
log_session_id: agent.log_session_id.clone(),
}
}
+91
View File
@@ -0,0 +1,91 @@
use crate::config::ProjectConfig;
use std::path::{Path, PathBuf};
use super::AgentPool;
impl AgentPool {
/// Create a worktree for the given story using the server port (writes .mcp.json).
pub async fn create_worktree(
&self,
project_root: &Path,
story_id: &str,
) -> Result<crate::worktree::WorktreeInfo, String> {
let config = ProjectConfig::load(project_root)?;
crate::worktree::create_worktree(project_root, story_id, &config, self.port).await
}
/// Get project root helper.
pub fn get_project_root(&self, state: &crate::state::SessionState) -> Result<PathBuf, String> {
state.get_project_root()
}
}
/// Return the active pipeline stage directory name for `story_id`, or `None` if the
/// story is not in any active stage (`2_current/`, `3_qa/`, `4_merge/`).
pub(super) fn find_active_story_stage(project_root: &Path, story_id: &str) -> Option<&'static str> {
const STAGES: [&str; 3] = ["2_current", "3_qa", "4_merge"];
for stage in &STAGES {
let path = project_root
.join(".storkit")
.join("work")
.join(stage)
.join(format!("{story_id}.md"));
if path.exists() {
return Some(stage);
}
}
None
}
#[cfg(test)]
mod tests {
use super::find_active_story_stage;
#[test]
fn find_active_story_stage_detects_current() {
use std::fs;
let tmp = tempfile::tempdir().unwrap();
let root = tmp.path();
let current = root.join(".storkit/work/2_current");
fs::create_dir_all(&current).unwrap();
fs::write(current.join("10_story_test.md"), "test").unwrap();
assert_eq!(
find_active_story_stage(root, "10_story_test"),
Some("2_current")
);
}
#[test]
fn find_active_story_stage_detects_qa() {
use std::fs;
let tmp = tempfile::tempdir().unwrap();
let root = tmp.path();
let qa = root.join(".storkit/work/3_qa");
fs::create_dir_all(&qa).unwrap();
fs::write(qa.join("11_story_test.md"), "test").unwrap();
assert_eq!(find_active_story_stage(root, "11_story_test"), Some("3_qa"));
}
#[test]
fn find_active_story_stage_detects_merge() {
use std::fs;
let tmp = tempfile::tempdir().unwrap();
let root = tmp.path();
let merge = root.join(".storkit/work/4_merge");
fs::create_dir_all(&merge).unwrap();
fs::write(merge.join("12_story_test.md"), "test").unwrap();
assert_eq!(
find_active_story_stage(root, "12_story_test"),
Some("4_merge")
);
}
#[test]
fn find_active_story_stage_returns_none_for_unknown_story() {
let tmp = tempfile::tempdir().unwrap();
assert_eq!(find_active_story_stage(tmp.path(), "99_nonexistent"), None);
}
}