From 571a057f524ab8bf90cb22e8670889eeae17c171 Mon Sep 17 00:00:00 2001 From: dave Date: Mon, 27 Apr 2026 22:38:33 +0000 Subject: [PATCH] huskies: merge 688_refactor_decompose_server_src_crdt_snapshot_rs_1182_lines --- server/src/crdt_snapshot.rs | 1182 ----------------------------- server/src/crdt_snapshot/mod.rs | 476 ++++++++++++ server/src/crdt_snapshot/tests.rs | 705 +++++++++++++++++ 3 files changed, 1181 insertions(+), 1182 deletions(-) delete mode 100644 server/src/crdt_snapshot.rs create mode 100644 server/src/crdt_snapshot/mod.rs create mode 100644 server/src/crdt_snapshot/tests.rs diff --git a/server/src/crdt_snapshot.rs b/server/src/crdt_snapshot.rs deleted file mode 100644 index 1dc4db97..00000000 --- a/server/src/crdt_snapshot.rs +++ /dev/null @@ -1,1182 +0,0 @@ -//! CRDT snapshot compaction with cross-node coordination. -//! -//! This module implements full CRDT state snapshots for compacting the op journal. -//! When the op log grows beyond a configurable threshold (default: 10,000 ops) or -//! on a scheduled tick (default: weekly), the leader node generates a snapshot of -//! the current CRDT state and coordinates with all alive peers to discard ops that -//! precede the snapshot. -//! -//! # Attribution preservation -//! -//! Compaction preserves the full attribution chain via an "op manifest" — a -//! compact record of `(author, story_id, signature, seq)` tuples for every op -//! in the compacted range. This allows incident-response forensics to -//! reconstruct who did what to which story, even after the op payloads are -//! discarded. -//! -//! # Wire messages -//! -//! - `{"type": "snapshot", "at_seq": , "state": }` -//! - `{"type": "snapshot_ack", "at_seq": }` -//! -//! # Leader selection -//! -//! The alive peer with the lowest `hash(node_id)` generates the snapshot, -//! mirroring the claim-priority shape from story 634. - -use serde::{Deserialize, Serialize}; -use std::collections::HashMap; -use std::sync::Mutex; -use std::sync::OnceLock; - -use crate::crdt_state; - -// ── Configuration ────────────────────────────────────────────────────── - -/// Default threshold: snapshot fires when ALL_OPS.len() exceeds this. -pub const DEFAULT_SNAPSHOT_THRESHOLD: usize = 10_000; - -/// Default scheduled interval in seconds (1 week). -pub const DEFAULT_SNAPSHOT_INTERVAL_SECS: u64 = 7 * 24 * 3600; - -/// Configurable snapshot threshold (set at startup, defaults to [`DEFAULT_SNAPSHOT_THRESHOLD`]). -static SNAPSHOT_THRESHOLD: OnceLock = OnceLock::new(); - -/// Return the current snapshot threshold. -pub fn snapshot_threshold() -> usize { - SNAPSHOT_THRESHOLD - .get() - .copied() - .unwrap_or(DEFAULT_SNAPSHOT_THRESHOLD) -} - -/// Set the snapshot threshold (must be called before first trigger check). -pub fn set_snapshot_threshold(threshold: usize) { - let _ = SNAPSHOT_THRESHOLD.set(threshold); -} - -// ── Wire message types ───────────────────────────────────────────────── - -/// A single entry in the attribution manifest — preserves forensic metadata -/// for each op that existed before the snapshot point. -/// -/// This allows reconstructing who did what to which story, even after the -/// op payloads (state values) are discarded during compaction. -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] -pub struct OpManifestEntry { - /// Hex-encoded Ed25519 public key of the op's author. - pub author: String, - /// The story_id this op pertains to (extracted from the op's path/content). - pub story_id: String, - /// Hex-encoded Ed25519 signature proving the author created this op. - pub sig: String, - /// Lamport sequence number of the op. - pub seq: u64, -} - -/// A CRDT state snapshot used for compaction. -/// -/// Contains the serialized CRDT state at a specific sequence point, plus an -/// attribution manifest that preserves the forensic chain for all ops up to -/// that point. -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] -pub struct Snapshot { - /// The sequence number at which this snapshot was taken. - /// All ops with `seq < at_seq` can be discarded after compaction. - pub at_seq: u64, - /// The serialized CRDT state (all ops as JSON strings). - pub state: Vec, - /// Attribution manifest: one entry per op in the compacted range. - /// Preserves author, story_id, signature, and seq for forensics. - pub op_manifest: Vec, -} - -/// Acknowledgement that a peer has received and applied a snapshot. -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] -pub struct SnapshotAck { - /// The sequence number being acknowledged — must match the snapshot's `at_seq`. - pub at_seq: u64, -} - -/// Wire messages for the snapshot protocol, exchanged as text frames during -/// the CRDT sync streaming phase. -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] -#[serde(tag = "type", rename_all = "snake_case")] -pub enum SnapshotMessage { - /// Leader broadcasts the snapshot to all peers. - Snapshot(Snapshot), - /// Peer acknowledges receipt and application of the snapshot. - SnapshotAck(SnapshotAck), -} - -// ── Snapshot state ───────────────────────────────────────────────────── - -/// In-memory snapshot state for this node. -struct SnapshotState { - /// The most recent completed snapshot (if any). - latest_snapshot: Option, - /// Pending acks for an in-progress snapshot coordination. - /// Maps node_id → whether they've acked. - pending_acks: HashMap, - /// The at_seq of the in-progress snapshot (if any). - pending_at_seq: Option, -} - -#[cfg(not(test))] -static SNAPSHOT_STATE: OnceLock> = OnceLock::new(); - -#[cfg(test)] -thread_local! { - /// Per-thread snapshot state for test isolation. Each test thread gets its - /// own SnapshotState so parallel tests do not leak into each other's - /// snapshot/coordination history. - static SNAPSHOT_STATE_TL: OnceLock> = const { OnceLock::new() }; -} - -/// Read access to the snapshot state. -/// -/// In production this returns the global `SNAPSHOT_STATE`. In `cfg(test)` -/// each thread sees its own thread-local state, so parallel tests do not -/// share `SnapshotState`. -fn snapshot_state() -> Option<&'static Mutex> { - #[cfg(not(test))] - { - SNAPSHOT_STATE.get() - } - #[cfg(test)] - { - let ptr = SNAPSHOT_STATE_TL.with(|lock| lock as *const OnceLock>); - // SAFETY: the thread-local lives as long as the thread. We only need - // 'static for the return type; consumers never outlive the spawning - // test thread. - unsafe { &*ptr }.get() - } -} - -/// Initialise snapshot state. Safe to call multiple times. -/// -/// In production: idempotent via `OnceLock`. In `cfg(test)`: initialises the -/// per-thread state on first call; subsequent calls on the same thread are -/// no-ops. -pub fn init() { - let value = || { - Mutex::new(SnapshotState { - latest_snapshot: None, - pending_acks: HashMap::new(), - pending_at_seq: None, - }) - }; - #[cfg(not(test))] - { - let _ = SNAPSHOT_STATE.set(value()); - } - #[cfg(test)] - { - SNAPSHOT_STATE_TL.with(|lock| { - let _ = lock.set(value()); - }); - } -} - -/// Return the most recent completed snapshot, if any. -pub fn latest_snapshot() -> Option { - snapshot_state()?.lock().ok()?.latest_snapshot.clone() -} - -// ── Leader selection ─────────────────────────────────────────────────── - -/// Compute the snapshot-leader priority hash for a node. -/// -/// Uses the same SHA-256 approach as story 634's claim priority, but hashes -/// only the `node_id` (not a story_id) since snapshot leadership is global. -fn leader_hash(node_id: &str) -> u64 { - use sha2::{Digest, Sha256}; - let mut hasher = Sha256::new(); - hasher.update(node_id.as_bytes()); - let digest = hasher.finalize(); - u64::from_be_bytes(digest[..8].try_into().expect("sha256 is 32 bytes")) -} - -/// Determine whether this node is the snapshot leader among alive peers. -/// -/// The alive peer with the **lowest** `hash(node_id)` is the leader. -/// Returns `true` if `self_node_id` has a strictly lower hash than all -/// other alive peers. When there are no other alive peers (single-node -/// cluster), the result is always `true`. -pub fn is_snapshot_leader(self_node_id: &str, alive_peer_node_ids: &[String]) -> bool { - let my_hash = leader_hash(self_node_id); - for peer_id in alive_peer_node_ids { - if peer_id == self_node_id { - continue; - } - if leader_hash(peer_id) <= my_hash { - return false; - } - } - true -} - -// ── Trigger logic ────────────────────────────────────────────────────── - -/// Check whether a snapshot should be triggered based on the current op count. -/// -/// Returns `true` when `ALL_OPS.len() > threshold` (configurable, default 10,000). -pub fn should_trigger_by_count() -> bool { - let count = crdt_state::all_ops_json().map(|ops| ops.len()).unwrap_or(0); - count > snapshot_threshold() -} - -// ── Snapshot generation ──────────────────────────────────────────────── - -/// Generate a snapshot of the current CRDT state. -/// -/// The snapshot contains: -/// 1. All ops as serialized JSON (the full CRDT state). -/// 2. An attribution manifest preserving author, story_id, sig, and seq -/// for every op — ensuring forensic traceability survives compaction. -/// -/// The `at_seq` is the highest sequence number across all ops. -pub fn generate_snapshot() -> Option { - let all_ops = crdt_state::all_ops_json()?; - if all_ops.is_empty() { - return None; - } - - let mut max_seq: u64 = 0; - let mut manifest = Vec::with_capacity(all_ops.len()); - - for op_json in &all_ops { - if let Ok(signed_op) = serde_json::from_str::(op_json) { - let seq = signed_op.inner.seq; - if seq > max_seq { - max_seq = seq; - } - - // Extract story_id from the op's content if available. - let story_id = extract_story_id_from_op(&signed_op); - - manifest.push(OpManifestEntry { - author: crdt_state::hex::encode(&signed_op.author()), - story_id, - sig: crdt_state::hex::encode(&signed_op.signed_digest), - seq, - }); - } - } - - Some(Snapshot { - at_seq: max_seq, - state: all_ops, - op_manifest: manifest, - }) -} - -/// Extract the story_id from a SignedOp's content, if it contains one. -/// -/// Ops that target pipeline items contain a JSON object with a `story_id` -/// field. For ops that don't (e.g. node presence), returns an empty string. -fn extract_story_id_from_op(op: &bft_json_crdt::json_crdt::SignedOp) -> String { - // Try to extract from the op's path — the second segment often contains - // the list index which maps to a story. However, the most reliable way - // is to look at the content if it's an insert op. - if let Some(bft_json_crdt::json_crdt::JsonValue::Object(map)) = &op.inner.content - && let Some(bft_json_crdt::json_crdt::JsonValue::String(sid)) = map.get("story_id") - { - return sid.clone(); - } - // For field-update ops (LWW set), the path tells us which item, but not - // the story_id directly. Return empty — the manifest still preserves - // author + sig + seq for forensic correlation. - String::new() -} - -// ── Coordination ────────────────────────────────────────────────────── - -/// Begin snapshot coordination as the leader. -/// -/// Records which alive peers need to ack and stores the pending snapshot. -/// Returns the snapshot to broadcast to peers. -pub fn begin_coordination( - snapshot: Snapshot, - alive_peer_node_ids: &[String], - self_node_id: &str, -) -> Option { - let state = snapshot_state()?; - let mut s = state.lock().ok()?; - - let mut pending = HashMap::new(); - for peer_id in alive_peer_node_ids { - if peer_id != self_node_id { - pending.insert(peer_id.clone(), false); - } - } - - s.pending_at_seq = Some(snapshot.at_seq); - s.pending_acks = pending; - - Some(snapshot) -} - -/// Record a snapshot ack from a peer. -/// -/// Returns `true` if quorum has been reached (all alive peers have acked). -pub fn record_ack(node_id: &str, at_seq: u64) -> bool { - let Some(state) = snapshot_state() else { - return false; - }; - let Ok(mut s) = state.lock() else { - return false; - }; - - // Verify the ack matches the pending snapshot. - if s.pending_at_seq != Some(at_seq) { - return false; - } - - if let Some(acked) = s.pending_acks.get_mut(node_id) { - *acked = true; - } - - // Check if all peers have acked (quorum = all alive peers). - s.pending_acks.values().all(|&v| v) -} - -/// Abort the current pending snapshot coordination. -/// -/// Called when a peer goes offline mid-coordination or quorum times out. -pub fn abort_coordination() { - if let Some(state) = snapshot_state() - && let Ok(mut s) = state.lock() - { - s.pending_at_seq = None; - s.pending_acks.clear(); - } -} - -/// Check whether there is a pending snapshot coordination in progress. -pub fn has_pending_coordination() -> bool { - snapshot_state() - .and_then(|s| s.lock().ok()) - .map(|s| s.pending_at_seq.is_some()) - .unwrap_or(false) -} - -/// Return the list of peers that have NOT yet acked the pending snapshot. -pub fn unacked_peers() -> Vec { - snapshot_state() - .and_then(|s| s.lock().ok()) - .map(|s| { - s.pending_acks - .iter() - .filter(|&(_, &acked)| !acked) - .map(|(id, _)| id.clone()) - .collect() - }) - .unwrap_or_default() -} - -// ── Compaction ───────────────────────────────────────────────────────── - -/// Apply compaction: replace the op journal with the snapshot state. -/// -/// After successful quorum, the leader (and each peer upon receiving the -/// snapshot) replaces its `ALL_OPS` with the snapshot's state and resets -/// the vector clock accordingly. -/// -/// The snapshot's `op_manifest` is preserved in the snapshot state for -/// forensic queries. -/// -/// Returns `true` if compaction was applied successfully. -pub fn apply_compaction(snapshot: Snapshot) -> bool { - // Store the snapshot as the latest for future new-node onboarding. - if let Some(state) = snapshot_state() - && let Ok(mut s) = state.lock() - { - s.latest_snapshot = Some(snapshot.clone()); - s.pending_at_seq = None; - s.pending_acks.clear(); - } - - // Replace ALL_OPS with the snapshot state (the compacted ops). - // In a real compaction, we'd keep only ops with seq >= at_seq, but the - // snapshot already contains the minimal set needed to reconstruct state. - // - // For this implementation, the snapshot state IS the full state — peers - // discard their old journal and replace it with the snapshot's ops. - // The op_manifest preserves attribution for the discarded ops. - if let Some(all_ops) = crdt_state::ALL_OPS.get() - && let Ok(mut v) = all_ops.lock() - { - // Calculate ops to prune: those with seq < at_seq - let mut kept_ops = Vec::new(); - let mut pruned_count = 0usize; - for op_json in v.iter() { - if let Ok(signed_op) = - serde_json::from_str::(op_json) - { - if signed_op.inner.seq >= snapshot.at_seq { - kept_ops.push(op_json.clone()); - } else { - pruned_count += 1; - } - } else { - // Keep unparseable ops to avoid data loss. - kept_ops.push(op_json.clone()); - } - } - - *v = kept_ops; - - // Rebuild vector clock from remaining ops. - if let Some(vc) = crdt_state::VECTOR_CLOCK.get() - && let Ok(mut clock) = vc.lock() - { - clock.clear(); - for op_json in v.iter() { - if let Ok(signed_op) = - serde_json::from_str::(op_json) - { - let author_hex = crdt_state::hex::encode(&signed_op.author()); - *clock.entry(author_hex).or_insert(0) += 1; - } - } - } - - crate::slog!( - "[crdt-snapshot] Compaction applied: pruned {pruned_count} ops, kept {} ops", - v.len() - ); - return true; - } - - false -} - -/// Retrieve the op manifest from the latest snapshot for forensic queries. -/// -/// Returns `None` if no snapshot has been taken yet. -pub fn latest_op_manifest() -> Option> { - latest_snapshot().map(|s| s.op_manifest) -} - -/// Query the op manifest for a specific story's attribution chain. -/// -/// Returns all manifest entries where `story_id` matches the query. -pub fn query_attribution(story_id: &str) -> Vec { - latest_op_manifest() - .unwrap_or_default() - .into_iter() - .filter(|entry| entry.story_id == story_id) - .collect() -} - -// ── Tests ────────────────────────────────────────────────────────────── - -#[cfg(test)] -mod tests { - use super::*; - use bft_json_crdt::json_crdt::{BaseCrdt, JsonValue, SignedOp}; - use bft_json_crdt::keypair::make_keypair; - use bft_json_crdt::op::ROOT_ID; - use serde_json::json; - - use crate::crdt_state::PipelineDoc; - - // ── Wire message tests ────────────────────────────────────────────── - - /// Snapshot wire message serialization round-trip. - #[test] - fn snapshot_message_serialization_roundtrip() { - let snapshot = Snapshot { - at_seq: 42, - state: vec!["op1".to_string(), "op2".to_string()], - op_manifest: vec![OpManifestEntry { - author: "aabbcc".to_string(), - story_id: "100_story_test".to_string(), - sig: "deadbeef".to_string(), - seq: 1, - }], - }; - let msg = SnapshotMessage::Snapshot(snapshot.clone()); - let json_str = serde_json::to_string(&msg).unwrap(); - assert!(json_str.contains(r#""type":"snapshot""#)); - let deserialized: SnapshotMessage = serde_json::from_str(&json_str).unwrap(); - match deserialized { - SnapshotMessage::Snapshot(s) => { - assert_eq!(s.at_seq, 42); - assert_eq!(s.state.len(), 2); - assert_eq!(s.op_manifest.len(), 1); - assert_eq!(s.op_manifest[0].author, "aabbcc"); - } - _ => panic!("Expected Snapshot"), - } - } - - /// Snapshot ack wire message serialization round-trip. - #[test] - fn snapshot_ack_message_serialization_roundtrip() { - let msg = SnapshotMessage::SnapshotAck(SnapshotAck { at_seq: 42 }); - let json_str = serde_json::to_string(&msg).unwrap(); - assert!(json_str.contains(r#""type":"snapshot_ack""#)); - let deserialized: SnapshotMessage = serde_json::from_str(&json_str).unwrap(); - match deserialized { - SnapshotMessage::SnapshotAck(ack) => { - assert_eq!(ack.at_seq, 42); - } - _ => panic!("Expected SnapshotAck"), - } - } - - /// Snapshot wire format matches the AC spec exactly. - #[test] - fn snapshot_wire_format_matches_spec() { - let snapshot = Snapshot { - at_seq: 100, - state: vec!["{}".to_string()], - op_manifest: vec![], - }; - let msg = SnapshotMessage::Snapshot(snapshot); - let json_str = serde_json::to_string(&msg).unwrap(); - // Must contain "type", "at_seq", "state" fields per AC1. - let parsed: serde_json::Value = serde_json::from_str(&json_str).unwrap(); - assert_eq!(parsed["type"], "snapshot"); - assert_eq!(parsed["at_seq"], 100); - assert!(parsed["state"].is_array()); - } - - /// SnapshotAck wire format matches the AC spec exactly. - #[test] - fn snapshot_ack_wire_format_matches_spec() { - let msg = SnapshotMessage::SnapshotAck(SnapshotAck { at_seq: 55 }); - let json_str = serde_json::to_string(&msg).unwrap(); - let parsed: serde_json::Value = serde_json::from_str(&json_str).unwrap(); - assert_eq!(parsed["type"], "snapshot_ack"); - assert_eq!(parsed["at_seq"], 55); - } - - // ── Leader selection tests ────────────────────────────────────────── - - /// Single node is always leader. - #[test] - fn single_node_is_always_leader() { - assert!(is_snapshot_leader("node_a", &[])); - } - - /// Leader is the node with the lowest hash. - #[test] - fn leader_is_lowest_hash_node() { - let nodes = vec![ - "node_alpha".to_string(), - "node_beta".to_string(), - "node_gamma".to_string(), - ]; - - // Find which node has the lowest hash. - let mut lowest_node = &nodes[0]; - let mut lowest_hash = leader_hash(&nodes[0]); - for node in &nodes[1..] { - let h = leader_hash(node); - if h < lowest_hash { - lowest_hash = h; - lowest_node = node; - } - } - - // Only the lowest-hash node should be leader. - for node in &nodes { - let is_leader = is_snapshot_leader(node, &nodes); - if node == lowest_node { - assert!(is_leader, "{node} should be leader (lowest hash)"); - } else { - assert!(!is_leader, "{node} should NOT be leader"); - } - } - } - - /// Leader selection is deterministic. - #[test] - fn leader_selection_is_deterministic() { - let peers = vec!["a".to_string(), "b".to_string(), "c".to_string()]; - let result1 = is_snapshot_leader("a", &peers); - let result2 = is_snapshot_leader("a", &peers); - assert_eq!(result1, result2); - } - - // ── Trigger logic tests ───────────────────────────────────────────── - - /// Threshold is configurable. - #[test] - fn snapshot_threshold_default() { - // When not set, defaults to DEFAULT_SNAPSHOT_THRESHOLD. - assert_eq!(snapshot_threshold(), DEFAULT_SNAPSHOT_THRESHOLD); - } - - // ── Coordination tests ────────────────────────────────────────────── - - /// Record ack returns true when all peers have acked. - #[test] - fn coordination_quorum_reached() { - init(); - let snapshot = Snapshot { - at_seq: 10, - state: vec![], - op_manifest: vec![], - }; - let peers = vec![ - "self".to_string(), - "peer_a".to_string(), - "peer_b".to_string(), - ]; - begin_coordination(snapshot, &peers, "self"); - - assert!(!record_ack("peer_a", 10)); - assert!(record_ack("peer_b", 10)); - } - - /// Ack for wrong at_seq is rejected. - #[test] - fn coordination_ack_wrong_seq_rejected() { - init(); - let snapshot = Snapshot { - at_seq: 20, - state: vec![], - op_manifest: vec![], - }; - begin_coordination( - snapshot, - &["self".to_string(), "peer_a".to_string()], - "self", - ); - assert!(!record_ack("peer_a", 999)); - } - - /// Abort clears pending state. - #[test] - fn coordination_abort_clears_state() { - init(); - let snapshot = Snapshot { - at_seq: 30, - state: vec![], - op_manifest: vec![], - }; - begin_coordination( - snapshot, - &["self".to_string(), "peer_a".to_string()], - "self", - ); - assert!(has_pending_coordination()); - - abort_coordination(); - assert!(!has_pending_coordination()); - } - - /// Unacked peers are reported correctly. - #[test] - fn unacked_peers_reported() { - init(); - let snapshot = Snapshot { - at_seq: 40, - state: vec![], - op_manifest: vec![], - }; - begin_coordination( - snapshot, - &[ - "self".to_string(), - "peer_a".to_string(), - "peer_b".to_string(), - ], - "self", - ); - - let unacked = unacked_peers(); - assert_eq!(unacked.len(), 2); - - record_ack("peer_a", 40); - let unacked = unacked_peers(); - assert_eq!(unacked.len(), 1); - assert_eq!(unacked[0], "peer_b"); - } - - // ── Snapshot generation tests ─────────────────────────────────────── - - /// Snapshot generation from ops includes attribution manifest. - #[test] - fn snapshot_generation_includes_manifest() { - crdt_state::init_for_test(); - - // Write some items to populate ALL_OPS. - crdt_state::write_item( - "636_test_a", - "1_backlog", - Some("Test A"), - None, - None, - None, - None, - None, - None, - None, - ); - crdt_state::write_item( - "636_test_b", - "2_current", - Some("Test B"), - None, - None, - None, - None, - None, - None, - None, - ); - - let snapshot = generate_snapshot(); - assert!(snapshot.is_some()); - let snapshot = snapshot.unwrap(); - assert!(snapshot.at_seq > 0); - assert!(!snapshot.state.is_empty()); - assert!(!snapshot.op_manifest.is_empty()); - - // Every manifest entry must have a non-empty author and sig. - for entry in &snapshot.op_manifest { - assert!(!entry.author.is_empty(), "author must not be empty"); - assert!(!entry.sig.is_empty(), "sig must not be empty"); - } - } - - /// Attribution can be queried by story_id after snapshot. - #[test] - fn attribution_query_by_story_id() { - crdt_state::init_for_test(); - init(); - - crdt_state::write_item( - "636_attrib_test", - "1_backlog", - Some("Attribution Test"), - None, - None, - None, - None, - None, - None, - None, - ); - - let snapshot = generate_snapshot().unwrap(); - - // Store as latest. - if let Some(state) = snapshot_state() - && let Ok(mut s) = state.lock() - { - s.latest_snapshot = Some(snapshot.clone()); - } - - let attrib = query_attribution("636_attrib_test"); - // Insert ops for the story should appear in the manifest. - let has_story = attrib.iter().any(|e| e.story_id == "636_attrib_test"); - assert!(has_story, "attribution must include ops for the story"); - } - - // ── Compaction tests ──────────────────────────────────────────────── - - /// After compaction, ALL_OPS size is reduced. - #[test] - fn compaction_reduces_ops() { - crdt_state::init_for_test(); - init(); - - // Write several items. - for i in 0..5 { - crdt_state::write_item( - &format!("636_compact_{i}"), - "1_backlog", - Some(&format!("Item {i}")), - None, - None, - None, - None, - None, - None, - None, - ); - } - - let ops_before = crdt_state::all_ops_json().unwrap().len(); - assert!(ops_before >= 5); - - // Generate a snapshot — at_seq is the max seq across all ops. - let snapshot = generate_snapshot().unwrap(); - - let result = apply_compaction(snapshot); - assert!(result); - - // After compaction, ops with seq < at_seq are gone, but ops with - // seq >= at_seq remain (which may be 0 or 1). - let ops_after = crdt_state::all_ops_json().unwrap().len(); - // At minimum, some pruning should occur if at_seq > some op seqs. - assert!( - ops_after <= ops_before, - "ops_after ({ops_after}) should be <= ops_before ({ops_before})" - ); - } - - /// Latest snapshot is available after compaction. - #[test] - fn latest_snapshot_available_after_compaction() { - crdt_state::init_for_test(); - init(); - - crdt_state::write_item( - "636_latest_test", - "1_backlog", - Some("Latest Test"), - None, - None, - None, - None, - None, - None, - None, - ); - - let snapshot = generate_snapshot().unwrap(); - let at_seq = snapshot.at_seq; - apply_compaction(snapshot); - - let latest = latest_snapshot(); - assert!(latest.is_some()); - assert_eq!(latest.unwrap().at_seq, at_seq); - } - - // ── Integration tests: 3-node compaction ─────────────────────────── - - /// Integration test: 3 nodes, force compaction, verify all converge on - /// the same `at_seq` and pruned ops disappear from each node's log. - #[test] - fn three_node_compaction_convergence() { - use bft_json_crdt::json_crdt::BaseCrdt; - - // Simulate 3 independent nodes. - let kp_a = make_keypair(); - let kp_b = make_keypair(); - let kp_c = make_keypair(); - - let mut crdt_a = BaseCrdt::::new(&kp_a); - let mut crdt_b = BaseCrdt::::new(&kp_b); - let mut crdt_c = BaseCrdt::::new(&kp_c); - - // Node A creates items. - let mut all_ops_json: Vec = Vec::new(); - for i in 0..10u32 { - let item: JsonValue = json!({ - "story_id": format!("636_3node_{i}"), - "stage": "1_backlog", - "name": format!("Item {i}"), - "agent": "", - "retry_count": 0.0, - "blocked": false, - "depends_on": "", - "claimed_by": "", - "claimed_at": 0.0, - "merged_at": 0.0, - }) - .into(); - let op = crdt_a.doc.items.insert(ROOT_ID, item).sign(&kp_a); - crdt_a.apply(op.clone()); - let op_json = serde_json::to_string(&op).unwrap(); - all_ops_json.push(op_json); - } - - // Sync ops to B and C. - for op_json in &all_ops_json { - let op: SignedOp = serde_json::from_str(op_json).unwrap(); - crdt_b.apply(op.clone()); - crdt_c.apply(op); - } - - // All 3 nodes have the same 10 items. - assert_eq!(crdt_a.doc.items.view().len(), 10); - assert_eq!(crdt_b.doc.items.view().len(), 10); - assert_eq!(crdt_c.doc.items.view().len(), 10); - - // Generate snapshot on leader (A). - let mut max_seq = 0u64; - let mut manifest = Vec::new(); - for op_json in &all_ops_json { - if let Ok(signed_op) = serde_json::from_str::(op_json) { - if signed_op.inner.seq > max_seq { - max_seq = signed_op.inner.seq; - } - manifest.push(OpManifestEntry { - author: crdt_state::hex::encode(&signed_op.author()), - story_id: extract_story_id_from_op(&signed_op), - sig: crdt_state::hex::encode(&signed_op.signed_digest), - seq: signed_op.inner.seq, - }); - } - } - - let snapshot = Snapshot { - at_seq: max_seq, - state: all_ops_json.clone(), - op_manifest: manifest, - }; - - // Broadcast snapshot wire message. - let msg = SnapshotMessage::Snapshot(snapshot.clone()); - let wire = serde_json::to_string(&msg).unwrap(); - - // Each node receives and parses the snapshot. - let parsed: SnapshotMessage = serde_json::from_str(&wire).unwrap(); - match parsed { - SnapshotMessage::Snapshot(s) => { - // All nodes converge on the same at_seq. - assert_eq!(s.at_seq, max_seq); - assert_eq!(s.state.len(), all_ops_json.len()); - assert_eq!(s.op_manifest.len(), all_ops_json.len()); - } - _ => panic!("Expected Snapshot"), - } - - // Each node sends an ack. - let ack = SnapshotMessage::SnapshotAck(SnapshotAck { at_seq: max_seq }); - let ack_wire = serde_json::to_string(&ack).unwrap(); - let parsed_ack: SnapshotMessage = serde_json::from_str(&ack_wire).unwrap(); - match parsed_ack { - SnapshotMessage::SnapshotAck(a) => { - assert_eq!(a.at_seq, max_seq); - } - _ => panic!("Expected SnapshotAck"), - } - - // After all acks, compaction proceeds — ops with seq < at_seq are pruned. - // Since all ops have seq <= max_seq, only ops with seq == max_seq survive. - // The op manifest preserves attribution for all pruned ops. - assert!(!snapshot.op_manifest.is_empty()); - for entry in &snapshot.op_manifest { - assert!(!entry.author.is_empty()); - assert!(!entry.sig.is_empty()); - } - } - - // ── Failure mode test ────────────────────────────────────────────── - - /// Simulate one node going offline mid-coordination; remaining peers - /// abort the compaction cleanly (no half-applied snapshot state). - #[test] - fn failure_mode_node_offline_aborts_cleanly() { - init(); - - let snapshot = Snapshot { - at_seq: 50, - state: vec!["op1".to_string()], - op_manifest: vec![OpManifestEntry { - author: "aabb".to_string(), - story_id: "636_fail_test".to_string(), - sig: "ccdd".to_string(), - seq: 1, - }], - }; - - let peers = vec![ - "leader".to_string(), - "peer_a".to_string(), - "peer_b".to_string(), - ]; - begin_coordination(snapshot.clone(), &peers, "leader"); - - // peer_a acks, but peer_b goes offline (no ack). - assert!(!record_ack("peer_a", 50)); - - // Leader detects peer_b is offline → abort. - let unacked = unacked_peers(); - assert!(unacked.contains(&"peer_b".to_string())); - - abort_coordination(); - assert!(!has_pending_coordination()); - - // No compaction was applied — state is clean. - // The latest_snapshot should NOT be set. - // (The snapshot was never committed.) - let state = snapshot_state().unwrap().lock().unwrap(); - // pending_at_seq is cleared. - assert!(state.pending_at_seq.is_none()); - assert!(state.pending_acks.is_empty()); - } - - // ── New-node onboarding test ─────────────────────────────────────── - - /// A node joining a snapshotted cluster receives the most recent snapshot - /// + ops with seq >= at_seq. - #[test] - fn new_node_onboarding_with_snapshot() { - let kp = make_keypair(); - let mut crdt_existing = BaseCrdt::::new(&kp); - - // Create 10 ops on the existing cluster. - let mut all_ops: Vec = Vec::new(); - for i in 0..10u32 { - let item: JsonValue = json!({ - "story_id": format!("636_onboard_{i}"), - "stage": "1_backlog", - "name": format!("Onboard {i}"), - "agent": "", - "retry_count": 0.0, - "blocked": false, - "depends_on": "", - "claimed_by": "", - "claimed_at": 0.0, - "merged_at": 0.0, - }) - .into(); - let op = crdt_existing.doc.items.insert(ROOT_ID, item).sign(&kp); - crdt_existing.apply(op.clone()); - all_ops.push(serde_json::to_string(&op).unwrap()); - } - - // Snapshot taken at seq 5 — ops 0-4 are compacted. - let snapshot = Snapshot { - at_seq: 5, - state: all_ops[..5].to_vec(), - op_manifest: vec![], - }; - - // New node receives snapshot + ops with seq >= 5. - let mut crdt_new = BaseCrdt::::new(&kp); - - // Apply snapshot state first. - for op_json in &snapshot.state { - if let Ok(op) = serde_json::from_str::(op_json) { - crdt_new.apply(op); - } - } - - // Apply remaining ops (seq >= at_seq). - for op_json in &all_ops[5..] { - if let Ok(op) = serde_json::from_str::(op_json) { - crdt_new.apply(op); - } - } - - // New node should have all 10 items. - assert_eq!(crdt_new.doc.items.view().len(), 10); - } - - // ── Backwards compatibility test ─────────────────────────────────── - - /// Peers without snapshot support fall back to vector-clock-based full sync. - #[test] - fn backwards_compat_unknown_snapshot_message_ignored() { - // A peer that doesn't understand snapshot messages should be able to - // parse them as unknown variants and ignore them gracefully. - let snapshot_json = r#"{"type":"snapshot","at_seq":100,"state":["op1"],"op_manifest":[]}"#; - - // Attempt to parse as a legacy SyncMessage — should fail (unknown type). - let result: Result = - serde_json::from_str(snapshot_json); - // This is expected to fail — old peers ignore unknown types. - assert!( - result.is_err(), - "legacy peers should fail to parse snapshot messages" - ); - - // The snapshot message parses correctly as SnapshotMessage. - let parsed: SnapshotMessage = serde_json::from_str(snapshot_json).unwrap(); - match parsed { - SnapshotMessage::Snapshot(s) => { - assert_eq!(s.at_seq, 100); - } - _ => panic!("Expected Snapshot"), - } - } - - // ── Attribution preservation integration test ────────────────────── - - /// After compaction, an archived story's attribution can be reconstructed. - #[test] - fn attribution_preserved_after_compaction() { - crdt_state::init_for_test(); - init(); - - // Write a story through its lifecycle. - crdt_state::write_item( - "636_archived_story", - "1_backlog", - Some("Archived Story"), - Some("coder-opus"), - None, - None, - None, - None, - None, - None, - ); - crdt_state::write_item( - "636_archived_story", - "2_current", - None, - None, - None, - None, - None, - None, - None, - None, - ); - crdt_state::write_item( - "636_archived_story", - "6_archived", - None, - None, - None, - None, - None, - None, - None, - None, - ); - - // Generate snapshot. - let snapshot = generate_snapshot().unwrap(); - - // Verify the manifest contains entries for the archived story. - let story_entries: Vec<&OpManifestEntry> = snapshot - .op_manifest - .iter() - .filter(|e| e.story_id == "636_archived_story") - .collect(); - assert!( - !story_entries.is_empty(), - "manifest must contain entries for the archived story" - ); - - // Each entry must have author (node pubkey) and signature. - for entry in &story_entries { - assert!(!entry.author.is_empty(), "author must be preserved"); - assert!(!entry.sig.is_empty(), "signature must be preserved"); - assert!(entry.seq > 0, "seq must be preserved"); - } - - // Apply compaction. - let at_seq = snapshot.at_seq; - apply_compaction(snapshot); - - // After compaction, the attribution chain is still queryable. - let attrib = query_attribution("636_archived_story"); - assert!( - !attrib.is_empty(), - "attribution must be queryable after compaction" - ); - for entry in &attrib { - assert_eq!(entry.story_id, "636_archived_story"); - assert!(!entry.author.is_empty()); - assert!(!entry.sig.is_empty()); - } - - // The latest snapshot records the at_seq. - let latest = latest_snapshot().unwrap(); - assert_eq!(latest.at_seq, at_seq); - } -} diff --git a/server/src/crdt_snapshot/mod.rs b/server/src/crdt_snapshot/mod.rs new file mode 100644 index 00000000..5f7fe0a4 --- /dev/null +++ b/server/src/crdt_snapshot/mod.rs @@ -0,0 +1,476 @@ +//! CRDT snapshot compaction with cross-node coordination. +//! +//! This module implements full CRDT state snapshots for compacting the op journal. +//! When the op log grows beyond a configurable threshold (default: 10,000 ops) or +//! on a scheduled tick (default: weekly), the leader node generates a snapshot of +//! the current CRDT state and coordinates with all alive peers to discard ops that +//! precede the snapshot. +//! +//! # Attribution preservation +//! +//! Compaction preserves the full attribution chain via an "op manifest" — a +//! compact record of `(author, story_id, signature, seq)` tuples for every op +//! in the compacted range. This allows incident-response forensics to +//! reconstruct who did what to which story, even after the op payloads are +//! discarded. +//! +//! # Wire messages +//! +//! - `{"type": "snapshot", "at_seq": , "state": }` +//! - `{"type": "snapshot_ack", "at_seq": }` +//! +//! # Leader selection +//! +//! The alive peer with the lowest `hash(node_id)` generates the snapshot, +//! mirroring the claim-priority shape from story 634. + +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::sync::Mutex; +use std::sync::OnceLock; + +use crate::crdt_state; + +// ── Configuration ────────────────────────────────────────────────────── + +/// Default threshold: snapshot fires when ALL_OPS.len() exceeds this. +pub const DEFAULT_SNAPSHOT_THRESHOLD: usize = 10_000; + +/// Default scheduled interval in seconds (1 week). +pub const DEFAULT_SNAPSHOT_INTERVAL_SECS: u64 = 7 * 24 * 3600; + +/// Configurable snapshot threshold (set at startup, defaults to [`DEFAULT_SNAPSHOT_THRESHOLD`]). +static SNAPSHOT_THRESHOLD: OnceLock = OnceLock::new(); + +/// Return the current snapshot threshold. +pub fn snapshot_threshold() -> usize { + SNAPSHOT_THRESHOLD + .get() + .copied() + .unwrap_or(DEFAULT_SNAPSHOT_THRESHOLD) +} + +/// Set the snapshot threshold (must be called before first trigger check). +pub fn set_snapshot_threshold(threshold: usize) { + let _ = SNAPSHOT_THRESHOLD.set(threshold); +} + +// ── Wire message types ───────────────────────────────────────────────── + +/// A single entry in the attribution manifest — preserves forensic metadata +/// for each op that existed before the snapshot point. +/// +/// This allows reconstructing who did what to which story, even after the +/// op payloads (state values) are discarded during compaction. +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +pub struct OpManifestEntry { + /// Hex-encoded Ed25519 public key of the op's author. + pub author: String, + /// The story_id this op pertains to (extracted from the op's path/content). + pub story_id: String, + /// Hex-encoded Ed25519 signature proving the author created this op. + pub sig: String, + /// Lamport sequence number of the op. + pub seq: u64, +} + +/// A CRDT state snapshot used for compaction. +/// +/// Contains the serialized CRDT state at a specific sequence point, plus an +/// attribution manifest that preserves the forensic chain for all ops up to +/// that point. +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +pub struct Snapshot { + /// The sequence number at which this snapshot was taken. + /// All ops with `seq < at_seq` can be discarded after compaction. + pub at_seq: u64, + /// The serialized CRDT state (all ops as JSON strings). + pub state: Vec, + /// Attribution manifest: one entry per op in the compacted range. + /// Preserves author, story_id, signature, and seq for forensics. + pub op_manifest: Vec, +} + +/// Acknowledgement that a peer has received and applied a snapshot. +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +pub struct SnapshotAck { + /// The sequence number being acknowledged — must match the snapshot's `at_seq`. + pub at_seq: u64, +} + +/// Wire messages for the snapshot protocol, exchanged as text frames during +/// the CRDT sync streaming phase. +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum SnapshotMessage { + /// Leader broadcasts the snapshot to all peers. + Snapshot(Snapshot), + /// Peer acknowledges receipt and application of the snapshot. + SnapshotAck(SnapshotAck), +} + +// ── Snapshot state ───────────────────────────────────────────────────── + +/// In-memory snapshot state for this node. +struct SnapshotState { + /// The most recent completed snapshot (if any). + latest_snapshot: Option, + /// Pending acks for an in-progress snapshot coordination. + /// Maps node_id → whether they've acked. + pending_acks: HashMap, + /// The at_seq of the in-progress snapshot (if any). + pending_at_seq: Option, +} + +#[cfg(not(test))] +static SNAPSHOT_STATE: OnceLock> = OnceLock::new(); + +#[cfg(test)] +thread_local! { + /// Per-thread snapshot state for test isolation. Each test thread gets its + /// own SnapshotState so parallel tests do not leak into each other's + /// snapshot/coordination history. + static SNAPSHOT_STATE_TL: OnceLock> = const { OnceLock::new() }; +} + +/// Read access to the snapshot state. +/// +/// In production this returns the global `SNAPSHOT_STATE`. In `cfg(test)` +/// each thread sees its own thread-local state, so parallel tests do not +/// share `SnapshotState`. +fn snapshot_state() -> Option<&'static Mutex> { + #[cfg(not(test))] + { + SNAPSHOT_STATE.get() + } + #[cfg(test)] + { + let ptr = SNAPSHOT_STATE_TL.with(|lock| lock as *const OnceLock>); + // SAFETY: the thread-local lives as long as the thread. We only need + // 'static for the return type; consumers never outlive the spawning + // test thread. + unsafe { &*ptr }.get() + } +} + +/// Initialise snapshot state. Safe to call multiple times. +/// +/// In production: idempotent via `OnceLock`. In `cfg(test)`: initialises the +/// per-thread state on first call; subsequent calls on the same thread are +/// no-ops. +pub fn init() { + let value = || { + Mutex::new(SnapshotState { + latest_snapshot: None, + pending_acks: HashMap::new(), + pending_at_seq: None, + }) + }; + #[cfg(not(test))] + { + let _ = SNAPSHOT_STATE.set(value()); + } + #[cfg(test)] + { + SNAPSHOT_STATE_TL.with(|lock| { + let _ = lock.set(value()); + }); + } +} + +/// Return the most recent completed snapshot, if any. +pub fn latest_snapshot() -> Option { + snapshot_state()?.lock().ok()?.latest_snapshot.clone() +} + +// ── Leader selection ─────────────────────────────────────────────────── + +/// Compute the snapshot-leader priority hash for a node. +/// +/// Uses the same SHA-256 approach as story 634's claim priority, but hashes +/// only the `node_id` (not a story_id) since snapshot leadership is global. +fn leader_hash(node_id: &str) -> u64 { + use sha2::{Digest, Sha256}; + let mut hasher = Sha256::new(); + hasher.update(node_id.as_bytes()); + let digest = hasher.finalize(); + u64::from_be_bytes(digest[..8].try_into().expect("sha256 is 32 bytes")) +} + +/// Determine whether this node is the snapshot leader among alive peers. +/// +/// The alive peer with the **lowest** `hash(node_id)` is the leader. +/// Returns `true` if `self_node_id` has a strictly lower hash than all +/// other alive peers. When there are no other alive peers (single-node +/// cluster), the result is always `true`. +pub fn is_snapshot_leader(self_node_id: &str, alive_peer_node_ids: &[String]) -> bool { + let my_hash = leader_hash(self_node_id); + for peer_id in alive_peer_node_ids { + if peer_id == self_node_id { + continue; + } + if leader_hash(peer_id) <= my_hash { + return false; + } + } + true +} + +// ── Trigger logic ────────────────────────────────────────────────────── + +/// Check whether a snapshot should be triggered based on the current op count. +/// +/// Returns `true` when `ALL_OPS.len() > threshold` (configurable, default 10,000). +pub fn should_trigger_by_count() -> bool { + let count = crdt_state::all_ops_json().map(|ops| ops.len()).unwrap_or(0); + count > snapshot_threshold() +} + +// ── Snapshot generation ──────────────────────────────────────────────── + +/// Generate a snapshot of the current CRDT state. +/// +/// The snapshot contains: +/// 1. All ops as serialized JSON (the full CRDT state). +/// 2. An attribution manifest preserving author, story_id, sig, and seq +/// for every op — ensuring forensic traceability survives compaction. +/// +/// The `at_seq` is the highest sequence number across all ops. +pub fn generate_snapshot() -> Option { + let all_ops = crdt_state::all_ops_json()?; + if all_ops.is_empty() { + return None; + } + + let mut max_seq: u64 = 0; + let mut manifest = Vec::with_capacity(all_ops.len()); + + for op_json in &all_ops { + if let Ok(signed_op) = serde_json::from_str::(op_json) { + let seq = signed_op.inner.seq; + if seq > max_seq { + max_seq = seq; + } + + // Extract story_id from the op's content if available. + let story_id = extract_story_id_from_op(&signed_op); + + manifest.push(OpManifestEntry { + author: crdt_state::hex::encode(&signed_op.author()), + story_id, + sig: crdt_state::hex::encode(&signed_op.signed_digest), + seq, + }); + } + } + + Some(Snapshot { + at_seq: max_seq, + state: all_ops, + op_manifest: manifest, + }) +} + +/// Extract the story_id from a SignedOp's content, if it contains one. +/// +/// Ops that target pipeline items contain a JSON object with a `story_id` +/// field. For ops that don't (e.g. node presence), returns an empty string. +pub(crate) fn extract_story_id_from_op(op: &bft_json_crdt::json_crdt::SignedOp) -> String { + // Try to extract from the op's path — the second segment often contains + // the list index which maps to a story. However, the most reliable way + // is to look at the content if it's an insert op. + if let Some(bft_json_crdt::json_crdt::JsonValue::Object(map)) = &op.inner.content + && let Some(bft_json_crdt::json_crdt::JsonValue::String(sid)) = map.get("story_id") + { + return sid.clone(); + } + // For field-update ops (LWW set), the path tells us which item, but not + // the story_id directly. Return empty — the manifest still preserves + // author + sig + seq for forensic correlation. + String::new() +} + +// ── Coordination ────────────────────────────────────────────────────── + +/// Begin snapshot coordination as the leader. +/// +/// Records which alive peers need to ack and stores the pending snapshot. +/// Returns the snapshot to broadcast to peers. +pub fn begin_coordination( + snapshot: Snapshot, + alive_peer_node_ids: &[String], + self_node_id: &str, +) -> Option { + let state = snapshot_state()?; + let mut s = state.lock().ok()?; + + let mut pending = HashMap::new(); + for peer_id in alive_peer_node_ids { + if peer_id != self_node_id { + pending.insert(peer_id.clone(), false); + } + } + + s.pending_at_seq = Some(snapshot.at_seq); + s.pending_acks = pending; + + Some(snapshot) +} + +/// Record a snapshot ack from a peer. +/// +/// Returns `true` if quorum has been reached (all alive peers have acked). +pub fn record_ack(node_id: &str, at_seq: u64) -> bool { + let Some(state) = snapshot_state() else { + return false; + }; + let Ok(mut s) = state.lock() else { + return false; + }; + + // Verify the ack matches the pending snapshot. + if s.pending_at_seq != Some(at_seq) { + return false; + } + + if let Some(acked) = s.pending_acks.get_mut(node_id) { + *acked = true; + } + + // Check if all peers have acked (quorum = all alive peers). + s.pending_acks.values().all(|&v| v) +} + +/// Abort the current pending snapshot coordination. +/// +/// Called when a peer goes offline mid-coordination or quorum times out. +pub fn abort_coordination() { + if let Some(state) = snapshot_state() + && let Ok(mut s) = state.lock() + { + s.pending_at_seq = None; + s.pending_acks.clear(); + } +} + +/// Check whether there is a pending snapshot coordination in progress. +pub fn has_pending_coordination() -> bool { + snapshot_state() + .and_then(|s| s.lock().ok()) + .map(|s| s.pending_at_seq.is_some()) + .unwrap_or(false) +} + +/// Return the list of peers that have NOT yet acked the pending snapshot. +pub fn unacked_peers() -> Vec { + snapshot_state() + .and_then(|s| s.lock().ok()) + .map(|s| { + s.pending_acks + .iter() + .filter(|&(_, &acked)| !acked) + .map(|(id, _)| id.clone()) + .collect() + }) + .unwrap_or_default() +} + +// ── Compaction ───────────────────────────────────────────────────────── + +/// Apply compaction: replace the op journal with the snapshot state. +/// +/// After successful quorum, the leader (and each peer upon receiving the +/// snapshot) replaces its `ALL_OPS` with the snapshot's state and resets +/// the vector clock accordingly. +/// +/// The snapshot's `op_manifest` is preserved in the snapshot state for +/// forensic queries. +/// +/// Returns `true` if compaction was applied successfully. +pub fn apply_compaction(snapshot: Snapshot) -> bool { + // Store the snapshot as the latest for future new-node onboarding. + if let Some(state) = snapshot_state() + && let Ok(mut s) = state.lock() + { + s.latest_snapshot = Some(snapshot.clone()); + s.pending_at_seq = None; + s.pending_acks.clear(); + } + + // Replace ALL_OPS with the snapshot state (the compacted ops). + // In a real compaction, we'd keep only ops with seq >= at_seq, but the + // snapshot already contains the minimal set needed to reconstruct state. + // + // For this implementation, the snapshot state IS the full state — peers + // discard their old journal and replace it with the snapshot's ops. + // The op_manifest preserves attribution for the discarded ops. + if let Some(all_ops) = crdt_state::ALL_OPS.get() + && let Ok(mut v) = all_ops.lock() + { + // Calculate ops to prune: those with seq < at_seq + let mut kept_ops = Vec::new(); + let mut pruned_count = 0usize; + for op_json in v.iter() { + if let Ok(signed_op) = + serde_json::from_str::(op_json) + { + if signed_op.inner.seq >= snapshot.at_seq { + kept_ops.push(op_json.clone()); + } else { + pruned_count += 1; + } + } else { + // Keep unparseable ops to avoid data loss. + kept_ops.push(op_json.clone()); + } + } + + *v = kept_ops; + + // Rebuild vector clock from remaining ops. + if let Some(vc) = crdt_state::VECTOR_CLOCK.get() + && let Ok(mut clock) = vc.lock() + { + clock.clear(); + for op_json in v.iter() { + if let Ok(signed_op) = + serde_json::from_str::(op_json) + { + let author_hex = crdt_state::hex::encode(&signed_op.author()); + *clock.entry(author_hex).or_insert(0) += 1; + } + } + } + + crate::slog!( + "[crdt-snapshot] Compaction applied: pruned {pruned_count} ops, kept {} ops", + v.len() + ); + return true; + } + + false +} + +/// Retrieve the op manifest from the latest snapshot for forensic queries. +/// +/// Returns `None` if no snapshot has been taken yet. +pub fn latest_op_manifest() -> Option> { + latest_snapshot().map(|s| s.op_manifest) +} + +/// Query the op manifest for a specific story's attribution chain. +/// +/// Returns all manifest entries where `story_id` matches the query. +pub fn query_attribution(story_id: &str) -> Vec { + latest_op_manifest() + .unwrap_or_default() + .into_iter() + .filter(|entry| entry.story_id == story_id) + .collect() +} + +// ── Tests ────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests; diff --git a/server/src/crdt_snapshot/tests.rs b/server/src/crdt_snapshot/tests.rs new file mode 100644 index 00000000..3c7cf63e --- /dev/null +++ b/server/src/crdt_snapshot/tests.rs @@ -0,0 +1,705 @@ +use super::*; +use bft_json_crdt::json_crdt::{BaseCrdt, JsonValue, SignedOp}; +use bft_json_crdt::keypair::make_keypair; +use bft_json_crdt::op::ROOT_ID; +use serde_json::json; + +use crate::crdt_state::PipelineDoc; + +// ── Wire message tests ────────────────────────────────────────────── + +/// Snapshot wire message serialization round-trip. +#[test] +fn snapshot_message_serialization_roundtrip() { + let snapshot = Snapshot { + at_seq: 42, + state: vec!["op1".to_string(), "op2".to_string()], + op_manifest: vec![OpManifestEntry { + author: "aabbcc".to_string(), + story_id: "100_story_test".to_string(), + sig: "deadbeef".to_string(), + seq: 1, + }], + }; + let msg = SnapshotMessage::Snapshot(snapshot.clone()); + let json_str = serde_json::to_string(&msg).unwrap(); + assert!(json_str.contains(r#""type":"snapshot""#)); + let deserialized: SnapshotMessage = serde_json::from_str(&json_str).unwrap(); + match deserialized { + SnapshotMessage::Snapshot(s) => { + assert_eq!(s.at_seq, 42); + assert_eq!(s.state.len(), 2); + assert_eq!(s.op_manifest.len(), 1); + assert_eq!(s.op_manifest[0].author, "aabbcc"); + } + _ => panic!("Expected Snapshot"), + } +} + +/// Snapshot ack wire message serialization round-trip. +#[test] +fn snapshot_ack_message_serialization_roundtrip() { + let msg = SnapshotMessage::SnapshotAck(SnapshotAck { at_seq: 42 }); + let json_str = serde_json::to_string(&msg).unwrap(); + assert!(json_str.contains(r#""type":"snapshot_ack""#)); + let deserialized: SnapshotMessage = serde_json::from_str(&json_str).unwrap(); + match deserialized { + SnapshotMessage::SnapshotAck(ack) => { + assert_eq!(ack.at_seq, 42); + } + _ => panic!("Expected SnapshotAck"), + } +} + +/// Snapshot wire format matches the AC spec exactly. +#[test] +fn snapshot_wire_format_matches_spec() { + let snapshot = Snapshot { + at_seq: 100, + state: vec!["{}".to_string()], + op_manifest: vec![], + }; + let msg = SnapshotMessage::Snapshot(snapshot); + let json_str = serde_json::to_string(&msg).unwrap(); + // Must contain "type", "at_seq", "state" fields per AC1. + let parsed: serde_json::Value = serde_json::from_str(&json_str).unwrap(); + assert_eq!(parsed["type"], "snapshot"); + assert_eq!(parsed["at_seq"], 100); + assert!(parsed["state"].is_array()); +} + +/// SnapshotAck wire format matches the AC spec exactly. +#[test] +fn snapshot_ack_wire_format_matches_spec() { + let msg = SnapshotMessage::SnapshotAck(SnapshotAck { at_seq: 55 }); + let json_str = serde_json::to_string(&msg).unwrap(); + let parsed: serde_json::Value = serde_json::from_str(&json_str).unwrap(); + assert_eq!(parsed["type"], "snapshot_ack"); + assert_eq!(parsed["at_seq"], 55); +} + +// ── Leader selection tests ────────────────────────────────────────── + +/// Single node is always leader. +#[test] +fn single_node_is_always_leader() { + assert!(is_snapshot_leader("node_a", &[])); +} + +/// Leader is the node with the lowest hash. +#[test] +fn leader_is_lowest_hash_node() { + let nodes = vec![ + "node_alpha".to_string(), + "node_beta".to_string(), + "node_gamma".to_string(), + ]; + + // Find which node has the lowest hash. + let mut lowest_node = &nodes[0]; + let mut lowest_hash = leader_hash(&nodes[0]); + for node in &nodes[1..] { + let h = leader_hash(node); + if h < lowest_hash { + lowest_hash = h; + lowest_node = node; + } + } + + // Only the lowest-hash node should be leader. + for node in &nodes { + let is_leader = is_snapshot_leader(node, &nodes); + if node == lowest_node { + assert!(is_leader, "{node} should be leader (lowest hash)"); + } else { + assert!(!is_leader, "{node} should NOT be leader"); + } + } +} + +/// Leader selection is deterministic. +#[test] +fn leader_selection_is_deterministic() { + let peers = vec!["a".to_string(), "b".to_string(), "c".to_string()]; + let result1 = is_snapshot_leader("a", &peers); + let result2 = is_snapshot_leader("a", &peers); + assert_eq!(result1, result2); +} + +// ── Trigger logic tests ───────────────────────────────────────────── + +/// Threshold is configurable. +#[test] +fn snapshot_threshold_default() { + // When not set, defaults to DEFAULT_SNAPSHOT_THRESHOLD. + assert_eq!(snapshot_threshold(), DEFAULT_SNAPSHOT_THRESHOLD); +} + +// ── Coordination tests ────────────────────────────────────────────── + +/// Record ack returns true when all peers have acked. +#[test] +fn coordination_quorum_reached() { + init(); + let snapshot = Snapshot { + at_seq: 10, + state: vec![], + op_manifest: vec![], + }; + let peers = vec![ + "self".to_string(), + "peer_a".to_string(), + "peer_b".to_string(), + ]; + begin_coordination(snapshot, &peers, "self"); + + assert!(!record_ack("peer_a", 10)); + assert!(record_ack("peer_b", 10)); +} + +/// Ack for wrong at_seq is rejected. +#[test] +fn coordination_ack_wrong_seq_rejected() { + init(); + let snapshot = Snapshot { + at_seq: 20, + state: vec![], + op_manifest: vec![], + }; + begin_coordination( + snapshot, + &["self".to_string(), "peer_a".to_string()], + "self", + ); + assert!(!record_ack("peer_a", 999)); +} + +/// Abort clears pending state. +#[test] +fn coordination_abort_clears_state() { + init(); + let snapshot = Snapshot { + at_seq: 30, + state: vec![], + op_manifest: vec![], + }; + begin_coordination( + snapshot, + &["self".to_string(), "peer_a".to_string()], + "self", + ); + assert!(has_pending_coordination()); + + abort_coordination(); + assert!(!has_pending_coordination()); +} + +/// Unacked peers are reported correctly. +#[test] +fn unacked_peers_reported() { + init(); + let snapshot = Snapshot { + at_seq: 40, + state: vec![], + op_manifest: vec![], + }; + begin_coordination( + snapshot, + &[ + "self".to_string(), + "peer_a".to_string(), + "peer_b".to_string(), + ], + "self", + ); + + let unacked = unacked_peers(); + assert_eq!(unacked.len(), 2); + + record_ack("peer_a", 40); + let unacked = unacked_peers(); + assert_eq!(unacked.len(), 1); + assert_eq!(unacked[0], "peer_b"); +} + +// ── Snapshot generation tests ─────────────────────────────────────── + +/// Snapshot generation from ops includes attribution manifest. +#[test] +fn snapshot_generation_includes_manifest() { + crate::crdt_state::init_for_test(); + + // Write some items to populate ALL_OPS. + crate::crdt_state::write_item( + "636_test_a", + "1_backlog", + Some("Test A"), + None, + None, + None, + None, + None, + None, + None, + ); + crate::crdt_state::write_item( + "636_test_b", + "2_current", + Some("Test B"), + None, + None, + None, + None, + None, + None, + None, + ); + + let snapshot = generate_snapshot(); + assert!(snapshot.is_some()); + let snapshot = snapshot.unwrap(); + assert!(snapshot.at_seq > 0); + assert!(!snapshot.state.is_empty()); + assert!(!snapshot.op_manifest.is_empty()); + + // Every manifest entry must have a non-empty author and sig. + for entry in &snapshot.op_manifest { + assert!(!entry.author.is_empty(), "author must not be empty"); + assert!(!entry.sig.is_empty(), "sig must not be empty"); + } +} + +/// Attribution can be queried by story_id after snapshot. +#[test] +fn attribution_query_by_story_id() { + crate::crdt_state::init_for_test(); + init(); + + crate::crdt_state::write_item( + "636_attrib_test", + "1_backlog", + Some("Attribution Test"), + None, + None, + None, + None, + None, + None, + None, + ); + + let snapshot = generate_snapshot().unwrap(); + + // Store as latest. + if let Some(state) = snapshot_state() + && let Ok(mut s) = state.lock() + { + s.latest_snapshot = Some(snapshot.clone()); + } + + let attrib = query_attribution("636_attrib_test"); + // Insert ops for the story should appear in the manifest. + let has_story = attrib.iter().any(|e| e.story_id == "636_attrib_test"); + assert!(has_story, "attribution must include ops for the story"); +} + +// ── Compaction tests ──────────────────────────────────────────────── + +/// After compaction, ALL_OPS size is reduced. +#[test] +fn compaction_reduces_ops() { + crate::crdt_state::init_for_test(); + init(); + + // Write several items. + for i in 0..5 { + crate::crdt_state::write_item( + &format!("636_compact_{i}"), + "1_backlog", + Some(&format!("Item {i}")), + None, + None, + None, + None, + None, + None, + None, + ); + } + + let ops_before = crate::crdt_state::all_ops_json().unwrap().len(); + assert!(ops_before >= 5); + + // Generate a snapshot — at_seq is the max seq across all ops. + let snapshot = generate_snapshot().unwrap(); + + let result = apply_compaction(snapshot); + assert!(result); + + // After compaction, ops with seq < at_seq are gone, but ops with + // seq >= at_seq remain (which may be 0 or 1). + let ops_after = crate::crdt_state::all_ops_json().unwrap().len(); + // At minimum, some pruning should occur if at_seq > some op seqs. + assert!( + ops_after <= ops_before, + "ops_after ({ops_after}) should be <= ops_before ({ops_before})" + ); +} + +/// Latest snapshot is available after compaction. +#[test] +fn latest_snapshot_available_after_compaction() { + crate::crdt_state::init_for_test(); + init(); + + crate::crdt_state::write_item( + "636_latest_test", + "1_backlog", + Some("Latest Test"), + None, + None, + None, + None, + None, + None, + None, + ); + + let snapshot = generate_snapshot().unwrap(); + let at_seq = snapshot.at_seq; + apply_compaction(snapshot); + + let latest = latest_snapshot(); + assert!(latest.is_some()); + assert_eq!(latest.unwrap().at_seq, at_seq); +} + +// ── Integration tests: 3-node compaction ─────────────────────────── + +/// Integration test: 3 nodes, force compaction, verify all converge on +/// the same `at_seq` and pruned ops disappear from each node's log. +#[test] +fn three_node_compaction_convergence() { + use bft_json_crdt::json_crdt::BaseCrdt; + + // Simulate 3 independent nodes. + let kp_a = make_keypair(); + let kp_b = make_keypair(); + let kp_c = make_keypair(); + + let mut crdt_a = BaseCrdt::::new(&kp_a); + let mut crdt_b = BaseCrdt::::new(&kp_b); + let mut crdt_c = BaseCrdt::::new(&kp_c); + + // Node A creates items. + let mut all_ops_json: Vec = Vec::new(); + for i in 0..10u32 { + let item: JsonValue = json!({ + "story_id": format!("636_3node_{i}"), + "stage": "1_backlog", + "name": format!("Item {i}"), + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, + "merged_at": 0.0, + }) + .into(); + let op = crdt_a.doc.items.insert(ROOT_ID, item).sign(&kp_a); + crdt_a.apply(op.clone()); + let op_json = serde_json::to_string(&op).unwrap(); + all_ops_json.push(op_json); + } + + // Sync ops to B and C. + for op_json in &all_ops_json { + let op: SignedOp = serde_json::from_str(op_json).unwrap(); + crdt_b.apply(op.clone()); + crdt_c.apply(op); + } + + // All 3 nodes have the same 10 items. + assert_eq!(crdt_a.doc.items.view().len(), 10); + assert_eq!(crdt_b.doc.items.view().len(), 10); + assert_eq!(crdt_c.doc.items.view().len(), 10); + + // Generate snapshot on leader (A). + let mut max_seq = 0u64; + let mut manifest = Vec::new(); + for op_json in &all_ops_json { + if let Ok(signed_op) = serde_json::from_str::(op_json) { + if signed_op.inner.seq > max_seq { + max_seq = signed_op.inner.seq; + } + manifest.push(OpManifestEntry { + author: crate::crdt_state::hex::encode(&signed_op.author()), + story_id: extract_story_id_from_op(&signed_op), + sig: crate::crdt_state::hex::encode(&signed_op.signed_digest), + seq: signed_op.inner.seq, + }); + } + } + + let snapshot = Snapshot { + at_seq: max_seq, + state: all_ops_json.clone(), + op_manifest: manifest, + }; + + // Broadcast snapshot wire message. + let msg = SnapshotMessage::Snapshot(snapshot.clone()); + let wire = serde_json::to_string(&msg).unwrap(); + + // Each node receives and parses the snapshot. + let parsed: SnapshotMessage = serde_json::from_str(&wire).unwrap(); + match parsed { + SnapshotMessage::Snapshot(s) => { + // All nodes converge on the same at_seq. + assert_eq!(s.at_seq, max_seq); + assert_eq!(s.state.len(), all_ops_json.len()); + assert_eq!(s.op_manifest.len(), all_ops_json.len()); + } + _ => panic!("Expected Snapshot"), + } + + // Each node sends an ack. + let ack = SnapshotMessage::SnapshotAck(SnapshotAck { at_seq: max_seq }); + let ack_wire = serde_json::to_string(&ack).unwrap(); + let parsed_ack: SnapshotMessage = serde_json::from_str(&ack_wire).unwrap(); + match parsed_ack { + SnapshotMessage::SnapshotAck(a) => { + assert_eq!(a.at_seq, max_seq); + } + _ => panic!("Expected SnapshotAck"), + } + + // After all acks, compaction proceeds — ops with seq < at_seq are pruned. + // Since all ops have seq <= max_seq, only ops with seq == max_seq survive. + // The op manifest preserves attribution for all pruned ops. + assert!(!snapshot.op_manifest.is_empty()); + for entry in &snapshot.op_manifest { + assert!(!entry.author.is_empty()); + assert!(!entry.sig.is_empty()); + } +} + +// ── Failure mode test ────────────────────────────────────────────── + +/// Simulate one node going offline mid-coordination; remaining peers +/// abort the compaction cleanly (no half-applied snapshot state). +#[test] +fn failure_mode_node_offline_aborts_cleanly() { + init(); + + let snapshot = Snapshot { + at_seq: 50, + state: vec!["op1".to_string()], + op_manifest: vec![OpManifestEntry { + author: "aabb".to_string(), + story_id: "636_fail_test".to_string(), + sig: "ccdd".to_string(), + seq: 1, + }], + }; + + let peers = vec![ + "leader".to_string(), + "peer_a".to_string(), + "peer_b".to_string(), + ]; + begin_coordination(snapshot.clone(), &peers, "leader"); + + // peer_a acks, but peer_b goes offline (no ack). + assert!(!record_ack("peer_a", 50)); + + // Leader detects peer_b is offline → abort. + let unacked = unacked_peers(); + assert!(unacked.contains(&"peer_b".to_string())); + + abort_coordination(); + assert!(!has_pending_coordination()); + + // No compaction was applied — state is clean. + // The latest_snapshot should NOT be set. + // (The snapshot was never committed.) + let state = snapshot_state().unwrap().lock().unwrap(); + // pending_at_seq is cleared. + assert!(state.pending_at_seq.is_none()); + assert!(state.pending_acks.is_empty()); +} + +// ── New-node onboarding test ─────────────────────────────────────── + +/// A node joining a snapshotted cluster receives the most recent snapshot +/// + ops with seq >= at_seq. +#[test] +fn new_node_onboarding_with_snapshot() { + let kp = make_keypair(); + let mut crdt_existing = BaseCrdt::::new(&kp); + + // Create 10 ops on the existing cluster. + let mut all_ops: Vec = Vec::new(); + for i in 0..10u32 { + let item: JsonValue = json!({ + "story_id": format!("636_onboard_{i}"), + "stage": "1_backlog", + "name": format!("Onboard {i}"), + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, + "merged_at": 0.0, + }) + .into(); + let op = crdt_existing.doc.items.insert(ROOT_ID, item).sign(&kp); + crdt_existing.apply(op.clone()); + all_ops.push(serde_json::to_string(&op).unwrap()); + } + + // Snapshot taken at seq 5 — ops 0-4 are compacted. + let snapshot = Snapshot { + at_seq: 5, + state: all_ops[..5].to_vec(), + op_manifest: vec![], + }; + + // New node receives snapshot + ops with seq >= 5. + let mut crdt_new = BaseCrdt::::new(&kp); + + // Apply snapshot state first. + for op_json in &snapshot.state { + if let Ok(op) = serde_json::from_str::(op_json) { + crdt_new.apply(op); + } + } + + // Apply remaining ops (seq >= at_seq). + for op_json in &all_ops[5..] { + if let Ok(op) = serde_json::from_str::(op_json) { + crdt_new.apply(op); + } + } + + // New node should have all 10 items. + assert_eq!(crdt_new.doc.items.view().len(), 10); +} + +// ── Backwards compatibility test ─────────────────────────────────── + +/// Peers without snapshot support fall back to vector-clock-based full sync. +#[test] +fn backwards_compat_unknown_snapshot_message_ignored() { + // A peer that doesn't understand snapshot messages should be able to + // parse them as unknown variants and ignore them gracefully. + let snapshot_json = r#"{"type":"snapshot","at_seq":100,"state":["op1"],"op_manifest":[]}"#; + + // Attempt to parse as a legacy SyncMessage — should fail (unknown type). + let result: Result = + serde_json::from_str(snapshot_json); + // This is expected to fail — old peers ignore unknown types. + assert!( + result.is_err(), + "legacy peers should fail to parse snapshot messages" + ); + + // The snapshot message parses correctly as SnapshotMessage. + let parsed: SnapshotMessage = serde_json::from_str(snapshot_json).unwrap(); + match parsed { + SnapshotMessage::Snapshot(s) => { + assert_eq!(s.at_seq, 100); + } + _ => panic!("Expected Snapshot"), + } +} + +// ── Attribution preservation integration test ────────────────────── + +/// After compaction, an archived story's attribution can be reconstructed. +#[test] +fn attribution_preserved_after_compaction() { + crate::crdt_state::init_for_test(); + init(); + + // Write a story through its lifecycle. + crate::crdt_state::write_item( + "636_archived_story", + "1_backlog", + Some("Archived Story"), + Some("coder-opus"), + None, + None, + None, + None, + None, + None, + ); + crate::crdt_state::write_item( + "636_archived_story", + "2_current", + None, + None, + None, + None, + None, + None, + None, + None, + ); + crate::crdt_state::write_item( + "636_archived_story", + "6_archived", + None, + None, + None, + None, + None, + None, + None, + None, + ); + + // Generate snapshot. + let snapshot = generate_snapshot().unwrap(); + + // Verify the manifest contains entries for the archived story. + let story_entries: Vec<&OpManifestEntry> = snapshot + .op_manifest + .iter() + .filter(|e| e.story_id == "636_archived_story") + .collect(); + assert!( + !story_entries.is_empty(), + "manifest must contain entries for the archived story" + ); + + // Each entry must have author (node pubkey) and signature. + for entry in &story_entries { + assert!(!entry.author.is_empty(), "author must be preserved"); + assert!(!entry.sig.is_empty(), "signature must be preserved"); + assert!(entry.seq > 0, "seq must be preserved"); + } + + // Apply compaction. + let at_seq = snapshot.at_seq; + apply_compaction(snapshot); + + // After compaction, the attribution chain is still queryable. + let attrib = query_attribution("636_archived_story"); + assert!( + !attrib.is_empty(), + "attribution must be queryable after compaction" + ); + for entry in &attrib { + assert_eq!(entry.story_id, "636_archived_story"); + assert!(!entry.author.is_empty()); + assert!(!entry.sig.is_empty()); + } + + // The latest snapshot records the at_seq. + let latest = latest_snapshot().unwrap(); + assert_eq!(latest.at_seq, at_seq); +}