From 8421104645ee0355b9808b0489f364ddfda8eac5 Mon Sep 17 00:00:00 2001 From: Timmy Date: Tue, 12 May 2026 16:09:38 +0100 Subject: [PATCH] fix(914): thread-local ALL_OPS/VECTOR_CLOCK in cfg(test) so compaction tests don't race MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Root cause was not the persist channel (the test-mode channel is unbounded and its receiver is leaked, so sends never fail). It was that `ALL_OPS` and `VECTOR_CLOCK` were process-wide `OnceLock` globals while `CRDT_STATE` was already thread-local — so one test thread's `apply_compaction` would prune another test thread's freshly-written ops out of the shared journal, and the subsequent `all_ops_json()` read in `compaction_reduces_ops` would return fewer than the 5 it had just written. Mirror the pattern already used for `CRDT_STATE` and `SnapshotState`: in `cfg(test)` use thread-local `OnceLock<Mutex<_>>`s for the op journal and vector clock, accessed via new `all_ops_lock()` / `vector_clock_lock()` helpers. Production code path is unchanged (still the global statics set during `init()`). Touches ops/read/snapshot call sites to go through the helpers. Note in passing that this overlaps backlog story 518; that story is about the production-side persist path, this is the cfg(test)-only journal-isolation slice. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- server/src/crdt_snapshot/mod.rs | 4 +- server/src/crdt_state/mod.rs | 2 +- server/src/crdt_state/ops.rs | 13 +++--- server/src/crdt_state/read.rs | 5 +-- server/src/crdt_state/state/mod.rs | 13 ++++-- server/src/crdt_state/state/statics.rs | 55 +++++++++++++++++++++++++- 6 files changed, 74 insertions(+), 18 deletions(-) diff --git a/server/src/crdt_snapshot/mod.rs b/server/src/crdt_snapshot/mod.rs index 5f7fe0a4..33e5d383 100644 --- a/server/src/crdt_snapshot/mod.rs +++ b/server/src/crdt_snapshot/mod.rs @@ -404,7 +404,7 @@ pub fn apply_compaction(snapshot: Snapshot) -> bool { // For this implementation, the snapshot state IS the full state — peers // discard their old journal and replace it with the snapshot's ops. // The op_manifest preserves attribution for the discarded ops. - if let Some(all_ops) = crdt_state::ALL_OPS.get() + if let Some(all_ops) = crdt_state::all_ops_lock() && let Ok(mut v) = all_ops.lock() { // Calculate ops to prune: those with seq < at_seq @@ -428,7 +428,7 @@ pub fn apply_compaction(snapshot: Snapshot) -> bool { *v = kept_ops; // Rebuild vector clock from remaining ops. - if let Some(vc) = crdt_state::VECTOR_CLOCK.get() + if let Some(vc) = crdt_state::vector_clock_lock() && let Ok(mut clock) = vc.lock() { clock.clear(); diff --git a/server/src/crdt_state/mod.rs b/server/src/crdt_state/mod.rs index ece975a1..5ae73560 100644 --- a/server/src/crdt_state/mod.rs +++ b/server/src/crdt_state/mod.rs @@ -60,7 +60,7 @@ pub use write::{ #[cfg(test)] pub use state::init_for_test; -pub(crate) use state::{ALL_OPS, VECTOR_CLOCK}; +pub(crate) use state::{all_ops_lock, vector_clock_lock}; /// Hex-encode a byte slice (no external dep needed). 
pub(crate) mod hex { diff --git a/server/src/crdt_state/ops.rs b/server/src/crdt_state/ops.rs index 6ec96f08..30f7d233 100644 --- a/server/src/crdt_state/ops.rs +++ b/server/src/crdt_state/ops.rs @@ -10,10 +10,9 @@ use tokio::sync::broadcast; use super::VectorClock; use super::state::{ - ALL_OPS, SYNC_TX, VECTOR_CLOCK, apply_and_persist, emit_event, get_crdt, - rebuild_active_agent_index, rebuild_agent_throttle_index, rebuild_index, - rebuild_merge_job_index, rebuild_node_index, rebuild_test_job_index, rebuild_token_index, - track_op, + SYNC_TX, all_ops_lock, apply_and_persist, emit_event, get_crdt, rebuild_active_agent_index, + rebuild_agent_throttle_index, rebuild_index, rebuild_merge_job_index, rebuild_node_index, + rebuild_test_job_index, rebuild_token_index, track_op, vector_clock_lock, }; use super::types::{CrdtEvent, PipelineDoc}; use crate::slog; @@ -31,7 +30,7 @@ pub fn subscribe_ops() -> Option> { /// Used during initial sync handshake so a newly-connected peer can /// reconstruct the full CRDT state. Returns `None` before `init()`. pub fn all_ops_json() -> Option> { - ALL_OPS.get().map(|m| m.lock().unwrap().clone()) + all_ops_lock().map(|m| m.lock().unwrap().clone()) } /// Return this node's current vector clock. @@ -42,7 +41,7 @@ pub fn all_ops_json() -> Option> { /// /// Returns `None` before `init()`. pub fn our_vector_clock() -> Option { - VECTOR_CLOCK.get().map(|m| m.lock().unwrap().clone()) + vector_clock_lock().map(|m| m.lock().unwrap().clone()) } /// Return only the ops that a peer with the given `peer_clock` is missing. @@ -53,7 +52,7 @@ pub fn our_vector_clock() -> Option { /// /// Returns `None` before `init()`. 
pub fn ops_since(peer_clock: &VectorClock) -> Option> { - let all = ALL_OPS.get()?.lock().ok()?; + let all = all_ops_lock()?.lock().ok()?; let mut author_counts: HashMap = HashMap::new(); let mut result = Vec::new(); diff --git a/server/src/crdt_state/read.rs b/server/src/crdt_state/read.rs index f1e5cc33..ab21a24a 100644 --- a/server/src/crdt_state/read.rs +++ b/server/src/crdt_state/read.rs @@ -5,7 +5,7 @@ use std::collections::HashMap; use bft_json_crdt::json_crdt::*; -use super::state::{ALL_OPS, apply_and_persist, get_crdt, rebuild_index}; +use super::state::{all_ops_lock, apply_and_persist, get_crdt, rebuild_index}; use super::types::{PipelineDoc, PipelineItemCrdt, PipelineItemView}; use bft_json_crdt::op::ROOT_ID; @@ -55,8 +55,7 @@ pub struct CrdtStateDump { pub fn dump_crdt_state(story_id_filter: Option<&str>) -> CrdtStateDump { let in_memory_state_loaded = get_crdt().is_some(); - let persisted_ops_count = ALL_OPS - .get() + let persisted_ops_count = all_ops_lock() .and_then(|m| m.lock().ok().map(|v| v.len())) .unwrap_or(0); diff --git a/server/src/crdt_state/state/mod.rs b/server/src/crdt_state/state/mod.rs index f4d640a2..0e1a59e7 100644 --- a/server/src/crdt_state/state/mod.rs +++ b/server/src/crdt_state/state/mod.rs @@ -35,8 +35,8 @@ pub(super) use indices::{ rebuild_index, rebuild_merge_job_index, rebuild_node_index, rebuild_test_job_index, rebuild_token_index, }; -pub(crate) use statics::{ALL_OPS, VECTOR_CLOCK}; pub(super) use statics::{SYNC_TX, track_op}; +pub(crate) use statics::{all_ops_lock, vector_clock_lock}; // ── CrdtState struct ───────────────────────────────────────────────── @@ -152,6 +152,13 @@ pub fn init_for_test() { }); let _ = statics::CRDT_EVENT_TX.get_or_init(|| broadcast::channel::(256).0); let _ = statics::SYNC_TX.get_or_init(|| broadcast::channel::(1024).0); - let _ = statics::ALL_OPS.get_or_init(|| Mutex::new(Vec::new())); - let _ = statics::VECTOR_CLOCK.get_or_init(|| Mutex::new(VectorClock::new())); + // Per-thread op journal + 
vector clock — keeps parallel tests' writes + // from corrupting each other's view of ALL_OPS (notably, one thread's + // `apply_compaction` could otherwise prune another thread's ops). + statics::ALL_OPS_TL.with(|lock| { + let _ = lock.set(Mutex::new(Vec::new())); + }); + statics::VECTOR_CLOCK_TL.with(|lock| { + let _ = lock.set(Mutex::new(VectorClock::new())); + }); } diff --git a/server/src/crdt_state/state/statics.rs b/server/src/crdt_state/state/statics.rs index 4f11b60a..161c2018 100644 --- a/server/src/crdt_state/state/statics.rs +++ b/server/src/crdt_state/state/statics.rs @@ -4,6 +4,11 @@ //! channel ([`CRDT_EVENT_TX`]), and the in-memory op journal //! ([`ALL_OPS`] / [`VECTOR_CLOCK`]) that tracks every applied op for //! delta-sync. +//! +//! In `cfg(test)`, the op journal and vector clock are stored in +//! thread-local `OnceLock`s (mirroring [`super::CRDT_STATE_TL`]) so parallel +//! tests do not share `ALL_OPS` — preventing one test's `apply_compaction` +//! from pruning another test's freshly-written ops. use std::sync::{Mutex, OnceLock}; @@ -32,16 +37,62 @@ pub(crate) static ALL_OPS: OnceLock>> = OnceLock::new(); /// re-parsing all ops when a peer requests `our_vector_clock()`. pub(crate) static VECTOR_CLOCK: OnceLock> = OnceLock::new(); +#[cfg(test)] +thread_local! { + /// Per-thread op journal for test isolation. Each test thread sees its + /// own ALL_OPS so parallel tests cannot prune each other's ops via + /// `apply_compaction`. Set up by `init_for_test`. + pub(in crate::crdt_state) static ALL_OPS_TL: OnceLock>> = const { OnceLock::new() }; + /// Per-thread vector clock for test isolation. See [`ALL_OPS_TL`]. + pub(in crate::crdt_state) static VECTOR_CLOCK_TL: OnceLock> = const { OnceLock::new() }; +} + +/// Return the mutex guarding the op journal, if initialised. +/// +/// In production: the global `ALL_OPS`. In `cfg(test)`: the per-thread +/// `ALL_OPS_TL`, so parallel tests do not share the journal. 
+pub(crate) fn all_ops_lock() -> Option<&'static Mutex>> { + #[cfg(not(test))] + { + ALL_OPS.get() + } + #[cfg(test)] + { + let ptr = ALL_OPS_TL.with(|lock| lock as *const OnceLock>>); + // SAFETY: the thread-local lives as long as the spawning thread, + // which outlives any test code using it. We only need 'static for + // the return type; consumers never hold the reference past the test. + unsafe { &*ptr }.get() + } +} + +/// Return the mutex guarding the vector clock, if initialised. +/// +/// In production: the global `VECTOR_CLOCK`. In `cfg(test)`: the per-thread +/// `VECTOR_CLOCK_TL`. +pub(crate) fn vector_clock_lock() -> Option<&'static Mutex> { + #[cfg(not(test))] + { + VECTOR_CLOCK.get() + } + #[cfg(test)] + { + let ptr = VECTOR_CLOCK_TL.with(|lock| lock as *const OnceLock>); + // SAFETY: see all_ops_lock above. + unsafe { &*ptr }.get() + } +} + /// Append an op's JSON to `ALL_OPS` and bump the author's count in `VECTOR_CLOCK`. /// /// Centralises the bookkeeping that must stay in sync between the two statics. pub(in crate::crdt_state) fn track_op(signed: &SignedOp, json: String) { - if let Some(all) = ALL_OPS.get() + if let Some(all) = all_ops_lock() && let Ok(mut v) = all.lock() { v.push(json); } - if let Some(vc) = VECTOR_CLOCK.get() + if let Some(vc) = vector_clock_lock() && let Ok(mut clock) = vc.lock() { let author_hex = hex::encode(&signed.author());