huskies: merge 1019

This commit is contained in:
dave
2026-05-14 08:48:11 +00:00
parent ebf58ef224
commit e3f5875b8e
6 changed files with 157 additions and 61 deletions
+2
View File
@@ -20,6 +20,8 @@ mod events;
mod sweep;
pub use events::WatcherEvent;
pub(crate) use sweep::spawn_done_to_archived_subscriber;
#[cfg(test)]
pub(crate) use sweep::sweep_done_to_archived;
use crate::slog;
+68 -1
View File
@@ -1,12 +1,75 @@
//! Periodic sweep of completed work items from `done` to `archived`.
//! Sweep and reactive subscriber for promoting `done` items to `archived`.
//!
//! Items in `Stage::Done` whose `merged_at` timestamp exceeds the configured
//! retention duration are promoted to `Stage::Archived` via the canonical
//! pipeline state machine (story 934, stage 5).
//!
//! `sweep_done_to_archived` is the synchronous one-shot sweep (used in tests
//! and for backwards-compat call sites). `spawn_done_to_archived_subscriber`
//! is the reactive replacement for the tick-loop periodic scan.
use crate::slog;
use crate::slog_warn;
use std::time::Duration;
/// Spawn a subscriber that archives each `Stage::Done` story after the
/// configured retention period expires.
///
/// Subscribes to the pipeline [`TransitionFired`][crate::pipeline_state::TransitionFired]
/// broadcast channel. On each `Stage::Done` transition, spawns a short-lived
/// task that sleeps for the remaining retention time (computed from the story's
/// `merged_at` timestamp) and then calls
/// [`apply_transition`][crate::pipeline_state::apply_transition] with
/// `PipelineEvent::Accepted` to move it to `Stage::Archived`.
///
/// Using `merged_at` rather than a fixed sleep means the subscriber correctly
/// handles stories that have been in `Done` for hours before a server restart:
/// the computed remaining time will be small (or zero), so archival happens
/// promptly rather than waiting another full retention period.
///
/// Replaces the periodic `sweep_done_to_archived` call from the tick loop.
pub(crate) fn spawn_done_to_archived_subscriber(done_retention: Duration) {
use crate::pipeline_state::{PipelineEvent, Stage, apply_transition, subscribe_transitions};
let mut rx = subscribe_transitions();
tokio::spawn(async move {
loop {
match rx.recv().await {
Ok(fired) => {
if let Stage::Done { merged_at, .. } = fired.after {
let story_id = fired.story_id.0.clone();
let retention = done_retention;
tokio::spawn(async move {
let age = chrono::Utc::now()
.signed_duration_since(merged_at)
.to_std()
.unwrap_or_default();
if age < retention {
tokio::time::sleep(retention - age).await;
}
match apply_transition(&story_id, PipelineEvent::Accepted, None) {
Ok(_) => {
slog!("[watcher] sweep: promoted {story_id} → archived")
}
Err(e) => {
slog!("[watcher] sweep: transition error for {story_id}: {e}")
}
}
});
}
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
slog_warn!(
"[done-archive-sub] Lagged, skipped {n} event(s); some Done stories \
may not auto-archive."
);
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
}
}
});
}
/// Sweep items in `Stage::Done` whose `merged_at` timestamp exceeds the
/// retention duration to `Stage::Archived` via the typed transition table.
///
@@ -14,6 +77,10 @@ use std::time::Duration;
/// `Done + Accepted → Archived` transition is validated and a
/// `TransitionFired` event is emitted to subscribers (worktree pruning,
/// matrix notifier, etc.).
///
/// Used in tests for direct one-shot sweeps; production code uses
/// [`spawn_done_to_archived_subscriber`] instead.
#[cfg(test)]
pub(crate) fn sweep_done_to_archived(done_retention: Duration) {
use crate::pipeline_state::{PipelineEvent, Stage, apply_transition, read_all_typed};
+42
View File
@@ -259,6 +259,48 @@ fn sweep_uses_crdt_merged_at_not_utc_now() {
);
}
// ── spawn_done_to_archived_subscriber (reactive) ─────────────────────────
//
// AC4: tests that the TransitionFired subscriber archives Done stories
// on transition rather than on a periodic tick.
/// Moving a story to Done fires the subscriber; with zero retention the
/// subscriber archives it immediately.
#[tokio::test]
async fn done_to_archived_subscriber_archives_on_transition() {
    crate::crdt_state::init_for_test();
    crate::db::ensure_content_store();
    let story_id = "9886_sub_archive_reactive";
    crate::db::write_item_with_content(
        story_id,
        "1_backlog",
        "---\nname: Reactive archive test\n---\n",
        crate::db::ItemMeta::named("Reactive archive test"),
    );
    // Zero retention: archive immediately after the Done transition.
    spawn_done_to_archived_subscriber(Duration::ZERO);
    // Trigger Done via Close event (valid from Backlog → Done).
    crate::pipeline_state::apply_transition(
        story_id,
        crate::pipeline_state::PipelineEvent::Close,
        None,
    )
    .expect("Close transition must succeed from Backlog");
    // Poll for the archived stage instead of a single fixed 300 ms sleep:
    // the subscriber runs on a separate task, so a hard-coded wait is flaky
    // on a loaded machine and slower than necessary on a fast one. Check
    // every 25 ms up to a 2 s deadline and stop as soon as it lands.
    let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(2);
    let mut archived = false;
    while tokio::time::Instant::now() < deadline {
        let items = crate::pipeline_state::read_all_typed();
        archived = items.iter().any(|i| {
            i.story_id.0 == story_id
                && matches!(i.stage, crate::pipeline_state::Stage::Archived { .. })
        });
        if archived {
            break;
        }
        tokio::time::sleep(std::time::Duration::from_millis(25)).await;
    }
    assert!(
        archived,
        "story should be archived after Done transition with zero retention"
    );
}
/// Prove that an item with merged_at NEWER than done_retention is NOT swept.
#[test]
fn sweep_keeps_item_newer_than_retention() {
+29 -36
View File
@@ -81,6 +81,18 @@ pub(crate) fn spawn_event_bridges(
// in-memory register whenever a story reaches a terminal stage.
crate::agents::pool::cost_rollup_subscriber::spawn_cost_rollup_subscriber(root.clone());
// Done→archived subscriber: archives Done stories after the configured
// retention period. Fires on each Stage::Done TransitionFired event and
// sleeps for the remaining retention time from merged_at before archiving.
// Replaces the periodic sweep_done_to_archived scan that ran on every
// sweep_interval_secs tick.
{
let done_retention = config::ProjectConfig::load(&root)
.map(|c| std::time::Duration::from_secs(c.watcher.done_retention_secs))
.unwrap_or_else(|_| std::time::Duration::from_secs(4 * 3600));
io::watcher::spawn_done_to_archived_subscriber(done_retention);
}
let watcher_auto_rx = watcher_tx.subscribe();
let watcher_auto_agents = Arc::clone(&agents);
tokio::spawn(async move {
@@ -100,28 +112,23 @@ pub(crate) fn spawn_event_bridges(
/// Spawn the unified 1-second background tick loop.
///
/// Fires due timers, runs the agent watchdog every 30 ticks, promotes
/// done→archived items every `sweep_interval_secs` ticks, and removes
/// orphaned worktrees every `worktree_sweep_interval_secs` ticks (default
/// 1200, i.e. 20 minutes).
/// Handles only genuinely time-based work:
/// - **Timer tick** (every second): fires due pipeline timers by comparing the
/// current wall-clock time against each timer's `due_at` field. Cannot be
/// reactive because timers encode absolute timestamps that only become
/// actionable when the clock reaches them.
/// - **Agent watchdog** (every 30 seconds): detects orphaned `Running` agents
/// by comparing elapsed time since the last heartbeat. Cannot be reactive
/// because the absence of an event (no heartbeat) is what signals a problem;
/// a `TransitionFired` subscriber would never fire for a silently crashed agent.
///
/// Stage-change-reactive work (done→archived archival, worktree cleanup) has
/// been moved to `TransitionFired` subscribers spawned from `spawn_event_bridges`.
pub(crate) fn spawn_tick_loop(
agents: Arc<AgentPool>,
timer_store: Arc<service::timer::TimerStore>,
root: Option<PathBuf>,
) {
let project_cfg = root
.as_ref()
.and_then(|r| config::ProjectConfig::load(r).ok());
let sweep_cfg = project_cfg
.as_ref()
.map(|c| c.watcher.clone())
.unwrap_or_default();
let sweep_every = sweep_cfg.sweep_interval_secs.max(1);
let done_retention = std::time::Duration::from_secs(sweep_cfg.done_retention_secs);
// Capture config for the worktree sweep (read once at startup).
let worktree_sweep_config = project_cfg.unwrap_or_default();
// Worktree orphan sweep: every 20 minutes by default.
let worktree_sweep_every: u64 = 1200;
let pending_count = timer_store.list().len();
crate::slog!("[tick] Unified tick loop started; {pending_count} pending timer(s)");
@@ -132,7 +139,8 @@ pub(crate) fn spawn_tick_loop(
interval.tick().await;
tick_count = tick_count.wrapping_add(1);
// Timer: fire due timers every second.
// Time-based: timers encode absolute due timestamps; only a
// wall-clock comparison can determine when one is due.
if let Some(ref r) = root {
let result = service::timer::tick_once(&timer_store, &agents, r).await;
if let Err(msg) = result {
@@ -140,8 +148,9 @@ pub(crate) fn spawn_tick_loop(
}
}
// Watchdog: detect orphaned Running agents every 30 ticks.
// Also reap stale Running merge_jobs from previous server instances.
// Time-based: the watchdog detects silence (no heartbeat within a
// timeout window). A `TransitionFired` subscriber cannot observe the
// absence of events, so this must remain on a periodic tick.
if tick_count.is_multiple_of(30) {
let found = agents.run_watchdog_pass(root.as_deref());
if found > 0 {
@@ -154,22 +163,6 @@ pub(crate) fn spawn_tick_loop(
}
agents.reap_stale_merge_jobs();
}
// Sweep: promote done→archived every sweep_interval_secs ticks.
if tick_count.is_multiple_of(sweep_every) {
io::watcher::sweep_done_to_archived(done_retention);
}
// Worktree orphan sweep: remove worktrees for done/archived/absent stories.
if tick_count.is_multiple_of(worktree_sweep_every)
&& let Some(ref r) = root
{
let removed =
crate::worktree::sweep_orphaned_worktrees(r, &worktree_sweep_config).await;
if removed > 0 {
crate::slog!("[worktree-sweep] Removed {removed} orphaned worktree(s).");
}
}
}
});
}
-1
View File
@@ -13,7 +13,6 @@ pub use create::install_pre_commit_hook;
pub(crate) use git::detect_base_branch;
pub use git::migrate_slug_paths;
pub use remove::remove_worktree_by_story_id;
pub use sweep::sweep_orphaned_worktrees;
#[derive(Debug, Clone)]
/// Details about a newly created worktree: path, branch, and base branch.
+16 -23
View File
@@ -1,14 +1,20 @@
//! Periodic orphan sweep — removes worktrees whose stories are done, archived,
//! or absent from the CRDT.
//! Worktree orphan sweep helpers.
//!
//! `worktree_should_be_swept` and `sweep_with_lookup` are test helpers used
//! to verify that the reactive worktree-cleanup subscriber (`spawn_worktree_cleanup_subscriber`)
//! correctly identifies which stages warrant worktree removal. Production
//! worktree cleanup is driven by `TransitionFired` events in
//! `agents::pool::worktree_lifecycle`, not by periodic scanning.
#[cfg(test)]
use super::{list_worktrees, remove_worktree_by_story_id};
#[cfg(test)]
use crate::config::ProjectConfig;
use crate::pipeline_state::{Stage, read_typed};
use crate::pipeline_state::Stage;
#[cfg(test)]
use std::path::Path;
use super::{list_worktrees, remove_worktree_by_story_id};
/// Returns `true` if a worktree for the given pipeline stage should be removed
/// by the orphan sweep.
/// Returns `true` if a worktree for the given pipeline stage should be removed.
///
/// A worktree is swept when its story is `Done`, `Archived`, or not present in
/// the CRDT at all (i.e. `stage` is `None`). Active stages (`Backlog`,
@@ -27,26 +33,13 @@ pub fn worktree_should_be_swept(stage: Option<&Stage>) -> bool {
}
}
/// Remove orphaned worktrees whose stories are done, archived, or absent from
/// the CRDT.
///
/// Walks `.huskies/worktrees/`, checks each story's stage via `lookup`, and
/// calls [`remove_worktree_by_story_id`] for any that should be swept.
/// Failures are logged individually; the sweep continues regardless.
///
/// Returns the number of worktrees successfully removed.
pub async fn sweep_orphaned_worktrees(project_root: &Path, config: &ProjectConfig) -> usize {
sweep_with_lookup(project_root, config, |story_id| {
read_typed(story_id).ok().flatten().map(|item| item.stage)
})
.await
}
/// Internal sweep implementation that accepts a custom CRDT lookup function.
/// Internal sweep implementation for tests: walks worktrees and removes those
/// whose stories are in a terminal stage according to the provided `lookup`.
///
/// Accepts a `lookup` closure `fn(&str) -> Option<Stage>` that returns the
/// stage for a given story ID, or `None` if the story is not in the CRDT.
/// This indirection makes the sweep testable without a real CRDT.
#[cfg(test)]
pub(crate) async fn sweep_with_lookup<F>(
project_root: &Path,
config: &ProjectConfig,