From 05d057a40a9df3e059e8b586fb4cf73286f47b99 Mon Sep 17 00:00:00 2001 From: dave Date: Tue, 28 Apr 2026 10:56:09 +0000 Subject: [PATCH] huskies: merge 782 --- server/src/http/gateway.rs | 1224 -------------------------- server/src/http/gateway/jsonrpc.rs | 64 ++ server/src/http/gateway/mcp.rs | 426 +++++++++ server/src/http/gateway/mod.rs | 24 + server/src/http/gateway/rest.rs | 493 +++++++++++ server/src/http/gateway/websocket.rs | 260 ++++++ 6 files changed, 1267 insertions(+), 1224 deletions(-) delete mode 100644 server/src/http/gateway.rs create mode 100644 server/src/http/gateway/jsonrpc.rs create mode 100644 server/src/http/gateway/mcp.rs create mode 100644 server/src/http/gateway/mod.rs create mode 100644 server/src/http/gateway/rest.rs create mode 100644 server/src/http/gateway/websocket.rs diff --git a/server/src/http/gateway.rs b/server/src/http/gateway.rs deleted file mode 100644 index 87d6e2f6..00000000 --- a/server/src/http/gateway.rs +++ /dev/null @@ -1,1224 +0,0 @@ -//! Gateway HTTP handlers — thin transport shells for the gateway service. -//! -//! Each handler calls `service::gateway::*` for business logic and formats -//! the response. No inline business logic, no `reqwest`, no filesystem access. - -use crate::service::gateway::{self, GatewayState}; -use futures::{SinkExt, StreamExt}; -use poem::handler; -use poem::http::StatusCode; -use poem::web::Path as PoemPath; -use poem::web::Query; -use poem::web::websocket::{Message as WsMessage, WebSocket}; -use poem::web::{Data, Json}; -use poem::{Body, Request, Response}; -use serde::{Deserialize, Serialize}; -use serde_json::{Value, json}; -use std::collections::BTreeMap; -use std::sync::Arc; - -// ── JSON-RPC types ────────────────────────────────────────────────────────── - -/// JSON-RPC request. -#[derive(Deserialize)] -struct JsonRpcRequest { - jsonrpc: String, - id: Option, - method: String, - #[serde(default)] - params: Value, -} - -/// JSON-RPC response. -#[derive(Serialize)] -pub(crate) struct JsonRpcResponse { - jsonrpc: &'static str, - #[serde(skip_serializing_if = "Option::is_none")] - pub(crate) id: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub(crate) result: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub(crate) error: Option, -} - -#[derive(Debug, Serialize)] -pub(crate) struct JsonRpcError { - code: i64, - pub(crate) message: String, -} - -impl JsonRpcResponse { - pub(crate) fn success(id: Option, result: Value) -> Self { - Self { - jsonrpc: "2.0", - id, - result: Some(result), - error: None, - } - } - - pub(crate) fn error(id: Option, code: i64, message: String) -> Self { - Self { - jsonrpc: "2.0", - id, - result: None, - error: Some(JsonRpcError { code, message }), - } - } -} - -fn to_json_response(resp: JsonRpcResponse) -> Response { - let body = serde_json::to_vec(&resp).unwrap_or_default(); - Response::builder() - .status(StatusCode::OK) - .header("Content-Type", "application/json") - .body(Body::from(body)) -} - -// ── MCP tool definitions ─────────────────────────────────────────���────────── - -/// Gateway-specific MCP tools exposed alongside the proxied tools. -const GATEWAY_TOOLS: &[&str] = &[ - "switch_project", - "gateway_status", - "gateway_health", - "init_project", - "aggregate_pipeline_status", -]; - -/// Gateway tool definitions. -pub(crate) fn gateway_tool_definitions() -> Vec { - vec![ - json!({ - "name": "switch_project", - "description": "Switch the active project. All subsequent MCP tool calls will be proxied to this project's container.", - "inputSchema": { - "type": "object", - "properties": { - "project": { - "type": "string", - "description": "Name of the project to switch to (must exist in projects.toml)" - } - }, - "required": ["project"] - } - }), - json!({ - "name": "gateway_status", - "description": "Show pipeline status for the active project by proxying the get_pipeline_status tool call.", - "inputSchema": { - "type": "object", - "properties": {} - } - }), - json!({ - "name": "gateway_health", - "description": "Health check aggregation across all registered projects. Returns the health status of every project container.", - "inputSchema": { - "type": "object", - "properties": {} - } - }), - json!({ - "name": "init_project", - "description": "Initialize a new huskies project at the given path by scaffolding .huskies/ and related files — the same as running `huskies init `. Prefer this tool over asking the user to run the CLI. If `name` and `url` are supplied the project is also registered in projects.toml so switch_project can reach it immediately.", - "inputSchema": { - "type": "object", - "properties": { - "path": { - "type": "string", - "description": "Absolute filesystem path to the project directory to initialise. The directory is created if it does not exist." - }, - "name": { - "type": "string", - "description": "Optional: short name to register the project under in projects.toml (e.g. 'my-app'). Requires `url`." - }, - "url": { - "type": "string", - "description": "Optional: base URL of the huskies container that will serve this project (e.g. 'http://my-app:3001'). Required when `name` is given." - } - }, - "required": ["path"] - } - }), - json!({ - "name": "aggregate_pipeline_status", - "description": "Fetch pipeline status from ALL registered projects in parallel and return an aggregated report. For each project: stage counts (backlog/current/qa/merge/done) and a list of blocked or failing items with triage detail. Unreachable projects are included with an error state rather than failing the whole call.", - "inputSchema": { - "type": "object", - "properties": {} - } - }), - ] -} - -// ── MCP POST handler ──────────────────────────────────────────────────────── - -/// Main MCP POST handler for the gateway. Intercepts gateway-specific tools and -/// proxies everything else to the active project's container. -#[handler] -pub async fn gateway_mcp_post_handler( - req: &Request, - body: Body, - state: Data<&Arc>, -) -> Response { - let content_type = req.header("content-type").unwrap_or(""); - if !content_type.is_empty() && !content_type.contains("application/json") { - return to_json_response(JsonRpcResponse::error( - None, - -32700, - "Unsupported Content-Type; expected application/json".into(), - )); - } - - let bytes = match body.into_bytes().await { - Ok(b) => b, - Err(_) => { - return to_json_response(JsonRpcResponse::error(None, -32700, "Parse error".into())); - } - }; - - let rpc: JsonRpcRequest = match serde_json::from_slice(&bytes) { - Ok(r) => r, - Err(_) => { - return to_json_response(JsonRpcResponse::error(None, -32700, "Parse error".into())); - } - }; - - if rpc.jsonrpc != "2.0" { - return to_json_response(JsonRpcResponse::error( - rpc.id, - -32600, - "Invalid JSON-RPC version".into(), - )); - } - - if rpc.id.is_none() || rpc.id.as_ref() == Some(&Value::Null) { - if rpc.method.starts_with("notifications/") { - return Response::builder() - .status(StatusCode::ACCEPTED) - .body(Body::empty()); - } - return to_json_response(JsonRpcResponse::error(None, -32600, "Missing id".into())); - } - - match rpc.method.as_str() { - "initialize" => to_json_response(handle_initialize(rpc.id)), - "tools/list" => match handle_tools_list(&state, rpc.id.clone()).await { - Ok(resp) => to_json_response(resp), - Err(e) => to_json_response(JsonRpcResponse::error(rpc.id, -32603, e)), - }, - "tools/call" => { - let tool_name = rpc - .params - .get("name") - .and_then(|v| v.as_str()) - .unwrap_or(""); - - if GATEWAY_TOOLS.contains(&tool_name) { - to_json_response( - handle_gateway_tool(tool_name, &rpc.params, &state, rpc.id.clone()).await, - ) - } else { - proxy_and_respond(&state, &bytes, rpc.id).await - } - } - _ => proxy_and_respond(&state, &bytes, rpc.id).await, - } -} - -/// Proxy a request to the active project and format the response. -async fn proxy_and_respond(state: &GatewayState, bytes: &[u8], id: Option) -> Response { - let url = match state.active_url().await { - Ok(u) => u, - Err(e) => { - return to_json_response(JsonRpcResponse::error(id, -32603, e.to_string())); - } - }; - match gateway::io::proxy_mcp_call(&state.client, &url, bytes).await { - Ok(resp_body) => Response::builder() - .status(StatusCode::OK) - .header("Content-Type", "application/json") - .body(Body::from(resp_body)), - Err(e) => to_json_response(JsonRpcResponse::error( - id, - -32603, - format!("proxy error: {e}"), - )), - } -} - -/// GET handler — method not allowed. -#[handler] -pub async fn gateway_mcp_get_handler() -> Response { - Response::builder() - .status(StatusCode::METHOD_NOT_ALLOWED) - .body(Body::empty()) -} - -// ── Protocol handlers ─────────────────────────────────────────────────────── - -fn handle_initialize(id: Option) -> JsonRpcResponse { - JsonRpcResponse::success( - id, - json!({ - "protocolVersion": "2025-03-26", - "capabilities": { "tools": {} }, - "serverInfo": { - "name": "huskies-gateway", - "version": "1.0.0" - } - }), - ) -} - -/// Fetch tools/list from the active project and merge in gateway tools. -async fn handle_tools_list( - state: &GatewayState, - id: Option, -) -> Result { - let url = state.active_url().await.map_err(|e| e.to_string())?; - - let resp_json = gateway::io::fetch_tools_list(&state.client, &url).await?; - - let mut tools: Vec = resp_json - .get("result") - .and_then(|r| r.get("tools")) - .and_then(|t| t.as_array()) - .cloned() - .unwrap_or_default(); - - let mut all_tools = gateway_tool_definitions(); - all_tools.append(&mut tools); - - Ok(JsonRpcResponse::success(id, json!({ "tools": all_tools }))) -} - -// ── Gateway tool dispatch ──────────────────────────────────────────────��──── - -/// Dispatch a gateway-specific tool call. -async fn handle_gateway_tool( - tool_name: &str, - params: &Value, - state: &GatewayState, - id: Option, -) -> JsonRpcResponse { - match tool_name { - "switch_project" => handle_switch_project_tool(params, state, id).await, - "gateway_status" => handle_gateway_status_tool(state, id).await, - "gateway_health" => handle_gateway_health_tool(state, id).await, - "init_project" => handle_init_project_tool(params, state, id).await, - "aggregate_pipeline_status" => handle_aggregate_pipeline_status_tool(state, id).await, - _ => JsonRpcResponse::error(id, -32601, format!("Unknown gateway tool: {tool_name}")), - } -} - -async fn handle_switch_project_tool( - params: &Value, - state: &GatewayState, - id: Option, -) -> JsonRpcResponse { - let project = params - .get("arguments") - .and_then(|a| a.get("project")) - .or_else(|| params.get("project")) - .and_then(|v| v.as_str()) - .unwrap_or(""); - - match gateway::switch_project(state, project).await { - Ok(url) => JsonRpcResponse::success( - id, - json!({ - "content": [{ - "type": "text", - "text": format!("Switched to project '{project}' ({url})") - }] - }), - ), - Err(e) => JsonRpcResponse::error(id, -32602, e.to_string()), - } -} - -async fn handle_gateway_status_tool(state: &GatewayState, id: Option) -> JsonRpcResponse { - let active = state.active_project.read().await.clone(); - let url = match state.active_url().await { - Ok(u) => u, - Err(e) => return JsonRpcResponse::error(id.clone(), -32603, e.to_string()), - }; - - match gateway::io::fetch_pipeline_status_for_project(&state.client, &url).await { - Ok(upstream) => { - let pipeline = upstream.get("result").cloned().unwrap_or(json!(null)); - JsonRpcResponse::success( - id, - json!({ - "content": [{ - "type": "text", - "text": format!( - "Pipeline status for '{active}':\n{}", - serde_json::to_string_pretty(&pipeline).unwrap_or_default() - ) - }] - }), - ) - } - Err(e) => JsonRpcResponse::error(id, -32603, e), - } -} - -async fn handle_gateway_health_tool(state: &GatewayState, id: Option) -> JsonRpcResponse { - let mut results = BTreeMap::new(); - - let project_entries: Vec<(String, String)> = state - .projects - .read() - .await - .iter() - .map(|(n, e)| (n.clone(), e.url.clone())) - .collect(); - for (name, url) in &project_entries { - let status = match gateway::io::check_project_health(&state.client, url).await { - Ok(true) => "healthy".to_string(), - Ok(false) => "unhealthy".to_string(), - Err(e) => e, - }; - results.insert(name.clone(), status); - } - - let active = state.active_project.read().await.clone(); - JsonRpcResponse::success( - id, - json!({ - "content": [{ - "type": "text", - "text": format!( - "Health check (active: '{active}'):\n{}", - results.iter() - .map(|(name, status)| format!(" {name}: {status}")) - .collect::>() - .join("\n") - ) - }] - }), - ) -} - -async fn handle_init_project_tool( - params: &Value, - state: &GatewayState, - id: Option, -) -> JsonRpcResponse { - let args = params.get("arguments").unwrap_or(params); - - let path_str = args.get("path").and_then(|v| v.as_str()).unwrap_or(""); - let name = args.get("name").and_then(|v| v.as_str()); - let url = args.get("url").and_then(|v| v.as_str()); - - match gateway::init_project(state, path_str, name, url).await { - Ok(registered_name) => { - let next_steps = if let Some(ref n) = registered_name { - format!( - "Project registered as '{n}' in projects.toml.\n\ - Next steps:\n\ - 1. Start a huskies server at '{path_str}' \ - (e.g. `huskies {path_str}` or via Docker).\n\ - 2. Call switch_project with name='{n}' to make it active.\n\ - 3. Call wizard_status to begin the setup wizard." - ) - } else { - format!( - "Next steps:\n\ - 1. Start a huskies server at '{path_str}' \ - (e.g. `huskies {path_str}` or via Docker).\n\ - 2. Register the project: call init_project again with name and url \ - parameters, or add it to projects.toml manually.\n\ - 3. Call switch_project and then wizard_status to begin the setup wizard.\n\n\ - Note: wizard_* MCP tools require a running huskies server for the project." - ) - }; - - JsonRpcResponse::success( - id, - json!({ - "content": [{ - "type": "text", - "text": format!("Successfully initialised huskies project at '{path_str}'.\n\n{next_steps}") - }] - }), - ) - } - Err(e) => { - let code = match &e { - gateway::Error::Config(_) => -32602, - gateway::Error::DuplicateToken(_) => -32602, - _ => -32603, - }; - JsonRpcResponse::error(id, code, e.to_string()) - } - } -} - -async fn handle_aggregate_pipeline_status_tool( - state: &GatewayState, - id: Option, -) -> JsonRpcResponse { - let project_urls: BTreeMap = state - .projects - .read() - .await - .iter() - .map(|(name, entry)| (name.clone(), entry.url.clone())) - .collect(); - - let statuses = - gateway::io::fetch_all_project_pipeline_statuses(&project_urls, &state.client).await; - let active = state.active_project.read().await.clone(); - - JsonRpcResponse::success( - id, - json!({ - "content": [{ - "type": "text", - "text": format!( - "Aggregate pipeline status (active: '{active}'):\n{}", - serde_json::to_string_pretty(&statuses).unwrap_or_default() - ) - }], - "projects": statuses, - "active": active, - }), - ) -} - -// ── Agent REST handlers ───────────────────────────────────────────────────── - -/// `GET /gateway/mode` — returns `{"mode":"gateway"}`. -#[handler] -pub async fn gateway_mode_handler() -> Response { - let body = json!({ "mode": "gateway" }); - Response::builder() - .status(StatusCode::OK) - .header("Content-Type", "application/json") - .body(Body::from(serde_json::to_vec(&body).unwrap_or_default())) -} - -/// `POST /gateway/tokens` — generate a one-time join token. -#[handler] -pub async fn gateway_generate_token_handler(state: Data<&Arc>) -> Response { - let token = gateway::generate_join_token(&state).await; - let body = json!({ "token": token }); - Response::builder() - .status(StatusCode::OK) - .header("Content-Type", "application/json") - .body(Body::from(serde_json::to_vec(&body).unwrap_or_default())) -} - -// ── CRDT-sync WebSocket handler (agent registration) ─────────────────────── - -/// Query parameters accepted on the `/crdt-sync` WebSocket upgrade in gateway mode. -#[derive(Deserialize)] -struct GatewayCrdtSyncParams { - /// One-time join token from `POST /gateway/tokens`. - token: Option, - /// Human-readable label for the connecting agent. - label: Option, - /// WebSocket address the agent exposes for mesh connections - /// (e.g. `ws://0.0.0.0:3002/crdt-sync`). - address: Option, -} - -/// `GET /crdt-sync` — gateway-side WebSocket endpoint for build agent registration. -/// -/// # Authentication -/// -/// The connecting node must supply a valid one-time join token via the `token` -/// query parameter, obtained from `POST /gateway/tokens`. The token is -/// validated pre-upgrade and consumed by [`gateway::register_agent`] once the -/// WebSocket is established. -/// -/// # Protocol -/// -/// 1. Pre-upgrade: token existence is verified (read-only). -/// 2. Post-upgrade: [`gateway::register_agent`] writes the node to the CRDT and -/// consumes the token. -/// 3. Keepalive: the server sends Ping frames every 30 s; each Pong resets the -/// pong deadline and calls [`gateway::heartbeat_agent`]. -/// 4. Disconnect: on clean close or pong timeout, [`gateway::remove_agent`] -/// tombstones the node (`alive = false`). -#[handler] -pub async fn gateway_crdt_sync_handler( - ws: WebSocket, - state: Data<&Arc>, - Query(params): Query, - remote_addr: &poem::web::RemoteAddr, -) -> poem::Response { - use crate::crdt_sync::{PING_INTERVAL_SECS, PONG_TIMEOUT_SECS}; - - // ── Pre-upgrade: validate token exists ───────────────────────────── - let token = match params.token { - Some(t) if !t.is_empty() => t, - _ => { - return poem::Response::builder() - .status(StatusCode::UNAUTHORIZED) - .body("token query parameter required"); - } - }; - - { - let tokens = state.pending_tokens.read().await; - if !tokens.contains_key(&token) { - return poem::Response::builder() - .status(StatusCode::UNAUTHORIZED) - .body("invalid or already-used join token"); - } - } - - let label = params - .label - .filter(|l| !l.is_empty()) - .unwrap_or_else(|| "build-agent".to_string()); - let address = params - .address - .filter(|a| !a.is_empty()) - .unwrap_or_else(|| remote_addr.to_string()); - - // ── WebSocket upgrade ─────────────────────────────────────────────── - use poem::IntoResponse as _; - let state = Arc::clone(&state); - ws.on_upgrade(move |socket| async move { - // Register the agent — consumes the one-time token. - let node = match gateway::register_agent(&state, &token, label, address).await { - Ok(n) => n, - Err(e) => { - crate::slog!("[gateway/crdt-sync] Registration failed: {e}"); - return; - } - }; - let node_id = node.node_id.clone(); - crate::slog!( - "[gateway/crdt-sync] Agent '{}' registered, node_id={:.12}…", - node.label.as_deref().unwrap_or("?"), - &node_id - ); - - let (mut sink, mut stream) = socket.split(); - - let mut pong_deadline = - tokio::time::Instant::now() + std::time::Duration::from_secs(PONG_TIMEOUT_SECS); - let mut ping_ticker = tokio::time::interval_at( - tokio::time::Instant::now() + std::time::Duration::from_secs(PING_INTERVAL_SECS), - std::time::Duration::from_secs(PING_INTERVAL_SECS), - ); - - loop { - tokio::select! { - _ = ping_ticker.tick() => { - if tokio::time::Instant::now() >= pong_deadline { - crate::slog!( - "[gateway/crdt-sync] No pong from {:.12}… in {}s; disconnecting", - &node_id, PONG_TIMEOUT_SECS - ); - break; - } - if sink.send(WsMessage::Ping(vec![])).await.is_err() { - break; - } - } - frame = stream.next() => { - match frame { - Some(Ok(WsMessage::Pong(_))) => { - pong_deadline = tokio::time::Instant::now() - + std::time::Duration::from_secs(PONG_TIMEOUT_SECS); - gateway::heartbeat_agent(&node_id); - } - Some(Ok(WsMessage::Ping(data))) => { - let _ = sink.send(WsMessage::Pong(data)).await; - } - Some(Ok(WsMessage::Close(_))) | None => break, - _ => {} - } - } - } - } - - gateway::remove_agent(&node_id); - crate::slog!( - "[gateway/crdt-sync] Agent {:.12}… disconnected and tombstoned", - &node_id - ); - }) - .into_response() -} - -// ── Agent management HTTP handlers ────────────────────────────────────────── - -/// `GET /gateway/agents` — list all alive build agents registered in the CRDT. -#[handler] -pub async fn gateway_list_agents_handler(_state: Data<&Arc>) -> Response { - let agents = gateway::list_agents(); - let body = serde_json::to_vec(&agents).unwrap_or_default(); - Response::builder() - .status(StatusCode::OK) - .header("Content-Type", "application/json") - .body(Body::from(body)) -} - -/// Request body for assigning an agent to a project. -#[derive(Deserialize)] -struct AssignAgentRequest { - project: Option, -} - -/// `POST /gateway/agents/:id/assign` — assign (or unassign) an agent to a project. -#[handler] -pub async fn gateway_assign_agent_handler( - PoemPath(id): PoemPath, - body: Json, - state: Data<&Arc>, -) -> Response { - match gateway::assign_agent(&state, &id, body.0.project).await { - Ok(agent) => { - let body = serde_json::to_vec(&agent).unwrap_or_default(); - Response::builder() - .status(StatusCode::OK) - .header("Content-Type", "application/json") - .body(Body::from(body)) - } - Err(gateway::Error::ProjectNotFound(msg)) => Response::builder() - .status(StatusCode::BAD_REQUEST) - .body(Body::from(msg)), - Err(_) => Response::builder() - .status(StatusCode::NOT_FOUND) - .body(Body::from("agent not found")), - } -} - -// ── Event-push WebSocket handler ──────────────────────────────────────────── - -/// Query parameters accepted on the `/gateway/events/push` WebSocket upgrade. -#[derive(Deserialize)] -struct EventPushQueryParams { - /// One-time join token generated by `POST /gateway/tokens`. - token: Option, - /// The project name this node represents (e.g. `"huskies"`). - project: Option, -} - -/// `GET /gateway/events/push` — WebSocket endpoint for project nodes to push -/// [`StatusEvent`] frames to the gateway. -/// -/// # Authentication -/// -/// The connecting node must supply a valid one-time join token via the `token` -/// query parameter, obtained from `POST /gateway/tokens`. The token is -/// consumed on the first successful upgrade — the connection itself is then -/// kept open indefinitely. -/// -/// # Protocol -/// -/// Each message from the project node must be a JSON-encoded -/// [`crate::service::events::StoredEvent`]. The gateway fan-outs the event -/// (tagged with the project name) to all current local subscribers. -/// -/// The server does not send data back; clients should treat any close frame -/// as a signal to reconnect with exponential back-off (see docs/gateway-protocol.html). -/// -/// # Reconnect-with-backoff -/// -/// Project nodes MUST reconnect on disconnect. Recommended policy: -/// -/// - Initial retry delay: **1 s** -/// - Back-off multiplier: **2×** per attempt -/// - Max delay cap: **60 s** -/// - Jitter: add ±10 % to the delay to avoid thundering herds -#[handler] -pub async fn gateway_event_push_handler( - ws: WebSocket, - state: Data<&Arc>, - Query(params): Query, -) -> poem::Response { - // ── Authentication (pre-upgrade) ───────────────────────────────────── - let token = match params.token { - Some(t) if !t.is_empty() => t, - _ => { - return poem::Response::builder() - .status(StatusCode::UNAUTHORIZED) - .body("token query parameter required"); - } - }; - - let project = match params.project { - Some(p) if !p.is_empty() => p, - _ => { - return poem::Response::builder() - .status(StatusCode::BAD_REQUEST) - .body("project query parameter required"); - } - }; - - // Validate and consume the one-time token. - { - let mut tokens = state.pending_tokens.write().await; - if !tokens.contains_key(&token) { - return poem::Response::builder() - .status(StatusCode::UNAUTHORIZED) - .body("invalid or already-used join token"); - } - tokens.remove(&token); - } - - // ── WebSocket upgrade ──────────────────────────────────────────────── - use poem::IntoResponse as _; - let state = Arc::clone(&state); - ws.on_upgrade(move |socket| async move { - let (_, mut stream) = socket.split(); - - crate::slog!( - "[gateway] Project node '{}' connected to event-push endpoint", - project - ); - - while let Some(msg) = stream.next().await { - let text = match msg { - Ok(WsMessage::Text(t)) => t, - Ok(WsMessage::Close(_)) | Err(_) => break, - _ => continue, - }; - - match serde_json::from_str::(&text) { - Ok(event) => { - gateway::broadcast_status_event(&state, project.clone(), event); - } - Err(e) => { - crate::slog!( - "[gateway] event-push: invalid frame from '{}': {e}", - project - ); - } - } - } - - crate::slog!( - "[gateway] Project node '{}' disconnected from event-push endpoint", - project - ); - }) - .into_response() -} - -// ── Gateway Web UI ────────────────────────────────────────────────────────── - -/// `GET /api/gateway` — returns the list of registered projects and the active project. -#[handler] -pub async fn gateway_api_handler(state: Data<&Arc>) -> Response { - let active = state.active_project.read().await.clone(); - let projects: Vec = state - .projects - .read() - .await - .iter() - .map(|(name, entry)| { - json!({ - "name": name, - "url": entry.url, - }) - }) - .collect(); - - let body = json!({ "active": active, "projects": projects }); - Response::builder() - .status(StatusCode::OK) - .header("Content-Type", "application/json") - .body(Body::from(serde_json::to_vec(&body).unwrap_or_default())) -} - -/// Request body for `POST /api/gateway/switch`. -#[derive(Deserialize)] -struct SwitchRequest { - project: String, -} - -/// `POST /api/gateway/switch` — switch the active project. -#[handler] -pub async fn gateway_switch_handler( - state: Data<&Arc>, - body: Json, -) -> Response { - match gateway::switch_project(&state, &body.project).await { - Ok(_) => { - let body_val = json!({ "ok": true }); - Response::builder() - .status(StatusCode::OK) - .header("Content-Type", "application/json") - .body(Body::from( - serde_json::to_vec(&body_val).unwrap_or_default(), - )) - } - Err(e) => { - let body_val = json!({ "ok": false, "error": e.to_string() }); - Response::builder() - .status(StatusCode::BAD_REQUEST) - .header("Content-Type", "application/json") - .body(Body::from( - serde_json::to_vec(&body_val).unwrap_or_default(), - )) - } - } -} - -// ── Project management API ────────────────────────────────────────────────── - -/// Request body for adding a new project. -#[derive(Deserialize)] -struct AddProjectRequest { - name: String, - url: String, -} - -/// `POST /api/gateway/projects` — add a new project. -#[handler] -pub async fn gateway_add_project_handler( - state: Data<&Arc>, - body: Json, -) -> Response { - match gateway::add_project(&state, &body.name, &body.url).await { - Ok(()) => { - let name = body.0.name.trim().to_string(); - let url = body.0.url.trim().to_string(); - let body_val = json!({ "name": name, "url": url }); - Response::builder() - .status(StatusCode::OK) - .header("Content-Type", "application/json") - .body(Body::from( - serde_json::to_vec(&body_val).unwrap_or_default(), - )) - } - Err(gateway::Error::DuplicateToken(_)) => Response::builder() - .status(StatusCode::CONFLICT) - .body(Body::from(format!( - "project '{}' already exists", - body.0.name.trim() - ))), - Err(e) => Response::builder() - .status(StatusCode::BAD_REQUEST) - .body(Body::from(e.to_string())), - } -} - -/// `DELETE /api/gateway/projects/:name` — remove a project. -#[handler] -pub async fn gateway_remove_project_handler( - PoemPath(name): PoemPath, - state: Data<&Arc>, -) -> Response { - match gateway::remove_project(&state, &name).await { - Ok(()) => Response::builder() - .status(StatusCode::NO_CONTENT) - .body(Body::empty()), - Err(gateway::Error::ProjectNotFound(msg)) => Response::builder() - .status(StatusCode::NOT_FOUND) - .body(Body::from(msg)), - Err(e) => Response::builder() - .status(StatusCode::BAD_REQUEST) - .body(Body::from(e.to_string())), - } -} - -// ── Bot configuration API ─────────────────────────────────────────────────── - -/// Request/response body for the bot configuration API. -#[derive(Deserialize, Serialize, Default)] -pub(crate) struct BotConfigPayload { - transport: String, - homeserver: Option, - username: Option, - password: Option, - slack_bot_token: Option, - slack_signing_secret: Option, -} - -/// `GET /api/gateway/bot-config` — return current bot.toml fields as JSON. -#[handler] -pub async fn gateway_bot_config_get_handler(state: Data<&Arc>) -> Response { - let fields = gateway::io::read_bot_config_raw(&state.config_dir); - let payload = BotConfigPayload { - transport: fields.transport, - homeserver: fields.homeserver, - username: fields.username, - password: fields.password, - slack_bot_token: fields.slack_bot_token, - slack_signing_secret: fields.slack_signing_secret, - }; - Response::builder() - .status(StatusCode::OK) - .header("Content-Type", "application/json") - .body(Body::from(serde_json::to_vec(&payload).unwrap_or_default())) -} - -/// `POST /api/gateway/bot-config` — write new bot.toml and restart the bot. -#[handler] -pub async fn gateway_bot_config_save_handler( - state: Data<&Arc>, - body: Json, -) -> Response { - let content = gateway::config::serialize_bot_config( - &body.transport, - body.homeserver.as_deref(), - body.username.as_deref(), - body.password.as_deref(), - body.slack_bot_token.as_deref(), - body.slack_signing_secret.as_deref(), - ); - - match gateway::save_bot_config_and_restart(&state, &content).await { - Ok(()) => { - let ok = json!({ "ok": true }); - Response::builder() - .status(StatusCode::OK) - .header("Content-Type", "application/json") - .body(Body::from(serde_json::to_vec(&ok).unwrap_or_default())) - } - Err(e) => { - let err = json!({ "ok": false, "error": e.to_string() }); - Response::builder() - .status(StatusCode::INTERNAL_SERVER_ERROR) - .header("Content-Type", "application/json") - .body(Body::from(serde_json::to_vec(&err).unwrap_or_default())) - } - } -} - -/// `GET /api/gateway/pipeline` — fetch pipeline status from all registered projects. -#[handler] -pub async fn gateway_all_pipeline_handler(state: Data<&Arc>) -> Response { - let project_urls: BTreeMap = state - .projects - .read() - .await - .iter() - .map(|(n, e)| (n.clone(), e.url.clone())) - .collect(); - - let results = - gateway::io::fetch_all_project_pipeline_statuses(&project_urls, &state.client).await; - - let active = state.active_project.read().await.clone(); - let body = json!({ "active": active, "projects": results }); - Response::builder() - .status(StatusCode::OK) - .header("Content-Type", "application/json") - .body(Body::from(serde_json::to_vec(&body).unwrap_or_default())) -} - -// ── Bot config page ───────────────────────────────────────────────────────── - -/// Self-contained HTML page for bot configuration. -const GATEWAY_BOT_CONFIG_HTML: &str = r#" - - - - -Bot Configuration — Huskies Gateway - - - -
-
- ← Gateway - -

Bot Configuration

-
-
- - -
-
-
-
- - -
-
- - -
-
- - -
-
- - -
-
- - - -"#; - -/// Serve the bot configuration HTML page at `GET /bot-config`. -#[handler] -pub async fn gateway_bot_config_page_handler() -> Response { - Response::builder() - .status(StatusCode::OK) - .header("Content-Type", "text/html; charset=utf-8") - .body(Body::from(GATEWAY_BOT_CONFIG_HTML)) -} diff --git a/server/src/http/gateway/jsonrpc.rs b/server/src/http/gateway/jsonrpc.rs new file mode 100644 index 00000000..4c90ed00 --- /dev/null +++ b/server/src/http/gateway/jsonrpc.rs @@ -0,0 +1,64 @@ +//! JSON-RPC 2.0 request/response types and helpers. + +use poem::http::StatusCode; +use poem::{Body, Response}; +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +// ── JSON-RPC types ────────────────────────────────────────────────────────── + +/// JSON-RPC request. +#[derive(Deserialize)] +pub(super) struct JsonRpcRequest { + pub(super) jsonrpc: String, + pub(super) id: Option, + pub(super) method: String, + #[serde(default)] + pub(super) params: Value, +} + +/// JSON-RPC response. +#[derive(Serialize)] +pub(crate) struct JsonRpcResponse { + jsonrpc: &'static str, + #[serde(skip_serializing_if = "Option::is_none")] + pub(crate) id: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub(crate) result: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub(crate) error: Option, +} + +#[derive(Debug, Serialize)] +pub(crate) struct JsonRpcError { + code: i64, + pub(crate) message: String, +} + +impl JsonRpcResponse { + pub(crate) fn success(id: Option, result: Value) -> Self { + Self { + jsonrpc: "2.0", + id, + result: Some(result), + error: None, + } + } + + pub(crate) fn error(id: Option, code: i64, message: String) -> Self { + Self { + jsonrpc: "2.0", + id, + result: None, + error: Some(JsonRpcError { code, message }), + } + } +} + +pub(super) fn to_json_response(resp: JsonRpcResponse) -> Response { + let body = serde_json::to_vec(&resp).unwrap_or_default(); + Response::builder() + .status(StatusCode::OK) + .header("Content-Type", "application/json") + .body(Body::from(body)) +} diff --git a/server/src/http/gateway/mcp.rs b/server/src/http/gateway/mcp.rs new file mode 100644 index 00000000..cd8fc136 --- /dev/null +++ b/server/src/http/gateway/mcp.rs @@ -0,0 +1,426 @@ +//! MCP JSON-RPC POST/GET handlers and gateway tool dispatch. + +use super::jsonrpc::{JsonRpcRequest, JsonRpcResponse, to_json_response}; +use crate::service::gateway::{self, GatewayState}; +use poem::handler; +use poem::http::StatusCode; +use poem::web::Data; +use poem::{Body, Request, Response}; +use serde_json::{Value, json}; +use std::collections::BTreeMap; +use std::sync::Arc; + +// ── MCP tool definitions ───────────────────────────────────────────────────── + +/// Gateway-specific MCP tools exposed alongside the proxied tools. +const GATEWAY_TOOLS: &[&str] = &[ + "switch_project", + "gateway_status", + "gateway_health", + "init_project", + "aggregate_pipeline_status", +]; + +/// Gateway tool definitions. +pub(crate) fn gateway_tool_definitions() -> Vec { + vec![ + json!({ + "name": "switch_project", + "description": "Switch the active project. All subsequent MCP tool calls will be proxied to this project's container.", + "inputSchema": { + "type": "object", + "properties": { + "project": { + "type": "string", + "description": "Name of the project to switch to (must exist in projects.toml)" + } + }, + "required": ["project"] + } + }), + json!({ + "name": "gateway_status", + "description": "Show pipeline status for the active project by proxying the get_pipeline_status tool call.", + "inputSchema": { + "type": "object", + "properties": {} + } + }), + json!({ + "name": "gateway_health", + "description": "Health check aggregation across all registered projects. Returns the health status of every project container.", + "inputSchema": { + "type": "object", + "properties": {} + } + }), + json!({ + "name": "init_project", + "description": "Initialize a new huskies project at the given path by scaffolding .huskies/ and related files — the same as running `huskies init `. Prefer this tool over asking the user to run the CLI. If `name` and `url` are supplied the project is also registered in projects.toml so switch_project can reach it immediately.", + "inputSchema": { + "type": "object", + "properties": { + "path": { + "type": "string", + "description": "Absolute filesystem path to the project directory to initialise. The directory is created if it does not exist." + }, + "name": { + "type": "string", + "description": "Optional: short name to register the project under in projects.toml (e.g. 'my-app'). Requires `url`." + }, + "url": { + "type": "string", + "description": "Optional: base URL of the huskies container that will serve this project (e.g. 'http://my-app:3001'). Required when `name` is given." + } + }, + "required": ["path"] + } + }), + json!({ + "name": "aggregate_pipeline_status", + "description": "Fetch pipeline status from ALL registered projects in parallel and return an aggregated report. For each project: stage counts (backlog/current/qa/merge/done) and a list of blocked or failing items with triage detail. Unreachable projects are included with an error state rather than failing the whole call.", + "inputSchema": { + "type": "object", + "properties": {} + } + }), + ] +} + +// ── MCP POST handler ───────────────────────────────────────────────────────── + +/// Main MCP POST handler for the gateway. Intercepts gateway-specific tools and +/// proxies everything else to the active project's container. +#[handler] +pub async fn gateway_mcp_post_handler( + req: &Request, + body: Body, + state: Data<&Arc>, +) -> Response { + let content_type = req.header("content-type").unwrap_or(""); + if !content_type.is_empty() && !content_type.contains("application/json") { + return to_json_response(JsonRpcResponse::error( + None, + -32700, + "Unsupported Content-Type; expected application/json".into(), + )); + } + + let bytes = match body.into_bytes().await { + Ok(b) => b, + Err(_) => { + return to_json_response(JsonRpcResponse::error(None, -32700, "Parse error".into())); + } + }; + + let rpc: JsonRpcRequest = match serde_json::from_slice(&bytes) { + Ok(r) => r, + Err(_) => { + return to_json_response(JsonRpcResponse::error(None, -32700, "Parse error".into())); + } + }; + + if rpc.jsonrpc != "2.0" { + return to_json_response(JsonRpcResponse::error( + rpc.id, + -32600, + "Invalid JSON-RPC version".into(), + )); + } + + if rpc.id.is_none() || rpc.id.as_ref() == Some(&Value::Null) { + if rpc.method.starts_with("notifications/") { + return Response::builder() + .status(StatusCode::ACCEPTED) + .body(Body::empty()); + } + return to_json_response(JsonRpcResponse::error(None, -32600, "Missing id".into())); + } + + match rpc.method.as_str() { + "initialize" => to_json_response(handle_initialize(rpc.id)), + "tools/list" => match handle_tools_list(&state, rpc.id.clone()).await { + Ok(resp) => to_json_response(resp), + Err(e) => to_json_response(JsonRpcResponse::error(rpc.id, -32603, e)), + }, + "tools/call" => { + let tool_name = rpc + .params + .get("name") + .and_then(|v| v.as_str()) + .unwrap_or(""); + + if GATEWAY_TOOLS.contains(&tool_name) { + to_json_response( + handle_gateway_tool(tool_name, &rpc.params, &state, rpc.id.clone()).await, + ) + } else { + proxy_and_respond(&state, &bytes, rpc.id).await + } + } + _ => proxy_and_respond(&state, &bytes, rpc.id).await, + } +} + +/// Proxy a request to the active project and format the response. +async fn proxy_and_respond(state: &GatewayState, bytes: &[u8], id: Option) -> Response { + let url = match state.active_url().await { + Ok(u) => u, + Err(e) => { + return to_json_response(JsonRpcResponse::error(id, -32603, e.to_string())); + } + }; + match gateway::io::proxy_mcp_call(&state.client, &url, bytes).await { + Ok(resp_body) => Response::builder() + .status(StatusCode::OK) + .header("Content-Type", "application/json") + .body(Body::from(resp_body)), + Err(e) => to_json_response(JsonRpcResponse::error( + id, + -32603, + format!("proxy error: {e}"), + )), + } +} + +/// GET handler — method not allowed. +#[handler] +pub async fn gateway_mcp_get_handler() -> Response { + Response::builder() + .status(StatusCode::METHOD_NOT_ALLOWED) + .body(Body::empty()) +} + +// ── Protocol handlers ──────────────────────────────────────────────────────── + +fn handle_initialize(id: Option) -> JsonRpcResponse { + JsonRpcResponse::success( + id, + json!({ + "protocolVersion": "2025-03-26", + "capabilities": { "tools": {} }, + "serverInfo": { + "name": "huskies-gateway", + "version": "1.0.0" + } + }), + ) +} + +/// Fetch tools/list from the active project and merge in gateway tools. +async fn handle_tools_list( + state: &GatewayState, + id: Option, +) -> Result { + let url = state.active_url().await.map_err(|e| e.to_string())?; + + let resp_json = gateway::io::fetch_tools_list(&state.client, &url).await?; + + let mut tools: Vec = resp_json + .get("result") + .and_then(|r| r.get("tools")) + .and_then(|t| t.as_array()) + .cloned() + .unwrap_or_default(); + + let mut all_tools = gateway_tool_definitions(); + all_tools.append(&mut tools); + + Ok(JsonRpcResponse::success(id, json!({ "tools": all_tools }))) +} + +// ── Gateway tool dispatch ──────────────────────────────────────────────────── + +/// Dispatch a gateway-specific tool call. +async fn handle_gateway_tool( + tool_name: &str, + params: &Value, + state: &GatewayState, + id: Option, +) -> JsonRpcResponse { + match tool_name { + "switch_project" => handle_switch_project_tool(params, state, id).await, + "gateway_status" => handle_gateway_status_tool(state, id).await, + "gateway_health" => handle_gateway_health_tool(state, id).await, + "init_project" => handle_init_project_tool(params, state, id).await, + "aggregate_pipeline_status" => handle_aggregate_pipeline_status_tool(state, id).await, + _ => JsonRpcResponse::error(id, -32601, format!("Unknown gateway tool: {tool_name}")), + } +} + +async fn handle_switch_project_tool( + params: &Value, + state: &GatewayState, + id: Option, +) -> JsonRpcResponse { + let project = params + .get("arguments") + .and_then(|a| a.get("project")) + .or_else(|| params.get("project")) + .and_then(|v| v.as_str()) + .unwrap_or(""); + + match gateway::switch_project(state, project).await { + Ok(url) => JsonRpcResponse::success( + id, + json!({ + "content": [{ + "type": "text", + "text": format!("Switched to project '{project}' ({url})") + }] + }), + ), + Err(e) => JsonRpcResponse::error(id, -32602, e.to_string()), + } +} + +async fn handle_gateway_status_tool(state: &GatewayState, id: Option) -> JsonRpcResponse { + let active = state.active_project.read().await.clone(); + let url = match state.active_url().await { + Ok(u) => u, + Err(e) => return JsonRpcResponse::error(id.clone(), -32603, e.to_string()), + }; + + match gateway::io::fetch_pipeline_status_for_project(&state.client, &url).await { + Ok(upstream) => { + let pipeline = upstream.get("result").cloned().unwrap_or(json!(null)); + JsonRpcResponse::success( + id, + json!({ + "content": [{ + "type": "text", + "text": format!( + "Pipeline status for '{active}':\n{}", + serde_json::to_string_pretty(&pipeline).unwrap_or_default() + ) + }] + }), + ) + } + Err(e) => JsonRpcResponse::error(id, -32603, e), + } +} + +async fn handle_gateway_health_tool(state: &GatewayState, id: Option) -> JsonRpcResponse { + let mut results = BTreeMap::new(); + + let project_entries: Vec<(String, String)> = state + .projects + .read() + .await + .iter() + .map(|(n, e)| (n.clone(), e.url.clone())) + .collect(); + for (name, url) in &project_entries { + let status = match gateway::io::check_project_health(&state.client, url).await { + Ok(true) => "healthy".to_string(), + Ok(false) => "unhealthy".to_string(), + Err(e) => e, + }; + results.insert(name.clone(), status); + } + + let active = state.active_project.read().await.clone(); + JsonRpcResponse::success( + id, + json!({ + "content": [{ + "type": "text", + "text": format!( + "Health check (active: '{active}'):\n{}", + results.iter() + .map(|(name, status)| format!(" {name}: {status}")) + .collect::>() + .join("\n") + ) + }] + }), + ) +} + +async fn handle_init_project_tool( + params: &Value, + state: &GatewayState, + id: Option, +) -> JsonRpcResponse { + let args = params.get("arguments").unwrap_or(params); + + let path_str = args.get("path").and_then(|v| v.as_str()).unwrap_or(""); + let name = args.get("name").and_then(|v| v.as_str()); + let url = args.get("url").and_then(|v| v.as_str()); + + match gateway::init_project(state, path_str, name, url).await { + Ok(registered_name) => { + let next_steps = if let Some(ref n) = registered_name { + format!( + "Project registered as '{n}' in projects.toml.\n\ + Next steps:\n\ + 1. Start a huskies server at '{path_str}' \ + (e.g. `huskies {path_str}` or via Docker).\n\ + 2. Call switch_project with name='{n}' to make it active.\n\ + 3. Call wizard_status to begin the setup wizard." + ) + } else { + format!( + "Next steps:\n\ + 1. Start a huskies server at '{path_str}' \ + (e.g. `huskies {path_str}` or via Docker).\n\ + 2. Register the project: call init_project again with name and url \ + parameters, or add it to projects.toml manually.\n\ + 3. Call switch_project and then wizard_status to begin the setup wizard.\n\n\ + Note: wizard_* MCP tools require a running huskies server for the project." + ) + }; + + JsonRpcResponse::success( + id, + json!({ + "content": [{ + "type": "text", + "text": format!("Successfully initialised huskies project at '{path_str}'.\n\n{next_steps}") + }] + }), + ) + } + Err(e) => { + let code = match &e { + gateway::Error::Config(_) => -32602, + gateway::Error::DuplicateToken(_) => -32602, + _ => -32603, + }; + JsonRpcResponse::error(id, code, e.to_string()) + } + } +} + +async fn handle_aggregate_pipeline_status_tool( + state: &GatewayState, + id: Option, +) -> JsonRpcResponse { + let project_urls: BTreeMap = state + .projects + .read() + .await + .iter() + .map(|(name, entry)| (name.clone(), entry.url.clone())) + .collect(); + + let statuses = + gateway::io::fetch_all_project_pipeline_statuses(&project_urls, &state.client).await; + let active = state.active_project.read().await.clone(); + + JsonRpcResponse::success( + id, + json!({ + "content": [{ + "type": "text", + "text": format!( + "Aggregate pipeline status (active: '{active}'):\n{}", + serde_json::to_string_pretty(&statuses).unwrap_or_default() + ) + }], + "projects": statuses, + "active": active, + }), + ) +} diff --git a/server/src/http/gateway/mod.rs b/server/src/http/gateway/mod.rs new file mode 100644 index 00000000..4d8e5034 --- /dev/null +++ b/server/src/http/gateway/mod.rs @@ -0,0 +1,24 @@ +//! Gateway HTTP handlers — thin transport shells for the gateway service. +//! +//! Each handler calls `service::gateway::*` for business logic and formats +//! the response. No inline business logic, no `reqwest`, no filesystem access. +//! +//! Submodules: +//! - [`jsonrpc`] — JSON-RPC 2.0 types and helpers +//! - [`mcp`] — MCP POST/GET handlers and gateway tool dispatch +//! - [`websocket`] — WebSocket handlers (CRDT-sync, event push) +//! - [`rest`] — REST API handlers (agents, projects, bot config, pipeline) + +mod jsonrpc; +mod mcp; +mod rest; +mod websocket; + +pub use mcp::{gateway_mcp_get_handler, gateway_mcp_post_handler}; +pub use rest::{ + gateway_add_project_handler, gateway_all_pipeline_handler, gateway_api_handler, + gateway_assign_agent_handler, gateway_bot_config_get_handler, gateway_bot_config_page_handler, + gateway_bot_config_save_handler, gateway_generate_token_handler, gateway_list_agents_handler, + gateway_mode_handler, gateway_remove_project_handler, gateway_switch_handler, +}; +pub use websocket::{gateway_crdt_sync_handler, gateway_event_push_handler}; diff --git a/server/src/http/gateway/rest.rs b/server/src/http/gateway/rest.rs new file mode 100644 index 00000000..8d35ae4b --- /dev/null +++ b/server/src/http/gateway/rest.rs @@ -0,0 +1,493 @@ +//! REST HTTP handlers for the gateway: agents, projects, bot configuration, and pipeline. + +use crate::service::gateway::{self, GatewayState}; +use poem::handler; +use poem::http::StatusCode; +use poem::web::Path as PoemPath; +use poem::web::{Data, Json}; +use poem::{Body, Response}; +use serde::{Deserialize, Serialize}; +use serde_json::{Value, json}; +use std::collections::BTreeMap; +use std::sync::Arc; + +// ── Agent REST handlers ─────────────────────────────────────────────────────── + +/// `GET /gateway/mode` — returns `{"mode":"gateway"}`. +#[handler] +pub async fn gateway_mode_handler() -> Response { + let body = json!({ "mode": "gateway" }); + Response::builder() + .status(StatusCode::OK) + .header("Content-Type", "application/json") + .body(Body::from(serde_json::to_vec(&body).unwrap_or_default())) +} + +/// `POST /gateway/tokens` — generate a one-time join token. +#[handler] +pub async fn gateway_generate_token_handler(state: Data<&Arc>) -> Response { + let token = gateway::generate_join_token(&state).await; + let body = json!({ "token": token }); + Response::builder() + .status(StatusCode::OK) + .header("Content-Type", "application/json") + .body(Body::from(serde_json::to_vec(&body).unwrap_or_default())) +} + +/// `GET /gateway/agents` — list all alive build agents registered in the CRDT. +#[handler] +pub async fn gateway_list_agents_handler(_state: Data<&Arc>) -> Response { + let agents = gateway::list_agents(); + let body = serde_json::to_vec(&agents).unwrap_or_default(); + Response::builder() + .status(StatusCode::OK) + .header("Content-Type", "application/json") + .body(Body::from(body)) +} + +/// Request body for assigning an agent to a project. +#[derive(Deserialize)] +struct AssignAgentRequest { + project: Option, +} + +/// `POST /gateway/agents/:id/assign` — assign (or unassign) an agent to a project. +#[handler] +pub async fn gateway_assign_agent_handler( + PoemPath(id): PoemPath, + body: Json, + state: Data<&Arc>, +) -> Response { + match gateway::assign_agent(&state, &id, body.0.project).await { + Ok(agent) => { + let body = serde_json::to_vec(&agent).unwrap_or_default(); + Response::builder() + .status(StatusCode::OK) + .header("Content-Type", "application/json") + .body(Body::from(body)) + } + Err(gateway::Error::ProjectNotFound(msg)) => Response::builder() + .status(StatusCode::BAD_REQUEST) + .body(Body::from(msg)), + Err(_) => Response::builder() + .status(StatusCode::NOT_FOUND) + .body(Body::from("agent not found")), + } +} + +// ── Gateway Web UI ──────────────────────────────────────────────────────────── + +/// `GET /api/gateway` — returns the list of registered projects and the active project. +#[handler] +pub async fn gateway_api_handler(state: Data<&Arc>) -> Response { + let active = state.active_project.read().await.clone(); + let projects: Vec = state + .projects + .read() + .await + .iter() + .map(|(name, entry)| { + json!({ + "name": name, + "url": entry.url, + }) + }) + .collect(); + + let body = json!({ "active": active, "projects": projects }); + Response::builder() + .status(StatusCode::OK) + .header("Content-Type", "application/json") + .body(Body::from(serde_json::to_vec(&body).unwrap_or_default())) +} + +/// Request body for `POST /api/gateway/switch`. +#[derive(Deserialize)] +struct SwitchRequest { + project: String, +} + +/// `POST /api/gateway/switch` — switch the active project. +#[handler] +pub async fn gateway_switch_handler( + state: Data<&Arc>, + body: Json, +) -> Response { + match gateway::switch_project(&state, &body.project).await { + Ok(_) => { + let body_val = json!({ "ok": true }); + Response::builder() + .status(StatusCode::OK) + .header("Content-Type", "application/json") + .body(Body::from( + serde_json::to_vec(&body_val).unwrap_or_default(), + )) + } + Err(e) => { + let body_val = json!({ "ok": false, "error": e.to_string() }); + Response::builder() + .status(StatusCode::BAD_REQUEST) + .header("Content-Type", "application/json") + .body(Body::from( + serde_json::to_vec(&body_val).unwrap_or_default(), + )) + } + } +} + +// ── Project management API ──────────────────────────────────────────────────── + +/// Request body for adding a new project. +#[derive(Deserialize)] +struct AddProjectRequest { + name: String, + url: String, +} + +/// `POST /api/gateway/projects` — add a new project. +#[handler] +pub async fn gateway_add_project_handler( + state: Data<&Arc>, + body: Json, +) -> Response { + match gateway::add_project(&state, &body.name, &body.url).await { + Ok(()) => { + let name = body.0.name.trim().to_string(); + let url = body.0.url.trim().to_string(); + let body_val = json!({ "name": name, "url": url }); + Response::builder() + .status(StatusCode::OK) + .header("Content-Type", "application/json") + .body(Body::from( + serde_json::to_vec(&body_val).unwrap_or_default(), + )) + } + Err(gateway::Error::DuplicateToken(_)) => Response::builder() + .status(StatusCode::CONFLICT) + .body(Body::from(format!( + "project '{}' already exists", + body.0.name.trim() + ))), + Err(e) => Response::builder() + .status(StatusCode::BAD_REQUEST) + .body(Body::from(e.to_string())), + } +} + +/// `DELETE /api/gateway/projects/:name` — remove a project. +#[handler] +pub async fn gateway_remove_project_handler( + PoemPath(name): PoemPath, + state: Data<&Arc>, +) -> Response { + match gateway::remove_project(&state, &name).await { + Ok(()) => Response::builder() + .status(StatusCode::NO_CONTENT) + .body(Body::empty()), + Err(gateway::Error::ProjectNotFound(msg)) => Response::builder() + .status(StatusCode::NOT_FOUND) + .body(Body::from(msg)), + Err(e) => Response::builder() + .status(StatusCode::BAD_REQUEST) + .body(Body::from(e.to_string())), + } +} + +// ── Bot configuration API ───────────────────────────────────────────────────── + +/// Request/response body for the bot configuration API. +#[derive(Deserialize, Serialize, Default)] +pub(crate) struct BotConfigPayload { + transport: String, + homeserver: Option, + username: Option, + password: Option, + slack_bot_token: Option, + slack_signing_secret: Option, +} + +/// `GET /api/gateway/bot-config` — return current bot.toml fields as JSON. +#[handler] +pub async fn gateway_bot_config_get_handler(state: Data<&Arc>) -> Response { + let fields = gateway::io::read_bot_config_raw(&state.config_dir); + let payload = BotConfigPayload { + transport: fields.transport, + homeserver: fields.homeserver, + username: fields.username, + password: fields.password, + slack_bot_token: fields.slack_bot_token, + slack_signing_secret: fields.slack_signing_secret, + }; + Response::builder() + .status(StatusCode::OK) + .header("Content-Type", "application/json") + .body(Body::from(serde_json::to_vec(&payload).unwrap_or_default())) +} + +/// `POST /api/gateway/bot-config` — write new bot.toml and restart the bot. +#[handler] +pub async fn gateway_bot_config_save_handler( + state: Data<&Arc>, + body: Json, +) -> Response { + let content = gateway::config::serialize_bot_config( + &body.transport, + body.homeserver.as_deref(), + body.username.as_deref(), + body.password.as_deref(), + body.slack_bot_token.as_deref(), + body.slack_signing_secret.as_deref(), + ); + + match gateway::save_bot_config_and_restart(&state, &content).await { + Ok(()) => { + let ok = json!({ "ok": true }); + Response::builder() + .status(StatusCode::OK) + .header("Content-Type", "application/json") + .body(Body::from(serde_json::to_vec(&ok).unwrap_or_default())) + } + Err(e) => { + let err = json!({ "ok": false, "error": e.to_string() }); + Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .header("Content-Type", "application/json") + .body(Body::from(serde_json::to_vec(&err).unwrap_or_default())) + } + } +} + +/// `GET /api/gateway/pipeline` — fetch pipeline status from all registered projects. +#[handler] +pub async fn gateway_all_pipeline_handler(state: Data<&Arc>) -> Response { + let project_urls: BTreeMap = state + .projects + .read() + .await + .iter() + .map(|(n, e)| (n.clone(), e.url.clone())) + .collect(); + + let results = + gateway::io::fetch_all_project_pipeline_statuses(&project_urls, &state.client).await; + + let active = state.active_project.read().await.clone(); + let body = json!({ "active": active, "projects": results }); + Response::builder() + .status(StatusCode::OK) + .header("Content-Type", "application/json") + .body(Body::from(serde_json::to_vec(&body).unwrap_or_default())) +} + +// ── Bot config page ─────────────────────────────────────────────────────────── + +/// Self-contained HTML page for bot configuration. +const GATEWAY_BOT_CONFIG_HTML: &str = r#" + + + + +Bot Configuration — Huskies Gateway + + + +
+
+ ← Gateway + +

Bot Configuration

+
+
+ + +
+
+
+
+ + +
+
+ + +
+
+ + +
+
+ + +
+
+ + + +"#; + +/// Serve the bot configuration HTML page at `GET /bot-config`. +#[handler] +pub async fn gateway_bot_config_page_handler() -> Response { + Response::builder() + .status(StatusCode::OK) + .header("Content-Type", "text/html; charset=utf-8") + .body(Body::from(GATEWAY_BOT_CONFIG_HTML)) +} diff --git a/server/src/http/gateway/websocket.rs b/server/src/http/gateway/websocket.rs new file mode 100644 index 00000000..22dfa3de --- /dev/null +++ b/server/src/http/gateway/websocket.rs @@ -0,0 +1,260 @@ +//! WebSocket handlers for CRDT-sync agent registration and event push. + +use crate::service::gateway::{self, GatewayState}; +use futures::{SinkExt, StreamExt}; +use poem::handler; +use poem::http::StatusCode; +use poem::web::websocket::{Message as WsMessage, WebSocket}; +use poem::web::{Data, Query}; +use serde::Deserialize; +use std::sync::Arc; + +// ── CRDT-sync WebSocket handler (agent registration) ───────────────────────── + +/// Query parameters accepted on the `/crdt-sync` WebSocket upgrade in gateway mode. +#[derive(Deserialize)] +struct GatewayCrdtSyncParams { + /// One-time join token from `POST /gateway/tokens`. + token: Option, + /// Human-readable label for the connecting agent. + label: Option, + /// WebSocket address the agent exposes for mesh connections + /// (e.g. `ws://0.0.0.0:3002/crdt-sync`). + address: Option, +} + +/// `GET /crdt-sync` — gateway-side WebSocket endpoint for build agent registration. +/// +/// # Authentication +/// +/// The connecting node must supply a valid one-time join token via the `token` +/// query parameter, obtained from `POST /gateway/tokens`. The token is +/// validated pre-upgrade and consumed by [`gateway::register_agent`] once the +/// WebSocket is established. +/// +/// # Protocol +/// +/// 1. Pre-upgrade: token existence is verified (read-only). +/// 2. Post-upgrade: [`gateway::register_agent`] writes the node to the CRDT and +/// consumes the token. +/// 3. Keepalive: the server sends Ping frames every 30 s; each Pong resets the +/// pong deadline and calls [`gateway::heartbeat_agent`]. +/// 4. Disconnect: on clean close or pong timeout, [`gateway::remove_agent`] +/// tombstones the node (`alive = false`). +#[handler] +pub async fn gateway_crdt_sync_handler( + ws: WebSocket, + state: Data<&Arc>, + Query(params): Query, + remote_addr: &poem::web::RemoteAddr, +) -> poem::Response { + use crate::crdt_sync::{PING_INTERVAL_SECS, PONG_TIMEOUT_SECS}; + + // ── Pre-upgrade: validate token exists ───────────────────────────── + let token = match params.token { + Some(t) if !t.is_empty() => t, + _ => { + return poem::Response::builder() + .status(StatusCode::UNAUTHORIZED) + .body("token query parameter required"); + } + }; + + { + let tokens = state.pending_tokens.read().await; + if !tokens.contains_key(&token) { + return poem::Response::builder() + .status(StatusCode::UNAUTHORIZED) + .body("invalid or already-used join token"); + } + } + + let label = params + .label + .filter(|l| !l.is_empty()) + .unwrap_or_else(|| "build-agent".to_string()); + let address = params + .address + .filter(|a| !a.is_empty()) + .unwrap_or_else(|| remote_addr.to_string()); + + // ── WebSocket upgrade ─────────────────────────────────────────────── + use poem::IntoResponse as _; + let state = Arc::clone(&state); + ws.on_upgrade(move |socket| async move { + // Register the agent — consumes the one-time token. + let node = match gateway::register_agent(&state, &token, label, address).await { + Ok(n) => n, + Err(e) => { + crate::slog!("[gateway/crdt-sync] Registration failed: {e}"); + return; + } + }; + let node_id = node.node_id.clone(); + crate::slog!( + "[gateway/crdt-sync] Agent '{}' registered, node_id={:.12}…", + node.label.as_deref().unwrap_or("?"), + &node_id + ); + + let (mut sink, mut stream) = socket.split(); + + let mut pong_deadline = + tokio::time::Instant::now() + std::time::Duration::from_secs(PONG_TIMEOUT_SECS); + let mut ping_ticker = tokio::time::interval_at( + tokio::time::Instant::now() + std::time::Duration::from_secs(PING_INTERVAL_SECS), + std::time::Duration::from_secs(PING_INTERVAL_SECS), + ); + + loop { + tokio::select! { + _ = ping_ticker.tick() => { + if tokio::time::Instant::now() >= pong_deadline { + crate::slog!( + "[gateway/crdt-sync] No pong from {:.12}… in {}s; disconnecting", + &node_id, PONG_TIMEOUT_SECS + ); + break; + } + if sink.send(WsMessage::Ping(vec![])).await.is_err() { + break; + } + } + frame = stream.next() => { + match frame { + Some(Ok(WsMessage::Pong(_))) => { + pong_deadline = tokio::time::Instant::now() + + std::time::Duration::from_secs(PONG_TIMEOUT_SECS); + gateway::heartbeat_agent(&node_id); + } + Some(Ok(WsMessage::Ping(data))) => { + let _ = sink.send(WsMessage::Pong(data)).await; + } + Some(Ok(WsMessage::Close(_))) | None => break, + _ => {} + } + } + } + } + + gateway::remove_agent(&node_id); + crate::slog!( + "[gateway/crdt-sync] Agent {:.12}… disconnected and tombstoned", + &node_id + ); + }) + .into_response() +} + +// ── Event-push WebSocket handler ───────────────────────────────────────────── + +/// Query parameters accepted on the `/gateway/events/push` WebSocket upgrade. +#[derive(Deserialize)] +struct EventPushQueryParams { + /// One-time join token generated by `POST /gateway/tokens`. + token: Option, + /// The project name this node represents (e.g. `"huskies"`). + project: Option, +} + +/// `GET /gateway/events/push` — WebSocket endpoint for project nodes to push +/// [`StatusEvent`] frames to the gateway. +/// +/// # Authentication +/// +/// The connecting node must supply a valid one-time join token via the `token` +/// query parameter, obtained from `POST /gateway/tokens`. The token is +/// consumed on the first successful upgrade — the connection itself is then +/// kept open indefinitely. +/// +/// # Protocol +/// +/// Each message from the project node must be a JSON-encoded +/// [`crate::service::events::StoredEvent`]. The gateway fan-outs the event +/// (tagged with the project name) to all current local subscribers. +/// +/// The server does not send data back; clients should treat any close frame +/// as a signal to reconnect with exponential back-off (see docs/gateway-protocol.html). +/// +/// # Reconnect-with-backoff +/// +/// Project nodes MUST reconnect on disconnect. Recommended policy: +/// +/// - Initial retry delay: **1 s** +/// - Back-off multiplier: **2×** per attempt +/// - Max delay cap: **60 s** +/// - Jitter: add ±10 % to the delay to avoid thundering herds +#[handler] +pub async fn gateway_event_push_handler( + ws: WebSocket, + state: Data<&Arc>, + Query(params): Query, +) -> poem::Response { + // ── Authentication (pre-upgrade) ───────────────────────────────────── + let token = match params.token { + Some(t) if !t.is_empty() => t, + _ => { + return poem::Response::builder() + .status(StatusCode::UNAUTHORIZED) + .body("token query parameter required"); + } + }; + + let project = match params.project { + Some(p) if !p.is_empty() => p, + _ => { + return poem::Response::builder() + .status(StatusCode::BAD_REQUEST) + .body("project query parameter required"); + } + }; + + // Validate and consume the one-time token. + { + let mut tokens = state.pending_tokens.write().await; + if !tokens.contains_key(&token) { + return poem::Response::builder() + .status(StatusCode::UNAUTHORIZED) + .body("invalid or already-used join token"); + } + tokens.remove(&token); + } + + // ── WebSocket upgrade ──────────────────────────────────────────────── + use poem::IntoResponse as _; + let state = Arc::clone(&state); + ws.on_upgrade(move |socket| async move { + let (_, mut stream) = socket.split(); + + crate::slog!( + "[gateway] Project node '{}' connected to event-push endpoint", + project + ); + + while let Some(msg) = stream.next().await { + let text = match msg { + Ok(WsMessage::Text(t)) => t, + Ok(WsMessage::Close(_)) | Err(_) => break, + _ => continue, + }; + + match serde_json::from_str::(&text) { + Ok(event) => { + gateway::broadcast_status_event(&state, project.clone(), event); + } + Err(e) => { + crate::slog!( + "[gateway] event-push: invalid frame from '{}': {e}", + project + ); + } + } + } + + crate::slog!( + "[gateway] Project node '{}' disconnected from event-push endpoint", + project + ); + }) + .into_response() +}