diff --git a/server/src/chat/transport/matrix/bot/messages/handle_message.rs b/server/src/chat/transport/matrix/bot/messages/handle_message.rs new file mode 100644 index 00000000..cdb8996d --- /dev/null +++ b/server/src/chat/transport/matrix/bot/messages/handle_message.rs @@ -0,0 +1,268 @@ +//! Matrix handle_message — runs the LLM turn for a verified incoming message and +//! streams the assistant reply back to the room. + +use crate::chat::util::drain_complete_paragraphs; +use crate::http::context::PermissionDecision; +use crate::llm::providers::claude_code::{ClaudeCodeProvider, ClaudeCodeResult}; +use crate::slog; +use matrix_sdk::ruma::OwnedRoomId; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::time::Duration; +use tokio::sync::watch; + +use super::super::context::BotContext; +use super::super::format::markdown_to_html; +use super::super::history::{ConversationEntry, ConversationRole, save_history}; + +use super::format_user_prompt; + +pub(in crate::chat::transport::matrix::bot) async fn handle_message( + room_id_str: String, + room_id: OwnedRoomId, + ctx: BotContext, + sender: String, + user_message: String, +) { + // Look up the room's existing Claude Code session ID (if any) so we can + // resume the conversation with structured API messages instead of + // flattening history into a text prefix. + let resume_session_id: Option = { + let guard = ctx.history.lock().await; + guard.get(&room_id).and_then(|conv| conv.session_id.clone()) + }; + + // The prompt is just the current message with sender attribution. + // Prior conversation context is carried by the Claude Code session. + let bot_name = &ctx.services.bot_name; + let active_project_ctx = if let Some(ref ap) = ctx.gateway_active_project { + let name = ap.read().await.clone(); + format!("[Active project: {name}]\n") + } else { + String::new() + }; + let prompt = format!( + "[Your name is {bot_name}. Refer to yourself as {bot_name}, not Claude.]\n{active_project_ctx}\n{}", + format_user_prompt(&sender, &user_message) + ); + + let provider = ClaudeCodeProvider::new(); + let (cancel_tx, mut cancel_rx) = watch::channel(false); + // Keep the sender alive for the duration of the call. + let _cancel_tx = cancel_tx; + + // Channel for sending complete paragraphs to the Matrix posting task. + let (msg_tx, mut msg_rx) = tokio::sync::mpsc::unbounded_channel::(); + let msg_tx_for_callback = msg_tx.clone(); + + // Spawn a task to post messages via the transport as they arrive so we + // don't block the LLM stream while waiting for send round-trips. + let post_transport = Arc::clone(&ctx.transport); + let post_room_id = room_id_str.clone(); + let sent_ids = Arc::clone(&ctx.bot_sent_event_ids); + let sent_ids_for_post = Arc::clone(&sent_ids); + let post_task = tokio::spawn(async move { + while let Some(chunk) = msg_rx.recv().await { + let html = markdown_to_html(&chunk); + if let Ok(msg_id) = post_transport + .send_message(&post_room_id, &chunk, &html) + .await + && let Ok(event_id) = msg_id.parse() + { + sent_ids_for_post.lock().await.insert(event_id); + } + } + }); + + // Shared state between the sync token callback and the async outer scope. + let buffer = Arc::new(std::sync::Mutex::new(String::new())); + let buffer_for_callback = Arc::clone(&buffer); + let sent_any_chunk = Arc::new(AtomicBool::new(false)); + let sent_any_chunk_for_callback = Arc::clone(&sent_any_chunk); + + // In gateway mode, run Claude Code in the gateway config directory so it + // picks up the `.mcp.json` that points to the gateway's MCP proxy endpoint. + // The gateway proxies tool calls to the active project automatically. + // In standalone mode, use the project root directly. + let project_root_str = if ctx.is_gateway() { + ctx.services.project_root.to_string_lossy().to_string() + } else { + ctx.effective_project_root() + .await + .to_string_lossy() + .to_string() + }; + let chat_fut = provider.chat_stream( + &prompt, + &project_root_str, + resume_session_id.as_deref(), + None, + &mut cancel_rx, + move |token| { + let mut buf = buffer_for_callback.lock().unwrap(); + buf.push_str(token); + // Flush complete paragraphs as they arrive. + let paragraphs = drain_complete_paragraphs(&mut buf); + for chunk in paragraphs { + sent_any_chunk_for_callback.store(true, Ordering::Relaxed); + let _ = msg_tx_for_callback.send(chunk); + } + }, + |_thinking| {}, // Discard thinking tokens + |_activity| {}, // Discard activity signals + ); + tokio::pin!(chat_fut); + + // Lock the permission receiver for the duration of this chat session. + // Permission requests from the MCP `prompt_permission` tool arrive here. + let mut perm_rx_guard = ctx.services.perm_rx.lock().await; + + let result = loop { + tokio::select! { + r = &mut chat_fut => break r, + + Some(perm_fwd) = perm_rx_guard.recv() => { + // Post the permission prompt to the room via the transport. + let prompt_msg = format!( + "**Permission Request**\n\n\ + Tool: `{}`\n```json\n{}\n```\n\n\ + Reply **yes** to approve or **no** to deny.", + perm_fwd.tool_name, + serde_json::to_string_pretty(&perm_fwd.tool_input) + .unwrap_or_else(|_| perm_fwd.tool_input.to_string()), + ); + let html = markdown_to_html(&prompt_msg); + if let Ok(msg_id) = ctx.transport.send_message(&room_id_str, &prompt_msg, &html).await + && let Ok(event_id) = msg_id.parse() + { + sent_ids.lock().await.insert(event_id); + } + + // Store the MCP oneshot sender so the event handler can + // resolve it when the user replies yes/no. + ctx.services.pending_perm_replies + .lock() + .await + .insert(room_id.to_string(), perm_fwd.response_tx); + + // Spawn a timeout task: auto-deny if the user does not respond. + let pending = Arc::clone(&ctx.services.pending_perm_replies); + let timeout_room_id = room_id.to_string(); + let timeout_transport = Arc::clone(&ctx.transport); + let timeout_room_id_str = room_id_str.clone(); + let timeout_sent_ids = Arc::clone(&ctx.bot_sent_event_ids); + let timeout_secs = ctx.services.permission_timeout_secs; + tokio::spawn(async move { + tokio::time::sleep(Duration::from_secs(timeout_secs)).await; + if let Some(tx) = pending.lock().await.remove(&timeout_room_id) { + let _ = tx.send(PermissionDecision::Deny); + let msg = "Permission request timed out — denied (fail-closed)."; + let html = markdown_to_html(msg); + if let Ok(msg_id) = timeout_transport + .send_message(&timeout_room_id_str, msg, &html) + .await + && let Ok(event_id) = msg_id.parse() + { + timeout_sent_ids.lock().await.insert(event_id); + } + } + }); + } + } + }; + drop(perm_rx_guard); + + // Flush any remaining text that didn't end with a paragraph boundary. + let remaining = buffer.lock().unwrap().trim().to_string(); + let did_send_any = sent_any_chunk.load(Ordering::Relaxed); + + let (assistant_reply, new_session_id) = match result { + Ok(ClaudeCodeResult { + messages, + session_id, + }) => { + let reply = if !remaining.is_empty() { + let _ = msg_tx.send(remaining.clone()); + remaining + } else if !did_send_any { + // Nothing was streamed at all (e.g. only tool calls with no + // final text) — fall back to the last assistant message from + // the structured result. + let last_text = messages + .iter() + .rev() + .find(|m| m.role == crate::llm::types::Role::Assistant && !m.content.is_empty()) + .map(|m| m.content.clone()) + .unwrap_or_default(); + if !last_text.is_empty() { + let _ = msg_tx.send(last_text.clone()); + } + last_text + } else { + remaining + }; + slog!("[matrix-bot] session_id from chat_stream: {:?}", session_id); + (reply, session_id) + } + Err(e) => { + slog!("[matrix-bot] LLM error: {e}"); + let err_msg = if let Some(url) = crate::llm::oauth::extract_login_url_from_error(&e) { + format!("Authentication required. [Click here to log in to Claude]({url})") + } else { + format!("Error processing your request: {e}") + }; + let _ = msg_tx.send(err_msg.clone()); + (err_msg, None) + } + }; + + // Drop the sender to signal the posting task that no more messages will + // arrive, then wait for all pending Matrix sends to complete. + drop(msg_tx); + let _ = post_task.await; + + // Record this exchange in the per-room conversation history and persist + // the session ID so the next turn resumes with structured API messages. + if !assistant_reply.starts_with("Error processing") { + let mut guard = ctx.history.lock().await; + let conv = guard.entry(room_id).or_default(); + + // Store the session ID so the next turn uses --resume. + slog!( + "[matrix-bot] storing session_id: {:?} (was: {:?})", + new_session_id, + conv.session_id + ); + if new_session_id.is_some() { + conv.session_id = new_session_id; + } + + conv.entries.push(ConversationEntry { + role: ConversationRole::User, + sender: sender.clone(), + content: user_message, + }); + conv.entries.push(ConversationEntry { + role: ConversationRole::Assistant, + sender: String::new(), + content: assistant_reply, + }); + + // Trim to the configured maximum, dropping the oldest entries first. + // The session_id is preserved: Claude Code's --resume loads the full + // conversation from its own session transcript on disk, so trimming + // our local tracking doesn't affect the LLM's context. + if conv.entries.len() > ctx.history_size { + let excess = conv.entries.len() - ctx.history_size; + conv.entries.drain(..excess); + } + + // Persist to disk so history survives server restarts. + save_history(&ctx.services.project_root, &guard); + } +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + diff --git a/server/src/chat/transport/matrix/bot/messages/mod.rs b/server/src/chat/transport/matrix/bot/messages/mod.rs new file mode 100644 index 00000000..a3f92049 --- /dev/null +++ b/server/src/chat/transport/matrix/bot/messages/mod.rs @@ -0,0 +1,66 @@ +//! Matrix message handler — processes incoming room messages and dispatches commands. + +mod handle_message; +mod on_room_message; + +pub(in crate::chat::transport::matrix::bot) use handle_message::handle_message; +pub(in crate::chat::transport::matrix::bot) use on_room_message::on_room_message; + +/// sender is included so the LLM can distinguish participants. +pub(super) fn format_user_prompt(sender: &str, message: &str) -> String { + format!("{sender}: {message}") +} + +/// Matrix event handler for room messages. Each invocation spawns an +#[cfg(test)] +mod tests { + use super::*; + + // -- format_user_prompt ------------------------------------------------- + + #[test] + fn format_user_prompt_includes_sender_and_message() { + let prompt = format_user_prompt("@alice:example.com", "Hello!"); + assert_eq!(prompt, "@alice:example.com: Hello!"); + } + + #[test] + fn format_user_prompt_different_users() { + let prompt = format_user_prompt("@bob:example.com", "What's up?"); + assert_eq!(prompt, "@bob:example.com: What's up?"); + } + + // -- OAuth login link formatting ---------------------------------------- + + #[test] + fn oauth_error_produces_login_link() { + let err = "OAuth session expired or credentials missing. Please log in: http://localhost:3001/oauth/authorize"; + let url = crate::llm::oauth::extract_login_url_from_error(err); + assert!(url.is_some(), "should extract URL from OAuth error"); + let msg = format!( + "Authentication required. [Click here to log in to Claude]({})", + url.unwrap() + ); + assert!(msg.contains("http://localhost:3001/oauth/authorize")); + assert!(msg.contains("[Click here to log in to Claude]")); + } + + #[test] + fn non_oauth_error_not_formatted_as_link() { + let err = "Some unrelated error"; + assert!(crate::llm::oauth::extract_login_url_from_error(err).is_none()); + } + + // -- bot_name / system prompt ------------------------------------------- + + #[test] + fn bot_name_system_prompt_format() { + let bot_name = "Timmy"; + let system_prompt = + format!("Your name is {bot_name}. Refer to yourself as {bot_name}, not Claude."); + assert_eq!( + system_prompt, + "Your name is Timmy. Refer to yourself as Timmy, not Claude." + ); + } +} diff --git a/server/src/chat/transport/matrix/bot/messages.rs b/server/src/chat/transport/matrix/bot/messages/on_room_message.rs similarity index 57% rename from server/src/chat/transport/matrix/bot/messages.rs rename to server/src/chat/transport/matrix/bot/messages/on_room_message.rs index e4ca5a43..40b43cb8 100644 --- a/server/src/chat/transport/matrix/bot/messages.rs +++ b/server/src/chat/transport/matrix/bot/messages/on_room_message.rs @@ -1,37 +1,27 @@ -//! Matrix message handler — processes incoming room messages and dispatches commands. -use crate::chat::util::{drain_complete_paragraphs, is_permission_approval}; +//! Matrix on_room_message handler — receives an incoming Matrix room event and +//! dispatches it to handle_message after access checks and command routing. + +use crate::chat::util::is_permission_approval; use crate::http::context::PermissionDecision; -use crate::llm::providers::claude_code::{ClaudeCodeProvider, ClaudeCodeResult}; use crate::slog; use matrix_sdk::{ Client, event_handler::Ctx, room::Room, ruma::{ - OwnedRoomId, events::room::message::{MessageType, OriginalSyncRoomMessageEvent}, }, }; use std::sync::Arc; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::time::Duration; -use tokio::sync::watch; -use super::context::BotContext; -use super::format::markdown_to_html; -use super::history::{ConversationEntry, ConversationRole, save_history}; -use super::mentions::{is_addressed_to_other, is_reply_to_bot, mentions_bot}; -use super::verification::check_sender_verified; +use super::super::context::BotContext; +use super::super::mentions::{is_addressed_to_other, is_reply_to_bot, mentions_bot}; +use super::super::format::markdown_to_html; +use super::super::verification::check_sender_verified; -/// Build the user-facing prompt for a single turn. In multi-user rooms the -/// sender is included so the LLM can distinguish participants. -pub(super) fn format_user_prompt(sender: &str, message: &str) -> String { - format!("{sender}: {message}") -} +use super::handle_message; -/// Matrix event handler for room messages. Each invocation spawns an -/// independent task so the sync loop is not blocked by LLM calls. -pub(super) async fn on_room_message( +pub(in crate::chat::transport::matrix::bot) async fn on_room_message( ev: OriginalSyncRoomMessageEvent, room: Room, client: Client, @@ -261,14 +251,14 @@ pub(super) async fn on_room_message( // Check for bot-level commands (help, status, ambient, …) before invoking // the LLM. All commands are registered in commands.rs — no special-casing // needed here. - let dispatch = super::super::commands::CommandDispatch { + let dispatch = super::super::super::commands::CommandDispatch { services: &ctx.services, project_root: &effective_root, bot_user_id: ctx.matrix_user_id.as_str(), room_id: &room_id_str, }; if let Some((response, response_html)) = - super::super::commands::try_handle_command_with_html(&dispatch, &user_message) + super::super::super::commands::try_handle_command_with_html(&dispatch, &user_message) { slog!("[matrix-bot] Handled bot command from {sender}"); if let Ok(msg_id) = ctx @@ -284,20 +274,20 @@ pub(super) async fn on_room_message( // Check for the assign command, which requires async agent ops (stop + // start) and cannot be handled by the sync command registry. - if let Some(assign_cmd) = super::super::assign::extract_assign_command( + if let Some(assign_cmd) = super::super::super::assign::extract_assign_command( &user_message, &ctx.services.bot_name, ctx.matrix_user_id.as_str(), ) { let response = match assign_cmd { - super::super::assign::AssignCommand::Assign { + super::super::super::assign::AssignCommand::Assign { story_number, model, } => { slog!( "[matrix-bot] Handling assign command from {sender}: story {story_number} model={model}" ); - super::super::assign::handle_assign( + super::super::super::assign::handle_assign( &ctx.services.bot_name, &story_number, &model, @@ -306,7 +296,7 @@ pub(super) async fn on_room_message( ) .await } - super::super::assign::AssignCommand::BadArgs => { + super::super::super::assign::AssignCommand::BadArgs => { format!( "Usage: `{} assign ` (e.g. `assign 42 opus`)", ctx.services.bot_name @@ -327,23 +317,23 @@ pub(super) async fn on_room_message( // Check for the htop command, which requires async Matrix access (Room) // and cannot be handled by the sync command registry. - if let Some(htop_cmd) = super::super::htop::extract_htop_command( + if let Some(htop_cmd) = super::super::super::htop::extract_htop_command( &user_message, &ctx.services.bot_name, ctx.matrix_user_id.as_str(), ) { slog!("[matrix-bot] Handling htop command from {sender}: {htop_cmd:?}"); match htop_cmd { - super::super::htop::HtopCommand::Stop => { - super::super::htop::handle_htop_stop( + super::super::super::htop::HtopCommand::Stop => { + super::super::super::htop::handle_htop_stop( &*ctx.transport, &room_id_str, &ctx.htop_sessions, ) .await; } - super::super::htop::HtopCommand::Start { duration_secs } => { - super::super::htop::handle_htop_start( + super::super::super::htop::HtopCommand::Start { duration_secs } => { + super::super::super::htop::handle_htop_start( &ctx.transport, &room_id_str, &ctx.htop_sessions, @@ -358,15 +348,15 @@ pub(super) async fn on_room_message( // Check for the delete command, which requires async agent/worktree ops // and cannot be handled by the sync command registry. - if let Some(del_cmd) = super::super::delete::extract_delete_command( + if let Some(del_cmd) = super::super::super::delete::extract_delete_command( &user_message, &ctx.services.bot_name, ctx.matrix_user_id.as_str(), ) { let response = match del_cmd { - super::super::delete::DeleteCommand::Delete { story_number } => { + super::super::super::delete::DeleteCommand::Delete { story_number } => { slog!("[matrix-bot] Handling delete command from {sender}: story {story_number}"); - super::super::delete::handle_delete( + super::super::super::delete::handle_delete( &ctx.services.bot_name, &story_number, &effective_root, @@ -374,7 +364,7 @@ pub(super) async fn on_room_message( ) .await } - super::super::delete::DeleteCommand::BadArgs => { + super::super::super::delete::DeleteCommand::BadArgs => { format!("Usage: `{} delete `", ctx.services.bot_name) } }; @@ -392,15 +382,15 @@ pub(super) async fn on_room_message( // Check for the rmtree command, which requires async agent/worktree ops // and cannot be handled by the sync command registry. - if let Some(rmtree_cmd) = super::super::rmtree::extract_rmtree_command( + if let Some(rmtree_cmd) = super::super::super::rmtree::extract_rmtree_command( &user_message, &ctx.services.bot_name, ctx.matrix_user_id.as_str(), ) { let response = match rmtree_cmd { - super::super::rmtree::RmtreeCommand::Rmtree { story_number } => { + super::super::super::rmtree::RmtreeCommand::Rmtree { story_number } => { slog!("[matrix-bot] Handling rmtree command from {sender}: story {story_number}"); - super::super::rmtree::handle_rmtree( + super::super::super::rmtree::handle_rmtree( &ctx.services.bot_name, &story_number, &effective_root, @@ -408,7 +398,7 @@ pub(super) async fn on_room_message( ) .await } - super::super::rmtree::RmtreeCommand::BadArgs => { + super::super::super::rmtree::RmtreeCommand::BadArgs => { format!("Usage: `{} rmtree `", ctx.services.bot_name) } }; @@ -426,20 +416,20 @@ pub(super) async fn on_room_message( // Check for the start command, which requires async agent ops and cannot // be handled by the sync command registry. - if let Some(start_cmd) = super::super::start::extract_start_command( + if let Some(start_cmd) = super::super::super::start::extract_start_command( &user_message, &ctx.services.bot_name, ctx.matrix_user_id.as_str(), ) { let response = match start_cmd { - super::super::start::StartCommand::Start { + super::super::super::start::StartCommand::Start { story_number, agent_hint, } => { slog!( "[matrix-bot] Handling start command from {sender}: story {story_number} agent={agent_hint:?}" ); - super::super::start::handle_start( + super::super::super::start::handle_start( &ctx.services.bot_name, &story_number, agent_hint.as_deref(), @@ -448,7 +438,7 @@ pub(super) async fn on_room_message( ) .await } - super::super::start::StartCommand::BadArgs => { + super::super::super::start::StartCommand::BadArgs => { format!( "Usage: `{} start ` or `{} start opus`", ctx.services.bot_name, ctx.services.bot_name @@ -469,7 +459,7 @@ pub(super) async fn on_room_message( // Check for the reset command, which requires async access to the shared // conversation history and cannot be handled by the sync command registry. - if super::super::reset::extract_reset_command( + if super::super::super::reset::extract_reset_command( &user_message, &ctx.services.bot_name, ctx.matrix_user_id.as_str(), @@ -477,7 +467,7 @@ pub(super) async fn on_room_message( .is_some() { slog!("[matrix-bot] Handling reset command from {sender}"); - let response = super::super::reset::handle_reset( + let response = super::super::super::reset::handle_reset( &ctx.services.bot_name, &incoming_room_id, &ctx.history, @@ -498,7 +488,7 @@ pub(super) async fn on_room_message( // Check for the rebuild command, which requires async agent and process ops // and cannot be handled by the sync command registry. - if super::super::rebuild::extract_rebuild_command( + if super::super::super::rebuild::extract_rebuild_command( &user_message, &ctx.services.bot_name, ctx.matrix_user_id.as_str(), @@ -517,7 +507,7 @@ pub(super) async fn on_room_message( { ctx.bot_sent_event_ids.lock().await.insert(event_id); } - let response = super::super::rebuild::handle_rebuild( + let response = super::super::super::rebuild::handle_rebuild( &ctx.services.bot_name, &ctx.services.project_root, &ctx.services.agents, @@ -609,304 +599,3 @@ pub(super) async fn on_room_message( }); } -pub(super) async fn handle_message( - room_id_str: String, - room_id: OwnedRoomId, - ctx: BotContext, - sender: String, - user_message: String, -) { - // Look up the room's existing Claude Code session ID (if any) so we can - // resume the conversation with structured API messages instead of - // flattening history into a text prefix. - let resume_session_id: Option = { - let guard = ctx.history.lock().await; - guard.get(&room_id).and_then(|conv| conv.session_id.clone()) - }; - - // The prompt is just the current message with sender attribution. - // Prior conversation context is carried by the Claude Code session. - let bot_name = &ctx.services.bot_name; - let active_project_ctx = if let Some(ref ap) = ctx.gateway_active_project { - let name = ap.read().await.clone(); - format!("[Active project: {name}]\n") - } else { - String::new() - }; - let prompt = format!( - "[Your name is {bot_name}. Refer to yourself as {bot_name}, not Claude.]\n{active_project_ctx}\n{}", - format_user_prompt(&sender, &user_message) - ); - - let provider = ClaudeCodeProvider::new(); - let (cancel_tx, mut cancel_rx) = watch::channel(false); - // Keep the sender alive for the duration of the call. - let _cancel_tx = cancel_tx; - - // Channel for sending complete paragraphs to the Matrix posting task. - let (msg_tx, mut msg_rx) = tokio::sync::mpsc::unbounded_channel::(); - let msg_tx_for_callback = msg_tx.clone(); - - // Spawn a task to post messages via the transport as they arrive so we - // don't block the LLM stream while waiting for send round-trips. - let post_transport = Arc::clone(&ctx.transport); - let post_room_id = room_id_str.clone(); - let sent_ids = Arc::clone(&ctx.bot_sent_event_ids); - let sent_ids_for_post = Arc::clone(&sent_ids); - let post_task = tokio::spawn(async move { - while let Some(chunk) = msg_rx.recv().await { - let html = markdown_to_html(&chunk); - if let Ok(msg_id) = post_transport - .send_message(&post_room_id, &chunk, &html) - .await - && let Ok(event_id) = msg_id.parse() - { - sent_ids_for_post.lock().await.insert(event_id); - } - } - }); - - // Shared state between the sync token callback and the async outer scope. - let buffer = Arc::new(std::sync::Mutex::new(String::new())); - let buffer_for_callback = Arc::clone(&buffer); - let sent_any_chunk = Arc::new(AtomicBool::new(false)); - let sent_any_chunk_for_callback = Arc::clone(&sent_any_chunk); - - // In gateway mode, run Claude Code in the gateway config directory so it - // picks up the `.mcp.json` that points to the gateway's MCP proxy endpoint. - // The gateway proxies tool calls to the active project automatically. - // In standalone mode, use the project root directly. - let project_root_str = if ctx.is_gateway() { - ctx.services.project_root.to_string_lossy().to_string() - } else { - ctx.effective_project_root() - .await - .to_string_lossy() - .to_string() - }; - let chat_fut = provider.chat_stream( - &prompt, - &project_root_str, - resume_session_id.as_deref(), - None, - &mut cancel_rx, - move |token| { - let mut buf = buffer_for_callback.lock().unwrap(); - buf.push_str(token); - // Flush complete paragraphs as they arrive. - let paragraphs = drain_complete_paragraphs(&mut buf); - for chunk in paragraphs { - sent_any_chunk_for_callback.store(true, Ordering::Relaxed); - let _ = msg_tx_for_callback.send(chunk); - } - }, - |_thinking| {}, // Discard thinking tokens - |_activity| {}, // Discard activity signals - ); - tokio::pin!(chat_fut); - - // Lock the permission receiver for the duration of this chat session. - // Permission requests from the MCP `prompt_permission` tool arrive here. - let mut perm_rx_guard = ctx.services.perm_rx.lock().await; - - let result = loop { - tokio::select! { - r = &mut chat_fut => break r, - - Some(perm_fwd) = perm_rx_guard.recv() => { - // Post the permission prompt to the room via the transport. - let prompt_msg = format!( - "**Permission Request**\n\n\ - Tool: `{}`\n```json\n{}\n```\n\n\ - Reply **yes** to approve or **no** to deny.", - perm_fwd.tool_name, - serde_json::to_string_pretty(&perm_fwd.tool_input) - .unwrap_or_else(|_| perm_fwd.tool_input.to_string()), - ); - let html = markdown_to_html(&prompt_msg); - if let Ok(msg_id) = ctx.transport.send_message(&room_id_str, &prompt_msg, &html).await - && let Ok(event_id) = msg_id.parse() - { - sent_ids.lock().await.insert(event_id); - } - - // Store the MCP oneshot sender so the event handler can - // resolve it when the user replies yes/no. - ctx.services.pending_perm_replies - .lock() - .await - .insert(room_id.to_string(), perm_fwd.response_tx); - - // Spawn a timeout task: auto-deny if the user does not respond. - let pending = Arc::clone(&ctx.services.pending_perm_replies); - let timeout_room_id = room_id.to_string(); - let timeout_transport = Arc::clone(&ctx.transport); - let timeout_room_id_str = room_id_str.clone(); - let timeout_sent_ids = Arc::clone(&ctx.bot_sent_event_ids); - let timeout_secs = ctx.services.permission_timeout_secs; - tokio::spawn(async move { - tokio::time::sleep(Duration::from_secs(timeout_secs)).await; - if let Some(tx) = pending.lock().await.remove(&timeout_room_id) { - let _ = tx.send(PermissionDecision::Deny); - let msg = "Permission request timed out — denied (fail-closed)."; - let html = markdown_to_html(msg); - if let Ok(msg_id) = timeout_transport - .send_message(&timeout_room_id_str, msg, &html) - .await - && let Ok(event_id) = msg_id.parse() - { - timeout_sent_ids.lock().await.insert(event_id); - } - } - }); - } - } - }; - drop(perm_rx_guard); - - // Flush any remaining text that didn't end with a paragraph boundary. - let remaining = buffer.lock().unwrap().trim().to_string(); - let did_send_any = sent_any_chunk.load(Ordering::Relaxed); - - let (assistant_reply, new_session_id) = match result { - Ok(ClaudeCodeResult { - messages, - session_id, - }) => { - let reply = if !remaining.is_empty() { - let _ = msg_tx.send(remaining.clone()); - remaining - } else if !did_send_any { - // Nothing was streamed at all (e.g. only tool calls with no - // final text) — fall back to the last assistant message from - // the structured result. - let last_text = messages - .iter() - .rev() - .find(|m| m.role == crate::llm::types::Role::Assistant && !m.content.is_empty()) - .map(|m| m.content.clone()) - .unwrap_or_default(); - if !last_text.is_empty() { - let _ = msg_tx.send(last_text.clone()); - } - last_text - } else { - remaining - }; - slog!("[matrix-bot] session_id from chat_stream: {:?}", session_id); - (reply, session_id) - } - Err(e) => { - slog!("[matrix-bot] LLM error: {e}"); - let err_msg = if let Some(url) = crate::llm::oauth::extract_login_url_from_error(&e) { - format!("Authentication required. [Click here to log in to Claude]({url})") - } else { - format!("Error processing your request: {e}") - }; - let _ = msg_tx.send(err_msg.clone()); - (err_msg, None) - } - }; - - // Drop the sender to signal the posting task that no more messages will - // arrive, then wait for all pending Matrix sends to complete. - drop(msg_tx); - let _ = post_task.await; - - // Record this exchange in the per-room conversation history and persist - // the session ID so the next turn resumes with structured API messages. - if !assistant_reply.starts_with("Error processing") { - let mut guard = ctx.history.lock().await; - let conv = guard.entry(room_id).or_default(); - - // Store the session ID so the next turn uses --resume. - slog!( - "[matrix-bot] storing session_id: {:?} (was: {:?})", - new_session_id, - conv.session_id - ); - if new_session_id.is_some() { - conv.session_id = new_session_id; - } - - conv.entries.push(ConversationEntry { - role: ConversationRole::User, - sender: sender.clone(), - content: user_message, - }); - conv.entries.push(ConversationEntry { - role: ConversationRole::Assistant, - sender: String::new(), - content: assistant_reply, - }); - - // Trim to the configured maximum, dropping the oldest entries first. - // The session_id is preserved: Claude Code's --resume loads the full - // conversation from its own session transcript on disk, so trimming - // our local tracking doesn't affect the LLM's context. - if conv.entries.len() > ctx.history_size { - let excess = conv.entries.len() - ctx.history_size; - conv.entries.drain(..excess); - } - - // Persist to disk so history survives server restarts. - save_history(&ctx.services.project_root, &guard); - } -} - -// --------------------------------------------------------------------------- -// Tests -// --------------------------------------------------------------------------- - -#[cfg(test)] -mod tests { - use super::*; - - // -- format_user_prompt ------------------------------------------------- - - #[test] - fn format_user_prompt_includes_sender_and_message() { - let prompt = format_user_prompt("@alice:example.com", "Hello!"); - assert_eq!(prompt, "@alice:example.com: Hello!"); - } - - #[test] - fn format_user_prompt_different_users() { - let prompt = format_user_prompt("@bob:example.com", "What's up?"); - assert_eq!(prompt, "@bob:example.com: What's up?"); - } - - // -- OAuth login link formatting ---------------------------------------- - - #[test] - fn oauth_error_produces_login_link() { - let err = "OAuth session expired or credentials missing. Please log in: http://localhost:3001/oauth/authorize"; - let url = crate::llm::oauth::extract_login_url_from_error(err); - assert!(url.is_some(), "should extract URL from OAuth error"); - let msg = format!( - "Authentication required. [Click here to log in to Claude]({})", - url.unwrap() - ); - assert!(msg.contains("http://localhost:3001/oauth/authorize")); - assert!(msg.contains("[Click here to log in to Claude]")); - } - - #[test] - fn non_oauth_error_not_formatted_as_link() { - let err = "Some unrelated error"; - assert!(crate::llm::oauth::extract_login_url_from_error(err).is_none()); - } - - // -- bot_name / system prompt ------------------------------------------- - - #[test] - fn bot_name_system_prompt_format() { - let bot_name = "Timmy"; - let system_prompt = - format!("Your name is {bot_name}. Refer to yourself as {bot_name}, not Claude."); - assert_eq!( - system_prompt, - "Your name is Timmy. Refer to yourself as Timmy, not Claude." - ); - } -}