From d78dd9e8f94a5d9bfda9206aed0293ca1f0ca69d Mon Sep 17 00:00:00 2001 From: Timmy Date: Tue, 12 May 2026 22:31:59 +0100 Subject: [PATCH] feat(934): typed Stage enum replaces directory-string state model The state machine's `Stage` enum becomes the source of truth for pipeline state. Six stages of work land together: 1. Clean wire vocabulary (`coding`, `merge`, `merge_failure`, ...) replaces legacy directory-style strings (`2_current`, `4_merge`, ...) on the wire. `Stage::from_dir` accepted both during deployment; new writes always emit the clean form via `stage_dir_name`. Lexicographic `dir >= "5_done"` checks in lifecycle.rs become typed `matches!` checks since the new vocabulary doesn't sort in pipeline order. 2. `crdt_state::write_item` takes typed `&Stage`, serialising via `stage_dir_name` at the CRDT boundary. `#[cfg(test)] write_item_str` parses legacy strings for test fixtures. 3. `WorkItem::stage()` returns typed `crdt_state::Stage`; `stage_str()` is gone from the public API. Projection dispatches on the typed enum. 4. `frozen` becomes an orthogonal CRDT register. `Stage::Frozen` and `PipelineEvent::Freeze`/`Unfreeze` are removed; `transition_to_frozen`/ `unfrozen` set the flag directly without touching the stage register. 5. Watcher sweep and `tool_update_story`'s `blocked` setter route through `apply_transition` so the typed transition table validates every stage change. `update_story` gains a `frozen` field for symmetry. 6. One-shot startup migration rewrites pre-934 directory-style stage registers (and sets `frozen=true` on items previously at `7_frozen`). `Stage::from_dir` drops legacy aliases. The db boundary keeps a small normaliser so callers with legacy strings (MCP, tests) still work. Co-Authored-By: Claude Opus 4.7 (1M context) --- server/src/agent_mode/claim.rs | 6 +- server/src/agent_mode/loop_ops.rs | 6 +- server/src/agents/lifecycle.rs | 66 +++++---- server/src/agents/pool/auto_assign/scan.rs | 22 ++- .../agents/pool/auto_assign/story_checks.rs | 31 ++--- .../watchdog/tests/limits_tests.rs | 30 ++-- .../pool/pipeline/advance/tests_regression.rs | 6 +- .../agents/pool/start/tests_concurrency.rs | 12 +- .../src/agents/pool/start/tests_selection.rs | 2 +- server/src/chat/commands/backlog.rs | 2 + server/src/chat/commands/freeze.rs | 2 +- server/src/chat/commands/status/tests.rs | 2 + server/src/chat/commands/unblock.rs | 4 +- server/src/chat/lookup.rs | 14 +- server/src/chat/transport/matrix/assign.rs | 6 +- server/src/chat/transport/matrix/delete.rs | 3 +- server/src/crdt_snapshot/tests.rs | 16 +-- server/src/crdt_state/mod.rs | 10 +- server/src/crdt_state/ops.rs | 4 +- server/src/crdt_state/read.rs | 12 +- server/src/crdt_state/state/tests.rs | 12 +- server/src/crdt_state/types.rs | 80 +++++++---- server/src/crdt_state/write/item.rs | 96 ++++++++++++- server/src/crdt_state/write/migrations.rs | 85 ++++++++++++ server/src/crdt_state/write/mod.rs | 10 +- server/src/crdt_state/write/tests.rs | 34 ++--- server/src/db/mod.rs | 8 +- server/src/db/ops.rs | 59 ++++++-- server/src/http/agents/tests.rs | 4 +- server/src/http/mcp/merge_tools.rs | 23 ++-- server/src/http/mcp/status_tools.rs | 10 +- server/src/http/mcp/story_tools/epic.rs | 24 ++-- .../src/http/mcp/story_tools/story/freeze.rs | 7 +- .../src/http/mcp/story_tools/story/update.rs | 44 +++++- server/src/http/workflow/pipeline.rs | 7 +- server/src/io/story_metadata/parser.rs | 13 +- server/src/io/watcher/mod.rs | 1 - server/src/io/watcher/sweep.rs | 39 ++---- server/src/io/watcher/tests.rs | 10 +- server/src/pipeline_state/apply.rs | 35 +++-- server/src/pipeline_state/mod.rs | 7 +- server/src/pipeline_state/projection.rs | 102 +++++++------- server/src/pipeline_state/tests.rs | 130 ++++-------------- server/src/pipeline_state/transition.rs | 17 --- server/src/pipeline_state/types.rs | 76 +++++----- server/src/service/agents/mod.rs | 30 ++-- server/src/service/gateway/polling.rs | 4 +- server/src/service/notifications/format.rs | 13 +- .../service/notifications/io/tests_stage.rs | 14 +- server/src/service/status/format.rs | 16 +-- server/src/service/story/front_matter.rs | 65 ++++----- server/src/service/work_item/assign.rs | 4 +- server/src/service/work_item/delete.rs | 2 +- server/src/service/work_item/freeze.rs | 24 ++-- server/src/startup/project.rs | 6 + 55 files changed, 783 insertions(+), 584 deletions(-) diff --git a/server/src/agent_mode/claim.rs b/server/src/agent_mode/claim.rs index c765f2a2..2d9d01cb 100644 --- a/server/src/agent_mode/claim.rs +++ b/server/src/agent_mode/claim.rs @@ -78,7 +78,9 @@ mod tests { #[test] #[allow(clippy::string_slice)] // stale_holder is a hex/ASCII string literal; [..12] always valid fn stale_claim_displaced_and_logged() { - use crate::crdt_state::{init_for_test, our_node_id, read_item, write_claim, write_item}; + use crate::crdt_state::{ + init_for_test, our_node_id, read_item, write_claim, write_item_str, + }; init_for_test(); @@ -88,7 +90,7 @@ mod tests { let stale_time = chrono::Utc::now().timestamp() as f64 - CLAIM_TIMEOUT_SECS - 300.0; // Seed the story with a stale claim from a foreign node. - write_item( + write_item_str( story_id, "2_current", Some("Stale Claim Displacement Test"), diff --git a/server/src/agent_mode/loop_ops.rs b/server/src/agent_mode/loop_ops.rs index 03edb054..4220dd75 100644 --- a/server/src/agent_mode/loop_ops.rs +++ b/server/src/agent_mode/loop_ops.rs @@ -42,8 +42,7 @@ pub(super) async fn scan_and_claim( for item in &items { // Only claim stories in active stages. - if !crate::pipeline_state::Stage::from_dir(item.stage_str()).is_some_and(|s| s.is_active()) - { + if !item.stage().is_active() { continue; } @@ -166,8 +165,7 @@ pub(super) fn reclaim_timed_out_work(_project_root: &Path) { let now = chrono::Utc::now().timestamp() as f64; for item in &items { - if !crate::pipeline_state::Stage::from_dir(item.stage_str()).is_some_and(|s| s.is_active()) - { + if !item.stage().is_active() { continue; } diff --git a/server/src/agents/lifecycle.rs b/server/src/agents/lifecycle.rs index 97d08647..67d68966 100644 --- a/server/src/agents/lifecycle.rs +++ b/server/src/agents/lifecycle.rs @@ -103,10 +103,9 @@ pub fn feature_branch_has_unmerged_changes(project_root: &Path, story_id: &str) /// Spikes may transition directly from `3_qa/` to `5_done/`, skipping the merge stage. pub fn move_story_to_done(story_id: &str) -> Result<(), String> { let item = read_typed_or_err(story_id)?; - let dir = item.stage.dir_name(); // Idempotent: already at or past done. - if dir >= "5_done" { + if matches!(item.stage, Stage::Done { .. } | Stage::Archived { .. }) { return Ok(()); } @@ -134,10 +133,15 @@ pub fn move_story_to_done(story_id: &str) -> Result<(), String> { /// Idempotent if already in `4_merge/`. Errors if not found in `2_current/` or `3_qa/`. pub fn move_story_to_merge(story_id: &str) -> Result<(), String> { let item = read_typed_or_err(story_id)?; - let dir = item.stage.dir_name(); // Idempotent: already at or past merge. - if dir >= "4_merge" { + if matches!( + item.stage, + Stage::Merge { .. } + | Stage::MergeFailure { .. } + | Stage::Done { .. } + | Stage::Archived { .. } + ) { return Ok(()); } @@ -170,10 +174,16 @@ pub fn move_story_to_merge(story_id: &str) -> Result<(), String> { /// Idempotent if already in `3_qa/`. Errors if not found in `2_current/`. pub fn move_story_to_qa(story_id: &str) -> Result<(), String> { let item = read_typed_or_err(story_id)?; - let dir = item.stage.dir_name(); // Idempotent: already at or past qa. - if dir >= "3_qa" { + if matches!( + item.stage, + Stage::Qa + | Stage::Merge { .. } + | Stage::MergeFailure { .. } + | Stage::Done { .. } + | Stage::Archived { .. } + ) { return Ok(()); } @@ -349,16 +359,19 @@ fn map_stage_move_to_event( /// Move any work item to an arbitrary pipeline stage by searching all stages. /// /// Accepts `target_stage` as one of: `backlog`, `current`, `qa`, `merge`, `done`. +/// (`current` is the user-facing alias for the `coding` stage.) /// Idempotent: if the item is already in the target stage, returns Ok. /// Returns `(from_stage, to_stage)` on success. pub fn move_story_to_stage(story_id: &str, target_stage: &str) -> Result<(String, String), String> { - // Validate target. - let target_dir = match target_stage { - "backlog" => "1_backlog", - "current" => "2_current", - "qa" => "3_qa", - "merge" => "4_merge", - "done" => "5_done", + // Validate target. We accept the user-facing aliases (which include + // "current" as the historical alias for "coding") and normalise to the + // canonical clean wire form for the idempotency check. + let target_wire = match target_stage { + "backlog" => "backlog", + "current" => "coding", + "qa" => "qa", + "merge" => "merge", + "done" => "done", _ => { return Err(format!( "Invalid target_stage '{target_stage}'. Must be one of: backlog, current, qa, merge, done" @@ -370,7 +383,7 @@ pub fn move_story_to_stage(story_id: &str, target_stage: &str) -> Result<(String let from_name = stage_to_name(&item.stage); // Idempotent: already in the target stage. - if item.stage.dir_name() == target_dir { + if item.stage.dir_name() == target_wire { return Ok((target_stage.to_string(), target_stage.to_string())); } @@ -387,7 +400,7 @@ pub fn move_story_to_stage(story_id: &str, target_stage: &str) -> Result<(String pub fn close_bug_to_archive(bug_id: &str) -> Result<(), String> { let item = read_typed_or_err(bug_id)?; - if item.stage.dir_name() >= "5_done" { + if matches!(item.stage, Stage::Done { .. } | Stage::Archived { .. }) { return Ok(()); } @@ -415,7 +428,6 @@ fn stage_to_name(s: &Stage) -> &'static str { Stage::MergeFailure { .. } => "merge_failure", Stage::Done { .. } => "done", Stage::Archived { .. } => "archived", - Stage::Frozen { .. } => "frozen", } } @@ -444,8 +456,8 @@ mod tests { .expect("item should exist in CRDT after move"); assert_eq!( item.stage.dir_name(), - "2_current", - "item should be in 2_current after move" + "coding", + "item should be in coding after move" ); } @@ -476,8 +488,8 @@ mod tests { .expect("item should exist in CRDT"); assert_eq!( item.stage.dir_name(), - "5_done", - "item should be in 5_done after move" + "done", + "item should be in done after move" ); } @@ -540,11 +552,7 @@ mod tests { let item = crate::pipeline_state::read_typed("99866_story_block_test") .expect("read should succeed") .expect("item should exist"); - assert_eq!( - item.stage.dir_name(), - "2_current", - "should start in 2_current" - ); + assert_eq!(item.stage.dir_name(), "coding", "should start in coding"); // Block via the state machine. transition_to_blocked("99866_story_block_test", "retry limit exceeded") @@ -556,8 +564,8 @@ mod tests { .expect("item should exist after block"); assert_eq!( item.stage.dir_name(), - "2_blocked", - "should be in 2_blocked after transition_to_blocked" + "blocked", + "should be in blocked after transition_to_blocked" ); assert!(item.stage.is_blocked(), "is_blocked() should return true"); assert!( @@ -575,8 +583,8 @@ mod tests { .expect("item should exist after unblock"); assert_eq!( item.stage.dir_name(), - "2_current", - "should return to 2_current after unblock" + "coding", + "should return to coding after unblock" ); assert!( matches!(item.stage, Stage::Coding), diff --git a/server/src/agents/pool/auto_assign/scan.rs b/server/src/agents/pool/auto_assign/scan.rs index 30e1a2a0..6e94f08c 100644 --- a/server/src/agents/pool/auto_assign/scan.rs +++ b/server/src/agents/pool/auto_assign/scan.rs @@ -22,9 +22,29 @@ pub(super) fn scan_stage_items(_project_root: &Path, stage_dir: &str) -> Vec "upcoming", + "1_backlog" => "backlog", + "2_current" => "coding", + "2_blocked" => "blocked", + "3_qa" => "qa", + "4_merge" => "merge", + "4_merge_failure" => "merge_failure", + "5_done" => "done", + "6_archived" => "archived", + other => other, + }; + let Some(want) = crate::pipeline_state::Stage::from_dir(normalised) else { + return Vec::new(); + }; + let want = want.dir_name(); + // CRDT is the only source of truth — no filesystem fallback. for item in crate::pipeline_state::read_all_typed() { - if item.stage.dir_name() == stage_dir { + if item.stage.dir_name() == want { items.insert(item.story_id.0.clone()); } } diff --git a/server/src/agents/pool/auto_assign/story_checks.rs b/server/src/agents/pool/auto_assign/story_checks.rs index c322319d..eb7990fb 100644 --- a/server/src/agents/pool/auto_assign/story_checks.rs +++ b/server/src/agents/pool/auto_assign/story_checks.rs @@ -116,14 +116,13 @@ pub(super) fn check_archived_dependencies( crate::crdt_state::check_archived_deps_crdt(story_id) } -/// Return `true` if the story is in the `Frozen` pipeline stage. +/// Return `true` if the story's `frozen` CRDT flag is set (story 934, stage 4). /// -/// Checks the typed CRDT stage via `read_typed`. +/// `frozen` is orthogonal to [`Stage`]: a frozen story keeps its current stage +/// register but is skipped by the auto-assigner. pub(super) fn is_story_frozen(_project_root: &Path, _stage_dir: &str, story_id: &str) -> bool { - crate::pipeline_state::read_typed(story_id) - .ok() - .flatten() - .map(|item| item.stage.is_frozen()) + crate::crdt_state::read_item(story_id) + .map(|view| view.frozen()) .unwrap_or(false) } @@ -140,7 +139,7 @@ mod tests { crate::crdt_state::init_for_test(); crate::db::ensure_content_store(); let tmp = tempfile::tempdir().unwrap(); - crate::crdt_state::write_item( + crate::crdt_state::write_item_str( "890_spike_held", "3_qa", Some("Held Spike"), @@ -161,7 +160,7 @@ mod tests { crate::crdt_state::init_for_test(); crate::db::ensure_content_store(); let tmp = tempfile::tempdir().unwrap(); - crate::crdt_state::write_item( + crate::crdt_state::write_item_str( "890_spike_active_qa", "3_qa", Some("Active QA Spike"), @@ -253,7 +252,7 @@ mod tests { fn has_unmet_dependencies_returns_true_when_dep_not_done() { crate::crdt_state::init_for_test(); let tmp = tempfile::tempdir().unwrap(); - crate::crdt_state::write_item( + crate::crdt_state::write_item_str( "10_story_blocked", "2_current", Some("Blocked"), @@ -276,7 +275,7 @@ mod tests { fn has_unmet_dependencies_returns_false_when_dep_done() { crate::crdt_state::init_for_test(); let tmp = tempfile::tempdir().unwrap(); - crate::crdt_state::write_item( + crate::crdt_state::write_item_str( "999_story_dep", "5_done", Some("Dep"), @@ -288,7 +287,7 @@ mod tests { None, None, ); - crate::crdt_state::write_item( + crate::crdt_state::write_item_str( "10_story_ok", "2_current", Some("Ok"), @@ -311,7 +310,7 @@ mod tests { fn has_unmet_dependencies_returns_false_when_no_deps() { crate::crdt_state::init_for_test(); let tmp = tempfile::tempdir().unwrap(); - crate::crdt_state::write_item( + crate::crdt_state::write_item_str( "5_story_free", "2_current", Some("Free"), @@ -337,7 +336,7 @@ mod tests { fn check_archived_dependencies_returns_archived_ids() { crate::crdt_state::init_for_test(); let tmp = tempfile::tempdir().unwrap(); - crate::crdt_state::write_item( + crate::crdt_state::write_item_str( "500_spike_crdt", "6_archived", Some("CRDT Spike"), @@ -349,7 +348,7 @@ mod tests { None, None, ); - crate::crdt_state::write_item( + crate::crdt_state::write_item_str( "503_story_dependent", "1_backlog", Some("Dependent"), @@ -371,7 +370,7 @@ mod tests { fn check_archived_dependencies_empty_when_dep_in_done() { crate::crdt_state::init_for_test(); let tmp = tempfile::tempdir().unwrap(); - crate::crdt_state::write_item( + crate::crdt_state::write_item_str( "490_story_done", "5_done", Some("Done"), @@ -383,7 +382,7 @@ mod tests { None, None, ); - crate::crdt_state::write_item( + crate::crdt_state::write_item_str( "503_story_waiting", "1_backlog", Some("Waiting"), diff --git a/server/src/agents/pool/auto_assign/watchdog/tests/limits_tests.rs b/server/src/agents/pool/auto_assign/watchdog/tests/limits_tests.rs index 78cde8fd..af520cd0 100644 --- a/server/src/agents/pool/auto_assign/watchdog/tests/limits_tests.rs +++ b/server/src/agents/pool/auto_assign/watchdog/tests/limits_tests.rs @@ -243,7 +243,7 @@ max_turns = 10 let story_id = "42_story_runaway"; let initial = "---\nname: Runaway Story\n---\n# Runaway Story\n"; crate::db::write_content(story_id, initial); - crate::crdt_state::write_item( + crate::crdt_state::write_item_str( story_id, "2_current", Some("Runaway Story"), @@ -274,10 +274,10 @@ max_turns = 10 let item = crate::crdt_state::read_item(story_id) .expect("story must be in CRDT after watchdog termination"); assert_eq!( - item.stage_str(), - "2_blocked", + item.stage().as_dir(), + "blocked", "story stage must be 2_blocked after limit termination with max_retries=1 — got: {}", - item.stage_str() + item.stage().as_dir() ); // Sanity: the agent itself is also Failed with the right reason. @@ -371,7 +371,7 @@ max_turns = 10 let story_id = "story_e_per_session"; crate::db::write_content(story_id, "---\nname: Per-Session Test\n---\n"); - crate::crdt_state::write_item( + crate::crdt_state::write_item_str( story_id, "2_current", Some("Per-Session Test"), @@ -416,8 +416,8 @@ max_turns = 10 let item = crate::crdt_state::read_item(story_id) .expect("story must be in CRDT after per-session overrun"); assert_eq!( - item.stage_str(), - "2_blocked", + item.stage().as_dir(), + "blocked", "story stage must be 2_blocked after per-session overrun with max_retries=1" ); } @@ -451,7 +451,7 @@ max_turns = 10 let initial = "---\nname: Retry Test\n---\n"; crate::crdt_state::init_for_test(); crate::db::write_content(story_id, initial); - crate::crdt_state::write_item( + crate::crdt_state::write_item_str( story_id, "2_current", Some("Retry Test"), @@ -478,8 +478,8 @@ max_turns = 10 "after session 1, retry_count should be 1 in CRDT" ); assert_ne!( - item.stage_str(), - "2_blocked", + item.stage().as_dir(), + "blocked", "story should NOT be blocked after session 1" ); } @@ -498,8 +498,8 @@ max_turns = 10 "after session 2, retry_count should be 2 in CRDT" ); assert_ne!( - item.stage_str(), - "2_blocked", + item.stage().as_dir(), + "blocked", "story should NOT be blocked after session 2" ); } @@ -513,10 +513,10 @@ max_turns = 10 let item = crate::crdt_state::read_item(story_id).expect("story must be in CRDT"); assert_eq!( - item.stage_str(), - "2_blocked", + item.stage().as_dir(), + "blocked", "story must be blocked after session 3 (retry_count=3 >= max_retries=3) — got: {}", - item.stage_str() + item.stage().as_dir() ); // retry_count resets to 0 on stage transition (Bug 780) — the fact // that the story reached 2_blocked proves the retry limit was hit. diff --git a/server/src/agents/pool/pipeline/advance/tests_regression.rs b/server/src/agents/pool/pipeline/advance/tests_regression.rs index 997c09b3..a42d0101 100644 --- a/server/src/agents/pool/pipeline/advance/tests_regression.rs +++ b/server/src/agents/pool/pipeline/advance/tests_regression.rs @@ -298,12 +298,12 @@ async fn stale_mergemaster_advance_for_done_story_is_noop() { "No StoryBlocked event should be emitted for a stale advance" ); - // The story should still be in 5_done (not moved elsewhere). + // The story should still be in done (not moved elsewhere). if let Ok(Some(item)) = crate::pipeline_state::read_typed(story_id) { assert_eq!( item.stage.dir_name(), - "5_done", - "Story should remain in 5_done after stale mergemaster advance" + "done", + "Story should remain in done after stale mergemaster advance" ); } } diff --git a/server/src/agents/pool/start/tests_concurrency.rs b/server/src/agents/pool/start/tests_concurrency.rs index 9aabde99..4f65a019 100644 --- a/server/src/agents/pool/start/tests_concurrency.rs +++ b/server/src/agents/pool/start/tests_concurrency.rs @@ -443,11 +443,11 @@ async fn start_agent_rejects_mergemaster_on_coding_stage_story() { assert!( result.is_err(), - "mergemaster must not be assigned to a story in 2_current/" + "mergemaster must not be assigned to a story in coding stage" ); let err = result.unwrap_err(); assert!( - err.contains("stage") && err.contains("2_current"), + err.contains("stage") && err.contains("coding"), "error must mention stage mismatch, got: '{err}'" ); } @@ -482,11 +482,11 @@ async fn start_agent_rejects_coder_on_qa_stage_story() { assert!( result.is_err(), - "coder must not be assigned to a story in 3_qa/" + "coder must not be assigned to a story in qa stage" ); let err = result.unwrap_err(); assert!( - err.contains("stage") && err.contains("3_qa"), + err.contains("stage") && err.contains("qa"), "error must mention stage mismatch, got: '{err}'" ); } @@ -521,11 +521,11 @@ async fn start_agent_rejects_qa_on_merge_stage_story() { assert!( result.is_err(), - "qa must not be assigned to a story in 4_merge/" + "qa must not be assigned to a story in merge stage" ); let err = result.unwrap_err(); assert!( - err.contains("stage") && err.contains("4_merge"), + err.contains("stage") && err.contains("merge"), "error must mention stage mismatch, got: '{err}'" ); } diff --git a/server/src/agents/pool/start/tests_selection.rs b/server/src/agents/pool/start/tests_selection.rs index 1ff0a438..d7439a8c 100644 --- a/server/src/agents/pool/start/tests_selection.rs +++ b/server/src/agents/pool/start/tests_selection.rs @@ -283,7 +283,7 @@ stage = "coder" crate::db::write_content("368_story_test", story_content); // Story 929: agent pin comes from the CRDT register, not YAML. Seed it. crate::crdt_state::init_for_test(); - crate::crdt_state::write_item( + crate::crdt_state::write_item_str( "368_story_test", "2_current", Some("Test Story"), diff --git a/server/src/chat/commands/backlog.rs b/server/src/chat/commands/backlog.rs index ebd89b38..93d57f52 100644 --- a/server/src/chat/commands/backlog.rs +++ b/server/src/chat/commands/backlog.rs @@ -71,6 +71,7 @@ mod tests { stage, depends_on: Vec::new(), retry_count: 0, + frozen: false, } } @@ -81,6 +82,7 @@ mod tests { stage, depends_on: deps.iter().map(|n| StoryId(n.to_string())).collect(), retry_count: 0, + frozen: false, } } diff --git a/server/src/chat/commands/freeze.rs b/server/src/chat/commands/freeze.rs index 4254c4dd..73cfe88e 100644 --- a/server/src/chat/commands/freeze.rs +++ b/server/src/chat/commands/freeze.rs @@ -205,7 +205,7 @@ mod tests { .expect("read_typed should succeed") .expect("item should be present"); assert!( - item.stage.is_frozen(), + item.is_frozen(), "stage should be Frozen after freeze: {:?}", item.stage ); diff --git a/server/src/chat/commands/status/tests.rs b/server/src/chat/commands/status/tests.rs index c7f83ef7..1e16e961 100644 --- a/server/src/chat/commands/status/tests.rs +++ b/server/src/chat/commands/status/tests.rs @@ -13,6 +13,7 @@ fn make_item(id: &str, name: &str, stage: Stage) -> PipelineItem { stage, depends_on: Vec::new(), retry_count: 0, + frozen: false, } } @@ -24,6 +25,7 @@ fn make_item_with_deps(id: &str, name: &str, stage: Stage, deps: Vec) -> Pi stage, depends_on: deps.iter().map(|n| StoryId(n.to_string())).collect(), retry_count: 0, + frozen: false, } } diff --git a/server/src/chat/commands/unblock.rs b/server/src/chat/commands/unblock.rs index c6d33045..641920e3 100644 --- a/server/src/chat/commands/unblock.rs +++ b/server/src/chat/commands/unblock.rs @@ -205,7 +205,7 @@ mod tests { ); // Seed the story in the CRDT in 2_blocked stage so the typed // Blocked → Coding transition fires and clears `blocked` properly. - crate::crdt_state::write_item( + crate::crdt_state::write_item_str( "9903_story_stuck", "2_blocked", Some("Stuck Story"), @@ -273,7 +273,7 @@ mod tests { ); // Seed CRDT registers: blocked=true, retry_count=5, with a name so the // response can echo it back instead of falling through to the raw id. - crate::crdt_state::write_item( + crate::crdt_state::write_item_str( story_id, stage, Some("Stuck Story"), diff --git a/server/src/chat/lookup.rs b/server/src/chat/lookup.rs index 36efdad8..73fb0438 100644 --- a/server/src/chat/lookup.rs +++ b/server/src/chat/lookup.rs @@ -33,18 +33,14 @@ pub(crate) fn find_story_by_number( if let Some(items) = crate::crdt_state::read_all_items() { for item in items { if item.story_id().split('_').next().unwrap_or("") == number { + let stage_dir = item.stage().as_dir().to_string(); let path = project_root .join(".huskies") .join("work") - .join(item.stage_str()) + .join(&stage_dir) .join(format!("{}.md", item.story_id())); let content = crate::db::read_content(item.story_id()); - return Some(( - item.story_id().to_string(), - item.stage_str().to_string(), - path, - content, - )); + return Some((item.story_id().to_string(), stage_dir, path, content)); } } } @@ -58,8 +54,8 @@ pub(crate) fn find_story_by_number( continue; } let stage_dir = crate::crdt_state::read_item(&id) - .map(|v| v.stage_str().to_string()) - .unwrap_or_else(|| "1_backlog".to_string()); + .map(|v| v.stage().as_dir().to_string()) + .unwrap_or_else(|| "backlog".to_string()); let path = project_root .join(".huskies") .join("work") diff --git a/server/src/chat/transport/matrix/assign.rs b/server/src/chat/transport/matrix/assign.rs index 8dae27d6..152f150b 100644 --- a/server/src/chat/transport/matrix/assign.rs +++ b/server/src/chat/transport/matrix/assign.rs @@ -313,7 +313,7 @@ mod tests { None, ); // Seed CRDT so set_agent can write to the item. - crate::crdt_state::write_item( + crate::crdt_state::write_item_str( "9972_story_test", "1_backlog", Some("Test Feature"), @@ -369,7 +369,7 @@ mod tests { "---\nname: Small Story\n---\n", None, ); - crate::crdt_state::write_item( + crate::crdt_state::write_item_str( "9973_story_small", "1_backlog", Some("Small Story"), @@ -420,7 +420,7 @@ mod tests { "---\nname: Existing\nagent: coder-sonnet\n---\n", None, ); - crate::crdt_state::write_item( + crate::crdt_state::write_item_str( "9974_story_existing", "1_backlog", Some("Existing"), diff --git a/server/src/chat/transport/matrix/delete.rs b/server/src/chat/transport/matrix/delete.rs index 0ba49bdc..5a3e0ae8 100644 --- a/server/src/chat/transport/matrix/delete.rs +++ b/server/src/chat/transport/matrix/delete.rs @@ -112,7 +112,6 @@ fn stage_display_name(stage: &str) -> &str { Some(Stage::Done { .. }) => "done", Some(Stage::Archived { .. }) => "archived", Some(Stage::MergeFailure { .. }) => "merge-failure", - Some(Stage::Frozen { .. }) => "frozen", None => stage, } } @@ -239,7 +238,7 @@ mod tests { let story_number = "9977"; // Seed in CRDT. - crate::crdt_state::write_item( + crate::crdt_state::write_item_str( story_id, "1_backlog", Some("CRDT Tombstone Check"), diff --git a/server/src/crdt_snapshot/tests.rs b/server/src/crdt_snapshot/tests.rs index 4ce553d3..ecc26239 100644 --- a/server/src/crdt_snapshot/tests.rs +++ b/server/src/crdt_snapshot/tests.rs @@ -231,7 +231,7 @@ fn snapshot_generation_includes_manifest() { crate::crdt_state::init_for_test(); // Write some items to populate ALL_OPS. - crate::crdt_state::write_item( + crate::crdt_state::write_item_str( "636_test_a", "1_backlog", Some("Test A"), @@ -243,7 +243,7 @@ fn snapshot_generation_includes_manifest() { None, None, ); - crate::crdt_state::write_item( + crate::crdt_state::write_item_str( "636_test_b", "2_current", Some("Test B"), @@ -276,7 +276,7 @@ fn attribution_query_by_story_id() { crate::crdt_state::init_for_test(); init(); - crate::crdt_state::write_item( + crate::crdt_state::write_item_str( "636_attrib_test", "1_backlog", Some("Attribution Test"), @@ -314,7 +314,7 @@ fn compaction_reduces_ops() { // Write several items. for i in 0..5 { - crate::crdt_state::write_item( + crate::crdt_state::write_item_str( &format!("636_compact_{i}"), "1_backlog", Some(&format!("Item {i}")), @@ -353,7 +353,7 @@ fn latest_snapshot_available_after_compaction() { crate::crdt_state::init_for_test(); init(); - crate::crdt_state::write_item( + crate::crdt_state::write_item_str( "636_latest_test", "1_backlog", Some("Latest Test"), @@ -626,7 +626,7 @@ fn attribution_preserved_after_compaction() { init(); // Write a story through its lifecycle. - crate::crdt_state::write_item( + crate::crdt_state::write_item_str( "636_archived_story", "1_backlog", Some("Archived Story"), @@ -638,7 +638,7 @@ fn attribution_preserved_after_compaction() { None, None, ); - crate::crdt_state::write_item( + crate::crdt_state::write_item_str( "636_archived_story", "2_current", None, @@ -650,7 +650,7 @@ fn attribution_preserved_after_compaction() { None, None, ); - crate::crdt_state::write_item( + crate::crdt_state::write_item_str( "636_archived_story", "6_archived", None, diff --git a/server/src/crdt_state/mod.rs b/server/src/crdt_state/mod.rs index b95e39ba..bc797a78 100644 --- a/server/src/crdt_state/mod.rs +++ b/server/src/crdt_state/mod.rs @@ -52,11 +52,15 @@ pub use types::{ TestJobCrdt, TestJobView, TokenUsageCrdt, TokenUsageView, WorkItem, }; pub use write::{ - bump_retry_count, migrate_names_from_slugs, migrate_story_ids_to_numeric, name_from_story_id, - set_agent, set_blocked, set_depends_on, set_epic, set_item_type, set_mergemaster_attempted, - set_name, set_qa_mode, set_retry_count, set_review_hold, write_item, + bump_retry_count, migrate_legacy_stage_strings, migrate_names_from_slugs, + migrate_story_ids_to_numeric, name_from_story_id, set_agent, set_blocked, set_depends_on, + set_epic, set_frozen, set_item_type, set_mergemaster_attempted, set_name, set_qa_mode, + set_retry_count, set_review_hold, write_item, }; +#[cfg(test)] +pub use write::write_item_str; + #[cfg(test)] pub use state::init_for_test; diff --git a/server/src/crdt_state/ops.rs b/server/src/crdt_state/ops.rs index 30f7d233..a585ab2e 100644 --- a/server/src/crdt_state/ops.rs +++ b/server/src/crdt_state/ops.rs @@ -190,7 +190,7 @@ pub fn apply_remote_op(op: SignedOp) -> bool { mod tests { use super::super::state::init_for_test; use super::super::types::{NodePresenceCrdt, PipelineItemCrdt}; - use super::super::write::write_item; + use super::super::write::write_item_str; use super::*; use bft_json_crdt::json_crdt::OpState; use bft_json_crdt::keypair::make_keypair; @@ -542,7 +542,7 @@ mod tests { ); // Attempt resurrection via write_item — must be rejected by tombstone check. - write_item( + write_item_str( story_id, "1_backlog", Some("Resurrected"), diff --git a/server/src/crdt_state/read.rs b/server/src/crdt_state/read.rs index d2ba3c2d..d19c4d39 100644 --- a/server/src/crdt_state/read.rs +++ b/server/src/crdt_state/read.rs @@ -363,6 +363,11 @@ pub(super) fn extract_item_view(item: &PipelineItemCrdt) -> Option None, }; + let frozen = match item.frozen.view() { + JsonValue::Bool(b) => Some(b), + _ => None, + }; + Some(PipelineItemView { story_id, stage, @@ -379,6 +384,7 @@ pub(super) fn extract_item_view(item: &PipelineItemCrdt) -> Option, + /// Set to `true` when a story is frozen. Frozen stories stay at their + /// current `Stage` but are skipped by the auto-assigner until explicitly + /// unfrozen. Orthogonal to `Stage` (story 934, stage 4); replaces the + /// pre-934 `Stage::Frozen { resume_to }` variant whose resume payload was + /// just "the stage you were in when you froze". + pub frozen: LwwRegisterCrdt, } /// CRDT node that holds a single peer's presence entry. @@ -143,46 +149,56 @@ pub enum Stage { Done, /// Out of the active flow (`6_archived`). Archived, - /// Frozen, awaiting human review (`7_frozen`). - Frozen, /// An unrecognised stage string — forward-compatible catch-all. Unknown(String), } impl Stage { - /// Parse a stage directory string into the typed enum. + /// Parse a stage wire string into the typed enum. + /// + /// Accepts only the post-934 clean vocabulary (`"backlog"`, `"coding"`, + /// `"qa"`, `"merge"`, `"merge_failure"`, `"blocked"`, `"done"`, + /// `"archived"`, `"upcoming"`). Pre-934 directory-style strings + /// (`"2_current"`, `"4_merge"`, etc.) are no longer accepted — they are + /// rewritten at startup by `migrate_legacy_stage_strings`. pub fn from_dir(s: &str) -> Self { match s { - "0_upcoming" => Stage::Upcoming, - "1_backlog" => Stage::Backlog, - "2_current" => Stage::Coding, - "2_blocked" => Stage::Blocked, - "3_qa" => Stage::Qa, - "4_merge" => Stage::Merge, - "4_merge_failure" => Stage::MergeFailure, - "5_done" => Stage::Done, - "6_archived" => Stage::Archived, - "7_frozen" => Stage::Frozen, + "upcoming" => Stage::Upcoming, + "backlog" => Stage::Backlog, + "coding" => Stage::Coding, + "blocked" => Stage::Blocked, + "qa" => Stage::Qa, + "merge" => Stage::Merge, + "merge_failure" => Stage::MergeFailure, + "done" => Stage::Done, + "archived" => Stage::Archived, other => Stage::Unknown(other.to_string()), } } - /// Convert back to the filesystem directory name string. + /// Convert back to the wire string for persistence into the CRDT. + /// + /// Post-934: clean vocabulary (no numeric prefixes); the strings only + /// survive at this single CRDT-serialisation boundary. pub fn as_dir(&self) -> &str { match self { - Stage::Upcoming => "0_upcoming", - Stage::Backlog => "1_backlog", - Stage::Coding => "2_current", - Stage::Blocked => "2_blocked", - Stage::Qa => "3_qa", - Stage::Merge => "4_merge", - Stage::MergeFailure => "4_merge_failure", - Stage::Done => "5_done", - Stage::Archived => "6_archived", - Stage::Frozen => "7_frozen", + Stage::Upcoming => "upcoming", + Stage::Backlog => "backlog", + Stage::Coding => "coding", + Stage::Blocked => "blocked", + Stage::Qa => "qa", + Stage::Merge => "merge", + Stage::MergeFailure => "merge_failure", + Stage::Done => "done", + Stage::Archived => "archived", Stage::Unknown(s) => s.as_str(), } } + + /// `true` if this is an "active" stage (`Coding`, `Qa`, or `Merge`). + pub fn is_active(&self) -> bool { + matches!(self, Stage::Coding | Stage::Qa | Stage::Merge) + } } /// A typed snapshot of a single pipeline work item derived from the CRDT document. @@ -219,6 +235,8 @@ pub struct WorkItem { pub(super) item_type: Option, /// Epic ID this item belongs to, or `None` (sub-story 933). pub(super) epic: Option, + /// Whether the item is frozen (story 934, stage 4). Orthogonal to `Stage`. + pub(super) frozen: Option, } impl WorkItem { @@ -232,11 +250,6 @@ impl WorkItem { Stage::from_dir(&self.stage) } - /// Raw stage directory string (e.g. `"2_current"`). - pub fn stage_str(&self) -> &str { - &self.stage - } - /// Human-readable story name, or `None` when unset. pub fn name(&self) -> Option<&str> { self.name.as_deref() @@ -303,6 +316,13 @@ impl WorkItem { self.epic.as_deref() } + /// Whether the item is frozen (story 934, stage 4). Returns `false` when + /// the register is unset. Orthogonal to [`Self::stage`]: a frozen story + /// stays at its current stage but is skipped by the auto-assigner. + pub fn frozen(&self) -> bool { + self.frozen.unwrap_or(false) + } + /// Construct a `WorkItem` for use in tests outside `crdt_state::*`. /// /// Within `crdt_state` use a struct literal directly (fields are `pub(super)`). @@ -325,6 +345,7 @@ impl WorkItem { review_hold: Option, item_type: Option, epic: Option, + frozen: Option, ) -> Self { Self { story_id: story_id.into(), @@ -342,6 +363,7 @@ impl WorkItem { review_hold, item_type, epic, + frozen, } } } diff --git a/server/src/crdt_state/write/item.rs b/server/src/crdt_state/write/item.rs index a5aa32ad..3015300f 100644 --- a/server/src/crdt_state/write/item.rs +++ b/server/src/crdt_state/write/item.rs @@ -11,6 +11,7 @@ use serde_json::json; use super::super::state::{apply_and_persist, emit_event, get_crdt, rebuild_index}; use super::super::types::CrdtEvent; use crate::io::story_metadata::QaMode; +use crate::pipeline_state::{Stage, stage_dir_name}; /// Set the typed `depends_on` CRDT register for a pipeline item. /// @@ -103,6 +104,28 @@ pub fn set_review_hold(story_id: &str, value: bool) -> bool { true } +/// Set the `frozen` CRDT flag for a pipeline item (story 934, stage 4). +/// +/// `true` freezes the story at its current `Stage` — the auto-assigner skips +/// it but the stage register is untouched. `false` unfreezes; the story +/// remains at its current stage and resumes auto-assignment. Both writes +/// are explicit (not removals) so the cleared state survives CRDT replay. +/// +/// Returns `true` if the item was found and the op was applied, `false` otherwise. +pub fn set_frozen(story_id: &str, value: bool) -> bool { + let Some(state_mutex) = get_crdt() else { + return false; + }; + let Ok(mut state) = state_mutex.lock() else { + return false; + }; + let Some(&idx) = state.index.get(story_id) else { + return false; + }; + apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].frozen.set(value)); + true +} + /// Set the `mergemaster_attempted` CRDT flag for a pipeline item. /// /// Passing `true` records that a mergemaster session has been spawned for this @@ -211,10 +234,13 @@ pub fn set_qa_mode(story_id: &str, mode: Option) -> bool { /// /// When the stage changes (or a new item is created), a [`CrdtEvent`] is /// broadcast so subscribers can react to the transition. +/// +/// `stage` is the typed pipeline state; it is serialised to the canonical +/// clean wire form (story 934) via [`stage_dir_name`] at the CRDT boundary. #[allow(clippy::too_many_arguments)] pub fn write_item( story_id: &str, - stage: &str, + stage: &Stage, name: Option<&str>, agent: Option<&str>, retry_count: Option, @@ -224,6 +250,7 @@ pub fn write_item( claimed_at: Option, merged_at: Option, ) { + let stage_str = stage_dir_name(stage); let Some(state_mutex) = get_crdt() else { return; }; @@ -247,7 +274,7 @@ pub fn write_item( // Update existing item registers. apply_and_persist(&mut state, |s| { - s.crdt.doc.items[idx].stage.set(stage.to_string()) + s.crdt.doc.items[idx].stage.set(stage_str.to_string()) }); if let Some(n) = name { @@ -286,7 +313,7 @@ pub fn write_item( } // Broadcast a CrdtEvent if the stage actually changed. - let stage_changed = old_stage.as_deref() != Some(stage); + let stage_changed = old_stage.as_deref() != Some(stage_str); if stage_changed { // Read the current name from the CRDT document for the event. let current_name = match state.crdt.doc.items[idx].name.view() { @@ -296,7 +323,7 @@ pub fn write_item( emit_event(CrdtEvent { story_id: story_id.to_string(), from_stage: old_stage, - to_stage: stage.to_string(), + to_stage: stage_str.to_string(), name: current_name, }); } @@ -304,7 +331,7 @@ pub fn write_item( // Insert new item. let item_json: JsonValue = json!({ "story_id": story_id, - "stage": stage, + "stage": stage_str, "name": name.unwrap_or(""), "agent": agent.unwrap_or(""), "retry_count": retry_count.unwrap_or(0) as f64, @@ -318,6 +345,7 @@ pub fn write_item( "review_hold": false, "item_type": "", "epic": "", + "frozen": false, }) .into(); @@ -348,18 +376,74 @@ pub fn write_item( item.review_hold.advance_seq(floor); item.item_type.advance_seq(floor); item.epic.advance_seq(floor); + item.frozen.advance_seq(floor); } // Broadcast a CrdtEvent for the new item. emit_event(CrdtEvent { story_id: story_id.to_string(), from_stage: None, - to_stage: stage.to_string(), + to_stage: stage_str.to_string(), name: name.map(String::from), }); } } +/// Test-only convenience that parses a wire-form stage string and forwards +/// to [`write_item`]. Existing tests seed CRDT items with legacy directory +/// strings (`"2_current"`, `"4_merge"`, etc.) — this shim keeps that idiom +/// working without forcing every test to construct typed `Stage` payloads. +/// +/// Stages are normalised through [`Stage::from_dir`]: unknown strings cause +/// the write to be skipped (with a log line). +#[cfg(test)] +#[allow(clippy::too_many_arguments)] +pub fn write_item_str( + story_id: &str, + stage: &str, + name: Option<&str>, + agent: Option<&str>, + retry_count: Option, + blocked: Option, + depends_on: Option<&str>, + claimed_by: Option<&str>, + claimed_at: Option, + merged_at: Option, +) { + // Normalise pre-934 directory-style strings to clean wire form so + // existing test fixtures keep working after stage 6 dropped the legacy + // aliases from `Stage::from_dir`. See `db::ops::normalise_stage_str` + // for the user-facing equivalent on the db boundary. + let normalised = match stage { + "0_upcoming" => "upcoming", + "1_backlog" => "backlog", + "2_current" => "coding", + "2_blocked" => "blocked", + "3_qa" => "qa", + "4_merge" => "merge", + "4_merge_failure" => "merge_failure", + "5_done" => "done", + "6_archived" => "archived", + other => other, + }; + let Some(typed) = Stage::from_dir(normalised) else { + crate::slog!("[crdt_state] write_item_str: unknown stage '{stage}' for {story_id}"); + return; + }; + write_item( + story_id, + &typed, + name, + agent, + retry_count, + blocked, + depends_on, + claimed_by, + claimed_at, + merged_at, + ); +} + /// Set `retry_count` to an explicit value for a pipeline item. /// /// Pure metadata operation — the item's stage is not changed. diff --git a/server/src/crdt_state/write/migrations.rs b/server/src/crdt_state/write/migrations.rs index 7bcd6f14..f94777aa 100644 --- a/server/src/crdt_state/write/migrations.rs +++ b/server/src/crdt_state/write/migrations.rs @@ -181,3 +181,88 @@ pub fn migrate_names_from_slugs() { } slog!("[crdt] Migrated names for {count} items from story ID slugs"); } + +/// Map a pre-934 legacy directory-style stage string to its clean wire form. +/// +/// Returns `None` if `s` is already in clean wire form (or is genuinely +/// unknown), so the migration can quickly skip already-clean items. +fn legacy_stage_to_clean(s: &str) -> Option<&'static str> { + match s { + "0_upcoming" => Some("upcoming"), + "1_backlog" => Some("backlog"), + "2_current" => Some("coding"), + "2_blocked" => Some("blocked"), + "3_qa" => Some("qa"), + "4_merge" => Some("merge"), + "4_merge_failure" => Some("merge_failure"), + "5_done" => Some("done"), + "6_archived" => Some("archived"), + // Story 934, stage 4: `Stage::Frozen` no longer exists. Items that + // were previously frozen become orthogonal-flag-frozen: their stage + // register collapses to `backlog` (a safe "not progressing" default + // since the original resume_to payload was lost when the variant was + // dropped) and a separate write sets `frozen = true`. + "7_frozen" => Some("backlog"), + _ => None, + } +} + +/// Rewrite every pipeline item whose `stage` register still carries a pre-934 +/// directory-style string (`"2_current"`, `"4_merge"`, etc.) to the clean wire +/// vocabulary (`"coding"`, `"merge"`, etc.). +/// +/// Items that were at `"7_frozen"` additionally get the new `frozen` flag set +/// — the stage variant `Frozen` was dropped in story 934 stage 4 in favour of +/// an orthogonal CRDT register. +/// +/// One-time startup migration: items that have transitioned at least once +/// since story 934 stage 1 (which made writes emit clean form) are no-ops. +pub fn migrate_legacy_stage_strings() { + let Some(state_mutex) = get_crdt() else { + return; + }; + + // First pass: collect (index, clean_stage, set_frozen) for items that + // still carry legacy stage strings. + let migrations: Vec<(usize, &'static str, bool)> = { + let Ok(state) = state_mutex.lock() else { + return; + }; + state + .index + .iter() + .filter_map(|(_story_id, &idx)| { + let item = &state.crdt.doc.items[idx]; + let current = match item.stage.view() { + JsonValue::String(s) => s, + _ => return None, + }; + let clean = legacy_stage_to_clean(¤t)?; + let was_frozen = current == "7_frozen"; + Some((idx, clean, was_frozen)) + }) + .collect() + }; + + if migrations.is_empty() { + return; + } + + let Ok(mut state) = state_mutex.lock() else { + return; + }; + let count = migrations.len(); + let frozen_count = migrations.iter().filter(|(_, _, f)| *f).count(); + for (idx, clean, was_frozen) in migrations { + apply_and_persist(&mut state, |s| { + s.crdt.doc.items[idx].stage.set(clean.to_string()) + }); + if was_frozen { + apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].frozen.set(true)); + } + } + slog!( + "[crdt] Migrated {count} legacy stage strings to clean wire form \ + ({frozen_count} of which were '7_frozen' → backlog + frozen=true)" + ); +} diff --git a/server/src/crdt_state/write/mod.rs b/server/src/crdt_state/write/mod.rs index 98a1122d..e45ad42a 100644 --- a/server/src/crdt_state/write/mod.rs +++ b/server/src/crdt_state/write/mod.rs @@ -10,7 +10,13 @@ mod migrations; mod tests; pub use item::{ - bump_retry_count, set_agent, set_blocked, set_depends_on, set_epic, set_item_type, + bump_retry_count, set_agent, set_blocked, set_depends_on, set_epic, set_frozen, set_item_type, set_mergemaster_attempted, set_name, set_qa_mode, set_retry_count, set_review_hold, write_item, }; -pub use migrations::{migrate_names_from_slugs, migrate_story_ids_to_numeric, name_from_story_id}; + +#[cfg(test)] +pub use item::write_item_str; +pub use migrations::{ + migrate_legacy_stage_strings, migrate_names_from_slugs, migrate_story_ids_to_numeric, + name_from_story_id, +}; diff --git a/server/src/crdt_state/write/tests.rs b/server/src/crdt_state/write/tests.rs index aa8129ef..f1dfbe9d 100644 --- a/server/src/crdt_state/write/tests.rs +++ b/server/src/crdt_state/write/tests.rs @@ -90,7 +90,7 @@ fn numeric_id_from_slug_returns_none_for_non_numeric_prefix() { fn migrate_story_ids_to_numeric_rewrites_slug_ids() { init_for_test(); - write_item( + write_item_str( "42_story_my_feature", "1_backlog", Some("My Feature"), @@ -123,7 +123,7 @@ fn migrate_story_ids_to_numeric_rewrites_slug_ids() { fn migrate_story_ids_to_numeric_is_idempotent() { init_for_test(); - write_item( + write_item_str( "43", "1_backlog", Some("Already Numeric"), @@ -153,7 +153,7 @@ fn migrate_story_ids_to_numeric_skips_conflict() { init_for_test(); // Both the slug form AND its numeric target exist. - write_item( + write_item_str( "44_story_foo", "1_backlog", None, @@ -165,7 +165,7 @@ fn migrate_story_ids_to_numeric_skips_conflict() { None, None, ); - write_item( + write_item_str( "44", "2_current", None, @@ -200,7 +200,7 @@ fn migrate_story_ids_to_numeric_noop_when_crdt_not_initialised() { fn migrate_story_ids_to_numeric_preserves_stage_and_name() { init_for_test(); - write_item( + write_item_str( "45_bug_crash", "2_current", Some("Crash Bug"), @@ -216,7 +216,7 @@ fn migrate_story_ids_to_numeric_preserves_stage_and_name() { migrate_story_ids_to_numeric(); let item = read_item("45").expect("item must be accessible by numeric ID"); - assert_eq!(item.stage, "2_current"); + assert_eq!(item.stage, "coding"); assert_eq!(item.name.as_deref(), Some("Crash Bug")); assert_eq!(item.agent.as_deref(), Some("coder-1")); } @@ -226,7 +226,7 @@ fn migrate_names_from_slugs_fills_empty_names() { init_for_test(); // Write an item without a name. - write_item( + write_item_str( "42_story_my_feature", "1_backlog", None, @@ -261,7 +261,7 @@ fn migrate_names_from_slugs_fills_empty_names() { fn migrate_names_from_slugs_leaves_existing_names_unchanged() { init_for_test(); - write_item( + write_item_str( "43_story_named_item", "1_backlog", Some("Already Named"), @@ -299,7 +299,7 @@ fn set_depends_on_round_trip_and_clear() { use super::super::read::{check_unmet_deps_crdt, read_item}; init_for_test(); - write_item( + write_item_str( "872_test_target", "1_backlog", Some("Target"), @@ -355,7 +355,7 @@ fn set_depends_on_returns_false_for_unknown_story() { fn set_mergemaster_attempted_true_then_false_flips_register() { init_for_test(); - write_item( + write_item_str( "873_story_mergemaster_flip", "4_merge", None, @@ -411,7 +411,7 @@ fn set_mergemaster_attempted_returns_false_for_unknown_story() { fn set_agent_some_writes_name() { init_for_test(); - write_item( + write_item_str( "871_story_set_agent_write", "2_current", Some("Set Agent Write"), @@ -439,7 +439,7 @@ fn set_agent_some_writes_name() { fn set_agent_none_clears_register() { init_for_test(); - write_item( + write_item_str( "871_story_set_agent_clear", "2_current", Some("Set Agent Clear"), @@ -485,7 +485,7 @@ fn set_qa_mode_round_trip_server_then_human() { use crate::io::story_metadata::QaMode; init_for_test(); - write_item( + write_item_str( "869_story_qa_roundtrip", "1_backlog", None, @@ -541,7 +541,7 @@ fn set_qa_mode_returns_false_for_unknown_story() { #[test] fn bump_retry_count_increments_by_one() { init_for_test(); - write_item( + write_item_str( "9001_story_bump_test", "2_current", None, @@ -571,7 +571,7 @@ fn bump_retry_count_increments_by_one() { #[test] fn set_retry_count_resets_to_zero() { init_for_test(); - write_item( + write_item_str( "9002_story_set_test", "2_current", None, @@ -755,7 +755,7 @@ async fn tombstone_survives_concurrent_writes() { let story_id = "889_story_tombstone_concurrent"; - write_item( + write_item_str( story_id, "2_current", Some("Tombstone Concurrent Test"), @@ -777,7 +777,7 @@ async fn tombstone_survives_concurrent_writes() { let writer = tokio::task::spawn(async move { while !stop_clone.load(Ordering::Relaxed) { - write_item( + write_item_str( story_id, "2_current", Some("Tombstone Concurrent Test"), diff --git a/server/src/db/mod.rs b/server/src/db/mod.rs index 50a15fe8..915601c3 100644 --- a/server/src/db/mod.rs +++ b/server/src/db/mod.rs @@ -328,7 +328,7 @@ mod tests { write_item_with_content(story_id, "2_current", content, meta); let view = crate::crdt_state::read_item(story_id).expect("story exists in CRDT"); - assert_eq!(view.stage_str(), "2_current"); + assert_eq!(view.stage().as_dir(), "coding"); assert_eq!(view.name(), Some("Typed Name")); assert_eq!(view.agent(), Some("coder-1")); assert_eq!(view.retry_count(), 2); @@ -353,7 +353,7 @@ mod tests { write_item_with_content(story_id, "2_current", content, ItemMeta::default()); let view = crate::crdt_state::read_item(story_id).expect("story exists in CRDT"); - assert_eq!(view.stage_str(), "2_current"); + assert_eq!(view.stage().as_dir(), "coding"); assert_eq!( view.name(), None, @@ -406,7 +406,7 @@ mod tests { // Seed the story in 2_current with retry_count = 3 (a coder that // burned all its retries). - crate::crdt_state::write_item( + crate::crdt_state::write_item_str( story_id, "2_current", Some("Retry reset test"), @@ -433,7 +433,7 @@ mod tests { let typed_after = crate::pipeline_state::read_typed(story_id) .expect("read should succeed") .expect("story exists in CRDT"); - assert_eq!(typed_after.stage.dir_name(), "4_merge"); + assert_eq!(typed_after.stage.dir_name(), "merge"); assert_eq!( typed_after.retry_count, 0, "retry_count must reset to 0 on stage transition" diff --git a/server/src/db/ops.rs b/server/src/db/ops.rs index f1de730a..4e56c557 100644 --- a/server/src/db/ops.rs +++ b/server/src/db/ops.rs @@ -33,6 +33,33 @@ impl ItemMeta { } } +/// Normalise a stage string at the db boundary. +/// +/// Accepts the clean post-934 vocabulary (passthrough) and the pre-934 +/// directory-style strings (`"2_current"`, `"4_merge"`, etc.) by mapping +/// them to the clean form before handing off to `Stage::from_dir` (which +/// itself only accepts clean form after stage 6). This keeps the public +/// db API tolerant for callers that still pass legacy strings while the +/// internal type stays strict. +fn normalise_stage_str(stage: &str) -> &str { + match stage { + "0_upcoming" => "upcoming", + "1_backlog" => "backlog", + "2_current" => "coding", + "2_blocked" => "blocked", + "3_qa" => "qa", + "4_merge" => "merge", + "4_merge_failure" => "merge_failure", + "5_done" => "done", + "6_archived" => "archived", + // `7_frozen` has no direct clean equivalent (the variant was + // removed in story 934 stage 4). Returning the unmapped string + // makes `Stage::from_dir` return None, so the write is logged and + // skipped — frozen items should be seeded via the `frozen` flag. + other => other, + } +} + /// Write a pipeline item from in-memory content (no filesystem access). /// /// This is the primary write path for the DB-backed pipeline. It updates @@ -52,16 +79,18 @@ pub fn write_item_with_content(story_id: &str, stage: &str, content: &str, meta: write_content(story_id, content); // Primary: CRDT ops. - let merged_at_ts = if crate::pipeline_state::Stage::from_dir(stage) - .is_some_and(|s| matches!(s, crate::pipeline_state::Stage::Done { .. })) - { - Some(chrono::Utc::now().timestamp() as f64) - } else { - None + let stage = normalise_stage_str(stage); + let Some(typed_stage) = crate::pipeline_state::Stage::from_dir(stage) else { + crate::slog!( + "[db] write_item_with_content: unknown stage '{stage}' for {story_id}; skipping CRDT write" + ); + return; }; + let merged_at_ts = matches!(typed_stage, crate::pipeline_state::Stage::Done { .. }) + .then(|| chrono::Utc::now().timestamp() as f64); crate::crdt_state::write_item( story_id, - stage, + &typed_stage, meta.name.as_deref(), meta.agent.as_deref(), meta.retry_count, @@ -114,16 +143,18 @@ pub fn move_item_stage( // CRDT typed registers — no need to re-derive it from the content body's // YAML front matter on every stage transition. Pass `None` for those // fields so write_item leaves the existing registers untouched. - let merged_at_ts = if crate::pipeline_state::Stage::from_dir(new_stage) - .is_some_and(|s| matches!(s, crate::pipeline_state::Stage::Done { .. })) - { - Some(chrono::Utc::now().timestamp() as f64) - } else { - None + let new_stage = normalise_stage_str(new_stage); + let Some(typed_stage) = crate::pipeline_state::Stage::from_dir(new_stage) else { + crate::slog!( + "[db] move_item_stage: unknown stage '{new_stage}' for {story_id}; skipping CRDT write" + ); + return; }; + let merged_at_ts = matches!(typed_stage, crate::pipeline_state::Stage::Done { .. }) + .then(|| chrono::Utc::now().timestamp() as f64); crate::crdt_state::write_item( story_id, - new_stage, + &typed_stage, None, None, None, diff --git a/server/src/http/agents/tests.rs b/server/src/http/agents/tests.rs index 6e8017c5..e1fcf885 100644 --- a/server/src/http/agents/tests.rs +++ b/server/src/http/agents/tests.rs @@ -275,7 +275,7 @@ async fn get_work_item_content_returns_content_from_backlog() { ) .unwrap(); // Story 929: name lives in the typed CRDT register, not in YAML on disk. - crate::crdt_state::write_item( + crate::crdt_state::write_item_str( "42_story_foo", "1_backlog", Some("Foo Story"), @@ -310,7 +310,7 @@ async fn get_work_item_content_returns_content_from_current() { "---\nname: \"Bar Story\"\n---\n\nBar content.", ) .unwrap(); - crate::crdt_state::write_item( + crate::crdt_state::write_item_str( "43_story_bar", "2_current", Some("Bar Story"), diff --git a/server/src/http/mcp/merge_tools.rs b/server/src/http/mcp/merge_tools.rs index c7d18d01..4a9983c9 100644 --- a/server/src/http/mcp/merge_tools.rs +++ b/server/src/http/mcp/merge_tools.rs @@ -17,21 +17,18 @@ pub(super) async fn tool_merge_agent_work( // Check CRDT stage before attempting merge — if already done or archived, // return success immediately to avoid spurious error notifications. if let Some(item) = crate::crdt_state::read_item(story_id) - && crate::pipeline_state::Stage::from_dir(item.stage_str()).is_some_and(|s| { - matches!( - s, - crate::pipeline_state::Stage::Done { .. } - | crate::pipeline_state::Stage::Archived { .. } - ) - }) + && matches!( + item.stage(), + crate::crdt_state::Stage::Done | crate::crdt_state::Stage::Archived + ) { + let stage_name = item.stage().as_dir().to_string(); return serde_json::to_string_pretty(&json!({ "story_id": story_id, "status": "completed", "success": true, "message": format!( - "Story '{}' is already in '{}' — no merge needed.", - story_id, item.stage_str() + "Story '{story_id}' is already in '{stage_name}' — no merge needed.", ), })) .map_err(|e| format!("Serialization error: {e}")); @@ -283,7 +280,7 @@ mod tests { #[tokio::test] async fn tool_merge_agent_work_already_done_returns_success() { crate::crdt_state::init_for_test(); - crate::crdt_state::write_item( + crate::crdt_state::write_item_str( "99_story_already_done", "5_done", Some("Already done story"), @@ -304,13 +301,13 @@ mod tests { let v: serde_json::Value = serde_json::from_str(&body).unwrap(); assert_eq!(v["status"], "completed"); assert_eq!(v["success"], true); - assert!(v["message"].as_str().unwrap().contains("5_done")); + assert!(v["message"].as_str().unwrap().contains("done")); } #[tokio::test] async fn tool_merge_agent_work_already_archived_returns_success() { crate::crdt_state::init_for_test(); - crate::crdt_state::write_item( + crate::crdt_state::write_item_str( "98_story_already_archived", "6_archived", Some("Already archived story"), @@ -331,7 +328,7 @@ mod tests { let v: serde_json::Value = serde_json::from_str(&body).unwrap(); assert_eq!(v["status"], "completed"); assert_eq!(v["success"], true); - assert!(v["message"].as_str().unwrap().contains("6_archived")); + assert!(v["message"].as_str().unwrap().contains("archived")); } #[tokio::test] diff --git a/server/src/http/mcp/status_tools.rs b/server/src/http/mcp/status_tools.rs index 07722072..694c7dc3 100644 --- a/server/src/http/mcp/status_tools.rs +++ b/server/src/http/mcp/status_tools.rs @@ -158,16 +158,16 @@ pub(super) async fn tool_status(args: &Value, ctx: &AppContext) -> Result Result "backlog", - Stage::Coding => "current", - Stage::Qa => "qa", - Stage::Merge { .. } => "merge", - Stage::Done { .. } => "done", - Stage::Archived { .. } => "archived", - Stage::MergeFailure { .. } => "merge_failure", - Stage::Frozen { .. } => "frozen", - Stage::Blocked { .. } => "blocked", + // Frozen is now an orthogonal CRDT flag (story 934, stage 4). + let stage_name = if member_view.frozen() { + "frozen" + } else { + match &item.stage { + Stage::Upcoming | Stage::Backlog => "backlog", + Stage::Coding => "current", + Stage::Qa => "qa", + Stage::Merge { .. } => "merge", + Stage::Done { .. } => "done", + Stage::Archived { .. } => "archived", + Stage::MergeFailure { .. } => "merge_failure", + Stage::Blocked { .. } => "blocked", + } }; member_items.push(json!({ "story_id": sid, diff --git a/server/src/http/mcp/story_tools/story/freeze.rs b/server/src/http/mcp/story_tools/story/freeze.rs index 7559c34a..db295f06 100644 --- a/server/src/http/mcp/story_tools/story/freeze.rs +++ b/server/src/http/mcp/story_tools/story/freeze.rs @@ -75,10 +75,7 @@ mod tests { let item = crate::pipeline_state::read_typed(story_id) .expect("read_typed should succeed") .expect("item should be present"); - assert!( - item.stage.is_frozen(), - "stage should be frozen after MCP freeze" - ); + assert!(item.is_frozen(), "stage should be frozen after MCP freeze"); } #[test] @@ -109,7 +106,7 @@ mod tests { .expect("read_typed should succeed") .expect("item should be present"); assert!( - !item.stage.is_frozen(), + !item.is_frozen(), "stage should not be frozen after MCP unfreeze" ); } diff --git a/server/src/http/mcp/story_tools/story/update.rs b/server/src/http/mcp/story_tools/story/update.rs index 90e5e3d3..8f3a447c 100644 --- a/server/src/http/mcp/story_tools/story/update.rs +++ b/server/src/http/mcp/story_tools/story/update.rs @@ -71,12 +71,23 @@ pub(crate) fn tool_update_story(args: &Value, ctx: &AppContext) -> Result { - if let Some(b) = value.as_bool() { - crate::crdt_state::set_blocked(story_id, b); - } else if value.as_str() == Some("true") { - crate::crdt_state::set_blocked(story_id, true); - } else if value.as_str() == Some("false") { - crate::crdt_state::set_blocked(story_id, false); + // Story 934, stage 5: blocked is now a stage transition, + // not a raw register write. Route through the state + // machine so invalid sources (Done/Archived/Upcoming) are + // rejected and downstream subscribers see a TransitionFired. + let want_blocked = match value { + Value::Bool(b) => Some(*b), + Value::String(s) if s == "true" => Some(true), + Value::String(s) if s == "false" => Some(false), + _ => None, + }; + if let Some(true) = want_blocked { + crate::agents::lifecycle::transition_to_blocked( + story_id, + "Set via update_story", + )?; + } else if let Some(false) = want_blocked { + crate::agents::lifecycle::transition_to_unblocked(story_id)?; } } "review_hold" => { @@ -88,6 +99,24 @@ pub(crate) fn tool_update_story(args: &Value, ctx: &AppContext) -> Result { + // Story 934, stage 4: frozen is an orthogonal CRDT flag. + // Route through the state-machine API so callers see the + // canonical NotFound behaviour if the story is missing. + let want_frozen = match value { + Value::Bool(b) => Some(*b), + Value::String(s) if s == "true" => Some(true), + Value::String(s) if s == "false" => Some(false), + _ => None, + }; + match want_frozen { + Some(true) => crate::pipeline_state::transition_to_frozen(story_id) + .map_err(|e| e.to_string())?, + Some(false) => crate::pipeline_state::transition_to_unfrozen(story_id) + .map_err(|e| e.to_string())?, + None => {} + } + } "retry_count" => { let n = value .as_i64() @@ -105,7 +134,8 @@ pub(crate) fn tool_update_story(args: &Value, ctx: &AppContext) -> Result Result { }, epic_id, }; + // Frozen items (CRDT flag) are routed to the backlog bucket regardless + // of their underlying stage — they're paused, not progressing. + if item.is_frozen() { + state.backlog.push(story); + continue; + } match &item.stage { Stage::Upcoming => state.backlog.push(story), // upcoming shown with backlog Stage::Backlog => state.backlog.push(story), @@ -147,7 +153,6 @@ pub fn load_pipeline_state(ctx: &AppContext) -> Result { Stage::MergeFailure { .. } => state.merge.push(story), // show merge failures with merge Stage::Done { .. } => state.done.push(story), Stage::Archived { .. } => {} // skip archived - Stage::Frozen { .. } => state.backlog.push(story), // show frozen with backlog } } diff --git a/server/src/io/story_metadata/parser.rs b/server/src/io/story_metadata/parser.rs index 204fdca9..21b32ddb 100644 --- a/server/src/io/story_metadata/parser.rs +++ b/server/src/io/story_metadata/parser.rs @@ -29,15 +29,14 @@ pub fn resolve_qa_mode(story_id: &str, default: QaMode) -> QaMode { .unwrap_or(default) } -/// Return `true` if the story is in the `Frozen` pipeline stage. +/// Return `true` if the story's `frozen` CRDT flag is set (story 934, stage 4). /// -/// Checks the typed CRDT stage via `read_typed`. Used by the pipeline advance -/// code to suppress stage transitions for frozen stories. +/// Used by the pipeline advance code to suppress stage transitions for frozen +/// stories. `frozen` is orthogonal to [`Stage`]: a frozen story still has its +/// stage register intact but is paused until unfrozen. pub fn is_story_frozen_in_store(story_id: &str) -> bool { - crate::pipeline_state::read_typed(story_id) - .ok() - .flatten() - .map(|item| item.stage.is_frozen()) + crate::crdt_state::read_item(story_id) + .map(|view| view.frozen()) .unwrap_or(false) } diff --git a/server/src/io/watcher/mod.rs b/server/src/io/watcher/mod.rs index c4da8209..96bbcd2d 100644 --- a/server/src/io/watcher/mod.rs +++ b/server/src/io/watcher/mod.rs @@ -60,7 +60,6 @@ pub fn stage_metadata(stage: &str, item_id: &str) -> Option<(&'static str, Strin } Stage::Done { .. } => ("done", format!("huskies: done {item_id}")), Stage::Archived { .. } => ("accept", format!("huskies: accept {item_id}")), - Stage::Frozen { .. } => ("freeze", format!("huskies: freeze {item_id}")), }; Some((action, msg)) } diff --git a/server/src/io/watcher/sweep.rs b/server/src/io/watcher/sweep.rs index e3136510..b8a7f0bd 100644 --- a/server/src/io/watcher/sweep.rs +++ b/server/src/io/watcher/sweep.rs @@ -1,18 +1,21 @@ -//! Periodic sweep of completed work items from `5_done` to `6_archived`. +//! Periodic sweep of completed work items from `done` to `archived`. //! -//! Items that have been in `5_done` for longer than the configured retention -//! period are automatically promoted to `6_archived` via CRDT state transitions. +//! Items in `Stage::Done` whose `merged_at` timestamp exceeds the configured +//! retention duration are promoted to `Stage::Archived` via the canonical +//! pipeline state machine (story 934, stage 5). use crate::slog; use std::time::Duration; -/// Sweep items in `5_done` whose `merged_at` timestamp exceeds the retention -/// duration to `6_archived` via CRDT state transitions. +/// Sweep items in `Stage::Done` whose `merged_at` timestamp exceeds the +/// retention duration to `Stage::Archived` via the typed transition table. /// -/// All state is read from and written to CRDT — no filesystem access. -/// Worktree pruning is handled separately by the CRDT event subscriber. +/// Routes through [`crate::pipeline_state::apply_transition`] so the +/// `Done + Accepted → Archived` transition is validated and a +/// `TransitionFired` event is emitted to subscribers (worktree pruning, +/// matrix notifier, etc.). pub(crate) fn sweep_done_to_archived(done_retention: Duration) { - use crate::pipeline_state::{PipelineEvent, Stage, read_all_typed, stage_dir_name, transition}; + use crate::pipeline_state::{PipelineEvent, Stage, apply_transition, read_all_typed}; for item in read_all_typed() { if let Stage::Done { merged_at, .. } = &item.stage { @@ -22,24 +25,10 @@ pub(crate) fn sweep_done_to_archived(done_retention: Duration) { .unwrap_or_default(); if age >= done_retention { let story_id = item.story_id.0.clone(); - match transition(item.stage.clone(), PipelineEvent::Accepted) { - Ok(new_stage) => { - crate::crdt_state::write_item( - &story_id, - stage_dir_name(&new_stage), - None, - None, - None, - Some(false), - None, - None, - None, - None, - ); - slog!("[watcher] sweep: promoted {story_id} → 6_archived/"); - } + match apply_transition(&story_id, PipelineEvent::Accepted, None) { + Ok(_) => slog!("[watcher] sweep: promoted {story_id} → archived"), Err(e) => { - slog!("[watcher] sweep: transition error for {story_id}: {e}"); + slog!("[watcher] sweep: transition error for {story_id}: {e}") } } } diff --git a/server/src/io/watcher/tests.rs b/server/src/io/watcher/tests.rs index 518d19d8..644674ca 100644 --- a/server/src/io/watcher/tests.rs +++ b/server/src/io/watcher/tests.rs @@ -51,15 +51,15 @@ fn is_config_file_rejects_wrong_root() { #[test] fn stage_metadata_returns_correct_actions() { - let (action, msg) = stage_metadata("2_current", "42_story_foo").unwrap(); + let (action, msg) = stage_metadata("coding", "42_story_foo").unwrap(); assert_eq!(action, "start"); assert_eq!(msg, "huskies: start 42_story_foo"); - let (action, msg) = stage_metadata("5_done", "42_story_foo").unwrap(); + let (action, msg) = stage_metadata("done", "42_story_foo").unwrap(); assert_eq!(action, "done"); assert_eq!(msg, "huskies: done 42_story_foo"); - let (action, msg) = stage_metadata("6_archived", "42_story_foo").unwrap(); + let (action, msg) = stage_metadata("archived", "42_story_foo").unwrap(); assert_eq!(action, "accept"); assert_eq!(msg, "huskies: accept 42_story_foo"); @@ -157,7 +157,7 @@ fn sweep_uses_crdt_merged_at_not_utc_now() { let ten_seconds_ago = (chrono::Utc::now() - chrono::Duration::seconds(10)).timestamp() as f64; // Write item in 5_done with an explicit past merged_at timestamp. - crate::crdt_state::write_item( + crate::crdt_state::write_item_str( "9883_story_sweep_merged_at", "5_done", Some("merged_at test"), @@ -190,7 +190,7 @@ fn sweep_keeps_item_newer_than_retention() { let one_second_ago = (chrono::Utc::now() - chrono::Duration::seconds(1)).timestamp() as f64; - crate::crdt_state::write_item( + crate::crdt_state::write_item_str( "9884_story_sweep_recent", "5_done", Some("recent merged_at test"), diff --git a/server/src/pipeline_state/apply.rs b/server/src/pipeline_state/apply.rs index 429d4d5a..4ebbbe48 100644 --- a/server/src/pipeline_state/apply.rs +++ b/server/src/pipeline_state/apply.rs @@ -98,20 +98,33 @@ pub fn apply_transition_str( apply_transition(story_id, event, content_transform).map_err(|e| e.to_string()) } -/// Freeze a story at its current stage. +/// Freeze a story. /// -/// Story 929: the YAML write of `resume_to_stage` is gone; the projection -/// layer no longer reads it (defaults to Coding). Story 934 will make -/// frozen a flag orthogonal to Stage, so the story stays in its current -/// Stage rather than encoding a "where to resume" payload — at which point -/// the read-side default also becomes moot. -pub fn transition_to_frozen(story_id: &str) -> Result { - apply_transition(story_id, PipelineEvent::Freeze, None) +/// Story 934, stage 4: `frozen` is now a CRDT flag orthogonal to [`Stage`], +/// so the story stays at its current stage and only the boolean register +/// changes. Returns `Err(NotFound)` if no item exists for `story_id`. +pub fn transition_to_frozen(story_id: &str) -> Result<(), ApplyError> { + if read_typed(story_id)?.is_none() { + return Err(ApplyError::NotFound(story_id.to_string())); + } + crate::crdt_state::set_frozen(story_id, true); + crate::slog!("[pipeline/transition] #{}: Freeze (flag set)", story_id); + Ok(()) } /// Unfreeze a story. /// -/// Story 929: paired with `transition_to_frozen`, no longer touches YAML. -pub fn transition_to_unfrozen(story_id: &str) -> Result { - apply_transition(story_id, PipelineEvent::Unfreeze, None) +/// Story 934, stage 4: paired with [`transition_to_frozen`]; clears the +/// CRDT `frozen` flag without touching the stage register. Returns +/// `Err(NotFound)` if no item exists for `story_id`. +pub fn transition_to_unfrozen(story_id: &str) -> Result<(), ApplyError> { + if read_typed(story_id)?.is_none() { + return Err(ApplyError::NotFound(story_id.to_string())); + } + crate::crdt_state::set_frozen(story_id, false); + crate::slog!( + "[pipeline/transition] #{}: Unfreeze (flag cleared)", + story_id + ); + Ok(()) } diff --git a/server/src/pipeline_state/mod.rs b/server/src/pipeline_state/mod.rs index 22313a09..5854fa93 100644 --- a/server/src/pipeline_state/mod.rs +++ b/server/src/pipeline_state/mod.rs @@ -19,8 +19,11 @@ //! - [`projection`] — CRDT → typed projection layer (`read_typed`, `read_all_typed`) //! - [`subscribers`] — concrete subscriber stubs -// Some items are exercised by tests or used only in non-active code paths; -// the dead_code lint is suppressed for the module. +// Scaffolding types (AgentName, NodePubkey, ExecutionState, ExecutionEvent, +// execution_transition, apply_transition_str, to_crdt_fields, is_upcoming, +// MissingField/InvalidField) exist for stages 2–5 of story 934 and the +// per-node execution-state work; they are only exercised by tests today. +// Revisit and drop the allow once those stages land. #![allow(dead_code)] mod apply; diff --git a/server/src/pipeline_state/projection.rs b/server/src/pipeline_state/projection.rs index e69cdf34..207e7473 100644 --- a/server/src/pipeline_state/projection.rs +++ b/server/src/pipeline_state/projection.rs @@ -1,15 +1,12 @@ //! Projection layer — converts loose CRDT views into typed `PipelineItem` enums. -#![allow(unused_imports, dead_code)] use chrono::{DateTime, Utc}; use std::fmt; use std::num::NonZeroU32; -use crate::crdt_state::{PipelineItemView, read_all_items, read_item}; +use crate::crdt_state::PipelineItemView; -use super::{ - ArchiveReason, BranchName, ExecutionState, GitSha, PipelineItem, Stage, StoryId, stage_dir_name, -}; +use super::{ArchiveReason, BranchName, GitSha, PipelineItem, Stage, StoryId, stage_dir_name}; /// Errors from projecting loose CRDT data into typed enums. #[derive(Debug, Clone, PartialEq, Eq)] @@ -61,27 +58,33 @@ impl TryFrom<&PipelineItemView> for PipelineItem { stage, depends_on, retry_count, + frozen: view.frozen(), }) } } -/// Project the stage string + associated fields from a WorkItem into -/// a typed Stage enum. This is the one carefully-controlled boundary where -/// loose CRDT data becomes typed. +/// Project the typed low-level [`crdt_state::Stage`] plus the view's +/// associated fields into a rich [`Stage`] with payload defaults. +/// +/// This is the one carefully-controlled boundary where the CRDT's +/// stringly-typed stage register gains payload fields (merge metadata, +/// archive reason, etc.) synthesised from sibling registers and sane +/// defaults. Unknown stage strings (forward-compat aliases) surface as +/// [`ProjectionError::UnknownStage`]. pub fn project_stage(view: &PipelineItemView) -> Result { - match view.stage_str() { - "0_upcoming" => Ok(Stage::Upcoming), - "1_backlog" => Ok(Stage::Backlog), - "2_blocked" => Ok(Stage::Blocked { + use crate::crdt_state::Stage as LowStage; + match view.stage() { + LowStage::Upcoming => Ok(Stage::Upcoming), + LowStage::Backlog => Ok(Stage::Backlog), + LowStage::Blocked => Ok(Stage::Blocked { reason: String::new(), }), - "2_current" => Ok(Stage::Coding), - "3_qa" => Ok(Stage::Qa), - "4_merge" => { + LowStage::Coding => Ok(Stage::Coding), + LowStage::Qa => Ok(Stage::Qa), + LowStage::Merge => { // Merge stage in the current CRDT doesn't carry feature_branch or // commits_ahead — those are computed at transition time. For // projection from existing CRDT data, we synthesize defaults. - // The feature branch follows the naming convention. let branch = format!("feature/story-{}", view.story_id()); // Existing CRDT data doesn't track commits_ahead, so we use 1 as // a safe non-zero default (the item is in merge, so there must be @@ -91,19 +94,19 @@ pub fn project_stage(view: &PipelineItemView) -> Result commits_ahead: NonZeroU32::new(1).expect("1 is non-zero"), }) } - "4_merge_failure" => { - // The reason is persisted in front-matter (merge_failure: "...") but - // is not part of the raw CRDT view; the projection uses an empty - // string here. Consumers that need the reason should read content. + LowStage::MergeFailure => { + // The reason is persisted in the content body but is not part of + // the raw CRDT view; the projection uses an empty string here. + // Consumers that need the reason should read content directly. Ok(Stage::MergeFailure { reason: String::new(), }) } - "5_done" => { + LowStage::Done => { // Use the stored merged_at timestamp if present. Legacy items // that pre-date this field have merged_at = None, so we fall back // to UNIX_EPOCH, which makes them older than any retention window - // and therefore eligible for immediate sweep to 6_archived. + // and therefore eligible for immediate sweep to archived. let merged_at = view .merged_at() .map(|ts| { @@ -115,14 +118,12 @@ pub fn project_stage(view: &PipelineItemView) -> Result merge_commit: GitSha("legacy".to_string()), }) } - "6_archived" => { - // Determine the archive reason from the CRDT fields. + LowStage::Archived => { let reason = if view.blocked() { ArchiveReason::Blocked { reason: "migrated from legacy blocked field".to_string(), } } else { - // Default to Completed for legacy archived items. ArchiveReason::Completed }; Ok(Stage::Archived { @@ -130,16 +131,7 @@ pub fn project_stage(view: &PipelineItemView) -> Result reason, }) } - "7_frozen" => { - // Story 929: resume_to was previously read from YAML front matter; - // we default to Coding here. Story 934 will obviate this — frozen - // becomes a flag orthogonal to Stage, so the story stays in its - // current Stage rather than encoding a "where to go next" payload. - Ok(Stage::Frozen { - resume_to: Box::new(Stage::Coding), - }) - } - other => Err(ProjectionError::UnknownStage(other.to_string())), + LowStage::Unknown(s) => Err(ProjectionError::UnknownStage(s)), } } @@ -158,7 +150,6 @@ impl PipelineItem { .. } ); - // Frozen stories map to "7_frozen"; they are not "blocked" in the CRDT sense. (dir, blocked) } } @@ -199,7 +190,6 @@ pub fn read_typed(story_id: &str) -> Result, ProjectionErro #[cfg(test)] mod tests { use super::*; - use chrono::TimeZone; use std::num::NonZeroU32; fn nz(n: u32) -> NonZeroU32 { @@ -232,12 +222,13 @@ mod tests { None, None, None, + None, ) } #[test] fn project_upcoming_item() { - let view = make_view("42_story_test", "0_upcoming", Some("Test Story")); + let view = make_view("42_story_test", "upcoming", Some("Test Story")); let item = PipelineItem::try_from(&view).unwrap(); assert!(matches!(item.stage, Stage::Upcoming)); } @@ -246,7 +237,7 @@ mod tests { fn project_backlog_item() { let view = PipelineItemView::for_test( "42_story_test", - "1_backlog", + "backlog", Some("Test Story".to_string()), None, None, @@ -260,6 +251,7 @@ mod tests { None, None, None, + None, ); let item = PipelineItem::try_from(&view).unwrap(); assert_eq!(item.story_id, StoryId("42_story_test".to_string())); @@ -273,7 +265,7 @@ mod tests { fn project_current_item() { let view = PipelineItemView::for_test( "42_story_test", - "2_current", + "coding", Some("Test".to_string()), Some("coder-1".to_string()), Some(2), @@ -287,6 +279,7 @@ mod tests { None, None, None, + None, ); let item = PipelineItem::try_from(&view).unwrap(); assert!(matches!(item.stage, Stage::Coding)); @@ -295,7 +288,7 @@ mod tests { #[test] fn project_merge_item() { - let view = make_view("42_story_test", "4_merge", Some("Test")); + let view = make_view("42_story_test", "merge", Some("Test")); let item = PipelineItem::try_from(&view).unwrap(); assert!(matches!(item.stage, Stage::Merge { .. })); if let Stage::Merge { @@ -310,7 +303,7 @@ mod tests { #[test] fn project_blocked_item() { - let view = make_view("42_story_test", "2_blocked", Some("Test")); + let view = make_view("42_story_test", "blocked", Some("Test")); let item = PipelineItem::try_from(&view).unwrap(); assert!(matches!(item.stage, Stage::Blocked { .. })); } @@ -319,7 +312,7 @@ mod tests { fn project_archived_blocked_item() { let view = PipelineItemView::for_test( "42_story_test", - "6_archived", + "archived", Some("Test".to_string()), None, None, @@ -333,6 +326,7 @@ mod tests { None, None, None, + None, ); let item = PipelineItem::try_from(&view).unwrap(); assert!(matches!( @@ -348,7 +342,7 @@ mod tests { fn project_archived_completed_item() { let view = PipelineItemView::for_test( "42_story_test", - "6_archived", + "archived", Some("Test".to_string()), None, None, @@ -362,6 +356,7 @@ mod tests { None, None, None, + None, ); let item = PipelineItem::try_from(&view).unwrap(); assert!(matches!( @@ -388,23 +383,23 @@ mod tests { #[test] fn reverse_projection_stage_dirs() { let cases: Vec<(Stage, &str, bool)> = vec![ - (Stage::Upcoming, "0_upcoming", false), - (Stage::Backlog, "1_backlog", false), - (Stage::Coding, "2_current", false), + (Stage::Upcoming, "upcoming", false), + (Stage::Backlog, "backlog", false), + (Stage::Coding, "coding", false), ( Stage::Blocked { reason: "stuck".into(), }, - "2_blocked", + "blocked", true, ), - (Stage::Qa, "3_qa", false), + (Stage::Qa, "qa", false), ( Stage::Merge { feature_branch: fb("f"), commits_ahead: nz(1), }, - "4_merge", + "merge", false, ), ( @@ -412,7 +407,7 @@ mod tests { merged_at: Utc::now(), merge_commit: sha("abc"), }, - "5_done", + "done", false, ), ( @@ -420,7 +415,7 @@ mod tests { archived_at: Utc::now(), reason: ArchiveReason::Completed, }, - "6_archived", + "archived", false, ), ( @@ -430,7 +425,7 @@ mod tests { reason: "stuck".into(), }, }, - "6_archived", + "archived", true, ), ]; @@ -442,6 +437,7 @@ mod tests { stage, depends_on: vec![], retry_count: 0, + frozen: false, }; let (dir, blocked) = item.to_crdt_fields(); assert_eq!(dir, expected_dir); diff --git a/server/src/pipeline_state/tests.rs b/server/src/pipeline_state/tests.rs index 8ef27a8f..add9fe1e 100644 --- a/server/src/pipeline_state/tests.rs +++ b/server/src/pipeline_state/tests.rs @@ -571,129 +571,51 @@ fn cannot_reject_from_archived() { )); } -// ── Freeze / Unfreeze ─────────────────────────────────────────────── +// ── Freeze / Unfreeze (story 934, stage 4: orthogonal flag) ──────────────── +/// Freeze sets the `frozen` flag without changing the stage register. +/// Unfreeze clears the flag — the stage was never touched so there's nothing +/// to "restore". Tests the freeze/unfreeze API on the apply layer, since +/// freeze/unfreeze are no longer pure stage transitions. #[test] -fn freeze_from_active_stages() { - for s in [Stage::Upcoming, Stage::Backlog, Stage::Coding, Stage::Qa] { - let result = transition(s.clone(), PipelineEvent::Freeze).unwrap(); - assert!( - matches!(result, Stage::Frozen { .. }), - "expected Frozen from {s:?}" - ); - if let Stage::Frozen { resume_to } = result { - assert_eq!(*resume_to, s); - } - } -} - -#[test] -fn freeze_from_merge() { - let m = Stage::Merge { - feature_branch: fb("f"), - commits_ahead: nz(1), - }; - let result = transition(m.clone(), PipelineEvent::Freeze).unwrap(); - assert!(matches!(result, Stage::Frozen { .. })); - if let Stage::Frozen { resume_to } = result { - assert_eq!(*resume_to, m); - } -} - -#[test] -fn unfreeze_restores_prior_stage() { - let prior = Stage::Coding; - let frozen = Stage::Frozen { - resume_to: Box::new(prior.clone()), - }; - let result = transition(frozen, PipelineEvent::Unfreeze).unwrap(); - assert_eq!(result, prior); -} - -#[test] -fn cannot_freeze_done() { - let s = Stage::Done { - merged_at: chrono::Utc::now(), - merge_commit: sha("abc"), - }; - let result = transition(s, PipelineEvent::Freeze); - assert!(matches!( - result, - Err(TransitionError::InvalidTransition { .. }) - )); -} - -#[test] -fn cannot_freeze_archived() { - let s = Stage::Archived { - archived_at: chrono::Utc::now(), - reason: ArchiveReason::Completed, - }; - let result = transition(s, PipelineEvent::Freeze); - assert!(matches!( - result, - Err(TransitionError::InvalidTransition { .. }) - )); -} - -#[test] -fn cannot_unfreeze_coding() { - let result = transition(Stage::Coding, PipelineEvent::Unfreeze); - assert!(matches!( - result, - Err(TransitionError::InvalidTransition { .. }) - )); -} - -/// Regression test: freeze → unfreeze round-trip via `apply_transition`. -/// Verifies that the CRDT shows the correct prior stage restored. -#[test] -fn regression_freeze_unfreeze_restores_crdt_stage() { +fn freeze_sets_flag_without_changing_stage() { crate::crdt_state::init_for_test(); crate::db::ensure_content_store(); let story_id = "9950_story_freeze_regression"; - let content = "---\nname: Freeze Regression\n---\n# Story\n"; crate::db::write_item_with_content( story_id, "2_current", - content, + "---\nname: Freeze Regression\n---\n# Story\n", crate::db::ItemMeta::named("Freeze Regression"), ); - // Confirm starting stage. let item = read_typed(story_id).unwrap().unwrap(); - assert!( - matches!(item.stage, Stage::Coding), - "should start at Coding" - ); + assert!(matches!(item.stage, Stage::Coding)); + assert!(!item.is_frozen()); - // Freeze. super::apply::transition_to_frozen(story_id).expect("freeze should succeed"); let item = read_typed(story_id).unwrap().unwrap(); assert!( - matches!(item.stage, Stage::Frozen { .. }), - "should be Frozen after freeze: {:?}", + matches!(item.stage, Stage::Coding), + "stage register stays at Coding after freeze: {:?}", item.stage ); - if let Stage::Frozen { ref resume_to } = item.stage { - assert!( - matches!(**resume_to, Stage::Coding), - "resume_to should be Coding: {:?}", - resume_to - ); - } + assert!(item.is_frozen(), "frozen flag should be set after freeze"); - // Unfreeze. super::apply::transition_to_unfrozen(story_id).expect("unfreeze should succeed"); let item = read_typed(story_id).unwrap().unwrap(); assert!( matches!(item.stage, Stage::Coding), - "should be restored to Coding after unfreeze: {:?}", + "stage register still at Coding after unfreeze: {:?}", item.stage ); + assert!( + !item.is_frozen(), + "frozen flag should be cleared after unfreeze" + ); } // ── Story 868: MergeFailure regression ───────────────────────────── @@ -745,7 +667,7 @@ fn merge_failure_transition_emits_event_with_full_reason() { .expect("item should exist"); assert_eq!( item.stage.dir_name(), - "4_merge_failure", + "merge_failure", "CRDT stage should be 4_merge_failure" ); } @@ -781,7 +703,7 @@ fn repeated_merge_failure_apply_transition_no_error_no_duplicate_notification() let story_id = "99913_story_merge_failure_selfloop"; crate::db::write_item_with_content( story_id, - "4_merge_failure", + "merge_failure", "---\nname: MergeFailure Self-loop Test\n---\n# Story\n", crate::db::ItemMeta::named("MergeFailure Self-loop Test"), ); @@ -809,14 +731,14 @@ fn repeated_merge_failure_apply_transition_no_error_no_duplicate_notification() fired.after ); - // Verify the CRDT stage is still 4_merge_failure. + // Verify the CRDT stage is still merge_failure. let item = read_typed(story_id) .expect("CRDT read should succeed") .expect("item should still exist"); assert_eq!( item.stage.dir_name(), - "4_merge_failure", - "CRDT stage should remain 4_merge_failure after self-loop" + "merge_failure", + "CRDT stage should remain merge_failure after self-loop" ); // Simulate the caller's de-dup logic: since fired.before is already MergeFailure, @@ -854,7 +776,7 @@ fn merge_failure_accept_moves_to_done_via_crdt() { let story_id = "99892_story_merge_failure_accept"; crate::db::write_item_with_content( story_id, - "4_merge_failure", + "merge_failure", "---\nname: MergeFailure Accept Test\n---\n# Story\n", crate::db::ItemMeta::named("MergeFailure Accept Test"), ); @@ -883,14 +805,14 @@ fn merge_failure_accept_moves_to_done_via_crdt() { fired.event ); - // CRDT reflects 5_done. + // CRDT reflects done. let item = read_typed(story_id) .expect("CRDT read should succeed") .expect("item should exist"); assert_eq!( item.stage.dir_name(), - "5_done", - "CRDT stage should be 5_done after MergeFailure + Accepted" + "done", + "CRDT stage should be done after MergeFailure + Accepted" ); } diff --git a/server/src/pipeline_state/transition.rs b/server/src/pipeline_state/transition.rs index 79095fcf..34cb340b 100644 --- a/server/src/pipeline_state/transition.rs +++ b/server/src/pipeline_state/transition.rs @@ -59,10 +59,6 @@ pub enum PipelineEvent { Close, /// Manual demotion back to backlog from an active stage. Demote, - /// Freeze the story at its current stage (suspends pipeline and auto-assign). - Freeze, - /// Unfreeze the story, restoring it to the stage it was at when frozen. - Unfreeze, } // ── Per-node execution events ─────────────────────────────────────────────── @@ -102,8 +98,6 @@ pub fn event_label(e: &PipelineEvent) -> &'static str { PipelineEvent::Triage => "Triage", PipelineEvent::Close => "Close", PipelineEvent::Demote => "Demote", - PipelineEvent::Freeze => "Freeze", - PipelineEvent::Unfreeze => "Unfreeze", } } @@ -267,17 +261,6 @@ pub fn transition(state: Stage, event: PipelineEvent) -> Result Ok(Backlog), - // ── Freeze: any active stage → Frozen(resume_to=current) ──────── - (stage @ (Upcoming | Backlog | Coding | Qa), Freeze) => Ok(Frozen { - resume_to: Box::new(stage), - }), - (stage @ Merge { .. }, Freeze) => Ok(Frozen { - resume_to: Box::new(stage), - }), - - // ── Unfreeze: Frozen → resume_to ───────────────────────────────── - (Frozen { resume_to }, Unfreeze) => Ok(*resume_to), - // ── Everything else is invalid ────────────────────────────────── _ => Err(invalid()), } diff --git a/server/src/pipeline_state/types.rs b/server/src/pipeline_state/types.rs index c93c04c2..ee838f4d 100644 --- a/server/src/pipeline_state/types.rs +++ b/server/src/pipeline_state/types.rs @@ -110,10 +110,6 @@ pub enum Stage { /// awaiting human intervention or retry. Unlike `Archived(MergeFailed)`, /// this is a recoverable intermediate state — `Unblock` returns to `Backlog`. MergeFailure { reason: String }, - - /// Pipeline advancement and auto-assign are suspended. Resumes to - /// `resume_to` when unfrozen. - Frozen { resume_to: Box }, } /// Why a story was archived. Subsumes the old `blocked`, `merge_failure`, @@ -144,11 +140,6 @@ impl Stage { matches!(self, Stage::Coding | Stage::Qa | Stage::Merge { .. }) } - /// Returns true if this stage is `Frozen`. - pub fn is_frozen(&self) -> bool { - matches!(self, Stage::Frozen { .. }) - } - /// Returns true if this is the Upcoming variant. pub fn is_upcoming(&self) -> bool { matches!(self, Stage::Upcoming) @@ -183,35 +174,28 @@ impl Stage { /// accessing the rich metadata fields. pub fn from_dir(s: &str) -> Option { match s { - "0_upcoming" => Some(Stage::Upcoming), - "1_backlog" => Some(Stage::Backlog), - "2_blocked" => Some(Stage::Blocked { + "upcoming" => Some(Stage::Upcoming), + "backlog" => Some(Stage::Backlog), + "coding" => Some(Stage::Coding), + "blocked" => Some(Stage::Blocked { reason: String::new(), }), - "2_current" => Some(Stage::Coding), - "3_qa" => Some(Stage::Qa), - "4_merge" => Some(Stage::Merge { + "qa" => Some(Stage::Qa), + "merge" => Some(Stage::Merge { feature_branch: BranchName(String::new()), commits_ahead: NonZeroU32::new(1).expect("1 is non-zero"), }), - "5_done" => Some(Stage::Done { + "merge_failure" => Some(Stage::MergeFailure { + reason: String::new(), + }), + "done" => Some(Stage::Done { merged_at: DateTime::::UNIX_EPOCH, merge_commit: GitSha(String::new()), }), - "6_archived" => Some(Stage::Archived { + "archived" => Some(Stage::Archived { archived_at: DateTime::::UNIX_EPOCH, reason: ArchiveReason::Completed, }), - // Frozen: stub with Coding as resume_to — rich resume_to is loaded - // from front matter by the projection layer. - "4_merge_failure" => Some(Stage::MergeFailure { - reason: String::new(), - }), - // Frozen: stub with Coding as resume_to — rich resume_to is loaded - // from front matter by the projection layer. - "7_frozen" => Some(Stage::Frozen { - resume_to: Box::new(Stage::Coding), - }), _ => None, } } @@ -257,6 +241,18 @@ pub struct PipelineItem { pub stage: Stage, pub depends_on: Vec, pub retry_count: u32, + /// Whether the item is frozen — orthogonal to [`Self::stage`]. + /// Frozen items remain at their current stage but are skipped by the + /// auto-assigner until explicitly unfrozen (story 934, stage 4). + pub frozen: bool, +} + +impl PipelineItem { + /// Whether the item is frozen. Frozen items stay at their current + /// [`Stage`] but are skipped by the auto-assigner until unfrozen. + pub fn is_frozen(&self) -> bool { + self.frozen + } } // ── Transition errors ─────────────────────────────────────────────────────── @@ -293,22 +289,24 @@ pub fn stage_label(s: &Stage) -> &'static str { Stage::Done { .. } => "Done", Stage::Blocked { .. } => "Blocked", Stage::Archived { .. } => "Archived", - Stage::Frozen { .. } => "Frozen", } } -/// Map a Stage to the filesystem directory name used by the work pipeline. +/// Map a Stage to its canonical wire-format string (story 934). +/// +/// Post-934 emits clean vocabulary with no numeric prefix. Legacy directory +/// strings (`"2_current"`, `"4_merge"`, etc.) are still accepted by `from_dir` +/// for migration but are never produced here. pub fn stage_dir_name(s: &Stage) -> &'static str { match s { - Stage::Upcoming => "0_upcoming", - Stage::Backlog => "1_backlog", - Stage::Coding => "2_current", - Stage::Blocked { .. } => "2_blocked", - Stage::Qa => "3_qa", - Stage::Merge { .. } => "4_merge", - Stage::MergeFailure { .. } => "4_merge_failure", - Stage::Done { .. } => "5_done", - Stage::Archived { .. } => "6_archived", - Stage::Frozen { .. } => "7_frozen", + Stage::Upcoming => "upcoming", + Stage::Backlog => "backlog", + Stage::Coding => "coding", + Stage::Blocked { .. } => "blocked", + Stage::Qa => "qa", + Stage::Merge { .. } => "merge", + Stage::MergeFailure { .. } => "merge_failure", + Stage::Done { .. } => "done", + Stage::Archived { .. } => "archived", } } diff --git a/server/src/service/agents/mod.rs b/server/src/service/agents/mod.rs index 1c041aef..074bce1a 100644 --- a/server/src/service/agents/mod.rs +++ b/server/src/service/agents/mod.rs @@ -208,17 +208,23 @@ pub fn get_work_item_content( .map_err(|e| Error::Io(format!("Pipeline read error: {e}")))?; let stage = item .as_ref() - .map(|i| match &i.stage { - crate::pipeline_state::Stage::Upcoming => "upcoming", - crate::pipeline_state::Stage::Backlog => "backlog", - crate::pipeline_state::Stage::Coding => "current", - crate::pipeline_state::Stage::Blocked { .. } => "blocked", - crate::pipeline_state::Stage::Qa => "qa", - crate::pipeline_state::Stage::Merge { .. } => "merge", - crate::pipeline_state::Stage::MergeFailure { .. } => "merge_failure", - crate::pipeline_state::Stage::Done { .. } => "done", - crate::pipeline_state::Stage::Archived { .. } => "archived", - crate::pipeline_state::Stage::Frozen { .. } => "frozen", + .map(|i| { + // Frozen is now an orthogonal CRDT flag (story 934, stage 4). + if i.is_frozen() { + "frozen" + } else { + match &i.stage { + crate::pipeline_state::Stage::Upcoming => "upcoming", + crate::pipeline_state::Stage::Backlog => "backlog", + crate::pipeline_state::Stage::Coding => "current", + crate::pipeline_state::Stage::Blocked { .. } => "blocked", + crate::pipeline_state::Stage::Qa => "qa", + crate::pipeline_state::Stage::Merge { .. } => "merge", + crate::pipeline_state::Stage::MergeFailure { .. } => "merge_failure", + crate::pipeline_state::Stage::Done { .. } => "done", + crate::pipeline_state::Stage::Archived { .. } => "archived", + } + } }) .unwrap_or("unknown") .to_string(); @@ -344,7 +350,7 @@ max_budget_usd = 5.0 "---\nname: \"Foo Story\"\n---\n\nSome content.", ); // Story 929: name lives in the CRDT register. - crate::crdt_state::write_item( + crate::crdt_state::write_item_str( "42_story_foo", "1_backlog", Some("Foo Story"), diff --git a/server/src/service/gateway/polling.rs b/server/src/service/gateway/polling.rs index 02587137..cff6b81b 100644 --- a/server/src/service/gateway/polling.rs +++ b/server/src/service/gateway/polling.rs @@ -54,8 +54,8 @@ mod tests { fn stage_transition_prefixes_project_name() { let event = StoredEvent::StageTransition { story_id: "42_story_my_feature".to_string(), - from_stage: "2_current".to_string(), - to_stage: "3_qa".to_string(), + from_stage: "coding".to_string(), + to_stage: "qa".to_string(), timestamp_ms: 1000, }; let (plain, html) = format_gateway_event("huskies", &event); diff --git a/server/src/service/notifications/format.rs b/server/src/service/notifications/format.rs index 9b7a6494..bd6a5bbc 100644 --- a/server/src/service/notifications/format.rs +++ b/server/src/service/notifications/format.rs @@ -19,7 +19,6 @@ pub fn stage_display_name(stage: &str) -> &'static str { Some(Stage::Done { .. }) => "Done", Some(Stage::Archived { .. }) => "Archived", Some(Stage::MergeFailure { .. }) => "MergeFailure", - Some(Stage::Frozen { .. }) => "Frozen", None => "Unknown", } } @@ -185,12 +184,12 @@ mod tests { #[test] fn stage_display_name_maps_all_known_stages() { - assert_eq!(stage_display_name("1_backlog"), "Backlog"); - assert_eq!(stage_display_name("2_current"), "Current"); - assert_eq!(stage_display_name("3_qa"), "QA"); - assert_eq!(stage_display_name("4_merge"), "Merge"); - assert_eq!(stage_display_name("5_done"), "Done"); - assert_eq!(stage_display_name("6_archived"), "Archived"); + assert_eq!(stage_display_name("backlog"), "Backlog"); + assert_eq!(stage_display_name("coding"), "Current"); + assert_eq!(stage_display_name("qa"), "QA"); + assert_eq!(stage_display_name("merge"), "Merge"); + assert_eq!(stage_display_name("done"), "Done"); + assert_eq!(stage_display_name("archived"), "Archived"); assert_eq!(stage_display_name("unknown"), "Unknown"); } diff --git a/server/src/service/notifications/io/tests_stage.rs b/server/src/service/notifications/io/tests_stage.rs index 395903c1..68f2070e 100644 --- a/server/src/service/notifications/io/tests_stage.rs +++ b/server/src/service/notifications/io/tests_stage.rs @@ -46,11 +46,11 @@ async fn stage_notification_uses_dynamic_room_ids() { watcher_tx .send(WatcherEvent::WorkItem { - stage: "3_qa".to_string(), + stage: "qa".to_string(), item_id: "10_story_foo".to_string(), action: "qa".to_string(), commit_msg: "huskies: qa 10_story_foo".to_string(), - from_stage: Some("2_current".to_string()), + from_stage: Some("coding".to_string()), }) .unwrap(); @@ -88,11 +88,11 @@ async fn stage_notification_with_no_rooms_is_silent() { watcher_tx .send(WatcherEvent::WorkItem { - stage: "3_qa".to_string(), + stage: "qa".to_string(), item_id: "10_story_foo".to_string(), action: "qa".to_string(), commit_msg: "huskies: qa 10_story_foo".to_string(), - from_stage: Some("2_current".to_string()), + from_stage: Some("coding".to_string()), }) .unwrap(); @@ -165,7 +165,7 @@ async fn synthetic_event_without_from_stage_does_not_notify() { // Synthetic reassign event within 4_merge — no actual stage change. watcher_tx .send(WatcherEvent::WorkItem { - stage: "4_merge".to_string(), + stage: "merge".to_string(), item_id: "549_story_skip_qa".to_string(), action: "reassign".to_string(), commit_msg: String::new(), @@ -209,11 +209,11 @@ async fn skip_qa_shows_current_to_merge_not_qa_to_merge() { // Story skips QA: from_stage is 2_current, not 3_qa. watcher_tx .send(WatcherEvent::WorkItem { - stage: "4_merge".to_string(), + stage: "merge".to_string(), item_id: "549_story_skip_qa".to_string(), action: "merge".to_string(), commit_msg: "huskies: merge 549_story_skip_qa".to_string(), - from_stage: Some("2_current".to_string()), + from_stage: Some("coding".to_string()), }) .unwrap(); diff --git a/server/src/service/status/format.rs b/server/src/service/status/format.rs index f9cedf8c..a720a284 100644 --- a/server/src/service/status/format.rs +++ b/server/src/service/status/format.rs @@ -83,8 +83,8 @@ mod tests { let event = StatusEvent::StageTransition { story_id: "42_story_foo".to_string(), story_name: Some("Foo Story".to_string()), - from_stage: "4_merge".to_string(), - to_stage: "5_done".to_string(), + from_stage: "merge".to_string(), + to_stage: "done".to_string(), }; let s = format_status_event(&event); assert!( @@ -101,8 +101,8 @@ mod tests { let event = StatusEvent::StageTransition { story_id: "10_story_bar".to_string(), story_name: Some("Bar".to_string()), - from_stage: "1_backlog".to_string(), - to_stage: "2_current".to_string(), + from_stage: "backlog".to_string(), + to_stage: "coding".to_string(), }; let s = format_status_event(&event); assert!(!s.contains("\u{1f389}")); @@ -114,8 +114,8 @@ mod tests { let event = StatusEvent::StageTransition { story_id: "5_story_x".to_string(), story_name: None, - from_stage: "2_current".to_string(), - to_stage: "3_qa".to_string(), + from_stage: "coding".to_string(), + to_stage: "qa".to_string(), }; let s = format_status_event(&event); assert!(s.contains("5_story_x")); @@ -182,8 +182,8 @@ mod tests { StatusEvent::StageTransition { story_id: "1_story_a".to_string(), story_name: None, - from_stage: "1_backlog".to_string(), - to_stage: "2_current".to_string(), + from_stage: "backlog".to_string(), + to_stage: "coding".to_string(), }, StatusEvent::MergeFailure { story_id: "2_story_b".to_string(), diff --git a/server/src/service/story/front_matter.rs b/server/src/service/story/front_matter.rs index 04690dbd..eba23325 100644 --- a/server/src/service/story/front_matter.rs +++ b/server/src/service/story/front_matter.rs @@ -4,32 +4,19 @@ //! without performing any I/O. Parsing is delegated to `crate::io::story_metadata`. #[allow(dead_code)] -/// Return `true` if `stage` is a recognised pipeline stage directory name. +/// Return `true` if `stage` is a recognised pipeline stage name. /// -/// Valid stage names match the `.huskies/work/N_name/` directory scheme. +/// Accepts both the clean post-934 wire form (e.g. `"backlog"`) and the +/// legacy directory-style form (e.g. `"1_backlog"`). pub fn is_valid_stage(stage: &str) -> bool { crate::pipeline_state::Stage::from_dir(stage).is_some() } #[allow(dead_code)] -/// Map a human-readable stage alias (e.g. `"backlog"`) to its directory name -/// (e.g. `"1_backlog"`). Returns `None` for unrecognised aliases. +/// Map any recognised stage alias (clean wire form or legacy directory form) +/// to the canonical clean wire form. Returns `None` for unrecognised aliases. pub fn stage_alias_to_dir(alias: &str) -> Option<&'static str> { - use crate::pipeline_state::Stage; - // Canonical directory names (e.g. "1_backlog") round-trip through the typed enum. - if let Some(stage) = Stage::from_dir(alias) { - return Some(stage.dir_name()); - } - // Short human-readable aliases (user-facing input normalization). - match alias { - "backlog" => Some("1_backlog"), - "current" => Some("2_current"), - "qa" => Some("3_qa"), - "merge" => Some("4_merge"), - "done" => Some("5_done"), - "archived" => Some("6_archived"), - _ => None, - } + crate::pipeline_state::Stage::from_dir(alias).map(|s| s.dir_name()) } // ── Tests ───────────────────────────────────────────────────────────────────── @@ -40,36 +27,42 @@ mod tests { #[test] fn is_valid_stage_accepts_all_known_stages() { - assert!(is_valid_stage("1_backlog")); - assert!(is_valid_stage("2_current")); - assert!(is_valid_stage("3_qa")); - assert!(is_valid_stage("4_merge")); - assert!(is_valid_stage("5_done")); - assert!(is_valid_stage("6_archived")); + // Clean post-934 vocabulary. + assert!(is_valid_stage("backlog")); + assert!(is_valid_stage("coding")); + assert!(is_valid_stage("qa")); + assert!(is_valid_stage("merge")); + assert!(is_valid_stage("done")); + assert!(is_valid_stage("archived")); } #[test] fn is_valid_stage_rejects_unknown() { - assert!(!is_valid_stage("current")); - assert!(!is_valid_stage("backlog")); + // Story 934 stage 6 dropped legacy directory-style aliases. + assert!(!is_valid_stage("current")); // pre-934 short alias, no longer mapped + assert!(!is_valid_stage("1_backlog")); + assert!(!is_valid_stage("2_current")); assert!(!is_valid_stage("7_future")); assert!(!is_valid_stage("")); } #[test] fn stage_alias_maps_short_names() { - assert_eq!(stage_alias_to_dir("backlog"), Some("1_backlog")); - assert_eq!(stage_alias_to_dir("current"), Some("2_current")); - assert_eq!(stage_alias_to_dir("qa"), Some("3_qa")); - assert_eq!(stage_alias_to_dir("merge"), Some("4_merge")); - assert_eq!(stage_alias_to_dir("done"), Some("5_done")); - assert_eq!(stage_alias_to_dir("archived"), Some("6_archived")); + assert_eq!(stage_alias_to_dir("backlog"), Some("backlog")); + assert_eq!(stage_alias_to_dir("coding"), Some("coding")); + assert_eq!(stage_alias_to_dir("qa"), Some("qa")); + assert_eq!(stage_alias_to_dir("merge"), Some("merge")); + assert_eq!(stage_alias_to_dir("done"), Some("done")); + assert_eq!(stage_alias_to_dir("archived"), Some("archived")); } #[test] - fn stage_alias_maps_full_dir_names() { - assert_eq!(stage_alias_to_dir("1_backlog"), Some("1_backlog")); - assert_eq!(stage_alias_to_dir("6_archived"), Some("6_archived")); + fn stage_alias_returns_none_for_legacy_dir_names() { + // Story 934 stage 6: legacy directory-style aliases are no longer + // recognised — startup migration rewrites stored CRDT values, and + // user-facing aliases now use only the clean wire vocabulary. + assert_eq!(stage_alias_to_dir("1_backlog"), None); + assert_eq!(stage_alias_to_dir("6_archived"), None); } #[test] diff --git a/server/src/service/work_item/assign.rs b/server/src/service/work_item/assign.rs index ba63302d..e586e998 100644 --- a/server/src/service/work_item/assign.rs +++ b/server/src/service/work_item/assign.rs @@ -56,7 +56,7 @@ mod tests { let story_id = "8770_story_assign_regression_crdt"; // Seed the CRDT so set_agent can find the item. - crate::crdt_state::write_item( + crate::crdt_state::write_item_str( story_id, "2_current", Some("Assign Regression"), @@ -98,7 +98,7 @@ mod tests { let story_id_b = "8772_story_assign_path_b"; for sid in &[story_id_a, story_id_b] { - crate::crdt_state::write_item( + crate::crdt_state::write_item_str( sid, "2_current", Some("Path Test"), diff --git a/server/src/service/work_item/delete.rs b/server/src/service/work_item/delete.rs index d3241866..29bee7d7 100644 --- a/server/src/service/work_item/delete.rs +++ b/server/src/service/work_item/delete.rs @@ -195,7 +195,7 @@ mod tests { let story_id = "8750_story_service_delete_regression"; // Seed CRDT. - crate::crdt_state::write_item( + crate::crdt_state::write_item_str( story_id, "1_backlog", Some("Service Delete Regression"), diff --git a/server/src/service/work_item/freeze.rs b/server/src/service/work_item/freeze.rs index 440158c2..3d804342 100644 --- a/server/src/service/work_item/freeze.rs +++ b/server/src/service/work_item/freeze.rs @@ -28,10 +28,8 @@ pub enum UnfreezeStatus { /// stage without making any CRDT writes. Returns `Err` if the state transition /// fails (e.g. the item is not found or is in a terminal stage). pub fn freeze(story_id: &str) -> Result { - let already_frozen = crate::pipeline_state::read_typed(story_id) - .ok() - .flatten() - .map(|i| i.stage.is_frozen()) + let already_frozen = crate::crdt_state::read_item(story_id) + .map(|view| view.frozen()) .unwrap_or(false); if already_frozen { @@ -45,13 +43,11 @@ pub fn freeze(story_id: &str) -> Result { /// Unfreeze a work item, resuming normal pipeline behaviour. /// -/// Returns [`UnfreezeStatus::NotFrozen`] if the item is not currently in the -/// frozen stage. Returns `Err` if the state transition fails. +/// Returns [`UnfreezeStatus::NotFrozen`] if the item is not currently frozen. +/// Returns `Err` if the state transition fails. pub fn unfreeze(story_id: &str) -> Result { - let is_frozen = crate::pipeline_state::read_typed(story_id) - .ok() - .flatten() - .map(|i| i.stage.is_frozen()) + let is_frozen = crate::crdt_state::read_item(story_id) + .map(|view| view.frozen()) .unwrap_or(false); if !is_frozen { @@ -92,7 +88,7 @@ mod tests { .expect("read_typed should succeed") .expect("item should be present"); assert!( - item.stage.is_frozen(), + item.is_frozen(), "stage should be Frozen after freeze: {:?}", item.stage ); @@ -141,7 +137,7 @@ mod tests { .expect("read_typed should succeed") .expect("item should be present"); assert!( - !item.stage.is_frozen(), + !item.is_frozen(), "stage should not be Frozen after unfreeze: {:?}", item.stage ); @@ -212,12 +208,12 @@ mod tests { .expect("MCP-path item should be in CRDT"); assert!( - state_a.stage.is_frozen(), + state_a.is_frozen(), "chat-path CRDT stage must be frozen: {:?}", state_a.stage ); assert!( - state_b.stage.is_frozen(), + state_b.is_frozen(), "MCP-path CRDT stage must be frozen: {:?}", state_b.stage ); diff --git a/server/src/startup/project.rs b/server/src/startup/project.rs index 856f36f6..c9d13938 100644 --- a/server/src/startup/project.rs +++ b/server/src/startup/project.rs @@ -151,6 +151,12 @@ pub(crate) async fn init_subsystems(app_state: &Arc, cwd: &Path) { crate::slog!("[crdt] Failed to initialise CRDT state layer: {e}"); } else { crdt_state::migrate_names_from_slugs(); + // Story 934 stage 6: rewrite any pre-934 directory-style stage + // strings to the clean post-934 wire vocabulary, and set the new + // `frozen` flag on items that were previously at `Stage::Frozen`. + // Must run before legacy stage-string acceptance is dropped from + // `Stage::from_dir` (also part of stage 6). + crdt_state::migrate_legacy_stage_strings(); let id_migrations = crdt_state::migrate_story_ids_to_numeric(); if !id_migrations.is_empty() && let Some(project_root) = db_path.parent().and_then(|p| p.parent())