From 7408cc5b4b5e06c9ddadd9150f525aa94d1ed6b4 Mon Sep 17 00:00:00 2001 From: dave Date: Mon, 27 Apr 2026 02:49:53 +0000 Subject: [PATCH] fix(crdt_snapshot): per-thread SNAPSHOT_STATE in cfg(test) instead of shared static (bug 669) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replaces the test-time GLOBAL_STATE_LOCK approach (which was just disguised single-threading) with proper test isolation: each test thread gets its own SnapshotState via a thread_local!. Pattern matches crdt_state::CRDT_STATE_TL — production keeps the global OnceLock; tests get a per-thread OnceLock that's accessed through a snapshot_state() helper. The unsafe `&*ptr` cast to 'static is safe because the thread_local lives as long as the spawning test thread. The race: latest_snapshot_available_after_compaction captured at_seq from a freshly-generated snapshot, then asserted it equalled SNAPSHOT_STATE's latest.at_seq. With shared SNAPSHOT_STATE, another test thread's apply_compaction could overwrite latest_snapshot between capture and read. Per-thread state eliminates the race at its source. ALL_OPS / VECTOR_CLOCK stay shared — the tests don't assert on absolute counts, only on (this-thread's at_seq) == (this-thread's latest.at_seq). 5 consecutive default-parallel `cargo test --bin huskies` runs all green at 2636/2636. --- server/src/crdt_snapshot.rs | 77 +++++++++++++++++++++++++++++-------- 1 file changed, 60 insertions(+), 17 deletions(-) diff --git a/server/src/crdt_snapshot.rs b/server/src/crdt_snapshot.rs index cdd8c13c..1dc4db97 100644 --- a/server/src/crdt_snapshot.rs +++ b/server/src/crdt_snapshot.rs @@ -122,20 +122,65 @@ struct SnapshotState { pending_at_seq: Option<u64>, } +#[cfg(not(test))] static SNAPSHOT_STATE: OnceLock<Mutex<SnapshotState>> = OnceLock::new(); -/// Initialise snapshot state. Safe to call multiple times (OnceLock). +#[cfg(test)] +thread_local! { + /// Per-thread snapshot state for test isolation. 
Each test thread gets its + /// own SnapshotState so parallel tests do not leak into each other's + /// snapshot/coordination history. + static SNAPSHOT_STATE_TL: OnceLock<Mutex<SnapshotState>> = const { OnceLock::new() }; +} + +/// Read access to the snapshot state. +/// +/// In production this returns the global `SNAPSHOT_STATE`. In `cfg(test)` +/// each thread sees its own thread-local state, so parallel tests do not +/// share `SnapshotState`. +fn snapshot_state() -> Option<&'static Mutex<SnapshotState>> { + #[cfg(not(test))] + { + SNAPSHOT_STATE.get() + } + #[cfg(test)] + { + let ptr = SNAPSHOT_STATE_TL.with(|lock| lock as *const OnceLock<Mutex<SnapshotState>>); + // SAFETY: the thread-local lives as long as the thread. We only need + // 'static for the return type; consumers never outlive the spawning + // test thread. + unsafe { &*ptr }.get() + } +} + +/// Initialise snapshot state. Safe to call multiple times. +/// +/// In production: idempotent via `OnceLock`. In `cfg(test)`: initialises the +/// per-thread state on first call; subsequent calls on the same thread are +/// no-ops. pub fn init() { - let _ = SNAPSHOT_STATE.set(Mutex::new(SnapshotState { - latest_snapshot: None, - pending_acks: HashMap::new(), - pending_at_seq: None, - })); + let value = || { + Mutex::new(SnapshotState { + latest_snapshot: None, + pending_acks: HashMap::new(), + pending_at_seq: None, + }) + }; + #[cfg(not(test))] + { + let _ = SNAPSHOT_STATE.set(value()); + } + #[cfg(test)] + { + SNAPSHOT_STATE_TL.with(|lock| { + let _ = lock.set(value()); + }); + } } /// Return the most recent completed snapshot, if any. 
pub fn latest_snapshot() -> Option<Snapshot> { - SNAPSHOT_STATE.get()?.lock().ok()?.latest_snapshot.clone() + snapshot_state()?.lock().ok()?.latest_snapshot.clone() } // ── Leader selection ─────────────────────────────────────────────────── @@ -256,7 +301,7 @@ pub fn begin_coordination( alive_peer_node_ids: &[String], self_node_id: &str, ) -> Option<u64> { - let state = SNAPSHOT_STATE.get()?; + let state = snapshot_state()?; let mut s = state.lock().ok()?; let mut pending = HashMap::new(); @@ -276,7 +321,7 @@ pub fn begin_coordination( /// /// Returns `true` if quorum has been reached (all alive peers have acked). pub fn record_ack(node_id: &str, at_seq: u64) -> bool { - let Some(state) = SNAPSHOT_STATE.get() else { + let Some(state) = snapshot_state() else { return false; }; let Ok(mut s) = state.lock() else { @@ -300,7 +345,7 @@ pub fn record_ack(node_id: &str, at_seq: u64) -> bool { /// /// Called when a peer goes offline mid-coordination or quorum times out. pub fn abort_coordination() { - if let Some(state) = SNAPSHOT_STATE.get() + if let Some(state) = snapshot_state() && let Ok(mut s) = state.lock() { s.pending_at_seq = None; @@ -310,8 +355,7 @@ pub fn abort_coordination() { /// Check whether there is a pending snapshot coordination in progress. pub fn has_pending_coordination() -> bool { - SNAPSHOT_STATE - .get() + snapshot_state() .and_then(|s| s.lock().ok()) .map(|s| s.pending_at_seq.is_some()) .unwrap_or(false) @@ -319,8 +363,7 @@ pub fn has_pending_coordination() -> bool { /// Return the list of peers that have NOT yet acked the pending snapshot. pub fn unacked_peers() -> Vec<String> { - SNAPSHOT_STATE - .get() + snapshot_state() .and_then(|s| s.lock().ok()) .map(|s| { s.pending_acks @@ -346,7 +389,7 @@ pub fn unacked_peers() -> Vec<String> { /// Returns `true` if compaction was applied successfully. pub fn apply_compaction(snapshot: Snapshot) -> bool { // Store the snapshot as the latest for future new-node onboarding. 
- if let Some(state) = SNAPSHOT_STATE.get() + if let Some(state) = snapshot_state() && let Ok(mut s) = state.lock() { s.latest_snapshot = Some(snapshot.clone()); @@ -724,7 +767,7 @@ mod tests { let snapshot = generate_snapshot().unwrap(); // Store as latest. - if let Some(state) = SNAPSHOT_STATE.get() + if let Some(state) = snapshot_state() && let Ok(mut s) = state.lock() { s.latest_snapshot = Some(snapshot.clone()); @@ -957,7 +1000,7 @@ mod tests { // No compaction was applied — state is clean. // The latest_snapshot should NOT be set. // (The snapshot was never committed.) - let state = SNAPSHOT_STATE.get().unwrap().lock().unwrap(); + let state = snapshot_state().unwrap().lock().unwrap(); // pending_at_seq is cleared. assert!(state.pending_at_seq.is_none()); assert!(state.pending_acks.is_empty());