//! Claude Code provider — runs Claude Code CLI in a PTY and parses structured output. #![allow(unused_imports, dead_code)] use crate::slog; use portable_pty::{CommandBuilder, PtySize, native_pty_system}; use std::io::{BufRead, BufReader}; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use tokio::sync::watch; use crate::llm::types::{FunctionCall, Message, Role, ToolCall}; /// Result from a Claude Code session containing structured messages. pub struct ClaudeCodeResult { /// The conversation messages produced by Claude Code, including assistant /// turns (with optional tool_calls) and tool result turns. pub messages: Vec, /// Session ID for conversation resumption on subsequent requests. pub session_id: Option, } /// Manages a Claude Code session via a pseudo-terminal. /// /// Spawns `claude -p` in a PTY so isatty() returns true (which may /// influence billing), while using `--output-format stream-json` to /// get clean, structured NDJSON output instead of TUI escape sequences. /// /// Supports session resumption: if a `session_id` is provided, passes /// `--resume ` so Claude Code loads the prior conversation transcript /// from disk and continues with full context. /// /// Permissions are delegated to the MCP `prompt_permission` tool via /// `--permission-prompt-tool`, so Claude Code calls back into the server /// when a tool requires user approval. The frontend dialog handles the UX. mod events; mod parse; use events::{handle_stream_event, process_json_event}; pub struct ClaudeCodeProvider; impl ClaudeCodeProvider { pub fn new() -> Self { Self } #[allow(clippy::too_many_arguments)] pub async fn chat_stream( &self, user_message: &str, project_root: &str, session_id: Option<&str>, system_prompt: Option<&str>, cancel_rx: &mut watch::Receiver, mut on_token: F, mut on_thinking: T, mut on_activity: A, ) -> Result where F: FnMut(&str) + Send, T: FnMut(&str) + Send, A: FnMut(&str) + Send, { let cancelled = Arc::new(AtomicBool::new(false)); let cancelled_clone = cancelled.clone(); let mut cancel_watch = cancel_rx.clone(); tokio::spawn(async move { while cancel_watch.changed().await.is_ok() { if *cancel_watch.borrow() { cancelled_clone.store(true, Ordering::Relaxed); break; } } }); // Attempt up to 2 times: first try, then retry after OAuth refresh. for attempt in 0..2 { let message = user_message.to_string(); let cwd = project_root.to_string(); let resume_id = session_id.map(|s| s.to_string()); let sys_prompt = system_prompt.map(|s| s.to_string()); let cancelled_inner = cancelled.clone(); let auth_failed = Arc::new(AtomicBool::new(false)); let auth_failed_clone = auth_failed.clone(); let (token_tx, mut token_rx) = tokio::sync::mpsc::unbounded_channel::(); let (thinking_tx, mut thinking_rx) = tokio::sync::mpsc::unbounded_channel::(); let (activity_tx, mut activity_rx) = tokio::sync::mpsc::unbounded_channel::(); let (msg_tx, msg_rx) = std::sync::mpsc::channel::(); let (sid_tx, sid_rx) = tokio::sync::oneshot::channel::(); let pty_handle = tokio::task::spawn_blocking(move || { run_pty_session( &message, &cwd, resume_id.as_deref(), sys_prompt.as_deref(), cancelled_inner, auth_failed_clone, token_tx, thinking_tx, activity_tx, msg_tx, sid_tx, ) }); loop { tokio::select! { msg = token_rx.recv() => match msg { Some(t) => on_token(&t), None => break, }, msg = thinking_rx.recv() => if let Some(t) = msg { on_thinking(&t); }, msg = activity_rx.recv() => if let Some(name) = msg { on_activity(&name); }, } } // Drain any remaining activity/thinking messages that were buffered // when the token channel closed. while let Ok(t) = thinking_rx.try_recv() { on_thinking(&t); } while let Ok(name) = activity_rx.try_recv() { on_activity(&name); } pty_handle .await .map_err(|e| format!("PTY task panicked: {e}"))??; // Check if the PTY session failed due to expired OAuth token. if auth_failed.load(Ordering::Relaxed) && attempt == 0 { slog!("[oauth] Authentication failed, attempting token refresh"); match crate::llm::oauth::refresh_access_token().await { Ok(()) => { slog!("[oauth] Token refreshed, retrying request"); on_token("\n*Refreshing authentication token...*\n"); continue; } Err(_e) => { let port = crate::http::resolve_port(); let login_url = format!("http://localhost:{port}/oauth/authorize"); return Err(format!( "OAuth session expired or credentials missing. Please log in: {login_url}" )); } } } let captured_session_id = sid_rx.await.ok(); slog!("[pty-debug] RECEIVED session_id: {:?}", captured_session_id); let structured_messages: Vec = msg_rx.try_iter().collect(); return Ok(ClaudeCodeResult { messages: structured_messages, session_id: captured_session_id, }); } // Should never reach here, but just in case. Err("Authentication failed after retry".to_string()) } } /// Run `claude -p` with stream-json output inside a PTY. /// /// The PTY makes isatty() return true. The `-p` flag gives us /// single-shot non-interactive mode with structured output. /// /// Sends streaming text tokens via `token_tx` for real-time display, and /// complete structured `Message` values via `msg_tx` for the final message /// history (assistant turns with tool_calls, and tool result turns). /// /// Permission handling is delegated to the MCP `prompt_permission` tool /// via `--permission-prompt-tool`. Claude Code calls the MCP tool when it /// needs user approval, and the server bridges the request to the frontend. #[allow(clippy::too_many_arguments)] fn run_pty_session( user_message: &str, cwd: &str, resume_session_id: Option<&str>, _system_prompt: Option<&str>, cancelled: Arc, auth_failed: Arc, token_tx: tokio::sync::mpsc::UnboundedSender, thinking_tx: tokio::sync::mpsc::UnboundedSender, activity_tx: tokio::sync::mpsc::UnboundedSender, msg_tx: std::sync::mpsc::Sender, sid_tx: tokio::sync::oneshot::Sender, ) -> Result<(), String> { let pty_system = native_pty_system(); let pair = pty_system .openpty(PtySize { rows: 50, cols: 200, pixel_width: 0, pixel_height: 0, }) .map_err(|e| format!("Failed to open PTY: {e}"))?; let mut cmd = CommandBuilder::new("claude"); cmd.arg("-p"); cmd.arg(user_message); if let Some(sid) = resume_session_id { cmd.arg("--resume"); cmd.arg(sid); } cmd.arg("--output-format"); cmd.arg("stream-json"); cmd.arg("--verbose"); // Enable partial streaming events so we receive stream_event messages // containing raw API events (content_block_start, content_block_delta, // etc.). Without this flag, only complete assistant/user/result events // are emitted and tool-start activity signals never fire. cmd.arg("--include-partial-messages"); // Delegate permission decisions to the MCP prompt_permission tool. // Claude Code will call this tool via the huskies MCP server when // a tool requires user approval, instead of using PTY stdin/stdout. cmd.arg("--permission-prompt-tool"); cmd.arg("mcp__huskies__prompt_permission"); // Note: --system is not a valid Claude Code CLI flag. System-level // instructions (like bot name) are prepended to the user prompt instead. cmd.cwd(cwd); // Keep TERM reasonable but disable color cmd.env("NO_COLOR", "1"); // Allow nested spawning when the server itself runs inside Claude Code cmd.env("CLAUDECODE", ""); slog!( "[pty-debug] Spawning: claude -p \"{}\" {} --output-format stream-json --verbose --include-partial-messages --permission-prompt-tool mcp__huskies__prompt_permission", user_message, resume_session_id .map(|s| format!("--resume {s}")) .unwrap_or_default() ); let mut child = pair .slave .spawn_command(cmd) .map_err(|e| format!("Failed to spawn claude: {e}"))?; slog!("[pty-debug] Process spawned, pid: {:?}", child.process_id()); drop(pair.slave); let reader = pair .master .try_clone_reader() .map_err(|e| format!("Failed to clone PTY reader: {e}"))?; // We no longer need the writer — permission responses flow through MCP, // not PTY stdin. Drop it so the PTY sees EOF on stdin when appropriate. drop(pair.master); // Read NDJSON lines from stdout let (line_tx, line_rx) = std::sync::mpsc::channel::>(); let reader_handle = std::thread::spawn(move || { let buf_reader = BufReader::new(reader); slog!("[pty-debug] Reader thread started"); for line in buf_reader.lines() { match line { Ok(l) => { slog!("[pty-debug] raw line: {}", l); if line_tx.send(Some(l)).is_err() { break; } } Err(e) => { slog!("[pty-debug] read error: {e}"); let _ = line_tx.send(None); break; } } } slog!("[pty-debug] Reader thread done"); let _ = line_tx.send(None); }); let mut got_result = false; let mut sid_tx = Some(sid_tx); loop { if cancelled.load(Ordering::Relaxed) { let _ = child.kill(); let _ = child.wait(); let _ = reader_handle.join(); return Err("Cancelled".to_string()); } match line_rx.recv_timeout(std::time::Duration::from_millis(500)) { Ok(Some(line)) => { let trimmed = line.trim(); if trimmed.is_empty() { continue; } let mut end = trimmed.len().min(120); while !trimmed.is_char_boundary(end) { end -= 1; } slog!("[pty-debug] processing: {}...", &trimmed[..end]); // Try to parse as JSON if let Ok(json) = serde_json::from_str::(trimmed) && process_json_event( &json, &token_tx, &thinking_tx, &activity_tx, &msg_tx, &mut sid_tx, &auth_failed, ) { got_result = true; } // Ignore non-JSON lines (terminal escape sequences) if got_result { break; } } Ok(None) => break, // EOF Err(std::sync::mpsc::RecvTimeoutError::Timeout) => { // Check if child has exited if let Ok(Some(_status)) = child.try_wait() { // Drain remaining lines through the same dispatch path // (process_json_event) so activity signals fire correctly. while let Ok(Some(line)) = line_rx.try_recv() { let trimmed = line.trim(); if let Ok(json) = serde_json::from_str::(trimmed) { process_json_event( &json, &token_tx, &thinking_tx, &activity_tx, &msg_tx, &mut sid_tx, &auth_failed, ); } } break; } } Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => break, } // Don't set got_result here — just let the process finish naturally let _ = got_result; } // Wait briefly for Claude Code to flush its session transcript to disk. // The `result` event means the API response is done, but the process // still needs to write the conversation to the JSONL session file. match child.try_wait() { Ok(Some(_)) => {} // Already exited _ => { // Give it up to 2 seconds to exit cleanly for _ in 0..20 { std::thread::sleep(std::time::Duration::from_millis(100)); if let Ok(Some(_)) = child.try_wait() { break; } } // If still running after 2s, kill it let _ = child.kill(); let _ = child.wait(); } } // Wait for the reader thread to release the cloned PTY master fd. let _ = reader_handle.join(); Ok(()) } /// Dispatch a single parsed JSON event to the appropriate handler. /// /// Returns `true` if a `result` event was received (signals session completion). /// Captures the session ID from the first event that carries it. #[cfg(test)] mod tests { use super::*; #[test] fn claude_code_provider_new() { let _provider = ClaudeCodeProvider::new(); } }