fix(511): replay CRDT ops by rowid ASC instead of seq ASC

The CRDT lamport seq is per-author and per-field, not globally
monotonic. Replaying by `seq ASC` causes field-update ops (which
have low per-field seq counters like 1, 2, 3) to be applied
BEFORE the list-insert ops they reference (which have higher
per-list seq counters like N for the Nth item ever inserted).
The field updates fail with ErrPathMismatch because the target
item doesn't exist yet, the field counter is never advanced,
and subsequent writes silently lose state.

Concretely on 2026-04-09 we observed: post-restart writes were
being persisted at seq=1,2,3,4,5,6,7 even though pre-restart
seq had reached 492. On the next replay, those low-seq field
updates would be applied before their seq=485+ creation ops,
silently dropping the updates. This was the load-bearing
"why does state keep flapping" bug today.

Fix: replay by `rowid ASC` (SQLite insertion order) instead.
Rowid preserves the causal order ops were originally applied
in, so field updates always come after the item insert they
reference.

Adds a regression test that constructs the exact scenario:
inserts a story (op gets seq=6), updates its stage (op gets
seq=1 because the field counter starts at 0), persists both ops
in causal order, then replays them in both orders: seq ASC
(reproduces the bug — stage update is lost) and rowid ASC (the
fix — stage update is preserved).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Timmy
2026-04-09 21:02:01 +01:00
parent 5765fb57be
commit 995576358f
+141 -2
View File
@@ -143,7 +143,7 @@ pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> {
// Replay persisted ops to reconstruct state. // Replay persisted ops to reconstruct state.
let rows: Vec<(String,)> = let rows: Vec<(String,)> =
sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY seq ASC") sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY rowid ASC")
.fetch_all(&pool) .fetch_all(&pool)
.await?; .await?;
@@ -828,7 +828,7 @@ mod tests {
// Reconstruct from DB. // Reconstruct from DB.
let rows: Vec<(String,)> = let rows: Vec<(String,)> =
sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY seq ASC") sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY rowid ASC")
.fetch_all(&pool) .fetch_all(&pool)
.await .await
.unwrap(); .unwrap();
@@ -1033,4 +1033,143 @@ mod tests {
let received = rx.try_recv().unwrap(); let received = rx.try_recv().unwrap();
assert_eq!(received.id(), op.id()); assert_eq!(received.id(), op.id());
} }
// ── Bug 511: CRDT lamport clock resets on restart ────────────────────────
//
// Root cause: Op::sign() always produces SignedOp with depends_on = vec![],
// so the causal dependency queue never engages during replay. Field update
// ops (seq=1,2,3 from each field's LwwRegisterCrdt counter) are replayed
// before list insert ops (seq=N from the items ListCrdt counter) when
// ordered by `seq ASC`. They fail ErrPathMismatch silently, their our_seq
// is never updated, and the next field write re-uses seq=1.
//
// Fix: replay by `rowid ASC` (SQLite insertion order) instead of `seq ASC`.
// Rowid preserves the causal order ops were originally applied in, so field
// updates always come after the item insert they reference.
#[tokio::test]
async fn bug_511_rowid_replay_preserves_field_update_after_list_insert() {
let tmp = tempfile::tempdir().unwrap();
let db_path = tmp.path().join("bug511.db");
let options = SqliteConnectOptions::new()
.filename(&db_path)
.create_if_missing(true);
let pool = SqlitePool::connect_with(options).await.unwrap();
sqlx::migrate!("./migrations").run(&pool).await.unwrap();
let kp = make_keypair();
let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);
// Insert 5 dummy items to advance items.our_seq to 5.
for i in 0..5u32 {
let sid = format!("{}_story_warmup", i);
let item: JsonValue = json!({
"story_id": sid,
"stage": "1_backlog",
"name": "",
"agent": "",
"retry_count": 0.0,
"blocked": false,
"depends_on": "",
})
.into();
let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp);
crdt.apply(op.clone());
// We don't persist these to the DB — they are pre-history.
}
// Now insert the real item. items.our_seq was 5, so this op gets seq=6.
let target_item: JsonValue = json!({
"story_id": "511_story_target",
"stage": "1_backlog",
"name": "Bug 511 target",
"agent": "",
"retry_count": 0.0,
"blocked": false,
"depends_on": "",
})
.into();
let insert_op = crdt.doc.items.insert(ROOT_ID, target_item).sign(&kp);
crdt.apply(insert_op.clone());
// insert_op.inner.seq == 6
// Now update the stage. The stage LwwRegisterCrdt for this item starts
// at our_seq=0, so this field op gets seq=1. Crucially: seq=1 < seq=6.
let idx = rebuild_index(&crdt)["511_story_target"];
let stage_op = crdt.doc.items[idx].stage.set("2_current".to_string()).sign(&kp);
crdt.apply(stage_op.clone());
// stage_op.inner.seq == 1
// Persist BOTH ops in causal order (insert first, update second).
// This means insert_op gets rowid < stage_op rowid.
let now = chrono::Utc::now().to_rfc3339();
for op in [&insert_op, &stage_op] {
let op_json = serde_json::to_string(op).unwrap();
let op_id = hex::encode(&op.id());
sqlx::query(
"INSERT INTO crdt_ops (op_id, seq, op_json, created_at) VALUES (?1, ?2, ?3, ?4)",
)
.bind(&op_id)
.bind(op.inner.seq as i64)
.bind(&op_json)
.bind(&now)
.execute(&pool)
.await
.unwrap();
}
// Replay by rowid ASC (the fix). The insert must come before the field
// update regardless of their field-level seq values.
let rows: Vec<(String,)> =
sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY rowid ASC")
.fetch_all(&pool)
.await
.unwrap();
let mut crdt2 = BaseCrdt::<PipelineDoc>::new(&kp);
for (json_str,) in &rows {
let op: SignedOp = serde_json::from_str(json_str).unwrap();
crdt2.apply(op);
}
// The item must be in the CRDT and must reflect the stage update.
let index2 = rebuild_index(&crdt2);
assert!(
index2.contains_key("511_story_target"),
"item not found after rowid-order replay"
);
let idx2 = index2["511_story_target"];
let view = extract_item_view(&crdt2.doc.items[idx2]).unwrap();
assert_eq!(
view.stage, "2_current",
"stage field update lost during replay (bug 511 regression)"
);
// Confirm the bug is reproducible by replaying seq ASC instead.
// With seq ASC the stage_op (seq=1) arrives before insert_op (seq=6),
// fails ErrPathMismatch, and the item ends up at "1_backlog".
let rows_wrong_order: Vec<(String,)> =
sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY seq ASC")
.fetch_all(&pool)
.await
.unwrap();
let mut crdt3 = BaseCrdt::<PipelineDoc>::new(&kp);
for (json_str,) in &rows_wrong_order {
let op: SignedOp = serde_json::from_str(json_str).unwrap();
crdt3.apply(op);
}
let index3 = rebuild_index(&crdt3);
// With seq ASC replay, the item is created (insert_op eventually runs)
// but the stage update is lost (it ran before the item existed).
if let Some(idx3) = index3.get("511_story_target") {
let view3 = extract_item_view(&crdt3.doc.items[*idx3]).unwrap();
// The bug: stage is still "1_backlog" because the update was dropped.
assert_eq!(
view3.stage, "1_backlog",
"expected seq-ASC replay to exhibit the bug (update lost)"
);
}
}
} }