story-kit: merge 182_story_matrix_bot_conversation_context_and_multi_room

This commit is contained in:
Dave
2026-02-25 15:25:13 +00:00
parent 01ca1a20d7
commit 4b4d221d6c
4 changed files with 492 additions and 72 deletions

View File

@@ -12,23 +12,66 @@ use matrix_sdk::{
},
},
};
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::sync::watch;
use tokio::sync::Mutex as TokioMutex;
use super::config::BotConfig;
// ---------------------------------------------------------------------------
// Conversation history types
// ---------------------------------------------------------------------------
/// Role of a participant in the conversation history.
#[derive(Clone, Debug, PartialEq)]
pub enum ConversationRole {
/// A message sent by a Matrix room participant.
User,
/// A response generated by the bot / LLM.
Assistant,
}
/// A single turn in the per-room conversation history.
#[derive(Clone, Debug)]
pub struct ConversationEntry {
pub role: ConversationRole,
/// Matrix user ID (e.g. `@alice:example.com`). Empty for assistant turns.
pub sender: String,
pub content: String,
}
/// Per-room conversation history, keyed by room ID.
///
/// Wrapped in `Arc<TokioMutex<…>>` so it can be shared across concurrent
/// event-handler tasks without blocking the sync loop.
pub type ConversationHistory = Arc<TokioMutex<HashMap<OwnedRoomId, Vec<ConversationEntry>>>>;
// ---------------------------------------------------------------------------
// Bot context
// ---------------------------------------------------------------------------
/// Shared context injected into Matrix event handlers.
#[derive(Clone)]
pub struct BotContext {
pub bot_user_id: OwnedUserId,
pub target_room_id: OwnedRoomId,
/// All room IDs the bot listens in.
pub target_room_ids: Vec<OwnedRoomId>,
pub project_root: PathBuf,
pub allowed_users: Vec<String>,
/// Shared, per-room rolling conversation history.
pub history: ConversationHistory,
/// Maximum number of entries to keep per room before trimming the oldest.
pub history_size: usize,
}
/// Connect to the Matrix homeserver, join the configured room, and start
// ---------------------------------------------------------------------------
// Bot entry point
// ---------------------------------------------------------------------------
/// Connect to the Matrix homeserver, join all configured rooms, and start
/// listening for messages. Runs the full Matrix sync loop — call from a
/// `tokio::spawn` task so it doesn't block the main thread.
pub async fn run_bot(config: BotConfig, project_root: PathBuf) -> Result<(), String> {
@@ -55,25 +98,6 @@ pub async fn run_bot(config: BotConfig, project_root: PathBuf) -> Result<(), Str
slog!("[matrix-bot] Logged in as {bot_user_id}");
// Parse and join the configured room
let target_room_id: OwnedRoomId = config
.room_id
.parse()
.map_err(|_| format!("Invalid room ID '{}'", config.room_id))?;
// Try to join the room with a timeout. Conduit sometimes hangs or
// returns errors on join if the bot is already a member.
match tokio::time::timeout(
std::time::Duration::from_secs(10),
client.join_room_by_id(&target_room_id),
)
.await
{
Ok(Ok(_)) => slog!("[matrix-bot] Joined room {target_room_id}"),
Ok(Err(e)) => slog!("[matrix-bot] Join room error (may already be a member): {e}"),
Err(_) => slog!("[matrix-bot] Join room timed out (may already be a member)"),
}
if config.allowed_users.is_empty() {
return Err(
"allowed_users is empty in bot.toml — refusing to start (fail-closed). \
@@ -87,11 +111,48 @@ pub async fn run_bot(config: BotConfig, project_root: PathBuf) -> Result<(), Str
config.allowed_users
);
// Parse and join all configured rooms.
let mut target_room_ids: Vec<OwnedRoomId> = Vec::new();
for room_id_str in config.effective_room_ids() {
let room_id: OwnedRoomId = room_id_str
.parse()
.map_err(|_| format!("Invalid room ID '{room_id_str}'"))?;
// Try to join with a timeout. Conduit sometimes hangs or returns
// errors on join if the bot is already a member.
match tokio::time::timeout(
std::time::Duration::from_secs(10),
client.join_room_by_id(&room_id),
)
.await
{
Ok(Ok(_)) => slog!("[matrix-bot] Joined room {room_id}"),
Ok(Err(e)) => {
slog!("[matrix-bot] Join room error (may already be a member): {e}")
}
Err(_) => slog!("[matrix-bot] Join room timed out (may already be a member)"),
}
target_room_ids.push(room_id);
}
if target_room_ids.is_empty() {
return Err("No valid room IDs configured — cannot start".to_string());
}
slog!(
"[matrix-bot] Listening in {} room(s): {:?}",
target_room_ids.len(),
target_room_ids
);
let ctx = BotContext {
bot_user_id,
target_room_id,
target_room_ids,
project_root,
allowed_users: config.allowed_users,
history: Arc::new(TokioMutex::new(HashMap::new())),
history_size: config.history_size,
};
// Register event handler and inject shared context
@@ -109,6 +170,10 @@ pub async fn run_bot(config: BotConfig, project_root: PathBuf) -> Result<(), Str
Ok(())
}
// ---------------------------------------------------------------------------
// Event handler
// ---------------------------------------------------------------------------
/// Matrix event handler for room messages. Each invocation spawns an
/// independent task so the sync loop is not blocked by LLM calls.
async fn on_room_message(
@@ -116,25 +181,30 @@ async fn on_room_message(
room: Room,
Ctx(ctx): Ctx<BotContext>,
) {
let incoming_room_id = room.room_id().to_owned();
slog!(
"[matrix-bot] Event received: room={} sender={} target={}",
room.room_id(),
"[matrix-bot] Event received: room={} sender={}",
incoming_room_id,
ev.sender,
ctx.target_room_id
);
// Only handle messages in the configured room
if room.room_id() != &*ctx.target_room_id {
slog!("[matrix-bot] Ignoring message from wrong room");
// Only handle messages from rooms we are configured to listen in.
if !ctx
.target_room_ids
.iter()
.any(|r| r == &incoming_room_id)
{
slog!("[matrix-bot] Ignoring message from unconfigured room {incoming_room_id}");
return;
}
// Ignore the bot's own messages to prevent echo loops
// Ignore the bot's own messages to prevent echo loops.
if ev.sender == ctx.bot_user_id {
return;
}
// Only respond to users on the allowlist (fail-closed)
// Only respond to users on the allowlist (fail-closed).
if !ctx.allowed_users.iter().any(|u| u == ev.sender.as_str()) {
slog!(
"[matrix-bot] Ignoring message from unauthorised user: {}",
@@ -143,39 +213,72 @@ async fn on_room_message(
return;
}
// Only handle plain text messages
// Only handle plain text messages.
let MessageType::Text(text_content) = ev.content.msgtype else {
return;
};
let sender = ev.sender.to_string();
let user_message = text_content.body.clone();
slog!("[matrix-bot] Message from {}: {user_message}", ev.sender);
slog!("[matrix-bot] Message from {sender}: {user_message}");
// Spawn a separate task so the Matrix sync loop is not blocked while we
// wait for the LLM response (which can take several seconds).
tokio::spawn(async move {
handle_message(room, ctx, user_message).await;
handle_message(room, incoming_room_id, ctx, sender, user_message).await;
});
}
/// Drain all complete paragraphs from `buffer` and return them.
///
/// A paragraph boundary is a double newline (`\n\n`). Each drained paragraph
/// is trimmed of surrounding whitespace; empty paragraphs are discarded.
/// The buffer is left with only the remaining incomplete text.
pub fn drain_complete_paragraphs(buffer: &mut String) -> Vec<String> {
let mut paragraphs = Vec::new();
while let Some(pos) = buffer.find("\n\n") {
let chunk = buffer[..pos].trim().to_string();
*buffer = buffer[pos + 2..].to_string();
if !chunk.is_empty() {
paragraphs.push(chunk);
// ---------------------------------------------------------------------------
// Message handler
// ---------------------------------------------------------------------------
/// Build a context string from the room's conversation history to prepend to
/// the user's current message. Returns an empty string when history is empty.
fn build_context_prefix(
history: &[ConversationEntry],
current_sender: &str,
current_message: &str,
) -> String {
if history.is_empty() {
return format!("{current_sender}: {current_message}");
}
let mut out = String::from("[Conversation history for this room]\n");
for entry in history {
match entry.role {
ConversationRole::User => {
out.push_str(&format!("User ({}): {}\n", entry.sender, entry.content));
}
ConversationRole::Assistant => {
out.push_str(&format!("Assistant: {}\n", entry.content));
}
}
}
paragraphs
out.push('\n');
out.push_str(&format!(
"Current message from {current_sender}: {current_message}"
));
out
}
async fn handle_message(room: Room, ctx: BotContext, user_message: String) {
async fn handle_message(
room: Room,
room_id: OwnedRoomId,
ctx: BotContext,
sender: String,
user_message: String,
) {
// Read current history for this room before calling the LLM.
let history_snapshot: Vec<ConversationEntry> = {
let guard = ctx.history.lock().await;
guard.get(&room_id).cloned().unwrap_or_default()
};
// Build the prompt with conversation context.
let prompt_with_context =
build_context_prefix(&history_snapshot, &sender, &user_message);
let provider = ClaudeCodeProvider::new();
let (cancel_tx, mut cancel_rx) = watch::channel(false);
// Keep the sender alive for the duration of the call.
@@ -204,9 +307,9 @@ async fn handle_message(room: Room, ctx: BotContext, user_message: String) {
let result = provider
.chat_stream(
&user_message,
&prompt_with_context,
&ctx.project_root.to_string_lossy(),
None, // No session resumption for now (see story 182)
None, // Each Matrix conversation turn is independent at the Claude Code session level.
&mut cancel_rx,
move |token| {
let mut buf = buffer_for_callback.lock().unwrap();
@@ -227,10 +330,11 @@ async fn handle_message(room: Room, ctx: BotContext, user_message: String) {
let remaining = buffer.lock().unwrap().trim().to_string();
let did_send_any = sent_any_chunk.load(Ordering::Relaxed);
match result {
let assistant_reply = match result {
Ok(ClaudeCodeResult { messages, .. }) => {
if !remaining.is_empty() {
let _ = msg_tx.send(remaining);
let _ = msg_tx.send(remaining.clone());
remaining
} else if !did_send_any {
// Nothing was streamed at all (e.g. only tool calls with no
// final text) — fall back to the last assistant message from
@@ -242,23 +346,73 @@ async fn handle_message(room: Room, ctx: BotContext, user_message: String) {
.map(|m| m.content.clone())
.unwrap_or_default();
if !last_text.is_empty() {
let _ = msg_tx.send(last_text);
let _ = msg_tx.send(last_text.clone());
}
last_text
} else {
remaining
}
}
Err(e) => {
slog!("[matrix-bot] LLM error: {e}");
// Discard any partial buffered text and send the error as one message.
let _ = msg_tx.send(format!("Error processing your request: {e}"));
let err_msg = format!("Error processing your request: {e}");
let _ = msg_tx.send(err_msg.clone());
err_msg
}
}
};
// Drop the sender to signal the posting task that no more messages will
// arrive, then wait for all pending Matrix sends to complete.
drop(msg_tx);
let _ = post_task.await;
// Record this exchange in the per-room conversation history.
if !assistant_reply.starts_with("Error processing") {
let mut guard = ctx.history.lock().await;
let entries = guard.entry(room_id).or_default();
entries.push(ConversationEntry {
role: ConversationRole::User,
sender: sender.clone(),
content: user_message,
});
entries.push(ConversationEntry {
role: ConversationRole::Assistant,
sender: String::new(),
content: assistant_reply,
});
// Trim to the configured maximum, dropping the oldest entries first.
if entries.len() > ctx.history_size {
let excess = entries.len() - ctx.history_size;
entries.drain(..excess);
}
}
}
// ---------------------------------------------------------------------------
// Paragraph buffering helper
// ---------------------------------------------------------------------------
/// Drain all complete paragraphs from `buffer` and return them.
///
/// A paragraph boundary is a double newline (`\n\n`). Each drained paragraph
/// is trimmed of surrounding whitespace; empty paragraphs are discarded.
/// The buffer is left with only the remaining incomplete text.
pub fn drain_complete_paragraphs(buffer: &mut String) -> Vec<String> {
let mut paragraphs = Vec::new();
while let Some(pos) = buffer.find("\n\n") {
let chunk = buffer[..pos].trim().to_string();
*buffer = buffer[pos + 2..].to_string();
if !chunk.is_empty() {
paragraphs.push(chunk);
}
}
paragraphs
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
#[cfg(test)]
mod tests {
use super::*;
@@ -270,6 +424,8 @@ mod tests {
assert_clone::<BotContext>();
}
// -- drain_complete_paragraphs ------------------------------------------
#[test]
fn drain_complete_paragraphs_no_boundary_returns_empty() {
let mut buf = "Hello World".to_string();
@@ -341,4 +497,137 @@ mod tests {
assert_eq!(all_paragraphs, vec!["First para.", "Second para."]);
assert_eq!(buf, "Third.");
}
// -- build_context_prefix -----------------------------------------------
#[test]
fn build_context_prefix_empty_history() {
let prefix = build_context_prefix(&[], "@alice:example.com", "Hello!");
assert_eq!(prefix, "@alice:example.com: Hello!");
}
#[test]
fn build_context_prefix_includes_history_entries() {
let history = vec![
ConversationEntry {
role: ConversationRole::User,
sender: "@alice:example.com".to_string(),
content: "What is story 42?".to_string(),
},
ConversationEntry {
role: ConversationRole::Assistant,
sender: String::new(),
content: "Story 42 is about…".to_string(),
},
];
let prefix = build_context_prefix(&history, "@bob:example.com", "Tell me more.");
assert!(prefix.contains("[Conversation history for this room]"));
assert!(prefix.contains("User (@alice:example.com): What is story 42?"));
assert!(prefix.contains("Assistant: Story 42 is about…"));
assert!(prefix.contains("Current message from @bob:example.com: Tell me more."));
}
#[test]
fn build_context_prefix_attributes_multiple_users() {
let history = vec![
ConversationEntry {
role: ConversationRole::User,
sender: "@alice:example.com".to_string(),
content: "First question".to_string(),
},
ConversationEntry {
role: ConversationRole::Assistant,
sender: String::new(),
content: "First answer".to_string(),
},
ConversationEntry {
role: ConversationRole::User,
sender: "@bob:example.com".to_string(),
content: "Follow-up".to_string(),
},
ConversationEntry {
role: ConversationRole::Assistant,
sender: String::new(),
content: "Second answer".to_string(),
},
];
let prefix = build_context_prefix(&history, "@alice:example.com", "Another question");
assert!(prefix.contains("User (@alice:example.com): First question"));
assert!(prefix.contains("User (@bob:example.com): Follow-up"));
}
// -- conversation history trimming --------------------------------------
#[tokio::test]
async fn history_trims_to_configured_size() {
let history: ConversationHistory =
Arc::new(TokioMutex::new(HashMap::new()));
let room_id: OwnedRoomId = "!test:example.com".parse().unwrap();
let history_size = 4usize; // keep at most 4 entries
// Add 6 entries (3 user + 3 assistant turns).
{
let mut guard = history.lock().await;
let entries = guard.entry(room_id.clone()).or_default();
for i in 0..3usize {
entries.push(ConversationEntry {
role: ConversationRole::User,
sender: "@user:example.com".to_string(),
content: format!("msg {i}"),
});
entries.push(ConversationEntry {
role: ConversationRole::Assistant,
sender: String::new(),
content: format!("reply {i}"),
});
if entries.len() > history_size {
let excess = entries.len() - history_size;
entries.drain(..excess);
}
}
}
let guard = history.lock().await;
let entries = guard.get(&room_id).unwrap();
assert_eq!(
entries.len(),
history_size,
"history must be trimmed to history_size"
);
// The oldest entries (msg 0 / reply 0) should have been dropped.
assert!(
entries.iter().all(|e| !e.content.contains("msg 0")),
"oldest entries must be dropped"
);
}
#[tokio::test]
async fn each_room_has_independent_history() {
let history: ConversationHistory =
Arc::new(TokioMutex::new(HashMap::new()));
let room_a: OwnedRoomId = "!room_a:example.com".parse().unwrap();
let room_b: OwnedRoomId = "!room_b:example.com".parse().unwrap();
{
let mut guard = history.lock().await;
guard.entry(room_a.clone()).or_default().push(ConversationEntry {
role: ConversationRole::User,
sender: "@alice:example.com".to_string(),
content: "Room A message".to_string(),
});
guard.entry(room_b.clone()).or_default().push(ConversationEntry {
role: ConversationRole::User,
sender: "@bob:example.com".to_string(),
content: "Room B message".to_string(),
});
}
let guard = history.lock().await;
let entries_a = guard.get(&room_a).unwrap();
let entries_b = guard.get(&room_b).unwrap();
assert_eq!(entries_a.len(), 1);
assert_eq!(entries_b.len(), 1);
assert_eq!(entries_a[0].content, "Room A message");
assert_eq!(entries_b[0].content, "Room B message");
}
}