2026-03-22 19:07:07 +00:00
|
|
|
mod auto_assign;
|
2026-03-27 15:53:32 +00:00
|
|
|
mod lifecycle;
|
2026-03-22 19:07:07 +00:00
|
|
|
mod pipeline;
|
2026-03-27 15:53:32 +00:00
|
|
|
mod process;
|
|
|
|
|
mod query;
|
|
|
|
|
mod types;
|
|
|
|
|
mod worktree;
|
|
|
|
|
|
|
|
|
|
#[cfg(test)]
|
|
|
|
|
mod test_helpers;
|
2026-03-22 19:07:07 +00:00
|
|
|
|
|
|
|
|
use crate::io::watcher::WatcherEvent;
|
|
|
|
|
use portable_pty::ChildKiller;
|
|
|
|
|
use std::collections::HashMap;
|
|
|
|
|
use std::sync::{Arc, Mutex};
|
|
|
|
|
use tokio::sync::broadcast;
|
|
|
|
|
|
2026-03-27 15:53:32 +00:00
|
|
|
// Bring pool-internal types into pool's namespace so that sub-modules
|
|
|
|
|
// (auto_assign, pipeline, etc.) can access them via `use super::...`.
|
|
|
|
|
use types::{StoryAgent, composite_key};
|
|
|
|
|
use worktree::find_active_story_stage;
|
2026-03-22 19:07:07 +00:00
|
|
|
|
|
|
|
|
/// Manages concurrent story agents, each in its own worktree.
pub struct AgentPool {
    /// Active story agents, keyed by `composite_key(story_id, agent_name)`
    /// (the same "{story_id}:{agent_name}" scheme used for `child_killers`).
    agents: Arc<Mutex<HashMap<String, StoryAgent>>>,

    /// Port supplied at construction; exposed read-only via `port()`.
    port: u16,

    /// Registry of active PTY child process killers, keyed by "{story_id}:{agent_name}".
    /// Used to terminate child processes on server shutdown or agent stop, preventing
    /// orphaned Claude Code processes from running after the server exits.
    child_killers: Arc<Mutex<HashMap<String, Box<dyn ChildKiller + Send + Sync>>>>,

    /// Broadcast channel for notifying WebSocket clients of agent state changes.
    /// When an agent transitions state (Pending, Running, Completed, Failed, Stopped),
    /// an `AgentStateChanged` event is emitted so the frontend can refresh the
    /// pipeline board without waiting for a filesystem event.
    watcher_tx: broadcast::Sender<WatcherEvent>,

    /// Tracks background merge jobs started by `merge_agent_work`, keyed by story_id.
    /// The MCP tool returns immediately and the mergemaster agent polls
    /// `get_merge_status` until the job reaches a terminal state.
    merge_jobs: Arc<Mutex<HashMap<String, super::merge::MergeJob>>>,
}
|
|
|
|
|
|
|
|
|
|
impl AgentPool {
|
|
|
|
|
pub fn new(port: u16, watcher_tx: broadcast::Sender<WatcherEvent>) -> Self {
|
2026-03-28 09:21:03 +00:00
|
|
|
let pool = Self {
|
2026-03-22 19:07:07 +00:00
|
|
|
agents: Arc::new(Mutex::new(HashMap::new())),
|
|
|
|
|
port,
|
|
|
|
|
child_killers: Arc::new(Mutex::new(HashMap::new())),
|
2026-03-28 09:21:03 +00:00
|
|
|
watcher_tx: watcher_tx.clone(),
|
2026-03-22 19:07:07 +00:00
|
|
|
merge_jobs: Arc::new(Mutex::new(HashMap::new())),
|
2026-03-28 09:21:03 +00:00
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// Spawn a background task (only when inside a tokio runtime) that
|
|
|
|
|
// listens for RateLimitWarning and HardBlock events and updates the
|
|
|
|
|
// throttled flag on the relevant agent so status dots stay current.
|
|
|
|
|
if tokio::runtime::Handle::try_current().is_ok() {
|
|
|
|
|
let agents_clone = Arc::clone(&pool.agents);
|
|
|
|
|
let watcher_tx_clone = watcher_tx.clone();
|
|
|
|
|
let mut rx = watcher_tx.subscribe();
|
|
|
|
|
tokio::spawn(async move {
|
|
|
|
|
loop {
|
|
|
|
|
let event = match rx.recv().await {
|
|
|
|
|
Ok(e) => e,
|
|
|
|
|
Err(broadcast::error::RecvError::Closed) => break,
|
|
|
|
|
Err(broadcast::error::RecvError::Lagged(_)) => continue,
|
|
|
|
|
};
|
|
|
|
|
let (story_id, agent_name) = match &event {
|
|
|
|
|
WatcherEvent::RateLimitWarning { story_id, agent_name }
|
|
|
|
|
| WatcherEvent::HardBlock { story_id, agent_name, .. } => {
|
|
|
|
|
(story_id.clone(), agent_name.clone())
|
|
|
|
|
}
|
|
|
|
|
_ => continue,
|
|
|
|
|
};
|
|
|
|
|
let key = composite_key(&story_id, &agent_name);
|
2026-03-28 09:55:19 +00:00
|
|
|
if let Ok(mut agents) = agents_clone.lock()
|
|
|
|
|
&& let Some(agent) = agents.get_mut(&key)
|
|
|
|
|
{
|
|
|
|
|
agent.throttled = true;
|
2026-03-28 09:21:03 +00:00
|
|
|
}
|
|
|
|
|
let _ = watcher_tx_clone.send(WatcherEvent::AgentStateChanged);
|
|
|
|
|
}
|
|
|
|
|
});
|
2026-03-22 19:07:07 +00:00
|
|
|
}
|
2026-03-28 09:21:03 +00:00
|
|
|
|
|
|
|
|
pool
|
2026-03-22 19:07:07 +00:00
|
|
|
}
|
|
|
|
|
|
2026-03-23 12:57:49 +00:00
|
|
|
pub fn port(&self) -> u16 {
|
|
|
|
|
self.port
|
|
|
|
|
}
|
|
|
|
|
|
2026-03-22 19:07:07 +00:00
|
|
|
/// Create a pool with a dummy watcher channel for unit tests.
|
|
|
|
|
#[cfg(test)]
|
|
|
|
|
pub fn new_test(port: u16) -> Self {
|
|
|
|
|
let (watcher_tx, _) = broadcast::channel(16);
|
|
|
|
|
Self::new(port, watcher_tx)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Notify WebSocket clients that agent state has changed, so the pipeline
|
|
|
|
|
/// board and agent panel can refresh.
|
|
|
|
|
fn notify_agent_state_changed(watcher_tx: &broadcast::Sender<WatcherEvent>) {
|
|
|
|
|
let _ = watcher_tx.send(WatcherEvent::AgentStateChanged);
|
|
|
|
|
}
|
|
|
|
|
}
|