diff --git a/server/src/crdt_snapshot.rs b/server/src/crdt_snapshot.rs index cdd8c13c..a7f2b346 100644 --- a/server/src/crdt_snapshot.rs +++ b/server/src/crdt_snapshot.rs @@ -439,6 +439,12 @@ mod tests { use crate::crdt_state::PipelineDoc; + /// Serialises tests that mutate the global SNAPSHOT_STATE / ALL_OPS / VECTOR_CLOCK + /// statics. These statics are shared across test threads (only the per-thread + /// CRDT is thread-local), so without this lock parallel tests interleave their + /// op writes and snapshot generation. + static GLOBAL_STATE_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(()); + // ── Wire message tests ────────────────────────────────────────────── /// Snapshot wire message serialization round-trip. @@ -516,12 +522,14 @@ mod tests { /// Single node is always leader. #[test] fn single_node_is_always_leader() { + let _g = GLOBAL_STATE_LOCK.lock().unwrap_or_else(|e| e.into_inner()); assert!(is_snapshot_leader("node_a", &[])); } /// Leader is the node with the lowest hash. #[test] fn leader_is_lowest_hash_node() { + let _g = GLOBAL_STATE_LOCK.lock().unwrap_or_else(|e| e.into_inner()); let nodes = vec![ "node_alpha".to_string(), "node_beta".to_string(), @@ -553,6 +561,7 @@ mod tests { /// Leader selection is deterministic. #[test] fn leader_selection_is_deterministic() { + let _g = GLOBAL_STATE_LOCK.lock().unwrap_or_else(|e| e.into_inner()); let peers = vec!["a".to_string(), "b".to_string(), "c".to_string()]; let result1 = is_snapshot_leader("a", &peers); let result2 = is_snapshot_leader("a", &peers); @@ -564,6 +573,7 @@ mod tests { /// Threshold is configurable. #[test] fn snapshot_threshold_default() { + let _g = GLOBAL_STATE_LOCK.lock().unwrap_or_else(|e| e.into_inner()); // When not set, defaults to DEFAULT_SNAPSHOT_THRESHOLD. assert_eq!(snapshot_threshold(), DEFAULT_SNAPSHOT_THRESHOLD); } @@ -573,6 +583,7 @@ mod tests { /// Record ack returns true when all peers have acked. #[test] fn coordination_quorum_reached() { + let _g = GLOBAL_STATE_LOCK.lock().unwrap_or_else(|e| e.into_inner()); init(); let snapshot = Snapshot { at_seq: 10, @@ -593,6 +604,7 @@ mod tests { /// Ack for wrong at_seq is rejected. #[test] fn coordination_ack_wrong_seq_rejected() { + let _g = GLOBAL_STATE_LOCK.lock().unwrap_or_else(|e| e.into_inner()); init(); let snapshot = Snapshot { at_seq: 20, @@ -610,6 +622,7 @@ mod tests { /// Abort clears pending state. #[test] fn coordination_abort_clears_state() { + let _g = GLOBAL_STATE_LOCK.lock().unwrap_or_else(|e| e.into_inner()); init(); let snapshot = Snapshot { at_seq: 30, @@ -630,6 +643,7 @@ mod tests { /// Unacked peers are reported correctly. #[test] fn unacked_peers_reported() { + let _g = GLOBAL_STATE_LOCK.lock().unwrap_or_else(|e| e.into_inner()); init(); let snapshot = Snapshot { at_seq: 40, @@ -660,6 +674,7 @@ mod tests { /// Snapshot generation from ops includes attribution manifest. #[test] fn snapshot_generation_includes_manifest() { + let _g = GLOBAL_STATE_LOCK.lock().unwrap_or_else(|e| e.into_inner()); crdt_state::init_for_test(); // Write some items to populate ALL_OPS. @@ -705,6 +720,7 @@ mod tests { /// Attribution can be queried by story_id after snapshot. #[test] fn attribution_query_by_story_id() { + let _g = GLOBAL_STATE_LOCK.lock().unwrap_or_else(|e| e.into_inner()); crdt_state::init_for_test(); init(); @@ -741,6 +757,7 @@ mod tests { /// After compaction, ALL_OPS size is reduced. #[test] fn compaction_reduces_ops() { + let _g = GLOBAL_STATE_LOCK.lock().unwrap_or_else(|e| e.into_inner()); crdt_state::init_for_test(); init(); @@ -782,6 +799,7 @@ mod tests { /// Latest snapshot is available after compaction. #[test] fn latest_snapshot_available_after_compaction() { + let _g = GLOBAL_STATE_LOCK.lock().unwrap_or_else(|e| e.into_inner()); crdt_state::init_for_test(); init(); @@ -813,6 +831,7 @@ mod tests { /// the same `at_seq` and pruned ops disappear from each node's log. #[test] fn three_node_compaction_convergence() { + let _g = GLOBAL_STATE_LOCK.lock().unwrap_or_else(|e| e.into_inner()); use bft_json_crdt::json_crdt::BaseCrdt; // Simulate 3 independent nodes. @@ -924,6 +943,7 @@ mod tests { /// abort the compaction cleanly (no half-applied snapshot state). #[test] fn failure_mode_node_offline_aborts_cleanly() { + let _g = GLOBAL_STATE_LOCK.lock().unwrap_or_else(|e| e.into_inner()); init(); let snapshot = Snapshot { @@ -969,6 +989,7 @@ mod tests { /// + ops with seq >= at_seq. #[test] fn new_node_onboarding_with_snapshot() { + let _g = GLOBAL_STATE_LOCK.lock().unwrap_or_else(|e| e.into_inner()); let kp = make_keypair(); let mut crdt_existing = BaseCrdt::::new(&kp); @@ -1026,6 +1047,7 @@ mod tests { /// Peers without snapshot support fall back to vector-clock-based full sync. #[test] fn backwards_compat_unknown_snapshot_message_ignored() { + let _g = GLOBAL_STATE_LOCK.lock().unwrap_or_else(|e| e.into_inner()); // A peer that doesn't understand snapshot messages should be able to // parse them as unknown variants and ignore them gracefully. let snapshot_json = r#"{"type":"snapshot","at_seq":100,"state":["op1"],"op_manifest":[]}"#; @@ -1054,6 +1076,7 @@ mod tests { /// After compaction, an archived story's attribution can be reconstructed. #[test] fn attribution_preserved_after_compaction() { + let _g = GLOBAL_STATE_LOCK.lock().unwrap_or_else(|e| e.into_inner()); crdt_state::init_for_test(); init();