huskies: merge 778
This commit is contained in:
+181
-1
@@ -4,7 +4,7 @@
|
||||
//! the response. No inline business logic, no `reqwest`, no filesystem access.
|
||||
|
||||
use crate::service::gateway::{self, GatewayState};
|
||||
use futures::StreamExt;
|
||||
use futures::{SinkExt, StreamExt};
|
||||
use poem::handler;
|
||||
use poem::http::StatusCode;
|
||||
use poem::web::Path as PoemPath;
|
||||
@@ -513,6 +513,186 @@ pub async fn gateway_generate_token_handler(state: Data<&Arc<GatewayState>>) ->
|
||||
.body(Body::from(serde_json::to_vec(&body).unwrap_or_default()))
|
||||
}
|
||||
|
||||
// ── CRDT-sync WebSocket handler (agent registration) ───────────────────────
|
||||
|
||||
/// Query parameters accepted on the `/crdt-sync` WebSocket upgrade in gateway mode.
|
||||
#[derive(Deserialize)]
|
||||
struct GatewayCrdtSyncParams {
|
||||
/// One-time join token from `POST /gateway/tokens`.
|
||||
token: Option<String>,
|
||||
/// Human-readable label for the connecting agent.
|
||||
label: Option<String>,
|
||||
/// WebSocket address the agent exposes for mesh connections
|
||||
/// (e.g. `ws://0.0.0.0:3002/crdt-sync`).
|
||||
address: Option<String>,
|
||||
}
|
||||
|
||||
/// `GET /crdt-sync` — gateway-side WebSocket endpoint for build agent registration.
|
||||
///
|
||||
/// # Authentication
|
||||
///
|
||||
/// The connecting node must supply a valid one-time join token via the `token`
|
||||
/// query parameter, obtained from `POST /gateway/tokens`. The token is
|
||||
/// validated pre-upgrade and consumed by [`gateway::register_agent`] once the
|
||||
/// WebSocket is established.
|
||||
///
|
||||
/// # Protocol
|
||||
///
|
||||
/// 1. Pre-upgrade: token existence is verified (read-only).
|
||||
/// 2. Post-upgrade: [`gateway::register_agent`] writes the node to the CRDT and
|
||||
/// consumes the token.
|
||||
/// 3. Keepalive: the server sends Ping frames every 30 s; each Pong resets the
|
||||
/// pong deadline and calls [`gateway::heartbeat_agent`].
|
||||
/// 4. Disconnect: on clean close or pong timeout, [`gateway::remove_agent`]
|
||||
/// tombstones the node (`alive = false`).
|
||||
#[handler]
|
||||
pub async fn gateway_crdt_sync_handler(
|
||||
ws: WebSocket,
|
||||
state: Data<&Arc<GatewayState>>,
|
||||
Query(params): Query<GatewayCrdtSyncParams>,
|
||||
remote_addr: &poem::web::RemoteAddr,
|
||||
) -> poem::Response {
|
||||
use crate::crdt_sync::{PING_INTERVAL_SECS, PONG_TIMEOUT_SECS};
|
||||
|
||||
// ── Pre-upgrade: validate token exists ─────────────────────────────
|
||||
let token = match params.token {
|
||||
Some(t) if !t.is_empty() => t,
|
||||
_ => {
|
||||
return poem::Response::builder()
|
||||
.status(StatusCode::UNAUTHORIZED)
|
||||
.body("token query parameter required");
|
||||
}
|
||||
};
|
||||
|
||||
{
|
||||
let tokens = state.pending_tokens.read().await;
|
||||
if !tokens.contains_key(&token) {
|
||||
return poem::Response::builder()
|
||||
.status(StatusCode::UNAUTHORIZED)
|
||||
.body("invalid or already-used join token");
|
||||
}
|
||||
}
|
||||
|
||||
let label = params
|
||||
.label
|
||||
.filter(|l| !l.is_empty())
|
||||
.unwrap_or_else(|| "build-agent".to_string());
|
||||
let address = params
|
||||
.address
|
||||
.filter(|a| !a.is_empty())
|
||||
.unwrap_or_else(|| remote_addr.to_string());
|
||||
|
||||
// ── WebSocket upgrade ───────────────────────────────────────────────
|
||||
use poem::IntoResponse as _;
|
||||
let state = Arc::clone(&state);
|
||||
ws.on_upgrade(move |socket| async move {
|
||||
// Register the agent — consumes the one-time token.
|
||||
let node = match gateway::register_agent(&state, &token, label, address).await {
|
||||
Ok(n) => n,
|
||||
Err(e) => {
|
||||
crate::slog!("[gateway/crdt-sync] Registration failed: {e}");
|
||||
return;
|
||||
}
|
||||
};
|
||||
let node_id = node.node_id.clone();
|
||||
crate::slog!(
|
||||
"[gateway/crdt-sync] Agent '{}' registered, node_id={:.12}…",
|
||||
node.label.as_deref().unwrap_or("?"),
|
||||
&node_id
|
||||
);
|
||||
|
||||
let (mut sink, mut stream) = socket.split();
|
||||
|
||||
let mut pong_deadline =
|
||||
tokio::time::Instant::now() + std::time::Duration::from_secs(PONG_TIMEOUT_SECS);
|
||||
let mut ping_ticker = tokio::time::interval_at(
|
||||
tokio::time::Instant::now() + std::time::Duration::from_secs(PING_INTERVAL_SECS),
|
||||
std::time::Duration::from_secs(PING_INTERVAL_SECS),
|
||||
);
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = ping_ticker.tick() => {
|
||||
if tokio::time::Instant::now() >= pong_deadline {
|
||||
crate::slog!(
|
||||
"[gateway/crdt-sync] No pong from {:.12}… in {}s; disconnecting",
|
||||
&node_id, PONG_TIMEOUT_SECS
|
||||
);
|
||||
break;
|
||||
}
|
||||
if sink.send(WsMessage::Ping(vec![])).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
frame = stream.next() => {
|
||||
match frame {
|
||||
Some(Ok(WsMessage::Pong(_))) => {
|
||||
pong_deadline = tokio::time::Instant::now()
|
||||
+ std::time::Duration::from_secs(PONG_TIMEOUT_SECS);
|
||||
gateway::heartbeat_agent(&node_id);
|
||||
}
|
||||
Some(Ok(WsMessage::Ping(data))) => {
|
||||
let _ = sink.send(WsMessage::Pong(data)).await;
|
||||
}
|
||||
Some(Ok(WsMessage::Close(_))) | None => break,
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
gateway::remove_agent(&node_id);
|
||||
crate::slog!(
|
||||
"[gateway/crdt-sync] Agent {:.12}… disconnected and tombstoned",
|
||||
&node_id
|
||||
);
|
||||
})
|
||||
.into_response()
|
||||
}
|
||||
|
||||
// ── Agent management HTTP handlers ──────────────────────────────────────────
|
||||
|
||||
/// `GET /gateway/agents` — list all alive build agents registered in the CRDT.
|
||||
#[handler]
|
||||
pub async fn gateway_list_agents_handler(_state: Data<&Arc<GatewayState>>) -> Response {
|
||||
let agents = gateway::list_agents();
|
||||
let body = serde_json::to_vec(&agents).unwrap_or_default();
|
||||
Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
.header("Content-Type", "application/json")
|
||||
.body(Body::from(body))
|
||||
}
|
||||
|
||||
/// Request body for assigning an agent to a project.
|
||||
#[derive(Deserialize)]
|
||||
struct AssignAgentRequest {
|
||||
project: Option<String>,
|
||||
}
|
||||
|
||||
/// `POST /gateway/agents/:id/assign` — assign (or unassign) an agent to a project.
|
||||
#[handler]
|
||||
pub async fn gateway_assign_agent_handler(
|
||||
PoemPath(id): PoemPath<String>,
|
||||
body: Json<AssignAgentRequest>,
|
||||
state: Data<&Arc<GatewayState>>,
|
||||
) -> Response {
|
||||
match gateway::assign_agent(&state, &id, body.0.project).await {
|
||||
Ok(agent) => {
|
||||
let body = serde_json::to_vec(&agent).unwrap_or_default();
|
||||
Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
.header("Content-Type", "application/json")
|
||||
.body(Body::from(body))
|
||||
}
|
||||
Err(gateway::Error::ProjectNotFound(msg)) => Response::builder()
|
||||
.status(StatusCode::BAD_REQUEST)
|
||||
.body(Body::from(msg)),
|
||||
Err(_) => Response::builder()
|
||||
.status(StatusCode::NOT_FOUND)
|
||||
.body(Body::from("agent not found")),
|
||||
}
|
||||
}
|
||||
|
||||
// ── Event-push WebSocket handler ────────────────────────────────────────────
|
||||
|
||||
/// Query parameters accepted on the `/gateway/events/push` WebSocket upgrade.
|
||||
|
||||
Reference in New Issue
Block a user