From 6521c83eecdcc59881ade750812650009408dc4f Mon Sep 17 00:00:00 2001 From: dave Date: Wed, 25 Mar 2026 15:31:54 +0000 Subject: [PATCH] storkit: merge 394_story_whatsapp_and_slack_permission_prompt_forwarding --- server/src/chat/transport/slack.rs | 121 ++++++++++++++++++++----- server/src/chat/transport/whatsapp.rs | 125 ++++++++++++++++++++++---- server/src/main.rs | 12 +++ 3 files changed, 219 insertions(+), 39 deletions(-) diff --git a/server/src/chat/transport/slack.rs b/server/src/chat/transport/slack.rs index 7a1f0f1a..89046c22 100644 --- a/server/src/chat/transport/slack.rs +++ b/server/src/chat/transport/slack.rs @@ -11,12 +11,14 @@ use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::fmt::Write as FmtWrite; use std::sync::Arc; +use tokio::sync::oneshot; use tokio::sync::Mutex as TokioMutex; use crate::agents::AgentPool; use crate::chat::transport::matrix::{ConversationEntry, ConversationRole, RoomConversation}; use crate::slog; use crate::chat::{ChatTransport, MessageId}; +use crate::http::context::{PermissionDecision, PermissionForward}; // ── Slack API base URL (overridable for tests) ────────────────────────── @@ -506,6 +508,13 @@ pub struct SlackWebhookContext { pub history_size: usize, /// Allowed channel IDs (messages from other channels are ignored). pub channel_ids: HashSet, + /// Permission requests from the MCP `prompt_permission` tool arrive here. + pub perm_rx: Arc>>, + /// Pending permission replies keyed by channel ID. + pub pending_perm_replies: + Arc>>>, + /// Seconds before an unanswered permission prompt is auto-denied. + pub permission_timeout_secs: u64, } /// POST /webhook/slack — receive incoming events from Slack Events API. @@ -696,6 +705,15 @@ pub async fn slash_command_receive( } /// Dispatch an incoming Slack message to bot commands or LLM. +/// Returns `true` if the message body should be interpreted as permission approval. +fn is_permission_approval(body: &str) -> bool { + let trimmed = body.trim().to_ascii_lowercase(); + matches!( + trimmed.as_str(), + "yes" | "y" | "approve" | "allow" | "ok" + ) +} + async fn handle_incoming_message( ctx: &SlackWebhookContext, channel: &str, @@ -704,6 +722,28 @@ async fn handle_incoming_message( ) { use crate::chat::commands::{CommandDispatch, try_handle_command}; + // If there is a pending permission prompt for this channel, interpret the + // message as a yes/no response instead of starting a new command/LLM flow. + { + let mut pending = ctx.pending_perm_replies.lock().await; + if let Some(tx) = pending.remove(channel) { + let decision = if is_permission_approval(message) { + PermissionDecision::Approve + } else { + PermissionDecision::Deny + }; + let _ = tx.send(decision); + let confirmation = if decision == PermissionDecision::Approve { + "Permission approved." + } else { + "Permission denied." + }; + let formatted = markdown_to_slack(confirmation); + let _ = ctx.transport.send_message(channel, &formatted, "").await; + return; + } + } + let dispatch = CommandDispatch { bot_name: &ctx.bot_name, bot_user_id: &ctx.bot_user_id, @@ -856,26 +896,67 @@ async fn handle_llm_message( let sent_any_chunk_for_callback = Arc::clone(&sent_any_chunk); let project_root_str = ctx.project_root.to_string_lossy().to_string(); - let result = provider - .chat_stream( - &prompt, - &project_root_str, - resume_session_id.as_deref(), - None, - &mut cancel_rx, - move |token| { - let mut buf = buffer_for_callback.lock().unwrap(); - buf.push_str(token); - let paragraphs = drain_complete_paragraphs(&mut buf); - for chunk in paragraphs { - sent_any_chunk_for_callback.store(true, Ordering::Relaxed); - let _ = msg_tx_for_callback.send(chunk); - } - }, - |_thinking| {}, - |_activity| {}, - ) - .await; + let chat_fut = provider.chat_stream( + &prompt, + &project_root_str, + resume_session_id.as_deref(), + None, + &mut cancel_rx, + move |token| { + let mut buf = buffer_for_callback.lock().unwrap(); + buf.push_str(token); + let paragraphs = drain_complete_paragraphs(&mut buf); + for chunk in paragraphs { + sent_any_chunk_for_callback.store(true, Ordering::Relaxed); + let _ = msg_tx_for_callback.send(chunk); + } + }, + |_thinking| {}, + |_activity| {}, + ); + tokio::pin!(chat_fut); + + // Lock the permission receiver for the duration of this chat session. + let mut perm_rx_guard = ctx.perm_rx.lock().await; + + let result = loop { + tokio::select! { + r = &mut chat_fut => break r, + + Some(perm_fwd) = perm_rx_guard.recv() => { + let prompt_msg = format!( + "*Permission Request*\n\nTool: `{}`\n```json\n{}\n```\n\nReply *yes* to approve or *no* to deny.", + perm_fwd.tool_name, + serde_json::to_string_pretty(&perm_fwd.tool_input) + .unwrap_or_else(|_| perm_fwd.tool_input.to_string()), + ); + let formatted = markdown_to_slack(&prompt_msg); + let _ = ctx.transport.send_message(channel, &formatted, "").await; + + // Store the response sender so the incoming message handler + // can resolve it when the user replies yes/no. + ctx.pending_perm_replies + .lock() + .await + .insert(channel.to_string(), perm_fwd.response_tx); + + // Spawn a timeout task: auto-deny if the user does not respond. + let pending = Arc::clone(&ctx.pending_perm_replies); + let timeout_channel = channel.to_string(); + let timeout_transport = Arc::clone(&ctx.transport); + let timeout_secs = ctx.permission_timeout_secs; + tokio::spawn(async move { + tokio::time::sleep(std::time::Duration::from_secs(timeout_secs)).await; + if let Some(tx) = pending.lock().await.remove(&timeout_channel) { + let _ = tx.send(PermissionDecision::Deny); + let msg = "Permission request timed out — denied (fail-closed)."; + let _ = timeout_transport.send_message(&timeout_channel, msg, "").await; + } + }); + } + } + }; + drop(perm_rx_guard); // Flush remaining text. let remaining = buffer.lock().unwrap().trim().to_string(); diff --git a/server/src/chat/transport/whatsapp.rs b/server/src/chat/transport/whatsapp.rs index 21083720..898b488b 100644 --- a/server/src/chat/transport/whatsapp.rs +++ b/server/src/chat/transport/whatsapp.rs @@ -11,11 +11,13 @@ use async_trait::async_trait; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::sync::Arc; +use tokio::sync::oneshot; use tokio::sync::Mutex as TokioMutex; use crate::agents::AgentPool; use crate::chat::transport::matrix::{ConversationEntry, ConversationRole, RoomConversation}; use crate::chat::{ChatTransport, MessageId}; +use crate::http::context::{PermissionDecision, PermissionForward}; use crate::slog; // ── API base URLs (overridable for tests) ──────────────────────────────── @@ -884,6 +886,13 @@ pub struct WhatsAppWebhookContext { /// Phone numbers allowed to send messages to the bot. /// When empty, all numbers are allowed (backwards compatible). pub allowed_phones: Vec, + /// Permission requests from the MCP `prompt_permission` tool arrive here. + pub perm_rx: Arc>>, + /// Pending permission replies keyed by sender phone number. + pub pending_perm_replies: + Arc>>>, + /// Seconds before an unanswered permission prompt is auto-denied. + pub permission_timeout_secs: u64, } /// GET /webhook/whatsapp — webhook verification. @@ -976,6 +985,15 @@ pub async fn webhook_receive( Response::builder().status(StatusCode::OK).body("ok") } +/// Returns `true` if the message body should be interpreted as permission approval. +fn is_permission_approval(body: &str) -> bool { + let trimmed = body.trim().to_ascii_lowercase(); + matches!( + trimmed.as_str(), + "yes" | "y" | "approve" | "allow" | "ok" + ) +} + /// Dispatch an incoming WhatsApp message to bot commands. async fn handle_incoming_message(ctx: &WhatsAppWebhookContext, sender: &str, message: &str) { use crate::chat::commands::{CommandDispatch, try_handle_command}; @@ -991,6 +1009,28 @@ async fn handle_incoming_message(ctx: &WhatsAppWebhookContext, sender: &str, mes // Record this inbound message to keep the 24-hour window open. ctx.window_tracker.record_message(sender); + // If there is a pending permission prompt for this sender, interpret the + // message as a yes/no response instead of starting a new command/LLM flow. + { + let mut pending = ctx.pending_perm_replies.lock().await; + if let Some(tx) = pending.remove(sender) { + let decision = if is_permission_approval(message) { + PermissionDecision::Approve + } else { + PermissionDecision::Deny + }; + let _ = tx.send(decision); + let confirmation = if decision == PermissionDecision::Approve { + "Permission approved." + } else { + "Permission denied." + }; + let formatted = markdown_to_whatsapp(confirmation); + let _ = ctx.transport.send_message(sender, &formatted, "").await; + return; + } + } + let dispatch = CommandDispatch { bot_name: &ctx.bot_name, bot_user_id: &ctx.bot_user_id, @@ -1114,26 +1154,69 @@ async fn handle_llm_message(ctx: &WhatsAppWebhookContext, sender: &str, user_mes let sent_any_chunk_for_callback = Arc::clone(&sent_any_chunk); let project_root_str = ctx.project_root.to_string_lossy().to_string(); - let result = provider - .chat_stream( - &prompt, - &project_root_str, - resume_session_id.as_deref(), - None, - &mut cancel_rx, - move |token| { - let mut buf = buffer_for_callback.lock().unwrap(); - buf.push_str(token); - let paragraphs = drain_complete_paragraphs(&mut buf); - for chunk in paragraphs { - sent_any_chunk_for_callback.store(true, Ordering::Relaxed); - let _ = msg_tx_for_callback.send(chunk); + let chat_fut = provider.chat_stream( + &prompt, + &project_root_str, + resume_session_id.as_deref(), + None, + &mut cancel_rx, + move |token| { + let mut buf = buffer_for_callback.lock().unwrap(); + buf.push_str(token); + let paragraphs = drain_complete_paragraphs(&mut buf); + for chunk in paragraphs { + sent_any_chunk_for_callback.store(true, Ordering::Relaxed); + let _ = msg_tx_for_callback.send(chunk); + } + }, + |_thinking| {}, + |_activity| {}, + ); + tokio::pin!(chat_fut); + + // Lock the permission receiver for the duration of this chat session. + let mut perm_rx_guard = ctx.perm_rx.lock().await; + + let result = loop { + tokio::select! { + r = &mut chat_fut => break r, + + Some(perm_fwd) = perm_rx_guard.recv() => { + let prompt_msg = format!( + "*Permission Request*\n\nTool: `{}`\n```json\n{}\n```\n\nReply *yes* to approve or *no* to deny.", + perm_fwd.tool_name, + serde_json::to_string_pretty(&perm_fwd.tool_input) + .unwrap_or_else(|_| perm_fwd.tool_input.to_string()), + ); + let formatted = markdown_to_whatsapp(&prompt_msg); + for part in chunk_for_whatsapp(&formatted) { + let _ = ctx.transport.send_message(sender, &part, "").await; } - }, - |_thinking| {}, - |_activity| {}, - ) - .await; + + // Store the response sender so the incoming message handler + // can resolve it when the user replies yes/no. + ctx.pending_perm_replies + .lock() + .await + .insert(sender.to_string(), perm_fwd.response_tx); + + // Spawn a timeout task: auto-deny if the user does not respond. + let pending = Arc::clone(&ctx.pending_perm_replies); + let timeout_sender = sender.to_string(); + let timeout_transport = Arc::clone(&ctx.transport); + let timeout_secs = ctx.permission_timeout_secs; + tokio::spawn(async move { + tokio::time::sleep(std::time::Duration::from_secs(timeout_secs)).await; + if let Some(tx) = pending.lock().await.remove(&timeout_sender) { + let _ = tx.send(PermissionDecision::Deny); + let msg = "Permission request timed out — denied (fail-closed)."; + let _ = timeout_transport.send_message(&timeout_sender, msg, "").await; + } + }); + } + } + }; + drop(perm_rx_guard); // Flush remaining text. let remaining = buffer.lock().unwrap().trim().to_string(); @@ -1870,6 +1953,7 @@ mod tests { let (tx, _rx) = tokio::sync::broadcast::channel::(16); let agents = Arc::new(AgentPool::new(3999, tx)); let tracker = Arc::new(MessagingWindowTracker::new()); + let (_perm_tx, perm_rx) = tokio::sync::mpsc::unbounded_channel(); Arc::new(WhatsAppWebhookContext { verify_token: "tok".to_string(), provider: "meta".to_string(), @@ -1883,6 +1967,9 @@ mod tests { history_size: 20, window_tracker: tracker, allowed_phones, + perm_rx: Arc::new(tokio::sync::Mutex::new(perm_rx)), + pending_perm_replies: Arc::new(tokio::sync::Mutex::new(Default::default())), + permission_timeout_secs: 120, }) } diff --git a/server/src/main.rs b/server/src/main.rs index b8f696c1..2d585805 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -254,6 +254,8 @@ async fn main() -> Result<(), std::io::Error> { // handler (via AppContext) and the Matrix bot. let perm_rx = Arc::new(tokio::sync::Mutex::new(perm_rx)); let perm_rx_for_bot = Arc::clone(&perm_rx); + let perm_rx_for_whatsapp = Arc::clone(&perm_rx); + let perm_rx_for_slack = Arc::clone(&perm_rx); // Capture project root, agents Arc, and reconciliation sender before ctx // is consumed by build_routes. @@ -307,6 +309,11 @@ async fn main() -> Result<(), std::io::Error> { history_size: cfg.history_size, window_tracker: Arc::new(chat::transport::whatsapp::MessagingWindowTracker::new()), allowed_phones: cfg.whatsapp_allowed_phones.clone(), + perm_rx: perm_rx_for_whatsapp, + pending_perm_replies: Arc::new(tokio::sync::Mutex::new( + std::collections::HashMap::new(), + )), + permission_timeout_secs: cfg.permission_timeout_secs, }) }); @@ -338,6 +345,11 @@ async fn main() -> Result<(), std::io::Error> { history: std::sync::Arc::new(tokio::sync::Mutex::new(history)), history_size: cfg.history_size, channel_ids, + perm_rx: perm_rx_for_slack, + pending_perm_replies: Arc::new(tokio::sync::Mutex::new( + std::collections::HashMap::new(), + )), + permission_timeout_secs: cfg.permission_timeout_secs, }) });