//! htop command: live-updating system and agent process dashboard. //! //! Sends an initial message to a Matrix room showing load average and //! per-agent process info, then edits it in-place every 5 seconds using //! Matrix replacement events. A single htop session per room is enforced; //! a new `htop` invocation stops any existing session and starts a fresh one. //! Sessions auto-stop after the configured duration (default 5 minutes). use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; use tokio::sync::{Mutex as TokioMutex, watch}; use crate::agents::{AgentPool, AgentStatus}; use crate::slog; use crate::transport::ChatTransport; use super::bot::markdown_to_html; /// A parsed htop command from a Matrix message body. #[derive(Debug, PartialEq)] pub enum HtopCommand { /// Start (or restart) monitoring. `duration_secs` is the auto-stop /// timeout; defaults to 300 (5 minutes). Start { duration_secs: u64 }, /// Stop any active monitoring session for the room. Stop, } /// Per-room htop session: holds the stop-signal sender so callers can cancel. pub struct HtopSession { /// Send `true` to request a graceful stop of the background loop. pub stop_tx: watch::Sender, } /// Per-room htop session map type alias. /// /// Keys are platform-agnostic room ID strings (e.g. `"!abc:example.com"` on /// Matrix) so this type works with any [`ChatTransport`] implementation. pub type HtopSessions = Arc>>; /// Parse an htop command from a raw Matrix message body. /// /// Strips the bot mention prefix and checks whether the first word is `htop`. /// Returns `None` when the message is not an htop command. 
///
/// Recognised forms (after stripping the bot mention):
/// - `htop` → `Start { duration_secs: 300 }`
/// - `htop stop` → `Stop`
/// - `htop 10m` → `Start { duration_secs: 600 }`
/// - `htop 120` → `Start { duration_secs: 120 }` (bare seconds)
///
/// Any other argument (including an unparsable duration) falls back to the
/// 300-second default.
pub fn extract_htop_command(
    message: &str,
    bot_name: &str,
    bot_user_id: &str,
) -> Option<HtopCommand> {
    let stripped = strip_mention(message, bot_name, bot_user_id);

    // Strip surrounding whitespace and leading punctuation
    // (e.g. the comma in "@timmy, htop").
    let trimmed = stripped
        .trim()
        .trim_start_matches(|c: char| !c.is_alphanumeric());

    // First word is the command; everything after it is the argument string.
    let (cmd, args) = match trimmed.split_once(char::is_whitespace) {
        Some((c, a)) => (c, a.trim()),
        None => (trimmed, ""),
    };

    if !cmd.eq_ignore_ascii_case("htop") {
        return None;
    }

    if args.eq_ignore_ascii_case("stop") {
        return Some(HtopCommand::Stop);
    }

    let duration_secs = parse_duration(args).unwrap_or(300);
    Some(HtopCommand::Start { duration_secs })
}

/// Parse an optional duration argument.
///
/// Accepts `""` (empty → `None`), `"5m"` / `"10M"` (minutes), or a bare
/// integer interpreted as seconds. Returns `None` for anything that does not
/// parse as an unsigned integer, and for minute values so large that the
/// conversion to seconds would overflow `u64`.
fn parse_duration(s: &str) -> Option<u64> {
    if s.is_empty() {
        return None;
    }
    match s.strip_suffix('m').or_else(|| s.strip_suffix('M')) {
        // checked_mul: a huge user-supplied minute count must not wrap or panic.
        Some(mins_str) => mins_str.parse::<u64>().ok()?.checked_mul(60),
        None => s.parse::<u64>().ok(),
    }
}

/// Strip the bot mention prefix from a raw Matrix message body.
///
/// Mirrors the logic in `commands::strip_bot_mention` so htop detection works
/// without depending on private symbols in that module.
fn strip_mention<'a>(message: &'a str, bot_name: &str, bot_user_id: &str) -> &'a str { let trimmed = message.trim(); if let Some(rest) = strip_prefix_ci(trimmed, bot_user_id) { return rest; } if let Some(localpart) = bot_user_id.split(':').next() && let Some(rest) = strip_prefix_ci(trimmed, localpart) { return rest; } if let Some(rest) = strip_prefix_ci(trimmed, bot_name) { return rest; } trimmed } fn strip_prefix_ci<'a>(text: &'a str, prefix: &str) -> Option<&'a str> { if text.len() < prefix.len() { return None; } if !text[..prefix.len()].eq_ignore_ascii_case(prefix) { return None; } let rest = &text[prefix.len()..]; match rest.chars().next() { None => Some(rest), Some(c) if c.is_alphanumeric() || c == '-' || c == '_' => None, _ => Some(rest), } } // --------------------------------------------------------------------------- // System stats // --------------------------------------------------------------------------- /// Read the system load average using the `uptime` command. /// /// Returns a short string like `"load average: 1.23, 0.98, 0.75"` on success, /// or `"load: unknown"` on failure. fn get_load_average() -> String { let output = std::process::Command::new("uptime") .output() .ok() .and_then(|o| String::from_utf8(o.stdout).ok()) .unwrap_or_default(); // uptime output typically contains "load average: X, Y, Z" (Linux/macOS) // or "load averages: X Y Z" (some BSD variants). if let Some(idx) = output.find("load average") { output[idx..].trim().trim_end_matches('\n').to_string() } else { "load: unknown".to_string() } } /// Process stats for a single agent, gathered from `ps`. #[derive(Debug, Default)] struct AgentProcessStats { cpu_pct: f64, mem_pct: f64, num_procs: usize, } /// Gather CPU% and MEM% for processes whose command line contains `worktree_path`. /// /// Runs `ps aux` and sums all matching lines. Returns `None` when no /// matching process is found. 
fn gather_process_stats(worktree_path: &str) -> Option { let output = std::process::Command::new("ps") .args(["aux"]) .output() .ok() .and_then(|o| String::from_utf8(o.stdout).ok())?; let mut stats = AgentProcessStats::default(); for line in output.lines().skip(1) { // Avoid matching against our own status display (the ps command itself) if !line.contains(worktree_path) { continue; } let parts: Vec<&str> = line.split_whitespace().collect(); // ps aux columns: USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND... if parts.len() >= 4 && let (Ok(cpu), Ok(mem)) = (parts[2].parse::(), parts[3].parse::()) { stats.cpu_pct += cpu; stats.mem_pct += mem; stats.num_procs += 1; } } if stats.num_procs > 0 { Some(stats) } else { None } } // --------------------------------------------------------------------------- // Message formatting // --------------------------------------------------------------------------- /// Build the Markdown text for the htop dashboard. /// /// `tick` is the number of updates sent so far (0 = initial). /// `total_duration_secs` is the configured auto-stop timeout. /// /// Output uses a compact single-line format per agent so it renders /// without wrapping on narrow screens (~40 chars), such as mobile /// Matrix clients. 
pub fn build_htop_message(agents: &AgentPool, tick: u32, total_duration_secs: u64) -> String { let elapsed_secs = (tick as u64) * 5; let remaining_secs = total_duration_secs.saturating_sub(elapsed_secs); let remaining_mins = remaining_secs / 60; let remaining_secs_rem = remaining_secs % 60; let load = get_load_average(); let mut lines = vec![ format!( "**htop** · auto-stops in {}m{}s", remaining_mins, remaining_secs_rem ), load, String::new(), ]; let all_agents = agents.list_agents().unwrap_or_default(); let active: Vec<_> = all_agents .iter() .filter(|a| matches!(a.status, AgentStatus::Running | AgentStatus::Pending)) .collect(); if active.is_empty() { lines.push("*No agents currently running.*".to_string()); } else { for agent in &active { let story_label = agent .story_id .split('_') .next() .unwrap_or(&agent.story_id) .to_string(); let stats = agent .worktree_path .as_deref() .and_then(gather_process_stats) .unwrap_or_default(); lines.push(format!( "**{}** #{} cpu:{:.1}% mem:{:.1}%", agent.agent_name, story_label, stats.cpu_pct, stats.mem_pct, )); } } lines.join("\n") } // --------------------------------------------------------------------------- // Background monitoring loop // --------------------------------------------------------------------------- /// Run the htop background loop: update the message every 5 seconds until /// the stop signal is received or the timeout expires. /// /// Uses the [`ChatTransport`] abstraction so the loop works with any chat /// platform, not just Matrix. pub async fn run_htop_loop( transport: Arc, room_id: String, initial_message_id: String, agents: Arc, mut stop_rx: watch::Receiver, duration_secs: u64, ) { let interval_secs: u64 = 5; let max_ticks = (duration_secs / interval_secs).max(1); for tick in 1..=max_ticks { // Wait for the interval or a stop signal. let sleep = tokio::time::sleep(Duration::from_secs(interval_secs)); tokio::pin!(sleep); tokio::select! 
{ _ = &mut sleep => {} Ok(()) = stop_rx.changed() => { if *stop_rx.borrow() { send_stopped_message(&*transport, &room_id, &initial_message_id).await; return; } } } // Re-check after waking — the sender might have signalled while we slept. if *stop_rx.borrow() { send_stopped_message(&*transport, &room_id, &initial_message_id).await; return; } let text = build_htop_message(&agents, tick as u32, duration_secs); let html = markdown_to_html(&text); if let Err(e) = transport.edit_message(&room_id, &initial_message_id, &text, &html).await { slog!("[htop] Failed to update message: {e}"); return; } } // Auto-stop: timeout reached. send_stopped_message(&*transport, &room_id, &initial_message_id).await; } async fn send_stopped_message(transport: &dyn ChatTransport, room_id: &str, message_id: &str) { let text = "**htop** — monitoring stopped."; let html = markdown_to_html(text); if let Err(e) = transport.edit_message(room_id, message_id, text, &html).await { slog!("[htop] Failed to send stop message: {e}"); } } // --------------------------------------------------------------------------- // Public command handlers (called from on_room_message in bot.rs) // --------------------------------------------------------------------------- /// Start a new htop monitoring session for `room_id`. /// /// Stops any existing session for the room, sends the initial dashboard /// message, and spawns a background task that edits it every 5 seconds. /// /// Uses the [`ChatTransport`] abstraction so htop works with any platform. pub async fn handle_htop_start( transport: &Arc, room_id: &str, htop_sessions: &HtopSessions, agents: Arc, duration_secs: u64, ) { // Stop any existing session (best-effort; ignore errors if already done). stop_existing_session(htop_sessions, room_id).await; // Send the initial message. 
let initial_text = build_htop_message(&agents, 0, duration_secs); let initial_html = markdown_to_html(&initial_text); let message_id = match transport.send_message(room_id, &initial_text, &initial_html).await { Ok(id) => id, Err(e) => { slog!("[htop] Failed to send initial message: {e}"); return; } }; // Create the stop channel and register the session. let (stop_tx, stop_rx) = watch::channel(false); { let mut sessions = htop_sessions.lock().await; sessions.insert(room_id.to_string(), HtopSession { stop_tx }); } // Spawn the background update loop. let transport_clone = Arc::clone(transport); let sessions_clone = Arc::clone(htop_sessions); let room_id_owned = room_id.to_string(); tokio::spawn(async move { run_htop_loop( transport_clone, room_id_owned.clone(), message_id, agents, stop_rx, duration_secs, ) .await; // Clean up the session entry when the loop exits naturally. let mut sessions = sessions_clone.lock().await; sessions.remove(&room_id_owned); }); } /// Stop the active htop session for `room_id`, if any. /// /// When there is no active session, sends a "no active session" reply /// to the room so the user knows the command was received. pub async fn handle_htop_stop( transport: &dyn ChatTransport, room_id: &str, htop_sessions: &HtopSessions, ) { let had_session = stop_existing_session(htop_sessions, room_id).await; if !had_session { let msg = "No active htop session in this room."; let html = markdown_to_html(msg); if let Err(e) = transport.send_message(room_id, msg, &html).await { slog!("[htop] Failed to send no-session reply: {e}"); } } // When a session was active, the background task handles the final edit. } /// Signal and remove the existing session for `room_id`. /// /// Returns `true` if a session was found and stopped. 
async fn stop_existing_session(htop_sessions: &HtopSessions, room_id: &str) -> bool { let mut sessions = htop_sessions.lock().await; if let Some(session) = sessions.remove(room_id) { // Signal the background task to stop (ignore error — task may be done). let _ = session.stop_tx.send(true); true } else { false } } // --------------------------------------------------------------------------- // Tests // --------------------------------------------------------------------------- #[cfg(test)] mod tests { use super::*; // -- extract_htop_command ----------------------------------------------- #[test] fn htop_bare_command() { let cmd = extract_htop_command("@timmy htop", "Timmy", "@timmy:homeserver.local"); assert_eq!(cmd, Some(HtopCommand::Start { duration_secs: 300 })); } #[test] fn htop_with_display_name() { let cmd = extract_htop_command("Timmy htop", "Timmy", "@timmy:homeserver.local"); assert_eq!(cmd, Some(HtopCommand::Start { duration_secs: 300 })); } #[test] fn htop_stop() { let cmd = extract_htop_command("@timmy htop stop", "Timmy", "@timmy:homeserver.local"); assert_eq!(cmd, Some(HtopCommand::Stop)); } #[test] fn htop_duration_minutes() { let cmd = extract_htop_command("@timmy htop 10m", "Timmy", "@timmy:homeserver.local"); assert_eq!(cmd, Some(HtopCommand::Start { duration_secs: 600 })); } #[test] fn htop_duration_uppercase_m() { let cmd = extract_htop_command("@timmy htop 2M", "Timmy", "@timmy:homeserver.local"); assert_eq!(cmd, Some(HtopCommand::Start { duration_secs: 120 })); } #[test] fn htop_duration_seconds() { let cmd = extract_htop_command("@timmy htop 90", "Timmy", "@timmy:homeserver.local"); assert_eq!(cmd, Some(HtopCommand::Start { duration_secs: 90 })); } #[test] fn non_htop_command_returns_none() { let cmd = extract_htop_command("@timmy status", "Timmy", "@timmy:homeserver.local"); assert!(cmd.is_none()); } #[test] fn unrelated_message_returns_none() { let cmd = extract_htop_command("hello world", "Timmy", "@timmy:homeserver.local"); 
assert!(cmd.is_none()); } #[test] fn htop_case_insensitive() { let cmd = extract_htop_command("@timmy HTOP", "Timmy", "@timmy:homeserver.local"); assert_eq!(cmd, Some(HtopCommand::Start { duration_secs: 300 })); } #[test] fn htop_full_user_id() { let cmd = extract_htop_command( "@timmy:homeserver.local htop", "Timmy", "@timmy:homeserver.local", ); assert_eq!(cmd, Some(HtopCommand::Start { duration_secs: 300 })); } #[test] fn htop_with_comma_after_mention() { // Some Matrix clients format mentions as "@timmy, htop" let cmd = extract_htop_command("@timmy, htop", "Timmy", "@timmy:homeserver.local"); assert_eq!(cmd, Some(HtopCommand::Start { duration_secs: 300 })); } // -- parse_duration ----------------------------------------------------- #[test] fn parse_duration_empty_returns_none() { assert_eq!(parse_duration(""), None); } #[test] fn parse_duration_minutes() { assert_eq!(parse_duration("5m"), Some(300)); } #[test] fn parse_duration_seconds() { assert_eq!(parse_duration("120"), Some(120)); } #[test] fn parse_duration_invalid_returns_none() { assert_eq!(parse_duration("abc"), None); } // -- build_htop_message ------------------------------------------------- #[test] fn build_htop_message_no_agents() { let pool = Arc::new(crate::agents::AgentPool::new_test(3000)); let text = build_htop_message(&pool, 0, 300); assert!(text.contains("htop"), "should mention htop: {text}"); assert!( text.contains("No agents currently running"), "should note no agents: {text}" ); } #[test] fn build_htop_message_contains_load() { let pool = Arc::new(crate::agents::AgentPool::new_test(3000)); let text = build_htop_message(&pool, 0, 300); // Load average is gathered via `uptime`; it should appear in some form. 
assert!( text.contains("load"), "message should contain load info: {text}" ); } #[test] fn build_htop_message_shows_remaining_time() { let pool = Arc::new(crate::agents::AgentPool::new_test(3000)); let text = build_htop_message(&pool, 0, 300); assert!( text.contains("auto-stops in"), "should show remaining time: {text}" ); } #[test] fn build_htop_message_load_on_own_line() { // Load average must be on its own line, not combined with the htop header. let pool = Arc::new(crate::agents::AgentPool::new_test(3000)); let text = build_htop_message(&pool, 0, 300); let lines: Vec<&str> = text.lines().collect(); let header_line = lines.first().expect("should have a header line"); // Header line must NOT contain "load" — load is on the second line. assert!( !header_line.contains("load"), "load should be on its own line, not the header: {header_line}" ); // Second line must contain "load". let load_line = lines.get(1).expect("should have a load line"); assert!( load_line.contains("load"), "second line should contain load info: {load_line}" ); } #[test] fn build_htop_message_no_table_syntax() { // Must not use Markdown table format (pipes/separators) — those are too // wide for narrow mobile screens. let pool = Arc::new(crate::agents::AgentPool::new_test(3000)); let text = build_htop_message(&pool, 0, 300); assert!( !text.contains("|----"), "output must not contain table separator rows: {text}" ); assert!( !text.contains("| Agent"), "output must not contain table header row: {text}" ); } #[test] fn build_htop_message_header_fits_40_chars() { // The header line (htop + remaining time) must fit in ~40 rendered chars. let pool = Arc::new(crate::agents::AgentPool::new_test(3000)); let text = build_htop_message(&pool, 0, 300); let header = text.lines().next().expect("should have a header line"); // Strip markdown bold markers (**) for length calculation. 
let rendered = header.replace("**", ""); assert!( rendered.len() <= 40, "header line too wide for mobile ({} chars): {rendered}", rendered.len() ); } }