huskies: merge 845
This commit is contained in:
@@ -1,749 +0,0 @@
|
|||||||
//! Claude Code event handling — streams JSON events from the CLI to the message channel.
|
|
||||||
|
|
||||||
use serde_json::Value;
|
|
||||||
|
|
||||||
use super::parse::{parse_assistant_message, parse_tool_results};
|
|
||||||
use crate::llm::types::{Message, Role};
|
|
||||||
use crate::slog;
|
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
|
||||||
|
|
||||||
pub(super) fn process_json_event(
|
|
||||||
json: &serde_json::Value,
|
|
||||||
token_tx: &tokio::sync::mpsc::UnboundedSender<String>,
|
|
||||||
thinking_tx: &tokio::sync::mpsc::UnboundedSender<String>,
|
|
||||||
activity_tx: &tokio::sync::mpsc::UnboundedSender<String>,
|
|
||||||
msg_tx: &std::sync::mpsc::Sender<Message>,
|
|
||||||
sid_tx: &mut Option<tokio::sync::oneshot::Sender<String>>,
|
|
||||||
auth_failed: &AtomicBool,
|
|
||||||
) -> bool {
|
|
||||||
let event_type = match json.get("type").and_then(|t| t.as_str()) {
|
|
||||||
Some(t) => t,
|
|
||||||
None => return false,
|
|
||||||
};
|
|
||||||
|
|
||||||
// Capture session_id from the first event that carries it
|
|
||||||
if let Some(tx) = sid_tx.take() {
|
|
||||||
if let Some(sid) = json.get("session_id").and_then(|s| s.as_str()) {
|
|
||||||
slog!("[pty-debug] CAPTURED session_id: {}", sid);
|
|
||||||
let _ = tx.send(sid.to_string());
|
|
||||||
} else {
|
|
||||||
*sid_tx = Some(tx);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Detect authentication_failed at the top level of any event.
|
|
||||||
if json.get("error").and_then(|e| e.as_str()) == Some("authentication_failed") {
|
|
||||||
slog!("[pty-debug] Detected authentication_failed error");
|
|
||||||
auth_failed.store(true, Ordering::Relaxed);
|
|
||||||
}
|
|
||||||
|
|
||||||
match event_type {
|
|
||||||
"stream_event" => {
|
|
||||||
if let Some(event) = json.get("event") {
|
|
||||||
handle_stream_event(event, token_tx, thinking_tx, activity_tx);
|
|
||||||
}
|
|
||||||
false
|
|
||||||
}
|
|
||||||
"assistant" => {
|
|
||||||
if let Some(message) = json.get("message")
|
|
||||||
&& let Some(content) = message.get("content").and_then(|c| c.as_array())
|
|
||||||
{
|
|
||||||
// Fire activity signals for tool_use blocks as a fallback path.
|
|
||||||
// The primary path is via stream_event → content_block_start (real-time),
|
|
||||||
// but assistant events also carry tool_use blocks and serve as a reliable
|
|
||||||
// backup if stream_event delivery is delayed or missed.
|
|
||||||
for block in content {
|
|
||||||
if block.get("type").and_then(|t| t.as_str()) == Some("tool_use")
|
|
||||||
&& let Some(name) = block.get("name").and_then(|n| n.as_str())
|
|
||||||
{
|
|
||||||
let _ = activity_tx.send(name.to_string());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
parse_assistant_message(content, msg_tx);
|
|
||||||
}
|
|
||||||
false
|
|
||||||
}
|
|
||||||
"user" => {
|
|
||||||
if let Some(message) = json.get("message")
|
|
||||||
&& let Some(content) = message.get("content").and_then(|c| c.as_array())
|
|
||||||
{
|
|
||||||
parse_tool_results(content, msg_tx);
|
|
||||||
}
|
|
||||||
false
|
|
||||||
}
|
|
||||||
"result" => true,
|
|
||||||
// system, rate_limit_event, and unknown types are no-ops
|
|
||||||
_ => false,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Parse a complete `assistant` message content array.
|
|
||||||
///
|
|
||||||
/// Extracts text blocks into `content` and tool_use blocks into `tool_calls`,
|
|
||||||
/// then sends a single `Message { role: Assistant }` via `msg_tx`.
|
|
||||||
/// This is the authoritative source for the final message structure — streaming
|
|
||||||
pub(super) fn handle_stream_event(
|
|
||||||
event: &serde_json::Value,
|
|
||||||
token_tx: &tokio::sync::mpsc::UnboundedSender<String>,
|
|
||||||
thinking_tx: &tokio::sync::mpsc::UnboundedSender<String>,
|
|
||||||
activity_tx: &tokio::sync::mpsc::UnboundedSender<String>,
|
|
||||||
) {
|
|
||||||
let event_type = event.get("type").and_then(|t| t.as_str()).unwrap_or("");
|
|
||||||
|
|
||||||
match event_type {
|
|
||||||
"content_block_start" => {
|
|
||||||
if let Some(cb) = event.get("content_block")
|
|
||||||
&& cb.get("type").and_then(|t| t.as_str()) == Some("tool_use")
|
|
||||||
&& let Some(name) = cb.get("name").and_then(|n| n.as_str())
|
|
||||||
{
|
|
||||||
let _ = activity_tx.send(name.to_string());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Text content streaming — only text_delta, not input_json_delta (tool args)
|
|
||||||
"content_block_delta" => {
|
|
||||||
if let Some(delta) = event.get("delta") {
|
|
||||||
let delta_type = delta.get("type").and_then(|t| t.as_str()).unwrap_or("");
|
|
||||||
match delta_type {
|
|
||||||
"text_delta" => {
|
|
||||||
if let Some(text) = delta.get("text").and_then(|t| t.as_str()) {
|
|
||||||
let _ = token_tx.send(text.to_string());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
"thinking_delta" => {
|
|
||||||
if let Some(thinking) = delta.get("thinking").and_then(|t| t.as_str()) {
|
|
||||||
let _ = thinking_tx.send(thinking.to_string());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_ => {}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Log errors via streaming display
|
|
||||||
"error" => {
|
|
||||||
if let Some(error) = event.get("error") {
|
|
||||||
let msg = error
|
|
||||||
.get("message")
|
|
||||||
.and_then(|m| m.as_str())
|
|
||||||
.unwrap_or("unknown error");
|
|
||||||
let _ = token_tx.send(format!("\n[error: {msg}]\n"));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_ => {}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use super::*;
|
|
||||||
use serde_json::json;
|
|
||||||
|
|
||||||
type Channels = (
|
|
||||||
tokio::sync::mpsc::UnboundedSender<String>,
|
|
||||||
tokio::sync::mpsc::UnboundedReceiver<String>,
|
|
||||||
tokio::sync::mpsc::UnboundedSender<String>,
|
|
||||||
tokio::sync::mpsc::UnboundedReceiver<String>,
|
|
||||||
tokio::sync::mpsc::UnboundedSender<String>,
|
|
||||||
tokio::sync::mpsc::UnboundedReceiver<String>,
|
|
||||||
std::sync::mpsc::Sender<Message>,
|
|
||||||
std::sync::mpsc::Receiver<Message>,
|
|
||||||
);
|
|
||||||
|
|
||||||
fn make_channels() -> Channels {
|
|
||||||
let (tok_tx, tok_rx) = tokio::sync::mpsc::unbounded_channel();
|
|
||||||
let (thi_tx, thi_rx) = tokio::sync::mpsc::unbounded_channel();
|
|
||||||
let (act_tx, act_rx) = tokio::sync::mpsc::unbounded_channel();
|
|
||||||
let (msg_tx, msg_rx) = std::sync::mpsc::channel();
|
|
||||||
(
|
|
||||||
tok_tx, tok_rx, thi_tx, thi_rx, act_tx, act_rx, msg_tx, msg_rx,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn handle_stream_event_text_delta_sends_token() {
|
|
||||||
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
|
|
||||||
let (ttx, _trx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
|
||||||
let (atx, _arx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
|
||||||
let event = json!({
|
|
||||||
"type": "content_block_delta",
|
|
||||||
"delta": {"type": "text_delta", "text": "hello "}
|
|
||||||
});
|
|
||||||
handle_stream_event(&event, &tx, &ttx, &atx);
|
|
||||||
drop(tx);
|
|
||||||
let tokens: Vec<_> = {
|
|
||||||
let mut v = vec![];
|
|
||||||
while let Ok(t) = rx.try_recv() {
|
|
||||||
v.push(t);
|
|
||||||
}
|
|
||||||
v
|
|
||||||
};
|
|
||||||
assert_eq!(tokens, vec!["hello "]);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn handle_stream_event_input_json_delta_not_sent() {
|
|
||||||
// Tool argument JSON deltas should NOT be sent as text tokens
|
|
||||||
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
|
|
||||||
let (ttx, _trx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
|
||||||
let (atx, _arx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
|
||||||
let event = json!({
|
|
||||||
"type": "content_block_delta",
|
|
||||||
"delta": {"type": "input_json_delta", "partial_json": "{\"path\":"}
|
|
||||||
});
|
|
||||||
handle_stream_event(&event, &tx, &ttx, &atx);
|
|
||||||
drop(tx);
|
|
||||||
let tokens: Vec<String> = {
|
|
||||||
let mut v = vec![];
|
|
||||||
while let Ok(t) = rx.try_recv() {
|
|
||||||
v.push(t);
|
|
||||||
}
|
|
||||||
v
|
|
||||||
};
|
|
||||||
assert!(tokens.is_empty());
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn handle_stream_event_thinking_delta_routes_to_thinking_channel() {
|
|
||||||
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
|
|
||||||
let (ttx, mut trx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
|
||||||
let (atx, _arx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
|
||||||
let event = json!({
|
|
||||||
"type": "content_block_delta",
|
|
||||||
"delta": {"type": "thinking_delta", "thinking": "I should check the file"}
|
|
||||||
});
|
|
||||||
handle_stream_event(&event, &tx, &ttx, &atx);
|
|
||||||
drop(tx);
|
|
||||||
drop(ttx);
|
|
||||||
// thinking token must NOT appear in the regular token channel
|
|
||||||
let tokens: Vec<String> = {
|
|
||||||
let mut v = vec![];
|
|
||||||
while let Ok(t) = rx.try_recv() {
|
|
||||||
v.push(t);
|
|
||||||
}
|
|
||||||
v
|
|
||||||
};
|
|
||||||
assert!(
|
|
||||||
tokens.is_empty(),
|
|
||||||
"thinking token leaked into token channel"
|
|
||||||
);
|
|
||||||
// thinking token must appear in the dedicated thinking channel, without prefix
|
|
||||||
let thinking: Vec<String> = {
|
|
||||||
let mut v = vec![];
|
|
||||||
while let Ok(t) = trx.try_recv() {
|
|
||||||
v.push(t);
|
|
||||||
}
|
|
||||||
v
|
|
||||||
};
|
|
||||||
assert_eq!(thinking, vec!["I should check the file"]);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn handle_stream_event_error_sends_error_token() {
|
|
||||||
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
|
|
||||||
let (ttx, _trx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
|
||||||
let (atx, _arx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
|
||||||
let event = json!({
|
|
||||||
"type": "error",
|
|
||||||
"error": {"type": "overloaded_error", "message": "Overloaded"}
|
|
||||||
});
|
|
||||||
handle_stream_event(&event, &tx, &ttx, &atx);
|
|
||||||
drop(tx);
|
|
||||||
let tokens: Vec<String> = {
|
|
||||||
let mut v = vec![];
|
|
||||||
while let Ok(t) = rx.try_recv() {
|
|
||||||
v.push(t);
|
|
||||||
}
|
|
||||||
v
|
|
||||||
};
|
|
||||||
assert_eq!(tokens, vec!["\n[error: Overloaded]\n"]);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn handle_stream_event_unknown_type_is_noop() {
|
|
||||||
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
|
|
||||||
let (ttx, _trx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
|
||||||
let (atx, _arx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
|
||||||
let event = json!({"type": "ping"});
|
|
||||||
handle_stream_event(&event, &tx, &ttx, &atx);
|
|
||||||
drop(tx);
|
|
||||||
let tokens: Vec<String> = {
|
|
||||||
let mut v = vec![];
|
|
||||||
while let Ok(t) = rx.try_recv() {
|
|
||||||
v.push(t);
|
|
||||||
}
|
|
||||||
v
|
|
||||||
};
|
|
||||||
assert!(tokens.is_empty());
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn handle_stream_event_tool_use_start_sends_activity() {
|
|
||||||
let (tx, _rx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
|
||||||
let (ttx, _trx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
|
||||||
let (atx, mut arx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
|
||||||
let event = json!({
|
|
||||||
"type": "content_block_start",
|
|
||||||
"index": 1,
|
|
||||||
"content_block": {"type": "tool_use", "id": "toolu_1", "name": "Read", "input": {}}
|
|
||||||
});
|
|
||||||
handle_stream_event(&event, &tx, &ttx, &atx);
|
|
||||||
drop(atx);
|
|
||||||
let activities: Vec<String> = {
|
|
||||||
let mut v = vec![];
|
|
||||||
while let Ok(a) = arx.try_recv() {
|
|
||||||
v.push(a);
|
|
||||||
}
|
|
||||||
v
|
|
||||||
};
|
|
||||||
assert_eq!(activities, vec!["Read"]);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn handle_stream_event_text_block_start_no_activity() {
|
|
||||||
let (tx, _rx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
|
||||||
let (ttx, _trx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
|
||||||
let (atx, mut arx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
|
||||||
let event = json!({
|
|
||||||
"type": "content_block_start",
|
|
||||||
"index": 0,
|
|
||||||
"content_block": {"type": "text", "text": ""}
|
|
||||||
});
|
|
||||||
handle_stream_event(&event, &tx, &ttx, &atx);
|
|
||||||
drop(atx);
|
|
||||||
let activities: Vec<String> = {
|
|
||||||
let mut v = vec![];
|
|
||||||
while let Ok(a) = arx.try_recv() {
|
|
||||||
v.push(a);
|
|
||||||
}
|
|
||||||
v
|
|
||||||
};
|
|
||||||
assert!(activities.is_empty());
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn process_json_event_result_returns_true() {
|
|
||||||
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels();
|
|
||||||
let (sid_tx, _sid_rx) = tokio::sync::oneshot::channel::<String>();
|
|
||||||
let mut sid_tx_opt = Some(sid_tx);
|
|
||||||
let json = json!({"type": "result", "subtype": "success"});
|
|
||||||
assert!(process_json_event(
|
|
||||||
&json,
|
|
||||||
&tok_tx,
|
|
||||||
&thi_tx,
|
|
||||||
&act_tx,
|
|
||||||
&msg_tx,
|
|
||||||
&mut sid_tx_opt,
|
|
||||||
&AtomicBool::new(false),
|
|
||||||
));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn process_json_event_system_returns_false() {
|
|
||||||
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels();
|
|
||||||
let mut sid_tx = None::<tokio::sync::oneshot::Sender<String>>;
|
|
||||||
let json = json!({"type": "system", "subtype": "init", "apiKeySource": "env"});
|
|
||||||
assert!(!process_json_event(
|
|
||||||
&json,
|
|
||||||
&tok_tx,
|
|
||||||
&thi_tx,
|
|
||||||
&act_tx,
|
|
||||||
&msg_tx,
|
|
||||||
&mut sid_tx,
|
|
||||||
&AtomicBool::new(false),
|
|
||||||
));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn process_json_event_rate_limit_returns_false() {
|
|
||||||
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels();
|
|
||||||
let mut sid_tx = None::<tokio::sync::oneshot::Sender<String>>;
|
|
||||||
let json = json!({"type": "rate_limit_event"});
|
|
||||||
assert!(!process_json_event(
|
|
||||||
&json,
|
|
||||||
&tok_tx,
|
|
||||||
&thi_tx,
|
|
||||||
&act_tx,
|
|
||||||
&msg_tx,
|
|
||||||
&mut sid_tx,
|
|
||||||
&AtomicBool::new(false),
|
|
||||||
));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn process_json_event_unknown_type_returns_false() {
|
|
||||||
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels();
|
|
||||||
let mut sid_tx = None::<tokio::sync::oneshot::Sender<String>>;
|
|
||||||
let json = json!({"type": "some_future_event"});
|
|
||||||
assert!(!process_json_event(
|
|
||||||
&json,
|
|
||||||
&tok_tx,
|
|
||||||
&thi_tx,
|
|
||||||
&act_tx,
|
|
||||||
&msg_tx,
|
|
||||||
&mut sid_tx,
|
|
||||||
&AtomicBool::new(false),
|
|
||||||
));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn process_json_event_no_type_returns_false() {
|
|
||||||
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels();
|
|
||||||
let mut sid_tx = None::<tokio::sync::oneshot::Sender<String>>;
|
|
||||||
let json = json!({"content": "no type field"});
|
|
||||||
assert!(!process_json_event(
|
|
||||||
&json,
|
|
||||||
&tok_tx,
|
|
||||||
&thi_tx,
|
|
||||||
&act_tx,
|
|
||||||
&msg_tx,
|
|
||||||
&mut sid_tx,
|
|
||||||
&AtomicBool::new(false),
|
|
||||||
));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn process_json_event_captures_session_id() {
|
|
||||||
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels();
|
|
||||||
let (sid_tx, mut sid_rx) = tokio::sync::oneshot::channel::<String>();
|
|
||||||
let mut sid_tx_opt = Some(sid_tx);
|
|
||||||
let json = json!({"type": "system", "session_id": "sess-abc-123"});
|
|
||||||
process_json_event(
|
|
||||||
&json,
|
|
||||||
&tok_tx,
|
|
||||||
&thi_tx,
|
|
||||||
&act_tx,
|
|
||||||
&msg_tx,
|
|
||||||
&mut sid_tx_opt,
|
|
||||||
&AtomicBool::new(false),
|
|
||||||
);
|
|
||||||
// sid_tx should have been consumed
|
|
||||||
assert!(sid_tx_opt.is_none());
|
|
||||||
let received = sid_rx.try_recv().unwrap();
|
|
||||||
assert_eq!(received, "sess-abc-123");
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn process_json_event_preserves_sid_tx_if_no_session_id() {
|
|
||||||
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels();
|
|
||||||
let (sid_tx, _sid_rx) = tokio::sync::oneshot::channel::<String>();
|
|
||||||
let mut sid_tx_opt = Some(sid_tx);
|
|
||||||
let json = json!({"type": "system"});
|
|
||||||
process_json_event(
|
|
||||||
&json,
|
|
||||||
&tok_tx,
|
|
||||||
&thi_tx,
|
|
||||||
&act_tx,
|
|
||||||
&msg_tx,
|
|
||||||
&mut sid_tx_opt,
|
|
||||||
&AtomicBool::new(false),
|
|
||||||
);
|
|
||||||
// sid_tx should still be present since no session_id in event
|
|
||||||
assert!(sid_tx_opt.is_some());
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn process_json_event_stream_event_forwards_token() {
|
|
||||||
let (tok_tx, mut tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) =
|
|
||||||
make_channels();
|
|
||||||
let mut sid_tx = None::<tokio::sync::oneshot::Sender<String>>;
|
|
||||||
let json = json!({
|
|
||||||
"type": "stream_event",
|
|
||||||
"session_id": "s1",
|
|
||||||
"event": {
|
|
||||||
"type": "content_block_delta",
|
|
||||||
"delta": {"type": "text_delta", "text": "word"}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
assert!(!process_json_event(
|
|
||||||
&json,
|
|
||||||
&tok_tx,
|
|
||||||
&thi_tx,
|
|
||||||
&act_tx,
|
|
||||||
&msg_tx,
|
|
||||||
&mut sid_tx,
|
|
||||||
&AtomicBool::new(false),
|
|
||||||
));
|
|
||||||
drop(tok_tx);
|
|
||||||
let tokens: Vec<String> = {
|
|
||||||
let mut v = vec![];
|
|
||||||
while let Ok(t) = tok_rx.try_recv() {
|
|
||||||
v.push(t);
|
|
||||||
}
|
|
||||||
v
|
|
||||||
};
|
|
||||||
assert_eq!(tokens, vec!["word"]);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn process_json_event_stream_event_tool_use_fires_activity() {
|
|
||||||
// This is the primary activity path: stream_event wrapping content_block_start
|
|
||||||
// with a tool_use block. Requires --include-partial-messages to be enabled.
|
|
||||||
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, mut act_rx, msg_tx, _msg_rx) =
|
|
||||||
make_channels();
|
|
||||||
let mut sid_tx = None::<tokio::sync::oneshot::Sender<String>>;
|
|
||||||
let json = json!({
|
|
||||||
"type": "stream_event",
|
|
||||||
"session_id": "s1",
|
|
||||||
"event": {
|
|
||||||
"type": "content_block_start",
|
|
||||||
"index": 1,
|
|
||||||
"content_block": {"type": "tool_use", "id": "toolu_abc", "name": "Bash", "input": {}}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
assert!(!process_json_event(
|
|
||||||
&json,
|
|
||||||
&tok_tx,
|
|
||||||
&thi_tx,
|
|
||||||
&act_tx,
|
|
||||||
&msg_tx,
|
|
||||||
&mut sid_tx,
|
|
||||||
&AtomicBool::new(false),
|
|
||||||
));
|
|
||||||
drop(act_tx);
|
|
||||||
let activities: Vec<String> = {
|
|
||||||
let mut v = vec![];
|
|
||||||
while let Ok(a) = act_rx.try_recv() {
|
|
||||||
v.push(a);
|
|
||||||
}
|
|
||||||
v
|
|
||||||
};
|
|
||||||
assert_eq!(activities, vec!["Bash"]);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn process_json_event_assistant_with_tool_use_fires_activity() {
|
|
||||||
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, mut act_rx, msg_tx, _msg_rx) =
|
|
||||||
make_channels();
|
|
||||||
let mut sid_tx = None::<tokio::sync::oneshot::Sender<String>>;
|
|
||||||
let json = json!({
|
|
||||||
"type": "assistant",
|
|
||||||
"message": {
|
|
||||||
"content": [
|
|
||||||
{"type": "text", "text": "Let me read that file."},
|
|
||||||
{"type": "tool_use", "id": "toolu_1", "name": "Read", "input": {"file_path": "/foo.rs"}}
|
|
||||||
]
|
|
||||||
}
|
|
||||||
});
|
|
||||||
assert!(!process_json_event(
|
|
||||||
&json,
|
|
||||||
&tok_tx,
|
|
||||||
&thi_tx,
|
|
||||||
&act_tx,
|
|
||||||
&msg_tx,
|
|
||||||
&mut sid_tx,
|
|
||||||
&AtomicBool::new(false),
|
|
||||||
));
|
|
||||||
drop(act_tx);
|
|
||||||
let activities: Vec<String> = {
|
|
||||||
let mut v = vec![];
|
|
||||||
while let Ok(a) = act_rx.try_recv() {
|
|
||||||
v.push(a);
|
|
||||||
}
|
|
||||||
v
|
|
||||||
};
|
|
||||||
assert_eq!(activities, vec!["Read"]);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn process_json_event_assistant_with_multiple_tool_uses_fires_all_activities() {
|
|
||||||
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, mut act_rx, msg_tx, _msg_rx) =
|
|
||||||
make_channels();
|
|
||||||
let mut sid_tx = None::<tokio::sync::oneshot::Sender<String>>;
|
|
||||||
let json = json!({
|
|
||||||
"type": "assistant",
|
|
||||||
"message": {
|
|
||||||
"content": [
|
|
||||||
{"type": "tool_use", "id": "id1", "name": "Glob", "input": {}},
|
|
||||||
{"type": "tool_use", "id": "id2", "name": "Bash", "input": {}}
|
|
||||||
]
|
|
||||||
}
|
|
||||||
});
|
|
||||||
assert!(!process_json_event(
|
|
||||||
&json,
|
|
||||||
&tok_tx,
|
|
||||||
&thi_tx,
|
|
||||||
&act_tx,
|
|
||||||
&msg_tx,
|
|
||||||
&mut sid_tx,
|
|
||||||
&AtomicBool::new(false),
|
|
||||||
));
|
|
||||||
drop(act_tx);
|
|
||||||
let activities: Vec<String> = {
|
|
||||||
let mut v = vec![];
|
|
||||||
while let Ok(a) = act_rx.try_recv() {
|
|
||||||
v.push(a);
|
|
||||||
}
|
|
||||||
v
|
|
||||||
};
|
|
||||||
assert_eq!(activities, vec!["Glob", "Bash"]);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn process_json_event_assistant_text_only_no_activity() {
|
|
||||||
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, mut act_rx, msg_tx, _msg_rx) =
|
|
||||||
make_channels();
|
|
||||||
let mut sid_tx = None::<tokio::sync::oneshot::Sender<String>>;
|
|
||||||
let json = json!({
|
|
||||||
"type": "assistant",
|
|
||||||
"message": {
|
|
||||||
"content": [{"type": "text", "text": "Just text, no tools."}]
|
|
||||||
}
|
|
||||||
});
|
|
||||||
assert!(!process_json_event(
|
|
||||||
&json,
|
|
||||||
&tok_tx,
|
|
||||||
&thi_tx,
|
|
||||||
&act_tx,
|
|
||||||
&msg_tx,
|
|
||||||
&mut sid_tx,
|
|
||||||
&AtomicBool::new(false),
|
|
||||||
));
|
|
||||||
drop(act_tx);
|
|
||||||
let activities: Vec<String> = {
|
|
||||||
let mut v = vec![];
|
|
||||||
while let Ok(a) = act_rx.try_recv() {
|
|
||||||
v.push(a);
|
|
||||||
}
|
|
||||||
v
|
|
||||||
};
|
|
||||||
assert!(activities.is_empty());
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn process_json_event_assistant_event_parses_message() {
|
|
||||||
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, msg_rx) = make_channels();
|
|
||||||
let mut sid_tx = None::<tokio::sync::oneshot::Sender<String>>;
|
|
||||||
let json = json!({
|
|
||||||
"type": "assistant",
|
|
||||||
"message": {
|
|
||||||
"content": [{"type": "text", "text": "Hi!"}]
|
|
||||||
}
|
|
||||||
});
|
|
||||||
assert!(!process_json_event(
|
|
||||||
&json,
|
|
||||||
&tok_tx,
|
|
||||||
&thi_tx,
|
|
||||||
&act_tx,
|
|
||||||
&msg_tx,
|
|
||||||
&mut sid_tx,
|
|
||||||
&AtomicBool::new(false),
|
|
||||||
));
|
|
||||||
drop(msg_tx);
|
|
||||||
let msgs: Vec<Message> = msg_rx.try_iter().collect();
|
|
||||||
assert_eq!(msgs.len(), 1);
|
|
||||||
assert_eq!(msgs[0].content, "Hi!");
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn process_json_event_user_event_parses_tool_results() {
|
|
||||||
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, msg_rx) = make_channels();
|
|
||||||
let mut sid_tx = None::<tokio::sync::oneshot::Sender<String>>;
|
|
||||||
let json = json!({
|
|
||||||
"type": "user",
|
|
||||||
"message": {
|
|
||||||
"content": [{"type": "tool_result", "tool_use_id": "tid1", "content": "done"}]
|
|
||||||
}
|
|
||||||
});
|
|
||||||
assert!(!process_json_event(
|
|
||||||
&json,
|
|
||||||
&tok_tx,
|
|
||||||
&thi_tx,
|
|
||||||
&act_tx,
|
|
||||||
&msg_tx,
|
|
||||||
&mut sid_tx,
|
|
||||||
&AtomicBool::new(false),
|
|
||||||
));
|
|
||||||
drop(msg_tx);
|
|
||||||
let msgs: Vec<Message> = msg_rx.try_iter().collect();
|
|
||||||
assert_eq!(msgs.len(), 1);
|
|
||||||
assert_eq!(msgs[0].role, Role::Tool);
|
|
||||||
assert_eq!(msgs[0].content, "done");
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn process_json_event_assistant_without_content_array_is_noop() {
|
|
||||||
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, msg_rx) = make_channels();
|
|
||||||
let mut sid_tx = None::<tokio::sync::oneshot::Sender<String>>;
|
|
||||||
let json = json!({
|
|
||||||
"type": "assistant",
|
|
||||||
"message": {"content": "not an array"}
|
|
||||||
});
|
|
||||||
assert!(!process_json_event(
|
|
||||||
&json,
|
|
||||||
&tok_tx,
|
|
||||||
&thi_tx,
|
|
||||||
&act_tx,
|
|
||||||
&msg_tx,
|
|
||||||
&mut sid_tx,
|
|
||||||
&AtomicBool::new(false),
|
|
||||||
));
|
|
||||||
drop(msg_tx);
|
|
||||||
let msgs: Vec<Message> = msg_rx.try_iter().collect();
|
|
||||||
assert!(msgs.is_empty());
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn process_json_event_user_without_content_array_is_noop() {
|
|
||||||
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, msg_rx) = make_channels();
|
|
||||||
let mut sid_tx = None::<tokio::sync::oneshot::Sender<String>>;
|
|
||||||
let json = json!({"type": "user", "message": {"content": null}});
|
|
||||||
assert!(!process_json_event(
|
|
||||||
&json,
|
|
||||||
&tok_tx,
|
|
||||||
&thi_tx,
|
|
||||||
&act_tx,
|
|
||||||
&msg_tx,
|
|
||||||
&mut sid_tx,
|
|
||||||
&AtomicBool::new(false),
|
|
||||||
));
|
|
||||||
drop(msg_tx);
|
|
||||||
let msgs: Vec<Message> = msg_rx.try_iter().collect();
|
|
||||||
assert!(msgs.is_empty());
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn process_json_event_detects_authentication_failed() {
|
|
||||||
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels();
|
|
||||||
let mut sid_tx = None::<tokio::sync::oneshot::Sender<String>>;
|
|
||||||
let auth_failed = AtomicBool::new(false);
|
|
||||||
let json = json!({
|
|
||||||
"type": "assistant",
|
|
||||||
"error": "authentication_failed",
|
|
||||||
"message": {
|
|
||||||
"content": [{"type": "text", "text": "Failed to authenticate."}]
|
|
||||||
}
|
|
||||||
});
|
|
||||||
assert!(!process_json_event(
|
|
||||||
&json,
|
|
||||||
&tok_tx,
|
|
||||||
&thi_tx,
|
|
||||||
&act_tx,
|
|
||||||
&msg_tx,
|
|
||||||
&mut sid_tx,
|
|
||||||
&auth_failed,
|
|
||||||
));
|
|
||||||
assert!(auth_failed.load(Ordering::Relaxed));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn process_json_event_no_auth_failed_for_normal_events() {
|
|
||||||
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels();
|
|
||||||
let mut sid_tx = None::<tokio::sync::oneshot::Sender<String>>;
|
|
||||||
let auth_failed = AtomicBool::new(false);
|
|
||||||
let json = json!({
|
|
||||||
"type": "assistant",
|
|
||||||
"message": {
|
|
||||||
"content": [{"type": "text", "text": "Hello!"}]
|
|
||||||
}
|
|
||||||
});
|
|
||||||
assert!(!process_json_event(
|
|
||||||
&json,
|
|
||||||
&tok_tx,
|
|
||||||
&thi_tx,
|
|
||||||
&act_tx,
|
|
||||||
&msg_tx,
|
|
||||||
&mut sid_tx,
|
|
||||||
&auth_failed,
|
|
||||||
));
|
|
||||||
assert!(!auth_failed.load(Ordering::Relaxed));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -0,0 +1,87 @@
|
|||||||
|
//! Claude Code event handling — dispatches JSON events from the CLI to message channels.
|
||||||
|
|
||||||
|
mod stream;
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests;
|
||||||
|
|
||||||
|
use super::parse::{parse_assistant_message, parse_tool_results};
|
||||||
|
use crate::llm::types::Message;
|
||||||
|
use crate::slog;
|
||||||
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
|
use stream::handle_stream_event;
|
||||||
|
|
||||||
|
/// Dispatch a top-level JSON event from the Claude Code CLI.
|
||||||
|
///
|
||||||
|
/// Routes the event to the appropriate handler based on `type`, emitting tokens,
|
||||||
|
/// thinking output, activity signals, and parsed messages to their respective channels.
|
||||||
|
/// Returns `true` only for `"result"` events, which signal that the CLI turn is complete.
|
||||||
|
pub(super) fn process_json_event(
|
||||||
|
json: &serde_json::Value,
|
||||||
|
token_tx: &tokio::sync::mpsc::UnboundedSender<String>,
|
||||||
|
thinking_tx: &tokio::sync::mpsc::UnboundedSender<String>,
|
||||||
|
activity_tx: &tokio::sync::mpsc::UnboundedSender<String>,
|
||||||
|
msg_tx: &std::sync::mpsc::Sender<Message>,
|
||||||
|
sid_tx: &mut Option<tokio::sync::oneshot::Sender<String>>,
|
||||||
|
auth_failed: &AtomicBool,
|
||||||
|
) -> bool {
|
||||||
|
let event_type = match json.get("type").and_then(|t| t.as_str()) {
|
||||||
|
Some(t) => t,
|
||||||
|
None => return false,
|
||||||
|
};
|
||||||
|
|
||||||
|
// Capture session_id from the first event that carries it
|
||||||
|
if let Some(tx) = sid_tx.take() {
|
||||||
|
if let Some(sid) = json.get("session_id").and_then(|s| s.as_str()) {
|
||||||
|
slog!("[pty-debug] CAPTURED session_id: {}", sid);
|
||||||
|
let _ = tx.send(sid.to_string());
|
||||||
|
} else {
|
||||||
|
*sid_tx = Some(tx);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Detect authentication_failed at the top level of any event.
|
||||||
|
if json.get("error").and_then(|e| e.as_str()) == Some("authentication_failed") {
|
||||||
|
slog!("[pty-debug] Detected authentication_failed error");
|
||||||
|
auth_failed.store(true, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
|
match event_type {
|
||||||
|
"stream_event" => {
|
||||||
|
if let Some(event) = json.get("event") {
|
||||||
|
handle_stream_event(event, token_tx, thinking_tx, activity_tx);
|
||||||
|
}
|
||||||
|
false
|
||||||
|
}
|
||||||
|
"assistant" => {
|
||||||
|
if let Some(message) = json.get("message")
|
||||||
|
&& let Some(content) = message.get("content").and_then(|c| c.as_array())
|
||||||
|
{
|
||||||
|
// Fire activity signals for tool_use blocks as a fallback path.
|
||||||
|
// The primary path is via stream_event → content_block_start (real-time),
|
||||||
|
// but assistant events also carry tool_use blocks and serve as a reliable
|
||||||
|
// backup if stream_event delivery is delayed or missed.
|
||||||
|
for block in content {
|
||||||
|
if block.get("type").and_then(|t| t.as_str()) == Some("tool_use")
|
||||||
|
&& let Some(name) = block.get("name").and_then(|n| n.as_str())
|
||||||
|
{
|
||||||
|
let _ = activity_tx.send(name.to_string());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
parse_assistant_message(content, msg_tx);
|
||||||
|
}
|
||||||
|
false
|
||||||
|
}
|
||||||
|
"user" => {
|
||||||
|
if let Some(message) = json.get("message")
|
||||||
|
&& let Some(content) = message.get("content").and_then(|c| c.as_array())
|
||||||
|
{
|
||||||
|
parse_tool_results(content, msg_tx);
|
||||||
|
}
|
||||||
|
false
|
||||||
|
}
|
||||||
|
"result" => true,
|
||||||
|
// system, rate_limit_event, and unknown types are no-ops
|
||||||
|
_ => false,
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,55 @@
|
|||||||
|
//! Stream event handling — processes `stream_event` payloads from the Claude Code CLI.
|
||||||
|
|
||||||
|
/// Handle a single unwrapped stream event (the inner `event` object from a `stream_event` JSON).
|
||||||
|
///
|
||||||
|
/// Routes `content_block_delta` text tokens to `token_tx`, thinking tokens to `thinking_tx`,
|
||||||
|
/// `content_block_start` tool-use names to `activity_tx`, and error messages back to `token_tx`.
|
||||||
|
pub(super) fn handle_stream_event(
|
||||||
|
event: &serde_json::Value,
|
||||||
|
token_tx: &tokio::sync::mpsc::UnboundedSender<String>,
|
||||||
|
thinking_tx: &tokio::sync::mpsc::UnboundedSender<String>,
|
||||||
|
activity_tx: &tokio::sync::mpsc::UnboundedSender<String>,
|
||||||
|
) {
|
||||||
|
let event_type = event.get("type").and_then(|t| t.as_str()).unwrap_or("");
|
||||||
|
|
||||||
|
match event_type {
|
||||||
|
"content_block_start" => {
|
||||||
|
if let Some(cb) = event.get("content_block")
|
||||||
|
&& cb.get("type").and_then(|t| t.as_str()) == Some("tool_use")
|
||||||
|
&& let Some(name) = cb.get("name").and_then(|n| n.as_str())
|
||||||
|
{
|
||||||
|
let _ = activity_tx.send(name.to_string());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Text content streaming — only text_delta, not input_json_delta (tool args)
|
||||||
|
"content_block_delta" => {
|
||||||
|
if let Some(delta) = event.get("delta") {
|
||||||
|
let delta_type = delta.get("type").and_then(|t| t.as_str()).unwrap_or("");
|
||||||
|
match delta_type {
|
||||||
|
"text_delta" => {
|
||||||
|
if let Some(text) = delta.get("text").and_then(|t| t.as_str()) {
|
||||||
|
let _ = token_tx.send(text.to_string());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
"thinking_delta" => {
|
||||||
|
if let Some(thinking) = delta.get("thinking").and_then(|t| t.as_str()) {
|
||||||
|
let _ = thinking_tx.send(thinking.to_string());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Log errors via streaming display
|
||||||
|
"error" => {
|
||||||
|
if let Some(error) = event.get("error") {
|
||||||
|
let msg = error
|
||||||
|
.get("message")
|
||||||
|
.and_then(|m| m.as_str())
|
||||||
|
.unwrap_or("unknown error");
|
||||||
|
let _ = token_tx.send(format!("\n[error: {msg}]\n"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,613 @@
|
|||||||
|
//! Tests for Claude Code JSON event processing — covers both stream event routing
|
||||||
|
//! and the top-level JSON event dispatcher.
|
||||||
|
|
||||||
|
use super::stream::handle_stream_event;
|
||||||
|
use super::*;
|
||||||
|
use crate::llm::types::{Message, Role};
|
||||||
|
use serde_json::json;
|
||||||
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
|
|
||||||
|
type Channels = (
|
||||||
|
tokio::sync::mpsc::UnboundedSender<String>,
|
||||||
|
tokio::sync::mpsc::UnboundedReceiver<String>,
|
||||||
|
tokio::sync::mpsc::UnboundedSender<String>,
|
||||||
|
tokio::sync::mpsc::UnboundedReceiver<String>,
|
||||||
|
tokio::sync::mpsc::UnboundedSender<String>,
|
||||||
|
tokio::sync::mpsc::UnboundedReceiver<String>,
|
||||||
|
std::sync::mpsc::Sender<Message>,
|
||||||
|
std::sync::mpsc::Receiver<Message>,
|
||||||
|
);
|
||||||
|
|
||||||
|
fn make_channels() -> Channels {
|
||||||
|
let (tok_tx, tok_rx) = tokio::sync::mpsc::unbounded_channel();
|
||||||
|
let (thi_tx, thi_rx) = tokio::sync::mpsc::unbounded_channel();
|
||||||
|
let (act_tx, act_rx) = tokio::sync::mpsc::unbounded_channel();
|
||||||
|
let (msg_tx, msg_rx) = std::sync::mpsc::channel();
|
||||||
|
(
|
||||||
|
tok_tx, tok_rx, thi_tx, thi_rx, act_tx, act_rx, msg_tx, msg_rx,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn handle_stream_event_text_delta_sends_token() {
|
||||||
|
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
|
||||||
|
let (ttx, _trx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||||
|
let (atx, _arx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||||
|
let event = json!({
|
||||||
|
"type": "content_block_delta",
|
||||||
|
"delta": {"type": "text_delta", "text": "hello "}
|
||||||
|
});
|
||||||
|
handle_stream_event(&event, &tx, &ttx, &atx);
|
||||||
|
drop(tx);
|
||||||
|
let tokens: Vec<_> = {
|
||||||
|
let mut v = vec![];
|
||||||
|
while let Ok(t) = rx.try_recv() {
|
||||||
|
v.push(t);
|
||||||
|
}
|
||||||
|
v
|
||||||
|
};
|
||||||
|
assert_eq!(tokens, vec!["hello "]);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn handle_stream_event_input_json_delta_not_sent() {
|
||||||
|
// Tool argument JSON deltas should NOT be sent as text tokens
|
||||||
|
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
|
||||||
|
let (ttx, _trx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||||
|
let (atx, _arx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||||
|
let event = json!({
|
||||||
|
"type": "content_block_delta",
|
||||||
|
"delta": {"type": "input_json_delta", "partial_json": "{\"path\":"}
|
||||||
|
});
|
||||||
|
handle_stream_event(&event, &tx, &ttx, &atx);
|
||||||
|
drop(tx);
|
||||||
|
let tokens: Vec<String> = {
|
||||||
|
let mut v = vec![];
|
||||||
|
while let Ok(t) = rx.try_recv() {
|
||||||
|
v.push(t);
|
||||||
|
}
|
||||||
|
v
|
||||||
|
};
|
||||||
|
assert!(tokens.is_empty());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn handle_stream_event_thinking_delta_routes_to_thinking_channel() {
|
||||||
|
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
|
||||||
|
let (ttx, mut trx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||||
|
let (atx, _arx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||||
|
let event = json!({
|
||||||
|
"type": "content_block_delta",
|
||||||
|
"delta": {"type": "thinking_delta", "thinking": "I should check the file"}
|
||||||
|
});
|
||||||
|
handle_stream_event(&event, &tx, &ttx, &atx);
|
||||||
|
drop(tx);
|
||||||
|
drop(ttx);
|
||||||
|
// thinking token must NOT appear in the regular token channel
|
||||||
|
let tokens: Vec<String> = {
|
||||||
|
let mut v = vec![];
|
||||||
|
while let Ok(t) = rx.try_recv() {
|
||||||
|
v.push(t);
|
||||||
|
}
|
||||||
|
v
|
||||||
|
};
|
||||||
|
assert!(
|
||||||
|
tokens.is_empty(),
|
||||||
|
"thinking token leaked into token channel"
|
||||||
|
);
|
||||||
|
// thinking token must appear in the dedicated thinking channel, without prefix
|
||||||
|
let thinking: Vec<String> = {
|
||||||
|
let mut v = vec![];
|
||||||
|
while let Ok(t) = trx.try_recv() {
|
||||||
|
v.push(t);
|
||||||
|
}
|
||||||
|
v
|
||||||
|
};
|
||||||
|
assert_eq!(thinking, vec!["I should check the file"]);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn handle_stream_event_error_sends_error_token() {
|
||||||
|
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
|
||||||
|
let (ttx, _trx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||||
|
let (atx, _arx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||||
|
let event = json!({
|
||||||
|
"type": "error",
|
||||||
|
"error": {"type": "overloaded_error", "message": "Overloaded"}
|
||||||
|
});
|
||||||
|
handle_stream_event(&event, &tx, &ttx, &atx);
|
||||||
|
drop(tx);
|
||||||
|
let tokens: Vec<String> = {
|
||||||
|
let mut v = vec![];
|
||||||
|
while let Ok(t) = rx.try_recv() {
|
||||||
|
v.push(t);
|
||||||
|
}
|
||||||
|
v
|
||||||
|
};
|
||||||
|
assert_eq!(tokens, vec!["\n[error: Overloaded]\n"]);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn handle_stream_event_unknown_type_is_noop() {
|
||||||
|
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
|
||||||
|
let (ttx, _trx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||||
|
let (atx, _arx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||||
|
let event = json!({"type": "ping"});
|
||||||
|
handle_stream_event(&event, &tx, &ttx, &atx);
|
||||||
|
drop(tx);
|
||||||
|
let tokens: Vec<String> = {
|
||||||
|
let mut v = vec![];
|
||||||
|
while let Ok(t) = rx.try_recv() {
|
||||||
|
v.push(t);
|
||||||
|
}
|
||||||
|
v
|
||||||
|
};
|
||||||
|
assert!(tokens.is_empty());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn handle_stream_event_tool_use_start_sends_activity() {
|
||||||
|
let (tx, _rx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||||
|
let (ttx, _trx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||||
|
let (atx, mut arx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||||
|
let event = json!({
|
||||||
|
"type": "content_block_start",
|
||||||
|
"index": 1,
|
||||||
|
"content_block": {"type": "tool_use", "id": "toolu_1", "name": "Read", "input": {}}
|
||||||
|
});
|
||||||
|
handle_stream_event(&event, &tx, &ttx, &atx);
|
||||||
|
drop(atx);
|
||||||
|
let activities: Vec<String> = {
|
||||||
|
let mut v = vec![];
|
||||||
|
while let Ok(a) = arx.try_recv() {
|
||||||
|
v.push(a);
|
||||||
|
}
|
||||||
|
v
|
||||||
|
};
|
||||||
|
assert_eq!(activities, vec!["Read"]);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn handle_stream_event_text_block_start_no_activity() {
|
||||||
|
let (tx, _rx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||||
|
let (ttx, _trx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||||
|
let (atx, mut arx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||||
|
let event = json!({
|
||||||
|
"type": "content_block_start",
|
||||||
|
"index": 0,
|
||||||
|
"content_block": {"type": "text", "text": ""}
|
||||||
|
});
|
||||||
|
handle_stream_event(&event, &tx, &ttx, &atx);
|
||||||
|
drop(atx);
|
||||||
|
let activities: Vec<String> = {
|
||||||
|
let mut v = vec![];
|
||||||
|
while let Ok(a) = arx.try_recv() {
|
||||||
|
v.push(a);
|
||||||
|
}
|
||||||
|
v
|
||||||
|
};
|
||||||
|
assert!(activities.is_empty());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn process_json_event_result_returns_true() {
|
||||||
|
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels();
|
||||||
|
let (sid_tx, _sid_rx) = tokio::sync::oneshot::channel::<String>();
|
||||||
|
let mut sid_tx_opt = Some(sid_tx);
|
||||||
|
let json = json!({"type": "result", "subtype": "success"});
|
||||||
|
assert!(process_json_event(
|
||||||
|
&json,
|
||||||
|
&tok_tx,
|
||||||
|
&thi_tx,
|
||||||
|
&act_tx,
|
||||||
|
&msg_tx,
|
||||||
|
&mut sid_tx_opt,
|
||||||
|
&AtomicBool::new(false),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn process_json_event_system_returns_false() {
|
||||||
|
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels();
|
||||||
|
let mut sid_tx = None::<tokio::sync::oneshot::Sender<String>>;
|
||||||
|
let json = json!({"type": "system", "subtype": "init", "apiKeySource": "env"});
|
||||||
|
assert!(!process_json_event(
|
||||||
|
&json,
|
||||||
|
&tok_tx,
|
||||||
|
&thi_tx,
|
||||||
|
&act_tx,
|
||||||
|
&msg_tx,
|
||||||
|
&mut sid_tx,
|
||||||
|
&AtomicBool::new(false),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn process_json_event_rate_limit_returns_false() {
|
||||||
|
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels();
|
||||||
|
let mut sid_tx = None::<tokio::sync::oneshot::Sender<String>>;
|
||||||
|
let json = json!({"type": "rate_limit_event"});
|
||||||
|
assert!(!process_json_event(
|
||||||
|
&json,
|
||||||
|
&tok_tx,
|
||||||
|
&thi_tx,
|
||||||
|
&act_tx,
|
||||||
|
&msg_tx,
|
||||||
|
&mut sid_tx,
|
||||||
|
&AtomicBool::new(false),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn process_json_event_unknown_type_returns_false() {
|
||||||
|
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels();
|
||||||
|
let mut sid_tx = None::<tokio::sync::oneshot::Sender<String>>;
|
||||||
|
let json = json!({"type": "some_future_event"});
|
||||||
|
assert!(!process_json_event(
|
||||||
|
&json,
|
||||||
|
&tok_tx,
|
||||||
|
&thi_tx,
|
||||||
|
&act_tx,
|
||||||
|
&msg_tx,
|
||||||
|
&mut sid_tx,
|
||||||
|
&AtomicBool::new(false),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn process_json_event_no_type_returns_false() {
|
||||||
|
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels();
|
||||||
|
let mut sid_tx = None::<tokio::sync::oneshot::Sender<String>>;
|
||||||
|
let json = json!({"content": "no type field"});
|
||||||
|
assert!(!process_json_event(
|
||||||
|
&json,
|
||||||
|
&tok_tx,
|
||||||
|
&thi_tx,
|
||||||
|
&act_tx,
|
||||||
|
&msg_tx,
|
||||||
|
&mut sid_tx,
|
||||||
|
&AtomicBool::new(false),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn process_json_event_captures_session_id() {
|
||||||
|
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels();
|
||||||
|
let (sid_tx, mut sid_rx) = tokio::sync::oneshot::channel::<String>();
|
||||||
|
let mut sid_tx_opt = Some(sid_tx);
|
||||||
|
let json = json!({"type": "system", "session_id": "sess-abc-123"});
|
||||||
|
process_json_event(
|
||||||
|
&json,
|
||||||
|
&tok_tx,
|
||||||
|
&thi_tx,
|
||||||
|
&act_tx,
|
||||||
|
&msg_tx,
|
||||||
|
&mut sid_tx_opt,
|
||||||
|
&AtomicBool::new(false),
|
||||||
|
);
|
||||||
|
// sid_tx should have been consumed
|
||||||
|
assert!(sid_tx_opt.is_none());
|
||||||
|
let received = sid_rx.try_recv().unwrap();
|
||||||
|
assert_eq!(received, "sess-abc-123");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn process_json_event_preserves_sid_tx_if_no_session_id() {
|
||||||
|
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels();
|
||||||
|
let (sid_tx, _sid_rx) = tokio::sync::oneshot::channel::<String>();
|
||||||
|
let mut sid_tx_opt = Some(sid_tx);
|
||||||
|
let json = json!({"type": "system"});
|
||||||
|
process_json_event(
|
||||||
|
&json,
|
||||||
|
&tok_tx,
|
||||||
|
&thi_tx,
|
||||||
|
&act_tx,
|
||||||
|
&msg_tx,
|
||||||
|
&mut sid_tx_opt,
|
||||||
|
&AtomicBool::new(false),
|
||||||
|
);
|
||||||
|
// sid_tx should still be present since no session_id in event
|
||||||
|
assert!(sid_tx_opt.is_some());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn process_json_event_stream_event_forwards_token() {
|
||||||
|
let (tok_tx, mut tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels();
|
||||||
|
let mut sid_tx = None::<tokio::sync::oneshot::Sender<String>>;
|
||||||
|
let json = json!({
|
||||||
|
"type": "stream_event",
|
||||||
|
"session_id": "s1",
|
||||||
|
"event": {
|
||||||
|
"type": "content_block_delta",
|
||||||
|
"delta": {"type": "text_delta", "text": "word"}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
assert!(!process_json_event(
|
||||||
|
&json,
|
||||||
|
&tok_tx,
|
||||||
|
&thi_tx,
|
||||||
|
&act_tx,
|
||||||
|
&msg_tx,
|
||||||
|
&mut sid_tx,
|
||||||
|
&AtomicBool::new(false),
|
||||||
|
));
|
||||||
|
drop(tok_tx);
|
||||||
|
let tokens: Vec<String> = {
|
||||||
|
let mut v = vec![];
|
||||||
|
while let Ok(t) = tok_rx.try_recv() {
|
||||||
|
v.push(t);
|
||||||
|
}
|
||||||
|
v
|
||||||
|
};
|
||||||
|
assert_eq!(tokens, vec!["word"]);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn process_json_event_stream_event_tool_use_fires_activity() {
|
||||||
|
// This is the primary activity path: stream_event wrapping content_block_start
|
||||||
|
// with a tool_use block. Requires --include-partial-messages to be enabled.
|
||||||
|
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, mut act_rx, msg_tx, _msg_rx) = make_channels();
|
||||||
|
let mut sid_tx = None::<tokio::sync::oneshot::Sender<String>>;
|
||||||
|
let json = json!({
|
||||||
|
"type": "stream_event",
|
||||||
|
"session_id": "s1",
|
||||||
|
"event": {
|
||||||
|
"type": "content_block_start",
|
||||||
|
"index": 1,
|
||||||
|
"content_block": {"type": "tool_use", "id": "toolu_abc", "name": "Bash", "input": {}}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
assert!(!process_json_event(
|
||||||
|
&json,
|
||||||
|
&tok_tx,
|
||||||
|
&thi_tx,
|
||||||
|
&act_tx,
|
||||||
|
&msg_tx,
|
||||||
|
&mut sid_tx,
|
||||||
|
&AtomicBool::new(false),
|
||||||
|
));
|
||||||
|
drop(act_tx);
|
||||||
|
let activities: Vec<String> = {
|
||||||
|
let mut v = vec![];
|
||||||
|
while let Ok(a) = act_rx.try_recv() {
|
||||||
|
v.push(a);
|
||||||
|
}
|
||||||
|
v
|
||||||
|
};
|
||||||
|
assert_eq!(activities, vec!["Bash"]);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn process_json_event_assistant_with_tool_use_fires_activity() {
|
||||||
|
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, mut act_rx, msg_tx, _msg_rx) = make_channels();
|
||||||
|
let mut sid_tx = None::<tokio::sync::oneshot::Sender<String>>;
|
||||||
|
let json = json!({
|
||||||
|
"type": "assistant",
|
||||||
|
"message": {
|
||||||
|
"content": [
|
||||||
|
{"type": "text", "text": "Let me read that file."},
|
||||||
|
{"type": "tool_use", "id": "toolu_1", "name": "Read", "input": {"file_path": "/foo.rs"}}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
});
|
||||||
|
assert!(!process_json_event(
|
||||||
|
&json,
|
||||||
|
&tok_tx,
|
||||||
|
&thi_tx,
|
||||||
|
&act_tx,
|
||||||
|
&msg_tx,
|
||||||
|
&mut sid_tx,
|
||||||
|
&AtomicBool::new(false),
|
||||||
|
));
|
||||||
|
drop(act_tx);
|
||||||
|
let activities: Vec<String> = {
|
||||||
|
let mut v = vec![];
|
||||||
|
while let Ok(a) = act_rx.try_recv() {
|
||||||
|
v.push(a);
|
||||||
|
}
|
||||||
|
v
|
||||||
|
};
|
||||||
|
assert_eq!(activities, vec!["Read"]);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn process_json_event_assistant_with_multiple_tool_uses_fires_all_activities() {
|
||||||
|
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, mut act_rx, msg_tx, _msg_rx) = make_channels();
|
||||||
|
let mut sid_tx = None::<tokio::sync::oneshot::Sender<String>>;
|
||||||
|
let json = json!({
|
||||||
|
"type": "assistant",
|
||||||
|
"message": {
|
||||||
|
"content": [
|
||||||
|
{"type": "tool_use", "id": "id1", "name": "Glob", "input": {}},
|
||||||
|
{"type": "tool_use", "id": "id2", "name": "Bash", "input": {}}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
});
|
||||||
|
assert!(!process_json_event(
|
||||||
|
&json,
|
||||||
|
&tok_tx,
|
||||||
|
&thi_tx,
|
||||||
|
&act_tx,
|
||||||
|
&msg_tx,
|
||||||
|
&mut sid_tx,
|
||||||
|
&AtomicBool::new(false),
|
||||||
|
));
|
||||||
|
drop(act_tx);
|
||||||
|
let activities: Vec<String> = {
|
||||||
|
let mut v = vec![];
|
||||||
|
while let Ok(a) = act_rx.try_recv() {
|
||||||
|
v.push(a);
|
||||||
|
}
|
||||||
|
v
|
||||||
|
};
|
||||||
|
assert_eq!(activities, vec!["Glob", "Bash"]);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn process_json_event_assistant_text_only_no_activity() {
|
||||||
|
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, mut act_rx, msg_tx, _msg_rx) = make_channels();
|
||||||
|
let mut sid_tx = None::<tokio::sync::oneshot::Sender<String>>;
|
||||||
|
let json = json!({
|
||||||
|
"type": "assistant",
|
||||||
|
"message": {
|
||||||
|
"content": [{"type": "text", "text": "Just text, no tools."}]
|
||||||
|
}
|
||||||
|
});
|
||||||
|
assert!(!process_json_event(
|
||||||
|
&json,
|
||||||
|
&tok_tx,
|
||||||
|
&thi_tx,
|
||||||
|
&act_tx,
|
||||||
|
&msg_tx,
|
||||||
|
&mut sid_tx,
|
||||||
|
&AtomicBool::new(false),
|
||||||
|
));
|
||||||
|
drop(act_tx);
|
||||||
|
let activities: Vec<String> = {
|
||||||
|
let mut v = vec![];
|
||||||
|
while let Ok(a) = act_rx.try_recv() {
|
||||||
|
v.push(a);
|
||||||
|
}
|
||||||
|
v
|
||||||
|
};
|
||||||
|
assert!(activities.is_empty());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn process_json_event_assistant_event_parses_message() {
|
||||||
|
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, msg_rx) = make_channels();
|
||||||
|
let mut sid_tx = None::<tokio::sync::oneshot::Sender<String>>;
|
||||||
|
let json = json!({
|
||||||
|
"type": "assistant",
|
||||||
|
"message": {
|
||||||
|
"content": [{"type": "text", "text": "Hi!"}]
|
||||||
|
}
|
||||||
|
});
|
||||||
|
assert!(!process_json_event(
|
||||||
|
&json,
|
||||||
|
&tok_tx,
|
||||||
|
&thi_tx,
|
||||||
|
&act_tx,
|
||||||
|
&msg_tx,
|
||||||
|
&mut sid_tx,
|
||||||
|
&AtomicBool::new(false),
|
||||||
|
));
|
||||||
|
drop(msg_tx);
|
||||||
|
let msgs: Vec<Message> = msg_rx.try_iter().collect();
|
||||||
|
assert_eq!(msgs.len(), 1);
|
||||||
|
assert_eq!(msgs[0].content, "Hi!");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn process_json_event_user_event_parses_tool_results() {
|
||||||
|
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, msg_rx) = make_channels();
|
||||||
|
let mut sid_tx = None::<tokio::sync::oneshot::Sender<String>>;
|
||||||
|
let json = json!({
|
||||||
|
"type": "user",
|
||||||
|
"message": {
|
||||||
|
"content": [{"type": "tool_result", "tool_use_id": "tid1", "content": "done"}]
|
||||||
|
}
|
||||||
|
});
|
||||||
|
assert!(!process_json_event(
|
||||||
|
&json,
|
||||||
|
&tok_tx,
|
||||||
|
&thi_tx,
|
||||||
|
&act_tx,
|
||||||
|
&msg_tx,
|
||||||
|
&mut sid_tx,
|
||||||
|
&AtomicBool::new(false),
|
||||||
|
));
|
||||||
|
drop(msg_tx);
|
||||||
|
let msgs: Vec<Message> = msg_rx.try_iter().collect();
|
||||||
|
assert_eq!(msgs.len(), 1);
|
||||||
|
assert_eq!(msgs[0].role, Role::Tool);
|
||||||
|
assert_eq!(msgs[0].content, "done");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn process_json_event_assistant_without_content_array_is_noop() {
|
||||||
|
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, msg_rx) = make_channels();
|
||||||
|
let mut sid_tx = None::<tokio::sync::oneshot::Sender<String>>;
|
||||||
|
let json = json!({
|
||||||
|
"type": "assistant",
|
||||||
|
"message": {"content": "not an array"}
|
||||||
|
});
|
||||||
|
assert!(!process_json_event(
|
||||||
|
&json,
|
||||||
|
&tok_tx,
|
||||||
|
&thi_tx,
|
||||||
|
&act_tx,
|
||||||
|
&msg_tx,
|
||||||
|
&mut sid_tx,
|
||||||
|
&AtomicBool::new(false),
|
||||||
|
));
|
||||||
|
drop(msg_tx);
|
||||||
|
let msgs: Vec<Message> = msg_rx.try_iter().collect();
|
||||||
|
assert!(msgs.is_empty());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn process_json_event_user_without_content_array_is_noop() {
|
||||||
|
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, msg_rx) = make_channels();
|
||||||
|
let mut sid_tx = None::<tokio::sync::oneshot::Sender<String>>;
|
||||||
|
let json = json!({"type": "user", "message": {"content": null}});
|
||||||
|
assert!(!process_json_event(
|
||||||
|
&json,
|
||||||
|
&tok_tx,
|
||||||
|
&thi_tx,
|
||||||
|
&act_tx,
|
||||||
|
&msg_tx,
|
||||||
|
&mut sid_tx,
|
||||||
|
&AtomicBool::new(false),
|
||||||
|
));
|
||||||
|
drop(msg_tx);
|
||||||
|
let msgs: Vec<Message> = msg_rx.try_iter().collect();
|
||||||
|
assert!(msgs.is_empty());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn process_json_event_detects_authentication_failed() {
|
||||||
|
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels();
|
||||||
|
let mut sid_tx = None::<tokio::sync::oneshot::Sender<String>>;
|
||||||
|
let auth_failed = AtomicBool::new(false);
|
||||||
|
let json = json!({
|
||||||
|
"type": "assistant",
|
||||||
|
"error": "authentication_failed",
|
||||||
|
"message": {
|
||||||
|
"content": [{"type": "text", "text": "Failed to authenticate."}]
|
||||||
|
}
|
||||||
|
});
|
||||||
|
assert!(!process_json_event(
|
||||||
|
&json,
|
||||||
|
&tok_tx,
|
||||||
|
&thi_tx,
|
||||||
|
&act_tx,
|
||||||
|
&msg_tx,
|
||||||
|
&mut sid_tx,
|
||||||
|
&auth_failed,
|
||||||
|
));
|
||||||
|
assert!(auth_failed.load(Ordering::Relaxed));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn process_json_event_no_auth_failed_for_normal_events() {
|
||||||
|
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels();
|
||||||
|
let mut sid_tx = None::<tokio::sync::oneshot::Sender<String>>;
|
||||||
|
let auth_failed = AtomicBool::new(false);
|
||||||
|
let json = json!({
|
||||||
|
"type": "assistant",
|
||||||
|
"message": {
|
||||||
|
"content": [{"type": "text", "text": "Hello!"}]
|
||||||
|
}
|
||||||
|
});
|
||||||
|
assert!(!process_json_event(
|
||||||
|
&json,
|
||||||
|
&tok_tx,
|
||||||
|
&thi_tx,
|
||||||
|
&act_tx,
|
||||||
|
&msg_tx,
|
||||||
|
&mut sid_tx,
|
||||||
|
&auth_failed,
|
||||||
|
));
|
||||||
|
assert!(!auth_failed.load(Ordering::Relaxed));
|
||||||
|
}
|
||||||
@@ -34,7 +34,7 @@ pub struct ClaudeCodeResult {
|
|||||||
mod events;
|
mod events;
|
||||||
mod parse;
|
mod parse;
|
||||||
|
|
||||||
use events::{handle_stream_event, process_json_event};
|
use events::process_json_event;
|
||||||
|
|
||||||
/// Orchestrates Claude Code CLI sessions via a PTY for streaming agent chat.
|
/// Orchestrates Claude Code CLI sessions via a PTY for streaming agent chat.
|
||||||
pub struct ClaudeCodeProvider;
|
pub struct ClaudeCodeProvider;
|
||||||
|
|||||||
Reference in New Issue
Block a user