diff --git a/server/src/llm/providers/claude_code/events.rs b/server/src/llm/providers/claude_code/events.rs deleted file mode 100644 index 303ba984..00000000 --- a/server/src/llm/providers/claude_code/events.rs +++ /dev/null @@ -1,749 +0,0 @@ -//! Claude Code event handling — streams JSON events from the CLI to the message channel. - -use serde_json::Value; - -use super::parse::{parse_assistant_message, parse_tool_results}; -use crate::llm::types::{Message, Role}; -use crate::slog; -use std::sync::atomic::{AtomicBool, Ordering}; - -pub(super) fn process_json_event( - json: &serde_json::Value, - token_tx: &tokio::sync::mpsc::UnboundedSender, - thinking_tx: &tokio::sync::mpsc::UnboundedSender, - activity_tx: &tokio::sync::mpsc::UnboundedSender, - msg_tx: &std::sync::mpsc::Sender, - sid_tx: &mut Option>, - auth_failed: &AtomicBool, -) -> bool { - let event_type = match json.get("type").and_then(|t| t.as_str()) { - Some(t) => t, - None => return false, - }; - - // Capture session_id from the first event that carries it - if let Some(tx) = sid_tx.take() { - if let Some(sid) = json.get("session_id").and_then(|s| s.as_str()) { - slog!("[pty-debug] CAPTURED session_id: {}", sid); - let _ = tx.send(sid.to_string()); - } else { - *sid_tx = Some(tx); - } - } - - // Detect authentication_failed at the top level of any event. - if json.get("error").and_then(|e| e.as_str()) == Some("authentication_failed") { - slog!("[pty-debug] Detected authentication_failed error"); - auth_failed.store(true, Ordering::Relaxed); - } - - match event_type { - "stream_event" => { - if let Some(event) = json.get("event") { - handle_stream_event(event, token_tx, thinking_tx, activity_tx); - } - false - } - "assistant" => { - if let Some(message) = json.get("message") - && let Some(content) = message.get("content").and_then(|c| c.as_array()) - { - // Fire activity signals for tool_use blocks as a fallback path. - // The primary path is via stream_event → content_block_start (real-time), - // but assistant events also carry tool_use blocks and serve as a reliable - // backup if stream_event delivery is delayed or missed. - for block in content { - if block.get("type").and_then(|t| t.as_str()) == Some("tool_use") - && let Some(name) = block.get("name").and_then(|n| n.as_str()) - { - let _ = activity_tx.send(name.to_string()); - } - } - parse_assistant_message(content, msg_tx); - } - false - } - "user" => { - if let Some(message) = json.get("message") - && let Some(content) = message.get("content").and_then(|c| c.as_array()) - { - parse_tool_results(content, msg_tx); - } - false - } - "result" => true, - // system, rate_limit_event, and unknown types are no-ops - _ => false, - } -} - -/// Parse a complete `assistant` message content array. -/// -/// Extracts text blocks into `content` and tool_use blocks into `tool_calls`, -/// then sends a single `Message { role: Assistant }` via `msg_tx`. -/// This is the authoritative source for the final message structure — streaming -pub(super) fn handle_stream_event( - event: &serde_json::Value, - token_tx: &tokio::sync::mpsc::UnboundedSender, - thinking_tx: &tokio::sync::mpsc::UnboundedSender, - activity_tx: &tokio::sync::mpsc::UnboundedSender, -) { - let event_type = event.get("type").and_then(|t| t.as_str()).unwrap_or(""); - - match event_type { - "content_block_start" => { - if let Some(cb) = event.get("content_block") - && cb.get("type").and_then(|t| t.as_str()) == Some("tool_use") - && let Some(name) = cb.get("name").and_then(|n| n.as_str()) - { - let _ = activity_tx.send(name.to_string()); - } - } - // Text content streaming — only text_delta, not input_json_delta (tool args) - "content_block_delta" => { - if let Some(delta) = event.get("delta") { - let delta_type = delta.get("type").and_then(|t| t.as_str()).unwrap_or(""); - match delta_type { - "text_delta" => { - if let Some(text) = delta.get("text").and_then(|t| t.as_str()) { - let _ = token_tx.send(text.to_string()); - } - } - "thinking_delta" => { - if let Some(thinking) = delta.get("thinking").and_then(|t| t.as_str()) { - let _ = thinking_tx.send(thinking.to_string()); - } - } - _ => {} - } - } - } - // Log errors via streaming display - "error" => { - if let Some(error) = event.get("error") { - let msg = error - .get("message") - .and_then(|m| m.as_str()) - .unwrap_or("unknown error"); - let _ = token_tx.send(format!("\n[error: {msg}]\n")); - } - } - _ => {} - } -} - -#[cfg(test)] -mod tests { - use super::*; - use serde_json::json; - - type Channels = ( - tokio::sync::mpsc::UnboundedSender, - tokio::sync::mpsc::UnboundedReceiver, - tokio::sync::mpsc::UnboundedSender, - tokio::sync::mpsc::UnboundedReceiver, - tokio::sync::mpsc::UnboundedSender, - tokio::sync::mpsc::UnboundedReceiver, - std::sync::mpsc::Sender, - std::sync::mpsc::Receiver, - ); - - fn make_channels() -> Channels { - let (tok_tx, tok_rx) = tokio::sync::mpsc::unbounded_channel(); - let (thi_tx, thi_rx) = tokio::sync::mpsc::unbounded_channel(); - let (act_tx, act_rx) = tokio::sync::mpsc::unbounded_channel(); - let (msg_tx, msg_rx) = std::sync::mpsc::channel(); - ( - tok_tx, tok_rx, thi_tx, thi_rx, act_tx, act_rx, msg_tx, msg_rx, - ) - } - - #[test] - fn handle_stream_event_text_delta_sends_token() { - let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); - let (ttx, _trx) = tokio::sync::mpsc::unbounded_channel::(); - let (atx, _arx) = tokio::sync::mpsc::unbounded_channel::(); - let event = json!({ - "type": "content_block_delta", - "delta": {"type": "text_delta", "text": "hello "} - }); - handle_stream_event(&event, &tx, &ttx, &atx); - drop(tx); - let tokens: Vec<_> = { - let mut v = vec![]; - while let Ok(t) = rx.try_recv() { - v.push(t); - } - v - }; - assert_eq!(tokens, vec!["hello "]); - } - - #[test] - fn handle_stream_event_input_json_delta_not_sent() { - // Tool argument JSON deltas should NOT be sent as text tokens - let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); - let (ttx, _trx) = tokio::sync::mpsc::unbounded_channel::(); - let (atx, _arx) = tokio::sync::mpsc::unbounded_channel::(); - let event = json!({ - "type": "content_block_delta", - "delta": {"type": "input_json_delta", "partial_json": "{\"path\":"} - }); - handle_stream_event(&event, &tx, &ttx, &atx); - drop(tx); - let tokens: Vec = { - let mut v = vec![]; - while let Ok(t) = rx.try_recv() { - v.push(t); - } - v - }; - assert!(tokens.is_empty()); - } - - #[test] - fn handle_stream_event_thinking_delta_routes_to_thinking_channel() { - let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); - let (ttx, mut trx) = tokio::sync::mpsc::unbounded_channel::(); - let (atx, _arx) = tokio::sync::mpsc::unbounded_channel::(); - let event = json!({ - "type": "content_block_delta", - "delta": {"type": "thinking_delta", "thinking": "I should check the file"} - }); - handle_stream_event(&event, &tx, &ttx, &atx); - drop(tx); - drop(ttx); - // thinking token must NOT appear in the regular token channel - let tokens: Vec = { - let mut v = vec![]; - while let Ok(t) = rx.try_recv() { - v.push(t); - } - v - }; - assert!( - tokens.is_empty(), - "thinking token leaked into token channel" - ); - // thinking token must appear in the dedicated thinking channel, without prefix - let thinking: Vec = { - let mut v = vec![]; - while let Ok(t) = trx.try_recv() { - v.push(t); - } - v - }; - assert_eq!(thinking, vec!["I should check the file"]); - } - - #[test] - fn handle_stream_event_error_sends_error_token() { - let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); - let (ttx, _trx) = tokio::sync::mpsc::unbounded_channel::(); - let (atx, _arx) = tokio::sync::mpsc::unbounded_channel::(); - let event = json!({ - "type": "error", - "error": {"type": "overloaded_error", "message": "Overloaded"} - }); - handle_stream_event(&event, &tx, &ttx, &atx); - drop(tx); - let tokens: Vec = { - let mut v = vec![]; - while let Ok(t) = rx.try_recv() { - v.push(t); - } - v - }; - assert_eq!(tokens, vec!["\n[error: Overloaded]\n"]); - } - - #[test] - fn handle_stream_event_unknown_type_is_noop() { - let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); - let (ttx, _trx) = tokio::sync::mpsc::unbounded_channel::(); - let (atx, _arx) = tokio::sync::mpsc::unbounded_channel::(); - let event = json!({"type": "ping"}); - handle_stream_event(&event, &tx, &ttx, &atx); - drop(tx); - let tokens: Vec = { - let mut v = vec![]; - while let Ok(t) = rx.try_recv() { - v.push(t); - } - v - }; - assert!(tokens.is_empty()); - } - - #[test] - fn handle_stream_event_tool_use_start_sends_activity() { - let (tx, _rx) = tokio::sync::mpsc::unbounded_channel::(); - let (ttx, _trx) = tokio::sync::mpsc::unbounded_channel::(); - let (atx, mut arx) = tokio::sync::mpsc::unbounded_channel::(); - let event = json!({ - "type": "content_block_start", - "index": 1, - "content_block": {"type": "tool_use", "id": "toolu_1", "name": "Read", "input": {}} - }); - handle_stream_event(&event, &tx, &ttx, &atx); - drop(atx); - let activities: Vec = { - let mut v = vec![]; - while let Ok(a) = arx.try_recv() { - v.push(a); - } - v - }; - assert_eq!(activities, vec!["Read"]); - } - - #[test] - fn handle_stream_event_text_block_start_no_activity() { - let (tx, _rx) = tokio::sync::mpsc::unbounded_channel::(); - let (ttx, _trx) = tokio::sync::mpsc::unbounded_channel::(); - let (atx, mut arx) = tokio::sync::mpsc::unbounded_channel::(); - let event = json!({ - "type": "content_block_start", - "index": 0, - "content_block": {"type": "text", "text": ""} - }); - handle_stream_event(&event, &tx, &ttx, &atx); - drop(atx); - let activities: Vec = { - let mut v = vec![]; - while let Ok(a) = arx.try_recv() { - v.push(a); - } - v - }; - assert!(activities.is_empty()); - } - - #[test] - fn process_json_event_result_returns_true() { - let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels(); - let (sid_tx, _sid_rx) = tokio::sync::oneshot::channel::(); - let mut sid_tx_opt = Some(sid_tx); - let json = json!({"type": "result", "subtype": "success"}); - assert!(process_json_event( - &json, - &tok_tx, - &thi_tx, - &act_tx, - &msg_tx, - &mut sid_tx_opt, - &AtomicBool::new(false), - )); - } - - #[test] - fn process_json_event_system_returns_false() { - let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels(); - let mut sid_tx = None::>; - let json = json!({"type": "system", "subtype": "init", "apiKeySource": "env"}); - assert!(!process_json_event( - &json, - &tok_tx, - &thi_tx, - &act_tx, - &msg_tx, - &mut sid_tx, - &AtomicBool::new(false), - )); - } - - #[test] - fn process_json_event_rate_limit_returns_false() { - let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels(); - let mut sid_tx = None::>; - let json = json!({"type": "rate_limit_event"}); - assert!(!process_json_event( - &json, - &tok_tx, - &thi_tx, - &act_tx, - &msg_tx, - &mut sid_tx, - &AtomicBool::new(false), - )); - } - - #[test] - fn process_json_event_unknown_type_returns_false() { - let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels(); - let mut sid_tx = None::>; - let json = json!({"type": "some_future_event"}); - assert!(!process_json_event( - &json, - &tok_tx, - &thi_tx, - &act_tx, - &msg_tx, - &mut sid_tx, - &AtomicBool::new(false), - )); - } - - #[test] - fn process_json_event_no_type_returns_false() { - let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels(); - let mut sid_tx = None::>; - let json = json!({"content": "no type field"}); - assert!(!process_json_event( - &json, - &tok_tx, - &thi_tx, - &act_tx, - &msg_tx, - &mut sid_tx, - &AtomicBool::new(false), - )); - } - - #[test] - fn process_json_event_captures_session_id() { - let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels(); - let (sid_tx, mut sid_rx) = tokio::sync::oneshot::channel::(); - let mut sid_tx_opt = Some(sid_tx); - let json = json!({"type": "system", "session_id": "sess-abc-123"}); - process_json_event( - &json, - &tok_tx, - &thi_tx, - &act_tx, - &msg_tx, - &mut sid_tx_opt, - &AtomicBool::new(false), - ); - // sid_tx should have been consumed - assert!(sid_tx_opt.is_none()); - let received = sid_rx.try_recv().unwrap(); - assert_eq!(received, "sess-abc-123"); - } - - #[test] - fn process_json_event_preserves_sid_tx_if_no_session_id() { - let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels(); - let (sid_tx, _sid_rx) = tokio::sync::oneshot::channel::(); - let mut sid_tx_opt = Some(sid_tx); - let json = json!({"type": "system"}); - process_json_event( - &json, - &tok_tx, - &thi_tx, - &act_tx, - &msg_tx, - &mut sid_tx_opt, - &AtomicBool::new(false), - ); - // sid_tx should still be present since no session_id in event - assert!(sid_tx_opt.is_some()); - } - - #[test] - fn process_json_event_stream_event_forwards_token() { - let (tok_tx, mut tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = - make_channels(); - let mut sid_tx = None::>; - let json = json!({ - "type": "stream_event", - "session_id": "s1", - "event": { - "type": "content_block_delta", - "delta": {"type": "text_delta", "text": "word"} - } - }); - assert!(!process_json_event( - &json, - &tok_tx, - &thi_tx, - &act_tx, - &msg_tx, - &mut sid_tx, - &AtomicBool::new(false), - )); - drop(tok_tx); - let tokens: Vec = { - let mut v = vec![]; - while let Ok(t) = tok_rx.try_recv() { - v.push(t); - } - v - }; - assert_eq!(tokens, vec!["word"]); - } - - #[test] - fn process_json_event_stream_event_tool_use_fires_activity() { - // This is the primary activity path: stream_event wrapping content_block_start - // with a tool_use block. Requires --include-partial-messages to be enabled. - let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, mut act_rx, msg_tx, _msg_rx) = - make_channels(); - let mut sid_tx = None::>; - let json = json!({ - "type": "stream_event", - "session_id": "s1", - "event": { - "type": "content_block_start", - "index": 1, - "content_block": {"type": "tool_use", "id": "toolu_abc", "name": "Bash", "input": {}} - } - }); - assert!(!process_json_event( - &json, - &tok_tx, - &thi_tx, - &act_tx, - &msg_tx, - &mut sid_tx, - &AtomicBool::new(false), - )); - drop(act_tx); - let activities: Vec = { - let mut v = vec![]; - while let Ok(a) = act_rx.try_recv() { - v.push(a); - } - v - }; - assert_eq!(activities, vec!["Bash"]); - } - - #[test] - fn process_json_event_assistant_with_tool_use_fires_activity() { - let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, mut act_rx, msg_tx, _msg_rx) = - make_channels(); - let mut sid_tx = None::>; - let json = json!({ - "type": "assistant", - "message": { - "content": [ - {"type": "text", "text": "Let me read that file."}, - {"type": "tool_use", "id": "toolu_1", "name": "Read", "input": {"file_path": "/foo.rs"}} - ] - } - }); - assert!(!process_json_event( - &json, - &tok_tx, - &thi_tx, - &act_tx, - &msg_tx, - &mut sid_tx, - &AtomicBool::new(false), - )); - drop(act_tx); - let activities: Vec = { - let mut v = vec![]; - while let Ok(a) = act_rx.try_recv() { - v.push(a); - } - v - }; - assert_eq!(activities, vec!["Read"]); - } - - #[test] - fn process_json_event_assistant_with_multiple_tool_uses_fires_all_activities() { - let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, mut act_rx, msg_tx, _msg_rx) = - make_channels(); - let mut sid_tx = None::>; - let json = json!({ - "type": "assistant", - "message": { - "content": [ - {"type": "tool_use", "id": "id1", "name": "Glob", "input": {}}, - {"type": "tool_use", "id": "id2", "name": "Bash", "input": {}} - ] - } - }); - assert!(!process_json_event( - &json, - &tok_tx, - &thi_tx, - &act_tx, - &msg_tx, - &mut sid_tx, - &AtomicBool::new(false), - )); - drop(act_tx); - let activities: Vec = { - let mut v = vec![]; - while let Ok(a) = act_rx.try_recv() { - v.push(a); - } - v - }; - assert_eq!(activities, vec!["Glob", "Bash"]); - } - - #[test] - fn process_json_event_assistant_text_only_no_activity() { - let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, mut act_rx, msg_tx, _msg_rx) = - make_channels(); - let mut sid_tx = None::>; - let json = json!({ - "type": "assistant", - "message": { - "content": [{"type": "text", "text": "Just text, no tools."}] - } - }); - assert!(!process_json_event( - &json, - &tok_tx, - &thi_tx, - &act_tx, - &msg_tx, - &mut sid_tx, - &AtomicBool::new(false), - )); - drop(act_tx); - let activities: Vec = { - let mut v = vec![]; - while let Ok(a) = act_rx.try_recv() { - v.push(a); - } - v - }; - assert!(activities.is_empty()); - } - - #[test] - fn process_json_event_assistant_event_parses_message() { - let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, msg_rx) = make_channels(); - let mut sid_tx = None::>; - let json = json!({ - "type": "assistant", - "message": { - "content": [{"type": "text", "text": "Hi!"}] - } - }); - assert!(!process_json_event( - &json, - &tok_tx, - &thi_tx, - &act_tx, - &msg_tx, - &mut sid_tx, - &AtomicBool::new(false), - )); - drop(msg_tx); - let msgs: Vec = msg_rx.try_iter().collect(); - assert_eq!(msgs.len(), 1); - assert_eq!(msgs[0].content, "Hi!"); - } - - #[test] - fn process_json_event_user_event_parses_tool_results() { - let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, msg_rx) = make_channels(); - let mut sid_tx = None::>; - let json = json!({ - "type": "user", - "message": { - "content": [{"type": "tool_result", "tool_use_id": "tid1", "content": "done"}] - } - }); - assert!(!process_json_event( - &json, - &tok_tx, - &thi_tx, - &act_tx, - &msg_tx, - &mut sid_tx, - &AtomicBool::new(false), - )); - drop(msg_tx); - let msgs: Vec = msg_rx.try_iter().collect(); - assert_eq!(msgs.len(), 1); - assert_eq!(msgs[0].role, Role::Tool); - assert_eq!(msgs[0].content, "done"); - } - - #[test] - fn process_json_event_assistant_without_content_array_is_noop() { - let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, msg_rx) = make_channels(); - let mut sid_tx = None::>; - let json = json!({ - "type": "assistant", - "message": {"content": "not an array"} - }); - assert!(!process_json_event( - &json, - &tok_tx, - &thi_tx, - &act_tx, - &msg_tx, - &mut sid_tx, - &AtomicBool::new(false), - )); - drop(msg_tx); - let msgs: Vec = msg_rx.try_iter().collect(); - assert!(msgs.is_empty()); - } - - #[test] - fn process_json_event_user_without_content_array_is_noop() { - let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, msg_rx) = make_channels(); - let mut sid_tx = None::>; - let json = json!({"type": "user", "message": {"content": null}}); - assert!(!process_json_event( - &json, - &tok_tx, - &thi_tx, - &act_tx, - &msg_tx, - &mut sid_tx, - &AtomicBool::new(false), - )); - drop(msg_tx); - let msgs: Vec = msg_rx.try_iter().collect(); - assert!(msgs.is_empty()); - } - - #[test] - fn process_json_event_detects_authentication_failed() { - let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels(); - let mut sid_tx = None::>; - let auth_failed = AtomicBool::new(false); - let json = json!({ - "type": "assistant", - "error": "authentication_failed", - "message": { - "content": [{"type": "text", "text": "Failed to authenticate."}] - } - }); - assert!(!process_json_event( - &json, - &tok_tx, - &thi_tx, - &act_tx, - &msg_tx, - &mut sid_tx, - &auth_failed, - )); - assert!(auth_failed.load(Ordering::Relaxed)); - } - - #[test] - fn process_json_event_no_auth_failed_for_normal_events() { - let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels(); - let mut sid_tx = None::>; - let auth_failed = AtomicBool::new(false); - let json = json!({ - "type": "assistant", - "message": { - "content": [{"type": "text", "text": "Hello!"}] - } - }); - assert!(!process_json_event( - &json, - &tok_tx, - &thi_tx, - &act_tx, - &msg_tx, - &mut sid_tx, - &auth_failed, - )); - assert!(!auth_failed.load(Ordering::Relaxed)); - } -} diff --git a/server/src/llm/providers/claude_code/events/mod.rs b/server/src/llm/providers/claude_code/events/mod.rs new file mode 100644 index 00000000..b3e3b398 --- /dev/null +++ b/server/src/llm/providers/claude_code/events/mod.rs @@ -0,0 +1,87 @@ +//! Claude Code event handling — dispatches JSON events from the CLI to message channels. + +mod stream; + +#[cfg(test)] +mod tests; + +use super::parse::{parse_assistant_message, parse_tool_results}; +use crate::llm::types::Message; +use crate::slog; +use std::sync::atomic::{AtomicBool, Ordering}; +use stream::handle_stream_event; + +/// Dispatch a top-level JSON event from the Claude Code CLI. +/// +/// Routes the event to the appropriate handler based on `type`, emitting tokens, +/// thinking output, activity signals, and parsed messages to their respective channels. +/// Returns `true` only for `"result"` events, which signal that the CLI turn is complete. +pub(super) fn process_json_event( + json: &serde_json::Value, + token_tx: &tokio::sync::mpsc::UnboundedSender, + thinking_tx: &tokio::sync::mpsc::UnboundedSender, + activity_tx: &tokio::sync::mpsc::UnboundedSender, + msg_tx: &std::sync::mpsc::Sender, + sid_tx: &mut Option>, + auth_failed: &AtomicBool, +) -> bool { + let event_type = match json.get("type").and_then(|t| t.as_str()) { + Some(t) => t, + None => return false, + }; + + // Capture session_id from the first event that carries it + if let Some(tx) = sid_tx.take() { + if let Some(sid) = json.get("session_id").and_then(|s| s.as_str()) { + slog!("[pty-debug] CAPTURED session_id: {}", sid); + let _ = tx.send(sid.to_string()); + } else { + *sid_tx = Some(tx); + } + } + + // Detect authentication_failed at the top level of any event. + if json.get("error").and_then(|e| e.as_str()) == Some("authentication_failed") { + slog!("[pty-debug] Detected authentication_failed error"); + auth_failed.store(true, Ordering::Relaxed); + } + + match event_type { + "stream_event" => { + if let Some(event) = json.get("event") { + handle_stream_event(event, token_tx, thinking_tx, activity_tx); + } + false + } + "assistant" => { + if let Some(message) = json.get("message") + && let Some(content) = message.get("content").and_then(|c| c.as_array()) + { + // Fire activity signals for tool_use blocks as a fallback path. + // The primary path is via stream_event → content_block_start (real-time), + // but assistant events also carry tool_use blocks and serve as a reliable + // backup if stream_event delivery is delayed or missed. + for block in content { + if block.get("type").and_then(|t| t.as_str()) == Some("tool_use") + && let Some(name) = block.get("name").and_then(|n| n.as_str()) + { + let _ = activity_tx.send(name.to_string()); + } + } + parse_assistant_message(content, msg_tx); + } + false + } + "user" => { + if let Some(message) = json.get("message") + && let Some(content) = message.get("content").and_then(|c| c.as_array()) + { + parse_tool_results(content, msg_tx); + } + false + } + "result" => true, + // system, rate_limit_event, and unknown types are no-ops + _ => false, + } +} diff --git a/server/src/llm/providers/claude_code/events/stream.rs b/server/src/llm/providers/claude_code/events/stream.rs new file mode 100644 index 00000000..46fe652d --- /dev/null +++ b/server/src/llm/providers/claude_code/events/stream.rs @@ -0,0 +1,55 @@ +//! Stream event handling — processes `stream_event` payloads from the Claude Code CLI. + +/// Handle a single unwrapped stream event (the inner `event` object from a `stream_event` JSON). +/// +/// Routes `content_block_delta` text tokens to `token_tx`, thinking tokens to `thinking_tx`, +/// `content_block_start` tool-use names to `activity_tx`, and error messages back to `token_tx`. +pub(super) fn handle_stream_event( + event: &serde_json::Value, + token_tx: &tokio::sync::mpsc::UnboundedSender, + thinking_tx: &tokio::sync::mpsc::UnboundedSender, + activity_tx: &tokio::sync::mpsc::UnboundedSender, +) { + let event_type = event.get("type").and_then(|t| t.as_str()).unwrap_or(""); + + match event_type { + "content_block_start" => { + if let Some(cb) = event.get("content_block") + && cb.get("type").and_then(|t| t.as_str()) == Some("tool_use") + && let Some(name) = cb.get("name").and_then(|n| n.as_str()) + { + let _ = activity_tx.send(name.to_string()); + } + } + // Text content streaming — only text_delta, not input_json_delta (tool args) + "content_block_delta" => { + if let Some(delta) = event.get("delta") { + let delta_type = delta.get("type").and_then(|t| t.as_str()).unwrap_or(""); + match delta_type { + "text_delta" => { + if let Some(text) = delta.get("text").and_then(|t| t.as_str()) { + let _ = token_tx.send(text.to_string()); + } + } + "thinking_delta" => { + if let Some(thinking) = delta.get("thinking").and_then(|t| t.as_str()) { + let _ = thinking_tx.send(thinking.to_string()); + } + } + _ => {} + } + } + } + // Log errors via streaming display + "error" => { + if let Some(error) = event.get("error") { + let msg = error + .get("message") + .and_then(|m| m.as_str()) + .unwrap_or("unknown error"); + let _ = token_tx.send(format!("\n[error: {msg}]\n")); + } + } + _ => {} + } +} diff --git a/server/src/llm/providers/claude_code/events/tests.rs b/server/src/llm/providers/claude_code/events/tests.rs new file mode 100644 index 00000000..c8df4cc1 --- /dev/null +++ b/server/src/llm/providers/claude_code/events/tests.rs @@ -0,0 +1,613 @@ +//! Tests for Claude Code JSON event processing — covers both stream event routing +//! and the top-level JSON event dispatcher. + +use super::stream::handle_stream_event; +use super::*; +use crate::llm::types::{Message, Role}; +use serde_json::json; +use std::sync::atomic::{AtomicBool, Ordering}; + +type Channels = ( + tokio::sync::mpsc::UnboundedSender, + tokio::sync::mpsc::UnboundedReceiver, + tokio::sync::mpsc::UnboundedSender, + tokio::sync::mpsc::UnboundedReceiver, + tokio::sync::mpsc::UnboundedSender, + tokio::sync::mpsc::UnboundedReceiver, + std::sync::mpsc::Sender, + std::sync::mpsc::Receiver, +); + +fn make_channels() -> Channels { + let (tok_tx, tok_rx) = tokio::sync::mpsc::unbounded_channel(); + let (thi_tx, thi_rx) = tokio::sync::mpsc::unbounded_channel(); + let (act_tx, act_rx) = tokio::sync::mpsc::unbounded_channel(); + let (msg_tx, msg_rx) = std::sync::mpsc::channel(); + ( + tok_tx, tok_rx, thi_tx, thi_rx, act_tx, act_rx, msg_tx, msg_rx, + ) +} + +#[test] +fn handle_stream_event_text_delta_sends_token() { + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + let (ttx, _trx) = tokio::sync::mpsc::unbounded_channel::(); + let (atx, _arx) = tokio::sync::mpsc::unbounded_channel::(); + let event = json!({ + "type": "content_block_delta", + "delta": {"type": "text_delta", "text": "hello "} + }); + handle_stream_event(&event, &tx, &ttx, &atx); + drop(tx); + let tokens: Vec<_> = { + let mut v = vec![]; + while let Ok(t) = rx.try_recv() { + v.push(t); + } + v + }; + assert_eq!(tokens, vec!["hello "]); +} + +#[test] +fn handle_stream_event_input_json_delta_not_sent() { + // Tool argument JSON deltas should NOT be sent as text tokens + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + let (ttx, _trx) = tokio::sync::mpsc::unbounded_channel::(); + let (atx, _arx) = tokio::sync::mpsc::unbounded_channel::(); + let event = json!({ + "type": "content_block_delta", + "delta": {"type": "input_json_delta", "partial_json": "{\"path\":"} + }); + handle_stream_event(&event, &tx, &ttx, &atx); + drop(tx); + let tokens: Vec = { + let mut v = vec![]; + while let Ok(t) = rx.try_recv() { + v.push(t); + } + v + }; + assert!(tokens.is_empty()); +} + +#[test] +fn handle_stream_event_thinking_delta_routes_to_thinking_channel() { + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + let (ttx, mut trx) = tokio::sync::mpsc::unbounded_channel::(); + let (atx, _arx) = tokio::sync::mpsc::unbounded_channel::(); + let event = json!({ + "type": "content_block_delta", + "delta": {"type": "thinking_delta", "thinking": "I should check the file"} + }); + handle_stream_event(&event, &tx, &ttx, &atx); + drop(tx); + drop(ttx); + // thinking token must NOT appear in the regular token channel + let tokens: Vec = { + let mut v = vec![]; + while let Ok(t) = rx.try_recv() { + v.push(t); + } + v + }; + assert!( + tokens.is_empty(), + "thinking token leaked into token channel" + ); + // thinking token must appear in the dedicated thinking channel, without prefix + let thinking: Vec = { + let mut v = vec![]; + while let Ok(t) = trx.try_recv() { + v.push(t); + } + v + }; + assert_eq!(thinking, vec!["I should check the file"]); +} + +#[test] +fn handle_stream_event_error_sends_error_token() { + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + let (ttx, _trx) = tokio::sync::mpsc::unbounded_channel::(); + let (atx, _arx) = tokio::sync::mpsc::unbounded_channel::(); + let event = json!({ + "type": "error", + "error": {"type": "overloaded_error", "message": "Overloaded"} + }); + handle_stream_event(&event, &tx, &ttx, &atx); + drop(tx); + let tokens: Vec = { + let mut v = vec![]; + while let Ok(t) = rx.try_recv() { + v.push(t); + } + v + }; + assert_eq!(tokens, vec!["\n[error: Overloaded]\n"]); +} + +#[test] +fn handle_stream_event_unknown_type_is_noop() { + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + let (ttx, _trx) = tokio::sync::mpsc::unbounded_channel::(); + let (atx, _arx) = tokio::sync::mpsc::unbounded_channel::(); + let event = json!({"type": "ping"}); + handle_stream_event(&event, &tx, &ttx, &atx); + drop(tx); + let tokens: Vec = { + let mut v = vec![]; + while let Ok(t) = rx.try_recv() { + v.push(t); + } + v + }; + assert!(tokens.is_empty()); +} + +#[test] +fn handle_stream_event_tool_use_start_sends_activity() { + let (tx, _rx) = tokio::sync::mpsc::unbounded_channel::(); + let (ttx, _trx) = tokio::sync::mpsc::unbounded_channel::(); + let (atx, mut arx) = tokio::sync::mpsc::unbounded_channel::(); + let event = json!({ + "type": "content_block_start", + "index": 1, + "content_block": {"type": "tool_use", "id": "toolu_1", "name": "Read", "input": {}} + }); + handle_stream_event(&event, &tx, &ttx, &atx); + drop(atx); + let activities: Vec = { + let mut v = vec![]; + while let Ok(a) = arx.try_recv() { + v.push(a); + } + v + }; + assert_eq!(activities, vec!["Read"]); +} + +#[test] +fn handle_stream_event_text_block_start_no_activity() { + let (tx, _rx) = tokio::sync::mpsc::unbounded_channel::(); + let (ttx, _trx) = tokio::sync::mpsc::unbounded_channel::(); + let (atx, mut arx) = tokio::sync::mpsc::unbounded_channel::(); + let event = json!({ + "type": "content_block_start", + "index": 0, + "content_block": {"type": "text", "text": ""} + }); + handle_stream_event(&event, &tx, &ttx, &atx); + drop(atx); + let activities: Vec = { + let mut v = vec![]; + while let Ok(a) = arx.try_recv() { + v.push(a); + } + v + }; + assert!(activities.is_empty()); +} + +#[test] +fn process_json_event_result_returns_true() { + let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels(); + let (sid_tx, _sid_rx) = tokio::sync::oneshot::channel::(); + let mut sid_tx_opt = Some(sid_tx); + let json = json!({"type": "result", "subtype": "success"}); + assert!(process_json_event( + &json, + &tok_tx, + &thi_tx, + &act_tx, + &msg_tx, + &mut sid_tx_opt, + &AtomicBool::new(false), + )); +} + +#[test] +fn process_json_event_system_returns_false() { + let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels(); + let mut sid_tx = None::>; + let json = json!({"type": "system", "subtype": "init", "apiKeySource": "env"}); + assert!(!process_json_event( + &json, + &tok_tx, + &thi_tx, + &act_tx, + &msg_tx, + &mut sid_tx, + &AtomicBool::new(false), + )); +} + +#[test] +fn process_json_event_rate_limit_returns_false() { + let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels(); + let mut sid_tx = None::>; + let json = json!({"type": "rate_limit_event"}); + assert!(!process_json_event( + &json, + &tok_tx, + &thi_tx, + &act_tx, + &msg_tx, + &mut sid_tx, + &AtomicBool::new(false), + )); +} + +#[test] +fn process_json_event_unknown_type_returns_false() { + let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels(); + let mut sid_tx = None::>; + let json = json!({"type": "some_future_event"}); + assert!(!process_json_event( + &json, + &tok_tx, + &thi_tx, + &act_tx, + &msg_tx, + &mut sid_tx, + &AtomicBool::new(false), + )); +} + +#[test] +fn process_json_event_no_type_returns_false() { + let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels(); + let mut sid_tx = None::>; + let json = json!({"content": "no type field"}); + assert!(!process_json_event( + &json, + &tok_tx, + &thi_tx, + &act_tx, + &msg_tx, + &mut sid_tx, + &AtomicBool::new(false), + )); +} + +#[test] +fn process_json_event_captures_session_id() { + let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels(); + let (sid_tx, mut sid_rx) = tokio::sync::oneshot::channel::(); + let mut sid_tx_opt = Some(sid_tx); + let json = json!({"type": "system", "session_id": "sess-abc-123"}); + process_json_event( + &json, + &tok_tx, + &thi_tx, + &act_tx, + &msg_tx, + &mut sid_tx_opt, + &AtomicBool::new(false), + ); + // sid_tx should have been consumed + assert!(sid_tx_opt.is_none()); + let received = sid_rx.try_recv().unwrap(); + assert_eq!(received, "sess-abc-123"); +} + +#[test] +fn process_json_event_preserves_sid_tx_if_no_session_id() { + let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels(); + let (sid_tx, _sid_rx) = tokio::sync::oneshot::channel::(); + let mut sid_tx_opt = Some(sid_tx); + let json = json!({"type": "system"}); + process_json_event( + &json, + &tok_tx, + &thi_tx, + &act_tx, + &msg_tx, + &mut sid_tx_opt, + &AtomicBool::new(false), + ); + // sid_tx should still be present since no session_id in event + assert!(sid_tx_opt.is_some()); +} + +#[test] +fn process_json_event_stream_event_forwards_token() { + let (tok_tx, mut tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels(); + let mut sid_tx = None::>; + let json = json!({ + "type": "stream_event", + "session_id": "s1", + "event": { + "type": "content_block_delta", + "delta": {"type": "text_delta", "text": "word"} + } + }); + assert!(!process_json_event( + &json, + &tok_tx, + &thi_tx, + &act_tx, + &msg_tx, + &mut sid_tx, + &AtomicBool::new(false), + )); + drop(tok_tx); + let tokens: Vec = { + let mut v = vec![]; + while let Ok(t) = tok_rx.try_recv() { + v.push(t); + } + v + }; + assert_eq!(tokens, vec!["word"]); +} + +#[test] +fn process_json_event_stream_event_tool_use_fires_activity() { + // This is the primary activity path: stream_event wrapping content_block_start + // with a tool_use block. Requires --include-partial-messages to be enabled. + let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, mut act_rx, msg_tx, _msg_rx) = make_channels(); + let mut sid_tx = None::>; + let json = json!({ + "type": "stream_event", + "session_id": "s1", + "event": { + "type": "content_block_start", + "index": 1, + "content_block": {"type": "tool_use", "id": "toolu_abc", "name": "Bash", "input": {}} + } + }); + assert!(!process_json_event( + &json, + &tok_tx, + &thi_tx, + &act_tx, + &msg_tx, + &mut sid_tx, + &AtomicBool::new(false), + )); + drop(act_tx); + let activities: Vec = { + let mut v = vec![]; + while let Ok(a) = act_rx.try_recv() { + v.push(a); + } + v + }; + assert_eq!(activities, vec!["Bash"]); +} + +#[test] +fn process_json_event_assistant_with_tool_use_fires_activity() { + let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, mut act_rx, msg_tx, _msg_rx) = make_channels(); + let mut sid_tx = None::>; + let json = json!({ + "type": "assistant", + "message": { + "content": [ + {"type": "text", "text": "Let me read that file."}, + {"type": "tool_use", "id": "toolu_1", "name": "Read", "input": {"file_path": "/foo.rs"}} + ] + } + }); + assert!(!process_json_event( + &json, + &tok_tx, + &thi_tx, + &act_tx, + &msg_tx, + &mut sid_tx, + &AtomicBool::new(false), + )); + drop(act_tx); + let activities: Vec = { + let mut v = vec![]; + while let Ok(a) = act_rx.try_recv() { + v.push(a); + } + v + }; + assert_eq!(activities, vec!["Read"]); +} + +#[test] +fn process_json_event_assistant_with_multiple_tool_uses_fires_all_activities() { + let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, mut act_rx, msg_tx, _msg_rx) = make_channels(); + let mut sid_tx = None::>; + let json = json!({ + "type": "assistant", + "message": { + "content": [ + {"type": "tool_use", "id": "id1", "name": "Glob", "input": {}}, + {"type": "tool_use", "id": "id2", "name": "Bash", "input": {}} + ] + } + }); + assert!(!process_json_event( + &json, + &tok_tx, + &thi_tx, + &act_tx, + &msg_tx, + &mut sid_tx, + &AtomicBool::new(false), + )); + drop(act_tx); + let activities: Vec = { + let mut v = vec![]; + while let Ok(a) = act_rx.try_recv() { + v.push(a); + } + v + }; + assert_eq!(activities, vec!["Glob", "Bash"]); +} + +#[test] +fn process_json_event_assistant_text_only_no_activity() { + let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, mut act_rx, msg_tx, _msg_rx) = make_channels(); + let mut sid_tx = None::>; + let json = json!({ + "type": "assistant", + "message": { + "content": [{"type": "text", "text": "Just text, no tools."}] + } + }); + assert!(!process_json_event( + &json, + &tok_tx, + &thi_tx, + &act_tx, + &msg_tx, + &mut sid_tx, + &AtomicBool::new(false), + )); + drop(act_tx); + let activities: Vec = { + let mut v = vec![]; + while let Ok(a) = act_rx.try_recv() { + v.push(a); + } + v + }; + assert!(activities.is_empty()); +} + +#[test] +fn process_json_event_assistant_event_parses_message() { + let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, msg_rx) = make_channels(); + let mut sid_tx = None::>; + let json = json!({ + "type": "assistant", + "message": { + "content": [{"type": "text", "text": "Hi!"}] + } + }); + assert!(!process_json_event( + &json, + &tok_tx, + &thi_tx, + &act_tx, + &msg_tx, + &mut sid_tx, + &AtomicBool::new(false), + )); + drop(msg_tx); + let msgs: Vec = msg_rx.try_iter().collect(); + assert_eq!(msgs.len(), 1); + assert_eq!(msgs[0].content, "Hi!"); +} + +#[test] +fn process_json_event_user_event_parses_tool_results() { + let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, msg_rx) = make_channels(); + let mut sid_tx = None::>; + let json = json!({ + "type": "user", + "message": { + "content": [{"type": "tool_result", "tool_use_id": "tid1", "content": "done"}] + } + }); + assert!(!process_json_event( + &json, + &tok_tx, + &thi_tx, + &act_tx, + &msg_tx, + &mut sid_tx, + &AtomicBool::new(false), + )); + drop(msg_tx); + let msgs: Vec = msg_rx.try_iter().collect(); + assert_eq!(msgs.len(), 1); + assert_eq!(msgs[0].role, Role::Tool); + assert_eq!(msgs[0].content, "done"); +} + +#[test] +fn process_json_event_assistant_without_content_array_is_noop() { + let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, msg_rx) = make_channels(); + let mut sid_tx = None::>; + let json = json!({ + "type": "assistant", + "message": {"content": "not an array"} + }); + assert!(!process_json_event( + &json, + &tok_tx, + &thi_tx, + &act_tx, + &msg_tx, + &mut sid_tx, + &AtomicBool::new(false), + )); + drop(msg_tx); + let msgs: Vec = msg_rx.try_iter().collect(); + assert!(msgs.is_empty()); +} + +#[test] +fn process_json_event_user_without_content_array_is_noop() { + let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, msg_rx) = make_channels(); + let mut sid_tx = None::>; + let json = json!({"type": "user", "message": {"content": null}}); + assert!(!process_json_event( + &json, + &tok_tx, + &thi_tx, + &act_tx, + &msg_tx, + &mut sid_tx, + &AtomicBool::new(false), + )); + drop(msg_tx); + let msgs: Vec = msg_rx.try_iter().collect(); + assert!(msgs.is_empty()); +} + +#[test] +fn process_json_event_detects_authentication_failed() { + let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels(); + let mut sid_tx = None::>; + let auth_failed = AtomicBool::new(false); + let json = json!({ + "type": "assistant", + "error": "authentication_failed", + "message": { + "content": [{"type": "text", "text": "Failed to authenticate."}] + } + }); + assert!(!process_json_event( + &json, + &tok_tx, + &thi_tx, + &act_tx, + &msg_tx, + &mut sid_tx, + &auth_failed, + )); + assert!(auth_failed.load(Ordering::Relaxed)); +} + +#[test] +fn process_json_event_no_auth_failed_for_normal_events() { + let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels(); + let mut sid_tx = None::>; + let auth_failed = AtomicBool::new(false); + let json = json!({ + "type": "assistant", + "message": { + "content": [{"type": "text", "text": "Hello!"}] + } + }); + assert!(!process_json_event( + &json, + &tok_tx, + &thi_tx, + &act_tx, + &msg_tx, + &mut sid_tx, + &auth_failed, + )); + assert!(!auth_failed.load(Ordering::Relaxed)); +} diff --git a/server/src/llm/providers/claude_code/mod.rs b/server/src/llm/providers/claude_code/mod.rs index 1e127488..598edae9 100644 --- a/server/src/llm/providers/claude_code/mod.rs +++ b/server/src/llm/providers/claude_code/mod.rs @@ -34,7 +34,7 @@ pub struct ClaudeCodeResult { mod events; mod parse; -use events::{handle_stream_event, process_json_event}; +use events::process_json_event; /// Orchestrates Claude Code CLI sessions via a PTY for streaming agent chat. pub struct ClaudeCodeProvider;