diff --git a/server/src/main.rs b/server/src/main.rs index f5d4773..a412399 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -167,6 +167,9 @@ async fn main() -> Result<(), std::io::Error> { // Permission channel: MCP prompt_permission → WebSocket handler. let (perm_tx, perm_rx) = tokio::sync::mpsc::unbounded_channel(); + // Clone watcher_tx for the Matrix bot before it is moved into AppContext. + let watcher_tx_for_bot = watcher_tx.clone(); + // Capture project root, agents Arc, and reconciliation sender before ctx // is consumed by build_routes. let startup_root: Option = app_state.project_root.lock().unwrap().clone(); @@ -191,7 +194,7 @@ async fn main() -> Result<(), std::io::Error> { // Optional Matrix bot: connect to the homeserver and start listening for // messages if `.story_kit/bot.toml` is present and enabled. if let Some(ref root) = startup_root { - matrix::spawn_bot(root); + matrix::spawn_bot(root, watcher_tx_for_bot); } // On startup: diff --git a/server/src/matrix/bot.rs b/server/src/matrix/bot.rs index aa2afd1..0eb764e 100644 --- a/server/src/matrix/bot.rs +++ b/server/src/matrix/bot.rs @@ -86,7 +86,11 @@ pub struct BotContext { /// Connect to the Matrix homeserver, join all configured rooms, and start /// listening for messages. Runs the full Matrix sync loop — call from a /// `tokio::spawn` task so it doesn't block the main thread. -pub async fn run_bot(config: BotConfig, project_root: PathBuf) -> Result<(), String> { +pub async fn run_bot( + config: BotConfig, + project_root: PathBuf, + watcher_rx: tokio::sync::broadcast::Receiver, +) -> Result<(), String> { let store_path = project_root.join(".story_kit").join("matrix_store"); let client = Client::builder() .homeserver_url(&config.homeserver) @@ -181,6 +185,11 @@ pub async fn run_bot(config: BotConfig, project_root: PathBuf) -> Result<(), Str target_room_ids ); + // Clone values needed by the notification listener before they are moved + // into BotContext. + let notif_room_ids = target_room_ids.clone(); + let notif_project_root = project_root.clone(); + let ctx = BotContext { bot_user_id, target_room_ids, @@ -198,6 +207,15 @@ pub async fn run_bot(config: BotConfig, project_root: PathBuf) -> Result<(), Str client.add_event_handler(on_room_message); client.add_event_handler(on_to_device_verification_request); + // Spawn the stage-transition notification listener before entering the + // sync loop so it starts receiving watcher events immediately. + super::notifications::spawn_notification_listener( + client.clone(), + notif_room_ids, + watcher_rx, + notif_project_root, + ); + slog!("[matrix-bot] Starting Matrix sync loop"); // This blocks until the connection is terminated or an error occurs. diff --git a/server/src/matrix/mod.rs b/server/src/matrix/mod.rs index 2699d10..3269f61 100644 --- a/server/src/matrix/mod.rs +++ b/server/src/matrix/mod.rs @@ -17,10 +17,13 @@ mod bot; mod config; +pub mod notifications; pub use config::BotConfig; +use crate::io::watcher::WatcherEvent; use std::path::Path; +use tokio::sync::broadcast; /// Attempt to start the Matrix bot. /// @@ -28,8 +31,12 @@ use std::path::Path; /// absent or `enabled = false`, this function returns immediately without /// spawning anything — the server continues normally. /// +/// When the bot is enabled, a notification listener is also spawned that +/// posts stage-transition messages to all configured rooms whenever a work +/// item moves between pipeline stages. +/// /// Must be called from within a Tokio runtime context (e.g., from `main`). -pub fn spawn_bot(project_root: &Path) { +pub fn spawn_bot(project_root: &Path, watcher_tx: broadcast::Sender) { let config = match BotConfig::load(project_root) { Some(c) => c, None => { @@ -45,8 +52,9 @@ pub fn spawn_bot(project_root: &Path) { ); let root = project_root.to_path_buf(); + let watcher_rx = watcher_tx.subscribe(); tokio::spawn(async move { - if let Err(e) = bot::run_bot(config, root).await { + if let Err(e) = bot::run_bot(config, root, watcher_rx).await { crate::slog!("[matrix-bot] Fatal error: {e}"); } }); diff --git a/server/src/matrix/notifications.rs b/server/src/matrix/notifications.rs new file mode 100644 index 0000000..90820f9 --- /dev/null +++ b/server/src/matrix/notifications.rs @@ -0,0 +1,296 @@ +//! Stage transition notifications for Matrix rooms. +//! +//! Subscribes to [`WatcherEvent`] broadcasts and posts a notification to all +//! configured Matrix rooms whenever a work item moves between pipeline stages. + +use crate::io::story_metadata::parse_front_matter; +use crate::io::watcher::WatcherEvent; +use crate::slog; +use matrix_sdk::ruma::events::room::message::RoomMessageEventContent; +use matrix_sdk::ruma::OwnedRoomId; +use matrix_sdk::Client; +use std::path::{Path, PathBuf}; +use tokio::sync::broadcast; + +/// Human-readable display name for a pipeline stage directory. +pub fn stage_display_name(stage: &str) -> &'static str { + match stage { + "1_upcoming" => "Upcoming", + "2_current" => "Current", + "3_qa" => "QA", + "4_merge" => "Merge", + "5_done" => "Done", + "6_archived" => "Archived", + _ => "Unknown", + } +} + +/// Infer the previous pipeline stage for a given destination stage. +/// +/// Returns `None` for `1_upcoming` since items are created there (not +/// transitioned from another stage). +pub fn inferred_from_stage(to_stage: &str) -> Option<&'static str> { + match to_stage { + "2_current" => Some("Upcoming"), + "3_qa" => Some("Current"), + "4_merge" => Some("QA"), + "5_done" => Some("Merge"), + "6_archived" => Some("Done"), + _ => None, + } +} + +/// Extract the numeric story number from an item ID like `"261_story_slug"`. +pub fn extract_story_number(item_id: &str) -> Option<&str> { + item_id + .split('_') + .next() + .filter(|s| !s.is_empty() && s.chars().all(|c| c.is_ascii_digit())) +} + +/// Read the story name from the work item file's YAML front matter. +/// +/// Returns `None` if the file doesn't exist or has no parseable name. +pub fn read_story_name(project_root: &Path, stage: &str, item_id: &str) -> Option { + let path = project_root + .join(".story_kit") + .join("work") + .join(stage) + .join(format!("{item_id}.md")); + let contents = std::fs::read_to_string(&path).ok()?; + let meta = parse_front_matter(&contents).ok()?; + meta.name +} + +/// Format a stage transition notification message. +/// +/// Returns `(plain_text, html)` suitable for `RoomMessageEventContent::text_html`. +pub fn format_stage_notification( + item_id: &str, + story_name: Option<&str>, + from_stage: &str, + to_stage: &str, +) -> (String, String) { + let number = extract_story_number(item_id).unwrap_or(item_id); + let name = story_name.unwrap_or(item_id); + + let plain = format!("#{number} {name} \u{2014} {from_stage} \u{2192} {to_stage}"); + let html = format!( + "#{number} {name} \u{2014} {from_stage} \u{2192} {to_stage}" + ); + (plain, html) +} + +/// Spawn a background task that listens for watcher events and posts +/// stage-transition notifications to all configured Matrix rooms. +pub fn spawn_notification_listener( + client: Client, + room_ids: Vec, + watcher_rx: broadcast::Receiver, + project_root: PathBuf, +) { + tokio::spawn(async move { + let mut rx = watcher_rx; + loop { + match rx.recv().await { + Ok(WatcherEvent::WorkItem { + ref stage, + ref item_id, + .. + }) => { + // Only notify on stage transitions, not creations. + let Some(from_display) = inferred_from_stage(stage) else { + continue; + }; + let to_display = stage_display_name(stage); + + let story_name = read_story_name(&project_root, stage, item_id); + let (plain, html) = format_stage_notification( + item_id, + story_name.as_deref(), + from_display, + to_display, + ); + + slog!("[matrix-bot] Sending stage notification: {plain}"); + + for room_id in &room_ids { + if let Some(room) = client.get_room(room_id) { + let content = + RoomMessageEventContent::text_html(plain.clone(), html.clone()); + if let Err(e) = room.send(content).await { + slog!( + "[matrix-bot] Failed to send notification to {room_id}: {e}" + ); + } + } + } + } + Ok(_) => {} // Ignore non-work-item events + Err(broadcast::error::RecvError::Lagged(n)) => { + slog!( + "[matrix-bot] Notification listener lagged, skipped {n} events" + ); + } + Err(broadcast::error::RecvError::Closed) => { + slog!( + "[matrix-bot] Watcher channel closed, stopping notification listener" + ); + break; + } + } + } + }); +} + +#[cfg(test)] +mod tests { + use super::*; + + // ── stage_display_name ────────────────────────────────────────────────── + + #[test] + fn stage_display_name_maps_all_known_stages() { + assert_eq!(stage_display_name("1_upcoming"), "Upcoming"); + assert_eq!(stage_display_name("2_current"), "Current"); + assert_eq!(stage_display_name("3_qa"), "QA"); + assert_eq!(stage_display_name("4_merge"), "Merge"); + assert_eq!(stage_display_name("5_done"), "Done"); + assert_eq!(stage_display_name("6_archived"), "Archived"); + assert_eq!(stage_display_name("unknown"), "Unknown"); + } + + // ── inferred_from_stage ───────────────────────────────────────────────── + + #[test] + fn inferred_from_stage_returns_previous_stage() { + assert_eq!(inferred_from_stage("2_current"), Some("Upcoming")); + assert_eq!(inferred_from_stage("3_qa"), Some("Current")); + assert_eq!(inferred_from_stage("4_merge"), Some("QA")); + assert_eq!(inferred_from_stage("5_done"), Some("Merge")); + assert_eq!(inferred_from_stage("6_archived"), Some("Done")); + } + + #[test] + fn inferred_from_stage_returns_none_for_upcoming() { + assert_eq!(inferred_from_stage("1_upcoming"), None); + } + + #[test] + fn inferred_from_stage_returns_none_for_unknown() { + assert_eq!(inferred_from_stage("9_unknown"), None); + } + + // ── extract_story_number ──────────────────────────────────────────────── + + #[test] + fn extract_story_number_parses_numeric_prefix() { + assert_eq!( + extract_story_number("261_story_bot_notifications"), + Some("261") + ); + assert_eq!(extract_story_number("42_bug_fix_thing"), Some("42")); + assert_eq!(extract_story_number("1_spike_research"), Some("1")); + } + + #[test] + fn extract_story_number_returns_none_for_non_numeric() { + assert_eq!(extract_story_number("abc_story_thing"), None); + assert_eq!(extract_story_number(""), None); + } + + // ── read_story_name ───────────────────────────────────────────────────── + + #[test] + fn read_story_name_reads_from_front_matter() { + let tmp = tempfile::tempdir().unwrap(); + let stage_dir = tmp + .path() + .join(".story_kit") + .join("work") + .join("2_current"); + std::fs::create_dir_all(&stage_dir).unwrap(); + std::fs::write( + stage_dir.join("42_story_my_feature.md"), + "---\nname: My Cool Feature\n---\n# Story\n", + ) + .unwrap(); + + let name = read_story_name(tmp.path(), "2_current", "42_story_my_feature"); + assert_eq!(name.as_deref(), Some("My Cool Feature")); + } + + #[test] + fn read_story_name_returns_none_for_missing_file() { + let tmp = tempfile::tempdir().unwrap(); + let name = read_story_name(tmp.path(), "2_current", "99_story_missing"); + assert_eq!(name, None); + } + + #[test] + fn read_story_name_returns_none_for_missing_name_field() { + let tmp = tempfile::tempdir().unwrap(); + let stage_dir = tmp + .path() + .join(".story_kit") + .join("work") + .join("2_current"); + std::fs::create_dir_all(&stage_dir).unwrap(); + std::fs::write( + stage_dir.join("42_story_no_name.md"), + "---\ncoverage_baseline: 50%\n---\n# Story\n", + ) + .unwrap(); + + let name = read_story_name(tmp.path(), "2_current", "42_story_no_name"); + assert_eq!(name, None); + } + + // ── format_stage_notification ─────────────────────────────────────────── + + #[test] + fn format_notification_with_story_name() { + let (plain, html) = format_stage_notification( + "261_story_bot_notifications", + Some("Bot notifications"), + "Upcoming", + "Current", + ); + assert_eq!( + plain, + "#261 Bot notifications \u{2014} Upcoming \u{2192} Current" + ); + assert_eq!( + html, + "#261 Bot notifications \u{2014} Upcoming \u{2192} Current" + ); + } + + #[test] + fn format_notification_without_story_name_falls_back_to_item_id() { + let (plain, _html) = format_stage_notification( + "42_bug_fix_thing", + None, + "Current", + "QA", + ); + assert_eq!( + plain, + "#42 42_bug_fix_thing \u{2014} Current \u{2192} QA" + ); + } + + #[test] + fn format_notification_non_numeric_id_uses_full_id() { + let (plain, _html) = format_stage_notification( + "abc_story_thing", + Some("Some Story"), + "QA", + "Merge", + ); + assert_eq!( + plain, + "#abc_story_thing Some Story \u{2014} QA \u{2192} Merge" + ); + } +}