//! Pipeline rendering: builds the full status text from pipeline items. use crate::agents::{AgentPool, AgentStatus}; use crate::config::ProjectConfig; use crate::pipeline_state::{PipelineItem, Stage}; use std::collections::{HashMap, HashSet}; /// Check which dependency numbers from `item.depends_on` are unmet. /// /// A dependency is considered met if the dep is in `Done` or `Archived` stage /// in `all_items`. If the dep is not found in `all_items` at all (e.g. it was /// archived before the CRDT migration and has no row), it is treated as met. pub(crate) fn unmet_deps_from_items(item: &PipelineItem, all_items: &[PipelineItem]) -> Vec { item.depends_on .iter() .filter_map(|dep_id| { // dep_id.0 is the raw number string (e.g. "999") as projected // from PipelineItemView.depends_on: Vec. let dep_num: u32 = dep_id.0.parse().ok()?; // Find the dep by matching the numeric prefix of its story_id. let dep = all_items.iter().find(|i| { i.story_id.0 == dep_id.0 || i.story_id.0.split('_').next() == Some(dep_id.0.as_str()) }); match dep { Some(d) if matches!(d.stage, Stage::Done { .. } | Stage::Archived { .. }) => None, Some(_) => Some(dep_num), // Found but not done = unmet None => None, // Not in CRDT; treat as met } }) .collect() } /// Extract the first non-empty line from `text`, truncated to `max_len` chars. pub(crate) fn first_non_empty_snippet(text: &str, max_len: usize) -> String { let line = text.lines().find(|l| !l.trim().is_empty()).unwrap_or(""); let mut chars = line.chars(); let truncated: String = chars.by_ref().take(max_len).collect(); if chars.next().is_some() { format!("{truncated}…") } else { truncated } } /// Build the full pipeline status text formatted for Matrix (markdown). pub(crate) fn build_pipeline_status(project_root: &std::path::Path, agents: &AgentPool) -> String { let items = crate::pipeline_state::read_all_typed(); build_status_from_items(project_root, agents, &items) } /// Inner implementation that accepts pre-loaded items for testability. pub(crate) fn build_status_from_items( project_root: &std::path::Path, agents: &AgentPool, items: &[PipelineItem], ) -> String { // Build a map from story_id → active AgentInfo for quick lookup. let active_agents = agents.list_agents().unwrap_or_default(); let active_map: HashMap = active_agents .iter() .filter(|a| matches!(a.status, AgentStatus::Running | AgentStatus::Pending)) .map(|a| (a.story_id.clone(), a)) .collect(); // Read token usage once for all stories to avoid repeated file I/O. let cost_by_story: HashMap = crate::agents::token_usage::read_all(project_root) .unwrap_or_default() .into_iter() .fold(HashMap::new(), |mut map, r| { *map.entry(r.story_id).or_insert(0.0) += r.usage.total_cost_usd; map }); let config = ProjectConfig::load(project_root).ok(); // Pre-fetch merge-specific state: deterministic merges in flight and // any merge_failure text persisted to the story's front matter. let running_merges: HashSet = agents .list_running_merges() .unwrap_or_default() .into_iter() .collect(); // Merge-failure detail now lives on the typed MergeJob CRDT entry // (story 929 — CRDT is the sole source of metadata). let merge_failures: HashMap = items .iter() .filter(|i| matches!(i.stage, Stage::Merge { .. })) .filter_map(|i| { let job = crate::crdt_state::read_merge_job(&i.story_id.0)?; let err = job.error?; Some((i.story_id.0.clone(), err)) }) .collect(); let mut out = String::from("**Pipeline Status**\n\n"); // Active pipeline stages to display (Archived is handled separately below). type StagePredicate = fn(&Stage) -> bool; let stage_filters: &[(&str, StagePredicate)] = &[ ("Backlog", |s| matches!(s, Stage::Backlog)), ("In Progress", |s| matches!(s, Stage::Coding)), ("QA", |s| matches!(s, Stage::Qa)), ("Merge", |s| matches!(s, Stage::Merge { .. })), ("Done", |s| matches!(s, Stage::Done { .. })), ]; for (label, filter) in stage_filters { let mut stage_items: Vec<&PipelineItem> = items.iter().filter(|i| filter(&i.stage)).collect(); stage_items.sort_by(|a, b| a.story_id.0.cmp(&b.story_id.0)); let count = stage_items.len(); out.push_str(&format!("**{label}** ({count})\n")); if stage_items.is_empty() { out.push_str(" *(none)*\n"); } else { for item in &stage_items { out.push_str(&render_item_line( item, items, &active_map, &cost_by_story, &config, &running_merges, &merge_failures, )); } } out.push('\n'); } // Blocked items: Archived { reason: Blocked } shown with 🔴 indicator. let mut blocked_items: Vec<&PipelineItem> = items.iter().filter(|i| i.stage.is_blocked()).collect(); blocked_items.sort_by(|a, b| a.story_id.0.cmp(&b.story_id.0)); if !blocked_items.is_empty() { out.push_str(&format!("**Blocked** ({})\n", blocked_items.len())); for item in &blocked_items { out.push_str(&render_item_line( item, items, &active_map, &cost_by_story, &config, &running_merges, &merge_failures, )); } out.push('\n'); } // Free agents: configured agents not currently running or pending. out.push_str("**Free Agents**\n"); if let Some(cfg) = &config { let busy_names: HashSet = active_agents .iter() .filter(|a| matches!(a.status, AgentStatus::Running | AgentStatus::Pending)) .map(|a| a.agent_name.clone()) .collect(); let free: Vec = cfg .agent .iter() .filter(|a| !busy_names.contains(&a.name)) .map(|a| match &a.model { Some(m) => format!("{} ({})", a.name, m), None => a.name.clone(), }) .collect(); if free.is_empty() { out.push_str(" *(none — all agents busy)*\n"); } else { for name in &free { out.push_str(&format!(" • {name}\n")); } } } else { out.push_str(" *(no agent config found)*\n"); } out } /// Render a single status line for one pipeline item. fn render_item_line( item: &PipelineItem, all_items: &[PipelineItem], active_map: &HashMap, cost_by_story: &HashMap, config: &Option, running_merges: &HashSet, merge_failures: &HashMap, ) -> String { let story_id = &item.story_id.0; let name_opt = if item.name.is_empty() { None } else { Some(item.name.as_str()) }; let frozen = crate::io::story_metadata::is_story_frozen_in_store(story_id); let base_label = super::story_short_label(story_id, name_opt); let display = if frozen { format!("\u{2744}\u{FE0F} {base_label}") // ❄️ prefix } else { base_label }; let cost_suffix = cost_by_story .get(story_id) .filter(|&&c| c > 0.0) .map(|c| format!(" — ${c:.2}")) .unwrap_or_default(); let agent = active_map.get(story_id); let unmet = unmet_deps_from_items(item, all_items); let dep_suffix = if unmet.is_empty() { String::new() } else { let nums: Vec = unmet.iter().map(|n| n.to_string()).collect(); format!(" *(waiting on: {})*", nums.join(", ")) }; // Merge-stage items get a dedicated breakdown indicator instead of the // generic traffic-light dot. if matches!(item.stage, Stage::Merge { .. }) { let in_det_merge = running_merges.contains(story_id); let merge_failure = merge_failures.get(story_id); if in_det_merge { // A fresh deterministic merge is in progress — always prefer 🔄, // even when a previous attempt recorded a merge_failure. return format!( " \u{1F504} {display}{cost_suffix}{dep_suffix} — deterministic-merge running\n" ); } else if agent.is_some() { return format!( " \u{1F916} {display}{cost_suffix}{dep_suffix} — mergemaster running\n" ); } else if let Some(mf) = merge_failure { let snippet = first_non_empty_snippet(mf, 120); return format!(" \u{26D4} {display}{cost_suffix}{dep_suffix} — {snippet}\n"); } else { return format!(" \u{23F3} {display}{cost_suffix}{dep_suffix}\n"); } } let blocked = item.stage.is_blocked(); let throttled = agent.map(|a| a.throttled).unwrap_or(false); let dot = super::traffic_light_dot(blocked, throttled, agent.is_some()); if let Some(agent) = agent { let model_str = config .as_ref() .and_then(|cfg| cfg.find_agent(&agent.agent_name)) .and_then(|ac| ac.model.as_deref()) .unwrap_or("?"); format!( " {dot}{display}{cost_suffix}{dep_suffix} — {} ({model_str})\n", agent.agent_name ) } else { format!(" {dot}{display}{cost_suffix}{dep_suffix}\n") } }