diff --git a/crates/bft-json-crdt/src/list_crdt.rs b/crates/bft-json-crdt/src/list_crdt.rs index d130f0be..e56cbfd1 100644 --- a/crates/bft-json-crdt/src/list_crdt.rs +++ b/crates/bft-json-crdt/src/list_crdt.rs @@ -47,6 +47,21 @@ where } } + /// Returns the current Lamport sequence number for this list. + pub fn our_seq(&self) -> SequenceNumber { + self.our_seq + } + + /// Advance the internal sequence counter to at least `seq`. + /// + /// After `advance_seq(n)`, the next local op will carry `seq = max(our_seq, n) + 1` + /// instead of the default `1`. Used on restart to resume the Lamport clock + /// from the document-wide floor so that newly-created registers don't + /// re-emit low sequence numbers. + pub fn advance_seq(&mut self, seq: SequenceNumber) { + self.our_seq = max(self.our_seq, seq); + } + /// Locally insert some content causally after the given operation pub fn insert>(&mut self, after: OpId, content: U) -> Op { let mut op = Op::new( @@ -365,6 +380,18 @@ mod test { assert_eq!(list.view(), vec![1, 4, 2, 3]); } + #[test] + fn test_advance_seq_resumes_from_floor() { + let mut list = ListCrdt::::new(make_author(1), vec![]); + list.advance_seq(100); + assert_eq!(list.our_seq(), 100); + let op = list.insert(ROOT_ID, 42); + assert_eq!( + op.seq, 101, + "first op after advance_seq(100) must have seq=101" + ); + } + #[test] fn test_list_idempotence() { let mut list = ListCrdt::::new(make_author(1), vec![]); diff --git a/crates/bft-json-crdt/src/lww_crdt.rs b/crates/bft-json-crdt/src/lww_crdt.rs index e8a39860..6d4e30ba 100644 --- a/crates/bft-json-crdt/src/lww_crdt.rs +++ b/crates/bft-json-crdt/src/lww_crdt.rs @@ -37,6 +37,21 @@ where } } + /// Returns the current Lamport sequence number for this register. + pub fn our_seq(&self) -> SequenceNumber { + self.our_seq + } + + /// Advance the internal sequence counter to at least `seq`. + /// + /// After `advance_seq(n)`, the next local op will carry `seq = max(our_seq, n) + 1` + /// instead of the default `1`. Used on restart to resume the Lamport clock + /// from the document-wide floor so that newly-created registers don't + /// re-emit low sequence numbers. + pub fn advance_seq(&mut self, seq: SequenceNumber) { + self.our_seq = max(self.our_seq, seq); + } + /// Sets the current value of the register pub fn set>(&mut self, content: U) -> Op { let mut op = Op::new( @@ -174,6 +189,18 @@ mod test { assert_eq!(register.view(), Some(1)); } + #[test] + fn test_advance_seq_resumes_from_floor() { + let mut register = LwwRegisterCrdt::::new(make_author(1), vec![]); + register.advance_seq(100); + assert_eq!(register.our_seq(), 100); + let op = register.set(42); + assert_eq!( + op.seq, 101, + "first op after advance_seq(100) must have seq=101" + ); + } + #[test] fn test_lww_consistent_tiebreak() { let mut register1 = LwwRegisterCrdt::new(make_author(1), vec![]); diff --git a/server/src/crdt_state/presence.rs b/server/src/crdt_state/presence.rs index ac30b348..811f431f 100644 --- a/server/src/crdt_state/presence.rs +++ b/server/src/crdt_state/presence.rs @@ -134,6 +134,19 @@ pub fn write_node_presence(node_id: &str, address: &str, last_seen: f64, alive: // Rebuild node index after insertion. state.node_index = rebuild_node_index(&state.crdt); + + // Advance the inner registers of the newly-created node to the Lamport + // floor so their first local ops don't re-emit low sequence numbers. + let floor = state.lamport_floor; + if floor > 0 + && let Some(&idx) = state.node_index.get(node_id) + { + let node = &mut state.crdt.doc.nodes[idx]; + node.node_id.advance_seq(floor); + node.address.advance_seq(floor); + node.last_seen.advance_seq(floor); + node.alive.advance_seq(floor); + } } } diff --git a/server/src/crdt_state/state.rs b/server/src/crdt_state/state.rs index bf03fcc5..29e53f6a 100644 --- a/server/src/crdt_state/state.rs +++ b/server/src/crdt_state/state.rs @@ -63,6 +63,11 @@ pub(super) struct CrdtState { pub(super) node_index: HashMap, /// Channel sender for fire-and-forget op persistence. pub(super) persist_tx: mpsc::UnboundedSender, + /// Max sequence number seen across all ops during init() replay. + /// + /// Newly-created registers (post-init) must have their Lamport clock + /// advanced to this floor so they don't re-emit low sequence numbers. + pub(super) lamport_floor: u64, } static CRDT_STATE: OnceLock> = OnceLock::new(); @@ -123,10 +128,12 @@ pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> { let mut all_ops_vec = Vec::with_capacity(rows.len()); let mut vector_clock = VectorClock::new(); + let mut lamport_floor: u64 = 0; for (op_json,) in &rows { if let Ok(signed_op) = serde_json::from_str::(op_json) { let author_hex = hex::encode(&signed_op.author()); *vector_clock.entry(author_hex).or_insert(0) += 1; + lamport_floor = lamport_floor.max(signed_op.inner.seq); crdt.apply(signed_op); all_ops_vec.push(op_json.clone()); } else { @@ -140,11 +147,17 @@ pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> { let index = rebuild_index(&crdt); let node_index = rebuild_node_index(&crdt); + // Advance the top-level list clocks to the Lamport floor so that + // list-level inserts (new items / new nodes) don't re-emit low seq numbers. + crdt.doc.items.advance_seq(lamport_floor); + crdt.doc.nodes.advance_seq(lamport_floor); + slog!( - "[crdt] Initialised: {} ops replayed, {} items indexed, {} nodes indexed", + "[crdt] Initialised: {} ops replayed, {} items indexed, {} nodes indexed, lamport_floor={}", rows.len(), index.len(), - node_index.len() + node_index.len(), + lamport_floor, ); // Spawn background persistence task. @@ -187,6 +200,7 @@ pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> { index, node_index, persist_tx, + lamport_floor, }; let _ = CRDT_STATE.set(Mutex::new(state)); @@ -224,6 +238,7 @@ pub fn init_for_test() { index: HashMap::new(), node_index: HashMap::new(), persist_tx, + lamport_floor: 0, }; let _ = lock.set(Mutex::new(state)); } @@ -484,6 +499,7 @@ mod tests { index: HashMap::new(), node_index: HashMap::new(), persist_tx, + lamport_floor: 0, }; // Drop the receiver so that the next send fails immediately. @@ -531,4 +547,140 @@ mod tests { last_error.message ); } + + /// After replaying ops from a journal, a brand-new register created + /// post-init must emit its first local op with seq = lamport_floor + 1, + /// not seq = 1. This is the Phase C integration test. + #[tokio::test] + async fn restart_new_register_resumes_from_lamport_floor() { + let tmp = tempfile::tempdir().unwrap(); + let db_path = tmp.path().join("lamport_floor.db"); + + let options = SqliteConnectOptions::new() + .filename(&db_path) + .create_if_missing(true); + let pool = SqlitePool::connect_with(options).await.unwrap(); + sqlx::migrate!("./migrations").run(&pool).await.unwrap(); + + let kp = make_keypair(); + let mut crdt = BaseCrdt::::new(&kp); + + // Insert an item and update its stage a few times to push seq up. + let item: JsonValue = json!({ + "story_id": "664_story_original", + "stage": "1_backlog", + "name": "Original", + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, + "merged_at": 0.0, + }) + .into(); + + let mut ops = Vec::new(); + + let op1 = crdt.doc.items.insert(ROOT_ID, item).sign(&kp); + crdt.apply(op1.clone()); + ops.push(op1); + + let idx = rebuild_index(&crdt)["664_story_original"]; + let op2 = crdt.doc.items[idx] + .stage + .set("2_current".to_string()) + .sign(&kp); + crdt.apply(op2.clone()); + ops.push(op2); + + let op3 = crdt.doc.items[idx] + .stage + .set("3_review".to_string()) + .sign(&kp); + crdt.apply(op3.clone()); + ops.push(op3); + + // Record the max seq across all persisted ops — this is the floor. + let max_seq = ops.iter().map(|o| o.inner.seq).max().unwrap(); + + // Persist all ops. + let now = chrono::Utc::now().to_rfc3339(); + for op in &ops { + let op_json = serde_json::to_string(op).unwrap(); + let op_id = hex::encode(&op.id()); + sqlx::query( + "INSERT INTO crdt_ops (op_id, seq, op_json, created_at) VALUES (?1, ?2, ?3, ?4)", + ) + .bind(&op_id) + .bind(op.inner.seq as i64) + .bind(&op_json) + .bind(&now) + .execute(&pool) + .await + .unwrap(); + } + + // --- Simulate restart: replay from journal into a fresh CRDT --- + let rows: Vec<(String,)> = + sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY rowid ASC") + .fetch_all(&pool) + .await + .unwrap(); + + let mut crdt2 = BaseCrdt::::new(&kp); + let mut lamport_floor: u64 = 0; + for (json_str,) in &rows { + let signed: SignedOp = serde_json::from_str(json_str).unwrap(); + lamport_floor = lamport_floor.max(signed.inner.seq); + crdt2.apply(signed); + } + + // Advance top-level lists (mirrors what init() does). + crdt2.doc.items.advance_seq(lamport_floor); + crdt2.doc.nodes.advance_seq(lamport_floor); + + assert_eq!(lamport_floor, max_seq); + + // Insert a brand-new item — simulating a new story arriving after restart. + let new_item: JsonValue = json!({ + "story_id": "664_story_new_after_restart", + "stage": "1_backlog", + "name": "New After Restart", + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, + "merged_at": 0.0, + }) + .into(); + + let insert_op = crdt2.doc.items.insert(ROOT_ID, new_item); + // The list-level insert must have seq > lamport_floor. + assert!( + insert_op.seq > max_seq, + "list insert seq ({}) must be > lamport_floor ({})", + insert_op.seq, + max_seq, + ); + let insert_signed = insert_op.sign(&kp); + crdt2.apply(insert_signed); + + // Advance the new item's inner registers to the floor (mirrors write_item). + let idx2 = rebuild_index(&crdt2)["664_story_new_after_restart"]; + let new_crdt_item = &mut crdt2.doc.items[idx2]; + new_crdt_item.stage.advance_seq(lamport_floor); + + // Now update the stage — the first field-level op must also be > floor. + let stage_op = crdt2.doc.items[idx2].stage.set("2_current".to_string()); + assert!( + stage_op.seq > max_seq, + "first field op seq ({}) on new register must be > lamport_floor ({}); \ + got seq = 1 means the register reset its clock on restart", + stage_op.seq, + max_seq, + ); + } } diff --git a/server/src/crdt_state/write.rs b/server/src/crdt_state/write.rs index 487ee898..e79d8ae4 100644 --- a/server/src/crdt_state/write.rs +++ b/server/src/crdt_state/write.rs @@ -120,6 +120,25 @@ pub fn write_item( // Rebuild index after insertion (indices may shift). state.index = rebuild_index(&state.crdt); + // Advance the inner registers of the newly-created item to the Lamport + // floor so their first local ops don't re-emit low sequence numbers. + let floor = state.lamport_floor; + if floor > 0 + && let Some(&idx) = state.index.get(story_id) + { + let item = &mut state.crdt.doc.items[idx]; + item.story_id.advance_seq(floor); + item.stage.advance_seq(floor); + item.name.advance_seq(floor); + item.agent.advance_seq(floor); + item.retry_count.advance_seq(floor); + item.blocked.advance_seq(floor); + item.depends_on.advance_seq(floor); + item.claimed_by.advance_seq(floor); + item.claimed_at.advance_seq(floor); + item.merged_at.advance_seq(floor); + } + // Broadcast a CrdtEvent for the new item. emit_event(CrdtEvent { story_id: story_id.to_string(),