From d1a2393b32eb2739b484f59dd069882a165e302b Mon Sep 17 00:00:00 2001 From: dave Date: Tue, 28 Apr 2026 00:17:44 +0000 Subject: [PATCH] huskies: merge 760 --- server/src/gateway.rs | 10 ++- server/src/http/gateway.rs | 116 ++++++++++++++++++++++++++++++ server/src/service/gateway/mod.rs | 47 ++++++++++++ website/docs/cli.html | 51 +++++++++++++ 4 files changed, 221 insertions(+), 3 deletions(-) diff --git a/server/src/gateway.rs b/server/src/gateway.rs index fc132c6c..f0ab0903 100644 --- a/server/src/gateway.rs +++ b/server/src/gateway.rs @@ -12,9 +12,9 @@ use std::sync::Arc; // Re-export public types that callers reference as `crate::gateway::*`. pub use crate::service::gateway::{ - GatewayConfig, GatewayState as GatewayStateType, JoinedAgent, ProjectEntry, - fetch_all_project_pipeline_statuses, format_aggregate_status_compact, - spawn_gateway_notification_poller, + GatewayConfig, GatewayState as GatewayStateType, GatewayStatusEvent, JoinedAgent, ProjectEntry, + broadcast_status_event, fetch_all_project_pipeline_statuses, format_aggregate_status_compact, + spawn_gateway_notification_poller, subscribe_status_events, }; /// Build the complete gateway route tree. @@ -70,6 +70,10 @@ pub fn build_gateway_route(state_arc: Arc) -> impl poem::Endpoint "/gateway/agents/:id/heartbeat", poem::post(gateway_heartbeat_handler), ) + .at( + "/gateway/events/push", + poem::get(gateway_event_push_handler), + ) // Serve the embedded React frontend so the gateway has a UI. .at( "/assets/*path", diff --git a/server/src/http/gateway.rs b/server/src/http/gateway.rs index 6b399897..b5ccdaee 100644 --- a/server/src/http/gateway.rs +++ b/server/src/http/gateway.rs @@ -4,9 +4,12 @@ //! the response. No inline business logic, no `reqwest`, no filesystem access. 
use crate::service::gateway::{self, GatewayState}; +use futures::StreamExt; use poem::handler; use poem::http::StatusCode; use poem::web::Path as PoemPath; +use poem::web::Query; +use poem::web::websocket::{Message as WsMessage, WebSocket}; use poem::web::{Data, Json}; use poem::{Body, Request, Response}; use serde::{Deserialize, Serialize}; @@ -631,6 +634,119 @@ pub async fn gateway_heartbeat_handler( } } +// ── Event-push WebSocket handler ──────────────────────────────────────────── + +/// Query parameters accepted on the `/gateway/events/push` WebSocket upgrade. +#[derive(Deserialize)] +struct EventPushQueryParams { + /// One-time join token generated by `POST /gateway/tokens`. + token: Option, + /// The project name this node represents (e.g. `"huskies"`). + project: Option, +} + +/// `GET /gateway/events/push` — WebSocket endpoint for project nodes to push +/// [`StatusEvent`] frames to the gateway. +/// +/// # Authentication +/// +/// The connecting node must supply a valid one-time join token via the `token` +/// query parameter, obtained from `POST /gateway/tokens`. The token is +/// consumed on the first successful upgrade — the connection itself is then +/// kept open indefinitely. +/// +/// # Protocol +/// +/// Each message from the project node must be a JSON-encoded +/// [`crate::service::events::StoredEvent`]. The gateway fan-outs the event +/// (tagged with the project name) to all current local subscribers. +/// +/// The server does not send data back; clients should treat any close frame +/// as a signal to reconnect with exponential back-off (see docs/gateway-protocol.html). +/// +/// # Reconnect-with-backoff +/// +/// Project nodes MUST reconnect on disconnect. 
Recommended policy: +/// +/// - Initial retry delay: **1 s** +/// - Back-off multiplier: **2×** per attempt +/// - Max delay cap: **60 s** +/// - Jitter: add ±10 % to the delay to avoid thundering herds +#[handler] +pub async fn gateway_event_push_handler( + ws: WebSocket, + state: Data<&Arc>, + Query(params): Query, +) -> poem::Response { + // ── Authentication (pre-upgrade) ───────────────────────────────────── + let token = match params.token { + Some(t) if !t.is_empty() => t, + _ => { + return poem::Response::builder() + .status(StatusCode::UNAUTHORIZED) + .body("token query parameter required"); + } + }; + + let project = match params.project { + Some(p) if !p.is_empty() => p, + _ => { + return poem::Response::builder() + .status(StatusCode::BAD_REQUEST) + .body("project query parameter required"); + } + }; + + // Validate and consume the one-time token. + { + let mut tokens = state.pending_tokens.write().await; + if !tokens.contains_key(&token) { + return poem::Response::builder() + .status(StatusCode::UNAUTHORIZED) + .body("invalid or already-used join token"); + } + tokens.remove(&token); + } + + // ── WebSocket upgrade ──────────────────────────────────────────────── + use poem::IntoResponse as _; + let state = Arc::clone(&state); + ws.on_upgrade(move |socket| async move { + let (_, mut stream) = socket.split(); + + crate::slog!( + "[gateway] Project node '{}' connected to event-push endpoint", + project + ); + + while let Some(msg) = stream.next().await { + let text = match msg { + Ok(WsMessage::Text(t)) => t, + Ok(WsMessage::Close(_)) | Err(_) => break, + _ => continue, + }; + + match serde_json::from_str::(&text) { + Ok(event) => { + gateway::broadcast_status_event(&state, project.clone(), event); + } + Err(e) => { + crate::slog!( + "[gateway] event-push: invalid frame from '{}': {e}", + project + ); + } + } + } + + crate::slog!( + "[gateway] Project node '{}' disconnected from event-push endpoint", + project + ); + }) + .into_response() +} + // ── 
Health handler ────────────────────────────────────────────────────────── /// HTTP GET `/health` handler for the gateway. diff --git a/server/src/service/gateway/mod.rs b/server/src/service/gateway/mod.rs index 2d69d76c..a1921065 100644 --- a/server/src/service/gateway/mod.rs +++ b/server/src/service/gateway/mod.rs @@ -26,6 +26,21 @@ use std::sync::Arc; use tokio::sync::Mutex as TokioMutex; use tokio::sync::RwLock; +// ── Status event broadcaster ──────────────────────────────────────────────── + +/// Capacity of the gateway status event broadcast channel. +const EVENT_CHANNEL_CAPACITY: usize = 64; + +/// A status event pushed by a project node and fanned out to all local +/// subscribers (e.g. the Web UI, notification forwarders). +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +pub struct GatewayStatusEvent { + /// The project name that emitted this event. + pub project: String, + /// The pipeline event payload. + pub event: crate::service::events::StoredEvent, +} + // ── Error type ────────────────────────────────────────────────────────────── /// Typed errors returned by `service::gateway` functions. @@ -93,6 +108,10 @@ pub struct GatewayState { pub port: u16, /// Abort handle for the running Matrix bot task (if any). pub bot_handle: Arc>>, + /// Broadcast sender for [`GatewayStatusEvent`]s pushed by project nodes. + /// + /// Call `event_tx.subscribe()` to obtain a receiver for outbound fan-out. 
+ pub event_tx: tokio::sync::broadcast::Sender, } impl GatewayState { @@ -107,6 +126,7 @@ impl GatewayState { ) -> Result { let first = config::validate_config(&gateway_config)?; let agents = io::load_agents(&config_dir); + let (event_tx, _) = tokio::sync::broadcast::channel(EVENT_CHANNEL_CAPACITY); Ok(Self { projects: Arc::new(RwLock::new(gateway_config.projects)), active_project: Arc::new(RwLock::new(first)), @@ -116,6 +136,7 @@ impl GatewayState { config_dir, port, bot_handle: Arc::new(TokioMutex::new(None)), + event_tx, }) } @@ -380,6 +401,32 @@ pub async fn health_check_all(state: &GatewayState) -> (bool, BTreeMap usize { + let msg = GatewayStatusEvent { project, event }; + state.event_tx.send(msg).unwrap_or(0) +} + +/// Subscribe to the gateway's status event stream. +/// +/// Returns a broadcast receiver that will yield [`GatewayStatusEvent`]s as +/// project nodes push them. If the receiver falls behind (more than +/// [`EVENT_CHANNEL_CAPACITY`] events are queued), it will receive a +/// [`tokio::sync::broadcast::error::RecvError::Lagged`] error; callers +/// should discard lagged events and continue. +pub fn subscribe_status_events( + state: &GatewayState, +) -> tokio::sync::broadcast::Receiver { + state.event_tx.subscribe() +} + /// Save bot config and restart the bot. pub async fn save_bot_config_and_restart(state: &GatewayState, content: &str) -> Result<(), Error> { io::write_bot_config(&state.config_dir, content).map_err(Error::Config)?; diff --git a/website/docs/cli.html b/website/docs/cli.html index 775ad6aa..45c8c676 100644 --- a/website/docs/cli.html +++ b/website/docs/cli.html @@ -146,6 +146,57 @@ HUSKIES_PORT=3002 huskies +

Gateway event-push protocol

+

Project nodes can push pipeline status events to the gateway in real time over a WebSocket connection. The gateway fans each event out to all connected local subscribers.

+ +

Connecting

+
    +
  1. Obtain a one-time join token: POST /gateway/tokens → {"token":"…"}
+
  2. Open a WebSocket connection to GET /gateway/events/push?token=TOKEN&project=PROJECT_NAME (both query parameters are required)
+
  3. The token is consumed on upgrade. The project name is attached to every event the server broadcasts downstream.
+
+ +

Sending events

+

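Each message must be a WebSocket text frame containing one JSON-encoded StoredEvent; binary frames are ignored, and frames that fail to parse are logged and dropped: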

+
// Stage transition
+{"type":"stage_transition","story_id":"42_story_login","from_stage":"2_current","to_stage":"3_qa","timestamp_ms":1700000000000}
+
+// Merge failure
+{"type":"merge_failure","story_id":"42_story_login","reason":"conflict in src/main.rs","timestamp_ms":1700000001000}
+
+// Story blocked
+{"type":"story_blocked","story_id":"42_story_login","reason":"retry limit exceeded","timestamp_ms":1700000002000}
+

The server does not send data frames back. Any frame the project node does receive (for example, a close frame) indicates an error or a server restart — treat it as a disconnect signal.

+ +

Reconnect with exponential back-off

+

Project nodes must reconnect on any disconnect. Use the following policy to avoid thundering herds after a gateway restart:

+ + + + + + + + +
Parameter | Value
Initial delay | 1 s
Back-off multiplier | 2× per attempt
Maximum delay | 60 s
Jitter | ±10 % of the computed delay
+

Pseudocode:

+
delay = 1.0          // seconds
+max_delay = 60.0
+
+loop:
+    token = POST /gateway/tokens
+    connect ws://GATEWAY_HOST/gateway/events/push?token=TOKEN&project=NAME
+    while connected:
+        send StoredEvent frames
+    // disconnected — wait and retry
+    jittered = delay * random(0.9, 1.1)   // apply ±10 % jitter
+    sleep(min(jittered, max_delay))
+    delay = min(delay * 2, max_delay)
+ +
+ New token per connection: Each WebSocket upgrade consumes the join token. Request a fresh token for every reconnect attempt. +
+

Building from source

Standard release build

cargo build --release