From fb5a21cfbbada547934b954cc9549ead2b5a6919 Mon Sep 17 00:00:00 2001 From: dave Date: Tue, 28 Apr 2026 09:54:51 +0000 Subject: [PATCH] huskies: merge 778 --- server/src/crdt_state/types.rs | 2 +- server/src/gateway.rs | 58 ++++++++++ server/src/http/gateway.rs | 182 +++++++++++++++++++++++++++++- server/src/service/gateway/mod.rs | 5 - 4 files changed, 240 insertions(+), 7 deletions(-) diff --git a/server/src/crdt_state/types.rs b/server/src/crdt_state/types.rs index bef0c715..f1e4d2e6 100644 --- a/server/src/crdt_state/types.rs +++ b/server/src/crdt_state/types.rs @@ -109,7 +109,7 @@ pub struct PipelineItemView { } /// A snapshot of a single node presence entry derived from the CRDT document. -#[derive(Clone, Debug)] +#[derive(Clone, Debug, serde::Serialize)] pub struct NodePresenceView { pub node_id: String, pub address: String, diff --git a/server/src/gateway.rs b/server/src/gateway.rs index dc8623ee..f92844da 100644 --- a/server/src/gateway.rs +++ b/server/src/gateway.rs @@ -58,6 +58,14 @@ pub fn build_gateway_route(state_arc: Arc) -> impl poem::Endpoint "/gateway/events/push", poem::get(gateway_event_push_handler), ) + // Agent registration via CRDT-sync WebSocket. + .at("/crdt-sync", poem::get(gateway_crdt_sync_handler)) + // Agent management REST endpoints. + .at("/gateway/agents", poem::get(gateway_list_agents_handler)) + .at( + "/gateway/agents/:id/assign", + poem::post(gateway_assign_agent_handler), + ) // Serve the embedded React frontend so the gateway has a UI. .at( "/assets/*path", @@ -161,6 +169,56 @@ mod tests { // ── poem::test::TestClient and mock HTTP servers remain here since they // ── test the combined HTTP + service interaction through real routes. + #[tokio::test] + async fn crdt_sync_handshake_then_register_writes_crdt_node() { + crate::crdt_state::init_for_test(); + let state = make_test_state(); + + // Generate a valid join token via the tokens endpoint. + let token_app = poem::Route::new() + .at( + "/gateway/tokens", + poem::post(gateway_generate_token_handler), + ) + .data(state.clone()); + let cli = poem::test::TestClient::new(token_app); + let resp = cli.post("/gateway/tokens").send().await; + assert_eq!(resp.0.status(), poem::http::StatusCode::OK); + let body: serde_json::Value = resp.0.into_body().into_json().await.unwrap(); + let token = body["token"].as_str().unwrap().to_string(); + + // Token must be pending before the upgrade. + assert!(state.pending_tokens.read().await.contains_key(&token)); + + // Call register_agent directly (the service function exercised by the handler). + let node = crate::service::gateway::register_agent( + &state, + &token, + "test-label".into(), + "ws://test:9000".into(), + ) + .await + .unwrap(); + + // Token is consumed after registration. + assert!(state.pending_tokens.read().await.is_empty()); + + // CRDT node was written and is visible via list_agents. + let agents = crate::service::gateway::list_agents(); + assert!( + agents.iter().any(|n| n.node_id == node.node_id), + "Registered node must appear in list_agents" + ); + + // remove_agent tombstones the node. + assert!(crate::service::gateway::remove_agent(&node.node_id)); + let alive = crate::service::gateway::list_agents(); + assert!( + !alive.iter().any(|n| n.node_id == node.node_id), + "Tombstoned node must not appear in list_agents" + ); + } + #[tokio::test] async fn generate_token_creates_pending_token() { let state = make_test_state(); diff --git a/server/src/http/gateway.rs b/server/src/http/gateway.rs index ac78e2cd..85f32743 100644 --- a/server/src/http/gateway.rs +++ b/server/src/http/gateway.rs @@ -4,7 +4,7 @@ //! the response. No inline business logic, no `reqwest`, no filesystem access. use crate::service::gateway::{self, GatewayState}; -use futures::StreamExt; +use futures::{SinkExt, StreamExt}; use poem::handler; use poem::http::StatusCode; use poem::web::Path as PoemPath; @@ -513,6 +513,186 @@ pub async fn gateway_generate_token_handler(state: Data<&Arc>) -> .body(Body::from(serde_json::to_vec(&body).unwrap_or_default())) } +// ── CRDT-sync WebSocket handler (agent registration) ─────────────────────── + +/// Query parameters accepted on the `/crdt-sync` WebSocket upgrade in gateway mode. +#[derive(Deserialize)] +struct GatewayCrdtSyncParams { + /// One-time join token from `POST /gateway/tokens`. + token: Option, + /// Human-readable label for the connecting agent. + label: Option, + /// WebSocket address the agent exposes for mesh connections + /// (e.g. `ws://0.0.0.0:3002/crdt-sync`). + address: Option, +} + +/// `GET /crdt-sync` — gateway-side WebSocket endpoint for build agent registration. +/// +/// # Authentication +/// +/// The connecting node must supply a valid one-time join token via the `token` +/// query parameter, obtained from `POST /gateway/tokens`. The token is +/// validated pre-upgrade and consumed by [`gateway::register_agent`] once the +/// WebSocket is established. +/// +/// # Protocol +/// +/// 1. Pre-upgrade: token existence is verified (read-only). +/// 2. Post-upgrade: [`gateway::register_agent`] writes the node to the CRDT and +/// consumes the token. +/// 3. Keepalive: the server sends Ping frames every 30 s; each Pong resets the +/// pong deadline and calls [`gateway::heartbeat_agent`]. +/// 4. Disconnect: on clean close or pong timeout, [`gateway::remove_agent`] +/// tombstones the node (`alive = false`). +#[handler] +pub async fn gateway_crdt_sync_handler( + ws: WebSocket, + state: Data<&Arc>, + Query(params): Query, + remote_addr: &poem::web::RemoteAddr, +) -> poem::Response { + use crate::crdt_sync::{PING_INTERVAL_SECS, PONG_TIMEOUT_SECS}; + + // ── Pre-upgrade: validate token exists ───────────────────────────── + let token = match params.token { + Some(t) if !t.is_empty() => t, + _ => { + return poem::Response::builder() + .status(StatusCode::UNAUTHORIZED) + .body("token query parameter required"); + } + }; + + { + let tokens = state.pending_tokens.read().await; + if !tokens.contains_key(&token) { + return poem::Response::builder() + .status(StatusCode::UNAUTHORIZED) + .body("invalid or already-used join token"); + } + } + + let label = params + .label + .filter(|l| !l.is_empty()) + .unwrap_or_else(|| "build-agent".to_string()); + let address = params + .address + .filter(|a| !a.is_empty()) + .unwrap_or_else(|| remote_addr.to_string()); + + // ── WebSocket upgrade ─────────────────────────────────────────────── + use poem::IntoResponse as _; + let state = Arc::clone(&state); + ws.on_upgrade(move |socket| async move { + // Register the agent — consumes the one-time token. + let node = match gateway::register_agent(&state, &token, label, address).await { + Ok(n) => n, + Err(e) => { + crate::slog!("[gateway/crdt-sync] Registration failed: {e}"); + return; + } + }; + let node_id = node.node_id.clone(); + crate::slog!( + "[gateway/crdt-sync] Agent '{}' registered, node_id={:.12}…", + node.label.as_deref().unwrap_or("?"), + &node_id + ); + + let (mut sink, mut stream) = socket.split(); + + let mut pong_deadline = + tokio::time::Instant::now() + std::time::Duration::from_secs(PONG_TIMEOUT_SECS); + let mut ping_ticker = tokio::time::interval_at( + tokio::time::Instant::now() + std::time::Duration::from_secs(PING_INTERVAL_SECS), + std::time::Duration::from_secs(PING_INTERVAL_SECS), + ); + + loop { + tokio::select! { + _ = ping_ticker.tick() => { + if tokio::time::Instant::now() >= pong_deadline { + crate::slog!( + "[gateway/crdt-sync] No pong from {:.12}… in {}s; disconnecting", + &node_id, PONG_TIMEOUT_SECS + ); + break; + } + if sink.send(WsMessage::Ping(vec![])).await.is_err() { + break; + } + } + frame = stream.next() => { + match frame { + Some(Ok(WsMessage::Pong(_))) => { + pong_deadline = tokio::time::Instant::now() + + std::time::Duration::from_secs(PONG_TIMEOUT_SECS); + gateway::heartbeat_agent(&node_id); + } + Some(Ok(WsMessage::Ping(data))) => { + let _ = sink.send(WsMessage::Pong(data)).await; + } + Some(Ok(WsMessage::Close(_))) | None => break, + _ => {} + } + } + } + } + + gateway::remove_agent(&node_id); + crate::slog!( + "[gateway/crdt-sync] Agent {:.12}… disconnected and tombstoned", + &node_id + ); + }) + .into_response() +} + +// ── Agent management HTTP handlers ────────────────────────────────────────── + +/// `GET /gateway/agents` — list all alive build agents registered in the CRDT. +#[handler] +pub async fn gateway_list_agents_handler(_state: Data<&Arc>) -> Response { + let agents = gateway::list_agents(); + let body = serde_json::to_vec(&agents).unwrap_or_default(); + Response::builder() + .status(StatusCode::OK) + .header("Content-Type", "application/json") + .body(Body::from(body)) +} + +/// Request body for assigning an agent to a project. +#[derive(Deserialize)] +struct AssignAgentRequest { + project: Option, +} + +/// `POST /gateway/agents/:id/assign` — assign (or unassign) an agent to a project. +#[handler] +pub async fn gateway_assign_agent_handler( + PoemPath(id): PoemPath, + body: Json, + state: Data<&Arc>, +) -> Response { + match gateway::assign_agent(&state, &id, body.0.project).await { + Ok(agent) => { + let body = serde_json::to_vec(&agent).unwrap_or_default(); + Response::builder() + .status(StatusCode::OK) + .header("Content-Type", "application/json") + .body(Body::from(body)) + } + Err(gateway::Error::ProjectNotFound(msg)) => Response::builder() + .status(StatusCode::BAD_REQUEST) + .body(Body::from(msg)), + Err(_) => Response::builder() + .status(StatusCode::NOT_FOUND) + .body(Body::from("agent not found")), + } +} + // ── Event-push WebSocket handler ──────────────────────────────────────────── /// Query parameters accepted on the `/gateway/events/push` WebSocket upgrade. diff --git a/server/src/service/gateway/mod.rs b/server/src/service/gateway/mod.rs index a0c664ea..cee8da1b 100644 --- a/server/src/service/gateway/mod.rs +++ b/server/src/service/gateway/mod.rs @@ -186,7 +186,6 @@ pub async fn generate_join_token(state: &GatewayState) -> String { /// /// Validates and consumes the token, then writes the agent's node presence /// and metadata to the CRDT collection. Returns the newly-created node view. -#[allow(dead_code)] pub async fn register_agent( state: &GatewayState, token: &str, @@ -225,7 +224,6 @@ pub async fn register_agent( /// Tombstone a registered agent in the CRDT (set `alive = false`). /// /// Returns `true` if the node was found and tombstoned. -#[allow(dead_code)] pub fn remove_agent(node_id: &str) -> bool { let nodes = crate::crdt_state::read_all_node_presence().unwrap_or_default(); let Some(node) = nodes.iter().find(|n| n.node_id == node_id) else { @@ -240,7 +238,6 @@ pub fn remove_agent(node_id: &str) -> bool { /// /// Validates that the project exists in the gateway config (when assigning), /// then writes the updated `assigned_project` field to the CRDT. -#[allow(dead_code)] pub async fn assign_agent( state: &GatewayState, node_id: &str, @@ -275,7 +272,6 @@ pub async fn assign_agent( } /// Update an agent's heartbeat via CRDT. Returns `true` if the node was found. -#[allow(dead_code)] pub fn heartbeat_agent(id: &str) -> bool { let now = chrono::Utc::now().timestamp() as f64; let nodes = crate::crdt_state::read_all_node_presence().unwrap_or_default(); @@ -287,7 +283,6 @@ pub fn heartbeat_agent(id: &str) -> bool { } /// List all registered build agents from the CRDT nodes collection. -#[allow(dead_code)] pub fn list_agents() -> Vec { crate::crdt_state::read_all_node_presence() .unwrap_or_default()