//! Background tasks: CRDT-event bridge, auto-assign subscriber, unified tick //! loop, gateway relay, and startup reconciliation. use crate::agents::{AgentPool, ReconciliationEvent}; use crate::config; use crate::gateway_relay; use crate::http::context::AppContext; use crate::http::mcp::dispatch::dispatch_tool_call; use crate::io; use crate::service; use crate::service::event_triggers::store::EventTriggerStore; use crate::service::event_triggers::{FireMode, TriggerAction}; use crate::service::status::StatusBroadcaster; use crate::service::timer::scheduled::{ ScheduledTimer, ScheduledTimerStore, TimerAction, TimerMode, }; use std::path::PathBuf; use std::sync::Arc; use tokio::sync::broadcast; /// Bridge CRDT state-transition events to the watcher broadcast channel and /// spawn the auto-assign subscriber that triggers on active-stage transitions. pub(crate) fn spawn_event_bridges( watcher_tx: broadcast::Sender, project_root: Option, agents: Arc, ) { // Audit log subscriber: write one structured line per pipeline transition. crate::pipeline_state::spawn_audit_log_subscriber(); // CRDT → watcher bridge: translate CRDT stage-transition events into // WatcherEvent::WorkItem so downstream consumers (WebSocket, auto-assign) // see a uniform stream regardless of whether the event originated from the // filesystem watcher or from a CRDT sync peer. 
{ let crdt_watcher_tx = watcher_tx.clone(); if let Some(mut crdt_rx) = crate::crdt_state::subscribe() { tokio::spawn(async move { while let Ok(evt) = crdt_rx.recv().await { let (action, commit_msg) = io::watcher::stage_metadata(&evt.to_stage, &evt.story_id); let watcher_evt = io::watcher::WatcherEvent::WorkItem { stage: evt.to_stage.dir_name().to_string(), item_id: evt.story_id, action: action.to_string(), commit_msg, from_stage: evt.from_stage.map(|s| s.dir_name().to_string()), }; let _ = crdt_watcher_tx.send(watcher_evt); } }); } } // Auto-assign: trigger `auto_assign_available_work` on every work-item // CRDT state-transition event. auto_assign_available_work is idempotent // and noops where there is nothing to do, so firing on every transition // ensures that MergeFailure and other non-"active" stages are covered // without any per-stage special-casing. if let Some(root) = project_root { // Worktree lifecycle subscribers: create worktrees on Stage::Coding // and remove them on terminal stages (Done, Archived, Abandoned, Superseded). crate::agents::pool::worktree_lifecycle::spawn_worktree_create_subscriber( root.clone(), agents.port(), ); crate::agents::pool::worktree_lifecycle::spawn_worktree_cleanup_subscriber(root.clone()); // Mergemaster auto-spawn subscriber: reacts to TransitionFired events for // Stage::MergeFailure { kind: ConflictDetected } and spawns mergemaster // directly from the typed event, eliminating the predicate-mismatch // failure mode of the previous scan-loop approach (story 998). crate::agents::pool::auto_assign::spawn_merge_failure_subscriber( Arc::clone(&agents), root.clone(), ); // Consecutive-failure auto-block subscriber: blocks stories after N // consecutive MergeFailure transitions (story 1018). Bug 1025: takes // the agent pool so it can gate the counter on mergemaster presence — // failures during active recovery iteration do not count toward block. 
crate::agents::pool::auto_assign::spawn_merge_failure_block_subscriber( Arc::clone(&agents), root.clone(), ); // Content-store GC subscriber: purges all ContentKey::* entries for a // story when it reaches a terminal stage, preventing zombie entries from // accumulating in the process heap (story 996). crate::db::gc::spawn_content_gc_subscriber(); // Cost-rollup bootstrap: pre-populate the register from existing JSONL // so status renderers show correct costs after a server restart. crate::service::agents::cost_rollup::init_from_disk(&root); // Cost-rollup subscriber: snapshots per-story token costs into the // in-memory register whenever a story reaches a terminal stage. crate::agents::pool::cost_rollup_subscriber::spawn_cost_rollup_subscriber(root.clone()); // Done→archived subscriber: archives Done stories after the configured // retention period. Fires on each Stage::Done TransitionFired event and // sleeps for the remaining retention time from merged_at before archiving. // Replaces the periodic sweep_done_to_archived scan that ran on every // sweep_interval_secs tick. { let done_retention = config::ProjectConfig::load(&root) .map(|c| std::time::Duration::from_secs(c.watcher.done_retention_secs)) .unwrap_or_else(|_| std::time::Duration::from_secs(4 * 3600)); io::watcher::spawn_done_to_archived_subscriber(done_retention); } let watcher_auto_rx = watcher_tx.subscribe(); let watcher_auto_agents = Arc::clone(&agents); tokio::spawn(async move { let mut rx = watcher_auto_rx; while let Ok(event) = rx.recv().await { if let io::watcher::WatcherEvent::WorkItem { ref stage, .. } = event { crate::slog!( "[auto-assign] CRDT transition detected in {stage}/; \ triggering auto-assign." ); watcher_auto_agents.auto_assign_available_work(&root).await; } } }); } } /// Spawn the unified 1-second background tick loop. 
/// /// Handles only genuinely time-based work: /// - **Timer tick** (every second): fires due pipeline timers by comparing the /// current wall-clock time against each timer's `due_at` field. Cannot be /// reactive because timers encode absolute timestamps that only become /// actionable when the clock reaches them. /// - **Scheduled timer tick** (every second): fires generic scheduled timers /// registered via the `schedule_timer` MCP tool. /// - **Agent watchdog** (every 30 seconds): detects orphaned `Running` agents /// by comparing elapsed time since the last heartbeat. Cannot be reactive /// because the absence of an event (no heartbeat) is what signals a problem; /// a `TransitionFired` subscriber would never fire for a silently crashed agent. /// /// Stage-change-reactive work (done→archived archival, worktree cleanup) has /// been moved to `TransitionFired` subscribers spawned from `spawn_event_bridges`. pub(crate) fn spawn_tick_loop( agents: Arc, timer_store: Arc, scheduled_timer_store: Arc, root: Option, ctx: AppContext, ) { let pending_count = timer_store.list().len(); let scheduled_count = scheduled_timer_store.list().len(); crate::slog!( "[tick] Unified tick loop started; {pending_count} rate-limit timer(s), \ {scheduled_count} scheduled timer(s)" ); let (reconcile_interval, done_retention) = root .as_ref() .and_then(|r| config::ProjectConfig::load(r).ok()) .map(|c| { ( c.watcher.reconcile_interval_secs, std::time::Duration::from_secs(c.watcher.done_retention_secs), ) }) .unwrap_or((30, std::time::Duration::from_secs(4 * 3600))); tokio::spawn(async move { let mut interval = tokio::time::interval(std::time::Duration::from_secs(1)); let mut tick_count: u64 = 0; loop { interval.tick().await; tick_count = tick_count.wrapping_add(1); // Time-based: timers encode absolute due timestamps; only a // wall-clock comparison can determine when one is due. 
if let Some(ref r) = root { let result = service::timer::tick_once(&timer_store, &agents, r).await; if let Err(msg) = result { crate::slog_error!("[tick] Timer tick panicked: {msg}"); } } // Generic scheduled timer tick. tick_scheduled_timers(&scheduled_timer_store, &ctx).await; // Time-based: the watchdog detects silence (no heartbeat within a // timeout window). A `TransitionFired` subscriber cannot observe the // absence of events, so this must remain on a periodic tick. if tick_count.is_multiple_of(30) { let found = agents.run_watchdog_pass(root.as_deref()); if found > 0 { crate::slog!( "[tick] {found} orphaned agent(s) detected; triggering auto-assign." ); if let Some(ref r) = root { agents.auto_assign_available_work(r).await; } } agents.reap_stale_merge_jobs(); } // Periodic reconciler: converge subscriber side effects so that // Lagged broadcast events never leave state permanently diverged. if tick_count.is_multiple_of(reconcile_interval) && let Some(ref r) = root { crate::slog!("[reconcile] Running periodic reconcile pass."); run_reconcile_pass(r, &agents, done_retention).await; } } }); } /// Fire any due generic scheduled timers and re-arm recurring ones. /// /// Called every second from the unified tick loop. Catch-up semantics: timers /// whose `fire_at` is already in the past fire on the next tick after boot. async fn tick_scheduled_timers(store: &Arc, ctx: &AppContext) { let due = store.take_due(chrono::Utc::now()); if due.is_empty() { return; } crate::slog!("[scheduled-timer] {} timer(s) due", due.len()); for timer in due { fire_scheduled_timer(&timer, ctx).await; // Re-arm recurring timers. 
if let TimerMode::Recurring { interval_secs } = &timer.mode { let next_fire = timer.fire_at + chrono::Duration::seconds(*interval_secs as i64); let rearmed = ScheduledTimer { id: timer.id.clone(), label: timer.label.clone(), fire_at: next_fire, action: timer.action.clone(), mode: timer.mode.clone(), created_at: timer.created_at, }; if let Err(e) = store.add(rearmed) { crate::slog_error!("[scheduled-timer] Failed to re-arm timer {}: {e}", timer.id); } else { crate::slog!( "[scheduled-timer] Recurring timer {} re-armed for {}", timer.id, next_fire ); } } } } /// Execute a single scheduled timer's action. async fn fire_scheduled_timer(timer: &ScheduledTimer, ctx: &AppContext) { let label = timer.label.as_deref().unwrap_or(&timer.id); match &timer.action { TimerAction::Mcp { method, args } => { crate::slog!( "[scheduled-timer] Firing MCP action '{}' for timer {}", method, timer.id ); match dispatch_tool_call(method, args.clone(), ctx).await { Ok(result) => { crate::slog!( "[scheduled-timer] Timer {} ('{}') fired OK: {}", timer.id, label, result.chars().take(200).collect::() ); } Err(e) => { crate::slog_error!( "[scheduled-timer] Timer {} ('{}') MCP call '{}' failed: {e}", timer.id, label, method ); } } } TimerAction::Prompt { text } => { crate::slog!( "[scheduled-timer] Prompt timer {} ('{}') fired: {}", timer.id, label, text ); } } } /// Spawn the gateway relay task if `gateway_url` is configured in /// `project.toml` or the `HUSKIES_GATEWAY_URL` environment variable. 
pub(crate) fn spawn_gateway_relay(startup_root: &Option<PathBuf>, status: Arc<StatusBroadcaster>) {
    // Load project.toml once so the gateway URL and the project name are read
    // from the same parsed snapshot instead of parsing the file twice.
    let project_config = startup_root
        .as_ref()
        .and_then(|r| config::ProjectConfig::load(r).ok());

    let relay_gateway_url = project_config
        .as_ref()
        .and_then(|c| c.gateway_url.clone())
        .or_else(|| std::env::var("HUSKIES_GATEWAY_URL").ok())
        .unwrap_or_default();
    // An empty URL (unset everywhere, or explicitly empty) disables the relay.
    if relay_gateway_url.is_empty() {
        return;
    }

    let relay_project_name = project_config
        .and_then(|c| c.gateway_project)
        .or_else(|| std::env::var("HUSKIES_GATEWAY_PROJECT").ok())
        .or_else(|| {
            startup_root
                .as_ref()
                .and_then(|r| r.file_name())
                .map(|n| n.to_string_lossy().into_owned())
        })
        .unwrap_or_else(|| "project".to_string());

    gateway_relay::spawn_relay_task(
        relay_gateway_url,
        relay_project_name,
        status,
        reqwest::Client::new(),
    );
}

/// Spawn the event-trigger subscriber.
///
/// Subscribes to [`crate::pipeline_state::subscribe_transitions`] and on each
/// [`crate::pipeline_state::TransitionFired`] checks every registered trigger's
/// predicate. Matching triggers have their action executed:
///
/// - `Mcp`: dispatches the named MCP tool via the full `dispatch_tool_call` path.
/// - `Prompt`: creates an ephemeral story and starts an agent on it.
///
/// `Once` triggers are removed from the store after they fire; `Persistent`
/// triggers remain until explicitly cancelled via `cancel_event_trigger`.
pub(crate) fn spawn_event_trigger_subscriber( store: Arc, agents: Arc, project_root: Option, ctx: AppContext, ) { let mut rx = crate::pipeline_state::subscribe_transitions(); tokio::spawn(async move { loop { let fired = match rx.recv().await { Ok(f) => f, Err(broadcast::error::RecvError::Lagged(n)) => { crate::slog!( "[event-triggers] Lagged {n} transition events; some triggers may have been skipped" ); continue; } Err(broadcast::error::RecvError::Closed) => { crate::slog!("[event-triggers] Transition channel closed; subscriber stopping"); break; } }; let triggers = store.list(); if triggers.is_empty() { continue; } for trigger in &triggers { if !trigger.predicate.matches(&fired) { continue; } crate::slog!( "[event-triggers] Trigger {} matched: story={} {}→{}", trigger.id, fired.story_id.0, crate::pipeline_state::stage_label(&fired.before), crate::pipeline_state::stage_label(&fired.after), ); // Cancel once-mode triggers before dispatching the action so // that a server restart triggered by the action (e.g. // rebuild_and_restart) cannot find and replay the trigger. if trigger.mode == FireMode::Once { store.cancel(&trigger.id); } match &trigger.action { TriggerAction::Mcp { method, args } => { execute_mcp_action(method, args.clone(), &ctx).await; } TriggerAction::Prompt { text } => { execute_prompt_action( text, &fired.story_id.0, &agents, project_root.as_deref(), ) .await; } } } } }); } /// Execute an Mcp action by dispatching through the full MCP tool dispatch path. async fn execute_mcp_action(method: &str, args: serde_json::Value, ctx: &AppContext) { match crate::http::mcp::dispatch::dispatch_tool_call(method, args, ctx).await { Ok(result) => { crate::slog!("[event-triggers] Mcp '{method}' succeeded: {result}"); } Err(e) => { crate::slog!("[event-triggers] Mcp '{method}' failed: {e}"); } } } /// Execute a Prompt action: create an ephemeral story and start an agent on it. 
async fn execute_prompt_action( text: &str, triggering_story_id: &str, agents: &Arc, project_root: Option<&std::path::Path>, ) { let Some(root) = project_root else { crate::slog!("[event-triggers] Prompt action skipped (no project root configured): {text}"); return; }; // Allocate a new story ID for the ephemeral agent task. let num = crate::db::ops::next_item_number(); let story_id = format!("{num}_trigger_task"); let content = format!( "---\nname: Trigger Task\n---\n\ # Trigger Task\n\n\ _Auto-created by event trigger (source story: {triggering_story_id})_\n\n\ ## Task\n\n\ {text}\n\n\ ## Acceptance Criteria\n\n\ - [ ] Complete the task described above and exit.\n" ); crate::db::write_item_with_content( &story_id, "1_backlog", &content, crate::db::ItemMeta { name: Some("Trigger Task".to_string()), ..Default::default() }, ); if let Err(e) = crate::agents::lifecycle::move_story_to_current(&story_id) { crate::slog!("[event-triggers] Failed to move {story_id} to current: {e}"); return; } match agents.start_agent(root, &story_id, None, None, None).await { Ok(info) => { crate::slog!( "[event-triggers] Started agent {} for prompt task {story_id}", info.agent_name ); } Err(e) => { crate::slog!("[event-triggers] Failed to start agent for prompt task {story_id}: {e}"); } } } /// Run one full reconcile pass: call each subscriber's idempotent `reconcile()` /// entry point so that side effects converge regardless of whether the /// broadcast channel lagged during startup or at runtime. /// /// Safe to call any number of times — every reconcile function is idempotent. pub(crate) async fn run_reconcile_pass( root: &std::path::Path, agents: &Arc, done_retention: std::time::Duration, ) { // Content-GC: purge content-store entries for terminal/tombstoned stories. crate::db::gc::sweep_zombie_content_on_startup(); // Worktree create: ensure every Coding story has a worktree. 
crate::agents::pool::worktree_lifecycle::reconcile_worktree_create(root, agents.port()).await; // Worktree cleanup: remove worktrees for terminal stories. crate::agents::pool::worktree_lifecycle::reconcile_worktree_cleanup(root).await; // Done-archive: archive Done stories whose retention period has elapsed. crate::io::watcher::sweep_done_to_archived(done_retention); // Cost-rollup: re-populate the in-memory register from disk. crate::agents::pool::cost_rollup_subscriber::reconcile_cost_rollup(root); // Merge-failure: spawn mergemaster for ConflictDetected stories with no active agent. crate::agents::pool::auto_assign::reconcile_merge_failure(agents, root).await; // Merge-block: no-op (in-memory counter cannot be reconstructed from CRDT). crate::agents::pool::auto_assign::reconcile_merge_failure_block(); // Audit-log: no-op (historical replay would produce misleading entries). crate::pipeline_state::reconcile_audit_log(); } /// Spawn the startup reconciliation task: run a full reconcile pass so that all /// side-effect subscribers converge on the current CRDT state without flooding /// the broadcast channel, then trigger a full auto-assign pass. /// /// Replaces the former `replay_current_pipeline_state()` approach, which /// sent one synthetic `TransitionFired` per CRDT item through the broadcast /// channel. With >256 items that caused `Subscriber lagged` warnings and /// left subscribers with diverged state. Direct reconcile calls bypass the /// channel entirely and scale to any CRDT size. 
pub(crate) fn spawn_startup_reconciliation( startup_root: Option, startup_agents: Arc, startup_reconciliation_tx: broadcast::Sender, ) { if let Some(root) = startup_root { tokio::spawn(async move { let done_retention = crate::config::ProjectConfig::load(&root) .map(|c| std::time::Duration::from_secs(c.watcher.done_retention_secs)) .unwrap_or_else(|_| std::time::Duration::from_secs(4 * 3600)); crate::slog!("[startup] Running per-subscriber reconcile pass."); run_reconcile_pass(&root, &startup_agents, done_retention).await; crate::slog!("[auto-assign] Scanning pipeline stages for unassigned work."); startup_agents.auto_assign_available_work(&root).await; let _ = startup_reconciliation_tx.send(ReconciliationEvent { story_id: String::new(), status: "done".to_string(), message: "Startup reconcile pass complete.".to_string(), }); }); } } #[cfg(test)] mod tests { use super::*; use crate::db::{ ContentKey, ItemMeta, ensure_content_store, write_content, write_item_with_content, }; use crate::io::watcher::WatcherEvent; use tokio::sync::broadcast; fn make_pool() -> Arc { let (tx, _) = broadcast::channel::(16); Arc::new(AgentPool::new(3099, tx)) } fn setup_huskies_dir(tmp: &tempfile::TempDir) -> std::path::PathBuf { let root = tmp.path().to_path_buf(); std::fs::create_dir_all(root.join(".huskies")).unwrap(); std::fs::write(root.join(".huskies/project.toml"), "").unwrap(); root } /// AC4 + AC6: seeding >256 CRDT items and running the reconcile pass must not /// produce any "Subscriber lagged" warnings (structural guarantee — the new /// path never broadcasts through the channel) and must purge zombie content /// for all terminal stories after one reconcile tick. /// /// Distribution: 300 Backlog + 200 Coding + 200 Abandoned (terminal) + 300 QA /// = 1000 items. Each of the 200 Abandoned stories gets a content-store entry /// seeded before the reconcile so we can assert it is cleaned up. 
#[tokio::test] async fn reconcile_pass_scales_to_1000_items_without_lagged_divergence() { crate::crdt_state::init_for_test(); ensure_content_store(); let tmp = tempfile::tempdir().unwrap(); let root = setup_huskies_dir(&tmp); let pool = make_pool(); // ── Seed 1000 items across several stages ────────────────────────── for i in 0..300u32 { let id = format!("1066_backlog_{i:04}"); write_item_with_content( &id, "1_backlog", "---\nname: Backlog\n---\n", ItemMeta::named("Backlog"), ); } for i in 0..200u32 { let id = format!("1066_coding_{i:04}"); write_item_with_content( &id, "2_current", "---\nname: Coding\n---\n", ItemMeta::named("Coding"), ); } for i in 0..200u32 { let id = format!("1066_abandoned_{i:04}"); write_item_with_content( &id, "2_current", "---\nname: Abandoned\n---\n", ItemMeta::named("Abandoned"), ); // Move to terminal stage (Abandoned). crate::agents::lifecycle::abandon_story(&id).expect("abandon must succeed"); // Seed a content-store entry to verify GC cleans it up. write_content(ContentKey::Story(&id), "zombie content"); } for i in 0..300u32 { let id = format!("1066_qa_{i:04}"); write_item_with_content(&id, "3_qa", "---\nname: QA\n---\n", ItemMeta::named("QA")); } // ── Subscribe BEFORE the reconcile to catch any Lagged events ────── let mut transition_rx = crate::pipeline_state::subscribe_transitions(); // ── Run one reconcile pass ───────────────────────────────────────── // Use zero retention so any Done items (none here, but defensive) archive immediately. run_reconcile_pass(&root, &pool, std::time::Duration::ZERO).await; // ── Drain the transition channel; must contain zero Lagged events ── // The reconcile path never broadcasts through TRANSITION_TX, so any // events here are from the abandon_story calls above (all pre-reconcile). 
let mut lagged_count = 0u64; loop { match transition_rx.try_recv() { Ok(_) => {} Err(tokio::sync::broadcast::error::TryRecvError::Lagged(n)) => { lagged_count += n; } Err(tokio::sync::broadcast::error::TryRecvError::Empty) | Err(tokio::sync::broadcast::error::TryRecvError::Closed) => break, } } // The reconcile pass itself must not have sent anything through the channel. // (abandon_story above may have sent some events, but those are pre-reconcile // lifecycle transitions, not the reconcile itself.) assert_eq!( lagged_count, 0, "run_reconcile_pass must not broadcast through the transition channel (no Lagged)" ); // ── Assert: zombie content purged for all 200 Abandoned stories ──── for i in 0..200u32 { let id = format!("1066_abandoned_{i:04}"); assert!( crate::db::read_content(ContentKey::Story(&id)).is_none(), "zombie content must be purged for abandoned story {id}" ); } } /// AC4 regression: the subscriber channel (capacity 256) must not lag when /// 1000 items are seeded — the reconcile path bypasses the channel entirely. #[tokio::test] async fn reconcile_never_floods_broadcast_channel() { crate::crdt_state::init_for_test(); ensure_content_store(); let tmp = tempfile::tempdir().unwrap(); let root = setup_huskies_dir(&tmp); let pool = make_pool(); // Seed 1000 Backlog items (no lifecycle transitions — clean slate). for i in 0..1000u32 { let id = format!("1066_flood_{i:04}"); write_item_with_content( &id, "1_backlog", "---\nname: Flood\n---\n", ItemMeta::named("Flood"), ); } // Subscribe and drain pre-existing channel noise. Note: `TRANSITION_TX` // is a single process-global broadcast channel shared by every test in // this binary, so other tests running on parallel threads may write to // it during our window. We can't assert `msg_count == 0` — that's // racy by construction. 
The real "never floods" invariant is captured // by the Lagged check: 1000 seeded items must not overflow the // 256-slot channel, which is only possible if the reconcile path // bypasses the broadcast (which is what AC4 requires). let mut rx = crate::pipeline_state::subscribe_transitions(); while let Ok(_) | Err(tokio::sync::broadcast::error::TryRecvError::Lagged(_)) = rx.try_recv() {} run_reconcile_pass(&root, &pool, std::time::Duration::ZERO).await; let mut lagged = false; loop { match rx.try_recv() { Ok(_) => {} Err(tokio::sync::broadcast::error::TryRecvError::Lagged(_)) => { lagged = true; break; } Err(_) => break, } } assert!( !lagged, "run_reconcile_pass must never cause Lagged on the broadcast channel" ); } }