//! Background tasks: CRDT-event bridge, auto-assign subscriber, unified tick //! loop, gateway relay, and startup reconciliation. use crate::agents::{AgentPool, ReconciliationEvent}; use crate::config; use crate::gateway_relay; use crate::io; use crate::pipeline_state; use crate::service; use crate::service::status::StatusBroadcaster; use std::path::PathBuf; use std::sync::Arc; use tokio::sync::broadcast; /// Bridge CRDT state-transition events to the watcher broadcast channel and /// spawn the auto-assign subscriber that triggers on active-stage transitions. pub(crate) fn spawn_event_bridges( watcher_tx: broadcast::Sender, project_root: Option, agents: Arc, ) { // CRDT → watcher bridge: translate CRDT stage-transition events into // WatcherEvent::WorkItem so downstream consumers (WebSocket, auto-assign) // see a uniform stream regardless of whether the event originated from the // filesystem watcher or from a CRDT sync peer. { let crdt_watcher_tx = watcher_tx.clone(); let crdt_prune_root = project_root.clone(); if let Some(mut crdt_rx) = crate::crdt_state::subscribe() { tokio::spawn(async move { while let Ok(evt) = crdt_rx.recv().await { if crate::pipeline_state::Stage::from_dir(&evt.to_stage) .is_some_and(|s| matches!(s, crate::pipeline_state::Stage::Archived { .. })) && let Some(root) = crdt_prune_root.as_ref().cloned() { let story_id = evt.story_id.clone(); tokio::task::spawn_blocking(move || { if let Err(e) = crate::worktree::prune_worktree_sync(&root, &story_id) { crate::slog!("[crdt] worktree prune failed for {story_id}: {e}"); } }); } let (action, commit_msg) = io::watcher::stage_metadata(&evt.to_stage, &evt.story_id) .unwrap_or(("update", format!("huskies: update {}", evt.story_id))); let watcher_evt = io::watcher::WatcherEvent::WorkItem { stage: evt.to_stage, item_id: evt.story_id, action: action.to_string(), commit_msg, from_stage: evt.from_stage, }; let _ = crdt_watcher_tx.send(watcher_evt); } }); } } // Auto-assign: trigger `auto_assign_available_work` whenever a work item // enters an active pipeline stage (2_current/, 3_qa/, 4_merge/). if let Some(root) = project_root { let watcher_auto_rx = watcher_tx.subscribe(); let watcher_auto_agents = Arc::clone(&agents); tokio::spawn(async move { let mut rx = watcher_auto_rx; while let Ok(event) = rx.recv().await { if let io::watcher::WatcherEvent::WorkItem { ref stage, .. } = event && pipeline_state::Stage::from_dir(stage.as_str()) .is_some_and(|s| s.is_active()) { crate::slog!( "[auto-assign] CRDT transition detected in {stage}/; \ triggering auto-assign." ); watcher_auto_agents.auto_assign_available_work(&root).await; } } }); } } /// Spawn the unified 1-second background tick loop. /// /// Fires due timers, runs the agent watchdog every 30 ticks, and promotes /// done→archived items every `sweep_interval_secs` ticks. pub(crate) fn spawn_tick_loop( agents: Arc, timer_store: Arc, root: Option, ) { let sweep_cfg = root .as_ref() .and_then(|r| config::ProjectConfig::load(r).ok()) .map(|c| c.watcher) .unwrap_or_default(); let sweep_every = sweep_cfg.sweep_interval_secs.max(1); let done_retention = std::time::Duration::from_secs(sweep_cfg.done_retention_secs); let pending_count = timer_store.list().len(); crate::slog!("[tick] Unified tick loop started; {pending_count} pending timer(s)"); tokio::spawn(async move { let mut interval = tokio::time::interval(std::time::Duration::from_secs(1)); let mut tick_count: u64 = 0; loop { interval.tick().await; tick_count = tick_count.wrapping_add(1); // Timer: fire due timers every second. if let Some(ref r) = root { let result = service::timer::tick_once(&timer_store, &agents, r).await; if let Err(msg) = result { crate::slog_error!("[tick] Timer tick panicked: {msg}"); } } // Watchdog: detect orphaned Running agents every 30 ticks. if tick_count.is_multiple_of(30) { let found = agents.run_watchdog_pass(root.as_deref()); if found > 0 { crate::slog!( "[tick] {found} orphaned agent(s) detected; triggering auto-assign." ); if let Some(ref r) = root { agents.auto_assign_available_work(r).await; } } } // Sweep: promote done→archived every sweep_interval_secs ticks. if tick_count.is_multiple_of(sweep_every) { io::watcher::sweep_done_to_archived(done_retention); } } }); } /// Spawn the gateway relay task if `gateway_url` is configured in /// `project.toml` or the `HUSKIES_GATEWAY_URL` environment variable. pub(crate) fn spawn_gateway_relay(startup_root: &Option, status: Arc) { let relay_gateway_url = startup_root .as_ref() .and_then(|r| config::ProjectConfig::load(r).ok()) .and_then(|c| c.gateway_url) .or_else(|| std::env::var("HUSKIES_GATEWAY_URL").ok()) .unwrap_or_default(); if !relay_gateway_url.is_empty() { let relay_project_name = startup_root .as_ref() .and_then(|r| config::ProjectConfig::load(r).ok()) .and_then(|c| c.gateway_project) .or_else(|| std::env::var("HUSKIES_GATEWAY_PROJECT").ok()) .or_else(|| { startup_root .as_ref() .and_then(|r| r.file_name()) .map(|n| n.to_string_lossy().into_owned()) }) .unwrap_or_else(|| "project".to_string()); gateway_relay::spawn_relay_task( relay_gateway_url, relay_project_name, status, reqwest::Client::new(), ); } } /// Spawn the startup reconciliation task: reconcile any stories whose agent /// work was committed while the server was offline, then auto-assign free agents. pub(crate) fn spawn_startup_reconciliation( startup_root: Option, startup_agents: Arc, startup_reconciliation_tx: broadcast::Sender, ) { if let Some(root) = startup_root { tokio::spawn(async move { crate::slog!("[startup] Reconciling completed worktrees from previous session."); startup_agents .reconcile_on_startup(&root, &startup_reconciliation_tx) .await; crate::slog!("[auto-assign] Scanning pipeline stages for unassigned work."); startup_agents.auto_assign_available_work(&root).await; }); } }