diff --git a/server/src/http/ws.rs b/server/src/http/ws.rs index 7a9645a..124a2c3 100644 --- a/server/src/http/ws.rs +++ b/server/src/http/ws.rs @@ -247,7 +247,6 @@ pub async fn ws_handler(ws: WebSocket, ctx: Data<&Arc>) -> impl poem tool_name: tool_name.to_string(), }); }, - Some(perm_tx), ); tokio::pin!(chat_fut); diff --git a/server/src/llm/chat.rs b/server/src/llm/chat.rs index bef08a9..6989318 100644 --- a/server/src/llm/chat.rs +++ b/server/src/llm/chat.rs @@ -187,11 +187,6 @@ pub async fn chat( mut on_update: F, mut on_token: U, mut on_activity: A, - permission_tx: Option< - tokio::sync::mpsc::UnboundedSender< - crate::llm::providers::claude_code::PermissionReqMsg, - >, - >, ) -> Result where F: FnMut(&[Message]) + Send, @@ -249,7 +244,6 @@ where &mut cancel_rx, |token| on_token(token), |tool_name| on_activity(tool_name), - permission_tx, ) .await .map_err(|e| format!("Claude Code Error: {e}"))?; diff --git a/server/src/llm/providers/claude_code.rs b/server/src/llm/providers/claude_code.rs index 11ed57b..d00f194 100644 --- a/server/src/llm/providers/claude_code.rs +++ b/server/src/llm/providers/claude_code.rs @@ -45,7 +45,6 @@ impl ClaudeCodeProvider { cancel_rx: &mut watch::Receiver, mut on_token: F, mut on_activity: A, - permission_tx: Option>, ) -> Result where F: FnMut(&str) + Send, @@ -241,101 +240,9 @@ fn run_pty_session( // Try to parse as JSON if let Ok(json) = serde_json::from_str::(trimmed) - && process_json_event(&json, &token_tx, &msg_tx, &mut sid_tx) + && process_json_event(&json, &token_tx, &activity_tx, &msg_tx, &mut sid_tx) { got_result = true; - // Capture session_id from any event that has it - if let Some(tx) = sid_tx.take() { - if let Some(sid) = json.get("session_id").and_then(|s| s.as_str()) { - let _ = tx.send(sid.to_string()); - } else { - // Put it back if this event didn't have a session_id - sid_tx = Some(tx); - } - } - - match event_type { - // Streaming deltas — used for real-time text display only - "stream_event" => { - if let Some(event) = json.get("event") { - handle_stream_event(event, &token_tx, &activity_tx); - } - } - // Complete assistant message — extract text and tool_use blocks - "assistant" => { - if let Some(message) = json.get("message") - && let Some(content) = - message.get("content").and_then(|c| c.as_array()) - { - parse_assistant_message(content, &msg_tx); - } - } - // User message containing tool results from Claude Code's own execution - "user" => { - if let Some(message) = json.get("message") - && let Some(content) = - message.get("content").and_then(|c| c.as_array()) - { - parse_tool_results(content, &msg_tx); - } - } - // Final result with usage stats - "result" => { - got_result = true; - } - // System init — suppress noisy model/apiKey notification - "system" => {} - // Rate limit info — suppress noisy notification - "rate_limit_event" => {} - // Claude Code is requesting user approval before executing a tool. - // Forward the request to the async context via permission_tx and - // block until the user responds (or a 5-minute timeout elapses). - "permission_request" => { - let request_id = json - .get("id") - .and_then(|v| v.as_str()) - .unwrap_or("") - .to_string(); - let tool_name = json - .get("tool_name") - .and_then(|v| v.as_str()) - .unwrap_or("unknown") - .to_string(); - let tool_input = json - .get("input") - .cloned() - .unwrap_or(serde_json::Value::Object(serde_json::Map::new())); - - if let Some(ref ptx) = permission_tx { - let (resp_tx, resp_rx) = std::sync::mpsc::sync_channel(1); - let _ = ptx.send(PermissionReqMsg { - request_id: request_id.clone(), - tool_name, - tool_input, - response_tx: resp_tx, - }); - // Block until the user responds or a 5-minute timeout elapses. - let approved = resp_rx - .recv_timeout(std::time::Duration::from_secs(300)) - .unwrap_or(false); - let response = serde_json::json!({ - "type": "permission_response", - "id": request_id, - "approved": approved, - }); - let _ = writeln!(pty_writer, "{}", response); - } else { - // No handler configured — deny by default. - let response = serde_json::json!({ - "type": "permission_response", - "id": request_id, - "approved": false, - }); - let _ = writeln!(pty_writer, "{}", response); - } - } - _ => {} - } } // Ignore non-JSON lines (terminal escape sequences) @@ -419,6 +326,7 @@ fn run_pty_session( fn process_json_event( json: &serde_json::Value, token_tx: &tokio::sync::mpsc::UnboundedSender, + activity_tx: &tokio::sync::mpsc::UnboundedSender, msg_tx: &std::sync::mpsc::Sender, sid_tx: &mut Option>, ) -> bool { @@ -439,7 +347,7 @@ fn process_json_event( match event_type { "stream_event" => { if let Some(event) = json.get("event") { - handle_stream_event(event, token_tx); + handle_stream_event(event, token_tx, activity_tx); } false } @@ -800,11 +708,12 @@ mod tests { #[test] fn handle_stream_event_thinking_delta_sends_prefixed_token() { let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + let (atx, _arx) = tokio::sync::mpsc::unbounded_channel(); let event = json!({ "type": "content_block_delta", "delta": {"type": "thinking_delta", "thinking": "I should check the file"} }); - handle_stream_event(&event, &tx); + handle_stream_event(&event, &tx, &atx); drop(tx); let tokens: Vec = { let mut v = vec![]; @@ -819,11 +728,12 @@ mod tests { #[test] fn handle_stream_event_error_sends_error_token() { let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + let (atx, _arx) = tokio::sync::mpsc::unbounded_channel(); let event = json!({ "type": "error", "error": {"type": "overloaded_error", "message": "Overloaded"} }); - handle_stream_event(&event, &tx); + handle_stream_event(&event, &tx, &atx); drop(tx); let tokens: Vec = { let mut v = vec![]; @@ -838,8 +748,9 @@ mod tests { #[test] fn handle_stream_event_unknown_type_is_noop() { let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + let (atx, _arx) = tokio::sync::mpsc::unbounded_channel(); let event = json!({"type": "ping"}); - handle_stream_event(&event, &tx); + handle_stream_event(&event, &tx, &atx); drop(tx); let tokens: Vec = { let mut v = vec![]; @@ -927,64 +838,67 @@ mod tests { } fn make_channels() -> ( + tokio::sync::mpsc::UnboundedSender, + tokio::sync::mpsc::UnboundedReceiver, tokio::sync::mpsc::UnboundedSender, tokio::sync::mpsc::UnboundedReceiver, std::sync::mpsc::Sender, std::sync::mpsc::Receiver, ) { let (tok_tx, tok_rx) = tokio::sync::mpsc::unbounded_channel(); + let (act_tx, act_rx) = tokio::sync::mpsc::unbounded_channel(); let (msg_tx, msg_rx) = std::sync::mpsc::channel(); - (tok_tx, tok_rx, msg_tx, msg_rx) + (tok_tx, tok_rx, act_tx, act_rx, msg_tx, msg_rx) } #[test] fn process_json_event_result_returns_true() { - let (tok_tx, _tok_rx, msg_tx, _msg_rx) = make_channels(); + let (tok_tx, _tok_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels(); let (sid_tx, _sid_rx) = tokio::sync::oneshot::channel::(); let mut sid_tx_opt = Some(sid_tx); let json = json!({"type": "result", "subtype": "success"}); - assert!(process_json_event(&json, &tok_tx, &msg_tx, &mut sid_tx_opt)); + assert!(process_json_event(&json, &tok_tx, &act_tx, &msg_tx, &mut sid_tx_opt)); } #[test] fn process_json_event_system_returns_false() { - let (tok_tx, _tok_rx, msg_tx, _msg_rx) = make_channels(); + let (tok_tx, _tok_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels(); let mut sid_tx = None::>; let json = json!({"type": "system", "subtype": "init", "apiKeySource": "env"}); - assert!(!process_json_event(&json, &tok_tx, &msg_tx, &mut sid_tx)); + assert!(!process_json_event(&json, &tok_tx, &act_tx, &msg_tx, &mut sid_tx)); } #[test] fn process_json_event_rate_limit_returns_false() { - let (tok_tx, _tok_rx, msg_tx, _msg_rx) = make_channels(); + let (tok_tx, _tok_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels(); let mut sid_tx = None::>; let json = json!({"type": "rate_limit_event"}); - assert!(!process_json_event(&json, &tok_tx, &msg_tx, &mut sid_tx)); + assert!(!process_json_event(&json, &tok_tx, &act_tx, &msg_tx, &mut sid_tx)); } #[test] fn process_json_event_unknown_type_returns_false() { - let (tok_tx, _tok_rx, msg_tx, _msg_rx) = make_channels(); + let (tok_tx, _tok_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels(); let mut sid_tx = None::>; let json = json!({"type": "some_future_event"}); - assert!(!process_json_event(&json, &tok_tx, &msg_tx, &mut sid_tx)); + assert!(!process_json_event(&json, &tok_tx, &act_tx, &msg_tx, &mut sid_tx)); } #[test] fn process_json_event_no_type_returns_false() { - let (tok_tx, _tok_rx, msg_tx, _msg_rx) = make_channels(); + let (tok_tx, _tok_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels(); let mut sid_tx = None::>; let json = json!({"content": "no type field"}); - assert!(!process_json_event(&json, &tok_tx, &msg_tx, &mut sid_tx)); + assert!(!process_json_event(&json, &tok_tx, &act_tx, &msg_tx, &mut sid_tx)); } #[test] fn process_json_event_captures_session_id() { - let (tok_tx, _tok_rx, msg_tx, _msg_rx) = make_channels(); + let (tok_tx, _tok_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels(); let (sid_tx, mut sid_rx) = tokio::sync::oneshot::channel::(); let mut sid_tx_opt = Some(sid_tx); let json = json!({"type": "system", "session_id": "sess-abc-123"}); - process_json_event(&json, &tok_tx, &msg_tx, &mut sid_tx_opt); + process_json_event(&json, &tok_tx, &act_tx, &msg_tx, &mut sid_tx_opt); // sid_tx should have been consumed assert!(sid_tx_opt.is_none()); let received = sid_rx.try_recv().unwrap(); @@ -993,18 +907,18 @@ mod tests { #[test] fn process_json_event_preserves_sid_tx_if_no_session_id() { - let (tok_tx, _tok_rx, msg_tx, _msg_rx) = make_channels(); + let (tok_tx, _tok_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels(); let (sid_tx, _sid_rx) = tokio::sync::oneshot::channel::(); let mut sid_tx_opt = Some(sid_tx); let json = json!({"type": "system"}); - process_json_event(&json, &tok_tx, &msg_tx, &mut sid_tx_opt); + process_json_event(&json, &tok_tx, &act_tx, &msg_tx, &mut sid_tx_opt); // sid_tx should still be present since no session_id in event assert!(sid_tx_opt.is_some()); } #[test] fn process_json_event_stream_event_forwards_token() { - let (tok_tx, mut tok_rx, msg_tx, _msg_rx) = make_channels(); + let (tok_tx, mut tok_rx, act_tx, _act_rx, msg_tx, _msg_rx) = make_channels(); let mut sid_tx = None::>; let json = json!({ "type": "stream_event", @@ -1014,7 +928,7 @@ mod tests { "delta": {"type": "text_delta", "text": "word"} } }); - assert!(!process_json_event(&json, &tok_tx, &msg_tx, &mut sid_tx)); + assert!(!process_json_event(&json, &tok_tx, &act_tx, &msg_tx, &mut sid_tx)); drop(tok_tx); let tokens: Vec = { let mut v = vec![]; @@ -1028,7 +942,7 @@ mod tests { #[test] fn process_json_event_assistant_event_parses_message() { - let (tok_tx, _tok_rx, msg_tx, msg_rx) = make_channels(); + let (tok_tx, _tok_rx, act_tx, _act_rx, msg_tx, msg_rx) = make_channels(); let mut sid_tx = None::>; let json = json!({ "type": "assistant", @@ -1036,7 +950,7 @@ mod tests { "content": [{"type": "text", "text": "Hi!"}] } }); - assert!(!process_json_event(&json, &tok_tx, &msg_tx, &mut sid_tx)); + assert!(!process_json_event(&json, &tok_tx, &act_tx, &msg_tx, &mut sid_tx)); drop(msg_tx); let msgs: Vec = msg_rx.try_iter().collect(); assert_eq!(msgs.len(), 1); @@ -1045,7 +959,7 @@ mod tests { #[test] fn process_json_event_user_event_parses_tool_results() { - let (tok_tx, _tok_rx, msg_tx, msg_rx) = make_channels(); + let (tok_tx, _tok_rx, act_tx, _act_rx, msg_tx, msg_rx) = make_channels(); let mut sid_tx = None::>; let json = json!({ "type": "user", @@ -1053,7 +967,7 @@ mod tests { "content": [{"type": "tool_result", "tool_use_id": "tid1", "content": "done"}] } }); - assert!(!process_json_event(&json, &tok_tx, &msg_tx, &mut sid_tx)); + assert!(!process_json_event(&json, &tok_tx, &act_tx, &msg_tx, &mut sid_tx)); drop(msg_tx); let msgs: Vec = msg_rx.try_iter().collect(); assert_eq!(msgs.len(), 1); @@ -1063,13 +977,13 @@ mod tests { #[test] fn process_json_event_assistant_without_content_array_is_noop() { - let (tok_tx, _tok_rx, msg_tx, msg_rx) = make_channels(); + let (tok_tx, _tok_rx, act_tx, _act_rx, msg_tx, msg_rx) = make_channels(); let mut sid_tx = None::>; let json = json!({ "type": "assistant", "message": {"content": "not an array"} }); - assert!(!process_json_event(&json, &tok_tx, &msg_tx, &mut sid_tx)); + assert!(!process_json_event(&json, &tok_tx, &act_tx, &msg_tx, &mut sid_tx)); drop(msg_tx); let msgs: Vec = msg_rx.try_iter().collect(); assert!(msgs.is_empty()); @@ -1077,10 +991,10 @@ mod tests { #[test] fn process_json_event_user_without_content_array_is_noop() { - let (tok_tx, _tok_rx, msg_tx, msg_rx) = make_channels(); + let (tok_tx, _tok_rx, act_tx, _act_rx, msg_tx, msg_rx) = make_channels(); let mut sid_tx = None::>; let json = json!({"type": "user", "message": {"content": null}}); - assert!(!process_json_event(&json, &tok_tx, &msg_tx, &mut sid_tx)); + assert!(!process_json_event(&json, &tok_tx, &act_tx, &msg_tx, &mut sid_tx)); drop(msg_tx); let msgs: Vec = msg_rx.try_iter().collect(); assert!(msgs.is_empty());