From 1f02de8cd031a35f9edaf6556c1c2ae7f5c87c38 Mon Sep 17 00:00:00 2001 From: dave Date: Mon, 27 Apr 2026 02:32:11 +0000 Subject: [PATCH] refactor: split chat/transport/slack/commands.rs (875) into mod + llm The 875-line commands.rs is split: - llm.rs: handle_llm_message (LLM turn for non-command messages, ~190 lines) - mod.rs: SlackSlashCommandPayload + slash_command_to_bot_keyword + handle_incoming_message + tests (~700 lines) Tests stay co-located with handle_incoming_message in mod.rs. All 2636 tests pass; clippy clean. --- .../src/chat/transport/slack/commands/llm.rs | 196 ++++++++++++++++++ .../slack/{commands.rs => commands/mod.rs} | 190 +---------------- 2 files changed, 201 insertions(+), 185 deletions(-) create mode 100644 server/src/chat/transport/slack/commands/llm.rs rename server/src/chat/transport/slack/{commands.rs => commands/mod.rs} (77%) diff --git a/server/src/chat/transport/slack/commands/llm.rs b/server/src/chat/transport/slack/commands/llm.rs new file mode 100644 index 00000000..8d167104 --- /dev/null +++ b/server/src/chat/transport/slack/commands/llm.rs @@ -0,0 +1,196 @@ +//! Slack LLM message handler — runs an LLM turn for a non-command message. + +use std::sync::Arc; + +use crate::chat::ChatTransport; +use crate::chat::transport::matrix::{ConversationEntry, ConversationRole}; +use crate::http::context::PermissionDecision; +use crate::slog; + +use super::super::format::markdown_to_slack; +use super::super::history::save_slack_history; +use crate::chat::transport::slack::SlackWebhookContext; + +pub(super) async fn handle_llm_message( + ctx: &SlackWebhookContext, + channel: &str, + user: &str, + user_message: &str, +) { + use crate::chat::util::drain_complete_paragraphs; + use crate::llm::providers::claude_code::{ClaudeCodeProvider, ClaudeCodeResult}; + use std::sync::atomic::{AtomicBool, Ordering}; + use tokio::sync::watch; + + // Look up existing session ID for this channel. + let resume_session_id: Option = { + let guard = ctx.history.lock().await; + guard.get(channel).and_then(|conv| conv.session_id.clone()) + }; + + let bot_name = &ctx.services.bot_name; + let prompt = format!( + "[Your name is {bot_name}. Refer to yourself as {bot_name}, not Claude.]\n\n{user}: {user_message}" + ); + + let provider = ClaudeCodeProvider::new(); + let (_cancel_tx, mut cancel_rx) = watch::channel(false); + + // Channel for sending complete chunks to the Slack posting task. + let (msg_tx, mut msg_rx) = tokio::sync::mpsc::unbounded_channel::(); + let msg_tx_for_callback = msg_tx.clone(); + + // Spawn a task to post messages as they arrive. + let post_transport = Arc::clone(&ctx.transport); + let post_channel = channel.to_string(); + let post_task = tokio::spawn(async move { + while let Some(chunk) = msg_rx.recv().await { + let formatted = markdown_to_slack(&chunk); + let _ = post_transport + .send_message(&post_channel, &formatted, "") + .await; + } + }); + + // Shared buffer between the sync token callback and the async scope. + let buffer = Arc::new(std::sync::Mutex::new(String::new())); + let buffer_for_callback = Arc::clone(&buffer); + let sent_any_chunk = Arc::new(AtomicBool::new(false)); + let sent_any_chunk_for_callback = Arc::clone(&sent_any_chunk); + + let project_root_str = ctx.services.project_root.to_string_lossy().to_string(); + let chat_fut = provider.chat_stream( + &prompt, + &project_root_str, + resume_session_id.as_deref(), + None, + &mut cancel_rx, + move |token| { + let mut buf = buffer_for_callback.lock().unwrap(); + buf.push_str(token); + let paragraphs = drain_complete_paragraphs(&mut buf); + for chunk in paragraphs { + sent_any_chunk_for_callback.store(true, Ordering::Relaxed); + let _ = msg_tx_for_callback.send(chunk); + } + }, + |_thinking| {}, + |_activity| {}, + ); + tokio::pin!(chat_fut); + + // Lock the permission receiver for the duration of this chat session. + let mut perm_rx_guard = ctx.services.perm_rx.lock().await; + + let result = loop { + tokio::select! { + r = &mut chat_fut => break r, + + Some(perm_fwd) = perm_rx_guard.recv() => { + let prompt_msg = format!( + "*Permission Request*\n\nTool: `{}`\n```json\n{}\n```\n\nReply *yes* to approve or *no* to deny.", + perm_fwd.tool_name, + serde_json::to_string_pretty(&perm_fwd.tool_input) + .unwrap_or_else(|_| perm_fwd.tool_input.to_string()), + ); + let formatted = markdown_to_slack(&prompt_msg); + let _ = ctx.transport.send_message(channel, &formatted, "").await; + + // Store the response sender so the incoming message handler + // can resolve it when the user replies yes/no. + ctx.services.pending_perm_replies + .lock() + .await + .insert(channel.to_string(), perm_fwd.response_tx); + + // Spawn a timeout task: auto-deny if the user does not respond. + let pending = Arc::clone(&ctx.services.pending_perm_replies); + let timeout_channel = channel.to_string(); + let timeout_transport = Arc::clone(&ctx.transport); + let timeout_secs = ctx.services.permission_timeout_secs; + tokio::spawn(async move { + tokio::time::sleep(std::time::Duration::from_secs(timeout_secs)).await; + if let Some(tx) = pending.lock().await.remove(&timeout_channel) { + let _ = tx.send(PermissionDecision::Deny); + let msg = "Permission request timed out — denied (fail-closed)."; + let _ = timeout_transport.send_message(&timeout_channel, msg, "").await; + } + }); + } + } + }; + drop(perm_rx_guard); + + // Flush remaining text. + let remaining = buffer.lock().unwrap().trim().to_string(); + let did_send_any = sent_any_chunk.load(Ordering::Relaxed); + + let (assistant_reply, new_session_id) = match result { + Ok(ClaudeCodeResult { + messages, + session_id, + }) => { + let reply = if !remaining.is_empty() { + let _ = msg_tx.send(remaining.clone()); + remaining + } else if !did_send_any { + let last_text = messages + .iter() + .rev() + .find(|m| m.role == crate::llm::types::Role::Assistant && !m.content.is_empty()) + .map(|m| m.content.clone()) + .unwrap_or_default(); + if !last_text.is_empty() { + let _ = msg_tx.send(last_text.clone()); + } + last_text + } else { + remaining + }; + slog!("[slack] session_id from chat_stream: {:?}", session_id); + (reply, session_id) + } + Err(e) => { + slog!("[slack] LLM error: {e}"); + let err_msg = format!("Error processing your request: {e}"); + let _ = msg_tx.send(err_msg.clone()); + (err_msg, None) + } + }; + + // Signal the posting task to finish and wait for it. + drop(msg_tx); + let _ = post_task.await; + + // Record this exchange in conversation history. + if !assistant_reply.starts_with("Error processing") { + let mut guard = ctx.history.lock().await; + let conv = guard.entry(channel.to_string()).or_default(); + + if new_session_id.is_some() { + conv.session_id = new_session_id; + } + + conv.entries.push(ConversationEntry { + role: ConversationRole::User, + sender: user.to_string(), + content: user_message.to_string(), + }); + conv.entries.push(ConversationEntry { + role: ConversationRole::Assistant, + sender: String::new(), + content: assistant_reply, + }); + + // Trim to configured maximum. + if conv.entries.len() > ctx.history_size { + let excess = conv.entries.len() - ctx.history_size; + conv.entries.drain(..excess); + } + + save_slack_history(&ctx.services.project_root, &guard); + } +} + +// ── Tests ─────────────────────────────────────────────────────────────── + diff --git a/server/src/chat/transport/slack/commands.rs b/server/src/chat/transport/slack/commands/mod.rs similarity index 77% rename from server/src/chat/transport/slack/commands.rs rename to server/src/chat/transport/slack/commands/mod.rs index 18a78882..83050313 100644 --- a/server/src/chat/transport/slack/commands.rs +++ b/server/src/chat/transport/slack/commands/mod.rs @@ -8,12 +8,16 @@ use super::format::markdown_to_slack; use super::history::{SlackConversationHistory, save_slack_history}; use super::meta::SlackTransport; use crate::chat::ChatTransport; -use crate::chat::transport::matrix::{ConversationEntry, ConversationRole, RoomConversation}; +use crate::chat::transport::matrix::RoomConversation; use crate::chat::util::is_permission_approval; use crate::http::context::PermissionDecision; use crate::services::Services; use crate::slog; +mod llm; + +use llm::handle_llm_message; + // ── Slash command types ───────────────────────────────────────────────── /// Payload sent by Slack for slash commands (application/x-www-form-urlencoded). @@ -344,195 +348,11 @@ pub(super) async fn handle_incoming_message( } /// Forward a message to Claude Code and send the response back via Slack. -async fn handle_llm_message( - ctx: &SlackWebhookContext, - channel: &str, - user: &str, - user_message: &str, -) { - use crate::chat::util::drain_complete_paragraphs; - use crate::llm::providers::claude_code::{ClaudeCodeProvider, ClaudeCodeResult}; - use std::sync::atomic::{AtomicBool, Ordering}; - use tokio::sync::watch; - - // Look up existing session ID for this channel. - let resume_session_id: Option = { - let guard = ctx.history.lock().await; - guard.get(channel).and_then(|conv| conv.session_id.clone()) - }; - - let bot_name = &ctx.services.bot_name; - let prompt = format!( - "[Your name is {bot_name}. Refer to yourself as {bot_name}, not Claude.]\n\n{user}: {user_message}" - ); - - let provider = ClaudeCodeProvider::new(); - let (_cancel_tx, mut cancel_rx) = watch::channel(false); - - // Channel for sending complete chunks to the Slack posting task. - let (msg_tx, mut msg_rx) = tokio::sync::mpsc::unbounded_channel::(); - let msg_tx_for_callback = msg_tx.clone(); - - // Spawn a task to post messages as they arrive. - let post_transport = Arc::clone(&ctx.transport); - let post_channel = channel.to_string(); - let post_task = tokio::spawn(async move { - while let Some(chunk) = msg_rx.recv().await { - let formatted = markdown_to_slack(&chunk); - let _ = post_transport - .send_message(&post_channel, &formatted, "") - .await; - } - }); - - // Shared buffer between the sync token callback and the async scope. - let buffer = Arc::new(std::sync::Mutex::new(String::new())); - let buffer_for_callback = Arc::clone(&buffer); - let sent_any_chunk = Arc::new(AtomicBool::new(false)); - let sent_any_chunk_for_callback = Arc::clone(&sent_any_chunk); - - let project_root_str = ctx.services.project_root.to_string_lossy().to_string(); - let chat_fut = provider.chat_stream( - &prompt, - &project_root_str, - resume_session_id.as_deref(), - None, - &mut cancel_rx, - move |token| { - let mut buf = buffer_for_callback.lock().unwrap(); - buf.push_str(token); - let paragraphs = drain_complete_paragraphs(&mut buf); - for chunk in paragraphs { - sent_any_chunk_for_callback.store(true, Ordering::Relaxed); - let _ = msg_tx_for_callback.send(chunk); - } - }, - |_thinking| {}, - |_activity| {}, - ); - tokio::pin!(chat_fut); - - // Lock the permission receiver for the duration of this chat session. - let mut perm_rx_guard = ctx.services.perm_rx.lock().await; - - let result = loop { - tokio::select! { - r = &mut chat_fut => break r, - - Some(perm_fwd) = perm_rx_guard.recv() => { - let prompt_msg = format!( - "*Permission Request*\n\nTool: `{}`\n```json\n{}\n```\n\nReply *yes* to approve or *no* to deny.", - perm_fwd.tool_name, - serde_json::to_string_pretty(&perm_fwd.tool_input) - .unwrap_or_else(|_| perm_fwd.tool_input.to_string()), - ); - let formatted = markdown_to_slack(&prompt_msg); - let _ = ctx.transport.send_message(channel, &formatted, "").await; - - // Store the response sender so the incoming message handler - // can resolve it when the user replies yes/no. - ctx.services.pending_perm_replies - .lock() - .await - .insert(channel.to_string(), perm_fwd.response_tx); - - // Spawn a timeout task: auto-deny if the user does not respond. - let pending = Arc::clone(&ctx.services.pending_perm_replies); - let timeout_channel = channel.to_string(); - let timeout_transport = Arc::clone(&ctx.transport); - let timeout_secs = ctx.services.permission_timeout_secs; - tokio::spawn(async move { - tokio::time::sleep(std::time::Duration::from_secs(timeout_secs)).await; - if let Some(tx) = pending.lock().await.remove(&timeout_channel) { - let _ = tx.send(PermissionDecision::Deny); - let msg = "Permission request timed out — denied (fail-closed)."; - let _ = timeout_transport.send_message(&timeout_channel, msg, "").await; - } - }); - } - } - }; - drop(perm_rx_guard); - - // Flush remaining text. - let remaining = buffer.lock().unwrap().trim().to_string(); - let did_send_any = sent_any_chunk.load(Ordering::Relaxed); - - let (assistant_reply, new_session_id) = match result { - Ok(ClaudeCodeResult { - messages, - session_id, - }) => { - let reply = if !remaining.is_empty() { - let _ = msg_tx.send(remaining.clone()); - remaining - } else if !did_send_any { - let last_text = messages - .iter() - .rev() - .find(|m| m.role == crate::llm::types::Role::Assistant && !m.content.is_empty()) - .map(|m| m.content.clone()) - .unwrap_or_default(); - if !last_text.is_empty() { - let _ = msg_tx.send(last_text.clone()); - } - last_text - } else { - remaining - }; - slog!("[slack] session_id from chat_stream: {:?}", session_id); - (reply, session_id) - } - Err(e) => { - slog!("[slack] LLM error: {e}"); - let err_msg = format!("Error processing your request: {e}"); - let _ = msg_tx.send(err_msg.clone()); - (err_msg, None) - } - }; - - // Signal the posting task to finish and wait for it. - drop(msg_tx); - let _ = post_task.await; - - // Record this exchange in conversation history. - if !assistant_reply.starts_with("Error processing") { - let mut guard = ctx.history.lock().await; - let conv = guard.entry(channel.to_string()).or_default(); - - if new_session_id.is_some() { - conv.session_id = new_session_id; - } - - conv.entries.push(ConversationEntry { - role: ConversationRole::User, - sender: user.to_string(), - content: user_message.to_string(), - }); - conv.entries.push(ConversationEntry { - role: ConversationRole::Assistant, - sender: String::new(), - content: assistant_reply, - }); - - // Trim to configured maximum. - if conv.entries.len() > ctx.history_size { - let excess = conv.entries.len() - ctx.history_size; - conv.entries.drain(..excess); - } - - save_slack_history(&ctx.services.project_root, &guard); - } -} - -// ── Tests ─────────────────────────────────────────────────────────────── - #[cfg(test)] mod tests { use super::*; use std::collections::HashMap; - // ── Slash command types ──────────────────────────────────────────── #[test] fn parse_slash_command_payload() {