huskies: merge 1111 bug Test isolation: init_for_test() and ensure_content_store() are once-per-thread, not once-per-test, polluting CRDT state across tests

This commit is contained in:
dave
2026-05-17 00:28:48 +00:00
parent f8212f102f
commit a40500eea9
9 changed files with 111 additions and 36 deletions
+42 -34
View File
@@ -122,49 +122,57 @@ pub(super) fn get_crdt() -> Option<&'static Mutex<CrdtState>> {
/// This avoids the async SQLite setup from `init()`. Ops are sent to a
/// channel whose receiver is leaked (so nothing is persisted, but the channel
/// stays open and `apply_and_persist` succeeds silently).
/// Safe to call multiple times — subsequent calls are no-ops (thread-local).
/// Always resets all thread-local state so each call produces a clean slate —
/// no cross-test pollution when two tests share the same thread.
#[cfg(test)]
pub fn init_for_test() {
// Initialise thread-local CRDT for test isolation.
// Only creates a new CRDT if one isn't set yet on this thread;
// subsequent calls are no-ops (matching the old OnceLock semantics
// while keeping each thread isolated).
let keypair = make_keypair();
let crdt = BaseCrdt::<PipelineDoc>::new(&keypair);
let (persist_tx, rx) = mpsc::unbounded_channel();
// Leak the receiver so the channel stays open: apply_and_persist
// can then send without error, preventing [crdt_persist] WARNs
// from racing with other tests that watch the global log buffer.
std::mem::forget(rx);
let fresh = CrdtState {
crdt,
keypair,
index: HashMap::new(),
node_index: HashMap::new(),
token_index: HashMap::new(),
merge_job_index: HashMap::new(),
active_agent_index: HashMap::new(),
test_job_index: HashMap::new(),
agent_throttle_index: HashMap::new(),
gateway_project_index: HashMap::new(),
persist_tx,
lamport_floor: 0,
tombstones: HashSet::new(),
};
CRDT_STATE_TL.with(|lock| {
if lock.get().is_none() {
let keypair = make_keypair();
let crdt = BaseCrdt::<PipelineDoc>::new(&keypair);
let (persist_tx, rx) = mpsc::unbounded_channel();
// Leak the receiver so the channel stays open: apply_and_persist
// can then send without error, preventing [crdt_persist] WARNs
// from racing with other tests that watch the global log buffer.
std::mem::forget(rx);
let state = CrdtState {
crdt,
keypair,
index: HashMap::new(),
node_index: HashMap::new(),
token_index: HashMap::new(),
merge_job_index: HashMap::new(),
active_agent_index: HashMap::new(),
test_job_index: HashMap::new(),
agent_throttle_index: HashMap::new(),
gateway_project_index: HashMap::new(),
persist_tx,
lamport_floor: 0,
tombstones: HashSet::new(),
};
let _ = lock.set(Mutex::new(state));
if let Some(mutex) = lock.get() {
// Already set on this thread — replace contents so the second
// (and subsequent) test on the same thread starts clean.
*mutex.lock().unwrap() = fresh;
} else {
let _ = lock.set(Mutex::new(fresh));
}
});
let _ = statics::CRDT_EVENT_TX.get_or_init(|| broadcast::channel::<CrdtEvent>(256).0);
let _ = statics::SYNC_TX.get_or_init(|| broadcast::channel::<SignedOp>(1024).0);
// Per-thread op journal + vector clock — keeps parallel tests' writes
// from corrupting each other's view of ALL_OPS (notably, one thread's
// `apply_compaction` could otherwise prune another thread's ops).
// Per-thread op journal + vector clock — always cleared so a second test
// on the same thread cannot see ops written by the first.
statics::ALL_OPS_TL.with(|lock| {
let _ = lock.set(Mutex::new(Vec::new()));
if let Some(mutex) = lock.get() {
mutex.lock().unwrap().clear();
} else {
let _ = lock.set(Mutex::new(Vec::new()));
}
});
statics::VECTOR_CLOCK_TL.with(|lock| {
let _ = lock.set(Mutex::new(VectorClock::new()));
if let Some(mutex) = lock.get() {
mutex.lock().unwrap().clear();
} else {
let _ = lock.set(Mutex::new(VectorClock::new()));
}
});
}