diff --git a/server/src/agents/pty/events.rs b/server/src/agents/pty/events.rs new file mode 100644 index 00000000..36d568a2 --- /dev/null +++ b/server/src/agents/pty/events.rs @@ -0,0 +1,80 @@ +//! Event emission helpers: routing PTY output lines to the broadcast channel and log. +use std::sync::Mutex; + +use tokio::sync::broadcast; + +use crate::agent_log::AgentLogWriter; +use crate::agents::AgentEvent; + +/// Dispatch a `stream_event` from Claude Code's `--include-partial-messages` output. +/// +/// Extracts `thinking_delta` and `text_delta` from `content_block_delta` events +/// and routes them as `AgentEvent::Thinking` and `AgentEvent::Output` respectively. +/// This ensures thinking traces flow through the dedicated `ThinkingBlock` UI +/// component rather than appearing as unbounded regular output. +pub(super) fn handle_agent_stream_event( + event: &serde_json::Value, + story_id: &str, + agent_name: &str, + tx: &broadcast::Sender, + event_log: &Mutex>, + log_writer: Option<&Mutex>, +) { + let event_type = event.get("type").and_then(|t| t.as_str()).unwrap_or(""); + + if event_type == "content_block_delta" + && let Some(delta) = event.get("delta") + { + let delta_type = delta.get("type").and_then(|t| t.as_str()).unwrap_or(""); + match delta_type { + "thinking_delta" => { + if let Some(thinking) = delta.get("thinking").and_then(|t| t.as_str()) { + emit_event( + AgentEvent::Thinking { + story_id: story_id.to_string(), + agent_name: agent_name.to_string(), + text: thinking.to_string(), + }, + tx, + event_log, + log_writer, + ); + } + } + "text_delta" => { + if let Some(text) = delta.get("text").and_then(|t| t.as_str()) { + emit_event( + AgentEvent::Output { + story_id: story_id.to_string(), + agent_name: agent_name.to_string(), + text: text.to_string(), + }, + tx, + event_log, + log_writer, + ); + } + } + _ => {} + } + } +} + +/// Helper to send an event to broadcast, event log, and optional persistent log file. +pub(in crate::agents) fn emit_event( + event: AgentEvent, + tx: &broadcast::Sender, + event_log: &Mutex>, + log_writer: Option<&Mutex>, +) { + if let Ok(mut log) = event_log.lock() { + log.push(event.clone()); + } + if let Some(writer) = log_writer + && let Ok(mut w) = writer.lock() + && let Err(e) = w.write_event(&event) + { + eprintln!("[agent_log] Failed to write event to log file: {e}"); + } + let _ = tx.send(event); +} diff --git a/server/src/agents/pty/mod.rs b/server/src/agents/pty/mod.rs new file mode 100644 index 00000000..2f56477b --- /dev/null +++ b/server/src/agents/pty/mod.rs @@ -0,0 +1,328 @@ +//! PTY runner — spawns agent processes in pseudo-terminals and streams their output. + +mod events; +mod runner; +mod types; + +pub(in crate::agents) use events::emit_event; +pub(in crate::agents) use runner::run_agent_pty_streaming; + +#[cfg(test)] +mod tests { + use super::events::handle_agent_stream_event; + use super::*; + use crate::agents::AgentEvent; + use crate::io::watcher::WatcherEvent; + use std::collections::HashMap; + use std::sync::{Arc, Mutex}; + use tokio::sync::broadcast; + + // ── AC1: pty detects rate_limit_event and emits RateLimitWarning ───────── + + /// Verify that when a `rate_limit_event` JSON line appears in PTY output, + /// `run_agent_pty_streaming` sends a `WatcherEvent::RateLimitWarning` with + /// the correct story_id and agent_name. + /// + /// The command invoked is: `sh -p --