huskies: merge 917

dave
2026-05-12 16:17:09 +00:00
parent 916dc2b11d
commit ce07c4d7b7
+155 -35
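The failure mode this merge fixes is easiest to see outside the real CRDT. Below is a minimal, standalone sketch; the `Entry` and `OpId` types are hypothetical stand-ins (not the project's bft-json-crdt structures) that only model a replicated list entry with a stable op id and a tombstone flag. It contrasts the old index-driven delete, which tombstones only the last-seen entry, with the scan-based delete introduced by this commit, which tombstones every live entry sharing the story_id.

// A minimal, standalone sketch of the failure mode this merge fixes. The
// `Entry`/`OpId` types are hypothetical stand-ins, not the project's
// bft-json-crdt structures; they only model "a replicated list entry with a
// stable op id and a tombstone flag".

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct OpId(u64);

#[derive(Debug)]
struct Entry {
    id: OpId,
    story_id: &'static str,
    is_deleted: bool,
}

/// Old behaviour: the visible index maps a story_id to a single position, so
/// only one entry (the last-seen one) gets tombstoned.
fn evict_by_visible_index(entries: &mut [Entry], story_id: &str) {
    if let Some(entry) = entries
        .iter_mut()
        .filter(|e| !e.is_deleted && e.story_id == story_id)
        .last()
    {
        entry.is_deleted = true;
    }
}

/// New behaviour: scan for every non-deleted entry with the target story_id
/// and tombstone each one, so a concurrent duplicate cannot survive replay.
fn evict_by_scan(entries: &mut [Entry], story_id: &str) {
    for entry in entries
        .iter_mut()
        .filter(|e| !e.is_deleted && e.story_id == story_id)
    {
        entry.is_deleted = true;
    }
}

fn main() {
    // Two nodes concurrently inserted the same story, so the merged list
    // holds two live entries for it.
    let duplicated = || {
        vec![
            Entry { id: OpId(1), story_id: "story_917", is_deleted: false },
            Entry { id: OpId(2), story_id: "story_917", is_deleted: false },
        ]
    };

    let mut a = duplicated();
    evict_by_visible_index(&mut a, "story_917");
    let survivors_a: Vec<OpId> = a.iter().filter(|e| !e.is_deleted).map(|e| e.id).collect();

    let mut b = duplicated();
    evict_by_scan(&mut b, "story_917");
    let survivors_b: Vec<OpId> = b.iter().filter(|e| !e.is_deleted).map(|e| e.id).collect();

    // The index-based delete leaves one duplicate alive; the scan leaves none.
    println!("index-based delete survivors: {survivors_a:?}"); // [OpId(1)]
    println!("scan-based delete survivors:  {survivors_b:?}"); // []
}

In the real change the same contrast plays out between the removed `id_at(idx + 1)` translation and the full scan over `items.ops` in the diff below.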
@@ -4,10 +4,10 @@
 use std::collections::HashMap;
 use bft_json_crdt::json_crdt::*;
+use bft_json_crdt::op::{OpId, ROOT_ID};
 use super::state::{all_ops_lock, apply_and_persist, get_crdt, rebuild_index};
 use super::types::{PipelineDoc, PipelineItemCrdt, PipelineItemView};
-use bft_json_crdt::op::ROOT_ID;
 
 // ── Debug dump ───────────────────────────────────────────────────────
@@ -219,19 +219,23 @@ pub fn read_item(story_id: &str) -> Option<PipelineItemView> {
 /// without needing a full process restart.
 ///
 /// Specifically:
-/// 1. Looks up the item's CRDT `OpId` via the visible-index map.
-/// 2. Constructs a delete op via the bft-json-crdt list `delete()` primitive.
-/// 3. Signs it with the local node's keypair and applies it to the in-memory
-///    CRDT (marking the item `is_deleted = true` so subsequent
-///    `read_all_items` / `read_item` calls skip it).
-/// 4. Persists the signed delete op to `crdt_ops` via the existing
-///    `apply_and_persist` channel — so the eviction survives a restart.
-/// 5. Rebuilds the `story_id → visible_index` map (visible indices shift
-///    when an item is marked deleted).
-/// 6. Drops the in-memory content-store entry for the story so the cached
-///    body doesn't outlive the CRDT entry.
+/// 1. Scans the items list for ALL non-deleted ops whose `story_id` register
+///    matches the target. Under concurrent CRDT ops, two nodes can each
+///    insert the same `story_id` independently; this produces duplicate list
+///    entries. Using the stable `OpId` from a direct scan (not a positional
+///    index translation) ensures every duplicate is tombstoned, so none can
+///    survive CRDT replay after the deletion.
+/// 2. Constructs a delete op per matching entry via the bft-json-crdt list
+///    `delete()` primitive, signs it with the local keypair, and applies it
+///    to the in-memory CRDT (marking each entry `is_deleted = true`).
+/// 3. Persists every signed delete op to `crdt_ops` so the evictions survive
+///    a restart.
+/// 4. Rebuilds the `story_id → visible_index` map.
+/// 5. Adds the story_id to the tombstone set so future `write_item` calls
+///    cannot resurrect it.
+/// 6. Drops the in-memory content-store entry.
 ///
-/// Returns `Ok(())` if the item was found and a tombstone op was queued,
+/// Returns `Ok(())` if at least one matching entry was found and tombstoned,
 /// or an `Err` if the CRDT layer isn't initialised or the story_id is
 /// unknown to the in-memory state.
 pub fn evict_item(story_id: &str) -> Result<(), String> {
@@ -240,31 +244,42 @@ pub fn evict_item(story_id: &str) -> Result<(), String> {
         .lock()
         .map_err(|e| format!("CRDT lock poisoned: {e}"))?;
 
-    let idx = state
-        .index
-        .get(story_id)
-        .copied()
-        .ok_or_else(|| format!("Story '{story_id}' not found in in-memory CRDT"))?;
-
-    // Resolve the item's OpId before the closure (the closure will mutably
-    // borrow `state`, so we can't access `state.crdt.doc.items` from inside).
-    //
-    // `rebuild_index` uses `items.iter()` which skips the invisible ROOT
-    // sentinel, so `idx` is a 0-based visible-item position. `id_at` counts
-    // ALL non-deleted ops including the sentinel at position 0, so we must
-    // add 1 to translate from visible-item position to `id_at` position.
-    let item_id =
-        state.crdt.doc.items.id_at(idx + 1).ok_or_else(|| {
-            format!("Item index {idx} for '{story_id}' did not resolve to an OpId")
-        })?;
-
-    // Write the delete op via the existing apply_and_persist machinery.
-    // This signs the op, applies it to the in-memory CRDT (marking the item
-    // is_deleted), and sends it to the persistence task.
-    apply_and_persist(&mut state, |s| s.crdt.doc.items.delete(item_id));
-
-    // Rebuild the story_id → visible_index map; the deleted item is no
-    // longer counted by the iter that rebuild_index uses.
+    if !state.index.contains_key(story_id) {
+        return Err(format!("Story '{story_id}' not found in in-memory CRDT"));
+    }
+
+    // Collect the stable OpId of every non-deleted list entry whose story_id
+    // register matches. Two nodes can concurrently insert the same story_id
+    // (each seeing an empty local index at the time of the insert), which
+    // leaves duplicate entries in the CRDT list. The visible index (HashMap)
+    // keeps only the last-seen entry; without this full scan the earlier
+    // concurrent insert would survive the deletion and resurface after replay.
+    let item_op_ids: Vec<OpId> = state
+        .crdt
+        .doc
+        .items
+        .ops
+        .iter()
+        .filter(|op| {
+            !op.is_deleted
+                && op.content.as_ref().is_some_and(|item| {
+                    matches!(item.story_id.view(), JsonValue::String(ref s) if s == story_id)
+                })
+        })
+        .map(|op| op.id)
+        .collect();
+
+    if item_op_ids.is_empty() {
+        return Err(format!("Story '{story_id}' not found in CRDT ops list"));
+    }
+
+    // Delete every matching op so no duplicate can survive CRDT replay.
+    for item_op_id in item_op_ids {
+        apply_and_persist(&mut state, |s| s.crdt.doc.items.delete(item_op_id));
+    }
+
+    // Rebuild the story_id → visible_index map; deleted items are no longer
+    // counted by the iter that rebuild_index uses.
     state.index = rebuild_index(&state.crdt);
 
     // Record the tombstone so that any future write_item call for this
@@ -421,6 +436,7 @@ pub fn check_archived_deps_crdt(story_id: &str) -> Vec<u32> {
 #[cfg(test)]
 mod tests {
+    use super::super::ops::apply_remote_op;
     use super::super::state::init_for_test;
     use super::super::state::rebuild_index;
     use super::super::types::PipelineItemCrdt;
@@ -490,4 +506,108 @@ mod tests {
         let result = check_archived_deps_crdt("nonexistent_story_archived");
         assert!(result.is_empty());
     }
+
+    /// Regression for story 917: evict_item must tombstone ALL list entries that
+    /// share the target story_id, not just the one pointed to by the visible index.
+    ///
+    /// When two nodes concurrently insert the same story_id (each seeing an empty
+    /// local index at the time), the CRDT list ends up with two entries. The
+    /// visible index (HashMap) keeps only the last-written entry. The old code
+    /// used `id_at(idx + 1)` which targeted only that last entry; the earlier
+    /// concurrent insert survived the deletion and would reappear after CRDT
+    /// replay — the "targeted item survives" bug.
+    #[test]
+    fn evict_item_tombstones_concurrent_duplicate_entries() {
+        init_for_test();
+        let story_id = "917_story_concurrent_evict";
+
+        // Insert the story locally (simulates node 1's insert).
+        write_item(
+            story_id,
+            "1_backlog",
+            Some("Node 1 insert"),
+            None,
+            None,
+            None,
+            None,
+            None,
+            None,
+            None,
+        );
+
+        // The story is live on this node.
+        assert!(
+            read_item(story_id).is_some(),
+            "story must be visible after local insert"
+        );
+
+        // Simulate a concurrent insert of the same story_id from a different node
+        // (node 2 uses a separate keypair and its own BaseCrdt, representing a
+        // node that was partitioned and independently created the same story_id).
+        let kp2 = make_keypair();
+        let mut remote_crdt = BaseCrdt::<PipelineDoc>::new(&kp2);
+        let remote_item: JsonValue = json!({
+            "story_id": story_id,
+            "stage": "1_backlog",
+            "name": "Node 2 concurrent insert",
+            "agent": "",
+            "retry_count": 0.0,
+            "blocked": false,
+            "depends_on": "",
+            "claimed_by": "",
+            "claimed_at": 0.0,
+            "merged_at": 0.0,
+        })
+        .into();
+        let remote_insert_op = remote_crdt
+            .doc
+            .items
+            .insert(ROOT_ID, remote_item)
+            .sign(&kp2);
+        remote_crdt.apply(remote_insert_op.clone());
+
+        // Deliver node 2's concurrent insert to this node. After this there are
+        // two list entries with story_id == story_id in the CRDT ops list.
+        let accepted = apply_remote_op(remote_insert_op);
+        assert!(accepted, "remote concurrent insert must be accepted");
+
+        // Now delete the story.
+        let result = evict_item(story_id);
+        assert!(result.is_ok(), "evict_item must succeed: {result:?}");
+
+        // Both the local AND the remote concurrent entry must be gone.
+        assert!(
+            read_item(story_id).is_none(),
+            "story must be absent from read_item after evict"
+        );
+
+        // Confirm via dump_crdt_state that every entry with this story_id is deleted.
+        let dump_state = dump_crdt_state(Some(story_id));
+        let any_surviving = dump_state
+            .items
+            .iter()
+            .any(|i| i.story_id.as_deref() == Some(story_id) && !i.is_deleted);
+        assert!(
+            !any_surviving,
+            "no non-deleted CRDT entry must remain for story_id '{story_id}'"
+        );
+
+        // story_id must be in the tombstone set so write_item cannot resurrect it.
+        write_item(
+            story_id,
+            "1_backlog",
+            Some("Resurrection attempt"),
+            None,
+            None,
+            None,
+            None,
+            None,
+            None,
+            None,
+        );
+        assert!(
+            read_item(story_id).is_none(),
+            "tombstoned story must not be resurrected by write_item"
+        );
+    }
 }