diff --git a/server/src/llm/providers/claude_code.rs b/server/src/llm/providers/claude_code.rs deleted file mode 100644 index 7dc1fea7..00000000 --- a/server/src/llm/providers/claude_code.rs +++ /dev/null @@ -1,1427 +0,0 @@ -//! Claude Code provider — runs Claude Code CLI in a PTY and parses structured output. -use crate::slog; -use portable_pty::{CommandBuilder, PtySize, native_pty_system}; -use std::io::{BufRead, BufReader}; -use std::sync::Arc; -use std::sync::atomic::{AtomicBool, Ordering}; -use tokio::sync::watch; - -use crate::llm::types::{FunctionCall, Message, Role, ToolCall}; - -/// Result from a Claude Code session containing structured messages. -pub struct ClaudeCodeResult { - /// The conversation messages produced by Claude Code, including assistant - /// turns (with optional tool_calls) and tool result turns. - pub messages: Vec, - /// Session ID for conversation resumption on subsequent requests. - pub session_id: Option, -} - -/// Manages a Claude Code session via a pseudo-terminal. -/// -/// Spawns `claude -p` in a PTY so isatty() returns true (which may -/// influence billing), while using `--output-format stream-json` to -/// get clean, structured NDJSON output instead of TUI escape sequences. -/// -/// Supports session resumption: if a `session_id` is provided, passes -/// `--resume ` so Claude Code loads the prior conversation transcript -/// from disk and continues with full context. -/// -/// Permissions are delegated to the MCP `prompt_permission` tool via -/// `--permission-prompt-tool`, so Claude Code calls back into the server -/// when a tool requires user approval. The frontend dialog handles the UX. 
-pub struct ClaudeCodeProvider; - -impl ClaudeCodeProvider { - pub fn new() -> Self { - Self - } - - #[allow(clippy::too_many_arguments)] - pub async fn chat_stream( - &self, - user_message: &str, - project_root: &str, - session_id: Option<&str>, - system_prompt: Option<&str>, - cancel_rx: &mut watch::Receiver, - mut on_token: F, - mut on_thinking: T, - mut on_activity: A, - ) -> Result - where - F: FnMut(&str) + Send, - T: FnMut(&str) + Send, - A: FnMut(&str) + Send, - { - let cancelled = Arc::new(AtomicBool::new(false)); - let cancelled_clone = cancelled.clone(); - - let mut cancel_watch = cancel_rx.clone(); - tokio::spawn(async move { - while cancel_watch.changed().await.is_ok() { - if *cancel_watch.borrow() { - cancelled_clone.store(true, Ordering::Relaxed); - break; - } - } - }); - - // Attempt up to 2 times: first try, then retry after OAuth refresh. - for attempt in 0..2 { - let message = user_message.to_string(); - let cwd = project_root.to_string(); - let resume_id = session_id.map(|s| s.to_string()); - let sys_prompt = system_prompt.map(|s| s.to_string()); - let cancelled_inner = cancelled.clone(); - let auth_failed = Arc::new(AtomicBool::new(false)); - let auth_failed_clone = auth_failed.clone(); - - let (token_tx, mut token_rx) = tokio::sync::mpsc::unbounded_channel::(); - let (thinking_tx, mut thinking_rx) = tokio::sync::mpsc::unbounded_channel::(); - let (activity_tx, mut activity_rx) = tokio::sync::mpsc::unbounded_channel::(); - let (msg_tx, msg_rx) = std::sync::mpsc::channel::(); - let (sid_tx, sid_rx) = tokio::sync::oneshot::channel::(); - - let pty_handle = tokio::task::spawn_blocking(move || { - run_pty_session( - &message, - &cwd, - resume_id.as_deref(), - sys_prompt.as_deref(), - cancelled_inner, - auth_failed_clone, - token_tx, - thinking_tx, - activity_tx, - msg_tx, - sid_tx, - ) - }); - - loop { - tokio::select! 
{ - msg = token_rx.recv() => match msg { - Some(t) => on_token(&t), - None => break, - }, - msg = thinking_rx.recv() => if let Some(t) = msg { - on_thinking(&t); - }, - msg = activity_rx.recv() => if let Some(name) = msg { - on_activity(&name); - }, - } - } - - // Drain any remaining activity/thinking messages that were buffered - // when the token channel closed. - while let Ok(t) = thinking_rx.try_recv() { - on_thinking(&t); - } - while let Ok(name) = activity_rx.try_recv() { - on_activity(&name); - } - - pty_handle - .await - .map_err(|e| format!("PTY task panicked: {e}"))??; - - // Check if the PTY session failed due to expired OAuth token. - if auth_failed.load(Ordering::Relaxed) && attempt == 0 { - slog!("[oauth] Authentication failed, attempting token refresh"); - match crate::llm::oauth::refresh_access_token().await { - Ok(()) => { - slog!("[oauth] Token refreshed, retrying request"); - on_token("\n*Refreshing authentication token...*\n"); - continue; - } - Err(_e) => { - let port = crate::http::resolve_port(); - let login_url = format!("http://localhost:{port}/oauth/authorize"); - return Err(format!( - "OAuth session expired or credentials missing. Please log in: {login_url}" - )); - } - } - } - - let captured_session_id = sid_rx.await.ok(); - slog!("[pty-debug] RECEIVED session_id: {:?}", captured_session_id); - let structured_messages: Vec = msg_rx.try_iter().collect(); - - return Ok(ClaudeCodeResult { - messages: structured_messages, - session_id: captured_session_id, - }); - } - - // Should never reach here, but just in case. - Err("Authentication failed after retry".to_string()) - } -} - -/// Run `claude -p` with stream-json output inside a PTY. -/// -/// The PTY makes isatty() return true. The `-p` flag gives us -/// single-shot non-interactive mode with structured output. 
-/// -/// Sends streaming text tokens via `token_tx` for real-time display, and -/// complete structured `Message` values via `msg_tx` for the final message -/// history (assistant turns with tool_calls, and tool result turns). -/// -/// Permission handling is delegated to the MCP `prompt_permission` tool -/// via `--permission-prompt-tool`. Claude Code calls the MCP tool when it -/// needs user approval, and the server bridges the request to the frontend. -#[allow(clippy::too_many_arguments)] -fn run_pty_session( - user_message: &str, - cwd: &str, - resume_session_id: Option<&str>, - _system_prompt: Option<&str>, - cancelled: Arc, - auth_failed: Arc, - token_tx: tokio::sync::mpsc::UnboundedSender, - thinking_tx: tokio::sync::mpsc::UnboundedSender, - activity_tx: tokio::sync::mpsc::UnboundedSender, - msg_tx: std::sync::mpsc::Sender, - sid_tx: tokio::sync::oneshot::Sender, -) -> Result<(), String> { - let pty_system = native_pty_system(); - - let pair = pty_system - .openpty(PtySize { - rows: 50, - cols: 200, - pixel_width: 0, - pixel_height: 0, - }) - .map_err(|e| format!("Failed to open PTY: {e}"))?; - - let mut cmd = CommandBuilder::new("claude"); - cmd.arg("-p"); - cmd.arg(user_message); - if let Some(sid) = resume_session_id { - cmd.arg("--resume"); - cmd.arg(sid); - } - cmd.arg("--output-format"); - cmd.arg("stream-json"); - cmd.arg("--verbose"); - // Enable partial streaming events so we receive stream_event messages - // containing raw API events (content_block_start, content_block_delta, - // etc.). Without this flag, only complete assistant/user/result events - // are emitted and tool-start activity signals never fire. - cmd.arg("--include-partial-messages"); - // Delegate permission decisions to the MCP prompt_permission tool. - // Claude Code will call this tool via the huskies MCP server when - // a tool requires user approval, instead of using PTY stdin/stdout. 
- cmd.arg("--permission-prompt-tool"); - cmd.arg("mcp__huskies__prompt_permission"); - // Note: --system is not a valid Claude Code CLI flag. System-level - // instructions (like bot name) are prepended to the user prompt instead. - cmd.cwd(cwd); - // Keep TERM reasonable but disable color - cmd.env("NO_COLOR", "1"); - // Allow nested spawning when the server itself runs inside Claude Code - cmd.env("CLAUDECODE", ""); - - slog!( - "[pty-debug] Spawning: claude -p \"{}\" {} --output-format stream-json --verbose --include-partial-messages --permission-prompt-tool mcp__huskies__prompt_permission", - user_message, - resume_session_id - .map(|s| format!("--resume {s}")) - .unwrap_or_default() - ); - - let mut child = pair - .slave - .spawn_command(cmd) - .map_err(|e| format!("Failed to spawn claude: {e}"))?; - - slog!("[pty-debug] Process spawned, pid: {:?}", child.process_id()); - drop(pair.slave); - - let reader = pair - .master - .try_clone_reader() - .map_err(|e| format!("Failed to clone PTY reader: {e}"))?; - - // We no longer need the writer — permission responses flow through MCP, - // not PTY stdin. Drop it so the PTY sees EOF on stdin when appropriate. 
- drop(pair.master); - - // Read NDJSON lines from stdout - let (line_tx, line_rx) = std::sync::mpsc::channel::>(); - - let reader_handle = std::thread::spawn(move || { - let buf_reader = BufReader::new(reader); - slog!("[pty-debug] Reader thread started"); - for line in buf_reader.lines() { - match line { - Ok(l) => { - slog!("[pty-debug] raw line: {}", l); - if line_tx.send(Some(l)).is_err() { - break; - } - } - Err(e) => { - slog!("[pty-debug] read error: {e}"); - let _ = line_tx.send(None); - break; - } - } - } - slog!("[pty-debug] Reader thread done"); - let _ = line_tx.send(None); - }); - - let mut got_result = false; - let mut sid_tx = Some(sid_tx); - - loop { - if cancelled.load(Ordering::Relaxed) { - let _ = child.kill(); - let _ = child.wait(); - let _ = reader_handle.join(); - return Err("Cancelled".to_string()); - } - - match line_rx.recv_timeout(std::time::Duration::from_millis(500)) { - Ok(Some(line)) => { - let trimmed = line.trim(); - if trimmed.is_empty() { - continue; - } - - let mut end = trimmed.len().min(120); - while !trimmed.is_char_boundary(end) { - end -= 1; - } - slog!("[pty-debug] processing: {}...", &trimmed[..end]); - - // Try to parse as JSON - if let Ok(json) = serde_json::from_str::(trimmed) - && process_json_event( - &json, - &token_tx, - &thinking_tx, - &activity_tx, - &msg_tx, - &mut sid_tx, - &auth_failed, - ) - { - got_result = true; - } - // Ignore non-JSON lines (terminal escape sequences) - - if got_result { - break; - } - } - Ok(None) => break, // EOF - Err(std::sync::mpsc::RecvTimeoutError::Timeout) => { - // Check if child has exited - if let Ok(Some(_status)) = child.try_wait() { - // Drain remaining lines through the same dispatch path - // (process_json_event) so activity signals fire correctly. 
- while let Ok(Some(line)) = line_rx.try_recv() { - let trimmed = line.trim(); - if let Ok(json) = serde_json::from_str::(trimmed) { - process_json_event( - &json, - &token_tx, - &thinking_tx, - &activity_tx, - &msg_tx, - &mut sid_tx, - &auth_failed, - ); - } - } - break; - } - } - Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => break, - } - - // Don't set got_result here — just let the process finish naturally - let _ = got_result; - } - - // Wait briefly for Claude Code to flush its session transcript to disk. - // The `result` event means the API response is done, but the process - // still needs to write the conversation to the JSONL session file. - match child.try_wait() { - Ok(Some(_)) => {} // Already exited - _ => { - // Give it up to 2 seconds to exit cleanly - for _ in 0..20 { - std::thread::sleep(std::time::Duration::from_millis(100)); - if let Ok(Some(_)) = child.try_wait() { - break; - } - } - // If still running after 2s, kill it - let _ = child.kill(); - let _ = child.wait(); - } - } - // Wait for the reader thread to release the cloned PTY master fd. - let _ = reader_handle.join(); - Ok(()) -} - -/// Dispatch a single parsed JSON event to the appropriate handler. -/// -/// Returns `true` if a `result` event was received (signals session completion). -/// Captures the session ID from the first event that carries it. -/// Sets `auth_failed` to `true` if an `authentication_failed` error is detected. 
-fn process_json_event( - json: &serde_json::Value, - token_tx: &tokio::sync::mpsc::UnboundedSender, - thinking_tx: &tokio::sync::mpsc::UnboundedSender, - activity_tx: &tokio::sync::mpsc::UnboundedSender, - msg_tx: &std::sync::mpsc::Sender, - sid_tx: &mut Option>, - auth_failed: &AtomicBool, -) -> bool { - let event_type = match json.get("type").and_then(|t| t.as_str()) { - Some(t) => t, - None => return false, - }; - - // Capture session_id from the first event that carries it - if let Some(tx) = sid_tx.take() { - if let Some(sid) = json.get("session_id").and_then(|s| s.as_str()) { - slog!("[pty-debug] CAPTURED session_id: {}", sid); - let _ = tx.send(sid.to_string()); - } else { - *sid_tx = Some(tx); - } - } - - // Detect authentication_failed at the top level of any event. - if json.get("error").and_then(|e| e.as_str()) == Some("authentication_failed") { - slog!("[pty-debug] Detected authentication_failed error"); - auth_failed.store(true, Ordering::Relaxed); - } - - match event_type { - "stream_event" => { - if let Some(event) = json.get("event") { - handle_stream_event(event, token_tx, thinking_tx, activity_tx); - } - false - } - "assistant" => { - if let Some(message) = json.get("message") - && let Some(content) = message.get("content").and_then(|c| c.as_array()) - { - // Fire activity signals for tool_use blocks as a fallback path. - // The primary path is via stream_event → content_block_start (real-time), - // but assistant events also carry tool_use blocks and serve as a reliable - // backup if stream_event delivery is delayed or missed. 
- for block in content { - if block.get("type").and_then(|t| t.as_str()) == Some("tool_use") - && let Some(name) = block.get("name").and_then(|n| n.as_str()) - { - let _ = activity_tx.send(name.to_string()); - } - } - parse_assistant_message(content, msg_tx); - } - false - } - "user" => { - if let Some(message) = json.get("message") - && let Some(content) = message.get("content").and_then(|c| c.as_array()) - { - parse_tool_results(content, msg_tx); - } - false - } - "result" => true, - // system, rate_limit_event, and unknown types are no-ops - _ => false, - } -} - -/// Parse a complete `assistant` message content array. -/// -/// Extracts text blocks into `content` and tool_use blocks into `tool_calls`, -/// then sends a single `Message { role: Assistant }` via `msg_tx`. -/// This is the authoritative source for the final message structure — streaming -/// text deltas (via `handle_stream_event`) are only used for the live display. -fn parse_assistant_message( - content: &[serde_json::Value], - msg_tx: &std::sync::mpsc::Sender, -) { - let mut text = String::new(); - let mut tool_calls: Vec = Vec::new(); - - for block in content { - match block.get("type").and_then(|t| t.as_str()) { - Some("text") => { - if let Some(t) = block.get("text").and_then(|t| t.as_str()) { - text.push_str(t); - } - } - Some("tool_use") => { - let id = block - .get("id") - .and_then(|v| v.as_str()) - .map(|s| s.to_string()); - let name = block - .get("name") - .and_then(|v| v.as_str()) - .unwrap_or("") - .to_string(); - let input = block - .get("input") - .cloned() - .unwrap_or(serde_json::Value::Object(serde_json::Map::new())); - let arguments = serde_json::to_string(&input).unwrap_or_default(); - tool_calls.push(ToolCall { - id, - function: FunctionCall { name, arguments }, - kind: "function".to_string(), - }); - } - _ => {} - } - } - - let msg = Message { - role: Role::Assistant, - content: text, - tool_calls: if tool_calls.is_empty() { - None - } else { - Some(tool_calls) - }, - 
tool_call_id: None, - }; - let _ = msg_tx.send(msg); -} - -/// Parse a `user` message containing tool_result blocks. -/// -/// Claude Code injects tool results into the conversation as `user` role -/// messages. Each `tool_result` block becomes a separate `Message { role: Tool }`. -fn parse_tool_results(content: &[serde_json::Value], msg_tx: &std::sync::mpsc::Sender) { - for block in content { - if block.get("type").and_then(|t| t.as_str()) != Some("tool_result") { - continue; - } - - let tool_use_id = block - .get("tool_use_id") - .and_then(|v| v.as_str()) - .map(|s| s.to_string()); - - // `content` in a tool_result can be a plain string or an array of content blocks - let content_str = match block.get("content") { - Some(serde_json::Value::String(s)) => s.clone(), - Some(serde_json::Value::Array(arr)) => { - // Extract text from content block array - arr.iter() - .filter_map(|b| { - if b.get("type").and_then(|t| t.as_str()) == Some("text") { - b.get("text") - .and_then(|t| t.as_str()) - .map(|s| s.to_string()) - } else { - None - } - }) - .collect::>() - .join("\n") - } - Some(other) => serde_json::to_string(other).unwrap_or_default(), - None => String::new(), - }; - - let _ = msg_tx.send(Message { - role: Role::Tool, - content: content_str, - tool_calls: None, - tool_call_id: tool_use_id, - }); - } -} - -/// Extract text from a stream event and send to the token channel for live display. -/// -/// Stream events provide incremental text deltas for real-time rendering. -/// The authoritative final message content comes from complete `assistant` events. 
-fn handle_stream_event( - event: &serde_json::Value, - token_tx: &tokio::sync::mpsc::UnboundedSender, - thinking_tx: &tokio::sync::mpsc::UnboundedSender, - activity_tx: &tokio::sync::mpsc::UnboundedSender, -) { - let event_type = event.get("type").and_then(|t| t.as_str()).unwrap_or(""); - - match event_type { - "content_block_start" => { - if let Some(cb) = event.get("content_block") - && cb.get("type").and_then(|t| t.as_str()) == Some("tool_use") - && let Some(name) = cb.get("name").and_then(|n| n.as_str()) - { - let _ = activity_tx.send(name.to_string()); - } - } - // Text content streaming — only text_delta, not input_json_delta (tool args) - "content_block_delta" => { - if let Some(delta) = event.get("delta") { - let delta_type = delta.get("type").and_then(|t| t.as_str()).unwrap_or(""); - match delta_type { - "text_delta" => { - if let Some(text) = delta.get("text").and_then(|t| t.as_str()) { - let _ = token_tx.send(text.to_string()); - } - } - "thinking_delta" => { - if let Some(thinking) = delta.get("thinking").and_then(|t| t.as_str()) { - let _ = thinking_tx.send(thinking.to_string()); - } - } - _ => {} - } - } - } - // Log errors via streaming display - "error" => { - if let Some(error) = event.get("error") { - let msg = error - .get("message") - .and_then(|m| m.as_str()) - .unwrap_or("unknown error"); - let _ = token_tx.send(format!("\n[error: {msg}]\n")); - } - } - _ => {} - } -} - -#[cfg(test)] -mod tests { - use super::*; - use serde_json::json; - - fn collect_messages(f: impl Fn(&std::sync::mpsc::Sender)) -> Vec { - let (tx, rx) = std::sync::mpsc::channel(); - f(&tx); - drop(tx); - rx.try_iter().collect() - } - - #[test] - fn parse_assistant_message_text_only() { - let content = vec![json!({"type": "text", "text": "Hello, world!"})]; - let msgs = collect_messages(|tx| parse_assistant_message(&content, tx)); - assert_eq!(msgs.len(), 1); - assert_eq!(msgs[0].role, Role::Assistant); - assert_eq!(msgs[0].content, "Hello, world!"); - 
assert!(msgs[0].tool_calls.is_none()); - } - - #[test] - fn parse_assistant_message_with_tool_use() { - let content = vec![ - json!({"type": "text", "text": "I'll read that file."}), - json!({ - "type": "tool_use", - "id": "toolu_abc123", - "name": "Read", - "input": {"file_path": "/src/main.rs"} - }), - ]; - let msgs = collect_messages(|tx| parse_assistant_message(&content, tx)); - assert_eq!(msgs.len(), 1); - let msg = &msgs[0]; - assert_eq!(msg.role, Role::Assistant); - assert_eq!(msg.content, "I'll read that file."); - let tool_calls = msg.tool_calls.as_ref().expect("should have tool calls"); - assert_eq!(tool_calls.len(), 1); - assert_eq!(tool_calls[0].id.as_deref(), Some("toolu_abc123")); - assert_eq!(tool_calls[0].function.name, "Read"); - let args: serde_json::Value = - serde_json::from_str(&tool_calls[0].function.arguments).unwrap(); - assert_eq!(args["file_path"], "/src/main.rs"); - } - - #[test] - fn parse_assistant_message_multiple_tool_uses() { - let content = vec![ - json!({"type": "tool_use", "id": "id1", "name": "Glob", "input": {"pattern": "*.rs"}}), - json!({"type": "tool_use", "id": "id2", "name": "Bash", "input": {"command": "cargo test"}}), - ]; - let msgs = collect_messages(|tx| parse_assistant_message(&content, tx)); - assert_eq!(msgs.len(), 1); - let tool_calls = msgs[0].tool_calls.as_ref().unwrap(); - assert_eq!(tool_calls.len(), 2); - assert_eq!(tool_calls[0].function.name, "Glob"); - assert_eq!(tool_calls[1].function.name, "Bash"); - } - - #[test] - fn parse_tool_results_string_content() { - let content = vec![json!({ - "type": "tool_result", - "tool_use_id": "toolu_abc123", - "content": "fn main() { println!(\"hello\"); }" - })]; - let msgs = collect_messages(|tx| parse_tool_results(&content, tx)); - assert_eq!(msgs.len(), 1); - assert_eq!(msgs[0].role, Role::Tool); - assert_eq!(msgs[0].content, "fn main() { println!(\"hello\"); }"); - assert_eq!(msgs[0].tool_call_id.as_deref(), Some("toolu_abc123")); - } - - #[test] - fn 
parse_tool_results_array_content() { - let content = vec![json!({ - "type": "tool_result", - "tool_use_id": "toolu_xyz", - "content": [ - {"type": "text", "text": "Line 1"}, - {"type": "text", "text": "Line 2"} - ] - })]; - let msgs = collect_messages(|tx| parse_tool_results(&content, tx)); - assert_eq!(msgs.len(), 1); - assert_eq!(msgs[0].content, "Line 1\nLine 2"); - assert_eq!(msgs[0].tool_call_id.as_deref(), Some("toolu_xyz")); - } - - #[test] - fn parse_tool_results_multiple_results() { - let content = vec![ - json!({"type": "tool_result", "tool_use_id": "id1", "content": "result1"}), - json!({"type": "tool_result", "tool_use_id": "id2", "content": "result2"}), - ]; - let msgs = collect_messages(|tx| parse_tool_results(&content, tx)); - assert_eq!(msgs.len(), 2); - assert_eq!(msgs[0].tool_call_id.as_deref(), Some("id1")); - assert_eq!(msgs[1].tool_call_id.as_deref(), Some("id2")); - } - - #[test] - fn parse_tool_results_ignores_non_tool_result_blocks() { - let content = vec![ - json!({"type": "text", "text": "not a tool result"}), - json!({"type": "tool_result", "tool_use_id": "id1", "content": "actual result"}), - ]; - let msgs = collect_messages(|tx| parse_tool_results(&content, tx)); - assert_eq!(msgs.len(), 1); - assert_eq!(msgs[0].tool_call_id.as_deref(), Some("id1")); - } - - #[test] - fn parse_assistant_message_empty_text_with_tool_use() { - // When a message has only tool_use (no text), content should be empty string - let content = vec![json!({ - "type": "tool_use", - "id": "toolu_1", - "name": "Write", - "input": {"file_path": "foo.rs", "content": "fn foo() {}"} - })]; - let msgs = collect_messages(|tx| parse_assistant_message(&content, tx)); - assert_eq!(msgs.len(), 1); - assert_eq!(msgs[0].content, ""); - assert!(msgs[0].tool_calls.is_some()); - } - - #[test] - fn handle_stream_event_text_delta_sends_token() { - let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); - let (ttx, _trx) = tokio::sync::mpsc::unbounded_channel::(); - let (atx, _arx) 
= tokio::sync::mpsc::unbounded_channel::(); - let event = json!({ - "type": "content_block_delta", - "delta": {"type": "text_delta", "text": "hello "} - }); - handle_stream_event(&event, &tx, &ttx, &atx); - drop(tx); - let tokens: Vec<_> = { - let mut v = vec![]; - while let Ok(t) = rx.try_recv() { - v.push(t); - } - v - }; - assert_eq!(tokens, vec!["hello "]); - } - - #[test] - fn handle_stream_event_input_json_delta_not_sent() { - // Tool argument JSON deltas should NOT be sent as text tokens - let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); - let (ttx, _trx) = tokio::sync::mpsc::unbounded_channel::(); - let (atx, _arx) = tokio::sync::mpsc::unbounded_channel::(); - let event = json!({ - "type": "content_block_delta", - "delta": {"type": "input_json_delta", "partial_json": "{\"path\":"} - }); - handle_stream_event(&event, &tx, &ttx, &atx); - drop(tx); - let tokens: Vec = { - let mut v = vec![]; - while let Ok(t) = rx.try_recv() { - v.push(t); - } - v - }; - assert!(tokens.is_empty()); - } - - #[test] - fn handle_stream_event_thinking_delta_routes_to_thinking_channel() { - let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); - let (ttx, mut trx) = tokio::sync::mpsc::unbounded_channel::(); - let (atx, _arx) = tokio::sync::mpsc::unbounded_channel::(); - let event = json!({ - "type": "content_block_delta", - "delta": {"type": "thinking_delta", "thinking": "I should check the file"} - }); - handle_stream_event(&event, &tx, &ttx, &atx); - drop(tx); - drop(ttx); - // thinking token must NOT appear in the regular token channel - let tokens: Vec = { - let mut v = vec![]; - while let Ok(t) = rx.try_recv() { - v.push(t); - } - v - }; - assert!( - tokens.is_empty(), - "thinking token leaked into token channel" - ); - // thinking token must appear in the dedicated thinking channel, without prefix - let thinking: Vec = { - let mut v = vec![]; - while let Ok(t) = trx.try_recv() { - v.push(t); - } - v - }; - assert_eq!(thinking, vec!["I should check the file"]); 
- } - - #[test] - fn handle_stream_event_error_sends_error_token() { - let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); - let (ttx, _trx) = tokio::sync::mpsc::unbounded_channel::(); - let (atx, _arx) = tokio::sync::mpsc::unbounded_channel::(); - let event = json!({ - "type": "error", - "error": {"type": "overloaded_error", "message": "Overloaded"} - }); - handle_stream_event(&event, &tx, &ttx, &atx); - drop(tx); - let tokens: Vec = { - let mut v = vec![]; - while let Ok(t) = rx.try_recv() { - v.push(t); - } - v - }; - assert_eq!(tokens, vec!["\n[error: Overloaded]\n"]); - } - - #[test] - fn handle_stream_event_unknown_type_is_noop() { - let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); - let (ttx, _trx) = tokio::sync::mpsc::unbounded_channel::(); - let (atx, _arx) = tokio::sync::mpsc::unbounded_channel::(); - let event = json!({"type": "ping"}); - handle_stream_event(&event, &tx, &ttx, &atx); - drop(tx); - let tokens: Vec = { - let mut v = vec![]; - while let Ok(t) = rx.try_recv() { - v.push(t); - } - v - }; - assert!(tokens.is_empty()); - } - - #[test] - fn parse_tool_results_no_tool_use_id() { - let content = vec![json!({"type": "tool_result", "content": "output"})]; - let msgs = collect_messages(|tx| parse_tool_results(&content, tx)); - assert_eq!(msgs.len(), 1); - assert!(msgs[0].tool_call_id.is_none()); - assert_eq!(msgs[0].content, "output"); - } - - #[test] - fn parse_tool_results_null_content() { - let content = vec![json!({"type": "tool_result", "tool_use_id": "id1"})]; - let msgs = collect_messages(|tx| parse_tool_results(&content, tx)); - assert_eq!(msgs.len(), 1); - assert_eq!(msgs[0].content, ""); - } - - #[test] - fn parse_tool_results_other_json_content() { - let content = vec![json!({ - "type": "tool_result", - "tool_use_id": "id1", - "content": {"nested": "object"} - })]; - let msgs = collect_messages(|tx| parse_tool_results(&content, tx)); - assert_eq!(msgs.len(), 1); - // Falls through to serde_json::to_string - 
assert!(!msgs[0].content.is_empty()); - } - - #[test] - fn parse_assistant_message_empty_content_array() { - let content: Vec = vec![]; - let msgs = collect_messages(|tx| parse_assistant_message(&content, tx)); - assert_eq!(msgs.len(), 1); - assert_eq!(msgs[0].content, ""); - assert!(msgs[0].tool_calls.is_none()); - } - - #[test] - fn parse_assistant_message_unknown_block_type() { - let content = vec![ - json!({"type": "image", "source": {"type": "base64"}}), - json!({"type": "text", "text": "done"}), - ]; - let msgs = collect_messages(|tx| parse_assistant_message(&content, tx)); - assert_eq!(msgs.len(), 1); - assert_eq!(msgs[0].content, "done"); - } - - #[test] - fn parse_assistant_message_tool_use_without_id() { - let content = vec![json!({ - "type": "tool_use", - "name": "Bash", - "input": {"command": "ls"} - })]; - let msgs = collect_messages(|tx| parse_assistant_message(&content, tx)); - assert_eq!(msgs.len(), 1); - let calls = msgs[0].tool_calls.as_ref().unwrap(); - assert_eq!(calls.len(), 1); - assert!(calls[0].id.is_none()); - assert_eq!(calls[0].function.name, "Bash"); - } - - #[test] - fn parse_assistant_message_tool_use_without_input_defaults_to_empty_object() { - let content = vec![json!({"type": "tool_use", "id": "tid", "name": "Read"})]; - let msgs = collect_messages(|tx| parse_assistant_message(&content, tx)); - let calls = msgs[0].tool_calls.as_ref().unwrap(); - let args: serde_json::Value = serde_json::from_str(&calls[0].function.arguments).unwrap(); - assert!(args.is_object()); - assert!(args.as_object().unwrap().is_empty()); - } - - type Channels = ( - tokio::sync::mpsc::UnboundedSender, - tokio::sync::mpsc::UnboundedReceiver, - tokio::sync::mpsc::UnboundedSender, - tokio::sync::mpsc::UnboundedReceiver, - tokio::sync::mpsc::UnboundedSender, - tokio::sync::mpsc::UnboundedReceiver, - std::sync::mpsc::Sender, - std::sync::mpsc::Receiver, - ); - - fn make_channels() -> Channels { - let (tok_tx, tok_rx) = tokio::sync::mpsc::unbounded_channel(); - let 
(thi_tx, thi_rx) = tokio::sync::mpsc::unbounded_channel(); - let (act_tx, act_rx) = tokio::sync::mpsc::unbounded_channel(); - let (msg_tx, msg_rx) = std::sync::mpsc::channel(); - ( - tok_tx, tok_rx, thi_tx, thi_rx, act_tx, act_rx, msg_tx, msg_rx, - ) - } - - #[test] - fn process_json_event_result_returns_true() { - let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels(); - let (sid_tx, _sid_rx) = tokio::sync::oneshot::channel::(); - let mut sid_tx_opt = Some(sid_tx); - let json = json!({"type": "result", "subtype": "success"}); - assert!(process_json_event( - &json, - &tok_tx, - &thi_tx, - &act_tx, - &msg_tx, - &mut sid_tx_opt, - &AtomicBool::new(false), - )); - } - - #[test] - fn process_json_event_system_returns_false() { - let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels(); - let mut sid_tx = None::>; - let json = json!({"type": "system", "subtype": "init", "apiKeySource": "env"}); - assert!(!process_json_event( - &json, - &tok_tx, - &thi_tx, - &act_tx, - &msg_tx, - &mut sid_tx, - &AtomicBool::new(false), - )); - } - - #[test] - fn process_json_event_rate_limit_returns_false() { - let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels(); - let mut sid_tx = None::>; - let json = json!({"type": "rate_limit_event"}); - assert!(!process_json_event( - &json, - &tok_tx, - &thi_tx, - &act_tx, - &msg_tx, - &mut sid_tx, - &AtomicBool::new(false), - )); - } - - #[test] - fn process_json_event_unknown_type_returns_false() { - let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels(); - let mut sid_tx = None::>; - let json = json!({"type": "some_future_event"}); - assert!(!process_json_event( - &json, - &tok_tx, - &thi_tx, - &act_tx, - &msg_tx, - &mut sid_tx, - &AtomicBool::new(false), - )); - } - - #[test] - fn process_json_event_no_type_returns_false() { - let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = 
make_channels(); - let mut sid_tx = None::>; - let json = json!({"content": "no type field"}); - assert!(!process_json_event( - &json, - &tok_tx, - &thi_tx, - &act_tx, - &msg_tx, - &mut sid_tx, - &AtomicBool::new(false), - )); - } - - #[test] - fn process_json_event_captures_session_id() { - let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels(); - let (sid_tx, mut sid_rx) = tokio::sync::oneshot::channel::(); - let mut sid_tx_opt = Some(sid_tx); - let json = json!({"type": "system", "session_id": "sess-abc-123"}); - process_json_event( - &json, - &tok_tx, - &thi_tx, - &act_tx, - &msg_tx, - &mut sid_tx_opt, - &AtomicBool::new(false), - ); - // sid_tx should have been consumed - assert!(sid_tx_opt.is_none()); - let received = sid_rx.try_recv().unwrap(); - assert_eq!(received, "sess-abc-123"); - } - - #[test] - fn process_json_event_preserves_sid_tx_if_no_session_id() { - let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels(); - let (sid_tx, _sid_rx) = tokio::sync::oneshot::channel::(); - let mut sid_tx_opt = Some(sid_tx); - let json = json!({"type": "system"}); - process_json_event( - &json, - &tok_tx, - &thi_tx, - &act_tx, - &msg_tx, - &mut sid_tx_opt, - &AtomicBool::new(false), - ); - // sid_tx should still be present since no session_id in event - assert!(sid_tx_opt.is_some()); - } - - #[test] - fn process_json_event_stream_event_forwards_token() { - let (tok_tx, mut tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = - make_channels(); - let mut sid_tx = None::>; - let json = json!({ - "type": "stream_event", - "session_id": "s1", - "event": { - "type": "content_block_delta", - "delta": {"type": "text_delta", "text": "word"} - } - }); - assert!(!process_json_event( - &json, - &tok_tx, - &thi_tx, - &act_tx, - &msg_tx, - &mut sid_tx, - &AtomicBool::new(false), - )); - drop(tok_tx); - let tokens: Vec = { - let mut v = vec![]; - while let Ok(t) = tok_rx.try_recv() { - v.push(t); - } - v 
- }; - assert_eq!(tokens, vec!["word"]); - } - - #[test] - fn process_json_event_stream_event_tool_use_fires_activity() { - // This is the primary activity path: stream_event wrapping content_block_start - // with a tool_use block. Requires --include-partial-messages to be enabled. - let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, mut act_rx, msg_tx, _msg_rx) = - make_channels(); - let mut sid_tx = None::>; - let json = json!({ - "type": "stream_event", - "session_id": "s1", - "event": { - "type": "content_block_start", - "index": 1, - "content_block": {"type": "tool_use", "id": "toolu_abc", "name": "Bash", "input": {}} - } - }); - assert!(!process_json_event( - &json, - &tok_tx, - &thi_tx, - &act_tx, - &msg_tx, - &mut sid_tx, - &AtomicBool::new(false), - )); - drop(act_tx); - let activities: Vec = { - let mut v = vec![]; - while let Ok(a) = act_rx.try_recv() { - v.push(a); - } - v - }; - assert_eq!(activities, vec!["Bash"]); - } - - #[test] - fn process_json_event_assistant_with_tool_use_fires_activity() { - let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, mut act_rx, msg_tx, _msg_rx) = - make_channels(); - let mut sid_tx = None::>; - let json = json!({ - "type": "assistant", - "message": { - "content": [ - {"type": "text", "text": "Let me read that file."}, - {"type": "tool_use", "id": "toolu_1", "name": "Read", "input": {"file_path": "/foo.rs"}} - ] - } - }); - assert!(!process_json_event( - &json, - &tok_tx, - &thi_tx, - &act_tx, - &msg_tx, - &mut sid_tx, - &AtomicBool::new(false), - )); - drop(act_tx); - let activities: Vec = { - let mut v = vec![]; - while let Ok(a) = act_rx.try_recv() { - v.push(a); - } - v - }; - assert_eq!(activities, vec!["Read"]); - } - - #[test] - fn process_json_event_assistant_with_multiple_tool_uses_fires_all_activities() { - let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, mut act_rx, msg_tx, _msg_rx) = - make_channels(); - let mut sid_tx = None::>; - let json = json!({ - "type": "assistant", - "message": { - "content": [ - {"type": 
"tool_use", "id": "id1", "name": "Glob", "input": {}}, - {"type": "tool_use", "id": "id2", "name": "Bash", "input": {}} - ] - } - }); - assert!(!process_json_event( - &json, - &tok_tx, - &thi_tx, - &act_tx, - &msg_tx, - &mut sid_tx, - &AtomicBool::new(false), - )); - drop(act_tx); - let activities: Vec = { - let mut v = vec![]; - while let Ok(a) = act_rx.try_recv() { - v.push(a); - } - v - }; - assert_eq!(activities, vec!["Glob", "Bash"]); - } - - #[test] - fn process_json_event_assistant_text_only_no_activity() { - let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, mut act_rx, msg_tx, _msg_rx) = - make_channels(); - let mut sid_tx = None::>; - let json = json!({ - "type": "assistant", - "message": { - "content": [{"type": "text", "text": "Just text, no tools."}] - } - }); - assert!(!process_json_event( - &json, - &tok_tx, - &thi_tx, - &act_tx, - &msg_tx, - &mut sid_tx, - &AtomicBool::new(false), - )); - drop(act_tx); - let activities: Vec = { - let mut v = vec![]; - while let Ok(a) = act_rx.try_recv() { - v.push(a); - } - v - }; - assert!(activities.is_empty()); - } - - #[test] - fn process_json_event_assistant_event_parses_message() { - let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, msg_rx) = make_channels(); - let mut sid_tx = None::>; - let json = json!({ - "type": "assistant", - "message": { - "content": [{"type": "text", "text": "Hi!"}] - } - }); - assert!(!process_json_event( - &json, - &tok_tx, - &thi_tx, - &act_tx, - &msg_tx, - &mut sid_tx, - &AtomicBool::new(false), - )); - drop(msg_tx); - let msgs: Vec = msg_rx.try_iter().collect(); - assert_eq!(msgs.len(), 1); - assert_eq!(msgs[0].content, "Hi!"); - } - - #[test] - fn process_json_event_user_event_parses_tool_results() { - let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, msg_rx) = make_channels(); - let mut sid_tx = None::>; - let json = json!({ - "type": "user", - "message": { - "content": [{"type": "tool_result", "tool_use_id": "tid1", "content": "done"}] - } - }); - 
assert!(!process_json_event( - &json, - &tok_tx, - &thi_tx, - &act_tx, - &msg_tx, - &mut sid_tx, - &AtomicBool::new(false), - )); - drop(msg_tx); - let msgs: Vec = msg_rx.try_iter().collect(); - assert_eq!(msgs.len(), 1); - assert_eq!(msgs[0].role, Role::Tool); - assert_eq!(msgs[0].content, "done"); - } - - #[test] - fn process_json_event_assistant_without_content_array_is_noop() { - let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, msg_rx) = make_channels(); - let mut sid_tx = None::>; - let json = json!({ - "type": "assistant", - "message": {"content": "not an array"} - }); - assert!(!process_json_event( - &json, - &tok_tx, - &thi_tx, - &act_tx, - &msg_tx, - &mut sid_tx, - &AtomicBool::new(false), - )); - drop(msg_tx); - let msgs: Vec = msg_rx.try_iter().collect(); - assert!(msgs.is_empty()); - } - - #[test] - fn process_json_event_user_without_content_array_is_noop() { - let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, msg_rx) = make_channels(); - let mut sid_tx = None::>; - let json = json!({"type": "user", "message": {"content": null}}); - assert!(!process_json_event( - &json, - &tok_tx, - &thi_tx, - &act_tx, - &msg_tx, - &mut sid_tx, - &AtomicBool::new(false), - )); - drop(msg_tx); - let msgs: Vec = msg_rx.try_iter().collect(); - assert!(msgs.is_empty()); - } - - #[test] - fn process_json_event_detects_authentication_failed() { - let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels(); - let mut sid_tx = None::>; - let auth_failed = AtomicBool::new(false); - let json = json!({ - "type": "assistant", - "error": "authentication_failed", - "message": { - "content": [{"type": "text", "text": "Failed to authenticate."}] - } - }); - assert!(!process_json_event( - &json, - &tok_tx, - &thi_tx, - &act_tx, - &msg_tx, - &mut sid_tx, - &auth_failed, - )); - assert!(auth_failed.load(Ordering::Relaxed)); - } - - #[test] - fn process_json_event_no_auth_failed_for_normal_events() { - let (tok_tx, _tok_rx, 
thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels(); - let mut sid_tx = None::>; - let auth_failed = AtomicBool::new(false); - let json = json!({ - "type": "assistant", - "message": { - "content": [{"type": "text", "text": "Hello!"}] - } - }); - assert!(!process_json_event( - &json, - &tok_tx, - &thi_tx, - &act_tx, - &msg_tx, - &mut sid_tx, - &auth_failed, - )); - assert!(!auth_failed.load(Ordering::Relaxed)); - } - - #[test] - fn claude_code_provider_new() { - let _provider = ClaudeCodeProvider::new(); - } - - #[test] - fn handle_stream_event_tool_use_start_sends_activity() { - let (tx, _rx) = tokio::sync::mpsc::unbounded_channel::(); - let (ttx, _trx) = tokio::sync::mpsc::unbounded_channel::(); - let (atx, mut arx) = tokio::sync::mpsc::unbounded_channel::(); - let event = json!({ - "type": "content_block_start", - "index": 1, - "content_block": {"type": "tool_use", "id": "toolu_1", "name": "Read", "input": {}} - }); - handle_stream_event(&event, &tx, &ttx, &atx); - drop(atx); - let activities: Vec = { - let mut v = vec![]; - while let Ok(a) = arx.try_recv() { - v.push(a); - } - v - }; - assert_eq!(activities, vec!["Read"]); - } - - #[test] - fn handle_stream_event_text_block_start_no_activity() { - let (tx, _rx) = tokio::sync::mpsc::unbounded_channel::(); - let (ttx, _trx) = tokio::sync::mpsc::unbounded_channel::(); - let (atx, mut arx) = tokio::sync::mpsc::unbounded_channel::(); - let event = json!({ - "type": "content_block_start", - "index": 0, - "content_block": {"type": "text", "text": ""} - }); - handle_stream_event(&event, &tx, &ttx, &atx); - drop(atx); - let activities: Vec = { - let mut v = vec![]; - while let Ok(a) = arx.try_recv() { - v.push(a); - } - v - }; - assert!(activities.is_empty()); - } -} diff --git a/server/src/llm/providers/claude_code/events.rs b/server/src/llm/providers/claude_code/events.rs new file mode 100644 index 00000000..d43258e2 --- /dev/null +++ b/server/src/llm/providers/claude_code/events.rs @@ -0,0 +1,751 @@ 
+//! Claude Code event handling — streams JSON events from the CLI to the message channel. + +use serde_json::Value; + +use super::parse::{parse_assistant_message, parse_tool_results}; +use crate::llm::types::{Message, Role}; +use crate::slog; +use std::sync::atomic::{AtomicBool, Ordering}; + +pub(super) fn process_json_event( + json: &serde_json::Value, + token_tx: &tokio::sync::mpsc::UnboundedSender, + thinking_tx: &tokio::sync::mpsc::UnboundedSender, + activity_tx: &tokio::sync::mpsc::UnboundedSender, + msg_tx: &std::sync::mpsc::Sender, + sid_tx: &mut Option>, + auth_failed: &AtomicBool, +) -> bool { + let event_type = match json.get("type").and_then(|t| t.as_str()) { + Some(t) => t, + None => return false, + }; + + // Capture session_id from the first event that carries it + if let Some(tx) = sid_tx.take() { + if let Some(sid) = json.get("session_id").and_then(|s| s.as_str()) { + slog!("[pty-debug] CAPTURED session_id: {}", sid); + let _ = tx.send(sid.to_string()); + } else { + *sid_tx = Some(tx); + } + } + + // Detect authentication_failed at the top level of any event. + if json.get("error").and_then(|e| e.as_str()) == Some("authentication_failed") { + slog!("[pty-debug] Detected authentication_failed error"); + auth_failed.store(true, Ordering::Relaxed); + } + + match event_type { + "stream_event" => { + if let Some(event) = json.get("event") { + handle_stream_event(event, token_tx, thinking_tx, activity_tx); + } + false + } + "assistant" => { + if let Some(message) = json.get("message") + && let Some(content) = message.get("content").and_then(|c| c.as_array()) + { + // Fire activity signals for tool_use blocks as a fallback path. + // The primary path is via stream_event → content_block_start (real-time), + // but assistant events also carry tool_use blocks and serve as a reliable + // backup if stream_event delivery is delayed or missed. 
+ for block in content { + if block.get("type").and_then(|t| t.as_str()) == Some("tool_use") + && let Some(name) = block.get("name").and_then(|n| n.as_str()) + { + let _ = activity_tx.send(name.to_string()); + } + } + parse_assistant_message(content, msg_tx); + } + false + } + "user" => { + if let Some(message) = json.get("message") + && let Some(content) = message.get("content").and_then(|c| c.as_array()) + { + parse_tool_results(content, msg_tx); + } + false + } + "result" => true, + // system, rate_limit_event, and unknown types are no-ops + _ => false, + } +} + +/// Parse a complete `assistant` message content array. +/// +/// Extracts text blocks into `content` and tool_use blocks into `tool_calls`, +/// then sends a single `Message { role: Assistant }` via `msg_tx`. +/// This is the authoritative source for the final message structure — streaming + +pub(super) fn handle_stream_event( + event: &serde_json::Value, + token_tx: &tokio::sync::mpsc::UnboundedSender, + thinking_tx: &tokio::sync::mpsc::UnboundedSender, + activity_tx: &tokio::sync::mpsc::UnboundedSender, +) { + let event_type = event.get("type").and_then(|t| t.as_str()).unwrap_or(""); + + match event_type { + "content_block_start" => { + if let Some(cb) = event.get("content_block") + && cb.get("type").and_then(|t| t.as_str()) == Some("tool_use") + && let Some(name) = cb.get("name").and_then(|n| n.as_str()) + { + let _ = activity_tx.send(name.to_string()); + } + } + // Text content streaming — only text_delta, not input_json_delta (tool args) + "content_block_delta" => { + if let Some(delta) = event.get("delta") { + let delta_type = delta.get("type").and_then(|t| t.as_str()).unwrap_or(""); + match delta_type { + "text_delta" => { + if let Some(text) = delta.get("text").and_then(|t| t.as_str()) { + let _ = token_tx.send(text.to_string()); + } + } + "thinking_delta" => { + if let Some(thinking) = delta.get("thinking").and_then(|t| t.as_str()) { + let _ = thinking_tx.send(thinking.to_string()); + } + } + 
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};

    type SidSender = tokio::sync::oneshot::Sender<String>;

    /// Both ends of every channel the event handlers write to.
    struct Harness {
        token_tx: UnboundedSender<String>,
        token_rx: UnboundedReceiver<String>,
        thinking_tx: UnboundedSender<String>,
        thinking_rx: UnboundedReceiver<String>,
        activity_tx: UnboundedSender<String>,
        activity_rx: UnboundedReceiver<String>,
        msg_tx: std::sync::mpsc::Sender<Message>,
        msg_rx: std::sync::mpsc::Receiver<Message>,
    }

    impl Harness {
        fn new() -> Self {
            let (token_tx, token_rx) = unbounded_channel();
            let (thinking_tx, thinking_rx) = unbounded_channel();
            let (activity_tx, activity_rx) = unbounded_channel();
            let (msg_tx, msg_rx) = std::sync::mpsc::channel();
            Self {
                token_tx,
                token_rx,
                thinking_tx,
                thinking_rx,
                activity_tx,
                activity_rx,
                msg_tx,
                msg_rx,
            }
        }

        /// Feed `event` straight to `handle_stream_event`.
        fn stream(&self, event: &serde_json::Value) {
            handle_stream_event(event, &self.token_tx, &self.thinking_tx, &self.activity_tx);
        }

        /// Feed `event` to `process_json_event` with an explicit session-id
        /// slot and auth flag.
        fn process_with(
            &self,
            event: &serde_json::Value,
            sid_tx: &mut Option<SidSender>,
            auth_failed: &AtomicBool,
        ) -> bool {
            process_json_event(
                event,
                &self.token_tx,
                &self.thinking_tx,
                &self.activity_tx,
                &self.msg_tx,
                sid_tx,
                auth_failed,
            )
        }

        /// Feed `event` to `process_json_event` with no session-id sender and
        /// a throwaway auth flag.
        fn process(&self, event: &serde_json::Value) -> bool {
            self.process_with(event, &mut None, &AtomicBool::new(false))
        }

        fn tokens(&mut self) -> Vec<String> {
            drain(&mut self.token_rx)
        }

        fn thinking(&mut self) -> Vec<String> {
            drain(&mut self.thinking_rx)
        }

        fn activities(&mut self) -> Vec<String> {
            drain(&mut self.activity_rx)
        }

        fn messages(&self) -> Vec<Message> {
            self.msg_rx.try_iter().collect()
        }
    }

    /// Collect everything currently buffered in `rx` without blocking.
    fn drain(rx: &mut UnboundedReceiver<String>) -> Vec<String> {
        std::iter::from_fn(|| rx.try_recv().ok()).collect()
    }

    #[test]
    fn handle_stream_event_text_delta_sends_token() {
        let mut h = Harness::new();
        h.stream(&json!({
            "type": "content_block_delta",
            "delta": {"type": "text_delta", "text": "hello "}
        }));
        assert_eq!(h.tokens(), vec!["hello "]);
    }

    #[test]
    fn handle_stream_event_input_json_delta_not_sent() {
        // Tool argument JSON deltas should NOT be sent as text tokens
        let mut h = Harness::new();
        h.stream(&json!({
            "type": "content_block_delta",
            "delta": {"type": "input_json_delta", "partial_json": "{\"path\":"}
        }));
        assert!(h.tokens().is_empty());
    }

    #[test]
    fn handle_stream_event_thinking_delta_routes_to_thinking_channel() {
        let mut h = Harness::new();
        h.stream(&json!({
            "type": "content_block_delta",
            "delta": {"type": "thinking_delta", "thinking": "I should check the file"}
        }));
        // thinking token must NOT appear in the regular token channel
        assert!(
            h.tokens().is_empty(),
            "thinking token leaked into token channel"
        );
        // thinking token must appear in the dedicated thinking channel, without prefix
        assert_eq!(h.thinking(), vec!["I should check the file"]);
    }

    #[test]
    fn handle_stream_event_error_sends_error_token() {
        let mut h = Harness::new();
        h.stream(&json!({
            "type": "error",
            "error": {"type": "overloaded_error", "message": "Overloaded"}
        }));
        assert_eq!(h.tokens(), vec!["\n[error: Overloaded]\n"]);
    }

    #[test]
    fn handle_stream_event_unknown_type_is_noop() {
        let mut h = Harness::new();
        h.stream(&json!({"type": "ping"}));
        assert!(h.tokens().is_empty());
    }

    #[test]
    fn handle_stream_event_tool_use_start_sends_activity() {
        let mut h = Harness::new();
        h.stream(&json!({
            "type": "content_block_start",
            "index": 1,
            "content_block": {"type": "tool_use", "id": "toolu_1", "name": "Read", "input": {}}
        }));
        assert_eq!(h.activities(), vec!["Read"]);
    }

    #[test]
    fn handle_stream_event_text_block_start_no_activity() {
        let mut h = Harness::new();
        h.stream(&json!({
            "type": "content_block_start",
            "index": 0,
            "content_block": {"type": "text", "text": ""}
        }));
        assert!(h.activities().is_empty());
    }

    #[test]
    fn process_json_event_result_returns_true() {
        let h = Harness::new();
        let (sid_tx, _sid_rx) = tokio::sync::oneshot::channel::<String>();
        let mut sid_slot = Some(sid_tx);
        assert!(h.process_with(
            &json!({"type": "result", "subtype": "success"}),
            &mut sid_slot,
            &AtomicBool::new(false),
        ));
    }

    #[test]
    fn process_json_event_system_returns_false() {
        let h = Harness::new();
        assert!(!h.process(&json!({"type": "system", "subtype": "init", "apiKeySource": "env"})));
    }

    #[test]
    fn process_json_event_rate_limit_returns_false() {
        let h = Harness::new();
        assert!(!h.process(&json!({"type": "rate_limit_event"})));
    }

    #[test]
    fn process_json_event_unknown_type_returns_false() {
        let h = Harness::new();
        assert!(!h.process(&json!({"type": "some_future_event"})));
    }

    #[test]
    fn process_json_event_no_type_returns_false() {
        let h = Harness::new();
        assert!(!h.process(&json!({"content": "no type field"})));
    }

    #[test]
    fn process_json_event_captures_session_id() {
        let h = Harness::new();
        let (sid_tx, mut sid_rx) = tokio::sync::oneshot::channel::<String>();
        let mut sid_slot = Some(sid_tx);
        h.process_with(
            &json!({"type": "system", "session_id": "sess-abc-123"}),
            &mut sid_slot,
            &AtomicBool::new(false),
        );
        // sid_tx should have been consumed
        assert!(sid_slot.is_none());
        let received = sid_rx.try_recv().unwrap();
        assert_eq!(received, "sess-abc-123");
    }

    #[test]
    fn process_json_event_preserves_sid_tx_if_no_session_id() {
        let h = Harness::new();
        let (sid_tx, _sid_rx) = tokio::sync::oneshot::channel::<String>();
        let mut sid_slot = Some(sid_tx);
        h.process_with(
            &json!({"type": "system"}),
            &mut sid_slot,
            &AtomicBool::new(false),
        );
        // sid_tx should still be present since no session_id in event
        assert!(sid_slot.is_some());
    }

    #[test]
    fn process_json_event_stream_event_forwards_token() {
        let mut h = Harness::new();
        assert!(!h.process(&json!({
            "type": "stream_event",
            "session_id": "s1",
            "event": {
                "type": "content_block_delta",
                "delta": {"type": "text_delta", "text": "word"}
            }
        })));
        assert_eq!(h.tokens(), vec!["word"]);
    }

    #[test]
    fn process_json_event_stream_event_tool_use_fires_activity() {
        // This is the primary activity path: stream_event wrapping content_block_start
        // with a tool_use block. Requires --include-partial-messages to be enabled.
        let mut h = Harness::new();
        assert!(!h.process(&json!({
            "type": "stream_event",
            "session_id": "s1",
            "event": {
                "type": "content_block_start",
                "index": 1,
                "content_block": {"type": "tool_use", "id": "toolu_abc", "name": "Bash", "input": {}}
            }
        })));
        assert_eq!(h.activities(), vec!["Bash"]);
    }

    #[test]
    fn process_json_event_assistant_with_tool_use_fires_activity() {
        let mut h = Harness::new();
        assert!(!h.process(&json!({
            "type": "assistant",
            "message": {
                "content": [
                    {"type": "text", "text": "Let me read that file."},
                    {"type": "tool_use", "id": "toolu_1", "name": "Read", "input": {"file_path": "/foo.rs"}}
                ]
            }
        })));
        assert_eq!(h.activities(), vec!["Read"]);
    }

    #[test]
    fn process_json_event_assistant_with_multiple_tool_uses_fires_all_activities() {
        let mut h = Harness::new();
        assert!(!h.process(&json!({
            "type": "assistant",
            "message": {
                "content": [
                    {"type": "tool_use", "id": "id1", "name": "Glob", "input": {}},
                    {"type": "tool_use", "id": "id2", "name": "Bash", "input": {}}
                ]
            }
        })));
        assert_eq!(h.activities(), vec!["Glob", "Bash"]);
    }

    #[test]
    fn process_json_event_assistant_text_only_no_activity() {
        let mut h = Harness::new();
        assert!(!h.process(&json!({
            "type": "assistant",
            "message": {
                "content": [{"type": "text", "text": "Just text, no tools."}]
            }
        })));
        assert!(h.activities().is_empty());
    }

    #[test]
    fn process_json_event_assistant_event_parses_message() {
        let h = Harness::new();
        assert!(!h.process(&json!({
            "type": "assistant",
            "message": {
                "content": [{"type": "text", "text": "Hi!"}]
            }
        })));
        let msgs = h.messages();
        assert_eq!(msgs.len(), 1);
        assert_eq!(msgs[0].content, "Hi!");
    }

    #[test]
    fn process_json_event_user_event_parses_tool_results() {
        let h = Harness::new();
        assert!(!h.process(&json!({
            "type": "user",
            "message": {
                "content": [{"type": "tool_result", "tool_use_id": "tid1", "content": "done"}]
            }
        })));
        let msgs = h.messages();
        assert_eq!(msgs.len(), 1);
        assert_eq!(msgs[0].role, Role::Tool);
        assert_eq!(msgs[0].content, "done");
    }

    #[test]
    fn process_json_event_assistant_without_content_array_is_noop() {
        let h = Harness::new();
        assert!(!h.process(&json!({
            "type": "assistant",
            "message": {"content": "not an array"}
        })));
        assert!(h.messages().is_empty());
    }

    #[test]
    fn process_json_event_user_without_content_array_is_noop() {
        let h = Harness::new();
        assert!(!h.process(&json!({"type": "user", "message": {"content": null}})));
        assert!(h.messages().is_empty());
    }

    #[test]
    fn process_json_event_detects_authentication_failed() {
        let h = Harness::new();
        let auth_failed = AtomicBool::new(false);
        assert!(!h.process_with(
            &json!({
                "type": "assistant",
                "error": "authentication_failed",
                "message": {
                    "content": [{"type": "text", "text": "Failed to authenticate."}]
                }
            }),
            &mut None,
            &auth_failed,
        ));
        assert!(auth_failed.load(Ordering::Relaxed));
    }

    #[test]
    fn process_json_event_no_auth_failed_for_normal_events() {
        let h = Harness::new();
        let auth_failed = AtomicBool::new(false);
        assert!(!h.process_with(
            &json!({
                "type": "assistant",
                "message": {
                    "content": [{"type": "text", "text": "Hello!"}]
                }
            }),
            &mut None,
            &auth_failed,
        ));
        assert!(!auth_failed.load(Ordering::Relaxed));
    }
}
&json, + &tok_tx, + &thi_tx, + &act_tx, + &msg_tx, + &mut sid_tx, + &auth_failed, + )); + assert!(!auth_failed.load(Ordering::Relaxed)); + } +} diff --git a/server/src/llm/providers/claude_code/mod.rs b/server/src/llm/providers/claude_code/mod.rs new file mode 100644 index 00000000..c55233d6 --- /dev/null +++ b/server/src/llm/providers/claude_code/mod.rs @@ -0,0 +1,395 @@ +//! Claude Code provider — runs Claude Code CLI in a PTY and parses structured output. +use crate::slog; +use portable_pty::{CommandBuilder, PtySize, native_pty_system}; +use std::io::{BufRead, BufReader}; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; +use tokio::sync::watch; + +use crate::llm::types::{FunctionCall, Message, Role, ToolCall}; + +/// Result from a Claude Code session containing structured messages. +pub struct ClaudeCodeResult { + /// The conversation messages produced by Claude Code, including assistant + /// turns (with optional tool_calls) and tool result turns. + pub messages: Vec, + /// Session ID for conversation resumption on subsequent requests. + pub session_id: Option, +} + +/// Manages a Claude Code session via a pseudo-terminal. +/// +/// Spawns `claude -p` in a PTY so isatty() returns true (which may +/// influence billing), while using `--output-format stream-json` to +/// get clean, structured NDJSON output instead of TUI escape sequences. +/// +/// Supports session resumption: if a `session_id` is provided, passes +/// `--resume ` so Claude Code loads the prior conversation transcript +/// from disk and continues with full context. +/// +/// Permissions are delegated to the MCP `prompt_permission` tool via +/// `--permission-prompt-tool`, so Claude Code calls back into the server +/// when a tool requires user approval. The frontend dialog handles the UX. 
+ +mod events; +mod parse; + +use events::{handle_stream_event, process_json_event}; + +pub struct ClaudeCodeProvider; + +impl ClaudeCodeProvider { + pub fn new() -> Self { + Self + } + + #[allow(clippy::too_many_arguments)] + pub async fn chat_stream( + &self, + user_message: &str, + project_root: &str, + session_id: Option<&str>, + system_prompt: Option<&str>, + cancel_rx: &mut watch::Receiver, + mut on_token: F, + mut on_thinking: T, + mut on_activity: A, + ) -> Result + where + F: FnMut(&str) + Send, + T: FnMut(&str) + Send, + A: FnMut(&str) + Send, + { + let cancelled = Arc::new(AtomicBool::new(false)); + let cancelled_clone = cancelled.clone(); + + let mut cancel_watch = cancel_rx.clone(); + tokio::spawn(async move { + while cancel_watch.changed().await.is_ok() { + if *cancel_watch.borrow() { + cancelled_clone.store(true, Ordering::Relaxed); + break; + } + } + }); + + // Attempt up to 2 times: first try, then retry after OAuth refresh. + for attempt in 0..2 { + let message = user_message.to_string(); + let cwd = project_root.to_string(); + let resume_id = session_id.map(|s| s.to_string()); + let sys_prompt = system_prompt.map(|s| s.to_string()); + let cancelled_inner = cancelled.clone(); + let auth_failed = Arc::new(AtomicBool::new(false)); + let auth_failed_clone = auth_failed.clone(); + + let (token_tx, mut token_rx) = tokio::sync::mpsc::unbounded_channel::(); + let (thinking_tx, mut thinking_rx) = tokio::sync::mpsc::unbounded_channel::(); + let (activity_tx, mut activity_rx) = tokio::sync::mpsc::unbounded_channel::(); + let (msg_tx, msg_rx) = std::sync::mpsc::channel::(); + let (sid_tx, sid_rx) = tokio::sync::oneshot::channel::(); + + let pty_handle = tokio::task::spawn_blocking(move || { + run_pty_session( + &message, + &cwd, + resume_id.as_deref(), + sys_prompt.as_deref(), + cancelled_inner, + auth_failed_clone, + token_tx, + thinking_tx, + activity_tx, + msg_tx, + sid_tx, + ) + }); + + loop { + tokio::select! 
{ + msg = token_rx.recv() => match msg { + Some(t) => on_token(&t), + None => break, + }, + msg = thinking_rx.recv() => if let Some(t) = msg { + on_thinking(&t); + }, + msg = activity_rx.recv() => if let Some(name) = msg { + on_activity(&name); + }, + } + } + + // Drain any remaining activity/thinking messages that were buffered + // when the token channel closed. + while let Ok(t) = thinking_rx.try_recv() { + on_thinking(&t); + } + while let Ok(name) = activity_rx.try_recv() { + on_activity(&name); + } + + pty_handle + .await + .map_err(|e| format!("PTY task panicked: {e}"))??; + + // Check if the PTY session failed due to expired OAuth token. + if auth_failed.load(Ordering::Relaxed) && attempt == 0 { + slog!("[oauth] Authentication failed, attempting token refresh"); + match crate::llm::oauth::refresh_access_token().await { + Ok(()) => { + slog!("[oauth] Token refreshed, retrying request"); + on_token("\n*Refreshing authentication token...*\n"); + continue; + } + Err(_e) => { + let port = crate::http::resolve_port(); + let login_url = format!("http://localhost:{port}/oauth/authorize"); + return Err(format!( + "OAuth session expired or credentials missing. Please log in: {login_url}" + )); + } + } + } + + let captured_session_id = sid_rx.await.ok(); + slog!("[pty-debug] RECEIVED session_id: {:?}", captured_session_id); + let structured_messages: Vec = msg_rx.try_iter().collect(); + + return Ok(ClaudeCodeResult { + messages: structured_messages, + session_id: captured_session_id, + }); + } + + // Should never reach here, but just in case. + Err("Authentication failed after retry".to_string()) + } +} + +/// Run `claude -p` with stream-json output inside a PTY. +/// +/// The PTY makes isatty() return true. The `-p` flag gives us +/// single-shot non-interactive mode with structured output. 
+/// +/// Sends streaming text tokens via `token_tx` for real-time display, and +/// complete structured `Message` values via `msg_tx` for the final message +/// history (assistant turns with tool_calls, and tool result turns). +/// +/// Permission handling is delegated to the MCP `prompt_permission` tool +/// via `--permission-prompt-tool`. Claude Code calls the MCP tool when it +/// needs user approval, and the server bridges the request to the frontend. + +fn run_pty_session( + user_message: &str, + cwd: &str, + resume_session_id: Option<&str>, + _system_prompt: Option<&str>, + cancelled: Arc, + auth_failed: Arc, + token_tx: tokio::sync::mpsc::UnboundedSender, + thinking_tx: tokio::sync::mpsc::UnboundedSender, + activity_tx: tokio::sync::mpsc::UnboundedSender, + msg_tx: std::sync::mpsc::Sender, + sid_tx: tokio::sync::oneshot::Sender, +) -> Result<(), String> { + let pty_system = native_pty_system(); + + let pair = pty_system + .openpty(PtySize { + rows: 50, + cols: 200, + pixel_width: 0, + pixel_height: 0, + }) + .map_err(|e| format!("Failed to open PTY: {e}"))?; + + let mut cmd = CommandBuilder::new("claude"); + cmd.arg("-p"); + cmd.arg(user_message); + if let Some(sid) = resume_session_id { + cmd.arg("--resume"); + cmd.arg(sid); + } + cmd.arg("--output-format"); + cmd.arg("stream-json"); + cmd.arg("--verbose"); + // Enable partial streaming events so we receive stream_event messages + // containing raw API events (content_block_start, content_block_delta, + // etc.). Without this flag, only complete assistant/user/result events + // are emitted and tool-start activity signals never fire. + cmd.arg("--include-partial-messages"); + // Delegate permission decisions to the MCP prompt_permission tool. + // Claude Code will call this tool via the huskies MCP server when + // a tool requires user approval, instead of using PTY stdin/stdout. 
+ cmd.arg("--permission-prompt-tool"); + cmd.arg("mcp__huskies__prompt_permission"); + // Note: --system is not a valid Claude Code CLI flag. System-level + // instructions (like bot name) are prepended to the user prompt instead. + cmd.cwd(cwd); + // Keep TERM reasonable but disable color + cmd.env("NO_COLOR", "1"); + // Allow nested spawning when the server itself runs inside Claude Code + cmd.env("CLAUDECODE", ""); + + slog!( + "[pty-debug] Spawning: claude -p \"{}\" {} --output-format stream-json --verbose --include-partial-messages --permission-prompt-tool mcp__huskies__prompt_permission", + user_message, + resume_session_id + .map(|s| format!("--resume {s}")) + .unwrap_or_default() + ); + + let mut child = pair + .slave + .spawn_command(cmd) + .map_err(|e| format!("Failed to spawn claude: {e}"))?; + + slog!("[pty-debug] Process spawned, pid: {:?}", child.process_id()); + drop(pair.slave); + + let reader = pair + .master + .try_clone_reader() + .map_err(|e| format!("Failed to clone PTY reader: {e}"))?; + + // We no longer need the writer — permission responses flow through MCP, + // not PTY stdin. Drop it so the PTY sees EOF on stdin when appropriate. 
+ drop(pair.master); + + // Read NDJSON lines from stdout + let (line_tx, line_rx) = std::sync::mpsc::channel::>(); + + let reader_handle = std::thread::spawn(move || { + let buf_reader = BufReader::new(reader); + slog!("[pty-debug] Reader thread started"); + for line in buf_reader.lines() { + match line { + Ok(l) => { + slog!("[pty-debug] raw line: {}", l); + if line_tx.send(Some(l)).is_err() { + break; + } + } + Err(e) => { + slog!("[pty-debug] read error: {e}"); + let _ = line_tx.send(None); + break; + } + } + } + slog!("[pty-debug] Reader thread done"); + let _ = line_tx.send(None); + }); + + let mut got_result = false; + let mut sid_tx = Some(sid_tx); + + loop { + if cancelled.load(Ordering::Relaxed) { + let _ = child.kill(); + let _ = child.wait(); + let _ = reader_handle.join(); + return Err("Cancelled".to_string()); + } + + match line_rx.recv_timeout(std::time::Duration::from_millis(500)) { + Ok(Some(line)) => { + let trimmed = line.trim(); + if trimmed.is_empty() { + continue; + } + + let mut end = trimmed.len().min(120); + while !trimmed.is_char_boundary(end) { + end -= 1; + } + slog!("[pty-debug] processing: {}...", &trimmed[..end]); + + // Try to parse as JSON + if let Ok(json) = serde_json::from_str::(trimmed) + && process_json_event( + &json, + &token_tx, + &thinking_tx, + &activity_tx, + &msg_tx, + &mut sid_tx, + &auth_failed, + ) + { + got_result = true; + } + // Ignore non-JSON lines (terminal escape sequences) + + if got_result { + break; + } + } + Ok(None) => break, // EOF + Err(std::sync::mpsc::RecvTimeoutError::Timeout) => { + // Check if child has exited + if let Ok(Some(_status)) = child.try_wait() { + // Drain remaining lines through the same dispatch path + // (process_json_event) so activity signals fire correctly. 
+ while let Ok(Some(line)) = line_rx.try_recv() { + let trimmed = line.trim(); + if let Ok(json) = serde_json::from_str::(trimmed) { + process_json_event( + &json, + &token_tx, + &thinking_tx, + &activity_tx, + &msg_tx, + &mut sid_tx, + &auth_failed, + ); + } + } + break; + } + } + Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => break, + } + + // Don't set got_result here — just let the process finish naturally + let _ = got_result; + } + + // Wait briefly for Claude Code to flush its session transcript to disk. + // The `result` event means the API response is done, but the process + // still needs to write the conversation to the JSONL session file. + match child.try_wait() { + Ok(Some(_)) => {} // Already exited + _ => { + // Give it up to 2 seconds to exit cleanly + for _ in 0..20 { + std::thread::sleep(std::time::Duration::from_millis(100)); + if let Ok(Some(_)) = child.try_wait() { + break; + } + } + // If still running after 2s, kill it + let _ = child.kill(); + let _ = child.wait(); + } + } + // Wait for the reader thread to release the cloned PTY master fd. + let _ = reader_handle.join(); + Ok(()) +} + +/// Dispatch a single parsed JSON event to the appropriate handler. +/// +/// Returns `true` if a `result` event was received (signals session completion). +/// Captures the session ID from the first event that carries it. + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn claude_code_provider_new() { + let _provider = ClaudeCodeProvider::new(); + } +} diff --git a/server/src/llm/providers/claude_code/parse.rs b/server/src/llm/providers/claude_code/parse.rs new file mode 100644 index 00000000..29460782 --- /dev/null +++ b/server/src/llm/providers/claude_code/parse.rs @@ -0,0 +1,332 @@ +//! Claude Code message parsing — extracts text and tool-use info from assistant +//! and user messages. 
+ +use crate::llm::types::{FunctionCall, Message, Role, ToolCall}; + +pub(super) fn parse_assistant_message( + content: &[serde_json::Value], + msg_tx: &std::sync::mpsc::Sender, +) { + let mut text = String::new(); + let mut tool_calls: Vec = Vec::new(); + + for block in content { + match block.get("type").and_then(|t| t.as_str()) { + Some("text") => { + if let Some(t) = block.get("text").and_then(|t| t.as_str()) { + text.push_str(t); + } + } + Some("tool_use") => { + let id = block + .get("id") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()); + let name = block + .get("name") + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + let input = block + .get("input") + .cloned() + .unwrap_or(serde_json::Value::Object(serde_json::Map::new())); + let arguments = serde_json::to_string(&input).unwrap_or_default(); + tool_calls.push(ToolCall { + id, + function: FunctionCall { name, arguments }, + kind: "function".to_string(), + }); + } + _ => {} + } + } + + let msg = Message { + role: Role::Assistant, + content: text, + tool_calls: if tool_calls.is_empty() { + None + } else { + Some(tool_calls) + }, + tool_call_id: None, + }; + let _ = msg_tx.send(msg); +} + +/// Parse a `user` message containing tool_result blocks. 
+/// +/// Claude Code injects tool results into the conversation as `user` role + +pub(super) fn parse_tool_results(content: &[serde_json::Value], msg_tx: &std::sync::mpsc::Sender) { + for block in content { + if block.get("type").and_then(|t| t.as_str()) != Some("tool_result") { + continue; + } + + let tool_use_id = block + .get("tool_use_id") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()); + + // `content` in a tool_result can be a plain string or an array of content blocks + let content_str = match block.get("content") { + Some(serde_json::Value::String(s)) => s.clone(), + Some(serde_json::Value::Array(arr)) => { + // Extract text from content block array + arr.iter() + .filter_map(|b| { + if b.get("type").and_then(|t| t.as_str()) == Some("text") { + b.get("text") + .and_then(|t| t.as_str()) + .map(|s| s.to_string()) + } else { + None + } + }) + .collect::>() + .join("\n") + } + Some(other) => serde_json::to_string(other).unwrap_or_default(), + None => String::new(), + }; + + let _ = msg_tx.send(Message { + role: Role::Tool, + content: content_str, + tool_calls: None, + tool_call_id: tool_use_id, + }); + } +} + +/// Extract text from a stream event and send to the token channel for live display. +/// +/// Stream events provide incremental text deltas for real-time rendering. 
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Run `f` with a fresh channel sender and return every message it sent.
    fn collect_messages(f: impl Fn(&std::sync::mpsc::Sender<Message>)) -> Vec<Message> {
        let (tx, rx) = std::sync::mpsc::channel();
        f(&tx);
        drop(tx);
        rx.try_iter().collect()
    }

    #[test]
    fn parse_assistant_message_text_only() {
        let content = vec![json!({"type": "text", "text": "Hello, world!"})];
        let msgs = collect_messages(|tx| parse_assistant_message(&content, tx));
        assert_eq!(msgs.len(), 1);
        assert_eq!(msgs[0].role, Role::Assistant);
        assert_eq!(msgs[0].content, "Hello, world!");
        assert!(msgs[0].tool_calls.is_none());
    }

    #[test]
    fn parse_assistant_message_with_tool_use() {
        let content = vec![
            json!({"type": "text", "text": "I'll read that file."}),
            json!({
                "type": "tool_use",
                "id": "toolu_abc123",
                "name": "Read",
                "input": {"file_path": "/src/main.rs"}
            }),
        ];
        let msgs = collect_messages(|tx| parse_assistant_message(&content, tx));
        assert_eq!(msgs.len(), 1);
        let msg = &msgs[0];
        assert_eq!(msg.role, Role::Assistant);
        assert_eq!(msg.content, "I'll read that file.");
        let tool_calls = msg.tool_calls.as_ref().expect("should have tool calls");
        assert_eq!(tool_calls.len(), 1);
        assert_eq!(tool_calls[0].id.as_deref(), Some("toolu_abc123"));
        assert_eq!(tool_calls[0].function.name, "Read");
        let args: serde_json::Value =
            serde_json::from_str(&tool_calls[0].function.arguments).unwrap();
        assert_eq!(args["file_path"], "/src/main.rs");
    }

    #[test]
    fn parse_assistant_message_multiple_tool_uses() {
        let content = vec![
            json!({"type": "tool_use", "id": "id1", "name": "Glob", "input": {"pattern": "*.rs"}}),
            json!({"type": "tool_use", "id": "id2", "name": "Bash", "input": {"command": "cargo test"}}),
        ];
        let msgs = collect_messages(|tx| parse_assistant_message(&content, tx));
        assert_eq!(msgs.len(), 1);
        let tool_calls = msgs[0].tool_calls.as_ref().unwrap();
        assert_eq!(tool_calls.len(), 2);
        assert_eq!(tool_calls[0].function.name, "Glob");
        assert_eq!(tool_calls[1].function.name, "Bash");
    }

    #[test]
    fn parse_tool_results_string_content() {
        let content = vec![json!({
            "type": "tool_result",
            "tool_use_id": "toolu_abc123",
            "content": "fn main() { println!(\"hello\"); }"
        })];
        let msgs = collect_messages(|tx| parse_tool_results(&content, tx));
        assert_eq!(msgs.len(), 1);
        assert_eq!(msgs[0].role, Role::Tool);
        assert_eq!(msgs[0].content, "fn main() { println!(\"hello\"); }");
        assert_eq!(msgs[0].tool_call_id.as_deref(), Some("toolu_abc123"));
    }

    #[test]
    fn parse_tool_results_array_content() {
        let content = vec![json!({
            "type": "tool_result",
            "tool_use_id": "toolu_xyz",
            "content": [
                {"type": "text", "text": "Line 1"},
                {"type": "text", "text": "Line 2"}
            ]
        })];
        let msgs = collect_messages(|tx| parse_tool_results(&content, tx));
        assert_eq!(msgs.len(), 1);
        assert_eq!(msgs[0].content, "Line 1\nLine 2");
        assert_eq!(msgs[0].tool_call_id.as_deref(), Some("toolu_xyz"));
    }

    #[test]
    fn parse_tool_results_multiple_results() {
        let content = vec![
            json!({"type": "tool_result", "tool_use_id": "id1", "content": "result1"}),
            json!({"type": "tool_result", "tool_use_id": "id2", "content": "result2"}),
        ];
        let msgs = collect_messages(|tx| parse_tool_results(&content, tx));
        assert_eq!(msgs.len(), 2);
        assert_eq!(msgs[0].tool_call_id.as_deref(), Some("id1"));
        assert_eq!(msgs[1].tool_call_id.as_deref(), Some("id2"));
    }

    #[test]
    fn parse_tool_results_ignores_non_tool_result_blocks() {
        let content = vec![
            json!({"type": "text", "text": "not a tool result"}),
            json!({"type": "tool_result", "tool_use_id": "id1", "content": "actual result"}),
        ];
        let msgs = collect_messages(|tx| parse_tool_results(&content, tx));
        assert_eq!(msgs.len(), 1);
        assert_eq!(msgs[0].tool_call_id.as_deref(), Some("id1"));
    }

    #[test]
    fn parse_assistant_message_empty_text_with_tool_use() {
        // When a message has only tool_use (no text), content should be empty string
        let content = vec![json!({
            "type": "tool_use",
            "id": "toolu_1",
            "name": "Write",
            "input": {"file_path": "foo.rs", "content": "fn foo() {}"}
        })];
        let msgs = collect_messages(|tx| parse_assistant_message(&content, tx));
        assert_eq!(msgs.len(), 1);
        assert_eq!(msgs[0].content, "");
        assert!(msgs[0].tool_calls.is_some());
    }

    #[test]
    fn parse_tool_results_no_tool_use_id() {
        let content = vec![json!({"type": "tool_result", "content": "output"})];
        let msgs = collect_messages(|tx| parse_tool_results(&content, tx));
        assert_eq!(msgs.len(), 1);
        assert!(msgs[0].tool_call_id.is_none());
        assert_eq!(msgs[0].content, "output");
    }

    #[test]
    fn parse_tool_results_null_content() {
        let content = vec![json!({"type": "tool_result", "tool_use_id": "id1"})];
        let msgs = collect_messages(|tx| parse_tool_results(&content, tx));
        assert_eq!(msgs.len(), 1);
        assert_eq!(msgs[0].content, "");
    }

    #[test]
    fn parse_tool_results_other_json_content() {
        let content = vec![json!({
            "type": "tool_result",
            "tool_use_id": "id1",
            "content": {"nested": "object"}
        })];
        let msgs = collect_messages(|tx| parse_tool_results(&content, tx));
        assert_eq!(msgs.len(), 1);
        // Falls through to serde_json::to_string, so the content must parse
        // back to the original JSON value.
        let round_trip: serde_json::Value = serde_json::from_str(&msgs[0].content).unwrap();
        assert_eq!(round_trip, json!({"nested": "object"}));
    }

    #[test]
    fn parse_assistant_message_empty_content_array() {
        let content: Vec<serde_json::Value> = vec![];
        let msgs = collect_messages(|tx| parse_assistant_message(&content, tx));
        assert_eq!(msgs.len(), 1);
        assert_eq!(msgs[0].content, "");
        assert!(msgs[0].tool_calls.is_none());
    }

    #[test]
    fn parse_assistant_message_unknown_block_type() {
        let content = vec![
            json!({"type": "image", "source": {"type": "base64"}}),
            json!({"type": "text", "text": "done"}),
        ];
        let msgs = collect_messages(|tx| parse_assistant_message(&content, tx));
        assert_eq!(msgs.len(), 1);
        assert_eq!(msgs[0].content, "done");
    }

    #[test]
    fn parse_assistant_message_tool_use_without_id() {
        let content = vec![json!({
            "type": "tool_use",
            "name": "Bash",
            "input": {"command": "ls"}
        })];
        let msgs = collect_messages(|tx| parse_assistant_message(&content, tx));
        assert_eq!(msgs.len(), 1);
        let calls = msgs[0].tool_calls.as_ref().unwrap();
        assert_eq!(calls.len(), 1);
        assert!(calls[0].id.is_none());
        assert_eq!(calls[0].function.name, "Bash");
    }

    #[test]
    fn parse_assistant_message_tool_use_without_input_defaults_to_empty_object() {
        let content = vec![json!({"type": "tool_use", "id": "tid", "name": "Read"})];
        let msgs = collect_messages(|tx| parse_assistant_message(&content, tx));
        let calls = msgs[0].tool_calls.as_ref().unwrap();
        let args: serde_json::Value = serde_json::from_str(&calls[0].function.arguments).unwrap();
        assert!(args.is_object());
        assert!(args.as_object().unwrap().is_empty());
    }
}