diff --git a/.storkit/project.toml b/.storkit/project.toml index fe28d449..233e6979 100644 --- a/.storkit/project.toml +++ b/.storkit/project.toml @@ -21,7 +21,7 @@ base_branch = "master" [[component]] name = "frontend" path = "frontend" -setup = ["npm install", "npm run build"] +setup = ["npm ci", "npm run build"] teardown = [] [[component]] diff --git a/.storkit/work/1_backlog/403_bug_whatsapp_and_slack_missing_rmtree_command_handler.md b/.storkit/work/1_backlog/403_bug_whatsapp_and_slack_missing_rmtree_command_handler.md index bf606e43..ab7d6a9e 100644 --- a/.storkit/work/1_backlog/403_bug_whatsapp_and_slack_missing_rmtree_command_handler.md +++ b/.storkit/work/1_backlog/403_bug_whatsapp_and_slack_missing_rmtree_command_handler.md @@ -1,5 +1,7 @@ --- name: "WhatsApp and Slack missing rmtree command handler" +retry_count: 0 +blocked: false --- # Bug 403: WhatsApp and Slack missing rmtree command handler diff --git a/.storkit/work/1_backlog/407_spike_fly_io_machines_for_multi_tenant_storkit_saas.md b/.storkit/work/1_backlog/407_spike_fly_io_machines_for_multi_tenant_storkit_saas.md deleted file mode 100644 index d9b756a7..00000000 --- a/.storkit/work/1_backlog/407_spike_fly_io_machines_for_multi_tenant_storkit_saas.md +++ /dev/null @@ -1,36 +0,0 @@ ---- -name: "Fly.io Machines for multi-tenant storkit SaaS — docs, security & pricing" ---- - -# Spike 407: Fly.io Machines for multi-tenant storkit SaaS — docs, security & pricing - -## Question - -What do Fly.io's published docs, security claims, and pricing say about using Machines as the isolation layer for a multi-tenant storkit SaaS? Is there anything that rules it out before we write code? - -## Hypothesis - -Fly.io Machines (Firecracker-based microVMs) are a viable isolation primitive for tenants running arbitrary shell commands, and the pricing model is workable at early SaaS scale. - -## Timebox - -2 hours - -## Investigation Plan - -- [ ] Read Fly.io Machines API docs — what are the core primitives (machine lifecycle, networking, volumes, secrets)? -- [ ] Research Fly.io's published isolation model — what security guarantees do they document for Firecracker microVMs? Summarise claims and explicitly flag what would require independent security review before production use. -- [ ] Research cold start time — what do Fly.io docs and community benchmarks claim? Note that real numbers require a test account (covered in spike 408). -- [ ] Research persistent volume support — can a volume be attached per-tenant? What are the size/count limits? -- [ ] Research secret injection options — env vars, Fly Secrets API, volume mounts. What's the right approach for per-tenant `~/.claude/.credentials.json`? -- [ ] Research machine count and org limits — any hard caps that would block SaaS growth? -- [ ] Research pricing — always-on vs stop-on-idle machine costs at 10, 100, 1000 tenants. Include volume and egress costs. -- [ ] Identify any documented showstoppers. - -## Findings - -- TBD - -## Recommendation - -- TBD diff --git a/.storkit/work/1_backlog/411_story_multi_account_oauth_token_rotation_on_rate_limit.md b/.storkit/work/1_backlog/411_story_multi_account_oauth_token_rotation_on_rate_limit.md new file mode 100644 index 00000000..43e39844 --- /dev/null +++ b/.storkit/work/1_backlog/411_story_multi_account_oauth_token_rotation_on_rate_limit.md @@ -0,0 +1,22 @@ +--- +name: "Multi-account OAuth token rotation on rate limit" +--- + +# Story 411: Multi-account OAuth token rotation on rate limit + +## User Story + +As a storkit user with multiple Claude Max subscriptions, I want the system to automatically rotate to a different account when one gets rate limited, so that agents and chat don't stall out waiting for limits to reset. + +## Acceptance Criteria + +- [ ] OAuth login flow stores credentials per-account (keyed by email), not overwriting previous accounts +- [ ] GET /oauth/status returns all stored accounts and their status (active, rate-limited, expired) +- [ ] When the active account hits a rate limit, storkit automatically swaps to the next available account's refresh token, refreshes, and retries +- [ ] The bot sends a notification in Matrix/WhatsApp when it swaps accounts +- [ ] If all accounts are rate limited, the bot surfaces a clear message with the time until the earliest reset +- [ ] A new /oauth/authorize login adds to the account pool rather than replacing the current credentials + +## Out of Scope + +- TBD diff --git a/.storkit/work/1_backlog/412_story_recheck_bot_command_to_re_run_gates_without_restarting_agent.md b/.storkit/work/1_backlog/412_story_recheck_bot_command_to_re_run_gates_without_restarting_agent.md new file mode 100644 index 00000000..bd484dbc --- /dev/null +++ b/.storkit/work/1_backlog/412_story_recheck_bot_command_to_re_run_gates_without_restarting_agent.md @@ -0,0 +1,23 @@ +--- +name: "Recheck bot command to re-run gates without restarting agent" +--- + +# Story 412: Recheck bot command to re-run gates without restarting agent + +## User Story + +As a user, I want to send `recheck ` to the bot so that it re-runs acceptance gates on an existing worktree without spawning a new agent, so I can unblock stories that failed due to environment issues without wasting agent turns. + +## Acceptance Criteria + +- [ ] recheck command is registered in chat/commands/mod.rs and appears in help output +- [ ] `recheck ` runs run_acceptance_gates on the story's existing worktree +- [ ] If gates pass, the story advances through the pipeline (same as if a coder completed successfully) +- [ ] If gates fail, the error output is returned to the user (not silently retried) +- [ ] If no worktree exists for the story, returns a clear error +- [ ] Does not spawn a new agent or increment retry_count +- [ ] Works from all transports (Matrix, WhatsApp, Slack) + +## Out of Scope + +- TBD diff --git a/.storkit/work/5_done/399_story_cli_port_flag_with_project_toml_persistence.md b/.storkit/work/6_archived/399_story_cli_port_flag_with_project_toml_persistence.md similarity index 100% rename from .storkit/work/5_done/399_story_cli_port_flag_with_project_toml_persistence.md rename to .storkit/work/6_archived/399_story_cli_port_flag_with_project_toml_persistence.md diff --git a/.storkit/work/5_done/400_bug_whatsapp_and_slack_missing_reset_command_handler.md b/.storkit/work/6_archived/400_bug_whatsapp_and_slack_missing_reset_command_handler.md similarity index 100% rename from .storkit/work/5_done/400_bug_whatsapp_and_slack_missing_reset_command_handler.md rename to .storkit/work/6_archived/400_bug_whatsapp_and_slack_missing_reset_command_handler.md diff --git a/.storkit/work/5_done/401_bug_whatsapp_and_slack_missing_start_command_handler.md b/.storkit/work/6_archived/401_bug_whatsapp_and_slack_missing_start_command_handler.md similarity index 100% rename from .storkit/work/5_done/401_bug_whatsapp_and_slack_missing_start_command_handler.md rename to .storkit/work/6_archived/401_bug_whatsapp_and_slack_missing_start_command_handler.md diff --git a/.storkit/work/6_archived/402_bug_whatsapp_and_slack_missing_rebuild_command_handler.md b/.storkit/work/6_archived/402_bug_whatsapp_and_slack_missing_rebuild_command_handler.md new file mode 100644 index 00000000..2c5c245b --- /dev/null +++ b/.storkit/work/6_archived/402_bug_whatsapp_and_slack_missing_rebuild_command_handler.md @@ -0,0 +1,26 @@ +--- +name: "WhatsApp and Slack missing rebuild command handler" +--- + +# Bug 402: WhatsApp and Slack missing rebuild command handler + +## Description + +The rebuild command has a fallback handler in chat/commands/mod.rs that returns None. Only Matrix has pre-dispatch handling for this command. On WhatsApp and Slack, the command falls through to the LLM path. + +## How to Reproduce + +1. Configure bot with transport = "whatsapp" or "slack"\n2. Send "rebuild" to the bot\n3. Check server logs + +## Actual Result + +Command falls through to LLM instead of triggering a server rebuild. + +## Expected Result + +The bot triggers a server rebuild and replies with confirmation. + +## Acceptance Criteria + +- [ ] WhatsApp transport handles rebuild command: triggers rebuild and replies with confirmation +- [ ] Slack transport handles rebuild command: triggers rebuild and replies with confirmation diff --git a/.storkit/work/5_done/405_story_auto_refresh_expired_oauth_token_for_claude_code_pty.md b/.storkit/work/6_archived/405_story_auto_refresh_expired_oauth_token_for_claude_code_pty.md similarity index 100% rename from .storkit/work/5_done/405_story_auto_refresh_expired_oauth_token_for_claude_code_pty.md rename to .storkit/work/6_archived/405_story_auto_refresh_expired_oauth_token_for_claude_code_pty.md diff --git a/.storkit/work/5_done/406_story_browser_based_oauth_login_flow_from_web_ui_and_chat_integrations.md b/.storkit/work/6_archived/406_story_browser_based_oauth_login_flow_from_web_ui_and_chat_integrations.md similarity index 100% rename from .storkit/work/5_done/406_story_browser_based_oauth_login_flow_from_web_ui_and_chat_integrations.md rename to .storkit/work/6_archived/406_story_browser_based_oauth_login_flow_from_web_ui_and_chat_integrations.md diff --git a/server/src/chat/commands/loc.rs b/server/src/chat/commands/loc.rs new file mode 100644 index 00000000..0500a569 --- /dev/null +++ b/server/src/chat/commands/loc.rs @@ -0,0 +1,244 @@ +//! Handler for the `loc` command — top source files by line count. + +use super::CommandContext; +use walkdir::WalkDir; + +const DEFAULT_TOP_N: usize = 10; + +/// Directories to skip during traversal. +const SKIP_DIRS: &[&str] = &[ + "target", + "node_modules", + ".git", + "dist", + "build", + ".next", + "coverage", + "test-results", +]; + +/// Path components that indicate a worktree path that should be skipped. +const SKIP_PATH_COMPONENTS: &[&str] = &[".storkit/worktrees"]; + +pub(super) fn handle_loc(ctx: &CommandContext) -> Option { + let top_n = if ctx.args.is_empty() { + DEFAULT_TOP_N + } else { + match ctx.args.split_whitespace().next().and_then(|s| s.parse::().ok()) { + Some(n) if n > 0 => n, + _ => { + return Some(format!( + "Usage: `loc [N]` — show top N source files by line count (default {DEFAULT_TOP_N})" + )); + } + } + }; + + let mut files: Vec<(usize, String)> = WalkDir::new(ctx.project_root) + .follow_links(false) + .into_iter() + .filter_entry(|e| { + if e.file_type().is_dir() { + let name = e.file_name().to_string_lossy(); + if SKIP_DIRS.iter().any(|s| *s == name.as_ref()) { + return false; + } + // Skip .storkit/worktrees + let path = e.path().to_string_lossy(); + if SKIP_PATH_COMPONENTS.iter().any(|s| path.contains(s)) { + return false; + } + } + true + }) + .filter_map(|entry| { + let entry = entry.ok()?; + if !entry.file_type().is_file() { + return None; + } + let path = entry.path(); + // Skip binary/generated files without a recognisable text extension. + let ext = path.extension().and_then(|e| e.to_str()).unwrap_or(""); + if !is_source_extension(ext) { + return None; + } + let content = std::fs::read_to_string(path).ok()?; + let line_count = content.lines().count(); + if line_count == 0 { + return None; + } + // Make path relative to project_root for display. + let rel = path + .strip_prefix(ctx.project_root) + .unwrap_or(path) + .to_string_lossy() + .into_owned(); + Some((line_count, rel)) + }) + .collect(); + + files.sort_by(|a, b| b.0.cmp(&a.0)); + files.truncate(top_n); + + if files.is_empty() { + return Some("No source files found.".to_string()); + } + + let mut out = format!("**Top {} files by line count**\n\n", files.len()); + for (rank, (lines, path)) in files.iter().enumerate() { + out.push_str(&format!("{}. `{}` — {} lines\n", rank + 1, path, lines)); + } + Some(out) +} + +/// Returns true for file extensions considered source/text files. +fn is_source_extension(ext: &str) -> bool { + matches!( + ext, + "rs" | "ts" | "tsx" | "js" | "jsx" | "py" | "go" | "java" | "c" | "cpp" | "h" + | "hpp" | "cs" | "rb" | "swift" | "kt" | "scala" | "hs" | "ml" | "ex" | "exs" + | "clj" | "lua" | "sh" | "bash" | "zsh" | "fish" | "ps1" | "toml" | "yaml" + | "yml" | "json" | "md" | "html" | "css" | "scss" | "less" | "sql" | "graphql" + | "proto" | "tf" | "hcl" | "nix" | "r" | "jl" | "dart" | "vue" | "svelte" + ) +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use super::*; + use crate::agents::AgentPool; + use std::collections::HashSet; + use std::sync::{Arc, Mutex}; + + fn make_ctx<'a>( + agents: &'a Arc, + ambient_rooms: &'a Arc>>, + project_root: &'a std::path::Path, + args: &'a str, + ) -> super::super::CommandContext<'a> { + super::super::CommandContext { + bot_name: "Timmy", + args, + project_root, + agents, + ambient_rooms, + room_id: "!test:example.com", + } + } + + #[test] + fn loc_command_is_registered() { + use super::super::commands; + let found = commands().iter().any(|c| c.name == "loc"); + assert!(found, "loc command must be in the registry"); + } + + #[test] + fn loc_command_appears_in_help() { + let result = super::super::tests::try_cmd_addressed( + "Timmy", + "@timmy:homeserver.local", + "@timmy help", + ); + let output = result.unwrap(); + assert!(output.contains("loc"), "help should list loc command: {output}"); + } + + #[test] + fn loc_default_returns_top_10() { + let agents = Arc::new(AgentPool::new_test(3000)); + let ambient_rooms = Arc::new(Mutex::new(HashSet::new())); + let repo_root = std::path::Path::new(env!("CARGO_MANIFEST_DIR")) + .parent() + .unwrap_or(std::path::Path::new(".")); + let ctx = make_ctx(&agents, &ambient_rooms, repo_root, ""); + let output = handle_loc(&ctx).unwrap(); + assert!( + output.contains("Top"), + "output should contain 'Top': {output}" + ); + // At most 10 entries (numbered lines "1." through "10.") + let count = output.lines().filter(|l| l.contains(". `")).count(); + assert!(count <= 10, "default should return at most 10 files, got {count}"); + } + + #[test] + fn loc_with_arg_5_returns_at_most_5() { + let agents = Arc::new(AgentPool::new_test(3000)); + let ambient_rooms = Arc::new(Mutex::new(HashSet::new())); + let repo_root = std::path::Path::new(env!("CARGO_MANIFEST_DIR")) + .parent() + .unwrap_or(std::path::Path::new(".")); + let ctx = make_ctx(&agents, &ambient_rooms, repo_root, "5"); + let output = handle_loc(&ctx).unwrap(); + let count = output.lines().filter(|l| l.contains(". `")).count(); + assert!(count <= 5, "loc 5 should return at most 5 files, got {count}"); + } + + #[test] + fn loc_with_arg_20_returns_at_most_20() { + let agents = Arc::new(AgentPool::new_test(3000)); + let ambient_rooms = Arc::new(Mutex::new(HashSet::new())); + let repo_root = std::path::Path::new(env!("CARGO_MANIFEST_DIR")) + .parent() + .unwrap_or(std::path::Path::new(".")); + let ctx = make_ctx(&agents, &ambient_rooms, repo_root, "20"); + let output = handle_loc(&ctx).unwrap(); + let count = output.lines().filter(|l| l.contains(". `")).count(); + assert!(count <= 20, "loc 20 should return at most 20 files, got {count}"); + } + + #[test] + fn loc_output_contains_rank_and_line_count() { + let agents = Arc::new(AgentPool::new_test(3000)); + let ambient_rooms = Arc::new(Mutex::new(HashSet::new())); + let repo_root = std::path::Path::new(env!("CARGO_MANIFEST_DIR")) + .parent() + .unwrap_or(std::path::Path::new(".")); + let ctx = make_ctx(&agents, &ambient_rooms, repo_root, ""); + let output = handle_loc(&ctx).unwrap(); + // Each entry should have "N. `path` — N lines" + assert!( + output.contains("1. `"), + "first result should start with rank: {output}" + ); + assert!( + output.contains("lines"), + "output should mention 'lines': {output}" + ); + } + + #[test] + fn loc_invalid_arg_returns_usage() { + let agents = Arc::new(AgentPool::new_test(3000)); + let ambient_rooms = Arc::new(Mutex::new(HashSet::new())); + let repo_root = std::path::Path::new(env!("CARGO_MANIFEST_DIR")) + .parent() + .unwrap_or(std::path::Path::new(".")); + let ctx = make_ctx(&agents, &ambient_rooms, repo_root, "abc"); + let output = handle_loc(&ctx).unwrap(); + assert!( + output.contains("Usage"), + "invalid arg should show usage: {output}" + ); + } + + #[test] + fn loc_skips_worktrees_directory() { + let agents = Arc::new(AgentPool::new_test(3000)); + let ambient_rooms = Arc::new(Mutex::new(HashSet::new())); + let repo_root = std::path::Path::new(env!("CARGO_MANIFEST_DIR")) + .parent() + .unwrap_or(std::path::Path::new(".")); + let ctx = make_ctx(&agents, &ambient_rooms, repo_root, ""); + let output = handle_loc(&ctx).unwrap(); + assert!( + !output.contains(".storkit/worktrees"), + "output must not include paths inside worktrees: {output}" + ); + } +} diff --git a/server/src/chat/commands/mod.rs b/server/src/chat/commands/mod.rs index 21b06b3f..a0cfb8eb 100644 --- a/server/src/chat/commands/mod.rs +++ b/server/src/chat/commands/mod.rs @@ -10,6 +10,7 @@ mod assign; mod cost; mod git; mod help; +mod loc; mod move_story; mod overview; mod show; @@ -114,6 +115,11 @@ pub fn commands() -> &'static [BotCommand] { description: "Show token spend: 24h total, top stories, breakdown by agent type, and all-time total", handler: cost::handle_cost, }, + BotCommand { + name: "loc", + description: "Show top source files by line count: `loc` (top 10) or `loc `", + handler: loc::handle_loc, + }, BotCommand { name: "move", description: "Move a work item to a pipeline stage: `move ` (stages: backlog, current, qa, merge, done)", diff --git a/server/src/chat/transport/slack.rs b/server/src/chat/transport/slack.rs index baf1f87b..b2c9de86 100644 --- a/server/src/chat/transport/slack.rs +++ b/server/src/chat/transport/slack.rs @@ -863,6 +863,31 @@ async fn handle_incoming_message( return; } + if let Some(rmtree_cmd) = crate::chat::transport::matrix::rmtree::extract_rmtree_command( + message, + &ctx.bot_name, + &ctx.bot_user_id, + ) { + let response = match rmtree_cmd { + crate::chat::transport::matrix::rmtree::RmtreeCommand::Rmtree { story_number } => { + slog!("[slack] Handling rmtree command from {user} in {channel}: story {story_number}"); + crate::chat::transport::matrix::rmtree::handle_rmtree( + &ctx.bot_name, + &story_number, + &ctx.project_root, + &ctx.agents, + ) + .await + } + crate::chat::transport::matrix::rmtree::RmtreeCommand::BadArgs => { + format!("Usage: `{} rmtree `", ctx.bot_name) + } + }; + let response = markdown_to_slack(&response); + let _ = ctx.transport.send_message(channel, &response, "").await; + return; + } + if crate::chat::transport::matrix::reset::extract_reset_command( message, &ctx.bot_name, diff --git a/server/src/chat/transport/whatsapp.rs b/server/src/chat/transport/whatsapp.rs index 27aedccd..7e9f4da5 100644 --- a/server/src/chat/transport/whatsapp.rs +++ b/server/src/chat/transport/whatsapp.rs @@ -1124,6 +1124,30 @@ async fn handle_incoming_message(ctx: &WhatsAppWebhookContext, sender: &str, mes return; } + if let Some(rmtree_cmd) = crate::chat::transport::matrix::rmtree::extract_rmtree_command( + message, + &ctx.bot_name, + &ctx.bot_user_id, + ) { + let response = match rmtree_cmd { + crate::chat::transport::matrix::rmtree::RmtreeCommand::Rmtree { story_number } => { + slog!("[whatsapp] Handling rmtree command from {sender}: story {story_number}"); + crate::chat::transport::matrix::rmtree::handle_rmtree( + &ctx.bot_name, + &story_number, + &ctx.project_root, + &ctx.agents, + ) + .await + } + crate::chat::transport::matrix::rmtree::RmtreeCommand::BadArgs => { + format!("Usage: `{} rmtree `", ctx.bot_name) + } + }; + let _ = ctx.transport.send_message(sender, &response, "").await; + return; + } + if crate::chat::transport::matrix::reset::extract_reset_command( message, &ctx.bot_name, diff --git a/server/src/chat/transport/whatsapp/commands.rs b/server/src/chat/transport/whatsapp/commands.rs new file mode 100644 index 00000000..d892e1c3 --- /dev/null +++ b/server/src/chat/transport/whatsapp/commands.rs @@ -0,0 +1,710 @@ +use std::sync::Arc; + +use crate::chat::transport::matrix::{ConversationEntry, ConversationRole, RoomConversation}; +use crate::http::context::{PermissionDecision}; +use crate::slog; +use super::WhatsAppWebhookContext; +use super::format::{chunk_for_whatsapp, markdown_to_whatsapp}; +use super::history::save_whatsapp_history; + +/// Returns `true` if the message body should be interpreted as permission approval. +fn is_permission_approval(body: &str) -> bool { + let trimmed = body.trim().to_ascii_lowercase(); + matches!( + trimmed.as_str(), + "yes" | "y" | "approve" | "allow" | "ok" + ) +} + +/// Dispatch an incoming WhatsApp message to bot commands. +pub(super) async fn handle_incoming_message(ctx: &WhatsAppWebhookContext, sender: &str, message: &str) { + use crate::chat::commands::{CommandDispatch, try_handle_command}; + + // Allowlist check: when configured, silently ignore unauthorized senders. + if !ctx.allowed_phones.is_empty() + && !ctx.allowed_phones.iter().any(|p| p == sender) + { + slog!("[whatsapp] Ignoring message from unauthorized sender: {sender}"); + return; + } + + // Record this inbound message to keep the 24-hour window open. + ctx.window_tracker.record_message(sender); + + // If there is a pending permission prompt for this sender, interpret the + // message as a yes/no response instead of starting a new command/LLM flow. + { + let mut pending = ctx.pending_perm_replies.lock().await; + if let Some(tx) = pending.remove(sender) { + let decision = if is_permission_approval(message) { + PermissionDecision::Approve + } else { + PermissionDecision::Deny + }; + let _ = tx.send(decision); + let confirmation = if decision == PermissionDecision::Approve { + "Permission approved." + } else { + "Permission denied." + }; + let formatted = markdown_to_whatsapp(confirmation); + let _ = ctx.transport.send_message(sender, &formatted, "").await; + return; + } + } + + let dispatch = CommandDispatch { + bot_name: &ctx.bot_name, + bot_user_id: &ctx.bot_user_id, + project_root: &ctx.project_root, + agents: &ctx.agents, + ambient_rooms: &ctx.ambient_rooms, + room_id: sender, + }; + + if let Some(response) = try_handle_command(&dispatch, message) { + slog!("[whatsapp] Sending command response to {sender}"); + let formatted = markdown_to_whatsapp(&response); + if let Err(e) = ctx.transport.send_message(sender, &formatted, "").await { + slog!("[whatsapp] Failed to send reply to {sender}: {e}"); + } + return; + } + + // Check for async commands (htop, delete). + if let Some(htop_cmd) = crate::chat::transport::matrix::htop::extract_htop_command( + message, + &ctx.bot_name, + &ctx.bot_user_id, + ) { + use crate::chat::transport::matrix::htop::HtopCommand; + slog!("[whatsapp] Handling htop command from {sender}"); + match htop_cmd { + HtopCommand::Stop => { + // htop stop — no-op on WhatsApp since there's no persistent + // editable message; just acknowledge. + let _ = ctx + .transport + .send_message(sender, "htop stopped.", "") + .await; + } + HtopCommand::Start { duration_secs } => { + // On WhatsApp, send a single snapshot instead of a live-updating + // dashboard since we can't edit messages. + let snapshot = crate::chat::transport::matrix::htop::build_htop_message( + &ctx.agents, + 0, + duration_secs, + ); + let _ = ctx.transport.send_message(sender, &snapshot, "").await; + } + } + return; + } + + if let Some(del_cmd) = crate::chat::transport::matrix::delete::extract_delete_command( + message, + &ctx.bot_name, + &ctx.bot_user_id, + ) { + let response = match del_cmd { + crate::chat::transport::matrix::delete::DeleteCommand::Delete { story_number } => { + slog!("[whatsapp] Handling delete command from {sender}: story {story_number}"); + crate::chat::transport::matrix::delete::handle_delete( + &ctx.bot_name, + &story_number, + &ctx.project_root, + &ctx.agents, + ) + .await + } + crate::chat::transport::matrix::delete::DeleteCommand::BadArgs => { + format!("Usage: `{} delete `", ctx.bot_name) + } + }; + let _ = ctx.transport.send_message(sender, &response, "").await; + return; + } + + if crate::chat::transport::matrix::rebuild::extract_rebuild_command( + message, + &ctx.bot_name, + &ctx.bot_user_id, + ) + .is_some() + { + slog!("[whatsapp] Handling rebuild command from {sender}"); + let ack = "Rebuilding server… this may take a moment."; + let _ = ctx.transport.send_message(sender, ack, "").await; + let response = crate::chat::transport::matrix::rebuild::handle_rebuild( + &ctx.bot_name, + &ctx.project_root, + &ctx.agents, + ) + .await; + let _ = ctx.transport.send_message(sender, &response, "").await; + return; + } + + if let Some(rmtree_cmd) = crate::chat::transport::matrix::rmtree::extract_rmtree_command( + message, + &ctx.bot_name, + &ctx.bot_user_id, + ) { + let response = match rmtree_cmd { + crate::chat::transport::matrix::rmtree::RmtreeCommand::Rmtree { story_number } => { + slog!("[whatsapp] Handling rmtree command from {sender}: story {story_number}"); + crate::chat::transport::matrix::rmtree::handle_rmtree( + &ctx.bot_name, + &story_number, + &ctx.project_root, + &ctx.agents, + ) + .await + } + crate::chat::transport::matrix::rmtree::RmtreeCommand::BadArgs => { + format!("Usage: `{} rmtree `", ctx.bot_name) + } + }; + let _ = ctx.transport.send_message(sender, &response, "").await; + return; + } + + if crate::chat::transport::matrix::reset::extract_reset_command( + message, + &ctx.bot_name, + &ctx.bot_user_id, + ) + .is_some() + { + slog!("[whatsapp] Handling reset command from {sender}"); + { + let mut guard = ctx.history.lock().await; + let conv = guard.entry(sender.to_string()).or_insert_with(RoomConversation::default); + conv.session_id = None; + conv.entries.clear(); + save_whatsapp_history(&ctx.project_root, &guard); + } + let _ = ctx + .transport + .send_message(sender, "Session cleared.", "") + .await; + return; + } + + if let Some(start_cmd) = crate::chat::transport::matrix::start::extract_start_command( + message, + &ctx.bot_name, + &ctx.bot_user_id, + ) { + let response = match start_cmd { + crate::chat::transport::matrix::start::StartCommand::Start { + story_number, + agent_hint, + } => { + slog!("[whatsapp] Handling start command from {sender}: story {story_number}"); + crate::chat::transport::matrix::start::handle_start( + &ctx.bot_name, + &story_number, + agent_hint.as_deref(), + &ctx.project_root, + &ctx.agents, + ) + .await + } + crate::chat::transport::matrix::start::StartCommand::BadArgs => { + format!("Usage: `{} start `", ctx.bot_name) + } + }; + let _ = ctx.transport.send_message(sender, &response, "").await; + return; + } + + // No command matched — forward to LLM for conversational response. + slog!("[whatsapp] No command matched, forwarding to LLM for {sender}"); + handle_llm_message(ctx, sender, message).await; +} + +/// Forward a message to Claude Code and send the response back via WhatsApp. +async fn handle_llm_message(ctx: &WhatsAppWebhookContext, sender: &str, user_message: &str) { + use crate::chat::util::drain_complete_paragraphs; + use crate::llm::providers::claude_code::{ClaudeCodeProvider, ClaudeCodeResult}; + use std::sync::atomic::{AtomicBool, Ordering}; + use tokio::sync::watch; + + // Look up existing session ID for this sender. + let resume_session_id: Option = { + let guard = ctx.history.lock().await; + guard.get(sender).and_then(|conv| conv.session_id.clone()) + }; + + let bot_name = &ctx.bot_name; + let prompt = format!( + "[Your name is {bot_name}. Refer to yourself as {bot_name}, not Claude.]\n\n{sender}: {user_message}" + ); + + let provider = ClaudeCodeProvider::new(); + let (_cancel_tx, mut cancel_rx) = watch::channel(false); + + // Channel for sending complete chunks to the WhatsApp posting task. + let (msg_tx, mut msg_rx) = tokio::sync::mpsc::unbounded_channel::(); + let msg_tx_for_callback = msg_tx.clone(); + + // Spawn a task to post messages as they arrive. + let post_transport = Arc::clone(&ctx.transport); + let post_sender = sender.to_string(); + let post_task = tokio::spawn(async move { + while let Some(chunk) = msg_rx.recv().await { + // Convert Markdown to WhatsApp formatting, then split into sized chunks. + let formatted = markdown_to_whatsapp(&chunk); + for part in chunk_for_whatsapp(&formatted) { + let _ = post_transport.send_message(&post_sender, &part, "").await; + } + } + }); + + // Shared buffer between the sync token callback and the async scope. + let buffer = Arc::new(std::sync::Mutex::new(String::new())); + let buffer_for_callback = Arc::clone(&buffer); + let sent_any_chunk = Arc::new(AtomicBool::new(false)); + let sent_any_chunk_for_callback = Arc::clone(&sent_any_chunk); + + let project_root_str = ctx.project_root.to_string_lossy().to_string(); + let chat_fut = provider.chat_stream( + &prompt, + &project_root_str, + resume_session_id.as_deref(), + None, + &mut cancel_rx, + move |token| { + let mut buf = buffer_for_callback.lock().unwrap(); + buf.push_str(token); + let paragraphs = drain_complete_paragraphs(&mut buf); + for chunk in paragraphs { + sent_any_chunk_for_callback.store(true, Ordering::Relaxed); + let _ = msg_tx_for_callback.send(chunk); + } + }, + |_thinking| {}, + |_activity| {}, + ); + tokio::pin!(chat_fut); + + // Lock the permission receiver for the duration of this chat session. + let mut perm_rx_guard = ctx.perm_rx.lock().await; + + let result = loop { + tokio::select! { + r = &mut chat_fut => break r, + + Some(perm_fwd) = perm_rx_guard.recv() => { + let prompt_msg = format!( + "*Permission Request*\n\nTool: `{}`\n```json\n{}\n```\n\nReply *yes* to approve or *no* to deny.", + perm_fwd.tool_name, + serde_json::to_string_pretty(&perm_fwd.tool_input) + .unwrap_or_else(|_| perm_fwd.tool_input.to_string()), + ); + let formatted = markdown_to_whatsapp(&prompt_msg); + for part in chunk_for_whatsapp(&formatted) { + let _ = ctx.transport.send_message(sender, &part, "").await; + } + + // Store the response sender so the incoming message handler + // can resolve it when the user replies yes/no. + ctx.pending_perm_replies + .lock() + .await + .insert(sender.to_string(), perm_fwd.response_tx); + + // Spawn a timeout task: auto-deny if the user does not respond. + let pending = Arc::clone(&ctx.pending_perm_replies); + let timeout_sender = sender.to_string(); + let timeout_transport = Arc::clone(&ctx.transport); + let timeout_secs = ctx.permission_timeout_secs; + tokio::spawn(async move { + tokio::time::sleep(std::time::Duration::from_secs(timeout_secs)).await; + if let Some(tx) = pending.lock().await.remove(&timeout_sender) { + let _ = tx.send(PermissionDecision::Deny); + let msg = "Permission request timed out — denied (fail-closed)."; + let _ = timeout_transport.send_message(&timeout_sender, msg, "").await; + } + }); + } + } + }; + drop(perm_rx_guard); + + // Flush remaining text. + let remaining = buffer.lock().unwrap().trim().to_string(); + let did_send_any = sent_any_chunk.load(Ordering::Relaxed); + + let (assistant_reply, new_session_id) = match result { + Ok(ClaudeCodeResult { + messages, + session_id, + }) => { + let reply = if !remaining.is_empty() { + let _ = msg_tx.send(remaining.clone()); + remaining + } else if !did_send_any { + let last_text = messages + .iter() + .rev() + .find(|m| m.role == crate::llm::types::Role::Assistant && !m.content.is_empty()) + .map(|m| m.content.clone()) + .unwrap_or_default(); + if !last_text.is_empty() { + let _ = msg_tx.send(last_text.clone()); + } + last_text + } else { + remaining + }; + slog!("[whatsapp] session_id from chat_stream: {:?}", session_id); + (reply, session_id) + } + Err(e) => { + slog!("[whatsapp] LLM error: {e}"); + let err_msg = format!("Error processing your request: {e}"); + let _ = msg_tx.send(err_msg.clone()); + (err_msg, None) + } + }; + + // Signal the posting task to finish and wait for it. + drop(msg_tx); + let _ = post_task.await; + + // Record this exchange in conversation history. + if !assistant_reply.starts_with("Error processing") { + let mut guard = ctx.history.lock().await; + let conv = guard.entry(sender.to_string()).or_default(); + + if new_session_id.is_some() { + conv.session_id = new_session_id; + } + + conv.entries.push(ConversationEntry { + role: ConversationRole::User, + sender: sender.to_string(), + content: user_message.to_string(), + }); + conv.entries.push(ConversationEntry { + role: ConversationRole::Assistant, + sender: String::new(), + content: assistant_reply, + }); + + // Trim to configured maximum. + if conv.entries.len() > ctx.history_size { + let excess = conv.entries.len() - ctx.history_size; + conv.entries.drain(..excess); + } + + save_whatsapp_history(&ctx.project_root, &guard); + } +} + +// ── Tests ─────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use crate::agents::AgentPool; + use crate::io::watcher::WatcherEvent; + use crate::chat::transport::matrix::{ConversationEntry, ConversationRole, RoomConversation}; + use super::super::history::{MessagingWindowTracker, WhatsAppConversationHistory}; + use super::super::WhatsAppWebhookContext; + use super::*; + use std::collections::HashMap; + use std::sync::Arc; + use tokio::sync::Mutex as TokioMutex; + + /// Build a minimal WhatsAppWebhookContext for allowlist tests. + fn make_ctx_with_allowlist( + allowed_phones: Vec, + ) -> Arc { + struct NullTransport; + + #[async_trait::async_trait] + impl crate::chat::ChatTransport for NullTransport { + async fn send_message( + &self, + _room: &str, + _plain: &str, + _html: &str, + ) -> Result { + Ok(String::new()) + } + async fn edit_message( + &self, + _room: &str, + _id: &str, + _plain: &str, + _html: &str, + ) -> Result<(), String> { + Ok(()) + } + async fn send_typing(&self, _room: &str, _typing: bool) -> Result<(), String> { + Ok(()) + } + } + + let tmp = tempfile::tempdir().unwrap(); + let (tx, _rx) = tokio::sync::broadcast::channel::(16); + let agents = Arc::new(AgentPool::new(3999, tx)); + let tracker = Arc::new(MessagingWindowTracker::new()); + let (_perm_tx, perm_rx) = tokio::sync::mpsc::unbounded_channel(); + Arc::new(WhatsAppWebhookContext { + verify_token: "tok".to_string(), + provider: "meta".to_string(), + transport: Arc::new(NullTransport), + project_root: tmp.path().to_path_buf(), + agents, + bot_name: "Bot".to_string(), + bot_user_id: "whatsapp-bot".to_string(), + ambient_rooms: Arc::new(std::sync::Mutex::new(Default::default())), + history: Arc::new(tokio::sync::Mutex::new(Default::default())), + history_size: 20, + window_tracker: tracker, + allowed_phones, + perm_rx: Arc::new(tokio::sync::Mutex::new(perm_rx)), + pending_perm_replies: Arc::new(tokio::sync::Mutex::new(Default::default())), + permission_timeout_secs: 120, + }) + } + + // ── Allowlist tests ─────────────────────────────────────────────────── + + #[tokio::test] + async fn allowlist_blocks_unauthorized_sender() { + let allowed = vec!["+15551111111".to_string()]; + let ctx = make_ctx_with_allowlist(allowed); + let unauthorized = "+15559999999"; + + handle_incoming_message(&ctx, unauthorized, "hello").await; + + // window_tracker is only updated AFTER the allowlist check, so an + // unauthorized sender must leave the tracker untouched. + assert!( + !ctx.window_tracker.is_within_window(unauthorized), + "unauthorized sender should not have updated the window tracker" + ); + } + + #[tokio::test] + async fn allowlist_empty_allows_all_senders() { + // Empty allowlist = open (backwards compatible). + let ctx = make_ctx_with_allowlist(vec![]); + let sender = "+15551234567"; + + handle_incoming_message(&ctx, sender, "hello").await; + + // window_tracker.record_message is called right after the allowlist + // check passes, so the sender should be recorded. + assert!( + ctx.window_tracker.is_within_window(sender), + "sender should be recorded when allowlist is empty" + ); + } + + #[tokio::test] + async fn allowlist_allows_listed_sender() { + let sender = "+15551111111"; + let ctx = make_ctx_with_allowlist(vec![sender.to_string()]); + + handle_incoming_message(&ctx, sender, "hello").await; + + assert!( + ctx.window_tracker.is_within_window(sender), + "listed sender should be recorded in the window tracker" + ); + } + + // ── rebuild command extraction ───────────────────────────────────── + + #[test] + fn rebuild_command_extracted_from_plain_message() { + // WhatsApp messages arrive without a bot mention prefix. + // extract_rebuild_command must recognise "rebuild" by itself. + let result = crate::chat::transport::matrix::rebuild::extract_rebuild_command( + "rebuild", + "Timmy", + "@timmy:home.local", + ); + assert!(result.is_some(), "plain 'rebuild' should be recognised"); + } + + #[test] + fn rebuild_command_extracted_with_bot_name_prefix() { + let result = crate::chat::transport::matrix::rebuild::extract_rebuild_command( + "Timmy rebuild", + "Timmy", + "@timmy:home.local", + ); + assert!(result.is_some(), "'Timmy rebuild' should be recognised"); + } + + #[test] + fn non_rebuild_whatsapp_message_not_extracted() { + let result = crate::chat::transport::matrix::rebuild::extract_rebuild_command( + "status", + "Timmy", + "@timmy:home.local", + ); + assert!(result.is_none(), "'status' should not be recognised as rebuild"); + } + + // ── reset command extraction ─────────────────────────────────────── + + #[test] + fn reset_command_extracted_from_plain_message() { + let result = crate::chat::transport::matrix::reset::extract_reset_command( + "reset", + "Timmy", + "@timmy:home.local", + ); + assert!(result.is_some(), "plain 'reset' should be recognised"); + } + + #[test] + fn reset_command_extracted_with_bot_name_prefix() { + let result = crate::chat::transport::matrix::reset::extract_reset_command( + "Timmy reset", + "Timmy", + "@timmy:home.local", + ); + assert!(result.is_some(), "'Timmy reset' should be recognised"); + } + + #[tokio::test] + async fn reset_command_clears_whatsapp_session() { + let sender = "+15555550100"; + let history: WhatsAppConversationHistory = Arc::new(TokioMutex::new({ + let mut m = HashMap::new(); + m.insert(sender.to_string(), RoomConversation { + session_id: Some("old-session".to_string()), + entries: vec![ConversationEntry { + role: ConversationRole::User, + sender: sender.to_string(), + content: "previous message".to_string(), + }], + }); + m + })); + + let tmp = tempfile::tempdir().unwrap(); + let sk = tmp.path().join(".storkit"); + std::fs::create_dir_all(&sk).unwrap(); + + { + let mut guard = history.lock().await; + let conv = guard.entry(sender.to_string()).or_insert_with(RoomConversation::default); + conv.session_id = None; + conv.entries.clear(); + save_whatsapp_history(tmp.path(), &guard); + } + + let guard = history.lock().await; + let conv = guard.get(sender).unwrap(); + assert!(conv.session_id.is_none(), "session_id should be cleared"); + assert!(conv.entries.is_empty(), "entries should be cleared"); + } + + #[test] + fn start_command_extracted_from_plain_message() { + // WhatsApp messages arrive without a bot mention prefix. + // extract_start_command must recognise "start 42" by itself. + let result = crate::chat::transport::matrix::start::extract_start_command( + "start 42", + "Timmy", + "@timmy:home.local", + ); + assert!(result.is_some(), "plain 'start 42' should be recognised"); + assert_eq!( + result, + Some(crate::chat::transport::matrix::start::StartCommand::Start { + story_number: "42".to_string(), + agent_hint: None, + }) + ); + } + + #[test] + fn start_command_extracted_with_bot_name_prefix() { + let result = crate::chat::transport::matrix::start::extract_start_command( + "Timmy start 99", + "Timmy", + "@timmy:home.local", + ); + assert!(result.is_some(), "'Timmy start 99' should be recognised"); + } + + #[test] + fn non_start_whatsapp_message_not_extracted() { + let result = crate::chat::transport::matrix::start::extract_start_command( + "help", + "Timmy", + "@timmy:home.local", + ); + assert!(result.is_none(), "'help' should not be recognised as start"); + } + + // ── rmtree command extraction ────────────────────────────────────── + + #[test] + fn rmtree_command_extracted_from_plain_message() { + // WhatsApp messages arrive without a bot mention prefix. + // extract_rmtree_command must recognise "rmtree 42" by itself. + let result = crate::chat::transport::matrix::rmtree::extract_rmtree_command( + "rmtree 42", + "Timmy", + "@timmy:home.local", + ); + assert!( + matches!( + result, + Some(crate::chat::transport::matrix::rmtree::RmtreeCommand::Rmtree { .. }) + ), + "plain 'rmtree 42' should be recognised" + ); + } + + #[test] + fn rmtree_command_extracted_with_bot_name_prefix() { + let result = crate::chat::transport::matrix::rmtree::extract_rmtree_command( + "Timmy rmtree 42", + "Timmy", + "@timmy:home.local", + ); + assert!( + matches!( + result, + Some(crate::chat::transport::matrix::rmtree::RmtreeCommand::Rmtree { .. }) + ), + "'Timmy rmtree 42' should be recognised" + ); + } + + #[test] + fn rmtree_command_returns_bad_args_without_number() { + let result = crate::chat::transport::matrix::rmtree::extract_rmtree_command( + "rmtree", + "Timmy", + "@timmy:home.local", + ); + assert_eq!( + result, + Some(crate::chat::transport::matrix::rmtree::RmtreeCommand::BadArgs) + ); + } + + #[test] + fn non_rmtree_whatsapp_message_not_extracted() { + let result = crate::chat::transport::matrix::rmtree::extract_rmtree_command( + "status", + "Timmy", + "@timmy:home.local", + ); + assert!(result.is_none(), "'status' should not be recognised as rmtree"); + } +} diff --git a/server/src/chat/transport/whatsapp/format.rs b/server/src/chat/transport/whatsapp/format.rs new file mode 100644 index 00000000..bf9fa9f4 --- /dev/null +++ b/server/src/chat/transport/whatsapp/format.rs @@ -0,0 +1,247 @@ +use regex::Regex; +use std::sync::LazyLock; + +/// WhatsApp Business API maximum message body size in characters. +pub(super) const WHATSAPP_MAX_MESSAGE_LEN: usize = 4096; + +/// Split a text into chunks that fit within WhatsApp's message size limit. +/// +/// Tries to split on paragraph boundaries (`\n\n`), falling back to line +/// boundaries (`\n`), and finally hard-splitting at the character limit. +pub fn chunk_for_whatsapp(text: &str) -> Vec { + if text.len() <= WHATSAPP_MAX_MESSAGE_LEN { + return vec![text.to_string()]; + } + + let mut chunks = Vec::new(); + let mut remaining = text; + + while !remaining.is_empty() { + if remaining.len() <= WHATSAPP_MAX_MESSAGE_LEN { + chunks.push(remaining.to_string()); + break; + } + + // Find the best split point within the limit. + let window = &remaining[..WHATSAPP_MAX_MESSAGE_LEN]; + + // Prefer paragraph boundary. + let split_pos = window + .rfind("\n\n") + .or_else(|| window.rfind('\n')) + .unwrap_or(WHATSAPP_MAX_MESSAGE_LEN); + + let (chunk, rest) = remaining.split_at(split_pos); + let chunk = chunk.trim(); + if !chunk.is_empty() { + chunks.push(chunk.to_string()); + } + + // Skip the delimiter. + remaining = rest.trim_start_matches('\n'); + } + + chunks +} + +/// Convert standard Markdown formatting to WhatsApp-native formatting. +/// +/// WhatsApp supports a limited subset of formatting: +/// - Bold: `*text*` +/// - Italic: `_text_` +/// - Strikethrough: `~text~` +/// - Monospace / code: backtick-delimited (same as Markdown) +/// +/// This function converts common Markdown constructs so messages render +/// nicely in WhatsApp instead of showing raw Markdown syntax. +pub fn markdown_to_whatsapp(text: &str) -> String { + // Regexes are compiled once and reused across calls. + static RE_FENCED_BLOCK: LazyLock = + LazyLock::new(|| Regex::new(r"(?ms)^```.*?\n(.*?)^```").unwrap()); + static RE_HEADER: LazyLock = + LazyLock::new(|| Regex::new(r"(?m)^#{1,6}\s+(.+)$").unwrap()); + static RE_BOLD_ITALIC: LazyLock = + LazyLock::new(|| Regex::new(r"\*\*\*(.+?)\*\*\*").unwrap()); + static RE_BOLD: LazyLock = + LazyLock::new(|| Regex::new(r"\*\*(.+?)\*\*").unwrap()); + static RE_STRIKETHROUGH: LazyLock = + LazyLock::new(|| Regex::new(r"~~(.+?)~~").unwrap()); + static RE_LINK: LazyLock = + LazyLock::new(|| Regex::new(r"\[([^\]]+)\]\(([^)]+)\)").unwrap()); + static RE_HR: LazyLock = + LazyLock::new(|| Regex::new(r"(?m)^---+$").unwrap()); + + // 1. Protect fenced code blocks by replacing them with placeholders. + let mut code_blocks: Vec = Vec::new(); + let protected = RE_FENCED_BLOCK.replace_all(text, |caps: ®ex::Captures| { + let idx = code_blocks.len(); + code_blocks.push(caps[0].to_string()); + format!("\x00CODEBLOCK{idx}\x00") + }); + let mut out = protected.into_owned(); + + // 2. Headers → bold text. + out = RE_HEADER.replace_all(&out, "*$1*").into_owned(); + + // 3. Bold+italic (***text***) → bold italic (*_text_*). + out = RE_BOLD_ITALIC.replace_all(&out, "*_${1}_*").into_owned(); + + // 4. Bold (**text**) → WhatsApp bold (*text*). + out = RE_BOLD.replace_all(&out, "*$1*").into_owned(); + + // 5. Strikethrough (~~text~~) → WhatsApp strikethrough (~text~). + out = RE_STRIKETHROUGH.replace_all(&out, "~$1~").into_owned(); + + // 6. Links [text](url) → text (url). + out = RE_LINK.replace_all(&out, "$1 ($2)").into_owned(); + + // 7. Horizontal rules → empty line (just remove them). + out = RE_HR.replace_all(&out, "").into_owned(); + + // 8. Restore code blocks. + for (idx, block) in code_blocks.iter().enumerate() { + out = out.replace(&format!("\x00CODEBLOCK{idx}\x00"), block); + } + + out +} + +// ── Tests ─────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + + // ── chunk_for_whatsapp tests ──────────────────────────────────────── + + #[test] + fn chunk_short_message_returns_single_chunk() { + let chunks = chunk_for_whatsapp("Hello world"); + assert_eq!(chunks, vec!["Hello world"]); + } + + #[test] + fn chunk_exactly_at_limit_returns_single_chunk() { + let text = "a".repeat(WHATSAPP_MAX_MESSAGE_LEN); + let chunks = chunk_for_whatsapp(&text); + assert_eq!(chunks.len(), 1); + assert_eq!(chunks[0].len(), WHATSAPP_MAX_MESSAGE_LEN); + } + + #[test] + fn chunk_splits_on_paragraph_boundary() { + // Create text with a paragraph boundary near the split point. + let first_para = "a".repeat(4000); + let second_para = "b".repeat(200); + let text = format!("{first_para}\n\n{second_para}"); + let chunks = chunk_for_whatsapp(&text); + assert_eq!(chunks.len(), 2); + assert_eq!(chunks[0], first_para); + assert_eq!(chunks[1], second_para); + } + + #[test] + fn chunk_splits_on_line_boundary_when_no_paragraph_break() { + let first_line = "a".repeat(4000); + let second_line = "b".repeat(200); + let text = format!("{first_line}\n{second_line}"); + let chunks = chunk_for_whatsapp(&text); + assert_eq!(chunks.len(), 2); + assert_eq!(chunks[0], first_line); + assert_eq!(chunks[1], second_line); + } + + #[test] + fn chunk_hard_splits_continuous_text() { + let text = "x".repeat(WHATSAPP_MAX_MESSAGE_LEN * 2 + 100); + let chunks = chunk_for_whatsapp(&text); + assert!(chunks.len() >= 2); + for chunk in &chunks { + assert!(chunk.len() <= WHATSAPP_MAX_MESSAGE_LEN); + } + // Verify all content is preserved. + let reassembled: String = chunks.join(""); + assert_eq!(reassembled.len(), text.len()); + } + + #[test] + fn chunk_empty_string_returns_single_empty() { + let chunks = chunk_for_whatsapp(""); + assert_eq!(chunks, vec![""]); + } + + // ── markdown_to_whatsapp tests ──────────────────────────────────────── + + #[test] + fn md_to_wa_converts_headers_to_bold() { + assert_eq!(markdown_to_whatsapp("# Title"), "*Title*"); + assert_eq!(markdown_to_whatsapp("## Subtitle"), "*Subtitle*"); + assert_eq!(markdown_to_whatsapp("### Section"), "*Section*"); + assert_eq!(markdown_to_whatsapp("###### Deep"), "*Deep*"); + } + + #[test] + fn md_to_wa_converts_bold() { + assert_eq!(markdown_to_whatsapp("**bold text**"), "*bold text*"); + } + + #[test] + fn md_to_wa_converts_bold_italic() { + assert_eq!(markdown_to_whatsapp("***emphasis***"), "*_emphasis_*"); + } + + #[test] + fn md_to_wa_converts_strikethrough() { + assert_eq!(markdown_to_whatsapp("~~removed~~"), "~removed~"); + } + + #[test] + fn md_to_wa_converts_links() { + assert_eq!( + markdown_to_whatsapp("[click here](https://example.com)"), + "click here (https://example.com)" + ); + } + + #[test] + fn md_to_wa_removes_horizontal_rules() { + assert_eq!(markdown_to_whatsapp("above\n---\nbelow"), "above\n\nbelow"); + } + + #[test] + fn md_to_wa_preserves_inline_code() { + assert_eq!(markdown_to_whatsapp("use `foo()` here"), "use `foo()` here"); + } + + #[test] + fn md_to_wa_preserves_code_blocks() { + let input = "before\n```rust\nfn main() {\n println!(\"**not bold**\");\n}\n```\nafter"; + let output = markdown_to_whatsapp(input); + // Code block content must NOT be converted. + assert!(output.contains("\"**not bold**\"")); + // But surrounding text is still converted. + assert!(output.contains("before")); + assert!(output.contains("after")); + } + + #[test] + fn md_to_wa_mixed_message() { + let input = "### Philosophy\n- **Stories** define the change\n- ~~old~~ is gone\n- See [docs](https://example.com)"; + let output = markdown_to_whatsapp(input); + assert!(output.starts_with("*Philosophy*")); + assert!(output.contains("*Stories*")); + assert!(output.contains("~old~")); + assert!(output.contains("docs (https://example.com)")); + } + + #[test] + fn md_to_wa_passthrough_plain_text() { + let plain = "Hello, how are you?"; + assert_eq!(markdown_to_whatsapp(plain), plain); + } + + #[test] + fn md_to_wa_empty_string() { + assert_eq!(markdown_to_whatsapp(""), ""); + } +} diff --git a/server/src/chat/transport/whatsapp/history.rs b/server/src/chat/transport/whatsapp/history.rs new file mode 100644 index 00000000..b8027c9a --- /dev/null +++ b/server/src/chat/transport/whatsapp/history.rs @@ -0,0 +1,254 @@ +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::sync::Arc; +use tokio::sync::Mutex as TokioMutex; + +use crate::chat::transport::matrix::RoomConversation; +use crate::slog; + +// ── Messaging window tracker ───────────────────────────────────────────── + +/// Tracks the 24-hour messaging window per WhatsApp phone number. +/// +/// Meta's Business API only permits free-form text messages within 24 hours of +/// the last *inbound* message from that user. After that window expires, only +/// approved message templates may be sent. +/// +/// Call [`record_message`] whenever an inbound message is received. Before +/// sending a proactive notification, call [`is_within_window`] to choose +/// between free-form text and a template message. +pub struct MessagingWindowTracker { + last_message: std::sync::Mutex>, + #[allow(dead_code)] // Used by Meta provider path (is_within_window → send_notification) + window_duration: std::time::Duration, +} + +impl Default for MessagingWindowTracker { + fn default() -> Self { + Self::new() + } +} + +impl MessagingWindowTracker { + /// Create a tracker with the standard 24-hour window. + pub fn new() -> Self { + Self { + last_message: std::sync::Mutex::new(HashMap::new()), + window_duration: std::time::Duration::from_secs(24 * 60 * 60), + } + } + + /// Create a tracker with a custom window duration (useful in tests). + #[cfg(test)] + pub(crate) fn with_duration(window_duration: std::time::Duration) -> Self { + Self { + last_message: std::sync::Mutex::new(HashMap::new()), + window_duration, + } + } + + /// Record that `phone` sent an inbound message right now. + pub fn record_message(&self, phone: &str) { + self.last_message + .lock() + .unwrap() + .insert(phone.to_string(), std::time::Instant::now()); + } + + /// Returns `true` when the last inbound message from `phone` arrived within + /// the 24-hour window, meaning free-form replies are still permitted. + #[allow(dead_code)] // Used by Meta provider path (send_notification) + pub fn is_within_window(&self, phone: &str) -> bool { + let map = self.last_message.lock().unwrap(); + match map.get(phone) { + Some(&instant) => instant.elapsed() < self.window_duration, + None => false, + } + } +} + +// ── Conversation history persistence ───────────────────────────────── + +/// Per-sender conversation history, keyed by phone number. +pub type WhatsAppConversationHistory = Arc>>; + +/// On-disk format for persisted WhatsApp conversation history. +#[derive(Serialize, Deserialize)] +struct PersistedWhatsAppHistory { + senders: HashMap, +} + +/// Path to the persisted WhatsApp conversation history file. +const WHATSAPP_HISTORY_FILE: &str = ".storkit/whatsapp_history.json"; + +/// Load WhatsApp conversation history from disk. +pub fn load_whatsapp_history(project_root: &std::path::Path) -> HashMap { + let path = project_root.join(WHATSAPP_HISTORY_FILE); + let data = match std::fs::read_to_string(&path) { + Ok(d) => d, + Err(_) => return HashMap::new(), + }; + let persisted: PersistedWhatsAppHistory = match serde_json::from_str(&data) { + Ok(p) => p, + Err(e) => { + slog!("[whatsapp] Failed to parse history file: {e}"); + return HashMap::new(); + } + }; + persisted.senders +} + +/// Save WhatsApp conversation history to disk. +pub(super) fn save_whatsapp_history( + project_root: &std::path::Path, + history: &HashMap, +) { + let persisted = PersistedWhatsAppHistory { + senders: history.clone(), + }; + let path = project_root.join(WHATSAPP_HISTORY_FILE); + match serde_json::to_string_pretty(&persisted) { + Ok(json) => { + if let Err(e) = std::fs::write(&path, json) { + slog!("[whatsapp] Failed to write history file: {e}"); + } + } + Err(e) => slog!("[whatsapp] Failed to serialise history: {e}"), + } +} + +// ── Tests ─────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + use crate::chat::transport::matrix::{ConversationEntry, ConversationRole, RoomConversation}; + + // ── MessagingWindowTracker ──────────────────────────────────────── + + #[test] + fn window_tracker_unknown_user_is_outside_window() { + let tracker = MessagingWindowTracker::new(); + assert!(!tracker.is_within_window("15551234567")); + } + + #[test] + fn window_tracker_records_within_window() { + let tracker = MessagingWindowTracker::new(); + tracker.record_message("15551234567"); + assert!(tracker.is_within_window("15551234567")); + } + + #[test] + fn window_tracker_expired_window_returns_false() { + // Use a 1-nanosecond window so it expires immediately. + let tracker = MessagingWindowTracker::with_duration(std::time::Duration::from_nanos(1)); + tracker.record_message("15551234567"); + // Sleep briefly to ensure the instant has elapsed. + std::thread::sleep(std::time::Duration::from_millis(1)); + assert!(!tracker.is_within_window("15551234567")); + } + + #[test] + fn window_tracker_tracks_users_independently() { + let tracker = MessagingWindowTracker::new(); + tracker.record_message("111"); + assert!(tracker.is_within_window("111")); + assert!(!tracker.is_within_window("222")); + } + + // ── WhatsApp history persistence tests ────────────────────────────── + + #[test] + fn save_and_load_whatsapp_history_round_trips() { + let tmp = tempfile::tempdir().unwrap(); + let sk = tmp.path().join(".storkit"); + std::fs::create_dir_all(&sk).unwrap(); + + let mut history = HashMap::new(); + history.insert( + "15551234567".to_string(), + RoomConversation { + session_id: Some("sess-abc".to_string()), + entries: vec![ + ConversationEntry { + role: ConversationRole::User, + sender: "15551234567".to_string(), + content: "hello".to_string(), + }, + ConversationEntry { + role: ConversationRole::Assistant, + sender: String::new(), + content: "hi there!".to_string(), + }, + ], + }, + ); + + save_whatsapp_history(tmp.path(), &history); + let loaded = load_whatsapp_history(tmp.path()); + + assert_eq!(loaded.len(), 1); + let conv = loaded.get("15551234567").unwrap(); + assert_eq!(conv.session_id.as_deref(), Some("sess-abc")); + assert_eq!(conv.entries.len(), 2); + assert_eq!(conv.entries[0].content, "hello"); + assert_eq!(conv.entries[1].content, "hi there!"); + } + + #[test] + fn load_whatsapp_history_returns_empty_when_file_missing() { + let tmp = tempfile::tempdir().unwrap(); + let history = load_whatsapp_history(tmp.path()); + assert!(history.is_empty()); + } + + #[test] + fn load_whatsapp_history_returns_empty_on_invalid_json() { + let tmp = tempfile::tempdir().unwrap(); + let sk = tmp.path().join(".storkit"); + std::fs::create_dir_all(&sk).unwrap(); + std::fs::write(sk.join("whatsapp_history.json"), "not json {{{").unwrap(); + let history = load_whatsapp_history(tmp.path()); + assert!(history.is_empty()); + } + + #[test] + fn save_whatsapp_history_preserves_multiple_senders() { + let tmp = tempfile::tempdir().unwrap(); + let sk = tmp.path().join(".storkit"); + std::fs::create_dir_all(&sk).unwrap(); + + let mut history = HashMap::new(); + history.insert( + "111".to_string(), + RoomConversation { + session_id: None, + entries: vec![ConversationEntry { + role: ConversationRole::User, + sender: "111".to_string(), + content: "msg1".to_string(), + }], + }, + ); + history.insert( + "222".to_string(), + RoomConversation { + session_id: Some("sess-222".to_string()), + entries: vec![ConversationEntry { + role: ConversationRole::User, + sender: "222".to_string(), + content: "msg2".to_string(), + }], + }, + ); + + save_whatsapp_history(tmp.path(), &history); + let loaded = load_whatsapp_history(tmp.path()); + + assert_eq!(loaded.len(), 2); + assert!(loaded.contains_key("111")); + assert!(loaded.contains_key("222")); + assert_eq!(loaded["222"].session_id.as_deref(), Some("sess-222")); + } +} diff --git a/server/src/chat/transport/whatsapp/meta.rs b/server/src/chat/transport/whatsapp/meta.rs new file mode 100644 index 00000000..60021fc2 --- /dev/null +++ b/server/src/chat/transport/whatsapp/meta.rs @@ -0,0 +1,583 @@ +use async_trait::async_trait; +use serde::{Deserialize, Serialize}; + +use crate::chat::{ChatTransport, MessageId}; +use crate::slog; +use super::history::MessagingWindowTracker; + +// ── API base URLs (overridable for tests) ──────────────────────────────── + +pub(super) const GRAPH_API_BASE: &str = "https://graph.facebook.com/v21.0"; + +/// Graph API error code indicating the 24-hour messaging window has elapsed. +/// +/// When Meta returns this code the caller must fall back to an approved message +/// template instead of free-form text. +pub(super) const ERROR_CODE_OUTSIDE_WINDOW: i64 = 131047; + +/// Sentinel error string returned by [`WhatsAppTransport::send_text`] when the +/// Graph API reports that the 24-hour messaging window has expired. +pub(super) const OUTSIDE_WINDOW_ERR: &str = "OUTSIDE_MESSAGING_WINDOW"; + +// ── WhatsApp Transport ────────────────────────────────────────────────── + +/// Real WhatsApp Business API transport. +/// +/// Sends text messages via `POST {GRAPH_API_BASE}/{phone_number_id}/messages`. +/// Falls back to approved notification templates when the 24-hour window has +/// elapsed (Meta error 131047). +pub struct WhatsAppTransport { + phone_number_id: String, + access_token: String, + client: reqwest::Client, + /// Name of the approved Meta message template used for notifications + /// outside the 24-hour messaging window. + #[allow(dead_code)] // Used by Meta provider path (send_template_notification) + notification_template_name: String, + /// Optional base URL override for tests. + api_base: String, +} + +impl WhatsAppTransport { + pub fn new( + phone_number_id: String, + access_token: String, + notification_template_name: String, + ) -> Self { + Self { + phone_number_id, + access_token, + client: reqwest::Client::new(), + notification_template_name, + api_base: GRAPH_API_BASE.to_string(), + } + } + + #[cfg(test)] + pub(crate) fn with_api_base(phone_number_id: String, access_token: String, api_base: String) -> Self { + Self { + phone_number_id, + access_token, + client: reqwest::Client::new(), + notification_template_name: "pipeline_notification".to_string(), + api_base, + } + } + + /// Send a free-form text message to a WhatsApp user via the Graph API. + /// + /// Returns [`OUTSIDE_WINDOW_ERR`] if the API responds with error code + /// 131047 (messaging window expired). All other errors are returned as + /// descriptive strings. + async fn send_text(&self, to: &str, body: &str) -> Result { + let url = format!("{}/{}/messages", self.api_base, self.phone_number_id); + + let payload = GraphSendMessage { + messaging_product: "whatsapp", + to, + r#type: "text", + text: GraphTextBody { body }, + }; + + let resp = self + .client + .post(&url) + .bearer_auth(&self.access_token) + .json(&payload) + .send() + .await + .map_err(|e| format!("WhatsApp API request failed: {e}"))?; + + let status = resp.status(); + let resp_text = resp + .text() + .await + .unwrap_or_else(|_| "".to_string()); + + if !status.is_success() { + // Check for 'outside messaging window' (code 131047). Return a + // distinct sentinel so callers can fall back to a template without + // crashing. + if let Ok(err_body) = serde_json::from_str::(&resp_text) + && err_body.error.as_ref().and_then(|e| e.code) == Some(ERROR_CODE_OUTSIDE_WINDOW) + { + slog!( + "[whatsapp] Outside 24-hour messaging window for {to}; \ + template required (error 131047)" + ); + return Err(OUTSIDE_WINDOW_ERR.to_string()); + } + return Err(format!("WhatsApp API returned {status}: {resp_text}")); + } + + // Extract the message ID from the response. + let parsed: GraphSendResponse = serde_json::from_str(&resp_text).map_err(|e| { + format!("Failed to parse WhatsApp API response: {e} — body: {resp_text}") + })?; + + let msg_id = parsed + .messages + .first() + .map(|m| m.id.clone()) + .unwrap_or_default(); + + Ok(msg_id) + } + + /// Send an approved template notification message. + /// + /// Used when the 24-hour window has expired and free-form text is not + /// permitted. The template must already be approved in the Meta Business + /// Manager under the name configured in `bot.toml` + /// (`whatsapp_notification_template`, default `pipeline_notification`). + /// + /// The template body is expected to accept two positional parameters: + /// `{{1}}` = story name, `{{2}}` = pipeline stage. + #[allow(dead_code)] // Meta provider path — template fallback for expired 24h window + pub async fn send_template_notification( + &self, + to: &str, + story_name: &str, + stage: &str, + ) -> Result { + let url = format!("{}/{}/messages", self.api_base, self.phone_number_id); + + let payload = GraphTemplateMessage { + messaging_product: "whatsapp", + to, + r#type: "template", + template: GraphTemplate { + name: &self.notification_template_name, + language: GraphLanguage { code: "en_US" }, + components: vec![GraphTemplateComponent { + r#type: "body", + parameters: vec![ + GraphTemplateParameter { + r#type: "text", + text: story_name.to_string(), + }, + GraphTemplateParameter { + r#type: "text", + text: stage.to_string(), + }, + ], + }], + }, + }; + + let resp = self + .client + .post(&url) + .bearer_auth(&self.access_token) + .json(&payload) + .send() + .await + .map_err(|e| format!("WhatsApp template API request failed: {e}"))?; + + let status = resp.status(); + let resp_text = resp + .text() + .await + .unwrap_or_else(|_| "".to_string()); + + if !status.is_success() { + return Err(format!( + "WhatsApp template API returned {status}: {resp_text}" + )); + } + + let parsed: GraphSendResponse = serde_json::from_str(&resp_text).map_err(|e| { + format!("Failed to parse WhatsApp template API response: {e} — body: {resp_text}") + })?; + + let msg_id = parsed + .messages + .first() + .map(|m| m.id.clone()) + .unwrap_or_default(); + + Ok(msg_id) + } + + /// Send a pipeline notification, respecting the 24-hour messaging window. + /// + /// - Within the window: sends a free-form text message. + /// - Outside the window (or if the API returns 131047): sends an approved + /// template message instead. + /// + /// This method never crashes on a messaging-window error — it always + /// attempts the template fallback and logs what happened. + #[allow(dead_code)] // Meta provider path — window-aware notification dispatch + pub async fn send_notification( + &self, + to: &str, + tracker: &MessagingWindowTracker, + story_name: &str, + stage: &str, + ) -> Result { + if tracker.is_within_window(to) { + let text = format!("Story '{story_name}' has moved to {stage}."); + match self.send_text(to, &text).await { + Ok(id) => return Ok(id), + Err(ref e) if e == OUTSIDE_WINDOW_ERR => { + // Window expired between our check and the API call — + // fall through to the template path. + slog!( + "[whatsapp] Window expired mid-flight for {to}; \ + falling back to template" + ); + } + Err(e) => return Err(e), + } + } + + // Outside window — use the approved template. + slog!("[whatsapp] Sending template notification to {to} (outside 24h window)"); + self.send_template_notification(to, story_name, stage).await + } +} + +#[async_trait] +impl ChatTransport for WhatsAppTransport { + async fn send_message( + &self, + recipient: &str, + plain: &str, + _html: &str, + ) -> Result { + slog!("[whatsapp] send_message to {recipient}: {plain:.80}"); + match self.send_text(recipient, plain).await { + Ok(id) => Ok(id), + Err(ref e) if e == OUTSIDE_WINDOW_ERR => { + // Graceful degradation: log and surface a meaningful error + // rather than crashing. Callers sending command responses + // should normally be within the window; this handles the edge + // case where processing was delayed. + slog!( + "[whatsapp] Cannot send to {recipient}: outside 24h window \ + (message dropped)" + ); + Err(format!( + "Outside 24-hour messaging window for {recipient}; \ + send a message to the bot first to re-open the window" + )) + } + Err(e) => Err(e), + } + } + + async fn edit_message( + &self, + recipient: &str, + _original_message_id: &str, + plain: &str, + html: &str, + ) -> Result<(), String> { + // WhatsApp does not support message editing — send a new message. + slog!("[whatsapp] edit_message — WhatsApp does not support edits, sending new message"); + self.send_message(recipient, plain, html).await.map(|_| ()) + } + + async fn send_typing(&self, _recipient: &str, _typing: bool) -> Result<(), String> { + // WhatsApp Business API does not expose typing indicators. + Ok(()) + } +} + +// ── Graph API request/response types ──────────────────────────────────── + +#[derive(Serialize)] +struct GraphSendMessage<'a> { + messaging_product: &'a str, + to: &'a str, + r#type: &'a str, + text: GraphTextBody<'a>, +} + +#[derive(Serialize)] +struct GraphTextBody<'a> { + body: &'a str, +} + +#[derive(Deserialize)] +struct GraphSendResponse { + #[serde(default)] + messages: Vec, +} + +#[derive(Deserialize)] +struct GraphMessageId { + id: String, +} + +// ── Graph API error response types ───────────────────────────────────── + +#[derive(Deserialize)] +struct GraphApiErrorResponse { + error: Option, +} + +#[derive(Deserialize)] +struct GraphApiError { + code: Option, + #[allow(dead_code)] + message: Option, +} + +// ── Template message types ────────────────────────────────────────────── + +#[allow(dead_code)] // Meta provider path — template message types +#[derive(Serialize)] +struct GraphTemplateMessage<'a> { + messaging_product: &'a str, + to: &'a str, + r#type: &'a str, + template: GraphTemplate<'a>, +} + +#[allow(dead_code)] +#[derive(Serialize)] +struct GraphTemplate<'a> { + name: &'a str, + language: GraphLanguage, + components: Vec, +} + +#[allow(dead_code)] +#[derive(Serialize)] +struct GraphLanguage { + code: &'static str, +} + +#[allow(dead_code)] +#[derive(Serialize)] +struct GraphTemplateComponent { + r#type: &'static str, + parameters: Vec, +} + +#[allow(dead_code)] +#[derive(Serialize)] +struct GraphTemplateParameter { + r#type: &'static str, + text: String, +} + +// ── Tests ─────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + use crate::chat::transport::whatsapp::history::MessagingWindowTracker; + + // ── send_text error handling ─────────────────────────────────────── + + #[tokio::test] + async fn send_text_handles_131047_outside_window_error() { + let mut server = mockito::Server::new_async().await; + server + .mock("POST", "/123456/messages") + .with_status(400) + .with_body( + r#"{"error":{"message":"More than 24 hours have passed","type":"OAuthException","code":131047}}"#, + ) + .create_async() + .await; + + let transport = WhatsAppTransport::with_api_base( + "123456".to_string(), + "test-token".to_string(), + server.url(), + ); + + let result = transport.send_text("15551234567", "hello").await; + assert!(result.is_err()); + assert_eq!(result.unwrap_err(), OUTSIDE_WINDOW_ERR); + } + + #[tokio::test] + async fn send_message_handles_outside_window_gracefully() { + let mut server = mockito::Server::new_async().await; + server + .mock("POST", "/123456/messages") + .with_status(400) + .with_body( + r#"{"error":{"message":"More than 24 hours have passed","type":"OAuthException","code":131047}}"#, + ) + .create_async() + .await; + + let transport = WhatsAppTransport::with_api_base( + "123456".to_string(), + "test-token".to_string(), + server.url(), + ); + + // send_message must not panic — it returns Err with a human-readable message. + let result = transport.send_message("15551234567", "hello", "").await; + assert!(result.is_err()); + let msg = result.unwrap_err(); + assert!( + msg.contains("24-hour messaging window"), + "unexpected: {msg}" + ); + } + + // ── send_template_notification ──────────────────────────────────── + + #[tokio::test] + async fn send_template_notification_calls_graph_api() { + let mut server = mockito::Server::new_async().await; + let mock = server + .mock("POST", "/123456/messages") + .match_header("authorization", "Bearer test-token") + .match_body(mockito::Matcher::PartialJsonString( + r#"{"type":"template"}"#.to_string(), + )) + .with_body(r#"{"messages": [{"id": "wamid.tpl123"}]}"#) + .create_async() + .await; + + let transport = WhatsAppTransport::with_api_base( + "123456".to_string(), + "test-token".to_string(), + server.url(), + ); + + let result = transport + .send_template_notification("15551234567", "my-story", "done") + .await; + assert!(result.is_ok(), "unexpected err: {:?}", result.err()); + assert_eq!(result.unwrap(), "wamid.tpl123"); + mock.assert_async().await; + } + + #[tokio::test] + async fn send_notification_uses_text_within_window() { + let mut server = mockito::Server::new_async().await; + let mock = server + .mock("POST", "/123456/messages") + .match_body(mockito::Matcher::PartialJsonString( + r#"{"type":"text"}"#.to_string(), + )) + .with_body(r#"{"messages": [{"id": "wamid.txt1"}]}"#) + .create_async() + .await; + + let transport = WhatsAppTransport::with_api_base( + "123456".to_string(), + "test-token".to_string(), + server.url(), + ); + let tracker = MessagingWindowTracker::new(); + tracker.record_message("15551234567"); + + let result = transport + .send_notification("15551234567", &tracker, "my-story", "done") + .await; + assert!(result.is_ok()); + mock.assert_async().await; + } + + #[tokio::test] + async fn send_notification_uses_template_outside_window() { + let mut server = mockito::Server::new_async().await; + let mock = server + .mock("POST", "/123456/messages") + .match_body(mockito::Matcher::PartialJsonString( + r#"{"type":"template"}"#.to_string(), + )) + .with_body(r#"{"messages": [{"id": "wamid.tpl2"}]}"#) + .create_async() + .await; + + let transport = WhatsAppTransport::with_api_base( + "123456".to_string(), + "test-token".to_string(), + server.url(), + ); + // No record_message call — user is outside the window. + let tracker = MessagingWindowTracker::new(); + + let result = transport + .send_notification("15551234567", &tracker, "my-story", "done") + .await; + assert!(result.is_ok()); + mock.assert_async().await; + } + + #[tokio::test] + async fn transport_send_message_calls_graph_api() { + let mut server = mockito::Server::new_async().await; + let mock = server + .mock("POST", "/123456/messages") + .match_header("authorization", "Bearer test-token") + .with_body(r#"{"messages": [{"id": "wamid.abc123"}]}"#) + .create_async() + .await; + + let transport = WhatsAppTransport::with_api_base( + "123456".to_string(), + "test-token".to_string(), + server.url(), + ); + + let result = transport + .send_message("15551234567", "hello", "

hello

") + .await; + assert!(result.is_ok()); + assert_eq!(result.unwrap(), "wamid.abc123"); + mock.assert_async().await; + } + + #[tokio::test] + async fn transport_edit_sends_new_message() { + let mut server = mockito::Server::new_async().await; + let mock = server + .mock("POST", "/123456/messages") + .with_body(r#"{"messages": [{"id": "wamid.xyz"}]}"#) + .create_async() + .await; + + let transport = WhatsAppTransport::with_api_base( + "123456".to_string(), + "test-token".to_string(), + server.url(), + ); + + let result = transport + .edit_message("15551234567", "old-msg-id", "updated", "

updated

") + .await; + assert!(result.is_ok()); + mock.assert_async().await; + } + + #[tokio::test] + async fn transport_send_typing_succeeds() { + let transport = + WhatsAppTransport::new("123".to_string(), "tok".to_string(), "tpl".to_string()); + assert!(transport.send_typing("room1", true).await.is_ok()); + assert!(transport.send_typing("room1", false).await.is_ok()); + } + + #[tokio::test] + async fn transport_handles_api_error() { + let mut server = mockito::Server::new_async().await; + server + .mock("POST", "/123456/messages") + .with_status(401) + .with_body(r#"{"error": {"message": "Invalid token"}}"#) + .create_async() + .await; + + let transport = WhatsAppTransport::with_api_base( + "123456".to_string(), + "bad-token".to_string(), + server.url(), + ); + + let result = transport.send_message("15551234567", "hello", "").await; + assert!(result.is_err()); + assert!(result.unwrap_err().contains("401")); + } +} diff --git a/server/src/chat/transport/whatsapp/mod.rs b/server/src/chat/transport/whatsapp/mod.rs new file mode 100644 index 00000000..4722e3d4 --- /dev/null +++ b/server/src/chat/transport/whatsapp/mod.rs @@ -0,0 +1,312 @@ +//! WhatsApp Business API integration. +//! +//! Provides: +//! - [`WhatsAppTransport`] — a [`ChatTransport`] that sends messages via the +//! Meta Graph API (`graph.facebook.com/v21.0/{phone_number_id}/messages`). +//! - [`MessagingWindowTracker`] — tracks the 24-hour messaging window per user. +//! - [`webhook_verify`] / [`webhook_receive`] — Poem handlers for the WhatsApp +//! webhook (GET verification handshake + POST incoming messages). + +pub mod commands; +pub mod format; +pub mod history; +pub mod meta; +pub mod twilio; + +pub use history::{load_whatsapp_history, MessagingWindowTracker, WhatsAppConversationHistory}; +pub use meta::WhatsAppTransport; +pub use twilio::{extract_twilio_text_messages, TwilioWhatsAppTransport}; + +use serde::Deserialize; +use std::collections::{HashMap, HashSet}; +use std::path::PathBuf; +use std::sync::{Arc, Mutex}; +use tokio::sync::{Mutex as TokioMutex, oneshot}; + +use crate::agents::AgentPool; +use crate::chat::ChatTransport; +use crate::http::context::{PermissionDecision, PermissionForward}; +use crate::slog; +use poem::{Request, Response, handler, http::StatusCode, web::Query}; + +// ── Webhook types (Meta → us) ─────────────────────────────────────────── + +/// Top-level webhook payload from Meta. +#[derive(Deserialize, Debug)] +pub struct WebhookPayload { + #[serde(default)] + pub entry: Vec, +} + +#[derive(Deserialize, Debug)] +pub struct WebhookEntry { + #[serde(default)] + pub changes: Vec, +} + +#[derive(Deserialize, Debug)] +pub struct WebhookChange { + pub value: Option, +} + +#[derive(Deserialize, Debug)] +pub struct WebhookValue { + #[serde(default)] + pub messages: Vec, + #[allow(dead_code)] // Present in Meta webhook JSON, kept for deserialization + pub metadata: Option, +} + +#[derive(Deserialize, Debug)] +pub struct WebhookMetadata { + #[allow(dead_code)] + pub phone_number_id: Option, +} + +#[derive(Deserialize, Debug)] +pub struct WebhookMessage { + pub from: Option, + pub r#type: Option, + pub text: Option, +} + +#[derive(Deserialize, Debug)] +pub struct WebhookText { + pub body: Option, +} + +/// Extract text messages from a webhook payload. +/// +/// Returns `(sender_phone, message_body)` pairs. +pub fn extract_text_messages(payload: &WebhookPayload) -> Vec<(String, String)> { + let mut messages = Vec::new(); + for entry in &payload.entry { + for change in &entry.changes { + if let Some(value) = &change.value { + for msg in &value.messages { + if msg.r#type.as_deref() == Some("text") + && let (Some(from), Some(text)) = (&msg.from, &msg.text) + && let Some(body) = &text.body + { + messages.push((from.clone(), body.clone())); + } + } + } + } + } + messages +} + +/// Query parameters for the webhook verification GET request. +#[derive(Deserialize)] +pub struct VerifyQuery { + #[serde(rename = "hub.mode")] + pub hub_mode: Option, + #[serde(rename = "hub.verify_token")] + pub hub_verify_token: Option, + #[serde(rename = "hub.challenge")] + pub hub_challenge: Option, +} + +/// Shared context for webhook handlers, injected via Poem's `Data` extractor. +pub struct WhatsAppWebhookContext { + pub verify_token: String, + /// Active provider: `"meta"` (Meta Graph API) or `"twilio"` (Twilio REST API). + pub provider: String, + pub transport: Arc, + pub project_root: PathBuf, + pub agents: Arc, + pub bot_name: String, + /// The bot's "user ID" for command dispatch (e.g. "whatsapp-bot"). + pub bot_user_id: String, + pub ambient_rooms: Arc>>, + /// Per-sender conversation history for LLM passthrough. + pub history: WhatsAppConversationHistory, + /// Maximum number of conversation entries to keep per sender. + pub history_size: usize, + /// Tracks the 24-hour messaging window per user phone number. + pub window_tracker: Arc, + /// Phone numbers allowed to send messages to the bot. + /// When empty, all numbers are allowed (backwards compatible). + pub allowed_phones: Vec, + /// Permission requests from the MCP `prompt_permission` tool arrive here. + pub perm_rx: Arc>>, + /// Pending permission replies keyed by sender phone number. + pub pending_perm_replies: + Arc>>>, + /// Seconds before an unanswered permission prompt is auto-denied. + pub permission_timeout_secs: u64, +} + +/// GET /webhook/whatsapp — webhook verification. +/// +/// For Meta: responds to the `hub.mode=subscribe` challenge handshake. +/// For Twilio: Twilio does not send GET verification; always returns 200 OK. +#[handler] +pub async fn webhook_verify( + Query(q): Query, + ctx: poem::web::Data<&Arc>, +) -> Response { + // Twilio does not use a GET challenge; just acknowledge. + if ctx.provider == "twilio" { + return Response::builder().status(StatusCode::OK).body("ok"); + } + + // Meta verification handshake. + if q.hub_mode.as_deref() == Some("subscribe") + && q.hub_verify_token.as_deref() == Some(&ctx.verify_token) + && let Some(challenge) = q.hub_challenge + { + slog!("[whatsapp] Webhook verification succeeded"); + return Response::builder().status(StatusCode::OK).body(challenge); + } + slog!("[whatsapp] Webhook verification failed"); + Response::builder() + .status(StatusCode::FORBIDDEN) + .body("Verification failed") +} + +/// POST /webhook/whatsapp — receive incoming messages. +/// +/// Dispatches to the appropriate parser based on the configured provider: +/// - `"meta"`: parses Meta's JSON `WebhookPayload`. +/// - `"twilio"`: parses Twilio's `application/x-www-form-urlencoded` body. +/// +/// Both providers expect a `200 OK` response, even on parse errors. +#[handler] +pub async fn webhook_receive( + req: &Request, + body: poem::Body, + ctx: poem::web::Data<&Arc>, +) -> Response { + let _ = req; + let bytes = match body.into_bytes().await { + Ok(b) => b, + Err(e) => { + slog!("[whatsapp] Failed to read webhook body: {e}"); + return Response::builder() + .status(StatusCode::BAD_REQUEST) + .body("Bad request"); + } + }; + + let messages = if ctx.provider == "twilio" { + let msgs = extract_twilio_text_messages(&bytes); + if msgs.is_empty() { + slog!("[whatsapp/twilio] No text messages in webhook body; ignoring"); + } + msgs + } else { + let payload: WebhookPayload = match serde_json::from_slice(&bytes) { + Ok(p) => p, + Err(e) => { + slog!("[whatsapp] Failed to parse webhook payload: {e}"); + // Meta expects 200 even on parse errors to avoid retries. + return Response::builder().status(StatusCode::OK).body("ok"); + } + }; + let msgs = extract_text_messages(&payload); + if msgs.is_empty() { + // Status updates, read receipts, etc. — acknowledge silently. + return Response::builder().status(StatusCode::OK).body("ok"); + } + msgs + }; + + if messages.is_empty() { + return Response::builder().status(StatusCode::OK).body("ok"); + } + + let ctx = Arc::clone(*ctx); + tokio::spawn(async move { + for (sender, text) in messages { + slog!("[whatsapp] Message from {sender}: {text}"); + commands::handle_incoming_message(&ctx, &sender, &text).await; + } + }); + + Response::builder().status(StatusCode::OK).body("ok") +} + +// ── Tests ─────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + + // ── Existing webhook / transport tests ──────────────────────────── + + #[test] + fn extract_text_messages_parses_valid_payload() { + let json = r#"{ + "entry": [{ + "changes": [{ + "value": { + "messages": [{ + "from": "15551234567", + "type": "text", + "text": { "body": "help" } + }], + "metadata": { "phone_number_id": "123456" } + } + }] + }] + }"#; + let payload: WebhookPayload = serde_json::from_str(json).unwrap(); + let msgs = extract_text_messages(&payload); + assert_eq!(msgs.len(), 1); + assert_eq!(msgs[0].0, "15551234567"); + assert_eq!(msgs[0].1, "help"); + } + + #[test] + fn extract_text_messages_ignores_non_text() { + let json = r#"{ + "entry": [{ + "changes": [{ + "value": { + "messages": [{ + "from": "15551234567", + "type": "image", + "image": { "id": "img123" } + }], + "metadata": { "phone_number_id": "123456" } + } + }] + }] + }"#; + let payload: WebhookPayload = serde_json::from_str(json).unwrap(); + let msgs = extract_text_messages(&payload); + assert!(msgs.is_empty()); + } + + #[test] + fn extract_text_messages_handles_empty_payload() { + let json = r#"{ "entry": [] }"#; + let payload: WebhookPayload = serde_json::from_str(json).unwrap(); + let msgs = extract_text_messages(&payload); + assert!(msgs.is_empty()); + } + + #[test] + fn extract_text_messages_handles_multiple_messages() { + let json = r#"{ + "entry": [{ + "changes": [{ + "value": { + "messages": [ + { "from": "111", "type": "text", "text": { "body": "status" } }, + { "from": "222", "type": "text", "text": { "body": "help" } } + ], + "metadata": { "phone_number_id": "123456" } + } + }] + }] + }"#; + let payload: WebhookPayload = serde_json::from_str(json).unwrap(); + let msgs = extract_text_messages(&payload); + assert_eq!(msgs.len(), 2); + assert_eq!(msgs[0].1, "status"); + assert_eq!(msgs[1].1, "help"); + } +} diff --git a/server/src/chat/transport/whatsapp/twilio.rs b/server/src/chat/transport/whatsapp/twilio.rs new file mode 100644 index 00000000..ff1c5c16 --- /dev/null +++ b/server/src/chat/transport/whatsapp/twilio.rs @@ -0,0 +1,320 @@ +use async_trait::async_trait; +use serde::Deserialize; + +use crate::chat::{ChatTransport, MessageId}; +use crate::slog; + +// ── API base URL ───────────────────────────────────────────────────────── + +pub(super) const TWILIO_API_BASE: &str = "https://api.twilio.com"; + +// ── Twilio Transport ──────────────────────────────────────────────────── + +/// WhatsApp transport that routes through Twilio's REST API. +/// +/// Sends messages via `POST {TWILIO_API_BASE}/2010-04-01/Accounts/{account_sid}/Messages.json` +/// using HTTP Basic Auth (Account SID as username, Auth Token as password). +/// +/// Inbound messages from Twilio arrive as `application/x-www-form-urlencoded` +/// POST bodies; use [`extract_twilio_text_messages`] to parse them. +pub struct TwilioWhatsAppTransport { + account_sid: String, + auth_token: String, + /// Sender number in E.164 format, e.g. `+14155551234`. + from_number: String, + client: reqwest::Client, + /// Optional base URL override for tests. + api_base: String, +} + +impl TwilioWhatsAppTransport { + pub fn new(account_sid: String, auth_token: String, from_number: String) -> Self { + Self { + account_sid, + auth_token, + from_number, + client: reqwest::Client::new(), + api_base: TWILIO_API_BASE.to_string(), + } + } + + #[cfg(test)] + pub(crate) fn with_api_base( + account_sid: String, + auth_token: String, + from_number: String, + api_base: String, + ) -> Self { + Self { + account_sid, + auth_token, + from_number, + client: reqwest::Client::new(), + api_base, + } + } + + /// Send a WhatsApp message via Twilio's Messaging REST API. + async fn send_text(&self, to: &str, body: &str) -> Result { + let url = format!( + "{}/2010-04-01/Accounts/{}/Messages.json", + self.api_base, self.account_sid + ); + + // Twilio expects the WhatsApp number with a "whatsapp:" prefix. + let from = if self.from_number.starts_with("whatsapp:") { + self.from_number.clone() + } else { + format!("whatsapp:{}", self.from_number) + }; + let to_wa = if to.starts_with("whatsapp:") { + to.to_string() + } else { + format!("whatsapp:{}", to) + }; + + let params = [ + ("From", from.as_str()), + ("To", to_wa.as_str()), + ("Body", body), + ]; + + let resp = self + .client + .post(&url) + .basic_auth(&self.account_sid, Some(&self.auth_token)) + .form(¶ms) + .send() + .await + .map_err(|e| format!("Twilio API request failed: {e}"))?; + + let status = resp.status(); + let resp_text = resp + .text() + .await + .unwrap_or_else(|_| "".to_string()); + + if !status.is_success() { + return Err(format!("Twilio API returned {status}: {resp_text}")); + } + + let parsed: TwilioSendResponse = serde_json::from_str(&resp_text) + .map_err(|e| format!("Failed to parse Twilio API response: {e} — body: {resp_text}"))?; + + Ok(parsed.sid.unwrap_or_default()) + } +} + +#[async_trait] +impl ChatTransport for TwilioWhatsAppTransport { + async fn send_message( + &self, + recipient: &str, + plain: &str, + _html: &str, + ) -> Result { + slog!("[whatsapp/twilio] send_message to {recipient}: {plain:.80}"); + self.send_text(recipient, plain).await + } + + async fn edit_message( + &self, + recipient: &str, + _original_message_id: &str, + plain: &str, + html: &str, + ) -> Result<(), String> { + // Twilio does not support message editing — send a new message. + slog!( + "[whatsapp/twilio] edit_message — Twilio does not support edits, sending new message" + ); + self.send_message(recipient, plain, html).await.map(|_| ()) + } + + async fn send_typing(&self, _recipient: &str, _typing: bool) -> Result<(), String> { + // Twilio WhatsApp API does not expose typing indicators. + Ok(()) + } +} + +// ── Twilio API request/response types ────────────────────────────────── + +#[derive(Deserialize)] +struct TwilioSendResponse { + sid: Option, +} + +// ── Twilio webhook types (Twilio → us) ───────────────────────────────── + +/// Form-encoded fields from a Twilio WhatsApp inbound webhook POST. +#[derive(Deserialize, Debug)] +pub struct TwilioWebhookForm { + /// Sender number with `whatsapp:` prefix, e.g. `whatsapp:+15551234567`. + #[serde(rename = "From")] + pub from: Option, + /// Message body text. + #[serde(rename = "Body")] + pub body: Option, +} + +/// Extract text messages from a Twilio form-encoded webhook body. +/// +/// Returns `(sender_phone, message_body)` pairs, with the `whatsapp:` prefix +/// stripped from the sender number. +pub fn extract_twilio_text_messages(bytes: &[u8]) -> Vec<(String, String)> { + let form: TwilioWebhookForm = match serde_urlencoded::from_bytes(bytes) { + Ok(f) => f, + Err(e) => { + slog!("[whatsapp/twilio] Failed to parse webhook form body: {e}"); + return vec![]; + } + }; + + let from = match form.from { + Some(f) => f, + None => return vec![], + }; + let body = match form.body { + Some(b) if !b.is_empty() => b, + _ => return vec![], + }; + + // Strip the "whatsapp:" prefix so the sender is stored as a plain phone number. + let sender = from.strip_prefix("whatsapp:").unwrap_or(&from).to_string(); + + vec![(sender, body)] +} + +// ── Tests ─────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + + // ── TwilioWhatsAppTransport tests ───────────────────────────────── + + #[tokio::test] + async fn twilio_send_message_calls_twilio_api() { + let mut server = mockito::Server::new_async().await; + let mock = server + .mock("POST", "/2010-04-01/Accounts/ACtest/Messages.json") + .with_body(r#"{"sid": "SMtest123"}"#) + .create_async() + .await; + + let transport = TwilioWhatsAppTransport::with_api_base( + "ACtest".to_string(), + "authtoken".to_string(), + "+14155551234".to_string(), + server.url(), + ); + + let result = transport.send_message("+15551234567", "hello", "").await; + assert!(result.is_ok(), "unexpected err: {:?}", result.err()); + assert_eq!(result.unwrap(), "SMtest123"); + mock.assert_async().await; + } + + #[tokio::test] + async fn twilio_send_message_returns_err_on_api_error() { + let mut server = mockito::Server::new_async().await; + server + .mock("POST", "/2010-04-01/Accounts/ACtest/Messages.json") + .with_status(401) + .with_body(r#"{"message": "Unauthorized"}"#) + .create_async() + .await; + + let transport = TwilioWhatsAppTransport::with_api_base( + "ACtest".to_string(), + "badtoken".to_string(), + "+14155551234".to_string(), + server.url(), + ); + + let result = transport.send_message("+15551234567", "hello", "").await; + assert!(result.is_err()); + assert!(result.unwrap_err().contains("401")); + } + + #[tokio::test] + async fn twilio_edit_message_sends_new_message() { + let mut server = mockito::Server::new_async().await; + let mock = server + .mock("POST", "/2010-04-01/Accounts/ACtest/Messages.json") + .with_body(r#"{"sid": "SMedit456"}"#) + .create_async() + .await; + + let transport = TwilioWhatsAppTransport::with_api_base( + "ACtest".to_string(), + "authtoken".to_string(), + "+14155551234".to_string(), + server.url(), + ); + + let result = transport + .edit_message("+15551234567", "old-sid", "updated text", "") + .await; + assert!(result.is_ok()); + mock.assert_async().await; + } + + #[tokio::test] + async fn twilio_send_typing_is_noop() { + let transport = TwilioWhatsAppTransport::new( + "ACtest".to_string(), + "authtoken".to_string(), + "+14155551234".to_string(), + ); + assert!(transport.send_typing("+15551234567", true).await.is_ok()); + } + + // ── extract_twilio_text_messages tests ──────────────────────────── + + #[test] + fn extract_twilio_text_messages_parses_valid_form() { + let body = b"From=whatsapp%3A%2B15551234567&Body=hello+world&To=whatsapp%3A%2B14155551234&MessageSid=SMtest"; + let msgs = extract_twilio_text_messages(body); + assert_eq!(msgs.len(), 1); + assert_eq!(msgs[0].0, "+15551234567"); + assert_eq!(msgs[0].1, "hello world"); + } + + #[test] + fn extract_twilio_text_messages_strips_whatsapp_prefix() { + let body = b"From=whatsapp%3A%2B15551234567&Body=hi"; + let msgs = extract_twilio_text_messages(body); + assert_eq!(msgs.len(), 1); + assert_eq!(msgs[0].0, "+15551234567"); + } + + #[test] + fn extract_twilio_text_messages_returns_empty_on_missing_from() { + let body = b"Body=hello"; + let msgs = extract_twilio_text_messages(body); + assert!(msgs.is_empty()); + } + + #[test] + fn extract_twilio_text_messages_returns_empty_on_missing_body() { + let body = b"From=whatsapp%3A%2B15551234567"; + let msgs = extract_twilio_text_messages(body); + assert!(msgs.is_empty()); + } + + #[test] + fn extract_twilio_text_messages_returns_empty_on_empty_body() { + let body = b"From=whatsapp%3A%2B15551234567&Body="; + let msgs = extract_twilio_text_messages(body); + assert!(msgs.is_empty()); + } + + #[test] + fn extract_twilio_text_messages_returns_empty_on_invalid_form() { + let body = b"not valid form encoded {{{{"; + // serde_urlencoded is lenient, so this might parse or return empty + // Either way it must not panic. + let _msgs = extract_twilio_text_messages(body); + } +}