From 1ee23e7bfe93b18cde0cf510aadfbfb6a6f8936d Mon Sep 17 00:00:00 2001 From: dave Date: Wed, 13 May 2026 22:23:46 +0000 Subject: [PATCH] huskies: merge 996 --- server/src/db/gc.rs | 342 ++++++++++++++++++++++++++++++++ server/src/db/mod.rs | 2 + server/src/startup/tick_loop.rs | 8 + 3 files changed, 352 insertions(+) create mode 100644 server/src/db/gc.rs diff --git a/server/src/db/gc.rs b/server/src/db/gc.rs new file mode 100644 index 00000000..46f6adf2 --- /dev/null +++ b/server/src/db/gc.rs @@ -0,0 +1,342 @@ +//! Content-store garbage collection: TransitionFired subscriber and startup sweep. +//! +//! When a pipeline item reaches a terminal stage (Done, Archived, Abandoned, +//! Superseded, Rejected) every `ContentKey::*` entry for that story is purged +//! from the in-memory content store. There are two purge paths: +//! +//! 1. **Subscriber** ([`spawn_content_gc_subscriber`]) — reacts to +//! [`crate::pipeline_state::TransitionFired`] events and runs for new +//! transitions in the current server session. +//! +//! 2. **Startup sweep** ([`sweep_zombie_content_on_startup`]) — cleans up +//! zombie entries left over from sessions that predate the subscriber. + +use crate::db::{ContentKey, all_content_ids, delete_content}; +use crate::pipeline_state::Stage; +use crate::slog; +use crate::slog_warn; + +/// Purge every [`ContentKey`] variant for `story_id` from the in-memory content store. +/// +/// All eight key namespaces are deleted unconditionally — deletes for absent +/// keys are no-ops. Call this when a work item reaches a terminal stage to +/// prevent long-lived zombie entries from accumulating in the process heap. 
+pub(crate) fn purge_content_keys_for_story(story_id: &str) {
+    delete_content(ContentKey::Story(story_id));
+    delete_content(ContentKey::GateOutput(story_id));
+    delete_content(ContentKey::AbortRespawnCount(story_id));
+    delete_content(ContentKey::MergeMasterSpawnCount(story_id));
+    delete_content(ContentKey::RunTestsOk(story_id));
+    delete_content(ContentKey::CommitRecoveryPending(story_id));
+    delete_content(ContentKey::MergeFixupPending(story_id));
+    delete_content(ContentKey::MergeFailureKind(story_id));
+}
+
+/// Spawn a background task that purges content-store entries when a story reaches a terminal stage.
+///
+/// Subscribes to [`crate::pipeline_state::subscribe_transitions`]. On each
+/// [`crate::pipeline_state::TransitionFired`] where `after` is `Done`,
+/// `Archived`, `Abandoned`, `Superseded`, or `Rejected`, all `ContentKey::*`
+/// entries for that story are purged. Lag events are logged as warnings —
+/// a missed event leaves zombie entries that the next startup sweep will remove.
+pub(crate) fn spawn_content_gc_subscriber() {
+    let mut rx = crate::pipeline_state::subscribe_transitions();
+    tokio::spawn(async move {
+        loop {
+            match rx.recv().await {
+                Ok(fired) => {
+                    if is_terminal_stage(&fired.after) {
+                        let story_id = &fired.story_id.0;
+                        slog!(
+                            "[content-gc] Story '{story_id}' reached terminal stage; \
+                             purging all content-store entries."
+                        );
+                        purge_content_keys_for_story(story_id);
+                    }
+                }
+                Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
+                    slog_warn!(
+                        "[content-gc] Subscriber lagged, skipped {n} event(s). \
+                         Zombie content-store entries will be cleaned by the next startup sweep."
+                    );
+                }
+                Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
+            }
+        }
+    });
+}
+
+/// One-shot startup sweep: purge content-store entries for stories that have
+/// already reached terminal stages or are absent from the CRDT.
+///
+/// Idempotent — safe to call more than once. Intended to clean up zombie
+/// entries left over from server sessions that predate the GC subscriber.
+pub(crate) fn sweep_zombie_content_on_startup() {
+    let raw_keys = all_content_ids();
+
+    // Extract unique base story IDs from raw content-store keys.
+    // Raw key formats: `"{id}"` (Story) or `"{id}:{suffix}"` (compound keys).
+    // Story IDs never contain `:`, so splitting on `:` is unambiguous.
+    let mut story_ids: Vec<String> = raw_keys
+        .iter()
+        .map(|k| {
+            k.split_once(':')
+                .map(|(base, _)| base)
+                .unwrap_or(k)
+                .to_string()
+        })
+        .collect();
+    story_ids.sort_unstable();
+    story_ids.dedup();
+
+    let mut swept = 0usize;
+    for story_id in &story_ids {
+        let should_purge = match crate::crdt_state::read_item(story_id) {
+            // Tombstoned or absent from the live CRDT index — purge.
+            None => true,
+            Some(item) => is_terminal_stage(item.stage()),
+        };
+        if should_purge {
+            purge_content_keys_for_story(story_id);
+            swept += 1;
+        }
+    }
+
+    if swept > 0 {
+        slog!(
+            "[content-gc] Startup sweep purged content-store entries for \
+             {swept} zombie story(s)."
+        );
+    }
+}
+
+/// Return `true` when `stage` is one of the five terminal pipeline stages.
+fn is_terminal_stage(stage: &Stage) -> bool {
+    matches!(
+        stage,
+        Stage::Done { .. }
+            | Stage::Archived { .. }
+            | Stage::Abandoned { .. }
+            | Stage::Superseded { .. }
+            | Stage::Rejected { .. }
+    )
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::db::{
+        ContentKey, ItemMeta, ensure_content_store, read_content, write_content,
+        write_item_with_content,
+    };
+
+    /// Write all eight ContentKey variants for a story.
+    fn seed_all_keys(story_id: &str) {
+        write_content(ContentKey::Story(story_id), "body");
+        write_content(ContentKey::GateOutput(story_id), "gate");
+        write_content(ContentKey::AbortRespawnCount(story_id), "1");
+        write_content(ContentKey::MergeMasterSpawnCount(story_id), "2");
+        write_content(ContentKey::RunTestsOk(story_id), "ok");
+        write_content(ContentKey::CommitRecoveryPending(story_id), "1");
+        write_content(ContentKey::MergeFixupPending(story_id), "1");
+        write_content(ContentKey::MergeFailureKind(story_id), r#""GatesFailed""#);
+    }
+
+    /// Assert that `read_content` returns `None` for all eight ContentKey variants of `story_id`.
+    fn assert_all_keys_absent(story_id: &str) {
+        assert!(
+            read_content(ContentKey::Story(story_id)).is_none(),
+            "Story key must be absent"
+        );
+        assert!(
+            read_content(ContentKey::GateOutput(story_id)).is_none(),
+            "GateOutput key must be absent"
+        );
+        assert!(
+            read_content(ContentKey::AbortRespawnCount(story_id)).is_none(),
+            "AbortRespawnCount key must be absent"
+        );
+        assert!(
+            read_content(ContentKey::MergeMasterSpawnCount(story_id)).is_none(),
+            "MergeMasterSpawnCount key must be absent"
+        );
+        assert!(
+            read_content(ContentKey::RunTestsOk(story_id)).is_none(),
+            "RunTestsOk key must be absent"
+        );
+        assert!(
+            read_content(ContentKey::CommitRecoveryPending(story_id)).is_none(),
+            "CommitRecoveryPending key must be absent"
+        );
+        assert!(
+            read_content(ContentKey::MergeFixupPending(story_id)).is_none(),
+            "MergeFixupPending key must be absent"
+        );
+        assert!(
+            read_content(ContentKey::MergeFailureKind(story_id)).is_none(),
+            "MergeFailureKind key must be absent"
+        );
+    }
+
+    /// AC1: purge_content_keys_for_story removes all eight ContentKey namespaces.
+    #[test]
+    fn purge_clears_all_eight_content_key_namespaces() {
+        ensure_content_store();
+        let id = "996_test_purge_all";
+        seed_all_keys(id);
+
+        // Precondition: every seeded key must be readable before the purge runs.
+        assert!(read_content(ContentKey::Story(id)).is_some());
+        assert!(read_content(ContentKey::GateOutput(id)).is_some());
+        assert!(read_content(ContentKey::AbortRespawnCount(id)).is_some());
+        assert!(read_content(ContentKey::MergeMasterSpawnCount(id)).is_some());
+        assert!(read_content(ContentKey::RunTestsOk(id)).is_some());
+        assert!(read_content(ContentKey::CommitRecoveryPending(id)).is_some());
+        assert!(read_content(ContentKey::MergeFixupPending(id)).is_some());
+        assert!(read_content(ContentKey::MergeFailureKind(id)).is_some());
+
+        purge_content_keys_for_story(id);
+
+        assert_all_keys_absent(id);
+    }
+
+    /// AC1: purge_content_keys_for_story is idempotent — calling it twice for a
+    /// story id that this test never seeded is safe.
+    #[test]
+    fn purge_is_idempotent_when_keys_already_absent() {
+        ensure_content_store();
+        let id = "996_test_purge_idempotent";
+
+        // Nothing is seeded for this id by the test; both calls must complete without panic.
+        purge_content_keys_for_story(id);
+        purge_content_keys_for_story(id);
+        assert_all_keys_absent(id);
+    }
+
+    /// AC1 + AC4: the GC subscriber reacts to an Abandoned terminal transition and
+    /// purges all content-store entries for the story.
+    #[tokio::test]
+    async fn subscriber_purges_content_on_terminal_transition() {
+        crate::crdt_state::init_for_test();
+        ensure_content_store();
+
+        let story_id = "996_test_sub_terminal";
+
+        write_item_with_content(
+            story_id,
+            "2_current",
+            "---\nname: GC Sub Test\n---\n",
+            ItemMeta::named("GC Sub Test"),
+        );
+
+        seed_all_keys(story_id);
+
+        spawn_content_gc_subscriber();
+
+        // Transition to Abandoned (a terminal stage).
+        crate::agents::lifecycle::abandon_story(story_id).expect("abandon must succeed");
+
+        // Fixed 200 ms wait so the background subscriber task can run; no explicit sync exists.
+        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
+
+        assert_all_keys_absent(story_id);
+    }
+
+    /// AC4: the subscriber does NOT purge content for stories that remain in
+    /// active (non-terminal) stages.
+    #[tokio::test]
+    async fn subscriber_does_not_purge_active_story_content() {
+        crate::crdt_state::init_for_test();
+        ensure_content_store();
+
+        let active_id = "996_test_sub_active";
+        let terminal_id = "996_test_sub_term_2";
+
+        for id in [active_id, terminal_id] {
+            write_item_with_content(
+                id,
+                "2_current",
+                "---\nname: GC Active Test\n---\n",
+                ItemMeta::named("GC Active Test"),
+            );
+            seed_all_keys(id);
+        }
+
+        spawn_content_gc_subscriber();
+
+        // Abandon only `terminal_id`; no transition is requested for `active_id`.
+        crate::agents::lifecycle::abandon_story(terminal_id).expect("abandon must succeed");
+
+        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
+
+        // Terminal story's content must be gone.
+        assert_all_keys_absent(terminal_id);
+
+        // Only the active story's ContentKey::Story entry is checked; the other seven are not.
+        assert!(
+            read_content(ContentKey::Story(active_id)).is_some(),
+            "active story content must not be purged"
+        );
+    }
+
+    /// AC2: sweep_zombie_content_on_startup purges content for a tombstoned story.
+    #[test]
+    fn startup_sweep_purges_tombstoned_story_content() {
+        crate::crdt_state::init_for_test();
+        ensure_content_store();
+
+        let story_id = "996_test_sweep_tombstone";
+        write_item_with_content(
+            story_id,
+            "1_backlog",
+            "---\nname: Sweep Tombstone Test\n---\n",
+            ItemMeta::named("Sweep Tombstone Test"),
+        );
+        seed_all_keys(story_id);
+
+        // Tombstone the item (evict_item drops only ContentKey::Story).
+        crate::crdt_state::evict_item(story_id).expect("evict must succeed");
+
+        // Re-seed all eight keys so the sweep starts from a fully populated zombie state.
+        seed_all_keys(story_id);
+
+        // The startup sweep must purge every key that was just re-seeded.
+        sweep_zombie_content_on_startup();
+
+        assert_all_keys_absent(story_id);
+    }
+
+    /// AC2: sweep_zombie_content_on_startup leaves active stories' content intact.
+    #[test]
+    fn startup_sweep_preserves_active_story_content() {
+        crate::crdt_state::init_for_test();
+        ensure_content_store();
+
+        let live_id = "996_test_sweep_live";
+        write_item_with_content(
+            live_id,
+            "2_current",
+            "---\nname: Live Story\n---\n",
+            ItemMeta::named("Live Story"),
+        );
+        write_content(ContentKey::Story(live_id), "live content");
+
+        sweep_zombie_content_on_startup();
+
+        assert_eq!(
+            read_content(ContentKey::Story(live_id)).as_deref(),
+            Some("live content"),
+            "active story content must survive the startup sweep"
+        );
+    }
+
+    /// AC2: sweep_zombie_content_on_startup is idempotent — two back-to-back calls must not panic.
+    #[test]
+    fn startup_sweep_is_idempotent() {
+        crate::crdt_state::init_for_test();
+        ensure_content_store();
+
+        sweep_zombie_content_on_startup();
+        sweep_zombie_content_on_startup();
+    }
+}
diff --git a/server/src/db/mod.rs b/server/src/db/mod.rs
index cbfe1479..d3c22ed6 100644
--- a/server/src/db/mod.rs
+++ b/server/src/db/mod.rs
@@ -15,6 +15,8 @@
 /// On startup, existing content is loaded from the database into memory so
 /// no filesystem scan is needed after migration.
 pub mod content_store;
+/// Content-store garbage collection: TransitionFired subscriber and startup sweep.
+pub mod gc;
 /// Write operations for the pipeline — content, stage transitions, and deletions.
 pub mod ops;
 /// Recovery for half-written pipeline items (bug 1001 backfill).
diff --git a/server/src/startup/tick_loop.rs b/server/src/startup/tick_loop.rs
index 96f1ae28..eec54053 100644
--- a/server/src/startup/tick_loop.rs
+++ b/server/src/startup/tick_loop.rs
@@ -65,6 +65,11 @@ pub(crate) fn spawn_event_bridges(
         root.clone(),
     );
 
+    // Content-store GC subscriber: purges all ContentKey::* entries for a
+    // story when it reaches a terminal stage, preventing zombie entries from
+    // accumulating in the process heap (story 996).
+ crate::db::gc::spawn_content_gc_subscriber(); + let watcher_auto_rx = watcher_tx.subscribe(); let watcher_auto_agents = Arc::clone(&agents); tokio::spawn(async move { @@ -200,6 +205,9 @@ pub(crate) fn spawn_startup_reconciliation( ) { if let Some(root) = startup_root { tokio::spawn(async move { + // Purge content-store entries for stories that reached terminal + // stages in a previous session (before the GC subscriber was active). + crate::db::gc::sweep_zombie_content_on_startup(); crate::slog!("[startup] Reconciling completed worktrees from previous session."); startup_agents .reconcile_on_startup(&root, &startup_reconciliation_tx)