2026-02-25 12:42:11 +00:00
|
|
|
use crate::llm::providers::claude_code::{ClaudeCodeProvider, ClaudeCodeResult};
|
|
|
|
|
use crate::slog;
|
|
|
|
|
use matrix_sdk::{
|
|
|
|
|
Client,
|
|
|
|
|
config::SyncSettings,
|
|
|
|
|
event_handler::Ctx,
|
|
|
|
|
room::Room,
|
|
|
|
|
ruma::{
|
|
|
|
|
OwnedRoomId, OwnedUserId,
|
|
|
|
|
events::room::message::{
|
|
|
|
|
MessageType, OriginalSyncRoomMessageEvent, RoomMessageEventContent,
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
};
|
2026-02-25 14:17:55 +00:00
|
|
|
use std::path::PathBuf;
|
2026-02-25 12:42:11 +00:00
|
|
|
use std::sync::Arc;
|
2026-02-25 14:17:55 +00:00
|
|
|
use std::sync::atomic::{AtomicBool, Ordering};
|
2026-02-25 12:42:11 +00:00
|
|
|
use tokio::sync::watch;
|
|
|
|
|
|
|
|
|
|
use super::config::BotConfig;
|
|
|
|
|
|
|
|
|
|
/// Shared context injected into Matrix event handlers.
|
|
|
|
|
#[derive(Clone)]
|
|
|
|
|
pub struct BotContext {
|
|
|
|
|
pub bot_user_id: OwnedUserId,
|
|
|
|
|
pub target_room_id: OwnedRoomId,
|
|
|
|
|
pub project_root: PathBuf,
|
2026-02-25 14:59:20 +00:00
|
|
|
pub allowed_users: Vec<String>,
|
2026-02-25 12:42:11 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Connect to the Matrix homeserver, join the configured room, and start
|
|
|
|
|
/// listening for messages. Runs the full Matrix sync loop — call from a
|
|
|
|
|
/// `tokio::spawn` task so it doesn't block the main thread.
|
|
|
|
|
pub async fn run_bot(config: BotConfig, project_root: PathBuf) -> Result<(), String> {
|
2026-02-25 13:46:20 +00:00
|
|
|
let store_path = project_root.join(".story_kit").join("matrix_store");
|
2026-02-25 12:42:11 +00:00
|
|
|
let client = Client::builder()
|
|
|
|
|
.homeserver_url(&config.homeserver)
|
2026-02-25 13:46:20 +00:00
|
|
|
.sqlite_store(&store_path, None)
|
2026-02-25 12:42:11 +00:00
|
|
|
.build()
|
|
|
|
|
.await
|
|
|
|
|
.map_err(|e| format!("Failed to build Matrix client: {e}"))?;
|
|
|
|
|
|
|
|
|
|
// Login
|
|
|
|
|
client
|
|
|
|
|
.matrix_auth()
|
|
|
|
|
.login_username(&config.username, &config.password)
|
|
|
|
|
.initial_device_display_name("Story Kit Bot")
|
|
|
|
|
.await
|
|
|
|
|
.map_err(|e| format!("Matrix login failed: {e}"))?;
|
|
|
|
|
|
|
|
|
|
let bot_user_id = client
|
|
|
|
|
.user_id()
|
|
|
|
|
.ok_or_else(|| "No user ID after login".to_string())?
|
|
|
|
|
.to_owned();
|
|
|
|
|
|
|
|
|
|
slog!("[matrix-bot] Logged in as {bot_user_id}");
|
|
|
|
|
|
|
|
|
|
// Parse and join the configured room
|
|
|
|
|
let target_room_id: OwnedRoomId = config
|
|
|
|
|
.room_id
|
|
|
|
|
.parse()
|
|
|
|
|
.map_err(|_| format!("Invalid room ID '{}'", config.room_id))?;
|
|
|
|
|
|
2026-02-25 13:46:20 +00:00
|
|
|
// Try to join the room with a timeout. Conduit sometimes hangs or
|
|
|
|
|
// returns errors on join if the bot is already a member.
|
|
|
|
|
match tokio::time::timeout(
|
|
|
|
|
std::time::Duration::from_secs(10),
|
|
|
|
|
client.join_room_by_id(&target_room_id),
|
|
|
|
|
)
|
|
|
|
|
.await
|
|
|
|
|
{
|
|
|
|
|
Ok(Ok(_)) => slog!("[matrix-bot] Joined room {target_room_id}"),
|
|
|
|
|
Ok(Err(e)) => slog!("[matrix-bot] Join room error (may already be a member): {e}"),
|
|
|
|
|
Err(_) => slog!("[matrix-bot] Join room timed out (may already be a member)"),
|
|
|
|
|
}
|
2026-02-25 12:42:11 +00:00
|
|
|
|
2026-02-25 14:59:20 +00:00
|
|
|
if config.allowed_users.is_empty() {
|
|
|
|
|
return Err(
|
|
|
|
|
"allowed_users is empty in bot.toml — refusing to start (fail-closed). \
|
|
|
|
|
Add at least one Matrix user ID to allowed_users."
|
|
|
|
|
.to_string(),
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
slog!(
|
|
|
|
|
"[matrix-bot] Allowed users: {:?}",
|
|
|
|
|
config.allowed_users
|
|
|
|
|
);
|
|
|
|
|
|
2026-02-25 12:42:11 +00:00
|
|
|
let ctx = BotContext {
|
|
|
|
|
bot_user_id,
|
|
|
|
|
target_room_id,
|
|
|
|
|
project_root,
|
2026-02-25 14:59:20 +00:00
|
|
|
allowed_users: config.allowed_users,
|
2026-02-25 12:42:11 +00:00
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// Register event handler and inject shared context
|
|
|
|
|
client.add_event_handler_context(ctx);
|
|
|
|
|
client.add_event_handler(on_room_message);
|
|
|
|
|
|
|
|
|
|
slog!("[matrix-bot] Starting Matrix sync loop");
|
|
|
|
|
|
|
|
|
|
// This blocks until the connection is terminated or an error occurs.
|
|
|
|
|
client
|
|
|
|
|
.sync(SyncSettings::default())
|
|
|
|
|
.await
|
|
|
|
|
.map_err(|e| format!("Matrix sync error: {e}"))?;
|
|
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Matrix event handler for room messages. Each invocation spawns an
|
|
|
|
|
/// independent task so the sync loop is not blocked by LLM calls.
|
|
|
|
|
async fn on_room_message(
|
|
|
|
|
ev: OriginalSyncRoomMessageEvent,
|
|
|
|
|
room: Room,
|
|
|
|
|
Ctx(ctx): Ctx<BotContext>,
|
|
|
|
|
) {
|
2026-02-25 13:46:20 +00:00
|
|
|
slog!(
|
|
|
|
|
"[matrix-bot] Event received: room={} sender={} target={}",
|
|
|
|
|
room.room_id(),
|
|
|
|
|
ev.sender,
|
|
|
|
|
ctx.target_room_id
|
|
|
|
|
);
|
|
|
|
|
|
2026-02-25 12:42:11 +00:00
|
|
|
// Only handle messages in the configured room
|
|
|
|
|
if room.room_id() != &*ctx.target_room_id {
|
2026-02-25 13:46:20 +00:00
|
|
|
slog!("[matrix-bot] Ignoring message from wrong room");
|
2026-02-25 12:42:11 +00:00
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Ignore the bot's own messages to prevent echo loops
|
|
|
|
|
if ev.sender == ctx.bot_user_id {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
2026-02-25 14:59:20 +00:00
|
|
|
// Only respond to users on the allowlist (fail-closed)
|
|
|
|
|
if !ctx.allowed_users.iter().any(|u| u == ev.sender.as_str()) {
|
|
|
|
|
slog!(
|
|
|
|
|
"[matrix-bot] Ignoring message from unauthorised user: {}",
|
|
|
|
|
ev.sender
|
|
|
|
|
);
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
2026-02-25 12:42:11 +00:00
|
|
|
// Only handle plain text messages
|
|
|
|
|
let MessageType::Text(text_content) = ev.content.msgtype else {
|
|
|
|
|
return;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let user_message = text_content.body.clone();
|
|
|
|
|
slog!("[matrix-bot] Message from {}: {user_message}", ev.sender);
|
|
|
|
|
|
|
|
|
|
// Spawn a separate task so the Matrix sync loop is not blocked while we
|
|
|
|
|
// wait for the LLM response (which can take several seconds).
|
|
|
|
|
tokio::spawn(async move {
|
|
|
|
|
handle_message(room, ctx, user_message).await;
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
2026-02-25 14:17:55 +00:00
|
|
|
/// Drain all complete paragraphs from `buffer` and return them.
|
|
|
|
|
///
|
|
|
|
|
/// A paragraph boundary is a double newline (`\n\n`). Each drained paragraph
|
|
|
|
|
/// is trimmed of surrounding whitespace; empty paragraphs are discarded.
|
|
|
|
|
/// The buffer is left with only the remaining incomplete text.
|
|
|
|
|
pub fn drain_complete_paragraphs(buffer: &mut String) -> Vec<String> {
|
|
|
|
|
let mut paragraphs = Vec::new();
|
|
|
|
|
while let Some(pos) = buffer.find("\n\n") {
|
|
|
|
|
let chunk = buffer[..pos].trim().to_string();
|
|
|
|
|
*buffer = buffer[pos + 2..].to_string();
|
|
|
|
|
if !chunk.is_empty() {
|
|
|
|
|
paragraphs.push(chunk);
|
2026-02-25 12:42:11 +00:00
|
|
|
}
|
|
|
|
|
}
|
2026-02-25 14:17:55 +00:00
|
|
|
paragraphs
|
2026-02-25 12:42:11 +00:00
|
|
|
}
|
|
|
|
|
|
2026-02-25 14:17:55 +00:00
|
|
|
async fn handle_message(room: Room, ctx: BotContext, user_message: String) {
|
2026-02-25 12:42:11 +00:00
|
|
|
let provider = ClaudeCodeProvider::new();
|
|
|
|
|
let (cancel_tx, mut cancel_rx) = watch::channel(false);
|
|
|
|
|
// Keep the sender alive for the duration of the call.
|
|
|
|
|
let _cancel_tx = cancel_tx;
|
|
|
|
|
|
2026-02-25 14:17:55 +00:00
|
|
|
// Channel for sending complete paragraphs to the Matrix posting task.
|
|
|
|
|
let (msg_tx, mut msg_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
|
|
|
|
|
let msg_tx_for_callback = msg_tx.clone();
|
|
|
|
|
|
|
|
|
|
// Spawn a task to post messages to Matrix as they arrive so we don't
|
|
|
|
|
// block the LLM stream while waiting for Matrix send round-trips.
|
|
|
|
|
let post_room = room.clone();
|
|
|
|
|
let post_task = tokio::spawn(async move {
|
|
|
|
|
while let Some(chunk) = msg_rx.recv().await {
|
|
|
|
|
let _ = post_room
|
|
|
|
|
.send(RoomMessageEventContent::text_plain(chunk))
|
|
|
|
|
.await;
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
// Shared state between the sync token callback and the async outer scope.
|
|
|
|
|
let buffer = Arc::new(std::sync::Mutex::new(String::new()));
|
|
|
|
|
let buffer_for_callback = Arc::clone(&buffer);
|
|
|
|
|
let sent_any_chunk = Arc::new(AtomicBool::new(false));
|
|
|
|
|
let sent_any_chunk_for_callback = Arc::clone(&sent_any_chunk);
|
2026-02-25 12:42:11 +00:00
|
|
|
|
2026-02-25 14:17:55 +00:00
|
|
|
let result = provider
|
2026-02-25 12:42:11 +00:00
|
|
|
.chat_stream(
|
2026-02-25 14:17:55 +00:00
|
|
|
&user_message,
|
|
|
|
|
&ctx.project_root.to_string_lossy(),
|
2026-02-25 12:42:11 +00:00
|
|
|
None, // No session resumption for now (see story 182)
|
|
|
|
|
&mut cancel_rx,
|
|
|
|
|
move |token| {
|
2026-02-25 14:17:55 +00:00
|
|
|
let mut buf = buffer_for_callback.lock().unwrap();
|
|
|
|
|
buf.push_str(token);
|
|
|
|
|
// Flush complete paragraphs as they arrive.
|
|
|
|
|
let paragraphs = drain_complete_paragraphs(&mut buf);
|
|
|
|
|
for chunk in paragraphs {
|
|
|
|
|
sent_any_chunk_for_callback.store(true, Ordering::Relaxed);
|
|
|
|
|
let _ = msg_tx_for_callback.send(chunk);
|
|
|
|
|
}
|
2026-02-25 12:42:11 +00:00
|
|
|
},
|
|
|
|
|
|_thinking| {}, // Discard thinking tokens
|
|
|
|
|
|_activity| {}, // Discard activity signals
|
|
|
|
|
)
|
2026-02-25 14:17:55 +00:00
|
|
|
.await;
|
|
|
|
|
|
|
|
|
|
// Flush any remaining text that didn't end with a paragraph boundary.
|
|
|
|
|
let remaining = buffer.lock().unwrap().trim().to_string();
|
|
|
|
|
let did_send_any = sent_any_chunk.load(Ordering::Relaxed);
|
2026-02-25 12:42:11 +00:00
|
|
|
|
2026-02-25 14:17:55 +00:00
|
|
|
match result {
|
|
|
|
|
Ok(ClaudeCodeResult { messages, .. }) => {
|
|
|
|
|
if !remaining.is_empty() {
|
|
|
|
|
let _ = msg_tx.send(remaining);
|
|
|
|
|
} else if !did_send_any {
|
|
|
|
|
// Nothing was streamed at all (e.g. only tool calls with no
|
|
|
|
|
// final text) — fall back to the last assistant message from
|
|
|
|
|
// the structured result.
|
|
|
|
|
let last_text = messages
|
|
|
|
|
.iter()
|
|
|
|
|
.rev()
|
|
|
|
|
.find(|m| m.role == crate::llm::types::Role::Assistant && !m.content.is_empty())
|
|
|
|
|
.map(|m| m.content.clone())
|
|
|
|
|
.unwrap_or_default();
|
|
|
|
|
if !last_text.is_empty() {
|
|
|
|
|
let _ = msg_tx.send(last_text);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
Err(e) => {
|
|
|
|
|
slog!("[matrix-bot] LLM error: {e}");
|
|
|
|
|
// Discard any partial buffered text and send the error as one message.
|
|
|
|
|
let _ = msg_tx.send(format!("Error processing your request: {e}"));
|
|
|
|
|
}
|
2026-02-25 12:42:11 +00:00
|
|
|
}
|
2026-02-25 14:17:55 +00:00
|
|
|
|
|
|
|
|
// Drop the sender to signal the posting task that no more messages will
|
|
|
|
|
// arrive, then wait for all pending Matrix sends to complete.
|
|
|
|
|
drop(msg_tx);
|
|
|
|
|
let _ = post_task.await;
|
2026-02-25 12:42:11 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[cfg(test)]
|
|
|
|
|
mod tests {
|
|
|
|
|
use super::*;
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn bot_context_is_clone() {
|
|
|
|
|
// BotContext must be Clone for the Matrix event handler injection.
|
|
|
|
|
fn assert_clone<T: Clone>() {}
|
|
|
|
|
assert_clone::<BotContext>();
|
|
|
|
|
}
|
|
|
|
|
|
2026-02-25 14:17:55 +00:00
|
|
|
#[test]
|
|
|
|
|
fn drain_complete_paragraphs_no_boundary_returns_empty() {
|
|
|
|
|
let mut buf = "Hello World".to_string();
|
|
|
|
|
let paras = drain_complete_paragraphs(&mut buf);
|
|
|
|
|
assert!(paras.is_empty());
|
|
|
|
|
assert_eq!(buf, "Hello World");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn drain_complete_paragraphs_single_boundary() {
|
|
|
|
|
let mut buf = "Paragraph one.\n\nParagraph two.".to_string();
|
|
|
|
|
let paras = drain_complete_paragraphs(&mut buf);
|
|
|
|
|
assert_eq!(paras, vec!["Paragraph one."]);
|
|
|
|
|
assert_eq!(buf, "Paragraph two.");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn drain_complete_paragraphs_multiple_boundaries() {
|
|
|
|
|
let mut buf = "A\n\nB\n\nC".to_string();
|
|
|
|
|
let paras = drain_complete_paragraphs(&mut buf);
|
|
|
|
|
assert_eq!(paras, vec!["A", "B"]);
|
|
|
|
|
assert_eq!(buf, "C");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn drain_complete_paragraphs_trailing_boundary() {
|
|
|
|
|
let mut buf = "A\n\nB\n\n".to_string();
|
|
|
|
|
let paras = drain_complete_paragraphs(&mut buf);
|
|
|
|
|
assert_eq!(paras, vec!["A", "B"]);
|
|
|
|
|
assert_eq!(buf, "");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn drain_complete_paragraphs_empty_input() {
|
|
|
|
|
let mut buf = String::new();
|
|
|
|
|
let paras = drain_complete_paragraphs(&mut buf);
|
|
|
|
|
assert!(paras.is_empty());
|
|
|
|
|
assert_eq!(buf, "");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn drain_complete_paragraphs_skips_empty_chunks() {
|
|
|
|
|
// Consecutive double-newlines produce no empty paragraphs.
|
|
|
|
|
let mut buf = "\n\n\n\nHello".to_string();
|
|
|
|
|
let paras = drain_complete_paragraphs(&mut buf);
|
|
|
|
|
assert!(paras.is_empty());
|
|
|
|
|
assert_eq!(buf, "Hello");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn drain_complete_paragraphs_trims_whitespace() {
|
|
|
|
|
let mut buf = " Hello \n\n World ".to_string();
|
|
|
|
|
let paras = drain_complete_paragraphs(&mut buf);
|
|
|
|
|
assert_eq!(paras, vec!["Hello"]);
|
|
|
|
|
assert_eq!(buf, " World ");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn drain_complete_paragraphs_incremental_simulation() {
|
|
|
|
|
// Simulate tokens arriving one character at a time.
|
|
|
|
|
let mut buf = String::new();
|
|
|
|
|
let mut all_paragraphs = Vec::new();
|
|
|
|
|
|
|
|
|
|
for ch in "First para.\n\nSecond para.\n\nThird.".chars() {
|
|
|
|
|
buf.push(ch);
|
|
|
|
|
all_paragraphs.extend(drain_complete_paragraphs(&mut buf));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
assert_eq!(all_paragraphs, vec!["First para.", "Second para."]);
|
|
|
|
|
assert_eq!(buf, "Third.");
|
2026-02-25 12:42:11 +00:00
|
|
|
}
|
|
|
|
|
}
|