From 3521649cbfc562f569b822b29ac0bd9979c17fde Mon Sep 17 00:00:00 2001 From: dave Date: Thu, 23 Apr 2026 12:05:27 +0000 Subject: [PATCH] huskies: merge 599_story_cross_project_status_notifications_in_chat --- server/src/chat/transport/matrix/bot/run.rs | 19 + server/src/chat/transport/matrix/config.rs | 69 +++ server/src/gateway.rs | 559 ++++++++++++++++++++ server/src/http/events.rs | 341 ++++++++++++ server/src/http/mod.rs | 18 +- server/src/main.rs | 15 +- website/docs/configuration.html | 55 ++ website/docs/transports.html | 7 + 8 files changed, 1080 insertions(+), 3 deletions(-) create mode 100644 server/src/http/events.rs diff --git a/server/src/chat/transport/matrix/bot/run.rs b/server/src/chat/transport/matrix/bot/run.rs index d69f9ae7..f15cbd11 100644 --- a/server/src/chat/transport/matrix/bot/run.rs +++ b/server/src/chat/transport/matrix/bot/run.rs @@ -168,6 +168,11 @@ pub async fn run_bot( let notif_room_ids = target_room_ids.clone(); let notif_project_root = project_root.clone(); let announce_room_ids = target_room_ids.clone(); + // Clone values needed by the gateway notification poller (only used in gateway mode). + let poller_room_ids: Vec = target_room_ids.iter().map(|r| r.to_string()).collect(); + let poller_project_urls = gateway_project_urls.clone(); + let poller_poll_interval = config.aggregated_notifications_poll_interval_secs; + let poller_enabled = config.aggregated_notifications_enabled; let persisted = load_history(&project_root); slog!( @@ -271,6 +276,20 @@ pub async fn run_bot( notif_project_root, ); + // In gateway mode, spawn the cross-project notification poller. + // It polls every registered project's `/api/events` endpoint and forwards + // new events to the configured gateway rooms with a `[project-name]` prefix. + // The poller is controlled by the gateway-level `aggregated_notifications_enabled` + // flag in bot.toml — set it to `false` to disable without touching per-project configs. + if !poller_project_urls.is_empty() && poller_enabled { + crate::gateway::spawn_gateway_notification_poller( + Arc::clone(&transport), + poller_room_ids, + poller_project_urls, + poller_poll_interval, + ); + } + // Spawn a shutdown watcher that sends a best-effort goodbye message to all // configured rooms when the server is about to stop (SIGINT/SIGTERM or rebuild). { diff --git a/server/src/chat/transport/matrix/config.rs b/server/src/chat/transport/matrix/config.rs index af1dd65e..0dc7a10c 100644 --- a/server/src/chat/transport/matrix/config.rs +++ b/server/src/chat/transport/matrix/config.rs @@ -10,6 +10,14 @@ fn default_permission_timeout_secs() -> u64 { 120 } +fn default_aggregated_notifications_poll_interval_secs() -> u64 { + 5 +} + +fn default_aggregated_notifications_enabled() -> bool { + true +} + /// Configuration for the Matrix bot, read from `.huskies/bot.toml`. #[derive(Deserialize, Clone, Debug)] pub struct BotConfig { @@ -146,6 +154,26 @@ pub struct BotConfig { /// When empty or absent, all users in configured channels are allowed. #[serde(default)] pub discord_allowed_users: Vec, + + /// How often (in seconds) the gateway polls each project server's + /// `/api/events` endpoint to aggregate cross-project notifications. + /// + /// Only used when the gateway's bot is enabled. Defaults to 5 seconds. + #[serde(default = "default_aggregated_notifications_poll_interval_secs")] + pub aggregated_notifications_poll_interval_secs: u64, + + /// Whether the gateway-level aggregated cross-project notification stream + /// is enabled. 
When `false`, the gateway will not poll per-project + /// servers for events even if the bot is otherwise enabled. + /// + /// Set this in the **gateway's** `bot.toml` (not in per-project configs). + /// Adding a new project to `projects.toml` never requires touching + /// per-project bot configs — the aggregated stream picks it up + /// automatically once this flag is `true` (the default). + /// + /// Defaults to `true`. + #[serde(default = "default_aggregated_notifications_enabled")] + pub aggregated_notifications_enabled: bool, } fn default_transport() -> String { @@ -658,6 +686,47 @@ require_verified_devices = true ); } + #[test] + fn aggregated_notifications_enabled_defaults_to_true() { + let tmp = tempfile::tempdir().unwrap(); + let sk = tmp.path().join(".huskies"); + fs::create_dir_all(&sk).unwrap(); + fs::write( + sk.join("bot.toml"), + r#" +homeserver = "https://matrix.example.com" +username = "@bot:example.com" +password = "secret" +room_ids = ["!abc:example.com"] +enabled = true +"#, + ) + .unwrap(); + let config = BotConfig::load(tmp.path()).unwrap(); + assert!(config.aggregated_notifications_enabled); + } + + #[test] + fn aggregated_notifications_enabled_can_be_set_to_false() { + let tmp = tempfile::tempdir().unwrap(); + let sk = tmp.path().join(".huskies"); + fs::create_dir_all(&sk).unwrap(); + fs::write( + sk.join("bot.toml"), + r#" +homeserver = "https://matrix.example.com" +username = "@bot:example.com" +password = "secret" +room_ids = ["!abc:example.com"] +enabled = true +aggregated_notifications_enabled = false +"#, + ) + .unwrap(); + let config = BotConfig::load(tmp.path()).unwrap(); + assert!(!config.aggregated_notifications_enabled); + } + #[test] fn load_reads_ambient_rooms() { let tmp = tempfile::tempdir().unwrap(); diff --git a/server/src/gateway.rs b/server/src/gateway.rs index 05c65e23..e9628e60 100644 --- a/server/src/gateway.rs +++ b/server/src/gateway.rs @@ -2223,6 +2223,135 @@ fn spawn_gateway_bot( ) } +// ── Cross-project notification poller ───────────────────────────────── + +/// Spawn a background task that polls `GET /api/events?since={ts_ms}` on every +/// registered project server and forwards new events to the gateway's chat rooms. +/// +/// Each event is prefixed with `[project-name]` so users can distinguish which +/// project emitted the notification. Unreachable projects produce a log warning +/// and are skipped; the poller continues with all other projects. +/// +/// This is only called when the gateway bot is enabled (`bot.toml enabled = true`). +/// +/// # Arguments +/// +/// * `transport` — the gateway-level [`ChatTransport`] used to send messages. +/// * `room_ids` — the list of room IDs to send notifications to. +/// * `project_urls` — the map of project name → base URL (e.g. `http://host:3001`). +/// * `poll_interval_secs` — how often to poll each project (default 5). +pub fn spawn_gateway_notification_poller( + transport: Arc, + room_ids: Vec, + project_urls: BTreeMap, + poll_interval_secs: u64, +) { + tokio::spawn(async move { + let client = Client::builder() + .timeout(std::time::Duration::from_secs(10)) + .build() + .unwrap_or_else(|_| Client::new()); + let interval = std::time::Duration::from_secs(poll_interval_secs.max(1)); + + // Track the last seen timestamp per project so we only receive new events. 
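+        // Cursor protocol: the project-side `events_since` returns only events
+        // whose `timestamp_ms` is strictly greater than `since`, so starting
+        // every cursor at 0 re-delivers whatever is still in a project's
+        // 500-event buffer after a gateway restart.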
+ let mut last_ts: HashMap = project_urls + .keys() + .map(|name| (name.clone(), 0u64)) + .collect(); + + loop { + for (project_name, base_url) in &project_urls { + let since = last_ts.get(project_name).copied().unwrap_or(0); + let url = format!("{base_url}/api/events?since={since}"); + + let response = match client.get(&url).send().await { + Ok(r) => r, + Err(e) => { + crate::slog!( + "[gateway-poller] {project_name}: unreachable ({e}); skipping" + ); + continue; + } + }; + + let events: Vec = match response.json().await { + Ok(v) => v, + Err(e) => { + crate::slog!( + "[gateway-poller] {project_name}: failed to parse events: {e}" + ); + continue; + } + }; + + for event in &events { + // Advance the cursor. + let ts = event.timestamp_ms(); + if ts > *last_ts.get(project_name).unwrap_or(&0) { + last_ts.insert(project_name.clone(), ts); + } + + let (plain, html) = format_gateway_event(project_name, event); + for room_id in &room_ids { + if let Err(e) = transport.send_message(room_id, &plain, &html).await { + crate::slog!( + "[gateway-poller] Failed to send notification to {room_id}: {e}" + ); + } + } + } + } + + tokio::time::sleep(interval).await; + } + }); +} + +/// Format a [`crate::http::events::StoredEvent`] from a project into a gateway notification. +/// +/// Prefixes the message with `[project-name]` so users can distinguish which +/// project emitted the event. Story names are not available at the gateway +/// level, so the item ID is used as a fallback (the formatting functions +/// extract the numeric story number from it automatically). +fn format_gateway_event( + project_name: &str, + event: &crate::http::events::StoredEvent, +) -> (String, String) { + use crate::chat::transport::matrix::notifications::{ + format_blocked_notification, format_error_notification, format_stage_notification, + stage_display_name, + }; + use crate::http::events::StoredEvent; + + let prefix = format!("[{project_name}] "); + + match event { + StoredEvent::StageTransition { + story_id, + from_stage, + to_stage, + .. + } => { + let from_display = stage_display_name(from_stage); + let to_display = stage_display_name(to_stage); + let (plain, html) = format_stage_notification(story_id, None, from_display, to_display); + (format!("{prefix}{plain}"), format!("{prefix}{html}")) + } + StoredEvent::MergeFailure { + story_id, reason, .. + } => { + let (plain, html) = format_error_notification(story_id, None, reason); + (format!("{prefix}{plain}"), format!("{prefix}{html}")) + } + StoredEvent::StoryBlocked { + story_id, reason, .. + } => { + let (plain, html) = format_blocked_notification(story_id, None, reason); + (format!("{prefix}{plain}"), format!("{prefix}{html}")) + } + } +} + // ── Tests ──────────────────────────────────────────────────────────── #[cfg(test)] @@ -2673,6 +2802,233 @@ enabled = false assert_eq!(resp.0.status(), StatusCode::NOT_FOUND); } + /// AC5: When one project server is unreachable the poller continues delivering + /// events from the remaining reachable projects without failing. 
+ #[tokio::test] + async fn gateway_notification_poller_continues_when_one_project_unreachable() { + use crate::chat::{ChatTransport, MessageId}; + use crate::http::events::StoredEvent; + use async_trait::async_trait; + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + + type CallLog = Arc>>; + + struct MockTransport { + calls: CallLog, + } + + #[async_trait] + impl ChatTransport for MockTransport { + async fn send_message( + &self, + _room_id: &str, + plain: &str, + _html: &str, + ) -> Result { + self.calls.lock().unwrap().push(plain.to_string()); + Ok("id".to_string()) + } + + async fn edit_message( + &self, + _room_id: &str, + _id: &str, + _plain: &str, + _html: &str, + ) -> Result<(), String> { + Ok(()) + } + + async fn send_typing(&self, _room_id: &str, _typing: bool) -> Result<(), String> { + Ok(()) + } + } + + let calls: CallLog = Arc::new(std::sync::Mutex::new(Vec::new())); + let transport = Arc::new(MockTransport { + calls: Arc::clone(&calls), + }); + + // Start a reachable mock project server that returns one event. + let event = vec![StoredEvent::StoryBlocked { + story_id: "10_story_ok".to_string(), + reason: "retry limit".to_string(), + timestamp_ms: 500, + }]; + let event_body = serde_json::to_vec(&event).unwrap(); + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let good_port = listener.local_addr().unwrap().port(); + let good_url = format!("http://127.0.0.1:{good_port}"); + tokio::spawn(async move { + for _ in 0..4 { + if let Ok((mut stream, _)) = listener.accept().await { + let mut buf = vec![0u8; 4096]; + let _ = stream.read(&mut buf).await; + let header = format!( + "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n", + event_body.len() + ); + let _ = stream.write_all(header.as_bytes()).await; + let _ = stream.write_all(&event_body).await; + } + } + }); + + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + + // An unreachable URL (port 1 cannot be bound). + let bad_url = "http://127.0.0.1:1".to_string(); + + let mut project_urls = BTreeMap::new(); + project_urls.insert("good-project".to_string(), good_url); + project_urls.insert("unreachable-project".to_string(), bad_url); + + spawn_gateway_notification_poller( + transport as Arc, + vec!["!room:example.org".to_string()], + project_urls, + 1, + ); + + // Wait for at least one poll cycle. + tokio::time::sleep(std::time::Duration::from_millis(1500)).await; + + // Events from the reachable project must still arrive. + let messages = calls.lock().unwrap(); + assert!( + !messages.is_empty(), + "Expected notifications from the reachable project; got none" + ); + let has_good = messages + .iter() + .any(|m| m.contains("[good-project]") && m.contains("10_story_ok")); + assert!( + has_good, + "Expected a notification from [good-project]; got: {messages:?}" + ); + // Unreachable project must not produce any notifications. + let has_bad = messages.iter().any(|m| m.contains("[unreachable-project]")); + assert!( + !has_bad, + "Unreachable project must not produce notifications; got: {messages:?}" + ); + } + + /// AC4: When both a per-project bot and the gateway aggregated stream are + /// configured, events go to each room exactly once. + /// + /// The gateway notification poller only sends to the gateway room IDs it is + /// given — it never forwards events to per-project rooms. Conversely, the + /// per-project notification listener subscribes to watcher broadcasts which + /// are completely separate from the HTTP-polled event buffer. 
This test + /// verifies the poller respects its room list (no cross-room leakage). + #[tokio::test] + async fn gateway_notification_poller_sends_only_to_configured_gateway_rooms() { + use crate::chat::{ChatTransport, MessageId}; + use crate::http::events::StoredEvent; + use async_trait::async_trait; + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + + type RoomLog = Arc>>; + + struct RoomCapture { + rooms: RoomLog, + } + + #[async_trait] + impl ChatTransport for RoomCapture { + async fn send_message( + &self, + room_id: &str, + _plain: &str, + _html: &str, + ) -> Result { + self.rooms.lock().unwrap().push(room_id.to_string()); + Ok("id".to_string()) + } + + async fn edit_message( + &self, + _room_id: &str, + _id: &str, + _plain: &str, + _html: &str, + ) -> Result<(), String> { + Ok(()) + } + + async fn send_typing(&self, _room_id: &str, _typing: bool) -> Result<(), String> { + Ok(()) + } + } + + let rooms: RoomLog = Arc::new(std::sync::Mutex::new(Vec::new())); + let transport = Arc::new(RoomCapture { + rooms: Arc::clone(&rooms), + }); + + // Serve one event from a mock project server. + let event = vec![StoredEvent::MergeFailure { + story_id: "5_story_x".to_string(), + reason: "conflict".to_string(), + timestamp_ms: 300, + }]; + let event_body = serde_json::to_vec(&event).unwrap(); + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let port = listener.local_addr().unwrap().port(); + let url = format!("http://127.0.0.1:{port}"); + tokio::spawn(async move { + for _ in 0..4 { + if let Ok((mut stream, _)) = listener.accept().await { + let mut buf = vec![0u8; 4096]; + let _ = stream.read(&mut buf).await; + let header = format!( + "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n", + event_body.len() + ); + let _ = stream.write_all(header.as_bytes()).await; + let _ = stream.write_all(&event_body).await; + } + } + }); + + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + + const GATEWAY_ROOM: &str = "!gateway:example.org"; + const PER_PROJECT_ROOM: &str = "!project:example.org"; + + let mut project_urls = BTreeMap::new(); + project_urls.insert("myproj".to_string(), url); + + // Poller is given only the gateway room — per-project room must never receive. + spawn_gateway_notification_poller( + transport as Arc, + vec![GATEWAY_ROOM.to_string()], + project_urls, + 1, + ); + + tokio::time::sleep(std::time::Duration::from_millis(1500)).await; + + let room_calls = rooms.lock().unwrap(); + // Every notification must go to the gateway room only. + assert!( + !room_calls.is_empty(), + "Expected at least one notification; got none" + ); + for room in room_calls.iter() { + assert_eq!( + room, GATEWAY_ROOM, + "Notification must only go to the gateway room, not {room}" + ); + } + // The per-project room must never have been contacted. + assert!( + !room_calls.iter().any(|r| r == PER_PROJECT_ROOM), + "Per-project room must not receive gateway aggregated notifications" + ); + } + /// Build the full gateway route tree and verify it does not panic. /// /// Poem panics at construction time when duplicate routes are registered. 
@@ -3136,4 +3492,207 @@ enabled = false "unreachable project must have error field: {broken}" ); } + + // ── format_gateway_event unit tests ───────────────────────────────── + + #[test] + fn format_gateway_event_stage_transition_prefixes_project_name() { + use crate::http::events::StoredEvent; + + let event = StoredEvent::StageTransition { + story_id: "42_story_my_feature".to_string(), + from_stage: "2_current".to_string(), + to_stage: "3_qa".to_string(), + timestamp_ms: 1000, + }; + let (plain, html) = format_gateway_event("huskies", &event); + assert!(plain.starts_with("[huskies] "), "plain: {plain}"); + assert!(html.starts_with("[huskies] "), "html: {html}"); + assert!(plain.contains("Current"), "plain: {plain}"); + assert!(plain.contains("QA"), "plain: {plain}"); + } + + #[test] + fn format_gateway_event_merge_failure_prefixes_project_name() { + use crate::http::events::StoredEvent; + + let event = StoredEvent::MergeFailure { + story_id: "42_story_my_feature".to_string(), + reason: "merge conflict".to_string(), + timestamp_ms: 1000, + }; + let (plain, _html) = format_gateway_event("robot-studio", &event); + assert!(plain.starts_with("[robot-studio] "), "plain: {plain}"); + assert!(plain.contains("merge conflict"), "plain: {plain}"); + } + + #[test] + fn format_gateway_event_story_blocked_prefixes_project_name() { + use crate::http::events::StoredEvent; + + let event = StoredEvent::StoryBlocked { + story_id: "43_story_bar".to_string(), + reason: "retry limit exceeded".to_string(), + timestamp_ms: 2000, + }; + let (plain, _html) = format_gateway_event("huskies", &event); + assert!(plain.starts_with("[huskies] "), "plain: {plain}"); + assert!(plain.contains("BLOCKED"), "plain: {plain}"); + } + + /// AC7 integration test: two mock HTTP servers, trigger events, assert + /// aggregated stream gets both with project tags. 
+ #[tokio::test] + async fn gateway_notification_poller_delivers_events_from_two_projects_with_project_tags() { + use crate::chat::{ChatTransport, MessageId}; + use crate::http::events::StoredEvent; + use async_trait::async_trait; + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + + // ── MockTransport ────────────────────────────────────────────────── + type CallLog = Arc>>; + + struct MockTransport { + calls: CallLog, + } + + #[async_trait] + impl ChatTransport for MockTransport { + async fn send_message( + &self, + room_id: &str, + plain: &str, + html: &str, + ) -> Result { + self.calls.lock().unwrap().push(( + room_id.to_string(), + plain.to_string(), + html.to_string(), + )); + Ok("mock-id".to_string()) + } + + async fn edit_message( + &self, + _room_id: &str, + _id: &str, + _plain: &str, + _html: &str, + ) -> Result<(), String> { + Ok(()) + } + + async fn send_typing(&self, _room_id: &str, _typing: bool) -> Result<(), String> { + Ok(()) + } + } + + let calls: CallLog = Arc::new(std::sync::Mutex::new(Vec::new())); + let transport = Arc::new(MockTransport { + calls: Arc::clone(&calls), + }); + + // ── Mock HTTP server for project "alpha" ─────────────────────────── + let alpha_events = vec![StoredEvent::StageTransition { + story_id: "1_story_alpha".to_string(), + from_stage: "2_current".to_string(), + to_stage: "3_qa".to_string(), + timestamp_ms: 100, + }]; + let alpha_body = serde_json::to_vec(&alpha_events).unwrap(); + let alpha_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let alpha_port = alpha_listener.local_addr().unwrap().port(); + let alpha_url = format!("http://127.0.0.1:{alpha_port}"); + tokio::spawn(async move { + // Handle two requests (poller might poll more than once). + for _ in 0..4 { + if let Ok((mut stream, _)) = alpha_listener.accept().await { + let mut buf = vec![0u8; 4096]; + let _ = stream.read(&mut buf).await; + let header = format!( + "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n", + alpha_body.len() + ); + let _ = stream.write_all(header.as_bytes()).await; + let _ = stream.write_all(&alpha_body).await; + } + } + }); + + // ── Mock HTTP server for project "beta" ──────────────────────────── + let beta_events = vec![StoredEvent::MergeFailure { + story_id: "2_story_beta".to_string(), + reason: "merge conflict in lib.rs".to_string(), + timestamp_ms: 200, + }]; + let beta_body = serde_json::to_vec(&beta_events).unwrap(); + let beta_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let beta_port = beta_listener.local_addr().unwrap().port(); + let beta_url = format!("http://127.0.0.1:{beta_port}"); + tokio::spawn(async move { + for _ in 0..4 { + if let Ok((mut stream, _)) = beta_listener.accept().await { + let mut buf = vec![0u8; 4096]; + let _ = stream.read(&mut buf).await; + let header = format!( + "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n", + beta_body.len() + ); + let _ = stream.write_all(header.as_bytes()).await; + let _ = stream.write_all(&beta_body).await; + } + } + }); + + // Give mock servers a moment to bind. 
+ tokio::time::sleep(std::time::Duration::from_millis(10)).await; + + // ── Run the poller ───────────────────────────────────────────────── + let mut project_urls = BTreeMap::new(); + project_urls.insert("alpha".to_string(), alpha_url); + project_urls.insert("beta".to_string(), beta_url); + + spawn_gateway_notification_poller( + transport as Arc, + vec!["!room:example.org".to_string()], + project_urls, + 1, // poll every 1 second + ); + + // Wait long enough for at least one poll cycle. + tokio::time::sleep(std::time::Duration::from_millis(1500)).await; + + // ── Assert both events appear with correct [project-name] prefix ─── + let calls = calls.lock().unwrap(); + assert!( + !calls.is_empty(), + "Expected at least one notification; got none" + ); + + let plains: Vec<&str> = calls.iter().map(|(_, p, _)| p.as_str()).collect(); + + let alpha_notification = plains + .iter() + .any(|p| p.contains("[alpha]") && p.contains("1")); + let beta_notification = plains + .iter() + .any(|p| p.contains("[beta]") && p.contains("merge conflict")); + + assert!( + alpha_notification, + "Expected a notification from [alpha] containing story ID '1'; got: {plains:?}" + ); + assert!( + beta_notification, + "Expected a notification from [beta] containing 'merge conflict'; got: {plains:?}" + ); + + // All notifications must go to the configured room. + for (room_id, _, _) in calls.iter() { + assert_eq!( + room_id, "!room:example.org", + "All notifications must go to the gateway room" + ); + } + } } diff --git a/server/src/http/events.rs b/server/src/http/events.rs new file mode 100644 index 00000000..f06f60ab --- /dev/null +++ b/server/src/http/events.rs @@ -0,0 +1,341 @@ +//! Per-project event buffer and `GET /api/events` HTTP endpoint. +//! +//! The gateway polls `/api/events?since={ts_ms}` on each registered project +//! server to aggregate cross-project pipeline notifications into a single +//! gateway chat channel. Each project server buffers up to 500 events in +//! memory and serves them via this endpoint. + +use crate::io::watcher::WatcherEvent; +use poem::web::{Data, Query}; +use poem::{Response, handler, http::StatusCode}; +use serde::{Deserialize, Serialize}; +use std::collections::VecDeque; +use std::sync::{Arc, Mutex}; +use tokio::sync::broadcast; + +/// Maximum number of events retained in the in-memory buffer. +const MAX_BUFFER_SIZE: usize = 500; + +/// A pipeline event stored in the event buffer with a timestamp. +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum StoredEvent { + /// A work item transitioned between pipeline stages. + StageTransition { + /// Work item ID (e.g. `"42_story_my_feature"`). + story_id: String, + /// The stage the item moved FROM (display name, e.g. `"Current"`). + from_stage: String, + /// The stage the item moved TO (directory key, e.g. `"3_qa"`). + to_stage: String, + /// Unix timestamp in milliseconds when this event was recorded. + timestamp_ms: u64, + }, + /// A merge operation failed for a story. + MergeFailure { + /// Work item ID (e.g. `"42_story_my_feature"`). + story_id: String, + /// Human-readable description of the failure. + reason: String, + /// Unix timestamp in milliseconds when this event was recorded. + timestamp_ms: u64, + }, + /// A story was blocked (e.g. retry limit exceeded). + StoryBlocked { + /// Work item ID (e.g. `"42_story_my_feature"`). + story_id: String, + /// Human-readable reason the story was blocked. 
+ reason: String, + /// Unix timestamp in milliseconds when this event was recorded. + timestamp_ms: u64, + }, +} + +impl StoredEvent { + /// Returns the `timestamp_ms` field common to all event variants. + pub fn timestamp_ms(&self) -> u64 { + match self { + StoredEvent::StageTransition { timestamp_ms, .. } => *timestamp_ms, + StoredEvent::MergeFailure { timestamp_ms, .. } => *timestamp_ms, + StoredEvent::StoryBlocked { timestamp_ms, .. } => *timestamp_ms, + } + } +} + +/// Shared, thread-safe ring buffer of recent pipeline events. +/// +/// Wrapped in `Arc` so it can be shared between the background subscriber +/// task and the HTTP handler. The inner `Mutex` guards the `VecDeque`. +#[derive(Clone, Debug)] +pub struct EventBuffer(Arc>>); + +impl EventBuffer { + /// Create a new, empty event buffer. + pub fn new() -> Self { + EventBuffer(Arc::new(Mutex::new(VecDeque::new()))) + } + + /// Append an event to the buffer, evicting the oldest entry if the buffer + /// exceeds [`MAX_BUFFER_SIZE`]. + pub fn push(&self, event: StoredEvent) { + let mut buf = self.0.lock().unwrap(); + if buf.len() >= MAX_BUFFER_SIZE { + buf.pop_front(); + } + buf.push_back(event); + } + + /// Return all events whose `timestamp_ms` is strictly greater than `since_ms`. + pub fn events_since(&self, since_ms: u64) -> Vec { + let buf = self.0.lock().unwrap(); + buf.iter() + .filter(|e| e.timestamp_ms() > since_ms) + .cloned() + .collect() + } +} + +impl Default for EventBuffer { + fn default() -> Self { + Self::new() + } +} + +/// Returns the current Unix timestamp in milliseconds. +fn now_ms() -> u64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_millis() as u64) + .unwrap_or(0) +} + +/// Spawn a background task that consumes [`WatcherEvent`] broadcasts and +/// stores relevant events in `buffer`. +/// +/// Only [`WatcherEvent::WorkItem`] (with a known `from_stage`), +/// [`WatcherEvent::MergeFailure`], and [`WatcherEvent::StoryBlocked`] +/// variants are stored. All other variants are silently ignored. +pub fn subscribe_to_watcher(buffer: EventBuffer, mut rx: broadcast::Receiver) { + tokio::spawn(async move { + loop { + match rx.recv().await { + Ok(WatcherEvent::WorkItem { + stage, + item_id, + from_stage, + .. + }) => { + // Only store genuine transitions (from_stage is known). + if let Some(from) = from_stage { + buffer.push(StoredEvent::StageTransition { + story_id: item_id, + from_stage: from, + to_stage: stage, + timestamp_ms: now_ms(), + }); + } + } + Ok(WatcherEvent::MergeFailure { story_id, reason }) => { + buffer.push(StoredEvent::MergeFailure { + story_id, + reason, + timestamp_ms: now_ms(), + }); + } + Ok(WatcherEvent::StoryBlocked { story_id, reason }) => { + buffer.push(StoredEvent::StoryBlocked { + story_id, + reason, + timestamp_ms: now_ms(), + }); + } + Ok(_) => {} // Ignore all other event types. + Err(broadcast::error::RecvError::Lagged(n)) => { + crate::slog!("[events] Subscriber lagged, skipped {n} events"); + } + Err(broadcast::error::RecvError::Closed) => { + crate::slog!("[events] Watcher channel closed; stopping event subscriber"); + break; + } + } + } + }); +} + +/// Query parameters for `GET /api/events`. +#[derive(Deserialize)] +pub struct EventsQuery { + /// Return only events with `timestamp_ms` strictly greater than this value. + /// Defaults to `0` (return all buffered events). + #[serde(default)] + pub since: u64, +} + +/// `GET /api/events?since={ts_ms}` +/// +/// Returns a JSON array of [`StoredEvent`] objects recorded after `since` ms. 
+/// The gateway polls this endpoint on each registered project server to build +/// an aggregated cross-project notification stream. +#[handler] +pub fn events_handler( + Query(params): Query, + Data(buffer): Data<&EventBuffer>, +) -> Response { + let events = buffer.events_since(params.since); + let body = serde_json::to_vec(&events).unwrap_or_else(|_| b"[]".to_vec()); + Response::builder() + .status(StatusCode::OK) + .header(poem::http::header::CONTENT_TYPE, "application/json") + .body(body) +} + +#[cfg(test)] +mod tests { + use super::*; + use tokio::sync::broadcast; + + #[test] + fn event_buffer_push_and_retrieve() { + let buf = EventBuffer::new(); + buf.push(StoredEvent::MergeFailure { + story_id: "42_story_x".to_string(), + reason: "conflict".to_string(), + timestamp_ms: 1000, + }); + buf.push(StoredEvent::StoryBlocked { + story_id: "43_story_y".to_string(), + reason: "retry limit".to_string(), + timestamp_ms: 2000, + }); + + let all = buf.events_since(0); + assert_eq!(all.len(), 2); + + let after_1000 = buf.events_since(1000); + assert_eq!(after_1000.len(), 1); + assert!(matches!(after_1000[0], StoredEvent::StoryBlocked { .. })); + } + + #[test] + fn event_buffer_evicts_oldest_when_full() { + let buf = EventBuffer::new(); + for i in 0..MAX_BUFFER_SIZE + 1 { + buf.push(StoredEvent::MergeFailure { + story_id: format!("{i}_story_x"), + reason: "x".to_string(), + timestamp_ms: i as u64, + }); + } + // Buffer must not exceed MAX_BUFFER_SIZE. + assert_eq!(buf.events_since(0).len(), MAX_BUFFER_SIZE); + // Oldest entry (timestamp_ms == 0) should have been evicted. + assert!(buf.events_since(0).iter().all(|e| e.timestamp_ms() > 0)); + } + + #[test] + fn stage_transition_timestamp_ms_accessor() { + let e = StoredEvent::StageTransition { + story_id: "1".to_string(), + from_stage: "2_current".to_string(), + to_stage: "3_qa".to_string(), + timestamp_ms: 9999, + }; + assert_eq!(e.timestamp_ms(), 9999); + } + + #[tokio::test] + async fn subscribe_to_watcher_stores_work_item_with_from_stage() { + let buf = EventBuffer::new(); + let (tx, rx) = broadcast::channel(16); + + subscribe_to_watcher(buf.clone(), rx); + + tx.send(crate::io::watcher::WatcherEvent::WorkItem { + stage: "3_qa".to_string(), + item_id: "42_story_foo".to_string(), + action: "qa".to_string(), + commit_msg: "huskies: qa 42_story_foo".to_string(), + from_stage: Some("2_current".to_string()), + }) + .unwrap(); + + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + let events = buf.events_since(0); + assert_eq!(events.len(), 1); + assert!(matches!(events[0], StoredEvent::StageTransition { .. })); + if let StoredEvent::StageTransition { + ref story_id, + ref from_stage, + ref to_stage, + .. + } = events[0] + { + assert_eq!(story_id, "42_story_foo"); + assert_eq!(from_stage, "2_current"); + assert_eq!(to_stage, "3_qa"); + } + } + + #[tokio::test] + async fn subscribe_to_watcher_ignores_work_item_without_from_stage() { + let buf = EventBuffer::new(); + let (tx, rx) = broadcast::channel(16); + + subscribe_to_watcher(buf.clone(), rx); + + // Synthetic event: no from_stage. 
+ tx.send(crate::io::watcher::WatcherEvent::WorkItem { + stage: "2_current".to_string(), + item_id: "99_story_syn".to_string(), + action: "start".to_string(), + commit_msg: "huskies: start 99_story_syn".to_string(), + from_stage: None, + }) + .unwrap(); + + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + assert_eq!(buf.events_since(0).len(), 0); + } + + #[tokio::test] + async fn subscribe_to_watcher_stores_merge_failure() { + let buf = EventBuffer::new(); + let (tx, rx) = broadcast::channel(16); + + subscribe_to_watcher(buf.clone(), rx); + + tx.send(crate::io::watcher::WatcherEvent::MergeFailure { + story_id: "42_story_foo".to_string(), + reason: "merge conflict".to_string(), + }) + .unwrap(); + + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + let events = buf.events_since(0); + assert_eq!(events.len(), 1); + assert!(matches!(events[0], StoredEvent::MergeFailure { .. })); + } + + #[tokio::test] + async fn subscribe_to_watcher_stores_story_blocked() { + let buf = EventBuffer::new(); + let (tx, rx) = broadcast::channel(16); + + subscribe_to_watcher(buf.clone(), rx); + + tx.send(crate::io::watcher::WatcherEvent::StoryBlocked { + story_id: "43_story_bar".to_string(), + reason: "retry limit exceeded".to_string(), + }) + .unwrap(); + + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + let events = buf.events_since(0); + assert_eq!(events.len(), 1); + assert!(matches!(events[0], StoredEvent::StoryBlocked { .. })); + } +} diff --git a/server/src/http/mod.rs b/server/src/http/mod.rs index da8fd3e0..24551da0 100644 --- a/server/src/http/mod.rs +++ b/server/src/http/mod.rs @@ -7,6 +7,7 @@ pub mod bot_command; pub mod bot_config; pub mod chat; pub mod context; +pub mod events; pub mod health; pub mod io; pub mod mcp; @@ -68,6 +69,7 @@ pub fn build_routes( whatsapp_ctx: Option>, slack_ctx: Option>, port: u16, + event_buffer: Option, ) -> impl poem::Endpoint { let ctx_arc = std::sync::Arc::new(ctx); @@ -103,6 +105,10 @@ pub fn build_routes( .at("/", get(assets::embedded_index)) .at("/*path", get(assets::embedded_file)); + if let Some(buf) = event_buffer { + route = route.at("/api/events", get(events::events_handler).data(buf)); + } + if let Some(wa_ctx) = whatsapp_ctx { route = route.at( "/webhook/whatsapp", @@ -302,7 +308,7 @@ mod tests { fn build_routes_constructs_without_panic() { let tmp = tempfile::tempdir().unwrap(); let ctx = context::AppContext::new_test(tmp.path().to_path_buf()); - let _endpoint = build_routes(ctx, None, None, 3001); + let _endpoint = build_routes(ctx, None, None, 3001, None); } #[test] @@ -311,6 +317,14 @@ mod tests { // ensuring the port parameter flows through to OAuthState. 
let tmp = tempfile::tempdir().unwrap(); let ctx = context::AppContext::new_test(tmp.path().to_path_buf()); - let _endpoint = build_routes(ctx, None, None, 9999); + let _endpoint = build_routes(ctx, None, None, 9999, None); + } + + #[test] + fn build_routes_with_event_buffer_constructs_without_panic() { + let tmp = tempfile::tempdir().unwrap(); + let ctx = context::AppContext::new_test(tmp.path().to_path_buf()); + let buf = events::EventBuffer::new(); + let _endpoint = build_routes(ctx, None, None, 3001, Some(buf)); } } diff --git a/server/src/main.rs b/server/src/main.rs index f92e2e2e..fc94154f 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -544,6 +544,8 @@ async fn main() -> Result<(), std::io::Error> { let watcher_rx_for_whatsapp = watcher_tx.subscribe(); let watcher_rx_for_slack = watcher_tx.subscribe(); let watcher_rx_for_discord = watcher_tx.subscribe(); + // Subscribe to watcher events for the per-project event buffer (gateway polling). + let watcher_rx_for_events = watcher_tx.subscribe(); // Wrap perm_rx in Arc so it can be shared with both the WebSocket // handler (via AppContext) and the Matrix bot. let perm_rx = Arc::new(tokio::sync::Mutex::new(perm_rx)); @@ -802,7 +804,18 @@ async fn main() -> Result<(), std::io::Error> { test_jobs: std::sync::Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())), }; - let app = build_routes(ctx, whatsapp_ctx.clone(), slack_ctx.clone(), port); + // Create the per-project event buffer and subscribe it to the watcher channel + // so that pipeline events are buffered for the gateway's `/api/events` poller. + let event_buffer = crate::http::events::EventBuffer::new(); + crate::http::events::subscribe_to_watcher(event_buffer.clone(), watcher_rx_for_events); + + let app = build_routes( + ctx, + whatsapp_ctx.clone(), + slack_ctx.clone(), + port, + Some(event_buffer), + ); // Unified 1-second background tick loop: fires due timers, detects orphaned // agents (watchdog), and promotes done→archived items (sweep). Replaces the diff --git a/website/docs/configuration.html b/website/docs/configuration.html index 9d3d95ed..f29916da 100644 --- a/website/docs/configuration.html +++ b/website/docs/configuration.html @@ -247,6 +247,61 @@ Run `cargo clippy -- -D warnings` before committing. Zero warnings allowed.history_sizeOptional. Maximum conversation turns to remember per room/user (default: 20). + +

+<h3>Gateway: aggregated chat stream</h3>
+
+<p>When running <code>huskies --gateway</code>, you can configure a single bot that receives pipeline notifications from all registered projects. Events are prefixed with <code>[project-name]</code> so you can tell them apart in one shared room.</p>
+
+<p>The aggregated stream is configured entirely in the gateway's <code>.huskies/bot.toml</code> — no per-project bot config is required, and no per-project files need to change when you add a new project to <code>projects.toml</code>.</p>

+<h4>Enabling the aggregated stream</h4>
+
+<p>Add or edit <code>&lt;gateway-config-dir&gt;/.huskies/bot.toml</code> and set <code>enabled = true</code>. The gateway bot automatically polls every project listed in <code>projects.toml</code> and forwards events to the configured rooms.</p>
+<pre><code># &lt;gateway-config-dir&gt;/.huskies/bot.toml
+enabled = true
+transport = "matrix"
+homeserver = "https://matrix.example.com"
+username = "@gateway-bot:example.com"
+password = "secret"
+room_ids = ["!gateway-room:example.com"]
+allowed_users = ["@you:example.com"]
+
+# Gateway-specific: poll interval and on/off switch
+aggregated_notifications_poll_interval_secs = 5   # default
+aggregated_notifications_enabled = true            # default
+</code></pre>

+<h4>Aggregated stream settings</h4>
+
+<table>
+  <tr><th>Key</th><th>Type</th><th>Default</th><th>Description</th></tr>
+  <tr><td><code>aggregated_notifications_enabled</code></td><td>bool</td><td><code>true</code></td><td>Set to <code>false</code> to disable the aggregated stream without disabling the gateway bot entirely. Per-project configs are never consulted.</td></tr>
+  <tr><td><code>aggregated_notifications_poll_interval_secs</code></td><td>integer</td><td><code>5</code></td><td>How often (in seconds) the gateway polls each project's <code>/api/events</code> endpoint. Lower values reduce notification latency; values below 1 are clamped to 1.</td></tr>
+</table>
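+<p>For example, to keep the gateway bot running but switch off only the aggregated stream, set the flag in the gateway's <code>bot.toml</code>:</p>
+
+<pre><code>aggregated_notifications_enabled = false</code></pre>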

+<h4>No-duplicate guarantee</h4>
+
+<p>Per-project bots and the gateway aggregated stream send to different rooms — they are independent. Events from a per-project bot go to that project's rooms; events from the gateway stream go to the gateway rooms. The same event never appears twice in the same room.</p>

+<h4>Unreachable projects</h4>
+
+<p>If a per-project server is temporarily unreachable, the gateway logs a warning and skips that project for the current poll cycle. All other projects continue to deliver notifications normally. No configuration change is required — the poller retries on the next interval.</p>
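+<p>The skip shows up in the gateway log as a warning like the one below (project name and the error text in parentheses are illustrative; they depend on your setup and the underlying failure):</p>
+
+<pre><code>[gateway-poller] my-project: unreachable (connection refused); skipping</code></pre>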

+<h4>Supported event types</h4>
+
+<p>The aggregated stream delivers the following event types, each prefixed with the project name:</p>
+
+<ul>
+  <li>Stage transitions — story created, agent started, QA requested, QA approved/rejected, merge succeeded (all pipeline stage moves)</li>
+  <li>Merge failures — merge failed with a reason</li>
+  <li>Story blocked — story blocked after exceeding retry limit</li>
+</ul>
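+<p>Under the hood these are the JSON events served by each project's <code>/api/events</code> endpoint. A response is an array shaped like the sketch below (IDs and timestamps are illustrative; a <code>story_blocked</code> event carries the same <code>story_id</code>/<code>reason</code>/<code>timestamp_ms</code> fields as <code>merge_failure</code>):</p>
+
+<pre><code>[
+  {
+    "type": "stage_transition",
+    "story_id": "42_story_my_feature",
+    "from_stage": "2_current",
+    "to_stage": "3_qa",
+    "timestamp_ms": 1745409927000
+  },
+  {
+    "type": "merge_failure",
+    "story_id": "42_story_my_feature",
+    "reason": "merge conflict",
+    "timestamp_ms": 1745409930000
+  }
+]</code></pre>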
diff --git a/website/docs/transports.html b/website/docs/transports.html
index 7742f662..e92d4bc6 100644
--- a/website/docs/transports.html
+++ b/website/docs/transports.html
@@ -200,6 +200,13 @@
 discord_allowed_usersOptional. Discord user IDs allowed to interact. When absent, all users in configured channels can interact.
+

+<h3>Gateway: aggregated notifications</h3>
+
+<p>When using <code>huskies --gateway</code>, you can configure the gateway bot to receive notifications from all registered projects in a single room. Events are prefixed with <code>[project-name]</code>.</p>
+
+<p>No additional transport is required — the gateway aggregated stream works with any of the transports above. Configure the gateway's <code>.huskies/bot.toml</code> with your transport credentials and set <code>aggregated_notifications_enabled = true</code> (the default). See <a href="configuration.html">Configuration → Gateway: aggregated chat stream</a> for the full reference.</p>
+
+<p><strong>No per-project changes needed:</strong> Adding a new project to <code>projects.toml</code> does not require editing per-project bot configs — the gateway picks it up automatically.</p>
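+<p>To see the raw feed the gateway consumes, you can query a project server's events endpoint directly (host and port are illustrative; use your project server's address):</p>
+
+<pre><code>curl "http://127.0.0.1:3001/api/events?since=0"</code></pre>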