Files
storkit/server/src/whatsapp.rs

580 lines
18 KiB
Rust
Raw Normal View History

//! WhatsApp Business API integration.
//!
//! Provides:
//! - [`WhatsAppTransport`] — a [`ChatTransport`] that sends messages via the
//! Meta Graph API (`graph.facebook.com/v21.0/{phone_number_id}/messages`).
//! - [`webhook_verify`] / [`webhook_receive`] — Poem handlers for the WhatsApp
//! webhook (GET verification handshake + POST incoming messages).
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use crate::agents::AgentPool;
use crate::slog;
use crate::transport::{ChatTransport, MessageId};
// ── Graph API base URL (overridable for tests) ──────────────────────────
/// Default base URL for Graph API calls.
///
/// `WhatsAppTransport::new` copies this into the transport's `api_base`;
/// tests substitute a different base through `WhatsAppTransport::with_api_base`.
const GRAPH_API_BASE: &str = "https://graph.facebook.com/v21.0";
// ── WhatsApp Transport ──────────────────────────────────────────────────
/// Real WhatsApp Business API transport.
///
/// Sends text messages via `POST {GRAPH_API_BASE}/{phone_number_id}/messages`.
pub struct WhatsAppTransport {
/// Phone number ID interpolated into the request path
/// (`{api_base}/{phone_number_id}/messages`).
phone_number_id: String,
/// Token attached to every send request as bearer authentication.
access_token: String,
/// HTTP client used to issue the send requests.
client: reqwest::Client,
/// Base URL for API calls: `GRAPH_API_BASE` when built with `new`,
/// or a caller-supplied value when built with the test-only `with_api_base`.
api_base: String,
}
impl WhatsAppTransport {
pub fn new(phone_number_id: String, access_token: String) -> Self {
Self {
phone_number_id,
access_token,
client: reqwest::Client::new(),
api_base: GRAPH_API_BASE.to_string(),
}
}
#[cfg(test)]
fn with_api_base(phone_number_id: String, access_token: String, api_base: String) -> Self {
Self {
phone_number_id,
access_token,
client: reqwest::Client::new(),
api_base,
}
}
/// Send a text message to a WhatsApp user via the Graph API.
async fn send_text(&self, to: &str, body: &str) -> Result<String, String> {
let url = format!(
"{}/{}/messages",
self.api_base, self.phone_number_id
);
let payload = GraphSendMessage {
messaging_product: "whatsapp",
to,
r#type: "text",
text: GraphTextBody { body },
};
let resp = self
.client
.post(&url)
.bearer_auth(&self.access_token)
.json(&payload)
.send()
.await
.map_err(|e| format!("WhatsApp API request failed: {e}"))?;
let status = resp.status();
let resp_text = resp
.text()
.await
.unwrap_or_else(|_| "<no body>".to_string());
if !status.is_success() {
return Err(format!(
"WhatsApp API returned {status}: {resp_text}"
));
}
// Extract the message ID from the response.
let parsed: GraphSendResponse = serde_json::from_str(&resp_text).map_err(|e| {
format!("Failed to parse WhatsApp API response: {e} — body: {resp_text}")
})?;
let msg_id = parsed
.messages
.first()
.map(|m| m.id.clone())
.unwrap_or_default();
Ok(msg_id)
}
}
#[async_trait]
impl ChatTransport for WhatsAppTransport {
    /// Deliver `plain` to `recipient` as a WhatsApp text message.
    ///
    /// The HTML rendering is ignored; only the plain text is sent. The log
    /// line shows at most the first 80 characters of the message.
    async fn send_message(
        &self,
        recipient: &str,
        plain: &str,
        _html: &str,
    ) -> Result<MessageId, String> {
        slog!("[whatsapp] send_message to {recipient}: {plain:.80}");
        let outcome = self.send_text(recipient, plain).await;
        outcome
    }

    /// "Edit" a message by sending the updated content as a new message.
    ///
    /// The original message ID is unused and the ID of the newly sent
    /// message is discarded.
    async fn edit_message(
        &self,
        recipient: &str,
        _original_message_id: &str,
        plain: &str,
        html: &str,
    ) -> Result<(), String> {
        // WhatsApp does not support message editing — send a new message.
        slog!("[whatsapp] edit_message — WhatsApp does not support edits, sending new message");
        self.send_message(recipient, plain, html).await?;
        Ok(())
    }

    /// No-op that always succeeds.
    async fn send_typing(&self, _recipient: &str, _typing: bool) -> Result<(), String> {
        // WhatsApp Business API does not expose typing indicators.
        Ok(())
    }
}
// ── Graph API request/response types ────────────────────────────────────
/// JSON body of an outgoing message request, built in
/// `WhatsAppTransport::send_text`.
#[derive(Serialize)]
struct GraphSendMessage<'a> {
/// Set to `"whatsapp"` by `send_text`.
messaging_product: &'a str,
/// Recipient identifier, passed through unchanged from the caller.
to: &'a str,
/// Message kind; `send_text` always sends `"text"`.
r#type: &'a str,
/// Text payload of the message.
text: GraphTextBody<'a>,
}
/// The `text` object nested inside [`GraphSendMessage`].
#[derive(Serialize)]
struct GraphTextBody<'a> {
/// Plain-text message content.
body: &'a str,
}
/// The part of the send response that `send_text` reads.
#[derive(Deserialize)]
struct GraphSendResponse {
/// Entries for the sent message(s); an absent field deserializes to an
/// empty list because of `serde(default)`.
#[serde(default)]
messages: Vec<GraphMessageId>,
}
/// One entry of [`GraphSendResponse::messages`].
#[derive(Deserialize)]
struct GraphMessageId {
/// Message ID; `send_text` returns the first entry's value to its caller.
id: String,
}
// ── Webhook types (Meta → us) ───────────────────────────────────────────
/// Top-level webhook payload from Meta.
///
/// Only the fields this module reads are modelled. List fields default to
/// empty and the remaining fields are optional, so payloads that omit them
/// still deserialize.
#[derive(Deserialize, Debug)]
pub struct WebhookPayload {
/// Entries in the payload; empty when the field is absent.
#[serde(default)]
pub entry: Vec<WebhookEntry>,
}
/// One element of [`WebhookPayload::entry`].
#[derive(Deserialize, Debug)]
pub struct WebhookEntry {
/// Changes reported in this entry; empty when the field is absent.
#[serde(default)]
pub changes: Vec<WebhookChange>,
}
/// One element of [`WebhookEntry::changes`].
#[derive(Deserialize, Debug)]
pub struct WebhookChange {
/// The change's content, if present.
pub value: Option<WebhookValue>,
}
/// Content of a [`WebhookChange`].
#[derive(Deserialize, Debug)]
pub struct WebhookValue {
/// Incoming messages; empty when the field is absent.
#[serde(default)]
pub messages: Vec<WebhookMessage>,
/// Metadata accompanying the messages, if present.
pub metadata: Option<WebhookMetadata>,
}
/// Metadata block of a [`WebhookValue`].
#[derive(Deserialize, Debug)]
pub struct WebhookMetadata {
/// Phone number ID from the payload. Parsed but not read by the code
/// visible in this file.
pub phone_number_id: Option<String>,
}
/// A single incoming message.
#[derive(Deserialize, Debug)]
pub struct WebhookMessage {
/// Sender identifier; `extract_text_messages` returns it as the sender.
pub from: Option<String>,
/// Message kind (JSON key `type`); only `"text"` is extracted.
pub r#type: Option<String>,
/// Text content, if the message carries a `text` object.
pub text: Option<WebhookText>,
}
/// The `text` object of a [`WebhookMessage`].
#[derive(Deserialize, Debug)]
pub struct WebhookText {
/// The message text.
pub body: Option<String>,
}
/// Extract text messages from a webhook payload.
///
/// Returns `(sender_phone, message_body)` pairs.
pub fn extract_text_messages(payload: &WebhookPayload) -> Vec<(String, String)> {
let mut messages = Vec::new();
for entry in &payload.entry {
for change in &entry.changes {
if let Some(value) = &change.value {
for msg in &value.messages {
if msg.r#type.as_deref() == Some("text")
&& let (Some(from), Some(text)) = (&msg.from, &msg.text)
&& let Some(body) = &text.body
{
messages.push((from.clone(), body.clone()));
}
}
}
}
}
messages
}
// ── Webhook handlers (Poem) ────────────────────────────────────────────
use poem::{Request, Response, handler, http::StatusCode, web::Query};
use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::Mutex;
/// Query parameters for the webhook verification GET request.
///
/// Each field is optional, so a request missing any of them still
/// deserializes and is then rejected by `webhook_verify`.
#[derive(Deserialize)]
pub struct VerifyQuery {
/// `hub.mode` parameter; `webhook_verify` accepts only `"subscribe"`.
#[serde(rename = "hub.mode")]
pub hub_mode: Option<String>,
/// `hub.verify_token` parameter; must equal the configured verify token.
#[serde(rename = "hub.verify_token")]
pub hub_verify_token: Option<String>,
/// `hub.challenge` parameter; echoed back as the response body on success.
#[serde(rename = "hub.challenge")]
pub hub_challenge: Option<String>,
}
/// Shared context for webhook handlers, injected via Poem's `Data` extractor.
pub struct WhatsAppWebhookContext {
/// Token that `webhook_verify` compares against `hub.verify_token`.
pub verify_token: String,
/// Transport used to send replies back to the message sender.
pub transport: Arc<WhatsAppTransport>,
/// Project root handed to command dispatch and to the delete handler.
pub project_root: PathBuf,
/// Agent pool handed to command dispatch, the htop snapshot and the
/// delete handler.
pub agents: Arc<AgentPool>,
/// Bot name handed to command dispatch and command extraction; also
/// shown in the delete usage message.
pub bot_name: String,
/// The bot's "user ID" for command dispatch (e.g. "whatsapp-bot").
pub bot_user_id: String,
/// Set of ambient rooms handed to command dispatch.
pub ambient_rooms: Arc<Mutex<HashSet<String>>>,
}
/// GET /webhook/whatsapp — Meta verification handshake.
///
/// Meta sends `hub.mode=subscribe&hub.verify_token=<token>&hub.challenge=<challenge>`.
/// Responds `200` with the challenge as the body when the mode is
/// `subscribe`, the token equals the configured verify token, and a
/// challenge is present; otherwise responds `403`.
#[handler]
pub async fn webhook_verify(
    Query(q): Query<VerifyQuery>,
    ctx: poem::web::Data<&Arc<WhatsAppWebhookContext>>,
) -> Response {
    let is_subscribe = q.hub_mode.as_deref() == Some("subscribe");
    let token_matches = q.hub_verify_token.as_deref() == Some(ctx.verify_token.as_str());

    match q.hub_challenge {
        Some(challenge) if is_subscribe && token_matches => {
            slog!("[whatsapp] Webhook verification succeeded");
            Response::builder().status(StatusCode::OK).body(challenge)
        }
        _ => {
            slog!("[whatsapp] Webhook verification failed");
            Response::builder()
                .status(StatusCode::FORBIDDEN)
                .body("Verification failed")
        }
    }
}
/// POST /webhook/whatsapp — receive incoming messages from Meta.
///
/// Responds `400` only when the request body cannot be read. Every other
/// outcome is acknowledged with `200 ok`: unparseable payloads, payloads
/// without text messages, and payloads whose text messages are handed to a
/// background task for command handling. The response never waits for that
/// task.
///
/// NOTE(review): `req` is currently unused and this handler performs no
/// check of Meta's `X-Hub-Signature-256` header — confirm whether request
/// authentication happens elsewhere.
#[handler]
pub async fn webhook_receive(
    req: &Request,
    body: poem::Body,
    ctx: poem::web::Data<&Arc<WhatsAppWebhookContext>>,
) -> Response {
    let _ = req;

    // Every non-error outcome is acknowledged identically.
    let ack = || Response::builder().status(StatusCode::OK).body("ok");

    let raw = match body.into_bytes().await {
        Ok(raw) => raw,
        Err(e) => {
            slog!("[whatsapp] Failed to read webhook body: {e}");
            return Response::builder()
                .status(StatusCode::BAD_REQUEST)
                .body("Bad request");
        }
    };

    let payload = match serde_json::from_slice::<WebhookPayload>(&raw) {
        Ok(payload) => payload,
        Err(e) => {
            slog!("[whatsapp] Failed to parse webhook payload: {e}");
            // Meta expects 200 even on parse errors to avoid retries.
            return ack();
        }
    };

    let incoming = extract_text_messages(&payload);
    if incoming.is_empty() {
        // Status updates, read receipts, etc. — acknowledge silently.
        return ack();
    }

    // Handle the messages off the request path so the acknowledgement is
    // not delayed by command execution or outgoing API calls.
    let shared = Arc::clone(*ctx);
    tokio::spawn(async move {
        for (sender, text) in incoming {
            slog!("[whatsapp] Message from {sender}: {text}");
            handle_incoming_message(&shared, &sender, &text).await;
        }
    });

    ack()
}
/// Dispatch an incoming WhatsApp message to bot commands.
async fn handle_incoming_message(
ctx: &WhatsAppWebhookContext,
sender: &str,
message: &str,
) {
use crate::matrix::commands::{CommandDispatch, try_handle_command};
let dispatch = CommandDispatch {
bot_name: &ctx.bot_name,
bot_user_id: &ctx.bot_user_id,
project_root: &ctx.project_root,
agents: &ctx.agents,
ambient_rooms: &ctx.ambient_rooms,
room_id: sender,
// WhatsApp messages are always "addressed" to the bot (1:1 or bot-specific).
is_addressed: true,
};
if let Some(response) = try_handle_command(&dispatch, message) {
slog!("[whatsapp] Sending command response to {sender}");
if let Err(e) = ctx.transport.send_message(sender, &response, "").await {
slog!("[whatsapp] Failed to send reply to {sender}: {e}");
}
return;
}
// Check for async commands (htop, delete).
if let Some(htop_cmd) = crate::matrix::htop::extract_htop_command(
message,
&ctx.bot_name,
&ctx.bot_user_id,
) {
use crate::matrix::htop::HtopCommand;
slog!("[whatsapp] Handling htop command from {sender}");
match htop_cmd {
HtopCommand::Stop => {
// htop stop — no-op on WhatsApp since there's no persistent
// editable message; just acknowledge.
let _ = ctx
.transport
.send_message(sender, "htop stopped.", "")
.await;
}
HtopCommand::Start { duration_secs } => {
// On WhatsApp, send a single snapshot instead of a live-updating
// dashboard since we can't edit messages.
let snapshot =
crate::matrix::htop::build_htop_message(&ctx.agents, 0, duration_secs);
let _ = ctx
.transport
.send_message(sender, &snapshot, "")
.await;
}
}
return;
}
if let Some(del_cmd) = crate::matrix::delete::extract_delete_command(
message,
&ctx.bot_name,
&ctx.bot_user_id,
) {
let response = match del_cmd {
crate::matrix::delete::DeleteCommand::Delete { story_number } => {
slog!("[whatsapp] Handling delete command from {sender}: story {story_number}");
crate::matrix::delete::handle_delete(
&ctx.bot_name,
&story_number,
&ctx.project_root,
&ctx.agents,
)
.await
}
crate::matrix::delete::DeleteCommand::BadArgs => {
format!("Usage: `{} delete <number>`", ctx.bot_name)
}
};
let _ = ctx.transport.send_message(sender, &response, "").await;
return;
}
// No command matched — inform the user that only commands are supported.
// (LLM passthrough is a separate story.)
let _ = ctx
.transport
.send_message(
sender,
"I only respond to commands right now. Try `help` to see what's available.",
"",
)
.await;
}
// ── Tests ───────────────────────────────────────────────────────────────
#[cfg(test)]
mod tests {
    use super::*;

    /// Deserialize `json` as a webhook payload and return the text messages
    /// that `extract_text_messages` finds in it.
    fn texts_from(json: &str) -> Vec<(String, String)> {
        let payload: WebhookPayload = serde_json::from_str(json).unwrap();
        extract_text_messages(&payload)
    }

    /// Transport for phone number ID `123456` that sends to `api_base`.
    fn transport_at(api_base: String, token: &str) -> WhatsAppTransport {
        WhatsAppTransport::with_api_base("123456".to_string(), token.to_string(), api_base)
    }

    /// A single text message yields one `(sender, body)` pair.
    #[test]
    fn extract_text_messages_parses_valid_payload() {
        let json = r#"{
"entry": [{
"changes": [{
"value": {
"messages": [{
"from": "15551234567",
"type": "text",
"text": { "body": "help" }
}],
"metadata": { "phone_number_id": "123456" }
}
}]
}]
}"#;
        let found = texts_from(json);
        assert_eq!(
            found,
            vec![("15551234567".to_string(), "help".to_string())]
        );
    }

    /// Messages whose type is not `text` are dropped.
    #[test]
    fn extract_text_messages_ignores_non_text() {
        let json = r#"{
"entry": [{
"changes": [{
"value": {
"messages": [{
"from": "15551234567",
"type": "image",
"image": { "id": "img123" }
}],
"metadata": { "phone_number_id": "123456" }
}
}]
}]
}"#;
        assert!(texts_from(json).is_empty());
    }

    /// A payload with no entries yields no messages.
    #[test]
    fn extract_text_messages_handles_empty_payload() {
        let json = r#"{ "entry": [] }"#;
        assert!(texts_from(json).is_empty());
    }

    /// Several messages in one change are returned in payload order.
    #[test]
    fn extract_text_messages_handles_multiple_messages() {
        let json = r#"{
"entry": [{
"changes": [{
"value": {
"messages": [
{ "from": "111", "type": "text", "text": { "body": "status" } },
{ "from": "222", "type": "text", "text": { "body": "help" } }
],
"metadata": { "phone_number_id": "123456" }
}
}]
}]
}"#;
        let found = texts_from(json);
        let bodies: Vec<&str> = found.iter().map(|(_, body)| body.as_str()).collect();
        assert_eq!(bodies, ["status", "help"]);
    }

    /// `send_message` posts to `/{phone_number_id}/messages` with the bearer
    /// token and returns the message ID from the response.
    #[tokio::test]
    async fn transport_send_message_calls_graph_api() {
        let mut server = mockito::Server::new_async().await;
        let mock = server
            .mock("POST", "/123456/messages")
            .match_header("authorization", "Bearer test-token")
            .with_body(r#"{"messages": [{"id": "wamid.abc123"}]}"#)
            .create_async()
            .await;

        let transport = transport_at(server.url(), "test-token");
        let sent_id = transport
            .send_message("15551234567", "hello", "<p>hello</p>")
            .await
            .expect("send_message should succeed against the mock");

        assert_eq!(sent_id, "wamid.abc123");
        mock.assert_async().await;
    }

    /// `edit_message` falls back to posting a brand-new message.
    #[tokio::test]
    async fn transport_edit_sends_new_message() {
        let mut server = mockito::Server::new_async().await;
        let mock = server
            .mock("POST", "/123456/messages")
            .with_body(r#"{"messages": [{"id": "wamid.xyz"}]}"#)
            .create_async()
            .await;

        let transport = transport_at(server.url(), "test-token");
        let outcome = transport
            .edit_message("15551234567", "old-msg-id", "updated", "<p>updated</p>")
            .await;

        assert!(outcome.is_ok());
        mock.assert_async().await;
    }

    /// Typing notifications succeed for both the on and off states.
    #[tokio::test]
    async fn transport_send_typing_succeeds() {
        let transport = WhatsAppTransport::new("123".to_string(), "tok".to_string());
        for typing in [true, false] {
            assert!(transport.send_typing("room1", typing).await.is_ok());
        }
    }

    /// A non-success status surfaces as an error that mentions the status code.
    #[tokio::test]
    async fn transport_handles_api_error() {
        let mut server = mockito::Server::new_async().await;
        server
            .mock("POST", "/123456/messages")
            .with_status(401)
            .with_body(r#"{"error": {"message": "Invalid token"}}"#)
            .create_async()
            .await;

        let transport = transport_at(server.url(), "bad-token");
        let failure = transport
            .send_message("15551234567", "hello", "")
            .await
            .unwrap_err();

        assert!(failure.contains("401"));
    }
}