story-kit: merge 323_story_whatsapp_llm_passthrough_for_conversational_queries

This commit is contained in:
Dave
2026-03-19 23:54:51 +00:00
parent c84c33a1a7
commit e9a0858d53
3 changed files with 408 additions and 9 deletions

View File

@@ -207,14 +207,18 @@ async fn main() -> Result<(), std::io::Error> {
.display_name .display_name
.clone() .clone()
.unwrap_or_else(|| "Assistant".to_string()); .unwrap_or_else(|| "Assistant".to_string());
let root = startup_root.clone().unwrap();
let history = whatsapp::load_whatsapp_history(&root);
Arc::new(whatsapp::WhatsAppWebhookContext { Arc::new(whatsapp::WhatsAppWebhookContext {
verify_token: cfg.whatsapp_verify_token.clone().unwrap_or_default(), verify_token: cfg.whatsapp_verify_token.clone().unwrap_or_default(),
transport, transport,
project_root: startup_root.clone().unwrap(), project_root: root,
agents: Arc::clone(&startup_agents), agents: Arc::clone(&startup_agents),
bot_name, bot_name,
bot_user_id: "whatsapp-bot".to_string(), bot_user_id: "whatsapp-bot".to_string(),
ambient_rooms: Arc::new(std::sync::Mutex::new(std::collections::HashSet::new())), ambient_rooms: Arc::new(std::sync::Mutex::new(std::collections::HashSet::new())),
history: std::sync::Arc::new(tokio::sync::Mutex::new(history)),
history_size: cfg.history_size,
}) })
}); });

View File

@@ -23,6 +23,7 @@ pub mod htop;
pub mod notifications; pub mod notifications;
pub mod transport_impl; pub mod transport_impl;
pub use bot::{ConversationEntry, ConversationRole, RoomConversation, drain_complete_paragraphs};
pub use config::BotConfig; pub use config::BotConfig;
use crate::agents::AgentPool; use crate::agents::AgentPool;

View File

@@ -8,9 +8,12 @@
use async_trait::async_trait; use async_trait::async_trait;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::Mutex as TokioMutex;
use crate::agents::AgentPool; use crate::agents::AgentPool;
use crate::matrix::{ConversationEntry, ConversationRole, RoomConversation};
use crate::slog; use crate::slog;
use crate::transport::{ChatTransport, MessageId}; use crate::transport::{ChatTransport, MessageId};
@@ -223,6 +226,103 @@ pub fn extract_text_messages(payload: &WebhookPayload) -> Vec<(String, String)>
messages messages
} }
// ── WhatsApp message size limit ──────────────────────────────────────
/// WhatsApp Business API maximum message body size in characters.
const WHATSAPP_MAX_MESSAGE_LEN: usize = 4096;
/// Split a text into chunks that fit within WhatsApp's message size limit.
///
/// Tries to split on paragraph boundaries (`\n\n`), falling back to line
/// boundaries (`\n`), and finally hard-splitting at the character limit.
pub fn chunk_for_whatsapp(text: &str) -> Vec<String> {
if text.len() <= WHATSAPP_MAX_MESSAGE_LEN {
return vec![text.to_string()];
}
let mut chunks = Vec::new();
let mut remaining = text;
while !remaining.is_empty() {
if remaining.len() <= WHATSAPP_MAX_MESSAGE_LEN {
chunks.push(remaining.to_string());
break;
}
// Find the best split point within the limit.
let window = &remaining[..WHATSAPP_MAX_MESSAGE_LEN];
// Prefer paragraph boundary.
let split_pos = window
.rfind("\n\n")
.or_else(|| window.rfind('\n'))
.unwrap_or(WHATSAPP_MAX_MESSAGE_LEN);
let (chunk, rest) = remaining.split_at(split_pos);
let chunk = chunk.trim();
if !chunk.is_empty() {
chunks.push(chunk.to_string());
}
// Skip the delimiter.
remaining = rest.trim_start_matches('\n');
}
chunks
}
// ── Conversation history persistence ─────────────────────────────────
/// Per-sender conversation history, keyed by phone number.
pub type WhatsAppConversationHistory = Arc<TokioMutex<HashMap<String, RoomConversation>>>;
/// On-disk format for persisted WhatsApp conversation history.
#[derive(Serialize, Deserialize)]
struct PersistedWhatsAppHistory {
senders: HashMap<String, RoomConversation>,
}
/// Path to the persisted WhatsApp conversation history file.
const WHATSAPP_HISTORY_FILE: &str = ".story_kit/whatsapp_history.json";
/// Load WhatsApp conversation history from disk.
pub fn load_whatsapp_history(
project_root: &std::path::Path,
) -> HashMap<String, RoomConversation> {
let path = project_root.join(WHATSAPP_HISTORY_FILE);
let data = match std::fs::read_to_string(&path) {
Ok(d) => d,
Err(_) => return HashMap::new(),
};
let persisted: PersistedWhatsAppHistory = match serde_json::from_str(&data) {
Ok(p) => p,
Err(e) => {
slog!("[whatsapp] Failed to parse history file: {e}");
return HashMap::new();
}
};
persisted.senders
}
/// Save WhatsApp conversation history to disk.
fn save_whatsapp_history(
project_root: &std::path::Path,
history: &HashMap<String, RoomConversation>,
) {
let persisted = PersistedWhatsAppHistory {
senders: history.clone(),
};
let path = project_root.join(WHATSAPP_HISTORY_FILE);
match serde_json::to_string_pretty(&persisted) {
Ok(json) => {
if let Err(e) = std::fs::write(&path, json) {
slog!("[whatsapp] Failed to write history file: {e}");
}
}
Err(e) => slog!("[whatsapp] Failed to serialise history: {e}"),
}
}
// ── Webhook handlers (Poem) ──────────────────────────────────────────── // ── Webhook handlers (Poem) ────────────────────────────────────────────
use poem::{Request, Response, handler, http::StatusCode, web::Query}; use poem::{Request, Response, handler, http::StatusCode, web::Query};
@@ -251,6 +351,10 @@ pub struct WhatsAppWebhookContext {
/// The bot's "user ID" for command dispatch (e.g. "whatsapp-bot"). /// The bot's "user ID" for command dispatch (e.g. "whatsapp-bot").
pub bot_user_id: String, pub bot_user_id: String,
pub ambient_rooms: Arc<Mutex<HashSet<String>>>, pub ambient_rooms: Arc<Mutex<HashSet<String>>>,
/// Per-sender conversation history for LLM passthrough.
pub history: WhatsAppConversationHistory,
/// Maximum number of conversation entries to keep per sender.
pub history_size: usize,
} }
/// GET /webhook/whatsapp — Meta verification handshake. /// GET /webhook/whatsapp — Meta verification handshake.
@@ -409,16 +513,153 @@ async fn handle_incoming_message(
return; return;
} }
// No command matched — inform the user that only commands are supported. // No command matched — forward to LLM for conversational response.
// (LLM passthrough is a separate story.) slog!("[whatsapp] No command matched, forwarding to LLM for {sender}");
let _ = ctx handle_llm_message(ctx, sender, message).await;
.transport }
.send_message(
sender, /// Forward a message to Claude Code and send the response back via WhatsApp.
"I only respond to commands right now. Try `help` to see what's available.", async fn handle_llm_message(
"", ctx: &WhatsAppWebhookContext,
sender: &str,
user_message: &str,
) {
use crate::llm::providers::claude_code::{ClaudeCodeProvider, ClaudeCodeResult};
use crate::matrix::drain_complete_paragraphs;
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::sync::watch;
// Look up existing session ID for this sender.
let resume_session_id: Option<String> = {
let guard = ctx.history.lock().await;
guard
.get(sender)
.and_then(|conv| conv.session_id.clone())
};
let bot_name = &ctx.bot_name;
let prompt = format!(
"[Your name is {bot_name}. Refer to yourself as {bot_name}, not Claude.]\n\n{sender}: {user_message}"
);
let provider = ClaudeCodeProvider::new();
let (_cancel_tx, mut cancel_rx) = watch::channel(false);
// Channel for sending complete chunks to the WhatsApp posting task.
let (msg_tx, mut msg_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
let msg_tx_for_callback = msg_tx.clone();
// Spawn a task to post messages as they arrive.
let post_transport = Arc::clone(&ctx.transport);
let post_sender = sender.to_string();
let post_task = tokio::spawn(async move {
while let Some(chunk) = msg_rx.recv().await {
// Split into WhatsApp-sized chunks.
for part in chunk_for_whatsapp(&chunk) {
let _ = post_transport.send_message(&post_sender, &part, "").await;
}
}
});
// Shared buffer between the sync token callback and the async scope.
let buffer = Arc::new(std::sync::Mutex::new(String::new()));
let buffer_for_callback = Arc::clone(&buffer);
let sent_any_chunk = Arc::new(AtomicBool::new(false));
let sent_any_chunk_for_callback = Arc::clone(&sent_any_chunk);
let project_root_str = ctx.project_root.to_string_lossy().to_string();
let result = provider
.chat_stream(
&prompt,
&project_root_str,
resume_session_id.as_deref(),
None,
&mut cancel_rx,
move |token| {
let mut buf = buffer_for_callback.lock().unwrap();
buf.push_str(token);
let paragraphs = drain_complete_paragraphs(&mut buf);
for chunk in paragraphs {
sent_any_chunk_for_callback.store(true, Ordering::Relaxed);
let _ = msg_tx_for_callback.send(chunk);
}
},
|_thinking| {},
|_activity| {},
) )
.await; .await;
// Flush remaining text.
let remaining = buffer.lock().unwrap().trim().to_string();
let did_send_any = sent_any_chunk.load(Ordering::Relaxed);
let (assistant_reply, new_session_id) = match result {
Ok(ClaudeCodeResult {
messages,
session_id,
}) => {
let reply = if !remaining.is_empty() {
let _ = msg_tx.send(remaining.clone());
remaining
} else if !did_send_any {
let last_text = messages
.iter()
.rev()
.find(|m| {
m.role == crate::llm::types::Role::Assistant && !m.content.is_empty()
})
.map(|m| m.content.clone())
.unwrap_or_default();
if !last_text.is_empty() {
let _ = msg_tx.send(last_text.clone());
}
last_text
} else {
remaining
};
slog!("[whatsapp] session_id from chat_stream: {:?}", session_id);
(reply, session_id)
}
Err(e) => {
slog!("[whatsapp] LLM error: {e}");
let err_msg = format!("Error processing your request: {e}");
let _ = msg_tx.send(err_msg.clone());
(err_msg, None)
}
};
// Signal the posting task to finish and wait for it.
drop(msg_tx);
let _ = post_task.await;
// Record this exchange in conversation history.
if !assistant_reply.starts_with("Error processing") {
let mut guard = ctx.history.lock().await;
let conv = guard.entry(sender.to_string()).or_default();
if new_session_id.is_some() {
conv.session_id = new_session_id;
}
conv.entries.push(ConversationEntry {
role: ConversationRole::User,
sender: sender.to_string(),
content: user_message.to_string(),
});
conv.entries.push(ConversationEntry {
role: ConversationRole::Assistant,
sender: String::new(),
content: assistant_reply,
});
// Trim to configured maximum.
if conv.entries.len() > ctx.history_size {
let excess = conv.entries.len() - ctx.history_size;
conv.entries.drain(..excess);
}
save_whatsapp_history(&ctx.project_root, &guard);
}
} }
// ── Tests ─────────────────────────────────────────────────────────────── // ── Tests ───────────────────────────────────────────────────────────────
@@ -576,4 +817,157 @@ mod tests {
assert!(result.is_err()); assert!(result.is_err());
assert!(result.unwrap_err().contains("401")); assert!(result.unwrap_err().contains("401"));
} }
// ── chunk_for_whatsapp tests ────────────────────────────────────────
#[test]
fn chunk_short_message_returns_single_chunk() {
let chunks = chunk_for_whatsapp("Hello world");
assert_eq!(chunks, vec!["Hello world"]);
}
#[test]
fn chunk_exactly_at_limit_returns_single_chunk() {
let text = "a".repeat(WHATSAPP_MAX_MESSAGE_LEN);
let chunks = chunk_for_whatsapp(&text);
assert_eq!(chunks.len(), 1);
assert_eq!(chunks[0].len(), WHATSAPP_MAX_MESSAGE_LEN);
}
#[test]
fn chunk_splits_on_paragraph_boundary() {
// Create text with a paragraph boundary near the split point.
let first_para = "a".repeat(4000);
let second_para = "b".repeat(200);
let text = format!("{first_para}\n\n{second_para}");
let chunks = chunk_for_whatsapp(&text);
assert_eq!(chunks.len(), 2);
assert_eq!(chunks[0], first_para);
assert_eq!(chunks[1], second_para);
}
#[test]
fn chunk_splits_on_line_boundary_when_no_paragraph_break() {
let first_line = "a".repeat(4000);
let second_line = "b".repeat(200);
let text = format!("{first_line}\n{second_line}");
let chunks = chunk_for_whatsapp(&text);
assert_eq!(chunks.len(), 2);
assert_eq!(chunks[0], first_line);
assert_eq!(chunks[1], second_line);
}
#[test]
fn chunk_hard_splits_continuous_text() {
let text = "x".repeat(WHATSAPP_MAX_MESSAGE_LEN * 2 + 100);
let chunks = chunk_for_whatsapp(&text);
assert!(chunks.len() >= 2);
for chunk in &chunks {
assert!(chunk.len() <= WHATSAPP_MAX_MESSAGE_LEN);
}
// Verify all content is preserved.
let reassembled: String = chunks.join("");
assert_eq!(reassembled.len(), text.len());
}
#[test]
fn chunk_empty_string_returns_single_empty() {
let chunks = chunk_for_whatsapp("");
assert_eq!(chunks, vec![""]);
}
// ── WhatsApp history persistence tests ──────────────────────────────
#[test]
fn save_and_load_whatsapp_history_round_trips() {
let tmp = tempfile::tempdir().unwrap();
let sk = tmp.path().join(".story_kit");
std::fs::create_dir_all(&sk).unwrap();
let mut history = HashMap::new();
history.insert(
"15551234567".to_string(),
RoomConversation {
session_id: Some("sess-abc".to_string()),
entries: vec![
ConversationEntry {
role: ConversationRole::User,
sender: "15551234567".to_string(),
content: "hello".to_string(),
},
ConversationEntry {
role: ConversationRole::Assistant,
sender: String::new(),
content: "hi there!".to_string(),
},
],
},
);
save_whatsapp_history(tmp.path(), &history);
let loaded = load_whatsapp_history(tmp.path());
assert_eq!(loaded.len(), 1);
let conv = loaded.get("15551234567").unwrap();
assert_eq!(conv.session_id.as_deref(), Some("sess-abc"));
assert_eq!(conv.entries.len(), 2);
assert_eq!(conv.entries[0].content, "hello");
assert_eq!(conv.entries[1].content, "hi there!");
}
#[test]
fn load_whatsapp_history_returns_empty_when_file_missing() {
let tmp = tempfile::tempdir().unwrap();
let history = load_whatsapp_history(tmp.path());
assert!(history.is_empty());
}
#[test]
fn load_whatsapp_history_returns_empty_on_invalid_json() {
let tmp = tempfile::tempdir().unwrap();
let sk = tmp.path().join(".story_kit");
std::fs::create_dir_all(&sk).unwrap();
std::fs::write(sk.join("whatsapp_history.json"), "not json {{{").unwrap();
let history = load_whatsapp_history(tmp.path());
assert!(history.is_empty());
}
#[test]
fn save_whatsapp_history_preserves_multiple_senders() {
let tmp = tempfile::tempdir().unwrap();
let sk = tmp.path().join(".story_kit");
std::fs::create_dir_all(&sk).unwrap();
let mut history = HashMap::new();
history.insert(
"111".to_string(),
RoomConversation {
session_id: None,
entries: vec![ConversationEntry {
role: ConversationRole::User,
sender: "111".to_string(),
content: "msg1".to_string(),
}],
},
);
history.insert(
"222".to_string(),
RoomConversation {
session_id: Some("sess-222".to_string()),
entries: vec![ConversationEntry {
role: ConversationRole::User,
sender: "222".to_string(),
content: "msg2".to_string(),
}],
},
);
save_whatsapp_history(tmp.path(), &history);
let loaded = load_whatsapp_history(tmp.path());
assert_eq!(loaded.len(), 2);
assert!(loaded.contains_key("111"));
assert!(loaded.contains_key("222"));
assert_eq!(loaded["222"].session_id.as_deref(), Some("sess-222"));
}
} }