//! Tests for the CRDT write module — migrations, item writes, and setters. use super::super::hex; use super::super::read::{extract_item_view, read_item}; use super::super::state::{init_for_test, rebuild_index}; use super::super::types::PipelineDoc; use super::migrations::numeric_id_from_slug; use super::*; use bft_json_crdt::json_crdt::{BaseCrdt, JsonValue, SignedOp}; use bft_json_crdt::keypair::make_keypair; use bft_json_crdt::op::ROOT_ID; use serde_json::json; use sqlx::SqlitePool; use sqlx::sqlite::SqliteConnectOptions; // ── name_from_story_id tests ───────────────────────────────────────────── #[test] fn name_from_story_id_story_type() { assert_eq!( name_from_story_id("729_story_store_story_name_as_a_crdt_field"), "Store story name as a crdt field" ); } #[test] fn name_from_story_id_bug_type() { assert_eq!(name_from_story_id("4_bug_login_crash"), "Login crash"); } #[test] fn name_from_story_id_spike_type() { assert_eq!(name_from_story_id("10_spike_arch_review"), "Arch review"); } #[test] fn name_from_story_id_refactor_type() { assert_eq!( name_from_story_id("99_refactor_decompose_server"), "Decompose server" ); } #[test] fn name_from_story_id_single_word() { assert_eq!(name_from_story_id("1_story_auth"), "Auth"); } #[test] fn name_from_story_id_unknown_type_fallback() { // Unknown type prefix is left as-is after stripping the number. assert_eq!(name_from_story_id("5_unknown_foo_bar"), "Unknown foo bar"); } // ── numeric_id_from_slug tests ──────────────────────────────────────────── #[test] fn numeric_id_from_slug_extracts_prefix() { assert_eq!( numeric_id_from_slug("664_story_my_feature"), Some("664".to_string()) ); assert_eq!( numeric_id_from_slug("4_bug_login_crash"), Some("4".to_string()) ); assert_eq!( numeric_id_from_slug("730_refactor_foo_bar"), Some("730".to_string()) ); } #[test] fn numeric_id_from_slug_returns_none_for_numeric_only() { assert_eq!(numeric_id_from_slug("664"), None); assert_eq!(numeric_id_from_slug("1"), None); assert_eq!(numeric_id_from_slug("730"), None); } #[test] fn numeric_id_from_slug_returns_none_for_non_numeric_prefix() { assert_eq!(numeric_id_from_slug("story_no_number"), None); assert_eq!(numeric_id_from_slug(""), None); assert_eq!(numeric_id_from_slug("abc_story"), None); } // ── migrate_story_ids_to_numeric tests ─────────────────────────────────── #[test] fn migrate_story_ids_to_numeric_rewrites_slug_ids() { init_for_test(); write_item( "42_story_my_feature", "1_backlog", Some("My Feature"), None, None, None, None, None, None, None, ); let result = migrate_story_ids_to_numeric(); assert_eq!(result.len(), 1, "exactly one item should be migrated"); assert_eq!(result[0].0, "42_story_my_feature"); assert_eq!(result[0].1, "42"); // After migration the item is accessible by the numeric ID. let item = read_item("42").expect("item should be found by numeric ID"); assert_eq!(item.story_id, "42"); // The slug-based ID is gone. assert!( read_item("42_story_my_feature").is_none(), "slug ID should no longer be in the index" ); } #[test] fn migrate_story_ids_to_numeric_is_idempotent() { init_for_test(); write_item( "43", "1_backlog", Some("Already Numeric"), None, None, None, None, None, None, None, ); // First call — nothing to migrate. let r1 = migrate_story_ids_to_numeric(); assert!(r1.is_empty(), "no migration for already-numeric ID"); // Second call — still nothing. let r2 = migrate_story_ids_to_numeric(); assert!(r2.is_empty(), "second call must be a no-op"); // Item is still there. assert!(read_item("43").is_some()); } #[test] fn migrate_story_ids_to_numeric_skips_conflict() { init_for_test(); // Both the slug form AND its numeric target exist. write_item( "44_story_foo", "1_backlog", None, None, None, None, None, None, None, None, ); write_item( "44", "2_current", None, None, None, None, None, None, None, None, ); let result = migrate_story_ids_to_numeric(); // The slug entry must NOT be migrated because "44" is already occupied. assert!( result.is_empty(), "conflicting slug must not overwrite existing numeric entry" ); // Both items remain as they were. assert!(read_item("44_story_foo").is_some()); assert!(read_item("44").is_some()); } #[test] fn migrate_story_ids_to_numeric_noop_when_crdt_not_initialised() { // Must not panic when called before init. migrate_story_ids_to_numeric(); } #[test] fn migrate_story_ids_to_numeric_preserves_stage_and_name() { init_for_test(); write_item( "45_bug_crash", "2_current", Some("Crash Bug"), Some("coder-1"), None, None, None, None, None, None, ); migrate_story_ids_to_numeric(); let item = read_item("45").expect("item must be accessible by numeric ID"); assert_eq!(item.stage, "2_current"); assert_eq!(item.name.as_deref(), Some("Crash Bug")); assert_eq!(item.agent.as_deref(), Some("coder-1")); } #[test] fn migrate_names_from_slugs_fills_empty_names() { init_for_test(); // Write an item without a name. write_item( "42_story_my_feature", "1_backlog", None, None, None, None, None, None, None, None, ); // Before migration the name should be empty. let before = read_item("42_story_my_feature").unwrap(); assert!( before.name.as_deref().unwrap_or("").is_empty(), "name should be empty before migration" ); migrate_names_from_slugs(); // After migration the name should be derived from the slug. let after = read_item("42_story_my_feature").unwrap(); assert_eq!( after.name.as_deref(), Some("My feature"), "name should be derived from slug after migration" ); } #[test] fn migrate_names_from_slugs_leaves_existing_names_unchanged() { init_for_test(); write_item( "43_story_named_item", "1_backlog", Some("Already Named"), None, None, None, None, None, None, None, ); migrate_names_from_slugs(); let after = read_item("43_story_named_item").unwrap(); assert_eq!( after.name.as_deref(), Some("Already Named"), "pre-existing name must not be overwritten" ); } #[test] fn migrate_names_from_slugs_noop_when_crdt_not_initialised() { // Should not panic when called before init. // In practice get_crdt() returns None in a fresh thread. // We call it here just to confirm no panic. migrate_names_from_slugs(); } // ── set_depends_on regression tests ────────────────────────────────────── #[test] fn set_depends_on_round_trip_and_clear() { use super::super::read::{check_unmet_deps_crdt, read_item}; init_for_test(); write_item( "872_test_target", "1_backlog", Some("Target"), None, None, None, None, None, None, None, ); // Set depends_on to [837] and verify CRDT register holds the list. let ok = set_depends_on("872_test_target", &[837]); assert!(ok, "set_depends_on should return true for known item"); let view = read_item("872_test_target").unwrap(); assert_eq!( view.depends_on, Some(vec![837]), "CRDT register should hold [837]" ); // Clear by passing an empty slice. let ok = set_depends_on("872_test_target", &[]); assert!(ok, "set_depends_on([]) should return true"); let view = read_item("872_test_target").unwrap(); assert_eq!( view.depends_on, None, "clearing should leave register unset" ); // Auto-assigner sees no unmet dependency after clearing. let unmet = check_unmet_deps_crdt("872_test_target"); assert!( unmet.is_empty(), "after clearing deps, auto-assigner should see no unmet dependencies" ); } #[test] fn set_depends_on_returns_false_for_unknown_story() { init_for_test(); let ok = set_depends_on("nonexistent_story_872", &[1, 2, 3]); assert!( !ok, "set_depends_on should return false for unknown story_id" ); } // ── set_mergemaster_attempted regression tests ─────────────────────────── #[test] fn set_mergemaster_attempted_true_then_false_flips_register() { init_for_test(); write_item( "873_story_mergemaster_flip", "4_merge", None, None, None, None, None, None, None, None, ); // Set true — register must read back as true. let ok = set_mergemaster_attempted("873_story_mergemaster_flip", true); assert!( ok, "set_mergemaster_attempted should return true for known item" ); let view = read_item("873_story_mergemaster_flip").unwrap(); assert_eq!( view.mergemaster_attempted, Some(true), "CRDT register should hold true after setting true" ); // Set false — register must flip back to false (not unset). let ok = set_mergemaster_attempted("873_story_mergemaster_flip", false); assert!( ok, "set_mergemaster_attempted(false) should return true for known item" ); let view = read_item("873_story_mergemaster_flip").unwrap(); assert_eq!( view.mergemaster_attempted, Some(false), "CRDT register should hold false after explicit clear" ); } #[test] fn set_mergemaster_attempted_returns_false_for_unknown_story() { init_for_test(); let ok = set_mergemaster_attempted("nonexistent_story_mm", true); assert!( !ok, "set_mergemaster_attempted should return false for unknown story_id" ); } // ── set_agent tests ────────────────────────────────────────────────────── #[test] fn set_agent_some_writes_name() { init_for_test(); write_item( "871_story_set_agent_write", "2_current", Some("Set Agent Write"), None, None, None, None, None, None, None, ); let found = set_agent("871_story_set_agent_write", Some("coder-1")); assert!(found, "set_agent should return true for an existing item"); let item = read_item("871_story_set_agent_write").expect("item must exist"); assert_eq!( item.agent.as_deref(), Some("coder-1"), "agent should be written to CRDT register" ); } #[test] fn set_agent_none_clears_register() { init_for_test(); write_item( "871_story_set_agent_clear", "2_current", Some("Set Agent Clear"), Some("coder-2"), None, None, None, None, None, None, ); // Confirm agent is set. let before = read_item("871_story_set_agent_clear").expect("item must exist"); assert_eq!(before.agent.as_deref(), Some("coder-2")); // Clear it. let found = set_agent("871_story_set_agent_clear", None); assert!(found, "set_agent should return true for an existing item"); let after = read_item("871_story_set_agent_clear").expect("item must exist"); assert!( after.agent.as_deref().unwrap_or("").is_empty(), "agent should be cleared (empty string) after set_agent(None)" ); } #[test] fn set_agent_returns_false_for_unknown_story() { init_for_test(); let found = set_agent("999_story_nonexistent", Some("coder-1")); assert!( !found, "set_agent should return false when story is not in the CRDT" ); } // ── set_qa_mode regression tests ───────────────────────────────────────── #[test] fn set_qa_mode_round_trip_server_then_human() { use crate::io::story_metadata::QaMode; init_for_test(); write_item( "869_story_qa_roundtrip", "1_backlog", None, None, None, None, None, None, None, None, ); // Set qa=server via typed path and assert CRDT register reflects it. let ok = set_qa_mode("869_story_qa_roundtrip", Some(QaMode::Server)); assert!(ok, "set_qa_mode should return true for known item"); let view = read_item("869_story_qa_roundtrip").unwrap(); assert_eq!( view.qa_mode.as_deref(), Some("server"), "CRDT register should hold \"server\"" ); // Set qa=human via typed path and assert CRDT register is updated. let ok = set_qa_mode("869_story_qa_roundtrip", Some(QaMode::Human)); assert!(ok, "set_qa_mode should return true for known item"); let view = read_item("869_story_qa_roundtrip").unwrap(); assert_eq!( view.qa_mode.as_deref(), Some("human"), "CRDT register should hold \"human\"" ); // Clear via None — register goes back to unset. let ok = set_qa_mode("869_story_qa_roundtrip", None); assert!(ok, "set_qa_mode(None) should return true"); let view = read_item("869_story_qa_roundtrip").unwrap(); assert_eq!( view.qa_mode, None, "clearing qa_mode should leave register unset" ); } #[test] fn set_qa_mode_returns_false_for_unknown_story() { init_for_test(); use crate::io::story_metadata::QaMode; let ok = set_qa_mode("nonexistent_story_qa", Some(QaMode::Server)); assert!(!ok, "set_qa_mode should return false for unknown story_id"); } // ── set_retry_count / bump_retry_count tests ───────────────────────────── #[test] fn bump_retry_count_increments_by_one() { init_for_test(); write_item( "9001_story_bump_test", "2_current", None, None, None, None, None, None, None, None, ); let v1 = bump_retry_count("9001_story_bump_test"); assert_eq!(v1, 1, "first bump should return 1"); let v2 = bump_retry_count("9001_story_bump_test"); assert_eq!(v2, 2, "second bump should return 2"); let item = read_item("9001_story_bump_test").expect("item must exist"); assert_eq!( item.retry_count, Some(2), "CRDT must reflect final bump value" ); } #[test] fn set_retry_count_resets_to_zero() { init_for_test(); write_item( "9002_story_set_test", "2_current", None, None, Some(5), None, None, None, None, None, ); set_retry_count("9002_story_set_test", 0); let item = read_item("9002_story_set_test").expect("item must exist"); assert_eq!( item.retry_count, Some(0), "set_retry_count(0) must reset to 0" ); } #[test] fn bump_returns_zero_for_missing_item() { init_for_test(); let result = bump_retry_count("nonexistent_story"); assert_eq!(result, 0, "bump on missing item should return 0 (no-op)"); } #[tokio::test] async fn bug_511_rowid_replay_preserves_field_update_after_list_insert() { let tmp = tempfile::tempdir().unwrap(); let db_path = tmp.path().join("bug511.db"); let options = SqliteConnectOptions::new() .filename(&db_path) .create_if_missing(true); let pool = SqlitePool::connect_with(options).await.unwrap(); sqlx::migrate!("./migrations").run(&pool).await.unwrap(); let kp = make_keypair(); let mut crdt = BaseCrdt::::new(&kp); // Insert 5 dummy items to advance items.our_seq to 5. for i in 0..5u32 { let sid = format!("{}_story_warmup", i); let item: JsonValue = json!({ "story_id": sid, "stage": "1_backlog", "name": "", "agent": "", "retry_count": 0.0, "blocked": false, "depends_on": "", "claimed_by": "", "claimed_at": 0.0, }) .into(); let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp); crdt.apply(op.clone()); // We don't persist these to the DB — they are pre-history. } // Now insert the real item. items.our_seq was 5, so this op gets seq=6. let target_item: JsonValue = json!({ "story_id": "511_story_target", "stage": "1_backlog", "name": "Bug 511 target", "agent": "", "retry_count": 0.0, "blocked": false, "depends_on": "", "claimed_by": "", "claimed_at": 0.0, }) .into(); let insert_op = crdt.doc.items.insert(ROOT_ID, target_item).sign(&kp); crdt.apply(insert_op.clone()); // insert_op.inner.seq == 6 // Now update the stage. The stage LwwRegisterCrdt for this item starts // at our_seq=0, so this field op gets seq=1. Crucially: seq=1 < seq=6. let idx = rebuild_index(&crdt)["511_story_target"]; let stage_op = crdt.doc.items[idx] .stage .set("2_current".to_string()) .sign(&kp); crdt.apply(stage_op.clone()); // stage_op.inner.seq == 1 // Persist BOTH ops in causal order (insert first, update second). // This means insert_op gets rowid < stage_op rowid. let now = chrono::Utc::now().to_rfc3339(); for op in [&insert_op, &stage_op] { let op_json = serde_json::to_string(op).unwrap(); let op_id = hex::encode(&op.id()); sqlx::query( "INSERT INTO crdt_ops (op_id, seq, op_json, created_at) VALUES (?1, ?2, ?3, ?4)", ) .bind(&op_id) .bind(op.inner.seq as i64) .bind(&op_json) .bind(&now) .execute(&pool) .await .unwrap(); } // Replay by rowid ASC (the fix). The insert must come before the field // update regardless of their field-level seq values. let rows: Vec<(String,)> = sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY rowid ASC") .fetch_all(&pool) .await .unwrap(); let mut crdt2 = BaseCrdt::::new(&kp); for (json_str,) in &rows { let op: SignedOp = serde_json::from_str(json_str).unwrap(); crdt2.apply(op); } // The item must be in the CRDT and must reflect the stage update. let index2 = rebuild_index(&crdt2); assert!( index2.contains_key("511_story_target"), "item not found after rowid-order replay" ); let idx2 = index2["511_story_target"]; let view = extract_item_view(&crdt2.doc.items[idx2]).unwrap(); assert_eq!( view.stage, "2_current", "stage field update lost during replay (bug 511 regression)" ); // Confirm the bug is reproducible by replaying seq ASC instead. // With seq ASC the stage_op (seq=1) arrives before insert_op (seq=6), // fails ErrPathMismatch, and the item ends up at "1_backlog". let rows_wrong_order: Vec<(String,)> = sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY seq ASC") .fetch_all(&pool) .await .unwrap(); let mut crdt3 = BaseCrdt::::new(&kp); for (json_str,) in &rows_wrong_order { let op: SignedOp = serde_json::from_str(json_str).unwrap(); crdt3.apply(op); } let index3 = rebuild_index(&crdt3); // With seq ASC replay, the item is created (insert_op eventually runs) // but the stage update is lost (it ran before the item existed). if let Some(idx3) = index3.get("511_story_target") { let view3 = extract_item_view(&crdt3.doc.items[*idx3]).unwrap(); // The bug: stage is still "1_backlog" because the update was dropped. assert_eq!( view3.stage, "1_backlog", "expected seq-ASC replay to exhibit the bug (update lost)" ); } } // ── Story 889 regression tests ─────────────────────────────────────────────── /// Regression for story 889: a tombstoned story must not be resurrected by /// concurrent write_item calls racing the delete. Spawns a tokio task that /// hammers write_item every 10ms, tombstones the item mid-race, then verifies /// the projection stays empty for ~500ms and remains empty after the writer /// stops. /// /// The tokio current_thread runtime keeps all tasks on the same OS thread, so /// the thread-local test CRDT is visible to the spawned task. #[tokio::test] async fn tombstone_survives_concurrent_writes() { use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use super::super::read::{evict_item, read_item}; init_for_test(); let story_id = "889_story_tombstone_concurrent"; write_item( story_id, "2_current", Some("Tombstone Concurrent Test"), None, None, None, None, None, None, None, ); assert!( read_item(story_id).is_some(), "item must exist before eviction" ); let stop = Arc::new(AtomicBool::new(false)); let stop_clone = stop.clone(); let writer = tokio::task::spawn(async move { while !stop_clone.load(Ordering::Relaxed) { write_item( story_id, "2_current", Some("Tombstone Concurrent Test"), None, None, None, None, None, None, None, ); tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; } }); tokio::time::sleep(tokio::time::Duration::from_millis(30)).await; evict_item(story_id).expect("evict_item must succeed"); let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_millis(500); while tokio::time::Instant::now() < deadline { assert!( read_item(story_id).is_none(), "tombstoned story must not reappear while concurrent writes are in flight" ); tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; } stop.store(true, Ordering::Relaxed); writer.await.unwrap(); assert!( read_item(story_id).is_none(), "tombstoned story must stay gone after concurrent writer stops" ); }