//! CRDT sync — WebSocket-based replication of pipeline state between huskies nodes. /// WebSocket-based CRDT sync layer for replicating pipeline state between /// huskies nodes. /// /// # Protocol /// /// The sync protocol is a hybrid of two frame types: /// /// ## Text frames (bulk initial state) /// A JSON object with a `"type"` field: /// - `{"type":"bulk","ops":[...]}` — Initial state dump (array of serialised /// `SignedOp` JSON strings). Sent by both sides immediately after connect. /// /// ## Binary frames (real-time op broadcast) /// Individual `SignedOp`s encoded via [`crate::crdt_wire`] (versioned JSON /// envelope: `{"v":1,"op":{...}}`). Each locally-applied op is immediately /// broadcast as a binary frame to all connected peers. /// /// Both the server endpoint and the rendezvous client use the same protocol, /// making the connection fully symmetric. /// /// ## Backpressure /// Each connected peer has its own [`tokio::sync::broadcast`] receiver. If a /// slow peer allows the channel to fill (indicated by a `Lagged` error), the /// connection is dropped with a warning log. The peer can reconnect and /// receive a fresh bulk state dump to catch up. use bft_json_crdt::json_crdt::SignedOp; use futures::{SinkExt, StreamExt}; use poem::handler; use poem::web::Data; use poem::web::websocket::{Message as WsMessage, WebSocket}; use serde::{Deserialize, Serialize}; use std::sync::Arc; use crate::crdt_state; use crate::crdt_wire; use crate::http::context::AppContext; use crate::slog; use crate::slog_error; use crate::slog_warn; // ── Wire protocol types ───────────────────────────────────────────── #[derive(Serialize, Deserialize)] #[serde(tag = "type", rename_all = "snake_case")] enum SyncMessage { /// Bulk state dump sent on connect. Bulk { ops: Vec }, /// A single new op. Op { op: String }, } // ── Server-side WebSocket handler ─────────────────────────────────── #[handler] pub async fn crdt_sync_handler( ws: WebSocket, _ctx: Data<&Arc>, ) -> impl poem::IntoResponse { ws.on_upgrade(|socket| async move { let (mut sink, mut stream) = socket.split(); slog!("[crdt-sync] Peer connected"); // Send bulk state dump. if let Some(ops) = crdt_state::all_ops_json() { let msg = SyncMessage::Bulk { ops }; if let Ok(json) = serde_json::to_string(&msg) && sink.send(WsMessage::Text(json)).await.is_err() { return; } } // Subscribe to new local ops. let Some(mut op_rx) = crdt_state::subscribe_ops() else { return; }; loop { tokio::select! { // Forward new local ops to the peer encoded via the wire codec. result = op_rx.recv() => { match result { Ok(signed_op) => { let bytes = crdt_wire::encode(&signed_op); if sink.send(WsMessage::Binary(bytes)).await.is_err() { break; } } Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { // The peer cannot keep up; disconnect so it can // reconnect and receive a fresh bulk state dump. slog!("[crdt-sync] Slow peer lagged {n} ops; disconnecting"); break; } Err(_) => break, } } // Receive ops from the peer. frame = stream.next() => { match frame { Some(Ok(WsMessage::Text(text))) => { // Bulk state dump or legacy text-frame op. handle_incoming_text(&text); } Some(Ok(WsMessage::Binary(bytes))) => { // Real-time op encoded via wire codec. handle_incoming_binary(&bytes); } Some(Ok(WsMessage::Close(_))) | None => break, _ => {} } } } } slog!("[crdt-sync] Peer disconnected"); }) } /// Process an incoming text-frame sync message from a peer. /// /// Text frames carry the bulk state dump (`SyncMessage::Bulk`) or legacy /// single-op messages (`SyncMessage::Op`). fn handle_incoming_text(text: &str) { let msg: SyncMessage = match serde_json::from_str(text) { Ok(m) => m, Err(e) => { slog!("[crdt-sync] Bad text message from peer: {e}"); return; } }; match msg { SyncMessage::Bulk { ops } => { let mut applied = 0u64; for op_json in &ops { if let Ok(signed_op) = serde_json::from_str::(op_json) && crdt_state::apply_remote_op(signed_op) { applied += 1; } } slog!( "[crdt-sync] Bulk sync: received {} ops, applied {applied}", ops.len() ); } SyncMessage::Op { op } => { if let Ok(signed_op) = serde_json::from_str::(&op) { crdt_state::apply_remote_op(signed_op); } } } } /// Process an incoming binary-frame op from a peer. /// /// Binary frames carry a single `SignedOp` encoded via [`crdt_wire`]. fn handle_incoming_binary(bytes: &[u8]) { match crdt_wire::decode(bytes) { Ok(signed_op) => { crdt_state::apply_remote_op(signed_op); } Err(e) => { slog!("[crdt-sync] Bad binary frame from peer: {e}"); } } } // ── Rendezvous client ─────────────────────────────────────────────── /// Number of consecutive connection failures before escalating from WARN to ERROR. pub const RENDEZVOUS_ERROR_THRESHOLD: u32 = 10; /// Spawn a background task that connects to the configured rendezvous /// peer and exchanges CRDT ops bidirectionally. /// /// The client reconnects with exponential backoff if the connection drops. /// Individual failures are logged at WARN; after [`RENDEZVOUS_ERROR_THRESHOLD`] /// consecutive failures the log level escalates to ERROR. pub fn spawn_rendezvous_client(url: String) { tokio::spawn(async move { let mut backoff_secs = 1u64; let mut consecutive_failures: u32 = 0; loop { slog!("[crdt-sync] Connecting to rendezvous peer: {url}"); match connect_and_sync(&url).await { Ok(()) => { slog!("[crdt-sync] Rendezvous connection closed cleanly"); backoff_secs = 1; consecutive_failures = 0; } Err(e) => { consecutive_failures += 1; if consecutive_failures >= RENDEZVOUS_ERROR_THRESHOLD { slog_error!( "[crdt-sync] Rendezvous peer unreachable ({consecutive_failures} consecutive failures): {e}" ); } else { slog_warn!( "[crdt-sync] Rendezvous connection error (attempt {consecutive_failures}): {e}" ); } } } slog!("[crdt-sync] Reconnecting in {backoff_secs}s..."); tokio::time::sleep(std::time::Duration::from_secs(backoff_secs)).await; backoff_secs = (backoff_secs * 2).min(30); } }); } /// Connect to a remote sync endpoint and exchange ops until disconnect. async fn connect_and_sync(url: &str) -> Result<(), String> { let (ws_stream, _) = tokio_tungstenite::connect_async(url) .await .map_err(|e| format!("WebSocket connect failed: {e}"))?; let (mut sink, mut stream) = ws_stream.split(); slog!("[crdt-sync] Connected to rendezvous peer"); // Send our bulk state. if let Some(ops) = crdt_state::all_ops_json() { let msg = SyncMessage::Bulk { ops }; if let Ok(json) = serde_json::to_string(&msg) { use tokio_tungstenite::tungstenite::Message as TungsteniteMsg; sink.send(TungsteniteMsg::Text(json.into())) .await .map_err(|e| format!("Send bulk failed: {e}"))?; } } // Subscribe to new local ops. let Some(mut op_rx) = crdt_state::subscribe_ops() else { return Err("CRDT not initialised".to_string()); }; loop { tokio::select! { result = op_rx.recv() => { match result { Ok(signed_op) => { // Encode via wire codec and send as binary frame. let bytes = crdt_wire::encode(&signed_op); use tokio_tungstenite::tungstenite::Message as TungsteniteMsg; if sink.send(TungsteniteMsg::Binary(bytes.into())).await.is_err() { break; } } Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { slog!("[crdt-sync] Slow rendezvous link lagged {n} ops; disconnecting"); break; } Err(_) => break, } } frame = stream.next() => { match frame { Some(Ok(tokio_tungstenite::tungstenite::Message::Text(text))) => { handle_incoming_text(text.as_ref()); } Some(Ok(tokio_tungstenite::tungstenite::Message::Binary(bytes))) => { handle_incoming_binary(&bytes); } Some(Ok(tokio_tungstenite::tungstenite::Message::Close(_))) | None => break, Some(Err(e)) => { slog!("[crdt-sync] Rendezvous read error: {e}"); break; } _ => {} } } } } Ok(()) } // ── Tests ──────────────────────────────────────────────────────────── #[cfg(test)] mod tests { use super::*; #[test] fn sync_message_bulk_serialization_roundtrip() { let msg = SyncMessage::Bulk { ops: vec!["op1".to_string(), "op2".to_string()], }; let json = serde_json::to_string(&msg).unwrap(); assert!(json.contains(r#""type":"bulk""#)); let deserialized: SyncMessage = serde_json::from_str(&json).unwrap(); match deserialized { SyncMessage::Bulk { ops } => { assert_eq!(ops.len(), 2); assert_eq!(ops[0], "op1"); assert_eq!(ops[1], "op2"); } _ => panic!("Expected Bulk"), } } #[test] fn sync_message_op_serialization_roundtrip() { let msg = SyncMessage::Op { op: r#"{"inner":{}}"#.to_string(), }; let json = serde_json::to_string(&msg).unwrap(); assert!(json.contains(r#""type":"op""#)); let deserialized: SyncMessage = serde_json::from_str(&json).unwrap(); match deserialized { SyncMessage::Op { op } => { assert_eq!(op, r#"{"inner":{}}"#); } _ => panic!("Expected Op"), } } #[test] fn handle_incoming_text_bad_json_does_not_panic() { handle_incoming_text("not valid json"); } #[test] fn handle_incoming_text_bulk_with_invalid_ops_does_not_panic() { let msg = SyncMessage::Bulk { ops: vec!["not a valid signed op".to_string()], }; let json = serde_json::to_string(&msg).unwrap(); handle_incoming_text(&json); } #[test] fn handle_incoming_text_op_with_invalid_op_does_not_panic() { let msg = SyncMessage::Op { op: "garbage".to_string(), }; let json = serde_json::to_string(&msg).unwrap(); handle_incoming_text(&json); } #[test] fn handle_incoming_binary_bad_bytes_does_not_panic() { handle_incoming_binary(b"not valid wire codec"); } #[test] fn handle_incoming_binary_empty_bytes_does_not_panic() { handle_incoming_binary(b""); } #[test] fn subscribe_ops_returns_none_before_init() { // Before crdt_state::init() the channel doesn't exist yet. // In test binaries it may or may not be initialised depending on // other tests, so we just verify no panic. let _ = crdt_state::subscribe_ops(); } #[test] fn all_ops_json_returns_none_before_init() { let _ = crdt_state::all_ops_json(); } #[test] fn sync_message_bulk_empty_ops() { let msg = SyncMessage::Bulk { ops: vec![] }; let json = serde_json::to_string(&msg).unwrap(); let deserialized: SyncMessage = serde_json::from_str(&json).unwrap(); match deserialized { SyncMessage::Bulk { ops } => assert!(ops.is_empty()), _ => panic!("Expected Bulk"), } } /// Simulate the sync protocol by creating real SignedOps on two separate /// CRDT instances and exchanging them through the SyncMessage wire format. #[test] fn two_node_sync_via_protocol_messages() { use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, OpState}; use bft_json_crdt::keypair::make_keypair; use bft_json_crdt::op::ROOT_ID; use serde_json::json; use crate::crdt_state::PipelineDoc; // ── Node A: create an item ── let kp_a = make_keypair(); let mut crdt_a = BaseCrdt::::new(&kp_a); let item: bft_json_crdt::json_crdt::JsonValue = json!({ "story_id": "100_story_sync_test", "stage": "1_backlog", "name": "Sync Test", "agent": "", "retry_count": 0.0, "blocked": false, "depends_on": "", "claimed_by": "", "claimed_at": 0.0, }) .into(); let op1 = crdt_a.doc.items.insert(ROOT_ID, item).sign(&kp_a); assert_eq!(crdt_a.apply(op1.clone()), OpState::Ok); // Serialise op1 into a SyncMessage::Op. let op1_json = serde_json::to_string(&op1).unwrap(); let wire_msg = SyncMessage::Op { op: op1_json.clone(), }; let wire_json = serde_json::to_string(&wire_msg).unwrap(); // ── Node B: receive the op through protocol ── let kp_b = make_keypair(); let mut crdt_b = BaseCrdt::::new(&kp_b); assert!(crdt_b.doc.items.view().is_empty()); // Parse wire message and apply. let parsed: SyncMessage = serde_json::from_str(&wire_json).unwrap(); match parsed { SyncMessage::Op { op } => { let signed_op: bft_json_crdt::json_crdt::SignedOp = serde_json::from_str(&op).unwrap(); let result = crdt_b.apply(signed_op); assert_eq!(result, OpState::Ok); } _ => panic!("Expected Op"), } // Verify Node B has the same state as Node A. assert_eq!(crdt_b.doc.items.view().len(), 1); assert_eq!( crdt_a.doc.items[0].story_id.view(), crdt_b.doc.items[0].story_id.view() ); assert_eq!( crdt_a.doc.items[0].stage.view(), crdt_b.doc.items[0].stage.view() ); // ── Node A: update stage ── let op2 = crdt_a.doc.items[0] .stage .set("2_current".to_string()) .sign(&kp_a); crdt_a.apply(op2.clone()); // Send via bulk message. let op2_json = serde_json::to_string(&op2).unwrap(); let bulk_msg = SyncMessage::Bulk { ops: vec![op1_json, op2_json], }; let bulk_wire = serde_json::to_string(&bulk_msg).unwrap(); // ── Node C: receives full state via bulk ── let kp_c = make_keypair(); let mut crdt_c = BaseCrdt::::new(&kp_c); let parsed_bulk: SyncMessage = serde_json::from_str(&bulk_wire).unwrap(); match parsed_bulk { SyncMessage::Bulk { ops } => { for op_str in &ops { let signed: bft_json_crdt::json_crdt::SignedOp = serde_json::from_str(op_str).unwrap(); crdt_c.apply(signed); } } _ => panic!("Expected Bulk"), } // Node C should have the updated stage. assert_eq!(crdt_c.doc.items.view().len(), 1); assert_eq!( crdt_c.doc.items[0].stage.view(), bft_json_crdt::json_crdt::JsonValue::String("2_current".to_string()) ); } /// Verify that a single node's ops (insert + update) can be replayed /// on another node via bulk sync and produce the same final state. /// This is the core property needed for partition healing: when a /// disconnected node reconnects, it sends all its ops as a bulk /// message and the receiver catches up. #[test] fn partition_heal_via_bulk_replay() { use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, JsonValue as JV}; use bft_json_crdt::keypair::make_keypair; use bft_json_crdt::op::ROOT_ID; use serde_json::json; use crate::crdt_state::PipelineDoc; let kp = make_keypair(); // Node A creates an item and advances it. let mut crdt_a = BaseCrdt::::new(&kp); let item: bft_json_crdt::json_crdt::JsonValue = json!({ "story_id": "200_story_heal", "stage": "1_backlog", "name": "Heal Test", "agent": "", "retry_count": 0.0, "blocked": false, "depends_on": "", "claimed_by": "", "claimed_at": 0.0, }) .into(); let op1 = crdt_a.doc.items.insert(ROOT_ID, item).sign(&kp); crdt_a.apply(op1.clone()); let op2 = crdt_a.doc.items[0] .stage .set("2_current".to_string()) .sign(&kp); crdt_a.apply(op2.clone()); let op3 = crdt_a.doc.items[0].stage.set("3_qa".to_string()).sign(&kp); crdt_a.apply(op3.clone()); // Serialise all ops as a bulk message (simulates partition heal). let ops_json: Vec = [&op1, &op2, &op3] .iter() .map(|op| serde_json::to_string(op).unwrap()) .collect(); let bulk = SyncMessage::Bulk { ops: ops_json }; let wire = serde_json::to_string(&bulk).unwrap(); // Node B receives the bulk and reconstructs state. let mut crdt_b = BaseCrdt::::new(&kp); let parsed: SyncMessage = serde_json::from_str(&wire).unwrap(); match parsed { SyncMessage::Bulk { ops } => { for op_str in &ops { let signed: bft_json_crdt::json_crdt::SignedOp = serde_json::from_str(op_str).unwrap(); crdt_b.apply(signed); } } _ => panic!("Expected Bulk"), } // Node B should match Node A exactly. assert_eq!(crdt_b.doc.items.view().len(), 1); assert_eq!( crdt_b.doc.items[0].stage.view(), JV::String("3_qa".to_string()) ); assert_eq!( crdt_a.doc.items[0].stage.view(), crdt_b.doc.items[0].stage.view() ); assert_eq!( crdt_a.doc.items[0].name.view(), crdt_b.doc.items[0].name.view() ); } #[test] fn config_rendezvous_parsed_from_toml() { let toml_str = r#" rendezvous = "ws://remote:3001/crdt-sync" [[agent]] name = "test" "#; let config: crate::config::ProjectConfig = toml::from_str(toml_str).unwrap(); assert_eq!( config.rendezvous.as_deref(), Some("ws://remote:3001/crdt-sync") ); } #[test] fn config_rendezvous_defaults_to_none() { let config = crate::config::ProjectConfig::default(); assert!(config.rendezvous.is_none()); } // ── AC4: Failure logging escalation ────────────────────────────────────── /// AC4: Connection errors must be logged at WARN for the first nine /// consecutive failures and escalate to ERROR from the tenth onwards. #[test] fn failure_counter_warn_below_threshold() { let threshold = super::RENDEZVOUS_ERROR_THRESHOLD; let mut consecutive_failures: u32 = 0; // First threshold-1 failures are below the ERROR threshold. for _ in 0..(threshold - 1) { consecutive_failures += 1; assert!( consecutive_failures < threshold, "failure {consecutive_failures} must be below ERROR threshold {threshold}" ); } } /// AC4: The tenth consecutive failure must trigger ERROR-level logging. #[test] fn failure_counter_error_at_threshold() { let threshold = super::RENDEZVOUS_ERROR_THRESHOLD; let consecutive_failures: u32 = threshold; assert!( consecutive_failures >= threshold, "failure {consecutive_failures} must reach or exceed ERROR threshold {threshold}" ); } /// AC4: A successful connection resets the failure counter so subsequent /// single failures are again logged at WARN (not ERROR). #[test] fn failure_counter_resets_on_success() { let threshold = super::RENDEZVOUS_ERROR_THRESHOLD; // Simulate sustained failure. let mut consecutive_failures: u32 = threshold + 5; assert!(consecutive_failures >= threshold); // Simulate a clean reconnect. consecutive_failures = 0; assert_eq!( consecutive_failures, 0, "counter must reset to 0 on success" ); // Next error is attempt 1 — well below the ERROR threshold. consecutive_failures += 1; assert!( consecutive_failures < threshold, "first failure after reset must be below ERROR threshold" ); } /// AC4: The RENDEZVOUS_ERROR_THRESHOLD constant must equal 10. #[test] fn error_threshold_is_ten() { assert_eq!( super::RENDEZVOUS_ERROR_THRESHOLD, 10, "ERROR escalation threshold must be 10 consecutive failures" ); } // ── AC5: Self-loop dedup ────────────────────────────────────────────────── /// AC5: Applying the same SignedOp twice returns AlreadySeen on the second /// call and leaves the CRDT state unchanged. #[test] fn self_loop_dedup_second_apply_is_noop() { use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, JsonValue as JV, OpState}; use bft_json_crdt::keypair::make_keypair; use bft_json_crdt::op::ROOT_ID; use serde_json::json; use crate::crdt_state::PipelineDoc; let kp = make_keypair(); let mut crdt = BaseCrdt::::new(&kp); let item: bft_json_crdt::json_crdt::JsonValue = json!({ "story_id": "507_dedup_test", "stage": "1_backlog", "name": "Dedup Test", "agent": "", "retry_count": 0.0, "blocked": false, "depends_on": "", "claimed_by": "", "claimed_at": 0.0, }) .into(); let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp); // First apply: succeeds. assert_eq!(crdt.apply(op.clone()), OpState::Ok); assert_eq!(crdt.doc.items.view().len(), 1); // Second apply (self-loop): must be a no-op. assert_eq!(crdt.apply(op.clone()), OpState::AlreadySeen); // State must not have changed. assert_eq!(crdt.doc.items.view().len(), 1); // Stage update also deduplicated correctly. let stage_op = crdt.doc.items[0] .stage .set("2_current".to_string()) .sign(&kp); assert_eq!(crdt.apply(stage_op.clone()), OpState::Ok); assert_eq!( crdt.doc.items[0].stage.view(), JV::String("2_current".to_string()) ); assert_eq!(crdt.apply(stage_op), OpState::AlreadySeen); assert_eq!( crdt.doc.items[0].stage.view(), JV::String("2_current".to_string()), "stage must not change on duplicate apply" ); } // ── AC3 & AC7: Out-of-order causal queueing ─────────────────────────────── /// AC3/AC7: An op whose causal dependency has not yet arrived is held in the /// queue (returns MissingCausalDependencies). When the dependency arrives /// the queued op is released and applied automatically. #[test] fn out_of_order_causal_queueing_releases_on_dep_arrival() { use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, JsonValue as JV, OpState}; use bft_json_crdt::keypair::make_keypair; use bft_json_crdt::op::ROOT_ID; use serde_json::json; use crate::crdt_state::PipelineDoc; let kp = make_keypair(); let mut crdt = BaseCrdt::::new(&kp); let item: bft_json_crdt::json_crdt::JsonValue = json!({ "story_id": "507_causal_test", "stage": "1_backlog", "name": "Causal Test", "agent": "", "retry_count": 0.0, "blocked": false, "depends_on": "", "claimed_by": "", "claimed_at": 0.0, }) .into(); // op1 = insert item (no deps) let op1 = crdt.doc.items.insert(ROOT_ID, item).sign(&kp); // op2 = set stage, declared to depend on op1 // We must first apply op1 locally to generate op2 from the right state, // then we'll test op2-before-op1 on a fresh CRDT. crdt.apply(op1.clone()); let op2 = crdt.doc.items[0] .stage .set("2_current".to_string()) .sign_with_dependencies(&kp, vec![&op1]); // Create a fresh receiver CRDT. let mut receiver = BaseCrdt::::new(&kp); // Apply op2 first — dependency (op1) has not arrived yet. let r = receiver.apply(op2.clone()); assert_eq!( r, OpState::MissingCausalDependencies, "op2 must be queued when op1 has not arrived" ); // Queue length must reflect the held op. assert_eq!(receiver.causal_queue_len(), 1); // Item has NOT been inserted yet (op1 not applied). assert_eq!(receiver.doc.items.view().len(), 0); // Now deliver op1 — this should trigger op2 to be flushed automatically. let r = receiver.apply(op1.clone()); assert_eq!(r, OpState::Ok); // Both ops are now applied — item is present at stage 2_current. assert_eq!(receiver.doc.items.view().len(), 1); assert_eq!( receiver.doc.items[0].stage.view(), JV::String("2_current".to_string()), "op2 must have been applied automatically after op1 arrived" ); // Queue must be empty now. assert_eq!(receiver.causal_queue_len(), 0); } /// AC7: In-order apply works correctly (no causal queueing needed). #[test] fn in_order_apply_works_without_queueing() { use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, JsonValue as JV, OpState}; use bft_json_crdt::keypair::make_keypair; use bft_json_crdt::op::ROOT_ID; use serde_json::json; use crate::crdt_state::PipelineDoc; let kp = make_keypair(); let mut crdt_a = BaseCrdt::::new(&kp); let item: bft_json_crdt::json_crdt::JsonValue = json!({ "story_id": "507_inorder_test", "stage": "1_backlog", "name": "In-Order Test", "agent": "", "retry_count": 0.0, "blocked": false, "depends_on": "", "claimed_by": "", "claimed_at": 0.0, }) .into(); let op1 = crdt_a.doc.items.insert(ROOT_ID, item).sign(&kp); crdt_a.apply(op1.clone()); let op2 = crdt_a.doc.items[0] .stage .set("2_current".to_string()) .sign(&kp); crdt_a.apply(op2.clone()); let op3 = crdt_a.doc.items[0].stage.set("3_qa".to_string()).sign(&kp); crdt_a.apply(op3.clone()); // Receiver applies all ops in the correct order. let mut crdt_b = BaseCrdt::::new(&kp); assert_eq!(crdt_b.apply(op1), OpState::Ok); assert_eq!(crdt_b.apply(op2), OpState::Ok); assert_eq!(crdt_b.apply(op3), OpState::Ok); assert_eq!(crdt_b.causal_queue_len(), 0); assert_eq!( crdt_b.doc.items[0].stage.view(), JV::String("3_qa".to_string()) ); } // ── AC4: Queue overflow behaviour ───────────────────────────────────────── /// AC4: When the causal-order queue exceeds CAUSAL_QUEUE_MAX the oldest /// pending op is evicted (queue never grows beyond the cap). #[test] fn causal_queue_overflow_drops_oldest() { use bft_json_crdt::json_crdt::{BaseCrdt, CAUSAL_QUEUE_MAX, OpState}; use bft_json_crdt::keypair::make_keypair; use bft_json_crdt::op::ROOT_ID; use serde_json::json; use crate::crdt_state::PipelineDoc; let kp = make_keypair(); // Build one "phantom" op that we'll claim as a dependency but never deliver. // We do this by creating it on a separate CRDT and never applying it. let mut source = BaseCrdt::::new(&kp); let phantom_item: bft_json_crdt::json_crdt::JsonValue = json!({ "story_id": "507_phantom", "stage": "1_backlog", "name": "Phantom", "agent": "", "retry_count": 0.0, "blocked": false, "depends_on": "", "claimed_by": "", "claimed_at": 0.0, }) .into(); let phantom_op = source.doc.items.insert(ROOT_ID, phantom_item).sign(&kp); // Receiver never sees phantom_op, so any op declaring it as a dep will // sit in the causal queue forever (until evicted by overflow). let mut receiver = BaseCrdt::::new(&kp); source.apply(phantom_op.clone()); // Send CAUSAL_QUEUE_MAX + 5 stage-update ops all depending on phantom_op. // Each one will be queued because phantom_op is never delivered. let mut queued = 0usize; for i in 0..CAUSAL_QUEUE_MAX + 5 { let stage_name = format!("stage_{i}"); // Generate from source so seq numbers are valid. let op = source.doc.items[0] .stage .set(stage_name) .sign_with_dependencies(&kp, vec![&phantom_op]); source.apply(op.clone()); let r = receiver.apply(op); if r == OpState::MissingCausalDependencies { queued += 1; } } // We sent more than CAUSAL_QUEUE_MAX ops, but the queue must stay bounded. assert!( receiver.causal_queue_len() <= CAUSAL_QUEUE_MAX, "queue ({}) must not exceed CAUSAL_QUEUE_MAX ({CAUSAL_QUEUE_MAX})", receiver.causal_queue_len() ); assert!( queued > 0, "at least some ops must have been accepted into the queue" ); } // ── AC6: Convergence test ───────────────────────────────────────────────── /// AC6: Two CRDT instances generate interleaved ops on each side, simulate a /// network partition by withholding each other's ops, then exchange all /// buffered ops. Final state must be byte-identical on both nodes. #[test] fn convergence_after_partition_and_replay() { use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, JsonValue as JV, OpState}; use bft_json_crdt::keypair::make_keypair; use bft_json_crdt::op::ROOT_ID; use serde_json::json; use crate::crdt_state::PipelineDoc; let kp_a = make_keypair(); let kp_b = make_keypair(); let mut crdt_a = BaseCrdt::::new(&kp_a); let mut crdt_b = BaseCrdt::::new(&kp_b); // ── Phase 1: A generates ops while partitioned from B ────────────── let item_a: bft_json_crdt::json_crdt::JsonValue = json!({ "story_id": "507_convergence_a", "stage": "1_backlog", "name": "Story A", "agent": "", "retry_count": 0.0, "blocked": false, "depends_on": "", "claimed_by": "", "claimed_at": 0.0, }) .into(); let op_a1 = crdt_a.doc.items.insert(ROOT_ID, item_a).sign(&kp_a); crdt_a.apply(op_a1.clone()); let op_a2 = crdt_a.doc.items[0] .stage .set("2_current".to_string()) .sign(&kp_a); crdt_a.apply(op_a2.clone()); // ── Phase 2: B generates ops while partitioned from A ────────────── let item_b: bft_json_crdt::json_crdt::JsonValue = json!({ "story_id": "507_convergence_b", "stage": "1_backlog", "name": "Story B", "agent": "", "retry_count": 0.0, "blocked": false, "depends_on": "", "claimed_by": "", "claimed_at": 0.0, }) .into(); let op_b1 = crdt_b.doc.items.insert(ROOT_ID, item_b).sign(&kp_b); crdt_b.apply(op_b1.clone()); let op_b2 = crdt_b.doc.items[0] .stage .set("2_current".to_string()) .sign(&kp_b); crdt_b.apply(op_b2.clone()); // ── Phase 3: Reconnect — both sides replay all buffered ops ──────── // A sends its ops to B. let r = crdt_b.apply(op_a1.clone()); assert!(r == OpState::Ok || r == OpState::AlreadySeen); let r = crdt_b.apply(op_a2.clone()); assert!(r == OpState::Ok || r == OpState::AlreadySeen); // B sends its ops to A. let r = crdt_a.apply(op_b1.clone()); assert!(r == OpState::Ok || r == OpState::AlreadySeen); let r = crdt_a.apply(op_b2.clone()); assert!(r == OpState::Ok || r == OpState::AlreadySeen); // ── Phase 4: Assert convergence ──────────────────────────────────── // Both nodes must have both stories. assert_eq!( crdt_a.doc.items.view().len(), 2, "A must have 2 items after convergence" ); assert_eq!( crdt_b.doc.items.view().len(), 2, "B must have 2 items after convergence" ); // Serialise both CRDT views to JSON and assert byte-identical. let view_a = serde_json::to_string(&CrdtNode::view(&crdt_a.doc.items)).unwrap(); let view_b = serde_json::to_string(&CrdtNode::view(&crdt_b.doc.items)).unwrap(); assert_eq!( view_a, view_b, "CRDT states must be byte-identical after convergence" ); // Spot-check: both stories are at 2_current on both nodes. let stories_a: Vec = crdt_a .doc .items .view() .iter() .filter_map(|item| { if let JV::Object(m) = CrdtNode::view(item) { m.get("story_id").and_then(|s| { if let JV::String(s) = s { Some(s.clone()) } else { None } }) } else { None } }) .collect(); assert!( stories_a.contains(&"507_convergence_a".to_string()), "A must contain story_a" ); assert!( stories_a.contains(&"507_convergence_b".to_string()), "A must contain story_b" ); } // ── AC8: peer lifecycle tests ───────────────────────────────────────────── /// AC8: A peer that connects and then receives a subsequently-applied op /// gets that op encoded via the wire codec (binary frame). #[test] fn peer_receives_op_encoded_via_wire_codec() { use bft_json_crdt::json_crdt::BaseCrdt; use bft_json_crdt::keypair::make_keypair; use bft_json_crdt::op::ROOT_ID; use serde_json::json; use crate::crdt_state::PipelineDoc; use crate::crdt_wire; let kp = make_keypair(); let mut crdt = BaseCrdt::::new(&kp); let item: bft_json_crdt::json_crdt::JsonValue = json!({ "story_id": "506_story_lifecycle_test", "stage": "1_backlog", "name": "Lifecycle Test", "agent": "", "retry_count": 0.0, "blocked": false, "depends_on": "", "claimed_by": "", "claimed_at": 0.0, }) .into(); let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp); // Simulate what the broadcast handler does: encode via wire codec. let bytes = crdt_wire::encode(&op); // The bytes must be a versioned JSON envelope, not a SyncMessage wrapper. let text = std::str::from_utf8(&bytes).expect("wire output is valid UTF-8"); assert!( text.contains("\"v\":1"), "wire codec version tag must be present: {text}" ); assert!( !text.contains("\"type\":\"op\""), "must not be wrapped in SyncMessage: {text}" ); // The receiving peer can decode and apply the op. let decoded = crdt_wire::decode(&bytes).expect("decode must succeed"); assert_eq!(op, decoded); } /// AC8: Multiple connected peers all receive the same broadcast op. #[tokio::test] async fn multiple_peers_all_receive_broadcast_op() { use bft_json_crdt::json_crdt::BaseCrdt; use bft_json_crdt::keypair::make_keypair; use bft_json_crdt::op::ROOT_ID; use serde_json::json; use tokio::sync::broadcast; use crate::crdt_state::PipelineDoc; use crate::crdt_wire; // Create a broadcast channel (analogous to SYNC_TX). let (tx, _) = broadcast::channel::(16); let mut rx_peer1 = tx.subscribe(); let mut rx_peer2 = tx.subscribe(); let kp = make_keypair(); let mut crdt = BaseCrdt::::new(&kp); let item: bft_json_crdt::json_crdt::JsonValue = json!({ "story_id": "506_story_multi_peer_test", "stage": "1_backlog", "name": "Multi-Peer Test", "agent": "", "retry_count": 0.0, "blocked": false, "depends_on": "", "claimed_by": "", "claimed_at": 0.0, }) .into(); let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp); // Broadcast one op. tx.send(op.clone()).expect("send must succeed"); // Both peers receive the same op. let received1 = rx_peer1.recv().await.expect("peer 1 must receive"); let received2 = rx_peer2.recv().await.expect("peer 2 must receive"); assert_eq!(received1, op); assert_eq!(received2, op); // Both encode identically via wire codec. let bytes1 = crdt_wire::encode(&received1); let bytes2 = crdt_wire::encode(&received2); assert_eq!(bytes1, bytes2, "wire-encoded bytes must be identical"); } /// AC8: A peer disconnecting mid-broadcast does not panic. /// Simulated by dropping the receiver before the sender sends an op. #[test] fn disconnected_peer_does_not_panic() { use bft_json_crdt::json_crdt::BaseCrdt; use bft_json_crdt::keypair::make_keypair; use bft_json_crdt::op::ROOT_ID; use serde_json::json; use tokio::sync::broadcast; use crate::crdt_state::PipelineDoc; let (tx, rx) = broadcast::channel::(16); // Drop the receiver to simulate a peer that disconnected. drop(rx); let kp = make_keypair(); let mut crdt = BaseCrdt::::new(&kp); let item: bft_json_crdt::json_crdt::JsonValue = json!({ "story_id": "506_story_disconnect_test", "stage": "1_backlog", "name": "Disconnect Test", "agent": "", "retry_count": 0.0, "blocked": false, "depends_on": "", "claimed_by": "", "claimed_at": 0.0, }) .into(); let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp); // Sending to a channel with no receivers returns an error; must not panic. let _ = tx.send(op); } /// AC8: A lagged receiver gets a `Lagged` error (confirming the /// disconnect-on-overflow behaviour is reachable). #[tokio::test] async fn lagged_peer_gets_lagged_error() { use bft_json_crdt::json_crdt::BaseCrdt; use bft_json_crdt::keypair::make_keypair; use bft_json_crdt::op::ROOT_ID; use serde_json::json; use tokio::sync::broadcast; use crate::crdt_state::PipelineDoc; // Tiny capacity so we can trigger Lagged easily. let (tx, mut rx) = broadcast::channel::(2); let kp = make_keypair(); let mut crdt = BaseCrdt::::new(&kp); let item: bft_json_crdt::json_crdt::JsonValue = json!({ "story_id": "506_story_lag_test", "stage": "1_backlog", "name": "Lag Test", "agent": "", "retry_count": 0.0, "blocked": false, "depends_on": "", "claimed_by": "", "claimed_at": 0.0, }) .into(); let op1 = crdt.doc.items.insert(ROOT_ID, item).sign(&kp); crdt.apply(op1.clone()); // Overflow the tiny buffer by sending more ops than the capacity. let op2 = crdt.doc.items[0] .stage .set("2_current".to_string()) .sign(&kp); crdt.apply(op2.clone()); let op3 = crdt.doc.items[0].stage.set("3_qa".to_string()).sign(&kp); crdt.apply(op3.clone()); let op4 = crdt.doc.items[0].stage.set("4_merge".to_string()).sign(&kp); crdt.apply(op4.clone()); // Send more ops than the channel capacity without consuming. let _ = tx.send(op1); let _ = tx.send(op2); let _ = tx.send(op3); let _ = tx.send(op4); // The slow peer should now see a Lagged error on next recv. // Consume until we hit Lagged or run out. let mut got_lagged = false; for _ in 0..10 { match rx.recv().await { Err(broadcast::error::RecvError::Lagged(_)) => { got_lagged = true; break; } Ok(_) => continue, Err(broadcast::error::RecvError::Closed) => break, } } assert!( got_lagged, "slow peer must receive a Lagged error when channel overflows" ); } // ── AC5 (story 508): E2E convergence test via real WebSocket ───────────── /// AC5: Spin up two in-process WebSocket nodes. Node A serves a /// `/crdt-sync`-compatible endpoint; Node B connects as a rendezvous client. /// Node A sends a bulk state; Node B applies it. Assert both nodes see the /// same items within a bounded time window. #[tokio::test] async fn e2e_convergence_two_websocket_nodes() { use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, JsonValue as JV, OpState}; use bft_json_crdt::keypair::make_keypair; use bft_json_crdt::op::ROOT_ID; use futures::{SinkExt, StreamExt}; use serde_json::json; use std::sync::{Arc, Mutex}; use tokio::net::TcpListener; use tokio_tungstenite::tungstenite::Message as TMsg; use tokio_tungstenite::{accept_async, connect_async}; use crate::crdt_state::PipelineDoc; // ── Node A: build local state ────────────────────────────────────── let kp_a = make_keypair(); let mut crdt_a = BaseCrdt::::new(&kp_a); let item: bft_json_crdt::json_crdt::JsonValue = json!({ "story_id": "508_e2e_convergence", "stage": "2_current", "name": "E2E Convergence Test", "agent": "", "retry_count": 0.0, "blocked": false, "depends_on": "", "claimed_by": "", "claimed_at": 0.0, }) .into(); let op1 = crdt_a.doc.items.insert(ROOT_ID, item).sign(&kp_a); crdt_a.apply(op1.clone()); // Serialise A's full state as a bulk message. let op1_json = serde_json::to_string(&op1).unwrap(); let bulk_msg = SyncMessage::Bulk { ops: vec![op1_json], }; let bulk_wire = serde_json::to_string(&bulk_msg).unwrap(); // ── Start Node A's WebSocket server on a random port ─────────────── let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); let addr = listener.local_addr().unwrap(); let bulk_to_send = bulk_wire.clone(); let received_by_a: Arc>> = Arc::new(Mutex::new(vec![])); let received_by_a_clone = received_by_a.clone(); tokio::spawn(async move { let (tcp_stream, _) = listener.accept().await.unwrap(); let ws_stream = accept_async(tcp_stream).await.unwrap(); let (mut sink, mut stream) = ws_stream.split(); // Send bulk state to the connecting peer. sink.send(TMsg::Text(bulk_to_send.into())).await.unwrap(); // Also listen for ops sent by the peer. if let Some(Ok(TMsg::Text(txt))) = stream.next().await { received_by_a_clone.lock().unwrap().push(txt.to_string()); } }); // ── Node B: connect to Node A and exchange state ─────────────────── let kp_b = make_keypair(); let mut crdt_b = BaseCrdt::::new(&kp_b); let url = format!("ws://{addr}"); let (ws_b, _) = connect_async(&url).await.unwrap(); let (mut sink_b, mut stream_b) = ws_b.split(); // Node B receives bulk from A. if let Some(Ok(TMsg::Text(txt))) = stream_b.next().await { let msg: SyncMessage = serde_json::from_str(txt.as_str()).unwrap(); match msg { SyncMessage::Bulk { ops } => { for op_str in &ops { let signed: bft_json_crdt::json_crdt::SignedOp = serde_json::from_str(op_str).unwrap(); let r = crdt_b.apply(signed); assert!(r == OpState::Ok || r == OpState::AlreadySeen); } } _ => panic!("Expected Bulk from Node A"), } } // Node B also creates a new op and sends it to A. let item_b: bft_json_crdt::json_crdt::JsonValue = json!({ "story_id": "508_e2e_convergence_b", "stage": "1_backlog", "name": "E2E Convergence B", "agent": "", "retry_count": 0.0, "blocked": false, "depends_on": "", "claimed_by": "", "claimed_at": 0.0, }) .into(); let op_b1 = crdt_b.doc.items.insert(ROOT_ID, item_b).sign(&kp_b); crdt_b.apply(op_b1.clone()); let op_b1_json = serde_json::to_string(&op_b1).unwrap(); let msg_to_a = SyncMessage::Op { op: op_b1_json }; sink_b .send(TMsg::Text(serde_json::to_string(&msg_to_a).unwrap().into())) .await .unwrap(); // Wait a moment for Node A to process. tokio::time::sleep(std::time::Duration::from_millis(50)).await; // ── Assert convergence ───────────────────────────────────────────── // Node B received Node A's item. assert_eq!( crdt_b.doc.items.view().len(), 2, "Node B must see both items after sync" ); let has_a_item = crdt_b .doc .items .view() .iter() .any(|item| item.story_id.view() == JV::String("508_e2e_convergence".to_string())); assert!(has_a_item, "Node B must have Node A's item"); // Node A received Node B's op via the WebSocket. let a_received = received_by_a.lock().unwrap(); assert!( !a_received.is_empty(), "Node A must have received an op from Node B" ); } // ── AC6 (story 508): Partition healing E2E via real WebSocket ───────────── /// AC6: Two nodes exchange ops, the connection is dropped (partition), each /// side mutates independently, then they reconnect and the reconnecting /// client sends a fresh bulk state. Assert both converge to the same final /// state. #[tokio::test] async fn e2e_partition_healing_websocket() { use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, JsonValue as JV}; use bft_json_crdt::keypair::make_keypair; use bft_json_crdt::op::ROOT_ID; use futures::{SinkExt, StreamExt}; use serde_json::json; use tokio::net::TcpListener; use tokio_tungstenite::tungstenite::Message as TMsg; use tokio_tungstenite::{accept_async, connect_async}; use crate::crdt_state::PipelineDoc; // ── Phase 1: Both nodes start with op_a1 (before partition) ─────── let kp_a = make_keypair(); let kp_b = make_keypair(); let mut crdt_a = BaseCrdt::::new(&kp_a); let mut crdt_b = BaseCrdt::::new(&kp_b); let item_a: bft_json_crdt::json_crdt::JsonValue = json!({ "story_id": "508_heal_a", "stage": "1_backlog", "name": "Heal A", "agent": "", "retry_count": 0.0, "blocked": false, "depends_on": "", "claimed_by": "", "claimed_at": 0.0, }) .into(); let op_a1 = crdt_a.doc.items.insert(ROOT_ID, item_a).sign(&kp_a); crdt_a.apply(op_a1.clone()); // B also starts with op_a1 (shared state before partition). crdt_b.apply(op_a1.clone()); // ── Phase 2: Partition — each side mutates independently ────────── // A advances its story stage. let op_a2 = crdt_a.doc.items[0] .stage .set("2_current".to_string()) .sign(&kp_a); crdt_a.apply(op_a2.clone()); // B inserts a new story that A doesn't know about yet. let item_b: bft_json_crdt::json_crdt::JsonValue = json!({ "story_id": "508_heal_b", "stage": "1_backlog", "name": "Heal B", "agent": "", "retry_count": 0.0, "blocked": false, "depends_on": "", "claimed_by": "", "claimed_at": 0.0, }) .into(); let op_b1 = crdt_b.doc.items.insert(ROOT_ID, item_b).sign(&kp_b); crdt_b.apply(op_b1.clone()); // Collect B's full state as bulk (what it will send on reconnect). let b_ops: Vec = [&op_a1, &op_b1] .iter() .map(|op| serde_json::to_string(op).unwrap()) .collect(); let b_bulk_wire = serde_json::to_string(&SyncMessage::Bulk { ops: b_ops }).unwrap(); // Collect A's full state as bulk (what it will send on reconnect). let a_ops: Vec = [&op_a1, &op_a2] .iter() .map(|op| serde_json::to_string(op).unwrap()) .collect(); let a_bulk_wire = serde_json::to_string(&SyncMessage::Bulk { ops: a_ops }).unwrap(); // ── Phase 3: Reconnect — use a real WebSocket to exchange bulk ──── let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); let addr = listener.local_addr().unwrap(); let a_bulk_to_send = a_bulk_wire.clone(); let a_received_bulk: std::sync::Arc>> = std::sync::Arc::new(std::sync::Mutex::new(None)); let a_received_clone = a_received_bulk.clone(); tokio::spawn(async move { let (tcp, _) = listener.accept().await.unwrap(); let ws = accept_async(tcp).await.unwrap(); let (mut sink, mut stream) = ws.split(); // A sends its bulk state. sink.send(TMsg::Text(a_bulk_to_send.into())).await.unwrap(); // A receives B's bulk state. if let Some(Ok(TMsg::Text(txt))) = stream.next().await { *a_received_clone.lock().unwrap() = Some(txt.to_string()); } }); // B connects, exchanges bulk state. let (ws_b, _) = connect_async(format!("ws://{addr}")).await.unwrap(); let (mut sink_b, mut stream_b) = ws_b.split(); // B receives A's bulk and applies it. if let Some(Ok(TMsg::Text(txt))) = stream_b.next().await { let msg: SyncMessage = serde_json::from_str(txt.as_str()).unwrap(); if let SyncMessage::Bulk { ops } = msg { for op_str in &ops { let signed: bft_json_crdt::json_crdt::SignedOp = serde_json::from_str(op_str).unwrap(); let _ = crdt_b.apply(signed); } } } // B sends its bulk state to A. sink_b.send(TMsg::Text(b_bulk_wire.into())).await.unwrap(); tokio::time::sleep(std::time::Duration::from_millis(50)).await; // Apply A's received ops into crdt_a. if let Some(bulk_str) = a_received_bulk.lock().unwrap().take() { let msg: SyncMessage = serde_json::from_str(&bulk_str).unwrap(); if let SyncMessage::Bulk { ops } = msg { for op_str in &ops { let signed: bft_json_crdt::json_crdt::SignedOp = serde_json::from_str(op_str).unwrap(); let _ = crdt_a.apply(signed); } } } // ── Assert convergence ───────────────────────────────────────────── // Both nodes must have 2 items. assert_eq!( crdt_a.doc.items.view().len(), 2, "A must have 2 items after healing" ); assert_eq!( crdt_b.doc.items.view().len(), 2, "B must have 2 items after healing" ); // A must see B's story. let b_story_on_a = crdt_a .doc .items .view() .iter() .any(|item| item.story_id.view() == JV::String("508_heal_b".to_string())); assert!(b_story_on_a, "A must have B's story after healing"); // B must see A's stage advance. let a_story_on_b = crdt_b .doc .items .view() .iter() .any(|item| item.story_id.view() == JV::String("508_heal_a".to_string())); assert!(a_story_on_b, "B must have A's story after healing"); // CRDT views must be byte-identical (convergence). let view_a = serde_json::to_string(&CrdtNode::view(&crdt_a.doc.items)).unwrap(); let view_b = serde_json::to_string(&CrdtNode::view(&crdt_b.doc.items)).unwrap(); assert_eq!( view_a, view_b, "Both nodes must converge to identical state" ); } }