huskies: merge 538_bug_done_archived_sweep_never_fires_because_stage_done_projection_uses_utc_now_instead_of_real_merged_at_timestamp

This commit is contained in:
dave
2026-04-11 13:25:51 +00:00
parent 5d193bb568
commit 4ab723f40b
5 changed files with 130 additions and 4 deletions
+19
View File
@@ -99,6 +99,10 @@ pub struct PipelineItemCrdt {
/// Used for timeout-based reclaim: if a node crashes, other nodes can
/// reclaim the item after the timeout expires.
pub claimed_at: LwwRegisterCrdt<f64>,
/// Unix timestamp (seconds) when the item was merged to master.
/// Written once when the item transitions to `5_done`. Used by the
/// sweep loop to determine when to promote to `6_archived`.
pub merged_at: LwwRegisterCrdt<f64>,
}
/// CRDT node that holds a single peer's presence entry.
@@ -131,6 +135,9 @@ pub struct PipelineItemView {
pub claimed_by: Option<String>,
/// Unix timestamp when the item was claimed.
pub claimed_at: Option<f64>,
/// Unix timestamp (seconds) when the item was merged to master.
/// `None` for items that were never in `5_done` or for legacy items.
pub merged_at: Option<f64>,
}
/// A snapshot of a single node presence entry derived from the CRDT document.
@@ -413,6 +420,7 @@ pub fn write_item(
depends_on: Option<&str>,
claimed_by: Option<&str>,
claimed_at: Option<f64>,
merged_at: Option<f64>,
) {
let Some(state_mutex) = get_crdt() else {
return;
@@ -468,6 +476,11 @@ pub fn write_item(
s.crdt.doc.items[idx].claimed_at.set(ca)
});
}
if let Some(ma) = merged_at {
apply_and_persist(&mut state, |s| {
s.crdt.doc.items[idx].merged_at.set(ma)
});
}
// Broadcast a CrdtEvent if the stage actually changed.
let stage_changed = old_stage.as_deref() != Some(stage);
@@ -496,6 +509,7 @@ pub fn write_item(
"depends_on": depends_on.unwrap_or(""),
"claimed_by": claimed_by.unwrap_or(""),
"claimed_at": claimed_at.unwrap_or(0.0),
"merged_at": merged_at.unwrap_or(0.0),
})
.into();
@@ -1083,6 +1097,10 @@ fn extract_item_view(item: &PipelineItemCrdt) -> Option<PipelineItemView> {
JsonValue::Number(n) if n > 0.0 => Some(n),
_ => None,
};
let merged_at = match item.merged_at.view() {
JsonValue::Number(n) if n > 0.0 => Some(n),
_ => None,
};
Some(PipelineItemView {
story_id,
@@ -1094,6 +1112,7 @@ fn extract_item_view(item: &PipelineItemCrdt) -> Option<PipelineItemView> {
depends_on,
claimed_by,
claimed_at,
merged_at,
})
}
+13
View File
@@ -244,6 +244,11 @@ pub fn write_item_with_content(
write_content(story_id, content);
// Primary: CRDT ops.
let merged_at_ts = if stage == "5_done" {
Some(chrono::Utc::now().timestamp() as f64)
} else {
None
};
crate::crdt_state::write_item(
story_id,
stage,
@@ -254,6 +259,7 @@ pub fn write_item_with_content(
depends_on.as_deref(),
None,
None,
merged_at_ts,
);
// Shadow: pipeline_items table (only when DB is initialised).
@@ -312,6 +318,11 @@ pub fn move_item_stage(
.unwrap_or((None, None, None, None, None));
// CRDT stage transition.
let merged_at_ts = if new_stage == "5_done" {
Some(chrono::Utc::now().timestamp() as f64)
} else {
None
};
crate::crdt_state::write_item(
story_id,
new_stage,
@@ -322,6 +333,7 @@ pub fn move_item_stage(
depends_on.as_deref(),
None,
None,
merged_at_ts,
);
// Shadow table.
@@ -453,6 +465,7 @@ pub async fn sync_crdt_stages_from_db(db_path: &Path) {
depends_on.as_deref(),
None,
None,
None, // merged_at unknown for migrated items; epoch fallback sweeps them
);
corrected += 1;
} else {
+75
View File
@@ -349,6 +349,7 @@ pub(crate) fn sweep_done_to_archived(done_retention: Duration) {
None,
None,
None,
None,
);
slog!("[watcher] sweep: promoted {story_id} → 6_archived/");
}
@@ -1125,4 +1126,78 @@ mod tests {
"item should be archived with zero retention"
);
}
/// Prove that the sweep reads `merged_at` from the CRDT (not `Utc::now()`).
///
/// This test sets `merged_at` to 10 seconds in the past and uses a 5-second
/// retention. If the sweep were still using `Utc::now()` as the start time
/// (the original bug), the elapsed time would be ~0 and the item would NOT
/// be swept. With the fix, the item is swept because 10s > 5s retention.
#[test]
fn sweep_uses_crdt_merged_at_not_utc_now() {
crate::db::ensure_content_store();
let ten_seconds_ago =
(chrono::Utc::now() - chrono::Duration::seconds(10)).timestamp() as f64;
// Write item in 5_done with an explicit past merged_at timestamp.
crate::crdt_state::write_item(
"9883_story_sweep_merged_at",
"5_done",
Some("merged_at test"),
None,
None,
None,
None,
None,
None,
Some(ten_seconds_ago),
);
// 5-second retention: item is 10s old → should be swept.
sweep_done_to_archived(Duration::from_secs(5));
let items = crate::pipeline_state::read_all_typed();
let item = items
.iter()
.find(|i| i.story_id.0 == "9883_story_sweep_merged_at");
assert!(
item.is_some_and(|i| matches!(i.stage, crate::pipeline_state::Stage::Archived { .. })),
"item with merged_at 10s ago should be archived with 5s retention"
);
}
/// Prove that an item with merged_at NEWER than done_retention is NOT swept.
#[test]
fn sweep_keeps_item_newer_than_retention() {
crate::db::ensure_content_store();
let one_second_ago =
(chrono::Utc::now() - chrono::Duration::seconds(1)).timestamp() as f64;
crate::crdt_state::write_item(
"9884_story_sweep_recent",
"5_done",
Some("recent merged_at test"),
None,
None,
None,
None,
None,
None,
Some(one_second_ago),
);
// 1-hour retention: item is only 1s old → should NOT be swept.
sweep_done_to_archived(Duration::from_secs(3600));
let items = crate::pipeline_state::read_all_typed();
let item = items
.iter()
.find(|i| i.story_id.0 == "9884_story_sweep_recent");
assert!(
item.is_some_and(|i| matches!(i.stage, crate::pipeline_state::Stage::Done { .. })),
"item with merged_at 1s ago should stay in Done with 1-hour retention"
);
}
}
+18 -4
View File
@@ -585,11 +585,19 @@ fn project_stage(view: &PipelineItemView) -> Result<Stage, ProjectionError> {
})
}
"5_done" => {
// Existing CRDT data doesn't carry merge metadata. For projection
// from legacy data, we use epoch/placeholder values. New items
// entering Done via the transition function will carry real data.
// Use the stored merged_at timestamp if present. Legacy items
// that pre-date this field have merged_at = None, so we fall back
// to UNIX_EPOCH, which makes them older than any retention window
// and therefore eligible for immediate sweep to 6_archived.
let merged_at = view
.merged_at
.map(|ts| {
DateTime::from_timestamp(ts as i64, 0)
.unwrap_or(DateTime::<Utc>::UNIX_EPOCH)
})
.unwrap_or(DateTime::<Utc>::UNIX_EPOCH);
Ok(Stage::Done {
merged_at: Utc::now(),
merged_at,
merge_commit: GitSha("legacy".to_string()),
})
}
@@ -1147,6 +1155,7 @@ mod tests {
depends_on: Some(vec![10, 20]),
claimed_by: None,
claimed_at: None,
merged_at: None,
};
let item = PipelineItem::try_from(&view).unwrap();
assert_eq!(item.story_id, StoryId("42_story_test".to_string()));
@@ -1168,6 +1177,7 @@ mod tests {
depends_on: None,
claimed_by: None,
claimed_at: None,
merged_at: None,
};
let item = PipelineItem::try_from(&view).unwrap();
assert!(matches!(item.stage, Stage::Coding));
@@ -1186,6 +1196,7 @@ mod tests {
depends_on: None,
claimed_by: None,
claimed_at: None,
merged_at: None,
};
let item = PipelineItem::try_from(&view).unwrap();
assert!(matches!(item.stage, Stage::Merge { .. }));
@@ -1211,6 +1222,7 @@ mod tests {
depends_on: None,
claimed_by: None,
claimed_at: None,
merged_at: None,
};
let item = PipelineItem::try_from(&view).unwrap();
assert!(matches!(
@@ -1234,6 +1246,7 @@ mod tests {
depends_on: None,
claimed_by: None,
claimed_at: None,
merged_at: None,
};
let item = PipelineItem::try_from(&view).unwrap();
assert!(matches!(
@@ -1257,6 +1270,7 @@ mod tests {
depends_on: None,
claimed_by: None,
claimed_at: None,
merged_at: None,
};
let result = PipelineItem::try_from(&view);
assert!(matches!(