//! CRDT snapshot compaction with cross-node coordination.
//!
//! This module implements full CRDT state snapshots for compacting the op journal.
//! When the op log grows beyond a configurable threshold (default: 10,000 ops) or
//! on a scheduled tick (default: weekly), the leader node generates a snapshot of
//! the current CRDT state and coordinates with all alive peers to discard ops that
//! precede the snapshot.
//!
//! # Attribution preservation
//!
//! Compaction preserves the full attribution chain via an "op manifest" — a
//! compact record of `(author, story_id, signature, seq)` tuples for every op
//! in the compacted range. This allows incident-response forensics to
//! reconstruct who did what to which story, even after the op payloads are
//! discarded.
//!
//! # Wire messages
//!
//! - `{"type": "snapshot", "at_seq": <u64>, "state": <ops>}`
//! - `{"type": "snapshot_ack", "at_seq": <u64>}`
//!
//! # Leader selection
//!
//! The alive peer with the lowest `hash(node_id)` generates the snapshot,
//! mirroring the claim-priority shape from story 634.

use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Mutex;
use std::sync::OnceLock;

use crate::crdt_state;

// ── Configuration ──────────────────────────────────────────────────────

/// Default threshold: snapshot fires when ALL_OPS.len() exceeds this.
pub const DEFAULT_SNAPSHOT_THRESHOLD: usize = 10_000;

/// Default scheduled interval in seconds (1 week).
pub const DEFAULT_SNAPSHOT_INTERVAL_SECS: u64 = 7 * 24 * 3600;

/// Configurable snapshot threshold (set at startup, defaults to [`DEFAULT_SNAPSHOT_THRESHOLD`]).
static SNAPSHOT_THRESHOLD: OnceLock<usize> = OnceLock::new();

/// Return the current snapshot threshold.
pub fn snapshot_threshold() -> usize {
    SNAPSHOT_THRESHOLD
        .get()
        .copied()
        .unwrap_or(DEFAULT_SNAPSHOT_THRESHOLD)
}

/// Set the snapshot threshold (must be called before the first trigger check).
pub fn set_snapshot_threshold(threshold: usize) {
    let _ = SNAPSHOT_THRESHOLD.set(threshold);
}

// ── Wire message types ─────────────────────────────────────────────────

/// A single entry in the attribution manifest — preserves forensic metadata
/// for each op that existed before the snapshot point.
///
/// This allows reconstructing who did what to which story, even after the
/// op payloads (state values) are discarded during compaction.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct OpManifestEntry {
    /// Hex-encoded Ed25519 public key of the op's author.
    pub author: String,
    /// The story_id this op pertains to (extracted from the op's path/content).
    pub story_id: String,
    /// Hex-encoded Ed25519 signature proving the author created this op.
    pub sig: String,
    /// Lamport sequence number of the op.
    pub seq: u64,
}

/// A CRDT state snapshot used for compaction.
///
/// Contains the serialized CRDT state at a specific sequence point, plus an
/// attribution manifest that preserves the forensic chain for all ops up to
/// that point.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct Snapshot {
    /// The sequence number at which this snapshot was taken.
    /// All ops with `seq < at_seq` can be discarded after compaction.
    pub at_seq: u64,
    /// The serialized CRDT state (all ops as JSON strings).
    pub state: Vec<String>,
    /// Attribution manifest: one entry per op in the compacted range.
    /// Preserves author, story_id, signature, and seq for forensics.
    pub op_manifest: Vec<OpManifestEntry>,
}

/// Acknowledgement that a peer has received and applied a snapshot.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct SnapshotAck {
    /// The sequence number being acknowledged — must match the snapshot's `at_seq`.
    pub at_seq: u64,
}

/// Wire messages for the snapshot protocol, exchanged as text frames during
/// the CRDT sync streaming phase.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum SnapshotMessage {
    /// Leader broadcasts the snapshot to all peers.
    Snapshot(Snapshot),
    /// Peer acknowledges receipt and application of the snapshot.
    SnapshotAck(SnapshotAck),
}
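// For orientation, the frames produced by `SnapshotMessage` look roughly like
// the following (values are illustrative; serde's internal tagging places the
// inner struct's fields beside the "type" tag, as the wire-format tests below
// also assert):
//
//   {"type":"snapshot","at_seq":42,"state":["<op json>", "..."],
//    "op_manifest":[{"author":"<hex pubkey>","story_id":"636_x","sig":"<hex>","seq":1}]}
//   {"type":"snapshot_ack","at_seq":42}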
// ── Snapshot state ──────────────────────────────────────────────────────

/// In-memory snapshot state for this node.
struct SnapshotState {
    /// The most recent completed snapshot (if any).
    latest_snapshot: Option<Snapshot>,
    /// Pending acks for an in-progress snapshot coordination.
    /// Maps node_id → whether they've acked.
    pending_acks: HashMap<String, bool>,
    /// The at_seq of the in-progress snapshot (if any).
    pending_at_seq: Option<u64>,
}

#[cfg(not(test))]
static SNAPSHOT_STATE: OnceLock<Mutex<SnapshotState>> = OnceLock::new();

#[cfg(test)]
thread_local! {
    /// Per-thread snapshot state for test isolation. Each test thread gets its
    /// own SnapshotState so parallel tests do not leak into each other's
    /// snapshot/coordination history.
    static SNAPSHOT_STATE_TL: OnceLock<Mutex<SnapshotState>> = const { OnceLock::new() };
}

/// Read access to the snapshot state.
///
/// In production this returns the global `SNAPSHOT_STATE`. In `cfg(test)`
/// each thread sees its own thread-local state, so parallel tests do not
/// share `SnapshotState`.
fn snapshot_state() -> Option<&'static Mutex<SnapshotState>> {
    #[cfg(not(test))]
    {
        SNAPSHOT_STATE.get()
    }
    #[cfg(test)]
    {
        let ptr =
            SNAPSHOT_STATE_TL.with(|lock| lock as *const OnceLock<Mutex<SnapshotState>>);
        // SAFETY: the thread-local lives as long as the thread. We only need
        // 'static for the return type; consumers never outlive the spawning
        // test thread.
        unsafe { &*ptr }.get()
    }
}

/// Initialise snapshot state. Safe to call multiple times.
///
/// In production: idempotent via `OnceLock`. In `cfg(test)`: initialises the
/// per-thread state on first call; subsequent calls on the same thread are
/// no-ops.
pub fn init() {
    let value = || {
        Mutex::new(SnapshotState {
            latest_snapshot: None,
            pending_acks: HashMap::new(),
            pending_at_seq: None,
        })
    };
    #[cfg(not(test))]
    {
        let _ = SNAPSHOT_STATE.set(value());
    }
    #[cfg(test)]
    {
        SNAPSHOT_STATE_TL.with(|lock| {
            let _ = lock.set(value());
        });
    }
}

/// Return the most recent completed snapshot, if any.
pub fn latest_snapshot() -> Option<Snapshot> {
    snapshot_state()?.lock().ok()?.latest_snapshot.clone()
}

// ── Leader selection ────────────────────────────────────────────────────

/// Compute the snapshot-leader priority hash for a node.
///
/// Uses the same SHA-256 approach as story 634's claim priority, but hashes
/// only the `node_id` (not a story_id) since snapshot leadership is global.
fn leader_hash(node_id: &str) -> u64 {
    use sha2::{Digest, Sha256};
    let mut hasher = Sha256::new();
    hasher.update(node_id.as_bytes());
    let digest = hasher.finalize();
    u64::from_be_bytes(digest[..8].try_into().expect("sha256 is 32 bytes"))
}
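// Illustrative only: `leader_hash` is a pure function of the node id, so every
// node derives the same ordering without any extra coordination. A caller could
// rank a (made-up) peer set like this and treat the first entry as the
// would-be leader:
//
//     let mut ranked = vec!["node_a".to_string(), "node_b".to_string()];
//     ranked.sort_by_key(|id| leader_hash(id));
//     // ranked[0] now holds the lowest hash, i.e. the snapshot leader.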
/// Determine whether this node is the snapshot leader among alive peers.
///
/// The alive peer with the **lowest** `hash(node_id)` is the leader.
/// Returns `true` if `self_node_id` has a strictly lower hash than all
/// other alive peers. When there are no other alive peers (single-node
/// cluster), the result is always `true`.
pub fn is_snapshot_leader(self_node_id: &str, alive_peer_node_ids: &[String]) -> bool {
    let my_hash = leader_hash(self_node_id);
    for peer_id in alive_peer_node_ids {
        if peer_id == self_node_id {
            continue;
        }
        if leader_hash(peer_id) <= my_hash {
            return false;
        }
    }
    true
}

// ── Trigger logic ───────────────────────────────────────────────────────

/// Check whether a snapshot should be triggered based on the current op count.
///
/// Returns `true` when `ALL_OPS.len() > threshold` (configurable, default 10,000).
pub fn should_trigger_by_count() -> bool {
    let count = crdt_state::all_ops_json().map(|ops| ops.len()).unwrap_or(0);
    count > snapshot_threshold()
}

// ── Snapshot generation ─────────────────────────────────────────────────

/// Generate a snapshot of the current CRDT state.
///
/// The snapshot contains:
/// 1. All ops as serialized JSON (the full CRDT state).
/// 2. An attribution manifest preserving author, story_id, sig, and seq
///    for every op — ensuring forensic traceability survives compaction.
///
/// The `at_seq` is the highest sequence number across all ops.
pub fn generate_snapshot() -> Option<Snapshot> {
    let all_ops = crdt_state::all_ops_json()?;
    if all_ops.is_empty() {
        return None;
    }

    let mut max_seq: u64 = 0;
    let mut manifest = Vec::with_capacity(all_ops.len());

    for op_json in &all_ops {
        if let Ok(signed_op) =
            serde_json::from_str::<bft_json_crdt::json_crdt::SignedOp>(op_json)
        {
            let seq = signed_op.inner.seq;
            if seq > max_seq {
                max_seq = seq;
            }
            // Extract story_id from the op's content if available.
            let story_id = extract_story_id_from_op(&signed_op);
            manifest.push(OpManifestEntry {
                author: crdt_state::hex::encode(&signed_op.author()),
                story_id,
                sig: crdt_state::hex::encode(&signed_op.signed_digest),
                seq,
            });
        }
    }

    Some(Snapshot {
        at_seq: max_seq,
        state: all_ops,
        op_manifest: manifest,
    })
}

/// Extract the story_id from a SignedOp's content, if it contains one.
///
/// Ops that target pipeline items contain a JSON object with a `story_id`
/// field. For ops that don't (e.g. node presence), returns an empty string.
fn extract_story_id_from_op(op: &bft_json_crdt::json_crdt::SignedOp) -> String {
    // Try to extract from the op's path — the second segment often contains
    // the list index which maps to a story. However, the most reliable way
    // is to look at the content if it's an insert op.
    if let Some(bft_json_crdt::json_crdt::JsonValue::Object(map)) = &op.inner.content
        && let Some(bft_json_crdt::json_crdt::JsonValue::String(sid)) = map.get("story_id")
    {
        return sid.clone();
    }
    // For field-update ops (LWW set), the path tells us which item, but not
    // the story_id directly. Return empty — the manifest still preserves
    // author + sig + seq for forensic correlation.
    String::new()
}

// ── Coordination ────────────────────────────────────────────────────────

/// Begin snapshot coordination as the leader.
///
/// Records which alive peers need to ack and stores the pending snapshot.
/// Returns the snapshot to broadcast to peers.
pub fn begin_coordination(
    snapshot: Snapshot,
    alive_peer_node_ids: &[String],
    self_node_id: &str,
) -> Option<Snapshot> {
    let state = snapshot_state()?;
    let mut s = state.lock().ok()?;

    let mut pending = HashMap::new();
    for peer_id in alive_peer_node_ids {
        if peer_id != self_node_id {
            pending.insert(peer_id.clone(), false);
        }
    }
    s.pending_at_seq = Some(snapshot.at_seq);
    s.pending_acks = pending;
    Some(snapshot)
}
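// Leader-side flow, sketched for orientation (not wired up here): the
// surrounding sync loop is assumed to supply `self_node_id` and `alive_peers`
// from its own presence tracking and to own the broadcast channel.
//
//     if should_trigger_by_count() && is_snapshot_leader(&self_node_id, &alive_peers) {
//         if let Some(snapshot) = generate_snapshot() {
//             if let Some(snap) = begin_coordination(snapshot, &alive_peers, &self_node_id) {
//                 // broadcast SnapshotMessage::Snapshot(snap) to every alive peer, then
//                 // collect acks via record_ack(); call abort_coordination() on timeout
//                 // or if a peer drops mid-coordination.
//             }
//         }
//     }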
/// Record a snapshot ack from a peer.
///
/// Returns `true` if quorum has been reached (all alive peers have acked).
pub fn record_ack(node_id: &str, at_seq: u64) -> bool {
    let Some(state) = snapshot_state() else {
        return false;
    };
    let Ok(mut s) = state.lock() else {
        return false;
    };

    // Verify the ack matches the pending snapshot.
    if s.pending_at_seq != Some(at_seq) {
        return false;
    }
    if let Some(acked) = s.pending_acks.get_mut(node_id) {
        *acked = true;
    }
    // Check if all peers have acked (quorum = all alive peers).
    s.pending_acks.values().all(|&v| v)
}

/// Abort the current pending snapshot coordination.
///
/// Called when a peer goes offline mid-coordination or quorum times out.
pub fn abort_coordination() {
    if let Some(state) = snapshot_state()
        && let Ok(mut s) = state.lock()
    {
        s.pending_at_seq = None;
        s.pending_acks.clear();
    }
}

/// Check whether there is a pending snapshot coordination in progress.
pub fn has_pending_coordination() -> bool {
    snapshot_state()
        .and_then(|s| s.lock().ok())
        .map(|s| s.pending_at_seq.is_some())
        .unwrap_or(false)
}

/// Return the list of peers that have NOT yet acked the pending snapshot.
pub fn unacked_peers() -> Vec<String> {
    snapshot_state()
        .and_then(|s| s.lock().ok())
        .map(|s| {
            s.pending_acks
                .iter()
                .filter(|&(_, &acked)| !acked)
                .map(|(id, _)| id.clone())
                .collect()
        })
        .unwrap_or_default()
}

// ── Compaction ──────────────────────────────────────────────────────────

/// Apply compaction: prune the op journal up to the snapshot point.
///
/// After successful quorum, the leader (and each peer upon receiving the
/// snapshot) drops all ops with `seq < at_seq` from its `ALL_OPS` journal and
/// rebuilds the vector clock from the ops that remain.
///
/// The snapshot itself (including its `op_manifest`) is stored as the latest
/// snapshot for new-node onboarding and forensic queries.
///
/// Returns `true` if compaction was applied successfully.
pub fn apply_compaction(snapshot: Snapshot) -> bool {
    // Store the snapshot as the latest for future new-node onboarding.
    if let Some(state) = snapshot_state()
        && let Ok(mut s) = state.lock()
    {
        s.latest_snapshot = Some(snapshot.clone());
        s.pending_at_seq = None;
        s.pending_acks.clear();
    }

    // Compact ALL_OPS: ops with `seq < at_seq` are already captured by the
    // snapshot (and their attribution by `op_manifest`), so only ops with
    // `seq >= at_seq` are kept in the local journal.
    if let Some(all_ops) = crdt_state::ALL_OPS.get()
        && let Ok(mut v) = all_ops.lock()
    {
        let mut kept_ops = Vec::new();
        let mut pruned_count = 0usize;
        for op_json in v.iter() {
            if let Ok(signed_op) =
                serde_json::from_str::<bft_json_crdt::json_crdt::SignedOp>(op_json)
            {
                if signed_op.inner.seq >= snapshot.at_seq {
                    kept_ops.push(op_json.clone());
                } else {
                    pruned_count += 1;
                }
            } else {
                // Keep unparseable ops to avoid data loss.
                kept_ops.push(op_json.clone());
            }
        }
        *v = kept_ops;

        // Rebuild vector clock from remaining ops.
        if let Some(vc) = crdt_state::VECTOR_CLOCK.get()
            && let Ok(mut clock) = vc.lock()
        {
            clock.clear();
            for op_json in v.iter() {
                if let Ok(signed_op) =
                    serde_json::from_str::<bft_json_crdt::json_crdt::SignedOp>(op_json)
                {
                    let author_hex = crdt_state::hex::encode(&signed_op.author());
                    *clock.entry(author_hex).or_insert(0) += 1;
                }
            }
        }

        crate::slog!(
            "[crdt-snapshot] Compaction applied: pruned {pruned_count} ops, kept {} ops",
            v.len()
        );
        return true;
    }
    false
}
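// Peer-side flow, sketched for orientation (not wired up here): `frame` is an
// incoming text frame, and `peer_node_id` plus the reply channel are assumed
// to come from the surrounding sync transport.
//
//     match serde_json::from_str::<SnapshotMessage>(&frame) {
//         Ok(SnapshotMessage::Snapshot(snap)) => {
//             let at_seq = snap.at_seq;
//             if apply_compaction(snap) {
//                 // reply with SnapshotMessage::SnapshotAck(SnapshotAck { at_seq })
//             }
//         }
//         Ok(SnapshotMessage::SnapshotAck(ack)) => {
//             // leader side: if record_ack(&peer_node_id, ack.at_seq) returns true,
//             // quorum is reached and the leader applies the compaction locally too.
//         }
//         Err(_) => { /* not a snapshot frame; handled by the existing sync path */ }
//     }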
/// Retrieve the op manifest from the latest snapshot for forensic queries.
///
/// Returns `None` if no snapshot has been taken yet.
pub fn latest_op_manifest() -> Option<Vec<OpManifestEntry>> {
    latest_snapshot().map(|s| s.op_manifest)
}

/// Query the op manifest for a specific story's attribution chain.
///
/// Returns all manifest entries where `story_id` matches the query.
pub fn query_attribution(story_id: &str) -> Vec<OpManifestEntry> {
    latest_op_manifest()
        .unwrap_or_default()
        .into_iter()
        .filter(|entry| entry.story_id == story_id)
        .collect()
}

// ── Tests ───────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use super::*;
    use bft_json_crdt::json_crdt::{BaseCrdt, JsonValue, SignedOp};
    use bft_json_crdt::keypair::make_keypair;
    use bft_json_crdt::op::ROOT_ID;
    use serde_json::json;
    use crate::crdt_state::PipelineDoc;

    // ── Wire message tests ──────────────────────────────────────────────

    /// Snapshot wire message serialization round-trip.
    #[test]
    fn snapshot_message_serialization_roundtrip() {
        let snapshot = Snapshot {
            at_seq: 42,
            state: vec!["op1".to_string(), "op2".to_string()],
            op_manifest: vec![OpManifestEntry {
                author: "aabbcc".to_string(),
                story_id: "100_story_test".to_string(),
                sig: "deadbeef".to_string(),
                seq: 1,
            }],
        };
        let msg = SnapshotMessage::Snapshot(snapshot.clone());
        let json_str = serde_json::to_string(&msg).unwrap();
        assert!(json_str.contains(r#""type":"snapshot""#));

        let deserialized: SnapshotMessage = serde_json::from_str(&json_str).unwrap();
        match deserialized {
            SnapshotMessage::Snapshot(s) => {
                assert_eq!(s.at_seq, 42);
                assert_eq!(s.state.len(), 2);
                assert_eq!(s.op_manifest.len(), 1);
                assert_eq!(s.op_manifest[0].author, "aabbcc");
            }
            _ => panic!("Expected Snapshot"),
        }
    }

    /// Snapshot ack wire message serialization round-trip.
    #[test]
    fn snapshot_ack_message_serialization_roundtrip() {
        let msg = SnapshotMessage::SnapshotAck(SnapshotAck { at_seq: 42 });
        let json_str = serde_json::to_string(&msg).unwrap();
        assert!(json_str.contains(r#""type":"snapshot_ack""#));

        let deserialized: SnapshotMessage = serde_json::from_str(&json_str).unwrap();
        match deserialized {
            SnapshotMessage::SnapshotAck(ack) => {
                assert_eq!(ack.at_seq, 42);
            }
            _ => panic!("Expected SnapshotAck"),
        }
    }

    /// Snapshot wire format matches the AC spec exactly.
    #[test]
    fn snapshot_wire_format_matches_spec() {
        let snapshot = Snapshot {
            at_seq: 100,
            state: vec!["{}".to_string()],
            op_manifest: vec![],
        };
        let msg = SnapshotMessage::Snapshot(snapshot);
        let json_str = serde_json::to_string(&msg).unwrap();

        // Must contain "type", "at_seq", "state" fields per AC1.
        let parsed: serde_json::Value = serde_json::from_str(&json_str).unwrap();
        assert_eq!(parsed["type"], "snapshot");
        assert_eq!(parsed["at_seq"], 100);
        assert!(parsed["state"].is_array());
    }

    /// SnapshotAck wire format matches the AC spec exactly.
    #[test]
    fn snapshot_ack_wire_format_matches_spec() {
        let msg = SnapshotMessage::SnapshotAck(SnapshotAck { at_seq: 55 });
        let json_str = serde_json::to_string(&msg).unwrap();

        let parsed: serde_json::Value = serde_json::from_str(&json_str).unwrap();
        assert_eq!(parsed["type"], "snapshot_ack");
        assert_eq!(parsed["at_seq"], 55);
    }

    // ── Leader selection tests ──────────────────────────────────────────

    /// Single node is always leader.
    #[test]
    fn single_node_is_always_leader() {
        assert!(is_snapshot_leader("node_a", &[]));
    }

    /// Leader is the node with the lowest hash.
    #[test]
    fn leader_is_lowest_hash_node() {
        let nodes = vec![
            "node_alpha".to_string(),
            "node_beta".to_string(),
            "node_gamma".to_string(),
        ];
        // Find which node has the lowest hash.
        let mut lowest_node = &nodes[0];
        let mut lowest_hash = leader_hash(&nodes[0]);
        for node in &nodes[1..] {
            let h = leader_hash(node);
            if h < lowest_hash {
                lowest_hash = h;
                lowest_node = node;
            }
        }

        // Only the lowest-hash node should be leader.
        for node in &nodes {
            let is_leader = is_snapshot_leader(node, &nodes);
            if node == lowest_node {
                assert!(is_leader, "{node} should be leader (lowest hash)");
            } else {
                assert!(!is_leader, "{node} should NOT be leader");
            }
        }
    }

    /// Leader selection is deterministic.
    #[test]
    fn leader_selection_is_deterministic() {
        let peers = vec!["a".to_string(), "b".to_string(), "c".to_string()];
        let result1 = is_snapshot_leader("a", &peers);
        let result2 = is_snapshot_leader("a", &peers);
        assert_eq!(result1, result2);
    }

    // ── Trigger logic tests ─────────────────────────────────────────────

    /// Threshold defaults to [`DEFAULT_SNAPSHOT_THRESHOLD`] when not configured.
    #[test]
    fn snapshot_threshold_default() {
        // When not set, defaults to DEFAULT_SNAPSHOT_THRESHOLD.
        assert_eq!(snapshot_threshold(), DEFAULT_SNAPSHOT_THRESHOLD);
    }

    // ── Coordination tests ──────────────────────────────────────────────

    /// Record ack returns true when all peers have acked.
    #[test]
    fn coordination_quorum_reached() {
        init();
        let snapshot = Snapshot {
            at_seq: 10,
            state: vec![],
            op_manifest: vec![],
        };
        let peers = vec![
            "self".to_string(),
            "peer_a".to_string(),
            "peer_b".to_string(),
        ];
        begin_coordination(snapshot, &peers, "self");

        assert!(!record_ack("peer_a", 10));
        assert!(record_ack("peer_b", 10));
    }

    /// Ack for wrong at_seq is rejected.
    #[test]
    fn coordination_ack_wrong_seq_rejected() {
        init();
        let snapshot = Snapshot {
            at_seq: 20,
            state: vec![],
            op_manifest: vec![],
        };
        begin_coordination(snapshot, &["self".to_string(), "peer_a".to_string()], "self");
        assert!(!record_ack("peer_a", 999));
    }

    /// Abort clears pending state.
    #[test]
    fn coordination_abort_clears_state() {
        init();
        let snapshot = Snapshot {
            at_seq: 30,
            state: vec![],
            op_manifest: vec![],
        };
        begin_coordination(snapshot, &["self".to_string(), "peer_a".to_string()], "self");
        assert!(has_pending_coordination());
        abort_coordination();
        assert!(!has_pending_coordination());
    }

    /// Unacked peers are reported correctly.
    #[test]
    fn unacked_peers_reported() {
        init();
        let snapshot = Snapshot {
            at_seq: 40,
            state: vec![],
            op_manifest: vec![],
        };
        begin_coordination(
            snapshot,
            &[
                "self".to_string(),
                "peer_a".to_string(),
                "peer_b".to_string(),
            ],
            "self",
        );
        let unacked = unacked_peers();
        assert_eq!(unacked.len(), 2);

        record_ack("peer_a", 40);
        let unacked = unacked_peers();
        assert_eq!(unacked.len(), 1);
        assert_eq!(unacked[0], "peer_b");
    }

    // ── Snapshot generation tests ───────────────────────────────────────

    /// Snapshot generation from ops includes attribution manifest.
    #[test]
    fn snapshot_generation_includes_manifest() {
        crdt_state::init_for_test();

        // Write some items to populate ALL_OPS.
        crdt_state::write_item(
            "636_test_a",
            "1_backlog",
            Some("Test A"),
            None, None, None, None, None, None, None,
        );
        crdt_state::write_item(
            "636_test_b",
            "2_current",
            Some("Test B"),
            None, None, None, None, None, None, None,
        );

        let snapshot = generate_snapshot();
        assert!(snapshot.is_some());
        let snapshot = snapshot.unwrap();
        assert!(snapshot.at_seq > 0);
        assert!(!snapshot.state.is_empty());
        assert!(!snapshot.op_manifest.is_empty());

        // Every manifest entry must have a non-empty author and sig.
        for entry in &snapshot.op_manifest {
            assert!(!entry.author.is_empty(), "author must not be empty");
            assert!(!entry.sig.is_empty(), "sig must not be empty");
        }
    }

    /// Attribution can be queried by story_id after snapshot.
    #[test]
    fn attribution_query_by_story_id() {
        crdt_state::init_for_test();
        init();

        crdt_state::write_item(
            "636_attrib_test",
            "1_backlog",
            Some("Attribution Test"),
            None, None, None, None, None, None, None,
        );

        let snapshot = generate_snapshot().unwrap();

        // Store as latest.
        if let Some(state) = snapshot_state()
            && let Ok(mut s) = state.lock()
        {
            s.latest_snapshot = Some(snapshot.clone());
        }

        let attrib = query_attribution("636_attrib_test");
        // Insert ops for the story should appear in the manifest.
        let has_story = attrib.iter().any(|e| e.story_id == "636_attrib_test");
        assert!(has_story, "attribution must include ops for the story");
    }

    // ── Compaction tests ────────────────────────────────────────────────

    /// After compaction, ALL_OPS size is reduced.
    #[test]
    fn compaction_reduces_ops() {
        crdt_state::init_for_test();
        init();

        // Write several items.
        for i in 0..5 {
            crdt_state::write_item(
                &format!("636_compact_{i}"),
                "1_backlog",
                Some(&format!("Item {i}")),
                None, None, None, None, None, None, None,
            );
        }

        let ops_before = crdt_state::all_ops_json().unwrap().len();
        assert!(ops_before >= 5);

        // Generate a snapshot — at_seq is the max seq across all ops.
        let snapshot = generate_snapshot().unwrap();
        let result = apply_compaction(snapshot);
        assert!(result);

        // After compaction, ops with seq < at_seq are gone, but ops with
        // seq >= at_seq remain (which may be 0 or 1).
        let ops_after = crdt_state::all_ops_json().unwrap().len();
        // At minimum, some pruning should occur if at_seq > some op seqs.
        assert!(
            ops_after <= ops_before,
            "ops_after ({ops_after}) should be <= ops_before ({ops_before})"
        );
    }

    /// Latest snapshot is available after compaction.
    #[test]
    fn latest_snapshot_available_after_compaction() {
        crdt_state::init_for_test();
        init();

        crdt_state::write_item(
            "636_latest_test",
            "1_backlog",
            Some("Latest Test"),
            None, None, None, None, None, None, None,
        );

        let snapshot = generate_snapshot().unwrap();
        let at_seq = snapshot.at_seq;
        apply_compaction(snapshot);

        let latest = latest_snapshot();
        assert!(latest.is_some());
        assert_eq!(latest.unwrap().at_seq, at_seq);
    }

    // ── Integration tests: 3-node compaction ────────────────────────────

    /// Integration test: 3 nodes, force compaction, verify all converge on
    /// the same `at_seq` and pruned ops disappear from each node's log.
    #[test]
    fn three_node_compaction_convergence() {
        use bft_json_crdt::json_crdt::BaseCrdt;

        // Simulate 3 independent nodes.
        let kp_a = make_keypair();
        let kp_b = make_keypair();
        let kp_c = make_keypair();
        let mut crdt_a = BaseCrdt::<PipelineDoc>::new(&kp_a);
        let mut crdt_b = BaseCrdt::<PipelineDoc>::new(&kp_b);
        let mut crdt_c = BaseCrdt::<PipelineDoc>::new(&kp_c);

        // Node A creates items.
        let mut all_ops_json: Vec<String> = Vec::new();
        for i in 0..10u32 {
            let item: JsonValue = json!({
                "story_id": format!("636_3node_{i}"),
                "stage": "1_backlog",
                "name": format!("Item {i}"),
                "agent": "",
                "retry_count": 0.0,
                "blocked": false,
                "depends_on": "",
                "claimed_by": "",
                "claimed_at": 0.0,
                "merged_at": 0.0,
            })
            .into();
            let op = crdt_a.doc.items.insert(ROOT_ID, item).sign(&kp_a);
            crdt_a.apply(op.clone());
            let op_json = serde_json::to_string(&op).unwrap();
            all_ops_json.push(op_json);
        }

        // Sync ops to B and C.
        for op_json in &all_ops_json {
            let op: SignedOp = serde_json::from_str(op_json).unwrap();
            crdt_b.apply(op.clone());
            crdt_c.apply(op);
        }

        // All 3 nodes have the same 10 items.
        assert_eq!(crdt_a.doc.items.view().len(), 10);
        assert_eq!(crdt_b.doc.items.view().len(), 10);
        assert_eq!(crdt_c.doc.items.view().len(), 10);

        // Generate snapshot on leader (A).
        let mut max_seq = 0u64;
        let mut manifest = Vec::new();
        for op_json in &all_ops_json {
            if let Ok(signed_op) = serde_json::from_str::<SignedOp>(op_json) {
                if signed_op.inner.seq > max_seq {
                    max_seq = signed_op.inner.seq;
                }
                manifest.push(OpManifestEntry {
                    author: crdt_state::hex::encode(&signed_op.author()),
                    story_id: extract_story_id_from_op(&signed_op),
                    sig: crdt_state::hex::encode(&signed_op.signed_digest),
                    seq: signed_op.inner.seq,
                });
            }
        }
        let snapshot = Snapshot {
            at_seq: max_seq,
            state: all_ops_json.clone(),
            op_manifest: manifest,
        };

        // Broadcast snapshot wire message.
        let msg = SnapshotMessage::Snapshot(snapshot.clone());
        let wire = serde_json::to_string(&msg).unwrap();

        // Each node receives and parses the snapshot.
        let parsed: SnapshotMessage = serde_json::from_str(&wire).unwrap();
        match parsed {
            SnapshotMessage::Snapshot(s) => {
                // All nodes converge on the same at_seq.
                assert_eq!(s.at_seq, max_seq);
                assert_eq!(s.state.len(), all_ops_json.len());
                assert_eq!(s.op_manifest.len(), all_ops_json.len());
            }
            _ => panic!("Expected Snapshot"),
        }

        // Each node sends an ack.
        let ack = SnapshotMessage::SnapshotAck(SnapshotAck { at_seq: max_seq });
        let ack_wire = serde_json::to_string(&ack).unwrap();
        let parsed_ack: SnapshotMessage = serde_json::from_str(&ack_wire).unwrap();
        match parsed_ack {
            SnapshotMessage::SnapshotAck(a) => {
                assert_eq!(a.at_seq, max_seq);
            }
            _ => panic!("Expected SnapshotAck"),
        }

        // After all acks, compaction proceeds — ops with seq < at_seq are pruned.
        // Since all ops have seq <= max_seq, only ops with seq == max_seq survive.
        // The op manifest preserves attribution for all pruned ops.
        assert!(!snapshot.op_manifest.is_empty());
        for entry in &snapshot.op_manifest {
            assert!(!entry.author.is_empty());
            assert!(!entry.sig.is_empty());
        }
    }

    // ── Failure mode test ───────────────────────────────────────────────

    /// Simulate one node going offline mid-coordination; remaining peers
    /// abort the compaction cleanly (no half-applied snapshot state).
    #[test]
    fn failure_mode_node_offline_aborts_cleanly() {
        init();
        let snapshot = Snapshot {
            at_seq: 50,
            state: vec!["op1".to_string()],
            op_manifest: vec![OpManifestEntry {
                author: "aabb".to_string(),
                story_id: "636_fail_test".to_string(),
                sig: "ccdd".to_string(),
                seq: 1,
            }],
        };
        let peers = vec![
            "leader".to_string(),
            "peer_a".to_string(),
            "peer_b".to_string(),
        ];
        begin_coordination(snapshot.clone(), &peers, "leader");

        // peer_a acks, but peer_b goes offline (no ack).
        assert!(!record_ack("peer_a", 50));

        // Leader detects peer_b is offline → abort.
        let unacked = unacked_peers();
        assert!(unacked.contains(&"peer_b".to_string()));
        abort_coordination();
        assert!(!has_pending_coordination());

        // No compaction was applied — state is clean.
        // The latest_snapshot should NOT be set.
        // (The snapshot was never committed.)
        let state = snapshot_state().unwrap().lock().unwrap();
        // pending_at_seq is cleared.
        assert!(state.pending_at_seq.is_none());
        assert!(state.pending_acks.is_empty());
    }

    // ── New-node onboarding test ────────────────────────────────────────

    /// A node joining a snapshotted cluster receives the most recent snapshot
    /// + ops with seq >= at_seq.
    #[test]
    fn new_node_onboarding_with_snapshot() {
        let kp = make_keypair();
        let mut crdt_existing = BaseCrdt::<PipelineDoc>::new(&kp);

        // Create 10 ops on the existing cluster.
        let mut all_ops: Vec<String> = Vec::new();
        for i in 0..10u32 {
            let item: JsonValue = json!({
                "story_id": format!("636_onboard_{i}"),
                "stage": "1_backlog",
                "name": format!("Onboard {i}"),
                "agent": "",
                "retry_count": 0.0,
                "blocked": false,
                "depends_on": "",
                "claimed_by": "",
                "claimed_at": 0.0,
                "merged_at": 0.0,
            })
            .into();
            let op = crdt_existing.doc.items.insert(ROOT_ID, item).sign(&kp);
            crdt_existing.apply(op.clone());
            all_ops.push(serde_json::to_string(&op).unwrap());
        }

        // Snapshot taken at seq 5 — ops 0-4 are compacted.
        let snapshot = Snapshot {
            at_seq: 5,
            state: all_ops[..5].to_vec(),
            op_manifest: vec![],
        };

        // New node receives snapshot + ops with seq >= 5.
        let mut crdt_new = BaseCrdt::<PipelineDoc>::new(&kp);

        // Apply snapshot state first.
        for op_json in &snapshot.state {
            if let Ok(op) = serde_json::from_str::<SignedOp>(op_json) {
                crdt_new.apply(op);
            }
        }
        // Apply remaining ops (seq >= at_seq).
        for op_json in &all_ops[5..] {
            if let Ok(op) = serde_json::from_str::<SignedOp>(op_json) {
                crdt_new.apply(op);
            }
        }

        // New node should have all 10 items.
        assert_eq!(crdt_new.doc.items.view().len(), 10);
    }

    // ── Backwards compatibility test ────────────────────────────────────

    /// Peers without snapshot support fall back to vector-clock-based full sync.
    #[test]
    fn backwards_compat_unknown_snapshot_message_ignored() {
        // A peer that doesn't understand snapshot messages should be able to
        // parse them as unknown variants and ignore them gracefully.
        let snapshot_json =
            r#"{"type":"snapshot","at_seq":100,"state":["op1"],"op_manifest":[]}"#;

        // Attempt to parse as a legacy SyncMessage — should fail (unknown type).
        // (`SyncMessage` is the pre-snapshot sync wire enum, assumed to be in
        // scope for this test.)
        let result: Result<SyncMessage, _> = serde_json::from_str(snapshot_json);
        // This is expected to fail — old peers ignore unknown types.
        assert!(
            result.is_err(),
            "legacy peers should fail to parse snapshot messages"
        );

        // The snapshot message parses correctly as SnapshotMessage.
        let parsed: SnapshotMessage = serde_json::from_str(snapshot_json).unwrap();
        match parsed {
            SnapshotMessage::Snapshot(s) => {
                assert_eq!(s.at_seq, 100);
            }
            _ => panic!("Expected Snapshot"),
        }
    }

    // ── Attribution preservation integration test ───────────────────────

    /// After compaction, an archived story's attribution can be reconstructed.
    #[test]
    fn attribution_preserved_after_compaction() {
        crdt_state::init_for_test();
        init();

        // Write a story through its lifecycle.
        crdt_state::write_item(
            "636_archived_story",
            "1_backlog",
            Some("Archived Story"),
            Some("coder-opus"),
            None, None, None, None, None, None,
        );
        crdt_state::write_item(
            "636_archived_story",
            "2_current",
            None, None, None, None, None, None, None, None,
        );
        crdt_state::write_item(
            "636_archived_story",
            "6_archived",
            None, None, None, None, None, None, None, None,
        );

        // Generate snapshot.
        let snapshot = generate_snapshot().unwrap();

        // Verify the manifest contains entries for the archived story.
        let story_entries: Vec<&OpManifestEntry> = snapshot
            .op_manifest
            .iter()
            .filter(|e| e.story_id == "636_archived_story")
            .collect();
        assert!(
            !story_entries.is_empty(),
            "manifest must contain entries for the archived story"
        );

        // Each entry must have author (node pubkey) and signature.
        for entry in &story_entries {
            assert!(!entry.author.is_empty(), "author must be preserved");
            assert!(!entry.sig.is_empty(), "signature must be preserved");
            assert!(entry.seq > 0, "seq must be preserved");
        }

        // Apply compaction.
        let at_seq = snapshot.at_seq;
        apply_compaction(snapshot);

        // After compaction, the attribution chain is still queryable.
        let attrib = query_attribution("636_archived_story");
        assert!(
            !attrib.is_empty(),
            "attribution must be queryable after compaction"
        );
        for entry in &attrib {
            assert_eq!(entry.story_id, "636_archived_story");
            assert!(!entry.author.is_empty());
            assert!(!entry.sig.is_empty());
        }

        // The latest snapshot records the at_seq.
        let latest = latest_snapshot().unwrap();
        assert_eq!(latest.at_seq, at_seq);
    }
}