From e9a0858d532ab356717ca1d575e9d98d5c654165 Mon Sep 17 00:00:00 2001 From: Dave Date: Thu, 19 Mar 2026 23:54:51 +0000 Subject: [PATCH] story-kit: merge 323_story_whatsapp_llm_passthrough_for_conversational_queries --- server/src/main.rs | 6 +- server/src/matrix/mod.rs | 1 + server/src/whatsapp.rs | 410 ++++++++++++++++++++++++++++++++++++++- 3 files changed, 408 insertions(+), 9 deletions(-) diff --git a/server/src/main.rs b/server/src/main.rs index 811e363..41faaac 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -207,14 +207,18 @@ async fn main() -> Result<(), std::io::Error> { .display_name .clone() .unwrap_or_else(|| "Assistant".to_string()); + let root = startup_root.clone().unwrap(); + let history = whatsapp::load_whatsapp_history(&root); Arc::new(whatsapp::WhatsAppWebhookContext { verify_token: cfg.whatsapp_verify_token.clone().unwrap_or_default(), transport, - project_root: startup_root.clone().unwrap(), + project_root: root, agents: Arc::clone(&startup_agents), bot_name, bot_user_id: "whatsapp-bot".to_string(), ambient_rooms: Arc::new(std::sync::Mutex::new(std::collections::HashSet::new())), + history: std::sync::Arc::new(tokio::sync::Mutex::new(history)), + history_size: cfg.history_size, }) }); diff --git a/server/src/matrix/mod.rs b/server/src/matrix/mod.rs index e672b91..c6537fe 100644 --- a/server/src/matrix/mod.rs +++ b/server/src/matrix/mod.rs @@ -23,6 +23,7 @@ pub mod htop; pub mod notifications; pub mod transport_impl; +pub use bot::{ConversationEntry, ConversationRole, RoomConversation, drain_complete_paragraphs}; pub use config::BotConfig; use crate::agents::AgentPool; diff --git a/server/src/whatsapp.rs b/server/src/whatsapp.rs index 05265dc..ce263b8 100644 --- a/server/src/whatsapp.rs +++ b/server/src/whatsapp.rs @@ -8,9 +8,12 @@ use async_trait::async_trait; use serde::{Deserialize, Serialize}; +use std::collections::HashMap; use std::sync::Arc; +use tokio::sync::Mutex as TokioMutex; use crate::agents::AgentPool; +use crate::matrix::{ConversationEntry, ConversationRole, RoomConversation}; use crate::slog; use crate::transport::{ChatTransport, MessageId}; @@ -223,6 +226,103 @@ pub fn extract_text_messages(payload: &WebhookPayload) -> Vec<(String, String)> messages } +// ── WhatsApp message size limit ────────────────────────────────────── + +/// WhatsApp Business API maximum message body size in characters. +const WHATSAPP_MAX_MESSAGE_LEN: usize = 4096; + +/// Split a text into chunks that fit within WhatsApp's message size limit. +/// +/// Tries to split on paragraph boundaries (`\n\n`), falling back to line +/// boundaries (`\n`), and finally hard-splitting at the character limit. +pub fn chunk_for_whatsapp(text: &str) -> Vec { + if text.len() <= WHATSAPP_MAX_MESSAGE_LEN { + return vec![text.to_string()]; + } + + let mut chunks = Vec::new(); + let mut remaining = text; + + while !remaining.is_empty() { + if remaining.len() <= WHATSAPP_MAX_MESSAGE_LEN { + chunks.push(remaining.to_string()); + break; + } + + // Find the best split point within the limit. + let window = &remaining[..WHATSAPP_MAX_MESSAGE_LEN]; + + // Prefer paragraph boundary. + let split_pos = window + .rfind("\n\n") + .or_else(|| window.rfind('\n')) + .unwrap_or(WHATSAPP_MAX_MESSAGE_LEN); + + let (chunk, rest) = remaining.split_at(split_pos); + let chunk = chunk.trim(); + if !chunk.is_empty() { + chunks.push(chunk.to_string()); + } + + // Skip the delimiter. + remaining = rest.trim_start_matches('\n'); + } + + chunks +} + +// ── Conversation history persistence ───────────────────────────────── + +/// Per-sender conversation history, keyed by phone number. +pub type WhatsAppConversationHistory = Arc>>; + +/// On-disk format for persisted WhatsApp conversation history. +#[derive(Serialize, Deserialize)] +struct PersistedWhatsAppHistory { + senders: HashMap, +} + +/// Path to the persisted WhatsApp conversation history file. +const WHATSAPP_HISTORY_FILE: &str = ".story_kit/whatsapp_history.json"; + +/// Load WhatsApp conversation history from disk. +pub fn load_whatsapp_history( + project_root: &std::path::Path, +) -> HashMap { + let path = project_root.join(WHATSAPP_HISTORY_FILE); + let data = match std::fs::read_to_string(&path) { + Ok(d) => d, + Err(_) => return HashMap::new(), + }; + let persisted: PersistedWhatsAppHistory = match serde_json::from_str(&data) { + Ok(p) => p, + Err(e) => { + slog!("[whatsapp] Failed to parse history file: {e}"); + return HashMap::new(); + } + }; + persisted.senders +} + +/// Save WhatsApp conversation history to disk. +fn save_whatsapp_history( + project_root: &std::path::Path, + history: &HashMap, +) { + let persisted = PersistedWhatsAppHistory { + senders: history.clone(), + }; + let path = project_root.join(WHATSAPP_HISTORY_FILE); + match serde_json::to_string_pretty(&persisted) { + Ok(json) => { + if let Err(e) = std::fs::write(&path, json) { + slog!("[whatsapp] Failed to write history file: {e}"); + } + } + Err(e) => slog!("[whatsapp] Failed to serialise history: {e}"), + } +} + // ── Webhook handlers (Poem) ──────────────────────────────────────────── use poem::{Request, Response, handler, http::StatusCode, web::Query}; @@ -251,6 +351,10 @@ pub struct WhatsAppWebhookContext { /// The bot's "user ID" for command dispatch (e.g. "whatsapp-bot"). pub bot_user_id: String, pub ambient_rooms: Arc>>, + /// Per-sender conversation history for LLM passthrough. + pub history: WhatsAppConversationHistory, + /// Maximum number of conversation entries to keep per sender. + pub history_size: usize, } /// GET /webhook/whatsapp — Meta verification handshake. @@ -409,16 +513,153 @@ async fn handle_incoming_message( return; } - // No command matched — inform the user that only commands are supported. - // (LLM passthrough is a separate story.) - let _ = ctx - .transport - .send_message( - sender, - "I only respond to commands right now. Try `help` to see what's available.", - "", + // No command matched — forward to LLM for conversational response. + slog!("[whatsapp] No command matched, forwarding to LLM for {sender}"); + handle_llm_message(ctx, sender, message).await; +} + +/// Forward a message to Claude Code and send the response back via WhatsApp. +async fn handle_llm_message( + ctx: &WhatsAppWebhookContext, + sender: &str, + user_message: &str, +) { + use crate::llm::providers::claude_code::{ClaudeCodeProvider, ClaudeCodeResult}; + use crate::matrix::drain_complete_paragraphs; + use std::sync::atomic::{AtomicBool, Ordering}; + use tokio::sync::watch; + + // Look up existing session ID for this sender. + let resume_session_id: Option = { + let guard = ctx.history.lock().await; + guard + .get(sender) + .and_then(|conv| conv.session_id.clone()) + }; + + let bot_name = &ctx.bot_name; + let prompt = format!( + "[Your name is {bot_name}. Refer to yourself as {bot_name}, not Claude.]\n\n{sender}: {user_message}" + ); + + let provider = ClaudeCodeProvider::new(); + let (_cancel_tx, mut cancel_rx) = watch::channel(false); + + // Channel for sending complete chunks to the WhatsApp posting task. + let (msg_tx, mut msg_rx) = tokio::sync::mpsc::unbounded_channel::(); + let msg_tx_for_callback = msg_tx.clone(); + + // Spawn a task to post messages as they arrive. + let post_transport = Arc::clone(&ctx.transport); + let post_sender = sender.to_string(); + let post_task = tokio::spawn(async move { + while let Some(chunk) = msg_rx.recv().await { + // Split into WhatsApp-sized chunks. + for part in chunk_for_whatsapp(&chunk) { + let _ = post_transport.send_message(&post_sender, &part, "").await; + } + } + }); + + // Shared buffer between the sync token callback and the async scope. + let buffer = Arc::new(std::sync::Mutex::new(String::new())); + let buffer_for_callback = Arc::clone(&buffer); + let sent_any_chunk = Arc::new(AtomicBool::new(false)); + let sent_any_chunk_for_callback = Arc::clone(&sent_any_chunk); + + let project_root_str = ctx.project_root.to_string_lossy().to_string(); + let result = provider + .chat_stream( + &prompt, + &project_root_str, + resume_session_id.as_deref(), + None, + &mut cancel_rx, + move |token| { + let mut buf = buffer_for_callback.lock().unwrap(); + buf.push_str(token); + let paragraphs = drain_complete_paragraphs(&mut buf); + for chunk in paragraphs { + sent_any_chunk_for_callback.store(true, Ordering::Relaxed); + let _ = msg_tx_for_callback.send(chunk); + } + }, + |_thinking| {}, + |_activity| {}, ) .await; + + // Flush remaining text. + let remaining = buffer.lock().unwrap().trim().to_string(); + let did_send_any = sent_any_chunk.load(Ordering::Relaxed); + + let (assistant_reply, new_session_id) = match result { + Ok(ClaudeCodeResult { + messages, + session_id, + }) => { + let reply = if !remaining.is_empty() { + let _ = msg_tx.send(remaining.clone()); + remaining + } else if !did_send_any { + let last_text = messages + .iter() + .rev() + .find(|m| { + m.role == crate::llm::types::Role::Assistant && !m.content.is_empty() + }) + .map(|m| m.content.clone()) + .unwrap_or_default(); + if !last_text.is_empty() { + let _ = msg_tx.send(last_text.clone()); + } + last_text + } else { + remaining + }; + slog!("[whatsapp] session_id from chat_stream: {:?}", session_id); + (reply, session_id) + } + Err(e) => { + slog!("[whatsapp] LLM error: {e}"); + let err_msg = format!("Error processing your request: {e}"); + let _ = msg_tx.send(err_msg.clone()); + (err_msg, None) + } + }; + + // Signal the posting task to finish and wait for it. + drop(msg_tx); + let _ = post_task.await; + + // Record this exchange in conversation history. + if !assistant_reply.starts_with("Error processing") { + let mut guard = ctx.history.lock().await; + let conv = guard.entry(sender.to_string()).or_default(); + + if new_session_id.is_some() { + conv.session_id = new_session_id; + } + + conv.entries.push(ConversationEntry { + role: ConversationRole::User, + sender: sender.to_string(), + content: user_message.to_string(), + }); + conv.entries.push(ConversationEntry { + role: ConversationRole::Assistant, + sender: String::new(), + content: assistant_reply, + }); + + // Trim to configured maximum. + if conv.entries.len() > ctx.history_size { + let excess = conv.entries.len() - ctx.history_size; + conv.entries.drain(..excess); + } + + save_whatsapp_history(&ctx.project_root, &guard); + } } // ── Tests ─────────────────────────────────────────────────────────────── @@ -576,4 +817,157 @@ mod tests { assert!(result.is_err()); assert!(result.unwrap_err().contains("401")); } + + // ── chunk_for_whatsapp tests ──────────────────────────────────────── + + #[test] + fn chunk_short_message_returns_single_chunk() { + let chunks = chunk_for_whatsapp("Hello world"); + assert_eq!(chunks, vec!["Hello world"]); + } + + #[test] + fn chunk_exactly_at_limit_returns_single_chunk() { + let text = "a".repeat(WHATSAPP_MAX_MESSAGE_LEN); + let chunks = chunk_for_whatsapp(&text); + assert_eq!(chunks.len(), 1); + assert_eq!(chunks[0].len(), WHATSAPP_MAX_MESSAGE_LEN); + } + + #[test] + fn chunk_splits_on_paragraph_boundary() { + // Create text with a paragraph boundary near the split point. + let first_para = "a".repeat(4000); + let second_para = "b".repeat(200); + let text = format!("{first_para}\n\n{second_para}"); + let chunks = chunk_for_whatsapp(&text); + assert_eq!(chunks.len(), 2); + assert_eq!(chunks[0], first_para); + assert_eq!(chunks[1], second_para); + } + + #[test] + fn chunk_splits_on_line_boundary_when_no_paragraph_break() { + let first_line = "a".repeat(4000); + let second_line = "b".repeat(200); + let text = format!("{first_line}\n{second_line}"); + let chunks = chunk_for_whatsapp(&text); + assert_eq!(chunks.len(), 2); + assert_eq!(chunks[0], first_line); + assert_eq!(chunks[1], second_line); + } + + #[test] + fn chunk_hard_splits_continuous_text() { + let text = "x".repeat(WHATSAPP_MAX_MESSAGE_LEN * 2 + 100); + let chunks = chunk_for_whatsapp(&text); + assert!(chunks.len() >= 2); + for chunk in &chunks { + assert!(chunk.len() <= WHATSAPP_MAX_MESSAGE_LEN); + } + // Verify all content is preserved. + let reassembled: String = chunks.join(""); + assert_eq!(reassembled.len(), text.len()); + } + + #[test] + fn chunk_empty_string_returns_single_empty() { + let chunks = chunk_for_whatsapp(""); + assert_eq!(chunks, vec![""]); + } + + // ── WhatsApp history persistence tests ────────────────────────────── + + #[test] + fn save_and_load_whatsapp_history_round_trips() { + let tmp = tempfile::tempdir().unwrap(); + let sk = tmp.path().join(".story_kit"); + std::fs::create_dir_all(&sk).unwrap(); + + let mut history = HashMap::new(); + history.insert( + "15551234567".to_string(), + RoomConversation { + session_id: Some("sess-abc".to_string()), + entries: vec![ + ConversationEntry { + role: ConversationRole::User, + sender: "15551234567".to_string(), + content: "hello".to_string(), + }, + ConversationEntry { + role: ConversationRole::Assistant, + sender: String::new(), + content: "hi there!".to_string(), + }, + ], + }, + ); + + save_whatsapp_history(tmp.path(), &history); + let loaded = load_whatsapp_history(tmp.path()); + + assert_eq!(loaded.len(), 1); + let conv = loaded.get("15551234567").unwrap(); + assert_eq!(conv.session_id.as_deref(), Some("sess-abc")); + assert_eq!(conv.entries.len(), 2); + assert_eq!(conv.entries[0].content, "hello"); + assert_eq!(conv.entries[1].content, "hi there!"); + } + + #[test] + fn load_whatsapp_history_returns_empty_when_file_missing() { + let tmp = tempfile::tempdir().unwrap(); + let history = load_whatsapp_history(tmp.path()); + assert!(history.is_empty()); + } + + #[test] + fn load_whatsapp_history_returns_empty_on_invalid_json() { + let tmp = tempfile::tempdir().unwrap(); + let sk = tmp.path().join(".story_kit"); + std::fs::create_dir_all(&sk).unwrap(); + std::fs::write(sk.join("whatsapp_history.json"), "not json {{{").unwrap(); + let history = load_whatsapp_history(tmp.path()); + assert!(history.is_empty()); + } + + #[test] + fn save_whatsapp_history_preserves_multiple_senders() { + let tmp = tempfile::tempdir().unwrap(); + let sk = tmp.path().join(".story_kit"); + std::fs::create_dir_all(&sk).unwrap(); + + let mut history = HashMap::new(); + history.insert( + "111".to_string(), + RoomConversation { + session_id: None, + entries: vec![ConversationEntry { + role: ConversationRole::User, + sender: "111".to_string(), + content: "msg1".to_string(), + }], + }, + ); + history.insert( + "222".to_string(), + RoomConversation { + session_id: Some("sess-222".to_string()), + entries: vec![ConversationEntry { + role: ConversationRole::User, + sender: "222".to_string(), + content: "msg2".to_string(), + }], + }, + ); + + save_whatsapp_history(tmp.path(), &history); + let loaded = load_whatsapp_history(tmp.path()); + + assert_eq!(loaded.len(), 2); + assert!(loaded.contains_key("111")); + assert!(loaded.contains_key("222")); + assert_eq!(loaded["222"].session_id.as_deref(), Some("sess-222")); + } }