//! Pipeline state — types and loading functions for the story pipeline. use crate::agents::AgentStatus; use crate::http::context::AppContext; use crate::io::story_metadata::parse_front_matter; use serde::Serialize; use std::collections::HashMap; use std::path::Path; /// Agent assignment embedded in a pipeline stage item. #[derive(Clone, Debug, Serialize)] pub struct AgentAssignment { pub agent_name: String, pub model: Option, pub status: String, } /// A story/bug/spike item as it appears in a pipeline stage listing. #[derive(Clone, Debug, Serialize)] pub struct UpcomingStory { pub story_id: String, pub name: Option, pub error: Option, /// Merge failure reason persisted to front matter by the mergemaster agent. pub merge_failure: Option, /// Active agent working on this item, if any. pub agent: Option, /// True when the item is held in QA for human review. #[serde(skip_serializing_if = "Option::is_none")] pub review_hold: Option, /// QA mode for this item: "human", "server", or "agent". #[serde(skip_serializing_if = "Option::is_none")] pub qa: Option, /// Number of retries at the current pipeline stage. #[serde(skip_serializing_if = "Option::is_none")] pub retry_count: Option, /// True when the story has exceeded its retry limit and will not be auto-assigned. #[serde(skip_serializing_if = "Option::is_none")] pub blocked: Option, /// Story numbers this story depends on. #[serde(skip_serializing_if = "Option::is_none")] pub depends_on: Option>, } /// Validation outcome for a single story. pub struct StoryValidationResult { pub story_id: String, pub valid: bool, pub error: Option, } /// Full pipeline state across all stages. #[derive(Clone, Debug, Serialize)] pub struct PipelineState { pub backlog: Vec, pub current: Vec, pub qa: Vec, pub merge: Vec, pub done: Vec, /// Story IDs that currently have a deterministic merge in progress. pub deterministic_merges_in_flight: Vec, } /// Load the full pipeline state (all 5 active stages). /// /// Reads from the CRDT document and enriches with content from the /// in-memory content store. Agent assignments are overlaid from the /// in-memory agent pool. pub fn load_pipeline_state(ctx: &AppContext) -> Result { let agent_map = build_active_agent_map(ctx); use crate::pipeline_state::Stage; let typed_items = crate::pipeline_state::read_all_typed(); let deterministic_merges_in_flight = ctx .services .agents .list_running_merges() .unwrap_or_default(); let mut state = PipelineState { backlog: Vec::new(), current: Vec::new(), qa: Vec::new(), merge: Vec::new(), done: Vec::new(), deterministic_merges_in_flight, }; for item in typed_items { let sid = &item.story_id.0; let agent = agent_map.get(sid).cloned(); // Enrich with content-derived metadata (merge_failure, review_hold, qa). let (merge_failure, review_hold, qa) = crate::db::read_content(sid) .and_then(|c| parse_front_matter(&c).ok()) .map(|meta| { ( meta.merge_failure, meta.review_hold, meta.qa.map(|m| m.as_str().to_string()), ) }) .unwrap_or((None, None, None)); let story = UpcomingStory { story_id: sid.clone(), name: if item.name.is_empty() { None } else { Some(item.name.clone()) }, error: None, merge_failure, agent, review_hold, qa, retry_count: if item.retry_count > 0 { Some(item.retry_count) } else { None }, blocked: if item.stage.is_blocked() { Some(true) } else { None }, depends_on: if item.depends_on.is_empty() { None } else { Some( item.depends_on .iter() .filter_map(|d| d.0.split('_').next()?.parse::().ok()) .collect(), ) }, }; match &item.stage { Stage::Upcoming => state.backlog.push(story), // upcoming shown with backlog Stage::Backlog => state.backlog.push(story), Stage::Coding => state.current.push(story), Stage::Qa => state.qa.push(story), Stage::Merge { .. } => state.merge.push(story), Stage::Done { .. } => state.done.push(story), Stage::Archived { .. } => {} // skip archived } } // Sort each stage for deterministic output. state.backlog.sort_by(|a, b| a.story_id.cmp(&b.story_id)); state.current.sort_by(|a, b| a.story_id.cmp(&b.story_id)); state.qa.sort_by(|a, b| a.story_id.cmp(&b.story_id)); state.merge.sort_by(|a, b| a.story_id.cmp(&b.story_id)); state.done.sort_by(|a, b| a.story_id.cmp(&b.story_id)); Ok(state) } /// Build a map from story_id → AgentAssignment for all pending/running agents. fn build_active_agent_map(ctx: &AppContext) -> HashMap { let agents = match ctx.services.agents.list_agents() { Ok(a) => a, Err(_) => return HashMap::new(), }; let config_opt = ctx .state .get_project_root() .ok() .and_then(|root| crate::config::ProjectConfig::load(&root).ok()); let mut map = HashMap::new(); for agent in agents { if !matches!(agent.status, AgentStatus::Pending | AgentStatus::Running) { continue; } let model = config_opt .as_ref() .and_then(|cfg| cfg.find_agent(&agent.agent_name)) .and_then(|ac| ac.model.clone()); map.insert( agent.story_id.clone(), AgentAssignment { agent_name: agent.agent_name, model, status: agent.status.to_string(), }, ); } map } /// Load all stories currently in the backlog stage. pub fn load_upcoming_stories(_ctx: &AppContext) -> Result, String> { use crate::pipeline_state::Stage; let typed_items = crate::pipeline_state::read_all_typed(); let mut stories: Vec = typed_items .into_iter() .filter(|item| matches!(item.stage, Stage::Backlog)) .map(|item| UpcomingStory { story_id: item.story_id.0, name: if item.name.is_empty() { None } else { Some(item.name) }, error: None, merge_failure: None, agent: None, review_hold: None, qa: None, retry_count: if item.retry_count > 0 { Some(item.retry_count) } else { None }, blocked: if item.stage.is_blocked() { Some(true) } else { None }, depends_on: if item.depends_on.is_empty() { None } else { Some( item.depends_on .iter() .filter_map(|d| d.0.split('_').next()?.parse::().ok()) .collect(), ) }, }) .collect(); stories.sort_by(|a, b| a.story_id.cmp(&b.story_id)); Ok(stories) } /// Validate story front matter for all backlog and current items. pub fn validate_story_dirs(_root: &Path) -> Result, String> { use crate::pipeline_state::Stage; let mut results = Vec::new(); let typed_items = crate::pipeline_state::read_all_typed(); for item in typed_items { // Only validate backlog and current items (matching the old behaviour). if !matches!(item.stage, Stage::Backlog | Stage::Coding) { continue; } let story_id = item.story_id.0.clone(); match crate::db::read_content(&story_id) { Some(contents) => match parse_front_matter(&contents) { Ok(meta) => { let mut errors = Vec::new(); if meta.name.is_none() { errors.push("Missing 'name' field".to_string()); } if errors.is_empty() { results.push(StoryValidationResult { story_id, valid: true, error: None, }); } else { results.push(StoryValidationResult { story_id, valid: false, error: Some(errors.join("; ")), }); } } Err(e) => results.push(StoryValidationResult { story_id, valid: false, error: Some(e.to_string()), }), }, None => results.push(StoryValidationResult { story_id, valid: false, error: Some("No content found in content store".to_string()), }), } } results.sort_by(|a, b| a.story_id.cmp(&b.story_id)); Ok(results) } #[cfg(test)] mod tests { use super::*; #[test] fn load_pipeline_state_loads_all_stages() { let tmp = tempfile::tempdir().unwrap(); let root = tmp.path().to_path_buf(); crate::db::ensure_content_store(); for (stage, id) in &[ ("1_backlog", "9810_story_upcoming"), ("2_current", "9820_story_current"), ("3_qa", "9830_story_qa"), ("4_merge", "9840_story_merge"), ("5_done", "9850_story_done"), ] { crate::db::write_item_with_content(id, stage, &format!("---\nname: {id}\n---\n")); } let ctx = crate::http::context::AppContext::new_test(root); let state = load_pipeline_state(&ctx).unwrap(); assert!( state .backlog .iter() .any(|s| s.story_id == "9810_story_upcoming") ); assert!( state .current .iter() .any(|s| s.story_id == "9820_story_current") ); assert!(state.qa.iter().any(|s| s.story_id == "9830_story_qa")); assert!(state.merge.iter().any(|s| s.story_id == "9840_story_merge")); assert!(state.done.iter().any(|s| s.story_id == "9850_story_done")); } #[test] fn load_upcoming_returns_empty_when_no_dir() { // With CRDT there is no filesystem dependency. The function should // succeed even without a .huskies directory. Other tests may have // inserted items into the global CRDT, so we only assert no error. let tmp = tempfile::tempdir().unwrap(); let root = tmp.path().to_path_buf(); let ctx = crate::http::context::AppContext::new_test(root); let _result = load_upcoming_stories(&ctx).unwrap(); } #[test] fn pipeline_state_includes_agent_for_running_story() { let tmp = tempfile::tempdir().unwrap(); let root = tmp.path().to_path_buf(); crate::db::ensure_content_store(); crate::db::write_item_with_content( "9860_story_test", "2_current", "---\nname: Test Story\n---\n# Story\n", ); let ctx = crate::http::context::AppContext::new_test(root); ctx.services.agents.inject_test_agent( "9860_story_test", "coder-1", crate::agents::AgentStatus::Running, ); let state = load_pipeline_state(&ctx).unwrap(); let item = state .current .iter() .find(|s| s.story_id == "9860_story_test") .unwrap(); assert!( item.agent.is_some(), "running agent should appear on work item" ); let agent = item.agent.as_ref().unwrap(); assert_eq!(agent.agent_name, "coder-1"); assert_eq!(agent.status, "running"); } #[test] fn pipeline_state_no_agent_for_completed_story() { let tmp = tempfile::tempdir().unwrap(); let root = tmp.path().to_path_buf(); crate::db::ensure_content_store(); crate::db::write_item_with_content( "9861_story_done", "2_current", "---\nname: Done Story\n---\n# Story\n", ); let ctx = crate::http::context::AppContext::new_test(root); ctx.services.agents.inject_test_agent( "9861_story_done", "coder-1", crate::agents::AgentStatus::Completed, ); let state = load_pipeline_state(&ctx).unwrap(); let item = state .current .iter() .find(|s| s.story_id == "9861_story_done") .unwrap(); assert!( item.agent.is_none(), "completed agent should not appear on work item" ); } #[test] fn pipeline_state_pending_agent_included() { let tmp = tempfile::tempdir().unwrap(); let root = tmp.path().to_path_buf(); crate::db::ensure_content_store(); crate::db::write_item_with_content( "9862_story_pending", "2_current", "---\nname: Pending Story\n---\n# Story\n", ); let ctx = crate::http::context::AppContext::new_test(root); ctx.services.agents.inject_test_agent( "9862_story_pending", "coder-1", crate::agents::AgentStatus::Pending, ); let state = load_pipeline_state(&ctx).unwrap(); let item = state .current .iter() .find(|s| s.story_id == "9862_story_pending") .unwrap(); assert!( item.agent.is_some(), "pending agent should appear on work item" ); assert_eq!(item.agent.as_ref().unwrap().status, "pending"); } #[test] fn pipeline_state_includes_depends_on() { crate::db::ensure_content_store(); crate::db::write_item_with_content( "9863_story_dependent", "1_backlog", "---\nname: Dependent Story\ndepends_on: [10, 11]\n---\n", ); crate::db::write_item_with_content( "9864_story_independent", "1_backlog", "---\nname: Independent Story\n---\n", ); let tmp = tempfile::tempdir().unwrap(); let ctx = crate::http::context::AppContext::new_test(tmp.path().to_path_buf()); let state = load_pipeline_state(&ctx).unwrap(); let dependent = state .backlog .iter() .find(|s| s.story_id == "9863_story_dependent") .unwrap(); assert_eq!(dependent.depends_on, Some(vec![10, 11])); let independent = state .backlog .iter() .find(|s| s.story_id == "9864_story_independent") .unwrap(); assert_eq!(independent.depends_on, None); } #[test] fn load_upcoming_parses_metadata() { crate::db::ensure_content_store(); crate::db::write_item_with_content( "9870_story_view_upcoming", "1_backlog", "---\nname: View Upcoming\n---\n# Story\n", ); crate::db::write_item_with_content( "9871_story_worktree", "1_backlog", "---\nname: Worktree Orchestration\n---\n# Story\n", ); let tmp = tempfile::tempdir().unwrap(); let ctx = crate::http::context::AppContext::new_test(tmp.path().to_path_buf()); let stories = load_upcoming_stories(&ctx).unwrap(); let s1 = stories .iter() .find(|s| s.story_id == "9870_story_view_upcoming") .unwrap(); assert_eq!(s1.name.as_deref(), Some("View Upcoming")); let s2 = stories .iter() .find(|s| s.story_id == "9871_story_worktree") .unwrap(); assert_eq!(s2.name.as_deref(), Some("Worktree Orchestration")); } #[test] fn load_upcoming_skips_non_md_files() { // Non-.md files are a filesystem concept. With CRDT, only real items // appear. Just verify the CRDT item is returned. crate::db::ensure_content_store(); crate::db::write_item_with_content( "9872_story_example", "1_backlog", "---\nname: A Story\n---\n", ); let tmp = tempfile::tempdir().unwrap(); let ctx = crate::http::context::AppContext::new_test(tmp.path().to_path_buf()); let stories = load_upcoming_stories(&ctx).unwrap(); assert!(stories.iter().any(|s| s.story_id == "9872_story_example")); } #[test] fn validate_story_dirs_valid_files() { crate::db::ensure_content_store(); crate::db::write_item_with_content( "9873_story_todos", "2_current", "---\nname: Show TODOs\n---\n# Story\n", ); crate::db::write_item_with_content( "9874_story_front_matter", "1_backlog", "---\nname: Enforce Front Matter\n---\n# Story\n", ); let tmp = tempfile::tempdir().unwrap(); let results = validate_story_dirs(tmp.path()).unwrap(); let r1 = results .iter() .find(|r| r.story_id == "9873_story_todos") .unwrap(); assert!(r1.valid); let r2 = results .iter() .find(|r| r.story_id == "9874_story_front_matter") .unwrap(); assert!(r2.valid); } #[test] fn validate_story_dirs_missing_front_matter() { crate::db::ensure_content_store(); crate::db::write_item_with_content("9875_story_no_fm", "2_current", "# No front matter\n"); let tmp = tempfile::tempdir().unwrap(); let results = validate_story_dirs(tmp.path()).unwrap(); let r = results .iter() .find(|r| r.story_id == "9875_story_no_fm") .unwrap(); assert!(!r.valid); assert_eq!(r.error.as_deref(), Some("Missing front matter")); } #[test] fn validate_story_dirs_missing_required_fields() { crate::db::ensure_content_store(); crate::db::write_item_with_content( "9876_story_no_name", "2_current", "---\n---\n# Story\n", ); let tmp = tempfile::tempdir().unwrap(); let results = validate_story_dirs(tmp.path()).unwrap(); let r = results .iter() .find(|r| r.story_id == "9876_story_no_name") .unwrap(); assert!(!r.valid); let err = r.error.as_deref().unwrap(); assert!(err.contains("Missing 'name' field")); } #[test] fn validate_story_dirs_empty_when_no_dirs() { // With CRDT there's always global state; this test just ensures no panic. let tmp = tempfile::tempdir().unwrap(); let _results = validate_story_dirs(tmp.path()); } }