use crate::llm::types::{FunctionCall, Message, Role, ToolCall};
use crate::slog;
use portable_pty::{CommandBuilder, PtySize, native_pty_system};
use std::io::{BufRead, BufReader};
use std::sync::{
    Arc,
    atomic::{AtomicBool, Ordering},
};
use tokio::sync::watch;

/// Outcome of a single Claude Code session.
pub struct ClaudeCodeResult {
    /// Structured conversation produced by the session: assistant turns
    /// (optionally carrying `tool_calls`) interleaved with tool result turns,
    /// in the order they were emitted.
    pub messages: Vec<Message>,
    /// Session ID reported by Claude Code, if any event carried one. Pass it
    /// back on the next request to resume the same conversation.
    pub session_id: Option<String>,
}

/// Manages a Claude Code session via a pseudo-terminal.
///
/// Spawns `claude -p` in a PTY so isatty() returns true (which may
/// influence billing), while using `--output-format stream-json` to
/// get clean, structured NDJSON output instead of TUI escape sequences.
///
/// Supports session resumption: if a `session_id` is provided, passes
/// `--resume <session_id>` so Claude Code loads the prior conversation
/// transcript from disk and continues with full context.
///
/// Permissions are delegated to the MCP `prompt_permission` tool via
/// `--permission-prompt-tool`, so Claude Code calls back into the server
/// when a tool requires user approval. The frontend dialog handles the UX.
pub struct ClaudeCodeProvider;

impl Default for ClaudeCodeProvider {
    fn default() -> Self {
        Self::new()
    }
}

impl ClaudeCodeProvider {
    /// Create a provider. The provider itself is stateless; all per-request
    /// state lives inside [`ClaudeCodeProvider::chat_stream`].
    pub fn new() -> Self {
        Self
    }

    /// Run one Claude Code turn and stream its output back to the caller.
    ///
    /// * `user_message` – prompt passed to `claude -p`.
    /// * `project_root` – working directory for the spawned process.
    /// * `session_id` – when `Some`, the session to resume.
    /// * `cancel_rx` – watch channel; a change to `true` cancels the session.
    /// * `on_token` – called for every streamed text token.
    /// * `on_activity` – called with the tool name whenever a tool use starts.
    ///
    /// Returns the structured messages and the captured session ID.
    ///
    /// # Errors
    ///
    /// Returns `Err` if the blocking PTY task panics or if the PTY session
    /// itself reports an error (including cancellation).
    #[allow(clippy::too_many_arguments)]
    pub async fn chat_stream<F, A>(
        &self,
        user_message: &str,
        project_root: &str,
        session_id: Option<&str>,
        cancel_rx: &mut watch::Receiver<bool>,
        mut on_token: F,
        mut on_activity: A,
    ) -> Result<ClaudeCodeResult, String>
    where
        F: FnMut(&str) + Send,
        A: FnMut(&str) + Send,
    {
        let message = user_message.to_string();
        let cwd = project_root.to_string();
        let resume_id = session_id.map(|s| s.to_string());

        // Bridge the async watch channel to a flag the blocking PTY loop can poll.
        let cancelled = Arc::new(AtomicBool::new(false));
        let cancelled_clone = cancelled.clone();
        let mut cancel_watch = cancel_rx.clone();
        let cancel_watcher = tokio::spawn(async move {
            while cancel_watch.changed().await.is_ok() {
                if *cancel_watch.borrow() {
                    cancelled_clone.store(true, Ordering::Relaxed);
                    break;
                }
            }
        });

        let (token_tx, mut token_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
        let (activity_tx, mut activity_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
        let (msg_tx, msg_rx) = std::sync::mpsc::channel::<Message>();
        let (sid_tx, sid_rx) = tokio::sync::oneshot::channel::<String>();

        let pty_handle = tokio::task::spawn_blocking(move || {
            run_pty_session(
                &message,
                &cwd,
                resume_id.as_deref(),
                cancelled,
                token_tx,
                activity_tx,
                msg_tx,
                sid_tx,
            )
        });

        // Forward events until BOTH channels are closed and drained. Each
        // branch is disabled once its channel reports closure, so a closed
        // channel can neither spin the loop nor cause buffered events on the
        // other channel to be dropped.
        let mut tokens_open = true;
        let mut activity_open = true;
        while tokens_open || activity_open {
            tokio::select! {
                msg = token_rx.recv(), if tokens_open => match msg {
                    Some(t) => on_token(&t),
                    None => tokens_open = false,
                },
                msg = activity_rx.recv(), if activity_open => match msg {
                    Some(name) => on_activity(&name),
                    None => activity_open = false,
                },
            }
        }

        let pty_result = pty_handle.await;
        // The session is over: stop the watcher so it does not outlive this
        // call while the cancel sender is still alive.
        cancel_watcher.abort();
        pty_result.map_err(|e| format!("PTY task panicked: {e}"))??;

        let captured_session_id = sid_rx.await.ok();
        let structured_messages: Vec<Message> = msg_rx.try_iter().collect();

        Ok(ClaudeCodeResult {
            messages: structured_messages,
            session_id: captured_session_id,
        })
    }
}

/// Run `claude -p` with stream-json output inside a PTY.
///
/// The PTY makes isatty() return true. The `-p` flag gives us
/// single-shot non-interactive mode with structured output.
/// /// Sends streaming text tokens via `token_tx` for real-time display, and /// complete structured `Message` values via `msg_tx` for the final message /// history (assistant turns with tool_calls, and tool result turns). /// /// Permission handling is delegated to the MCP `prompt_permission` tool /// via `--permission-prompt-tool`. Claude Code calls the MCP tool when it /// needs user approval, and the server bridges the request to the frontend. #[allow(clippy::too_many_arguments)] fn run_pty_session( user_message: &str, cwd: &str, resume_session_id: Option<&str>, cancelled: Arc, token_tx: tokio::sync::mpsc::UnboundedSender, activity_tx: tokio::sync::mpsc::UnboundedSender, msg_tx: std::sync::mpsc::Sender, sid_tx: tokio::sync::oneshot::Sender, ) -> Result<(), String> { let pty_system = native_pty_system(); let pair = pty_system .openpty(PtySize { rows: 50, cols: 200, pixel_width: 0, pixel_height: 0, }) .map_err(|e| format!("Failed to open PTY: {e}"))?; let mut cmd = CommandBuilder::new("claude"); cmd.arg("-p"); cmd.arg(user_message); if let Some(sid) = resume_session_id { cmd.arg("--resume"); cmd.arg(sid); } cmd.arg("--output-format"); cmd.arg("stream-json"); cmd.arg("--verbose"); // Delegate permission decisions to the MCP prompt_permission tool. // Claude Code will call this tool via the story-kit MCP server when // a tool requires user approval, instead of using PTY stdin/stdout. 
cmd.arg("--permission-prompt-tool"); cmd.arg("mcp__story-kit__prompt_permission"); cmd.cwd(cwd); // Keep TERM reasonable but disable color cmd.env("NO_COLOR", "1"); // Allow nested spawning when the server itself runs inside Claude Code cmd.env("CLAUDECODE", ""); slog!( "[pty-debug] Spawning: claude -p \"{}\" {} --output-format stream-json --verbose --permission-prompt-tool mcp__story-kit__prompt_permission", user_message, resume_session_id .map(|s| format!("--resume {s}")) .unwrap_or_default() ); let mut child = pair .slave .spawn_command(cmd) .map_err(|e| format!("Failed to spawn claude: {e}"))?; slog!( "[pty-debug] Process spawned, pid: {:?}", child.process_id() ); drop(pair.slave); let reader = pair .master .try_clone_reader() .map_err(|e| format!("Failed to clone PTY reader: {e}"))?; // We no longer need the writer — permission responses flow through MCP, // not PTY stdin. Drop it so the PTY sees EOF on stdin when appropriate. drop(pair.master); // Read NDJSON lines from stdout let (line_tx, line_rx) = std::sync::mpsc::channel::>(); std::thread::spawn(move || { let buf_reader = BufReader::new(reader); slog!("[pty-debug] Reader thread started"); for line in buf_reader.lines() { match line { Ok(l) => { slog!("[pty-debug] raw line: {}", l); if line_tx.send(Some(l)).is_err() { break; } } Err(e) => { slog!("[pty-debug] read error: {e}"); let _ = line_tx.send(None); break; } } } slog!("[pty-debug] Reader thread done"); let _ = line_tx.send(None); }); let mut got_result = false; let mut sid_tx = Some(sid_tx); loop { if cancelled.load(Ordering::Relaxed) { let _ = child.kill(); return Err("Cancelled".to_string()); } match line_rx.recv_timeout(std::time::Duration::from_millis(500)) { Ok(Some(line)) => { let trimmed = line.trim(); if trimmed.is_empty() { continue; } slog!( "[pty-debug] processing: {}...", &trimmed[..trimmed.len().min(120)] ); // Try to parse as JSON if let Ok(json) = serde_json::from_str::(trimmed) && process_json_event(&json, &token_tx, &activity_tx, 
&msg_tx, &mut sid_tx) { got_result = true; } // Ignore non-JSON lines (terminal escape sequences) if got_result { break; } } Ok(None) => break, // EOF Err(std::sync::mpsc::RecvTimeoutError::Timeout) => { // Check if child has exited if let Ok(Some(_status)) = child.try_wait() { // Drain remaining lines while let Ok(Some(line)) = line_rx.try_recv() { let trimmed = line.trim(); if let Ok(json) = serde_json::from_str::(trimmed) && let Some(event_type) = json.get("type").and_then(|t| t.as_str()) { match event_type { "stream_event" => { if let Some(event) = json.get("event") { handle_stream_event(event, &token_tx, &activity_tx); } } "assistant" => { if let Some(message) = json.get("message") && let Some(content) = message .get("content") .and_then(|c| c.as_array()) { parse_assistant_message(content, &msg_tx); } } "user" => { if let Some(message) = json.get("message") && let Some(content) = message .get("content") .and_then(|c| c.as_array()) { parse_tool_results(content, &msg_tx); } } _ => {} } } } break; } } Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => break, } // Don't set got_result here — just let the process finish naturally let _ = got_result; } // Wait briefly for Claude Code to flush its session transcript to disk. // The `result` event means the API response is done, but the process // still needs to write the conversation to the JSONL session file. match child.try_wait() { Ok(Some(_)) => {} // Already exited _ => { // Give it up to 2 seconds to exit cleanly for _ in 0..20 { std::thread::sleep(std::time::Duration::from_millis(100)); if let Ok(Some(_)) = child.try_wait() { break; } } // If still running after 2s, kill it let _ = child.kill(); } } Ok(()) } /// Dispatch a single parsed JSON event to the appropriate handler. /// /// Returns `true` if a `result` event was received (signals session completion). /// Captures the session ID from the first event that carries it. 
fn process_json_event( json: &serde_json::Value, token_tx: &tokio::sync::mpsc::UnboundedSender, activity_tx: &tokio::sync::mpsc::UnboundedSender, msg_tx: &std::sync::mpsc::Sender, sid_tx: &mut Option>, ) -> bool { let event_type = match json.get("type").and_then(|t| t.as_str()) { Some(t) => t, None => return false, }; // Capture session_id from the first event that carries it if let Some(tx) = sid_tx.take() { if let Some(sid) = json.get("session_id").and_then(|s| s.as_str()) { let _ = tx.send(sid.to_string()); } else { *sid_tx = Some(tx); } } match event_type { "stream_event" => { if let Some(event) = json.get("event") { handle_stream_event(event, token_tx, activity_tx); } false } "assistant" => { if let Some(message) = json.get("message") && let Some(content) = message.get("content").and_then(|c| c.as_array()) { parse_assistant_message(content, msg_tx); } false } "user" => { if let Some(message) = json.get("message") && let Some(content) = message.get("content").and_then(|c| c.as_array()) { parse_tool_results(content, msg_tx); } false } "result" => true, // system, rate_limit_event, and unknown types are no-ops _ => false, } } /// Parse a complete `assistant` message content array. /// /// Extracts text blocks into `content` and tool_use blocks into `tool_calls`, /// then sends a single `Message { role: Assistant }` via `msg_tx`. /// This is the authoritative source for the final message structure — streaming /// text deltas (via `handle_stream_event`) are only used for the live display. 
fn parse_assistant_message(
    content: &[serde_json::Value],
    msg_tx: &std::sync::mpsc::Sender<Message>,
) {
    // Concatenate every text block, in order, with no separator.
    let text: String = content
        .iter()
        .filter(|block| block.get("type").and_then(|t| t.as_str()) == Some("text"))
        .filter_map(|block| block.get("text").and_then(|t| t.as_str()))
        .collect();

    // Each tool_use block becomes one function-style tool call.
    let tool_calls: Vec<ToolCall> = content
        .iter()
        .filter(|block| block.get("type").and_then(|t| t.as_str()) == Some("tool_use"))
        .map(|block| {
            // A missing `input` serializes as an empty JSON object.
            let arguments = block.get("input").map_or_else(
                || String::from("{}"),
                |input| serde_json::to_string(input).unwrap_or_default(),
            );
            ToolCall {
                id: block
                    .get("id")
                    .and_then(|v| v.as_str())
                    .map(str::to_string),
                function: FunctionCall {
                    name: block
                        .get("name")
                        .and_then(|v| v.as_str())
                        .unwrap_or_default()
                        .to_string(),
                    arguments,
                },
                kind: "function".to_string(),
            }
        })
        .collect();

    // A message is always sent, even when both text and tool calls are empty.
    let _ = msg_tx.send(Message {
        role: Role::Assistant,
        content: text,
        tool_calls: (!tool_calls.is_empty()).then_some(tool_calls),
        tool_call_id: None,
    });
}

/// Parse a `user` message containing tool_result blocks.
///
/// Claude Code injects tool results into the conversation as `user` role
/// messages. Each `tool_result` block becomes a separate `Message { role: Tool }`.
fn parse_tool_results( content: &[serde_json::Value], msg_tx: &std::sync::mpsc::Sender, ) { for block in content { if block.get("type").and_then(|t| t.as_str()) != Some("tool_result") { continue; } let tool_use_id = block .get("tool_use_id") .and_then(|v| v.as_str()) .map(|s| s.to_string()); // `content` in a tool_result can be a plain string or an array of content blocks let content_str = match block.get("content") { Some(serde_json::Value::String(s)) => s.clone(), Some(serde_json::Value::Array(arr)) => { // Extract text from content block array arr.iter() .filter_map(|b| { if b.get("type").and_then(|t| t.as_str()) == Some("text") { b.get("text").and_then(|t| t.as_str()).map(|s| s.to_string()) } else { None } }) .collect::>() .join("\n") } Some(other) => serde_json::to_string(other).unwrap_or_default(), None => String::new(), }; let _ = msg_tx.send(Message { role: Role::Tool, content: content_str, tool_calls: None, tool_call_id: tool_use_id, }); } } /// Extract text from a stream event and send to the token channel for live display. /// /// Stream events provide incremental text deltas for real-time rendering. /// The authoritative final message content comes from complete `assistant` events. 
fn handle_stream_event( event: &serde_json::Value, token_tx: &tokio::sync::mpsc::UnboundedSender, activity_tx: &tokio::sync::mpsc::UnboundedSender, ) { let event_type = event.get("type").and_then(|t| t.as_str()).unwrap_or(""); match event_type { "content_block_start" => { if let Some(cb) = event.get("content_block") && cb.get("type").and_then(|t| t.as_str()) == Some("tool_use") && let Some(name) = cb.get("name").and_then(|n| n.as_str()) { let _ = activity_tx.send(name.to_string()); } } // Text content streaming — only text_delta, not input_json_delta (tool args) "content_block_delta" => { if let Some(delta) = event.get("delta") { let delta_type = delta.get("type").and_then(|t| t.as_str()).unwrap_or(""); match delta_type { "text_delta" => { if let Some(text) = delta.get("text").and_then(|t| t.as_str()) { let _ = token_tx.send(text.to_string()); } } "thinking_delta" => { if let Some(thinking) = delta.get("thinking").and_then(|t| t.as_str()) { let _ = token_tx.send(format!("[thinking] {thinking}")); } } _ => {} } } } // Log errors via streaming display "error" => { if let Some(error) = event.get("error") { let msg = error .get("message") .and_then(|m| m.as_str()) .unwrap_or("unknown error"); let _ = token_tx.send(format!("\n[error: {msg}]\n")); } } _ => {} } } #[cfg(test)] mod tests { use super::*; use serde_json::json; fn collect_messages( f: impl Fn(&std::sync::mpsc::Sender), ) -> Vec { let (tx, rx) = std::sync::mpsc::channel(); f(&tx); drop(tx); rx.try_iter().collect() } #[test] fn parse_assistant_message_text_only() { let content = vec![json!({"type": "text", "text": "Hello, world!"})]; let msgs = collect_messages(|tx| parse_assistant_message(&content, tx)); assert_eq!(msgs.len(), 1); assert_eq!(msgs[0].role, Role::Assistant); assert_eq!(msgs[0].content, "Hello, world!"); assert!(msgs[0].tool_calls.is_none()); } #[test] fn parse_assistant_message_with_tool_use() { let content = vec![ json!({"type": "text", "text": "I'll read that file."}), json!({ "type": 
"tool_use", "id": "toolu_abc123", "name": "Read", "input": {"file_path": "/src/main.rs"} }), ]; let msgs = collect_messages(|tx| parse_assistant_message(&content, tx)); assert_eq!(msgs.len(), 1); let msg = &msgs[0]; assert_eq!(msg.role, Role::Assistant); assert_eq!(msg.content, "I'll read that file."); let tool_calls = msg.tool_calls.as_ref().expect("should have tool calls"); assert_eq!(tool_calls.len(), 1); assert_eq!(tool_calls[0].id.as_deref(), Some("toolu_abc123")); assert_eq!(tool_calls[0].function.name, "Read"); let args: serde_json::Value = serde_json::from_str(&tool_calls[0].function.arguments).unwrap(); assert_eq!(args["file_path"], "/src/main.rs"); } #[test] fn parse_assistant_message_multiple_tool_uses() { let content = vec![ json!({"type": "tool_use", "id": "id1", "name": "Glob", "input": {"pattern": "*.rs"}}), json!({"type": "tool_use", "id": "id2", "name": "Bash", "input": {"command": "cargo test"}}), ]; let msgs = collect_messages(|tx| parse_assistant_message(&content, tx)); assert_eq!(msgs.len(), 1); let tool_calls = msgs[0].tool_calls.as_ref().unwrap(); assert_eq!(tool_calls.len(), 2); assert_eq!(tool_calls[0].function.name, "Glob"); assert_eq!(tool_calls[1].function.name, "Bash"); } #[test] fn parse_tool_results_string_content() { let content = vec![json!({ "type": "tool_result", "tool_use_id": "toolu_abc123", "content": "fn main() { println!(\"hello\"); }" })]; let msgs = collect_messages(|tx| parse_tool_results(&content, tx)); assert_eq!(msgs.len(), 1); assert_eq!(msgs[0].role, Role::Tool); assert_eq!(msgs[0].content, "fn main() { println!(\"hello\"); }"); assert_eq!(msgs[0].tool_call_id.as_deref(), Some("toolu_abc123")); } #[test] fn parse_tool_results_array_content() { let content = vec![json!({ "type": "tool_result", "tool_use_id": "toolu_xyz", "content": [ {"type": "text", "text": "Line 1"}, {"type": "text", "text": "Line 2"} ] })]; let msgs = collect_messages(|tx| parse_tool_results(&content, tx)); assert_eq!(msgs.len(), 1); 
assert_eq!(msgs[0].content, "Line 1\nLine 2"); assert_eq!(msgs[0].tool_call_id.as_deref(), Some("toolu_xyz")); } #[test] fn parse_tool_results_multiple_results() { let content = vec![ json!({"type": "tool_result", "tool_use_id": "id1", "content": "result1"}), json!({"type": "tool_result", "tool_use_id": "id2", "content": "result2"}), ]; let msgs = collect_messages(|tx| parse_tool_results(&content, tx)); assert_eq!(msgs.len(), 2); assert_eq!(msgs[0].tool_call_id.as_deref(), Some("id1")); assert_eq!(msgs[1].tool_call_id.as_deref(), Some("id2")); } #[test] fn parse_tool_results_ignores_non_tool_result_blocks() { let content = vec![ json!({"type": "text", "text": "not a tool result"}), json!({"type": "tool_result", "tool_use_id": "id1", "content": "actual result"}), ]; let msgs = collect_messages(|tx| parse_tool_results(&content, tx)); assert_eq!(msgs.len(), 1); assert_eq!(msgs[0].tool_call_id.as_deref(), Some("id1")); } #[test] fn parse_assistant_message_empty_text_with_tool_use() { // When a message has only tool_use (no text), content should be empty string let content = vec![json!({ "type": "tool_use", "id": "toolu_1", "name": "Write", "input": {"file_path": "foo.rs", "content": "fn foo() {}"} })]; let msgs = collect_messages(|tx| parse_assistant_message(&content, tx)); assert_eq!(msgs.len(), 1); assert_eq!(msgs[0].content, ""); assert!(msgs[0].tool_calls.is_some()); } #[test] fn handle_stream_event_text_delta_sends_token() { let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); let (atx, _arx) = tokio::sync::mpsc::unbounded_channel::(); let event = json!({ "type": "content_block_delta", "delta": {"type": "text_delta", "text": "hello "} }); handle_stream_event(&event, &tx, &atx); drop(tx); let tokens: Vec<_> = { let mut v = vec![]; while let Ok(t) = rx.try_recv() { v.push(t); } v }; assert_eq!(tokens, vec!["hello "]); } #[test] fn handle_stream_event_input_json_delta_not_sent() { // Tool argument JSON deltas should NOT be sent as text tokens let (tx, mut rx) = 
tokio::sync::mpsc::unbounded_channel(); let (atx, _arx) = tokio::sync::mpsc::unbounded_channel::(); let event = json!({ "type": "content_block_delta", "delta": {"type": "input_json_delta", "partial_json": "{\"path\":"} }); handle_stream_event(&event, &tx, &atx); drop(tx); let tokens: Vec = { let mut v = vec![]; while let Ok(t) = rx.try_recv() { v.push(t); } v }; assert!(tokens.is_empty()); } #[test] fn handle_stream_event_thinking_delta_sends_prefixed_token() { let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); let (atx, _arx) = tokio::sync::mpsc::unbounded_channel(); let event = json!({ "type": "content_block_delta", "delta": {"type": "thinking_delta", "thinking": "I should check the file"} }); handle_stream_event(&event, &tx, &atx); drop(tx); let tokens: Vec = { let mut v = vec![]; while let Ok(t) = rx.try_recv() { v.push(t); } v }; assert_eq!(tokens, vec!["[thinking] I should check the file"]); } #[test] fn handle_stream_event_error_sends_error_token() { let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); let (atx, _arx) = tokio::sync::mpsc::unbounded_channel(); let event = json!({ "type": "error", "error": {"type": "overloaded_error", "message": "Overloaded"} }); handle_stream_event(&event, &tx, &atx); drop(tx); let tokens: Vec = { let mut v = vec![]; while let Ok(t) = rx.try_recv() { v.push(t); } v }; assert_eq!(tokens, vec!["\n[error: Overloaded]\n"]); } #[test] fn handle_stream_event_unknown_type_is_noop() { let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); let (atx, _arx) = tokio::sync::mpsc::unbounded_channel(); let event = json!({"type": "ping"}); handle_stream_event(&event, &tx, &atx); drop(tx); let tokens: Vec = { let mut v = vec![]; while let Ok(t) = rx.try_recv() { v.push(t); } v }; assert!(tokens.is_empty()); } #[test] fn parse_tool_results_no_tool_use_id() { let content = vec![json!({"type": "tool_result", "content": "output"})]; let msgs = collect_messages(|tx| parse_tool_results(&content, tx)); assert_eq!(msgs.len(), 1); 
assert!(msgs[0].tool_call_id.is_none()); assert_eq!(msgs[0].content, "output"); } #[test] fn parse_tool_results_null_content() { let content = vec![json!({"type": "tool_result", "tool_use_id": "id1"})]; let msgs = collect_messages(|tx| parse_tool_results(&content, tx)); assert_eq!(msgs.len(), 1); assert_eq!(msgs[0].content, ""); } #[test] fn parse_tool_results_other_json_content() { let content = vec![json!({ "type": "tool_result", "tool_use_id": "id1", "content": {"nested": "object"} })]; let msgs = collect_messages(|tx| parse_tool_results(&content, tx)); assert_eq!(msgs.len(), 1); // Falls through to serde_json::to_string assert!(!msgs[0].content.is_empty()); } #[test] fn parse_assistant_message_empty_content_array() { let content: Vec = vec![]; let msgs = collect_messages(|tx| parse_assistant_message(&content, tx)); assert_eq!(msgs.len(), 1); assert_eq!(msgs[0].content, ""); assert!(msgs[0].tool_calls.is_none()); } #[test] fn parse_assistant_message_unknown_block_type() { let content = vec![ json!({"type": "image", "source": {"type": "base64"}}), json!({"type": "text", "text": "done"}), ]; let msgs = collect_messages(|tx| parse_assistant_message(&content, tx)); assert_eq!(msgs.len(), 1); assert_eq!(msgs[0].content, "done"); } #[test] fn parse_assistant_message_tool_use_without_id() { let content = vec![json!({ "type": "tool_use", "name": "Bash", "input": {"command": "ls"} })]; let msgs = collect_messages(|tx| parse_assistant_message(&content, tx)); assert_eq!(msgs.len(), 1); let calls = msgs[0].tool_calls.as_ref().unwrap(); assert_eq!(calls.len(), 1); assert!(calls[0].id.is_none()); assert_eq!(calls[0].function.name, "Bash"); } #[test] fn parse_assistant_message_tool_use_without_input_defaults_to_empty_object() { let content = vec![json!({"type": "tool_use", "id": "tid", "name": "Read"})]; let msgs = collect_messages(|tx| parse_assistant_message(&content, tx)); let calls = msgs[0].tool_calls.as_ref().unwrap(); let args: serde_json::Value = 
serde_json::from_str(&calls[0].function.arguments).unwrap(); assert!(args.is_object()); assert!(args.as_object().unwrap().is_empty()); } type Channels = ( tokio::sync::mpsc::UnboundedSender, tokio::sync::mpsc::UnboundedReceiver, tokio::sync::mpsc::UnboundedSender, tokio::sync::mpsc::UnboundedReceiver, std::sync::mpsc::Sender, std::sync::mpsc::Receiver, ); fn make_channels() -> Channels { let (tok_tx, tok_rx) = tokio::sync::mpsc::unbounded_channel(); let (act_tx, act_rx) = tokio::sync::mpsc::unbounded_channel(); let (msg_tx, msg_rx) = std::sync::mpsc::channel(); (tok_tx, tok_rx, act_tx, act_rx, msg_tx, msg_rx) } #[test] fn process_json_event_result_returns_true() { let (tok_tx, _tok_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels(); let (sid_tx, _sid_rx) = tokio::sync::oneshot::channel::(); let mut sid_tx_opt = Some(sid_tx); let json = json!({"type": "result", "subtype": "success"}); assert!(process_json_event(&json, &tok_tx, &act_tx, &msg_tx, &mut sid_tx_opt)); } #[test] fn process_json_event_system_returns_false() { let (tok_tx, _tok_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels(); let mut sid_tx = None::>; let json = json!({"type": "system", "subtype": "init", "apiKeySource": "env"}); assert!(!process_json_event(&json, &tok_tx, &act_tx, &msg_tx, &mut sid_tx)); } #[test] fn process_json_event_rate_limit_returns_false() { let (tok_tx, _tok_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels(); let mut sid_tx = None::>; let json = json!({"type": "rate_limit_event"}); assert!(!process_json_event(&json, &tok_tx, &act_tx, &msg_tx, &mut sid_tx)); } #[test] fn process_json_event_unknown_type_returns_false() { let (tok_tx, _tok_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels(); let mut sid_tx = None::>; let json = json!({"type": "some_future_event"}); assert!(!process_json_event(&json, &tok_tx, &act_tx, &msg_tx, &mut sid_tx)); } #[test] fn process_json_event_no_type_returns_false() { let (tok_tx, _tok_rx, act_tx, _act_rx, msg_tx, _msg_rx) = 
make_channels(); let mut sid_tx = None::>; let json = json!({"content": "no type field"}); assert!(!process_json_event(&json, &tok_tx, &act_tx, &msg_tx, &mut sid_tx)); } #[test] fn process_json_event_captures_session_id() { let (tok_tx, _tok_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels(); let (sid_tx, mut sid_rx) = tokio::sync::oneshot::channel::(); let mut sid_tx_opt = Some(sid_tx); let json = json!({"type": "system", "session_id": "sess-abc-123"}); process_json_event(&json, &tok_tx, &act_tx, &msg_tx, &mut sid_tx_opt); // sid_tx should have been consumed assert!(sid_tx_opt.is_none()); let received = sid_rx.try_recv().unwrap(); assert_eq!(received, "sess-abc-123"); } #[test] fn process_json_event_preserves_sid_tx_if_no_session_id() { let (tok_tx, _tok_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels(); let (sid_tx, _sid_rx) = tokio::sync::oneshot::channel::(); let mut sid_tx_opt = Some(sid_tx); let json = json!({"type": "system"}); process_json_event(&json, &tok_tx, &act_tx, &msg_tx, &mut sid_tx_opt); // sid_tx should still be present since no session_id in event assert!(sid_tx_opt.is_some()); } #[test] fn process_json_event_stream_event_forwards_token() { let (tok_tx, mut tok_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels(); let mut sid_tx = None::>; let json = json!({ "type": "stream_event", "session_id": "s1", "event": { "type": "content_block_delta", "delta": {"type": "text_delta", "text": "word"} } }); assert!(!process_json_event(&json, &tok_tx, &act_tx, &msg_tx, &mut sid_tx)); drop(tok_tx); let tokens: Vec = { let mut v = vec![]; while let Ok(t) = tok_rx.try_recv() { v.push(t); } v }; assert_eq!(tokens, vec!["word"]); } #[test] fn process_json_event_assistant_event_parses_message() { let (tok_tx, _tok_rx, act_tx, _act_rx, msg_tx, msg_rx) = make_channels(); let mut sid_tx = None::>; let json = json!({ "type": "assistant", "message": { "content": [{"type": "text", "text": "Hi!"}] } }); assert!(!process_json_event(&json, &tok_tx, &act_tx, 
&msg_tx, &mut sid_tx)); drop(msg_tx); let msgs: Vec = msg_rx.try_iter().collect(); assert_eq!(msgs.len(), 1); assert_eq!(msgs[0].content, "Hi!"); } #[test] fn process_json_event_user_event_parses_tool_results() { let (tok_tx, _tok_rx, act_tx, _act_rx, msg_tx, msg_rx) = make_channels(); let mut sid_tx = None::>; let json = json!({ "type": "user", "message": { "content": [{"type": "tool_result", "tool_use_id": "tid1", "content": "done"}] } }); assert!(!process_json_event(&json, &tok_tx, &act_tx, &msg_tx, &mut sid_tx)); drop(msg_tx); let msgs: Vec = msg_rx.try_iter().collect(); assert_eq!(msgs.len(), 1); assert_eq!(msgs[0].role, Role::Tool); assert_eq!(msgs[0].content, "done"); } #[test] fn process_json_event_assistant_without_content_array_is_noop() { let (tok_tx, _tok_rx, act_tx, _act_rx, msg_tx, msg_rx) = make_channels(); let mut sid_tx = None::>; let json = json!({ "type": "assistant", "message": {"content": "not an array"} }); assert!(!process_json_event(&json, &tok_tx, &act_tx, &msg_tx, &mut sid_tx)); drop(msg_tx); let msgs: Vec = msg_rx.try_iter().collect(); assert!(msgs.is_empty()); } #[test] fn process_json_event_user_without_content_array_is_noop() { let (tok_tx, _tok_rx, act_tx, _act_rx, msg_tx, msg_rx) = make_channels(); let mut sid_tx = None::>; let json = json!({"type": "user", "message": {"content": null}}); assert!(!process_json_event(&json, &tok_tx, &act_tx, &msg_tx, &mut sid_tx)); drop(msg_tx); let msgs: Vec = msg_rx.try_iter().collect(); assert!(msgs.is_empty()); } #[test] fn claude_code_provider_new() { let _provider = ClaudeCodeProvider::new(); } #[test] fn handle_stream_event_tool_use_start_sends_activity() { let (tx, _rx) = tokio::sync::mpsc::unbounded_channel::(); let (atx, mut arx) = tokio::sync::mpsc::unbounded_channel::(); let event = json!({ "type": "content_block_start", "index": 1, "content_block": {"type": "tool_use", "id": "toolu_1", "name": "Read", "input": {}} }); handle_stream_event(&event, &tx, &atx); drop(atx); let activities: Vec 
= { let mut v = vec![]; while let Ok(a) = arx.try_recv() { v.push(a); } v }; assert_eq!(activities, vec!["Read"]); } #[test] fn handle_stream_event_text_block_start_no_activity() { let (tx, _rx) = tokio::sync::mpsc::unbounded_channel::(); let (atx, mut arx) = tokio::sync::mpsc::unbounded_channel::(); let event = json!({ "type": "content_block_start", "index": 0, "content_block": {"type": "text", "text": ""} }); handle_stream_event(&event, &tx, &atx); drop(atx); let activities: Vec = { let mut v = vec![]; while let Ok(a) = arx.try_recv() { v.push(a); } v }; assert!(activities.is_empty()); } }