refactor: split llm/providers/claude_code.rs into mod.rs + parse.rs + events.rs
The 1427-line claude_code.rs is split into: - parse.rs: parse_assistant_message + parse_tool_results + tests (332 lines) - events.rs: process_json_event + handle_stream_event + tests (749 lines) - mod.rs: doc, types (ClaudeCodeResult, ClaudeCodeProvider), chat_stream, run_pty_session (395 lines) Tests stay co-located. No behaviour change. All 44 claude_code tests pass; full suite green.
This commit is contained in:
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,751 @@
|
||||
//! Claude Code event handling — streams JSON events from the CLI to the message channel.
|
||||
|
||||
use serde_json::Value;
|
||||
|
||||
use super::parse::{parse_assistant_message, parse_tool_results};
|
||||
use crate::llm::types::{Message, Role};
|
||||
use crate::slog;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
|
||||
pub(super) fn process_json_event(
|
||||
json: &serde_json::Value,
|
||||
token_tx: &tokio::sync::mpsc::UnboundedSender<String>,
|
||||
thinking_tx: &tokio::sync::mpsc::UnboundedSender<String>,
|
||||
activity_tx: &tokio::sync::mpsc::UnboundedSender<String>,
|
||||
msg_tx: &std::sync::mpsc::Sender<Message>,
|
||||
sid_tx: &mut Option<tokio::sync::oneshot::Sender<String>>,
|
||||
auth_failed: &AtomicBool,
|
||||
) -> bool {
|
||||
let event_type = match json.get("type").and_then(|t| t.as_str()) {
|
||||
Some(t) => t,
|
||||
None => return false,
|
||||
};
|
||||
|
||||
// Capture session_id from the first event that carries it
|
||||
if let Some(tx) = sid_tx.take() {
|
||||
if let Some(sid) = json.get("session_id").and_then(|s| s.as_str()) {
|
||||
slog!("[pty-debug] CAPTURED session_id: {}", sid);
|
||||
let _ = tx.send(sid.to_string());
|
||||
} else {
|
||||
*sid_tx = Some(tx);
|
||||
}
|
||||
}
|
||||
|
||||
// Detect authentication_failed at the top level of any event.
|
||||
if json.get("error").and_then(|e| e.as_str()) == Some("authentication_failed") {
|
||||
slog!("[pty-debug] Detected authentication_failed error");
|
||||
auth_failed.store(true, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
match event_type {
|
||||
"stream_event" => {
|
||||
if let Some(event) = json.get("event") {
|
||||
handle_stream_event(event, token_tx, thinking_tx, activity_tx);
|
||||
}
|
||||
false
|
||||
}
|
||||
"assistant" => {
|
||||
if let Some(message) = json.get("message")
|
||||
&& let Some(content) = message.get("content").and_then(|c| c.as_array())
|
||||
{
|
||||
// Fire activity signals for tool_use blocks as a fallback path.
|
||||
// The primary path is via stream_event → content_block_start (real-time),
|
||||
// but assistant events also carry tool_use blocks and serve as a reliable
|
||||
// backup if stream_event delivery is delayed or missed.
|
||||
for block in content {
|
||||
if block.get("type").and_then(|t| t.as_str()) == Some("tool_use")
|
||||
&& let Some(name) = block.get("name").and_then(|n| n.as_str())
|
||||
{
|
||||
let _ = activity_tx.send(name.to_string());
|
||||
}
|
||||
}
|
||||
parse_assistant_message(content, msg_tx);
|
||||
}
|
||||
false
|
||||
}
|
||||
"user" => {
|
||||
if let Some(message) = json.get("message")
|
||||
&& let Some(content) = message.get("content").and_then(|c| c.as_array())
|
||||
{
|
||||
parse_tool_results(content, msg_tx);
|
||||
}
|
||||
false
|
||||
}
|
||||
"result" => true,
|
||||
// system, rate_limit_event, and unknown types are no-ops
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Parse a complete `assistant` message content array.
|
||||
///
|
||||
/// Extracts text blocks into `content` and tool_use blocks into `tool_calls`,
|
||||
/// then sends a single `Message { role: Assistant }` via `msg_tx`.
|
||||
/// This is the authoritative source for the final message structure — streaming
|
||||
|
||||
pub(super) fn handle_stream_event(
|
||||
event: &serde_json::Value,
|
||||
token_tx: &tokio::sync::mpsc::UnboundedSender<String>,
|
||||
thinking_tx: &tokio::sync::mpsc::UnboundedSender<String>,
|
||||
activity_tx: &tokio::sync::mpsc::UnboundedSender<String>,
|
||||
) {
|
||||
let event_type = event.get("type").and_then(|t| t.as_str()).unwrap_or("");
|
||||
|
||||
match event_type {
|
||||
"content_block_start" => {
|
||||
if let Some(cb) = event.get("content_block")
|
||||
&& cb.get("type").and_then(|t| t.as_str()) == Some("tool_use")
|
||||
&& let Some(name) = cb.get("name").and_then(|n| n.as_str())
|
||||
{
|
||||
let _ = activity_tx.send(name.to_string());
|
||||
}
|
||||
}
|
||||
// Text content streaming — only text_delta, not input_json_delta (tool args)
|
||||
"content_block_delta" => {
|
||||
if let Some(delta) = event.get("delta") {
|
||||
let delta_type = delta.get("type").and_then(|t| t.as_str()).unwrap_or("");
|
||||
match delta_type {
|
||||
"text_delta" => {
|
||||
if let Some(text) = delta.get("text").and_then(|t| t.as_str()) {
|
||||
let _ = token_tx.send(text.to_string());
|
||||
}
|
||||
}
|
||||
"thinking_delta" => {
|
||||
if let Some(thinking) = delta.get("thinking").and_then(|t| t.as_str()) {
|
||||
let _ = thinking_tx.send(thinking.to_string());
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
// Log errors via streaming display
|
||||
"error" => {
|
||||
if let Some(error) = event.get("error") {
|
||||
let msg = error
|
||||
.get("message")
|
||||
.and_then(|m| m.as_str())
|
||||
.unwrap_or("unknown error");
|
||||
let _ = token_tx.send(format!("\n[error: {msg}]\n"));
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use serde_json::json;
|
||||
|
||||
type Channels = (
|
||||
tokio::sync::mpsc::UnboundedSender<String>,
|
||||
tokio::sync::mpsc::UnboundedReceiver<String>,
|
||||
tokio::sync::mpsc::UnboundedSender<String>,
|
||||
tokio::sync::mpsc::UnboundedReceiver<String>,
|
||||
tokio::sync::mpsc::UnboundedSender<String>,
|
||||
tokio::sync::mpsc::UnboundedReceiver<String>,
|
||||
std::sync::mpsc::Sender<Message>,
|
||||
std::sync::mpsc::Receiver<Message>,
|
||||
);
|
||||
|
||||
fn make_channels() -> Channels {
|
||||
let (tok_tx, tok_rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
let (thi_tx, thi_rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
let (act_tx, act_rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
let (msg_tx, msg_rx) = std::sync::mpsc::channel();
|
||||
(
|
||||
tok_tx, tok_rx, thi_tx, thi_rx, act_tx, act_rx, msg_tx, msg_rx,
|
||||
)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn handle_stream_event_text_delta_sends_token() {
|
||||
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
let (ttx, _trx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||
let (atx, _arx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||
let event = json!({
|
||||
"type": "content_block_delta",
|
||||
"delta": {"type": "text_delta", "text": "hello "}
|
||||
});
|
||||
handle_stream_event(&event, &tx, &ttx, &atx);
|
||||
drop(tx);
|
||||
let tokens: Vec<_> = {
|
||||
let mut v = vec![];
|
||||
while let Ok(t) = rx.try_recv() {
|
||||
v.push(t);
|
||||
}
|
||||
v
|
||||
};
|
||||
assert_eq!(tokens, vec!["hello "]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn handle_stream_event_input_json_delta_not_sent() {
|
||||
// Tool argument JSON deltas should NOT be sent as text tokens
|
||||
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
let (ttx, _trx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||
let (atx, _arx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||
let event = json!({
|
||||
"type": "content_block_delta",
|
||||
"delta": {"type": "input_json_delta", "partial_json": "{\"path\":"}
|
||||
});
|
||||
handle_stream_event(&event, &tx, &ttx, &atx);
|
||||
drop(tx);
|
||||
let tokens: Vec<String> = {
|
||||
let mut v = vec![];
|
||||
while let Ok(t) = rx.try_recv() {
|
||||
v.push(t);
|
||||
}
|
||||
v
|
||||
};
|
||||
assert!(tokens.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn handle_stream_event_thinking_delta_routes_to_thinking_channel() {
|
||||
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
let (ttx, mut trx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||
let (atx, _arx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||
let event = json!({
|
||||
"type": "content_block_delta",
|
||||
"delta": {"type": "thinking_delta", "thinking": "I should check the file"}
|
||||
});
|
||||
handle_stream_event(&event, &tx, &ttx, &atx);
|
||||
drop(tx);
|
||||
drop(ttx);
|
||||
// thinking token must NOT appear in the regular token channel
|
||||
let tokens: Vec<String> = {
|
||||
let mut v = vec![];
|
||||
while let Ok(t) = rx.try_recv() {
|
||||
v.push(t);
|
||||
}
|
||||
v
|
||||
};
|
||||
assert!(
|
||||
tokens.is_empty(),
|
||||
"thinking token leaked into token channel"
|
||||
);
|
||||
// thinking token must appear in the dedicated thinking channel, without prefix
|
||||
let thinking: Vec<String> = {
|
||||
let mut v = vec![];
|
||||
while let Ok(t) = trx.try_recv() {
|
||||
v.push(t);
|
||||
}
|
||||
v
|
||||
};
|
||||
assert_eq!(thinking, vec!["I should check the file"]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn handle_stream_event_error_sends_error_token() {
|
||||
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
let (ttx, _trx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||
let (atx, _arx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||
let event = json!({
|
||||
"type": "error",
|
||||
"error": {"type": "overloaded_error", "message": "Overloaded"}
|
||||
});
|
||||
handle_stream_event(&event, &tx, &ttx, &atx);
|
||||
drop(tx);
|
||||
let tokens: Vec<String> = {
|
||||
let mut v = vec![];
|
||||
while let Ok(t) = rx.try_recv() {
|
||||
v.push(t);
|
||||
}
|
||||
v
|
||||
};
|
||||
assert_eq!(tokens, vec!["\n[error: Overloaded]\n"]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn handle_stream_event_unknown_type_is_noop() {
|
||||
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
let (ttx, _trx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||
let (atx, _arx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||
let event = json!({"type": "ping"});
|
||||
handle_stream_event(&event, &tx, &ttx, &atx);
|
||||
drop(tx);
|
||||
let tokens: Vec<String> = {
|
||||
let mut v = vec![];
|
||||
while let Ok(t) = rx.try_recv() {
|
||||
v.push(t);
|
||||
}
|
||||
v
|
||||
};
|
||||
assert!(tokens.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn handle_stream_event_tool_use_start_sends_activity() {
|
||||
let (tx, _rx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||
let (ttx, _trx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||
let (atx, mut arx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||
let event = json!({
|
||||
"type": "content_block_start",
|
||||
"index": 1,
|
||||
"content_block": {"type": "tool_use", "id": "toolu_1", "name": "Read", "input": {}}
|
||||
});
|
||||
handle_stream_event(&event, &tx, &ttx, &atx);
|
||||
drop(atx);
|
||||
let activities: Vec<String> = {
|
||||
let mut v = vec![];
|
||||
while let Ok(a) = arx.try_recv() {
|
||||
v.push(a);
|
||||
}
|
||||
v
|
||||
};
|
||||
assert_eq!(activities, vec!["Read"]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn handle_stream_event_text_block_start_no_activity() {
|
||||
let (tx, _rx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||
let (ttx, _trx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||
let (atx, mut arx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||
let event = json!({
|
||||
"type": "content_block_start",
|
||||
"index": 0,
|
||||
"content_block": {"type": "text", "text": ""}
|
||||
});
|
||||
handle_stream_event(&event, &tx, &ttx, &atx);
|
||||
drop(atx);
|
||||
let activities: Vec<String> = {
|
||||
let mut v = vec![];
|
||||
while let Ok(a) = arx.try_recv() {
|
||||
v.push(a);
|
||||
}
|
||||
v
|
||||
};
|
||||
assert!(activities.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn process_json_event_result_returns_true() {
|
||||
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels();
|
||||
let (sid_tx, _sid_rx) = tokio::sync::oneshot::channel::<String>();
|
||||
let mut sid_tx_opt = Some(sid_tx);
|
||||
let json = json!({"type": "result", "subtype": "success"});
|
||||
assert!(process_json_event(
|
||||
&json,
|
||||
&tok_tx,
|
||||
&thi_tx,
|
||||
&act_tx,
|
||||
&msg_tx,
|
||||
&mut sid_tx_opt,
|
||||
&AtomicBool::new(false),
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn process_json_event_system_returns_false() {
|
||||
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels();
|
||||
let mut sid_tx = None::<tokio::sync::oneshot::Sender<String>>;
|
||||
let json = json!({"type": "system", "subtype": "init", "apiKeySource": "env"});
|
||||
assert!(!process_json_event(
|
||||
&json,
|
||||
&tok_tx,
|
||||
&thi_tx,
|
||||
&act_tx,
|
||||
&msg_tx,
|
||||
&mut sid_tx,
|
||||
&AtomicBool::new(false),
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn process_json_event_rate_limit_returns_false() {
|
||||
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels();
|
||||
let mut sid_tx = None::<tokio::sync::oneshot::Sender<String>>;
|
||||
let json = json!({"type": "rate_limit_event"});
|
||||
assert!(!process_json_event(
|
||||
&json,
|
||||
&tok_tx,
|
||||
&thi_tx,
|
||||
&act_tx,
|
||||
&msg_tx,
|
||||
&mut sid_tx,
|
||||
&AtomicBool::new(false),
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn process_json_event_unknown_type_returns_false() {
|
||||
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels();
|
||||
let mut sid_tx = None::<tokio::sync::oneshot::Sender<String>>;
|
||||
let json = json!({"type": "some_future_event"});
|
||||
assert!(!process_json_event(
|
||||
&json,
|
||||
&tok_tx,
|
||||
&thi_tx,
|
||||
&act_tx,
|
||||
&msg_tx,
|
||||
&mut sid_tx,
|
||||
&AtomicBool::new(false),
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn process_json_event_no_type_returns_false() {
|
||||
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels();
|
||||
let mut sid_tx = None::<tokio::sync::oneshot::Sender<String>>;
|
||||
let json = json!({"content": "no type field"});
|
||||
assert!(!process_json_event(
|
||||
&json,
|
||||
&tok_tx,
|
||||
&thi_tx,
|
||||
&act_tx,
|
||||
&msg_tx,
|
||||
&mut sid_tx,
|
||||
&AtomicBool::new(false),
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn process_json_event_captures_session_id() {
|
||||
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels();
|
||||
let (sid_tx, mut sid_rx) = tokio::sync::oneshot::channel::<String>();
|
||||
let mut sid_tx_opt = Some(sid_tx);
|
||||
let json = json!({"type": "system", "session_id": "sess-abc-123"});
|
||||
process_json_event(
|
||||
&json,
|
||||
&tok_tx,
|
||||
&thi_tx,
|
||||
&act_tx,
|
||||
&msg_tx,
|
||||
&mut sid_tx_opt,
|
||||
&AtomicBool::new(false),
|
||||
);
|
||||
// sid_tx should have been consumed
|
||||
assert!(sid_tx_opt.is_none());
|
||||
let received = sid_rx.try_recv().unwrap();
|
||||
assert_eq!(received, "sess-abc-123");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn process_json_event_preserves_sid_tx_if_no_session_id() {
|
||||
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels();
|
||||
let (sid_tx, _sid_rx) = tokio::sync::oneshot::channel::<String>();
|
||||
let mut sid_tx_opt = Some(sid_tx);
|
||||
let json = json!({"type": "system"});
|
||||
process_json_event(
|
||||
&json,
|
||||
&tok_tx,
|
||||
&thi_tx,
|
||||
&act_tx,
|
||||
&msg_tx,
|
||||
&mut sid_tx_opt,
|
||||
&AtomicBool::new(false),
|
||||
);
|
||||
// sid_tx should still be present since no session_id in event
|
||||
assert!(sid_tx_opt.is_some());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn process_json_event_stream_event_forwards_token() {
|
||||
let (tok_tx, mut tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) =
|
||||
make_channels();
|
||||
let mut sid_tx = None::<tokio::sync::oneshot::Sender<String>>;
|
||||
let json = json!({
|
||||
"type": "stream_event",
|
||||
"session_id": "s1",
|
||||
"event": {
|
||||
"type": "content_block_delta",
|
||||
"delta": {"type": "text_delta", "text": "word"}
|
||||
}
|
||||
});
|
||||
assert!(!process_json_event(
|
||||
&json,
|
||||
&tok_tx,
|
||||
&thi_tx,
|
||||
&act_tx,
|
||||
&msg_tx,
|
||||
&mut sid_tx,
|
||||
&AtomicBool::new(false),
|
||||
));
|
||||
drop(tok_tx);
|
||||
let tokens: Vec<String> = {
|
||||
let mut v = vec![];
|
||||
while let Ok(t) = tok_rx.try_recv() {
|
||||
v.push(t);
|
||||
}
|
||||
v
|
||||
};
|
||||
assert_eq!(tokens, vec!["word"]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn process_json_event_stream_event_tool_use_fires_activity() {
|
||||
// This is the primary activity path: stream_event wrapping content_block_start
|
||||
// with a tool_use block. Requires --include-partial-messages to be enabled.
|
||||
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, mut act_rx, msg_tx, _msg_rx) =
|
||||
make_channels();
|
||||
let mut sid_tx = None::<tokio::sync::oneshot::Sender<String>>;
|
||||
let json = json!({
|
||||
"type": "stream_event",
|
||||
"session_id": "s1",
|
||||
"event": {
|
||||
"type": "content_block_start",
|
||||
"index": 1,
|
||||
"content_block": {"type": "tool_use", "id": "toolu_abc", "name": "Bash", "input": {}}
|
||||
}
|
||||
});
|
||||
assert!(!process_json_event(
|
||||
&json,
|
||||
&tok_tx,
|
||||
&thi_tx,
|
||||
&act_tx,
|
||||
&msg_tx,
|
||||
&mut sid_tx,
|
||||
&AtomicBool::new(false),
|
||||
));
|
||||
drop(act_tx);
|
||||
let activities: Vec<String> = {
|
||||
let mut v = vec![];
|
||||
while let Ok(a) = act_rx.try_recv() {
|
||||
v.push(a);
|
||||
}
|
||||
v
|
||||
};
|
||||
assert_eq!(activities, vec!["Bash"]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn process_json_event_assistant_with_tool_use_fires_activity() {
|
||||
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, mut act_rx, msg_tx, _msg_rx) =
|
||||
make_channels();
|
||||
let mut sid_tx = None::<tokio::sync::oneshot::Sender<String>>;
|
||||
let json = json!({
|
||||
"type": "assistant",
|
||||
"message": {
|
||||
"content": [
|
||||
{"type": "text", "text": "Let me read that file."},
|
||||
{"type": "tool_use", "id": "toolu_1", "name": "Read", "input": {"file_path": "/foo.rs"}}
|
||||
]
|
||||
}
|
||||
});
|
||||
assert!(!process_json_event(
|
||||
&json,
|
||||
&tok_tx,
|
||||
&thi_tx,
|
||||
&act_tx,
|
||||
&msg_tx,
|
||||
&mut sid_tx,
|
||||
&AtomicBool::new(false),
|
||||
));
|
||||
drop(act_tx);
|
||||
let activities: Vec<String> = {
|
||||
let mut v = vec![];
|
||||
while let Ok(a) = act_rx.try_recv() {
|
||||
v.push(a);
|
||||
}
|
||||
v
|
||||
};
|
||||
assert_eq!(activities, vec!["Read"]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn process_json_event_assistant_with_multiple_tool_uses_fires_all_activities() {
|
||||
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, mut act_rx, msg_tx, _msg_rx) =
|
||||
make_channels();
|
||||
let mut sid_tx = None::<tokio::sync::oneshot::Sender<String>>;
|
||||
let json = json!({
|
||||
"type": "assistant",
|
||||
"message": {
|
||||
"content": [
|
||||
{"type": "tool_use", "id": "id1", "name": "Glob", "input": {}},
|
||||
{"type": "tool_use", "id": "id2", "name": "Bash", "input": {}}
|
||||
]
|
||||
}
|
||||
});
|
||||
assert!(!process_json_event(
|
||||
&json,
|
||||
&tok_tx,
|
||||
&thi_tx,
|
||||
&act_tx,
|
||||
&msg_tx,
|
||||
&mut sid_tx,
|
||||
&AtomicBool::new(false),
|
||||
));
|
||||
drop(act_tx);
|
||||
let activities: Vec<String> = {
|
||||
let mut v = vec![];
|
||||
while let Ok(a) = act_rx.try_recv() {
|
||||
v.push(a);
|
||||
}
|
||||
v
|
||||
};
|
||||
assert_eq!(activities, vec!["Glob", "Bash"]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn process_json_event_assistant_text_only_no_activity() {
|
||||
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, mut act_rx, msg_tx, _msg_rx) =
|
||||
make_channels();
|
||||
let mut sid_tx = None::<tokio::sync::oneshot::Sender<String>>;
|
||||
let json = json!({
|
||||
"type": "assistant",
|
||||
"message": {
|
||||
"content": [{"type": "text", "text": "Just text, no tools."}]
|
||||
}
|
||||
});
|
||||
assert!(!process_json_event(
|
||||
&json,
|
||||
&tok_tx,
|
||||
&thi_tx,
|
||||
&act_tx,
|
||||
&msg_tx,
|
||||
&mut sid_tx,
|
||||
&AtomicBool::new(false),
|
||||
));
|
||||
drop(act_tx);
|
||||
let activities: Vec<String> = {
|
||||
let mut v = vec![];
|
||||
while let Ok(a) = act_rx.try_recv() {
|
||||
v.push(a);
|
||||
}
|
||||
v
|
||||
};
|
||||
assert!(activities.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn process_json_event_assistant_event_parses_message() {
|
||||
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, msg_rx) = make_channels();
|
||||
let mut sid_tx = None::<tokio::sync::oneshot::Sender<String>>;
|
||||
let json = json!({
|
||||
"type": "assistant",
|
||||
"message": {
|
||||
"content": [{"type": "text", "text": "Hi!"}]
|
||||
}
|
||||
});
|
||||
assert!(!process_json_event(
|
||||
&json,
|
||||
&tok_tx,
|
||||
&thi_tx,
|
||||
&act_tx,
|
||||
&msg_tx,
|
||||
&mut sid_tx,
|
||||
&AtomicBool::new(false),
|
||||
));
|
||||
drop(msg_tx);
|
||||
let msgs: Vec<Message> = msg_rx.try_iter().collect();
|
||||
assert_eq!(msgs.len(), 1);
|
||||
assert_eq!(msgs[0].content, "Hi!");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn process_json_event_user_event_parses_tool_results() {
|
||||
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, msg_rx) = make_channels();
|
||||
let mut sid_tx = None::<tokio::sync::oneshot::Sender<String>>;
|
||||
let json = json!({
|
||||
"type": "user",
|
||||
"message": {
|
||||
"content": [{"type": "tool_result", "tool_use_id": "tid1", "content": "done"}]
|
||||
}
|
||||
});
|
||||
assert!(!process_json_event(
|
||||
&json,
|
||||
&tok_tx,
|
||||
&thi_tx,
|
||||
&act_tx,
|
||||
&msg_tx,
|
||||
&mut sid_tx,
|
||||
&AtomicBool::new(false),
|
||||
));
|
||||
drop(msg_tx);
|
||||
let msgs: Vec<Message> = msg_rx.try_iter().collect();
|
||||
assert_eq!(msgs.len(), 1);
|
||||
assert_eq!(msgs[0].role, Role::Tool);
|
||||
assert_eq!(msgs[0].content, "done");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn process_json_event_assistant_without_content_array_is_noop() {
|
||||
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, msg_rx) = make_channels();
|
||||
let mut sid_tx = None::<tokio::sync::oneshot::Sender<String>>;
|
||||
let json = json!({
|
||||
"type": "assistant",
|
||||
"message": {"content": "not an array"}
|
||||
});
|
||||
assert!(!process_json_event(
|
||||
&json,
|
||||
&tok_tx,
|
||||
&thi_tx,
|
||||
&act_tx,
|
||||
&msg_tx,
|
||||
&mut sid_tx,
|
||||
&AtomicBool::new(false),
|
||||
));
|
||||
drop(msg_tx);
|
||||
let msgs: Vec<Message> = msg_rx.try_iter().collect();
|
||||
assert!(msgs.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn process_json_event_user_without_content_array_is_noop() {
|
||||
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, msg_rx) = make_channels();
|
||||
let mut sid_tx = None::<tokio::sync::oneshot::Sender<String>>;
|
||||
let json = json!({"type": "user", "message": {"content": null}});
|
||||
assert!(!process_json_event(
|
||||
&json,
|
||||
&tok_tx,
|
||||
&thi_tx,
|
||||
&act_tx,
|
||||
&msg_tx,
|
||||
&mut sid_tx,
|
||||
&AtomicBool::new(false),
|
||||
));
|
||||
drop(msg_tx);
|
||||
let msgs: Vec<Message> = msg_rx.try_iter().collect();
|
||||
assert!(msgs.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn process_json_event_detects_authentication_failed() {
|
||||
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels();
|
||||
let mut sid_tx = None::<tokio::sync::oneshot::Sender<String>>;
|
||||
let auth_failed = AtomicBool::new(false);
|
||||
let json = json!({
|
||||
"type": "assistant",
|
||||
"error": "authentication_failed",
|
||||
"message": {
|
||||
"content": [{"type": "text", "text": "Failed to authenticate."}]
|
||||
}
|
||||
});
|
||||
assert!(!process_json_event(
|
||||
&json,
|
||||
&tok_tx,
|
||||
&thi_tx,
|
||||
&act_tx,
|
||||
&msg_tx,
|
||||
&mut sid_tx,
|
||||
&auth_failed,
|
||||
));
|
||||
assert!(auth_failed.load(Ordering::Relaxed));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn process_json_event_no_auth_failed_for_normal_events() {
|
||||
let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels();
|
||||
let mut sid_tx = None::<tokio::sync::oneshot::Sender<String>>;
|
||||
let auth_failed = AtomicBool::new(false);
|
||||
let json = json!({
|
||||
"type": "assistant",
|
||||
"message": {
|
||||
"content": [{"type": "text", "text": "Hello!"}]
|
||||
}
|
||||
});
|
||||
assert!(!process_json_event(
|
||||
&json,
|
||||
&tok_tx,
|
||||
&thi_tx,
|
||||
&act_tx,
|
||||
&msg_tx,
|
||||
&mut sid_tx,
|
||||
&auth_failed,
|
||||
));
|
||||
assert!(!auth_failed.load(Ordering::Relaxed));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,395 @@
|
||||
//! Claude Code provider — runs Claude Code CLI in a PTY and parses structured output.
|
||||
use crate::slog;
|
||||
use portable_pty::{CommandBuilder, PtySize, native_pty_system};
|
||||
use std::io::{BufRead, BufReader};
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use tokio::sync::watch;
|
||||
|
||||
use crate::llm::types::{FunctionCall, Message, Role, ToolCall};
|
||||
|
||||
/// Result from a Claude Code session containing structured messages.
|
||||
pub struct ClaudeCodeResult {
|
||||
/// The conversation messages produced by Claude Code, including assistant
|
||||
/// turns (with optional tool_calls) and tool result turns.
|
||||
pub messages: Vec<Message>,
|
||||
/// Session ID for conversation resumption on subsequent requests.
|
||||
pub session_id: Option<String>,
|
||||
}
|
||||
|
||||
/// Manages a Claude Code session via a pseudo-terminal.
|
||||
///
|
||||
/// Spawns `claude -p` in a PTY so isatty() returns true (which may
|
||||
/// influence billing), while using `--output-format stream-json` to
|
||||
/// get clean, structured NDJSON output instead of TUI escape sequences.
|
||||
///
|
||||
/// Supports session resumption: if a `session_id` is provided, passes
|
||||
/// `--resume <id>` so Claude Code loads the prior conversation transcript
|
||||
/// from disk and continues with full context.
|
||||
///
|
||||
/// Permissions are delegated to the MCP `prompt_permission` tool via
|
||||
/// `--permission-prompt-tool`, so Claude Code calls back into the server
|
||||
/// when a tool requires user approval. The frontend dialog handles the UX.
|
||||
|
||||
mod events;
|
||||
mod parse;
|
||||
|
||||
use events::{handle_stream_event, process_json_event};
|
||||
|
||||
pub struct ClaudeCodeProvider;
|
||||
|
||||
impl ClaudeCodeProvider {
|
||||
pub fn new() -> Self {
|
||||
Self
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn chat_stream<F, T, A>(
|
||||
&self,
|
||||
user_message: &str,
|
||||
project_root: &str,
|
||||
session_id: Option<&str>,
|
||||
system_prompt: Option<&str>,
|
||||
cancel_rx: &mut watch::Receiver<bool>,
|
||||
mut on_token: F,
|
||||
mut on_thinking: T,
|
||||
mut on_activity: A,
|
||||
) -> Result<ClaudeCodeResult, String>
|
||||
where
|
||||
F: FnMut(&str) + Send,
|
||||
T: FnMut(&str) + Send,
|
||||
A: FnMut(&str) + Send,
|
||||
{
|
||||
let cancelled = Arc::new(AtomicBool::new(false));
|
||||
let cancelled_clone = cancelled.clone();
|
||||
|
||||
let mut cancel_watch = cancel_rx.clone();
|
||||
tokio::spawn(async move {
|
||||
while cancel_watch.changed().await.is_ok() {
|
||||
if *cancel_watch.borrow() {
|
||||
cancelled_clone.store(true, Ordering::Relaxed);
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Attempt up to 2 times: first try, then retry after OAuth refresh.
|
||||
for attempt in 0..2 {
|
||||
let message = user_message.to_string();
|
||||
let cwd = project_root.to_string();
|
||||
let resume_id = session_id.map(|s| s.to_string());
|
||||
let sys_prompt = system_prompt.map(|s| s.to_string());
|
||||
let cancelled_inner = cancelled.clone();
|
||||
let auth_failed = Arc::new(AtomicBool::new(false));
|
||||
let auth_failed_clone = auth_failed.clone();
|
||||
|
||||
let (token_tx, mut token_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||
let (thinking_tx, mut thinking_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||
let (activity_tx, mut activity_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||
let (msg_tx, msg_rx) = std::sync::mpsc::channel::<Message>();
|
||||
let (sid_tx, sid_rx) = tokio::sync::oneshot::channel::<String>();
|
||||
|
||||
let pty_handle = tokio::task::spawn_blocking(move || {
|
||||
run_pty_session(
|
||||
&message,
|
||||
&cwd,
|
||||
resume_id.as_deref(),
|
||||
sys_prompt.as_deref(),
|
||||
cancelled_inner,
|
||||
auth_failed_clone,
|
||||
token_tx,
|
||||
thinking_tx,
|
||||
activity_tx,
|
||||
msg_tx,
|
||||
sid_tx,
|
||||
)
|
||||
});
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
msg = token_rx.recv() => match msg {
|
||||
Some(t) => on_token(&t),
|
||||
None => break,
|
||||
},
|
||||
msg = thinking_rx.recv() => if let Some(t) = msg {
|
||||
on_thinking(&t);
|
||||
},
|
||||
msg = activity_rx.recv() => if let Some(name) = msg {
|
||||
on_activity(&name);
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// Drain any remaining activity/thinking messages that were buffered
|
||||
// when the token channel closed.
|
||||
while let Ok(t) = thinking_rx.try_recv() {
|
||||
on_thinking(&t);
|
||||
}
|
||||
while let Ok(name) = activity_rx.try_recv() {
|
||||
on_activity(&name);
|
||||
}
|
||||
|
||||
pty_handle
|
||||
.await
|
||||
.map_err(|e| format!("PTY task panicked: {e}"))??;
|
||||
|
||||
// Check if the PTY session failed due to expired OAuth token.
|
||||
if auth_failed.load(Ordering::Relaxed) && attempt == 0 {
|
||||
slog!("[oauth] Authentication failed, attempting token refresh");
|
||||
match crate::llm::oauth::refresh_access_token().await {
|
||||
Ok(()) => {
|
||||
slog!("[oauth] Token refreshed, retrying request");
|
||||
on_token("\n*Refreshing authentication token...*\n");
|
||||
continue;
|
||||
}
|
||||
Err(_e) => {
|
||||
let port = crate::http::resolve_port();
|
||||
let login_url = format!("http://localhost:{port}/oauth/authorize");
|
||||
return Err(format!(
|
||||
"OAuth session expired or credentials missing. Please log in: {login_url}"
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let captured_session_id = sid_rx.await.ok();
|
||||
slog!("[pty-debug] RECEIVED session_id: {:?}", captured_session_id);
|
||||
let structured_messages: Vec<Message> = msg_rx.try_iter().collect();
|
||||
|
||||
return Ok(ClaudeCodeResult {
|
||||
messages: structured_messages,
|
||||
session_id: captured_session_id,
|
||||
});
|
||||
}
|
||||
|
||||
// Should never reach here, but just in case.
|
||||
Err("Authentication failed after retry".to_string())
|
||||
}
|
||||
}
|
||||
|
||||
/// Run `claude -p` with stream-json output inside a PTY.
|
||||
///
|
||||
/// The PTY makes isatty() return true. The `-p` flag gives us
|
||||
/// single-shot non-interactive mode with structured output.
|
||||
///
|
||||
/// Sends streaming text tokens via `token_tx` for real-time display, and
|
||||
/// complete structured `Message` values via `msg_tx` for the final message
|
||||
/// history (assistant turns with tool_calls, and tool result turns).
|
||||
///
|
||||
/// Permission handling is delegated to the MCP `prompt_permission` tool
|
||||
/// via `--permission-prompt-tool`. Claude Code calls the MCP tool when it
|
||||
/// needs user approval, and the server bridges the request to the frontend.
|
||||
|
||||
fn run_pty_session(
|
||||
user_message: &str,
|
||||
cwd: &str,
|
||||
resume_session_id: Option<&str>,
|
||||
_system_prompt: Option<&str>,
|
||||
cancelled: Arc<AtomicBool>,
|
||||
auth_failed: Arc<AtomicBool>,
|
||||
token_tx: tokio::sync::mpsc::UnboundedSender<String>,
|
||||
thinking_tx: tokio::sync::mpsc::UnboundedSender<String>,
|
||||
activity_tx: tokio::sync::mpsc::UnboundedSender<String>,
|
||||
msg_tx: std::sync::mpsc::Sender<Message>,
|
||||
sid_tx: tokio::sync::oneshot::Sender<String>,
|
||||
) -> Result<(), String> {
|
||||
let pty_system = native_pty_system();
|
||||
|
||||
let pair = pty_system
|
||||
.openpty(PtySize {
|
||||
rows: 50,
|
||||
cols: 200,
|
||||
pixel_width: 0,
|
||||
pixel_height: 0,
|
||||
})
|
||||
.map_err(|e| format!("Failed to open PTY: {e}"))?;
|
||||
|
||||
let mut cmd = CommandBuilder::new("claude");
|
||||
cmd.arg("-p");
|
||||
cmd.arg(user_message);
|
||||
if let Some(sid) = resume_session_id {
|
||||
cmd.arg("--resume");
|
||||
cmd.arg(sid);
|
||||
}
|
||||
cmd.arg("--output-format");
|
||||
cmd.arg("stream-json");
|
||||
cmd.arg("--verbose");
|
||||
// Enable partial streaming events so we receive stream_event messages
|
||||
// containing raw API events (content_block_start, content_block_delta,
|
||||
// etc.). Without this flag, only complete assistant/user/result events
|
||||
// are emitted and tool-start activity signals never fire.
|
||||
cmd.arg("--include-partial-messages");
|
||||
// Delegate permission decisions to the MCP prompt_permission tool.
|
||||
// Claude Code will call this tool via the huskies MCP server when
|
||||
// a tool requires user approval, instead of using PTY stdin/stdout.
|
||||
cmd.arg("--permission-prompt-tool");
|
||||
cmd.arg("mcp__huskies__prompt_permission");
|
||||
// Note: --system is not a valid Claude Code CLI flag. System-level
|
||||
// instructions (like bot name) are prepended to the user prompt instead.
|
||||
cmd.cwd(cwd);
|
||||
// Keep TERM reasonable but disable color
|
||||
cmd.env("NO_COLOR", "1");
|
||||
// Allow nested spawning when the server itself runs inside Claude Code
|
||||
cmd.env("CLAUDECODE", "");
|
||||
|
||||
slog!(
|
||||
"[pty-debug] Spawning: claude -p \"{}\" {} --output-format stream-json --verbose --include-partial-messages --permission-prompt-tool mcp__huskies__prompt_permission",
|
||||
user_message,
|
||||
resume_session_id
|
||||
.map(|s| format!("--resume {s}"))
|
||||
.unwrap_or_default()
|
||||
);
|
||||
|
||||
let mut child = pair
|
||||
.slave
|
||||
.spawn_command(cmd)
|
||||
.map_err(|e| format!("Failed to spawn claude: {e}"))?;
|
||||
|
||||
slog!("[pty-debug] Process spawned, pid: {:?}", child.process_id());
|
||||
drop(pair.slave);
|
||||
|
||||
let reader = pair
|
||||
.master
|
||||
.try_clone_reader()
|
||||
.map_err(|e| format!("Failed to clone PTY reader: {e}"))?;
|
||||
|
||||
// We no longer need the writer — permission responses flow through MCP,
|
||||
// not PTY stdin. Drop it so the PTY sees EOF on stdin when appropriate.
|
||||
drop(pair.master);
|
||||
|
||||
// Read NDJSON lines from stdout
|
||||
let (line_tx, line_rx) = std::sync::mpsc::channel::<Option<String>>();
|
||||
|
||||
let reader_handle = std::thread::spawn(move || {
|
||||
let buf_reader = BufReader::new(reader);
|
||||
slog!("[pty-debug] Reader thread started");
|
||||
for line in buf_reader.lines() {
|
||||
match line {
|
||||
Ok(l) => {
|
||||
slog!("[pty-debug] raw line: {}", l);
|
||||
if line_tx.send(Some(l)).is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
slog!("[pty-debug] read error: {e}");
|
||||
let _ = line_tx.send(None);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
slog!("[pty-debug] Reader thread done");
|
||||
let _ = line_tx.send(None);
|
||||
});
|
||||
|
||||
let mut got_result = false;
|
||||
let mut sid_tx = Some(sid_tx);
|
||||
|
||||
loop {
|
||||
if cancelled.load(Ordering::Relaxed) {
|
||||
let _ = child.kill();
|
||||
let _ = child.wait();
|
||||
let _ = reader_handle.join();
|
||||
return Err("Cancelled".to_string());
|
||||
}
|
||||
|
||||
match line_rx.recv_timeout(std::time::Duration::from_millis(500)) {
|
||||
Ok(Some(line)) => {
|
||||
let trimmed = line.trim();
|
||||
if trimmed.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
let mut end = trimmed.len().min(120);
|
||||
while !trimmed.is_char_boundary(end) {
|
||||
end -= 1;
|
||||
}
|
||||
slog!("[pty-debug] processing: {}...", &trimmed[..end]);
|
||||
|
||||
// Try to parse as JSON
|
||||
if let Ok(json) = serde_json::from_str::<serde_json::Value>(trimmed)
|
||||
&& process_json_event(
|
||||
&json,
|
||||
&token_tx,
|
||||
&thinking_tx,
|
||||
&activity_tx,
|
||||
&msg_tx,
|
||||
&mut sid_tx,
|
||||
&auth_failed,
|
||||
)
|
||||
{
|
||||
got_result = true;
|
||||
}
|
||||
// Ignore non-JSON lines (terminal escape sequences)
|
||||
|
||||
if got_result {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Ok(None) => break, // EOF
|
||||
Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
|
||||
// Check if child has exited
|
||||
if let Ok(Some(_status)) = child.try_wait() {
|
||||
// Drain remaining lines through the same dispatch path
|
||||
// (process_json_event) so activity signals fire correctly.
|
||||
while let Ok(Some(line)) = line_rx.try_recv() {
|
||||
let trimmed = line.trim();
|
||||
if let Ok(json) = serde_json::from_str::<serde_json::Value>(trimmed) {
|
||||
process_json_event(
|
||||
&json,
|
||||
&token_tx,
|
||||
&thinking_tx,
|
||||
&activity_tx,
|
||||
&msg_tx,
|
||||
&mut sid_tx,
|
||||
&auth_failed,
|
||||
);
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => break,
|
||||
}
|
||||
|
||||
// Don't set got_result here — just let the process finish naturally
|
||||
let _ = got_result;
|
||||
}
|
||||
|
||||
// Wait briefly for Claude Code to flush its session transcript to disk.
|
||||
// The `result` event means the API response is done, but the process
|
||||
// still needs to write the conversation to the JSONL session file.
|
||||
match child.try_wait() {
|
||||
Ok(Some(_)) => {} // Already exited
|
||||
_ => {
|
||||
// Give it up to 2 seconds to exit cleanly
|
||||
for _ in 0..20 {
|
||||
std::thread::sleep(std::time::Duration::from_millis(100));
|
||||
if let Ok(Some(_)) = child.try_wait() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
// If still running after 2s, kill it
|
||||
let _ = child.kill();
|
||||
let _ = child.wait();
|
||||
}
|
||||
}
|
||||
// Wait for the reader thread to release the cloned PTY master fd.
|
||||
let _ = reader_handle.join();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Dispatch a single parsed JSON event to the appropriate handler.
|
||||
///
|
||||
/// Returns `true` if a `result` event was received (signals session completion).
|
||||
/// Captures the session ID from the first event that carries it.
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn claude_code_provider_new() {
|
||||
let _provider = ClaudeCodeProvider::new();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,332 @@
|
||||
//! Claude Code message parsing — extracts text and tool-use info from assistant
|
||||
//! and user messages.
|
||||
|
||||
use crate::llm::types::{FunctionCall, Message, Role, ToolCall};
|
||||
|
||||
pub(super) fn parse_assistant_message(
|
||||
content: &[serde_json::Value],
|
||||
msg_tx: &std::sync::mpsc::Sender<Message>,
|
||||
) {
|
||||
let mut text = String::new();
|
||||
let mut tool_calls: Vec<ToolCall> = Vec::new();
|
||||
|
||||
for block in content {
|
||||
match block.get("type").and_then(|t| t.as_str()) {
|
||||
Some("text") => {
|
||||
if let Some(t) = block.get("text").and_then(|t| t.as_str()) {
|
||||
text.push_str(t);
|
||||
}
|
||||
}
|
||||
Some("tool_use") => {
|
||||
let id = block
|
||||
.get("id")
|
||||
.and_then(|v| v.as_str())
|
||||
.map(|s| s.to_string());
|
||||
let name = block
|
||||
.get("name")
|
||||
.and_then(|v| v.as_str())
|
||||
.unwrap_or("")
|
||||
.to_string();
|
||||
let input = block
|
||||
.get("input")
|
||||
.cloned()
|
||||
.unwrap_or(serde_json::Value::Object(serde_json::Map::new()));
|
||||
let arguments = serde_json::to_string(&input).unwrap_or_default();
|
||||
tool_calls.push(ToolCall {
|
||||
id,
|
||||
function: FunctionCall { name, arguments },
|
||||
kind: "function".to_string(),
|
||||
});
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
let msg = Message {
|
||||
role: Role::Assistant,
|
||||
content: text,
|
||||
tool_calls: if tool_calls.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(tool_calls)
|
||||
},
|
||||
tool_call_id: None,
|
||||
};
|
||||
let _ = msg_tx.send(msg);
|
||||
}
|
||||
|
||||
/// Parse a `user` message containing tool_result blocks.
|
||||
///
|
||||
/// Claude Code injects tool results into the conversation as `user` role
|
||||
|
||||
pub(super) fn parse_tool_results(content: &[serde_json::Value], msg_tx: &std::sync::mpsc::Sender<Message>) {
|
||||
for block in content {
|
||||
if block.get("type").and_then(|t| t.as_str()) != Some("tool_result") {
|
||||
continue;
|
||||
}
|
||||
|
||||
let tool_use_id = block
|
||||
.get("tool_use_id")
|
||||
.and_then(|v| v.as_str())
|
||||
.map(|s| s.to_string());
|
||||
|
||||
// `content` in a tool_result can be a plain string or an array of content blocks
|
||||
let content_str = match block.get("content") {
|
||||
Some(serde_json::Value::String(s)) => s.clone(),
|
||||
Some(serde_json::Value::Array(arr)) => {
|
||||
// Extract text from content block array
|
||||
arr.iter()
|
||||
.filter_map(|b| {
|
||||
if b.get("type").and_then(|t| t.as_str()) == Some("text") {
|
||||
b.get("text")
|
||||
.and_then(|t| t.as_str())
|
||||
.map(|s| s.to_string())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
.join("\n")
|
||||
}
|
||||
Some(other) => serde_json::to_string(other).unwrap_or_default(),
|
||||
None => String::new(),
|
||||
};
|
||||
|
||||
let _ = msg_tx.send(Message {
|
||||
role: Role::Tool,
|
||||
content: content_str,
|
||||
tool_calls: None,
|
||||
tool_call_id: tool_use_id,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/// Extract text from a stream event and send to the token channel for live display.
|
||||
///
|
||||
/// Stream events provide incremental text deltas for real-time rendering.
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use serde_json::json;
|
||||
|
||||
fn collect_messages(f: impl Fn(&std::sync::mpsc::Sender<Message>)) -> Vec<Message> {
|
||||
let (tx, rx) = std::sync::mpsc::channel();
|
||||
f(&tx);
|
||||
drop(tx);
|
||||
rx.try_iter().collect()
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_assistant_message_text_only() {
|
||||
let content = vec![json!({"type": "text", "text": "Hello, world!"})];
|
||||
let msgs = collect_messages(|tx| parse_assistant_message(&content, tx));
|
||||
assert_eq!(msgs.len(), 1);
|
||||
assert_eq!(msgs[0].role, Role::Assistant);
|
||||
assert_eq!(msgs[0].content, "Hello, world!");
|
||||
assert!(msgs[0].tool_calls.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_assistant_message_with_tool_use() {
|
||||
let content = vec![
|
||||
json!({"type": "text", "text": "I'll read that file."}),
|
||||
json!({
|
||||
"type": "tool_use",
|
||||
"id": "toolu_abc123",
|
||||
"name": "Read",
|
||||
"input": {"file_path": "/src/main.rs"}
|
||||
}),
|
||||
];
|
||||
let msgs = collect_messages(|tx| parse_assistant_message(&content, tx));
|
||||
assert_eq!(msgs.len(), 1);
|
||||
let msg = &msgs[0];
|
||||
assert_eq!(msg.role, Role::Assistant);
|
||||
assert_eq!(msg.content, "I'll read that file.");
|
||||
let tool_calls = msg.tool_calls.as_ref().expect("should have tool calls");
|
||||
assert_eq!(tool_calls.len(), 1);
|
||||
assert_eq!(tool_calls[0].id.as_deref(), Some("toolu_abc123"));
|
||||
assert_eq!(tool_calls[0].function.name, "Read");
|
||||
let args: serde_json::Value =
|
||||
serde_json::from_str(&tool_calls[0].function.arguments).unwrap();
|
||||
assert_eq!(args["file_path"], "/src/main.rs");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_assistant_message_multiple_tool_uses() {
|
||||
let content = vec![
|
||||
json!({"type": "tool_use", "id": "id1", "name": "Glob", "input": {"pattern": "*.rs"}}),
|
||||
json!({"type": "tool_use", "id": "id2", "name": "Bash", "input": {"command": "cargo test"}}),
|
||||
];
|
||||
let msgs = collect_messages(|tx| parse_assistant_message(&content, tx));
|
||||
assert_eq!(msgs.len(), 1);
|
||||
let tool_calls = msgs[0].tool_calls.as_ref().unwrap();
|
||||
assert_eq!(tool_calls.len(), 2);
|
||||
assert_eq!(tool_calls[0].function.name, "Glob");
|
||||
assert_eq!(tool_calls[1].function.name, "Bash");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_tool_results_string_content() {
|
||||
let content = vec![json!({
|
||||
"type": "tool_result",
|
||||
"tool_use_id": "toolu_abc123",
|
||||
"content": "fn main() { println!(\"hello\"); }"
|
||||
})];
|
||||
let msgs = collect_messages(|tx| parse_tool_results(&content, tx));
|
||||
assert_eq!(msgs.len(), 1);
|
||||
assert_eq!(msgs[0].role, Role::Tool);
|
||||
assert_eq!(msgs[0].content, "fn main() { println!(\"hello\"); }");
|
||||
assert_eq!(msgs[0].tool_call_id.as_deref(), Some("toolu_abc123"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_tool_results_array_content() {
|
||||
let content = vec![json!({
|
||||
"type": "tool_result",
|
||||
"tool_use_id": "toolu_xyz",
|
||||
"content": [
|
||||
{"type": "text", "text": "Line 1"},
|
||||
{"type": "text", "text": "Line 2"}
|
||||
]
|
||||
})];
|
||||
let msgs = collect_messages(|tx| parse_tool_results(&content, tx));
|
||||
assert_eq!(msgs.len(), 1);
|
||||
assert_eq!(msgs[0].content, "Line 1\nLine 2");
|
||||
assert_eq!(msgs[0].tool_call_id.as_deref(), Some("toolu_xyz"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_tool_results_multiple_results() {
|
||||
let content = vec![
|
||||
json!({"type": "tool_result", "tool_use_id": "id1", "content": "result1"}),
|
||||
json!({"type": "tool_result", "tool_use_id": "id2", "content": "result2"}),
|
||||
];
|
||||
let msgs = collect_messages(|tx| parse_tool_results(&content, tx));
|
||||
assert_eq!(msgs.len(), 2);
|
||||
assert_eq!(msgs[0].tool_call_id.as_deref(), Some("id1"));
|
||||
assert_eq!(msgs[1].tool_call_id.as_deref(), Some("id2"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_tool_results_ignores_non_tool_result_blocks() {
|
||||
let content = vec![
|
||||
json!({"type": "text", "text": "not a tool result"}),
|
||||
json!({"type": "tool_result", "tool_use_id": "id1", "content": "actual result"}),
|
||||
];
|
||||
let msgs = collect_messages(|tx| parse_tool_results(&content, tx));
|
||||
assert_eq!(msgs.len(), 1);
|
||||
assert_eq!(msgs[0].tool_call_id.as_deref(), Some("id1"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_assistant_message_empty_text_with_tool_use() {
|
||||
// When a message has only tool_use (no text), content should be empty string
|
||||
let content = vec![json!({
|
||||
"type": "tool_use",
|
||||
"id": "toolu_1",
|
||||
"name": "Write",
|
||||
"input": {"file_path": "foo.rs", "content": "fn foo() {}"}
|
||||
})];
|
||||
let msgs = collect_messages(|tx| parse_assistant_message(&content, tx));
|
||||
assert_eq!(msgs.len(), 1);
|
||||
assert_eq!(msgs[0].content, "");
|
||||
assert!(msgs[0].tool_calls.is_some());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_tool_results_no_tool_use_id() {
|
||||
let content = vec![json!({"type": "tool_result", "content": "output"})];
|
||||
let msgs = collect_messages(|tx| parse_tool_results(&content, tx));
|
||||
assert_eq!(msgs.len(), 1);
|
||||
assert!(msgs[0].tool_call_id.is_none());
|
||||
assert_eq!(msgs[0].content, "output");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_tool_results_null_content() {
|
||||
let content = vec![json!({"type": "tool_result", "tool_use_id": "id1"})];
|
||||
let msgs = collect_messages(|tx| parse_tool_results(&content, tx));
|
||||
assert_eq!(msgs.len(), 1);
|
||||
assert_eq!(msgs[0].content, "");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_tool_results_other_json_content() {
|
||||
let content = vec![json!({
|
||||
"type": "tool_result",
|
||||
"tool_use_id": "id1",
|
||||
"content": {"nested": "object"}
|
||||
})];
|
||||
let msgs = collect_messages(|tx| parse_tool_results(&content, tx));
|
||||
assert_eq!(msgs.len(), 1);
|
||||
// Falls through to serde_json::to_string
|
||||
assert!(!msgs[0].content.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_assistant_message_empty_content_array() {
|
||||
let content: Vec<serde_json::Value> = vec![];
|
||||
let msgs = collect_messages(|tx| parse_assistant_message(&content, tx));
|
||||
assert_eq!(msgs.len(), 1);
|
||||
assert_eq!(msgs[0].content, "");
|
||||
assert!(msgs[0].tool_calls.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_assistant_message_unknown_block_type() {
|
||||
let content = vec![
|
||||
json!({"type": "image", "source": {"type": "base64"}}),
|
||||
json!({"type": "text", "text": "done"}),
|
||||
];
|
||||
let msgs = collect_messages(|tx| parse_assistant_message(&content, tx));
|
||||
assert_eq!(msgs.len(), 1);
|
||||
assert_eq!(msgs[0].content, "done");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_assistant_message_tool_use_without_id() {
|
||||
let content = vec![json!({
|
||||
"type": "tool_use",
|
||||
"name": "Bash",
|
||||
"input": {"command": "ls"}
|
||||
})];
|
||||
let msgs = collect_messages(|tx| parse_assistant_message(&content, tx));
|
||||
assert_eq!(msgs.len(), 1);
|
||||
let calls = msgs[0].tool_calls.as_ref().unwrap();
|
||||
assert_eq!(calls.len(), 1);
|
||||
assert!(calls[0].id.is_none());
|
||||
assert_eq!(calls[0].function.name, "Bash");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_assistant_message_tool_use_without_input_defaults_to_empty_object() {
|
||||
let content = vec![json!({"type": "tool_use", "id": "tid", "name": "Read"})];
|
||||
let msgs = collect_messages(|tx| parse_assistant_message(&content, tx));
|
||||
let calls = msgs[0].tool_calls.as_ref().unwrap();
|
||||
let args: serde_json::Value = serde_json::from_str(&calls[0].function.arguments).unwrap();
|
||||
assert!(args.is_object());
|
||||
assert!(args.as_object().unwrap().is_empty());
|
||||
}
|
||||
|
||||
type Channels = (
|
||||
tokio::sync::mpsc::UnboundedSender<String>,
|
||||
tokio::sync::mpsc::UnboundedReceiver<String>,
|
||||
tokio::sync::mpsc::UnboundedSender<String>,
|
||||
tokio::sync::mpsc::UnboundedReceiver<String>,
|
||||
tokio::sync::mpsc::UnboundedSender<String>,
|
||||
tokio::sync::mpsc::UnboundedReceiver<String>,
|
||||
std::sync::mpsc::Sender<Message>,
|
||||
std::sync::mpsc::Receiver<Message>,
|
||||
);
|
||||
|
||||
fn make_channels() -> Channels {
|
||||
let (tok_tx, tok_rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
let (thi_tx, thi_rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
let (act_tx, act_rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
let (msg_tx, msg_rx) = std::sync::mpsc::channel();
|
||||
(
|
||||
tok_tx, tok_rx, thi_tx, thi_rx, act_tx, act_rx, msg_tx, msg_rx,
|
||||
)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user