//! High-level write API for pipeline items. #![allow(unused_imports, dead_code)] use bft_json_crdt::json_crdt::*; use bft_json_crdt::lww_crdt::LwwRegisterCrdt; use bft_json_crdt::op::ROOT_ID; use serde_json::json; use super::state::{apply_and_persist, emit_event, get_crdt, rebuild_index}; use super::types::{CrdtEvent, PipelineDoc, PipelineItemCrdt}; use crate::slog; // ── Name migration helpers ──────────────────────────────────────────── /// Derive a human-readable name from a story ID's slug component. /// /// Strips the numeric prefix and item-type prefix (story/bug/spike/refactor), /// replaces underscores with spaces, and capitalises the first letter. /// /// Examples: /// - `"729_story_store_story_name"` → `"Store story name"` /// - `"4_bug_login_crash"` → `"Login crash"` /// - `"10_spike_arch_review"` → `"Arch review"` pub fn name_from_story_id(story_id: &str) -> String { // Strip the leading digits then the first underscore: "729_story_..." → "story_..." let after_num = story_id.trim_start_matches(|c: char| c.is_ascii_digit()); let after_num = after_num.strip_prefix('_').unwrap_or(after_num); // Strip the item-type prefix. let slug = after_num .strip_prefix("story_") .or_else(|| after_num.strip_prefix("bug_")) .or_else(|| after_num.strip_prefix("spike_")) .or_else(|| after_num.strip_prefix("refactor_")) .unwrap_or(after_num); // Replace underscores with spaces. let spaced = slug.replace('_', " "); // Capitalise the first character. let mut chars = spaced.chars(); match chars.next() { None => String::new(), Some(first) => { let mut name = first.to_uppercase().to_string(); name.push_str(chars.as_str()); name } } } /// Extract the numeric-only ID from a slug-form story ID, if applicable. /// /// Returns `Some("664")` for `"664_story_my_feature"`, and `None` for IDs /// that are already numeric-only (`"664"`) or have no valid numeric prefix. fn numeric_id_from_slug(story_id: &str) -> Option { // Already numeric-only — no migration needed. if story_id.chars().all(|c: char| c.is_ascii_digit()) { return None; } // Must have a non-empty numeric segment before the first underscore. let idx = story_id.find('_')?; let prefix = &story_id[..idx]; if prefix.is_empty() || !prefix.chars().all(|c| c.is_ascii_digit()) { return None; } Some(prefix.to_string()) } /// Migrate existing story IDs from slug form (`664_story_my_feature`) to /// numeric-only form (`664`) in the in-memory CRDT, persisting a signed op /// for each updated register so the change survives restarts. /// /// Returns the list of `(old_id, new_id)` pairs that were actually migrated. /// Callers should use this list to rename downstream filesystem artifacts /// (worktree directories, git branches, log directories). /// /// Items whose `story_id` is already numeric-only are left untouched. /// Items where the target numeric ID is already in use are skipped to avoid /// conflicts. Running this migration repeatedly is safe — subsequent calls /// on already-migrated state are no-ops. pub fn migrate_story_ids_to_numeric() -> Vec<(String, String)> { let Some(state_mutex) = get_crdt() else { return Vec::new(); }; // First pass: collect (index, old_id, new_id) while holding the lock. let migrations: Vec<(usize, String, String)> = { let Ok(state) = state_mutex.lock() else { return Vec::new(); }; let existing_ids: std::collections::HashSet = state.index.keys().cloned().collect(); state .index .iter() .filter_map(|(story_id, &idx)| { let numeric = numeric_id_from_slug(story_id)?; // Skip if the target numeric ID is already occupied. if existing_ids.contains(&numeric) { return None; } Some((idx, story_id.clone(), numeric)) }) .collect() }; if migrations.is_empty() { return Vec::new(); } // Second pass: apply story_id register updates. let Ok(mut state) = state_mutex.lock() else { return Vec::new(); }; let mut result = Vec::new(); for (idx, old_id, new_id) in migrations { apply_and_persist(&mut state, |s| { s.crdt.doc.items[idx].story_id.set(new_id.clone()) }); result.push((old_id, new_id)); } // Rebuild the index so all downstream reads use the new numeric IDs. state.index = rebuild_index(&state.crdt); let count = result.len(); slog!("[crdt] Migrated {count} story IDs from slug form to numeric"); result } /// Backfill the `name` CRDT field for pipeline items that have an empty name. /// /// Iterates over all items in the in-memory CRDT. For each item whose `name` /// register is empty, derives a human-readable name from the story ID slug /// (see [`name_from_story_id`]) and writes it via a signed CRDT op. /// /// This is a one-time startup migration: items created before the `name` field /// was consistently populated will gain a name on the next server start. /// Items that already have a non-empty name are left untouched. pub fn migrate_names_from_slugs() { let Some(state_mutex) = get_crdt() else { return; }; // First pass: collect (index, derived_name) pairs for items missing a name. let migrations: Vec<(usize, String)> = { let Ok(state) = state_mutex.lock() else { return; }; state .index .iter() .filter_map(|(story_id, &idx)| { let item = &state.crdt.doc.items[idx]; // Skip items that already have a name. let already_named = matches!(item.name.view(), JsonValue::String(ref s) if !s.is_empty()); if already_named { return None; } let name = name_from_story_id(story_id); if name.is_empty() { return None; } Some((idx, name)) }) .collect() }; if migrations.is_empty() { return; } // Second pass: apply all name writes while holding the lock. let Ok(mut state) = state_mutex.lock() else { return; }; let count = migrations.len(); for (idx, name) in migrations { apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].name.set(name.clone())); } slog!("[crdt] Migrated names for {count} items from story ID slugs"); } /// Write a pipeline item state through CRDT operations. /// /// If the item exists, updates its registers. If not, inserts a new item /// into the list. All ops are signed and persisted to SQLite. /// /// When the stage changes (or a new item is created), a [`CrdtEvent`] is /// broadcast so subscribers can react to the transition. #[allow(clippy::too_many_arguments)] pub fn write_item( story_id: &str, stage: &str, name: Option<&str>, agent: Option<&str>, retry_count: Option, blocked: Option, depends_on: Option<&str>, claimed_by: Option<&str>, claimed_at: Option, merged_at: Option, ) { let Some(state_mutex) = get_crdt() else { return; }; let Ok(mut state) = state_mutex.lock() else { return; }; if let Some(&idx) = state.index.get(story_id) { // Capture the old stage before updating so we can detect transitions. let old_stage = match state.crdt.doc.items[idx].stage.view() { JsonValue::String(s) => Some(s), _ => None, }; // Update existing item registers. apply_and_persist(&mut state, |s| { s.crdt.doc.items[idx].stage.set(stage.to_string()) }); if let Some(n) = name { apply_and_persist(&mut state, |s| { s.crdt.doc.items[idx].name.set(n.to_string()) }); } if let Some(a) = agent { apply_and_persist(&mut state, |s| { s.crdt.doc.items[idx].agent.set(a.to_string()) }); } if let Some(rc) = retry_count { apply_and_persist(&mut state, |s| { s.crdt.doc.items[idx].retry_count.set(rc as f64) }); } if let Some(b) = blocked { apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].blocked.set(b)); } if let Some(d) = depends_on { apply_and_persist(&mut state, |s| { s.crdt.doc.items[idx].depends_on.set(d.to_string()) }); } if let Some(cb) = claimed_by { apply_and_persist(&mut state, |s| { s.crdt.doc.items[idx].claimed_by.set(cb.to_string()) }); } if let Some(ca) = claimed_at { apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].claimed_at.set(ca)); } if let Some(ma) = merged_at { apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].merged_at.set(ma)); } // Broadcast a CrdtEvent if the stage actually changed. let stage_changed = old_stage.as_deref() != Some(stage); if stage_changed { // Read the current name from the CRDT document for the event. let current_name = match state.crdt.doc.items[idx].name.view() { JsonValue::String(s) if !s.is_empty() => Some(s), _ => None, }; emit_event(CrdtEvent { story_id: story_id.to_string(), from_stage: old_stage, to_stage: stage.to_string(), name: current_name, }); } } else { // Insert new item. let item_json: JsonValue = json!({ "story_id": story_id, "stage": stage, "name": name.unwrap_or(""), "agent": agent.unwrap_or(""), "retry_count": retry_count.unwrap_or(0) as f64, "blocked": blocked.unwrap_or(false), "depends_on": depends_on.unwrap_or(""), "claimed_by": claimed_by.unwrap_or(""), "claimed_at": claimed_at.unwrap_or(0.0), "merged_at": merged_at.unwrap_or(0.0), }) .into(); apply_and_persist(&mut state, |s| s.crdt.doc.items.insert(ROOT_ID, item_json)); // Rebuild index after insertion (indices may shift). state.index = rebuild_index(&state.crdt); // Advance the inner registers of the newly-created item to the Lamport // floor so their first local ops don't re-emit low sequence numbers. let floor = state.lamport_floor; if floor > 0 && let Some(&idx) = state.index.get(story_id) { let item = &mut state.crdt.doc.items[idx]; item.story_id.advance_seq(floor); item.stage.advance_seq(floor); item.name.advance_seq(floor); item.agent.advance_seq(floor); item.retry_count.advance_seq(floor); item.blocked.advance_seq(floor); item.depends_on.advance_seq(floor); item.claimed_by.advance_seq(floor); item.claimed_at.advance_seq(floor); item.merged_at.advance_seq(floor); } // Broadcast a CrdtEvent for the new item. emit_event(CrdtEvent { story_id: story_id.to_string(), from_stage: None, to_stage: stage.to_string(), name: name.map(String::from), }); } } #[cfg(test)] mod tests { use super::super::hex; use super::super::read::extract_item_view; use super::super::read::read_item; use super::super::state::init_for_test; use super::super::state::rebuild_index; use super::*; // ── name_from_story_id tests ───────────────────────────────────────────── #[test] fn name_from_story_id_story_type() { assert_eq!( name_from_story_id("729_story_store_story_name_as_a_crdt_field"), "Store story name as a crdt field" ); } #[test] fn name_from_story_id_bug_type() { assert_eq!(name_from_story_id("4_bug_login_crash"), "Login crash"); } #[test] fn name_from_story_id_spike_type() { assert_eq!(name_from_story_id("10_spike_arch_review"), "Arch review"); } #[test] fn name_from_story_id_refactor_type() { assert_eq!( name_from_story_id("99_refactor_decompose_server"), "Decompose server" ); } #[test] fn name_from_story_id_single_word() { assert_eq!(name_from_story_id("1_story_auth"), "Auth"); } #[test] fn name_from_story_id_unknown_type_fallback() { // Unknown type prefix is left as-is after stripping the number. assert_eq!(name_from_story_id("5_unknown_foo_bar"), "Unknown foo bar"); } // ── migrate_names_from_slugs tests ─────────────────────────────────────── // ── numeric_id_from_slug tests ──────────────────────────────────────────── #[test] fn numeric_id_from_slug_extracts_prefix() { assert_eq!( numeric_id_from_slug("664_story_my_feature"), Some("664".to_string()) ); assert_eq!( numeric_id_from_slug("4_bug_login_crash"), Some("4".to_string()) ); assert_eq!( numeric_id_from_slug("730_refactor_foo_bar"), Some("730".to_string()) ); } #[test] fn numeric_id_from_slug_returns_none_for_numeric_only() { assert_eq!(numeric_id_from_slug("664"), None); assert_eq!(numeric_id_from_slug("1"), None); assert_eq!(numeric_id_from_slug("730"), None); } #[test] fn numeric_id_from_slug_returns_none_for_non_numeric_prefix() { assert_eq!(numeric_id_from_slug("story_no_number"), None); assert_eq!(numeric_id_from_slug(""), None); assert_eq!(numeric_id_from_slug("abc_story"), None); } // ── migrate_story_ids_to_numeric tests ─────────────────────────────────── #[test] fn migrate_story_ids_to_numeric_rewrites_slug_ids() { init_for_test(); write_item( "42_story_my_feature", "1_backlog", Some("My Feature"), None, None, None, None, None, None, None, ); let result = migrate_story_ids_to_numeric(); assert_eq!(result.len(), 1, "exactly one item should be migrated"); assert_eq!(result[0].0, "42_story_my_feature"); assert_eq!(result[0].1, "42"); // After migration the item is accessible by the numeric ID. let item = read_item("42").expect("item should be found by numeric ID"); assert_eq!(item.story_id, "42"); // The slug-based ID is gone. assert!( read_item("42_story_my_feature").is_none(), "slug ID should no longer be in the index" ); } #[test] fn migrate_story_ids_to_numeric_is_idempotent() { init_for_test(); write_item( "43", "1_backlog", Some("Already Numeric"), None, None, None, None, None, None, None, ); // First call — nothing to migrate. let r1 = migrate_story_ids_to_numeric(); assert!(r1.is_empty(), "no migration for already-numeric ID"); // Second call — still nothing. let r2 = migrate_story_ids_to_numeric(); assert!(r2.is_empty(), "second call must be a no-op"); // Item is still there. assert!(read_item("43").is_some()); } #[test] fn migrate_story_ids_to_numeric_skips_conflict() { init_for_test(); // Both the slug form AND its numeric target exist. write_item( "44_story_foo", "1_backlog", None, None, None, None, None, None, None, None, ); write_item( "44", "2_current", None, None, None, None, None, None, None, None, ); let result = migrate_story_ids_to_numeric(); // The slug entry must NOT be migrated because "44" is already occupied. assert!( result.is_empty(), "conflicting slug must not overwrite existing numeric entry" ); // Both items remain as they were. assert!(read_item("44_story_foo").is_some()); assert!(read_item("44").is_some()); } #[test] fn migrate_story_ids_to_numeric_noop_when_crdt_not_initialised() { // Must not panic when called before init. migrate_story_ids_to_numeric(); } #[test] fn migrate_story_ids_to_numeric_preserves_stage_and_name() { init_for_test(); write_item( "45_bug_crash", "2_current", Some("Crash Bug"), Some("coder-1"), None, None, None, None, None, None, ); migrate_story_ids_to_numeric(); let item = read_item("45").expect("item must be accessible by numeric ID"); assert_eq!(item.stage, "2_current"); assert_eq!(item.name.as_deref(), Some("Crash Bug")); assert_eq!(item.agent.as_deref(), Some("coder-1")); } #[test] fn migrate_names_from_slugs_fills_empty_names() { init_for_test(); // Write an item without a name. write_item( "42_story_my_feature", "1_backlog", None, None, None, None, None, None, None, None, ); // Before migration the name should be empty. let before = read_item("42_story_my_feature").unwrap(); assert!( before.name.as_deref().unwrap_or("").is_empty(), "name should be empty before migration" ); migrate_names_from_slugs(); // After migration the name should be derived from the slug. let after = read_item("42_story_my_feature").unwrap(); assert_eq!( after.name.as_deref(), Some("My feature"), "name should be derived from slug after migration" ); } #[test] fn migrate_names_from_slugs_leaves_existing_names_unchanged() { init_for_test(); write_item( "43_story_named_item", "1_backlog", Some("Already Named"), None, None, None, None, None, None, None, ); migrate_names_from_slugs(); let after = read_item("43_story_named_item").unwrap(); assert_eq!( after.name.as_deref(), Some("Already Named"), "pre-existing name must not be overwritten" ); } #[test] fn migrate_names_from_slugs_noop_when_crdt_not_initialised() { // Should not panic when called before init. // In practice get_crdt() returns None in a fresh thread. // We call it here just to confirm no panic. migrate_names_from_slugs(); } use bft_json_crdt::json_crdt::OpState; use bft_json_crdt::keypair::make_keypair; use bft_json_crdt::op::ROOT_ID; use serde_json::json; use sqlx::SqlitePool; use sqlx::sqlite::SqliteConnectOptions; #[tokio::test] async fn bug_511_rowid_replay_preserves_field_update_after_list_insert() { let tmp = tempfile::tempdir().unwrap(); let db_path = tmp.path().join("bug511.db"); let options = SqliteConnectOptions::new() .filename(&db_path) .create_if_missing(true); let pool = SqlitePool::connect_with(options).await.unwrap(); sqlx::migrate!("./migrations").run(&pool).await.unwrap(); let kp = make_keypair(); let mut crdt = BaseCrdt::::new(&kp); // Insert 5 dummy items to advance items.our_seq to 5. for i in 0..5u32 { let sid = format!("{}_story_warmup", i); let item: JsonValue = json!({ "story_id": sid, "stage": "1_backlog", "name": "", "agent": "", "retry_count": 0.0, "blocked": false, "depends_on": "", "claimed_by": "", "claimed_at": 0.0, }) .into(); let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp); crdt.apply(op.clone()); // We don't persist these to the DB — they are pre-history. } // Now insert the real item. items.our_seq was 5, so this op gets seq=6. let target_item: JsonValue = json!({ "story_id": "511_story_target", "stage": "1_backlog", "name": "Bug 511 target", "agent": "", "retry_count": 0.0, "blocked": false, "depends_on": "", "claimed_by": "", "claimed_at": 0.0, }) .into(); let insert_op = crdt.doc.items.insert(ROOT_ID, target_item).sign(&kp); crdt.apply(insert_op.clone()); // insert_op.inner.seq == 6 // Now update the stage. The stage LwwRegisterCrdt for this item starts // at our_seq=0, so this field op gets seq=1. Crucially: seq=1 < seq=6. let idx = rebuild_index(&crdt)["511_story_target"]; let stage_op = crdt.doc.items[idx] .stage .set("2_current".to_string()) .sign(&kp); crdt.apply(stage_op.clone()); // stage_op.inner.seq == 1 // Persist BOTH ops in causal order (insert first, update second). // This means insert_op gets rowid < stage_op rowid. let now = chrono::Utc::now().to_rfc3339(); for op in [&insert_op, &stage_op] { let op_json = serde_json::to_string(op).unwrap(); let op_id = hex::encode(&op.id()); sqlx::query( "INSERT INTO crdt_ops (op_id, seq, op_json, created_at) VALUES (?1, ?2, ?3, ?4)", ) .bind(&op_id) .bind(op.inner.seq as i64) .bind(&op_json) .bind(&now) .execute(&pool) .await .unwrap(); } // Replay by rowid ASC (the fix). The insert must come before the field // update regardless of their field-level seq values. let rows: Vec<(String,)> = sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY rowid ASC") .fetch_all(&pool) .await .unwrap(); let mut crdt2 = BaseCrdt::::new(&kp); for (json_str,) in &rows { let op: SignedOp = serde_json::from_str(json_str).unwrap(); crdt2.apply(op); } // The item must be in the CRDT and must reflect the stage update. let index2 = rebuild_index(&crdt2); assert!( index2.contains_key("511_story_target"), "item not found after rowid-order replay" ); let idx2 = index2["511_story_target"]; let view = extract_item_view(&crdt2.doc.items[idx2]).unwrap(); assert_eq!( view.stage, "2_current", "stage field update lost during replay (bug 511 regression)" ); // Confirm the bug is reproducible by replaying seq ASC instead. // With seq ASC the stage_op (seq=1) arrives before insert_op (seq=6), // fails ErrPathMismatch, and the item ends up at "1_backlog". let rows_wrong_order: Vec<(String,)> = sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY seq ASC") .fetch_all(&pool) .await .unwrap(); let mut crdt3 = BaseCrdt::::new(&kp); for (json_str,) in &rows_wrong_order { let op: SignedOp = serde_json::from_str(json_str).unwrap(); crdt3.apply(op); } let index3 = rebuild_index(&crdt3); // With seq ASC replay, the item is created (insert_op eventually runs) // but the stage update is lost (it ran before the item existed). if let Some(idx3) = index3.get("511_story_target") { let view3 = extract_item_view(&crdt3.doc.items[*idx3]).unwrap(); // The bug: stage is still "1_backlog" because the update was dropped. assert_eq!( view3.stage, "1_backlog", "expected seq-ASC replay to exhibit the bug (update lost)" ); } } }