use crate::config::ProjectConfig; use crate::worktree::{self, WorktreeInfo}; use portable_pty::{CommandBuilder, PtySize, native_pty_system}; use serde::Serialize; use std::collections::HashMap; use std::io::{BufRead, BufReader}; use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; use tokio::sync::broadcast; /// Events streamed from a running agent to SSE clients. #[derive(Debug, Clone, Serialize)] #[serde(tag = "type", rename_all = "snake_case")] pub enum AgentEvent { /// Agent status changed. Status { story_id: String, status: String }, /// Raw text output from the agent process. Output { story_id: String, text: String }, /// Agent produced a JSON event from `--output-format stream-json`. AgentJson { story_id: String, data: serde_json::Value }, /// Agent finished. Done { story_id: String, session_id: Option, }, /// Agent errored. Error { story_id: String, message: String }, } #[derive(Debug, Clone, Serialize, PartialEq)] #[serde(rename_all = "snake_case")] pub enum AgentStatus { Pending, Running, Completed, Failed, } impl std::fmt::Display for AgentStatus { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::Pending => write!(f, "pending"), Self::Running => write!(f, "running"), Self::Completed => write!(f, "completed"), Self::Failed => write!(f, "failed"), } } } #[derive(Serialize, Clone)] pub struct AgentInfo { pub story_id: String, pub status: AgentStatus, pub session_id: Option, pub worktree_path: Option, } struct StoryAgent { status: AgentStatus, worktree_info: Option, config: ProjectConfig, session_id: Option, tx: broadcast::Sender, task_handle: Option>, } /// Manages concurrent story agents, each in its own worktree. pub struct AgentPool { agents: Arc>>, } impl AgentPool { pub fn new() -> Self { Self { agents: Arc::new(Mutex::new(HashMap::new())), } } /// Start an agent for a story: load config, create worktree, spawn agent. pub async fn start_agent( &self, project_root: &Path, story_id: &str, ) -> Result { // Check not already running { let agents = self.agents.lock().map_err(|e| e.to_string())?; if let Some(agent) = agents.get(story_id) && (agent.status == AgentStatus::Running || agent.status == AgentStatus::Pending) { return Err(format!( "Agent for story '{story_id}' is already {}", agent.status )); } } let config = ProjectConfig::load(project_root)?; let (tx, _) = broadcast::channel::(256); // Register as pending { let mut agents = self.agents.lock().map_err(|e| e.to_string())?; agents.insert( story_id.to_string(), StoryAgent { status: AgentStatus::Pending, worktree_info: None, config: config.clone(), session_id: None, tx: tx.clone(), task_handle: None, }, ); } let _ = tx.send(AgentEvent::Status { story_id: story_id.to_string(), status: "pending".to_string(), }); // Create worktree let wt_info = worktree::create_worktree(project_root, story_id, &config).await?; // Update with worktree info { let mut agents = self.agents.lock().map_err(|e| e.to_string())?; if let Some(agent) = agents.get_mut(story_id) { agent.worktree_info = Some(wt_info.clone()); } } // Spawn the agent process let wt_path_str = wt_info.path.to_string_lossy().to_string(); let rendered = config.render_agent_args(&wt_path_str, story_id); let (command, args, prompt) = rendered.ok_or_else(|| { "No [agent] section in config — cannot spawn agent".to_string() })?; let sid = story_id.to_string(); let tx_clone = tx.clone(); let agents_ref = self.agents.clone(); let cwd = wt_path_str.clone(); let handle = tokio::spawn(async move { let _ = tx_clone.send(AgentEvent::Status { story_id: sid.clone(), status: "running".to_string(), }); match run_agent_pty_streaming(&sid, &command, &args, &prompt, &cwd, &tx_clone).await { Ok(session_id) => { // Mark completed in the pool if let Ok(mut agents) = agents_ref.lock() && let Some(agent) = agents.get_mut(&sid) { agent.status = AgentStatus::Completed; agent.session_id = session_id.clone(); } let _ = tx_clone.send(AgentEvent::Done { story_id: sid.clone(), session_id, }); } Err(e) => { // Mark failed in the pool if let Ok(mut agents) = agents_ref.lock() && let Some(agent) = agents.get_mut(&sid) { agent.status = AgentStatus::Failed; } let _ = tx_clone.send(AgentEvent::Error { story_id: sid.clone(), message: e, }); } } }); // Update status to running with task handle { let mut agents = self.agents.lock().map_err(|e| e.to_string())?; if let Some(agent) = agents.get_mut(story_id) { agent.status = AgentStatus::Running; agent.task_handle = Some(handle); } } Ok(AgentInfo { story_id: story_id.to_string(), status: AgentStatus::Running, session_id: None, worktree_path: Some(wt_path_str), }) } /// Stop a running agent and clean up its worktree. pub async fn stop_agent(&self, project_root: &Path, story_id: &str) -> Result<(), String> { let (worktree_info, config, task_handle, tx) = { let mut agents = self.agents.lock().map_err(|e| e.to_string())?; let agent = agents .get_mut(story_id) .ok_or_else(|| format!("No agent for story '{story_id}'"))?; let wt = agent.worktree_info.clone(); let cfg = agent.config.clone(); let handle = agent.task_handle.take(); let tx = agent.tx.clone(); agent.status = AgentStatus::Failed; (wt, cfg, handle, tx) }; // Abort the task if let Some(handle) = task_handle { handle.abort(); let _ = handle.await; } // Remove worktree if let Some(ref wt) = worktree_info && let Err(e) = worktree::remove_worktree(project_root, wt, &config).await { eprintln!("[agents] Worktree cleanup warning for {story_id}: {e}"); } let _ = tx.send(AgentEvent::Status { story_id: story_id.to_string(), status: "stopped".to_string(), }); // Remove from map { let mut agents = self.agents.lock().map_err(|e| e.to_string())?; agents.remove(story_id); } Ok(()) } /// List all agents with their status. pub fn list_agents(&self) -> Result, String> { let agents = self.agents.lock().map_err(|e| e.to_string())?; Ok(agents .iter() .map(|(story_id, agent)| AgentInfo { story_id: story_id.clone(), status: agent.status.clone(), session_id: agent.session_id.clone(), worktree_path: agent .worktree_info .as_ref() .map(|wt| wt.path.to_string_lossy().to_string()), }) .collect()) } /// Subscribe to events for a story agent. pub fn subscribe(&self, story_id: &str) -> Result, String> { let agents = self.agents.lock().map_err(|e| e.to_string())?; let agent = agents .get(story_id) .ok_or_else(|| format!("No agent for story '{story_id}'"))?; Ok(agent.tx.subscribe()) } /// Get project root helper. pub fn get_project_root( &self, state: &crate::state::SessionState, ) -> Result { state.get_project_root() } } /// Spawn claude agent in a PTY and stream events through the broadcast channel. async fn run_agent_pty_streaming( story_id: &str, command: &str, args: &[String], prompt: &str, cwd: &str, tx: &broadcast::Sender, ) -> Result, String> { let sid = story_id.to_string(); let cmd = command.to_string(); let args = args.to_vec(); let prompt = prompt.to_string(); let cwd = cwd.to_string(); let tx = tx.clone(); tokio::task::spawn_blocking(move || { run_agent_pty_blocking(&sid, &cmd, &args, &prompt, &cwd, &tx) }) .await .map_err(|e| format!("Agent task panicked: {e}"))? } fn run_agent_pty_blocking( story_id: &str, command: &str, args: &[String], prompt: &str, cwd: &str, tx: &broadcast::Sender, ) -> Result, String> { let pty_system = native_pty_system(); let pair = pty_system .openpty(PtySize { rows: 50, cols: 200, pixel_width: 0, pixel_height: 0, }) .map_err(|e| format!("Failed to open PTY: {e}"))?; let mut cmd = CommandBuilder::new(command); // -p must come first cmd.arg("-p"); cmd.arg(prompt); // Add configured args (e.g., --directory /path/to/worktree) for arg in args { cmd.arg(arg); } cmd.arg("--output-format"); cmd.arg("stream-json"); cmd.arg("--verbose"); // Supervised agents don't need interactive permission prompts cmd.arg("--permission-mode"); cmd.arg("bypassPermissions"); cmd.cwd(cwd); cmd.env("NO_COLOR", "1"); eprintln!("[agent:{story_id}] Spawning {command} in {cwd} with args: {args:?}"); let mut child = pair .slave .spawn_command(cmd) .map_err(|e| format!("Failed to spawn agent for {story_id}: {e}"))?; drop(pair.slave); let reader = pair .master .try_clone_reader() .map_err(|e| format!("Failed to clone PTY reader: {e}"))?; drop(pair.master); let buf_reader = BufReader::new(reader); let mut session_id: Option = None; for line in buf_reader.lines() { let line = match line { Ok(l) => l, Err(_) => break, }; let trimmed = line.trim(); if trimmed.is_empty() { continue; } // Try to parse as JSON let json: serde_json::Value = match serde_json::from_str(trimmed) { Ok(j) => j, Err(_) => { // Non-JSON output (terminal escapes etc.) — send as raw output let _ = tx.send(AgentEvent::Output { story_id: story_id.to_string(), text: trimmed.to_string(), }); continue; } }; let event_type = json.get("type").and_then(|t| t.as_str()).unwrap_or(""); match event_type { "system" => { session_id = json .get("session_id") .and_then(|s| s.as_str()) .map(|s| s.to_string()); } "assistant" => { if let Some(message) = json.get("message") && let Some(content) = message.get("content").and_then(|c| c.as_array()) { for block in content { if let Some(text) = block.get("text").and_then(|t| t.as_str()) { let _ = tx.send(AgentEvent::Output { story_id: story_id.to_string(), text: text.to_string(), }); } } } } _ => {} } // Forward all JSON events let _ = tx.send(AgentEvent::AgentJson { story_id: story_id.to_string(), data: json, }); } let _ = child.kill(); eprintln!( "[agent:{story_id}] Done. Session: {:?}", session_id ); Ok(session_id) }