f16545ec36
The reader thread spawned in run_agent_pty_blocking was never joined, leaving a cloned PTY master fd open after the agent exited. When the pipeline restarted the agent on the same worktree, the stale fd from the previous session interfered with the new PTY allocation, causing Claude Code's bundled ripgrep to crash with: fatal runtime error: assertion failed: output.write(&bytes).is_ok() Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
699 lines
24 KiB
Rust
699 lines
24 KiB
Rust
use std::collections::HashMap;
|
|
use std::io::{BufRead, BufReader};
|
|
use std::sync::{Arc, Mutex};
|
|
|
|
use portable_pty::{ChildKiller, CommandBuilder, PtySize, native_pty_system};
|
|
use tokio::sync::broadcast;
|
|
|
|
use super::{AgentEvent, TokenUsage};
|
|
use crate::agent_log::AgentLogWriter;
|
|
use crate::io::watcher::WatcherEvent;
|
|
use crate::slog;
|
|
use crate::slog_warn;
|
|
|
|
/// Result from a PTY agent session, containing the session ID and token usage.
|
|
pub(in crate::agents) struct PtyResult {
|
|
pub session_id: Option<String>,
|
|
pub token_usage: Option<TokenUsage>,
|
|
}
|
|
|
|
/// Build the key identifying one agent run in the child-killer registry,
/// in the form `<story_id>:<agent_name>`.
fn composite_key(story_id: &str, agent_name: &str) -> String {
    [story_id, agent_name].join(":")
}
|
|
|
|
struct ChildKillerGuard {
|
|
killers: Arc<Mutex<HashMap<String, Box<dyn ChildKiller + Send + Sync>>>>,
|
|
key: String,
|
|
}
|
|
|
|
impl Drop for ChildKillerGuard {
|
|
fn drop(&mut self) {
|
|
if let Ok(mut killers) = self.killers.lock() {
|
|
killers.remove(&self.key);
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Spawn claude agent in a PTY and stream events through the broadcast channel.
|
|
#[allow(clippy::too_many_arguments)]
|
|
pub(in crate::agents) async fn run_agent_pty_streaming(
|
|
story_id: &str,
|
|
agent_name: &str,
|
|
command: &str,
|
|
args: &[String],
|
|
prompt: &str,
|
|
cwd: &str,
|
|
tx: &broadcast::Sender<AgentEvent>,
|
|
event_log: &Arc<Mutex<Vec<AgentEvent>>>,
|
|
log_writer: Option<Arc<Mutex<AgentLogWriter>>>,
|
|
inactivity_timeout_secs: u64,
|
|
child_killers: Arc<Mutex<HashMap<String, Box<dyn ChildKiller + Send + Sync>>>>,
|
|
watcher_tx: broadcast::Sender<WatcherEvent>,
|
|
) -> Result<PtyResult, String> {
|
|
let sid = story_id.to_string();
|
|
let aname = agent_name.to_string();
|
|
let cmd = command.to_string();
|
|
let args = args.to_vec();
|
|
let prompt = prompt.to_string();
|
|
let cwd = cwd.to_string();
|
|
let tx = tx.clone();
|
|
let event_log = event_log.clone();
|
|
|
|
tokio::task::spawn_blocking(move || {
|
|
run_agent_pty_blocking(
|
|
&sid,
|
|
&aname,
|
|
&cmd,
|
|
&args,
|
|
&prompt,
|
|
&cwd,
|
|
&tx,
|
|
&event_log,
|
|
log_writer.as_deref(),
|
|
inactivity_timeout_secs,
|
|
&child_killers,
|
|
&watcher_tx,
|
|
)
|
|
})
|
|
.await
|
|
.map_err(|e| format!("Agent task panicked: {e}"))?
|
|
}
|
|
|
|
/// Dispatch a `stream_event` from Claude Code's `--include-partial-messages` output.
|
|
///
|
|
/// Extracts `thinking_delta` and `text_delta` from `content_block_delta` events
|
|
/// and routes them as `AgentEvent::Thinking` and `AgentEvent::Output` respectively.
|
|
/// This ensures thinking traces flow through the dedicated `ThinkingBlock` UI
|
|
/// component rather than appearing as unbounded regular output.
|
|
fn handle_agent_stream_event(
|
|
event: &serde_json::Value,
|
|
story_id: &str,
|
|
agent_name: &str,
|
|
tx: &broadcast::Sender<AgentEvent>,
|
|
event_log: &Mutex<Vec<AgentEvent>>,
|
|
log_writer: Option<&Mutex<AgentLogWriter>>,
|
|
) {
|
|
let event_type = event.get("type").and_then(|t| t.as_str()).unwrap_or("");
|
|
|
|
if event_type == "content_block_delta"
|
|
&& let Some(delta) = event.get("delta")
|
|
{
|
|
let delta_type = delta.get("type").and_then(|t| t.as_str()).unwrap_or("");
|
|
match delta_type {
|
|
"thinking_delta" => {
|
|
if let Some(thinking) = delta.get("thinking").and_then(|t| t.as_str()) {
|
|
emit_event(
|
|
AgentEvent::Thinking {
|
|
story_id: story_id.to_string(),
|
|
agent_name: agent_name.to_string(),
|
|
text: thinking.to_string(),
|
|
},
|
|
tx,
|
|
event_log,
|
|
log_writer,
|
|
);
|
|
}
|
|
}
|
|
"text_delta" => {
|
|
if let Some(text) = delta.get("text").and_then(|t| t.as_str()) {
|
|
emit_event(
|
|
AgentEvent::Output {
|
|
story_id: story_id.to_string(),
|
|
agent_name: agent_name.to_string(),
|
|
text: text.to_string(),
|
|
},
|
|
tx,
|
|
event_log,
|
|
log_writer,
|
|
);
|
|
}
|
|
}
|
|
_ => {}
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Helper to send an event to broadcast, event log, and optional persistent log file.
|
|
pub(super) fn emit_event(
|
|
event: AgentEvent,
|
|
tx: &broadcast::Sender<AgentEvent>,
|
|
event_log: &Mutex<Vec<AgentEvent>>,
|
|
log_writer: Option<&Mutex<AgentLogWriter>>,
|
|
) {
|
|
if let Ok(mut log) = event_log.lock() {
|
|
log.push(event.clone());
|
|
}
|
|
if let Some(writer) = log_writer
|
|
&& let Ok(mut w) = writer.lock()
|
|
&& let Err(e) = w.write_event(&event)
|
|
{
|
|
eprintln!("[agent_log] Failed to write event to log file: {e}");
|
|
}
|
|
let _ = tx.send(event);
|
|
}
|
|
|
|
#[allow(clippy::too_many_arguments)]
|
|
fn run_agent_pty_blocking(
|
|
story_id: &str,
|
|
agent_name: &str,
|
|
command: &str,
|
|
args: &[String],
|
|
prompt: &str,
|
|
cwd: &str,
|
|
tx: &broadcast::Sender<AgentEvent>,
|
|
event_log: &Mutex<Vec<AgentEvent>>,
|
|
log_writer: Option<&Mutex<AgentLogWriter>>,
|
|
inactivity_timeout_secs: u64,
|
|
child_killers: &Arc<Mutex<HashMap<String, Box<dyn ChildKiller + Send + Sync>>>>,
|
|
watcher_tx: &broadcast::Sender<WatcherEvent>,
|
|
) -> Result<PtyResult, String> {
|
|
let pty_system = native_pty_system();
|
|
|
|
let pair = pty_system
|
|
.openpty(PtySize {
|
|
rows: 50,
|
|
cols: 200,
|
|
pixel_width: 0,
|
|
pixel_height: 0,
|
|
})
|
|
.map_err(|e| format!("Failed to open PTY: {e}"))?;
|
|
|
|
let mut cmd = CommandBuilder::new(command);
|
|
|
|
// -p <prompt> must come first
|
|
cmd.arg("-p");
|
|
cmd.arg(prompt);
|
|
|
|
// Add configured args (e.g., --directory /path/to/worktree, --model, etc.)
|
|
for arg in args {
|
|
cmd.arg(arg);
|
|
}
|
|
|
|
cmd.arg("--output-format");
|
|
cmd.arg("stream-json");
|
|
cmd.arg("--verbose");
|
|
// Enable partial streaming so we receive thinking_delta and text_delta
|
|
// events in real-time, rather than only complete assistant events.
|
|
// Without this, thinking traces may not appear in the structured output
|
|
// and instead leak as unstructured PTY text.
|
|
cmd.arg("--include-partial-messages");
|
|
|
|
// Supervised agents don't need interactive permission prompts
|
|
cmd.arg("--permission-mode");
|
|
cmd.arg("bypassPermissions");
|
|
|
|
cmd.cwd(cwd);
|
|
cmd.env("NO_COLOR", "1");
|
|
|
|
// Allow spawning Claude Code from within a Claude Code session
|
|
cmd.env_remove("CLAUDECODE");
|
|
cmd.env_remove("CLAUDE_CODE_ENTRYPOINT");
|
|
|
|
slog!("[agent:{story_id}:{agent_name}] Spawning {command} in {cwd} with args: {args:?}");
|
|
|
|
let mut child = pair
|
|
.slave
|
|
.spawn_command(cmd)
|
|
.map_err(|e| format!("Failed to spawn agent for {story_id}:{agent_name}: {e}"))?;
|
|
|
|
// Register the child killer so that kill_all_children() / stop_agent() can
|
|
// terminate this process on server shutdown, even if the blocking thread
|
|
// cannot be interrupted. The ChildKillerGuard deregisters on function exit.
|
|
let killer_key = composite_key(story_id, agent_name);
|
|
{
|
|
let killer = child.clone_killer();
|
|
if let Ok(mut killers) = child_killers.lock() {
|
|
killers.insert(killer_key.clone(), killer);
|
|
}
|
|
}
|
|
let _killer_guard = ChildKillerGuard {
|
|
killers: Arc::clone(child_killers),
|
|
key: killer_key,
|
|
};
|
|
|
|
drop(pair.slave);
|
|
|
|
let reader = pair
|
|
.master
|
|
.try_clone_reader()
|
|
.map_err(|e| format!("Failed to clone PTY reader: {e}"))?;
|
|
|
|
drop(pair.master);
|
|
|
|
// Spawn a reader thread to collect PTY output lines.
|
|
// We use a channel so the main thread can apply an inactivity deadline
|
|
// via recv_timeout: if no output arrives within the configured window
|
|
// the process is killed and the agent is marked Failed.
|
|
let (line_tx, line_rx) = std::sync::mpsc::channel::<std::io::Result<String>>();
|
|
let sid_for_reader = story_id.to_string();
|
|
let aname_for_reader = agent_name.to_string();
|
|
let reader_handle = std::thread::spawn(move || {
|
|
let buf_reader = BufReader::new(reader);
|
|
for line in buf_reader.lines() {
|
|
if line_tx.send(line).is_err() {
|
|
break;
|
|
}
|
|
}
|
|
slog!("[agent:{sid_for_reader}:{aname_for_reader}] Reader thread exiting");
|
|
});
|
|
|
|
let timeout_dur = if inactivity_timeout_secs > 0 {
|
|
Some(std::time::Duration::from_secs(inactivity_timeout_secs))
|
|
} else {
|
|
None
|
|
};
|
|
|
|
let mut session_id: Option<String> = None;
|
|
let mut token_usage: Option<TokenUsage> = None;
|
|
|
|
loop {
|
|
let recv_result = match timeout_dur {
|
|
Some(dur) => line_rx.recv_timeout(dur),
|
|
None => line_rx
|
|
.recv()
|
|
.map_err(|_| std::sync::mpsc::RecvTimeoutError::Disconnected),
|
|
};
|
|
|
|
let line = match recv_result {
|
|
Ok(Ok(l)) => l,
|
|
Ok(Err(_)) => {
|
|
// IO error reading from PTY — treat as EOF.
|
|
break;
|
|
}
|
|
Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
|
|
// Reader thread exited (EOF from PTY).
|
|
break;
|
|
}
|
|
Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
|
|
slog_warn!(
|
|
"[agent:{story_id}:{agent_name}] Inactivity timeout after \
|
|
{inactivity_timeout_secs}s with no output. Killing process."
|
|
);
|
|
let _ = child.kill();
|
|
let _ = child.wait();
|
|
return Err(format!(
|
|
"Agent inactivity timeout: no output received for {inactivity_timeout_secs}s"
|
|
));
|
|
}
|
|
};
|
|
|
|
let trimmed = line.trim();
|
|
if trimmed.is_empty() {
|
|
continue;
|
|
}
|
|
|
|
// Try to parse as JSON
|
|
let json: serde_json::Value = match serde_json::from_str(trimmed) {
|
|
Ok(j) => j,
|
|
Err(_) => {
|
|
// Non-JSON output (terminal escapes etc.) — send as raw output
|
|
emit_event(
|
|
AgentEvent::Output {
|
|
story_id: story_id.to_string(),
|
|
agent_name: agent_name.to_string(),
|
|
text: trimmed.to_string(),
|
|
},
|
|
tx,
|
|
event_log,
|
|
log_writer,
|
|
);
|
|
continue;
|
|
}
|
|
};
|
|
|
|
let event_type = json.get("type").and_then(|t| t.as_str()).unwrap_or("");
|
|
|
|
match event_type {
|
|
"system" => {
|
|
session_id = json
|
|
.get("session_id")
|
|
.and_then(|s| s.as_str())
|
|
.map(|s| s.to_string());
|
|
}
|
|
// With --include-partial-messages, thinking and text arrive
|
|
// incrementally via stream_event → content_block_delta. Handle
|
|
// them here for real-time streaming to the frontend.
|
|
"stream_event" => {
|
|
if let Some(event) = json.get("event") {
|
|
handle_agent_stream_event(
|
|
event,
|
|
story_id,
|
|
agent_name,
|
|
tx,
|
|
event_log,
|
|
log_writer,
|
|
);
|
|
}
|
|
}
|
|
// Complete assistant events are skipped for content extraction
|
|
// because thinking and text already arrived via stream_event.
|
|
// The raw JSON is still forwarded as AgentJson below.
|
|
"assistant" | "user" => {}
|
|
"rate_limit_event" => {
|
|
let rate_limit_info = json.get("rate_limit_info");
|
|
let status = rate_limit_info
|
|
.and_then(|i| i.get("status"))
|
|
.and_then(|s| s.as_str())
|
|
.unwrap_or("");
|
|
let is_hard_block = !status.is_empty() && status != "allowed_warning";
|
|
let reset_at = rate_limit_info
|
|
.and_then(|i| i.get("reset_at"))
|
|
.and_then(|r| r.as_str())
|
|
.and_then(|r| chrono::DateTime::parse_from_rfc3339(r).ok())
|
|
.map(|dt| dt.with_timezone(&chrono::Utc));
|
|
|
|
if is_hard_block {
|
|
if let Some(reset_at) = reset_at {
|
|
slog!(
|
|
"[agent:{story_id}:{agent_name}] API rate limit hard block \
|
|
(status={status}); resets at {reset_at}"
|
|
);
|
|
let _ = watcher_tx.send(WatcherEvent::RateLimitHardBlock {
|
|
story_id: story_id.to_string(),
|
|
agent_name: agent_name.to_string(),
|
|
reset_at,
|
|
});
|
|
} else {
|
|
slog!(
|
|
"[agent:{story_id}:{agent_name}] API rate limit hard block \
|
|
(status={status}); no reset_at in rate_limit_info"
|
|
);
|
|
let _ = watcher_tx.send(WatcherEvent::RateLimitWarning {
|
|
story_id: story_id.to_string(),
|
|
agent_name: agent_name.to_string(),
|
|
});
|
|
}
|
|
} else {
|
|
slog!(
|
|
"[agent:{story_id}:{agent_name}] API rate limit warning received \
|
|
(status={status})"
|
|
);
|
|
let _ = watcher_tx.send(WatcherEvent::RateLimitWarning {
|
|
story_id: story_id.to_string(),
|
|
agent_name: agent_name.to_string(),
|
|
});
|
|
}
|
|
}
|
|
"result" => {
|
|
// Extract token usage from the result event.
|
|
if let Some(usage) = TokenUsage::from_result_event(&json) {
|
|
slog!(
|
|
"[agent:{story_id}:{agent_name}] Token usage: in={} out={} cache_create={} cache_read={} cost=${:.4}",
|
|
usage.input_tokens,
|
|
usage.output_tokens,
|
|
usage.cache_creation_input_tokens,
|
|
usage.cache_read_input_tokens,
|
|
usage.total_cost_usd,
|
|
);
|
|
token_usage = Some(usage);
|
|
}
|
|
}
|
|
_ => {}
|
|
}
|
|
|
|
// Forward all JSON events
|
|
emit_event(
|
|
AgentEvent::AgentJson {
|
|
story_id: story_id.to_string(),
|
|
agent_name: agent_name.to_string(),
|
|
data: json,
|
|
},
|
|
tx,
|
|
event_log,
|
|
log_writer,
|
|
);
|
|
}
|
|
|
|
let _ = child.kill();
|
|
let _ = child.wait();
|
|
|
|
// Wait for the reader thread to finish so it releases the cloned PTY
|
|
// master fd before we return. Without this, the next PTY spawn for the
|
|
// same story can collide with a still-open fd from this session (#453).
|
|
slog!("[agent:{story_id}:{agent_name}] Waiting for reader thread to exit");
|
|
if let Err(e) = reader_handle.join() {
|
|
slog!("[agent:{story_id}:{agent_name}] Reader thread panicked: {e:?}");
|
|
}
|
|
slog!("[agent:{story_id}:{agent_name}] Reader thread joined");
|
|
|
|
slog!(
|
|
"[agent:{story_id}:{agent_name}] Done. Session: {:?}",
|
|
session_id
|
|
);
|
|
|
|
Ok(PtyResult {
|
|
session_id,
|
|
token_usage,
|
|
})
|
|
}
|
|
|
|
#[cfg(test)]
mod tests {
    use super::*;
    use crate::agents::AgentEvent;
    use crate::io::watcher::WatcherEvent;
    use std::collections::HashMap;
    use std::sync::Arc;

    /// Write `script_body` to an executable file in a fresh temp dir, run it
    /// through `run_agent_pty_streaming`, assert the run succeeded, and hand
    /// back the watcher receiver for inspection.
    ///
    /// The command invoked is `sh -p -- <script>`: `-p` = privileged mode,
    /// `--` = end of options, then the script path is the file operand.
    async fn run_script_through_pty(
        story_id: &str,
        script_name: &str,
        script_body: &str,
    ) -> broadcast::Receiver<WatcherEvent> {
        use std::os::unix::fs::PermissionsExt;

        let tmp = tempfile::tempdir().unwrap();
        let script = tmp.path().join(script_name);
        std::fs::write(&script, script_body).unwrap();
        std::fs::set_permissions(&script, std::fs::Permissions::from_mode(0o755)).unwrap();

        let (tx, _rx) = broadcast::channel::<AgentEvent>(64);
        let (watcher_tx, watcher_rx) = broadcast::channel::<WatcherEvent>(16);
        let event_log = Arc::new(Mutex::new(Vec::new()));
        let child_killers = Arc::new(Mutex::new(HashMap::new()));

        let result = run_agent_pty_streaming(
            story_id,
            "coder-1",
            "sh",
            &[script.to_string_lossy().to_string()],
            "--",
            "/tmp",
            &tx,
            &event_log,
            None,
            0,
            child_killers,
            watcher_tx,
        )
        .await;

        assert!(result.is_ok(), "PTY run should succeed: {:?}", result.err());

        watcher_rx
    }

    /// Feed one stream event to `handle_agent_stream_event` for story `s1` /
    /// agent `coder-1` and return whatever (if anything) reached the
    /// broadcast channel.
    fn dispatch_stream_event(
        event: serde_json::Value,
    ) -> Result<AgentEvent, broadcast::error::TryRecvError> {
        let (tx, mut rx) = broadcast::channel::<AgentEvent>(64);
        let event_log: Mutex<Vec<AgentEvent>> = Mutex::new(Vec::new());

        handle_agent_stream_event(&event, "s1", "coder-1", &tx, &event_log, None);

        rx.try_recv()
    }

    // ── AC1: pty detects rate_limit_event and emits RateLimitWarning ─────────

    /// A `rate_limit_event` line with status `allowed_warning` in the PTY
    /// output must produce a `WatcherEvent::RateLimitWarning` carrying the
    /// story_id and agent_name of the run.
    #[tokio::test]
    async fn rate_limit_event_json_sends_watcher_warning() {
        let mut watcher_rx = run_script_through_pty(
            "365_story_test",
            "emit_rate_limit.sh",
            "#!/bin/sh\nprintf '%s\\n' '{\"type\":\"rate_limit_event\",\"rate_limit_info\":{\"status\":\"allowed_warning\"}}'\n",
        )
        .await;

        let evt = watcher_rx
            .try_recv()
            .expect("Expected a RateLimitWarning to be sent on watcher_tx");
        match evt {
            WatcherEvent::RateLimitWarning {
                story_id,
                agent_name,
            } => {
                assert_eq!(story_id, "365_story_test");
                assert_eq!(agent_name, "coder-1");
            }
            other => panic!("Expected RateLimitWarning, got: {other:?}"),
        }
    }

    /// AC1: a hard block that includes `reset_at` must produce a
    /// `RateLimitHardBlock` with the story_id, agent_name, and the parsed
    /// reset_at timestamp.
    #[tokio::test]
    async fn rate_limit_hard_block_sends_watcher_hard_block_event() {
        let mut watcher_rx = run_script_through_pty(
            "423_story_rate_limit",
            "emit_hard_block.sh",
            "#!/bin/sh\nprintf '%s\\n' '{\"type\":\"rate_limit_event\",\"rate_limit_info\":{\"status\":\"hard_block\",\"reset_at\":\"2099-01-01T12:00:00Z\"}}'\n",
        )
        .await;

        let evt = watcher_rx
            .try_recv()
            .expect("Expected a RateLimitHardBlock to be sent on watcher_tx");
        match evt {
            WatcherEvent::RateLimitHardBlock {
                story_id,
                agent_name,
                reset_at,
            } => {
                assert_eq!(story_id, "423_story_rate_limit");
                assert_eq!(agent_name, "coder-1");
                assert_eq!(
                    reset_at.to_rfc3339(),
                    "2099-01-01T12:00:00+00:00",
                    "reset_at should match the parsed timestamp"
                );
            }
            other => panic!("Expected RateLimitHardBlock, got: {other:?}"),
        }
    }

    /// `emit_event` must record the event both in the in-memory log and in
    /// the persistent log file.
    #[test]
    fn test_emit_event_writes_to_log_writer() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();

        let log_mutex =
            Mutex::new(AgentLogWriter::new(root, "42_story_foo", "coder-1", "sess-emit").unwrap());

        let (tx, _rx) = broadcast::channel::<AgentEvent>(64);
        let event_log: Mutex<Vec<AgentEvent>> = Mutex::new(Vec::new());

        emit_event(
            AgentEvent::Status {
                story_id: "42_story_foo".to_string(),
                agent_name: "coder-1".to_string(),
                status: "running".to_string(),
            },
            &tx,
            &event_log,
            Some(&log_mutex),
        );

        // The event was added to the in-memory log; the guard is released at
        // the end of this scope, before the file is read back.
        {
            let mem_events = event_log.lock().unwrap();
            assert_eq!(mem_events.len(), 1);
        }

        // The event was written to the log file.
        let log_path =
            crate::agent_log::log_file_path(root, "42_story_foo", "coder-1", "sess-emit");
        let entries = crate::agent_log::read_log(&log_path).unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].event["type"], "status");
        assert_eq!(entries[0].event["status"], "running");
    }

    // ── bug 167: handle_agent_stream_event routes thinking/text correctly ───

    #[test]
    fn stream_event_thinking_delta_emits_thinking_event() {
        let received = dispatch_stream_event(serde_json::json!({
            "type": "content_block_delta",
            "delta": {"type": "thinking_delta", "thinking": "Let me analyze this..."}
        }))
        .unwrap();

        match received {
            AgentEvent::Thinking {
                story_id,
                agent_name,
                text,
            } => {
                assert_eq!(story_id, "s1");
                assert_eq!(agent_name, "coder-1");
                assert_eq!(text, "Let me analyze this...");
            }
            other => panic!("Expected Thinking event, got: {other:?}"),
        }
    }

    #[test]
    fn stream_event_text_delta_emits_output_event() {
        let received = dispatch_stream_event(serde_json::json!({
            "type": "content_block_delta",
            "delta": {"type": "text_delta", "text": "Here is the result."}
        }))
        .unwrap();

        match received {
            AgentEvent::Output {
                story_id,
                agent_name,
                text,
            } => {
                assert_eq!(story_id, "s1");
                assert_eq!(agent_name, "coder-1");
                assert_eq!(text, "Here is the result.");
            }
            other => panic!("Expected Output event, got: {other:?}"),
        }
    }

    #[test]
    fn stream_event_input_json_delta_ignored() {
        let outcome = dispatch_stream_event(serde_json::json!({
            "type": "content_block_delta",
            "delta": {"type": "input_json_delta", "partial_json": "{\"file\":"}
        }));

        // No event should be emitted for tool argument deltas
        assert!(outcome.is_err());
    }

    #[test]
    fn stream_event_non_delta_type_ignored() {
        let outcome = dispatch_stream_event(serde_json::json!({
            "type": "message_start",
            "message": {"role": "assistant"}
        }));

        assert!(outcome.is_err());
    }
}
|