//! CRDT wire codec — serialization format for `SignedOp` sync messages between nodes. /// Wire codec for `SignedOp` CRDT sync between nodes. /// /// # Wire Format /// /// **Chosen format: versioned JSON envelope (v1)** /// /// Rationale: JSON is already used throughout the codebase for `SignedOp` /// serialisation (`serde_json` + `serde_with::Bytes` annotations). Switching /// to a binary format (CBOR / bincode / postcard) would require adding a new /// dependency and a custom derive macro for the `Bytes` fields. JSON keeps /// the codec zero-dependency, human-readable in logs, and consistent with the /// existing `crdt_sync.rs` protocol. A version tag in the envelope ensures /// we can migrate to a binary format in a future version without breaking /// peers that are still on v1. /// /// Each encoded message is a JSON object: /// /// ```json /// {"v": 1, "op": { ...SignedOp fields... }} /// ``` /// /// - `v` — u32 version tag. Currently `1`. Unknown values are rejected with /// [`WireError::UnknownVersion`] so peers can fail fast instead of /// silently misinterpreting data. /// - `op` — the full `SignedOp` serialised via its existing `serde` impl. /// Binary fields (`signed_digest`, `depends_on`, `id`, …) use /// `serde_with::Bytes` so they appear as base64 strings in JSON. /// /// # Upgrading the format /// /// Bump `WIRE_VERSION` and add a new arm to the `match envelope.v` block in /// `decode`. Old decoders will return `WireError::UnknownVersion(N)` for /// the new version, which is the intended failure mode. use bft_json_crdt::json_crdt::SignedOp; use serde::{Deserialize, Serialize}; /// The current wire format version. pub const WIRE_VERSION: u32 = 1; // ── Error type ────────────────────────────────────────────────────────────── /// Errors returned by [`decode`]. #[derive(Debug, PartialEq)] pub enum WireError { /// The message carries a version tag that this codec does not recognise. /// The inner value is the version found in the message. UnknownVersion(u32), /// The bytes could not be parsed as a valid wire message (malformed JSON, /// wrong structure, or the embedded `SignedOp` failed to deserialise). Malformed(String), } impl std::fmt::Display for WireError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { WireError::UnknownVersion(v) => write!(f, "unknown wire version {v}"), WireError::Malformed(msg) => write!(f, "malformed wire message: {msg}"), } } } impl std::error::Error for WireError {} // ── Internal envelope ──────────────────────────────────────────────────────── #[derive(Serialize, Deserialize)] struct WireEnvelope { v: u32, op: serde_json::Value, } // ── Public API ─────────────────────────────────────────────────────────────── /// Encode a `SignedOp` into wire bytes. /// /// Returns a JSON-encoded versioned envelope. Panics only if `SignedOp`'s /// serde impl is broken (which would be a programming error, not a runtime /// condition). pub fn encode(op: &SignedOp) -> Vec { let op_value = serde_json::to_value(op).expect("SignedOp serialisation must not fail"); let envelope = WireEnvelope { v: WIRE_VERSION, op: op_value, }; serde_json::to_vec(&envelope).expect("WireEnvelope serialisation must not fail") } /// Decode wire bytes into a `SignedOp`. /// /// Returns [`WireError::UnknownVersion`] if the version tag is not recognised, /// or [`WireError::Malformed`] if the bytes are not valid JSON / the envelope /// structure is wrong / the embedded op fails to deserialise. pub fn decode(bytes: &[u8]) -> Result { let envelope: WireEnvelope = serde_json::from_slice(bytes).map_err(|e| WireError::Malformed(e.to_string()))?; match envelope.v { 1 => { let op: SignedOp = serde_json::from_value(envelope.op) .map_err(|e| WireError::Malformed(e.to_string()))?; Ok(op) } unknown => Err(WireError::UnknownVersion(unknown)), } } // ── Tests ──────────────────────────────────────────────────────────────────── #[cfg(test)] mod tests { use super::*; use bft_json_crdt::json_crdt::{BaseCrdt, JsonValue, OpState}; use bft_json_crdt::keypair::make_keypair; use bft_json_crdt::op::ROOT_ID; use serde_json::json; use crate::crdt_state::PipelineDoc; // ── helpers ────────────────────────────────────────────────────────────── /// Build a fresh CRDT and return its keypair along with a signed insert op. fn make_insert_op() -> (BaseCrdt, bft_json_crdt::keypair::Ed25519KeyPair, SignedOp) { let kp = make_keypair(); let mut crdt = BaseCrdt::::new(&kp); let item: JsonValue = json!({ "story_id": "505_story_wire_test", "stage": "1_backlog", "name": "Wire Test", "agent": "", "retry_count": 0.0, "blocked": false, "depends_on": "", "claimed_by": "", "claimed_at": 0.0, }) .into(); let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp); crdt.apply(op.clone()); (crdt, kp, op) } // ── round-trip tests (one per op variant) ──────────────────────────────── /// Insert op: round-trip through encode → decode. #[test] fn roundtrip_insert_op() { let (_crdt, _kp, op) = make_insert_op(); let bytes = encode(&op); let decoded = decode(&bytes).expect("decode must succeed"); assert_eq!(op, decoded); } /// Update op (LWW register set): round-trip. #[test] fn roundtrip_update_op() { let (mut crdt, kp, _insert) = make_insert_op(); let update_op = crdt.doc.items[0] .stage .set("2_current".to_string()) .sign(&kp); crdt.apply(update_op.clone()); let bytes = encode(&update_op); let decoded = decode(&bytes).expect("decode must succeed"); assert_eq!(update_op, decoded); } /// Delete op: round-trip. #[test] fn roundtrip_delete_op() { let (mut crdt, kp, insert_op) = make_insert_op(); // Delete the inserted item. let delete_op = crdt .doc .items .delete(insert_op.inner.id) .sign(&kp); crdt.apply(delete_op.clone()); let bytes = encode(&delete_op); let decoded = decode(&bytes).expect("decode must succeed"); assert_eq!(delete_op, decoded); } /// Op with multiple causal dependencies: round-trip. #[test] fn roundtrip_op_with_causal_deps() { let (mut crdt, kp, _op1) = make_insert_op(); let op2 = crdt.doc.items[0] .stage .set("2_current".to_string()) .sign(&kp); crdt.apply(op2.clone()); // op3 depends on both _op1 and op2 causally. let op3 = crdt.doc.items[0] .name .set("Updated Name".to_string()) .sign(&kp); crdt.apply(op3.clone()); let bytes = encode(&op3); let decoded = decode(&bytes).expect("decode must succeed"); assert_eq!(op3, decoded); } /// Signed digest is preserved exactly (no lossy base64 round-trip). #[test] fn signed_digest_preserved() { let (_crdt, _kp, op) = make_insert_op(); let bytes = encode(&op); let decoded = decode(&bytes).expect("decode must succeed"); assert_eq!(op.signed_digest, decoded.signed_digest); } // ── version tag tests ───────────────────────────────────────────────────── /// Encoded bytes must carry the current version tag. #[test] fn encoded_bytes_contain_version_tag() { let (_crdt, _kp, op) = make_insert_op(); let bytes = encode(&op); let text = std::str::from_utf8(&bytes).expect("output is UTF-8 JSON"); // The envelope must include `"v":1`. assert!( text.contains("\"v\":1"), "expected version tag in wire bytes, got: {text}" ); } /// Version 0 is rejected with UnknownVersion. #[test] fn unknown_version_zero_is_rejected() { let payload = br#"{"v":0,"op":{}}"#; assert_eq!(decode(payload), Err(WireError::UnknownVersion(0))); } /// A future version (e.g. 99) is rejected with UnknownVersion. #[test] fn unknown_version_future_is_rejected() { let payload = br#"{"v":99,"op":{}}"#; assert_eq!(decode(payload), Err(WireError::UnknownVersion(99))); } // ── malformed-bytes tests ───────────────────────────────────────────────── /// Completely invalid UTF-8 / non-JSON input is rejected with Malformed. #[test] fn malformed_random_bytes_is_rejected() { let payload = b"\x00\x01\x02\x03garbage"; match decode(payload) { Err(WireError::Malformed(_)) => {} other => panic!("expected Malformed, got {other:?}"), } } /// Valid JSON but missing the `v` field. #[test] fn malformed_missing_version_field() { let payload = br#"{"op":{}}"#; match decode(payload) { Err(WireError::Malformed(_)) => {} other => panic!("expected Malformed, got {other:?}"), } } /// Valid envelope version but `op` field is not a valid `SignedOp`. #[test] fn malformed_invalid_op_payload() { let payload = br#"{"v":1,"op":{"totally":"wrong"}}"#; match decode(payload) { Err(WireError::Malformed(_)) => {} other => panic!("expected Malformed, got {other:?}"), } } /// Empty bytes. #[test] fn malformed_empty_bytes() { match decode(b"") { Err(WireError::Malformed(_)) => {} other => panic!("expected Malformed, got {other:?}"), } } // ── cross-node round-trip ───────────────────────────────────────────────── /// Encode on node A, decode on node B, apply to node B's CRDT. #[test] fn cross_node_apply_after_decode() { let (_crdt_a, kp_a, op) = make_insert_op(); let wire = encode(&op); // Node B decodes and applies. let kp_b = make_keypair(); let mut crdt_b = BaseCrdt::::new(&kp_b); let decoded = decode(&wire).expect("decode must succeed"); let result = crdt_b.apply(decoded); assert_eq!(result, OpState::Ok); assert_eq!(crdt_b.doc.items.view().len(), 1); let _ = kp_a; } // ── WireError display ───────────────────────────────────────────────────── #[test] fn wire_error_display_unknown_version() { let e = WireError::UnknownVersion(42); assert_eq!(e.to_string(), "unknown wire version 42"); } #[test] fn wire_error_display_malformed() { let e = WireError::Malformed("bad json".to_string()); assert!(e.to_string().contains("malformed wire message")); } }