From 84717b04bd74eff1d0d90eb4be801b57fc1a16b8 Mon Sep 17 00:00:00 2001 From: dave Date: Thu, 9 Apr 2026 21:24:11 +0000 Subject: [PATCH] huskies: merge 520_story_typed_pipeline_state_machine_in_rust_foundation_replaces_stringly_typed_crdt_views_with_strict_enums_subsumes_436 --- server/src/agents/lifecycle.rs | 10 +- server/src/agents/pool/auto_assign/scan.rs | 10 +- .../agents/pool/auto_assign/story_checks.rs | 4 +- server/src/agents/pool/pipeline/advance.rs | 12 +- server/src/agents/pool/worktree.rs | 15 +- server/src/chat/commands/depends.rs | 4 +- server/src/chat/commands/unblock.rs | 6 +- server/src/chat/timer.rs | 16 +- server/src/chat/transport/matrix/assign.rs | 4 +- server/src/chat/transport/matrix/delete.rs | 6 +- server/src/chat/transport/matrix/start.rs | 4 +- server/src/db/mod.rs | 21 +- server/src/http/mcp/mod.rs | 2 +- server/src/http/mcp/story_tools.rs | 8 +- server/src/http/workflow/bug_ops.rs | 56 +- server/src/http/workflow/mod.rs | 125 +- server/src/main.rs | 1 + server/src/pipeline_state.rs | 1387 +++++++++++++++++ 18 files changed, 1569 insertions(+), 122 deletions(-) create mode 100644 server/src/pipeline_state.rs diff --git a/server/src/agents/lifecycle.rs b/server/src/agents/lifecycle.rs index 4998dbdb..d81bfab1 100644 --- a/server/src/agents/lifecycle.rs +++ b/server/src/agents/lifecycle.rs @@ -33,15 +33,17 @@ fn move_item<'a>( fields_to_clear: &[&str], ) -> Result, String> { // Check if the item is already in the target stage or a done stage. - if let Some(item) = crate::crdt_state::read_item(story_id) { - if item.stage == target_dir - || extra_done_dirs.iter().any(|d| item.stage == *d) + // Use the typed projection for compile-safe stage comparison. + if let Ok(Some(typed_item)) = crate::pipeline_state::read_typed(story_id) { + let current_dir = typed_item.stage.dir_name(); + if current_dir == target_dir + || extra_done_dirs.contains(¤t_dir) { return Ok(None); // Idempotent: already there. } // Verify it's in one of the expected source stages. - let src_dir = sources.iter().find(|&&s| item.stage == s).copied(); + let src_dir = sources.iter().find(|&&s| current_dir == s).copied(); if src_dir.is_none() && !missing_ok { let locs = sources .iter() diff --git a/server/src/agents/pool/auto_assign/scan.rs b/server/src/agents/pool/auto_assign/scan.rs index 1caf2a1b..732c964e 100644 --- a/server/src/agents/pool/auto_assign/scan.rs +++ b/server/src/agents/pool/auto_assign/scan.rs @@ -22,12 +22,10 @@ pub(super) fn scan_stage_items(project_root: &Path, stage_dir: &str) -> Vec Vec { // Prefer CRDT-based check when the item is known to CRDT. - if crate::crdt_state::read_item(story_id).is_some() { + if crate::pipeline_state::read_typed(story_id).ok().flatten().is_some() { return crate::crdt_state::check_archived_deps_crdt(story_id); } // Fallback: filesystem. diff --git a/server/src/agents/pool/pipeline/advance.rs b/server/src/agents/pool/pipeline/advance.rs index 318ad650..6671d875 100644 --- a/server/src/agents/pool/pipeline/advance.rs +++ b/server/src/agents/pool/pipeline/advance.rs @@ -395,8 +395,10 @@ fn write_review_hold_to_store(story_id: &str) { let updated = crate::io::story_metadata::write_review_hold_in_content(&contents); crate::db::write_content(story_id, &updated); // Also persist to SQLite via shadow write. - let stage = crate::crdt_state::read_item(story_id) - .map(|i| i.stage) + let stage = crate::pipeline_state::read_typed(story_id) + .ok() + .flatten() + .map(|i| i.stage.dir_name().to_string()) .unwrap_or_else(|| "3_qa".to_string()); crate::db::write_item_with_content(story_id, &stage, &updated); } else { @@ -419,8 +421,10 @@ fn should_block_story(story_id: &str, max_retries: u32, stage_label: &str) -> Op if let Some(contents) = crate::db::read_content(story_id) { let (updated, new_count) = increment_retry_count_in_content(&contents); crate::db::write_content(story_id, &updated); - let stage = crate::crdt_state::read_item(story_id) - .map(|i| i.stage) + let stage = crate::pipeline_state::read_typed(story_id) + .ok() + .flatten() + .map(|i| i.stage.dir_name().to_string()) .unwrap_or_else(|| "2_current".to_string()); crate::db::write_item_with_content(story_id, &stage, &updated); diff --git a/server/src/agents/pool/worktree.rs b/server/src/agents/pool/worktree.rs index 2edaf82d..05104918 100644 --- a/server/src/agents/pool/worktree.rs +++ b/server/src/agents/pool/worktree.rs @@ -23,18 +23,15 @@ impl AgentPool { /// Return the active pipeline stage directory name for `story_id`, or `None` if the /// story is not in any active stage (`2_current/`, `3_qa/`, `4_merge/`). pub(super) fn find_active_story_stage(project_root: &Path, story_id: &str) -> Option<&'static str> { - const STAGES: [&str; 3] = ["2_current", "3_qa", "4_merge"]; - - // Try CRDT first — primary source of truth. - if let Some(item) = crate::crdt_state::read_item(story_id) { - for stage in &STAGES { - if item.stage == *stage { - return Some(stage); - } - } + // Try typed CRDT projection first — primary source of truth. + if let Ok(Some(item)) = crate::pipeline_state::read_typed(story_id) + && item.stage.is_active() + { + return Some(item.stage.dir_name()); } // Also check filesystem (backwards compat / tests). + const STAGES: [&str; 3] = ["2_current", "3_qa", "4_merge"]; for stage in &STAGES { let path = project_root .join(".huskies") diff --git a/server/src/chat/commands/depends.rs b/server/src/chat/commands/depends.rs index c91d95e1..d7ab03ca 100644 --- a/server/src/chat/commands/depends.rs +++ b/server/src/chat/commands/depends.rs @@ -67,12 +67,12 @@ pub(super) fn handle_depends(ctx: &CommandContext) -> Option { // --- DB-first lookup --- for id in crate::db::all_content_ids() { let file_num = id.split('_').next().unwrap_or(""); - if file_num == num_str && let Some(item) = crate::crdt_state::read_item(&id) { + if file_num == num_str && let Ok(Some(item)) = crate::pipeline_state::read_typed(&id) { let path = ctx .project_root .join(".huskies") .join("work") - .join(&item.stage) + .join(item.stage.dir_name()) .join(format!("{id}.md")); found = Some((path, id)); break; diff --git a/server/src/chat/commands/unblock.rs b/server/src/chat/commands/unblock.rs index e1e87b8d..6f277162 100644 --- a/server/src/chat/commands/unblock.rs +++ b/server/src/chat/commands/unblock.rs @@ -129,8 +129,10 @@ fn unblock_by_story_id(story_id: &str) -> String { updated = set_front_matter_field(&updated, "retry_count", "0"); crate::db::write_content(story_id, &updated); - let stage = crate::crdt_state::read_item(story_id) - .map(|i| i.stage) + let stage = crate::pipeline_state::read_typed(story_id) + .ok() + .flatten() + .map(|i| i.stage.dir_name().to_string()) .unwrap_or_else(|| "2_current".to_string()); crate::db::write_item_with_content(story_id, &stage, &updated); diff --git a/server/src/chat/timer.rs b/server/src/chat/timer.rs index eb4c8afa..736d5a8e 100644 --- a/server/src/chat/timer.rs +++ b/server/src/chat/timer.rs @@ -207,14 +207,15 @@ async fn tick_once( // remove the timer before it fires; this guard covers the case where // cancellation was not yet called or the story raced forward through // the pipeline while the timer was pending. - if let Some(item) = crate::crdt_state::read_item(&entry.story_id) { - match item.stage.as_str() { - "3_qa" | "4_merge" | "5_done" | "6_archived" => { + if let Ok(Some(item)) = crate::pipeline_state::read_typed(&entry.story_id) { + use crate::pipeline_state::Stage; + match &item.stage { + Stage::Qa | Stage::Merge { .. } | Stage::Done { .. } | Stage::Archived { .. } => { crate::slog!( "[timer] Skipping timer for story {} — currently in '{}', \ not in backlog/current; timer is stale", entry.story_id, - item.stage + item.stage.dir_name() ); continue; } @@ -425,8 +426,9 @@ pub async fn handle_timer_command( // The story must be in backlog or current. When the timer fires, // backlog stories are moved to current automatically. // Check CRDT state first, then fall back to filesystem. - let in_valid_stage = if let Some(item) = crate::crdt_state::read_item(&story_id) { - matches!(item.stage.as_str(), "1_backlog" | "2_current") + let in_valid_stage = if let Ok(Some(item)) = crate::pipeline_state::read_typed(&story_id) { + use crate::pipeline_state::Stage; + matches!(item.stage, Stage::Backlog | Stage::Coding) } else { let work_dir = project_root.join(".huskies").join("work"); work_dir.join("1_backlog").join(format!("{story_id}.md")).exists() @@ -588,7 +590,7 @@ fn resolve_story_id(number_or_id: &str, project_root: &Path) -> Option { // --- DB-first lookup --- for id in crate::db::all_content_ids() { let file_num = id.split('_').next().unwrap_or(""); - if file_num == number_or_id && crate::crdt_state::read_item(&id).is_some() { + if file_num == number_or_id && crate::pipeline_state::read_typed(&id).ok().flatten().is_some() { return Some(id); } } diff --git a/server/src/chat/transport/matrix/assign.rs b/server/src/chat/transport/matrix/assign.rs index bda0fa67..b6c423a5 100644 --- a/server/src/chat/transport/matrix/assign.rs +++ b/server/src/chat/transport/matrix/assign.rs @@ -108,11 +108,11 @@ pub async fn handle_assign( // --- DB-first lookup --- for id in crate::db::all_content_ids() { let file_num = id.split('_').next().unwrap_or(""); - if file_num == story_number && let Some(item) = crate::crdt_state::read_item(&id) { + if file_num == story_number && let Ok(Some(item)) = crate::pipeline_state::read_typed(&id) { let path = project_root .join(".huskies") .join("work") - .join(&item.stage) + .join(item.stage.dir_name()) .join(format!("{id}.md")); found = Some((path, id)); break; diff --git a/server/src/chat/transport/matrix/delete.rs b/server/src/chat/transport/matrix/delete.rs index 4b1040b6..dbaca187 100644 --- a/server/src/chat/transport/matrix/delete.rs +++ b/server/src/chat/transport/matrix/delete.rs @@ -76,13 +76,13 @@ pub async fn handle_delete( // --- DB-first lookup --- for id in crate::db::all_content_ids() { let file_num = id.split('_').next().unwrap_or(""); - if file_num == story_number && let Some(item) = crate::crdt_state::read_item(&id) { + if file_num == story_number && let Ok(Some(item)) = crate::pipeline_state::read_typed(&id) { let path = project_root .join(".huskies") .join("work") - .join(&item.stage) + .join(item.stage.dir_name()) .join(format!("{id}.md")); - found = Some((path, item.stage, id)); + found = Some((path, item.stage.dir_name().to_string(), id)); break; } } diff --git a/server/src/chat/transport/matrix/start.rs b/server/src/chat/transport/matrix/start.rs index 87da5cf5..ab6c35d8 100644 --- a/server/src/chat/transport/matrix/start.rs +++ b/server/src/chat/transport/matrix/start.rs @@ -95,11 +95,11 @@ pub async fn handle_start( // --- DB-first lookup --- for id in crate::db::all_content_ids() { let file_num = id.split('_').next().unwrap_or(""); - if file_num == story_number && let Some(item) = crate::crdt_state::read_item(&id) { + if file_num == story_number && let Ok(Some(item)) = crate::pipeline_state::read_typed(&id) { let path = project_root .join(".huskies") .join("work") - .join(&item.stage) + .join(item.stage.dir_name()) .join(format!("{id}.md")); found = Some((path, id)); break; diff --git a/server/src/db/mod.rs b/server/src/db/mod.rs index f3a593b7..77cc92f5 100644 --- a/server/src/db/mod.rs +++ b/server/src/db/mod.rs @@ -309,17 +309,16 @@ pub fn delete_item(story_id: &str) { pub fn next_item_number() -> u32 { let mut max_num: u32 = 0; - // Scan CRDT items. - if let Some(items) = crate::crdt_state::read_all_items() { - for item in &items { - let num_str: String = item - .story_id - .chars() - .take_while(|c| c.is_ascii_digit()) - .collect(); - if let Ok(n) = num_str.parse::() && n > max_num { - max_num = n; - } + // Scan CRDT items via typed projection. + for item in crate::pipeline_state::read_all_typed() { + let num_str: String = item + .story_id + .0 + .chars() + .take_while(|c| c.is_ascii_digit()) + .collect(); + if let Ok(n) = num_str.parse::() && n > max_num { + max_num = n; } } diff --git a/server/src/http/mcp/mod.rs b/server/src/http/mcp/mod.rs index cc2c4094..5980ef06 100644 --- a/server/src/http/mcp/mod.rs +++ b/server/src/http/mcp/mod.rs @@ -1461,7 +1461,7 @@ mod tests { assert!(names.contains(&"git_log")); assert!(names.contains(&"status")); assert!(names.contains(&"loc_file")); - assert_eq!(tools.len(), 57); + assert_eq!(tools.len(), 58); } #[test] diff --git a/server/src/http/mcp/story_tools.rs b/server/src/http/mcp/story_tools.rs index 5ac4331e..18c5e9d7 100644 --- a/server/src/http/mcp/story_tools.rs +++ b/server/src/http/mcp/story_tools.rs @@ -376,8 +376,10 @@ pub(super) fn tool_update_story(args: &Value, ctx: &AppContext) -> Result Result< // 4. Delete from database content store and CRDT. let found_in_db = crate::db::read_content(story_id).is_some() - || crate::crdt_state::read_item(story_id).is_some(); + || crate::pipeline_state::read_typed(story_id).ok().flatten().is_some(); crate::db::delete_item(story_id); diff --git a/server/src/http/workflow/bug_ops.rs b/server/src/http/workflow/bug_ops.rs index de3871a9..09d344af 100644 --- a/server/src/http/workflow/bug_ops.rs +++ b/server/src/http/workflow/bug_ops.rs @@ -202,21 +202,20 @@ pub fn list_bug_files(root: &Path) -> Result, String> { let mut bugs = Vec::new(); let mut seen = std::collections::HashSet::new(); - // First: CRDT items in backlog that are bugs. - if let Some(items) = crate::crdt_state::read_all_items() { - for item in items { - if item.stage != "1_backlog" || !is_bug_item(&item.story_id) { - continue; - } - let name = item.name.clone() - .or_else(|| { - crate::db::read_content(&item.story_id) - .and_then(|c| extract_bug_name_from_content(&c)) - }) - .unwrap_or_else(|| item.story_id.clone()); - seen.insert(item.story_id.clone()); - bugs.push((item.story_id, name)); + // First: typed projection items in backlog that are bugs. + for item in crate::pipeline_state::read_all_typed() { + if !matches!(item.stage, crate::pipeline_state::Stage::Backlog) || !is_bug_item(&item.story_id.0) { + continue; } + let sid = item.story_id.0; + let name = if item.name.is_empty() { None } else { Some(item.name) } + .or_else(|| { + crate::db::read_content(&sid) + .and_then(|c| extract_bug_name_from_content(&c)) + }) + .unwrap_or_else(|| sid.clone()); + seen.insert(sid.clone()); + bugs.push((sid, name)); } // Then: filesystem fallback. @@ -267,22 +266,21 @@ pub fn list_refactor_files(root: &Path) -> Result, String> let mut refactors = Vec::new(); let mut seen = std::collections::HashSet::new(); - // First: CRDT items. - if let Some(items) = crate::crdt_state::read_all_items() { - for item in items { - if item.stage != "1_backlog" || !is_refactor_item(&item.story_id) { - continue; - } - let name = item.name.clone() - .or_else(|| { - crate::db::read_content(&item.story_id) - .and_then(|c| parse_front_matter(&c).ok()) - .and_then(|m| m.name) - }) - .unwrap_or_else(|| item.story_id.clone()); - seen.insert(item.story_id.clone()); - refactors.push((item.story_id, name)); + // First: typed projection items. + for item in crate::pipeline_state::read_all_typed() { + if !matches!(item.stage, crate::pipeline_state::Stage::Backlog) || !is_refactor_item(&item.story_id.0) { + continue; } + let sid = item.story_id.0; + let name = if item.name.is_empty() { None } else { Some(item.name) } + .or_else(|| { + crate::db::read_content(&sid) + .and_then(|c| parse_front_matter(&c).ok()) + .and_then(|m| m.name) + }) + .unwrap_or_else(|| sid.clone()); + seen.insert(sid.clone()); + refactors.push((sid, name)); } // Then: filesystem fallback. diff --git a/server/src/http/workflow/mod.rs b/server/src/http/workflow/mod.rs index 0690ddc6..7d71d452 100644 --- a/server/src/http/workflow/mod.rs +++ b/server/src/http/workflow/mod.rs @@ -79,8 +79,11 @@ pub struct PipelineState { pub fn load_pipeline_state(ctx: &AppContext) -> Result { let agent_map = build_active_agent_map(ctx); - // Try CRDT-first read. - if let Some(crdt_items) = crate::crdt_state::read_all_items() { + // Try CRDT-first read via the typed projection layer. + let typed_items = crate::pipeline_state::read_all_typed(); + if !typed_items.is_empty() { + use crate::pipeline_state::Stage; + let mut state = PipelineState { backlog: Vec::new(), current: Vec::new(), @@ -89,11 +92,12 @@ pub fn load_pipeline_state(ctx: &AppContext) -> Result { done: Vec::new(), }; - for item in crdt_items { - let agent = agent_map.get(&item.story_id).cloned(); + for item in typed_items { + let sid = &item.story_id.0; + let agent = agent_map.get(sid).cloned(); // Enrich with content-derived metadata (merge_failure, review_hold, qa). - let (merge_failure, review_hold, qa) = crate::db::read_content(&item.story_id) + let (merge_failure, review_hold, qa) = crate::db::read_content(sid) .and_then(|c| parse_front_matter(&c).ok()) .map(|meta| { ( @@ -105,24 +109,45 @@ pub fn load_pipeline_state(ctx: &AppContext) -> Result { .unwrap_or((None, None, None)); let story = UpcomingStory { - story_id: item.story_id, - name: item.name, + story_id: sid.clone(), + name: if item.name.is_empty() { + None + } else { + Some(item.name.clone()) + }, error: None, merge_failure, agent, review_hold, qa, - retry_count: item.retry_count.map(|r| r as u32), - blocked: item.blocked, - depends_on: item.depends_on, + retry_count: if item.retry_count > 0 { + Some(item.retry_count) + } else { + None + }, + blocked: if item.stage.is_blocked() { + Some(true) + } else { + None + }, + depends_on: if item.depends_on.is_empty() { + None + } else { + Some( + item.depends_on + .iter() + .filter_map(|d| d.0.split('_').next()?.parse::().ok()) + .collect(), + ) + }, }; - match item.stage.as_str() { - "1_backlog" => state.backlog.push(story), - "2_current" => state.current.push(story), - "3_qa" => state.qa.push(story), - "4_merge" => state.merge.push(story), - "5_done" => state.done.push(story), - _ => {} // ignore archived or unknown stages + match &item.stage { + Stage::Backlog => state.backlog.push(story), + Stage::Coding => state.current.push(story), + Stage::Qa => state.qa.push(story), + Stage::Merge { .. } => state.merge.push(story), + Stage::Done { .. } => state.done.push(story), + Stage::Archived { .. } => {} // skip archived } } @@ -256,22 +281,46 @@ fn load_stage_items_from_fs( } pub fn load_upcoming_stories(ctx: &AppContext) -> Result, String> { - // Try CRDT first. - if let Some(crdt_items) = crate::crdt_state::read_all_items() { - let mut stories: Vec = crdt_items + // Try typed projection first. + let typed_items = crate::pipeline_state::read_all_typed(); + if !typed_items.is_empty() { + use crate::pipeline_state::Stage; + + let mut stories: Vec = typed_items .into_iter() - .filter(|item| item.stage == "1_backlog") + .filter(|item| matches!(item.stage, Stage::Backlog)) .map(|item| UpcomingStory { - story_id: item.story_id, - name: item.name, + story_id: item.story_id.0, + name: if item.name.is_empty() { + None + } else { + Some(item.name) + }, error: None, merge_failure: None, agent: None, review_hold: None, qa: None, - retry_count: item.retry_count.map(|r| r as u32), - blocked: item.blocked, - depends_on: item.depends_on, + retry_count: if item.retry_count > 0 { + Some(item.retry_count) + } else { + None + }, + blocked: if item.stage.is_blocked() { + Some(true) + } else { + None + }, + depends_on: if item.depends_on.is_empty() { + None + } else { + Some( + item.depends_on + .iter() + .filter_map(|d| d.0.split('_').next()?.parse::().ok()) + .collect(), + ) + }, }) .collect(); stories.sort_by(|a, b| a.story_id.cmp(&b.story_id)); @@ -295,13 +344,16 @@ pub fn validate_story_dirs( ) -> Result, String> { let mut results = Vec::new(); - // Validate from CRDT + content store. - if let Some(crdt_items) = crate::crdt_state::read_all_items() { - for item in crdt_items { - if item.stage != "1_backlog" && item.stage != "2_current" { + // Validate from typed projection + content store. + { + let typed_items = crate::pipeline_state::read_all_typed(); + for item in typed_items { + use crate::pipeline_state::Stage; + if !matches!(item.stage, Stage::Backlog | Stage::Coding) { continue; } - if let Some(content) = crate::db::read_content(&item.story_id) { + let sid = item.story_id.0.clone(); + if let Some(content) = crate::db::read_content(&sid) { match parse_front_matter(&content) { Ok(meta) => { let mut errors = Vec::new(); @@ -310,20 +362,20 @@ pub fn validate_story_dirs( } if errors.is_empty() { results.push(StoryValidationResult { - story_id: item.story_id, + story_id: sid, valid: true, error: None, }); } else { results.push(StoryValidationResult { - story_id: item.story_id, + story_id: sid, valid: false, error: Some(errors.join("; ")), }); } } Err(e) => results.push(StoryValidationResult { - story_id: item.story_id, + story_id: sid, valid: false, error: Some(e.to_string()), }), @@ -435,7 +487,10 @@ pub(super) fn write_story_content_with_fs(project_root: &Path, story_id: &str, s /// Determine what stage a story is in (from CRDT). pub(super) fn story_stage(story_id: &str) -> Option { - crate::crdt_state::read_item(story_id).map(|item| item.stage) + crate::pipeline_state::read_typed(story_id) + .ok() + .flatten() + .map(|item| item.stage.dir_name().to_string()) } /// Locate a work item file by searching all active pipeline stages on disk. diff --git a/server/src/main.rs b/server/src/main.rs index 7cc98ad4..a8d3e1d2 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -17,6 +17,7 @@ pub mod rebuild; mod state; mod store; mod workflow; +pub(crate) mod pipeline_state; mod worktree; use crate::agents::AgentPool; diff --git a/server/src/pipeline_state.rs b/server/src/pipeline_state.rs new file mode 100644 index 00000000..fddb8d66 --- /dev/null +++ b/server/src/pipeline_state.rs @@ -0,0 +1,1387 @@ +//! Typed pipeline state machine (story 520). +//! +//! Replaces the stringly-typed CRDT views with strict Rust enums so that +//! impossible states (e.g. `Stage::Merge` with zero commits, a "done" story +//! with no merge metadata) are unrepresentable at compile time. +//! +//! The CRDT stays loose at the persistence layer — that's what makes it merge +//! correctly across nodes. Every consumer above the CRDT operates on these +//! strict typed enums, bridged by the projection layer (`TryFrom` / `From`). +//! +//! This is a foundation module: the types, transitions, projection layer, and +//! event bus are fully defined and tested here. Consumers will be migrated to +//! the typed API incrementally in follow-up stories. + +// Foundation module — all items are exercised by tests but not yet called from +// non-test code. The dead_code lint is suppressed until consumer migration. +#![allow(dead_code)] + +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use std::fmt; +use std::num::NonZeroU32; + +use crate::crdt_state::PipelineItemView; + +// ── Newtypes ──────────────────────────────────────────────────────────────── + +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct StoryId(pub String); + +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct BranchName(pub String); + +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct GitSha(pub String); + +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct AgentName(pub String); + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct NodePubkey(pub [u8; 32]); + +impl fmt::Display for StoryId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(&self.0) + } +} + +impl fmt::Display for AgentName { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(&self.0) + } +} + +// ── Synced pipeline stage (lives in CRDT, converges across nodes) ─────────── + +/// The pipeline stage for a work item. +/// +/// This is the SHARED state — every node sees the same Stage for a given story +/// after CRDT convergence. Notice what is NOT a field: +/// - `agent` — local execution state, not pipeline state +/// - `retry_count` — also local +/// - `blocked` — folded into `Archived { reason: Blocked { .. } }` +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub enum Stage { + /// Story exists, waiting for dependencies or auto-assign promotion. + Backlog, + + /// Story is being actively coded somewhere in the mesh. + Coding, + + /// Coder has run; gates are running. + Qa, + + /// Gates passed; ready to merge. + /// `commits_ahead: NonZeroU32` makes "Merge with nothing to merge" + /// structurally impossible (eliminates bug 519). + Merge { + feature_branch: BranchName, + commits_ahead: NonZeroU32, + }, + + /// Mergemaster squashed to master. Always carries merge metadata. + Done { + merged_at: DateTime, + merge_commit: GitSha, + }, + + /// Out of the active flow. The reason explains why. + Archived { + archived_at: DateTime, + reason: ArchiveReason, + }, +} + +/// Why a story was archived. Subsumes the old `blocked`, `merge_failure`, +/// and `review_hold` front-matter fields (story 436). +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub enum ArchiveReason { + /// Normal happy-path completion. + Completed, + /// User explicitly abandoned the story. + Abandoned, + /// Replaced by another story. + Superseded { by: StoryId }, + /// Manually blocked, awaiting human resolution. + Blocked { reason: String }, + /// Mergemaster failed beyond the retry budget. + MergeFailed { reason: String }, + /// Held in review at human request. + ReviewHeld { reason: String }, +} + +// ── Stage convenience methods ────────────────────────────────────────────── + +impl Stage { + /// Returns true if this stage is an "active" stage (Coding, Qa, or Merge). + pub fn is_active(&self) -> bool { + matches!(self, Stage::Coding | Stage::Qa | Stage::Merge { .. }) + } + + /// Returns the filesystem directory name for this stage. + pub fn dir_name(&self) -> &'static str { + stage_dir_name(self) + } + + /// Returns true if this is the Archived(Blocked) variant. + pub fn is_blocked(&self) -> bool { + matches!( + self, + Stage::Archived { + reason: ArchiveReason::Blocked { .. }, + .. + } + ) + } +} + +// ── Per-node execution state ──────────────────────────────────────────────── + +/// Per-node execution tracking, stored in the CRDT under each node's pubkey. +/// +/// Each node only writes to entries where `node_pubkey == self.pubkey`, so +/// there are no inter-author CRDT merge conflicts. All nodes can READ all +/// entries to know what's happening across the mesh. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub enum ExecutionState { + Idle, + Pending { + agent: AgentName, + since: DateTime, + }, + Running { + agent: AgentName, + started_at: DateTime, + last_heartbeat: DateTime, + }, + RateLimited { + agent: AgentName, + resume_at: DateTime, + }, + Completed { + agent: AgentName, + exit_code: i32, + completed_at: DateTime, + }, +} + +// ── Pipeline item (the aggregate) ─────────────────────────────────────────── + +/// A fully typed pipeline item. Every field is validated by construction. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct PipelineItem { + pub story_id: StoryId, + pub name: String, + pub stage: Stage, + pub depends_on: Vec, + pub retry_count: u32, +} + +// ── Pipeline events ───────────────────────────────────────────────────────── + +/// Events that drive Stage transitions. Each variant carries the data needed +/// to construct the destination state, so the transition function can never +/// accidentally land in an underspecified state. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum PipelineEvent { + /// Dependencies met; promote from backlog. + DepsMet, + /// Coder starting gates. + GatesStarted, + /// Gates passed — ready to merge. + GatesPassed { + feature_branch: BranchName, + commits_ahead: NonZeroU32, + }, + /// Gates failed; retry. + GatesFailed { reason: String }, + /// QA mode is "server" — skip QA, go straight to merge. + QaSkipped { + feature_branch: BranchName, + commits_ahead: NonZeroU32, + }, + /// Mergemaster squash succeeded. + MergeSucceeded { merge_commit: GitSha }, + /// Mergemaster gave up after retry budget. + MergeFailedFinal { reason: String }, + /// Story accepted (Done → Archived). + Accepted, + /// User blocked the story. + Block { reason: String }, + /// User unblocked. + Unblock, + /// User abandoned. + Abandon, + /// Story superseded by another. + Supersede { by: StoryId }, + /// Story put on review hold. + ReviewHold { reason: String }, +} + +// ── Transition errors ─────────────────────────────────────────────────────── + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub enum TransitionError { + InvalidTransition { + from_stage: String, + event: String, + }, +} + +impl fmt::Display for TransitionError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::InvalidTransition { from_stage, event } => { + write!(f, "invalid transition: {from_stage} + {event}") + } + } + } +} + +impl std::error::Error for TransitionError {} + +// ── The transition function ───────────────────────────────────────────────── + +/// Pure state transition. Takes the current Stage and an event, returns the +/// new Stage or a TransitionError. Side effects are dispatched separately +/// via the event bus. +pub fn transition(state: Stage, event: PipelineEvent) -> Result { + use PipelineEvent::*; + use Stage::*; + + let sl = stage_label(&state); + let el = event_label(&event); + let invalid = || TransitionError::InvalidTransition { + from_stage: sl.to_string(), + event: el.to_string(), + }; + + let now = Utc::now(); + + match (state, event) { + // ── Forward path ──────────────────────────────────────────────── + (Backlog, DepsMet) => Ok(Coding), + (Coding, GatesStarted) => Ok(Qa), + (Coding, QaSkipped { feature_branch, commits_ahead }) => Ok(Merge { + feature_branch, + commits_ahead, + }), + (Qa, GatesPassed { feature_branch, commits_ahead }) => Ok(Merge { + feature_branch, + commits_ahead, + }), + (Qa, GatesFailed { .. }) => Ok(Coding), + (Merge { .. }, MergeSucceeded { merge_commit }) => Ok(Done { + merged_at: now, + merge_commit, + }), + + // ── Done → Archived(Completed) ────────────────────────────────── + (Done { .. }, Accepted) => Ok(Archived { + archived_at: now, + reason: ArchiveReason::Completed, + }), + + // ── Stuck states (any active → Archived) ─────────────────────── + (Backlog, Block { reason }) + | (Coding, Block { reason }) + | (Qa, Block { reason }) + | (Merge { .. }, Block { reason }) => Ok(Archived { + archived_at: now, + reason: ArchiveReason::Blocked { reason }, + }), + + (Backlog, ReviewHold { reason }) + | (Coding, ReviewHold { reason }) + | (Qa, ReviewHold { reason }) + | (Merge { .. }, ReviewHold { reason }) => Ok(Archived { + archived_at: now, + reason: ArchiveReason::ReviewHeld { reason }, + }), + + (Merge { .. }, MergeFailedFinal { reason }) => Ok(Archived { + archived_at: now, + reason: ArchiveReason::MergeFailed { reason }, + }), + + // ── Abandon / supersede from any active or done stage ─────────── + (Backlog, Abandon) + | (Coding, Abandon) + | (Qa, Abandon) + | (Merge { .. }, Abandon) + | (Done { .. }, Abandon) => Ok(Archived { + archived_at: now, + reason: ArchiveReason::Abandoned, + }), + + (Backlog, Supersede { by }) + | (Coding, Supersede { by }) + | (Qa, Supersede { by }) + | (Merge { .. }, Supersede { by }) + | (Done { .. }, Supersede { by }) => Ok(Archived { + archived_at: now, + reason: ArchiveReason::Superseded { by }, + }), + + // ── Unblock: only from Archived(Blocked) → Backlog ───────────── + ( + Archived { + reason: ArchiveReason::Blocked { .. }, + .. + }, + Unblock, + ) => Ok(Backlog), + + // ── Everything else is invalid ────────────────────────────────── + _ => Err(invalid()), + } +} + +// ── Label helpers ─────────────────────────────────────────────────────────── + +pub fn stage_label(s: &Stage) -> &'static str { + match s { + Stage::Backlog => "Backlog", + Stage::Coding => "Coding", + Stage::Qa => "Qa", + Stage::Merge { .. } => "Merge", + Stage::Done { .. } => "Done", + Stage::Archived { .. } => "Archived", + } +} + +pub fn event_label(e: &PipelineEvent) -> &'static str { + match e { + PipelineEvent::DepsMet => "DepsMet", + PipelineEvent::GatesStarted => "GatesStarted", + PipelineEvent::GatesPassed { .. } => "GatesPassed", + PipelineEvent::GatesFailed { .. } => "GatesFailed", + PipelineEvent::QaSkipped { .. } => "QaSkipped", + PipelineEvent::MergeSucceeded { .. } => "MergeSucceeded", + PipelineEvent::MergeFailedFinal { .. } => "MergeFailedFinal", + PipelineEvent::Accepted => "Accepted", + PipelineEvent::Block { .. } => "Block", + PipelineEvent::Unblock => "Unblock", + PipelineEvent::Abandon => "Abandon", + PipelineEvent::Supersede { .. } => "Supersede", + PipelineEvent::ReviewHold { .. } => "ReviewHold", + } +} + +/// Map a Stage to the filesystem directory name used by the work pipeline. +pub fn stage_dir_name(s: &Stage) -> &'static str { + match s { + Stage::Backlog => "1_backlog", + Stage::Coding => "2_current", + Stage::Qa => "3_qa", + Stage::Merge { .. } => "4_merge", + Stage::Done { .. } => "5_done", + Stage::Archived { .. } => "6_archived", + } +} + +// ── Per-node execution state machine ──────────────────────────────────────── + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum ExecutionEvent { + SpawnRequested { agent: AgentName }, + SpawnedSuccessfully, + Heartbeat, + HitRateLimit { resume_at: DateTime }, + Exited { exit_code: i32 }, + Stopped, + Reset, +} + +pub fn execution_transition( + state: ExecutionState, + event: ExecutionEvent, +) -> Result { + use ExecutionEvent::*; + use ExecutionState::*; + + let now = Utc::now(); + + match (state, event) { + (Idle, SpawnRequested { agent }) => Ok(Pending { agent, since: now }), + + (Pending { agent, .. }, SpawnedSuccessfully) => Ok(Running { + agent, + started_at: now, + last_heartbeat: now, + }), + + ( + Running { + agent, started_at, .. + }, + Heartbeat, + ) => Ok(Running { + agent, + started_at, + last_heartbeat: now, + }), + + (Running { agent, .. }, HitRateLimit { resume_at }) + | (Pending { agent, .. }, HitRateLimit { resume_at }) => { + Ok(RateLimited { agent, resume_at }) + } + + (RateLimited { agent, .. }, SpawnedSuccessfully) => Ok(Running { + agent, + started_at: now, + last_heartbeat: now, + }), + + (Running { agent, .. }, Exited { exit_code }) + | (Pending { agent, .. }, Exited { exit_code }) + | (RateLimited { agent, .. }, Exited { exit_code }) => Ok(Completed { + agent, + exit_code, + completed_at: now, + }), + + (_, Stopped) | (_, Reset) => Ok(Idle), + + _ => Err(TransitionError::InvalidTransition { + from_stage: "ExecutionState".to_string(), + event: "".to_string(), + }), + } +} + +// ── Event bus ─────────────────────────────────────────────────────────────── + +/// Fired when a pipeline stage transition completes. +#[derive(Debug, Clone)] +pub struct TransitionFired { + pub story_id: StoryId, + pub before: Stage, + pub after: Stage, + pub event: PipelineEvent, + pub at: DateTime, +} + +/// Trait for side-effect handlers that react to pipeline transitions. +pub trait TransitionSubscriber: Send + Sync { + fn name(&self) -> &'static str; + fn on_transition(&self, fired: &TransitionFired); +} + +pub struct EventBus { + subscribers: Vec>, +} + +impl EventBus { + pub fn new() -> Self { + Self { + subscribers: Vec::new(), + } + } + + pub fn subscribe(&mut self, subscriber: S) { + self.subscribers.push(Box::new(subscriber)); + } + + pub fn fire(&self, event: TransitionFired) { + for sub in &self.subscribers { + sub.on_transition(&event); + } + } +} + +impl Default for EventBus { + fn default() -> Self { + Self::new() + } +} + +// ── Projection errors ─────────────────────────────────────────────────────── + +/// Errors from projecting loose CRDT data into typed enums. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum ProjectionError { + /// The stage string from the CRDT doesn't map to any known Stage variant. + UnknownStage(String), + /// A required field is missing from the CRDT data. + MissingField(&'static str), + /// A field has an invalid value. + InvalidField { + field: &'static str, + detail: String, + }, +} + +impl fmt::Display for ProjectionError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::UnknownStage(s) => write!(f, "unknown stage: {s:?}"), + Self::MissingField(field) => write!(f, "missing required field: {field}"), + Self::InvalidField { field, detail } => { + write!(f, "invalid field {field}: {detail}") + } + } + } +} + +impl std::error::Error for ProjectionError {} + +// ── Projection: PipelineItemView → PipelineItem ───────────────────────────── + +impl TryFrom<&PipelineItemView> for PipelineItem { + type Error = ProjectionError; + + fn try_from(view: &PipelineItemView) -> Result { + let story_id = StoryId(view.story_id.clone()); + let name = view.name.clone().unwrap_or_default(); + + let depends_on: Vec = view + .depends_on + .as_ref() + .map(|deps| deps.iter().map(|d| StoryId(d.to_string())).collect()) + .unwrap_or_default(); + + let retry_count = view.retry_count.unwrap_or(0).max(0) as u32; + + let stage = project_stage(view)?; + + Ok(PipelineItem { + story_id, + name, + stage, + depends_on, + retry_count, + }) + } +} + +/// Project the stage string + associated fields from a PipelineItemView into +/// a typed Stage enum. This is the one carefully-controlled boundary where +/// loose CRDT data becomes typed. +fn project_stage(view: &PipelineItemView) -> Result { + match view.stage.as_str() { + "1_backlog" => Ok(Stage::Backlog), + "2_current" => Ok(Stage::Coding), + "3_qa" => Ok(Stage::Qa), + "4_merge" => { + // Merge stage in the current CRDT doesn't carry feature_branch or + // commits_ahead — those are computed at transition time. For + // projection from existing CRDT data, we synthesize defaults. + // The feature branch follows the naming convention. + let branch = format!("feature/story-{}", view.story_id); + // Existing CRDT data doesn't track commits_ahead, so we use 1 as + // a safe non-zero default (the item is in merge, so there must be + // at least one commit). + Ok(Stage::Merge { + feature_branch: BranchName(branch), + commits_ahead: NonZeroU32::new(1) + .expect("1 is non-zero"), + }) + } + "5_done" => { + // Existing CRDT data doesn't carry merge metadata. For projection + // from legacy data, we use epoch/placeholder values. New items + // entering Done via the transition function will carry real data. + Ok(Stage::Done { + merged_at: Utc::now(), + merge_commit: GitSha("legacy".to_string()), + }) + } + "6_archived" => { + // Determine the archive reason from the CRDT fields. + let reason = if view.blocked == Some(true) { + ArchiveReason::Blocked { + reason: "migrated from legacy blocked field".to_string(), + } + } else { + // Default to Completed for legacy archived items. + ArchiveReason::Completed + }; + Ok(Stage::Archived { + archived_at: Utc::now(), + reason, + }) + } + other => Err(ProjectionError::UnknownStage(other.to_string())), + } +} + +// ── Reverse projection: PipelineItem → stage dir string ───────────────────── + +impl PipelineItem { + /// Convert back to the loose fields that the CRDT write path expects. + /// Returns `(stage_dir, blocked)`. + pub fn to_crdt_fields(&self) -> (&'static str, bool) { + let dir = stage_dir_name(&self.stage); + let blocked = matches!( + self.stage, + Stage::Archived { + reason: ArchiveReason::Blocked { .. }, + .. + } + ); + (dir, blocked) + } +} + +// ── Bridge to existing CRDT reads ─────────────────────────────────────────── + +/// Read all pipeline items from the CRDT and project them into typed enums. +/// +/// Items that fail projection (e.g. unknown stage strings from a future +/// version) are logged and skipped — they don't poison the entire read. +pub fn read_all_typed() -> Vec { + let Some(views) = crate::crdt_state::read_all_items() else { + return Vec::new(); + }; + views + .iter() + .filter_map(|v| match PipelineItem::try_from(v) { + Ok(item) => Some(item), + Err(e) => { + crate::slog!( + "[pipeline_state] projection error for '{}': {e}", + v.story_id + ); + None + } + }) + .collect() +} + +/// Read a single pipeline item by story_id and project it into the typed enum. +pub fn read_typed(story_id: &str) -> Result, ProjectionError> { + let Some(view) = crate::crdt_state::read_item(story_id) else { + return Ok(None); + }; + PipelineItem::try_from(&view).map(Some) +} + +// ── Subscriber stubs (real dispatch uses these as the interface) ───────────── +// +// These are ready to wire into the event bus but not yet connected to the +// actual subsystems. Suppress dead_code until consumers are migrated. + +#[allow(dead_code)] +pub struct MatrixBotSubscriber; +#[allow(dead_code)] +impl TransitionSubscriber for MatrixBotSubscriber { + fn name(&self) -> &'static str { + "matrix-bot" + } + fn on_transition(&self, f: &TransitionFired) { + crate::slog!( + "[pipeline/matrix-bot] #{}: {} → {}", + f.story_id, + stage_label(&f.before), + stage_label(&f.after) + ); + } +} + +#[allow(dead_code)] +pub struct FileRendererSubscriber; +#[allow(dead_code)] +impl TransitionSubscriber for FileRendererSubscriber { + fn name(&self) -> &'static str { + "filesystem" + } + fn on_transition(&self, f: &TransitionFired) { + crate::slog!( + "[pipeline/filesystem] re-rendering work/{}/{}", + stage_dir_name(&f.after), + f.story_id + ); + } +} + +#[allow(dead_code)] +pub struct PipelineItemsSubscriber; +#[allow(dead_code)] +impl TransitionSubscriber for PipelineItemsSubscriber { + fn name(&self) -> &'static str { + "pipeline-items" + } + fn on_transition(&self, f: &TransitionFired) { + crate::slog!( + "[pipeline/items] UPDATE stage = '{}' WHERE id = '{}'", + stage_dir_name(&f.after), + f.story_id + ); + } +} + +#[allow(dead_code)] +pub struct AutoAssignSubscriber; +#[allow(dead_code)] +impl TransitionSubscriber for AutoAssignSubscriber { + fn name(&self) -> &'static str { + "auto-assign" + } + fn on_transition(&self, f: &TransitionFired) { + if matches!(f.after, Stage::Done { .. } | Stage::Archived { .. }) { + crate::slog!( + "[pipeline/auto-assign] story {} reached {}; checking for promotable backlog items", + f.story_id, + stage_label(&f.after) + ); + } + } +} + +#[allow(dead_code)] +pub struct WebUiBroadcastSubscriber; +#[allow(dead_code)] +impl TransitionSubscriber for WebUiBroadcastSubscriber { + fn name(&self) -> &'static str { + "web-ui-broadcast" + } + fn on_transition(&self, f: &TransitionFired) { + crate::slog!( + "[pipeline/web-ui] broadcasting #{} transition to connected clients", + f.story_id + ); + } +} + +// ── Tests ─────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + + fn nz(n: u32) -> NonZeroU32 { + NonZeroU32::new(n).unwrap() + } + fn fb(name: &str) -> BranchName { + BranchName(name.to_string()) + } + fn sha(s: &str) -> GitSha { + GitSha(s.to_string()) + } + fn sid(s: &str) -> StoryId { + StoryId(s.to_string()) + } + + // ── Happy path transitions ────────────────────────────────────────── + + #[test] + fn happy_path_backlog_through_archived() { + let s = Stage::Backlog; + let s = transition(s, PipelineEvent::DepsMet).unwrap(); + assert!(matches!(s, Stage::Coding)); + + let s = transition( + s, + PipelineEvent::QaSkipped { + feature_branch: fb("feature/story-1"), + commits_ahead: nz(3), + }, + ) + .unwrap(); + assert!(matches!(s, Stage::Merge { .. })); + + let s = transition( + s, + PipelineEvent::MergeSucceeded { + merge_commit: sha("abc123"), + }, + ) + .unwrap(); + assert!(matches!(s, Stage::Done { .. })); + + let s = transition(s, PipelineEvent::Accepted).unwrap(); + assert!(matches!( + s, + Stage::Archived { + reason: ArchiveReason::Completed, + .. + } + )); + } + + #[test] + fn happy_path_with_qa() { + let s = Stage::Coding; + let s = transition(s, PipelineEvent::GatesStarted).unwrap(); + assert!(matches!(s, Stage::Qa)); + + let s = transition( + s, + PipelineEvent::GatesPassed { + feature_branch: fb("feature/story-2"), + commits_ahead: nz(5), + }, + ) + .unwrap(); + assert!(matches!(s, Stage::Merge { .. })); + } + + #[test] + fn qa_retry_loop() { + let s = Stage::Coding; + let s = transition(s, PipelineEvent::GatesStarted).unwrap(); + assert!(matches!(s, Stage::Qa)); + + let s = transition( + s, + PipelineEvent::GatesFailed { + reason: "tests failed".into(), + }, + ) + .unwrap(); + assert!(matches!(s, Stage::Coding)); + } + + // ── Bug 519: Merge with zero commits is unrepresentable ───────────── + + #[test] + fn merge_with_zero_commits_is_unrepresentable() { + assert!(NonZeroU32::new(0).is_none()); + } + + // ── Invalid transitions ───────────────────────────────────────────── + + #[test] + fn cannot_jump_backlog_to_done() { + let result = transition(Stage::Backlog, PipelineEvent::Accepted); + assert!(matches!( + result, + Err(TransitionError::InvalidTransition { .. }) + )); + } + + #[test] + fn cannot_unblock_done_story() { + let s = Stage::Done { + merged_at: Utc::now(), + merge_commit: sha("abc"), + }; + let result = transition(s, PipelineEvent::Unblock); + assert!(matches!( + result, + Err(TransitionError::InvalidTransition { .. }) + )); + } + + #[test] + fn cannot_unblock_review_held_story() { + let s = Stage::Archived { + archived_at: Utc::now(), + reason: ArchiveReason::ReviewHeld { + reason: "TBD".into(), + }, + }; + let result = transition(s, PipelineEvent::Unblock); + assert!(matches!( + result, + Err(TransitionError::InvalidTransition { .. }) + )); + } + + #[test] + fn cannot_merge_from_backlog() { + let result = transition( + Stage::Backlog, + PipelineEvent::MergeSucceeded { + merge_commit: sha("abc"), + }, + ); + assert!(matches!( + result, + Err(TransitionError::InvalidTransition { .. }) + )); + } + + #[test] + fn cannot_start_gates_from_backlog() { + let result = transition(Stage::Backlog, PipelineEvent::GatesStarted); + assert!(matches!( + result, + Err(TransitionError::InvalidTransition { .. }) + )); + } + + #[test] + fn cannot_accept_from_coding() { + let result = transition(Stage::Coding, PipelineEvent::Accepted); + assert!(matches!( + result, + Err(TransitionError::InvalidTransition { .. }) + )); + } + + // ── Block from any active stage ───────────────────────────────────── + + #[test] + fn block_from_any_active_stage() { + for s in [Stage::Backlog, Stage::Coding, Stage::Qa] { + let result = transition( + s.clone(), + PipelineEvent::Block { + reason: "stuck".into(), + }, + ); + assert!(matches!( + result, + Ok(Stage::Archived { + reason: ArchiveReason::Blocked { .. }, + .. + }) + )); + } + + let m = Stage::Merge { + feature_branch: fb("f"), + commits_ahead: nz(1), + }; + let result = transition( + m, + PipelineEvent::Block { + reason: "stuck".into(), + }, + ); + assert!(matches!( + result, + Ok(Stage::Archived { + reason: ArchiveReason::Blocked { .. }, + .. + }) + )); + } + + #[test] + fn unblock_returns_to_backlog() { + let s = Stage::Archived { + archived_at: Utc::now(), + reason: ArchiveReason::Blocked { + reason: "test".into(), + }, + }; + let result = transition(s, PipelineEvent::Unblock).unwrap(); + assert!(matches!(result, Stage::Backlog)); + } + + // ── Abandon / supersede ───────────────────────────────────────────── + + #[test] + fn abandon_from_any_active_or_done() { + for s in [ + Stage::Backlog, + Stage::Coding, + Stage::Qa, + Stage::Done { + merged_at: Utc::now(), + merge_commit: sha("x"), + }, + ] { + let result = transition(s, PipelineEvent::Abandon); + assert!(matches!( + result, + Ok(Stage::Archived { + reason: ArchiveReason::Abandoned, + .. + }) + )); + } + } + + #[test] + fn supersede_from_any_active_or_done() { + for s in [ + Stage::Backlog, + Stage::Coding, + Stage::Qa, + Stage::Done { + merged_at: Utc::now(), + merge_commit: sha("x"), + }, + ] { + let result = transition( + s, + PipelineEvent::Supersede { + by: sid("999_story_new"), + }, + ); + assert!(matches!( + result, + Ok(Stage::Archived { + reason: ArchiveReason::Superseded { .. }, + .. + }) + )); + } + } + + // ── Review hold ───────────────────────────────────────────────────── + + #[test] + fn review_hold_from_active_stages() { + for s in [Stage::Backlog, Stage::Coding, Stage::Qa] { + let result = transition( + s.clone(), + PipelineEvent::ReviewHold { + reason: "review".into(), + }, + ); + assert!(matches!( + result, + Ok(Stage::Archived { + reason: ArchiveReason::ReviewHeld { .. }, + .. + }) + )); + } + } + + // ── Merge failed final ────────────────────────────────────────────── + + #[test] + fn merge_failed_final() { + let s = Stage::Merge { + feature_branch: fb("f"), + commits_ahead: nz(1), + }; + let result = transition( + s, + PipelineEvent::MergeFailedFinal { + reason: "conflicts".into(), + }, + ) + .unwrap(); + assert!(matches!( + result, + Stage::Archived { + reason: ArchiveReason::MergeFailed { .. }, + .. + } + )); + } + + #[test] + fn merge_failed_only_from_merge() { + let result = transition( + Stage::Coding, + PipelineEvent::MergeFailedFinal { + reason: "conflicts".into(), + }, + ); + assert!(matches!( + result, + Err(TransitionError::InvalidTransition { .. }) + )); + } + + // ── Execution state machine ───────────────────────────────────────── + + #[test] + fn execution_happy_path() { + let e = ExecutionState::Idle; + let e = execution_transition( + e, + ExecutionEvent::SpawnRequested { + agent: AgentName("coder-1".into()), + }, + ) + .unwrap(); + assert!(matches!(e, ExecutionState::Pending { .. })); + + let e = execution_transition(e, ExecutionEvent::SpawnedSuccessfully).unwrap(); + assert!(matches!(e, ExecutionState::Running { .. })); + + let e = execution_transition(e, ExecutionEvent::Heartbeat).unwrap(); + assert!(matches!(e, ExecutionState::Running { .. })); + + let e = execution_transition(e, ExecutionEvent::Exited { exit_code: 0 }).unwrap(); + assert!(matches!( + e, + ExecutionState::Completed { exit_code: 0, .. } + )); + } + + #[test] + fn execution_rate_limit_then_resume() { + let e = ExecutionState::Running { + agent: AgentName("coder-1".into()), + started_at: Utc::now(), + last_heartbeat: Utc::now(), + }; + let e = execution_transition( + e, + ExecutionEvent::HitRateLimit { + resume_at: Utc::now() + chrono::Duration::minutes(5), + }, + ) + .unwrap(); + assert!(matches!(e, ExecutionState::RateLimited { .. })); + + let e = execution_transition(e, ExecutionEvent::SpawnedSuccessfully).unwrap(); + assert!(matches!(e, ExecutionState::Running { .. })); + } + + #[test] + fn execution_stop_from_anywhere() { + let e = ExecutionState::Running { + agent: AgentName("coder-1".into()), + started_at: Utc::now(), + last_heartbeat: Utc::now(), + }; + let e = execution_transition(e, ExecutionEvent::Stopped).unwrap(); + assert!(matches!(e, ExecutionState::Idle)); + } + + // ── Projection tests ──────────────────────────────────────────────── + + #[test] + fn project_backlog_item() { + let view = PipelineItemView { + story_id: "42_story_test".to_string(), + stage: "1_backlog".to_string(), + name: Some("Test Story".to_string()), + agent: None, + retry_count: None, + blocked: None, + depends_on: Some(vec![10, 20]), + }; + let item = PipelineItem::try_from(&view).unwrap(); + assert_eq!(item.story_id, StoryId("42_story_test".to_string())); + assert_eq!(item.name, "Test Story"); + assert!(matches!(item.stage, Stage::Backlog)); + assert_eq!(item.depends_on.len(), 2); + assert_eq!(item.retry_count, 0); + } + + #[test] + fn project_current_item() { + let view = PipelineItemView { + story_id: "42_story_test".to_string(), + stage: "2_current".to_string(), + name: Some("Test".to_string()), + agent: Some("coder-1".to_string()), + retry_count: Some(2), + blocked: None, + depends_on: None, + }; + let item = PipelineItem::try_from(&view).unwrap(); + assert!(matches!(item.stage, Stage::Coding)); + assert_eq!(item.retry_count, 2); + } + + #[test] + fn project_merge_item() { + let view = PipelineItemView { + story_id: "42_story_test".to_string(), + stage: "4_merge".to_string(), + name: Some("Test".to_string()), + agent: None, + retry_count: None, + blocked: None, + depends_on: None, + }; + let item = PipelineItem::try_from(&view).unwrap(); + assert!(matches!(item.stage, Stage::Merge { .. })); + if let Stage::Merge { + feature_branch, + commits_ahead, + } = &item.stage + { + assert_eq!(feature_branch.0, "feature/story-42_story_test"); + assert_eq!(commits_ahead.get(), 1); + } + } + + #[test] + fn project_archived_blocked_item() { + let view = PipelineItemView { + story_id: "42_story_test".to_string(), + stage: "6_archived".to_string(), + name: Some("Test".to_string()), + agent: None, + retry_count: None, + blocked: Some(true), + depends_on: None, + }; + let item = PipelineItem::try_from(&view).unwrap(); + assert!(matches!( + item.stage, + Stage::Archived { + reason: ArchiveReason::Blocked { .. }, + .. + } + )); + } + + #[test] + fn project_archived_completed_item() { + let view = PipelineItemView { + story_id: "42_story_test".to_string(), + stage: "6_archived".to_string(), + name: Some("Test".to_string()), + agent: None, + retry_count: None, + blocked: Some(false), + depends_on: None, + }; + let item = PipelineItem::try_from(&view).unwrap(); + assert!(matches!( + item.stage, + Stage::Archived { + reason: ArchiveReason::Completed, + .. + } + )); + } + + #[test] + fn project_unknown_stage_returns_error() { + let view = PipelineItemView { + story_id: "42_story_test".to_string(), + stage: "9_invalid".to_string(), + name: Some("Test".to_string()), + agent: None, + retry_count: None, + blocked: None, + depends_on: None, + }; + let result = PipelineItem::try_from(&view); + assert!(matches!( + result, + Err(ProjectionError::UnknownStage(s)) if s == "9_invalid" + )); + } + + // ── Reverse projection tests ──────────────────────────────────────── + + #[test] + fn reverse_projection_stage_dirs() { + let cases: Vec<(Stage, &str, bool)> = vec![ + (Stage::Backlog, "1_backlog", false), + (Stage::Coding, "2_current", false), + (Stage::Qa, "3_qa", false), + ( + Stage::Merge { + feature_branch: fb("f"), + commits_ahead: nz(1), + }, + "4_merge", + false, + ), + ( + Stage::Done { + merged_at: Utc::now(), + merge_commit: sha("abc"), + }, + "5_done", + false, + ), + ( + Stage::Archived { + archived_at: Utc::now(), + reason: ArchiveReason::Completed, + }, + "6_archived", + false, + ), + ( + Stage::Archived { + archived_at: Utc::now(), + reason: ArchiveReason::Blocked { + reason: "stuck".into(), + }, + }, + "6_archived", + true, + ), + ]; + + for (stage, expected_dir, expected_blocked) in cases { + let item = PipelineItem { + story_id: StoryId("test".into()), + name: "test".into(), + stage, + depends_on: vec![], + retry_count: 0, + }; + let (dir, blocked) = item.to_crdt_fields(); + assert_eq!(dir, expected_dir); + assert_eq!(blocked, expected_blocked); + } + } + + // ── Event bus tests ───────────────────────────────────────────────── + + #[test] + fn event_bus_fires_to_all_subscribers() { + use std::sync::atomic::{AtomicU32, Ordering}; + use std::sync::Arc; + + struct CountingSub(Arc); + impl TransitionSubscriber for CountingSub { + fn name(&self) -> &'static str { + "counter" + } + fn on_transition(&self, _: &TransitionFired) { + self.0.fetch_add(1, Ordering::SeqCst); + } + } + + let counter = Arc::new(AtomicU32::new(0)); + let mut bus = EventBus::new(); + bus.subscribe(CountingSub(counter.clone())); + bus.subscribe(CountingSub(counter.clone())); + + bus.fire(TransitionFired { + story_id: StoryId("test".into()), + before: Stage::Backlog, + after: Stage::Coding, + event: PipelineEvent::DepsMet, + at: Utc::now(), + }); + + assert_eq!(counter.load(Ordering::SeqCst), 2); + } + + // ── Bug 502 regression: agent field is not part of Stage ──────────── + + #[test] + fn bug_502_agent_not_in_stage() { + // Bug 502 was caused by a coder agent being assigned to a story in + // Merge stage. In the typed system, Stage has no `agent` field at all. + // Agent assignment is per-node ExecutionState. This test documents that + // the old failure mode is structurally impossible. + let merge = Stage::Merge { + feature_branch: BranchName("feature/story-1".into()), + commits_ahead: NonZeroU32::new(3).unwrap(), + }; + // Stage::Merge has exactly two fields: feature_branch and commits_ahead. + // There is no way to attach an agent name to it. The type system + // prevents bug 502 by construction. + assert!(matches!(merge, Stage::Merge { .. })); + } + + // ── TransitionError Display ───────────────────────────────────────── + + #[test] + fn transition_error_display() { + let err = TransitionError::InvalidTransition { + from_stage: "Backlog".into(), + event: "Accepted".into(), + }; + assert_eq!( + err.to_string(), + "invalid transition: Backlog + Accepted" + ); + } + + // ── ProjectionError Display ───────────────────────────────────────── + + #[test] + fn projection_error_display() { + let err = ProjectionError::UnknownStage("9_invalid".into()); + assert_eq!(err.to_string(), "unknown stage: \"9_invalid\""); + + let err = ProjectionError::MissingField("story_id"); + assert_eq!(err.to_string(), "missing required field: story_id"); + } +}