diff --git a/server/src/chat/transport/matrix/bot/messages.rs b/server/src/chat/transport/matrix/bot/messages.rs index 481a891a..dff4703d 100644 --- a/server/src/chat/transport/matrix/bot/messages.rs +++ b/server/src/chat/transport/matrix/bot/messages.rs @@ -185,7 +185,8 @@ pub(super) async fn on_room_message( // endpoint. Only a small set of gateway-local commands are handled here. if ctx.is_gateway() { // Commands that are meaningful on the gateway itself (no project state needed). - const GATEWAY_LOCAL_COMMANDS: &[&str] = &["help", "ambient", "reset", "switch"]; + const GATEWAY_LOCAL_COMMANDS: &[&str] = + &["help", "ambient", "reset", "switch", "all_status"]; let stripped = crate::chat::util::strip_bot_mention( &user_message, @@ -229,6 +230,26 @@ pub(super) async fn on_room_message( } return; } + + // `all_status` — aggregate pipeline status across all projects (gateway-only). + if cmd == "all_status" { + let project_urls = ctx.gateway_project_urls.clone(); + let client = reqwest::Client::new(); + let statuses = + crate::gateway::fetch_all_project_pipeline_statuses(&project_urls, &client).await; + let response = crate::gateway::format_aggregate_status_compact(&statuses); + let html = markdown_to_html(&response); + if let Ok(msg_id) = ctx + .transport + .send_message(&room_id_str, &response, &html) + .await + && let Ok(event_id) = msg_id.parse() + { + ctx.bot_sent_event_ids.lock().await.insert(event_id); + } + return; + } + // Gateway-local commands and freeform text fall through to normal handling below. } diff --git a/server/src/gateway.rs b/server/src/gateway.rs index 26c95df4..05c65e23 100644 --- a/server/src/gateway.rs +++ b/server/src/gateway.rs @@ -257,6 +257,7 @@ const GATEWAY_TOOLS: &[&str] = &[ "gateway_status", "gateway_health", "init_project", + "aggregate_pipeline_status", ]; /// Main MCP POST handler for the gateway. Intercepts gateway-specific tools and @@ -439,6 +440,14 @@ fn gateway_tool_definitions() -> Vec { "required": ["path"] } }), + json!({ + "name": "aggregate_pipeline_status", + "description": "Fetch pipeline status from ALL registered projects in parallel and return an aggregated report. For each project: stage counts (backlog/current/qa/merge/done) and a list of blocked or failing items with triage detail. Unreachable projects are included with an error state rather than failing the whole call.", + "inputSchema": { + "type": "object", + "properties": {} + } + }), ] } @@ -518,6 +527,7 @@ async fn handle_gateway_tool( "gateway_status" => handle_gateway_status(state, id).await, "gateway_health" => handle_gateway_health(state, id).await, "init_project" => handle_init_project(params, state, id).await, + "aggregate_pipeline_status" => handle_aggregate_pipeline_status(state, id).await, _ => JsonRpcResponse::error(id, -32601, format!("Unknown gateway tool: {tool_name}")), } } @@ -659,6 +669,257 @@ async fn handle_gateway_health(state: &GatewayState, id: Option) -> JsonR ) } +// ── Aggregate pipeline status ───────────────────────────────────────── + +/// Fetch `get_pipeline_status` from every registered project URL in parallel. +/// +/// Returns a `BTreeMap` of project name → per-project status JSON. Each value +/// is either `{"counts": {...}, "blocked": [...]}` on success or +/// `{"error": "..."}` when the project container is unreachable or returns an +/// unexpected response. A single flaky project never causes the whole call to +/// fail. +pub async fn fetch_all_project_pipeline_statuses( + project_urls: &BTreeMap, + client: &Client, +) -> BTreeMap { + use futures::future::join_all; + + let futures: Vec<_> = project_urls + .iter() + .map(|(name, url)| { + let name = name.clone(); + let url = url.clone(); + let client = client.clone(); + async move { + let result = fetch_one_project_pipeline_status(&url, &client).await; + (name, result) + } + }) + .collect(); + + join_all(futures).await.into_iter().collect() +} + +/// Fetch and aggregate pipeline status for a single project URL. +async fn fetch_one_project_pipeline_status(url: &str, client: &Client) -> Value { + let mcp_url = format!("{}/mcp", url.trim_end_matches('/')); + let rpc_body = json!({ + "jsonrpc": "2.0", + "id": 1, + "method": "tools/call", + "params": { + "name": "get_pipeline_status", + "arguments": {} + } + }); + + match client.post(&mcp_url).json(&rpc_body).send().await { + Ok(resp) => match resp.json::().await { + Ok(upstream) => { + if let Some(text) = upstream + .get("result") + .and_then(|r| r.get("content")) + .and_then(|c| c.get(0)) + .and_then(|c| c.get("text")) + .and_then(|t| t.as_str()) + { + match serde_json::from_str::(text) { + Ok(pipeline) => aggregate_pipeline_counts(&pipeline), + Err(_) => json!({ "error": "invalid pipeline JSON" }), + } + } else { + json!({ "error": "unexpected response shape" }) + } + } + Err(e) => json!({ "error": format!("invalid response: {e}") }), + }, + Err(e) => json!({ "error": format!("unreachable: {e}") }), + } +} + +/// Parse a `get_pipeline_status` JSON payload and produce aggregated counts +/// plus a list of blocked/failing items. +fn aggregate_pipeline_counts(pipeline: &Value) -> Value { + let active = pipeline + .get("active") + .and_then(|a| a.as_array()) + .cloned() + .unwrap_or_default(); + let backlog_count = pipeline + .get("backlog_count") + .and_then(|n| n.as_u64()) + .unwrap_or(0); + + let mut current = 0u64; + let mut qa = 0u64; + let mut merge = 0u64; + let mut done = 0u64; + let mut blocked: Vec = Vec::new(); + + for item in &active { + let stage = item + .get("stage") + .and_then(|s| s.as_str()) + .unwrap_or("unknown"); + match stage { + "current" => current += 1, + "qa" => qa += 1, + "merge" => merge += 1, + "done" => done += 1, + _ => {} + } + + let is_blocked = item + .get("blocked") + .and_then(|b| b.as_bool()) + .unwrap_or(false); + let merge_failure = item.get("merge_failure"); + let has_merge_failure = merge_failure + .map(|f| !f.is_null() && f != "") + .unwrap_or(false); + + if is_blocked || has_merge_failure { + let story_id = item + .get("story_id") + .and_then(|s| s.as_str()) + .unwrap_or("?") + .to_string(); + let story_name = item + .get("name") + .and_then(|s| s.as_str()) + .unwrap_or("") + .to_string(); + let reason = if has_merge_failure { + format!( + "merge failure: {}", + merge_failure.and_then(|f| f.as_str()).unwrap_or("unknown") + ) + } else { + let rc = item + .get("retry_count") + .and_then(|n| n.as_u64()) + .unwrap_or(0); + format!("blocked after {rc} retries") + }; + blocked.push(json!({ + "story_id": story_id, + "name": story_name, + "stage": stage, + "reason": reason, + })); + } + } + + json!({ + "counts": { + "backlog": backlog_count, + "current": current, + "qa": qa, + "merge": merge, + "done": done, + }, + "blocked": blocked, + }) +} + +/// Format an aggregated status map as a compact, one-line-per-project string +/// suitable for Matrix/Slack messages. +/// +/// Healthy projects: `🟢 **name** — B:5 C:2 Q:1 M:0 D:12` +/// Blocked items appended on the same line: `| blocked: 42 [story]` +/// Unreachable projects: `🔴 **name** — UNREACHABLE` +pub fn format_aggregate_status_compact(statuses: &BTreeMap) -> String { + let mut lines: Vec = Vec::new(); + for (name, status) in statuses { + if let Some(err) = status.get("error").and_then(|e| e.as_str()) { + lines.push(format!("\u{1F534} **{name}** — UNREACHABLE: {err}")); + } else { + let counts = status.get("counts"); + let b = counts + .and_then(|c| c.get("backlog")) + .and_then(|n| n.as_u64()) + .unwrap_or(0); + let c = counts + .and_then(|c| c.get("current")) + .and_then(|n| n.as_u64()) + .unwrap_or(0); + let q = counts + .and_then(|c| c.get("qa")) + .and_then(|n| n.as_u64()) + .unwrap_or(0); + let m = counts + .and_then(|c| c.get("merge")) + .and_then(|n| n.as_u64()) + .unwrap_or(0); + let d = counts + .and_then(|c| c.get("done")) + .and_then(|n| n.as_u64()) + .unwrap_or(0); + + let blocked_arr = status + .get("blocked") + .and_then(|a| a.as_array()) + .cloned() + .unwrap_or_default(); + + let indicator = if blocked_arr.is_empty() { + "\u{1F7E2}" // 🟢 + } else { + "\u{1F7E0}" // 🟠 + }; + + let mut line = format!("{indicator} **{name}** — B:{b} C:{c} Q:{q} M:{m} D:{d}"); + + if !blocked_arr.is_empty() { + let ids: Vec = blocked_arr + .iter() + .filter_map(|item| item.get("story_id").and_then(|s| s.as_str())) + .map(|s| s.to_string()) + .collect(); + line.push_str(&format!(" | blocked: {}", ids.join(", "))); + } + + lines.push(line); + } + } + if lines.is_empty() { + return "No projects registered.".to_string(); + } + format!("**All Projects**\n\n{}", lines.join("\n\n")) +} + +/// MCP tool handler for `aggregate_pipeline_status`. +async fn handle_aggregate_pipeline_status( + state: &GatewayState, + id: Option, +) -> JsonRpcResponse { + let project_urls: BTreeMap = state + .projects + .read() + .await + .iter() + .map(|(name, entry)| (name.clone(), entry.url.clone())) + .collect(); + + let statuses = fetch_all_project_pipeline_statuses(&project_urls, &state.client).await; + let active = state.active_project.read().await.clone(); + + JsonRpcResponse::success( + id, + json!({ + "content": [{ + "type": "text", + "text": format!( + "Aggregate pipeline status (active: '{active}'):\n{}", + serde_json::to_string_pretty(&statuses).unwrap_or_default() + ) + }], + "projects": statuses, + "active": active, + }), + ) +} + /// Initialise a new huskies project at the given filesystem path. /// /// Performs the same scaffolding as `huskies init `: creates `.huskies/`, @@ -1473,12 +1734,14 @@ fn toml_string(s: &str) -> String { format!("\"{}\"", s.replace('\\', "\\\\").replace('"', "\\\"")) } -/// `GET /api/gateway/pipeline` — fetch pipeline status from all registered projects. +/// `GET /api/gateway/pipeline` — fetch pipeline status from all registered projects in parallel. /// -/// Returns `{ "active": "", "projects": { "": { "active": [...], "backlog": [...], "backlog_count": N } | { "error": "..." } } }`. +/// Returns `{ "active": "", "projects": { "": { "counts": {...}, "blocked": [...] } | { "error": "..." } } }`. +/// Requests to each project container are issued concurrently — wall-clock latency is +/// bounded by the slowest responding project, not the sum of all response times. #[handler] pub async fn gateway_all_pipeline_handler(state: Data<&Arc>) -> Response { - let project_entries: Vec<(String, String)> = state + let project_urls: BTreeMap = state .projects .read() .await @@ -1486,44 +1749,7 @@ pub async fn gateway_all_pipeline_handler(state: Data<&Arc>) -> Re .map(|(n, e)| (n.clone(), e.url.clone())) .collect(); - let mut results: BTreeMap = BTreeMap::new(); - - for (name, url) in &project_entries { - let mcp_url = format!("{}/mcp", url.trim_end_matches('/')); - let rpc_body = json!({ - "jsonrpc": "2.0", - "id": 1, - "method": "tools/call", - "params": { - "name": "get_pipeline_status", - "arguments": {} - } - }); - - let status = match state.client.post(&mcp_url).json(&rpc_body).send().await { - Ok(resp) => match resp.json::().await { - Ok(upstream) => { - // The tool result is a JSON string embedded in content[0].text. - if let Some(text) = upstream - .get("result") - .and_then(|r| r.get("content")) - .and_then(|c| c.get(0)) - .and_then(|c| c.get("text")) - .and_then(|t| t.as_str()) - { - serde_json::from_str(text) - .unwrap_or_else(|_| json!({ "error": "invalid pipeline json" })) - } else { - json!({ "error": "unexpected response shape" }) - } - } - Err(e) => json!({ "error": format!("invalid response: {e}") }), - }, - Err(e) => json!({ "error": format!("unreachable: {e}") }), - }; - - results.insert(name.clone(), status); - } + let results = fetch_all_project_pipeline_statuses(&project_urls, &state.client).await; let active = state.active_project.read().await.clone(); let body = json!({ "active": active, "projects": results }); @@ -2701,4 +2927,213 @@ enabled = false "init_project should be in gateway tool definitions" ); } + + #[test] + fn gateway_tool_definitions_includes_aggregate_pipeline_status() { + let defs = gateway_tool_definitions(); + let names: Vec<&str> = defs + .iter() + .filter_map(|d| d.get("name").and_then(|n| n.as_str())) + .collect(); + assert!( + names.contains(&"aggregate_pipeline_status"), + "aggregate_pipeline_status should be in gateway tool definitions" + ); + } + + // ── aggregate_pipeline_counts unit tests ───────────────────────────────── + + #[test] + fn aggregate_pipeline_counts_empty_pipeline() { + let pipeline = json!({ "active": [], "backlog": [], "backlog_count": 0 }); + let result = aggregate_pipeline_counts(&pipeline); + assert_eq!(result["counts"]["backlog"], 0); + assert_eq!(result["counts"]["current"], 0); + assert_eq!(result["counts"]["qa"], 0); + assert_eq!(result["counts"]["merge"], 0); + assert_eq!(result["counts"]["done"], 0); + assert_eq!(result["blocked"].as_array().unwrap().len(), 0); + } + + #[test] + fn aggregate_pipeline_counts_stage_counts_correct() { + let pipeline = json!({ + "active": [ + { "story_id": "1_story_a", "name": "A", "stage": "current" }, + { "story_id": "2_story_b", "name": "B", "stage": "current" }, + { "story_id": "3_story_c", "name": "C", "stage": "qa" }, + { "story_id": "4_story_d", "name": "D", "stage": "done" }, + ], + "backlog": [{ "story_id": "5_story_e", "name": "E" }, { "story_id": "6_story_f", "name": "F" }], + "backlog_count": 2 + }); + let result = aggregate_pipeline_counts(&pipeline); + assert_eq!(result["counts"]["backlog"], 2); + assert_eq!(result["counts"]["current"], 2); + assert_eq!(result["counts"]["qa"], 1); + assert_eq!(result["counts"]["merge"], 0); + assert_eq!(result["counts"]["done"], 1); + assert_eq!(result["blocked"].as_array().unwrap().len(), 0); + } + + #[test] + fn aggregate_pipeline_counts_blocked_items_captured() { + let pipeline = json!({ + "active": [ + { "story_id": "10_story_blocked", "name": "Blocked", "stage": "current", "blocked": true, "retry_count": 3 }, + { "story_id": "20_story_ok", "name": "OK", "stage": "qa" }, + ], + "backlog": [], + "backlog_count": 0 + }); + let result = aggregate_pipeline_counts(&pipeline); + let blocked = result["blocked"].as_array().unwrap(); + assert_eq!(blocked.len(), 1); + assert_eq!(blocked[0]["story_id"], "10_story_blocked"); + assert_eq!(blocked[0]["stage"], "current"); + assert!( + blocked[0]["reason"] + .as_str() + .unwrap() + .contains("blocked after 3 retries"), + "reason: {}", + blocked[0]["reason"] + ); + } + + #[test] + fn format_aggregate_status_compact_healthy_project() { + let mut statuses = BTreeMap::new(); + statuses.insert( + "huskies".to_string(), + json!({ + "counts": { "backlog": 5, "current": 2, "qa": 1, "merge": 0, "done": 12 }, + "blocked": [] + }), + ); + let output = format_aggregate_status_compact(&statuses); + assert!(output.contains("huskies"), "output: {output}"); + assert!(output.contains("B:5"), "output: {output}"); + assert!(output.contains("C:2"), "output: {output}"); + assert!(output.contains("Q:1"), "output: {output}"); + assert!(output.contains("D:12"), "output: {output}"); + assert!(!output.contains("blocked:"), "output: {output}"); + } + + #[test] + fn format_aggregate_status_compact_unreachable_project() { + let mut statuses = BTreeMap::new(); + statuses.insert( + "broken".to_string(), + json!({ "error": "connection refused" }), + ); + let output = format_aggregate_status_compact(&statuses); + assert!(output.contains("broken"), "output: {output}"); + assert!(output.contains("UNREACHABLE"), "output: {output}"); + assert!(output.contains("connection refused"), "output: {output}"); + } + + #[test] + fn format_aggregate_status_compact_blocked_items_shown() { + let mut statuses = BTreeMap::new(); + statuses.insert( + "myproj".to_string(), + json!({ + "counts": { "backlog": 0, "current": 1, "qa": 0, "merge": 0, "done": 0 }, + "blocked": [{ "story_id": "42_story_x", "name": "X", "stage": "current", "reason": "blocked after 3 retries" }] + }), + ); + let output = format_aggregate_status_compact(&statuses); + assert!(output.contains("blocked:"), "output: {output}"); + assert!(output.contains("42_story_x"), "output: {output}"); + } + + /// Integration test: two mock projects (one healthy, one unreachable). + /// Asserts that `fetch_all_project_pipeline_statuses` reports both correctly. + #[tokio::test] + async fn aggregate_pipeline_status_integration_healthy_and_unreachable() { + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + + // Start a mock project MCP server that returns a get_pipeline_status response. + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let mock_port = listener.local_addr().unwrap().port(); + let healthy_url = format!("http://127.0.0.1:{mock_port}"); + + // The mock responds to exactly 1 connection then stops. + tokio::spawn(async move { + if let Ok((mut stream, _)) = listener.accept().await { + let mut buf = vec![0u8; 4096]; + let _ = stream.read(&mut buf).await; + // Return a pipeline status with items at multiple stages and one blocked item. + let pipeline_json = serde_json::to_string(&json!({ + "active": [ + { "story_id": "1_story_a", "name": "A", "stage": "current" }, + { "story_id": "2_story_b", "name": "B", "stage": "qa" }, + { "story_id": "3_story_c", "name": "C", "stage": "current", "blocked": true, "retry_count": 5 }, + ], + "backlog": [{ "story_id": "4_story_d", "name": "D" }], + "backlog_count": 1 + })).unwrap(); + let body = serde_json::to_vec(&json!({ + "jsonrpc": "2.0", + "id": 1, + "result": { + "content": [{ "type": "text", "text": pipeline_json }] + } + })) + .unwrap(); + let header = format!( + "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n", + body.len() + ); + let _ = stream.write_all(header.as_bytes()).await; + let _ = stream.write_all(&body).await; + } + }); + + // Give the mock a moment to bind. + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + + // Second "project" points to an unreachable port. + let unreachable_url = "http://127.0.0.1:1".to_string(); // port 1 is not bindable + + let mut project_urls = BTreeMap::new(); + project_urls.insert("healthy-project".to_string(), healthy_url); + project_urls.insert("broken-project".to_string(), unreachable_url); + + let client = Client::new(); + let statuses = fetch_all_project_pipeline_statuses(&project_urls, &client).await; + + // Both projects should be present in the result. + assert!( + statuses.contains_key("healthy-project"), + "healthy-project must be in response" + ); + assert!( + statuses.contains_key("broken-project"), + "broken-project must be in response" + ); + + // Healthy project should have correct counts. + let healthy = &statuses["healthy-project"]; + assert!( + healthy.get("error").is_none(), + "healthy project should not have error: {healthy}" + ); + assert_eq!(healthy["counts"]["backlog"], 1); + assert_eq!(healthy["counts"]["current"], 2); + assert_eq!(healthy["counts"]["qa"], 1); + + // Healthy project should report the blocked item. + let blocked = healthy["blocked"].as_array().unwrap(); + assert_eq!(blocked.len(), 1, "expected 1 blocked item: {blocked:?}"); + assert_eq!(blocked[0]["story_id"], "3_story_c"); + + // Unreachable project should have an error field. + let broken = &statuses["broken-project"]; + assert!( + broken.get("error").is_some(), + "unreachable project must have error field: {broken}" + ); + } }