From 7ac3fc2e3efff1f57c7425bab162a7ebb452f854 Mon Sep 17 00:00:00 2001 From: dave Date: Thu, 30 Apr 2026 13:53:46 +0000 Subject: [PATCH] feat(884): persistent perm_rx lock-holder for Matrix bot MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Before: handle_message.rs acquired services.perm_rx only while processing one chat message and dropped it on chat_fut completion. The moment the bot wasn't actively responding, prompt_permission auto-denied any spawned coder bash call as "no interactive session" — making unattended coder work impossible. Now: a permission_listener task is spawned at bot startup and holds perm_rx for the bot's lifetime. Permission requests are forwarded to the first configured Matrix room; replies are resolved by the existing on_room_message handler via pending_perm_replies. The per-message acquire is gone from handle_message.rs (chat_fut just awaits cleanly). - New module: chat/transport/matrix/bot/permission_listener.rs. - Wired into run_bot before BotContext construction; bot_sent_event_ids is hoisted out so the listener and the rest of the bot share it. - handle_message.rs no longer touches perm_rx. - diagnostics/permission.rs comment updated to reflect the new reality. - Regression test asserts the listener forwards a PermissionForward to the target room and records the pending reply key — exactly the path that was broken when no chat_fut was in flight. Known limitations (review note): every request is stored in pending_perm_replies under the same target-room key, so a request that arrives while an earlier one is still pending replaces the earlier reply sender, and the earlier request's timeout task will then remove the newer entry; a failed send_message is ignored, so a request whose prompt was never posted is resolved only by the timeout. Discord/Slack/WhatsApp transports still acquire perm_rx per message (commands.rs:368 / commands/llm.rs:83 / commands/llm.rs:82, respectively). They are not the active transport in this deployment so their per-message acquire remains dormant; the same listener pattern should be applied to them as follow-up work in 884 phase 2. 
--- .../matrix/bot/messages/handle_message.rs | 64 +---- server/src/chat/transport/matrix/bot/mod.rs | 3 + .../matrix/bot/permission_listener.rs | 233 ++++++++++++++++++ server/src/chat/transport/matrix/bot/run.rs | 22 +- server/src/http/mcp/diagnostics/permission.rs | 13 +- 5 files changed, 267 insertions(+), 68 deletions(-) create mode 100644 server/src/chat/transport/matrix/bot/permission_listener.rs diff --git a/server/src/chat/transport/matrix/bot/messages/handle_message.rs b/server/src/chat/transport/matrix/bot/messages/handle_message.rs index aa75b994..8ef116cf 100644 --- a/server/src/chat/transport/matrix/bot/messages/handle_message.rs +++ b/server/src/chat/transport/matrix/bot/messages/handle_message.rs @@ -2,13 +2,11 @@ //! streams the assistant reply back to the room. use crate::chat::util::drain_complete_paragraphs; -use crate::http::context::PermissionDecision; use crate::llm::providers::claude_code::{ClaudeCodeProvider, ClaudeCodeResult}; use crate::slog; use matrix_sdk::ruma::OwnedRoomId; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; -use std::time::Duration; use tokio::sync::watch; use super::super::context::BotContext; @@ -113,64 +111,10 @@ pub(in crate::chat::transport::matrix::bot) async fn handle_message( ); tokio::pin!(chat_fut); - // Lock the permission receiver for the duration of this chat session. - // Permission requests from the MCP `prompt_permission` tool arrive here. - let mut perm_rx_guard = ctx.services.perm_rx.lock().await; - - let result = loop { - tokio::select! { - r = &mut chat_fut => break r, - - Some(perm_fwd) = perm_rx_guard.recv() => { - // Post the permission prompt to the room via the transport. 
- let prompt_msg = format!( - "**Permission Request**\n\n\ - Tool: `{}`\n```json\n{}\n```\n\n\ - Reply **yes** to approve or **no** to deny.", - perm_fwd.tool_name, - serde_json::to_string_pretty(&perm_fwd.tool_input) - .unwrap_or_else(|_| perm_fwd.tool_input.to_string()), - ); - let html = markdown_to_html(&prompt_msg); - if let Ok(msg_id) = ctx.transport.send_message(&room_id_str, &prompt_msg, &html).await - && let Ok(event_id) = msg_id.parse() - { - sent_ids.lock().await.insert(event_id); - } - - // Store the MCP oneshot sender so the event handler can - // resolve it when the user replies yes/no. - ctx.services.pending_perm_replies - .lock() - .await - .insert(room_id.to_string(), perm_fwd.response_tx); - - // Spawn a timeout task: auto-deny if the user does not respond. - let pending = Arc::clone(&ctx.services.pending_perm_replies); - let timeout_room_id = room_id.to_string(); - let timeout_transport = Arc::clone(&ctx.transport); - let timeout_room_id_str = room_id_str.clone(); - let timeout_sent_ids = Arc::clone(&ctx.bot_sent_event_ids); - let timeout_secs = ctx.services.permission_timeout_secs; - tokio::spawn(async move { - tokio::time::sleep(Duration::from_secs(timeout_secs)).await; - if let Some(tx) = pending.lock().await.remove(&timeout_room_id) { - let _ = tx.send(PermissionDecision::Deny); - let msg = "Permission request timed out — denied (fail-closed)."; - let html = markdown_to_html(msg); - if let Ok(msg_id) = timeout_transport - .send_message(&timeout_room_id_str, msg, &html) - .await - && let Ok(event_id) = msg_id.parse() - { - timeout_sent_ids.lock().await.insert(event_id); - } - } - }); - } - } - }; - drop(perm_rx_guard); + // Permission requests are handled by the persistent permission_listener + // task spawned at bot startup (story 884) — they no longer route through + // per-message handlers. Just await chat_fut. + let result = (&mut chat_fut).await; // Flush any remaining text that didn't end with a paragraph boundary. 
let remaining = buffer.lock().unwrap().trim().to_string(); diff --git a/server/src/chat/transport/matrix/bot/mod.rs b/server/src/chat/transport/matrix/bot/mod.rs index 3421088b..b614b57e 100644 --- a/server/src/chat/transport/matrix/bot/mod.rs +++ b/server/src/chat/transport/matrix/bot/mod.rs @@ -9,6 +9,9 @@ pub mod history; pub mod mentions; /// Message handlers — processes incoming Matrix room messages. pub mod messages; +/// Permission listener — holds perm_rx for the bot's lifetime and forwards +/// permission requests to the configured Matrix room. +pub mod permission_listener; /// Bot run loop — the main async task that drives the Matrix sync loop. pub mod run; /// Device verification — handles Matrix cross-signing and emoji verification flows. diff --git a/server/src/chat/transport/matrix/bot/permission_listener.rs b/server/src/chat/transport/matrix/bot/permission_listener.rs new file mode 100644 index 00000000..3e6bf788 --- /dev/null +++ b/server/src/chat/transport/matrix/bot/permission_listener.rs @@ -0,0 +1,233 @@ +//! Background task that holds `perm_rx` for the bot's lifetime and forwards +//! permission requests to the configured Matrix room. +//! +//! Before story 884, each chat message handler acquired `perm_rx` for the +//! duration of one chat_fut and dropped it afterwards. That meant whenever +//! the bot wasn't actively responding, `prompt_permission` auto-denied any +//! spawned coder bash call as "no interactive session" — making unattended +//! coder work impossible. This task holds the lock continuously while the +//! bot is connected, so requests can flow at any time. 
+ +use crate::chat::ChatTransport; +use crate::http::context::PermissionDecision; +use crate::services::Services; +use crate::slog; +use matrix_sdk::ruma::{OwnedEventId, OwnedRoomId}; +use std::collections::HashSet; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::Mutex as TokioMutex; + +use super::format::markdown_to_html; + +/// Spawn a background task that holds `services.perm_rx` for the bot's +/// lifetime and forwards each incoming permission request to `target_room` +/// as a chat message. Replies (yes/no) are resolved by the existing +/// `on_room_message` handler via `pending_perm_replies`. +/// +/// Returns the JoinHandle so the caller can keep ownership; the task exits +/// only when the `perm_rx` channel is closed (bot shutdown). +pub fn spawn_permission_listener( + services: Arc, + transport: Arc, + target_room: OwnedRoomId, + bot_sent_event_ids: Arc>>, +) -> tokio::task::JoinHandle<()> { + tokio::spawn(async move { + let mut perm_rx = services.perm_rx.lock().await; + let target_room_str = target_room.as_str().to_string(); + slog!( + "[matrix-bot] permission listener started; forwarding requests to {target_room_str}" + ); + + while let Some(perm_fwd) = perm_rx.recv().await { + let prompt_msg = format!( + "**Permission Request**\n\n\ + Tool: `{}`\n```json\n{}\n```\n\n\ + Reply **yes** to approve or **no** to deny.", + perm_fwd.tool_name, + serde_json::to_string_pretty(&perm_fwd.tool_input) + .unwrap_or_else(|_| perm_fwd.tool_input.to_string()), + ); + let html = markdown_to_html(&prompt_msg); + if let Ok(msg_id) = transport + .send_message(&target_room_str, &prompt_msg, &html) + .await + && let Ok(event_id) = msg_id.parse::() + { + bot_sent_event_ids.lock().await.insert(event_id); + } + + // Store the MCP oneshot sender so on_room_message can resolve it + // when the user replies yes/no in the target room. 
+ services + .pending_perm_replies + .lock() + .await + .insert(target_room.to_string(), perm_fwd.response_tx); + + // Spawn a per-request timeout: auto-deny if the user does not + // respond within `permission_timeout_secs`. + let pending = Arc::clone(&services.pending_perm_replies); + let timeout_room_key = target_room.to_string(); + let timeout_transport = Arc::clone(&transport); + let timeout_room_str = target_room_str.clone(); + let timeout_sent_ids = Arc::clone(&bot_sent_event_ids); + let timeout_secs = services.permission_timeout_secs; + tokio::spawn(async move { + tokio::time::sleep(Duration::from_secs(timeout_secs)).await; + if let Some(tx) = pending.lock().await.remove(&timeout_room_key) { + let _ = tx.send(PermissionDecision::Deny); + let msg = "Permission request timed out — denied (fail-closed)."; + let html = markdown_to_html(msg); + if let Ok(msg_id) = timeout_transport + .send_message(&timeout_room_str, msg, &html) + .await + && let Ok(event_id) = msg_id.parse::() + { + timeout_sent_ids.lock().await.insert(event_id); + } + } + }); + } + + slog!("[matrix-bot] permission listener exiting (perm_rx channel closed)"); + }) +} + +#[cfg(test)] +mod tests { + //! Story 884 regression: spawn the listener with no in-flight chat + //! message, send a PermissionForward through the channel, and verify + //! the listener forwards it to the configured Matrix room (instead of + //! being auto-denied for "no interactive session"). 
+ + use super::*; + use crate::http::context::PermissionForward; + use crate::services::Services; + use async_trait::async_trait; + use serde_json::json; + use std::collections::HashMap; + use tokio::sync::{mpsc, oneshot}; + + struct RecordingTransport { + sent: Arc>>, + } + + #[async_trait] + impl crate::chat::ChatTransport for RecordingTransport { + async fn send_message( + &self, + room_id: &str, + plain: &str, + _html: &str, + ) -> Result { + self.sent + .lock() + .unwrap() + .push((room_id.to_string(), plain.to_string())); + // Matrix event IDs start with `$`; OwnedEventId::parse expects + // a valid form so the listener can insert it into bot_sent_event_ids. + Ok("$test_event_id:example.com".to_string()) + } + + async fn edit_message( + &self, + _: &str, + _: &str, + _: &str, + _: &str, + ) -> Result<(), String> { + Ok(()) + } + + async fn send_typing(&self, _: &str, _: bool) -> Result<(), String> { + Ok(()) + } + } + + fn test_services_with_tx() -> ( + Arc, + mpsc::UnboundedSender, + ) { + let (perm_tx, perm_rx) = mpsc::unbounded_channel(); + let services = Arc::new(Services { + project_root: std::path::PathBuf::from("/tmp/test"), + agents: Arc::new(crate::agents::AgentPool::new_test(3000)), + bot_name: "Assistant".to_string(), + bot_user_id: "@bot:example.com".to_string(), + ambient_rooms: Arc::new(std::sync::Mutex::new( + std::collections::HashSet::new(), + )), + perm_rx: Arc::new(TokioMutex::new(perm_rx)), + pending_perm_replies: Arc::new(TokioMutex::new(HashMap::new())), + permission_timeout_secs: 120, + status: Arc::new(crate::service::status::StatusBroadcaster::new()), + }); + (services, perm_tx) + } + + #[tokio::test] + async fn listener_forwards_request_to_target_room_when_no_chat_in_flight() { + let (services, perm_tx) = test_services_with_tx(); + let sent: Arc>> = + Arc::new(std::sync::Mutex::new(Vec::new())); + let transport: Arc = Arc::new(RecordingTransport { + sent: Arc::clone(&sent), + }); + let target_room: OwnedRoomId = 
"!perm:example.com".parse().unwrap(); + let bot_sent_event_ids = Arc::new(TokioMutex::new(HashSet::new())); + + spawn_permission_listener( + Arc::clone(&services), + Arc::clone(&transport), + target_room.clone(), + Arc::clone(&bot_sent_event_ids), + ); + + // Yield so the listener task acquires perm_rx and starts recv'ing. + tokio::time::sleep(std::time::Duration::from_millis(20)).await; + + let (response_tx, _response_rx) = oneshot::channel(); + perm_tx + .send(PermissionForward { + request_id: "req-1".to_string(), + tool_name: "Bash".to_string(), + tool_input: json!({"command": "cargo test"}), + response_tx, + }) + .expect("send PermissionForward"); + + // Give the listener a moment to process. + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // The transport must have received exactly one send_message to the + // target room with the prompt content. + let recorded = sent.lock().unwrap().clone(); + assert_eq!(recorded.len(), 1, "expected exactly one send_message"); + assert_eq!(recorded[0].0, target_room.as_str()); + assert!( + recorded[0].1.contains("Permission Request"), + "prompt body missing 'Permission Request' header: {}", + recorded[0].1 + ); + assert!( + recorded[0].1.contains("Bash"), + "prompt body missing tool name: {}", + recorded[0].1 + ); + + // pending_perm_replies must contain an entry keyed by the target room + // (so the user-reply handler can resolve the request when they reply). + let pending = services.pending_perm_replies.lock().await; + assert!( + pending.contains_key(target_room.as_str()), + "pending_perm_replies missing entry for target room" + ); + + // bot_sent_event_ids must have recorded the prompt's event ID so the + // bot does not echo its own prompt back as user input. 
+ let sent_ids = bot_sent_event_ids.lock().await; + assert_eq!(sent_ids.len(), 1, "expected one sent event ID recorded"); + } +} diff --git a/server/src/chat/transport/matrix/bot/run.rs b/server/src/chat/transport/matrix/bot/run.rs index 5d06abc8..c8746ce2 100644 --- a/server/src/chat/transport/matrix/bot/run.rs +++ b/server/src/chat/transport/matrix/bot/run.rs @@ -275,6 +275,26 @@ pub async fn run_bot( } } + // Hoist bot_sent_event_ids out of BotContext so the permission listener + // can share it (the listener tracks which permission-prompt messages it + // posted so the bot doesn't echo them back as user input). + let bot_sent_event_ids: Arc>> = + Arc::new(TokioMutex::new(HashSet::new())); + + // Spawn the permission listener: holds `perm_rx` for the bot's lifetime + // and forwards permission requests to the first configured room. Story + // 884 — replaces the per-message lock acquire previously done in + // handle_message.rs, so spawned coders' bash calls reach chat even when + // the bot isn't actively responding. 
+ if let Some(target_room) = target_room_ids.first() { + super::permission_listener::spawn_permission_listener( + Arc::clone(&services), + Arc::clone(&transport), + target_room.clone(), + Arc::clone(&bot_sent_event_ids), + ); + } + let ctx = BotContext { services, matrix_user_id: bot_user_id, @@ -282,7 +302,7 @@ pub async fn run_bot( allowed_users: config.allowed_users, history: Arc::new(TokioMutex::new(persisted)), history_size: config.history_size, - bot_sent_event_ids: Arc::new(TokioMutex::new(HashSet::new())), + bot_sent_event_ids, htop_sessions: Arc::new(TokioMutex::new(HashMap::new())), transport: Arc::clone(&transport), timer_store, diff --git a/server/src/http/mcp/diagnostics/permission.rs b/server/src/http/mcp/diagnostics/permission.rs index 3f926598..9443c890 100644 --- a/server/src/http/mcp/diagnostics/permission.rs +++ b/server/src/http/mcp/diagnostics/permission.rs @@ -30,13 +30,12 @@ pub(crate) async fn tool_prompt_permission( } // Auto-deny immediately if no interactive session is currently listening on - // perm_rx. Interactive sessions (WebSocket, Matrix bot chat) hold the - // perm_rx lock for the duration of a chat. If try_lock succeeds, nobody is - // listening — this is a background agent call that should never reach chat. - // - // Without this check, agent permission requests queue in the channel and - // get forwarded to Matrix/Slack/etc. at the start of the next user session, - // flooding chat with stale agent prompts. + // perm_rx. Story 884 made the Matrix bot hold this lock for its lifetime + // via the permission_listener task spawned at startup, so requests reach + // chat asynchronously regardless of whether a chat message is in flight. + // Other transports (Discord/Slack/WhatsApp) still acquire per message; if + // none is active, try_lock succeeds — auto-deny so background agent calls + // don't queue and flood chat at the next user session. 
if ctx.services.perm_rx.try_lock().is_ok() { crate::slog!( "[permission] Auto-denied '{tool_name}' (no interactive session — agent mode)"