diff --git a/frontend/src/api/client.ts b/frontend/src/api/client.ts index 5ae4dc4..f11c1d6 100644 --- a/frontend/src/api/client.ts +++ b/frontend/src/api/client.ts @@ -69,7 +69,9 @@ export type WsResponse = /** Heartbeat response confirming the connection is alive. */ | { type: "pong" } /** Sent on connect when the project still needs onboarding (specs are placeholders). */ - | { type: "onboarding_status"; needs_onboarding: boolean }; + | { type: "onboarding_status"; needs_onboarding: boolean } + /** Streaming thinking token from an extended-thinking block, separate from regular text. */ + | { type: "thinking_token"; content: string }; export interface ProviderConfig { provider: string; @@ -270,6 +272,7 @@ export class ChatWebSocket { private static refCount = 0; private socket?: WebSocket; private onToken?: (content: string) => void; + private onThinkingToken?: (content: string) => void; private onUpdate?: (messages: Message[]) => void; private onSessionId?: (sessionId: string) => void; private onError?: (message: string) => void; @@ -339,6 +342,8 @@ export class ChatWebSocket { try { const data = JSON.parse(event.data) as WsResponse; if (data.type === "token") this.onToken?.(data.content); + if (data.type === "thinking_token") + this.onThinkingToken?.(data.content); if (data.type === "update") this.onUpdate?.(data.messages); if (data.type === "session_id") this.onSessionId?.(data.session_id); if (data.type === "error") this.onError?.(data.message); @@ -401,6 +406,7 @@ export class ChatWebSocket { connect( handlers: { onToken?: (content: string) => void; + onThinkingToken?: (content: string) => void; onUpdate?: (messages: Message[]) => void; onSessionId?: (sessionId: string) => void; onError?: (message: string) => void; @@ -423,6 +429,7 @@ export class ChatWebSocket { wsPath = DEFAULT_WS_PATH, ) { this.onToken = handlers.onToken; + this.onThinkingToken = handlers.onThinkingToken; this.onUpdate = handlers.onUpdate; this.onSessionId = handlers.onSessionId; this.onError = handlers.onError; diff --git a/frontend/src/components/Chat.tsx b/frontend/src/components/Chat.tsx index 1aca024..99f1c03 100644 --- a/frontend/src/components/Chat.tsx +++ b/frontend/src/components/Chat.tsx @@ -13,6 +13,56 @@ import { StagePanel } from "./StagePanel"; const { useCallback, useEffect, useRef, useState } = React; +/** Fixed-height thinking trace block that auto-scrolls to bottom as text arrives. */ +function ThinkingBlock({ text }: { text: string }) { + const scrollRef = useRef(null); + + useEffect(() => { + const el = scrollRef.current; + if (el) { + el.scrollTop = el.scrollHeight; + } + }, [text]); + + return ( +
+ + thinking + + {text} +
+ ); +} + const NARROW_BREAKPOINT = 900; function formatToolActivity(toolName: string): string { @@ -64,6 +114,7 @@ export function Chat({ projectPath, onCloseProject }: ChatProps) { const [availableModels, setAvailableModels] = useState([]); const [claudeModels, setClaudeModels] = useState([]); const [streamingContent, setStreamingContent] = useState(""); + const [streamingThinking, setStreamingThinking] = useState(""); const [showApiKeyDialog, setShowApiKeyDialog] = useState(false); const [apiKeyInput, setApiKeyInput] = useState(""); const [hasAnthropicKey, setHasAnthropicKey] = useState(false); @@ -208,9 +259,13 @@ export function Chat({ projectPath, onCloseProject }: ChatProps) { onToken: (content) => { setStreamingContent((prev: string) => prev + content); }, + onThinkingToken: (content) => { + setStreamingThinking((prev: string) => prev + content); + }, onUpdate: (history) => { setMessages(history); setStreamingContent(""); + setStreamingThinking(""); const last = history[history.length - 1]; if (last?.role === "assistant" && !last.tool_calls) { setLoading(false); @@ -303,7 +358,8 @@ export function Chat({ projectPath, onCloseProject }: ChatProps) { lastScrollTopRef.current = currentScrollTop; }; - const autoScrollKey = messages.length + streamingContent.length; + const autoScrollKey = + messages.length + streamingContent.length + streamingThinking.length; useEffect(() => { if ( @@ -351,6 +407,7 @@ export function Chat({ projectPath, onCloseProject }: ChatProps) { setStreamingContent(""); } + setStreamingThinking(""); setLoading(false); setActivityStatus(null); } catch (e) { @@ -395,6 +452,7 @@ export function Chat({ projectPath, onCloseProject }: ChatProps) { } setLoading(true); setStreamingContent(""); + setStreamingThinking(""); setActivityStatus(null); try { @@ -471,6 +529,7 @@ export function Chat({ projectPath, onCloseProject }: ChatProps) { clearMessages(); setStreamingContent(""); + setStreamingThinking(""); setLoading(false); setActivityStatus(null); setClaudeSessionId(null); @@ -761,6 +820,9 @@ export function Chat({ projectPath, onCloseProject }: ChatProps) { ))} + {loading && streamingThinking && ( + + )} {loading && streamingContent && (
)} - {loading && (activityStatus != null || !streamingContent) && ( -
- - {activityStatus ?? "Thinking..."} - -
- )} + {loading && + (activityStatus != null || + (!streamingContent && !streamingThinking)) && ( +
+ + {activityStatus ?? "Thinking..."} + +
+ )}
@@ -1075,7 +1139,7 @@ export function Chat({ projectPath, onCloseProject }: ChatProps) { stateVersion={agentStateVersion} /> - + diff --git a/server/src/http/ws.rs b/server/src/http/ws.rs index 57cd5cb..755fb09 100644 --- a/server/src/http/ws.rs +++ b/server/src/http/ws.rs @@ -103,6 +103,12 @@ enum WsResponse { /// Heartbeat response to a client `Ping`. Lets the client confirm the /// connection is alive and cancel any stale-connection timeout. Pong, + /// Streaming thinking token from an extended-thinking block. + /// Sent separately from `Token` so the frontend can render them in + /// a constrained, scrollable ThinkingBlock rather than inline. + ThinkingToken { + content: String, + }, /// Sent on connect when the project's spec files still contain scaffold /// placeholder content and the user needs to go through onboarding. OnboardingStatus { @@ -257,6 +263,7 @@ pub async fn ws_handler(ws: WebSocket, ctx: Data<&Arc>) -> impl poem Ok(WsRequest::Chat { messages, config }) => { let tx_updates = tx.clone(); let tx_tokens = tx.clone(); + let tx_thinking = tx.clone(); let tx_activity = tx.clone(); let ctx_clone = ctx.clone(); @@ -277,6 +284,11 @@ pub async fn ws_handler(ws: WebSocket, ctx: Data<&Arc>) -> impl poem content: token.to_string(), }); }, + move |thinking: &str| { + let _ = tx_thinking.send(WsResponse::ThinkingToken { + content: thinking.to_string(), + }); + }, move |tool_name: &str| { let _ = tx_activity.send(WsResponse::ToolActivity { tool_name: tool_name.to_string(), @@ -595,6 +607,16 @@ mod tests { assert_eq!(json["type"], "pong"); } + #[test] + fn serialize_thinking_token_response() { + let resp = WsResponse::ThinkingToken { + content: "I need to think about this...".to_string(), + }; + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "thinking_token"); + assert_eq!(json["content"], "I need to think about this..."); + } + #[test] fn serialize_onboarding_status_true() { let resp = WsResponse::OnboardingStatus { diff --git a/server/src/llm/chat.rs b/server/src/llm/chat.rs index dc1215e..ec9e4a7 100644 --- a/server/src/llm/chat.rs +++ b/server/src/llm/chat.rs @@ -180,18 +180,20 @@ pub fn set_anthropic_api_key(store: &dyn StoreOps, api_key: String) -> Result<() } #[allow(clippy::too_many_arguments)] -pub async fn chat( +pub async fn chat( messages: Vec, config: ProviderConfig, state: &SessionState, store: &dyn StoreOps, mut on_update: F, mut on_token: U, + mut on_thinking: T, mut on_activity: A, ) -> Result where F: FnMut(&[Message]) + Send, U: FnMut(&str) + Send, + T: FnMut(&str) + Send, A: FnMut(&str) + Send, { use crate::llm::providers::anthropic::AnthropicProvider; @@ -244,6 +246,7 @@ where config.session_id.as_deref(), &mut cancel_rx, |token| on_token(token), + |thinking| on_thinking(thinking), |tool_name| on_activity(tool_name), ) .await @@ -799,6 +802,7 @@ mod tests { |_| {}, |_| {}, |_| {}, + |_| {}, ) .await; @@ -840,6 +844,7 @@ mod tests { |_| {}, |_| {}, |_| {}, + |_| {}, ) .await; @@ -879,6 +884,7 @@ mod tests { |_| {}, |_| {}, |_| {}, + |_| {}, ) .await; diff --git a/server/src/llm/providers/claude_code.rs b/server/src/llm/providers/claude_code.rs index c075c92..b3fd1e4 100644 --- a/server/src/llm/providers/claude_code.rs +++ b/server/src/llm/providers/claude_code.rs @@ -37,17 +37,19 @@ impl ClaudeCodeProvider { } #[allow(clippy::too_many_arguments)] - pub async fn chat_stream( + pub async fn chat_stream( &self, user_message: &str, project_root: &str, session_id: Option<&str>, cancel_rx: &mut watch::Receiver, mut on_token: F, + mut on_thinking: T, mut on_activity: A, ) -> Result where F: FnMut(&str) + Send, + T: FnMut(&str) + Send, A: FnMut(&str) + Send, { let message = user_message.to_string(); @@ -67,6 +69,7 @@ impl ClaudeCodeProvider { }); let (token_tx, mut token_rx) = tokio::sync::mpsc::unbounded_channel::(); + let (thinking_tx, mut thinking_rx) = tokio::sync::mpsc::unbounded_channel::(); let (activity_tx, mut activity_rx) = tokio::sync::mpsc::unbounded_channel::(); let (msg_tx, msg_rx) = std::sync::mpsc::channel::(); let (sid_tx, sid_rx) = tokio::sync::oneshot::channel::(); @@ -78,6 +81,7 @@ impl ClaudeCodeProvider { resume_id.as_deref(), cancelled, token_tx, + thinking_tx, activity_tx, msg_tx, sid_tx, @@ -90,12 +94,20 @@ impl ClaudeCodeProvider { Some(t) => on_token(&t), None => break, }, + msg = thinking_rx.recv() => if let Some(t) = msg { + on_thinking(&t); + }, msg = activity_rx.recv() => if let Some(name) = msg { on_activity(&name); }, } } + // Drain any remaining activity/thinking messages that were buffered when + // the token channel closed. + while let Ok(t) = thinking_rx.try_recv() { + on_thinking(&t); + } // Drain any remaining activity messages that were buffered when the // token channel closed. The select! loop breaks on token_rx → None, // but activity_rx may still hold signals sent in the same instant. @@ -136,6 +148,7 @@ fn run_pty_session( resume_session_id: Option<&str>, cancelled: Arc, token_tx: tokio::sync::mpsc::UnboundedSender, + thinking_tx: tokio::sync::mpsc::UnboundedSender, activity_tx: tokio::sync::mpsc::UnboundedSender, msg_tx: std::sync::mpsc::Sender, sid_tx: tokio::sync::oneshot::Sender, @@ -254,7 +267,7 @@ fn run_pty_session( // Try to parse as JSON if let Ok(json) = serde_json::from_str::(trimmed) - && process_json_event(&json, &token_tx, &activity_tx, &msg_tx, &mut sid_tx) + && process_json_event(&json, &token_tx, &thinking_tx, &activity_tx, &msg_tx, &mut sid_tx) { got_result = true; } @@ -276,6 +289,7 @@ fn run_pty_session( process_json_event( &json, &token_tx, + &thinking_tx, &activity_tx, &msg_tx, &mut sid_tx, @@ -319,6 +333,7 @@ fn run_pty_session( fn process_json_event( json: &serde_json::Value, token_tx: &tokio::sync::mpsc::UnboundedSender, + thinking_tx: &tokio::sync::mpsc::UnboundedSender, activity_tx: &tokio::sync::mpsc::UnboundedSender, msg_tx: &std::sync::mpsc::Sender, sid_tx: &mut Option>, @@ -340,7 +355,7 @@ fn process_json_event( match event_type { "stream_event" => { if let Some(event) = json.get("event") { - handle_stream_event(event, token_tx, activity_tx); + handle_stream_event(event, token_tx, thinking_tx, activity_tx); } false } @@ -489,6 +504,7 @@ fn parse_tool_results( fn handle_stream_event( event: &serde_json::Value, token_tx: &tokio::sync::mpsc::UnboundedSender, + thinking_tx: &tokio::sync::mpsc::UnboundedSender, activity_tx: &tokio::sync::mpsc::UnboundedSender, ) { let event_type = event.get("type").and_then(|t| t.as_str()).unwrap_or(""); @@ -516,7 +532,7 @@ fn handle_stream_event( if let Some(thinking) = delta.get("thinking").and_then(|t| t.as_str()) { - let _ = token_tx.send(format!("[thinking] {thinking}")); + let _ = thinking_tx.send(thinking.to_string()); } } _ => {} @@ -671,12 +687,13 @@ mod tests { #[test] fn handle_stream_event_text_delta_sends_token() { let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + let (ttx, _trx) = tokio::sync::mpsc::unbounded_channel::(); let (atx, _arx) = tokio::sync::mpsc::unbounded_channel::(); let event = json!({ "type": "content_block_delta", "delta": {"type": "text_delta", "text": "hello "} }); - handle_stream_event(&event, &tx, &atx); + handle_stream_event(&event, &tx, &ttx, &atx); drop(tx); let tokens: Vec<_> = { let mut v = vec![]; @@ -692,12 +709,13 @@ mod tests { fn handle_stream_event_input_json_delta_not_sent() { // Tool argument JSON deltas should NOT be sent as text tokens let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + let (ttx, _trx) = tokio::sync::mpsc::unbounded_channel::(); let (atx, _arx) = tokio::sync::mpsc::unbounded_channel::(); let event = json!({ "type": "content_block_delta", "delta": {"type": "input_json_delta", "partial_json": "{\"path\":"} }); - handle_stream_event(&event, &tx, &atx); + handle_stream_event(&event, &tx, &ttx, &atx); drop(tx); let tokens: Vec = { let mut v = vec![]; @@ -710,15 +728,18 @@ mod tests { } #[test] - fn handle_stream_event_thinking_delta_sends_prefixed_token() { + fn handle_stream_event_thinking_delta_routes_to_thinking_channel() { let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); - let (atx, _arx) = tokio::sync::mpsc::unbounded_channel(); + let (ttx, mut trx) = tokio::sync::mpsc::unbounded_channel::(); + let (atx, _arx) = tokio::sync::mpsc::unbounded_channel::(); let event = json!({ "type": "content_block_delta", "delta": {"type": "thinking_delta", "thinking": "I should check the file"} }); - handle_stream_event(&event, &tx, &atx); + handle_stream_event(&event, &tx, &ttx, &atx); drop(tx); + drop(ttx); + // thinking token must NOT appear in the regular token channel let tokens: Vec = { let mut v = vec![]; while let Ok(t) = rx.try_recv() { @@ -726,18 +747,28 @@ mod tests { } v }; - assert_eq!(tokens, vec!["[thinking] I should check the file"]); + assert!(tokens.is_empty(), "thinking token leaked into token channel"); + // thinking token must appear in the dedicated thinking channel, without prefix + let thinking: Vec = { + let mut v = vec![]; + while let Ok(t) = trx.try_recv() { + v.push(t); + } + v + }; + assert_eq!(thinking, vec!["I should check the file"]); } #[test] fn handle_stream_event_error_sends_error_token() { let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); - let (atx, _arx) = tokio::sync::mpsc::unbounded_channel(); + let (ttx, _trx) = tokio::sync::mpsc::unbounded_channel::(); + let (atx, _arx) = tokio::sync::mpsc::unbounded_channel::(); let event = json!({ "type": "error", "error": {"type": "overloaded_error", "message": "Overloaded"} }); - handle_stream_event(&event, &tx, &atx); + handle_stream_event(&event, &tx, &ttx, &atx); drop(tx); let tokens: Vec = { let mut v = vec![]; @@ -752,9 +783,10 @@ mod tests { #[test] fn handle_stream_event_unknown_type_is_noop() { let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); - let (atx, _arx) = tokio::sync::mpsc::unbounded_channel(); + let (ttx, _trx) = tokio::sync::mpsc::unbounded_channel::(); + let (atx, _arx) = tokio::sync::mpsc::unbounded_channel::(); let event = json!({"type": "ping"}); - handle_stream_event(&event, &tx, &atx); + handle_stream_event(&event, &tx, &ttx, &atx); drop(tx); let tokens: Vec = { let mut v = vec![]; @@ -846,65 +878,68 @@ mod tests { tokio::sync::mpsc::UnboundedReceiver, tokio::sync::mpsc::UnboundedSender, tokio::sync::mpsc::UnboundedReceiver, + tokio::sync::mpsc::UnboundedSender, + tokio::sync::mpsc::UnboundedReceiver, std::sync::mpsc::Sender, std::sync::mpsc::Receiver, ); fn make_channels() -> Channels { let (tok_tx, tok_rx) = tokio::sync::mpsc::unbounded_channel(); + let (thi_tx, thi_rx) = tokio::sync::mpsc::unbounded_channel(); let (act_tx, act_rx) = tokio::sync::mpsc::unbounded_channel(); let (msg_tx, msg_rx) = std::sync::mpsc::channel(); - (tok_tx, tok_rx, act_tx, act_rx, msg_tx, msg_rx) + (tok_tx, tok_rx, thi_tx, thi_rx, act_tx, act_rx, msg_tx, msg_rx) } #[test] fn process_json_event_result_returns_true() { - let (tok_tx, _tok_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels(); + let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels(); let (sid_tx, _sid_rx) = tokio::sync::oneshot::channel::(); let mut sid_tx_opt = Some(sid_tx); let json = json!({"type": "result", "subtype": "success"}); - assert!(process_json_event(&json, &tok_tx, &act_tx, &msg_tx, &mut sid_tx_opt)); + assert!(process_json_event(&json, &tok_tx, &thi_tx, &act_tx, &msg_tx, &mut sid_tx_opt)); } #[test] fn process_json_event_system_returns_false() { - let (tok_tx, _tok_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels(); + let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels(); let mut sid_tx = None::>; let json = json!({"type": "system", "subtype": "init", "apiKeySource": "env"}); - assert!(!process_json_event(&json, &tok_tx, &act_tx, &msg_tx, &mut sid_tx)); + assert!(!process_json_event(&json, &tok_tx, &thi_tx, &act_tx, &msg_tx, &mut sid_tx)); } #[test] fn process_json_event_rate_limit_returns_false() { - let (tok_tx, _tok_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels(); + let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels(); let mut sid_tx = None::>; let json = json!({"type": "rate_limit_event"}); - assert!(!process_json_event(&json, &tok_tx, &act_tx, &msg_tx, &mut sid_tx)); + assert!(!process_json_event(&json, &tok_tx, &thi_tx, &act_tx, &msg_tx, &mut sid_tx)); } #[test] fn process_json_event_unknown_type_returns_false() { - let (tok_tx, _tok_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels(); + let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels(); let mut sid_tx = None::>; let json = json!({"type": "some_future_event"}); - assert!(!process_json_event(&json, &tok_tx, &act_tx, &msg_tx, &mut sid_tx)); + assert!(!process_json_event(&json, &tok_tx, &thi_tx, &act_tx, &msg_tx, &mut sid_tx)); } #[test] fn process_json_event_no_type_returns_false() { - let (tok_tx, _tok_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels(); + let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels(); let mut sid_tx = None::>; let json = json!({"content": "no type field"}); - assert!(!process_json_event(&json, &tok_tx, &act_tx, &msg_tx, &mut sid_tx)); + assert!(!process_json_event(&json, &tok_tx, &thi_tx, &act_tx, &msg_tx, &mut sid_tx)); } #[test] fn process_json_event_captures_session_id() { - let (tok_tx, _tok_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels(); + let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels(); let (sid_tx, mut sid_rx) = tokio::sync::oneshot::channel::(); let mut sid_tx_opt = Some(sid_tx); let json = json!({"type": "system", "session_id": "sess-abc-123"}); - process_json_event(&json, &tok_tx, &act_tx, &msg_tx, &mut sid_tx_opt); + process_json_event(&json, &tok_tx, &thi_tx, &act_tx, &msg_tx, &mut sid_tx_opt); // sid_tx should have been consumed assert!(sid_tx_opt.is_none()); let received = sid_rx.try_recv().unwrap(); @@ -913,18 +948,18 @@ mod tests { #[test] fn process_json_event_preserves_sid_tx_if_no_session_id() { - let (tok_tx, _tok_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels(); + let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels(); let (sid_tx, _sid_rx) = tokio::sync::oneshot::channel::(); let mut sid_tx_opt = Some(sid_tx); let json = json!({"type": "system"}); - process_json_event(&json, &tok_tx, &act_tx, &msg_tx, &mut sid_tx_opt); + process_json_event(&json, &tok_tx, &thi_tx, &act_tx, &msg_tx, &mut sid_tx_opt); // sid_tx should still be present since no session_id in event assert!(sid_tx_opt.is_some()); } #[test] fn process_json_event_stream_event_forwards_token() { - let (tok_tx, mut tok_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels(); + let (tok_tx, mut tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels(); let mut sid_tx = None::>; let json = json!({ "type": "stream_event", @@ -934,7 +969,7 @@ mod tests { "delta": {"type": "text_delta", "text": "word"} } }); - assert!(!process_json_event(&json, &tok_tx, &act_tx, &msg_tx, &mut sid_tx)); + assert!(!process_json_event(&json, &tok_tx, &thi_tx, &act_tx, &msg_tx, &mut sid_tx)); drop(tok_tx); let tokens: Vec = { let mut v = vec![]; @@ -950,7 +985,7 @@ mod tests { fn process_json_event_stream_event_tool_use_fires_activity() { // This is the primary activity path: stream_event wrapping content_block_start // with a tool_use block. Requires --include-partial-messages to be enabled. - let (tok_tx, _tok_rx, act_tx, mut act_rx, msg_tx, _msg_rx) = make_channels(); + let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, mut act_rx, msg_tx, _msg_rx) = make_channels(); let mut sid_tx = None::>; let json = json!({ "type": "stream_event", @@ -961,7 +996,7 @@ mod tests { "content_block": {"type": "tool_use", "id": "toolu_abc", "name": "Bash", "input": {}} } }); - assert!(!process_json_event(&json, &tok_tx, &act_tx, &msg_tx, &mut sid_tx)); + assert!(!process_json_event(&json, &tok_tx, &thi_tx, &act_tx, &msg_tx, &mut sid_tx)); drop(act_tx); let activities: Vec = { let mut v = vec![]; @@ -975,7 +1010,7 @@ mod tests { #[test] fn process_json_event_assistant_with_tool_use_fires_activity() { - let (tok_tx, _tok_rx, act_tx, mut act_rx, msg_tx, _msg_rx) = make_channels(); + let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, mut act_rx, msg_tx, _msg_rx) = make_channels(); let mut sid_tx = None::>; let json = json!({ "type": "assistant", @@ -986,7 +1021,7 @@ mod tests { ] } }); - assert!(!process_json_event(&json, &tok_tx, &act_tx, &msg_tx, &mut sid_tx)); + assert!(!process_json_event(&json, &tok_tx, &thi_tx, &act_tx, &msg_tx, &mut sid_tx)); drop(act_tx); let activities: Vec = { let mut v = vec![]; @@ -1000,7 +1035,7 @@ mod tests { #[test] fn process_json_event_assistant_with_multiple_tool_uses_fires_all_activities() { - let (tok_tx, _tok_rx, act_tx, mut act_rx, msg_tx, _msg_rx) = make_channels(); + let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, mut act_rx, msg_tx, _msg_rx) = make_channels(); let mut sid_tx = None::>; let json = json!({ "type": "assistant", @@ -1011,7 +1046,7 @@ mod tests { ] } }); - assert!(!process_json_event(&json, &tok_tx, &act_tx, &msg_tx, &mut sid_tx)); + assert!(!process_json_event(&json, &tok_tx, &thi_tx, &act_tx, &msg_tx, &mut sid_tx)); drop(act_tx); let activities: Vec = { let mut v = vec![]; @@ -1025,7 +1060,7 @@ mod tests { #[test] fn process_json_event_assistant_text_only_no_activity() { - let (tok_tx, _tok_rx, act_tx, mut act_rx, msg_tx, _msg_rx) = make_channels(); + let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, mut act_rx, msg_tx, _msg_rx) = make_channels(); let mut sid_tx = None::>; let json = json!({ "type": "assistant", @@ -1033,7 +1068,7 @@ mod tests { "content": [{"type": "text", "text": "Just text, no tools."}] } }); - assert!(!process_json_event(&json, &tok_tx, &act_tx, &msg_tx, &mut sid_tx)); + assert!(!process_json_event(&json, &tok_tx, &thi_tx, &act_tx, &msg_tx, &mut sid_tx)); drop(act_tx); let activities: Vec = { let mut v = vec![]; @@ -1047,7 +1082,7 @@ mod tests { #[test] fn process_json_event_assistant_event_parses_message() { - let (tok_tx, _tok_rx, act_tx, _act_rx, msg_tx, msg_rx) = make_channels(); + let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, msg_rx) = make_channels(); let mut sid_tx = None::>; let json = json!({ "type": "assistant", @@ -1055,7 +1090,7 @@ mod tests { "content": [{"type": "text", "text": "Hi!"}] } }); - assert!(!process_json_event(&json, &tok_tx, &act_tx, &msg_tx, &mut sid_tx)); + assert!(!process_json_event(&json, &tok_tx, &thi_tx, &act_tx, &msg_tx, &mut sid_tx)); drop(msg_tx); let msgs: Vec = msg_rx.try_iter().collect(); assert_eq!(msgs.len(), 1); @@ -1064,7 +1099,7 @@ mod tests { #[test] fn process_json_event_user_event_parses_tool_results() { - let (tok_tx, _tok_rx, act_tx, _act_rx, msg_tx, msg_rx) = make_channels(); + let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, msg_rx) = make_channels(); let mut sid_tx = None::>; let json = json!({ "type": "user", @@ -1072,7 +1107,7 @@ mod tests { "content": [{"type": "tool_result", "tool_use_id": "tid1", "content": "done"}] } }); - assert!(!process_json_event(&json, &tok_tx, &act_tx, &msg_tx, &mut sid_tx)); + assert!(!process_json_event(&json, &tok_tx, &thi_tx, &act_tx, &msg_tx, &mut sid_tx)); drop(msg_tx); let msgs: Vec = msg_rx.try_iter().collect(); assert_eq!(msgs.len(), 1); @@ -1082,13 +1117,13 @@ mod tests { #[test] fn process_json_event_assistant_without_content_array_is_noop() { - let (tok_tx, _tok_rx, act_tx, _act_rx, msg_tx, msg_rx) = make_channels(); + let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, msg_rx) = make_channels(); let mut sid_tx = None::>; let json = json!({ "type": "assistant", "message": {"content": "not an array"} }); - assert!(!process_json_event(&json, &tok_tx, &act_tx, &msg_tx, &mut sid_tx)); + assert!(!process_json_event(&json, &tok_tx, &thi_tx, &act_tx, &msg_tx, &mut sid_tx)); drop(msg_tx); let msgs: Vec = msg_rx.try_iter().collect(); assert!(msgs.is_empty()); @@ -1096,10 +1131,10 @@ mod tests { #[test] fn process_json_event_user_without_content_array_is_noop() { - let (tok_tx, _tok_rx, act_tx, _act_rx, msg_tx, msg_rx) = make_channels(); + let (tok_tx, _tok_rx, thi_tx, _thi_rx, act_tx, _act_rx, msg_tx, msg_rx) = make_channels(); let mut sid_tx = None::>; let json = json!({"type": "user", "message": {"content": null}}); - assert!(!process_json_event(&json, &tok_tx, &act_tx, &msg_tx, &mut sid_tx)); + assert!(!process_json_event(&json, &tok_tx, &thi_tx, &act_tx, &msg_tx, &mut sid_tx)); drop(msg_tx); let msgs: Vec = msg_rx.try_iter().collect(); assert!(msgs.is_empty()); @@ -1113,13 +1148,14 @@ mod tests { #[test] fn handle_stream_event_tool_use_start_sends_activity() { let (tx, _rx) = tokio::sync::mpsc::unbounded_channel::(); + let (ttx, _trx) = tokio::sync::mpsc::unbounded_channel::(); let (atx, mut arx) = tokio::sync::mpsc::unbounded_channel::(); let event = json!({ "type": "content_block_start", "index": 1, "content_block": {"type": "tool_use", "id": "toolu_1", "name": "Read", "input": {}} }); - handle_stream_event(&event, &tx, &atx); + handle_stream_event(&event, &tx, &ttx, &atx); drop(atx); let activities: Vec = { let mut v = vec![]; @@ -1134,13 +1170,14 @@ mod tests { #[test] fn handle_stream_event_text_block_start_no_activity() { let (tx, _rx) = tokio::sync::mpsc::unbounded_channel::(); + let (ttx, _trx) = tokio::sync::mpsc::unbounded_channel::(); let (atx, mut arx) = tokio::sync::mpsc::unbounded_channel::(); let event = json!({ "type": "content_block_start", "index": 0, "content_block": {"type": "text", "text": ""} }); - handle_stream_event(&event, &tx, &atx); + handle_stream_event(&event, &tx, &ttx, &atx); drop(atx); let activities: Vec = { let mut v = vec![];