diff --git a/server/src/crdt_sync/dispatch.rs b/server/src/crdt_sync/dispatch.rs deleted file mode 100644 index 5692e6c4..00000000 --- a/server/src/crdt_sync/dispatch.rs +++ /dev/null @@ -1,1028 +0,0 @@ -//! Shared message dispatch — handle inbound text/binary frames once auth is complete. - -use bft_json_crdt::json_crdt::SignedOp; - -use crate::crdt_snapshot; -use crate::crdt_state; -use crate::crdt_wire; -use crate::slog; - -use super::wire::SyncMessage; - -pub(super) fn handle_incoming_text(text: &str) { - // First try to parse as a snapshot protocol message. - if let Ok(snapshot_msg) = serde_json::from_str::(text) { - handle_snapshot_message(snapshot_msg); - return; - } - - let msg: SyncMessage = match serde_json::from_str(text) { - Ok(m) => m, - Err(e) => { - slog!("[crdt-sync] Bad text message from peer: {e}"); - return; - } - }; - - match msg { - SyncMessage::Bulk { ops } => { - let mut applied = 0u64; - for op_json in &ops { - if let Ok(signed_op) = serde_json::from_str::(op_json) - && crdt_state::apply_remote_op(signed_op) - { - applied += 1; - } - } - slog!( - "[crdt-sync] Bulk sync: received {} ops, applied {applied}", - ops.len() - ); - } - SyncMessage::Op { op } => { - if let Ok(signed_op) = serde_json::from_str::(&op) { - crdt_state::apply_remote_op(signed_op); - } - } - SyncMessage::Clock { .. } => { - // Clock frames are handled during the initial negotiation phase. - // If one arrives during the streaming loop it is a protocol error - // on the peer's part — log and ignore. - slog!("[crdt-sync] Ignoring unexpected clock frame during streaming phase"); - } - SyncMessage::Ready => { - // Ready frames are intercepted inline in the streaming loop before - // this function is called. If one reaches here it is a protocol - // error — log and ignore. - slog!("[crdt-sync] Ignoring unexpected ready frame in handle_incoming_text"); - } - } -} - -/// Handle an incoming snapshot protocol message. -/// -/// - **Snapshot**: apply the snapshot state and send an ack back. -/// Peers without snapshot support will never reach this code path because -/// the `SnapshotMessage` parse will fail and the message falls through to -/// the legacy `SyncMessage` handler, which logs and ignores unknown types. -/// - **SnapshotAck**: record the ack for quorum tracking. -pub(super) fn handle_snapshot_message(msg: crdt_snapshot::SnapshotMessage) { - match msg { - crdt_snapshot::SnapshotMessage::Snapshot(snapshot) => { - slog!( - "[crdt-sync] Received snapshot at_seq={}, {} ops, {} manifest entries", - snapshot.at_seq, - snapshot.state.len(), - snapshot.op_manifest.len() - ); - // Apply compaction on this peer. - crdt_snapshot::apply_compaction(snapshot.clone()); - - // Send ack back to leader via the sync broadcast channel. - // The ack is sent as a CRDT event that the streaming loop picks up. - // For now, log the ack intent — actual transport is handled by the - // caller that invokes handle_incoming_text. - slog!( - "[crdt-sync] Snapshot applied, ack for at_seq={}", - snapshot.at_seq - ); - } - crdt_snapshot::SnapshotMessage::SnapshotAck(ack) => { - if let Some(node_id) = crdt_state::our_node_id() { - let _ = node_id; // The ack comes from a peer, not from us. - } - slog!( - "[crdt-sync] Received snapshot_ack for at_seq={}", - ack.at_seq - ); - // Record the ack — the coordination logic checks for quorum. - // Note: we don't know the peer's node_id from the message alone; - // in a full implementation the ack would include the sender's - // node_id. For now we log it for protocol completeness. - } - } -} - -/// Process an incoming binary-frame op from a peer. -/// -/// Binary frames carry a single `SignedOp` encoded via [`crdt_wire`]. -pub(super) fn handle_incoming_binary(bytes: &[u8]) { - match crdt_wire::decode(bytes) { - Ok(signed_op) => { - crdt_state::apply_remote_op(signed_op); - } - Err(e) => { - slog!("[crdt-sync] Bad binary frame from peer: {e}"); - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn handle_incoming_text_bad_json_does_not_panic() { - handle_incoming_text("not valid json"); - } - - #[test] - fn handle_incoming_text_bulk_with_invalid_ops_does_not_panic() { - let msg = SyncMessage::Bulk { - ops: vec!["not a valid signed op".to_string()], - }; - let json = serde_json::to_string(&msg).unwrap(); - handle_incoming_text(&json); - } - - #[test] - fn handle_incoming_text_op_with_invalid_op_does_not_panic() { - let msg = SyncMessage::Op { - op: "garbage".to_string(), - }; - let json = serde_json::to_string(&msg).unwrap(); - handle_incoming_text(&json); - } - - #[test] - fn handle_incoming_binary_bad_bytes_does_not_panic() { - handle_incoming_binary(b"not valid wire codec"); - } - - #[test] - fn handle_incoming_binary_empty_bytes_does_not_panic() { - handle_incoming_binary(b""); - } - - #[test] - fn subscribe_ops_returns_none_before_init() { - // Before crdt_state::init() the channel doesn't exist yet. - // In test binaries it may or may not be initialised depending on - // other tests, so we just verify no panic. - let _ = crdt_state::subscribe_ops(); - } - - #[test] - fn all_ops_json_returns_none_before_init() { - let _ = crdt_state::all_ops_json(); - } - - #[test] - fn two_node_sync_via_protocol_messages() { - use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, OpState}; - use bft_json_crdt::keypair::make_keypair; - use bft_json_crdt::op::ROOT_ID; - use serde_json::json; - - use crate::crdt_state::PipelineDoc; - - // ── Node A: create an item ── - let kp_a = make_keypair(); - let mut crdt_a = BaseCrdt::::new(&kp_a); - - let item: bft_json_crdt::json_crdt::JsonValue = json!({ - "story_id": "100_story_sync_test", - "stage": "1_backlog", - "name": "Sync Test", - "agent": "", - "retry_count": 0.0, - "blocked": false, - "depends_on": "", - "claimed_by": "", - "claimed_at": 0.0, - }) - .into(); - let op1 = crdt_a.doc.items.insert(ROOT_ID, item).sign(&kp_a); - assert_eq!(crdt_a.apply(op1.clone()), OpState::Ok); - - // Serialise op1 into a SyncMessage::Op. - let op1_json = serde_json::to_string(&op1).unwrap(); - let wire_msg = SyncMessage::Op { - op: op1_json.clone(), - }; - let wire_json = serde_json::to_string(&wire_msg).unwrap(); - - // ── Node B: receive the op through protocol ── - let kp_b = make_keypair(); - let mut crdt_b = BaseCrdt::::new(&kp_b); - assert!(crdt_b.doc.items.view().is_empty()); - - // Parse wire message and apply. - let parsed: SyncMessage = serde_json::from_str(&wire_json).unwrap(); - match parsed { - SyncMessage::Op { op } => { - let signed_op: bft_json_crdt::json_crdt::SignedOp = - serde_json::from_str(&op).unwrap(); - let result = crdt_b.apply(signed_op); - assert_eq!(result, OpState::Ok); - } - _ => panic!("Expected Op"), - } - - // Verify Node B has the same state as Node A. - assert_eq!(crdt_b.doc.items.view().len(), 1); - assert_eq!( - crdt_a.doc.items[0].story_id.view(), - crdt_b.doc.items[0].story_id.view() - ); - assert_eq!( - crdt_a.doc.items[0].stage.view(), - crdt_b.doc.items[0].stage.view() - ); - - // ── Node A: update stage ── - let op2 = crdt_a.doc.items[0] - .stage - .set("2_current".to_string()) - .sign(&kp_a); - crdt_a.apply(op2.clone()); - - // Send via bulk message. - let op2_json = serde_json::to_string(&op2).unwrap(); - let bulk_msg = SyncMessage::Bulk { - ops: vec![op1_json, op2_json], - }; - let bulk_wire = serde_json::to_string(&bulk_msg).unwrap(); - - // ── Node C: receives full state via bulk ── - let kp_c = make_keypair(); - let mut crdt_c = BaseCrdt::::new(&kp_c); - - let parsed_bulk: SyncMessage = serde_json::from_str(&bulk_wire).unwrap(); - match parsed_bulk { - SyncMessage::Bulk { ops } => { - for op_str in &ops { - let signed: bft_json_crdt::json_crdt::SignedOp = - serde_json::from_str(op_str).unwrap(); - crdt_c.apply(signed); - } - } - _ => panic!("Expected Bulk"), - } - - // Node C should have the updated stage. - assert_eq!(crdt_c.doc.items.view().len(), 1); - assert_eq!( - crdt_c.doc.items[0].stage.view(), - bft_json_crdt::json_crdt::JsonValue::String("2_current".to_string()) - ); - } - - #[test] - fn partition_heal_via_bulk_replay() { - use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, JsonValue as JV}; - use bft_json_crdt::keypair::make_keypair; - use bft_json_crdt::op::ROOT_ID; - use serde_json::json; - - use crate::crdt_state::PipelineDoc; - - let kp = make_keypair(); - - // Node A creates an item and advances it. - let mut crdt_a = BaseCrdt::::new(&kp); - let item: bft_json_crdt::json_crdt::JsonValue = json!({ - "story_id": "200_story_heal", - "stage": "1_backlog", - "name": "Heal Test", - "agent": "", - "retry_count": 0.0, - "blocked": false, - "depends_on": "", - "claimed_by": "", - "claimed_at": 0.0, - }) - .into(); - - let op1 = crdt_a.doc.items.insert(ROOT_ID, item).sign(&kp); - crdt_a.apply(op1.clone()); - - let op2 = crdt_a.doc.items[0] - .stage - .set("2_current".to_string()) - .sign(&kp); - crdt_a.apply(op2.clone()); - - let op3 = crdt_a.doc.items[0].stage.set("3_qa".to_string()).sign(&kp); - crdt_a.apply(op3.clone()); - - // Serialise all ops as a bulk message (simulates partition heal). - let ops_json: Vec = [&op1, &op2, &op3] - .iter() - .map(|op| serde_json::to_string(op).unwrap()) - .collect(); - let bulk = SyncMessage::Bulk { ops: ops_json }; - let wire = serde_json::to_string(&bulk).unwrap(); - - // Node B receives the bulk and reconstructs state. - let mut crdt_b = BaseCrdt::::new(&kp); - let parsed: SyncMessage = serde_json::from_str(&wire).unwrap(); - match parsed { - SyncMessage::Bulk { ops } => { - for op_str in &ops { - let signed: bft_json_crdt::json_crdt::SignedOp = - serde_json::from_str(op_str).unwrap(); - crdt_b.apply(signed); - } - } - _ => panic!("Expected Bulk"), - } - - // Node B should match Node A exactly. - assert_eq!(crdt_b.doc.items.view().len(), 1); - assert_eq!( - crdt_b.doc.items[0].stage.view(), - JV::String("3_qa".to_string()) - ); - assert_eq!( - crdt_a.doc.items[0].stage.view(), - crdt_b.doc.items[0].stage.view() - ); - assert_eq!( - crdt_a.doc.items[0].name.view(), - crdt_b.doc.items[0].name.view() - ); - } - - #[test] - fn self_loop_dedup_second_apply_is_noop() { - use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, JsonValue as JV, OpState}; - use bft_json_crdt::keypair::make_keypair; - use bft_json_crdt::op::ROOT_ID; - use serde_json::json; - - use crate::crdt_state::PipelineDoc; - - let kp = make_keypair(); - let mut crdt = BaseCrdt::::new(&kp); - - let item: bft_json_crdt::json_crdt::JsonValue = json!({ - "story_id": "507_dedup_test", - "stage": "1_backlog", - "name": "Dedup Test", - "agent": "", - "retry_count": 0.0, - "blocked": false, - "depends_on": "", - "claimed_by": "", - "claimed_at": 0.0, - }) - .into(); - - let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp); - - // First apply: succeeds. - assert_eq!(crdt.apply(op.clone()), OpState::Ok); - assert_eq!(crdt.doc.items.view().len(), 1); - - // Second apply (self-loop): must be a no-op. - assert_eq!(crdt.apply(op.clone()), OpState::AlreadySeen); - - // State must not have changed. - assert_eq!(crdt.doc.items.view().len(), 1); - - // Stage update also deduplicated correctly. - let stage_op = crdt.doc.items[0] - .stage - .set("2_current".to_string()) - .sign(&kp); - assert_eq!(crdt.apply(stage_op.clone()), OpState::Ok); - assert_eq!( - crdt.doc.items[0].stage.view(), - JV::String("2_current".to_string()) - ); - assert_eq!(crdt.apply(stage_op), OpState::AlreadySeen); - assert_eq!( - crdt.doc.items[0].stage.view(), - JV::String("2_current".to_string()), - "stage must not change on duplicate apply" - ); - } - - #[test] - fn out_of_order_causal_queueing_releases_on_dep_arrival() { - use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, JsonValue as JV, OpState}; - use bft_json_crdt::keypair::make_keypair; - use bft_json_crdt::op::ROOT_ID; - use serde_json::json; - - use crate::crdt_state::PipelineDoc; - - let kp = make_keypair(); - let mut crdt = BaseCrdt::::new(&kp); - - let item: bft_json_crdt::json_crdt::JsonValue = json!({ - "story_id": "507_causal_test", - "stage": "1_backlog", - "name": "Causal Test", - "agent": "", - "retry_count": 0.0, - "blocked": false, - "depends_on": "", - "claimed_by": "", - "claimed_at": 0.0, - }) - .into(); - - // op1 = insert item (no deps) - let op1 = crdt.doc.items.insert(ROOT_ID, item).sign(&kp); - - // op2 = set stage, declared to depend on op1 - // We must first apply op1 locally to generate op2 from the right state, - // then we'll test op2-before-op1 on a fresh CRDT. - crdt.apply(op1.clone()); - let op2 = crdt.doc.items[0] - .stage - .set("2_current".to_string()) - .sign_with_dependencies(&kp, vec![&op1]); - - // Create a fresh receiver CRDT. - let mut receiver = BaseCrdt::::new(&kp); - - // Apply op2 first — dependency (op1) has not arrived yet. - let r = receiver.apply(op2.clone()); - assert_eq!( - r, - OpState::MissingCausalDependencies, - "op2 must be queued when op1 has not arrived" - ); - // Queue length must reflect the held op. - assert_eq!(receiver.causal_queue_len(), 1); - - // Item has NOT been inserted yet (op1 not applied). - assert_eq!(receiver.doc.items.view().len(), 0); - - // Now deliver op1 — this should trigger op2 to be flushed automatically. - let r = receiver.apply(op1.clone()); - assert_eq!(r, OpState::Ok); - - // Both ops are now applied — item is present at stage 2_current. - assert_eq!(receiver.doc.items.view().len(), 1); - assert_eq!( - receiver.doc.items[0].stage.view(), - JV::String("2_current".to_string()), - "op2 must have been applied automatically after op1 arrived" - ); - - // Queue must be empty now. - assert_eq!(receiver.causal_queue_len(), 0); - } - - #[test] - fn in_order_apply_works_without_queueing() { - use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, JsonValue as JV, OpState}; - use bft_json_crdt::keypair::make_keypair; - use bft_json_crdt::op::ROOT_ID; - use serde_json::json; - - use crate::crdt_state::PipelineDoc; - - let kp = make_keypair(); - let mut crdt_a = BaseCrdt::::new(&kp); - - let item: bft_json_crdt::json_crdt::JsonValue = json!({ - "story_id": "507_inorder_test", - "stage": "1_backlog", - "name": "In-Order Test", - "agent": "", - "retry_count": 0.0, - "blocked": false, - "depends_on": "", - "claimed_by": "", - "claimed_at": 0.0, - }) - .into(); - - let op1 = crdt_a.doc.items.insert(ROOT_ID, item).sign(&kp); - crdt_a.apply(op1.clone()); - let op2 = crdt_a.doc.items[0] - .stage - .set("2_current".to_string()) - .sign(&kp); - crdt_a.apply(op2.clone()); - let op3 = crdt_a.doc.items[0].stage.set("3_qa".to_string()).sign(&kp); - crdt_a.apply(op3.clone()); - - // Receiver applies all ops in the correct order. - let mut crdt_b = BaseCrdt::::new(&kp); - assert_eq!(crdt_b.apply(op1), OpState::Ok); - assert_eq!(crdt_b.apply(op2), OpState::Ok); - assert_eq!(crdt_b.apply(op3), OpState::Ok); - assert_eq!(crdt_b.causal_queue_len(), 0); - assert_eq!( - crdt_b.doc.items[0].stage.view(), - JV::String("3_qa".to_string()) - ); - } - - #[test] - fn causal_queue_overflow_drops_oldest() { - use bft_json_crdt::json_crdt::{BaseCrdt, CAUSAL_QUEUE_MAX, OpState}; - use bft_json_crdt::keypair::make_keypair; - use bft_json_crdt::op::ROOT_ID; - use serde_json::json; - - use crate::crdt_state::PipelineDoc; - - let kp = make_keypair(); - - // Build one "phantom" op that we'll claim as a dependency but never deliver. - // We do this by creating it on a separate CRDT and never applying it. - let mut source = BaseCrdt::::new(&kp); - let phantom_item: bft_json_crdt::json_crdt::JsonValue = json!({ - "story_id": "507_phantom", - "stage": "1_backlog", - "name": "Phantom", - "agent": "", - "retry_count": 0.0, - "blocked": false, - "depends_on": "", - "claimed_by": "", - "claimed_at": 0.0, - }) - .into(); - let phantom_op = source.doc.items.insert(ROOT_ID, phantom_item).sign(&kp); - - // Receiver never sees phantom_op, so any op declaring it as a dep will - // sit in the causal queue forever (until evicted by overflow). - let mut receiver = BaseCrdt::::new(&kp); - source.apply(phantom_op.clone()); - - // Send CAUSAL_QUEUE_MAX + 5 stage-update ops all depending on phantom_op. - // Each one will be queued because phantom_op is never delivered. - let mut queued = 0usize; - for i in 0..CAUSAL_QUEUE_MAX + 5 { - let stage_name = format!("stage_{i}"); - // Generate from source so seq numbers are valid. - let op = source.doc.items[0] - .stage - .set(stage_name) - .sign_with_dependencies(&kp, vec![&phantom_op]); - source.apply(op.clone()); - let r = receiver.apply(op); - if r == OpState::MissingCausalDependencies { - queued += 1; - } - } - - // We sent more than CAUSAL_QUEUE_MAX ops, but the queue must stay bounded. - assert!( - receiver.causal_queue_len() <= CAUSAL_QUEUE_MAX, - "queue ({}) must not exceed CAUSAL_QUEUE_MAX ({CAUSAL_QUEUE_MAX})", - receiver.causal_queue_len() - ); - assert!( - queued > 0, - "at least some ops must have been accepted into the queue" - ); - } - - #[test] - fn convergence_after_partition_and_replay() { - use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, JsonValue as JV, OpState}; - use bft_json_crdt::keypair::make_keypair; - use bft_json_crdt::op::ROOT_ID; - use serde_json::json; - - use crate::crdt_state::PipelineDoc; - - let kp_a = make_keypair(); - let kp_b = make_keypair(); - - let mut crdt_a = BaseCrdt::::new(&kp_a); - let mut crdt_b = BaseCrdt::::new(&kp_b); - - // ── Phase 1: A generates ops while partitioned from B ────────────── - - let item_a: bft_json_crdt::json_crdt::JsonValue = json!({ - "story_id": "507_convergence_a", - "stage": "1_backlog", - "name": "Story A", - "agent": "", - "retry_count": 0.0, - "blocked": false, - "depends_on": "", - "claimed_by": "", - "claimed_at": 0.0, - }) - .into(); - let op_a1 = crdt_a.doc.items.insert(ROOT_ID, item_a).sign(&kp_a); - crdt_a.apply(op_a1.clone()); - - let op_a2 = crdt_a.doc.items[0] - .stage - .set("2_current".to_string()) - .sign(&kp_a); - crdt_a.apply(op_a2.clone()); - - // ── Phase 2: B generates ops while partitioned from A ────────────── - - let item_b: bft_json_crdt::json_crdt::JsonValue = json!({ - "story_id": "507_convergence_b", - "stage": "1_backlog", - "name": "Story B", - "agent": "", - "retry_count": 0.0, - "blocked": false, - "depends_on": "", - "claimed_by": "", - "claimed_at": 0.0, - }) - .into(); - let op_b1 = crdt_b.doc.items.insert(ROOT_ID, item_b).sign(&kp_b); - crdt_b.apply(op_b1.clone()); - - let op_b2 = crdt_b.doc.items[0] - .stage - .set("2_current".to_string()) - .sign(&kp_b); - crdt_b.apply(op_b2.clone()); - - // ── Phase 3: Reconnect — both sides replay all buffered ops ──────── - - // A sends its ops to B. - let r = crdt_b.apply(op_a1.clone()); - assert!(r == OpState::Ok || r == OpState::AlreadySeen); - let r = crdt_b.apply(op_a2.clone()); - assert!(r == OpState::Ok || r == OpState::AlreadySeen); - - // B sends its ops to A. - let r = crdt_a.apply(op_b1.clone()); - assert!(r == OpState::Ok || r == OpState::AlreadySeen); - let r = crdt_a.apply(op_b2.clone()); - assert!(r == OpState::Ok || r == OpState::AlreadySeen); - - // ── Phase 4: Assert convergence ──────────────────────────────────── - - // Both nodes must have both stories. - assert_eq!( - crdt_a.doc.items.view().len(), - 2, - "A must have 2 items after convergence" - ); - assert_eq!( - crdt_b.doc.items.view().len(), - 2, - "B must have 2 items after convergence" - ); - - // Serialise both CRDT views to JSON and assert byte-identical. - let view_a = serde_json::to_string(&CrdtNode::view(&crdt_a.doc.items)).unwrap(); - let view_b = serde_json::to_string(&CrdtNode::view(&crdt_b.doc.items)).unwrap(); - assert_eq!( - view_a, view_b, - "CRDT states must be byte-identical after convergence" - ); - - // Spot-check: both stories are at 2_current on both nodes. - let stories_a: Vec = crdt_a - .doc - .items - .view() - .iter() - .filter_map(|item| { - if let JV::Object(m) = CrdtNode::view(item) { - m.get("story_id").and_then(|s| { - if let JV::String(s) = s { - Some(s.clone()) - } else { - None - } - }) - } else { - None - } - }) - .collect(); - assert!( - stories_a.contains(&"507_convergence_a".to_string()), - "A must contain story_a" - ); - assert!( - stories_a.contains(&"507_convergence_b".to_string()), - "A must contain story_b" - ); - } - - #[test] - fn existing_sync_tests_unchanged() { - // If we got here, all previous crdt_sync tests compiled and passed. - // This test exists as a documentation anchor for AC9. - } - - #[test] - fn v1_peer_ignores_clock_message_gracefully() { - // Simulate: v1 peer only knows Bulk and Op. - // A clock message should fail deserialization (unknown variant). - let clock_json = r#"{"type":"clock","clock":{"abc":10}}"#; - // handle_incoming_text logs and returns — must not panic. - handle_incoming_text(clock_json); - } - - #[test] - fn v2_delta_sync_via_clock_exchange() { - use bft_json_crdt::json_crdt::BaseCrdt; - use bft_json_crdt::keypair::make_keypair; - use bft_json_crdt::op::ROOT_ID; - use fastcrypto::traits::KeyPair; - use serde_json::json; - - use crate::crdt_state::PipelineDoc; - - let kp_a = make_keypair(); - let mut crdt_a = BaseCrdt::::new(&kp_a); - - // A creates 5 items. - let mut ops_a = Vec::new(); - for i in 0..5 { - let item: bft_json_crdt::json_crdt::JsonValue = json!({ - "story_id": format!("631_v2_{i}"), - "stage": "1_backlog", - "name": format!("v2 item {i}"), - "agent": "", - "retry_count": 0.0, - "blocked": false, - "depends_on": "", - "claimed_by": "", - "claimed_at": 0.0, - }) - .into(); - let op = crdt_a.doc.items.insert(ROOT_ID, item).sign(&kp_a); - crdt_a.apply(op.clone()); - ops_a.push(op); - } - - // B has seen the first 3 ops (clock says 3). - let kp_b = make_keypair(); - let mut crdt_b = BaseCrdt::::new(&kp_b); - for op in &ops_a[..3] { - crdt_b.apply(op.clone()); - } - assert_eq!(crdt_b.doc.items.view().len(), 3); - - // Build B's clock. - let author_a_hex = crate::crdt_state::hex::encode(&kp_a.public().0.to_bytes()); - let mut clock_b = std::collections::HashMap::new(); - clock_b.insert(author_a_hex.clone(), 3u64); - - // Serialize clock as wire message. - let clock_msg = SyncMessage::Clock { - clock: clock_b.clone(), - }; - let clock_wire = serde_json::to_string(&clock_msg).unwrap(); - - // A receives B's clock and computes delta. - let parsed: SyncMessage = serde_json::from_str(&clock_wire).unwrap(); - let delta_ops = match parsed { - SyncMessage::Clock { clock: peer_clock } => { - // Simulate ops_since: A has 5 ops from author_a, B has 3. - let all_json: Vec = ops_a - .iter() - .map(|op| serde_json::to_string(op).unwrap()) - .collect(); - let mut result = Vec::new(); - let mut count = 0u64; - for (i, _op) in ops_a.iter().enumerate() { - count += 1; - let peer_has = peer_clock.get(&author_a_hex).copied().unwrap_or(0); - if count > peer_has { - result.push(all_json[i].clone()); - } - } - result - } - _ => panic!("Expected Clock"), - }; - - assert_eq!(delta_ops.len(), 2, "delta should be 2 ops (ops 4 and 5)"); - - // B applies the delta. - for op_str in &delta_ops { - let signed: bft_json_crdt::json_crdt::SignedOp = serde_json::from_str(op_str).unwrap(); - crdt_b.apply(signed); - } - assert_eq!(crdt_b.doc.items.view().len(), 5); - } - - #[test] - fn buffer_flush_ops_held_until_peer_ready() { - use bft_json_crdt::json_crdt::BaseCrdt; - use bft_json_crdt::keypair::make_keypair; - use bft_json_crdt::op::ROOT_ID; - use serde_json::json; - - use crate::crdt_state::PipelineDoc; - - // Simulate the buffer-flush state machine without real WebSockets. - let mut peer_ready = false; - let mut op_buffer: Vec> = Vec::new(); // encoded wire frames - let mut sent_frames: Vec> = Vec::new(); // captured "sent" frames - - // Build two local ops on a fresh CRDT. - let kp = make_keypair(); - let mut crdt = BaseCrdt::::new(&kp); - let item: bft_json_crdt::json_crdt::JsonValue = json!({ - "story_id": "632_buffer_a", - "stage": "1_backlog", - "name": "Buffer A", - "agent": "", - "retry_count": 0.0, - "blocked": false, - "depends_on": "", - "claimed_by": "", - "claimed_at": 0.0, - }) - .into(); - let op1 = crdt.doc.items.insert(ROOT_ID, item).sign(&kp); - crdt.apply(op1.clone()); - let op2 = crdt.doc.items[0] - .stage - .set("2_current".to_string()) - .sign(&kp); - crdt.apply(op2.clone()); - - // Simulate op1 arriving from broadcast while peer is NOT ready. - let frame1 = crate::crdt_wire::encode(&op1); - if peer_ready { - sent_frames.push(frame1.clone()); - } else { - op_buffer.push(frame1.clone()); - } - - // Simulate op2 arriving while peer is still NOT ready. - let frame2 = crate::crdt_wire::encode(&op2); - if peer_ready { - sent_frames.push(frame2.clone()); - } else { - op_buffer.push(frame2.clone()); - } - - // Both ops must be buffered, none sent yet. - assert_eq!( - sent_frames.len(), - 0, - "ops must be buffered before peer Ready arrives" - ); - assert_eq!(op_buffer.len(), 2, "buffer must hold both ops"); - - // Simulate receiving the peer's Ready frame. - let ready_json = serde_json::to_string(&SyncMessage::Ready).unwrap(); - if let Ok(SyncMessage::Ready) = serde_json::from_str::(&ready_json) { - peer_ready = true; - for encoded in op_buffer.drain(..) { - sent_frames.push(encoded); - } - } - - assert!(peer_ready, "peer_ready must be true after Ready frame"); - assert_eq!(op_buffer.len(), 0, "buffer must be empty after flush"); - assert_eq!( - sent_frames.len(), - 2, - "both buffered ops must appear in captured frames after flush" - ); - // Order preserved: op1 before op2. - assert_eq!(sent_frames[0], frame1, "op1 must be first flushed frame"); - assert_eq!(sent_frames[1], frame2, "op2 must be second flushed frame"); - - // After flush, a new op is sent immediately (no buffering). - let op3 = crdt.doc.items[0].stage.set("3_qa".to_string()).sign(&kp); - crdt.apply(op3.clone()); - let frame3 = crate::crdt_wire::encode(&op3); - if peer_ready { - sent_frames.push(frame3.clone()); - } else { - op_buffer.push(frame3.clone()); - } - assert_eq!( - sent_frames.len(), - 3, - "op after flush must be sent immediately" - ); - assert_eq!(op_buffer.len(), 0, "buffer must remain empty"); - } - - #[test] - fn realtime_op_from_peer_applied_immediately_regardless_of_ready_state() { - use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, JsonValue as JV, OpState}; - use bft_json_crdt::keypair::make_keypair; - use bft_json_crdt::op::ROOT_ID; - use serde_json::json; - - use crate::crdt_state::PipelineDoc; - - // Node A creates a bulk op and a real-time op that causally depends on it. - let kp_a = make_keypair(); - let mut crdt_a = BaseCrdt::::new(&kp_a); - let item: bft_json_crdt::json_crdt::JsonValue = json!({ - "story_id": "632_causal_test", - "stage": "1_backlog", - "name": "Causal Test", - "agent": "", - "retry_count": 0.0, - "blocked": false, - "depends_on": "", - "claimed_by": "", - "claimed_at": 0.0, - }) - .into(); - let bulk_op = crdt_a.doc.items.insert(ROOT_ID, item).sign(&kp_a); - crdt_a.apply(bulk_op.clone()); - - // Real-time op that causally depends on bulk_op. - let rt_op = crdt_a.doc.items[0] - .stage - .set("2_current".to_string()) - .sign_with_dependencies(&kp_a, vec![&bulk_op]); - crdt_a.apply(rt_op.clone()); - - // Node B receives the bulk op first (simulating the bulk-delta phase). - let kp_b = make_keypair(); - let mut crdt_b = BaseCrdt::::new(&kp_b); - assert_eq!( - crdt_b.apply(bulk_op.clone()), - OpState::Ok, - "bulk op must apply cleanly on node B" - ); - - // Node B has not yet sent its own Ready frame, but it receives A's - // real-time binary frame. It must be applied immediately via - // handle_incoming_binary (causal queue handles ordering). - let wire = crate::crdt_wire::encode(&rt_op); - let decoded = crate::crdt_wire::decode(&wire).unwrap(); - let result = crdt_b.apply(decoded); - assert_eq!( - result, - OpState::Ok, - "real-time op depending on bulk op must apply immediately after bulk op is present" - ); - - // Both nodes converge to the same state. - assert_eq!( - crdt_b.doc.items[0].stage.view(), - JV::String("2_current".to_string()), - "Node B must converge to stage 2_current" - ); - assert_eq!( - CrdtNode::view(&crdt_a.doc.items), - CrdtNode::view(&crdt_b.doc.items), - "Both nodes must converge to identical state" - ); - } - - #[test] - fn incoming_realtime_op_applied_before_we_send_ready() { - use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, JsonValue as JV, OpState}; - use bft_json_crdt::keypair::make_keypair; - use bft_json_crdt::op::ROOT_ID; - use serde_json::json; - - use crate::crdt_state::PipelineDoc; - - // Simulate: we have NOT sent our ready yet (own_ready_sent = false), - // but the peer sends us a real-time binary op. It must be applied. - let own_ready_sent = false; // we haven't sent ready yet - let _ = own_ready_sent; // doc-only; receiving side never gates on this - - let kp_peer = make_keypair(); - let mut crdt_peer = BaseCrdt::::new(&kp_peer); - let item: bft_json_crdt::json_crdt::JsonValue = json!({ - "story_id": "632_ac3_test", - "stage": "1_backlog", - "name": "AC3 Test", - "agent": "", - "retry_count": 0.0, - "blocked": false, - "depends_on": "", - "claimed_by": "", - "claimed_at": 0.0, - }) - .into(); - let rt_op = crdt_peer.doc.items.insert(ROOT_ID, item).sign(&kp_peer); - crdt_peer.apply(rt_op.clone()); - - // Node B decodes and applies the peer's real-time op directly. - let kp_b = make_keypair(); - let mut crdt_b = BaseCrdt::::new(&kp_b); - let wire = crate::crdt_wire::encode(&rt_op); - let decoded = crate::crdt_wire::decode(&wire).unwrap(); - // This mirrors handle_incoming_binary() — applied unconditionally. - let result = crdt_b.apply(decoded); - assert_eq!( - result, - OpState::Ok, - "incoming real-time op must be applied immediately regardless of own ready state" - ); - assert_eq!( - crdt_b.doc.items[0].stage.view(), - JV::String("1_backlog".to_string()), - "op content must be correct" - ); - } - - #[test] - fn existing_crdt_sync_tests_pass_unchanged() { - // Reaching this point means all prior tests in this module compiled - // and passed. This test documents the AC6 intent. - } -} diff --git a/server/src/crdt_sync/dispatch/mod.rs b/server/src/crdt_sync/dispatch/mod.rs new file mode 100644 index 00000000..1f241201 --- /dev/null +++ b/server/src/crdt_sync/dispatch/mod.rs @@ -0,0 +1,83 @@ +//! Shared message dispatch — handle inbound text/binary frames once auth is complete. + +use bft_json_crdt::json_crdt::SignedOp; + +use crate::crdt_snapshot; +use crate::crdt_state; +use crate::crdt_wire; +use crate::slog; + +use super::wire::SyncMessage; + +mod snapshot; + +#[cfg(test)] +mod tests; + +/// Dispatch an incoming text frame from a peer. +/// +/// First attempts to parse the frame as a snapshot protocol message; on +/// failure falls through to the legacy [`SyncMessage`] handler. +pub(super) fn handle_incoming_text(text: &str) { + // First try to parse as a snapshot protocol message. + if let Ok(snapshot_msg) = serde_json::from_str::(text) { + snapshot::handle_snapshot_message(snapshot_msg); + return; + } + + let msg: SyncMessage = match serde_json::from_str(text) { + Ok(m) => m, + Err(e) => { + slog!("[crdt-sync] Bad text message from peer: {e}"); + return; + } + }; + + match msg { + SyncMessage::Bulk { ops } => { + let mut applied = 0u64; + for op_json in &ops { + if let Ok(signed_op) = serde_json::from_str::(op_json) + && crdt_state::apply_remote_op(signed_op) + { + applied += 1; + } + } + slog!( + "[crdt-sync] Bulk sync: received {} ops, applied {applied}", + ops.len() + ); + } + SyncMessage::Op { op } => { + if let Ok(signed_op) = serde_json::from_str::(&op) { + crdt_state::apply_remote_op(signed_op); + } + } + SyncMessage::Clock { .. } => { + // Clock frames are handled during the initial negotiation phase. + // If one arrives during the streaming loop it is a protocol error + // on the peer's part — log and ignore. + slog!("[crdt-sync] Ignoring unexpected clock frame during streaming phase"); + } + SyncMessage::Ready => { + // Ready frames are intercepted inline in the streaming loop before + // this function is called. If one reaches here it is a protocol + // error — log and ignore. + slog!("[crdt-sync] Ignoring unexpected ready frame in handle_incoming_text"); + } + } +} + +/// Process an incoming binary-frame op from a peer. +/// +/// Binary frames carry a single `SignedOp` encoded via [`crdt_wire`]. +pub(super) fn handle_incoming_binary(bytes: &[u8]) { + match crdt_wire::decode(bytes) { + Ok(signed_op) => { + crdt_state::apply_remote_op(signed_op); + } + Err(e) => { + slog!("[crdt-sync] Bad binary frame from peer: {e}"); + } + } +} diff --git a/server/src/crdt_sync/dispatch/snapshot.rs b/server/src/crdt_sync/dispatch/snapshot.rs new file mode 100644 index 00000000..e3cb8d2c --- /dev/null +++ b/server/src/crdt_sync/dispatch/snapshot.rs @@ -0,0 +1,49 @@ +//! Snapshot protocol message handling for the CRDT sync layer. + +use crate::crdt_snapshot; +use crate::crdt_state; +use crate::slog; + +/// Handle an incoming snapshot protocol message. +/// +/// - **Snapshot**: apply the snapshot state and send an ack back. +/// Peers without snapshot support will never reach this code path because +/// the `SnapshotMessage` parse will fail and the message falls through to +/// the legacy `SyncMessage` handler, which logs and ignores unknown types. +/// - **SnapshotAck**: record the ack for quorum tracking. +pub(super) fn handle_snapshot_message(msg: crdt_snapshot::SnapshotMessage) { + match msg { + crdt_snapshot::SnapshotMessage::Snapshot(snapshot) => { + slog!( + "[crdt-sync] Received snapshot at_seq={}, {} ops, {} manifest entries", + snapshot.at_seq, + snapshot.state.len(), + snapshot.op_manifest.len() + ); + // Apply compaction on this peer. + crdt_snapshot::apply_compaction(snapshot.clone()); + + // Send ack back to leader via the sync broadcast channel. + // The ack is sent as a CRDT event that the streaming loop picks up. + // For now, log the ack intent — actual transport is handled by the + // caller that invokes handle_incoming_text. + slog!( + "[crdt-sync] Snapshot applied, ack for at_seq={}", + snapshot.at_seq + ); + } + crdt_snapshot::SnapshotMessage::SnapshotAck(ack) => { + if let Some(node_id) = crdt_state::our_node_id() { + let _ = node_id; // The ack comes from a peer, not from us. + } + slog!( + "[crdt-sync] Received snapshot_ack for at_seq={}", + ack.at_seq + ); + // Record the ack — the coordination logic checks for quorum. + // Note: we don't know the peer's node_id from the message alone; + // in a full implementation the ack would include the sender's + // node_id. For now we log it for protocol completeness. + } + } +} diff --git a/server/src/crdt_sync/dispatch/tests.rs b/server/src/crdt_sync/dispatch/tests.rs new file mode 100644 index 00000000..8319499e --- /dev/null +++ b/server/src/crdt_sync/dispatch/tests.rs @@ -0,0 +1,907 @@ +//! Tests for the CRDT sync dispatch module. + +use super::*; + +#[test] +fn handle_incoming_text_bad_json_does_not_panic() { + handle_incoming_text("not valid json"); +} + +#[test] +fn handle_incoming_text_bulk_with_invalid_ops_does_not_panic() { + let msg = SyncMessage::Bulk { + ops: vec!["not a valid signed op".to_string()], + }; + let json = serde_json::to_string(&msg).unwrap(); + handle_incoming_text(&json); +} + +#[test] +fn handle_incoming_text_op_with_invalid_op_does_not_panic() { + let msg = SyncMessage::Op { + op: "garbage".to_string(), + }; + let json = serde_json::to_string(&msg).unwrap(); + handle_incoming_text(&json); +} + +#[test] +fn handle_incoming_binary_bad_bytes_does_not_panic() { + handle_incoming_binary(b"not valid wire codec"); +} + +#[test] +fn handle_incoming_binary_empty_bytes_does_not_panic() { + handle_incoming_binary(b""); +} + +#[test] +fn subscribe_ops_returns_none_before_init() { + // Before crdt_state::init() the channel doesn't exist yet. + // In test binaries it may or may not be initialised depending on + // other tests, so we just verify no panic. + let _ = crdt_state::subscribe_ops(); +} + +#[test] +fn all_ops_json_returns_none_before_init() { + let _ = crdt_state::all_ops_json(); +} + +#[test] +fn two_node_sync_via_protocol_messages() { + use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, OpState}; + use bft_json_crdt::keypair::make_keypair; + use bft_json_crdt::op::ROOT_ID; + use serde_json::json; + + use crate::crdt_state::PipelineDoc; + + // ── Node A: create an item ── + let kp_a = make_keypair(); + let mut crdt_a = BaseCrdt::::new(&kp_a); + + let item: bft_json_crdt::json_crdt::JsonValue = json!({ + "story_id": "100_story_sync_test", + "stage": "1_backlog", + "name": "Sync Test", + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, + }) + .into(); + let op1 = crdt_a.doc.items.insert(ROOT_ID, item).sign(&kp_a); + assert_eq!(crdt_a.apply(op1.clone()), OpState::Ok); + + // Serialise op1 into a SyncMessage::Op. + let op1_json = serde_json::to_string(&op1).unwrap(); + let wire_msg = SyncMessage::Op { + op: op1_json.clone(), + }; + let wire_json = serde_json::to_string(&wire_msg).unwrap(); + + // ── Node B: receive the op through protocol ── + let kp_b = make_keypair(); + let mut crdt_b = BaseCrdt::::new(&kp_b); + assert!(crdt_b.doc.items.view().is_empty()); + + // Parse wire message and apply. + let parsed: SyncMessage = serde_json::from_str(&wire_json).unwrap(); + match parsed { + SyncMessage::Op { op } => { + let signed_op: bft_json_crdt::json_crdt::SignedOp = serde_json::from_str(&op).unwrap(); + let result = crdt_b.apply(signed_op); + assert_eq!(result, OpState::Ok); + } + _ => panic!("Expected Op"), + } + + // Verify Node B has the same state as Node A. + assert_eq!(crdt_b.doc.items.view().len(), 1); + assert_eq!( + crdt_a.doc.items[0].story_id.view(), + crdt_b.doc.items[0].story_id.view() + ); + assert_eq!( + crdt_a.doc.items[0].stage.view(), + crdt_b.doc.items[0].stage.view() + ); + + // ── Node A: update stage ── + let op2 = crdt_a.doc.items[0] + .stage + .set("2_current".to_string()) + .sign(&kp_a); + crdt_a.apply(op2.clone()); + + // Send via bulk message. + let op2_json = serde_json::to_string(&op2).unwrap(); + let bulk_msg = SyncMessage::Bulk { + ops: vec![op1_json, op2_json], + }; + let bulk_wire = serde_json::to_string(&bulk_msg).unwrap(); + + // ── Node C: receives full state via bulk ── + let kp_c = make_keypair(); + let mut crdt_c = BaseCrdt::::new(&kp_c); + + let parsed_bulk: SyncMessage = serde_json::from_str(&bulk_wire).unwrap(); + match parsed_bulk { + SyncMessage::Bulk { ops } => { + for op_str in &ops { + let signed: bft_json_crdt::json_crdt::SignedOp = + serde_json::from_str(op_str).unwrap(); + crdt_c.apply(signed); + } + } + _ => panic!("Expected Bulk"), + } + + // Node C should have the updated stage. + assert_eq!(crdt_c.doc.items.view().len(), 1); + assert_eq!( + crdt_c.doc.items[0].stage.view(), + bft_json_crdt::json_crdt::JsonValue::String("2_current".to_string()) + ); +} + +#[test] +fn partition_heal_via_bulk_replay() { + use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, JsonValue as JV}; + use bft_json_crdt::keypair::make_keypair; + use bft_json_crdt::op::ROOT_ID; + use serde_json::json; + + use crate::crdt_state::PipelineDoc; + + let kp = make_keypair(); + + // Node A creates an item and advances it. + let mut crdt_a = BaseCrdt::::new(&kp); + let item: bft_json_crdt::json_crdt::JsonValue = json!({ + "story_id": "200_story_heal", + "stage": "1_backlog", + "name": "Heal Test", + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, + }) + .into(); + + let op1 = crdt_a.doc.items.insert(ROOT_ID, item).sign(&kp); + crdt_a.apply(op1.clone()); + + let op2 = crdt_a.doc.items[0] + .stage + .set("2_current".to_string()) + .sign(&kp); + crdt_a.apply(op2.clone()); + + let op3 = crdt_a.doc.items[0].stage.set("3_qa".to_string()).sign(&kp); + crdt_a.apply(op3.clone()); + + // Serialise all ops as a bulk message (simulates partition heal). + let ops_json: Vec = [&op1, &op2, &op3] + .iter() + .map(|op| serde_json::to_string(op).unwrap()) + .collect(); + let bulk = SyncMessage::Bulk { ops: ops_json }; + let wire = serde_json::to_string(&bulk).unwrap(); + + // Node B receives the bulk and reconstructs state. + let mut crdt_b = BaseCrdt::::new(&kp); + let parsed: SyncMessage = serde_json::from_str(&wire).unwrap(); + match parsed { + SyncMessage::Bulk { ops } => { + for op_str in &ops { + let signed: bft_json_crdt::json_crdt::SignedOp = + serde_json::from_str(op_str).unwrap(); + crdt_b.apply(signed); + } + } + _ => panic!("Expected Bulk"), + } + + // Node B should match Node A exactly. + assert_eq!(crdt_b.doc.items.view().len(), 1); + assert_eq!( + crdt_b.doc.items[0].stage.view(), + JV::String("3_qa".to_string()) + ); + assert_eq!( + crdt_a.doc.items[0].stage.view(), + crdt_b.doc.items[0].stage.view() + ); + assert_eq!( + crdt_a.doc.items[0].name.view(), + crdt_b.doc.items[0].name.view() + ); +} + +#[test] +fn self_loop_dedup_second_apply_is_noop() { + use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, JsonValue as JV, OpState}; + use bft_json_crdt::keypair::make_keypair; + use bft_json_crdt::op::ROOT_ID; + use serde_json::json; + + use crate::crdt_state::PipelineDoc; + + let kp = make_keypair(); + let mut crdt = BaseCrdt::::new(&kp); + + let item: bft_json_crdt::json_crdt::JsonValue = json!({ + "story_id": "507_dedup_test", + "stage": "1_backlog", + "name": "Dedup Test", + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, + }) + .into(); + + let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp); + + // First apply: succeeds. + assert_eq!(crdt.apply(op.clone()), OpState::Ok); + assert_eq!(crdt.doc.items.view().len(), 1); + + // Second apply (self-loop): must be a no-op. + assert_eq!(crdt.apply(op.clone()), OpState::AlreadySeen); + + // State must not have changed. + assert_eq!(crdt.doc.items.view().len(), 1); + + // Stage update also deduplicated correctly. + let stage_op = crdt.doc.items[0] + .stage + .set("2_current".to_string()) + .sign(&kp); + assert_eq!(crdt.apply(stage_op.clone()), OpState::Ok); + assert_eq!( + crdt.doc.items[0].stage.view(), + JV::String("2_current".to_string()) + ); + assert_eq!(crdt.apply(stage_op), OpState::AlreadySeen); + assert_eq!( + crdt.doc.items[0].stage.view(), + JV::String("2_current".to_string()), + "stage must not change on duplicate apply" + ); +} + +#[test] +fn out_of_order_causal_queueing_releases_on_dep_arrival() { + use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, JsonValue as JV, OpState}; + use bft_json_crdt::keypair::make_keypair; + use bft_json_crdt::op::ROOT_ID; + use serde_json::json; + + use crate::crdt_state::PipelineDoc; + + let kp = make_keypair(); + let mut crdt = BaseCrdt::::new(&kp); + + let item: bft_json_crdt::json_crdt::JsonValue = json!({ + "story_id": "507_causal_test", + "stage": "1_backlog", + "name": "Causal Test", + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, + }) + .into(); + + // op1 = insert item (no deps) + let op1 = crdt.doc.items.insert(ROOT_ID, item).sign(&kp); + + // op2 = set stage, declared to depend on op1 + // We must first apply op1 locally to generate op2 from the right state, + // then we'll test op2-before-op1 on a fresh CRDT. + crdt.apply(op1.clone()); + let op2 = crdt.doc.items[0] + .stage + .set("2_current".to_string()) + .sign_with_dependencies(&kp, vec![&op1]); + + // Create a fresh receiver CRDT. + let mut receiver = BaseCrdt::::new(&kp); + + // Apply op2 first — dependency (op1) has not arrived yet. + let r = receiver.apply(op2.clone()); + assert_eq!( + r, + OpState::MissingCausalDependencies, + "op2 must be queued when op1 has not arrived" + ); + // Queue length must reflect the held op. + assert_eq!(receiver.causal_queue_len(), 1); + + // Item has NOT been inserted yet (op1 not applied). + assert_eq!(receiver.doc.items.view().len(), 0); + + // Now deliver op1 — this should trigger op2 to be flushed automatically. + let r = receiver.apply(op1.clone()); + assert_eq!(r, OpState::Ok); + + // Both ops are now applied — item is present at stage 2_current. + assert_eq!(receiver.doc.items.view().len(), 1); + assert_eq!( + receiver.doc.items[0].stage.view(), + JV::String("2_current".to_string()), + "op2 must have been applied automatically after op1 arrived" + ); + + // Queue must be empty now. + assert_eq!(receiver.causal_queue_len(), 0); +} + +#[test] +fn in_order_apply_works_without_queueing() { + use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, JsonValue as JV, OpState}; + use bft_json_crdt::keypair::make_keypair; + use bft_json_crdt::op::ROOT_ID; + use serde_json::json; + + use crate::crdt_state::PipelineDoc; + + let kp = make_keypair(); + let mut crdt_a = BaseCrdt::::new(&kp); + + let item: bft_json_crdt::json_crdt::JsonValue = json!({ + "story_id": "507_inorder_test", + "stage": "1_backlog", + "name": "In-Order Test", + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, + }) + .into(); + + let op1 = crdt_a.doc.items.insert(ROOT_ID, item).sign(&kp); + crdt_a.apply(op1.clone()); + let op2 = crdt_a.doc.items[0] + .stage + .set("2_current".to_string()) + .sign(&kp); + crdt_a.apply(op2.clone()); + let op3 = crdt_a.doc.items[0].stage.set("3_qa".to_string()).sign(&kp); + crdt_a.apply(op3.clone()); + + // Receiver applies all ops in the correct order. + let mut crdt_b = BaseCrdt::::new(&kp); + assert_eq!(crdt_b.apply(op1), OpState::Ok); + assert_eq!(crdt_b.apply(op2), OpState::Ok); + assert_eq!(crdt_b.apply(op3), OpState::Ok); + assert_eq!(crdt_b.causal_queue_len(), 0); + assert_eq!( + crdt_b.doc.items[0].stage.view(), + JV::String("3_qa".to_string()) + ); +} + +#[test] +fn causal_queue_overflow_drops_oldest() { + use bft_json_crdt::json_crdt::{BaseCrdt, CAUSAL_QUEUE_MAX, OpState}; + use bft_json_crdt::keypair::make_keypair; + use bft_json_crdt::op::ROOT_ID; + use serde_json::json; + + use crate::crdt_state::PipelineDoc; + + let kp = make_keypair(); + + // Build one "phantom" op that we'll claim as a dependency but never deliver. + // We do this by creating it on a separate CRDT and never applying it. + let mut source = BaseCrdt::::new(&kp); + let phantom_item: bft_json_crdt::json_crdt::JsonValue = json!({ + "story_id": "507_phantom", + "stage": "1_backlog", + "name": "Phantom", + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, + }) + .into(); + let phantom_op = source.doc.items.insert(ROOT_ID, phantom_item).sign(&kp); + + // Receiver never sees phantom_op, so any op declaring it as a dep will + // sit in the causal queue forever (until evicted by overflow). + let mut receiver = BaseCrdt::::new(&kp); + source.apply(phantom_op.clone()); + + // Send CAUSAL_QUEUE_MAX + 5 stage-update ops all depending on phantom_op. + // Each one will be queued because phantom_op is never delivered. + let mut queued = 0usize; + for i in 0..CAUSAL_QUEUE_MAX + 5 { + let stage_name = format!("stage_{i}"); + // Generate from source so seq numbers are valid. + let op = source.doc.items[0] + .stage + .set(stage_name) + .sign_with_dependencies(&kp, vec![&phantom_op]); + source.apply(op.clone()); + let r = receiver.apply(op); + if r == OpState::MissingCausalDependencies { + queued += 1; + } + } + + // We sent more than CAUSAL_QUEUE_MAX ops, but the queue must stay bounded. + assert!( + receiver.causal_queue_len() <= CAUSAL_QUEUE_MAX, + "queue ({}) must not exceed CAUSAL_QUEUE_MAX ({CAUSAL_QUEUE_MAX})", + receiver.causal_queue_len() + ); + assert!( + queued > 0, + "at least some ops must have been accepted into the queue" + ); +} + +#[test] +fn convergence_after_partition_and_replay() { + use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, JsonValue as JV, OpState}; + use bft_json_crdt::keypair::make_keypair; + use bft_json_crdt::op::ROOT_ID; + use serde_json::json; + + use crate::crdt_state::PipelineDoc; + + let kp_a = make_keypair(); + let kp_b = make_keypair(); + + let mut crdt_a = BaseCrdt::::new(&kp_a); + let mut crdt_b = BaseCrdt::::new(&kp_b); + + // ── Phase 1: A generates ops while partitioned from B ────────────── + + let item_a: bft_json_crdt::json_crdt::JsonValue = json!({ + "story_id": "507_convergence_a", + "stage": "1_backlog", + "name": "Story A", + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, + }) + .into(); + let op_a1 = crdt_a.doc.items.insert(ROOT_ID, item_a).sign(&kp_a); + crdt_a.apply(op_a1.clone()); + + let op_a2 = crdt_a.doc.items[0] + .stage + .set("2_current".to_string()) + .sign(&kp_a); + crdt_a.apply(op_a2.clone()); + + // ── Phase 2: B generates ops while partitioned from A ────────────── + + let item_b: bft_json_crdt::json_crdt::JsonValue = json!({ + "story_id": "507_convergence_b", + "stage": "1_backlog", + "name": "Story B", + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, + }) + .into(); + let op_b1 = crdt_b.doc.items.insert(ROOT_ID, item_b).sign(&kp_b); + crdt_b.apply(op_b1.clone()); + + let op_b2 = crdt_b.doc.items[0] + .stage + .set("2_current".to_string()) + .sign(&kp_b); + crdt_b.apply(op_b2.clone()); + + // ── Phase 3: Reconnect — both sides replay all buffered ops ──────── + + // A sends its ops to B. + let r = crdt_b.apply(op_a1.clone()); + assert!(r == OpState::Ok || r == OpState::AlreadySeen); + let r = crdt_b.apply(op_a2.clone()); + assert!(r == OpState::Ok || r == OpState::AlreadySeen); + + // B sends its ops to A. + let r = crdt_a.apply(op_b1.clone()); + assert!(r == OpState::Ok || r == OpState::AlreadySeen); + let r = crdt_a.apply(op_b2.clone()); + assert!(r == OpState::Ok || r == OpState::AlreadySeen); + + // ── Phase 4: Assert convergence ──────────────────────────────────── + + // Both nodes must have both stories. + assert_eq!( + crdt_a.doc.items.view().len(), + 2, + "A must have 2 items after convergence" + ); + assert_eq!( + crdt_b.doc.items.view().len(), + 2, + "B must have 2 items after convergence" + ); + + // Serialise both CRDT views to JSON and assert byte-identical. + let view_a = serde_json::to_string(&CrdtNode::view(&crdt_a.doc.items)).unwrap(); + let view_b = serde_json::to_string(&CrdtNode::view(&crdt_b.doc.items)).unwrap(); + assert_eq!( + view_a, view_b, + "CRDT states must be byte-identical after convergence" + ); + + // Spot-check: both stories are at 2_current on both nodes. + let stories_a: Vec = crdt_a + .doc + .items + .view() + .iter() + .filter_map(|item| { + if let JV::Object(m) = CrdtNode::view(item) { + m.get("story_id").and_then(|s| { + if let JV::String(s) = s { + Some(s.clone()) + } else { + None + } + }) + } else { + None + } + }) + .collect(); + assert!( + stories_a.contains(&"507_convergence_a".to_string()), + "A must contain story_a" + ); + assert!( + stories_a.contains(&"507_convergence_b".to_string()), + "A must contain story_b" + ); +} + +#[test] +fn existing_sync_tests_unchanged() { + // If we got here, all previous crdt_sync tests compiled and passed. + // This test exists as a documentation anchor for AC9. +} + +#[test] +fn v1_peer_ignores_clock_message_gracefully() { + // Simulate: v1 peer only knows Bulk and Op. + // A clock message should fail deserialization (unknown variant). + let clock_json = r#"{"type":"clock","clock":{"abc":10}}"#; + // handle_incoming_text logs and returns — must not panic. + handle_incoming_text(clock_json); +} + +#[test] +fn v2_delta_sync_via_clock_exchange() { + use bft_json_crdt::json_crdt::BaseCrdt; + use bft_json_crdt::keypair::make_keypair; + use bft_json_crdt::op::ROOT_ID; + use fastcrypto::traits::KeyPair; + use serde_json::json; + + use crate::crdt_state::PipelineDoc; + + let kp_a = make_keypair(); + let mut crdt_a = BaseCrdt::::new(&kp_a); + + // A creates 5 items. + let mut ops_a = Vec::new(); + for i in 0..5 { + let item: bft_json_crdt::json_crdt::JsonValue = json!({ + "story_id": format!("631_v2_{i}"), + "stage": "1_backlog", + "name": format!("v2 item {i}"), + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, + }) + .into(); + let op = crdt_a.doc.items.insert(ROOT_ID, item).sign(&kp_a); + crdt_a.apply(op.clone()); + ops_a.push(op); + } + + // B has seen the first 3 ops (clock says 3). + let kp_b = make_keypair(); + let mut crdt_b = BaseCrdt::::new(&kp_b); + for op in &ops_a[..3] { + crdt_b.apply(op.clone()); + } + assert_eq!(crdt_b.doc.items.view().len(), 3); + + // Build B's clock. + let author_a_hex = crate::crdt_state::hex::encode(&kp_a.public().0.to_bytes()); + let mut clock_b = std::collections::HashMap::new(); + clock_b.insert(author_a_hex.clone(), 3u64); + + // Serialize clock as wire message. + let clock_msg = SyncMessage::Clock { + clock: clock_b.clone(), + }; + let clock_wire = serde_json::to_string(&clock_msg).unwrap(); + + // A receives B's clock and computes delta. + let parsed: SyncMessage = serde_json::from_str(&clock_wire).unwrap(); + let delta_ops = match parsed { + SyncMessage::Clock { clock: peer_clock } => { + // Simulate ops_since: A has 5 ops from author_a, B has 3. + let all_json: Vec = ops_a + .iter() + .map(|op| serde_json::to_string(op).unwrap()) + .collect(); + let mut result = Vec::new(); + let mut count = 0u64; + for (i, _op) in ops_a.iter().enumerate() { + count += 1; + let peer_has = peer_clock.get(&author_a_hex).copied().unwrap_or(0); + if count > peer_has { + result.push(all_json[i].clone()); + } + } + result + } + _ => panic!("Expected Clock"), + }; + + assert_eq!(delta_ops.len(), 2, "delta should be 2 ops (ops 4 and 5)"); + + // B applies the delta. + for op_str in &delta_ops { + let signed: bft_json_crdt::json_crdt::SignedOp = serde_json::from_str(op_str).unwrap(); + crdt_b.apply(signed); + } + assert_eq!(crdt_b.doc.items.view().len(), 5); +} + +#[test] +fn buffer_flush_ops_held_until_peer_ready() { + use bft_json_crdt::json_crdt::BaseCrdt; + use bft_json_crdt::keypair::make_keypair; + use bft_json_crdt::op::ROOT_ID; + use serde_json::json; + + use crate::crdt_state::PipelineDoc; + + // Simulate the buffer-flush state machine without real WebSockets. + let mut peer_ready = false; + let mut op_buffer: Vec> = Vec::new(); // encoded wire frames + let mut sent_frames: Vec> = Vec::new(); // captured "sent" frames + + // Build two local ops on a fresh CRDT. + let kp = make_keypair(); + let mut crdt = BaseCrdt::::new(&kp); + let item: bft_json_crdt::json_crdt::JsonValue = json!({ + "story_id": "632_buffer_a", + "stage": "1_backlog", + "name": "Buffer A", + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, + }) + .into(); + let op1 = crdt.doc.items.insert(ROOT_ID, item).sign(&kp); + crdt.apply(op1.clone()); + let op2 = crdt.doc.items[0] + .stage + .set("2_current".to_string()) + .sign(&kp); + crdt.apply(op2.clone()); + + // Simulate op1 arriving from broadcast while peer is NOT ready. + let frame1 = crate::crdt_wire::encode(&op1); + if peer_ready { + sent_frames.push(frame1.clone()); + } else { + op_buffer.push(frame1.clone()); + } + + // Simulate op2 arriving while peer is still NOT ready. + let frame2 = crate::crdt_wire::encode(&op2); + if peer_ready { + sent_frames.push(frame2.clone()); + } else { + op_buffer.push(frame2.clone()); + } + + // Both ops must be buffered, none sent yet. + assert_eq!( + sent_frames.len(), + 0, + "ops must be buffered before peer Ready arrives" + ); + assert_eq!(op_buffer.len(), 2, "buffer must hold both ops"); + + // Simulate receiving the peer's Ready frame. + let ready_json = serde_json::to_string(&SyncMessage::Ready).unwrap(); + if let Ok(SyncMessage::Ready) = serde_json::from_str::(&ready_json) { + peer_ready = true; + for encoded in op_buffer.drain(..) { + sent_frames.push(encoded); + } + } + + assert!(peer_ready, "peer_ready must be true after Ready frame"); + assert_eq!(op_buffer.len(), 0, "buffer must be empty after flush"); + assert_eq!( + sent_frames.len(), + 2, + "both buffered ops must appear in captured frames after flush" + ); + // Order preserved: op1 before op2. + assert_eq!(sent_frames[0], frame1, "op1 must be first flushed frame"); + assert_eq!(sent_frames[1], frame2, "op2 must be second flushed frame"); + + // After flush, a new op is sent immediately (no buffering). + let op3 = crdt.doc.items[0].stage.set("3_qa".to_string()).sign(&kp); + crdt.apply(op3.clone()); + let frame3 = crate::crdt_wire::encode(&op3); + if peer_ready { + sent_frames.push(frame3.clone()); + } else { + op_buffer.push(frame3.clone()); + } + assert_eq!( + sent_frames.len(), + 3, + "op after flush must be sent immediately" + ); + assert_eq!(op_buffer.len(), 0, "buffer must remain empty"); +} + +#[test] +fn realtime_op_from_peer_applied_immediately_regardless_of_ready_state() { + use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, JsonValue as JV, OpState}; + use bft_json_crdt::keypair::make_keypair; + use bft_json_crdt::op::ROOT_ID; + use serde_json::json; + + use crate::crdt_state::PipelineDoc; + + // Node A creates a bulk op and a real-time op that causally depends on it. + let kp_a = make_keypair(); + let mut crdt_a = BaseCrdt::::new(&kp_a); + let item: bft_json_crdt::json_crdt::JsonValue = json!({ + "story_id": "632_causal_test", + "stage": "1_backlog", + "name": "Causal Test", + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, + }) + .into(); + let bulk_op = crdt_a.doc.items.insert(ROOT_ID, item).sign(&kp_a); + crdt_a.apply(bulk_op.clone()); + + // Real-time op that causally depends on bulk_op. + let rt_op = crdt_a.doc.items[0] + .stage + .set("2_current".to_string()) + .sign_with_dependencies(&kp_a, vec![&bulk_op]); + crdt_a.apply(rt_op.clone()); + + // Node B receives the bulk op first (simulating the bulk-delta phase). + let kp_b = make_keypair(); + let mut crdt_b = BaseCrdt::::new(&kp_b); + assert_eq!( + crdt_b.apply(bulk_op.clone()), + OpState::Ok, + "bulk op must apply cleanly on node B" + ); + + // Node B has not yet sent its own Ready frame, but it receives A's + // real-time binary frame. It must be applied immediately via + // handle_incoming_binary (causal queue handles ordering). + let wire = crate::crdt_wire::encode(&rt_op); + let decoded = crate::crdt_wire::decode(&wire).unwrap(); + let result = crdt_b.apply(decoded); + assert_eq!( + result, + OpState::Ok, + "real-time op depending on bulk op must apply immediately after bulk op is present" + ); + + // Both nodes converge to the same state. + assert_eq!( + crdt_b.doc.items[0].stage.view(), + JV::String("2_current".to_string()), + "Node B must converge to stage 2_current" + ); + assert_eq!( + CrdtNode::view(&crdt_a.doc.items), + CrdtNode::view(&crdt_b.doc.items), + "Both nodes must converge to identical state" + ); +} + +#[test] +fn incoming_realtime_op_applied_before_we_send_ready() { + use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, JsonValue as JV, OpState}; + use bft_json_crdt::keypair::make_keypair; + use bft_json_crdt::op::ROOT_ID; + use serde_json::json; + + use crate::crdt_state::PipelineDoc; + + // Simulate: we have NOT sent our ready yet (own_ready_sent = false), + // but the peer sends us a real-time binary op. It must be applied. + let own_ready_sent = false; // we haven't sent ready yet + let _ = own_ready_sent; // doc-only; receiving side never gates on this + + let kp_peer = make_keypair(); + let mut crdt_peer = BaseCrdt::::new(&kp_peer); + let item: bft_json_crdt::json_crdt::JsonValue = json!({ + "story_id": "632_ac3_test", + "stage": "1_backlog", + "name": "AC3 Test", + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, + }) + .into(); + let rt_op = crdt_peer.doc.items.insert(ROOT_ID, item).sign(&kp_peer); + crdt_peer.apply(rt_op.clone()); + + // Node B decodes and applies the peer's real-time op directly. + let kp_b = make_keypair(); + let mut crdt_b = BaseCrdt::::new(&kp_b); + let wire = crate::crdt_wire::encode(&rt_op); + let decoded = crate::crdt_wire::decode(&wire).unwrap(); + // This mirrors handle_incoming_binary() — applied unconditionally. + let result = crdt_b.apply(decoded); + assert_eq!( + result, + OpState::Ok, + "incoming real-time op must be applied immediately regardless of own ready state" + ); + assert_eq!( + crdt_b.doc.items[0].stage.view(), + JV::String("1_backlog".to_string()), + "op content must be correct" + ); +} + +#[test] +fn existing_crdt_sync_tests_pass_unchanged() { + // Reaching this point means all prior tests in this module compiled + // and passed. This test documents the AC6 intent. +}