feat(934): typed Stage enum replaces directory-string state model

The state machine's `Stage` enum becomes the source of truth for pipeline
state. Six stages of work land together:

  1. Clean wire vocabulary (`coding`, `merge`, `merge_failure`, ...) replaces
     legacy directory-style strings (`2_current`, `4_merge`, ...) on the wire.
     `Stage::from_dir` accepted both during deployment; new writes always
     emit the clean form via `stage_dir_name`. Lexicographic `dir >= "5_done"`
     checks in lifecycle.rs become typed `matches!` checks since the new
     vocabulary doesn't sort in pipeline order.
  2. `crdt_state::write_item` takes typed `&Stage`, serialising via
     `stage_dir_name` at the CRDT boundary. `#[cfg(test)] write_item_str`
     parses legacy strings for test fixtures.
  3. `WorkItem::stage()` returns typed `crdt_state::Stage`; `stage_str()`
     is gone from the public API. Projection dispatches on the typed enum.
  4. `frozen` becomes an orthogonal CRDT register. `Stage::Frozen` and
     `PipelineEvent::Freeze`/`Unfreeze` are removed; `transition_to_frozen`/
     `unfrozen` set the flag directly without touching the stage register.
  5. Watcher sweep and `tool_update_story`'s `blocked` setter route through
     `apply_transition` so the typed transition table validates every
     stage change. `update_story` gains a `frozen` field for symmetry.
  6. One-shot startup migration rewrites pre-934 directory-style stage
     registers (and sets `frozen=true` on items previously at `7_frozen`).
     `Stage::from_dir` drops legacy aliases. The db boundary keeps a small
     normaliser so callers with legacy strings (MCP, tests) still work.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Timmy
2026-05-12 22:31:59 +01:00
parent 93443e2ff1
commit d78dd9e8f9
55 changed files with 783 additions and 584 deletions
+90 -6
View File
@@ -11,6 +11,7 @@ use serde_json::json;
use super::super::state::{apply_and_persist, emit_event, get_crdt, rebuild_index};
use super::super::types::CrdtEvent;
use crate::io::story_metadata::QaMode;
use crate::pipeline_state::{Stage, stage_dir_name};
/// Set the typed `depends_on` CRDT register for a pipeline item.
///
@@ -103,6 +104,28 @@ pub fn set_review_hold(story_id: &str, value: bool) -> bool {
true
}
/// Set the `frozen` CRDT flag for a pipeline item (story 934, stage 4).
///
/// `true` freezes the story at its current `Stage` — the auto-assigner skips
/// it but the stage register is untouched. `false` unfreezes; the story
/// remains at its current stage and resumes auto-assignment. Both writes
/// are explicit (not removals) so the cleared state survives CRDT replay.
///
/// Returns `true` if the item was found and the op was applied, `false` otherwise.
pub fn set_frozen(story_id: &str, value: bool) -> bool {
let Some(state_mutex) = get_crdt() else {
return false;
};
let Ok(mut state) = state_mutex.lock() else {
return false;
};
let Some(&idx) = state.index.get(story_id) else {
return false;
};
apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].frozen.set(value));
true
}
/// Set the `mergemaster_attempted` CRDT flag for a pipeline item.
///
/// Passing `true` records that a mergemaster session has been spawned for this
@@ -211,10 +234,13 @@ pub fn set_qa_mode(story_id: &str, mode: Option<QaMode>) -> bool {
///
/// When the stage changes (or a new item is created), a [`CrdtEvent`] is
/// broadcast so subscribers can react to the transition.
///
/// `stage` is the typed pipeline state; it is serialised to the canonical
/// clean wire form (story 934) via [`stage_dir_name`] at the CRDT boundary.
#[allow(clippy::too_many_arguments)]
pub fn write_item(
story_id: &str,
stage: &str,
stage: &Stage,
name: Option<&str>,
agent: Option<&str>,
retry_count: Option<i64>,
@@ -224,6 +250,7 @@ pub fn write_item(
claimed_at: Option<f64>,
merged_at: Option<f64>,
) {
let stage_str = stage_dir_name(stage);
let Some(state_mutex) = get_crdt() else {
return;
};
@@ -247,7 +274,7 @@ pub fn write_item(
// Update existing item registers.
apply_and_persist(&mut state, |s| {
s.crdt.doc.items[idx].stage.set(stage.to_string())
s.crdt.doc.items[idx].stage.set(stage_str.to_string())
});
if let Some(n) = name {
@@ -286,7 +313,7 @@ pub fn write_item(
}
// Broadcast a CrdtEvent if the stage actually changed.
let stage_changed = old_stage.as_deref() != Some(stage);
let stage_changed = old_stage.as_deref() != Some(stage_str);
if stage_changed {
// Read the current name from the CRDT document for the event.
let current_name = match state.crdt.doc.items[idx].name.view() {
@@ -296,7 +323,7 @@ pub fn write_item(
emit_event(CrdtEvent {
story_id: story_id.to_string(),
from_stage: old_stage,
to_stage: stage.to_string(),
to_stage: stage_str.to_string(),
name: current_name,
});
}
@@ -304,7 +331,7 @@ pub fn write_item(
// Insert new item.
let item_json: JsonValue = json!({
"story_id": story_id,
"stage": stage,
"stage": stage_str,
"name": name.unwrap_or(""),
"agent": agent.unwrap_or(""),
"retry_count": retry_count.unwrap_or(0) as f64,
@@ -318,6 +345,7 @@ pub fn write_item(
"review_hold": false,
"item_type": "",
"epic": "",
"frozen": false,
})
.into();
@@ -348,18 +376,74 @@ pub fn write_item(
item.review_hold.advance_seq(floor);
item.item_type.advance_seq(floor);
item.epic.advance_seq(floor);
item.frozen.advance_seq(floor);
}
// Broadcast a CrdtEvent for the new item.
emit_event(CrdtEvent {
story_id: story_id.to_string(),
from_stage: None,
to_stage: stage.to_string(),
to_stage: stage_str.to_string(),
name: name.map(String::from),
});
}
}
/// Test-only convenience that parses a wire-form stage string and forwards
/// to [`write_item`]. Existing tests seed CRDT items with legacy directory
/// strings (`"2_current"`, `"4_merge"`, etc.) — this shim keeps that idiom
/// working without forcing every test to construct typed `Stage` payloads.
///
/// Stages are normalised through [`Stage::from_dir`]: unknown strings cause
/// the write to be skipped (with a log line).
#[cfg(test)]
#[allow(clippy::too_many_arguments)]
pub fn write_item_str(
story_id: &str,
stage: &str,
name: Option<&str>,
agent: Option<&str>,
retry_count: Option<i64>,
blocked: Option<bool>,
depends_on: Option<&str>,
claimed_by: Option<&str>,
claimed_at: Option<f64>,
merged_at: Option<f64>,
) {
// Normalise pre-934 directory-style strings to clean wire form so
// existing test fixtures keep working after stage 6 dropped the legacy
// aliases from `Stage::from_dir`. See `db::ops::normalise_stage_str`
// for the user-facing equivalent on the db boundary.
let normalised = match stage {
"0_upcoming" => "upcoming",
"1_backlog" => "backlog",
"2_current" => "coding",
"2_blocked" => "blocked",
"3_qa" => "qa",
"4_merge" => "merge",
"4_merge_failure" => "merge_failure",
"5_done" => "done",
"6_archived" => "archived",
other => other,
};
let Some(typed) = Stage::from_dir(normalised) else {
crate::slog!("[crdt_state] write_item_str: unknown stage '{stage}' for {story_id}");
return;
};
write_item(
story_id,
&typed,
name,
agent,
retry_count,
blocked,
depends_on,
claimed_by,
claimed_at,
merged_at,
);
}
/// Set `retry_count` to an explicit value for a pipeline item.
///
/// Pure metadata operation — the item's stage is not changed.
+85
View File
@@ -181,3 +181,88 @@ pub fn migrate_names_from_slugs() {
}
slog!("[crdt] Migrated names for {count} items from story ID slugs");
}
/// Map a pre-934 legacy directory-style stage string to its clean wire form.
///
/// Returns `None` if `s` is already in clean wire form (or is genuinely
/// unknown), so the migration can quickly skip already-clean items.
fn legacy_stage_to_clean(s: &str) -> Option<&'static str> {
match s {
"0_upcoming" => Some("upcoming"),
"1_backlog" => Some("backlog"),
"2_current" => Some("coding"),
"2_blocked" => Some("blocked"),
"3_qa" => Some("qa"),
"4_merge" => Some("merge"),
"4_merge_failure" => Some("merge_failure"),
"5_done" => Some("done"),
"6_archived" => Some("archived"),
// Story 934, stage 4: `Stage::Frozen` no longer exists. Items that
// were previously frozen become orthogonal-flag-frozen: their stage
// register collapses to `backlog` (a safe "not progressing" default
// since the original resume_to payload was lost when the variant was
// dropped) and a separate write sets `frozen = true`.
"7_frozen" => Some("backlog"),
_ => None,
}
}
/// Rewrite every pipeline item whose `stage` register still carries a pre-934
/// directory-style string (`"2_current"`, `"4_merge"`, etc.) to the clean wire
/// vocabulary (`"coding"`, `"merge"`, etc.).
///
/// Items that were at `"7_frozen"` additionally get the new `frozen` flag set
/// — the stage variant `Frozen` was dropped in story 934 stage 4 in favour of
/// an orthogonal CRDT register.
///
/// One-time startup migration: items that have transitioned at least once
/// since story 934 stage 1 (which made writes emit clean form) are no-ops.
pub fn migrate_legacy_stage_strings() {
let Some(state_mutex) = get_crdt() else {
return;
};
// First pass: collect (index, clean_stage, set_frozen) for items that
// still carry legacy stage strings.
let migrations: Vec<(usize, &'static str, bool)> = {
let Ok(state) = state_mutex.lock() else {
return;
};
state
.index
.iter()
.filter_map(|(_story_id, &idx)| {
let item = &state.crdt.doc.items[idx];
let current = match item.stage.view() {
JsonValue::String(s) => s,
_ => return None,
};
let clean = legacy_stage_to_clean(&current)?;
let was_frozen = current == "7_frozen";
Some((idx, clean, was_frozen))
})
.collect()
};
if migrations.is_empty() {
return;
}
let Ok(mut state) = state_mutex.lock() else {
return;
};
let count = migrations.len();
let frozen_count = migrations.iter().filter(|(_, _, f)| *f).count();
for (idx, clean, was_frozen) in migrations {
apply_and_persist(&mut state, |s| {
s.crdt.doc.items[idx].stage.set(clean.to_string())
});
if was_frozen {
apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].frozen.set(true));
}
}
slog!(
"[crdt] Migrated {count} legacy stage strings to clean wire form \
({frozen_count} of which were '7_frozen' → backlog + frozen=true)"
);
}
+8 -2
View File
@@ -10,7 +10,13 @@ mod migrations;
mod tests;
pub use item::{
bump_retry_count, set_agent, set_blocked, set_depends_on, set_epic, set_item_type,
bump_retry_count, set_agent, set_blocked, set_depends_on, set_epic, set_frozen, set_item_type,
set_mergemaster_attempted, set_name, set_qa_mode, set_retry_count, set_review_hold, write_item,
};
pub use migrations::{migrate_names_from_slugs, migrate_story_ids_to_numeric, name_from_story_id};
#[cfg(test)]
pub use item::write_item_str;
pub use migrations::{
migrate_legacy_stage_strings, migrate_names_from_slugs, migrate_story_ids_to_numeric,
name_from_story_id,
};
+17 -17
View File
@@ -90,7 +90,7 @@ fn numeric_id_from_slug_returns_none_for_non_numeric_prefix() {
fn migrate_story_ids_to_numeric_rewrites_slug_ids() {
init_for_test();
write_item(
write_item_str(
"42_story_my_feature",
"1_backlog",
Some("My Feature"),
@@ -123,7 +123,7 @@ fn migrate_story_ids_to_numeric_rewrites_slug_ids() {
fn migrate_story_ids_to_numeric_is_idempotent() {
init_for_test();
write_item(
write_item_str(
"43",
"1_backlog",
Some("Already Numeric"),
@@ -153,7 +153,7 @@ fn migrate_story_ids_to_numeric_skips_conflict() {
init_for_test();
// Both the slug form AND its numeric target exist.
write_item(
write_item_str(
"44_story_foo",
"1_backlog",
None,
@@ -165,7 +165,7 @@ fn migrate_story_ids_to_numeric_skips_conflict() {
None,
None,
);
write_item(
write_item_str(
"44",
"2_current",
None,
@@ -200,7 +200,7 @@ fn migrate_story_ids_to_numeric_noop_when_crdt_not_initialised() {
fn migrate_story_ids_to_numeric_preserves_stage_and_name() {
init_for_test();
write_item(
write_item_str(
"45_bug_crash",
"2_current",
Some("Crash Bug"),
@@ -216,7 +216,7 @@ fn migrate_story_ids_to_numeric_preserves_stage_and_name() {
migrate_story_ids_to_numeric();
let item = read_item("45").expect("item must be accessible by numeric ID");
assert_eq!(item.stage, "2_current");
assert_eq!(item.stage, "coding");
assert_eq!(item.name.as_deref(), Some("Crash Bug"));
assert_eq!(item.agent.as_deref(), Some("coder-1"));
}
@@ -226,7 +226,7 @@ fn migrate_names_from_slugs_fills_empty_names() {
init_for_test();
// Write an item without a name.
write_item(
write_item_str(
"42_story_my_feature",
"1_backlog",
None,
@@ -261,7 +261,7 @@ fn migrate_names_from_slugs_fills_empty_names() {
fn migrate_names_from_slugs_leaves_existing_names_unchanged() {
init_for_test();
write_item(
write_item_str(
"43_story_named_item",
"1_backlog",
Some("Already Named"),
@@ -299,7 +299,7 @@ fn set_depends_on_round_trip_and_clear() {
use super::super::read::{check_unmet_deps_crdt, read_item};
init_for_test();
write_item(
write_item_str(
"872_test_target",
"1_backlog",
Some("Target"),
@@ -355,7 +355,7 @@ fn set_depends_on_returns_false_for_unknown_story() {
fn set_mergemaster_attempted_true_then_false_flips_register() {
init_for_test();
write_item(
write_item_str(
"873_story_mergemaster_flip",
"4_merge",
None,
@@ -411,7 +411,7 @@ fn set_mergemaster_attempted_returns_false_for_unknown_story() {
fn set_agent_some_writes_name() {
init_for_test();
write_item(
write_item_str(
"871_story_set_agent_write",
"2_current",
Some("Set Agent Write"),
@@ -439,7 +439,7 @@ fn set_agent_some_writes_name() {
fn set_agent_none_clears_register() {
init_for_test();
write_item(
write_item_str(
"871_story_set_agent_clear",
"2_current",
Some("Set Agent Clear"),
@@ -485,7 +485,7 @@ fn set_qa_mode_round_trip_server_then_human() {
use crate::io::story_metadata::QaMode;
init_for_test();
write_item(
write_item_str(
"869_story_qa_roundtrip",
"1_backlog",
None,
@@ -541,7 +541,7 @@ fn set_qa_mode_returns_false_for_unknown_story() {
#[test]
fn bump_retry_count_increments_by_one() {
init_for_test();
write_item(
write_item_str(
"9001_story_bump_test",
"2_current",
None,
@@ -571,7 +571,7 @@ fn bump_retry_count_increments_by_one() {
#[test]
fn set_retry_count_resets_to_zero() {
init_for_test();
write_item(
write_item_str(
"9002_story_set_test",
"2_current",
None,
@@ -755,7 +755,7 @@ async fn tombstone_survives_concurrent_writes() {
let story_id = "889_story_tombstone_concurrent";
write_item(
write_item_str(
story_id,
"2_current",
Some("Tombstone Concurrent Test"),
@@ -777,7 +777,7 @@ async fn tombstone_survives_concurrent_writes() {
let writer = tokio::task::spawn(async move {
while !stop_clone.load(Ordering::Relaxed) {
write_item(
write_item_str(
story_id,
"2_current",
Some("Tombstone Concurrent Test"),