diff --git a/server/src/crdt_state/state.rs b/server/src/crdt_state/state.rs deleted file mode 100644 index ef5e3eb3..00000000 --- a/server/src/crdt_state/state.rs +++ /dev/null @@ -1,869 +0,0 @@ -//! Internal CRDT state struct, statics, initialisation, and central write primitive. - -#![allow(unused_imports, dead_code)] -use std::collections::HashMap; -use std::path::Path; -use std::sync::{Mutex, OnceLock}; - -use bft_json_crdt::json_crdt::*; -use bft_json_crdt::keypair::make_keypair; -use fastcrypto::ed25519::Ed25519KeyPair; -use fastcrypto::traits::ToFromBytes; -use serde_json::json; -use sqlx::SqlitePool; -use sqlx::sqlite::SqliteConnectOptions; -use tokio::sync::{broadcast, mpsc}; - -use super::VectorClock; -use super::hex; -use super::types::{CrdtEvent, PipelineDoc}; -use crate::slog; - -// ── Sync broadcast channels ────────────────────────────────────────── - -pub(super) static CRDT_EVENT_TX: OnceLock> = OnceLock::new(); - -pub(super) static SYNC_TX: OnceLock> = OnceLock::new(); - -/// All persisted ops as JSON strings, in causal (insertion) order. -/// -/// Pub(crate) so that `crdt_snapshot` can access it for compaction. -pub(crate) static ALL_OPS: OnceLock>> = OnceLock::new(); - -/// Live vector clock tracking op counts per author. -/// -/// Updated in lockstep with `ALL_OPS` — every time an op is appended to the -/// journal, the corresponding author's count is incremented here. This avoids -/// re-parsing all ops when a peer requests `our_vector_clock()`. -pub(crate) static VECTOR_CLOCK: OnceLock> = OnceLock::new(); - -/// Append an op's JSON to `ALL_OPS` and bump the author's count in `VECTOR_CLOCK`. -/// -/// Centralises the bookkeeping that must stay in sync between the two statics. 
-pub(super) fn track_op(signed: &SignedOp, json: String) { - if let Some(all) = ALL_OPS.get() - && let Ok(mut v) = all.lock() - { - v.push(json); - } - if let Some(vc) = VECTOR_CLOCK.get() - && let Ok(mut clock) = vc.lock() - { - let author_hex = super::hex::encode(&signed.author()); - *clock.entry(author_hex).or_insert(0) += 1; - } -} - -pub(super) struct CrdtState { - pub(super) crdt: BaseCrdt, - pub(super) keypair: Ed25519KeyPair, - /// Maps story_id → index in the items ListCrdt for O(1) lookup. - pub(super) index: HashMap, - /// Maps node_id (hex) → index in the nodes ListCrdt for O(1) lookup. - pub(super) node_index: HashMap, - /// Maps agent_id → index in the tokens ListCrdt for O(1) lookup. - pub(super) token_index: HashMap, - /// Maps story_id → index in the merge_jobs ListCrdt for O(1) lookup. - pub(super) merge_job_index: HashMap, - /// Maps agent_id → index in the active_agents ListCrdt for O(1) lookup. - pub(super) active_agent_index: HashMap, - /// Maps story_id → index in the test_jobs ListCrdt for O(1) lookup. - pub(super) test_job_index: HashMap, - /// Maps node_id → index in the agent_throttle ListCrdt for O(1) lookup. - pub(super) agent_throttle_index: HashMap, - /// Maps project name → index in the gateway_projects ListCrdt for O(1) lookup. - pub(super) gateway_project_index: HashMap, - /// Channel sender for fire-and-forget op persistence. - pub(super) persist_tx: mpsc::UnboundedSender, - /// Max sequence number seen across all ops during init() replay. - /// - /// Newly-created registers (post-init) must have their Lamport clock - /// advanced to this floor so they don't re-emit low sequence numbers. - pub(super) lamport_floor: u64, -} - -static CRDT_STATE: OnceLock> = OnceLock::new(); - -#[cfg(test)] -thread_local! 
{ - static CRDT_STATE_TL: OnceLock> = const { OnceLock::new() }; -} - -#[cfg(not(test))] -pub(super) fn get_crdt() -> Option<&'static Mutex> { - CRDT_STATE.get() -} - -#[cfg(test)] -pub(super) fn get_crdt() -> Option<&'static Mutex> { - let tl = CRDT_STATE_TL.with(|lock| { - if lock.get().is_some() { - Some(lock as *const OnceLock>) - } else { - None - } - }); - if let Some(ptr) = tl { - // SAFETY: The thread-local lives as long as the thread, which outlives - // any test using it. We only need 'static for the return type. - let lock = unsafe { &*ptr }; - lock.get() - } else { - CRDT_STATE.get() - } -} - -/// Initialise the CRDT state layer. -/// -/// Opens the SQLite database, loads or creates a node keypair, replays any -/// persisted ops to reconstruct state, and spawns a background persistence -/// task. Safe to call only once; subsequent calls are no-ops. -pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> { - if CRDT_STATE.get().is_some() { - return Ok(()); - } - - let options = SqliteConnectOptions::new() - .filename(db_path) - .create_if_missing(true); - let pool = SqlitePool::connect_with(options).await?; - sqlx::migrate!("./migrations").run(&pool).await?; - - // Load or create the node keypair. - let keypair = load_or_create_keypair(&pool).await?; - let mut crdt = BaseCrdt::::new(&keypair); - - // Replay persisted ops to reconstruct state. 
- let rows: Vec<(String,)> = sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY rowid ASC") - .fetch_all(&pool) - .await?; - - let mut all_ops_vec = Vec::with_capacity(rows.len()); - let mut vector_clock = VectorClock::new(); - let mut lamport_floor: u64 = 0; - for (op_json,) in &rows { - if let Ok(signed_op) = serde_json::from_str::(op_json) { - let author_hex = hex::encode(&signed_op.author()); - *vector_clock.entry(author_hex).or_insert(0) += 1; - lamport_floor = lamport_floor.max(signed_op.inner.seq); - crdt.apply(signed_op); - all_ops_vec.push(op_json.clone()); - } else { - slog!("[crdt] Warning: failed to deserialize stored op"); - } - } - let _ = ALL_OPS.set(Mutex::new(all_ops_vec)); - let _ = VECTOR_CLOCK.set(Mutex::new(vector_clock)); - - // Build the indices from the reconstructed state. - let index = rebuild_index(&crdt); - let node_index = rebuild_node_index(&crdt); - let token_index = rebuild_token_index(&crdt); - let merge_job_index = rebuild_merge_job_index(&crdt); - let active_agent_index = rebuild_active_agent_index(&crdt); - let test_job_index = rebuild_test_job_index(&crdt); - let agent_throttle_index = rebuild_agent_throttle_index(&crdt); - let gateway_project_index = rebuild_gateway_project_index(&crdt); - - // Advance the top-level list clocks to the Lamport floor so that - // list-level inserts don't re-emit low seq numbers. 
- crdt.doc.items.advance_seq(lamport_floor); - crdt.doc.nodes.advance_seq(lamport_floor); - crdt.doc.tokens.advance_seq(lamport_floor); - crdt.doc.merge_jobs.advance_seq(lamport_floor); - crdt.doc.active_agents.advance_seq(lamport_floor); - crdt.doc.test_jobs.advance_seq(lamport_floor); - crdt.doc.agent_throttle.advance_seq(lamport_floor); - crdt.doc.gateway_projects.advance_seq(lamport_floor); - crdt.doc - .gateway_config - .active_project - .advance_seq(lamport_floor); - - slog!( - "[crdt] Initialised: {} ops replayed, {} items indexed, {} nodes indexed, lamport_floor={}", - rows.len(), - index.len(), - node_index.len(), - lamport_floor, - ); - - // Spawn background persistence task. - let (persist_tx, mut persist_rx) = mpsc::unbounded_channel::(); - - tokio::spawn(async move { - while let Some(op) = persist_rx.recv().await { - let op_json = match serde_json::to_string(&op) { - Ok(j) => j, - Err(e) => { - slog!("[crdt] Failed to serialize op: {e}"); - continue; - } - }; - let op_id = hex::encode(&op.id()); - let seq = op.inner.seq as i64; - let now = chrono::Utc::now().to_rfc3339(); - - let result = sqlx::query( - "INSERT INTO crdt_ops (op_id, seq, op_json, created_at) \ - VALUES (?1, ?2, ?3, ?4) \ - ON CONFLICT(op_id) DO NOTHING", - ) - .bind(&op_id) - .bind(seq) - .bind(&op_json) - .bind(&now) - .execute(&pool) - .await; - - if let Err(e) = result { - slog!("[crdt] Failed to persist op {}: {e}", &op_id[..12]); - } - } - }); - - let state = CrdtState { - crdt, - keypair, - index, - node_index, - token_index, - merge_job_index, - active_agent_index, - test_job_index, - agent_throttle_index, - gateway_project_index, - persist_tx, - lamport_floor, - }; - - let _ = CRDT_STATE.set(Mutex::new(state)); - - // Initialise the CRDT event broadcast channel. - let (event_tx, _) = broadcast::channel::(256); - let _ = CRDT_EVENT_TX.set(event_tx); - - // Initialise the sync broadcast channel for outgoing ops. 
- let (sync_tx, _) = broadcast::channel::(1024); - let _ = SYNC_TX.set(sync_tx); - - Ok(()) -} - -/// Initialise a minimal in-memory CRDT state for unit tests. -/// -/// This avoids the async SQLite setup from `init()`. Ops are accepted via a -/// channel whose receiver is immediately dropped, so nothing is persisted. -/// Safe to call multiple times — subsequent calls are no-ops (OnceLock). -#[cfg(test)] -pub fn init_for_test() { - // Initialise thread-local CRDT for test isolation. - // Only creates a new CRDT if one isn't set yet on this thread; - // subsequent calls are no-ops (matching the old OnceLock semantics - // while keeping each thread isolated). - CRDT_STATE_TL.with(|lock| { - if lock.get().is_none() { - let keypair = make_keypair(); - let crdt = BaseCrdt::::new(&keypair); - let (persist_tx, _rx) = mpsc::unbounded_channel(); - let state = CrdtState { - crdt, - keypair, - index: HashMap::new(), - node_index: HashMap::new(), - token_index: HashMap::new(), - merge_job_index: HashMap::new(), - active_agent_index: HashMap::new(), - test_job_index: HashMap::new(), - agent_throttle_index: HashMap::new(), - gateway_project_index: HashMap::new(), - persist_tx, - lamport_floor: 0, - }; - let _ = lock.set(Mutex::new(state)); - } - }); - let _ = CRDT_EVENT_TX.get_or_init(|| broadcast::channel::(256).0); - let _ = SYNC_TX.get_or_init(|| broadcast::channel::(1024).0); - let _ = ALL_OPS.get_or_init(|| Mutex::new(Vec::new())); - let _ = VECTOR_CLOCK.get_or_init(|| Mutex::new(VectorClock::new())); -} - -/// Load or create the Ed25519 keypair used by this node. -async fn load_or_create_keypair(pool: &SqlitePool) -> Result { - let row: Option<(Vec,)> = - sqlx::query_as("SELECT seed FROM crdt_node_identity WHERE id = 1") - .fetch_optional(pool) - .await?; - - if let Some((seed,)) = row { - // Reconstruct from stored seed. The seed is the 32-byte private key. 
- if let Ok(kp) = Ed25519KeyPair::from_bytes(&seed) { - return Ok(kp); - } - slog!("[crdt] Stored keypair invalid, regenerating"); - } - - let kp = make_keypair(); - let seed = kp.as_bytes().to_vec(); - sqlx::query("INSERT INTO crdt_node_identity (id, seed) VALUES (1, ?1) ON CONFLICT(id) DO UPDATE SET seed = excluded.seed") - .bind(&seed) - .execute(pool) - .await?; - - Ok(kp) -} - -/// Rebuild the story_id → list index mapping from the current CRDT state. -pub(super) fn rebuild_index(crdt: &BaseCrdt) -> HashMap { - let mut map = HashMap::new(); - for (i, item) in crdt.doc.items.iter().enumerate() { - if let JsonValue::String(ref sid) = item.story_id.view() { - map.insert(sid.clone(), i); - } - } - map -} - -/// Rebuild the node_id → nodes list index mapping from the current CRDT state. -pub(super) fn rebuild_node_index(crdt: &BaseCrdt) -> HashMap { - let mut map = HashMap::new(); - for (i, node) in crdt.doc.nodes.iter().enumerate() { - if let JsonValue::String(ref nid) = node.node_id.view() { - map.insert(nid.clone(), i); - } - } - map -} - -/// Rebuild the agent_id → tokens list index. -pub(super) fn rebuild_token_index(crdt: &BaseCrdt) -> HashMap { - let mut map = HashMap::new(); - for (i, entry) in crdt.doc.tokens.iter().enumerate() { - if let JsonValue::String(ref k) = entry.agent_id.view() { - map.insert(k.clone(), i); - } - } - map -} - -/// Rebuild the story_id → merge_jobs list index. -pub(super) fn rebuild_merge_job_index(crdt: &BaseCrdt) -> HashMap { - let mut map = HashMap::new(); - for (i, entry) in crdt.doc.merge_jobs.iter().enumerate() { - if let JsonValue::String(ref k) = entry.story_id.view() { - map.insert(k.clone(), i); - } - } - map -} - -/// Rebuild the agent_id → active_agents list index. 
-pub(super) fn rebuild_active_agent_index(crdt: &BaseCrdt) -> HashMap { - let mut map = HashMap::new(); - for (i, entry) in crdt.doc.active_agents.iter().enumerate() { - if let JsonValue::String(ref k) = entry.agent_id.view() { - map.insert(k.clone(), i); - } - } - map -} - -/// Rebuild the story_id → test_jobs list index. -pub(super) fn rebuild_test_job_index(crdt: &BaseCrdt) -> HashMap { - let mut map = HashMap::new(); - for (i, entry) in crdt.doc.test_jobs.iter().enumerate() { - if let JsonValue::String(ref k) = entry.story_id.view() { - map.insert(k.clone(), i); - } - } - map -} - -/// Rebuild the node_id → agent_throttle list index. -pub(super) fn rebuild_agent_throttle_index(crdt: &BaseCrdt) -> HashMap { - let mut map = HashMap::new(); - for (i, entry) in crdt.doc.agent_throttle.iter().enumerate() { - if let JsonValue::String(ref k) = entry.node_id.view() { - map.insert(k.clone(), i); - } - } - map -} - -/// Rebuild the project name → gateway_projects list index. -pub(super) fn rebuild_gateway_project_index( - crdt: &BaseCrdt, -) -> HashMap { - let mut map = HashMap::new(); - for (i, entry) in crdt.doc.gateway_projects.iter().enumerate() { - if let JsonValue::String(ref k) = entry.name.view() { - map.insert(k.clone(), i); - } - } - map -} - -// ── Write path ─────────────────────────────────────────────────────── - -/// Create a CRDT op via `op_fn`, sign it, apply it, and send it to the -/// persistence channel. The closure receives `&mut CrdtState` so it can -/// mutably access the CRDT document, while `sign` only needs `&keypair`. 
-pub(super) fn apply_and_persist(state: &mut CrdtState, op_fn: F) -where - F: FnOnce(&mut CrdtState) -> bft_json_crdt::op::Op, -{ - let raw_op = op_fn(state); - let signed = raw_op.sign(&state.keypair); - state.crdt.apply(signed.clone()); - if state.persist_tx.send(signed.clone()).is_err() { - let op_type = if signed.inner.is_deleted { - "Delete" - } else { - "Insert" - }; - let seq = signed.inner.seq; - crate::slog_warn!( - "[crdt_persist] persist channel send failed: op_type={op_type} seq={seq}" - ); - } - - // Track in ALL_OPS + VECTOR_CLOCK, then broadcast to sync peers. - if let Ok(json) = serde_json::to_string(&signed) { - track_op(&signed, json); - } - if let Some(tx) = SYNC_TX.get() { - let _ = tx.send(signed); - } -} - -/// Broadcast a CRDT event to all subscribers. -pub(super) fn emit_event(event: CrdtEvent) { - if let Some(tx) = CRDT_EVENT_TX.get() { - let _ = tx.send(event); - } -} - -#[cfg(test)] -mod tests { - use super::super::hex; - use super::super::read::{extract_item_view, read_item}; - use super::super::types::PipelineItemCrdt; - use super::super::write::write_item; - use super::*; - use bft_json_crdt::json_crdt::OpState; - use bft_json_crdt::keypair::make_keypair; - use bft_json_crdt::op::ROOT_ID; - use serde_json::json; - - #[test] - fn crdt_ops_replay_reconstructs_state() { - let kp = make_keypair(); - let mut crdt1 = BaseCrdt::::new(&kp); - - // Build state with a series of ops. 
- let item_json: JsonValue = json!({ - "story_id": "30_story_replay", - "stage": "1_backlog", - "name": "Replay Test", - "agent": "", - "retry_count": 0.0, - "blocked": false, - "depends_on": "", - "claimed_by": "", - "claimed_at": 0.0, - }) - .into(); - - let op1 = crdt1.doc.items.insert(ROOT_ID, item_json).sign(&kp); - crdt1.apply(op1.clone()); - - let op2 = crdt1.doc.items[0] - .stage - .set("2_current".to_string()) - .sign(&kp); - crdt1.apply(op2.clone()); - - let op3 = crdt1.doc.items[0] - .name - .set("Updated Name".to_string()) - .sign(&kp); - crdt1.apply(op3.clone()); - - // Replay ops on a fresh CRDT. - let mut crdt2 = BaseCrdt::::new(&kp); - crdt2.apply(op1); - crdt2.apply(op2); - crdt2.apply(op3); - - assert_eq!( - crdt1.doc.items[0].stage.view(), - crdt2.doc.items[0].stage.view() - ); - assert_eq!( - crdt1.doc.items[0].name.view(), - crdt2.doc.items[0].name.view() - ); - } - - #[test] - fn rebuild_index_maps_story_ids() { - let kp = make_keypair(); - let mut crdt = BaseCrdt::::new(&kp); - - for (sid, stage) in &[("10_story_a", "1_backlog"), ("20_story_b", "2_current")] { - let item: JsonValue = json!({ - "story_id": sid, - "stage": stage, - "name": "", - "agent": "", - "retry_count": 0.0, - "blocked": false, - "depends_on": "", - "claimed_by": "", - "claimed_at": 0.0, - }) - .into(); - let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp); - crdt.apply(op); - } - - let index = rebuild_index(&crdt); - assert_eq!(index.len(), 2); - assert!(index.contains_key("10_story_a")); - assert!(index.contains_key("20_story_b")); - } - - #[tokio::test] - async fn init_and_write_read_roundtrip() { - let tmp = tempfile::tempdir().unwrap(); - let db_path = tmp.path().join("crdt_test.db"); - - // Init directly (not via the global singleton, for test isolation). 
- let options = SqliteConnectOptions::new() - .filename(&db_path) - .create_if_missing(true); - let pool = SqlitePool::connect_with(options).await.unwrap(); - sqlx::migrate!("./migrations").run(&pool).await.unwrap(); - - let keypair = make_keypair(); - let mut crdt = BaseCrdt::::new(&keypair); - - // Insert and update like write_item does. - let item_json: JsonValue = json!({ - "story_id": "50_story_roundtrip", - "stage": "1_backlog", - "name": "Roundtrip", - "agent": "", - "retry_count": 0.0, - "blocked": false, - "depends_on": "", - "claimed_by": "", - "claimed_at": 0.0, - }) - .into(); - - let insert_op = crdt.doc.items.insert(ROOT_ID, item_json).sign(&keypair); - crdt.apply(insert_op.clone()); - - // Persist the op. - let op_json = serde_json::to_string(&insert_op).unwrap(); - let op_id = hex::encode(&insert_op.id()); - let now = chrono::Utc::now().to_rfc3339(); - sqlx::query( - "INSERT INTO crdt_ops (op_id, seq, op_json, created_at) VALUES (?1, ?2, ?3, ?4)", - ) - .bind(&op_id) - .bind(insert_op.inner.seq as i64) - .bind(&op_json) - .bind(&now) - .execute(&pool) - .await - .unwrap(); - - // Reconstruct from DB. 
- let rows: Vec<(String,)> = - sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY rowid ASC") - .fetch_all(&pool) - .await - .unwrap(); - - let mut crdt2 = BaseCrdt::::new(&keypair); - for (json_str,) in &rows { - let op: SignedOp = serde_json::from_str(json_str).unwrap(); - crdt2.apply(op); - } - - let view = extract_item_view(&crdt2.doc.items[0]).unwrap(); - assert_eq!(view.story_id, "50_story_roundtrip"); - assert_eq!(view.stage, "1_backlog"); - assert_eq!(view.name.as_deref(), Some("Roundtrip")); - } - - #[test] - fn persist_tx_send_failure_logs_warn_with_op_type_and_seq() { - let kp = make_keypair(); - let crdt = BaseCrdt::::new(&kp); - let (persist_tx, persist_rx) = mpsc::unbounded_channel::(); - - let mut state = CrdtState { - crdt, - keypair: kp, - index: HashMap::new(), - node_index: HashMap::new(), - token_index: HashMap::new(), - merge_job_index: HashMap::new(), - active_agent_index: HashMap::new(), - test_job_index: HashMap::new(), - agent_throttle_index: HashMap::new(), - gateway_project_index: HashMap::new(), - persist_tx, - lamport_floor: 0, - }; - - // Drop the receiver so that the next send fails immediately. 
- drop(persist_rx); - - let item_json: JsonValue = json!({ - "story_id": "676_story_persist_fail", - "stage": "1_backlog", - "name": "Persist Fail Test", - "agent": "", - "retry_count": 0.0, - "blocked": false, - "depends_on": "", - "claimed_by": "", - "claimed_at": 0.0, - }) - .into(); - - let before_warns = crate::log_buffer::global() - .get_recent_entries(1000, None, Some(&crate::log_buffer::LogLevel::Warn)) - .len(); - - apply_and_persist(&mut state, |s| s.crdt.doc.items.insert(ROOT_ID, item_json)); - - let warn_entries = crate::log_buffer::global().get_recent_entries( - 1000, - None, - Some(&crate::log_buffer::LogLevel::Warn), - ); - - assert_eq!( - warn_entries.len(), - before_warns + 1, - "expected exactly one WARN log entry when persist_tx send fails" - ); - - let warn = &warn_entries[warn_entries.len() - 1]; - assert!( - warn.message.contains("[crdt_persist]"), - "WARN message must be prefixed [crdt_persist]: {}", - warn.message - ); - assert!( - warn.message.contains("op_type="), - "WARN message must include op_type: {}", - warn.message - ); - assert!( - warn.message.contains("seq="), - "WARN message must include seq: {}", - warn.message - ); - } - - #[test] - fn persist_tx_send_success_emits_no_warn() { - let kp = make_keypair(); - let crdt = BaseCrdt::::new(&kp); - let (persist_tx, _persist_rx) = mpsc::unbounded_channel::(); - - let mut state = CrdtState { - crdt, - keypair: kp, - index: HashMap::new(), - node_index: HashMap::new(), - token_index: HashMap::new(), - merge_job_index: HashMap::new(), - active_agent_index: HashMap::new(), - test_job_index: HashMap::new(), - agent_throttle_index: HashMap::new(), - gateway_project_index: HashMap::new(), - persist_tx, - lamport_floor: 0, - }; - - let item_json: JsonValue = json!({ - "story_id": "676_story_happy_path", - "stage": "1_backlog", - "name": "Happy Path Test", - "agent": "", - "retry_count": 0.0, - "blocked": false, - "depends_on": "", - "claimed_by": "", - "claimed_at": 0.0, - }) - .into(); - - let 
before_warns = crate::log_buffer::global() - .get_recent_entries( - 1000, - Some("[crdt_persist]"), - Some(&crate::log_buffer::LogLevel::Warn), - ) - .len(); - - apply_and_persist(&mut state, |s| s.crdt.doc.items.insert(ROOT_ID, item_json)); - - let after_warns = crate::log_buffer::global() - .get_recent_entries( - 1000, - Some("[crdt_persist]"), - Some(&crate::log_buffer::LogLevel::Warn), - ) - .len(); - - assert_eq!( - after_warns, before_warns, - "no [crdt_persist] WARN should be emitted when persist_tx send succeeds" - ); - } - - /// After replaying ops from a journal, a brand-new register created - /// post-init must emit its first local op with seq = lamport_floor + 1, - /// not seq = 1. This is the Phase C integration test. - #[tokio::test] - async fn restart_new_register_resumes_from_lamport_floor() { - let tmp = tempfile::tempdir().unwrap(); - let db_path = tmp.path().join("lamport_floor.db"); - - let options = SqliteConnectOptions::new() - .filename(&db_path) - .create_if_missing(true); - let pool = SqlitePool::connect_with(options).await.unwrap(); - sqlx::migrate!("./migrations").run(&pool).await.unwrap(); - - let kp = make_keypair(); - let mut crdt = BaseCrdt::::new(&kp); - - // Insert an item and update its stage a few times to push seq up. 
- let item: JsonValue = json!({ - "story_id": "664_story_original", - "stage": "1_backlog", - "name": "Original", - "agent": "", - "retry_count": 0.0, - "blocked": false, - "depends_on": "", - "claimed_by": "", - "claimed_at": 0.0, - "merged_at": 0.0, - }) - .into(); - - let mut ops = Vec::new(); - - let op1 = crdt.doc.items.insert(ROOT_ID, item).sign(&kp); - crdt.apply(op1.clone()); - ops.push(op1); - - let idx = rebuild_index(&crdt)["664_story_original"]; - let op2 = crdt.doc.items[idx] - .stage - .set("2_current".to_string()) - .sign(&kp); - crdt.apply(op2.clone()); - ops.push(op2); - - let op3 = crdt.doc.items[idx] - .stage - .set("3_review".to_string()) - .sign(&kp); - crdt.apply(op3.clone()); - ops.push(op3); - - // Record the max seq across all persisted ops — this is the floor. - let max_seq = ops.iter().map(|o| o.inner.seq).max().unwrap(); - - // Persist all ops. - let now = chrono::Utc::now().to_rfc3339(); - for op in &ops { - let op_json = serde_json::to_string(op).unwrap(); - let op_id = hex::encode(&op.id()); - sqlx::query( - "INSERT INTO crdt_ops (op_id, seq, op_json, created_at) VALUES (?1, ?2, ?3, ?4)", - ) - .bind(&op_id) - .bind(op.inner.seq as i64) - .bind(&op_json) - .bind(&now) - .execute(&pool) - .await - .unwrap(); - } - - // --- Simulate restart: replay from journal into a fresh CRDT --- - let rows: Vec<(String,)> = - sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY rowid ASC") - .fetch_all(&pool) - .await - .unwrap(); - - let mut crdt2 = BaseCrdt::::new(&kp); - let mut lamport_floor: u64 = 0; - for (json_str,) in &rows { - let signed: SignedOp = serde_json::from_str(json_str).unwrap(); - lamport_floor = lamport_floor.max(signed.inner.seq); - crdt2.apply(signed); - } - - // Advance top-level lists (mirrors what init() does). - crdt2.doc.items.advance_seq(lamport_floor); - crdt2.doc.nodes.advance_seq(lamport_floor); - - assert_eq!(lamport_floor, max_seq); - - // Insert a brand-new item — simulating a new story arriving after restart. 
- let new_item: JsonValue = json!({ - "story_id": "664_story_new_after_restart", - "stage": "1_backlog", - "name": "New After Restart", - "agent": "", - "retry_count": 0.0, - "blocked": false, - "depends_on": "", - "claimed_by": "", - "claimed_at": 0.0, - "merged_at": 0.0, - }) - .into(); - - let insert_op = crdt2.doc.items.insert(ROOT_ID, new_item); - // The list-level insert must have seq > lamport_floor. - assert!( - insert_op.seq > max_seq, - "list insert seq ({}) must be > lamport_floor ({})", - insert_op.seq, - max_seq, - ); - let insert_signed = insert_op.sign(&kp); - crdt2.apply(insert_signed); - - // Advance the new item's inner registers to the floor (mirrors write_item). - let idx2 = rebuild_index(&crdt2)["664_story_new_after_restart"]; - let new_crdt_item = &mut crdt2.doc.items[idx2]; - new_crdt_item.stage.advance_seq(lamport_floor); - - // Now update the stage — the first field-level op must also be > floor. - let stage_op = crdt2.doc.items[idx2].stage.set("2_current".to_string()); - assert!( - stage_op.seq > max_seq, - "first field op seq ({}) on new register must be > lamport_floor ({}); \ - got seq = 1 means the register reset its clock on restart", - stage_op.seq, - max_seq, - ); - } -} diff --git a/server/src/crdt_state/state/apply.rs b/server/src/crdt_state/state/apply.rs new file mode 100644 index 00000000..a98e11e9 --- /dev/null +++ b/server/src/crdt_state/state/apply.rs @@ -0,0 +1,50 @@ +//! Write path: create, sign, apply, persist, and broadcast a CRDT op. +//! +//! [`apply_and_persist`] is the single entry point for all CRDT mutations. +//! It invokes the caller's op-factory closure, signs the resulting op, applies +//! it to the live document, sends it to the persistence channel, and broadcasts +//! it to sync peers via [`super::SYNC_TX`]. 
+ +use bft_json_crdt::json_crdt::JsonValue; +use bft_json_crdt::op::Op; + +use super::super::types::CrdtEvent; +use super::{CrdtState, statics}; + +/// Create a CRDT op via `op_fn`, sign it, apply it, and send it to the +/// persistence channel. The closure receives `&mut CrdtState` so it can +/// mutably access the CRDT document, while `sign` only needs `&keypair`. +pub(in crate::crdt_state) fn apply_and_persist(state: &mut CrdtState, op_fn: F) +where + F: FnOnce(&mut CrdtState) -> Op, +{ + let raw_op = op_fn(state); + let signed = raw_op.sign(&state.keypair); + state.crdt.apply(signed.clone()); + if state.persist_tx.send(signed.clone()).is_err() { + let op_type = if signed.inner.is_deleted { + "Delete" + } else { + "Insert" + }; + let seq = signed.inner.seq; + crate::slog_warn!( + "[crdt_persist] persist channel send failed: op_type={op_type} seq={seq}" + ); + } + + // Track in ALL_OPS + VECTOR_CLOCK, then broadcast to sync peers. + if let Ok(json) = serde_json::to_string(&signed) { + statics::track_op(&signed, json); + } + if let Some(tx) = statics::SYNC_TX.get() { + let _ = tx.send(signed); + } +} + +/// Broadcast a CRDT event to all subscribers. +pub(in crate::crdt_state) fn emit_event(event: CrdtEvent) { + if let Some(tx) = statics::CRDT_EVENT_TX.get() { + let _ = tx.send(event); + } +} diff --git a/server/src/crdt_state/state/indices.rs b/server/src/crdt_state/state/indices.rs new file mode 100644 index 00000000..31f801ec --- /dev/null +++ b/server/src/crdt_state/state/indices.rs @@ -0,0 +1,115 @@ +//! Index-rebuild helpers: map domain keys to `ListCrdt` positions for O(1) lookup. +//! +//! Each function scans the corresponding `ListCrdt` in the CRDT document and +//! returns a fresh `HashMap` from the entry's primary key to its position index. +//! These are called once during [`super::init::init`] to populate the secondary +//! indices in [`super::CrdtState`], and again by write helpers whenever an entry +//! is inserted. 
+ +use std::collections::HashMap; + +use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, JsonValue}; + +use super::super::types::PipelineDoc; + +/// Rebuild the story_id → list index mapping from the current CRDT state. +pub(in crate::crdt_state) fn rebuild_index(crdt: &BaseCrdt) -> HashMap { + let mut map = HashMap::new(); + for (i, item) in crdt.doc.items.iter().enumerate() { + if let JsonValue::String(ref sid) = item.story_id.view() { + map.insert(sid.clone(), i); + } + } + map +} + +/// Rebuild the node_id → nodes list index mapping from the current CRDT state. +pub(in crate::crdt_state) fn rebuild_node_index( + crdt: &BaseCrdt, +) -> HashMap { + let mut map = HashMap::new(); + for (i, node) in crdt.doc.nodes.iter().enumerate() { + if let JsonValue::String(ref nid) = node.node_id.view() { + map.insert(nid.clone(), i); + } + } + map +} + +/// Rebuild the agent_id → tokens list index. +pub(in crate::crdt_state) fn rebuild_token_index( + crdt: &BaseCrdt, +) -> HashMap { + let mut map = HashMap::new(); + for (i, entry) in crdt.doc.tokens.iter().enumerate() { + if let JsonValue::String(ref k) = entry.agent_id.view() { + map.insert(k.clone(), i); + } + } + map +} + +/// Rebuild the story_id → merge_jobs list index. +pub(in crate::crdt_state) fn rebuild_merge_job_index( + crdt: &BaseCrdt, +) -> HashMap { + let mut map = HashMap::new(); + for (i, entry) in crdt.doc.merge_jobs.iter().enumerate() { + if let JsonValue::String(ref k) = entry.story_id.view() { + map.insert(k.clone(), i); + } + } + map +} + +/// Rebuild the agent_id → active_agents list index. +pub(in crate::crdt_state) fn rebuild_active_agent_index( + crdt: &BaseCrdt, +) -> HashMap { + let mut map = HashMap::new(); + for (i, entry) in crdt.doc.active_agents.iter().enumerate() { + if let JsonValue::String(ref k) = entry.agent_id.view() { + map.insert(k.clone(), i); + } + } + map +} + +/// Rebuild the story_id → test_jobs list index. 
+pub(in crate::crdt_state) fn rebuild_test_job_index( + crdt: &BaseCrdt, +) -> HashMap { + let mut map = HashMap::new(); + for (i, entry) in crdt.doc.test_jobs.iter().enumerate() { + if let JsonValue::String(ref k) = entry.story_id.view() { + map.insert(k.clone(), i); + } + } + map +} + +/// Rebuild the node_id → agent_throttle list index. +pub(in crate::crdt_state) fn rebuild_agent_throttle_index( + crdt: &BaseCrdt, +) -> HashMap { + let mut map = HashMap::new(); + for (i, entry) in crdt.doc.agent_throttle.iter().enumerate() { + if let JsonValue::String(ref k) = entry.node_id.view() { + map.insert(k.clone(), i); + } + } + map +} + +/// Rebuild the project name → gateway_projects list index. +pub(in crate::crdt_state) fn rebuild_gateway_project_index( + crdt: &BaseCrdt, +) -> HashMap { + let mut map = HashMap::new(); + for (i, entry) in crdt.doc.gateway_projects.iter().enumerate() { + if let JsonValue::String(ref k) = entry.name.view() { + map.insert(k.clone(), i); + } + } + map +} diff --git a/server/src/crdt_state/state/init.rs b/server/src/crdt_state/state/init.rs new file mode 100644 index 00000000..7bb3a3f2 --- /dev/null +++ b/server/src/crdt_state/state/init.rs @@ -0,0 +1,192 @@ +//! CRDT initialisation: async startup and keypair persistence. +//! +//! [`init`] opens the SQLite database, loads or creates the node keypair, +//! replays all persisted ops to reconstruct state, and spawns a background +//! persistence task. It is safe to call only once; subsequent calls are +//! no-ops (guarded by [`super::CRDT_STATE`]). 
+ +use std::collections::HashMap; +use std::path::Path; +use std::sync::Mutex; + +use bft_json_crdt::json_crdt::{BaseCrdt, SignedOp}; +use bft_json_crdt::keypair::make_keypair; +use fastcrypto::ed25519::Ed25519KeyPair; +use fastcrypto::traits::ToFromBytes; +use sqlx::SqlitePool; +use sqlx::sqlite::SqliteConnectOptions; +use tokio::sync::{broadcast, mpsc}; + +use super::super::VectorClock; +use super::super::hex; +use super::super::types::{CrdtEvent, PipelineDoc}; +use super::indices::{ + rebuild_active_agent_index, rebuild_agent_throttle_index, rebuild_gateway_project_index, + rebuild_index, rebuild_merge_job_index, rebuild_node_index, rebuild_test_job_index, + rebuild_token_index, +}; +use super::statics::{ALL_OPS, CRDT_EVENT_TX, SYNC_TX, VECTOR_CLOCK}; +use super::{CRDT_STATE, CrdtState}; +use crate::slog; + +/// Initialise the CRDT state layer. +/// +/// Opens the SQLite database, loads or creates a node keypair, replays any +/// persisted ops to reconstruct state, and spawns a background persistence +/// task. Safe to call only once; subsequent calls are no-ops. +pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> { + if CRDT_STATE.get().is_some() { + return Ok(()); + } + + let options = SqliteConnectOptions::new() + .filename(db_path) + .create_if_missing(true); + let pool = SqlitePool::connect_with(options).await?; + sqlx::migrate!("./migrations").run(&pool).await?; + + // Load or create the node keypair. + let keypair = load_or_create_keypair(&pool).await?; + let mut crdt = BaseCrdt::::new(&keypair); + + // Replay persisted ops to reconstruct state. 
+ let rows: Vec<(String,)> = sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY rowid ASC") + .fetch_all(&pool) + .await?; + + let mut all_ops_vec = Vec::with_capacity(rows.len()); + let mut vector_clock = VectorClock::new(); + let mut lamport_floor: u64 = 0; + for (op_json,) in &rows { + if let Ok(signed_op) = serde_json::from_str::(op_json) { + let author_hex = hex::encode(&signed_op.author()); + *vector_clock.entry(author_hex).or_insert(0) += 1; + lamport_floor = lamport_floor.max(signed_op.inner.seq); + crdt.apply(signed_op); + all_ops_vec.push(op_json.clone()); + } else { + slog!("[crdt] Warning: failed to deserialize stored op"); + } + } + let _ = ALL_OPS.set(Mutex::new(all_ops_vec)); + let _ = VECTOR_CLOCK.set(Mutex::new(vector_clock)); + + // Build the indices from the reconstructed state. + let index = rebuild_index(&crdt); + let node_index = rebuild_node_index(&crdt); + let token_index = rebuild_token_index(&crdt); + let merge_job_index = rebuild_merge_job_index(&crdt); + let active_agent_index = rebuild_active_agent_index(&crdt); + let test_job_index = rebuild_test_job_index(&crdt); + let agent_throttle_index = rebuild_agent_throttle_index(&crdt); + let gateway_project_index = rebuild_gateway_project_index(&crdt); + + // Advance the top-level list clocks to the Lamport floor so that + // list-level inserts don't re-emit low seq numbers. 
+ crdt.doc.items.advance_seq(lamport_floor); + crdt.doc.nodes.advance_seq(lamport_floor); + crdt.doc.tokens.advance_seq(lamport_floor); + crdt.doc.merge_jobs.advance_seq(lamport_floor); + crdt.doc.active_agents.advance_seq(lamport_floor); + crdt.doc.test_jobs.advance_seq(lamport_floor); + crdt.doc.agent_throttle.advance_seq(lamport_floor); + crdt.doc.gateway_projects.advance_seq(lamport_floor); + crdt.doc + .gateway_config + .active_project + .advance_seq(lamport_floor); + + slog!( + "[crdt] Initialised: {} ops replayed, {} items indexed, {} nodes indexed, lamport_floor={}", + rows.len(), + index.len(), + node_index.len(), + lamport_floor, + ); + + // Spawn background persistence task. + let (persist_tx, mut persist_rx) = mpsc::unbounded_channel::(); + + tokio::spawn(async move { + while let Some(op) = persist_rx.recv().await { + let op_json = match serde_json::to_string(&op) { + Ok(j) => j, + Err(e) => { + slog!("[crdt] Failed to serialize op: {e}"); + continue; + } + }; + let op_id = hex::encode(&op.id()); + let seq = op.inner.seq as i64; + let now = chrono::Utc::now().to_rfc3339(); + + let result = sqlx::query( + "INSERT INTO crdt_ops (op_id, seq, op_json, created_at) \ + VALUES (?1, ?2, ?3, ?4) \ + ON CONFLICT(op_id) DO NOTHING", + ) + .bind(&op_id) + .bind(seq) + .bind(&op_json) + .bind(&now) + .execute(&pool) + .await; + + if let Err(e) = result { + slog!("[crdt] Failed to persist op {}: {e}", &op_id[..12]); + } + } + }); + + let state = CrdtState { + crdt, + keypair, + index, + node_index, + token_index, + merge_job_index, + active_agent_index, + test_job_index, + agent_throttle_index, + gateway_project_index, + persist_tx, + lamport_floor, + }; + + let _ = CRDT_STATE.set(Mutex::new(state)); + + // Initialise the CRDT event broadcast channel. + let (event_tx, _) = broadcast::channel::(256); + let _ = CRDT_EVENT_TX.set(event_tx); + + // Initialise the sync broadcast channel for outgoing ops. 
+ let (sync_tx, _) = broadcast::channel::(1024); + let _ = SYNC_TX.set(sync_tx); + + Ok(()) +} + +/// Load or create the Ed25519 keypair used by this node. +async fn load_or_create_keypair(pool: &SqlitePool) -> Result { + let row: Option<(Vec,)> = + sqlx::query_as("SELECT seed FROM crdt_node_identity WHERE id = 1") + .fetch_optional(pool) + .await?; + + if let Some((seed,)) = row { + // Reconstruct from stored seed. The seed is the 32-byte private key. + if let Ok(kp) = Ed25519KeyPair::from_bytes(&seed) { + return Ok(kp); + } + slog!("[crdt] Stored keypair invalid, regenerating"); + } + + let kp = make_keypair(); + let seed = kp.as_bytes().to_vec(); + sqlx::query("INSERT INTO crdt_node_identity (id, seed) VALUES (1, ?1) ON CONFLICT(id) DO UPDATE SET seed = excluded.seed") + .bind(&seed) + .execute(pool) + .await?; + + Ok(kp) +} diff --git a/server/src/crdt_state/state/mod.rs b/server/src/crdt_state/state/mod.rs new file mode 100644 index 00000000..822d4d04 --- /dev/null +++ b/server/src/crdt_state/state/mod.rs @@ -0,0 +1,145 @@ +//! Internal CRDT state struct, statics, initialisation, and central write primitive. +//! +//! This module is split into focused submodules: +//! - [`statics`]: broadcast channels and op-tracking statics +//! - [`indices`]: index-rebuild helpers for O(1) key lookup +//! - [`init`]: async startup and keypair persistence +//! 
- [`apply`]: write path (sign, apply, persist, broadcast) + +use std::collections::HashMap; +use std::sync::{Mutex, OnceLock}; + +use bft_json_crdt::json_crdt::{BaseCrdt, SignedOp}; +use bft_json_crdt::keypair::make_keypair; +use fastcrypto::ed25519::Ed25519KeyPair; +use tokio::sync::{broadcast, mpsc}; + +use super::VectorClock; +use super::types::{CrdtEvent, PipelineDoc}; + +mod apply; +mod indices; +mod init; +mod statics; + +#[cfg(test)] +mod tests; + +// ── Re-exports for crdt_state siblings ────────────────────────────── + +pub use init::init; + +pub(super) use apply::{apply_and_persist, emit_event}; +pub(super) use indices::{ + rebuild_active_agent_index, rebuild_agent_throttle_index, rebuild_gateway_project_index, + rebuild_index, rebuild_merge_job_index, rebuild_node_index, rebuild_test_job_index, + rebuild_token_index, +}; +pub(crate) use statics::{ALL_OPS, VECTOR_CLOCK}; +pub(super) use statics::{SYNC_TX, track_op}; + +// ── CrdtState struct ───────────────────────────────────────────────── + +/// Holds the core CRDT document, signing keypair, and all O(1) lookup indices. +pub(super) struct CrdtState { + pub(super) crdt: BaseCrdt, + pub(super) keypair: Ed25519KeyPair, + /// Maps story_id → index in the items ListCrdt for O(1) lookup. + pub(super) index: HashMap, + /// Maps node_id (hex) → index in the nodes ListCrdt for O(1) lookup. + pub(super) node_index: HashMap, + /// Maps agent_id → index in the tokens ListCrdt for O(1) lookup. + pub(super) token_index: HashMap, + /// Maps story_id → index in the merge_jobs ListCrdt for O(1) lookup. + pub(super) merge_job_index: HashMap, + /// Maps agent_id → index in the active_agents ListCrdt for O(1) lookup. + pub(super) active_agent_index: HashMap, + /// Maps story_id → index in the test_jobs ListCrdt for O(1) lookup. + pub(super) test_job_index: HashMap, + /// Maps node_id → index in the agent_throttle ListCrdt for O(1) lookup. 
+ pub(super) agent_throttle_index: HashMap, + /// Maps project name → index in the gateway_projects ListCrdt for O(1) lookup. + pub(super) gateway_project_index: HashMap, + /// Channel sender for fire-and-forget op persistence. + pub(super) persist_tx: mpsc::UnboundedSender, + /// Max sequence number seen across all ops during init() replay. + /// + /// Newly-created registers (post-init) must have their Lamport clock + /// advanced to this floor so they don't re-emit low sequence numbers. + pub(super) lamport_floor: u64, +} + +// ── Singleton and accessor ─────────────────────────────────────────── + +/// Process-wide singleton holding the initialised [`CrdtState`]. +pub(super) static CRDT_STATE: OnceLock> = OnceLock::new(); + +#[cfg(test)] +thread_local! { + static CRDT_STATE_TL: OnceLock> = const { OnceLock::new() }; +} + +/// Returns a reference to the global [`CrdtState`] mutex, if initialised. +#[cfg(not(test))] +pub(super) fn get_crdt() -> Option<&'static Mutex> { + CRDT_STATE.get() +} + +/// Returns the thread-local [`CrdtState`] if set, otherwise the global one (test variant). +#[cfg(test)] +pub(super) fn get_crdt() -> Option<&'static Mutex> { + let tl = CRDT_STATE_TL.with(|lock| { + if lock.get().is_some() { + Some(lock as *const OnceLock>) + } else { + None + } + }); + if let Some(ptr) = tl { + // SAFETY: The thread-local is destroyed only at thread exit, and the + // returned reference is used solely by test code running on this + // thread before then. The 'static lifetime exists only to satisfy + // the return type and must not be relied on past thread shutdown. + let lock = unsafe { &*ptr }; + lock.get() + } else { + CRDT_STATE.get() + } +} + +/// Initialise a minimal in-memory CRDT state for unit tests. +/// +/// This avoids the async SQLite setup from `init()`. Ops are accepted via a +/// channel whose receiver is immediately dropped, so nothing is persisted. +/// Safe to call multiple times — subsequent calls are no-ops (OnceLock). +#[cfg(test)] +pub fn init_for_test() { + // Initialise thread-local CRDT for test isolation.
+ // Only creates a new CRDT if one isn't set yet on this thread; + // subsequent calls are no-ops (matching the old OnceLock semantics + // while keeping each thread isolated). + CRDT_STATE_TL.with(|lock| { + if lock.get().is_none() { + let keypair = make_keypair(); + let crdt = BaseCrdt::::new(&keypair); + let (persist_tx, _rx) = mpsc::unbounded_channel(); + let state = CrdtState { + crdt, + keypair, + index: HashMap::new(), + node_index: HashMap::new(), + token_index: HashMap::new(), + merge_job_index: HashMap::new(), + active_agent_index: HashMap::new(), + test_job_index: HashMap::new(), + agent_throttle_index: HashMap::new(), + gateway_project_index: HashMap::new(), + persist_tx, + lamport_floor: 0, + }; + let _ = lock.set(Mutex::new(state)); + } + }); + let _ = statics::CRDT_EVENT_TX.get_or_init(|| broadcast::channel::(256).0); + let _ = statics::SYNC_TX.get_or_init(|| broadcast::channel::(1024).0); + let _ = statics::ALL_OPS.get_or_init(|| Mutex::new(Vec::new())); + let _ = statics::VECTOR_CLOCK.get_or_init(|| Mutex::new(VectorClock::new())); +} diff --git a/server/src/crdt_state/state/statics.rs b/server/src/crdt_state/state/statics.rs new file mode 100644 index 00000000..4f11b60a --- /dev/null +++ b/server/src/crdt_state/state/statics.rs @@ -0,0 +1,50 @@ +//! Broadcast channels and op-tracking statics for the CRDT state layer. +//! +//! Provides the outbound sync channel ([`SYNC_TX`]), the event broadcast +//! channel ([`CRDT_EVENT_TX`]), and the in-memory op journal +//! ([`ALL_OPS`] / [`VECTOR_CLOCK`]) that tracks every applied op for +//! delta-sync. + +use std::sync::{Mutex, OnceLock}; + +use bft_json_crdt::json_crdt::SignedOp; +use tokio::sync::broadcast; + +use super::super::VectorClock; +use super::super::hex; +use super::super::types::CrdtEvent; + +/// Broadcast channel for CRDT events (stage transitions, etc.). +pub(super) static CRDT_EVENT_TX: OnceLock> = OnceLock::new(); + +/// Broadcast channel for outbound ops to sync peers. 
+pub(in crate::crdt_state) static SYNC_TX: OnceLock> = OnceLock::new(); + +/// All persisted ops as JSON strings, in causal (insertion) order. +/// +/// Pub(crate) so that `crdt_snapshot` can access it for compaction. +pub(crate) static ALL_OPS: OnceLock>> = OnceLock::new(); + +/// Live vector clock tracking op counts per author. +/// +/// Updated in lockstep with `ALL_OPS` — every time an op is appended to the +/// journal, the corresponding author's count is incremented here. This avoids +/// re-parsing all ops when a peer requests `our_vector_clock()`. +pub(crate) static VECTOR_CLOCK: OnceLock> = OnceLock::new(); + +/// Append an op's JSON to `ALL_OPS` and bump the author's count in `VECTOR_CLOCK`. +/// +/// Centralises the bookkeeping that must stay in sync between the two statics. +pub(in crate::crdt_state) fn track_op(signed: &SignedOp, json: String) { + if let Some(all) = ALL_OPS.get() + && let Ok(mut v) = all.lock() + { + v.push(json); + } + if let Some(vc) = VECTOR_CLOCK.get() + && let Ok(mut clock) = vc.lock() + { + let author_hex = hex::encode(&signed.author()); + *clock.entry(author_hex).or_insert(0) += 1; + } +} diff --git a/server/src/crdt_state/state/tests.rs b/server/src/crdt_state/state/tests.rs new file mode 100644 index 00000000..574f603c --- /dev/null +++ b/server/src/crdt_state/state/tests.rs @@ -0,0 +1,424 @@ +//! Unit and integration tests for the core CRDT state module. +//! +//! Covers op replay, index rebuild, round-trip persistence, the +//! Lamport-floor invariant, and the persist-channel error path. 
+ +use super::super::hex; +use super::super::read::extract_item_view; +use super::super::types::PipelineDoc; +use super::*; +use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, JsonValue, SignedOp}; +use bft_json_crdt::keypair::make_keypair; +use bft_json_crdt::op::ROOT_ID; +use serde_json::json; +use sqlx::SqlitePool; +use sqlx::sqlite::SqliteConnectOptions; +use std::collections::HashMap; +use tokio::sync::mpsc; + +#[test] +fn crdt_ops_replay_reconstructs_state() { + let kp = make_keypair(); + let mut crdt1 = BaseCrdt::::new(&kp); + + // Build state with a series of ops. + let item_json: JsonValue = json!({ + "story_id": "30_story_replay", + "stage": "1_backlog", + "name": "Replay Test", + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, + }) + .into(); + + let op1 = crdt1.doc.items.insert(ROOT_ID, item_json).sign(&kp); + crdt1.apply(op1.clone()); + + let op2 = crdt1.doc.items[0] + .stage + .set("2_current".to_string()) + .sign(&kp); + crdt1.apply(op2.clone()); + + let op3 = crdt1.doc.items[0] + .name + .set("Updated Name".to_string()) + .sign(&kp); + crdt1.apply(op3.clone()); + + // Replay ops on a fresh CRDT. 
+ let mut crdt2 = BaseCrdt::::new(&kp); + crdt2.apply(op1); + crdt2.apply(op2); + crdt2.apply(op3); + + assert_eq!( + crdt1.doc.items[0].stage.view(), + crdt2.doc.items[0].stage.view() + ); + assert_eq!( + crdt1.doc.items[0].name.view(), + crdt2.doc.items[0].name.view() + ); +} + +#[test] +fn rebuild_index_maps_story_ids() { + let kp = make_keypair(); + let mut crdt = BaseCrdt::::new(&kp); + + for (sid, stage) in &[("10_story_a", "1_backlog"), ("20_story_b", "2_current")] { + let item: JsonValue = json!({ + "story_id": sid, + "stage": stage, + "name": "", + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, + }) + .into(); + let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp); + crdt.apply(op); + } + + let index = rebuild_index(&crdt); + assert_eq!(index.len(), 2); + assert!(index.contains_key("10_story_a")); + assert!(index.contains_key("20_story_b")); +} + +#[tokio::test] +async fn init_and_write_read_roundtrip() { + let tmp = tempfile::tempdir().unwrap(); + let db_path = tmp.path().join("crdt_test.db"); + + // Init directly (not via the global singleton, for test isolation). + let options = SqliteConnectOptions::new() + .filename(&db_path) + .create_if_missing(true); + let pool = SqlitePool::connect_with(options).await.unwrap(); + sqlx::migrate!("./migrations").run(&pool).await.unwrap(); + + let keypair = make_keypair(); + let mut crdt = BaseCrdt::::new(&keypair); + + // Insert and update like write_item does. + let item_json: JsonValue = json!({ + "story_id": "50_story_roundtrip", + "stage": "1_backlog", + "name": "Roundtrip", + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, + }) + .into(); + + let insert_op = crdt.doc.items.insert(ROOT_ID, item_json).sign(&keypair); + crdt.apply(insert_op.clone()); + + // Persist the op. 
+ let op_json = serde_json::to_string(&insert_op).unwrap(); + let op_id = hex::encode(&insert_op.id()); + let now = chrono::Utc::now().to_rfc3339(); + sqlx::query("INSERT INTO crdt_ops (op_id, seq, op_json, created_at) VALUES (?1, ?2, ?3, ?4)") + .bind(&op_id) + .bind(insert_op.inner.seq as i64) + .bind(&op_json) + .bind(&now) + .execute(&pool) + .await + .unwrap(); + + // Reconstruct from DB. + let rows: Vec<(String,)> = sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY rowid ASC") + .fetch_all(&pool) + .await + .unwrap(); + + let mut crdt2 = BaseCrdt::::new(&keypair); + for (json_str,) in &rows { + let op: SignedOp = serde_json::from_str(json_str).unwrap(); + crdt2.apply(op); + } + + let view = extract_item_view(&crdt2.doc.items[0]).unwrap(); + assert_eq!(view.story_id, "50_story_roundtrip"); + assert_eq!(view.stage, "1_backlog"); + assert_eq!(view.name.as_deref(), Some("Roundtrip")); +} + +#[test] +fn persist_tx_send_failure_logs_warn_with_op_type_and_seq() { + let kp = make_keypair(); + let crdt = BaseCrdt::::new(&kp); + let (persist_tx, persist_rx) = mpsc::unbounded_channel::(); + + let mut state = CrdtState { + crdt, + keypair: kp, + index: HashMap::new(), + node_index: HashMap::new(), + token_index: HashMap::new(), + merge_job_index: HashMap::new(), + active_agent_index: HashMap::new(), + test_job_index: HashMap::new(), + agent_throttle_index: HashMap::new(), + gateway_project_index: HashMap::new(), + persist_tx, + lamport_floor: 0, + }; + + // Drop the receiver so that the next send fails immediately. 
+ drop(persist_rx); + + let item_json: JsonValue = json!({ + "story_id": "676_story_persist_fail", + "stage": "1_backlog", + "name": "Persist Fail Test", + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, + }) + .into(); + + let before_warns = crate::log_buffer::global() + .get_recent_entries(1000, None, Some(&crate::log_buffer::LogLevel::Warn)) + .len(); + + apply_and_persist(&mut state, |s| s.crdt.doc.items.insert(ROOT_ID, item_json)); + + let warn_entries = crate::log_buffer::global().get_recent_entries( + 1000, + None, + Some(&crate::log_buffer::LogLevel::Warn), + ); + + assert_eq!( + warn_entries.len(), + before_warns + 1, + "expected exactly one WARN log entry when persist_tx send fails" + ); + + let warn = &warn_entries[warn_entries.len() - 1]; + assert!( + warn.message.contains("[crdt_persist]"), + "WARN message must be prefixed [crdt_persist]: {}", + warn.message + ); + assert!( + warn.message.contains("op_type="), + "WARN message must include op_type: {}", + warn.message + ); + assert!( + warn.message.contains("seq="), + "WARN message must include seq: {}", + warn.message + ); +} + +#[test] +fn persist_tx_send_success_emits_no_warn() { + let kp = make_keypair(); + let crdt = BaseCrdt::::new(&kp); + let (persist_tx, _persist_rx) = mpsc::unbounded_channel::(); + + let mut state = CrdtState { + crdt, + keypair: kp, + index: HashMap::new(), + node_index: HashMap::new(), + token_index: HashMap::new(), + merge_job_index: HashMap::new(), + active_agent_index: HashMap::new(), + test_job_index: HashMap::new(), + agent_throttle_index: HashMap::new(), + gateway_project_index: HashMap::new(), + persist_tx, + lamport_floor: 0, + }; + + let item_json: JsonValue = json!({ + "story_id": "676_story_happy_path", + "stage": "1_backlog", + "name": "Happy Path Test", + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, + }) + .into(); + + let 
before_warns = crate::log_buffer::global() + .get_recent_entries( + 1000, + Some("[crdt_persist]"), + Some(&crate::log_buffer::LogLevel::Warn), + ) + .len(); + + apply_and_persist(&mut state, |s| s.crdt.doc.items.insert(ROOT_ID, item_json)); + + let after_warns = crate::log_buffer::global() + .get_recent_entries( + 1000, + Some("[crdt_persist]"), + Some(&crate::log_buffer::LogLevel::Warn), + ) + .len(); + + assert_eq!( + after_warns, before_warns, + "no [crdt_persist] WARN should be emitted when persist_tx send succeeds" + ); +} + +/// After replaying ops from a journal, a brand-new register created +/// post-init must emit its first local op with seq = lamport_floor + 1, +/// not seq = 1. This is the Phase C integration test. +#[tokio::test] +async fn restart_new_register_resumes_from_lamport_floor() { + let tmp = tempfile::tempdir().unwrap(); + let db_path = tmp.path().join("lamport_floor.db"); + + let options = SqliteConnectOptions::new() + .filename(&db_path) + .create_if_missing(true); + let pool = SqlitePool::connect_with(options).await.unwrap(); + sqlx::migrate!("./migrations").run(&pool).await.unwrap(); + + let kp = make_keypair(); + let mut crdt = BaseCrdt::::new(&kp); + + // Insert an item and update its stage a few times to push seq up. 
+ let item: JsonValue = json!({ + "story_id": "664_story_original", + "stage": "1_backlog", + "name": "Original", + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, + "merged_at": 0.0, + }) + .into(); + + let mut ops = Vec::new(); + + let op1 = crdt.doc.items.insert(ROOT_ID, item).sign(&kp); + crdt.apply(op1.clone()); + ops.push(op1); + + let idx = rebuild_index(&crdt)["664_story_original"]; + let op2 = crdt.doc.items[idx] + .stage + .set("2_current".to_string()) + .sign(&kp); + crdt.apply(op2.clone()); + ops.push(op2); + + let op3 = crdt.doc.items[idx] + .stage + .set("3_review".to_string()) + .sign(&kp); + crdt.apply(op3.clone()); + ops.push(op3); + + // Record the max seq across all persisted ops — this is the floor. + let max_seq = ops.iter().map(|o| o.inner.seq).max().unwrap(); + + // Persist all ops. + let now = chrono::Utc::now().to_rfc3339(); + for op in &ops { + let op_json = serde_json::to_string(op).unwrap(); + let op_id = hex::encode(&op.id()); + sqlx::query( + "INSERT INTO crdt_ops (op_id, seq, op_json, created_at) VALUES (?1, ?2, ?3, ?4)", + ) + .bind(&op_id) + .bind(op.inner.seq as i64) + .bind(&op_json) + .bind(&now) + .execute(&pool) + .await + .unwrap(); + } + + // --- Simulate restart: replay from journal into a fresh CRDT --- + let rows: Vec<(String,)> = sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY rowid ASC") + .fetch_all(&pool) + .await + .unwrap(); + + let mut crdt2 = BaseCrdt::::new(&kp); + let mut lamport_floor: u64 = 0; + for (json_str,) in &rows { + let signed: SignedOp = serde_json::from_str(json_str).unwrap(); + lamport_floor = lamport_floor.max(signed.inner.seq); + crdt2.apply(signed); + } + + // Advance top-level lists (mirrors what init() does). + crdt2.doc.items.advance_seq(lamport_floor); + crdt2.doc.nodes.advance_seq(lamport_floor); + + assert_eq!(lamport_floor, max_seq); + + // Insert a brand-new item — simulating a new story arriving after restart. 
+ let new_item: JsonValue = json!({ + "story_id": "664_story_new_after_restart", + "stage": "1_backlog", + "name": "New After Restart", + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, + "merged_at": 0.0, + }) + .into(); + + let insert_op = crdt2.doc.items.insert(ROOT_ID, new_item); + // The list-level insert must have seq > lamport_floor. + assert!( + insert_op.seq > max_seq, + "list insert seq ({}) must be > lamport_floor ({})", + insert_op.seq, + max_seq, + ); + let insert_signed = insert_op.sign(&kp); + crdt2.apply(insert_signed); + + // Advance the new item's inner registers to the floor (mirrors write_item). + let idx2 = rebuild_index(&crdt2)["664_story_new_after_restart"]; + let new_crdt_item = &mut crdt2.doc.items[idx2]; + new_crdt_item.stage.advance_seq(lamport_floor); + + // Now update the stage — the first field-level op must also be > floor. + let stage_op = crdt2.doc.items[idx2].stage.set("2_current".to_string()); + assert!( + stage_op.seq > max_seq, + "first field op seq ({}) on new register must be > lamport_floor ({}); \ + got seq = 1 means the register reset its clock on restart", + stage_op.seq, + max_seq, + ); +}