fix: resolve merge conflict in claude_code.rs

Keep master's quiet system/rate_limit_event handlers while preserving
the story-62 permission_request handler (the core feature).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Dave
2026-02-23 16:01:22 +00:00
parent 026ba3cbcf
commit 6962e92f0c
8 changed files with 367 additions and 43 deletions

View File

@@ -8,6 +8,7 @@ use poem::handler;
use poem::web::Data;
use poem::web::websocket::{Message as WsMessage, WebSocket};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::mpsc;
@@ -17,12 +18,17 @@ use tokio::sync::mpsc;
///
/// - `chat` starts a streaming chat session.
/// - `cancel` stops the active session.
/// - `permission_response` approves or denies a pending permission request.
enum WsRequest {
Chat {
messages: Vec<Message>,
config: chat::ProviderConfig,
},
Cancel,
PermissionResponse {
request_id: String,
approved: bool,
},
}
#[derive(Serialize)]
@@ -63,6 +69,12 @@ enum WsResponse {
qa: Vec<crate::http::workflow::UpcomingStory>,
merge: Vec<crate::http::workflow::UpcomingStory>,
},
/// Claude Code is requesting user approval before executing a tool.
PermissionRequest {
request_id: String,
tool_name: String,
tool_input: serde_json::Value,
},
}
impl From<WatcherEvent> for WsResponse {
@@ -139,52 +151,105 @@ pub async fn ws_handler(ws: WebSocket, ctx: Data<&Arc<AppContext>>) -> impl poem
}
});
while let Some(Ok(msg)) = stream.next().await {
if let WsMessage::Text(text) = msg {
let parsed: Result<WsRequest, _> = serde_json::from_str(&text);
match parsed {
Ok(WsRequest::Chat { messages, config }) => {
let tx_updates = tx.clone();
let tx_tokens = tx.clone();
let ctx_clone = ctx.clone();
// Channel for permission requests flowing from the PTY thread to this handler.
let (perm_req_tx, mut perm_req_rx) =
mpsc::unbounded_channel::<crate::llm::providers::claude_code::PermissionReqMsg>();
// Map of pending permission request_id → one-shot responder.
let mut pending_perms: HashMap<String, std::sync::mpsc::SyncSender<bool>> = HashMap::new();
let result = chat::chat(
messages,
config,
&ctx_clone.state,
ctx_clone.store.as_ref(),
|history| {
let _ = tx_updates.send(WsResponse::Update {
messages: history.to_vec(),
});
},
|token| {
let _ = tx_tokens.send(WsResponse::Token {
content: token.to_string(),
});
},
)
.await;
loop {
// Outer loop: wait for the next WebSocket message.
let Some(Ok(WsMessage::Text(text))) = stream.next().await else {
break;
};
match result {
Ok(chat_result) => {
if let Some(sid) = chat_result.session_id {
let _ = tx.send(WsResponse::SessionId { session_id: sid });
let parsed: Result<WsRequest, _> = serde_json::from_str(&text);
match parsed {
Ok(WsRequest::Chat { messages, config }) => {
let tx_updates = tx.clone();
let tx_tokens = tx.clone();
let ctx_clone = ctx.clone();
let perm_tx = perm_req_tx.clone();
// Build the chat future without driving it yet so we can
// interleave it with permission-request forwarding.
let chat_fut = chat::chat(
messages,
config,
&ctx_clone.state,
ctx_clone.store.as_ref(),
move |history| {
let _ = tx_updates.send(WsResponse::Update {
messages: history.to_vec(),
});
},
move |token| {
let _ = tx_tokens.send(WsResponse::Token {
content: token.to_string(),
});
},
Some(perm_tx),
);
tokio::pin!(chat_fut);
// Inner loop: drive the chat while concurrently handling
// permission requests and WebSocket messages.
let chat_result = loop {
tokio::select! {
result = &mut chat_fut => break result,
// Forward permission requests from PTY to the client.
Some(perm_req) = perm_req_rx.recv() => {
let _ = tx.send(WsResponse::PermissionRequest {
request_id: perm_req.request_id.clone(),
tool_name: perm_req.tool_name.clone(),
tool_input: perm_req.tool_input.clone(),
});
pending_perms.insert(
perm_req.request_id,
perm_req.response_tx,
);
}
// Handle WebSocket messages during an active chat
// (permission responses and cancellations).
Some(Ok(WsMessage::Text(inner_text))) = stream.next() => {
match serde_json::from_str::<WsRequest>(&inner_text) {
Ok(WsRequest::PermissionResponse { request_id, approved }) => {
if let Some(resp_tx) = pending_perms.remove(&request_id) {
let _ = resp_tx.send(approved);
}
}
Ok(WsRequest::Cancel) => {
let _ = chat::cancel_chat(&ctx.state);
}
_ => {}
}
}
Err(err) => {
let _ = tx.send(WsResponse::Error { message: err });
}
};
match chat_result {
Ok(chat_result) => {
if let Some(sid) = chat_result.session_id {
let _ = tx.send(WsResponse::SessionId { session_id: sid });
}
}
Err(err) => {
let _ = tx.send(WsResponse::Error { message: err });
}
}
Ok(WsRequest::Cancel) => {
let _ = chat::cancel_chat(&ctx.state);
}
Err(err) => {
let _ = tx.send(WsResponse::Error {
message: format!("Invalid request: {err}"),
});
}
}
Ok(WsRequest::Cancel) => {
let _ = chat::cancel_chat(&ctx.state);
}
Ok(WsRequest::PermissionResponse { .. }) => {
// Permission responses outside an active chat are ignored.
}
Err(err) => {
let _ = tx.send(WsResponse::Error {
message: format!("Invalid request: {err}"),
});
}
}
}

View File

@@ -183,6 +183,11 @@ pub async fn chat<F, U>(
store: &dyn StoreOps,
mut on_update: F,
mut on_token: U,
permission_tx: Option<
tokio::sync::mpsc::UnboundedSender<
crate::llm::providers::claude_code::PermissionReqMsg,
>,
>,
) -> Result<ChatResult, String>
where
F: FnMut(&[Message]) + Send,
@@ -238,6 +243,7 @@ where
config.session_id.as_deref(),
&mut cancel_rx,
|token| on_token(token),
permission_tx,
)
.await
.map_err(|e| format!("Claude Code Error: {e}"))?;

View File

@@ -1,11 +1,23 @@
use portable_pty::{CommandBuilder, PtySize, native_pty_system};
use std::io::{BufRead, BufReader};
use std::io::{BufRead, BufReader, Write};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::sync::watch;
use crate::llm::types::{FunctionCall, Message, Role, ToolCall};
/// A permission request emitted by Claude Code that must be resolved by the user.
pub struct PermissionReqMsg {
/// Unique identifier for this request (used to correlate the response).
pub request_id: String,
/// The tool that is requesting permission (e.g. "Bash", "Write").
pub tool_name: String,
/// The tool's input arguments (e.g. `{"command": "git push"}`).
pub tool_input: serde_json::Value,
/// One-shot channel to send the user's decision back to the PTY thread.
pub response_tx: std::sync::mpsc::SyncSender<bool>,
}
/// Result from a Claude Code session containing structured messages.
pub struct ClaudeCodeResult {
/// The conversation messages produced by Claude Code, including assistant
@@ -38,6 +50,7 @@ impl ClaudeCodeProvider {
session_id: Option<&str>,
cancel_rx: &mut watch::Receiver<bool>,
mut on_token: F,
permission_tx: Option<tokio::sync::mpsc::UnboundedSender<PermissionReqMsg>>,
) -> Result<ClaudeCodeResult, String>
where
F: FnMut(&str) + Send,
@@ -71,6 +84,7 @@ impl ClaudeCodeProvider {
token_tx,
msg_tx,
sid_tx,
permission_tx,
)
});
@@ -100,6 +114,7 @@ impl ClaudeCodeProvider {
/// Sends streaming text tokens via `token_tx` for real-time display, and
/// complete structured `Message` values via `msg_tx` for the final message
/// history (assistant turns with tool_calls, and tool result turns).
#[allow(clippy::too_many_arguments)]
fn run_pty_session(
user_message: &str,
cwd: &str,
@@ -108,6 +123,7 @@ fn run_pty_session(
token_tx: tokio::sync::mpsc::UnboundedSender<String>,
msg_tx: std::sync::mpsc::Sender<Message>,
sid_tx: tokio::sync::oneshot::Sender<String>,
permission_tx: Option<tokio::sync::mpsc::UnboundedSender<PermissionReqMsg>>,
) -> Result<(), String> {
let pty_system = native_pty_system();
@@ -160,8 +176,11 @@ fn run_pty_session(
.try_clone_reader()
.map_err(|e| format!("Failed to clone PTY reader: {e}"))?;
// We don't need to write anything — -p mode takes prompt as arg
drop(pair.master);
// Keep a writer handle so we can respond to permission_request events.
let mut pty_writer = pair
.master
.take_writer()
.map_err(|e| format!("Failed to take PTY writer: {e}"))?;
// Read NDJSON lines from stdout
let (line_tx, line_rx) = std::sync::mpsc::channel::<Option<String>>();
@@ -256,6 +275,53 @@ fn run_pty_session(
"system" => {}
// Rate limit info — suppress noisy notification
"rate_limit_event" => {}
// Claude Code is requesting user approval before executing a tool.
// Forward the request to the async context via permission_tx and
// block until the user responds (or a 5-minute timeout elapses).
"permission_request" => {
let request_id = json
.get("id")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let tool_name = json
.get("tool_name")
.and_then(|v| v.as_str())
.unwrap_or("unknown")
.to_string();
let tool_input = json
.get("input")
.cloned()
.unwrap_or(serde_json::Value::Object(serde_json::Map::new()));
if let Some(ref ptx) = permission_tx {
let (resp_tx, resp_rx) = std::sync::mpsc::sync_channel(1);
let _ = ptx.send(PermissionReqMsg {
request_id: request_id.clone(),
tool_name,
tool_input,
response_tx: resp_tx,
});
// Block until the user responds or a 5-minute timeout elapses.
let approved = resp_rx
.recv_timeout(std::time::Duration::from_secs(300))
.unwrap_or(false);
let response = serde_json::json!({
"type": "permission_response",
"id": request_id,
"approved": approved,
});
let _ = writeln!(pty_writer, "{}", response);
} else {
// No handler configured — deny by default.
let response = serde_json::json!({
"type": "permission_response",
"id": request_id,
"approved": false,
});
let _ = writeln!(pty_writer, "{}", response);
}
}
_ => {}
}
}