Files
huskies/server/src/crdt_state/state/mod.rs
T
Timmy 8421104645 fix(914): thread-local ALL_OPS/VECTOR_CLOCK in cfg(test) so compaction tests don't race
Root cause was not the persist channel (the test-mode channel is unbounded
and its receiver is leaked, so sends never fail). It was that `ALL_OPS` and
`VECTOR_CLOCK` were process-wide `OnceLock` globals while `CRDT_STATE` was
already thread-local — so one test thread's `apply_compaction` would prune
another test thread's freshly-written ops out of the shared journal, and
the subsequent `all_ops_json()` read in `compaction_reduces_ops` would
return fewer than the 5 it had just written.

Mirror the pattern already used for `CRDT_STATE` and `SnapshotState`: in
`cfg(test)` use thread-local `OnceLock<Mutex<...>>`s for the op journal and
vector clock, accessed via new `all_ops_lock()` / `vector_clock_lock()`
helpers. Production code path is unchanged (still the global statics set
during `init()`).

Touches ops/read/snapshot call sites to go through the helpers. Note in
passing that this overlaps backlog story 518; that story is about the
production-side persist path, this is the cfg(test)-only journal-isolation
slice.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-12 16:09:38 +01:00

165 lines
6.9 KiB
Rust

//! Internal CRDT state struct, statics, initialisation, and central write primitive.
//!
//! This module is split into focused submodules:
//! - [`statics`]: broadcast channels and op-tracking statics
//! - [`indices`]: index-rebuild helpers for O(1) key lookup
//! - [`init`]: async startup and keypair persistence
//! - [`apply`]: write path (sign, apply, persist, broadcast)
use std::collections::{HashMap, HashSet};
use std::sync::{Mutex, OnceLock};
use bft_json_crdt::json_crdt::{BaseCrdt, SignedOp};
use bft_json_crdt::keypair::make_keypair;
use fastcrypto::ed25519::Ed25519KeyPair;
use tokio::sync::{broadcast, mpsc};
use super::VectorClock;
use super::types::{CrdtEvent, PipelineDoc};
mod apply;
mod indices;
mod init;
mod statics;
#[cfg(test)]
mod tests;
// ── Re-exports for crdt_state siblings ──────────────────────────────
pub use init::init;
pub(super) use apply::{apply_and_persist, emit_event};
pub(super) use indices::{
rebuild_active_agent_index, rebuild_agent_throttle_index, rebuild_gateway_project_index,
rebuild_index, rebuild_merge_job_index, rebuild_node_index, rebuild_test_job_index,
rebuild_token_index,
};
pub(super) use statics::{SYNC_TX, track_op};
pub(crate) use statics::{all_ops_lock, vector_clock_lock};
// ── CrdtState struct ─────────────────────────────────────────────────
/// Holds the core CRDT document, signing keypair, and all O(1) lookup indices.
///
/// Each `*_index` map caches, per key, the position of the matching entry in
/// one list of the document, so a lookup does not have to scan that list.
/// NOTE(review): the maps are presumably kept in sync by the `rebuild_*`
/// helpers re-exported from the `indices` submodule — confirm there.
pub(super) struct CrdtState {
/// The CRDT document itself, rooted at a [`PipelineDoc`].
pub(super) crdt: BaseCrdt<PipelineDoc>,
/// Ed25519 keypair the document is created with (it is handed to
/// `BaseCrdt::new`); presumably also the key the write path signs ops with.
pub(super) keypair: Ed25519KeyPair,
/// Maps story_id → index in the items ListCrdt for O(1) lookup.
pub(super) index: HashMap<String, usize>,
/// Maps node_id (hex) → index in the nodes ListCrdt for O(1) lookup.
pub(super) node_index: HashMap<String, usize>,
/// Maps agent_id → index in the tokens ListCrdt for O(1) lookup.
pub(super) token_index: HashMap<String, usize>,
/// Maps story_id → index in the merge_jobs ListCrdt for O(1) lookup.
pub(super) merge_job_index: HashMap<String, usize>,
/// Maps agent_id → index in the active_agents ListCrdt for O(1) lookup.
pub(super) active_agent_index: HashMap<String, usize>,
/// Maps story_id → index in the test_jobs ListCrdt for O(1) lookup.
pub(super) test_job_index: HashMap<String, usize>,
/// Maps node_id → index in the agent_throttle ListCrdt for O(1) lookup.
pub(super) agent_throttle_index: HashMap<String, usize>,
/// Maps project name → index in the gateway_projects ListCrdt for O(1) lookup.
pub(super) gateway_project_index: HashMap<String, usize>,
/// Channel sender for fire-and-forget op persistence.
///
/// The channel is unbounded, so sending a [`SignedOp`] never waits for
/// capacity.
pub(super) persist_tx: mpsc::UnboundedSender<SignedOp>,
/// Max sequence number seen across all ops during init() replay.
///
/// Newly-created registers (post-init) must have their Lamport clock
/// advanced to this floor so they don't re-emit low sequence numbers.
pub(super) lamport_floor: u64,
/// Story IDs permanently tombstoned via `evict_item`.
///
/// `write_item` consults this set before inserting a new CRDT entry so
/// that a concurrent or late-arriving write cannot resurrect a deleted
/// story. Rebuilt from the CRDT op log on restart.
pub(super) tombstones: HashSet<String>,
}
// ── Singleton and accessor ───────────────────────────────────────────
/// Process-wide singleton holding the initialised [`CrdtState`].
///
/// Read through `get_crdt`. In test builds that accessor checks the
/// thread-local `CRDT_STATE_TL` first and only falls back to this global
/// when the calling thread has no state of its own.
pub(super) static CRDT_STATE: OnceLock<Mutex<CrdtState>> = OnceLock::new();
// Test-only, per-thread slot for a `CrdtState`. `init_for_test` fills it the
// first time it runs on a thread, so each test thread gets its own state
// instead of sharing `CRDT_STATE`.
#[cfg(test)]
thread_local! {
static CRDT_STATE_TL: OnceLock<Mutex<CrdtState>> = const { OnceLock::new() };
}
/// Returns a reference to the global [`CrdtState`] mutex, if initialised.
///
/// Non-test variant: reads [`CRDT_STATE`] directly and yields `None` while
/// that `OnceLock` has not been set yet.
#[cfg(not(test))]
pub(super) fn get_crdt() -> Option<&'static Mutex<CrdtState>> {
CRDT_STATE.get()
}
/// Test variant of the state accessor.
///
/// Prefers the calling thread's own [`CrdtState`] (the one stored in
/// `CRDT_STATE_TL`) and only falls back to the process-wide [`CRDT_STATE`]
/// when this thread has not initialised a state. Returns `None` when
/// neither has been set.
#[cfg(test)]
pub(super) fn get_crdt() -> Option<&'static Mutex<CrdtState>> {
    // A borrow cannot leave the `with` closure, so carry the address of the
    // per-thread mutex out as a raw pointer instead.
    let per_thread: Option<*const Mutex<CrdtState>> =
        CRDT_STATE_TL.with(|cell| cell.get().map(|state| state as *const Mutex<CrdtState>));

    match per_thread {
        // SAFETY: The thread-local lives as long as the thread, which outlives
        // any test using it. We only need 'static for the return type.
        // NOTE(review): this holds only while the returned reference is not
        // used after the owning thread has exited.
        Some(state_ptr) => Some(unsafe { &*state_ptr }),
        None => CRDT_STATE.get(),
    }
}
/// Set up a minimal, purely in-memory CRDT state for unit tests.
///
/// Skips the async SQLite setup that `init()` performs. The persistence
/// channel's receiver is deliberately leaked: nothing is ever persisted, but
/// the channel stays open, so `apply_and_persist` can send without error.
///
/// The state, op journal and vector clock are all per-thread. Calling this
/// again on a thread that is already initialised leaves them untouched.
#[cfg(test)]
pub fn init_for_test() {
    /// Builds an empty state with a fresh keypair and a persistence channel
    /// whose receiving half is leaked.
    fn blank_state() -> CrdtState {
        let signing_keys = make_keypair();
        let document = BaseCrdt::<PipelineDoc>::new(&signing_keys);
        let (op_sender, op_receiver) = mpsc::unbounded_channel();
        // Forgetting the receiver keeps the channel open for the rest of the
        // process: sends keep succeeding, which prevents [crdt_persist] WARNs
        // from racing with other tests that watch the global log buffer.
        std::mem::forget(op_receiver);
        CrdtState {
            crdt: document,
            keypair: signing_keys,
            index: HashMap::new(),
            node_index: HashMap::new(),
            token_index: HashMap::new(),
            merge_job_index: HashMap::new(),
            active_agent_index: HashMap::new(),
            test_job_index: HashMap::new(),
            agent_throttle_index: HashMap::new(),
            gateway_project_index: HashMap::new(),
            persist_tx: op_sender,
            lamport_floor: 0,
            tombstones: HashSet::new(),
        }
    }

    // Only the first call on a given thread builds a state; later calls find
    // the slot occupied and skip construction entirely (the old OnceLock
    // behaviour, but isolated per thread).
    CRDT_STATE_TL.with(|cell| {
        if cell.get().is_none() {
            let _ = cell.set(Mutex::new(blank_state()));
        }
    });

    // Broadcast senders are initialised at most once; an existing sender is
    // kept as-is.
    let _ = statics::CRDT_EVENT_TX.get_or_init(|| {
        let (event_tx, _event_rx) = broadcast::channel::<CrdtEvent>(256);
        event_tx
    });
    let _ = statics::SYNC_TX.get_or_init(|| {
        let (sync_tx, _sync_rx) = broadcast::channel::<SignedOp>(1024);
        sync_tx
    });

    // Op journal and vector clock are per-thread as well, so parallel tests
    // cannot disturb each other's view of ALL_OPS (notably, one thread's
    // `apply_compaction` could otherwise prune another thread's ops).
    // `set` on an already-filled slot is a no-op whose error is discarded.
    statics::ALL_OPS_TL.with(|journal| {
        let _ = journal.set(Mutex::new(Vec::new()));
    });
    statics::VECTOR_CLOCK_TL.with(|clock| {
        let _ = clock.set(Mutex::new(VectorClock::new()));
    });
}