use crate::chat::util::{drain_complete_paragraphs, is_permission_approval}; use crate::http::context::PermissionDecision; use crate::llm::providers::claude_code::{ClaudeCodeProvider, ClaudeCodeResult}; use crate::slog; use matrix_sdk::{ Client, event_handler::Ctx, room::Room, ruma::{ OwnedRoomId, events::room::message::{MessageType, OriginalSyncRoomMessageEvent}, }, }; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use std::time::Duration; use tokio::sync::watch; use super::context::BotContext; use super::format::markdown_to_html; use super::history::{ConversationEntry, ConversationRole, save_history}; use super::mentions::{is_reply_to_bot, mentions_bot}; use super::verification::check_sender_verified; /// Build the user-facing prompt for a single turn. In multi-user rooms the /// sender is included so the LLM can distinguish participants. pub(super) fn format_user_prompt(sender: &str, message: &str) -> String { format!("{sender}: {message}") } /// Matrix event handler for room messages. Each invocation spawns an /// independent task so the sync loop is not blocked by LLM calls. pub(super) async fn on_room_message( ev: OriginalSyncRoomMessageEvent, room: Room, client: Client, Ctx(ctx): Ctx, ) { let incoming_room_id = room.room_id().to_owned(); slog!( "[matrix-bot] Event received: room={} sender={}", incoming_room_id, ev.sender, ); // Only handle messages from rooms we are configured to listen in. if !ctx.target_room_ids.iter().any(|r| r == &incoming_room_id) { slog!("[matrix-bot] Ignoring message from unconfigured room {incoming_room_id}"); return; } // Ignore the bot's own messages to prevent echo loops. if ev.sender == ctx.bot_user_id { return; } // Only respond to users on the allowlist (fail-closed). if !ctx.allowed_users.iter().any(|u| u == ev.sender.as_str()) { slog!( "[matrix-bot] Ignoring message from unauthorised user: {}", ev.sender ); return; } // Only handle plain text messages. let (body, formatted_body) = match &ev.content.msgtype { MessageType::Text(t) => (t.body.clone(), t.formatted.as_ref().map(|f| f.body.clone())), _ => return, }; // Only respond when the bot is directly addressed (mentioned by name/ID), // when the message is a reply to one of the bot's own messages, or when // ambient mode is enabled for this room. let is_addressed = mentions_bot(&body, formatted_body.as_deref(), &ctx.bot_user_id) || is_reply_to_bot(ev.content.relates_to.as_ref(), &ctx.bot_sent_event_ids).await; let room_id_str = incoming_room_id.to_string(); let is_ambient = ctx.ambient_rooms.lock().unwrap().contains(&room_id_str); // Always let "ambient on" through — it is the one command that must work // even when the bot is not mentioned and ambient mode is off, otherwise // there is no way to re-enable ambient mode without an @-mention. let is_ambient_on = body .to_ascii_lowercase() .contains("ambient on"); if !is_addressed && !is_ambient && !is_ambient_on { slog!( "[matrix-bot] Ignoring unaddressed message from {}", ev.sender ); return; } // Reject commands from unencrypted rooms — E2EE is mandatory. if !room.encryption_state().is_encrypted() { slog!( "[matrix-bot] Rejecting message from {} — room {} is not encrypted. \ Commands are only accepted from encrypted rooms.", ev.sender, incoming_room_id ); return; } // Always verify that the sender has a cross-signing identity. // This check is unconditional and cannot be disabled via config. match check_sender_verified(&client, &ev.sender).await { Ok(true) => { /* sender has a cross-signing identity — proceed */ } Ok(false) => { slog!( "[matrix-bot] Rejecting message from {} — no cross-signing identity \ found in encrypted room {}", ev.sender, incoming_room_id ); return; } Err(e) => { slog!( "[matrix-bot] Error checking verification for {}: {e} — \ rejecting message (fail-closed)", ev.sender ); return; } } // If there is a pending permission prompt for this room, interpret the // message as a yes/no response instead of starting a new chat. { let mut pending = ctx.pending_perm_replies.lock().await; if let Some(tx) = pending.remove(&incoming_room_id) { let decision = if is_permission_approval(&body) { PermissionDecision::Approve } else { PermissionDecision::Deny }; let _ = tx.send(decision); let confirmation = if decision == PermissionDecision::Approve { "Permission approved." } else { "Permission denied." }; let html = markdown_to_html(confirmation); if let Ok(msg_id) = ctx.transport.send_message(&room_id_str, confirmation, &html).await && let Ok(event_id) = msg_id.parse() { ctx.bot_sent_event_ids.lock().await.insert(event_id); } return; } } let sender = ev.sender.to_string(); let user_message = body; slog!("[matrix-bot] Message from {sender}: {user_message}"); // Check for bot-level commands (help, status, ambient, …) before invoking // the LLM. All commands are registered in commands.rs — no special-casing // needed here. let dispatch = super::super::commands::CommandDispatch { bot_name: &ctx.bot_name, bot_user_id: ctx.bot_user_id.as_str(), project_root: &ctx.project_root, agents: &ctx.agents, ambient_rooms: &ctx.ambient_rooms, room_id: &room_id_str, }; if let Some((response, response_html)) = super::super::commands::try_handle_command_with_html(&dispatch, &user_message) { slog!("[matrix-bot] Handled bot command from {sender}"); if let Ok(msg_id) = ctx.transport.send_message(&room_id_str, &response, &response_html).await && let Ok(event_id) = msg_id.parse() { ctx.bot_sent_event_ids.lock().await.insert(event_id); } return; } // Check for the assign command, which requires async agent ops (stop + // start) and cannot be handled by the sync command registry. if let Some(assign_cmd) = super::super::assign::extract_assign_command( &user_message, &ctx.bot_name, ctx.bot_user_id.as_str(), ) { let response = match assign_cmd { super::super::assign::AssignCommand::Assign { story_number, model, } => { slog!( "[matrix-bot] Handling assign command from {sender}: story {story_number} model={model}" ); super::super::assign::handle_assign( &ctx.bot_name, &story_number, &model, &ctx.project_root, &ctx.agents, ) .await } super::super::assign::AssignCommand::BadArgs => { format!( "Usage: `{} assign ` (e.g. `assign 42 opus`)", ctx.bot_name ) } }; let html = markdown_to_html(&response); if let Ok(msg_id) = ctx.transport.send_message(&room_id_str, &response, &html).await && let Ok(event_id) = msg_id.parse() { ctx.bot_sent_event_ids.lock().await.insert(event_id); } return; } // Check for the htop command, which requires async Matrix access (Room) // and cannot be handled by the sync command registry. if let Some(htop_cmd) = super::super::htop::extract_htop_command( &user_message, &ctx.bot_name, ctx.bot_user_id.as_str(), ) { slog!("[matrix-bot] Handling htop command from {sender}: {htop_cmd:?}"); match htop_cmd { super::super::htop::HtopCommand::Stop => { super::super::htop::handle_htop_stop( &*ctx.transport, &room_id_str, &ctx.htop_sessions, ) .await; } super::super::htop::HtopCommand::Start { duration_secs } => { super::super::htop::handle_htop_start( &ctx.transport, &room_id_str, &ctx.htop_sessions, Arc::clone(&ctx.agents), duration_secs, ) .await; } } return; } // Check for the delete command, which requires async agent/worktree ops // and cannot be handled by the sync command registry. if let Some(del_cmd) = super::super::delete::extract_delete_command( &user_message, &ctx.bot_name, ctx.bot_user_id.as_str(), ) { let response = match del_cmd { super::super::delete::DeleteCommand::Delete { story_number } => { slog!( "[matrix-bot] Handling delete command from {sender}: story {story_number}" ); super::super::delete::handle_delete( &ctx.bot_name, &story_number, &ctx.project_root, &ctx.agents, ) .await } super::super::delete::DeleteCommand::BadArgs => { format!("Usage: `{} delete `", ctx.bot_name) } }; let html = markdown_to_html(&response); if let Ok(msg_id) = ctx.transport.send_message(&room_id_str, &response, &html).await && let Ok(event_id) = msg_id.parse() { ctx.bot_sent_event_ids.lock().await.insert(event_id); } return; } // Check for the rmtree command, which requires async agent/worktree ops // and cannot be handled by the sync command registry. if let Some(rmtree_cmd) = super::super::rmtree::extract_rmtree_command( &user_message, &ctx.bot_name, ctx.bot_user_id.as_str(), ) { let response = match rmtree_cmd { super::super::rmtree::RmtreeCommand::Rmtree { story_number } => { slog!( "[matrix-bot] Handling rmtree command from {sender}: story {story_number}" ); super::super::rmtree::handle_rmtree( &ctx.bot_name, &story_number, &ctx.project_root, &ctx.agents, ) .await } super::super::rmtree::RmtreeCommand::BadArgs => { format!("Usage: `{} rmtree `", ctx.bot_name) } }; let html = markdown_to_html(&response); if let Ok(msg_id) = ctx.transport.send_message(&room_id_str, &response, &html).await && let Ok(event_id) = msg_id.parse() { ctx.bot_sent_event_ids.lock().await.insert(event_id); } return; } // Check for the start command, which requires async agent ops and cannot // be handled by the sync command registry. if let Some(start_cmd) = super::super::start::extract_start_command( &user_message, &ctx.bot_name, ctx.bot_user_id.as_str(), ) { let response = match start_cmd { super::super::start::StartCommand::Start { story_number, agent_hint, } => { slog!( "[matrix-bot] Handling start command from {sender}: story {story_number} agent={agent_hint:?}" ); super::super::start::handle_start( &ctx.bot_name, &story_number, agent_hint.as_deref(), &ctx.project_root, &ctx.agents, ) .await } super::super::start::StartCommand::BadArgs => { format!( "Usage: `{} start ` or `{} start opus`", ctx.bot_name, ctx.bot_name ) } }; let html = markdown_to_html(&response); if let Ok(msg_id) = ctx.transport.send_message(&room_id_str, &response, &html).await && let Ok(event_id) = msg_id.parse() { ctx.bot_sent_event_ids.lock().await.insert(event_id); } return; } // Check for the reset command, which requires async access to the shared // conversation history and cannot be handled by the sync command registry. if super::super::reset::extract_reset_command( &user_message, &ctx.bot_name, ctx.bot_user_id.as_str(), ) .is_some() { slog!("[matrix-bot] Handling reset command from {sender}"); let response = super::super::reset::handle_reset( &ctx.bot_name, &incoming_room_id, &ctx.history, &ctx.project_root, ) .await; let html = markdown_to_html(&response); if let Ok(msg_id) = ctx.transport.send_message(&room_id_str, &response, &html).await && let Ok(event_id) = msg_id.parse() { ctx.bot_sent_event_ids.lock().await.insert(event_id); } return; } // Check for the rebuild command, which requires async agent and process ops // and cannot be handled by the sync command registry. if super::super::rebuild::extract_rebuild_command( &user_message, &ctx.bot_name, ctx.bot_user_id.as_str(), ) .is_some() { slog!("[matrix-bot] Handling rebuild command from {sender}"); // Acknowledge immediately — the rebuild may take a while or re-exec. let ack = "Rebuilding server… this may take a moment."; let ack_html = markdown_to_html(ack); if let Ok(msg_id) = ctx.transport.send_message(&room_id_str, ack, &ack_html).await && let Ok(event_id) = msg_id.parse() { ctx.bot_sent_event_ids.lock().await.insert(event_id); } let response = super::super::rebuild::handle_rebuild( &ctx.bot_name, &ctx.project_root, &ctx.agents, ) .await; let html = markdown_to_html(&response); if let Ok(msg_id) = ctx.transport.send_message(&room_id_str, &response, &html).await && let Ok(event_id) = msg_id.parse() { ctx.bot_sent_event_ids.lock().await.insert(event_id); } return; } // Check for the timer command, which requires async file I/O and cannot // be handled by the sync command registry. if let Some(timer_cmd) = crate::chat::timer::extract_timer_command( &user_message, &ctx.bot_name, ctx.bot_user_id.as_str(), ) { slog!("[matrix-bot] Handling timer command from {sender}: {timer_cmd:?}"); let response = crate::chat::timer::handle_timer_command( timer_cmd, &ctx.timer_store, &ctx.project_root, ) .await; let html = markdown_to_html(&response); if let Ok(msg_id) = ctx.transport.send_message(&room_id_str, &response, &html).await && let Ok(event_id) = msg_id.parse() { ctx.bot_sent_event_ids.lock().await.insert(event_id); } return; } // Spawn a separate task so the Matrix sync loop is not blocked while we // wait for the LLM response (which can take several seconds). tokio::spawn(async move { handle_message(room_id_str, incoming_room_id, ctx, sender, user_message).await; }); } pub(super) async fn handle_message( room_id_str: String, room_id: OwnedRoomId, ctx: BotContext, sender: String, user_message: String, ) { // Look up the room's existing Claude Code session ID (if any) so we can // resume the conversation with structured API messages instead of // flattening history into a text prefix. let resume_session_id: Option = { let guard = ctx.history.lock().await; guard .get(&room_id) .and_then(|conv| conv.session_id.clone()) }; // The prompt is just the current message with sender attribution. // Prior conversation context is carried by the Claude Code session. let bot_name = &ctx.bot_name; let prompt = format!( "[Your name is {bot_name}. Refer to yourself as {bot_name}, not Claude.]\n\n{}", format_user_prompt(&sender, &user_message) ); let provider = ClaudeCodeProvider::new(); let (cancel_tx, mut cancel_rx) = watch::channel(false); // Keep the sender alive for the duration of the call. let _cancel_tx = cancel_tx; // Channel for sending complete paragraphs to the Matrix posting task. let (msg_tx, mut msg_rx) = tokio::sync::mpsc::unbounded_channel::(); let msg_tx_for_callback = msg_tx.clone(); // Spawn a task to post messages via the transport as they arrive so we // don't block the LLM stream while waiting for send round-trips. let post_transport = Arc::clone(&ctx.transport); let post_room_id = room_id_str.clone(); let sent_ids = Arc::clone(&ctx.bot_sent_event_ids); let sent_ids_for_post = Arc::clone(&sent_ids); let post_task = tokio::spawn(async move { while let Some(chunk) = msg_rx.recv().await { let html = markdown_to_html(&chunk); if let Ok(msg_id) = post_transport.send_message(&post_room_id, &chunk, &html).await && let Ok(event_id) = msg_id.parse() { sent_ids_for_post.lock().await.insert(event_id); } } }); // Shared state between the sync token callback and the async outer scope. let buffer = Arc::new(std::sync::Mutex::new(String::new())); let buffer_for_callback = Arc::clone(&buffer); let sent_any_chunk = Arc::new(AtomicBool::new(false)); let sent_any_chunk_for_callback = Arc::clone(&sent_any_chunk); let project_root_str = ctx.project_root.to_string_lossy().to_string(); let chat_fut = provider.chat_stream( &prompt, &project_root_str, resume_session_id.as_deref(), None, &mut cancel_rx, move |token| { let mut buf = buffer_for_callback.lock().unwrap(); buf.push_str(token); // Flush complete paragraphs as they arrive. let paragraphs = drain_complete_paragraphs(&mut buf); for chunk in paragraphs { sent_any_chunk_for_callback.store(true, Ordering::Relaxed); let _ = msg_tx_for_callback.send(chunk); } }, |_thinking| {}, // Discard thinking tokens |_activity| {}, // Discard activity signals ); tokio::pin!(chat_fut); // Lock the permission receiver for the duration of this chat session. // Permission requests from the MCP `prompt_permission` tool arrive here. let mut perm_rx_guard = ctx.perm_rx.lock().await; let result = loop { tokio::select! { r = &mut chat_fut => break r, Some(perm_fwd) = perm_rx_guard.recv() => { // Post the permission prompt to the room via the transport. let prompt_msg = format!( "**Permission Request**\n\n\ Tool: `{}`\n```json\n{}\n```\n\n\ Reply **yes** to approve or **no** to deny.", perm_fwd.tool_name, serde_json::to_string_pretty(&perm_fwd.tool_input) .unwrap_or_else(|_| perm_fwd.tool_input.to_string()), ); let html = markdown_to_html(&prompt_msg); if let Ok(msg_id) = ctx.transport.send_message(&room_id_str, &prompt_msg, &html).await && let Ok(event_id) = msg_id.parse() { sent_ids.lock().await.insert(event_id); } // Store the MCP oneshot sender so the event handler can // resolve it when the user replies yes/no. ctx.pending_perm_replies .lock() .await .insert(room_id.clone(), perm_fwd.response_tx); // Spawn a timeout task: auto-deny if the user does not respond. let pending = Arc::clone(&ctx.pending_perm_replies); let timeout_room_id = room_id.clone(); let timeout_transport = Arc::clone(&ctx.transport); let timeout_room_id_str = room_id_str.clone(); let timeout_sent_ids = Arc::clone(&ctx.bot_sent_event_ids); let timeout_secs = ctx.permission_timeout_secs; tokio::spawn(async move { tokio::time::sleep(Duration::from_secs(timeout_secs)).await; if let Some(tx) = pending.lock().await.remove(&timeout_room_id) { let _ = tx.send(PermissionDecision::Deny); let msg = "Permission request timed out — denied (fail-closed)."; let html = markdown_to_html(msg); if let Ok(msg_id) = timeout_transport .send_message(&timeout_room_id_str, msg, &html) .await && let Ok(event_id) = msg_id.parse() { timeout_sent_ids.lock().await.insert(event_id); } } }); } } }; drop(perm_rx_guard); // Flush any remaining text that didn't end with a paragraph boundary. let remaining = buffer.lock().unwrap().trim().to_string(); let did_send_any = sent_any_chunk.load(Ordering::Relaxed); let (assistant_reply, new_session_id) = match result { Ok(ClaudeCodeResult { messages, session_id, }) => { let reply = if !remaining.is_empty() { let _ = msg_tx.send(remaining.clone()); remaining } else if !did_send_any { // Nothing was streamed at all (e.g. only tool calls with no // final text) — fall back to the last assistant message from // the structured result. let last_text = messages .iter() .rev() .find(|m| m.role == crate::llm::types::Role::Assistant && !m.content.is_empty()) .map(|m| m.content.clone()) .unwrap_or_default(); if !last_text.is_empty() { let _ = msg_tx.send(last_text.clone()); } last_text } else { remaining }; slog!("[matrix-bot] session_id from chat_stream: {:?}", session_id); (reply, session_id) } Err(e) => { slog!("[matrix-bot] LLM error: {e}"); let err_msg = if let Some(url) = crate::llm::oauth::extract_login_url_from_error(&e) { format!( "Authentication required. [Click here to log in to Claude]({url})" ) } else { format!("Error processing your request: {e}") }; let _ = msg_tx.send(err_msg.clone()); (err_msg, None) } }; // Drop the sender to signal the posting task that no more messages will // arrive, then wait for all pending Matrix sends to complete. drop(msg_tx); let _ = post_task.await; // Record this exchange in the per-room conversation history and persist // the session ID so the next turn resumes with structured API messages. if !assistant_reply.starts_with("Error processing") { let mut guard = ctx.history.lock().await; let conv = guard.entry(room_id).or_default(); // Store the session ID so the next turn uses --resume. slog!("[matrix-bot] storing session_id: {:?} (was: {:?})", new_session_id, conv.session_id); if new_session_id.is_some() { conv.session_id = new_session_id; } conv.entries.push(ConversationEntry { role: ConversationRole::User, sender: sender.clone(), content: user_message, }); conv.entries.push(ConversationEntry { role: ConversationRole::Assistant, sender: String::new(), content: assistant_reply, }); // Trim to the configured maximum, dropping the oldest entries first. // The session_id is preserved: Claude Code's --resume loads the full // conversation from its own session transcript on disk, so trimming // our local tracking doesn't affect the LLM's context. if conv.entries.len() > ctx.history_size { let excess = conv.entries.len() - ctx.history_size; conv.entries.drain(..excess); } // Persist to disk so history survives server restarts. save_history(&ctx.project_root, &guard); } } // --------------------------------------------------------------------------- // Tests // --------------------------------------------------------------------------- #[cfg(test)] mod tests { use super::*; // -- format_user_prompt ------------------------------------------------- #[test] fn format_user_prompt_includes_sender_and_message() { let prompt = format_user_prompt("@alice:example.com", "Hello!"); assert_eq!(prompt, "@alice:example.com: Hello!"); } #[test] fn format_user_prompt_different_users() { let prompt = format_user_prompt("@bob:example.com", "What's up?"); assert_eq!(prompt, "@bob:example.com: What's up?"); } // -- OAuth login link formatting ---------------------------------------- #[test] fn oauth_error_produces_login_link() { let err = "OAuth session expired or credentials missing. Please log in: http://localhost:3001/oauth/authorize"; let url = crate::llm::oauth::extract_login_url_from_error(err); assert!(url.is_some(), "should extract URL from OAuth error"); let msg = format!("Authentication required. [Click here to log in to Claude]({})", url.unwrap()); assert!(msg.contains("http://localhost:3001/oauth/authorize")); assert!(msg.contains("[Click here to log in to Claude]")); } #[test] fn non_oauth_error_not_formatted_as_link() { let err = "Some unrelated error"; assert!(crate::llm::oauth::extract_login_url_from_error(err).is_none()); } // -- bot_name / system prompt ------------------------------------------- #[test] fn bot_name_system_prompt_format() { let bot_name = "Timmy"; let system_prompt = format!("Your name is {bot_name}. Refer to yourself as {bot_name}, not Claude."); assert_eq!( system_prompt, "Your name is Timmy. Refer to yourself as Timmy, not Claude." ); } }