Files
huskies/server/src/service/pipeline/mod.rs
T

156 lines
5.3 KiB
Rust

//! Pipeline service — shared pipeline-domain logic.
//!
//! Contains pure functions for parsing and aggregating pipeline status data.
//! Used by the gateway service for cross-project aggregation and potentially
//! by other consumers that need to reason about pipeline stage counts.
use serde_json::{Value, json};
/// Parse a `get_pipeline_status` JSON payload and produce aggregated counts
/// plus a list of blocked/failing items.
pub fn aggregate_pipeline_counts(pipeline: &Value) -> Value {
let active = pipeline
.get("active")
.and_then(|a| a.as_array())
.cloned()
.unwrap_or_default();
let backlog_count = pipeline
.get("backlog_count")
.and_then(|n| n.as_u64())
.unwrap_or(0);
let mut current = 0u64;
let mut qa = 0u64;
let mut merge = 0u64;
let mut done = 0u64;
let mut blocked: Vec<Value> = Vec::new();
for item in &active {
let stage = item
.get("stage")
.and_then(|s| s.as_str())
.unwrap_or("unknown");
match stage {
"current" => current += 1,
"qa" => qa += 1,
"merge" => merge += 1,
"done" => done += 1,
_ => {}
}
let is_blocked = item
.get("blocked")
.and_then(|b| b.as_bool())
.unwrap_or(false);
let merge_failure = item.get("merge_failure");
let has_merge_failure = merge_failure
.map(|f| !f.is_null() && f != "")
.unwrap_or(false);
if is_blocked || has_merge_failure {
let story_id = item
.get("story_id")
.and_then(|s| s.as_str())
.unwrap_or("?")
.to_string();
let story_name = item
.get("name")
.and_then(|s| s.as_str())
.unwrap_or("")
.to_string();
let reason = if has_merge_failure {
format!(
"merge failure: {}",
merge_failure.and_then(|f| f.as_str()).unwrap_or("unknown")
)
} else {
let rc = item
.get("retry_count")
.and_then(|n| n.as_u64())
.unwrap_or(0);
format!("blocked after {rc} retries")
};
blocked.push(json!({
"story_id": story_id,
"name": story_name,
"stage": stage,
"reason": reason,
}));
}
}
json!({
"counts": {
"backlog": backlog_count,
"current": current,
"qa": qa,
"merge": merge,
"done": done,
},
"blocked": blocked,
})
}
// ── Tests ────────────────────────────────────────────────────────────────────
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn aggregate_empty_pipeline() {
let pipeline = json!({ "active": [], "backlog": [], "backlog_count": 0 });
let result = aggregate_pipeline_counts(&pipeline);
assert_eq!(result["counts"]["backlog"], 0);
assert_eq!(result["counts"]["current"], 0);
assert_eq!(result["counts"]["qa"], 0);
assert_eq!(result["counts"]["merge"], 0);
assert_eq!(result["counts"]["done"], 0);
assert_eq!(result["blocked"].as_array().unwrap().len(), 0);
}
#[test]
fn aggregate_stage_counts_correct() {
let pipeline = json!({
"active": [
{ "story_id": "1_story_a", "name": "A", "stage": "current" },
{ "story_id": "2_story_b", "name": "B", "stage": "current" },
{ "story_id": "3_story_c", "name": "C", "stage": "qa" },
{ "story_id": "4_story_d", "name": "D", "stage": "done" },
],
"backlog": [{ "story_id": "5_story_e", "name": "E" }, { "story_id": "6_story_f", "name": "F" }],
"backlog_count": 2
});
let result = aggregate_pipeline_counts(&pipeline);
assert_eq!(result["counts"]["backlog"], 2);
assert_eq!(result["counts"]["current"], 2);
assert_eq!(result["counts"]["qa"], 1);
assert_eq!(result["counts"]["merge"], 0);
assert_eq!(result["counts"]["done"], 1);
assert_eq!(result["blocked"].as_array().unwrap().len(), 0);
}
#[test]
fn aggregate_blocked_items_captured() {
let pipeline = json!({
"active": [
{ "story_id": "10_story_blocked", "name": "Blocked", "stage": "current", "blocked": true, "retry_count": 3 },
{ "story_id": "20_story_ok", "name": "OK", "stage": "qa" },
],
"backlog": [],
"backlog_count": 0
});
let result = aggregate_pipeline_counts(&pipeline);
let blocked = result["blocked"].as_array().unwrap();
assert_eq!(blocked.len(), 1);
assert_eq!(blocked[0]["story_id"], "10_story_blocked");
assert_eq!(blocked[0]["stage"], "current");
assert!(
blocked[0]["reason"]
.as_str()
.unwrap()
.contains("blocked after 3 retries"),
);
}
}