huskies: merge 996
This commit is contained in:
@@ -0,0 +1,342 @@
|
||||
//! Content-store garbage collection: TransitionFired subscriber and startup sweep.
|
||||
//!
|
||||
//! When a pipeline item reaches a terminal stage (Done, Archived, Abandoned,
|
||||
//! Superseded, Rejected) every `ContentKey::*` entry for that story is purged
|
||||
//! from the in-memory content store. There are two purge paths:
|
||||
//!
|
||||
//! 1. **Subscriber** ([`spawn_content_gc_subscriber`]) — reacts to
|
||||
//! [`crate::pipeline_state::TransitionFired`] events and runs for new
|
||||
//! transitions in the current server session.
|
||||
//!
|
||||
//! 2. **Startup sweep** ([`sweep_zombie_content_on_startup`]) — cleans up
|
||||
//! zombie entries left over from sessions that predate the subscriber.
|
||||
|
||||
use crate::db::{ContentKey, all_content_ids, delete_content};
|
||||
use crate::pipeline_state::Stage;
|
||||
use crate::slog;
|
||||
use crate::slog_warn;
|
||||
|
||||
/// Purge every [`ContentKey`] variant for `story_id` from the in-memory content store.
|
||||
///
|
||||
/// All eight key namespaces are deleted unconditionally — deletes for absent
|
||||
/// keys are no-ops. Call this when a work item reaches a terminal stage to
|
||||
/// prevent long-lived zombie entries from accumulating in the process heap.
|
||||
pub(crate) fn purge_content_keys_for_story(story_id: &str) {
|
||||
delete_content(ContentKey::Story(story_id));
|
||||
delete_content(ContentKey::GateOutput(story_id));
|
||||
delete_content(ContentKey::AbortRespawnCount(story_id));
|
||||
delete_content(ContentKey::MergeMasterSpawnCount(story_id));
|
||||
delete_content(ContentKey::RunTestsOk(story_id));
|
||||
delete_content(ContentKey::CommitRecoveryPending(story_id));
|
||||
delete_content(ContentKey::MergeFixupPending(story_id));
|
||||
delete_content(ContentKey::MergeFailureKind(story_id));
|
||||
}
|
||||
|
||||
/// Spawn a background task that purges content-store entries when a story reaches a terminal stage.
|
||||
///
|
||||
/// Subscribes to [`crate::pipeline_state::subscribe_transitions`]. On each
|
||||
/// [`crate::pipeline_state::TransitionFired`] where `after` is `Done`,
|
||||
/// `Archived`, `Abandoned`, `Superseded`, or `Rejected`, all `ContentKey::*`
|
||||
/// entries for that story are purged. Lag events are logged as warnings —
|
||||
/// a missed event leaves zombie entries that the next startup sweep will remove.
|
||||
pub(crate) fn spawn_content_gc_subscriber() {
|
||||
let mut rx = crate::pipeline_state::subscribe_transitions();
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
match rx.recv().await {
|
||||
Ok(fired) => {
|
||||
if is_terminal_stage(&fired.after) {
|
||||
let story_id = &fired.story_id.0;
|
||||
slog!(
|
||||
"[content-gc] Story '{story_id}' reached terminal stage; \
|
||||
purging all content-store entries."
|
||||
);
|
||||
purge_content_keys_for_story(story_id);
|
||||
}
|
||||
}
|
||||
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
|
||||
slog_warn!(
|
||||
"[content-gc] Subscriber lagged, skipped {n} event(s). \
|
||||
Zombie content-store entries will be cleaned by the next startup sweep."
|
||||
);
|
||||
}
|
||||
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/// One-shot startup sweep: purge content-store entries for stories that have
|
||||
/// already reached terminal stages or are absent from the CRDT.
|
||||
///
|
||||
/// Idempotent — safe to call more than once. Intended to clean up zombie
|
||||
/// entries left over from server sessions that predate the GC subscriber.
|
||||
pub(crate) fn sweep_zombie_content_on_startup() {
|
||||
let raw_keys = all_content_ids();
|
||||
|
||||
// Extract unique base story IDs from raw content-store keys.
|
||||
// Raw key formats: `"{id}"` (Story) or `"{id}:{suffix}"` (compound keys).
|
||||
// Story IDs never contain `:`, so splitting on `:` is unambiguous.
|
||||
let mut story_ids: Vec<String> = raw_keys
|
||||
.iter()
|
||||
.map(|k| {
|
||||
k.split_once(':')
|
||||
.map(|(base, _)| base)
|
||||
.unwrap_or(k)
|
||||
.to_string()
|
||||
})
|
||||
.collect();
|
||||
story_ids.sort();
|
||||
story_ids.dedup();
|
||||
|
||||
let mut swept = 0usize;
|
||||
for story_id in &story_ids {
|
||||
let should_purge = match crate::crdt_state::read_item(story_id) {
|
||||
// Tombstoned or absent from the live CRDT index — purge.
|
||||
None => true,
|
||||
Some(item) => is_terminal_stage(item.stage()),
|
||||
};
|
||||
if should_purge {
|
||||
purge_content_keys_for_story(story_id);
|
||||
swept += 1;
|
||||
}
|
||||
}
|
||||
|
||||
if swept > 0 {
|
||||
slog!(
|
||||
"[content-gc] Startup sweep purged content-store entries for \
|
||||
{swept} zombie story(s)."
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// Return `true` when `stage` is one of the five terminal pipeline stages.
|
||||
fn is_terminal_stage(stage: &Stage) -> bool {
|
||||
matches!(
|
||||
stage,
|
||||
Stage::Done { .. }
|
||||
| Stage::Archived { .. }
|
||||
| Stage::Abandoned { .. }
|
||||
| Stage::Superseded { .. }
|
||||
| Stage::Rejected { .. }
|
||||
)
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::{
        ContentKey, ItemMeta, ensure_content_store, read_content, write_content,
        write_item_with_content,
    };

    /// Run the CRDT test initialiser, then `ensure_content_store`, in that order.
    fn init_crdt_and_store() {
        crate::crdt_state::init_for_test();
        ensure_content_store();
    }

    /// Pause so the spawned subscriber task gets a chance to run.
    async fn let_subscriber_run() {
        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
    }

    /// Populate each of the eight ContentKey namespaces for `story_id`.
    fn seed_all_keys(story_id: &str) {
        let entries = [
            (ContentKey::Story(story_id), "body"),
            (ContentKey::GateOutput(story_id), "gate"),
            (ContentKey::AbortRespawnCount(story_id), "1"),
            (ContentKey::MergeMasterSpawnCount(story_id), "2"),
            (ContentKey::RunTestsOk(story_id), "ok"),
            (ContentKey::CommitRecoveryPending(story_id), "1"),
            (ContentKey::MergeFixupPending(story_id), "1"),
            (ContentKey::MergeFailureKind(story_id), r#""GatesFailed""#),
        ];
        for (key, value) in entries {
            write_content(key, value);
        }
    }

    /// Fail unless none of the eight ContentKey namespaces holds an entry for `story_id`.
    fn assert_all_keys_absent(story_id: &str) {
        let expectations = [
            (ContentKey::Story(story_id), "Story key must be absent"),
            (
                ContentKey::GateOutput(story_id),
                "GateOutput key must be absent",
            ),
            (
                ContentKey::AbortRespawnCount(story_id),
                "AbortRespawnCount key must be absent",
            ),
            (
                ContentKey::MergeMasterSpawnCount(story_id),
                "MergeMasterSpawnCount key must be absent",
            ),
            (
                ContentKey::RunTestsOk(story_id),
                "RunTestsOk key must be absent",
            ),
            (
                ContentKey::CommitRecoveryPending(story_id),
                "CommitRecoveryPending key must be absent",
            ),
            (
                ContentKey::MergeFixupPending(story_id),
                "MergeFixupPending key must be absent",
            ),
            (
                ContentKey::MergeFailureKind(story_id),
                "MergeFailureKind key must be absent",
            ),
        ];
        for (key, complaint) in expectations {
            assert!(read_content(key).is_none(), "{complaint}");
        }
    }

    /// AC1: a single purge wipes all eight ContentKey namespaces of a story.
    #[test]
    fn purge_clears_all_eight_content_key_namespaces() {
        ensure_content_store();
        let id = "996_test_purge_all";
        seed_all_keys(id);

        // Sanity check: the seed really populated every namespace.
        assert!(read_content(ContentKey::Story(id)).is_some());
        assert!(read_content(ContentKey::GateOutput(id)).is_some());
        assert!(read_content(ContentKey::AbortRespawnCount(id)).is_some());
        assert!(read_content(ContentKey::MergeMasterSpawnCount(id)).is_some());
        assert!(read_content(ContentKey::RunTestsOk(id)).is_some());
        assert!(read_content(ContentKey::CommitRecoveryPending(id)).is_some());
        assert!(read_content(ContentKey::MergeFixupPending(id)).is_some());
        assert!(read_content(ContentKey::MergeFailureKind(id)).is_some());

        purge_content_keys_for_story(id);

        assert_all_keys_absent(id);
    }

    /// AC1: purging a story that has no entries at all — twice in a row — is
    /// harmless.
    #[test]
    fn purge_is_idempotent_when_keys_already_absent() {
        ensure_content_store();
        let id = "996_test_purge_idempotent";

        // Neither call may panic.
        for _ in 0..2 {
            purge_content_keys_for_story(id);
        }

        assert_all_keys_absent(id);
    }

    /// AC1 + AC4: when a story transitions to Abandoned (terminal), the GC
    /// subscriber removes every content-store entry of that story.
    #[tokio::test]
    async fn subscriber_purges_content_on_terminal_transition() {
        init_crdt_and_store();

        let story_id = "996_test_sub_terminal";
        let front_matter = "---\nname: GC Sub Test\n---\n";
        write_item_with_content(
            story_id,
            "2_current",
            front_matter,
            ItemMeta::named("GC Sub Test"),
        );
        seed_all_keys(story_id);

        spawn_content_gc_subscriber();

        // Abandoned is one of the terminal stages.
        crate::agents::lifecycle::abandon_story(story_id).expect("abandon must succeed");

        let_subscriber_run().await;

        assert_all_keys_absent(story_id);
    }

    /// AC4: content of a story that stays in an active (non-terminal) stage is
    /// left alone by the subscriber.
    #[tokio::test]
    async fn subscriber_does_not_purge_active_story_content() {
        init_crdt_and_store();

        let active_id = "996_test_sub_active";
        let terminal_id = "996_test_sub_term_2";

        for id in [active_id, terminal_id] {
            let front_matter = "---\nname: GC Active Test\n---\n";
            write_item_with_content(
                id,
                "2_current",
                front_matter,
                ItemMeta::named("GC Active Test"),
            );
            seed_all_keys(id);
        }

        spawn_content_gc_subscriber();

        // Exactly one of the two stories is moved to a terminal stage.
        crate::agents::lifecycle::abandon_story(terminal_id).expect("abandon must succeed");

        let_subscriber_run().await;

        // The abandoned story must have lost all of its entries ...
        assert_all_keys_absent(terminal_id);

        // ... while the active story still has its main content key.
        let surviving = read_content(ContentKey::Story(active_id));
        assert!(
            surviving.is_some(),
            "active story content must not be purged"
        );
    }

    /// AC2: the startup sweep removes content that belongs to a tombstoned story.
    #[test]
    fn startup_sweep_purges_tombstoned_story_content() {
        init_crdt_and_store();

        let story_id = "996_test_sweep_tombstone";
        let front_matter = "---\nname: Sweep Tombstone Test\n---\n";
        write_item_with_content(
            story_id,
            "1_backlog",
            front_matter,
            ItemMeta::named("Sweep Tombstone Test"),
        );
        seed_all_keys(story_id);

        // Tombstone the item (evict_item drops only ContentKey::Story).
        crate::crdt_state::evict_item(story_id).expect("evict must succeed");

        // Seed again to reproduce the zombie state evict_item leaves behind.
        seed_all_keys(story_id);

        // The sweep is expected to clear whatever zombie keys remain.
        sweep_zombie_content_on_startup();

        assert_all_keys_absent(story_id);
    }

    /// AC2: the startup sweep does not touch content of an active story.
    #[test]
    fn startup_sweep_preserves_active_story_content() {
        init_crdt_and_store();

        let live_id = "996_test_sweep_live";
        let front_matter = "---\nname: Live Story\n---\n";
        write_item_with_content(
            live_id,
            "2_current",
            front_matter,
            ItemMeta::named("Live Story"),
        );
        write_content(ContentKey::Story(live_id), "live content");

        sweep_zombie_content_on_startup();

        let after_sweep = read_content(ContentKey::Story(live_id));
        assert_eq!(
            after_sweep.as_deref(),
            Some("live content"),
            "active story content must survive the startup sweep"
        );
    }

    /// AC2: running the startup sweep twice in a row is safe.
    #[test]
    fn startup_sweep_is_idempotent() {
        init_crdt_and_store();

        for _ in 0..2 {
            sweep_zombie_content_on_startup();
        }
    }
}
|
||||
@@ -15,6 +15,8 @@
|
||||
/// On startup, existing content is loaded from the database into memory so
|
||||
/// no filesystem scan is needed after migration.
|
||||
pub mod content_store;
|
||||
/// Content-store garbage collection: TransitionFired subscriber and startup sweep.
|
||||
pub mod gc;
|
||||
/// Write operations for the pipeline — content, stage transitions, and deletions.
|
||||
pub mod ops;
|
||||
/// Recovery for half-written pipeline items (bug 1001 backfill).
|
||||
|
||||
@@ -65,6 +65,11 @@ pub(crate) fn spawn_event_bridges(
|
||||
root.clone(),
|
||||
);
|
||||
|
||||
// Content-store GC subscriber: purges all ContentKey::* entries for a
|
||||
// story when it reaches a terminal stage, preventing zombie entries from
|
||||
// accumulating in the process heap (story 996).
|
||||
crate::db::gc::spawn_content_gc_subscriber();
|
||||
|
||||
let watcher_auto_rx = watcher_tx.subscribe();
|
||||
let watcher_auto_agents = Arc::clone(&agents);
|
||||
tokio::spawn(async move {
|
||||
@@ -200,6 +205,9 @@ pub(crate) fn spawn_startup_reconciliation(
|
||||
) {
|
||||
if let Some(root) = startup_root {
|
||||
tokio::spawn(async move {
|
||||
// Purge content-store entries for stories that reached terminal
|
||||
// stages in a previous session (before the GC subscriber was active).
|
||||
crate::db::gc::sweep_zombie_content_on_startup();
|
||||
crate::slog!("[startup] Reconciling completed worktrees from previous session.");
|
||||
startup_agents
|
||||
.reconcile_on_startup(&root, &startup_reconciliation_tx)
|
||||
|
||||
Reference in New Issue
Block a user