diff --git a/server/src/agent_mode.rs b/server/src/agent_mode.rs index 12d7a6af..fcde901b 100644 --- a/server/src/agent_mode.rs +++ b/server/src/agent_mode.rs @@ -144,7 +144,8 @@ pub async fn run( if let Some(mut crdt_rx) = crdt_state::subscribe() { tokio::spawn(async move { while let Ok(evt) = crdt_rx.recv().await { - if evt.to_stage == "6_archived" + if crate::pipeline_state::Stage::from_dir(&evt.to_stage) + .is_some_and(|s| matches!(s, crate::pipeline_state::Stage::Archived { .. })) && let Some(root) = crdt_prune_root.as_ref().cloned() { let story_id = evt.story_id.clone(); @@ -179,7 +180,8 @@ pub async fn run( let mut rx = auto_rx; while let Ok(event) = rx.recv().await { if let watcher::WatcherEvent::WorkItem { ref stage, .. } = event - && matches!(stage.as_str(), "2_current" | "3_qa" | "4_merge") + && crate::pipeline_state::Stage::from_dir(stage.as_str()) + .is_some_and(|s| s.is_active()) { slog!("[agent-mode] CRDT transition in {stage}/; triggering auto-assign."); auto_agents.auto_assign_available_work(&auto_root).await; @@ -316,7 +318,7 @@ async fn scan_and_claim( for item in &items { // Only claim stories in active stages. - if !matches!(item.stage.as_str(), "2_current" | "3_qa" | "4_merge") { + if !crate::pipeline_state::Stage::from_dir(&item.stage).is_some_and(|s| s.is_active()) { continue; } @@ -425,7 +427,7 @@ fn reclaim_timed_out_work(_project_root: &Path) { let now = chrono::Utc::now().timestamp() as f64; for item in &items { - if !matches!(item.stage.as_str(), "2_current" | "3_qa" | "4_merge") { + if !crate::pipeline_state::Stage::from_dir(&item.stage).is_some_and(|s| s.is_active()) { continue; } diff --git a/server/src/agents/pool/auto_assign/reconcile.rs b/server/src/agents/pool/auto_assign/reconcile.rs index 89c61e12..f78eb2d2 100644 --- a/server/src/agents/pool/auto_assign/reconcile.rs +++ b/server/src/agents/pool/auto_assign/reconcile.rs @@ -3,6 +3,7 @@ use std::path::Path; use tokio::sync::broadcast; +use crate::pipeline_state::Stage; use crate::worktree; use super::super::super::ReconciliationEvent; @@ -52,20 +53,20 @@ impl AgentPool { let wt_path = wt_entry.path.clone(); // Determine which active stage the story is in. - let stage_dir = match find_active_story_stage(project_root, story_id) { + let stage = match find_active_story_stage(project_root, story_id) { Some(s) => s, None => continue, // Not in any active stage (backlog/archived or unknown). }; // 4_merge/ is left for auto_assign to handle with a fresh mergemaster. - if stage_dir == "4_merge" { + if matches!(stage, Stage::Merge { .. }) { continue; } let _ = progress_tx.send(ReconciliationEvent { story_id: story_id.clone(), status: "checking".to_string(), - message: format!("Checking for committed work in {stage_dir}/"), + message: format!("Checking for committed work in {}/", stage.dir_name()), }); // Check whether the worktree has commits ahead of the base branch. @@ -78,7 +79,8 @@ impl AgentPool { if !has_work { eprintln!( - "[startup:reconcile] No committed work for '{story_id}' in {stage_dir}/; skipping." + "[startup:reconcile] No committed work for '{story_id}' in {}/; skipping.", + stage.dir_name() ); let _ = progress_tx.send(ReconciliationEvent { story_id: story_id.clone(), @@ -89,7 +91,8 @@ impl AgentPool { } eprintln!( - "[startup:reconcile] Found committed work for '{story_id}' in {stage_dir}/. Running acceptance gates." + "[startup:reconcile] Found committed work for '{story_id}' in {}/. Running acceptance gates.", + stage.dir_name() ); let _ = progress_tx.send(ReconciliationEvent { story_id: story_id.clone(), @@ -130,7 +133,8 @@ impl AgentPool { if !gates_passed { eprintln!( "[startup:reconcile] Gates failed for '{story_id}': {gate_output}\n\ - Leaving in {stage_dir}/ for auto-assign to restart the agent." + Leaving in {}/ for auto-assign to restart the agent.", + stage.dir_name() ); let _ = progress_tx.send(ReconciliationEvent { story_id: story_id.clone(), @@ -140,9 +144,12 @@ impl AgentPool { continue; } - eprintln!("[startup:reconcile] Gates passed for '{story_id}' (stage: {stage_dir}/)."); + eprintln!( + "[startup:reconcile] Gates passed for '{story_id}' (stage: {}/).", + stage.dir_name() + ); - if stage_dir == "2_current" { + if matches!(stage, Stage::Coding) { // Coder stage — determine qa mode to decide next step. let qa_mode = { let item_type = crate::agents::lifecycle::item_type_from_id(story_id); @@ -232,7 +239,7 @@ impl AgentPool { } } } - } else if stage_dir == "3_qa" { + } else if matches!(stage, Stage::Qa) { // QA stage → run coverage gate before advancing to merge. let wt_path_for_cov = wt_path.clone(); let coverage_result = tokio::task::spawn_blocking(move || { diff --git a/server/src/agents/pool/pipeline/advance/mod.rs b/server/src/agents/pool/pipeline/advance/mod.rs index d8e50a4d..b4c3d187 100644 --- a/server/src/agents/pool/pipeline/advance/mod.rs +++ b/server/src/agents/pool/pipeline/advance/mod.rs @@ -342,17 +342,21 @@ impl AgentPool { // has already reached done or archived (e.g. a previous mergemaster // succeeded), this advance is a zombie — skip it entirely to avoid // phantom notifications and redundant post-merge test runs. - if let Ok(Some(typed_item)) = crate::pipeline_state::read_typed(story_id) { + if let Ok(Some(typed_item)) = crate::pipeline_state::read_typed(story_id) + && matches!( + typed_item.stage, + crate::pipeline_state::Stage::Done { .. } + | crate::pipeline_state::Stage::Archived { .. } + ) + { let current_dir = typed_item.stage.dir_name(); - if current_dir == "5_done" || current_dir == "6_archived" { - slog!( - "[pipeline] Skipping stale mergemaster advance for '{story_id}': \ - story is already in work/{current_dir}/" - ); - // Skip pipeline advancement — do not run post-merge tests, - // do not emit notifications, do not restart agents. - return; - } + slog!( + "[pipeline] Skipping stale mergemaster advance for '{story_id}': \ + story is already in work/{current_dir}/" + ); + // Skip pipeline advancement — do not run post-merge tests, + // do not emit notifications, do not restart agents. + return; } // Block advancement if the mergemaster explicitly reported a failure. diff --git a/server/src/agents/pool/start/validation.rs b/server/src/agents/pool/start/validation.rs index d9dc5e08..eb800038 100644 --- a/server/src/agents/pool/start/validation.rs +++ b/server/src/agents/pool/start/validation.rs @@ -3,6 +3,7 @@ use std::path::Path; use crate::config::ProjectConfig; +use crate::pipeline_state::Stage; use super::super::super::{PipelineStage, agent_config_stage, pipeline_stage}; use super::super::worktree::find_active_story_stage; @@ -30,19 +31,20 @@ pub(super) fn validate_agent_stage( if agent_stage == PipelineStage::Other { return Ok(()); } - let Some(story_stage_dir) = find_active_story_stage(project_root, story_id) else { + let Some(story_stage) = find_active_story_stage(project_root, story_id) else { return Ok(()); }; - let expected_stage = match story_stage_dir { - "2_current" => PipelineStage::Coder, - "3_qa" => PipelineStage::Qa, - "4_merge" => PipelineStage::Mergemaster, + let expected_stage = match story_stage { + Stage::Coding => PipelineStage::Coder, + Stage::Qa => PipelineStage::Qa, + Stage::Merge { .. } => PipelineStage::Mergemaster, _ => PipelineStage::Other, }; if expected_stage != PipelineStage::Other && expected_stage != agent_stage { return Err(format!( "Agent '{name}' (stage: {agent_stage:?}) cannot be assigned to \ - story '{story_id}' in {story_stage_dir}/ (requires stage: {expected_stage:?})" + story '{story_id}' in {}/ (requires stage: {expected_stage:?})", + story_stage.dir_name() )); } Ok(()) diff --git a/server/src/agents/pool/worktree.rs b/server/src/agents/pool/worktree.rs index 9133689b..471468ae 100644 --- a/server/src/agents/pool/worktree.rs +++ b/server/src/agents/pool/worktree.rs @@ -21,16 +21,16 @@ impl AgentPool { } } -/// Return the active pipeline stage directory name for `story_id`, or `None` if the -/// story is not in any active stage (`2_current/`, `3_qa/`, `4_merge/`). +/// Return the active pipeline stage for `story_id`, or `None` if the story is not +/// in any active stage (`2_current/`, `3_qa/`, `4_merge/`). pub(super) fn find_active_story_stage( _project_root: &Path, story_id: &str, -) -> Option<&'static str> { +) -> Option { if let Ok(Some(item)) = crate::pipeline_state::read_typed(story_id) && item.stage.is_active() { - return Some(item.stage.dir_name()); + return Some(item.stage); } None } @@ -44,10 +44,10 @@ mod tests { crate::db::ensure_content_store(); crate::db::write_item_with_content("10_story_test", "2_current", "---\nname: Test\n---\n"); let tmp = tempfile::tempdir().unwrap(); - assert_eq!( + assert!(matches!( find_active_story_stage(tmp.path(), "10_story_test"), - Some("2_current") - ); + Some(crate::pipeline_state::Stage::Coding) + )); } #[test] @@ -55,10 +55,10 @@ mod tests { crate::db::ensure_content_store(); crate::db::write_item_with_content("11_story_test", "3_qa", "---\nname: Test\n---\n"); let tmp = tempfile::tempdir().unwrap(); - assert_eq!( + assert!(matches!( find_active_story_stage(tmp.path(), "11_story_test"), - Some("3_qa") - ); + Some(crate::pipeline_state::Stage::Qa) + )); } #[test] @@ -66,10 +66,10 @@ mod tests { crate::db::ensure_content_store(); crate::db::write_item_with_content("12_story_test", "4_merge", "---\nname: Test\n---\n"); let tmp = tempfile::tempdir().unwrap(); - assert_eq!( + assert!(matches!( find_active_story_stage(tmp.path(), "12_story_test"), - Some("4_merge") - ); + Some(crate::pipeline_state::Stage::Merge { .. }) + )); } #[test] diff --git a/server/src/chat/transport/matrix/delete.rs b/server/src/chat/transport/matrix/delete.rs index 8aa81286..af78db2f 100644 --- a/server/src/chat/transport/matrix/delete.rs +++ b/server/src/chat/transport/matrix/delete.rs @@ -119,14 +119,15 @@ pub async fn handle_delete( /// Human-readable label for a pipeline stage directory name. fn stage_display_name(stage: &str) -> &str { - match stage { - "1_backlog" => "backlog", - "2_current" => "in-progress", - "3_qa" => "QA", - "4_merge" => "merge", - "5_done" => "done", - "6_archived" => "archived", - other => other, + use crate::pipeline_state::Stage; + match Stage::from_dir(stage) { + Some(Stage::Backlog) => "backlog", + Some(Stage::Coding) => "in-progress", + Some(Stage::Qa) => "QA", + Some(Stage::Merge { .. }) => "merge", + Some(Stage::Done { .. }) => "done", + Some(Stage::Archived { .. }) => "archived", + None => stage, } } diff --git a/server/src/crdt_state/read.rs b/server/src/crdt_state/read.rs index 85ed0c8b..6f83cd57 100644 --- a/server/src/crdt_state/read.rs +++ b/server/src/crdt_state/read.rs @@ -335,15 +335,12 @@ pub(super) fn extract_item_view(item: &PipelineItemCrdt) -> Option bool { + use crate::pipeline_state::{Stage, read_all_typed}; let prefix = format!("{dep_number}_"); - if let Some(items) = read_all_items() { - items.iter().any(|item| { - item.story_id.starts_with(&prefix) - && matches!(item.stage.as_str(), "5_done" | "6_archived") - }) - } else { - false - } + read_all_typed().into_iter().any(|item| { + item.story_id.0.starts_with(&prefix) + && matches!(item.stage, Stage::Done { .. } | Stage::Archived { .. }) + }) } /// Check whether a dependency (by numeric ID prefix) is specifically in `6_archived` @@ -352,14 +349,11 @@ pub fn dep_is_done_crdt(dep_number: u32) -> bool { /// Used to detect when a dependency is satisfied via archive rather than via a clean /// completion through `5_done`. Returns `false` when the CRDT layer is not initialised. pub fn dep_is_archived_crdt(dep_number: u32) -> bool { + use crate::pipeline_state::{Stage, read_all_typed}; let prefix = format!("{dep_number}_"); - if let Some(items) = read_all_items() { - items - .iter() - .any(|item| item.story_id.starts_with(&prefix) && item.stage == "6_archived") - } else { - false - } + read_all_typed().into_iter().any(|item| { + item.story_id.0.starts_with(&prefix) && matches!(item.stage, Stage::Archived { .. }) + }) } /// Check unmet dependencies for a story by reading its `depends_on` from the diff --git a/server/src/db/mod.rs b/server/src/db/mod.rs index 5a17708c..cd46a08b 100644 --- a/server/src/db/mod.rs +++ b/server/src/db/mod.rs @@ -247,7 +247,9 @@ pub fn write_item_with_content(story_id: &str, stage: &str, content: &str) { write_content(story_id, content); // Primary: CRDT ops. - let merged_at_ts = if stage == "5_done" { + let merged_at_ts = if crate::pipeline_state::Stage::from_dir(stage) + .is_some_and(|s| matches!(s, crate::pipeline_state::Stage::Done { .. })) + { Some(chrono::Utc::now().timestamp() as f64) } else { None @@ -321,7 +323,9 @@ pub fn move_item_stage( .unwrap_or((None, None, None, None, None)); // CRDT stage transition. - let merged_at_ts = if new_stage == "5_done" { + let merged_at_ts = if crate::pipeline_state::Stage::from_dir(new_stage) + .is_some_and(|s| matches!(s, crate::pipeline_state::Stage::Done { .. })) + { Some(chrono::Utc::now().timestamp() as f64) } else { None diff --git a/server/src/http/mcp/merge_tools.rs b/server/src/http/mcp/merge_tools.rs index 4aefd182..c4f8aef1 100644 --- a/server/src/http/mcp/merge_tools.rs +++ b/server/src/http/mcp/merge_tools.rs @@ -18,7 +18,13 @@ pub(super) async fn tool_merge_agent_work( // Check CRDT stage before attempting merge — if already done or archived, // return success immediately to avoid spurious error notifications. if let Some(item) = crate::crdt_state::read_item(story_id) - && (item.stage == "5_done" || item.stage == "6_archived") + && crate::pipeline_state::Stage::from_dir(&item.stage).is_some_and(|s| { + matches!( + s, + crate::pipeline_state::Stage::Done { .. } + | crate::pipeline_state::Stage::Archived { .. } + ) + }) { return serde_json::to_string_pretty(&json!({ "story_id": story_id, diff --git a/server/src/io/watcher.rs b/server/src/io/watcher.rs index 4a4d025c..570199bb 100644 --- a/server/src/io/watcher.rs +++ b/server/src/io/watcher.rs @@ -106,16 +106,16 @@ pub fn is_config_file(path: &Path, git_root: &Path) -> bool { /// Used by the CRDT-to-watcher bridge (in `main.rs`) to derive the action and /// commit message for `WatcherEvent::WorkItem` events. pub fn stage_metadata(stage: &str, item_id: &str) -> Option<(&'static str, String)> { - let (action, prefix) = match stage { - "1_backlog" => ("create", format!("huskies: create {item_id}")), - "2_current" => ("start", format!("huskies: start {item_id}")), - "3_qa" => ("qa", format!("huskies: queue {item_id} for QA")), - "4_merge" => ("merge", format!("huskies: queue {item_id} for merge")), - "5_done" => ("done", format!("huskies: done {item_id}")), - "6_archived" => ("accept", format!("huskies: accept {item_id}")), - _ => return None, + use crate::pipeline_state::Stage; + let (action, msg) = match Stage::from_dir(stage)? { + Stage::Backlog => ("create", format!("huskies: create {item_id}")), + Stage::Coding => ("start", format!("huskies: start {item_id}")), + Stage::Qa => ("qa", format!("huskies: queue {item_id} for QA")), + Stage::Merge { .. } => ("merge", format!("huskies: queue {item_id} for merge")), + Stage::Done { .. } => ("done", format!("huskies: done {item_id}")), + Stage::Archived { .. } => ("accept", format!("huskies: accept {item_id}")), }; - Some((action, prefix)) + Some((action, msg)) } /// Return the pipeline stage name for a path if it is a `.md` file living diff --git a/server/src/main.rs b/server/src/main.rs index ed3ce2c7..e9c77fcf 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -352,7 +352,8 @@ async fn main() -> Result<(), std::io::Error> { tokio::spawn(async move { while let Ok(evt) = crdt_rx.recv().await { // Prune the worktree when a story is archived. - if evt.to_stage == "6_archived" + if crate::pipeline_state::Stage::from_dir(&evt.to_stage) + .is_some_and(|s| matches!(s, crate::pipeline_state::Stage::Archived { .. })) && let Some(root) = crdt_prune_root.as_ref().cloned() { let story_id = evt.story_id.clone(); @@ -389,7 +390,8 @@ async fn main() -> Result<(), std::io::Error> { let mut rx = watcher_auto_rx; while let Ok(event) = rx.recv().await { if let io::watcher::WatcherEvent::WorkItem { ref stage, .. } = event - && matches!(stage.as_str(), "2_current" | "3_qa" | "4_merge") + && crate::pipeline_state::Stage::from_dir(stage.as_str()) + .is_some_and(|s| s.is_active()) { slog!( "[auto-assign] CRDT transition detected in {stage}/; \ diff --git a/server/src/pipeline_state/mod.rs b/server/src/pipeline_state/mod.rs index b0f41b66..ebada0b4 100644 --- a/server/src/pipeline_state/mod.rs +++ b/server/src/pipeline_state/mod.rs @@ -12,17 +12,15 @@ //! event bus are fully defined and tested here. Consumers will be migrated to //! the typed API incrementally in follow-up stories. -// Foundation module — all items are exercised by tests but not yet called from -// non-test code. The dead_code lint is suppressed until consumer migration. -#![allow(unused_imports, dead_code)] +// Some items are exercised by tests or used only in non-active code paths; +// the dead_code lint is suppressed for the module. +#![allow(dead_code)] use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use std::fmt; use std::num::NonZeroU32; -use crate::crdt_state::PipelineItemView; - // ── Newtypes ──────────────────────────────────────────────────────────────── #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] @@ -134,6 +132,35 @@ impl Stage { } ) } + + /// Parse a stage from its filesystem directory name. + /// + /// This is the single canonical conversion boundary for turning a loose + /// stage-directory string (from CRDT fields or watcher events) into a + /// typed `Stage`. Rich variants (`Done`, `Archived`, `Merge`) are + /// synthesised with zero-value fields — callers should use this only for + /// stage *classification* (e.g. `is_active()`, `matches!`), not for + /// accessing the rich metadata fields. + pub fn from_dir(s: &str) -> Option { + match s { + "1_backlog" => Some(Stage::Backlog), + "2_current" => Some(Stage::Coding), + "3_qa" => Some(Stage::Qa), + "4_merge" => Some(Stage::Merge { + feature_branch: BranchName(String::new()), + commits_ahead: NonZeroU32::new(1).expect("1 is non-zero"), + }), + "5_done" => Some(Stage::Done { + merged_at: DateTime::::UNIX_EPOCH, + merge_commit: GitSha(String::new()), + }), + "6_archived" => Some(Stage::Archived { + archived_at: DateTime::::UNIX_EPOCH, + reason: ArchiveReason::Completed, + }), + _ => None, + } + } } // ── Per-node execution state ──────────────────────────────────────────────── @@ -464,8 +491,12 @@ mod events; mod projection; mod subscribers; +#[allow(unused_imports)] pub use events::{EventBus, TransitionFired, TransitionSubscriber}; -pub use projection::{ProjectionError, project_stage, read_all_typed, read_typed}; +#[allow(unused_imports)] +pub use projection::{ProjectionError, project_stage}; +pub use projection::{read_all_typed, read_typed}; +#[allow(unused_imports)] pub use subscribers::{ AutoAssignSubscriber, FileRendererSubscriber, MatrixBotSubscriber, PipelineItemsSubscriber, WebUiBroadcastSubscriber, diff --git a/server/src/pipeline_state/subscribers.rs b/server/src/pipeline_state/subscribers.rs index bef0ac46..9acd7eb8 100644 --- a/server/src/pipeline_state/subscribers.rs +++ b/server/src/pipeline_state/subscribers.rs @@ -2,6 +2,7 @@ use super::Stage; use super::events::{TransitionFired, TransitionSubscriber}; +#[allow(unused_imports)] use super::{event_label, stage_dir_name, stage_label}; // ── Subscriber stubs (real dispatch uses these as the interface) ───────────── diff --git a/server/src/service/notifications/format.rs b/server/src/service/notifications/format.rs index d87887dc..c75f6b1b 100644 --- a/server/src/service/notifications/format.rs +++ b/server/src/service/notifications/format.rs @@ -8,14 +8,15 @@ use crate::service::common::item_id::extract_item_number; /// Human-readable display name for a pipeline stage directory. pub fn stage_display_name(stage: &str) -> &'static str { - match stage { - "1_backlog" => "Backlog", - "2_current" => "Current", - "3_qa" => "QA", - "4_merge" => "Merge", - "5_done" => "Done", - "6_archived" => "Archived", - _ => "Unknown", + use crate::pipeline_state::Stage; + match Stage::from_dir(stage) { + Some(Stage::Backlog) => "Backlog", + Some(Stage::Coding) => "Current", + Some(Stage::Qa) => "QA", + Some(Stage::Merge { .. }) => "Merge", + Some(Stage::Done { .. }) => "Done", + Some(Stage::Archived { .. }) => "Archived", + None => "Unknown", } } diff --git a/server/src/service/story/front_matter.rs b/server/src/service/story/front_matter.rs index be0ee24c..04690dbd 100644 --- a/server/src/service/story/front_matter.rs +++ b/server/src/service/story/front_matter.rs @@ -8,23 +8,26 @@ /// /// Valid stage names match the `.huskies/work/N_name/` directory scheme. pub fn is_valid_stage(stage: &str) -> bool { - matches!( - stage, - "1_backlog" | "2_current" | "3_qa" | "4_merge" | "5_done" | "6_archived" - ) + crate::pipeline_state::Stage::from_dir(stage).is_some() } #[allow(dead_code)] /// Map a human-readable stage alias (e.g. `"backlog"`) to its directory name /// (e.g. `"1_backlog"`). Returns `None` for unrecognised aliases. pub fn stage_alias_to_dir(alias: &str) -> Option<&'static str> { + use crate::pipeline_state::Stage; + // Canonical directory names (e.g. "1_backlog") round-trip through the typed enum. + if let Some(stage) = Stage::from_dir(alias) { + return Some(stage.dir_name()); + } + // Short human-readable aliases (user-facing input normalization). match alias { - "backlog" | "1_backlog" => Some("1_backlog"), - "current" | "2_current" => Some("2_current"), - "qa" | "3_qa" => Some("3_qa"), - "merge" | "4_merge" => Some("4_merge"), - "done" | "5_done" => Some("5_done"), - "archived" | "6_archived" => Some("6_archived"), + "backlog" => Some("1_backlog"), + "current" => Some("2_current"), + "qa" => Some("3_qa"), + "merge" => Some("4_merge"), + "done" => Some("5_done"), + "archived" => Some("6_archived"), _ => None, } }