//! Pipeline rendering: builds the full status text from pipeline items. use crate::agents::{AgentPool, AgentStatus}; use crate::config::ProjectConfig; use crate::pipeline_state::{ArchiveReason, PipelineItem, Stage}; use std::collections::{HashMap, HashSet}; /// Map a stage to its display section label, or `None` to skip it entirely. /// /// This is the single source of truth for the "where does this item appear" /// decision. It mirrors the bucket routing in `http/workflow/pipeline.rs` /// so that chat output and the web UI are always consistent. /// /// `Stage::Frozen { resume_to }` is handled recursively: a frozen story /// appears in the same section its `resume_to` stage would land in. pub(crate) fn display_section(s: &Stage) -> Option<&'static str> { match s { Stage::Upcoming | Stage::Backlog => Some("Backlog"), Stage::Coding | Stage::Blocked { .. } | Stage::Archived { reason: ArchiveReason::Blocked { .. }, .. } => Some("In Progress"), Stage::Qa | Stage::ReviewHold { .. } => Some("QA"), Stage::Merge { .. } | Stage::MergeFailure { .. } | Stage::MergeFailureFinal { .. } => { Some("Merge") } Stage::Done { .. } => Some("Done"), Stage::Frozen { resume_to } => display_section(resume_to), Stage::Archived { .. } => None, // other archived variants are hidden } } /// Check which dependency numbers from `item.depends_on` are unmet. /// /// A dependency is considered met if the dep is in `Done` or `Archived` stage /// in `all_items`. If the dep is not found in `all_items` at all (e.g. it was /// archived before the CRDT migration and has no row), it is treated as met. pub(crate) fn unmet_deps_from_items(item: &PipelineItem, all_items: &[PipelineItem]) -> Vec { item.depends_on .iter() .filter_map(|dep_id| { // dep_id.0 is the raw number string (e.g. "999") as projected // from PipelineItemView.depends_on: Vec. let dep_num: u32 = dep_id.0.parse().ok()?; // Find the dep by matching the numeric prefix of its story_id. let dep = all_items.iter().find(|i| { i.story_id.0 == dep_id.0 || i.story_id.0.split('_').next() == Some(dep_id.0.as_str()) }); match dep { Some(d) if matches!(d.stage, Stage::Done { .. } | Stage::Archived { .. }) => None, Some(_) => Some(dep_num), // Found but not done = unmet None => None, // Not in CRDT; treat as met } }) .collect() } /// Extract the first non-empty line from `text`, truncated to `max_len` chars. pub(crate) fn first_non_empty_snippet(text: &str, max_len: usize) -> String { let line = text.lines().find(|l| !l.trim().is_empty()).unwrap_or(""); let mut chars = line.chars(); let truncated: String = chars.by_ref().take(max_len).collect(); if chars.next().is_some() { format!("{truncated}…") } else { truncated } } /// Build the full pipeline status text formatted for Matrix (markdown). pub(crate) fn build_pipeline_status(project_root: &std::path::Path, agents: &AgentPool) -> String { let items = crate::pipeline_state::read_all_typed(); build_status_from_items(project_root, agents, &items) } /// Inner implementation that accepts pre-loaded items for testability. pub(crate) fn build_status_from_items( project_root: &std::path::Path, agents: &AgentPool, items: &[PipelineItem], ) -> String { // Build a map from story_id → active AgentInfo for quick lookup. let active_agents = agents.list_agents().unwrap_or_default(); let active_map: HashMap = active_agents .iter() .filter(|a| matches!(a.status, AgentStatus::Running | AgentStatus::Pending)) .map(|a| (a.story_id.clone(), a)) .collect(); // Read token usage once for all stories to avoid repeated file I/O. let cost_by_story: HashMap = crate::agents::token_usage::read_all(project_root) .unwrap_or_default() .into_iter() .fold(HashMap::new(), |mut map, r| { *map.entry(r.story_id).or_insert(0.0) += r.usage.total_cost_usd; map }); let config = ProjectConfig::load(project_root).ok(); // Pre-fetch merge-specific state: deterministic merges in flight and // any merge_failure text persisted to the story's front matter. let running_merges: HashSet = agents .list_running_merges() .unwrap_or_default() .into_iter() .collect(); // Merge-failure detail now lives on the typed MergeJob CRDT entry // (story 929 — CRDT is the sole source of metadata). let merge_failures: HashMap = items .iter() .filter(|i| matches!(i.stage, Stage::Merge { .. })) .filter_map(|i| { let job = crate::crdt_state::read_merge_job(&i.story_id.0)?; let err = job.error?; Some((i.story_id.0.clone(), err)) }) .collect(); let mut out = String::from("**Pipeline Status**\n\n"); // Render each display section in order. Blocked items appear in-place // under their stage section (determined by `display_section`); there is // no separate "Blocked" section. Frozen items appear under the section // their `resume_to` stage maps to. let sections = ["Backlog", "In Progress", "QA", "Merge", "Done"]; for label in sections { let mut section_items: Vec<&PipelineItem> = items .iter() .filter(|i| display_section(&i.stage) == Some(label)) .collect(); section_items.sort_by(|a, b| a.story_id.0.cmp(&b.story_id.0)); let count = section_items.len(); out.push_str(&format!("**{label}** ({count})\n")); if section_items.is_empty() { out.push_str(" *(none)*\n"); } else { for item in §ion_items { out.push_str(&render_item_line( item, items, &active_map, &cost_by_story, &config, &running_merges, &merge_failures, )); } } out.push('\n'); } // Free agents: configured agents not currently running or pending. out.push_str("**Free Agents**\n"); if let Some(cfg) = &config { let busy_names: HashSet = active_agents .iter() .filter(|a| matches!(a.status, AgentStatus::Running | AgentStatus::Pending)) .map(|a| a.agent_name.clone()) .collect(); let free: Vec = cfg .agent .iter() .filter(|a| !busy_names.contains(&a.name)) .map(|a| match &a.model { Some(m) => format!("{} ({})", a.name, m), None => a.name.clone(), }) .collect(); if free.is_empty() { out.push_str(" *(none — all agents busy)*\n"); } else { for name in &free { out.push_str(&format!(" • {name}\n")); } } } else { out.push_str(" *(no agent config found)*\n"); } out } /// Render a single status line for one pipeline item. fn render_item_line( item: &PipelineItem, all_items: &[PipelineItem], active_map: &HashMap, cost_by_story: &HashMap, config: &Option, running_merges: &HashSet, merge_failures: &HashMap, ) -> String { let story_id = &item.story_id.0; let name_opt = if item.name.is_empty() { None } else { Some(item.name.as_str()) }; // Use the typed CRDT stage as the sole source of truth (story 945). let frozen = item.stage.is_frozen(); let base_label = super::story_short_label(story_id, name_opt); let display = if frozen { format!("\u{2744}\u{FE0F} {base_label}") // ❄️ prefix } else { base_label }; let cost_suffix = cost_by_story .get(story_id) .filter(|&&c| c > 0.0) .map(|c| format!(" — ${c:.2}")) .unwrap_or_default(); let agent = active_map.get(story_id); let unmet = unmet_deps_from_items(item, all_items); let dep_suffix = if unmet.is_empty() { String::new() } else { let nums: Vec = unmet.iter().map(|n| n.to_string()).collect(); format!(" *(waiting on: {})*", nums.join(", ")) }; // Merge-stage items get dedicated breakdown indicators instead of the // generic traffic-light dot. MergeFailure / MergeFailureFinal items // now also appear in the Merge section (in-place) so they are handled // here alongside normal Merge items. if matches!( item.stage, Stage::Merge { .. } | Stage::MergeFailure { .. } | Stage::MergeFailureFinal { .. } ) { match &item.stage { // MergeFailureFinal: mergemaster already tried and gave up — always ⛔. Stage::MergeFailureFinal { reason } => { let snippet = first_non_empty_snippet(reason, 120); return format!(" \u{26D4} {display}{cost_suffix}{dep_suffix} — {snippet}\n"); } // MergeFailure: a recovery agent may be running or queued. Stage::MergeFailure { reason, .. } => { return match agent.map(|a| &a.status) { Some(AgentStatus::Running) => format!( " \u{1F916} {display}{cost_suffix}{dep_suffix} — mergemaster running\n" ), Some(AgentStatus::Pending) => format!( " \u{23F3} {display}{cost_suffix}{dep_suffix} — mergemaster queued\n" ), _ => { let snippet = first_non_empty_snippet(reason, 120); format!(" \u{26D4} {display}{cost_suffix}{dep_suffix} — {snippet}\n") } }; } _ => {} } let in_det_merge = running_merges.contains(story_id); let merge_failure = merge_failures.get(story_id); if in_det_merge { // A fresh deterministic merge is in progress — always prefer 🔄, // even when a previous attempt recorded a merge_failure. return format!( " \u{1F504} {display}{cost_suffix}{dep_suffix} — deterministic-merge running\n" ); } else if agent.is_some() { return format!( " \u{1F916} {display}{cost_suffix}{dep_suffix} — mergemaster running\n" ); } else if let Some(mf) = merge_failure { let snippet = first_non_empty_snippet(mf, 120); return format!(" \u{26D4} {display}{cost_suffix}{dep_suffix} — {snippet}\n"); } else { return format!(" \u{23F3} {display}{cost_suffix}{dep_suffix}\n"); } } let blocked = item.stage.is_blocked(); // Blocked items with a recovery agent get differentiated indicators. if blocked { return match agent.map(|a| &a.status) { Some(AgentStatus::Running) => { format!(" \u{1F916} {display}{cost_suffix}{dep_suffix} — recovery coder running\n") } Some(AgentStatus::Pending) => { format!(" \u{23F3} {display}{cost_suffix}{dep_suffix} — recovery coder queued\n") } _ => format!(" \u{1F534} {display}{cost_suffix}{dep_suffix}\n"), }; } let throttled = agent.map(|a| a.throttled).unwrap_or(false); let dot = super::traffic_light_dot(blocked, throttled, agent.is_some()); if let Some(agent) = agent { let model_str = config .as_ref() .and_then(|cfg| cfg.find_agent(&agent.agent_name)) .and_then(|ac| ac.model.as_deref()) .unwrap_or("?"); format!( " {dot}{display}{cost_suffix}{dep_suffix} — {} ({model_str})\n", agent.agent_name ) } else { format!(" {dot}{display}{cost_suffix}{dep_suffix}\n") } }