use portable_pty::{CommandBuilder, PtySize, native_pty_system};
use std::io::{BufRead, BufReader};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::sync::watch;

use crate::llm::types::{FunctionCall, Message, Role, ToolCall};

/// Result from a Claude Code session containing structured messages.
pub struct ClaudeCodeResult {
    /// The conversation messages produced by Claude Code, including assistant
    /// turns (with optional tool_calls) and tool result turns.
    pub messages: Vec<Message>,
    /// Session ID for conversation resumption on subsequent requests.
    pub session_id: Option<String>,
}

/// Manages a Claude Code session via a pseudo-terminal.
///
/// Spawns `claude -p` in a PTY so isatty() returns true (which may
/// influence billing), while using `--output-format stream-json` to
/// get clean, structured NDJSON output instead of TUI escape sequences.
///
/// Supports session resumption: if a `session_id` is provided, passes
/// `--resume <session_id>` so Claude Code loads the prior conversation
/// transcript from disk and continues with full context.
pub struct ClaudeCodeProvider;

impl ClaudeCodeProvider {
    pub fn new() -> Self {
        Self
    }

    /// Run one chat turn through Claude Code.
    ///
    /// Streams live text tokens to `on_token` as they arrive, and returns the
    /// structured messages plus the session ID once the session completes.
    /// Flipping `cancel_rx` to `true` aborts the underlying PTY session.
    pub async fn chat_stream<F>(
        &self,
        user_message: &str,
        project_root: &str,
        session_id: Option<&str>,
        cancel_rx: &mut watch::Receiver<bool>,
        mut on_token: F,
    ) -> Result<ClaudeCodeResult, String>
    where
        F: FnMut(&str) + Send,
    {
        // Own the inputs so they can move into the blocking PTY task.
        let message = user_message.to_string();
        let cwd = project_root.to_string();
        let resume_id = session_id.map(|s| s.to_string());

        // Bridge the async cancellation watch into an atomic flag the
        // blocking PTY thread can poll.
        let cancelled = Arc::new(AtomicBool::new(false));
        let cancelled_clone = cancelled.clone();
        let mut cancel_watch = cancel_rx.clone();
        tokio::spawn(async move {
            while cancel_watch.changed().await.is_ok() {
                if *cancel_watch.borrow() {
                    cancelled_clone.store(true, Ordering::Relaxed);
                    break;
                }
            }
        });

        // token_tx: live text deltas; msg_tx: final structured messages;
        // sid_tx: the session_id captured from the stream (at most once).
        let (token_tx, mut token_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
        let (msg_tx, msg_rx) = std::sync::mpsc::channel::<Message>();
        let (sid_tx, sid_rx) = tokio::sync::oneshot::channel::<String>();

        let pty_handle = tokio::task::spawn_blocking(move || {
            run_pty_session(
                &message,
                &cwd,
                resume_id.as_deref(),
                cancelled,
                token_tx,
                msg_tx,
                sid_tx,
            )
        });

        // Forward live tokens until the PTY task drops its sender.
        while let Some(token) = token_rx.recv().await {
            on_token(&token);
        }

        pty_handle
            .await
            .map_err(|e| format!("PTY task panicked: {e}"))??;

        let captured_session_id = sid_rx.await.ok();
        let structured_messages: Vec<Message> = msg_rx.try_iter().collect();

        Ok(ClaudeCodeResult {
            messages: structured_messages,
            session_id: captured_session_id,
        })
    }
}

/// Run `claude -p` with stream-json output inside a PTY.
///
/// The PTY makes isatty() return true. The `-p` flag gives us
/// single-shot non-interactive mode with structured output.
///
/// Sends streaming text tokens via `token_tx` for real-time display, and
/// complete structured `Message` values via `msg_tx` for the final message
/// history (assistant turns with tool_calls, and tool result turns).
fn run_pty_session( user_message: &str, cwd: &str, resume_session_id: Option<&str>, cancelled: Arc, token_tx: tokio::sync::mpsc::UnboundedSender, msg_tx: std::sync::mpsc::Sender, sid_tx: tokio::sync::oneshot::Sender, ) -> Result<(), String> { let pty_system = native_pty_system(); let pair = pty_system .openpty(PtySize { rows: 50, cols: 200, pixel_width: 0, pixel_height: 0, }) .map_err(|e| format!("Failed to open PTY: {e}"))?; let mut cmd = CommandBuilder::new("claude"); cmd.arg("-p"); cmd.arg(user_message); if let Some(sid) = resume_session_id { cmd.arg("--resume"); cmd.arg(sid); } cmd.arg("--output-format"); cmd.arg("stream-json"); cmd.arg("--verbose"); cmd.cwd(cwd); // Keep TERM reasonable but disable color cmd.env("NO_COLOR", "1"); // Allow nested spawning when the server itself runs inside Claude Code cmd.env("CLAUDECODE", ""); eprintln!( "[pty-debug] Spawning: claude -p \"{}\" {} --output-format stream-json --verbose", user_message, resume_session_id .map(|s| format!("--resume {s}")) .unwrap_or_default() ); let mut child = pair .slave .spawn_command(cmd) .map_err(|e| format!("Failed to spawn claude: {e}"))?; eprintln!( "[pty-debug] Process spawned, pid: {:?}", child.process_id() ); drop(pair.slave); let reader = pair .master .try_clone_reader() .map_err(|e| format!("Failed to clone PTY reader: {e}"))?; // We don't need to write anything — -p mode takes prompt as arg drop(pair.master); // Read NDJSON lines from stdout let (line_tx, line_rx) = std::sync::mpsc::channel::>(); std::thread::spawn(move || { let buf_reader = BufReader::new(reader); eprintln!("[pty-debug] Reader thread started"); for line in buf_reader.lines() { match line { Ok(l) => { eprintln!("[pty-debug] raw line: {}", l); if line_tx.send(Some(l)).is_err() { break; } } Err(e) => { eprintln!("[pty-debug] read error: {e}"); let _ = line_tx.send(None); break; } } } eprintln!("[pty-debug] Reader thread done"); let _ = line_tx.send(None); }); let mut got_result = false; let mut sid_tx = Some(sid_tx); 
loop { if cancelled.load(Ordering::Relaxed) { let _ = child.kill(); return Err("Cancelled".to_string()); } match line_rx.recv_timeout(std::time::Duration::from_millis(500)) { Ok(Some(line)) => { let trimmed = line.trim(); if trimmed.is_empty() { continue; } eprintln!( "[pty-debug] processing: {}...", &trimmed[..trimmed.len().min(120)] ); // Try to parse as JSON if let Ok(json) = serde_json::from_str::(trimmed) && let Some(event_type) = json.get("type").and_then(|t| t.as_str()) { // Capture session_id from any event that has it if let Some(tx) = sid_tx.take() { if let Some(sid) = json.get("session_id").and_then(|s| s.as_str()) { let _ = tx.send(sid.to_string()); } else { // Put it back if this event didn't have a session_id sid_tx = Some(tx); } } match event_type { // Streaming deltas — used for real-time text display only "stream_event" => { if let Some(event) = json.get("event") { handle_stream_event(event, &token_tx); } } // Complete assistant message — extract text and tool_use blocks "assistant" => { if let Some(message) = json.get("message") && let Some(content) = message.get("content").and_then(|c| c.as_array()) { parse_assistant_message(content, &msg_tx); } } // User message containing tool results from Claude Code's own execution "user" => { if let Some(message) = json.get("message") && let Some(content) = message.get("content").and_then(|c| c.as_array()) { parse_tool_results(content, &msg_tx); } } // Final result with usage stats "result" => { got_result = true; } // System init — log billing info via streaming display "system" => { let api_source = json .get("apiKeySource") .and_then(|s| s.as_str()) .unwrap_or("unknown"); let model = json .get("model") .and_then(|s| s.as_str()) .unwrap_or("unknown"); let _ = token_tx .send(format!("_[{model} | apiKey: {api_source}]_\n\n")); } // Rate limit info — surface briefly in streaming display "rate_limit_event" => { if let Some(info) = json.get("rate_limit_info") { let status = info .get("status") .and_then(|s| 
s.as_str()) .unwrap_or("unknown"); let limit_type = info .get("rateLimitType") .and_then(|s| s.as_str()) .unwrap_or("unknown"); let _ = token_tx.send(format!( "_[rate limit: {status} ({limit_type})]_\n\n" )); } } _ => {} } } // Ignore non-JSON lines (terminal escape sequences) if got_result { break; } } Ok(None) => break, // EOF Err(std::sync::mpsc::RecvTimeoutError::Timeout) => { // Check if child has exited if let Ok(Some(_status)) = child.try_wait() { // Drain remaining lines while let Ok(Some(line)) = line_rx.try_recv() { let trimmed = line.trim(); if let Ok(json) = serde_json::from_str::(trimmed) && let Some(event_type) = json.get("type").and_then(|t| t.as_str()) { match event_type { "stream_event" => { if let Some(event) = json.get("event") { handle_stream_event(event, &token_tx); } } "assistant" => { if let Some(message) = json.get("message") && let Some(content) = message .get("content") .and_then(|c| c.as_array()) { parse_assistant_message(content, &msg_tx); } } "user" => { if let Some(message) = json.get("message") && let Some(content) = message .get("content") .and_then(|c| c.as_array()) { parse_tool_results(content, &msg_tx); } } _ => {} } } } break; } } Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => break, } // Don't set got_result here — just let the process finish naturally let _ = got_result; } // Wait briefly for Claude Code to flush its session transcript to disk. // The `result` event means the API response is done, but the process // still needs to write the conversation to the JSONL session file. match child.try_wait() { Ok(Some(_)) => {} // Already exited _ => { // Give it up to 2 seconds to exit cleanly for _ in 0..20 { std::thread::sleep(std::time::Duration::from_millis(100)); if let Ok(Some(_)) = child.try_wait() { break; } } // If still running after 2s, kill it let _ = child.kill(); } } Ok(()) } /// Parse a complete `assistant` message content array. 
///
/// Extracts text blocks into `content` and tool_use blocks into `tool_calls`,
/// then sends a single `Message { role: Assistant }` via `msg_tx`.
/// This is the authoritative source for the final message structure — streaming
/// text deltas (via `handle_stream_event`) are only used for the live display.
fn parse_assistant_message(
    content: &[serde_json::Value],
    msg_tx: &std::sync::mpsc::Sender<Message>,
) {
    let mut text = String::new();
    let mut tool_calls: Vec<ToolCall> = Vec::new();

    for block in content {
        match block.get("type").and_then(|t| t.as_str()) {
            Some("text") => {
                // Concatenate all text blocks into a single content string.
                if let Some(t) = block.get("text").and_then(|t| t.as_str()) {
                    text.push_str(t);
                }
            }
            Some("tool_use") => {
                let id = block
                    .get("id")
                    .and_then(|v| v.as_str())
                    .map(|s| s.to_string());
                let name = block
                    .get("name")
                    .and_then(|v| v.as_str())
                    .unwrap_or("")
                    .to_string();
                // Tool arguments are kept as a JSON string, matching the
                // OpenAI-style FunctionCall shape.
                let input = block
                    .get("input")
                    .cloned()
                    .unwrap_or(serde_json::Value::Object(serde_json::Map::new()));
                let arguments = serde_json::to_string(&input).unwrap_or_default();
                tool_calls.push(ToolCall {
                    id,
                    function: FunctionCall { name, arguments },
                    kind: "function".to_string(),
                });
            }
            _ => {}
        }
    }

    let msg = Message {
        role: Role::Assistant,
        content: text,
        tool_calls: if tool_calls.is_empty() {
            None
        } else {
            Some(tool_calls)
        },
        tool_call_id: None,
    };
    let _ = msg_tx.send(msg);
}

/// Parse a `user` message containing tool_result blocks.
///
/// Claude Code injects tool results into the conversation as `user` role
/// messages. Each `tool_result` block becomes a separate `Message { role: Tool }`.
fn parse_tool_results( content: &[serde_json::Value], msg_tx: &std::sync::mpsc::Sender, ) { for block in content { if block.get("type").and_then(|t| t.as_str()) != Some("tool_result") { continue; } let tool_use_id = block .get("tool_use_id") .and_then(|v| v.as_str()) .map(|s| s.to_string()); // `content` in a tool_result can be a plain string or an array of content blocks let content_str = match block.get("content") { Some(serde_json::Value::String(s)) => s.clone(), Some(serde_json::Value::Array(arr)) => { // Extract text from content block array arr.iter() .filter_map(|b| { if b.get("type").and_then(|t| t.as_str()) == Some("text") { b.get("text").and_then(|t| t.as_str()).map(|s| s.to_string()) } else { None } }) .collect::>() .join("\n") } Some(other) => serde_json::to_string(other).unwrap_or_default(), None => String::new(), }; let _ = msg_tx.send(Message { role: Role::Tool, content: content_str, tool_calls: None, tool_call_id: tool_use_id, }); } } /// Extract text from a stream event and send to the token channel for live display. /// /// Stream events provide incremental text deltas for real-time rendering. /// The authoritative final message content comes from complete `assistant` events. 
fn handle_stream_event( event: &serde_json::Value, token_tx: &tokio::sync::mpsc::UnboundedSender, ) { let event_type = event.get("type").and_then(|t| t.as_str()).unwrap_or(""); match event_type { // Text content streaming — only text_delta, not input_json_delta (tool args) "content_block_delta" => { if let Some(delta) = event.get("delta") { let delta_type = delta.get("type").and_then(|t| t.as_str()).unwrap_or(""); match delta_type { "text_delta" => { if let Some(text) = delta.get("text").and_then(|t| t.as_str()) { let _ = token_tx.send(text.to_string()); } } "thinking_delta" => { if let Some(thinking) = delta.get("thinking").and_then(|t| t.as_str()) { let _ = token_tx.send(format!("[thinking] {thinking}")); } } _ => {} } } } // Log errors via streaming display "error" => { if let Some(error) = event.get("error") { let msg = error .get("message") .and_then(|m| m.as_str()) .unwrap_or("unknown error"); let _ = token_tx.send(format!("\n[error: {msg}]\n")); } } _ => {} } } #[cfg(test)] mod tests { use super::*; use serde_json::json; fn collect_messages( f: impl Fn(&std::sync::mpsc::Sender), ) -> Vec { let (tx, rx) = std::sync::mpsc::channel(); f(&tx); drop(tx); rx.try_iter().collect() } #[test] fn parse_assistant_message_text_only() { let content = vec![json!({"type": "text", "text": "Hello, world!"})]; let msgs = collect_messages(|tx| parse_assistant_message(&content, tx)); assert_eq!(msgs.len(), 1); assert_eq!(msgs[0].role, Role::Assistant); assert_eq!(msgs[0].content, "Hello, world!"); assert!(msgs[0].tool_calls.is_none()); } #[test] fn parse_assistant_message_with_tool_use() { let content = vec![ json!({"type": "text", "text": "I'll read that file."}), json!({ "type": "tool_use", "id": "toolu_abc123", "name": "Read", "input": {"file_path": "/src/main.rs"} }), ]; let msgs = collect_messages(|tx| parse_assistant_message(&content, tx)); assert_eq!(msgs.len(), 1); let msg = &msgs[0]; assert_eq!(msg.role, Role::Assistant); assert_eq!(msg.content, "I'll read that file."); 
let tool_calls = msg.tool_calls.as_ref().expect("should have tool calls"); assert_eq!(tool_calls.len(), 1); assert_eq!(tool_calls[0].id.as_deref(), Some("toolu_abc123")); assert_eq!(tool_calls[0].function.name, "Read"); let args: serde_json::Value = serde_json::from_str(&tool_calls[0].function.arguments).unwrap(); assert_eq!(args["file_path"], "/src/main.rs"); } #[test] fn parse_assistant_message_multiple_tool_uses() { let content = vec![ json!({"type": "tool_use", "id": "id1", "name": "Glob", "input": {"pattern": "*.rs"}}), json!({"type": "tool_use", "id": "id2", "name": "Bash", "input": {"command": "cargo test"}}), ]; let msgs = collect_messages(|tx| parse_assistant_message(&content, tx)); assert_eq!(msgs.len(), 1); let tool_calls = msgs[0].tool_calls.as_ref().unwrap(); assert_eq!(tool_calls.len(), 2); assert_eq!(tool_calls[0].function.name, "Glob"); assert_eq!(tool_calls[1].function.name, "Bash"); } #[test] fn parse_tool_results_string_content() { let content = vec![json!({ "type": "tool_result", "tool_use_id": "toolu_abc123", "content": "fn main() { println!(\"hello\"); }" })]; let msgs = collect_messages(|tx| parse_tool_results(&content, tx)); assert_eq!(msgs.len(), 1); assert_eq!(msgs[0].role, Role::Tool); assert_eq!(msgs[0].content, "fn main() { println!(\"hello\"); }"); assert_eq!(msgs[0].tool_call_id.as_deref(), Some("toolu_abc123")); } #[test] fn parse_tool_results_array_content() { let content = vec![json!({ "type": "tool_result", "tool_use_id": "toolu_xyz", "content": [ {"type": "text", "text": "Line 1"}, {"type": "text", "text": "Line 2"} ] })]; let msgs = collect_messages(|tx| parse_tool_results(&content, tx)); assert_eq!(msgs.len(), 1); assert_eq!(msgs[0].content, "Line 1\nLine 2"); assert_eq!(msgs[0].tool_call_id.as_deref(), Some("toolu_xyz")); } #[test] fn parse_tool_results_multiple_results() { let content = vec![ json!({"type": "tool_result", "tool_use_id": "id1", "content": "result1"}), json!({"type": "tool_result", "tool_use_id": "id2", 
"content": "result2"}), ]; let msgs = collect_messages(|tx| parse_tool_results(&content, tx)); assert_eq!(msgs.len(), 2); assert_eq!(msgs[0].tool_call_id.as_deref(), Some("id1")); assert_eq!(msgs[1].tool_call_id.as_deref(), Some("id2")); } #[test] fn parse_tool_results_ignores_non_tool_result_blocks() { let content = vec![ json!({"type": "text", "text": "not a tool result"}), json!({"type": "tool_result", "tool_use_id": "id1", "content": "actual result"}), ]; let msgs = collect_messages(|tx| parse_tool_results(&content, tx)); assert_eq!(msgs.len(), 1); assert_eq!(msgs[0].tool_call_id.as_deref(), Some("id1")); } #[test] fn parse_assistant_message_empty_text_with_tool_use() { // When a message has only tool_use (no text), content should be empty string let content = vec![json!({ "type": "tool_use", "id": "toolu_1", "name": "Write", "input": {"file_path": "foo.rs", "content": "fn foo() {}"} })]; let msgs = collect_messages(|tx| parse_assistant_message(&content, tx)); assert_eq!(msgs.len(), 1); assert_eq!(msgs[0].content, ""); assert!(msgs[0].tool_calls.is_some()); } #[test] fn handle_stream_event_text_delta_sends_token() { let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); let event = json!({ "type": "content_block_delta", "delta": {"type": "text_delta", "text": "hello "} }); handle_stream_event(&event, &tx); drop(tx); let tokens: Vec<_> = { let mut v = vec![]; while let Ok(t) = rx.try_recv() { v.push(t); } v }; assert_eq!(tokens, vec!["hello "]); } #[test] fn handle_stream_event_input_json_delta_not_sent() { // Tool argument JSON deltas should NOT be sent as text tokens let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); let event = json!({ "type": "content_block_delta", "delta": {"type": "input_json_delta", "partial_json": "{\"path\":"} }); handle_stream_event(&event, &tx); drop(tx); let tokens: Vec = { let mut v = vec![]; while let Ok(t) = rx.try_recv() { v.push(t); } v }; assert!(tokens.is_empty()); } }