From bb6a6063e8d1e01aed95a605ccd436f36ca01c13 Mon Sep 17 00:00:00 2001 From: dave Date: Thu, 14 May 2026 23:39:56 +0000 Subject: [PATCH] huskies: merge 1066 --- server/src/agent_mode/mod.rs | 11 +- .../agents/pool/auto_assign/auto_assign.rs | 21 +- .../merge_failure_block_subscriber.rs | 9 + .../auto_assign/merge_failure_subscriber.rs | 24 ++ server/src/agents/pool/auto_assign/mod.rs | 4 + .../src/agents/pool/cost_rollup_subscriber.rs | 9 + server/src/agents/pool/worktree_lifecycle.rs | 32 +++ server/src/config/mod.rs | 11 + server/src/io/watcher/mod.rs | 1 - server/src/io/watcher/sweep.rs | 7 +- server/src/pipeline_state/events.rs | 80 ------ server/src/pipeline_state/mod.rs | 6 +- server/src/pipeline_state/subscribers.rs | 8 + server/src/service/settings/project.rs | 1 + server/src/startup/tick_loop.rs | 257 ++++++++++++++++-- 15 files changed, 361 insertions(+), 120 deletions(-) diff --git a/server/src/agent_mode/mod.rs b/server/src/agent_mode/mod.rs index d533cb23..8bef0a18 100644 --- a/server/src/agent_mode/mod.rs +++ b/server/src/agent_mode/mod.rs @@ -198,10 +198,13 @@ pub async fn run( ) }; - // Replay current pipeline state so subscribers (worktree lifecycle, merge-failure - // auto-spawn) react to any stories already in active stages, then auto-assign. - slog!("[agent-mode] Replaying current pipeline state."); - crate::pipeline_state::replay_current_pipeline_state(); + // Reconcile subscriber side effects for the current CRDT state without + // flooding the broadcast channel (replaces the former replay_current_pipeline_state call). + slog!("[agent-mode] Running startup reconcile pass."); + let done_retention = crate::config::ProjectConfig::load(&project_root) + .map(|c| std::time::Duration::from_secs(c.watcher.done_retention_secs)) + .unwrap_or_else(|_| std::time::Duration::from_secs(4 * 3600)); + crate::startup::tick_loop::run_reconcile_pass(&project_root, &agents, done_retention).await; // Run initial auto-assign. 
slog!("[agent-mode] Initial auto-assign scan."); diff --git a/server/src/agents/pool/auto_assign/auto_assign.rs b/server/src/agents/pool/auto_assign/auto_assign.rs index 43e48c00..ba2fa7b2 100644 --- a/server/src/agents/pool/auto_assign/auto_assign.rs +++ b/server/src/agents/pool/auto_assign/auto_assign.rs @@ -569,14 +569,15 @@ mod tests { ); } - // ── AC4: startup event replay + pool reconstruction ────────────────── + // ── AC4: startup reconcile + pool reconstruction ────────────────── /// AC4: Simulates a server restart by seeding the CRDT with a story in - /// Coding stage, calling `replay_current_pipeline_state` (the new startup - /// path), then `auto_assign_available_work`. Asserts the pool ends in the - /// expected state: exactly one agent assigned to the story. + /// Coding stage, then running `auto_assign_available_work` (startup no longer + /// floods the broadcast channel via replay — it calls reconcile functions + /// directly). Asserts the pool ends in the expected state: exactly one agent + /// assigned to the story, and a second pass does not double-spawn. #[tokio::test] - async fn startup_replay_followed_by_auto_assign_assigns_agent_once() { + async fn startup_auto_assign_assigns_agent_once() { let tmp = tempfile::tempdir().unwrap(); let sk = tmp.path().join(".huskies"); std::fs::create_dir_all(&sk).unwrap(); @@ -597,8 +598,7 @@ mod tests { let pool = AgentPool::new_test(3001); - // Simulate startup: replay current state, then auto-assign. - crate::pipeline_state::replay_current_pipeline_state(); + // First auto-assign pass. pool.auto_assign_available_work(tmp.path()).await; let count_after_first = { @@ -612,8 +612,7 @@ mod tests { .count() }; - // AC3 (idempotency): replaying twice must not double-spawn agents. - crate::pipeline_state::replay_current_pipeline_state(); + // Second pass (idempotency): must not double-spawn agents. 
pool.auto_assign_available_work(tmp.path()).await; let count_after_second = { @@ -629,11 +628,11 @@ mod tests { assert!( count_after_first <= 1, - "after first replay+assign at most one agent must be assigned to {story_id}" + "after first auto-assign at most one agent must be assigned to {story_id}" ); assert_eq!( count_after_first, count_after_second, - "second replay must not spawn additional agents (idempotency)" + "second auto-assign must not spawn additional agents (idempotency)" ); } } diff --git a/server/src/agents/pool/auto_assign/merge_failure_block_subscriber.rs b/server/src/agents/pool/auto_assign/merge_failure_block_subscriber.rs index 623d366f..018a2c74 100644 --- a/server/src/agents/pool/auto_assign/merge_failure_block_subscriber.rs +++ b/server/src/agents/pool/auto_assign/merge_failure_block_subscriber.rs @@ -21,6 +21,15 @@ use super::super::super::PipelineStage; use super::super::AgentPool; use super::scan::is_story_assigned_for_stage; +/// Reconcile: no-op for the merge-failure block subscriber. +/// +/// The block subscriber maintains an in-memory per-story consecutive-failure counter +/// that cannot be reconstructed from CRDT state alone (only the current stage is +/// stored, not the history of how many times each story failed). Eventual consistency +/// is guaranteed by the live subscriber reacting to each new `MergeFailure` event; +/// the periodic reconciler cannot add value here without risking spurious blocks. +pub(crate) fn reconcile_merge_failure_block() {} + /// Spawn a background task that blocks stories after N consecutive `MergeFailure` transitions. 
/// /// Subscribes to the pipeline transition broadcast channel and tracks a per-story diff --git a/server/src/agents/pool/auto_assign/merge_failure_subscriber.rs b/server/src/agents/pool/auto_assign/merge_failure_subscriber.rs index ccc2175e..6a36dd4e 100644 --- a/server/src/agents/pool/auto_assign/merge_failure_subscriber.rs +++ b/server/src/agents/pool/auto_assign/merge_failure_subscriber.rs @@ -17,6 +17,30 @@ use super::super::super::PipelineStage; use super::super::AgentPool; use super::scan::{find_free_agent_for_stage, is_story_assigned_for_stage}; +/// Reconcile: for each story currently in `MergeFailure { kind: ConflictDetected }`, +/// ensure a mergemaster agent is running. +/// +/// Idempotent — `on_merge_failure_transition` guards against double-spawning via +/// `is_story_assigned_for_stage`. Called by the periodic reconciler so that a Lagged +/// startup event never leaves a ConflictDetected story without a recovery agent. +pub(crate) async fn reconcile_merge_failure(pool: &Arc<AgentPool>, project_root: &Path) { + use crate::pipeline_state::{MergeFailureKind, PipelineEvent, Stage, TransitionFired}; + for item in crate::pipeline_state::read_all_typed() { + if let Stage::MergeFailure { ref kind, .. } = item.stage + && matches!(kind, MergeFailureKind::ConflictDetected(_)) + { + let fired = TransitionFired { + story_id: item.story_id.clone(), + before: item.stage.clone(), + after: item.stage.clone(), + event: PipelineEvent::MergeFailed { kind: kind.clone() }, + at: chrono::Utc::now(), + }; + on_merge_failure_transition(pool, project_root, &fired).await; + } + } +} + /// Spawn a background task that auto-spawns mergemaster agents on /// `Stage::MergeFailure { kind: ConflictDetected(_) }` transitions. 
/// diff --git a/server/src/agents/pool/auto_assign/mod.rs b/server/src/agents/pool/auto_assign/mod.rs index 943206c6..0082b790 100644 --- a/server/src/agents/pool/auto_assign/mod.rs +++ b/server/src/agents/pool/auto_assign/mod.rs @@ -17,7 +17,11 @@ pub(crate) mod watchdog; // so that pool::lifecycle and pool::pipeline continue to access them unchanged. pub(super) use scan::{find_free_agent_for_stage, is_agent_free}; +/// Re-export for `startup::tick_loop`. +pub(crate) use merge_failure_block_subscriber::reconcile_merge_failure_block; /// Re-export for `startup::tick_loop`. pub(crate) use merge_failure_block_subscriber::spawn_merge_failure_block_subscriber; /// Re-export for `startup::tick_loop`. +pub(crate) use merge_failure_subscriber::reconcile_merge_failure; +/// Re-export for `startup::tick_loop`. pub(crate) use merge_failure_subscriber::spawn_merge_failure_subscriber; diff --git a/server/src/agents/pool/cost_rollup_subscriber.rs b/server/src/agents/pool/cost_rollup_subscriber.rs index 7d496518..21a5119e 100644 --- a/server/src/agents/pool/cost_rollup_subscriber.rs +++ b/server/src/agents/pool/cost_rollup_subscriber.rs @@ -13,6 +13,15 @@ use crate::pipeline_state::Stage; use crate::slog; use crate::slog_warn; +/// Reconcile: re-populate the CostRollup register from disk for all known stories. +/// +/// Idempotent — `init_from_disk` scans all existing token-usage JSONL files and +/// overwrites the in-memory register. Called by the periodic reconciler so that +/// a Lagged event can never leave a story with a stale or absent cost entry. +pub(crate) fn reconcile_cost_rollup(project_root: &Path) { + crate::service::agents::cost_rollup::init_from_disk(project_root); +} + /// Spawn a background task that maintains the CostRollup register. 
/// /// On every terminal stage transition (Done, Archived, Abandoned, Superseded, diff --git a/server/src/agents/pool/worktree_lifecycle.rs b/server/src/agents/pool/worktree_lifecycle.rs index 8dbe1070..d5bc1051 100644 --- a/server/src/agents/pool/worktree_lifecycle.rs +++ b/server/src/agents/pool/worktree_lifecycle.rs @@ -72,6 +72,38 @@ pub(crate) fn spawn_worktree_cleanup_subscriber(project_root: PathBuf) { }); } +/// Reconcile worktree creation: for each story currently in `Stage::Coding`, ensure its worktree exists. +/// +/// Idempotent — creates worktrees for Coding stories that have no worktree yet, and is +/// a no-op for stories whose worktree already exists. Called by the periodic reconciler +/// so that Lagged events on the broadcast channel never leave Coding stories without worktrees. +pub(crate) async fn reconcile_worktree_create(project_root: &Path, port: u16) { + for item in crate::pipeline_state::read_all_typed() { + if matches!(item.stage, crate::pipeline_state::Stage::Coding { .. }) { + on_coding_transition(project_root, port, &item.story_id.0).await; + } + } +} + +/// Reconcile worktree cleanup: for each story in a terminal stage, ensure its worktree is removed. +/// +/// Idempotent — removes worktrees for terminal stories that still have one, and is a no-op +/// for stories with no worktree. Called by the periodic reconciler so that Lagged events on +/// the broadcast channel never leave terminal stories with dangling worktrees. +pub(crate) async fn reconcile_worktree_cleanup(project_root: &Path) { + for item in crate::pipeline_state::read_all_typed() { + if matches!( + item.stage, + crate::pipeline_state::Stage::Done { .. } + | crate::pipeline_state::Stage::Archived { .. } + | crate::pipeline_state::Stage::Abandoned { .. } + | crate::pipeline_state::Stage::Superseded { .. } + ) { + on_terminal_transition(project_root, &item.story_id.0).await; + } + } +} + /// Create the worktree and feature branch for `story_id` when it enters `Stage::Coding`. 
pub(crate) async fn on_coding_transition(project_root: &Path, port: u16, story_id: &str) { let config = match crate::config::ProjectConfig::load(project_root) { diff --git a/server/src/config/mod.rs b/server/src/config/mod.rs index 901edbeb..28d7bc74 100644 --- a/server/src/config/mod.rs +++ b/server/src/config/mod.rs @@ -161,6 +161,12 @@ pub struct WatcherConfig { /// moved to `6_archived/`. Default: 14400 (4 hours). #[serde(default = "default_done_retention_secs")] pub done_retention_secs: u64, + /// How often (in seconds) the periodic reconciler runs to converge + /// subscriber side effects. The reconciler calls each subscriber's + /// `reconcile()` entry point so that Lagged events never leave persistent + /// state diverged. Default: 30 seconds. + #[serde(default = "default_reconcile_interval_secs")] + pub reconcile_interval_secs: u64, } impl Default for WatcherConfig { @@ -168,6 +174,7 @@ impl Default for WatcherConfig { Self { sweep_interval_secs: default_sweep_interval_secs(), done_retention_secs: default_done_retention_secs(), + reconcile_interval_secs: default_reconcile_interval_secs(), } } } @@ -180,6 +187,10 @@ fn default_done_retention_secs() -> u64 { 4 * 60 * 60 // 4 hours } +fn default_reconcile_interval_secs() -> u64 { + 30 +} + fn default_qa() -> String { "server".to_string() } diff --git a/server/src/io/watcher/mod.rs b/server/src/io/watcher/mod.rs index 04ad3d7b..e0e609fa 100644 --- a/server/src/io/watcher/mod.rs +++ b/server/src/io/watcher/mod.rs @@ -21,7 +21,6 @@ mod sweep; pub use events::WatcherEvent; pub(crate) use sweep::spawn_done_to_archived_subscriber; -#[cfg(test)] pub(crate) use sweep::sweep_done_to_archived; use crate::slog; diff --git a/server/src/io/watcher/sweep.rs b/server/src/io/watcher/sweep.rs index 151babaa..5d5cb165 100644 --- a/server/src/io/watcher/sweep.rs +++ b/server/src/io/watcher/sweep.rs @@ -70,7 +70,7 @@ pub(crate) fn spawn_done_to_archived_subscriber(done_retention: Duration) { }); } -/// Sweep items in 
`Stage::Done` whose `merged_at` timestamp exceeds the +/// Reconcile: sweep items in `Stage::Done` whose `merged_at` timestamp exceeds the /// retention duration to `Stage::Archived` via the typed transition table. /// /// Routes through [`crate::pipeline_state::apply_transition`] so the @@ -78,9 +78,10 @@ pub(crate) fn spawn_done_to_archived_subscriber(done_retention: Duration) { /// `TransitionFired` event is emitted to subscribers (worktree pruning, /// matrix notifier, etc.). /// -/// Used in tests for direct one-shot sweeps; production code uses +/// Called at startup and by the periodic reconciler to archive Done stories +/// whose retention has elapsed, even when the `TransitionFired` subscriber +/// lagged and missed their Done event. Production reactive archiving uses /// [`spawn_done_to_archived_subscriber`] instead. -#[cfg(test)] pub(crate) fn sweep_done_to_archived(done_retention: Duration) { use crate::pipeline_state::{PipelineEvent, Stage, apply_transition, read_all_typed}; diff --git a/server/src/pipeline_state/events.rs b/server/src/pipeline_state/events.rs index 9056d25a..21f99012 100644 --- a/server/src/pipeline_state/events.rs +++ b/server/src/pipeline_state/events.rs @@ -36,32 +36,6 @@ pub(super) fn try_broadcast(fired: &TransitionFired) { let _ = get_or_init_tx().send(fired.clone()); } -/// Replay the current CRDT pipeline state as a burst of synthetic -/// [`TransitionFired`] events at server startup. -/// -/// Reads every item from the CRDT and broadcasts a self-transition -/// (`before == after`) for each one so that all existing subscribers -/// (worktree lifecycle, merge-failure auto-spawn, auto-assign) react -/// identically to a live event. This replaces the legacy scan-based -/// `reconcile_on_startup` path. -/// -/// Idempotent: a second call produces another burst of events, but every -/// subscriber already guards against duplicate work (e.g. 
-/// `is_story_assigned_for_stage` returns true once an agent is running, -/// and worktree creation is a no-op when the worktree already exists). -pub fn replay_current_pipeline_state() { - for item in super::read_all_typed() { - let fired = TransitionFired { - story_id: item.story_id.clone(), - before: item.stage.clone(), - after: item.stage, - event: super::PipelineEvent::DepsMet, - at: chrono::Utc::now(), - }; - try_broadcast(&fired); - } -} - /// Fired when a pipeline stage transition completes. #[derive(Debug, Clone)] pub struct TransitionFired { @@ -183,58 +157,4 @@ mod tests { } // ── TransitionError Display ───────────────────────────────────────── - - // ── replay_current_pipeline_state ────────────────────────────────── - - /// AC1: replay broadcasts a synthetic event for every item in the CRDT. - #[test] - fn replay_broadcasts_event_for_crdt_item_in_coding_stage() { - crate::crdt_state::init_for_test(); - crate::db::ensure_content_store(); - - let story_id = "9901_replay_coding"; - crate::db::write_item_with_content( - story_id, - "2_current", - "---\nname: Replay Coding\n---\n", - crate::db::ItemMeta::named("Replay Coding"), - ); - - let mut rx = subscribe_transitions(); - replay_current_pipeline_state(); - - let mut found = false; - while let Ok(fired) = rx.try_recv() { - if fired.story_id.0 == story_id && matches!(fired.after, Stage::Coding { .. }) { - found = true; - } - } - assert!( - found, - "replay must broadcast a Coding event for a story in 2_current" - ); - } - - /// AC3: calling replay_current_pipeline_state twice fires events both times. - /// - /// Pool-state idempotency (no duplicate agents) is enforced by subscribers, - /// not by the replay function itself. This test verifies that replay is safe - /// to call multiple times without panicking. 
- #[test] - fn replay_twice_does_not_panic() { - crate::crdt_state::init_for_test(); - crate::db::ensure_content_store(); - - let story_id = "9902_replay_idem"; - crate::db::write_item_with_content( - story_id, - "3_qa", - "---\nname: Replay QA\n---\n", - crate::db::ItemMeta::named("Replay QA"), - ); - - // Two successive replays must not panic. - replay_current_pipeline_state(); - replay_current_pipeline_state(); - } } diff --git a/server/src/pipeline_state/mod.rs b/server/src/pipeline_state/mod.rs index 85fc642e..8c1f03c6 100644 --- a/server/src/pipeline_state/mod.rs +++ b/server/src/pipeline_state/mod.rs @@ -51,10 +51,7 @@ pub use transition::{ }; #[allow(unused_imports)] -pub use events::{ - EventBus, TransitionFired, TransitionSubscriber, replay_current_pipeline_state, - subscribe_transitions, -}; +pub use events::{EventBus, TransitionFired, TransitionSubscriber, subscribe_transitions}; #[allow(unused_imports)] pub use projection::ProjectionError; @@ -66,6 +63,7 @@ pub use apply::{ transition_to_unfrozen, }; +pub(crate) use subscribers::reconcile_audit_log; pub use subscribers::spawn_audit_log_subscriber; #[allow(unused_imports)] diff --git a/server/src/pipeline_state/subscribers.rs b/server/src/pipeline_state/subscribers.rs index 78e9222d..d27298d1 100644 --- a/server/src/pipeline_state/subscribers.rs +++ b/server/src/pipeline_state/subscribers.rs @@ -35,6 +35,14 @@ impl TransitionSubscriber for AuditLogSubscriber { } } +/// Reconcile: no-op for the audit log subscriber. +/// +/// The audit log records live transitions only. Replaying historical CRDT state at +/// reconcile time would produce misleading entries (wrong timestamps, duplicate lines). +/// Eventual consistency of the audit log is not required — missed events are simply +/// absent from the log, which is acceptable. +pub(crate) fn reconcile_audit_log() {} + /// Spawn a background task that writes a structured audit log entry for every pipeline transition. 
/// /// Subscribes to the transition broadcast channel. Every `TransitionFired` event produces diff --git a/server/src/service/settings/project.rs b/server/src/service/settings/project.rs index b32374cf..156bb421 100644 --- a/server/src/service/settings/project.rs +++ b/server/src/service/settings/project.rs @@ -191,6 +191,7 @@ mod tests { watcher: crate::config::WatcherConfig { sweep_interval_secs: 30, done_retention_secs: 7200, + reconcile_interval_secs: 30, }, ..Default::default() }; diff --git a/server/src/startup/tick_loop.rs b/server/src/startup/tick_loop.rs index c95c5350..8a52ead8 100644 --- a/server/src/startup/tick_loop.rs +++ b/server/src/startup/tick_loop.rs @@ -156,6 +156,17 @@ pub(crate) fn spawn_tick_loop( {scheduled_count} scheduled timer(s)" ); + let (reconcile_interval, done_retention) = root + .as_ref() + .and_then(|r| config::ProjectConfig::load(r).ok()) + .map(|c| { + ( + c.watcher.reconcile_interval_secs, + std::time::Duration::from_secs(c.watcher.done_retention_secs), + ) + }) + .unwrap_or((30, std::time::Duration::from_secs(4 * 3600))); + tokio::spawn(async move { let mut interval = tokio::time::interval(std::time::Duration::from_secs(1)); let mut tick_count: u64 = 0; @@ -190,6 +201,15 @@ pub(crate) fn spawn_tick_loop( } agents.reap_stale_merge_jobs(); } + + // Periodic reconciler: converge subscriber side effects so that + // Lagged broadcast events never leave state permanently diverged. 
+ if tick_count.is_multiple_of(reconcile_interval) + && let Some(ref r) = root + { + crate::slog!("[reconcile] Running periodic reconcile pass."); + run_reconcile_pass(r, &agents, done_retention).await; + } } }); } @@ -450,16 +470,50 @@ async fn execute_prompt_action( } } -/// Spawn the startup reconstruction task: replay the current pipeline state -/// through the [`TransitionFired`][crate::pipeline_state::TransitionFired] -/// broadcast channel so that all existing subscribers (worktree lifecycle, -/// merge-failure auto-spawn, auto-assign) react identically to a live -/// transition, then trigger a full auto-assign pass. +/// Run one full reconcile pass: call each subscriber's idempotent `reconcile()` +/// entry point so that side effects converge regardless of whether the +/// broadcast channel lagged during startup or at runtime. /// -/// Replaces the legacy scan-based `reconcile_on_startup` approach. The CRDT -/// is the durable source of truth; replaying it as synthetic self-transitions -/// is cheaper, simpler, and idempotent: a second replay produces another burst -/// of events that subscribers safely ignore for already-assigned stories. +/// Safe to call any number of times — every reconcile function is idempotent. +pub(crate) async fn run_reconcile_pass( + root: &std::path::Path, + agents: &Arc<AgentPool>, + done_retention: std::time::Duration, +) { + // Content-GC: purge content-store entries for terminal/tombstoned stories. + crate::db::gc::sweep_zombie_content_on_startup(); + + // Worktree create: ensure every Coding story has a worktree. + crate::agents::pool::worktree_lifecycle::reconcile_worktree_create(root, agents.port()).await; + + // Worktree cleanup: remove worktrees for terminal stories. + crate::agents::pool::worktree_lifecycle::reconcile_worktree_cleanup(root).await; + + // Done-archive: archive Done stories whose retention period has elapsed. 
+ crate::io::watcher::sweep_done_to_archived(done_retention); + + // Cost-rollup: re-populate the in-memory register from disk. + crate::agents::pool::cost_rollup_subscriber::reconcile_cost_rollup(root); + + // Merge-failure: spawn mergemaster for ConflictDetected stories with no active agent. + crate::agents::pool::auto_assign::reconcile_merge_failure(agents, root).await; + + // Merge-block: no-op (in-memory counter cannot be reconstructed from CRDT). + crate::agents::pool::auto_assign::reconcile_merge_failure_block(); + + // Audit-log: no-op (historical replay would produce misleading entries). + crate::pipeline_state::reconcile_audit_log(); +} + +/// Spawn the startup reconciliation task: run a full reconcile pass so that all +/// side-effect subscribers converge on the current CRDT state without flooding +/// the broadcast channel, then trigger a full auto-assign pass. +/// +/// Replaces the former `replay_current_pipeline_state()` approach, which +/// sent one synthetic `TransitionFired` per CRDT item through the broadcast +/// channel. With >256 items that caused `Subscriber lagged` warnings and +/// left subscribers with diverged state. Direct reconcile calls bypass the +/// channel entirely and scale to any CRDT size. pub(crate) fn spawn_startup_reconciliation( startup_root: Option<std::path::PathBuf>, startup_agents: Arc<AgentPool>, @@ -467,20 +521,189 @@ ) { if let Some(root) = startup_root { tokio::spawn(async move { - // Purge content-store entries for stories that reached terminal - // stages in a previous session (before the GC subscriber was active). - crate::db::gc::sweep_zombie_content_on_startup(); - crate::slog!( - "[startup] Replaying current pipeline state through TransitionFired channel." 
- ); - crate::pipeline_state::replay_current_pipeline_state(); + let done_retention = crate::config::ProjectConfig::load(&root) + .map(|c| std::time::Duration::from_secs(c.watcher.done_retention_secs)) + .unwrap_or_else(|_| std::time::Duration::from_secs(4 * 3600)); + crate::slog!("[startup] Running per-subscriber reconcile pass."); + run_reconcile_pass(&root, &startup_agents, done_retention).await; crate::slog!("[auto-assign] Scanning pipeline stages for unassigned work."); startup_agents.auto_assign_available_work(&root).await; let _ = startup_reconciliation_tx.send(ReconciliationEvent { story_id: String::new(), status: "done".to_string(), - message: "Startup event replay complete.".to_string(), + message: "Startup reconcile pass complete.".to_string(), }); }); } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::db::{ + ContentKey, ItemMeta, ensure_content_store, write_content, write_item_with_content, + }; + use crate::io::watcher::WatcherEvent; + use tokio::sync::broadcast; + + fn make_pool() -> Arc<AgentPool> { + let (tx, _) = broadcast::channel::<WatcherEvent>(16); + Arc::new(AgentPool::new(3099, tx)) + } + + fn setup_huskies_dir(tmp: &tempfile::TempDir) -> std::path::PathBuf { + let root = tmp.path().to_path_buf(); + std::fs::create_dir_all(root.join(".huskies")).unwrap(); + std::fs::write(root.join(".huskies/project.toml"), "").unwrap(); + root + } + + /// AC4 + AC6: seeding >256 CRDT items and running the reconcile pass must not + /// produce any "Subscriber lagged" warnings (structural guarantee — the new + /// path never broadcasts through the channel) and must purge zombie content + /// for all terminal stories after one reconcile tick. + /// + /// Distribution: 300 Backlog + 200 Coding + 200 Abandoned (terminal) + 300 QA + /// = 1000 items. Each of the 200 Abandoned stories gets a content-store entry + /// seeded before the reconcile so we can assert it is cleaned up. 
+ #[tokio::test] + async fn reconcile_pass_scales_to_1000_items_without_lagged_divergence() { + crate::crdt_state::init_for_test(); + ensure_content_store(); + + let tmp = tempfile::tempdir().unwrap(); + let root = setup_huskies_dir(&tmp); + let pool = make_pool(); + + // ── Seed 1000 items across several stages ────────────────────────── + for i in 0..300u32 { + let id = format!("1066_backlog_{i:04}"); + write_item_with_content( + &id, + "1_backlog", + "---\nname: Backlog\n---\n", + ItemMeta::named("Backlog"), + ); + } + for i in 0..200u32 { + let id = format!("1066_coding_{i:04}"); + write_item_with_content( + &id, + "2_current", + "---\nname: Coding\n---\n", + ItemMeta::named("Coding"), + ); + } + for i in 0..200u32 { + let id = format!("1066_abandoned_{i:04}"); + write_item_with_content( + &id, + "2_current", + "---\nname: Abandoned\n---\n", + ItemMeta::named("Abandoned"), + ); + // Move to terminal stage (Abandoned). + crate::agents::lifecycle::abandon_story(&id).expect("abandon must succeed"); + // Seed a content-store entry to verify GC cleans it up. + write_content(ContentKey::Story(&id), "zombie content"); + } + for i in 0..300u32 { + let id = format!("1066_qa_{i:04}"); + write_item_with_content(&id, "3_qa", "---\nname: QA\n---\n", ItemMeta::named("QA")); + } + + // ── Subscribe BEFORE the reconcile to catch any Lagged events ────── + let mut transition_rx = crate::pipeline_state::subscribe_transitions(); + + // ── Run one reconcile pass ───────────────────────────────────────── + // Use zero retention so any Done items (none here, but defensive) archive immediately. + run_reconcile_pass(&root, &pool, std::time::Duration::ZERO).await; + + // ── Drain the transition channel; must contain zero Lagged events ── + // The reconcile path never broadcasts through TRANSITION_TX, so any + // events here are from the abandon_story calls above (all pre-reconcile). 
+ let mut lagged_count = 0u64; + loop { + match transition_rx.try_recv() { + Ok(_) => {} + Err(tokio::sync::broadcast::error::TryRecvError::Lagged(n)) => { + lagged_count += n; + } + Err(tokio::sync::broadcast::error::TryRecvError::Empty) + | Err(tokio::sync::broadcast::error::TryRecvError::Closed) => break, + } + } + + // The reconcile pass itself must not have sent anything through the channel. + // (abandon_story above may have sent some events, but those are pre-reconcile + // lifecycle transitions, not the reconcile itself.) + assert_eq!( + lagged_count, 0, + "run_reconcile_pass must not broadcast through the transition channel (no Lagged)" + ); + + // ── Assert: zombie content purged for all 200 Abandoned stories ──── + for i in 0..200u32 { + let id = format!("1066_abandoned_{i:04}"); + assert!( + crate::db::read_content(ContentKey::Story(&id)).is_none(), + "zombie content must be purged for abandoned story {id}" + ); + } + } + + /// AC4 regression: the subscriber channel (capacity 256) must not lag when + /// 1000 items are seeded — the reconcile path bypasses the channel entirely. + #[tokio::test] + async fn reconcile_never_floods_broadcast_channel() { + crate::crdt_state::init_for_test(); + ensure_content_store(); + + let tmp = tempfile::tempdir().unwrap(); + let root = setup_huskies_dir(&tmp); + let pool = make_pool(); + + // Seed 1000 Backlog items (no lifecycle transitions — clean slate). + for i in 0..1000u32 { + let id = format!("1066_flood_{i:04}"); + write_item_with_content( + &id, + "1_backlog", + "---\nname: Flood\n---\n", + ItemMeta::named("Flood"), + ); + } + + // Subscribe after seeding and drain any pre-existing channel noise from + // concurrent tests before checking that the reconcile pass adds nothing. 
+ let mut rx = crate::pipeline_state::subscribe_transitions(); + while let Ok(_) | Err(tokio::sync::broadcast::error::TryRecvError::Lagged(_)) = + rx.try_recv() + {} + + run_reconcile_pass(&root, &pool, std::time::Duration::ZERO).await; + + // The channel must have received exactly zero messages from run_reconcile_pass. + let mut msg_count = 0u64; + let mut lagged = false; + loop { + match rx.try_recv() { + Ok(_) => msg_count += 1, + Err(tokio::sync::broadcast::error::TryRecvError::Lagged(_)) => { + lagged = true; + break; + } + Err(_) => break, + } + } + + assert!( + !lagged, + "run_reconcile_pass must never cause Lagged on the broadcast channel" + ); + assert_eq!( + msg_count, 0, + "run_reconcile_pass must not send any TransitionFired events" + ); + } +}