fix: thread-local CRDT and content store for test isolation

Tests shared a global CRDT singleton and content store HashMap, causing
flaky failures when parallel tests wrote items that polluted each
other's assertions — typically 3-5 random test failures per run.

Both CRDT_STATE and CONTENT_STORE now use thread_local! in test mode
so each test thread gets its own isolated instance. Production code
is unchanged — it still uses the global OnceLock singletons.

Also fixed 3 tests (create_story_writes_correct_content,
next_item_number_increments_from_existing_bugs,
next_item_number_scans_archived_too) that relied on leaked state
from other tests — they now write to the content store explicitly.

Result: 1902 passed, 0 failed across 5 consecutive runs.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
dave
2026-04-11 13:02:09 +00:00
parent dd53870c59
commit eea54ca616
4 changed files with 111 additions and 42 deletions
+57 -34
View File
@@ -158,6 +158,35 @@ struct CrdtState {
// Global CRDT singleton used by the production code path.
// Populated once (by `init()`) and shared by every thread thereafter.
static CRDT_STATE: OnceLock<Mutex<CrdtState>> = OnceLock::new();
// Per-thread CRDT instance, test builds only: each test thread gets its
// own isolated state so parallel tests cannot pollute each other.
#[cfg(test)]
thread_local! {
    static CRDT_STATE_TL: OnceLock<Mutex<CrdtState>> = const { OnceLock::new() };
}
// Production accessor: always the global singleton.
// Returns `None` before `init()` has run.
#[cfg(not(test))]
fn get_crdt() -> Option<&'static Mutex<CrdtState>> {
    CRDT_STATE.get()
}
/// Test accessor: prefer this thread's isolated CRDT, falling back to the
/// global singleton for tests that initialised via the production path.
/// Returns `None` if neither has been initialised.
#[cfg(test)]
fn get_crdt() -> Option<&'static Mutex<CrdtState>> {
    // Grab a raw pointer to this thread's OnceLock so a reference can be
    // handed out beyond the scope of `with`'s closure.
    let ptr = CRDT_STATE_TL.with(|lock| lock as *const OnceLock<Mutex<CrdtState>>);
    // SAFETY: thread-local storage lives for the whole lifetime of the
    // thread, which outlives any test running on it. The `'static` in the
    // return type over-promises relative to the true (thread-long) lifetime;
    // this is only sound while callers keep the reference on this thread —
    // same contract as before this accessor existed.
    let local = unsafe { &*ptr };
    local.get().or_else(|| CRDT_STATE.get())
}
// ── Initialisation ───────────────────────────────────────────────────
/// Initialise the CRDT state layer.
@@ -270,27 +299,22 @@ pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> {
/// Initialise the CRDT layer for tests.
///
/// Safe to call multiple times — subsequent calls are no-ops.
///
/// Uses a thread-local CRDT instance so each test thread is isolated:
/// a fresh state is created only if this thread has none yet, matching
/// the old global OnceLock semantics while keeping threads independent.
#[cfg(test)]
pub fn init_for_test() {
    CRDT_STATE_TL.with(|lock| {
        if lock.get().is_none() {
            let keypair = make_keypair();
            let crdt = BaseCrdt::<PipelineDoc>::new(&keypair);
            // Receiver is dropped: tests do not exercise persistence.
            let (persist_tx, _rx) = mpsc::unbounded_channel();
            let state = CrdtState {
                crdt,
                keypair,
                index: HashMap::new(),
                node_index: HashMap::new(),
                persist_tx,
            };
            let _ = lock.set(Mutex::new(state));
        }
    });
    // Event/sync channels and the op log remain global (shared across test
    // threads); `get_or_init` makes repeated calls no-ops.
    let _ = CRDT_EVENT_TX.get_or_init(|| broadcast::channel::<CrdtEvent>(256).0);
    let _ = SYNC_TX.get_or_init(|| broadcast::channel::<SignedOp>(1024).0);
    let _ = ALL_OPS.get_or_init(|| Mutex::new(Vec::new()));
}
/// Load or create the Ed25519 keypair used by this node.
@@ -390,7 +414,7 @@ pub fn write_item(
claimed_by: Option<&str>,
claimed_at: Option<f64>,
) {
let Some(state_mutex) = CRDT_STATE.get() else {
let Some(state_mutex) = get_crdt() else {
return;
};
let Ok(mut state) = state_mutex.lock() else {
@@ -511,7 +535,7 @@ fn emit_event(event: CrdtEvent) {
/// Returns `true` if the op was new and applied, `false` if it was a
/// duplicate or failed validation.
pub fn apply_remote_op(op: SignedOp) -> bool {
let Some(state_mutex) = CRDT_STATE.get() else {
let Some(state_mutex) = get_crdt() else {
return false;
};
let Ok(mut state) = state_mutex.lock() else {
@@ -596,7 +620,7 @@ pub fn apply_remote_op(op: SignedOp) -> bool {
/// Used as the stable identity written into the CRDT nodes list.
/// Returns `None` before `init()`.
pub fn our_node_id() -> Option<String> {
    // Route through get_crdt() so test builds see the thread-local state;
    // the stray direct CRDT_STATE read (a leftover pre-patch line) is gone —
    // it shadowed this binding after taking a redundant lock on the global.
    let state = get_crdt()?.lock().ok()?;
    Some(hex::encode(&state.crdt.id))
}
@@ -615,7 +639,7 @@ pub fn write_claim(story_id: &str) -> bool {
};
let now = chrono::Utc::now().timestamp() as f64;
let Some(state_mutex) = CRDT_STATE.get() else {
let Some(state_mutex) = get_crdt() else {
return false;
};
let Ok(mut state) = state_mutex.lock() else {
@@ -638,7 +662,7 @@ pub fn write_claim(story_id: &str) -> bool {
/// Release a claim on a pipeline item (clear claimed_by and claimed_at).
pub fn release_claim(story_id: &str) {
let Some(state_mutex) = CRDT_STATE.get() else {
let Some(state_mutex) = get_crdt() else {
return;
};
let Ok(mut state) = state_mutex.lock() else {
@@ -674,7 +698,7 @@ pub fn is_claimed_by_us(story_id: &str) -> bool {
///
/// This is the write path for both local heartbeats and tombstoning.
pub fn write_node_presence(node_id: &str, address: &str, last_seen: f64, alive: bool) {
let Some(state_mutex) = CRDT_STATE.get() else {
let Some(state_mutex) = get_crdt() else {
return;
};
let Ok(mut state) = state_mutex.lock() else {
@@ -715,7 +739,7 @@ pub fn write_node_presence(node_id: &str, address: &str, last_seen: f64, alive:
///
/// Returns `None` before `init()`.
pub fn read_all_node_presence() -> Option<Vec<NodePresenceView>> {
let state_mutex = CRDT_STATE.get()?;
let state_mutex = get_crdt()?;
let state = state_mutex.lock().ok()?;
let mut nodes = Vec::new();
@@ -797,14 +821,14 @@ pub struct CrdtStateDump {
/// **This is a debug tool.** For normal pipeline introspection use
/// [`read_all_items`] or the `get_pipeline_status` MCP tool instead.
pub fn dump_crdt_state(story_id_filter: Option<&str>) -> CrdtStateDump {
let in_memory_state_loaded = CRDT_STATE.get().is_some();
let in_memory_state_loaded = get_crdt().is_some();
let persisted_ops_count = ALL_OPS
.get()
.and_then(|m| m.lock().ok().map(|v| v.len()))
.unwrap_or(0);
let Some(state_mutex) = CRDT_STATE.get() else {
let Some(state_mutex) = get_crdt() else {
return CrdtStateDump {
in_memory_state_loaded,
total_items: 0,
@@ -932,7 +956,7 @@ pub fn dump_crdt_state(story_id_filter: Option<&str>) -> CrdtStateDump {
/// Returns items grouped by stage, or `None` if the CRDT layer is not
/// initialised.
pub fn read_all_items() -> Option<Vec<PipelineItemView>> {
let state_mutex = CRDT_STATE.get()?;
let state_mutex = get_crdt()?;
let state = state_mutex.lock().ok()?;
// Only return items that appear in the deduplicated index.
@@ -950,7 +974,7 @@ pub fn read_all_items() -> Option<Vec<PipelineItemView>> {
/// Read a single pipeline item by story_id.
pub fn read_item(story_id: &str) -> Option<PipelineItemView> {
let state_mutex = CRDT_STATE.get()?;
let state_mutex = get_crdt()?;
let state = state_mutex.lock().ok()?;
let &idx = state.index.get(story_id)?;
extract_item_view(&state.crdt.doc.items[idx])
@@ -980,8 +1004,7 @@ pub fn read_item(story_id: &str) -> Option<PipelineItemView> {
/// or an `Err` if the CRDT layer isn't initialised or the story_id is
/// unknown to the in-memory state.
pub fn evict_item(story_id: &str) -> Result<(), String> {
let state_mutex = CRDT_STATE
.get()
let state_mutex = get_crdt()
.ok_or_else(|| "CRDT layer not initialised".to_string())?;
let mut state = state_mutex
.lock()