diff --git a/server/src/agent_mode/mod.rs b/server/src/agent_mode/mod.rs index 0b741f97..89af8d9c 100644 --- a/server/src/agent_mode/mod.rs +++ b/server/src/agent_mode/mod.rs @@ -95,8 +95,7 @@ pub async fn run( if let Some(mut crdt_rx) = crdt_state::subscribe() { tokio::spawn(async move { while let Ok(evt) = crdt_rx.recv().await { - if crate::pipeline_state::Stage::from_dir(&evt.to_stage) - .is_some_and(|s| matches!(s, crate::pipeline_state::Stage::Archived { .. })) + if matches!(evt.to_stage, crate::pipeline_state::Stage::Archived { .. }) && let Some(root) = crdt_prune_root.as_ref().cloned() { let story_id = evt.story_id.clone(); @@ -108,14 +107,13 @@ pub async fn run( }); } let (action, commit_msg) = - watcher::stage_metadata(&evt.to_stage, &evt.story_id) - .unwrap_or(("update", format!("huskies: update {}", evt.story_id))); + watcher::stage_metadata(&evt.to_stage, &evt.story_id); let watcher_evt = watcher::WatcherEvent::WorkItem { - stage: evt.to_stage, + stage: evt.to_stage.dir_name().to_string(), item_id: evt.story_id, action: action.to_string(), commit_msg, - from_stage: evt.from_stage, + from_stage: evt.from_stage.map(|s| s.dir_name().to_string()), }; let _ = crdt_watcher_tx.send(watcher_evt); } diff --git a/server/src/agents/pool/auto_assign/auto_assign.rs b/server/src/agents/pool/auto_assign/auto_assign.rs index eb919a40..8fb3d64a 100644 --- a/server/src/agents/pool/auto_assign/auto_assign.rs +++ b/server/src/agents/pool/auto_assign/auto_assign.rs @@ -16,7 +16,7 @@ impl AgentPool { /// 3. Trigger server-side merges (or auto-spawn mergemaster) for `4_merge/`. pub async fn auto_assign_available_work(&self, project_root: &Path) { // Promote any backlog stories whose dependencies are all done. - self.promote_ready_backlog_stories(project_root); + self.promote_ready_backlog_stories(); let config = match ProjectConfig::load(project_root) { Ok(c) => c, diff --git a/server/src/agents/pool/auto_assign/backlog.rs b/server/src/agents/pool/auto_assign/backlog.rs index 995a1616..3a217d2e 100644 --- a/server/src/agents/pool/auto_assign/backlog.rs +++ b/server/src/agents/pool/auto_assign/backlog.rs @@ -1,7 +1,6 @@ //! Backlog promotion: scan `1_backlog/` and promote stories whose `depends_on` are all met. -use std::path::Path; - +use crate::pipeline_state::Stage; use crate::slog; use crate::slog_warn; @@ -23,8 +22,8 @@ impl AgentPool { /// was abandoned/superseded before the dependent existed), a prominent warning is /// logged so the user can see the promotion was triggered by an archived dep, not /// a clean completion. - pub(super) fn promote_ready_backlog_stories(&self, project_root: &Path) { - let items = scan_stage_items(project_root, "1_backlog"); + pub(super) fn promote_ready_backlog_stories(&self) { + let items = scan_stage_items(&Stage::Backlog); for story_id in &items { // Only promote stories that explicitly declare dependencies // (story 929: read from the CRDT register, not YAML). @@ -35,11 +34,11 @@ impl AgentPool { continue; } // Check whether any dependencies are still unmet. - if has_unmet_dependencies(project_root, "1_backlog", story_id) { + if has_unmet_dependencies(story_id) { continue; } // Warn if any deps were satisfied via archive rather than via clean done. - let archived_deps = check_archived_dependencies(project_root, "1_backlog", story_id); + let archived_deps = check_archived_dependencies(story_id); if !archived_deps.is_empty() { slog_warn!( "[auto-assign] Story '{story_id}' is being promoted because deps \ diff --git a/server/src/agents/pool/auto_assign/merge.rs b/server/src/agents/pool/auto_assign/merge.rs index 8207331f..9a5d993b 100644 --- a/server/src/agents/pool/auto_assign/merge.rs +++ b/server/src/agents/pool/auto_assign/merge.rs @@ -1,8 +1,10 @@ //! Merge stage dispatch: trigger server-side merges and auto-spawn mergemaster for content conflicts. +use std::num::NonZeroU32; use std::path::Path; use crate::config::ProjectConfig; +use crate::pipeline_state::{BranchName, Stage}; use crate::slog; use crate::slog_error; use crate::slog_warn; @@ -34,22 +36,26 @@ impl AgentPool { // written to the CRDT and a notification is emitted; the story stays in // 4_merge/ until a human intervenes or an explicit `start_agent mergemaster` // call invokes the LLM-driven recovery path. - let merge_items = scan_stage_items(project_root, "4_merge"); + let merge_stage = Stage::Merge { + feature_branch: BranchName(String::new()), + commits_ahead: NonZeroU32::new(1).expect("1 is non-zero"), + }; + let merge_items = scan_stage_items(&merge_stage); for story_id in &merge_items { - if has_review_hold(project_root, "4_merge", story_id) { + if has_review_hold(story_id) { continue; } - if is_story_frozen(project_root, "4_merge", story_id) { + if is_story_frozen(story_id) { slog!("[auto-assign] Story '{story_id}' in 4_merge/ is frozen; skipping."); continue; } - if is_story_blocked(project_root, "4_merge", story_id) { + if is_story_blocked(story_id) { continue; } - if has_unmet_dependencies(project_root, "4_merge", story_id) { + if has_unmet_dependencies(story_id) { slog!("[auto-assign] Story '{story_id}' in 4_merge/ has unmet deps; skipping."); continue; } @@ -113,11 +119,14 @@ impl AgentPool { // Stories transition to 4_merge_failure when the server-side merge fails. // Content conflicts get one automatic mergemaster attempt; other failures // require human intervention. - let merge_failure_items = scan_stage_items(project_root, "4_merge_failure"); + let merge_failure_stage = Stage::MergeFailure { + reason: String::new(), + feature_branch: BranchName(String::new()), + commits_ahead: NonZeroU32::new(1).expect("1 is non-zero"), + }; + let merge_failure_items = scan_stage_items(&merge_failure_stage); for story_id in &merge_failure_items { - if has_content_conflict_failure(project_root, "4_merge_failure", story_id) - && !has_mergemaster_attempted(project_root, "4_merge_failure", story_id) - { + if has_content_conflict_failure(story_id) && !has_mergemaster_attempted(story_id) { let mergemaster_agent = { let agents = match self.agents.lock() { Ok(a) => a, diff --git a/server/src/agents/pool/auto_assign/pipeline.rs b/server/src/agents/pool/auto_assign/pipeline.rs index 5ae4d2cb..0aa534e5 100644 --- a/server/src/agents/pool/auto_assign/pipeline.rs +++ b/server/src/agents/pool/auto_assign/pipeline.rs @@ -3,6 +3,7 @@ use std::path::Path; use crate::config::ProjectConfig; +use crate::pipeline_state::Stage; use crate::slog; use crate::slog_error; @@ -25,13 +26,14 @@ impl AgentPool { /// guards. Agent front-matter preferences and stage-mismatch fallback are handled /// here as well. pub(super) async fn assign_pipeline_stages(&self, project_root: &Path, config: &ProjectConfig) { - let stages: [(&str, PipelineStage); 2] = [ - ("2_current", PipelineStage::Coder), - ("3_qa", PipelineStage::Qa), + let stages: [(Stage, PipelineStage); 2] = [ + (Stage::Coding, PipelineStage::Coder), + (Stage::Qa, PipelineStage::Qa), ]; - for (stage_dir, stage) in &stages { - let items = scan_stage_items(project_root, stage_dir); + for (pipeline_stage, stage) in &stages { + let stage_dir = pipeline_stage.dir_name(); + let items = scan_stage_items(pipeline_stage); if items.is_empty() { continue; } @@ -39,23 +41,23 @@ impl AgentPool { for story_id in &items { // Items marked with review_hold (e.g. spikes after QA passes) stay // in their current stage for human review — don't auto-assign agents. - if has_review_hold(project_root, stage_dir, story_id) { + if has_review_hold(story_id) { continue; } // Skip frozen stories — pipeline advancement is suspended. - if is_story_frozen(project_root, stage_dir, story_id) { + if is_story_frozen(story_id) { slog!("[auto-assign] Story '{story_id}' is frozen; skipping until unfrozen."); continue; } // Skip blocked stories (retry limit exceeded). - if is_story_blocked(project_root, stage_dir, story_id) { + if is_story_blocked(story_id) { continue; } // Skip stories whose dependencies haven't landed yet. - if has_unmet_dependencies(project_root, stage_dir, story_id) { + if has_unmet_dependencies(story_id) { slog!( "[auto-assign] Story '{story_id}' has unmet dependencies; skipping until deps are done." ); @@ -64,8 +66,7 @@ impl AgentPool { // Re-acquire the lock on each iteration to see state changes // from previous start_agent calls in the same pass. - let preferred_agent = - read_story_front_matter_agent(project_root, stage_dir, story_id); + let preferred_agent = read_story_front_matter_agent(story_id); // Check max_coders limit for the Coder stage before agent selection. // If the pool is full, all remaining items in this stage wait. diff --git a/server/src/agents/pool/auto_assign/scan.rs b/server/src/agents/pool/auto_assign/scan.rs index 33762000..e81e0651 100644 --- a/server/src/agents/pool/auto_assign/scan.rs +++ b/server/src/agents/pool/auto_assign/scan.rs @@ -2,7 +2,6 @@ use crate::config::ProjectConfig; use std::collections::HashMap; -use std::path::Path; use super::super::super::{AgentStatus, PipelineStage, agent_config_stage, pipeline_stage}; use super::super::StoryAgent; @@ -18,32 +17,14 @@ pub(in crate::agents::pool) fn is_agent_free( }) } -pub(super) fn scan_stage_items(_project_root: &Path, stage_dir: &str) -> Vec { +/// Return all story_ids in the given pipeline `Stage`, sourced from the CRDT. +pub(super) fn scan_stage_items(want: &crate::pipeline_state::Stage) -> Vec { use std::collections::BTreeSet; let mut items = BTreeSet::new(); - // Accept legacy directory-style strings (`"2_current"`, `"4_merge"`, - // etc.) at the boundary; `Stage::from_dir` itself is strict post-934 - // stage 6, so we normalise here. - let normalised = match stage_dir { - "0_upcoming" => "upcoming", - "1_backlog" => "backlog", - "2_current" => "coding", - "2_blocked" => "blocked", - "3_qa" => "qa", - "4_merge" => "merge", - "4_merge_failure" => "merge_failure", - "5_done" => "done", - "6_archived" => "archived", - other => other, - }; - let Some(want) = crate::pipeline_state::Stage::from_dir(normalised) else { - return Vec::new(); - }; - // CRDT is the only source of truth — no filesystem fallback. for item in crate::pipeline_state::read_all_typed() { - if std::mem::discriminant(&item.stage) == std::mem::discriminant(&want) { + if std::mem::discriminant(&item.stage) == std::mem::discriminant(want) { items.insert(item.story_id.0.clone()); } } @@ -181,6 +162,7 @@ mod tests { // attempt to promote an archived story. #[test] fn scan_stage_items_skips_filesystem_item_known_to_crdt_at_different_stage() { + use crate::pipeline_state::Stage; crate::db::ensure_content_store(); // Write the story into the CRDT as 6_archived. crate::db::write_item_with_content( @@ -190,34 +172,16 @@ mod tests { crate::db::ItemMeta::named("Archived"), ); - // Also place a stale .md file in a temp 1_backlog/ dir. - let tmp = tempfile::tempdir().unwrap(); - let backlog = tmp.path().join(".huskies/work/1_backlog"); - std::fs::create_dir_all(&backlog).unwrap(); - std::fs::write( - backlog.join("9970_story_archived.md"), - "---\nname: Archived\n---\n", - ) - .unwrap(); - - let items = scan_stage_items(tmp.path(), "1_backlog"); + let items = scan_stage_items(&Stage::Backlog); assert!( !items.contains(&"9970_story_archived".to_string()), - "archived CRDT story must not appear in 1_backlog scan via stale filesystem shadow" + "archived CRDT story must not appear in backlog scan" ); } - #[test] - fn scan_stage_items_returns_empty_for_missing_dir() { - // Use a unique stage name that no other test writes to, so - // the global CRDT store won't contribute stale items. - let tmp = tempfile::tempdir().unwrap(); - let items = scan_stage_items(tmp.path(), "9_nonexistent"); - assert!(items.is_empty()); - } - #[test] fn scan_stage_items_returns_sorted_story_ids() { + use crate::pipeline_state::Stage; // Write items via the CRDT store (the primary source of truth). crate::db::ensure_content_store(); crate::db::write_item_with_content( @@ -239,8 +203,7 @@ mod tests { crate::db::ItemMeta::named("baz"), ); - let tmp = tempfile::tempdir().unwrap(); - let items = scan_stage_items(tmp.path(), "2_current"); + let items = scan_stage_items(&Stage::Coding); // The global CRDT may contain items from other tests, so check // that our three items are present and appear in sorted order. assert!( diff --git a/server/src/agents/pool/auto_assign/story_checks.rs b/server/src/agents/pool/auto_assign/story_checks.rs index f34eed3d..baa41f5f 100644 --- a/server/src/agents/pool/auto_assign/story_checks.rs +++ b/server/src/agents/pool/auto_assign/story_checks.rs @@ -1,18 +1,12 @@ //! Front-matter checks for story files: review holds, blocked state, and merge failures. -use std::path::Path; - /// Read the optional `agent:` pin for a story. /// /// After story 871 the agent assignment lives in the CRDT typed register /// (`PipelineItemView.agent`), not the YAML front matter. We check the CRDT /// first; falling back to legacy YAML parsing keeps behaviour intact for any /// stories whose CRDT entry doesn't yet have the field set. -pub(super) fn read_story_front_matter_agent( - _project_root: &Path, - _stage_dir: &str, - story_id: &str, -) -> Option { +pub(super) fn read_story_front_matter_agent(story_id: &str) -> Option { // Story 929: agent name comes from the CRDT register. The previous // YAML fallback is gone — post-891 every story has its CRDT entry, // and any story without one is treated as having no pinned agent. @@ -26,7 +20,7 @@ pub(super) fn read_story_front_matter_agent( /// The auto-assigner uses this to keep human-QA items / spikes parked after /// gates pass until a reviewer explicitly clears the hold (e.g. via /// `tool_approve_qa`). -pub(super) fn has_review_hold(_project_root: &Path, _stage_dir: &str, story_id: &str) -> bool { +pub(super) fn has_review_hold(story_id: &str) -> bool { crate::crdt_state::read_item(story_id) .map(|w| w.stage().is_review_hold()) .unwrap_or(false) @@ -37,7 +31,7 @@ pub(super) fn has_review_hold(_project_root: &Path, _stage_dir: &str, story_id: /// /// The typed pipeline stage register is the only source consulted — the legacy /// `blocked: true` YAML front-matter field is no longer checked. -pub(super) fn is_story_blocked(_project_root: &Path, _stage_dir: &str, story_id: &str) -> bool { +pub(super) fn is_story_blocked(story_id: &str) -> bool { crate::pipeline_state::read_typed(story_id) .ok() .flatten() @@ -52,11 +46,7 @@ pub(super) fn is_story_blocked(_project_root: &Path, _stage_dir: &str, story_id: /// The typed stage register is consulted first; the CRDT content store is then /// scanned for conflict markers (the projection layer does not carry the reason /// string). No YAML front-matter parsing is performed. -pub(super) fn has_content_conflict_failure( - _project_root: &Path, - _stage_dir: &str, - story_id: &str, -) -> bool { +pub(super) fn has_content_conflict_failure(story_id: &str) -> bool { let is_merge_failure = crate::pipeline_state::read_typed(story_id) .ok() .flatten() @@ -86,11 +76,7 @@ pub(super) fn has_content_conflict_failure( /// the legacy `mergemaster_attempted: bool` CRDT register has been deleted. /// Used to prevent the auto-assigner from repeatedly spawning mergemaster for /// the same story after a failed mergemaster session. -pub(super) fn has_mergemaster_attempted( - _project_root: &Path, - _stage_dir: &str, - story_id: &str, -) -> bool { +pub(super) fn has_mergemaster_attempted(story_id: &str) -> bool { crate::crdt_state::read_item(story_id) .map(|view| view.stage().is_mergemaster_attempted()) .unwrap_or(false) @@ -98,22 +84,14 @@ pub(super) fn has_mergemaster_attempted( /// Return `true` if the story has any `depends_on` entries that are not yet in /// `5_done` or `6_archived`. Reads dependency state from the CRDT (story 929). -pub(super) fn has_unmet_dependencies( - _project_root: &Path, - _stage_dir: &str, - story_id: &str, -) -> bool { +pub(super) fn has_unmet_dependencies(story_id: &str) -> bool { !crate::crdt_state::check_unmet_deps_crdt(story_id).is_empty() } /// Return the list of dependency story numbers that are in `6_archived` (satisfied /// via archive rather than via a clean `5_done` completion). Reads from the CRDT /// (story 929). -pub(super) fn check_archived_dependencies( - _project_root: &Path, - _stage_dir: &str, - story_id: &str, -) -> Vec { +pub(super) fn check_archived_dependencies(story_id: &str) -> Vec { crate::crdt_state::check_archived_deps_crdt(story_id) } @@ -123,7 +101,7 @@ pub(super) fn check_archived_dependencies( /// the legacy `frozen: bool` CRDT register has been deleted. Frozen stories /// are skipped by the auto-assigner until `Unfreeze` returns them to /// `resume_to`. -pub(super) fn is_story_frozen(_project_root: &Path, _stage_dir: &str, story_id: &str) -> bool { +pub(super) fn is_story_frozen(story_id: &str) -> bool { crate::crdt_state::read_item(story_id) .map(|view| view.stage().is_frozen()) .unwrap_or(false) @@ -141,7 +119,6 @@ mod tests { fn has_review_hold_returns_true_when_flag_set() { crate::crdt_state::init_for_test(); crate::db::ensure_content_store(); - let tmp = tempfile::tempdir().unwrap(); // Story 945: review_hold is now a typed Stage variant, seeded via // the wire-form stage register directly. crate::crdt_state::write_item_str( @@ -155,14 +132,13 @@ mod tests { None, None, ); - assert!(has_review_hold(tmp.path(), "3_qa", "890_spike_held")); + assert!(has_review_hold("890_spike_held")); } #[test] fn has_review_hold_returns_false_when_flag_unset() { crate::crdt_state::init_for_test(); crate::db::ensure_content_store(); - let tmp = tempfile::tempdir().unwrap(); crate::crdt_state::write_item_str( "890_spike_active_qa", "3_qa", @@ -174,13 +150,12 @@ mod tests { None, None, ); - assert!(!has_review_hold(tmp.path(), "3_qa", "890_spike_active_qa")); + assert!(!has_review_hold("890_spike_active_qa")); } #[test] fn has_review_hold_returns_false_when_story_unknown() { - let tmp = tempfile::tempdir().unwrap(); - assert!(!has_review_hold(tmp.path(), "3_qa", "99_spike_missing")); + assert!(!has_review_hold("99_spike_missing")); } // ── is_story_blocked — regression: typed stage is sole authority ────────── @@ -189,25 +164,19 @@ mod tests { fn is_story_blocked_set_via_typed_stage_returns_true() { crate::crdt_state::init_for_test(); crate::db::ensure_content_store(); - let tmp = tempfile::tempdir().unwrap(); crate::db::write_item_with_content( "890_story_blocked_set", "2_blocked", "---\nname: Blocked Story\n---\n", crate::db::ItemMeta::named("Blocked Story"), ); - assert!(is_story_blocked( - tmp.path(), - "2_blocked", - "890_story_blocked_set" - )); + assert!(is_story_blocked("890_story_blocked_set")); } #[test] fn is_story_blocked_cleared_via_typed_stage_returns_false() { crate::crdt_state::init_for_test(); crate::db::ensure_content_store(); - let tmp = tempfile::tempdir().unwrap(); // First set to blocked. crate::db::write_item_with_content( "890_story_blocked_clear", @@ -222,18 +191,13 @@ mod tests { "---\nname: Clearable Story\n---\n", crate::db::ItemMeta::named("Clearable Story"), ); - assert!(!is_story_blocked( - tmp.path(), - "2_current", - "890_story_blocked_clear" - )); + assert!(!is_story_blocked("890_story_blocked_clear")); } #[test] fn is_story_blocked_stale_yaml_is_ignored() { crate::crdt_state::init_for_test(); crate::db::ensure_content_store(); - let tmp = tempfile::tempdir().unwrap(); // YAML front matter says `blocked: true`, but the typed CRDT stage is backlog. // After removing the YAML fallback, the function must return false. crate::db::write_item_with_content( @@ -243,7 +207,7 @@ mod tests { crate::db::ItemMeta::named("Stale"), ); assert!( - !is_story_blocked(tmp.path(), "1_backlog", "890_story_stale_yaml"), + !is_story_blocked("890_story_stale_yaml"), "stale YAML `blocked: true` must not be reported as blocked when typed stage is Backlog" ); } @@ -253,7 +217,6 @@ mod tests { #[test] fn has_unmet_dependencies_returns_true_when_dep_not_done() { crate::crdt_state::init_for_test(); - let tmp = tempfile::tempdir().unwrap(); crate::crdt_state::write_item_str( "10_story_blocked", "2_current", @@ -265,17 +228,12 @@ mod tests { None, None, ); - assert!(has_unmet_dependencies( - tmp.path(), - "2_current", - "10_story_blocked" - )); + assert!(has_unmet_dependencies("10_story_blocked")); } #[test] fn has_unmet_dependencies_returns_false_when_dep_done() { crate::crdt_state::init_for_test(); - let tmp = tempfile::tempdir().unwrap(); crate::crdt_state::write_item_str( "999_story_dep", "5_done", @@ -298,17 +256,12 @@ mod tests { None, None, ); - assert!(!has_unmet_dependencies( - tmp.path(), - "2_current", - "10_story_ok" - )); + assert!(!has_unmet_dependencies("10_story_ok")); } #[test] fn has_unmet_dependencies_returns_false_when_no_deps() { crate::crdt_state::init_for_test(); - let tmp = tempfile::tempdir().unwrap(); crate::crdt_state::write_item_str( "5_story_free", "2_current", @@ -320,11 +273,7 @@ mod tests { None, None, ); - assert!(!has_unmet_dependencies( - tmp.path(), - "2_current", - "5_story_free" - )); + assert!(!has_unmet_dependencies("5_story_free")); } // ── Bug 503: archived-dep visibility ───────────────────────────────────── @@ -333,7 +282,6 @@ mod tests { #[test] fn check_archived_dependencies_returns_archived_ids() { crate::crdt_state::init_for_test(); - let tmp = tempfile::tempdir().unwrap(); crate::crdt_state::write_item_str( "500_spike_crdt", "6_archived", @@ -356,8 +304,7 @@ mod tests { None, None, ); - let archived_deps = - check_archived_dependencies(tmp.path(), "1_backlog", "503_story_dependent"); + let archived_deps = check_archived_dependencies("503_story_dependent"); assert_eq!(archived_deps, vec![500]); } @@ -365,7 +312,6 @@ mod tests { #[test] fn check_archived_dependencies_empty_when_dep_in_done() { crate::crdt_state::init_for_test(); - let tmp = tempfile::tempdir().unwrap(); crate::crdt_state::write_item_str( "490_story_done", "5_done", @@ -388,8 +334,7 @@ mod tests { None, None, ); - let archived_deps = - check_archived_dependencies(tmp.path(), "1_backlog", "503_story_waiting"); + let archived_deps = check_archived_dependencies("503_story_waiting"); assert!(archived_deps.is_empty()); } } diff --git a/server/src/crdt_state/ops.rs b/server/src/crdt_state/ops.rs index 5bdefd04..277d725b 100644 --- a/server/src/crdt_state/ops.rs +++ b/server/src/crdt_state/ops.rs @@ -163,21 +163,26 @@ pub fn apply_remote_op(op: SignedOp) -> bool { // Detect and broadcast stage transitions. for (sid, &idx) in &state.index { - let new_stage = match state.crdt.doc.items[idx].stage.view() { + let new_stage_str = match state.crdt.doc.items[idx].stage.view() { JsonValue::String(s) => s, _ => continue, }; - let old_stage = pre_stages.get(sid).cloned(); - let changed = old_stage.as_deref() != Some(&new_stage); + let old_stage_str = pre_stages.get(sid).cloned(); + let changed = old_stage_str.as_deref() != Some(&new_stage_str); if changed { + // Storage seam: convert the raw CRDT stage strings to typed Stage values here. + let Some(to_stage) = crate::pipeline_state::Stage::from_dir(&new_stage_str) else { + continue; + }; + let from_stage = old_stage_str.and_then(|s| crate::pipeline_state::Stage::from_dir(&s)); let name = match state.crdt.doc.items[idx].name.view() { JsonValue::String(s) if !s.is_empty() => Some(s), _ => None, }; emit_event(CrdtEvent { story_id: sid.clone(), - from_stage: old_stage, - to_stage: new_stage, + from_stage, + to_stage, name, }); } diff --git a/server/src/crdt_state/state/tests.rs b/server/src/crdt_state/state/tests.rs index 96b27889..cdef6a4a 100644 --- a/server/src/crdt_state/state/tests.rs +++ b/server/src/crdt_state/state/tests.rs @@ -124,7 +124,10 @@ async fn subscribe_receives_stage_transition_events() { let evt: CrdtEvent = rx.try_recv().expect("expected CrdtEvent on insert"); assert_eq!(evt.story_id, "906_story_subscribe"); assert!(evt.from_stage.is_none()); - assert_eq!(evt.to_stage, "backlog"); + assert!(matches!( + evt.to_stage, + crate::pipeline_state::Stage::Backlog + )); // Update stage — emit_event fires again with the real from_stage. write_item_str( @@ -141,8 +144,11 @@ async fn subscribe_receives_stage_transition_events() { let evt: CrdtEvent = rx.try_recv().expect("expected CrdtEvent on stage change"); assert_eq!(evt.story_id, "906_story_subscribe"); - assert_eq!(evt.from_stage.as_deref(), Some("backlog")); - assert_eq!(evt.to_stage, "coding"); + assert!(matches!( + evt.from_stage, + Some(crate::pipeline_state::Stage::Backlog) + )); + assert!(matches!(evt.to_stage, crate::pipeline_state::Stage::Coding)); } #[tokio::test] diff --git a/server/src/crdt_state/types.rs b/server/src/crdt_state/types.rs index 2a2b5c16..de768536 100644 --- a/server/src/crdt_state/types.rs +++ b/server/src/crdt_state/types.rs @@ -12,9 +12,9 @@ pub struct CrdtEvent { /// Work item ID (e.g. `"42_story_my_feature"`). pub story_id: String, /// The stage the item was in before this transition, or `None` for new items. - pub from_stage: Option, + pub from_stage: Option, /// The stage the item is now in. - pub to_stage: String, + pub to_stage: crate::pipeline_state::Stage, /// Human-readable story name from the CRDT document. pub name: Option, } @@ -536,13 +536,16 @@ mod tests { fn crdt_event_has_expected_fields() { let evt = CrdtEvent { story_id: "42_story_foo".to_string(), - from_stage: Some("1_backlog".to_string()), - to_stage: "2_current".to_string(), + from_stage: Some(crate::pipeline_state::Stage::Backlog), + to_stage: crate::pipeline_state::Stage::Coding, name: Some("Foo Feature".to_string()), }; assert_eq!(evt.story_id, "42_story_foo"); - assert_eq!(evt.from_stage.as_deref(), Some("1_backlog")); - assert_eq!(evt.to_stage, "2_current"); + assert!(matches!( + evt.from_stage, + Some(crate::pipeline_state::Stage::Backlog) + )); + assert!(matches!(evt.to_stage, crate::pipeline_state::Stage::Coding)); assert_eq!(evt.name.as_deref(), Some("Foo Feature")); } @@ -551,7 +554,7 @@ mod tests { let evt = CrdtEvent { story_id: "10_story_bar".to_string(), from_stage: None, - to_stage: "1_backlog".to_string(), + to_stage: crate::pipeline_state::Stage::Backlog, name: None, }; let cloned = evt.clone(); @@ -569,7 +572,7 @@ mod tests { emit_event(CrdtEvent { story_id: "99_story_noop".to_string(), from_stage: None, - to_stage: "1_backlog".to_string(), + to_stage: crate::pipeline_state::Stage::Backlog, name: None, }); } @@ -686,19 +689,20 @@ mod tests { #[test] fn crdt_event_broadcast_channel_round_trip() { + use crate::pipeline_state::Stage; let (tx, mut rx) = broadcast::channel::(16); let evt = CrdtEvent { story_id: "70_story_broadcast".to_string(), - from_stage: Some("1_backlog".to_string()), - to_stage: "2_current".to_string(), + from_stage: Some(Stage::Backlog), + to_stage: Stage::Coding, name: Some("Broadcast Test".to_string()), }; tx.send(evt).unwrap(); let received = rx.try_recv().unwrap(); assert_eq!(received.story_id, "70_story_broadcast"); - assert_eq!(received.from_stage.as_deref(), Some("1_backlog")); - assert_eq!(received.to_stage, "2_current"); + assert!(matches!(received.from_stage, Some(Stage::Backlog))); + assert!(matches!(received.to_stage, Stage::Coding)); assert_eq!(received.name.as_deref(), Some("Broadcast Test")); } } diff --git a/server/src/crdt_state/write/item.rs b/server/src/crdt_state/write/item.rs index b7ca8235..52d7e641 100644 --- a/server/src/crdt_state/write/item.rs +++ b/server/src/crdt_state/write/item.rs @@ -276,10 +276,12 @@ pub fn write_item( JsonValue::String(s) if !s.is_empty() => Some(s), _ => None, }; + // Storage seam: convert the old raw CRDT stage string to a typed Stage. + let from_stage = old_stage.and_then(|s| Stage::from_dir(&s)); emit_event(CrdtEvent { story_id: story_id.to_string(), - from_stage: old_stage, - to_stage: stage_str.to_string(), + from_stage, + to_stage: stage.clone(), name: current_name, }); } @@ -333,7 +335,7 @@ pub fn write_item( emit_event(CrdtEvent { story_id: story_id.to_string(), from_stage: None, - to_stage: stage_str.to_string(), + to_stage: stage.clone(), name: name.map(String::from), }); } diff --git a/server/src/io/watcher/mod.rs b/server/src/io/watcher/mod.rs index 0d11a47c..b2dc5ee9 100644 --- a/server/src/io/watcher/mod.rs +++ b/server/src/io/watcher/mod.rs @@ -42,13 +42,16 @@ pub fn is_config_file(path: &Path, git_root: &Path) -> bool { path == huskies.join("project.toml") || path == huskies.join("agents.toml") } -/// Map a pipeline directory name to a (action, commit-message-prefix) pair. +/// Map a typed pipeline stage to a (action, commit-message-prefix) pair. /// -/// Used by the CRDT-to-watcher bridge (in `main.rs`) to derive the action and -/// commit message for `WatcherEvent::WorkItem` events. -pub fn stage_metadata(stage: &str, item_id: &str) -> Option<(&'static str, String)> { +/// Used by the CRDT-to-watcher bridge to derive the action and commit message +/// for `WatcherEvent::WorkItem` events. +pub fn stage_metadata( + stage: &crate::pipeline_state::Stage, + item_id: &str, +) -> (&'static str, String) { use crate::pipeline_state::Stage; - let (action, msg) = match Stage::from_dir(stage)? { + match stage { Stage::Upcoming => ("create", format!("huskies: triage {item_id}")), Stage::Backlog => ("create", format!("huskies: create {item_id}")), Stage::Coding => ("start", format!("huskies: start {item_id}")), @@ -66,8 +69,7 @@ pub fn stage_metadata(stage: &str, item_id: &str) -> Option<(&'static str, Strin Stage::ReviewHold { .. } => ("review_hold", format!("huskies: review_hold {item_id}")), Stage::Done { .. } => ("done", format!("huskies: done {item_id}")), Stage::Archived { .. } => ("accept", format!("huskies: accept {item_id}")), - }; - Some((action, msg)) + } } /// Start the filesystem watcher on a dedicated OS thread. diff --git a/server/src/io/watcher/tests.rs b/server/src/io/watcher/tests.rs index a6f77a3e..e6cc4363 100644 --- a/server/src/io/watcher/tests.rs +++ b/server/src/io/watcher/tests.rs @@ -51,19 +51,32 @@ fn is_config_file_rejects_wrong_root() { #[test] fn stage_metadata_returns_correct_actions() { - let (action, msg) = stage_metadata("coding", "42_story_foo").unwrap(); + use crate::pipeline_state::{GitSha, Stage}; + use chrono::Utc; + + let (action, msg) = stage_metadata(&Stage::Coding, "42_story_foo"); assert_eq!(action, "start"); assert_eq!(msg, "huskies: start 42_story_foo"); - let (action, msg) = stage_metadata("done", "42_story_foo").unwrap(); + let (action, msg) = stage_metadata( + &Stage::Done { + merged_at: Utc::now(), + merge_commit: GitSha(String::new()), + }, + "42_story_foo", + ); assert_eq!(action, "done"); assert_eq!(msg, "huskies: done 42_story_foo"); - let (action, msg) = stage_metadata("archived", "42_story_foo").unwrap(); + let (action, msg) = stage_metadata( + &Stage::Archived { + archived_at: Utc::now(), + reason: crate::pipeline_state::ArchiveReason::Completed, + }, + "42_story_foo", + ); assert_eq!(action, "accept"); assert_eq!(msg, "huskies: accept 42_story_foo"); - - assert!(stage_metadata("unknown", "id").is_none()); } // ── sweep_done_to_archived (CRDT-based) ───────────────────────────────── diff --git a/server/src/startup/tick_loop.rs b/server/src/startup/tick_loop.rs index 6adaaa2f..7f0d5158 100644 --- a/server/src/startup/tick_loop.rs +++ b/server/src/startup/tick_loop.rs @@ -28,8 +28,7 @@ pub(crate) fn spawn_event_bridges( if let Some(mut crdt_rx) = crate::crdt_state::subscribe() { tokio::spawn(async move { while let Ok(evt) = crdt_rx.recv().await { - if crate::pipeline_state::Stage::from_dir(&evt.to_stage) - .is_some_and(|s| matches!(s, crate::pipeline_state::Stage::Archived { .. })) + if matches!(evt.to_stage, crate::pipeline_state::Stage::Archived { .. }) && let Some(root) = crdt_prune_root.as_ref().cloned() { let story_id = evt.story_id.clone(); @@ -42,14 +41,13 @@ pub(crate) fn spawn_event_bridges( }); } let (action, commit_msg) = - io::watcher::stage_metadata(&evt.to_stage, &evt.story_id) - .unwrap_or(("update", format!("huskies: update {}", evt.story_id))); + io::watcher::stage_metadata(&evt.to_stage, &evt.story_id); let watcher_evt = io::watcher::WatcherEvent::WorkItem { - stage: evt.to_stage, + stage: evt.to_stage.dir_name().to_string(), item_id: evt.story_id, action: action.to_string(), commit_msg, - from_stage: evt.from_stage, + from_stage: evt.from_stage.map(|s| s.dir_name().to_string()), }; let _ = crdt_watcher_tx.send(watcher_evt); }