story-kit: merge 298_story_bot_htop_command_with_live_updating_process_dashboard

Adds htop bot command with live-updating Matrix message showing system
load and per-agent CPU/memory usage. Supports timeout override and
htop stop. Resolved conflict with git command in commands.rs registry.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dave
2026-03-19 10:33:21 +00:00
parent 99d301b467
commit 981fd3fd81
4 changed files with 636 additions and 1 deletions

View File

@@ -172,6 +172,9 @@ pub struct BotContext {
pub ambient_rooms: Arc<std::sync::Mutex<HashSet<OwnedRoomId>>>,
/// Agent pool for checking agent availability.
pub agents: Arc<AgentPool>,
/// Per-room htop monitoring sessions. Keyed by room ID; each entry holds
/// a stop-signal sender that the background task watches.
pub htop_sessions: super::htop::HtopSessions,
}
// ---------------------------------------------------------------------------
@@ -376,6 +379,7 @@ pub async fn run_bot(
bot_name,
ambient_rooms: Arc::new(std::sync::Mutex::new(persisted_ambient)),
agents,
htop_sessions: Arc::new(TokioMutex::new(HashMap::new())),
};
slog!("[matrix-bot] Cryptographic identity verification is always ON — commands from unencrypted rooms or unverified devices are rejected");
@@ -799,6 +803,30 @@ async fn on_room_message(
return;
}
// Check for the htop command, which requires async Matrix access (Room)
// and cannot be handled by the sync command registry.
if let Some(htop_cmd) =
super::htop::extract_htop_command(&user_message, &ctx.bot_name, ctx.bot_user_id.as_str())
{
slog!("[matrix-bot] Handling htop command from {sender}: {htop_cmd:?}");
match htop_cmd {
super::htop::HtopCommand::Stop => {
super::htop::handle_htop_stop(&room, &incoming_room_id, &ctx.htop_sessions).await;
}
super::htop::HtopCommand::Start { duration_secs } => {
super::htop::handle_htop_start(
&room,
&incoming_room_id,
&ctx.htop_sessions,
Arc::clone(&ctx.agents),
duration_secs,
)
.await;
}
}
return;
}
// Spawn a separate task so the Matrix sync loop is not blocked while we
// wait for the LLM response (which can take several seconds).
tokio::spawn(async move {
@@ -1309,6 +1337,7 @@ mod tests {
bot_name: "Assistant".to_string(),
ambient_rooms: Arc::new(std::sync::Mutex::new(HashSet::new())),
agents: Arc::new(AgentPool::new_test(3000)),
htop_sessions: Arc::new(TokioMutex::new(HashMap::new())),
};
// Clone must work (required by Matrix SDK event handler injection).
let _cloned = ctx.clone();

View File

@@ -93,6 +93,11 @@ pub fn commands() -> &'static [BotCommand] {
description: "Show git status: branch, uncommitted changes, and ahead/behind remote",
handler: handle_git,
},
BotCommand {
name: "htop",
description: "Show live system and agent process dashboard (`htop`, `htop 10m`, `htop stop`)",
handler: handle_htop_fallback,
},
]
}
@@ -442,6 +447,16 @@ fn handle_git(ctx: &CommandContext) -> Option<String> {
Some(out)
}
/// Fallback handler for the `htop` command when it is not intercepted by the
/// async handler in `on_room_message`. In practice this is never called —
/// htop is detected and handled before `try_handle_command` is invoked.
/// The entry exists in the registry only so `help` lists it.
///
/// Returns `None` to prevent the LLM from receiving "htop" as a prompt.
fn handle_htop_fallback(_ctx: &CommandContext) -> Option<String> {
None
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
@@ -726,7 +741,7 @@ mod tests {
);
}
// -- help lists status and ambient --------------------------------------
// -- help lists status, ambient, and htop --------------------------------
#[test]
fn help_output_includes_status() {
@@ -742,6 +757,24 @@ mod tests {
assert!(output.contains("ambient"), "help should list ambient command: {output}");
}
#[test]
fn help_output_includes_htop() {
let result = try_cmd_addressed("Timmy", "@timmy:homeserver.local", "@timmy help");
let output = result.unwrap();
assert!(output.contains("htop"), "help should list htop command: {output}");
}
#[test]
fn htop_command_falls_through_to_none() {
// The htop handler returns None so the message is handled asynchronously
// in on_room_message, not here. try_handle_command must return None.
let result = try_cmd_addressed("Timmy", "@timmy:homeserver.local", "@timmy htop");
assert!(
result.is_none(),
"htop should not produce a sync response (handled async): {result:?}"
);
}
// -- strip_prefix_ci ----------------------------------------------------
#[test]

572
server/src/matrix/htop.rs Normal file
View File

@@ -0,0 +1,572 @@
//! htop command: live-updating system and agent process dashboard.
//!
//! Sends an initial message to a Matrix room showing load average and
//! per-agent process info, then edits it in-place every 5 seconds using
//! Matrix replacement events. A single htop session per room is enforced;
//! a new `htop` invocation stops any existing session and starts a fresh one.
//! Sessions auto-stop after the configured duration (default 5 minutes).
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use matrix_sdk::room::Room;
use matrix_sdk::ruma::OwnedEventId;
use matrix_sdk::ruma::events::room::message::{
ReplacementMetadata, RoomMessageEventContent, RoomMessageEventContentWithoutRelation,
};
use tokio::sync::{Mutex as TokioMutex, watch};
use crate::agents::{AgentPool, AgentStatus};
use crate::slog;
use super::bot::markdown_to_html;
/// A parsed htop command from a Matrix message body.
#[derive(Debug, PartialEq)]
pub enum HtopCommand {
/// Start (or restart) monitoring. `duration_secs` is the auto-stop
/// timeout; defaults to 300 (5 minutes).
Start { duration_secs: u64 },
/// Stop any active monitoring session for the room.
Stop,
}
/// Per-room htop session: holds the stop-signal sender so callers can cancel.
pub struct HtopSession {
/// Send `true` to request a graceful stop of the background loop.
pub stop_tx: watch::Sender<bool>,
}
/// Per-room htop session map type alias.
pub type HtopSessions = Arc<TokioMutex<HashMap<matrix_sdk::ruma::OwnedRoomId, HtopSession>>>;
/// Parse an htop command from a raw Matrix message body.
///
/// Strips the bot mention prefix and checks whether the first word is `htop`.
/// Returns `None` when the message is not an htop command.
///
/// Recognised forms (after stripping the bot mention):
/// - `htop` → `Start { duration_secs: 300 }`
/// - `htop stop` → `Stop`
/// - `htop 10m` → `Start { duration_secs: 600 }`
/// - `htop 120` → `Start { duration_secs: 120 }` (bare seconds)
pub fn extract_htop_command(message: &str, bot_name: &str, bot_user_id: &str) -> Option<HtopCommand> {
let stripped = strip_mention(message, bot_name, bot_user_id);
let trimmed = stripped.trim();
// Strip leading punctuation (e.g. the comma in "@timmy, htop")
let trimmed = trimmed.trim_start_matches(|c: char| !c.is_alphanumeric());
let (cmd, args) = match trimmed.split_once(char::is_whitespace) {
Some((c, a)) => (c, a.trim()),
None => (trimmed, ""),
};
if !cmd.eq_ignore_ascii_case("htop") {
return None;
}
if args.eq_ignore_ascii_case("stop") {
return Some(HtopCommand::Stop);
}
let duration_secs = parse_duration(args).unwrap_or(300);
Some(HtopCommand::Start { duration_secs })
}
/// Parse an optional duration argument.
///
/// Accepts `""` (empty → `None`), `"5m"` / `"10M"` (minutes), or a bare
/// integer interpreted as seconds.
fn parse_duration(s: &str) -> Option<u64> {
if s.is_empty() {
return None;
}
if let Some(mins_str) = s.strip_suffix('m').or_else(|| s.strip_suffix('M')) {
return mins_str.parse::<u64>().ok().map(|m| m * 60);
}
s.parse::<u64>().ok()
}
/// Strip the bot mention prefix from a raw Matrix message body.
///
/// Mirrors the logic in `commands::strip_bot_mention` so htop detection works
/// without depending on private symbols in that module.
fn strip_mention<'a>(message: &'a str, bot_name: &str, bot_user_id: &str) -> &'a str {
let trimmed = message.trim();
if let Some(rest) = strip_prefix_ci(trimmed, bot_user_id) {
return rest;
}
if let Some(localpart) = bot_user_id.split(':').next()
&& let Some(rest) = strip_prefix_ci(trimmed, localpart)
{
return rest;
}
if let Some(rest) = strip_prefix_ci(trimmed, bot_name) {
return rest;
}
trimmed
}
fn strip_prefix_ci<'a>(text: &'a str, prefix: &str) -> Option<&'a str> {
if text.len() < prefix.len() {
return None;
}
if !text[..prefix.len()].eq_ignore_ascii_case(prefix) {
return None;
}
let rest = &text[prefix.len()..];
match rest.chars().next() {
None => Some(rest),
Some(c) if c.is_alphanumeric() || c == '-' || c == '_' => None,
_ => Some(rest),
}
}
// ---------------------------------------------------------------------------
// System stats
// ---------------------------------------------------------------------------
/// Read the system load average using the `uptime` command.
///
/// Returns a short string like `"load average: 1.23, 0.98, 0.75"` on success,
/// or `"load: unknown"` on failure.
fn get_load_average() -> String {
let output = std::process::Command::new("uptime")
.output()
.ok()
.and_then(|o| String::from_utf8(o.stdout).ok())
.unwrap_or_default();
// uptime output typically contains "load average: X, Y, Z" (Linux/macOS)
// or "load averages: X Y Z" (some BSD variants).
if let Some(idx) = output.find("load average") {
output[idx..].trim().trim_end_matches('\n').to_string()
} else {
"load: unknown".to_string()
}
}
/// Process stats for a single agent, gathered from `ps`.
#[derive(Debug, Default)]
struct AgentProcessStats {
cpu_pct: f64,
mem_pct: f64,
num_procs: usize,
}
/// Gather CPU% and MEM% for processes whose command line contains `worktree_path`.
///
/// Runs `ps aux` and sums all matching lines. Returns `None` when no
/// matching process is found.
fn gather_process_stats(worktree_path: &str) -> Option<AgentProcessStats> {
let output = std::process::Command::new("ps")
.args(["aux"])
.output()
.ok()
.and_then(|o| String::from_utf8(o.stdout).ok())?;
let mut stats = AgentProcessStats::default();
for line in output.lines().skip(1) {
// Avoid matching against our own status display (the ps command itself)
if !line.contains(worktree_path) {
continue;
}
let parts: Vec<&str> = line.split_whitespace().collect();
// ps aux columns: USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND...
if parts.len() >= 4
&& let (Ok(cpu), Ok(mem)) = (parts[2].parse::<f64>(), parts[3].parse::<f64>())
{
stats.cpu_pct += cpu;
stats.mem_pct += mem;
stats.num_procs += 1;
}
}
if stats.num_procs > 0 {
Some(stats)
} else {
None
}
}
// ---------------------------------------------------------------------------
// Message formatting
// ---------------------------------------------------------------------------
/// Build the Markdown text for the htop dashboard.
///
/// `tick` is the number of updates sent so far (0 = initial).
/// `total_duration_secs` is the configured auto-stop timeout.
pub fn build_htop_message(agents: &AgentPool, tick: u32, total_duration_secs: u64) -> String {
let elapsed_secs = (tick as u64) * 5;
let remaining_secs = total_duration_secs.saturating_sub(elapsed_secs);
let remaining_mins = remaining_secs / 60;
let remaining_secs_rem = remaining_secs % 60;
let load = get_load_average();
let mut lines = vec![
format!("**htop** — {load}"),
format!(
"*Updates every 5s · auto-stops in {}m{}s · send `htop stop` to stop*",
remaining_mins, remaining_secs_rem
),
String::new(),
];
let all_agents = agents.list_agents().unwrap_or_default();
let active: Vec<_> = all_agents
.iter()
.filter(|a| matches!(a.status, AgentStatus::Running | AgentStatus::Pending))
.collect();
if active.is_empty() {
lines.push("*No agents currently running.*".to_string());
} else {
lines.push("| Agent | Story | CPU% | MEM% | Procs |".to_string());
lines.push("|-------|-------|-----:|-----:|------:|".to_string());
for agent in &active {
let story_label = agent
.story_id
.split('_')
.next()
.unwrap_or(&agent.story_id)
.to_string();
let stats = agent
.worktree_path
.as_deref()
.and_then(gather_process_stats)
.unwrap_or_default();
lines.push(format!(
"| {} | {} | {:.1} | {:.1} | {} |",
agent.agent_name,
story_label,
stats.cpu_pct,
stats.mem_pct,
stats.num_procs,
));
}
}
lines.join("\n")
}
// ---------------------------------------------------------------------------
// Matrix replacement helper
// ---------------------------------------------------------------------------
/// Edit an existing Matrix message by sending a replacement event.
///
/// Uses `RoomMessageEventContentWithoutRelation::make_replacement` with
/// `ReplacementMetadata` so the replacement carries the original event ID.
async fn send_replacement(
room: &Room,
original_event_id: &OwnedEventId,
plain: &str,
html: &str,
) -> Result<(), String> {
let new_content =
RoomMessageEventContentWithoutRelation::text_html(plain.to_string(), html.to_string());
let metadata = ReplacementMetadata::new(original_event_id.clone(), None);
let content = new_content.make_replacement(metadata);
room.send(content)
.await
.map(|_| ())
.map_err(|e| format!("Matrix send error: {e}"))
}
// ---------------------------------------------------------------------------
// Background monitoring loop
// ---------------------------------------------------------------------------
/// Run the htop background loop: update the message every 5 seconds until
/// the stop signal is received or the timeout expires.
pub async fn run_htop_loop(
room: Room,
initial_event_id: OwnedEventId,
agents: Arc<AgentPool>,
mut stop_rx: watch::Receiver<bool>,
duration_secs: u64,
) {
let interval_secs: u64 = 5;
let max_ticks = (duration_secs / interval_secs).max(1);
for tick in 1..=max_ticks {
// Wait for the interval or a stop signal.
let sleep = tokio::time::sleep(Duration::from_secs(interval_secs));
tokio::pin!(sleep);
tokio::select! {
_ = &mut sleep => {}
Ok(()) = stop_rx.changed() => {
if *stop_rx.borrow() {
send_stopped_message(&room, &initial_event_id).await;
return;
}
}
}
// Re-check after waking — the sender might have signalled while we slept.
if *stop_rx.borrow() {
send_stopped_message(&room, &initial_event_id).await;
return;
}
let text = build_htop_message(&agents, tick as u32, duration_secs);
let html = markdown_to_html(&text);
if let Err(e) = send_replacement(&room, &initial_event_id, &text, &html).await {
slog!("[htop] Failed to update message: {e}");
return;
}
}
// Auto-stop: timeout reached.
send_stopped_message(&room, &initial_event_id).await;
}
async fn send_stopped_message(room: &Room, event_id: &OwnedEventId) {
let text = "**htop** — monitoring stopped.";
let html = markdown_to_html(text);
if let Err(e) = send_replacement(room, event_id, text, &html).await {
slog!("[htop] Failed to send stop message: {e}");
}
}
// ---------------------------------------------------------------------------
// Public command handlers (called from on_room_message in bot.rs)
// ---------------------------------------------------------------------------
/// Start a new htop monitoring session for `room_id`.
///
/// Stops any existing session for the room, sends the initial dashboard
/// message, and spawns a background task that edits it every 5 seconds.
pub async fn handle_htop_start(
room: &Room,
room_id: &matrix_sdk::ruma::OwnedRoomId,
htop_sessions: &HtopSessions,
agents: Arc<AgentPool>,
duration_secs: u64,
) {
// Stop any existing session (best-effort; ignore errors if already done).
stop_existing_session(htop_sessions, room_id).await;
// Send the initial message.
let initial_text = build_htop_message(&agents, 0, duration_secs);
let initial_html = markdown_to_html(&initial_text);
let send_result = room
.send(RoomMessageEventContent::text_html(
initial_text,
initial_html,
))
.await;
let event_id = match send_result {
Ok(r) => r.event_id,
Err(e) => {
slog!("[htop] Failed to send initial message: {e}");
return;
}
};
// Create the stop channel and register the session.
let (stop_tx, stop_rx) = watch::channel(false);
{
let mut sessions = htop_sessions.lock().await;
sessions.insert(room_id.clone(), HtopSession { stop_tx });
}
// Spawn the background update loop.
let room_clone = room.clone();
let sessions_clone = Arc::clone(htop_sessions);
let room_id_clone = room_id.clone();
tokio::spawn(async move {
run_htop_loop(room_clone, event_id, agents, stop_rx, duration_secs).await;
// Clean up the session entry when the loop exits naturally.
let mut sessions = sessions_clone.lock().await;
sessions.remove(&room_id_clone);
});
}
/// Stop the active htop session for `room_id`, if any.
///
/// When there is no active session, sends a "no active session" reply
/// to the room so the user knows the command was received.
pub async fn handle_htop_stop(
room: &Room,
room_id: &matrix_sdk::ruma::OwnedRoomId,
htop_sessions: &HtopSessions,
) {
let had_session = stop_existing_session(htop_sessions, room_id).await;
if !had_session {
let msg = "No active htop session in this room.";
let html = markdown_to_html(msg);
if let Err(e) = room
.send(RoomMessageEventContent::text_html(msg, html))
.await
{
slog!("[htop] Failed to send no-session reply: {e}");
}
}
// When a session was active, the background task handles the final edit.
}
/// Signal and remove the existing session for `room_id`.
///
/// Returns `true` if a session was found and stopped.
async fn stop_existing_session(
htop_sessions: &HtopSessions,
room_id: &matrix_sdk::ruma::OwnedRoomId,
) -> bool {
let mut sessions = htop_sessions.lock().await;
if let Some(session) = sessions.remove(room_id) {
// Signal the background task to stop (ignore error — task may be done).
let _ = session.stop_tx.send(true);
true
} else {
false
}
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
#[cfg(test)]
mod tests {
use super::*;
// -- extract_htop_command -----------------------------------------------
#[test]
fn htop_bare_command() {
let cmd = extract_htop_command("@timmy htop", "Timmy", "@timmy:homeserver.local");
assert_eq!(cmd, Some(HtopCommand::Start { duration_secs: 300 }));
}
#[test]
fn htop_with_display_name() {
let cmd = extract_htop_command("Timmy htop", "Timmy", "@timmy:homeserver.local");
assert_eq!(cmd, Some(HtopCommand::Start { duration_secs: 300 }));
}
#[test]
fn htop_stop() {
let cmd = extract_htop_command("@timmy htop stop", "Timmy", "@timmy:homeserver.local");
assert_eq!(cmd, Some(HtopCommand::Stop));
}
#[test]
fn htop_duration_minutes() {
let cmd = extract_htop_command("@timmy htop 10m", "Timmy", "@timmy:homeserver.local");
assert_eq!(cmd, Some(HtopCommand::Start { duration_secs: 600 }));
}
#[test]
fn htop_duration_uppercase_m() {
let cmd = extract_htop_command("@timmy htop 2M", "Timmy", "@timmy:homeserver.local");
assert_eq!(cmd, Some(HtopCommand::Start { duration_secs: 120 }));
}
#[test]
fn htop_duration_seconds() {
let cmd = extract_htop_command("@timmy htop 90", "Timmy", "@timmy:homeserver.local");
assert_eq!(cmd, Some(HtopCommand::Start { duration_secs: 90 }));
}
#[test]
fn non_htop_command_returns_none() {
let cmd = extract_htop_command("@timmy status", "Timmy", "@timmy:homeserver.local");
assert!(cmd.is_none());
}
#[test]
fn unrelated_message_returns_none() {
let cmd = extract_htop_command("hello world", "Timmy", "@timmy:homeserver.local");
assert!(cmd.is_none());
}
#[test]
fn htop_case_insensitive() {
let cmd = extract_htop_command("@timmy HTOP", "Timmy", "@timmy:homeserver.local");
assert_eq!(cmd, Some(HtopCommand::Start { duration_secs: 300 }));
}
#[test]
fn htop_full_user_id() {
let cmd = extract_htop_command(
"@timmy:homeserver.local htop",
"Timmy",
"@timmy:homeserver.local",
);
assert_eq!(cmd, Some(HtopCommand::Start { duration_secs: 300 }));
}
#[test]
fn htop_with_comma_after_mention() {
// Some Matrix clients format mentions as "@timmy, htop"
let cmd = extract_htop_command("@timmy, htop", "Timmy", "@timmy:homeserver.local");
assert_eq!(cmd, Some(HtopCommand::Start { duration_secs: 300 }));
}
// -- parse_duration -----------------------------------------------------
#[test]
fn parse_duration_empty_returns_none() {
assert_eq!(parse_duration(""), None);
}
#[test]
fn parse_duration_minutes() {
assert_eq!(parse_duration("5m"), Some(300));
}
#[test]
fn parse_duration_seconds() {
assert_eq!(parse_duration("120"), Some(120));
}
#[test]
fn parse_duration_invalid_returns_none() {
assert_eq!(parse_duration("abc"), None);
}
// -- build_htop_message -------------------------------------------------
#[test]
fn build_htop_message_no_agents() {
let pool = Arc::new(crate::agents::AgentPool::new_test(3000));
let text = build_htop_message(&pool, 0, 300);
assert!(text.contains("htop"), "should mention htop: {text}");
assert!(
text.contains("No agents currently running"),
"should note no agents: {text}"
);
}
#[test]
fn build_htop_message_contains_load() {
let pool = Arc::new(crate::agents::AgentPool::new_test(3000));
let text = build_htop_message(&pool, 0, 300);
// Load average is gathered via `uptime`; it should appear in some form.
assert!(
text.contains("load"),
"message should contain load info: {text}"
);
}
#[test]
fn build_htop_message_shows_remaining_time() {
let pool = Arc::new(crate::agents::AgentPool::new_test(3000));
let text = build_htop_message(&pool, 0, 300);
assert!(
text.contains("auto-stops in"),
"should show remaining time: {text}"
);
}
}

View File

@@ -18,6 +18,7 @@
mod bot;
pub mod commands;
mod config;
pub mod htop;
pub mod notifications;
pub use config::BotConfig;