diff --git a/server/src/main.rs b/server/src/main.rs index a33a4e5..a215a7b 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -12,7 +12,9 @@ pub mod log_buffer; mod matrix; mod state; mod store; +pub mod transport; mod workflow; +pub mod whatsapp; mod worktree; use crate::agents::AgentPool; diff --git a/server/src/matrix/bot.rs b/server/src/matrix/bot.rs index 3c0b27d..3fee04d 100644 --- a/server/src/matrix/bot.rs +++ b/server/src/matrix/bot.rs @@ -2,6 +2,7 @@ use crate::agents::AgentPool; use crate::http::context::{PermissionDecision, PermissionForward}; use crate::llm::providers::claude_code::{ClaudeCodeProvider, ClaudeCodeResult}; use crate::slog; +use crate::transport::ChatTransport; use matrix_sdk::{ Client, config::SyncSettings, @@ -10,7 +11,7 @@ use matrix_sdk::{ ruma::{ OwnedEventId, OwnedRoomId, OwnedUserId, events::room::message::{ - MessageType, OriginalSyncRoomMessageEvent, Relation, RoomMessageEventContent, + MessageType, OriginalSyncRoomMessageEvent, Relation, RoomMessageEventContentWithoutRelation, }, }, @@ -169,12 +170,18 @@ pub struct BotContext { /// Set of room IDs where ambient mode is active. In ambient mode the bot /// responds to all messages rather than only addressed ones. /// Uses a sync mutex since locks are never held across await points. - pub ambient_rooms: Arc>>, + /// Room IDs are stored as plain strings (platform-agnostic). + pub ambient_rooms: Arc>>, /// Agent pool for checking agent availability. pub agents: Arc, /// Per-room htop monitoring sessions. Keyed by room ID; each entry holds /// a stop-signal sender that the background task watches. pub htop_sessions: super::htop::HtopSessions, + /// Chat transport used for sending and editing messages. + /// + /// All message I/O goes through this abstraction so the bot logic works + /// with any platform, not just Matrix. + pub transport: Arc, } // --------------------------------------------------------------------------- @@ -344,12 +351,11 @@ pub async fn run_bot( persisted.len() ); - // Restore persisted ambient rooms from config, ignoring any that are not - // in the configured target_room_ids to avoid stale entries. - let persisted_ambient: HashSet = config + // Restore persisted ambient rooms from config. + let persisted_ambient: HashSet = config .ambient_rooms .iter() - .filter_map(|s| s.parse::().ok()) + .cloned() .collect(); if !persisted_ambient.is_empty() { slog!( @@ -359,6 +365,18 @@ pub async fn run_bot( ); } + // Create the transport abstraction based on the configured transport type. + let transport: Arc = match config.transport.as_str() { + "whatsapp" => { + slog!("[matrix-bot] Using WhatsApp transport (stub)"); + Arc::new(crate::whatsapp::WhatsAppTransport::new()) + } + _ => { + slog!("[matrix-bot] Using Matrix transport"); + Arc::new(super::transport_impl::MatrixTransport::new(client.clone())) + } + }; + let bot_name = config .display_name .clone() @@ -380,6 +398,7 @@ pub async fn run_bot( ambient_rooms: Arc::new(std::sync::Mutex::new(persisted_ambient)), agents, htop_sessions: Arc::new(TokioMutex::new(HashMap::new())), + transport: Arc::clone(&transport), }; slog!("[matrix-bot] Cryptographic identity verification is always ON — commands from unencrypted rooms or unverified devices are rejected"); @@ -391,9 +410,11 @@ pub async fn run_bot( // Spawn the stage-transition notification listener before entering the // sync loop so it starts receiving watcher events immediately. + let notif_room_id_strings: Vec = + notif_room_ids.iter().map(|r| r.to_string()).collect(); super::notifications::spawn_notification_listener( - client.clone(), - notif_room_ids, + Arc::clone(&transport), + notif_room_id_strings, watcher_rx, notif_project_root, ); @@ -403,15 +424,15 @@ pub async fn run_bot( // reconnects internally so this code is never reached again on a network // blip or sync resumption. let announce_msg = format_startup_announcement(&announce_bot_name); + let announce_html = markdown_to_html(&announce_msg); slog!("[matrix-bot] Sending startup announcement: {announce_msg}"); for room_id in &announce_room_ids { - if let Some(room) = client.get_room(room_id) { - let content = RoomMessageEventContent::text_plain(announce_msg.clone()); - if let Err(e) = room.send(content).await { - slog!("[matrix-bot] Failed to send startup announcement to {room_id}: {e}"); - } - } else { - slog!("[matrix-bot] Room {room_id} not found in client state, skipping announcement"); + let room_id_str = room_id.to_string(); + if let Err(e) = transport + .send_message(&room_id_str, &announce_msg, &announce_html) + .await + { + slog!("[matrix-bot] Failed to send startup announcement to {room_id}: {e}"); } } @@ -704,7 +725,8 @@ async fn on_room_message( // ambient mode is enabled for this room. let is_addressed = mentions_bot(&body, formatted_body.as_deref(), &ctx.bot_user_id) || is_reply_to_bot(ev.content.relates_to.as_ref(), &ctx.bot_sent_event_ids).await; - let is_ambient = ctx.ambient_rooms.lock().unwrap().contains(&incoming_room_id); + let room_id_str = incoming_room_id.to_string(); + let is_ambient = ctx.ambient_rooms.lock().unwrap().contains(&room_id_str); if !is_addressed && !is_ambient { slog!( @@ -765,11 +787,10 @@ async fn on_room_message( "Permission denied." }; let html = markdown_to_html(confirmation); - if let Ok(resp) = room - .send(RoomMessageEventContent::text_html(confirmation, html)) - .await + if let Ok(msg_id) = ctx.transport.send_message(&room_id_str, confirmation, &html).await + && let Ok(event_id) = msg_id.parse() { - ctx.bot_sent_event_ids.lock().await.insert(resp.event_id); + ctx.bot_sent_event_ids.lock().await.insert(event_id); } return; } @@ -788,17 +809,16 @@ async fn on_room_message( project_root: &ctx.project_root, agents: &ctx.agents, ambient_rooms: &ctx.ambient_rooms, - room_id: &incoming_room_id, + room_id: &room_id_str, is_addressed, }; if let Some(response) = super::commands::try_handle_command(&dispatch, &user_message) { slog!("[matrix-bot] Handled bot command from {sender}"); let html = markdown_to_html(&response); - if let Ok(resp) = room - .send(RoomMessageEventContent::text_html(response, html)) - .await + if let Ok(msg_id) = ctx.transport.send_message(&room_id_str, &response, &html).await + && let Ok(event_id) = msg_id.parse() { - ctx.bot_sent_event_ids.lock().await.insert(resp.event_id); + ctx.bot_sent_event_ids.lock().await.insert(event_id); } return; } @@ -811,12 +831,13 @@ async fn on_room_message( slog!("[matrix-bot] Handling htop command from {sender}: {htop_cmd:?}"); match htop_cmd { super::htop::HtopCommand::Stop => { - super::htop::handle_htop_stop(&room, &incoming_room_id, &ctx.htop_sessions).await; + super::htop::handle_htop_stop(&*ctx.transport, &room_id_str, &ctx.htop_sessions) + .await; } super::htop::HtopCommand::Start { duration_secs } => { super::htop::handle_htop_start( - &room, - &incoming_room_id, + &ctx.transport, + &room_id_str, &ctx.htop_sessions, Arc::clone(&ctx.agents), duration_secs, @@ -852,11 +873,10 @@ async fn on_room_message( } }; let html = markdown_to_html(&response); - if let Ok(resp) = room - .send(RoomMessageEventContent::text_html(response, html)) - .await + if let Ok(msg_id) = ctx.transport.send_message(&room_id_str, &response, &html).await + && let Ok(event_id) = msg_id.parse() { - ctx.bot_sent_event_ids.lock().await.insert(resp.event_id); + ctx.bot_sent_event_ids.lock().await.insert(event_id); } return; } @@ -864,7 +884,7 @@ async fn on_room_message( // Spawn a separate task so the Matrix sync loop is not blocked while we // wait for the LLM response (which can take several seconds). tokio::spawn(async move { - handle_message(room, incoming_room_id, ctx, sender, user_message).await; + handle_message(room_id_str, incoming_room_id, ctx, sender, user_message).await; }); } @@ -879,7 +899,7 @@ fn format_user_prompt(sender: &str, message: &str) -> String { } async fn handle_message( - room: Room, + room_id_str: String, room_id: OwnedRoomId, ctx: BotContext, sender: String, @@ -912,19 +932,19 @@ async fn handle_message( let (msg_tx, mut msg_rx) = tokio::sync::mpsc::unbounded_channel::(); let msg_tx_for_callback = msg_tx.clone(); - // Spawn a task to post messages to Matrix as they arrive so we don't - // block the LLM stream while waiting for Matrix send round-trips. - let post_room = room.clone(); + // Spawn a task to post messages via the transport as they arrive so we + // don't block the LLM stream while waiting for send round-trips. + let post_transport = Arc::clone(&ctx.transport); + let post_room_id = room_id_str.clone(); let sent_ids = Arc::clone(&ctx.bot_sent_event_ids); let sent_ids_for_post = Arc::clone(&sent_ids); let post_task = tokio::spawn(async move { while let Some(chunk) = msg_rx.recv().await { let html = markdown_to_html(&chunk); - if let Ok(response) = post_room - .send(RoomMessageEventContent::text_html(chunk, html)) - .await + if let Ok(msg_id) = post_transport.send_message(&post_room_id, &chunk, &html).await + && let Ok(event_id) = msg_id.parse() { - sent_ids_for_post.lock().await.insert(response.event_id); + sent_ids_for_post.lock().await.insert(event_id); } } }); @@ -966,7 +986,7 @@ async fn handle_message( r = &mut chat_fut => break r, Some(perm_fwd) = perm_rx_guard.recv() => { - // Post the permission prompt to the Matrix room. + // Post the permission prompt to the room via the transport. let prompt_msg = format!( "**Permission Request**\n\n\ Tool: `{}`\n```json\n{}\n```\n\n\ @@ -976,11 +996,10 @@ async fn handle_message( .unwrap_or_else(|_| perm_fwd.tool_input.to_string()), ); let html = markdown_to_html(&prompt_msg); - if let Ok(resp) = room - .send(RoomMessageEventContent::text_html(&prompt_msg, html)) - .await + if let Ok(msg_id) = ctx.transport.send_message(&room_id_str, &prompt_msg, &html).await + && let Ok(event_id) = msg_id.parse() { - sent_ids.lock().await.insert(resp.event_id); + sent_ids.lock().await.insert(event_id); } // Store the MCP oneshot sender so the event handler can @@ -993,7 +1012,8 @@ async fn handle_message( // Spawn a timeout task: auto-deny if the user does not respond. let pending = Arc::clone(&ctx.pending_perm_replies); let timeout_room_id = room_id.clone(); - let timeout_room = room.clone(); + let timeout_transport = Arc::clone(&ctx.transport); + let timeout_room_id_str = room_id_str.clone(); let timeout_sent_ids = Arc::clone(&ctx.bot_sent_event_ids); let timeout_secs = ctx.permission_timeout_secs; tokio::spawn(async move { @@ -1002,11 +1022,12 @@ async fn handle_message( let _ = tx.send(PermissionDecision::Deny); let msg = "Permission request timed out — denied (fail-closed)."; let html = markdown_to_html(msg); - if let Ok(resp) = timeout_room - .send(RoomMessageEventContent::text_html(msg, html)) + if let Ok(msg_id) = timeout_transport + .send_message(&timeout_room_id_str, msg, &html) .await + && let Ok(event_id) = msg_id.parse() { - timeout_sent_ids.lock().await.insert(resp.event_id); + timeout_sent_ids.lock().await.insert(event_id); } } }); @@ -1372,6 +1393,7 @@ mod tests { ambient_rooms: Arc::new(std::sync::Mutex::new(HashSet::new())), agents: Arc::new(AgentPool::new_test(3000)), htop_sessions: Arc::new(TokioMutex::new(HashMap::new())), + transport: Arc::new(crate::whatsapp::WhatsAppTransport::new()), }; // Clone must work (required by Matrix SDK event handler injection). let _cloned = ctx.clone(); diff --git a/server/src/matrix/commands.rs b/server/src/matrix/commands.rs index 3af2673..558a72f 100644 --- a/server/src/matrix/commands.rs +++ b/server/src/matrix/commands.rs @@ -7,7 +7,6 @@ use crate::agents::{AgentPool, AgentStatus}; use crate::config::ProjectConfig; -use matrix_sdk::ruma::OwnedRoomId; use std::collections::{HashMap, HashSet}; use std::path::Path; use std::sync::{Arc, Mutex}; @@ -31,19 +30,22 @@ pub struct BotCommand { /// Groups all the caller-supplied context needed to dispatch and execute bot /// commands. Construct one per incoming message and pass it alongside the raw /// message body. +/// +/// All identifiers are platform-agnostic strings so this struct works with +/// any [`ChatTransport`](crate::transport::ChatTransport) implementation. pub struct CommandDispatch<'a> { /// The bot's display name (e.g., "Timmy"). pub bot_name: &'a str, - /// The bot's full Matrix user ID (e.g., `"@timmy:homeserver.local"`). + /// The bot's full user ID (e.g., `"@timmy:homeserver.local"` on Matrix). pub bot_user_id: &'a str, /// Project root directory (needed by status, ambient). pub project_root: &'a Path, /// Agent pool (needed by status). pub agents: &'a AgentPool, - /// Set of rooms with ambient mode enabled (needed by ambient). - pub ambient_rooms: &'a Arc>>, + /// Set of room IDs with ambient mode enabled (needed by ambient). + pub ambient_rooms: &'a Arc>>, /// The room this message came from (needed by ambient). - pub room_id: &'a OwnedRoomId, + pub room_id: &'a str, /// Whether the message directly addressed the bot (mention/reply). /// Some commands (e.g. ambient) only operate when directly addressed. pub is_addressed: bool, @@ -59,10 +61,10 @@ pub struct CommandContext<'a> { pub project_root: &'a Path, /// Agent pool (needed by status). pub agents: &'a AgentPool, - /// Set of rooms with ambient mode enabled (needed by ambient). - pub ambient_rooms: &'a Arc>>, + /// Set of room IDs with ambient mode enabled (needed by ambient). + pub ambient_rooms: &'a Arc>>, /// The room this message came from (needed by ambient). - pub room_id: &'a OwnedRoomId, + pub room_id: &'a str, /// Whether the message directly addressed the bot (mention/reply). /// Some commands (e.g. ambient) only operate when directly addressed. pub is_addressed: bool, @@ -389,11 +391,11 @@ fn handle_ambient(ctx: &CommandContext) -> Option { let room_ids: Vec = { let mut ambient = ctx.ambient_rooms.lock().unwrap(); if enable { - ambient.insert(ctx.room_id.clone()); + ambient.insert(ctx.room_id.to_string()); } else { ambient.remove(ctx.room_id); } - ambient.iter().map(|r| r.to_string()).collect() + ambient.iter().cloned().collect() }; save_ambient_rooms(ctx.project_root, &room_ids); let msg = if enable { @@ -665,11 +667,7 @@ mod tests { // -- test helpers ------------------------------------------------------- - fn make_room_id(s: &str) -> OwnedRoomId { - s.parse().unwrap() - } - - fn test_ambient_rooms() -> Arc>> { + fn test_ambient_rooms() -> Arc>> { Arc::new(Mutex::new(HashSet::new())) } @@ -681,11 +679,11 @@ mod tests { bot_name: &str, bot_user_id: &str, message: &str, - ambient_rooms: &Arc>>, + ambient_rooms: &Arc>>, is_addressed: bool, ) -> Option { let agents = test_agents(); - let room_id = make_room_id("!test:example.com"); + let room_id = "!test:example.com".to_string(); let dispatch = CommandDispatch { bot_name, bot_user_id, @@ -855,7 +853,7 @@ mod tests { #[test] fn ambient_on_requires_addressed() { let ambient_rooms = test_ambient_rooms(); - let room_id = make_room_id("!myroom:example.com"); + let room_id = "!myroom:example.com".to_string(); let result = try_cmd( "Timmy", "@timmy:homeserver.local", @@ -875,7 +873,7 @@ mod tests { fn ambient_on_enables_ambient_mode() { let ambient_rooms = test_ambient_rooms(); let agents = test_agents(); - let room_id = make_room_id("!myroom:example.com"); + let room_id = "!myroom:example.com".to_string(); let dispatch = CommandDispatch { bot_name: "Timmy", bot_user_id: "@timmy:homeserver.local", @@ -902,7 +900,7 @@ mod tests { fn ambient_off_disables_ambient_mode() { let ambient_rooms = test_ambient_rooms(); let agents = test_agents(); - let room_id = make_room_id("!myroom:example.com"); + let room_id = "!myroom:example.com".to_string(); // Pre-insert the room ambient_rooms.lock().unwrap().insert(room_id.clone()); @@ -1211,7 +1209,7 @@ mod tests { .unwrap_or(std::path::Path::new(".")); let agents = test_agents(); let ambient_rooms = test_ambient_rooms(); - let room_id = make_room_id("!test:example.com"); + let room_id = "!test:example.com".to_string(); let dispatch = CommandDispatch { bot_name: "Timmy", bot_user_id: "@timmy:homeserver.local", @@ -1232,7 +1230,7 @@ mod tests { .unwrap_or(std::path::Path::new(".")); let agents = test_agents(); let ambient_rooms = test_ambient_rooms(); - let room_id = make_room_id("!test:example.com"); + let room_id = "!test:example.com".to_string(); let dispatch = CommandDispatch { bot_name: "Timmy", bot_user_id: "@timmy:homeserver.local", @@ -1256,7 +1254,7 @@ mod tests { .unwrap_or(std::path::Path::new(".")); let agents = test_agents(); let ambient_rooms = test_ambient_rooms(); - let room_id = make_room_id("!test:example.com"); + let room_id = "!test:example.com".to_string(); let dispatch = CommandDispatch { bot_name: "Timmy", bot_user_id: "@timmy:homeserver.local", @@ -1280,7 +1278,7 @@ mod tests { .unwrap_or(std::path::Path::new(".")); let agents = test_agents(); let ambient_rooms = test_ambient_rooms(); - let room_id = make_room_id("!test:example.com"); + let room_id = "!test:example.com".to_string(); let dispatch = CommandDispatch { bot_name: "Timmy", bot_user_id: "@timmy:homeserver.local", @@ -1335,7 +1333,7 @@ mod tests { fn cost_cmd_with_root(root: &std::path::Path) -> Option { let agents = test_agents(); let ambient_rooms = test_ambient_rooms(); - let room_id = make_room_id("!test:example.com"); + let room_id = "!test:example.com".to_string(); let dispatch = CommandDispatch { bot_name: "Timmy", bot_user_id: "@timmy:homeserver.local", @@ -1473,7 +1471,7 @@ mod tests { fn show_cmd_with_root(root: &std::path::Path, args: &str) -> Option { let agents = test_agents(); let ambient_rooms = test_ambient_rooms(); - let room_id = make_room_id("!test:example.com"); + let room_id = "!test:example.com".to_string(); let dispatch = CommandDispatch { bot_name: "Timmy", bot_user_id: "@timmy:homeserver.local", diff --git a/server/src/matrix/config.rs b/server/src/matrix/config.rs index a7b14da..b475260 100644 --- a/server/src/matrix/config.rs +++ b/server/src/matrix/config.rs @@ -58,6 +58,18 @@ pub struct BotConfig { /// manually while the bot is running. #[serde(default)] pub ambient_rooms: Vec, + /// Chat transport to use: `"matrix"` (default) or `"whatsapp"`. + /// + /// Selects which [`ChatTransport`] implementation the bot uses for + /// sending and editing messages. Currently only read during bot + /// startup to select the transport; the field is kept for config + /// round-tripping. + #[serde(default = "default_transport")] + pub transport: String, +} + +fn default_transport() -> String { + "matrix".to_string() } impl BotConfig { @@ -509,4 +521,45 @@ ambient_rooms = ["!abc:example.com"] let config = BotConfig::load(tmp.path()).unwrap(); assert!(config.ambient_rooms.is_empty()); } + + #[test] + fn load_transport_defaults_to_matrix() { + let tmp = tempfile::tempdir().unwrap(); + let sk = tmp.path().join(".story_kit"); + fs::create_dir_all(&sk).unwrap(); + fs::write( + sk.join("bot.toml"), + r#" +homeserver = "https://matrix.example.com" +username = "@bot:example.com" +password = "secret" +room_ids = ["!abc:example.com"] +enabled = true +"#, + ) + .unwrap(); + let config = BotConfig::load(tmp.path()).unwrap(); + assert_eq!(config.transport, "matrix"); + } + + #[test] + fn load_transport_reads_custom_value() { + let tmp = tempfile::tempdir().unwrap(); + let sk = tmp.path().join(".story_kit"); + fs::create_dir_all(&sk).unwrap(); + fs::write( + sk.join("bot.toml"), + r#" +homeserver = "https://matrix.example.com" +username = "@bot:example.com" +password = "secret" +room_ids = ["!abc:example.com"] +enabled = true +transport = "whatsapp" +"#, + ) + .unwrap(); + let config = BotConfig::load(tmp.path()).unwrap(); + assert_eq!(config.transport, "whatsapp"); + } } diff --git a/server/src/matrix/htop.rs b/server/src/matrix/htop.rs index 28b5099..045ade3 100644 --- a/server/src/matrix/htop.rs +++ b/server/src/matrix/htop.rs @@ -10,15 +10,11 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; -use matrix_sdk::room::Room; -use matrix_sdk::ruma::OwnedEventId; -use matrix_sdk::ruma::events::room::message::{ - ReplacementMetadata, RoomMessageEventContent, RoomMessageEventContentWithoutRelation, -}; use tokio::sync::{Mutex as TokioMutex, watch}; use crate::agents::{AgentPool, AgentStatus}; use crate::slog; +use crate::transport::ChatTransport; use super::bot::markdown_to_html; @@ -39,7 +35,10 @@ pub struct HtopSession { } /// Per-room htop session map type alias. -pub type HtopSessions = Arc>>; +/// +/// Keys are platform-agnostic room ID strings (e.g. `"!abc:example.com"` on +/// Matrix) so this type works with any [`ChatTransport`] implementation. +pub type HtopSessions = Arc>>; /// Parse an htop command from a raw Matrix message body. /// @@ -253,40 +252,19 @@ pub fn build_htop_message(agents: &AgentPool, tick: u32, total_duration_secs: u6 lines.join("\n") } -// --------------------------------------------------------------------------- -// Matrix replacement helper -// --------------------------------------------------------------------------- - -/// Edit an existing Matrix message by sending a replacement event. -/// -/// Uses `RoomMessageEventContentWithoutRelation::make_replacement` with -/// `ReplacementMetadata` so the replacement carries the original event ID. -async fn send_replacement( - room: &Room, - original_event_id: &OwnedEventId, - plain: &str, - html: &str, -) -> Result<(), String> { - let new_content = - RoomMessageEventContentWithoutRelation::text_html(plain.to_string(), html.to_string()); - let metadata = ReplacementMetadata::new(original_event_id.clone(), None); - let content = new_content.make_replacement(metadata); - - room.send(content) - .await - .map(|_| ()) - .map_err(|e| format!("Matrix send error: {e}")) -} - // --------------------------------------------------------------------------- // Background monitoring loop // --------------------------------------------------------------------------- /// Run the htop background loop: update the message every 5 seconds until /// the stop signal is received or the timeout expires. +/// +/// Uses the [`ChatTransport`] abstraction so the loop works with any chat +/// platform, not just Matrix. pub async fn run_htop_loop( - room: Room, - initial_event_id: OwnedEventId, + transport: Arc, + room_id: String, + initial_message_id: String, agents: Arc, mut stop_rx: watch::Receiver, duration_secs: u64, @@ -303,7 +281,7 @@ pub async fn run_htop_loop( _ = &mut sleep => {} Ok(()) = stop_rx.changed() => { if *stop_rx.borrow() { - send_stopped_message(&room, &initial_event_id).await; + send_stopped_message(&*transport, &room_id, &initial_message_id).await; return; } } @@ -311,27 +289,27 @@ pub async fn run_htop_loop( // Re-check after waking — the sender might have signalled while we slept. if *stop_rx.borrow() { - send_stopped_message(&room, &initial_event_id).await; + send_stopped_message(&*transport, &room_id, &initial_message_id).await; return; } let text = build_htop_message(&agents, tick as u32, duration_secs); let html = markdown_to_html(&text); - if let Err(e) = send_replacement(&room, &initial_event_id, &text, &html).await { + if let Err(e) = transport.edit_message(&room_id, &initial_message_id, &text, &html).await { slog!("[htop] Failed to update message: {e}"); return; } } // Auto-stop: timeout reached. - send_stopped_message(&room, &initial_event_id).await; + send_stopped_message(&*transport, &room_id, &initial_message_id).await; } -async fn send_stopped_message(room: &Room, event_id: &OwnedEventId) { +async fn send_stopped_message(transport: &dyn ChatTransport, room_id: &str, message_id: &str) { let text = "**htop** — monitoring stopped."; let html = markdown_to_html(text); - if let Err(e) = send_replacement(room, event_id, text, &html).await { + if let Err(e) = transport.edit_message(room_id, message_id, text, &html).await { slog!("[htop] Failed to send stop message: {e}"); } } @@ -344,9 +322,11 @@ async fn send_stopped_message(room: &Room, event_id: &OwnedEventId) { /// /// Stops any existing session for the room, sends the initial dashboard /// message, and spawns a background task that edits it every 5 seconds. +/// +/// Uses the [`ChatTransport`] abstraction so htop works with any platform. pub async fn handle_htop_start( - room: &Room, - room_id: &matrix_sdk::ruma::OwnedRoomId, + transport: &Arc, + room_id: &str, htop_sessions: &HtopSessions, agents: Arc, duration_secs: u64, @@ -357,15 +337,8 @@ pub async fn handle_htop_start( // Send the initial message. let initial_text = build_htop_message(&agents, 0, duration_secs); let initial_html = markdown_to_html(&initial_text); - let send_result = room - .send(RoomMessageEventContent::text_html( - initial_text, - initial_html, - )) - .await; - - let event_id = match send_result { - Ok(r) => r.event_id, + let message_id = match transport.send_message(room_id, &initial_text, &initial_html).await { + Ok(id) => id, Err(e) => { slog!("[htop] Failed to send initial message: {e}"); return; @@ -376,18 +349,26 @@ pub async fn handle_htop_start( let (stop_tx, stop_rx) = watch::channel(false); { let mut sessions = htop_sessions.lock().await; - sessions.insert(room_id.clone(), HtopSession { stop_tx }); + sessions.insert(room_id.to_string(), HtopSession { stop_tx }); } // Spawn the background update loop. - let room_clone = room.clone(); + let transport_clone = Arc::clone(transport); let sessions_clone = Arc::clone(htop_sessions); - let room_id_clone = room_id.clone(); + let room_id_owned = room_id.to_string(); tokio::spawn(async move { - run_htop_loop(room_clone, event_id, agents, stop_rx, duration_secs).await; + run_htop_loop( + transport_clone, + room_id_owned.clone(), + message_id, + agents, + stop_rx, + duration_secs, + ) + .await; // Clean up the session entry when the loop exits naturally. let mut sessions = sessions_clone.lock().await; - sessions.remove(&room_id_clone); + sessions.remove(&room_id_owned); }); } @@ -396,18 +377,15 @@ pub async fn handle_htop_start( /// When there is no active session, sends a "no active session" reply /// to the room so the user knows the command was received. pub async fn handle_htop_stop( - room: &Room, - room_id: &matrix_sdk::ruma::OwnedRoomId, + transport: &dyn ChatTransport, + room_id: &str, htop_sessions: &HtopSessions, ) { let had_session = stop_existing_session(htop_sessions, room_id).await; if !had_session { let msg = "No active htop session in this room."; let html = markdown_to_html(msg); - if let Err(e) = room - .send(RoomMessageEventContent::text_html(msg, html)) - .await - { + if let Err(e) = transport.send_message(room_id, msg, &html).await { slog!("[htop] Failed to send no-session reply: {e}"); } } @@ -417,10 +395,7 @@ pub async fn handle_htop_stop( /// Signal and remove the existing session for `room_id`. /// /// Returns `true` if a session was found and stopped. -async fn stop_existing_session( - htop_sessions: &HtopSessions, - room_id: &matrix_sdk::ruma::OwnedRoomId, -) -> bool { +async fn stop_existing_session(htop_sessions: &HtopSessions, room_id: &str) -> bool { let mut sessions = htop_sessions.lock().await; if let Some(session) = sessions.remove(room_id) { // Signal the background task to stop (ignore error — task may be done). diff --git a/server/src/matrix/mod.rs b/server/src/matrix/mod.rs index d2725a9..6213ef1 100644 --- a/server/src/matrix/mod.rs +++ b/server/src/matrix/mod.rs @@ -21,6 +21,7 @@ mod config; pub mod delete; pub mod htop; pub mod notifications; +pub mod transport_impl; pub use config::BotConfig; diff --git a/server/src/matrix/notifications.rs b/server/src/matrix/notifications.rs index 7b32134..3ed17dc 100644 --- a/server/src/matrix/notifications.rs +++ b/server/src/matrix/notifications.rs @@ -6,10 +6,9 @@ use crate::io::story_metadata::parse_front_matter; use crate::io::watcher::WatcherEvent; use crate::slog; -use matrix_sdk::ruma::events::room::message::RoomMessageEventContent; -use matrix_sdk::ruma::OwnedRoomId; -use matrix_sdk::Client; +use crate::transport::ChatTransport; use std::path::{Path, PathBuf}; +use std::sync::Arc; use tokio::sync::broadcast; /// Human-readable display name for a pipeline stage directory. @@ -100,10 +99,11 @@ pub fn format_error_notification( } /// Spawn a background task that listens for watcher events and posts -/// stage-transition notifications to all configured Matrix rooms. +/// stage-transition notifications to all configured rooms via the +/// [`ChatTransport`] abstraction. pub fn spawn_notification_listener( - client: Client, - room_ids: Vec, + transport: Arc, + room_ids: Vec, watcher_rx: broadcast::Receiver, project_root: PathBuf, ) { @@ -133,14 +133,10 @@ pub fn spawn_notification_listener( slog!("[matrix-bot] Sending stage notification: {plain}"); for room_id in &room_ids { - if let Some(room) = client.get_room(room_id) { - let content = - RoomMessageEventContent::text_html(plain.clone(), html.clone()); - if let Err(e) = room.send(content).await { - slog!( - "[matrix-bot] Failed to send notification to {room_id}: {e}" - ); - } + if let Err(e) = transport.send_message(room_id, &plain, &html).await { + slog!( + "[matrix-bot] Failed to send notification to {room_id}: {e}" + ); } } } @@ -159,14 +155,10 @@ pub fn spawn_notification_listener( slog!("[matrix-bot] Sending error notification: {plain}"); for room_id in &room_ids { - if let Some(room) = client.get_room(room_id) { - let content = - RoomMessageEventContent::text_html(plain.clone(), html.clone()); - if let Err(e) = room.send(content).await { - slog!( - "[matrix-bot] Failed to send error notification to {room_id}: {e}" - ); - } + if let Err(e) = transport.send_message(room_id, &plain, &html).await { + slog!( + "[matrix-bot] Failed to send error notification to {room_id}: {e}" + ); } } } diff --git a/server/src/matrix/transport_impl.rs b/server/src/matrix/transport_impl.rs new file mode 100644 index 0000000..8e30ed1 --- /dev/null +++ b/server/src/matrix/transport_impl.rs @@ -0,0 +1,96 @@ +//! Matrix implementation of [`ChatTransport`]. +//! +//! Wraps a [`matrix_sdk::Client`] and delegates message sending / editing +//! to the Matrix SDK. + +use async_trait::async_trait; +use matrix_sdk::Client; +use matrix_sdk::ruma::OwnedRoomId; +use matrix_sdk::ruma::events::room::message::{ + ReplacementMetadata, RoomMessageEventContent, RoomMessageEventContentWithoutRelation, +}; + +use crate::transport::{ChatTransport, MessageId}; + +/// Matrix-backed [`ChatTransport`] implementation. +/// +/// Holds a [`Client`] and resolves room IDs at send time. +pub struct MatrixTransport { + client: Client, +} + +impl MatrixTransport { + pub fn new(client: Client) -> Self { + Self { client } + } +} + +#[async_trait] +impl ChatTransport for MatrixTransport { + async fn send_message( + &self, + room_id: &str, + plain: &str, + html: &str, + ) -> Result { + let room_id: OwnedRoomId = room_id + .parse() + .map_err(|e| format!("Invalid room ID '{room_id}': {e}"))?; + let room = self + .client + .get_room(&room_id) + .ok_or_else(|| format!("Room {room_id} not found in client state"))?; + + let content = RoomMessageEventContent::text_html(plain.to_string(), html.to_string()); + let resp = room + .send(content) + .await + .map_err(|e| format!("Matrix send error: {e}"))?; + + Ok(resp.event_id.to_string()) + } + + async fn edit_message( + &self, + room_id: &str, + original_message_id: &str, + plain: &str, + html: &str, + ) -> Result<(), String> { + let room_id: OwnedRoomId = room_id + .parse() + .map_err(|e| format!("Invalid room ID '{room_id}': {e}"))?; + let room = self + .client + .get_room(&room_id) + .ok_or_else(|| format!("Room {room_id} not found in client state"))?; + + let original_event_id = original_message_id + .parse() + .map_err(|e| format!("Invalid event ID '{original_message_id}': {e}"))?; + + let new_content = + RoomMessageEventContentWithoutRelation::text_html(plain.to_string(), html.to_string()); + let metadata = ReplacementMetadata::new(original_event_id, None); + let content = new_content.make_replacement(metadata); + + room.send(content) + .await + .map(|_| ()) + .map_err(|e| format!("Matrix edit error: {e}")) + } + + async fn send_typing(&self, room_id: &str, typing: bool) -> Result<(), String> { + let room_id: OwnedRoomId = room_id + .parse() + .map_err(|e| format!("Invalid room ID '{room_id}': {e}"))?; + let room = self + .client + .get_room(&room_id) + .ok_or_else(|| format!("Room {room_id} not found in client state"))?; + + room.typing_notice(typing) + .await + .map_err(|e| format!("Matrix typing indicator error: {e}")) + } +} diff --git a/server/src/transport.rs b/server/src/transport.rs new file mode 100644 index 0000000..169035b --- /dev/null +++ b/server/src/transport.rs @@ -0,0 +1,91 @@ +//! Transport abstraction for chat platforms. +//! +//! The [`ChatTransport`] trait defines a platform-agnostic interface for +//! sending and editing messages, allowing the bot logic (commands, htop, +//! notifications) to work against any chat platform — Matrix, WhatsApp, etc. + +use async_trait::async_trait; + +/// A platform-agnostic identifier for a sent message. +/// +/// On Matrix this is the event ID; on other platforms it may be a message ID +/// or similar opaque string. The transport implementation is responsible for +/// producing and consuming these identifiers. +pub type MessageId = String; + +/// A platform-agnostic identifier for a chat room / channel / conversation. +pub type RoomId = String; + +/// Abstraction over a chat platform's message-sending capabilities. +/// +/// Implementations must be `Send + Sync` so they can be shared across +/// async tasks via `Arc`. +#[async_trait] +pub trait ChatTransport: Send + Sync { + /// Send a plain-text + HTML message to a room. + /// + /// Returns the platform-specific message ID on success so it can be + /// referenced later (e.g. for edits or reply detection). + async fn send_message( + &self, + room_id: &str, + plain: &str, + html: &str, + ) -> Result; + + /// Edit a previously sent message. + /// + /// `original_message_id` is the [`MessageId`] returned by a prior + /// [`send_message`](ChatTransport::send_message) call. + /// + /// Platforms that do not support editing (e.g. WhatsApp) should send a + /// new message instead. + async fn edit_message( + &self, + room_id: &str, + original_message_id: &str, + plain: &str, + html: &str, + ) -> Result<(), String>; + + /// Signal that the bot is typing (or has stopped typing) in a room. + /// + /// Platforms that do not support typing indicators should no-op. + async fn send_typing(&self, room_id: &str, typing: bool) -> Result<(), String>; +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::Arc; + + /// Verify that the WhatsApp stub satisfies the ChatTransport trait and + /// can be used as `Arc`. + #[tokio::test] + async fn whatsapp_transport_satisfies_trait() { + let transport: Arc = + Arc::new(crate::whatsapp::WhatsAppTransport::new()); + + let msg_id = transport + .send_message("room-1", "hello", "

hello

") + .await + .unwrap(); + assert!(!msg_id.is_empty()); + + transport + .edit_message("room-1", &msg_id, "edited", "

edited

") + .await + .unwrap(); + + transport.send_typing("room-1", true).await.unwrap(); + transport.send_typing("room-1", false).await.unwrap(); + } + + /// MatrixTransport cannot be tested without a live homeserver, but we + /// can verify the type implements the trait at compile time. + #[test] + fn matrix_transport_is_send_sync() { + fn assert_send_sync() {} + assert_send_sync::(); + } +} diff --git a/server/src/whatsapp.rs b/server/src/whatsapp.rs new file mode 100644 index 0000000..b535a48 --- /dev/null +++ b/server/src/whatsapp.rs @@ -0,0 +1,117 @@ +//! WhatsApp stub implementation of [`ChatTransport`]. +//! +//! This is a placeholder transport that logs operations and returns stub +//! values. It exists to prove the transport abstraction works with a +//! second platform and will be replaced with a real WhatsApp Business API +//! integration in the future. + +use async_trait::async_trait; + +use crate::slog; +use crate::transport::{ChatTransport, MessageId}; + +/// Stub WhatsApp transport. +/// +/// All methods log the operation and return success with placeholder values. +/// Message editing is not supported by WhatsApp — `edit_message` sends a +/// new message instead (TODO: implement via WhatsApp Business API). +pub struct WhatsAppTransport { + /// Counter for generating unique stub message IDs. + next_id: std::sync::atomic::AtomicU64, +} + +impl WhatsAppTransport { + pub fn new() -> Self { + Self { + next_id: std::sync::atomic::AtomicU64::new(1), + } + } + + fn next_message_id(&self) -> String { + let id = self + .next_id + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + format!("whatsapp-stub-{id}") + } +} + +impl Default for WhatsAppTransport { + fn default() -> Self { + Self::new() + } +} + +#[async_trait] +impl ChatTransport for WhatsAppTransport { + async fn send_message( + &self, + room_id: &str, + plain: &str, + _html: &str, + ) -> Result { + // TODO: Send via WhatsApp Business API + let msg_id = self.next_message_id(); + slog!( + "[whatsapp-stub] send_message to {room_id}: {plain:.80} (id={msg_id})" + ); + Ok(msg_id) + } + + async fn edit_message( + &self, + room_id: &str, + original_message_id: &str, + plain: &str, + html: &str, + ) -> Result<(), String> { + // WhatsApp does not support message editing. + // Send a new message instead. + slog!( + "[whatsapp-stub] edit_message (original={original_message_id}) — \ + WhatsApp does not support edits, sending new message" + ); + self.send_message(room_id, plain, html).await.map(|_| ()) + } + + async fn send_typing(&self, room_id: &str, typing: bool) -> Result<(), String> { + // TODO: Send typing indicator via WhatsApp Business API + slog!("[whatsapp-stub] send_typing to {room_id}: typing={typing}"); + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn send_message_returns_unique_ids() { + let transport = WhatsAppTransport::new(); + let id1 = transport + .send_message("room1", "hello", "

hello

") + .await + .unwrap(); + let id2 = transport + .send_message("room1", "world", "

world

") + .await + .unwrap(); + assert_ne!(id1, id2, "each message should get a unique ID"); + assert!(id1.starts_with("whatsapp-stub-")); + } + + #[tokio::test] + async fn edit_message_succeeds() { + let transport = WhatsAppTransport::new(); + let result = transport + .edit_message("room1", "msg-1", "updated", "

updated

") + .await; + assert!(result.is_ok(), "edit should succeed (sends new message)"); + } + + #[tokio::test] + async fn send_typing_succeeds() { + let transport = WhatsAppTransport::new(); + assert!(transport.send_typing("room1", true).await.is_ok()); + assert!(transport.send_typing("room1", false).await.is_ok()); + } +}