diff --git a/crates/bft-json-crdt/src/json_crdt.rs b/crates/bft-json-crdt/src/json_crdt.rs deleted file mode 100644 index e51f6166..00000000 --- a/crates/bft-json-crdt/src/json_crdt.rs +++ /dev/null @@ -1,947 +0,0 @@ -use std::{ - collections::{HashMap, HashSet}, - fmt::Display, -}; - -use crate::{ - debug::{debug_op_on_primitive, DebugView}, - keypair::{sha256, sign, AuthorId, SignedDigest}, - list_crdt::ListCrdt, - lww_crdt::LwwRegisterCrdt, - op::{print_hex, print_path, Hashable, Op, OpId, PathSegment}, -}; -pub use bft_crdt_derive::*; -use fastcrypto::traits::VerifyingKey; -use fastcrypto::{ - ed25519::{Ed25519KeyPair, Ed25519PublicKey, Ed25519Signature}, - traits::{KeyPair, ToFromBytes}, - // Verifier, -}; -// TODO: serde's json object serialization and deserialization (correctly) do not define anything -// object field order in JSON objects. However, the hash check impl in bft-json-bft-crdt does take order -// into account. This is going to cause problems later for non-Rust implementations, BFT hash checking -// currently depends on JSON serialization/deserialization object order. This shouldn't be the case -// but I've hacked in an IndexMap for the moment to get the PoC working. To see the problem, replace this with -// a std HashMap, everything will screw up (annoyingly, only *most* of the time). -use indexmap::IndexMap; -use serde::{Deserialize, Serialize}; -use serde_with::{serde_as, Bytes}; - -/// Anything that can be nested in a JSON CRDT -pub trait CrdtNode: CrdtNodeFromValue + Hashable + Clone { - /// Create a new CRDT of this type - fn new(id: AuthorId, path: Vec) -> Self; - /// Apply an operation to this CRDT, forwarding if necessary - fn apply(&mut self, op: Op) -> OpState; - /// Get a JSON representation of the value in this node - fn view(&self) -> JsonValue; -} - -/// Enum representing possible outcomes of applying an operation to a CRDT -#[derive(Debug, PartialEq)] -pub enum OpState { - /// Operation applied successfully - Ok, - /// Tried to apply an operation to a non-CRDT primitive (i.e. f64, bool, etc.) - /// If you would like a mutable primitive, wrap it in a [`LWWRegisterCRDT`] - ErrApplyOnPrimitive, - /// Tried to apply an operation to a static struct CRDT - /// If you would like a mutable object, use a [`Value`] - ErrApplyOnStruct, - /// Tried to apply an operation that contains content of the wrong type. - /// In other words, the content cannot be coerced to the CRDT at the path specified. - ErrMismatchedType, - /// The signed digest of the message did not match the claimed author of the message. - /// This can happen if the message was tampered with during delivery - ErrDigestMismatch, - /// The hash of the message did not match the contents of the message. - /// This can happen if the author tried to perform an equivocation attack by creating an - /// operation and modifying it has already been created - ErrHashMismatch, - /// Tried to apply an operation to a non-existent path. The author may have forgotten to attach - /// a causal dependency - ErrPathMismatch, - /// Trying to modify/delete the sentinel (zero-th) node element that is used for book-keeping - ErrListApplyToEmpty, - /// We have not received all of the causal dependencies of this operation. It has been queued - /// up and will be executed when its causal dependencies have been delivered - MissingCausalDependencies, - /// This op has already been applied (identified by its `signed_digest`). - /// The CRDT state is unchanged — this is a no-op (idempotent self-loop guard). - AlreadySeen, -} - -/// Maximum total number of ops that may sit in the causal-order hold queue at any -/// one time, summed across all pending dependency buckets. -/// -/// **Overflow policy: drop oldest.** -/// When the limit is reached, the oldest pending op in the largest dependency bucket -/// is silently evicted before the new op is queued. Rationale: a misbehaving or -/// heavily-partitioned peer can send ops whose causal ancestors never arrive, causing -/// unbounded memory growth. Dropping the oldest entry preserves the most recent -/// information and caps memory use. The peer can reconnect and receive a fresh bulk -/// state dump to recover any dropped ops. -pub const CAUSAL_QUEUE_MAX: usize = 256; - -/// The following types can be used as a 'terminal' type in CRDTs -pub trait MarkPrimitive: Into + Default {} -impl MarkPrimitive for bool {} -impl MarkPrimitive for i32 {} -impl MarkPrimitive for i64 {} -impl MarkPrimitive for f64 {} -impl MarkPrimitive for char {} -impl MarkPrimitive for String {} -impl MarkPrimitive for JsonValue {} - -/// Implement CrdtNode for non-CRDTs -/// This is a stub implementation so most functions don't do anything/log an error -impl CrdtNode for T -where - T: CrdtNodeFromValue + MarkPrimitive + Hashable + Clone, -{ - fn apply(&mut self, _op: Op) -> OpState { - OpState::ErrApplyOnPrimitive - } - - fn view(&self) -> JsonValue { - self.to_owned().into() - } - - fn new(_id: AuthorId, _path: Vec) -> Self { - debug_op_on_primitive(_path); - Default::default() - } -} - -/// The base struct for a JSON CRDT. Allows for declaring causal -/// dependencies across fields. It only accepts messages of [`SignedOp`] for BFT. -pub struct BaseCrdt { - /// Public key of this CRDT - pub id: AuthorId, - - /// Internal base CRDT - pub doc: T, - - /// In a real world scenario, this would be a proper hash graph that allows for - /// efficient reconciliation of missing dependencies. We naively keep a hash set - /// of messages we've seen (represented by their [`SignedDigest`]). - received: HashSet, - message_q: HashMap>, - - /// Total count of ops currently held in [`message_q`] waiting for their causal - /// dependencies to be delivered. Used to enforce [`CAUSAL_QUEUE_MAX`]. - queue_len: usize, -} - -/// An [`Op`] with a few bits of extra metadata -#[serde_as] -#[derive(Clone, Serialize, Deserialize, Debug, PartialEq)] -pub struct SignedOp { - // Note that this can be different from the author of the inner op as the inner op could have been created - // by a different person - author: AuthorId, - /// Signed hash using priv key of author. Effectively [`OpID`] Use this as the ID to figure out what has been delivered already - #[serde_as(as = "Bytes")] - pub signed_digest: SignedDigest, - pub inner: Op, - /// List of causal dependencies - #[serde_as(as = "Vec")] - pub depends_on: Vec, -} - -impl SignedOp { - pub fn id(&self) -> OpId { - self.inner.id - } - - pub fn author(&self) -> AuthorId { - self.author - } - - /// Creates a digest of the following fields. Any changes in the fields will change the signed digest - /// - id (hash of the following) - /// - origin - /// - author - /// - seq - /// - is_deleted - /// - path - /// - dependencies - fn digest(&self) -> [u8; 32] { - let path_string = print_path(self.inner.path.clone()); - let dependency_string = self - .depends_on - .iter() - .map(print_hex) - .collect::>() - .join(""); - let fmt_str = format!("{:?},{path_string},{dependency_string}", self.id()); - sha256(fmt_str) - } - - /// Sign this digest with the given keypair. Shouldn't need to be called manually, - /// just use [`SignedOp::from_op`] instead - fn sign_digest(&mut self, keypair: &Ed25519KeyPair) { - self.signed_digest = sign(keypair, &self.digest()).sig.to_bytes() - } - - /// Ensure digest was actually signed by the author it claims to be signed by - pub fn is_valid_digest(&self) -> bool { - let digest = Ed25519Signature::from_bytes(&self.signed_digest); - let pubkey = Ed25519PublicKey::from_bytes(&self.author()); - match (digest, pubkey) { - (Ok(digest), Ok(pubkey)) => pubkey.verify(&self.digest(), &digest).is_ok(), - (_, _) => false, - } - } - - /// Sign a normal op and add all the needed metadata - pub fn from_op( - value: Op, - keypair: &Ed25519KeyPair, - depends_on: Vec, - ) -> Self { - let author = keypair.public().0.to_bytes(); - let mut new = Self { - inner: Op { - content: value.content.map(|c| c.view()), - origin: value.origin, - author: value.author, - seq: value.seq, - path: value.path, - is_deleted: value.is_deleted, - id: value.id, - }, - author, - signed_digest: [0u8; 64], - depends_on, - }; - new.sign_digest(keypair); - new - } -} - -impl BaseCrdt { - /// Create a new BaseCRDT of the given type. Multiple BaseCRDTs - /// can be created from a single keypair but you are responsible for - /// routing messages to the right BaseCRDT. Usually you should just make a single - /// struct that contains all the state you need. - pub fn new(keypair: &Ed25519KeyPair) -> Self { - let id = keypair.public().0.to_bytes(); - Self { - id, - doc: T::new(id, vec![]), - received: HashSet::new(), - message_q: HashMap::new(), - queue_len: 0, - } - } - - /// Apply a signed operation to this BaseCRDT, verifying integrity and routing to the right - /// nested CRDT - pub fn apply(&mut self, op: SignedOp) -> OpState { - // self.log_try_apply(&op); - - #[cfg(feature = "bft")] - if !op.is_valid_digest() { - self.debug_digest_failure(op); - return OpState::ErrDigestMismatch; - } - - let op_id = op.signed_digest; - - // Self-loop / dedup guard: if we have already processed this op (identified by - // its signed_digest), return immediately without re-applying it. This prevents - // echo loops where an op we broadcast to a peer comes back to us. - if self.received.contains(&op_id) { - return OpState::AlreadySeen; - } - - if !op.depends_on.is_empty() { - for origin in &op.depends_on { - if !self.received.contains(origin) { - self.log_missing_causal_dep(origin); - - // Bounded queue overflow: evict the oldest op from the largest - // pending bucket before adding the new one. See CAUSAL_QUEUE_MAX. - if self.queue_len >= CAUSAL_QUEUE_MAX { - if let Some(bucket) = self.message_q.values_mut().max_by_key(|v| v.len()) { - if !bucket.is_empty() { - bucket.remove(0); - self.queue_len = self.queue_len.saturating_sub(1); - } - } - } - - self.message_q.entry(*origin).or_default().push(op); - self.queue_len += 1; - return OpState::MissingCausalDependencies; - } - } - } - - // apply - // self.log_actually_apply(&op); - let status = self.doc.apply(op.inner); - // self.debug_view(); - - // Only record the op as seen when it applied successfully. If the op - // was rejected (e.g. ErrHashMismatch from a tampered payload), we must - // NOT add its signed_digest to `received`: a legitimate op that shares - // the same signed_digest (e.g. the un-tampered original) would otherwise - // be silently dropped as AlreadySeen. - // Only mark as received and unblock dependents when the op was actually - // applied. If we insert on error (e.g. ErrHashMismatch), a subsequent - // apply of a *legitimate* op with the same signed_digest would be - // silently dropped as AlreadySeen, preventing equivocation detection - // from working correctly. - if status == OpState::Ok { - self.received.insert(op_id); - - // apply all of its causal dependents if there are any - let dependent_queue = self.message_q.remove(&op_id); - if let Some(mut q) = dependent_queue { - self.queue_len = self.queue_len.saturating_sub(q.len()); - for dependent in q.drain(..) { - self.apply(dependent); - } - } - } - status - } - - /// Number of ops currently held in the causal-order queue waiting for their - /// dependencies to be satisfied. - pub fn causal_queue_len(&self) -> usize { - self.queue_len - } -} - -/// An enum representing a JSON value -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] -pub enum JsonValue { - Null, - Bool(bool), - Number(f64), - String(String), - Array(Vec), - Object(IndexMap), -} - -impl Display for JsonValue { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!( - f, - "{}", - match self { - JsonValue::Null => "null".to_string(), - JsonValue::Bool(b) => b.to_string(), - JsonValue::Number(n) => n.to_string(), - JsonValue::String(s) => format!("\"{s}\""), - JsonValue::Array(arr) => { - if arr.len() > 1 { - format!( - "[\n{}\n]", - arr.iter() - .map(|x| format!(" {x}")) - .collect::>() - .join(",\n") - ) - } else { - format!( - "[ {} ]", - arr.iter() - .map(|x| x.to_string()) - .collect::>() - .join(", ") - ) - } - } - JsonValue::Object(obj) => format!( - "{{ {} }}", - obj.iter() - .map(|(k, v)| format!(" \"{k}\": {v}")) - .collect::>() - .join(",\n") - ), - } - ) - } -} - -impl Default for JsonValue { - fn default() -> Self { - Self::Null - } -} - -/// Allow easy conversion to and from serde's JSON format. This allows us to use the [`json!`] -/// macro -impl From for serde_json::Value { - fn from(value: JsonValue) -> Self { - match value { - JsonValue::Null => serde_json::Value::Null, - JsonValue::Bool(x) => serde_json::Value::Bool(x), - JsonValue::Number(x) => { - serde_json::Value::Number(serde_json::Number::from_f64(x).unwrap()) - } - JsonValue::String(x) => serde_json::Value::String(x), - JsonValue::Array(x) => { - serde_json::Value::Array(x.iter().map(|a| a.clone().into()).collect()) - } - JsonValue::Object(x) => serde_json::Value::Object( - x.iter() - .map(|(k, v)| (k.clone(), v.clone().into())) - .collect(), - ), - } - } -} - -impl From for JsonValue { - fn from(value: serde_json::Value) -> Self { - match value { - serde_json::Value::Null => JsonValue::Null, - serde_json::Value::Bool(x) => JsonValue::Bool(x), - serde_json::Value::Number(x) => JsonValue::Number(x.as_f64().unwrap()), - serde_json::Value::String(x) => JsonValue::String(x), - serde_json::Value::Array(x) => { - JsonValue::Array(x.iter().map(|a| a.clone().into()).collect()) - } - serde_json::Value::Object(x) => JsonValue::Object( - x.iter() - .map(|(k, v)| (k.clone(), v.clone().into())) - .collect(), - ), - } - } -} - -impl JsonValue { - pub fn into_json(self) -> serde_json::Value { - self.into() - } -} - -/// Conversions from primitive types to [`JsonValue`] -impl From for JsonValue { - fn from(val: bool) -> Self { - JsonValue::Bool(val) - } -} - -impl From for JsonValue { - fn from(val: i64) -> Self { - JsonValue::Number(val as f64) - } -} - -impl From for JsonValue { - fn from(val: i32) -> Self { - JsonValue::Number(val as f64) - } -} - -impl From for JsonValue { - fn from(val: f64) -> Self { - JsonValue::Number(val) - } -} - -impl From for JsonValue { - fn from(val: String) -> Self { - JsonValue::String(val) - } -} - -impl From for JsonValue { - fn from(val: char) -> Self { - JsonValue::String(val.into()) - } -} - -impl From> for JsonValue -where - T: CrdtNode, -{ - fn from(val: Option) -> Self { - match val { - Some(x) => x.view(), - None => JsonValue::Null, - } - } -} - -impl From> for JsonValue -where - T: CrdtNode, -{ - fn from(value: Vec) -> Self { - JsonValue::Array(value.iter().map(|x| x.view()).collect()) - } -} - -/// Fallibly create a CRDT Node from a JSON Value -pub trait CrdtNodeFromValue: Sized { - fn node_from(value: JsonValue, id: AuthorId, path: Vec) -> Result; -} - -/// Fallibly cast a JSON Value into a CRDT Node -pub trait IntoCrdtNode: Sized { - fn into_node(self, id: AuthorId, path: Vec) -> Result; -} - -/// [`CrdtNodeFromValue`] implies [`IntoCrdtNode`] -impl IntoCrdtNode for JsonValue -where - T: CrdtNodeFromValue, -{ - fn into_node(self, id: AuthorId, path: Vec) -> Result { - T::node_from(self, id, path) - } -} - -/// Trivial conversion from [`JsonValue`] to [`JsonValue`] as [`CrdtNodeFromValue`] -impl CrdtNodeFromValue for JsonValue { - fn node_from(value: JsonValue, _id: AuthorId, _path: Vec) -> Result { - Ok(value) - } -} - -/// Conversions from bool to CRDT -impl CrdtNodeFromValue for bool { - fn node_from(value: JsonValue, _id: AuthorId, _path: Vec) -> Result { - if let JsonValue::Bool(x) = value { - Ok(x) - } else { - Err(format!("failed to convert {value:?} -> bool")) - } - } -} - -/// Conversions from f64 to CRDT -impl CrdtNodeFromValue for f64 { - fn node_from(value: JsonValue, _id: AuthorId, _path: Vec) -> Result { - if let JsonValue::Number(x) = value { - Ok(x) - } else { - Err(format!("failed to convert {value:?} -> f64")) - } - } -} - -/// Conversions from i64 to CRDT -impl CrdtNodeFromValue for i64 { - fn node_from(value: JsonValue, _id: AuthorId, _path: Vec) -> Result { - if let JsonValue::Number(x) = value { - Ok(x as i64) - } else { - Err(format!("failed to convert {value:?} -> f64")) - } - } -} - -/// Conversions from String to CRDT -impl CrdtNodeFromValue for String { - fn node_from(value: JsonValue, _id: AuthorId, _path: Vec) -> Result { - if let JsonValue::String(x) = value { - Ok(x) - } else { - Err(format!("failed to convert {value:?} -> String")) - } - } -} - -/// Conversions from char to CRDT -impl CrdtNodeFromValue for char { - fn node_from(value: JsonValue, _id: AuthorId, _path: Vec) -> Result { - if let JsonValue::String(x) = value.clone() { - x.chars().next().ok_or(format!( - "failed to convert {value:?} -> char: found a zero-length string" - )) - } else { - Err(format!("failed to convert {value:?} -> char")) - } - } -} - -impl CrdtNodeFromValue for LwwRegisterCrdt -where - T: CrdtNode, -{ - fn node_from(value: JsonValue, id: AuthorId, path: Vec) -> Result { - let mut crdt = LwwRegisterCrdt::new(id, path); - crdt.set(value); - Ok(crdt) - } -} - -impl CrdtNodeFromValue for ListCrdt -where - T: CrdtNode, -{ - fn node_from(value: JsonValue, id: AuthorId, path: Vec) -> Result { - if let JsonValue::Array(arr) = value { - let mut crdt = ListCrdt::new(id, path); - let result: Result<(), String> = - arr.into_iter().enumerate().try_for_each(|(i, val)| { - crdt.insert_idx(i, val); - Ok(()) - }); - result?; - Ok(crdt) - } else { - Err(format!("failed to convert {value:?} -> ListCRDT")) - } - } -} - -#[cfg(test)] -mod test { - use serde_json::json; - - use crate::{ - json_crdt::{add_crdt_fields, BaseCrdt, CrdtNode, IntoCrdtNode, JsonValue, OpState}, - keypair::make_keypair, - list_crdt::ListCrdt, - lww_crdt::LwwRegisterCrdt, - op::{print_path, ROOT_ID}, - }; - - #[test] - fn test_derive_basic() { - #[add_crdt_fields] - #[derive(Clone, CrdtNode, Debug)] - struct Player { - x: LwwRegisterCrdt, - y: LwwRegisterCrdt, - } - - let keypair = make_keypair(); - let crdt = BaseCrdt::::new(&keypair); - assert_eq!(print_path(crdt.doc.x.path), "x"); - assert_eq!(print_path(crdt.doc.y.path), "y"); - } - - #[test] - fn test_derive_nested() { - #[add_crdt_fields] - #[derive(Clone, CrdtNode, Debug)] - struct Position { - x: LwwRegisterCrdt, - y: LwwRegisterCrdt, - } - - #[add_crdt_fields] - #[derive(Clone, CrdtNode, Debug)] - struct Player { - pos: Position, - balance: LwwRegisterCrdt, - messages: ListCrdt, - } - - let keypair = make_keypair(); - let crdt = BaseCrdt::::new(&keypair); - assert_eq!(print_path(crdt.doc.pos.x.path), "pos.x"); - assert_eq!(print_path(crdt.doc.pos.y.path), "pos.y"); - assert_eq!(print_path(crdt.doc.balance.path), "balance"); - assert_eq!(print_path(crdt.doc.messages.path), "messages"); - } - - #[test] - fn test_lww_ops() { - #[add_crdt_fields] - #[derive(Clone, CrdtNode, Debug)] - struct Test { - a: LwwRegisterCrdt, - b: LwwRegisterCrdt, - c: LwwRegisterCrdt, - } - - let kp1 = make_keypair(); - let kp2 = make_keypair(); - let mut base1 = BaseCrdt::::new(&kp1); - let mut base2 = BaseCrdt::::new(&kp2); - - let _1_a_1 = base1.doc.a.set(3.0).sign(&kp1); - let _1_b_1 = base1.doc.b.set(true).sign(&kp1); - let _2_a_1 = base2.doc.a.set(1.5).sign(&kp2); - let _2_a_2 = base2.doc.a.set(2.13).sign(&kp2); - let _2_c_1 = base2.doc.c.set("abc".to_string()).sign(&kp2); - - assert_eq!(base1.doc.a.view(), json!(3.0).into()); - assert_eq!(base2.doc.a.view(), json!(2.13).into()); - assert_eq!(base1.doc.b.view(), json!(true).into()); - assert_eq!(base2.doc.c.view(), json!("abc").into()); - - assert_eq!( - base1.doc.view().into_json(), - json!({ - "a": 3.0, - "b": true, - "c": null, - }) - ); - assert_eq!( - base2.doc.view().into_json(), - json!({ - "a": 2.13, - "b": null, - "c": "abc", - }) - ); - - assert_eq!(base2.apply(_1_a_1), OpState::Ok); - assert_eq!(base2.apply(_1_b_1), OpState::Ok); - assert_eq!(base1.apply(_2_a_1), OpState::Ok); - assert_eq!(base1.apply(_2_a_2), OpState::Ok); - assert_eq!(base1.apply(_2_c_1), OpState::Ok); - - assert_eq!(base1.doc.view().into_json(), base2.doc.view().into_json()); - assert_eq!( - base1.doc.view().into_json(), - json!({ - "a": 2.13, - "b": true, - "c": "abc" - }) - ) - } - - #[test] - fn test_vec_and_map_ops() { - #[add_crdt_fields] - #[derive(Clone, CrdtNode, Debug)] - struct Test { - a: ListCrdt, - } - - let kp1 = make_keypair(); - let kp2 = make_keypair(); - let mut base1 = BaseCrdt::::new(&kp1); - let mut base2 = BaseCrdt::::new(&kp2); - - let _1a = base1.doc.a.insert(ROOT_ID, "a".to_string()).sign(&kp1); - let _1b = base1.doc.a.insert(_1a.id(), "b".to_string()).sign(&kp1); - let _2c = base2.doc.a.insert(ROOT_ID, "c".to_string()).sign(&kp2); - let _2d = base2.doc.a.insert(_1b.id(), "d".to_string()).sign(&kp2); - - assert_eq!( - base1.doc.view().into_json(), - json!({ - "a": ["a", "b"], - }) - ); - - // as _1b hasn't been delivered to base2 yet - assert_eq!( - base2.doc.view().into_json(), - json!({ - "a": ["c"], - }) - ); - - assert_eq!(base2.apply(_1b), OpState::MissingCausalDependencies); - assert_eq!(base2.apply(_1a), OpState::Ok); - assert_eq!(base1.apply(_2d), OpState::Ok); - assert_eq!(base1.apply(_2c), OpState::Ok); - assert_eq!(base1.doc.view().into_json(), base2.doc.view().into_json()); - } - - #[test] - fn test_causal_field_dependency() { - #[add_crdt_fields] - #[derive(Clone, CrdtNode, Debug)] - struct Item { - name: LwwRegisterCrdt, - soulbound: LwwRegisterCrdt, - } - - #[add_crdt_fields] - #[derive(Clone, CrdtNode, Debug)] - struct Player { - inventory: ListCrdt, - balance: LwwRegisterCrdt, - } - - let kp1 = make_keypair(); - let kp2 = make_keypair(); - let mut base1 = BaseCrdt::::new(&kp1); - let mut base2 = BaseCrdt::::new(&kp2); - - // require balance update to happen before inventory update - let _add_money = base1.doc.balance.set(5000.0).sign(&kp1); - let _spend_money = base1 - .doc - .balance - .set(3000.0) - .sign_with_dependencies(&kp1, vec![&_add_money]); - - let sword: JsonValue = json!({ - "name": "Sword", - "soulbound": true, - }) - .into(); - let _new_inventory_item = base1 - .doc - .inventory - .insert_idx(0, sword) - .sign_with_dependencies(&kp1, vec![&_spend_money]); - - assert_eq!( - base1.doc.view().into_json(), - json!({ - "balance": 3000.0, - "inventory": [ - { - "name": "Sword", - "soulbound": true - } - ] - }) - ); - - // do it completely out of order - assert_eq!( - base2.apply(_new_inventory_item), - OpState::MissingCausalDependencies - ); - assert_eq!( - base2.apply(_spend_money), - OpState::MissingCausalDependencies - ); - assert_eq!(base2.apply(_add_money), OpState::Ok); - assert_eq!(base1.doc.view().into_json(), base2.doc.view().into_json()); - } - - #[test] - fn test_2d_grid() { - #[add_crdt_fields] - #[derive(Clone, CrdtNode, Debug)] - struct Game { - grid: ListCrdt>>, - } - - let kp1 = make_keypair(); - let kp2 = make_keypair(); - let mut base1 = BaseCrdt::::new(&kp1); - let mut base2 = BaseCrdt::::new(&kp2); - - // init a 2d grid - let row0: JsonValue = json!([true, false]).into(); - let row1: JsonValue = json!([false, true]).into(); - let construct1 = base1.doc.grid.insert_idx(0, row0).sign(&kp1); - let construct2 = base1.doc.grid.insert_idx(1, row1).sign(&kp1); - - assert_eq!(base2.apply(construct1), OpState::Ok); - assert_eq!(base2.apply(construct2.clone()), OpState::Ok); - - assert_eq!(base1.doc.view().into_json(), base2.doc.view().into_json()); - assert_eq!( - base1.doc.view().into_json(), - json!({ - "grid": [[true, false], [false, true]] - }) - ); - - let set1 = base1.doc.grid[0][0].set(false).sign(&kp1); - let set2 = base2.doc.grid[1][1].set(false).sign(&kp2); - assert_eq!(base1.apply(set2), OpState::Ok); - assert_eq!(base2.apply(set1), OpState::Ok); - - assert_eq!(base1.doc.view().into_json(), base2.doc.view().into_json()); - assert_eq!( - base1.doc.view().into_json(), - json!({ - "grid": [[false, false], [false, false]] - }) - ); - - let topright = base1.doc.grid[0].id_at(1).unwrap(); - base1.doc.grid[0].delete(topright); - assert_eq!( - base1.doc.view().into_json(), - json!({ - "grid": [[false], [false, false]] - }) - ); - - base1.doc.grid.delete(construct2.id()); - assert_eq!( - base1.doc.view().into_json(), - json!({ - "grid": [[false]] - }) - ); - } - - #[test] - fn test_arb_json() { - #[add_crdt_fields] - #[derive(Clone, CrdtNode, Debug)] - struct Test { - reg: LwwRegisterCrdt, - } - - let kp1 = make_keypair(); - let mut base1 = BaseCrdt::::new(&kp1); - - let base_val: JsonValue = json!({ - "a": true, - "b": "asdf", - "c": { - "d": [], - "e": [ false ] - } - }) - .into(); - base1.doc.reg.set(base_val).sign(&kp1); - assert_eq!( - base1.doc.view().into_json(), - json!({ - "reg": { - "a": true, - "b": "asdf", - "c": { - "d": [], - "e": [ false ] - } - } - }) - ); - } - - #[test] - fn test_wrong_json_types() { - #[add_crdt_fields] - #[derive(Clone, CrdtNode, Debug)] - struct Nested { - list: ListCrdt, - } - - #[add_crdt_fields] - #[derive(Clone, CrdtNode, Debug)] - struct Test { - reg: LwwRegisterCrdt, - strct: ListCrdt, - } - - let key = make_keypair(); - let mut crdt = BaseCrdt::::new(&key); - - // wrong type should not go through - crdt.doc.reg.set(32); - assert_eq!(crdt.doc.reg.view(), json!(null).into()); - crdt.doc.reg.set(true); - assert_eq!(crdt.doc.reg.view(), json!(true).into()); - - // set nested - let mut list_view: JsonValue = crdt.doc.strct.view().into(); - assert_eq!(list_view, json!([]).into()); - - // only keeps actual numbers - let list: JsonValue = json!({"list": [0, 123, -0.45, "char", []]}).into(); - crdt.doc.strct.insert_idx(0, list); - list_view = crdt.doc.strct.view().into(); - assert_eq!(list_view, json!([{ "list": [0, 123, -0.45]}]).into()); - } -} diff --git a/crates/bft-json-crdt/src/json_crdt/base.rs b/crates/bft-json-crdt/src/json_crdt/base.rs new file mode 100644 index 00000000..49e82e10 --- /dev/null +++ b/crates/bft-json-crdt/src/json_crdt/base.rs @@ -0,0 +1,127 @@ +//! [`BaseCrdt`] — the top-level causal-delivery wrapper around any [`CrdtNode`]. + +use std::collections::{HashMap, HashSet}; + +use fastcrypto::ed25519::Ed25519KeyPair; +use fastcrypto::traits::KeyPair; + +use crate::debug::DebugView; +use crate::keypair::SignedDigest; + +use super::{CrdtNode, OpState, SignedOp, CAUSAL_QUEUE_MAX}; + +/// The base struct for a JSON CRDT. Allows for declaring causal +/// dependencies across fields. It only accepts messages of [`SignedOp`] for BFT. +pub struct BaseCrdt { + /// Public key of this CRDT + pub id: crate::keypair::AuthorId, + + /// Internal base CRDT + pub doc: T, + + /// In a real world scenario, this would be a proper hash graph that allows for + /// efficient reconciliation of missing dependencies. We naively keep a hash set + /// of messages we've seen (represented by their [`SignedDigest`]). + received: HashSet, + message_q: HashMap>, + + /// Total count of ops currently held in [`message_q`] waiting for their causal + /// dependencies to be delivered. Used to enforce [`CAUSAL_QUEUE_MAX`]. + queue_len: usize, +} + +impl BaseCrdt { + /// Create a new BaseCRDT of the given type. Multiple BaseCRDTs + /// can be created from a single keypair but you are responsible for + /// routing messages to the right BaseCRDT. Usually you should just make a single + /// struct that contains all the state you need. + pub fn new(keypair: &Ed25519KeyPair) -> Self { + let id = keypair.public().0.to_bytes(); + Self { + id, + doc: T::new(id, vec![]), + received: HashSet::new(), + message_q: HashMap::new(), + queue_len: 0, + } + } + + /// Apply a signed operation to this BaseCRDT, verifying integrity and routing to the right + /// nested CRDT + pub fn apply(&mut self, op: SignedOp) -> OpState { + // self.log_try_apply(&op); + + #[cfg(feature = "bft")] + if !op.is_valid_digest() { + self.debug_digest_failure(op); + return OpState::ErrDigestMismatch; + } + + let op_id = op.signed_digest; + + // Self-loop / dedup guard: if we have already processed this op (identified by + // its signed_digest), return immediately without re-applying it. This prevents + // echo loops where an op we broadcast to a peer comes back to us. + if self.received.contains(&op_id) { + return OpState::AlreadySeen; + } + + if !op.depends_on.is_empty() { + for origin in &op.depends_on { + if !self.received.contains(origin) { + self.log_missing_causal_dep(origin); + + // Bounded queue overflow: evict the oldest op from the largest + // pending bucket before adding the new one. See CAUSAL_QUEUE_MAX. + if self.queue_len >= CAUSAL_QUEUE_MAX { + if let Some(bucket) = self.message_q.values_mut().max_by_key(|v| v.len()) { + if !bucket.is_empty() { + bucket.remove(0); + self.queue_len = self.queue_len.saturating_sub(1); + } + } + } + + self.message_q.entry(*origin).or_default().push(op); + self.queue_len += 1; + return OpState::MissingCausalDependencies; + } + } + } + + // apply + // self.log_actually_apply(&op); + let status = self.doc.apply(op.inner); + // self.debug_view(); + + // Only record the op as seen when it applied successfully. If the op + // was rejected (e.g. ErrHashMismatch from a tampered payload), we must + // NOT add its signed_digest to `received`: a legitimate op that shares + // the same signed_digest (e.g. the un-tampered original) would otherwise + // be silently dropped as AlreadySeen. + // Only mark as received and unblock dependents when the op was actually + // applied. If we insert on error (e.g. ErrHashMismatch), a subsequent + // apply of a *legitimate* op with the same signed_digest would be + // silently dropped as AlreadySeen, preventing equivocation detection + // from working correctly. + if status == OpState::Ok { + self.received.insert(op_id); + + // apply all of its causal dependents if there are any + let dependent_queue = self.message_q.remove(&op_id); + if let Some(mut q) = dependent_queue { + self.queue_len = self.queue_len.saturating_sub(q.len()); + for dependent in q.drain(..) { + self.apply(dependent); + } + } + } + status + } + + /// Number of ops currently held in the causal-order queue waiting for their + /// dependencies to be satisfied. + pub fn causal_queue_len(&self) -> usize { + self.queue_len + } +} diff --git a/crates/bft-json-crdt/src/json_crdt/mod.rs b/crates/bft-json-crdt/src/json_crdt/mod.rs new file mode 100644 index 00000000..4effe1ec --- /dev/null +++ b/crates/bft-json-crdt/src/json_crdt/mod.rs @@ -0,0 +1,439 @@ +//! JSON CRDT public interface: core traits, re-exports, and integration tests. +// TODO: serde's json object serialization and deserialization (correctly) do not define anything +// object field order in JSON objects. However, the hash check impl in bft-json-bft-crdt does take order +// into account. This is going to cause problems later for non-Rust implementations, BFT hash checking +// currently depends on JSON serialization/deserialization object order. This shouldn't be the case +// but I've hacked in an IndexMap for the moment to get the PoC working. To see the problem, replace this with +// a std HashMap, everything will screw up (annoyingly, only *most* of the time). + +use crate::debug::debug_op_on_primitive; +use crate::keypair::AuthorId; +use crate::op::{Hashable, Op, PathSegment}; + +pub use bft_crdt_derive::*; + +mod base; +mod signed_op; +mod value; + +pub use base::BaseCrdt; +pub use signed_op::{OpState, SignedOp, CAUSAL_QUEUE_MAX}; +pub use value::JsonValue; + +/// Anything that can be nested in a JSON CRDT +pub trait CrdtNode: CrdtNodeFromValue + Hashable + Clone { + /// Create a new CRDT of this type + fn new(id: AuthorId, path: Vec) -> Self; + /// Apply an operation to this CRDT, forwarding if necessary + fn apply(&mut self, op: Op) -> OpState; + /// Get a JSON representation of the value in this node + fn view(&self) -> JsonValue; +} + +/// The following types can be used as a 'terminal' type in CRDTs +pub trait MarkPrimitive: Into + Default {} +impl MarkPrimitive for bool {} +impl MarkPrimitive for i32 {} +impl MarkPrimitive for i64 {} +impl MarkPrimitive for f64 {} +impl MarkPrimitive for char {} +impl MarkPrimitive for String {} +impl MarkPrimitive for JsonValue {} + +/// Implement CrdtNode for non-CRDTs +/// This is a stub implementation so most functions don't do anything/log an error +impl CrdtNode for T +where + T: CrdtNodeFromValue + MarkPrimitive + Hashable + Clone, +{ + fn apply(&mut self, _op: Op) -> OpState { + OpState::ErrApplyOnPrimitive + } + + fn view(&self) -> JsonValue { + self.to_owned().into() + } + + fn new(_id: AuthorId, _path: Vec) -> Self { + debug_op_on_primitive(_path); + Default::default() + } +} + +/// Fallibly create a CRDT Node from a JSON Value +pub trait CrdtNodeFromValue: Sized { + fn node_from(value: JsonValue, id: AuthorId, path: Vec) -> Result; +} + +/// Fallibly cast a JSON Value into a CRDT Node +pub trait IntoCrdtNode: Sized { + fn into_node(self, id: AuthorId, path: Vec) -> Result; +} + +/// [`CrdtNodeFromValue`] implies [`IntoCrdtNode`] +impl IntoCrdtNode for JsonValue +where + T: CrdtNodeFromValue, +{ + fn into_node(self, id: AuthorId, path: Vec) -> Result { + T::node_from(self, id, path) + } +} + +/// Trivial conversion from [`JsonValue`] to [`JsonValue`] as [`CrdtNodeFromValue`] +impl CrdtNodeFromValue for JsonValue { + fn node_from(value: JsonValue, _id: AuthorId, _path: Vec) -> Result { + Ok(value) + } +} + +#[cfg(test)] +mod test { + use serde_json::json; + + use crate::{ + json_crdt::{add_crdt_fields, BaseCrdt, CrdtNode, IntoCrdtNode, JsonValue, OpState}, + keypair::make_keypair, + list_crdt::ListCrdt, + lww_crdt::LwwRegisterCrdt, + op::{print_path, ROOT_ID}, + }; + + #[test] + fn test_derive_basic() { + #[add_crdt_fields] + #[derive(Clone, CrdtNode, Debug)] + struct Player { + x: LwwRegisterCrdt, + y: LwwRegisterCrdt, + } + + let keypair = make_keypair(); + let crdt = BaseCrdt::::new(&keypair); + assert_eq!(print_path(crdt.doc.x.path), "x"); + assert_eq!(print_path(crdt.doc.y.path), "y"); + } + + #[test] + fn test_derive_nested() { + #[add_crdt_fields] + #[derive(Clone, CrdtNode, Debug)] + struct Position { + x: LwwRegisterCrdt, + y: LwwRegisterCrdt, + } + + #[add_crdt_fields] + #[derive(Clone, CrdtNode, Debug)] + struct Player { + pos: Position, + balance: LwwRegisterCrdt, + messages: ListCrdt, + } + + let keypair = make_keypair(); + let crdt = BaseCrdt::::new(&keypair); + assert_eq!(print_path(crdt.doc.pos.x.path), "pos.x"); + assert_eq!(print_path(crdt.doc.pos.y.path), "pos.y"); + assert_eq!(print_path(crdt.doc.balance.path), "balance"); + assert_eq!(print_path(crdt.doc.messages.path), "messages"); + } + + #[test] + fn test_lww_ops() { + #[add_crdt_fields] + #[derive(Clone, CrdtNode, Debug)] + struct Test { + a: LwwRegisterCrdt, + b: LwwRegisterCrdt, + c: LwwRegisterCrdt, + } + + let kp1 = make_keypair(); + let kp2 = make_keypair(); + let mut base1 = BaseCrdt::::new(&kp1); + let mut base2 = BaseCrdt::::new(&kp2); + + let _1_a_1 = base1.doc.a.set(3.0).sign(&kp1); + let _1_b_1 = base1.doc.b.set(true).sign(&kp1); + let _2_a_1 = base2.doc.a.set(1.5).sign(&kp2); + let _2_a_2 = base2.doc.a.set(2.13).sign(&kp2); + let _2_c_1 = base2.doc.c.set("abc".to_string()).sign(&kp2); + + assert_eq!(base1.doc.a.view(), json!(3.0).into()); + assert_eq!(base2.doc.a.view(), json!(2.13).into()); + assert_eq!(base1.doc.b.view(), json!(true).into()); + assert_eq!(base2.doc.c.view(), json!("abc").into()); + + assert_eq!( + base1.doc.view().into_json(), + json!({ + "a": 3.0, + "b": true, + "c": null, + }) + ); + assert_eq!( + base2.doc.view().into_json(), + json!({ + "a": 2.13, + "b": null, + "c": "abc", + }) + ); + + assert_eq!(base2.apply(_1_a_1), OpState::Ok); + assert_eq!(base2.apply(_1_b_1), OpState::Ok); + assert_eq!(base1.apply(_2_a_1), OpState::Ok); + assert_eq!(base1.apply(_2_a_2), OpState::Ok); + assert_eq!(base1.apply(_2_c_1), OpState::Ok); + + assert_eq!(base1.doc.view().into_json(), base2.doc.view().into_json()); + assert_eq!( + base1.doc.view().into_json(), + json!({ + "a": 2.13, + "b": true, + "c": "abc" + }) + ) + } + + #[test] + fn test_vec_and_map_ops() { + #[add_crdt_fields] + #[derive(Clone, CrdtNode, Debug)] + struct Test { + a: ListCrdt, + } + + let kp1 = make_keypair(); + let kp2 = make_keypair(); + let mut base1 = BaseCrdt::::new(&kp1); + let mut base2 = BaseCrdt::::new(&kp2); + + let _1a = base1.doc.a.insert(ROOT_ID, "a".to_string()).sign(&kp1); + let _1b = base1.doc.a.insert(_1a.id(), "b".to_string()).sign(&kp1); + let _2c = base2.doc.a.insert(ROOT_ID, "c".to_string()).sign(&kp2); + let _2d = base2.doc.a.insert(_1b.id(), "d".to_string()).sign(&kp2); + + assert_eq!( + base1.doc.view().into_json(), + json!({ + "a": ["a", "b"], + }) + ); + + // as _1b hasn't been delivered to base2 yet + assert_eq!( + base2.doc.view().into_json(), + json!({ + "a": ["c"], + }) + ); + + assert_eq!(base2.apply(_1b), OpState::MissingCausalDependencies); + assert_eq!(base2.apply(_1a), OpState::Ok); + assert_eq!(base1.apply(_2d), OpState::Ok); + assert_eq!(base1.apply(_2c), OpState::Ok); + assert_eq!(base1.doc.view().into_json(), base2.doc.view().into_json()); + } + + #[test] + fn test_causal_field_dependency() { + #[add_crdt_fields] + #[derive(Clone, CrdtNode, Debug)] + struct Item { + name: LwwRegisterCrdt, + soulbound: LwwRegisterCrdt, + } + + #[add_crdt_fields] + #[derive(Clone, CrdtNode, Debug)] + struct Player { + inventory: ListCrdt, + balance: LwwRegisterCrdt, + } + + let kp1 = make_keypair(); + let kp2 = make_keypair(); + let mut base1 = BaseCrdt::::new(&kp1); + let mut base2 = BaseCrdt::::new(&kp2); + + // require balance update to happen before inventory update + let _add_money = base1.doc.balance.set(5000.0).sign(&kp1); + let _spend_money = base1 + .doc + .balance + .set(3000.0) + .sign_with_dependencies(&kp1, vec![&_add_money]); + + let sword: JsonValue = json!({ + "name": "Sword", + "soulbound": true, + }) + .into(); + let _new_inventory_item = base1 + .doc + .inventory + .insert_idx(0, sword) + .sign_with_dependencies(&kp1, vec![&_spend_money]); + + assert_eq!( + base1.doc.view().into_json(), + json!({ + "balance": 3000.0, + "inventory": [ + { + "name": "Sword", + "soulbound": true + } + ] + }) + ); + + // do it completely out of order + assert_eq!( + base2.apply(_new_inventory_item), + OpState::MissingCausalDependencies + ); + assert_eq!( + base2.apply(_spend_money), + OpState::MissingCausalDependencies + ); + assert_eq!(base2.apply(_add_money), OpState::Ok); + assert_eq!(base1.doc.view().into_json(), base2.doc.view().into_json()); + } + + #[test] + fn test_2d_grid() { + #[add_crdt_fields] + #[derive(Clone, CrdtNode, Debug)] + struct Game { + grid: ListCrdt>>, + } + + let kp1 = make_keypair(); + let kp2 = make_keypair(); + let mut base1 = BaseCrdt::::new(&kp1); + let mut base2 = BaseCrdt::::new(&kp2); + + // init a 2d grid + let row0: JsonValue = json!([true, false]).into(); + let row1: JsonValue = json!([false, true]).into(); + let construct1 = base1.doc.grid.insert_idx(0, row0).sign(&kp1); + let construct2 = base1.doc.grid.insert_idx(1, row1).sign(&kp1); + + assert_eq!(base2.apply(construct1), OpState::Ok); + assert_eq!(base2.apply(construct2.clone()), OpState::Ok); + + assert_eq!(base1.doc.view().into_json(), base2.doc.view().into_json()); + assert_eq!( + base1.doc.view().into_json(), + json!({ + "grid": [[true, false], [false, true]] + }) + ); + + let set1 = base1.doc.grid[0][0].set(false).sign(&kp1); + let set2 = base2.doc.grid[1][1].set(false).sign(&kp2); + assert_eq!(base1.apply(set2), OpState::Ok); + assert_eq!(base2.apply(set1), OpState::Ok); + + assert_eq!(base1.doc.view().into_json(), base2.doc.view().into_json()); + assert_eq!( + base1.doc.view().into_json(), + json!({ + "grid": [[false, false], [false, false]] + }) + ); + + let topright = base1.doc.grid[0].id_at(1).unwrap(); + base1.doc.grid[0].delete(topright); + assert_eq!( + base1.doc.view().into_json(), + json!({ + "grid": [[false], [false, false]] + }) + ); + + base1.doc.grid.delete(construct2.id()); + assert_eq!( + base1.doc.view().into_json(), + json!({ + "grid": [[false]] + }) + ); + } + + #[test] + fn test_arb_json() { + #[add_crdt_fields] + #[derive(Clone, CrdtNode, Debug)] + struct Test { + reg: LwwRegisterCrdt, + } + + let kp1 = make_keypair(); + let mut base1 = BaseCrdt::::new(&kp1); + + let base_val: JsonValue = json!({ + "a": true, + "b": "asdf", + "c": { + "d": [], + "e": [ false ] + } + }) + .into(); + base1.doc.reg.set(base_val).sign(&kp1); + assert_eq!( + base1.doc.view().into_json(), + json!({ + "reg": { + "a": true, + "b": "asdf", + "c": { + "d": [], + "e": [ false ] + } + } + }) + ); + } + + #[test] + fn test_wrong_json_types() { + #[add_crdt_fields] + #[derive(Clone, CrdtNode, Debug)] + struct Nested { + list: ListCrdt, + } + + #[add_crdt_fields] + #[derive(Clone, CrdtNode, Debug)] + struct Test { + reg: LwwRegisterCrdt, + strct: ListCrdt, + } + + let key = make_keypair(); + let mut crdt = BaseCrdt::::new(&key); + + // wrong type should not go through + crdt.doc.reg.set(32); + assert_eq!(crdt.doc.reg.view(), json!(null).into()); + crdt.doc.reg.set(true); + assert_eq!(crdt.doc.reg.view(), json!(true).into()); + + // set nested + let mut list_view: JsonValue = crdt.doc.strct.view().into(); + assert_eq!(list_view, json!([]).into()); + + // only keeps actual numbers + let list: JsonValue = json!({"list": [0, 123, -0.45, "char", []]}).into(); + crdt.doc.strct.insert_idx(0, list); + list_view = crdt.doc.strct.view().into(); + assert_eq!(list_view, json!([{ "list": [0, 123, -0.45]}]).into()); + } +} diff --git a/crates/bft-json-crdt/src/json_crdt/signed_op.rs b/crates/bft-json-crdt/src/json_crdt/signed_op.rs new file mode 100644 index 00000000..11c05064 --- /dev/null +++ b/crates/bft-json-crdt/src/json_crdt/signed_op.rs @@ -0,0 +1,147 @@ +//! [`SignedOp`], [`OpState`], and the causal queue capacity constant. + +use fastcrypto::traits::VerifyingKey; +use fastcrypto::{ + ed25519::{Ed25519KeyPair, Ed25519PublicKey, Ed25519Signature}, + traits::{KeyPair, ToFromBytes}, +}; +use serde::{Deserialize, Serialize}; +use serde_with::{serde_as, Bytes}; + +use crate::keypair::{sha256, sign, AuthorId, SignedDigest}; +use crate::op::{print_hex, print_path, Op, OpId}; + +use super::{CrdtNode, JsonValue}; + +/// Enum representing possible outcomes of applying an operation to a CRDT +#[derive(Debug, PartialEq)] +pub enum OpState { + /// Operation applied successfully + Ok, + /// Tried to apply an operation to a non-CRDT primitive (i.e. f64, bool, etc.) + /// If you would like a mutable primitive, wrap it in a [`LWWRegisterCRDT`] + ErrApplyOnPrimitive, + /// Tried to apply an operation to a static struct CRDT + /// If you would like a mutable object, use a [`Value`] + ErrApplyOnStruct, + /// Tried to apply an operation that contains content of the wrong type. + /// In other words, the content cannot be coerced to the CRDT at the path specified. + ErrMismatchedType, + /// The signed digest of the message did not match the claimed author of the message. + /// This can happen if the message was tampered with during delivery + ErrDigestMismatch, + /// The hash of the message did not match the contents of the message. + /// This can happen if the author tried to perform an equivocation attack by creating an + /// operation and modifying it has already been created + ErrHashMismatch, + /// Tried to apply an operation to a non-existent path. The author may have forgotten to attach + /// a causal dependency + ErrPathMismatch, + /// Trying to modify/delete the sentinel (zero-th) node element that is used for book-keeping + ErrListApplyToEmpty, + /// We have not received all of the causal dependencies of this operation. It has been queued + /// up and will be executed when its causal dependencies have been delivered + MissingCausalDependencies, + /// This op has already been applied (identified by its `signed_digest`). + /// The CRDT state is unchanged — this is a no-op (idempotent self-loop guard). + AlreadySeen, +} + +/// Maximum total number of ops that may sit in the causal-order hold queue at any +/// one time, summed across all pending dependency buckets. +/// +/// **Overflow policy: drop oldest.** +/// When the limit is reached, the oldest pending op in the largest dependency bucket +/// is silently evicted before the new op is queued. Rationale: a misbehaving or +/// heavily-partitioned peer can send ops whose causal ancestors never arrive, causing +/// unbounded memory growth. Dropping the oldest entry preserves the most recent +/// information and caps memory use. The peer can reconnect and receive a fresh bulk +/// state dump to recover any dropped ops. +pub const CAUSAL_QUEUE_MAX: usize = 256; + +/// An [`Op`] with a few bits of extra metadata +#[serde_as] +#[derive(Clone, Serialize, Deserialize, Debug, PartialEq)] +pub struct SignedOp { + // Note that this can be different from the author of the inner op as the inner op could have been created + // by a different person + author: AuthorId, + /// Signed hash using priv key of author. Effectively [`OpID`] Use this as the ID to figure out what has been delivered already + #[serde_as(as = "Bytes")] + pub signed_digest: SignedDigest, + pub inner: Op, + /// List of causal dependencies + #[serde_as(as = "Vec")] + pub depends_on: Vec, +} + +impl SignedOp { + pub fn id(&self) -> OpId { + self.inner.id + } + + pub fn author(&self) -> AuthorId { + self.author + } + + /// Creates a digest of the following fields. Any changes in the fields will change the signed digest + /// - id (hash of the following) + /// - origin + /// - author + /// - seq + /// - is_deleted + /// - path + /// - dependencies + fn digest(&self) -> [u8; 32] { + let path_string = print_path(self.inner.path.clone()); + let dependency_string = self + .depends_on + .iter() + .map(print_hex) + .collect::>() + .join(""); + let fmt_str = format!("{:?},{path_string},{dependency_string}", self.id()); + sha256(fmt_str) + } + + /// Sign this digest with the given keypair. Shouldn't need to be called manually, + /// just use [`SignedOp::from_op`] instead + fn sign_digest(&mut self, keypair: &Ed25519KeyPair) { + self.signed_digest = sign(keypair, &self.digest()).sig.to_bytes() + } + + /// Ensure digest was actually signed by the author it claims to be signed by + pub fn is_valid_digest(&self) -> bool { + let digest = Ed25519Signature::from_bytes(&self.signed_digest); + let pubkey = Ed25519PublicKey::from_bytes(&self.author()); + match (digest, pubkey) { + (Ok(digest), Ok(pubkey)) => pubkey.verify(&self.digest(), &digest).is_ok(), + (_, _) => false, + } + } + + /// Sign a normal op and add all the needed metadata + pub fn from_op( + value: Op, + keypair: &Ed25519KeyPair, + depends_on: Vec, + ) -> Self { + let author = keypair.public().0.to_bytes(); + let mut new = Self { + inner: Op { + content: value.content.map(|c| c.view()), + origin: value.origin, + author: value.author, + seq: value.seq, + path: value.path, + is_deleted: value.is_deleted, + id: value.id, + }, + author, + signed_digest: [0u8; 64], + depends_on, + }; + new.sign_digest(keypair); + new + } +} diff --git a/crates/bft-json-crdt/src/json_crdt/value.rs b/crates/bft-json-crdt/src/json_crdt/value.rs new file mode 100644 index 00000000..4681627a --- /dev/null +++ b/crates/bft-json-crdt/src/json_crdt/value.rs @@ -0,0 +1,262 @@ +//! The [`JsonValue`] enum and all its conversions to/from primitive and CRDT types. + +use std::fmt::Display; + +use indexmap::IndexMap; +use serde::{Deserialize, Serialize}; + +use crate::{keypair::AuthorId, list_crdt::ListCrdt, lww_crdt::LwwRegisterCrdt, op::PathSegment}; + +use super::{CrdtNode, CrdtNodeFromValue}; + +/// An enum representing a JSON value +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub enum JsonValue { + Null, + Bool(bool), + Number(f64), + String(String), + Array(Vec), + Object(IndexMap), +} + +impl Display for JsonValue { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}", + match self { + JsonValue::Null => "null".to_string(), + JsonValue::Bool(b) => b.to_string(), + JsonValue::Number(n) => n.to_string(), + JsonValue::String(s) => format!("\"{s}\""), + JsonValue::Array(arr) => { + if arr.len() > 1 { + format!( + "[\n{}\n]", + arr.iter() + .map(|x| format!(" {x}")) + .collect::>() + .join(",\n") + ) + } else { + format!( + "[ {} ]", + arr.iter() + .map(|x| x.to_string()) + .collect::>() + .join(", ") + ) + } + } + JsonValue::Object(obj) => format!( + "{{ {} }}", + obj.iter() + .map(|(k, v)| format!(" \"{k}\": {v}")) + .collect::>() + .join(",\n") + ), + } + ) + } +} + +impl Default for JsonValue { + fn default() -> Self { + Self::Null + } +} + +/// Allow easy conversion to and from serde's JSON format. This allows us to use the [`json!`] +/// macro +impl From for serde_json::Value { + fn from(value: JsonValue) -> Self { + match value { + JsonValue::Null => serde_json::Value::Null, + JsonValue::Bool(x) => serde_json::Value::Bool(x), + JsonValue::Number(x) => { + serde_json::Value::Number(serde_json::Number::from_f64(x).unwrap()) + } + JsonValue::String(x) => serde_json::Value::String(x), + JsonValue::Array(x) => { + serde_json::Value::Array(x.iter().map(|a| a.clone().into()).collect()) + } + JsonValue::Object(x) => serde_json::Value::Object( + x.iter() + .map(|(k, v)| (k.clone(), v.clone().into())) + .collect(), + ), + } + } +} + +impl From for JsonValue { + fn from(value: serde_json::Value) -> Self { + match value { + serde_json::Value::Null => JsonValue::Null, + serde_json::Value::Bool(x) => JsonValue::Bool(x), + serde_json::Value::Number(x) => JsonValue::Number(x.as_f64().unwrap()), + serde_json::Value::String(x) => JsonValue::String(x), + serde_json::Value::Array(x) => { + JsonValue::Array(x.iter().map(|a| a.clone().into()).collect()) + } + serde_json::Value::Object(x) => JsonValue::Object( + x.iter() + .map(|(k, v)| (k.clone(), v.clone().into())) + .collect(), + ), + } + } +} + +impl JsonValue { + pub fn into_json(self) -> serde_json::Value { + self.into() + } +} + +/// Conversions from primitive types to [`JsonValue`] +impl From for JsonValue { + fn from(val: bool) -> Self { + JsonValue::Bool(val) + } +} + +impl From for JsonValue { + fn from(val: i64) -> Self { + JsonValue::Number(val as f64) + } +} + +impl From for JsonValue { + fn from(val: i32) -> Self { + JsonValue::Number(val as f64) + } +} + +impl From for JsonValue { + fn from(val: f64) -> Self { + JsonValue::Number(val) + } +} + +impl From for JsonValue { + fn from(val: String) -> Self { + JsonValue::String(val) + } +} + +impl From for JsonValue { + fn from(val: char) -> Self { + JsonValue::String(val.into()) + } +} + +impl From> for JsonValue +where + T: CrdtNode, +{ + fn from(val: Option) -> Self { + match val { + Some(x) => x.view(), + None => JsonValue::Null, + } + } +} + +impl From> for JsonValue +where + T: CrdtNode, +{ + fn from(value: Vec) -> Self { + JsonValue::Array(value.iter().map(|x| x.view()).collect()) + } +} + +/// Conversions from bool to CRDT +impl CrdtNodeFromValue for bool { + fn node_from(value: JsonValue, _id: AuthorId, _path: Vec) -> Result { + if let JsonValue::Bool(x) = value { + Ok(x) + } else { + Err(format!("failed to convert {value:?} -> bool")) + } + } +} + +/// Conversions from f64 to CRDT +impl CrdtNodeFromValue for f64 { + fn node_from(value: JsonValue, _id: AuthorId, _path: Vec) -> Result { + if let JsonValue::Number(x) = value { + Ok(x) + } else { + Err(format!("failed to convert {value:?} -> f64")) + } + } +} + +/// Conversions from i64 to CRDT +impl CrdtNodeFromValue for i64 { + fn node_from(value: JsonValue, _id: AuthorId, _path: Vec) -> Result { + if let JsonValue::Number(x) = value { + Ok(x as i64) + } else { + Err(format!("failed to convert {value:?} -> f64")) + } + } +} + +/// Conversions from String to CRDT +impl CrdtNodeFromValue for String { + fn node_from(value: JsonValue, _id: AuthorId, _path: Vec) -> Result { + if let JsonValue::String(x) = value { + Ok(x) + } else { + Err(format!("failed to convert {value:?} -> String")) + } + } +} + +/// Conversions from char to CRDT +impl CrdtNodeFromValue for char { + fn node_from(value: JsonValue, _id: AuthorId, _path: Vec) -> Result { + if let JsonValue::String(x) = value.clone() { + x.chars().next().ok_or(format!( + "failed to convert {value:?} -> char: found a zero-length string" + )) + } else { + Err(format!("failed to convert {value:?} -> char")) + } + } +} + +impl CrdtNodeFromValue for LwwRegisterCrdt +where + T: CrdtNode, +{ + fn node_from(value: JsonValue, id: AuthorId, path: Vec) -> Result { + let mut crdt = LwwRegisterCrdt::new(id, path); + crdt.set(value); + Ok(crdt) + } +} + +impl CrdtNodeFromValue for ListCrdt +where + T: CrdtNode, +{ + fn node_from(value: JsonValue, id: AuthorId, path: Vec) -> Result { + if let JsonValue::Array(arr) = value { + let mut crdt = ListCrdt::new(id, path); + let result: Result<(), String> = + arr.into_iter().enumerate().try_for_each(|(i, val)| { + crdt.insert_idx(i, val); + Ok(()) + }); + result?; + Ok(crdt) + } else { + Err(format!("failed to convert {value:?} -> ListCRDT")) + } + } +}