storkit: merge 394_story_whatsapp_and_slack_permission_prompt_forwarding

This commit is contained in:
dave
2026-03-25 15:31:54 +00:00
parent 65e3643655
commit 6521c83eec
3 changed files with 219 additions and 39 deletions
+101 -20
View File
@@ -11,12 +11,14 @@ use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt::Write as FmtWrite;
use std::sync::Arc;
use tokio::sync::oneshot;
use tokio::sync::Mutex as TokioMutex;
use crate::agents::AgentPool;
use crate::chat::transport::matrix::{ConversationEntry, ConversationRole, RoomConversation};
use crate::slog;
use crate::chat::{ChatTransport, MessageId};
use crate::http::context::{PermissionDecision, PermissionForward};
// ── Slack API base URL (overridable for tests) ──────────────────────────
@@ -506,6 +508,13 @@ pub struct SlackWebhookContext {
pub history_size: usize,
/// Allowed channel IDs (messages from other channels are ignored).
pub channel_ids: HashSet<String>,
/// Permission requests from the MCP `prompt_permission` tool arrive here.
pub perm_rx: Arc<TokioMutex<tokio::sync::mpsc::UnboundedReceiver<PermissionForward>>>,
/// Pending permission replies keyed by channel ID.
pub pending_perm_replies:
Arc<TokioMutex<HashMap<String, oneshot::Sender<PermissionDecision>>>>,
/// Seconds before an unanswered permission prompt is auto-denied.
pub permission_timeout_secs: u64,
}
/// POST /webhook/slack — receive incoming events from Slack Events API.
@@ -696,6 +705,15 @@ pub async fn slash_command_receive(
}
/// Dispatch an incoming Slack message to bot commands or LLM.
/// Returns `true` if the message body should be interpreted as permission approval.
fn is_permission_approval(body: &str) -> bool {
let trimmed = body.trim().to_ascii_lowercase();
matches!(
trimmed.as_str(),
"yes" | "y" | "approve" | "allow" | "ok"
)
}
async fn handle_incoming_message(
ctx: &SlackWebhookContext,
channel: &str,
@@ -704,6 +722,28 @@ async fn handle_incoming_message(
) {
use crate::chat::commands::{CommandDispatch, try_handle_command};
// If there is a pending permission prompt for this channel, interpret the
// message as a yes/no response instead of starting a new command/LLM flow.
{
let mut pending = ctx.pending_perm_replies.lock().await;
if let Some(tx) = pending.remove(channel) {
let decision = if is_permission_approval(message) {
PermissionDecision::Approve
} else {
PermissionDecision::Deny
};
let _ = tx.send(decision);
let confirmation = if decision == PermissionDecision::Approve {
"Permission approved."
} else {
"Permission denied."
};
let formatted = markdown_to_slack(confirmation);
let _ = ctx.transport.send_message(channel, &formatted, "").await;
return;
}
}
let dispatch = CommandDispatch {
bot_name: &ctx.bot_name,
bot_user_id: &ctx.bot_user_id,
@@ -856,26 +896,67 @@ async fn handle_llm_message(
let sent_any_chunk_for_callback = Arc::clone(&sent_any_chunk);
let project_root_str = ctx.project_root.to_string_lossy().to_string();
let result = provider
.chat_stream(
&prompt,
&project_root_str,
resume_session_id.as_deref(),
None,
&mut cancel_rx,
move |token| {
let mut buf = buffer_for_callback.lock().unwrap();
buf.push_str(token);
let paragraphs = drain_complete_paragraphs(&mut buf);
for chunk in paragraphs {
sent_any_chunk_for_callback.store(true, Ordering::Relaxed);
let _ = msg_tx_for_callback.send(chunk);
}
},
|_thinking| {},
|_activity| {},
)
.await;
let chat_fut = provider.chat_stream(
&prompt,
&project_root_str,
resume_session_id.as_deref(),
None,
&mut cancel_rx,
move |token| {
let mut buf = buffer_for_callback.lock().unwrap();
buf.push_str(token);
let paragraphs = drain_complete_paragraphs(&mut buf);
for chunk in paragraphs {
sent_any_chunk_for_callback.store(true, Ordering::Relaxed);
let _ = msg_tx_for_callback.send(chunk);
}
},
|_thinking| {},
|_activity| {},
);
tokio::pin!(chat_fut);
// Lock the permission receiver for the duration of this chat session.
let mut perm_rx_guard = ctx.perm_rx.lock().await;
let result = loop {
tokio::select! {
r = &mut chat_fut => break r,
Some(perm_fwd) = perm_rx_guard.recv() => {
let prompt_msg = format!(
"*Permission Request*\n\nTool: `{}`\n```json\n{}\n```\n\nReply *yes* to approve or *no* to deny.",
perm_fwd.tool_name,
serde_json::to_string_pretty(&perm_fwd.tool_input)
.unwrap_or_else(|_| perm_fwd.tool_input.to_string()),
);
let formatted = markdown_to_slack(&prompt_msg);
let _ = ctx.transport.send_message(channel, &formatted, "").await;
// Store the response sender so the incoming message handler
// can resolve it when the user replies yes/no.
ctx.pending_perm_replies
.lock()
.await
.insert(channel.to_string(), perm_fwd.response_tx);
// Spawn a timeout task: auto-deny if the user does not respond.
let pending = Arc::clone(&ctx.pending_perm_replies);
let timeout_channel = channel.to_string();
let timeout_transport = Arc::clone(&ctx.transport);
let timeout_secs = ctx.permission_timeout_secs;
tokio::spawn(async move {
tokio::time::sleep(std::time::Duration::from_secs(timeout_secs)).await;
if let Some(tx) = pending.lock().await.remove(&timeout_channel) {
let _ = tx.send(PermissionDecision::Deny);
let msg = "Permission request timed out — denied (fail-closed).";
let _ = timeout_transport.send_message(&timeout_channel, msg, "").await;
}
});
}
}
};
drop(perm_rx_guard);
// Flush remaining text.
let remaining = buffer.lock().unwrap().trim().to_string();
+106 -19
View File
@@ -11,11 +11,13 @@ use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::oneshot;
use tokio::sync::Mutex as TokioMutex;
use crate::agents::AgentPool;
use crate::chat::transport::matrix::{ConversationEntry, ConversationRole, RoomConversation};
use crate::chat::{ChatTransport, MessageId};
use crate::http::context::{PermissionDecision, PermissionForward};
use crate::slog;
// ── API base URLs (overridable for tests) ────────────────────────────────
@@ -884,6 +886,13 @@ pub struct WhatsAppWebhookContext {
/// Phone numbers allowed to send messages to the bot.
/// When empty, all numbers are allowed (backwards compatible).
pub allowed_phones: Vec<String>,
/// Permission requests from the MCP `prompt_permission` tool arrive here.
pub perm_rx: Arc<TokioMutex<tokio::sync::mpsc::UnboundedReceiver<PermissionForward>>>,
/// Pending permission replies keyed by sender phone number.
pub pending_perm_replies:
Arc<TokioMutex<HashMap<String, oneshot::Sender<PermissionDecision>>>>,
/// Seconds before an unanswered permission prompt is auto-denied.
pub permission_timeout_secs: u64,
}
/// GET /webhook/whatsapp — webhook verification.
@@ -976,6 +985,15 @@ pub async fn webhook_receive(
Response::builder().status(StatusCode::OK).body("ok")
}
/// Returns `true` if the message body should be interpreted as permission approval.
fn is_permission_approval(body: &str) -> bool {
let trimmed = body.trim().to_ascii_lowercase();
matches!(
trimmed.as_str(),
"yes" | "y" | "approve" | "allow" | "ok"
)
}
/// Dispatch an incoming WhatsApp message to bot commands.
async fn handle_incoming_message(ctx: &WhatsAppWebhookContext, sender: &str, message: &str) {
use crate::chat::commands::{CommandDispatch, try_handle_command};
@@ -991,6 +1009,28 @@ async fn handle_incoming_message(ctx: &WhatsAppWebhookContext, sender: &str, mes
// Record this inbound message to keep the 24-hour window open.
ctx.window_tracker.record_message(sender);
// If there is a pending permission prompt for this sender, interpret the
// message as a yes/no response instead of starting a new command/LLM flow.
{
let mut pending = ctx.pending_perm_replies.lock().await;
if let Some(tx) = pending.remove(sender) {
let decision = if is_permission_approval(message) {
PermissionDecision::Approve
} else {
PermissionDecision::Deny
};
let _ = tx.send(decision);
let confirmation = if decision == PermissionDecision::Approve {
"Permission approved."
} else {
"Permission denied."
};
let formatted = markdown_to_whatsapp(confirmation);
let _ = ctx.transport.send_message(sender, &formatted, "").await;
return;
}
}
let dispatch = CommandDispatch {
bot_name: &ctx.bot_name,
bot_user_id: &ctx.bot_user_id,
@@ -1114,26 +1154,69 @@ async fn handle_llm_message(ctx: &WhatsAppWebhookContext, sender: &str, user_mes
let sent_any_chunk_for_callback = Arc::clone(&sent_any_chunk);
let project_root_str = ctx.project_root.to_string_lossy().to_string();
let result = provider
.chat_stream(
&prompt,
&project_root_str,
resume_session_id.as_deref(),
None,
&mut cancel_rx,
move |token| {
let mut buf = buffer_for_callback.lock().unwrap();
buf.push_str(token);
let paragraphs = drain_complete_paragraphs(&mut buf);
for chunk in paragraphs {
sent_any_chunk_for_callback.store(true, Ordering::Relaxed);
let _ = msg_tx_for_callback.send(chunk);
let chat_fut = provider.chat_stream(
&prompt,
&project_root_str,
resume_session_id.as_deref(),
None,
&mut cancel_rx,
move |token| {
let mut buf = buffer_for_callback.lock().unwrap();
buf.push_str(token);
let paragraphs = drain_complete_paragraphs(&mut buf);
for chunk in paragraphs {
sent_any_chunk_for_callback.store(true, Ordering::Relaxed);
let _ = msg_tx_for_callback.send(chunk);
}
},
|_thinking| {},
|_activity| {},
);
tokio::pin!(chat_fut);
// Lock the permission receiver for the duration of this chat session.
let mut perm_rx_guard = ctx.perm_rx.lock().await;
let result = loop {
tokio::select! {
r = &mut chat_fut => break r,
Some(perm_fwd) = perm_rx_guard.recv() => {
let prompt_msg = format!(
"*Permission Request*\n\nTool: `{}`\n```json\n{}\n```\n\nReply *yes* to approve or *no* to deny.",
perm_fwd.tool_name,
serde_json::to_string_pretty(&perm_fwd.tool_input)
.unwrap_or_else(|_| perm_fwd.tool_input.to_string()),
);
let formatted = markdown_to_whatsapp(&prompt_msg);
for part in chunk_for_whatsapp(&formatted) {
let _ = ctx.transport.send_message(sender, &part, "").await;
}
},
|_thinking| {},
|_activity| {},
)
.await;
// Store the response sender so the incoming message handler
// can resolve it when the user replies yes/no.
ctx.pending_perm_replies
.lock()
.await
.insert(sender.to_string(), perm_fwd.response_tx);
// Spawn a timeout task: auto-deny if the user does not respond.
let pending = Arc::clone(&ctx.pending_perm_replies);
let timeout_sender = sender.to_string();
let timeout_transport = Arc::clone(&ctx.transport);
let timeout_secs = ctx.permission_timeout_secs;
tokio::spawn(async move {
tokio::time::sleep(std::time::Duration::from_secs(timeout_secs)).await;
if let Some(tx) = pending.lock().await.remove(&timeout_sender) {
let _ = tx.send(PermissionDecision::Deny);
let msg = "Permission request timed out — denied (fail-closed).";
let _ = timeout_transport.send_message(&timeout_sender, msg, "").await;
}
});
}
}
};
drop(perm_rx_guard);
// Flush remaining text.
let remaining = buffer.lock().unwrap().trim().to_string();
@@ -1870,6 +1953,7 @@ mod tests {
let (tx, _rx) = tokio::sync::broadcast::channel::<WatcherEvent>(16);
let agents = Arc::new(AgentPool::new(3999, tx));
let tracker = Arc::new(MessagingWindowTracker::new());
let (_perm_tx, perm_rx) = tokio::sync::mpsc::unbounded_channel();
Arc::new(WhatsAppWebhookContext {
verify_token: "tok".to_string(),
provider: "meta".to_string(),
@@ -1883,6 +1967,9 @@ mod tests {
history_size: 20,
window_tracker: tracker,
allowed_phones,
perm_rx: Arc::new(tokio::sync::Mutex::new(perm_rx)),
pending_perm_replies: Arc::new(tokio::sync::Mutex::new(Default::default())),
permission_timeout_secs: 120,
})
}
+12
View File
@@ -254,6 +254,8 @@ async fn main() -> Result<(), std::io::Error> {
// handler (via AppContext) and the Matrix bot.
let perm_rx = Arc::new(tokio::sync::Mutex::new(perm_rx));
let perm_rx_for_bot = Arc::clone(&perm_rx);
let perm_rx_for_whatsapp = Arc::clone(&perm_rx);
let perm_rx_for_slack = Arc::clone(&perm_rx);
// Capture project root, agents Arc, and reconciliation sender before ctx
// is consumed by build_routes.
@@ -307,6 +309,11 @@ async fn main() -> Result<(), std::io::Error> {
history_size: cfg.history_size,
window_tracker: Arc::new(chat::transport::whatsapp::MessagingWindowTracker::new()),
allowed_phones: cfg.whatsapp_allowed_phones.clone(),
perm_rx: perm_rx_for_whatsapp,
pending_perm_replies: Arc::new(tokio::sync::Mutex::new(
std::collections::HashMap::new(),
)),
permission_timeout_secs: cfg.permission_timeout_secs,
})
});
@@ -338,6 +345,11 @@ async fn main() -> Result<(), std::io::Error> {
history: std::sync::Arc::new(tokio::sync::Mutex::new(history)),
history_size: cfg.history_size,
channel_ids,
perm_rx: perm_rx_for_slack,
pending_perm_replies: Arc::new(tokio::sync::Mutex::new(
std::collections::HashMap::new(),
)),
permission_timeout_secs: cfg.permission_timeout_secs,
})
});