2026-04-24 18:39:16 +00:00
|
|
|
//! Pipeline service — shared pipeline-domain logic.
|
|
|
|
|
//!
|
|
|
|
|
//! Contains pure functions for parsing and aggregating pipeline status data.
|
|
|
|
|
//! Used by the gateway service for cross-project aggregation and potentially
|
|
|
|
|
//! by other consumers that need to reason about pipeline stage counts.
|
|
|
|
|
|
|
|
|
|
use serde_json::{Value, json};
|
|
|
|
|
|
|
|
|
|
/// Parse a `get_pipeline_status` JSON payload and produce aggregated counts
|
|
|
|
|
/// plus a list of blocked/failing items.
|
|
|
|
|
pub fn aggregate_pipeline_counts(pipeline: &Value) -> Value {
|
|
|
|
|
let active = pipeline
|
|
|
|
|
.get("active")
|
|
|
|
|
.and_then(|a| a.as_array())
|
|
|
|
|
.cloned()
|
|
|
|
|
.unwrap_or_default();
|
|
|
|
|
let backlog_count = pipeline
|
|
|
|
|
.get("backlog_count")
|
|
|
|
|
.and_then(|n| n.as_u64())
|
|
|
|
|
.unwrap_or(0);
|
|
|
|
|
|
|
|
|
|
let mut current = 0u64;
|
|
|
|
|
let mut qa = 0u64;
|
|
|
|
|
let mut merge = 0u64;
|
|
|
|
|
let mut done = 0u64;
|
|
|
|
|
let mut blocked: Vec<Value> = Vec::new();
|
|
|
|
|
|
|
|
|
|
for item in &active {
|
|
|
|
|
let stage = item
|
|
|
|
|
.get("stage")
|
|
|
|
|
.and_then(|s| s.as_str())
|
|
|
|
|
.unwrap_or("unknown");
|
|
|
|
|
match stage {
|
|
|
|
|
"current" => current += 1,
|
|
|
|
|
"qa" => qa += 1,
|
|
|
|
|
"merge" => merge += 1,
|
|
|
|
|
"done" => done += 1,
|
|
|
|
|
_ => {}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let is_blocked = item
|
|
|
|
|
.get("blocked")
|
|
|
|
|
.and_then(|b| b.as_bool())
|
|
|
|
|
.unwrap_or(false);
|
|
|
|
|
let merge_failure = item.get("merge_failure");
|
|
|
|
|
let has_merge_failure = merge_failure
|
|
|
|
|
.map(|f| !f.is_null() && f != "")
|
|
|
|
|
.unwrap_or(false);
|
|
|
|
|
|
2026-05-14 18:04:35 +00:00
|
|
|
if (is_blocked || has_merge_failure) && stage != "done" {
|
2026-04-24 18:39:16 +00:00
|
|
|
let story_id = item
|
|
|
|
|
.get("story_id")
|
|
|
|
|
.and_then(|s| s.as_str())
|
|
|
|
|
.unwrap_or("?")
|
|
|
|
|
.to_string();
|
|
|
|
|
let story_name = item
|
|
|
|
|
.get("name")
|
|
|
|
|
.and_then(|s| s.as_str())
|
|
|
|
|
.unwrap_or("")
|
|
|
|
|
.to_string();
|
|
|
|
|
let reason = if has_merge_failure {
|
|
|
|
|
format!(
|
|
|
|
|
"merge failure: {}",
|
|
|
|
|
merge_failure.and_then(|f| f.as_str()).unwrap_or("unknown")
|
|
|
|
|
)
|
|
|
|
|
} else {
|
|
|
|
|
let rc = item
|
|
|
|
|
.get("retry_count")
|
|
|
|
|
.and_then(|n| n.as_u64())
|
|
|
|
|
.unwrap_or(0);
|
|
|
|
|
format!("blocked after {rc} retries")
|
|
|
|
|
};
|
|
|
|
|
blocked.push(json!({
|
|
|
|
|
"story_id": story_id,
|
|
|
|
|
"name": story_name,
|
|
|
|
|
"stage": stage,
|
|
|
|
|
"reason": reason,
|
|
|
|
|
}));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
json!({
|
|
|
|
|
"counts": {
|
|
|
|
|
"backlog": backlog_count,
|
|
|
|
|
"current": current,
|
|
|
|
|
"qa": qa,
|
|
|
|
|
"merge": merge,
|
|
|
|
|
"done": done,
|
|
|
|
|
},
|
|
|
|
|
"blocked": blocked,
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ── Tests ────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
#[cfg(test)]
|
|
|
|
|
mod tests {
|
|
|
|
|
use super::*;
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn aggregate_empty_pipeline() {
|
|
|
|
|
let pipeline = json!({ "active": [], "backlog": [], "backlog_count": 0 });
|
|
|
|
|
let result = aggregate_pipeline_counts(&pipeline);
|
|
|
|
|
assert_eq!(result["counts"]["backlog"], 0);
|
|
|
|
|
assert_eq!(result["counts"]["current"], 0);
|
|
|
|
|
assert_eq!(result["counts"]["qa"], 0);
|
|
|
|
|
assert_eq!(result["counts"]["merge"], 0);
|
|
|
|
|
assert_eq!(result["counts"]["done"], 0);
|
|
|
|
|
assert_eq!(result["blocked"].as_array().unwrap().len(), 0);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn aggregate_stage_counts_correct() {
|
|
|
|
|
let pipeline = json!({
|
|
|
|
|
"active": [
|
|
|
|
|
{ "story_id": "1_story_a", "name": "A", "stage": "current" },
|
|
|
|
|
{ "story_id": "2_story_b", "name": "B", "stage": "current" },
|
|
|
|
|
{ "story_id": "3_story_c", "name": "C", "stage": "qa" },
|
|
|
|
|
{ "story_id": "4_story_d", "name": "D", "stage": "done" },
|
|
|
|
|
],
|
|
|
|
|
"backlog": [{ "story_id": "5_story_e", "name": "E" }, { "story_id": "6_story_f", "name": "F" }],
|
|
|
|
|
"backlog_count": 2
|
|
|
|
|
});
|
|
|
|
|
let result = aggregate_pipeline_counts(&pipeline);
|
|
|
|
|
assert_eq!(result["counts"]["backlog"], 2);
|
|
|
|
|
assert_eq!(result["counts"]["current"], 2);
|
|
|
|
|
assert_eq!(result["counts"]["qa"], 1);
|
|
|
|
|
assert_eq!(result["counts"]["merge"], 0);
|
|
|
|
|
assert_eq!(result["counts"]["done"], 1);
|
|
|
|
|
assert_eq!(result["blocked"].as_array().unwrap().len(), 0);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn aggregate_blocked_items_captured() {
|
|
|
|
|
let pipeline = json!({
|
|
|
|
|
"active": [
|
|
|
|
|
{ "story_id": "10_story_blocked", "name": "Blocked", "stage": "current", "blocked": true, "retry_count": 3 },
|
|
|
|
|
{ "story_id": "20_story_ok", "name": "OK", "stage": "qa" },
|
|
|
|
|
],
|
|
|
|
|
"backlog": [],
|
|
|
|
|
"backlog_count": 0
|
|
|
|
|
});
|
|
|
|
|
let result = aggregate_pipeline_counts(&pipeline);
|
|
|
|
|
let blocked = result["blocked"].as_array().unwrap();
|
|
|
|
|
assert_eq!(blocked.len(), 1);
|
|
|
|
|
assert_eq!(blocked[0]["story_id"], "10_story_blocked");
|
|
|
|
|
assert_eq!(blocked[0]["stage"], "current");
|
|
|
|
|
assert!(
|
|
|
|
|
blocked[0]["reason"]
|
|
|
|
|
.as_str()
|
|
|
|
|
.unwrap()
|
|
|
|
|
.contains("blocked after 3 retries"),
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
}
|