huskies: merge 794
This commit is contained in:
@@ -0,0 +1,127 @@
|
||||
//! [`BaseCrdt`] — the top-level causal-delivery wrapper around any [`CrdtNode`].
|
||||
|
||||
use std::collections::{HashMap, HashSet};
|
||||
|
||||
use fastcrypto::ed25519::Ed25519KeyPair;
|
||||
use fastcrypto::traits::KeyPair;
|
||||
|
||||
use crate::debug::DebugView;
|
||||
use crate::keypair::SignedDigest;
|
||||
|
||||
use super::{CrdtNode, OpState, SignedOp, CAUSAL_QUEUE_MAX};
|
||||
|
||||
/// The base struct for a JSON CRDT. Allows for declaring causal
|
||||
/// dependencies across fields. It only accepts messages of [`SignedOp`] for BFT.
|
||||
pub struct BaseCrdt<T: CrdtNode> {
|
||||
/// Public key of this CRDT
|
||||
pub id: crate::keypair::AuthorId,
|
||||
|
||||
/// Internal base CRDT
|
||||
pub doc: T,
|
||||
|
||||
/// In a real world scenario, this would be a proper hash graph that allows for
|
||||
/// efficient reconciliation of missing dependencies. We naively keep a hash set
|
||||
/// of messages we've seen (represented by their [`SignedDigest`]).
|
||||
received: HashSet<SignedDigest>,
|
||||
message_q: HashMap<SignedDigest, Vec<SignedOp>>,
|
||||
|
||||
/// Total count of ops currently held in [`message_q`] waiting for their causal
|
||||
/// dependencies to be delivered. Used to enforce [`CAUSAL_QUEUE_MAX`].
|
||||
queue_len: usize,
|
||||
}
|
||||
|
||||
impl<T: CrdtNode + DebugView> BaseCrdt<T> {
|
||||
/// Create a new BaseCRDT of the given type. Multiple BaseCRDTs
|
||||
/// can be created from a single keypair but you are responsible for
|
||||
/// routing messages to the right BaseCRDT. Usually you should just make a single
|
||||
/// struct that contains all the state you need.
|
||||
pub fn new(keypair: &Ed25519KeyPair) -> Self {
|
||||
let id = keypair.public().0.to_bytes();
|
||||
Self {
|
||||
id,
|
||||
doc: T::new(id, vec![]),
|
||||
received: HashSet::new(),
|
||||
message_q: HashMap::new(),
|
||||
queue_len: 0,
|
||||
}
|
||||
}
|
||||
|
||||
/// Apply a signed operation to this BaseCRDT, verifying integrity and routing to the right
|
||||
/// nested CRDT
|
||||
pub fn apply(&mut self, op: SignedOp) -> OpState {
|
||||
// self.log_try_apply(&op);
|
||||
|
||||
#[cfg(feature = "bft")]
|
||||
if !op.is_valid_digest() {
|
||||
self.debug_digest_failure(op);
|
||||
return OpState::ErrDigestMismatch;
|
||||
}
|
||||
|
||||
let op_id = op.signed_digest;
|
||||
|
||||
// Self-loop / dedup guard: if we have already processed this op (identified by
|
||||
// its signed_digest), return immediately without re-applying it. This prevents
|
||||
// echo loops where an op we broadcast to a peer comes back to us.
|
||||
if self.received.contains(&op_id) {
|
||||
return OpState::AlreadySeen;
|
||||
}
|
||||
|
||||
if !op.depends_on.is_empty() {
|
||||
for origin in &op.depends_on {
|
||||
if !self.received.contains(origin) {
|
||||
self.log_missing_causal_dep(origin);
|
||||
|
||||
// Bounded queue overflow: evict the oldest op from the largest
|
||||
// pending bucket before adding the new one. See CAUSAL_QUEUE_MAX.
|
||||
if self.queue_len >= CAUSAL_QUEUE_MAX {
|
||||
if let Some(bucket) = self.message_q.values_mut().max_by_key(|v| v.len()) {
|
||||
if !bucket.is_empty() {
|
||||
bucket.remove(0);
|
||||
self.queue_len = self.queue_len.saturating_sub(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.message_q.entry(*origin).or_default().push(op);
|
||||
self.queue_len += 1;
|
||||
return OpState::MissingCausalDependencies;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// apply
|
||||
// self.log_actually_apply(&op);
|
||||
let status = self.doc.apply(op.inner);
|
||||
// self.debug_view();
|
||||
|
||||
// Only record the op as seen when it applied successfully. If the op
|
||||
// was rejected (e.g. ErrHashMismatch from a tampered payload), we must
|
||||
// NOT add its signed_digest to `received`: a legitimate op that shares
|
||||
// the same signed_digest (e.g. the un-tampered original) would otherwise
|
||||
// be silently dropped as AlreadySeen.
|
||||
// Only mark as received and unblock dependents when the op was actually
|
||||
// applied. If we insert on error (e.g. ErrHashMismatch), a subsequent
|
||||
// apply of a *legitimate* op with the same signed_digest would be
|
||||
// silently dropped as AlreadySeen, preventing equivocation detection
|
||||
// from working correctly.
|
||||
if status == OpState::Ok {
|
||||
self.received.insert(op_id);
|
||||
|
||||
// apply all of its causal dependents if there are any
|
||||
let dependent_queue = self.message_q.remove(&op_id);
|
||||
if let Some(mut q) = dependent_queue {
|
||||
self.queue_len = self.queue_len.saturating_sub(q.len());
|
||||
for dependent in q.drain(..) {
|
||||
self.apply(dependent);
|
||||
}
|
||||
}
|
||||
}
|
||||
status
|
||||
}
|
||||
|
||||
/// Number of ops currently held in the causal-order queue waiting for their
|
||||
/// dependencies to be satisfied.
|
||||
pub fn causal_queue_len(&self) -> usize {
|
||||
self.queue_len
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user