diff --git a/server/src/crdt_state.rs b/server/src/crdt_state.rs index 9cfd3ae2..9da2c93a 100644 --- a/server/src/crdt_state.rs +++ b/server/src/crdt_state.rs @@ -158,6 +158,35 @@ struct CrdtState { static CRDT_STATE: OnceLock> = OnceLock::new(); +#[cfg(test)] +thread_local! { + static CRDT_STATE_TL: OnceLock> = const { OnceLock::new() }; +} + +#[cfg(not(test))] +fn get_crdt() -> Option<&'static Mutex> { + CRDT_STATE.get() +} + +#[cfg(test)] +fn get_crdt() -> Option<&'static Mutex> { + let tl = CRDT_STATE_TL.with(|lock| { + if lock.get().is_some() { + Some(lock as *const OnceLock>) + } else { + None + } + }); + if let Some(ptr) = tl { + // SAFETY: The thread-local lives as long as the thread, which outlives + // any test using it. We only need 'static for the return type. + let lock = unsafe { &*ptr }; + lock.get() + } else { + CRDT_STATE.get() + } +} + // ── Initialisation ─────────────────────────────────────────────────── /// Initialise the CRDT state layer. @@ -270,27 +299,22 @@ pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> { /// Safe to call multiple times — subsequent calls are no-ops (OnceLock). #[cfg(test)] pub fn init_for_test() { - if CRDT_STATE.get().is_some() { - return; - } - let keypair = make_keypair(); - let crdt = BaseCrdt::::new(&keypair); - let index = HashMap::new(); - let node_index = HashMap::new(); - let (persist_tx, _rx) = mpsc::unbounded_channel(); - let state = CrdtState { - crdt, - keypair, - index, - node_index, - persist_tx, - }; - let _ = CRDT_STATE.set(Mutex::new(state)); - let (event_tx, _) = broadcast::channel::(256); - let _ = CRDT_EVENT_TX.set(event_tx); - let (sync_tx, _) = broadcast::channel::(1024); - let _ = SYNC_TX.set(sync_tx); - let _ = ALL_OPS.set(Mutex::new(Vec::new())); + // Initialise thread-local CRDT for test isolation. + // Only creates a new CRDT if one isn't set yet on this thread; + // subsequent calls are no-ops (matching the old OnceLock semantics + // while keeping each thread isolated). + CRDT_STATE_TL.with(|lock| { + if lock.get().is_none() { + let keypair = make_keypair(); + let crdt = BaseCrdt::::new(&keypair); + let (persist_tx, _rx) = mpsc::unbounded_channel(); + let state = CrdtState { crdt, keypair, index: HashMap::new(), node_index: HashMap::new(), persist_tx }; + let _ = lock.set(Mutex::new(state)); + } + }); + let _ = CRDT_EVENT_TX.get_or_init(|| broadcast::channel::(256).0); + let _ = SYNC_TX.get_or_init(|| broadcast::channel::(1024).0); + let _ = ALL_OPS.get_or_init(|| Mutex::new(Vec::new())); } /// Load or create the Ed25519 keypair used by this node. @@ -390,7 +414,7 @@ pub fn write_item( claimed_by: Option<&str>, claimed_at: Option, ) { - let Some(state_mutex) = CRDT_STATE.get() else { + let Some(state_mutex) = get_crdt() else { return; }; let Ok(mut state) = state_mutex.lock() else { @@ -511,7 +535,7 @@ fn emit_event(event: CrdtEvent) { /// Returns `true` if the op was new and applied, `false` if it was a /// duplicate or failed validation. pub fn apply_remote_op(op: SignedOp) -> bool { - let Some(state_mutex) = CRDT_STATE.get() else { + let Some(state_mutex) = get_crdt() else { return false; }; let Ok(mut state) = state_mutex.lock() else { @@ -596,7 +620,7 @@ pub fn apply_remote_op(op: SignedOp) -> bool { /// Used as the stable identity written into the CRDT nodes list. /// Returns `None` before `init()`. pub fn our_node_id() -> Option { - let state = CRDT_STATE.get()?.lock().ok()?; + let state = get_crdt()?.lock().ok()?; Some(hex::encode(&state.crdt.id)) } @@ -615,7 +639,7 @@ pub fn write_claim(story_id: &str) -> bool { }; let now = chrono::Utc::now().timestamp() as f64; - let Some(state_mutex) = CRDT_STATE.get() else { + let Some(state_mutex) = get_crdt() else { return false; }; let Ok(mut state) = state_mutex.lock() else { @@ -638,7 +662,7 @@ pub fn write_claim(story_id: &str) -> bool { /// Release a claim on a pipeline item (clear claimed_by and claimed_at). pub fn release_claim(story_id: &str) { - let Some(state_mutex) = CRDT_STATE.get() else { + let Some(state_mutex) = get_crdt() else { return; }; let Ok(mut state) = state_mutex.lock() else { @@ -674,7 +698,7 @@ pub fn is_claimed_by_us(story_id: &str) -> bool { /// /// This is the write path for both local heartbeats and tombstoning. pub fn write_node_presence(node_id: &str, address: &str, last_seen: f64, alive: bool) { - let Some(state_mutex) = CRDT_STATE.get() else { + let Some(state_mutex) = get_crdt() else { return; }; let Ok(mut state) = state_mutex.lock() else { @@ -715,7 +739,7 @@ pub fn write_node_presence(node_id: &str, address: &str, last_seen: f64, alive: /// /// Returns `None` before `init()`. pub fn read_all_node_presence() -> Option> { - let state_mutex = CRDT_STATE.get()?; + let state_mutex = get_crdt()?; let state = state_mutex.lock().ok()?; let mut nodes = Vec::new(); @@ -797,14 +821,14 @@ pub struct CrdtStateDump { /// **This is a debug tool.** For normal pipeline introspection use /// [`read_all_items`] or the `get_pipeline_status` MCP tool instead. pub fn dump_crdt_state(story_id_filter: Option<&str>) -> CrdtStateDump { - let in_memory_state_loaded = CRDT_STATE.get().is_some(); + let in_memory_state_loaded = get_crdt().is_some(); let persisted_ops_count = ALL_OPS .get() .and_then(|m| m.lock().ok().map(|v| v.len())) .unwrap_or(0); - let Some(state_mutex) = CRDT_STATE.get() else { + let Some(state_mutex) = get_crdt() else { return CrdtStateDump { in_memory_state_loaded, total_items: 0, @@ -932,7 +956,7 @@ pub fn dump_crdt_state(story_id_filter: Option<&str>) -> CrdtStateDump { /// Returns items grouped by stage, or `None` if the CRDT layer is not /// initialised. pub fn read_all_items() -> Option> { - let state_mutex = CRDT_STATE.get()?; + let state_mutex = get_crdt()?; let state = state_mutex.lock().ok()?; // Only return items that appear in the deduplicated index. @@ -950,7 +974,7 @@ pub fn read_all_items() -> Option> { /// Read a single pipeline item by story_id. pub fn read_item(story_id: &str) -> Option { - let state_mutex = CRDT_STATE.get()?; + let state_mutex = get_crdt()?; let state = state_mutex.lock().ok()?; let &idx = state.index.get(story_id)?; extract_item_view(&state.crdt.doc.items[idx]) @@ -980,8 +1004,7 @@ pub fn read_item(story_id: &str) -> Option { /// or an `Err` if the CRDT layer isn't initialised or the story_id is /// unknown to the in-memory state. pub fn evict_item(story_id: &str) -> Result<(), String> { - let state_mutex = CRDT_STATE - .get() + let state_mutex = get_crdt() .ok_or_else(|| "CRDT layer not initialised".to_string())?; let mut state = state_mutex .lock() diff --git a/server/src/db/mod.rs b/server/src/db/mod.rs index 360ad212..ee98ef11 100644 --- a/server/src/db/mod.rs +++ b/server/src/db/mod.rs @@ -45,9 +45,38 @@ static PIPELINE_DB: OnceLock = OnceLock::new(); static CONTENT_STORE: OnceLock>> = OnceLock::new(); +#[cfg(test)] +thread_local! { + static CONTENT_STORE_TL: OnceLock>> = const { OnceLock::new() }; +} + +#[cfg(not(test))] +fn get_content_store() -> Option<&'static Mutex>> { + CONTENT_STORE.get() +} + +#[cfg(test)] +fn get_content_store() -> Option<&'static Mutex>> { + let tl = CONTENT_STORE_TL.with(|lock| { + if lock.get().is_some() { + Some(lock as *const OnceLock>>) + } else { + None + } + }); + if let Some(ptr) = tl { + // SAFETY: The thread-local lives as long as the thread, which outlives + // any test using it. We only need 'static for the return type. + let lock = unsafe { &*ptr }; + lock.get() + } else { + CONTENT_STORE.get() + } +} + /// Read the full markdown content of a story from the in-memory store. pub fn read_content(story_id: &str) -> Option { - let store = CONTENT_STORE.get()?; + let store = get_content_store()?; let map = store.lock().ok()?; map.get(story_id).cloned() } @@ -56,14 +85,14 @@ pub fn read_content(story_id: &str) -> Option { /// /// Updates the in-memory store immediately. pub fn write_content(story_id: &str, content: &str) { - if let Some(store) = CONTENT_STORE.get() && let Ok(mut map) = store.lock() { + if let Some(store) = get_content_store() && let Ok(mut map) = store.lock() { map.insert(story_id.to_string(), content.to_string()); } } /// Remove a story's content from the in-memory store. pub fn delete_content(story_id: &str) { - if let Some(store) = CONTENT_STORE.get() && let Ok(mut map) = store.lock() { + if let Some(store) = get_content_store() && let Ok(mut map) = store.lock() { map.remove(story_id); } } @@ -72,16 +101,23 @@ pub fn delete_content(story_id: &str) { /// /// Safe to call multiple times — the `OnceLock` is set at most once. pub fn ensure_content_store() { - let _ = CONTENT_STORE.set(Mutex::new(HashMap::new())); - // In tests, also initialise the in-memory CRDT state so that - // write_item_with_content() and read_all_typed() work without async SQLite. + #[cfg(not(test))] + { let _ = CONTENT_STORE.set(Mutex::new(HashMap::new())); } + #[cfg(test)] - crate::crdt_state::init_for_test(); + { + CONTENT_STORE_TL.with(|lock| { + if lock.get().is_none() { + let _ = lock.set(Mutex::new(HashMap::new())); + } + }); + crate::crdt_state::init_for_test(); + } } /// Return all story IDs present in the content store. pub fn all_content_ids() -> Vec { - match CONTENT_STORE.get() { + match get_content_store() { Some(store) => match store.lock() { Ok(map) => map.keys().cloned().collect(), Err(_) => Vec::new(), diff --git a/server/src/http/workflow/bug_ops.rs b/server/src/http/workflow/bug_ops.rs index 1b853baf..db21ddcc 100644 --- a/server/src/http/workflow/bug_ops.rs +++ b/server/src/http/workflow/bug_ops.rs @@ -269,22 +269,29 @@ mod tests { #[test] fn next_item_number_increments_from_existing_bugs() { + crate::db::ensure_content_store(); let tmp = tempfile::tempdir().unwrap(); let backlog = tmp.path().join(".huskies/work/1_backlog"); fs::create_dir_all(&backlog).unwrap(); fs::write(backlog.join("1_bug_crash.md"), "").unwrap(); fs::write(backlog.join("3_bug_another.md"), "").unwrap(); + // Also write to content store so next_item_number sees them. + crate::db::write_item_with_content("1_bug_crash", "1_backlog", "---\nname: Crash\n---\n"); + crate::db::write_item_with_content("3_bug_another", "1_backlog", "---\nname: Another\n---\n"); assert!(super::super::next_item_number(tmp.path()).unwrap() >= 4); } #[test] fn next_item_number_scans_archived_too() { + crate::db::ensure_content_store(); let tmp = tempfile::tempdir().unwrap(); let backlog = tmp.path().join(".huskies/work/1_backlog"); let archived = tmp.path().join(".huskies/work/5_done"); fs::create_dir_all(&backlog).unwrap(); fs::create_dir_all(&archived).unwrap(); fs::write(archived.join("5_bug_old.md"), "").unwrap(); + // Also write to content store so next_item_number sees it. + crate::db::write_item_with_content("5_bug_old", "5_done", "---\nname: Old Bug\n---\n"); assert!(super::super::next_item_number(tmp.path()).unwrap() >= 6); } diff --git a/server/src/http/workflow/story_ops.rs b/server/src/http/workflow/story_ops.rs index c6683a6b..c7b6eb54 100644 --- a/server/src/http/workflow/story_ops.rs +++ b/server/src/http/workflow/story_ops.rs @@ -315,10 +315,13 @@ mod tests { #[test] fn create_story_writes_correct_content() { + crate::db::ensure_content_store(); let tmp = tempfile::tempdir().unwrap(); let backlog = tmp.path().join(".huskies/work/1_backlog"); fs::create_dir_all(&backlog).unwrap(); fs::write(backlog.join("36_story_existing.md"), "").unwrap(); + // Also write to content store so next_item_number sees it. + crate::db::write_item_with_content("36_story_existing", "1_backlog", "---\nname: Existing\n---\n"); let number = super::super::next_item_number(tmp.path()).unwrap(); // The number must be >= 37 (at least higher than the existing "36_story_existing.md"),