huskies: merge 801
This commit is contained in:
@@ -0,0 +1,80 @@
|
|||||||
|
//! Event emission helpers: routing PTY output lines to the broadcast channel and log.
|
||||||
|
use std::sync::Mutex;
|
||||||
|
|
||||||
|
use tokio::sync::broadcast;
|
||||||
|
|
||||||
|
use crate::agent_log::AgentLogWriter;
|
||||||
|
use crate::agents::AgentEvent;
|
||||||
|
|
||||||
|
/// Dispatch a `stream_event` from Claude Code's `--include-partial-messages` output.
|
||||||
|
///
|
||||||
|
/// Extracts `thinking_delta` and `text_delta` from `content_block_delta` events
|
||||||
|
/// and routes them as `AgentEvent::Thinking` and `AgentEvent::Output` respectively.
|
||||||
|
/// This ensures thinking traces flow through the dedicated `ThinkingBlock` UI
|
||||||
|
/// component rather than appearing as unbounded regular output.
|
||||||
|
pub(super) fn handle_agent_stream_event(
|
||||||
|
event: &serde_json::Value,
|
||||||
|
story_id: &str,
|
||||||
|
agent_name: &str,
|
||||||
|
tx: &broadcast::Sender<AgentEvent>,
|
||||||
|
event_log: &Mutex<Vec<AgentEvent>>,
|
||||||
|
log_writer: Option<&Mutex<AgentLogWriter>>,
|
||||||
|
) {
|
||||||
|
let event_type = event.get("type").and_then(|t| t.as_str()).unwrap_or("");
|
||||||
|
|
||||||
|
if event_type == "content_block_delta"
|
||||||
|
&& let Some(delta) = event.get("delta")
|
||||||
|
{
|
||||||
|
let delta_type = delta.get("type").and_then(|t| t.as_str()).unwrap_or("");
|
||||||
|
match delta_type {
|
||||||
|
"thinking_delta" => {
|
||||||
|
if let Some(thinking) = delta.get("thinking").and_then(|t| t.as_str()) {
|
||||||
|
emit_event(
|
||||||
|
AgentEvent::Thinking {
|
||||||
|
story_id: story_id.to_string(),
|
||||||
|
agent_name: agent_name.to_string(),
|
||||||
|
text: thinking.to_string(),
|
||||||
|
},
|
||||||
|
tx,
|
||||||
|
event_log,
|
||||||
|
log_writer,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
"text_delta" => {
|
||||||
|
if let Some(text) = delta.get("text").and_then(|t| t.as_str()) {
|
||||||
|
emit_event(
|
||||||
|
AgentEvent::Output {
|
||||||
|
story_id: story_id.to_string(),
|
||||||
|
agent_name: agent_name.to_string(),
|
||||||
|
text: text.to_string(),
|
||||||
|
},
|
||||||
|
tx,
|
||||||
|
event_log,
|
||||||
|
log_writer,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Helper to send an event to broadcast, event log, and optional persistent log file.
|
||||||
|
pub(in crate::agents) fn emit_event(
|
||||||
|
event: AgentEvent,
|
||||||
|
tx: &broadcast::Sender<AgentEvent>,
|
||||||
|
event_log: &Mutex<Vec<AgentEvent>>,
|
||||||
|
log_writer: Option<&Mutex<AgentLogWriter>>,
|
||||||
|
) {
|
||||||
|
if let Ok(mut log) = event_log.lock() {
|
||||||
|
log.push(event.clone());
|
||||||
|
}
|
||||||
|
if let Some(writer) = log_writer
|
||||||
|
&& let Ok(mut w) = writer.lock()
|
||||||
|
&& let Err(e) = w.write_event(&event)
|
||||||
|
{
|
||||||
|
eprintln!("[agent_log] Failed to write event to log file: {e}");
|
||||||
|
}
|
||||||
|
let _ = tx.send(event);
|
||||||
|
}
|
||||||
@@ -0,0 +1,328 @@
|
|||||||
|
//! PTY runner — spawns agent processes in pseudo-terminals and streams their output.
|
||||||
|
|
||||||
|
mod events;
|
||||||
|
mod runner;
|
||||||
|
mod types;
|
||||||
|
|
||||||
|
pub(in crate::agents) use events::emit_event;
|
||||||
|
pub(in crate::agents) use runner::run_agent_pty_streaming;
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::events::handle_agent_stream_event;
|
||||||
|
use super::*;
|
||||||
|
use crate::agents::AgentEvent;
|
||||||
|
use crate::io::watcher::WatcherEvent;
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
|
use tokio::sync::broadcast;
|
||||||
|
|
||||||
|
// ── AC1: pty detects rate_limit_event and emits RateLimitWarning ─────────
|
||||||
|
|
||||||
|
/// Verify that when a `rate_limit_event` JSON line appears in PTY output,
|
||||||
|
/// `run_agent_pty_streaming` sends a `WatcherEvent::RateLimitWarning` with
|
||||||
|
/// the correct story_id and agent_name.
|
||||||
|
///
|
||||||
|
/// The command invoked is: `sh -p -- <script>` where `--` terminates
|
||||||
|
/// option parsing so the script path is treated as the operand.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn rate_limit_event_json_sends_watcher_warning() {
|
||||||
|
use std::os::unix::fs::PermissionsExt;
|
||||||
|
|
||||||
|
let tmp = tempfile::tempdir().unwrap();
|
||||||
|
let script = tmp.path().join("emit_rate_limit.sh");
|
||||||
|
std::fs::write(
|
||||||
|
&script,
|
||||||
|
"#!/bin/sh\nprintf '%s\\n' '{\"type\":\"rate_limit_event\",\"rate_limit_info\":{\"status\":\"allowed_warning\"}}'\n",
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
std::fs::set_permissions(&script, std::fs::Permissions::from_mode(0o755)).unwrap();
|
||||||
|
|
||||||
|
let (tx, _rx) = broadcast::channel::<AgentEvent>(64);
|
||||||
|
let (watcher_tx, mut watcher_rx) = broadcast::channel::<WatcherEvent>(16);
|
||||||
|
let event_log = Arc::new(Mutex::new(Vec::new()));
|
||||||
|
let child_killers = Arc::new(Mutex::new(HashMap::new()));
|
||||||
|
|
||||||
|
// sh -p "--" <script>: -p = privileged mode, "--" = end options,
|
||||||
|
// then the script path is the file operand.
|
||||||
|
let result = run_agent_pty_streaming(
|
||||||
|
"365_story_test",
|
||||||
|
"coder-1",
|
||||||
|
"sh",
|
||||||
|
&[script.to_string_lossy().to_string()],
|
||||||
|
"--",
|
||||||
|
"/tmp",
|
||||||
|
&tx,
|
||||||
|
&event_log,
|
||||||
|
None,
|
||||||
|
0,
|
||||||
|
child_killers,
|
||||||
|
watcher_tx,
|
||||||
|
None,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
assert!(result.is_ok(), "PTY run should succeed: {:?}", result.err());
|
||||||
|
|
||||||
|
let evt = watcher_rx
|
||||||
|
.try_recv()
|
||||||
|
.expect("Expected a RateLimitWarning to be sent on watcher_tx");
|
||||||
|
match evt {
|
||||||
|
WatcherEvent::RateLimitWarning {
|
||||||
|
story_id,
|
||||||
|
agent_name,
|
||||||
|
} => {
|
||||||
|
assert_eq!(story_id, "365_story_test");
|
||||||
|
assert_eq!(agent_name, "coder-1");
|
||||||
|
}
|
||||||
|
other => panic!("Expected RateLimitWarning, got: {other:?}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// AC1: hard block with `reset_at` emits `RateLimitHardBlock` with the
|
||||||
|
/// correct story_id, agent_name, and parsed reset_at timestamp.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn rate_limit_hard_block_sends_watcher_hard_block_event() {
|
||||||
|
use std::os::unix::fs::PermissionsExt;
|
||||||
|
|
||||||
|
let tmp = tempfile::tempdir().unwrap();
|
||||||
|
let script = tmp.path().join("emit_hard_block.sh");
|
||||||
|
std::fs::write(
|
||||||
|
&script,
|
||||||
|
"#!/bin/sh\nprintf '%s\\n' '{\"type\":\"rate_limit_event\",\"rate_limit_info\":{\"status\":\"hard_block\",\"reset_at\":\"2099-01-01T12:00:00Z\"}}'\n",
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
std::fs::set_permissions(&script, std::fs::Permissions::from_mode(0o755)).unwrap();
|
||||||
|
|
||||||
|
let (tx, _rx) = broadcast::channel::<AgentEvent>(64);
|
||||||
|
let (watcher_tx, mut watcher_rx) = broadcast::channel::<WatcherEvent>(16);
|
||||||
|
let event_log = Arc::new(Mutex::new(Vec::new()));
|
||||||
|
let child_killers = Arc::new(Mutex::new(HashMap::new()));
|
||||||
|
|
||||||
|
let result = run_agent_pty_streaming(
|
||||||
|
"423_story_rate_limit",
|
||||||
|
"coder-1",
|
||||||
|
"sh",
|
||||||
|
&[script.to_string_lossy().to_string()],
|
||||||
|
"--",
|
||||||
|
"/tmp",
|
||||||
|
&tx,
|
||||||
|
&event_log,
|
||||||
|
None,
|
||||||
|
0,
|
||||||
|
child_killers,
|
||||||
|
watcher_tx,
|
||||||
|
None,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
assert!(result.is_ok(), "PTY run should succeed: {:?}", result.err());
|
||||||
|
|
||||||
|
let evt = watcher_rx
|
||||||
|
.try_recv()
|
||||||
|
.expect("Expected a RateLimitHardBlock to be sent on watcher_tx");
|
||||||
|
match evt {
|
||||||
|
WatcherEvent::RateLimitHardBlock {
|
||||||
|
story_id,
|
||||||
|
agent_name,
|
||||||
|
reset_at,
|
||||||
|
} => {
|
||||||
|
assert_eq!(story_id, "423_story_rate_limit");
|
||||||
|
assert_eq!(agent_name, "coder-1");
|
||||||
|
assert_eq!(
|
||||||
|
reset_at.to_rfc3339(),
|
||||||
|
"2099-01-01T12:00:00+00:00",
|
||||||
|
"reset_at should match the parsed timestamp"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
other => panic!("Expected RateLimitHardBlock, got: {other:?}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Bug 496: hard block WITHOUT `reset_at` must still emit `RateLimitHardBlock`
|
||||||
|
/// (not `RateLimitWarning`), using a default 5-minute backoff so the
|
||||||
|
/// auto-scheduler can set a retry timer.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn rate_limit_hard_block_without_reset_at_sends_hard_block_event() {
|
||||||
|
use std::os::unix::fs::PermissionsExt;
|
||||||
|
|
||||||
|
let tmp = tempfile::tempdir().unwrap();
|
||||||
|
let script = tmp.path().join("emit_hard_block_no_reset.sh");
|
||||||
|
std::fs::write(
|
||||||
|
&script,
|
||||||
|
"#!/bin/sh\nprintf '%s\\n' '{\"type\":\"rate_limit_event\",\"rate_limit_info\":{\"status\":\"rejected\"}}'\n",
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
std::fs::set_permissions(&script, std::fs::Permissions::from_mode(0o755)).unwrap();
|
||||||
|
|
||||||
|
let (tx, _rx) = broadcast::channel::<AgentEvent>(64);
|
||||||
|
let (watcher_tx, mut watcher_rx) = broadcast::channel::<WatcherEvent>(16);
|
||||||
|
let event_log = Arc::new(Mutex::new(Vec::new()));
|
||||||
|
let child_killers = Arc::new(Mutex::new(HashMap::new()));
|
||||||
|
|
||||||
|
let before = chrono::Utc::now();
|
||||||
|
let result = run_agent_pty_streaming(
|
||||||
|
"496_bug_hard_rate_limit",
|
||||||
|
"coder-1",
|
||||||
|
"sh",
|
||||||
|
&[script.to_string_lossy().to_string()],
|
||||||
|
"--",
|
||||||
|
"/tmp",
|
||||||
|
&tx,
|
||||||
|
&event_log,
|
||||||
|
None,
|
||||||
|
0,
|
||||||
|
child_killers,
|
||||||
|
watcher_tx,
|
||||||
|
None,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
let after = chrono::Utc::now();
|
||||||
|
|
||||||
|
assert!(result.is_ok(), "PTY run should succeed: {:?}", result.err());
|
||||||
|
|
||||||
|
let evt = watcher_rx
|
||||||
|
.try_recv()
|
||||||
|
.expect("Expected a RateLimitHardBlock to be sent on watcher_tx");
|
||||||
|
match evt {
|
||||||
|
WatcherEvent::RateLimitHardBlock {
|
||||||
|
story_id,
|
||||||
|
agent_name,
|
||||||
|
reset_at,
|
||||||
|
} => {
|
||||||
|
assert_eq!(story_id, "496_bug_hard_rate_limit");
|
||||||
|
assert_eq!(agent_name, "coder-1");
|
||||||
|
// reset_at should be ~5 minutes from when the event fired
|
||||||
|
let min_expected = before + chrono::Duration::minutes(4);
|
||||||
|
let max_expected = after + chrono::Duration::minutes(6);
|
||||||
|
assert!(
|
||||||
|
reset_at >= min_expected && reset_at <= max_expected,
|
||||||
|
"reset_at {reset_at} should be ~5 minutes from now"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
other => panic!("Expected RateLimitHardBlock (with default backoff), got: {other:?}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_emit_event_writes_to_log_writer() {
|
||||||
|
let tmp = tempfile::tempdir().unwrap();
|
||||||
|
let root = tmp.path();
|
||||||
|
|
||||||
|
let log_writer =
|
||||||
|
crate::agent_log::AgentLogWriter::new(root, "42_story_foo", "coder-1", "sess-emit")
|
||||||
|
.unwrap();
|
||||||
|
let log_mutex = Mutex::new(log_writer);
|
||||||
|
|
||||||
|
let (tx, _rx) = broadcast::channel::<AgentEvent>(64);
|
||||||
|
let event_log: Mutex<Vec<AgentEvent>> = Mutex::new(Vec::new());
|
||||||
|
|
||||||
|
let event = AgentEvent::Status {
|
||||||
|
story_id: "42_story_foo".to_string(),
|
||||||
|
agent_name: "coder-1".to_string(),
|
||||||
|
status: "running".to_string(),
|
||||||
|
};
|
||||||
|
|
||||||
|
emit_event(event, &tx, &event_log, Some(&log_mutex));
|
||||||
|
|
||||||
|
// Verify event was added to in-memory log
|
||||||
|
let mem_events = event_log.lock().unwrap();
|
||||||
|
assert_eq!(mem_events.len(), 1);
|
||||||
|
drop(mem_events);
|
||||||
|
|
||||||
|
// Verify event was written to the log file
|
||||||
|
let log_path =
|
||||||
|
crate::agent_log::log_file_path(root, "42_story_foo", "coder-1", "sess-emit");
|
||||||
|
let entries = crate::agent_log::read_log(&log_path).unwrap();
|
||||||
|
assert_eq!(entries.len(), 1);
|
||||||
|
assert_eq!(entries[0].event["type"], "status");
|
||||||
|
assert_eq!(entries[0].event["status"], "running");
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── bug 167: handle_agent_stream_event routes thinking/text correctly ───
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn stream_event_thinking_delta_emits_thinking_event() {
|
||||||
|
let (tx, mut rx) = broadcast::channel::<AgentEvent>(64);
|
||||||
|
let event_log: Mutex<Vec<AgentEvent>> = Mutex::new(Vec::new());
|
||||||
|
|
||||||
|
let event = serde_json::json!({
|
||||||
|
"type": "content_block_delta",
|
||||||
|
"delta": {"type": "thinking_delta", "thinking": "Let me analyze this..."}
|
||||||
|
});
|
||||||
|
|
||||||
|
handle_agent_stream_event(&event, "s1", "coder-1", &tx, &event_log, None);
|
||||||
|
|
||||||
|
let received = rx.try_recv().unwrap();
|
||||||
|
match received {
|
||||||
|
AgentEvent::Thinking {
|
||||||
|
story_id,
|
||||||
|
agent_name,
|
||||||
|
text,
|
||||||
|
} => {
|
||||||
|
assert_eq!(story_id, "s1");
|
||||||
|
assert_eq!(agent_name, "coder-1");
|
||||||
|
assert_eq!(text, "Let me analyze this...");
|
||||||
|
}
|
||||||
|
other => panic!("Expected Thinking event, got: {other:?}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn stream_event_text_delta_emits_output_event() {
|
||||||
|
let (tx, mut rx) = broadcast::channel::<AgentEvent>(64);
|
||||||
|
let event_log: Mutex<Vec<AgentEvent>> = Mutex::new(Vec::new());
|
||||||
|
|
||||||
|
let event = serde_json::json!({
|
||||||
|
"type": "content_block_delta",
|
||||||
|
"delta": {"type": "text_delta", "text": "Here is the result."}
|
||||||
|
});
|
||||||
|
|
||||||
|
handle_agent_stream_event(&event, "s1", "coder-1", &tx, &event_log, None);
|
||||||
|
|
||||||
|
let received = rx.try_recv().unwrap();
|
||||||
|
match received {
|
||||||
|
AgentEvent::Output {
|
||||||
|
story_id,
|
||||||
|
agent_name,
|
||||||
|
text,
|
||||||
|
} => {
|
||||||
|
assert_eq!(story_id, "s1");
|
||||||
|
assert_eq!(agent_name, "coder-1");
|
||||||
|
assert_eq!(text, "Here is the result.");
|
||||||
|
}
|
||||||
|
other => panic!("Expected Output event, got: {other:?}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn stream_event_input_json_delta_ignored() {
|
||||||
|
let (tx, mut rx) = broadcast::channel::<AgentEvent>(64);
|
||||||
|
let event_log: Mutex<Vec<AgentEvent>> = Mutex::new(Vec::new());
|
||||||
|
|
||||||
|
let event = serde_json::json!({
|
||||||
|
"type": "content_block_delta",
|
||||||
|
"delta": {"type": "input_json_delta", "partial_json": "{\"file\":"}
|
||||||
|
});
|
||||||
|
|
||||||
|
handle_agent_stream_event(&event, "s1", "coder-1", &tx, &event_log, None);
|
||||||
|
|
||||||
|
// No event should be emitted for tool argument deltas
|
||||||
|
assert!(rx.try_recv().is_err());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn stream_event_non_delta_type_ignored() {
|
||||||
|
let (tx, mut rx) = broadcast::channel::<AgentEvent>(64);
|
||||||
|
let event_log: Mutex<Vec<AgentEvent>> = Mutex::new(Vec::new());
|
||||||
|
|
||||||
|
let event = serde_json::json!({
|
||||||
|
"type": "message_start",
|
||||||
|
"message": {"role": "assistant"}
|
||||||
|
});
|
||||||
|
|
||||||
|
handle_agent_stream_event(&event, "s1", "coder-1", &tx, &event_log, None);
|
||||||
|
|
||||||
|
assert!(rx.try_recv().is_err());
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,4 +1,5 @@
|
|||||||
//! PTY runner — spawns agent processes in pseudo-terminals and streams their output.
|
//! PTY process spawning and output loop: builds the command, drives the reader thread,
|
||||||
|
//! and dispatches parsed JSON events to the broadcast channel.
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::io::{BufRead, BufReader};
|
use std::io::{BufRead, BufReader};
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
@@ -6,34 +7,14 @@ use std::sync::{Arc, Mutex};
|
|||||||
use portable_pty::{ChildKiller, CommandBuilder, PtySize, native_pty_system};
|
use portable_pty::{ChildKiller, CommandBuilder, PtySize, native_pty_system};
|
||||||
use tokio::sync::broadcast;
|
use tokio::sync::broadcast;
|
||||||
|
|
||||||
use super::{AgentEvent, TokenUsage};
|
|
||||||
use crate::agent_log::AgentLogWriter;
|
use crate::agent_log::AgentLogWriter;
|
||||||
|
use crate::agents::{AgentEvent, TokenUsage};
|
||||||
use crate::io::watcher::WatcherEvent;
|
use crate::io::watcher::WatcherEvent;
|
||||||
use crate::slog;
|
use crate::slog;
|
||||||
use crate::slog_warn;
|
use crate::slog_warn;
|
||||||
|
|
||||||
/// Result from a PTY agent session, containing the session ID and token usage.
|
use super::events::{emit_event, handle_agent_stream_event};
|
||||||
pub(in crate::agents) struct PtyResult {
|
use super::types::{ChildKillerGuard, PtyResult, composite_key};
|
||||||
pub session_id: Option<String>,
|
|
||||||
pub token_usage: Option<TokenUsage>,
|
|
||||||
}
|
|
||||||
|
|
||||||
fn composite_key(story_id: &str, agent_name: &str) -> String {
|
|
||||||
format!("{story_id}:{agent_name}")
|
|
||||||
}
|
|
||||||
|
|
||||||
struct ChildKillerGuard {
|
|
||||||
killers: Arc<Mutex<HashMap<String, Box<dyn ChildKiller + Send + Sync>>>>,
|
|
||||||
key: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Drop for ChildKillerGuard {
|
|
||||||
fn drop(&mut self) {
|
|
||||||
if let Ok(mut killers) = self.killers.lock() {
|
|
||||||
killers.remove(&self.key);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Spawn claude agent in a PTY and stream events through the broadcast channel.
|
/// Spawn claude agent in a PTY and stream events through the broadcast channel.
|
||||||
///
|
///
|
||||||
@@ -99,79 +80,6 @@ pub(in crate::agents) async fn run_agent_pty_streaming(
|
|||||||
.map_err(|e| format!("Agent task panicked: {e}"))?
|
.map_err(|e| format!("Agent task panicked: {e}"))?
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Dispatch a `stream_event` from Claude Code's `--include-partial-messages` output.
|
|
||||||
///
|
|
||||||
/// Extracts `thinking_delta` and `text_delta` from `content_block_delta` events
|
|
||||||
/// and routes them as `AgentEvent::Thinking` and `AgentEvent::Output` respectively.
|
|
||||||
/// This ensures thinking traces flow through the dedicated `ThinkingBlock` UI
|
|
||||||
/// component rather than appearing as unbounded regular output.
|
|
||||||
fn handle_agent_stream_event(
|
|
||||||
event: &serde_json::Value,
|
|
||||||
story_id: &str,
|
|
||||||
agent_name: &str,
|
|
||||||
tx: &broadcast::Sender<AgentEvent>,
|
|
||||||
event_log: &Mutex<Vec<AgentEvent>>,
|
|
||||||
log_writer: Option<&Mutex<AgentLogWriter>>,
|
|
||||||
) {
|
|
||||||
let event_type = event.get("type").and_then(|t| t.as_str()).unwrap_or("");
|
|
||||||
|
|
||||||
if event_type == "content_block_delta"
|
|
||||||
&& let Some(delta) = event.get("delta")
|
|
||||||
{
|
|
||||||
let delta_type = delta.get("type").and_then(|t| t.as_str()).unwrap_or("");
|
|
||||||
match delta_type {
|
|
||||||
"thinking_delta" => {
|
|
||||||
if let Some(thinking) = delta.get("thinking").and_then(|t| t.as_str()) {
|
|
||||||
emit_event(
|
|
||||||
AgentEvent::Thinking {
|
|
||||||
story_id: story_id.to_string(),
|
|
||||||
agent_name: agent_name.to_string(),
|
|
||||||
text: thinking.to_string(),
|
|
||||||
},
|
|
||||||
tx,
|
|
||||||
event_log,
|
|
||||||
log_writer,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
"text_delta" => {
|
|
||||||
if let Some(text) = delta.get("text").and_then(|t| t.as_str()) {
|
|
||||||
emit_event(
|
|
||||||
AgentEvent::Output {
|
|
||||||
story_id: story_id.to_string(),
|
|
||||||
agent_name: agent_name.to_string(),
|
|
||||||
text: text.to_string(),
|
|
||||||
},
|
|
||||||
tx,
|
|
||||||
event_log,
|
|
||||||
log_writer,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_ => {}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Helper to send an event to broadcast, event log, and optional persistent log file.
|
|
||||||
pub(super) fn emit_event(
|
|
||||||
event: AgentEvent,
|
|
||||||
tx: &broadcast::Sender<AgentEvent>,
|
|
||||||
event_log: &Mutex<Vec<AgentEvent>>,
|
|
||||||
log_writer: Option<&Mutex<AgentLogWriter>>,
|
|
||||||
) {
|
|
||||||
if let Ok(mut log) = event_log.lock() {
|
|
||||||
log.push(event.clone());
|
|
||||||
}
|
|
||||||
if let Some(writer) = log_writer
|
|
||||||
&& let Ok(mut w) = writer.lock()
|
|
||||||
&& let Err(e) = w.write_event(&event)
|
|
||||||
{
|
|
||||||
eprintln!("[agent_log] Failed to write event to log file: {e}");
|
|
||||||
}
|
|
||||||
let _ = tx.send(event);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[allow(clippy::too_many_arguments)]
|
#[allow(clippy::too_many_arguments)]
|
||||||
fn run_agent_pty_blocking(
|
fn run_agent_pty_blocking(
|
||||||
story_id: &str,
|
story_id: &str,
|
||||||
@@ -527,319 +435,3 @@ fn run_agent_pty_blocking(
|
|||||||
token_usage,
|
token_usage,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use super::*;
|
|
||||||
use crate::agents::AgentEvent;
|
|
||||||
use crate::io::watcher::WatcherEvent;
|
|
||||||
use std::collections::HashMap;
|
|
||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
// ── AC1: pty detects rate_limit_event and emits RateLimitWarning ─────────
|
|
||||||
|
|
||||||
/// Verify that when a `rate_limit_event` JSON line appears in PTY output,
|
|
||||||
/// `run_agent_pty_streaming` sends a `WatcherEvent::RateLimitWarning` with
|
|
||||||
/// the correct story_id and agent_name.
|
|
||||||
///
|
|
||||||
/// The command invoked is: `sh -p -- <script>` where `--` terminates
|
|
||||||
/// option parsing so the script path is treated as the operand.
|
|
||||||
#[tokio::test]
|
|
||||||
async fn rate_limit_event_json_sends_watcher_warning() {
|
|
||||||
use std::os::unix::fs::PermissionsExt;
|
|
||||||
|
|
||||||
let tmp = tempfile::tempdir().unwrap();
|
|
||||||
let script = tmp.path().join("emit_rate_limit.sh");
|
|
||||||
std::fs::write(
|
|
||||||
&script,
|
|
||||||
"#!/bin/sh\nprintf '%s\\n' '{\"type\":\"rate_limit_event\",\"rate_limit_info\":{\"status\":\"allowed_warning\"}}'\n",
|
|
||||||
)
|
|
||||||
.unwrap();
|
|
||||||
std::fs::set_permissions(&script, std::fs::Permissions::from_mode(0o755)).unwrap();
|
|
||||||
|
|
||||||
let (tx, _rx) = broadcast::channel::<AgentEvent>(64);
|
|
||||||
let (watcher_tx, mut watcher_rx) = broadcast::channel::<WatcherEvent>(16);
|
|
||||||
let event_log = Arc::new(Mutex::new(Vec::new()));
|
|
||||||
let child_killers = Arc::new(Mutex::new(HashMap::new()));
|
|
||||||
|
|
||||||
// sh -p "--" <script>: -p = privileged mode, "--" = end options,
|
|
||||||
// then the script path is the file operand.
|
|
||||||
let result = run_agent_pty_streaming(
|
|
||||||
"365_story_test",
|
|
||||||
"coder-1",
|
|
||||||
"sh",
|
|
||||||
&[script.to_string_lossy().to_string()],
|
|
||||||
"--",
|
|
||||||
"/tmp",
|
|
||||||
&tx,
|
|
||||||
&event_log,
|
|
||||||
None,
|
|
||||||
0,
|
|
||||||
child_killers,
|
|
||||||
watcher_tx,
|
|
||||||
None,
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
|
|
||||||
assert!(result.is_ok(), "PTY run should succeed: {:?}", result.err());
|
|
||||||
|
|
||||||
let evt = watcher_rx
|
|
||||||
.try_recv()
|
|
||||||
.expect("Expected a RateLimitWarning to be sent on watcher_tx");
|
|
||||||
match evt {
|
|
||||||
WatcherEvent::RateLimitWarning {
|
|
||||||
story_id,
|
|
||||||
agent_name,
|
|
||||||
} => {
|
|
||||||
assert_eq!(story_id, "365_story_test");
|
|
||||||
assert_eq!(agent_name, "coder-1");
|
|
||||||
}
|
|
||||||
other => panic!("Expected RateLimitWarning, got: {other:?}"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// AC1: hard block with `reset_at` emits `RateLimitHardBlock` with the
|
|
||||||
/// correct story_id, agent_name, and parsed reset_at timestamp.
|
|
||||||
#[tokio::test]
|
|
||||||
async fn rate_limit_hard_block_sends_watcher_hard_block_event() {
|
|
||||||
use std::os::unix::fs::PermissionsExt;
|
|
||||||
|
|
||||||
let tmp = tempfile::tempdir().unwrap();
|
|
||||||
let script = tmp.path().join("emit_hard_block.sh");
|
|
||||||
std::fs::write(
|
|
||||||
&script,
|
|
||||||
"#!/bin/sh\nprintf '%s\\n' '{\"type\":\"rate_limit_event\",\"rate_limit_info\":{\"status\":\"hard_block\",\"reset_at\":\"2099-01-01T12:00:00Z\"}}'\n",
|
|
||||||
)
|
|
||||||
.unwrap();
|
|
||||||
std::fs::set_permissions(&script, std::fs::Permissions::from_mode(0o755)).unwrap();
|
|
||||||
|
|
||||||
let (tx, _rx) = broadcast::channel::<AgentEvent>(64);
|
|
||||||
let (watcher_tx, mut watcher_rx) = broadcast::channel::<WatcherEvent>(16);
|
|
||||||
let event_log = Arc::new(Mutex::new(Vec::new()));
|
|
||||||
let child_killers = Arc::new(Mutex::new(HashMap::new()));
|
|
||||||
|
|
||||||
let result = run_agent_pty_streaming(
|
|
||||||
"423_story_rate_limit",
|
|
||||||
"coder-1",
|
|
||||||
"sh",
|
|
||||||
&[script.to_string_lossy().to_string()],
|
|
||||||
"--",
|
|
||||||
"/tmp",
|
|
||||||
&tx,
|
|
||||||
&event_log,
|
|
||||||
None,
|
|
||||||
0,
|
|
||||||
child_killers,
|
|
||||||
watcher_tx,
|
|
||||||
None,
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
|
|
||||||
assert!(result.is_ok(), "PTY run should succeed: {:?}", result.err());
|
|
||||||
|
|
||||||
let evt = watcher_rx
|
|
||||||
.try_recv()
|
|
||||||
.expect("Expected a RateLimitHardBlock to be sent on watcher_tx");
|
|
||||||
match evt {
|
|
||||||
WatcherEvent::RateLimitHardBlock {
|
|
||||||
story_id,
|
|
||||||
agent_name,
|
|
||||||
reset_at,
|
|
||||||
} => {
|
|
||||||
assert_eq!(story_id, "423_story_rate_limit");
|
|
||||||
assert_eq!(agent_name, "coder-1");
|
|
||||||
assert_eq!(
|
|
||||||
reset_at.to_rfc3339(),
|
|
||||||
"2099-01-01T12:00:00+00:00",
|
|
||||||
"reset_at should match the parsed timestamp"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
other => panic!("Expected RateLimitHardBlock, got: {other:?}"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Bug 496: hard block WITHOUT `reset_at` must still emit `RateLimitHardBlock`
|
|
||||||
/// (not `RateLimitWarning`), using a default 5-minute backoff so the
|
|
||||||
/// auto-scheduler can set a retry timer.
|
|
||||||
#[tokio::test]
|
|
||||||
async fn rate_limit_hard_block_without_reset_at_sends_hard_block_event() {
|
|
||||||
use std::os::unix::fs::PermissionsExt;
|
|
||||||
|
|
||||||
let tmp = tempfile::tempdir().unwrap();
|
|
||||||
let script = tmp.path().join("emit_hard_block_no_reset.sh");
|
|
||||||
std::fs::write(
|
|
||||||
&script,
|
|
||||||
"#!/bin/sh\nprintf '%s\\n' '{\"type\":\"rate_limit_event\",\"rate_limit_info\":{\"status\":\"rejected\"}}'\n",
|
|
||||||
)
|
|
||||||
.unwrap();
|
|
||||||
std::fs::set_permissions(&script, std::fs::Permissions::from_mode(0o755)).unwrap();
|
|
||||||
|
|
||||||
let (tx, _rx) = broadcast::channel::<AgentEvent>(64);
|
|
||||||
let (watcher_tx, mut watcher_rx) = broadcast::channel::<WatcherEvent>(16);
|
|
||||||
let event_log = Arc::new(Mutex::new(Vec::new()));
|
|
||||||
let child_killers = Arc::new(Mutex::new(HashMap::new()));
|
|
||||||
|
|
||||||
let before = chrono::Utc::now();
|
|
||||||
let result = run_agent_pty_streaming(
|
|
||||||
"496_bug_hard_rate_limit",
|
|
||||||
"coder-1",
|
|
||||||
"sh",
|
|
||||||
&[script.to_string_lossy().to_string()],
|
|
||||||
"--",
|
|
||||||
"/tmp",
|
|
||||||
&tx,
|
|
||||||
&event_log,
|
|
||||||
None,
|
|
||||||
0,
|
|
||||||
child_killers,
|
|
||||||
watcher_tx,
|
|
||||||
None,
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
let after = chrono::Utc::now();
|
|
||||||
|
|
||||||
assert!(result.is_ok(), "PTY run should succeed: {:?}", result.err());
|
|
||||||
|
|
||||||
let evt = watcher_rx
|
|
||||||
.try_recv()
|
|
||||||
.expect("Expected a RateLimitHardBlock to be sent on watcher_tx");
|
|
||||||
match evt {
|
|
||||||
WatcherEvent::RateLimitHardBlock {
|
|
||||||
story_id,
|
|
||||||
agent_name,
|
|
||||||
reset_at,
|
|
||||||
} => {
|
|
||||||
assert_eq!(story_id, "496_bug_hard_rate_limit");
|
|
||||||
assert_eq!(agent_name, "coder-1");
|
|
||||||
// reset_at should be ~5 minutes from when the event fired
|
|
||||||
let min_expected = before + chrono::Duration::minutes(4);
|
|
||||||
let max_expected = after + chrono::Duration::minutes(6);
|
|
||||||
assert!(
|
|
||||||
reset_at >= min_expected && reset_at <= max_expected,
|
|
||||||
"reset_at {reset_at} should be ~5 minutes from now"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
other => panic!("Expected RateLimitHardBlock (with default backoff), got: {other:?}"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_emit_event_writes_to_log_writer() {
|
|
||||||
let tmp = tempfile::tempdir().unwrap();
|
|
||||||
let root = tmp.path();
|
|
||||||
|
|
||||||
let log_writer = AgentLogWriter::new(root, "42_story_foo", "coder-1", "sess-emit").unwrap();
|
|
||||||
let log_mutex = Mutex::new(log_writer);
|
|
||||||
|
|
||||||
let (tx, _rx) = broadcast::channel::<AgentEvent>(64);
|
|
||||||
let event_log: Mutex<Vec<AgentEvent>> = Mutex::new(Vec::new());
|
|
||||||
|
|
||||||
let event = AgentEvent::Status {
|
|
||||||
story_id: "42_story_foo".to_string(),
|
|
||||||
agent_name: "coder-1".to_string(),
|
|
||||||
status: "running".to_string(),
|
|
||||||
};
|
|
||||||
|
|
||||||
emit_event(event, &tx, &event_log, Some(&log_mutex));
|
|
||||||
|
|
||||||
// Verify event was added to in-memory log
|
|
||||||
let mem_events = event_log.lock().unwrap();
|
|
||||||
assert_eq!(mem_events.len(), 1);
|
|
||||||
drop(mem_events);
|
|
||||||
|
|
||||||
// Verify event was written to the log file
|
|
||||||
let log_path =
|
|
||||||
crate::agent_log::log_file_path(root, "42_story_foo", "coder-1", "sess-emit");
|
|
||||||
let entries = crate::agent_log::read_log(&log_path).unwrap();
|
|
||||||
assert_eq!(entries.len(), 1);
|
|
||||||
assert_eq!(entries[0].event["type"], "status");
|
|
||||||
assert_eq!(entries[0].event["status"], "running");
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── bug 167: handle_agent_stream_event routes thinking/text correctly ───
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn stream_event_thinking_delta_emits_thinking_event() {
|
|
||||||
let (tx, mut rx) = broadcast::channel::<AgentEvent>(64);
|
|
||||||
let event_log: Mutex<Vec<AgentEvent>> = Mutex::new(Vec::new());
|
|
||||||
|
|
||||||
let event = serde_json::json!({
|
|
||||||
"type": "content_block_delta",
|
|
||||||
"delta": {"type": "thinking_delta", "thinking": "Let me analyze this..."}
|
|
||||||
});
|
|
||||||
|
|
||||||
handle_agent_stream_event(&event, "s1", "coder-1", &tx, &event_log, None);
|
|
||||||
|
|
||||||
let received = rx.try_recv().unwrap();
|
|
||||||
match received {
|
|
||||||
AgentEvent::Thinking {
|
|
||||||
story_id,
|
|
||||||
agent_name,
|
|
||||||
text,
|
|
||||||
} => {
|
|
||||||
assert_eq!(story_id, "s1");
|
|
||||||
assert_eq!(agent_name, "coder-1");
|
|
||||||
assert_eq!(text, "Let me analyze this...");
|
|
||||||
}
|
|
||||||
other => panic!("Expected Thinking event, got: {other:?}"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn stream_event_text_delta_emits_output_event() {
|
|
||||||
let (tx, mut rx) = broadcast::channel::<AgentEvent>(64);
|
|
||||||
let event_log: Mutex<Vec<AgentEvent>> = Mutex::new(Vec::new());
|
|
||||||
|
|
||||||
let event = serde_json::json!({
|
|
||||||
"type": "content_block_delta",
|
|
||||||
"delta": {"type": "text_delta", "text": "Here is the result."}
|
|
||||||
});
|
|
||||||
|
|
||||||
handle_agent_stream_event(&event, "s1", "coder-1", &tx, &event_log, None);
|
|
||||||
|
|
||||||
let received = rx.try_recv().unwrap();
|
|
||||||
match received {
|
|
||||||
AgentEvent::Output {
|
|
||||||
story_id,
|
|
||||||
agent_name,
|
|
||||||
text,
|
|
||||||
} => {
|
|
||||||
assert_eq!(story_id, "s1");
|
|
||||||
assert_eq!(agent_name, "coder-1");
|
|
||||||
assert_eq!(text, "Here is the result.");
|
|
||||||
}
|
|
||||||
other => panic!("Expected Output event, got: {other:?}"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn stream_event_input_json_delta_ignored() {
|
|
||||||
let (tx, mut rx) = broadcast::channel::<AgentEvent>(64);
|
|
||||||
let event_log: Mutex<Vec<AgentEvent>> = Mutex::new(Vec::new());
|
|
||||||
|
|
||||||
let event = serde_json::json!({
|
|
||||||
"type": "content_block_delta",
|
|
||||||
"delta": {"type": "input_json_delta", "partial_json": "{\"file\":"}
|
|
||||||
});
|
|
||||||
|
|
||||||
handle_agent_stream_event(&event, "s1", "coder-1", &tx, &event_log, None);
|
|
||||||
|
|
||||||
// No event should be emitted for tool argument deltas
|
|
||||||
assert!(rx.try_recv().is_err());
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn stream_event_non_delta_type_ignored() {
|
|
||||||
let (tx, mut rx) = broadcast::channel::<AgentEvent>(64);
|
|
||||||
let event_log: Mutex<Vec<AgentEvent>> = Mutex::new(Vec::new());
|
|
||||||
|
|
||||||
let event = serde_json::json!({
|
|
||||||
"type": "message_start",
|
|
||||||
"message": {"role": "assistant"}
|
|
||||||
});
|
|
||||||
|
|
||||||
handle_agent_stream_event(&event, "s1", "coder-1", &tx, &event_log, None);
|
|
||||||
|
|
||||||
assert!(rx.try_recv().is_err());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -0,0 +1,30 @@
|
|||||||
|
//! Core types for the PTY runner: result container and process lifecycle helpers.
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
|
|
||||||
|
use portable_pty::ChildKiller;
|
||||||
|
|
||||||
|
use crate::agents::TokenUsage;
|
||||||
|
|
||||||
|
/// Result from a PTY agent session, containing the session ID and token usage.
|
||||||
|
pub(in crate::agents) struct PtyResult {
|
||||||
|
pub session_id: Option<String>,
|
||||||
|
pub token_usage: Option<TokenUsage>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) fn composite_key(story_id: &str, agent_name: &str) -> String {
|
||||||
|
format!("{story_id}:{agent_name}")
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) struct ChildKillerGuard {
|
||||||
|
pub killers: Arc<Mutex<HashMap<String, Box<dyn ChildKiller + Send + Sync>>>>,
|
||||||
|
pub key: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for ChildKillerGuard {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
if let Ok(mut killers) = self.killers.lock() {
|
||||||
|
killers.remove(&self.key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user