/// Drain all complete paragraphs from `buffer` and return them.
///
/// A paragraph boundary is a double newline (`\n\n`). Boundaries are matched
/// left to right without overlapping, so a run of three newlines counts as
/// one boundary and leaves a single leading `\n` in the buffer.
///
/// Each drained paragraph is trimmed of surrounding whitespace; paragraphs
/// that are empty after trimming are discarded. On return `buffer` holds only
/// the text after the last boundary, untrimmed, so later tokens can be
/// appended to it unchanged.
pub fn drain_complete_paragraphs(buffer: &mut String) -> Vec<String> {
    const PARAGRAPH_BREAK: &str = "\n\n";

    let mut paragraphs = Vec::new();
    // Byte offset just past the last boundary seen so far.
    let mut consumed = 0;

    for (pos, sep) in buffer.match_indices(PARAGRAPH_BREAK) {
        let chunk = buffer[consumed..pos].trim();
        if !chunk.is_empty() {
            paragraphs.push(chunk.to_string());
        }
        consumed = pos + sep.len();
    }

    // Remove everything up to the last boundary in one in-place shift rather
    // than re-allocating the remainder once per paragraph.
    buffer.drain(..consumed);
    paragraphs
}
-async fn call_claude_code( - project_root: &Path, - user_message: &str, -) -> Result { +async fn handle_message(room: Room, ctx: BotContext, user_message: String) { let provider = ClaudeCodeProvider::new(); - - // Create a cancel channel that never fires — the bot doesn't support - // mid-request cancellation (Matrix messages are fire-and-forget). let (cancel_tx, mut cancel_rx) = watch::channel(false); // Keep the sender alive for the duration of the call. let _cancel_tx = cancel_tx; - // Collect text tokens into the final response. We don't stream to Matrix - // (each message is posted as a single reply), so we just accumulate. - let response_text = Arc::new(std::sync::Mutex::new(String::new())); - let response_clone = Arc::clone(&response_text); + // Channel for sending complete paragraphs to the Matrix posting task. + let (msg_tx, mut msg_rx) = tokio::sync::mpsc::unbounded_channel::(); + let msg_tx_for_callback = msg_tx.clone(); - let ClaudeCodeResult { messages, .. } = provider + // Spawn a task to post messages to Matrix as they arrive so we don't + // block the LLM stream while waiting for Matrix send round-trips. + let post_room = room.clone(); + let post_task = tokio::spawn(async move { + while let Some(chunk) = msg_rx.recv().await { + let _ = post_room + .send(RoomMessageEventContent::text_plain(chunk)) + .await; + } + }); + + // Shared state between the sync token callback and the async outer scope. 
+ let buffer = Arc::new(std::sync::Mutex::new(String::new())); + let buffer_for_callback = Arc::clone(&buffer); + let sent_any_chunk = Arc::new(AtomicBool::new(false)); + let sent_any_chunk_for_callback = Arc::clone(&sent_any_chunk); + + let result = provider .chat_stream( - user_message, - &project_root.to_string_lossy(), + &user_message, + &ctx.project_root.to_string_lossy(), None, // No session resumption for now (see story 182) &mut cancel_rx, move |token| { - response_clone.lock().unwrap().push_str(token); + let mut buf = buffer_for_callback.lock().unwrap(); + buf.push_str(token); + // Flush complete paragraphs as they arrive. + let paragraphs = drain_complete_paragraphs(&mut buf); + for chunk in paragraphs { + sent_any_chunk_for_callback.store(true, Ordering::Relaxed); + let _ = msg_tx_for_callback.send(chunk); + } }, |_thinking| {}, // Discard thinking tokens |_activity| {}, // Discard activity signals ) - .await?; + .await; - // Prefer the accumulated streamed text. If nothing was streamed (e.g. - // Claude Code returned only tool calls with no final text), fall back to - // extracting the last assistant message from the structured result. - let streamed = response_text.lock().unwrap().clone(); - if !streamed.is_empty() { - return Ok(streamed); + // Flush any remaining text that didn't end with a paragraph boundary. + let remaining = buffer.lock().unwrap().trim().to_string(); + let did_send_any = sent_any_chunk.load(Ordering::Relaxed); + + match result { + Ok(ClaudeCodeResult { messages, .. }) => { + if !remaining.is_empty() { + let _ = msg_tx.send(remaining); + } else if !did_send_any { + // Nothing was streamed at all (e.g. only tool calls with no + // final text) — fall back to the last assistant message from + // the structured result. 
+ let last_text = messages + .iter() + .rev() + .find(|m| m.role == crate::llm::types::Role::Assistant && !m.content.is_empty()) + .map(|m| m.content.clone()) + .unwrap_or_default(); + if !last_text.is_empty() { + let _ = msg_tx.send(last_text); + } + } + } + Err(e) => { + slog!("[matrix-bot] LLM error: {e}"); + // Discard any partial buffered text and send the error as one message. + let _ = msg_tx.send(format!("Error processing your request: {e}")); + } } - // Fallback: find the last assistant message - let last_text = messages - .iter() - .rev() - .find(|m| m.role == crate::llm::types::Role::Assistant && !m.content.is_empty()) - .map(|m| m.content.clone()) - .unwrap_or_default(); - - if last_text.is_empty() { - Err("Claude Code returned no response text".to_string()) - } else { - Ok(last_text) - } + // Drop the sender to signal the posting task that no more messages will + // arrive, then wait for all pending Matrix sends to complete. + drop(msg_tx); + let _ = post_task.await; } #[cfg(test)] @@ -221,15 +246,75 @@ mod tests { assert_clone::(); } - #[tokio::test] - async fn call_claude_code_returns_error_when_claude_not_installed() { - // When `claude` binary is not in PATH (or returns an error), the - // provider should return an Err rather than panic. - let fake_root = PathBuf::from("/tmp/nonexistent_project_root"); - let result = call_claude_code(&fake_root, "hello").await; - // We expect either an error (claude not found) or a valid response - // if claude happens to be installed. Both are acceptable — the key - // property is that it doesn't panic. 
/// Text with no blank line contains no complete paragraph.
#[test]
fn drain_complete_paragraphs_no_boundary_returns_empty() {
    let mut pending = String::from("Hello World");
    let drained = drain_complete_paragraphs(&mut pending);
    assert!(drained.is_empty());
    assert_eq!(pending, "Hello World");
}

/// One boundary yields the paragraph before it and keeps the text after it.
#[test]
fn drain_complete_paragraphs_single_boundary() {
    let mut pending = String::from("Paragraph one.\n\nParagraph two.");
    let drained = drain_complete_paragraphs(&mut pending);
    assert_eq!(drained, vec!["Paragraph one."]);
    assert_eq!(pending, "Paragraph two.");
}

/// Every paragraph followed by a boundary is drained in order.
#[test]
fn drain_complete_paragraphs_multiple_boundaries() {
    let mut pending = String::from("A\n\nB\n\nC");
    let drained = drain_complete_paragraphs(&mut pending);
    assert_eq!(drained, vec!["A", "B"]);
    assert_eq!(pending, "C");
}

/// A boundary at the very end leaves an empty buffer behind.
#[test]
fn drain_complete_paragraphs_trailing_boundary() {
    let mut pending = String::from("A\n\nB\n\n");
    let drained = drain_complete_paragraphs(&mut pending);
    assert_eq!(drained, vec!["A", "B"]);
    assert_eq!(pending, "");
}

/// An empty buffer produces nothing and stays empty.
#[test]
fn drain_complete_paragraphs_empty_input() {
    let mut pending = String::new();
    let drained = drain_complete_paragraphs(&mut pending);
    assert!(drained.is_empty());
    assert_eq!(pending, "");
}

/// Consecutive double-newlines produce no empty paragraphs.
#[test]
fn drain_complete_paragraphs_skips_empty_chunks() {
    let mut pending = String::from("\n\n\n\nHello");
    let drained = drain_complete_paragraphs(&mut pending);
    assert!(drained.is_empty());
    assert_eq!(pending, "Hello");
}

/// Drained paragraphs are trimmed; the leftover text is kept verbatim.
#[test]
fn drain_complete_paragraphs_trims_whitespace() {
    let mut pending = String::from("  Hello  \n\n  World  ");
    let drained = drain_complete_paragraphs(&mut pending);
    assert_eq!(drained, vec!["Hello"]);
    assert_eq!(pending, "  World  ");
}

/// Simulate tokens arriving one character at a time.
#[test]
fn drain_complete_paragraphs_incremental_simulation() {
    let stream = "First para.\n\nSecond para.\n\nThird.";
    let mut pending = String::new();

    let collected: Vec<String> = stream
        .chars()
        .flat_map(|ch| {
            pending.push(ch);
            drain_complete_paragraphs(&mut pending)
        })
        .collect();

    assert_eq!(collected, vec!["First para.", "Second para."]);
    assert_eq!(pending, "Third.");
}