huskies: merge 889

This commit is contained in:
dave
2026-05-01 14:56:13 +00:00
parent 61cf7684de
commit f8a295eaec
7 changed files with 215 additions and 3 deletions
+95
View File
@@ -138,6 +138,30 @@ pub fn apply_remote_op(op: SignedOp) -> bool {
state.test_job_index = rebuild_test_job_index(&state.crdt);
state.agent_throttle_index = rebuild_agent_throttle_index(&state.crdt);
// Propagate tombstones from remotely-received delete ops.
//
// The tombstone set is normally populated only by the local evict_item
// call. Without this step, any node that receives a delete op from a
// peer would leave the story absent from state.index but NOT in
// state.tombstones — so a concurrent write_item call on this node could
// re-insert the story, undoing the remote delete.
if op.inner.is_deleted {
let new_tombstones: Vec<String> = state
.crdt
.doc
.items
.ops
.iter()
.filter(|o| o.is_deleted)
.filter_map(|o| o.content.as_ref())
.filter_map(|item| match item.story_id.view() {
JsonValue::String(s) if !s.is_empty() => Some(s),
_ => None,
})
.collect();
state.tombstones.extend(new_tombstones);
}
// Detect and broadcast stage transitions.
for (sid, &idx) in &state.index {
let new_stage = match state.crdt.doc.items[idx].stage.view() {
@@ -465,4 +489,75 @@ mod tests {
assert_eq!(deserialized["aabbcc"], 42);
assert_eq!(deserialized["ddeeff"], 7);
}
/// Regression for story 889 AC4: a delete op received from a peer must land
/// in the local tombstone set, otherwise a later write_item call can bring
/// the story back. Pre-fix, apply_remote_op refreshed the indices but never
/// touched state.tombstones, so the resurrection path was open.
#[test]
fn remote_delete_op_prevents_resurrection() {
    use super::super::read::read_item;
    use super::super::types::PipelineDoc;

    let story_id = "889_story_remote_del_resurrect";

    // Stage 1: author both the insert and the delete on a standalone CRDT
    // playing the role of the remote node.
    let keypair = make_keypair();
    let mut remote_node = BaseCrdt::<PipelineDoc>::new(&keypair);
    let payload: JsonValue = json!({
        "story_id": story_id,
        "stage": "1_backlog",
        "name": "Remote Delete Resurrection Test",
        "agent": "",
        "retry_count": 0.0,
        "blocked": false,
        "depends_on": "",
        "claimed_by": "",
        "claimed_at": 0.0,
        "merged_at": 0.0,
    })
    .into();
    let insert_op = remote_node.doc.items.insert(ROOT_ID, payload).sign(&keypair);
    remote_node.apply(insert_op.clone());
    let delete_op = remote_node.doc.items.delete(insert_op.id()).sign(&keypair);

    // Stage 2: replay the insert into the local test singleton so the story
    // exists on this node.
    init_for_test();
    assert!(apply_remote_op(insert_op), "remote insert must be accepted");
    assert!(
        read_item(story_id).is_some(),
        "story must be active after remote insert"
    );

    // Stage 3: replay the delete; the story must vanish from the projection.
    assert!(apply_remote_op(delete_op), "remote delete op must be accepted");
    assert!(
        read_item(story_id).is_none(),
        "story must be gone after remote delete op"
    );

    // Stage 4: a direct write for the same story_id must bounce off the
    // tombstone check rather than re-insert the item.
    write_item(
        story_id,
        "1_backlog",
        Some("Resurrected"),
        None,
        None,
        None,
        None,
        None,
        None,
        None,
    );
    assert!(
        read_item(story_id).is_none(),
        "tombstoned story must not be resurrected by write_item after remote delete"
    );
}
}
+4
View File
@@ -268,6 +268,10 @@ pub fn evict_item(story_id: &str) -> Result<(), String> {
// longer counted by the iter that rebuild_index uses.
state.index = rebuild_index(&state.crdt);
// Record the tombstone so that any future write_item call for this
// story_id is rejected even if the index no longer contains it.
state.tombstones.insert(story_id.to_string());
// Drop the content-store entry so the cached body doesn't outlive the
// CRDT entry. (Bug 521 follow-up: when CONTENT_STORE becomes a true
// lazy cache, this explicit eviction can go away.)
+18 -2
View File
@@ -5,11 +5,11 @@
//! persistence task. It is safe to call only once; subsequent calls are
//! no-ops (guarded by [`super::CRDT_STATE`]).
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::path::Path;
use std::sync::Mutex;
use bft_json_crdt::json_crdt::{BaseCrdt, SignedOp};
use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, JsonValue, SignedOp};
use bft_json_crdt::keypair::make_keypair;
use fastcrypto::ed25519::Ed25519KeyPair;
use fastcrypto::traits::ToFromBytes;
@@ -71,6 +71,21 @@ pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> {
let _ = ALL_OPS.set(Mutex::new(all_ops_vec));
let _ = VECTOR_CLOCK.set(Mutex::new(vector_clock));
// Rebuild tombstone set: deleted list items still carry their original
// PipelineItemCrdt content, so we can extract the story_id directly.
let tombstones: HashSet<String> = crdt
.doc
.items
.ops
.iter()
.filter(|op| op.is_deleted)
.filter_map(|op| op.content.as_ref())
.filter_map(|item| match item.story_id.view() {
JsonValue::String(s) if !s.is_empty() => Some(s),
_ => None,
})
.collect();
// Build the indices from the reconstructed state.
let index = rebuild_index(&crdt);
let node_index = rebuild_node_index(&crdt);
@@ -151,6 +166,7 @@ pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> {
gateway_project_index,
persist_tx,
lamport_floor,
tombstones,
};
let _ = CRDT_STATE.set(Mutex::new(state));
+8 -1
View File
@@ -6,7 +6,7 @@
//! - [`init`]: async startup and keypair persistence
//! - [`apply`]: write path (sign, apply, persist, broadcast)
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::sync::{Mutex, OnceLock};
use bft_json_crdt::json_crdt::{BaseCrdt, SignedOp};
@@ -67,6 +67,12 @@ pub(super) struct CrdtState {
/// Newly-created registers (post-init) must have their Lamport clock
/// advanced to this floor so they don't re-emit low sequence numbers.
pub(super) lamport_floor: u64,
/// Story IDs permanently tombstoned via `evict_item`.
///
/// `write_item` consults this set before inserting a new CRDT entry so
/// that a concurrent or late-arriving write cannot resurrect a deleted
/// story. Rebuilt from the CRDT op log on restart.
pub(super) tombstones: HashSet<String>,
}
// ── Singleton and accessor ───────────────────────────────────────────
@@ -134,6 +140,7 @@ pub fn init_for_test() {
gateway_project_index: HashMap::new(),
persist_tx,
lamport_floor: 0,
tombstones: HashSet::new(),
};
let _ = lock.set(Mutex::new(state));
}
+2
View File
@@ -176,6 +176,7 @@ fn persist_tx_send_failure_logs_warn_with_op_type_and_seq() {
gateway_project_index: HashMap::new(),
persist_tx,
lamport_floor: 0,
tombstones: std::collections::HashSet::new(),
};
// Drop the receiver so that the next send fails immediately.
@@ -249,6 +250,7 @@ fn persist_tx_send_success_emits_no_warn() {
gateway_project_index: HashMap::new(),
persist_tx,
lamport_floor: 0,
tombstones: std::collections::HashSet::new(),
};
let item_json: JsonValue = json!({
+7
View File
@@ -143,6 +143,13 @@ pub fn write_item(
return;
};
// Reject any write (insert or update) for a tombstoned story_id.
// This prevents a concurrent or late-arriving write from resurrecting
// a story that was permanently deleted via evict_item.
if state.tombstones.contains(story_id) {
return;
}
if let Some(&idx) = state.index.get(story_id) {
// Capture the old stage before updating so we can detect transitions.
let old_stage = match state.crdt.doc.items[idx].stage.view() {
+81
View File
@@ -733,3 +733,84 @@ async fn bug_511_rowid_replay_preserves_field_update_after_list_insert() {
);
}
}
// ── Story 889 regression tests ───────────────────────────────────────────────

/// Regression for story 889: once a story is tombstoned via `evict_item`, a
/// writer racing the delete must not be able to bring it back. A background
/// task re-issues the same `write_item` every 10ms; the test evicts the story
/// mid-race, polls the projection for ~500ms, then checks once more after the
/// writer shuts down — the story must never reappear.
///
/// Uses tokio's current_thread runtime, which schedules every task on a
/// single OS thread, so the spawned writer sees the same thread-local test
/// CRDT as the test body.
#[tokio::test]
async fn tombstone_survives_concurrent_writes() {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};

    use super::super::read::{evict_item, read_item};

    init_for_test();
    let story_id = "889_story_tombstone_concurrent";

    // Seed the item and confirm it is visible before eviction.
    write_item(
        story_id,
        "2_current",
        Some("Tombstone Concurrent Test"),
        None,
        None,
        None,
        None,
        None,
        None,
        None,
    );
    assert!(
        read_item(story_id).is_some(),
        "item must exist before eviction"
    );

    // Background writer: repeats the identical write every 10ms until the
    // stop flag flips.
    let stop_flag = Arc::new(AtomicBool::new(false));
    let writer_stop = Arc::clone(&stop_flag);
    let writer_task = tokio::task::spawn(async move {
        while !writer_stop.load(Ordering::Relaxed) {
            write_item(
                story_id,
                "2_current",
                Some("Tombstone Concurrent Test"),
                None,
                None,
                None,
                None,
                None,
                None,
                None,
            );
            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
        }
    });

    // Give the writer a few iterations, then tombstone mid-race.
    tokio::time::sleep(tokio::time::Duration::from_millis(30)).await;
    evict_item(story_id).expect("evict_item must succeed");

    // Poll for ~500ms: the projection must stay empty the entire window.
    let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_millis(500);
    while tokio::time::Instant::now() < deadline {
        assert!(
            read_item(story_id).is_none(),
            "tombstoned story must not reappear while concurrent writes are in flight"
        );
        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
    }

    // Quiesce the writer and confirm the story is still gone.
    stop_flag.store(true, Ordering::Relaxed);
    writer_task.await.unwrap();
    assert!(
        read_item(story_id).is_none(),
        "tombstoned story must stay gone after concurrent writer stops"
    );
}