diff --git a/server/src/matrix/bot.rs b/server/src/matrix/bot.rs index e61f7d0..daf0db6 100644 --- a/server/src/matrix/bot.rs +++ b/server/src/matrix/bot.rs @@ -172,6 +172,9 @@ pub struct BotContext { pub ambient_rooms: Arc>>, /// Agent pool for checking agent availability. pub agents: Arc, + /// Per-room htop monitoring sessions. Keyed by room ID; each entry holds + /// a stop-signal sender that the background task watches. + pub htop_sessions: super::htop::HtopSessions, } // --------------------------------------------------------------------------- @@ -376,6 +379,7 @@ pub async fn run_bot( bot_name, ambient_rooms: Arc::new(std::sync::Mutex::new(persisted_ambient)), agents, + htop_sessions: Arc::new(TokioMutex::new(HashMap::new())), }; slog!("[matrix-bot] Cryptographic identity verification is always ON — commands from unencrypted rooms or unverified devices are rejected"); @@ -799,6 +803,30 @@ async fn on_room_message( return; } + // Check for the htop command, which requires async Matrix access (Room) + // and cannot be handled by the sync command registry. + if let Some(htop_cmd) = + super::htop::extract_htop_command(&user_message, &ctx.bot_name, ctx.bot_user_id.as_str()) + { + slog!("[matrix-bot] Handling htop command from {sender}: {htop_cmd:?}"); + match htop_cmd { + super::htop::HtopCommand::Stop => { + super::htop::handle_htop_stop(&room, &incoming_room_id, &ctx.htop_sessions).await; + } + super::htop::HtopCommand::Start { duration_secs } => { + super::htop::handle_htop_start( + &room, + &incoming_room_id, + &ctx.htop_sessions, + Arc::clone(&ctx.agents), + duration_secs, + ) + .await; + } + } + return; + } + // Spawn a separate task so the Matrix sync loop is not blocked while we // wait for the LLM response (which can take several seconds). tokio::spawn(async move { @@ -1309,6 +1337,7 @@ mod tests { bot_name: "Assistant".to_string(), ambient_rooms: Arc::new(std::sync::Mutex::new(HashSet::new())), agents: Arc::new(AgentPool::new_test(3000)), + htop_sessions: Arc::new(TokioMutex::new(HashMap::new())), }; // Clone must work (required by Matrix SDK event handler injection). let _cloned = ctx.clone(); diff --git a/server/src/matrix/commands.rs b/server/src/matrix/commands.rs index e83501c..85cec8c 100644 --- a/server/src/matrix/commands.rs +++ b/server/src/matrix/commands.rs @@ -93,6 +93,11 @@ pub fn commands() -> &'static [BotCommand] { description: "Show git status: branch, uncommitted changes, and ahead/behind remote", handler: handle_git, }, + BotCommand { + name: "htop", + description: "Show live system and agent process dashboard (`htop`, `htop 10m`, `htop stop`)", + handler: handle_htop_fallback, + }, ] } @@ -442,6 +447,16 @@ fn handle_git(ctx: &CommandContext) -> Option { Some(out) } +/// Fallback handler for the `htop` command when it is not intercepted by the +/// async handler in `on_room_message`. In practice this is never called — +/// htop is detected and handled before `try_handle_command` is invoked. +/// The entry exists in the registry only so `help` lists it. +/// +/// Returns `None` to prevent the LLM from receiving "htop" as a prompt. +fn handle_htop_fallback(_ctx: &CommandContext) -> Option { + None +} + // --------------------------------------------------------------------------- // Tests // --------------------------------------------------------------------------- @@ -726,7 +741,7 @@ mod tests { ); } - // -- help lists status and ambient -------------------------------------- + // -- help lists status, ambient, and htop -------------------------------- #[test] fn help_output_includes_status() { @@ -742,6 +757,24 @@ mod tests { assert!(output.contains("ambient"), "help should list ambient command: {output}"); } + #[test] + fn help_output_includes_htop() { + let result = try_cmd_addressed("Timmy", "@timmy:homeserver.local", "@timmy help"); + let output = result.unwrap(); + assert!(output.contains("htop"), "help should list htop command: {output}"); + } + + #[test] + fn htop_command_falls_through_to_none() { + // The htop handler returns None so the message is handled asynchronously + // in on_room_message, not here. try_handle_command must return None. + let result = try_cmd_addressed("Timmy", "@timmy:homeserver.local", "@timmy htop"); + assert!( + result.is_none(), + "htop should not produce a sync response (handled async): {result:?}" + ); + } + // -- strip_prefix_ci ---------------------------------------------------- #[test] diff --git a/server/src/matrix/htop.rs b/server/src/matrix/htop.rs new file mode 100644 index 0000000..164ca29 --- /dev/null +++ b/server/src/matrix/htop.rs @@ -0,0 +1,572 @@ +//! htop command: live-updating system and agent process dashboard. +//! +//! Sends an initial message to a Matrix room showing load average and +//! per-agent process info, then edits it in-place every 5 seconds using +//! Matrix replacement events. A single htop session per room is enforced; +//! a new `htop` invocation stops any existing session and starts a fresh one. +//! Sessions auto-stop after the configured duration (default 5 minutes). + +use std::collections::HashMap; +use std::sync::Arc; +use std::time::Duration; + +use matrix_sdk::room::Room; +use matrix_sdk::ruma::OwnedEventId; +use matrix_sdk::ruma::events::room::message::{ + ReplacementMetadata, RoomMessageEventContent, RoomMessageEventContentWithoutRelation, +}; +use tokio::sync::{Mutex as TokioMutex, watch}; + +use crate::agents::{AgentPool, AgentStatus}; +use crate::slog; + +use super::bot::markdown_to_html; + +/// A parsed htop command from a Matrix message body. +#[derive(Debug, PartialEq)] +pub enum HtopCommand { + /// Start (or restart) monitoring. `duration_secs` is the auto-stop + /// timeout; defaults to 300 (5 minutes). + Start { duration_secs: u64 }, + /// Stop any active monitoring session for the room. + Stop, +} + +/// Per-room htop session: holds the stop-signal sender so callers can cancel. +pub struct HtopSession { + /// Send `true` to request a graceful stop of the background loop. + pub stop_tx: watch::Sender, +} + +/// Per-room htop session map type alias. +pub type HtopSessions = Arc>>; + +/// Parse an htop command from a raw Matrix message body. +/// +/// Strips the bot mention prefix and checks whether the first word is `htop`. +/// Returns `None` when the message is not an htop command. +/// +/// Recognised forms (after stripping the bot mention): +/// - `htop` → `Start { duration_secs: 300 }` +/// - `htop stop` → `Stop` +/// - `htop 10m` → `Start { duration_secs: 600 }` +/// - `htop 120` → `Start { duration_secs: 120 }` (bare seconds) +pub fn extract_htop_command(message: &str, bot_name: &str, bot_user_id: &str) -> Option { + let stripped = strip_mention(message, bot_name, bot_user_id); + let trimmed = stripped.trim(); + + // Strip leading punctuation (e.g. the comma in "@timmy, htop") + let trimmed = trimmed.trim_start_matches(|c: char| !c.is_alphanumeric()); + + let (cmd, args) = match trimmed.split_once(char::is_whitespace) { + Some((c, a)) => (c, a.trim()), + None => (trimmed, ""), + }; + + if !cmd.eq_ignore_ascii_case("htop") { + return None; + } + + if args.eq_ignore_ascii_case("stop") { + return Some(HtopCommand::Stop); + } + + let duration_secs = parse_duration(args).unwrap_or(300); + Some(HtopCommand::Start { duration_secs }) +} + +/// Parse an optional duration argument. +/// +/// Accepts `""` (empty → `None`), `"5m"` / `"10M"` (minutes), or a bare +/// integer interpreted as seconds. +fn parse_duration(s: &str) -> Option { + if s.is_empty() { + return None; + } + if let Some(mins_str) = s.strip_suffix('m').or_else(|| s.strip_suffix('M')) { + return mins_str.parse::().ok().map(|m| m * 60); + } + s.parse::().ok() +} + +/// Strip the bot mention prefix from a raw Matrix message body. +/// +/// Mirrors the logic in `commands::strip_bot_mention` so htop detection works +/// without depending on private symbols in that module. +fn strip_mention<'a>(message: &'a str, bot_name: &str, bot_user_id: &str) -> &'a str { + let trimmed = message.trim(); + + if let Some(rest) = strip_prefix_ci(trimmed, bot_user_id) { + return rest; + } + if let Some(localpart) = bot_user_id.split(':').next() + && let Some(rest) = strip_prefix_ci(trimmed, localpart) + { + return rest; + } + if let Some(rest) = strip_prefix_ci(trimmed, bot_name) { + return rest; + } + trimmed +} + +fn strip_prefix_ci<'a>(text: &'a str, prefix: &str) -> Option<&'a str> { + if text.len() < prefix.len() { + return None; + } + if !text[..prefix.len()].eq_ignore_ascii_case(prefix) { + return None; + } + let rest = &text[prefix.len()..]; + match rest.chars().next() { + None => Some(rest), + Some(c) if c.is_alphanumeric() || c == '-' || c == '_' => None, + _ => Some(rest), + } +} + +// --------------------------------------------------------------------------- +// System stats +// --------------------------------------------------------------------------- + +/// Read the system load average using the `uptime` command. +/// +/// Returns a short string like `"load average: 1.23, 0.98, 0.75"` on success, +/// or `"load: unknown"` on failure. +fn get_load_average() -> String { + let output = std::process::Command::new("uptime") + .output() + .ok() + .and_then(|o| String::from_utf8(o.stdout).ok()) + .unwrap_or_default(); + + // uptime output typically contains "load average: X, Y, Z" (Linux/macOS) + // or "load averages: X Y Z" (some BSD variants). + if let Some(idx) = output.find("load average") { + output[idx..].trim().trim_end_matches('\n').to_string() + } else { + "load: unknown".to_string() + } +} + +/// Process stats for a single agent, gathered from `ps`. +#[derive(Debug, Default)] +struct AgentProcessStats { + cpu_pct: f64, + mem_pct: f64, + num_procs: usize, +} + +/// Gather CPU% and MEM% for processes whose command line contains `worktree_path`. +/// +/// Runs `ps aux` and sums all matching lines. Returns `None` when no +/// matching process is found. +fn gather_process_stats(worktree_path: &str) -> Option { + let output = std::process::Command::new("ps") + .args(["aux"]) + .output() + .ok() + .and_then(|o| String::from_utf8(o.stdout).ok())?; + + let mut stats = AgentProcessStats::default(); + + for line in output.lines().skip(1) { + // Avoid matching against our own status display (the ps command itself) + if !line.contains(worktree_path) { + continue; + } + let parts: Vec<&str> = line.split_whitespace().collect(); + // ps aux columns: USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND... + if parts.len() >= 4 + && let (Ok(cpu), Ok(mem)) = (parts[2].parse::(), parts[3].parse::()) + { + stats.cpu_pct += cpu; + stats.mem_pct += mem; + stats.num_procs += 1; + } + } + + if stats.num_procs > 0 { + Some(stats) + } else { + None + } +} + +// --------------------------------------------------------------------------- +// Message formatting +// --------------------------------------------------------------------------- + +/// Build the Markdown text for the htop dashboard. +/// +/// `tick` is the number of updates sent so far (0 = initial). +/// `total_duration_secs` is the configured auto-stop timeout. +pub fn build_htop_message(agents: &AgentPool, tick: u32, total_duration_secs: u64) -> String { + let elapsed_secs = (tick as u64) * 5; + let remaining_secs = total_duration_secs.saturating_sub(elapsed_secs); + let remaining_mins = remaining_secs / 60; + let remaining_secs_rem = remaining_secs % 60; + + let load = get_load_average(); + + let mut lines = vec![ + format!("**htop** — {load}"), + format!( + "*Updates every 5s · auto-stops in {}m{}s · send `htop stop` to stop*", + remaining_mins, remaining_secs_rem + ), + String::new(), + ]; + + let all_agents = agents.list_agents().unwrap_or_default(); + let active: Vec<_> = all_agents + .iter() + .filter(|a| matches!(a.status, AgentStatus::Running | AgentStatus::Pending)) + .collect(); + + if active.is_empty() { + lines.push("*No agents currently running.*".to_string()); + } else { + lines.push("| Agent | Story | CPU% | MEM% | Procs |".to_string()); + lines.push("|-------|-------|-----:|-----:|------:|".to_string()); + for agent in &active { + let story_label = agent + .story_id + .split('_') + .next() + .unwrap_or(&agent.story_id) + .to_string(); + let stats = agent + .worktree_path + .as_deref() + .and_then(gather_process_stats) + .unwrap_or_default(); + lines.push(format!( + "| {} | {} | {:.1} | {:.1} | {} |", + agent.agent_name, + story_label, + stats.cpu_pct, + stats.mem_pct, + stats.num_procs, + )); + } + } + + lines.join("\n") +} + +// --------------------------------------------------------------------------- +// Matrix replacement helper +// --------------------------------------------------------------------------- + +/// Edit an existing Matrix message by sending a replacement event. +/// +/// Uses `RoomMessageEventContentWithoutRelation::make_replacement` with +/// `ReplacementMetadata` so the replacement carries the original event ID. +async fn send_replacement( + room: &Room, + original_event_id: &OwnedEventId, + plain: &str, + html: &str, +) -> Result<(), String> { + let new_content = + RoomMessageEventContentWithoutRelation::text_html(plain.to_string(), html.to_string()); + let metadata = ReplacementMetadata::new(original_event_id.clone(), None); + let content = new_content.make_replacement(metadata); + + room.send(content) + .await + .map(|_| ()) + .map_err(|e| format!("Matrix send error: {e}")) +} + +// --------------------------------------------------------------------------- +// Background monitoring loop +// --------------------------------------------------------------------------- + +/// Run the htop background loop: update the message every 5 seconds until +/// the stop signal is received or the timeout expires. +pub async fn run_htop_loop( + room: Room, + initial_event_id: OwnedEventId, + agents: Arc, + mut stop_rx: watch::Receiver, + duration_secs: u64, +) { + let interval_secs: u64 = 5; + let max_ticks = (duration_secs / interval_secs).max(1); + + for tick in 1..=max_ticks { + // Wait for the interval or a stop signal. + let sleep = tokio::time::sleep(Duration::from_secs(interval_secs)); + tokio::pin!(sleep); + + tokio::select! { + _ = &mut sleep => {} + Ok(()) = stop_rx.changed() => { + if *stop_rx.borrow() { + send_stopped_message(&room, &initial_event_id).await; + return; + } + } + } + + // Re-check after waking — the sender might have signalled while we slept. + if *stop_rx.borrow() { + send_stopped_message(&room, &initial_event_id).await; + return; + } + + let text = build_htop_message(&agents, tick as u32, duration_secs); + let html = markdown_to_html(&text); + + if let Err(e) = send_replacement(&room, &initial_event_id, &text, &html).await { + slog!("[htop] Failed to update message: {e}"); + return; + } + } + + // Auto-stop: timeout reached. + send_stopped_message(&room, &initial_event_id).await; +} + +async fn send_stopped_message(room: &Room, event_id: &OwnedEventId) { + let text = "**htop** — monitoring stopped."; + let html = markdown_to_html(text); + if let Err(e) = send_replacement(room, event_id, text, &html).await { + slog!("[htop] Failed to send stop message: {e}"); + } +} + +// --------------------------------------------------------------------------- +// Public command handlers (called from on_room_message in bot.rs) +// --------------------------------------------------------------------------- + +/// Start a new htop monitoring session for `room_id`. +/// +/// Stops any existing session for the room, sends the initial dashboard +/// message, and spawns a background task that edits it every 5 seconds. +pub async fn handle_htop_start( + room: &Room, + room_id: &matrix_sdk::ruma::OwnedRoomId, + htop_sessions: &HtopSessions, + agents: Arc, + duration_secs: u64, +) { + // Stop any existing session (best-effort; ignore errors if already done). + stop_existing_session(htop_sessions, room_id).await; + + // Send the initial message. + let initial_text = build_htop_message(&agents, 0, duration_secs); + let initial_html = markdown_to_html(&initial_text); + let send_result = room + .send(RoomMessageEventContent::text_html( + initial_text, + initial_html, + )) + .await; + + let event_id = match send_result { + Ok(r) => r.event_id, + Err(e) => { + slog!("[htop] Failed to send initial message: {e}"); + return; + } + }; + + // Create the stop channel and register the session. + let (stop_tx, stop_rx) = watch::channel(false); + { + let mut sessions = htop_sessions.lock().await; + sessions.insert(room_id.clone(), HtopSession { stop_tx }); + } + + // Spawn the background update loop. + let room_clone = room.clone(); + let sessions_clone = Arc::clone(htop_sessions); + let room_id_clone = room_id.clone(); + tokio::spawn(async move { + run_htop_loop(room_clone, event_id, agents, stop_rx, duration_secs).await; + // Clean up the session entry when the loop exits naturally. + let mut sessions = sessions_clone.lock().await; + sessions.remove(&room_id_clone); + }); +} + +/// Stop the active htop session for `room_id`, if any. +/// +/// When there is no active session, sends a "no active session" reply +/// to the room so the user knows the command was received. +pub async fn handle_htop_stop( + room: &Room, + room_id: &matrix_sdk::ruma::OwnedRoomId, + htop_sessions: &HtopSessions, +) { + let had_session = stop_existing_session(htop_sessions, room_id).await; + if !had_session { + let msg = "No active htop session in this room."; + let html = markdown_to_html(msg); + if let Err(e) = room + .send(RoomMessageEventContent::text_html(msg, html)) + .await + { + slog!("[htop] Failed to send no-session reply: {e}"); + } + } + // When a session was active, the background task handles the final edit. +} + +/// Signal and remove the existing session for `room_id`. +/// +/// Returns `true` if a session was found and stopped. +async fn stop_existing_session( + htop_sessions: &HtopSessions, + room_id: &matrix_sdk::ruma::OwnedRoomId, +) -> bool { + let mut sessions = htop_sessions.lock().await; + if let Some(session) = sessions.remove(room_id) { + // Signal the background task to stop (ignore error — task may be done). + let _ = session.stop_tx.send(true); + true + } else { + false + } +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use super::*; + + // -- extract_htop_command ----------------------------------------------- + + #[test] + fn htop_bare_command() { + let cmd = extract_htop_command("@timmy htop", "Timmy", "@timmy:homeserver.local"); + assert_eq!(cmd, Some(HtopCommand::Start { duration_secs: 300 })); + } + + #[test] + fn htop_with_display_name() { + let cmd = extract_htop_command("Timmy htop", "Timmy", "@timmy:homeserver.local"); + assert_eq!(cmd, Some(HtopCommand::Start { duration_secs: 300 })); + } + + #[test] + fn htop_stop() { + let cmd = extract_htop_command("@timmy htop stop", "Timmy", "@timmy:homeserver.local"); + assert_eq!(cmd, Some(HtopCommand::Stop)); + } + + #[test] + fn htop_duration_minutes() { + let cmd = extract_htop_command("@timmy htop 10m", "Timmy", "@timmy:homeserver.local"); + assert_eq!(cmd, Some(HtopCommand::Start { duration_secs: 600 })); + } + + #[test] + fn htop_duration_uppercase_m() { + let cmd = extract_htop_command("@timmy htop 2M", "Timmy", "@timmy:homeserver.local"); + assert_eq!(cmd, Some(HtopCommand::Start { duration_secs: 120 })); + } + + #[test] + fn htop_duration_seconds() { + let cmd = extract_htop_command("@timmy htop 90", "Timmy", "@timmy:homeserver.local"); + assert_eq!(cmd, Some(HtopCommand::Start { duration_secs: 90 })); + } + + #[test] + fn non_htop_command_returns_none() { + let cmd = extract_htop_command("@timmy status", "Timmy", "@timmy:homeserver.local"); + assert!(cmd.is_none()); + } + + #[test] + fn unrelated_message_returns_none() { + let cmd = extract_htop_command("hello world", "Timmy", "@timmy:homeserver.local"); + assert!(cmd.is_none()); + } + + #[test] + fn htop_case_insensitive() { + let cmd = extract_htop_command("@timmy HTOP", "Timmy", "@timmy:homeserver.local"); + assert_eq!(cmd, Some(HtopCommand::Start { duration_secs: 300 })); + } + + #[test] + fn htop_full_user_id() { + let cmd = extract_htop_command( + "@timmy:homeserver.local htop", + "Timmy", + "@timmy:homeserver.local", + ); + assert_eq!(cmd, Some(HtopCommand::Start { duration_secs: 300 })); + } + + #[test] + fn htop_with_comma_after_mention() { + // Some Matrix clients format mentions as "@timmy, htop" + let cmd = extract_htop_command("@timmy, htop", "Timmy", "@timmy:homeserver.local"); + assert_eq!(cmd, Some(HtopCommand::Start { duration_secs: 300 })); + } + + // -- parse_duration ----------------------------------------------------- + + #[test] + fn parse_duration_empty_returns_none() { + assert_eq!(parse_duration(""), None); + } + + #[test] + fn parse_duration_minutes() { + assert_eq!(parse_duration("5m"), Some(300)); + } + + #[test] + fn parse_duration_seconds() { + assert_eq!(parse_duration("120"), Some(120)); + } + + #[test] + fn parse_duration_invalid_returns_none() { + assert_eq!(parse_duration("abc"), None); + } + + // -- build_htop_message ------------------------------------------------- + + #[test] + fn build_htop_message_no_agents() { + let pool = Arc::new(crate::agents::AgentPool::new_test(3000)); + let text = build_htop_message(&pool, 0, 300); + assert!(text.contains("htop"), "should mention htop: {text}"); + assert!( + text.contains("No agents currently running"), + "should note no agents: {text}" + ); + } + + #[test] + fn build_htop_message_contains_load() { + let pool = Arc::new(crate::agents::AgentPool::new_test(3000)); + let text = build_htop_message(&pool, 0, 300); + // Load average is gathered via `uptime`; it should appear in some form. + assert!( + text.contains("load"), + "message should contain load info: {text}" + ); + } + + #[test] + fn build_htop_message_shows_remaining_time() { + let pool = Arc::new(crate::agents::AgentPool::new_test(3000)); + let text = build_htop_message(&pool, 0, 300); + assert!( + text.contains("auto-stops in"), + "should show remaining time: {text}" + ); + } +} diff --git a/server/src/matrix/mod.rs b/server/src/matrix/mod.rs index 474164d..f09ffad 100644 --- a/server/src/matrix/mod.rs +++ b/server/src/matrix/mod.rs @@ -18,6 +18,7 @@ mod bot; pub mod commands; mod config; +pub mod htop; pub mod notifications; pub use config::BotConfig;