diff --git a/server/src/crdt_state.rs b/server/src/crdt_state.rs deleted file mode 100644 index 44df6cc5..00000000 --- a/server/src/crdt_state.rs +++ /dev/null @@ -1,2122 +0,0 @@ -//! CRDT state layer — manages pipeline state as a conflict-free replicated document backed by SQLite. -/// CRDT state layer for pipeline state, backed by SQLite. -/// -/// The CRDT document is the primary source of truth for pipeline item -/// metadata (stage, name, agent, etc.). CRDT ops are persisted to SQLite so -/// state survives restarts. The filesystem `.huskies/work/` directories are -/// still updated as a secondary output for backwards compatibility. -/// -/// Stage transitions detected by `write_item()` are broadcast as [`CrdtEvent`]s -/// so subscribers (auto-assign, WebSocket, notifications) can react without -/// polling the filesystem. -use std::collections::HashMap; -use std::sync::{Mutex, OnceLock}; - -/// A vector clock mapping node IDs (hex-encoded Ed25519 pubkeys) to the count -/// of ops seen from that node. Used for delta sync — a connecting peer sends -/// its clock so the other side can compute which ops are missing. -pub type VectorClock = HashMap; - -use bft_json_crdt::json_crdt::*; -use bft_json_crdt::keypair::make_keypair; -use bft_json_crdt::list_crdt::ListCrdt; -use bft_json_crdt::lww_crdt::LwwRegisterCrdt; -use bft_json_crdt::op::ROOT_ID; -use fastcrypto::ed25519::Ed25519KeyPair; -use fastcrypto::traits::ToFromBytes; -use serde_json::json; -use sqlx::SqlitePool; -use sqlx::sqlite::SqliteConnectOptions; -use std::path::Path; -use tokio::sync::{broadcast, mpsc}; - -use crate::slog; - -// ── CRDT events ───────────────────────────────────────────────────── - -/// An event emitted when a pipeline item's stage changes in the CRDT document. -#[derive(Clone, Debug)] -pub struct CrdtEvent { - /// Work item ID (e.g. `"42_story_my_feature"`). - pub story_id: String, - /// The stage the item was in before this transition, or `None` for new items. - pub from_stage: Option, - /// The stage the item is now in. - pub to_stage: String, - /// Human-readable story name from the CRDT document. - pub name: Option, -} - -/// Subscribe to CRDT state transition events. -/// -/// Returns `None` if the CRDT layer has not been initialised yet. -pub fn subscribe() -> Option> { - CRDT_EVENT_TX.get().map(|tx| tx.subscribe()) -} - -static CRDT_EVENT_TX: OnceLock> = OnceLock::new(); - -// ── Sync broadcast (outgoing ops to peers) ────────────────────────── - -static SYNC_TX: OnceLock> = OnceLock::new(); - -/// Subscribe to locally-created CRDT ops for sync replication. -/// -/// Each `SignedOp` broadcast here was created by *this* node and should be -/// forwarded to connected peers. Returns `None` before `init()`. -pub fn subscribe_ops() -> Option> { - SYNC_TX.get().map(|tx| tx.subscribe()) -} - -/// Return all persisted `SignedOp`s in causal order (oldest first). -/// -/// Used during initial sync handshake so a newly-connected peer can -/// reconstruct the full CRDT state. Returns `None` before `init()`. -pub fn all_ops_json() -> Option> { - ALL_OPS.get().map(|m| m.lock().unwrap().clone()) -} - -/// Return this node's current vector clock. -/// -/// The clock maps each author's hex-encoded Ed25519 public key to the count -/// of ops received from that author. A connecting peer sends its clock so -/// the other side can compute which ops are missing via [`ops_since`]. -/// -/// Returns `None` before `init()`. -pub fn our_vector_clock() -> Option { - VECTOR_CLOCK.get().map(|m| m.lock().unwrap().clone()) -} - -/// Return only the ops that a peer with the given `peer_clock` is missing. -/// -/// Iterates the local op journal and, for each author, skips the first N ops -/// (where N = `peer_clock[author]`) and returns the rest. An empty peer -/// clock returns all ops (full sync for new nodes). -/// -/// Returns `None` before `init()`. -pub fn ops_since(peer_clock: &VectorClock) -> Option> { - let all = ALL_OPS.get()?.lock().ok()?; - let mut author_counts: HashMap = HashMap::new(); - let mut result = Vec::new(); - - for op_json in all.iter() { - if let Ok(signed_op) = serde_json::from_str::(op_json) { - let author_hex = hex::encode(&signed_op.author()); - let count = author_counts.entry(author_hex.clone()).or_insert(0); - *count += 1; - - let peer_has = peer_clock.get(&author_hex).copied().unwrap_or(0); - if *count > peer_has { - result.push(op_json.clone()); - } - } - } - - Some(result) -} - -/// All persisted ops as JSON strings, in causal (insertion) order. -/// -/// Pub(crate) so that `crdt_snapshot` can access it for compaction. -pub(crate) static ALL_OPS: OnceLock>> = OnceLock::new(); - -/// Live vector clock tracking op counts per author. -/// -/// Updated in lockstep with `ALL_OPS` — every time an op is appended to the -/// journal, the corresponding author's count is incremented here. This avoids -/// re-parsing all ops when a peer requests `our_vector_clock()`. -/// Pub(crate) so that `crdt_snapshot` can access it for clock rebuild during compaction. -pub(crate) static VECTOR_CLOCK: OnceLock> = OnceLock::new(); - -/// Append an op's JSON to `ALL_OPS` and bump the author's count in `VECTOR_CLOCK`. -/// -/// Centralises the bookkeeping that must stay in sync between the two statics. -fn track_op(signed: &SignedOp, json: String) { - if let Some(all) = ALL_OPS.get() - && let Ok(mut v) = all.lock() - { - v.push(json); - } - if let Some(vc) = VECTOR_CLOCK.get() - && let Ok(mut clock) = vc.lock() - { - let author_hex = hex::encode(&signed.author()); - *clock.entry(author_hex).or_insert(0) += 1; - } -} - -// ── CRDT document types ────────────────────────────────────────────── - -#[add_crdt_fields] -#[derive(Clone, CrdtNode, Debug)] -pub struct PipelineDoc { - pub items: ListCrdt, - pub nodes: ListCrdt, -} - -#[add_crdt_fields] -#[derive(Clone, CrdtNode, Debug)] -pub struct PipelineItemCrdt { - pub story_id: LwwRegisterCrdt, - pub stage: LwwRegisterCrdt, - pub name: LwwRegisterCrdt, - pub agent: LwwRegisterCrdt, - pub retry_count: LwwRegisterCrdt, - pub blocked: LwwRegisterCrdt, - pub depends_on: LwwRegisterCrdt, - /// Node ID (hex-encoded Ed25519 pubkey) of the node that claimed this item. - /// Used for distributed work claiming — the LWW register resolves conflicts - /// deterministically so all nodes converge on the same claimer. - pub claimed_by: LwwRegisterCrdt, - /// Unix timestamp (seconds) when the claim was written. - /// Used for timeout-based reclaim: if a node crashes, other nodes can - /// reclaim the item after the timeout expires. - pub claimed_at: LwwRegisterCrdt, - /// Unix timestamp (seconds) when the item was merged to master. - /// Written once when the item transitions to `5_done`. Used by the - /// sweep loop to determine when to promote to `6_archived`. - pub merged_at: LwwRegisterCrdt, -} - -/// CRDT node that holds a single peer's presence entry. -#[add_crdt_fields] -#[derive(Clone, CrdtNode, Debug)] -pub struct NodePresenceCrdt { - /// Hex-encoded Ed25519 public key — stable identity across restarts. - pub node_id: LwwRegisterCrdt, - /// WebSocket URL this peer advertises, e.g. `ws://192.168.1.10:3001/crdt-sync`. - pub address: LwwRegisterCrdt, - /// Unix timestamp (seconds) of the last heartbeat written by this node. - pub last_seen: LwwRegisterCrdt, - /// `false` once a stale-detection pass has tombstoned this node. - pub alive: LwwRegisterCrdt, -} - -// ── Read-side view types ───────────────────────────────────────────── - -/// A snapshot of a single pipeline item derived from the CRDT document. -#[derive(Clone, Debug)] -pub struct PipelineItemView { - pub story_id: String, - pub stage: String, - pub name: Option, - pub agent: Option, - pub retry_count: Option, - pub blocked: Option, - pub depends_on: Option>, - /// Node ID of the node that claimed this item (hex-encoded Ed25519 pubkey). - pub claimed_by: Option, - /// Unix timestamp when the item was claimed. - pub claimed_at: Option, - /// Unix timestamp (seconds) when the item was merged to master. - /// `None` for items that were never in `5_done` or for legacy items. - pub merged_at: Option, -} - -/// A snapshot of a single node presence entry derived from the CRDT document. -#[derive(Clone, Debug)] -pub struct NodePresenceView { - pub node_id: String, - pub address: String, - /// Unix timestamp (seconds). - pub last_seen: f64, - pub alive: bool, -} - -// ── Internal state ─────────────────────────────────────────────────── - -struct CrdtState { - crdt: BaseCrdt, - keypair: Ed25519KeyPair, - /// Maps story_id → index in the items ListCrdt for O(1) lookup. - index: HashMap, - /// Maps node_id (hex) → index in the nodes ListCrdt for O(1) lookup. - node_index: HashMap, - /// Channel sender for fire-and-forget op persistence. - persist_tx: mpsc::UnboundedSender, -} - -static CRDT_STATE: OnceLock> = OnceLock::new(); - -#[cfg(test)] -thread_local! { - static CRDT_STATE_TL: OnceLock> = const { OnceLock::new() }; -} - -#[cfg(not(test))] -fn get_crdt() -> Option<&'static Mutex> { - CRDT_STATE.get() -} - -#[cfg(test)] -fn get_crdt() -> Option<&'static Mutex> { - let tl = CRDT_STATE_TL.with(|lock| { - if lock.get().is_some() { - Some(lock as *const OnceLock>) - } else { - None - } - }); - if let Some(ptr) = tl { - // SAFETY: The thread-local lives as long as the thread, which outlives - // any test using it. We only need 'static for the return type. - let lock = unsafe { &*ptr }; - lock.get() - } else { - CRDT_STATE.get() - } -} - -// ── Initialisation ─────────────────────────────────────────────────── - -/// Initialise the CRDT state layer. -/// -/// Opens the SQLite database, loads or creates a node keypair, replays any -/// persisted ops to reconstruct state, and spawns a background persistence -/// task. Safe to call only once; subsequent calls are no-ops. -pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> { - if CRDT_STATE.get().is_some() { - return Ok(()); - } - - let options = SqliteConnectOptions::new() - .filename(db_path) - .create_if_missing(true); - let pool = SqlitePool::connect_with(options).await?; - sqlx::migrate!("./migrations").run(&pool).await?; - - // Load or create the node keypair. - let keypair = load_or_create_keypair(&pool).await?; - let mut crdt = BaseCrdt::::new(&keypair); - - // Replay persisted ops to reconstruct state. - let rows: Vec<(String,)> = sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY rowid ASC") - .fetch_all(&pool) - .await?; - - let mut all_ops_vec = Vec::with_capacity(rows.len()); - let mut vector_clock = VectorClock::new(); - for (op_json,) in &rows { - if let Ok(signed_op) = serde_json::from_str::(op_json) { - let author_hex = hex::encode(&signed_op.author()); - *vector_clock.entry(author_hex).or_insert(0) += 1; - crdt.apply(signed_op); - all_ops_vec.push(op_json.clone()); - } else { - slog!("[crdt] Warning: failed to deserialize stored op"); - } - } - let _ = ALL_OPS.set(Mutex::new(all_ops_vec)); - let _ = VECTOR_CLOCK.set(Mutex::new(vector_clock)); - - // Build the indices from the reconstructed state. - let index = rebuild_index(&crdt); - let node_index = rebuild_node_index(&crdt); - - slog!( - "[crdt] Initialised: {} ops replayed, {} items indexed, {} nodes indexed", - rows.len(), - index.len(), - node_index.len() - ); - - // Spawn background persistence task. - let (persist_tx, mut persist_rx) = mpsc::unbounded_channel::(); - - tokio::spawn(async move { - while let Some(op) = persist_rx.recv().await { - let op_json = match serde_json::to_string(&op) { - Ok(j) => j, - Err(e) => { - slog!("[crdt] Failed to serialize op: {e}"); - continue; - } - }; - let op_id = hex::encode(&op.id()); - let seq = op.inner.seq as i64; - let now = chrono::Utc::now().to_rfc3339(); - - let result = sqlx::query( - "INSERT INTO crdt_ops (op_id, seq, op_json, created_at) \ - VALUES (?1, ?2, ?3, ?4) \ - ON CONFLICT(op_id) DO NOTHING", - ) - .bind(&op_id) - .bind(seq) - .bind(&op_json) - .bind(&now) - .execute(&pool) - .await; - - if let Err(e) = result { - slog!("[crdt] Failed to persist op {}: {e}", &op_id[..12]); - } - } - }); - - let state = CrdtState { - crdt, - keypair, - index, - node_index, - persist_tx, - }; - - let _ = CRDT_STATE.set(Mutex::new(state)); - - // Initialise the CRDT event broadcast channel. - let (event_tx, _) = broadcast::channel::(256); - let _ = CRDT_EVENT_TX.set(event_tx); - - // Initialise the sync broadcast channel for outgoing ops. - let (sync_tx, _) = broadcast::channel::(1024); - let _ = SYNC_TX.set(sync_tx); - - Ok(()) -} - -/// Initialise a minimal in-memory CRDT state for unit tests. -/// -/// This avoids the async SQLite setup from `init()`. Ops are accepted via a -/// channel whose receiver is immediately dropped, so nothing is persisted. -/// Safe to call multiple times — subsequent calls are no-ops (OnceLock). -#[cfg(test)] -pub fn init_for_test() { - // Initialise thread-local CRDT for test isolation. - // Only creates a new CRDT if one isn't set yet on this thread; - // subsequent calls are no-ops (matching the old OnceLock semantics - // while keeping each thread isolated). - CRDT_STATE_TL.with(|lock| { - if lock.get().is_none() { - let keypair = make_keypair(); - let crdt = BaseCrdt::::new(&keypair); - let (persist_tx, _rx) = mpsc::unbounded_channel(); - let state = CrdtState { - crdt, - keypair, - index: HashMap::new(), - node_index: HashMap::new(), - persist_tx, - }; - let _ = lock.set(Mutex::new(state)); - } - }); - let _ = CRDT_EVENT_TX.get_or_init(|| broadcast::channel::(256).0); - let _ = SYNC_TX.get_or_init(|| broadcast::channel::(1024).0); - let _ = ALL_OPS.get_or_init(|| Mutex::new(Vec::new())); - let _ = VECTOR_CLOCK.get_or_init(|| Mutex::new(VectorClock::new())); -} - -/// Load or create the Ed25519 keypair used by this node. -async fn load_or_create_keypair(pool: &SqlitePool) -> Result { - let row: Option<(Vec,)> = - sqlx::query_as("SELECT seed FROM crdt_node_identity WHERE id = 1") - .fetch_optional(pool) - .await?; - - if let Some((seed,)) = row { - // Reconstruct from stored seed. The seed is the 32-byte private key. - if let Ok(kp) = Ed25519KeyPair::from_bytes(&seed) { - return Ok(kp); - } - slog!("[crdt] Stored keypair invalid, regenerating"); - } - - let kp = make_keypair(); - let seed = kp.as_bytes().to_vec(); - sqlx::query("INSERT INTO crdt_node_identity (id, seed) VALUES (1, ?1) ON CONFLICT(id) DO UPDATE SET seed = excluded.seed") - .bind(&seed) - .execute(pool) - .await?; - - Ok(kp) -} - -/// Rebuild the story_id → list index mapping from the current CRDT state. -fn rebuild_index(crdt: &BaseCrdt) -> HashMap { - let mut map = HashMap::new(); - for (i, item) in crdt.doc.items.iter().enumerate() { - if let JsonValue::String(ref sid) = item.story_id.view() { - map.insert(sid.clone(), i); - } - } - map -} - -/// Rebuild the node_id → nodes list index mapping from the current CRDT state. -fn rebuild_node_index(crdt: &BaseCrdt) -> HashMap { - let mut map = HashMap::new(); - for (i, node) in crdt.doc.nodes.iter().enumerate() { - if let JsonValue::String(ref nid) = node.node_id.view() { - map.insert(nid.clone(), i); - } - } - map -} - -// ── Write path ─────────────────────────────────────────────────────── - -/// Create a CRDT op via `op_fn`, sign it, apply it, and send it to the -/// persistence channel. The closure receives `&mut CrdtState` so it can -/// mutably access the CRDT document, while `sign` only needs `&keypair`. -fn apply_and_persist(state: &mut CrdtState, op_fn: F) -where - F: FnOnce(&mut CrdtState) -> bft_json_crdt::op::Op, -{ - let raw_op = op_fn(state); - let signed = raw_op.sign(&state.keypair); - state.crdt.apply(signed.clone()); - if let Err(e) = state.persist_tx.send(signed.clone()) { - crate::slog_error!( - "[crdt] Failed to send op to persist task: {e}; persist task may be dead. \ - In-memory state is now ahead of persisted state." - ); - } - - // Track in ALL_OPS + VECTOR_CLOCK, then broadcast to sync peers. - if let Ok(json) = serde_json::to_string(&signed) { - track_op(&signed, json); - } - if let Some(tx) = SYNC_TX.get() { - let _ = tx.send(signed); - } -} - -/// Write a pipeline item state through CRDT operations. -/// -/// If the item exists, updates its registers. If not, inserts a new item -/// into the list. All ops are signed and persisted to SQLite. -/// -/// When the stage changes (or a new item is created), a [`CrdtEvent`] is -/// broadcast so subscribers can react to the transition. -#[allow(clippy::too_many_arguments)] -pub fn write_item( - story_id: &str, - stage: &str, - name: Option<&str>, - agent: Option<&str>, - retry_count: Option, - blocked: Option, - depends_on: Option<&str>, - claimed_by: Option<&str>, - claimed_at: Option, - merged_at: Option, -) { - let Some(state_mutex) = get_crdt() else { - return; - }; - let Ok(mut state) = state_mutex.lock() else { - return; - }; - - if let Some(&idx) = state.index.get(story_id) { - // Capture the old stage before updating so we can detect transitions. - let old_stage = match state.crdt.doc.items[idx].stage.view() { - JsonValue::String(s) => Some(s), - _ => None, - }; - - // Update existing item registers. - apply_and_persist(&mut state, |s| { - s.crdt.doc.items[idx].stage.set(stage.to_string()) - }); - - if let Some(n) = name { - apply_and_persist(&mut state, |s| { - s.crdt.doc.items[idx].name.set(n.to_string()) - }); - } - if let Some(a) = agent { - apply_and_persist(&mut state, |s| { - s.crdt.doc.items[idx].agent.set(a.to_string()) - }); - } - if let Some(rc) = retry_count { - apply_and_persist(&mut state, |s| { - s.crdt.doc.items[idx].retry_count.set(rc as f64) - }); - } - if let Some(b) = blocked { - apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].blocked.set(b)); - } - if let Some(d) = depends_on { - apply_and_persist(&mut state, |s| { - s.crdt.doc.items[idx].depends_on.set(d.to_string()) - }); - } - if let Some(cb) = claimed_by { - apply_and_persist(&mut state, |s| { - s.crdt.doc.items[idx].claimed_by.set(cb.to_string()) - }); - } - if let Some(ca) = claimed_at { - apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].claimed_at.set(ca)); - } - if let Some(ma) = merged_at { - apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].merged_at.set(ma)); - } - - // Broadcast a CrdtEvent if the stage actually changed. - let stage_changed = old_stage.as_deref() != Some(stage); - if stage_changed { - // Read the current name from the CRDT document for the event. - let current_name = match state.crdt.doc.items[idx].name.view() { - JsonValue::String(s) if !s.is_empty() => Some(s), - _ => None, - }; - emit_event(CrdtEvent { - story_id: story_id.to_string(), - from_stage: old_stage, - to_stage: stage.to_string(), - name: current_name, - }); - } - } else { - // Insert new item. - let item_json: JsonValue = json!({ - "story_id": story_id, - "stage": stage, - "name": name.unwrap_or(""), - "agent": agent.unwrap_or(""), - "retry_count": retry_count.unwrap_or(0) as f64, - "blocked": blocked.unwrap_or(false), - "depends_on": depends_on.unwrap_or(""), - "claimed_by": claimed_by.unwrap_or(""), - "claimed_at": claimed_at.unwrap_or(0.0), - "merged_at": merged_at.unwrap_or(0.0), - }) - .into(); - - apply_and_persist(&mut state, |s| s.crdt.doc.items.insert(ROOT_ID, item_json)); - - // Rebuild index after insertion (indices may shift). - state.index = rebuild_index(&state.crdt); - - // Broadcast a CrdtEvent for the new item. - emit_event(CrdtEvent { - story_id: story_id.to_string(), - from_stage: None, - to_stage: stage.to_string(), - name: name.map(String::from), - }); - } -} - -/// Broadcast a CRDT event to all subscribers. -fn emit_event(event: CrdtEvent) { - if let Some(tx) = CRDT_EVENT_TX.get() { - let _ = tx.send(event); - } -} - -// ── Remote op ingestion (from sync peers) ─────────────────────────── - -/// Apply a `SignedOp` received from a remote peer. -/// -/// The op is validated, applied to the local CRDT, persisted to SQLite, -/// and any resulting stage transitions are broadcast as [`CrdtEvent`]s. -/// Unlike `apply_and_persist`, this does **not** re-broadcast the op on -/// the sync channel (to avoid infinite echo loops). -/// -/// Returns `true` if the op was new and applied, `false` if it was a -/// duplicate or failed validation. -pub fn apply_remote_op(op: SignedOp) -> bool { - let Some(state_mutex) = get_crdt() else { - return false; - }; - let Ok(mut state) = state_mutex.lock() else { - return false; - }; - - // Snapshot stage state before applying so we can detect transitions. - let pre_stages: HashMap = state - .index - .iter() - .filter_map(|(sid, &idx)| match state.crdt.doc.items[idx].stage.view() { - JsonValue::String(s) => Some((sid.clone(), s)), - _ => None, - }) - .collect(); - - let result = state.crdt.apply(op.clone()); - - // Self-loop guard: op was already applied (came back via echo from peer). - // Return false immediately — do not re-persist or re-add to ALL_OPS. - if result == bft_json_crdt::json_crdt::OpState::AlreadySeen { - return false; - } - - if result != bft_json_crdt::json_crdt::OpState::Ok - && result != bft_json_crdt::json_crdt::OpState::MissingCausalDependencies - { - return false; - } - - // Persist the op. - if let Err(e) = state.persist_tx.send(op.clone()) { - crate::slog_error!( - "[crdt] Failed to send remote op to persist task: {e}; persist task may be dead. \ - In-memory state is now ahead of persisted state." - ); - } - - // Track in ALL_OPS + VECTOR_CLOCK. - if let Ok(json) = serde_json::to_string(&op) { - track_op(&op, json); - } - - // Rebuild indices (new items or nodes may have been inserted). - state.index = rebuild_index(&state.crdt); - state.node_index = rebuild_node_index(&state.crdt); - - // Detect and broadcast stage transitions. - for (sid, &idx) in &state.index { - let new_stage = match state.crdt.doc.items[idx].stage.view() { - JsonValue::String(s) => s, - _ => continue, - }; - let old_stage = pre_stages.get(sid).cloned(); - let changed = old_stage.as_deref() != Some(&new_stage); - if changed { - let name = match state.crdt.doc.items[idx].name.view() { - JsonValue::String(s) if !s.is_empty() => Some(s), - _ => None, - }; - emit_event(CrdtEvent { - story_id: sid.clone(), - from_stage: old_stage, - to_stage: new_stage, - name, - }); - } - } - - true -} - -// ── Node presence API ──────────────────────────────────────────────── - -/// Return the hex-encoded Ed25519 public key for this node. -/// -/// Used as the stable identity written into the CRDT nodes list. -/// Returns `None` before `init()`. -pub fn our_node_id() -> Option { - let state = get_crdt()?.lock().ok()?; - Some(hex::encode(&state.crdt.id)) -} - -/// Sign a challenge nonce with this node's keypair for WebSocket mutual auth. -/// -/// Returns `(pubkey_hex, signature_hex)` or `None` before `init()`. -pub fn sign_challenge(challenge: &str) -> Option<(String, String)> { - let state = get_crdt()?.lock().ok()?; - let pubkey_hex = crate::node_identity::public_key_hex(&state.keypair); - let sig_hex = crate::node_identity::sign_challenge(&state.keypair, challenge); - Some((pubkey_hex, sig_hex)) -} - -/// Write a claim on a pipeline item via CRDT. -/// -/// Sets `claimed_by` to this node's ID and `claimed_at` to the current time. -/// The LWW register ensures deterministic conflict resolution — if two nodes -/// claim the same item simultaneously, both will converge to the same winner -/// after CRDT sync. -/// -/// Returns `true` if the claim was written, `false` if the item doesn't exist -/// or CRDT is not initialised. -pub fn write_claim(story_id: &str) -> bool { - let Some(node_id) = our_node_id() else { - return false; - }; - let now = chrono::Utc::now().timestamp() as f64; - - let Some(state_mutex) = get_crdt() else { - return false; - }; - let Ok(mut state) = state_mutex.lock() else { - return false; - }; - - let Some(&idx) = state.index.get(story_id) else { - return false; - }; - - apply_and_persist(&mut state, |s| { - s.crdt.doc.items[idx].claimed_by.set(node_id.clone()) - }); - apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].claimed_at.set(now)); - - true -} - -/// Release a claim on a pipeline item (clear claimed_by and claimed_at). -pub fn release_claim(story_id: &str) { - let Some(state_mutex) = get_crdt() else { - return; - }; - let Ok(mut state) = state_mutex.lock() else { - return; - }; - let Some(&idx) = state.index.get(story_id) else { - return; - }; - - apply_and_persist(&mut state, |s| { - s.crdt.doc.items[idx].claimed_by.set(String::new()) - }); - apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].claimed_at.set(0.0)); -} - -/// Check if this node currently holds the claim on a pipeline item. -pub fn is_claimed_by_us(story_id: &str) -> bool { - let Some(node_id) = our_node_id() else { - return false; - }; - let Some(item) = read_item(story_id) else { - return false; - }; - item.claimed_by.as_deref() == Some(&node_id) -} - -/// Write or update a node presence entry in the CRDT. -/// -/// If a node with the given `node_id` already exists, only `last_seen`, -/// `alive`, and `address` are updated. If not, a new entry is inserted. -/// -/// This is the write path for both local heartbeats and tombstoning. -pub fn write_node_presence(node_id: &str, address: &str, last_seen: f64, alive: bool) { - let Some(state_mutex) = get_crdt() else { - return; - }; - let Ok(mut state) = state_mutex.lock() else { - return; - }; - - if let Some(&idx) = state.node_index.get(node_id) { - // Update existing entry — three separate ops so peers can merge independently. - apply_and_persist(&mut state, |s| { - s.crdt.doc.nodes[idx].last_seen.set(last_seen) - }); - apply_and_persist(&mut state, |s| s.crdt.doc.nodes[idx].alive.set(alive)); - apply_and_persist(&mut state, |s| { - s.crdt.doc.nodes[idx].address.set(address.to_string()) - }); - } else { - // Insert new node entry. - let node_json: JsonValue = json!({ - "node_id": node_id, - "address": address, - "last_seen": last_seen, - "alive": alive, - }) - .into(); - - apply_and_persist(&mut state, |s| s.crdt.doc.nodes.insert(ROOT_ID, node_json)); - - // Rebuild node index after insertion. - state.node_index = rebuild_node_index(&state.crdt); - } -} - -/// Read all node presence entries from the CRDT document. -/// -/// Returns `None` before `init()`. -pub fn read_all_node_presence() -> Option> { - let state_mutex = get_crdt()?; - let state = state_mutex.lock().ok()?; - - let mut nodes = Vec::new(); - for node_crdt in state.crdt.doc.nodes.iter() { - if let Some(view) = extract_node_view(node_crdt) { - nodes.push(view); - } - } - Some(nodes) -} - -/// Extract a `NodePresenceView` from a `NodePresenceCrdt`. -fn extract_node_view(node: &NodePresenceCrdt) -> Option { - let node_id = match node.node_id.view() { - JsonValue::String(s) if !s.is_empty() => s, - _ => return None, - }; - let address = match node.address.view() { - JsonValue::String(s) if !s.is_empty() => s, - _ => return None, - }; - let last_seen = match node.last_seen.view() { - JsonValue::Number(n) => n, - _ => 0.0, - }; - let alive = match node.alive.view() { - JsonValue::Bool(b) => b, - _ => true, - }; - Some(NodePresenceView { - node_id, - address, - last_seen, - alive, - }) -} - -// ── Debug dump ─────────────────────────────────────────────────────── - -/// A raw dump of a single CRDT list entry, including deleted items. -/// -/// Use `content_index` (hex of the list insert `OpId`) to cross-reference -/// with rows in the `crdt_ops` SQLite table. -pub struct CrdtItemDump { - pub story_id: Option, - pub stage: Option, - pub name: Option, - pub agent: Option, - pub retry_count: Option, - pub blocked: Option, - pub depends_on: Option>, - pub claimed_by: Option, - pub claimed_at: Option, - /// Hex-encoded OpId of the list insert op — cross-reference with `crdt_ops`. - pub content_index: String, - pub is_deleted: bool, -} - -/// Top-level debug dump of the in-memory CRDT state. -pub struct CrdtStateDump { - pub in_memory_state_loaded: bool, - /// Count of non-deleted items with a valid story_id and stage. - pub total_items: usize, - /// Total list-level ops seen (excludes root sentinel). - pub total_ops_in_list: usize, - /// Highest Lamport sequence number seen across all list-level ops. - pub max_seq_in_list: u64, - /// Count of ops in the ALL_OPS journal (persisted ops replayed at startup). - pub persisted_ops_count: usize, - pub items: Vec, -} - -/// Dump the raw in-memory CRDT state for debugging. -/// -/// Unlike [`read_all_items`] this includes tombstoned (deleted) entries and -/// exposes internal op metadata (content_index, seq). Pass a `story_id` -/// filter to restrict the output to a single item. -/// -/// **This is a debug tool.** For normal pipeline introspection use -/// [`read_all_items`] or the `get_pipeline_status` MCP tool instead. -pub fn dump_crdt_state(story_id_filter: Option<&str>) -> CrdtStateDump { - let in_memory_state_loaded = get_crdt().is_some(); - - let persisted_ops_count = ALL_OPS - .get() - .and_then(|m| m.lock().ok().map(|v| v.len())) - .unwrap_or(0); - - let Some(state_mutex) = get_crdt() else { - return CrdtStateDump { - in_memory_state_loaded, - total_items: 0, - total_ops_in_list: 0, - max_seq_in_list: 0, - persisted_ops_count, - items: Vec::new(), - }; - }; - - let Ok(state) = state_mutex.lock() else { - return CrdtStateDump { - in_memory_state_loaded, - total_items: 0, - total_ops_in_list: 0, - max_seq_in_list: 0, - persisted_ops_count, - items: Vec::new(), - }; - }; - - let total_items = state.crdt.doc.items.iter().count(); - - let max_seq_in_list = state - .crdt - .doc - .items - .ops - .iter() - .map(|op| op.seq) - .max() - .unwrap_or(0); - - // Subtract 1 for the root sentinel. - let total_ops_in_list = state.crdt.doc.items.ops.len().saturating_sub(1); - - let mut items = Vec::new(); - for op in &state.crdt.doc.items.ops { - // Skip root sentinel (id == [0u8; 32]). - if op.id == ROOT_ID { - continue; - } - let Some(ref item_crdt) = op.content else { - // No content — skip (orphaned slot, should not happen in normal use). - continue; - }; - - let story_id = match item_crdt.story_id.view() { - JsonValue::String(s) if !s.is_empty() => Some(s), - _ => None, - }; - - // Apply story_id filter before doing any further work. - if let Some(filter) = story_id_filter - && story_id.as_deref() != Some(filter) - { - continue; - } - - let stage = match item_crdt.stage.view() { - JsonValue::String(s) if !s.is_empty() => Some(s), - _ => None, - }; - let name = match item_crdt.name.view() { - JsonValue::String(s) if !s.is_empty() => Some(s), - _ => None, - }; - let agent = match item_crdt.agent.view() { - JsonValue::String(s) if !s.is_empty() => Some(s), - _ => None, - }; - let retry_count = match item_crdt.retry_count.view() { - JsonValue::Number(n) if n > 0.0 => Some(n as i64), - _ => None, - }; - let blocked = match item_crdt.blocked.view() { - JsonValue::Bool(b) => Some(b), - _ => None, - }; - let depends_on = match item_crdt.depends_on.view() { - JsonValue::String(s) if !s.is_empty() => serde_json::from_str::>(&s).ok(), - _ => None, - }; - - let claimed_by = match item_crdt.claimed_by.view() { - JsonValue::String(s) if !s.is_empty() => Some(s), - _ => None, - }; - let claimed_at = match item_crdt.claimed_at.view() { - JsonValue::Number(n) if n > 0.0 => Some(n), - _ => None, - }; - - let content_index = op.id.iter().map(|b| format!("{b:02x}")).collect::(); - - items.push(CrdtItemDump { - story_id, - stage, - name, - agent, - retry_count, - blocked, - depends_on, - claimed_by, - claimed_at, - content_index, - is_deleted: op.is_deleted, - }); - } - - CrdtStateDump { - in_memory_state_loaded, - total_items, - total_ops_in_list, - max_seq_in_list, - persisted_ops_count, - items, - } -} - -// ── Read path ──────────────────────────────────────────────────────── - -/// Read the full pipeline state from the CRDT document. -/// -/// Returns items grouped by stage, or `None` if the CRDT layer is not -/// initialised. -pub fn read_all_items() -> Option> { - let state_mutex = get_crdt()?; - let state = state_mutex.lock().ok()?; - - // Only return items that appear in the deduplicated index. - // The index maps story_id → visible_index and represents the - // latest-wins view of each story. Iterating raw CRDT entries - // would return stale duplicates from earlier stage writes. - let mut items = Vec::with_capacity(state.index.len()); - for &idx in state.index.values() { - if let Some(view) = extract_item_view(&state.crdt.doc.items[idx]) { - items.push(view); - } - } - Some(items) -} - -/// Read a single pipeline item by story_id. -pub fn read_item(story_id: &str) -> Option { - let state_mutex = get_crdt()?; - let state = state_mutex.lock().ok()?; - let &idx = state.index.get(story_id)?; - extract_item_view(&state.crdt.doc.items[idx]) -} - -/// Mark a story as deleted in the in-memory CRDT and persist a tombstone op. -/// -/// This is the eviction primitive for story 521 — it lets external callers -/// (e.g. the `purge_story` MCP tool, or operator scripts during incident -/// response) clear an item from the running server's in-memory state -/// without needing a full process restart. -/// -/// Specifically: -/// 1. Looks up the item's CRDT `OpId` via the visible-index map. -/// 2. Constructs a delete op via the bft-json-crdt list `delete()` primitive. -/// 3. Signs it with the local node's keypair and applies it to the in-memory -/// CRDT (marking the item `is_deleted = true` so subsequent -/// `read_all_items` / `read_item` calls skip it). -/// 4. Persists the signed delete op to `crdt_ops` via the existing -/// `apply_and_persist` channel — so the eviction survives a restart. -/// 5. Rebuilds the `story_id → visible_index` map (visible indices shift -/// when an item is marked deleted). -/// 6. Drops the in-memory content-store entry for the story so the cached -/// body doesn't outlive the CRDT entry. -/// -/// Returns `Ok(())` if the item was found and a tombstone op was queued, -/// or an `Err` if the CRDT layer isn't initialised or the story_id is -/// unknown to the in-memory state. -pub fn evict_item(story_id: &str) -> Result<(), String> { - let state_mutex = get_crdt().ok_or_else(|| "CRDT layer not initialised".to_string())?; - let mut state = state_mutex - .lock() - .map_err(|e| format!("CRDT lock poisoned: {e}"))?; - - let idx = state - .index - .get(story_id) - .copied() - .ok_or_else(|| format!("Story '{story_id}' not found in in-memory CRDT"))?; - - // Resolve the item's OpId before the closure (the closure will mutably - // borrow `state`, so we can't access `state.crdt.doc.items` from inside). - let item_id = - state.crdt.doc.items.id_at(idx).ok_or_else(|| { - format!("Item index {idx} for '{story_id}' did not resolve to an OpId") - })?; - - // Write the delete op via the existing apply_and_persist machinery. - // This signs the op, applies it to the in-memory CRDT (marking the item - // is_deleted), and sends it to the persistence task. - apply_and_persist(&mut state, |s| s.crdt.doc.items.delete(item_id)); - - // Rebuild the story_id → visible_index map; the deleted item is no - // longer counted by the iter that rebuild_index uses. - state.index = rebuild_index(&state.crdt); - - // Drop the content-store entry so the cached body doesn't outlive the - // CRDT entry. (Bug 521 follow-up: when CONTENT_STORE becomes a true - // lazy cache, this explicit eviction can go away.) - crate::db::delete_content(story_id); - - Ok(()) -} - -/// Extract a `PipelineItemView` from a `PipelineItemCrdt`. -fn extract_item_view(item: &PipelineItemCrdt) -> Option { - let story_id = match item.story_id.view() { - JsonValue::String(s) if !s.is_empty() => s, - _ => return None, - }; - let stage = match item.stage.view() { - JsonValue::String(s) if !s.is_empty() => s, - _ => return None, - }; - let name = match item.name.view() { - JsonValue::String(s) if !s.is_empty() => Some(s), - _ => None, - }; - let agent = match item.agent.view() { - JsonValue::String(s) if !s.is_empty() => Some(s), - _ => None, - }; - let retry_count = match item.retry_count.view() { - JsonValue::Number(n) if n > 0.0 => Some(n as i64), - _ => None, - }; - let blocked = match item.blocked.view() { - JsonValue::Bool(b) => Some(b), - _ => None, - }; - let depends_on = match item.depends_on.view() { - JsonValue::String(s) if !s.is_empty() => serde_json::from_str::>(&s).ok(), - _ => None, - }; - - let claimed_by = match item.claimed_by.view() { - JsonValue::String(s) if !s.is_empty() => Some(s), - _ => None, - }; - let claimed_at = match item.claimed_at.view() { - JsonValue::Number(n) if n > 0.0 => Some(n), - _ => None, - }; - let merged_at = match item.merged_at.view() { - JsonValue::Number(n) if n > 0.0 => Some(n), - _ => None, - }; - - Some(PipelineItemView { - story_id, - stage, - name, - agent, - retry_count, - blocked, - depends_on, - claimed_by, - claimed_at, - merged_at, - }) -} - -/// Check whether a dependency (by numeric ID prefix) is in `5_done` or `6_archived` -/// according to CRDT state. -/// -/// Returns `true` if the dependency is satisfied (item found in a done stage). -/// See `dep_is_archived_crdt` to distinguish archive-satisfied from cleanly-done. -pub fn dep_is_done_crdt(dep_number: u32) -> bool { - let prefix = format!("{dep_number}_"); - if let Some(items) = read_all_items() { - items.iter().any(|item| { - item.story_id.starts_with(&prefix) - && matches!(item.stage.as_str(), "5_done" | "6_archived") - }) - } else { - false - } -} - -/// Check whether a dependency (by numeric ID prefix) is specifically in `6_archived` -/// according to CRDT state. -/// -/// Used to detect when a dependency is satisfied via archive rather than via a clean -/// completion through `5_done`. Returns `false` when the CRDT layer is not initialised. -pub fn dep_is_archived_crdt(dep_number: u32) -> bool { - let prefix = format!("{dep_number}_"); - if let Some(items) = read_all_items() { - items - .iter() - .any(|item| item.story_id.starts_with(&prefix) && item.stage == "6_archived") - } else { - false - } -} - -/// Check unmet dependencies for a story by reading its `depends_on` from the -/// CRDT document and checking each dependency against CRDT state. -/// -/// Returns the list of dependency numbers that are NOT in `5_done` or `6_archived`. -pub fn check_unmet_deps_crdt(story_id: &str) -> Vec { - let item = match read_item(story_id) { - Some(i) => i, - None => return Vec::new(), - }; - let deps = match item.depends_on { - Some(d) => d, - None => return Vec::new(), - }; - deps.into_iter() - .filter(|&dep| !dep_is_done_crdt(dep)) - .collect() -} - -/// Return the list of dependency numbers from `story_id`'s `depends_on` that are -/// specifically in `6_archived` according to CRDT state. -/// -/// Used to emit a warning when promotion fires because a dep is archived rather than -/// cleanly completed. Returns an empty `Vec` when no deps are archived. -pub fn check_archived_deps_crdt(story_id: &str) -> Vec { - let item = match read_item(story_id) { - Some(i) => i, - None => return Vec::new(), - }; - let deps = match item.depends_on { - Some(d) => d, - None => return Vec::new(), - }; - deps.into_iter() - .filter(|&dep| dep_is_archived_crdt(dep)) - .collect() -} - -/// Hex-encode a byte slice (no external dep needed). -pub(crate) mod hex { - pub fn encode(bytes: &[u8]) -> String { - bytes.iter().map(|b| format!("{b:02x}")).collect() - } -} - -// ── Tests ──────────────────────────────────────────────────────────── - -#[cfg(test)] -mod tests { - use super::*; - use bft_json_crdt::json_crdt::OpState; - - #[test] - fn crdt_doc_insert_and_view() { - let kp = make_keypair(); - let mut crdt = BaseCrdt::::new(&kp); - - let item_json: JsonValue = json!({ - "story_id": "10_story_test", - "stage": "2_current", - "name": "Test Story", - "agent": "coder-opus", - "retry_count": 0.0, - "blocked": false, - "depends_on": "", - "claimed_by": "", - "claimed_at": 0.0, - }) - .into(); - - let op = crdt.doc.items.insert(ROOT_ID, item_json).sign(&kp); - assert_eq!(crdt.apply(op), OpState::Ok); - - let view = crdt.doc.items.view(); - assert_eq!(view.len(), 1); - - let item = &crdt.doc.items[0]; - assert_eq!( - item.story_id.view(), - JsonValue::String("10_story_test".to_string()) - ); - assert_eq!( - item.stage.view(), - JsonValue::String("2_current".to_string()) - ); - } - - #[test] - fn crdt_doc_update_stage() { - let kp = make_keypair(); - let mut crdt = BaseCrdt::::new(&kp); - - let item_json: JsonValue = json!({ - "story_id": "20_story_move", - "stage": "1_backlog", - "name": "Move Me", - "agent": "", - "retry_count": 0.0, - "blocked": false, - "depends_on": "", - "claimed_by": "", - "claimed_at": 0.0, - }) - .into(); - - let insert_op = crdt.doc.items.insert(ROOT_ID, item_json).sign(&kp); - crdt.apply(insert_op); - - // Update stage - let stage_op = crdt.doc.items[0] - .stage - .set("2_current".to_string()) - .sign(&kp); - crdt.apply(stage_op); - - assert_eq!( - crdt.doc.items[0].stage.view(), - JsonValue::String("2_current".to_string()) - ); - } - - #[test] - fn crdt_ops_replay_reconstructs_state() { - let kp = make_keypair(); - let mut crdt1 = BaseCrdt::::new(&kp); - - // Build state with a series of ops. - let item_json: JsonValue = json!({ - "story_id": "30_story_replay", - "stage": "1_backlog", - "name": "Replay Test", - "agent": "", - "retry_count": 0.0, - "blocked": false, - "depends_on": "", - "claimed_by": "", - "claimed_at": 0.0, - }) - .into(); - - let op1 = crdt1.doc.items.insert(ROOT_ID, item_json).sign(&kp); - crdt1.apply(op1.clone()); - - let op2 = crdt1.doc.items[0] - .stage - .set("2_current".to_string()) - .sign(&kp); - crdt1.apply(op2.clone()); - - let op3 = crdt1.doc.items[0] - .name - .set("Updated Name".to_string()) - .sign(&kp); - crdt1.apply(op3.clone()); - - // Replay ops on a fresh CRDT. - let mut crdt2 = BaseCrdt::::new(&kp); - crdt2.apply(op1); - crdt2.apply(op2); - crdt2.apply(op3); - - assert_eq!( - crdt1.doc.items[0].stage.view(), - crdt2.doc.items[0].stage.view() - ); - assert_eq!( - crdt1.doc.items[0].name.view(), - crdt2.doc.items[0].name.view() - ); - } - - #[test] - fn extract_item_view_parses_crdt_item() { - let kp = make_keypair(); - let mut crdt = BaseCrdt::::new(&kp); - - let item_json: JsonValue = json!({ - "story_id": "40_story_view", - "stage": "3_qa", - "name": "View Test", - "agent": "coder-1", - "retry_count": 2.0, - "blocked": true, - "depends_on": "[10,20]", - "claimed_by": "", - "claimed_at": 0.0, - }) - .into(); - - let op = crdt.doc.items.insert(ROOT_ID, item_json).sign(&kp); - crdt.apply(op); - - let view = extract_item_view(&crdt.doc.items[0]).unwrap(); - assert_eq!(view.story_id, "40_story_view"); - assert_eq!(view.stage, "3_qa"); - assert_eq!(view.name.as_deref(), Some("View Test")); - assert_eq!(view.agent.as_deref(), Some("coder-1")); - assert_eq!(view.retry_count, Some(2)); - assert_eq!(view.blocked, Some(true)); - assert_eq!(view.depends_on, Some(vec![10, 20])); - } - - #[test] - fn rebuild_index_maps_story_ids() { - let kp = make_keypair(); - let mut crdt = BaseCrdt::::new(&kp); - - for (sid, stage) in &[("10_story_a", "1_backlog"), ("20_story_b", "2_current")] { - let item: JsonValue = json!({ - "story_id": sid, - "stage": stage, - "name": "", - "agent": "", - "retry_count": 0.0, - "blocked": false, - "depends_on": "", - "claimed_by": "", - "claimed_at": 0.0, - }) - .into(); - let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp); - crdt.apply(op); - } - - let index = rebuild_index(&crdt); - assert_eq!(index.len(), 2); - assert!(index.contains_key("10_story_a")); - assert!(index.contains_key("20_story_b")); - } - - #[tokio::test] - async fn init_and_write_read_roundtrip() { - let tmp = tempfile::tempdir().unwrap(); - let db_path = tmp.path().join("crdt_test.db"); - - // Init directly (not via the global singleton, for test isolation). - let options = SqliteConnectOptions::new() - .filename(&db_path) - .create_if_missing(true); - let pool = SqlitePool::connect_with(options).await.unwrap(); - sqlx::migrate!("./migrations").run(&pool).await.unwrap(); - - let keypair = make_keypair(); - let mut crdt = BaseCrdt::::new(&keypair); - - // Insert and update like write_item does. - let item_json: JsonValue = json!({ - "story_id": "50_story_roundtrip", - "stage": "1_backlog", - "name": "Roundtrip", - "agent": "", - "retry_count": 0.0, - "blocked": false, - "depends_on": "", - "claimed_by": "", - "claimed_at": 0.0, - }) - .into(); - - let insert_op = crdt.doc.items.insert(ROOT_ID, item_json).sign(&keypair); - crdt.apply(insert_op.clone()); - - // Persist the op. - let op_json = serde_json::to_string(&insert_op).unwrap(); - let op_id = hex::encode(&insert_op.id()); - let now = chrono::Utc::now().to_rfc3339(); - sqlx::query( - "INSERT INTO crdt_ops (op_id, seq, op_json, created_at) VALUES (?1, ?2, ?3, ?4)", - ) - .bind(&op_id) - .bind(insert_op.inner.seq as i64) - .bind(&op_json) - .bind(&now) - .execute(&pool) - .await - .unwrap(); - - // Reconstruct from DB. - let rows: Vec<(String,)> = - sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY rowid ASC") - .fetch_all(&pool) - .await - .unwrap(); - - let mut crdt2 = BaseCrdt::::new(&keypair); - for (json_str,) in &rows { - let op: SignedOp = serde_json::from_str(json_str).unwrap(); - crdt2.apply(op); - } - - let view = extract_item_view(&crdt2.doc.items[0]).unwrap(); - assert_eq!(view.story_id, "50_story_roundtrip"); - assert_eq!(view.stage, "1_backlog"); - assert_eq!(view.name.as_deref(), Some("Roundtrip")); - } - - #[test] - fn signed_op_serialization_roundtrip() { - let kp = make_keypair(); - let mut crdt = BaseCrdt::::new(&kp); - - let item: JsonValue = json!({ - "story_id": "60_story_serde", - "stage": "1_backlog", - "name": "Serde Test", - "agent": "", - "retry_count": 0.0, - "blocked": false, - "depends_on": "", - "claimed_by": "", - "claimed_at": 0.0, - }) - .into(); - - let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp); - let json_str = serde_json::to_string(&op).unwrap(); - let deserialized: SignedOp = serde_json::from_str(&json_str).unwrap(); - - assert_eq!(op.id(), deserialized.id()); - assert_eq!(op.inner.seq, deserialized.inner.seq); - } - - // ── CrdtEvent tests ───────────────────────────────────────────────── - - #[test] - fn crdt_event_has_expected_fields() { - let evt = CrdtEvent { - story_id: "42_story_foo".to_string(), - from_stage: Some("1_backlog".to_string()), - to_stage: "2_current".to_string(), - name: Some("Foo Feature".to_string()), - }; - assert_eq!(evt.story_id, "42_story_foo"); - assert_eq!(evt.from_stage.as_deref(), Some("1_backlog")); - assert_eq!(evt.to_stage, "2_current"); - assert_eq!(evt.name.as_deref(), Some("Foo Feature")); - } - - #[test] - fn crdt_event_clone_preserves_data() { - let evt = CrdtEvent { - story_id: "10_story_bar".to_string(), - from_stage: None, - to_stage: "1_backlog".to_string(), - name: None, - }; - let cloned = evt.clone(); - assert_eq!(cloned.story_id, "10_story_bar"); - assert!(cloned.from_stage.is_none()); - assert!(cloned.name.is_none()); - } - - #[test] - fn emit_event_is_noop_when_channel_not_initialised() { - // Before CRDT_EVENT_TX is set, emit_event should not panic. - // This test verifies the guard clause works. In test binaries the - // OnceLock may already be set by another test, so we just verify - // the function doesn't panic regardless. - emit_event(CrdtEvent { - story_id: "99_story_noop".to_string(), - from_stage: None, - to_stage: "1_backlog".to_string(), - name: None, - }); - } - - #[test] - fn crdt_event_broadcast_channel_round_trip() { - let (tx, mut rx) = broadcast::channel::(16); - let evt = CrdtEvent { - story_id: "70_story_broadcast".to_string(), - from_stage: Some("1_backlog".to_string()), - to_stage: "2_current".to_string(), - name: Some("Broadcast Test".to_string()), - }; - tx.send(evt).unwrap(); - - let received = rx.try_recv().unwrap(); - assert_eq!(received.story_id, "70_story_broadcast"); - assert_eq!(received.from_stage.as_deref(), Some("1_backlog")); - assert_eq!(received.to_stage, "2_current"); - assert_eq!(received.name.as_deref(), Some("Broadcast Test")); - } - - #[test] - fn dep_is_done_crdt_returns_false_when_no_crdt_state() { - // When the global CRDT state is not initialised (or in a test environment), - // dep_is_done_crdt should return false rather than panicking. - // Note: in the test binary the global may or may not be initialised, - // but the function should never panic either way. - let _ = dep_is_done_crdt(9999); - } - - #[test] - fn check_unmet_deps_crdt_returns_empty_when_item_not_found() { - // Non-existent story should return empty deps. - let result = check_unmet_deps_crdt("nonexistent_story"); - assert!(result.is_empty()); - } - - // ── Bug 503: archived-dep visibility ───────────────────────────────────── - - #[test] - fn dep_is_archived_crdt_returns_false_when_no_crdt_state() { - // When the global CRDT state is not initialised, must not panic. - let _ = dep_is_archived_crdt(9998); - } - - #[test] - fn check_archived_deps_crdt_returns_empty_when_item_not_found() { - // Non-existent story should return empty archived deps. - let result = check_archived_deps_crdt("nonexistent_story_archived"); - assert!(result.is_empty()); - } - - // ── 478: WebSocket CRDT sync layer tests ──────────────────────────────── - - #[test] - fn apply_remote_op_returns_false_when_not_initialised() { - // Without the global CRDT state, apply_remote_op should return false. - let kp = make_keypair(); - let mut crdt = BaseCrdt::::new(&kp); - let item: JsonValue = serde_json::json!({ - "story_id": "80_story_remote", - "stage": "1_backlog", - "name": "Remote", - "agent": "", - "retry_count": 0.0, - "blocked": false, - "depends_on": "", - "claimed_by": "", - "claimed_at": 0.0, - }) - .into(); - let op = crdt - .doc - .items - .insert(bft_json_crdt::op::ROOT_ID, item) - .sign(&kp); - // This uses the global state which may not be initialised in tests. - let _ = apply_remote_op(op); - } - - #[test] - fn signed_op_survives_sync_serialization_roundtrip() { - // Verify that a SignedOp serialised to JSON and back produces - // the same op (critical for the sync wire protocol). - let kp = make_keypair(); - let mut crdt = BaseCrdt::::new(&kp); - let item: JsonValue = serde_json::json!({ - "story_id": "90_story_wire", - "stage": "2_current", - "name": "Wire Test", - "agent": "coder", - "retry_count": 1.0, - "blocked": false, - "depends_on": "[10]", - "claimed_by": "", - "claimed_at": 0.0, - }) - .into(); - let op = crdt - .doc - .items - .insert(bft_json_crdt::op::ROOT_ID, item) - .sign(&kp); - - let json1 = serde_json::to_string(&op).unwrap(); - let roundtripped: SignedOp = serde_json::from_str(&json1).unwrap(); - let json2 = serde_json::to_string(&roundtripped).unwrap(); - - assert_eq!(json1, json2); - assert_eq!(op.id(), roundtripped.id()); - assert_eq!(op.inner.seq, roundtripped.inner.seq); - assert_eq!(op.author(), roundtripped.author()); - } - - #[test] - fn sync_broadcast_channel_round_trip() { - let (tx, mut rx) = broadcast::channel::(16); - let kp = make_keypair(); - let mut crdt = BaseCrdt::::new(&kp); - let item: JsonValue = serde_json::json!({ - "story_id": "95_story_sync_bcast", - "stage": "1_backlog", - "name": "", - "agent": "", - "retry_count": 0.0, - "blocked": false, - "depends_on": "", - "claimed_by": "", - "claimed_at": 0.0, - }) - .into(); - let op = crdt - .doc - .items - .insert(bft_json_crdt::op::ROOT_ID, item) - .sign(&kp); - tx.send(op.clone()).unwrap(); - - let received = rx.try_recv().unwrap(); - assert_eq!(received.id(), op.id()); - } - - // ── Bug 511: CRDT lamport clock resets on restart ──────────────────────── - // - // Root cause: Op::sign() always produces SignedOp with depends_on = vec![], - // so the causal dependency queue never engages during replay. Field update - // ops (seq=1,2,3 from each field's LwwRegisterCrdt counter) are replayed - // before list insert ops (seq=N from the items ListCrdt counter) when - // ordered by `seq ASC`. They fail ErrPathMismatch silently, their our_seq - // is never updated, and the next field write re-uses seq=1. - // - // Fix: replay by `rowid ASC` (SQLite insertion order) instead of `seq ASC`. - // Rowid preserves the causal order ops were originally applied in, so field - // updates always come after the item insert they reference. - #[tokio::test] - async fn bug_511_rowid_replay_preserves_field_update_after_list_insert() { - let tmp = tempfile::tempdir().unwrap(); - let db_path = tmp.path().join("bug511.db"); - - let options = SqliteConnectOptions::new() - .filename(&db_path) - .create_if_missing(true); - let pool = SqlitePool::connect_with(options).await.unwrap(); - sqlx::migrate!("./migrations").run(&pool).await.unwrap(); - - let kp = make_keypair(); - let mut crdt = BaseCrdt::::new(&kp); - - // Insert 5 dummy items to advance items.our_seq to 5. - for i in 0..5u32 { - let sid = format!("{}_story_warmup", i); - let item: JsonValue = json!({ - "story_id": sid, - "stage": "1_backlog", - "name": "", - "agent": "", - "retry_count": 0.0, - "blocked": false, - "depends_on": "", - "claimed_by": "", - "claimed_at": 0.0, - }) - .into(); - let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp); - crdt.apply(op.clone()); - // We don't persist these to the DB — they are pre-history. - } - - // Now insert the real item. items.our_seq was 5, so this op gets seq=6. - let target_item: JsonValue = json!({ - "story_id": "511_story_target", - "stage": "1_backlog", - "name": "Bug 511 target", - "agent": "", - "retry_count": 0.0, - "blocked": false, - "depends_on": "", - "claimed_by": "", - "claimed_at": 0.0, - }) - .into(); - let insert_op = crdt.doc.items.insert(ROOT_ID, target_item).sign(&kp); - crdt.apply(insert_op.clone()); - // insert_op.inner.seq == 6 - - // Now update the stage. The stage LwwRegisterCrdt for this item starts - // at our_seq=0, so this field op gets seq=1. Crucially: seq=1 < seq=6. - let idx = rebuild_index(&crdt)["511_story_target"]; - let stage_op = crdt.doc.items[idx] - .stage - .set("2_current".to_string()) - .sign(&kp); - crdt.apply(stage_op.clone()); - // stage_op.inner.seq == 1 - - // Persist BOTH ops in causal order (insert first, update second). - // This means insert_op gets rowid < stage_op rowid. - let now = chrono::Utc::now().to_rfc3339(); - for op in [&insert_op, &stage_op] { - let op_json = serde_json::to_string(op).unwrap(); - let op_id = hex::encode(&op.id()); - sqlx::query( - "INSERT INTO crdt_ops (op_id, seq, op_json, created_at) VALUES (?1, ?2, ?3, ?4)", - ) - .bind(&op_id) - .bind(op.inner.seq as i64) - .bind(&op_json) - .bind(&now) - .execute(&pool) - .await - .unwrap(); - } - - // Replay by rowid ASC (the fix). The insert must come before the field - // update regardless of their field-level seq values. - let rows: Vec<(String,)> = - sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY rowid ASC") - .fetch_all(&pool) - .await - .unwrap(); - - let mut crdt2 = BaseCrdt::::new(&kp); - for (json_str,) in &rows { - let op: SignedOp = serde_json::from_str(json_str).unwrap(); - crdt2.apply(op); - } - - // The item must be in the CRDT and must reflect the stage update. - let index2 = rebuild_index(&crdt2); - assert!( - index2.contains_key("511_story_target"), - "item not found after rowid-order replay" - ); - let idx2 = index2["511_story_target"]; - let view = extract_item_view(&crdt2.doc.items[idx2]).unwrap(); - assert_eq!( - view.stage, "2_current", - "stage field update lost during replay (bug 511 regression)" - ); - - // Confirm the bug is reproducible by replaying seq ASC instead. - // With seq ASC the stage_op (seq=1) arrives before insert_op (seq=6), - // fails ErrPathMismatch, and the item ends up at "1_backlog". - let rows_wrong_order: Vec<(String,)> = - sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY seq ASC") - .fetch_all(&pool) - .await - .unwrap(); - - let mut crdt3 = BaseCrdt::::new(&kp); - for (json_str,) in &rows_wrong_order { - let op: SignedOp = serde_json::from_str(json_str).unwrap(); - crdt3.apply(op); - } - - let index3 = rebuild_index(&crdt3); - // With seq ASC replay, the item is created (insert_op eventually runs) - // but the stage update is lost (it ran before the item existed). - if let Some(idx3) = index3.get("511_story_target") { - let view3 = extract_item_view(&crdt3.doc.items[*idx3]).unwrap(); - // The bug: stage is still "1_backlog" because the update was dropped. - assert_eq!( - view3.stage, "1_backlog", - "expected seq-ASC replay to exhibit the bug (update lost)" - ); - } - } - - // ── Story 518: persist_tx send failure logging ─────────────────────────── - - #[test] - fn persist_tx_send_failure_logs_error() { - let kp = make_keypair(); - let crdt = BaseCrdt::::new(&kp); - let (persist_tx, persist_rx) = mpsc::unbounded_channel::(); - - let mut state = CrdtState { - crdt, - keypair: kp, - index: HashMap::new(), - node_index: HashMap::new(), - persist_tx, - }; - - // Drop the receiver so that the next send fails immediately. - drop(persist_rx); - - let item_json: JsonValue = json!({ - "story_id": "518_story_persist_fail", - "stage": "1_backlog", - "name": "Persist Fail Test", - "agent": "", - "retry_count": 0.0, - "blocked": false, - "depends_on": "", - "claimed_by": "", - "claimed_at": 0.0, - }) - .into(); - - let before_errors = crate::log_buffer::global() - .get_recent_entries(1000, None, Some(&crate::log_buffer::LogLevel::Error)) - .len(); - - apply_and_persist(&mut state, |s| s.crdt.doc.items.insert(ROOT_ID, item_json)); - - let error_entries = crate::log_buffer::global().get_recent_entries( - 1000, - None, - Some(&crate::log_buffer::LogLevel::Error), - ); - - assert!( - error_entries.len() > before_errors, - "expected an ERROR log entry when persist_tx send fails, but none was added" - ); - - let last_error = &error_entries[error_entries.len() - 1]; - assert!( - last_error.message.contains("persist"), - "error message should mention persist: {}", - last_error.message - ); - assert!( - last_error.message.contains("ahead") || last_error.message.contains("diverged"), - "error message should note in-memory/persisted divergence: {}", - last_error.message - ); - } - - // ── Story 631: vector clock delta sync tests ──────────────────────── - - /// Helper: create N signed insert ops on a CRDT and return them with their JSON. - fn make_ops( - kp: &Ed25519KeyPair, - crdt: &mut BaseCrdt, - count: usize, - prefix: &str, - ) -> Vec<(SignedOp, String)> { - let mut ops = Vec::new(); - for i in 0..count { - let item: JsonValue = json!({ - "story_id": format!("{prefix}_{i}"), - "stage": "1_backlog", - "name": format!("Item {i}"), - "agent": "", - "retry_count": 0.0, - "blocked": false, - "depends_on": "", - "claimed_by": "", - "claimed_at": 0.0, - }) - .into(); - let op = crdt.doc.items.insert(ROOT_ID, item).sign(kp); - crdt.apply(op.clone()); - let json = serde_json::to_string(&op).unwrap(); - ops.push((op, json)); - } - ops - } - - /// Build a vector clock from a list of (SignedOp, json) pairs. - fn build_clock(ops: &[(SignedOp, String)]) -> VectorClock { - let mut clock = VectorClock::new(); - for (op, _) in ops { - let author = hex::encode(&op.author()); - *clock.entry(author).or_insert(0) += 1; - } - clock - } - - /// Compute ops_since against a local journal and peer clock. - /// - /// Mirrors the production `ops_since` logic but operates on a local Vec - /// instead of the global `ALL_OPS` static. - fn local_ops_since(all_ops: &[(SignedOp, String)], peer_clock: &VectorClock) -> Vec { - let mut author_counts: HashMap = HashMap::new(); - let mut result = Vec::new(); - for (op, json) in all_ops { - let author = hex::encode(&op.author()); - let count = author_counts.entry(author.clone()).or_insert(0); - *count += 1; - let peer_has = peer_clock.get(&author).copied().unwrap_or(0); - if *count > peer_has { - result.push(json.clone()); - } - } - result - } - - /// Integration test (low-bandwidth sync): two nodes, A applies 100 ops, - /// B reconnects with a current clock — B receives 0 ops on the bulk phase. - #[test] - fn delta_sync_low_bandwidth_fully_caught_up() { - let kp_a = make_keypair(); - let mut crdt_a = BaseCrdt::::new(&kp_a); - - let ops_a = make_ops(&kp_a, &mut crdt_a, 100, "631_low"); - - // B has already seen all 100 ops (its clock matches A's journal). - let clock_b = build_clock(&ops_a); - - // Delta should be empty. - let delta = local_ops_since(&ops_a, &clock_b); - assert_eq!( - delta.len(), - 0, - "caught-up peer should receive 0 ops, got {}", - delta.len() - ); - } - - /// Integration test (mid-stream): A applies 100 ops, B disconnects, - /// A applies 50 more ops, B reconnects — B receives exactly the 50 missed ops. - #[test] - fn delta_sync_mid_stream_partial_catch_up() { - let kp_a = make_keypair(); - let mut crdt_a = BaseCrdt::::new(&kp_a); - - // Phase 1: 100 ops that B has seen. - let ops_phase1 = make_ops(&kp_a, &mut crdt_a, 100, "631_mid1"); - let clock_b = build_clock(&ops_phase1); - - // Phase 2: 50 more ops that B missed. - let ops_phase2 = make_ops(&kp_a, &mut crdt_a, 50, "631_mid2"); - - // A's full journal is phase1 + phase2. - let mut all_ops_a: Vec<(SignedOp, String)> = ops_phase1; - all_ops_a.extend(ops_phase2); - - let delta = local_ops_since(&all_ops_a, &clock_b); - assert_eq!( - delta.len(), - 50, - "peer should receive exactly 50 missed ops, got {}", - delta.len() - ); - } - - /// Integration test (new node): C connects with empty clock, - /// receives all 150 ops — verifies fallback behaviour. - #[test] - fn delta_sync_new_node_receives_all_ops() { - let kp_a = make_keypair(); - let mut crdt_a = BaseCrdt::::new(&kp_a); - - let ops_phase1 = make_ops(&kp_a, &mut crdt_a, 100, "631_new1"); - let ops_phase2 = make_ops(&kp_a, &mut crdt_a, 50, "631_new2"); - - let mut all_ops_a: Vec<(SignedOp, String)> = ops_phase1; - all_ops_a.extend(ops_phase2); - - // Empty clock = new node. - let empty_clock = VectorClock::new(); - let delta = local_ops_since(&all_ops_a, &empty_clock); - assert_eq!( - delta.len(), - 150, - "new node should receive all 150 ops, got {}", - delta.len() - ); - } - - /// Multi-author delta sync: ops from two different nodes, peer has seen - /// all of one author but none of the other. - #[test] - fn delta_sync_multi_author() { - use fastcrypto::traits::KeyPair; - - let kp_a = make_keypair(); - let kp_b = make_keypair(); - let mut crdt_a = BaseCrdt::::new(&kp_a); - let mut crdt_b = BaseCrdt::::new(&kp_b); - - let ops_a = make_ops(&kp_a, &mut crdt_a, 30, "631_ma_a"); - let ops_b = make_ops(&kp_b, &mut crdt_b, 20, "631_ma_b"); - - // Combined journal on a hypothetical server. - let mut all_ops: Vec<(SignedOp, String)> = ops_a.clone(); - all_ops.extend(ops_b); - - // Peer has seen all of A's ops but none of B's. - let mut peer_clock = VectorClock::new(); - let author_a_hex = hex::encode(&kp_a.public().0.to_bytes()); - peer_clock.insert(author_a_hex, 30); - - let delta = local_ops_since(&all_ops, &peer_clock); - assert_eq!( - delta.len(), - 20, - "peer should receive 20 ops from author B, got {}", - delta.len() - ); - } - - /// Vector clock construction from ops. - #[test] - fn build_vector_clock_from_ops() { - use fastcrypto::traits::KeyPair; - - let kp = make_keypair(); - let mut crdt = BaseCrdt::::new(&kp); - let ops = make_ops(&kp, &mut crdt, 10, "631_vc"); - - let clock = build_clock(&ops); - let author_hex = hex::encode(&kp.public().0.to_bytes()); - - assert_eq!(clock.len(), 1, "single author should produce 1 clock entry"); - assert_eq!(clock[&author_hex], 10, "clock should show 10 ops"); - } - - /// Wire format: clock message serialization round-trip. - #[test] - fn clock_message_serialization_roundtrip() { - let mut clock = VectorClock::new(); - clock.insert("aabbcc".to_string(), 42); - clock.insert("ddeeff".to_string(), 7); - - let json = serde_json::to_value(&clock).unwrap(); - assert!(json.is_object()); - let deserialized: VectorClock = serde_json::from_value(json).unwrap(); - assert_eq!(deserialized["aabbcc"], 42); - assert_eq!(deserialized["ddeeff"], 7); - } -} diff --git a/server/src/crdt_state/mod.rs b/server/src/crdt_state/mod.rs new file mode 100644 index 00000000..31399a01 --- /dev/null +++ b/server/src/crdt_state/mod.rs @@ -0,0 +1,53 @@ +//! CRDT state layer — manages pipeline state as a conflict-free replicated document backed by SQLite. +//! +//! The CRDT document is the primary source of truth for pipeline item +//! metadata (stage, name, agent, etc.). CRDT ops are persisted to SQLite so +//! state survives restarts. The filesystem `.huskies/work/` directories are +//! still updated as a secondary output for backwards compatibility. +//! +//! Stage transitions detected by `write_item()` are broadcast as [`CrdtEvent`]s +//! so subscribers (auto-assign, WebSocket, notifications) can react without +//! polling the filesystem. + +use std::collections::HashMap; + +/// A vector clock mapping node IDs (hex-encoded Ed25519 pubkeys) to the count +/// of ops seen from that node. Used for delta sync — a connecting peer sends +/// its clock so the other side can compute which ops are missing. +pub type VectorClock = HashMap; + +mod ops; +mod presence; +mod read; +mod state; +mod types; +mod write; + +pub use ops::{all_ops_json, apply_remote_op, ops_since, our_vector_clock, subscribe_ops}; +pub use presence::{ + is_claimed_by_us, our_node_id, read_all_node_presence, release_claim, sign_challenge, + write_claim, write_node_presence, +}; +pub use read::{ + CrdtItemDump, CrdtStateDump, check_archived_deps_crdt, check_unmet_deps_crdt, + dep_is_archived_crdt, dep_is_done_crdt, dump_crdt_state, evict_item, read_all_items, + read_item, +}; +pub use state::init; +pub use types::{ + CrdtEvent, NodePresenceCrdt, NodePresenceView, PipelineDoc, PipelineItemCrdt, + PipelineItemView, subscribe, +}; +pub use write::write_item; + +#[cfg(test)] +pub use state::init_for_test; + +pub(crate) use state::{ALL_OPS, VECTOR_CLOCK}; + +/// Hex-encode a byte slice (no external dep needed). +pub(crate) mod hex { + pub fn encode(bytes: &[u8]) -> String { + bytes.iter().map(|b| format!("{b:02x}")).collect() + } +} diff --git a/server/src/crdt_state/ops.rs b/server/src/crdt_state/ops.rs new file mode 100644 index 00000000..a57c5ab2 --- /dev/null +++ b/server/src/crdt_state/ops.rs @@ -0,0 +1,460 @@ +//! Public sync-broadcast API and remote-op ingestion. + +use std::collections::HashMap; + +use bft_json_crdt::json_crdt::*; +use bft_json_crdt::op::ROOT_ID; +use super::hex; +use tokio::sync::broadcast; + +use super::VectorClock; +use super::state::{ + ALL_OPS, SYNC_TX, VECTOR_CLOCK, apply_and_persist, emit_event, get_crdt, rebuild_index, + rebuild_node_index, track_op, +}; +use super::types::{CrdtEvent, PipelineDoc}; +use crate::slog; + +/// Subscribe to locally-created CRDT ops for sync replication. +/// +/// Each `SignedOp` broadcast here was created by *this* node and should be +/// forwarded to connected peers. Returns `None` before `init()`. +pub fn subscribe_ops() -> Option> { + SYNC_TX.get().map(|tx| tx.subscribe()) +} + +/// Return all persisted `SignedOp`s in causal order (oldest first). +/// +/// Used during initial sync handshake so a newly-connected peer can +/// reconstruct the full CRDT state. Returns `None` before `init()`. +pub fn all_ops_json() -> Option> { + ALL_OPS.get().map(|m| m.lock().unwrap().clone()) +} + +/// Return this node's current vector clock. +/// +/// The clock maps each author's hex-encoded Ed25519 public key to the count +/// of ops received from that author. A connecting peer sends its clock so +/// the other side can compute which ops are missing via [`ops_since`]. +/// +/// Returns `None` before `init()`. +pub fn our_vector_clock() -> Option { + VECTOR_CLOCK.get().map(|m| m.lock().unwrap().clone()) +} + +/// Return only the ops that a peer with the given `peer_clock` is missing. +/// +/// Iterates the local op journal and, for each author, skips the first N ops +/// (where N = `peer_clock[author]`) and returns the rest. An empty peer +/// clock returns all ops (full sync for new nodes). +/// +/// Returns `None` before `init()`. +pub fn ops_since(peer_clock: &VectorClock) -> Option> { + let all = ALL_OPS.get()?.lock().ok()?; + let mut author_counts: HashMap = HashMap::new(); + let mut result = Vec::new(); + + for op_json in all.iter() { + if let Ok(signed_op) = serde_json::from_str::(op_json) { + let author_hex = super::hex::encode(&signed_op.author()); + let count = author_counts.entry(author_hex.clone()).or_insert(0); + *count += 1; + + let peer_has = peer_clock.get(&author_hex).copied().unwrap_or(0); + if *count > peer_has { + result.push(op_json.clone()); + } + } + } + + Some(result) +} + +// ── Remote op ingestion (from sync peers) ─────────────────────────── + +/// Apply a `SignedOp` received from a remote peer. +/// +/// The op is validated, applied to the local CRDT, persisted to SQLite, +/// and any resulting stage transitions are broadcast as [`CrdtEvent`]s. +/// Unlike `apply_and_persist`, this does **not** re-broadcast the op on +/// the sync channel (to avoid infinite echo loops). +/// +/// Returns `true` if the op was new and applied, `false` if it was a +/// duplicate or failed validation. +pub fn apply_remote_op(op: SignedOp) -> bool { + let Some(state_mutex) = get_crdt() else { + return false; + }; + let Ok(mut state) = state_mutex.lock() else { + return false; + }; + + // Snapshot stage state before applying so we can detect transitions. + let pre_stages: HashMap = state + .index + .iter() + .filter_map(|(sid, &idx)| match state.crdt.doc.items[idx].stage.view() { + JsonValue::String(s) => Some((sid.clone(), s)), + _ => None, + }) + .collect(); + + let result = state.crdt.apply(op.clone()); + + // Self-loop guard: op was already applied (came back via echo from peer). + // Return false immediately — do not re-persist or re-add to ALL_OPS. + if result == bft_json_crdt::json_crdt::OpState::AlreadySeen { + return false; + } + + if result != bft_json_crdt::json_crdt::OpState::Ok + && result != bft_json_crdt::json_crdt::OpState::MissingCausalDependencies + { + return false; + } + + // Persist the op. + if let Err(e) = state.persist_tx.send(op.clone()) { + crate::slog_error!( + "[crdt] Failed to send remote op to persist task: {e}; persist task may be dead. \ + In-memory state is now ahead of persisted state." + ); + } + + // Track in ALL_OPS + VECTOR_CLOCK. + if let Ok(json) = serde_json::to_string(&op) { + track_op(&op, json); + } + + // Rebuild indices (new items or nodes may have been inserted). + state.index = rebuild_index(&state.crdt); + state.node_index = rebuild_node_index(&state.crdt); + + // Detect and broadcast stage transitions. + for (sid, &idx) in &state.index { + let new_stage = match state.crdt.doc.items[idx].stage.view() { + JsonValue::String(s) => s, + _ => continue, + }; + let old_stage = pre_stages.get(sid).cloned(); + let changed = old_stage.as_deref() != Some(&new_stage); + if changed { + let name = match state.crdt.doc.items[idx].name.view() { + JsonValue::String(s) if !s.is_empty() => Some(s), + _ => None, + }; + emit_event(CrdtEvent { + story_id: sid.clone(), + from_stage: old_stage, + to_stage: new_stage, + name, + }); + } + } + + true +} + +#[cfg(test)] +mod tests { + use super::*; + use super::super::state::init_for_test; + use super::super::types::{NodePresenceCrdt, PipelineItemCrdt}; + use super::super::write::write_item; + use bft_json_crdt::json_crdt::OpState; + use bft_json_crdt::keypair::make_keypair; + use bft_json_crdt::op::ROOT_ID; + use fastcrypto::ed25519::Ed25519KeyPair; + use serde_json::json; + + #[test] + fn signed_op_serialization_roundtrip() { + let kp = make_keypair(); + let mut crdt = BaseCrdt::::new(&kp); + + let item: JsonValue = json!({ + "story_id": "60_story_serde", + "stage": "1_backlog", + "name": "Serde Test", + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, + }) + .into(); + + let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp); + let json_str = serde_json::to_string(&op).unwrap(); + let deserialized: SignedOp = serde_json::from_str(&json_str).unwrap(); + + assert_eq!(op.id(), deserialized.id()); + assert_eq!(op.inner.seq, deserialized.inner.seq); + } + + #[test] + fn apply_remote_op_returns_false_when_not_initialised() { + // Without the global CRDT state, apply_remote_op should return false. + let kp = make_keypair(); + let mut crdt = BaseCrdt::::new(&kp); + let item: JsonValue = serde_json::json!({ + "story_id": "80_story_remote", + "stage": "1_backlog", + "name": "Remote", + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, + }) + .into(); + let op = crdt + .doc + .items + .insert(bft_json_crdt::op::ROOT_ID, item) + .sign(&kp); + // This uses the global state which may not be initialised in tests. + let _ = apply_remote_op(op); + } + + #[test] + fn signed_op_survives_sync_serialization_roundtrip() { + // Verify that a SignedOp serialised to JSON and back produces + // the same op (critical for the sync wire protocol). + let kp = make_keypair(); + let mut crdt = BaseCrdt::::new(&kp); + let item: JsonValue = serde_json::json!({ + "story_id": "90_story_wire", + "stage": "2_current", + "name": "Wire Test", + "agent": "coder", + "retry_count": 1.0, + "blocked": false, + "depends_on": "[10]", + "claimed_by": "", + "claimed_at": 0.0, + }) + .into(); + let op = crdt + .doc + .items + .insert(bft_json_crdt::op::ROOT_ID, item) + .sign(&kp); + + let json1 = serde_json::to_string(&op).unwrap(); + let roundtripped: SignedOp = serde_json::from_str(&json1).unwrap(); + let json2 = serde_json::to_string(&roundtripped).unwrap(); + + assert_eq!(json1, json2); + assert_eq!(op.id(), roundtripped.id()); + assert_eq!(op.inner.seq, roundtripped.inner.seq); + assert_eq!(op.author(), roundtripped.author()); + } + + #[test] + fn sync_broadcast_channel_round_trip() { + let (tx, mut rx) = broadcast::channel::(16); + let kp = make_keypair(); + let mut crdt = BaseCrdt::::new(&kp); + let item: JsonValue = serde_json::json!({ + "story_id": "95_story_sync_bcast", + "stage": "1_backlog", + "name": "", + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, + }) + .into(); + let op = crdt + .doc + .items + .insert(bft_json_crdt::op::ROOT_ID, item) + .sign(&kp); + tx.send(op.clone()).unwrap(); + + let received = rx.try_recv().unwrap(); + assert_eq!(received.id(), op.id()); + } + + fn make_ops( + kp: &Ed25519KeyPair, + crdt: &mut BaseCrdt, + count: usize, + prefix: &str, + ) -> Vec<(SignedOp, String)> { + let mut ops = Vec::new(); + for i in 0..count { + let item: JsonValue = json!({ + "story_id": format!("{prefix}_{i}"), + "stage": "1_backlog", + "name": format!("Item {i}"), + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, + }) + .into(); + let op = crdt.doc.items.insert(ROOT_ID, item).sign(kp); + crdt.apply(op.clone()); + let json = serde_json::to_string(&op).unwrap(); + ops.push((op, json)); + } + ops + } + + fn build_clock(ops: &[(SignedOp, String)]) -> VectorClock { + let mut clock = VectorClock::new(); + for (op, _) in ops { + let author = hex::encode(&op.author()); + *clock.entry(author).or_insert(0) += 1; + } + clock + } + + fn local_ops_since(all_ops: &[(SignedOp, String)], peer_clock: &VectorClock) -> Vec { + let mut author_counts: HashMap = HashMap::new(); + let mut result = Vec::new(); + for (op, json) in all_ops { + let author = hex::encode(&op.author()); + let count = author_counts.entry(author.clone()).or_insert(0); + *count += 1; + let peer_has = peer_clock.get(&author).copied().unwrap_or(0); + if *count > peer_has { + result.push(json.clone()); + } + } + result + } + + #[test] + fn delta_sync_low_bandwidth_fully_caught_up() { + let kp_a = make_keypair(); + let mut crdt_a = BaseCrdt::::new(&kp_a); + + let ops_a = make_ops(&kp_a, &mut crdt_a, 100, "631_low"); + + // B has already seen all 100 ops (its clock matches A's journal). + let clock_b = build_clock(&ops_a); + + // Delta should be empty. + let delta = local_ops_since(&ops_a, &clock_b); + assert_eq!( + delta.len(), + 0, + "caught-up peer should receive 0 ops, got {}", + delta.len() + ); + } + + #[test] + fn delta_sync_mid_stream_partial_catch_up() { + let kp_a = make_keypair(); + let mut crdt_a = BaseCrdt::::new(&kp_a); + + // Phase 1: 100 ops that B has seen. + let ops_phase1 = make_ops(&kp_a, &mut crdt_a, 100, "631_mid1"); + let clock_b = build_clock(&ops_phase1); + + // Phase 2: 50 more ops that B missed. + let ops_phase2 = make_ops(&kp_a, &mut crdt_a, 50, "631_mid2"); + + // A's full journal is phase1 + phase2. + let mut all_ops_a: Vec<(SignedOp, String)> = ops_phase1; + all_ops_a.extend(ops_phase2); + + let delta = local_ops_since(&all_ops_a, &clock_b); + assert_eq!( + delta.len(), + 50, + "peer should receive exactly 50 missed ops, got {}", + delta.len() + ); + } + + #[test] + fn delta_sync_new_node_receives_all_ops() { + let kp_a = make_keypair(); + let mut crdt_a = BaseCrdt::::new(&kp_a); + + let ops_phase1 = make_ops(&kp_a, &mut crdt_a, 100, "631_new1"); + let ops_phase2 = make_ops(&kp_a, &mut crdt_a, 50, "631_new2"); + + let mut all_ops_a: Vec<(SignedOp, String)> = ops_phase1; + all_ops_a.extend(ops_phase2); + + // Empty clock = new node. + let empty_clock = VectorClock::new(); + let delta = local_ops_since(&all_ops_a, &empty_clock); + assert_eq!( + delta.len(), + 150, + "new node should receive all 150 ops, got {}", + delta.len() + ); + } + + #[test] + fn delta_sync_multi_author() { + use fastcrypto::traits::KeyPair; + + let kp_a = make_keypair(); + let kp_b = make_keypair(); + let mut crdt_a = BaseCrdt::::new(&kp_a); + let mut crdt_b = BaseCrdt::::new(&kp_b); + + let ops_a = make_ops(&kp_a, &mut crdt_a, 30, "631_ma_a"); + let ops_b = make_ops(&kp_b, &mut crdt_b, 20, "631_ma_b"); + + // Combined journal on a hypothetical server. + let mut all_ops: Vec<(SignedOp, String)> = ops_a.clone(); + all_ops.extend(ops_b); + + // Peer has seen all of A's ops but none of B's. + let mut peer_clock = VectorClock::new(); + let author_a_hex = hex::encode(&kp_a.public().0.to_bytes()); + peer_clock.insert(author_a_hex, 30); + + let delta = local_ops_since(&all_ops, &peer_clock); + assert_eq!( + delta.len(), + 20, + "peer should receive 20 ops from author B, got {}", + delta.len() + ); + } + + #[test] + fn build_vector_clock_from_ops() { + use fastcrypto::traits::KeyPair; + + let kp = make_keypair(); + let mut crdt = BaseCrdt::::new(&kp); + let ops = make_ops(&kp, &mut crdt, 10, "631_vc"); + + let clock = build_clock(&ops); + let author_hex = hex::encode(&kp.public().0.to_bytes()); + + assert_eq!(clock.len(), 1, "single author should produce 1 clock entry"); + assert_eq!(clock[&author_hex], 10, "clock should show 10 ops"); + } + + #[test] + fn clock_message_serialization_roundtrip() { + let mut clock = VectorClock::new(); + clock.insert("aabbcc".to_string(), 42); + clock.insert("ddeeff".to_string(), 7); + + let json = serde_json::to_value(&clock).unwrap(); + assert!(json.is_object()); + let deserialized: VectorClock = serde_json::from_value(json).unwrap(); + assert_eq!(deserialized["aabbcc"], 42); + assert_eq!(deserialized["ddeeff"], 7); + } +} diff --git a/server/src/crdt_state/presence.rs b/server/src/crdt_state/presence.rs new file mode 100644 index 00000000..ec29de03 --- /dev/null +++ b/server/src/crdt_state/presence.rs @@ -0,0 +1,179 @@ +//! Node identity, work claiming, and node presence (heartbeat) API. + +use bft_json_crdt::json_crdt::*; +use bft_json_crdt::op::ROOT_ID; +use super::hex; +use super::read::read_item; +use bft_json_crdt::lww_crdt::LwwRegisterCrdt; +use fastcrypto::traits::{Signer, ToFromBytes}; +use serde_json::json; + +use super::state::{apply_and_persist, get_crdt, rebuild_node_index}; +use super::types::{NodePresenceCrdt, NodePresenceView, PipelineDoc}; +use crate::slog; + +// ── Node presence API ──────────────────────────────────────────────── + +/// Return the hex-encoded Ed25519 public key for this node. +/// +/// Used as the stable identity written into the CRDT nodes list. +/// Returns `None` before `init()`. +pub fn our_node_id() -> Option { + let state = get_crdt()?.lock().ok()?; + Some(hex::encode(&state.crdt.id)) +} + +/// Sign a challenge nonce with this node's keypair for WebSocket mutual auth. +/// +/// Returns `(pubkey_hex, signature_hex)` or `None` before `init()`. +pub fn sign_challenge(challenge: &str) -> Option<(String, String)> { + let state = get_crdt()?.lock().ok()?; + let pubkey_hex = crate::node_identity::public_key_hex(&state.keypair); + let sig_hex = crate::node_identity::sign_challenge(&state.keypair, challenge); + Some((pubkey_hex, sig_hex)) +} + +/// Write a claim on a pipeline item via CRDT. +/// +/// Sets `claimed_by` to this node's ID and `claimed_at` to the current time. +/// The LWW register ensures deterministic conflict resolution — if two nodes +/// claim the same item simultaneously, both will converge to the same winner +/// after CRDT sync. +/// +/// Returns `true` if the claim was written, `false` if the item doesn't exist +/// or CRDT is not initialised. +pub fn write_claim(story_id: &str) -> bool { + let Some(node_id) = our_node_id() else { + return false; + }; + let now = chrono::Utc::now().timestamp() as f64; + + let Some(state_mutex) = get_crdt() else { + return false; + }; + let Ok(mut state) = state_mutex.lock() else { + return false; + }; + + let Some(&idx) = state.index.get(story_id) else { + return false; + }; + + apply_and_persist(&mut state, |s| { + s.crdt.doc.items[idx].claimed_by.set(node_id.clone()) + }); + apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].claimed_at.set(now)); + + true +} + +/// Release a claim on a pipeline item (clear claimed_by and claimed_at). +pub fn release_claim(story_id: &str) { + let Some(state_mutex) = get_crdt() else { + return; + }; + let Ok(mut state) = state_mutex.lock() else { + return; + }; + let Some(&idx) = state.index.get(story_id) else { + return; + }; + + apply_and_persist(&mut state, |s| { + s.crdt.doc.items[idx].claimed_by.set(String::new()) + }); + apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].claimed_at.set(0.0)); +} + +/// Check if this node currently holds the claim on a pipeline item. +pub fn is_claimed_by_us(story_id: &str) -> bool { + let Some(node_id) = our_node_id() else { + return false; + }; + let Some(item) = read_item(story_id) else { + return false; + }; + item.claimed_by.as_deref() == Some(&node_id) +} + +/// Write or update a node presence entry in the CRDT. +/// +/// If a node with the given `node_id` already exists, only `last_seen`, +/// `alive`, and `address` are updated. If not, a new entry is inserted. +/// +/// This is the write path for both local heartbeats and tombstoning. +pub fn write_node_presence(node_id: &str, address: &str, last_seen: f64, alive: bool) { + let Some(state_mutex) = get_crdt() else { + return; + }; + let Ok(mut state) = state_mutex.lock() else { + return; + }; + + if let Some(&idx) = state.node_index.get(node_id) { + // Update existing entry — three separate ops so peers can merge independently. + apply_and_persist(&mut state, |s| { + s.crdt.doc.nodes[idx].last_seen.set(last_seen) + }); + apply_and_persist(&mut state, |s| s.crdt.doc.nodes[idx].alive.set(alive)); + apply_and_persist(&mut state, |s| { + s.crdt.doc.nodes[idx].address.set(address.to_string()) + }); + } else { + // Insert new node entry. + let node_json: JsonValue = json!({ + "node_id": node_id, + "address": address, + "last_seen": last_seen, + "alive": alive, + }) + .into(); + + apply_and_persist(&mut state, |s| s.crdt.doc.nodes.insert(ROOT_ID, node_json)); + + // Rebuild node index after insertion. + state.node_index = rebuild_node_index(&state.crdt); + } +} + +/// Read all node presence entries from the CRDT document. +/// +/// Returns `None` before `init()`. +pub fn read_all_node_presence() -> Option> { + let state_mutex = get_crdt()?; + let state = state_mutex.lock().ok()?; + + let mut nodes = Vec::new(); + for node_crdt in state.crdt.doc.nodes.iter() { + if let Some(view) = extract_node_view(node_crdt) { + nodes.push(view); + } + } + Some(nodes) +} + +/// Extract a `NodePresenceView` from a `NodePresenceCrdt`. +fn extract_node_view(node: &NodePresenceCrdt) -> Option { + let node_id = match node.node_id.view() { + JsonValue::String(s) if !s.is_empty() => s, + _ => return None, + }; + let address = match node.address.view() { + JsonValue::String(s) if !s.is_empty() => s, + _ => return None, + }; + let last_seen = match node.last_seen.view() { + JsonValue::Number(n) => n, + _ => 0.0, + }; + let alive = match node.alive.view() { + JsonValue::Bool(b) => b, + _ => true, + }; + Some(NodePresenceView { + node_id, + address, + last_seen, + alive, + }) +} diff --git a/server/src/crdt_state/read.rs b/server/src/crdt_state/read.rs new file mode 100644 index 00000000..db7d68d2 --- /dev/null +++ b/server/src/crdt_state/read.rs @@ -0,0 +1,472 @@ +//! Read API for pipeline items, dump introspection, and dependency helpers. + +use std::collections::HashMap; + +use bft_json_crdt::json_crdt::*; + +use super::state::{ALL_OPS, apply_and_persist, get_crdt, rebuild_index}; +use bft_json_crdt::op::ROOT_ID; +use super::types::{PipelineDoc, PipelineItemCrdt, PipelineItemView}; + +// ── Debug dump ─────────────────────────────────────────────────────── + +/// A raw dump of a single CRDT list entry, including deleted items. +/// +/// Use `content_index` (hex of the list insert `OpId`) to cross-reference +/// with rows in the `crdt_ops` SQLite table. +pub struct CrdtItemDump { + pub story_id: Option, + pub stage: Option, + pub name: Option, + pub agent: Option, + pub retry_count: Option, + pub blocked: Option, + pub depends_on: Option>, + pub claimed_by: Option, + pub claimed_at: Option, + /// Hex-encoded OpId of the list insert op — cross-reference with `crdt_ops`. + pub content_index: String, + pub is_deleted: bool, +} + +/// Top-level debug dump of the in-memory CRDT state. +pub struct CrdtStateDump { + pub in_memory_state_loaded: bool, + /// Count of non-deleted items with a valid story_id and stage. + pub total_items: usize, + /// Total list-level ops seen (excludes root sentinel). + pub total_ops_in_list: usize, + /// Highest Lamport sequence number seen across all list-level ops. + pub max_seq_in_list: u64, + /// Count of ops in the ALL_OPS journal (persisted ops replayed at startup). + pub persisted_ops_count: usize, + pub items: Vec, +} + +/// Dump the raw in-memory CRDT state for debugging. +/// +/// Unlike [`read_all_items`] this includes tombstoned (deleted) entries and +/// exposes internal op metadata (content_index, seq). Pass a `story_id` +/// filter to restrict the output to a single item. +/// +/// **This is a debug tool.** For normal pipeline introspection use +/// [`read_all_items`] or the `get_pipeline_status` MCP tool instead. +pub fn dump_crdt_state(story_id_filter: Option<&str>) -> CrdtStateDump { + let in_memory_state_loaded = get_crdt().is_some(); + + let persisted_ops_count = ALL_OPS + .get() + .and_then(|m| m.lock().ok().map(|v| v.len())) + .unwrap_or(0); + + let Some(state_mutex) = get_crdt() else { + return CrdtStateDump { + in_memory_state_loaded, + total_items: 0, + total_ops_in_list: 0, + max_seq_in_list: 0, + persisted_ops_count, + items: Vec::new(), + }; + }; + + let Ok(state) = state_mutex.lock() else { + return CrdtStateDump { + in_memory_state_loaded, + total_items: 0, + total_ops_in_list: 0, + max_seq_in_list: 0, + persisted_ops_count, + items: Vec::new(), + }; + }; + + let total_items = state.crdt.doc.items.iter().count(); + + let max_seq_in_list = state + .crdt + .doc + .items + .ops + .iter() + .map(|op| op.seq) + .max() + .unwrap_or(0); + + // Subtract 1 for the root sentinel. + let total_ops_in_list = state.crdt.doc.items.ops.len().saturating_sub(1); + + let mut items = Vec::new(); + for op in &state.crdt.doc.items.ops { + // Skip root sentinel (id == [0u8; 32]). + if op.id == ROOT_ID { + continue; + } + let Some(ref item_crdt) = op.content else { + // No content — skip (orphaned slot, should not happen in normal use). + continue; + }; + + let story_id = match item_crdt.story_id.view() { + JsonValue::String(s) if !s.is_empty() => Some(s), + _ => None, + }; + + // Apply story_id filter before doing any further work. + if let Some(filter) = story_id_filter + && story_id.as_deref() != Some(filter) + { + continue; + } + + let stage = match item_crdt.stage.view() { + JsonValue::String(s) if !s.is_empty() => Some(s), + _ => None, + }; + let name = match item_crdt.name.view() { + JsonValue::String(s) if !s.is_empty() => Some(s), + _ => None, + }; + let agent = match item_crdt.agent.view() { + JsonValue::String(s) if !s.is_empty() => Some(s), + _ => None, + }; + let retry_count = match item_crdt.retry_count.view() { + JsonValue::Number(n) if n > 0.0 => Some(n as i64), + _ => None, + }; + let blocked = match item_crdt.blocked.view() { + JsonValue::Bool(b) => Some(b), + _ => None, + }; + let depends_on = match item_crdt.depends_on.view() { + JsonValue::String(s) if !s.is_empty() => serde_json::from_str::>(&s).ok(), + _ => None, + }; + + let claimed_by = match item_crdt.claimed_by.view() { + JsonValue::String(s) if !s.is_empty() => Some(s), + _ => None, + }; + let claimed_at = match item_crdt.claimed_at.view() { + JsonValue::Number(n) if n > 0.0 => Some(n), + _ => None, + }; + + let content_index = op.id.iter().map(|b| format!("{b:02x}")).collect::(); + + items.push(CrdtItemDump { + story_id, + stage, + name, + agent, + retry_count, + blocked, + depends_on, + claimed_by, + claimed_at, + content_index, + is_deleted: op.is_deleted, + }); + } + + CrdtStateDump { + in_memory_state_loaded, + total_items, + total_ops_in_list, + max_seq_in_list, + persisted_ops_count, + items, + } +} + +// ── Read path ──────────────────────────────────────────────────────── + +/// Read the full pipeline state from the CRDT document. +/// +/// Returns items grouped by stage, or `None` if the CRDT layer is not +/// initialised. +pub fn read_all_items() -> Option> { + let state_mutex = get_crdt()?; + let state = state_mutex.lock().ok()?; + + // Only return items that appear in the deduplicated index. + // The index maps story_id → visible_index and represents the + // latest-wins view of each story. Iterating raw CRDT entries + // would return stale duplicates from earlier stage writes. + let mut items = Vec::with_capacity(state.index.len()); + for &idx in state.index.values() { + if let Some(view) = extract_item_view(&state.crdt.doc.items[idx]) { + items.push(view); + } + } + Some(items) +} + +/// Read a single pipeline item by story_id. +pub fn read_item(story_id: &str) -> Option { + let state_mutex = get_crdt()?; + let state = state_mutex.lock().ok()?; + let &idx = state.index.get(story_id)?; + extract_item_view(&state.crdt.doc.items[idx]) +} + +/// Mark a story as deleted in the in-memory CRDT and persist a tombstone op. +/// +/// This is the eviction primitive for story 521 — it lets external callers +/// (e.g. the `purge_story` MCP tool, or operator scripts during incident +/// response) clear an item from the running server's in-memory state +/// without needing a full process restart. +/// +/// Specifically: +/// 1. Looks up the item's CRDT `OpId` via the visible-index map. +/// 2. Constructs a delete op via the bft-json-crdt list `delete()` primitive. +/// 3. Signs it with the local node's keypair and applies it to the in-memory +/// CRDT (marking the item `is_deleted = true` so subsequent +/// `read_all_items` / `read_item` calls skip it). +/// 4. Persists the signed delete op to `crdt_ops` via the existing +/// `apply_and_persist` channel — so the eviction survives a restart. +/// 5. Rebuilds the `story_id → visible_index` map (visible indices shift +/// when an item is marked deleted). +/// 6. Drops the in-memory content-store entry for the story so the cached +/// body doesn't outlive the CRDT entry. +/// +/// Returns `Ok(())` if the item was found and a tombstone op was queued, +/// or an `Err` if the CRDT layer isn't initialised or the story_id is +/// unknown to the in-memory state. +pub fn evict_item(story_id: &str) -> Result<(), String> { + let state_mutex = get_crdt().ok_or_else(|| "CRDT layer not initialised".to_string())?; + let mut state = state_mutex + .lock() + .map_err(|e| format!("CRDT lock poisoned: {e}"))?; + + let idx = state + .index + .get(story_id) + .copied() + .ok_or_else(|| format!("Story '{story_id}' not found in in-memory CRDT"))?; + + // Resolve the item's OpId before the closure (the closure will mutably + // borrow `state`, so we can't access `state.crdt.doc.items` from inside). + let item_id = + state.crdt.doc.items.id_at(idx).ok_or_else(|| { + format!("Item index {idx} for '{story_id}' did not resolve to an OpId") + })?; + + // Write the delete op via the existing apply_and_persist machinery. + // This signs the op, applies it to the in-memory CRDT (marking the item + // is_deleted), and sends it to the persistence task. + apply_and_persist(&mut state, |s| s.crdt.doc.items.delete(item_id)); + + // Rebuild the story_id → visible_index map; the deleted item is no + // longer counted by the iter that rebuild_index uses. + state.index = rebuild_index(&state.crdt); + + // Drop the content-store entry so the cached body doesn't outlive the + // CRDT entry. (Bug 521 follow-up: when CONTENT_STORE becomes a true + // lazy cache, this explicit eviction can go away.) + crate::db::delete_content(story_id); + + Ok(()) +} + +/// Extract a `PipelineItemView` from a `PipelineItemCrdt`. +pub(super) fn extract_item_view(item: &PipelineItemCrdt) -> Option { + let story_id = match item.story_id.view() { + JsonValue::String(s) if !s.is_empty() => s, + _ => return None, + }; + let stage = match item.stage.view() { + JsonValue::String(s) if !s.is_empty() => s, + _ => return None, + }; + let name = match item.name.view() { + JsonValue::String(s) if !s.is_empty() => Some(s), + _ => None, + }; + let agent = match item.agent.view() { + JsonValue::String(s) if !s.is_empty() => Some(s), + _ => None, + }; + let retry_count = match item.retry_count.view() { + JsonValue::Number(n) if n > 0.0 => Some(n as i64), + _ => None, + }; + let blocked = match item.blocked.view() { + JsonValue::Bool(b) => Some(b), + _ => None, + }; + let depends_on = match item.depends_on.view() { + JsonValue::String(s) if !s.is_empty() => serde_json::from_str::>(&s).ok(), + _ => None, + }; + + let claimed_by = match item.claimed_by.view() { + JsonValue::String(s) if !s.is_empty() => Some(s), + _ => None, + }; + let claimed_at = match item.claimed_at.view() { + JsonValue::Number(n) if n > 0.0 => Some(n), + _ => None, + }; + let merged_at = match item.merged_at.view() { + JsonValue::Number(n) if n > 0.0 => Some(n), + _ => None, + }; + + Some(PipelineItemView { + story_id, + stage, + name, + agent, + retry_count, + blocked, + depends_on, + claimed_by, + claimed_at, + merged_at, + }) +} + +/// Check whether a dependency (by numeric ID prefix) is in `5_done` or `6_archived` +/// according to CRDT state. +/// +/// Returns `true` if the dependency is satisfied (item found in a done stage). +/// See `dep_is_archived_crdt` to distinguish archive-satisfied from cleanly-done. +pub fn dep_is_done_crdt(dep_number: u32) -> bool { + let prefix = format!("{dep_number}_"); + if let Some(items) = read_all_items() { + items.iter().any(|item| { + item.story_id.starts_with(&prefix) + && matches!(item.stage.as_str(), "5_done" | "6_archived") + }) + } else { + false + } +} + +/// Check whether a dependency (by numeric ID prefix) is specifically in `6_archived` +/// according to CRDT state. +/// +/// Used to detect when a dependency is satisfied via archive rather than via a clean +/// completion through `5_done`. Returns `false` when the CRDT layer is not initialised. +pub fn dep_is_archived_crdt(dep_number: u32) -> bool { + let prefix = format!("{dep_number}_"); + if let Some(items) = read_all_items() { + items + .iter() + .any(|item| item.story_id.starts_with(&prefix) && item.stage == "6_archived") + } else { + false + } +} + +/// Check unmet dependencies for a story by reading its `depends_on` from the +/// CRDT document and checking each dependency against CRDT state. +/// +/// Returns the list of dependency numbers that are NOT in `5_done` or `6_archived`. +pub fn check_unmet_deps_crdt(story_id: &str) -> Vec { + let item = match read_item(story_id) { + Some(i) => i, + None => return Vec::new(), + }; + let deps = match item.depends_on { + Some(d) => d, + None => return Vec::new(), + }; + deps.into_iter() + .filter(|&dep| !dep_is_done_crdt(dep)) + .collect() +} + +/// Return the list of dependency numbers from `story_id`'s `depends_on` that are +/// specifically in `6_archived` according to CRDT state. +/// +/// Used to emit a warning when promotion fires because a dep is archived rather than +/// cleanly completed. Returns an empty `Vec` when no deps are archived. +pub fn check_archived_deps_crdt(story_id: &str) -> Vec { + let item = match read_item(story_id) { + Some(i) => i, + None => return Vec::new(), + }; + let deps = match item.depends_on { + Some(d) => d, + None => return Vec::new(), + }; + deps.into_iter() + .filter(|&dep| dep_is_archived_crdt(dep)) + .collect() +} + +#[cfg(test)] +mod tests { + use super::*; + use super::super::state::init_for_test; + use super::super::types::PipelineItemCrdt; + use super::super::write::write_item; + use bft_json_crdt::op::ROOT_ID; + use super::super::state::rebuild_index; + use bft_json_crdt::json_crdt::OpState; + use bft_json_crdt::keypair::make_keypair; + use serde_json::json; + + #[test] + fn extract_item_view_parses_crdt_item() { + let kp = make_keypair(); + let mut crdt = BaseCrdt::::new(&kp); + + let item_json: JsonValue = json!({ + "story_id": "40_story_view", + "stage": "3_qa", + "name": "View Test", + "agent": "coder-1", + "retry_count": 2.0, + "blocked": true, + "depends_on": "[10,20]", + "claimed_by": "", + "claimed_at": 0.0, + }) + .into(); + + let op = crdt.doc.items.insert(ROOT_ID, item_json).sign(&kp); + crdt.apply(op); + + let view = extract_item_view(&crdt.doc.items[0]).unwrap(); + assert_eq!(view.story_id, "40_story_view"); + assert_eq!(view.stage, "3_qa"); + assert_eq!(view.name.as_deref(), Some("View Test")); + assert_eq!(view.agent.as_deref(), Some("coder-1")); + assert_eq!(view.retry_count, Some(2)); + assert_eq!(view.blocked, Some(true)); + assert_eq!(view.depends_on, Some(vec![10, 20])); + } + + #[test] + fn dep_is_done_crdt_returns_false_when_no_crdt_state() { + // When the global CRDT state is not initialised (or in a test environment), + // dep_is_done_crdt should return false rather than panicking. + // Note: in the test binary the global may or may not be initialised, + // but the function should never panic either way. + let _ = dep_is_done_crdt(9999); + } + + #[test] + fn check_unmet_deps_crdt_returns_empty_when_item_not_found() { + // Non-existent story should return empty deps. + let result = check_unmet_deps_crdt("nonexistent_story"); + assert!(result.is_empty()); + } + + #[test] + fn dep_is_archived_crdt_returns_false_when_no_crdt_state() { + // When the global CRDT state is not initialised, must not panic. + let _ = dep_is_archived_crdt(9998); + } + + #[test] + fn check_archived_deps_crdt_returns_empty_when_item_not_found() { + // Non-existent story should return empty archived deps. + let result = check_archived_deps_crdt("nonexistent_story_archived"); + assert!(result.is_empty()); + } +} diff --git a/server/src/crdt_state/state.rs b/server/src/crdt_state/state.rs new file mode 100644 index 00000000..4618ebe6 --- /dev/null +++ b/server/src/crdt_state/state.rs @@ -0,0 +1,535 @@ +//! Internal CRDT state struct, statics, initialisation, and central write primitive. + +use std::collections::HashMap; +use std::path::Path; +use std::sync::{Mutex, OnceLock}; + +use bft_json_crdt::json_crdt::*; +use bft_json_crdt::keypair::make_keypair; +use fastcrypto::ed25519::Ed25519KeyPair; +use fastcrypto::traits::ToFromBytes; +use serde_json::json; +use sqlx::SqlitePool; +use sqlx::sqlite::SqliteConnectOptions; +use tokio::sync::{broadcast, mpsc}; + +use super::VectorClock; +use super::hex; +use super::types::{CrdtEvent, PipelineDoc}; +use crate::slog; + +// ── Sync broadcast channels ────────────────────────────────────────── + +pub(super) static CRDT_EVENT_TX: OnceLock> = OnceLock::new(); + +pub(super) static SYNC_TX: OnceLock> = OnceLock::new(); + +/// All persisted ops as JSON strings, in causal (insertion) order. +/// +/// Pub(crate) so that `crdt_snapshot` can access it for compaction. +pub(crate) static ALL_OPS: OnceLock>> = OnceLock::new(); + +/// Live vector clock tracking op counts per author. +/// +/// Updated in lockstep with `ALL_OPS` — every time an op is appended to the +/// journal, the corresponding author's count is incremented here. This avoids +/// re-parsing all ops when a peer requests `our_vector_clock()`. +pub(crate) static VECTOR_CLOCK: OnceLock> = OnceLock::new(); + +/// Append an op's JSON to `ALL_OPS` and bump the author's count in `VECTOR_CLOCK`. +/// +/// Centralises the bookkeeping that must stay in sync between the two statics. +pub(super) fn track_op(signed: &SignedOp, json: String) { + if let Some(all) = ALL_OPS.get() + && let Ok(mut v) = all.lock() + { + v.push(json); + } + if let Some(vc) = VECTOR_CLOCK.get() + && let Ok(mut clock) = vc.lock() + { + let author_hex = super::hex::encode(&signed.author()); + *clock.entry(author_hex).or_insert(0) += 1; + } +} + +pub(super) struct CrdtState { + pub(super) crdt: BaseCrdt, + pub(super) keypair: Ed25519KeyPair, + /// Maps story_id → index in the items ListCrdt for O(1) lookup. + pub(super) index: HashMap, + /// Maps node_id (hex) → index in the nodes ListCrdt for O(1) lookup. + pub(super) node_index: HashMap, + /// Channel sender for fire-and-forget op persistence. + pub(super) persist_tx: mpsc::UnboundedSender, +} + +static CRDT_STATE: OnceLock> = OnceLock::new(); + +#[cfg(test)] +thread_local! { + static CRDT_STATE_TL: OnceLock> = const { OnceLock::new() }; +} + +#[cfg(not(test))] +pub(super) fn get_crdt() -> Option<&'static Mutex> { + CRDT_STATE.get() +} + +#[cfg(test)] +pub(super) fn get_crdt() -> Option<&'static Mutex> { + let tl = CRDT_STATE_TL.with(|lock| { + if lock.get().is_some() { + Some(lock as *const OnceLock>) + } else { + None + } + }); + if let Some(ptr) = tl { + // SAFETY: The thread-local lives as long as the thread, which outlives + // any test using it. We only need 'static for the return type. + let lock = unsafe { &*ptr }; + lock.get() + } else { + CRDT_STATE.get() + } +} + + + +/// Initialise the CRDT state layer. +/// +/// Opens the SQLite database, loads or creates a node keypair, replays any +/// persisted ops to reconstruct state, and spawns a background persistence +/// task. Safe to call only once; subsequent calls are no-ops. +pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> { + if CRDT_STATE.get().is_some() { + return Ok(()); + } + + let options = SqliteConnectOptions::new() + .filename(db_path) + .create_if_missing(true); + let pool = SqlitePool::connect_with(options).await?; + sqlx::migrate!("./migrations").run(&pool).await?; + + // Load or create the node keypair. + let keypair = load_or_create_keypair(&pool).await?; + let mut crdt = BaseCrdt::::new(&keypair); + + // Replay persisted ops to reconstruct state. + let rows: Vec<(String,)> = sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY rowid ASC") + .fetch_all(&pool) + .await?; + + let mut all_ops_vec = Vec::with_capacity(rows.len()); + let mut vector_clock = VectorClock::new(); + for (op_json,) in &rows { + if let Ok(signed_op) = serde_json::from_str::(op_json) { + let author_hex = hex::encode(&signed_op.author()); + *vector_clock.entry(author_hex).or_insert(0) += 1; + crdt.apply(signed_op); + all_ops_vec.push(op_json.clone()); + } else { + slog!("[crdt] Warning: failed to deserialize stored op"); + } + } + let _ = ALL_OPS.set(Mutex::new(all_ops_vec)); + let _ = VECTOR_CLOCK.set(Mutex::new(vector_clock)); + + // Build the indices from the reconstructed state. + let index = rebuild_index(&crdt); + let node_index = rebuild_node_index(&crdt); + + slog!( + "[crdt] Initialised: {} ops replayed, {} items indexed, {} nodes indexed", + rows.len(), + index.len(), + node_index.len() + ); + + // Spawn background persistence task. + let (persist_tx, mut persist_rx) = mpsc::unbounded_channel::(); + + tokio::spawn(async move { + while let Some(op) = persist_rx.recv().await { + let op_json = match serde_json::to_string(&op) { + Ok(j) => j, + Err(e) => { + slog!("[crdt] Failed to serialize op: {e}"); + continue; + } + }; + let op_id = hex::encode(&op.id()); + let seq = op.inner.seq as i64; + let now = chrono::Utc::now().to_rfc3339(); + + let result = sqlx::query( + "INSERT INTO crdt_ops (op_id, seq, op_json, created_at) \ + VALUES (?1, ?2, ?3, ?4) \ + ON CONFLICT(op_id) DO NOTHING", + ) + .bind(&op_id) + .bind(seq) + .bind(&op_json) + .bind(&now) + .execute(&pool) + .await; + + if let Err(e) = result { + slog!("[crdt] Failed to persist op {}: {e}", &op_id[..12]); + } + } + }); + + let state = CrdtState { + crdt, + keypair, + index, + node_index, + persist_tx, + }; + + let _ = CRDT_STATE.set(Mutex::new(state)); + + // Initialise the CRDT event broadcast channel. + let (event_tx, _) = broadcast::channel::(256); + let _ = CRDT_EVENT_TX.set(event_tx); + + // Initialise the sync broadcast channel for outgoing ops. + let (sync_tx, _) = broadcast::channel::(1024); + let _ = SYNC_TX.set(sync_tx); + + Ok(()) +} + +/// Initialise a minimal in-memory CRDT state for unit tests. +/// +/// This avoids the async SQLite setup from `init()`. Ops are accepted via a +/// channel whose receiver is immediately dropped, so nothing is persisted. +/// Safe to call multiple times — subsequent calls are no-ops (OnceLock). +#[cfg(test)] +pub fn init_for_test() { + // Initialise thread-local CRDT for test isolation. + // Only creates a new CRDT if one isn't set yet on this thread; + // subsequent calls are no-ops (matching the old OnceLock semantics + // while keeping each thread isolated). + CRDT_STATE_TL.with(|lock| { + if lock.get().is_none() { + let keypair = make_keypair(); + let crdt = BaseCrdt::::new(&keypair); + let (persist_tx, _rx) = mpsc::unbounded_channel(); + let state = CrdtState { + crdt, + keypair, + index: HashMap::new(), + node_index: HashMap::new(), + persist_tx, + }; + let _ = lock.set(Mutex::new(state)); + } + }); + let _ = CRDT_EVENT_TX.get_or_init(|| broadcast::channel::(256).0); + let _ = SYNC_TX.get_or_init(|| broadcast::channel::(1024).0); + let _ = ALL_OPS.get_or_init(|| Mutex::new(Vec::new())); + let _ = VECTOR_CLOCK.get_or_init(|| Mutex::new(VectorClock::new())); +} + +/// Load or create the Ed25519 keypair used by this node. +async fn load_or_create_keypair(pool: &SqlitePool) -> Result { + let row: Option<(Vec,)> = + sqlx::query_as("SELECT seed FROM crdt_node_identity WHERE id = 1") + .fetch_optional(pool) + .await?; + + if let Some((seed,)) = row { + // Reconstruct from stored seed. The seed is the 32-byte private key. + if let Ok(kp) = Ed25519KeyPair::from_bytes(&seed) { + return Ok(kp); + } + slog!("[crdt] Stored keypair invalid, regenerating"); + } + + let kp = make_keypair(); + let seed = kp.as_bytes().to_vec(); + sqlx::query("INSERT INTO crdt_node_identity (id, seed) VALUES (1, ?1) ON CONFLICT(id) DO UPDATE SET seed = excluded.seed") + .bind(&seed) + .execute(pool) + .await?; + + Ok(kp) +} + +/// Rebuild the story_id → list index mapping from the current CRDT state. +pub(super) fn rebuild_index(crdt: &BaseCrdt) -> HashMap { + let mut map = HashMap::new(); + for (i, item) in crdt.doc.items.iter().enumerate() { + if let JsonValue::String(ref sid) = item.story_id.view() { + map.insert(sid.clone(), i); + } + } + map +} + +/// Rebuild the node_id → nodes list index mapping from the current CRDT state. +pub(super) fn rebuild_node_index(crdt: &BaseCrdt) -> HashMap { + let mut map = HashMap::new(); + for (i, node) in crdt.doc.nodes.iter().enumerate() { + if let JsonValue::String(ref nid) = node.node_id.view() { + map.insert(nid.clone(), i); + } + } + map +} + +// ── Write path ─────────────────────────────────────────────────────── + +/// Create a CRDT op via `op_fn`, sign it, apply it, and send it to the +/// persistence channel. The closure receives `&mut CrdtState` so it can +/// mutably access the CRDT document, while `sign` only needs `&keypair`. +pub(super) fn apply_and_persist(state: &mut CrdtState, op_fn: F) +where + F: FnOnce(&mut CrdtState) -> bft_json_crdt::op::Op, +{ + let raw_op = op_fn(state); + let signed = raw_op.sign(&state.keypair); + state.crdt.apply(signed.clone()); + if let Err(e) = state.persist_tx.send(signed.clone()) { + crate::slog_error!( + "[crdt] Failed to send op to persist task: {e}; persist task may be dead. \ + In-memory state is now ahead of persisted state." + ); + } + + // Track in ALL_OPS + VECTOR_CLOCK, then broadcast to sync peers. + if let Ok(json) = serde_json::to_string(&signed) { + track_op(&signed, json); + } + if let Some(tx) = SYNC_TX.get() { + let _ = tx.send(signed); + } +} + +/// Broadcast a CRDT event to all subscribers. +pub(super) fn emit_event(event: CrdtEvent) { + if let Some(tx) = CRDT_EVENT_TX.get() { + let _ = tx.send(event); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use super::super::types::PipelineItemCrdt; + use super::super::write::write_item; + use super::super::read::{extract_item_view, read_item}; + use bft_json_crdt::json_crdt::OpState; + use bft_json_crdt::keypair::make_keypair; + use bft_json_crdt::op::ROOT_ID; + use super::super::hex; + use serde_json::json; + + #[test] + fn crdt_ops_replay_reconstructs_state() { + let kp = make_keypair(); + let mut crdt1 = BaseCrdt::::new(&kp); + + // Build state with a series of ops. + let item_json: JsonValue = json!({ + "story_id": "30_story_replay", + "stage": "1_backlog", + "name": "Replay Test", + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, + }) + .into(); + + let op1 = crdt1.doc.items.insert(ROOT_ID, item_json).sign(&kp); + crdt1.apply(op1.clone()); + + let op2 = crdt1.doc.items[0] + .stage + .set("2_current".to_string()) + .sign(&kp); + crdt1.apply(op2.clone()); + + let op3 = crdt1.doc.items[0] + .name + .set("Updated Name".to_string()) + .sign(&kp); + crdt1.apply(op3.clone()); + + // Replay ops on a fresh CRDT. + let mut crdt2 = BaseCrdt::::new(&kp); + crdt2.apply(op1); + crdt2.apply(op2); + crdt2.apply(op3); + + assert_eq!( + crdt1.doc.items[0].stage.view(), + crdt2.doc.items[0].stage.view() + ); + assert_eq!( + crdt1.doc.items[0].name.view(), + crdt2.doc.items[0].name.view() + ); + } + + #[test] + fn rebuild_index_maps_story_ids() { + let kp = make_keypair(); + let mut crdt = BaseCrdt::::new(&kp); + + for (sid, stage) in &[("10_story_a", "1_backlog"), ("20_story_b", "2_current")] { + let item: JsonValue = json!({ + "story_id": sid, + "stage": stage, + "name": "", + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, + }) + .into(); + let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp); + crdt.apply(op); + } + + let index = rebuild_index(&crdt); + assert_eq!(index.len(), 2); + assert!(index.contains_key("10_story_a")); + assert!(index.contains_key("20_story_b")); + } + + #[tokio::test] + async fn init_and_write_read_roundtrip() { + let tmp = tempfile::tempdir().unwrap(); + let db_path = tmp.path().join("crdt_test.db"); + + // Init directly (not via the global singleton, for test isolation). + let options = SqliteConnectOptions::new() + .filename(&db_path) + .create_if_missing(true); + let pool = SqlitePool::connect_with(options).await.unwrap(); + sqlx::migrate!("./migrations").run(&pool).await.unwrap(); + + let keypair = make_keypair(); + let mut crdt = BaseCrdt::::new(&keypair); + + // Insert and update like write_item does. + let item_json: JsonValue = json!({ + "story_id": "50_story_roundtrip", + "stage": "1_backlog", + "name": "Roundtrip", + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, + }) + .into(); + + let insert_op = crdt.doc.items.insert(ROOT_ID, item_json).sign(&keypair); + crdt.apply(insert_op.clone()); + + // Persist the op. + let op_json = serde_json::to_string(&insert_op).unwrap(); + let op_id = hex::encode(&insert_op.id()); + let now = chrono::Utc::now().to_rfc3339(); + sqlx::query( + "INSERT INTO crdt_ops (op_id, seq, op_json, created_at) VALUES (?1, ?2, ?3, ?4)", + ) + .bind(&op_id) + .bind(insert_op.inner.seq as i64) + .bind(&op_json) + .bind(&now) + .execute(&pool) + .await + .unwrap(); + + // Reconstruct from DB. + let rows: Vec<(String,)> = + sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY rowid ASC") + .fetch_all(&pool) + .await + .unwrap(); + + let mut crdt2 = BaseCrdt::::new(&keypair); + for (json_str,) in &rows { + let op: SignedOp = serde_json::from_str(json_str).unwrap(); + crdt2.apply(op); + } + + let view = extract_item_view(&crdt2.doc.items[0]).unwrap(); + assert_eq!(view.story_id, "50_story_roundtrip"); + assert_eq!(view.stage, "1_backlog"); + assert_eq!(view.name.as_deref(), Some("Roundtrip")); + } + + #[test] + fn persist_tx_send_failure_logs_error() { + let kp = make_keypair(); + let crdt = BaseCrdt::::new(&kp); + let (persist_tx, persist_rx) = mpsc::unbounded_channel::(); + + let mut state = CrdtState { + crdt, + keypair: kp, + index: HashMap::new(), + node_index: HashMap::new(), + persist_tx, + }; + + // Drop the receiver so that the next send fails immediately. + drop(persist_rx); + + let item_json: JsonValue = json!({ + "story_id": "518_story_persist_fail", + "stage": "1_backlog", + "name": "Persist Fail Test", + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, + }) + .into(); + + let before_errors = crate::log_buffer::global() + .get_recent_entries(1000, None, Some(&crate::log_buffer::LogLevel::Error)) + .len(); + + apply_and_persist(&mut state, |s| s.crdt.doc.items.insert(ROOT_ID, item_json)); + + let error_entries = crate::log_buffer::global().get_recent_entries( + 1000, + None, + Some(&crate::log_buffer::LogLevel::Error), + ); + + assert!( + error_entries.len() > before_errors, + "expected an ERROR log entry when persist_tx send fails, but none was added" + ); + + let last_error = &error_entries[error_entries.len() - 1]; + assert!( + last_error.message.contains("persist"), + "error message should mention persist: {}", + last_error.message + ); + assert!( + last_error.message.contains("ahead") || last_error.message.contains("diverged"), + "error message should note in-memory/persisted divergence: {}", + last_error.message + ); + } +} diff --git a/server/src/crdt_state/types.rs b/server/src/crdt_state/types.rs new file mode 100644 index 00000000..3c48e4d5 --- /dev/null +++ b/server/src/crdt_state/types.rs @@ -0,0 +1,249 @@ +//! CRDT document types, read-side view types, and CRDT-state events. + +use bft_json_crdt::json_crdt::*; +use bft_json_crdt::list_crdt::ListCrdt; +use bft_json_crdt::lww_crdt::LwwRegisterCrdt; +use std::sync::OnceLock; +use tokio::sync::broadcast; + + +/// An event emitted when a pipeline item's stage changes in the CRDT document. +#[derive(Clone, Debug)] +pub struct CrdtEvent { + /// Work item ID (e.g. `"42_story_my_feature"`). + pub story_id: String, + /// The stage the item was in before this transition, or `None` for new items. + pub from_stage: Option, + /// The stage the item is now in. + pub to_stage: String, + /// Human-readable story name from the CRDT document. + pub name: Option, +} + +/// Subscribe to CRDT state transition events. +/// +/// Returns `None` if the CRDT layer has not been initialised yet. +pub fn subscribe() -> Option> { + CRDT_EVENT_TX.get().map(|tx| tx.subscribe()) +} + +static CRDT_EVENT_TX: OnceLock> = OnceLock::new(); + +// ── CRDT document types ────────────────────────────────────────────── + +#[add_crdt_fields] +#[derive(Clone, CrdtNode, Debug)] +pub struct PipelineDoc { + pub items: ListCrdt, + pub nodes: ListCrdt, +} + +#[add_crdt_fields] +#[derive(Clone, CrdtNode, Debug)] +pub struct PipelineItemCrdt { + pub story_id: LwwRegisterCrdt, + pub stage: LwwRegisterCrdt, + pub name: LwwRegisterCrdt, + pub agent: LwwRegisterCrdt, + pub retry_count: LwwRegisterCrdt, + pub blocked: LwwRegisterCrdt, + pub depends_on: LwwRegisterCrdt, + /// Node ID (hex-encoded Ed25519 pubkey) of the node that claimed this item. + /// Used for distributed work claiming — the LWW register resolves conflicts + /// deterministically so all nodes converge on the same claimer. + pub claimed_by: LwwRegisterCrdt, + /// Unix timestamp (seconds) when the claim was written. + /// Used for timeout-based reclaim: if a node crashes, other nodes can + /// reclaim the item after the timeout expires. + pub claimed_at: LwwRegisterCrdt, + /// Unix timestamp (seconds) when the item was merged to master. + /// Written once when the item transitions to `5_done`. Used by the + /// sweep loop to determine when to promote to `6_archived`. + pub merged_at: LwwRegisterCrdt, +} + +/// CRDT node that holds a single peer's presence entry. +#[add_crdt_fields] +#[derive(Clone, CrdtNode, Debug)] +pub struct NodePresenceCrdt { + /// Hex-encoded Ed25519 public key — stable identity across restarts. + pub node_id: LwwRegisterCrdt, + /// WebSocket URL this peer advertises, e.g. `ws://192.168.1.10:3001/crdt-sync`. + pub address: LwwRegisterCrdt, + /// Unix timestamp (seconds) of the last heartbeat written by this node. + pub last_seen: LwwRegisterCrdt, + /// `false` once a stale-detection pass has tombstoned this node. + pub alive: LwwRegisterCrdt, +} + +// ── Read-side view types ───────────────────────────────────────────── + +/// A snapshot of a single pipeline item derived from the CRDT document. +#[derive(Clone, Debug)] +pub struct PipelineItemView { + pub story_id: String, + pub stage: String, + pub name: Option, + pub agent: Option, + pub retry_count: Option, + pub blocked: Option, + pub depends_on: Option>, + /// Node ID of the node that claimed this item (hex-encoded Ed25519 pubkey). + pub claimed_by: Option, + /// Unix timestamp when the item was claimed. + pub claimed_at: Option, + /// Unix timestamp (seconds) when the item was merged to master. + /// `None` for items that were never in `5_done` or for legacy items. + pub merged_at: Option, +} + +/// A snapshot of a single node presence entry derived from the CRDT document. +#[derive(Clone, Debug)] +pub struct NodePresenceView { + pub node_id: String, + pub address: String, + /// Unix timestamp (seconds). + pub last_seen: f64, + pub alive: bool, +} + +#[cfg(test)] +mod tests { + use super::*; + use super::super::state::init_for_test; + use super::super::write::write_item; + use bft_json_crdt::json_crdt::OpState; + use bft_json_crdt::keypair::make_keypair; + use bft_json_crdt::op::ROOT_ID; + use super::super::state::emit_event; + use serde_json::json; + + #[test] + fn crdt_doc_insert_and_view() { + let kp = make_keypair(); + let mut crdt = BaseCrdt::::new(&kp); + + let item_json: JsonValue = json!({ + "story_id": "10_story_test", + "stage": "2_current", + "name": "Test Story", + "agent": "coder-opus", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, + }) + .into(); + + let op = crdt.doc.items.insert(ROOT_ID, item_json).sign(&kp); + assert_eq!(crdt.apply(op), OpState::Ok); + + let view = crdt.doc.items.view(); + assert_eq!(view.len(), 1); + + let item = &crdt.doc.items[0]; + assert_eq!( + item.story_id.view(), + JsonValue::String("10_story_test".to_string()) + ); + assert_eq!( + item.stage.view(), + JsonValue::String("2_current".to_string()) + ); + } + + #[test] + fn crdt_doc_update_stage() { + let kp = make_keypair(); + let mut crdt = BaseCrdt::::new(&kp); + + let item_json: JsonValue = json!({ + "story_id": "20_story_move", + "stage": "1_backlog", + "name": "Move Me", + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, + }) + .into(); + + let insert_op = crdt.doc.items.insert(ROOT_ID, item_json).sign(&kp); + crdt.apply(insert_op); + + // Update stage + let stage_op = crdt.doc.items[0] + .stage + .set("2_current".to_string()) + .sign(&kp); + crdt.apply(stage_op); + + assert_eq!( + crdt.doc.items[0].stage.view(), + JsonValue::String("2_current".to_string()) + ); + } + + #[test] + fn crdt_event_has_expected_fields() { + let evt = CrdtEvent { + story_id: "42_story_foo".to_string(), + from_stage: Some("1_backlog".to_string()), + to_stage: "2_current".to_string(), + name: Some("Foo Feature".to_string()), + }; + assert_eq!(evt.story_id, "42_story_foo"); + assert_eq!(evt.from_stage.as_deref(), Some("1_backlog")); + assert_eq!(evt.to_stage, "2_current"); + assert_eq!(evt.name.as_deref(), Some("Foo Feature")); + } + + #[test] + fn crdt_event_clone_preserves_data() { + let evt = CrdtEvent { + story_id: "10_story_bar".to_string(), + from_stage: None, + to_stage: "1_backlog".to_string(), + name: None, + }; + let cloned = evt.clone(); + assert_eq!(cloned.story_id, "10_story_bar"); + assert!(cloned.from_stage.is_none()); + assert!(cloned.name.is_none()); + } + + #[test] + fn emit_event_is_noop_when_channel_not_initialised() { + // Before CRDT_EVENT_TX is set, emit_event should not panic. + // This test verifies the guard clause works. In test binaries the + // OnceLock may already be set by another test, so we just verify + // the function doesn't panic regardless. + emit_event(CrdtEvent { + story_id: "99_story_noop".to_string(), + from_stage: None, + to_stage: "1_backlog".to_string(), + name: None, + }); + } + + #[test] + fn crdt_event_broadcast_channel_round_trip() { + let (tx, mut rx) = broadcast::channel::(16); + let evt = CrdtEvent { + story_id: "70_story_broadcast".to_string(), + from_stage: Some("1_backlog".to_string()), + to_stage: "2_current".to_string(), + name: Some("Broadcast Test".to_string()), + }; + tx.send(evt).unwrap(); + + let received = rx.try_recv().unwrap(); + assert_eq!(received.story_id, "70_story_broadcast"); + assert_eq!(received.from_stage.as_deref(), Some("1_backlog")); + assert_eq!(received.to_stage, "2_current"); + assert_eq!(received.name.as_deref(), Some("Broadcast Test")); + } +} diff --git a/server/src/crdt_state/write.rs b/server/src/crdt_state/write.rs new file mode 100644 index 00000000..434e2ff1 --- /dev/null +++ b/server/src/crdt_state/write.rs @@ -0,0 +1,280 @@ +//! High-level write API for pipeline items. + +use bft_json_crdt::json_crdt::*; +use bft_json_crdt::op::ROOT_ID; +use bft_json_crdt::lww_crdt::LwwRegisterCrdt; +use serde_json::json; + +use super::state::{apply_and_persist, emit_event, get_crdt, rebuild_index}; +use super::types::{CrdtEvent, PipelineDoc, PipelineItemCrdt}; +use crate::slog; + +/// Write a pipeline item state through CRDT operations. +/// +/// If the item exists, updates its registers. If not, inserts a new item +/// into the list. All ops are signed and persisted to SQLite. +/// +/// When the stage changes (or a new item is created), a [`CrdtEvent`] is +/// broadcast so subscribers can react to the transition. +#[allow(clippy::too_many_arguments)] +pub fn write_item( + story_id: &str, + stage: &str, + name: Option<&str>, + agent: Option<&str>, + retry_count: Option, + blocked: Option, + depends_on: Option<&str>, + claimed_by: Option<&str>, + claimed_at: Option, + merged_at: Option, +) { + let Some(state_mutex) = get_crdt() else { + return; + }; + let Ok(mut state) = state_mutex.lock() else { + return; + }; + + if let Some(&idx) = state.index.get(story_id) { + // Capture the old stage before updating so we can detect transitions. + let old_stage = match state.crdt.doc.items[idx].stage.view() { + JsonValue::String(s) => Some(s), + _ => None, + }; + + // Update existing item registers. + apply_and_persist(&mut state, |s| { + s.crdt.doc.items[idx].stage.set(stage.to_string()) + }); + + if let Some(n) = name { + apply_and_persist(&mut state, |s| { + s.crdt.doc.items[idx].name.set(n.to_string()) + }); + } + if let Some(a) = agent { + apply_and_persist(&mut state, |s| { + s.crdt.doc.items[idx].agent.set(a.to_string()) + }); + } + if let Some(rc) = retry_count { + apply_and_persist(&mut state, |s| { + s.crdt.doc.items[idx].retry_count.set(rc as f64) + }); + } + if let Some(b) = blocked { + apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].blocked.set(b)); + } + if let Some(d) = depends_on { + apply_and_persist(&mut state, |s| { + s.crdt.doc.items[idx].depends_on.set(d.to_string()) + }); + } + if let Some(cb) = claimed_by { + apply_and_persist(&mut state, |s| { + s.crdt.doc.items[idx].claimed_by.set(cb.to_string()) + }); + } + if let Some(ca) = claimed_at { + apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].claimed_at.set(ca)); + } + if let Some(ma) = merged_at { + apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].merged_at.set(ma)); + } + + // Broadcast a CrdtEvent if the stage actually changed. + let stage_changed = old_stage.as_deref() != Some(stage); + if stage_changed { + // Read the current name from the CRDT document for the event. + let current_name = match state.crdt.doc.items[idx].name.view() { + JsonValue::String(s) if !s.is_empty() => Some(s), + _ => None, + }; + emit_event(CrdtEvent { + story_id: story_id.to_string(), + from_stage: old_stage, + to_stage: stage.to_string(), + name: current_name, + }); + } + } else { + // Insert new item. + let item_json: JsonValue = json!({ + "story_id": story_id, + "stage": stage, + "name": name.unwrap_or(""), + "agent": agent.unwrap_or(""), + "retry_count": retry_count.unwrap_or(0) as f64, + "blocked": blocked.unwrap_or(false), + "depends_on": depends_on.unwrap_or(""), + "claimed_by": claimed_by.unwrap_or(""), + "claimed_at": claimed_at.unwrap_or(0.0), + "merged_at": merged_at.unwrap_or(0.0), + }) + .into(); + + apply_and_persist(&mut state, |s| s.crdt.doc.items.insert(ROOT_ID, item_json)); + + // Rebuild index after insertion (indices may shift). + state.index = rebuild_index(&state.crdt); + + // Broadcast a CrdtEvent for the new item. + emit_event(CrdtEvent { + story_id: story_id.to_string(), + from_stage: None, + to_stage: stage.to_string(), + name: name.map(String::from), + }); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use super::super::state::init_for_test; + use super::super::read::read_item; + use bft_json_crdt::json_crdt::OpState; + use bft_json_crdt::keypair::make_keypair; + use bft_json_crdt::op::ROOT_ID; + use sqlx::SqlitePool; + use sqlx::sqlite::SqliteConnectOptions; + use super::super::hex; + use super::super::state::rebuild_index; + use super::super::read::extract_item_view; + use serde_json::json; + + #[tokio::test] + async fn bug_511_rowid_replay_preserves_field_update_after_list_insert() { + let tmp = tempfile::tempdir().unwrap(); + let db_path = tmp.path().join("bug511.db"); + + let options = SqliteConnectOptions::new() + .filename(&db_path) + .create_if_missing(true); + let pool = SqlitePool::connect_with(options).await.unwrap(); + sqlx::migrate!("./migrations").run(&pool).await.unwrap(); + + let kp = make_keypair(); + let mut crdt = BaseCrdt::::new(&kp); + + // Insert 5 dummy items to advance items.our_seq to 5. + for i in 0..5u32 { + let sid = format!("{}_story_warmup", i); + let item: JsonValue = json!({ + "story_id": sid, + "stage": "1_backlog", + "name": "", + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, + }) + .into(); + let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp); + crdt.apply(op.clone()); + // We don't persist these to the DB — they are pre-history. + } + + // Now insert the real item. items.our_seq was 5, so this op gets seq=6. + let target_item: JsonValue = json!({ + "story_id": "511_story_target", + "stage": "1_backlog", + "name": "Bug 511 target", + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, + }) + .into(); + let insert_op = crdt.doc.items.insert(ROOT_ID, target_item).sign(&kp); + crdt.apply(insert_op.clone()); + // insert_op.inner.seq == 6 + + // Now update the stage. The stage LwwRegisterCrdt for this item starts + // at our_seq=0, so this field op gets seq=1. Crucially: seq=1 < seq=6. + let idx = rebuild_index(&crdt)["511_story_target"]; + let stage_op = crdt.doc.items[idx] + .stage + .set("2_current".to_string()) + .sign(&kp); + crdt.apply(stage_op.clone()); + // stage_op.inner.seq == 1 + + // Persist BOTH ops in causal order (insert first, update second). + // This means insert_op gets rowid < stage_op rowid. + let now = chrono::Utc::now().to_rfc3339(); + for op in [&insert_op, &stage_op] { + let op_json = serde_json::to_string(op).unwrap(); + let op_id = hex::encode(&op.id()); + sqlx::query( + "INSERT INTO crdt_ops (op_id, seq, op_json, created_at) VALUES (?1, ?2, ?3, ?4)", + ) + .bind(&op_id) + .bind(op.inner.seq as i64) + .bind(&op_json) + .bind(&now) + .execute(&pool) + .await + .unwrap(); + } + + // Replay by rowid ASC (the fix). The insert must come before the field + // update regardless of their field-level seq values. + let rows: Vec<(String,)> = + sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY rowid ASC") + .fetch_all(&pool) + .await + .unwrap(); + + let mut crdt2 = BaseCrdt::::new(&kp); + for (json_str,) in &rows { + let op: SignedOp = serde_json::from_str(json_str).unwrap(); + crdt2.apply(op); + } + + // The item must be in the CRDT and must reflect the stage update. + let index2 = rebuild_index(&crdt2); + assert!( + index2.contains_key("511_story_target"), + "item not found after rowid-order replay" + ); + let idx2 = index2["511_story_target"]; + let view = extract_item_view(&crdt2.doc.items[idx2]).unwrap(); + assert_eq!( + view.stage, "2_current", + "stage field update lost during replay (bug 511 regression)" + ); + + // Confirm the bug is reproducible by replaying seq ASC instead. + // With seq ASC the stage_op (seq=1) arrives before insert_op (seq=6), + // fails ErrPathMismatch, and the item ends up at "1_backlog". + let rows_wrong_order: Vec<(String,)> = + sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY seq ASC") + .fetch_all(&pool) + .await + .unwrap(); + + let mut crdt3 = BaseCrdt::::new(&kp); + for (json_str,) in &rows_wrong_order { + let op: SignedOp = serde_json::from_str(json_str).unwrap(); + crdt3.apply(op); + } + + let index3 = rebuild_index(&crdt3); + // With seq ASC replay, the item is created (insert_op eventually runs) + // but the stage update is lost (it ran before the item existed). + if let Some(idx3) = index3.get("511_story_target") { + let view3 = extract_item_view(&crdt3.doc.items[*idx3]).unwrap(); + // The bug: stage is still "1_backlog" because the update was dropped. + assert_eq!( + view3.stage, "1_backlog", + "expected seq-ASC replay to exhibit the bug (update lost)" + ); + } + } +}