From 995576358f3f4ff8e107db703473b182f5195aa7 Mon Sep 17 00:00:00 2001 From: Timmy Date: Thu, 9 Apr 2026 21:02:01 +0100 Subject: [PATCH] fix(511): replay CRDT ops by rowid ASC instead of seq ASC MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The CRDT lamport seq is per-author and per-field, not globally monotonic. Replaying by `seq ASC` causes field-update ops (which have low per-field seq counters like 1, 2, 3) to be applied BEFORE the list-insert ops they reference (which have higher per-list seq counters like N for the Nth item ever inserted). The field updates fail with ErrPathMismatch because the target item doesn't exist yet, the field counter is never advanced, and subsequent writes silently lose state. Concretely on 2026-04-09 we observed: post-restart writes were being persisted at seq=1,2,3,4,5,6,7 even though pre-restart seq had reached 492. On the next replay, those low-seq field updates would be applied before their seq=485+ creation ops, silently dropping the updates. This was the load-bearing "why does state keep flapping" bug today. Fix: replay by `rowid ASC` (SQLite insertion order) instead. Rowid preserves the causal order ops were originally applied in, so field updates always come after the item insert they reference. Adds a regression test that constructs the exact scenario: inserts a story (op gets seq=6), updates its stage (op gets seq=1 because field counter starts at 0), persists both ops in causal order, then replays both seq ASC (reproduces the bug — stage update is lost) and rowid ASC (the fix — stage update is preserved). Co-Authored-By: Claude Opus 4.6 (1M context) --- server/src/crdt_state.rs | 143 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 141 insertions(+), 2 deletions(-) diff --git a/server/src/crdt_state.rs b/server/src/crdt_state.rs index d3f780db..6b4ba526 100644 --- a/server/src/crdt_state.rs +++ b/server/src/crdt_state.rs @@ -143,7 +143,7 @@ pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> { // Replay persisted ops to reconstruct state. let rows: Vec<(String,)> = - sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY seq ASC") + sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY rowid ASC") .fetch_all(&pool) .await?; @@ -828,7 +828,7 @@ mod tests { // Reconstruct from DB. let rows: Vec<(String,)> = - sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY seq ASC") + sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY rowid ASC") .fetch_all(&pool) .await .unwrap(); @@ -1033,4 +1033,143 @@ mod tests { let received = rx.try_recv().unwrap(); assert_eq!(received.id(), op.id()); } + + // ── Bug 511: CRDT lamport clock resets on restart ──────────────────────── + // + // Root cause: Op::sign() always produces SignedOp with depends_on = vec![], + // so the causal dependency queue never engages during replay. Field update + // ops (seq=1,2,3 from each field's LwwRegisterCrdt counter) are replayed + // before list insert ops (seq=N from the items ListCrdt counter) when + // ordered by `seq ASC`. They fail ErrPathMismatch silently, their our_seq + // is never updated, and the next field write re-uses seq=1. + // + // Fix: replay by `rowid ASC` (SQLite insertion order) instead of `seq ASC`. + // Rowid preserves the causal order ops were originally applied in, so field + // updates always come after the item insert they reference. + #[tokio::test] + async fn bug_511_rowid_replay_preserves_field_update_after_list_insert() { + let tmp = tempfile::tempdir().unwrap(); + let db_path = tmp.path().join("bug511.db"); + + let options = SqliteConnectOptions::new() + .filename(&db_path) + .create_if_missing(true); + let pool = SqlitePool::connect_with(options).await.unwrap(); + sqlx::migrate!("./migrations").run(&pool).await.unwrap(); + + let kp = make_keypair(); + let mut crdt = BaseCrdt::::new(&kp); + + // Insert 5 dummy items to advance items.our_seq to 5. + for i in 0..5u32 { + let sid = format!("{}_story_warmup", i); + let item: JsonValue = json!({ + "story_id": sid, + "stage": "1_backlog", + "name": "", + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + }) + .into(); + let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp); + crdt.apply(op.clone()); + // We don't persist these to the DB — they are pre-history. + } + + // Now insert the real item. items.our_seq was 5, so this op gets seq=6. + let target_item: JsonValue = json!({ + "story_id": "511_story_target", + "stage": "1_backlog", + "name": "Bug 511 target", + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + }) + .into(); + let insert_op = crdt.doc.items.insert(ROOT_ID, target_item).sign(&kp); + crdt.apply(insert_op.clone()); + // insert_op.inner.seq == 6 + + // Now update the stage. The stage LwwRegisterCrdt for this item starts + // at our_seq=0, so this field op gets seq=1. Crucially: seq=1 < seq=6. + let idx = rebuild_index(&crdt)["511_story_target"]; + let stage_op = crdt.doc.items[idx].stage.set("2_current".to_string()).sign(&kp); + crdt.apply(stage_op.clone()); + // stage_op.inner.seq == 1 + + // Persist BOTH ops in causal order (insert first, update second). + // This means insert_op gets rowid < stage_op rowid. + let now = chrono::Utc::now().to_rfc3339(); + for op in [&insert_op, &stage_op] { + let op_json = serde_json::to_string(op).unwrap(); + let op_id = hex::encode(&op.id()); + sqlx::query( + "INSERT INTO crdt_ops (op_id, seq, op_json, created_at) VALUES (?1, ?2, ?3, ?4)", + ) + .bind(&op_id) + .bind(op.inner.seq as i64) + .bind(&op_json) + .bind(&now) + .execute(&pool) + .await + .unwrap(); + } + + // Replay by rowid ASC (the fix). The insert must come before the field + // update regardless of their field-level seq values. + let rows: Vec<(String,)> = + sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY rowid ASC") + .fetch_all(&pool) + .await + .unwrap(); + + let mut crdt2 = BaseCrdt::::new(&kp); + for (json_str,) in &rows { + let op: SignedOp = serde_json::from_str(json_str).unwrap(); + crdt2.apply(op); + } + + // The item must be in the CRDT and must reflect the stage update. + let index2 = rebuild_index(&crdt2); + assert!( + index2.contains_key("511_story_target"), + "item not found after rowid-order replay" + ); + let idx2 = index2["511_story_target"]; + let view = extract_item_view(&crdt2.doc.items[idx2]).unwrap(); + assert_eq!( + view.stage, "2_current", + "stage field update lost during replay (bug 511 regression)" + ); + + // Confirm the bug is reproducible by replaying seq ASC instead. + // With seq ASC the stage_op (seq=1) arrives before insert_op (seq=6), + // fails ErrPathMismatch, and the item ends up at "1_backlog". + let rows_wrong_order: Vec<(String,)> = + sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY seq ASC") + .fetch_all(&pool) + .await + .unwrap(); + + let mut crdt3 = BaseCrdt::::new(&kp); + for (json_str,) in &rows_wrong_order { + let op: SignedOp = serde_json::from_str(json_str).unwrap(); + crdt3.apply(op); + } + + let index3 = rebuild_index(&crdt3); + // With seq ASC replay, the item is created (insert_op eventually runs) + // but the stage update is lost (it ran before the item existed). + if let Some(idx3) = index3.get("511_story_target") { + let view3 = extract_item_view(&crdt3.doc.items[*idx3]).unwrap(); + // The bug: stage is still "1_backlog" because the update was dropped. + assert_eq!( + view3.stage, "1_backlog", + "expected seq-ASC replay to exhibit the bug (update lost)" + ); + } + } }