huskies: merge 782

This commit is contained in:
dave
2026-04-28 10:56:09 +00:00
parent 01169332b3
commit 05d057a40a
6 changed files with 1267 additions and 1224 deletions
File diff suppressed because it is too large Load Diff
+64
View File
@@ -0,0 +1,64 @@
//! JSON-RPC 2.0 request/response types and helpers.
use poem::http::StatusCode;
use poem::{Body, Response};
use serde::{Deserialize, Serialize};
use serde_json::Value;
// ── JSON-RPC types ──────────────────────────────────────────────────────────
/// JSON-RPC request.
#[derive(Deserialize)]
pub(super) struct JsonRpcRequest {
pub(super) jsonrpc: String,
pub(super) id: Option<Value>,
pub(super) method: String,
#[serde(default)]
pub(super) params: Value,
}
/// JSON-RPC response.
#[derive(Serialize)]
pub(crate) struct JsonRpcResponse {
jsonrpc: &'static str,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) id: Option<Value>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) result: Option<Value>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) error: Option<JsonRpcError>,
}
#[derive(Debug, Serialize)]
pub(crate) struct JsonRpcError {
code: i64,
pub(crate) message: String,
}
impl JsonRpcResponse {
pub(crate) fn success(id: Option<Value>, result: Value) -> Self {
Self {
jsonrpc: "2.0",
id,
result: Some(result),
error: None,
}
}
pub(crate) fn error(id: Option<Value>, code: i64, message: String) -> Self {
Self {
jsonrpc: "2.0",
id,
result: None,
error: Some(JsonRpcError { code, message }),
}
}
}
pub(super) fn to_json_response(resp: JsonRpcResponse) -> Response {
let body = serde_json::to_vec(&resp).unwrap_or_default();
Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "application/json")
.body(Body::from(body))
}
+426
View File
@@ -0,0 +1,426 @@
//! MCP JSON-RPC POST/GET handlers and gateway tool dispatch.
use super::jsonrpc::{JsonRpcRequest, JsonRpcResponse, to_json_response};
use crate::service::gateway::{self, GatewayState};
use poem::handler;
use poem::http::StatusCode;
use poem::web::Data;
use poem::{Body, Request, Response};
use serde_json::{Value, json};
use std::collections::BTreeMap;
use std::sync::Arc;
// ── MCP tool definitions ─────────────────────────────────────────────────────
/// Gateway-specific MCP tools exposed alongside the proxied tools.
const GATEWAY_TOOLS: &[&str] = &[
"switch_project",
"gateway_status",
"gateway_health",
"init_project",
"aggregate_pipeline_status",
];
/// Gateway tool definitions.
pub(crate) fn gateway_tool_definitions() -> Vec<Value> {
vec![
json!({
"name": "switch_project",
"description": "Switch the active project. All subsequent MCP tool calls will be proxied to this project's container.",
"inputSchema": {
"type": "object",
"properties": {
"project": {
"type": "string",
"description": "Name of the project to switch to (must exist in projects.toml)"
}
},
"required": ["project"]
}
}),
json!({
"name": "gateway_status",
"description": "Show pipeline status for the active project by proxying the get_pipeline_status tool call.",
"inputSchema": {
"type": "object",
"properties": {}
}
}),
json!({
"name": "gateway_health",
"description": "Health check aggregation across all registered projects. Returns the health status of every project container.",
"inputSchema": {
"type": "object",
"properties": {}
}
}),
json!({
"name": "init_project",
"description": "Initialize a new huskies project at the given path by scaffolding .huskies/ and related files — the same as running `huskies init <path>`. Prefer this tool over asking the user to run the CLI. If `name` and `url` are supplied the project is also registered in projects.toml so switch_project can reach it immediately.",
"inputSchema": {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Absolute filesystem path to the project directory to initialise. The directory is created if it does not exist."
},
"name": {
"type": "string",
"description": "Optional: short name to register the project under in projects.toml (e.g. 'my-app'). Requires `url`."
},
"url": {
"type": "string",
"description": "Optional: base URL of the huskies container that will serve this project (e.g. 'http://my-app:3001'). Required when `name` is given."
}
},
"required": ["path"]
}
}),
json!({
"name": "aggregate_pipeline_status",
"description": "Fetch pipeline status from ALL registered projects in parallel and return an aggregated report. For each project: stage counts (backlog/current/qa/merge/done) and a list of blocked or failing items with triage detail. Unreachable projects are included with an error state rather than failing the whole call.",
"inputSchema": {
"type": "object",
"properties": {}
}
}),
]
}
// ── MCP POST handler ─────────────────────────────────────────────────────────
/// Main MCP POST handler for the gateway. Intercepts gateway-specific tools and
/// proxies everything else to the active project's container.
#[handler]
pub async fn gateway_mcp_post_handler(
req: &Request,
body: Body,
state: Data<&Arc<GatewayState>>,
) -> Response {
let content_type = req.header("content-type").unwrap_or("");
if !content_type.is_empty() && !content_type.contains("application/json") {
return to_json_response(JsonRpcResponse::error(
None,
-32700,
"Unsupported Content-Type; expected application/json".into(),
));
}
let bytes = match body.into_bytes().await {
Ok(b) => b,
Err(_) => {
return to_json_response(JsonRpcResponse::error(None, -32700, "Parse error".into()));
}
};
let rpc: JsonRpcRequest = match serde_json::from_slice(&bytes) {
Ok(r) => r,
Err(_) => {
return to_json_response(JsonRpcResponse::error(None, -32700, "Parse error".into()));
}
};
if rpc.jsonrpc != "2.0" {
return to_json_response(JsonRpcResponse::error(
rpc.id,
-32600,
"Invalid JSON-RPC version".into(),
));
}
if rpc.id.is_none() || rpc.id.as_ref() == Some(&Value::Null) {
if rpc.method.starts_with("notifications/") {
return Response::builder()
.status(StatusCode::ACCEPTED)
.body(Body::empty());
}
return to_json_response(JsonRpcResponse::error(None, -32600, "Missing id".into()));
}
match rpc.method.as_str() {
"initialize" => to_json_response(handle_initialize(rpc.id)),
"tools/list" => match handle_tools_list(&state, rpc.id.clone()).await {
Ok(resp) => to_json_response(resp),
Err(e) => to_json_response(JsonRpcResponse::error(rpc.id, -32603, e)),
},
"tools/call" => {
let tool_name = rpc
.params
.get("name")
.and_then(|v| v.as_str())
.unwrap_or("");
if GATEWAY_TOOLS.contains(&tool_name) {
to_json_response(
handle_gateway_tool(tool_name, &rpc.params, &state, rpc.id.clone()).await,
)
} else {
proxy_and_respond(&state, &bytes, rpc.id).await
}
}
_ => proxy_and_respond(&state, &bytes, rpc.id).await,
}
}
/// Proxy a request to the active project and format the response.
async fn proxy_and_respond(state: &GatewayState, bytes: &[u8], id: Option<Value>) -> Response {
let url = match state.active_url().await {
Ok(u) => u,
Err(e) => {
return to_json_response(JsonRpcResponse::error(id, -32603, e.to_string()));
}
};
match gateway::io::proxy_mcp_call(&state.client, &url, bytes).await {
Ok(resp_body) => Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "application/json")
.body(Body::from(resp_body)),
Err(e) => to_json_response(JsonRpcResponse::error(
id,
-32603,
format!("proxy error: {e}"),
)),
}
}
/// GET handler — method not allowed.
#[handler]
pub async fn gateway_mcp_get_handler() -> Response {
Response::builder()
.status(StatusCode::METHOD_NOT_ALLOWED)
.body(Body::empty())
}
// ── Protocol handlers ────────────────────────────────────────────────────────
fn handle_initialize(id: Option<Value>) -> JsonRpcResponse {
JsonRpcResponse::success(
id,
json!({
"protocolVersion": "2025-03-26",
"capabilities": { "tools": {} },
"serverInfo": {
"name": "huskies-gateway",
"version": "1.0.0"
}
}),
)
}
/// Fetch tools/list from the active project and merge in gateway tools.
async fn handle_tools_list(
state: &GatewayState,
id: Option<Value>,
) -> Result<JsonRpcResponse, String> {
let url = state.active_url().await.map_err(|e| e.to_string())?;
let resp_json = gateway::io::fetch_tools_list(&state.client, &url).await?;
let mut tools: Vec<Value> = resp_json
.get("result")
.and_then(|r| r.get("tools"))
.and_then(|t| t.as_array())
.cloned()
.unwrap_or_default();
let mut all_tools = gateway_tool_definitions();
all_tools.append(&mut tools);
Ok(JsonRpcResponse::success(id, json!({ "tools": all_tools })))
}
// ── Gateway tool dispatch ────────────────────────────────────────────────────
/// Dispatch a gateway-specific tool call.
async fn handle_gateway_tool(
tool_name: &str,
params: &Value,
state: &GatewayState,
id: Option<Value>,
) -> JsonRpcResponse {
match tool_name {
"switch_project" => handle_switch_project_tool(params, state, id).await,
"gateway_status" => handle_gateway_status_tool(state, id).await,
"gateway_health" => handle_gateway_health_tool(state, id).await,
"init_project" => handle_init_project_tool(params, state, id).await,
"aggregate_pipeline_status" => handle_aggregate_pipeline_status_tool(state, id).await,
_ => JsonRpcResponse::error(id, -32601, format!("Unknown gateway tool: {tool_name}")),
}
}
async fn handle_switch_project_tool(
params: &Value,
state: &GatewayState,
id: Option<Value>,
) -> JsonRpcResponse {
let project = params
.get("arguments")
.and_then(|a| a.get("project"))
.or_else(|| params.get("project"))
.and_then(|v| v.as_str())
.unwrap_or("");
match gateway::switch_project(state, project).await {
Ok(url) => JsonRpcResponse::success(
id,
json!({
"content": [{
"type": "text",
"text": format!("Switched to project '{project}' ({url})")
}]
}),
),
Err(e) => JsonRpcResponse::error(id, -32602, e.to_string()),
}
}
async fn handle_gateway_status_tool(state: &GatewayState, id: Option<Value>) -> JsonRpcResponse {
let active = state.active_project.read().await.clone();
let url = match state.active_url().await {
Ok(u) => u,
Err(e) => return JsonRpcResponse::error(id.clone(), -32603, e.to_string()),
};
match gateway::io::fetch_pipeline_status_for_project(&state.client, &url).await {
Ok(upstream) => {
let pipeline = upstream.get("result").cloned().unwrap_or(json!(null));
JsonRpcResponse::success(
id,
json!({
"content": [{
"type": "text",
"text": format!(
"Pipeline status for '{active}':\n{}",
serde_json::to_string_pretty(&pipeline).unwrap_or_default()
)
}]
}),
)
}
Err(e) => JsonRpcResponse::error(id, -32603, e),
}
}
async fn handle_gateway_health_tool(state: &GatewayState, id: Option<Value>) -> JsonRpcResponse {
let mut results = BTreeMap::new();
let project_entries: Vec<(String, String)> = state
.projects
.read()
.await
.iter()
.map(|(n, e)| (n.clone(), e.url.clone()))
.collect();
for (name, url) in &project_entries {
let status = match gateway::io::check_project_health(&state.client, url).await {
Ok(true) => "healthy".to_string(),
Ok(false) => "unhealthy".to_string(),
Err(e) => e,
};
results.insert(name.clone(), status);
}
let active = state.active_project.read().await.clone();
JsonRpcResponse::success(
id,
json!({
"content": [{
"type": "text",
"text": format!(
"Health check (active: '{active}'):\n{}",
results.iter()
.map(|(name, status)| format!(" {name}: {status}"))
.collect::<Vec<_>>()
.join("\n")
)
}]
}),
)
}
async fn handle_init_project_tool(
params: &Value,
state: &GatewayState,
id: Option<Value>,
) -> JsonRpcResponse {
let args = params.get("arguments").unwrap_or(params);
let path_str = args.get("path").and_then(|v| v.as_str()).unwrap_or("");
let name = args.get("name").and_then(|v| v.as_str());
let url = args.get("url").and_then(|v| v.as_str());
match gateway::init_project(state, path_str, name, url).await {
Ok(registered_name) => {
let next_steps = if let Some(ref n) = registered_name {
format!(
"Project registered as '{n}' in projects.toml.\n\
Next steps:\n\
1. Start a huskies server at '{path_str}' \
(e.g. `huskies {path_str}` or via Docker).\n\
2. Call switch_project with name='{n}' to make it active.\n\
3. Call wizard_status to begin the setup wizard."
)
} else {
format!(
"Next steps:\n\
1. Start a huskies server at '{path_str}' \
(e.g. `huskies {path_str}` or via Docker).\n\
2. Register the project: call init_project again with name and url \
parameters, or add it to projects.toml manually.\n\
3. Call switch_project and then wizard_status to begin the setup wizard.\n\n\
Note: wizard_* MCP tools require a running huskies server for the project."
)
};
JsonRpcResponse::success(
id,
json!({
"content": [{
"type": "text",
"text": format!("Successfully initialised huskies project at '{path_str}'.\n\n{next_steps}")
}]
}),
)
}
Err(e) => {
let code = match &e {
gateway::Error::Config(_) => -32602,
gateway::Error::DuplicateToken(_) => -32602,
_ => -32603,
};
JsonRpcResponse::error(id, code, e.to_string())
}
}
}
async fn handle_aggregate_pipeline_status_tool(
state: &GatewayState,
id: Option<Value>,
) -> JsonRpcResponse {
let project_urls: BTreeMap<String, String> = state
.projects
.read()
.await
.iter()
.map(|(name, entry)| (name.clone(), entry.url.clone()))
.collect();
let statuses =
gateway::io::fetch_all_project_pipeline_statuses(&project_urls, &state.client).await;
let active = state.active_project.read().await.clone();
JsonRpcResponse::success(
id,
json!({
"content": [{
"type": "text",
"text": format!(
"Aggregate pipeline status (active: '{active}'):\n{}",
serde_json::to_string_pretty(&statuses).unwrap_or_default()
)
}],
"projects": statuses,
"active": active,
}),
)
}
+24
View File
@@ -0,0 +1,24 @@
//! Gateway HTTP handlers — thin transport shells for the gateway service.
//!
//! Each handler calls `service::gateway::*` for business logic and formats
//! the response. No inline business logic, no `reqwest`, no filesystem access.
//!
//! Submodules:
//! - [`jsonrpc`] — JSON-RPC 2.0 types and helpers
//! - [`mcp`] — MCP POST/GET handlers and gateway tool dispatch
//! - [`websocket`] — WebSocket handlers (CRDT-sync, event push)
//! - [`rest`] — REST API handlers (agents, projects, bot config, pipeline)
mod jsonrpc;
mod mcp;
mod rest;
mod websocket;
pub use mcp::{gateway_mcp_get_handler, gateway_mcp_post_handler};
pub use rest::{
gateway_add_project_handler, gateway_all_pipeline_handler, gateway_api_handler,
gateway_assign_agent_handler, gateway_bot_config_get_handler, gateway_bot_config_page_handler,
gateway_bot_config_save_handler, gateway_generate_token_handler, gateway_list_agents_handler,
gateway_mode_handler, gateway_remove_project_handler, gateway_switch_handler,
};
pub use websocket::{gateway_crdt_sync_handler, gateway_event_push_handler};
+493
View File
@@ -0,0 +1,493 @@
//! REST HTTP handlers for the gateway: agents, projects, bot configuration, and pipeline.
use crate::service::gateway::{self, GatewayState};
use poem::handler;
use poem::http::StatusCode;
use poem::web::Path as PoemPath;
use poem::web::{Data, Json};
use poem::{Body, Response};
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use std::collections::BTreeMap;
use std::sync::Arc;
// ── Agent REST handlers ───────────────────────────────────────────────────────
/// `GET /gateway/mode` — returns `{"mode":"gateway"}`.
#[handler]
pub async fn gateway_mode_handler() -> Response {
let body = json!({ "mode": "gateway" });
Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "application/json")
.body(Body::from(serde_json::to_vec(&body).unwrap_or_default()))
}
/// `POST /gateway/tokens` — generate a one-time join token.
#[handler]
pub async fn gateway_generate_token_handler(state: Data<&Arc<GatewayState>>) -> Response {
let token = gateway::generate_join_token(&state).await;
let body = json!({ "token": token });
Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "application/json")
.body(Body::from(serde_json::to_vec(&body).unwrap_or_default()))
}
/// `GET /gateway/agents` — list all alive build agents registered in the CRDT.
#[handler]
pub async fn gateway_list_agents_handler(_state: Data<&Arc<GatewayState>>) -> Response {
let agents = gateway::list_agents();
let body = serde_json::to_vec(&agents).unwrap_or_default();
Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "application/json")
.body(Body::from(body))
}
/// Request body for assigning an agent to a project.
#[derive(Deserialize)]
struct AssignAgentRequest {
project: Option<String>,
}
/// `POST /gateway/agents/:id/assign` — assign (or unassign) an agent to a project.
#[handler]
pub async fn gateway_assign_agent_handler(
PoemPath(id): PoemPath<String>,
body: Json<AssignAgentRequest>,
state: Data<&Arc<GatewayState>>,
) -> Response {
match gateway::assign_agent(&state, &id, body.0.project).await {
Ok(agent) => {
let body = serde_json::to_vec(&agent).unwrap_or_default();
Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "application/json")
.body(Body::from(body))
}
Err(gateway::Error::ProjectNotFound(msg)) => Response::builder()
.status(StatusCode::BAD_REQUEST)
.body(Body::from(msg)),
Err(_) => Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::from("agent not found")),
}
}
// ── Gateway Web UI ────────────────────────────────────────────────────────────
/// `GET /api/gateway` — returns the list of registered projects and the active project.
#[handler]
pub async fn gateway_api_handler(state: Data<&Arc<GatewayState>>) -> Response {
let active = state.active_project.read().await.clone();
let projects: Vec<Value> = state
.projects
.read()
.await
.iter()
.map(|(name, entry)| {
json!({
"name": name,
"url": entry.url,
})
})
.collect();
let body = json!({ "active": active, "projects": projects });
Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "application/json")
.body(Body::from(serde_json::to_vec(&body).unwrap_or_default()))
}
/// Request body for `POST /api/gateway/switch`.
#[derive(Deserialize)]
struct SwitchRequest {
project: String,
}
/// `POST /api/gateway/switch` — switch the active project.
#[handler]
pub async fn gateway_switch_handler(
state: Data<&Arc<GatewayState>>,
body: Json<SwitchRequest>,
) -> Response {
match gateway::switch_project(&state, &body.project).await {
Ok(_) => {
let body_val = json!({ "ok": true });
Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "application/json")
.body(Body::from(
serde_json::to_vec(&body_val).unwrap_or_default(),
))
}
Err(e) => {
let body_val = json!({ "ok": false, "error": e.to_string() });
Response::builder()
.status(StatusCode::BAD_REQUEST)
.header("Content-Type", "application/json")
.body(Body::from(
serde_json::to_vec(&body_val).unwrap_or_default(),
))
}
}
}
// ── Project management API ────────────────────────────────────────────────────
/// Request body for adding a new project.
#[derive(Deserialize)]
struct AddProjectRequest {
name: String,
url: String,
}
/// `POST /api/gateway/projects` — add a new project.
#[handler]
pub async fn gateway_add_project_handler(
state: Data<&Arc<GatewayState>>,
body: Json<AddProjectRequest>,
) -> Response {
match gateway::add_project(&state, &body.name, &body.url).await {
Ok(()) => {
let name = body.0.name.trim().to_string();
let url = body.0.url.trim().to_string();
let body_val = json!({ "name": name, "url": url });
Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "application/json")
.body(Body::from(
serde_json::to_vec(&body_val).unwrap_or_default(),
))
}
Err(gateway::Error::DuplicateToken(_)) => Response::builder()
.status(StatusCode::CONFLICT)
.body(Body::from(format!(
"project '{}' already exists",
body.0.name.trim()
))),
Err(e) => Response::builder()
.status(StatusCode::BAD_REQUEST)
.body(Body::from(e.to_string())),
}
}
/// `DELETE /api/gateway/projects/:name` — remove a project.
#[handler]
pub async fn gateway_remove_project_handler(
PoemPath(name): PoemPath<String>,
state: Data<&Arc<GatewayState>>,
) -> Response {
match gateway::remove_project(&state, &name).await {
Ok(()) => Response::builder()
.status(StatusCode::NO_CONTENT)
.body(Body::empty()),
Err(gateway::Error::ProjectNotFound(msg)) => Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::from(msg)),
Err(e) => Response::builder()
.status(StatusCode::BAD_REQUEST)
.body(Body::from(e.to_string())),
}
}
// ── Bot configuration API ─────────────────────────────────────────────────────
/// Request/response body for the bot configuration API.
#[derive(Deserialize, Serialize, Default)]
pub(crate) struct BotConfigPayload {
transport: String,
homeserver: Option<String>,
username: Option<String>,
password: Option<String>,
slack_bot_token: Option<String>,
slack_signing_secret: Option<String>,
}
/// `GET /api/gateway/bot-config` — return current bot.toml fields as JSON.
#[handler]
pub async fn gateway_bot_config_get_handler(state: Data<&Arc<GatewayState>>) -> Response {
let fields = gateway::io::read_bot_config_raw(&state.config_dir);
let payload = BotConfigPayload {
transport: fields.transport,
homeserver: fields.homeserver,
username: fields.username,
password: fields.password,
slack_bot_token: fields.slack_bot_token,
slack_signing_secret: fields.slack_signing_secret,
};
Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "application/json")
.body(Body::from(serde_json::to_vec(&payload).unwrap_or_default()))
}
/// `POST /api/gateway/bot-config` — write new bot.toml and restart the bot.
#[handler]
pub async fn gateway_bot_config_save_handler(
state: Data<&Arc<GatewayState>>,
body: Json<BotConfigPayload>,
) -> Response {
let content = gateway::config::serialize_bot_config(
&body.transport,
body.homeserver.as_deref(),
body.username.as_deref(),
body.password.as_deref(),
body.slack_bot_token.as_deref(),
body.slack_signing_secret.as_deref(),
);
match gateway::save_bot_config_and_restart(&state, &content).await {
Ok(()) => {
let ok = json!({ "ok": true });
Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "application/json")
.body(Body::from(serde_json::to_vec(&ok).unwrap_or_default()))
}
Err(e) => {
let err = json!({ "ok": false, "error": e.to_string() });
Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.header("Content-Type", "application/json")
.body(Body::from(serde_json::to_vec(&err).unwrap_or_default()))
}
}
}
/// `GET /api/gateway/pipeline` — fetch pipeline status from all registered projects.
#[handler]
pub async fn gateway_all_pipeline_handler(state: Data<&Arc<GatewayState>>) -> Response {
let project_urls: BTreeMap<String, String> = state
.projects
.read()
.await
.iter()
.map(|(n, e)| (n.clone(), e.url.clone()))
.collect();
let results =
gateway::io::fetch_all_project_pipeline_statuses(&project_urls, &state.client).await;
let active = state.active_project.read().await.clone();
let body = json!({ "active": active, "projects": results });
Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "application/json")
.body(Body::from(serde_json::to_vec(&body).unwrap_or_default()))
}
// ── Bot config page ───────────────────────────────────────────────────────────
/// Self-contained HTML page for bot configuration.
const GATEWAY_BOT_CONFIG_HTML: &str = r#"<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Bot Configuration &#x2014; Huskies Gateway</title>
<style>
*, *::before, *::after { box-sizing: border-box; margin: 0; padding: 0; }
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
background: #0f172a;
color: #e2e8f0;
min-height: 100vh;
display: flex;
align-items: center;
justify-content: center;
}
.card {
background: #1e293b;
border: 1px solid #334155;
border-radius: 12px;
padding: 2rem;
width: 100%;
max-width: 520px;
box-shadow: 0 4px 24px rgba(0,0,0,0.4);
}
.header {
display: flex;
align-items: center;
gap: 0.75rem;
margin-bottom: 1.5rem;
}
.back {
color: #64748b;
text-decoration: none;
font-size: 0.85rem;
margin-right: auto;
}
.back:hover { color: #94a3b8; }
.logo { font-size: 1.5rem; }
h1 { font-size: 1.2rem; font-weight: 600; color: #f8fafc; }
.field { margin-bottom: 1rem; }
label {
display: block;
font-size: 0.75rem;
font-weight: 500;
color: #94a3b8;
text-transform: uppercase;
letter-spacing: 0.05em;
margin-bottom: 0.4rem;
}
input, select {
width: 100%;
padding: 0.625rem 0.875rem;
background: #0f172a;
border: 1px solid #334155;
border-radius: 8px;
color: #f1f5f9;
font-size: 0.9rem;
}
input:focus, select:focus { outline: none; border-color: #6366f1; box-shadow: 0 0 0 2px rgba(99,102,241,0.25); }
select {
cursor: pointer;
appearance: none;
background-image: url("data:image/svg+xml,%3Csvg xmlns='http://www.w3.org/2000/svg' width='12' height='12' viewBox='0 0 12 12'%3E%3Cpath fill='%2394a3b8' d='M6 8L1 3h10z'/%3E%3C/svg%3E");
background-repeat: no-repeat;
background-position: right 0.875rem center;
padding-right: 2.5rem;
}
.section { margin-top: 1rem; }
.divider {
border: none;
border-top: 1px solid #334155;
margin: 1.25rem 0;
}
button {
width: 100%;
padding: 0.75rem;
background: #6366f1;
border: none;
border-radius: 8px;
color: #fff;
font-size: 0.9rem;
font-weight: 600;
cursor: pointer;
margin-top: 1.25rem;
}
button:hover { background: #4f46e5; }
button:disabled { background: #334155; color: #64748b; cursor: not-allowed; }
.status { margin-top: 0.875rem; font-size: 0.8rem; color: #64748b; min-height: 1.25rem; }
.status.ok { color: #4ade80; }
.status.err { color: #f87171; }
</style>
</head>
<body>
<div class="card">
<div class="header">
<a href="/" class="back">&#x2190; Gateway</a>
<span class="logo">&#x1F916;</span>
<h1>Bot Configuration</h1>
</div>
<div class="field">
<label for="transport">Transport</label>
<select id="transport" onchange="onTransportChange(this.value)">
<option value="matrix">Matrix</option>
<option value="slack">Slack</option>
</select>
</div>
<hr class="divider">
<div id="matrix-fields" class="section">
<div class="field">
<label for="homeserver">Homeserver URL</label>
<input type="text" id="homeserver" placeholder="https://matrix.example.com">
</div>
<div class="field">
<label for="username">Bot Username</label>
<input type="text" id="username" placeholder="@bot:example.com">
</div>
<div class="field">
<label for="password">Password</label>
<input type="password" id="password" placeholder="&#x2022;&#x2022;&#x2022;&#x2022;&#x2022;&#x2022;&#x2022;&#x2022;">
</div>
</div>
<div id="slack-fields" class="section" style="display:none">
<div class="field">
<label for="slack-bot-token">Bot Token</label>
<input type="password" id="slack-bot-token" placeholder="xoxb-&#x2026;">
</div>
<div class="field">
<label for="slack-signing-secret">App / Signing Secret</label>
<input type="password" id="slack-signing-secret" placeholder="Your signing secret">
</div>
</div>
<button id="save-btn" onclick="save()">Save &amp; Restart Bot</button>
<div id="status" class="status"></div>
</div>
<script>
function onTransportChange(v) {
document.getElementById('matrix-fields').style.display = v === 'matrix' ? '' : 'none';
document.getElementById('slack-fields').style.display = v === 'slack' ? '' : 'none';
}
async function loadConfig() {
try {
const r = await fetch('/api/gateway/bot-config');
const d = await r.json();
document.getElementById('transport').value = d.transport || 'matrix';
onTransportChange(d.transport || 'matrix');
document.getElementById('homeserver').value = d.homeserver || '';
document.getElementById('username').value = d.username || '';
document.getElementById('password').value = d.password || '';
document.getElementById('slack-bot-token').value = d.slack_bot_token || '';
document.getElementById('slack-signing-secret').value = d.slack_signing_secret || '';
} catch(e) {
document.getElementById('status').textContent = 'Failed to load config: ' + e;
document.getElementById('status').className = 'status err';
}
}
async function save() {
const btn = document.getElementById('save-btn');
const statusEl = document.getElementById('status');
btn.disabled = true;
btn.textContent = 'Saving\u2026';
statusEl.className = 'status';
statusEl.textContent = '';
const transport = document.getElementById('transport').value;
const payload = { transport };
if (transport === 'matrix') {
payload.homeserver = document.getElementById('homeserver').value;
payload.username = document.getElementById('username').value;
payload.password = document.getElementById('password').value;
} else {
payload.slack_bot_token = document.getElementById('slack-bot-token').value;
payload.slack_signing_secret = document.getElementById('slack-signing-secret').value;
}
try {
const r = await fetch('/api/gateway/bot-config', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify(payload)
});
const d = await r.json();
if (d.ok) {
statusEl.className = 'status ok';
statusEl.textContent = 'Saved \u2014 bot restarted with new credentials.';
} else {
statusEl.className = 'status err';
statusEl.textContent = d.error || 'Save failed';
}
} catch(e) {
statusEl.className = 'status err';
statusEl.textContent = 'Error: ' + e;
}
btn.disabled = false;
btn.textContent = 'Save & Restart Bot';
}
loadConfig();
</script>
</body>
</html>
"#;
/// Serve the bot configuration HTML page at `GET /bot-config`.
#[handler]
pub async fn gateway_bot_config_page_handler() -> Response {
Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "text/html; charset=utf-8")
.body(Body::from(GATEWAY_BOT_CONFIG_HTML))
}
+260
View File
@@ -0,0 +1,260 @@
//! WebSocket handlers for CRDT-sync agent registration and event push.
use crate::service::gateway::{self, GatewayState};
use futures::{SinkExt, StreamExt};
use poem::handler;
use poem::http::StatusCode;
use poem::web::websocket::{Message as WsMessage, WebSocket};
use poem::web::{Data, Query};
use serde::Deserialize;
use std::sync::Arc;
// ── CRDT-sync WebSocket handler (agent registration) ─────────────────────────
/// Query parameters accepted on the `/crdt-sync` WebSocket upgrade in gateway mode.
#[derive(Deserialize)]
struct GatewayCrdtSyncParams {
/// One-time join token from `POST /gateway/tokens`.
token: Option<String>,
/// Human-readable label for the connecting agent.
label: Option<String>,
/// WebSocket address the agent exposes for mesh connections
/// (e.g. `ws://0.0.0.0:3002/crdt-sync`).
address: Option<String>,
}
/// `GET /crdt-sync` — gateway-side WebSocket endpoint for build agent registration.
///
/// # Authentication
///
/// The connecting node must supply a valid one-time join token via the `token`
/// query parameter, obtained from `POST /gateway/tokens`. The token is
/// validated pre-upgrade and consumed by [`gateway::register_agent`] once the
/// WebSocket is established.
///
/// # Protocol
///
/// 1. Pre-upgrade: token existence is verified (read-only).
/// 2. Post-upgrade: [`gateway::register_agent`] writes the node to the CRDT and
/// consumes the token.
/// 3. Keepalive: the server sends Ping frames every 30 s; each Pong resets the
/// pong deadline and calls [`gateway::heartbeat_agent`].
/// 4. Disconnect: on clean close or pong timeout, [`gateway::remove_agent`]
/// tombstones the node (`alive = false`).
#[handler]
pub async fn gateway_crdt_sync_handler(
ws: WebSocket,
state: Data<&Arc<GatewayState>>,
Query(params): Query<GatewayCrdtSyncParams>,
remote_addr: &poem::web::RemoteAddr,
) -> poem::Response {
use crate::crdt_sync::{PING_INTERVAL_SECS, PONG_TIMEOUT_SECS};
// ── Pre-upgrade: validate token exists ─────────────────────────────
let token = match params.token {
Some(t) if !t.is_empty() => t,
_ => {
return poem::Response::builder()
.status(StatusCode::UNAUTHORIZED)
.body("token query parameter required");
}
};
{
let tokens = state.pending_tokens.read().await;
if !tokens.contains_key(&token) {
return poem::Response::builder()
.status(StatusCode::UNAUTHORIZED)
.body("invalid or already-used join token");
}
}
let label = params
.label
.filter(|l| !l.is_empty())
.unwrap_or_else(|| "build-agent".to_string());
let address = params
.address
.filter(|a| !a.is_empty())
.unwrap_or_else(|| remote_addr.to_string());
// ── WebSocket upgrade ───────────────────────────────────────────────
use poem::IntoResponse as _;
let state = Arc::clone(&state);
ws.on_upgrade(move |socket| async move {
// Register the agent — consumes the one-time token.
let node = match gateway::register_agent(&state, &token, label, address).await {
Ok(n) => n,
Err(e) => {
crate::slog!("[gateway/crdt-sync] Registration failed: {e}");
return;
}
};
let node_id = node.node_id.clone();
crate::slog!(
"[gateway/crdt-sync] Agent '{}' registered, node_id={:.12}…",
node.label.as_deref().unwrap_or("?"),
&node_id
);
let (mut sink, mut stream) = socket.split();
let mut pong_deadline =
tokio::time::Instant::now() + std::time::Duration::from_secs(PONG_TIMEOUT_SECS);
let mut ping_ticker = tokio::time::interval_at(
tokio::time::Instant::now() + std::time::Duration::from_secs(PING_INTERVAL_SECS),
std::time::Duration::from_secs(PING_INTERVAL_SECS),
);
loop {
tokio::select! {
_ = ping_ticker.tick() => {
if tokio::time::Instant::now() >= pong_deadline {
crate::slog!(
"[gateway/crdt-sync] No pong from {:.12}… in {}s; disconnecting",
&node_id, PONG_TIMEOUT_SECS
);
break;
}
if sink.send(WsMessage::Ping(vec![])).await.is_err() {
break;
}
}
frame = stream.next() => {
match frame {
Some(Ok(WsMessage::Pong(_))) => {
pong_deadline = tokio::time::Instant::now()
+ std::time::Duration::from_secs(PONG_TIMEOUT_SECS);
gateway::heartbeat_agent(&node_id);
}
Some(Ok(WsMessage::Ping(data))) => {
let _ = sink.send(WsMessage::Pong(data)).await;
}
Some(Ok(WsMessage::Close(_))) | None => break,
_ => {}
}
}
}
}
gateway::remove_agent(&node_id);
crate::slog!(
"[gateway/crdt-sync] Agent {:.12}… disconnected and tombstoned",
&node_id
);
})
.into_response()
}
// ── Event-push WebSocket handler ─────────────────────────────────────────────
/// Query parameters accepted on the `/gateway/events/push` WebSocket upgrade.
#[derive(Deserialize)]
struct EventPushQueryParams {
/// One-time join token generated by `POST /gateway/tokens`.
token: Option<String>,
/// The project name this node represents (e.g. `"huskies"`).
project: Option<String>,
}
/// `GET /gateway/events/push` — WebSocket endpoint for project nodes to push
/// [`StatusEvent`] frames to the gateway.
///
/// # Authentication
///
/// The connecting node must supply a valid one-time join token via the `token`
/// query parameter, obtained from `POST /gateway/tokens`. The token is
/// consumed on the first successful upgrade — the connection itself is then
/// kept open indefinitely.
///
/// # Protocol
///
/// Each message from the project node must be a JSON-encoded
/// [`crate::service::events::StoredEvent`]. The gateway fan-outs the event
/// (tagged with the project name) to all current local subscribers.
///
/// The server does not send data back; clients should treat any close frame
/// as a signal to reconnect with exponential back-off (see docs/gateway-protocol.html).
///
/// # Reconnect-with-backoff
///
/// Project nodes MUST reconnect on disconnect. Recommended policy:
///
/// - Initial retry delay: **1 s**
/// - Back-off multiplier: **2×** per attempt
/// - Max delay cap: **60 s**
/// - Jitter: add ±10 % to the delay to avoid thundering herds
#[handler]
pub async fn gateway_event_push_handler(
ws: WebSocket,
state: Data<&Arc<GatewayState>>,
Query(params): Query<EventPushQueryParams>,
) -> poem::Response {
// ── Authentication (pre-upgrade) ─────────────────────────────────────
let token = match params.token {
Some(t) if !t.is_empty() => t,
_ => {
return poem::Response::builder()
.status(StatusCode::UNAUTHORIZED)
.body("token query parameter required");
}
};
let project = match params.project {
Some(p) if !p.is_empty() => p,
_ => {
return poem::Response::builder()
.status(StatusCode::BAD_REQUEST)
.body("project query parameter required");
}
};
// Validate and consume the one-time token.
{
let mut tokens = state.pending_tokens.write().await;
if !tokens.contains_key(&token) {
return poem::Response::builder()
.status(StatusCode::UNAUTHORIZED)
.body("invalid or already-used join token");
}
tokens.remove(&token);
}
// ── WebSocket upgrade ────────────────────────────────────────────────
use poem::IntoResponse as _;
let state = Arc::clone(&state);
ws.on_upgrade(move |socket| async move {
let (_, mut stream) = socket.split();
crate::slog!(
"[gateway] Project node '{}' connected to event-push endpoint",
project
);
while let Some(msg) = stream.next().await {
let text = match msg {
Ok(WsMessage::Text(t)) => t,
Ok(WsMessage::Close(_)) | Err(_) => break,
_ => continue,
};
match serde_json::from_str::<crate::service::events::StoredEvent>(&text) {
Ok(event) => {
gateway::broadcast_status_event(&state, project.clone(), event);
}
Err(e) => {
crate::slog!(
"[gateway] event-push: invalid frame from '{}': {e}",
project
);
}
}
}
crate::slog!(
"[gateway] Project node '{}' disconnected from event-push endpoint",
project
);
})
.into_response()
}