From f8a295eaec794fc897bbdfde9b98ea52775d042d Mon Sep 17 00:00:00 2001 From: dave Date: Fri, 1 May 2026 14:56:13 +0000 Subject: [PATCH] huskies: merge 889 --- server/src/crdt_state/ops.rs | 95 ++++++++++++++++++++++++++++ server/src/crdt_state/read.rs | 4 ++ server/src/crdt_state/state/init.rs | 20 +++++- server/src/crdt_state/state/mod.rs | 9 ++- server/src/crdt_state/state/tests.rs | 2 + server/src/crdt_state/write/item.rs | 7 ++ server/src/crdt_state/write/tests.rs | 81 ++++++++++++++++++++++++ 7 files changed, 215 insertions(+), 3 deletions(-) diff --git a/server/src/crdt_state/ops.rs b/server/src/crdt_state/ops.rs index 3b564cef..6ec96f08 100644 --- a/server/src/crdt_state/ops.rs +++ b/server/src/crdt_state/ops.rs @@ -138,6 +138,30 @@ pub fn apply_remote_op(op: SignedOp) -> bool { state.test_job_index = rebuild_test_job_index(&state.crdt); state.agent_throttle_index = rebuild_agent_throttle_index(&state.crdt); + // Propagate tombstones from remotely-received delete ops. + // + // The tombstone set is normally populated only by the local evict_item + // call. Without this step, any node that receives a delete op from a + // peer would leave the story absent from state.index but NOT in + // state.tombstones — so a concurrent write_item call on this node could + // re-insert the story, undoing the remote delete. + if op.inner.is_deleted { + let new_tombstones: Vec<String> = state + .crdt + .doc + .items + .ops + .iter() + .filter(|o| o.is_deleted) + .filter_map(|o| o.content.as_ref()) + .filter_map(|item| match item.story_id.view() { + JsonValue::String(s) if !s.is_empty() => Some(s), + _ => None, + }) + .collect(); + state.tombstones.extend(new_tombstones); + } + // Detect and broadcast stage transitions. 
for (sid, &idx) in &state.index { let new_stage = match state.crdt.doc.items[idx].stage.view() { @@ -465,4 +489,75 @@ mod tests { assert_eq!(deserialized["aabbcc"], 42); assert_eq!(deserialized["ddeeff"], 7); } + + /// Regression for story 889 AC4: applying a remote delete op must update the + /// local tombstone set so that a subsequent write_item call cannot resurrect + /// the deleted story. Before the fix, apply_remote_op rebuilt indices but + /// left state.tombstones empty, allowing write_item to re-insert the item. + #[test] + fn remote_delete_op_prevents_resurrection() { + use super::super::read::read_item; + use super::super::types::PipelineDoc; + + let story_id = "889_story_remote_del_resurrect"; + + // Build insert + delete ops on a standalone CRDT (the "remote" node). + let kp = make_keypair(); + let mut remote = BaseCrdt::<PipelineDoc>::new(&kp); + + let item: JsonValue = json!({ + "story_id": story_id, + "stage": "1_backlog", + "name": "Remote Delete Resurrection Test", + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, + "merged_at": 0.0, + }) + .into(); + + let signed_insert = remote.doc.items.insert(ROOT_ID, item).sign(&kp); + remote.apply(signed_insert.clone()); + + let insert_op_id = signed_insert.id(); + let signed_delete = remote.doc.items.delete(insert_op_id).sign(&kp); + + // Feed the insert op to the local test singleton so the story exists here. + init_for_test(); + let accepted = apply_remote_op(signed_insert); + assert!(accepted, "remote insert must be accepted"); + assert!( + read_item(story_id).is_some(), + "story must be active after remote insert" + ); + + // Apply the remote delete op. + let accepted = apply_remote_op(signed_delete); + assert!(accepted, "remote delete op must be accepted"); + assert!( + read_item(story_id).is_none(), + "story must be gone after remote delete op" + ); + + // Attempt resurrection via write_item — must be rejected by tombstone check. 
+ write_item( + story_id, + "1_backlog", + Some("Resurrected"), + None, + None, + None, + None, + None, + None, + None, + ); + assert!( + read_item(story_id).is_none(), + "tombstoned story must not be resurrected by write_item after remote delete" + ); + } } diff --git a/server/src/crdt_state/read.rs b/server/src/crdt_state/read.rs index 99f51ab0..f1e5cc33 100644 --- a/server/src/crdt_state/read.rs +++ b/server/src/crdt_state/read.rs @@ -268,6 +268,10 @@ pub fn evict_item(story_id: &str) -> Result<(), String> { // longer counted by the iter that rebuild_index uses. state.index = rebuild_index(&state.crdt); + // Record the tombstone so that any future write_item call for this + // story_id is rejected even if the index no longer contains it. + state.tombstones.insert(story_id.to_string()); + // Drop the content-store entry so the cached body doesn't outlive the // CRDT entry. (Bug 521 follow-up: when CONTENT_STORE becomes a true // lazy cache, this explicit eviction can go away.) diff --git a/server/src/crdt_state/state/init.rs b/server/src/crdt_state/state/init.rs index 7bb3a3f2..87430306 100644 --- a/server/src/crdt_state/state/init.rs +++ b/server/src/crdt_state/state/init.rs @@ -5,11 +5,11 @@ //! persistence task. It is safe to call only once; subsequent calls are //! no-ops (guarded by [`super::CRDT_STATE`]). 
-use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::path::Path; use std::sync::Mutex; -use bft_json_crdt::json_crdt::{BaseCrdt, SignedOp}; +use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, JsonValue, SignedOp}; use bft_json_crdt::keypair::make_keypair; use fastcrypto::ed25519::Ed25519KeyPair; use fastcrypto::traits::ToFromBytes; @@ -71,6 +71,21 @@ pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> { let _ = ALL_OPS.set(Mutex::new(all_ops_vec)); let _ = VECTOR_CLOCK.set(Mutex::new(vector_clock)); + // Rebuild tombstone set: deleted list items still carry their original + // PipelineItemCrdt content, so we can extract the story_id directly. + let tombstones: HashSet<String> = crdt + .doc + .items + .ops + .iter() + .filter(|op| op.is_deleted) + .filter_map(|op| op.content.as_ref()) + .filter_map(|item| match item.story_id.view() { + JsonValue::String(s) if !s.is_empty() => Some(s), + _ => None, + }) + .collect(); + // Build the indices from the reconstructed state. let index = rebuild_index(&crdt); let node_index = rebuild_node_index(&crdt); @@ -151,6 +166,7 @@ pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> { gateway_project_index, persist_tx, lamport_floor, + tombstones, }; let _ = CRDT_STATE.set(Mutex::new(state)); diff --git a/server/src/crdt_state/state/mod.rs b/server/src/crdt_state/state/mod.rs index 822d4d04..4a58a78b 100644 --- a/server/src/crdt_state/state/mod.rs +++ b/server/src/crdt_state/state/mod.rs @@ -6,7 +6,7 @@ //! - [`init`]: async startup and keypair persistence //! - [`apply`]: write path (sign, apply, persist, broadcast) -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::{Mutex, OnceLock}; use bft_json_crdt::json_crdt::{BaseCrdt, SignedOp}; @@ -67,6 +67,12 @@ pub(super) struct CrdtState { /// Newly-created registers (post-init) must have their Lamport clock /// advanced to this floor so they don't re-emit low sequence numbers. 
pub(super) lamport_floor: u64, + /// Story IDs permanently tombstoned via `evict_item`. + /// + /// `write_item` consults this set before inserting a new CRDT entry so + /// that a concurrent or late-arriving write cannot resurrect a deleted + /// story. Rebuilt from the CRDT op log on restart. + pub(super) tombstones: HashSet<String>, } // ── Singleton and accessor ─────────────────────────────────────────── @@ -134,6 +140,7 @@ pub fn init_for_test() { gateway_project_index: HashMap::new(), persist_tx, lamport_floor: 0, + tombstones: HashSet::new(), }; let _ = lock.set(Mutex::new(state)); } diff --git a/server/src/crdt_state/state/tests.rs b/server/src/crdt_state/state/tests.rs index 574f603c..f4a056d2 100644 --- a/server/src/crdt_state/state/tests.rs +++ b/server/src/crdt_state/state/tests.rs @@ -176,6 +176,7 @@ fn persist_tx_send_failure_logs_warn_with_op_type_and_seq() { gateway_project_index: HashMap::new(), persist_tx, lamport_floor: 0, + tombstones: std::collections::HashSet::new(), }; // Drop the receiver so that the next send fails immediately. @@ -249,6 +250,7 @@ fn persist_tx_send_success_emits_no_warn() { gateway_project_index: HashMap::new(), persist_tx, lamport_floor: 0, + tombstones: std::collections::HashSet::new(), }; let item_json: JsonValue = json!({ diff --git a/server/src/crdt_state/write/item.rs b/server/src/crdt_state/write/item.rs index 12b39a33..addedde5 100644 --- a/server/src/crdt_state/write/item.rs +++ b/server/src/crdt_state/write/item.rs @@ -143,6 +143,13 @@ pub fn write_item( return; }; + // Reject any write (insert or update) for a tombstoned story_id. + // This prevents a concurrent or late-arriving write from resurrecting + // a story that was permanently deleted via evict_item. + if state.tombstones.contains(story_id) { + return; + } + if let Some(&idx) = state.index.get(story_id) { // Capture the old stage before updating so we can detect transitions. 
let old_stage = match state.crdt.doc.items[idx].stage.view() { diff --git a/server/src/crdt_state/write/tests.rs b/server/src/crdt_state/write/tests.rs index 742af414..aa8129ef 100644 --- a/server/src/crdt_state/write/tests.rs +++ b/server/src/crdt_state/write/tests.rs @@ -733,3 +733,84 @@ async fn bug_511_rowid_replay_preserves_field_update_after_list_insert() { ); } } + +// ── Story 889 regression tests ─────────────────────────────────────────────── + +/// Regression for story 889: a tombstoned story must not be resurrected by +/// concurrent write_item calls racing the delete. Spawns a tokio task that +/// hammers write_item every 10ms, tombstones the item mid-race, then verifies +/// the projection stays empty for ~500ms and remains empty after the writer +/// stops. +/// +/// The tokio current_thread runtime keeps all tasks on the same OS thread, so +/// the thread-local test CRDT is visible to the spawned task. +#[tokio::test] +async fn tombstone_survives_concurrent_writes() { + use std::sync::Arc; + use std::sync::atomic::{AtomicBool, Ordering}; + + use super::super::read::{evict_item, read_item}; + + init_for_test(); + + let story_id = "889_story_tombstone_concurrent"; + + write_item( + story_id, + "2_current", + Some("Tombstone Concurrent Test"), + None, + None, + None, + None, + None, + None, + None, + ); + assert!( + read_item(story_id).is_some(), + "item must exist before eviction" + ); + + let stop = Arc::new(AtomicBool::new(false)); + let stop_clone = stop.clone(); + + let writer = tokio::task::spawn(async move { + while !stop_clone.load(Ordering::Relaxed) { + write_item( + story_id, + "2_current", + Some("Tombstone Concurrent Test"), + None, + None, + None, + None, + None, + None, + None, + ); + tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + } + }); + + tokio::time::sleep(tokio::time::Duration::from_millis(30)).await; + + evict_item(story_id).expect("evict_item must succeed"); + + let deadline = tokio::time::Instant::now() + 
tokio::time::Duration::from_millis(500); + while tokio::time::Instant::now() < deadline { + assert!( + read_item(story_id).is_none(), + "tombstoned story must not reappear while concurrent writes are in flight" + ); + tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + } + + stop.store(true, Ordering::Relaxed); + writer.await.unwrap(); + + assert!( + read_item(story_id).is_none(), + "tombstoned story must stay gone after concurrent writer stops" + ); +}