2026-04-13 13:02:41 +00:00
//! Multi-project gateway — proxies MCP calls to per-project Docker containers.
//!
//! When `huskies --gateway` is used, the server starts in gateway mode: it reads
//! a `projects.toml` config that maps project names to container URLs, maintains
//! an "active project" selection, and proxies all MCP tool calls to the active
//! project's container. Gateway-specific tools allow switching projects, querying
//! status, and aggregating health checks across all registered projects.
use poem ::EndpointExt ;
use poem ::handler ;
use poem ::http ::StatusCode ;
2026-04-14 12:02:17 +00:00
use poem ::web ::Path as PoemPath ;
2026-04-14 11:24:12 +00:00
use poem ::web ::{ Data , Json } ;
2026-04-13 13:02:41 +00:00
use poem ::{ Body , Request , Response } ;
use reqwest ::Client ;
use serde ::{ Deserialize , Serialize } ;
2026-04-13 14:07:08 +00:00
use serde_json ::{ Value , json } ;
2026-04-13 13:02:41 +00:00
use std ::collections ::BTreeMap ;
2026-04-14 12:02:17 +00:00
use std ::collections ::HashMap ;
2026-04-14 12:25:12 +00:00
use std ::path ::{ Path , PathBuf } ;
2026-04-13 13:02:41 +00:00
use std ::sync ::Arc ;
2026-04-14 18:53:41 +00:00
use tokio ::sync ::Mutex as TokioMutex ;
2026-04-13 13:02:41 +00:00
use tokio ::sync ::RwLock ;
2026-04-14 12:02:17 +00:00
use uuid ::Uuid ;
2026-04-13 13:02:41 +00:00
2026-04-14 09:57:11 +00:00
// Re-export active_project type alias for clarity in gateway bot helpers.
type ActiveProject = Arc < RwLock < String > > ;
2026-04-13 13:02:41 +00:00
// ── Config ───────────────────────────────────────────────────────────
/// A single project entry in `projects.toml`.
#[ derive(Debug, Clone, Deserialize, Serialize) ]
pub struct ProjectEntry {
/// Base URL of the project's huskies container (e.g. `http://localhost:3001`).
pub url : String ,
}
/// Top-level `projects.toml` config.
#[ derive(Debug, Clone, Deserialize, Serialize) ]
pub struct GatewayConfig {
/// Map of project name → container URL.
#[ serde(default) ]
pub projects : BTreeMap < String , ProjectEntry > ,
}
impl GatewayConfig {
/// Load gateway config from a `projects.toml` file.
pub fn load ( path : & Path ) -> Result < Self , String > {
let contents = std ::fs ::read_to_string ( path )
. map_err ( | e | format! ( " cannot read {} : {e} " , path . display ( ) ) ) ? ;
2026-04-13 14:07:08 +00:00
toml ::from_str ( & contents ) . map_err ( | e | format! ( " invalid projects.toml: {e} " ) )
2026-04-13 13:02:41 +00:00
}
}
2026-04-14 12:02:17 +00:00
// ── Agent join types ─────────────────────────────────────────────────
/// A build agent that has registered with this gateway.
#[ derive(Debug, Clone, Serialize, Deserialize) ]
pub struct JoinedAgent {
/// Unique ID assigned by the gateway on registration.
pub id : String ,
/// Human-readable label provided by the agent (e.g. `build-agent-abc123`).
pub label : String ,
/// The agent's CRDT-sync WebSocket address (e.g. `ws://host:3001/crdt-sync`).
pub address : String ,
/// Unix timestamp when the agent registered.
pub registered_at : f64 ,
2026-04-15 18:20:39 +00:00
/// Unix timestamp of the last heartbeat from this agent. Defaults to `registered_at`
/// for agents loaded from persisted state that predate the heartbeat feature.
#[ serde(default) ]
pub last_seen : f64 ,
2026-04-14 12:25:12 +00:00
/// Project this agent is assigned to, if any.
#[ serde(default, skip_serializing_if = " Option::is_none " ) ]
pub assigned_project : Option < String > ,
2026-04-14 12:02:17 +00:00
}
/// A one-time join token that has been generated but not yet consumed.
struct PendingToken {
#[ allow(dead_code) ]
created_at : f64 ,
}
/// Request body sent by a build agent when registering with the gateway.
#[ derive(Deserialize) ]
struct RegisterAgentRequest {
token : String ,
label : String ,
address : String ,
}
2026-04-14 12:25:12 +00:00
/// Request body for assigning or reassigning an agent to a project.
///
/// Send `{"project": "my-project"}` to assign, or `{"project": null}` to unassign.
#[ derive(Deserialize) ]
struct AssignAgentRequest {
project : Option < String > ,
}
2026-04-13 13:02:41 +00:00
// ── Gateway state ────────────────────────────────────────────────────
/// Shared gateway state threaded through HTTP handlers.
#[ derive(Clone) ]
pub struct GatewayState {
2026-04-15 18:02:47 +00:00
/// The live set of registered projects (initially loaded from `projects.toml`).
pub projects : Arc < RwLock < BTreeMap < String , ProjectEntry > > > ,
2026-04-13 13:02:41 +00:00
/// The currently active project name.
pub active_project : Arc < RwLock < String > > ,
/// HTTP client for proxying requests to project containers.
pub client : Client ,
2026-04-14 12:02:17 +00:00
/// Build agents that have joined this gateway.
pub joined_agents : Arc < RwLock < Vec < JoinedAgent > > > ,
/// One-time join tokens that have been issued but not yet consumed.
pending_tokens : Arc < RwLock < HashMap < String , PendingToken > > > ,
2026-04-14 18:53:41 +00:00
/// Directory containing `projects.toml` and the `.huskies/` subfolder.
2026-04-14 12:25:12 +00:00
pub config_dir : PathBuf ,
2026-04-14 18:53:41 +00:00
/// HTTP port the gateway is listening on.
pub port : u16 ,
/// Abort handle for the running Matrix bot task (if any).
/// Stored so the bot can be restarted when credentials change.
pub bot_handle : Arc < TokioMutex < Option < tokio ::task ::AbortHandle > > > ,
2026-04-14 12:25:12 +00:00
}
/// Load persisted agents from `<config_dir>/gateway_agents.json`.
/// Returns an empty list if the file does not exist or cannot be parsed.
fn load_agents ( config_dir : & Path ) -> Vec < JoinedAgent > {
let path = config_dir . join ( " gateway_agents.json " ) ;
match std ::fs ::read ( & path ) {
Ok ( data ) = > serde_json ::from_slice ( & data ) . unwrap_or_default ( ) ,
Err ( _ ) = > Vec ::new ( ) ,
}
}
2026-04-15 18:02:47 +00:00
/// Persist the current projects map to `<config_dir>/projects.toml`.
/// Silently ignores write errors or skips when `config_dir` is empty.
async fn save_config ( projects : & BTreeMap < String , ProjectEntry > , config_dir : & Path ) {
if config_dir . as_os_str ( ) . is_empty ( ) {
return ;
}
let path = config_dir . join ( " projects.toml " ) ;
let config = GatewayConfig {
projects : projects . clone ( ) ,
} ;
if let Ok ( data ) = toml ::to_string_pretty ( & config ) {
let _ = tokio ::fs ::write ( & path , data ) . await ;
}
}
2026-04-14 12:25:12 +00:00
/// Persist the current agent list to `<config_dir>/gateway_agents.json`.
/// Silently ignores write errors (e.g. read-only filesystem or empty path).
async fn save_agents ( agents : & [ JoinedAgent ] , config_dir : & Path ) {
if config_dir = = Path ::new ( " " ) {
return ;
}
let path = config_dir . join ( " gateway_agents.json " ) ;
if let Ok ( data ) = serde_json ::to_vec_pretty ( agents ) {
let _ = tokio ::fs ::write ( & path , data ) . await ;
}
2026-04-13 13:02:41 +00:00
}
impl GatewayState {
2026-04-14 12:25:12 +00:00
/// Create a new gateway state from a config and config directory.
///
/// The first project in the config becomes the active project by default.
/// Previously registered agents are loaded from `gateway_agents.json` in
/// `config_dir` if the file exists.
2026-04-14 18:53:41 +00:00
pub fn new ( config : GatewayConfig , config_dir : PathBuf , port : u16 ) -> Result < Self , String > {
2026-04-13 13:02:41 +00:00
if config . projects . is_empty ( ) {
return Err ( " projects.toml must define at least one project " . to_string ( ) ) ;
}
let first = config . projects . keys ( ) . next ( ) . unwrap ( ) . clone ( ) ;
2026-04-14 12:25:12 +00:00
let agents = load_agents ( & config_dir ) ;
2026-04-13 13:02:41 +00:00
Ok ( Self {
2026-04-15 18:02:47 +00:00
projects : Arc ::new ( RwLock ::new ( config . projects ) ) ,
2026-04-13 13:02:41 +00:00
active_project : Arc ::new ( RwLock ::new ( first ) ) ,
client : Client ::new ( ) ,
2026-04-14 12:25:12 +00:00
joined_agents : Arc ::new ( RwLock ::new ( agents ) ) ,
2026-04-14 12:02:17 +00:00
pending_tokens : Arc ::new ( RwLock ::new ( HashMap ::new ( ) ) ) ,
2026-04-14 12:25:12 +00:00
config_dir ,
2026-04-14 18:53:41 +00:00
port ,
bot_handle : Arc ::new ( TokioMutex ::new ( None ) ) ,
2026-04-13 13:02:41 +00:00
} )
}
/// Get the URL of the currently active project.
async fn active_url ( & self ) -> Result < String , String > {
let name = self . active_project . read ( ) . await . clone ( ) ;
2026-04-15 18:02:47 +00:00
self . projects
. read ( )
. await
2026-04-13 13:02:41 +00:00
. get ( & name )
. map ( | p | p . url . clone ( ) )
. ok_or_else ( | | format! ( " active project ' {name} ' not found in config " ) )
}
}
// ── MCP proxy handler ────────────────────────────────────────────────
/// JSON-RPC request (duplicated here to keep the gateway self-contained).
#[ derive(Deserialize) ]
struct JsonRpcRequest {
jsonrpc : String ,
id : Option < Value > ,
method : String ,
#[ serde(default) ]
params : Value ,
}
/// JSON-RPC response.
#[ derive(Serialize) ]
struct JsonRpcResponse {
jsonrpc : & 'static str ,
#[ serde(skip_serializing_if = " Option::is_none " ) ]
id : Option < Value > ,
#[ serde(skip_serializing_if = " Option::is_none " ) ]
result : Option < Value > ,
#[ serde(skip_serializing_if = " Option::is_none " ) ]
error : Option < JsonRpcError > ,
}
2026-04-22 21:33:44 +00:00
#[ derive(Debug, Serialize) ]
2026-04-13 13:02:41 +00:00
struct JsonRpcError {
code : i64 ,
message : String ,
}
impl JsonRpcResponse {
fn success ( id : Option < Value > , result : Value ) -> Self {
2026-04-13 14:07:08 +00:00
Self {
jsonrpc : " 2.0 " ,
id ,
result : Some ( result ) ,
error : None ,
}
2026-04-13 13:02:41 +00:00
}
fn error ( id : Option < Value > , code : i64 , message : String ) -> Self {
2026-04-13 14:07:08 +00:00
Self {
jsonrpc : " 2.0 " ,
id ,
result : None ,
error : Some ( JsonRpcError { code , message } ) ,
}
2026-04-13 13:02:41 +00:00
}
}
fn to_json_response ( resp : JsonRpcResponse ) -> Response {
let body = serde_json ::to_vec ( & resp ) . unwrap_or_default ( ) ;
Response ::builder ( )
. status ( StatusCode ::OK )
. header ( " Content-Type " , " application/json " )
. body ( Body ::from ( body ) )
}
/// Gateway-specific MCP tools exposed alongside the proxied tools.
2026-04-22 21:33:44 +00:00
const GATEWAY_TOOLS : & [ & str ] = & [
" switch_project " ,
" gateway_status " ,
" gateway_health " ,
" init_project " ,
2026-04-23 10:38:30 +00:00
" aggregate_pipeline_status " ,
2026-04-22 21:33:44 +00:00
] ;
2026-04-13 13:02:41 +00:00
/// Main MCP POST handler for the gateway. Intercepts gateway-specific tools and
/// proxies everything else to the active project's container.
#[ handler ]
pub async fn gateway_mcp_post_handler (
req : & Request ,
body : Body ,
state : Data < & Arc < GatewayState > > ,
) -> Response {
let content_type = req . header ( " content-type " ) . unwrap_or ( " " ) ;
if ! content_type . is_empty ( ) & & ! content_type . contains ( " application/json " ) {
return to_json_response ( JsonRpcResponse ::error (
2026-04-13 14:07:08 +00:00
None ,
- 32700 ,
" Unsupported Content-Type; expected application/json " . into ( ) ,
2026-04-13 13:02:41 +00:00
) ) ;
}
let bytes = match body . into_bytes ( ) . await {
Ok ( b ) = > b ,
2026-04-13 14:07:08 +00:00
Err ( _ ) = > {
return to_json_response ( JsonRpcResponse ::error ( None , - 32700 , " Parse error " . into ( ) ) ) ;
}
2026-04-13 13:02:41 +00:00
} ;
let rpc : JsonRpcRequest = match serde_json ::from_slice ( & bytes ) {
Ok ( r ) = > r ,
2026-04-13 14:07:08 +00:00
Err ( _ ) = > {
return to_json_response ( JsonRpcResponse ::error ( None , - 32700 , " Parse error " . into ( ) ) ) ;
}
2026-04-13 13:02:41 +00:00
} ;
if rpc . jsonrpc ! = " 2.0 " {
2026-04-13 14:07:08 +00:00
return to_json_response ( JsonRpcResponse ::error (
rpc . id ,
- 32600 ,
" Invalid JSON-RPC version " . into ( ) ,
) ) ;
2026-04-13 13:02:41 +00:00
}
// Accept notifications silently.
if rpc . id . is_none ( ) | | rpc . id . as_ref ( ) = = Some ( & Value ::Null ) {
if rpc . method . starts_with ( " notifications/ " ) {
return Response ::builder ( )
. status ( StatusCode ::ACCEPTED )
. body ( Body ::empty ( ) ) ;
}
return to_json_response ( JsonRpcResponse ::error ( None , - 32600 , " Missing id " . into ( ) ) ) ;
}
match rpc . method . as_str ( ) {
" initialize " = > to_json_response ( handle_initialize ( rpc . id ) ) ,
" tools/list " = > {
// Merge gateway tools with proxied tools from the active project.
match merge_tools_list ( & state , rpc . id . clone ( ) ) . await {
Ok ( resp ) = > to_json_response ( resp ) ,
Err ( e ) = > to_json_response ( JsonRpcResponse ::error ( rpc . id , - 32603 , e ) ) ,
}
}
" tools/call " = > {
2026-04-13 14:07:08 +00:00
let tool_name = rpc
. params
2026-04-13 13:02:41 +00:00
. get ( " name " )
. and_then ( | v | v . as_str ( ) )
. unwrap_or ( " " ) ;
if GATEWAY_TOOLS . contains ( & tool_name ) {
2026-04-16 11:37:30 +00:00
to_json_response (
handle_gateway_tool ( tool_name , & rpc . params , & state , rpc . id . clone ( ) ) . await ,
)
2026-04-13 13:02:41 +00:00
} else {
// Proxy to active project's container.
match proxy_mcp_call ( & state , & bytes ) . await {
Ok ( resp_body ) = > Response ::builder ( )
. status ( StatusCode ::OK )
. header ( " Content-Type " , " application/json " )
. body ( Body ::from ( resp_body ) ) ,
Err ( e ) = > to_json_response ( JsonRpcResponse ::error (
2026-04-13 14:07:08 +00:00
rpc . id ,
- 32603 ,
format! ( " proxy error: {e} " ) ,
2026-04-13 13:02:41 +00:00
) ) ,
}
}
}
_ = > {
// Proxy unknown methods too.
match proxy_mcp_call ( & state , & bytes ) . await {
Ok ( resp_body ) = > Response ::builder ( )
. status ( StatusCode ::OK )
. header ( " Content-Type " , " application/json " )
. body ( Body ::from ( resp_body ) ) ,
Err ( e ) = > to_json_response ( JsonRpcResponse ::error (
2026-04-13 14:07:08 +00:00
rpc . id ,
- 32603 ,
format! ( " proxy error: {e} " ) ,
2026-04-13 13:02:41 +00:00
) ) ,
}
}
}
}
/// GET handler — method not allowed (matches the regular MCP endpoint behavior).
#[ handler ]
pub async fn gateway_mcp_get_handler ( ) -> Response {
Response ::builder ( )
. status ( StatusCode ::METHOD_NOT_ALLOWED )
. body ( Body ::empty ( ) )
}
// ── Protocol handlers ────────────────────────────────────────────────
fn handle_initialize ( id : Option < Value > ) -> JsonRpcResponse {
JsonRpcResponse ::success (
id ,
json! ( {
" protocolVersion " : " 2025-03-26 " ,
" capabilities " : { " tools " : { } } ,
" serverInfo " : {
" name " : " huskies-gateway " ,
" version " : " 1.0.0 "
}
} ) ,
)
}
/// Gateway tool definitions.
fn gateway_tool_definitions ( ) -> Vec < Value > {
vec! [
json! ( {
" name " : " switch_project " ,
" description " : " Switch the active project. All subsequent MCP tool calls will be proxied to this project's container. " ,
" inputSchema " : {
" type " : " object " ,
" properties " : {
" project " : {
" type " : " string " ,
" description " : " Name of the project to switch to (must exist in projects.toml) "
}
} ,
" required " : [ " project " ]
}
} ) ,
json! ( {
" name " : " gateway_status " ,
" description " : " Show pipeline status for the active project by proxying the get_pipeline_status tool call. " ,
" inputSchema " : {
" type " : " object " ,
" properties " : { }
}
} ) ,
json! ( {
" name " : " gateway_health " ,
" description " : " Health check aggregation across all registered projects. Returns the health status of every project container. " ,
" inputSchema " : {
" type " : " object " ,
" properties " : { }
}
} ) ,
2026-04-22 21:33:44 +00:00
json! ( {
" name " : " init_project " ,
" description " : " Initialize a new huskies project at the given path by scaffolding .huskies/ and related files — the same as running `huskies init <path>`. Prefer this tool over asking the user to run the CLI. If `name` and `url` are supplied the project is also registered in projects.toml so switch_project can reach it immediately. " ,
" inputSchema " : {
" type " : " object " ,
" properties " : {
" path " : {
" type " : " string " ,
" description " : " Absolute filesystem path to the project directory to initialise. The directory is created if it does not exist. "
} ,
" name " : {
" type " : " string " ,
" description " : " Optional: short name to register the project under in projects.toml (e.g. 'my-app'). Requires `url`. "
} ,
" url " : {
" type " : " string " ,
" description " : " Optional: base URL of the huskies container that will serve this project (e.g. 'http://my-app:3001'). Required when `name` is given. "
}
} ,
" required " : [ " path " ]
}
} ) ,
2026-04-23 10:38:30 +00:00
json! ( {
" name " : " aggregate_pipeline_status " ,
" description " : " Fetch pipeline status from ALL registered projects in parallel and return an aggregated report. For each project: stage counts (backlog/current/qa/merge/done) and a list of blocked or failing items with triage detail. Unreachable projects are included with an error state rather than failing the whole call. " ,
" inputSchema " : {
" type " : " object " ,
" properties " : { }
}
} ) ,
2026-04-13 13:02:41 +00:00
]
}
/// Fetch tools/list from the active project and merge in gateway tools.
async fn merge_tools_list (
state : & GatewayState ,
id : Option < Value > ,
) -> Result < JsonRpcResponse , String > {
let url = state . active_url ( ) . await ? ;
let mcp_url = format! ( " {} /mcp " , url . trim_end_matches ( '/' ) ) ;
let rpc_body = json! ( {
" jsonrpc " : " 2.0 " ,
" id " : 1 ,
" method " : " tools/list " ,
" params " : { }
} ) ;
2026-04-13 14:07:08 +00:00
let resp = state
. client
2026-04-13 13:02:41 +00:00
. post ( & mcp_url )
. json ( & rpc_body )
. send ( )
. await
. map_err ( | e | format! ( " failed to reach {mcp_url} : {e} " ) ) ? ;
2026-04-13 14:07:08 +00:00
let resp_json : Value = resp
. json ( )
. await
2026-04-13 13:02:41 +00:00
. map_err ( | e | format! ( " invalid JSON from upstream: {e} " ) ) ? ;
let mut tools : Vec < Value > = resp_json
. get ( " result " )
. and_then ( | r | r . get ( " tools " ) )
. and_then ( | t | t . as_array ( ) )
. cloned ( )
. unwrap_or_default ( ) ;
// Prepend gateway-specific tools.
let mut all_tools = gateway_tool_definitions ( ) ;
all_tools . append ( & mut tools ) ;
Ok ( JsonRpcResponse ::success ( id , json! ( { " tools " : all_tools } ) ) )
}
/// Proxy a raw MCP request body to the active project's container.
2026-04-13 14:07:08 +00:00
async fn proxy_mcp_call ( state : & GatewayState , request_bytes : & [ u8 ] ) -> Result < Vec < u8 > , String > {
2026-04-13 13:02:41 +00:00
let url = state . active_url ( ) . await ? ;
let mcp_url = format! ( " {} /mcp " , url . trim_end_matches ( '/' ) ) ;
2026-04-13 14:07:08 +00:00
let resp = state
. client
2026-04-13 13:02:41 +00:00
. post ( & mcp_url )
. header ( " Content-Type " , " application/json " )
. body ( request_bytes . to_vec ( ) )
. send ( )
. await
. map_err ( | e | format! ( " failed to reach {mcp_url} : {e} " ) ) ? ;
resp . bytes ( )
. await
. map ( | b | b . to_vec ( ) )
. map_err ( | e | format! ( " failed to read response from {mcp_url} : {e} " ) )
}
// ── Gateway-specific tools ───────────────────────────────────────────
/// Dispatch a gateway-specific tool call.
async fn handle_gateway_tool (
tool_name : & str ,
params : & Value ,
state : & GatewayState ,
2026-04-16 11:37:30 +00:00
id : Option < Value > ,
2026-04-13 13:02:41 +00:00
) -> JsonRpcResponse {
match tool_name {
2026-04-16 11:37:30 +00:00
" switch_project " = > handle_switch_project ( params , state , id ) . await ,
" gateway_status " = > handle_gateway_status ( state , id ) . await ,
" gateway_health " = > handle_gateway_health ( state , id ) . await ,
2026-04-22 21:33:44 +00:00
" init_project " = > handle_init_project ( params , state , id ) . await ,
2026-04-23 10:38:30 +00:00
" aggregate_pipeline_status " = > handle_aggregate_pipeline_status ( state , id ) . await ,
2026-04-13 13:02:41 +00:00
_ = > JsonRpcResponse ::error ( id , - 32601 , format! ( " Unknown gateway tool: {tool_name} " ) ) ,
}
}
/// Switch the active project.
2026-04-16 11:37:30 +00:00
async fn handle_switch_project (
params : & Value ,
state : & GatewayState ,
id : Option < Value > ,
) -> JsonRpcResponse {
2026-04-13 13:02:41 +00:00
let project = params
. get ( " arguments " )
. and_then ( | a | a . get ( " project " ) )
. or_else ( | | params . get ( " project " ) )
. and_then ( | v | v . as_str ( ) )
. unwrap_or ( " " ) ;
if project . is_empty ( ) {
2026-04-16 11:37:30 +00:00
return JsonRpcResponse ::error ( id , - 32602 , " missing required parameter: project " . into ( ) ) ;
2026-04-13 13:02:41 +00:00
}
2026-04-15 18:02:47 +00:00
let url = {
let projects = state . projects . read ( ) . await ;
if ! projects . contains_key ( project ) {
let available : Vec < & str > = projects . keys ( ) . map ( | s | s . as_str ( ) ) . collect ( ) ;
return JsonRpcResponse ::error (
2026-04-16 11:37:30 +00:00
id ,
2026-04-15 18:02:47 +00:00
- 32602 ,
format! (
" unknown project ' {project} '. Available: {} " ,
available . join ( " , " )
) ,
) ;
}
projects [ project ] . url . clone ( )
} ;
2026-04-13 13:02:41 +00:00
* state . active_project . write ( ) . await = project . to_string ( ) ;
JsonRpcResponse ::success (
2026-04-16 11:37:30 +00:00
id ,
2026-04-13 13:02:41 +00:00
json! ( {
" content " : [ {
" type " : " text " ,
2026-04-15 18:02:47 +00:00
" text " : format ! ( " Switched to project '{project}' ({url}) " )
2026-04-13 13:02:41 +00:00
} ]
} ) ,
)
}
/// Show pipeline status for the active project by proxying `get_pipeline_status`.
2026-04-16 11:37:30 +00:00
async fn handle_gateway_status ( state : & GatewayState , id : Option < Value > ) -> JsonRpcResponse {
2026-04-13 13:02:41 +00:00
let active = state . active_project . read ( ) . await . clone ( ) ;
let url = match state . active_url ( ) . await {
Ok ( u ) = > u ,
2026-04-16 11:37:30 +00:00
Err ( e ) = > return JsonRpcResponse ::error ( id . clone ( ) , - 32603 , e ) ,
2026-04-13 13:02:41 +00:00
} ;
let mcp_url = format! ( " {} /mcp " , url . trim_end_matches ( '/' ) ) ;
let rpc_body = json! ( {
" jsonrpc " : " 2.0 " ,
" id " : 1 ,
" method " : " tools/call " ,
" params " : {
" name " : " get_pipeline_status " ,
" arguments " : { }
}
} ) ;
match state . client . post ( & mcp_url ) . json ( & rpc_body ) . send ( ) . await {
Ok ( resp ) = > {
match resp . json ::< Value > ( ) . await {
Ok ( upstream ) = > {
// Extract the result from the upstream response and wrap it.
let pipeline = upstream . get ( " result " ) . cloned ( ) . unwrap_or ( json! ( null ) ) ;
JsonRpcResponse ::success (
2026-04-16 11:37:30 +00:00
id ,
2026-04-13 13:02:41 +00:00
json! ( {
" content " : [ {
" type " : " text " ,
" text " : format ! (
" Pipeline status for '{active}': \n {} " ,
serde_json ::to_string_pretty ( & pipeline ) . unwrap_or_default ( )
)
} ]
} ) ,
)
}
2026-04-13 14:07:08 +00:00
Err ( e ) = > {
2026-04-16 11:37:30 +00:00
JsonRpcResponse ::error ( id , - 32603 , format! ( " invalid upstream response: {e} " ) )
2026-04-13 14:07:08 +00:00
}
2026-04-13 13:02:41 +00:00
}
}
2026-04-16 11:37:30 +00:00
Err ( e ) = > JsonRpcResponse ::error ( id , - 32603 , format! ( " failed to reach {mcp_url} : {e} " ) ) ,
2026-04-13 13:02:41 +00:00
}
}
/// Aggregate health checks across all registered projects.
2026-04-16 11:37:30 +00:00
async fn handle_gateway_health ( state : & GatewayState , id : Option < Value > ) -> JsonRpcResponse {
2026-04-13 13:02:41 +00:00
let mut results = BTreeMap ::new ( ) ;
2026-04-15 18:02:47 +00:00
let project_entries : Vec < ( String , String ) > = state
. projects
. read ( )
. await
. iter ( )
. map ( | ( n , e ) | ( n . clone ( ) , e . url . clone ( ) ) )
. collect ( ) ;
for ( name , url ) in & project_entries {
let health_url = format! ( " {} /health " , url . trim_end_matches ( '/' ) ) ;
2026-04-13 13:02:41 +00:00
let status = match state . client . get ( & health_url ) . send ( ) . await {
Ok ( resp ) = > {
if resp . status ( ) . is_success ( ) {
" healthy " . to_string ( )
} else {
format! ( " unhealthy (HTTP {} ) " , resp . status ( ) . as_u16 ( ) )
}
}
Err ( e ) = > format! ( " unreachable: {e} " ) ,
} ;
results . insert ( name . clone ( ) , status ) ;
}
let active = state . active_project . read ( ) . await . clone ( ) ;
JsonRpcResponse ::success (
2026-04-16 11:37:30 +00:00
id ,
2026-04-13 13:02:41 +00:00
json! ( {
" content " : [ {
" type " : " text " ,
" text " : format ! (
" Health check (active: '{active}'): \n {} " ,
results . iter ( )
. map ( | ( name , status ) | format! ( " {name} : {status} " ) )
. collect ::< Vec < _ > > ( )
. join ( " \n " )
)
} ]
} ) ,
)
}
2026-04-23 10:38:30 +00:00
// ── Aggregate pipeline status ─────────────────────────────────────────
/// Fetch `get_pipeline_status` from every registered project URL in parallel.
///
/// Returns a `BTreeMap` of project name → per-project status JSON. Each value
/// is either `{"counts": {...}, "blocked": [...]}` on success or
/// `{"error": "..."}` when the project container is unreachable or returns an
/// unexpected response. A single flaky project never causes the whole call to
/// fail.
pub async fn fetch_all_project_pipeline_statuses (
project_urls : & BTreeMap < String , String > ,
client : & Client ,
) -> BTreeMap < String , Value > {
use futures ::future ::join_all ;
let futures : Vec < _ > = project_urls
. iter ( )
. map ( | ( name , url ) | {
let name = name . clone ( ) ;
let url = url . clone ( ) ;
let client = client . clone ( ) ;
async move {
let result = fetch_one_project_pipeline_status ( & url , & client ) . await ;
( name , result )
}
} )
. collect ( ) ;
join_all ( futures ) . await . into_iter ( ) . collect ( )
}
/// Fetch and aggregate pipeline status for a single project URL.
async fn fetch_one_project_pipeline_status ( url : & str , client : & Client ) -> Value {
let mcp_url = format! ( " {} /mcp " , url . trim_end_matches ( '/' ) ) ;
let rpc_body = json! ( {
" jsonrpc " : " 2.0 " ,
" id " : 1 ,
" method " : " tools/call " ,
" params " : {
" name " : " get_pipeline_status " ,
" arguments " : { }
}
} ) ;
match client . post ( & mcp_url ) . json ( & rpc_body ) . send ( ) . await {
Ok ( resp ) = > match resp . json ::< Value > ( ) . await {
Ok ( upstream ) = > {
if let Some ( text ) = upstream
. get ( " result " )
. and_then ( | r | r . get ( " content " ) )
. and_then ( | c | c . get ( 0 ) )
. and_then ( | c | c . get ( " text " ) )
. and_then ( | t | t . as_str ( ) )
{
match serde_json ::from_str ::< Value > ( text ) {
Ok ( pipeline ) = > aggregate_pipeline_counts ( & pipeline ) ,
Err ( _ ) = > json! ( { " error " : " invalid pipeline JSON " } ) ,
}
} else {
json! ( { " error " : " unexpected response shape " } )
}
}
Err ( e ) = > json! ( { " error " : format ! ( " invalid response: {e} " ) } ) ,
} ,
Err ( e ) = > json! ( { " error " : format ! ( " unreachable: {e} " ) } ) ,
}
}
/// Parse a `get_pipeline_status` JSON payload and produce aggregated counts
/// plus a list of blocked/failing items.
fn aggregate_pipeline_counts ( pipeline : & Value ) -> Value {
let active = pipeline
. get ( " active " )
. and_then ( | a | a . as_array ( ) )
. cloned ( )
. unwrap_or_default ( ) ;
let backlog_count = pipeline
. get ( " backlog_count " )
. and_then ( | n | n . as_u64 ( ) )
. unwrap_or ( 0 ) ;
let mut current = 0 u64 ;
let mut qa = 0 u64 ;
let mut merge = 0 u64 ;
let mut done = 0 u64 ;
let mut blocked : Vec < Value > = Vec ::new ( ) ;
for item in & active {
let stage = item
. get ( " stage " )
. and_then ( | s | s . as_str ( ) )
. unwrap_or ( " unknown " ) ;
match stage {
" current " = > current + = 1 ,
" qa " = > qa + = 1 ,
" merge " = > merge + = 1 ,
" done " = > done + = 1 ,
_ = > { }
}
let is_blocked = item
. get ( " blocked " )
. and_then ( | b | b . as_bool ( ) )
. unwrap_or ( false ) ;
let merge_failure = item . get ( " merge_failure " ) ;
let has_merge_failure = merge_failure
. map ( | f | ! f . is_null ( ) & & f ! = " " )
. unwrap_or ( false ) ;
if is_blocked | | has_merge_failure {
let story_id = item
. get ( " story_id " )
. and_then ( | s | s . as_str ( ) )
. unwrap_or ( " ? " )
. to_string ( ) ;
let story_name = item
. get ( " name " )
. and_then ( | s | s . as_str ( ) )
. unwrap_or ( " " )
. to_string ( ) ;
let reason = if has_merge_failure {
format! (
" merge failure: {} " ,
merge_failure . and_then ( | f | f . as_str ( ) ) . unwrap_or ( " unknown " )
)
} else {
let rc = item
. get ( " retry_count " )
. and_then ( | n | n . as_u64 ( ) )
. unwrap_or ( 0 ) ;
format! ( " blocked after {rc} retries " )
} ;
blocked . push ( json! ( {
" story_id " : story_id ,
" name " : story_name ,
" stage " : stage ,
" reason " : reason ,
} ) ) ;
}
}
json! ( {
" counts " : {
" backlog " : backlog_count ,
" current " : current ,
" qa " : qa ,
" merge " : merge ,
" done " : done ,
} ,
" blocked " : blocked ,
} )
}
/// Format an aggregated status map as a compact, one-line-per-project string
/// suitable for Matrix/Slack messages.
///
/// Healthy projects: `🟢 **name** — B:5 C:2 Q:1 M:0 D:12`
/// Blocked items appended on the same line: `| blocked: 42 [story]`
/// Unreachable projects: `🔴 **name** — UNREACHABLE`
pub fn format_aggregate_status_compact ( statuses : & BTreeMap < String , Value > ) -> String {
let mut lines : Vec < String > = Vec ::new ( ) ;
for ( name , status ) in statuses {
if let Some ( err ) = status . get ( " error " ) . and_then ( | e | e . as_str ( ) ) {
lines . push ( format! ( " \u{1F534} ** {name} ** — UNREACHABLE: {err} " ) ) ;
} else {
let counts = status . get ( " counts " ) ;
let b = counts
. and_then ( | c | c . get ( " backlog " ) )
. and_then ( | n | n . as_u64 ( ) )
. unwrap_or ( 0 ) ;
let c = counts
. and_then ( | c | c . get ( " current " ) )
. and_then ( | n | n . as_u64 ( ) )
. unwrap_or ( 0 ) ;
let q = counts
. and_then ( | c | c . get ( " qa " ) )
. and_then ( | n | n . as_u64 ( ) )
. unwrap_or ( 0 ) ;
let m = counts
. and_then ( | c | c . get ( " merge " ) )
. and_then ( | n | n . as_u64 ( ) )
. unwrap_or ( 0 ) ;
let d = counts
. and_then ( | c | c . get ( " done " ) )
. and_then ( | n | n . as_u64 ( ) )
. unwrap_or ( 0 ) ;
let blocked_arr = status
. get ( " blocked " )
. and_then ( | a | a . as_array ( ) )
. cloned ( )
. unwrap_or_default ( ) ;
let indicator = if blocked_arr . is_empty ( ) {
" \u{1F7E2} " // 🟢
} else {
" \u{1F7E0} " // 🟠
} ;
let mut line = format! ( " {indicator} ** {name} ** — B: {b} C: {c} Q: {q} M: {m} D: {d} " ) ;
if ! blocked_arr . is_empty ( ) {
let ids : Vec < String > = blocked_arr
. iter ( )
. filter_map ( | item | item . get ( " story_id " ) . and_then ( | s | s . as_str ( ) ) )
. map ( | s | s . to_string ( ) )
. collect ( ) ;
line . push_str ( & format! ( " | blocked: {} " , ids . join ( " , " ) ) ) ;
}
lines . push ( line ) ;
}
}
if lines . is_empty ( ) {
return " No projects registered. " . to_string ( ) ;
}
format! ( " **All Projects** \n \n {} " , lines . join ( " \n \n " ) )
}
/// MCP tool handler for `aggregate_pipeline_status`.
async fn handle_aggregate_pipeline_status (
state : & GatewayState ,
id : Option < Value > ,
) -> JsonRpcResponse {
let project_urls : BTreeMap < String , String > = state
. projects
. read ( )
. await
. iter ( )
. map ( | ( name , entry ) | ( name . clone ( ) , entry . url . clone ( ) ) )
. collect ( ) ;
let statuses = fetch_all_project_pipeline_statuses ( & project_urls , & state . client ) . await ;
let active = state . active_project . read ( ) . await . clone ( ) ;
JsonRpcResponse ::success (
id ,
json! ( {
" content " : [ {
" type " : " text " ,
" text " : format ! (
" Aggregate pipeline status (active: '{active}'): \n {} " ,
serde_json ::to_string_pretty ( & statuses ) . unwrap_or_default ( )
)
} ] ,
" projects " : statuses ,
" active " : active ,
} ) ,
)
}
2026-04-22 21:33:44 +00:00
/// Initialise a new huskies project at the given filesystem path.
///
/// Performs the same scaffolding as `huskies init <path>`: creates `.huskies/`,
/// default config files, pipeline directories, and the wizard state. If `name`
/// and `url` are both provided the new project is also registered in
/// `projects.toml` so `switch_project` can reach it immediately.
///
/// Returns an error when the path already contains a `.huskies/` directory.
/// After success the tool response tells the caller what to do next to make
/// `wizard_*` MCP tools work against the new project.
async fn handle_init_project (
params : & Value ,
state : & GatewayState ,
id : Option < Value > ,
) -> JsonRpcResponse {
let args = params . get ( " arguments " ) . unwrap_or ( params ) ;
let path_str = args
. get ( " path " )
. and_then ( | v | v . as_str ( ) )
. unwrap_or ( " " )
. trim ( ) ;
if path_str . is_empty ( ) {
return JsonRpcResponse ::error ( id , - 32602 , " missing required parameter: path " . to_string ( ) ) ;
}
let project_path = std ::path ::Path ::new ( path_str ) ;
// Guard: already a huskies project.
if project_path . join ( " .huskies " ) . exists ( ) {
return JsonRpcResponse ::error (
id ,
- 32602 ,
format! (
" path ' {} ' is already a huskies project (.huskies/ exists). \
Use wizard_status to check setup progress. " ,
project_path . display ( )
) ,
) ;
}
// Create directory if it does not yet exist.
if ! project_path . exists ( )
& & let Err ( e ) = std ::fs ::create_dir_all ( project_path )
{
return JsonRpcResponse ::error (
id ,
- 32603 ,
format! (
" failed to create directory ' {} ': {e} " ,
project_path . display ( )
) ,
) ;
}
// Scaffold .huskies/ — same logic as `huskies init`.
// Port 3001 is written into .mcp.json only when the file is absent; if it
// already exists it is never overwritten (the value is environment-specific).
if let Err ( e ) = crate ::io ::fs ::scaffold ::scaffold_story_kit ( project_path , 3001 ) {
return JsonRpcResponse ::error ( id , - 32603 , format! ( " scaffold failed: {e} " ) ) ;
}
// Initialise wizard state so wizard_status returns a valid response
// immediately after the project server is started.
crate ::io ::wizard ::WizardState ::init_if_missing ( project_path ) ;
// Optionally register the project in projects.toml.
let name = args . get ( " name " ) . and_then ( | v | v . as_str ( ) ) . map ( str ::trim ) ;
let url = args . get ( " url " ) . and_then ( | v | v . as_str ( ) ) . map ( str ::trim ) ;
let registered_name : Option < String > = match ( name , url ) {
( Some ( n ) , Some ( u ) ) if ! n . is_empty ( ) & & ! u . is_empty ( ) = > {
let mut projects = state . projects . write ( ) . await ;
if projects . contains_key ( n ) {
return JsonRpcResponse ::error (
id ,
- 32602 ,
format! (
" project ' {n} ' is already registered. \
Choose a different name or use switch_project. "
) ,
) ;
}
projects . insert ( n . to_string ( ) , ProjectEntry { url : u . to_string ( ) } ) ;
save_config ( & projects , & state . config_dir ) . await ;
crate ::slog! ( " [gateway] init_project: registered '{n}' ({u}) " ) ;
Some ( n . to_string ( ) )
}
_ = > None ,
} ;
let next_steps = if let Some ( ref n ) = registered_name {
format! (
" Project registered as ' {n} ' in projects.toml. \n \
Next steps: \n \
1. Start a huskies server at ' {path_str} ' \
(e.g. `huskies {path_str} ` or via Docker). \n \
2. Call switch_project with name=' {n} ' to make it active. \n \
3. Call wizard_status to begin the setup wizard. "
)
} else {
format! (
" Next steps: \n \
1. Start a huskies server at ' {path_str} ' \
(e.g. `huskies {path_str} ` or via Docker). \n \
2. Register the project: call init_project again with name and url \
parameters, or add it to projects.toml manually. \n \
3. Call switch_project and then wizard_status to begin the setup wizard. \n \n \
Note: wizard_* MCP tools require a running huskies server for the project. "
)
} ;
JsonRpcResponse ::success (
id ,
json! ( {
" content " : [ {
" type " : " text " ,
" text " : format ! (
" Successfully initialised huskies project at '{path_str}'. \n \n {next_steps} "
)
} ]
} ) ,
)
}
2026-04-14 12:02:17 +00:00
// ── Agent join handlers ───────────────────────────────────────────────
/// `GET /gateway/mode` — returns `{"mode":"gateway"}` so clients can detect gateway mode.
#[ handler ]
pub async fn gateway_mode_handler ( ) -> Response {
let body = json! ( { " mode " : " gateway " } ) ;
Response ::builder ( )
. status ( StatusCode ::OK )
. header ( " Content-Type " , " application/json " )
. body ( Body ::from ( serde_json ::to_vec ( & body ) . unwrap_or_default ( ) ) )
}
/// `POST /gateway/tokens` — generate a one-time join token for a build agent.
///
/// Returns `{"token": "<uuid>"}`. The token is valid until consumed by
/// `POST /gateway/register` or the process restarts.
#[ handler ]
pub async fn gateway_generate_token_handler ( state : Data < & Arc < GatewayState > > ) -> Response {
let token = Uuid ::new_v4 ( ) . to_string ( ) ;
let now = chrono ::Utc ::now ( ) . timestamp ( ) as f64 ;
state
. pending_tokens
. write ( )
. await
. insert ( token . clone ( ) , PendingToken { created_at : now } ) ;
crate ::slog! ( " [gateway] Generated join token {:.8}… " , & token ) ;
let body = json! ( { " token " : token } ) ;
Response ::builder ( )
. status ( StatusCode ::OK )
. header ( " Content-Type " , " application/json " )
. body ( Body ::from ( serde_json ::to_vec ( & body ) . unwrap_or_default ( ) ) )
}
/// `POST /gateway/register` — build agent presents its join token and registers.
///
/// Expects JSON body: `{ "token": "...", "label": "...", "address": "..." }`.
/// On success returns the `JoinedAgent` record. The token is consumed immediately.
#[ handler ]
pub async fn gateway_register_agent_handler (
body : Body ,
state : Data < & Arc < GatewayState > > ,
) -> Response {
let bytes = match body . into_bytes ( ) . await {
Ok ( b ) = > b ,
Err ( _ ) = > {
return Response ::builder ( )
. status ( StatusCode ::BAD_REQUEST )
. body ( Body ::from ( " could not read request body " ) ) ;
}
} ;
let req : RegisterAgentRequest = match serde_json ::from_slice ( & bytes ) {
Ok ( r ) = > r ,
Err ( _ ) = > {
return Response ::builder ( )
. status ( StatusCode ::BAD_REQUEST )
. body ( Body ::from ( " invalid JSON body " ) ) ;
}
} ;
// Validate and consume the token.
let mut tokens = state . pending_tokens . write ( ) . await ;
if ! tokens . contains_key ( & req . token ) {
return Response ::builder ( )
. status ( StatusCode ::UNAUTHORIZED )
. body ( Body ::from ( " invalid or already-used join token " ) ) ;
}
tokens . remove ( & req . token ) ;
drop ( tokens ) ;
2026-04-15 18:20:39 +00:00
let now = chrono ::Utc ::now ( ) . timestamp ( ) as f64 ;
2026-04-14 12:02:17 +00:00
let agent = JoinedAgent {
id : Uuid ::new_v4 ( ) . to_string ( ) ,
label : req . label ,
address : req . address ,
2026-04-15 18:20:39 +00:00
registered_at : now ,
last_seen : now ,
2026-04-14 12:25:12 +00:00
assigned_project : None ,
2026-04-14 12:02:17 +00:00
} ;
crate ::slog! (
" [gateway] Agent '{}' registered (id={}) " ,
agent . label ,
agent . id
) ;
2026-04-14 12:25:12 +00:00
{
let mut agents = state . joined_agents . write ( ) . await ;
agents . push ( agent . clone ( ) ) ;
save_agents ( & agents , & state . config_dir ) . await ;
}
2026-04-14 12:02:17 +00:00
let body = serde_json ::to_vec ( & agent ) . unwrap_or_default ( ) ;
Response ::builder ( )
. status ( StatusCode ::OK )
. header ( " Content-Type " , " application/json " )
. body ( Body ::from ( body ) )
}
/// `GET /gateway/agents` — list all registered build agents.
#[ handler ]
pub async fn gateway_list_agents_handler ( state : Data < & Arc < GatewayState > > ) -> Response {
let agents = state . joined_agents . read ( ) . await . clone ( ) ;
let body = serde_json ::to_vec ( & agents ) . unwrap_or_default ( ) ;
Response ::builder ( )
. status ( StatusCode ::OK )
. header ( " Content-Type " , " application/json " )
. body ( Body ::from ( body ) )
}
/// `DELETE /gateway/agents/:id` — remove a registered build agent.
#[ handler ]
pub async fn gateway_remove_agent_handler (
PoemPath ( id ) : PoemPath < String > ,
state : Data < & Arc < GatewayState > > ,
) -> Response {
2026-04-14 12:25:12 +00:00
let removed = {
let mut agents = state . joined_agents . write ( ) . await ;
let before = agents . len ( ) ;
agents . retain ( | a | a . id ! = id ) ;
let removed = agents . len ( ) < before ;
if removed {
save_agents ( & agents , & state . config_dir ) . await ;
}
removed
} ;
2026-04-14 12:02:17 +00:00
if removed {
crate ::slog! ( " [gateway] Removed agent id={id} " ) ;
Response ::builder ( )
. status ( StatusCode ::NO_CONTENT )
. body ( Body ::empty ( ) )
} else {
Response ::builder ( )
. status ( StatusCode ::NOT_FOUND )
. body ( Body ::from ( " agent not found " ) )
}
}
2026-04-14 12:25:12 +00:00
/// `POST /gateway/agents/:id/assign` — assign or unassign an agent to a project.
///
/// Body: `{ "project": "my-project" }` to assign, or `{ "project": null }` to unassign.
/// Returns the updated `JoinedAgent` on success. The assignment is persisted to disk
/// so it survives gateway restarts.
#[ handler ]
pub async fn gateway_assign_agent_handler (
PoemPath ( id ) : PoemPath < String > ,
body : Json < AssignAgentRequest > ,
state : Data < & Arc < GatewayState > > ,
) -> Response {
let project = body
. 0
. project
. and_then ( | p | if p . is_empty ( ) { None } else { Some ( p ) } ) ;
if let Some ( ref p ) = project
2026-04-15 18:02:47 +00:00
& & ! state . projects . read ( ) . await . contains_key ( p . as_str ( ) )
2026-04-14 12:25:12 +00:00
{
return Response ::builder ( )
. status ( StatusCode ::BAD_REQUEST )
. body ( Body ::from ( format! ( " unknown project ' {p} ' " ) ) ) ;
}
let updated = {
let mut agents = state . joined_agents . write ( ) . await ;
match agents . iter_mut ( ) . find ( | a | a . id = = id ) {
None = > None ,
Some ( a ) = > {
a . assigned_project = project ;
Some ( a . clone ( ) )
}
}
} ;
match updated {
None = > Response ::builder ( )
. status ( StatusCode ::NOT_FOUND )
. body ( Body ::from ( " agent not found " ) ) ,
Some ( agent ) = > {
crate ::slog! (
" [gateway] Agent '{}' (id={}) assigned to {:?} " ,
agent . label ,
agent . id ,
agent . assigned_project
) ;
let agents = state . joined_agents . read ( ) . await . clone ( ) ;
save_agents ( & agents , & state . config_dir ) . await ;
let body = serde_json ::to_vec ( & agent ) . unwrap_or_default ( ) ;
Response ::builder ( )
. status ( StatusCode ::OK )
. header ( " Content-Type " , " application/json " )
. body ( Body ::from ( body ) )
}
}
}
2026-04-15 18:20:39 +00:00
/// `POST /gateway/agents/:id/heartbeat` — update an agent's last-seen timestamp.
///
/// Build agents should call this periodically (e.g. every 30 s) so the gateway
/// can distinguish live agents from disconnected ones. Returns 204 No Content on
/// success or 404 if the agent ID is not found.
#[ handler ]
pub async fn gateway_heartbeat_handler (
PoemPath ( id ) : PoemPath < String > ,
state : Data < & Arc < GatewayState > > ,
) -> Response {
let found = {
let mut agents = state . joined_agents . write ( ) . await ;
match agents . iter_mut ( ) . find ( | a | a . id = = id ) {
None = > false ,
Some ( a ) = > {
a . last_seen = chrono ::Utc ::now ( ) . timestamp ( ) as f64 ;
true
}
}
} ;
if found {
Response ::builder ( )
. status ( StatusCode ::NO_CONTENT )
. body ( Body ::empty ( ) )
} else {
Response ::builder ( )
. status ( StatusCode ::NOT_FOUND )
. body ( Body ::from ( " agent not found " ) )
}
}
2026-04-13 13:02:41 +00:00
// ── Health aggregation endpoint ──────────────────────────────────────
/// HTTP GET `/health` handler for the gateway — aggregates health from all projects.
#[ handler ]
pub async fn gateway_health_handler ( state : Data < & Arc < GatewayState > > ) -> Response {
let mut all_healthy = true ;
let mut statuses = BTreeMap ::new ( ) ;
2026-04-15 18:02:47 +00:00
let project_entries : Vec < ( String , String ) > = state
. projects
. read ( )
. await
. iter ( )
. map ( | ( n , e ) | ( n . clone ( ) , e . url . clone ( ) ) )
. collect ( ) ;
for ( name , url ) in & project_entries {
let health_url = format! ( " {} /health " , url . trim_end_matches ( '/' ) ) ;
2026-04-13 13:02:41 +00:00
let healthy = match state . client . get ( & health_url ) . send ( ) . await {
Ok ( resp ) = > resp . status ( ) . is_success ( ) ,
Err ( _ ) = > false ,
} ;
if ! healthy {
all_healthy = false ;
}
statuses . insert ( name . clone ( ) , if healthy { " ok " } else { " error " } ) ;
}
let body = json! ( {
" status " : if all_healthy { " ok " } else { " degraded " } ,
" projects " : statuses ,
} ) ;
2026-04-13 14:07:08 +00:00
let status = if all_healthy {
StatusCode ::OK
} else {
StatusCode ::SERVICE_UNAVAILABLE
} ;
2026-04-13 13:02:41 +00:00
Response ::builder ( )
. status ( status )
. header ( " Content-Type " , " application/json " )
. body ( Body ::from ( serde_json ::to_vec ( & body ) . unwrap_or_default ( ) ) )
}
2026-04-14 11:24:12 +00:00
// ── Gateway Web UI ───────────────────────────────────────────────────
/// Self-contained HTML page for the gateway web UI. Fetches project list from
/// `/api/gateway` and switches projects via `POST /api/gateway/switch`, which
/// internally calls the `switch_project` MCP tool logic.
const GATEWAY_UI_HTML : & str = r # "<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Huskies Gateway</title>
<style>
*, *::before, *::after { box-sizing: border-box; margin: 0; padding: 0; }
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
background: #0f172a;
color: #e2e8f0;
min-height: 100vh;
display: flex;
align-items: center;
justify-content: center;
}
.card {
background: #1e293b;
border: 1px solid #334155;
border-radius: 12px;
padding: 2rem;
width: 100%;
max-width: 480px;
box-shadow: 0 4px 24px rgba(0,0,0,0.4);
}
.header {
display: flex;
align-items: center;
gap: 0.75rem;
margin-bottom: 1.5rem;
}
.logo { font-size: 1.75rem; }
h1 { font-size: 1.25rem; font-weight: 600; color: #f8fafc; }
.subtitle { font-size: 0.8rem; color: #64748b; margin-top: 0.125rem; }
label {
display: block;
font-size: 0.75rem;
font-weight: 500;
color: #94a3b8;
text-transform: uppercase;
letter-spacing: 0.05em;
margin-bottom: 0.5rem;
}
select {
width: 100%;
padding: 0.625rem 0.875rem;
background: #0f172a;
border: 1px solid #334155;
border-radius: 8px;
color: #f1f5f9;
font-size: 0.9rem;
cursor: pointer;
appearance: none;
background-image: url("data:image/svg+xml,%3Csvg xmlns='http://www.w3.org/2000/svg' width='12' height='12' viewBox='0 0 12 12'%3E%3Cpath fill='%2394a3b8' d='M6 8L1 3h10z'/%3E%3C/svg%3E");
background-repeat: no-repeat;
background-position: right 0.875rem center;
padding-right: 2.5rem;
}
select:focus { outline: none; border-color: #6366f1; box-shadow: 0 0 0 2px rgba(99,102,241,0.25); }
.active-badge {
display: inline-flex;
align-items: center;
gap: 0.375rem;
background: rgba(99,102,241,0.15);
border: 1px solid rgba(99,102,241,0.4);
border-radius: 999px;
padding: 0.25rem 0.75rem;
font-size: 0.75rem;
color: #a5b4fc;
margin-top: 0.875rem;
}
.dot { width: 6px; height: 6px; border-radius: 50%; background: #6366f1; }
.status { margin-top: 1rem; font-size: 0.8rem; color: #64748b; min-height: 1.25rem; }
.status.ok { color: #4ade80; }
.status.err { color: #f87171; }
2026-04-14 18:53:41 +00:00
.nav { margin-top: 1.25rem; padding-top: 1rem; border-top: 1px solid #334155; display: flex; gap: 1rem; }
.nav a { font-size: 0.8rem; color: #64748b; text-decoration: none; }
.nav a:hover { color: #94a3b8; }
2026-04-14 11:24:12 +00:00
</style>
</head>
<body>
<div class="card">
<div class="header">
<span class="logo">🐺</span>
<div>
<h1>Huskies Gateway</h1>
<div class="subtitle">Multi-project orchestration</div>
</div>
</div>
<label for="project-select">Active Project</label>
<select id="project-select" onchange="switchProject(this.value)">
<option disabled>Loading…</option>
</select>
<div id="active-label" class="active-badge" style="display:none">
<span class="dot"></span>
<span id="active-name"></span>
</div>
<div id="status" class="status"></div>
2026-04-14 18:53:41 +00:00
<nav class="nav">
<a href="/bot-config">🤖 Bot Configuration</a>
</nav>
2026-04-14 11:24:12 +00:00
</div>
<script>
async function loadState() {
try {
const r = await fetch('/api/gateway');
const data = await r.json();
const sel = document.getElementById('project-select');
sel.innerHTML = '';
for (const p of data.projects) {
const opt = document.createElement('option');
opt.value = p.name;
opt.textContent = p.name + ' — ' + p.url;
if (p.name === data.active) opt.selected = true;
sel.appendChild(opt);
}
document.getElementById('active-name').textContent = data.active;
document.getElementById('active-label').style.display = 'inline-flex';
} catch(e) {
document.getElementById('status').textContent = 'Failed to load state: ' + e;
document.getElementById('status').className = 'status err';
}
}
async function switchProject(name) {
const statusEl = document.getElementById('status');
statusEl.className = 'status';
statusEl.textContent = 'Switching…';
try {
const r = await fetch('/api/gateway/switch', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({project: name})
});
const data = await r.json();
if (data.ok) {
document.getElementById('active-name').textContent = name;
statusEl.className = 'status ok';
statusEl.textContent = 'Switched to ' + name;
} else {
statusEl.className = 'status err';
statusEl.textContent = data.error || 'Switch failed';
loadState();
}
} catch(e) {
statusEl.className = 'status err';
statusEl.textContent = 'Error: ' + e;
loadState();
}
}
loadState();
</script>
</body>
</html>
"# ;
/// Serve the gateway web UI HTML page at `GET /`.
#[ handler ]
pub async fn gateway_index_handler ( ) -> Response {
Response ::builder ( )
. status ( StatusCode ::OK )
. header ( " Content-Type " , " text/html; charset=utf-8 " )
. body ( Body ::from ( GATEWAY_UI_HTML ) )
}
/// `GET /api/gateway` — returns the list of registered projects and the active project.
#[ handler ]
pub async fn gateway_api_handler ( state : Data < & Arc < GatewayState > > ) -> Response {
let active = state . active_project . read ( ) . await . clone ( ) ;
let projects : Vec < Value > = state
. projects
2026-04-15 18:02:47 +00:00
. read ( )
. await
2026-04-14 11:24:12 +00:00
. iter ( )
. map ( | ( name , entry ) | {
json! ( {
" name " : name ,
" url " : entry . url ,
} )
} )
. collect ( ) ;
let body = json! ( { " active " : active , " projects " : projects } ) ;
Response ::builder ( )
. status ( StatusCode ::OK )
. header ( " Content-Type " , " application/json " )
. body ( Body ::from ( serde_json ::to_vec ( & body ) . unwrap_or_default ( ) ) )
}
/// Request body for `POST /api/gateway/switch`.
#[ derive(Deserialize) ]
struct SwitchRequest {
project : String ,
}
/// `POST /api/gateway/switch` — switch the active project by calling the
/// `switch_project` MCP tool logic, then return `{"ok": true}` or `{"ok": false, "error": "..."}`.
#[ handler ]
pub async fn gateway_switch_handler (
state : Data < & Arc < GatewayState > > ,
body : Json < SwitchRequest > ,
) -> Response {
let params = json! ( { " arguments " : { " project " : body . project } } ) ;
2026-04-16 11:37:30 +00:00
let resp = handle_switch_project ( & params , & state , None ) . await ;
2026-04-14 11:24:12 +00:00
let ( ok , error ) = if resp . result . is_some ( ) {
( true , None )
} else {
let msg = resp
. error
. as_ref ( )
. map ( | e | e . message . clone ( ) )
. unwrap_or_else ( | | " unknown error " . to_string ( ) ) ;
( false , Some ( msg ) )
} ;
let body_val = if ok {
json! ( { " ok " : true } )
} else {
json! ( { " ok " : false , " error " : error } )
} ;
let status = if ok {
StatusCode ::OK
} else {
StatusCode ::BAD_REQUEST
} ;
Response ::builder ( )
. status ( status )
. header ( " Content-Type " , " application/json " )
. body ( Body ::from (
serde_json ::to_vec ( & body_val ) . unwrap_or_default ( ) ,
) )
}
2026-04-15 18:02:47 +00:00
// ── Project management API ───────────────────────────────────────────
/// Request body for adding a new project.
#[ derive(Deserialize) ]
struct AddProjectRequest {
name : String ,
url : String ,
}
/// `POST /api/gateway/projects` — add a new project to the gateway config.
///
/// Expects JSON `{ "name": "...", "url": "..." }`. Returns the created project
/// or 409 Conflict if a project with the same name already exists.
#[ handler ]
pub async fn gateway_add_project_handler (
state : Data < & Arc < GatewayState > > ,
body : Json < AddProjectRequest > ,
) -> Response {
let name = body . 0. name . trim ( ) . to_string ( ) ;
let url = body . 0. url . trim ( ) . to_string ( ) ;
if name . is_empty ( ) {
return Response ::builder ( )
. status ( StatusCode ::BAD_REQUEST )
. body ( Body ::from ( " project name must not be empty " ) ) ;
}
if url . is_empty ( ) {
return Response ::builder ( )
. status ( StatusCode ::BAD_REQUEST )
. body ( Body ::from ( " project url must not be empty " ) ) ;
}
{
let mut projects = state . projects . write ( ) . await ;
if projects . contains_key ( & name ) {
return Response ::builder ( )
. status ( StatusCode ::CONFLICT )
. body ( Body ::from ( format! ( " project ' {name} ' already exists " ) ) ) ;
}
projects . insert ( name . clone ( ) , ProjectEntry { url : url . clone ( ) } ) ;
}
let snapshot = state . projects . read ( ) . await . clone ( ) ;
save_config ( & snapshot , & state . config_dir ) . await ;
crate ::slog! ( " [gateway] Added project '{name}' ({url}) " ) ;
let body_val = json! ( { " name " : name , " url " : url } ) ;
Response ::builder ( )
. status ( StatusCode ::OK )
. header ( " Content-Type " , " application/json " )
. body ( Body ::from (
serde_json ::to_vec ( & body_val ) . unwrap_or_default ( ) ,
) )
}
/// `DELETE /api/gateway/projects/:name` — remove a project from the gateway config.
///
/// Returns 204 No Content on success. Returns 400 if this is the last project
/// (the gateway requires at least one project to remain configured).
#[ handler ]
pub async fn gateway_remove_project_handler (
PoemPath ( name ) : PoemPath < String > ,
state : Data < & Arc < GatewayState > > ,
) -> Response {
let active = state . active_project . read ( ) . await . clone ( ) ;
{
let mut projects = state . projects . write ( ) . await ;
if ! projects . contains_key ( & name ) {
return Response ::builder ( )
. status ( StatusCode ::NOT_FOUND )
. body ( Body ::from ( format! ( " project ' {name} ' not found " ) ) ) ;
}
if projects . len ( ) = = 1 {
return Response ::builder ( )
. status ( StatusCode ::BAD_REQUEST )
. body ( Body ::from ( " cannot remove the last project " ) ) ;
}
projects . remove ( & name ) ;
}
let snapshot = state . projects . read ( ) . await . clone ( ) ;
save_config ( & snapshot , & state . config_dir ) . await ;
// If the removed project was active, switch to the first remaining.
if active = = name {
let first = state . projects . read ( ) . await . keys ( ) . next ( ) . cloned ( ) ;
if let Some ( new_active ) = first {
* state . active_project . write ( ) . await = new_active ;
}
}
crate ::slog! ( " [gateway] Removed project '{name}' " ) ;
Response ::builder ( )
. status ( StatusCode ::NO_CONTENT )
. body ( Body ::empty ( ) )
}
2026-04-14 18:53:41 +00:00
// ── Bot configuration API ────────────────────────────────────────────
/// Request/response body for the bot configuration API.
#[ derive(Deserialize, Serialize, Default) ]
struct BotConfigPayload {
/// Chat transport: `"matrix"` or `"slack"`.
transport : String ,
// Matrix fields
homeserver : Option < String > ,
username : Option < String > ,
password : Option < String > ,
// Slack fields
slack_bot_token : Option < String > ,
slack_signing_secret : Option < String > ,
}
/// Read the current raw bot.toml (without validation) as key/value pairs for
/// the configuration UI. Returns an empty payload if the file does not exist.
fn read_bot_config_raw ( config_dir : & Path ) -> BotConfigPayload {
let path = config_dir . join ( " .huskies " ) . join ( " bot.toml " ) ;
let content = match std ::fs ::read_to_string ( & path ) {
Ok ( c ) = > c ,
Err ( _ ) = > return BotConfigPayload ::default ( ) ,
} ;
let table : toml ::Value = match toml ::from_str ( & content ) {
Ok ( v ) = > v ,
Err ( _ ) = > return BotConfigPayload ::default ( ) ,
} ;
let s = | key : & str | -> Option < String > {
table
. get ( key )
. and_then ( | v | v . as_str ( ) )
. map ( | s | s . to_string ( ) )
} ;
BotConfigPayload {
transport : s ( " transport " ) . unwrap_or_else ( | | " matrix " . to_string ( ) ) ,
homeserver : s ( " homeserver " ) ,
username : s ( " username " ) ,
password : s ( " password " ) ,
slack_bot_token : s ( " slack_bot_token " ) ,
slack_signing_secret : s ( " slack_signing_secret " ) ,
}
}
/// Write a `bot.toml` from the given payload.
fn write_bot_config ( config_dir : & Path , payload : & BotConfigPayload ) -> Result < ( ) , String > {
let huskies_dir = config_dir . join ( " .huskies " ) ;
std ::fs ::create_dir_all ( & huskies_dir )
. map_err ( | e | format! ( " cannot create .huskies dir: {e} " ) ) ? ;
let path = huskies_dir . join ( " bot.toml " ) ;
let content = match payload . transport . as_str ( ) {
" slack " = > {
format! (
" enabled = true \n transport = \" slack \" \n \n slack_bot_token = {} \n slack_signing_secret = {} \n slack_channel_ids = [] \n " ,
toml_string ( payload . slack_bot_token . as_deref ( ) . unwrap_or ( " " ) ) ,
toml_string ( payload . slack_signing_secret . as_deref ( ) . unwrap_or ( " " ) ) ,
)
}
_ = > {
// Default to matrix
format! (
" enabled = true \n transport = \" matrix \" \n \n homeserver = {} \n username = {} \n password = {} \n room_ids = [] \n allowed_users = [] \n " ,
toml_string ( payload . homeserver . as_deref ( ) . unwrap_or ( " " ) ) ,
toml_string ( payload . username . as_deref ( ) . unwrap_or ( " " ) ) ,
toml_string ( payload . password . as_deref ( ) . unwrap_or ( " " ) ) ,
)
}
} ;
std ::fs ::write ( & path , content ) . map_err ( | e | format! ( " cannot write bot.toml: {e} " ) )
}
/// Escape a string as a TOML quoted string.
fn toml_string ( s : & str ) -> String {
format! ( " \" {} \" " , s . replace ( '\\' , " \\ \\ " ) . replace ( '"' , " \\ \" " ) )
}
2026-04-23 10:38:30 +00:00
/// `GET /api/gateway/pipeline` — fetch pipeline status from all registered projects in parallel.
2026-04-15 18:34:37 +00:00
///
2026-04-23 10:38:30 +00:00
/// Returns `{ "active": "<project>", "projects": { "<name>": { "counts": {...}, "blocked": [...] } | { "error": "..." } } }`.
/// Requests to each project container are issued concurrently — wall-clock latency is
/// bounded by the slowest responding project, not the sum of all response times.
2026-04-15 18:34:37 +00:00
#[ handler ]
pub async fn gateway_all_pipeline_handler ( state : Data < & Arc < GatewayState > > ) -> Response {
2026-04-23 10:38:30 +00:00
let project_urls : BTreeMap < String , String > = state
2026-04-15 18:34:37 +00:00
. projects
. read ( )
. await
. iter ( )
. map ( | ( n , e ) | ( n . clone ( ) , e . url . clone ( ) ) )
. collect ( ) ;
2026-04-23 10:38:30 +00:00
let results = fetch_all_project_pipeline_statuses ( & project_urls , & state . client ) . await ;
2026-04-15 18:34:37 +00:00
let active = state . active_project . read ( ) . await . clone ( ) ;
let body = json! ( { " active " : active , " projects " : results } ) ;
Response ::builder ( )
. status ( StatusCode ::OK )
. header ( " Content-Type " , " application/json " )
. body ( Body ::from ( serde_json ::to_vec ( & body ) . unwrap_or_default ( ) ) )
}
2026-04-14 18:53:41 +00:00
/// `GET /api/gateway/bot-config` — return current bot.toml fields as JSON.
#[ handler ]
pub async fn gateway_bot_config_get_handler ( state : Data < & Arc < GatewayState > > ) -> Response {
let payload = read_bot_config_raw ( & state . config_dir ) ;
Response ::builder ( )
. status ( StatusCode ::OK )
. header ( " Content-Type " , " application/json " )
. body ( Body ::from ( serde_json ::to_vec ( & payload ) . unwrap_or_default ( ) ) )
}
/// `POST /api/gateway/bot-config` — write new bot.toml and restart the bot.
#[ handler ]
pub async fn gateway_bot_config_save_handler (
state : Data < & Arc < GatewayState > > ,
body : Json < BotConfigPayload > ,
) -> Response {
if let Err ( e ) = write_bot_config ( & state . config_dir , & body ) {
let err = json! ( { " ok " : false , " error " : e } ) ;
return Response ::builder ( )
. status ( StatusCode ::INTERNAL_SERVER_ERROR )
. header ( " Content-Type " , " application/json " )
. body ( Body ::from ( serde_json ::to_vec ( & err ) . unwrap_or_default ( ) ) ) ;
}
// Abort the existing bot task (if any) and spawn a fresh one with the new config.
{
let mut handle = state . bot_handle . lock ( ) . await ;
if let Some ( h ) = handle . take ( ) {
h . abort ( ) ;
}
2026-04-15 18:02:47 +00:00
let gateway_projects : Vec < String > = state . projects . read ( ) . await . keys ( ) . cloned ( ) . collect ( ) ;
2026-04-21 11:47:06 +01:00
let gateway_project_urls : std ::collections ::BTreeMap < String , String > = state
. projects
. read ( )
. await
. iter ( )
. map ( | ( name , entry ) | ( name . clone ( ) , entry . url . clone ( ) ) )
. collect ( ) ;
2026-04-14 18:53:41 +00:00
let new_handle = spawn_gateway_bot (
& state . config_dir ,
Arc ::clone ( & state . active_project ) ,
gateway_projects ,
2026-04-21 11:47:06 +01:00
gateway_project_urls ,
2026-04-14 18:53:41 +00:00
state . port ,
) ;
* handle = new_handle ;
}
crate ::slog! ( " [gateway] Bot configuration saved; bot restarted " ) ;
let ok = json! ( { " ok " : true } ) ;
Response ::builder ( )
. status ( StatusCode ::OK )
. header ( " Content-Type " , " application/json " )
. body ( Body ::from ( serde_json ::to_vec ( & ok ) . unwrap_or_default ( ) ) )
}
/// Self-contained HTML page for bot configuration.
const GATEWAY_BOT_CONFIG_HTML : & str = r # "<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Bot Configuration — Huskies Gateway</title>
<style>
*, *::before, *::after { box-sizing: border-box; margin: 0; padding: 0; }
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
background: #0f172a;
color: #e2e8f0;
min-height: 100vh;
display: flex;
align-items: center;
justify-content: center;
}
.card {
background: #1e293b;
border: 1px solid #334155;
border-radius: 12px;
padding: 2rem;
width: 100%;
max-width: 520px;
box-shadow: 0 4px 24px rgba(0,0,0,0.4);
}
.header {
display: flex;
align-items: center;
gap: 0.75rem;
margin-bottom: 1.5rem;
}
.back {
color: #64748b;
text-decoration: none;
font-size: 0.85rem;
margin-right: auto;
}
.back:hover { color: #94a3b8; }
.logo { font-size: 1.5rem; }
h1 { font-size: 1.2rem; font-weight: 600; color: #f8fafc; }
.field { margin-bottom: 1rem; }
label {
display: block;
font-size: 0.75rem;
font-weight: 500;
color: #94a3b8;
text-transform: uppercase;
letter-spacing: 0.05em;
margin-bottom: 0.4rem;
}
input, select {
width: 100%;
padding: 0.625rem 0.875rem;
background: #0f172a;
border: 1px solid #334155;
border-radius: 8px;
color: #f1f5f9;
font-size: 0.9rem;
}
input:focus, select:focus { outline: none; border-color: #6366f1; box-shadow: 0 0 0 2px rgba(99,102,241,0.25); }
select {
cursor: pointer;
appearance: none;
background-image: url("data:image/svg+xml,%3Csvg xmlns='http://www.w3.org/2000/svg' width='12' height='12' viewBox='0 0 12 12'%3E%3Cpath fill='%2394a3b8' d='M6 8L1 3h10z'/%3E%3C/svg%3E");
background-repeat: no-repeat;
background-position: right 0.875rem center;
padding-right: 2.5rem;
}
.section { margin-top: 1rem; }
.divider {
border: none;
border-top: 1px solid #334155;
margin: 1.25rem 0;
}
button {
width: 100%;
padding: 0.75rem;
background: #6366f1;
border: none;
border-radius: 8px;
color: #fff;
font-size: 0.9rem;
font-weight: 600;
cursor: pointer;
margin-top: 1.25rem;
}
button:hover { background: #4f46e5; }
button:disabled { background: #334155; color: #64748b; cursor: not-allowed; }
.status { margin-top: 0.875rem; font-size: 0.8rem; color: #64748b; min-height: 1.25rem; }
.status.ok { color: #4ade80; }
.status.err { color: #f87171; }
</style>
</head>
<body>
<div class="card">
<div class="header">
<a href="/" class="back">← Gateway</a>
<span class="logo">🤖</span>
<h1>Bot Configuration</h1>
</div>
<div class="field">
<label for="transport">Transport</label>
<select id="transport" onchange="onTransportChange(this.value)">
<option value="matrix">Matrix</option>
<option value="slack">Slack</option>
</select>
</div>
<hr class="divider">
<div id="matrix-fields" class="section">
<div class="field">
<label for="homeserver">Homeserver URL</label>
<input type="text" id="homeserver" placeholder="https://matrix.example.com">
</div>
<div class="field">
<label for="username">Bot Username</label>
<input type="text" id="username" placeholder="@bot:example.com">
</div>
<div class="field">
<label for="password">Password</label>
<input type="password" id="password" placeholder="••••••••">
</div>
</div>
<div id="slack-fields" class="section" style="display:none">
<div class="field">
<label for="slack-bot-token">Bot Token</label>
<input type="password" id="slack-bot-token" placeholder="xoxb-…">
</div>
<div class="field">
<label for="slack-signing-secret">App / Signing Secret</label>
<input type="password" id="slack-signing-secret" placeholder="Your signing secret">
</div>
</div>
<button id="save-btn" onclick="save()">Save & Restart Bot</button>
<div id="status" class="status"></div>
</div>
<script>
function onTransportChange(v) {
document.getElementById('matrix-fields').style.display = v === 'matrix' ? '' : 'none';
document.getElementById('slack-fields').style.display = v === 'slack' ? '' : 'none';
}
async function loadConfig() {
try {
const r = await fetch('/api/gateway/bot-config');
const d = await r.json();
document.getElementById('transport').value = d.transport || 'matrix';
onTransportChange(d.transport || 'matrix');
document.getElementById('homeserver').value = d.homeserver || '';
document.getElementById('username').value = d.username || '';
document.getElementById('password').value = d.password || '';
document.getElementById('slack-bot-token').value = d.slack_bot_token || '';
document.getElementById('slack-signing-secret').value = d.slack_signing_secret || '';
} catch(e) {
document.getElementById('status').textContent = 'Failed to load config: ' + e;
document.getElementById('status').className = 'status err';
}
}
async function save() {
const btn = document.getElementById('save-btn');
const statusEl = document.getElementById('status');
btn.disabled = true;
btn.textContent = 'Saving…';
statusEl.className = 'status';
statusEl.textContent = '';
const transport = document.getElementById('transport').value;
const payload = { transport };
if (transport === 'matrix') {
payload.homeserver = document.getElementById('homeserver').value;
payload.username = document.getElementById('username').value;
payload.password = document.getElementById('password').value;
} else {
payload.slack_bot_token = document.getElementById('slack-bot-token').value;
payload.slack_signing_secret = document.getElementById('slack-signing-secret').value;
}
try {
const r = await fetch('/api/gateway/bot-config', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify(payload)
});
const d = await r.json();
if (d.ok) {
statusEl.className = 'status ok';
statusEl.textContent = 'Saved — bot restarted with new credentials.';
} else {
statusEl.className = 'status err';
statusEl.textContent = d.error || 'Save failed';
}
} catch(e) {
statusEl.className = 'status err';
statusEl.textContent = 'Error: ' + e;
}
btn.disabled = false;
btn.textContent = 'Save & Restart Bot';
}
loadConfig();
</script>
</body>
</html>
"# ;
/// Serve the bot configuration HTML page at `GET /bot-config`.
#[ handler ]
pub async fn gateway_bot_config_page_handler ( ) -> Response {
Response ::builder ( )
. status ( StatusCode ::OK )
. header ( " Content-Type " , " text/html; charset=utf-8 " )
. body ( Body ::from ( GATEWAY_BOT_CONFIG_HTML ) )
}
2026-04-13 13:02:41 +00:00
// ── Gateway server startup ───────────────────────────────────────────
2026-04-15 19:50:27 +00:00
/// Build the complete gateway route tree.
///
/// Extracted from `run` so that tests can construct the full route tree and
/// catch duplicate-route panics before they reach production.
pub fn build_gateway_route ( state_arc : Arc < GatewayState > ) -> impl poem ::Endpoint {
poem ::Route ::new ( )
2026-04-14 18:53:41 +00:00
. at ( " /bot-config " , poem ::get ( gateway_bot_config_page_handler ) )
2026-04-14 11:24:12 +00:00
. at ( " /api/gateway " , poem ::get ( gateway_api_handler ) )
. at ( " /api/gateway/switch " , poem ::post ( gateway_switch_handler ) )
2026-04-15 18:34:37 +00:00
. at (
" /api/gateway/pipeline " ,
poem ::get ( gateway_all_pipeline_handler ) ,
)
2026-04-15 18:02:47 +00:00
. at (
" /api/gateway/projects " ,
poem ::post ( gateway_add_project_handler ) ,
)
. at (
" /api/gateway/projects/:name " ,
poem ::delete ( gateway_remove_project_handler ) ,
)
2026-04-14 18:53:41 +00:00
. at (
" /api/gateway/bot-config " ,
poem ::get ( gateway_bot_config_get_handler ) . post ( gateway_bot_config_save_handler ) ,
)
2026-04-13 13:02:41 +00:00
. at (
" /mcp " ,
poem ::post ( gateway_mcp_post_handler ) . get ( gateway_mcp_get_handler ) ,
)
. at ( " /health " , poem ::get ( gateway_health_handler ) )
2026-04-14 12:02:17 +00:00
// Agent join endpoints.
. at ( " /gateway/mode " , poem ::get ( gateway_mode_handler ) )
. at (
" /gateway/tokens " ,
poem ::post ( gateway_generate_token_handler ) ,
)
. at (
" /gateway/register " ,
poem ::post ( gateway_register_agent_handler ) ,
)
. at ( " /gateway/agents " , poem ::get ( gateway_list_agents_handler ) )
. at (
" /gateway/agents/:id " ,
poem ::delete ( gateway_remove_agent_handler ) ,
)
2026-04-14 12:25:12 +00:00
. at (
" /gateway/agents/:id/assign " ,
poem ::post ( gateway_assign_agent_handler ) ,
)
2026-04-15 18:20:39 +00:00
. at (
" /gateway/agents/:id/heartbeat " ,
poem ::post ( gateway_heartbeat_handler ) ,
)
2026-04-14 12:02:17 +00:00
// Serve the embedded React frontend so the gateway has a UI.
. at (
" /assets/*path " ,
poem ::get ( crate ::http ::assets ::embedded_asset ) ,
)
. at ( " /*path " , poem ::get ( crate ::http ::assets ::embedded_file ) )
. at ( " / " , poem ::get ( crate ::http ::assets ::embedded_index ) )
2026-04-15 19:50:27 +00:00
. data ( state_arc )
}
/// Start the gateway HTTP server. This is the entry point when `--gateway` is used.
pub async fn run ( config_path : & Path , port : u16 ) -> Result < ( ) , std ::io ::Error > {
// Locate the gateway config directory (parent of `projects.toml`).
let config_dir = config_path
. parent ( )
. unwrap_or ( std ::path ::Path ::new ( " . " ) )
. to_path_buf ( ) ;
let config = GatewayConfig ::load ( config_path ) . map_err ( std ::io ::Error ::other ) ? ;
let state =
GatewayState ::new ( config , config_dir . clone ( ) , port ) . map_err ( std ::io ::Error ::other ) ? ;
let state_arc = Arc ::new ( state ) ;
let active = state_arc . active_project . read ( ) . await . clone ( ) ;
crate ::slog! ( " [gateway] Starting gateway on port {port}, active project: {active} " ) ;
crate ::slog! (
" [gateway] Registered projects: {} " ,
state_arc
. projects
. read ( )
. await
. keys ( )
. cloned ( )
. collect ::< Vec < _ > > ( )
. join ( " , " )
) ;
// Write `.mcp.json` so that the gateway's Matrix bot's Claude Code CLI
// connects to this gateway's MCP endpoint (which proxies to the active project).
if let Err ( e ) = write_gateway_mcp_json ( & config_dir , port ) {
crate ::slog! ( " [gateway] Warning: could not write .mcp.json: {e} " ) ;
}
// Spawn the Matrix bot if `.huskies/bot.toml` exists in the config directory.
let gateway_projects : Vec < String > = state_arc . projects . read ( ) . await . keys ( ) . cloned ( ) . collect ( ) ;
2026-04-21 11:47:06 +01:00
let gateway_project_urls : std ::collections ::BTreeMap < String , String > = state_arc
. projects
. read ( )
. await
. iter ( )
. map ( | ( name , entry ) | ( name . clone ( ) , entry . url . clone ( ) ) )
. collect ( ) ;
2026-04-15 19:50:27 +00:00
let bot_abort = spawn_gateway_bot (
& config_dir ,
Arc ::clone ( & state_arc . active_project ) ,
gateway_projects ,
2026-04-21 11:47:06 +01:00
gateway_project_urls ,
2026-04-15 19:50:27 +00:00
port ,
) ;
* state_arc . bot_handle . lock ( ) . await = bot_abort ;
let route = build_gateway_route ( state_arc ) ;
2026-04-13 13:02:41 +00:00
let host = std ::env ::var ( " HUSKIES_HOST " ) . unwrap_or_else ( | _ | " 127.0.0.1 " . to_string ( ) ) ;
let addr = format! ( " {host} : {port} " ) ;
crate ::slog! ( " [gateway] Listening on {addr} " ) ;
poem ::Server ::new ( poem ::listener ::TcpListener ::bind ( & addr ) )
. run ( route )
. await
}
2026-04-14 09:57:11 +00:00
// ── Matrix bot integration ───────────────────────────────────────────
/// Write (or overwrite) a `.mcp.json` in `config_dir` that points Claude Code
/// CLI at the gateway's own `/mcp` endpoint. This lets the gateway's Matrix
/// bot use gateway-proxied tool calls instead of a project-specific server.
fn write_gateway_mcp_json ( config_dir : & Path , port : u16 ) -> Result < ( ) , std ::io ::Error > {
let host = std ::env ::var ( " HUSKIES_HOST " ) . unwrap_or_else ( | _ | " 127.0.0.1 " . to_string ( ) ) ;
let url = format! ( " http:// {host} : {port} /mcp " ) ;
let content = serde_json ::json! ( {
" mcpServers " : {
" huskies " : {
" type " : " http " ,
" url " : url
}
}
} ) ;
let path = config_dir . join ( " .mcp.json " ) ;
std ::fs ::write ( & path , serde_json ::to_string_pretty ( & content ) . unwrap ( ) ) ? ;
crate ::slog! ( " [gateway] Wrote {} pointing to {} " , path . display ( ) , url ) ;
Ok ( ( ) )
}
/// Attempt to spawn the Matrix bot against the gateway config directory.
///
/// Reads `<config_dir>/.huskies/bot.toml`. If absent or disabled the function
/// returns immediately without spawning anything. When the bot is enabled it
/// receives a shared reference to the gateway's active-project `RwLock` so the
/// `switch` command can change the active project without going through HTTP.
2026-04-14 18:53:41 +00:00
///
/// Returns an [`tokio::task::AbortHandle`] if the bot task was spawned, `None` otherwise.
2026-04-14 09:57:11 +00:00
fn spawn_gateway_bot (
config_dir : & Path ,
active_project : ActiveProject ,
gateway_projects : Vec < String > ,
2026-04-21 11:47:06 +01:00
gateway_project_urls : std ::collections ::BTreeMap < String , String > ,
2026-04-14 09:57:11 +00:00
port : u16 ,
2026-04-14 18:53:41 +00:00
) -> Option < tokio ::task ::AbortHandle > {
2026-04-14 09:57:11 +00:00
use crate ::agents ::AgentPool ;
use tokio ::sync ::{ broadcast , mpsc } ;
// Create a watcher broadcast channel (no file-system watcher in gateway mode).
let ( watcher_tx , _ ) = broadcast ::channel ( 16 ) ;
// Create a dummy permission channel — permission prompts are not forwarded
// across the proxy boundary in this initial implementation.
let ( _perm_tx , perm_rx ) = mpsc ::unbounded_channel ( ) ;
let perm_rx = Arc ::new ( tokio ::sync ::Mutex ::new ( perm_rx ) ) ;
// Create a shutdown watch channel. Gateway process exit signals Ctrl-C
// via OS signal, not through a watch channel, so we leave this at None
// (no shutdown announcement). The sender is kept alive for the duration.
let ( shutdown_tx , shutdown_rx ) =
tokio ::sync ::watch ::channel ::< Option < crate ::rebuild ::ShutdownReason > > ( None ) ;
// Keep sender alive so the receiver is never prematurely closed.
std ::mem ::forget ( shutdown_tx ) ;
let agents = Arc ::new ( AgentPool ::new ( port , watcher_tx . clone ( ) ) ) ;
crate ::chat ::transport ::matrix ::spawn_bot (
config_dir ,
watcher_tx ,
perm_rx ,
agents ,
shutdown_rx ,
Some ( active_project ) ,
gateway_projects ,
2026-04-21 11:47:06 +01:00
gateway_project_urls ,
2026-04-14 18:53:41 +00:00
)
2026-04-14 09:57:11 +00:00
}
2026-04-13 13:02:41 +00:00
// ── Tests ────────────────────────────────────────────────────────────
#[ cfg(test) ]
mod tests {
use super ::* ;
#[ test ]
fn parse_valid_projects_toml ( ) {
let toml_str = r # "
[projects.huskies]
url = "http://localhost:3001"
[projects.robot-studio]
url = "http://localhost:3002"
"# ;
let config : GatewayConfig = toml ::from_str ( toml_str ) . unwrap ( ) ;
assert_eq! ( config . projects . len ( ) , 2 ) ;
assert_eq! ( config . projects [ " huskies " ] . url , " http://localhost:3001 " ) ;
assert_eq! ( config . projects [ " robot-studio " ] . url , " http://localhost:3002 " ) ;
}
#[ test ]
fn parse_empty_projects_toml ( ) {
let toml_str = " [projects] \n " ;
let config : GatewayConfig = toml ::from_str ( toml_str ) . unwrap ( ) ;
assert! ( config . projects . is_empty ( ) ) ;
}
#[ test ]
fn gateway_state_rejects_empty_config ( ) {
2026-04-13 14:07:08 +00:00
let config = GatewayConfig {
projects : BTreeMap ::new ( ) ,
} ;
2026-04-14 18:53:41 +00:00
assert! ( GatewayState ::new ( config , PathBuf ::from ( " . " ) , 3000 ) . is_err ( ) ) ;
2026-04-13 13:02:41 +00:00
}
#[ test ]
fn gateway_state_sets_first_project_active ( ) {
let mut projects = BTreeMap ::new ( ) ;
2026-04-13 14:07:08 +00:00
projects . insert (
" alpha " . into ( ) ,
ProjectEntry {
url : " http://a:3001 " . into ( ) ,
} ,
) ;
projects . insert (
" beta " . into ( ) ,
ProjectEntry {
url : " http://b:3002 " . into ( ) ,
} ,
) ;
2026-04-13 13:02:41 +00:00
let config = GatewayConfig { projects } ;
2026-04-14 18:53:41 +00:00
let state = GatewayState ::new ( config , PathBuf ::from ( " . " ) , 3000 ) . unwrap ( ) ;
2026-04-13 13:02:41 +00:00
let active = state . active_project . blocking_read ( ) . clone ( ) ;
assert_eq! ( active , " alpha " ) ; // BTreeMap sorts alphabetically.
}
#[ test ]
fn gateway_tool_definitions_has_expected_tools ( ) {
let defs = gateway_tool_definitions ( ) ;
2026-04-13 14:07:08 +00:00
let names : Vec < & str > = defs
. iter ( )
2026-04-13 13:02:41 +00:00
. filter_map ( | d | d . get ( " name " ) . and_then ( | n | n . as_str ( ) ) )
. collect ( ) ;
assert! ( names . contains ( & " switch_project " ) ) ;
assert! ( names . contains ( & " gateway_status " ) ) ;
assert! ( names . contains ( & " gateway_health " ) ) ;
}
#[ tokio::test ]
async fn switch_project_to_known_project ( ) {
let mut projects = BTreeMap ::new ( ) ;
2026-04-13 14:07:08 +00:00
projects . insert (
" alpha " . into ( ) ,
ProjectEntry {
url : " http://a:3001 " . into ( ) ,
} ,
) ;
projects . insert (
" beta " . into ( ) ,
ProjectEntry {
url : " http://b:3002 " . into ( ) ,
} ,
) ;
2026-04-13 13:02:41 +00:00
let config = GatewayConfig { projects } ;
2026-04-14 18:53:41 +00:00
let state = GatewayState ::new ( config , PathBuf ::from ( " . " ) , 3000 ) . unwrap ( ) ;
2026-04-13 13:02:41 +00:00
let params = json! ( { " arguments " : { " project " : " beta " } } ) ;
2026-04-16 11:37:30 +00:00
let resp = handle_switch_project ( & params , & state , None ) . await ;
2026-04-13 13:02:41 +00:00
assert! ( resp . result . is_some ( ) ) ;
let active = state . active_project . read ( ) . await . clone ( ) ;
assert_eq! ( active , " beta " ) ;
}
#[ tokio::test ]
async fn switch_project_to_unknown_project_fails ( ) {
let mut projects = BTreeMap ::new ( ) ;
2026-04-13 14:07:08 +00:00
projects . insert (
" alpha " . into ( ) ,
ProjectEntry {
url : " http://a:3001 " . into ( ) ,
} ,
) ;
2026-04-13 13:02:41 +00:00
let config = GatewayConfig { projects } ;
2026-04-14 18:53:41 +00:00
let state = GatewayState ::new ( config , PathBuf ::from ( " . " ) , 3000 ) . unwrap ( ) ;
2026-04-13 13:02:41 +00:00
let params = json! ( { " arguments " : { " project " : " nonexistent " } } ) ;
2026-04-16 11:37:30 +00:00
let resp = handle_switch_project ( & params , & state , None ) . await ;
2026-04-13 13:02:41 +00:00
assert! ( resp . error . is_some ( ) ) ;
}
#[ tokio::test ]
async fn active_url_returns_correct_url ( ) {
let mut projects = BTreeMap ::new ( ) ;
2026-04-13 14:07:08 +00:00
projects . insert (
" myproj " . into ( ) ,
ProjectEntry {
url : " http://my:3001 " . into ( ) ,
} ,
) ;
2026-04-13 13:02:41 +00:00
let config = GatewayConfig { projects } ;
2026-04-14 18:53:41 +00:00
let state = GatewayState ::new ( config , PathBuf ::from ( " . " ) , 3000 ) . unwrap ( ) ;
2026-04-13 13:02:41 +00:00
let url = state . active_url ( ) . await . unwrap ( ) ;
assert_eq! ( url , " http://my:3001 " ) ;
}
#[ test ]
fn json_rpc_response_success_serializes ( ) {
let resp = JsonRpcResponse ::success ( Some ( json! ( 1 ) ) , json! ( { " ok " : true } ) ) ;
let s = serde_json ::to_string ( & resp ) . unwrap ( ) ;
assert! ( s . contains ( " \" result \" " ) ) ;
assert! ( ! s . contains ( " \" error \" " ) ) ;
}
#[ test ]
fn json_rpc_response_error_serializes ( ) {
let resp = JsonRpcResponse ::error ( Some ( json! ( 1 ) ) , - 32600 , " bad " . into ( ) ) ;
let s = serde_json ::to_string ( & resp ) . unwrap ( ) ;
assert! ( s . contains ( " \" error \" " ) ) ;
assert! ( ! s . contains ( " \" result \" " ) ) ;
}
#[ test ]
fn load_config_from_file ( ) {
let dir = tempfile ::tempdir ( ) . unwrap ( ) ;
let path = dir . path ( ) . join ( " projects.toml " ) ;
2026-04-13 14:07:08 +00:00
std ::fs ::write (
& path ,
r # "
2026-04-13 13:02:41 +00:00
[projects.test]
url = "http://localhost:9999"
2026-04-13 14:07:08 +00:00
"# ,
)
. unwrap ( ) ;
2026-04-13 13:02:41 +00:00
let config = GatewayConfig ::load ( & path ) . unwrap ( ) ;
assert_eq! ( config . projects . len ( ) , 1 ) ;
assert_eq! ( config . projects [ " test " ] . url , " http://localhost:9999 " ) ;
}
#[ test ]
fn load_config_missing_file_fails ( ) {
let result = GatewayConfig ::load ( Path ::new ( " /nonexistent/projects.toml " ) ) ;
assert! ( result . is_err ( ) ) ;
}
2026-04-14 09:57:11 +00:00
// ── bot.toml in gateway and standalone modes ─────────────────────────
//
// Both gateway and standalone modes load bot.toml via `BotConfig::load(dir)`
// which looks for `dir/.huskies/bot.toml`. These tests document that the
// same loading convention works from a gateway config directory.
#[ test ]
fn bot_config_loads_from_gateway_config_dir ( ) {
use crate ::chat ::transport ::matrix ::BotConfig ;
let tmp = tempfile ::tempdir ( ) . unwrap ( ) ;
let huskies_dir = tmp . path ( ) . join ( " .huskies " ) ;
std ::fs ::create_dir_all ( & huskies_dir ) . unwrap ( ) ;
std ::fs ::write (
huskies_dir . join ( " bot.toml " ) ,
r # "
homeserver = "https://matrix.example.com"
username = "@bot:example.com"
password = "secret"
room_ids = ["!abc:example.com"]
enabled = true
"# ,
)
. unwrap ( ) ;
// Gateway passes config_dir (parent of projects.toml) to spawn_bot,
// which calls BotConfig::load(config_dir). Verify this resolves correctly.
let config = BotConfig ::load ( tmp . path ( ) ) ;
assert! (
config . is_some ( ) ,
" bot.toml should load from gateway config dir "
) ;
let config = config . unwrap ( ) ;
assert_eq! (
config . homeserver . as_deref ( ) ,
Some ( " https://matrix.example.com " )
) ;
}
#[ test ]
fn bot_config_absent_returns_none_in_gateway_mode ( ) {
use crate ::chat ::transport ::matrix ::BotConfig ;
// A gateway config directory without a .huskies/bot.toml should yield None,
// allowing the gateway to start without a Matrix bot.
let tmp = tempfile ::tempdir ( ) . unwrap ( ) ;
let config = BotConfig ::load ( tmp . path ( ) ) ;
assert! (
config . is_none ( ) ,
" absent bot.toml must return None in gateway mode "
) ;
}
#[ test ]
fn bot_config_disabled_returns_none_in_gateway_mode ( ) {
use crate ::chat ::transport ::matrix ::BotConfig ;
let tmp = tempfile ::tempdir ( ) . unwrap ( ) ;
let huskies_dir = tmp . path ( ) . join ( " .huskies " ) ;
std ::fs ::create_dir_all ( & huskies_dir ) . unwrap ( ) ;
std ::fs ::write (
huskies_dir . join ( " bot.toml " ) ,
r # "
homeserver = "https://matrix.example.com"
username = "@bot:example.com"
password = "secret"
room_ids = ["!abc:example.com"]
enabled = false
"# ,
)
. unwrap ( ) ;
let config = BotConfig ::load ( tmp . path ( ) ) ;
assert! (
config . is_none ( ) ,
" disabled bot.toml must return None in gateway mode "
) ;
}
2026-04-14 12:02:17 +00:00
// ── Agent join mechanism tests ───────────────────────────────────────
fn make_test_state ( ) -> Arc < GatewayState > {
let mut projects = BTreeMap ::new ( ) ;
projects . insert (
" test " . into ( ) ,
ProjectEntry {
url : " http://test:3001 " . into ( ) ,
} ,
) ;
let config = GatewayConfig { projects } ;
2026-04-14 18:53:41 +00:00
Arc ::new ( GatewayState ::new ( config , PathBuf ::new ( ) , 3000 ) . unwrap ( ) )
2026-04-14 12:02:17 +00:00
}
#[ tokio::test ]
async fn generate_token_creates_pending_token ( ) {
let state = make_test_state ( ) ;
let app = poem ::Route ::new ( )
. at (
" /gateway/tokens " ,
poem ::post ( gateway_generate_token_handler ) ,
)
. data ( state . clone ( ) ) ;
let cli = poem ::test ::TestClient ::new ( app ) ;
let resp = cli . post ( " /gateway/tokens " ) . send ( ) . await ;
assert_eq! ( resp . 0. status ( ) , StatusCode ::OK ) ;
let body : Value = resp . 0. into_body ( ) . into_json ( ) . await . unwrap ( ) ;
let token = body [ " token " ] . as_str ( ) . unwrap ( ) ;
assert! ( ! token . is_empty ( ) ) ;
let tokens = state . pending_tokens . read ( ) . await ;
assert! ( tokens . contains_key ( token ) ) ;
}
#[ tokio::test ]
async fn register_agent_consumes_token ( ) {
let state = make_test_state ( ) ;
// Insert a token manually.
let token = " test-token-123 " . to_string ( ) ;
state . pending_tokens . write ( ) . await . insert (
token . clone ( ) ,
PendingToken {
created_at : chrono ::Utc ::now ( ) . timestamp ( ) as f64 ,
} ,
) ;
let app = poem ::Route ::new ( )
. at (
" /gateway/register " ,
poem ::post ( gateway_register_agent_handler ) ,
)
. data ( state . clone ( ) ) ;
let cli = poem ::test ::TestClient ::new ( app ) ;
let resp = cli
. post ( " /gateway/register " )
. header ( " Content-Type " , " application/json " )
. body (
json! ( {
" token " : token ,
" label " : " test-agent " ,
" address " : " ws://localhost:3001/crdt-sync "
} )
. to_string ( ) ,
)
. send ( )
. await ;
assert_eq! ( resp . 0. status ( ) , StatusCode ::OK ) ;
// Token consumed.
assert! ( state . pending_tokens . read ( ) . await . is_empty ( ) ) ;
// Agent registered.
let agents = state . joined_agents . read ( ) . await ;
assert_eq! ( agents . len ( ) , 1 ) ;
assert_eq! ( agents [ 0 ] . label , " test-agent " ) ;
}
#[ tokio::test ]
async fn register_agent_rejects_invalid_token ( ) {
let state = make_test_state ( ) ;
let app = poem ::Route ::new ( )
. at (
" /gateway/register " ,
poem ::post ( gateway_register_agent_handler ) ,
)
. data ( state . clone ( ) ) ;
let cli = poem ::test ::TestClient ::new ( app ) ;
let resp = cli
. post ( " /gateway/register " )
. header ( " Content-Type " , " application/json " )
. body (
json! ( {
" token " : " bad-token " ,
" label " : " agent " ,
" address " : " ws://localhost:3001/crdt-sync "
} )
. to_string ( ) ,
)
. send ( )
. await ;
assert_eq! ( resp . 0. status ( ) , StatusCode ::UNAUTHORIZED ) ;
assert! ( state . joined_agents . read ( ) . await . is_empty ( ) ) ;
}
#[ tokio::test ]
async fn list_agents_returns_registered_agents ( ) {
let state = make_test_state ( ) ;
state . joined_agents . write ( ) . await . push ( JoinedAgent {
id : " id-1 " . into ( ) ,
label : " agent-1 " . into ( ) ,
address : " ws://a:3001/crdt-sync " . into ( ) ,
registered_at : 0.0 ,
2026-04-15 18:20:39 +00:00
last_seen : 0.0 ,
2026-04-14 12:25:12 +00:00
assigned_project : None ,
2026-04-14 12:02:17 +00:00
} ) ;
let app = poem ::Route ::new ( )
. at ( " /gateway/agents " , poem ::get ( gateway_list_agents_handler ) )
. data ( state . clone ( ) ) ;
let cli = poem ::test ::TestClient ::new ( app ) ;
let resp = cli . get ( " /gateway/agents " ) . send ( ) . await ;
assert_eq! ( resp . 0. status ( ) , StatusCode ::OK ) ;
let agents : Vec < Value > = resp . 0. into_body ( ) . into_json ( ) . await . unwrap ( ) ;
assert_eq! ( agents . len ( ) , 1 ) ;
assert_eq! ( agents [ 0 ] [ " label " ] , " agent-1 " ) ;
}
#[ tokio::test ]
async fn remove_agent_deletes_by_id ( ) {
let state = make_test_state ( ) ;
state . joined_agents . write ( ) . await . push ( JoinedAgent {
id : " del-id " . into ( ) ,
label : " to-delete " . into ( ) ,
address : " ws://x:3001/crdt-sync " . into ( ) ,
registered_at : 0.0 ,
2026-04-15 18:20:39 +00:00
last_seen : 0.0 ,
2026-04-14 12:25:12 +00:00
assigned_project : None ,
2026-04-14 12:02:17 +00:00
} ) ;
let app = poem ::Route ::new ( )
. at (
" /gateway/agents/:id " ,
poem ::delete ( gateway_remove_agent_handler ) ,
)
. data ( state . clone ( ) ) ;
let cli = poem ::test ::TestClient ::new ( app ) ;
let resp = cli . delete ( " /gateway/agents/del-id " ) . send ( ) . await ;
assert_eq! ( resp . 0. status ( ) , StatusCode ::NO_CONTENT ) ;
assert! ( state . joined_agents . read ( ) . await . is_empty ( ) ) ;
}
#[ tokio::test ]
async fn remove_agent_unknown_id_returns_not_found ( ) {
let state = make_test_state ( ) ;
let app = poem ::Route ::new ( )
. at (
" /gateway/agents/:id " ,
poem ::delete ( gateway_remove_agent_handler ) ,
)
. data ( state . clone ( ) ) ;
let cli = poem ::test ::TestClient ::new ( app ) ;
let resp = cli . delete ( " /gateway/agents/no-such-id " ) . send ( ) . await ;
assert_eq! ( resp . 0. status ( ) , StatusCode ::NOT_FOUND ) ;
}
2026-04-15 18:20:39 +00:00
#[ tokio::test ]
async fn heartbeat_updates_last_seen ( ) {
let state = make_test_state ( ) ;
state . joined_agents . write ( ) . await . push ( JoinedAgent {
id : " hb-id " . into ( ) ,
label : " hb-agent " . into ( ) ,
address : " ws://hb:3001/crdt-sync " . into ( ) ,
registered_at : 0.0 ,
last_seen : 0.0 ,
assigned_project : None ,
} ) ;
let app = poem ::Route ::new ( )
. at (
" /gateway/agents/:id/heartbeat " ,
poem ::post ( gateway_heartbeat_handler ) ,
)
. data ( state . clone ( ) ) ;
let cli = poem ::test ::TestClient ::new ( app ) ;
let resp = cli . post ( " /gateway/agents/hb-id/heartbeat " ) . send ( ) . await ;
assert_eq! ( resp . 0. status ( ) , StatusCode ::NO_CONTENT ) ;
let agents = state . joined_agents . read ( ) . await ;
assert! ( agents [ 0 ] . last_seen > 0.0 ) ;
}
#[ tokio::test ]
async fn heartbeat_unknown_id_returns_not_found ( ) {
let state = make_test_state ( ) ;
let app = poem ::Route ::new ( )
. at (
" /gateway/agents/:id/heartbeat " ,
poem ::post ( gateway_heartbeat_handler ) ,
)
. data ( state . clone ( ) ) ;
let cli = poem ::test ::TestClient ::new ( app ) ;
let resp = cli
. post ( " /gateway/agents/no-such-id/heartbeat " )
. send ( )
. await ;
assert_eq! ( resp . 0. status ( ) , StatusCode ::NOT_FOUND ) ;
}
2026-04-15 19:50:27 +00:00
/// Build the full gateway route tree and verify it does not panic.
///
/// Poem panics at construction time when duplicate routes are registered.
/// This test catches any regression where a duplicate route is re-introduced
/// (e.g. the `/` vs `/*path` duplicate fixed in commit 0969fb5d).
#[ test ]
fn gateway_route_tree_builds_without_panic ( ) {
let state = make_test_state ( ) ;
// build_gateway_route will panic if any route is registered more than once.
let _route = build_gateway_route ( state ) ;
}
2026-04-22 21:33:44 +00:00
// ── init_project tool tests ──────────────────────────────────────────
#[ tokio::test ]
async fn init_project_scaffolds_huskies_dir ( ) {
let dir = tempfile ::tempdir ( ) . unwrap ( ) ;
let state = make_test_state ( ) ;
let params = json! ( { " arguments " : { " path " : dir . path ( ) . to_str ( ) . unwrap ( ) } } ) ;
let resp = handle_init_project ( & params , & state , Some ( json! ( 1 ) ) ) . await ;
assert! (
resp . result . is_some ( ) ,
" init_project should succeed: {:?} " ,
resp . error
) ;
assert! (
dir . path ( ) . join ( " .huskies " ) . exists ( ) ,
" .huskies/ should be created "
) ;
assert! ( dir . path ( ) . join ( " .huskies/project.toml " ) . exists ( ) ) ;
assert! ( dir . path ( ) . join ( " .huskies/agents.toml " ) . exists ( ) ) ;
assert! ( dir . path ( ) . join ( " script/test " ) . exists ( ) ) ;
}
#[ tokio::test ]
async fn init_project_creates_wizard_state ( ) {
let dir = tempfile ::tempdir ( ) . unwrap ( ) ;
let state = make_test_state ( ) ;
let params = json! ( { " arguments " : { " path " : dir . path ( ) . to_str ( ) . unwrap ( ) } } ) ;
handle_init_project ( & params , & state , None ) . await ;
let wizard_state_path = dir . path ( ) . join ( " .huskies/wizard_state.json " ) ;
assert! (
wizard_state_path . exists ( ) ,
" wizard_state.json should be created "
) ;
let content = std ::fs ::read_to_string ( & wizard_state_path ) . unwrap ( ) ;
let v : Value =
serde_json ::from_str ( & content ) . expect ( " wizard_state.json should be valid JSON " ) ;
assert! (
v . get ( " steps " ) . is_some ( ) ,
" wizard state should have a 'steps' field "
) ;
assert! (
v . get ( " completed " ) . is_some ( ) ,
" wizard state should have a 'completed' field "
) ;
}
#[ tokio::test ]
async fn init_project_already_initialised_returns_error ( ) {
let dir = tempfile ::tempdir ( ) . unwrap ( ) ;
std ::fs ::create_dir_all ( dir . path ( ) . join ( " .huskies " ) ) . unwrap ( ) ;
let state = make_test_state ( ) ;
let params = json! ( { " arguments " : { " path " : dir . path ( ) . to_str ( ) . unwrap ( ) } } ) ;
let resp = handle_init_project ( & params , & state , None ) . await ;
assert! (
resp . error . is_some ( ) ,
" should return error for already-initialised project "
) ;
let msg = & resp . error . unwrap ( ) . message ;
assert! ( msg . contains ( " .huskies/ " ) , " error should mention .huskies/ " ) ;
}
#[ tokio::test ]
async fn init_project_missing_path_returns_error ( ) {
let state = make_test_state ( ) ;
let params = json! ( { " arguments " : { } } ) ;
let resp = handle_init_project ( & params , & state , None ) . await ;
assert! ( resp . error . is_some ( ) ) ;
}
#[ tokio::test ]
async fn init_project_registers_in_projects_toml_when_name_and_url_given ( ) {
let dir = tempfile ::tempdir ( ) . unwrap ( ) ;
let config_dir = tempfile ::tempdir ( ) . unwrap ( ) ;
let mut projects = BTreeMap ::new ( ) ;
projects . insert (
" existing " . into ( ) ,
ProjectEntry {
url : " http://existing:3001 " . into ( ) ,
} ,
) ;
let config = GatewayConfig { projects } ;
let state =
Arc ::new ( GatewayState ::new ( config , config_dir . path ( ) . to_path_buf ( ) , 3000 ) . unwrap ( ) ) ;
let params = json! ( {
" arguments " : {
" path " : dir . path ( ) . to_str ( ) . unwrap ( ) ,
" name " : " new-project " ,
" url " : " http://new-project:3002 "
}
} ) ;
let resp = handle_init_project ( & params , & state , Some ( json! ( 1 ) ) ) . await ;
assert! ( resp . result . is_some ( ) , " should succeed: {:?} " , resp . error ) ;
// Project should be registered.
let projects = state . projects . read ( ) . await ;
assert! (
projects . contains_key ( " new-project " ) ,
" new-project should be in projects map "
) ;
assert_eq! ( projects [ " new-project " ] . url , " http://new-project:3002 " ) ;
}
#[ tokio::test ]
async fn init_project_duplicate_name_returns_error ( ) {
let dir = tempfile ::tempdir ( ) . unwrap ( ) ;
let mut projects = BTreeMap ::new ( ) ;
projects . insert (
" taken " . into ( ) ,
ProjectEntry {
url : " http://taken:3001 " . into ( ) ,
} ,
) ;
let config = GatewayConfig { projects } ;
let state = Arc ::new ( GatewayState ::new ( config , PathBuf ::new ( ) , 3000 ) . unwrap ( ) ) ;
let params = json! ( {
" arguments " : {
" path " : dir . path ( ) . to_str ( ) . unwrap ( ) ,
" name " : " taken " ,
" url " : " http://new:3002 "
}
} ) ;
let resp = handle_init_project ( & params , & state , None ) . await ;
assert! ( resp . error . is_some ( ) , " duplicate name should return error " ) ;
}
/// Integration test: call init_project then call wizard_status via the MCP
/// proxy and confirm a valid wizard state response is returned.
///
/// A lightweight mock HTTP server is started to stand in for the project
/// container, returning a pre-canned wizard_status result.
#[ tokio::test ]
async fn init_project_then_wizard_status_integration ( ) {
use tokio ::io ::{ AsyncReadExt , AsyncWriteExt } ;
// Start a mock project MCP server on an ephemeral port.
let listener = tokio ::net ::TcpListener ::bind ( " 127.0.0.1:0 " ) . await . unwrap ( ) ;
let mock_port = listener . local_addr ( ) . unwrap ( ) . port ( ) ;
let mock_url = format! ( " http://127.0.0.1: {mock_port} " ) ;
// Spawn the mock: accept one connection and return a wizard_status response.
tokio ::spawn ( async move {
if let Ok ( ( mut stream , _ ) ) = listener . accept ( ) . await {
let mut buf = vec! [ 0 u8 ; 4096 ] ;
let _ = stream . read ( & mut buf ) . await ;
let body = serde_json ::json! ( {
" jsonrpc " : " 2.0 " ,
" id " : 1 ,
" result " : {
" content " : [ {
" type " : " text " ,
" text " : " { \" steps \" :[{ \" id \" : \" scaffold \" , \" title \" : \" Scaffold \" , \" status \" : \" confirmed \" }], \" completed \" :false} "
} ]
}
} ) ;
let body_bytes = serde_json ::to_vec ( & body ) . unwrap ( ) ;
let header = format! (
" HTTP/1.1 200 OK \r \n Content-Type: application/json \r \n Content-Length: {} \r \n Connection: close \r \n \r \n " ,
body_bytes . len ( )
) ;
let _ = stream . write_all ( header . as_bytes ( ) ) . await ;
let _ = stream . write_all ( & body_bytes ) . await ;
}
} ) ;
// Give the mock a moment to start.
tokio ::time ::sleep ( std ::time ::Duration ::from_millis ( 10 ) ) . await ;
// Create gateway state pointing at the mock server.
let mut projects = BTreeMap ::new ( ) ;
projects . insert ( " mock-project " . into ( ) , ProjectEntry { url : mock_url } ) ;
let config = GatewayConfig { projects } ;
let config_dir = tempfile ::tempdir ( ) . unwrap ( ) ;
let state =
Arc ::new ( GatewayState ::new ( config , config_dir . path ( ) . to_path_buf ( ) , 3000 ) . unwrap ( ) ) ;
// 1. Call init_project.
let project_dir = tempfile ::tempdir ( ) . unwrap ( ) ;
let params = json! ( {
" arguments " : { " path " : project_dir . path ( ) . to_str ( ) . unwrap ( ) }
} ) ;
let resp = handle_init_project ( & params , & state , Some ( json! ( 1 ) ) ) . await ;
assert! (
resp . result . is_some ( ) ,
" init_project should succeed: {:?} " ,
resp . error
) ;
// Verify scaffolding.
assert! (
project_dir . path ( ) . join ( " .huskies " ) . exists ( ) ,
" .huskies/ must be created "
) ;
let wizard_path = project_dir . path ( ) . join ( " .huskies/wizard_state.json " ) ;
assert! ( wizard_path . exists ( ) , " wizard_state.json must be created " ) ;
// 2. Call wizard_status via the MCP proxy (proxied to our mock server).
let proxy_body = serde_json ::to_vec ( & json! ( {
" jsonrpc " : " 2.0 " ,
" id " : 2 ,
" method " : " tools/call " ,
" params " : { " name " : " wizard_status " , " arguments " : { } }
} ) )
. unwrap ( ) ;
let proxy_resp = proxy_mcp_call ( & state , & proxy_body ) . await ;
assert! (
proxy_resp . is_ok ( ) ,
" proxy call should succeed: {:?} " ,
proxy_resp . err ( )
) ;
// 3. Confirm the response contains wizard state data.
let resp_json : Value = serde_json ::from_slice ( & proxy_resp . unwrap ( ) ) . unwrap ( ) ;
let result = resp_json . get ( " result " ) ;
assert! ( result . is_some ( ) , " response should have a result field " ) ;
let text = result
. and_then ( | r | r . get ( " content " ) )
. and_then ( | c | c . get ( 0 ) )
. and_then ( | c | c . get ( " text " ) )
. and_then ( | t | t . as_str ( ) )
. unwrap_or ( " " ) ;
let wizard : Value =
serde_json ::from_str ( text ) . expect ( " text should be valid wizard state JSON " ) ;
assert! (
wizard . get ( " steps " ) . is_some ( ) ,
" wizard state should have a 'steps' field "
) ;
}
#[ test ]
fn gateway_tool_definitions_includes_init_project ( ) {
let defs = gateway_tool_definitions ( ) ;
let names : Vec < & str > = defs
. iter ( )
. filter_map ( | d | d . get ( " name " ) . and_then ( | n | n . as_str ( ) ) )
. collect ( ) ;
assert! (
names . contains ( & " init_project " ) ,
" init_project should be in gateway tool definitions "
) ;
}
2026-04-23 10:38:30 +00:00
#[ test ]
fn gateway_tool_definitions_includes_aggregate_pipeline_status ( ) {
let defs = gateway_tool_definitions ( ) ;
let names : Vec < & str > = defs
. iter ( )
. filter_map ( | d | d . get ( " name " ) . and_then ( | n | n . as_str ( ) ) )
. collect ( ) ;
assert! (
names . contains ( & " aggregate_pipeline_status " ) ,
" aggregate_pipeline_status should be in gateway tool definitions "
) ;
}
// ── aggregate_pipeline_counts unit tests ─────────────────────────────────
#[ test ]
fn aggregate_pipeline_counts_empty_pipeline ( ) {
let pipeline = json! ( { " active " : [ ] , " backlog " : [ ] , " backlog_count " : 0 } ) ;
let result = aggregate_pipeline_counts ( & pipeline ) ;
assert_eq! ( result [ " counts " ] [ " backlog " ] , 0 ) ;
assert_eq! ( result [ " counts " ] [ " current " ] , 0 ) ;
assert_eq! ( result [ " counts " ] [ " qa " ] , 0 ) ;
assert_eq! ( result [ " counts " ] [ " merge " ] , 0 ) ;
assert_eq! ( result [ " counts " ] [ " done " ] , 0 ) ;
assert_eq! ( result [ " blocked " ] . as_array ( ) . unwrap ( ) . len ( ) , 0 ) ;
}
#[ test ]
fn aggregate_pipeline_counts_stage_counts_correct ( ) {
let pipeline = json! ( {
" active " : [
{ " story_id " : " 1_story_a " , " name " : " A " , " stage " : " current " } ,
{ " story_id " : " 2_story_b " , " name " : " B " , " stage " : " current " } ,
{ " story_id " : " 3_story_c " , " name " : " C " , " stage " : " qa " } ,
{ " story_id " : " 4_story_d " , " name " : " D " , " stage " : " done " } ,
] ,
" backlog " : [ { " story_id " : " 5_story_e " , " name " : " E " } , { " story_id " : " 6_story_f " , " name " : " F " } ] ,
" backlog_count " : 2
} ) ;
let result = aggregate_pipeline_counts ( & pipeline ) ;
assert_eq! ( result [ " counts " ] [ " backlog " ] , 2 ) ;
assert_eq! ( result [ " counts " ] [ " current " ] , 2 ) ;
assert_eq! ( result [ " counts " ] [ " qa " ] , 1 ) ;
assert_eq! ( result [ " counts " ] [ " merge " ] , 0 ) ;
assert_eq! ( result [ " counts " ] [ " done " ] , 1 ) ;
assert_eq! ( result [ " blocked " ] . as_array ( ) . unwrap ( ) . len ( ) , 0 ) ;
}
#[ test ]
fn aggregate_pipeline_counts_blocked_items_captured ( ) {
let pipeline = json! ( {
" active " : [
{ " story_id " : " 10_story_blocked " , " name " : " Blocked " , " stage " : " current " , " blocked " : true , " retry_count " : 3 } ,
{ " story_id " : " 20_story_ok " , " name " : " OK " , " stage " : " qa " } ,
] ,
" backlog " : [ ] ,
" backlog_count " : 0
} ) ;
let result = aggregate_pipeline_counts ( & pipeline ) ;
let blocked = result [ " blocked " ] . as_array ( ) . unwrap ( ) ;
assert_eq! ( blocked . len ( ) , 1 ) ;
assert_eq! ( blocked [ 0 ] [ " story_id " ] , " 10_story_blocked " ) ;
assert_eq! ( blocked [ 0 ] [ " stage " ] , " current " ) ;
assert! (
blocked [ 0 ] [ " reason " ]
. as_str ( )
. unwrap ( )
. contains ( " blocked after 3 retries " ) ,
" reason: {} " ,
blocked [ 0 ] [ " reason " ]
) ;
}
#[ test ]
fn format_aggregate_status_compact_healthy_project ( ) {
let mut statuses = BTreeMap ::new ( ) ;
statuses . insert (
" huskies " . to_string ( ) ,
json! ( {
" counts " : { " backlog " : 5 , " current " : 2 , " qa " : 1 , " merge " : 0 , " done " : 12 } ,
" blocked " : [ ]
} ) ,
) ;
let output = format_aggregate_status_compact ( & statuses ) ;
assert! ( output . contains ( " huskies " ) , " output: {output} " ) ;
assert! ( output . contains ( " B:5 " ) , " output: {output} " ) ;
assert! ( output . contains ( " C:2 " ) , " output: {output} " ) ;
assert! ( output . contains ( " Q:1 " ) , " output: {output} " ) ;
assert! ( output . contains ( " D:12 " ) , " output: {output} " ) ;
assert! ( ! output . contains ( " blocked: " ) , " output: {output} " ) ;
}
#[ test ]
fn format_aggregate_status_compact_unreachable_project ( ) {
let mut statuses = BTreeMap ::new ( ) ;
statuses . insert (
" broken " . to_string ( ) ,
json! ( { " error " : " connection refused " } ) ,
) ;
let output = format_aggregate_status_compact ( & statuses ) ;
assert! ( output . contains ( " broken " ) , " output: {output} " ) ;
assert! ( output . contains ( " UNREACHABLE " ) , " output: {output} " ) ;
assert! ( output . contains ( " connection refused " ) , " output: {output} " ) ;
}
#[ test ]
fn format_aggregate_status_compact_blocked_items_shown ( ) {
let mut statuses = BTreeMap ::new ( ) ;
statuses . insert (
" myproj " . to_string ( ) ,
json! ( {
" counts " : { " backlog " : 0 , " current " : 1 , " qa " : 0 , " merge " : 0 , " done " : 0 } ,
" blocked " : [ { " story_id " : " 42_story_x " , " name " : " X " , " stage " : " current " , " reason " : " blocked after 3 retries " } ]
} ) ,
) ;
let output = format_aggregate_status_compact ( & statuses ) ;
assert! ( output . contains ( " blocked: " ) , " output: {output} " ) ;
assert! ( output . contains ( " 42_story_x " ) , " output: {output} " ) ;
}
/// Integration test: two mock projects (one healthy, one unreachable).
/// Asserts that `fetch_all_project_pipeline_statuses` reports both correctly.
#[ tokio::test ]
async fn aggregate_pipeline_status_integration_healthy_and_unreachable ( ) {
use tokio ::io ::{ AsyncReadExt , AsyncWriteExt } ;
// Start a mock project MCP server that returns a get_pipeline_status response.
let listener = tokio ::net ::TcpListener ::bind ( " 127.0.0.1:0 " ) . await . unwrap ( ) ;
let mock_port = listener . local_addr ( ) . unwrap ( ) . port ( ) ;
let healthy_url = format! ( " http://127.0.0.1: {mock_port} " ) ;
// The mock responds to exactly 1 connection then stops.
tokio ::spawn ( async move {
if let Ok ( ( mut stream , _ ) ) = listener . accept ( ) . await {
let mut buf = vec! [ 0 u8 ; 4096 ] ;
let _ = stream . read ( & mut buf ) . await ;
// Return a pipeline status with items at multiple stages and one blocked item.
let pipeline_json = serde_json ::to_string ( & json! ( {
" active " : [
{ " story_id " : " 1_story_a " , " name " : " A " , " stage " : " current " } ,
{ " story_id " : " 2_story_b " , " name " : " B " , " stage " : " qa " } ,
{ " story_id " : " 3_story_c " , " name " : " C " , " stage " : " current " , " blocked " : true , " retry_count " : 5 } ,
] ,
" backlog " : [ { " story_id " : " 4_story_d " , " name " : " D " } ] ,
" backlog_count " : 1
} ) ) . unwrap ( ) ;
let body = serde_json ::to_vec ( & json! ( {
" jsonrpc " : " 2.0 " ,
" id " : 1 ,
" result " : {
" content " : [ { " type " : " text " , " text " : pipeline_json } ]
}
} ) )
. unwrap ( ) ;
let header = format! (
" HTTP/1.1 200 OK \r \n Content-Type: application/json \r \n Content-Length: {} \r \n Connection: close \r \n \r \n " ,
body . len ( )
) ;
let _ = stream . write_all ( header . as_bytes ( ) ) . await ;
let _ = stream . write_all ( & body ) . await ;
}
} ) ;
// Give the mock a moment to bind.
tokio ::time ::sleep ( std ::time ::Duration ::from_millis ( 10 ) ) . await ;
// Second "project" points to an unreachable port.
let unreachable_url = " http://127.0.0.1:1 " . to_string ( ) ; // port 1 is not bindable
let mut project_urls = BTreeMap ::new ( ) ;
project_urls . insert ( " healthy-project " . to_string ( ) , healthy_url ) ;
project_urls . insert ( " broken-project " . to_string ( ) , unreachable_url ) ;
let client = Client ::new ( ) ;
let statuses = fetch_all_project_pipeline_statuses ( & project_urls , & client ) . await ;
// Both projects should be present in the result.
assert! (
statuses . contains_key ( " healthy-project " ) ,
" healthy-project must be in response "
) ;
assert! (
statuses . contains_key ( " broken-project " ) ,
" broken-project must be in response "
) ;
// Healthy project should have correct counts.
let healthy = & statuses [ " healthy-project " ] ;
assert! (
healthy . get ( " error " ) . is_none ( ) ,
" healthy project should not have error: {healthy} "
) ;
assert_eq! ( healthy [ " counts " ] [ " backlog " ] , 1 ) ;
assert_eq! ( healthy [ " counts " ] [ " current " ] , 2 ) ;
assert_eq! ( healthy [ " counts " ] [ " qa " ] , 1 ) ;
// Healthy project should report the blocked item.
let blocked = healthy [ " blocked " ] . as_array ( ) . unwrap ( ) ;
assert_eq! ( blocked . len ( ) , 1 , " expected 1 blocked item: {blocked:?} " ) ;
assert_eq! ( blocked [ 0 ] [ " story_id " ] , " 3_story_c " ) ;
// Unreachable project should have an error field.
let broken = & statuses [ " broken-project " ] ;
assert! (
broken . get ( " error " ) . is_some ( ) ,
" unreachable project must have error field: {broken} "
) ;
}
2026-04-13 13:02:41 +00:00
}