From b88857c2e4096078766faee4089067bbbc00adda Mon Sep 17 00:00:00 2001 From: dave Date: Fri, 10 Apr 2026 16:09:15 +0000 Subject: [PATCH] huskies: merge 507_story_apply_inbound_signedops_with_causal_order_queue_for_partition_recovery --- crates/bft-json-crdt/src/json_crdt.rs | 52 ++++ server/src/crdt_state.rs | 7 + server/src/crdt_sync.rs | 368 ++++++++++++++++++++++++++ 3 files changed, 427 insertions(+) diff --git a/crates/bft-json-crdt/src/json_crdt.rs b/crates/bft-json-crdt/src/json_crdt.rs index 638ba715..975a37ff 100644 --- a/crates/bft-json-crdt/src/json_crdt.rs +++ b/crates/bft-json-crdt/src/json_crdt.rs @@ -66,8 +66,23 @@ pub enum OpState { /// We have not received all of the causal dependencies of this operation. It has been queued /// up and will be executed when its causal dependencies have been delivered MissingCausalDependencies, + /// This op has already been applied (identified by its `signed_digest`). + /// The CRDT state is unchanged — this is a no-op (idempotent self-loop guard). + AlreadySeen, } +/// Maximum total number of ops that may sit in the causal-order hold queue at any +/// one time, summed across all pending dependency buckets. +/// +/// **Overflow policy: drop oldest.** +/// When the limit is reached, the oldest pending op in the largest dependency bucket +/// is silently evicted before the new op is queued. Rationale: a misbehaving or +/// heavily-partitioned peer can send ops whose causal ancestors never arrive, causing +/// unbounded memory growth. Dropping the oldest entry preserves the most recent +/// information and caps memory use. The peer can reconnect and receive a fresh bulk +/// state dump to recover any dropped ops. +pub const CAUSAL_QUEUE_MAX: usize = 256; + /// The following types can be used as a 'terminal' type in CRDTs pub trait MarkPrimitive: Into + Default {} impl MarkPrimitive for bool {} @@ -112,6 +127,10 @@ pub struct BaseCrdt { /// of messages we've seen (represented by their [`SignedDigest`]). 
received: HashSet, message_q: HashMap>, + + /// Total count of ops currently held in [`message_q`] waiting for their causal + /// dependencies to be delivered. Used to enforce [`CAUSAL_QUEUE_MAX`]. + queue_len: usize, } /// An [`Op`] with a few bits of extra metadata @@ -213,6 +232,7 @@ impl BaseCrdt { doc: T::new(id, vec![]), received: HashSet::new(), message_q: HashMap::new(), + queue_len: 0, } } @@ -228,11 +248,36 @@ impl BaseCrdt { } let op_id = op.signed_digest; + + // Self-loop / dedup guard: if we have already processed this op (identified by + // its signed_digest), return immediately without re-applying it. This prevents + // echo loops where an op we broadcast to a peer comes back to us. + if self.received.contains(&op_id) { + return OpState::AlreadySeen; + } + if !op.depends_on.is_empty() { for origin in &op.depends_on { if !self.received.contains(origin) { self.log_missing_causal_dep(origin); + + // Bounded queue overflow: evict the oldest op from the largest + // pending bucket before adding the new one. See CAUSAL_QUEUE_MAX. + if self.queue_len >= CAUSAL_QUEUE_MAX { + if let Some(bucket) = self + .message_q + .values_mut() + .max_by_key(|v| v.len()) + { + if !bucket.is_empty() { + bucket.remove(0); + self.queue_len = self.queue_len.saturating_sub(1); + } + } + } + self.message_q.entry(*origin).or_default().push(op); + self.queue_len += 1; return OpState::MissingCausalDependencies; } } @@ -247,12 +292,19 @@ impl BaseCrdt { // apply all of its causal dependents if there are any let dependent_queue = self.message_q.remove(&op_id); if let Some(mut q) = dependent_queue { + self.queue_len = self.queue_len.saturating_sub(q.len()); for dependent in q.drain(..) { self.apply(dependent); } } status } + + /// Number of ops currently held in the causal-order queue waiting for their + /// dependencies to be satisfied. 
+ pub fn causal_queue_len(&self) -> usize { + self.queue_len + } } /// An enum representing a JSON value diff --git a/server/src/crdt_state.rs b/server/src/crdt_state.rs index 5fce6393..317ab5eb 100644 --- a/server/src/crdt_state.rs +++ b/server/src/crdt_state.rs @@ -461,6 +461,13 @@ pub fn apply_remote_op(op: SignedOp) -> bool { .collect(); let result = state.crdt.apply(op.clone()); + + // Self-loop guard: op was already applied (came back via echo from peer). + // Return false immediately — do not re-persist or re-add to ALL_OPS. + if result == bft_json_crdt::json_crdt::OpState::AlreadySeen { + return false; + } + if result != bft_json_crdt::json_crdt::OpState::Ok && result != bft_json_crdt::json_crdt::OpState::MissingCausalDependencies { diff --git a/server/src/crdt_sync.rs b/server/src/crdt_sync.rs index 54df4bfa..c4cde08a 100644 --- a/server/src/crdt_sync.rs +++ b/server/src/crdt_sync.rs @@ -559,6 +559,374 @@ name = "test" assert!(config.rendezvous.is_none()); } + // ── AC5: Self-loop dedup ────────────────────────────────────────────────── + + /// AC5: Applying the same SignedOp twice returns AlreadySeen on the second + /// call and leaves the CRDT state unchanged. + #[test] + fn self_loop_dedup_second_apply_is_noop() { + use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, JsonValue as JV, OpState}; + use bft_json_crdt::keypair::make_keypair; + use bft_json_crdt::op::ROOT_ID; + use serde_json::json; + + use crate::crdt_state::PipelineDoc; + + let kp = make_keypair(); + let mut crdt = BaseCrdt::::new(&kp); + + let item: bft_json_crdt::json_crdt::JsonValue = json!({ + "story_id": "507_dedup_test", + "stage": "1_backlog", + "name": "Dedup Test", + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + }) + .into(); + + let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp); + + // First apply: succeeds. 
+ assert_eq!(crdt.apply(op.clone()), OpState::Ok); + assert_eq!(crdt.doc.items.view().len(), 1); + + // Second apply (self-loop): must be a no-op. + assert_eq!(crdt.apply(op.clone()), OpState::AlreadySeen); + + // State must not have changed. + assert_eq!(crdt.doc.items.view().len(), 1); + + // Stage update also deduplicated correctly. + let stage_op = crdt.doc.items[0].stage.set("2_current".to_string()).sign(&kp); + assert_eq!(crdt.apply(stage_op.clone()), OpState::Ok); + assert_eq!( + crdt.doc.items[0].stage.view(), + JV::String("2_current".to_string()) + ); + assert_eq!(crdt.apply(stage_op), OpState::AlreadySeen); + assert_eq!( + crdt.doc.items[0].stage.view(), + JV::String("2_current".to_string()), + "stage must not change on duplicate apply" + ); + } + + // ── AC3 & AC7: Out-of-order causal queueing ─────────────────────────────── + + /// AC3/AC7: An op whose causal dependency has not yet arrived is held in the + /// queue (returns MissingCausalDependencies). When the dependency arrives + /// the queued op is released and applied automatically. + #[test] + fn out_of_order_causal_queueing_releases_on_dep_arrival() { + use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, JsonValue as JV, OpState}; + use bft_json_crdt::keypair::make_keypair; + use bft_json_crdt::op::ROOT_ID; + use serde_json::json; + + use crate::crdt_state::PipelineDoc; + + let kp = make_keypair(); + let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp); + + let item: bft_json_crdt::json_crdt::JsonValue = json!({ + "story_id": "507_causal_test", + "stage": "1_backlog", + "name": "Causal Test", + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + }) + .into(); + + // op1 = insert item (no deps) + let op1 = crdt.doc.items.insert(ROOT_ID, item).sign(&kp); + + // op2 = set stage, declared to depend on op1 + // We must first apply op1 locally to generate op2 from the right state, + // then we'll test op2-before-op1 on a fresh CRDT. 
+ crdt.apply(op1.clone()); + let op2 = crdt.doc.items[0] + .stage + .set("2_current".to_string()) + .sign_with_dependencies(&kp, vec![&op1]); + + // Create a fresh receiver CRDT. + let mut receiver = BaseCrdt::<PipelineDoc>::new(&kp); + + // Apply op2 first — dependency (op1) has not arrived yet. + let r = receiver.apply(op2.clone()); + assert_eq!( + r, + OpState::MissingCausalDependencies, + "op2 must be queued when op1 has not arrived" + ); + // Queue length must reflect the held op. + assert_eq!(receiver.causal_queue_len(), 1); + + // Item has NOT been inserted yet (op1 not applied). + assert_eq!(receiver.doc.items.view().len(), 0); + + // Now deliver op1 — this should trigger op2 to be flushed automatically. + let r = receiver.apply(op1.clone()); + assert_eq!(r, OpState::Ok); + + // Both ops are now applied — item is present at stage 2_current. + assert_eq!(receiver.doc.items.view().len(), 1); + assert_eq!( + receiver.doc.items[0].stage.view(), + JV::String("2_current".to_string()), + "op2 must have been applied automatically after op1 arrived" + ); + + // Queue must be empty now. + assert_eq!(receiver.causal_queue_len(), 0); + } + + /// AC7: In-order apply works correctly (no causal queueing needed). 
+ #[test] + fn in_order_apply_works_without_queueing() { + use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, JsonValue as JV, OpState}; + use bft_json_crdt::keypair::make_keypair; + use bft_json_crdt::op::ROOT_ID; + use serde_json::json; + + use crate::crdt_state::PipelineDoc; + + let kp = make_keypair(); + let mut crdt_a = BaseCrdt::<PipelineDoc>::new(&kp); + + let item: bft_json_crdt::json_crdt::JsonValue = json!({ + "story_id": "507_inorder_test", + "stage": "1_backlog", + "name": "In-Order Test", + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + }) + .into(); + + let op1 = crdt_a.doc.items.insert(ROOT_ID, item).sign(&kp); + crdt_a.apply(op1.clone()); + let op2 = crdt_a.doc.items[0] + .stage + .set("2_current".to_string()) + .sign(&kp); + crdt_a.apply(op2.clone()); + let op3 = crdt_a.doc.items[0] + .stage + .set("3_qa".to_string()) + .sign(&kp); + crdt_a.apply(op3.clone()); + + // Receiver applies all ops in the correct order. + let mut crdt_b = BaseCrdt::<PipelineDoc>::new(&kp); + assert_eq!(crdt_b.apply(op1), OpState::Ok); + assert_eq!(crdt_b.apply(op2), OpState::Ok); + assert_eq!(crdt_b.apply(op3), OpState::Ok); + assert_eq!(crdt_b.causal_queue_len(), 0); + assert_eq!( + crdt_b.doc.items[0].stage.view(), + JV::String("3_qa".to_string()) + ); + } + + // ── AC4: Queue overflow behaviour ───────────────────────────────────────── + + /// AC4: When the causal-order queue exceeds CAUSAL_QUEUE_MAX the oldest + /// pending op is evicted (queue never grows beyond the cap). + #[test] + fn causal_queue_overflow_drops_oldest() { + use bft_json_crdt::json_crdt::{BaseCrdt, OpState, CAUSAL_QUEUE_MAX}; + use bft_json_crdt::keypair::make_keypair; + use bft_json_crdt::op::ROOT_ID; + use serde_json::json; + + use crate::crdt_state::PipelineDoc; + + let kp = make_keypair(); + + // Build one "phantom" op that we'll claim as a dependency but never deliver. + // We do this by creating it on a separate CRDT and never applying it. 
+ let mut source = BaseCrdt::<PipelineDoc>::new(&kp); + let phantom_item: bft_json_crdt::json_crdt::JsonValue = json!({ + "story_id": "507_phantom", + "stage": "1_backlog", + "name": "Phantom", + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + }) + .into(); + let phantom_op = source + .doc + .items + .insert(ROOT_ID, phantom_item) + .sign(&kp); + + // Receiver never sees phantom_op, so any op declaring it as a dep will + // sit in the causal queue forever (until evicted by overflow). + let mut receiver = BaseCrdt::<PipelineDoc>::new(&kp); + source.apply(phantom_op.clone()); + + // Send CAUSAL_QUEUE_MAX + 5 stage-update ops all depending on phantom_op. + // Each one will be queued because phantom_op is never delivered. + let mut queued = 0usize; + for i in 0..CAUSAL_QUEUE_MAX + 5 { + let stage_name = format!("stage_{i}"); + // Generate from source so seq numbers are valid. + let op = source + .doc + .items[0] + .stage + .set(stage_name) + .sign_with_dependencies(&kp, vec![&phantom_op]); + source.apply(op.clone()); + let r = receiver.apply(op); + if r == OpState::MissingCausalDependencies { + queued += 1; + } + } + + // We sent more than CAUSAL_QUEUE_MAX ops, but the queue must stay bounded. + assert!( + receiver.causal_queue_len() <= CAUSAL_QUEUE_MAX, + "queue ({}) must not exceed CAUSAL_QUEUE_MAX ({CAUSAL_QUEUE_MAX})", + receiver.causal_queue_len() + ); + assert!( + queued > 0, + "at least some ops must have been accepted into the queue" + ); + } + + // ── AC6: Convergence test ───────────────────────────────────────────────── + + /// AC6: Two CRDT instances generate interleaved ops on each side, simulate a + /// network partition by withholding each other's ops, then exchange all + /// buffered ops. Final state must be byte-identical on both nodes. 
+ #[test] + fn convergence_after_partition_and_replay() { + use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, JsonValue as JV, OpState}; + use bft_json_crdt::keypair::make_keypair; + use bft_json_crdt::op::ROOT_ID; + use serde_json::json; + + use crate::crdt_state::PipelineDoc; + + let kp_a = make_keypair(); + let kp_b = make_keypair(); + + let mut crdt_a = BaseCrdt::<PipelineDoc>::new(&kp_a); + let mut crdt_b = BaseCrdt::<PipelineDoc>::new(&kp_b); + + // ── Phase 1: A generates ops while partitioned from B ────────────── + + let item_a: bft_json_crdt::json_crdt::JsonValue = json!({ + "story_id": "507_convergence_a", + "stage": "1_backlog", + "name": "Story A", + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + }) + .into(); + let op_a1 = crdt_a.doc.items.insert(ROOT_ID, item_a).sign(&kp_a); + crdt_a.apply(op_a1.clone()); + + let op_a2 = crdt_a.doc.items[0] + .stage + .set("2_current".to_string()) + .sign(&kp_a); + crdt_a.apply(op_a2.clone()); + + // ── Phase 2: B generates ops while partitioned from A ────────────── + + let item_b: bft_json_crdt::json_crdt::JsonValue = json!({ + "story_id": "507_convergence_b", + "stage": "1_backlog", + "name": "Story B", + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + }) + .into(); + let op_b1 = crdt_b.doc.items.insert(ROOT_ID, item_b).sign(&kp_b); + crdt_b.apply(op_b1.clone()); + + let op_b2 = crdt_b.doc.items[0] + .stage + .set("2_current".to_string()) + .sign(&kp_b); + crdt_b.apply(op_b2.clone()); + + // ── Phase 3: Reconnect — both sides replay all buffered ops ──────── + + // A sends its ops to B. + let r = crdt_b.apply(op_a1.clone()); + assert!(r == OpState::Ok || r == OpState::AlreadySeen); + let r = crdt_b.apply(op_a2.clone()); + assert!(r == OpState::Ok || r == OpState::AlreadySeen); + + // B sends its ops to A. 
+ let r = crdt_a.apply(op_b1.clone()); + assert!(r == OpState::Ok || r == OpState::AlreadySeen); + let r = crdt_a.apply(op_b2.clone()); + assert!(r == OpState::Ok || r == OpState::AlreadySeen); + + // ── Phase 4: Assert convergence ──────────────────────────────────── + + // Both nodes must have both stories. + assert_eq!( + crdt_a.doc.items.view().len(), + 2, + "A must have 2 items after convergence" + ); + assert_eq!( + crdt_b.doc.items.view().len(), + 2, + "B must have 2 items after convergence" + ); + + // Serialise both CRDT views to JSON and assert byte-identical. + let view_a = serde_json::to_string(&CrdtNode::view(&crdt_a.doc.items)).unwrap(); + let view_b = serde_json::to_string(&CrdtNode::view(&crdt_b.doc.items)).unwrap(); + assert_eq!( + view_a, view_b, + "CRDT states must be byte-identical after convergence" + ); + + // Spot-check: both stories are at 2_current on both nodes. + let stories_a: Vec<String> = crdt_a + .doc + .items + .view() + .iter() + .filter_map(|item| { + if let JV::Object(m) = CrdtNode::view(item) { + m.get("story_id") + .and_then(|s| if let JV::String(s) = s { Some(s.clone()) } else { None }) + } else { + None + } + }) + .collect(); + assert!( + stories_a.contains(&"507_convergence_a".to_string()), + "A must contain story_a" + ); + assert!( + stories_a.contains(&"507_convergence_b".to_string()), + "A must contain story_b" + ); + } + + // ── AC8: peer lifecycle tests ───────────────────────────────────────────── + + /// AC8: A peer that connects and then receives a subsequently-applied op