use std::sync::Arc; use crate::chat::transport::matrix::{ConversationEntry, ConversationRole, RoomConversation}; use crate::http::context::{PermissionDecision}; use crate::slog; use super::WhatsAppWebhookContext; use super::format::{chunk_for_whatsapp, markdown_to_whatsapp}; use super::history::save_whatsapp_history; /// Returns `true` if the message body should be interpreted as permission approval. fn is_permission_approval(body: &str) -> bool { let trimmed = body.trim().to_ascii_lowercase(); matches!( trimmed.as_str(), "yes" | "y" | "approve" | "allow" | "ok" ) } /// Dispatch an incoming WhatsApp message to bot commands. pub(super) async fn handle_incoming_message(ctx: &WhatsAppWebhookContext, sender: &str, message: &str) { use crate::chat::commands::{CommandDispatch, try_handle_command}; // Allowlist check: when configured, silently ignore unauthorized senders. if !ctx.allowed_phones.is_empty() && !ctx.allowed_phones.iter().any(|p| p == sender) { slog!("[whatsapp] Ignoring message from unauthorized sender: {sender}"); return; } // Record this inbound message to keep the 24-hour window open. ctx.window_tracker.record_message(sender); // If there is a pending permission prompt for this sender, interpret the // message as a yes/no response instead of starting a new command/LLM flow. { let mut pending = ctx.pending_perm_replies.lock().await; if let Some(tx) = pending.remove(sender) { let decision = if is_permission_approval(message) { PermissionDecision::Approve } else { PermissionDecision::Deny }; let _ = tx.send(decision); let confirmation = if decision == PermissionDecision::Approve { "Permission approved." } else { "Permission denied." }; let formatted = markdown_to_whatsapp(confirmation); let _ = ctx.transport.send_message(sender, &formatted, "").await; return; } } let dispatch = CommandDispatch { bot_name: &ctx.bot_name, bot_user_id: &ctx.bot_user_id, project_root: &ctx.project_root, agents: &ctx.agents, ambient_rooms: &ctx.ambient_rooms, room_id: sender, }; if let Some(response) = try_handle_command(&dispatch, message) { slog!("[whatsapp] Sending command response to {sender}"); let formatted = markdown_to_whatsapp(&response); if let Err(e) = ctx.transport.send_message(sender, &formatted, "").await { slog!("[whatsapp] Failed to send reply to {sender}: {e}"); } return; } // Check for async commands (htop, delete). if let Some(htop_cmd) = crate::chat::transport::matrix::htop::extract_htop_command( message, &ctx.bot_name, &ctx.bot_user_id, ) { use crate::chat::transport::matrix::htop::HtopCommand; slog!("[whatsapp] Handling htop command from {sender}"); match htop_cmd { HtopCommand::Stop => { // htop stop — no-op on WhatsApp since there's no persistent // editable message; just acknowledge. let _ = ctx .transport .send_message(sender, "htop stopped.", "") .await; } HtopCommand::Start { duration_secs } => { // On WhatsApp, send a single snapshot instead of a live-updating // dashboard since we can't edit messages. let snapshot = crate::chat::transport::matrix::htop::build_htop_message( &ctx.agents, 0, duration_secs, ); let _ = ctx.transport.send_message(sender, &snapshot, "").await; } } return; } if let Some(del_cmd) = crate::chat::transport::matrix::delete::extract_delete_command( message, &ctx.bot_name, &ctx.bot_user_id, ) { let response = match del_cmd { crate::chat::transport::matrix::delete::DeleteCommand::Delete { story_number } => { slog!("[whatsapp] Handling delete command from {sender}: story {story_number}"); crate::chat::transport::matrix::delete::handle_delete( &ctx.bot_name, &story_number, &ctx.project_root, &ctx.agents, ) .await } crate::chat::transport::matrix::delete::DeleteCommand::BadArgs => { format!("Usage: `{} delete `", ctx.bot_name) } }; let _ = ctx.transport.send_message(sender, &response, "").await; return; } if crate::chat::transport::matrix::rebuild::extract_rebuild_command( message, &ctx.bot_name, &ctx.bot_user_id, ) .is_some() { slog!("[whatsapp] Handling rebuild command from {sender}"); let ack = "Rebuilding server… this may take a moment."; let _ = ctx.transport.send_message(sender, ack, "").await; let response = crate::chat::transport::matrix::rebuild::handle_rebuild( &ctx.bot_name, &ctx.project_root, &ctx.agents, ) .await; let _ = ctx.transport.send_message(sender, &response, "").await; return; } if let Some(rmtree_cmd) = crate::chat::transport::matrix::rmtree::extract_rmtree_command( message, &ctx.bot_name, &ctx.bot_user_id, ) { let response = match rmtree_cmd { crate::chat::transport::matrix::rmtree::RmtreeCommand::Rmtree { story_number } => { slog!("[whatsapp] Handling rmtree command from {sender}: story {story_number}"); crate::chat::transport::matrix::rmtree::handle_rmtree( &ctx.bot_name, &story_number, &ctx.project_root, &ctx.agents, ) .await } crate::chat::transport::matrix::rmtree::RmtreeCommand::BadArgs => { format!("Usage: `{} rmtree `", ctx.bot_name) } }; let _ = ctx.transport.send_message(sender, &response, "").await; return; } if crate::chat::transport::matrix::reset::extract_reset_command( message, &ctx.bot_name, &ctx.bot_user_id, ) .is_some() { slog!("[whatsapp] Handling reset command from {sender}"); { let mut guard = ctx.history.lock().await; let conv = guard.entry(sender.to_string()).or_insert_with(RoomConversation::default); conv.session_id = None; conv.entries.clear(); save_whatsapp_history(&ctx.project_root, &guard); } let _ = ctx .transport .send_message(sender, "Session cleared.", "") .await; return; } if let Some(start_cmd) = crate::chat::transport::matrix::start::extract_start_command( message, &ctx.bot_name, &ctx.bot_user_id, ) { let response = match start_cmd { crate::chat::transport::matrix::start::StartCommand::Start { story_number, agent_hint, } => { slog!("[whatsapp] Handling start command from {sender}: story {story_number}"); crate::chat::transport::matrix::start::handle_start( &ctx.bot_name, &story_number, agent_hint.as_deref(), &ctx.project_root, &ctx.agents, ) .await } crate::chat::transport::matrix::start::StartCommand::BadArgs => { format!("Usage: `{} start `", ctx.bot_name) } }; let _ = ctx.transport.send_message(sender, &response, "").await; return; } // No command matched — forward to LLM for conversational response. slog!("[whatsapp] No command matched, forwarding to LLM for {sender}"); handle_llm_message(ctx, sender, message).await; } /// Forward a message to Claude Code and send the response back via WhatsApp. async fn handle_llm_message(ctx: &WhatsAppWebhookContext, sender: &str, user_message: &str) { use crate::chat::util::drain_complete_paragraphs; use crate::llm::providers::claude_code::{ClaudeCodeProvider, ClaudeCodeResult}; use std::sync::atomic::{AtomicBool, Ordering}; use tokio::sync::watch; // Look up existing session ID for this sender. let resume_session_id: Option = { let guard = ctx.history.lock().await; guard.get(sender).and_then(|conv| conv.session_id.clone()) }; let bot_name = &ctx.bot_name; let prompt = format!( "[Your name is {bot_name}. Refer to yourself as {bot_name}, not Claude.]\n\n{sender}: {user_message}" ); let provider = ClaudeCodeProvider::new(); let (_cancel_tx, mut cancel_rx) = watch::channel(false); // Channel for sending complete chunks to the WhatsApp posting task. let (msg_tx, mut msg_rx) = tokio::sync::mpsc::unbounded_channel::(); let msg_tx_for_callback = msg_tx.clone(); // Spawn a task to post messages as they arrive. let post_transport = Arc::clone(&ctx.transport); let post_sender = sender.to_string(); let post_task = tokio::spawn(async move { while let Some(chunk) = msg_rx.recv().await { // Convert Markdown to WhatsApp formatting, then split into sized chunks. let formatted = markdown_to_whatsapp(&chunk); for part in chunk_for_whatsapp(&formatted) { let _ = post_transport.send_message(&post_sender, &part, "").await; } } }); // Shared buffer between the sync token callback and the async scope. let buffer = Arc::new(std::sync::Mutex::new(String::new())); let buffer_for_callback = Arc::clone(&buffer); let sent_any_chunk = Arc::new(AtomicBool::new(false)); let sent_any_chunk_for_callback = Arc::clone(&sent_any_chunk); let project_root_str = ctx.project_root.to_string_lossy().to_string(); let chat_fut = provider.chat_stream( &prompt, &project_root_str, resume_session_id.as_deref(), None, &mut cancel_rx, move |token| { let mut buf = buffer_for_callback.lock().unwrap(); buf.push_str(token); let paragraphs = drain_complete_paragraphs(&mut buf); for chunk in paragraphs { sent_any_chunk_for_callback.store(true, Ordering::Relaxed); let _ = msg_tx_for_callback.send(chunk); } }, |_thinking| {}, |_activity| {}, ); tokio::pin!(chat_fut); // Lock the permission receiver for the duration of this chat session. let mut perm_rx_guard = ctx.perm_rx.lock().await; let result = loop { tokio::select! { r = &mut chat_fut => break r, Some(perm_fwd) = perm_rx_guard.recv() => { let prompt_msg = format!( "*Permission Request*\n\nTool: `{}`\n```json\n{}\n```\n\nReply *yes* to approve or *no* to deny.", perm_fwd.tool_name, serde_json::to_string_pretty(&perm_fwd.tool_input) .unwrap_or_else(|_| perm_fwd.tool_input.to_string()), ); let formatted = markdown_to_whatsapp(&prompt_msg); for part in chunk_for_whatsapp(&formatted) { let _ = ctx.transport.send_message(sender, &part, "").await; } // Store the response sender so the incoming message handler // can resolve it when the user replies yes/no. ctx.pending_perm_replies .lock() .await .insert(sender.to_string(), perm_fwd.response_tx); // Spawn a timeout task: auto-deny if the user does not respond. let pending = Arc::clone(&ctx.pending_perm_replies); let timeout_sender = sender.to_string(); let timeout_transport = Arc::clone(&ctx.transport); let timeout_secs = ctx.permission_timeout_secs; tokio::spawn(async move { tokio::time::sleep(std::time::Duration::from_secs(timeout_secs)).await; if let Some(tx) = pending.lock().await.remove(&timeout_sender) { let _ = tx.send(PermissionDecision::Deny); let msg = "Permission request timed out — denied (fail-closed)."; let _ = timeout_transport.send_message(&timeout_sender, msg, "").await; } }); } } }; drop(perm_rx_guard); // Flush remaining text. let remaining = buffer.lock().unwrap().trim().to_string(); let did_send_any = sent_any_chunk.load(Ordering::Relaxed); let (assistant_reply, new_session_id) = match result { Ok(ClaudeCodeResult { messages, session_id, }) => { let reply = if !remaining.is_empty() { let _ = msg_tx.send(remaining.clone()); remaining } else if !did_send_any { let last_text = messages .iter() .rev() .find(|m| m.role == crate::llm::types::Role::Assistant && !m.content.is_empty()) .map(|m| m.content.clone()) .unwrap_or_default(); if !last_text.is_empty() { let _ = msg_tx.send(last_text.clone()); } last_text } else { remaining }; slog!("[whatsapp] session_id from chat_stream: {:?}", session_id); (reply, session_id) } Err(e) => { slog!("[whatsapp] LLM error: {e}"); let err_msg = format!("Error processing your request: {e}"); let _ = msg_tx.send(err_msg.clone()); (err_msg, None) } }; // Signal the posting task to finish and wait for it. drop(msg_tx); let _ = post_task.await; // Record this exchange in conversation history. if !assistant_reply.starts_with("Error processing") { let mut guard = ctx.history.lock().await; let conv = guard.entry(sender.to_string()).or_default(); if new_session_id.is_some() { conv.session_id = new_session_id; } conv.entries.push(ConversationEntry { role: ConversationRole::User, sender: sender.to_string(), content: user_message.to_string(), }); conv.entries.push(ConversationEntry { role: ConversationRole::Assistant, sender: String::new(), content: assistant_reply, }); // Trim to configured maximum. if conv.entries.len() > ctx.history_size { let excess = conv.entries.len() - ctx.history_size; conv.entries.drain(..excess); } save_whatsapp_history(&ctx.project_root, &guard); } } // ── Tests ─────────────────────────────────────────────────────────────── #[cfg(test)] mod tests { use crate::agents::AgentPool; use crate::io::watcher::WatcherEvent; use crate::chat::transport::matrix::{ConversationEntry, ConversationRole, RoomConversation}; use super::super::history::{MessagingWindowTracker, WhatsAppConversationHistory}; use super::super::WhatsAppWebhookContext; use super::*; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::Mutex as TokioMutex; /// Build a minimal WhatsAppWebhookContext for allowlist tests. fn make_ctx_with_allowlist( allowed_phones: Vec, ) -> Arc { struct NullTransport; #[async_trait::async_trait] impl crate::chat::ChatTransport for NullTransport { async fn send_message( &self, _room: &str, _plain: &str, _html: &str, ) -> Result { Ok(String::new()) } async fn edit_message( &self, _room: &str, _id: &str, _plain: &str, _html: &str, ) -> Result<(), String> { Ok(()) } async fn send_typing(&self, _room: &str, _typing: bool) -> Result<(), String> { Ok(()) } } let tmp = tempfile::tempdir().unwrap(); let (tx, _rx) = tokio::sync::broadcast::channel::(16); let agents = Arc::new(AgentPool::new(3999, tx)); let tracker = Arc::new(MessagingWindowTracker::new()); let (_perm_tx, perm_rx) = tokio::sync::mpsc::unbounded_channel(); Arc::new(WhatsAppWebhookContext { verify_token: "tok".to_string(), provider: "meta".to_string(), transport: Arc::new(NullTransport), project_root: tmp.path().to_path_buf(), agents, bot_name: "Bot".to_string(), bot_user_id: "whatsapp-bot".to_string(), ambient_rooms: Arc::new(std::sync::Mutex::new(Default::default())), history: Arc::new(tokio::sync::Mutex::new(Default::default())), history_size: 20, window_tracker: tracker, allowed_phones, perm_rx: Arc::new(tokio::sync::Mutex::new(perm_rx)), pending_perm_replies: Arc::new(tokio::sync::Mutex::new(Default::default())), permission_timeout_secs: 120, }) } // ── Allowlist tests ─────────────────────────────────────────────────── #[tokio::test] async fn allowlist_blocks_unauthorized_sender() { let allowed = vec!["+15551111111".to_string()]; let ctx = make_ctx_with_allowlist(allowed); let unauthorized = "+15559999999"; handle_incoming_message(&ctx, unauthorized, "hello").await; // window_tracker is only updated AFTER the allowlist check, so an // unauthorized sender must leave the tracker untouched. assert!( !ctx.window_tracker.is_within_window(unauthorized), "unauthorized sender should not have updated the window tracker" ); } #[tokio::test] async fn allowlist_empty_allows_all_senders() { // Empty allowlist = open (backwards compatible). let ctx = make_ctx_with_allowlist(vec![]); let sender = "+15551234567"; handle_incoming_message(&ctx, sender, "hello").await; // window_tracker.record_message is called right after the allowlist // check passes, so the sender should be recorded. assert!( ctx.window_tracker.is_within_window(sender), "sender should be recorded when allowlist is empty" ); } #[tokio::test] async fn allowlist_allows_listed_sender() { let sender = "+15551111111"; let ctx = make_ctx_with_allowlist(vec![sender.to_string()]); handle_incoming_message(&ctx, sender, "hello").await; assert!( ctx.window_tracker.is_within_window(sender), "listed sender should be recorded in the window tracker" ); } // ── rebuild command extraction ───────────────────────────────────── #[test] fn rebuild_command_extracted_from_plain_message() { // WhatsApp messages arrive without a bot mention prefix. // extract_rebuild_command must recognise "rebuild" by itself. let result = crate::chat::transport::matrix::rebuild::extract_rebuild_command( "rebuild", "Timmy", "@timmy:home.local", ); assert!(result.is_some(), "plain 'rebuild' should be recognised"); } #[test] fn rebuild_command_extracted_with_bot_name_prefix() { let result = crate::chat::transport::matrix::rebuild::extract_rebuild_command( "Timmy rebuild", "Timmy", "@timmy:home.local", ); assert!(result.is_some(), "'Timmy rebuild' should be recognised"); } #[test] fn non_rebuild_whatsapp_message_not_extracted() { let result = crate::chat::transport::matrix::rebuild::extract_rebuild_command( "status", "Timmy", "@timmy:home.local", ); assert!(result.is_none(), "'status' should not be recognised as rebuild"); } // ── reset command extraction ─────────────────────────────────────── #[test] fn reset_command_extracted_from_plain_message() { let result = crate::chat::transport::matrix::reset::extract_reset_command( "reset", "Timmy", "@timmy:home.local", ); assert!(result.is_some(), "plain 'reset' should be recognised"); } #[test] fn reset_command_extracted_with_bot_name_prefix() { let result = crate::chat::transport::matrix::reset::extract_reset_command( "Timmy reset", "Timmy", "@timmy:home.local", ); assert!(result.is_some(), "'Timmy reset' should be recognised"); } #[tokio::test] async fn reset_command_clears_whatsapp_session() { let sender = "+15555550100"; let history: WhatsAppConversationHistory = Arc::new(TokioMutex::new({ let mut m = HashMap::new(); m.insert(sender.to_string(), RoomConversation { session_id: Some("old-session".to_string()), entries: vec![ConversationEntry { role: ConversationRole::User, sender: sender.to_string(), content: "previous message".to_string(), }], }); m })); let tmp = tempfile::tempdir().unwrap(); let sk = tmp.path().join(".storkit"); std::fs::create_dir_all(&sk).unwrap(); { let mut guard = history.lock().await; let conv = guard.entry(sender.to_string()).or_insert_with(RoomConversation::default); conv.session_id = None; conv.entries.clear(); save_whatsapp_history(tmp.path(), &guard); } let guard = history.lock().await; let conv = guard.get(sender).unwrap(); assert!(conv.session_id.is_none(), "session_id should be cleared"); assert!(conv.entries.is_empty(), "entries should be cleared"); } #[test] fn start_command_extracted_from_plain_message() { // WhatsApp messages arrive without a bot mention prefix. // extract_start_command must recognise "start 42" by itself. let result = crate::chat::transport::matrix::start::extract_start_command( "start 42", "Timmy", "@timmy:home.local", ); assert!(result.is_some(), "plain 'start 42' should be recognised"); assert_eq!( result, Some(crate::chat::transport::matrix::start::StartCommand::Start { story_number: "42".to_string(), agent_hint: None, }) ); } #[test] fn start_command_extracted_with_bot_name_prefix() { let result = crate::chat::transport::matrix::start::extract_start_command( "Timmy start 99", "Timmy", "@timmy:home.local", ); assert!(result.is_some(), "'Timmy start 99' should be recognised"); } #[test] fn non_start_whatsapp_message_not_extracted() { let result = crate::chat::transport::matrix::start::extract_start_command( "help", "Timmy", "@timmy:home.local", ); assert!(result.is_none(), "'help' should not be recognised as start"); } // ── rmtree command extraction ────────────────────────────────────── #[test] fn rmtree_command_extracted_from_plain_message() { // WhatsApp messages arrive without a bot mention prefix. // extract_rmtree_command must recognise "rmtree 42" by itself. let result = crate::chat::transport::matrix::rmtree::extract_rmtree_command( "rmtree 42", "Timmy", "@timmy:home.local", ); assert!( matches!( result, Some(crate::chat::transport::matrix::rmtree::RmtreeCommand::Rmtree { .. }) ), "plain 'rmtree 42' should be recognised" ); } #[test] fn rmtree_command_extracted_with_bot_name_prefix() { let result = crate::chat::transport::matrix::rmtree::extract_rmtree_command( "Timmy rmtree 42", "Timmy", "@timmy:home.local", ); assert!( matches!( result, Some(crate::chat::transport::matrix::rmtree::RmtreeCommand::Rmtree { .. }) ), "'Timmy rmtree 42' should be recognised" ); } #[test] fn rmtree_command_returns_bad_args_without_number() { let result = crate::chat::transport::matrix::rmtree::extract_rmtree_command( "rmtree", "Timmy", "@timmy:home.local", ); assert_eq!( result, Some(crate::chat::transport::matrix::rmtree::RmtreeCommand::BadArgs) ); } #[test] fn non_rmtree_whatsapp_message_not_extracted() { let result = crate::chat::transport::matrix::rmtree::extract_rmtree_command( "status", "Timmy", "@timmy:home.local", ); assert!(result.is_none(), "'status' should not be recognised as rmtree"); } }