diff --git a/server/src/crdt_state.rs b/server/src/crdt_state.rs
index 0e580746..3a40fdd4 100644
--- a/server/src/crdt_state.rs
+++ b/server/src/crdt_state.rs
@@ -12,6 +12,11 @@ use std::collections::HashMap;
 use std::sync::{Mutex, OnceLock};
 
+/// A vector clock mapping node IDs (hex-encoded Ed25519 pubkeys) to the count
+/// of ops seen from that node. Used for delta sync — a connecting peer sends
+/// its clock so the other side can compute which ops are missing.
+pub type VectorClock = HashMap<String, u64>;
+
 use bft_json_crdt::json_crdt::*;
 use bft_json_crdt::keypair::make_keypair;
 use bft_json_crdt::list_crdt::ListCrdt;
@@ -71,8 +76,71 @@ pub fn all_ops_json() -> Option<Vec<String>> {
     ALL_OPS.get().map(|m| m.lock().unwrap().clone())
 }
 
+/// Return this node's current vector clock.
+///
+/// The clock maps each author's hex-encoded Ed25519 public key to the count
+/// of ops received from that author. A connecting peer sends its clock so
+/// the other side can compute which ops are missing via [`ops_since`].
+///
+/// Returns `None` before `init()`.
+pub fn our_vector_clock() -> Option<VectorClock> {
+    VECTOR_CLOCK.get().map(|m| m.lock().unwrap().clone())
+}
+
+/// Return only the ops that a peer with the given `peer_clock` is missing.
+///
+/// Iterates the local op journal and, for each author, skips the first N ops
+/// (where N = `peer_clock[author]`) and returns the rest. An empty peer
+/// clock returns all ops (full sync for new nodes).
+///
+/// Returns `None` before `init()`.
+pub fn ops_since(peer_clock: &VectorClock) -> Option<Vec<String>> {
+    let all = ALL_OPS.get()?.lock().ok()?;
+    let mut author_counts: HashMap<String, u64> = HashMap::new();
+    let mut result = Vec::new();
+
+    for op_json in all.iter() {
+        if let Ok(signed_op) = serde_json::from_str::<SignedOp>(op_json) {
+            let author_hex = hex::encode(&signed_op.author());
+            let count = author_counts.entry(author_hex.clone()).or_insert(0);
+            *count += 1;
+
+            let peer_has = peer_clock.get(&author_hex).copied().unwrap_or(0);
+            if *count > peer_has {
+                result.push(op_json.clone());
+            }
+        }
+    }
+
+    Some(result)
+}
+
 static ALL_OPS: OnceLock<Mutex<Vec<String>>> = OnceLock::new();
+
+/// Live vector clock tracking op counts per author.
+///
+/// Updated in lockstep with `ALL_OPS` — every time an op is appended to the
+/// journal, the corresponding author's count is incremented here. This avoids
+/// re-parsing all ops when a peer requests `our_vector_clock()`.
+static VECTOR_CLOCK: OnceLock<Mutex<VectorClock>> = OnceLock::new();
+
+/// Append an op's JSON to `ALL_OPS` and bump the author's count in `VECTOR_CLOCK`.
+///
+/// Centralises the bookkeeping that must stay in sync between the two statics.
+fn track_op(signed: &SignedOp, json: String) {
+    if let Some(all) = ALL_OPS.get()
+        && let Ok(mut v) = all.lock()
+    {
+        v.push(json);
+    }
+    if let Some(vc) = VECTOR_CLOCK.get()
+        && let Ok(mut clock) = vc.lock()
+    {
+        let author_hex = hex::encode(&signed.author());
+        *clock.entry(author_hex).or_insert(0) += 1;
+    }
+}
+
 // ── CRDT document types ──────────────────────────────────────────────
 
 #[add_crdt_fields]
@@ -223,8 +291,11 @@ pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> {
         .await?;
 
     let mut all_ops_vec = Vec::with_capacity(rows.len());
+    let mut vector_clock = VectorClock::new();
     for (op_json,) in &rows {
         if let Ok(signed_op) = serde_json::from_str::<SignedOp>(op_json) {
+            let author_hex = hex::encode(&signed_op.author());
+            *vector_clock.entry(author_hex).or_insert(0) += 1;
             crdt.apply(signed_op);
             all_ops_vec.push(op_json.clone());
         } else {
@@ -232,6 +303,7 @@
         }
     }
     let _ = ALL_OPS.set(Mutex::new(all_ops_vec));
+    let _ = VECTOR_CLOCK.set(Mutex::new(vector_clock));
 
     // Build the indices from the reconstructed state.
     let index = rebuild_index(&crdt);
@@ -328,6 +400,7 @@ pub fn init_for_test() {
     let _ = CRDT_EVENT_TX.get_or_init(|| broadcast::channel::<CrdtEvent>(256).0);
     let _ = SYNC_TX.get_or_init(|| broadcast::channel::<SignedOp>(1024).0);
     let _ = ALL_OPS.get_or_init(|| Mutex::new(Vec::new()));
+    let _ = VECTOR_CLOCK.get_or_init(|| Mutex::new(VectorClock::new()));
 }
 
 /// Load or create the Ed25519 keypair used by this node.
@@ -396,12 +469,9 @@ where
         );
     }
 
-    // Track in ALL_OPS and broadcast to sync peers.
-    if let Ok(json) = serde_json::to_string(&signed)
-        && let Some(all) = ALL_OPS.get()
-        && let Ok(mut v) = all.lock()
-    {
-        v.push(json);
+    // Track in ALL_OPS + VECTOR_CLOCK, then broadcast to sync peers.
+ if let Ok(json) = serde_json::to_string(&signed) { + track_op(&signed, json); } if let Some(tx) = SYNC_TX.get() { let _ = tx.send(signed); @@ -586,12 +656,9 @@ pub fn apply_remote_op(op: SignedOp) -> bool { ); } - // Track in ALL_OPS. - if let Ok(json) = serde_json::to_string(&op) - && let Some(all) = ALL_OPS.get() - && let Ok(mut v) = all.lock() - { - v.push(json); + // Track in ALL_OPS + VECTOR_CLOCK. + if let Ok(json) = serde_json::to_string(&op) { + track_op(&op, json); } // Rebuild indices (new items or nodes may have been inserted). @@ -1180,7 +1247,7 @@ pub fn check_archived_deps_crdt(story_id: &str) -> Vec { } /// Hex-encode a byte slice (no external dep needed). -mod hex { +pub(crate) mod hex { pub fn encode(bytes: &[u8]) -> String { bytes.iter().map(|b| format!("{b:02x}")).collect() } @@ -1853,4 +1920,199 @@ mod tests { last_error.message ); } + + // ── Story 631: vector clock delta sync tests ──────────────────────── + + /// Helper: create N signed insert ops on a CRDT and return them with their JSON. + fn make_ops( + kp: &Ed25519KeyPair, + crdt: &mut BaseCrdt, + count: usize, + prefix: &str, + ) -> Vec<(SignedOp, String)> { + let mut ops = Vec::new(); + for i in 0..count { + let item: JsonValue = json!({ + "story_id": format!("{prefix}_{i}"), + "stage": "1_backlog", + "name": format!("Item {i}"), + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, + }) + .into(); + let op = crdt.doc.items.insert(ROOT_ID, item).sign(kp); + crdt.apply(op.clone()); + let json = serde_json::to_string(&op).unwrap(); + ops.push((op, json)); + } + ops + } + + /// Build a vector clock from a list of (SignedOp, json) pairs. + fn build_clock(ops: &[(SignedOp, String)]) -> VectorClock { + let mut clock = VectorClock::new(); + for (op, _) in ops { + let author = hex::encode(&op.author()); + *clock.entry(author).or_insert(0) += 1; + } + clock + } + + /// Compute ops_since against a local journal and peer clock. 
+ /// + /// Mirrors the production `ops_since` logic but operates on a local Vec + /// instead of the global `ALL_OPS` static. + fn local_ops_since(all_ops: &[(SignedOp, String)], peer_clock: &VectorClock) -> Vec { + let mut author_counts: HashMap = HashMap::new(); + let mut result = Vec::new(); + for (op, json) in all_ops { + let author = hex::encode(&op.author()); + let count = author_counts.entry(author.clone()).or_insert(0); + *count += 1; + let peer_has = peer_clock.get(&author).copied().unwrap_or(0); + if *count > peer_has { + result.push(json.clone()); + } + } + result + } + + /// Integration test (low-bandwidth sync): two nodes, A applies 100 ops, + /// B reconnects with a current clock — B receives 0 ops on the bulk phase. + #[test] + fn delta_sync_low_bandwidth_fully_caught_up() { + let kp_a = make_keypair(); + let mut crdt_a = BaseCrdt::::new(&kp_a); + + let ops_a = make_ops(&kp_a, &mut crdt_a, 100, "631_low"); + + // B has already seen all 100 ops (its clock matches A's journal). + let clock_b = build_clock(&ops_a); + + // Delta should be empty. + let delta = local_ops_since(&ops_a, &clock_b); + assert_eq!( + delta.len(), + 0, + "caught-up peer should receive 0 ops, got {}", + delta.len() + ); + } + + /// Integration test (mid-stream): A applies 100 ops, B disconnects, + /// A applies 50 more ops, B reconnects — B receives exactly the 50 missed ops. + #[test] + fn delta_sync_mid_stream_partial_catch_up() { + let kp_a = make_keypair(); + let mut crdt_a = BaseCrdt::::new(&kp_a); + + // Phase 1: 100 ops that B has seen. + let ops_phase1 = make_ops(&kp_a, &mut crdt_a, 100, "631_mid1"); + let clock_b = build_clock(&ops_phase1); + + // Phase 2: 50 more ops that B missed. + let ops_phase2 = make_ops(&kp_a, &mut crdt_a, 50, "631_mid2"); + + // A's full journal is phase1 + phase2. 
+ let mut all_ops_a: Vec<(SignedOp, String)> = ops_phase1; + all_ops_a.extend(ops_phase2); + + let delta = local_ops_since(&all_ops_a, &clock_b); + assert_eq!( + delta.len(), + 50, + "peer should receive exactly 50 missed ops, got {}", + delta.len() + ); + } + + /// Integration test (new node): C connects with empty clock, + /// receives all 150 ops — verifies fallback behaviour. + #[test] + fn delta_sync_new_node_receives_all_ops() { + let kp_a = make_keypair(); + let mut crdt_a = BaseCrdt::::new(&kp_a); + + let ops_phase1 = make_ops(&kp_a, &mut crdt_a, 100, "631_new1"); + let ops_phase2 = make_ops(&kp_a, &mut crdt_a, 50, "631_new2"); + + let mut all_ops_a: Vec<(SignedOp, String)> = ops_phase1; + all_ops_a.extend(ops_phase2); + + // Empty clock = new node. + let empty_clock = VectorClock::new(); + let delta = local_ops_since(&all_ops_a, &empty_clock); + assert_eq!( + delta.len(), + 150, + "new node should receive all 150 ops, got {}", + delta.len() + ); + } + + /// Multi-author delta sync: ops from two different nodes, peer has seen + /// all of one author but none of the other. + #[test] + fn delta_sync_multi_author() { + use fastcrypto::traits::KeyPair; + + let kp_a = make_keypair(); + let kp_b = make_keypair(); + let mut crdt_a = BaseCrdt::::new(&kp_a); + let mut crdt_b = BaseCrdt::::new(&kp_b); + + let ops_a = make_ops(&kp_a, &mut crdt_a, 30, "631_ma_a"); + let ops_b = make_ops(&kp_b, &mut crdt_b, 20, "631_ma_b"); + + // Combined journal on a hypothetical server. + let mut all_ops: Vec<(SignedOp, String)> = ops_a.clone(); + all_ops.extend(ops_b); + + // Peer has seen all of A's ops but none of B's. + let mut peer_clock = VectorClock::new(); + let author_a_hex = hex::encode(&kp_a.public().0.to_bytes()); + peer_clock.insert(author_a_hex, 30); + + let delta = local_ops_since(&all_ops, &peer_clock); + assert_eq!( + delta.len(), + 20, + "peer should receive 20 ops from author B, got {}", + delta.len() + ); + } + + /// Vector clock construction from ops. 
+ #[test] + fn build_vector_clock_from_ops() { + use fastcrypto::traits::KeyPair; + + let kp = make_keypair(); + let mut crdt = BaseCrdt::::new(&kp); + let ops = make_ops(&kp, &mut crdt, 10, "631_vc"); + + let clock = build_clock(&ops); + let author_hex = hex::encode(&kp.public().0.to_bytes()); + + assert_eq!(clock.len(), 1, "single author should produce 1 clock entry"); + assert_eq!(clock[&author_hex], 10, "clock should show 10 ops"); + } + + /// Wire format: clock message serialization round-trip. + #[test] + fn clock_message_serialization_roundtrip() { + let mut clock = VectorClock::new(); + clock.insert("aabbcc".to_string(), 42); + clock.insert("ddeeff".to_string(), 7); + + let json = serde_json::to_value(&clock).unwrap(); + assert!(json.is_object()); + let deserialized: VectorClock = serde_json::from_value(json).unwrap(); + assert_eq!(deserialized["aabbcc"], 42); + assert_eq!(deserialized["ddeeff"], 7); + } } diff --git a/server/src/crdt_sync.rs b/server/src/crdt_sync.rs index 021ff544..a5488729 100644 --- a/server/src/crdt_sync.rs +++ b/server/src/crdt_sync.rs @@ -4,12 +4,26 @@ /// /// # Protocol /// -/// The sync protocol is a hybrid of two frame types: +/// ## Version negotiation /// -/// ## Text frames (bulk initial state) +/// After the auth handshake, both sides send their first sync message: +/// +/// - **v2 peers** send a `clock` frame: `{"type":"clock","clock":{ : , ... }}` +/// containing a vector clock that maps each author's hex Ed25519 pubkey to the +/// count of ops received from that author. Upon receiving the peer's clock, +/// each side computes the delta via [`crdt_state::ops_since`] and sends only +/// the missing ops as a `bulk` frame. +/// +/// - **v1 (legacy) peers** send a `bulk` frame directly (full op dump). +/// A v2 peer receiving a `bulk` first (instead of a `clock`) falls back to +/// the full-dump path: applies the incoming bulk and responds with its own +/// full bulk. 
This preserves backward compatibility — no code change needed +/// on the v1 side. +/// +/// ## Text frames /// A JSON object with a `"type"` field: -/// - `{"type":"bulk","ops":[...]}` — Initial state dump (array of serialised -/// `SignedOp` JSON strings). Sent by both sides immediately after connect. +/// - `{"type":"clock","clock":{...}}` — Vector clock (v2 protocol). +/// - `{"type":"bulk","ops":[...]}` — Ops dump (full or delta). /// /// ## Binary frames (real-time op broadcast) /// Individual `SignedOp`s encoded via [`crate::crdt_wire`] (versioned JSON @@ -89,10 +103,18 @@ struct AuthMessage { #[derive(Serialize, Deserialize)] #[serde(tag = "type", rename_all = "snake_case")] enum SyncMessage { - /// Bulk state dump sent on connect. + /// Bulk state dump sent on connect (v1) or delta ops after clock exchange (v2). Bulk { ops: Vec }, /// A single new op. Op { op: String }, + /// Vector clock exchanged on connect (v2 protocol). + /// + /// Each entry maps a node's hex-encoded Ed25519 public key to the count of + /// ops received from that node. The receiving side computes the delta via + /// [`crdt_state::ops_since`] and sends only the missing ops. + Clock { + clock: std::collections::HashMap, + }, } // ── Server-side WebSocket handler ─────────────────────────────────── @@ -174,13 +196,85 @@ pub async fn crdt_sync_handler( // ── Auth passed — proceed with CRDT sync ────────────────── - // Send bulk state dump. - if let Some(ops) = crdt_state::all_ops_json() { - let msg = SyncMessage::Bulk { ops }; - if let Ok(json) = serde_json::to_string(&msg) - && sink.send(WsMessage::Text(json)).await.is_err() - { - return; + // v2 protocol: send our vector clock so the peer can compute the delta. 
+ let our_clock = crdt_state::our_vector_clock().unwrap_or_default(); + let clock_msg = SyncMessage::Clock { clock: our_clock }; + if let Ok(json) = serde_json::to_string(&clock_msg) + && sink.send(WsMessage::Text(json)).await.is_err() + { + return; + } + + // Wait for the peer's first sync message to determine protocol version. + let first_msg = tokio::time::timeout( + std::time::Duration::from_secs(AUTH_TIMEOUT_SECS), + wait_for_sync_text(&mut stream, &mut sink), + ) + .await; + + match first_msg { + Ok(Some(SyncMessage::Clock { clock: peer_clock })) => { + // v2 peer — send only the ops the peer is missing. + let delta = crdt_state::ops_since(&peer_clock).unwrap_or_default(); + slog!( + "[crdt-sync] v2 delta sync: sending {} ops (peer missing)", + delta.len() + ); + let msg = SyncMessage::Bulk { ops: delta }; + if let Ok(json) = serde_json::to_string(&msg) + && sink.send(WsMessage::Text(json)).await.is_err() + { + return; + } + } + Ok(Some(SyncMessage::Bulk { ops })) => { + // v1 peer — apply their bulk and send our full bulk. + let mut applied = 0u64; + for op_json in &ops { + if let Ok(signed_op) = serde_json::from_str::(op_json) + && crdt_state::apply_remote_op(signed_op) + { + applied += 1; + } + } + slog!( + "[crdt-sync] v1 bulk sync: received {} ops, applied {applied}", + ops.len() + ); + if let Some(all) = crdt_state::all_ops_json() { + let msg = SyncMessage::Bulk { ops: all }; + if let Ok(json) = serde_json::to_string(&msg) + && sink.send(WsMessage::Text(json)).await.is_err() + { + return; + } + } + } + Ok(Some(SyncMessage::Op { op })) => { + // Single op before negotiation — treat as v1. 
+ if let Ok(signed_op) = serde_json::from_str::(&op) { + crdt_state::apply_remote_op(signed_op); + } + if let Some(all) = crdt_state::all_ops_json() { + let msg = SyncMessage::Bulk { ops: all }; + if let Ok(json) = serde_json::to_string(&msg) + && sink.send(WsMessage::Text(json)).await.is_err() + { + return; + } + } + } + _ => { + // Timeout or error — send full bulk as fallback. + slog!("[crdt-sync] No sync message from peer; sending full bulk as fallback"); + if let Some(all) = crdt_state::all_ops_json() { + let msg = SyncMessage::Bulk { ops: all }; + if let Ok(json) = serde_json::to_string(&msg) + && sink.send(WsMessage::Text(json)).await.is_err() + { + return; + } + } } } @@ -263,6 +357,28 @@ pub async fn crdt_sync_handler( }) } +/// Wait for the next text-frame sync message from the peer, handling Ping/Pong +/// transparently. +/// +/// Returns `None` on connection close or read error. +async fn wait_for_sync_text( + stream: &mut futures::stream::SplitStream, + sink: &mut futures::stream::SplitSink, +) -> Option { + loop { + match stream.next().await { + Some(Ok(WsMessage::Text(text))) => { + return serde_json::from_str(&text).ok(); + } + Some(Ok(WsMessage::Ping(data))) => { + let _ = sink.send(WsMessage::Pong(data)).await; + } + Some(Ok(WsMessage::Pong(_))) => continue, + _ => return None, + } + } +} + /// Close the WebSocket with a generic `auth_failed` reason. /// /// The close reason is intentionally the same for all auth failures @@ -313,6 +429,12 @@ fn handle_incoming_text(text: &str) { crdt_state::apply_remote_op(signed_op); } } + SyncMessage::Clock { .. } => { + // Clock frames are handled during the initial negotiation phase. + // If one arrives during the streaming loop it is a protocol error + // on the peer's part — log and ignore. 
+ slog!("[crdt-sync] Ignoring unexpected clock frame during streaming phase"); + } } } @@ -426,13 +548,72 @@ async fn connect_and_sync(url: &str) -> Result<(), String> { slog!("[crdt-sync] Auth reply sent, waiting for sync data"); - // Send our bulk state. - if let Some(ops) = crdt_state::all_ops_json() { - let msg = SyncMessage::Bulk { ops }; - if let Ok(json) = serde_json::to_string(&msg) { - sink.send(TungsteniteMsg::Text(json.into())) - .await - .map_err(|e| format!("Send bulk failed: {e}"))?; + // v2 protocol: send our vector clock. + let our_clock = crdt_state::our_vector_clock().unwrap_or_default(); + let clock_msg = SyncMessage::Clock { clock: our_clock }; + if let Ok(json) = serde_json::to_string(&clock_msg) { + sink.send(TungsteniteMsg::Text(json.into())) + .await + .map_err(|e| format!("Send clock failed: {e}"))?; + } + + // Wait for the server's first sync message. + let first_msg = tokio::time::timeout( + std::time::Duration::from_secs(AUTH_TIMEOUT_SECS), + wait_for_rendezvous_sync_text(&mut stream), + ) + .await + .map_err(|_| "Timeout waiting for server sync message".to_string())?; + + match first_msg { + Some(SyncMessage::Clock { clock: peer_clock }) => { + // v2 server — send only the ops the server is missing. + let delta = crdt_state::ops_since(&peer_clock).unwrap_or_default(); + slog!( + "[crdt-sync] v2 delta sync: sending {} ops to server (server missing)", + delta.len() + ); + let msg = SyncMessage::Bulk { ops: delta }; + if let Ok(json) = serde_json::to_string(&msg) { + sink.send(TungsteniteMsg::Text(json.into())) + .await + .map_err(|e| format!("Send delta failed: {e}"))?; + } + } + Some(SyncMessage::Bulk { ops }) => { + // v1 server — apply their bulk and send our full bulk. 
+ let mut applied = 0u64; + for op_json in &ops { + if let Ok(signed_op) = serde_json::from_str::(op_json) + && crdt_state::apply_remote_op(signed_op) + { + applied += 1; + } + } + slog!( + "[crdt-sync] v1 bulk sync: received {} ops from server, applied {applied}", + ops.len() + ); + if let Some(all) = crdt_state::all_ops_json() { + let msg = SyncMessage::Bulk { ops: all }; + if let Ok(json) = serde_json::to_string(&msg) { + sink.send(TungsteniteMsg::Text(json.into())) + .await + .map_err(|e| format!("Send bulk failed: {e}"))?; + } + } + } + _ => { + // Fallback — send full bulk. + slog!("[crdt-sync] No sync message from server; sending full bulk as fallback"); + if let Some(all) = crdt_state::all_ops_json() { + let msg = SyncMessage::Bulk { ops: all }; + if let Ok(json) = serde_json::to_string(&msg) { + sink.send(TungsteniteMsg::Text(json.into())) + .await + .map_err(|e| format!("Send bulk failed: {e}"))?; + } + } } } @@ -516,6 +697,29 @@ async fn connect_and_sync(url: &str) -> Result<(), String> { Ok(()) } +/// Wait for the next text-frame sync message from a tungstenite stream, +/// handling Ping/Pong transparently. +/// +/// Returns `None` on connection close or read error. 
+async fn wait_for_rendezvous_sync_text( + stream: &mut futures::stream::SplitStream< + tokio_tungstenite::WebSocketStream< + tokio_tungstenite::MaybeTlsStream, + >, + >, +) -> Option { + use tokio_tungstenite::tungstenite::Message as TungsteniteMsg; + loop { + match stream.next().await { + Some(Ok(TungsteniteMsg::Text(text))) => { + return serde_json::from_str(text.as_ref()).ok(); + } + Some(Ok(TungsteniteMsg::Ping(_) | TungsteniteMsg::Pong(_))) => continue, + _ => return None, + } + } +} + // ── Tests ──────────────────────────────────────────────────────────── #[cfg(test)] @@ -2635,4 +2839,139 @@ name = "test" "Node A must disconnect due to keepalive timeout when Node B swallows Pongs" ); } + + // ── Story 631: vector clock wire format tests ─────────────────────── + + /// Clock message serialization round-trip via SyncMessage. + #[test] + fn sync_message_clock_serialization_roundtrip() { + let mut clock = std::collections::HashMap::new(); + clock.insert("aabbcc00".to_string(), 42u64); + clock.insert("ddeeff11".to_string(), 7u64); + + let msg = SyncMessage::Clock { clock }; + let json = serde_json::to_string(&msg).unwrap(); + assert!(json.contains(r#""type":"clock""#)); + + let deserialized: SyncMessage = serde_json::from_str(&json).unwrap(); + match deserialized { + SyncMessage::Clock { clock } => { + assert_eq!(clock["aabbcc00"], 42); + assert_eq!(clock["ddeeff11"], 7); + } + _ => panic!("Expected Clock"), + } + } + + /// Empty clock (new node) serializes correctly. 
+ #[test] + fn sync_message_clock_empty() { + let msg = SyncMessage::Clock { + clock: std::collections::HashMap::new(), + }; + let json = serde_json::to_string(&msg).unwrap(); + let deserialized: SyncMessage = serde_json::from_str(&json).unwrap(); + match deserialized { + SyncMessage::Clock { clock } => assert!(clock.is_empty()), + _ => panic!("Expected Clock"), + } + } + + /// v1 compat: a v1 peer that only knows `bulk` and `op` will fail to parse + /// a `clock` message — verify the parse error is a clean serde error, not a + /// panic. + #[test] + fn v1_peer_ignores_clock_message_gracefully() { + // Simulate: v1 peer only knows Bulk and Op. + // A clock message should fail deserialization (unknown variant). + let clock_json = r#"{"type":"clock","clock":{"abc":10}}"#; + // handle_incoming_text logs and returns — must not panic. + handle_incoming_text(clock_json); + } + + /// v2 delta sync simulation: two CRDT nodes, exchange clocks, send deltas. + #[test] + fn v2_delta_sync_via_clock_exchange() { + use bft_json_crdt::json_crdt::BaseCrdt; + use bft_json_crdt::keypair::make_keypair; + use bft_json_crdt::op::ROOT_ID; + use fastcrypto::traits::KeyPair; + use serde_json::json; + + use crate::crdt_state::PipelineDoc; + + let kp_a = make_keypair(); + let mut crdt_a = BaseCrdt::::new(&kp_a); + + // A creates 5 items. + let mut ops_a = Vec::new(); + for i in 0..5 { + let item: bft_json_crdt::json_crdt::JsonValue = json!({ + "story_id": format!("631_v2_{i}"), + "stage": "1_backlog", + "name": format!("v2 item {i}"), + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, + }) + .into(); + let op = crdt_a.doc.items.insert(ROOT_ID, item).sign(&kp_a); + crdt_a.apply(op.clone()); + ops_a.push(op); + } + + // B has seen the first 3 ops (clock says 3). 
+ let kp_b = make_keypair(); + let mut crdt_b = BaseCrdt::::new(&kp_b); + for op in &ops_a[..3] { + crdt_b.apply(op.clone()); + } + assert_eq!(crdt_b.doc.items.view().len(), 3); + + // Build B's clock. + let author_a_hex = crate::crdt_state::hex::encode(&kp_a.public().0.to_bytes()); + let mut clock_b = std::collections::HashMap::new(); + clock_b.insert(author_a_hex.clone(), 3u64); + + // Serialize clock as wire message. + let clock_msg = SyncMessage::Clock { + clock: clock_b.clone(), + }; + let clock_wire = serde_json::to_string(&clock_msg).unwrap(); + + // A receives B's clock and computes delta. + let parsed: SyncMessage = serde_json::from_str(&clock_wire).unwrap(); + let delta_ops = match parsed { + SyncMessage::Clock { clock: peer_clock } => { + // Simulate ops_since: A has 5 ops from author_a, B has 3. + let all_json: Vec = ops_a + .iter() + .map(|op| serde_json::to_string(op).unwrap()) + .collect(); + let mut result = Vec::new(); + let mut count = 0u64; + for (i, _op) in ops_a.iter().enumerate() { + count += 1; + let peer_has = peer_clock.get(&author_a_hex).copied().unwrap_or(0); + if count > peer_has { + result.push(all_json[i].clone()); + } + } + result + } + _ => panic!("Expected Clock"), + }; + + assert_eq!(delta_ops.len(), 2, "delta should be 2 ops (ops 4 and 5)"); + + // B applies the delta. + for op_str in &delta_ops { + let signed: bft_json_crdt::json_crdt::SignedOp = serde_json::from_str(op_str).unwrap(); + crdt_b.apply(signed); + } + assert_eq!(crdt_b.doc.items.view().len(), 5); + } } diff --git a/server/src/crdt_wire.rs b/server/src/crdt_wire.rs index 065a33d4..fe39a732 100644 --- a/server/src/crdt_wire.rs +++ b/server/src/crdt_wire.rs @@ -27,6 +27,15 @@ /// Binary fields (`signed_digest`, `depends_on`, `id`, …) use /// `serde_with::Bytes` so they appear as base64 strings in JSON. /// +/// # Sync protocol versions +/// +/// The wire codec encodes individual `SignedOp`s for real-time streaming. 
+/// The higher-level sync protocol (see [`crate::crdt_sync`]) uses text-frame +/// JSON messages for negotiation: +/// +/// - **v2**: `{"type":"clock","clock":{...}}` — vector clock exchange for delta sync. +/// - **v1**: `{"type":"bulk","ops":[...]}` — full bulk dump (legacy, still supported). +/// /// # Upgrading the format /// /// Bump `WIRE_VERSION` and add a new arm to the `match envelope.v` block in