Files
huskies/server/src/crdt_state.rs
T
Timmy 995576358f fix(511): replay CRDT ops by rowid ASC instead of seq ASC
The CRDT lamport seq is per-author and per-field, not globally
monotonic. Replaying by `seq ASC` causes field-update ops (which
have low per-field seq counters like 1, 2, 3) to be applied
BEFORE the list-insert ops they reference (which have higher
per-list seq counters like N for the Nth item ever inserted).
The field updates fail with ErrPathMismatch because the target
item doesn't exist yet, the field counter is never advanced,
and subsequent writes silently lose state.

Concretely on 2026-04-09 we observed: post-restart writes were
being persisted at seq=1,2,3,4,5,6,7 even though pre-restart
seq had reached 492. On the next replay, those low-seq field
updates would be applied before their seq=485+ creation ops,
silently dropping the updates. This was the load-bearing
"why does state keep flapping" bug today.

Fix: replay by `rowid ASC` (SQLite insertion order) instead.
Rowid preserves the causal order ops were originally applied
in, so field updates always come after the item insert they
reference.

Adds a regression test that constructs the exact scenario:
inserts a story (op gets seq=6), updates its stage (op gets
seq=1 because field counter starts at 0), persists both ops
in causal order, then replays both seq ASC (reproduces the
bug — stage update is lost) and rowid ASC (the fix — stage
update is preserved).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-09 21:02:01 +01:00

1176 lines
41 KiB
Rust

//! CRDT state layer for pipeline state, backed by SQLite.
//!
//! The CRDT document is the primary source of truth for pipeline item
//! metadata (stage, name, agent, etc.). CRDT ops are persisted to SQLite so
//! state survives restarts. The filesystem `.huskies/work/` directories are
//! still updated as a secondary output for backwards compatibility.
//!
//! Stage transitions detected by `write_item()` are broadcast as [`CrdtEvent`]s
//! so subscribers (auto-assign, WebSocket, notifications) can react without
//! polling the filesystem.
use std::collections::HashMap;
use std::sync::{Mutex, OnceLock};
use bft_json_crdt::json_crdt::*;
use bft_json_crdt::keypair::make_keypair;
use bft_json_crdt::list_crdt::ListCrdt;
use bft_json_crdt::lww_crdt::LwwRegisterCrdt;
use bft_json_crdt::op::ROOT_ID;
use fastcrypto::ed25519::Ed25519KeyPair;
use fastcrypto::traits::ToFromBytes;
use serde_json::json;
use sqlx::sqlite::SqliteConnectOptions;
use sqlx::SqlitePool;
use std::path::Path;
use tokio::sync::{broadcast, mpsc};
use crate::slog;
// ── CRDT events ─────────────────────────────────────────────────────
/// An event emitted when a pipeline item's stage changes in the CRDT document.
///
/// Emitted both by the local write path (`write_item`) and by remote-op
/// ingestion (`apply_remote_op`); delivered via the channel behind
/// [`subscribe`].
#[derive(Clone, Debug)]
pub struct CrdtEvent {
    /// Work item ID (e.g. `"42_story_my_feature"`).
    pub story_id: String,
    /// The stage the item was in before this transition, or `None` for new items.
    pub from_stage: Option<String>,
    /// The stage the item is now in.
    pub to_stage: String,
    /// Human-readable story name from the CRDT document.
    pub name: Option<String>,
}
/// Subscribe to CRDT state transition events.
///
/// Returns `None` if the CRDT layer has not been initialised yet.
pub fn subscribe() -> Option<broadcast::Receiver<CrdtEvent>> {
    let tx = CRDT_EVENT_TX.get()?;
    Some(tx.subscribe())
}
// Broadcast channel for stage-transition events. Set exactly once in `init()`.
static CRDT_EVENT_TX: OnceLock<broadcast::Sender<CrdtEvent>> = OnceLock::new();
// ── Sync broadcast (outgoing ops to peers) ──────────────────────────
// Broadcast channel for locally-created ops destined for sync peers.
// Set exactly once in `init()`.
static SYNC_TX: OnceLock<broadcast::Sender<SignedOp>> = OnceLock::new();
/// Subscribe to locally-created CRDT ops for sync replication.
///
/// Each `SignedOp` broadcast here was created by *this* node and should be
/// forwarded to connected peers. Returns `None` before `init()`.
pub fn subscribe_ops() -> Option<broadcast::Receiver<SignedOp>> {
    let tx = SYNC_TX.get()?;
    Some(tx.subscribe())
}
/// Return all persisted `SignedOp`s in causal order (oldest first).
///
/// Used during initial sync handshake so a newly-connected peer can
/// reconstruct the full CRDT state. Returns `None` before `init()`.
pub fn all_ops_json() -> Option<Vec<String>> {
    let ops = ALL_OPS.get()?;
    // Lock poisoning is treated as a bug, same as the original: panic.
    let guard = ops.lock().unwrap();
    Some(guard.clone())
}
static ALL_OPS: OnceLock<Mutex<Vec<String>>> = OnceLock::new();
// ── CRDT document types ──────────────────────────────────────────────
/// Root CRDT document: the replicated list of all pipeline items.
#[add_crdt_fields]
#[derive(Clone, CrdtNode, Debug)]
pub struct PipelineDoc {
    /// All pipeline items.
    pub items: ListCrdt<PipelineItemCrdt>,
}
/// One pipeline item; every field is an independent last-writer-wins register.
#[add_crdt_fields]
#[derive(Clone, CrdtNode, Debug)]
pub struct PipelineItemCrdt {
    /// Work item ID, e.g. `"42_story_my_feature"`.
    pub story_id: LwwRegisterCrdt<String>,
    /// Current pipeline stage, e.g. `"1_backlog"`, `"2_current"`.
    pub stage: LwwRegisterCrdt<String>,
    /// Human-readable story name; empty string means "unset".
    pub name: LwwRegisterCrdt<String>,
    /// Assigned agent; empty string means "unassigned".
    pub agent: LwwRegisterCrdt<String>,
    /// Retry counter; stored as f64 because the register holds JSON numbers.
    pub retry_count: LwwRegisterCrdt<f64>,
    /// Whether the item is blocked.
    pub blocked: LwwRegisterCrdt<bool>,
    /// JSON-encoded array of dependency numbers, e.g. `"[10,20]"`;
    /// empty string means "no dependencies" (see `extract_item_view`).
    pub depends_on: LwwRegisterCrdt<String>,
}
// ── Read-side view types ─────────────────────────────────────────────
/// A snapshot of a single pipeline item derived from the CRDT document.
///
/// Produced by `extract_item_view`; empty-string registers surface here as
/// `None`, and `retry_count` is `Some` only when greater than zero.
#[derive(Clone, Debug)]
pub struct PipelineItemView {
    pub story_id: String,
    pub stage: String,
    pub name: Option<String>,
    pub agent: Option<String>,
    /// Retry count, only when positive.
    pub retry_count: Option<i64>,
    pub blocked: Option<bool>,
    /// Parsed from the item's JSON-encoded `depends_on` register.
    pub depends_on: Option<Vec<u32>>,
}
// ── Internal state ───────────────────────────────────────────────────
/// Mutable CRDT runtime state, guarded by the `CRDT_STATE` mutex.
struct CrdtState {
    /// The live CRDT document.
    crdt: BaseCrdt<PipelineDoc>,
    /// This node's signing keypair (loaded/created in `init()`).
    keypair: Ed25519KeyPair,
    /// Maps story_id → index in the ListCrdt for O(1) lookup.
    index: HashMap<String, usize>,
    /// Channel sender for fire-and-forget op persistence.
    persist_tx: mpsc::UnboundedSender<SignedOp>,
}
// Global singleton, set exactly once in `init()`.
static CRDT_STATE: OnceLock<Mutex<CrdtState>> = OnceLock::new();
// ── Initialisation ───────────────────────────────────────────────────
/// Initialise the CRDT state layer.
///
/// Opens the SQLite database, loads or creates a node keypair, replays any
/// persisted ops to reconstruct state, and spawns a background persistence
/// task. Safe to call only once; subsequent calls are no-ops.
pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> {
if CRDT_STATE.get().is_some() {
return Ok(());
}
let options = SqliteConnectOptions::new()
.filename(db_path)
.create_if_missing(true);
let pool = SqlitePool::connect_with(options).await?;
sqlx::migrate!("./migrations").run(&pool).await?;
// Load or create the node keypair.
let keypair = load_or_create_keypair(&pool).await?;
let mut crdt = BaseCrdt::<PipelineDoc>::new(&keypair);
// Replay persisted ops to reconstruct state.
let rows: Vec<(String,)> =
sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY rowid ASC")
.fetch_all(&pool)
.await?;
let mut all_ops_vec = Vec::with_capacity(rows.len());
for (op_json,) in &rows {
if let Ok(signed_op) = serde_json::from_str::<SignedOp>(op_json) {
crdt.apply(signed_op);
all_ops_vec.push(op_json.clone());
} else {
slog!("[crdt] Warning: failed to deserialize stored op");
}
}
let _ = ALL_OPS.set(Mutex::new(all_ops_vec));
// Build the index from the reconstructed state.
let index = rebuild_index(&crdt);
slog!(
"[crdt] Initialised: {} ops replayed, {} items indexed",
rows.len(),
index.len()
);
// Spawn background persistence task.
let (persist_tx, mut persist_rx) = mpsc::unbounded_channel::<SignedOp>();
tokio::spawn(async move {
while let Some(op) = persist_rx.recv().await {
let op_json = match serde_json::to_string(&op) {
Ok(j) => j,
Err(e) => {
slog!("[crdt] Failed to serialize op: {e}");
continue;
}
};
let op_id = hex::encode(&op.id());
let seq = op.inner.seq as i64;
let now = chrono::Utc::now().to_rfc3339();
let result = sqlx::query(
"INSERT INTO crdt_ops (op_id, seq, op_json, created_at) \
VALUES (?1, ?2, ?3, ?4) \
ON CONFLICT(op_id) DO NOTHING",
)
.bind(&op_id)
.bind(seq)
.bind(&op_json)
.bind(&now)
.execute(&pool)
.await;
if let Err(e) = result {
slog!("[crdt] Failed to persist op {}: {e}", &op_id[..12]);
}
}
});
let state = CrdtState {
crdt,
keypair,
index,
persist_tx,
};
let _ = CRDT_STATE.set(Mutex::new(state));
// Initialise the CRDT event broadcast channel.
let (event_tx, _) = broadcast::channel::<CrdtEvent>(256);
let _ = CRDT_EVENT_TX.set(event_tx);
// Initialise the sync broadcast channel for outgoing ops.
let (sync_tx, _) = broadcast::channel::<SignedOp>(1024);
let _ = SYNC_TX.set(sync_tx);
Ok(())
}
/// Load or create the Ed25519 keypair used by this node.
///
/// The 32-byte seed lives in the single-row `crdt_node_identity` table
/// (id = 1); an unparseable stored seed is replaced with a fresh one.
async fn load_or_create_keypair(pool: &SqlitePool) -> Result<Ed25519KeyPair, sqlx::Error> {
    let stored: Option<(Vec<u8>,)> =
        sqlx::query_as("SELECT seed FROM crdt_node_identity WHERE id = 1")
            .fetch_optional(pool)
            .await?;
    match stored.map(|(seed,)| Ed25519KeyPair::from_bytes(&seed)) {
        // Happy path: a valid seed was on disk.
        Some(Ok(kp)) => return Ok(kp),
        // Corrupt seed: warn and fall through to regeneration.
        Some(Err(_)) => slog!("[crdt] Stored keypair invalid, regenerating"),
        // No row yet: first boot, fall through to generation.
        None => {}
    }
    let fresh = make_keypair();
    let seed_bytes = fresh.as_bytes().to_vec();
    sqlx::query("INSERT INTO crdt_node_identity (id, seed) VALUES (1, ?1) ON CONFLICT(id) DO UPDATE SET seed = excluded.seed")
        .bind(&seed_bytes)
        .execute(pool)
        .await?;
    Ok(fresh)
}
/// Rebuild the story_id → list index mapping from the current CRDT state.
fn rebuild_index(crdt: &BaseCrdt<PipelineDoc>) -> HashMap<String, usize> {
    crdt.doc
        .items
        .iter()
        .enumerate()
        .filter_map(|(pos, item)| match item.story_id.view() {
            JsonValue::String(ref sid) => Some((sid.clone(), pos)),
            _ => None,
        })
        .collect()
}
// ── Write path ───────────────────────────────────────────────────────
/// Create a CRDT op via `op_fn`, sign it, apply it, and send it to the
/// persistence channel. The closure receives `&mut CrdtState` so it can
/// mutably access the CRDT document, while signing only needs `&keypair`.
fn apply_and_persist<F>(state: &mut CrdtState, op_fn: F)
where
    F: FnOnce(&mut CrdtState) -> bft_json_crdt::op::Op<JsonValue>,
{
    // Build, sign, and apply locally first.
    let raw = op_fn(state);
    let signed_op = raw.sign(&state.keypair);
    state.crdt.apply(signed_op.clone());
    // Fire-and-forget persistence via the background SQLite task.
    let _ = state.persist_tx.send(signed_op.clone());
    // Record the op for initial-sync handshakes.
    if let Ok(json) = serde_json::to_string(&signed_op) {
        if let Some(all) = ALL_OPS.get() {
            if let Ok(mut ops) = all.lock() {
                ops.push(json);
            }
        }
    }
    // Broadcast to connected sync peers (ignored when nobody listens).
    if let Some(tx) = SYNC_TX.get() {
        let _ = tx.send(signed_op);
    }
}
/// Write a pipeline item state through CRDT operations.
///
/// If the item exists, updates its registers. If not, inserts a new item
/// into the list. All ops are signed and persisted to SQLite.
///
/// When the stage changes (or a new item is created), a [`CrdtEvent`] is
/// broadcast so subscribers can react to the transition.
///
/// Silently does nothing when the CRDT layer is not initialised or its
/// lock is poisoned.
pub fn write_item(
    story_id: &str,
    stage: &str,
    name: Option<&str>,
    agent: Option<&str>,
    retry_count: Option<i64>,
    blocked: Option<bool>,
    depends_on: Option<&str>,
) {
    let Some(state_mutex) = CRDT_STATE.get() else {
        return;
    };
    let Ok(mut state) = state_mutex.lock() else {
        return;
    };
    if let Some(&idx) = state.index.get(story_id) {
        // Capture the old stage before updating so we can detect transitions.
        let old_stage = match state.crdt.doc.items[idx].stage.view() {
            JsonValue::String(s) => Some(s),
            _ => None,
        };
        // Update existing item registers. The stage register is always
        // written (even when unchanged); the others only when supplied.
        apply_and_persist(&mut state, |s| {
            s.crdt.doc.items[idx].stage.set(stage.to_string())
        });
        if let Some(n) = name {
            apply_and_persist(&mut state, |s| {
                s.crdt.doc.items[idx].name.set(n.to_string())
            });
        }
        if let Some(a) = agent {
            apply_and_persist(&mut state, |s| {
                s.crdt.doc.items[idx].agent.set(a.to_string())
            });
        }
        if let Some(rc) = retry_count {
            // Stored as f64 because the CRDT register holds JSON numbers.
            apply_and_persist(&mut state, |s| {
                s.crdt.doc.items[idx].retry_count.set(rc as f64)
            });
        }
        if let Some(b) = blocked {
            apply_and_persist(&mut state, |s| {
                s.crdt.doc.items[idx].blocked.set(b)
            });
        }
        if let Some(d) = depends_on {
            apply_and_persist(&mut state, |s| {
                s.crdt.doc.items[idx].depends_on.set(d.to_string())
            });
        }
        // Broadcast a CrdtEvent if the stage actually changed.
        let stage_changed = old_stage.as_deref() != Some(stage);
        if stage_changed {
            // Read the current name from the CRDT document for the event.
            let current_name = match state.crdt.doc.items[idx].name.view() {
                JsonValue::String(s) if !s.is_empty() => Some(s),
                _ => None,
            };
            emit_event(CrdtEvent {
                story_id: story_id.to_string(),
                from_stage: old_stage,
                to_stage: stage.to_string(),
                name: current_name,
            });
        }
    } else {
        // Insert new item. Missing optional fields default to empty / zero /
        // false so every register is always populated.
        let item_json: JsonValue = json!({
            "story_id": story_id,
            "stage": stage,
            "name": name.unwrap_or(""),
            "agent": agent.unwrap_or(""),
            "retry_count": retry_count.unwrap_or(0) as f64,
            "blocked": blocked.unwrap_or(false),
            "depends_on": depends_on.unwrap_or(""),
        })
        .into();
        apply_and_persist(&mut state, |s| {
            s.crdt.doc.items.insert(ROOT_ID, item_json)
        });
        // Rebuild index after insertion (indices may shift).
        state.index = rebuild_index(&state.crdt);
        // Broadcast a CrdtEvent for the new item (from_stage = None).
        emit_event(CrdtEvent {
            story_id: story_id.to_string(),
            from_stage: None,
            to_stage: stage.to_string(),
            name: name.map(String::from),
        });
    }
}
/// Broadcast a CRDT event to all subscribers; no-op before `init()`.
fn emit_event(event: CrdtEvent) {
    let Some(tx) = CRDT_EVENT_TX.get() else {
        return;
    };
    // A send error just means no live receivers — deliberately ignored.
    let _ = tx.send(event);
}
// ── Remote op ingestion (from sync peers) ───────────────────────────
/// Apply a `SignedOp` received from a remote peer.
///
/// The op is validated, applied to the local CRDT, persisted to SQLite,
/// and any resulting stage transitions are broadcast as [`CrdtEvent`]s.
/// Unlike `apply_and_persist`, this does **not** re-broadcast the op on
/// the sync channel (to avoid infinite echo loops).
///
/// Returns `true` if the op was new and applied, `false` if it was a
/// duplicate or failed validation.
pub fn apply_remote_op(op: SignedOp) -> bool {
    let Some(state_mutex) = CRDT_STATE.get() else {
        return false;
    };
    let Ok(mut state) = state_mutex.lock() else {
        return false;
    };
    // Snapshot stage state before applying so we can detect transitions.
    let pre_stages: HashMap<String, String> = state
        .index
        .iter()
        .filter_map(|(sid, &idx)| {
            match state.crdt.doc.items[idx].stage.view() {
                JsonValue::String(s) => Some((sid.clone(), s)),
                _ => None,
            }
        })
        .collect();
    let result = state.crdt.apply(op.clone());
    // MissingCausalDependencies is accepted alongside Ok: the op is still
    // persisted — presumably so it can take effect on a later replay once
    // its dependencies arrive (TODO confirm against bft_json_crdt semantics).
    if result != bft_json_crdt::json_crdt::OpState::Ok
        && result != bft_json_crdt::json_crdt::OpState::MissingCausalDependencies
    {
        return false;
    }
    // Persist the op (fire-and-forget). The SQLite insert is idempotent on
    // op_id, so duplicates are deduplicated at the database layer.
    let _ = state.persist_tx.send(op.clone());
    // Track in ALL_OPS.
    // NOTE(review): a duplicate remote op that reaches this point is appended
    // to ALL_OPS again — harmless for convergence but inflates the initial
    // sync handshake payload over time; worth confirming upstream dedupes.
    if let Ok(json) = serde_json::to_string(&op)
        && let Some(all) = ALL_OPS.get()
        && let Ok(mut v) = all.lock()
    {
        v.push(json);
    }
    // Rebuild index (new items may have been inserted).
    state.index = rebuild_index(&state.crdt);
    // Detect and broadcast stage transitions by diffing against pre_stages.
    for (sid, &idx) in &state.index {
        let new_stage = match state.crdt.doc.items[idx].stage.view() {
            JsonValue::String(s) => s,
            _ => continue,
        };
        let old_stage = pre_stages.get(sid).cloned();
        let changed = old_stage.as_deref() != Some(&new_stage);
        if changed {
            let name = match state.crdt.doc.items[idx].name.view() {
                JsonValue::String(s) if !s.is_empty() => Some(s),
                _ => None,
            };
            emit_event(CrdtEvent {
                story_id: sid.clone(),
                from_stage: old_stage,
                to_stage: new_stage,
                name,
            });
        }
    }
    true
}
// ── Read path ────────────────────────────────────────────────────────
/// Read the full pipeline state from the CRDT document.
///
/// Returns items grouped by stage, or `None` if the CRDT layer is not
/// initialised.
pub fn read_all_items() -> Option<Vec<PipelineItemView>> {
let state_mutex = CRDT_STATE.get()?;
let state = state_mutex.lock().ok()?;
let mut items = Vec::new();
for item_crdt in state.crdt.doc.items.iter() {
if let Some(view) = extract_item_view(item_crdt) {
items.push(view);
}
}
Some(items)
}
/// Read a single pipeline item by story_id.
pub fn read_item(story_id: &str) -> Option<PipelineItemView> {
    let state = CRDT_STATE.get()?.lock().ok()?;
    let idx = *state.index.get(story_id)?;
    extract_item_view(&state.crdt.doc.items[idx])
}
/// Extract a `PipelineItemView` from a `PipelineItemCrdt`.
fn extract_item_view(item: &PipelineItemCrdt) -> Option<PipelineItemView> {
let story_id = match item.story_id.view() {
JsonValue::String(s) if !s.is_empty() => s,
_ => return None,
};
let stage = match item.stage.view() {
JsonValue::String(s) if !s.is_empty() => s,
_ => return None,
};
let name = match item.name.view() {
JsonValue::String(s) if !s.is_empty() => Some(s),
_ => None,
};
let agent = match item.agent.view() {
JsonValue::String(s) if !s.is_empty() => Some(s),
_ => None,
};
let retry_count = match item.retry_count.view() {
JsonValue::Number(n) if n > 0.0 => Some(n as i64),
_ => None,
};
let blocked = match item.blocked.view() {
JsonValue::Bool(b) => Some(b),
_ => None,
};
let depends_on = match item.depends_on.view() {
JsonValue::String(s) if !s.is_empty() => {
serde_json::from_str::<Vec<u32>>(&s).ok()
}
_ => None,
};
Some(PipelineItemView {
story_id,
stage,
name,
agent,
retry_count,
blocked,
depends_on,
})
}
/// Check whether a dependency (by numeric ID prefix) is in `5_done` or `6_archived`
/// according to CRDT state.
///
/// Returns `true` if the dependency is satisfied (item found in a done stage).
/// See `dep_is_archived_crdt` to distinguish archive-satisfied from cleanly-done.
pub fn dep_is_done_crdt(dep_number: u32) -> bool {
    let prefix = format!("{dep_number}_");
    let Some(items) = read_all_items() else {
        // CRDT layer not initialised: treat the dependency as unsatisfied.
        return false;
    };
    items.iter().any(|item| {
        item.story_id.starts_with(&prefix)
            && (item.stage == "5_done" || item.stage == "6_archived")
    })
}
/// Check whether a dependency (by numeric ID prefix) is specifically in `6_archived`
/// according to CRDT state.
///
/// Used to detect when a dependency is satisfied via archive rather than via a clean
/// completion through `5_done`. Returns `false` when the CRDT layer is not initialised.
pub fn dep_is_archived_crdt(dep_number: u32) -> bool {
    let prefix = format!("{dep_number}_");
    read_all_items()
        .map(|items| {
            items
                .iter()
                .any(|item| item.story_id.starts_with(&prefix) && item.stage == "6_archived")
        })
        .unwrap_or(false)
}
/// Check unmet dependencies for a story by reading its `depends_on` from the
/// CRDT document and checking each dependency against CRDT state.
///
/// Returns the list of dependency numbers that are NOT in `5_done` or `6_archived`.
/// Unknown stories (or stories without deps) yield an empty list.
pub fn check_unmet_deps_crdt(story_id: &str) -> Vec<u32> {
    let deps = read_item(story_id).and_then(|item| item.depends_on);
    match deps {
        Some(list) => list
            .into_iter()
            .filter(|&dep| !dep_is_done_crdt(dep))
            .collect(),
        None => Vec::new(),
    }
}
/// Return the list of dependency numbers from `story_id`'s `depends_on` that are
/// specifically in `6_archived` according to CRDT state.
///
/// Used to emit a warning when promotion fires because a dep is archived rather than
/// cleanly completed. Returns an empty `Vec` when no deps are archived.
pub fn check_archived_deps_crdt(story_id: &str) -> Vec<u32> {
    let Some(item) = read_item(story_id) else {
        return Vec::new();
    };
    let Some(deps) = item.depends_on else {
        return Vec::new();
    };
    deps.into_iter()
        .filter(|&dep| dep_is_archived_crdt(dep))
        .collect()
}
/// Hex-encode a byte slice (no external dep needed).
mod hex {
    /// Encode `bytes` as a lowercase hex string, e.g. `[0xde, 0xad]` → `"dead"`.
    pub fn encode(bytes: &[u8]) -> String {
        use std::fmt::Write;
        // Preallocate: each byte is exactly two hex characters. This avoids
        // the per-byte String allocation of the format!-per-element approach.
        let mut out = String::with_capacity(bytes.len() * 2);
        for b in bytes {
            // write! into a String is infallible.
            let _ = write!(out, "{b:02x}");
        }
        out
    }
}
// ── Tests ────────────────────────────────────────────────────────────
#[cfg(test)]
mod tests {
    use super::*;
    use bft_json_crdt::json_crdt::OpState;
    // Basic insert path: one item, viewable through the list.
    #[test]
    fn crdt_doc_insert_and_view() {
        let kp = make_keypair();
        let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);
        let item_json: JsonValue = json!({
            "story_id": "10_story_test",
            "stage": "2_current",
            "name": "Test Story",
            "agent": "coder-opus",
            "retry_count": 0.0,
            "blocked": false,
            "depends_on": "",
        })
        .into();
        let op = crdt.doc.items.insert(ROOT_ID, item_json).sign(&kp);
        assert_eq!(crdt.apply(op), OpState::Ok);
        let view = crdt.doc.items.view();
        assert_eq!(view.len(), 1);
        let item = &crdt.doc.items[0];
        assert_eq!(item.story_id.view(), JsonValue::String("10_story_test".to_string()));
        assert_eq!(item.stage.view(), JsonValue::String("2_current".to_string()));
    }
    // Updating a field register after insert is reflected in the view.
    #[test]
    fn crdt_doc_update_stage() {
        let kp = make_keypair();
        let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);
        let item_json: JsonValue = json!({
            "story_id": "20_story_move",
            "stage": "1_backlog",
            "name": "Move Me",
            "agent": "",
            "retry_count": 0.0,
            "blocked": false,
            "depends_on": "",
        })
        .into();
        let insert_op = crdt.doc.items.insert(ROOT_ID, item_json).sign(&kp);
        crdt.apply(insert_op);
        // Update stage
        let stage_op = crdt.doc.items[0].stage.set("2_current".to_string()).sign(&kp);
        crdt.apply(stage_op);
        assert_eq!(
            crdt.doc.items[0].stage.view(),
            JsonValue::String("2_current".to_string())
        );
    }
    // Replaying the same op sequence on a fresh CRDT reproduces the state.
    #[test]
    fn crdt_ops_replay_reconstructs_state() {
        let kp = make_keypair();
        let mut crdt1 = BaseCrdt::<PipelineDoc>::new(&kp);
        // Build state with a series of ops.
        let item_json: JsonValue = json!({
            "story_id": "30_story_replay",
            "stage": "1_backlog",
            "name": "Replay Test",
            "agent": "",
            "retry_count": 0.0,
            "blocked": false,
            "depends_on": "",
        })
        .into();
        let op1 = crdt1.doc.items.insert(ROOT_ID, item_json).sign(&kp);
        crdt1.apply(op1.clone());
        let op2 = crdt1.doc.items[0].stage.set("2_current".to_string()).sign(&kp);
        crdt1.apply(op2.clone());
        let op3 = crdt1.doc.items[0].name.set("Updated Name".to_string()).sign(&kp);
        crdt1.apply(op3.clone());
        // Replay ops on a fresh CRDT.
        let mut crdt2 = BaseCrdt::<PipelineDoc>::new(&kp);
        crdt2.apply(op1);
        crdt2.apply(op2);
        crdt2.apply(op3);
        assert_eq!(
            crdt1.doc.items[0].stage.view(),
            crdt2.doc.items[0].stage.view()
        );
        assert_eq!(
            crdt1.doc.items[0].name.view(),
            crdt2.doc.items[0].name.view()
        );
    }
    // extract_item_view maps every register, including JSON-encoded deps.
    #[test]
    fn extract_item_view_parses_crdt_item() {
        let kp = make_keypair();
        let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);
        let item_json: JsonValue = json!({
            "story_id": "40_story_view",
            "stage": "3_qa",
            "name": "View Test",
            "agent": "coder-1",
            "retry_count": 2.0,
            "blocked": true,
            "depends_on": "[10,20]",
        })
        .into();
        let op = crdt.doc.items.insert(ROOT_ID, item_json).sign(&kp);
        crdt.apply(op);
        let view = extract_item_view(&crdt.doc.items[0]).unwrap();
        assert_eq!(view.story_id, "40_story_view");
        assert_eq!(view.stage, "3_qa");
        assert_eq!(view.name.as_deref(), Some("View Test"));
        assert_eq!(view.agent.as_deref(), Some("coder-1"));
        assert_eq!(view.retry_count, Some(2));
        assert_eq!(view.blocked, Some(true));
        assert_eq!(view.depends_on, Some(vec![10, 20]));
    }
    // The index covers every inserted story_id.
    #[test]
    fn rebuild_index_maps_story_ids() {
        let kp = make_keypair();
        let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);
        for (sid, stage) in &[("10_story_a", "1_backlog"), ("20_story_b", "2_current")] {
            let item: JsonValue = json!({
                "story_id": sid,
                "stage": stage,
                "name": "",
                "agent": "",
                "retry_count": 0.0,
                "blocked": false,
                "depends_on": "",
            })
            .into();
            let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp);
            crdt.apply(op);
        }
        let index = rebuild_index(&crdt);
        assert_eq!(index.len(), 2);
        assert!(index.contains_key("10_story_a"));
        assert!(index.contains_key("20_story_b"));
    }
    // End-to-end: persist an op to SQLite and rebuild state from the table.
    #[tokio::test]
    async fn init_and_write_read_roundtrip() {
        let tmp = tempfile::tempdir().unwrap();
        let db_path = tmp.path().join("crdt_test.db");
        // Init directly (not via the global singleton, for test isolation).
        let options = SqliteConnectOptions::new()
            .filename(&db_path)
            .create_if_missing(true);
        let pool = SqlitePool::connect_with(options).await.unwrap();
        sqlx::migrate!("./migrations").run(&pool).await.unwrap();
        let keypair = make_keypair();
        let mut crdt = BaseCrdt::<PipelineDoc>::new(&keypair);
        // Insert and update like write_item does.
        let item_json: JsonValue = json!({
            "story_id": "50_story_roundtrip",
            "stage": "1_backlog",
            "name": "Roundtrip",
            "agent": "",
            "retry_count": 0.0,
            "blocked": false,
            "depends_on": "",
        })
        .into();
        let insert_op = crdt.doc.items.insert(ROOT_ID, item_json).sign(&keypair);
        crdt.apply(insert_op.clone());
        // Persist the op.
        let op_json = serde_json::to_string(&insert_op).unwrap();
        let op_id = hex::encode(&insert_op.id());
        let now = chrono::Utc::now().to_rfc3339();
        sqlx::query(
            "INSERT INTO crdt_ops (op_id, seq, op_json, created_at) VALUES (?1, ?2, ?3, ?4)",
        )
        .bind(&op_id)
        .bind(insert_op.inner.seq as i64)
        .bind(&op_json)
        .bind(&now)
        .execute(&pool)
        .await
        .unwrap();
        // Reconstruct from DB.
        let rows: Vec<(String,)> =
            sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY rowid ASC")
                .fetch_all(&pool)
                .await
                .unwrap();
        let mut crdt2 = BaseCrdt::<PipelineDoc>::new(&keypair);
        for (json_str,) in &rows {
            let op: SignedOp = serde_json::from_str(json_str).unwrap();
            crdt2.apply(op);
        }
        let view = extract_item_view(&crdt2.doc.items[0]).unwrap();
        assert_eq!(view.story_id, "50_story_roundtrip");
        assert_eq!(view.stage, "1_backlog");
        assert_eq!(view.name.as_deref(), Some("Roundtrip"));
    }
    // JSON round-trip preserves op identity and seq.
    #[test]
    fn signed_op_serialization_roundtrip() {
        let kp = make_keypair();
        let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);
        let item: JsonValue = json!({
            "story_id": "60_story_serde",
            "stage": "1_backlog",
            "name": "Serde Test",
            "agent": "",
            "retry_count": 0.0,
            "blocked": false,
            "depends_on": "",
        })
        .into();
        let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp);
        let json_str = serde_json::to_string(&op).unwrap();
        let deserialized: SignedOp = serde_json::from_str(&json_str).unwrap();
        assert_eq!(op.id(), deserialized.id());
        assert_eq!(op.inner.seq, deserialized.inner.seq);
    }
    // ── CrdtEvent tests ─────────────────────────────────────────────────
    #[test]
    fn crdt_event_has_expected_fields() {
        let evt = CrdtEvent {
            story_id: "42_story_foo".to_string(),
            from_stage: Some("1_backlog".to_string()),
            to_stage: "2_current".to_string(),
            name: Some("Foo Feature".to_string()),
        };
        assert_eq!(evt.story_id, "42_story_foo");
        assert_eq!(evt.from_stage.as_deref(), Some("1_backlog"));
        assert_eq!(evt.to_stage, "2_current");
        assert_eq!(evt.name.as_deref(), Some("Foo Feature"));
    }
    #[test]
    fn crdt_event_clone_preserves_data() {
        let evt = CrdtEvent {
            story_id: "10_story_bar".to_string(),
            from_stage: None,
            to_stage: "1_backlog".to_string(),
            name: None,
        };
        let cloned = evt.clone();
        assert_eq!(cloned.story_id, "10_story_bar");
        assert!(cloned.from_stage.is_none());
        assert!(cloned.name.is_none());
    }
    #[test]
    fn emit_event_is_noop_when_channel_not_initialised() {
        // Before CRDT_EVENT_TX is set, emit_event should not panic.
        // This test verifies the guard clause works. In test binaries the
        // OnceLock may already be set by another test, so we just verify
        // the function doesn't panic regardless.
        emit_event(CrdtEvent {
            story_id: "99_story_noop".to_string(),
            from_stage: None,
            to_stage: "1_backlog".to_string(),
            name: None,
        });
    }
    #[test]
    fn crdt_event_broadcast_channel_round_trip() {
        let (tx, mut rx) = broadcast::channel::<CrdtEvent>(16);
        let evt = CrdtEvent {
            story_id: "70_story_broadcast".to_string(),
            from_stage: Some("1_backlog".to_string()),
            to_stage: "2_current".to_string(),
            name: Some("Broadcast Test".to_string()),
        };
        tx.send(evt).unwrap();
        let received = rx.try_recv().unwrap();
        assert_eq!(received.story_id, "70_story_broadcast");
        assert_eq!(received.from_stage.as_deref(), Some("1_backlog"));
        assert_eq!(received.to_stage, "2_current");
        assert_eq!(received.name.as_deref(), Some("Broadcast Test"));
    }
    #[test]
    fn dep_is_done_crdt_returns_false_when_no_crdt_state() {
        // When the global CRDT state is not initialised (or in a test environment),
        // dep_is_done_crdt should return false rather than panicking.
        // Note: in the test binary the global may or may not be initialised,
        // but the function should never panic either way.
        let _ = dep_is_done_crdt(9999);
    }
    #[test]
    fn check_unmet_deps_crdt_returns_empty_when_item_not_found() {
        // Non-existent story should return empty deps.
        let result = check_unmet_deps_crdt("nonexistent_story");
        assert!(result.is_empty());
    }
    // ── Bug 503: archived-dep visibility ─────────────────────────────────────
    #[test]
    fn dep_is_archived_crdt_returns_false_when_no_crdt_state() {
        // When the global CRDT state is not initialised, must not panic.
        let _ = dep_is_archived_crdt(9998);
    }
    #[test]
    fn check_archived_deps_crdt_returns_empty_when_item_not_found() {
        // Non-existent story should return empty archived deps.
        let result = check_archived_deps_crdt("nonexistent_story_archived");
        assert!(result.is_empty());
    }
    // ── 478: WebSocket CRDT sync layer tests ────────────────────────────────
    #[test]
    fn apply_remote_op_returns_false_when_not_initialised() {
        // Without the global CRDT state, apply_remote_op should return false.
        let kp = make_keypair();
        let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);
        let item: JsonValue = serde_json::json!({
            "story_id": "80_story_remote",
            "stage": "1_backlog",
            "name": "Remote",
            "agent": "",
            "retry_count": 0.0,
            "blocked": false,
            "depends_on": "",
        })
        .into();
        let op = crdt.doc.items.insert(bft_json_crdt::op::ROOT_ID, item).sign(&kp);
        // This uses the global state which may not be initialised in tests.
        let _ = apply_remote_op(op);
    }
    #[test]
    fn signed_op_survives_sync_serialization_roundtrip() {
        // Verify that a SignedOp serialised to JSON and back produces
        // the same op (critical for the sync wire protocol).
        let kp = make_keypair();
        let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);
        let item: JsonValue = serde_json::json!({
            "story_id": "90_story_wire",
            "stage": "2_current",
            "name": "Wire Test",
            "agent": "coder",
            "retry_count": 1.0,
            "blocked": false,
            "depends_on": "[10]",
        })
        .into();
        let op = crdt.doc.items.insert(bft_json_crdt::op::ROOT_ID, item).sign(&kp);
        let json1 = serde_json::to_string(&op).unwrap();
        let roundtripped: SignedOp = serde_json::from_str(&json1).unwrap();
        let json2 = serde_json::to_string(&roundtripped).unwrap();
        assert_eq!(json1, json2);
        assert_eq!(op.id(), roundtripped.id());
        assert_eq!(op.inner.seq, roundtripped.inner.seq);
        assert_eq!(op.author(), roundtripped.author());
    }
    #[test]
    fn sync_broadcast_channel_round_trip() {
        let (tx, mut rx) = broadcast::channel::<SignedOp>(16);
        let kp = make_keypair();
        let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);
        let item: JsonValue = serde_json::json!({
            "story_id": "95_story_sync_bcast",
            "stage": "1_backlog",
            "name": "",
            "agent": "",
            "retry_count": 0.0,
            "blocked": false,
            "depends_on": "",
        })
        .into();
        let op = crdt.doc.items.insert(bft_json_crdt::op::ROOT_ID, item).sign(&kp);
        tx.send(op.clone()).unwrap();
        let received = rx.try_recv().unwrap();
        assert_eq!(received.id(), op.id());
    }
    // ── Bug 511: CRDT lamport clock resets on restart ────────────────────────
    //
    // Root cause: Op::sign() always produces SignedOp with depends_on = vec![],
    // so the causal dependency queue never engages during replay. Field update
    // ops (seq=1,2,3 from each field's LwwRegisterCrdt counter) are replayed
    // before list insert ops (seq=N from the items ListCrdt counter) when
    // ordered by `seq ASC`. They fail ErrPathMismatch silently, their our_seq
    // is never updated, and the next field write re-uses seq=1.
    //
    // Fix: replay by `rowid ASC` (SQLite insertion order) instead of `seq ASC`.
    // Rowid preserves the causal order ops were originally applied in, so field
    // updates always come after the item insert they reference.
    #[tokio::test]
    async fn bug_511_rowid_replay_preserves_field_update_after_list_insert() {
        let tmp = tempfile::tempdir().unwrap();
        let db_path = tmp.path().join("bug511.db");
        let options = SqliteConnectOptions::new()
            .filename(&db_path)
            .create_if_missing(true);
        let pool = SqlitePool::connect_with(options).await.unwrap();
        sqlx::migrate!("./migrations").run(&pool).await.unwrap();
        let kp = make_keypair();
        let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);
        // Insert 5 dummy items to advance items.our_seq to 5.
        for i in 0..5u32 {
            let sid = format!("{}_story_warmup", i);
            let item: JsonValue = json!({
                "story_id": sid,
                "stage": "1_backlog",
                "name": "",
                "agent": "",
                "retry_count": 0.0,
                "blocked": false,
                "depends_on": "",
            })
            .into();
            let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp);
            crdt.apply(op.clone());
            // We don't persist these to the DB — they are pre-history.
        }
        // Now insert the real item. items.our_seq was 5, so this op gets seq=6.
        let target_item: JsonValue = json!({
            "story_id": "511_story_target",
            "stage": "1_backlog",
            "name": "Bug 511 target",
            "agent": "",
            "retry_count": 0.0,
            "blocked": false,
            "depends_on": "",
        })
        .into();
        let insert_op = crdt.doc.items.insert(ROOT_ID, target_item).sign(&kp);
        crdt.apply(insert_op.clone());
        // insert_op.inner.seq == 6
        // Now update the stage. The stage LwwRegisterCrdt for this item starts
        // at our_seq=0, so this field op gets seq=1. Crucially: seq=1 < seq=6.
        let idx = rebuild_index(&crdt)["511_story_target"];
        let stage_op = crdt.doc.items[idx].stage.set("2_current".to_string()).sign(&kp);
        crdt.apply(stage_op.clone());
        // stage_op.inner.seq == 1
        // Persist BOTH ops in causal order (insert first, update second).
        // This means insert_op gets rowid < stage_op rowid.
        let now = chrono::Utc::now().to_rfc3339();
        for op in [&insert_op, &stage_op] {
            let op_json = serde_json::to_string(op).unwrap();
            let op_id = hex::encode(&op.id());
            sqlx::query(
                "INSERT INTO crdt_ops (op_id, seq, op_json, created_at) VALUES (?1, ?2, ?3, ?4)",
            )
            .bind(&op_id)
            .bind(op.inner.seq as i64)
            .bind(&op_json)
            .bind(&now)
            .execute(&pool)
            .await
            .unwrap();
        }
        // Replay by rowid ASC (the fix). The insert must come before the field
        // update regardless of their field-level seq values.
        let rows: Vec<(String,)> =
            sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY rowid ASC")
                .fetch_all(&pool)
                .await
                .unwrap();
        let mut crdt2 = BaseCrdt::<PipelineDoc>::new(&kp);
        for (json_str,) in &rows {
            let op: SignedOp = serde_json::from_str(json_str).unwrap();
            crdt2.apply(op);
        }
        // The item must be in the CRDT and must reflect the stage update.
        let index2 = rebuild_index(&crdt2);
        assert!(
            index2.contains_key("511_story_target"),
            "item not found after rowid-order replay"
        );
        let idx2 = index2["511_story_target"];
        let view = extract_item_view(&crdt2.doc.items[idx2]).unwrap();
        assert_eq!(
            view.stage, "2_current",
            "stage field update lost during replay (bug 511 regression)"
        );
        // Confirm the bug is reproducible by replaying seq ASC instead.
        // With seq ASC the stage_op (seq=1) arrives before insert_op (seq=6),
        // fails ErrPathMismatch, and the item ends up at "1_backlog".
        let rows_wrong_order: Vec<(String,)> =
            sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY seq ASC")
                .fetch_all(&pool)
                .await
                .unwrap();
        let mut crdt3 = BaseCrdt::<PipelineDoc>::new(&kp);
        for (json_str,) in &rows_wrong_order {
            let op: SignedOp = serde_json::from_str(json_str).unwrap();
            crdt3.apply(op);
        }
        let index3 = rebuild_index(&crdt3);
        // With seq ASC replay, the item is created (insert_op eventually runs)
        // but the stage update is lost (it ran before the item existed).
        if let Some(idx3) = index3.get("511_story_target") {
            let view3 = extract_item_view(&crdt3.doc.items[*idx3]).unwrap();
            // The bug: stage is still "1_backlog" because the update was dropped.
            assert_eq!(
                view3.stage, "1_backlog",
                "expected seq-ASC replay to exhibit the bug (update lost)"
            );
        }
    }
}