huskies: merge 688_refactor_decompose_server_src_crdt_snapshot_rs_1182_lines

dave
2026-04-27 22:38:33 +00:00
parent 225c4f2b46
commit 571a057f52
3 changed files with 1181 additions and 1182 deletions
+476
@@ -0,0 +1,476 @@
//! CRDT snapshot compaction with cross-node coordination.
//!
//! This module implements full CRDT state snapshots for compacting the op journal.
//! When the op log grows beyond a configurable threshold (default: 10,000 ops) or
//! on a scheduled tick (default: weekly), the leader node generates a snapshot of
//! the current CRDT state and coordinates with all alive peers to discard ops that
//! precede the snapshot.
//!
//! # Attribution preservation
//!
//! Compaction preserves the full attribution chain via an "op manifest" — a
//! compact record of `(author, story_id, signature, seq)` tuples for every op
//! in the compacted range. This allows incident-response forensics to
//! reconstruct who did what to which story, even after the op payloads are
//! discarded.
//!
//! # Wire messages
//!
//! - `{"type": "snapshot", "at_seq": <seq>, "state": <serialized CRDT state>}`
//! - `{"type": "snapshot_ack", "at_seq": <seq>}`
//!
//! # Leader selection
//!
//! The alive peer with the lowest `hash(node_id)` generates the snapshot,
//! mirroring the claim-priority shape from story 634.
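//!
//! # Leader-side flow (illustrative)
//!
//! A minimal happy-path sketch, assuming the caller already knows its own
//! node id and the set of alive peers (`my_node_id` and `alive_peers` are
//! hypothetical), and that broadcasting is handled by the sync layer:
//!
//! ```ignore
//! if should_trigger_by_count() && is_snapshot_leader(&my_node_id, &alive_peers) {
//!     if let Some(snapshot) = generate_snapshot() {
//!         if let Some(snapshot) = begin_coordination(snapshot, &alive_peers, &my_node_id) {
//!             // Broadcast SnapshotMessage::Snapshot(snapshot) as a text frame,
//!             // then count acks via record_ack() before apply_compaction().
//!         }
//!     }
//! }
//! ```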
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Mutex;
use std::sync::OnceLock;
use crate::crdt_state;
// ── Configuration ──────────────────────────────────────────────────────
/// Default threshold: snapshot fires when ALL_OPS.len() exceeds this.
pub const DEFAULT_SNAPSHOT_THRESHOLD: usize = 10_000;
/// Default scheduled interval in seconds (1 week).
pub const DEFAULT_SNAPSHOT_INTERVAL_SECS: u64 = 7 * 24 * 3600;
/// Configurable snapshot threshold (set at startup, defaults to [`DEFAULT_SNAPSHOT_THRESHOLD`]).
static SNAPSHOT_THRESHOLD: OnceLock<usize> = OnceLock::new();
/// Return the current snapshot threshold.
pub fn snapshot_threshold() -> usize {
SNAPSHOT_THRESHOLD
.get()
.copied()
.unwrap_or(DEFAULT_SNAPSHOT_THRESHOLD)
}
/// Set the snapshot threshold (must be called before first trigger check).
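///
/// A startup sketch; the environment variable name is an assumption, not
/// part of this module:
///
/// ```ignore
/// if let Some(t) = std::env::var("SNAPSHOT_THRESHOLD").ok().and_then(|v| v.parse().ok()) {
///     set_snapshot_threshold(t);
/// }
/// ```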
pub fn set_snapshot_threshold(threshold: usize) {
let _ = SNAPSHOT_THRESHOLD.set(threshold);
}
// ── Wire message types ─────────────────────────────────────────────────
/// A single entry in the attribution manifest — preserves forensic metadata
/// for each op that existed before the snapshot point.
///
/// This allows reconstructing who did what to which story, even after the
/// op payloads (state values) are discarded during compaction.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct OpManifestEntry {
/// Hex-encoded Ed25519 public key of the op's author.
pub author: String,
/// The story_id this op pertains to (extracted from the op's path/content).
pub story_id: String,
/// Hex-encoded Ed25519 signature proving the author created this op.
pub sig: String,
/// Lamport sequence number of the op.
pub seq: u64,
}
/// A CRDT state snapshot used for compaction.
///
/// Contains the serialized CRDT state at a specific sequence point, plus an
/// attribution manifest that preserves the forensic chain for all ops up to
/// that point.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct Snapshot {
/// The sequence number at which this snapshot was taken.
/// All ops with `seq < at_seq` can be discarded after compaction.
pub at_seq: u64,
/// The serialized CRDT state (all ops as JSON strings).
pub state: Vec<String>,
/// Attribution manifest: one entry per op in the compacted range.
/// Preserves author, story_id, signature, and seq for forensics.
pub op_manifest: Vec<OpManifestEntry>,
}
/// Acknowledgement that a peer has received and applied a snapshot.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct SnapshotAck {
/// The sequence number being acknowledged — must match the snapshot's `at_seq`.
pub at_seq: u64,
}
/// Wire messages for the snapshot protocol, exchanged as text frames during
/// the CRDT sync streaming phase.
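///
/// A receive-side dispatch sketch; `frame`, `peer_id`, and `send_text_frame`
/// are assumptions about the surrounding sync layer:
///
/// ```ignore
/// if let Ok(msg) = serde_json::from_str::<SnapshotMessage>(&frame) {
///     match msg {
///         SnapshotMessage::Snapshot(s) => {
///             let at_seq = s.at_seq;
///             if apply_compaction(s) {
///                 // Ack so the leader can count us toward quorum.
///                 let ack = SnapshotMessage::SnapshotAck(SnapshotAck { at_seq });
///                 send_text_frame(&serde_json::to_string(&ack).unwrap());
///             }
///         }
///         SnapshotMessage::SnapshotAck(ack) => {
///             // Leader side: true once every alive peer has acked.
///             let _quorum_reached = record_ack(&peer_id, ack.at_seq);
///         }
///     }
/// }
/// ```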
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum SnapshotMessage {
/// Leader broadcasts the snapshot to all peers.
Snapshot(Snapshot),
/// Peer acknowledges receipt and application of the snapshot.
SnapshotAck(SnapshotAck),
}
// ── Snapshot state ─────────────────────────────────────────────────────
/// In-memory snapshot state for this node.
struct SnapshotState {
/// The most recent completed snapshot (if any).
latest_snapshot: Option<Snapshot>,
/// Pending acks for an in-progress snapshot coordination.
/// Maps node_id → whether they've acked.
pending_acks: HashMap<String, bool>,
/// The at_seq of the in-progress snapshot (if any).
pending_at_seq: Option<u64>,
}
#[cfg(not(test))]
static SNAPSHOT_STATE: OnceLock<Mutex<SnapshotState>> = OnceLock::new();
#[cfg(test)]
thread_local! {
/// Per-thread snapshot state for test isolation. Each test thread gets its
/// own SnapshotState so parallel tests do not leak into each other's
/// snapshot/coordination history.
static SNAPSHOT_STATE_TL: OnceLock<Mutex<SnapshotState>> = const { OnceLock::new() };
}
/// Read access to the snapshot state.
///
/// In production this returns the global `SNAPSHOT_STATE`. In `cfg(test)`
/// each thread sees its own thread-local state, so parallel tests do not
/// share `SnapshotState`.
fn snapshot_state() -> Option<&'static Mutex<SnapshotState>> {
#[cfg(not(test))]
{
SNAPSHOT_STATE.get()
}
#[cfg(test)]
{
let ptr = SNAPSHOT_STATE_TL.with(|lock| lock as *const OnceLock<Mutex<SnapshotState>>);
// SAFETY: the thread-local lives as long as the thread. We only need
// 'static for the return type; consumers never outlive the spawning
// test thread.
unsafe { &*ptr }.get()
}
}
/// Initialise snapshot state. Safe to call multiple times.
///
/// In production: idempotent via `OnceLock`. In `cfg(test)`: initialises the
/// per-thread state on first call; subsequent calls on the same thread are
/// no-ops.
pub fn init() {
let value = || {
Mutex::new(SnapshotState {
latest_snapshot: None,
pending_acks: HashMap::new(),
pending_at_seq: None,
})
};
#[cfg(not(test))]
{
let _ = SNAPSHOT_STATE.set(value());
}
#[cfg(test)]
{
SNAPSHOT_STATE_TL.with(|lock| {
let _ = lock.set(value());
});
}
}
/// Return the most recent completed snapshot, if any.
pub fn latest_snapshot() -> Option<Snapshot> {
snapshot_state()?.lock().ok()?.latest_snapshot.clone()
}
// ── Leader selection ───────────────────────────────────────────────────
/// Compute the snapshot-leader priority hash for a node.
///
/// Uses the same SHA-256 approach as story 634's claim priority, but hashes
/// only the `node_id` (not a story_id) since snapshot leadership is global.
fn leader_hash(node_id: &str) -> u64 {
use sha2::{Digest, Sha256};
let mut hasher = Sha256::new();
hasher.update(node_id.as_bytes());
let digest = hasher.finalize();
u64::from_be_bytes(digest[..8].try_into().expect("sha256 is 32 bytes"))
}
/// Determine whether this node is the snapshot leader among alive peers.
///
/// The alive peer with the **lowest** `hash(node_id)` is the leader.
/// Returns `true` if `self_node_id` has a strictly lower hash than all
/// other alive peers. When there are no other alive peers (single-node
/// cluster), the result is always `true`.
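///
/// For example, with distinct node ids (hash ties are vanishingly unlikely),
/// exactly one alive peer considers itself leader:
///
/// ```ignore
/// let peers: Vec<String> = vec!["a".into(), "b".into(), "c".into()];
/// let leaders = peers.iter().filter(|p| is_snapshot_leader(p, &peers)).count();
/// assert_eq!(leaders, 1);
/// ```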
pub fn is_snapshot_leader(self_node_id: &str, alive_peer_node_ids: &[String]) -> bool {
let my_hash = leader_hash(self_node_id);
for peer_id in alive_peer_node_ids {
if peer_id == self_node_id {
continue;
}
if leader_hash(peer_id) <= my_hash {
return false;
}
}
true
}
// ── Trigger logic ──────────────────────────────────────────────────────
/// Check whether a snapshot should be triggered based on the current op count.
///
/// Returns `true` when `ALL_OPS.len() > threshold` (configurable, default 10,000).
pub fn should_trigger_by_count() -> bool {
let count = crdt_state::all_ops_json().map(|ops| ops.len()).unwrap_or(0);
count > snapshot_threshold()
}
// ── Snapshot generation ────────────────────────────────────────────────
/// Generate a snapshot of the current CRDT state.
///
/// The snapshot contains:
/// 1. All ops as serialized JSON (the full CRDT state).
/// 2. An attribution manifest preserving author, story_id, sig, and seq
/// for every op — ensuring forensic traceability survives compaction.
///
/// The `at_seq` is the highest sequence number across all ops.
pub fn generate_snapshot() -> Option<Snapshot> {
let all_ops = crdt_state::all_ops_json()?;
if all_ops.is_empty() {
return None;
}
let mut max_seq: u64 = 0;
let mut manifest = Vec::with_capacity(all_ops.len());
for op_json in &all_ops {
if let Ok(signed_op) = serde_json::from_str::<bft_json_crdt::json_crdt::SignedOp>(op_json) {
let seq = signed_op.inner.seq;
if seq > max_seq {
max_seq = seq;
}
// Extract story_id from the op's content if available.
let story_id = extract_story_id_from_op(&signed_op);
manifest.push(OpManifestEntry {
author: crdt_state::hex::encode(&signed_op.author()),
story_id,
sig: crdt_state::hex::encode(&signed_op.signed_digest),
seq,
});
}
}
Some(Snapshot {
at_seq: max_seq,
state: all_ops,
op_manifest: manifest,
})
}
/// Extract the story_id from a SignedOp's content, if it contains one.
///
/// Ops that target pipeline items contain a JSON object with a `story_id`
/// field. For ops that don't (e.g. node presence), returns an empty string.
pub(crate) fn extract_story_id_from_op(op: &bft_json_crdt::json_crdt::SignedOp) -> String {
// Try to extract from the op's path — the second segment often contains
// the list index which maps to a story. However, the most reliable way
// is to look at the content if it's an insert op.
if let Some(bft_json_crdt::json_crdt::JsonValue::Object(map)) = &op.inner.content
&& let Some(bft_json_crdt::json_crdt::JsonValue::String(sid)) = map.get("story_id")
{
return sid.clone();
}
// For field-update ops (LWW set), the path tells us which item, but not
// the story_id directly. Return empty — the manifest still preserves
// author + sig + seq for forensic correlation.
String::new()
}
// ── Coordination ──────────────────────────────────────────────────────
/// Begin snapshot coordination as the leader.
///
/// Records which alive peers need to ack and stores the pending snapshot.
/// Returns the snapshot to broadcast to peers.
pub fn begin_coordination(
snapshot: Snapshot,
alive_peer_node_ids: &[String],
self_node_id: &str,
) -> Option<Snapshot> {
let state = snapshot_state()?;
let mut s = state.lock().ok()?;
let mut pending = HashMap::new();
for peer_id in alive_peer_node_ids {
if peer_id != self_node_id {
pending.insert(peer_id.clone(), false);
}
}
s.pending_at_seq = Some(snapshot.at_seq);
s.pending_acks = pending;
Some(snapshot)
}
/// Record a snapshot ack from a peer.
///
/// Returns `true` if quorum has been reached (all alive peers have acked).
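///
/// Leader-side sketch; `pending_snapshot` (the snapshot this leader
/// broadcast) is assumed to be held by the caller:
///
/// ```ignore
/// if record_ack(&peer_id, ack.at_seq) {
///     // Every peer has acked: commit the compaction locally.
///     apply_compaction(pending_snapshot);
/// }
/// ```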
pub fn record_ack(node_id: &str, at_seq: u64) -> bool {
let Some(state) = snapshot_state() else {
return false;
};
let Ok(mut s) = state.lock() else {
return false;
};
// Verify the ack matches the pending snapshot.
if s.pending_at_seq != Some(at_seq) {
return false;
}
if let Some(acked) = s.pending_acks.get_mut(node_id) {
*acked = true;
}
// Check if all peers have acked (quorum = all alive peers).
s.pending_acks.values().all(|&v| v)
}
/// Abort the current pending snapshot coordination.
///
/// Called when a peer goes offline mid-coordination or quorum times out.
pub fn abort_coordination() {
if let Some(state) = snapshot_state()
&& let Ok(mut s) = state.lock()
{
s.pending_at_seq = None;
s.pending_acks.clear();
}
}
/// Check whether there is a pending snapshot coordination in progress.
pub fn has_pending_coordination() -> bool {
snapshot_state()
.and_then(|s| s.lock().ok())
.map(|s| s.pending_at_seq.is_some())
.unwrap_or(false)
}
/// Return the list of peers that have NOT yet acked the pending snapshot.
pub fn unacked_peers() -> Vec<String> {
snapshot_state()
.and_then(|s| s.lock().ok())
.map(|s| {
s.pending_acks
.iter()
.filter(|&(_, &acked)| !acked)
.map(|(id, _)| id.clone())
.collect()
})
.unwrap_or_default()
}
// ── Compaction ─────────────────────────────────────────────────────────
/// Apply compaction: replace the op journal with the snapshot state.
///
/// After successful quorum, the leader (and each peer upon receiving the
/// snapshot) replaces its `ALL_OPS` with the snapshot's state and resets
/// the vector clock accordingly.
///
/// The snapshot's `op_manifest` is preserved in the snapshot state for
/// forensic queries.
///
/// Returns `true` if compaction was applied successfully.
pub fn apply_compaction(snapshot: Snapshot) -> bool {
// Store the snapshot as the latest for future new-node onboarding.
if let Some(state) = snapshot_state()
&& let Ok(mut s) = state.lock()
{
s.latest_snapshot = Some(snapshot.clone());
s.pending_at_seq = None;
s.pending_acks.clear();
}
// Prune the op journal: keep only ops with seq >= at_seq. The snapshot
// stored above carries the full state needed to onboard new nodes, and
// its op_manifest preserves attribution for the ops discarded here.
if let Some(all_ops) = crdt_state::ALL_OPS.get()
&& let Ok(mut v) = all_ops.lock()
{
// Calculate ops to prune: those with seq < at_seq
let mut kept_ops = Vec::new();
let mut pruned_count = 0usize;
for op_json in v.iter() {
if let Ok(signed_op) =
serde_json::from_str::<bft_json_crdt::json_crdt::SignedOp>(op_json)
{
if signed_op.inner.seq >= snapshot.at_seq {
kept_ops.push(op_json.clone());
} else {
pruned_count += 1;
}
} else {
// Keep unparseable ops to avoid data loss.
kept_ops.push(op_json.clone());
}
}
*v = kept_ops;
// Rebuild vector clock from remaining ops.
if let Some(vc) = crdt_state::VECTOR_CLOCK.get()
&& let Ok(mut clock) = vc.lock()
{
clock.clear();
for op_json in v.iter() {
if let Ok(signed_op) =
serde_json::from_str::<bft_json_crdt::json_crdt::SignedOp>(op_json)
{
let author_hex = crdt_state::hex::encode(&signed_op.author());
*clock.entry(author_hex).or_insert(0) += 1;
}
}
}
crate::slog!(
"[crdt-snapshot] Compaction applied: pruned {pruned_count} ops, kept {} ops",
v.len()
);
return true;
}
false
}
/// Retrieve the op manifest from the latest snapshot for forensic queries.
///
/// Returns `None` if no snapshot has been taken yet.
pub fn latest_op_manifest() -> Option<Vec<OpManifestEntry>> {
latest_snapshot().map(|s| s.op_manifest)
}
/// Query the op manifest for a specific story's attribution chain.
///
/// Returns all manifest entries where `story_id` matches the query.
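///
/// A forensic sketch (the story id is illustrative):
///
/// ```ignore
/// for entry in query_attribution("636_example_story") {
///     println!("author={} seq={} sig={}", entry.author, entry.seq, entry.sig);
/// }
/// ```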
pub fn query_attribution(story_id: &str) -> Vec<OpManifestEntry> {
latest_op_manifest()
.unwrap_or_default()
.into_iter()
.filter(|entry| entry.story_id == story_id)
.collect()
}
// ── Tests ──────────────────────────────────────────────────────────────
#[cfg(test)]
mod tests;
+705
@@ -0,0 +1,705 @@
use super::*;
use bft_json_crdt::json_crdt::{BaseCrdt, JsonValue, SignedOp};
use bft_json_crdt::keypair::make_keypair;
use bft_json_crdt::op::ROOT_ID;
use serde_json::json;
use crate::crdt_state::PipelineDoc;
// ── Wire message tests ──────────────────────────────────────────────
/// Snapshot wire message serialization round-trip.
#[test]
fn snapshot_message_serialization_roundtrip() {
let snapshot = Snapshot {
at_seq: 42,
state: vec!["op1".to_string(), "op2".to_string()],
op_manifest: vec![OpManifestEntry {
author: "aabbcc".to_string(),
story_id: "100_story_test".to_string(),
sig: "deadbeef".to_string(),
seq: 1,
}],
};
let msg = SnapshotMessage::Snapshot(snapshot.clone());
let json_str = serde_json::to_string(&msg).unwrap();
assert!(json_str.contains(r#""type":"snapshot""#));
let deserialized: SnapshotMessage = serde_json::from_str(&json_str).unwrap();
match deserialized {
SnapshotMessage::Snapshot(s) => {
assert_eq!(s.at_seq, 42);
assert_eq!(s.state.len(), 2);
assert_eq!(s.op_manifest.len(), 1);
assert_eq!(s.op_manifest[0].author, "aabbcc");
}
_ => panic!("Expected Snapshot"),
}
}
/// Snapshot ack wire message serialization round-trip.
#[test]
fn snapshot_ack_message_serialization_roundtrip() {
let msg = SnapshotMessage::SnapshotAck(SnapshotAck { at_seq: 42 });
let json_str = serde_json::to_string(&msg).unwrap();
assert!(json_str.contains(r#""type":"snapshot_ack""#));
let deserialized: SnapshotMessage = serde_json::from_str(&json_str).unwrap();
match deserialized {
SnapshotMessage::SnapshotAck(ack) => {
assert_eq!(ack.at_seq, 42);
}
_ => panic!("Expected SnapshotAck"),
}
}
/// Snapshot wire format matches the AC spec exactly.
#[test]
fn snapshot_wire_format_matches_spec() {
let snapshot = Snapshot {
at_seq: 100,
state: vec!["{}".to_string()],
op_manifest: vec![],
};
let msg = SnapshotMessage::Snapshot(snapshot);
let json_str = serde_json::to_string(&msg).unwrap();
// Must contain "type", "at_seq", "state" fields per AC1.
let parsed: serde_json::Value = serde_json::from_str(&json_str).unwrap();
assert_eq!(parsed["type"], "snapshot");
assert_eq!(parsed["at_seq"], 100);
assert!(parsed["state"].is_array());
}
/// SnapshotAck wire format matches the AC spec exactly.
#[test]
fn snapshot_ack_wire_format_matches_spec() {
let msg = SnapshotMessage::SnapshotAck(SnapshotAck { at_seq: 55 });
let json_str = serde_json::to_string(&msg).unwrap();
let parsed: serde_json::Value = serde_json::from_str(&json_str).unwrap();
assert_eq!(parsed["type"], "snapshot_ack");
assert_eq!(parsed["at_seq"], 55);
}
// ── Leader selection tests ──────────────────────────────────────────
/// Single node is always leader.
#[test]
fn single_node_is_always_leader() {
assert!(is_snapshot_leader("node_a", &[]));
}
/// Leader is the node with the lowest hash.
#[test]
fn leader_is_lowest_hash_node() {
let nodes = vec![
"node_alpha".to_string(),
"node_beta".to_string(),
"node_gamma".to_string(),
];
// Find which node has the lowest hash.
let mut lowest_node = &nodes[0];
let mut lowest_hash = leader_hash(&nodes[0]);
for node in &nodes[1..] {
let h = leader_hash(node);
if h < lowest_hash {
lowest_hash = h;
lowest_node = node;
}
}
// Only the lowest-hash node should be leader.
for node in &nodes {
let is_leader = is_snapshot_leader(node, &nodes);
if node == lowest_node {
assert!(is_leader, "{node} should be leader (lowest hash)");
} else {
assert!(!is_leader, "{node} should NOT be leader");
}
}
}
/// Leader selection is deterministic.
#[test]
fn leader_selection_is_deterministic() {
let peers = vec!["a".to_string(), "b".to_string(), "c".to_string()];
let result1 = is_snapshot_leader("a", &peers);
let result2 = is_snapshot_leader("a", &peers);
assert_eq!(result1, result2);
}
// ── Trigger logic tests ─────────────────────────────────────────────
/// Threshold is configurable.
#[test]
fn snapshot_threshold_default() {
// When not set, defaults to DEFAULT_SNAPSHOT_THRESHOLD.
assert_eq!(snapshot_threshold(), DEFAULT_SNAPSHOT_THRESHOLD);
}
// ── Coordination tests ──────────────────────────────────────────────
/// Record ack returns true when all peers have acked.
#[test]
fn coordination_quorum_reached() {
init();
let snapshot = Snapshot {
at_seq: 10,
state: vec![],
op_manifest: vec![],
};
let peers = vec![
"self".to_string(),
"peer_a".to_string(),
"peer_b".to_string(),
];
begin_coordination(snapshot, &peers, "self");
assert!(!record_ack("peer_a", 10));
assert!(record_ack("peer_b", 10));
}
/// Ack for wrong at_seq is rejected.
#[test]
fn coordination_ack_wrong_seq_rejected() {
init();
let snapshot = Snapshot {
at_seq: 20,
state: vec![],
op_manifest: vec![],
};
begin_coordination(
snapshot,
&["self".to_string(), "peer_a".to_string()],
"self",
);
assert!(!record_ack("peer_a", 999));
}
/// Abort clears pending state.
#[test]
fn coordination_abort_clears_state() {
init();
let snapshot = Snapshot {
at_seq: 30,
state: vec![],
op_manifest: vec![],
};
begin_coordination(
snapshot,
&["self".to_string(), "peer_a".to_string()],
"self",
);
assert!(has_pending_coordination());
abort_coordination();
assert!(!has_pending_coordination());
}
/// Unacked peers are reported correctly.
#[test]
fn unacked_peers_reported() {
init();
let snapshot = Snapshot {
at_seq: 40,
state: vec![],
op_manifest: vec![],
};
begin_coordination(
snapshot,
&[
"self".to_string(),
"peer_a".to_string(),
"peer_b".to_string(),
],
"self",
);
let unacked = unacked_peers();
assert_eq!(unacked.len(), 2);
record_ack("peer_a", 40);
let unacked = unacked_peers();
assert_eq!(unacked.len(), 1);
assert_eq!(unacked[0], "peer_b");
}
// ── Snapshot generation tests ───────────────────────────────────────
/// Snapshot generation from ops includes attribution manifest.
#[test]
fn snapshot_generation_includes_manifest() {
crate::crdt_state::init_for_test();
// Write some items to populate ALL_OPS.
crate::crdt_state::write_item(
"636_test_a",
"1_backlog",
Some("Test A"),
None,
None,
None,
None,
None,
None,
None,
);
crate::crdt_state::write_item(
"636_test_b",
"2_current",
Some("Test B"),
None,
None,
None,
None,
None,
None,
None,
);
let snapshot = generate_snapshot();
assert!(snapshot.is_some());
let snapshot = snapshot.unwrap();
assert!(snapshot.at_seq > 0);
assert!(!snapshot.state.is_empty());
assert!(!snapshot.op_manifest.is_empty());
// Every manifest entry must have a non-empty author and sig.
for entry in &snapshot.op_manifest {
assert!(!entry.author.is_empty(), "author must not be empty");
assert!(!entry.sig.is_empty(), "sig must not be empty");
}
}
/// Attribution can be queried by story_id after snapshot.
#[test]
fn attribution_query_by_story_id() {
crate::crdt_state::init_for_test();
init();
crate::crdt_state::write_item(
"636_attrib_test",
"1_backlog",
Some("Attribution Test"),
None,
None,
None,
None,
None,
None,
None,
);
let snapshot = generate_snapshot().unwrap();
// Store as latest.
if let Some(state) = snapshot_state()
&& let Ok(mut s) = state.lock()
{
s.latest_snapshot = Some(snapshot.clone());
}
let attrib = query_attribution("636_attrib_test");
// Insert ops for the story should appear in the manifest.
let has_story = attrib.iter().any(|e| e.story_id == "636_attrib_test");
assert!(has_story, "attribution must include ops for the story");
}
// ── Compaction tests ────────────────────────────────────────────────
/// After compaction, ALL_OPS size is reduced.
#[test]
fn compaction_reduces_ops() {
crate::crdt_state::init_for_test();
init();
// Write several items.
for i in 0..5 {
crate::crdt_state::write_item(
&format!("636_compact_{i}"),
"1_backlog",
Some(&format!("Item {i}")),
None,
None,
None,
None,
None,
None,
None,
);
}
let ops_before = crate::crdt_state::all_ops_json().unwrap().len();
assert!(ops_before >= 5);
// Generate a snapshot — at_seq is the max seq across all ops.
let snapshot = generate_snapshot().unwrap();
let result = apply_compaction(snapshot);
assert!(result);
// After compaction, ops with seq < at_seq are pruned; ops with
// seq >= at_seq (at least the op at the snapshot point) remain.
let ops_after = crate::crdt_state::all_ops_json().unwrap().len();
// Pruning can only shrink the journal, never grow it.
assert!(
ops_after <= ops_before,
"ops_after ({ops_after}) should be <= ops_before ({ops_before})"
);
}
/// Latest snapshot is available after compaction.
#[test]
fn latest_snapshot_available_after_compaction() {
crate::crdt_state::init_for_test();
init();
crate::crdt_state::write_item(
"636_latest_test",
"1_backlog",
Some("Latest Test"),
None,
None,
None,
None,
None,
None,
None,
);
let snapshot = generate_snapshot().unwrap();
let at_seq = snapshot.at_seq;
apply_compaction(snapshot);
let latest = latest_snapshot();
assert!(latest.is_some());
assert_eq!(latest.unwrap().at_seq, at_seq);
}
// ── Integration tests: 3-node compaction ───────────────────────────
/// Integration test: 3 nodes, force compaction, verify all nodes converge
/// on the same `at_seq` and the attribution manifest covers every op in
/// the compacted range.
#[test]
fn three_node_compaction_convergence() {
// Simulate 3 independent nodes.
let kp_a = make_keypair();
let kp_b = make_keypair();
let kp_c = make_keypair();
let mut crdt_a = BaseCrdt::<PipelineDoc>::new(&kp_a);
let mut crdt_b = BaseCrdt::<PipelineDoc>::new(&kp_b);
let mut crdt_c = BaseCrdt::<PipelineDoc>::new(&kp_c);
// Node A creates items.
let mut all_ops_json: Vec<String> = Vec::new();
for i in 0..10u32 {
let item: JsonValue = json!({
"story_id": format!("636_3node_{i}"),
"stage": "1_backlog",
"name": format!("Item {i}"),
"agent": "",
"retry_count": 0.0,
"blocked": false,
"depends_on": "",
"claimed_by": "",
"claimed_at": 0.0,
"merged_at": 0.0,
})
.into();
let op = crdt_a.doc.items.insert(ROOT_ID, item).sign(&kp_a);
crdt_a.apply(op.clone());
let op_json = serde_json::to_string(&op).unwrap();
all_ops_json.push(op_json);
}
// Sync ops to B and C.
for op_json in &all_ops_json {
let op: SignedOp = serde_json::from_str(op_json).unwrap();
crdt_b.apply(op.clone());
crdt_c.apply(op);
}
// All 3 nodes have the same 10 items.
assert_eq!(crdt_a.doc.items.view().len(), 10);
assert_eq!(crdt_b.doc.items.view().len(), 10);
assert_eq!(crdt_c.doc.items.view().len(), 10);
// Generate snapshot on leader (A).
let mut max_seq = 0u64;
let mut manifest = Vec::new();
for op_json in &all_ops_json {
if let Ok(signed_op) = serde_json::from_str::<SignedOp>(op_json) {
if signed_op.inner.seq > max_seq {
max_seq = signed_op.inner.seq;
}
manifest.push(OpManifestEntry {
author: crate::crdt_state::hex::encode(&signed_op.author()),
story_id: extract_story_id_from_op(&signed_op),
sig: crate::crdt_state::hex::encode(&signed_op.signed_digest),
seq: signed_op.inner.seq,
});
}
}
let snapshot = Snapshot {
at_seq: max_seq,
state: all_ops_json.clone(),
op_manifest: manifest,
};
// Broadcast snapshot wire message.
let msg = SnapshotMessage::Snapshot(snapshot.clone());
let wire = serde_json::to_string(&msg).unwrap();
// Each node receives and parses the snapshot.
let parsed: SnapshotMessage = serde_json::from_str(&wire).unwrap();
match parsed {
SnapshotMessage::Snapshot(s) => {
// All nodes converge on the same at_seq.
assert_eq!(s.at_seq, max_seq);
assert_eq!(s.state.len(), all_ops_json.len());
assert_eq!(s.op_manifest.len(), all_ops_json.len());
}
_ => panic!("Expected Snapshot"),
}
// Each node sends an ack.
let ack = SnapshotMessage::SnapshotAck(SnapshotAck { at_seq: max_seq });
let ack_wire = serde_json::to_string(&ack).unwrap();
let parsed_ack: SnapshotMessage = serde_json::from_str(&ack_wire).unwrap();
match parsed_ack {
SnapshotMessage::SnapshotAck(a) => {
assert_eq!(a.at_seq, max_seq);
}
_ => panic!("Expected SnapshotAck"),
}
// After all acks, compaction proceeds — ops with seq < at_seq are pruned.
// Since all ops have seq <= max_seq, only ops with seq == max_seq survive.
// The op manifest preserves attribution for all pruned ops.
assert!(!snapshot.op_manifest.is_empty());
for entry in &snapshot.op_manifest {
assert!(!entry.author.is_empty());
assert!(!entry.sig.is_empty());
}
}
// ── Failure mode test ──────────────────────────────────────────────
/// Simulate one node going offline mid-coordination; remaining peers
/// abort the compaction cleanly (no half-applied snapshot state).
#[test]
fn failure_mode_node_offline_aborts_cleanly() {
init();
let snapshot = Snapshot {
at_seq: 50,
state: vec!["op1".to_string()],
op_manifest: vec![OpManifestEntry {
author: "aabb".to_string(),
story_id: "636_fail_test".to_string(),
sig: "ccdd".to_string(),
seq: 1,
}],
};
let peers = vec![
"leader".to_string(),
"peer_a".to_string(),
"peer_b".to_string(),
];
begin_coordination(snapshot.clone(), &peers, "leader");
// peer_a acks, but peer_b goes offline (no ack).
assert!(!record_ack("peer_a", 50));
// Leader detects peer_b is offline → abort.
let unacked = unacked_peers();
assert!(unacked.contains(&"peer_b".to_string()));
abort_coordination();
assert!(!has_pending_coordination());
// No compaction was applied, so the node's state is clean: the snapshot
// was never committed and latest_snapshot is not set.
let state = snapshot_state().unwrap().lock().unwrap();
// pending_at_seq is cleared.
assert!(state.pending_at_seq.is_none());
assert!(state.pending_acks.is_empty());
}
// ── New-node onboarding test ───────────────────────────────────────
/// A node joining a snapshotted cluster receives the most recent snapshot
/// + ops with seq >= at_seq.
#[test]
fn new_node_onboarding_with_snapshot() {
let kp = make_keypair();
let mut crdt_existing = BaseCrdt::<PipelineDoc>::new(&kp);
// Create 10 ops on the existing cluster.
let mut all_ops: Vec<String> = Vec::new();
for i in 0..10u32 {
let item: JsonValue = json!({
"story_id": format!("636_onboard_{i}"),
"stage": "1_backlog",
"name": format!("Onboard {i}"),
"agent": "",
"retry_count": 0.0,
"blocked": false,
"depends_on": "",
"claimed_by": "",
"claimed_at": 0.0,
"merged_at": 0.0,
})
.into();
let op = crdt_existing.doc.items.insert(ROOT_ID, item).sign(&kp);
crdt_existing.apply(op.clone());
all_ops.push(serde_json::to_string(&op).unwrap());
}
// Snapshot taken at seq 5 — ops 0-4 are compacted.
let snapshot = Snapshot {
at_seq: 5,
state: all_ops[..5].to_vec(),
op_manifest: vec![],
};
// New node receives snapshot + ops with seq >= 5.
let mut crdt_new = BaseCrdt::<PipelineDoc>::new(&kp);
// Apply snapshot state first.
for op_json in &snapshot.state {
if let Ok(op) = serde_json::from_str::<SignedOp>(op_json) {
crdt_new.apply(op);
}
}
// Apply remaining ops (seq >= at_seq).
for op_json in &all_ops[5..] {
if let Ok(op) = serde_json::from_str::<SignedOp>(op_json) {
crdt_new.apply(op);
}
}
// New node should have all 10 items.
assert_eq!(crdt_new.doc.items.view().len(), 10);
}
// ── Backwards compatibility test ───────────────────────────────────
/// Peers without snapshot support fall back to vector-clock-based full sync.
#[test]
fn backwards_compat_unknown_snapshot_message_ignored() {
// A peer that doesn't understand snapshot messages should be able to
// parse them as unknown variants and ignore them gracefully.
let snapshot_json = r#"{"type":"snapshot","at_seq":100,"state":["op1"],"op_manifest":[]}"#;
// Attempt to parse as a legacy SyncMessage — should fail (unknown type).
let result: Result<crate::crdt_sync::SyncMessagePublic, _> =
serde_json::from_str(snapshot_json);
// This is expected to fail — old peers ignore unknown types.
assert!(
result.is_err(),
"legacy peers should fail to parse snapshot messages"
);
// The snapshot message parses correctly as SnapshotMessage.
let parsed: SnapshotMessage = serde_json::from_str(snapshot_json).unwrap();
match parsed {
SnapshotMessage::Snapshot(s) => {
assert_eq!(s.at_seq, 100);
}
_ => panic!("Expected Snapshot"),
}
}
// ── Attribution preservation integration test ──────────────────────
/// After compaction, an archived story's attribution can be reconstructed.
#[test]
fn attribution_preserved_after_compaction() {
crate::crdt_state::init_for_test();
init();
// Write a story through its lifecycle.
crate::crdt_state::write_item(
"636_archived_story",
"1_backlog",
Some("Archived Story"),
Some("coder-opus"),
None,
None,
None,
None,
None,
None,
);
crate::crdt_state::write_item(
"636_archived_story",
"2_current",
None,
None,
None,
None,
None,
None,
None,
None,
);
crate::crdt_state::write_item(
"636_archived_story",
"6_archived",
None,
None,
None,
None,
None,
None,
None,
None,
);
// Generate snapshot.
let snapshot = generate_snapshot().unwrap();
// Verify the manifest contains entries for the archived story.
let story_entries: Vec<&OpManifestEntry> = snapshot
.op_manifest
.iter()
.filter(|e| e.story_id == "636_archived_story")
.collect();
assert!(
!story_entries.is_empty(),
"manifest must contain entries for the archived story"
);
// Each entry must have author (node pubkey) and signature.
for entry in &story_entries {
assert!(!entry.author.is_empty(), "author must be preserved");
assert!(!entry.sig.is_empty(), "signature must be preserved");
assert!(entry.seq > 0, "seq must be preserved");
}
// Apply compaction.
let at_seq = snapshot.at_seq;
apply_compaction(snapshot);
// After compaction, the attribution chain is still queryable.
let attrib = query_attribution("636_archived_story");
assert!(
!attrib.is_empty(),
"attribution must be queryable after compaction"
);
for entry in &attrib {
assert_eq!(entry.story_id, "636_archived_story");
assert!(!entry.author.is_empty());
assert!(!entry.sig.is_empty());
}
// The latest snapshot records the at_seq.
let latest = latest_snapshot().unwrap();
assert_eq!(latest.at_seq, at_seq);
}