huskies: merge 505_story_signedop_wire_codec_for_crdt_sync_between_nodes

This commit is contained in:
dave
2026-04-10 15:31:22 +00:00
parent bfede09fe6
commit 73890c98fa
2 changed files with 319 additions and 0 deletions
+318
View File
@@ -0,0 +1,318 @@
/// Wire codec for `SignedOp` CRDT sync between nodes.
///
/// # Wire Format
///
/// **Chosen format: versioned JSON envelope (v1)**
///
/// Rationale: JSON is already used throughout the codebase for `SignedOp`
/// serialisation (`serde_json` + `serde_with::Bytes` annotations). Switching
/// to a binary format (CBOR / bincode / postcard) would require adding a new
/// dependency and a custom derive macro for the `Bytes` fields. JSON keeps
/// the codec zero-dependency, human-readable in logs, and consistent with the
/// existing `crdt_sync.rs` protocol. A version tag in the envelope ensures
/// we can migrate to a binary format in a future version without breaking
/// peers that are still on v1.
///
/// Each encoded message is a JSON object:
///
/// ```json
/// {"v": 1, "op": { ...SignedOp fields... }}
/// ```
///
/// - `v` — u32 version tag. Currently `1`. Unknown values are rejected with
/// [`WireError::UnknownVersion`] so peers can fail fast instead of
/// silently misinterpreting data.
/// - `op` — the full `SignedOp` serialised via its existing `serde` impl.
/// Binary fields (`signed_digest`, `depends_on`, `id`, …) use
/// `serde_with::Bytes` so they appear as base64 strings in JSON.
///
/// # Upgrading the format
///
/// Bump `WIRE_VERSION` and add a new arm to the `match envelope.v` block in
/// `decode`. Old decoders will return `WireError::UnknownVersion(N)` for
/// the new version, which is the intended failure mode.
use bft_json_crdt::json_crdt::SignedOp;
use serde::{Deserialize, Serialize};
/// The current wire format version.
pub const WIRE_VERSION: u32 = 1;
// ── Error type ──────────────────────────────────────────────────────────────
/// Errors returned by [`decode`].
#[derive(Debug, PartialEq)]
pub enum WireError {
/// The message carries a version tag that this codec does not recognise.
/// The inner value is the version found in the message.
UnknownVersion(u32),
/// The bytes could not be parsed as a valid wire message (malformed JSON,
/// wrong structure, or the embedded `SignedOp` failed to deserialise).
Malformed(String),
}
impl std::fmt::Display for WireError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
WireError::UnknownVersion(v) => write!(f, "unknown wire version {v}"),
WireError::Malformed(msg) => write!(f, "malformed wire message: {msg}"),
}
}
}
impl std::error::Error for WireError {}
// ── Internal envelope ────────────────────────────────────────────────────────
#[derive(Serialize, Deserialize)]
struct WireEnvelope {
v: u32,
op: serde_json::Value,
}
// ── Public API ───────────────────────────────────────────────────────────────
/// Encode a `SignedOp` into wire bytes.
///
/// Returns a JSON-encoded versioned envelope. Panics only if `SignedOp`'s
/// serde impl is broken (which would be a programming error, not a runtime
/// condition).
pub fn encode(op: &SignedOp) -> Vec<u8> {
let op_value = serde_json::to_value(op).expect("SignedOp serialisation must not fail");
let envelope = WireEnvelope {
v: WIRE_VERSION,
op: op_value,
};
serde_json::to_vec(&envelope).expect("WireEnvelope serialisation must not fail")
}
/// Decode wire bytes into a `SignedOp`.
///
/// Returns [`WireError::UnknownVersion`] if the version tag is not recognised,
/// or [`WireError::Malformed`] if the bytes are not valid JSON / the envelope
/// structure is wrong / the embedded op fails to deserialise.
pub fn decode(bytes: &[u8]) -> Result<SignedOp, WireError> {
let envelope: WireEnvelope =
serde_json::from_slice(bytes).map_err(|e| WireError::Malformed(e.to_string()))?;
match envelope.v {
1 => {
let op: SignedOp = serde_json::from_value(envelope.op)
.map_err(|e| WireError::Malformed(e.to_string()))?;
Ok(op)
}
unknown => Err(WireError::UnknownVersion(unknown)),
}
}
// ── Tests ────────────────────────────────────────────────────────────────────
#[cfg(test)]
mod tests {
use super::*;
use bft_json_crdt::json_crdt::{BaseCrdt, JsonValue, OpState};
use bft_json_crdt::keypair::make_keypair;
use bft_json_crdt::op::ROOT_ID;
use serde_json::json;
use crate::crdt_state::PipelineDoc;
// ── helpers ──────────────────────────────────────────────────────────────
/// Build a fresh CRDT and return its keypair along with a signed insert op.
fn make_insert_op() -> (BaseCrdt<PipelineDoc>, bft_json_crdt::keypair::Ed25519KeyPair, SignedOp) {
let kp = make_keypair();
let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);
let item: JsonValue = json!({
"story_id": "505_story_wire_test",
"stage": "1_backlog",
"name": "Wire Test",
"agent": "",
"retry_count": 0.0,
"blocked": false,
"depends_on": "",
})
.into();
let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp);
crdt.apply(op.clone());
(crdt, kp, op)
}
// ── round-trip tests (one per op variant) ────────────────────────────────
/// Insert op: round-trip through encode → decode.
#[test]
fn roundtrip_insert_op() {
let (_crdt, _kp, op) = make_insert_op();
let bytes = encode(&op);
let decoded = decode(&bytes).expect("decode must succeed");
assert_eq!(op, decoded);
}
/// Update op (LWW register set): round-trip.
#[test]
fn roundtrip_update_op() {
let (mut crdt, kp, _insert) = make_insert_op();
let update_op = crdt.doc.items[0]
.stage
.set("2_current".to_string())
.sign(&kp);
crdt.apply(update_op.clone());
let bytes = encode(&update_op);
let decoded = decode(&bytes).expect("decode must succeed");
assert_eq!(update_op, decoded);
}
/// Delete op: round-trip.
#[test]
fn roundtrip_delete_op() {
let (mut crdt, kp, insert_op) = make_insert_op();
// Delete the inserted item.
let delete_op = crdt
.doc
.items
.delete(insert_op.inner.id)
.sign(&kp);
crdt.apply(delete_op.clone());
let bytes = encode(&delete_op);
let decoded = decode(&bytes).expect("decode must succeed");
assert_eq!(delete_op, decoded);
}
/// Op with multiple causal dependencies: round-trip.
#[test]
fn roundtrip_op_with_causal_deps() {
let (mut crdt, kp, _op1) = make_insert_op();
let op2 = crdt.doc.items[0]
.stage
.set("2_current".to_string())
.sign(&kp);
crdt.apply(op2.clone());
// op3 depends on both _op1 and op2 causally.
let op3 = crdt.doc.items[0]
.name
.set("Updated Name".to_string())
.sign(&kp);
crdt.apply(op3.clone());
let bytes = encode(&op3);
let decoded = decode(&bytes).expect("decode must succeed");
assert_eq!(op3, decoded);
}
/// Signed digest is preserved exactly (no lossy base64 round-trip).
#[test]
fn signed_digest_preserved() {
let (_crdt, _kp, op) = make_insert_op();
let bytes = encode(&op);
let decoded = decode(&bytes).expect("decode must succeed");
assert_eq!(op.signed_digest, decoded.signed_digest);
}
// ── version tag tests ─────────────────────────────────────────────────────
/// Encoded bytes must carry the current version tag.
#[test]
fn encoded_bytes_contain_version_tag() {
let (_crdt, _kp, op) = make_insert_op();
let bytes = encode(&op);
let text = std::str::from_utf8(&bytes).expect("output is UTF-8 JSON");
// The envelope must include `"v":1`.
assert!(
text.contains("\"v\":1"),
"expected version tag in wire bytes, got: {text}"
);
}
/// Version 0 is rejected with UnknownVersion.
#[test]
fn unknown_version_zero_is_rejected() {
let payload = br#"{"v":0,"op":{}}"#;
assert_eq!(decode(payload), Err(WireError::UnknownVersion(0)));
}
/// A future version (e.g. 99) is rejected with UnknownVersion.
#[test]
fn unknown_version_future_is_rejected() {
let payload = br#"{"v":99,"op":{}}"#;
assert_eq!(decode(payload), Err(WireError::UnknownVersion(99)));
}
// ── malformed-bytes tests ─────────────────────────────────────────────────
/// Completely invalid UTF-8 / non-JSON input is rejected with Malformed.
#[test]
fn malformed_random_bytes_is_rejected() {
let payload = b"\x00\x01\x02\x03garbage";
match decode(payload) {
Err(WireError::Malformed(_)) => {}
other => panic!("expected Malformed, got {other:?}"),
}
}
/// Valid JSON but missing the `v` field.
#[test]
fn malformed_missing_version_field() {
let payload = br#"{"op":{}}"#;
match decode(payload) {
Err(WireError::Malformed(_)) => {}
other => panic!("expected Malformed, got {other:?}"),
}
}
/// Valid envelope version but `op` field is not a valid `SignedOp`.
#[test]
fn malformed_invalid_op_payload() {
let payload = br#"{"v":1,"op":{"totally":"wrong"}}"#;
match decode(payload) {
Err(WireError::Malformed(_)) => {}
other => panic!("expected Malformed, got {other:?}"),
}
}
/// Empty bytes.
#[test]
fn malformed_empty_bytes() {
match decode(b"") {
Err(WireError::Malformed(_)) => {}
other => panic!("expected Malformed, got {other:?}"),
}
}
// ── cross-node round-trip ─────────────────────────────────────────────────
/// Encode on node A, decode on node B, apply to node B's CRDT.
#[test]
fn cross_node_apply_after_decode() {
let (_crdt_a, kp_a, op) = make_insert_op();
let wire = encode(&op);
// Node B decodes and applies.
let kp_b = make_keypair();
let mut crdt_b = BaseCrdt::<PipelineDoc>::new(&kp_b);
let decoded = decode(&wire).expect("decode must succeed");
let result = crdt_b.apply(decoded);
assert_eq!(result, OpState::Ok);
assert_eq!(crdt_b.doc.items.view().len(), 1);
let _ = kp_a;
}
// ── WireError display ─────────────────────────────────────────────────────
#[test]
fn wire_error_display_unknown_version() {
let e = WireError::UnknownVersion(42);
assert_eq!(e.to_string(), "unknown wire version 42");
}
#[test]
fn wire_error_display_malformed() {
let e = WireError::Malformed("bad json".to_string());
assert!(e.to_string().contains("malformed wire message"));
}
}
+1
View File
@@ -8,6 +8,7 @@ mod chat;
mod config;
pub mod crdt_state;
pub mod crdt_sync;
pub mod crdt_wire;
mod db;
mod http;
mod io;