From ce07c4d7b7b560dcbccc1bcb117042aa58579e80 Mon Sep 17 00:00:00 2001
From: dave
Date: Tue, 12 May 2026 16:17:09 +0000
Subject: [PATCH] huskies: merge 917

---
 server/src/crdt_state/read.rs | 190 +++++++++++++++++++++++++++-------
 1 file changed, 155 insertions(+), 35 deletions(-)

diff --git a/server/src/crdt_state/read.rs b/server/src/crdt_state/read.rs
index ab21a24a..039f93f5 100644
--- a/server/src/crdt_state/read.rs
+++ b/server/src/crdt_state/read.rs
@@ -4,10 +4,10 @@
 use std::collections::HashMap;
 
 use bft_json_crdt::json_crdt::*;
+use bft_json_crdt::op::{OpId, ROOT_ID};
 
 use super::state::{all_ops_lock, apply_and_persist, get_crdt, rebuild_index};
 use super::types::{PipelineDoc, PipelineItemCrdt, PipelineItemView};
-use bft_json_crdt::op::ROOT_ID;
 
 // ── Debug dump ───────────────────────────────────────────────────────
 
@@ -219,19 +219,23 @@ pub fn read_item(story_id: &str) -> Option<PipelineItemView> {
 /// without needing a full process restart.
 ///
 /// Specifically:
-/// 1. Looks up the item's CRDT `OpId` via the visible-index map.
-/// 2. Constructs a delete op via the bft-json-crdt list `delete()` primitive.
-/// 3. Signs it with the local node's keypair and applies it to the in-memory
-///    CRDT (marking the item `is_deleted = true` so subsequent
-///    `read_all_items` / `read_item` calls skip it).
-/// 4. Persists the signed delete op to `crdt_ops` via the existing
-///    `apply_and_persist` channel — so the eviction survives a restart.
-/// 5. Rebuilds the `story_id → visible_index` map (visible indices shift
-///    when an item is marked deleted).
-/// 6. Drops the in-memory content-store entry for the story so the cached
-///    body doesn't outlive the CRDT entry.
+/// 1. Scans the items list for ALL non-deleted ops whose `story_id` register
+///    matches the target. Under concurrent CRDT ops, two nodes can each
+///    insert the same `story_id` independently; this produces duplicate list
+///    entries. Using the stable `OpId` from a direct scan (not a positional
+///    index translation) ensures every duplicate is tombstoned, so none can
+///    survive CRDT replay after the deletion.
+/// 2. Constructs a delete op per matching entry via the bft-json-crdt list
+///    `delete()` primitive, signs it with the local keypair, and applies it
+///    to the in-memory CRDT (marking each entry `is_deleted = true`).
+/// 3. Persists every signed delete op to `crdt_ops` so the evictions survive
+///    a restart.
+/// 4. Rebuilds the `story_id → visible_index` map.
+/// 5. Adds the story_id to the tombstone set so future `write_item` calls
+///    cannot resurrect it.
+/// 6. Drops the in-memory content-store entry.
 ///
-/// Returns `Ok(())` if the item was found and a tombstone op was queued,
+/// Returns `Ok(())` if at least one matching entry was found and tombstoned,
 /// or an `Err` if the CRDT layer isn't initialised or the story_id is
 /// unknown to the in-memory state.
 pub fn evict_item(story_id: &str) -> Result<(), String> {
@@ -240,31 +244,42 @@ pub fn evict_item(story_id: &str) -> Result<(), String> {
         .lock()
         .map_err(|e| format!("CRDT lock poisoned: {e}"))?;
 
-    let idx = state
-        .index
-        .get(story_id)
-        .copied()
-        .ok_or_else(|| format!("Story '{story_id}' not found in in-memory CRDT"))?;
+    if !state.index.contains_key(story_id) {
+        return Err(format!("Story '{story_id}' not found in in-memory CRDT"));
+    }
 
-    // Resolve the item's OpId before the closure (the closure will mutably
-    // borrow `state`, so we can't access `state.crdt.doc.items` from inside).
-    //
-    // `rebuild_index` uses `items.iter()` which skips the invisible ROOT
-    // sentinel, so `idx` is a 0-based visible-item position. `id_at` counts
-    // ALL non-deleted ops including the sentinel at position 0, so we must
-    // add 1 to translate from visible-item position to `id_at` position.
-    let item_id =
-        state.crdt.doc.items.id_at(idx + 1).ok_or_else(|| {
-            format!("Item index {idx} for '{story_id}' did not resolve to an OpId")
-        })?;
+    // Collect the stable OpId of every non-deleted list entry whose story_id
+    // register matches. Two nodes can concurrently insert the same story_id
+    // (each seeing an empty local index at the time of the insert), which
+    // leaves duplicate entries in the CRDT list. The visible index (HashMap)
+    // keeps only the last-seen entry; without this full scan the earlier
+    // concurrent insert would survive the deletion and resurface after replay.
+    let item_op_ids: Vec<OpId> = state
+        .crdt
+        .doc
+        .items
+        .ops
+        .iter()
+        .filter(|op| {
+            !op.is_deleted
+                && op.content.as_ref().is_some_and(|item| {
+                    matches!(item.story_id.view(), JsonValue::String(ref s) if s == story_id)
+                })
+        })
+        .map(|op| op.id)
+        .collect();
 
-    // Write the delete op via the existing apply_and_persist machinery.
-    // This signs the op, applies it to the in-memory CRDT (marking the item
-    // is_deleted), and sends it to the persistence task.
-    apply_and_persist(&mut state, |s| s.crdt.doc.items.delete(item_id));
+    if item_op_ids.is_empty() {
+        return Err(format!("Story '{story_id}' not found in CRDT ops list"));
+    }
 
-    // Rebuild the story_id → visible_index map; the deleted item is no
-    // longer counted by the iter that rebuild_index uses.
+    // Delete every matching op so no duplicate can survive CRDT replay.
+    for item_op_id in item_op_ids {
+        apply_and_persist(&mut state, |s| s.crdt.doc.items.delete(item_op_id));
+    }
+
+    // Rebuild the story_id → visible_index map; deleted items are no longer
+    // counted by the iter that rebuild_index uses.
     state.index = rebuild_index(&state.crdt);
 
     // Record the tombstone so that any future write_item call for this
@@ -421,6 +436,7 @@ pub fn check_archived_deps_crdt(story_id: &str) -> Vec<String> {
 
 #[cfg(test)]
 mod tests {
+    use super::super::ops::apply_remote_op;
    use super::super::state::init_for_test;
    use super::super::state::rebuild_index;
    use super::super::types::PipelineItemCrdt;
@@ -490,4 +506,108 @@ mod tests {
         let result = check_archived_deps_crdt("nonexistent_story_archived");
         assert!(result.is_empty());
     }
+
+    /// Regression for story 917: evict_item must tombstone ALL list entries that
+    /// share the target story_id, not just the one pointed to by the visible index.
+    ///
+    /// When two nodes concurrently insert the same story_id (each seeing an empty
+    /// local index at the time), the CRDT list ends up with two entries. The
+    /// visible index (HashMap) keeps only the last-written entry. The old code
+    /// used `id_at(idx + 1)` which targeted only that last entry; the earlier
+    /// concurrent insert survived the deletion and would reappear after CRDT
+    /// replay — the "targeted item survives" bug.
+    #[test]
+    fn evict_item_tombstones_concurrent_duplicate_entries() {
+        init_for_test();
+        let story_id = "917_story_concurrent_evict";
+
+        // Insert the story locally (simulates node 1's insert).
+        write_item(
+            story_id,
+            "1_backlog",
+            Some("Node 1 insert"),
+            None,
+            None,
+            None,
+            None,
+            None,
+            None,
+        );
+
+        // The story is live on this node.
+        assert!(
+            read_item(story_id).is_some(),
+            "story must be visible after local insert"
+        );
+
+        // Simulate a concurrent insert of the same story_id from a different node
+        // (node 2 uses a separate keypair and its own BaseCrdt, representing a
+        // node that was partitioned and independently created the same story_id).
+        let kp2 = make_keypair();
+        let mut remote_crdt = BaseCrdt::<PipelineDoc>::new(&kp2);
+        let remote_item: JsonValue = json!({
+            "story_id": story_id,
+            "stage": "1_backlog",
+            "name": "Node 2 concurrent insert",
+            "agent": "",
+            "retry_count": 0.0,
+            "blocked": false,
+            "depends_on": "",
+            "claimed_by": "",
+            "claimed_at": 0.0,
+            "merged_at": 0.0,
+        })
+        .into();
+        let remote_insert_op = remote_crdt
+            .doc
+            .items
+            .insert(ROOT_ID, remote_item)
+            .sign(&kp2);
+        remote_crdt.apply(remote_insert_op.clone());
+
+        // Deliver node 2's concurrent insert to this node. After this there are
+        // two list entries carrying this story_id in the CRDT ops list.
+        let accepted = apply_remote_op(remote_insert_op);
+        assert!(accepted, "remote concurrent insert must be accepted");
+
+        // Now delete the story.
+        let result = evict_item(story_id);
+        assert!(result.is_ok(), "evict_item must succeed: {result:?}");
+
+        // Both the local AND the remote concurrent entry must be gone.
+        assert!(
+            read_item(story_id).is_none(),
+            "story must be absent from read_item after evict"
+        );
+
+        // Confirm via dump_crdt_state that every entry with this story_id is deleted.
+        let dump_state = dump_crdt_state(Some(story_id));
+        let any_surviving = dump_state
+            .items
+            .iter()
+            .any(|i| i.story_id.as_deref() == Some(story_id) && !i.is_deleted);
+        assert!(
+            !any_surviving,
+            "no non-deleted CRDT entry must remain for story_id '{story_id}'"
+        );
+
+        // story_id must be in the tombstone set so write_item cannot resurrect it.
+        write_item(
+            story_id,
+            "1_backlog",
+            Some("Resurrection attempt"),
+            None,
+            None,
+            None,
+            None,
+            None,
+            None,
+        );
+        assert!(
+            read_item(story_id).is_none(),
+            "tombstoned story must not be resurrected by write_item"
+        );
+    }
 }
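
A note for reviewers, with the failure mode in isolation. This is a toy model
over std types only: `Entry`, the `u64` id, and the linear scan are simplified
stand-ins for the bft-json-crdt list, its `OpId`, and the ops iteration; none
of this is the crate's API. It just shows why the fix deletes by stable id
instead of going through the visible index.

// Toy model of the story-917 bug: two partitioned nodes insert the same
// story_id, the merged list holds two live entries, and an index-based
// delete can only tombstone one of them.
#[derive(Debug)]
struct Entry {
    id: u64,          // stand-in for the stable CRDT OpId
    story_id: String, // the story_id register of the list entry
    is_deleted: bool, // tombstone flag
}

fn main() {
    let mut items = vec![
        Entry { id: 1, story_id: "s-917".into(), is_deleted: false },
        Entry { id: 2, story_id: "s-917".into(), is_deleted: false },
    ];

    // Collect the stable id of EVERY live matching entry first (the scan
    // borrows `items` immutably), then tombstone each one. Deleting only
    // the entry a story_id -> index map points at would leave id 1 live,
    // and it would resurface after replay.
    let ids: Vec<u64> = items
        .iter()
        .filter(|e| !e.is_deleted && e.story_id == "s-917")
        .map(|e| e.id)
        .collect();
    for id in ids {
        if let Some(e) = items.iter_mut().find(|e| e.id == id) {
            e.is_deleted = true;
        }
    }

    assert!(items.iter().all(|e| e.is_deleted));
    println!("all duplicates tombstoned: {items:?}");
}

The collect-then-delete split mirrors the patch itself: the scan holds an
immutable borrow of the state, while each `apply_and_persist` call needs the
mutable borrow, so the OpIds have to be materialised before the delete loop.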