diff --git a/crates/bft-json-crdt/src/json_crdt.rs b/crates/bft-json-crdt/src/json_crdt.rs index b49730f5..fcdb289f 100644 --- a/crates/bft-json-crdt/src/json_crdt.rs +++ b/crates/bft-json-crdt/src/json_crdt.rs @@ -293,6 +293,11 @@ impl BaseCrdt { // NOT add its signed_digest to `received`: a legitimate op that shares // the same signed_digest (e.g. the un-tampered original) would otherwise // be silently dropped as AlreadySeen. + // Only mark as received and unblock dependents when the op was actually + // applied. If we insert on error (e.g. ErrHashMismatch), a subsequent + // apply of a *legitimate* op with the same signed_digest would be + // silently dropped as AlreadySeen, preventing equivocation detection + // from working correctly. if status == OpState::Ok { self.received.insert(op_id); diff --git a/server/src/crdt_state.rs b/server/src/crdt_state.rs index 9da2c93a..8d2e97f8 100644 --- a/server/src/crdt_state.rs +++ b/server/src/crdt_state.rs @@ -99,6 +99,10 @@ pub struct PipelineItemCrdt { /// Used for timeout-based reclaim: if a node crashes, other nodes can /// reclaim the item after the timeout expires. pub claimed_at: LwwRegisterCrdt, + /// Unix timestamp (seconds) when the item was merged to master. + /// Written once when the item transitions to `5_done`. Used by the + /// sweep loop to determine when to promote to `6_archived`. + pub merged_at: LwwRegisterCrdt, } /// CRDT node that holds a single peer's presence entry. @@ -131,6 +135,9 @@ pub struct PipelineItemView { pub claimed_by: Option, /// Unix timestamp when the item was claimed. pub claimed_at: Option, + /// Unix timestamp (seconds) when the item was merged to master. + /// `None` for items that were never in `5_done` or for legacy items. + pub merged_at: Option, } /// A snapshot of a single node presence entry derived from the CRDT document. @@ -413,6 +420,7 @@ pub fn write_item( depends_on: Option<&str>, claimed_by: Option<&str>, claimed_at: Option, + merged_at: Option, ) { let Some(state_mutex) = get_crdt() else { return; @@ -468,6 +476,11 @@ pub fn write_item( s.crdt.doc.items[idx].claimed_at.set(ca) }); } + if let Some(ma) = merged_at { + apply_and_persist(&mut state, |s| { + s.crdt.doc.items[idx].merged_at.set(ma) + }); + } // Broadcast a CrdtEvent if the stage actually changed. let stage_changed = old_stage.as_deref() != Some(stage); @@ -496,6 +509,7 @@ pub fn write_item( "depends_on": depends_on.unwrap_or(""), "claimed_by": claimed_by.unwrap_or(""), "claimed_at": claimed_at.unwrap_or(0.0), + "merged_at": merged_at.unwrap_or(0.0), }) .into(); @@ -1083,6 +1097,10 @@ fn extract_item_view(item: &PipelineItemCrdt) -> Option { JsonValue::Number(n) if n > 0.0 => Some(n), _ => None, }; + let merged_at = match item.merged_at.view() { + JsonValue::Number(n) if n > 0.0 => Some(n), + _ => None, + }; Some(PipelineItemView { story_id, @@ -1094,6 +1112,7 @@ fn extract_item_view(item: &PipelineItemCrdt) -> Option { depends_on, claimed_by, claimed_at, + merged_at, }) } diff --git a/server/src/db/mod.rs b/server/src/db/mod.rs index 2401cc2e..9cc7f019 100644 --- a/server/src/db/mod.rs +++ b/server/src/db/mod.rs @@ -244,6 +244,11 @@ pub fn write_item_with_content( write_content(story_id, content); // Primary: CRDT ops. + let merged_at_ts = if stage == "5_done" { + Some(chrono::Utc::now().timestamp() as f64) + } else { + None + }; crate::crdt_state::write_item( story_id, stage, @@ -254,6 +259,7 @@ pub fn write_item_with_content( depends_on.as_deref(), None, None, + merged_at_ts, ); // Shadow: pipeline_items table (only when DB is initialised). @@ -312,6 +318,11 @@ pub fn move_item_stage( .unwrap_or((None, None, None, None, None)); // CRDT stage transition. + let merged_at_ts = if new_stage == "5_done" { + Some(chrono::Utc::now().timestamp() as f64) + } else { + None + }; crate::crdt_state::write_item( story_id, new_stage, @@ -322,6 +333,7 @@ pub fn move_item_stage( depends_on.as_deref(), None, None, + merged_at_ts, ); // Shadow table. @@ -453,6 +465,7 @@ pub async fn sync_crdt_stages_from_db(db_path: &Path) { depends_on.as_deref(), None, None, + None, // merged_at unknown for migrated items; epoch fallback sweeps them ); corrected += 1; } else { diff --git a/server/src/io/watcher.rs b/server/src/io/watcher.rs index b239aa3a..2ed3d375 100644 --- a/server/src/io/watcher.rs +++ b/server/src/io/watcher.rs @@ -349,6 +349,7 @@ pub(crate) fn sweep_done_to_archived(done_retention: Duration) { None, None, None, + None, ); slog!("[watcher] sweep: promoted {story_id} → 6_archived/"); } @@ -1125,4 +1126,78 @@ mod tests { "item should be archived with zero retention" ); } + + /// Prove that the sweep reads `merged_at` from the CRDT (not `Utc::now()`). + /// + /// This test sets `merged_at` to 10 seconds in the past and uses a 5-second + /// retention. If the sweep were still using `Utc::now()` as the start time + /// (the original bug), the elapsed time would be ~0 and the item would NOT + /// be swept. With the fix, the item is swept because 10s > 5s retention. + #[test] + fn sweep_uses_crdt_merged_at_not_utc_now() { + crate::db::ensure_content_store(); + + let ten_seconds_ago = + (chrono::Utc::now() - chrono::Duration::seconds(10)).timestamp() as f64; + + // Write item in 5_done with an explicit past merged_at timestamp. + crate::crdt_state::write_item( + "9883_story_sweep_merged_at", + "5_done", + Some("merged_at test"), + None, + None, + None, + None, + None, + None, + Some(ten_seconds_ago), + ); + + // 5-second retention: item is 10s old → should be swept. + sweep_done_to_archived(Duration::from_secs(5)); + + let items = crate::pipeline_state::read_all_typed(); + let item = items + .iter() + .find(|i| i.story_id.0 == "9883_story_sweep_merged_at"); + assert!( + item.is_some_and(|i| matches!(i.stage, crate::pipeline_state::Stage::Archived { .. })), + "item with merged_at 10s ago should be archived with 5s retention" + ); + } + + /// Prove that an item with merged_at NEWER than done_retention is NOT swept. + #[test] + fn sweep_keeps_item_newer_than_retention() { + crate::db::ensure_content_store(); + + let one_second_ago = + (chrono::Utc::now() - chrono::Duration::seconds(1)).timestamp() as f64; + + crate::crdt_state::write_item( + "9884_story_sweep_recent", + "5_done", + Some("recent merged_at test"), + None, + None, + None, + None, + None, + None, + Some(one_second_ago), + ); + + // 1-hour retention: item is only 1s old → should NOT be swept. + sweep_done_to_archived(Duration::from_secs(3600)); + + let items = crate::pipeline_state::read_all_typed(); + let item = items + .iter() + .find(|i| i.story_id.0 == "9884_story_sweep_recent"); + assert!( + item.is_some_and(|i| matches!(i.stage, crate::pipeline_state::Stage::Done { .. })), + "item with merged_at 1s ago should stay in Done with 1-hour retention" + ); + } } diff --git a/server/src/pipeline_state.rs b/server/src/pipeline_state.rs index 488550e7..31f59fc5 100644 --- a/server/src/pipeline_state.rs +++ b/server/src/pipeline_state.rs @@ -585,11 +585,19 @@ fn project_stage(view: &PipelineItemView) -> Result { }) } "5_done" => { - // Existing CRDT data doesn't carry merge metadata. For projection - // from legacy data, we use epoch/placeholder values. New items - // entering Done via the transition function will carry real data. + // Use the stored merged_at timestamp if present. Legacy items + // that pre-date this field have merged_at = None, so we fall back + // to UNIX_EPOCH, which makes them older than any retention window + // and therefore eligible for immediate sweep to 6_archived. + let merged_at = view + .merged_at + .map(|ts| { + DateTime::from_timestamp(ts as i64, 0) + .unwrap_or(DateTime::::UNIX_EPOCH) + }) + .unwrap_or(DateTime::::UNIX_EPOCH); Ok(Stage::Done { - merged_at: Utc::now(), + merged_at, merge_commit: GitSha("legacy".to_string()), }) } @@ -1147,6 +1155,7 @@ mod tests { depends_on: Some(vec![10, 20]), claimed_by: None, claimed_at: None, + merged_at: None, }; let item = PipelineItem::try_from(&view).unwrap(); assert_eq!(item.story_id, StoryId("42_story_test".to_string())); @@ -1168,6 +1177,7 @@ mod tests { depends_on: None, claimed_by: None, claimed_at: None, + merged_at: None, }; let item = PipelineItem::try_from(&view).unwrap(); assert!(matches!(item.stage, Stage::Coding)); @@ -1186,6 +1196,7 @@ mod tests { depends_on: None, claimed_by: None, claimed_at: None, + merged_at: None, }; let item = PipelineItem::try_from(&view).unwrap(); assert!(matches!(item.stage, Stage::Merge { .. })); @@ -1211,6 +1222,7 @@ mod tests { depends_on: None, claimed_by: None, claimed_at: None, + merged_at: None, }; let item = PipelineItem::try_from(&view).unwrap(); assert!(matches!( @@ -1234,6 +1246,7 @@ mod tests { depends_on: None, claimed_by: None, claimed_at: None, + merged_at: None, }; let item = PipelineItem::try_from(&view).unwrap(); assert!(matches!( @@ -1257,6 +1270,7 @@ mod tests { depends_on: None, claimed_by: None, claimed_at: None, + merged_at: None, }; let result = PipelineItem::try_from(&view); assert!(matches!(