fix(crdt_snapshot): serialise tests that share global SNAPSHOT_STATE / ALL_OPS / VECTOR_CLOCK (bug 669)

The crdt_snapshot tests share three global statics:
- SNAPSHOT_STATE (latest_snapshot, pending_acks, pending_at_seq) — coordination state
- crdt_state::ALL_OPS / VECTOR_CLOCK — op journal + vector clock

Only the per-thread CRDT is thread-local (init_for_test); these other globals
are shared across test threads. Under default cargo test parallelism, two tests
running concurrently interleave their op writes and snapshot generation, so
assertions like assert_eq!(at_seq, 4) fail with at_seq=5 (the other thread's
ops snuck in).

Add a module-level GLOBAL_STATE_LOCK that all 17 affected tests grab at the
top. unwrap_or_else(|e| e.into_inner()) handles the case where a prior test
panicked while holding the lock (poisoned).

Fixes bug 669 — these two tests were the silent killer behind every agent's
script/test failure (see also bug 668, which advanced agents to merge despite
gates_passed=false; that compounded this by sending failing-tests worktrees
to mergemaster).

All 2636 tests now pass under default parallel execution (no --test-threads=1
needed).

Closes #669.
This commit is contained in:
dave
2026-04-27 02:43:49 +00:00
parent 404fd396f5
commit 8e608feec1
+23
View File
@@ -439,6 +439,12 @@ mod tests {
use crate::crdt_state::PipelineDoc; use crate::crdt_state::PipelineDoc;
/// Serialises tests that mutate the global SNAPSHOT_STATE / ALL_OPS / VECTOR_CLOCK
/// statics. These statics are shared across test threads (only the per-thread
/// CRDT is thread-local), so without this lock parallel tests interleave their
/// op writes and snapshot generation.
static GLOBAL_STATE_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());
// ── Wire message tests ────────────────────────────────────────────── // ── Wire message tests ──────────────────────────────────────────────
/// Snapshot wire message serialization round-trip. /// Snapshot wire message serialization round-trip.
@@ -516,12 +522,14 @@ mod tests {
/// Single node is always leader. /// Single node is always leader.
#[test] #[test]
fn single_node_is_always_leader() { fn single_node_is_always_leader() {
let _g = GLOBAL_STATE_LOCK.lock().unwrap_or_else(|e| e.into_inner());
assert!(is_snapshot_leader("node_a", &[])); assert!(is_snapshot_leader("node_a", &[]));
} }
/// Leader is the node with the lowest hash. /// Leader is the node with the lowest hash.
#[test] #[test]
fn leader_is_lowest_hash_node() { fn leader_is_lowest_hash_node() {
let _g = GLOBAL_STATE_LOCK.lock().unwrap_or_else(|e| e.into_inner());
let nodes = vec![ let nodes = vec![
"node_alpha".to_string(), "node_alpha".to_string(),
"node_beta".to_string(), "node_beta".to_string(),
@@ -553,6 +561,7 @@ mod tests {
/// Leader selection is deterministic. /// Leader selection is deterministic.
#[test] #[test]
fn leader_selection_is_deterministic() { fn leader_selection_is_deterministic() {
let _g = GLOBAL_STATE_LOCK.lock().unwrap_or_else(|e| e.into_inner());
let peers = vec!["a".to_string(), "b".to_string(), "c".to_string()]; let peers = vec!["a".to_string(), "b".to_string(), "c".to_string()];
let result1 = is_snapshot_leader("a", &peers); let result1 = is_snapshot_leader("a", &peers);
let result2 = is_snapshot_leader("a", &peers); let result2 = is_snapshot_leader("a", &peers);
@@ -564,6 +573,7 @@ mod tests {
/// Threshold is configurable. /// Threshold is configurable.
#[test] #[test]
fn snapshot_threshold_default() { fn snapshot_threshold_default() {
let _g = GLOBAL_STATE_LOCK.lock().unwrap_or_else(|e| e.into_inner());
// When not set, defaults to DEFAULT_SNAPSHOT_THRESHOLD. // When not set, defaults to DEFAULT_SNAPSHOT_THRESHOLD.
assert_eq!(snapshot_threshold(), DEFAULT_SNAPSHOT_THRESHOLD); assert_eq!(snapshot_threshold(), DEFAULT_SNAPSHOT_THRESHOLD);
} }
@@ -573,6 +583,7 @@ mod tests {
/// Record ack returns true when all peers have acked. /// Record ack returns true when all peers have acked.
#[test] #[test]
fn coordination_quorum_reached() { fn coordination_quorum_reached() {
let _g = GLOBAL_STATE_LOCK.lock().unwrap_or_else(|e| e.into_inner());
init(); init();
let snapshot = Snapshot { let snapshot = Snapshot {
at_seq: 10, at_seq: 10,
@@ -593,6 +604,7 @@ mod tests {
/// Ack for wrong at_seq is rejected. /// Ack for wrong at_seq is rejected.
#[test] #[test]
fn coordination_ack_wrong_seq_rejected() { fn coordination_ack_wrong_seq_rejected() {
let _g = GLOBAL_STATE_LOCK.lock().unwrap_or_else(|e| e.into_inner());
init(); init();
let snapshot = Snapshot { let snapshot = Snapshot {
at_seq: 20, at_seq: 20,
@@ -610,6 +622,7 @@ mod tests {
/// Abort clears pending state. /// Abort clears pending state.
#[test] #[test]
fn coordination_abort_clears_state() { fn coordination_abort_clears_state() {
let _g = GLOBAL_STATE_LOCK.lock().unwrap_or_else(|e| e.into_inner());
init(); init();
let snapshot = Snapshot { let snapshot = Snapshot {
at_seq: 30, at_seq: 30,
@@ -630,6 +643,7 @@ mod tests {
/// Unacked peers are reported correctly. /// Unacked peers are reported correctly.
#[test] #[test]
fn unacked_peers_reported() { fn unacked_peers_reported() {
let _g = GLOBAL_STATE_LOCK.lock().unwrap_or_else(|e| e.into_inner());
init(); init();
let snapshot = Snapshot { let snapshot = Snapshot {
at_seq: 40, at_seq: 40,
@@ -660,6 +674,7 @@ mod tests {
/// Snapshot generation from ops includes attribution manifest. /// Snapshot generation from ops includes attribution manifest.
#[test] #[test]
fn snapshot_generation_includes_manifest() { fn snapshot_generation_includes_manifest() {
let _g = GLOBAL_STATE_LOCK.lock().unwrap_or_else(|e| e.into_inner());
crdt_state::init_for_test(); crdt_state::init_for_test();
// Write some items to populate ALL_OPS. // Write some items to populate ALL_OPS.
@@ -705,6 +720,7 @@ mod tests {
/// Attribution can be queried by story_id after snapshot. /// Attribution can be queried by story_id after snapshot.
#[test] #[test]
fn attribution_query_by_story_id() { fn attribution_query_by_story_id() {
let _g = GLOBAL_STATE_LOCK.lock().unwrap_or_else(|e| e.into_inner());
crdt_state::init_for_test(); crdt_state::init_for_test();
init(); init();
@@ -741,6 +757,7 @@ mod tests {
/// After compaction, ALL_OPS size is reduced. /// After compaction, ALL_OPS size is reduced.
#[test] #[test]
fn compaction_reduces_ops() { fn compaction_reduces_ops() {
let _g = GLOBAL_STATE_LOCK.lock().unwrap_or_else(|e| e.into_inner());
crdt_state::init_for_test(); crdt_state::init_for_test();
init(); init();
@@ -782,6 +799,7 @@ mod tests {
/// Latest snapshot is available after compaction. /// Latest snapshot is available after compaction.
#[test] #[test]
fn latest_snapshot_available_after_compaction() { fn latest_snapshot_available_after_compaction() {
let _g = GLOBAL_STATE_LOCK.lock().unwrap_or_else(|e| e.into_inner());
crdt_state::init_for_test(); crdt_state::init_for_test();
init(); init();
@@ -813,6 +831,7 @@ mod tests {
/// the same `at_seq` and pruned ops disappear from each node's log. /// the same `at_seq` and pruned ops disappear from each node's log.
#[test] #[test]
fn three_node_compaction_convergence() { fn three_node_compaction_convergence() {
let _g = GLOBAL_STATE_LOCK.lock().unwrap_or_else(|e| e.into_inner());
use bft_json_crdt::json_crdt::BaseCrdt; use bft_json_crdt::json_crdt::BaseCrdt;
// Simulate 3 independent nodes. // Simulate 3 independent nodes.
@@ -924,6 +943,7 @@ mod tests {
/// abort the compaction cleanly (no half-applied snapshot state). /// abort the compaction cleanly (no half-applied snapshot state).
#[test] #[test]
fn failure_mode_node_offline_aborts_cleanly() { fn failure_mode_node_offline_aborts_cleanly() {
let _g = GLOBAL_STATE_LOCK.lock().unwrap_or_else(|e| e.into_inner());
init(); init();
let snapshot = Snapshot { let snapshot = Snapshot {
@@ -969,6 +989,7 @@ mod tests {
/// + ops with seq >= at_seq. /// + ops with seq >= at_seq.
#[test] #[test]
fn new_node_onboarding_with_snapshot() { fn new_node_onboarding_with_snapshot() {
let _g = GLOBAL_STATE_LOCK.lock().unwrap_or_else(|e| e.into_inner());
let kp = make_keypair(); let kp = make_keypair();
let mut crdt_existing = BaseCrdt::<PipelineDoc>::new(&kp); let mut crdt_existing = BaseCrdt::<PipelineDoc>::new(&kp);
@@ -1026,6 +1047,7 @@ mod tests {
/// Peers without snapshot support fall back to vector-clock-based full sync. /// Peers without snapshot support fall back to vector-clock-based full sync.
#[test] #[test]
fn backwards_compat_unknown_snapshot_message_ignored() { fn backwards_compat_unknown_snapshot_message_ignored() {
let _g = GLOBAL_STATE_LOCK.lock().unwrap_or_else(|e| e.into_inner());
// A peer that doesn't understand snapshot messages should be able to // A peer that doesn't understand snapshot messages should be able to
// parse them as unknown variants and ignore them gracefully. // parse them as unknown variants and ignore them gracefully.
let snapshot_json = r#"{"type":"snapshot","at_seq":100,"state":["op1"],"op_manifest":[]}"#; let snapshot_json = r#"{"type":"snapshot","at_seq":100,"state":["op1"],"op_manifest":[]}"#;
@@ -1054,6 +1076,7 @@ mod tests {
/// After compaction, an archived story's attribution can be reconstructed. /// After compaction, an archived story's attribution can be reconstructed.
#[test] #[test]
fn attribution_preserved_after_compaction() { fn attribution_preserved_after_compaction() {
let _g = GLOBAL_STATE_LOCK.lock().unwrap_or_else(|e| e.into_inner());
crdt_state::init_for_test(); crdt_state::init_for_test();
init(); init();