//! Tests for CRDT snapshot compaction and cross-node coordination. use super::*; use bft_json_crdt::json_crdt::{BaseCrdt, JsonValue, SignedOp}; use bft_json_crdt::keypair::make_keypair; use bft_json_crdt::op::ROOT_ID; use serde_json::json; use crate::crdt_state::PipelineDoc; // ── Wire message tests ────────────────────────────────────────────── /// Snapshot wire message serialization round-trip. #[test] fn snapshot_message_serialization_roundtrip() { let snapshot = Snapshot { at_seq: 42, state: vec!["op1".to_string(), "op2".to_string()], op_manifest: vec![OpManifestEntry { author: "aabbcc".to_string(), story_id: "100_story_test".to_string(), sig: "deadbeef".to_string(), seq: 1, }], }; let msg = SnapshotMessage::Snapshot(snapshot.clone()); let json_str = serde_json::to_string(&msg).unwrap(); assert!(json_str.contains(r#""type":"snapshot""#)); let deserialized: SnapshotMessage = serde_json::from_str(&json_str).unwrap(); match deserialized { SnapshotMessage::Snapshot(s) => { assert_eq!(s.at_seq, 42); assert_eq!(s.state.len(), 2); assert_eq!(s.op_manifest.len(), 1); assert_eq!(s.op_manifest[0].author, "aabbcc"); } _ => panic!("Expected Snapshot"), } } /// Snapshot ack wire message serialization round-trip. #[test] fn snapshot_ack_message_serialization_roundtrip() { let msg = SnapshotMessage::SnapshotAck(SnapshotAck { at_seq: 42 }); let json_str = serde_json::to_string(&msg).unwrap(); assert!(json_str.contains(r#""type":"snapshot_ack""#)); let deserialized: SnapshotMessage = serde_json::from_str(&json_str).unwrap(); match deserialized { SnapshotMessage::SnapshotAck(ack) => { assert_eq!(ack.at_seq, 42); } _ => panic!("Expected SnapshotAck"), } } /// Snapshot wire format matches the AC spec exactly. #[test] fn snapshot_wire_format_matches_spec() { let snapshot = Snapshot { at_seq: 100, state: vec!["{}".to_string()], op_manifest: vec![], }; let msg = SnapshotMessage::Snapshot(snapshot); let json_str = serde_json::to_string(&msg).unwrap(); // Must contain "type", "at_seq", "state" fields per AC1. let parsed: serde_json::Value = serde_json::from_str(&json_str).unwrap(); assert_eq!(parsed["type"], "snapshot"); assert_eq!(parsed["at_seq"], 100); assert!(parsed["state"].is_array()); } /// SnapshotAck wire format matches the AC spec exactly. #[test] fn snapshot_ack_wire_format_matches_spec() { let msg = SnapshotMessage::SnapshotAck(SnapshotAck { at_seq: 55 }); let json_str = serde_json::to_string(&msg).unwrap(); let parsed: serde_json::Value = serde_json::from_str(&json_str).unwrap(); assert_eq!(parsed["type"], "snapshot_ack"); assert_eq!(parsed["at_seq"], 55); } // ── Leader selection tests ────────────────────────────────────────── /// Single node is always leader. #[test] fn single_node_is_always_leader() { assert!(is_snapshot_leader("node_a", &[])); } /// Leader is the node with the lowest hash. #[test] fn leader_is_lowest_hash_node() { let nodes = vec![ "node_alpha".to_string(), "node_beta".to_string(), "node_gamma".to_string(), ]; // Find which node has the lowest hash. let mut lowest_node = &nodes[0]; let mut lowest_hash = leader_hash(&nodes[0]); for node in &nodes[1..] { let h = leader_hash(node); if h < lowest_hash { lowest_hash = h; lowest_node = node; } } // Only the lowest-hash node should be leader. for node in &nodes { let is_leader = is_snapshot_leader(node, &nodes); if node == lowest_node { assert!(is_leader, "{node} should be leader (lowest hash)"); } else { assert!(!is_leader, "{node} should NOT be leader"); } } } /// Leader selection is deterministic. #[test] fn leader_selection_is_deterministic() { let peers = vec!["a".to_string(), "b".to_string(), "c".to_string()]; let result1 = is_snapshot_leader("a", &peers); let result2 = is_snapshot_leader("a", &peers); assert_eq!(result1, result2); } // ── Trigger logic tests ───────────────────────────────────────────── /// Threshold is configurable. #[test] fn snapshot_threshold_default() { // When not set, defaults to DEFAULT_SNAPSHOT_THRESHOLD. assert_eq!(snapshot_threshold(), DEFAULT_SNAPSHOT_THRESHOLD); } // ── Coordination tests ────────────────────────────────────────────── /// Record ack returns true when all peers have acked. #[test] fn coordination_quorum_reached() { init(); let snapshot = Snapshot { at_seq: 10, state: vec![], op_manifest: vec![], }; let peers = vec![ "self".to_string(), "peer_a".to_string(), "peer_b".to_string(), ]; begin_coordination(snapshot, &peers, "self"); assert!(!record_ack("peer_a", 10)); assert!(record_ack("peer_b", 10)); } /// Ack for wrong at_seq is rejected. #[test] fn coordination_ack_wrong_seq_rejected() { init(); let snapshot = Snapshot { at_seq: 20, state: vec![], op_manifest: vec![], }; begin_coordination( snapshot, &["self".to_string(), "peer_a".to_string()], "self", ); assert!(!record_ack("peer_a", 999)); } /// Abort clears pending state. #[test] fn coordination_abort_clears_state() { init(); let snapshot = Snapshot { at_seq: 30, state: vec![], op_manifest: vec![], }; begin_coordination( snapshot, &["self".to_string(), "peer_a".to_string()], "self", ); assert!(has_pending_coordination()); abort_coordination(); assert!(!has_pending_coordination()); } /// Unacked peers are reported correctly. #[test] fn unacked_peers_reported() { init(); let snapshot = Snapshot { at_seq: 40, state: vec![], op_manifest: vec![], }; begin_coordination( snapshot, &[ "self".to_string(), "peer_a".to_string(), "peer_b".to_string(), ], "self", ); let unacked = unacked_peers(); assert_eq!(unacked.len(), 2); record_ack("peer_a", 40); let unacked = unacked_peers(); assert_eq!(unacked.len(), 1); assert_eq!(unacked[0], "peer_b"); } // ── Snapshot generation tests ─────────────────────────────────────── /// Snapshot generation from ops includes attribution manifest. #[test] fn snapshot_generation_includes_manifest() { crate::crdt_state::init_for_test(); // Write some items to populate ALL_OPS. crate::crdt_state::write_item( "636_test_a", "1_backlog", Some("Test A"), None, None, None, None, None, None, None, ); crate::crdt_state::write_item( "636_test_b", "2_current", Some("Test B"), None, None, None, None, None, None, None, ); let snapshot = generate_snapshot(); assert!(snapshot.is_some()); let snapshot = snapshot.unwrap(); assert!(snapshot.at_seq > 0); assert!(!snapshot.state.is_empty()); assert!(!snapshot.op_manifest.is_empty()); // Every manifest entry must have a non-empty author and sig. for entry in &snapshot.op_manifest { assert!(!entry.author.is_empty(), "author must not be empty"); assert!(!entry.sig.is_empty(), "sig must not be empty"); } } /// Attribution can be queried by story_id after snapshot. #[test] fn attribution_query_by_story_id() { crate::crdt_state::init_for_test(); init(); crate::crdt_state::write_item( "636_attrib_test", "1_backlog", Some("Attribution Test"), None, None, None, None, None, None, None, ); let snapshot = generate_snapshot().unwrap(); // Store as latest. if let Some(state) = snapshot_state() && let Ok(mut s) = state.lock() { s.latest_snapshot = Some(snapshot.clone()); } let attrib = query_attribution("636_attrib_test"); // Insert ops for the story should appear in the manifest. let has_story = attrib.iter().any(|e| e.story_id == "636_attrib_test"); assert!(has_story, "attribution must include ops for the story"); } // ── Compaction tests ──────────────────────────────────────────────── /// After compaction, ALL_OPS size is reduced. #[test] fn compaction_reduces_ops() { crate::crdt_state::init_for_test(); init(); // Write several items. for i in 0..5 { crate::crdt_state::write_item( &format!("636_compact_{i}"), "1_backlog", Some(&format!("Item {i}")), None, None, None, None, None, None, None, ); } let ops_before = crate::crdt_state::all_ops_json().unwrap().len(); assert!(ops_before >= 5); // Generate a snapshot — at_seq is the max seq across all ops. let snapshot = generate_snapshot().unwrap(); let result = apply_compaction(snapshot); assert!(result); // After compaction, ops with seq < at_seq are gone, but ops with // seq >= at_seq remain (which may be 0 or 1). let ops_after = crate::crdt_state::all_ops_json().unwrap().len(); // At minimum, some pruning should occur if at_seq > some op seqs. assert!( ops_after <= ops_before, "ops_after ({ops_after}) should be <= ops_before ({ops_before})" ); } /// Latest snapshot is available after compaction. #[test] fn latest_snapshot_available_after_compaction() { crate::crdt_state::init_for_test(); init(); crate::crdt_state::write_item( "636_latest_test", "1_backlog", Some("Latest Test"), None, None, None, None, None, None, None, ); let snapshot = generate_snapshot().unwrap(); let at_seq = snapshot.at_seq; apply_compaction(snapshot); let latest = latest_snapshot(); assert!(latest.is_some()); assert_eq!(latest.unwrap().at_seq, at_seq); } // ── Integration tests: 3-node compaction ─────────────────────────── /// Integration test: 3 nodes, force compaction, verify all converge on /// the same `at_seq` and pruned ops disappear from each node's log. #[test] fn three_node_compaction_convergence() { use bft_json_crdt::json_crdt::BaseCrdt; // Simulate 3 independent nodes. let kp_a = make_keypair(); let kp_b = make_keypair(); let kp_c = make_keypair(); let mut crdt_a = BaseCrdt::::new(&kp_a); let mut crdt_b = BaseCrdt::::new(&kp_b); let mut crdt_c = BaseCrdt::::new(&kp_c); // Node A creates items. let mut all_ops_json: Vec = Vec::new(); for i in 0..10u32 { let item: JsonValue = json!({ "story_id": format!("636_3node_{i}"), "stage": "1_backlog", "name": format!("Item {i}"), "agent": "", "retry_count": 0.0, "blocked": false, "depends_on": "", "claimed_by": "", "claimed_at": 0.0, "merged_at": 0.0, }) .into(); let op = crdt_a.doc.items.insert(ROOT_ID, item).sign(&kp_a); crdt_a.apply(op.clone()); let op_json = serde_json::to_string(&op).unwrap(); all_ops_json.push(op_json); } // Sync ops to B and C. for op_json in &all_ops_json { let op: SignedOp = serde_json::from_str(op_json).unwrap(); crdt_b.apply(op.clone()); crdt_c.apply(op); } // All 3 nodes have the same 10 items. assert_eq!(crdt_a.doc.items.view().len(), 10); assert_eq!(crdt_b.doc.items.view().len(), 10); assert_eq!(crdt_c.doc.items.view().len(), 10); // Generate snapshot on leader (A). let mut max_seq = 0u64; let mut manifest = Vec::new(); for op_json in &all_ops_json { if let Ok(signed_op) = serde_json::from_str::(op_json) { if signed_op.inner.seq > max_seq { max_seq = signed_op.inner.seq; } manifest.push(OpManifestEntry { author: crate::crdt_state::hex::encode(&signed_op.author()), story_id: extract_story_id_from_op(&signed_op), sig: crate::crdt_state::hex::encode(&signed_op.signed_digest), seq: signed_op.inner.seq, }); } } let snapshot = Snapshot { at_seq: max_seq, state: all_ops_json.clone(), op_manifest: manifest, }; // Broadcast snapshot wire message. let msg = SnapshotMessage::Snapshot(snapshot.clone()); let wire = serde_json::to_string(&msg).unwrap(); // Each node receives and parses the snapshot. let parsed: SnapshotMessage = serde_json::from_str(&wire).unwrap(); match parsed { SnapshotMessage::Snapshot(s) => { // All nodes converge on the same at_seq. assert_eq!(s.at_seq, max_seq); assert_eq!(s.state.len(), all_ops_json.len()); assert_eq!(s.op_manifest.len(), all_ops_json.len()); } _ => panic!("Expected Snapshot"), } // Each node sends an ack. let ack = SnapshotMessage::SnapshotAck(SnapshotAck { at_seq: max_seq }); let ack_wire = serde_json::to_string(&ack).unwrap(); let parsed_ack: SnapshotMessage = serde_json::from_str(&ack_wire).unwrap(); match parsed_ack { SnapshotMessage::SnapshotAck(a) => { assert_eq!(a.at_seq, max_seq); } _ => panic!("Expected SnapshotAck"), } // After all acks, compaction proceeds — ops with seq < at_seq are pruned. // Since all ops have seq <= max_seq, only ops with seq == max_seq survive. // The op manifest preserves attribution for all pruned ops. assert!(!snapshot.op_manifest.is_empty()); for entry in &snapshot.op_manifest { assert!(!entry.author.is_empty()); assert!(!entry.sig.is_empty()); } } // ── Failure mode test ────────────────────────────────────────────── /// Simulate one node going offline mid-coordination; remaining peers /// abort the compaction cleanly (no half-applied snapshot state). #[test] fn failure_mode_node_offline_aborts_cleanly() { init(); let snapshot = Snapshot { at_seq: 50, state: vec!["op1".to_string()], op_manifest: vec![OpManifestEntry { author: "aabb".to_string(), story_id: "636_fail_test".to_string(), sig: "ccdd".to_string(), seq: 1, }], }; let peers = vec![ "leader".to_string(), "peer_a".to_string(), "peer_b".to_string(), ]; begin_coordination(snapshot.clone(), &peers, "leader"); // peer_a acks, but peer_b goes offline (no ack). assert!(!record_ack("peer_a", 50)); // Leader detects peer_b is offline → abort. let unacked = unacked_peers(); assert!(unacked.contains(&"peer_b".to_string())); abort_coordination(); assert!(!has_pending_coordination()); // No compaction was applied — state is clean. // The latest_snapshot should NOT be set. // (The snapshot was never committed.) let state = snapshot_state().unwrap().lock().unwrap(); // pending_at_seq is cleared. assert!(state.pending_at_seq.is_none()); assert!(state.pending_acks.is_empty()); } // ── New-node onboarding test ─────────────────────────────────────── /// A node joining a snapshotted cluster receives the most recent snapshot /// + ops with seq >= at_seq. #[test] fn new_node_onboarding_with_snapshot() { let kp = make_keypair(); let mut crdt_existing = BaseCrdt::::new(&kp); // Create 10 ops on the existing cluster. let mut all_ops: Vec = Vec::new(); for i in 0..10u32 { let item: JsonValue = json!({ "story_id": format!("636_onboard_{i}"), "stage": "1_backlog", "name": format!("Onboard {i}"), "agent": "", "retry_count": 0.0, "blocked": false, "depends_on": "", "claimed_by": "", "claimed_at": 0.0, "merged_at": 0.0, }) .into(); let op = crdt_existing.doc.items.insert(ROOT_ID, item).sign(&kp); crdt_existing.apply(op.clone()); all_ops.push(serde_json::to_string(&op).unwrap()); } // Snapshot taken at seq 5 — ops 0-4 are compacted. let snapshot = Snapshot { at_seq: 5, state: all_ops[..5].to_vec(), op_manifest: vec![], }; // New node receives snapshot + ops with seq >= 5. let mut crdt_new = BaseCrdt::::new(&kp); // Apply snapshot state first. for op_json in &snapshot.state { if let Ok(op) = serde_json::from_str::(op_json) { crdt_new.apply(op); } } // Apply remaining ops (seq >= at_seq). for op_json in &all_ops[5..] { if let Ok(op) = serde_json::from_str::(op_json) { crdt_new.apply(op); } } // New node should have all 10 items. assert_eq!(crdt_new.doc.items.view().len(), 10); } // ── Backwards compatibility test ─────────────────────────────────── /// Peers without snapshot support fall back to vector-clock-based full sync. #[test] fn backwards_compat_unknown_snapshot_message_ignored() { // A peer that doesn't understand snapshot messages should be able to // parse them as unknown variants and ignore them gracefully. let snapshot_json = r#"{"type":"snapshot","at_seq":100,"state":["op1"],"op_manifest":[]}"#; // Attempt to parse as a legacy SyncMessage — should fail (unknown type). let result: Result = serde_json::from_str(snapshot_json); // This is expected to fail — old peers ignore unknown types. assert!( result.is_err(), "legacy peers should fail to parse snapshot messages" ); // The snapshot message parses correctly as SnapshotMessage. let parsed: SnapshotMessage = serde_json::from_str(snapshot_json).unwrap(); match parsed { SnapshotMessage::Snapshot(s) => { assert_eq!(s.at_seq, 100); } _ => panic!("Expected Snapshot"), } } // ── Attribution preservation integration test ────────────────────── /// After compaction, an archived story's attribution can be reconstructed. #[test] fn attribution_preserved_after_compaction() { crate::crdt_state::init_for_test(); init(); // Write a story through its lifecycle. crate::crdt_state::write_item( "636_archived_story", "1_backlog", Some("Archived Story"), Some("coder-opus"), None, None, None, None, None, None, ); crate::crdt_state::write_item( "636_archived_story", "2_current", None, None, None, None, None, None, None, None, ); crate::crdt_state::write_item( "636_archived_story", "6_archived", None, None, None, None, None, None, None, None, ); // Generate snapshot. let snapshot = generate_snapshot().unwrap(); // Verify the manifest contains entries for the archived story. let story_entries: Vec<&OpManifestEntry> = snapshot .op_manifest .iter() .filter(|e| e.story_id == "636_archived_story") .collect(); assert!( !story_entries.is_empty(), "manifest must contain entries for the archived story" ); // Each entry must have author (node pubkey) and signature. for entry in &story_entries { assert!(!entry.author.is_empty(), "author must be preserved"); assert!(!entry.sig.is_empty(), "signature must be preserved"); assert!(entry.seq > 0, "seq must be preserved"); } // Apply compaction. let at_seq = snapshot.at_seq; apply_compaction(snapshot); // After compaction, the attribution chain is still queryable. let attrib = query_attribution("636_archived_story"); assert!( !attrib.is_empty(), "attribution must be queryable after compaction" ); for entry in &attrib { assert_eq!(entry.story_id, "636_archived_story"); assert!(!entry.author.is_empty()); assert!(!entry.sig.is_empty()); } // The latest snapshot records the at_seq. let latest = latest_snapshot().unwrap(); assert_eq!(latest.at_seq, at_seq); }