Accept story 30: Worktree-based agent orchestration

Add git worktree isolation for concurrent story agents. Each agent now
runs in its own worktree with setup/teardown commands driven by
.story_kit/project.toml config. Agents stream output via SSE and support
start/stop lifecycle with Pending/Running/Completed/Failed statuses.

Backend: config.rs (TOML parsing), worktree.rs (git worktree lifecycle),
refactored agents.rs (broadcast streaming), agents_sse.rs (SSE endpoint).
Frontend: AgentPanel.tsx with Run/Stop buttons and streaming output log.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Dave
2026-02-19 17:58:53 +00:00
parent 7e56648954
commit 5e5cdd9b2f
15 changed files with 1440 additions and 281 deletions

View File

@@ -1,167 +1,305 @@
use crate::config::ProjectConfig;
use crate::worktree::{self, WorktreeInfo};
use portable_pty::{CommandBuilder, PtySize, native_pty_system};
use serde::{Deserialize, Serialize};
use serde::Serialize;
use std::collections::HashMap;
use std::io::{BufRead, BufReader};
use std::sync::Mutex;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use tokio::sync::broadcast;
/// Manages multiple concurrent Claude Code agent sessions.
///
/// Each agent is identified by a string name (e.g., "coder-1", "coder-2").
/// Agents run `claude -p` in a PTY for Max subscription billing.
/// Sessions can be resumed for multi-turn conversations.
pub struct AgentPool {
agents: Mutex<HashMap<String, AgentState>>,
/// Events streamed from a running agent to SSE clients.
/// Events streamed from a running agent to SSE clients.
///
/// Serialized as internally tagged JSON: the variant name becomes a
/// snake_case `"type"` field alongside the payload fields.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum AgentEvent {
    /// Agent status changed.
    Status { story_id: String, status: String },
    /// Raw text output from the agent process.
    Output { story_id: String, text: String },
    /// Agent produced a JSON event from `--output-format stream-json`.
    AgentJson { story_id: String, data: serde_json::Value },
    /// Agent finished.
    Done {
        story_id: String,
        /// Claude session id reported by the run, when one was observed.
        session_id: Option<String>,
    },
    /// Agent errored.
    Error { story_id: String, message: String },
}
/// Per-agent snapshot serialized to API clients.
// NOTE(review): a second `AgentInfo` struct with a different shape is
// declared further down this file; this one looks like the pre-worktree
// version — confirm which definition is actually live.
#[derive(Clone, Serialize)]
pub struct AgentInfo {
    pub name: String,
    pub role: String,
    /// Working directory the agent process runs in.
    pub cwd: String,
    /// Claude session id for multi-turn resumption, if any.
    pub session_id: Option<String>,
    pub status: AgentStatus,
    pub message_count: usize,
}
/// Lifecycle state of a story agent (serialized in snake_case).
// Fix: two derive stacks were left on this enum by the stripped diff
// markers, which would derive `Clone`/`Serialize` twice; keep the single
// superset derive.
#[derive(Debug, Clone, Serialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum AgentStatus {
    // NOTE(review): `Idle` is not produced by the worktree start/stop flow
    // visible in this file — confirm whether it is a stale leftover.
    Idle,
    /// Registered in the pool; worktree/process not yet started.
    Pending,
    Running,
    Completed,
    Failed,
}
struct AgentState {
role: String,
cwd: String,
session_id: Option<String>,
message_count: usize,
impl std::fmt::Display for AgentStatus {
    /// Lowercase status name, matching the serde `snake_case` wire form.
    // Fix: the match omitted `Idle`, which the enum above declares, making
    // the match non-exhaustive; add the missing arm.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Idle => write!(f, "idle"),
            Self::Pending => write!(f, "pending"),
            Self::Running => write!(f, "running"),
            Self::Completed => write!(f, "completed"),
            Self::Failed => write!(f, "failed"),
        }
    }
}
/// Request body for creating a named agent.
#[derive(Deserialize)]
pub struct CreateAgentRequest {
    /// Unique agent name (e.g. "coder-1").
    pub name: String,
    /// Role description appended to the agent's system prompt.
    pub role: String,
    /// Working directory the agent process runs in.
    pub cwd: String,
}
/// Request body for sending one message to an existing agent.
#[derive(Deserialize)]
pub struct SendMessageRequest {
    pub message: String,
}
#[derive(Serialize)]
pub struct AgentResponse {
pub agent: String,
pub text: String,
/// Snapshot of a story agent's state, serialized to API clients.
#[derive(Serialize, Clone)]
pub struct AgentInfo {
    pub story_id: String,
    pub status: AgentStatus,
    /// Claude session id, once the agent run has reported one.
    pub session_id: Option<String>,
    // NOTE(review): the telemetry fields below mirror the old
    // `AgentResponse`; the construction sites in this file leave them
    // unpopulated — confirm they are still needed.
    pub model: Option<String>,
    pub api_key_source: Option<String>,
    pub rate_limit_type: Option<String>,
    pub cost_usd: Option<f64>,
    pub input_tokens: Option<u64>,
    pub output_tokens: Option<u64>,
    pub duration_ms: Option<u64>,
    /// Filesystem path of the agent's git worktree, if one was created.
    pub worktree_path: Option<String>,
}
/// Internal bookkeeping for one running (or finished) story agent.
struct StoryAgent {
    status: AgentStatus,
    /// Set once the git worktree has been created.
    worktree_info: Option<WorktreeInfo>,
    /// Project config snapshot taken at start time.
    config: ProjectConfig,
    /// Claude session id captured from the agent's output, if any.
    session_id: Option<String>,
    /// Broadcast sender; SSE subscribers attach via `tx.subscribe()`.
    tx: broadcast::Sender<AgentEvent>,
    /// Handle of the spawned streaming task; taken on stop to abort it.
    task_handle: Option<tokio::task::JoinHandle<()>>,
}
/// Manages concurrent story agents, each in its own worktree.
pub struct AgentPool {
    /// Keyed by story id. Wrapped in `Arc` so spawned tasks can keep a
    /// clone and update an agent's status after their parent call returns.
    agents: Arc<Mutex<HashMap<String, StoryAgent>>>,
}
impl AgentPool {
/// Create an empty pool with no registered agents.
// Fix: the stripped diff left two `agents:` initializers stacked in this
// struct literal; keep the `Arc`-wrapped one that matches the field type.
pub fn new() -> Self {
    Self {
        agents: Arc::new(Mutex::new(HashMap::new())),
    }
}
pub fn create_agent(&self, req: CreateAgentRequest) -> Result<AgentInfo, String> {
let mut agents = self.agents.lock().map_err(|e| e.to_string())?;
if agents.contains_key(&req.name) {
return Err(format!("Agent '{}' already exists", req.name));
/// Start an agent for a story: load config, create worktree, spawn agent.
pub async fn start_agent(
&self,
project_root: &Path,
story_id: &str,
) -> Result<AgentInfo, String> {
// Check not already running
{
let agents = self.agents.lock().map_err(|e| e.to_string())?;
if let Some(agent) = agents.get(story_id)
&& (agent.status == AgentStatus::Running || agent.status == AgentStatus::Pending) {
return Err(format!(
"Agent for story '{story_id}' is already {}",
agent.status
));
}
}
let state = AgentState {
role: req.role.clone(),
cwd: req.cwd.clone(),
session_id: None,
message_count: 0,
};
let config = ProjectConfig::load(project_root)?;
let (tx, _) = broadcast::channel::<AgentEvent>(256);
let info = AgentInfo {
name: req.name.clone(),
role: req.role,
cwd: req.cwd,
session_id: None,
status: AgentStatus::Idle,
message_count: 0,
};
// Register as pending
{
let mut agents = self.agents.lock().map_err(|e| e.to_string())?;
agents.insert(
story_id.to_string(),
StoryAgent {
status: AgentStatus::Pending,
worktree_info: None,
config: config.clone(),
session_id: None,
tx: tx.clone(),
task_handle: None,
},
);
}
agents.insert(req.name, state);
Ok(info)
let _ = tx.send(AgentEvent::Status {
story_id: story_id.to_string(),
status: "pending".to_string(),
});
// Create worktree
let wt_info = worktree::create_worktree(project_root, story_id, &config).await?;
// Update with worktree info
{
let mut agents = self.agents.lock().map_err(|e| e.to_string())?;
if let Some(agent) = agents.get_mut(story_id) {
agent.worktree_info = Some(wt_info.clone());
}
}
// Spawn the agent process
let wt_path_str = wt_info.path.to_string_lossy().to_string();
let rendered = config.render_agent_args(&wt_path_str, story_id);
let (command, args, prompt) = rendered.ok_or_else(|| {
"No [agent] section in config — cannot spawn agent".to_string()
})?;
let sid = story_id.to_string();
let tx_clone = tx.clone();
let agents_ref = self.agents.clone();
let cwd = wt_path_str.clone();
let handle = tokio::spawn(async move {
let _ = tx_clone.send(AgentEvent::Status {
story_id: sid.clone(),
status: "running".to_string(),
});
match run_agent_pty_streaming(&sid, &command, &args, &prompt, &cwd, &tx_clone).await {
Ok(session_id) => {
// Mark completed in the pool
if let Ok(mut agents) = agents_ref.lock()
&& let Some(agent) = agents.get_mut(&sid) {
agent.status = AgentStatus::Completed;
agent.session_id = session_id.clone();
}
let _ = tx_clone.send(AgentEvent::Done {
story_id: sid.clone(),
session_id,
});
}
Err(e) => {
// Mark failed in the pool
if let Ok(mut agents) = agents_ref.lock()
&& let Some(agent) = agents.get_mut(&sid) {
agent.status = AgentStatus::Failed;
}
let _ = tx_clone.send(AgentEvent::Error {
story_id: sid.clone(),
message: e,
});
}
}
});
// Update status to running with task handle
{
let mut agents = self.agents.lock().map_err(|e| e.to_string())?;
if let Some(agent) = agents.get_mut(story_id) {
agent.status = AgentStatus::Running;
agent.task_handle = Some(handle);
}
}
Ok(AgentInfo {
story_id: story_id.to_string(),
status: AgentStatus::Running,
session_id: None,
worktree_path: Some(wt_path_str),
})
}
/// Stop a running agent and clean up its worktree.
///
/// Aborts the streaming task, removes the worktree (best-effort; failure
/// is logged, not returned), notifies subscribers with a "stopped" status
/// event, and drops the agent from the pool.
///
/// # Errors
/// Returns `Err` if no agent is registered for `story_id` or if the pool
/// mutex is poisoned.
pub async fn stop_agent(&self, project_root: &Path, story_id: &str) -> Result<(), String> {
    // Snapshot everything needed under the lock, then release it: the
    // std Mutex guard must not be held across the `.await`s below.
    let (worktree_info, config, task_handle, tx) = {
        let mut agents = self.agents.lock().map_err(|e| e.to_string())?;
        let agent = agents
            .get_mut(story_id)
            .ok_or_else(|| format!("No agent for story '{story_id}'"))?;
        let wt = agent.worktree_info.clone();
        let cfg = agent.config.clone();
        let handle = agent.task_handle.take();
        let tx = agent.tx.clone();
        // Marked Failed while teardown proceeds, so concurrent readers
        // don't see it as Running. NOTE(review): a dedicated "stopped"
        // variant might express this more clearly — confirm intent.
        agent.status = AgentStatus::Failed;
        (wt, cfg, handle, tx)
    };
    // Abort the streaming task and wait for it to wind down.
    if let Some(handle) = task_handle {
        handle.abort();
        let _ = handle.await;
    }
    // Remove worktree (best-effort; a failure here only logs a warning).
    if let Some(ref wt) = worktree_info
        && let Err(e) = worktree::remove_worktree(project_root, wt, &config).await {
        eprintln!("[agents] Worktree cleanup warning for {story_id}: {e}");
    }
    let _ = tx.send(AgentEvent::Status {
        story_id: story_id.to_string(),
        status: "stopped".to_string(),
    });
    // Remove from map
    {
        let mut agents = self.agents.lock().map_err(|e| e.to_string())?;
        agents.remove(story_id);
    }
    Ok(())
}
/// List all agents with their status.
pub fn list_agents(&self) -> Result<Vec<AgentInfo>, String> {
let agents = self.agents.lock().map_err(|e| e.to_string())?;
Ok(agents
.iter()
.map(|(name, state)| AgentInfo {
name: name.clone(),
role: state.role.clone(),
cwd: state.cwd.clone(),
session_id: state.session_id.clone(),
status: AgentStatus::Idle,
message_count: state.message_count,
.map(|(story_id, agent)| AgentInfo {
story_id: story_id.clone(),
status: agent.status.clone(),
session_id: agent.session_id.clone(),
worktree_path: agent
.worktree_info
.as_ref()
.map(|wt| wt.path.to_string_lossy().to_string()),
})
.collect())
}
/// Send a message to an agent and wait for the complete response.
/// This spawns a `claude -p` process in a PTY, optionally resuming
/// a previous session for multi-turn conversations.
pub async fn send_message(
/// Subscribe to events for a story agent.
pub fn subscribe(&self, story_id: &str) -> Result<broadcast::Receiver<AgentEvent>, String> {
let agents = self.agents.lock().map_err(|e| e.to_string())?;
let agent = agents
.get(story_id)
.ok_or_else(|| format!("No agent for story '{story_id}'"))?;
Ok(agent.tx.subscribe())
}
/// Get project root helper.
pub fn get_project_root(
&self,
agent_name: &str,
message: &str,
) -> Result<AgentResponse, String> {
let (cwd, role, session_id) = {
let agents = self.agents.lock().map_err(|e| e.to_string())?;
let state = agents
.get(agent_name)
.ok_or_else(|| format!("Agent '{}' not found", agent_name))?;
(
state.cwd.clone(),
state.role.clone(),
state.session_id.clone(),
)
};
let agent = agent_name.to_string();
let msg = message.to_string();
let role_clone = role.clone();
let result = tokio::task::spawn_blocking(move || {
run_agent_pty(&agent, &msg, &cwd, &role_clone, session_id.as_deref())
})
.await
.map_err(|e| format!("Agent task panicked: {e}"))??;
// Update session_id for next message
if let Some(ref sid) = result.session_id {
let mut agents = self.agents.lock().map_err(|e| e.to_string())?;
if let Some(state) = agents.get_mut(agent_name) {
state.session_id = Some(sid.clone());
state.message_count += 1;
}
}
Ok(result)
state: &crate::state::SessionState,
) -> Result<PathBuf, String> {
state.get_project_root()
}
}
fn run_agent_pty(
agent_name: &str,
message: &str,
/// Spawn claude agent in a PTY and stream events through the broadcast channel.
async fn run_agent_pty_streaming(
story_id: &str,
command: &str,
args: &[String],
prompt: &str,
cwd: &str,
role: &str,
resume_session: Option<&str>,
) -> Result<AgentResponse, String> {
tx: &broadcast::Sender<AgentEvent>,
) -> Result<Option<String>, String> {
let sid = story_id.to_string();
let cmd = command.to_string();
let args = args.to_vec();
let prompt = prompt.to_string();
let cwd = cwd.to_string();
let tx = tx.clone();
tokio::task::spawn_blocking(move || {
run_agent_pty_blocking(&sid, &cmd, &args, &prompt, &cwd, &tx)
})
.await
.map_err(|e| format!("Agent task panicked: {e}"))?
}
fn run_agent_pty_blocking(
story_id: &str,
command: &str,
args: &[String],
prompt: &str,
cwd: &str,
tx: &broadcast::Sender<AgentEvent>,
) -> Result<Option<String>, String> {
let pty_system = native_pty_system();
let pair = pty_system
@@ -173,9 +311,17 @@ fn run_agent_pty(
})
.map_err(|e| format!("Failed to open PTY: {e}"))?;
let mut cmd = CommandBuilder::new("claude");
let mut cmd = CommandBuilder::new(command);
// -p <prompt> must come first
cmd.arg("-p");
cmd.arg(message);
cmd.arg(prompt);
// Add configured args (e.g., --directory /path/to/worktree)
for arg in args {
cmd.arg(arg);
}
cmd.arg("--output-format");
cmd.arg("stream-json");
cmd.arg("--verbose");
@@ -184,32 +330,15 @@ fn run_agent_pty(
cmd.arg("--permission-mode");
cmd.arg("bypassPermissions");
// Append role as system prompt context
cmd.arg("--append-system-prompt");
cmd.arg(format!(
"You are agent '{}' with role: {}. Work autonomously on the task given.",
agent_name, role
));
// Resume previous session if available
if let Some(session_id) = resume_session {
cmd.arg("--resume");
cmd.arg(session_id);
}
cmd.cwd(cwd);
cmd.env("NO_COLOR", "1");
eprintln!(
"[agent:{}] Spawning claude -p (session: {:?})",
agent_name,
resume_session.unwrap_or("new")
);
eprintln!("[agent:{story_id}] Spawning {command} in {cwd} with args: {args:?}");
let mut child = pair
.slave
.spawn_command(cmd)
.map_err(|e| format!("Failed to spawn claude for agent {agent_name}: {e}"))?;
.map_err(|e| format!("Failed to spawn agent for {story_id}: {e}"))?;
drop(pair.slave);
@@ -221,18 +350,7 @@ fn run_agent_pty(
drop(pair.master);
let buf_reader = BufReader::new(reader);
let mut response = AgentResponse {
agent: agent_name.to_string(),
text: String::new(),
session_id: None,
model: None,
api_key_source: None,
rate_limit_type: None,
cost_usd: None,
input_tokens: None,
output_tokens: None,
duration_ms: None,
};
let mut session_id: Option<String> = None;
for line in buf_reader.lines() {
let line = match line {
@@ -245,67 +363,57 @@ fn run_agent_pty(
continue;
}
// Try to parse as JSON
let json: serde_json::Value = match serde_json::from_str(trimmed) {
Ok(j) => j,
Err(_) => continue, // skip non-JSON (terminal escapes)
Err(_) => {
// Non-JSON output (terminal escapes etc.) — send as raw output
let _ = tx.send(AgentEvent::Output {
story_id: story_id.to_string(),
text: trimmed.to_string(),
});
continue;
}
};
let event_type = json.get("type").and_then(|t| t.as_str()).unwrap_or("");
match event_type {
"system" => {
response.session_id = json
session_id = json
.get("session_id")
.and_then(|s| s.as_str())
.map(|s| s.to_string());
response.model = json
.get("model")
.and_then(|s| s.as_str())
.map(|s| s.to_string());
response.api_key_source = json
.get("apiKeySource")
.and_then(|s| s.as_str())
.map(|s| s.to_string());
}
"rate_limit_event" => {
if let Some(info) = json.get("rate_limit_info") {
response.rate_limit_type = info
.get("rateLimitType")
.and_then(|s| s.as_str())
.map(|s| s.to_string());
}
}
"assistant" => {
if let Some(message) = json.get("message") {
if let Some(content) = message.get("content").and_then(|c| c.as_array()) {
if let Some(message) = json.get("message")
&& let Some(content) = message.get("content").and_then(|c| c.as_array()) {
for block in content {
if let Some(text) = block.get("text").and_then(|t| t.as_str()) {
response.text.push_str(text);
let _ = tx.send(AgentEvent::Output {
story_id: story_id.to_string(),
text: text.to_string(),
});
}
}
}
}
}
"result" => {
response.cost_usd = json.get("total_cost_usd").and_then(|c| c.as_f64());
response.duration_ms = json.get("duration_ms").and_then(|d| d.as_u64());
if let Some(usage) = json.get("usage") {
response.input_tokens =
usage.get("input_tokens").and_then(|t| t.as_u64());
response.output_tokens =
usage.get("output_tokens").and_then(|t| t.as_u64());
}
}
_ => {}
}
// Forward all JSON events
let _ = tx.send(AgentEvent::AgentJson {
story_id: story_id.to_string(),
data: json,
});
}
let _ = child.kill();
eprintln!(
"[agent:{}] Done. Session: {:?}, tokens: {:?}/{:?}",
agent_name, response.session_id, response.input_tokens, response.output_tokens
"[agent:{story_id}] Done. Session: {:?}",
session_id
);
Ok(response)
Ok(session_id)
}