From 404fd396f528c9e0e18415030843ff359d8fbf66 Mon Sep 17 00:00:00 2001 From: dave Date: Mon, 27 Apr 2026 02:37:22 +0000 Subject: [PATCH] refactor: split chat/transport/whatsapp/commands.rs (837) into mod + llm The 837-line commands.rs is split: - llm.rs: handle_llm_message (LLM turn for non-command messages, ~195 lines) - mod.rs: handle_incoming_message + tests (~660 lines) Tests stay co-located with handle_incoming_message in mod.rs. All 2636 tests pass; clippy clean. --- .../chat/transport/whatsapp/commands/llm.rs | 200 ++++++++++++++++++ .../whatsapp/{commands.rs => commands/mod.rs} | 194 +---------------- 2 files changed, 206 insertions(+), 188 deletions(-) create mode 100644 server/src/chat/transport/whatsapp/commands/llm.rs rename server/src/chat/transport/whatsapp/{commands.rs => commands/mod.rs} (75%) diff --git a/server/src/chat/transport/whatsapp/commands/llm.rs b/server/src/chat/transport/whatsapp/commands/llm.rs new file mode 100644 index 00000000..76973ba6 --- /dev/null +++ b/server/src/chat/transport/whatsapp/commands/llm.rs @@ -0,0 +1,200 @@ +//! WhatsApp LLM message handler — runs an LLM turn for a non-command message. + +use std::sync::Arc; + +use crate::chat::transport::matrix::{ConversationEntry, ConversationRole}; +use crate::http::context::PermissionDecision; +use crate::slog; + +use super::super::WhatsAppWebhookContext; +use super::super::format::{chunk_for_whatsapp, markdown_to_whatsapp}; +use super::super::history::save_whatsapp_history; + +pub(super) async fn handle_llm_message( + ctx: &WhatsAppWebhookContext, + sender: &str, + user_message: &str, +) { + use crate::chat::util::drain_complete_paragraphs; + use crate::llm::providers::claude_code::{ClaudeCodeProvider, ClaudeCodeResult}; + use std::sync::atomic::{AtomicBool, Ordering}; + use tokio::sync::watch; + + // Look up existing session ID for this sender. + let resume_session_id: Option = { + let guard = ctx.history.lock().await; + guard.get(sender).and_then(|conv| conv.session_id.clone()) + }; + + let bot_name = &ctx.services.bot_name; + let prompt = format!( + "[Your name is {bot_name}. Refer to yourself as {bot_name}, not Claude.]\n\n{sender}: {user_message}" + ); + + let provider = ClaudeCodeProvider::new(); + let (_cancel_tx, mut cancel_rx) = watch::channel(false); + + // Channel for sending complete chunks to the WhatsApp posting task. + let (msg_tx, mut msg_rx) = tokio::sync::mpsc::unbounded_channel::(); + let msg_tx_for_callback = msg_tx.clone(); + + // Spawn a task to post messages as they arrive. + let post_transport = Arc::clone(&ctx.transport); + let post_sender = sender.to_string(); + let post_task = tokio::spawn(async move { + while let Some(chunk) = msg_rx.recv().await { + // Convert Markdown to WhatsApp formatting, then split into sized chunks. + let formatted = markdown_to_whatsapp(&chunk); + for part in chunk_for_whatsapp(&formatted) { + let _ = post_transport.send_message(&post_sender, &part, "").await; + } + } + }); + + // Shared buffer between the sync token callback and the async scope. + let buffer = Arc::new(std::sync::Mutex::new(String::new())); + let buffer_for_callback = Arc::clone(&buffer); + let sent_any_chunk = Arc::new(AtomicBool::new(false)); + let sent_any_chunk_for_callback = Arc::clone(&sent_any_chunk); + + let project_root_str = ctx.services.project_root.to_string_lossy().to_string(); + let chat_fut = provider.chat_stream( + &prompt, + &project_root_str, + resume_session_id.as_deref(), + None, + &mut cancel_rx, + move |token| { + let mut buf = buffer_for_callback.lock().unwrap(); + buf.push_str(token); + let paragraphs = drain_complete_paragraphs(&mut buf); + for chunk in paragraphs { + sent_any_chunk_for_callback.store(true, Ordering::Relaxed); + let _ = msg_tx_for_callback.send(chunk); + } + }, + |_thinking| {}, + |_activity| {}, + ); + tokio::pin!(chat_fut); + + // Lock the permission receiver for the duration of this chat session. + let mut perm_rx_guard = ctx.services.perm_rx.lock().await; + + let result = loop { + tokio::select! { + r = &mut chat_fut => break r, + + Some(perm_fwd) = perm_rx_guard.recv() => { + let prompt_msg = format!( + "*Permission Request*\n\nTool: `{}`\n```json\n{}\n```\n\nReply *yes* to approve or *no* to deny.", + perm_fwd.tool_name, + serde_json::to_string_pretty(&perm_fwd.tool_input) + .unwrap_or_else(|_| perm_fwd.tool_input.to_string()), + ); + let formatted = markdown_to_whatsapp(&prompt_msg); + for part in chunk_for_whatsapp(&formatted) { + let _ = ctx.transport.send_message(sender, &part, "").await; + } + + // Store the response sender so the incoming message handler + // can resolve it when the user replies yes/no. + ctx.services.pending_perm_replies + .lock() + .await + .insert(sender.to_string(), perm_fwd.response_tx); + + // Spawn a timeout task: auto-deny if the user does not respond. + let pending = Arc::clone(&ctx.services.pending_perm_replies); + let timeout_sender = sender.to_string(); + let timeout_transport = Arc::clone(&ctx.transport); + let timeout_secs = ctx.services.permission_timeout_secs; + tokio::spawn(async move { + tokio::time::sleep(std::time::Duration::from_secs(timeout_secs)).await; + if let Some(tx) = pending.lock().await.remove(&timeout_sender) { + let _ = tx.send(PermissionDecision::Deny); + let msg = "Permission request timed out — denied (fail-closed)."; + let _ = timeout_transport.send_message(&timeout_sender, msg, "").await; + } + }); + } + } + }; + drop(perm_rx_guard); + + // Flush remaining text. + let remaining = buffer.lock().unwrap().trim().to_string(); + let did_send_any = sent_any_chunk.load(Ordering::Relaxed); + + let (assistant_reply, new_session_id) = match result { + Ok(ClaudeCodeResult { + messages, + session_id, + }) => { + let reply = if !remaining.is_empty() { + let _ = msg_tx.send(remaining.clone()); + remaining + } else if !did_send_any { + let last_text = messages + .iter() + .rev() + .find(|m| m.role == crate::llm::types::Role::Assistant && !m.content.is_empty()) + .map(|m| m.content.clone()) + .unwrap_or_default(); + if !last_text.is_empty() { + let _ = msg_tx.send(last_text.clone()); + } + last_text + } else { + remaining + }; + slog!("[whatsapp] session_id from chat_stream: {:?}", session_id); + (reply, session_id) + } + Err(e) => { + slog!("[whatsapp] LLM error: {e}"); + let err_msg = if let Some(url) = crate::llm::oauth::extract_login_url_from_error(&e) { + format!("Authentication required. Log in to Claude here: {url}") + } else { + format!("Error processing your request: {e}") + }; + let _ = msg_tx.send(err_msg.clone()); + (err_msg, None) + } + }; + + // Signal the posting task to finish and wait for it. + drop(msg_tx); + let _ = post_task.await; + + // Record this exchange in conversation history. + if !assistant_reply.starts_with("Error processing") { + let mut guard = ctx.history.lock().await; + let conv = guard.entry(sender.to_string()).or_default(); + + if new_session_id.is_some() { + conv.session_id = new_session_id; + } + + conv.entries.push(ConversationEntry { + role: ConversationRole::User, + sender: sender.to_string(), + content: user_message.to_string(), + }); + conv.entries.push(ConversationEntry { + role: ConversationRole::Assistant, + sender: String::new(), + content: assistant_reply, + }); + + // Trim to configured maximum. + if conv.entries.len() > ctx.history_size { + let excess = conv.entries.len() - ctx.history_size; + conv.entries.drain(..excess); + } + + save_whatsapp_history(&ctx.services.project_root, &guard); + } +} + +// ── Tests ─────────────────────────────────────────────────────────────── diff --git a/server/src/chat/transport/whatsapp/commands.rs b/server/src/chat/transport/whatsapp/commands/mod.rs similarity index 75% rename from server/src/chat/transport/whatsapp/commands.rs rename to server/src/chat/transport/whatsapp/commands/mod.rs index 1338e0e5..842d9d1f 100644 --- a/server/src/chat/transport/whatsapp/commands.rs +++ b/server/src/chat/transport/whatsapp/commands/mod.rs @@ -1,14 +1,17 @@ //! WhatsApp command handling — processes incoming WhatsApp messages as bot commands. -use std::sync::Arc; use super::WhatsAppWebhookContext; -use super::format::{chunk_for_whatsapp, markdown_to_whatsapp}; +use super::format::markdown_to_whatsapp; use super::history::save_whatsapp_history; -use crate::chat::transport::matrix::{ConversationEntry, ConversationRole, RoomConversation}; +use crate::chat::transport::matrix::RoomConversation; use crate::chat::util::is_permission_approval; use crate::http::context::PermissionDecision; use crate::slog; +mod llm; + +use llm::handle_llm_message; + /// Dispatch an incoming WhatsApp message to bot commands. pub(super) async fn handle_incoming_message( ctx: &WhatsAppWebhookContext, @@ -252,191 +255,6 @@ pub(super) async fn handle_incoming_message( } /// Forward a message to Claude Code and send the response back via WhatsApp. -async fn handle_llm_message(ctx: &WhatsAppWebhookContext, sender: &str, user_message: &str) { - use crate::chat::util::drain_complete_paragraphs; - use crate::llm::providers::claude_code::{ClaudeCodeProvider, ClaudeCodeResult}; - use std::sync::atomic::{AtomicBool, Ordering}; - use tokio::sync::watch; - - // Look up existing session ID for this sender. - let resume_session_id: Option = { - let guard = ctx.history.lock().await; - guard.get(sender).and_then(|conv| conv.session_id.clone()) - }; - - let bot_name = &ctx.services.bot_name; - let prompt = format!( - "[Your name is {bot_name}. Refer to yourself as {bot_name}, not Claude.]\n\n{sender}: {user_message}" - ); - - let provider = ClaudeCodeProvider::new(); - let (_cancel_tx, mut cancel_rx) = watch::channel(false); - - // Channel for sending complete chunks to the WhatsApp posting task. - let (msg_tx, mut msg_rx) = tokio::sync::mpsc::unbounded_channel::(); - let msg_tx_for_callback = msg_tx.clone(); - - // Spawn a task to post messages as they arrive. - let post_transport = Arc::clone(&ctx.transport); - let post_sender = sender.to_string(); - let post_task = tokio::spawn(async move { - while let Some(chunk) = msg_rx.recv().await { - // Convert Markdown to WhatsApp formatting, then split into sized chunks. - let formatted = markdown_to_whatsapp(&chunk); - for part in chunk_for_whatsapp(&formatted) { - let _ = post_transport.send_message(&post_sender, &part, "").await; - } - } - }); - - // Shared buffer between the sync token callback and the async scope. - let buffer = Arc::new(std::sync::Mutex::new(String::new())); - let buffer_for_callback = Arc::clone(&buffer); - let sent_any_chunk = Arc::new(AtomicBool::new(false)); - let sent_any_chunk_for_callback = Arc::clone(&sent_any_chunk); - - let project_root_str = ctx.services.project_root.to_string_lossy().to_string(); - let chat_fut = provider.chat_stream( - &prompt, - &project_root_str, - resume_session_id.as_deref(), - None, - &mut cancel_rx, - move |token| { - let mut buf = buffer_for_callback.lock().unwrap(); - buf.push_str(token); - let paragraphs = drain_complete_paragraphs(&mut buf); - for chunk in paragraphs { - sent_any_chunk_for_callback.store(true, Ordering::Relaxed); - let _ = msg_tx_for_callback.send(chunk); - } - }, - |_thinking| {}, - |_activity| {}, - ); - tokio::pin!(chat_fut); - - // Lock the permission receiver for the duration of this chat session. - let mut perm_rx_guard = ctx.services.perm_rx.lock().await; - - let result = loop { - tokio::select! { - r = &mut chat_fut => break r, - - Some(perm_fwd) = perm_rx_guard.recv() => { - let prompt_msg = format!( - "*Permission Request*\n\nTool: `{}`\n```json\n{}\n```\n\nReply *yes* to approve or *no* to deny.", - perm_fwd.tool_name, - serde_json::to_string_pretty(&perm_fwd.tool_input) - .unwrap_or_else(|_| perm_fwd.tool_input.to_string()), - ); - let formatted = markdown_to_whatsapp(&prompt_msg); - for part in chunk_for_whatsapp(&formatted) { - let _ = ctx.transport.send_message(sender, &part, "").await; - } - - // Store the response sender so the incoming message handler - // can resolve it when the user replies yes/no. - ctx.services.pending_perm_replies - .lock() - .await - .insert(sender.to_string(), perm_fwd.response_tx); - - // Spawn a timeout task: auto-deny if the user does not respond. - let pending = Arc::clone(&ctx.services.pending_perm_replies); - let timeout_sender = sender.to_string(); - let timeout_transport = Arc::clone(&ctx.transport); - let timeout_secs = ctx.services.permission_timeout_secs; - tokio::spawn(async move { - tokio::time::sleep(std::time::Duration::from_secs(timeout_secs)).await; - if let Some(tx) = pending.lock().await.remove(&timeout_sender) { - let _ = tx.send(PermissionDecision::Deny); - let msg = "Permission request timed out — denied (fail-closed)."; - let _ = timeout_transport.send_message(&timeout_sender, msg, "").await; - } - }); - } - } - }; - drop(perm_rx_guard); - - // Flush remaining text. - let remaining = buffer.lock().unwrap().trim().to_string(); - let did_send_any = sent_any_chunk.load(Ordering::Relaxed); - - let (assistant_reply, new_session_id) = match result { - Ok(ClaudeCodeResult { - messages, - session_id, - }) => { - let reply = if !remaining.is_empty() { - let _ = msg_tx.send(remaining.clone()); - remaining - } else if !did_send_any { - let last_text = messages - .iter() - .rev() - .find(|m| m.role == crate::llm::types::Role::Assistant && !m.content.is_empty()) - .map(|m| m.content.clone()) - .unwrap_or_default(); - if !last_text.is_empty() { - let _ = msg_tx.send(last_text.clone()); - } - last_text - } else { - remaining - }; - slog!("[whatsapp] session_id from chat_stream: {:?}", session_id); - (reply, session_id) - } - Err(e) => { - slog!("[whatsapp] LLM error: {e}"); - let err_msg = if let Some(url) = crate::llm::oauth::extract_login_url_from_error(&e) { - format!("Authentication required. Log in to Claude here: {url}") - } else { - format!("Error processing your request: {e}") - }; - let _ = msg_tx.send(err_msg.clone()); - (err_msg, None) - } - }; - - // Signal the posting task to finish and wait for it. - drop(msg_tx); - let _ = post_task.await; - - // Record this exchange in conversation history. - if !assistant_reply.starts_with("Error processing") { - let mut guard = ctx.history.lock().await; - let conv = guard.entry(sender.to_string()).or_default(); - - if new_session_id.is_some() { - conv.session_id = new_session_id; - } - - conv.entries.push(ConversationEntry { - role: ConversationRole::User, - sender: sender.to_string(), - content: user_message.to_string(), - }); - conv.entries.push(ConversationEntry { - role: ConversationRole::Assistant, - sender: String::new(), - content: assistant_reply, - }); - - // Trim to configured maximum. - if conv.entries.len() > ctx.history_size { - let excess = conv.entries.len() - ctx.history_size; - conv.entries.drain(..excess); - } - - save_whatsapp_history(&ctx.services.project_root, &guard); - } -} - -// ── Tests ─────────────────────────────────────────────────────────────── - #[cfg(test)] mod tests { use super::super::WhatsAppWebhookContext;