From e3f5875b8e14709fb82fe9c2c74bede6da3b2962 Mon Sep 17 00:00:00 2001 From: dave Date: Thu, 14 May 2026 08:48:11 +0000 Subject: [PATCH] huskies: merge 1019 --- server/src/io/watcher/mod.rs | 2 + server/src/io/watcher/sweep.rs | 69 ++++++++++++++++++++++++++++++++- server/src/io/watcher/tests.rs | 42 ++++++++++++++++++++ server/src/startup/tick_loop.rs | 65 ++++++++++++++----------------- server/src/worktree/mod.rs | 1 - server/src/worktree/sweep.rs | 39 ++++++++----------- 6 files changed, 157 insertions(+), 61 deletions(-) diff --git a/server/src/io/watcher/mod.rs b/server/src/io/watcher/mod.rs index 04e241fa..04ad3d7b 100644 --- a/server/src/io/watcher/mod.rs +++ b/server/src/io/watcher/mod.rs @@ -20,6 +20,8 @@ mod events; mod sweep; pub use events::WatcherEvent; +pub(crate) use sweep::spawn_done_to_archived_subscriber; +#[cfg(test)] pub(crate) use sweep::sweep_done_to_archived; use crate::slog; diff --git a/server/src/io/watcher/sweep.rs b/server/src/io/watcher/sweep.rs index b8a7f0bd..151babaa 100644 --- a/server/src/io/watcher/sweep.rs +++ b/server/src/io/watcher/sweep.rs @@ -1,12 +1,75 @@ -//! Periodic sweep of completed work items from `done` to `archived`. +//! Sweep and reactive subscriber for promoting `done` items to `archived`. //! //! Items in `Stage::Done` whose `merged_at` timestamp exceeds the configured //! retention duration are promoted to `Stage::Archived` via the canonical //! pipeline state machine (story 934, stage 5). +//! +//! `sweep_done_to_archived` is the synchronous one-shot sweep (used in tests +//! and for backwards-compat call sites). `spawn_done_to_archived_subscriber` +//! is the reactive replacement for the tick-loop periodic scan. use crate::slog; +use crate::slog_warn; use std::time::Duration; +/// Spawn a subscriber that archives each `Stage::Done` story after the +/// configured retention period expires. +/// +/// Subscribes to the pipeline [`TransitionFired`][crate::pipeline_state::TransitionFired] +/// broadcast channel. On each `Stage::Done` transition, spawns a short-lived +/// task that sleeps for the remaining retention time (computed from the story's +/// `merged_at` timestamp) and then calls +/// [`apply_transition`][crate::pipeline_state::apply_transition] with +/// `PipelineEvent::Accepted` to move it to `Stage::Archived`. +/// +/// Using `merged_at` rather than a fixed sleep means the subscriber correctly +/// handles stories that have been in `Done` for hours before a server restart: +/// the computed remaining time will be small (or zero), so archival happens +/// promptly rather than waiting another full retention period. +/// +/// Replaces the periodic `sweep_done_to_archived` call from the tick loop. +pub(crate) fn spawn_done_to_archived_subscriber(done_retention: Duration) { + use crate::pipeline_state::{PipelineEvent, Stage, apply_transition, subscribe_transitions}; + + let mut rx = subscribe_transitions(); + tokio::spawn(async move { + loop { + match rx.recv().await { + Ok(fired) => { + if let Stage::Done { merged_at, .. } = fired.after { + let story_id = fired.story_id.0.clone(); + let retention = done_retention; + tokio::spawn(async move { + let age = chrono::Utc::now() + .signed_duration_since(merged_at) + .to_std() + .unwrap_or_default(); + if age < retention { + tokio::time::sleep(retention - age).await; + } + match apply_transition(&story_id, PipelineEvent::Accepted, None) { + Ok(_) => { + slog!("[watcher] sweep: promoted {story_id} → archived") + } + Err(e) => { + slog!("[watcher] sweep: transition error for {story_id}: {e}") + } + } + }); + } + } + Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { + slog_warn!( + "[done-archive-sub] Lagged, skipped {n} event(s); some Done stories \ + may not auto-archive." + ); + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => break, + } + } + }); +} + /// Sweep items in `Stage::Done` whose `merged_at` timestamp exceeds the /// retention duration to `Stage::Archived` via the typed transition table. /// @@ -14,6 +77,10 @@ use std::time::Duration; /// `Done + Accepted → Archived` transition is validated and a /// `TransitionFired` event is emitted to subscribers (worktree pruning, /// matrix notifier, etc.). +/// +/// Used in tests for direct one-shot sweeps; production code uses +/// [`spawn_done_to_archived_subscriber`] instead. +#[cfg(test)] pub(crate) fn sweep_done_to_archived(done_retention: Duration) { use crate::pipeline_state::{PipelineEvent, Stage, apply_transition, read_all_typed}; diff --git a/server/src/io/watcher/tests.rs b/server/src/io/watcher/tests.rs index 64b3d0b6..a4f4b5f8 100644 --- a/server/src/io/watcher/tests.rs +++ b/server/src/io/watcher/tests.rs @@ -259,6 +259,48 @@ fn sweep_uses_crdt_merged_at_not_utc_now() { ); } +// ── spawn_done_to_archived_subscriber (reactive) ───────────────────────── +// +// AC4: tests that the TransitionFired subscriber archives Done stories +// on transition rather than on a periodic tick. + +/// Moving a story to Done fires the subscriber; with zero retention the +/// subscriber archives it immediately. +#[tokio::test] +async fn done_to_archived_subscriber_archives_on_transition() { + crate::crdt_state::init_for_test(); + crate::db::ensure_content_store(); + + let story_id = "9886_sub_archive_reactive"; + crate::db::write_item_with_content( + story_id, + "1_backlog", + "---\nname: Reactive archive test\n---\n", + crate::db::ItemMeta::named("Reactive archive test"), + ); + + // Zero retention: archive immediately after the Done transition. + spawn_done_to_archived_subscriber(Duration::ZERO); + + // Trigger Done via Close event (valid from Backlog → Done). + crate::pipeline_state::apply_transition( + story_id, + crate::pipeline_state::PipelineEvent::Close, + None, + ) + .expect("Close transition must succeed from Backlog"); + + // Give the subscriber task time to process and archive. + tokio::time::sleep(std::time::Duration::from_millis(300)).await; + + let items = crate::pipeline_state::read_all_typed(); + let item = items.iter().find(|i| i.story_id.0 == story_id); + assert!( + item.is_some_and(|i| matches!(i.stage, crate::pipeline_state::Stage::Archived { .. })), + "story should be archived after Done transition with zero retention" + ); +} + /// Prove that an item with merged_at NEWER than done_retention is NOT swept. #[test] fn sweep_keeps_item_newer_than_retention() { diff --git a/server/src/startup/tick_loop.rs b/server/src/startup/tick_loop.rs index 55b7a714..c128d36d 100644 --- a/server/src/startup/tick_loop.rs +++ b/server/src/startup/tick_loop.rs @@ -81,6 +81,18 @@ pub(crate) fn spawn_event_bridges( // in-memory register whenever a story reaches a terminal stage. crate::agents::pool::cost_rollup_subscriber::spawn_cost_rollup_subscriber(root.clone()); + // Done→archived subscriber: archives Done stories after the configured + // retention period. Fires on each Stage::Done TransitionFired event and + // sleeps for the remaining retention time from merged_at before archiving. + // Replaces the periodic sweep_done_to_archived scan that ran on every + // sweep_interval_secs tick. + { + let done_retention = config::ProjectConfig::load(&root) + .map(|c| std::time::Duration::from_secs(c.watcher.done_retention_secs)) + .unwrap_or_else(|_| std::time::Duration::from_secs(4 * 3600)); + io::watcher::spawn_done_to_archived_subscriber(done_retention); + } + let watcher_auto_rx = watcher_tx.subscribe(); let watcher_auto_agents = Arc::clone(&agents); tokio::spawn(async move { @@ -100,28 +112,23 @@ pub(crate) fn spawn_event_bridges( /// Spawn the unified 1-second background tick loop. /// -/// Fires due timers, runs the agent watchdog every 30 ticks, promotes -/// done→archived items every `sweep_interval_secs` ticks, and removes -/// orphaned worktrees every `worktree_sweep_interval_secs` ticks (default -/// 1200, i.e. 20 minutes). +/// Handles only genuinely time-based work: +/// - **Timer tick** (every second): fires due pipeline timers by comparing the +/// current wall-clock time against each timer's `due_at` field. Cannot be +/// reactive because timers encode absolute timestamps that only become +/// actionable when the clock reaches them. +/// - **Agent watchdog** (every 30 seconds): detects orphaned `Running` agents +/// by comparing elapsed time since the last heartbeat. Cannot be reactive +/// because the absence of an event (no heartbeat) is what signals a problem; +/// a `TransitionFired` subscriber would never fire for a silently crashed agent. +/// +/// Stage-change-reactive work (done→archived archival, worktree cleanup) has +/// been moved to `TransitionFired` subscribers spawned from `spawn_event_bridges`. pub(crate) fn spawn_tick_loop( agents: Arc, timer_store: Arc, root: Option, ) { - let project_cfg = root - .as_ref() - .and_then(|r| config::ProjectConfig::load(r).ok()); - let sweep_cfg = project_cfg - .as_ref() - .map(|c| c.watcher.clone()) - .unwrap_or_default(); - let sweep_every = sweep_cfg.sweep_interval_secs.max(1); - let done_retention = std::time::Duration::from_secs(sweep_cfg.done_retention_secs); - // Capture config for the worktree sweep (read once at startup). - let worktree_sweep_config = project_cfg.unwrap_or_default(); - // Worktree orphan sweep: every 20 minutes by default. - let worktree_sweep_every: u64 = 1200; let pending_count = timer_store.list().len(); crate::slog!("[tick] Unified tick loop started; {pending_count} pending timer(s)"); @@ -132,7 +139,8 @@ pub(crate) fn spawn_tick_loop( interval.tick().await; tick_count = tick_count.wrapping_add(1); - // Timer: fire due timers every second. + // Time-based: timers encode absolute due timestamps; only a + // wall-clock comparison can determine when one is due. if let Some(ref r) = root { let result = service::timer::tick_once(&timer_store, &agents, r).await; if let Err(msg) = result { @@ -140,8 +148,9 @@ pub(crate) fn spawn_tick_loop( } } - // Watchdog: detect orphaned Running agents every 30 ticks. - // Also reap stale Running merge_jobs from previous server instances. + // Time-based: the watchdog detects silence (no heartbeat within a + // timeout window). A `TransitionFired` subscriber cannot observe the + // absence of events, so this must remain on a periodic tick. if tick_count.is_multiple_of(30) { let found = agents.run_watchdog_pass(root.as_deref()); if found > 0 { @@ -154,22 +163,6 @@ pub(crate) fn spawn_tick_loop( } agents.reap_stale_merge_jobs(); } - - // Sweep: promote done→archived every sweep_interval_secs ticks. - if tick_count.is_multiple_of(sweep_every) { - io::watcher::sweep_done_to_archived(done_retention); - } - - // Worktree orphan sweep: remove worktrees for done/archived/absent stories. - if tick_count.is_multiple_of(worktree_sweep_every) - && let Some(ref r) = root - { - let removed = - crate::worktree::sweep_orphaned_worktrees(r, &worktree_sweep_config).await; - if removed > 0 { - crate::slog!("[worktree-sweep] Removed {removed} orphaned worktree(s)."); - } - } } }); } diff --git a/server/src/worktree/mod.rs b/server/src/worktree/mod.rs index d7fa2d8c..bf09522c 100644 --- a/server/src/worktree/mod.rs +++ b/server/src/worktree/mod.rs @@ -13,7 +13,6 @@ pub use create::install_pre_commit_hook; pub(crate) use git::detect_base_branch; pub use git::migrate_slug_paths; pub use remove::remove_worktree_by_story_id; -pub use sweep::sweep_orphaned_worktrees; #[derive(Debug, Clone)] /// Details about a newly created worktree: path, branch, and base branch. diff --git a/server/src/worktree/sweep.rs b/server/src/worktree/sweep.rs index 4a4c50e6..58dd71c1 100644 --- a/server/src/worktree/sweep.rs +++ b/server/src/worktree/sweep.rs @@ -1,14 +1,20 @@ -//! Periodic orphan sweep — removes worktrees whose stories are done, archived, -//! or absent from the CRDT. +//! Worktree orphan sweep helpers. +//! +//! `worktree_should_be_swept` and `sweep_with_lookup` are test helpers used +//! to verify that the reactive worktree-cleanup subscriber (`spawn_worktree_cleanup_subscriber`) +//! correctly identifies which stages warrant worktree removal. Production +//! worktree cleanup is driven by `TransitionFired` events in +//! `agents::pool::worktree_lifecycle`, not by periodic scanning. +#[cfg(test)] +use super::{list_worktrees, remove_worktree_by_story_id}; +#[cfg(test)] use crate::config::ProjectConfig; -use crate::pipeline_state::{Stage, read_typed}; +use crate::pipeline_state::Stage; +#[cfg(test)] use std::path::Path; -use super::{list_worktrees, remove_worktree_by_story_id}; - -/// Returns `true` if a worktree for the given pipeline stage should be removed -/// by the orphan sweep. +/// Returns `true` if a worktree for the given pipeline stage should be removed. /// /// A worktree is swept when its story is `Done`, `Archived`, or not present in /// the CRDT at all (i.e. `stage` is `None`). Active stages (`Backlog`, @@ -27,26 +33,13 @@ pub fn worktree_should_be_swept(stage: Option<&Stage>) -> bool { } } -/// Remove orphaned worktrees whose stories are done, archived, or absent from -/// the CRDT. -/// -/// Walks `.huskies/worktrees/`, checks each story's stage via `lookup`, and -/// calls [`remove_worktree_by_story_id`] for any that should be swept. -/// Failures are logged individually; the sweep continues regardless. -/// -/// Returns the number of worktrees successfully removed. -pub async fn sweep_orphaned_worktrees(project_root: &Path, config: &ProjectConfig) -> usize { - sweep_with_lookup(project_root, config, |story_id| { - read_typed(story_id).ok().flatten().map(|item| item.stage) - }) - .await -} - -/// Internal sweep implementation that accepts a custom CRDT lookup function. +/// Internal sweep implementation for tests: walks worktrees and removes those +/// whose stories are in a terminal stage according to the provided `lookup`. /// /// Accepts a `lookup` closure `fn(&str) -> Option` that returns the /// stage for a given story ID, or `None` if the story is not in the CRDT. /// This indirection makes the sweep testable without a real CRDT. +#[cfg(test)] pub(crate) async fn sweep_with_lookup( project_root: &Path, config: &ProjectConfig,