From ef162d91ffd6c552a8cef85472ea0e329128015b Mon Sep 17 00:00:00 2001 From: Dave Date: Tue, 24 Feb 2026 12:08:59 +0000 Subject: [PATCH] story-kit: merge 86_story_show_live_activity_status_instead_of_static_thinking_indicator_in_chat --- frontend/src/api/client.ts | 1 + server/src/http/ws.rs | 1 + server/src/llm/chat.rs | 8 + server/src/llm/providers/claude_code.rs | 197 +++++++++++++++++++++++- 4 files changed, 202 insertions(+), 5 deletions(-) diff --git a/frontend/src/api/client.ts b/frontend/src/api/client.ts index f4ca2b1..daabdfe 100644 --- a/frontend/src/api/client.ts +++ b/frontend/src/api/client.ts @@ -60,6 +60,7 @@ export type WsResponse = } /** `.story_kit/project.toml` was modified; re-fetch the agent roster. */ | { type: "agent_config_changed" }; + | { type: "tool_activity"; tool_name: string }; export interface ProviderConfig { provider: string; diff --git a/server/src/http/ws.rs b/server/src/http/ws.rs index 124a2c3..7a9645a 100644 --- a/server/src/http/ws.rs +++ b/server/src/http/ws.rs @@ -247,6 +247,7 @@ pub async fn ws_handler(ws: WebSocket, ctx: Data<&Arc>) -> impl poem tool_name: tool_name.to_string(), }); }, + Some(perm_tx), ); tokio::pin!(chat_fut); diff --git a/server/src/llm/chat.rs b/server/src/llm/chat.rs index c951fff..bef08a9 100644 --- a/server/src/llm/chat.rs +++ b/server/src/llm/chat.rs @@ -178,6 +178,7 @@ pub fn set_anthropic_api_key(store: &dyn StoreOps, api_key: String) -> Result<() set_anthropic_api_key_impl(store, &api_key) } +#[allow(clippy::too_many_arguments)] pub async fn chat( messages: Vec, config: ProviderConfig, @@ -186,6 +187,11 @@ pub async fn chat( mut on_update: F, mut on_token: U, mut on_activity: A, + permission_tx: Option< + tokio::sync::mpsc::UnboundedSender< + crate::llm::providers::claude_code::PermissionReqMsg, + >, + >, ) -> Result where F: FnMut(&[Message]) + Send, @@ -242,6 +248,8 @@ where config.session_id.as_deref(), &mut cancel_rx, |token| on_token(token), + |tool_name| on_activity(tool_name), + permission_tx, ) .await .map_err(|e| format!("Claude Code Error: {e}"))?; diff --git a/server/src/llm/providers/claude_code.rs b/server/src/llm/providers/claude_code.rs index ea8f4fe..3ca8bfc 100644 --- a/server/src/llm/providers/claude_code.rs +++ b/server/src/llm/providers/claude_code.rs @@ -36,16 +36,20 @@ impl ClaudeCodeProvider { Self } - pub async fn chat_stream( + #[allow(clippy::too_many_arguments)] + pub async fn chat_stream( &self, user_message: &str, project_root: &str, session_id: Option<&str>, cancel_rx: &mut watch::Receiver, mut on_token: F, + mut on_activity: A, + permission_tx: Option>, ) -> Result where F: FnMut(&str) + Send, + A: FnMut(&str) + Send, { let message = user_message.to_string(); let cwd = project_root.to_string(); @@ -64,6 +68,7 @@ impl ClaudeCodeProvider { }); let (token_tx, mut token_rx) = tokio::sync::mpsc::unbounded_channel::(); + let (activity_tx, mut activity_rx) = tokio::sync::mpsc::unbounded_channel::(); let (msg_tx, msg_rx) = std::sync::mpsc::channel::(); let (sid_tx, sid_rx) = tokio::sync::oneshot::channel::(); @@ -74,13 +79,22 @@ impl ClaudeCodeProvider { resume_id.as_deref(), cancelled, token_tx, + activity_tx, msg_tx, sid_tx, ) }); - while let Some(token) = token_rx.recv().await { - on_token(&token); + loop { + tokio::select! { + msg = token_rx.recv() => match msg { + Some(t) => on_token(&t), + None => break, + }, + msg = activity_rx.recv() => if let Some(name) = msg { + on_activity(&name); + }, + } } pty_handle @@ -115,6 +129,7 @@ fn run_pty_session( resume_session_id: Option<&str>, cancelled: Arc, token_tx: tokio::sync::mpsc::UnboundedSender, + activity_tx: tokio::sync::mpsc::UnboundedSender, msg_tx: std::sync::mpsc::Sender, sid_tx: tokio::sync::oneshot::Sender, ) -> Result<(), String> { @@ -229,6 +244,98 @@ fn run_pty_session( && process_json_event(&json, &token_tx, &msg_tx, &mut sid_tx) { got_result = true; + // Capture session_id from any event that has it + if let Some(tx) = sid_tx.take() { + if let Some(sid) = json.get("session_id").and_then(|s| s.as_str()) { + let _ = tx.send(sid.to_string()); + } else { + // Put it back if this event didn't have a session_id + sid_tx = Some(tx); + } + } + + match event_type { + // Streaming deltas — used for real-time text display only + "stream_event" => { + if let Some(event) = json.get("event") { + handle_stream_event(event, &token_tx, &activity_tx); + } + } + // Complete assistant message — extract text and tool_use blocks + "assistant" => { + if let Some(message) = json.get("message") + && let Some(content) = + message.get("content").and_then(|c| c.as_array()) + { + parse_assistant_message(content, &msg_tx); + } + } + // User message containing tool results from Claude Code's own execution + "user" => { + if let Some(message) = json.get("message") + && let Some(content) = + message.get("content").and_then(|c| c.as_array()) + { + parse_tool_results(content, &msg_tx); + } + } + // Final result with usage stats + "result" => { + got_result = true; + } + // System init — suppress noisy model/apiKey notification + "system" => {} + // Rate limit info — suppress noisy notification + "rate_limit_event" => {} + // Claude Code is requesting user approval before executing a tool. + // Forward the request to the async context via permission_tx and + // block until the user responds (or a 5-minute timeout elapses). + "permission_request" => { + let request_id = json + .get("id") + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + let tool_name = json + .get("tool_name") + .and_then(|v| v.as_str()) + .unwrap_or("unknown") + .to_string(); + let tool_input = json + .get("input") + .cloned() + .unwrap_or(serde_json::Value::Object(serde_json::Map::new())); + + if let Some(ref ptx) = permission_tx { + let (resp_tx, resp_rx) = std::sync::mpsc::sync_channel(1); + let _ = ptx.send(PermissionReqMsg { + request_id: request_id.clone(), + tool_name, + tool_input, + response_tx: resp_tx, + }); + // Block until the user responds or a 5-minute timeout elapses. + let approved = resp_rx + .recv_timeout(std::time::Duration::from_secs(300)) + .unwrap_or(false); + let response = serde_json::json!({ + "type": "permission_response", + "id": request_id, + "approved": approved, + }); + let _ = writeln!(pty_writer, "{}", response); + } else { + // No handler configured — deny by default. + let response = serde_json::json!({ + "type": "permission_response", + "id": request_id, + "approved": false, + }); + let _ = writeln!(pty_writer, "{}", response); + } + } + _ => {} + } } // Ignore non-JSON lines (terminal escape sequences) @@ -245,6 +352,36 @@ fn run_pty_session( let trimmed = line.trim(); if let Ok(json) = serde_json::from_str::(trimmed) { process_json_event(&json, &token_tx, &msg_tx, &mut sid_tx); + if let Ok(json) = serde_json::from_str::(trimmed) + && let Some(event_type) = + json.get("type").and_then(|t| t.as_str()) + { + match event_type { + "stream_event" => { + if let Some(event) = json.get("event") { + handle_stream_event(event, &token_tx, &activity_tx); + } + } + "assistant" => { + if let Some(message) = json.get("message") + && let Some(content) = message + .get("content") + .and_then(|c| c.as_array()) + { + parse_assistant_message(content, &msg_tx); + } + } + "user" => { + if let Some(message) = json.get("message") + && let Some(content) = message + .get("content") + .and_then(|c| c.as_array()) + { + parse_tool_results(content, &msg_tx); + } + } + _ => {} + } } } break; @@ -442,10 +579,19 @@ fn parse_tool_results( fn handle_stream_event( event: &serde_json::Value, token_tx: &tokio::sync::mpsc::UnboundedSender, + activity_tx: &tokio::sync::mpsc::UnboundedSender, ) { let event_type = event.get("type").and_then(|t| t.as_str()).unwrap_or(""); match event_type { + "content_block_start" => { + if let Some(cb) = event.get("content_block") + && cb.get("type").and_then(|t| t.as_str()) == Some("tool_use") + && let Some(name) = cb.get("name").and_then(|n| n.as_str()) + { + let _ = activity_tx.send(name.to_string()); + } + } // Text content streaming — only text_delta, not input_json_delta (tool args) "content_block_delta" => { if let Some(delta) = event.get("delta") { @@ -615,11 +761,12 @@ mod tests { #[test] fn handle_stream_event_text_delta_sends_token() { let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + let (atx, _arx) = tokio::sync::mpsc::unbounded_channel::(); let event = json!({ "type": "content_block_delta", "delta": {"type": "text_delta", "text": "hello "} }); - handle_stream_event(&event, &tx); + handle_stream_event(&event, &tx, &atx); drop(tx); let tokens: Vec<_> = { let mut v = vec![]; @@ -635,11 +782,12 @@ mod tests { fn handle_stream_event_input_json_delta_not_sent() { // Tool argument JSON deltas should NOT be sent as text tokens let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + let (atx, _arx) = tokio::sync::mpsc::unbounded_channel::(); let event = json!({ "type": "content_block_delta", "delta": {"type": "input_json_delta", "partial_json": "{\"path\":"} }); - handle_stream_event(&event, &tx); + handle_stream_event(&event, &tx, &atx); drop(tx); let tokens: Vec = { let mut v = vec![]; @@ -943,5 +1091,44 @@ mod tests { #[test] fn claude_code_provider_new() { let _provider = ClaudeCodeProvider::new(); + fn handle_stream_event_tool_use_start_sends_activity() { + let (tx, _rx) = tokio::sync::mpsc::unbounded_channel::(); + let (atx, mut arx) = tokio::sync::mpsc::unbounded_channel::(); + let event = json!({ + "type": "content_block_start", + "index": 1, + "content_block": {"type": "tool_use", "id": "toolu_1", "name": "Read", "input": {}} + }); + handle_stream_event(&event, &tx, &atx); + drop(atx); + let activities: Vec = { + let mut v = vec![]; + while let Ok(a) = arx.try_recv() { + v.push(a); + } + v + }; + assert_eq!(activities, vec!["Read"]); + } + + #[test] + fn handle_stream_event_text_block_start_no_activity() { + let (tx, _rx) = tokio::sync::mpsc::unbounded_channel::(); + let (atx, mut arx) = tokio::sync::mpsc::unbounded_channel::(); + let event = json!({ + "type": "content_block_start", + "index": 0, + "content_block": {"type": "text", "text": ""} + }); + handle_stream_event(&event, &tx, &atx); + drop(atx); + let activities: Vec = { + let mut v = vec![]; + while let Ok(a) = arx.try_recv() { + v.push(a); + } + v + }; + assert!(activities.is_empty()); } }