From 619bdd9c826c7e9321fc653e69874d3956c138e5 Mon Sep 17 00:00:00 2001 From: dave Date: Tue, 28 Apr 2026 16:34:16 +0000 Subject: [PATCH] huskies: merge 801 --- server/src/agents/pty/events.rs | 80 ++++ server/src/agents/pty/mod.rs | 328 +++++++++++++++ server/src/agents/{pty.rs => pty/runner.rs} | 418 +------------------- server/src/agents/pty/types.rs | 30 ++ 4 files changed, 443 insertions(+), 413 deletions(-) create mode 100644 server/src/agents/pty/events.rs create mode 100644 server/src/agents/pty/mod.rs rename server/src/agents/{pty.rs => pty/runner.rs} (51%) create mode 100644 server/src/agents/pty/types.rs diff --git a/server/src/agents/pty/events.rs b/server/src/agents/pty/events.rs new file mode 100644 index 00000000..36d568a2 --- /dev/null +++ b/server/src/agents/pty/events.rs @@ -0,0 +1,80 @@ +//! Event emission helpers: routing PTY output lines to the broadcast channel and log. +use std::sync::Mutex; + +use tokio::sync::broadcast; + +use crate::agent_log::AgentLogWriter; +use crate::agents::AgentEvent; + +/// Dispatch a `stream_event` from Claude Code's `--include-partial-messages` output. +/// +/// Extracts `thinking_delta` and `text_delta` from `content_block_delta` events +/// and routes them as `AgentEvent::Thinking` and `AgentEvent::Output` respectively. +/// This ensures thinking traces flow through the dedicated `ThinkingBlock` UI +/// component rather than appearing as unbounded regular output. +pub(super) fn handle_agent_stream_event( + event: &serde_json::Value, + story_id: &str, + agent_name: &str, + tx: &broadcast::Sender, + event_log: &Mutex>, + log_writer: Option<&Mutex>, +) { + let event_type = event.get("type").and_then(|t| t.as_str()).unwrap_or(""); + + if event_type == "content_block_delta" + && let Some(delta) = event.get("delta") + { + let delta_type = delta.get("type").and_then(|t| t.as_str()).unwrap_or(""); + match delta_type { + "thinking_delta" => { + if let Some(thinking) = delta.get("thinking").and_then(|t| t.as_str()) { + emit_event( + AgentEvent::Thinking { + story_id: story_id.to_string(), + agent_name: agent_name.to_string(), + text: thinking.to_string(), + }, + tx, + event_log, + log_writer, + ); + } + } + "text_delta" => { + if let Some(text) = delta.get("text").and_then(|t| t.as_str()) { + emit_event( + AgentEvent::Output { + story_id: story_id.to_string(), + agent_name: agent_name.to_string(), + text: text.to_string(), + }, + tx, + event_log, + log_writer, + ); + } + } + _ => {} + } + } +} + +/// Helper to send an event to broadcast, event log, and optional persistent log file. +pub(in crate::agents) fn emit_event( + event: AgentEvent, + tx: &broadcast::Sender, + event_log: &Mutex>, + log_writer: Option<&Mutex>, +) { + if let Ok(mut log) = event_log.lock() { + log.push(event.clone()); + } + if let Some(writer) = log_writer + && let Ok(mut w) = writer.lock() + && let Err(e) = w.write_event(&event) + { + eprintln!("[agent_log] Failed to write event to log file: {e}"); + } + let _ = tx.send(event); +} diff --git a/server/src/agents/pty/mod.rs b/server/src/agents/pty/mod.rs new file mode 100644 index 00000000..2f56477b --- /dev/null +++ b/server/src/agents/pty/mod.rs @@ -0,0 +1,328 @@ +//! PTY runner — spawns agent processes in pseudo-terminals and streams their output. + +mod events; +mod runner; +mod types; + +pub(in crate::agents) use events::emit_event; +pub(in crate::agents) use runner::run_agent_pty_streaming; + +#[cfg(test)] +mod tests { + use super::events::handle_agent_stream_event; + use super::*; + use crate::agents::AgentEvent; + use crate::io::watcher::WatcherEvent; + use std::collections::HashMap; + use std::sync::{Arc, Mutex}; + use tokio::sync::broadcast; + + // ── AC1: pty detects rate_limit_event and emits RateLimitWarning ───────── + + /// Verify that when a `rate_limit_event` JSON line appears in PTY output, + /// `run_agent_pty_streaming` sends a `WatcherEvent::RateLimitWarning` with + /// the correct story_id and agent_name. + /// + /// The command invoked is: `sh -p --