huskies: merge 834

This commit is contained in:
dave
2026-04-29 10:07:21 +00:00
parent 0403dc9871
commit d22a591fdc
4 changed files with 1039 additions and 1028 deletions
File diff suppressed because it is too large Load Diff
+83
View File
@@ -0,0 +1,83 @@
//! Shared message dispatch — handle inbound text/binary frames once auth is complete.
use bft_json_crdt::json_crdt::SignedOp;
use crate::crdt_snapshot;
use crate::crdt_state;
use crate::crdt_wire;
use crate::slog;
use super::wire::SyncMessage;
mod snapshot;
#[cfg(test)]
mod tests;
/// Dispatch an incoming text frame from a peer.
///
/// First attempts to parse the frame as a snapshot protocol message; on
/// failure falls through to the legacy [`SyncMessage`] handler.
pub(super) fn handle_incoming_text(text: &str) {
// First try to parse as a snapshot protocol message.
if let Ok(snapshot_msg) = serde_json::from_str::<crdt_snapshot::SnapshotMessage>(text) {
snapshot::handle_snapshot_message(snapshot_msg);
return;
}
let msg: SyncMessage = match serde_json::from_str(text) {
Ok(m) => m,
Err(e) => {
slog!("[crdt-sync] Bad text message from peer: {e}");
return;
}
};
match msg {
SyncMessage::Bulk { ops } => {
let mut applied = 0u64;
for op_json in &ops {
if let Ok(signed_op) = serde_json::from_str::<SignedOp>(op_json)
&& crdt_state::apply_remote_op(signed_op)
{
applied += 1;
}
}
slog!(
"[crdt-sync] Bulk sync: received {} ops, applied {applied}",
ops.len()
);
}
SyncMessage::Op { op } => {
if let Ok(signed_op) = serde_json::from_str::<SignedOp>(&op) {
crdt_state::apply_remote_op(signed_op);
}
}
SyncMessage::Clock { .. } => {
// Clock frames are handled during the initial negotiation phase.
// If one arrives during the streaming loop it is a protocol error
// on the peer's part — log and ignore.
slog!("[crdt-sync] Ignoring unexpected clock frame during streaming phase");
}
SyncMessage::Ready => {
// Ready frames are intercepted inline in the streaming loop before
// this function is called. If one reaches here it is a protocol
// error — log and ignore.
slog!("[crdt-sync] Ignoring unexpected ready frame in handle_incoming_text");
}
}
}
/// Process an incoming binary-frame op from a peer.
///
/// Binary frames carry a single `SignedOp` encoded via [`crdt_wire`].
pub(super) fn handle_incoming_binary(bytes: &[u8]) {
match crdt_wire::decode(bytes) {
Ok(signed_op) => {
crdt_state::apply_remote_op(signed_op);
}
Err(e) => {
slog!("[crdt-sync] Bad binary frame from peer: {e}");
}
}
}
+49
View File
@@ -0,0 +1,49 @@
//! Snapshot protocol message handling for the CRDT sync layer.
use crate::crdt_snapshot;
use crate::crdt_state;
use crate::slog;
/// Handle an incoming snapshot protocol message.
///
/// - **Snapshot**: apply the snapshot state and send an ack back.
/// Peers without snapshot support will never reach this code path because
/// the `SnapshotMessage` parse will fail and the message falls through to
/// the legacy `SyncMessage` handler, which logs and ignores unknown types.
/// - **SnapshotAck**: record the ack for quorum tracking.
pub(super) fn handle_snapshot_message(msg: crdt_snapshot::SnapshotMessage) {
match msg {
crdt_snapshot::SnapshotMessage::Snapshot(snapshot) => {
slog!(
"[crdt-sync] Received snapshot at_seq={}, {} ops, {} manifest entries",
snapshot.at_seq,
snapshot.state.len(),
snapshot.op_manifest.len()
);
// Apply compaction on this peer.
crdt_snapshot::apply_compaction(snapshot.clone());
// Send ack back to leader via the sync broadcast channel.
// The ack is sent as a CRDT event that the streaming loop picks up.
// For now, log the ack intent — actual transport is handled by the
// caller that invokes handle_incoming_text.
slog!(
"[crdt-sync] Snapshot applied, ack for at_seq={}",
snapshot.at_seq
);
}
crdt_snapshot::SnapshotMessage::SnapshotAck(ack) => {
if let Some(node_id) = crdt_state::our_node_id() {
let _ = node_id; // The ack comes from a peer, not from us.
}
slog!(
"[crdt-sync] Received snapshot_ack for at_seq={}",
ack.at_seq
);
// Record the ack — the coordination logic checks for quorum.
// Note: we don't know the peer's node_id from the message alone;
// in a full implementation the ack would include the sender's
// node_id. For now we log it for protocol completeness.
}
}
}
+907
View File
@@ -0,0 +1,907 @@
//! Tests for the CRDT sync dispatch module.
use super::*;
#[test]
fn handle_incoming_text_bad_json_does_not_panic() {
handle_incoming_text("not valid json");
}
#[test]
fn handle_incoming_text_bulk_with_invalid_ops_does_not_panic() {
let msg = SyncMessage::Bulk {
ops: vec!["not a valid signed op".to_string()],
};
let json = serde_json::to_string(&msg).unwrap();
handle_incoming_text(&json);
}
#[test]
fn handle_incoming_text_op_with_invalid_op_does_not_panic() {
let msg = SyncMessage::Op {
op: "garbage".to_string(),
};
let json = serde_json::to_string(&msg).unwrap();
handle_incoming_text(&json);
}
#[test]
fn handle_incoming_binary_bad_bytes_does_not_panic() {
handle_incoming_binary(b"not valid wire codec");
}
#[test]
fn handle_incoming_binary_empty_bytes_does_not_panic() {
handle_incoming_binary(b"");
}
#[test]
fn subscribe_ops_returns_none_before_init() {
// Before crdt_state::init() the channel doesn't exist yet.
// In test binaries it may or may not be initialised depending on
// other tests, so we just verify no panic.
let _ = crdt_state::subscribe_ops();
}
#[test]
fn all_ops_json_returns_none_before_init() {
let _ = crdt_state::all_ops_json();
}
#[test]
fn two_node_sync_via_protocol_messages() {
use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, OpState};
use bft_json_crdt::keypair::make_keypair;
use bft_json_crdt::op::ROOT_ID;
use serde_json::json;
use crate::crdt_state::PipelineDoc;
// ── Node A: create an item ──
let kp_a = make_keypair();
let mut crdt_a = BaseCrdt::<PipelineDoc>::new(&kp_a);
let item: bft_json_crdt::json_crdt::JsonValue = json!({
"story_id": "100_story_sync_test",
"stage": "1_backlog",
"name": "Sync Test",
"agent": "",
"retry_count": 0.0,
"blocked": false,
"depends_on": "",
"claimed_by": "",
"claimed_at": 0.0,
})
.into();
let op1 = crdt_a.doc.items.insert(ROOT_ID, item).sign(&kp_a);
assert_eq!(crdt_a.apply(op1.clone()), OpState::Ok);
// Serialise op1 into a SyncMessage::Op.
let op1_json = serde_json::to_string(&op1).unwrap();
let wire_msg = SyncMessage::Op {
op: op1_json.clone(),
};
let wire_json = serde_json::to_string(&wire_msg).unwrap();
// ── Node B: receive the op through protocol ──
let kp_b = make_keypair();
let mut crdt_b = BaseCrdt::<PipelineDoc>::new(&kp_b);
assert!(crdt_b.doc.items.view().is_empty());
// Parse wire message and apply.
let parsed: SyncMessage = serde_json::from_str(&wire_json).unwrap();
match parsed {
SyncMessage::Op { op } => {
let signed_op: bft_json_crdt::json_crdt::SignedOp = serde_json::from_str(&op).unwrap();
let result = crdt_b.apply(signed_op);
assert_eq!(result, OpState::Ok);
}
_ => panic!("Expected Op"),
}
// Verify Node B has the same state as Node A.
assert_eq!(crdt_b.doc.items.view().len(), 1);
assert_eq!(
crdt_a.doc.items[0].story_id.view(),
crdt_b.doc.items[0].story_id.view()
);
assert_eq!(
crdt_a.doc.items[0].stage.view(),
crdt_b.doc.items[0].stage.view()
);
// ── Node A: update stage ──
let op2 = crdt_a.doc.items[0]
.stage
.set("2_current".to_string())
.sign(&kp_a);
crdt_a.apply(op2.clone());
// Send via bulk message.
let op2_json = serde_json::to_string(&op2).unwrap();
let bulk_msg = SyncMessage::Bulk {
ops: vec![op1_json, op2_json],
};
let bulk_wire = serde_json::to_string(&bulk_msg).unwrap();
// ── Node C: receives full state via bulk ──
let kp_c = make_keypair();
let mut crdt_c = BaseCrdt::<PipelineDoc>::new(&kp_c);
let parsed_bulk: SyncMessage = serde_json::from_str(&bulk_wire).unwrap();
match parsed_bulk {
SyncMessage::Bulk { ops } => {
for op_str in &ops {
let signed: bft_json_crdt::json_crdt::SignedOp =
serde_json::from_str(op_str).unwrap();
crdt_c.apply(signed);
}
}
_ => panic!("Expected Bulk"),
}
// Node C should have the updated stage.
assert_eq!(crdt_c.doc.items.view().len(), 1);
assert_eq!(
crdt_c.doc.items[0].stage.view(),
bft_json_crdt::json_crdt::JsonValue::String("2_current".to_string())
);
}
#[test]
fn partition_heal_via_bulk_replay() {
use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, JsonValue as JV};
use bft_json_crdt::keypair::make_keypair;
use bft_json_crdt::op::ROOT_ID;
use serde_json::json;
use crate::crdt_state::PipelineDoc;
let kp = make_keypair();
// Node A creates an item and advances it.
let mut crdt_a = BaseCrdt::<PipelineDoc>::new(&kp);
let item: bft_json_crdt::json_crdt::JsonValue = json!({
"story_id": "200_story_heal",
"stage": "1_backlog",
"name": "Heal Test",
"agent": "",
"retry_count": 0.0,
"blocked": false,
"depends_on": "",
"claimed_by": "",
"claimed_at": 0.0,
})
.into();
let op1 = crdt_a.doc.items.insert(ROOT_ID, item).sign(&kp);
crdt_a.apply(op1.clone());
let op2 = crdt_a.doc.items[0]
.stage
.set("2_current".to_string())
.sign(&kp);
crdt_a.apply(op2.clone());
let op3 = crdt_a.doc.items[0].stage.set("3_qa".to_string()).sign(&kp);
crdt_a.apply(op3.clone());
// Serialise all ops as a bulk message (simulates partition heal).
let ops_json: Vec<String> = [&op1, &op2, &op3]
.iter()
.map(|op| serde_json::to_string(op).unwrap())
.collect();
let bulk = SyncMessage::Bulk { ops: ops_json };
let wire = serde_json::to_string(&bulk).unwrap();
// Node B receives the bulk and reconstructs state.
let mut crdt_b = BaseCrdt::<PipelineDoc>::new(&kp);
let parsed: SyncMessage = serde_json::from_str(&wire).unwrap();
match parsed {
SyncMessage::Bulk { ops } => {
for op_str in &ops {
let signed: bft_json_crdt::json_crdt::SignedOp =
serde_json::from_str(op_str).unwrap();
crdt_b.apply(signed);
}
}
_ => panic!("Expected Bulk"),
}
// Node B should match Node A exactly.
assert_eq!(crdt_b.doc.items.view().len(), 1);
assert_eq!(
crdt_b.doc.items[0].stage.view(),
JV::String("3_qa".to_string())
);
assert_eq!(
crdt_a.doc.items[0].stage.view(),
crdt_b.doc.items[0].stage.view()
);
assert_eq!(
crdt_a.doc.items[0].name.view(),
crdt_b.doc.items[0].name.view()
);
}
#[test]
fn self_loop_dedup_second_apply_is_noop() {
use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, JsonValue as JV, OpState};
use bft_json_crdt::keypair::make_keypair;
use bft_json_crdt::op::ROOT_ID;
use serde_json::json;
use crate::crdt_state::PipelineDoc;
let kp = make_keypair();
let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);
let item: bft_json_crdt::json_crdt::JsonValue = json!({
"story_id": "507_dedup_test",
"stage": "1_backlog",
"name": "Dedup Test",
"agent": "",
"retry_count": 0.0,
"blocked": false,
"depends_on": "",
"claimed_by": "",
"claimed_at": 0.0,
})
.into();
let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp);
// First apply: succeeds.
assert_eq!(crdt.apply(op.clone()), OpState::Ok);
assert_eq!(crdt.doc.items.view().len(), 1);
// Second apply (self-loop): must be a no-op.
assert_eq!(crdt.apply(op.clone()), OpState::AlreadySeen);
// State must not have changed.
assert_eq!(crdt.doc.items.view().len(), 1);
// Stage update also deduplicated correctly.
let stage_op = crdt.doc.items[0]
.stage
.set("2_current".to_string())
.sign(&kp);
assert_eq!(crdt.apply(stage_op.clone()), OpState::Ok);
assert_eq!(
crdt.doc.items[0].stage.view(),
JV::String("2_current".to_string())
);
assert_eq!(crdt.apply(stage_op), OpState::AlreadySeen);
assert_eq!(
crdt.doc.items[0].stage.view(),
JV::String("2_current".to_string()),
"stage must not change on duplicate apply"
);
}
#[test]
fn out_of_order_causal_queueing_releases_on_dep_arrival() {
use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, JsonValue as JV, OpState};
use bft_json_crdt::keypair::make_keypair;
use bft_json_crdt::op::ROOT_ID;
use serde_json::json;
use crate::crdt_state::PipelineDoc;
let kp = make_keypair();
let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);
let item: bft_json_crdt::json_crdt::JsonValue = json!({
"story_id": "507_causal_test",
"stage": "1_backlog",
"name": "Causal Test",
"agent": "",
"retry_count": 0.0,
"blocked": false,
"depends_on": "",
"claimed_by": "",
"claimed_at": 0.0,
})
.into();
// op1 = insert item (no deps)
let op1 = crdt.doc.items.insert(ROOT_ID, item).sign(&kp);
// op2 = set stage, declared to depend on op1
// We must first apply op1 locally to generate op2 from the right state,
// then we'll test op2-before-op1 on a fresh CRDT.
crdt.apply(op1.clone());
let op2 = crdt.doc.items[0]
.stage
.set("2_current".to_string())
.sign_with_dependencies(&kp, vec![&op1]);
// Create a fresh receiver CRDT.
let mut receiver = BaseCrdt::<PipelineDoc>::new(&kp);
// Apply op2 first — dependency (op1) has not arrived yet.
let r = receiver.apply(op2.clone());
assert_eq!(
r,
OpState::MissingCausalDependencies,
"op2 must be queued when op1 has not arrived"
);
// Queue length must reflect the held op.
assert_eq!(receiver.causal_queue_len(), 1);
// Item has NOT been inserted yet (op1 not applied).
assert_eq!(receiver.doc.items.view().len(), 0);
// Now deliver op1 — this should trigger op2 to be flushed automatically.
let r = receiver.apply(op1.clone());
assert_eq!(r, OpState::Ok);
// Both ops are now applied — item is present at stage 2_current.
assert_eq!(receiver.doc.items.view().len(), 1);
assert_eq!(
receiver.doc.items[0].stage.view(),
JV::String("2_current".to_string()),
"op2 must have been applied automatically after op1 arrived"
);
// Queue must be empty now.
assert_eq!(receiver.causal_queue_len(), 0);
}
#[test]
fn in_order_apply_works_without_queueing() {
use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, JsonValue as JV, OpState};
use bft_json_crdt::keypair::make_keypair;
use bft_json_crdt::op::ROOT_ID;
use serde_json::json;
use crate::crdt_state::PipelineDoc;
let kp = make_keypair();
let mut crdt_a = BaseCrdt::<PipelineDoc>::new(&kp);
let item: bft_json_crdt::json_crdt::JsonValue = json!({
"story_id": "507_inorder_test",
"stage": "1_backlog",
"name": "In-Order Test",
"agent": "",
"retry_count": 0.0,
"blocked": false,
"depends_on": "",
"claimed_by": "",
"claimed_at": 0.0,
})
.into();
let op1 = crdt_a.doc.items.insert(ROOT_ID, item).sign(&kp);
crdt_a.apply(op1.clone());
let op2 = crdt_a.doc.items[0]
.stage
.set("2_current".to_string())
.sign(&kp);
crdt_a.apply(op2.clone());
let op3 = crdt_a.doc.items[0].stage.set("3_qa".to_string()).sign(&kp);
crdt_a.apply(op3.clone());
// Receiver applies all ops in the correct order.
let mut crdt_b = BaseCrdt::<PipelineDoc>::new(&kp);
assert_eq!(crdt_b.apply(op1), OpState::Ok);
assert_eq!(crdt_b.apply(op2), OpState::Ok);
assert_eq!(crdt_b.apply(op3), OpState::Ok);
assert_eq!(crdt_b.causal_queue_len(), 0);
assert_eq!(
crdt_b.doc.items[0].stage.view(),
JV::String("3_qa".to_string())
);
}
#[test]
fn causal_queue_overflow_drops_oldest() {
use bft_json_crdt::json_crdt::{BaseCrdt, CAUSAL_QUEUE_MAX, OpState};
use bft_json_crdt::keypair::make_keypair;
use bft_json_crdt::op::ROOT_ID;
use serde_json::json;
use crate::crdt_state::PipelineDoc;
let kp = make_keypair();
// Build one "phantom" op that we'll claim as a dependency but never deliver.
// We do this by creating it on a separate CRDT and never applying it.
let mut source = BaseCrdt::<PipelineDoc>::new(&kp);
let phantom_item: bft_json_crdt::json_crdt::JsonValue = json!({
"story_id": "507_phantom",
"stage": "1_backlog",
"name": "Phantom",
"agent": "",
"retry_count": 0.0,
"blocked": false,
"depends_on": "",
"claimed_by": "",
"claimed_at": 0.0,
})
.into();
let phantom_op = source.doc.items.insert(ROOT_ID, phantom_item).sign(&kp);
// Receiver never sees phantom_op, so any op declaring it as a dep will
// sit in the causal queue forever (until evicted by overflow).
let mut receiver = BaseCrdt::<PipelineDoc>::new(&kp);
source.apply(phantom_op.clone());
// Send CAUSAL_QUEUE_MAX + 5 stage-update ops all depending on phantom_op.
// Each one will be queued because phantom_op is never delivered.
let mut queued = 0usize;
for i in 0..CAUSAL_QUEUE_MAX + 5 {
let stage_name = format!("stage_{i}");
// Generate from source so seq numbers are valid.
let op = source.doc.items[0]
.stage
.set(stage_name)
.sign_with_dependencies(&kp, vec![&phantom_op]);
source.apply(op.clone());
let r = receiver.apply(op);
if r == OpState::MissingCausalDependencies {
queued += 1;
}
}
// We sent more than CAUSAL_QUEUE_MAX ops, but the queue must stay bounded.
assert!(
receiver.causal_queue_len() <= CAUSAL_QUEUE_MAX,
"queue ({}) must not exceed CAUSAL_QUEUE_MAX ({CAUSAL_QUEUE_MAX})",
receiver.causal_queue_len()
);
assert!(
queued > 0,
"at least some ops must have been accepted into the queue"
);
}
#[test]
fn convergence_after_partition_and_replay() {
use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, JsonValue as JV, OpState};
use bft_json_crdt::keypair::make_keypair;
use bft_json_crdt::op::ROOT_ID;
use serde_json::json;
use crate::crdt_state::PipelineDoc;
let kp_a = make_keypair();
let kp_b = make_keypair();
let mut crdt_a = BaseCrdt::<PipelineDoc>::new(&kp_a);
let mut crdt_b = BaseCrdt::<PipelineDoc>::new(&kp_b);
// ── Phase 1: A generates ops while partitioned from B ──────────────
let item_a: bft_json_crdt::json_crdt::JsonValue = json!({
"story_id": "507_convergence_a",
"stage": "1_backlog",
"name": "Story A",
"agent": "",
"retry_count": 0.0,
"blocked": false,
"depends_on": "",
"claimed_by": "",
"claimed_at": 0.0,
})
.into();
let op_a1 = crdt_a.doc.items.insert(ROOT_ID, item_a).sign(&kp_a);
crdt_a.apply(op_a1.clone());
let op_a2 = crdt_a.doc.items[0]
.stage
.set("2_current".to_string())
.sign(&kp_a);
crdt_a.apply(op_a2.clone());
// ── Phase 2: B generates ops while partitioned from A ──────────────
let item_b: bft_json_crdt::json_crdt::JsonValue = json!({
"story_id": "507_convergence_b",
"stage": "1_backlog",
"name": "Story B",
"agent": "",
"retry_count": 0.0,
"blocked": false,
"depends_on": "",
"claimed_by": "",
"claimed_at": 0.0,
})
.into();
let op_b1 = crdt_b.doc.items.insert(ROOT_ID, item_b).sign(&kp_b);
crdt_b.apply(op_b1.clone());
let op_b2 = crdt_b.doc.items[0]
.stage
.set("2_current".to_string())
.sign(&kp_b);
crdt_b.apply(op_b2.clone());
// ── Phase 3: Reconnect — both sides replay all buffered ops ────────
// A sends its ops to B.
let r = crdt_b.apply(op_a1.clone());
assert!(r == OpState::Ok || r == OpState::AlreadySeen);
let r = crdt_b.apply(op_a2.clone());
assert!(r == OpState::Ok || r == OpState::AlreadySeen);
// B sends its ops to A.
let r = crdt_a.apply(op_b1.clone());
assert!(r == OpState::Ok || r == OpState::AlreadySeen);
let r = crdt_a.apply(op_b2.clone());
assert!(r == OpState::Ok || r == OpState::AlreadySeen);
// ── Phase 4: Assert convergence ────────────────────────────────────
// Both nodes must have both stories.
assert_eq!(
crdt_a.doc.items.view().len(),
2,
"A must have 2 items after convergence"
);
assert_eq!(
crdt_b.doc.items.view().len(),
2,
"B must have 2 items after convergence"
);
// Serialise both CRDT views to JSON and assert byte-identical.
let view_a = serde_json::to_string(&CrdtNode::view(&crdt_a.doc.items)).unwrap();
let view_b = serde_json::to_string(&CrdtNode::view(&crdt_b.doc.items)).unwrap();
assert_eq!(
view_a, view_b,
"CRDT states must be byte-identical after convergence"
);
// Spot-check: both stories are at 2_current on both nodes.
let stories_a: Vec<String> = crdt_a
.doc
.items
.view()
.iter()
.filter_map(|item| {
if let JV::Object(m) = CrdtNode::view(item) {
m.get("story_id").and_then(|s| {
if let JV::String(s) = s {
Some(s.clone())
} else {
None
}
})
} else {
None
}
})
.collect();
assert!(
stories_a.contains(&"507_convergence_a".to_string()),
"A must contain story_a"
);
assert!(
stories_a.contains(&"507_convergence_b".to_string()),
"A must contain story_b"
);
}
#[test]
fn existing_sync_tests_unchanged() {
// If we got here, all previous crdt_sync tests compiled and passed.
// This test exists as a documentation anchor for AC9.
}
#[test]
fn v1_peer_ignores_clock_message_gracefully() {
// Simulate: v1 peer only knows Bulk and Op.
// A clock message should fail deserialization (unknown variant).
let clock_json = r#"{"type":"clock","clock":{"abc":10}}"#;
// handle_incoming_text logs and returns — must not panic.
handle_incoming_text(clock_json);
}
#[test]
fn v2_delta_sync_via_clock_exchange() {
use bft_json_crdt::json_crdt::BaseCrdt;
use bft_json_crdt::keypair::make_keypair;
use bft_json_crdt::op::ROOT_ID;
use fastcrypto::traits::KeyPair;
use serde_json::json;
use crate::crdt_state::PipelineDoc;
let kp_a = make_keypair();
let mut crdt_a = BaseCrdt::<PipelineDoc>::new(&kp_a);
// A creates 5 items.
let mut ops_a = Vec::new();
for i in 0..5 {
let item: bft_json_crdt::json_crdt::JsonValue = json!({
"story_id": format!("631_v2_{i}"),
"stage": "1_backlog",
"name": format!("v2 item {i}"),
"agent": "",
"retry_count": 0.0,
"blocked": false,
"depends_on": "",
"claimed_by": "",
"claimed_at": 0.0,
})
.into();
let op = crdt_a.doc.items.insert(ROOT_ID, item).sign(&kp_a);
crdt_a.apply(op.clone());
ops_a.push(op);
}
// B has seen the first 3 ops (clock says 3).
let kp_b = make_keypair();
let mut crdt_b = BaseCrdt::<PipelineDoc>::new(&kp_b);
for op in &ops_a[..3] {
crdt_b.apply(op.clone());
}
assert_eq!(crdt_b.doc.items.view().len(), 3);
// Build B's clock.
let author_a_hex = crate::crdt_state::hex::encode(&kp_a.public().0.to_bytes());
let mut clock_b = std::collections::HashMap::new();
clock_b.insert(author_a_hex.clone(), 3u64);
// Serialize clock as wire message.
let clock_msg = SyncMessage::Clock {
clock: clock_b.clone(),
};
let clock_wire = serde_json::to_string(&clock_msg).unwrap();
// A receives B's clock and computes delta.
let parsed: SyncMessage = serde_json::from_str(&clock_wire).unwrap();
let delta_ops = match parsed {
SyncMessage::Clock { clock: peer_clock } => {
// Simulate ops_since: A has 5 ops from author_a, B has 3.
let all_json: Vec<String> = ops_a
.iter()
.map(|op| serde_json::to_string(op).unwrap())
.collect();
let mut result = Vec::new();
let mut count = 0u64;
for (i, _op) in ops_a.iter().enumerate() {
count += 1;
let peer_has = peer_clock.get(&author_a_hex).copied().unwrap_or(0);
if count > peer_has {
result.push(all_json[i].clone());
}
}
result
}
_ => panic!("Expected Clock"),
};
assert_eq!(delta_ops.len(), 2, "delta should be 2 ops (ops 4 and 5)");
// B applies the delta.
for op_str in &delta_ops {
let signed: bft_json_crdt::json_crdt::SignedOp = serde_json::from_str(op_str).unwrap();
crdt_b.apply(signed);
}
assert_eq!(crdt_b.doc.items.view().len(), 5);
}
#[test]
fn buffer_flush_ops_held_until_peer_ready() {
use bft_json_crdt::json_crdt::BaseCrdt;
use bft_json_crdt::keypair::make_keypair;
use bft_json_crdt::op::ROOT_ID;
use serde_json::json;
use crate::crdt_state::PipelineDoc;
// Simulate the buffer-flush state machine without real WebSockets.
let mut peer_ready = false;
let mut op_buffer: Vec<Vec<u8>> = Vec::new(); // encoded wire frames
let mut sent_frames: Vec<Vec<u8>> = Vec::new(); // captured "sent" frames
// Build two local ops on a fresh CRDT.
let kp = make_keypair();
let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);
let item: bft_json_crdt::json_crdt::JsonValue = json!({
"story_id": "632_buffer_a",
"stage": "1_backlog",
"name": "Buffer A",
"agent": "",
"retry_count": 0.0,
"blocked": false,
"depends_on": "",
"claimed_by": "",
"claimed_at": 0.0,
})
.into();
let op1 = crdt.doc.items.insert(ROOT_ID, item).sign(&kp);
crdt.apply(op1.clone());
let op2 = crdt.doc.items[0]
.stage
.set("2_current".to_string())
.sign(&kp);
crdt.apply(op2.clone());
// Simulate op1 arriving from broadcast while peer is NOT ready.
let frame1 = crate::crdt_wire::encode(&op1);
if peer_ready {
sent_frames.push(frame1.clone());
} else {
op_buffer.push(frame1.clone());
}
// Simulate op2 arriving while peer is still NOT ready.
let frame2 = crate::crdt_wire::encode(&op2);
if peer_ready {
sent_frames.push(frame2.clone());
} else {
op_buffer.push(frame2.clone());
}
// Both ops must be buffered, none sent yet.
assert_eq!(
sent_frames.len(),
0,
"ops must be buffered before peer Ready arrives"
);
assert_eq!(op_buffer.len(), 2, "buffer must hold both ops");
// Simulate receiving the peer's Ready frame.
let ready_json = serde_json::to_string(&SyncMessage::Ready).unwrap();
if let Ok(SyncMessage::Ready) = serde_json::from_str::<SyncMessage>(&ready_json) {
peer_ready = true;
for encoded in op_buffer.drain(..) {
sent_frames.push(encoded);
}
}
assert!(peer_ready, "peer_ready must be true after Ready frame");
assert_eq!(op_buffer.len(), 0, "buffer must be empty after flush");
assert_eq!(
sent_frames.len(),
2,
"both buffered ops must appear in captured frames after flush"
);
// Order preserved: op1 before op2.
assert_eq!(sent_frames[0], frame1, "op1 must be first flushed frame");
assert_eq!(sent_frames[1], frame2, "op2 must be second flushed frame");
// After flush, a new op is sent immediately (no buffering).
let op3 = crdt.doc.items[0].stage.set("3_qa".to_string()).sign(&kp);
crdt.apply(op3.clone());
let frame3 = crate::crdt_wire::encode(&op3);
if peer_ready {
sent_frames.push(frame3.clone());
} else {
op_buffer.push(frame3.clone());
}
assert_eq!(
sent_frames.len(),
3,
"op after flush must be sent immediately"
);
assert_eq!(op_buffer.len(), 0, "buffer must remain empty");
}
#[test]
fn realtime_op_from_peer_applied_immediately_regardless_of_ready_state() {
use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, JsonValue as JV, OpState};
use bft_json_crdt::keypair::make_keypair;
use bft_json_crdt::op::ROOT_ID;
use serde_json::json;
use crate::crdt_state::PipelineDoc;
// Node A creates a bulk op and a real-time op that causally depends on it.
let kp_a = make_keypair();
let mut crdt_a = BaseCrdt::<PipelineDoc>::new(&kp_a);
let item: bft_json_crdt::json_crdt::JsonValue = json!({
"story_id": "632_causal_test",
"stage": "1_backlog",
"name": "Causal Test",
"agent": "",
"retry_count": 0.0,
"blocked": false,
"depends_on": "",
"claimed_by": "",
"claimed_at": 0.0,
})
.into();
let bulk_op = crdt_a.doc.items.insert(ROOT_ID, item).sign(&kp_a);
crdt_a.apply(bulk_op.clone());
// Real-time op that causally depends on bulk_op.
let rt_op = crdt_a.doc.items[0]
.stage
.set("2_current".to_string())
.sign_with_dependencies(&kp_a, vec![&bulk_op]);
crdt_a.apply(rt_op.clone());
// Node B receives the bulk op first (simulating the bulk-delta phase).
let kp_b = make_keypair();
let mut crdt_b = BaseCrdt::<PipelineDoc>::new(&kp_b);
assert_eq!(
crdt_b.apply(bulk_op.clone()),
OpState::Ok,
"bulk op must apply cleanly on node B"
);
// Node B has not yet sent its own Ready frame, but it receives A's
// real-time binary frame. It must be applied immediately via
// handle_incoming_binary (causal queue handles ordering).
let wire = crate::crdt_wire::encode(&rt_op);
let decoded = crate::crdt_wire::decode(&wire).unwrap();
let result = crdt_b.apply(decoded);
assert_eq!(
result,
OpState::Ok,
"real-time op depending on bulk op must apply immediately after bulk op is present"
);
// Both nodes converge to the same state.
assert_eq!(
crdt_b.doc.items[0].stage.view(),
JV::String("2_current".to_string()),
"Node B must converge to stage 2_current"
);
assert_eq!(
CrdtNode::view(&crdt_a.doc.items),
CrdtNode::view(&crdt_b.doc.items),
"Both nodes must converge to identical state"
);
}
#[test]
fn incoming_realtime_op_applied_before_we_send_ready() {
use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, JsonValue as JV, OpState};
use bft_json_crdt::keypair::make_keypair;
use bft_json_crdt::op::ROOT_ID;
use serde_json::json;
use crate::crdt_state::PipelineDoc;
// Simulate: we have NOT sent our ready yet (own_ready_sent = false),
// but the peer sends us a real-time binary op. It must be applied.
let own_ready_sent = false; // we haven't sent ready yet
let _ = own_ready_sent; // doc-only; receiving side never gates on this
let kp_peer = make_keypair();
let mut crdt_peer = BaseCrdt::<PipelineDoc>::new(&kp_peer);
let item: bft_json_crdt::json_crdt::JsonValue = json!({
"story_id": "632_ac3_test",
"stage": "1_backlog",
"name": "AC3 Test",
"agent": "",
"retry_count": 0.0,
"blocked": false,
"depends_on": "",
"claimed_by": "",
"claimed_at": 0.0,
})
.into();
let rt_op = crdt_peer.doc.items.insert(ROOT_ID, item).sign(&kp_peer);
crdt_peer.apply(rt_op.clone());
// Node B decodes and applies the peer's real-time op directly.
let kp_b = make_keypair();
let mut crdt_b = BaseCrdt::<PipelineDoc>::new(&kp_b);
let wire = crate::crdt_wire::encode(&rt_op);
let decoded = crate::crdt_wire::decode(&wire).unwrap();
// This mirrors handle_incoming_binary() — applied unconditionally.
let result = crdt_b.apply(decoded);
assert_eq!(
result,
OpState::Ok,
"incoming real-time op must be applied immediately regardless of own ready state"
);
assert_eq!(
crdt_b.doc.items[0].stage.view(),
JV::String("1_backlog".to_string()),
"op content must be correct"
);
}
#[test]
fn existing_crdt_sync_tests_pass_unchanged() {
// Reaching this point means all prior tests in this module compiled
// and passed. This test documents the AC6 intent.
}