From deffcdc326c94809279619c6745c8b7169409262 Mon Sep 17 00:00:00 2001 From: dave Date: Wed, 29 Apr 2026 16:24:44 +0000 Subject: [PATCH] huskies: merge 844 --- server/src/crdt_state/write.rs | 1219 --------------------- server/src/crdt_state/write/item.rs | 305 ++++++ server/src/crdt_state/write/migrations.rs | 182 +++ server/src/crdt_state/write/mod.rs | 16 + server/src/crdt_state/write/tests.rs | 735 +++++++++++++ 5 files changed, 1238 insertions(+), 1219 deletions(-) delete mode 100644 server/src/crdt_state/write.rs create mode 100644 server/src/crdt_state/write/item.rs create mode 100644 server/src/crdt_state/write/migrations.rs create mode 100644 server/src/crdt_state/write/mod.rs create mode 100644 server/src/crdt_state/write/tests.rs diff --git a/server/src/crdt_state/write.rs b/server/src/crdt_state/write.rs deleted file mode 100644 index aa9df715..00000000 --- a/server/src/crdt_state/write.rs +++ /dev/null @@ -1,1219 +0,0 @@ -//! High-level write API for pipeline items. - -#![allow(unused_imports, dead_code)] -use bft_json_crdt::json_crdt::*; -use bft_json_crdt::lww_crdt::LwwRegisterCrdt; -use bft_json_crdt::op::ROOT_ID; -use serde_json::json; - -use super::state::{apply_and_persist, emit_event, get_crdt, rebuild_index}; -use super::types::{CrdtEvent, PipelineDoc, PipelineItemCrdt}; -use crate::io::story_metadata::QaMode; -use crate::slog; - -// ── Name migration helpers ──────────────────────────────────────────── - -/// Derive a human-readable name from a story ID's slug component. -/// -/// Strips the numeric prefix and item-type prefix (story/bug/spike/refactor), -/// replaces underscores with spaces, and capitalises the first letter. -/// -/// Examples: -/// - `"729_story_store_story_name"` → `"Store story name"` -/// - `"4_bug_login_crash"` → `"Login crash"` -/// - `"10_spike_arch_review"` → `"Arch review"` -pub fn name_from_story_id(story_id: &str) -> String { - // Strip the leading digits then the first underscore: "729_story_..." → "story_..." - let after_num = story_id.trim_start_matches(|c: char| c.is_ascii_digit()); - let after_num = after_num.strip_prefix('_').unwrap_or(after_num); - - // Strip the item-type prefix. - let slug = after_num - .strip_prefix("story_") - .or_else(|| after_num.strip_prefix("bug_")) - .or_else(|| after_num.strip_prefix("spike_")) - .or_else(|| after_num.strip_prefix("refactor_")) - .unwrap_or(after_num); - - // Replace underscores with spaces. - let spaced = slug.replace('_', " "); - - // Capitalise the first character. - let mut chars = spaced.chars(); - match chars.next() { - None => String::new(), - Some(first) => { - let mut name = first.to_uppercase().to_string(); - name.push_str(chars.as_str()); - name - } - } -} - -/// Extract the numeric-only ID from a slug-form story ID, if applicable. -/// -/// Returns `Some("664")` for `"664_story_my_feature"`, and `None` for IDs -/// that are already numeric-only (`"664"`) or have no valid numeric prefix. -fn numeric_id_from_slug(story_id: &str) -> Option { - // Already numeric-only — no migration needed. - if story_id.chars().all(|c: char| c.is_ascii_digit()) { - return None; - } - // Must have a non-empty numeric segment before the first underscore. - let idx = story_id.find('_')?; - let prefix = &story_id[..idx]; - if prefix.is_empty() || !prefix.chars().all(|c| c.is_ascii_digit()) { - return None; - } - Some(prefix.to_string()) -} - -/// Migrate existing story IDs from slug form (`664_story_my_feature`) to -/// numeric-only form (`664`) in the in-memory CRDT, persisting a signed op -/// for each updated register so the change survives restarts. -/// -/// Returns the list of `(old_id, new_id)` pairs that were actually migrated. -/// Callers should use this list to rename downstream filesystem artifacts -/// (worktree directories, git branches, log directories). -/// -/// Items whose `story_id` is already numeric-only are left untouched. -/// Items where the target numeric ID is already in use are skipped to avoid -/// conflicts. Running this migration repeatedly is safe — subsequent calls -/// on already-migrated state are no-ops. -pub fn migrate_story_ids_to_numeric() -> Vec<(String, String)> { - let Some(state_mutex) = get_crdt() else { - return Vec::new(); - }; - - // First pass: collect (index, old_id, new_id) while holding the lock. - let migrations: Vec<(usize, String, String)> = { - let Ok(state) = state_mutex.lock() else { - return Vec::new(); - }; - - let existing_ids: std::collections::HashSet = state.index.keys().cloned().collect(); - - state - .index - .iter() - .filter_map(|(story_id, &idx)| { - let numeric = numeric_id_from_slug(story_id)?; - // Skip if the target numeric ID is already occupied. - if existing_ids.contains(&numeric) { - return None; - } - Some((idx, story_id.clone(), numeric)) - }) - .collect() - }; - - if migrations.is_empty() { - return Vec::new(); - } - - // Second pass: apply story_id register updates. - let Ok(mut state) = state_mutex.lock() else { - return Vec::new(); - }; - - let mut result = Vec::new(); - for (idx, old_id, new_id) in migrations { - apply_and_persist(&mut state, |s| { - s.crdt.doc.items[idx].story_id.set(new_id.clone()) - }); - result.push((old_id, new_id)); - } - - // Rebuild the index so all downstream reads use the new numeric IDs. - state.index = rebuild_index(&state.crdt); - - let count = result.len(); - slog!("[crdt] Migrated {count} story IDs from slug form to numeric"); - result -} - -/// Backfill the `name` CRDT field for pipeline items that have an empty name. -/// -/// Iterates over all items in the in-memory CRDT. For each item whose `name` -/// register is empty, derives a human-readable name from the story ID slug -/// (see [`name_from_story_id`]) and writes it via a signed CRDT op. -/// -/// This is a one-time startup migration: items created before the `name` field -/// was consistently populated will gain a name on the next server start. -/// Items that already have a non-empty name are left untouched. -pub fn migrate_names_from_slugs() { - let Some(state_mutex) = get_crdt() else { - return; - }; - - // First pass: collect (index, derived_name) pairs for items missing a name. - let migrations: Vec<(usize, String)> = { - let Ok(state) = state_mutex.lock() else { - return; - }; - state - .index - .iter() - .filter_map(|(story_id, &idx)| { - let item = &state.crdt.doc.items[idx]; - // Skip items that already have a name. - let already_named = - matches!(item.name.view(), JsonValue::String(ref s) if !s.is_empty()); - if already_named { - return None; - } - let name = name_from_story_id(story_id); - if name.is_empty() { - return None; - } - Some((idx, name)) - }) - .collect() - }; - - if migrations.is_empty() { - return; - } - - // Second pass: apply all name writes while holding the lock. - let Ok(mut state) = state_mutex.lock() else { - return; - }; - let count = migrations.len(); - for (idx, name) in migrations { - apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].name.set(name.clone())); - } - slog!("[crdt] Migrated names for {count} items from story ID slugs"); -} - -/// Set the typed `depends_on` CRDT register for a pipeline item. -/// -/// Encodes `deps` as a compact JSON array string (e.g. `"[837]"`) and writes it -/// to the item's `depends_on` register. An empty slice clears the register to an -/// empty string, which means "no dependencies". -/// -/// Returns `true` if the item was found and the op was applied, `false` otherwise. -pub fn set_depends_on(story_id: &str, deps: &[u32]) -> bool { - let Some(state_mutex) = get_crdt() else { - return false; - }; - let Ok(mut state) = state_mutex.lock() else { - return false; - }; - let Some(&idx) = state.index.get(story_id) else { - return false; - }; - let value = if deps.is_empty() { - String::new() - } else { - serde_json::to_string(deps).unwrap_or_default() - }; - apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].depends_on.set(value)); - true -} - -/// Set the `agent` field for a pipeline item by its story ID. -/// -/// `Some(name)` writes the agent name into the CRDT register. -/// `None` clears the register by writing an empty string — use this -/// to unpin an agent without touching the surrounding item. -/// -/// This is the typed setter counterpart to [`write_item`]'s `agent` parameter. -/// Callers that only need to update the agent (e.g. the `update_story` MCP tool -/// and the Matrix `!assign` command) should prefer this function over -/// passing `agent` through the full [`write_item`] call, which requires all -/// other fields to be known. -/// -/// Returns `true` if the item was found and the write was performed. -pub fn set_agent(story_id: &str, agent: Option<&str>) -> bool { - let Some(state_mutex) = get_crdt() else { - return false; - }; - let Ok(mut state) = state_mutex.lock() else { - return false; - }; - let Some(&idx) = state.index.get(story_id) else { - return false; - }; - let value = agent.unwrap_or("").to_string(); - apply_and_persist(&mut state, |s| { - s.crdt.doc.items[idx].agent.set(value.clone()) - }); - true -} - -/// Set the typed `qa_mode` CRDT register for a pipeline item. -/// -/// Passing `Some(mode)` writes the mode string (e.g. `"server"`, `"agent"`, `"human"`) -/// to the item's `qa_mode` register and persists a signed op. -/// Passing `None` clears the register to an empty string, which means -/// "use the project default" (same as if the field was never set). -/// -/// Returns `true` if the item was found and the op was applied, `false` otherwise. -pub fn set_qa_mode(story_id: &str, mode: Option) -> bool { - let Some(state_mutex) = get_crdt() else { - return false; - }; - let Ok(mut state) = state_mutex.lock() else { - return false; - }; - let Some(&idx) = state.index.get(story_id) else { - return false; - }; - let value = mode.map(|m| m.as_str().to_string()).unwrap_or_default(); - apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].qa_mode.set(value)); - true -} - -/// Set the `mergemaster_attempted` CRDT flag for a pipeline item. -/// -/// Passing `true` records that a mergemaster session has been spawned for this -/// item, preventing repeated auto-spawns across restarts. -/// Passing `false` explicitly writes `false` (does not remove the register) so -/// the cleared state is distinguishable from an unset register and survives -/// CRDT replay correctly. -/// -/// Returns `true` if the item was found and the op was applied, `false` otherwise. -pub fn set_mergemaster_attempted(story_id: &str, value: bool) -> bool { - let Some(state_mutex) = get_crdt() else { - return false; - }; - let Ok(mut state) = state_mutex.lock() else { - return false; - }; - let Some(&idx) = state.index.get(story_id) else { - return false; - }; - apply_and_persist(&mut state, |s| { - s.crdt.doc.items[idx].mergemaster_attempted.set(value) - }); - true -} - -/// Write a pipeline item state through CRDT operations. -/// -/// If the item exists, updates its registers. If not, inserts a new item -/// into the list. All ops are signed and persisted to SQLite. -/// -/// When the stage changes (or a new item is created), a [`CrdtEvent`] is -/// broadcast so subscribers can react to the transition. -#[allow(clippy::too_many_arguments)] -pub fn write_item( - story_id: &str, - stage: &str, - name: Option<&str>, - agent: Option<&str>, - retry_count: Option, - blocked: Option, - depends_on: Option<&str>, - claimed_by: Option<&str>, - claimed_at: Option, - merged_at: Option, -) { - let Some(state_mutex) = get_crdt() else { - return; - }; - let Ok(mut state) = state_mutex.lock() else { - return; - }; - - if let Some(&idx) = state.index.get(story_id) { - // Capture the old stage before updating so we can detect transitions. - let old_stage = match state.crdt.doc.items[idx].stage.view() { - JsonValue::String(s) => Some(s), - _ => None, - }; - - // Update existing item registers. - apply_and_persist(&mut state, |s| { - s.crdt.doc.items[idx].stage.set(stage.to_string()) - }); - - if let Some(n) = name { - apply_and_persist(&mut state, |s| { - s.crdt.doc.items[idx].name.set(n.to_string()) - }); - } - if let Some(a) = agent { - apply_and_persist(&mut state, |s| { - s.crdt.doc.items[idx].agent.set(a.to_string()) - }); - } - if let Some(rc) = retry_count { - apply_and_persist(&mut state, |s| { - s.crdt.doc.items[idx].retry_count.set(rc as f64) - }); - } - if let Some(b) = blocked { - apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].blocked.set(b)); - } - if let Some(d) = depends_on { - apply_and_persist(&mut state, |s| { - s.crdt.doc.items[idx].depends_on.set(d.to_string()) - }); - } - if let Some(cb) = claimed_by { - apply_and_persist(&mut state, |s| { - s.crdt.doc.items[idx].claimed_by.set(cb.to_string()) - }); - } - if let Some(ca) = claimed_at { - apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].claimed_at.set(ca)); - } - if let Some(ma) = merged_at { - apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].merged_at.set(ma)); - } - - // Broadcast a CrdtEvent if the stage actually changed. - let stage_changed = old_stage.as_deref() != Some(stage); - if stage_changed { - // Read the current name from the CRDT document for the event. - let current_name = match state.crdt.doc.items[idx].name.view() { - JsonValue::String(s) if !s.is_empty() => Some(s), - _ => None, - }; - emit_event(CrdtEvent { - story_id: story_id.to_string(), - from_stage: old_stage, - to_stage: stage.to_string(), - name: current_name, - }); - } - } else { - // Insert new item. - let item_json: JsonValue = json!({ - "story_id": story_id, - "stage": stage, - "name": name.unwrap_or(""), - "agent": agent.unwrap_or(""), - "retry_count": retry_count.unwrap_or(0) as f64, - "blocked": blocked.unwrap_or(false), - "depends_on": depends_on.unwrap_or(""), - "claimed_by": claimed_by.unwrap_or(""), - "claimed_at": claimed_at.unwrap_or(0.0), - "merged_at": merged_at.unwrap_or(0.0), - "qa_mode": "", - "mergemaster_attempted": false, - }) - .into(); - - apply_and_persist(&mut state, |s| s.crdt.doc.items.insert(ROOT_ID, item_json)); - - // Rebuild index after insertion (indices may shift). - state.index = rebuild_index(&state.crdt); - - // Advance the inner registers of the newly-created item to the Lamport - // floor so their first local ops don't re-emit low sequence numbers. - let floor = state.lamport_floor; - if floor > 0 - && let Some(&idx) = state.index.get(story_id) - { - let item = &mut state.crdt.doc.items[idx]; - item.story_id.advance_seq(floor); - item.stage.advance_seq(floor); - item.name.advance_seq(floor); - item.agent.advance_seq(floor); - item.retry_count.advance_seq(floor); - item.blocked.advance_seq(floor); - item.depends_on.advance_seq(floor); - item.claimed_by.advance_seq(floor); - item.claimed_at.advance_seq(floor); - item.merged_at.advance_seq(floor); - item.qa_mode.advance_seq(floor); - item.mergemaster_attempted.advance_seq(floor); - } - - // Broadcast a CrdtEvent for the new item. - emit_event(CrdtEvent { - story_id: story_id.to_string(), - from_stage: None, - to_stage: stage.to_string(), - name: name.map(String::from), - }); - } -} - -/// Set `retry_count` to an explicit value for a pipeline item. -/// -/// Pure metadata operation — the item's stage is not changed. -/// Call `set_retry_count(story_id, 0)` to reset the counter after a -/// stage transition or an explicit unblock. -pub fn set_retry_count(story_id: &str, count: i64) { - let Some(state_mutex) = get_crdt() else { - return; - }; - let Ok(mut state) = state_mutex.lock() else { - return; - }; - if let Some(&idx) = state.index.get(story_id) { - apply_and_persist(&mut state, |s| { - s.crdt.doc.items[idx].retry_count.set(count as f64) - }); - } -} - -/// Increment `retry_count` by 1 and return the new value. -/// -/// Pure metadata operation — the item's stage is not changed. -/// Returns 0 if the item is not found in the CRDT (no-op in that case). -/// Use the returned value to decide whether the story should be blocked. -pub fn bump_retry_count(story_id: &str) -> i64 { - let Some(state_mutex) = get_crdt() else { - return 0; - }; - let Ok(mut state) = state_mutex.lock() else { - return 0; - }; - let Some(&idx) = state.index.get(story_id) else { - return 0; - }; - let current = match state.crdt.doc.items[idx].retry_count.view() { - JsonValue::Number(n) => n as i64, - _ => 0, - }; - let new_count = current + 1; - apply_and_persist(&mut state, |s| { - s.crdt.doc.items[idx].retry_count.set(new_count as f64) - }); - new_count -} - -#[cfg(test)] -mod tests { - use super::super::hex; - use super::super::read::extract_item_view; - use super::super::read::read_item; - use super::super::state::init_for_test; - use super::super::state::rebuild_index; - use super::*; - - // ── name_from_story_id tests ───────────────────────────────────────────── - - #[test] - fn name_from_story_id_story_type() { - assert_eq!( - name_from_story_id("729_story_store_story_name_as_a_crdt_field"), - "Store story name as a crdt field" - ); - } - - #[test] - fn name_from_story_id_bug_type() { - assert_eq!(name_from_story_id("4_bug_login_crash"), "Login crash"); - } - - #[test] - fn name_from_story_id_spike_type() { - assert_eq!(name_from_story_id("10_spike_arch_review"), "Arch review"); - } - - #[test] - fn name_from_story_id_refactor_type() { - assert_eq!( - name_from_story_id("99_refactor_decompose_server"), - "Decompose server" - ); - } - - #[test] - fn name_from_story_id_single_word() { - assert_eq!(name_from_story_id("1_story_auth"), "Auth"); - } - - #[test] - fn name_from_story_id_unknown_type_fallback() { - // Unknown type prefix is left as-is after stripping the number. - assert_eq!(name_from_story_id("5_unknown_foo_bar"), "Unknown foo bar"); - } - - // ── migrate_names_from_slugs tests ─────────────────────────────────────── - - // ── numeric_id_from_slug tests ──────────────────────────────────────────── - - #[test] - fn numeric_id_from_slug_extracts_prefix() { - assert_eq!( - numeric_id_from_slug("664_story_my_feature"), - Some("664".to_string()) - ); - assert_eq!( - numeric_id_from_slug("4_bug_login_crash"), - Some("4".to_string()) - ); - assert_eq!( - numeric_id_from_slug("730_refactor_foo_bar"), - Some("730".to_string()) - ); - } - - #[test] - fn numeric_id_from_slug_returns_none_for_numeric_only() { - assert_eq!(numeric_id_from_slug("664"), None); - assert_eq!(numeric_id_from_slug("1"), None); - assert_eq!(numeric_id_from_slug("730"), None); - } - - #[test] - fn numeric_id_from_slug_returns_none_for_non_numeric_prefix() { - assert_eq!(numeric_id_from_slug("story_no_number"), None); - assert_eq!(numeric_id_from_slug(""), None); - assert_eq!(numeric_id_from_slug("abc_story"), None); - } - - // ── migrate_story_ids_to_numeric tests ─────────────────────────────────── - - #[test] - fn migrate_story_ids_to_numeric_rewrites_slug_ids() { - init_for_test(); - - write_item( - "42_story_my_feature", - "1_backlog", - Some("My Feature"), - None, - None, - None, - None, - None, - None, - None, - ); - - let result = migrate_story_ids_to_numeric(); - assert_eq!(result.len(), 1, "exactly one item should be migrated"); - assert_eq!(result[0].0, "42_story_my_feature"); - assert_eq!(result[0].1, "42"); - - // After migration the item is accessible by the numeric ID. - let item = read_item("42").expect("item should be found by numeric ID"); - assert_eq!(item.story_id, "42"); - - // The slug-based ID is gone. - assert!( - read_item("42_story_my_feature").is_none(), - "slug ID should no longer be in the index" - ); - } - - #[test] - fn migrate_story_ids_to_numeric_is_idempotent() { - init_for_test(); - - write_item( - "43", - "1_backlog", - Some("Already Numeric"), - None, - None, - None, - None, - None, - None, - None, - ); - - // First call — nothing to migrate. - let r1 = migrate_story_ids_to_numeric(); - assert!(r1.is_empty(), "no migration for already-numeric ID"); - - // Second call — still nothing. - let r2 = migrate_story_ids_to_numeric(); - assert!(r2.is_empty(), "second call must be a no-op"); - - // Item is still there. - assert!(read_item("43").is_some()); - } - - #[test] - fn migrate_story_ids_to_numeric_skips_conflict() { - init_for_test(); - - // Both the slug form AND its numeric target exist. - write_item( - "44_story_foo", - "1_backlog", - None, - None, - None, - None, - None, - None, - None, - None, - ); - write_item( - "44", - "2_current", - None, - None, - None, - None, - None, - None, - None, - None, - ); - - let result = migrate_story_ids_to_numeric(); - // The slug entry must NOT be migrated because "44" is already occupied. - assert!( - result.is_empty(), - "conflicting slug must not overwrite existing numeric entry" - ); - - // Both items remain as they were. - assert!(read_item("44_story_foo").is_some()); - assert!(read_item("44").is_some()); - } - - #[test] - fn migrate_story_ids_to_numeric_noop_when_crdt_not_initialised() { - // Must not panic when called before init. - migrate_story_ids_to_numeric(); - } - - #[test] - fn migrate_story_ids_to_numeric_preserves_stage_and_name() { - init_for_test(); - - write_item( - "45_bug_crash", - "2_current", - Some("Crash Bug"), - Some("coder-1"), - None, - None, - None, - None, - None, - None, - ); - - migrate_story_ids_to_numeric(); - - let item = read_item("45").expect("item must be accessible by numeric ID"); - assert_eq!(item.stage, "2_current"); - assert_eq!(item.name.as_deref(), Some("Crash Bug")); - assert_eq!(item.agent.as_deref(), Some("coder-1")); - } - - #[test] - fn migrate_names_from_slugs_fills_empty_names() { - init_for_test(); - - // Write an item without a name. - write_item( - "42_story_my_feature", - "1_backlog", - None, - None, - None, - None, - None, - None, - None, - None, - ); - - // Before migration the name should be empty. - let before = read_item("42_story_my_feature").unwrap(); - assert!( - before.name.as_deref().unwrap_or("").is_empty(), - "name should be empty before migration" - ); - - migrate_names_from_slugs(); - - // After migration the name should be derived from the slug. - let after = read_item("42_story_my_feature").unwrap(); - assert_eq!( - after.name.as_deref(), - Some("My feature"), - "name should be derived from slug after migration" - ); - } - - #[test] - fn migrate_names_from_slugs_leaves_existing_names_unchanged() { - init_for_test(); - - write_item( - "43_story_named_item", - "1_backlog", - Some("Already Named"), - None, - None, - None, - None, - None, - None, - None, - ); - - migrate_names_from_slugs(); - - let after = read_item("43_story_named_item").unwrap(); - assert_eq!( - after.name.as_deref(), - Some("Already Named"), - "pre-existing name must not be overwritten" - ); - } - - #[test] - fn migrate_names_from_slugs_noop_when_crdt_not_initialised() { - // Should not panic when called before init. - // In practice get_crdt() returns None in a fresh thread. - // We call it here just to confirm no panic. - migrate_names_from_slugs(); - } - - // ── set_depends_on regression tests ────────────────────────────────────── - - #[test] - fn set_depends_on_round_trip_and_clear() { - use super::super::read::{check_unmet_deps_crdt, read_item}; - init_for_test(); - - write_item( - "872_test_target", - "1_backlog", - Some("Target"), - None, - None, - None, - None, - None, - None, - None, - ); - - // Set depends_on to [837] and verify CRDT register holds the list. - let ok = set_depends_on("872_test_target", &[837]); - assert!(ok, "set_depends_on should return true for known item"); - let view = read_item("872_test_target").unwrap(); - assert_eq!( - view.depends_on, - Some(vec![837]), - "CRDT register should hold [837]" - ); - - // Clear by passing an empty slice. - let ok = set_depends_on("872_test_target", &[]); - assert!(ok, "set_depends_on([]) should return true"); - let view = read_item("872_test_target").unwrap(); - assert_eq!( - view.depends_on, None, - "clearing should leave register unset" - ); - - // Auto-assigner sees no unmet dependency after clearing. - let unmet = check_unmet_deps_crdt("872_test_target"); - assert!( - unmet.is_empty(), - "after clearing deps, auto-assigner should see no unmet dependencies" - ); - } - - #[test] - fn set_depends_on_returns_false_for_unknown_story() { - init_for_test(); - let ok = set_depends_on("nonexistent_story_872", &[1, 2, 3]); - assert!( - !ok, - "set_depends_on should return false for unknown story_id" - ); - } - - // ── set_agent tests ────────────────────────────────────────────────────── - - #[test] - fn set_agent_some_writes_name() { - init_for_test(); - - write_item( - "871_story_set_agent_write", - "2_current", - Some("Set Agent Write"), - None, - None, - None, - None, - None, - None, - None, - ); - - let found = set_agent("871_story_set_agent_write", Some("coder-1")); - assert!(found, "set_agent should return true for an existing item"); - - let item = read_item("871_story_set_agent_write").expect("item must exist"); - assert_eq!( - item.agent.as_deref(), - Some("coder-1"), - "agent should be written to CRDT register" - ); - } - - #[test] - fn set_agent_none_clears_register() { - init_for_test(); - - write_item( - "871_story_set_agent_clear", - "2_current", - Some("Set Agent Clear"), - Some("coder-2"), - None, - None, - None, - None, - None, - None, - ); - - // Confirm agent is set. - let before = read_item("871_story_set_agent_clear").expect("item must exist"); - assert_eq!(before.agent.as_deref(), Some("coder-2")); - - // Clear it. - let found = set_agent("871_story_set_agent_clear", None); - assert!(found, "set_agent should return true for an existing item"); - - let after = read_item("871_story_set_agent_clear").expect("item must exist"); - assert!( - after.agent.as_deref().unwrap_or("").is_empty(), - "agent should be cleared (empty string) after set_agent(None)" - ); - } - - #[test] - fn set_agent_returns_false_for_unknown_story() { - init_for_test(); - - let found = set_agent("999_story_nonexistent", Some("coder-1")); - assert!( - !found, - "set_agent should return false when story is not in the CRDT" - ); - } - - // ── set_qa_mode regression tests ───────────────────────────────────────── - - #[test] - fn set_qa_mode_round_trip_server_then_human() { - use crate::io::story_metadata::QaMode; - init_for_test(); - - write_item( - "869_story_qa_roundtrip", - "1_backlog", - None, - None, - None, - None, - None, - None, - None, - None, - ); - - // Set qa=server via typed path and assert CRDT register reflects it. - let ok = set_qa_mode("869_story_qa_roundtrip", Some(QaMode::Server)); - assert!(ok, "set_qa_mode should return true for known item"); - let view = read_item("869_story_qa_roundtrip").unwrap(); - assert_eq!( - view.qa_mode.as_deref(), - Some("server"), - "CRDT register should hold \"server\"" - ); - - // Set qa=human via typed path and assert CRDT register is updated. - let ok = set_qa_mode("869_story_qa_roundtrip", Some(QaMode::Human)); - assert!(ok, "set_qa_mode should return true for known item"); - let view = read_item("869_story_qa_roundtrip").unwrap(); - assert_eq!( - view.qa_mode.as_deref(), - Some("human"), - "CRDT register should hold \"human\"" - ); - - // Clear via None — register goes back to unset. - let ok = set_qa_mode("869_story_qa_roundtrip", None); - assert!(ok, "set_qa_mode(None) should return true"); - let view = read_item("869_story_qa_roundtrip").unwrap(); - assert_eq!( - view.qa_mode, None, - "clearing qa_mode should leave register unset" - ); - } - - // ── set_mergemaster_attempted regression tests ─────────────────────────── - - #[test] - fn set_mergemaster_attempted_true_then_false_flips_register() { - init_for_test(); - - write_item( - "873_story_mergemaster_flip", - "4_merge", - None, - None, - None, - None, - None, - None, - None, - None, - ); - - // Set true — register must read back as true. - let ok = set_mergemaster_attempted("873_story_mergemaster_flip", true); - assert!( - ok, - "set_mergemaster_attempted should return true for known item" - ); - let view = read_item("873_story_mergemaster_flip").unwrap(); - assert_eq!( - view.mergemaster_attempted, - Some(true), - "CRDT register should hold true after setting true" - ); - - // Set false — register must flip back to false (not unset). - let ok = set_mergemaster_attempted("873_story_mergemaster_flip", false); - assert!( - ok, - "set_mergemaster_attempted(false) should return true for known item" - ); - let view = read_item("873_story_mergemaster_flip").unwrap(); - assert_eq!( - view.mergemaster_attempted, - Some(false), - "CRDT register should hold false after explicit clear" - ); - } - - #[test] - fn set_mergemaster_attempted_returns_false_for_unknown_story() { - init_for_test(); - let ok = set_mergemaster_attempted("nonexistent_story_mm", true); - assert!( - !ok, - "set_mergemaster_attempted should return false for unknown story_id" - ); - } - - #[test] - fn set_qa_mode_returns_false_for_unknown_story() { - init_for_test(); - use crate::io::story_metadata::QaMode; - let ok = set_qa_mode("nonexistent_story_qa", Some(QaMode::Server)); - assert!(!ok, "set_qa_mode should return false for unknown story_id"); - } - use bft_json_crdt::json_crdt::OpState; - use bft_json_crdt::keypair::make_keypair; - use bft_json_crdt::op::ROOT_ID; - use serde_json::json; - use sqlx::SqlitePool; - use sqlx::sqlite::SqliteConnectOptions; - - // ── set_retry_count / bump_retry_count tests ───────────────────────────── - - #[test] - fn bump_retry_count_increments_by_one() { - init_for_test(); - write_item( - "9001_story_bump_test", - "2_current", - None, - None, - None, - None, - None, - None, - None, - None, - ); - - let v1 = bump_retry_count("9001_story_bump_test"); - assert_eq!(v1, 1, "first bump should return 1"); - - let v2 = bump_retry_count("9001_story_bump_test"); - assert_eq!(v2, 2, "second bump should return 2"); - - let item = read_item("9001_story_bump_test").expect("item must exist"); - assert_eq!( - item.retry_count, - Some(2), - "CRDT must reflect final bump value" - ); - } - - #[test] - fn set_retry_count_resets_to_zero() { - init_for_test(); - write_item( - "9002_story_set_test", - "2_current", - None, - None, - Some(5), - None, - None, - None, - None, - None, - ); - - set_retry_count("9002_story_set_test", 0); - - let item = read_item("9002_story_set_test").expect("item must exist"); - assert_eq!( - item.retry_count, - Some(0), - "set_retry_count(0) must reset to 0" - ); - } - - #[test] - fn bump_returns_zero_for_missing_item() { - init_for_test(); - let result = bump_retry_count("nonexistent_story"); - assert_eq!(result, 0, "bump on missing item should return 0 (no-op)"); - } - - #[tokio::test] - async fn bug_511_rowid_replay_preserves_field_update_after_list_insert() { - let tmp = tempfile::tempdir().unwrap(); - let db_path = tmp.path().join("bug511.db"); - - let options = SqliteConnectOptions::new() - .filename(&db_path) - .create_if_missing(true); - let pool = SqlitePool::connect_with(options).await.unwrap(); - sqlx::migrate!("./migrations").run(&pool).await.unwrap(); - - let kp = make_keypair(); - let mut crdt = BaseCrdt::::new(&kp); - - // Insert 5 dummy items to advance items.our_seq to 5. - for i in 0..5u32 { - let sid = format!("{}_story_warmup", i); - let item: JsonValue = json!({ - "story_id": sid, - "stage": "1_backlog", - "name": "", - "agent": "", - "retry_count": 0.0, - "blocked": false, - "depends_on": "", - "claimed_by": "", - "claimed_at": 0.0, - }) - .into(); - let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp); - crdt.apply(op.clone()); - // We don't persist these to the DB — they are pre-history. - } - - // Now insert the real item. items.our_seq was 5, so this op gets seq=6. - let target_item: JsonValue = json!({ - "story_id": "511_story_target", - "stage": "1_backlog", - "name": "Bug 511 target", - "agent": "", - "retry_count": 0.0, - "blocked": false, - "depends_on": "", - "claimed_by": "", - "claimed_at": 0.0, - }) - .into(); - let insert_op = crdt.doc.items.insert(ROOT_ID, target_item).sign(&kp); - crdt.apply(insert_op.clone()); - // insert_op.inner.seq == 6 - - // Now update the stage. The stage LwwRegisterCrdt for this item starts - // at our_seq=0, so this field op gets seq=1. Crucially: seq=1 < seq=6. - let idx = rebuild_index(&crdt)["511_story_target"]; - let stage_op = crdt.doc.items[idx] - .stage - .set("2_current".to_string()) - .sign(&kp); - crdt.apply(stage_op.clone()); - // stage_op.inner.seq == 1 - - // Persist BOTH ops in causal order (insert first, update second). - // This means insert_op gets rowid < stage_op rowid. - let now = chrono::Utc::now().to_rfc3339(); - for op in [&insert_op, &stage_op] { - let op_json = serde_json::to_string(op).unwrap(); - let op_id = hex::encode(&op.id()); - sqlx::query( - "INSERT INTO crdt_ops (op_id, seq, op_json, created_at) VALUES (?1, ?2, ?3, ?4)", - ) - .bind(&op_id) - .bind(op.inner.seq as i64) - .bind(&op_json) - .bind(&now) - .execute(&pool) - .await - .unwrap(); - } - - // Replay by rowid ASC (the fix). The insert must come before the field - // update regardless of their field-level seq values. - let rows: Vec<(String,)> = - sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY rowid ASC") - .fetch_all(&pool) - .await - .unwrap(); - - let mut crdt2 = BaseCrdt::::new(&kp); - for (json_str,) in &rows { - let op: SignedOp = serde_json::from_str(json_str).unwrap(); - crdt2.apply(op); - } - - // The item must be in the CRDT and must reflect the stage update. - let index2 = rebuild_index(&crdt2); - assert!( - index2.contains_key("511_story_target"), - "item not found after rowid-order replay" - ); - let idx2 = index2["511_story_target"]; - let view = extract_item_view(&crdt2.doc.items[idx2]).unwrap(); - assert_eq!( - view.stage, "2_current", - "stage field update lost during replay (bug 511 regression)" - ); - - // Confirm the bug is reproducible by replaying seq ASC instead. - // With seq ASC the stage_op (seq=1) arrives before insert_op (seq=6), - // fails ErrPathMismatch, and the item ends up at "1_backlog". - let rows_wrong_order: Vec<(String,)> = - sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY seq ASC") - .fetch_all(&pool) - .await - .unwrap(); - - let mut crdt3 = BaseCrdt::::new(&kp); - for (json_str,) in &rows_wrong_order { - let op: SignedOp = serde_json::from_str(json_str).unwrap(); - crdt3.apply(op); - } - - let index3 = rebuild_index(&crdt3); - // With seq ASC replay, the item is created (insert_op eventually runs) - // but the stage update is lost (it ran before the item existed). - if let Some(idx3) = index3.get("511_story_target") { - let view3 = extract_item_view(&crdt3.doc.items[*idx3]).unwrap(); - // The bug: stage is still "1_backlog" because the update was dropped. - assert_eq!( - view3.stage, "1_backlog", - "expected seq-ASC replay to exhibit the bug (update lost)" - ); - } - } -} diff --git a/server/src/crdt_state/write/item.rs b/server/src/crdt_state/write/item.rs new file mode 100644 index 00000000..12b39a33 --- /dev/null +++ b/server/src/crdt_state/write/item.rs @@ -0,0 +1,305 @@ +//! CRDT write operations for individual pipeline items. +//! +//! Provides typed setters for agent, QA mode, retry count, and the primary +//! `write_item` function that inserts or updates pipeline items in the CRDT. + +use bft_json_crdt::json_crdt::{CrdtNode, JsonValue}; +use bft_json_crdt::lww_crdt::LwwRegisterCrdt; +use bft_json_crdt::op::ROOT_ID; +use serde_json::json; + +use super::super::state::{apply_and_persist, emit_event, get_crdt, rebuild_index}; +use super::super::types::CrdtEvent; +use crate::io::story_metadata::QaMode; + +/// Set the typed `depends_on` CRDT register for a pipeline item. +/// +/// Encodes `deps` as a compact JSON array string (e.g. `"[837]"`) and writes it +/// to the item's `depends_on` register. An empty slice clears the register to an +/// empty string, which means "no dependencies". +/// +/// Returns `true` if the item was found and the op was applied, `false` otherwise. +pub fn set_depends_on(story_id: &str, deps: &[u32]) -> bool { + let Some(state_mutex) = get_crdt() else { + return false; + }; + let Ok(mut state) = state_mutex.lock() else { + return false; + }; + let Some(&idx) = state.index.get(story_id) else { + return false; + }; + let value = if deps.is_empty() { + String::new() + } else { + serde_json::to_string(deps).unwrap_or_default() + }; + apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].depends_on.set(value)); + true +} + +/// Set the `mergemaster_attempted` CRDT flag for a pipeline item. +/// +/// Passing `true` records that a mergemaster session has been spawned for this +/// item, preventing repeated auto-spawns across restarts. +/// Passing `false` explicitly writes `false` (does not remove the register) so +/// the cleared state is distinguishable from an unset register and survives +/// CRDT replay correctly. +/// +/// Returns `true` if the item was found and the op was applied, `false` otherwise. +pub fn set_mergemaster_attempted(story_id: &str, value: bool) -> bool { + let Some(state_mutex) = get_crdt() else { + return false; + }; + let Ok(mut state) = state_mutex.lock() else { + return false; + }; + let Some(&idx) = state.index.get(story_id) else { + return false; + }; + apply_and_persist(&mut state, |s| { + s.crdt.doc.items[idx].mergemaster_attempted.set(value) + }); + true +} + +/// Set the `agent` field for a pipeline item by its story ID. +/// +/// `Some(name)` writes the agent name into the CRDT register. +/// `None` clears the register by writing an empty string — use this +/// to unpin an agent without touching the surrounding item. +/// +/// This is the typed setter counterpart to [`write_item`]'s `agent` parameter. +/// Callers that only need to update the agent (e.g. the `update_story` MCP tool +/// and the Matrix `!assign` command) should prefer this function over +/// passing `agent` through the full [`write_item`] call, which requires all +/// other fields to be known. +/// +/// Returns `true` if the item was found and the write was performed. +pub fn set_agent(story_id: &str, agent: Option<&str>) -> bool { + let Some(state_mutex) = get_crdt() else { + return false; + }; + let Ok(mut state) = state_mutex.lock() else { + return false; + }; + let Some(&idx) = state.index.get(story_id) else { + return false; + }; + let value = agent.unwrap_or("").to_string(); + apply_and_persist(&mut state, |s| { + s.crdt.doc.items[idx].agent.set(value.clone()) + }); + true +} + +/// Set the typed `qa_mode` CRDT register for a pipeline item. +/// +/// Passing `Some(mode)` writes the mode string (e.g. `"server"`, `"agent"`, `"human"`) +/// to the item's `qa_mode` register and persists a signed op. +/// Passing `None` clears the register to an empty string, which means +/// "use the project default" (same as if the field was never set). +/// +/// Returns `true` if the item was found and the op was applied, `false` otherwise. +pub fn set_qa_mode(story_id: &str, mode: Option) -> bool { + let Some(state_mutex) = get_crdt() else { + return false; + }; + let Ok(mut state) = state_mutex.lock() else { + return false; + }; + let Some(&idx) = state.index.get(story_id) else { + return false; + }; + let value = mode.map(|m| m.as_str().to_string()).unwrap_or_default(); + apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].qa_mode.set(value)); + true +} + +/// Write a pipeline item state through CRDT operations. +/// +/// If the item exists, updates its registers. If not, inserts a new item +/// into the list. All ops are signed and persisted to SQLite. +/// +/// When the stage changes (or a new item is created), a [`CrdtEvent`] is +/// broadcast so subscribers can react to the transition. +#[allow(clippy::too_many_arguments)] +pub fn write_item( + story_id: &str, + stage: &str, + name: Option<&str>, + agent: Option<&str>, + retry_count: Option, + blocked: Option, + depends_on: Option<&str>, + claimed_by: Option<&str>, + claimed_at: Option, + merged_at: Option, +) { + let Some(state_mutex) = get_crdt() else { + return; + }; + let Ok(mut state) = state_mutex.lock() else { + return; + }; + + if let Some(&idx) = state.index.get(story_id) { + // Capture the old stage before updating so we can detect transitions. + let old_stage = match state.crdt.doc.items[idx].stage.view() { + JsonValue::String(s) => Some(s), + _ => None, + }; + + // Update existing item registers. + apply_and_persist(&mut state, |s| { + s.crdt.doc.items[idx].stage.set(stage.to_string()) + }); + + if let Some(n) = name { + apply_and_persist(&mut state, |s| { + s.crdt.doc.items[idx].name.set(n.to_string()) + }); + } + if let Some(a) = agent { + apply_and_persist(&mut state, |s| { + s.crdt.doc.items[idx].agent.set(a.to_string()) + }); + } + if let Some(rc) = retry_count { + apply_and_persist(&mut state, |s| { + s.crdt.doc.items[idx].retry_count.set(rc as f64) + }); + } + if let Some(b) = blocked { + apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].blocked.set(b)); + } + if let Some(d) = depends_on { + apply_and_persist(&mut state, |s| { + s.crdt.doc.items[idx].depends_on.set(d.to_string()) + }); + } + if let Some(cb) = claimed_by { + apply_and_persist(&mut state, |s| { + s.crdt.doc.items[idx].claimed_by.set(cb.to_string()) + }); + } + if let Some(ca) = claimed_at { + apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].claimed_at.set(ca)); + } + if let Some(ma) = merged_at { + apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].merged_at.set(ma)); + } + + // Broadcast a CrdtEvent if the stage actually changed. + let stage_changed = old_stage.as_deref() != Some(stage); + if stage_changed { + // Read the current name from the CRDT document for the event. + let current_name = match state.crdt.doc.items[idx].name.view() { + JsonValue::String(s) if !s.is_empty() => Some(s), + _ => None, + }; + emit_event(CrdtEvent { + story_id: story_id.to_string(), + from_stage: old_stage, + to_stage: stage.to_string(), + name: current_name, + }); + } + } else { + // Insert new item. + let item_json: JsonValue = json!({ + "story_id": story_id, + "stage": stage, + "name": name.unwrap_or(""), + "agent": agent.unwrap_or(""), + "retry_count": retry_count.unwrap_or(0) as f64, + "blocked": blocked.unwrap_or(false), + "depends_on": depends_on.unwrap_or(""), + "claimed_by": claimed_by.unwrap_or(""), + "claimed_at": claimed_at.unwrap_or(0.0), + "merged_at": merged_at.unwrap_or(0.0), + "qa_mode": "", + "mergemaster_attempted": false, + }) + .into(); + + apply_and_persist(&mut state, |s| s.crdt.doc.items.insert(ROOT_ID, item_json)); + + // Rebuild index after insertion (indices may shift). + state.index = rebuild_index(&state.crdt); + + // Advance the inner registers of the newly-created item to the Lamport + // floor so their first local ops don't re-emit low sequence numbers. + let floor = state.lamport_floor; + if floor > 0 + && let Some(&idx) = state.index.get(story_id) + { + let item = &mut state.crdt.doc.items[idx]; + item.story_id.advance_seq(floor); + item.stage.advance_seq(floor); + item.name.advance_seq(floor); + item.agent.advance_seq(floor); + item.retry_count.advance_seq(floor); + item.blocked.advance_seq(floor); + item.depends_on.advance_seq(floor); + item.claimed_by.advance_seq(floor); + item.claimed_at.advance_seq(floor); + item.merged_at.advance_seq(floor); + item.qa_mode.advance_seq(floor); + item.mergemaster_attempted.advance_seq(floor); + } + + // Broadcast a CrdtEvent for the new item. + emit_event(CrdtEvent { + story_id: story_id.to_string(), + from_stage: None, + to_stage: stage.to_string(), + name: name.map(String::from), + }); + } +} + +/// Set `retry_count` to an explicit value for a pipeline item. +/// +/// Pure metadata operation — the item's stage is not changed. +/// Call `set_retry_count(story_id, 0)` to reset the counter after a +/// stage transition or an explicit unblock. +pub fn set_retry_count(story_id: &str, count: i64) { + let Some(state_mutex) = get_crdt() else { + return; + }; + let Ok(mut state) = state_mutex.lock() else { + return; + }; + if let Some(&idx) = state.index.get(story_id) { + apply_and_persist(&mut state, |s| { + s.crdt.doc.items[idx].retry_count.set(count as f64) + }); + } +} + +/// Increment `retry_count` by 1 and return the new value. +/// +/// Pure metadata operation — the item's stage is not changed. +/// Returns 0 if the item is not found in the CRDT (no-op in that case). +/// Use the returned value to decide whether the story should be blocked. +pub fn bump_retry_count(story_id: &str) -> i64 { + let Some(state_mutex) = get_crdt() else { + return 0; + }; + let Ok(mut state) = state_mutex.lock() else { + return 0; + }; + let Some(&idx) = state.index.get(story_id) else { + return 0; + }; + let current = match state.crdt.doc.items[idx].retry_count.view() { + JsonValue::Number(n) => n as i64, + _ => 0, + }; + let new_count = current + 1; + apply_and_persist(&mut state, |s| { + s.crdt.doc.items[idx].retry_count.set(new_count as f64) + }); + new_count +} diff --git a/server/src/crdt_state/write/migrations.rs b/server/src/crdt_state/write/migrations.rs new file mode 100644 index 00000000..6c4d9b2c --- /dev/null +++ b/server/src/crdt_state/write/migrations.rs @@ -0,0 +1,182 @@ +//! Name and story-ID migration helpers for pipeline items. +//! +//! Contains one-time startup migrations that backfill the `name` field from +//! story ID slugs and rewrite slug-form story IDs to numeric-only form. + +use bft_json_crdt::json_crdt::{CrdtNode, JsonValue}; + +use super::super::state::{apply_and_persist, get_crdt, rebuild_index}; +use crate::slog; + +/// Derive a human-readable name from a story ID's slug component. +/// +/// Strips the numeric prefix and item-type prefix (story/bug/spike/refactor), +/// replaces underscores with spaces, and capitalises the first letter. +/// +/// Examples: +/// - `"729_story_store_story_name"` → `"Store story name"` +/// - `"4_bug_login_crash"` → `"Login crash"` +/// - `"10_spike_arch_review"` → `"Arch review"` +pub fn name_from_story_id(story_id: &str) -> String { + // Strip the leading digits then the first underscore: "729_story_..." → "story_..." + let after_num = story_id.trim_start_matches(|c: char| c.is_ascii_digit()); + let after_num = after_num.strip_prefix('_').unwrap_or(after_num); + + // Strip the item-type prefix. + let slug = after_num + .strip_prefix("story_") + .or_else(|| after_num.strip_prefix("bug_")) + .or_else(|| after_num.strip_prefix("spike_")) + .or_else(|| after_num.strip_prefix("refactor_")) + .unwrap_or(after_num); + + // Replace underscores with spaces. + let spaced = slug.replace('_', " "); + + // Capitalise the first character. + let mut chars = spaced.chars(); + match chars.next() { + None => String::new(), + Some(first) => { + let mut name = first.to_uppercase().to_string(); + name.push_str(chars.as_str()); + name + } + } +} + +/// Extract the numeric-only ID from a slug-form story ID, if applicable. +/// +/// Returns `Some("664")` for `"664_story_my_feature"`, and `None` for IDs +/// that are already numeric-only (`"664"`) or have no valid numeric prefix. +pub(super) fn numeric_id_from_slug(story_id: &str) -> Option { + // Already numeric-only — no migration needed. + if story_id.chars().all(|c: char| c.is_ascii_digit()) { + return None; + } + // Must have a non-empty numeric segment before the first underscore. + let idx = story_id.find('_')?; + let prefix = &story_id[..idx]; + if prefix.is_empty() || !prefix.chars().all(|c| c.is_ascii_digit()) { + return None; + } + Some(prefix.to_string()) +} + +/// Migrate existing story IDs from slug form (`664_story_my_feature`) to +/// numeric-only form (`664`) in the in-memory CRDT, persisting a signed op +/// for each updated register so the change survives restarts. +/// +/// Returns the list of `(old_id, new_id)` pairs that were actually migrated. +/// Callers should use this list to rename downstream filesystem artifacts +/// (worktree directories, git branches, log directories). +/// +/// Items whose `story_id` is already numeric-only are left untouched. +/// Items where the target numeric ID is already in use are skipped to avoid +/// conflicts. Running this migration repeatedly is safe — subsequent calls +/// on already-migrated state are no-ops. +pub fn migrate_story_ids_to_numeric() -> Vec<(String, String)> { + let Some(state_mutex) = get_crdt() else { + return Vec::new(); + }; + + // First pass: collect (index, old_id, new_id) while holding the lock. + let migrations: Vec<(usize, String, String)> = { + let Ok(state) = state_mutex.lock() else { + return Vec::new(); + }; + + let existing_ids: std::collections::HashSet = state.index.keys().cloned().collect(); + + state + .index + .iter() + .filter_map(|(story_id, &idx)| { + let numeric = numeric_id_from_slug(story_id)?; + // Skip if the target numeric ID is already occupied. + if existing_ids.contains(&numeric) { + return None; + } + Some((idx, story_id.clone(), numeric)) + }) + .collect() + }; + + if migrations.is_empty() { + return Vec::new(); + } + + // Second pass: apply story_id register updates. + let Ok(mut state) = state_mutex.lock() else { + return Vec::new(); + }; + + let mut result = Vec::new(); + for (idx, old_id, new_id) in migrations { + apply_and_persist(&mut state, |s| { + s.crdt.doc.items[idx].story_id.set(new_id.clone()) + }); + result.push((old_id, new_id)); + } + + // Rebuild the index so all downstream reads use the new numeric IDs. + state.index = rebuild_index(&state.crdt); + + let count = result.len(); + slog!("[crdt] Migrated {count} story IDs from slug form to numeric"); + result +} + +/// Backfill the `name` CRDT field for pipeline items that have an empty name. +/// +/// Iterates over all items in the in-memory CRDT. For each item whose `name` +/// register is empty, derives a human-readable name from the story ID slug +/// (see [`name_from_story_id`]) and writes it via a signed CRDT op. +/// +/// This is a one-time startup migration: items created before the `name` field +/// was consistently populated will gain a name on the next server start. +/// Items that already have a non-empty name are left untouched. +pub fn migrate_names_from_slugs() { + let Some(state_mutex) = get_crdt() else { + return; + }; + + // First pass: collect (index, derived_name) pairs for items missing a name. + let migrations: Vec<(usize, String)> = { + let Ok(state) = state_mutex.lock() else { + return; + }; + state + .index + .iter() + .filter_map(|(story_id, &idx)| { + let item = &state.crdt.doc.items[idx]; + // Skip items that already have a name. + let already_named = + matches!(item.name.view(), JsonValue::String(ref s) if !s.is_empty()); + if already_named { + return None; + } + let name = name_from_story_id(story_id); + if name.is_empty() { + return None; + } + Some((idx, name)) + }) + .collect() + }; + + if migrations.is_empty() { + return; + } + + // Second pass: apply all name writes while holding the lock. + let Ok(mut state) = state_mutex.lock() else { + return; + }; + let count = migrations.len(); + for (idx, name) in migrations { + apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].name.set(name.clone())); + } + slog!("[crdt] Migrated names for {count} items from story ID slugs"); +} diff --git a/server/src/crdt_state/write/mod.rs b/server/src/crdt_state/write/mod.rs new file mode 100644 index 00000000..f3004d24 --- /dev/null +++ b/server/src/crdt_state/write/mod.rs @@ -0,0 +1,16 @@ +//! High-level write API for pipeline items. +//! +//! Provides typed setters and migration helpers for updating pipeline items +//! in the in-memory CRDT document, with ops persisted to SQLite. + +mod item; +mod migrations; + +#[cfg(test)] +mod tests; + +pub use item::{ + bump_retry_count, set_agent, set_depends_on, set_mergemaster_attempted, set_qa_mode, + set_retry_count, write_item, +}; +pub use migrations::{migrate_names_from_slugs, migrate_story_ids_to_numeric, name_from_story_id}; diff --git a/server/src/crdt_state/write/tests.rs b/server/src/crdt_state/write/tests.rs new file mode 100644 index 00000000..742af414 --- /dev/null +++ b/server/src/crdt_state/write/tests.rs @@ -0,0 +1,735 @@ +//! Tests for the CRDT write module — migrations, item writes, and setters. + +use super::super::hex; +use super::super::read::{extract_item_view, read_item}; +use super::super::state::{init_for_test, rebuild_index}; +use super::super::types::PipelineDoc; +use super::migrations::numeric_id_from_slug; +use super::*; +use bft_json_crdt::json_crdt::{BaseCrdt, JsonValue, SignedOp}; +use bft_json_crdt::keypair::make_keypair; +use bft_json_crdt::op::ROOT_ID; +use serde_json::json; +use sqlx::SqlitePool; +use sqlx::sqlite::SqliteConnectOptions; + +// ── name_from_story_id tests ───────────────────────────────────────────── + +#[test] +fn name_from_story_id_story_type() { + assert_eq!( + name_from_story_id("729_story_store_story_name_as_a_crdt_field"), + "Store story name as a crdt field" + ); +} + +#[test] +fn name_from_story_id_bug_type() { + assert_eq!(name_from_story_id("4_bug_login_crash"), "Login crash"); +} + +#[test] +fn name_from_story_id_spike_type() { + assert_eq!(name_from_story_id("10_spike_arch_review"), "Arch review"); +} + +#[test] +fn name_from_story_id_refactor_type() { + assert_eq!( + name_from_story_id("99_refactor_decompose_server"), + "Decompose server" + ); +} + +#[test] +fn name_from_story_id_single_word() { + assert_eq!(name_from_story_id("1_story_auth"), "Auth"); +} + +#[test] +fn name_from_story_id_unknown_type_fallback() { + // Unknown type prefix is left as-is after stripping the number. + assert_eq!(name_from_story_id("5_unknown_foo_bar"), "Unknown foo bar"); +} + +// ── numeric_id_from_slug tests ──────────────────────────────────────────── + +#[test] +fn numeric_id_from_slug_extracts_prefix() { + assert_eq!( + numeric_id_from_slug("664_story_my_feature"), + Some("664".to_string()) + ); + assert_eq!( + numeric_id_from_slug("4_bug_login_crash"), + Some("4".to_string()) + ); + assert_eq!( + numeric_id_from_slug("730_refactor_foo_bar"), + Some("730".to_string()) + ); +} + +#[test] +fn numeric_id_from_slug_returns_none_for_numeric_only() { + assert_eq!(numeric_id_from_slug("664"), None); + assert_eq!(numeric_id_from_slug("1"), None); + assert_eq!(numeric_id_from_slug("730"), None); +} + +#[test] +fn numeric_id_from_slug_returns_none_for_non_numeric_prefix() { + assert_eq!(numeric_id_from_slug("story_no_number"), None); + assert_eq!(numeric_id_from_slug(""), None); + assert_eq!(numeric_id_from_slug("abc_story"), None); +} + +// ── migrate_story_ids_to_numeric tests ─────────────────────────────────── + +#[test] +fn migrate_story_ids_to_numeric_rewrites_slug_ids() { + init_for_test(); + + write_item( + "42_story_my_feature", + "1_backlog", + Some("My Feature"), + None, + None, + None, + None, + None, + None, + None, + ); + + let result = migrate_story_ids_to_numeric(); + assert_eq!(result.len(), 1, "exactly one item should be migrated"); + assert_eq!(result[0].0, "42_story_my_feature"); + assert_eq!(result[0].1, "42"); + + // After migration the item is accessible by the numeric ID. + let item = read_item("42").expect("item should be found by numeric ID"); + assert_eq!(item.story_id, "42"); + + // The slug-based ID is gone. + assert!( + read_item("42_story_my_feature").is_none(), + "slug ID should no longer be in the index" + ); +} + +#[test] +fn migrate_story_ids_to_numeric_is_idempotent() { + init_for_test(); + + write_item( + "43", + "1_backlog", + Some("Already Numeric"), + None, + None, + None, + None, + None, + None, + None, + ); + + // First call — nothing to migrate. + let r1 = migrate_story_ids_to_numeric(); + assert!(r1.is_empty(), "no migration for already-numeric ID"); + + // Second call — still nothing. + let r2 = migrate_story_ids_to_numeric(); + assert!(r2.is_empty(), "second call must be a no-op"); + + // Item is still there. + assert!(read_item("43").is_some()); +} + +#[test] +fn migrate_story_ids_to_numeric_skips_conflict() { + init_for_test(); + + // Both the slug form AND its numeric target exist. + write_item( + "44_story_foo", + "1_backlog", + None, + None, + None, + None, + None, + None, + None, + None, + ); + write_item( + "44", + "2_current", + None, + None, + None, + None, + None, + None, + None, + None, + ); + + let result = migrate_story_ids_to_numeric(); + // The slug entry must NOT be migrated because "44" is already occupied. + assert!( + result.is_empty(), + "conflicting slug must not overwrite existing numeric entry" + ); + + // Both items remain as they were. + assert!(read_item("44_story_foo").is_some()); + assert!(read_item("44").is_some()); +} + +#[test] +fn migrate_story_ids_to_numeric_noop_when_crdt_not_initialised() { + // Must not panic when called before init. + migrate_story_ids_to_numeric(); +} + +#[test] +fn migrate_story_ids_to_numeric_preserves_stage_and_name() { + init_for_test(); + + write_item( + "45_bug_crash", + "2_current", + Some("Crash Bug"), + Some("coder-1"), + None, + None, + None, + None, + None, + None, + ); + + migrate_story_ids_to_numeric(); + + let item = read_item("45").expect("item must be accessible by numeric ID"); + assert_eq!(item.stage, "2_current"); + assert_eq!(item.name.as_deref(), Some("Crash Bug")); + assert_eq!(item.agent.as_deref(), Some("coder-1")); +} + +#[test] +fn migrate_names_from_slugs_fills_empty_names() { + init_for_test(); + + // Write an item without a name. + write_item( + "42_story_my_feature", + "1_backlog", + None, + None, + None, + None, + None, + None, + None, + None, + ); + + // Before migration the name should be empty. + let before = read_item("42_story_my_feature").unwrap(); + assert!( + before.name.as_deref().unwrap_or("").is_empty(), + "name should be empty before migration" + ); + + migrate_names_from_slugs(); + + // After migration the name should be derived from the slug. + let after = read_item("42_story_my_feature").unwrap(); + assert_eq!( + after.name.as_deref(), + Some("My feature"), + "name should be derived from slug after migration" + ); +} + +#[test] +fn migrate_names_from_slugs_leaves_existing_names_unchanged() { + init_for_test(); + + write_item( + "43_story_named_item", + "1_backlog", + Some("Already Named"), + None, + None, + None, + None, + None, + None, + None, + ); + + migrate_names_from_slugs(); + + let after = read_item("43_story_named_item").unwrap(); + assert_eq!( + after.name.as_deref(), + Some("Already Named"), + "pre-existing name must not be overwritten" + ); +} + +#[test] +fn migrate_names_from_slugs_noop_when_crdt_not_initialised() { + // Should not panic when called before init. + // In practice get_crdt() returns None in a fresh thread. + // We call it here just to confirm no panic. + migrate_names_from_slugs(); +} + +// ── set_depends_on regression tests ────────────────────────────────────── + +#[test] +fn set_depends_on_round_trip_and_clear() { + use super::super::read::{check_unmet_deps_crdt, read_item}; + init_for_test(); + + write_item( + "872_test_target", + "1_backlog", + Some("Target"), + None, + None, + None, + None, + None, + None, + None, + ); + + // Set depends_on to [837] and verify CRDT register holds the list. + let ok = set_depends_on("872_test_target", &[837]); + assert!(ok, "set_depends_on should return true for known item"); + let view = read_item("872_test_target").unwrap(); + assert_eq!( + view.depends_on, + Some(vec![837]), + "CRDT register should hold [837]" + ); + + // Clear by passing an empty slice. + let ok = set_depends_on("872_test_target", &[]); + assert!(ok, "set_depends_on([]) should return true"); + let view = read_item("872_test_target").unwrap(); + assert_eq!( + view.depends_on, None, + "clearing should leave register unset" + ); + + // Auto-assigner sees no unmet dependency after clearing. + let unmet = check_unmet_deps_crdt("872_test_target"); + assert!( + unmet.is_empty(), + "after clearing deps, auto-assigner should see no unmet dependencies" + ); +} + +#[test] +fn set_depends_on_returns_false_for_unknown_story() { + init_for_test(); + let ok = set_depends_on("nonexistent_story_872", &[1, 2, 3]); + assert!( + !ok, + "set_depends_on should return false for unknown story_id" + ); +} + +// ── set_mergemaster_attempted regression tests ─────────────────────────── + +#[test] +fn set_mergemaster_attempted_true_then_false_flips_register() { + init_for_test(); + + write_item( + "873_story_mergemaster_flip", + "4_merge", + None, + None, + None, + None, + None, + None, + None, + None, + ); + + // Set true — register must read back as true. + let ok = set_mergemaster_attempted("873_story_mergemaster_flip", true); + assert!( + ok, + "set_mergemaster_attempted should return true for known item" + ); + let view = read_item("873_story_mergemaster_flip").unwrap(); + assert_eq!( + view.mergemaster_attempted, + Some(true), + "CRDT register should hold true after setting true" + ); + + // Set false — register must flip back to false (not unset). + let ok = set_mergemaster_attempted("873_story_mergemaster_flip", false); + assert!( + ok, + "set_mergemaster_attempted(false) should return true for known item" + ); + let view = read_item("873_story_mergemaster_flip").unwrap(); + assert_eq!( + view.mergemaster_attempted, + Some(false), + "CRDT register should hold false after explicit clear" + ); +} + +#[test] +fn set_mergemaster_attempted_returns_false_for_unknown_story() { + init_for_test(); + let ok = set_mergemaster_attempted("nonexistent_story_mm", true); + assert!( + !ok, + "set_mergemaster_attempted should return false for unknown story_id" + ); +} + +// ── set_agent tests ────────────────────────────────────────────────────── + +#[test] +fn set_agent_some_writes_name() { + init_for_test(); + + write_item( + "871_story_set_agent_write", + "2_current", + Some("Set Agent Write"), + None, + None, + None, + None, + None, + None, + None, + ); + + let found = set_agent("871_story_set_agent_write", Some("coder-1")); + assert!(found, "set_agent should return true for an existing item"); + + let item = read_item("871_story_set_agent_write").expect("item must exist"); + assert_eq!( + item.agent.as_deref(), + Some("coder-1"), + "agent should be written to CRDT register" + ); +} + +#[test] +fn set_agent_none_clears_register() { + init_for_test(); + + write_item( + "871_story_set_agent_clear", + "2_current", + Some("Set Agent Clear"), + Some("coder-2"), + None, + None, + None, + None, + None, + None, + ); + + // Confirm agent is set. + let before = read_item("871_story_set_agent_clear").expect("item must exist"); + assert_eq!(before.agent.as_deref(), Some("coder-2")); + + // Clear it. + let found = set_agent("871_story_set_agent_clear", None); + assert!(found, "set_agent should return true for an existing item"); + + let after = read_item("871_story_set_agent_clear").expect("item must exist"); + assert!( + after.agent.as_deref().unwrap_or("").is_empty(), + "agent should be cleared (empty string) after set_agent(None)" + ); +} + +#[test] +fn set_agent_returns_false_for_unknown_story() { + init_for_test(); + + let found = set_agent("999_story_nonexistent", Some("coder-1")); + assert!( + !found, + "set_agent should return false when story is not in the CRDT" + ); +} + +// ── set_qa_mode regression tests ───────────────────────────────────────── + +#[test] +fn set_qa_mode_round_trip_server_then_human() { + use crate::io::story_metadata::QaMode; + init_for_test(); + + write_item( + "869_story_qa_roundtrip", + "1_backlog", + None, + None, + None, + None, + None, + None, + None, + None, + ); + + // Set qa=server via typed path and assert CRDT register reflects it. + let ok = set_qa_mode("869_story_qa_roundtrip", Some(QaMode::Server)); + assert!(ok, "set_qa_mode should return true for known item"); + let view = read_item("869_story_qa_roundtrip").unwrap(); + assert_eq!( + view.qa_mode.as_deref(), + Some("server"), + "CRDT register should hold \"server\"" + ); + + // Set qa=human via typed path and assert CRDT register is updated. + let ok = set_qa_mode("869_story_qa_roundtrip", Some(QaMode::Human)); + assert!(ok, "set_qa_mode should return true for known item"); + let view = read_item("869_story_qa_roundtrip").unwrap(); + assert_eq!( + view.qa_mode.as_deref(), + Some("human"), + "CRDT register should hold \"human\"" + ); + + // Clear via None — register goes back to unset. + let ok = set_qa_mode("869_story_qa_roundtrip", None); + assert!(ok, "set_qa_mode(None) should return true"); + let view = read_item("869_story_qa_roundtrip").unwrap(); + assert_eq!( + view.qa_mode, None, + "clearing qa_mode should leave register unset" + ); +} + +#[test] +fn set_qa_mode_returns_false_for_unknown_story() { + init_for_test(); + use crate::io::story_metadata::QaMode; + let ok = set_qa_mode("nonexistent_story_qa", Some(QaMode::Server)); + assert!(!ok, "set_qa_mode should return false for unknown story_id"); +} + +// ── set_retry_count / bump_retry_count tests ───────────────────────────── + +#[test] +fn bump_retry_count_increments_by_one() { + init_for_test(); + write_item( + "9001_story_bump_test", + "2_current", + None, + None, + None, + None, + None, + None, + None, + None, + ); + + let v1 = bump_retry_count("9001_story_bump_test"); + assert_eq!(v1, 1, "first bump should return 1"); + + let v2 = bump_retry_count("9001_story_bump_test"); + assert_eq!(v2, 2, "second bump should return 2"); + + let item = read_item("9001_story_bump_test").expect("item must exist"); + assert_eq!( + item.retry_count, + Some(2), + "CRDT must reflect final bump value" + ); +} + +#[test] +fn set_retry_count_resets_to_zero() { + init_for_test(); + write_item( + "9002_story_set_test", + "2_current", + None, + None, + Some(5), + None, + None, + None, + None, + None, + ); + + set_retry_count("9002_story_set_test", 0); + + let item = read_item("9002_story_set_test").expect("item must exist"); + assert_eq!( + item.retry_count, + Some(0), + "set_retry_count(0) must reset to 0" + ); +} + +#[test] +fn bump_returns_zero_for_missing_item() { + init_for_test(); + let result = bump_retry_count("nonexistent_story"); + assert_eq!(result, 0, "bump on missing item should return 0 (no-op)"); +} + +#[tokio::test] +async fn bug_511_rowid_replay_preserves_field_update_after_list_insert() { + let tmp = tempfile::tempdir().unwrap(); + let db_path = tmp.path().join("bug511.db"); + + let options = SqliteConnectOptions::new() + .filename(&db_path) + .create_if_missing(true); + let pool = SqlitePool::connect_with(options).await.unwrap(); + sqlx::migrate!("./migrations").run(&pool).await.unwrap(); + + let kp = make_keypair(); + let mut crdt = BaseCrdt::::new(&kp); + + // Insert 5 dummy items to advance items.our_seq to 5. + for i in 0..5u32 { + let sid = format!("{}_story_warmup", i); + let item: JsonValue = json!({ + "story_id": sid, + "stage": "1_backlog", + "name": "", + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, + }) + .into(); + let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp); + crdt.apply(op.clone()); + // We don't persist these to the DB — they are pre-history. + } + + // Now insert the real item. items.our_seq was 5, so this op gets seq=6. + let target_item: JsonValue = json!({ + "story_id": "511_story_target", + "stage": "1_backlog", + "name": "Bug 511 target", + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, + }) + .into(); + let insert_op = crdt.doc.items.insert(ROOT_ID, target_item).sign(&kp); + crdt.apply(insert_op.clone()); + // insert_op.inner.seq == 6 + + // Now update the stage. The stage LwwRegisterCrdt for this item starts + // at our_seq=0, so this field op gets seq=1. Crucially: seq=1 < seq=6. + let idx = rebuild_index(&crdt)["511_story_target"]; + let stage_op = crdt.doc.items[idx] + .stage + .set("2_current".to_string()) + .sign(&kp); + crdt.apply(stage_op.clone()); + // stage_op.inner.seq == 1 + + // Persist BOTH ops in causal order (insert first, update second). + // This means insert_op gets rowid < stage_op rowid. + let now = chrono::Utc::now().to_rfc3339(); + for op in [&insert_op, &stage_op] { + let op_json = serde_json::to_string(op).unwrap(); + let op_id = hex::encode(&op.id()); + sqlx::query( + "INSERT INTO crdt_ops (op_id, seq, op_json, created_at) VALUES (?1, ?2, ?3, ?4)", + ) + .bind(&op_id) + .bind(op.inner.seq as i64) + .bind(&op_json) + .bind(&now) + .execute(&pool) + .await + .unwrap(); + } + + // Replay by rowid ASC (the fix). The insert must come before the field + // update regardless of their field-level seq values. + let rows: Vec<(String,)> = sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY rowid ASC") + .fetch_all(&pool) + .await + .unwrap(); + + let mut crdt2 = BaseCrdt::::new(&kp); + for (json_str,) in &rows { + let op: SignedOp = serde_json::from_str(json_str).unwrap(); + crdt2.apply(op); + } + + // The item must be in the CRDT and must reflect the stage update. + let index2 = rebuild_index(&crdt2); + assert!( + index2.contains_key("511_story_target"), + "item not found after rowid-order replay" + ); + let idx2 = index2["511_story_target"]; + let view = extract_item_view(&crdt2.doc.items[idx2]).unwrap(); + assert_eq!( + view.stage, "2_current", + "stage field update lost during replay (bug 511 regression)" + ); + + // Confirm the bug is reproducible by replaying seq ASC instead. + // With seq ASC the stage_op (seq=1) arrives before insert_op (seq=6), + // fails ErrPathMismatch, and the item ends up at "1_backlog". + let rows_wrong_order: Vec<(String,)> = + sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY seq ASC") + .fetch_all(&pool) + .await + .unwrap(); + + let mut crdt3 = BaseCrdt::::new(&kp); + for (json_str,) in &rows_wrong_order { + let op: SignedOp = serde_json::from_str(json_str).unwrap(); + crdt3.apply(op); + } + + let index3 = rebuild_index(&crdt3); + // With seq ASC replay, the item is created (insert_op eventually runs) + // but the stage update is lost (it ran before the item existed). + if let Some(idx3) = index3.get("511_story_target") { + let view3 = extract_item_view(&crdt3.doc.items[*idx3]).unwrap(); + // The bug: stage is still "1_backlog" because the update was dropped. + assert_eq!( + view3.stage, "1_backlog", + "expected seq-ASC replay to exhibit the bug (update lost)" + ); + } +}