//! Background tasks: CRDT-event bridge, auto-assign subscriber, unified tick
//! loop, gateway relay, and startup reconciliation.

use crate::agents::{AgentPool, ReconciliationEvent};
use crate::config;
use crate::gateway_relay;
use crate::http::context::AppContext;
use crate::http::mcp::dispatch::dispatch_tool_call;
use crate::io;
use crate::service;
use crate::service::event_triggers::store::EventTriggerStore;
use crate::service::event_triggers::{FireMode, TriggerAction};
use crate::service::status::StatusBroadcaster;
use crate::service::timer::scheduled::{
    ScheduledTimer, ScheduledTimerStore, TimerAction, TimerMode,
};
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::broadcast;

/// Bridge CRDT state-transition events to the watcher broadcast channel and
/// spawn the auto-assign subscriber that triggers on active-stage transitions.
pub(crate) fn spawn_event_bridges(
    watcher_tx: broadcast::Sender<io::watcher::WatcherEvent>,
    project_root: Option<PathBuf>,
    agents: Arc<AgentPool>,
) {
    // Audit log subscriber: write one structured line per pipeline transition.
    crate::pipeline_state::spawn_audit_log_subscriber();

    // CRDT → watcher bridge: translate CRDT stage-transition events into
    // WatcherEvent::WorkItem so downstream consumers (WebSocket, auto-assign)
    // see a uniform stream regardless of whether the event originated from the
    // filesystem watcher or from a CRDT sync peer.
    {
        let crdt_watcher_tx = watcher_tx.clone();
        if let Some(mut crdt_rx) = crate::crdt_state::subscribe() {
            tokio::spawn(async move {
                while let Ok(evt) = crdt_rx.recv().await {
                    let (action, commit_msg) =
                        io::watcher::stage_metadata(&evt.to_stage, &evt.story_id);
                    let watcher_evt = io::watcher::WatcherEvent::WorkItem {
                        stage: evt.to_stage.dir_name().to_string(),
                        item_id: evt.story_id,
                        action: action.to_string(),
                        commit_msg,
                        from_stage: evt.from_stage.map(|s| s.dir_name().to_string()),
                    };
                    let _ = crdt_watcher_tx.send(watcher_evt);
                }
            });
        }
    }
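
    // Illustrative translation performed by the bridge above (the story id and
    // stage names are hypothetical; real directory names come from
    // Stage::dir_name() and the action/commit message from stage_metadata()):
    //
    //   CRDT event:   { story_id: "42_fix_login", from_stage: Some(Backlog), to_stage: Coding }
    //   forwarded as: WatcherEvent::WorkItem {
    //                     stage: <Coding.dir_name()>,
    //                     item_id: "42_fix_login",
    //                     action / commit_msg: from io::watcher::stage_metadata(),
    //                     from_stage: Some(<Backlog.dir_name()>),
    //                 }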

    // Auto-assign: trigger `auto_assign_available_work` on every work-item
    // CRDT state-transition event. auto_assign_available_work is idempotent
    // and no-ops when there is nothing to do, so firing on every transition
    // ensures that MergeFailure and other non-"active" stages are covered
    // without any per-stage special-casing.
    if let Some(root) = project_root {
        // Worktree lifecycle subscribers: create worktrees on Stage::Coding
        // and remove them on terminal stages (Done, Archived, Abandoned, Superseded).
        crate::agents::pool::worktree_lifecycle::spawn_worktree_create_subscriber(
            root.clone(),
            agents.port(),
        );
        crate::agents::pool::worktree_lifecycle::spawn_worktree_cleanup_subscriber(root.clone());

        // Mergemaster auto-spawn subscriber: reacts to TransitionFired events for
        // Stage::MergeFailure { kind: ConflictDetected } and spawns mergemaster
        // directly from the typed event, eliminating the predicate-mismatch
        // failure mode of the previous scan-loop approach (story 998).
        crate::agents::pool::auto_assign::spawn_merge_failure_subscriber(
            Arc::clone(&agents),
            root.clone(),
        );

        // Consecutive-failure auto-block subscriber: blocks stories after N
        // consecutive MergeFailure transitions (story 1018). Bug 1025: takes
        // the agent pool so it can gate the counter on mergemaster presence;
        // failures during an active recovery iteration do not count toward the block.
        crate::agents::pool::auto_assign::spawn_merge_failure_block_subscriber(
            Arc::clone(&agents),
            root.clone(),
        );

        // Content-store GC subscriber: purges all ContentKey::* entries for a
        // story when it reaches a terminal stage, preventing zombie entries from
        // accumulating in the process heap (story 996).
        crate::db::gc::spawn_content_gc_subscriber();

        // Cost-rollup bootstrap: pre-populate the register from existing JSONL
        // so status renderers show correct costs after a server restart.
        crate::service::agents::cost_rollup::init_from_disk(&root);

        // Cost-rollup subscriber: snapshots per-story token costs into the
        // in-memory register whenever a story reaches a terminal stage.
        crate::agents::pool::cost_rollup_subscriber::spawn_cost_rollup_subscriber(root.clone());

        // Done→archived subscriber: archives Done stories after the configured
        // retention period. Fires on each Stage::Done TransitionFired event and
        // sleeps for the remaining retention time from merged_at before archiving.
        // Replaces the periodic sweep_done_to_archived scan that ran on every
        // sweep_interval_secs tick.
        {
            let done_retention = config::ProjectConfig::load(&root)
                .map(|c| std::time::Duration::from_secs(c.watcher.done_retention_secs))
                .unwrap_or_else(|_| std::time::Duration::from_secs(4 * 3600));
            io::watcher::spawn_done_to_archived_subscriber(done_retention);
        }
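
        // Minimal sketch of the corresponding `project.toml` entry. Only the
        // `done_retention_secs` field name is taken from the code above; the
        // `[watcher]` table name and the value are assumptions:
        //
        //   [watcher]
        //   done_retention_secs = 14400   # 4 hours, same as the fallback default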

        let watcher_auto_rx = watcher_tx.subscribe();
        let watcher_auto_agents = Arc::clone(&agents);
        tokio::spawn(async move {
            let mut rx = watcher_auto_rx;
            while let Ok(event) = rx.recv().await {
                if let io::watcher::WatcherEvent::WorkItem { ref stage, .. } = event {
                    crate::slog!(
                        "[auto-assign] CRDT transition detected in {stage}/; \
                         triggering auto-assign."
                    );
                    watcher_auto_agents.auto_assign_available_work(&root).await;
                }
            }
        });
    }
}

/// Spawn the unified 1-second background tick loop.
///
/// Handles only genuinely time-based work:
/// - **Timer tick** (every second): fires due pipeline timers by comparing the
///   current wall-clock time against each timer's `due_at` field. Cannot be
///   reactive because timers encode absolute timestamps that only become
///   actionable when the clock reaches them.
/// - **Scheduled timer tick** (every second): fires generic scheduled timers
///   registered via the `schedule_timer` MCP tool.
/// - **Agent watchdog** (every 30 seconds): detects orphaned `Running` agents
///   by comparing elapsed time since the last heartbeat. Cannot be reactive
///   because the absence of an event (no heartbeat) is what signals a problem;
///   a `TransitionFired` subscriber would never fire for a silently crashed agent.
///
/// Stage-change-reactive work (done→archived archival, worktree cleanup) has
/// been moved to `TransitionFired` subscribers spawned from `spawn_event_bridges`.
pub(crate) fn spawn_tick_loop(
    agents: Arc<AgentPool>,
    timer_store: Arc<service::timer::TimerStore>,
    scheduled_timer_store: Arc<ScheduledTimerStore>,
    root: Option<PathBuf>,
    ctx: AppContext,
) {
    let pending_count = timer_store.list().len();
    let scheduled_count = scheduled_timer_store.list().len();
    crate::slog!(
        "[tick] Unified tick loop started; {pending_count} rate-limit timer(s), \
         {scheduled_count} scheduled timer(s)"
    );

    tokio::spawn(async move {
        let mut interval = tokio::time::interval(std::time::Duration::from_secs(1));
        let mut tick_count: u64 = 0;
        loop {
            interval.tick().await;
            tick_count = tick_count.wrapping_add(1);

            // Time-based: timers encode absolute due timestamps; only a
            // wall-clock comparison can determine when one is due.
            if let Some(ref r) = root {
                let result = service::timer::tick_once(&timer_store, &agents, r).await;
                if let Err(msg) = result {
                    crate::slog_error!("[tick] Timer tick panicked: {msg}");
                }
            }

            // Generic scheduled timer tick.
            tick_scheduled_timers(&scheduled_timer_store, &ctx).await;

            // Time-based: the watchdog detects silence (no heartbeat within a
            // timeout window). A `TransitionFired` subscriber cannot observe the
            // absence of events, so this must remain on a periodic tick.
            if tick_count.is_multiple_of(30) {
                let found = agents.run_watchdog_pass(root.as_deref());
                if found > 0 {
                    crate::slog!(
                        "[tick] {found} orphaned agent(s) detected; triggering auto-assign."
                    );
                    if let Some(ref r) = root {
                        agents.auto_assign_available_work(r).await;
                    }
                }
                agents.reap_stale_merge_jobs();
            }
        }
    });
}

/// Fire any due generic scheduled timers and re-arm recurring ones.
///
/// Called every second from the unified tick loop. Catch-up semantics: timers
/// whose `fire_at` is already in the past fire on the next tick after boot.
async fn tick_scheduled_timers(store: &Arc<ScheduledTimerStore>, ctx: &AppContext) {
    let due = store.take_due(chrono::Utc::now());
    if due.is_empty() {
        return;
    }
    crate::slog!("[scheduled-timer] {} timer(s) due", due.len());

    for timer in due {
        fire_scheduled_timer(&timer, ctx).await;

        // Re-arm recurring timers.
        if let TimerMode::Recurring { interval_secs } = &timer.mode {
            let next_fire = timer.fire_at + chrono::Duration::seconds(*interval_secs as i64);
            let rearmed = ScheduledTimer {
                id: timer.id.clone(),
                label: timer.label.clone(),
                fire_at: next_fire,
                action: timer.action.clone(),
                mode: timer.mode.clone(),
                created_at: timer.created_at,
            };
            if let Err(e) = store.add(rearmed) {
                crate::slog_error!("[scheduled-timer] Failed to re-arm timer {}: {e}", timer.id);
            } else {
                crate::slog!(
                    "[scheduled-timer] Recurring timer {} re-armed for {}",
                    timer.id,
                    next_fire
                );
            }
        }
    }
}
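
// Worked example of the re-arm arithmetic in `tick_scheduled_timers` (the
// timestamps are hypothetical): a recurring timer with `interval_secs = 300`
// and `fire_at = 12:00:00Z` fires on the first tick at or after 12:00:00Z and
// is re-added with `fire_at = 12:05:00Z`. Because the next `fire_at` is
// computed from the stored timestamp rather than from "now", a timer that
// fired late (e.g. after a restart) catches up instead of drifting.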

/// Execute a single scheduled timer's action.
async fn fire_scheduled_timer(timer: &ScheduledTimer, ctx: &AppContext) {
    let label = timer.label.as_deref().unwrap_or(&timer.id);
    match &timer.action {
        TimerAction::Mcp { method, args } => {
            crate::slog!(
                "[scheduled-timer] Firing MCP action '{}' for timer {}",
                method,
                timer.id
            );
            match dispatch_tool_call(method, args.clone(), ctx).await {
                Ok(result) => {
                    crate::slog!(
                        "[scheduled-timer] Timer {} ('{}') fired OK: {}",
                        timer.id,
                        label,
                        result.chars().take(200).collect::<String>()
                    );
                }
                Err(e) => {
                    crate::slog_error!(
                        "[scheduled-timer] Timer {} ('{}') MCP call '{}' failed: {e}",
                        timer.id,
                        label,
                        method
                    );
                }
            }
        }
        TimerAction::Prompt { text } => {
            crate::slog!(
                "[scheduled-timer] Prompt timer {} ('{}') fired: {}",
                timer.id,
                label,
                text
            );
        }
    }
}

/// Spawn the gateway relay task if `gateway_url` is configured in
/// `project.toml` or the `HUSKIES_GATEWAY_URL` environment variable.
pub(crate) fn spawn_gateway_relay(startup_root: &Option<PathBuf>, status: Arc<StatusBroadcaster>) {
    let relay_gateway_url = startup_root
        .as_ref()
        .and_then(|r| config::ProjectConfig::load(r).ok())
        .and_then(|c| c.gateway_url)
        .or_else(|| std::env::var("HUSKIES_GATEWAY_URL").ok())
        .unwrap_or_default();
    if !relay_gateway_url.is_empty() {
        let relay_project_name = startup_root
            .as_ref()
            .and_then(|r| config::ProjectConfig::load(r).ok())
            .and_then(|c| c.gateway_project)
            .or_else(|| std::env::var("HUSKIES_GATEWAY_PROJECT").ok())
            .or_else(|| {
                startup_root
                    .as_ref()
                    .and_then(|r| r.file_name())
                    .map(|n| n.to_string_lossy().into_owned())
            })
            .unwrap_or_else(|| "project".to_string());
        gateway_relay::spawn_relay_task(
            relay_gateway_url,
            relay_project_name,
            status,
            reqwest::Client::new(),
        );
    }
}

/// Spawn the event-trigger subscriber.
///
/// Subscribes to [`crate::pipeline_state::subscribe_transitions`] and on each
/// [`crate::pipeline_state::TransitionFired`] checks every registered trigger's
/// predicate. Matching triggers have their action executed:
///
/// - `Mcp`: dispatches the named MCP tool via the full `dispatch_tool_call` path.
/// - `Prompt`: creates an ephemeral story and starts an agent on it.
///
/// `Once` triggers are removed from the store after they fire; `Persistent`
/// triggers remain until explicitly cancelled via `cancel_event_trigger`.
pub(crate) fn spawn_event_trigger_subscriber(
    store: Arc<EventTriggerStore>,
    agents: Arc<AgentPool>,
    project_root: Option<PathBuf>,
    ctx: AppContext,
) {
    let mut rx = crate::pipeline_state::subscribe_transitions();
    tokio::spawn(async move {
        loop {
            let fired = match rx.recv().await {
                Ok(f) => f,
                Err(broadcast::error::RecvError::Lagged(n)) => {
                    crate::slog!(
                        "[event-triggers] Lagged {n} transition events; some triggers may have been skipped"
                    );
                    continue;
                }
                Err(broadcast::error::RecvError::Closed) => {
                    crate::slog!("[event-triggers] Transition channel closed; subscriber stopping");
                    break;
                }
            };

            let triggers = store.list();
            if triggers.is_empty() {
                continue;
            }

            let mut to_cancel: Vec<String> = Vec::new();
            for trigger in &triggers {
                if !trigger.predicate.matches(&fired) {
                    continue;
                }
                crate::slog!(
                    "[event-triggers] Trigger {} matched: story={} {}→{}",
                    trigger.id,
                    fired.story_id.0,
                    crate::pipeline_state::stage_label(&fired.before),
                    crate::pipeline_state::stage_label(&fired.after),
                );
                match &trigger.action {
                    TriggerAction::Mcp { method, args } => {
                        execute_mcp_action(method, args.clone(), &ctx).await;
                    }
                    TriggerAction::Prompt { text } => {
                        execute_prompt_action(
                            text,
                            &fired.story_id.0,
                            &agents,
                            project_root.as_deref(),
                        )
                        .await;
                    }
                }
                if trigger.mode == FireMode::Once {
                    to_cancel.push(trigger.id.clone());
                }
            }
            if !to_cancel.is_empty() {
                store.cancel_batch(&to_cancel);
            }
        }
    });
}
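
// Illustrative lifecycle of a trigger handled by the subscriber above (the
// predicate shape and the matched stage are hypothetical; only the `FireMode`
// and `TriggerAction` behaviour comes from the code): a `FireMode::Once`
// trigger whose predicate matches transitions into the Done stage fires for
// the first matching `TransitionFired`, its `TriggerAction` is executed, and
// its id is collected into `to_cancel` so `cancel_batch` removes it. A
// `FireMode::Persistent` trigger with the same predicate keeps firing on every
// later matching transition until cancelled via `cancel_event_trigger`.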

/// Execute an Mcp action by dispatching through the full MCP tool dispatch path.
async fn execute_mcp_action(method: &str, args: serde_json::Value, ctx: &AppContext) {
    match crate::http::mcp::dispatch::dispatch_tool_call(method, args, ctx).await {
        Ok(result) => {
            crate::slog!("[event-triggers] Mcp '{method}' succeeded: {result}");
        }
        Err(e) => {
            crate::slog!("[event-triggers] Mcp '{method}' failed: {e}");
        }
    }
}

/// Execute a Prompt action: create an ephemeral story and start an agent on it.
async fn execute_prompt_action(
    text: &str,
    triggering_story_id: &str,
    agents: &Arc<AgentPool>,
    project_root: Option<&std::path::Path>,
) {
    let Some(root) = project_root else {
        crate::slog!("[event-triggers] Prompt action skipped (no project root configured): {text}");
        return;
    };

    // Allocate a new story ID for the ephemeral agent task.
    let num = crate::db::ops::next_item_number();
    let story_id = format!("{num}_trigger_task");
    let content = format!(
        "---\nname: Trigger Task\n---\n\
         # Trigger Task\n\n\
         _Auto-created by event trigger (source story: {triggering_story_id})_\n\n\
         ## Task\n\n\
         {text}\n\n\
         ## Acceptance Criteria\n\n\
         - [ ] Complete the task described above and exit.\n"
    );
    crate::db::write_item_with_content(
        &story_id,
        "1_backlog",
        &content,
        crate::db::ItemMeta {
            name: Some("Trigger Task".to_string()),
            ..Default::default()
        },
    );
    if let Err(e) = crate::agents::lifecycle::move_story_to_current(&story_id) {
        crate::slog!("[event-triggers] Failed to move {story_id} to current: {e}");
        return;
    }
    match agents.start_agent(root, &story_id, None, None, None).await {
        Ok(info) => {
            crate::slog!(
                "[event-triggers] Started agent {} for prompt task {story_id}",
                info.agent_name
            );
        }
        Err(e) => {
            crate::slog!("[event-triggers] Failed to start agent for prompt task {story_id}: {e}");
        }
    }
}

/// Spawn the startup reconciliation task: replay the current pipeline state
/// through the [`TransitionFired`][crate::pipeline_state::TransitionFired]
/// broadcast channel so that all existing subscribers (worktree lifecycle,
/// merge-failure auto-spawn, auto-assign) react identically to a live
/// transition, then trigger a full auto-assign pass.
///
/// Replaces the legacy scan-based `reconcile_on_startup` approach. The CRDT
/// is the durable source of truth; replaying it as synthetic self-transitions
/// is cheaper, simpler, and idempotent: a second replay produces another burst
/// of events that subscribers safely ignore for already-assigned stories.
pub(crate) fn spawn_startup_reconciliation(
    startup_root: Option<PathBuf>,
    startup_agents: Arc<AgentPool>,
    startup_reconciliation_tx: broadcast::Sender<ReconciliationEvent>,
) {
    if let Some(root) = startup_root {
        tokio::spawn(async move {
            // Purge content-store entries for stories that reached terminal
            // stages in a previous session (before the GC subscriber was active).
            crate::db::gc::sweep_zombie_content_on_startup();

            crate::slog!(
                "[startup] Replaying current pipeline state through TransitionFired channel."
            );
            crate::pipeline_state::replay_current_pipeline_state();

            crate::slog!("[auto-assign] Scanning pipeline stages for unassigned work.");
            startup_agents.auto_assign_available_work(&root).await;

            let _ = startup_reconciliation_tx.send(ReconciliationEvent {
                story_id: String::new(),
                status: "done".to_string(),
                message: "Startup event replay complete.".to_string(),
            });
        });
    }
}
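
// Hypothetical wiring of the spawners in this module at server startup. The
// real call site lives outside this module; the channel capacity and the way
// the stores, pool, and context are constructed are assumptions made only for
// illustration:
//
//     let (watcher_tx, _) = broadcast::channel(256);
//     spawn_event_bridges(watcher_tx.clone(), Some(root.clone()), Arc::clone(&agents));
//     spawn_tick_loop(
//         Arc::clone(&agents),
//         timer_store,
//         scheduled_timer_store,
//         Some(root.clone()),
//         ctx.clone(),
//     );
//     spawn_gateway_relay(&Some(root.clone()), status);
//     spawn_event_trigger_subscriber(trigger_store, Arc::clone(&agents), Some(root.clone()), ctx);
//     spawn_startup_reconciliation(Some(root), agents, reconciliation_tx);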