story-kit: merge 316_refactor_abstract_bot_transport_layer_for_multi_platform_support

This commit is contained in:
Dave
2026-03-19 20:43:38 +00:00
parent 3b9cea22bb
commit 7eb9686bfb
10 changed files with 510 additions and 163 deletions
+72 -50
View File
@@ -2,6 +2,7 @@ use crate::agents::AgentPool;
use crate::http::context::{PermissionDecision, PermissionForward};
use crate::llm::providers::claude_code::{ClaudeCodeProvider, ClaudeCodeResult};
use crate::slog;
use crate::transport::ChatTransport;
use matrix_sdk::{
Client,
config::SyncSettings,
@@ -10,7 +11,7 @@ use matrix_sdk::{
ruma::{
OwnedEventId, OwnedRoomId, OwnedUserId,
events::room::message::{
MessageType, OriginalSyncRoomMessageEvent, Relation, RoomMessageEventContent,
MessageType, OriginalSyncRoomMessageEvent, Relation,
RoomMessageEventContentWithoutRelation,
},
},
@@ -169,12 +170,18 @@ pub struct BotContext {
/// Set of room IDs where ambient mode is active. In ambient mode the bot
/// responds to all messages rather than only addressed ones.
/// Uses a sync mutex since locks are never held across await points.
pub ambient_rooms: Arc<std::sync::Mutex<HashSet<OwnedRoomId>>>,
/// Room IDs are stored as plain strings (platform-agnostic).
pub ambient_rooms: Arc<std::sync::Mutex<HashSet<String>>>,
/// Agent pool for checking agent availability.
pub agents: Arc<AgentPool>,
/// Per-room htop monitoring sessions. Keyed by room ID; each entry holds
/// a stop-signal sender that the background task watches.
pub htop_sessions: super::htop::HtopSessions,
/// Chat transport used for sending and editing messages.
///
/// All message I/O goes through this abstraction so the bot logic works
/// with any platform, not just Matrix.
pub transport: Arc<dyn ChatTransport>,
}
// ---------------------------------------------------------------------------
@@ -344,12 +351,11 @@ pub async fn run_bot(
persisted.len()
);
// Restore persisted ambient rooms from config, ignoring any that are not
// in the configured target_room_ids to avoid stale entries.
let persisted_ambient: HashSet<OwnedRoomId> = config
// Restore persisted ambient rooms from config.
let persisted_ambient: HashSet<String> = config
.ambient_rooms
.iter()
.filter_map(|s| s.parse::<OwnedRoomId>().ok())
.cloned()
.collect();
if !persisted_ambient.is_empty() {
slog!(
@@ -359,6 +365,18 @@ pub async fn run_bot(
);
}
// Create the transport abstraction based on the configured transport type.
let transport: Arc<dyn ChatTransport> = match config.transport.as_str() {
"whatsapp" => {
slog!("[matrix-bot] Using WhatsApp transport (stub)");
Arc::new(crate::whatsapp::WhatsAppTransport::new())
}
_ => {
slog!("[matrix-bot] Using Matrix transport");
Arc::new(super::transport_impl::MatrixTransport::new(client.clone()))
}
};
let bot_name = config
.display_name
.clone()
@@ -380,6 +398,7 @@ pub async fn run_bot(
ambient_rooms: Arc::new(std::sync::Mutex::new(persisted_ambient)),
agents,
htop_sessions: Arc::new(TokioMutex::new(HashMap::new())),
transport: Arc::clone(&transport),
};
slog!("[matrix-bot] Cryptographic identity verification is always ON — commands from unencrypted rooms or unverified devices are rejected");
@@ -391,9 +410,11 @@ pub async fn run_bot(
// Spawn the stage-transition notification listener before entering the
// sync loop so it starts receiving watcher events immediately.
let notif_room_id_strings: Vec<String> =
notif_room_ids.iter().map(|r| r.to_string()).collect();
super::notifications::spawn_notification_listener(
client.clone(),
notif_room_ids,
Arc::clone(&transport),
notif_room_id_strings,
watcher_rx,
notif_project_root,
);
@@ -403,15 +424,15 @@ pub async fn run_bot(
// reconnects internally so this code is never reached again on a network
// blip or sync resumption.
let announce_msg = format_startup_announcement(&announce_bot_name);
let announce_html = markdown_to_html(&announce_msg);
slog!("[matrix-bot] Sending startup announcement: {announce_msg}");
for room_id in &announce_room_ids {
if let Some(room) = client.get_room(room_id) {
let content = RoomMessageEventContent::text_plain(announce_msg.clone());
if let Err(e) = room.send(content).await {
slog!("[matrix-bot] Failed to send startup announcement to {room_id}: {e}");
}
} else {
slog!("[matrix-bot] Room {room_id} not found in client state, skipping announcement");
let room_id_str = room_id.to_string();
if let Err(e) = transport
.send_message(&room_id_str, &announce_msg, &announce_html)
.await
{
slog!("[matrix-bot] Failed to send startup announcement to {room_id}: {e}");
}
}
@@ -704,7 +725,8 @@ async fn on_room_message(
// ambient mode is enabled for this room.
let is_addressed = mentions_bot(&body, formatted_body.as_deref(), &ctx.bot_user_id)
|| is_reply_to_bot(ev.content.relates_to.as_ref(), &ctx.bot_sent_event_ids).await;
let is_ambient = ctx.ambient_rooms.lock().unwrap().contains(&incoming_room_id);
let room_id_str = incoming_room_id.to_string();
let is_ambient = ctx.ambient_rooms.lock().unwrap().contains(&room_id_str);
if !is_addressed && !is_ambient {
slog!(
@@ -765,11 +787,10 @@ async fn on_room_message(
"Permission denied."
};
let html = markdown_to_html(confirmation);
if let Ok(resp) = room
.send(RoomMessageEventContent::text_html(confirmation, html))
.await
if let Ok(msg_id) = ctx.transport.send_message(&room_id_str, confirmation, &html).await
&& let Ok(event_id) = msg_id.parse()
{
ctx.bot_sent_event_ids.lock().await.insert(resp.event_id);
ctx.bot_sent_event_ids.lock().await.insert(event_id);
}
return;
}
@@ -788,17 +809,16 @@ async fn on_room_message(
project_root: &ctx.project_root,
agents: &ctx.agents,
ambient_rooms: &ctx.ambient_rooms,
room_id: &incoming_room_id,
room_id: &room_id_str,
is_addressed,
};
if let Some(response) = super::commands::try_handle_command(&dispatch, &user_message) {
slog!("[matrix-bot] Handled bot command from {sender}");
let html = markdown_to_html(&response);
if let Ok(resp) = room
.send(RoomMessageEventContent::text_html(response, html))
.await
if let Ok(msg_id) = ctx.transport.send_message(&room_id_str, &response, &html).await
&& let Ok(event_id) = msg_id.parse()
{
ctx.bot_sent_event_ids.lock().await.insert(resp.event_id);
ctx.bot_sent_event_ids.lock().await.insert(event_id);
}
return;
}
@@ -811,12 +831,13 @@ async fn on_room_message(
slog!("[matrix-bot] Handling htop command from {sender}: {htop_cmd:?}");
match htop_cmd {
super::htop::HtopCommand::Stop => {
super::htop::handle_htop_stop(&room, &incoming_room_id, &ctx.htop_sessions).await;
super::htop::handle_htop_stop(&*ctx.transport, &room_id_str, &ctx.htop_sessions)
.await;
}
super::htop::HtopCommand::Start { duration_secs } => {
super::htop::handle_htop_start(
&room,
&incoming_room_id,
&ctx.transport,
&room_id_str,
&ctx.htop_sessions,
Arc::clone(&ctx.agents),
duration_secs,
@@ -852,11 +873,10 @@ async fn on_room_message(
}
};
let html = markdown_to_html(&response);
if let Ok(resp) = room
.send(RoomMessageEventContent::text_html(response, html))
.await
if let Ok(msg_id) = ctx.transport.send_message(&room_id_str, &response, &html).await
&& let Ok(event_id) = msg_id.parse()
{
ctx.bot_sent_event_ids.lock().await.insert(resp.event_id);
ctx.bot_sent_event_ids.lock().await.insert(event_id);
}
return;
}
@@ -864,7 +884,7 @@ async fn on_room_message(
// Spawn a separate task so the Matrix sync loop is not blocked while we
// wait for the LLM response (which can take several seconds).
tokio::spawn(async move {
handle_message(room, incoming_room_id, ctx, sender, user_message).await;
handle_message(room_id_str, incoming_room_id, ctx, sender, user_message).await;
});
}
@@ -879,7 +899,7 @@ fn format_user_prompt(sender: &str, message: &str) -> String {
}
async fn handle_message(
room: Room,
room_id_str: String,
room_id: OwnedRoomId,
ctx: BotContext,
sender: String,
@@ -912,19 +932,19 @@ async fn handle_message(
let (msg_tx, mut msg_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
let msg_tx_for_callback = msg_tx.clone();
// Spawn a task to post messages to Matrix as they arrive so we don't
// block the LLM stream while waiting for Matrix send round-trips.
let post_room = room.clone();
// Spawn a task to post messages via the transport as they arrive so we
// don't block the LLM stream while waiting for send round-trips.
let post_transport = Arc::clone(&ctx.transport);
let post_room_id = room_id_str.clone();
let sent_ids = Arc::clone(&ctx.bot_sent_event_ids);
let sent_ids_for_post = Arc::clone(&sent_ids);
let post_task = tokio::spawn(async move {
while let Some(chunk) = msg_rx.recv().await {
let html = markdown_to_html(&chunk);
if let Ok(response) = post_room
.send(RoomMessageEventContent::text_html(chunk, html))
.await
if let Ok(msg_id) = post_transport.send_message(&post_room_id, &chunk, &html).await
&& let Ok(event_id) = msg_id.parse()
{
sent_ids_for_post.lock().await.insert(response.event_id);
sent_ids_for_post.lock().await.insert(event_id);
}
}
});
@@ -966,7 +986,7 @@ async fn handle_message(
r = &mut chat_fut => break r,
Some(perm_fwd) = perm_rx_guard.recv() => {
// Post the permission prompt to the Matrix room.
// Post the permission prompt to the room via the transport.
let prompt_msg = format!(
"**Permission Request**\n\n\
Tool: `{}`\n```json\n{}\n```\n\n\
@@ -976,11 +996,10 @@ async fn handle_message(
.unwrap_or_else(|_| perm_fwd.tool_input.to_string()),
);
let html = markdown_to_html(&prompt_msg);
if let Ok(resp) = room
.send(RoomMessageEventContent::text_html(&prompt_msg, html))
.await
if let Ok(msg_id) = ctx.transport.send_message(&room_id_str, &prompt_msg, &html).await
&& let Ok(event_id) = msg_id.parse()
{
sent_ids.lock().await.insert(resp.event_id);
sent_ids.lock().await.insert(event_id);
}
// Store the MCP oneshot sender so the event handler can
@@ -993,7 +1012,8 @@ async fn handle_message(
// Spawn a timeout task: auto-deny if the user does not respond.
let pending = Arc::clone(&ctx.pending_perm_replies);
let timeout_room_id = room_id.clone();
let timeout_room = room.clone();
let timeout_transport = Arc::clone(&ctx.transport);
let timeout_room_id_str = room_id_str.clone();
let timeout_sent_ids = Arc::clone(&ctx.bot_sent_event_ids);
let timeout_secs = ctx.permission_timeout_secs;
tokio::spawn(async move {
@@ -1002,11 +1022,12 @@ async fn handle_message(
let _ = tx.send(PermissionDecision::Deny);
let msg = "Permission request timed out — denied (fail-closed).";
let html = markdown_to_html(msg);
if let Ok(resp) = timeout_room
.send(RoomMessageEventContent::text_html(msg, html))
if let Ok(msg_id) = timeout_transport
.send_message(&timeout_room_id_str, msg, &html)
.await
&& let Ok(event_id) = msg_id.parse()
{
timeout_sent_ids.lock().await.insert(resp.event_id);
timeout_sent_ids.lock().await.insert(event_id);
}
}
});
@@ -1372,6 +1393,7 @@ mod tests {
ambient_rooms: Arc::new(std::sync::Mutex::new(HashSet::new())),
agents: Arc::new(AgentPool::new_test(3000)),
htop_sessions: Arc::new(TokioMutex::new(HashMap::new())),
transport: Arc::new(crate::whatsapp::WhatsAppTransport::new()),
};
// Clone must work (required by Matrix SDK event handler injection).
let _cloned = ctx.clone();