huskies: merge 631_story_crdt_delta_sync_via_vector_clocks_replace_full_bulk_dumps
This commit is contained in:
+275
-13
@@ -12,6 +12,11 @@
|
|||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::sync::{Mutex, OnceLock};
|
use std::sync::{Mutex, OnceLock};
|
||||||
|
|
||||||
|
/// A vector clock mapping node IDs (hex-encoded Ed25519 pubkeys) to the count
|
||||||
|
/// of ops seen from that node. Used for delta sync — a connecting peer sends
|
||||||
|
/// its clock so the other side can compute which ops are missing.
|
||||||
|
pub type VectorClock = HashMap<String, u64>;
|
||||||
|
|
||||||
use bft_json_crdt::json_crdt::*;
|
use bft_json_crdt::json_crdt::*;
|
||||||
use bft_json_crdt::keypair::make_keypair;
|
use bft_json_crdt::keypair::make_keypair;
|
||||||
use bft_json_crdt::list_crdt::ListCrdt;
|
use bft_json_crdt::list_crdt::ListCrdt;
|
||||||
@@ -71,8 +76,71 @@ pub fn all_ops_json() -> Option<Vec<String>> {
|
|||||||
ALL_OPS.get().map(|m| m.lock().unwrap().clone())
|
ALL_OPS.get().map(|m| m.lock().unwrap().clone())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Return this node's current vector clock.
|
||||||
|
///
|
||||||
|
/// The clock maps each author's hex-encoded Ed25519 public key to the count
|
||||||
|
/// of ops received from that author. A connecting peer sends its clock so
|
||||||
|
/// the other side can compute which ops are missing via [`ops_since`].
|
||||||
|
///
|
||||||
|
/// Returns `None` before `init()`.
|
||||||
|
pub fn our_vector_clock() -> Option<VectorClock> {
|
||||||
|
VECTOR_CLOCK.get().map(|m| m.lock().unwrap().clone())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Return only the ops that a peer with the given `peer_clock` is missing.
|
||||||
|
///
|
||||||
|
/// Iterates the local op journal and, for each author, skips the first N ops
|
||||||
|
/// (where N = `peer_clock[author]`) and returns the rest. An empty peer
|
||||||
|
/// clock returns all ops (full sync for new nodes).
|
||||||
|
///
|
||||||
|
/// Returns `None` before `init()`.
|
||||||
|
pub fn ops_since(peer_clock: &VectorClock) -> Option<Vec<String>> {
|
||||||
|
let all = ALL_OPS.get()?.lock().ok()?;
|
||||||
|
let mut author_counts: HashMap<String, u64> = HashMap::new();
|
||||||
|
let mut result = Vec::new();
|
||||||
|
|
||||||
|
for op_json in all.iter() {
|
||||||
|
if let Ok(signed_op) = serde_json::from_str::<SignedOp>(op_json) {
|
||||||
|
let author_hex = hex::encode(&signed_op.author());
|
||||||
|
let count = author_counts.entry(author_hex.clone()).or_insert(0);
|
||||||
|
*count += 1;
|
||||||
|
|
||||||
|
let peer_has = peer_clock.get(&author_hex).copied().unwrap_or(0);
|
||||||
|
if *count > peer_has {
|
||||||
|
result.push(op_json.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Some(result)
|
||||||
|
}
|
||||||
|
|
||||||
static ALL_OPS: OnceLock<Mutex<Vec<String>>> = OnceLock::new();
|
static ALL_OPS: OnceLock<Mutex<Vec<String>>> = OnceLock::new();
|
||||||
|
|
||||||
|
/// Live vector clock tracking op counts per author.
|
||||||
|
///
|
||||||
|
/// Updated in lockstep with `ALL_OPS` — every time an op is appended to the
|
||||||
|
/// journal, the corresponding author's count is incremented here. This avoids
|
||||||
|
/// re-parsing all ops when a peer requests `our_vector_clock()`.
|
||||||
|
static VECTOR_CLOCK: OnceLock<Mutex<VectorClock>> = OnceLock::new();
|
||||||
|
|
||||||
|
/// Append an op's JSON to `ALL_OPS` and bump the author's count in `VECTOR_CLOCK`.
|
||||||
|
///
|
||||||
|
/// Centralises the bookkeeping that must stay in sync between the two statics.
|
||||||
|
fn track_op(signed: &SignedOp, json: String) {
|
||||||
|
if let Some(all) = ALL_OPS.get()
|
||||||
|
&& let Ok(mut v) = all.lock()
|
||||||
|
{
|
||||||
|
v.push(json);
|
||||||
|
}
|
||||||
|
if let Some(vc) = VECTOR_CLOCK.get()
|
||||||
|
&& let Ok(mut clock) = vc.lock()
|
||||||
|
{
|
||||||
|
let author_hex = hex::encode(&signed.author());
|
||||||
|
*clock.entry(author_hex).or_insert(0) += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// ── CRDT document types ──────────────────────────────────────────────
|
// ── CRDT document types ──────────────────────────────────────────────
|
||||||
|
|
||||||
#[add_crdt_fields]
|
#[add_crdt_fields]
|
||||||
@@ -223,8 +291,11 @@ pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> {
|
|||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let mut all_ops_vec = Vec::with_capacity(rows.len());
|
let mut all_ops_vec = Vec::with_capacity(rows.len());
|
||||||
|
let mut vector_clock = VectorClock::new();
|
||||||
for (op_json,) in &rows {
|
for (op_json,) in &rows {
|
||||||
if let Ok(signed_op) = serde_json::from_str::<SignedOp>(op_json) {
|
if let Ok(signed_op) = serde_json::from_str::<SignedOp>(op_json) {
|
||||||
|
let author_hex = hex::encode(&signed_op.author());
|
||||||
|
*vector_clock.entry(author_hex).or_insert(0) += 1;
|
||||||
crdt.apply(signed_op);
|
crdt.apply(signed_op);
|
||||||
all_ops_vec.push(op_json.clone());
|
all_ops_vec.push(op_json.clone());
|
||||||
} else {
|
} else {
|
||||||
@@ -232,6 +303,7 @@ pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
let _ = ALL_OPS.set(Mutex::new(all_ops_vec));
|
let _ = ALL_OPS.set(Mutex::new(all_ops_vec));
|
||||||
|
let _ = VECTOR_CLOCK.set(Mutex::new(vector_clock));
|
||||||
|
|
||||||
// Build the indices from the reconstructed state.
|
// Build the indices from the reconstructed state.
|
||||||
let index = rebuild_index(&crdt);
|
let index = rebuild_index(&crdt);
|
||||||
@@ -328,6 +400,7 @@ pub fn init_for_test() {
|
|||||||
let _ = CRDT_EVENT_TX.get_or_init(|| broadcast::channel::<CrdtEvent>(256).0);
|
let _ = CRDT_EVENT_TX.get_or_init(|| broadcast::channel::<CrdtEvent>(256).0);
|
||||||
let _ = SYNC_TX.get_or_init(|| broadcast::channel::<SignedOp>(1024).0);
|
let _ = SYNC_TX.get_or_init(|| broadcast::channel::<SignedOp>(1024).0);
|
||||||
let _ = ALL_OPS.get_or_init(|| Mutex::new(Vec::new()));
|
let _ = ALL_OPS.get_or_init(|| Mutex::new(Vec::new()));
|
||||||
|
let _ = VECTOR_CLOCK.get_or_init(|| Mutex::new(VectorClock::new()));
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Load or create the Ed25519 keypair used by this node.
|
/// Load or create the Ed25519 keypair used by this node.
|
||||||
@@ -396,12 +469,9 @@ where
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Track in ALL_OPS and broadcast to sync peers.
|
// Track in ALL_OPS + VECTOR_CLOCK, then broadcast to sync peers.
|
||||||
if let Ok(json) = serde_json::to_string(&signed)
|
if let Ok(json) = serde_json::to_string(&signed) {
|
||||||
&& let Some(all) = ALL_OPS.get()
|
track_op(&signed, json);
|
||||||
&& let Ok(mut v) = all.lock()
|
|
||||||
{
|
|
||||||
v.push(json);
|
|
||||||
}
|
}
|
||||||
if let Some(tx) = SYNC_TX.get() {
|
if let Some(tx) = SYNC_TX.get() {
|
||||||
let _ = tx.send(signed);
|
let _ = tx.send(signed);
|
||||||
@@ -586,12 +656,9 @@ pub fn apply_remote_op(op: SignedOp) -> bool {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Track in ALL_OPS.
|
// Track in ALL_OPS + VECTOR_CLOCK.
|
||||||
if let Ok(json) = serde_json::to_string(&op)
|
if let Ok(json) = serde_json::to_string(&op) {
|
||||||
&& let Some(all) = ALL_OPS.get()
|
track_op(&op, json);
|
||||||
&& let Ok(mut v) = all.lock()
|
|
||||||
{
|
|
||||||
v.push(json);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Rebuild indices (new items or nodes may have been inserted).
|
// Rebuild indices (new items or nodes may have been inserted).
|
||||||
@@ -1180,7 +1247,7 @@ pub fn check_archived_deps_crdt(story_id: &str) -> Vec<u32> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Hex-encode a byte slice (no external dep needed).
|
/// Hex-encode a byte slice (no external dep needed).
|
||||||
mod hex {
|
pub(crate) mod hex {
|
||||||
pub fn encode(bytes: &[u8]) -> String {
|
pub fn encode(bytes: &[u8]) -> String {
|
||||||
bytes.iter().map(|b| format!("{b:02x}")).collect()
|
bytes.iter().map(|b| format!("{b:02x}")).collect()
|
||||||
}
|
}
|
||||||
@@ -1853,4 +1920,199 @@ mod tests {
|
|||||||
last_error.message
|
last_error.message
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── Story 631: vector clock delta sync tests ────────────────────────
|
||||||
|
|
||||||
|
/// Helper: create N signed insert ops on a CRDT and return them with their JSON.
|
||||||
|
fn make_ops(
|
||||||
|
kp: &Ed25519KeyPair,
|
||||||
|
crdt: &mut BaseCrdt<PipelineDoc>,
|
||||||
|
count: usize,
|
||||||
|
prefix: &str,
|
||||||
|
) -> Vec<(SignedOp, String)> {
|
||||||
|
let mut ops = Vec::new();
|
||||||
|
for i in 0..count {
|
||||||
|
let item: JsonValue = json!({
|
||||||
|
"story_id": format!("{prefix}_{i}"),
|
||||||
|
"stage": "1_backlog",
|
||||||
|
"name": format!("Item {i}"),
|
||||||
|
"agent": "",
|
||||||
|
"retry_count": 0.0,
|
||||||
|
"blocked": false,
|
||||||
|
"depends_on": "",
|
||||||
|
"claimed_by": "",
|
||||||
|
"claimed_at": 0.0,
|
||||||
|
})
|
||||||
|
.into();
|
||||||
|
let op = crdt.doc.items.insert(ROOT_ID, item).sign(kp);
|
||||||
|
crdt.apply(op.clone());
|
||||||
|
let json = serde_json::to_string(&op).unwrap();
|
||||||
|
ops.push((op, json));
|
||||||
|
}
|
||||||
|
ops
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Build a vector clock from a list of (SignedOp, json) pairs.
|
||||||
|
fn build_clock(ops: &[(SignedOp, String)]) -> VectorClock {
|
||||||
|
let mut clock = VectorClock::new();
|
||||||
|
for (op, _) in ops {
|
||||||
|
let author = hex::encode(&op.author());
|
||||||
|
*clock.entry(author).or_insert(0) += 1;
|
||||||
|
}
|
||||||
|
clock
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Compute ops_since against a local journal and peer clock.
|
||||||
|
///
|
||||||
|
/// Mirrors the production `ops_since` logic but operates on a local Vec
|
||||||
|
/// instead of the global `ALL_OPS` static.
|
||||||
|
fn local_ops_since(all_ops: &[(SignedOp, String)], peer_clock: &VectorClock) -> Vec<String> {
|
||||||
|
let mut author_counts: HashMap<String, u64> = HashMap::new();
|
||||||
|
let mut result = Vec::new();
|
||||||
|
for (op, json) in all_ops {
|
||||||
|
let author = hex::encode(&op.author());
|
||||||
|
let count = author_counts.entry(author.clone()).or_insert(0);
|
||||||
|
*count += 1;
|
||||||
|
let peer_has = peer_clock.get(&author).copied().unwrap_or(0);
|
||||||
|
if *count > peer_has {
|
||||||
|
result.push(json.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
result
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Integration test (low-bandwidth sync): two nodes, A applies 100 ops,
|
||||||
|
/// B reconnects with a current clock — B receives 0 ops on the bulk phase.
|
||||||
|
#[test]
|
||||||
|
fn delta_sync_low_bandwidth_fully_caught_up() {
|
||||||
|
let kp_a = make_keypair();
|
||||||
|
let mut crdt_a = BaseCrdt::<PipelineDoc>::new(&kp_a);
|
||||||
|
|
||||||
|
let ops_a = make_ops(&kp_a, &mut crdt_a, 100, "631_low");
|
||||||
|
|
||||||
|
// B has already seen all 100 ops (its clock matches A's journal).
|
||||||
|
let clock_b = build_clock(&ops_a);
|
||||||
|
|
||||||
|
// Delta should be empty.
|
||||||
|
let delta = local_ops_since(&ops_a, &clock_b);
|
||||||
|
assert_eq!(
|
||||||
|
delta.len(),
|
||||||
|
0,
|
||||||
|
"caught-up peer should receive 0 ops, got {}",
|
||||||
|
delta.len()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Integration test (mid-stream): A applies 100 ops, B disconnects,
|
||||||
|
/// A applies 50 more ops, B reconnects — B receives exactly the 50 missed ops.
|
||||||
|
#[test]
|
||||||
|
fn delta_sync_mid_stream_partial_catch_up() {
|
||||||
|
let kp_a = make_keypair();
|
||||||
|
let mut crdt_a = BaseCrdt::<PipelineDoc>::new(&kp_a);
|
||||||
|
|
||||||
|
// Phase 1: 100 ops that B has seen.
|
||||||
|
let ops_phase1 = make_ops(&kp_a, &mut crdt_a, 100, "631_mid1");
|
||||||
|
let clock_b = build_clock(&ops_phase1);
|
||||||
|
|
||||||
|
// Phase 2: 50 more ops that B missed.
|
||||||
|
let ops_phase2 = make_ops(&kp_a, &mut crdt_a, 50, "631_mid2");
|
||||||
|
|
||||||
|
// A's full journal is phase1 + phase2.
|
||||||
|
let mut all_ops_a: Vec<(SignedOp, String)> = ops_phase1;
|
||||||
|
all_ops_a.extend(ops_phase2);
|
||||||
|
|
||||||
|
let delta = local_ops_since(&all_ops_a, &clock_b);
|
||||||
|
assert_eq!(
|
||||||
|
delta.len(),
|
||||||
|
50,
|
||||||
|
"peer should receive exactly 50 missed ops, got {}",
|
||||||
|
delta.len()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Integration test (new node): C connects with empty clock,
|
||||||
|
/// receives all 150 ops — verifies fallback behaviour.
|
||||||
|
#[test]
|
||||||
|
fn delta_sync_new_node_receives_all_ops() {
|
||||||
|
let kp_a = make_keypair();
|
||||||
|
let mut crdt_a = BaseCrdt::<PipelineDoc>::new(&kp_a);
|
||||||
|
|
||||||
|
let ops_phase1 = make_ops(&kp_a, &mut crdt_a, 100, "631_new1");
|
||||||
|
let ops_phase2 = make_ops(&kp_a, &mut crdt_a, 50, "631_new2");
|
||||||
|
|
||||||
|
let mut all_ops_a: Vec<(SignedOp, String)> = ops_phase1;
|
||||||
|
all_ops_a.extend(ops_phase2);
|
||||||
|
|
||||||
|
// Empty clock = new node.
|
||||||
|
let empty_clock = VectorClock::new();
|
||||||
|
let delta = local_ops_since(&all_ops_a, &empty_clock);
|
||||||
|
assert_eq!(
|
||||||
|
delta.len(),
|
||||||
|
150,
|
||||||
|
"new node should receive all 150 ops, got {}",
|
||||||
|
delta.len()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Multi-author delta sync: ops from two different nodes, peer has seen
|
||||||
|
/// all of one author but none of the other.
|
||||||
|
#[test]
|
||||||
|
fn delta_sync_multi_author() {
|
||||||
|
use fastcrypto::traits::KeyPair;
|
||||||
|
|
||||||
|
let kp_a = make_keypair();
|
||||||
|
let kp_b = make_keypair();
|
||||||
|
let mut crdt_a = BaseCrdt::<PipelineDoc>::new(&kp_a);
|
||||||
|
let mut crdt_b = BaseCrdt::<PipelineDoc>::new(&kp_b);
|
||||||
|
|
||||||
|
let ops_a = make_ops(&kp_a, &mut crdt_a, 30, "631_ma_a");
|
||||||
|
let ops_b = make_ops(&kp_b, &mut crdt_b, 20, "631_ma_b");
|
||||||
|
|
||||||
|
// Combined journal on a hypothetical server.
|
||||||
|
let mut all_ops: Vec<(SignedOp, String)> = ops_a.clone();
|
||||||
|
all_ops.extend(ops_b);
|
||||||
|
|
||||||
|
// Peer has seen all of A's ops but none of B's.
|
||||||
|
let mut peer_clock = VectorClock::new();
|
||||||
|
let author_a_hex = hex::encode(&kp_a.public().0.to_bytes());
|
||||||
|
peer_clock.insert(author_a_hex, 30);
|
||||||
|
|
||||||
|
let delta = local_ops_since(&all_ops, &peer_clock);
|
||||||
|
assert_eq!(
|
||||||
|
delta.len(),
|
||||||
|
20,
|
||||||
|
"peer should receive 20 ops from author B, got {}",
|
||||||
|
delta.len()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Vector clock construction from ops.
|
||||||
|
#[test]
|
||||||
|
fn build_vector_clock_from_ops() {
|
||||||
|
use fastcrypto::traits::KeyPair;
|
||||||
|
|
||||||
|
let kp = make_keypair();
|
||||||
|
let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);
|
||||||
|
let ops = make_ops(&kp, &mut crdt, 10, "631_vc");
|
||||||
|
|
||||||
|
let clock = build_clock(&ops);
|
||||||
|
let author_hex = hex::encode(&kp.public().0.to_bytes());
|
||||||
|
|
||||||
|
assert_eq!(clock.len(), 1, "single author should produce 1 clock entry");
|
||||||
|
assert_eq!(clock[&author_hex], 10, "clock should show 10 ops");
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Wire format: clock message serialization round-trip.
|
||||||
|
#[test]
|
||||||
|
fn clock_message_serialization_roundtrip() {
|
||||||
|
let mut clock = VectorClock::new();
|
||||||
|
clock.insert("aabbcc".to_string(), 42);
|
||||||
|
clock.insert("ddeeff".to_string(), 7);
|
||||||
|
|
||||||
|
let json = serde_json::to_value(&clock).unwrap();
|
||||||
|
assert!(json.is_object());
|
||||||
|
let deserialized: VectorClock = serde_json::from_value(json).unwrap();
|
||||||
|
assert_eq!(deserialized["aabbcc"], 42);
|
||||||
|
assert_eq!(deserialized["ddeeff"], 7);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
+350
-11
@@ -4,12 +4,26 @@
|
|||||||
///
|
///
|
||||||
/// # Protocol
|
/// # Protocol
|
||||||
///
|
///
|
||||||
/// The sync protocol is a hybrid of two frame types:
|
/// ## Version negotiation
|
||||||
///
|
///
|
||||||
/// ## Text frames (bulk initial state)
|
/// After the auth handshake, both sides send their first sync message:
|
||||||
|
///
|
||||||
|
/// - **v2 peers** send a `clock` frame: `{"type":"clock","clock":{ <node_id_hex>: <max_count>, ... }}`
|
||||||
|
/// containing a vector clock that maps each author's hex Ed25519 pubkey to the
|
||||||
|
/// count of ops received from that author. Upon receiving the peer's clock,
|
||||||
|
/// each side computes the delta via [`crdt_state::ops_since`] and sends only
|
||||||
|
/// the missing ops as a `bulk` frame.
|
||||||
|
///
|
||||||
|
/// - **v1 (legacy) peers** send a `bulk` frame directly (full op dump).
|
||||||
|
/// A v2 peer receiving a `bulk` first (instead of a `clock`) falls back to
|
||||||
|
/// the full-dump path: applies the incoming bulk and responds with its own
|
||||||
|
/// full bulk. This preserves backward compatibility — no code change needed
|
||||||
|
/// on the v1 side.
|
||||||
|
///
|
||||||
|
/// ## Text frames
|
||||||
/// A JSON object with a `"type"` field:
|
/// A JSON object with a `"type"` field:
|
||||||
/// - `{"type":"bulk","ops":[...]}` — Initial state dump (array of serialised
|
/// - `{"type":"clock","clock":{...}}` — Vector clock (v2 protocol).
|
||||||
/// `SignedOp` JSON strings). Sent by both sides immediately after connect.
|
/// - `{"type":"bulk","ops":[...]}` — Ops dump (full or delta).
|
||||||
///
|
///
|
||||||
/// ## Binary frames (real-time op broadcast)
|
/// ## Binary frames (real-time op broadcast)
|
||||||
/// Individual `SignedOp`s encoded via [`crate::crdt_wire`] (versioned JSON
|
/// Individual `SignedOp`s encoded via [`crate::crdt_wire`] (versioned JSON
|
||||||
@@ -89,10 +103,18 @@ struct AuthMessage {
|
|||||||
#[derive(Serialize, Deserialize)]
|
#[derive(Serialize, Deserialize)]
|
||||||
#[serde(tag = "type", rename_all = "snake_case")]
|
#[serde(tag = "type", rename_all = "snake_case")]
|
||||||
enum SyncMessage {
|
enum SyncMessage {
|
||||||
/// Bulk state dump sent on connect.
|
/// Bulk state dump sent on connect (v1) or delta ops after clock exchange (v2).
|
||||||
Bulk { ops: Vec<String> },
|
Bulk { ops: Vec<String> },
|
||||||
/// A single new op.
|
/// A single new op.
|
||||||
Op { op: String },
|
Op { op: String },
|
||||||
|
/// Vector clock exchanged on connect (v2 protocol).
|
||||||
|
///
|
||||||
|
/// Each entry maps a node's hex-encoded Ed25519 public key to the count of
|
||||||
|
/// ops received from that node. The receiving side computes the delta via
|
||||||
|
/// [`crdt_state::ops_since`] and sends only the missing ops.
|
||||||
|
Clock {
|
||||||
|
clock: std::collections::HashMap<String, u64>,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── Server-side WebSocket handler ───────────────────────────────────
|
// ── Server-side WebSocket handler ───────────────────────────────────
|
||||||
@@ -174,15 +196,87 @@ pub async fn crdt_sync_handler(
|
|||||||
|
|
||||||
// ── Auth passed — proceed with CRDT sync ──────────────────
|
// ── Auth passed — proceed with CRDT sync ──────────────────
|
||||||
|
|
||||||
// Send bulk state dump.
|
// v2 protocol: send our vector clock so the peer can compute the delta.
|
||||||
if let Some(ops) = crdt_state::all_ops_json() {
|
let our_clock = crdt_state::our_vector_clock().unwrap_or_default();
|
||||||
let msg = SyncMessage::Bulk { ops };
|
let clock_msg = SyncMessage::Clock { clock: our_clock };
|
||||||
|
if let Ok(json) = serde_json::to_string(&clock_msg)
|
||||||
|
&& sink.send(WsMessage::Text(json)).await.is_err()
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for the peer's first sync message to determine protocol version.
|
||||||
|
let first_msg = tokio::time::timeout(
|
||||||
|
std::time::Duration::from_secs(AUTH_TIMEOUT_SECS),
|
||||||
|
wait_for_sync_text(&mut stream, &mut sink),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
match first_msg {
|
||||||
|
Ok(Some(SyncMessage::Clock { clock: peer_clock })) => {
|
||||||
|
// v2 peer — send only the ops the peer is missing.
|
||||||
|
let delta = crdt_state::ops_since(&peer_clock).unwrap_or_default();
|
||||||
|
slog!(
|
||||||
|
"[crdt-sync] v2 delta sync: sending {} ops (peer missing)",
|
||||||
|
delta.len()
|
||||||
|
);
|
||||||
|
let msg = SyncMessage::Bulk { ops: delta };
|
||||||
if let Ok(json) = serde_json::to_string(&msg)
|
if let Ok(json) = serde_json::to_string(&msg)
|
||||||
&& sink.send(WsMessage::Text(json)).await.is_err()
|
&& sink.send(WsMessage::Text(json)).await.is_err()
|
||||||
{
|
{
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Ok(Some(SyncMessage::Bulk { ops })) => {
|
||||||
|
// v1 peer — apply their bulk and send our full bulk.
|
||||||
|
let mut applied = 0u64;
|
||||||
|
for op_json in &ops {
|
||||||
|
if let Ok(signed_op) = serde_json::from_str::<SignedOp>(op_json)
|
||||||
|
&& crdt_state::apply_remote_op(signed_op)
|
||||||
|
{
|
||||||
|
applied += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
slog!(
|
||||||
|
"[crdt-sync] v1 bulk sync: received {} ops, applied {applied}",
|
||||||
|
ops.len()
|
||||||
|
);
|
||||||
|
if let Some(all) = crdt_state::all_ops_json() {
|
||||||
|
let msg = SyncMessage::Bulk { ops: all };
|
||||||
|
if let Ok(json) = serde_json::to_string(&msg)
|
||||||
|
&& sink.send(WsMessage::Text(json)).await.is_err()
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(Some(SyncMessage::Op { op })) => {
|
||||||
|
// Single op before negotiation — treat as v1.
|
||||||
|
if let Ok(signed_op) = serde_json::from_str::<SignedOp>(&op) {
|
||||||
|
crdt_state::apply_remote_op(signed_op);
|
||||||
|
}
|
||||||
|
if let Some(all) = crdt_state::all_ops_json() {
|
||||||
|
let msg = SyncMessage::Bulk { ops: all };
|
||||||
|
if let Ok(json) = serde_json::to_string(&msg)
|
||||||
|
&& sink.send(WsMessage::Text(json)).await.is_err()
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
// Timeout or error — send full bulk as fallback.
|
||||||
|
slog!("[crdt-sync] No sync message from peer; sending full bulk as fallback");
|
||||||
|
if let Some(all) = crdt_state::all_ops_json() {
|
||||||
|
let msg = SyncMessage::Bulk { ops: all };
|
||||||
|
if let Ok(json) = serde_json::to_string(&msg)
|
||||||
|
&& sink.send(WsMessage::Text(json)).await.is_err()
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Subscribe to new local ops.
|
// Subscribe to new local ops.
|
||||||
let Some(mut op_rx) = crdt_state::subscribe_ops() else {
|
let Some(mut op_rx) = crdt_state::subscribe_ops() else {
|
||||||
@@ -263,6 +357,28 @@ pub async fn crdt_sync_handler(
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Wait for the next text-frame sync message from the peer, handling Ping/Pong
|
||||||
|
/// transparently.
|
||||||
|
///
|
||||||
|
/// Returns `None` on connection close or read error.
|
||||||
|
async fn wait_for_sync_text(
|
||||||
|
stream: &mut futures::stream::SplitStream<poem::web::websocket::WebSocketStream>,
|
||||||
|
sink: &mut futures::stream::SplitSink<poem::web::websocket::WebSocketStream, WsMessage>,
|
||||||
|
) -> Option<SyncMessage> {
|
||||||
|
loop {
|
||||||
|
match stream.next().await {
|
||||||
|
Some(Ok(WsMessage::Text(text))) => {
|
||||||
|
return serde_json::from_str(&text).ok();
|
||||||
|
}
|
||||||
|
Some(Ok(WsMessage::Ping(data))) => {
|
||||||
|
let _ = sink.send(WsMessage::Pong(data)).await;
|
||||||
|
}
|
||||||
|
Some(Ok(WsMessage::Pong(_))) => continue,
|
||||||
|
_ => return None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Close the WebSocket with a generic `auth_failed` reason.
|
/// Close the WebSocket with a generic `auth_failed` reason.
|
||||||
///
|
///
|
||||||
/// The close reason is intentionally the same for all auth failures
|
/// The close reason is intentionally the same for all auth failures
|
||||||
@@ -313,6 +429,12 @@ fn handle_incoming_text(text: &str) {
|
|||||||
crdt_state::apply_remote_op(signed_op);
|
crdt_state::apply_remote_op(signed_op);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
SyncMessage::Clock { .. } => {
|
||||||
|
// Clock frames are handled during the initial negotiation phase.
|
||||||
|
// If one arrives during the streaming loop it is a protocol error
|
||||||
|
// on the peer's part — log and ignore.
|
||||||
|
slog!("[crdt-sync] Ignoring unexpected clock frame during streaming phase");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -426,15 +548,74 @@ async fn connect_and_sync(url: &str) -> Result<(), String> {
|
|||||||
|
|
||||||
slog!("[crdt-sync] Auth reply sent, waiting for sync data");
|
slog!("[crdt-sync] Auth reply sent, waiting for sync data");
|
||||||
|
|
||||||
// Send our bulk state.
|
// v2 protocol: send our vector clock.
|
||||||
if let Some(ops) = crdt_state::all_ops_json() {
|
let our_clock = crdt_state::our_vector_clock().unwrap_or_default();
|
||||||
let msg = SyncMessage::Bulk { ops };
|
let clock_msg = SyncMessage::Clock { clock: our_clock };
|
||||||
|
if let Ok(json) = serde_json::to_string(&clock_msg) {
|
||||||
|
sink.send(TungsteniteMsg::Text(json.into()))
|
||||||
|
.await
|
||||||
|
.map_err(|e| format!("Send clock failed: {e}"))?;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for the server's first sync message.
|
||||||
|
let first_msg = tokio::time::timeout(
|
||||||
|
std::time::Duration::from_secs(AUTH_TIMEOUT_SECS),
|
||||||
|
wait_for_rendezvous_sync_text(&mut stream),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.map_err(|_| "Timeout waiting for server sync message".to_string())?;
|
||||||
|
|
||||||
|
match first_msg {
|
||||||
|
Some(SyncMessage::Clock { clock: peer_clock }) => {
|
||||||
|
// v2 server — send only the ops the server is missing.
|
||||||
|
let delta = crdt_state::ops_since(&peer_clock).unwrap_or_default();
|
||||||
|
slog!(
|
||||||
|
"[crdt-sync] v2 delta sync: sending {} ops to server (server missing)",
|
||||||
|
delta.len()
|
||||||
|
);
|
||||||
|
let msg = SyncMessage::Bulk { ops: delta };
|
||||||
|
if let Ok(json) = serde_json::to_string(&msg) {
|
||||||
|
sink.send(TungsteniteMsg::Text(json.into()))
|
||||||
|
.await
|
||||||
|
.map_err(|e| format!("Send delta failed: {e}"))?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Some(SyncMessage::Bulk { ops }) => {
|
||||||
|
// v1 server — apply their bulk and send our full bulk.
|
||||||
|
let mut applied = 0u64;
|
||||||
|
for op_json in &ops {
|
||||||
|
if let Ok(signed_op) = serde_json::from_str::<SignedOp>(op_json)
|
||||||
|
&& crdt_state::apply_remote_op(signed_op)
|
||||||
|
{
|
||||||
|
applied += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
slog!(
|
||||||
|
"[crdt-sync] v1 bulk sync: received {} ops from server, applied {applied}",
|
||||||
|
ops.len()
|
||||||
|
);
|
||||||
|
if let Some(all) = crdt_state::all_ops_json() {
|
||||||
|
let msg = SyncMessage::Bulk { ops: all };
|
||||||
if let Ok(json) = serde_json::to_string(&msg) {
|
if let Ok(json) = serde_json::to_string(&msg) {
|
||||||
sink.send(TungsteniteMsg::Text(json.into()))
|
sink.send(TungsteniteMsg::Text(json.into()))
|
||||||
.await
|
.await
|
||||||
.map_err(|e| format!("Send bulk failed: {e}"))?;
|
.map_err(|e| format!("Send bulk failed: {e}"))?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
// Fallback — send full bulk.
|
||||||
|
slog!("[crdt-sync] No sync message from server; sending full bulk as fallback");
|
||||||
|
if let Some(all) = crdt_state::all_ops_json() {
|
||||||
|
let msg = SyncMessage::Bulk { ops: all };
|
||||||
|
if let Ok(json) = serde_json::to_string(&msg) {
|
||||||
|
sink.send(TungsteniteMsg::Text(json.into()))
|
||||||
|
.await
|
||||||
|
.map_err(|e| format!("Send bulk failed: {e}"))?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Subscribe to new local ops.
|
// Subscribe to new local ops.
|
||||||
let Some(mut op_rx) = crdt_state::subscribe_ops() else {
|
let Some(mut op_rx) = crdt_state::subscribe_ops() else {
|
||||||
@@ -516,6 +697,29 @@ async fn connect_and_sync(url: &str) -> Result<(), String> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Wait for the next text-frame sync message from a tungstenite stream,
|
||||||
|
/// handling Ping/Pong transparently.
|
||||||
|
///
|
||||||
|
/// Returns `None` on connection close or read error.
|
||||||
|
async fn wait_for_rendezvous_sync_text(
|
||||||
|
stream: &mut futures::stream::SplitStream<
|
||||||
|
tokio_tungstenite::WebSocketStream<
|
||||||
|
tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
|
||||||
|
>,
|
||||||
|
>,
|
||||||
|
) -> Option<SyncMessage> {
|
||||||
|
use tokio_tungstenite::tungstenite::Message as TungsteniteMsg;
|
||||||
|
loop {
|
||||||
|
match stream.next().await {
|
||||||
|
Some(Ok(TungsteniteMsg::Text(text))) => {
|
||||||
|
return serde_json::from_str(text.as_ref()).ok();
|
||||||
|
}
|
||||||
|
Some(Ok(TungsteniteMsg::Ping(_) | TungsteniteMsg::Pong(_))) => continue,
|
||||||
|
_ => return None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// ── Tests ────────────────────────────────────────────────────────────
|
// ── Tests ────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
@@ -2635,4 +2839,139 @@ name = "test"
|
|||||||
"Node A must disconnect due to keepalive timeout when Node B swallows Pongs"
|
"Node A must disconnect due to keepalive timeout when Node B swallows Pongs"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── Story 631: vector clock wire format tests ───────────────────────
|
||||||
|
|
||||||
|
/// Clock message serialization round-trip via SyncMessage.
|
||||||
|
#[test]
|
||||||
|
fn sync_message_clock_serialization_roundtrip() {
|
||||||
|
let mut clock = std::collections::HashMap::new();
|
||||||
|
clock.insert("aabbcc00".to_string(), 42u64);
|
||||||
|
clock.insert("ddeeff11".to_string(), 7u64);
|
||||||
|
|
||||||
|
let msg = SyncMessage::Clock { clock };
|
||||||
|
let json = serde_json::to_string(&msg).unwrap();
|
||||||
|
assert!(json.contains(r#""type":"clock""#));
|
||||||
|
|
||||||
|
let deserialized: SyncMessage = serde_json::from_str(&json).unwrap();
|
||||||
|
match deserialized {
|
||||||
|
SyncMessage::Clock { clock } => {
|
||||||
|
assert_eq!(clock["aabbcc00"], 42);
|
||||||
|
assert_eq!(clock["ddeeff11"], 7);
|
||||||
|
}
|
||||||
|
_ => panic!("Expected Clock"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Empty clock (new node) serializes correctly.
|
||||||
|
#[test]
|
||||||
|
fn sync_message_clock_empty() {
|
||||||
|
let msg = SyncMessage::Clock {
|
||||||
|
clock: std::collections::HashMap::new(),
|
||||||
|
};
|
||||||
|
let json = serde_json::to_string(&msg).unwrap();
|
||||||
|
let deserialized: SyncMessage = serde_json::from_str(&json).unwrap();
|
||||||
|
match deserialized {
|
||||||
|
SyncMessage::Clock { clock } => assert!(clock.is_empty()),
|
||||||
|
_ => panic!("Expected Clock"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// v1 compat: a v1 peer that only knows `bulk` and `op` will fail to parse
|
||||||
|
/// a `clock` message — verify the parse error is a clean serde error, not a
|
||||||
|
/// panic.
|
||||||
|
#[test]
|
||||||
|
fn v1_peer_ignores_clock_message_gracefully() {
|
||||||
|
// Simulate: v1 peer only knows Bulk and Op.
|
||||||
|
// A clock message should fail deserialization (unknown variant).
|
||||||
|
let clock_json = r#"{"type":"clock","clock":{"abc":10}}"#;
|
||||||
|
// handle_incoming_text logs and returns — must not panic.
|
||||||
|
handle_incoming_text(clock_json);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// v2 delta sync simulation: two CRDT nodes, exchange clocks, send deltas.
///
/// Flow under test:
/// 1. Node A authors 5 ops; node B applies only the first 3.
/// 2. B sends its vector clock (author_a -> 3) as a wire `SyncMessage::Clock`.
/// 3. A parses the clock, computes the delta (ops 4 and 5), and serializes
///    only those.
/// 4. B applies the delta and converges to all 5 items.
#[test]
fn v2_delta_sync_via_clock_exchange() {
    use bft_json_crdt::json_crdt::BaseCrdt;
    use bft_json_crdt::keypair::make_keypair;
    use bft_json_crdt::op::ROOT_ID;
    use fastcrypto::traits::KeyPair;
    use serde_json::json;

    use crate::crdt_state::PipelineDoc;

    let kp_a = make_keypair();
    let mut crdt_a = BaseCrdt::<PipelineDoc>::new(&kp_a);

    // A creates 5 items.
    let mut ops_a = Vec::new();
    for i in 0..5 {
        let item: bft_json_crdt::json_crdt::JsonValue = json!({
            "story_id": format!("631_v2_{i}"),
            "stage": "1_backlog",
            "name": format!("v2 item {i}"),
            "agent": "",
            "retry_count": 0.0,
            "blocked": false,
            "depends_on": "",
            "claimed_by": "",
            "claimed_at": 0.0,
        })
        .into();
        let op = crdt_a.doc.items.insert(ROOT_ID, item).sign(&kp_a);
        crdt_a.apply(op.clone());
        ops_a.push(op);
    }

    // B has seen the first 3 ops (clock says 3).
    let kp_b = make_keypair();
    let mut crdt_b = BaseCrdt::<PipelineDoc>::new(&kp_b);
    for op in &ops_a[..3] {
        crdt_b.apply(op.clone());
    }
    assert_eq!(crdt_b.doc.items.view().len(), 3);

    // Build B's clock: it has 3 ops from author A.
    let author_a_hex = crate::crdt_state::hex::encode(&kp_a.public().0.to_bytes());
    let mut clock_b = std::collections::HashMap::new();
    clock_b.insert(author_a_hex.clone(), 3u64);

    // Serialize clock as wire message.
    let clock_msg = SyncMessage::Clock {
        clock: clock_b.clone(),
    };
    let clock_wire = serde_json::to_string(&clock_msg).unwrap();

    // A receives B's clock and computes the delta.
    let parsed: SyncMessage = serde_json::from_str(&clock_wire).unwrap();
    let delta_ops = match parsed {
        SyncMessage::Clock { clock: peer_clock } => {
            // Simulate ops_since: the peer's count for this author is
            // loop-invariant, so look it up once, skip exactly that many
            // ops, and serialize only the missing tail (the original
            // serialized all ops and then filtered).
            let peer_has = peer_clock.get(&author_a_hex).copied().unwrap_or(0);
            ops_a
                .iter()
                .skip(peer_has as usize) // test-sized counts; no truncation risk
                .map(|op| serde_json::to_string(op).unwrap())
                .collect::<Vec<String>>()
        }
        _ => panic!("Expected Clock"),
    };

    assert_eq!(delta_ops.len(), 2, "delta should be 2 ops (ops 4 and 5)");

    // B applies the delta and converges.
    for op_str in &delta_ops {
        let signed: bft_json_crdt::json_crdt::SignedOp = serde_json::from_str(op_str).unwrap();
        crdt_b.apply(signed);
    }
    assert_eq!(crdt_b.doc.items.view().len(), 5);
}
|
||||||
}
|
}
|
||||||
|
|||||||
/// Binary fields (`signed_digest`, `depends_on`, `id`, …) use
/// `serde_with::Bytes` so they appear as base64 strings in JSON.
///
/// # Sync protocol versions
///
/// The wire codec encodes individual `SignedOp`s for real-time streaming.
/// The higher-level sync protocol (see [`crate::crdt_sync`]) uses text-frame
/// JSON messages for negotiation:
///
/// - **v2**: `{"type":"clock","clock":{...}}` — vector clock exchange for delta sync.
/// - **v1**: `{"type":"bulk","ops":[...]}` — full bulk dump (legacy, still supported).
///
/// # Upgrading the format
///
/// Bump `WIRE_VERSION` and add a new arm to the `match envelope.v` block in
|
||||||
|
|||||||
Reference in New Issue
Block a user