//! Multi-project gateway — proxies MCP calls to per-project Docker containers. //! //! When `huskies --gateway` is used, the server starts in gateway mode: it reads //! a `projects.toml` config that maps project names to container URLs, maintains //! an "active project" selection, and proxies all MCP tool calls to the active //! project's container. Gateway-specific tools allow switching projects, querying //! status, and aggregating health checks across all registered projects. use poem::EndpointExt; use poem::handler; use poem::http::StatusCode; use poem::web::Path as PoemPath; use poem::web::{Data, Json}; use poem::{Body, Request, Response}; use reqwest::Client; use serde::{Deserialize, Serialize}; use serde_json::{Value, json}; use std::collections::BTreeMap; use std::collections::HashMap; use std::path::{Path, PathBuf}; use std::sync::Arc; use tokio::sync::Mutex as TokioMutex; use tokio::sync::RwLock; use uuid::Uuid; // Re-export active_project type alias for clarity in gateway bot helpers. type ActiveProject = Arc>; // ── Config ─────────────────────────────────────────────────────────── /// A single project entry in `projects.toml`. #[derive(Debug, Clone, Deserialize, Serialize)] pub struct ProjectEntry { /// Base URL of the project's huskies container (e.g. `http://localhost:3001`). pub url: String, } /// Top-level `projects.toml` config. #[derive(Debug, Clone, Deserialize, Serialize)] pub struct GatewayConfig { /// Map of project name → container URL. #[serde(default)] pub projects: BTreeMap, } impl GatewayConfig { /// Load gateway config from a `projects.toml` file. pub fn load(path: &Path) -> Result { let contents = std::fs::read_to_string(path) .map_err(|e| format!("cannot read {}: {e}", path.display()))?; toml::from_str(&contents).map_err(|e| format!("invalid projects.toml: {e}")) } } // ── Agent join types ───────────────────────────────────────────────── /// A build agent that has registered with this gateway. 
#[derive(Debug, Clone, Serialize, Deserialize)] pub struct JoinedAgent { /// Unique ID assigned by the gateway on registration. pub id: String, /// Human-readable label provided by the agent (e.g. `build-agent-abc123`). pub label: String, /// The agent's CRDT-sync WebSocket address (e.g. `ws://host:3001/crdt-sync`). pub address: String, /// Unix timestamp when the agent registered. pub registered_at: f64, /// Unix timestamp of the last heartbeat from this agent. Defaults to `registered_at` /// for agents loaded from persisted state that predate the heartbeat feature. #[serde(default)] pub last_seen: f64, /// Project this agent is assigned to, if any. #[serde(default, skip_serializing_if = "Option::is_none")] pub assigned_project: Option, } /// A one-time join token that has been generated but not yet consumed. struct PendingToken { #[allow(dead_code)] created_at: f64, } /// Request body sent by a build agent when registering with the gateway. #[derive(Deserialize)] struct RegisterAgentRequest { token: String, label: String, address: String, } /// Request body for assigning or reassigning an agent to a project. /// /// Send `{"project": "my-project"}` to assign, or `{"project": null}` to unassign. #[derive(Deserialize)] struct AssignAgentRequest { project: Option, } // ── Gateway state ──────────────────────────────────────────────────── /// Shared gateway state threaded through HTTP handlers. #[derive(Clone)] pub struct GatewayState { /// The live set of registered projects (initially loaded from `projects.toml`). pub projects: Arc>>, /// The currently active project name. pub active_project: Arc>, /// HTTP client for proxying requests to project containers. pub client: Client, /// Build agents that have joined this gateway. pub joined_agents: Arc>>, /// One-time join tokens that have been issued but not yet consumed. pending_tokens: Arc>>, /// Directory containing `projects.toml` and the `.huskies/` subfolder. 
pub config_dir: PathBuf, /// HTTP port the gateway is listening on. pub port: u16, /// Abort handle for the running Matrix bot task (if any). /// Stored so the bot can be restarted when credentials change. pub bot_handle: Arc>>, } /// Load persisted agents from `/gateway_agents.json`. /// Returns an empty list if the file does not exist or cannot be parsed. fn load_agents(config_dir: &Path) -> Vec { let path = config_dir.join("gateway_agents.json"); match std::fs::read(&path) { Ok(data) => serde_json::from_slice(&data).unwrap_or_default(), Err(_) => Vec::new(), } } /// Persist the current projects map to `/projects.toml`. /// Silently ignores write errors or skips when `config_dir` is empty. async fn save_config(projects: &BTreeMap, config_dir: &Path) { if config_dir.as_os_str().is_empty() { return; } let path = config_dir.join("projects.toml"); let config = GatewayConfig { projects: projects.clone(), }; if let Ok(data) = toml::to_string_pretty(&config) { let _ = tokio::fs::write(&path, data).await; } } /// Persist the current agent list to `/gateway_agents.json`. /// Silently ignores write errors (e.g. read-only filesystem or empty path). async fn save_agents(agents: &[JoinedAgent], config_dir: &Path) { if config_dir == Path::new("") { return; } let path = config_dir.join("gateway_agents.json"); if let Ok(data) = serde_json::to_vec_pretty(agents) { let _ = tokio::fs::write(&path, data).await; } } impl GatewayState { /// Create a new gateway state from a config and config directory. /// /// The first project in the config becomes the active project by default. /// Previously registered agents are loaded from `gateway_agents.json` in /// `config_dir` if the file exists. 
pub fn new(config: GatewayConfig, config_dir: PathBuf, port: u16) -> Result { if config.projects.is_empty() { return Err("projects.toml must define at least one project".to_string()); } let first = config.projects.keys().next().unwrap().clone(); let agents = load_agents(&config_dir); Ok(Self { projects: Arc::new(RwLock::new(config.projects)), active_project: Arc::new(RwLock::new(first)), client: Client::new(), joined_agents: Arc::new(RwLock::new(agents)), pending_tokens: Arc::new(RwLock::new(HashMap::new())), config_dir, port, bot_handle: Arc::new(TokioMutex::new(None)), }) } /// Get the URL of the currently active project. async fn active_url(&self) -> Result { let name = self.active_project.read().await.clone(); self.projects .read() .await .get(&name) .map(|p| p.url.clone()) .ok_or_else(|| format!("active project '{name}' not found in config")) } } // ── MCP proxy handler ──────────────────────────────────────────────── /// JSON-RPC request (duplicated here to keep the gateway self-contained). #[derive(Deserialize)] struct JsonRpcRequest { jsonrpc: String, id: Option, method: String, #[serde(default)] params: Value, } /// JSON-RPC response. 
#[derive(Serialize)]
struct JsonRpcResponse {
    jsonrpc: &'static str,
    /// Request id being answered; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    id: Option<Value>,
    /// Success payload; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    result: Option<Value>,
    /// Error payload; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<JsonRpcError>,
}

/// JSON-RPC error object carried in [`JsonRpcResponse::error`].
#[derive(Serialize)]
struct JsonRpcError {
    code: i64,
    message: String,
}

impl JsonRpcResponse {
    /// Build a success response carrying `result`.
    fn success(id: Option<Value>, result: Value) -> Self {
        Self {
            jsonrpc: "2.0",
            id,
            result: Some(result),
            error: None,
        }
    }

    /// Build an error response carrying `code` and `message`.
    fn error(id: Option<Value>, code: i64, message: String) -> Self {
        Self {
            jsonrpc: "2.0",
            id,
            result: None,
            error: Some(JsonRpcError { code, message }),
        }
    }
}

/// Serialize a JSON-RPC response into an HTTP 200 `application/json` response.
///
/// If serialization fails the body is empty.
fn to_json_response(resp: JsonRpcResponse) -> Response {
    let body = serde_json::to_vec(&resp).unwrap_or_default();
    Response::builder()
        .status(StatusCode::OK)
        .header("Content-Type", "application/json")
        .body(Body::from(body))
}

/// Gateway-specific MCP tools exposed alongside the proxied tools.
const GATEWAY_TOOLS: &[&str] = &["switch_project", "gateway_status", "gateway_health"];

/// Main MCP POST handler for the gateway. Intercepts gateway-specific tools and
/// proxies everything else to the active project's container.
#[handler] pub async fn gateway_mcp_post_handler( req: &Request, body: Body, state: Data<&Arc>, ) -> Response { let content_type = req.header("content-type").unwrap_or(""); if !content_type.is_empty() && !content_type.contains("application/json") { return to_json_response(JsonRpcResponse::error( None, -32700, "Unsupported Content-Type; expected application/json".into(), )); } let bytes = match body.into_bytes().await { Ok(b) => b, Err(_) => { return to_json_response(JsonRpcResponse::error(None, -32700, "Parse error".into())); } }; let rpc: JsonRpcRequest = match serde_json::from_slice(&bytes) { Ok(r) => r, Err(_) => { return to_json_response(JsonRpcResponse::error(None, -32700, "Parse error".into())); } }; if rpc.jsonrpc != "2.0" { return to_json_response(JsonRpcResponse::error( rpc.id, -32600, "Invalid JSON-RPC version".into(), )); } // Accept notifications silently. if rpc.id.is_none() || rpc.id.as_ref() == Some(&Value::Null) { if rpc.method.starts_with("notifications/") { return Response::builder() .status(StatusCode::ACCEPTED) .body(Body::empty()); } return to_json_response(JsonRpcResponse::error(None, -32600, "Missing id".into())); } match rpc.method.as_str() { "initialize" => to_json_response(handle_initialize(rpc.id)), "tools/list" => { // Merge gateway tools with proxied tools from the active project. match merge_tools_list(&state, rpc.id.clone()).await { Ok(resp) => to_json_response(resp), Err(e) => to_json_response(JsonRpcResponse::error(rpc.id, -32603, e)), } } "tools/call" => { let tool_name = rpc .params .get("name") .and_then(|v| v.as_str()) .unwrap_or(""); if GATEWAY_TOOLS.contains(&tool_name) { to_json_response(handle_gateway_tool(tool_name, &rpc.params, &state).await) } else { // Proxy to active project's container. 
match proxy_mcp_call(&state, &bytes).await { Ok(resp_body) => Response::builder() .status(StatusCode::OK) .header("Content-Type", "application/json") .body(Body::from(resp_body)), Err(e) => to_json_response(JsonRpcResponse::error( rpc.id, -32603, format!("proxy error: {e}"), )), } } } _ => { // Proxy unknown methods too. match proxy_mcp_call(&state, &bytes).await { Ok(resp_body) => Response::builder() .status(StatusCode::OK) .header("Content-Type", "application/json") .body(Body::from(resp_body)), Err(e) => to_json_response(JsonRpcResponse::error( rpc.id, -32603, format!("proxy error: {e}"), )), } } } } /// GET handler — method not allowed (matches the regular MCP endpoint behavior). #[handler] pub async fn gateway_mcp_get_handler() -> Response { Response::builder() .status(StatusCode::METHOD_NOT_ALLOWED) .body(Body::empty()) } // ── Protocol handlers ──────────────────────────────────────────────── fn handle_initialize(id: Option) -> JsonRpcResponse { JsonRpcResponse::success( id, json!({ "protocolVersion": "2025-03-26", "capabilities": { "tools": {} }, "serverInfo": { "name": "huskies-gateway", "version": "1.0.0" } }), ) } /// Gateway tool definitions. fn gateway_tool_definitions() -> Vec { vec![ json!({ "name": "switch_project", "description": "Switch the active project. All subsequent MCP tool calls will be proxied to this project's container.", "inputSchema": { "type": "object", "properties": { "project": { "type": "string", "description": "Name of the project to switch to (must exist in projects.toml)" } }, "required": ["project"] } }), json!({ "name": "gateway_status", "description": "Show pipeline status for the active project by proxying the get_pipeline_status tool call.", "inputSchema": { "type": "object", "properties": {} } }), json!({ "name": "gateway_health", "description": "Health check aggregation across all registered projects. 
Returns the health status of every project container.", "inputSchema": { "type": "object", "properties": {} } }), ] } /// Fetch tools/list from the active project and merge in gateway tools. async fn merge_tools_list( state: &GatewayState, id: Option, ) -> Result { let url = state.active_url().await?; let mcp_url = format!("{}/mcp", url.trim_end_matches('/')); let rpc_body = json!({ "jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": {} }); let resp = state .client .post(&mcp_url) .json(&rpc_body) .send() .await .map_err(|e| format!("failed to reach {mcp_url}: {e}"))?; let resp_json: Value = resp .json() .await .map_err(|e| format!("invalid JSON from upstream: {e}"))?; let mut tools: Vec = resp_json .get("result") .and_then(|r| r.get("tools")) .and_then(|t| t.as_array()) .cloned() .unwrap_or_default(); // Prepend gateway-specific tools. let mut all_tools = gateway_tool_definitions(); all_tools.append(&mut tools); Ok(JsonRpcResponse::success(id, json!({ "tools": all_tools }))) } /// Proxy a raw MCP request body to the active project's container. async fn proxy_mcp_call(state: &GatewayState, request_bytes: &[u8]) -> Result, String> { let url = state.active_url().await?; let mcp_url = format!("{}/mcp", url.trim_end_matches('/')); let resp = state .client .post(&mcp_url) .header("Content-Type", "application/json") .body(request_bytes.to_vec()) .send() .await .map_err(|e| format!("failed to reach {mcp_url}: {e}"))?; resp.bytes() .await .map(|b| b.to_vec()) .map_err(|e| format!("failed to read response from {mcp_url}: {e}")) } // ── Gateway-specific tools ─────────────────────────────────────────── /// Dispatch a gateway-specific tool call. async fn handle_gateway_tool( tool_name: &str, params: &Value, state: &GatewayState, ) -> JsonRpcResponse { let id = None; // The caller wraps this in a proper response. 
match tool_name { "switch_project" => handle_switch_project(params, state).await, "gateway_status" => handle_gateway_status(state).await, "gateway_health" => handle_gateway_health(state).await, _ => JsonRpcResponse::error(id, -32601, format!("Unknown gateway tool: {tool_name}")), } } /// Switch the active project. async fn handle_switch_project(params: &Value, state: &GatewayState) -> JsonRpcResponse { let project = params .get("arguments") .and_then(|a| a.get("project")) .or_else(|| params.get("project")) .and_then(|v| v.as_str()) .unwrap_or(""); if project.is_empty() { return JsonRpcResponse::error(None, -32602, "missing required parameter: project".into()); } let url = { let projects = state.projects.read().await; if !projects.contains_key(project) { let available: Vec<&str> = projects.keys().map(|s| s.as_str()).collect(); return JsonRpcResponse::error( None, -32602, format!( "unknown project '{project}'. Available: {}", available.join(", ") ), ); } projects[project].url.clone() }; *state.active_project.write().await = project.to_string(); JsonRpcResponse::success( None, json!({ "content": [{ "type": "text", "text": format!("Switched to project '{project}' ({url})") }] }), ) } /// Show pipeline status for the active project by proxying `get_pipeline_status`. async fn handle_gateway_status(state: &GatewayState) -> JsonRpcResponse { let active = state.active_project.read().await.clone(); let url = match state.active_url().await { Ok(u) => u, Err(e) => return JsonRpcResponse::error(None, -32603, e), }; let mcp_url = format!("{}/mcp", url.trim_end_matches('/')); let rpc_body = json!({ "jsonrpc": "2.0", "id": 1, "method": "tools/call", "params": { "name": "get_pipeline_status", "arguments": {} } }); match state.client.post(&mcp_url).json(&rpc_body).send().await { Ok(resp) => { match resp.json::().await { Ok(upstream) => { // Extract the result from the upstream response and wrap it. 
let pipeline = upstream.get("result").cloned().unwrap_or(json!(null)); JsonRpcResponse::success( None, json!({ "content": [{ "type": "text", "text": format!( "Pipeline status for '{active}':\n{}", serde_json::to_string_pretty(&pipeline).unwrap_or_default() ) }] }), ) } Err(e) => { JsonRpcResponse::error(None, -32603, format!("invalid upstream response: {e}")) } } } Err(e) => JsonRpcResponse::error(None, -32603, format!("failed to reach {mcp_url}: {e}")), } } /// Aggregate health checks across all registered projects. async fn handle_gateway_health(state: &GatewayState) -> JsonRpcResponse { let mut results = BTreeMap::new(); let project_entries: Vec<(String, String)> = state .projects .read() .await .iter() .map(|(n, e)| (n.clone(), e.url.clone())) .collect(); for (name, url) in &project_entries { let health_url = format!("{}/health", url.trim_end_matches('/')); let status = match state.client.get(&health_url).send().await { Ok(resp) => { if resp.status().is_success() { "healthy".to_string() } else { format!("unhealthy (HTTP {})", resp.status().as_u16()) } } Err(e) => format!("unreachable: {e}"), }; results.insert(name.clone(), status); } let active = state.active_project.read().await.clone(); JsonRpcResponse::success( None, json!({ "content": [{ "type": "text", "text": format!( "Health check (active: '{active}'):\n{}", results.iter() .map(|(name, status)| format!(" {name}: {status}")) .collect::>() .join("\n") ) }] }), ) } // ── Agent join handlers ─────────────────────────────────────────────── /// `GET /gateway/mode` — returns `{"mode":"gateway"}` so clients can detect gateway mode. #[handler] pub async fn gateway_mode_handler() -> Response { let body = json!({ "mode": "gateway" }); Response::builder() .status(StatusCode::OK) .header("Content-Type", "application/json") .body(Body::from(serde_json::to_vec(&body).unwrap_or_default())) } /// `POST /gateway/tokens` — generate a one-time join token for a build agent. /// /// Returns `{"token": ""}`. 
The token is valid until consumed by /// `POST /gateway/register` or the process restarts. #[handler] pub async fn gateway_generate_token_handler(state: Data<&Arc>) -> Response { let token = Uuid::new_v4().to_string(); let now = chrono::Utc::now().timestamp() as f64; state .pending_tokens .write() .await .insert(token.clone(), PendingToken { created_at: now }); crate::slog!("[gateway] Generated join token {:.8}…", &token); let body = json!({ "token": token }); Response::builder() .status(StatusCode::OK) .header("Content-Type", "application/json") .body(Body::from(serde_json::to_vec(&body).unwrap_or_default())) } /// `POST /gateway/register` — build agent presents its join token and registers. /// /// Expects JSON body: `{ "token": "...", "label": "...", "address": "..." }`. /// On success returns the `JoinedAgent` record. The token is consumed immediately. #[handler] pub async fn gateway_register_agent_handler( body: Body, state: Data<&Arc>, ) -> Response { let bytes = match body.into_bytes().await { Ok(b) => b, Err(_) => { return Response::builder() .status(StatusCode::BAD_REQUEST) .body(Body::from("could not read request body")); } }; let req: RegisterAgentRequest = match serde_json::from_slice(&bytes) { Ok(r) => r, Err(_) => { return Response::builder() .status(StatusCode::BAD_REQUEST) .body(Body::from("invalid JSON body")); } }; // Validate and consume the token. 
let mut tokens = state.pending_tokens.write().await; if !tokens.contains_key(&req.token) { return Response::builder() .status(StatusCode::UNAUTHORIZED) .body(Body::from("invalid or already-used join token")); } tokens.remove(&req.token); drop(tokens); let now = chrono::Utc::now().timestamp() as f64; let agent = JoinedAgent { id: Uuid::new_v4().to_string(), label: req.label, address: req.address, registered_at: now, last_seen: now, assigned_project: None, }; crate::slog!( "[gateway] Agent '{}' registered (id={})", agent.label, agent.id ); { let mut agents = state.joined_agents.write().await; agents.push(agent.clone()); save_agents(&agents, &state.config_dir).await; } let body = serde_json::to_vec(&agent).unwrap_or_default(); Response::builder() .status(StatusCode::OK) .header("Content-Type", "application/json") .body(Body::from(body)) } /// `GET /gateway/agents` — list all registered build agents. #[handler] pub async fn gateway_list_agents_handler(state: Data<&Arc>) -> Response { let agents = state.joined_agents.read().await.clone(); let body = serde_json::to_vec(&agents).unwrap_or_default(); Response::builder() .status(StatusCode::OK) .header("Content-Type", "application/json") .body(Body::from(body)) } /// `DELETE /gateway/agents/:id` — remove a registered build agent. #[handler] pub async fn gateway_remove_agent_handler( PoemPath(id): PoemPath, state: Data<&Arc>, ) -> Response { let removed = { let mut agents = state.joined_agents.write().await; let before = agents.len(); agents.retain(|a| a.id != id); let removed = agents.len() < before; if removed { save_agents(&agents, &state.config_dir).await; } removed }; if removed { crate::slog!("[gateway] Removed agent id={id}"); Response::builder() .status(StatusCode::NO_CONTENT) .body(Body::empty()) } else { Response::builder() .status(StatusCode::NOT_FOUND) .body(Body::from("agent not found")) } } /// `POST /gateway/agents/:id/assign` — assign or unassign an agent to a project. 
///
/// Body: `{ "project": "my-project" }` to assign, or `{ "project": null }` to unassign.
/// An empty project string is treated as an unassign.
/// Returns the updated `JoinedAgent` on success. The assignment is persisted to disk
/// so it survives gateway restarts.
///
/// Responds 400 for an unknown project and 404 for an unknown agent id.
#[handler]
pub async fn gateway_assign_agent_handler(
    PoemPath(id): PoemPath<String>,
    body: Json<AssignAgentRequest>,
    state: Data<&Arc<GatewayState>>,
) -> Response {
    let project = body.0.project.filter(|p| !p.is_empty());

    if let Some(ref p) = project
        && !state.projects.read().await.contains_key(p.as_str())
    {
        return Response::builder()
            .status(StatusCode::BAD_REQUEST)
            .body(Body::from(format!("unknown project '{p}'")));
    }

    let updated = {
        let mut agents = state.joined_agents.write().await;
        let updated = agents.iter_mut().find(|a| a.id == id).map(|a| {
            a.assigned_project = project;
            a.clone()
        });
        // Persist while still holding the write lock so the file reflects
        // exactly this mutation and cannot interleave with a concurrent one.
        if updated.is_some() {
            save_agents(&agents, &state.config_dir).await;
        }
        updated
    };

    match updated {
        None => Response::builder()
            .status(StatusCode::NOT_FOUND)
            .body(Body::from("agent not found")),
        Some(agent) => {
            crate::slog!(
                "[gateway] Agent '{}' (id={}) assigned to {:?}",
                agent.label,
                agent.id,
                agent.assigned_project
            );
            let body = serde_json::to_vec(&agent).unwrap_or_default();
            Response::builder()
                .status(StatusCode::OK)
                .header("Content-Type", "application/json")
                .body(Body::from(body))
        }
    }
}

/// `POST /gateway/agents/:id/heartbeat` — update an agent's last-seen timestamp.
///
/// Build agents should call this periodically (e.g. every 30 s) so the gateway
/// can distinguish live agents from disconnected ones. Returns 204 No Content on
/// success or 404 if the agent ID is not found.
#[handler] pub async fn gateway_heartbeat_handler( PoemPath(id): PoemPath, state: Data<&Arc>, ) -> Response { let found = { let mut agents = state.joined_agents.write().await; match agents.iter_mut().find(|a| a.id == id) { None => false, Some(a) => { a.last_seen = chrono::Utc::now().timestamp() as f64; true } } }; if found { Response::builder() .status(StatusCode::NO_CONTENT) .body(Body::empty()) } else { Response::builder() .status(StatusCode::NOT_FOUND) .body(Body::from("agent not found")) } } // ── Health aggregation endpoint ────────────────────────────────────── /// HTTP GET `/health` handler for the gateway — aggregates health from all projects. #[handler] pub async fn gateway_health_handler(state: Data<&Arc>) -> Response { let mut all_healthy = true; let mut statuses = BTreeMap::new(); let project_entries: Vec<(String, String)> = state .projects .read() .await .iter() .map(|(n, e)| (n.clone(), e.url.clone())) .collect(); for (name, url) in &project_entries { let health_url = format!("{}/health", url.trim_end_matches('/')); let healthy = match state.client.get(&health_url).send().await { Ok(resp) => resp.status().is_success(), Err(_) => false, }; if !healthy { all_healthy = false; } statuses.insert(name.clone(), if healthy { "ok" } else { "error" }); } let body = json!({ "status": if all_healthy { "ok" } else { "degraded" }, "projects": statuses, }); let status = if all_healthy { StatusCode::OK } else { StatusCode::SERVICE_UNAVAILABLE }; Response::builder() .status(status) .header("Content-Type", "application/json") .body(Body::from(serde_json::to_vec(&body).unwrap_or_default())) } // ── Gateway Web UI ─────────────────────────────────────────────────── /// Self-contained HTML page for the gateway web UI. Fetches project list from /// `/api/gateway` and switches projects via `POST /api/gateway/switch`, which /// internally calls the `switch_project` MCP tool logic. const GATEWAY_UI_HTML: &str = r#" Huskies Gateway

Huskies Gateway

Multi-project orchestration
"#; /// Serve the gateway web UI HTML page at `GET /`. #[handler] pub async fn gateway_index_handler() -> Response { Response::builder() .status(StatusCode::OK) .header("Content-Type", "text/html; charset=utf-8") .body(Body::from(GATEWAY_UI_HTML)) } /// `GET /api/gateway` — returns the list of registered projects and the active project. #[handler] pub async fn gateway_api_handler(state: Data<&Arc>) -> Response { let active = state.active_project.read().await.clone(); let projects: Vec = state .projects .read() .await .iter() .map(|(name, entry)| { json!({ "name": name, "url": entry.url, }) }) .collect(); let body = json!({ "active": active, "projects": projects }); Response::builder() .status(StatusCode::OK) .header("Content-Type", "application/json") .body(Body::from(serde_json::to_vec(&body).unwrap_or_default())) } /// Request body for `POST /api/gateway/switch`. #[derive(Deserialize)] struct SwitchRequest { project: String, } /// `POST /api/gateway/switch` — switch the active project by calling the /// `switch_project` MCP tool logic, then return `{"ok": true}` or `{"ok": false, "error": "..."}`. #[handler] pub async fn gateway_switch_handler( state: Data<&Arc>, body: Json, ) -> Response { let params = json!({ "arguments": { "project": body.project } }); let resp = handle_switch_project(¶ms, &state).await; let (ok, error) = if resp.result.is_some() { (true, None) } else { let msg = resp .error .as_ref() .map(|e| e.message.clone()) .unwrap_or_else(|| "unknown error".to_string()); (false, Some(msg)) }; let body_val = if ok { json!({ "ok": true }) } else { json!({ "ok": false, "error": error }) }; let status = if ok { StatusCode::OK } else { StatusCode::BAD_REQUEST }; Response::builder() .status(status) .header("Content-Type", "application/json") .body(Body::from( serde_json::to_vec(&body_val).unwrap_or_default(), )) } // ── Project management API ─────────────────────────────────────────── /// Request body for adding a new project. 
#[derive(Deserialize)]
struct AddProjectRequest {
    name: String,
    url: String,
}

/// `POST /api/gateway/projects` — add a new project to the gateway config.
///
/// Expects JSON `{ "name": "...", "url": "..." }`. Both values are trimmed;
/// an empty name or URL yields 400. Returns the created project
/// or 409 Conflict if a project with the same name already exists.
#[handler]
pub async fn gateway_add_project_handler(
    state: Data<&Arc<GatewayState>>,
    body: Json<AddProjectRequest>,
) -> Response {
    let name = body.0.name.trim().to_string();
    let url = body.0.url.trim().to_string();

    if name.is_empty() {
        return Response::builder()
            .status(StatusCode::BAD_REQUEST)
            .body(Body::from("project name must not be empty"));
    }
    if url.is_empty() {
        return Response::builder()
            .status(StatusCode::BAD_REQUEST)
            .body(Body::from("project url must not be empty"));
    }

    // Take the snapshot to persist while still holding the write lock, so it
    // reflects exactly the state produced by this insertion.
    let snapshot = {
        let mut projects = state.projects.write().await;
        if projects.contains_key(&name) {
            return Response::builder()
                .status(StatusCode::CONFLICT)
                .body(Body::from(format!("project '{name}' already exists")));
        }
        projects.insert(name.clone(), ProjectEntry { url: url.clone() });
        projects.clone()
    };
    save_config(&snapshot, &state.config_dir).await;

    crate::slog!("[gateway] Added project '{name}' ({url})");
    let body_val = json!({ "name": name, "url": url });
    Response::builder()
        .status(StatusCode::OK)
        .header("Content-Type", "application/json")
        .body(Body::from(
            serde_json::to_vec(&body_val).unwrap_or_default(),
        ))
}

/// `DELETE /api/gateway/projects/:name` — remove a project from the gateway config.
///
/// Returns 204 No Content on success. Returns 400 if this is the last project
/// (the gateway requires at least one project to remain configured).
#[handler] pub async fn gateway_remove_project_handler( PoemPath(name): PoemPath, state: Data<&Arc>, ) -> Response { let active = state.active_project.read().await.clone(); { let mut projects = state.projects.write().await; if !projects.contains_key(&name) { return Response::builder() .status(StatusCode::NOT_FOUND) .body(Body::from(format!("project '{name}' not found"))); } if projects.len() == 1 { return Response::builder() .status(StatusCode::BAD_REQUEST) .body(Body::from("cannot remove the last project")); } projects.remove(&name); } let snapshot = state.projects.read().await.clone(); save_config(&snapshot, &state.config_dir).await; // If the removed project was active, switch to the first remaining. if active == name { let first = state.projects.read().await.keys().next().cloned(); if let Some(new_active) = first { *state.active_project.write().await = new_active; } } crate::slog!("[gateway] Removed project '{name}'"); Response::builder() .status(StatusCode::NO_CONTENT) .body(Body::empty()) } // ── Bot configuration API ──────────────────────────────────────────── /// Request/response body for the bot configuration API. #[derive(Deserialize, Serialize, Default)] struct BotConfigPayload { /// Chat transport: `"matrix"` or `"slack"`. transport: String, // Matrix fields homeserver: Option, username: Option, password: Option, // Slack fields slack_bot_token: Option, slack_signing_secret: Option, } /// Read the current raw bot.toml (without validation) as key/value pairs for /// the configuration UI. Returns an empty payload if the file does not exist. 
///
/// An unparsable file also yields the empty (default) payload. When the file
/// parses but has no string `transport` key, `transport` is `"matrix"`.
fn read_bot_config_raw(config_dir: &Path) -> BotConfigPayload {
    let path = config_dir.join(".huskies").join("bot.toml");
    let Ok(content) = std::fs::read_to_string(&path) else {
        return BotConfigPayload::default();
    };
    let Ok(table) = toml::from_str::<toml::Value>(&content) else {
        return BotConfigPayload::default();
    };

    // Top-level string value for `key`, if present.
    let field = |key: &str| -> Option<String> {
        table
            .get(key)
            .and_then(|v| v.as_str())
            .map(|s| s.to_string())
    };

    BotConfigPayload {
        transport: field("transport").unwrap_or_else(|| "matrix".to_string()),
        homeserver: field("homeserver"),
        username: field("username"),
        password: field("password"),
        slack_bot_token: field("slack_bot_token"),
        slack_signing_secret: field("slack_signing_secret"),
    }
}

/// Write a `bot.toml` from the given payload.
///
/// Creates `<config_dir>/.huskies/` if needed. A `transport` of `"slack"`
/// writes the Slack layout; any other value writes the Matrix layout. Missing
/// optional fields are written as empty strings.
///
/// # Errors
///
/// Returns a message when the directory cannot be created or the file cannot
/// be written.
fn write_bot_config(config_dir: &Path, payload: &BotConfigPayload) -> Result<(), String> {
    let huskies_dir = config_dir.join(".huskies");
    std::fs::create_dir_all(&huskies_dir)
        .map_err(|e| format!("cannot create .huskies dir: {e}"))?;
    let path = huskies_dir.join("bot.toml");

    let content = match payload.transport.as_str() {
        "slack" => format!(
            "enabled = true\ntransport = \"slack\"\n\nslack_bot_token = {}\nslack_signing_secret = {}\nslack_channel_ids = []\n",
            toml_string(payload.slack_bot_token.as_deref().unwrap_or("")),
            toml_string(payload.slack_signing_secret.as_deref().unwrap_or("")),
        ),
        // Default to matrix
        _ => format!(
            "enabled = true\ntransport = \"matrix\"\n\nhomeserver = {}\nusername = {}\npassword = {}\nroom_ids = []\nallowed_users = []\n",
            toml_string(payload.homeserver.as_deref().unwrap_or("")),
            toml_string(payload.username.as_deref().unwrap_or("")),
            toml_string(payload.password.as_deref().unwrap_or("")),
        ),
    };

    std::fs::write(&path, content).map_err(|e| format!("cannot write bot.toml: {e}"))
}

/// Escape a string as a TOML quoted string.
fn toml_string(s: &str) -> String {
    // TOML basic strings must escape `"`, `\`, and every control character
    // except tab. Escaping only the first two would let a value containing a
    // newline produce a `bot.toml` that no longer parses.
    let mut quoted = String::with_capacity(s.len() + 2);
    quoted.push('"');
    for ch in s.chars() {
        match ch {
            '\\' => quoted.push_str("\\\\"),
            '"' => quoted.push_str("\\\""),
            '\u{0008}' => quoted.push_str("\\b"),
            '\n' => quoted.push_str("\\n"),
            '\u{000C}' => quoted.push_str("\\f"),
            '\r' => quoted.push_str("\\r"),
            '\u{0000}'..='\u{0007}' | '\u{000B}' | '\u{000E}'..='\u{001F}' | '\u{007F}' => {
                quoted.push_str(&format!("\\u{:04X}", ch as u32));
            }
            other => quoted.push(other),
        }
    }
    quoted.push('"');
    quoted
}

/// Ask one project container for its pipeline status.
///
/// Posts a JSON-RPC `tools/call` for `get_pipeline_status` to `<base_url>/mcp`
/// and decodes the JSON document embedded in `result.content[0].text`. Never
/// fails: every problem is reported as a `{ "error": "..." }` value.
async fn fetch_pipeline_status(client: &Client, base_url: &str) -> Value {
    let mcp_url = format!("{}/mcp", base_url.trim_end_matches('/'));
    let rpc_body = json!({
        "jsonrpc": "2.0",
        "id": 1,
        "method": "tools/call",
        "params": {
            "name": "get_pipeline_status",
            "arguments": {}
        }
    });

    let resp = match client.post(&mcp_url).json(&rpc_body).send().await {
        Ok(resp) => resp,
        Err(e) => return json!({ "error": format!("unreachable: {e}") }),
    };
    let upstream = match resp.json::<Value>().await {
        Ok(upstream) => upstream,
        Err(e) => return json!({ "error": format!("invalid response: {e}") }),
    };

    // The tool result is a JSON string embedded in content[0].text.
    let embedded = upstream
        .get("result")
        .and_then(|r| r.get("content"))
        .and_then(|c| c.get(0))
        .and_then(|c| c.get("text"))
        .and_then(|t| t.as_str());

    match embedded {
        Some(text) => serde_json::from_str(text)
            .unwrap_or_else(|_| json!({ "error": "invalid pipeline json" })),
        None => json!({ "error": "unexpected response shape" }),
    }
}

/// `GET /api/gateway/pipeline` — fetch pipeline status from all registered projects.
///
/// Returns `{ "active": "<name>", "projects": { "<name>": { "active": [...], "backlog": [...], "backlog_count": N } | { "error": "..." } } }`.
#[handler]
pub async fn gateway_all_pipeline_handler(state: Data<&Arc<GatewayState>>) -> Response {
    // Copy names and URLs out first so the projects lock is not held while
    // the upstream requests are in flight.
    let project_entries: Vec<(String, String)> = state
        .projects
        .read()
        .await
        .iter()
        .map(|(n, e)| (n.clone(), e.url.clone()))
        .collect();

    // Projects are queried one after another, in name order.
    let mut results: BTreeMap<String, Value> = BTreeMap::new();
    for (name, url) in project_entries {
        let status = fetch_pipeline_status(&state.client, &url).await;
        results.insert(name, status);
    }

    let active = state.active_project.read().await.clone();
    let body = json!({ "active": active, "projects": results });
    Response::builder()
        .status(StatusCode::OK)
        .header("Content-Type", "application/json")
        .body(Body::from(serde_json::to_vec(&body).unwrap_or_default()))
}

/// `GET /api/gateway/bot-config` — return current bot.toml fields as JSON.
#[handler] pub async fn gateway_bot_config_get_handler(state: Data<&Arc>) -> Response { let payload = read_bot_config_raw(&state.config_dir); Response::builder() .status(StatusCode::OK) .header("Content-Type", "application/json") .body(Body::from(serde_json::to_vec(&payload).unwrap_or_default())) } /// `POST /api/gateway/bot-config` — write new bot.toml and restart the bot. #[handler] pub async fn gateway_bot_config_save_handler( state: Data<&Arc>, body: Json, ) -> Response { if let Err(e) = write_bot_config(&state.config_dir, &body) { let err = json!({ "ok": false, "error": e }); return Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) .header("Content-Type", "application/json") .body(Body::from(serde_json::to_vec(&err).unwrap_or_default())); } // Abort the existing bot task (if any) and spawn a fresh one with the new config. { let mut handle = state.bot_handle.lock().await; if let Some(h) = handle.take() { h.abort(); } let gateway_projects: Vec = state.projects.read().await.keys().cloned().collect(); let new_handle = spawn_gateway_bot( &state.config_dir, Arc::clone(&state.active_project), gateway_projects, state.port, ); *handle = new_handle; } crate::slog!("[gateway] Bot configuration saved; bot restarted"); let ok = json!({ "ok": true }); Response::builder() .status(StatusCode::OK) .header("Content-Type", "application/json") .body(Body::from(serde_json::to_vec(&ok).unwrap_or_default())) } /// Self-contained HTML page for bot configuration. const GATEWAY_BOT_CONFIG_HTML: &str = r#" Bot Configuration — Huskies Gateway
← Gateway

Bot Configuration


"#; /// Serve the bot configuration HTML page at `GET /bot-config`. #[handler] pub async fn gateway_bot_config_page_handler() -> Response { Response::builder() .status(StatusCode::OK) .header("Content-Type", "text/html; charset=utf-8") .body(Body::from(GATEWAY_BOT_CONFIG_HTML)) } // ── Gateway server startup ─────────────────────────────────────────── /// Build the complete gateway route tree. /// /// Extracted from `run` so that tests can construct the full route tree and /// catch duplicate-route panics before they reach production. pub fn build_gateway_route(state_arc: Arc) -> impl poem::Endpoint { poem::Route::new() .at("/bot-config", poem::get(gateway_bot_config_page_handler)) .at("/api/gateway", poem::get(gateway_api_handler)) .at("/api/gateway/switch", poem::post(gateway_switch_handler)) .at( "/api/gateway/pipeline", poem::get(gateway_all_pipeline_handler), ) .at( "/api/gateway/projects", poem::post(gateway_add_project_handler), ) .at( "/api/gateway/projects/:name", poem::delete(gateway_remove_project_handler), ) .at( "/api/gateway/bot-config", poem::get(gateway_bot_config_get_handler).post(gateway_bot_config_save_handler), ) .at( "/mcp", poem::post(gateway_mcp_post_handler).get(gateway_mcp_get_handler), ) .at("/health", poem::get(gateway_health_handler)) // Agent join endpoints. .at("/gateway/mode", poem::get(gateway_mode_handler)) .at( "/gateway/tokens", poem::post(gateway_generate_token_handler), ) .at( "/gateway/register", poem::post(gateway_register_agent_handler), ) .at("/gateway/agents", poem::get(gateway_list_agents_handler)) .at( "/gateway/agents/:id", poem::delete(gateway_remove_agent_handler), ) .at( "/gateway/agents/:id/assign", poem::post(gateway_assign_agent_handler), ) .at( "/gateway/agents/:id/heartbeat", poem::post(gateway_heartbeat_handler), ) // Serve the embedded React frontend so the gateway has a UI. 
.at( "/assets/*path", poem::get(crate::http::assets::embedded_asset), ) .at("/*path", poem::get(crate::http::assets::embedded_file)) .at("/", poem::get(crate::http::assets::embedded_index)) .data(state_arc) } /// Start the gateway HTTP server. This is the entry point when `--gateway` is used. pub async fn run(config_path: &Path, port: u16) -> Result<(), std::io::Error> { // Locate the gateway config directory (parent of `projects.toml`). let config_dir = config_path .parent() .unwrap_or(std::path::Path::new(".")) .to_path_buf(); let config = GatewayConfig::load(config_path).map_err(std::io::Error::other)?; let state = GatewayState::new(config, config_dir.clone(), port).map_err(std::io::Error::other)?; let state_arc = Arc::new(state); let active = state_arc.active_project.read().await.clone(); crate::slog!("[gateway] Starting gateway on port {port}, active project: {active}"); crate::slog!( "[gateway] Registered projects: {}", state_arc .projects .read() .await .keys() .cloned() .collect::>() .join(", ") ); // Write `.mcp.json` so that the gateway's Matrix bot's Claude Code CLI // connects to this gateway's MCP endpoint (which proxies to the active project). if let Err(e) = write_gateway_mcp_json(&config_dir, port) { crate::slog!("[gateway] Warning: could not write .mcp.json: {e}"); } // Spawn the Matrix bot if `.huskies/bot.toml` exists in the config directory. 
let gateway_projects: Vec = state_arc.projects.read().await.keys().cloned().collect(); let bot_abort = spawn_gateway_bot( &config_dir, Arc::clone(&state_arc.active_project), gateway_projects, port, ); *state_arc.bot_handle.lock().await = bot_abort; let route = build_gateway_route(state_arc); let host = std::env::var("HUSKIES_HOST").unwrap_or_else(|_| "127.0.0.1".to_string()); let addr = format!("{host}:{port}"); crate::slog!("[gateway] Listening on {addr}"); poem::Server::new(poem::listener::TcpListener::bind(&addr)) .run(route) .await } // ── Matrix bot integration ─────────────────────────────────────────── /// Write (or overwrite) a `.mcp.json` in `config_dir` that points Claude Code /// CLI at the gateway's own `/mcp` endpoint. This lets the gateway's Matrix /// bot use gateway-proxied tool calls instead of a project-specific server. fn write_gateway_mcp_json(config_dir: &Path, port: u16) -> Result<(), std::io::Error> { let host = std::env::var("HUSKIES_HOST").unwrap_or_else(|_| "127.0.0.1".to_string()); let url = format!("http://{host}:{port}/mcp"); let content = serde_json::json!({ "mcpServers": { "huskies": { "type": "http", "url": url } } }); let path = config_dir.join(".mcp.json"); std::fs::write(&path, serde_json::to_string_pretty(&content).unwrap())?; crate::slog!("[gateway] Wrote {} pointing to {}", path.display(), url); Ok(()) } /// Attempt to spawn the Matrix bot against the gateway config directory. /// /// Reads `/.huskies/bot.toml`. If absent or disabled the function /// returns immediately without spawning anything. When the bot is enabled it /// receives a shared reference to the gateway's active-project `RwLock` so the /// `switch` command can change the active project without going through HTTP. /// /// Returns an [`tokio::task::AbortHandle`] if the bot task was spawned, `None` otherwise. 
fn spawn_gateway_bot( config_dir: &Path, active_project: ActiveProject, gateway_projects: Vec, port: u16, ) -> Option { use crate::agents::AgentPool; use tokio::sync::{broadcast, mpsc}; // Create a watcher broadcast channel (no file-system watcher in gateway mode). let (watcher_tx, _) = broadcast::channel(16); // Create a dummy permission channel — permission prompts are not forwarded // across the proxy boundary in this initial implementation. let (_perm_tx, perm_rx) = mpsc::unbounded_channel(); let perm_rx = Arc::new(tokio::sync::Mutex::new(perm_rx)); // Create a shutdown watch channel. Gateway process exit signals Ctrl-C // via OS signal, not through a watch channel, so we leave this at None // (no shutdown announcement). The sender is kept alive for the duration. let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel::>(None); // Keep sender alive so the receiver is never prematurely closed. std::mem::forget(shutdown_tx); let agents = Arc::new(AgentPool::new(port, watcher_tx.clone())); crate::chat::transport::matrix::spawn_bot( config_dir, watcher_tx, perm_rx, agents, shutdown_rx, Some(active_project), gateway_projects, ) } // ── Tests ──────────────────────────────────────────────────────────── #[cfg(test)] mod tests { use super::*; #[test] fn parse_valid_projects_toml() { let toml_str = r#" [projects.huskies] url = "http://localhost:3001" [projects.robot-studio] url = "http://localhost:3002" "#; let config: GatewayConfig = toml::from_str(toml_str).unwrap(); assert_eq!(config.projects.len(), 2); assert_eq!(config.projects["huskies"].url, "http://localhost:3001"); assert_eq!(config.projects["robot-studio"].url, "http://localhost:3002"); } #[test] fn parse_empty_projects_toml() { let toml_str = "[projects]\n"; let config: GatewayConfig = toml::from_str(toml_str).unwrap(); assert!(config.projects.is_empty()); } #[test] fn gateway_state_rejects_empty_config() { let config = GatewayConfig { projects: BTreeMap::new(), }; assert!(GatewayState::new(config, 
PathBuf::from("."), 3000).is_err()); } #[test] fn gateway_state_sets_first_project_active() { let mut projects = BTreeMap::new(); projects.insert( "alpha".into(), ProjectEntry { url: "http://a:3001".into(), }, ); projects.insert( "beta".into(), ProjectEntry { url: "http://b:3002".into(), }, ); let config = GatewayConfig { projects }; let state = GatewayState::new(config, PathBuf::from("."), 3000).unwrap(); let active = state.active_project.blocking_read().clone(); assert_eq!(active, "alpha"); // BTreeMap sorts alphabetically. } #[test] fn gateway_tool_definitions_has_expected_tools() { let defs = gateway_tool_definitions(); let names: Vec<&str> = defs .iter() .filter_map(|d| d.get("name").and_then(|n| n.as_str())) .collect(); assert!(names.contains(&"switch_project")); assert!(names.contains(&"gateway_status")); assert!(names.contains(&"gateway_health")); } #[tokio::test] async fn switch_project_to_known_project() { let mut projects = BTreeMap::new(); projects.insert( "alpha".into(), ProjectEntry { url: "http://a:3001".into(), }, ); projects.insert( "beta".into(), ProjectEntry { url: "http://b:3002".into(), }, ); let config = GatewayConfig { projects }; let state = GatewayState::new(config, PathBuf::from("."), 3000).unwrap(); let params = json!({ "arguments": { "project": "beta" } }); let resp = handle_switch_project(¶ms, &state).await; assert!(resp.result.is_some()); let active = state.active_project.read().await.clone(); assert_eq!(active, "beta"); } #[tokio::test] async fn switch_project_to_unknown_project_fails() { let mut projects = BTreeMap::new(); projects.insert( "alpha".into(), ProjectEntry { url: "http://a:3001".into(), }, ); let config = GatewayConfig { projects }; let state = GatewayState::new(config, PathBuf::from("."), 3000).unwrap(); let params = json!({ "arguments": { "project": "nonexistent" } }); let resp = handle_switch_project(¶ms, &state).await; assert!(resp.error.is_some()); } #[tokio::test] async fn active_url_returns_correct_url() { let mut 
projects = BTreeMap::new(); projects.insert( "myproj".into(), ProjectEntry { url: "http://my:3001".into(), }, ); let config = GatewayConfig { projects }; let state = GatewayState::new(config, PathBuf::from("."), 3000).unwrap(); let url = state.active_url().await.unwrap(); assert_eq!(url, "http://my:3001"); } #[test] fn json_rpc_response_success_serializes() { let resp = JsonRpcResponse::success(Some(json!(1)), json!({"ok": true})); let s = serde_json::to_string(&resp).unwrap(); assert!(s.contains("\"result\"")); assert!(!s.contains("\"error\"")); } #[test] fn json_rpc_response_error_serializes() { let resp = JsonRpcResponse::error(Some(json!(1)), -32600, "bad".into()); let s = serde_json::to_string(&resp).unwrap(); assert!(s.contains("\"error\"")); assert!(!s.contains("\"result\"")); } #[test] fn load_config_from_file() { let dir = tempfile::tempdir().unwrap(); let path = dir.path().join("projects.toml"); std::fs::write( &path, r#" [projects.test] url = "http://localhost:9999" "#, ) .unwrap(); let config = GatewayConfig::load(&path).unwrap(); assert_eq!(config.projects.len(), 1); assert_eq!(config.projects["test"].url, "http://localhost:9999"); } #[test] fn load_config_missing_file_fails() { let result = GatewayConfig::load(Path::new("/nonexistent/projects.toml")); assert!(result.is_err()); } // ── bot.toml in gateway and standalone modes ───────────────────────── // // Both gateway and standalone modes load bot.toml via `BotConfig::load(dir)` // which looks for `dir/.huskies/bot.toml`. These tests document that the // same loading convention works from a gateway config directory. 
#[test] fn bot_config_loads_from_gateway_config_dir() { use crate::chat::transport::matrix::BotConfig; let tmp = tempfile::tempdir().unwrap(); let huskies_dir = tmp.path().join(".huskies"); std::fs::create_dir_all(&huskies_dir).unwrap(); std::fs::write( huskies_dir.join("bot.toml"), r#" homeserver = "https://matrix.example.com" username = "@bot:example.com" password = "secret" room_ids = ["!abc:example.com"] enabled = true "#, ) .unwrap(); // Gateway passes config_dir (parent of projects.toml) to spawn_bot, // which calls BotConfig::load(config_dir). Verify this resolves correctly. let config = BotConfig::load(tmp.path()); assert!( config.is_some(), "bot.toml should load from gateway config dir" ); let config = config.unwrap(); assert_eq!( config.homeserver.as_deref(), Some("https://matrix.example.com") ); } #[test] fn bot_config_absent_returns_none_in_gateway_mode() { use crate::chat::transport::matrix::BotConfig; // A gateway config directory without a .huskies/bot.toml should yield None, // allowing the gateway to start without a Matrix bot. 
let tmp = tempfile::tempdir().unwrap(); let config = BotConfig::load(tmp.path()); assert!( config.is_none(), "absent bot.toml must return None in gateway mode" ); } #[test] fn bot_config_disabled_returns_none_in_gateway_mode() { use crate::chat::transport::matrix::BotConfig; let tmp = tempfile::tempdir().unwrap(); let huskies_dir = tmp.path().join(".huskies"); std::fs::create_dir_all(&huskies_dir).unwrap(); std::fs::write( huskies_dir.join("bot.toml"), r#" homeserver = "https://matrix.example.com" username = "@bot:example.com" password = "secret" room_ids = ["!abc:example.com"] enabled = false "#, ) .unwrap(); let config = BotConfig::load(tmp.path()); assert!( config.is_none(), "disabled bot.toml must return None in gateway mode" ); } // ── Agent join mechanism tests ─────────────────────────────────────── fn make_test_state() -> Arc { let mut projects = BTreeMap::new(); projects.insert( "test".into(), ProjectEntry { url: "http://test:3001".into(), }, ); let config = GatewayConfig { projects }; Arc::new(GatewayState::new(config, PathBuf::new(), 3000).unwrap()) } #[tokio::test] async fn generate_token_creates_pending_token() { let state = make_test_state(); let app = poem::Route::new() .at( "/gateway/tokens", poem::post(gateway_generate_token_handler), ) .data(state.clone()); let cli = poem::test::TestClient::new(app); let resp = cli.post("/gateway/tokens").send().await; assert_eq!(resp.0.status(), StatusCode::OK); let body: Value = resp.0.into_body().into_json().await.unwrap(); let token = body["token"].as_str().unwrap(); assert!(!token.is_empty()); let tokens = state.pending_tokens.read().await; assert!(tokens.contains_key(token)); } #[tokio::test] async fn register_agent_consumes_token() { let state = make_test_state(); // Insert a token manually. 
let token = "test-token-123".to_string(); state.pending_tokens.write().await.insert( token.clone(), PendingToken { created_at: chrono::Utc::now().timestamp() as f64, }, ); let app = poem::Route::new() .at( "/gateway/register", poem::post(gateway_register_agent_handler), ) .data(state.clone()); let cli = poem::test::TestClient::new(app); let resp = cli .post("/gateway/register") .header("Content-Type", "application/json") .body( json!({ "token": token, "label": "test-agent", "address": "ws://localhost:3001/crdt-sync" }) .to_string(), ) .send() .await; assert_eq!(resp.0.status(), StatusCode::OK); // Token consumed. assert!(state.pending_tokens.read().await.is_empty()); // Agent registered. let agents = state.joined_agents.read().await; assert_eq!(agents.len(), 1); assert_eq!(agents[0].label, "test-agent"); } #[tokio::test] async fn register_agent_rejects_invalid_token() { let state = make_test_state(); let app = poem::Route::new() .at( "/gateway/register", poem::post(gateway_register_agent_handler), ) .data(state.clone()); let cli = poem::test::TestClient::new(app); let resp = cli .post("/gateway/register") .header("Content-Type", "application/json") .body( json!({ "token": "bad-token", "label": "agent", "address": "ws://localhost:3001/crdt-sync" }) .to_string(), ) .send() .await; assert_eq!(resp.0.status(), StatusCode::UNAUTHORIZED); assert!(state.joined_agents.read().await.is_empty()); } #[tokio::test] async fn list_agents_returns_registered_agents() { let state = make_test_state(); state.joined_agents.write().await.push(JoinedAgent { id: "id-1".into(), label: "agent-1".into(), address: "ws://a:3001/crdt-sync".into(), registered_at: 0.0, last_seen: 0.0, assigned_project: None, }); let app = poem::Route::new() .at("/gateway/agents", poem::get(gateway_list_agents_handler)) .data(state.clone()); let cli = poem::test::TestClient::new(app); let resp = cli.get("/gateway/agents").send().await; assert_eq!(resp.0.status(), StatusCode::OK); let agents: Vec = 
resp.0.into_body().into_json().await.unwrap(); assert_eq!(agents.len(), 1); assert_eq!(agents[0]["label"], "agent-1"); } #[tokio::test] async fn remove_agent_deletes_by_id() { let state = make_test_state(); state.joined_agents.write().await.push(JoinedAgent { id: "del-id".into(), label: "to-delete".into(), address: "ws://x:3001/crdt-sync".into(), registered_at: 0.0, last_seen: 0.0, assigned_project: None, }); let app = poem::Route::new() .at( "/gateway/agents/:id", poem::delete(gateway_remove_agent_handler), ) .data(state.clone()); let cli = poem::test::TestClient::new(app); let resp = cli.delete("/gateway/agents/del-id").send().await; assert_eq!(resp.0.status(), StatusCode::NO_CONTENT); assert!(state.joined_agents.read().await.is_empty()); } #[tokio::test] async fn remove_agent_unknown_id_returns_not_found() { let state = make_test_state(); let app = poem::Route::new() .at( "/gateway/agents/:id", poem::delete(gateway_remove_agent_handler), ) .data(state.clone()); let cli = poem::test::TestClient::new(app); let resp = cli.delete("/gateway/agents/no-such-id").send().await; assert_eq!(resp.0.status(), StatusCode::NOT_FOUND); } #[tokio::test] async fn heartbeat_updates_last_seen() { let state = make_test_state(); state.joined_agents.write().await.push(JoinedAgent { id: "hb-id".into(), label: "hb-agent".into(), address: "ws://hb:3001/crdt-sync".into(), registered_at: 0.0, last_seen: 0.0, assigned_project: None, }); let app = poem::Route::new() .at( "/gateway/agents/:id/heartbeat", poem::post(gateway_heartbeat_handler), ) .data(state.clone()); let cli = poem::test::TestClient::new(app); let resp = cli.post("/gateway/agents/hb-id/heartbeat").send().await; assert_eq!(resp.0.status(), StatusCode::NO_CONTENT); let agents = state.joined_agents.read().await; assert!(agents[0].last_seen > 0.0); } #[tokio::test] async fn heartbeat_unknown_id_returns_not_found() { let state = make_test_state(); let app = poem::Route::new() .at( "/gateway/agents/:id/heartbeat", 
poem::post(gateway_heartbeat_handler), ) .data(state.clone()); let cli = poem::test::TestClient::new(app); let resp = cli .post("/gateway/agents/no-such-id/heartbeat") .send() .await; assert_eq!(resp.0.status(), StatusCode::NOT_FOUND); } /// Build the full gateway route tree and verify it does not panic. /// /// Poem panics at construction time when duplicate routes are registered. /// This test catches any regression where a duplicate route is re-introduced /// (e.g. the `/` vs `/*path` duplicate fixed in commit 0969fb5d). #[test] fn gateway_route_tree_builds_without_panic() { let state = make_test_state(); // build_gateway_route will panic if any route is registered more than once. let _route = build_gateway_route(state); } }