huskies: merge 1070
This commit is contained in:
@@ -82,11 +82,18 @@ pub(crate) fn spawn_done_to_archived_subscriber(done_retention: Duration) {
|
|||||||
/// whose retention has elapsed, even when the `TransitionFired` subscriber
|
/// whose retention has elapsed, even when the `TransitionFired` subscriber
|
||||||
/// lagged and missed their Done event. Production reactive archiving uses
|
/// lagged and missed their Done event. Production reactive archiving uses
|
||||||
/// [`spawn_done_to_archived_subscriber`] instead.
|
/// [`spawn_done_to_archived_subscriber`] instead.
|
||||||
|
///
|
||||||
|
/// Logs a summary INFO line on every call: candidates evaluated and items
|
||||||
|
/// archived, or "no items past retention" when nothing was swept.
|
||||||
pub(crate) fn sweep_done_to_archived(done_retention: Duration) {
|
pub(crate) fn sweep_done_to_archived(done_retention: Duration) {
|
||||||
use crate::pipeline_state::{PipelineEvent, Stage, apply_transition, read_all_typed};
|
use crate::pipeline_state::{PipelineEvent, Stage, apply_transition, read_all_typed};
|
||||||
|
|
||||||
|
let mut candidates: usize = 0;
|
||||||
|
let mut archived: usize = 0;
|
||||||
|
|
||||||
for item in read_all_typed() {
|
for item in read_all_typed() {
|
||||||
if let Stage::Done { merged_at, .. } = &item.stage {
|
if let Stage::Done { merged_at, .. } = &item.stage {
|
||||||
|
candidates += 1;
|
||||||
let age = chrono::Utc::now()
|
let age = chrono::Utc::now()
|
||||||
.signed_duration_since(*merged_at)
|
.signed_duration_since(*merged_at)
|
||||||
.to_std()
|
.to_std()
|
||||||
@@ -94,7 +101,10 @@ pub(crate) fn sweep_done_to_archived(done_retention: Duration) {
|
|||||||
if age >= done_retention {
|
if age >= done_retention {
|
||||||
let story_id = item.story_id.0.clone();
|
let story_id = item.story_id.0.clone();
|
||||||
match apply_transition(&story_id, PipelineEvent::Accepted, None) {
|
match apply_transition(&story_id, PipelineEvent::Accepted, None) {
|
||||||
Ok(_) => slog!("[watcher] sweep: promoted {story_id} → archived"),
|
Ok(_) => {
|
||||||
|
archived += 1;
|
||||||
|
slog!("[watcher] sweep: promoted {story_id} → archived")
|
||||||
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
slog!("[watcher] sweep: transition error for {story_id}: {e}")
|
slog!("[watcher] sweep: transition error for {story_id}: {e}")
|
||||||
}
|
}
|
||||||
@@ -102,4 +112,10 @@ pub(crate) fn sweep_done_to_archived(done_retention: Duration) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if archived > 0 {
|
||||||
|
slog!("[watcher] sweep: {candidates} candidate(s) evaluated, {archived} archived");
|
||||||
|
} else {
|
||||||
|
slog!("[watcher] sweep: {candidates} candidate(s) evaluated, no items past retention");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -301,6 +301,48 @@ async fn done_to_archived_subscriber_archives_on_transition() {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Regression: simulates a server restart occurring between move-to-done and
|
||||||
|
/// the configured retention window expiry.
|
||||||
|
///
|
||||||
|
/// Before the fix the archive-deadline was held only in the reactive
|
||||||
|
/// subscriber's volatile sleep task; a restart would lose that task and the
|
||||||
|
/// item would never be archived. The fix is that `sweep_done_to_archived`
|
||||||
|
/// reads `merged_at` from the CRDT (durable across restarts) and archives any
|
||||||
|
/// item whose age exceeds the retention, so the next periodic reconcile tick
|
||||||
|
/// after restart picks it up regardless of whether a sleep task existed.
|
||||||
|
#[test]
|
||||||
|
fn restart_scenario_sweep_archives_past_retention_after_sweep_tick() {
|
||||||
|
crate::crdt_state::init_for_test();
|
||||||
|
crate::db::ensure_content_store();
|
||||||
|
|
||||||
|
let story_id = "9885_sweep_restart_regression";
|
||||||
|
|
||||||
|
// Simulate: item moved to Done 10 seconds before the restart.
|
||||||
|
// The reactive subscriber would have had a sleep task for the remaining
|
||||||
|
// retention time; that task is now gone (process restarted).
|
||||||
|
let ten_seconds_ago = (chrono::Utc::now() - chrono::Duration::seconds(10)).timestamp() as f64;
|
||||||
|
crate::crdt_state::write_item_str(
|
||||||
|
story_id,
|
||||||
|
"5_done",
|
||||||
|
Some("Restart regression test"),
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
Some(ten_seconds_ago),
|
||||||
|
);
|
||||||
|
|
||||||
|
// The next periodic reconcile tick after restart calls sweep_done_to_archived
|
||||||
|
// directly. With 5-second retention and merged_at 10s ago, the item must
|
||||||
|
// be archived even though no reactive subscriber sleep task exists.
|
||||||
|
sweep_done_to_archived(Duration::from_secs(5));
|
||||||
|
|
||||||
|
let items = crate::pipeline_state::read_all_typed();
|
||||||
|
let item = items.iter().find(|i| i.story_id.0 == story_id);
|
||||||
|
assert!(
|
||||||
|
item.is_some_and(|i| matches!(i.stage, crate::pipeline_state::Stage::Archived { .. })),
|
||||||
|
"item past retention must be archived on the next sweep tick after a server restart"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
/// Prove that an item with merged_at NEWER than done_retention is NOT swept.
|
/// Prove that an item with merged_at NEWER than done_retention is NOT swept.
|
||||||
#[test]
|
#[test]
|
||||||
fn sweep_keeps_item_newer_than_retention() {
|
fn sweep_keeps_item_newer_than_retention() {
|
||||||
|
|||||||
Reference in New Issue
Block a user