diff --git a/server/src/chat/transport/whatsapp.rs b/server/src/chat/transport/whatsapp.rs deleted file mode 100644 index 7e9f4da5..00000000 --- a/server/src/chat/transport/whatsapp.rs +++ /dev/null @@ -1,2302 +0,0 @@ -//! WhatsApp Business API integration. -//! -//! Provides: -//! - [`WhatsAppTransport`] — a [`ChatTransport`] that sends messages via the -//! Meta Graph API (`graph.facebook.com/v21.0/{phone_number_id}/messages`). -//! - [`MessagingWindowTracker`] — tracks the 24-hour messaging window per user. -//! - [`webhook_verify`] / [`webhook_receive`] — Poem handlers for the WhatsApp -//! webhook (GET verification handshake + POST incoming messages). - -use async_trait::async_trait; -use serde::{Deserialize, Serialize}; -use std::collections::HashMap; -use std::sync::Arc; -use tokio::sync::oneshot; -use tokio::sync::Mutex as TokioMutex; - -use crate::agents::AgentPool; -use crate::chat::transport::matrix::{ConversationEntry, ConversationRole, RoomConversation}; -use crate::chat::{ChatTransport, MessageId}; -use crate::http::context::{PermissionDecision, PermissionForward}; -use crate::slog; - -// ── API base URLs (overridable for tests) ──────────────────────────────── - -const GRAPH_API_BASE: &str = "https://graph.facebook.com/v21.0"; -const TWILIO_API_BASE: &str = "https://api.twilio.com"; - -/// Graph API error code indicating the 24-hour messaging window has elapsed. -/// -/// When Meta returns this code the caller must fall back to an approved message -/// template instead of free-form text. -const ERROR_CODE_OUTSIDE_WINDOW: i64 = 131047; - -/// Sentinel error string returned by [`WhatsAppTransport::send_text`] when the -/// Graph API reports that the 24-hour messaging window has expired. -const OUTSIDE_WINDOW_ERR: &str = "OUTSIDE_MESSAGING_WINDOW"; - -// ── Messaging window tracker ───────────────────────────────────────────── - -/// Tracks the 24-hour messaging window per WhatsApp phone number. -/// -/// Meta's Business API only permits free-form text messages within 24 hours of -/// the last *inbound* message from that user. After that window expires, only -/// approved message templates may be sent. -/// -/// Call [`record_message`] whenever an inbound message is received. Before -/// sending a proactive notification, call [`is_within_window`] to choose -/// between free-form text and a template message. -pub struct MessagingWindowTracker { - last_message: std::sync::Mutex>, - #[allow(dead_code)] // Used by Meta provider path (is_within_window → send_notification) - window_duration: std::time::Duration, -} - -impl Default for MessagingWindowTracker { - fn default() -> Self { - Self::new() - } -} - -impl MessagingWindowTracker { - /// Create a tracker with the standard 24-hour window. - pub fn new() -> Self { - Self { - last_message: std::sync::Mutex::new(HashMap::new()), - window_duration: std::time::Duration::from_secs(24 * 60 * 60), - } - } - - /// Create a tracker with a custom window duration (useful in tests). - #[cfg(test)] - fn with_duration(window_duration: std::time::Duration) -> Self { - Self { - last_message: std::sync::Mutex::new(HashMap::new()), - window_duration, - } - } - - /// Record that `phone` sent an inbound message right now. - pub fn record_message(&self, phone: &str) { - self.last_message - .lock() - .unwrap() - .insert(phone.to_string(), std::time::Instant::now()); - } - - /// Returns `true` when the last inbound message from `phone` arrived within - /// the 24-hour window, meaning free-form replies are still permitted. - #[allow(dead_code)] // Used by Meta provider path (send_notification) - pub fn is_within_window(&self, phone: &str) -> bool { - let map = self.last_message.lock().unwrap(); - match map.get(phone) { - Some(&instant) => instant.elapsed() < self.window_duration, - None => false, - } - } -} - -// ── WhatsApp Transport ────────────────────────────────────────────────── - -/// Real WhatsApp Business API transport. -/// -/// Sends text messages via `POST {GRAPH_API_BASE}/{phone_number_id}/messages`. -/// Falls back to approved notification templates when the 24-hour window has -/// elapsed (Meta error 131047). -pub struct WhatsAppTransport { - phone_number_id: String, - access_token: String, - client: reqwest::Client, - /// Name of the approved Meta message template used for notifications - /// outside the 24-hour messaging window. - #[allow(dead_code)] // Used by Meta provider path (send_template_notification) - notification_template_name: String, - /// Optional base URL override for tests. - api_base: String, -} - -impl WhatsAppTransport { - pub fn new( - phone_number_id: String, - access_token: String, - notification_template_name: String, - ) -> Self { - Self { - phone_number_id, - access_token, - client: reqwest::Client::new(), - notification_template_name, - api_base: GRAPH_API_BASE.to_string(), - } - } - - #[cfg(test)] - fn with_api_base(phone_number_id: String, access_token: String, api_base: String) -> Self { - Self { - phone_number_id, - access_token, - client: reqwest::Client::new(), - notification_template_name: "pipeline_notification".to_string(), - api_base, - } - } - - /// Send a free-form text message to a WhatsApp user via the Graph API. - /// - /// Returns [`OUTSIDE_WINDOW_ERR`] if the API responds with error code - /// 131047 (messaging window expired). All other errors are returned as - /// descriptive strings. - async fn send_text(&self, to: &str, body: &str) -> Result { - let url = format!("{}/{}/messages", self.api_base, self.phone_number_id); - - let payload = GraphSendMessage { - messaging_product: "whatsapp", - to, - r#type: "text", - text: GraphTextBody { body }, - }; - - let resp = self - .client - .post(&url) - .bearer_auth(&self.access_token) - .json(&payload) - .send() - .await - .map_err(|e| format!("WhatsApp API request failed: {e}"))?; - - let status = resp.status(); - let resp_text = resp - .text() - .await - .unwrap_or_else(|_| "".to_string()); - - if !status.is_success() { - // Check for 'outside messaging window' (code 131047). Return a - // distinct sentinel so callers can fall back to a template without - // crashing. - if let Ok(err_body) = serde_json::from_str::(&resp_text) - && err_body.error.as_ref().and_then(|e| e.code) == Some(ERROR_CODE_OUTSIDE_WINDOW) - { - slog!( - "[whatsapp] Outside 24-hour messaging window for {to}; \ - template required (error 131047)" - ); - return Err(OUTSIDE_WINDOW_ERR.to_string()); - } - return Err(format!("WhatsApp API returned {status}: {resp_text}")); - } - - // Extract the message ID from the response. - let parsed: GraphSendResponse = serde_json::from_str(&resp_text).map_err(|e| { - format!("Failed to parse WhatsApp API response: {e} — body: {resp_text}") - })?; - - let msg_id = parsed - .messages - .first() - .map(|m| m.id.clone()) - .unwrap_or_default(); - - Ok(msg_id) - } - - /// Send an approved template notification message. - /// - /// Used when the 24-hour window has expired and free-form text is not - /// permitted. The template must already be approved in the Meta Business - /// Manager under the name configured in `bot.toml` - /// (`whatsapp_notification_template`, default `pipeline_notification`). - /// - /// The template body is expected to accept two positional parameters: - /// `{{1}}` = story name, `{{2}}` = pipeline stage. - #[allow(dead_code)] // Meta provider path — template fallback for expired 24h window - pub async fn send_template_notification( - &self, - to: &str, - story_name: &str, - stage: &str, - ) -> Result { - let url = format!("{}/{}/messages", self.api_base, self.phone_number_id); - - let payload = GraphTemplateMessage { - messaging_product: "whatsapp", - to, - r#type: "template", - template: GraphTemplate { - name: &self.notification_template_name, - language: GraphLanguage { code: "en_US" }, - components: vec![GraphTemplateComponent { - r#type: "body", - parameters: vec![ - GraphTemplateParameter { - r#type: "text", - text: story_name.to_string(), - }, - GraphTemplateParameter { - r#type: "text", - text: stage.to_string(), - }, - ], - }], - }, - }; - - let resp = self - .client - .post(&url) - .bearer_auth(&self.access_token) - .json(&payload) - .send() - .await - .map_err(|e| format!("WhatsApp template API request failed: {e}"))?; - - let status = resp.status(); - let resp_text = resp - .text() - .await - .unwrap_or_else(|_| "".to_string()); - - if !status.is_success() { - return Err(format!( - "WhatsApp template API returned {status}: {resp_text}" - )); - } - - let parsed: GraphSendResponse = serde_json::from_str(&resp_text).map_err(|e| { - format!("Failed to parse WhatsApp template API response: {e} — body: {resp_text}") - })?; - - let msg_id = parsed - .messages - .first() - .map(|m| m.id.clone()) - .unwrap_or_default(); - - Ok(msg_id) - } - - /// Send a pipeline notification, respecting the 24-hour messaging window. - /// - /// - Within the window: sends a free-form text message. - /// - Outside the window (or if the API returns 131047): sends an approved - /// template message instead. - /// - /// This method never crashes on a messaging-window error — it always - /// attempts the template fallback and logs what happened. - #[allow(dead_code)] // Meta provider path — window-aware notification dispatch - pub async fn send_notification( - &self, - to: &str, - tracker: &MessagingWindowTracker, - story_name: &str, - stage: &str, - ) -> Result { - if tracker.is_within_window(to) { - let text = format!("Story '{story_name}' has moved to {stage}."); - match self.send_text(to, &text).await { - Ok(id) => return Ok(id), - Err(ref e) if e == OUTSIDE_WINDOW_ERR => { - // Window expired between our check and the API call — - // fall through to the template path. - slog!( - "[whatsapp] Window expired mid-flight for {to}; \ - falling back to template" - ); - } - Err(e) => return Err(e), - } - } - - // Outside window — use the approved template. - slog!("[whatsapp] Sending template notification to {to} (outside 24h window)"); - self.send_template_notification(to, story_name, stage).await - } -} - -#[async_trait] -impl ChatTransport for WhatsAppTransport { - async fn send_message( - &self, - recipient: &str, - plain: &str, - _html: &str, - ) -> Result { - slog!("[whatsapp] send_message to {recipient}: {plain:.80}"); - match self.send_text(recipient, plain).await { - Ok(id) => Ok(id), - Err(ref e) if e == OUTSIDE_WINDOW_ERR => { - // Graceful degradation: log and surface a meaningful error - // rather than crashing. Callers sending command responses - // should normally be within the window; this handles the edge - // case where processing was delayed. - slog!( - "[whatsapp] Cannot send to {recipient}: outside 24h window \ - (message dropped)" - ); - Err(format!( - "Outside 24-hour messaging window for {recipient}; \ - send a message to the bot first to re-open the window" - )) - } - Err(e) => Err(e), - } - } - - async fn edit_message( - &self, - recipient: &str, - _original_message_id: &str, - plain: &str, - html: &str, - ) -> Result<(), String> { - // WhatsApp does not support message editing — send a new message. - slog!("[whatsapp] edit_message — WhatsApp does not support edits, sending new message"); - self.send_message(recipient, plain, html).await.map(|_| ()) - } - - async fn send_typing(&self, _recipient: &str, _typing: bool) -> Result<(), String> { - // WhatsApp Business API does not expose typing indicators. - Ok(()) - } -} - -// ── Twilio Transport ──────────────────────────────────────────────────── - -/// WhatsApp transport that routes through Twilio's REST API. -/// -/// Sends messages via `POST {TWILIO_API_BASE}/2010-04-01/Accounts/{account_sid}/Messages.json` -/// using HTTP Basic Auth (Account SID as username, Auth Token as password). -/// -/// Inbound messages from Twilio arrive as `application/x-www-form-urlencoded` -/// POST bodies; use [`extract_twilio_text_messages`] to parse them. -pub struct TwilioWhatsAppTransport { - account_sid: String, - auth_token: String, - /// Sender number in E.164 format, e.g. `+14155551234`. - from_number: String, - client: reqwest::Client, - /// Optional base URL override for tests. - api_base: String, -} - -impl TwilioWhatsAppTransport { - pub fn new(account_sid: String, auth_token: String, from_number: String) -> Self { - Self { - account_sid, - auth_token, - from_number, - client: reqwest::Client::new(), - api_base: TWILIO_API_BASE.to_string(), - } - } - - #[cfg(test)] - fn with_api_base( - account_sid: String, - auth_token: String, - from_number: String, - api_base: String, - ) -> Self { - Self { - account_sid, - auth_token, - from_number, - client: reqwest::Client::new(), - api_base, - } - } - - /// Send a WhatsApp message via Twilio's Messaging REST API. - async fn send_text(&self, to: &str, body: &str) -> Result { - let url = format!( - "{}/2010-04-01/Accounts/{}/Messages.json", - self.api_base, self.account_sid - ); - - // Twilio expects the WhatsApp number with a "whatsapp:" prefix. - let from = if self.from_number.starts_with("whatsapp:") { - self.from_number.clone() - } else { - format!("whatsapp:{}", self.from_number) - }; - let to_wa = if to.starts_with("whatsapp:") { - to.to_string() - } else { - format!("whatsapp:{}", to) - }; - - let params = [ - ("From", from.as_str()), - ("To", to_wa.as_str()), - ("Body", body), - ]; - - let resp = self - .client - .post(&url) - .basic_auth(&self.account_sid, Some(&self.auth_token)) - .form(¶ms) - .send() - .await - .map_err(|e| format!("Twilio API request failed: {e}"))?; - - let status = resp.status(); - let resp_text = resp - .text() - .await - .unwrap_or_else(|_| "".to_string()); - - if !status.is_success() { - return Err(format!("Twilio API returned {status}: {resp_text}")); - } - - let parsed: TwilioSendResponse = serde_json::from_str(&resp_text) - .map_err(|e| format!("Failed to parse Twilio API response: {e} — body: {resp_text}"))?; - - Ok(parsed.sid.unwrap_or_default()) - } -} - -#[async_trait] -impl ChatTransport for TwilioWhatsAppTransport { - async fn send_message( - &self, - recipient: &str, - plain: &str, - _html: &str, - ) -> Result { - slog!("[whatsapp/twilio] send_message to {recipient}: {plain:.80}"); - self.send_text(recipient, plain).await - } - - async fn edit_message( - &self, - recipient: &str, - _original_message_id: &str, - plain: &str, - html: &str, - ) -> Result<(), String> { - // Twilio does not support message editing — send a new message. - slog!( - "[whatsapp/twilio] edit_message — Twilio does not support edits, sending new message" - ); - self.send_message(recipient, plain, html).await.map(|_| ()) - } - - async fn send_typing(&self, _recipient: &str, _typing: bool) -> Result<(), String> { - // Twilio WhatsApp API does not expose typing indicators. - Ok(()) - } -} - -// ── Twilio API request/response types ────────────────────────────────── - -#[derive(Deserialize)] -struct TwilioSendResponse { - sid: Option, -} - -// ── Twilio webhook types (Twilio → us) ───────────────────────────────── - -/// Form-encoded fields from a Twilio WhatsApp inbound webhook POST. -#[derive(Deserialize, Debug)] -pub struct TwilioWebhookForm { - /// Sender number with `whatsapp:` prefix, e.g. `whatsapp:+15551234567`. - #[serde(rename = "From")] - pub from: Option, - /// Message body text. - #[serde(rename = "Body")] - pub body: Option, -} - -/// Extract text messages from a Twilio form-encoded webhook body. -/// -/// Returns `(sender_phone, message_body)` pairs, with the `whatsapp:` prefix -/// stripped from the sender number. -pub fn extract_twilio_text_messages(bytes: &[u8]) -> Vec<(String, String)> { - let form: TwilioWebhookForm = match serde_urlencoded::from_bytes(bytes) { - Ok(f) => f, - Err(e) => { - slog!("[whatsapp/twilio] Failed to parse webhook form body: {e}"); - return vec![]; - } - }; - - let from = match form.from { - Some(f) => f, - None => return vec![], - }; - let body = match form.body { - Some(b) if !b.is_empty() => b, - _ => return vec![], - }; - - // Strip the "whatsapp:" prefix so the sender is stored as a plain phone number. - let sender = from.strip_prefix("whatsapp:").unwrap_or(&from).to_string(); - - vec![(sender, body)] -} - -// ── Graph API request/response types ──────────────────────────────────── - -#[derive(Serialize)] -struct GraphSendMessage<'a> { - messaging_product: &'a str, - to: &'a str, - r#type: &'a str, - text: GraphTextBody<'a>, -} - -#[derive(Serialize)] -struct GraphTextBody<'a> { - body: &'a str, -} - -#[derive(Deserialize)] -struct GraphSendResponse { - #[serde(default)] - messages: Vec, -} - -#[derive(Deserialize)] -struct GraphMessageId { - id: String, -} - -// ── Graph API error response types ───────────────────────────────────── - -#[derive(Deserialize)] -struct GraphApiErrorResponse { - error: Option, -} - -#[derive(Deserialize)] -struct GraphApiError { - code: Option, - #[allow(dead_code)] - message: Option, -} - -// ── Template message types ────────────────────────────────────────────── - -#[allow(dead_code)] // Meta provider path — template message types -#[derive(Serialize)] -struct GraphTemplateMessage<'a> { - messaging_product: &'a str, - to: &'a str, - r#type: &'a str, - template: GraphTemplate<'a>, -} - -#[allow(dead_code)] -#[derive(Serialize)] -struct GraphTemplate<'a> { - name: &'a str, - language: GraphLanguage, - components: Vec, -} - -#[allow(dead_code)] -#[derive(Serialize)] -struct GraphLanguage { - code: &'static str, -} - -#[allow(dead_code)] -#[derive(Serialize)] -struct GraphTemplateComponent { - r#type: &'static str, - parameters: Vec, -} - -#[allow(dead_code)] -#[derive(Serialize)] -struct GraphTemplateParameter { - r#type: &'static str, - text: String, -} - -// ── Webhook types (Meta → us) ─────────────────────────────────────────── - -/// Top-level webhook payload from Meta. -#[derive(Deserialize, Debug)] -pub struct WebhookPayload { - #[serde(default)] - pub entry: Vec, -} - -#[derive(Deserialize, Debug)] -pub struct WebhookEntry { - #[serde(default)] - pub changes: Vec, -} - -#[derive(Deserialize, Debug)] -pub struct WebhookChange { - pub value: Option, -} - -#[derive(Deserialize, Debug)] -pub struct WebhookValue { - #[serde(default)] - pub messages: Vec, - #[allow(dead_code)] // Present in Meta webhook JSON, kept for deserialization - pub metadata: Option, -} - -#[derive(Deserialize, Debug)] -pub struct WebhookMetadata { - #[allow(dead_code)] - pub phone_number_id: Option, -} - -#[derive(Deserialize, Debug)] -pub struct WebhookMessage { - pub from: Option, - pub r#type: Option, - pub text: Option, -} - -#[derive(Deserialize, Debug)] -pub struct WebhookText { - pub body: Option, -} - -/// Extract text messages from a webhook payload. -/// -/// Returns `(sender_phone, message_body)` pairs. -pub fn extract_text_messages(payload: &WebhookPayload) -> Vec<(String, String)> { - let mut messages = Vec::new(); - for entry in &payload.entry { - for change in &entry.changes { - if let Some(value) = &change.value { - for msg in &value.messages { - if msg.r#type.as_deref() == Some("text") - && let (Some(from), Some(text)) = (&msg.from, &msg.text) - && let Some(body) = &text.body - { - messages.push((from.clone(), body.clone())); - } - } - } - } - } - messages -} - -// ── WhatsApp message size limit ────────────────────────────────────── - -/// WhatsApp Business API maximum message body size in characters. -const WHATSAPP_MAX_MESSAGE_LEN: usize = 4096; - -/// Split a text into chunks that fit within WhatsApp's message size limit. -/// -/// Tries to split on paragraph boundaries (`\n\n`), falling back to line -/// boundaries (`\n`), and finally hard-splitting at the character limit. -pub fn chunk_for_whatsapp(text: &str) -> Vec { - if text.len() <= WHATSAPP_MAX_MESSAGE_LEN { - return vec![text.to_string()]; - } - - let mut chunks = Vec::new(); - let mut remaining = text; - - while !remaining.is_empty() { - if remaining.len() <= WHATSAPP_MAX_MESSAGE_LEN { - chunks.push(remaining.to_string()); - break; - } - - // Find the best split point within the limit. - let window = &remaining[..WHATSAPP_MAX_MESSAGE_LEN]; - - // Prefer paragraph boundary. - let split_pos = window - .rfind("\n\n") - .or_else(|| window.rfind('\n')) - .unwrap_or(WHATSAPP_MAX_MESSAGE_LEN); - - let (chunk, rest) = remaining.split_at(split_pos); - let chunk = chunk.trim(); - if !chunk.is_empty() { - chunks.push(chunk.to_string()); - } - - // Skip the delimiter. - remaining = rest.trim_start_matches('\n'); - } - - chunks -} - -// ── Markdown → WhatsApp formatting ─────────────────────────────────── - -/// Convert standard Markdown formatting to WhatsApp-native formatting. -/// -/// WhatsApp supports a limited subset of formatting: -/// - Bold: `*text*` -/// - Italic: `_text_` -/// - Strikethrough: `~text~` -/// - Monospace / code: backtick-delimited (same as Markdown) -/// -/// This function converts common Markdown constructs so messages render -/// nicely in WhatsApp instead of showing raw Markdown syntax. -pub fn markdown_to_whatsapp(text: &str) -> String { - use regex::Regex; - use std::sync::LazyLock; - - // Regexes are compiled once and reused across calls. - static RE_FENCED_BLOCK: LazyLock = - LazyLock::new(|| Regex::new(r"(?ms)^```.*?\n(.*?)^```").unwrap()); - static RE_HEADER: LazyLock = - LazyLock::new(|| Regex::new(r"(?m)^#{1,6}\s+(.+)$").unwrap()); - static RE_BOLD_ITALIC: LazyLock = - LazyLock::new(|| Regex::new(r"\*\*\*(.+?)\*\*\*").unwrap()); - static RE_BOLD: LazyLock = - LazyLock::new(|| Regex::new(r"\*\*(.+?)\*\*").unwrap()); - static RE_STRIKETHROUGH: LazyLock = - LazyLock::new(|| Regex::new(r"~~(.+?)~~").unwrap()); - static RE_LINK: LazyLock = - LazyLock::new(|| Regex::new(r"\[([^\]]+)\]\(([^)]+)\)").unwrap()); - static RE_HR: LazyLock = - LazyLock::new(|| Regex::new(r"(?m)^---+$").unwrap()); - - // 1. Protect fenced code blocks by replacing them with placeholders. - let mut code_blocks: Vec = Vec::new(); - let protected = RE_FENCED_BLOCK.replace_all(text, |caps: ®ex::Captures| { - let idx = code_blocks.len(); - code_blocks.push(caps[0].to_string()); - format!("\x00CODEBLOCK{idx}\x00") - }); - let mut out = protected.into_owned(); - - // 2. Headers → bold text. - out = RE_HEADER.replace_all(&out, "*$1*").into_owned(); - - // 3. Bold+italic (***text***) → bold italic (*_text_*). - out = RE_BOLD_ITALIC.replace_all(&out, "*_${1}_*").into_owned(); - - // 4. Bold (**text**) → WhatsApp bold (*text*). - out = RE_BOLD.replace_all(&out, "*$1*").into_owned(); - - // 5. Strikethrough (~~text~~) → WhatsApp strikethrough (~text~). - out = RE_STRIKETHROUGH.replace_all(&out, "~$1~").into_owned(); - - // 6. Links [text](url) → text (url). - out = RE_LINK.replace_all(&out, "$1 ($2)").into_owned(); - - // 7. Horizontal rules → empty line (just remove them). - out = RE_HR.replace_all(&out, "").into_owned(); - - // 8. Restore code blocks. - for (idx, block) in code_blocks.iter().enumerate() { - out = out.replace(&format!("\x00CODEBLOCK{idx}\x00"), block); - } - - out -} - -// ── Conversation history persistence ───────────────────────────────── - -/// Per-sender conversation history, keyed by phone number. -pub type WhatsAppConversationHistory = Arc>>; - -/// On-disk format for persisted WhatsApp conversation history. -#[derive(Serialize, Deserialize)] -struct PersistedWhatsAppHistory { - senders: HashMap, -} - -/// Path to the persisted WhatsApp conversation history file. -const WHATSAPP_HISTORY_FILE: &str = ".storkit/whatsapp_history.json"; - -/// Load WhatsApp conversation history from disk. -pub fn load_whatsapp_history(project_root: &std::path::Path) -> HashMap { - let path = project_root.join(WHATSAPP_HISTORY_FILE); - let data = match std::fs::read_to_string(&path) { - Ok(d) => d, - Err(_) => return HashMap::new(), - }; - let persisted: PersistedWhatsAppHistory = match serde_json::from_str(&data) { - Ok(p) => p, - Err(e) => { - slog!("[whatsapp] Failed to parse history file: {e}"); - return HashMap::new(); - } - }; - persisted.senders -} - -/// Save WhatsApp conversation history to disk. -fn save_whatsapp_history( - project_root: &std::path::Path, - history: &HashMap, -) { - let persisted = PersistedWhatsAppHistory { - senders: history.clone(), - }; - let path = project_root.join(WHATSAPP_HISTORY_FILE); - match serde_json::to_string_pretty(&persisted) { - Ok(json) => { - if let Err(e) = std::fs::write(&path, json) { - slog!("[whatsapp] Failed to write history file: {e}"); - } - } - Err(e) => slog!("[whatsapp] Failed to serialise history: {e}"), - } -} - -// ── Webhook handlers (Poem) ──────────────────────────────────────────── - -use poem::{Request, Response, handler, http::StatusCode, web::Query}; -use std::collections::HashSet; -use std::path::PathBuf; -use std::sync::Mutex; - -/// Query parameters for the webhook verification GET request. -#[derive(Deserialize)] -pub struct VerifyQuery { - #[serde(rename = "hub.mode")] - pub hub_mode: Option, - #[serde(rename = "hub.verify_token")] - pub hub_verify_token: Option, - #[serde(rename = "hub.challenge")] - pub hub_challenge: Option, -} - -/// Shared context for webhook handlers, injected via Poem's `Data` extractor. -pub struct WhatsAppWebhookContext { - pub verify_token: String, - /// Active provider: `"meta"` (Meta Graph API) or `"twilio"` (Twilio REST API). - pub provider: String, - pub transport: Arc, - pub project_root: PathBuf, - pub agents: Arc, - pub bot_name: String, - /// The bot's "user ID" for command dispatch (e.g. "whatsapp-bot"). - pub bot_user_id: String, - pub ambient_rooms: Arc>>, - /// Per-sender conversation history for LLM passthrough. - pub history: WhatsAppConversationHistory, - /// Maximum number of conversation entries to keep per sender. - pub history_size: usize, - /// Tracks the 24-hour messaging window per user phone number. - pub window_tracker: Arc, - /// Phone numbers allowed to send messages to the bot. - /// When empty, all numbers are allowed (backwards compatible). - pub allowed_phones: Vec, - /// Permission requests from the MCP `prompt_permission` tool arrive here. - pub perm_rx: Arc>>, - /// Pending permission replies keyed by sender phone number. - pub pending_perm_replies: - Arc>>>, - /// Seconds before an unanswered permission prompt is auto-denied. - pub permission_timeout_secs: u64, -} - -/// GET /webhook/whatsapp — webhook verification. -/// -/// For Meta: responds to the `hub.mode=subscribe` challenge handshake. -/// For Twilio: Twilio does not send GET verification; always returns 200 OK. -#[handler] -pub async fn webhook_verify( - Query(q): Query, - ctx: poem::web::Data<&Arc>, -) -> Response { - // Twilio does not use a GET challenge; just acknowledge. - if ctx.provider == "twilio" { - return Response::builder().status(StatusCode::OK).body("ok"); - } - - // Meta verification handshake. - if q.hub_mode.as_deref() == Some("subscribe") - && q.hub_verify_token.as_deref() == Some(&ctx.verify_token) - && let Some(challenge) = q.hub_challenge - { - slog!("[whatsapp] Webhook verification succeeded"); - return Response::builder().status(StatusCode::OK).body(challenge); - } - slog!("[whatsapp] Webhook verification failed"); - Response::builder() - .status(StatusCode::FORBIDDEN) - .body("Verification failed") -} - -/// POST /webhook/whatsapp — receive incoming messages. -/// -/// Dispatches to the appropriate parser based on the configured provider: -/// - `"meta"`: parses Meta's JSON `WebhookPayload`. -/// - `"twilio"`: parses Twilio's `application/x-www-form-urlencoded` body. -/// -/// Both providers expect a `200 OK` response, even on parse errors. -#[handler] -pub async fn webhook_receive( - req: &Request, - body: poem::Body, - ctx: poem::web::Data<&Arc>, -) -> Response { - let _ = req; - let bytes = match body.into_bytes().await { - Ok(b) => b, - Err(e) => { - slog!("[whatsapp] Failed to read webhook body: {e}"); - return Response::builder() - .status(StatusCode::BAD_REQUEST) - .body("Bad request"); - } - }; - - let messages = if ctx.provider == "twilio" { - let msgs = extract_twilio_text_messages(&bytes); - if msgs.is_empty() { - slog!("[whatsapp/twilio] No text messages in webhook body; ignoring"); - } - msgs - } else { - let payload: WebhookPayload = match serde_json::from_slice(&bytes) { - Ok(p) => p, - Err(e) => { - slog!("[whatsapp] Failed to parse webhook payload: {e}"); - // Meta expects 200 even on parse errors to avoid retries. - return Response::builder().status(StatusCode::OK).body("ok"); - } - }; - let msgs = extract_text_messages(&payload); - if msgs.is_empty() { - // Status updates, read receipts, etc. — acknowledge silently. - return Response::builder().status(StatusCode::OK).body("ok"); - } - msgs - }; - - if messages.is_empty() { - return Response::builder().status(StatusCode::OK).body("ok"); - } - - let ctx = Arc::clone(*ctx); - tokio::spawn(async move { - for (sender, text) in messages { - slog!("[whatsapp] Message from {sender}: {text}"); - handle_incoming_message(&ctx, &sender, &text).await; - } - }); - - Response::builder().status(StatusCode::OK).body("ok") -} - -/// Returns `true` if the message body should be interpreted as permission approval. -fn is_permission_approval(body: &str) -> bool { - let trimmed = body.trim().to_ascii_lowercase(); - matches!( - trimmed.as_str(), - "yes" | "y" | "approve" | "allow" | "ok" - ) -} - -/// Dispatch an incoming WhatsApp message to bot commands. -async fn handle_incoming_message(ctx: &WhatsAppWebhookContext, sender: &str, message: &str) { - use crate::chat::commands::{CommandDispatch, try_handle_command}; - - // Allowlist check: when configured, silently ignore unauthorized senders. - if !ctx.allowed_phones.is_empty() - && !ctx.allowed_phones.iter().any(|p| p == sender) - { - slog!("[whatsapp] Ignoring message from unauthorized sender: {sender}"); - return; - } - - // Record this inbound message to keep the 24-hour window open. - ctx.window_tracker.record_message(sender); - - // If there is a pending permission prompt for this sender, interpret the - // message as a yes/no response instead of starting a new command/LLM flow. - { - let mut pending = ctx.pending_perm_replies.lock().await; - if let Some(tx) = pending.remove(sender) { - let decision = if is_permission_approval(message) { - PermissionDecision::Approve - } else { - PermissionDecision::Deny - }; - let _ = tx.send(decision); - let confirmation = if decision == PermissionDecision::Approve { - "Permission approved." - } else { - "Permission denied." - }; - let formatted = markdown_to_whatsapp(confirmation); - let _ = ctx.transport.send_message(sender, &formatted, "").await; - return; - } - } - - let dispatch = CommandDispatch { - bot_name: &ctx.bot_name, - bot_user_id: &ctx.bot_user_id, - project_root: &ctx.project_root, - agents: &ctx.agents, - ambient_rooms: &ctx.ambient_rooms, - room_id: sender, - }; - - if let Some(response) = try_handle_command(&dispatch, message) { - slog!("[whatsapp] Sending command response to {sender}"); - let formatted = markdown_to_whatsapp(&response); - if let Err(e) = ctx.transport.send_message(sender, &formatted, "").await { - slog!("[whatsapp] Failed to send reply to {sender}: {e}"); - } - return; - } - - // Check for async commands (htop, delete). - if let Some(htop_cmd) = crate::chat::transport::matrix::htop::extract_htop_command( - message, - &ctx.bot_name, - &ctx.bot_user_id, - ) { - use crate::chat::transport::matrix::htop::HtopCommand; - slog!("[whatsapp] Handling htop command from {sender}"); - match htop_cmd { - HtopCommand::Stop => { - // htop stop — no-op on WhatsApp since there's no persistent - // editable message; just acknowledge. - let _ = ctx - .transport - .send_message(sender, "htop stopped.", "") - .await; - } - HtopCommand::Start { duration_secs } => { - // On WhatsApp, send a single snapshot instead of a live-updating - // dashboard since we can't edit messages. - let snapshot = crate::chat::transport::matrix::htop::build_htop_message( - &ctx.agents, - 0, - duration_secs, - ); - let _ = ctx.transport.send_message(sender, &snapshot, "").await; - } - } - return; - } - - if let Some(del_cmd) = crate::chat::transport::matrix::delete::extract_delete_command( - message, - &ctx.bot_name, - &ctx.bot_user_id, - ) { - let response = match del_cmd { - crate::chat::transport::matrix::delete::DeleteCommand::Delete { story_number } => { - slog!("[whatsapp] Handling delete command from {sender}: story {story_number}"); - crate::chat::transport::matrix::delete::handle_delete( - &ctx.bot_name, - &story_number, - &ctx.project_root, - &ctx.agents, - ) - .await - } - crate::chat::transport::matrix::delete::DeleteCommand::BadArgs => { - format!("Usage: `{} delete `", ctx.bot_name) - } - }; - let _ = ctx.transport.send_message(sender, &response, "").await; - return; - } - - if crate::chat::transport::matrix::rebuild::extract_rebuild_command( - message, - &ctx.bot_name, - &ctx.bot_user_id, - ) - .is_some() - { - slog!("[whatsapp] Handling rebuild command from {sender}"); - let ack = "Rebuilding server… this may take a moment."; - let _ = ctx.transport.send_message(sender, ack, "").await; - let response = crate::chat::transport::matrix::rebuild::handle_rebuild( - &ctx.bot_name, - &ctx.project_root, - &ctx.agents, - ) - .await; - let _ = ctx.transport.send_message(sender, &response, "").await; - return; - } - - if let Some(rmtree_cmd) = crate::chat::transport::matrix::rmtree::extract_rmtree_command( - message, - &ctx.bot_name, - &ctx.bot_user_id, - ) { - let response = match rmtree_cmd { - crate::chat::transport::matrix::rmtree::RmtreeCommand::Rmtree { story_number } => { - slog!("[whatsapp] Handling rmtree command from {sender}: story {story_number}"); - crate::chat::transport::matrix::rmtree::handle_rmtree( - &ctx.bot_name, - &story_number, - &ctx.project_root, - &ctx.agents, - ) - .await - } - crate::chat::transport::matrix::rmtree::RmtreeCommand::BadArgs => { - format!("Usage: `{} rmtree `", ctx.bot_name) - } - }; - let _ = ctx.transport.send_message(sender, &response, "").await; - return; - } - - if crate::chat::transport::matrix::reset::extract_reset_command( - message, - &ctx.bot_name, - &ctx.bot_user_id, - ) - .is_some() - { - slog!("[whatsapp] Handling reset command from {sender}"); - { - let mut guard = ctx.history.lock().await; - let conv = guard.entry(sender.to_string()).or_insert_with(RoomConversation::default); - conv.session_id = None; - conv.entries.clear(); - save_whatsapp_history(&ctx.project_root, &guard); - } - let _ = ctx - .transport - .send_message(sender, "Session cleared.", "") - .await; - return; - } - - if let Some(start_cmd) = crate::chat::transport::matrix::start::extract_start_command( - message, - &ctx.bot_name, - &ctx.bot_user_id, - ) { - let response = match start_cmd { - crate::chat::transport::matrix::start::StartCommand::Start { - story_number, - agent_hint, - } => { - slog!("[whatsapp] Handling start command from {sender}: story {story_number}"); - crate::chat::transport::matrix::start::handle_start( - &ctx.bot_name, - &story_number, - agent_hint.as_deref(), - &ctx.project_root, - &ctx.agents, - ) - .await - } - crate::chat::transport::matrix::start::StartCommand::BadArgs => { - format!("Usage: `{} start `", ctx.bot_name) - } - }; - let _ = ctx.transport.send_message(sender, &response, "").await; - return; - } - - // No command matched — forward to LLM for conversational response. - slog!("[whatsapp] No command matched, forwarding to LLM for {sender}"); - handle_llm_message(ctx, sender, message).await; -} - -/// Forward a message to Claude Code and send the response back via WhatsApp. -async fn handle_llm_message(ctx: &WhatsAppWebhookContext, sender: &str, user_message: &str) { - use crate::chat::util::drain_complete_paragraphs; - use crate::llm::providers::claude_code::{ClaudeCodeProvider, ClaudeCodeResult}; - use std::sync::atomic::{AtomicBool, Ordering}; - use tokio::sync::watch; - - // Look up existing session ID for this sender. - let resume_session_id: Option = { - let guard = ctx.history.lock().await; - guard.get(sender).and_then(|conv| conv.session_id.clone()) - }; - - let bot_name = &ctx.bot_name; - let prompt = format!( - "[Your name is {bot_name}. Refer to yourself as {bot_name}, not Claude.]\n\n{sender}: {user_message}" - ); - - let provider = ClaudeCodeProvider::new(); - let (_cancel_tx, mut cancel_rx) = watch::channel(false); - - // Channel for sending complete chunks to the WhatsApp posting task. - let (msg_tx, mut msg_rx) = tokio::sync::mpsc::unbounded_channel::(); - let msg_tx_for_callback = msg_tx.clone(); - - // Spawn a task to post messages as they arrive. - let post_transport = Arc::clone(&ctx.transport); - let post_sender = sender.to_string(); - let post_task = tokio::spawn(async move { - while let Some(chunk) = msg_rx.recv().await { - // Convert Markdown to WhatsApp formatting, then split into sized chunks. - let formatted = markdown_to_whatsapp(&chunk); - for part in chunk_for_whatsapp(&formatted) { - let _ = post_transport.send_message(&post_sender, &part, "").await; - } - } - }); - - // Shared buffer between the sync token callback and the async scope. - let buffer = Arc::new(std::sync::Mutex::new(String::new())); - let buffer_for_callback = Arc::clone(&buffer); - let sent_any_chunk = Arc::new(AtomicBool::new(false)); - let sent_any_chunk_for_callback = Arc::clone(&sent_any_chunk); - - let project_root_str = ctx.project_root.to_string_lossy().to_string(); - let chat_fut = provider.chat_stream( - &prompt, - &project_root_str, - resume_session_id.as_deref(), - None, - &mut cancel_rx, - move |token| { - let mut buf = buffer_for_callback.lock().unwrap(); - buf.push_str(token); - let paragraphs = drain_complete_paragraphs(&mut buf); - for chunk in paragraphs { - sent_any_chunk_for_callback.store(true, Ordering::Relaxed); - let _ = msg_tx_for_callback.send(chunk); - } - }, - |_thinking| {}, - |_activity| {}, - ); - tokio::pin!(chat_fut); - - // Lock the permission receiver for the duration of this chat session. - let mut perm_rx_guard = ctx.perm_rx.lock().await; - - let result = loop { - tokio::select! { - r = &mut chat_fut => break r, - - Some(perm_fwd) = perm_rx_guard.recv() => { - let prompt_msg = format!( - "*Permission Request*\n\nTool: `{}`\n```json\n{}\n```\n\nReply *yes* to approve or *no* to deny.", - perm_fwd.tool_name, - serde_json::to_string_pretty(&perm_fwd.tool_input) - .unwrap_or_else(|_| perm_fwd.tool_input.to_string()), - ); - let formatted = markdown_to_whatsapp(&prompt_msg); - for part in chunk_for_whatsapp(&formatted) { - let _ = ctx.transport.send_message(sender, &part, "").await; - } - - // Store the response sender so the incoming message handler - // can resolve it when the user replies yes/no. - ctx.pending_perm_replies - .lock() - .await - .insert(sender.to_string(), perm_fwd.response_tx); - - // Spawn a timeout task: auto-deny if the user does not respond. - let pending = Arc::clone(&ctx.pending_perm_replies); - let timeout_sender = sender.to_string(); - let timeout_transport = Arc::clone(&ctx.transport); - let timeout_secs = ctx.permission_timeout_secs; - tokio::spawn(async move { - tokio::time::sleep(std::time::Duration::from_secs(timeout_secs)).await; - if let Some(tx) = pending.lock().await.remove(&timeout_sender) { - let _ = tx.send(PermissionDecision::Deny); - let msg = "Permission request timed out — denied (fail-closed)."; - let _ = timeout_transport.send_message(&timeout_sender, msg, "").await; - } - }); - } - } - }; - drop(perm_rx_guard); - - // Flush remaining text. - let remaining = buffer.lock().unwrap().trim().to_string(); - let did_send_any = sent_any_chunk.load(Ordering::Relaxed); - - let (assistant_reply, new_session_id) = match result { - Ok(ClaudeCodeResult { - messages, - session_id, - }) => { - let reply = if !remaining.is_empty() { - let _ = msg_tx.send(remaining.clone()); - remaining - } else if !did_send_any { - let last_text = messages - .iter() - .rev() - .find(|m| m.role == crate::llm::types::Role::Assistant && !m.content.is_empty()) - .map(|m| m.content.clone()) - .unwrap_or_default(); - if !last_text.is_empty() { - let _ = msg_tx.send(last_text.clone()); - } - last_text - } else { - remaining - }; - slog!("[whatsapp] session_id from chat_stream: {:?}", session_id); - (reply, session_id) - } - Err(e) => { - slog!("[whatsapp] LLM error: {e}"); - let err_msg = format!("Error processing your request: {e}"); - let _ = msg_tx.send(err_msg.clone()); - (err_msg, None) - } - }; - - // Signal the posting task to finish and wait for it. - drop(msg_tx); - let _ = post_task.await; - - // Record this exchange in conversation history. - if !assistant_reply.starts_with("Error processing") { - let mut guard = ctx.history.lock().await; - let conv = guard.entry(sender.to_string()).or_default(); - - if new_session_id.is_some() { - conv.session_id = new_session_id; - } - - conv.entries.push(ConversationEntry { - role: ConversationRole::User, - sender: sender.to_string(), - content: user_message.to_string(), - }); - conv.entries.push(ConversationEntry { - role: ConversationRole::Assistant, - sender: String::new(), - content: assistant_reply, - }); - - // Trim to configured maximum. - if conv.entries.len() > ctx.history_size { - let excess = conv.entries.len() - ctx.history_size; - conv.entries.drain(..excess); - } - - save_whatsapp_history(&ctx.project_root, &guard); - } -} - -// ── Tests ─────────────────────────────────────────────────────────────── - -#[cfg(test)] -mod tests { - use super::*; - - // ── MessagingWindowTracker ──────────────────────────────────────── - - #[test] - fn window_tracker_unknown_user_is_outside_window() { - let tracker = MessagingWindowTracker::new(); - assert!(!tracker.is_within_window("15551234567")); - } - - #[test] - fn window_tracker_records_within_window() { - let tracker = MessagingWindowTracker::new(); - tracker.record_message("15551234567"); - assert!(tracker.is_within_window("15551234567")); - } - - #[test] - fn window_tracker_expired_window_returns_false() { - // Use a 1-nanosecond window so it expires immediately. - let tracker = MessagingWindowTracker::with_duration(std::time::Duration::from_nanos(1)); - tracker.record_message("15551234567"); - // Sleep briefly to ensure the instant has elapsed. - std::thread::sleep(std::time::Duration::from_millis(1)); - assert!(!tracker.is_within_window("15551234567")); - } - - #[test] - fn window_tracker_tracks_users_independently() { - let tracker = MessagingWindowTracker::new(); - tracker.record_message("111"); - assert!(tracker.is_within_window("111")); - assert!(!tracker.is_within_window("222")); - } - - // ── send_text error handling ─────────────────────────────────────── - - #[tokio::test] - async fn send_text_handles_131047_outside_window_error() { - let mut server = mockito::Server::new_async().await; - server - .mock("POST", "/123456/messages") - .with_status(400) - .with_body( - r#"{"error":{"message":"More than 24 hours have passed","type":"OAuthException","code":131047}}"#, - ) - .create_async() - .await; - - let transport = WhatsAppTransport::with_api_base( - "123456".to_string(), - "test-token".to_string(), - server.url(), - ); - - let result = transport.send_text("15551234567", "hello").await; - assert!(result.is_err()); - assert_eq!(result.unwrap_err(), OUTSIDE_WINDOW_ERR); - } - - #[tokio::test] - async fn send_message_handles_outside_window_gracefully() { - let mut server = mockito::Server::new_async().await; - server - .mock("POST", "/123456/messages") - .with_status(400) - .with_body( - r#"{"error":{"message":"More than 24 hours have passed","type":"OAuthException","code":131047}}"#, - ) - .create_async() - .await; - - let transport = WhatsAppTransport::with_api_base( - "123456".to_string(), - "test-token".to_string(), - server.url(), - ); - - // send_message must not panic — it returns Err with a human-readable message. - let result = transport.send_message("15551234567", "hello", "").await; - assert!(result.is_err()); - let msg = result.unwrap_err(); - assert!( - msg.contains("24-hour messaging window"), - "unexpected: {msg}" - ); - } - - // ── send_template_notification ──────────────────────────────────── - - #[tokio::test] - async fn send_template_notification_calls_graph_api() { - let mut server = mockito::Server::new_async().await; - let mock = server - .mock("POST", "/123456/messages") - .match_header("authorization", "Bearer test-token") - .match_body(mockito::Matcher::PartialJsonString( - r#"{"type":"template"}"#.to_string(), - )) - .with_body(r#"{"messages": [{"id": "wamid.tpl123"}]}"#) - .create_async() - .await; - - let transport = WhatsAppTransport::with_api_base( - "123456".to_string(), - "test-token".to_string(), - server.url(), - ); - - let result = transport - .send_template_notification("15551234567", "my-story", "done") - .await; - assert!(result.is_ok(), "unexpected err: {:?}", result.err()); - assert_eq!(result.unwrap(), "wamid.tpl123"); - mock.assert_async().await; - } - - #[tokio::test] - async fn send_notification_uses_text_within_window() { - let mut server = mockito::Server::new_async().await; - let mock = server - .mock("POST", "/123456/messages") - .match_body(mockito::Matcher::PartialJsonString( - r#"{"type":"text"}"#.to_string(), - )) - .with_body(r#"{"messages": [{"id": "wamid.txt1"}]}"#) - .create_async() - .await; - - let transport = WhatsAppTransport::with_api_base( - "123456".to_string(), - "test-token".to_string(), - server.url(), - ); - let tracker = MessagingWindowTracker::new(); - tracker.record_message("15551234567"); - - let result = transport - .send_notification("15551234567", &tracker, "my-story", "done") - .await; - assert!(result.is_ok()); - mock.assert_async().await; - } - - #[tokio::test] - async fn send_notification_uses_template_outside_window() { - let mut server = mockito::Server::new_async().await; - let mock = server - .mock("POST", "/123456/messages") - .match_body(mockito::Matcher::PartialJsonString( - r#"{"type":"template"}"#.to_string(), - )) - .with_body(r#"{"messages": [{"id": "wamid.tpl2"}]}"#) - .create_async() - .await; - - let transport = WhatsAppTransport::with_api_base( - "123456".to_string(), - "test-token".to_string(), - server.url(), - ); - // No record_message call — user is outside the window. - let tracker = MessagingWindowTracker::new(); - - let result = transport - .send_notification("15551234567", &tracker, "my-story", "done") - .await; - assert!(result.is_ok()); - mock.assert_async().await; - } - - // ── Existing webhook / transport tests ──────────────────────────── - - #[test] - fn extract_text_messages_parses_valid_payload() { - let json = r#"{ - "entry": [{ - "changes": [{ - "value": { - "messages": [{ - "from": "15551234567", - "type": "text", - "text": { "body": "help" } - }], - "metadata": { "phone_number_id": "123456" } - } - }] - }] - }"#; - let payload: WebhookPayload = serde_json::from_str(json).unwrap(); - let msgs = extract_text_messages(&payload); - assert_eq!(msgs.len(), 1); - assert_eq!(msgs[0].0, "15551234567"); - assert_eq!(msgs[0].1, "help"); - } - - #[test] - fn extract_text_messages_ignores_non_text() { - let json = r#"{ - "entry": [{ - "changes": [{ - "value": { - "messages": [{ - "from": "15551234567", - "type": "image", - "image": { "id": "img123" } - }], - "metadata": { "phone_number_id": "123456" } - } - }] - }] - }"#; - let payload: WebhookPayload = serde_json::from_str(json).unwrap(); - let msgs = extract_text_messages(&payload); - assert!(msgs.is_empty()); - } - - #[test] - fn extract_text_messages_handles_empty_payload() { - let json = r#"{ "entry": [] }"#; - let payload: WebhookPayload = serde_json::from_str(json).unwrap(); - let msgs = extract_text_messages(&payload); - assert!(msgs.is_empty()); - } - - #[test] - fn extract_text_messages_handles_multiple_messages() { - let json = r#"{ - "entry": [{ - "changes": [{ - "value": { - "messages": [ - { "from": "111", "type": "text", "text": { "body": "status" } }, - { "from": "222", "type": "text", "text": { "body": "help" } } - ], - "metadata": { "phone_number_id": "123456" } - } - }] - }] - }"#; - let payload: WebhookPayload = serde_json::from_str(json).unwrap(); - let msgs = extract_text_messages(&payload); - assert_eq!(msgs.len(), 2); - assert_eq!(msgs[0].1, "status"); - assert_eq!(msgs[1].1, "help"); - } - - #[tokio::test] - async fn transport_send_message_calls_graph_api() { - let mut server = mockito::Server::new_async().await; - let mock = server - .mock("POST", "/123456/messages") - .match_header("authorization", "Bearer test-token") - .with_body(r#"{"messages": [{"id": "wamid.abc123"}]}"#) - .create_async() - .await; - - let transport = WhatsAppTransport::with_api_base( - "123456".to_string(), - "test-token".to_string(), - server.url(), - ); - - let result = transport - .send_message("15551234567", "hello", "

hello

") - .await; - assert!(result.is_ok()); - assert_eq!(result.unwrap(), "wamid.abc123"); - mock.assert_async().await; - } - - #[tokio::test] - async fn transport_edit_sends_new_message() { - let mut server = mockito::Server::new_async().await; - let mock = server - .mock("POST", "/123456/messages") - .with_body(r#"{"messages": [{"id": "wamid.xyz"}]}"#) - .create_async() - .await; - - let transport = WhatsAppTransport::with_api_base( - "123456".to_string(), - "test-token".to_string(), - server.url(), - ); - - let result = transport - .edit_message("15551234567", "old-msg-id", "updated", "

updated

") - .await; - assert!(result.is_ok()); - mock.assert_async().await; - } - - #[tokio::test] - async fn transport_send_typing_succeeds() { - let transport = - WhatsAppTransport::new("123".to_string(), "tok".to_string(), "tpl".to_string()); - assert!(transport.send_typing("room1", true).await.is_ok()); - assert!(transport.send_typing("room1", false).await.is_ok()); - } - - #[tokio::test] - async fn transport_handles_api_error() { - let mut server = mockito::Server::new_async().await; - server - .mock("POST", "/123456/messages") - .with_status(401) - .with_body(r#"{"error": {"message": "Invalid token"}}"#) - .create_async() - .await; - - let transport = WhatsAppTransport::with_api_base( - "123456".to_string(), - "bad-token".to_string(), - server.url(), - ); - - let result = transport.send_message("15551234567", "hello", "").await; - assert!(result.is_err()); - assert!(result.unwrap_err().contains("401")); - } - - // ── chunk_for_whatsapp tests ──────────────────────────────────────── - - #[test] - fn chunk_short_message_returns_single_chunk() { - let chunks = chunk_for_whatsapp("Hello world"); - assert_eq!(chunks, vec!["Hello world"]); - } - - #[test] - fn chunk_exactly_at_limit_returns_single_chunk() { - let text = "a".repeat(WHATSAPP_MAX_MESSAGE_LEN); - let chunks = chunk_for_whatsapp(&text); - assert_eq!(chunks.len(), 1); - assert_eq!(chunks[0].len(), WHATSAPP_MAX_MESSAGE_LEN); - } - - #[test] - fn chunk_splits_on_paragraph_boundary() { - // Create text with a paragraph boundary near the split point. - let first_para = "a".repeat(4000); - let second_para = "b".repeat(200); - let text = format!("{first_para}\n\n{second_para}"); - let chunks = chunk_for_whatsapp(&text); - assert_eq!(chunks.len(), 2); - assert_eq!(chunks[0], first_para); - assert_eq!(chunks[1], second_para); - } - - #[test] - fn chunk_splits_on_line_boundary_when_no_paragraph_break() { - let first_line = "a".repeat(4000); - let second_line = "b".repeat(200); - let text = format!("{first_line}\n{second_line}"); - let chunks = chunk_for_whatsapp(&text); - assert_eq!(chunks.len(), 2); - assert_eq!(chunks[0], first_line); - assert_eq!(chunks[1], second_line); - } - - #[test] - fn chunk_hard_splits_continuous_text() { - let text = "x".repeat(WHATSAPP_MAX_MESSAGE_LEN * 2 + 100); - let chunks = chunk_for_whatsapp(&text); - assert!(chunks.len() >= 2); - for chunk in &chunks { - assert!(chunk.len() <= WHATSAPP_MAX_MESSAGE_LEN); - } - // Verify all content is preserved. - let reassembled: String = chunks.join(""); - assert_eq!(reassembled.len(), text.len()); - } - - #[test] - fn chunk_empty_string_returns_single_empty() { - let chunks = chunk_for_whatsapp(""); - assert_eq!(chunks, vec![""]); - } - - // ── markdown_to_whatsapp tests ──────────────────────────────────────── - - #[test] - fn md_to_wa_converts_headers_to_bold() { - assert_eq!(markdown_to_whatsapp("# Title"), "*Title*"); - assert_eq!(markdown_to_whatsapp("## Subtitle"), "*Subtitle*"); - assert_eq!(markdown_to_whatsapp("### Section"), "*Section*"); - assert_eq!(markdown_to_whatsapp("###### Deep"), "*Deep*"); - } - - #[test] - fn md_to_wa_converts_bold() { - assert_eq!(markdown_to_whatsapp("**bold text**"), "*bold text*"); - } - - #[test] - fn md_to_wa_converts_bold_italic() { - assert_eq!(markdown_to_whatsapp("***emphasis***"), "*_emphasis_*"); - } - - #[test] - fn md_to_wa_converts_strikethrough() { - assert_eq!(markdown_to_whatsapp("~~removed~~"), "~removed~"); - } - - #[test] - fn md_to_wa_converts_links() { - assert_eq!( - markdown_to_whatsapp("[click here](https://example.com)"), - "click here (https://example.com)" - ); - } - - #[test] - fn md_to_wa_removes_horizontal_rules() { - assert_eq!(markdown_to_whatsapp("above\n---\nbelow"), "above\n\nbelow"); - } - - #[test] - fn md_to_wa_preserves_inline_code() { - assert_eq!(markdown_to_whatsapp("use `foo()` here"), "use `foo()` here"); - } - - #[test] - fn md_to_wa_preserves_code_blocks() { - let input = "before\n```rust\nfn main() {\n println!(\"**not bold**\");\n}\n```\nafter"; - let output = markdown_to_whatsapp(input); - // Code block content must NOT be converted. - assert!(output.contains("\"**not bold**\"")); - // But surrounding text is still converted. - assert!(output.contains("before")); - assert!(output.contains("after")); - } - - #[test] - fn md_to_wa_mixed_message() { - let input = "### Philosophy\n- **Stories** define the change\n- ~~old~~ is gone\n- See [docs](https://example.com)"; - let output = markdown_to_whatsapp(input); - assert!(output.starts_with("*Philosophy*")); - assert!(output.contains("*Stories*")); - assert!(output.contains("~old~")); - assert!(output.contains("docs (https://example.com)")); - } - - #[test] - fn md_to_wa_passthrough_plain_text() { - let plain = "Hello, how are you?"; - assert_eq!(markdown_to_whatsapp(plain), plain); - } - - #[test] - fn md_to_wa_empty_string() { - assert_eq!(markdown_to_whatsapp(""), ""); - } - - // ── WhatsApp history persistence tests ────────────────────────────── - - #[test] - fn save_and_load_whatsapp_history_round_trips() { - let tmp = tempfile::tempdir().unwrap(); - let sk = tmp.path().join(".storkit"); - std::fs::create_dir_all(&sk).unwrap(); - - let mut history = HashMap::new(); - history.insert( - "15551234567".to_string(), - RoomConversation { - session_id: Some("sess-abc".to_string()), - entries: vec![ - ConversationEntry { - role: ConversationRole::User, - sender: "15551234567".to_string(), - content: "hello".to_string(), - }, - ConversationEntry { - role: ConversationRole::Assistant, - sender: String::new(), - content: "hi there!".to_string(), - }, - ], - }, - ); - - save_whatsapp_history(tmp.path(), &history); - let loaded = load_whatsapp_history(tmp.path()); - - assert_eq!(loaded.len(), 1); - let conv = loaded.get("15551234567").unwrap(); - assert_eq!(conv.session_id.as_deref(), Some("sess-abc")); - assert_eq!(conv.entries.len(), 2); - assert_eq!(conv.entries[0].content, "hello"); - assert_eq!(conv.entries[1].content, "hi there!"); - } - - // ── TwilioWhatsAppTransport tests ───────────────────────────────── - - #[tokio::test] - async fn twilio_send_message_calls_twilio_api() { - let mut server = mockito::Server::new_async().await; - let mock = server - .mock("POST", "/2010-04-01/Accounts/ACtest/Messages.json") - .with_body(r#"{"sid": "SMtest123"}"#) - .create_async() - .await; - - let transport = TwilioWhatsAppTransport::with_api_base( - "ACtest".to_string(), - "authtoken".to_string(), - "+14155551234".to_string(), - server.url(), - ); - - let result = transport.send_message("+15551234567", "hello", "").await; - assert!(result.is_ok(), "unexpected err: {:?}", result.err()); - assert_eq!(result.unwrap(), "SMtest123"); - mock.assert_async().await; - } - - #[tokio::test] - async fn twilio_send_message_returns_err_on_api_error() { - let mut server = mockito::Server::new_async().await; - server - .mock("POST", "/2010-04-01/Accounts/ACtest/Messages.json") - .with_status(401) - .with_body(r#"{"message": "Unauthorized"}"#) - .create_async() - .await; - - let transport = TwilioWhatsAppTransport::with_api_base( - "ACtest".to_string(), - "badtoken".to_string(), - "+14155551234".to_string(), - server.url(), - ); - - let result = transport.send_message("+15551234567", "hello", "").await; - assert!(result.is_err()); - assert!(result.unwrap_err().contains("401")); - } - - #[tokio::test] - async fn twilio_edit_message_sends_new_message() { - let mut server = mockito::Server::new_async().await; - let mock = server - .mock("POST", "/2010-04-01/Accounts/ACtest/Messages.json") - .with_body(r#"{"sid": "SMedit456"}"#) - .create_async() - .await; - - let transport = TwilioWhatsAppTransport::with_api_base( - "ACtest".to_string(), - "authtoken".to_string(), - "+14155551234".to_string(), - server.url(), - ); - - let result = transport - .edit_message("+15551234567", "old-sid", "updated text", "") - .await; - assert!(result.is_ok()); - mock.assert_async().await; - } - - #[tokio::test] - async fn twilio_send_typing_is_noop() { - let transport = TwilioWhatsAppTransport::new( - "ACtest".to_string(), - "authtoken".to_string(), - "+14155551234".to_string(), - ); - assert!(transport.send_typing("+15551234567", true).await.is_ok()); - } - - // ── extract_twilio_text_messages tests ──────────────────────────── - - #[test] - fn extract_twilio_text_messages_parses_valid_form() { - let body = b"From=whatsapp%3A%2B15551234567&Body=hello+world&To=whatsapp%3A%2B14155551234&MessageSid=SMtest"; - let msgs = extract_twilio_text_messages(body); - assert_eq!(msgs.len(), 1); - assert_eq!(msgs[0].0, "+15551234567"); - assert_eq!(msgs[0].1, "hello world"); - } - - #[test] - fn extract_twilio_text_messages_strips_whatsapp_prefix() { - let body = b"From=whatsapp%3A%2B15551234567&Body=hi"; - let msgs = extract_twilio_text_messages(body); - assert_eq!(msgs.len(), 1); - assert_eq!(msgs[0].0, "+15551234567"); - } - - #[test] - fn extract_twilio_text_messages_returns_empty_on_missing_from() { - let body = b"Body=hello"; - let msgs = extract_twilio_text_messages(body); - assert!(msgs.is_empty()); - } - - #[test] - fn extract_twilio_text_messages_returns_empty_on_missing_body() { - let body = b"From=whatsapp%3A%2B15551234567"; - let msgs = extract_twilio_text_messages(body); - assert!(msgs.is_empty()); - } - - #[test] - fn extract_twilio_text_messages_returns_empty_on_empty_body() { - let body = b"From=whatsapp%3A%2B15551234567&Body="; - let msgs = extract_twilio_text_messages(body); - assert!(msgs.is_empty()); - } - - #[test] - fn extract_twilio_text_messages_returns_empty_on_invalid_form() { - let body = b"not valid form encoded {{{{"; - // serde_urlencoded is lenient, so this might parse or return empty - // Either way it must not panic. - let _msgs = extract_twilio_text_messages(body); - } - - // ── Allowlist tests ─────────────────────────────────────────────────── - - /// Build a minimal WhatsAppWebhookContext for allowlist tests. - fn make_ctx_with_allowlist( - allowed_phones: Vec, - ) -> Arc { - use crate::agents::AgentPool; - use crate::io::watcher::WatcherEvent; - - struct NullTransport; - - #[async_trait::async_trait] - impl crate::chat::ChatTransport for NullTransport { - async fn send_message( - &self, - _room: &str, - _plain: &str, - _html: &str, - ) -> Result { - Ok(String::new()) - } - async fn edit_message( - &self, - _room: &str, - _id: &str, - _plain: &str, - _html: &str, - ) -> Result<(), String> { - Ok(()) - } - async fn send_typing(&self, _room: &str, _typing: bool) -> Result<(), String> { - Ok(()) - } - } - - let tmp = tempfile::tempdir().unwrap(); - let (tx, _rx) = tokio::sync::broadcast::channel::(16); - let agents = Arc::new(AgentPool::new(3999, tx)); - let tracker = Arc::new(MessagingWindowTracker::new()); - let (_perm_tx, perm_rx) = tokio::sync::mpsc::unbounded_channel(); - Arc::new(WhatsAppWebhookContext { - verify_token: "tok".to_string(), - provider: "meta".to_string(), - transport: Arc::new(NullTransport), - project_root: tmp.path().to_path_buf(), - agents, - bot_name: "Bot".to_string(), - bot_user_id: "whatsapp-bot".to_string(), - ambient_rooms: Arc::new(std::sync::Mutex::new(Default::default())), - history: Arc::new(tokio::sync::Mutex::new(Default::default())), - history_size: 20, - window_tracker: tracker, - allowed_phones, - perm_rx: Arc::new(tokio::sync::Mutex::new(perm_rx)), - pending_perm_replies: Arc::new(tokio::sync::Mutex::new(Default::default())), - permission_timeout_secs: 120, - }) - } - - #[tokio::test] - async fn allowlist_blocks_unauthorized_sender() { - let allowed = vec!["+15551111111".to_string()]; - let ctx = make_ctx_with_allowlist(allowed); - let unauthorized = "+15559999999"; - - handle_incoming_message(&ctx, unauthorized, "hello").await; - - // window_tracker is only updated AFTER the allowlist check, so an - // unauthorized sender must leave the tracker untouched. - assert!( - !ctx.window_tracker.is_within_window(unauthorized), - "unauthorized sender should not have updated the window tracker" - ); - } - - #[tokio::test] - async fn allowlist_empty_allows_all_senders() { - // Empty allowlist = open (backwards compatible). - let ctx = make_ctx_with_allowlist(vec![]); - let sender = "+15551234567"; - - handle_incoming_message(&ctx, sender, "hello").await; - - // window_tracker.record_message is called right after the allowlist - // check passes, so the sender should be recorded. - assert!( - ctx.window_tracker.is_within_window(sender), - "sender should be recorded when allowlist is empty" - ); - } - - #[tokio::test] - async fn allowlist_allows_listed_sender() { - let sender = "+15551111111"; - let ctx = make_ctx_with_allowlist(vec![sender.to_string()]); - - handle_incoming_message(&ctx, sender, "hello").await; - - assert!( - ctx.window_tracker.is_within_window(sender), - "listed sender should be recorded in the window tracker" - ); - } - - // ── rebuild command extraction ───────────────────────────────────── - - #[test] - fn rebuild_command_extracted_from_plain_message() { - // WhatsApp messages arrive without a bot mention prefix. - // extract_rebuild_command must recognise "rebuild" by itself. - let result = crate::chat::transport::matrix::rebuild::extract_rebuild_command( - "rebuild", - "Timmy", - "@timmy:home.local", - ); - assert!(result.is_some(), "plain 'rebuild' should be recognised"); - } - - #[test] - fn rebuild_command_extracted_with_bot_name_prefix() { - let result = crate::chat::transport::matrix::rebuild::extract_rebuild_command( - "Timmy rebuild", - "Timmy", - "@timmy:home.local", - ); - assert!(result.is_some(), "'Timmy rebuild' should be recognised"); - } - - #[test] - fn non_rebuild_whatsapp_message_not_extracted() { - let result = crate::chat::transport::matrix::rebuild::extract_rebuild_command( - "status", - "Timmy", - "@timmy:home.local", - ); - assert!(result.is_none(), "'status' should not be recognised as rebuild"); - } - - #[test] - fn load_whatsapp_history_returns_empty_when_file_missing() { - let tmp = tempfile::tempdir().unwrap(); - let history = load_whatsapp_history(tmp.path()); - assert!(history.is_empty()); - } - - #[test] - fn load_whatsapp_history_returns_empty_on_invalid_json() { - let tmp = tempfile::tempdir().unwrap(); - let sk = tmp.path().join(".storkit"); - std::fs::create_dir_all(&sk).unwrap(); - std::fs::write(sk.join("whatsapp_history.json"), "not json {{{").unwrap(); - let history = load_whatsapp_history(tmp.path()); - assert!(history.is_empty()); - } - - #[test] - fn save_whatsapp_history_preserves_multiple_senders() { - let tmp = tempfile::tempdir().unwrap(); - let sk = tmp.path().join(".storkit"); - std::fs::create_dir_all(&sk).unwrap(); - - let mut history = HashMap::new(); - history.insert( - "111".to_string(), - RoomConversation { - session_id: None, - entries: vec![ConversationEntry { - role: ConversationRole::User, - sender: "111".to_string(), - content: "msg1".to_string(), - }], - }, - ); - history.insert( - "222".to_string(), - RoomConversation { - session_id: Some("sess-222".to_string()), - entries: vec![ConversationEntry { - role: ConversationRole::User, - sender: "222".to_string(), - content: "msg2".to_string(), - }], - }, - ); - - save_whatsapp_history(tmp.path(), &history); - let loaded = load_whatsapp_history(tmp.path()); - - assert_eq!(loaded.len(), 2); - assert!(loaded.contains_key("111")); - assert!(loaded.contains_key("222")); - assert_eq!(loaded["222"].session_id.as_deref(), Some("sess-222")); - } - - // ── reset command extraction ─────────────────────────────────────── - - #[test] - fn reset_command_extracted_from_plain_message() { - let result = crate::chat::transport::matrix::reset::extract_reset_command( - "reset", - "Timmy", - "@timmy:home.local", - ); - assert!(result.is_some(), "plain 'reset' should be recognised"); - } - - #[test] - fn reset_command_extracted_with_bot_name_prefix() { - let result = crate::chat::transport::matrix::reset::extract_reset_command( - "Timmy reset", - "Timmy", - "@timmy:home.local", - ); - assert!(result.is_some(), "'Timmy reset' should be recognised"); - } - - #[tokio::test] - async fn reset_command_clears_whatsapp_session() { - use std::sync::Arc; - use tokio::sync::Mutex as TokioMutex; - - let sender = "+15555550100"; - let history: WhatsAppConversationHistory = Arc::new(TokioMutex::new({ - let mut m = HashMap::new(); - m.insert(sender.to_string(), RoomConversation { - session_id: Some("old-session".to_string()), - entries: vec![ConversationEntry { - role: ConversationRole::User, - sender: sender.to_string(), - content: "previous message".to_string(), - }], - }); - m - })); - - let tmp = tempfile::tempdir().unwrap(); - let sk = tmp.path().join(".storkit"); - std::fs::create_dir_all(&sk).unwrap(); - - { - let mut guard = history.lock().await; - let conv = guard.entry(sender.to_string()).or_insert_with(RoomConversation::default); - conv.session_id = None; - conv.entries.clear(); - save_whatsapp_history(tmp.path(), &guard); - } - - let guard = history.lock().await; - let conv = guard.get(sender).unwrap(); - assert!(conv.session_id.is_none(), "session_id should be cleared"); - assert!(conv.entries.is_empty(), "entries should be cleared"); - } - - #[test] - fn start_command_extracted_from_plain_message() { - // WhatsApp messages arrive without a bot mention prefix. - // extract_start_command must recognise "start 42" by itself. - let result = crate::chat::transport::matrix::start::extract_start_command( - "start 42", - "Timmy", - "@timmy:home.local", - ); - assert!(result.is_some(), "plain 'start 42' should be recognised"); - assert_eq!( - result, - Some(crate::chat::transport::matrix::start::StartCommand::Start { - story_number: "42".to_string(), - agent_hint: None, - }) - ); - } - - #[test] - fn start_command_extracted_with_bot_name_prefix() { - let result = crate::chat::transport::matrix::start::extract_start_command( - "Timmy start 99", - "Timmy", - "@timmy:home.local", - ); - assert!(result.is_some(), "'Timmy start 99' should be recognised"); - } - - #[test] - fn non_start_whatsapp_message_not_extracted() { - let result = crate::chat::transport::matrix::start::extract_start_command( - "help", - "Timmy", - "@timmy:home.local", - ); - assert!(result.is_none(), "'help' should not be recognised as start"); - } -} diff --git a/server/src/main.rs b/server/src/main.rs index 8df77ce6..65c57b9b 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -4,12 +4,12 @@ mod agent_log; mod agents; +mod chat; mod config; mod http; mod io; mod llm; pub mod log_buffer; -mod chat; pub mod rebuild; mod state; mod store; @@ -17,6 +17,7 @@ mod workflow; mod worktree; use crate::agents::AgentPool; +use crate::chat::transport::whatsapp::WhatsAppConversationHistory; use crate::http::build_routes; use crate::http::context::AppContext; use crate::http::{remove_port_file, resolve_port, write_port_file}; @@ -116,17 +117,11 @@ async fn main() -> Result<(), std::io::Error> { // directory. We do not create directories from the command line. if let Some(ref path) = explicit_path { if !path.exists() { - eprintln!( - "error: path does not exist: {}", - path.display() - ); + eprintln!("error: path does not exist: {}", path.display()); std::process::exit(1); } if !path.is_dir() { - eprintln!( - "error: path is not a directory: {}", - path.display() - ); + eprintln!("error: path is not a directory: {}", path.display()); std::process::exit(1); } } @@ -276,24 +271,23 @@ async fn main() -> Result<(), std::io::Error> { .filter(|cfg| cfg.transport == "whatsapp") .map(|cfg| { let provider = cfg.whatsapp_provider.clone(); - let transport: Arc = - if provider == "twilio" { - Arc::new(chat::transport::whatsapp::TwilioWhatsAppTransport::new( - cfg.twilio_account_sid.clone().unwrap_or_default(), - cfg.twilio_auth_token.clone().unwrap_or_default(), - cfg.twilio_whatsapp_number.clone().unwrap_or_default(), - )) - } else { - let template_name = cfg - .whatsapp_notification_template - .clone() - .unwrap_or_else(|| "pipeline_notification".to_string()); - Arc::new(chat::transport::whatsapp::WhatsAppTransport::new( - cfg.whatsapp_phone_number_id.clone().unwrap_or_default(), - cfg.whatsapp_access_token.clone().unwrap_or_default(), - template_name, - )) - }; + let transport: Arc = if provider == "twilio" { + Arc::new(chat::transport::whatsapp::TwilioWhatsAppTransport::new( + cfg.twilio_account_sid.clone().unwrap_or_default(), + cfg.twilio_auth_token.clone().unwrap_or_default(), + cfg.twilio_whatsapp_number.clone().unwrap_or_default(), + )) + } else { + let template_name = cfg + .whatsapp_notification_template + .clone() + .unwrap_or_else(|| "pipeline_notification".to_string()); + Arc::new(chat::transport::whatsapp::WhatsAppTransport::new( + cfg.whatsapp_phone_number_id.clone().unwrap_or_default(), + cfg.whatsapp_access_token.clone().unwrap_or_default(), + template_name, + )) + }; let bot_name = cfg .display_name .clone() @@ -363,17 +357,16 @@ async fn main() -> Result<(), std::io::Error> { // • WhatsApp: active senders are tracked at runtime in ambient_rooms. // We keep the WhatsApp context Arc so we can read the rooms at shutdown. // • Matrix: the bot task manages its own announcement via matrix_shutdown_tx. - let bot_shutdown_notifier: Option> = - if let Some(ref ctx) = slack_ctx { - let channels: Vec = ctx.channel_ids.iter().cloned().collect(); - Some(Arc::new(BotShutdownNotifier::new( - Arc::clone(&ctx.transport) as Arc, - channels, - ctx.bot_name.clone(), - ))) - } else { - None - }; + let bot_shutdown_notifier: Option> = if let Some(ref ctx) = slack_ctx { + let channels: Vec = ctx.channel_ids.iter().cloned().collect(); + Some(Arc::new(BotShutdownNotifier::new( + Arc::clone(&ctx.transport) as Arc, + channels, + ctx.bot_name.clone(), + ))) + } else { + None + }; // Retain a reference to the WhatsApp context for shutdown notifications. // At shutdown time we read ambient_rooms to get the current set of active senders. let whatsapp_ctx_for_shutdown: Option> = @@ -391,14 +384,13 @@ async fn main() -> Result<(), std::io::Error> { if let Some(ref ctx) = whatsapp_ctx { let transport = Arc::clone(&ctx.transport); let bot_name = ctx.bot_name.clone(); - let history = Arc::clone(&ctx.history); + let history: WhatsAppConversationHistory = Arc::clone(&ctx.history); tokio::spawn(async move { let senders: Vec = history.lock().await.keys().cloned().collect(); if senders.is_empty() { return; } - let notifier = - crate::rebuild::BotShutdownNotifier::new(transport, senders, bot_name); + let notifier = crate::rebuild::BotShutdownNotifier::new(transport, senders, bot_name); notifier.notify_startup().await; }); } @@ -410,8 +402,7 @@ async fn main() -> Result<(), std::io::Error> { if channels.is_empty() { return; } - let notifier = - crate::rebuild::BotShutdownNotifier::new(transport, channels, bot_name); + let notifier = crate::rebuild::BotShutdownNotifier::new(transport, channels, bot_name); notifier.notify_startup().await; }); } @@ -591,10 +582,7 @@ name = "coder" #[test] fn classify_help_short() { - assert_eq!( - classify_cli_args(&["-h".to_string()]), - CliDirective::Help - ); + assert_eq!(classify_cli_args(&["-h".to_string()]), CliDirective::Help); } #[test]