// huskies/server/src/startup/tick_loop.rs
//! Background tasks: CRDT-event bridge, auto-assign subscriber, unified tick
//! loop, gateway relay, and startup reconciliation.
use crate::agents::{AgentPool, ReconciliationEvent};
use crate::config;
use crate::gateway_relay;
use crate::io;
use crate::service;
use crate::service::status::StatusBroadcaster;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::broadcast;
/// Bridge CRDT state-transition events to the watcher broadcast channel and
/// spawn the auto-assign subscriber that triggers on active-stage transitions.
pub(crate) fn spawn_event_bridges(
watcher_tx: broadcast::Sender<io::watcher::WatcherEvent>,
project_root: Option<PathBuf>,
agents: Arc<AgentPool>,
) {
// CRDT → watcher bridge: translate CRDT stage-transition events into
// WatcherEvent::WorkItem so downstream consumers (WebSocket, auto-assign)
// see a uniform stream regardless of whether the event originated from the
// filesystem watcher or from a CRDT sync peer.
{
let crdt_watcher_tx = watcher_tx.clone();
let crdt_prune_root = project_root.clone();
if let Some(mut crdt_rx) = crate::crdt_state::subscribe() {
tokio::spawn(async move {
while let Ok(evt) = crdt_rx.recv().await {
2026-05-13 13:17:46 +00:00
if matches!(evt.to_stage, crate::pipeline_state::Stage::Archived { .. })
2026-04-28 19:12:55 +00:00
&& let Some(root) = crdt_prune_root.as_ref().cloned()
{
let story_id = evt.story_id.clone();
2026-04-29 21:14:27 +00:00
tokio::spawn(async move {
let config =
crate::config::ProjectConfig::load(&root).unwrap_or_default();
crate::worktree::remove_worktree_by_story_id(&root, &story_id, &config)
.await
.ok();
2026-04-28 19:12:55 +00:00
});
}
let (action, commit_msg) =
2026-05-13 13:17:46 +00:00
io::watcher::stage_metadata(&evt.to_stage, &evt.story_id);
2026-04-28 19:12:55 +00:00
let watcher_evt = io::watcher::WatcherEvent::WorkItem {
2026-05-13 13:17:46 +00:00
stage: evt.to_stage.dir_name().to_string(),
2026-04-28 19:12:55 +00:00
item_id: evt.story_id,
action: action.to_string(),
commit_msg,
2026-05-13 13:17:46 +00:00
from_stage: evt.from_stage.map(|s| s.dir_name().to_string()),
2026-04-28 19:12:55 +00:00
};
let _ = crdt_watcher_tx.send(watcher_evt);
}
});
}
}
2026-05-13 11:47:27 +00:00
// Auto-assign: trigger `auto_assign_available_work` on every work-item
// CRDT state-transition event. auto_assign_available_work is idempotent
// and noops where there is nothing to do, so firing on every transition
// ensures that MergeFailure and other non-"active" stages are covered
// without any per-stage special-casing.
2026-04-28 19:12:55 +00:00
if let Some(root) = project_root {
let watcher_auto_rx = watcher_tx.subscribe();
let watcher_auto_agents = Arc::clone(&agents);
tokio::spawn(async move {
let mut rx = watcher_auto_rx;
while let Ok(event) = rx.recv().await {
2026-05-13 11:47:27 +00:00
if let io::watcher::WatcherEvent::WorkItem { ref stage, .. } = event {
2026-04-28 19:12:55 +00:00
crate::slog!(
"[auto-assign] CRDT transition detected in {stage}/; \
triggering auto-assign."
);
watcher_auto_agents.auto_assign_available_work(&root).await;
}
}
});
}
}
/// Spawn the unified 1-second background tick loop.
///
/// Fires due timers every tick, runs the agent watchdog (and stale merge-job
/// reaper) every 30 ticks, promotes done→archived items every
/// `sweep_interval_secs` ticks, and removes orphaned worktrees every 1200
/// ticks (20 minutes — currently hard-coded below, not read from config).
///
/// * `agents` — pool polled by the watchdog and auto-assign.
/// * `timer_store` — pending timers fired via `service::timer::tick_once`.
/// * `root` — project root; `None` disables the timer tick and worktree sweep.
pub(crate) fn spawn_tick_loop(
    agents: Arc<AgentPool>,
    timer_store: Arc<service::timer::TimerStore>,
    root: Option<PathBuf>,
) {
    // Project config is read once at spawn time; later edits to project.toml
    // take effect only after a server restart.
    let project_cfg = root
        .as_ref()
        .and_then(|r| config::ProjectConfig::load(r).ok());
    let sweep_cfg = project_cfg
        .as_ref()
        .map(|c| c.watcher.clone())
        .unwrap_or_default();
    // Clamp to >= 1: with 0, `is_multiple_of(0)` would never match a nonzero
    // tick_count and the done→archived sweep would silently never run.
    let sweep_every = sweep_cfg.sweep_interval_secs.max(1);
    let done_retention = std::time::Duration::from_secs(sweep_cfg.done_retention_secs);
    // Capture config for the worktree sweep (read once at startup).
    let worktree_sweep_config = project_cfg.unwrap_or_default();
    // Worktree orphan sweep: every 20 minutes (1200 one-second ticks).
    let worktree_sweep_every: u64 = 1200;
    let pending_count = timer_store.list().len();
    crate::slog!("[tick] Unified tick loop started; {pending_count} pending timer(s)");
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(std::time::Duration::from_secs(1));
        let mut tick_count: u64 = 0;
        loop {
            interval.tick().await;
            // wrapping_add: the loop runs for the server's lifetime; u64
            // overflow is unreachable in practice but must not panic.
            tick_count = tick_count.wrapping_add(1);
            // Timer: fire due timers every second.
            if let Some(ref r) = root {
                let result = service::timer::tick_once(&timer_store, &agents, r).await;
                if let Err(msg) = result {
                    crate::slog_error!("[tick] Timer tick panicked: {msg}");
                }
            }
            // Watchdog: detect orphaned Running agents every 30 ticks.
            // Also reap stale Running merge_jobs from previous server instances.
            if tick_count.is_multiple_of(30) {
                let found = agents.run_watchdog_pass(root.as_deref());
                if found > 0 {
                    crate::slog!(
                        "[tick] {found} orphaned agent(s) detected; triggering auto-assign."
                    );
                    // Orphaned agents may have left assignable work behind.
                    if let Some(ref r) = root {
                        agents.auto_assign_available_work(r).await;
                    }
                }
                agents.reap_stale_merge_jobs();
            }
            // Sweep: promote done→archived every sweep_interval_secs ticks.
            if tick_count.is_multiple_of(sweep_every) {
                io::watcher::sweep_done_to_archived(done_retention);
            }
            // Worktree orphan sweep: remove worktrees for done/archived/absent stories.
            if tick_count.is_multiple_of(worktree_sweep_every)
                && let Some(ref r) = root
            {
                let removed =
                    crate::worktree::sweep_orphaned_worktrees(r, &worktree_sweep_config).await;
                if removed > 0 {
                    crate::slog!("[worktree-sweep] Removed {removed} orphaned worktree(s).");
                }
            }
        }
    });
}
/// Spawn the gateway relay task if `gateway_url` is configured in
/// `project.toml` or the `HUSKIES_GATEWAY_URL` environment variable.
pub(crate) fn spawn_gateway_relay(startup_root: &Option<PathBuf>, status: Arc<StatusBroadcaster>) {
let relay_gateway_url = startup_root
.as_ref()
.and_then(|r| config::ProjectConfig::load(r).ok())
.and_then(|c| c.gateway_url)
.or_else(|| std::env::var("HUSKIES_GATEWAY_URL").ok())
.unwrap_or_default();
if !relay_gateway_url.is_empty() {
let relay_project_name = startup_root
.as_ref()
.and_then(|r| config::ProjectConfig::load(r).ok())
.and_then(|c| c.gateway_project)
.or_else(|| std::env::var("HUSKIES_GATEWAY_PROJECT").ok())
.or_else(|| {
startup_root
.as_ref()
.and_then(|r| r.file_name())
.map(|n| n.to_string_lossy().into_owned())
})
.unwrap_or_else(|| "project".to_string());
gateway_relay::spawn_relay_task(
relay_gateway_url,
relay_project_name,
status,
reqwest::Client::new(),
);
}
}
/// Spawn the startup reconciliation task: reconcile any stories whose agent
/// work was committed while the server was offline, then auto-assign free agents.
///
/// * `startup_root` — project root; `None` means there is nothing to
///   reconcile, so no task is spawned.
/// * `startup_agents` — pool whose reconciliation/auto-assign entry points run.
/// * `startup_reconciliation_tx` — channel reconciliation events are published on.
pub(crate) fn spawn_startup_reconciliation(
    startup_root: Option<PathBuf>,
    startup_agents: Arc<AgentPool>,
    startup_reconciliation_tx: broadcast::Sender<ReconciliationEvent>,
) {
    // Guard clause: without a project root there is no pipeline to scan.
    let Some(root) = startup_root else { return };
    tokio::spawn(async move {
        crate::slog!("[startup] Reconciling completed worktrees from previous session.");
        startup_agents
            .reconcile_on_startup(&root, &startup_reconciliation_tx)
            .await;
        crate::slog!("[auto-assign] Scanning pipeline stages for unassigned work.");
        startup_agents.auto_assign_available_work(&root).await;
    });
}