//! MCP JSON-RPC POST/GET handlers and gateway tool dispatch. use super::jsonrpc::{JsonRpcRequest, JsonRpcResponse, to_json_response}; use crate::service::gateway::{self, GatewayState}; use poem::handler; use poem::http::StatusCode; use poem::web::Data; use poem::{Body, Request, Response}; use serde_json::{Value, json}; use std::collections::BTreeMap; use std::sync::Arc; // ── MCP tool definitions ───────────────────────────────────────────────────── /// Gateway-specific MCP tools exposed alongside the proxied tools. const GATEWAY_TOOLS: &[&str] = &[ "switch_project", "gateway_status", "gateway_health", "init_project", "aggregate_pipeline_status", "agents.list", ]; /// Gateway tool definitions. pub(crate) fn gateway_tool_definitions() -> Vec { vec![ json!({ "name": "switch_project", "description": "Switch the active project. All subsequent MCP tool calls will be proxied to this project's container.", "inputSchema": { "type": "object", "properties": { "project": { "type": "string", "description": "Name of the project to switch to (must exist in projects.toml)" } }, "required": ["project"] } }), json!({ "name": "gateway_status", "description": "Show pipeline status for the active project by proxying the get_pipeline_status tool call.", "inputSchema": { "type": "object", "properties": {} } }), json!({ "name": "gateway_health", "description": "Health check aggregation across all registered projects. Returns the health status of every project container.", "inputSchema": { "type": "object", "properties": {} } }), json!({ "name": "init_project", "description": "Initialize a new huskies project at the given path by scaffolding .huskies/ and related files — the same as running `huskies init `. Prefer this tool over asking the user to run the CLI. 
If `name` and `url` are supplied the project is also registered in projects.toml so switch_project can reach it immediately.", "inputSchema": { "type": "object", "properties": { "path": { "type": "string", "description": "Absolute filesystem path to the project directory to initialise. The directory is created if it does not exist." }, "name": { "type": "string", "description": "Optional: short name to register the project under in projects.toml (e.g. 'my-app'). Requires `url`." }, "url": { "type": "string", "description": "Optional: base URL of the huskies container that will serve this project (e.g. 'http://my-app:3001'). Required when `name` is given." } }, "required": ["path"] } }), json!({ "name": "aggregate_pipeline_status", "description": "Fetch pipeline status from ALL registered projects in parallel and return an aggregated report. For each project: stage counts (backlog/current/qa/merge/done) and a list of blocked or failing items with triage detail. Unreachable projects are included with an error state rather than failing the whole call.", "inputSchema": { "type": "object", "properties": {} } }), json!({ "name": "agents.list", "description": "List all alive build agents currently registered with this gateway. Returns an array of agent objects with id, label, address, registered_at, last_seen, and assigned_project fields.", "inputSchema": { "type": "object", "properties": {} } }), ] } // ── MCP POST handler ───────────────────────────────────────────────────────── /// Main MCP POST handler for the gateway. Intercepts gateway-specific tools and /// proxies everything else to the active project's container. 
///
/// Validation runs in this order; each failure yields a JSON-RPC error response:
/// 1. a non-empty `Content-Type` that does not mention `application/json` → `-32700`;
/// 2. a body that cannot be read or deserialized as a request → `-32700`;
/// 3. a `jsonrpc` field other than `"2.0"` → `-32600`;
/// 4. a missing or `null` id → `-32600`, except for `notifications/*` methods,
///    which are acknowledged with an empty `202 Accepted`.
///
/// `initialize`, `tools/list`, `pipeline.get`, and `tools/call` for a name listed
/// in `GATEWAY_TOOLS` are answered by the gateway; every other request is
/// forwarded as the original raw bytes to the active project.
#[handler]
pub async fn gateway_mcp_post_handler(
    req: &Request,
    body: Body,
    state: Data<&Arc<GatewayState>>,
) -> Response {
    // Media type names are case-insensitive, so normalise before matching.
    // An absent header is tolerated and treated as JSON.
    let content_type = req.header("content-type").unwrap_or("");
    if !content_type.is_empty()
        && !content_type
            .to_ascii_lowercase()
            .contains("application/json")
    {
        return to_json_response(JsonRpcResponse::error(
            None,
            -32700,
            "Unsupported Content-Type; expected application/json".into(),
        ));
    }

    // Keep the raw bytes around: proxied requests are forwarded verbatim.
    let bytes = match body.into_bytes().await {
        Ok(b) => b,
        Err(_) => {
            return to_json_response(JsonRpcResponse::error(None, -32700, "Parse error".into()));
        }
    };

    let rpc: JsonRpcRequest = match serde_json::from_slice(&bytes) {
        Ok(r) => r,
        Err(_) => {
            return to_json_response(JsonRpcResponse::error(None, -32700, "Parse error".into()));
        }
    };

    if rpc.jsonrpc != "2.0" {
        return to_json_response(JsonRpcResponse::error(
            rpc.id,
            -32600,
            "Invalid JSON-RPC version".into(),
        ));
    }

    // Requests without a usable id are only valid as notifications, which get
    // no JSON-RPC body in reply.
    if rpc.id.is_none() || rpc.id.as_ref() == Some(&Value::Null) {
        if rpc.method.starts_with("notifications/") {
            return Response::builder()
                .status(StatusCode::ACCEPTED)
                .body(Body::empty());
        }
        return to_json_response(JsonRpcResponse::error(None, -32600, "Missing id".into()));
    }

    match rpc.method.as_str() {
        "initialize" => to_json_response(handle_initialize(rpc.id)),
        // The id is cloned here because the error arm still needs it.
        "tools/list" => match handle_tools_list(&state, rpc.id.clone()).await {
            Ok(resp) => to_json_response(resp),
            Err(e) => to_json_response(JsonRpcResponse::error(rpc.id, -32603, e)),
        },
        "pipeline.get" => to_json_response(handle_pipeline_get(&state, rpc.id).await),
        "tools/call" => {
            // A missing or non-string name becomes "", which is not a gateway
            // tool, so the call falls through to the proxy.
            let tool_name = rpc
                .params
                .get("name")
                .and_then(|v| v.as_str())
                .unwrap_or("");
            if GATEWAY_TOOLS.contains(&tool_name) {
                to_json_response(
                    handle_gateway_tool(tool_name, &rpc.params, &state, rpc.id).await,
                )
            } else {
                proxy_and_respond(&state, &bytes, rpc.id).await
            }
        }
        _ => proxy_and_respond(&state, &bytes, rpc.id).await,
    }
}

/// Proxy a request to the active project and format the response.
///
/// `bytes` is the raw request body, forwarded as-is. On success the upstream
/// body is returned with status 200 and a JSON content type. If there is no
/// resolvable active URL, or the upstream call fails, a JSON-RPC `-32603`
/// error carrying `id` is returned instead.
async fn proxy_and_respond(state: &GatewayState, bytes: &[u8], id: Option<Value>) -> Response {
    let url = match state.active_url().await {
        Ok(u) => u,
        Err(e) => {
            return to_json_response(JsonRpcResponse::error(id, -32603, e.to_string()));
        }
    };
    match gateway::io::proxy_mcp_call(&state.client, &url, bytes).await {
        Ok(resp_body) => Response::builder()
            .status(StatusCode::OK)
            .header("Content-Type", "application/json")
            .body(Body::from(resp_body)),
        Err(e) => to_json_response(JsonRpcResponse::error(
            id,
            -32603,
            format!("proxy error: {e}"),
        )),
    }
}

/// GET handler — method not allowed.
///
/// Always answers `405 Method Not Allowed` with an empty body.
#[handler]
pub async fn gateway_mcp_get_handler() -> Response {
    Response::builder()
        .status(StatusCode::METHOD_NOT_ALLOWED)
        .body(Body::empty())
}

// ── Protocol handlers ────────────────────────────────────────────────────────

/// Answer the MCP `initialize` request with the gateway's fixed protocol
/// version, an empty `tools` capability object, and its server name/version.
fn handle_initialize(id: Option<Value>) -> JsonRpcResponse {
    JsonRpcResponse::success(
        id,
        json!({
            "protocolVersion": "2025-03-26",
            "capabilities": {
                "tools": {}
            },
            "serverInfo": {
                "name": "huskies-gateway",
                "version": "1.0.0"
            }
        }),
    )
}

/// Fetch tools/list from the active project and merge in gateway tools.
///
/// Gateway tools come first, followed by whatever the upstream reports under
/// `result.tools`. If that path is missing or not an array, the upstream
/// contributes no tools.
///
/// # Errors
/// Returns the error text when the active URL cannot be resolved or the
/// upstream `tools/list` fetch fails.
async fn handle_tools_list(
    state: &GatewayState,
    id: Option<Value>,
) -> Result<JsonRpcResponse, String> {
    let url = state.active_url().await.map_err(|e| e.to_string())?;
    let resp_json = gateway::io::fetch_tools_list(&state.client, &url).await?;

    let upstream_tools: Vec<Value> = resp_json
        .get("result")
        .and_then(|r| r.get("tools"))
        .and_then(|t| t.as_array())
        .cloned()
        .unwrap_or_default();

    let mut all_tools = gateway_tool_definitions();
    all_tools.extend(upstream_tools);

    Ok(JsonRpcResponse::success(id, json!({ "tools": all_tools })))
}

// ── Gateway tool dispatch ────────────────────────────────────────────────────

/// Dispatch a gateway-specific tool call.
async fn handle_gateway_tool( tool_name: &str, params: &Value, state: &GatewayState, id: Option, ) -> JsonRpcResponse { match tool_name { "switch_project" => handle_switch_project_tool(params, state, id).await, "gateway_status" => handle_gateway_status_tool(state, id).await, "gateway_health" => handle_gateway_health_tool(state, id).await, "init_project" => handle_init_project_tool(params, state, id).await, "aggregate_pipeline_status" => handle_aggregate_pipeline_status_tool(state, id).await, "agents.list" => handle_agents_list_tool(id), _ => JsonRpcResponse::error(id, -32601, format!("Unknown gateway tool: {tool_name}")), } } async fn handle_switch_project_tool( params: &Value, state: &GatewayState, id: Option, ) -> JsonRpcResponse { let project = params .get("arguments") .and_then(|a| a.get("project")) .or_else(|| params.get("project")) .and_then(|v| v.as_str()) .unwrap_or(""); match gateway::switch_project(state, project).await { Ok(url) => JsonRpcResponse::success( id, json!({ "content": [{ "type": "text", "text": format!("Switched to project '{project}' ({url})") }] }), ), Err(e) => JsonRpcResponse::error(id, -32602, e.to_string()), } } async fn handle_gateway_status_tool(state: &GatewayState, id: Option) -> JsonRpcResponse { let active = state.active_project.read().await.clone(); let url = match state.active_url().await { Ok(u) => u, Err(e) => return JsonRpcResponse::error(id.clone(), -32603, e.to_string()), }; match gateway::io::fetch_pipeline_status_for_project(&state.client, &url).await { Ok(upstream) => { let pipeline = upstream.get("result").cloned().unwrap_or(json!(null)); JsonRpcResponse::success( id, json!({ "content": [{ "type": "text", "text": format!( "Pipeline status for '{active}':\n{}", serde_json::to_string_pretty(&pipeline).unwrap_or_default() ) }] }), ) } Err(e) => JsonRpcResponse::error(id, -32603, e), } } async fn handle_gateway_health_tool(state: &GatewayState, id: Option) -> JsonRpcResponse { let mut results = BTreeMap::new(); let 
project_entries: Vec<(String, String)> = state .projects .read() .await .iter() .map(|(n, e)| (n.clone(), e.url.clone())) .collect(); for (name, url) in &project_entries { let status = match gateway::io::check_project_health(&state.client, url).await { Ok(true) => "healthy".to_string(), Ok(false) => "unhealthy".to_string(), Err(e) => e, }; results.insert(name.clone(), status); } let active = state.active_project.read().await.clone(); JsonRpcResponse::success( id, json!({ "content": [{ "type": "text", "text": format!( "Health check (active: '{active}'):\n{}", results.iter() .map(|(name, status)| format!(" {name}: {status}")) .collect::>() .join("\n") ) }] }), ) } async fn handle_init_project_tool( params: &Value, state: &GatewayState, id: Option, ) -> JsonRpcResponse { let args = params.get("arguments").unwrap_or(params); let path_str = args.get("path").and_then(|v| v.as_str()).unwrap_or(""); let name = args.get("name").and_then(|v| v.as_str()); let url = args.get("url").and_then(|v| v.as_str()); match gateway::init_project(state, path_str, name, url).await { Ok(registered_name) => { let next_steps = if let Some(ref n) = registered_name { format!( "Project registered as '{n}' in projects.toml.\n\ Next steps:\n\ 1. Start a huskies server at '{path_str}' \ (e.g. `huskies {path_str}` or via Docker).\n\ 2. Call switch_project with name='{n}' to make it active.\n\ 3. Call wizard_status to begin the setup wizard." ) } else { format!( "Next steps:\n\ 1. Start a huskies server at '{path_str}' \ (e.g. `huskies {path_str}` or via Docker).\n\ 2. Register the project: call init_project again with name and url \ parameters, or add it to projects.toml manually.\n\ 3. Call switch_project and then wizard_status to begin the setup wizard.\n\n\ Note: wizard_* MCP tools require a running huskies server for the project." 
) }; JsonRpcResponse::success( id, json!({ "content": [{ "type": "text", "text": format!("Successfully initialised huskies project at '{path_str}'.\n\n{next_steps}") }] }), ) } Err(e) => { let code = match &e { gateway::Error::Config(_) => -32602, gateway::Error::DuplicateToken(_) => -32602, _ => -32603, }; JsonRpcResponse::error(id, code, e.to_string()) } } } async fn handle_aggregate_pipeline_status_tool( state: &GatewayState, id: Option, ) -> JsonRpcResponse { let project_urls: BTreeMap = state .projects .read() .await .iter() .map(|(name, entry)| (name.clone(), entry.url.clone())) .collect(); let statuses = gateway::io::fetch_all_project_pipeline_statuses(&project_urls, &state.client).await; let active = state.active_project.read().await.clone(); JsonRpcResponse::success( id, json!({ "content": [{ "type": "text", "text": format!( "Aggregate pipeline status (active: '{active}'):\n{}", serde_json::to_string_pretty(&statuses).unwrap_or_default() ) }], "projects": statuses, "active": active, }), ) } /// Handle the `agents.list` gateway tool — returns all alive build agents from the CRDT. fn handle_agents_list_tool(id: Option) -> JsonRpcResponse { let agents = gateway::list_agents(); let agents_json = serde_json::to_value(&agents).unwrap_or(json!([])); JsonRpcResponse::success( id, json!({ "content": [{ "type": "text", "text": serde_json::to_string_pretty(&agents).unwrap_or_default() }], "agents": agents_json, }), ) } /// Handle the `pipeline.get` read-RPC — returns the same shape as the old /// `GET /api/gateway/pipeline` endpoint: `{ "active": "...", "projects": {...} }`. 
async fn handle_pipeline_get(state: &GatewayState, id: Option) -> JsonRpcResponse { let project_urls: BTreeMap = state .projects .read() .await .iter() .map(|(n, e)| (n.clone(), e.url.clone())) .collect(); let results = gateway::io::fetch_all_project_pipeline_statuses(&project_urls, &state.client).await; let active = state.active_project.read().await.clone(); JsonRpcResponse::success(id, json!({ "active": active, "projects": results })) }