huskies: merge 600_story_gateway_aggregated_pipeline_status_mcp_and_chat

This commit is contained in:
dave
2026-04-23 10:38:30 +00:00
parent b3da321a3b
commit c9e8ed030e
2 changed files with 498 additions and 42 deletions
+476 -41
View File
@@ -257,6 +257,7 @@ const GATEWAY_TOOLS: &[&str] = &[
"gateway_status",
"gateway_health",
"init_project",
"aggregate_pipeline_status",
];
/// Main MCP POST handler for the gateway. Intercepts gateway-specific tools and
@@ -439,6 +440,14 @@ fn gateway_tool_definitions() -> Vec<Value> {
"required": ["path"]
}
}),
json!({
"name": "aggregate_pipeline_status",
"description": "Fetch pipeline status from ALL registered projects in parallel and return an aggregated report. For each project: stage counts (backlog/current/qa/merge/done) and a list of blocked or failing items with triage detail. Unreachable projects are included with an error state rather than failing the whole call.",
"inputSchema": {
"type": "object",
"properties": {}
}
}),
]
}
@@ -518,6 +527,7 @@ async fn handle_gateway_tool(
"gateway_status" => handle_gateway_status(state, id).await,
"gateway_health" => handle_gateway_health(state, id).await,
"init_project" => handle_init_project(params, state, id).await,
"aggregate_pipeline_status" => handle_aggregate_pipeline_status(state, id).await,
_ => JsonRpcResponse::error(id, -32601, format!("Unknown gateway tool: {tool_name}")),
}
}
@@ -659,6 +669,257 @@ async fn handle_gateway_health(state: &GatewayState, id: Option<Value>) -> JsonR
)
}
// ── Aggregate pipeline status ─────────────────────────────────────────
/// Fetch `get_pipeline_status` from every registered project URL in parallel.
///
/// Returns a `BTreeMap` of project name → per-project status JSON. Each value
/// is either `{"counts": {...}, "blocked": [...]}` on success or
/// `{"error": "..."}` when the project container is unreachable or returns an
/// unexpected response. A single flaky project never causes the whole call to
/// fail.
pub async fn fetch_all_project_pipeline_statuses(
project_urls: &BTreeMap<String, String>,
client: &Client,
) -> BTreeMap<String, Value> {
use futures::future::join_all;
let futures: Vec<_> = project_urls
.iter()
.map(|(name, url)| {
let name = name.clone();
let url = url.clone();
let client = client.clone();
async move {
let result = fetch_one_project_pipeline_status(&url, &client).await;
(name, result)
}
})
.collect();
join_all(futures).await.into_iter().collect()
}
/// Fetch and aggregate pipeline status for a single project URL.
async fn fetch_one_project_pipeline_status(url: &str, client: &Client) -> Value {
let mcp_url = format!("{}/mcp", url.trim_end_matches('/'));
let rpc_body = json!({
"jsonrpc": "2.0",
"id": 1,
"method": "tools/call",
"params": {
"name": "get_pipeline_status",
"arguments": {}
}
});
match client.post(&mcp_url).json(&rpc_body).send().await {
Ok(resp) => match resp.json::<Value>().await {
Ok(upstream) => {
if let Some(text) = upstream
.get("result")
.and_then(|r| r.get("content"))
.and_then(|c| c.get(0))
.and_then(|c| c.get("text"))
.and_then(|t| t.as_str())
{
match serde_json::from_str::<Value>(text) {
Ok(pipeline) => aggregate_pipeline_counts(&pipeline),
Err(_) => json!({ "error": "invalid pipeline JSON" }),
}
} else {
json!({ "error": "unexpected response shape" })
}
}
Err(e) => json!({ "error": format!("invalid response: {e}") }),
},
Err(e) => json!({ "error": format!("unreachable: {e}") }),
}
}
/// Parse a `get_pipeline_status` JSON payload and produce aggregated counts
/// plus a list of blocked/failing items.
fn aggregate_pipeline_counts(pipeline: &Value) -> Value {
let active = pipeline
.get("active")
.and_then(|a| a.as_array())
.cloned()
.unwrap_or_default();
let backlog_count = pipeline
.get("backlog_count")
.and_then(|n| n.as_u64())
.unwrap_or(0);
let mut current = 0u64;
let mut qa = 0u64;
let mut merge = 0u64;
let mut done = 0u64;
let mut blocked: Vec<Value> = Vec::new();
for item in &active {
let stage = item
.get("stage")
.and_then(|s| s.as_str())
.unwrap_or("unknown");
match stage {
"current" => current += 1,
"qa" => qa += 1,
"merge" => merge += 1,
"done" => done += 1,
_ => {}
}
let is_blocked = item
.get("blocked")
.and_then(|b| b.as_bool())
.unwrap_or(false);
let merge_failure = item.get("merge_failure");
let has_merge_failure = merge_failure
.map(|f| !f.is_null() && f != "")
.unwrap_or(false);
if is_blocked || has_merge_failure {
let story_id = item
.get("story_id")
.and_then(|s| s.as_str())
.unwrap_or("?")
.to_string();
let story_name = item
.get("name")
.and_then(|s| s.as_str())
.unwrap_or("")
.to_string();
let reason = if has_merge_failure {
format!(
"merge failure: {}",
merge_failure.and_then(|f| f.as_str()).unwrap_or("unknown")
)
} else {
let rc = item
.get("retry_count")
.and_then(|n| n.as_u64())
.unwrap_or(0);
format!("blocked after {rc} retries")
};
blocked.push(json!({
"story_id": story_id,
"name": story_name,
"stage": stage,
"reason": reason,
}));
}
}
json!({
"counts": {
"backlog": backlog_count,
"current": current,
"qa": qa,
"merge": merge,
"done": done,
},
"blocked": blocked,
})
}
/// Format an aggregated status map as a compact, one-line-per-project string
/// suitable for Matrix/Slack messages.
///
/// Healthy projects: `🟢 **name** — B:5 C:2 Q:1 M:0 D:12`
/// Blocked items appended on the same line: `| blocked: 42 [story]`
/// Unreachable projects: `🔴 **name** — UNREACHABLE`
pub fn format_aggregate_status_compact(statuses: &BTreeMap<String, Value>) -> String {
let mut lines: Vec<String> = Vec::new();
for (name, status) in statuses {
if let Some(err) = status.get("error").and_then(|e| e.as_str()) {
lines.push(format!("\u{1F534} **{name}** — UNREACHABLE: {err}"));
} else {
let counts = status.get("counts");
let b = counts
.and_then(|c| c.get("backlog"))
.and_then(|n| n.as_u64())
.unwrap_or(0);
let c = counts
.and_then(|c| c.get("current"))
.and_then(|n| n.as_u64())
.unwrap_or(0);
let q = counts
.and_then(|c| c.get("qa"))
.and_then(|n| n.as_u64())
.unwrap_or(0);
let m = counts
.and_then(|c| c.get("merge"))
.and_then(|n| n.as_u64())
.unwrap_or(0);
let d = counts
.and_then(|c| c.get("done"))
.and_then(|n| n.as_u64())
.unwrap_or(0);
let blocked_arr = status
.get("blocked")
.and_then(|a| a.as_array())
.cloned()
.unwrap_or_default();
let indicator = if blocked_arr.is_empty() {
"\u{1F7E2}" // 🟢
} else {
"\u{1F7E0}" // 🟠
};
let mut line = format!("{indicator} **{name}** — B:{b} C:{c} Q:{q} M:{m} D:{d}");
if !blocked_arr.is_empty() {
let ids: Vec<String> = blocked_arr
.iter()
.filter_map(|item| item.get("story_id").and_then(|s| s.as_str()))
.map(|s| s.to_string())
.collect();
line.push_str(&format!(" | blocked: {}", ids.join(", ")));
}
lines.push(line);
}
}
if lines.is_empty() {
return "No projects registered.".to_string();
}
format!("**All Projects**\n\n{}", lines.join("\n\n"))
}
/// MCP tool handler for `aggregate_pipeline_status`.
async fn handle_aggregate_pipeline_status(
state: &GatewayState,
id: Option<Value>,
) -> JsonRpcResponse {
let project_urls: BTreeMap<String, String> = state
.projects
.read()
.await
.iter()
.map(|(name, entry)| (name.clone(), entry.url.clone()))
.collect();
let statuses = fetch_all_project_pipeline_statuses(&project_urls, &state.client).await;
let active = state.active_project.read().await.clone();
JsonRpcResponse::success(
id,
json!({
"content": [{
"type": "text",
"text": format!(
"Aggregate pipeline status (active: '{active}'):\n{}",
serde_json::to_string_pretty(&statuses).unwrap_or_default()
)
}],
"projects": statuses,
"active": active,
}),
)
}
/// Initialise a new huskies project at the given filesystem path.
///
/// Performs the same scaffolding as `huskies init <path>`: creates `.huskies/`,
@@ -1473,12 +1734,14 @@ fn toml_string(s: &str) -> String {
format!("\"{}\"", s.replace('\\', "\\\\").replace('"', "\\\""))
}
/// `GET /api/gateway/pipeline` — fetch pipeline status from all registered projects.
/// `GET /api/gateway/pipeline` — fetch pipeline status from all registered projects in parallel.
///
/// Returns `{ "active": "<project>", "projects": { "<name>": { "active": [...], "backlog": [...], "backlog_count": N } | { "error": "..." } } }`.
/// Returns `{ "active": "<project>", "projects": { "<name>": { "counts": {...}, "blocked": [...] } | { "error": "..." } } }`.
/// Requests to each project container are issued concurrently — wall-clock latency is
/// bounded by the slowest responding project, not the sum of all response times.
#[handler]
pub async fn gateway_all_pipeline_handler(state: Data<&Arc<GatewayState>>) -> Response {
let project_entries: Vec<(String, String)> = state
let project_urls: BTreeMap<String, String> = state
.projects
.read()
.await
@@ -1486,44 +1749,7 @@ pub async fn gateway_all_pipeline_handler(state: Data<&Arc<GatewayState>>) -> Re
.map(|(n, e)| (n.clone(), e.url.clone()))
.collect();
let mut results: BTreeMap<String, Value> = BTreeMap::new();
for (name, url) in &project_entries {
let mcp_url = format!("{}/mcp", url.trim_end_matches('/'));
let rpc_body = json!({
"jsonrpc": "2.0",
"id": 1,
"method": "tools/call",
"params": {
"name": "get_pipeline_status",
"arguments": {}
}
});
let status = match state.client.post(&mcp_url).json(&rpc_body).send().await {
Ok(resp) => match resp.json::<Value>().await {
Ok(upstream) => {
// The tool result is a JSON string embedded in content[0].text.
if let Some(text) = upstream
.get("result")
.and_then(|r| r.get("content"))
.and_then(|c| c.get(0))
.and_then(|c| c.get("text"))
.and_then(|t| t.as_str())
{
serde_json::from_str(text)
.unwrap_or_else(|_| json!({ "error": "invalid pipeline json" }))
} else {
json!({ "error": "unexpected response shape" })
}
}
Err(e) => json!({ "error": format!("invalid response: {e}") }),
},
Err(e) => json!({ "error": format!("unreachable: {e}") }),
};
results.insert(name.clone(), status);
}
let results = fetch_all_project_pipeline_statuses(&project_urls, &state.client).await;
let active = state.active_project.read().await.clone();
let body = json!({ "active": active, "projects": results });
@@ -2701,4 +2927,213 @@ enabled = false
"init_project should be in gateway tool definitions"
);
}
#[test]
fn gateway_tool_definitions_includes_aggregate_pipeline_status() {
let defs = gateway_tool_definitions();
let names: Vec<&str> = defs
.iter()
.filter_map(|d| d.get("name").and_then(|n| n.as_str()))
.collect();
assert!(
names.contains(&"aggregate_pipeline_status"),
"aggregate_pipeline_status should be in gateway tool definitions"
);
}
// ── aggregate_pipeline_counts unit tests ─────────────────────────────────
#[test]
fn aggregate_pipeline_counts_empty_pipeline() {
let pipeline = json!({ "active": [], "backlog": [], "backlog_count": 0 });
let result = aggregate_pipeline_counts(&pipeline);
assert_eq!(result["counts"]["backlog"], 0);
assert_eq!(result["counts"]["current"], 0);
assert_eq!(result["counts"]["qa"], 0);
assert_eq!(result["counts"]["merge"], 0);
assert_eq!(result["counts"]["done"], 0);
assert_eq!(result["blocked"].as_array().unwrap().len(), 0);
}
#[test]
fn aggregate_pipeline_counts_stage_counts_correct() {
let pipeline = json!({
"active": [
{ "story_id": "1_story_a", "name": "A", "stage": "current" },
{ "story_id": "2_story_b", "name": "B", "stage": "current" },
{ "story_id": "3_story_c", "name": "C", "stage": "qa" },
{ "story_id": "4_story_d", "name": "D", "stage": "done" },
],
"backlog": [{ "story_id": "5_story_e", "name": "E" }, { "story_id": "6_story_f", "name": "F" }],
"backlog_count": 2
});
let result = aggregate_pipeline_counts(&pipeline);
assert_eq!(result["counts"]["backlog"], 2);
assert_eq!(result["counts"]["current"], 2);
assert_eq!(result["counts"]["qa"], 1);
assert_eq!(result["counts"]["merge"], 0);
assert_eq!(result["counts"]["done"], 1);
assert_eq!(result["blocked"].as_array().unwrap().len(), 0);
}
#[test]
fn aggregate_pipeline_counts_blocked_items_captured() {
let pipeline = json!({
"active": [
{ "story_id": "10_story_blocked", "name": "Blocked", "stage": "current", "blocked": true, "retry_count": 3 },
{ "story_id": "20_story_ok", "name": "OK", "stage": "qa" },
],
"backlog": [],
"backlog_count": 0
});
let result = aggregate_pipeline_counts(&pipeline);
let blocked = result["blocked"].as_array().unwrap();
assert_eq!(blocked.len(), 1);
assert_eq!(blocked[0]["story_id"], "10_story_blocked");
assert_eq!(blocked[0]["stage"], "current");
assert!(
blocked[0]["reason"]
.as_str()
.unwrap()
.contains("blocked after 3 retries"),
"reason: {}",
blocked[0]["reason"]
);
}
#[test]
fn format_aggregate_status_compact_healthy_project() {
let mut statuses = BTreeMap::new();
statuses.insert(
"huskies".to_string(),
json!({
"counts": { "backlog": 5, "current": 2, "qa": 1, "merge": 0, "done": 12 },
"blocked": []
}),
);
let output = format_aggregate_status_compact(&statuses);
assert!(output.contains("huskies"), "output: {output}");
assert!(output.contains("B:5"), "output: {output}");
assert!(output.contains("C:2"), "output: {output}");
assert!(output.contains("Q:1"), "output: {output}");
assert!(output.contains("D:12"), "output: {output}");
assert!(!output.contains("blocked:"), "output: {output}");
}
#[test]
fn format_aggregate_status_compact_unreachable_project() {
let mut statuses = BTreeMap::new();
statuses.insert(
"broken".to_string(),
json!({ "error": "connection refused" }),
);
let output = format_aggregate_status_compact(&statuses);
assert!(output.contains("broken"), "output: {output}");
assert!(output.contains("UNREACHABLE"), "output: {output}");
assert!(output.contains("connection refused"), "output: {output}");
}
#[test]
fn format_aggregate_status_compact_blocked_items_shown() {
let mut statuses = BTreeMap::new();
statuses.insert(
"myproj".to_string(),
json!({
"counts": { "backlog": 0, "current": 1, "qa": 0, "merge": 0, "done": 0 },
"blocked": [{ "story_id": "42_story_x", "name": "X", "stage": "current", "reason": "blocked after 3 retries" }]
}),
);
let output = format_aggregate_status_compact(&statuses);
assert!(output.contains("blocked:"), "output: {output}");
assert!(output.contains("42_story_x"), "output: {output}");
}
/// Integration test: two mock projects (one healthy, one unreachable).
/// Asserts that `fetch_all_project_pipeline_statuses` reports both correctly.
#[tokio::test]
async fn aggregate_pipeline_status_integration_healthy_and_unreachable() {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
// Start a mock project MCP server that returns a get_pipeline_status response.
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let mock_port = listener.local_addr().unwrap().port();
let healthy_url = format!("http://127.0.0.1:{mock_port}");
// The mock responds to exactly 1 connection then stops.
tokio::spawn(async move {
if let Ok((mut stream, _)) = listener.accept().await {
let mut buf = vec![0u8; 4096];
let _ = stream.read(&mut buf).await;
// Return a pipeline status with items at multiple stages and one blocked item.
let pipeline_json = serde_json::to_string(&json!({
"active": [
{ "story_id": "1_story_a", "name": "A", "stage": "current" },
{ "story_id": "2_story_b", "name": "B", "stage": "qa" },
{ "story_id": "3_story_c", "name": "C", "stage": "current", "blocked": true, "retry_count": 5 },
],
"backlog": [{ "story_id": "4_story_d", "name": "D" }],
"backlog_count": 1
})).unwrap();
let body = serde_json::to_vec(&json!({
"jsonrpc": "2.0",
"id": 1,
"result": {
"content": [{ "type": "text", "text": pipeline_json }]
}
}))
.unwrap();
let header = format!(
"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
body.len()
);
let _ = stream.write_all(header.as_bytes()).await;
let _ = stream.write_all(&body).await;
}
});
// Give the mock a moment to bind.
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
// Second "project" points to an unreachable port.
let unreachable_url = "http://127.0.0.1:1".to_string(); // port 1 is not bindable
let mut project_urls = BTreeMap::new();
project_urls.insert("healthy-project".to_string(), healthy_url);
project_urls.insert("broken-project".to_string(), unreachable_url);
let client = Client::new();
let statuses = fetch_all_project_pipeline_statuses(&project_urls, &client).await;
// Both projects should be present in the result.
assert!(
statuses.contains_key("healthy-project"),
"healthy-project must be in response"
);
assert!(
statuses.contains_key("broken-project"),
"broken-project must be in response"
);
// Healthy project should have correct counts.
let healthy = &statuses["healthy-project"];
assert!(
healthy.get("error").is_none(),
"healthy project should not have error: {healthy}"
);
assert_eq!(healthy["counts"]["backlog"], 1);
assert_eq!(healthy["counts"]["current"], 2);
assert_eq!(healthy["counts"]["qa"], 1);
// Healthy project should report the blocked item.
let blocked = healthy["blocked"].as_array().unwrap();
assert_eq!(blocked.len(), 1, "expected 1 blocked item: {blocked:?}");
assert_eq!(blocked[0]["story_id"], "3_story_c");
// Unreachable project should have an error field.
let broken = &statuses["broken-project"];
assert!(
broken.get("error").is_some(),
"unreachable project must have error field: {broken}"
);
}
}