//! PTY process spawning and output loop: builds the command, drives the reader thread, //! and dispatches parsed JSON events to the broadcast channel. use std::collections::HashMap; use std::io::{BufRead, BufReader}; use std::sync::{Arc, Mutex}; use portable_pty::{ChildKiller, CommandBuilder, PtySize, native_pty_system}; use tokio::sync::broadcast; use crate::agent_log::AgentLogWriter; use crate::agents::{AgentEvent, TokenUsage}; use crate::io::watcher::WatcherEvent; use crate::slog; use crate::slog_warn; use super::events::{emit_event, handle_agent_stream_event}; use super::types::{ChildKillerGuard, PtyResult, composite_key}; /// Spawn claude agent in a PTY and stream events through the broadcast channel. /// /// ## Bug 645: `output.write(&bytes).is_ok()` assertion in Claude Code CLI /// /// The Claude Code CLI can panic with an `output.write(&bytes).is_ok()` assertion /// when writing to its stdout (the PTY slave end). This occurs inside the child /// process — not in this server code — when the PTY pipe breaks or fills. The /// `output` in the assertion is the CLI's stdout writer, and the write fails when /// the PTY master side is closed or the kernel pipe buffer is exhausted. /// /// When this happens, the child process dies, the PTY reader thread in this /// function receives EOF, and `run_agent_pty_blocking` returns `Ok(PtyResult)`. /// The server then runs completion gates via `run_server_owned_completion`. /// /// If the agent committed valid work before crashing, the "work survived" check /// in `pipeline::advance` detects the committed code and advances the story to /// QA instead of entering the retry/block path. #[allow(clippy::too_many_arguments)] pub(in crate::agents) async fn run_agent_pty_streaming( story_id: &str, agent_name: &str, command: &str, args: &[String], prompt: &str, cwd: &str, tx: &broadcast::Sender, event_log: &Arc>>, log_writer: Option>>, inactivity_timeout_secs: u64, child_killers: Arc>>>, watcher_tx: broadcast::Sender, session_id_to_resume: Option<&str>, ) -> Result { let sid = story_id.to_string(); let aname = agent_name.to_string(); let cmd = command.to_string(); let args = args.to_vec(); let prompt = prompt.to_string(); let cwd = cwd.to_string(); let tx = tx.clone(); let event_log = event_log.clone(); let resume_sid = session_id_to_resume.map(str::to_string); tokio::task::spawn_blocking(move || { run_agent_pty_blocking( &sid, &aname, &cmd, &args, &prompt, &cwd, &tx, &event_log, log_writer.as_deref(), inactivity_timeout_secs, &child_killers, &watcher_tx, resume_sid.as_deref(), ) }) .await .map_err(|e| format!("Agent task panicked: {e}"))? } #[allow(clippy::too_many_arguments)] fn run_agent_pty_blocking( story_id: &str, agent_name: &str, command: &str, args: &[String], prompt: &str, cwd: &str, tx: &broadcast::Sender, event_log: &Mutex>, log_writer: Option<&Mutex>, inactivity_timeout_secs: u64, child_killers: &Arc>>>, watcher_tx: &broadcast::Sender, session_id_to_resume: Option<&str>, ) -> Result { let pty_system = native_pty_system(); let pair = pty_system .openpty(PtySize { rows: 50, cols: 200, pixel_width: 0, pixel_height: 0, }) .map_err(|e| format!("Failed to open PTY: {e}"))?; let mut cmd = CommandBuilder::new(command); // Launch mode: resume an existing session or start fresh. if let Some(sid) = session_id_to_resume { // Resume: --resume restores previous conversation context. // The failure context (or empty string) is sent as a new message via -p. // Always pass -p so --include-partial-messages (added below) works: // claude CLI requires --print/-p to be set when --include-partial-messages // is used, regardless of whether the prompt is empty. cmd.arg("--resume"); cmd.arg(sid); cmd.arg("-p"); cmd.arg(prompt); } else { // Fresh session: deliver the full rendered prompt via -p. cmd.arg("-p"); cmd.arg(prompt); } // Add configured args (e.g., --directory /path/to/worktree, --model, etc.) for arg in args { cmd.arg(arg); } cmd.arg("--output-format"); cmd.arg("stream-json"); cmd.arg("--verbose"); // Enable partial streaming so we receive thinking_delta and text_delta // events in real-time, rather than only complete assistant events. // Without this, thinking traces may not appear in the structured output // and instead leak as unstructured PTY text. cmd.arg("--include-partial-messages"); // Agents use acceptEdits so file edits are auto-approved while other // tools (e.g. Bash) trigger the permission prompt tool, which auto-denies // for agents. The worktree's .claude/settings.json allowlist further // controls which tools are pre-approved. cmd.arg("--permission-mode"); cmd.arg("acceptEdits"); cmd.arg("--permission-prompt-tool"); cmd.arg("mcp__huskies__prompt_permission"); cmd.cwd(cwd); cmd.env("NO_COLOR", "1"); // Allow spawning Claude Code from within a Claude Code session cmd.env_remove("CLAUDECODE"); cmd.env_remove("CLAUDE_CODE_ENTRYPOINT"); // Count existing session files for this worktree to detect budget exhaustion. let session_dir = format!( "/home/huskies/.claude/projects/-workspace--huskies-worktrees-{}/", story_id.replace(['_', '.'], "-") ); let session_count = std::fs::read_dir(&session_dir) .map(|d| { d.filter(|e| { e.as_ref() .map(|e| e.path().extension().is_some_and(|ext| ext == "jsonl")) .unwrap_or(false) }) .count() }) .unwrap_or(0); let session_bytes: u64 = std::fs::read_dir(&session_dir) .map(|d| { d.filter_map(|e| e.ok()) .filter(|e| e.path().extension().is_some_and(|ext| ext == "jsonl")) .filter_map(|e| e.metadata().ok()) .map(|m| m.len()) .sum() }) .unwrap_or(0); slog!( "[agent:{story_id}:{agent_name}] Spawning {command} in {cwd} with args: {args:?} \ (prior_sessions={session_count}, session_log_bytes={session_bytes})" ); let mut child = pair .slave .spawn_command(cmd) .map_err(|e| format!("Failed to spawn agent for {story_id}:{agent_name}: {e}"))?; // Register the child killer so that kill_all_children() / stop_agent() can // terminate this process on server shutdown, even if the blocking thread // cannot be interrupted. The ChildKillerGuard deregisters on function exit. let killer_key = composite_key(story_id, agent_name); { let killer = child.clone_killer(); if let Ok(mut killers) = child_killers.lock() { killers.insert(killer_key.clone(), killer); } } let _killer_guard = ChildKillerGuard { killers: Arc::clone(child_killers), key: killer_key, }; drop(pair.slave); let reader = pair .master .try_clone_reader() .map_err(|e| format!("Failed to clone PTY reader: {e}"))?; drop(pair.master); // Spawn a reader thread to collect PTY output lines. // We use a channel so the main thread can apply an inactivity deadline // via recv_timeout: if no output arrives within the configured window // the process is killed and the agent is marked Failed. let (line_tx, line_rx) = std::sync::mpsc::channel::>(); let sid_for_reader = story_id.to_string(); let aname_for_reader = agent_name.to_string(); let reader_handle = std::thread::spawn(move || { let buf_reader = BufReader::new(reader); for line in buf_reader.lines() { if line_tx.send(line).is_err() { break; } } slog!("[agent:{sid_for_reader}:{aname_for_reader}] Reader thread exiting"); }); let timeout_dur = if inactivity_timeout_secs > 0 { Some(std::time::Duration::from_secs(inactivity_timeout_secs)) } else { None }; let mut session_id: Option = None; let mut token_usage: Option = None; loop { let recv_result = match timeout_dur { Some(dur) => line_rx.recv_timeout(dur), None => line_rx .recv() .map_err(|_| std::sync::mpsc::RecvTimeoutError::Disconnected), }; let line = match recv_result { Ok(Ok(l)) => l, Ok(Err(_)) => { // IO error reading from PTY — treat as EOF. break; } Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => { // Reader thread exited (EOF from PTY). break; } Err(std::sync::mpsc::RecvTimeoutError::Timeout) => { slog_warn!( "[agent:{story_id}:{agent_name}] Inactivity timeout after \ {inactivity_timeout_secs}s with no output. Killing process." ); let _ = child.kill(); let _ = child.wait(); return Err(format!( "Agent inactivity timeout: no output received for {inactivity_timeout_secs}s" )); } }; let trimmed = line.trim(); if trimmed.is_empty() { continue; } // Try to parse as JSON let json: serde_json::Value = match serde_json::from_str(trimmed) { Ok(j) => j, Err(_) => { // Non-JSON output (terminal escapes etc.) — send as raw output emit_event( AgentEvent::Output { story_id: story_id.to_string(), agent_name: agent_name.to_string(), text: trimmed.to_string(), }, tx, event_log, log_writer, ); continue; } }; let event_type = json.get("type").and_then(|t| t.as_str()).unwrap_or(""); match event_type { "system" => { session_id = json .get("session_id") .and_then(|s| s.as_str()) .map(|s| s.to_string()); } // With --include-partial-messages, thinking and text arrive // incrementally via stream_event → content_block_delta. Handle // them here for real-time streaming to the frontend. "stream_event" => { if let Some(event) = json.get("event") { handle_agent_stream_event( event, story_id, agent_name, tx, event_log, log_writer, ); } } // Complete assistant events are skipped for content extraction // because thinking and text already arrived via stream_event. // The raw JSON is still forwarded as AgentJson below. "assistant" | "user" => {} "rate_limit_event" => { let rate_limit_info = json.get("rate_limit_info"); let status = rate_limit_info .and_then(|i| i.get("status")) .and_then(|s| s.as_str()) .unwrap_or(""); let is_hard_block = !status.is_empty() && status != "allowed_warning"; let reset_at = rate_limit_info .and_then(|i| i.get("reset_at")) .and_then(|r| r.as_str()) .and_then(|r| chrono::DateTime::parse_from_rfc3339(r).ok()) .map(|dt| dt.with_timezone(&chrono::Utc)); if is_hard_block { let reset_at = match reset_at { Some(t) => { slog!( "[agent:{story_id}:{agent_name}] API rate limit hard block \ (status={status}); resets at {t}" ); t } None => { let default = chrono::Utc::now() + chrono::Duration::minutes(5); slog!( "[agent:{story_id}:{agent_name}] API rate limit hard block \ (status={status}); no reset_at in rate_limit_info, \ defaulting to 5-minute backoff ({default})" ); default } }; let _ = watcher_tx.send(WatcherEvent::RateLimitHardBlock { story_id: story_id.to_string(), agent_name: agent_name.to_string(), reset_at, }); } else { slog!( "[agent:{story_id}:{agent_name}] API rate limit warning received \ (status={status})" ); let _ = watcher_tx.send(WatcherEvent::RateLimitWarning { story_id: story_id.to_string(), agent_name: agent_name.to_string(), }); } } "result" => { // Extract token usage from the result event. if let Some(usage) = TokenUsage::from_result_event(&json) { slog!( "[agent:{story_id}:{agent_name}] Token usage: in={} out={} cache_create={} cache_read={} cost=${:.4}", usage.input_tokens, usage.output_tokens, usage.cache_creation_input_tokens, usage.cache_read_input_tokens, usage.total_cost_usd, ); token_usage = Some(usage); } } _ => {} } // Forward all JSON events emit_event( AgentEvent::AgentJson { story_id: story_id.to_string(), agent_name: agent_name.to_string(), data: json, }, tx, event_log, log_writer, ); } let _ = child.kill(); let wait_result = child.wait(); match &wait_result { Ok(status) => { slog!("[agent:{story_id}:{agent_name}] Child exited: {status:?}"); } Err(e) => { slog!("[agent:{story_id}:{agent_name}] Child wait error: {e}"); } } // Wait for the reader thread to finish so it releases the cloned PTY // master fd before we return. Without this, the next PTY spawn for the // same story can collide with a still-open fd from this session (#453). if let Err(e) = reader_handle.join() { slog!("[agent:{story_id}:{agent_name}] Reader thread panicked: {e:?}"); } // Log whether session was created — Session: None indicates CLI died // before emitting any events (possible causes: rate limit, budget // exhaustion, PTY write failure, CLI crash). if session_id.is_none() { slog_warn!( "[agent:{story_id}:{agent_name}] SESSION NONE: CLI exited without creating a session. \ Check for 'fatal runtime error' in agent logs. \ prior_sessions={session_count}, session_log_bytes={session_bytes}" ); } slog!( "[agent:{story_id}:{agent_name}] Done. Session: {:?}", session_id ); Ok(PtyResult { session_id, token_usage, }) }