huskies: merge 760

This commit is contained in:
dave
2026-04-28 00:17:44 +00:00
parent 63ce7b9ec3
commit d1a2393b32
4 changed files with 221 additions and 3 deletions
+116
View File
@@ -4,9 +4,12 @@
//! the response. No inline business logic, no `reqwest`, no filesystem access.
use crate::service::gateway::{self, GatewayState};
use futures::StreamExt;
use poem::handler;
use poem::http::StatusCode;
use poem::web::Path as PoemPath;
use poem::web::Query;
use poem::web::websocket::{Message as WsMessage, WebSocket};
use poem::web::{Data, Json};
use poem::{Body, Request, Response};
use serde::{Deserialize, Serialize};
@@ -631,6 +634,119 @@ pub async fn gateway_heartbeat_handler(
}
}
// ── Event-push WebSocket handler ────────────────────────────────────────────
/// Query parameters accepted on the `/gateway/events/push` WebSocket upgrade.
#[derive(Deserialize)]
struct EventPushQueryParams {
/// One-time join token generated by `POST /gateway/tokens`.
token: Option<String>,
/// The project name this node represents (e.g. `"huskies"`).
project: Option<String>,
}
/// `GET /gateway/events/push` — WebSocket endpoint for project nodes to push
/// [`StatusEvent`] frames to the gateway.
///
/// # Authentication
///
/// The connecting node must supply a valid one-time join token via the `token`
/// query parameter, obtained from `POST /gateway/tokens`. The token is
/// consumed on the first successful upgrade — the connection itself is then
/// kept open indefinitely.
///
/// # Protocol
///
/// Each message from the project node must be a JSON-encoded
/// [`crate::service::events::StoredEvent`]. The gateway fan-outs the event
/// (tagged with the project name) to all current local subscribers.
///
/// The server does not send data back; clients should treat any close frame
/// as a signal to reconnect with exponential back-off (see docs/gateway-protocol.html).
///
/// # Reconnect-with-backoff
///
/// Project nodes MUST reconnect on disconnect. Recommended policy:
///
/// - Initial retry delay: **1 s**
/// - Back-off multiplier: **2×** per attempt
/// - Max delay cap: **60 s**
/// - Jitter: add ±10 % to the delay to avoid thundering herds
#[handler]
pub async fn gateway_event_push_handler(
ws: WebSocket,
state: Data<&Arc<GatewayState>>,
Query(params): Query<EventPushQueryParams>,
) -> poem::Response {
// ── Authentication (pre-upgrade) ─────────────────────────────────────
let token = match params.token {
Some(t) if !t.is_empty() => t,
_ => {
return poem::Response::builder()
.status(StatusCode::UNAUTHORIZED)
.body("token query parameter required");
}
};
let project = match params.project {
Some(p) if !p.is_empty() => p,
_ => {
return poem::Response::builder()
.status(StatusCode::BAD_REQUEST)
.body("project query parameter required");
}
};
// Validate and consume the one-time token.
{
let mut tokens = state.pending_tokens.write().await;
if !tokens.contains_key(&token) {
return poem::Response::builder()
.status(StatusCode::UNAUTHORIZED)
.body("invalid or already-used join token");
}
tokens.remove(&token);
}
// ── WebSocket upgrade ────────────────────────────────────────────────
use poem::IntoResponse as _;
let state = Arc::clone(&state);
ws.on_upgrade(move |socket| async move {
let (_, mut stream) = socket.split();
crate::slog!(
"[gateway] Project node '{}' connected to event-push endpoint",
project
);
while let Some(msg) = stream.next().await {
let text = match msg {
Ok(WsMessage::Text(t)) => t,
Ok(WsMessage::Close(_)) | Err(_) => break,
_ => continue,
};
match serde_json::from_str::<crate::service::events::StoredEvent>(&text) {
Ok(event) => {
gateway::broadcast_status_event(&state, project.clone(), event);
}
Err(e) => {
crate::slog!(
"[gateway] event-push: invalid frame from '{}': {e}",
project
);
}
}
}
crate::slog!(
"[gateway] Project node '{}' disconnected from event-push endpoint",
project
);
})
.into_response()
}
// ── Health handler ──────────────────────────────────────────────────────────
/// HTTP GET `/health` handler for the gateway.