huskies: merge 507_story_apply_inbound_signedops_with_causal_order_queue_for_partition_recovery

dave
2026-04-10 16:09:15 +00:00
parent 1ca9bc1bfd
commit b88857c2e4
3 changed files with 427 additions and 0 deletions
+52
@@ -66,8 +66,23 @@ pub enum OpState {
/// We have not received all of the causal dependencies of this operation. It has been queued
/// up and will be executed when its causal dependencies have been delivered.
MissingCausalDependencies,
/// This op has already been applied (identified by its `signed_digest`).
/// The CRDT state is unchanged — this is a no-op (idempotent self-loop guard).
AlreadySeen,
}
/// Maximum total number of ops that may sit in the causal-order hold queue at any
/// one time, summed across all pending dependency buckets.
///
/// **Overflow policy: drop oldest.**
/// When the limit is reached, the oldest pending op in the largest dependency bucket
/// is silently evicted before the new op is queued. Rationale: a misbehaving or
/// heavily-partitioned peer can send ops whose causal ancestors never arrive, causing
/// unbounded memory growth. Dropping the oldest entry preserves the most recent
/// information and caps memory use. The peer can reconnect and receive a fresh bulk
/// state dump to recover any dropped ops.
pub const CAUSAL_QUEUE_MAX: usize = 256;
/// The following types can be used as 'terminal' types in CRDTs.
pub trait MarkPrimitive: Into<JsonValue> + Default {}
impl MarkPrimitive for bool {}
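Taken together, the AlreadySeen variant and CAUSAL_QUEUE_MAX give apply() idempotent, bounded-memory semantics. A minimal sketch of what a caller can now rely on, using only items introduced in this diff (crdt is any BaseCrdt, op any signed op it has already applied once):

// Re-applying the same signed op is a guaranteed no-op.
let first = crdt.apply(op.clone());   // OpState::Ok
let second = crdt.apply(op);          // OpState::AlreadySeen, state unchanged

// However many ops arrive with unmet dependencies, the hold queue is
// capped: eviction keeps it at or below CAUSAL_QUEUE_MAX.
assert!(crdt.causal_queue_len() <= CAUSAL_QUEUE_MAX);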
@@ -112,6 +127,10 @@ pub struct BaseCrdt<T: CrdtNode> {
/// of messages we've seen (represented by their [`SignedDigest`]).
received: HashSet<SignedDigest>,
message_q: HashMap<SignedDigest, Vec<SignedOp>>,
/// Total count of ops currently held in [`message_q`] waiting for their causal
/// dependencies to be delivered. Used to enforce [`CAUSAL_QUEUE_MAX`].
queue_len: usize,
}
/// An [`Op<Value>`] with a few bits of extra metadata
@@ -213,6 +232,7 @@ impl<T: CrdtNode + DebugView> BaseCrdt<T> {
doc: T::new(id, vec![]),
received: HashSet::new(),
message_q: HashMap::new(),
queue_len: 0,
}
}
@@ -228,11 +248,36 @@ impl<T: CrdtNode + DebugView> BaseCrdt<T> {
}
let op_id = op.signed_digest;
// Self-loop / dedup guard: if we have already processed this op (identified by
// its signed_digest), return immediately without re-applying it. This prevents
// echo loops where an op we broadcast to a peer comes back to us.
if self.received.contains(&op_id) {
return OpState::AlreadySeen;
}
if !op.depends_on.is_empty() {
for origin in &op.depends_on {
if !self.received.contains(origin) {
self.log_missing_causal_dep(origin);
// Bounded queue overflow: evict the oldest op from the largest
// pending bucket before adding the new one. See CAUSAL_QUEUE_MAX.
if self.queue_len >= CAUSAL_QUEUE_MAX {
if let Some(bucket) = self
.message_q
.values_mut()
.max_by_key(|v| v.len())
{
if !bucket.is_empty() {
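// Note: Vec::remove(0) is O(bucket length), which is bounded by
// CAUSAL_QUEUE_MAX, so eviction stays cheap. If this leaves the
// bucket empty, its key lingers in message_q until that dependency
// finally arrives (or a later op queues under the same key).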
bucket.remove(0);
self.queue_len = self.queue_len.saturating_sub(1);
}
}
}
self.message_q.entry(*origin).or_default().push(op);
self.queue_len += 1;
return OpState::MissingCausalDependencies;
}
}
@@ -247,12 +292,19 @@ impl<T: CrdtNode + DebugView> BaseCrdt<T> {
// apply all of its causal dependents if there are any
let dependent_queue = self.message_q.remove(&op_id);
if let Some(mut q) = dependent_queue {
self.queue_len = self.queue_len.saturating_sub(q.len());
for dependent in q.drain(..) {
self.apply(dependent);
}
}
status
}
/// Number of ops currently held in the causal-order queue waiting for their
/// dependencies to be satisfied.
pub fn causal_queue_len(&self) -> usize {
self.queue_len
}
}
/// An enum representing a JSON value
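The causal_queue_len() accessor makes the hold queue observable from outside. One way a sync loop might use it, following the recovery rationale in the CAUSAL_QUEUE_MAX docs (a queue hovering near the cap means dependencies are not arriving and evictions are likely, so a bulk state dump is the cheaper path). This is a sketch; request_bulk_state is a hypothetical transport helper, not part of this diff:

// Hypothetical recovery policy; request_bulk_state is a stand-in name.
if crdt.causal_queue_len() >= CAUSAL_QUEUE_MAX / 2 {
    request_bulk_state(&peer); // ask the peer for a fresh full state dump
}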
+7
@@ -461,6 +461,13 @@ pub fn apply_remote_op(op: SignedOp) -> bool {
.collect();
let result = state.crdt.apply(op.clone());
// Self-loop guard: op was already applied (came back via echo from peer).
// Return false immediately — do not re-persist or re-add to ALL_OPS.
if result == bft_json_crdt::json_crdt::OpState::AlreadySeen {
return false;
}
if result != bft_json_crdt::json_crdt::OpState::Ok
&& result != bft_json_crdt::json_crdt::OpState::MissingCausalDependencies
{
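Callers that want to handle every outcome explicitly can replace the chained comparisons above with a match. A sketch, assuming OpState is imported and using only the variants visible in this diff plus a catch-all for the crate's error variants:

match state.crdt.apply(op.clone()) {
    // Echoed op: already applied, skip persistence entirely.
    OpState::AlreadySeen => return false,
    // Applied now, or parked until its dependencies arrive: persist.
    OpState::Ok | OpState::MissingCausalDependencies => {}
    // Any other result (validation/signature failure): reject.
    _ => return false,
}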
+368
@@ -559,6 +559,374 @@ name = "test"
assert!(config.rendezvous.is_none());
}
// ── AC5: Self-loop dedup ──────────────────────────────────────────────────
/// AC5: Applying the same SignedOp twice returns AlreadySeen on the second
/// call and leaves the CRDT state unchanged.
#[test]
fn self_loop_dedup_second_apply_is_noop() {
use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, JsonValue as JV, OpState};
use bft_json_crdt::keypair::make_keypair;
use bft_json_crdt::op::ROOT_ID;
use serde_json::json;
use crate::crdt_state::PipelineDoc;
let kp = make_keypair();
let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);
let item: bft_json_crdt::json_crdt::JsonValue = json!({
"story_id": "507_dedup_test",
"stage": "1_backlog",
"name": "Dedup Test",
"agent": "",
"retry_count": 0.0,
"blocked": false,
"depends_on": "",
})
.into();
let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp);
// First apply: succeeds.
assert_eq!(crdt.apply(op.clone()), OpState::Ok);
assert_eq!(crdt.doc.items.view().len(), 1);
// Second apply (self-loop): must be a no-op.
assert_eq!(crdt.apply(op.clone()), OpState::AlreadySeen);
// State must not have changed.
assert_eq!(crdt.doc.items.view().len(), 1);
// Stage update also deduplicated correctly.
let stage_op = crdt.doc.items[0].stage.set("2_current".to_string()).sign(&kp);
assert_eq!(crdt.apply(stage_op.clone()), OpState::Ok);
assert_eq!(
crdt.doc.items[0].stage.view(),
JV::String("2_current".to_string())
);
assert_eq!(crdt.apply(stage_op), OpState::AlreadySeen);
assert_eq!(
crdt.doc.items[0].stage.view(),
JV::String("2_current".to_string()),
"stage must not change on duplicate apply"
);
}
// ── AC3 & AC7: Out-of-order causal queueing ───────────────────────────────
/// AC3/AC7: An op whose causal dependency has not yet arrived is held in the
/// queue (returns MissingCausalDependencies). When the dependency arrives
/// the queued op is released and applied automatically.
#[test]
fn out_of_order_causal_queueing_releases_on_dep_arrival() {
use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, JsonValue as JV, OpState};
use bft_json_crdt::keypair::make_keypair;
use bft_json_crdt::op::ROOT_ID;
use serde_json::json;
use crate::crdt_state::PipelineDoc;
let kp = make_keypair();
let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);
let item: bft_json_crdt::json_crdt::JsonValue = json!({
"story_id": "507_causal_test",
"stage": "1_backlog",
"name": "Causal Test",
"agent": "",
"retry_count": 0.0,
"blocked": false,
"depends_on": "",
})
.into();
// op1 = insert item (no deps)
let op1 = crdt.doc.items.insert(ROOT_ID, item).sign(&kp);
// op2 = set stage, declared to depend on op1
// We must first apply op1 locally to generate op2 from the right state,
// then we'll test op2-before-op1 on a fresh CRDT.
crdt.apply(op1.clone());
let op2 = crdt.doc.items[0]
.stage
.set("2_current".to_string())
.sign_with_dependencies(&kp, vec![&op1]);
// Create a fresh receiver CRDT.
let mut receiver = BaseCrdt::<PipelineDoc>::new(&kp);
// Apply op2 first — dependency (op1) has not arrived yet.
let r = receiver.apply(op2.clone());
assert_eq!(
r,
OpState::MissingCausalDependencies,
"op2 must be queued when op1 has not arrived"
);
// Queue length must reflect the held op.
assert_eq!(receiver.causal_queue_len(), 1);
// Item has NOT been inserted yet (op1 not applied).
assert_eq!(receiver.doc.items.view().len(), 0);
// Now deliver op1 — this should trigger op2 to be flushed automatically.
let r = receiver.apply(op1.clone());
assert_eq!(r, OpState::Ok);
// Both ops are now applied — item is present at stage 2_current.
assert_eq!(receiver.doc.items.view().len(), 1);
assert_eq!(
receiver.doc.items[0].stage.view(),
JV::String("2_current".to_string()),
"op2 must have been applied automatically after op1 arrived"
);
// Queue must be empty now.
assert_eq!(receiver.causal_queue_len(), 0);
}
/// AC7: In-order apply works correctly (no causal queueing needed).
#[test]
fn in_order_apply_works_without_queueing() {
use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, JsonValue as JV, OpState};
use bft_json_crdt::keypair::make_keypair;
use bft_json_crdt::op::ROOT_ID;
use serde_json::json;
use crate::crdt_state::PipelineDoc;
let kp = make_keypair();
let mut crdt_a = BaseCrdt::<PipelineDoc>::new(&kp);
let item: bft_json_crdt::json_crdt::JsonValue = json!({
"story_id": "507_inorder_test",
"stage": "1_backlog",
"name": "In-Order Test",
"agent": "",
"retry_count": 0.0,
"blocked": false,
"depends_on": "",
})
.into();
let op1 = crdt_a.doc.items.insert(ROOT_ID, item).sign(&kp);
crdt_a.apply(op1.clone());
let op2 = crdt_a.doc.items[0]
.stage
.set("2_current".to_string())
.sign(&kp);
crdt_a.apply(op2.clone());
let op3 = crdt_a.doc.items[0]
.stage
.set("3_qa".to_string())
.sign(&kp);
crdt_a.apply(op3.clone());
// Receiver applies all ops in the correct order.
let mut crdt_b = BaseCrdt::<PipelineDoc>::new(&kp);
assert_eq!(crdt_b.apply(op1), OpState::Ok);
assert_eq!(crdt_b.apply(op2), OpState::Ok);
assert_eq!(crdt_b.apply(op3), OpState::Ok);
assert_eq!(crdt_b.causal_queue_len(), 0);
assert_eq!(
crdt_b.doc.items[0].stage.view(),
JV::String("3_qa".to_string())
);
}
// ── AC4: Queue overflow behaviour ─────────────────────────────────────────
/// AC4: When the causal-order queue exceeds CAUSAL_QUEUE_MAX the oldest
/// pending op is evicted (queue never grows beyond the cap).
#[test]
fn causal_queue_overflow_drops_oldest() {
use bft_json_crdt::json_crdt::{BaseCrdt, OpState, CAUSAL_QUEUE_MAX};
use bft_json_crdt::keypair::make_keypair;
use bft_json_crdt::op::ROOT_ID;
use serde_json::json;
use crate::crdt_state::PipelineDoc;
let kp = make_keypair();
// Build one "phantom" op that we'll claim as a dependency but never deliver
// to the receiver: we create and apply it only on a separate source CRDT.
let mut source = BaseCrdt::<PipelineDoc>::new(&kp);
let phantom_item: bft_json_crdt::json_crdt::JsonValue = json!({
"story_id": "507_phantom",
"stage": "1_backlog",
"name": "Phantom",
"agent": "",
"retry_count": 0.0,
"blocked": false,
"depends_on": "",
})
.into();
let phantom_op = source
.doc
.items
.insert(ROOT_ID, phantom_item)
.sign(&kp);
// Receiver never sees phantom_op, so any op declaring it as a dep will
// sit in the causal queue forever (until evicted by overflow).
let mut receiver = BaseCrdt::<PipelineDoc>::new(&kp);
source.apply(phantom_op.clone());
// Send CAUSAL_QUEUE_MAX + 5 stage-update ops all depending on phantom_op.
// Each one will be queued because phantom_op is never delivered.
let mut queued = 0usize;
for i in 0..CAUSAL_QUEUE_MAX + 5 {
let stage_name = format!("stage_{i}");
// Generate from source so seq numbers are valid.
let op = source
.doc
.items[0]
.stage
.set(stage_name)
.sign_with_dependencies(&kp, vec![&phantom_op]);
source.apply(op.clone());
let r = receiver.apply(op);
if r == OpState::MissingCausalDependencies {
queued += 1;
}
}
// We sent more than CAUSAL_QUEUE_MAX ops, but the queue must stay bounded.
assert!(
receiver.causal_queue_len() <= CAUSAL_QUEUE_MAX,
"queue ({}) must not exceed CAUSAL_QUEUE_MAX ({CAUSAL_QUEUE_MAX})",
receiver.causal_queue_len()
);
assert!(
queued > 0,
"at least some ops must have been accepted into the queue"
);
}
// ── AC6: Convergence test ─────────────────────────────────────────────────
/// AC6: Two CRDT instances generate interleaved ops on each side, simulate a
/// network partition by withholding each other's ops, then exchange all
/// buffered ops. Final state must be byte-identical on both nodes.
#[test]
fn convergence_after_partition_and_replay() {
use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, JsonValue as JV, OpState};
use bft_json_crdt::keypair::make_keypair;
use bft_json_crdt::op::ROOT_ID;
use serde_json::json;
use crate::crdt_state::PipelineDoc;
let kp_a = make_keypair();
let kp_b = make_keypair();
let mut crdt_a = BaseCrdt::<PipelineDoc>::new(&kp_a);
let mut crdt_b = BaseCrdt::<PipelineDoc>::new(&kp_b);
// ── Phase 1: A generates ops while partitioned from B ──────────────
let item_a: bft_json_crdt::json_crdt::JsonValue = json!({
"story_id": "507_convergence_a",
"stage": "1_backlog",
"name": "Story A",
"agent": "",
"retry_count": 0.0,
"blocked": false,
"depends_on": "",
})
.into();
let op_a1 = crdt_a.doc.items.insert(ROOT_ID, item_a).sign(&kp_a);
crdt_a.apply(op_a1.clone());
let op_a2 = crdt_a.doc.items[0]
.stage
.set("2_current".to_string())
.sign(&kp_a);
crdt_a.apply(op_a2.clone());
// ── Phase 2: B generates ops while partitioned from A ──────────────
let item_b: bft_json_crdt::json_crdt::JsonValue = json!({
"story_id": "507_convergence_b",
"stage": "1_backlog",
"name": "Story B",
"agent": "",
"retry_count": 0.0,
"blocked": false,
"depends_on": "",
})
.into();
let op_b1 = crdt_b.doc.items.insert(ROOT_ID, item_b).sign(&kp_b);
crdt_b.apply(op_b1.clone());
let op_b2 = crdt_b.doc.items[0]
.stage
.set("2_current".to_string())
.sign(&kp_b);
crdt_b.apply(op_b2.clone());
// ── Phase 3: Reconnect — both sides replay all buffered ops ────────
// A sends its ops to B.
let r = crdt_b.apply(op_a1.clone());
assert!(r == OpState::Ok || r == OpState::AlreadySeen);
let r = crdt_b.apply(op_a2.clone());
assert!(r == OpState::Ok || r == OpState::AlreadySeen);
// B sends its ops to A.
let r = crdt_a.apply(op_b1.clone());
assert!(r == OpState::Ok || r == OpState::AlreadySeen);
let r = crdt_a.apply(op_b2.clone());
assert!(r == OpState::Ok || r == OpState::AlreadySeen);
// ── Phase 4: Assert convergence ────────────────────────────────────
// Both nodes must have both stories.
assert_eq!(
crdt_a.doc.items.view().len(),
2,
"A must have 2 items after convergence"
);
assert_eq!(
crdt_b.doc.items.view().len(),
2,
"B must have 2 items after convergence"
);
// Serialise both CRDT views to JSON and assert byte-identical.
let view_a = serde_json::to_string(&CrdtNode::view(&crdt_a.doc.items)).unwrap();
let view_b = serde_json::to_string(&CrdtNode::view(&crdt_b.doc.items)).unwrap();
assert_eq!(
view_a, view_b,
"CRDT states must be byte-identical after convergence"
);
// Spot-check: both stories are at 2_current on both nodes.
let stories_a: Vec<String> = crdt_a
.doc
.items
.view()
.iter()
.filter_map(|item| {
if let JV::Object(m) = item {
m.get("story_id")
.and_then(|s| if let JV::String(s) = s { Some(s.clone()) } else { None })
} else {
None
}
})
.collect();
assert!(
stories_a.contains(&"507_convergence_a".to_string()),
"A must contain story_a"
);
assert!(
stories_a.contains(&"507_convergence_b".to_string()),
"A must contain story_b"
);
}
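A note on the byte-identical assertion above: it holds only if serialisation is deterministic. With serde_json's default features the underlying Map is BTreeMap-backed, so object keys always serialise in sorted order and the assertion is sound; if the preserve_order feature were ever enabled, a small canonicaliser would restore determinism. A sketch of such a helper (not part of this diff):

use serde_json::Value;
use std::collections::BTreeMap;

// Rebuild a Value with all object keys sorted, so structurally equal
// documents serialise to identical bytes regardless of insertion order.
fn canonicalise(v: &Value) -> Value {
    match v {
        Value::Object(m) => {
            let sorted: BTreeMap<String, Value> = m
                .iter()
                .map(|(k, val)| (k.clone(), canonicalise(val)))
                .collect();
            serde_json::to_value(sorted).unwrap()
        }
        Value::Array(a) => Value::Array(a.iter().map(canonicalise).collect()),
        other => other.clone(),
    }
}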
// ── AC8: peer lifecycle tests ─────────────────────────────────────────────
/// AC8: A peer that connects and then receives a subsequently-applied op