diff --git a/server/src/crdt_state.rs b/server/src/crdt_state.rs index d3f780db..6b4ba526 100644 --- a/server/src/crdt_state.rs +++ b/server/src/crdt_state.rs @@ -143,7 +143,7 @@ pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> { // Replay persisted ops to reconstruct state. let rows: Vec<(String,)> = - sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY seq ASC") + sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY rowid ASC") .fetch_all(&pool) .await?; @@ -828,7 +828,7 @@ mod tests { // Reconstruct from DB. let rows: Vec<(String,)> = - sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY seq ASC") + sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY rowid ASC") .fetch_all(&pool) .await .unwrap(); @@ -1033,4 +1033,143 @@ mod tests { let received = rx.try_recv().unwrap(); assert_eq!(received.id(), op.id()); } + + // ── Bug 511: CRDT lamport clock resets on restart ──────────────────────── + // + // Root cause: Op::sign() always produces SignedOp with depends_on = vec![], + // so the causal dependency queue never engages during replay. Field update + // ops (seq=1,2,3 from each field's LwwRegisterCrdt counter) are replayed + // before list insert ops (seq=N from the items ListCrdt counter) when + // ordered by `seq ASC`. They fail ErrPathMismatch silently, their our_seq + // is never updated, and the next field write re-uses seq=1. + // + // Fix: replay by `rowid ASC` (SQLite insertion order) instead of `seq ASC`. + // Rowid preserves the causal order ops were originally applied in, so field + // updates always come after the item insert they reference. + #[tokio::test] + async fn bug_511_rowid_replay_preserves_field_update_after_list_insert() { + let tmp = tempfile::tempdir().unwrap(); + let db_path = tmp.path().join("bug511.db"); + + let options = SqliteConnectOptions::new() + .filename(&db_path) + .create_if_missing(true); + let pool = SqlitePool::connect_with(options).await.unwrap(); + sqlx::migrate!("./migrations").run(&pool).await.unwrap(); + + let kp = make_keypair(); + let mut crdt = BaseCrdt::::new(&kp); + + // Insert 5 dummy items to advance items.our_seq to 5. + for i in 0..5u32 { + let sid = format!("{}_story_warmup", i); + let item: JsonValue = json!({ + "story_id": sid, + "stage": "1_backlog", + "name": "", + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + }) + .into(); + let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp); + crdt.apply(op.clone()); + // We don't persist these to the DB — they are pre-history. + } + + // Now insert the real item. items.our_seq was 5, so this op gets seq=6. + let target_item: JsonValue = json!({ + "story_id": "511_story_target", + "stage": "1_backlog", + "name": "Bug 511 target", + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + }) + .into(); + let insert_op = crdt.doc.items.insert(ROOT_ID, target_item).sign(&kp); + crdt.apply(insert_op.clone()); + // insert_op.inner.seq == 6 + + // Now update the stage. The stage LwwRegisterCrdt for this item starts + // at our_seq=0, so this field op gets seq=1. Crucially: seq=1 < seq=6. + let idx = rebuild_index(&crdt)["511_story_target"]; + let stage_op = crdt.doc.items[idx].stage.set("2_current".to_string()).sign(&kp); + crdt.apply(stage_op.clone()); + // stage_op.inner.seq == 1 + + // Persist BOTH ops in causal order (insert first, update second). + // This means insert_op gets rowid < stage_op rowid. + let now = chrono::Utc::now().to_rfc3339(); + for op in [&insert_op, &stage_op] { + let op_json = serde_json::to_string(op).unwrap(); + let op_id = hex::encode(&op.id()); + sqlx::query( + "INSERT INTO crdt_ops (op_id, seq, op_json, created_at) VALUES (?1, ?2, ?3, ?4)", + ) + .bind(&op_id) + .bind(op.inner.seq as i64) + .bind(&op_json) + .bind(&now) + .execute(&pool) + .await + .unwrap(); + } + + // Replay by rowid ASC (the fix). The insert must come before the field + // update regardless of their field-level seq values. + let rows: Vec<(String,)> = + sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY rowid ASC") + .fetch_all(&pool) + .await + .unwrap(); + + let mut crdt2 = BaseCrdt::::new(&kp); + for (json_str,) in &rows { + let op: SignedOp = serde_json::from_str(json_str).unwrap(); + crdt2.apply(op); + } + + // The item must be in the CRDT and must reflect the stage update. + let index2 = rebuild_index(&crdt2); + assert!( + index2.contains_key("511_story_target"), + "item not found after rowid-order replay" + ); + let idx2 = index2["511_story_target"]; + let view = extract_item_view(&crdt2.doc.items[idx2]).unwrap(); + assert_eq!( + view.stage, "2_current", + "stage field update lost during replay (bug 511 regression)" + ); + + // Confirm the bug is reproducible by replaying seq ASC instead. + // With seq ASC the stage_op (seq=1) arrives before insert_op (seq=6), + // fails ErrPathMismatch, and the item ends up at "1_backlog". + let rows_wrong_order: Vec<(String,)> = + sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY seq ASC") + .fetch_all(&pool) + .await + .unwrap(); + + let mut crdt3 = BaseCrdt::::new(&kp); + for (json_str,) in &rows_wrong_order { + let op: SignedOp = serde_json::from_str(json_str).unwrap(); + crdt3.apply(op); + } + + let index3 = rebuild_index(&crdt3); + // With seq ASC replay, the item is created (insert_op eventually runs) + // but the stage update is lost (it ran before the item existed). + if let Some(idx3) = index3.get("511_story_target") { + let view3 = extract_item_view(&crdt3.doc.items[*idx3]).unwrap(); + // The bug: stage is still "1_backlog" because the update was dropped. + assert_eq!( + view3.stage, "1_backlog", + "expected seq-ASC replay to exhibit the bug (update lost)" + ); + } + } }