/// CRDT state layer for pipeline state, backed by SQLite. /// /// The CRDT document is the primary source of truth for pipeline item /// metadata (stage, name, agent, etc.). CRDT ops are persisted to SQLite so /// state survives restarts. The filesystem `.huskies/work/` directories are /// still updated as a secondary output for backwards compatibility. /// /// Stage transitions detected by `write_item()` are broadcast as [`CrdtEvent`]s /// so subscribers (auto-assign, WebSocket, notifications) can react without /// polling the filesystem. use std::collections::HashMap; use std::sync::{Mutex, OnceLock}; use bft_json_crdt::json_crdt::*; use bft_json_crdt::keypair::make_keypair; use bft_json_crdt::list_crdt::ListCrdt; use bft_json_crdt::lww_crdt::LwwRegisterCrdt; use bft_json_crdt::op::ROOT_ID; use fastcrypto::ed25519::Ed25519KeyPair; use fastcrypto::traits::ToFromBytes; use serde_json::json; use sqlx::sqlite::SqliteConnectOptions; use sqlx::SqlitePool; use std::path::Path; use tokio::sync::{broadcast, mpsc}; use crate::slog; // ── CRDT events ───────────────────────────────────────────────────── /// An event emitted when a pipeline item's stage changes in the CRDT document. #[derive(Clone, Debug)] pub struct CrdtEvent { /// Work item ID (e.g. `"42_story_my_feature"`). pub story_id: String, /// The stage the item was in before this transition, or `None` for new items. pub from_stage: Option, /// The stage the item is now in. pub to_stage: String, /// Human-readable story name from the CRDT document. pub name: Option, } /// Subscribe to CRDT state transition events. /// /// Returns `None` if the CRDT layer has not been initialised yet. pub fn subscribe() -> Option> { CRDT_EVENT_TX.get().map(|tx| tx.subscribe()) } static CRDT_EVENT_TX: OnceLock> = OnceLock::new(); // ── Sync broadcast (outgoing ops to peers) ────────────────────────── static SYNC_TX: OnceLock> = OnceLock::new(); /// Subscribe to locally-created CRDT ops for sync replication. /// /// Each `SignedOp` broadcast here was created by *this* node and should be /// forwarded to connected peers. Returns `None` before `init()`. pub fn subscribe_ops() -> Option> { SYNC_TX.get().map(|tx| tx.subscribe()) } /// Return all persisted `SignedOp`s in causal order (oldest first). /// /// Used during initial sync handshake so a newly-connected peer can /// reconstruct the full CRDT state. Returns `None` before `init()`. pub fn all_ops_json() -> Option> { ALL_OPS.get().map(|m| m.lock().unwrap().clone()) } static ALL_OPS: OnceLock>> = OnceLock::new(); // ── CRDT document types ────────────────────────────────────────────── #[add_crdt_fields] #[derive(Clone, CrdtNode, Debug)] pub struct PipelineDoc { pub items: ListCrdt, } #[add_crdt_fields] #[derive(Clone, CrdtNode, Debug)] pub struct PipelineItemCrdt { pub story_id: LwwRegisterCrdt, pub stage: LwwRegisterCrdt, pub name: LwwRegisterCrdt, pub agent: LwwRegisterCrdt, pub retry_count: LwwRegisterCrdt, pub blocked: LwwRegisterCrdt, pub depends_on: LwwRegisterCrdt, } // ── Read-side view types ───────────────────────────────────────────── /// A snapshot of a single pipeline item derived from the CRDT document. #[derive(Clone, Debug)] pub struct PipelineItemView { pub story_id: String, pub stage: String, pub name: Option, pub agent: Option, pub retry_count: Option, pub blocked: Option, pub depends_on: Option>, } // ── Internal state ─────────────────────────────────────────────────── struct CrdtState { crdt: BaseCrdt, keypair: Ed25519KeyPair, /// Maps story_id → index in the ListCrdt for O(1) lookup. index: HashMap, /// Channel sender for fire-and-forget op persistence. persist_tx: mpsc::UnboundedSender, } static CRDT_STATE: OnceLock> = OnceLock::new(); // ── Initialisation ─────────────────────────────────────────────────── /// Initialise the CRDT state layer. /// /// Opens the SQLite database, loads or creates a node keypair, replays any /// persisted ops to reconstruct state, and spawns a background persistence /// task. Safe to call only once; subsequent calls are no-ops. pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> { if CRDT_STATE.get().is_some() { return Ok(()); } let options = SqliteConnectOptions::new() .filename(db_path) .create_if_missing(true); let pool = SqlitePool::connect_with(options).await?; sqlx::migrate!("./migrations").run(&pool).await?; // Load or create the node keypair. let keypair = load_or_create_keypair(&pool).await?; let mut crdt = BaseCrdt::::new(&keypair); // Replay persisted ops to reconstruct state. let rows: Vec<(String,)> = sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY rowid ASC") .fetch_all(&pool) .await?; let mut all_ops_vec = Vec::with_capacity(rows.len()); for (op_json,) in &rows { if let Ok(signed_op) = serde_json::from_str::(op_json) { crdt.apply(signed_op); all_ops_vec.push(op_json.clone()); } else { slog!("[crdt] Warning: failed to deserialize stored op"); } } let _ = ALL_OPS.set(Mutex::new(all_ops_vec)); // Build the index from the reconstructed state. let index = rebuild_index(&crdt); slog!( "[crdt] Initialised: {} ops replayed, {} items indexed", rows.len(), index.len() ); // Spawn background persistence task. let (persist_tx, mut persist_rx) = mpsc::unbounded_channel::(); tokio::spawn(async move { while let Some(op) = persist_rx.recv().await { let op_json = match serde_json::to_string(&op) { Ok(j) => j, Err(e) => { slog!("[crdt] Failed to serialize op: {e}"); continue; } }; let op_id = hex::encode(&op.id()); let seq = op.inner.seq as i64; let now = chrono::Utc::now().to_rfc3339(); let result = sqlx::query( "INSERT INTO crdt_ops (op_id, seq, op_json, created_at) \ VALUES (?1, ?2, ?3, ?4) \ ON CONFLICT(op_id) DO NOTHING", ) .bind(&op_id) .bind(seq) .bind(&op_json) .bind(&now) .execute(&pool) .await; if let Err(e) = result { slog!("[crdt] Failed to persist op {}: {e}", &op_id[..12]); } } }); let state = CrdtState { crdt, keypair, index, persist_tx, }; let _ = CRDT_STATE.set(Mutex::new(state)); // Initialise the CRDT event broadcast channel. let (event_tx, _) = broadcast::channel::(256); let _ = CRDT_EVENT_TX.set(event_tx); // Initialise the sync broadcast channel for outgoing ops. let (sync_tx, _) = broadcast::channel::(1024); let _ = SYNC_TX.set(sync_tx); Ok(()) } /// Load or create the Ed25519 keypair used by this node. async fn load_or_create_keypair(pool: &SqlitePool) -> Result { let row: Option<(Vec,)> = sqlx::query_as("SELECT seed FROM crdt_node_identity WHERE id = 1") .fetch_optional(pool) .await?; if let Some((seed,)) = row { // Reconstruct from stored seed. The seed is the 32-byte private key. if let Ok(kp) = Ed25519KeyPair::from_bytes(&seed) { return Ok(kp); } slog!("[crdt] Stored keypair invalid, regenerating"); } let kp = make_keypair(); let seed = kp.as_bytes().to_vec(); sqlx::query("INSERT INTO crdt_node_identity (id, seed) VALUES (1, ?1) ON CONFLICT(id) DO UPDATE SET seed = excluded.seed") .bind(&seed) .execute(pool) .await?; Ok(kp) } /// Rebuild the story_id → list index mapping from the current CRDT state. fn rebuild_index(crdt: &BaseCrdt) -> HashMap { let mut map = HashMap::new(); for (i, item) in crdt.doc.items.iter().enumerate() { if let JsonValue::String(ref sid) = item.story_id.view() { map.insert(sid.clone(), i); } } map } // ── Write path ─────────────────────────────────────────────────────── /// Create a CRDT op via `op_fn`, sign it, apply it, and send it to the /// persistence channel. The closure receives `&mut CrdtState` so it can /// mutably access the CRDT document, while `sign` only needs `&keypair`. fn apply_and_persist(state: &mut CrdtState, op_fn: F) where F: FnOnce(&mut CrdtState) -> bft_json_crdt::op::Op, { let raw_op = op_fn(state); let signed = raw_op.sign(&state.keypair); state.crdt.apply(signed.clone()); let _ = state.persist_tx.send(signed.clone()); // Track in ALL_OPS and broadcast to sync peers. if let Ok(json) = serde_json::to_string(&signed) && let Some(all) = ALL_OPS.get() && let Ok(mut v) = all.lock() { v.push(json); } if let Some(tx) = SYNC_TX.get() { let _ = tx.send(signed); } } /// Write a pipeline item state through CRDT operations. /// /// If the item exists, updates its registers. If not, inserts a new item /// into the list. All ops are signed and persisted to SQLite. /// /// When the stage changes (or a new item is created), a [`CrdtEvent`] is /// broadcast so subscribers can react to the transition. pub fn write_item( story_id: &str, stage: &str, name: Option<&str>, agent: Option<&str>, retry_count: Option, blocked: Option, depends_on: Option<&str>, ) { let Some(state_mutex) = CRDT_STATE.get() else { return; }; let Ok(mut state) = state_mutex.lock() else { return; }; if let Some(&idx) = state.index.get(story_id) { // Capture the old stage before updating so we can detect transitions. let old_stage = match state.crdt.doc.items[idx].stage.view() { JsonValue::String(s) => Some(s), _ => None, }; // Update existing item registers. apply_and_persist(&mut state, |s| { s.crdt.doc.items[idx].stage.set(stage.to_string()) }); if let Some(n) = name { apply_and_persist(&mut state, |s| { s.crdt.doc.items[idx].name.set(n.to_string()) }); } if let Some(a) = agent { apply_and_persist(&mut state, |s| { s.crdt.doc.items[idx].agent.set(a.to_string()) }); } if let Some(rc) = retry_count { apply_and_persist(&mut state, |s| { s.crdt.doc.items[idx].retry_count.set(rc as f64) }); } if let Some(b) = blocked { apply_and_persist(&mut state, |s| { s.crdt.doc.items[idx].blocked.set(b) }); } if let Some(d) = depends_on { apply_and_persist(&mut state, |s| { s.crdt.doc.items[idx].depends_on.set(d.to_string()) }); } // Broadcast a CrdtEvent if the stage actually changed. let stage_changed = old_stage.as_deref() != Some(stage); if stage_changed { // Read the current name from the CRDT document for the event. let current_name = match state.crdt.doc.items[idx].name.view() { JsonValue::String(s) if !s.is_empty() => Some(s), _ => None, }; emit_event(CrdtEvent { story_id: story_id.to_string(), from_stage: old_stage, to_stage: stage.to_string(), name: current_name, }); } } else { // Insert new item. let item_json: JsonValue = json!({ "story_id": story_id, "stage": stage, "name": name.unwrap_or(""), "agent": agent.unwrap_or(""), "retry_count": retry_count.unwrap_or(0) as f64, "blocked": blocked.unwrap_or(false), "depends_on": depends_on.unwrap_or(""), }) .into(); apply_and_persist(&mut state, |s| { s.crdt.doc.items.insert(ROOT_ID, item_json) }); // Rebuild index after insertion (indices may shift). state.index = rebuild_index(&state.crdt); // Broadcast a CrdtEvent for the new item. emit_event(CrdtEvent { story_id: story_id.to_string(), from_stage: None, to_stage: stage.to_string(), name: name.map(String::from), }); } } /// Broadcast a CRDT event to all subscribers. fn emit_event(event: CrdtEvent) { if let Some(tx) = CRDT_EVENT_TX.get() { let _ = tx.send(event); } } // ── Remote op ingestion (from sync peers) ─────────────────────────── /// Apply a `SignedOp` received from a remote peer. /// /// The op is validated, applied to the local CRDT, persisted to SQLite, /// and any resulting stage transitions are broadcast as [`CrdtEvent`]s. /// Unlike `apply_and_persist`, this does **not** re-broadcast the op on /// the sync channel (to avoid infinite echo loops). /// /// Returns `true` if the op was new and applied, `false` if it was a /// duplicate or failed validation. pub fn apply_remote_op(op: SignedOp) -> bool { let Some(state_mutex) = CRDT_STATE.get() else { return false; }; let Ok(mut state) = state_mutex.lock() else { return false; }; // Snapshot stage state before applying so we can detect transitions. let pre_stages: HashMap = state .index .iter() .filter_map(|(sid, &idx)| { match state.crdt.doc.items[idx].stage.view() { JsonValue::String(s) => Some((sid.clone(), s)), _ => None, } }) .collect(); let result = state.crdt.apply(op.clone()); if result != bft_json_crdt::json_crdt::OpState::Ok && result != bft_json_crdt::json_crdt::OpState::MissingCausalDependencies { return false; } // Persist the op (fire-and-forget). let _ = state.persist_tx.send(op.clone()); // Track in ALL_OPS. if let Ok(json) = serde_json::to_string(&op) && let Some(all) = ALL_OPS.get() && let Ok(mut v) = all.lock() { v.push(json); } // Rebuild index (new items may have been inserted). state.index = rebuild_index(&state.crdt); // Detect and broadcast stage transitions. for (sid, &idx) in &state.index { let new_stage = match state.crdt.doc.items[idx].stage.view() { JsonValue::String(s) => s, _ => continue, }; let old_stage = pre_stages.get(sid).cloned(); let changed = old_stage.as_deref() != Some(&new_stage); if changed { let name = match state.crdt.doc.items[idx].name.view() { JsonValue::String(s) if !s.is_empty() => Some(s), _ => None, }; emit_event(CrdtEvent { story_id: sid.clone(), from_stage: old_stage, to_stage: new_stage, name, }); } } true } // ── Read path ──────────────────────────────────────────────────────── /// Read the full pipeline state from the CRDT document. /// /// Returns items grouped by stage, or `None` if the CRDT layer is not /// initialised. pub fn read_all_items() -> Option> { let state_mutex = CRDT_STATE.get()?; let state = state_mutex.lock().ok()?; let mut items = Vec::new(); for item_crdt in state.crdt.doc.items.iter() { if let Some(view) = extract_item_view(item_crdt) { items.push(view); } } Some(items) } /// Read a single pipeline item by story_id. pub fn read_item(story_id: &str) -> Option { let state_mutex = CRDT_STATE.get()?; let state = state_mutex.lock().ok()?; let &idx = state.index.get(story_id)?; extract_item_view(&state.crdt.doc.items[idx]) } /// Mark a story as deleted in the in-memory CRDT and persist a tombstone op. /// /// This is the eviction primitive for story 521 — it lets external callers /// (e.g. the `purge_story` MCP tool, or operator scripts during incident /// response) clear an item from the running server's in-memory state /// without needing a full process restart. /// /// Specifically: /// 1. Looks up the item's CRDT `OpId` via the visible-index map. /// 2. Constructs a delete op via the bft-json-crdt list `delete()` primitive. /// 3. Signs it with the local node's keypair and applies it to the in-memory /// CRDT (marking the item `is_deleted = true` so subsequent /// `read_all_items` / `read_item` calls skip it). /// 4. Persists the signed delete op to `crdt_ops` via the existing /// `apply_and_persist` channel — so the eviction survives a restart. /// 5. Rebuilds the `story_id → visible_index` map (visible indices shift /// when an item is marked deleted). /// 6. Drops the in-memory content-store entry for the story so the cached /// body doesn't outlive the CRDT entry. /// /// Returns `Ok(())` if the item was found and a tombstone op was queued, /// or an `Err` if the CRDT layer isn't initialised or the story_id is /// unknown to the in-memory state. pub fn evict_item(story_id: &str) -> Result<(), String> { let state_mutex = CRDT_STATE .get() .ok_or_else(|| "CRDT layer not initialised".to_string())?; let mut state = state_mutex .lock() .map_err(|e| format!("CRDT lock poisoned: {e}"))?; let idx = state .index .get(story_id) .copied() .ok_or_else(|| format!("Story '{story_id}' not found in in-memory CRDT"))?; // Resolve the item's OpId before the closure (the closure will mutably // borrow `state`, so we can't access `state.crdt.doc.items` from inside). let item_id = state .crdt .doc .items .id_at(idx) .ok_or_else(|| format!("Item index {idx} for '{story_id}' did not resolve to an OpId"))?; // Write the delete op via the existing apply_and_persist machinery. // This signs the op, applies it to the in-memory CRDT (marking the item // is_deleted), and sends it to the persistence task. apply_and_persist(&mut state, |s| s.crdt.doc.items.delete(item_id)); // Rebuild the story_id → visible_index map; the deleted item is no // longer counted by the iter that rebuild_index uses. state.index = rebuild_index(&state.crdt); // Drop the content-store entry so the cached body doesn't outlive the // CRDT entry. (Bug 521 follow-up: when CONTENT_STORE becomes a true // lazy cache, this explicit eviction can go away.) crate::db::delete_content(story_id); Ok(()) } /// Extract a `PipelineItemView` from a `PipelineItemCrdt`. fn extract_item_view(item: &PipelineItemCrdt) -> Option { let story_id = match item.story_id.view() { JsonValue::String(s) if !s.is_empty() => s, _ => return None, }; let stage = match item.stage.view() { JsonValue::String(s) if !s.is_empty() => s, _ => return None, }; let name = match item.name.view() { JsonValue::String(s) if !s.is_empty() => Some(s), _ => None, }; let agent = match item.agent.view() { JsonValue::String(s) if !s.is_empty() => Some(s), _ => None, }; let retry_count = match item.retry_count.view() { JsonValue::Number(n) if n > 0.0 => Some(n as i64), _ => None, }; let blocked = match item.blocked.view() { JsonValue::Bool(b) => Some(b), _ => None, }; let depends_on = match item.depends_on.view() { JsonValue::String(s) if !s.is_empty() => { serde_json::from_str::>(&s).ok() } _ => None, }; Some(PipelineItemView { story_id, stage, name, agent, retry_count, blocked, depends_on, }) } /// Check whether a dependency (by numeric ID prefix) is in `5_done` or `6_archived` /// according to CRDT state. /// /// Returns `true` if the dependency is satisfied (item found in a done stage). /// See `dep_is_archived_crdt` to distinguish archive-satisfied from cleanly-done. pub fn dep_is_done_crdt(dep_number: u32) -> bool { let prefix = format!("{dep_number}_"); if let Some(items) = read_all_items() { items.iter().any(|item| { item.story_id.starts_with(&prefix) && matches!(item.stage.as_str(), "5_done" | "6_archived") }) } else { false } } /// Check whether a dependency (by numeric ID prefix) is specifically in `6_archived` /// according to CRDT state. /// /// Used to detect when a dependency is satisfied via archive rather than via a clean /// completion through `5_done`. Returns `false` when the CRDT layer is not initialised. pub fn dep_is_archived_crdt(dep_number: u32) -> bool { let prefix = format!("{dep_number}_"); if let Some(items) = read_all_items() { items.iter().any(|item| { item.story_id.starts_with(&prefix) && item.stage == "6_archived" }) } else { false } } /// Check unmet dependencies for a story by reading its `depends_on` from the /// CRDT document and checking each dependency against CRDT state. /// /// Returns the list of dependency numbers that are NOT in `5_done` or `6_archived`. pub fn check_unmet_deps_crdt(story_id: &str) -> Vec { let item = match read_item(story_id) { Some(i) => i, None => return Vec::new(), }; let deps = match item.depends_on { Some(d) => d, None => return Vec::new(), }; deps.into_iter() .filter(|&dep| !dep_is_done_crdt(dep)) .collect() } /// Return the list of dependency numbers from `story_id`'s `depends_on` that are /// specifically in `6_archived` according to CRDT state. /// /// Used to emit a warning when promotion fires because a dep is archived rather than /// cleanly completed. Returns an empty `Vec` when no deps are archived. pub fn check_archived_deps_crdt(story_id: &str) -> Vec { let item = match read_item(story_id) { Some(i) => i, None => return Vec::new(), }; let deps = match item.depends_on { Some(d) => d, None => return Vec::new(), }; deps.into_iter() .filter(|&dep| dep_is_archived_crdt(dep)) .collect() } /// Hex-encode a byte slice (no external dep needed). mod hex { pub fn encode(bytes: &[u8]) -> String { bytes.iter().map(|b| format!("{b:02x}")).collect() } } // ── Tests ──────────────────────────────────────────────────────────── #[cfg(test)] mod tests { use super::*; use bft_json_crdt::json_crdt::OpState; #[test] fn crdt_doc_insert_and_view() { let kp = make_keypair(); let mut crdt = BaseCrdt::::new(&kp); let item_json: JsonValue = json!({ "story_id": "10_story_test", "stage": "2_current", "name": "Test Story", "agent": "coder-opus", "retry_count": 0.0, "blocked": false, "depends_on": "", }) .into(); let op = crdt.doc.items.insert(ROOT_ID, item_json).sign(&kp); assert_eq!(crdt.apply(op), OpState::Ok); let view = crdt.doc.items.view(); assert_eq!(view.len(), 1); let item = &crdt.doc.items[0]; assert_eq!(item.story_id.view(), JsonValue::String("10_story_test".to_string())); assert_eq!(item.stage.view(), JsonValue::String("2_current".to_string())); } #[test] fn crdt_doc_update_stage() { let kp = make_keypair(); let mut crdt = BaseCrdt::::new(&kp); let item_json: JsonValue = json!({ "story_id": "20_story_move", "stage": "1_backlog", "name": "Move Me", "agent": "", "retry_count": 0.0, "blocked": false, "depends_on": "", }) .into(); let insert_op = crdt.doc.items.insert(ROOT_ID, item_json).sign(&kp); crdt.apply(insert_op); // Update stage let stage_op = crdt.doc.items[0].stage.set("2_current".to_string()).sign(&kp); crdt.apply(stage_op); assert_eq!( crdt.doc.items[0].stage.view(), JsonValue::String("2_current".to_string()) ); } #[test] fn crdt_ops_replay_reconstructs_state() { let kp = make_keypair(); let mut crdt1 = BaseCrdt::::new(&kp); // Build state with a series of ops. let item_json: JsonValue = json!({ "story_id": "30_story_replay", "stage": "1_backlog", "name": "Replay Test", "agent": "", "retry_count": 0.0, "blocked": false, "depends_on": "", }) .into(); let op1 = crdt1.doc.items.insert(ROOT_ID, item_json).sign(&kp); crdt1.apply(op1.clone()); let op2 = crdt1.doc.items[0].stage.set("2_current".to_string()).sign(&kp); crdt1.apply(op2.clone()); let op3 = crdt1.doc.items[0].name.set("Updated Name".to_string()).sign(&kp); crdt1.apply(op3.clone()); // Replay ops on a fresh CRDT. let mut crdt2 = BaseCrdt::::new(&kp); crdt2.apply(op1); crdt2.apply(op2); crdt2.apply(op3); assert_eq!( crdt1.doc.items[0].stage.view(), crdt2.doc.items[0].stage.view() ); assert_eq!( crdt1.doc.items[0].name.view(), crdt2.doc.items[0].name.view() ); } #[test] fn extract_item_view_parses_crdt_item() { let kp = make_keypair(); let mut crdt = BaseCrdt::::new(&kp); let item_json: JsonValue = json!({ "story_id": "40_story_view", "stage": "3_qa", "name": "View Test", "agent": "coder-1", "retry_count": 2.0, "blocked": true, "depends_on": "[10,20]", }) .into(); let op = crdt.doc.items.insert(ROOT_ID, item_json).sign(&kp); crdt.apply(op); let view = extract_item_view(&crdt.doc.items[0]).unwrap(); assert_eq!(view.story_id, "40_story_view"); assert_eq!(view.stage, "3_qa"); assert_eq!(view.name.as_deref(), Some("View Test")); assert_eq!(view.agent.as_deref(), Some("coder-1")); assert_eq!(view.retry_count, Some(2)); assert_eq!(view.blocked, Some(true)); assert_eq!(view.depends_on, Some(vec![10, 20])); } #[test] fn rebuild_index_maps_story_ids() { let kp = make_keypair(); let mut crdt = BaseCrdt::::new(&kp); for (sid, stage) in &[("10_story_a", "1_backlog"), ("20_story_b", "2_current")] { let item: JsonValue = json!({ "story_id": sid, "stage": stage, "name": "", "agent": "", "retry_count": 0.0, "blocked": false, "depends_on": "", }) .into(); let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp); crdt.apply(op); } let index = rebuild_index(&crdt); assert_eq!(index.len(), 2); assert!(index.contains_key("10_story_a")); assert!(index.contains_key("20_story_b")); } #[tokio::test] async fn init_and_write_read_roundtrip() { let tmp = tempfile::tempdir().unwrap(); let db_path = tmp.path().join("crdt_test.db"); // Init directly (not via the global singleton, for test isolation). let options = SqliteConnectOptions::new() .filename(&db_path) .create_if_missing(true); let pool = SqlitePool::connect_with(options).await.unwrap(); sqlx::migrate!("./migrations").run(&pool).await.unwrap(); let keypair = make_keypair(); let mut crdt = BaseCrdt::::new(&keypair); // Insert and update like write_item does. let item_json: JsonValue = json!({ "story_id": "50_story_roundtrip", "stage": "1_backlog", "name": "Roundtrip", "agent": "", "retry_count": 0.0, "blocked": false, "depends_on": "", }) .into(); let insert_op = crdt.doc.items.insert(ROOT_ID, item_json).sign(&keypair); crdt.apply(insert_op.clone()); // Persist the op. let op_json = serde_json::to_string(&insert_op).unwrap(); let op_id = hex::encode(&insert_op.id()); let now = chrono::Utc::now().to_rfc3339(); sqlx::query( "INSERT INTO crdt_ops (op_id, seq, op_json, created_at) VALUES (?1, ?2, ?3, ?4)", ) .bind(&op_id) .bind(insert_op.inner.seq as i64) .bind(&op_json) .bind(&now) .execute(&pool) .await .unwrap(); // Reconstruct from DB. let rows: Vec<(String,)> = sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY rowid ASC") .fetch_all(&pool) .await .unwrap(); let mut crdt2 = BaseCrdt::::new(&keypair); for (json_str,) in &rows { let op: SignedOp = serde_json::from_str(json_str).unwrap(); crdt2.apply(op); } let view = extract_item_view(&crdt2.doc.items[0]).unwrap(); assert_eq!(view.story_id, "50_story_roundtrip"); assert_eq!(view.stage, "1_backlog"); assert_eq!(view.name.as_deref(), Some("Roundtrip")); } #[test] fn signed_op_serialization_roundtrip() { let kp = make_keypair(); let mut crdt = BaseCrdt::::new(&kp); let item: JsonValue = json!({ "story_id": "60_story_serde", "stage": "1_backlog", "name": "Serde Test", "agent": "", "retry_count": 0.0, "blocked": false, "depends_on": "", }) .into(); let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp); let json_str = serde_json::to_string(&op).unwrap(); let deserialized: SignedOp = serde_json::from_str(&json_str).unwrap(); assert_eq!(op.id(), deserialized.id()); assert_eq!(op.inner.seq, deserialized.inner.seq); } // ── CrdtEvent tests ───────────────────────────────────────────────── #[test] fn crdt_event_has_expected_fields() { let evt = CrdtEvent { story_id: "42_story_foo".to_string(), from_stage: Some("1_backlog".to_string()), to_stage: "2_current".to_string(), name: Some("Foo Feature".to_string()), }; assert_eq!(evt.story_id, "42_story_foo"); assert_eq!(evt.from_stage.as_deref(), Some("1_backlog")); assert_eq!(evt.to_stage, "2_current"); assert_eq!(evt.name.as_deref(), Some("Foo Feature")); } #[test] fn crdt_event_clone_preserves_data() { let evt = CrdtEvent { story_id: "10_story_bar".to_string(), from_stage: None, to_stage: "1_backlog".to_string(), name: None, }; let cloned = evt.clone(); assert_eq!(cloned.story_id, "10_story_bar"); assert!(cloned.from_stage.is_none()); assert!(cloned.name.is_none()); } #[test] fn emit_event_is_noop_when_channel_not_initialised() { // Before CRDT_EVENT_TX is set, emit_event should not panic. // This test verifies the guard clause works. In test binaries the // OnceLock may already be set by another test, so we just verify // the function doesn't panic regardless. emit_event(CrdtEvent { story_id: "99_story_noop".to_string(), from_stage: None, to_stage: "1_backlog".to_string(), name: None, }); } #[test] fn crdt_event_broadcast_channel_round_trip() { let (tx, mut rx) = broadcast::channel::(16); let evt = CrdtEvent { story_id: "70_story_broadcast".to_string(), from_stage: Some("1_backlog".to_string()), to_stage: "2_current".to_string(), name: Some("Broadcast Test".to_string()), }; tx.send(evt).unwrap(); let received = rx.try_recv().unwrap(); assert_eq!(received.story_id, "70_story_broadcast"); assert_eq!(received.from_stage.as_deref(), Some("1_backlog")); assert_eq!(received.to_stage, "2_current"); assert_eq!(received.name.as_deref(), Some("Broadcast Test")); } #[test] fn dep_is_done_crdt_returns_false_when_no_crdt_state() { // When the global CRDT state is not initialised (or in a test environment), // dep_is_done_crdt should return false rather than panicking. // Note: in the test binary the global may or may not be initialised, // but the function should never panic either way. let _ = dep_is_done_crdt(9999); } #[test] fn check_unmet_deps_crdt_returns_empty_when_item_not_found() { // Non-existent story should return empty deps. let result = check_unmet_deps_crdt("nonexistent_story"); assert!(result.is_empty()); } // ── Bug 503: archived-dep visibility ───────────────────────────────────── #[test] fn dep_is_archived_crdt_returns_false_when_no_crdt_state() { // When the global CRDT state is not initialised, must not panic. let _ = dep_is_archived_crdt(9998); } #[test] fn check_archived_deps_crdt_returns_empty_when_item_not_found() { // Non-existent story should return empty archived deps. let result = check_archived_deps_crdt("nonexistent_story_archived"); assert!(result.is_empty()); } // ── 478: WebSocket CRDT sync layer tests ──────────────────────────────── #[test] fn apply_remote_op_returns_false_when_not_initialised() { // Without the global CRDT state, apply_remote_op should return false. let kp = make_keypair(); let mut crdt = BaseCrdt::::new(&kp); let item: JsonValue = serde_json::json!({ "story_id": "80_story_remote", "stage": "1_backlog", "name": "Remote", "agent": "", "retry_count": 0.0, "blocked": false, "depends_on": "", }) .into(); let op = crdt.doc.items.insert(bft_json_crdt::op::ROOT_ID, item).sign(&kp); // This uses the global state which may not be initialised in tests. let _ = apply_remote_op(op); } #[test] fn signed_op_survives_sync_serialization_roundtrip() { // Verify that a SignedOp serialised to JSON and back produces // the same op (critical for the sync wire protocol). let kp = make_keypair(); let mut crdt = BaseCrdt::::new(&kp); let item: JsonValue = serde_json::json!({ "story_id": "90_story_wire", "stage": "2_current", "name": "Wire Test", "agent": "coder", "retry_count": 1.0, "blocked": false, "depends_on": "[10]", }) .into(); let op = crdt.doc.items.insert(bft_json_crdt::op::ROOT_ID, item).sign(&kp); let json1 = serde_json::to_string(&op).unwrap(); let roundtripped: SignedOp = serde_json::from_str(&json1).unwrap(); let json2 = serde_json::to_string(&roundtripped).unwrap(); assert_eq!(json1, json2); assert_eq!(op.id(), roundtripped.id()); assert_eq!(op.inner.seq, roundtripped.inner.seq); assert_eq!(op.author(), roundtripped.author()); } #[test] fn sync_broadcast_channel_round_trip() { let (tx, mut rx) = broadcast::channel::(16); let kp = make_keypair(); let mut crdt = BaseCrdt::::new(&kp); let item: JsonValue = serde_json::json!({ "story_id": "95_story_sync_bcast", "stage": "1_backlog", "name": "", "agent": "", "retry_count": 0.0, "blocked": false, "depends_on": "", }) .into(); let op = crdt.doc.items.insert(bft_json_crdt::op::ROOT_ID, item).sign(&kp); tx.send(op.clone()).unwrap(); let received = rx.try_recv().unwrap(); assert_eq!(received.id(), op.id()); } // ── Bug 511: CRDT lamport clock resets on restart ──────────────────────── // // Root cause: Op::sign() always produces SignedOp with depends_on = vec![], // so the causal dependency queue never engages during replay. Field update // ops (seq=1,2,3 from each field's LwwRegisterCrdt counter) are replayed // before list insert ops (seq=N from the items ListCrdt counter) when // ordered by `seq ASC`. They fail ErrPathMismatch silently, their our_seq // is never updated, and the next field write re-uses seq=1. // // Fix: replay by `rowid ASC` (SQLite insertion order) instead of `seq ASC`. // Rowid preserves the causal order ops were originally applied in, so field // updates always come after the item insert they reference. #[tokio::test] async fn bug_511_rowid_replay_preserves_field_update_after_list_insert() { let tmp = tempfile::tempdir().unwrap(); let db_path = tmp.path().join("bug511.db"); let options = SqliteConnectOptions::new() .filename(&db_path) .create_if_missing(true); let pool = SqlitePool::connect_with(options).await.unwrap(); sqlx::migrate!("./migrations").run(&pool).await.unwrap(); let kp = make_keypair(); let mut crdt = BaseCrdt::::new(&kp); // Insert 5 dummy items to advance items.our_seq to 5. for i in 0..5u32 { let sid = format!("{}_story_warmup", i); let item: JsonValue = json!({ "story_id": sid, "stage": "1_backlog", "name": "", "agent": "", "retry_count": 0.0, "blocked": false, "depends_on": "", }) .into(); let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp); crdt.apply(op.clone()); // We don't persist these to the DB — they are pre-history. } // Now insert the real item. items.our_seq was 5, so this op gets seq=6. let target_item: JsonValue = json!({ "story_id": "511_story_target", "stage": "1_backlog", "name": "Bug 511 target", "agent": "", "retry_count": 0.0, "blocked": false, "depends_on": "", }) .into(); let insert_op = crdt.doc.items.insert(ROOT_ID, target_item).sign(&kp); crdt.apply(insert_op.clone()); // insert_op.inner.seq == 6 // Now update the stage. The stage LwwRegisterCrdt for this item starts // at our_seq=0, so this field op gets seq=1. Crucially: seq=1 < seq=6. let idx = rebuild_index(&crdt)["511_story_target"]; let stage_op = crdt.doc.items[idx].stage.set("2_current".to_string()).sign(&kp); crdt.apply(stage_op.clone()); // stage_op.inner.seq == 1 // Persist BOTH ops in causal order (insert first, update second). // This means insert_op gets rowid < stage_op rowid. let now = chrono::Utc::now().to_rfc3339(); for op in [&insert_op, &stage_op] { let op_json = serde_json::to_string(op).unwrap(); let op_id = hex::encode(&op.id()); sqlx::query( "INSERT INTO crdt_ops (op_id, seq, op_json, created_at) VALUES (?1, ?2, ?3, ?4)", ) .bind(&op_id) .bind(op.inner.seq as i64) .bind(&op_json) .bind(&now) .execute(&pool) .await .unwrap(); } // Replay by rowid ASC (the fix). The insert must come before the field // update regardless of their field-level seq values. let rows: Vec<(String,)> = sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY rowid ASC") .fetch_all(&pool) .await .unwrap(); let mut crdt2 = BaseCrdt::::new(&kp); for (json_str,) in &rows { let op: SignedOp = serde_json::from_str(json_str).unwrap(); crdt2.apply(op); } // The item must be in the CRDT and must reflect the stage update. let index2 = rebuild_index(&crdt2); assert!( index2.contains_key("511_story_target"), "item not found after rowid-order replay" ); let idx2 = index2["511_story_target"]; let view = extract_item_view(&crdt2.doc.items[idx2]).unwrap(); assert_eq!( view.stage, "2_current", "stage field update lost during replay (bug 511 regression)" ); // Confirm the bug is reproducible by replaying seq ASC instead. // With seq ASC the stage_op (seq=1) arrives before insert_op (seq=6), // fails ErrPathMismatch, and the item ends up at "1_backlog". let rows_wrong_order: Vec<(String,)> = sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY seq ASC") .fetch_all(&pool) .await .unwrap(); let mut crdt3 = BaseCrdt::::new(&kp); for (json_str,) in &rows_wrong_order { let op: SignedOp = serde_json::from_str(json_str).unwrap(); crdt3.apply(op); } let index3 = rebuild_index(&crdt3); // With seq ASC replay, the item is created (insert_op eventually runs) // but the stage update is lost (it ran before the item existed). if let Some(idx3) = index3.get("511_story_target") { let view3 = extract_item_view(&crdt3.doc.items[*idx3]).unwrap(); // The bug: stage is still "1_backlog" because the update was dropped. assert_eq!( view3.stage, "1_backlog", "expected seq-ASC replay to exhibit the bug (update lost)" ); } } }