feat(884): persistent perm_rx lock-holder for Matrix bot

Before: handle_message.rs acquired services.perm_rx only while processing
one chat message and dropped it on chat_fut completion. The moment the
bot wasn't actively responding, prompt_permission auto-denied any spawned
coder bash call as "no interactive session" — making unattended coder
work impossible.

Now: a permission_listener task is spawned at bot startup and holds
perm_rx for the bot's lifetime. Permission requests are forwarded to
the first configured Matrix room, replies resolved by the existing
on_room_message handler via pending_perm_replies. Per-message acquire is
gone from handle_message.rs (chat_fut just awaits cleanly).

- New module: chat/transport/matrix/bot/permission_listener.rs.
- Wired into run_bot before BotContext construction; bot_sent_event_ids
  is hoisted out so the listener and the rest of the bot share it.
- handle_message.rs no longer touches perm_rx.
- diagnostics/permission.rs comment updated to reflect the new reality.
- Regression test asserts the listener forwards a PermissionForward to
  the target room and records the pending reply key — exactly the path
  that was broken when no chat_fut was in flight.

Discord/Slack/WhatsApp transports still acquire perm_rx per message
(commands.rs:368 / commands/llm.rs:83 / commands/llm.rs:82). They are
not the active transport in this deployment so their per-message acquire
remains dormant; the same listener pattern should be applied to them as
follow-up work in 884 phase 2.
This commit is contained in:
dave
2026-04-30 13:53:46 +00:00
parent 0e4a970e3a
commit 7ac3fc2e3e
5 changed files with 267 additions and 68 deletions
@@ -2,13 +2,11 @@
//! streams the assistant reply back to the room.
use crate::chat::util::drain_complete_paragraphs;
use crate::http::context::PermissionDecision;
use crate::llm::providers::claude_code::{ClaudeCodeProvider, ClaudeCodeResult};
use crate::slog;
use matrix_sdk::ruma::OwnedRoomId;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use tokio::sync::watch;
use super::super::context::BotContext;
@@ -113,64 +111,10 @@ pub(in crate::chat::transport::matrix::bot) async fn handle_message(
);
tokio::pin!(chat_fut);
// Lock the permission receiver for the duration of this chat session.
// Permission requests from the MCP `prompt_permission` tool arrive here.
let mut perm_rx_guard = ctx.services.perm_rx.lock().await;
let result = loop {
tokio::select! {
r = &mut chat_fut => break r,
Some(perm_fwd) = perm_rx_guard.recv() => {
// Post the permission prompt to the room via the transport.
let prompt_msg = format!(
"**Permission Request**\n\n\
Tool: `{}`\n```json\n{}\n```\n\n\
Reply **yes** to approve or **no** to deny.",
perm_fwd.tool_name,
serde_json::to_string_pretty(&perm_fwd.tool_input)
.unwrap_or_else(|_| perm_fwd.tool_input.to_string()),
);
let html = markdown_to_html(&prompt_msg);
if let Ok(msg_id) = ctx.transport.send_message(&room_id_str, &prompt_msg, &html).await
&& let Ok(event_id) = msg_id.parse()
{
sent_ids.lock().await.insert(event_id);
}
// Store the MCP oneshot sender so the event handler can
// resolve it when the user replies yes/no.
ctx.services.pending_perm_replies
.lock()
.await
.insert(room_id.to_string(), perm_fwd.response_tx);
// Spawn a timeout task: auto-deny if the user does not respond.
let pending = Arc::clone(&ctx.services.pending_perm_replies);
let timeout_room_id = room_id.to_string();
let timeout_transport = Arc::clone(&ctx.transport);
let timeout_room_id_str = room_id_str.clone();
let timeout_sent_ids = Arc::clone(&ctx.bot_sent_event_ids);
let timeout_secs = ctx.services.permission_timeout_secs;
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(timeout_secs)).await;
if let Some(tx) = pending.lock().await.remove(&timeout_room_id) {
let _ = tx.send(PermissionDecision::Deny);
let msg = "Permission request timed out — denied (fail-closed).";
let html = markdown_to_html(msg);
if let Ok(msg_id) = timeout_transport
.send_message(&timeout_room_id_str, msg, &html)
.await
&& let Ok(event_id) = msg_id.parse()
{
timeout_sent_ids.lock().await.insert(event_id);
}
}
});
}
}
};
drop(perm_rx_guard);
// Permission requests are handled by the persistent permission_listener
// task spawned at bot startup (story 884) — they no longer route through
// per-message handlers. Just await chat_fut.
let result = (&mut chat_fut).await;
// Flush any remaining text that didn't end with a paragraph boundary.
let remaining = buffer.lock().unwrap().trim().to_string();
@@ -9,6 +9,9 @@ pub mod history;
pub mod mentions;
/// Message handlers — processes incoming Matrix room messages.
pub mod messages;
/// Permission listener — holds perm_rx for the bot's lifetime and forwards
/// permission requests to the configured Matrix room.
pub mod permission_listener;
/// Bot run loop — the main async task that drives the Matrix sync loop.
pub mod run;
/// Device verification — handles Matrix cross-signing and emoji verification flows.
@@ -0,0 +1,233 @@
//! Background task that holds `perm_rx` for the bot's lifetime and forwards
//! permission requests to the configured Matrix room.
//!
//! Before story 884, each chat message handler acquired `perm_rx` for the
//! duration of one chat_fut and dropped it afterwards. That meant whenever
//! the bot wasn't actively responding, `prompt_permission` auto-denied any
//! spawned coder bash call as "no interactive session" — making unattended
//! coder work impossible. This task holds the lock continuously while the
//! bot is connected, so requests can flow at any time.
use crate::chat::ChatTransport;
use crate::http::context::PermissionDecision;
use crate::services::Services;
use crate::slog;
use matrix_sdk::ruma::{OwnedEventId, OwnedRoomId};
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex as TokioMutex;
use super::format::markdown_to_html;
/// Spawn a background task that holds `services.perm_rx` for the bot's
/// lifetime and forwards each incoming permission request to `target_room`
/// as a chat message. Replies (yes/no) are resolved by the existing
/// `on_room_message` handler via `pending_perm_replies`.
///
/// Returns the JoinHandle so the caller can keep ownership; the task exits
/// only when the `perm_rx` channel is closed (bot shutdown).
pub fn spawn_permission_listener(
services: Arc<Services>,
transport: Arc<dyn ChatTransport>,
target_room: OwnedRoomId,
bot_sent_event_ids: Arc<TokioMutex<HashSet<OwnedEventId>>>,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let mut perm_rx = services.perm_rx.lock().await;
let target_room_str = target_room.as_str().to_string();
slog!(
"[matrix-bot] permission listener started; forwarding requests to {target_room_str}"
);
while let Some(perm_fwd) = perm_rx.recv().await {
let prompt_msg = format!(
"**Permission Request**\n\n\
Tool: `{}`\n```json\n{}\n```\n\n\
Reply **yes** to approve or **no** to deny.",
perm_fwd.tool_name,
serde_json::to_string_pretty(&perm_fwd.tool_input)
.unwrap_or_else(|_| perm_fwd.tool_input.to_string()),
);
let html = markdown_to_html(&prompt_msg);
if let Ok(msg_id) = transport
.send_message(&target_room_str, &prompt_msg, &html)
.await
&& let Ok(event_id) = msg_id.parse::<OwnedEventId>()
{
bot_sent_event_ids.lock().await.insert(event_id);
}
// Store the MCP oneshot sender so on_room_message can resolve it
// when the user replies yes/no in the target room.
services
.pending_perm_replies
.lock()
.await
.insert(target_room.to_string(), perm_fwd.response_tx);
// Spawn a per-request timeout: auto-deny if the user does not
// respond within `permission_timeout_secs`.
let pending = Arc::clone(&services.pending_perm_replies);
let timeout_room_key = target_room.to_string();
let timeout_transport = Arc::clone(&transport);
let timeout_room_str = target_room_str.clone();
let timeout_sent_ids = Arc::clone(&bot_sent_event_ids);
let timeout_secs = services.permission_timeout_secs;
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(timeout_secs)).await;
if let Some(tx) = pending.lock().await.remove(&timeout_room_key) {
let _ = tx.send(PermissionDecision::Deny);
let msg = "Permission request timed out — denied (fail-closed).";
let html = markdown_to_html(msg);
if let Ok(msg_id) = timeout_transport
.send_message(&timeout_room_str, msg, &html)
.await
&& let Ok(event_id) = msg_id.parse::<OwnedEventId>()
{
timeout_sent_ids.lock().await.insert(event_id);
}
}
});
}
slog!("[matrix-bot] permission listener exiting (perm_rx channel closed)");
})
}
#[cfg(test)]
mod tests {
//! Story 884 regression: spawn the listener with no in-flight chat
//! message, send a PermissionForward through the channel, and verify
//! the listener forwards it to the configured Matrix room (instead of
//! being auto-denied for "no interactive session").
use super::*;
use crate::http::context::PermissionForward;
use crate::services::Services;
use async_trait::async_trait;
use serde_json::json;
use std::collections::HashMap;
use tokio::sync::{mpsc, oneshot};
struct RecordingTransport {
sent: Arc<std::sync::Mutex<Vec<(String, String)>>>,
}
#[async_trait]
impl crate::chat::ChatTransport for RecordingTransport {
async fn send_message(
&self,
room_id: &str,
plain: &str,
_html: &str,
) -> Result<crate::chat::MessageId, String> {
self.sent
.lock()
.unwrap()
.push((room_id.to_string(), plain.to_string()));
// Matrix event IDs start with `$`; OwnedEventId::parse expects
// a valid form so the listener can insert it into bot_sent_event_ids.
Ok("$test_event_id:example.com".to_string())
}
async fn edit_message(
&self,
_: &str,
_: &str,
_: &str,
_: &str,
) -> Result<(), String> {
Ok(())
}
async fn send_typing(&self, _: &str, _: bool) -> Result<(), String> {
Ok(())
}
}
fn test_services_with_tx() -> (
Arc<Services>,
mpsc::UnboundedSender<PermissionForward>,
) {
let (perm_tx, perm_rx) = mpsc::unbounded_channel();
let services = Arc::new(Services {
project_root: std::path::PathBuf::from("/tmp/test"),
agents: Arc::new(crate::agents::AgentPool::new_test(3000)),
bot_name: "Assistant".to_string(),
bot_user_id: "@bot:example.com".to_string(),
ambient_rooms: Arc::new(std::sync::Mutex::new(
std::collections::HashSet::new(),
)),
perm_rx: Arc::new(TokioMutex::new(perm_rx)),
pending_perm_replies: Arc::new(TokioMutex::new(HashMap::new())),
permission_timeout_secs: 120,
status: Arc::new(crate::service::status::StatusBroadcaster::new()),
});
(services, perm_tx)
}
#[tokio::test]
async fn listener_forwards_request_to_target_room_when_no_chat_in_flight() {
let (services, perm_tx) = test_services_with_tx();
let sent: Arc<std::sync::Mutex<Vec<(String, String)>>> =
Arc::new(std::sync::Mutex::new(Vec::new()));
let transport: Arc<dyn crate::chat::ChatTransport> = Arc::new(RecordingTransport {
sent: Arc::clone(&sent),
});
let target_room: OwnedRoomId = "!perm:example.com".parse().unwrap();
let bot_sent_event_ids = Arc::new(TokioMutex::new(HashSet::new()));
spawn_permission_listener(
Arc::clone(&services),
Arc::clone(&transport),
target_room.clone(),
Arc::clone(&bot_sent_event_ids),
);
// Yield so the listener task acquires perm_rx and starts recv'ing.
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
let (response_tx, _response_rx) = oneshot::channel();
perm_tx
.send(PermissionForward {
request_id: "req-1".to_string(),
tool_name: "Bash".to_string(),
tool_input: json!({"command": "cargo test"}),
response_tx,
})
.expect("send PermissionForward");
// Give the listener a moment to process.
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
// The transport must have received exactly one send_message to the
// target room with the prompt content.
let recorded = sent.lock().unwrap().clone();
assert_eq!(recorded.len(), 1, "expected exactly one send_message");
assert_eq!(recorded[0].0, target_room.as_str());
assert!(
recorded[0].1.contains("Permission Request"),
"prompt body missing 'Permission Request' header: {}",
recorded[0].1
);
assert!(
recorded[0].1.contains("Bash"),
"prompt body missing tool name: {}",
recorded[0].1
);
// pending_perm_replies must contain an entry keyed by the target room
// (so the user-reply handler can resolve the request when they reply).
let pending = services.pending_perm_replies.lock().await;
assert!(
pending.contains_key(target_room.as_str()),
"pending_perm_replies missing entry for target room"
);
// bot_sent_event_ids must have recorded the prompt's event ID so the
// bot does not echo its own prompt back as user input.
let sent_ids = bot_sent_event_ids.lock().await;
assert_eq!(sent_ids.len(), 1, "expected one sent event ID recorded");
}
}
+21 -1
View File
@@ -275,6 +275,26 @@ pub async fn run_bot(
}
}
// Hoist bot_sent_event_ids out of BotContext so the permission listener
// can share it (the listener tracks which permission-prompt messages it
// posted so the bot doesn't echo them back as user input).
let bot_sent_event_ids: Arc<TokioMutex<HashSet<matrix_sdk::ruma::OwnedEventId>>> =
Arc::new(TokioMutex::new(HashSet::new()));
// Spawn the permission listener: holds `perm_rx` for the bot's lifetime
// and forwards permission requests to the first configured room. Story
// 884 — replaces the per-message lock acquire previously done in
// handle_message.rs, so spawned coders' bash calls reach chat even when
// the bot isn't actively responding.
if let Some(target_room) = target_room_ids.first() {
super::permission_listener::spawn_permission_listener(
Arc::clone(&services),
Arc::clone(&transport),
target_room.clone(),
Arc::clone(&bot_sent_event_ids),
);
}
let ctx = BotContext {
services,
matrix_user_id: bot_user_id,
@@ -282,7 +302,7 @@ pub async fn run_bot(
allowed_users: config.allowed_users,
history: Arc::new(TokioMutex::new(persisted)),
history_size: config.history_size,
bot_sent_event_ids: Arc::new(TokioMutex::new(HashSet::new())),
bot_sent_event_ids,
htop_sessions: Arc::new(TokioMutex::new(HashMap::new())),
transport: Arc::clone(&transport),
timer_store,
@@ -30,13 +30,12 @@ pub(crate) async fn tool_prompt_permission(
}
// Auto-deny immediately if no interactive session is currently listening on
// perm_rx. Interactive sessions (WebSocket, Matrix bot chat) hold the
// perm_rx lock for the duration of a chat. If try_lock succeeds, nobody is
// listening — this is a background agent call that should never reach chat.
//
// Without this check, agent permission requests queue in the channel and
// get forwarded to Matrix/Slack/etc. at the start of the next user session,
// flooding chat with stale agent prompts.
// perm_rx. Story 884 made the Matrix bot hold this lock for its lifetime
// via the permission_listener task spawned at startup, so requests reach
// chat asynchronously regardless of whether a chat message is in flight.
// Other transports (Discord/Slack/WhatsApp) still acquire per message; if
// none is active, try_lock succeeds — auto-deny so background agent calls
// don't queue and flood chat at the next user session.
if ctx.services.perm_rx.try_lock().is_ok() {
crate::slog!(
"[permission] Auto-denied '{tool_name}' (no interactive session — agent mode)"