From 0d14fffe1c48637d44b2fed41cff4117d629f6c3 Mon Sep 17 00:00:00 2001 From: dave Date: Tue, 28 Apr 2026 01:27:00 +0000 Subject: [PATCH] huskies: merge 762 --- server/src/chat/transport/matrix/bot/run.rs | 19 +++ server/src/chat/transport/matrix/mod.rs | 4 + server/src/gateway.rs | 174 +++++++++++++++++++- server/src/main.rs | 1 + server/src/service/gateway/io.rs | 51 ++++++ server/src/service/gateway/mod.rs | 6 +- 6 files changed, 253 insertions(+), 2 deletions(-) diff --git a/server/src/chat/transport/matrix/bot/run.rs b/server/src/chat/transport/matrix/bot/run.rs index c5541abb..5d06abc8 100644 --- a/server/src/chat/transport/matrix/bot/run.rs +++ b/server/src/chat/transport/matrix/bot/run.rs @@ -31,6 +31,9 @@ pub async fn run_bot( gateway_projects: Vec, gateway_project_urls: std::collections::BTreeMap, timer_store: Arc, + gateway_event_rx: Option< + tokio::sync::broadcast::Receiver, + >, ) -> Result<(), String> { let project_root = &services.project_root; let store_path = project_root.join(".huskies").join("matrix_store"); @@ -322,6 +325,22 @@ pub async fn run_bot( ); } + // In gateway mode, subscribe to the gateway-side status broadcaster and + // forward events to the configured Matrix rooms with a `[project-name]` prefix. + // This path delivers events pushed directly by project nodes over WebSocket + // (via `/gateway/events/push`), complementing the HTTP-polling path above. + // On broadcaster back-pressure (Lagged), the task re-subscribes automatically + // so it never permanently stalls. + if let Some(event_rx) = gateway_event_rx { + let broadcast_room_ids: Vec = + announce_room_ids.iter().map(|r| r.to_string()).collect(); + crate::gateway::spawn_gateway_broadcaster_forwarder( + Arc::clone(&transport), + broadcast_room_ids, + event_rx, + ); + } + // Spawn a shutdown watcher that sends a best-effort goodbye message to all // configured rooms when the server is about to stop (SIGINT/SIGTERM or rebuild). 
{ diff --git a/server/src/chat/transport/matrix/mod.rs b/server/src/chat/transport/matrix/mod.rs index 68135629..c1f0a4c0 100644 --- a/server/src/chat/transport/matrix/mod.rs +++ b/server/src/chat/transport/matrix/mod.rs @@ -71,6 +71,9 @@ pub fn spawn_bot( gateway_projects: Vec, gateway_project_urls: std::collections::BTreeMap, timer_store: Arc, + gateway_event_rx: Option< + tokio::sync::broadcast::Receiver, + >, ) -> Option { let config = match BotConfig::load(project_root) { Some(c) => c, @@ -109,6 +112,7 @@ pub fn spawn_bot( gateway_projects, gateway_project_urls, timer_store, + gateway_event_rx, ) .await { diff --git a/server/src/gateway.rs b/server/src/gateway.rs index f0ab0903..2866583a 100644 --- a/server/src/gateway.rs +++ b/server/src/gateway.rs @@ -14,7 +14,8 @@ use std::sync::Arc; pub use crate::service::gateway::{ GatewayConfig, GatewayState as GatewayStateType, GatewayStatusEvent, JoinedAgent, ProjectEntry, broadcast_status_event, fetch_all_project_pipeline_statuses, format_aggregate_status_compact, - spawn_gateway_notification_poller, subscribe_status_events, + spawn_gateway_broadcaster_forwarder, spawn_gateway_notification_poller, + subscribe_status_events, }; /// Build the complete gateway route tree. 
@@ -130,6 +131,7 @@ pub async fn run(config_path: &Path, port: u16) -> Result<(), std::io::Error> { gateway_projects, gateway_project_urls, port, + Some(state_arc.event_tx.clone()), ); *state_arc.bot_handle.lock().await = bot_abort; @@ -976,6 +978,176 @@ mod tests { } } + // ── Gateway broadcaster forwarder tests ───────────────────────────── + + #[tokio::test] + async fn broadcaster_forwarder_forwards_events_with_project_tag() { + use crate::chat::{ChatTransport, MessageId}; + use crate::service::events::StoredEvent; + use async_trait::async_trait; + + type CallLog = Arc>>; + + struct MockTransport { + calls: CallLog, + } + + #[async_trait] + impl ChatTransport for MockTransport { + async fn send_message( + &self, + room_id: &str, + plain: &str, + _html: &str, + ) -> Result { + self.calls + .lock() + .unwrap() + .push((room_id.to_string(), plain.to_string())); + Ok("id".to_string()) + } + + async fn edit_message( + &self, + _room_id: &str, + _id: &str, + _plain: &str, + _html: &str, + ) -> Result<(), String> { + Ok(()) + } + + async fn send_typing(&self, _room_id: &str, _typing: bool) -> Result<(), String> { + Ok(()) + } + } + + let calls: CallLog = Arc::new(std::sync::Mutex::new(Vec::new())); + let transport = Arc::new(MockTransport { + calls: Arc::clone(&calls), + }); + + let (tx, rx) = + tokio::sync::broadcast::channel::(16); + gateway::spawn_gateway_broadcaster_forwarder( + transport as Arc, + vec!["!room:example.org".to_string()], + rx, + ); + + // Give the forwarder task a moment to start.
+ tokio::time::sleep(std::time::Duration::from_millis(10)).await; + + let event = crate::service::gateway::GatewayStatusEvent { + project: "my-project".to_string(), + event: StoredEvent::StageTransition { + story_id: "7_story_x".to_string(), + from_stage: "2_current".to_string(), + to_stage: "3_qa".to_string(), + timestamp_ms: 100, + }, + }; + tx.send(event).unwrap(); + + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + let messages = calls.lock().unwrap(); + assert_eq!(messages.len(), 1, "Expected exactly one notification"); + let (room, plain) = &messages[0]; + assert_eq!(room, "!room:example.org"); + assert!( + plain.starts_with("[my-project]"), + "Expected [my-project] prefix; got: {plain}" + ); + assert!( + plain.contains("7_story_x"), + "Expected story ID; got: {plain}" + ); + } + + #[tokio::test] + async fn broadcaster_forwarder_resubscribes_on_lag() { + use crate::chat::{ChatTransport, MessageId}; + use crate::service::events::StoredEvent; + use async_trait::async_trait; + + type Counter = Arc>; + + struct CountTransport { + count: Counter, + } + + #[async_trait] + impl ChatTransport for CountTransport { + async fn send_message( + &self, + _room_id: &str, + _plain: &str, + _html: &str, + ) -> Result { + *self.count.lock().unwrap() += 1; + Ok("id".to_string()) + } + + async fn edit_message( + &self, + _room_id: &str, + _id: &str, + _plain: &str, + _html: &str, + ) -> Result<(), String> { + Ok(()) + } + + async fn send_typing(&self, _room_id: &str, _typing: bool) -> Result<(), String> { + Ok(()) + } + } + + let count: Counter = Arc::new(std::sync::Mutex::new(0)); + let transport = Arc::new(CountTransport { + count: Arc::clone(&count), + }); + + // Use a tiny channel (capacity 1) so unread sends overflow it and the receiver sees Lagged. + let (tx, rx) = + tokio::sync::broadcast::channel::(1); + + // Flood the channel to trigger Lagged before the forwarder task starts.
+ let make_event = |n: u64| crate::service::gateway::GatewayStatusEvent { + project: "p".to_string(), + event: StoredEvent::StageTransition { + story_id: format!("{n}_story"), + from_stage: "2_current".to_string(), + to_stage: "3_qa".to_string(), + timestamp_ms: n, + }, + }; + // Send 3 events to overflow the capacity-1 channel before the task runs. + let _ = tx.send(make_event(1)); + let _ = tx.send(make_event(2)); + let _ = tx.send(make_event(3)); + + gateway::spawn_gateway_broadcaster_forwarder( + transport as Arc, + vec!["!r:x.org".to_string()], + rx, + ); + + // Send one more event once the forwarder task has had time to start; it should arrive. + tokio::time::sleep(std::time::Duration::from_millis(20)).await; + tx.send(make_event(4)).unwrap(); + + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + // After Lagged + resubscribe, the forwarder must still process event 4. + let received = *count.lock().unwrap(); + assert!( + received >= 1, + "Expected at least one event after Lagged resubscribe; got {received}" + ); + } + // ── BotConfig tests ───────────────────────────────────────────────── #[test] diff --git a/server/src/main.rs b/server/src/main.rs index f95fe5b0..5f77855c 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -811,6 +811,7 @@ async fn main() -> Result<(), std::io::Error> { vec![], std::collections::BTreeMap::new(), timer_store_for_bot, + None, ); } else { // Keep the receiver alive (drop it) so the sender never errors.
diff --git a/server/src/service/gateway/io.rs b/server/src/service/gateway/io.rs index d46226cf..7a5f89da 100644 --- a/server/src/service/gateway/io.rs +++ b/server/src/service/gateway/io.rs @@ -302,6 +302,50 @@ pub fn init_wizard_state(path: &Path) { // ── Notification poller ───────────────────────────────────────────────────── +/// Spawn a background task that reads [`super::GatewayStatusEvent`]s from the +/// gateway broadcast channel and forwards each one to the configured rooms via +/// `transport`, formatted with a `[project-name]` prefix. +/// +/// Survives broadcaster back-pressure: when the receiver falls behind +/// ([`tokio::sync::broadcast::error::RecvError::Lagged`]), the task logs +/// the gap and keeps reading from the oldest event still retained. +/// +/// The task exits cleanly when the broadcast channel is closed (i.e. when +/// `GatewayState` is dropped). +pub fn spawn_gateway_broadcaster_forwarder( + transport: std::sync::Arc, + room_ids: Vec, + mut rx: tokio::sync::broadcast::Receiver, +) { + tokio::spawn(async move { + loop { + match rx.recv().await { + Ok(event) => { + let (plain, html) = + super::polling::format_gateway_event(&event.project, &event.event); + for room_id in &room_ids { + if let Err(e) = transport.send_message(room_id, &plain, &html).await { + crate::slog!( + "[gateway-forwarder] Failed to forward event to {room_id}: {e}" + ); + } + } + } + Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { + crate::slog!( + "[gateway-forwarder] Broadcaster lagged by {n} messages; continuing" + ); + // `recv` already moved to the oldest retained event; just keep reading. + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => { + crate::slog!("[gateway-forwarder] Broadcast channel closed; forwarder exiting"); + break; + } + } + } + }); +} + /// Spawn a background task that polls events from all project servers.
pub fn spawn_gateway_notification_poller( transport: std::sync::Arc, @@ -374,12 +418,17 @@ pub fn spawn_gateway_notification_poller( pub type ActiveProject = std::sync::Arc>; /// Attempt to spawn the Matrix bot against the gateway config directory. +/// +/// `gateway_event_tx` — when `Some`, the bot will subscribe to the gateway +/// status broadcaster and forward [`super::GatewayStatusEvent`]s to its +/// configured Matrix rooms with a `[project-name]` prefix. pub fn spawn_gateway_bot( config_dir: &Path, active_project: ActiveProject, gateway_projects: Vec, gateway_project_urls: BTreeMap, port: u16, + gateway_event_tx: Option>, ) -> Option { use crate::agents::AgentPool; use crate::services::Services; @@ -412,6 +461,7 @@ pub fn spawn_gateway_bot( let timer_store = std::sync::Arc::new(crate::service::timer::TimerStore::load( config_dir.join(".huskies").join("timers.json"), )); + let gateway_event_rx = gateway_event_tx.map(|tx| tx.subscribe()); crate::chat::transport::matrix::spawn_bot( config_dir, watcher_tx, @@ -421,5 +471,6 @@ pub fn spawn_gateway_bot( gateway_projects, gateway_project_urls, timer_store, + gateway_event_rx, ) } diff --git a/server/src/service/gateway/mod.rs b/server/src/service/gateway/mod.rs index a1921065..64e93c28 100644 --- a/server/src/service/gateway/mod.rs +++ b/server/src/service/gateway/mod.rs @@ -16,7 +16,10 @@ pub mod registration; pub use aggregation::format_aggregate_status_compact; pub use config::{GatewayConfig, ProjectEntry}; -pub use io::{fetch_all_project_pipeline_statuses, spawn_gateway_notification_poller}; +pub use io::{ + fetch_all_project_pipeline_statuses, spawn_gateway_broadcaster_forwarder, + spawn_gateway_notification_poller, +}; pub use registration::JoinedAgent; use io::Client; @@ -452,6 +455,7 @@ pub async fn save_bot_config_and_restart(state: &GatewayState, content: &str) -> gateway_projects, gateway_project_urls, state.port, + Some(state.event_tx.clone()), ); *handle = new_handle; }