diff --git a/server/src/crdt_wire.rs b/server/src/crdt_wire.rs new file mode 100644 index 00000000..19aeb242 --- /dev/null +++ b/server/src/crdt_wire.rs @@ -0,0 +1,318 @@ +/// Wire codec for `SignedOp` CRDT sync between nodes. +/// +/// # Wire Format +/// +/// **Chosen format: versioned JSON envelope (v1)** +/// +/// Rationale: JSON is already used throughout the codebase for `SignedOp` +/// serialisation (`serde_json` + `serde_with::Bytes` annotations). Switching +/// to a binary format (CBOR / bincode / postcard) would require adding a new +/// dependency and a custom derive macro for the `Bytes` fields. JSON keeps +/// the codec zero-dependency, human-readable in logs, and consistent with the +/// existing `crdt_sync.rs` protocol. A version tag in the envelope ensures +/// we can migrate to a binary format in a future version without breaking +/// peers that are still on v1. +/// +/// Each encoded message is a JSON object: +/// +/// ```json +/// {"v": 1, "op": { ...SignedOp fields... }} +/// ``` +/// +/// - `v` — u32 version tag. Currently `1`. Unknown values are rejected with +/// [`WireError::UnknownVersion`] so peers can fail fast instead of +/// silently misinterpreting data. +/// - `op` — the full `SignedOp` serialised via its existing `serde` impl. +/// Binary fields (`signed_digest`, `depends_on`, `id`, …) use +/// `serde_with::Bytes` so they appear as base64 strings in JSON. +/// +/// # Upgrading the format +/// +/// Bump `WIRE_VERSION` and add a new arm to the `match envelope.v` block in +/// `decode`. Old decoders will return `WireError::UnknownVersion(N)` for +/// the new version, which is the intended failure mode. +use bft_json_crdt::json_crdt::SignedOp; +use serde::{Deserialize, Serialize}; + +/// The current wire format version. +pub const WIRE_VERSION: u32 = 1; + +// ── Error type ────────────────────────────────────────────────────────────── + +/// Errors returned by [`decode`]. +#[derive(Debug, PartialEq)] +pub enum WireError { + /// The message carries a version tag that this codec does not recognise. + /// The inner value is the version found in the message. + UnknownVersion(u32), + /// The bytes could not be parsed as a valid wire message (malformed JSON, + /// wrong structure, or the embedded `SignedOp` failed to deserialise). + Malformed(String), +} + +impl std::fmt::Display for WireError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + WireError::UnknownVersion(v) => write!(f, "unknown wire version {v}"), + WireError::Malformed(msg) => write!(f, "malformed wire message: {msg}"), + } + } +} + +impl std::error::Error for WireError {} + +// ── Internal envelope ──────────────────────────────────────────────────────── + +#[derive(Serialize, Deserialize)] +struct WireEnvelope { + v: u32, + op: serde_json::Value, +} + +// ── Public API ─────────────────────────────────────────────────────────────── + +/// Encode a `SignedOp` into wire bytes. +/// +/// Returns a JSON-encoded versioned envelope. Panics only if `SignedOp`'s +/// serde impl is broken (which would be a programming error, not a runtime +/// condition). +pub fn encode(op: &SignedOp) -> Vec { + let op_value = serde_json::to_value(op).expect("SignedOp serialisation must not fail"); + let envelope = WireEnvelope { + v: WIRE_VERSION, + op: op_value, + }; + serde_json::to_vec(&envelope).expect("WireEnvelope serialisation must not fail") +} + +/// Decode wire bytes into a `SignedOp`. +/// +/// Returns [`WireError::UnknownVersion`] if the version tag is not recognised, +/// or [`WireError::Malformed`] if the bytes are not valid JSON / the envelope +/// structure is wrong / the embedded op fails to deserialise. +pub fn decode(bytes: &[u8]) -> Result { + let envelope: WireEnvelope = + serde_json::from_slice(bytes).map_err(|e| WireError::Malformed(e.to_string()))?; + + match envelope.v { + 1 => { + let op: SignedOp = serde_json::from_value(envelope.op) + .map_err(|e| WireError::Malformed(e.to_string()))?; + Ok(op) + } + unknown => Err(WireError::UnknownVersion(unknown)), + } +} + +// ── Tests ──────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + + use bft_json_crdt::json_crdt::{BaseCrdt, JsonValue, OpState}; + use bft_json_crdt::keypair::make_keypair; + use bft_json_crdt::op::ROOT_ID; + use serde_json::json; + + use crate::crdt_state::PipelineDoc; + + // ── helpers ────────────────────────────────────────────────────────────── + + /// Build a fresh CRDT and return its keypair along with a signed insert op. + fn make_insert_op() -> (BaseCrdt, bft_json_crdt::keypair::Ed25519KeyPair, SignedOp) { + let kp = make_keypair(); + let mut crdt = BaseCrdt::::new(&kp); + let item: JsonValue = json!({ + "story_id": "505_story_wire_test", + "stage": "1_backlog", + "name": "Wire Test", + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + }) + .into(); + let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp); + crdt.apply(op.clone()); + (crdt, kp, op) + } + + // ── round-trip tests (one per op variant) ──────────────────────────────── + + /// Insert op: round-trip through encode → decode. + #[test] + fn roundtrip_insert_op() { + let (_crdt, _kp, op) = make_insert_op(); + let bytes = encode(&op); + let decoded = decode(&bytes).expect("decode must succeed"); + assert_eq!(op, decoded); + } + + /// Update op (LWW register set): round-trip. + #[test] + fn roundtrip_update_op() { + let (mut crdt, kp, _insert) = make_insert_op(); + let update_op = crdt.doc.items[0] + .stage + .set("2_current".to_string()) + .sign(&kp); + crdt.apply(update_op.clone()); + + let bytes = encode(&update_op); + let decoded = decode(&bytes).expect("decode must succeed"); + assert_eq!(update_op, decoded); + } + + /// Delete op: round-trip. + #[test] + fn roundtrip_delete_op() { + let (mut crdt, kp, insert_op) = make_insert_op(); + // Delete the inserted item. + let delete_op = crdt + .doc + .items + .delete(insert_op.inner.id) + .sign(&kp); + crdt.apply(delete_op.clone()); + + let bytes = encode(&delete_op); + let decoded = decode(&bytes).expect("decode must succeed"); + assert_eq!(delete_op, decoded); + } + + /// Op with multiple causal dependencies: round-trip. + #[test] + fn roundtrip_op_with_causal_deps() { + let (mut crdt, kp, _op1) = make_insert_op(); + let op2 = crdt.doc.items[0] + .stage + .set("2_current".to_string()) + .sign(&kp); + crdt.apply(op2.clone()); + + // op3 depends on both _op1 and op2 causally. + let op3 = crdt.doc.items[0] + .name + .set("Updated Name".to_string()) + .sign(&kp); + crdt.apply(op3.clone()); + + let bytes = encode(&op3); + let decoded = decode(&bytes).expect("decode must succeed"); + assert_eq!(op3, decoded); + } + + /// Signed digest is preserved exactly (no lossy base64 round-trip). + #[test] + fn signed_digest_preserved() { + let (_crdt, _kp, op) = make_insert_op(); + let bytes = encode(&op); + let decoded = decode(&bytes).expect("decode must succeed"); + assert_eq!(op.signed_digest, decoded.signed_digest); + } + + // ── version tag tests ───────────────────────────────────────────────────── + + /// Encoded bytes must carry the current version tag. + #[test] + fn encoded_bytes_contain_version_tag() { + let (_crdt, _kp, op) = make_insert_op(); + let bytes = encode(&op); + let text = std::str::from_utf8(&bytes).expect("output is UTF-8 JSON"); + // The envelope must include `"v":1`. + assert!( + text.contains("\"v\":1"), + "expected version tag in wire bytes, got: {text}" + ); + } + + /// Version 0 is rejected with UnknownVersion. + #[test] + fn unknown_version_zero_is_rejected() { + let payload = br#"{"v":0,"op":{}}"#; + assert_eq!(decode(payload), Err(WireError::UnknownVersion(0))); + } + + /// A future version (e.g. 99) is rejected with UnknownVersion. + #[test] + fn unknown_version_future_is_rejected() { + let payload = br#"{"v":99,"op":{}}"#; + assert_eq!(decode(payload), Err(WireError::UnknownVersion(99))); + } + + // ── malformed-bytes tests ───────────────────────────────────────────────── + + /// Completely invalid UTF-8 / non-JSON input is rejected with Malformed. + #[test] + fn malformed_random_bytes_is_rejected() { + let payload = b"\x00\x01\x02\x03garbage"; + match decode(payload) { + Err(WireError::Malformed(_)) => {} + other => panic!("expected Malformed, got {other:?}"), + } + } + + /// Valid JSON but missing the `v` field. + #[test] + fn malformed_missing_version_field() { + let payload = br#"{"op":{}}"#; + match decode(payload) { + Err(WireError::Malformed(_)) => {} + other => panic!("expected Malformed, got {other:?}"), + } + } + + /// Valid envelope version but `op` field is not a valid `SignedOp`. + #[test] + fn malformed_invalid_op_payload() { + let payload = br#"{"v":1,"op":{"totally":"wrong"}}"#; + match decode(payload) { + Err(WireError::Malformed(_)) => {} + other => panic!("expected Malformed, got {other:?}"), + } + } + + /// Empty bytes. + #[test] + fn malformed_empty_bytes() { + match decode(b"") { + Err(WireError::Malformed(_)) => {} + other => panic!("expected Malformed, got {other:?}"), + } + } + + // ── cross-node round-trip ───────────────────────────────────────────────── + + /// Encode on node A, decode on node B, apply to node B's CRDT. + #[test] + fn cross_node_apply_after_decode() { + let (_crdt_a, kp_a, op) = make_insert_op(); + + let wire = encode(&op); + + // Node B decodes and applies. + let kp_b = make_keypair(); + let mut crdt_b = BaseCrdt::::new(&kp_b); + let decoded = decode(&wire).expect("decode must succeed"); + let result = crdt_b.apply(decoded); + assert_eq!(result, OpState::Ok); + assert_eq!(crdt_b.doc.items.view().len(), 1); + + let _ = kp_a; + } + + // ── WireError display ───────────────────────────────────────────────────── + + #[test] + fn wire_error_display_unknown_version() { + let e = WireError::UnknownVersion(42); + assert_eq!(e.to_string(), "unknown wire version 42"); + } + + #[test] + fn wire_error_display_malformed() { + let e = WireError::Malformed("bad json".to_string()); + assert!(e.to_string().contains("malformed wire message")); + } +} diff --git a/server/src/main.rs b/server/src/main.rs index 09231fe3..b3ba071f 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -8,6 +8,7 @@ mod chat; mod config; pub mod crdt_state; pub mod crdt_sync; +pub mod crdt_wire; mod db; mod http; mod io;