//! Background tasks: CRDT-event bridge, auto-assign subscriber, unified tick //! loop, gateway relay, and startup reconciliation. use crate::agents::{AgentPool, ReconciliationEvent}; use crate::config; use crate::gateway_relay; use crate::io; use crate::service; use crate::service::status::StatusBroadcaster; use std::path::PathBuf; use std::sync::Arc; use tokio::sync::broadcast; /// Bridge CRDT state-transition events to the watcher broadcast channel and /// spawn the auto-assign subscriber that triggers on active-stage transitions. pub(crate) fn spawn_event_bridges( watcher_tx: broadcast::Sender, project_root: Option, agents: Arc, ) { // Audit log subscriber: write one structured line per pipeline transition. crate::pipeline_state::spawn_audit_log_subscriber(); // CRDT → watcher bridge: translate CRDT stage-transition events into // WatcherEvent::WorkItem so downstream consumers (WebSocket, auto-assign) // see a uniform stream regardless of whether the event originated from the // filesystem watcher or from a CRDT sync peer. { let crdt_watcher_tx = watcher_tx.clone(); if let Some(mut crdt_rx) = crate::crdt_state::subscribe() { tokio::spawn(async move { while let Ok(evt) = crdt_rx.recv().await { let (action, commit_msg) = io::watcher::stage_metadata(&evt.to_stage, &evt.story_id); let watcher_evt = io::watcher::WatcherEvent::WorkItem { stage: evt.to_stage.dir_name().to_string(), item_id: evt.story_id, action: action.to_string(), commit_msg, from_stage: evt.from_stage.map(|s| s.dir_name().to_string()), }; let _ = crdt_watcher_tx.send(watcher_evt); } }); } } // Auto-assign: trigger `auto_assign_available_work` on every work-item // CRDT state-transition event. auto_assign_available_work is idempotent // and noops where there is nothing to do, so firing on every transition // ensures that MergeFailure and other non-"active" stages are covered // without any per-stage special-casing. if let Some(root) = project_root { // Worktree lifecycle subscribers: create worktrees on Stage::Coding // and remove them on terminal stages (Done, Archived, Abandoned, Superseded). crate::agents::pool::worktree_lifecycle::spawn_worktree_create_subscriber( root.clone(), agents.port(), ); crate::agents::pool::worktree_lifecycle::spawn_worktree_cleanup_subscriber(root.clone()); // Mergemaster auto-spawn subscriber: reacts to TransitionFired events for // Stage::MergeFailure { kind: ConflictDetected } and spawns mergemaster // directly from the typed event, eliminating the predicate-mismatch // failure mode of the previous scan-loop approach (story 998). crate::agents::pool::auto_assign::spawn_merge_failure_subscriber( Arc::clone(&agents), root.clone(), ); // Consecutive-failure auto-block subscriber: blocks stories after N // consecutive MergeFailure transitions (story 1018). crate::agents::pool::auto_assign::spawn_merge_failure_block_subscriber(root.clone()); // Content-store GC subscriber: purges all ContentKey::* entries for a // story when it reaches a terminal stage, preventing zombie entries from // accumulating in the process heap (story 996). crate::db::gc::spawn_content_gc_subscriber(); // Cost-rollup bootstrap: pre-populate the register from existing JSONL // so status renderers show correct costs after a server restart. crate::service::agents::cost_rollup::init_from_disk(&root); // Cost-rollup subscriber: snapshots per-story token costs into the // in-memory register whenever a story reaches a terminal stage. crate::agents::pool::cost_rollup_subscriber::spawn_cost_rollup_subscriber(root.clone()); // Done→archived subscriber: archives Done stories after the configured // retention period. Fires on each Stage::Done TransitionFired event and // sleeps for the remaining retention time from merged_at before archiving. // Replaces the periodic sweep_done_to_archived scan that ran on every // sweep_interval_secs tick. { let done_retention = config::ProjectConfig::load(&root) .map(|c| std::time::Duration::from_secs(c.watcher.done_retention_secs)) .unwrap_or_else(|_| std::time::Duration::from_secs(4 * 3600)); io::watcher::spawn_done_to_archived_subscriber(done_retention); } let watcher_auto_rx = watcher_tx.subscribe(); let watcher_auto_agents = Arc::clone(&agents); tokio::spawn(async move { let mut rx = watcher_auto_rx; while let Ok(event) = rx.recv().await { if let io::watcher::WatcherEvent::WorkItem { ref stage, .. } = event { crate::slog!( "[auto-assign] CRDT transition detected in {stage}/; \ triggering auto-assign." ); watcher_auto_agents.auto_assign_available_work(&root).await; } } }); } } /// Spawn the unified 1-second background tick loop. /// /// Handles only genuinely time-based work: /// - **Timer tick** (every second): fires due pipeline timers by comparing the /// current wall-clock time against each timer's `due_at` field. Cannot be /// reactive because timers encode absolute timestamps that only become /// actionable when the clock reaches them. /// - **Agent watchdog** (every 30 seconds): detects orphaned `Running` agents /// by comparing elapsed time since the last heartbeat. Cannot be reactive /// because the absence of an event (no heartbeat) is what signals a problem; /// a `TransitionFired` subscriber would never fire for a silently crashed agent. /// /// Stage-change-reactive work (done→archived archival, worktree cleanup) has /// been moved to `TransitionFired` subscribers spawned from `spawn_event_bridges`. pub(crate) fn spawn_tick_loop( agents: Arc, timer_store: Arc, root: Option, ) { let pending_count = timer_store.list().len(); crate::slog!("[tick] Unified tick loop started; {pending_count} pending timer(s)"); tokio::spawn(async move { let mut interval = tokio::time::interval(std::time::Duration::from_secs(1)); let mut tick_count: u64 = 0; loop { interval.tick().await; tick_count = tick_count.wrapping_add(1); // Time-based: timers encode absolute due timestamps; only a // wall-clock comparison can determine when one is due. if let Some(ref r) = root { let result = service::timer::tick_once(&timer_store, &agents, r).await; if let Err(msg) = result { crate::slog_error!("[tick] Timer tick panicked: {msg}"); } } // Time-based: the watchdog detects silence (no heartbeat within a // timeout window). A `TransitionFired` subscriber cannot observe the // absence of events, so this must remain on a periodic tick. if tick_count.is_multiple_of(30) { let found = agents.run_watchdog_pass(root.as_deref()); if found > 0 { crate::slog!( "[tick] {found} orphaned agent(s) detected; triggering auto-assign." ); if let Some(ref r) = root { agents.auto_assign_available_work(r).await; } } agents.reap_stale_merge_jobs(); } } }); } /// Spawn the gateway relay task if `gateway_url` is configured in /// `project.toml` or the `HUSKIES_GATEWAY_URL` environment variable. pub(crate) fn spawn_gateway_relay(startup_root: &Option, status: Arc) { let relay_gateway_url = startup_root .as_ref() .and_then(|r| config::ProjectConfig::load(r).ok()) .and_then(|c| c.gateway_url) .or_else(|| std::env::var("HUSKIES_GATEWAY_URL").ok()) .unwrap_or_default(); if !relay_gateway_url.is_empty() { let relay_project_name = startup_root .as_ref() .and_then(|r| config::ProjectConfig::load(r).ok()) .and_then(|c| c.gateway_project) .or_else(|| std::env::var("HUSKIES_GATEWAY_PROJECT").ok()) .or_else(|| { startup_root .as_ref() .and_then(|r| r.file_name()) .map(|n| n.to_string_lossy().into_owned()) }) .unwrap_or_else(|| "project".to_string()); gateway_relay::spawn_relay_task( relay_gateway_url, relay_project_name, status, reqwest::Client::new(), ); } } /// Spawn the startup reconstruction task: replay the current pipeline state /// through the [`TransitionFired`][crate::pipeline_state::TransitionFired] /// broadcast channel so that all existing subscribers (worktree lifecycle, /// merge-failure auto-spawn, auto-assign) react identically to a live /// transition, then trigger a full auto-assign pass. /// /// Replaces the legacy scan-based `reconcile_on_startup` approach. The CRDT /// is the durable source of truth; replaying it as synthetic self-transitions /// is cheaper, simpler, and idempotent: a second replay produces another burst /// of events that subscribers safely ignore for already-assigned stories. pub(crate) fn spawn_startup_reconciliation( startup_root: Option, startup_agents: Arc, startup_reconciliation_tx: broadcast::Sender, ) { if let Some(root) = startup_root { tokio::spawn(async move { // Purge content-store entries for stories that reached terminal // stages in a previous session (before the GC subscriber was active). crate::db::gc::sweep_zombie_content_on_startup(); crate::slog!( "[startup] Replaying current pipeline state through TransitionFired channel." ); crate::pipeline_state::replay_current_pipeline_state(); crate::slog!("[auto-assign] Scanning pipeline stages for unassigned work."); startup_agents.auto_assign_available_work(&root).await; let _ = startup_reconciliation_tx.send(ReconciliationEvent { story_id: String::new(), status: "done".to_string(), message: "Startup event replay complete.".to_string(), }); }); } }