huskies: merge 762
This commit is contained in:
@@ -31,6 +31,9 @@ pub async fn run_bot(
|
|||||||
gateway_projects: Vec<String>,
|
gateway_projects: Vec<String>,
|
||||||
gateway_project_urls: std::collections::BTreeMap<String, String>,
|
gateway_project_urls: std::collections::BTreeMap<String, String>,
|
||||||
timer_store: Arc<TimerStore>,
|
timer_store: Arc<TimerStore>,
|
||||||
|
gateway_event_rx: Option<
|
||||||
|
tokio::sync::broadcast::Receiver<crate::service::gateway::GatewayStatusEvent>,
|
||||||
|
>,
|
||||||
) -> Result<(), String> {
|
) -> Result<(), String> {
|
||||||
let project_root = &services.project_root;
|
let project_root = &services.project_root;
|
||||||
let store_path = project_root.join(".huskies").join("matrix_store");
|
let store_path = project_root.join(".huskies").join("matrix_store");
|
||||||
@@ -322,6 +325,22 @@ pub async fn run_bot(
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// In gateway mode, subscribe to the gateway-side status broadcaster and
|
||||||
|
// forward events to the configured Matrix rooms with a `[project-name]` prefix.
|
||||||
|
// This path delivers events pushed directly by project nodes over WebSocket
|
||||||
|
// (via `/gateway/events/push`), complementing the HTTP-polling path above.
|
||||||
|
// On broadcaster back-pressure (Lagged), the task re-subscribes automatically
|
||||||
|
// so it never permanently stalls.
|
||||||
|
if let Some(event_rx) = gateway_event_rx {
|
||||||
|
let broadcast_room_ids: Vec<String> =
|
||||||
|
announce_room_ids.iter().map(|r| r.to_string()).collect();
|
||||||
|
crate::gateway::spawn_gateway_broadcaster_forwarder(
|
||||||
|
Arc::clone(&transport),
|
||||||
|
broadcast_room_ids,
|
||||||
|
event_rx,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
// Spawn a shutdown watcher that sends a best-effort goodbye message to all
|
// Spawn a shutdown watcher that sends a best-effort goodbye message to all
|
||||||
// configured rooms when the server is about to stop (SIGINT/SIGTERM or rebuild).
|
// configured rooms when the server is about to stop (SIGINT/SIGTERM or rebuild).
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -71,6 +71,9 @@ pub fn spawn_bot(
|
|||||||
gateway_projects: Vec<String>,
|
gateway_projects: Vec<String>,
|
||||||
gateway_project_urls: std::collections::BTreeMap<String, String>,
|
gateway_project_urls: std::collections::BTreeMap<String, String>,
|
||||||
timer_store: Arc<TimerStore>,
|
timer_store: Arc<TimerStore>,
|
||||||
|
gateway_event_rx: Option<
|
||||||
|
tokio::sync::broadcast::Receiver<crate::service::gateway::GatewayStatusEvent>,
|
||||||
|
>,
|
||||||
) -> Option<tokio::task::AbortHandle> {
|
) -> Option<tokio::task::AbortHandle> {
|
||||||
let config = match BotConfig::load(project_root) {
|
let config = match BotConfig::load(project_root) {
|
||||||
Some(c) => c,
|
Some(c) => c,
|
||||||
@@ -109,6 +112,7 @@ pub fn spawn_bot(
|
|||||||
gateway_projects,
|
gateway_projects,
|
||||||
gateway_project_urls,
|
gateway_project_urls,
|
||||||
timer_store,
|
timer_store,
|
||||||
|
gateway_event_rx,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
|
|||||||
+173
-1
@@ -14,7 +14,8 @@ use std::sync::Arc;
|
|||||||
pub use crate::service::gateway::{
|
pub use crate::service::gateway::{
|
||||||
GatewayConfig, GatewayState as GatewayStateType, GatewayStatusEvent, JoinedAgent, ProjectEntry,
|
GatewayConfig, GatewayState as GatewayStateType, GatewayStatusEvent, JoinedAgent, ProjectEntry,
|
||||||
broadcast_status_event, fetch_all_project_pipeline_statuses, format_aggregate_status_compact,
|
broadcast_status_event, fetch_all_project_pipeline_statuses, format_aggregate_status_compact,
|
||||||
spawn_gateway_notification_poller, subscribe_status_events,
|
spawn_gateway_broadcaster_forwarder, spawn_gateway_notification_poller,
|
||||||
|
subscribe_status_events,
|
||||||
};
|
};
|
||||||
|
|
||||||
/// Build the complete gateway route tree.
|
/// Build the complete gateway route tree.
|
||||||
@@ -130,6 +131,7 @@ pub async fn run(config_path: &Path, port: u16) -> Result<(), std::io::Error> {
|
|||||||
gateway_projects,
|
gateway_projects,
|
||||||
gateway_project_urls,
|
gateway_project_urls,
|
||||||
port,
|
port,
|
||||||
|
Some(state_arc.event_tx.clone()),
|
||||||
);
|
);
|
||||||
*state_arc.bot_handle.lock().await = bot_abort;
|
*state_arc.bot_handle.lock().await = bot_abort;
|
||||||
|
|
||||||
@@ -976,6 +978,176 @@ mod tests {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── Gateway broadcaster forwarder tests ─────────────────────────────
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn broadcaster_forwarder_forwards_events_with_project_tag() {
|
||||||
|
use crate::chat::{ChatTransport, MessageId};
|
||||||
|
use crate::service::events::StoredEvent;
|
||||||
|
use async_trait::async_trait;
|
||||||
|
|
||||||
|
type CallLog = Arc<std::sync::Mutex<Vec<(String, String)>>>;
|
||||||
|
|
||||||
|
struct MockTransport {
|
||||||
|
calls: CallLog,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl ChatTransport for MockTransport {
|
||||||
|
async fn send_message(
|
||||||
|
&self,
|
||||||
|
room_id: &str,
|
||||||
|
plain: &str,
|
||||||
|
_html: &str,
|
||||||
|
) -> Result<MessageId, String> {
|
||||||
|
self.calls
|
||||||
|
.lock()
|
||||||
|
.unwrap()
|
||||||
|
.push((room_id.to_string(), plain.to_string()));
|
||||||
|
Ok("id".to_string())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn edit_message(
|
||||||
|
&self,
|
||||||
|
_room_id: &str,
|
||||||
|
_id: &str,
|
||||||
|
_plain: &str,
|
||||||
|
_html: &str,
|
||||||
|
) -> Result<(), String> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn send_typing(&self, _room_id: &str, _typing: bool) -> Result<(), String> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let calls: CallLog = Arc::new(std::sync::Mutex::new(Vec::new()));
|
||||||
|
let transport = Arc::new(MockTransport {
|
||||||
|
calls: Arc::clone(&calls),
|
||||||
|
});
|
||||||
|
|
||||||
|
let (tx, rx) =
|
||||||
|
tokio::sync::broadcast::channel::<crate::service::gateway::GatewayStatusEvent>(16);
|
||||||
|
gateway::spawn_gateway_broadcaster_forwarder(
|
||||||
|
transport as Arc<dyn crate::chat::ChatTransport>,
|
||||||
|
vec!["!room:example.org".to_string()],
|
||||||
|
rx,
|
||||||
|
);
|
||||||
|
|
||||||
|
// Give the forwarder task a moment to start.
|
||||||
|
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
|
||||||
|
|
||||||
|
let event = crate::service::gateway::GatewayStatusEvent {
|
||||||
|
project: "my-project".to_string(),
|
||||||
|
event: StoredEvent::StageTransition {
|
||||||
|
story_id: "7_story_x".to_string(),
|
||||||
|
from_stage: "2_current".to_string(),
|
||||||
|
to_stage: "3_qa".to_string(),
|
||||||
|
timestamp_ms: 100,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
tx.send(event).unwrap();
|
||||||
|
|
||||||
|
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
||||||
|
|
||||||
|
let messages = calls.lock().unwrap();
|
||||||
|
assert_eq!(messages.len(), 1, "Expected exactly one notification");
|
||||||
|
let (room, plain) = &messages[0];
|
||||||
|
assert_eq!(room, "!room:example.org");
|
||||||
|
assert!(
|
||||||
|
plain.starts_with("[my-project]"),
|
||||||
|
"Expected [my-project] prefix; got: {plain}"
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
plain.contains("7_story_x"),
|
||||||
|
"Expected story ID; got: {plain}"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn broadcaster_forwarder_resubscribes_on_lag() {
|
||||||
|
use crate::chat::{ChatTransport, MessageId};
|
||||||
|
use crate::service::events::StoredEvent;
|
||||||
|
use async_trait::async_trait;
|
||||||
|
|
||||||
|
type Counter = Arc<std::sync::Mutex<usize>>;
|
||||||
|
|
||||||
|
struct CountTransport {
|
||||||
|
count: Counter,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl ChatTransport for CountTransport {
|
||||||
|
async fn send_message(
|
||||||
|
&self,
|
||||||
|
_room_id: &str,
|
||||||
|
_plain: &str,
|
||||||
|
_html: &str,
|
||||||
|
) -> Result<MessageId, String> {
|
||||||
|
*self.count.lock().unwrap() += 1;
|
||||||
|
Ok("id".to_string())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn edit_message(
|
||||||
|
&self,
|
||||||
|
_room_id: &str,
|
||||||
|
_id: &str,
|
||||||
|
_plain: &str,
|
||||||
|
_html: &str,
|
||||||
|
) -> Result<(), String> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn send_typing(&self, _room_id: &str, _typing: bool) -> Result<(), String> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let count: Counter = Arc::new(std::sync::Mutex::new(0));
|
||||||
|
let transport = Arc::new(CountTransport {
|
||||||
|
count: Arc::clone(&count),
|
||||||
|
});
|
||||||
|
|
||||||
|
// Use a tiny channel (capacity 1) so the second send causes a Lagged error.
|
||||||
|
let (tx, rx) =
|
||||||
|
tokio::sync::broadcast::channel::<crate::service::gateway::GatewayStatusEvent>(1);
|
||||||
|
|
||||||
|
// Flood the channel to trigger Lagged before the forwarder task starts.
|
||||||
|
let make_event = |n: u64| crate::service::gateway::GatewayStatusEvent {
|
||||||
|
project: "p".to_string(),
|
||||||
|
event: StoredEvent::StageTransition {
|
||||||
|
story_id: format!("{n}_story"),
|
||||||
|
from_stage: "2_current".to_string(),
|
||||||
|
to_stage: "3_qa".to_string(),
|
||||||
|
timestamp_ms: n,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
// Send 3 events to overflow the capacity-1 channel before the task runs.
|
||||||
|
let _ = tx.send(make_event(1));
|
||||||
|
let _ = tx.send(make_event(2));
|
||||||
|
let _ = tx.send(make_event(3));
|
||||||
|
|
||||||
|
gateway::spawn_gateway_broadcaster_forwarder(
|
||||||
|
transport as Arc<dyn crate::chat::ChatTransport>,
|
||||||
|
vec!["!r:x.org".to_string()],
|
||||||
|
rx,
|
||||||
|
);
|
||||||
|
|
||||||
|
// Send one more event after the forwarder subscribes; it should arrive.
|
||||||
|
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
|
||||||
|
tx.send(make_event(4)).unwrap();
|
||||||
|
|
||||||
|
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
||||||
|
|
||||||
|
// After Lagged + resubscribe, the forwarder must still process event 4.
|
||||||
|
let received = *count.lock().unwrap();
|
||||||
|
assert!(
|
||||||
|
received >= 1,
|
||||||
|
"Expected at least one event after Lagged resubscribe; got {received}"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
// ── BotConfig tests ─────────────────────────────────────────────────
|
// ── BotConfig tests ─────────────────────────────────────────────────
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
|||||||
@@ -811,6 +811,7 @@ async fn main() -> Result<(), std::io::Error> {
|
|||||||
vec![],
|
vec![],
|
||||||
std::collections::BTreeMap::new(),
|
std::collections::BTreeMap::new(),
|
||||||
timer_store_for_bot,
|
timer_store_for_bot,
|
||||||
|
None,
|
||||||
);
|
);
|
||||||
} else {
|
} else {
|
||||||
// Keep the receiver alive (drop it) so the sender never errors.
|
// Keep the receiver alive (drop it) so the sender never errors.
|
||||||
|
|||||||
@@ -302,6 +302,50 @@ pub fn init_wizard_state(path: &Path) {
|
|||||||
|
|
||||||
// ── Notification poller ─────────────────────────────────────────────────────
|
// ── Notification poller ─────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
/// Spawn a background task that reads [`super::GatewayStatusEvent`]s from the
|
||||||
|
/// gateway broadcast channel and forwards each one to the configured rooms via
|
||||||
|
/// `transport`, formatted with a `[project-name]` prefix.
|
||||||
|
///
|
||||||
|
/// Survives broadcaster back-pressure: when the receiver falls behind
|
||||||
|
/// ([`tokio::sync::broadcast::error::RecvError::Lagged`]), the task
|
||||||
|
/// re-subscribes so it does not permanently stall.
|
||||||
|
///
|
||||||
|
/// The task exits cleanly when the broadcast channel is closed (i.e. when
|
||||||
|
/// `GatewayState` is dropped).
|
||||||
|
pub fn spawn_gateway_broadcaster_forwarder(
|
||||||
|
transport: std::sync::Arc<dyn crate::chat::ChatTransport>,
|
||||||
|
room_ids: Vec<String>,
|
||||||
|
mut rx: tokio::sync::broadcast::Receiver<super::GatewayStatusEvent>,
|
||||||
|
) {
|
||||||
|
tokio::spawn(async move {
|
||||||
|
loop {
|
||||||
|
match rx.recv().await {
|
||||||
|
Ok(event) => {
|
||||||
|
let (plain, html) =
|
||||||
|
super::polling::format_gateway_event(&event.project, &event.event);
|
||||||
|
for room_id in &room_ids {
|
||||||
|
if let Err(e) = transport.send_message(room_id, &plain, &html).await {
|
||||||
|
crate::slog!(
|
||||||
|
"[gateway-forwarder] Failed to forward event to {room_id}: {e}"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
|
||||||
|
crate::slog!(
|
||||||
|
"[gateway-forwarder] Broadcaster lagged by {n} messages; resubscribing"
|
||||||
|
);
|
||||||
|
rx = rx.resubscribe();
|
||||||
|
}
|
||||||
|
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
|
||||||
|
crate::slog!("[gateway-forwarder] Broadcast channel closed; forwarder exiting");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
/// Spawn a background task that polls events from all project servers.
|
/// Spawn a background task that polls events from all project servers.
|
||||||
pub fn spawn_gateway_notification_poller(
|
pub fn spawn_gateway_notification_poller(
|
||||||
transport: std::sync::Arc<dyn crate::chat::ChatTransport>,
|
transport: std::sync::Arc<dyn crate::chat::ChatTransport>,
|
||||||
@@ -374,12 +418,17 @@ pub fn spawn_gateway_notification_poller(
|
|||||||
pub type ActiveProject = std::sync::Arc<tokio::sync::RwLock<String>>;
|
pub type ActiveProject = std::sync::Arc<tokio::sync::RwLock<String>>;
|
||||||
|
|
||||||
/// Attempt to spawn the Matrix bot against the gateway config directory.
|
/// Attempt to spawn the Matrix bot against the gateway config directory.
|
||||||
|
///
|
||||||
|
/// `gateway_event_tx` — when `Some`, the bot will subscribe to the gateway
|
||||||
|
/// status broadcaster and forward [`super::GatewayStatusEvent`]s to its
|
||||||
|
/// configured Matrix rooms with a `[project-name]` prefix.
|
||||||
pub fn spawn_gateway_bot(
|
pub fn spawn_gateway_bot(
|
||||||
config_dir: &Path,
|
config_dir: &Path,
|
||||||
active_project: ActiveProject,
|
active_project: ActiveProject,
|
||||||
gateway_projects: Vec<String>,
|
gateway_projects: Vec<String>,
|
||||||
gateway_project_urls: BTreeMap<String, String>,
|
gateway_project_urls: BTreeMap<String, String>,
|
||||||
port: u16,
|
port: u16,
|
||||||
|
gateway_event_tx: Option<tokio::sync::broadcast::Sender<super::GatewayStatusEvent>>,
|
||||||
) -> Option<tokio::task::AbortHandle> {
|
) -> Option<tokio::task::AbortHandle> {
|
||||||
use crate::agents::AgentPool;
|
use crate::agents::AgentPool;
|
||||||
use crate::services::Services;
|
use crate::services::Services;
|
||||||
@@ -412,6 +461,7 @@ pub fn spawn_gateway_bot(
|
|||||||
let timer_store = std::sync::Arc::new(crate::service::timer::TimerStore::load(
|
let timer_store = std::sync::Arc::new(crate::service::timer::TimerStore::load(
|
||||||
config_dir.join(".huskies").join("timers.json"),
|
config_dir.join(".huskies").join("timers.json"),
|
||||||
));
|
));
|
||||||
|
let gateway_event_rx = gateway_event_tx.map(|tx| tx.subscribe());
|
||||||
crate::chat::transport::matrix::spawn_bot(
|
crate::chat::transport::matrix::spawn_bot(
|
||||||
config_dir,
|
config_dir,
|
||||||
watcher_tx,
|
watcher_tx,
|
||||||
@@ -421,5 +471,6 @@ pub fn spawn_gateway_bot(
|
|||||||
gateway_projects,
|
gateway_projects,
|
||||||
gateway_project_urls,
|
gateway_project_urls,
|
||||||
timer_store,
|
timer_store,
|
||||||
|
gateway_event_rx,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -16,7 +16,10 @@ pub mod registration;
|
|||||||
|
|
||||||
pub use aggregation::format_aggregate_status_compact;
|
pub use aggregation::format_aggregate_status_compact;
|
||||||
pub use config::{GatewayConfig, ProjectEntry};
|
pub use config::{GatewayConfig, ProjectEntry};
|
||||||
pub use io::{fetch_all_project_pipeline_statuses, spawn_gateway_notification_poller};
|
pub use io::{
|
||||||
|
fetch_all_project_pipeline_statuses, spawn_gateway_broadcaster_forwarder,
|
||||||
|
spawn_gateway_notification_poller,
|
||||||
|
};
|
||||||
pub use registration::JoinedAgent;
|
pub use registration::JoinedAgent;
|
||||||
|
|
||||||
use io::Client;
|
use io::Client;
|
||||||
@@ -452,6 +455,7 @@ pub async fn save_bot_config_and_restart(state: &GatewayState, content: &str) ->
|
|||||||
gateway_projects,
|
gateway_projects,
|
||||||
gateway_project_urls,
|
gateway_project_urls,
|
||||||
state.port,
|
state.port,
|
||||||
|
Some(state.event_tx.clone()),
|
||||||
);
|
);
|
||||||
*handle = new_handle;
|
*handle = new_handle;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user