use std::collections::HashMap; use std::io::{BufRead, BufReader}; use std::sync::{Arc, Mutex}; use portable_pty::{ChildKiller, CommandBuilder, PtySize, native_pty_system}; use tokio::sync::broadcast; use super::{AgentEvent, TokenUsage}; use crate::agent_log::AgentLogWriter; use crate::slog; use crate::slog_warn; /// Result from a PTY agent session, containing the session ID and token usage. pub(super) struct PtyResult { pub session_id: Option, pub token_usage: Option, } fn composite_key(story_id: &str, agent_name: &str) -> String { format!("{story_id}:{agent_name}") } struct ChildKillerGuard { killers: Arc>>>, key: String, } impl Drop for ChildKillerGuard { fn drop(&mut self) { if let Ok(mut killers) = self.killers.lock() { killers.remove(&self.key); } } } /// Spawn claude agent in a PTY and stream events through the broadcast channel. #[allow(clippy::too_many_arguments)] pub(super) async fn run_agent_pty_streaming( story_id: &str, agent_name: &str, command: &str, args: &[String], prompt: &str, cwd: &str, tx: &broadcast::Sender, event_log: &Arc>>, log_writer: Option>>, inactivity_timeout_secs: u64, child_killers: Arc>>>, ) -> Result { let sid = story_id.to_string(); let aname = agent_name.to_string(); let cmd = command.to_string(); let args = args.to_vec(); let prompt = prompt.to_string(); let cwd = cwd.to_string(); let tx = tx.clone(); let event_log = event_log.clone(); tokio::task::spawn_blocking(move || { run_agent_pty_blocking( &sid, &aname, &cmd, &args, &prompt, &cwd, &tx, &event_log, log_writer.as_deref(), inactivity_timeout_secs, &child_killers, ) }) .await .map_err(|e| format!("Agent task panicked: {e}"))? } /// Dispatch a `stream_event` from Claude Code's `--include-partial-messages` output. /// /// Extracts `thinking_delta` and `text_delta` from `content_block_delta` events /// and routes them as `AgentEvent::Thinking` and `AgentEvent::Output` respectively. /// This ensures thinking traces flow through the dedicated `ThinkingBlock` UI /// component rather than appearing as unbounded regular output. fn handle_agent_stream_event( event: &serde_json::Value, story_id: &str, agent_name: &str, tx: &broadcast::Sender, event_log: &Mutex>, log_writer: Option<&Mutex>, ) { let event_type = event.get("type").and_then(|t| t.as_str()).unwrap_or(""); if event_type == "content_block_delta" && let Some(delta) = event.get("delta") { let delta_type = delta.get("type").and_then(|t| t.as_str()).unwrap_or(""); match delta_type { "thinking_delta" => { if let Some(thinking) = delta.get("thinking").and_then(|t| t.as_str()) { emit_event( AgentEvent::Thinking { story_id: story_id.to_string(), agent_name: agent_name.to_string(), text: thinking.to_string(), }, tx, event_log, log_writer, ); } } "text_delta" => { if let Some(text) = delta.get("text").and_then(|t| t.as_str()) { emit_event( AgentEvent::Output { story_id: story_id.to_string(), agent_name: agent_name.to_string(), text: text.to_string(), }, tx, event_log, log_writer, ); } } _ => {} } } } /// Helper to send an event to broadcast, event log, and optional persistent log file. pub(super) fn emit_event( event: AgentEvent, tx: &broadcast::Sender, event_log: &Mutex>, log_writer: Option<&Mutex>, ) { if let Ok(mut log) = event_log.lock() { log.push(event.clone()); } if let Some(writer) = log_writer && let Ok(mut w) = writer.lock() && let Err(e) = w.write_event(&event) { eprintln!("[agent_log] Failed to write event to log file: {e}"); } let _ = tx.send(event); } #[allow(clippy::too_many_arguments)] fn run_agent_pty_blocking( story_id: &str, agent_name: &str, command: &str, args: &[String], prompt: &str, cwd: &str, tx: &broadcast::Sender, event_log: &Mutex>, log_writer: Option<&Mutex>, inactivity_timeout_secs: u64, child_killers: &Arc>>>, ) -> Result { let pty_system = native_pty_system(); let pair = pty_system .openpty(PtySize { rows: 50, cols: 200, pixel_width: 0, pixel_height: 0, }) .map_err(|e| format!("Failed to open PTY: {e}"))?; let mut cmd = CommandBuilder::new(command); // -p must come first cmd.arg("-p"); cmd.arg(prompt); // Add configured args (e.g., --directory /path/to/worktree, --model, etc.) for arg in args { cmd.arg(arg); } cmd.arg("--output-format"); cmd.arg("stream-json"); cmd.arg("--verbose"); // Enable partial streaming so we receive thinking_delta and text_delta // events in real-time, rather than only complete assistant events. // Without this, thinking traces may not appear in the structured output // and instead leak as unstructured PTY text. cmd.arg("--include-partial-messages"); // Supervised agents don't need interactive permission prompts cmd.arg("--permission-mode"); cmd.arg("bypassPermissions"); cmd.cwd(cwd); cmd.env("NO_COLOR", "1"); // Allow spawning Claude Code from within a Claude Code session cmd.env_remove("CLAUDECODE"); cmd.env_remove("CLAUDE_CODE_ENTRYPOINT"); slog!("[agent:{story_id}:{agent_name}] Spawning {command} in {cwd} with args: {args:?}"); let mut child = pair .slave .spawn_command(cmd) .map_err(|e| format!("Failed to spawn agent for {story_id}:{agent_name}: {e}"))?; // Register the child killer so that kill_all_children() / stop_agent() can // terminate this process on server shutdown, even if the blocking thread // cannot be interrupted. The ChildKillerGuard deregisters on function exit. let killer_key = composite_key(story_id, agent_name); { let killer = child.clone_killer(); if let Ok(mut killers) = child_killers.lock() { killers.insert(killer_key.clone(), killer); } } let _killer_guard = ChildKillerGuard { killers: Arc::clone(child_killers), key: killer_key, }; drop(pair.slave); let reader = pair .master .try_clone_reader() .map_err(|e| format!("Failed to clone PTY reader: {e}"))?; drop(pair.master); // Spawn a reader thread to collect PTY output lines. // We use a channel so the main thread can apply an inactivity deadline // via recv_timeout: if no output arrives within the configured window // the process is killed and the agent is marked Failed. let (line_tx, line_rx) = std::sync::mpsc::channel::>(); std::thread::spawn(move || { let buf_reader = BufReader::new(reader); for line in buf_reader.lines() { if line_tx.send(line).is_err() { break; } } }); let timeout_dur = if inactivity_timeout_secs > 0 { Some(std::time::Duration::from_secs(inactivity_timeout_secs)) } else { None }; let mut session_id: Option = None; let mut token_usage: Option = None; loop { let recv_result = match timeout_dur { Some(dur) => line_rx.recv_timeout(dur), None => line_rx .recv() .map_err(|_| std::sync::mpsc::RecvTimeoutError::Disconnected), }; let line = match recv_result { Ok(Ok(l)) => l, Ok(Err(_)) => { // IO error reading from PTY — treat as EOF. break; } Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => { // Reader thread exited (EOF from PTY). break; } Err(std::sync::mpsc::RecvTimeoutError::Timeout) => { slog_warn!( "[agent:{story_id}:{agent_name}] Inactivity timeout after \ {inactivity_timeout_secs}s with no output. Killing process." ); let _ = child.kill(); let _ = child.wait(); return Err(format!( "Agent inactivity timeout: no output received for {inactivity_timeout_secs}s" )); } }; let trimmed = line.trim(); if trimmed.is_empty() { continue; } // Try to parse as JSON let json: serde_json::Value = match serde_json::from_str(trimmed) { Ok(j) => j, Err(_) => { // Non-JSON output (terminal escapes etc.) — send as raw output emit_event( AgentEvent::Output { story_id: story_id.to_string(), agent_name: agent_name.to_string(), text: trimmed.to_string(), }, tx, event_log, log_writer, ); continue; } }; let event_type = json.get("type").and_then(|t| t.as_str()).unwrap_or(""); match event_type { "system" => { session_id = json .get("session_id") .and_then(|s| s.as_str()) .map(|s| s.to_string()); } // With --include-partial-messages, thinking and text arrive // incrementally via stream_event → content_block_delta. Handle // them here for real-time streaming to the frontend. "stream_event" => { if let Some(event) = json.get("event") { handle_agent_stream_event( event, story_id, agent_name, tx, event_log, log_writer, ); } } // Complete assistant events are skipped for content extraction // because thinking and text already arrived via stream_event. // The raw JSON is still forwarded as AgentJson below. "assistant" | "user" => {} "result" => { // Extract token usage from the result event. if let Some(usage) = TokenUsage::from_result_event(&json) { slog!( "[agent:{story_id}:{agent_name}] Token usage: in={} out={} cache_create={} cache_read={} cost=${:.4}", usage.input_tokens, usage.output_tokens, usage.cache_creation_input_tokens, usage.cache_read_input_tokens, usage.total_cost_usd, ); token_usage = Some(usage); } } _ => {} } // Forward all JSON events emit_event( AgentEvent::AgentJson { story_id: story_id.to_string(), agent_name: agent_name.to_string(), data: json, }, tx, event_log, log_writer, ); } let _ = child.kill(); let _ = child.wait(); slog!( "[agent:{story_id}:{agent_name}] Done. Session: {:?}", session_id ); Ok(PtyResult { session_id, token_usage, }) } #[cfg(test)] mod tests { use super::*; use crate::agents::AgentEvent; #[test] fn test_emit_event_writes_to_log_writer() { let tmp = tempfile::tempdir().unwrap(); let root = tmp.path(); let log_writer = AgentLogWriter::new(root, "42_story_foo", "coder-1", "sess-emit").unwrap(); let log_mutex = Mutex::new(log_writer); let (tx, _rx) = broadcast::channel::(64); let event_log: Mutex> = Mutex::new(Vec::new()); let event = AgentEvent::Status { story_id: "42_story_foo".to_string(), agent_name: "coder-1".to_string(), status: "running".to_string(), }; emit_event(event, &tx, &event_log, Some(&log_mutex)); // Verify event was added to in-memory log let mem_events = event_log.lock().unwrap(); assert_eq!(mem_events.len(), 1); drop(mem_events); // Verify event was written to the log file let log_path = crate::agent_log::log_file_path(root, "42_story_foo", "coder-1", "sess-emit"); let entries = crate::agent_log::read_log(&log_path).unwrap(); assert_eq!(entries.len(), 1); assert_eq!(entries[0].event["type"], "status"); assert_eq!(entries[0].event["status"], "running"); } // ── bug 167: handle_agent_stream_event routes thinking/text correctly ─── #[test] fn stream_event_thinking_delta_emits_thinking_event() { let (tx, mut rx) = broadcast::channel::(64); let event_log: Mutex> = Mutex::new(Vec::new()); let event = serde_json::json!({ "type": "content_block_delta", "delta": {"type": "thinking_delta", "thinking": "Let me analyze this..."} }); handle_agent_stream_event(&event, "s1", "coder-1", &tx, &event_log, None); let received = rx.try_recv().unwrap(); match received { AgentEvent::Thinking { story_id, agent_name, text, } => { assert_eq!(story_id, "s1"); assert_eq!(agent_name, "coder-1"); assert_eq!(text, "Let me analyze this..."); } other => panic!("Expected Thinking event, got: {other:?}"), } } #[test] fn stream_event_text_delta_emits_output_event() { let (tx, mut rx) = broadcast::channel::(64); let event_log: Mutex> = Mutex::new(Vec::new()); let event = serde_json::json!({ "type": "content_block_delta", "delta": {"type": "text_delta", "text": "Here is the result."} }); handle_agent_stream_event(&event, "s1", "coder-1", &tx, &event_log, None); let received = rx.try_recv().unwrap(); match received { AgentEvent::Output { story_id, agent_name, text, } => { assert_eq!(story_id, "s1"); assert_eq!(agent_name, "coder-1"); assert_eq!(text, "Here is the result."); } other => panic!("Expected Output event, got: {other:?}"), } } #[test] fn stream_event_input_json_delta_ignored() { let (tx, mut rx) = broadcast::channel::(64); let event_log: Mutex> = Mutex::new(Vec::new()); let event = serde_json::json!({ "type": "content_block_delta", "delta": {"type": "input_json_delta", "partial_json": "{\"file\":"} }); handle_agent_stream_event(&event, "s1", "coder-1", &tx, &event_log, None); // No event should be emitted for tool argument deltas assert!(rx.try_recv().is_err()); } #[test] fn stream_event_non_delta_type_ignored() { let (tx, mut rx) = broadcast::channel::(64); let event_log: Mutex> = Mutex::new(Vec::new()); let event = serde_json::json!({ "type": "message_start", "message": {"role": "assistant"} }); handle_agent_stream_event(&event, "s1", "coder-1", &tx, &event_log, None); assert!(rx.try_recv().is_err()); } }