diff --git a/server/src/io/watcher/sweep.rs b/server/src/io/watcher/sweep.rs index 5d5cb165..8555290c 100644 --- a/server/src/io/watcher/sweep.rs +++ b/server/src/io/watcher/sweep.rs @@ -82,11 +82,18 @@ pub(crate) fn spawn_done_to_archived_subscriber(done_retention: Duration) { /// whose retention has elapsed, even when the `TransitionFired` subscriber /// lagged and missed their Done event. Production reactive archiving uses /// [`spawn_done_to_archived_subscriber`] instead. +/// +/// Logs a summary INFO line on every call: candidates evaluated and items +/// archived, or "no items past retention" when nothing was swept. pub(crate) fn sweep_done_to_archived(done_retention: Duration) { use crate::pipeline_state::{PipelineEvent, Stage, apply_transition, read_all_typed}; + let mut candidates: usize = 0; + let mut archived: usize = 0; + for item in read_all_typed() { if let Stage::Done { merged_at, .. } = &item.stage { + candidates += 1; let age = chrono::Utc::now() .signed_duration_since(*merged_at) .to_std() @@ -94,7 +101,10 @@ pub(crate) fn sweep_done_to_archived(done_retention: Duration) { if age >= done_retention { let story_id = item.story_id.0.clone(); match apply_transition(&story_id, PipelineEvent::Accepted, None) { - Ok(_) => slog!("[watcher] sweep: promoted {story_id} → archived"), + Ok(_) => { + archived += 1; + slog!("[watcher] sweep: promoted {story_id} → archived") + } Err(e) => { slog!("[watcher] sweep: transition error for {story_id}: {e}") } @@ -102,4 +112,10 @@ pub(crate) fn sweep_done_to_archived(done_retention: Duration) { } } } + + if archived > 0 { + slog!("[watcher] sweep: {candidates} candidate(s) evaluated, {archived} archived"); + } else { + slog!("[watcher] sweep: {candidates} candidate(s) evaluated, no items past retention"); + } } diff --git a/server/src/io/watcher/tests.rs b/server/src/io/watcher/tests.rs index bbf25035..e1420235 100644 --- a/server/src/io/watcher/tests.rs +++ b/server/src/io/watcher/tests.rs @@ -301,6 +301,48 @@ async fn done_to_archived_subscriber_archives_on_transition() { ); } +/// Regression: simulates a server restart occurring between move-to-done and +/// the configured retention window expiry. +/// +/// Before the fix the archive-deadline was held only in the reactive +/// subscriber's volatile sleep task; a restart would lose that task and the +/// item would never be archived. The fix is that `sweep_done_to_archived` +/// reads `merged_at` from the CRDT (durable across restarts) and archives any +/// item whose age exceeds the retention, so the next periodic reconcile tick +/// after restart picks it up regardless of whether a sleep task existed. +#[test] +fn restart_scenario_sweep_archives_past_retention_after_sweep_tick() { + crate::crdt_state::init_for_test(); + crate::db::ensure_content_store(); + + let story_id = "9885_sweep_restart_regression"; + + // Simulate: item moved to Done 10 seconds before the restart. + // The reactive subscriber would have had a sleep task for the remaining + // retention time; that task is now gone (process restarted). + let ten_seconds_ago = (chrono::Utc::now() - chrono::Duration::seconds(10)).timestamp() as f64; + crate::crdt_state::write_item_str( + story_id, + "5_done", + Some("Restart regression test"), + None, + None, + Some(ten_seconds_ago), + ); + + // The next periodic reconcile tick after restart calls sweep_done_to_archived + // directly. With 5-second retention and merged_at 10s ago, the item must + // be archived even though no reactive subscriber sleep task exists. + sweep_done_to_archived(Duration::from_secs(5)); + + let items = crate::pipeline_state::read_all_typed(); + let item = items.iter().find(|i| i.story_id.0 == story_id); + assert!( + item.is_some_and(|i| matches!(i.stage, crate::pipeline_state::Stage::Archived { .. })), + "item past retention must be archived on the next sweep tick after a server restart" + ); +} + /// Prove that an item with merged_at NEWER than done_retention is NOT swept. #[test] fn sweep_keeps_item_newer_than_retention() {