Stream Matrix bot responses on double-newline paragraph boundaries
Instead of waiting for the full LLM response and sending it as a single message, stream bot responses to Matrix as they are generated. Each paragraph (delimited by a double newline, `\n\n`) is posted as its own message as soon as it is complete, giving users incremental feedback while the model is still generating the rest of the response.
Story: 184_story_stream_bot_responses_to_matrix_on_double_newline_boundaries
This commit is contained in:
@@ -12,8 +12,9 @@ use matrix_sdk::{
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::PathBuf;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
use tokio::sync::watch;
|
use tokio::sync::watch;
|
||||||
|
|
||||||
use super::config::BotConfig;
|
use super::config::BotConfig;
|
||||||
@@ -133,81 +134,105 @@ async fn on_room_message(
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_message(room: Room, ctx: BotContext, user_message: String) {
|
/// Drain all complete paragraphs from `buffer` and return them.
|
||||||
match call_claude_code(&ctx.project_root, &user_message).await {
|
///
|
||||||
Ok(response) => {
|
/// A paragraph boundary is a double newline (`\n\n`). Each drained paragraph
|
||||||
let _ = room
|
/// is trimmed of surrounding whitespace; empty paragraphs are discarded.
|
||||||
.send(RoomMessageEventContent::text_plain(response))
|
/// The buffer is left with only the remaining incomplete text.
|
||||||
.await;
|
pub fn drain_complete_paragraphs(buffer: &mut String) -> Vec<String> {
|
||||||
}
|
let mut paragraphs = Vec::new();
|
||||||
Err(e) => {
|
while let Some(pos) = buffer.find("\n\n") {
|
||||||
slog!("[matrix-bot] LLM error: {e}");
|
let chunk = buffer[..pos].trim().to_string();
|
||||||
let _ = room
|
*buffer = buffer[pos + 2..].to_string();
|
||||||
.send(RoomMessageEventContent::text_plain(format!(
|
if !chunk.is_empty() {
|
||||||
"Error processing your request: {e}"
|
paragraphs.push(chunk);
|
||||||
)))
|
|
||||||
.await;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
paragraphs
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Call Claude Code with the user's message.
|
async fn handle_message(room: Room, ctx: BotContext, user_message: String) {
|
||||||
///
|
|
||||||
/// Uses the same `ClaudeCodeProvider` as the web UI chat. Claude Code manages
|
|
||||||
/// its own tools (including MCP tools) natively — no separate tool schemas or
|
|
||||||
/// HTTP self-calls needed.
|
|
||||||
async fn call_claude_code(
|
|
||||||
project_root: &Path,
|
|
||||||
user_message: &str,
|
|
||||||
) -> Result<String, String> {
|
|
||||||
let provider = ClaudeCodeProvider::new();
|
let provider = ClaudeCodeProvider::new();
|
||||||
|
|
||||||
// Create a cancel channel that never fires — the bot doesn't support
|
|
||||||
// mid-request cancellation (Matrix messages are fire-and-forget).
|
|
||||||
let (cancel_tx, mut cancel_rx) = watch::channel(false);
|
let (cancel_tx, mut cancel_rx) = watch::channel(false);
|
||||||
// Keep the sender alive for the duration of the call.
|
// Keep the sender alive for the duration of the call.
|
||||||
let _cancel_tx = cancel_tx;
|
let _cancel_tx = cancel_tx;
|
||||||
|
|
||||||
// Collect text tokens into the final response. We don't stream to Matrix
|
// Channel for sending complete paragraphs to the Matrix posting task.
|
||||||
// (each message is posted as a single reply), so we just accumulate.
|
let (msg_tx, mut msg_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
||||||
let response_text = Arc::new(std::sync::Mutex::new(String::new()));
|
let msg_tx_for_callback = msg_tx.clone();
|
||||||
let response_clone = Arc::clone(&response_text);
|
|
||||||
|
|
||||||
let ClaudeCodeResult { messages, .. } = provider
|
// Spawn a task to post messages to Matrix as they arrive so we don't
|
||||||
|
// block the LLM stream while waiting for Matrix send round-trips.
|
||||||
|
let post_room = room.clone();
|
||||||
|
let post_task = tokio::spawn(async move {
|
||||||
|
while let Some(chunk) = msg_rx.recv().await {
|
||||||
|
let _ = post_room
|
||||||
|
.send(RoomMessageEventContent::text_plain(chunk))
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Shared state between the sync token callback and the async outer scope.
|
||||||
|
let buffer = Arc::new(std::sync::Mutex::new(String::new()));
|
||||||
|
let buffer_for_callback = Arc::clone(&buffer);
|
||||||
|
let sent_any_chunk = Arc::new(AtomicBool::new(false));
|
||||||
|
let sent_any_chunk_for_callback = Arc::clone(&sent_any_chunk);
|
||||||
|
|
||||||
|
let result = provider
|
||||||
.chat_stream(
|
.chat_stream(
|
||||||
user_message,
|
&user_message,
|
||||||
&project_root.to_string_lossy(),
|
&ctx.project_root.to_string_lossy(),
|
||||||
None, // No session resumption for now (see story 182)
|
None, // No session resumption for now (see story 182)
|
||||||
&mut cancel_rx,
|
&mut cancel_rx,
|
||||||
move |token| {
|
move |token| {
|
||||||
response_clone.lock().unwrap().push_str(token);
|
let mut buf = buffer_for_callback.lock().unwrap();
|
||||||
|
buf.push_str(token);
|
||||||
|
// Flush complete paragraphs as they arrive.
|
||||||
|
let paragraphs = drain_complete_paragraphs(&mut buf);
|
||||||
|
for chunk in paragraphs {
|
||||||
|
sent_any_chunk_for_callback.store(true, Ordering::Relaxed);
|
||||||
|
let _ = msg_tx_for_callback.send(chunk);
|
||||||
|
}
|
||||||
},
|
},
|
||||||
|_thinking| {}, // Discard thinking tokens
|
|_thinking| {}, // Discard thinking tokens
|
||||||
|_activity| {}, // Discard activity signals
|
|_activity| {}, // Discard activity signals
|
||||||
)
|
)
|
||||||
.await?;
|
.await;
|
||||||
|
|
||||||
// Prefer the accumulated streamed text. If nothing was streamed (e.g.
|
// Flush any remaining text that didn't end with a paragraph boundary.
|
||||||
// Claude Code returned only tool calls with no final text), fall back to
|
let remaining = buffer.lock().unwrap().trim().to_string();
|
||||||
// extracting the last assistant message from the structured result.
|
let did_send_any = sent_any_chunk.load(Ordering::Relaxed);
|
||||||
let streamed = response_text.lock().unwrap().clone();
|
|
||||||
if !streamed.is_empty() {
|
match result {
|
||||||
return Ok(streamed);
|
Ok(ClaudeCodeResult { messages, .. }) => {
|
||||||
|
if !remaining.is_empty() {
|
||||||
|
let _ = msg_tx.send(remaining);
|
||||||
|
} else if !did_send_any {
|
||||||
|
// Nothing was streamed at all (e.g. only tool calls with no
|
||||||
|
// final text) — fall back to the last assistant message from
|
||||||
|
// the structured result.
|
||||||
|
let last_text = messages
|
||||||
|
.iter()
|
||||||
|
.rev()
|
||||||
|
.find(|m| m.role == crate::llm::types::Role::Assistant && !m.content.is_empty())
|
||||||
|
.map(|m| m.content.clone())
|
||||||
|
.unwrap_or_default();
|
||||||
|
if !last_text.is_empty() {
|
||||||
|
let _ = msg_tx.send(last_text);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
slog!("[matrix-bot] LLM error: {e}");
|
||||||
|
// Discard any partial buffered text and send the error as one message.
|
||||||
|
let _ = msg_tx.send(format!("Error processing your request: {e}"));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fallback: find the last assistant message
|
// Drop the sender to signal the posting task that no more messages will
|
||||||
let last_text = messages
|
// arrive, then wait for all pending Matrix sends to complete.
|
||||||
.iter()
|
drop(msg_tx);
|
||||||
.rev()
|
let _ = post_task.await;
|
||||||
.find(|m| m.role == crate::llm::types::Role::Assistant && !m.content.is_empty())
|
|
||||||
.map(|m| m.content.clone())
|
|
||||||
.unwrap_or_default();
|
|
||||||
|
|
||||||
if last_text.is_empty() {
|
|
||||||
Err("Claude Code returned no response text".to_string())
|
|
||||||
} else {
|
|
||||||
Ok(last_text)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
@@ -221,15 +246,75 @@ mod tests {
|
|||||||
assert_clone::<BotContext>();
|
assert_clone::<BotContext>();
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[test]
|
||||||
async fn call_claude_code_returns_error_when_claude_not_installed() {
|
fn drain_complete_paragraphs_no_boundary_returns_empty() {
|
||||||
// When `claude` binary is not in PATH (or returns an error), the
|
let mut buf = "Hello World".to_string();
|
||||||
// provider should return an Err rather than panic.
|
let paras = drain_complete_paragraphs(&mut buf);
|
||||||
let fake_root = PathBuf::from("/tmp/nonexistent_project_root");
|
assert!(paras.is_empty());
|
||||||
let result = call_claude_code(&fake_root, "hello").await;
|
assert_eq!(buf, "Hello World");
|
||||||
// We expect either an error (claude not found) or a valid response
|
}
|
||||||
// if claude happens to be installed. Both are acceptable — the key
|
|
||||||
// property is that it doesn't panic.
|
#[test]
|
||||||
let _ = result;
|
fn drain_complete_paragraphs_single_boundary() {
|
||||||
|
let mut buf = "Paragraph one.\n\nParagraph two.".to_string();
|
||||||
|
let paras = drain_complete_paragraphs(&mut buf);
|
||||||
|
assert_eq!(paras, vec!["Paragraph one."]);
|
||||||
|
assert_eq!(buf, "Paragraph two.");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn drain_complete_paragraphs_multiple_boundaries() {
|
||||||
|
let mut buf = "A\n\nB\n\nC".to_string();
|
||||||
|
let paras = drain_complete_paragraphs(&mut buf);
|
||||||
|
assert_eq!(paras, vec!["A", "B"]);
|
||||||
|
assert_eq!(buf, "C");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn drain_complete_paragraphs_trailing_boundary() {
|
||||||
|
let mut buf = "A\n\nB\n\n".to_string();
|
||||||
|
let paras = drain_complete_paragraphs(&mut buf);
|
||||||
|
assert_eq!(paras, vec!["A", "B"]);
|
||||||
|
assert_eq!(buf, "");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn drain_complete_paragraphs_empty_input() {
|
||||||
|
let mut buf = String::new();
|
||||||
|
let paras = drain_complete_paragraphs(&mut buf);
|
||||||
|
assert!(paras.is_empty());
|
||||||
|
assert_eq!(buf, "");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn drain_complete_paragraphs_skips_empty_chunks() {
|
||||||
|
// Consecutive double-newlines produce no empty paragraphs.
|
||||||
|
let mut buf = "\n\n\n\nHello".to_string();
|
||||||
|
let paras = drain_complete_paragraphs(&mut buf);
|
||||||
|
assert!(paras.is_empty());
|
||||||
|
assert_eq!(buf, "Hello");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn drain_complete_paragraphs_trims_whitespace() {
|
||||||
|
let mut buf = " Hello \n\n World ".to_string();
|
||||||
|
let paras = drain_complete_paragraphs(&mut buf);
|
||||||
|
assert_eq!(paras, vec!["Hello"]);
|
||||||
|
assert_eq!(buf, " World ");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn drain_complete_paragraphs_incremental_simulation() {
|
||||||
|
// Simulate tokens arriving one character at a time.
|
||||||
|
let mut buf = String::new();
|
||||||
|
let mut all_paragraphs = Vec::new();
|
||||||
|
|
||||||
|
for ch in "First para.\n\nSecond para.\n\nThird.".chars() {
|
||||||
|
buf.push(ch);
|
||||||
|
all_paragraphs.extend(drain_complete_paragraphs(&mut buf));
|
||||||
|
}
|
||||||
|
|
||||||
|
assert_eq!(all_paragraphs, vec!["First para.", "Second para."]);
|
||||||
|
assert_eq!(buf, "Third.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user