2026-04-28 19:12:55 +00:00
|
|
|
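//! Background tasks: CRDT-event bridge, pipeline-transition subscribers
//! (audit log, auto-assign, worktree lifecycle, merge-failure handling,
//! content GC, cost rollup, done→archived), unified tick loop, gateway relay,
//! and startup reconciliation.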
|
|
|
|
|
|
|
|
|
|
use crate::agents::{AgentPool, ReconciliationEvent};
|
|
|
|
|
use crate::config;
|
|
|
|
|
use crate::gateway_relay;
|
|
|
|
|
use crate::io;
|
|
|
|
|
use crate::service;
|
|
|
|
|
use crate::service::status::StatusBroadcaster;
|
|
|
|
|
use std::path::PathBuf;
|
|
|
|
|
use std::sync::Arc;
|
|
|
|
|
use tokio::sync::broadcast;
|
|
|
|
|
|
|
|
|
|
/// Bridge CRDT state-transition events to the watcher broadcast channel and
|
|
|
|
|
/// spawn the auto-assign subscriber that triggers on active-stage transitions.
|
|
|
|
|
pub(crate) fn spawn_event_bridges(
|
|
|
|
|
watcher_tx: broadcast::Sender<io::watcher::WatcherEvent>,
|
|
|
|
|
project_root: Option<PathBuf>,
|
|
|
|
|
agents: Arc<AgentPool>,
|
|
|
|
|
) {
|
2026-05-13 23:18:59 +00:00
|
|
|
// Audit log subscriber: write one structured line per pipeline transition.
|
|
|
|
|
crate::pipeline_state::spawn_audit_log_subscriber();
|
|
|
|
|
|
2026-04-28 19:12:55 +00:00
|
|
|
// CRDT → watcher bridge: translate CRDT stage-transition events into
|
|
|
|
|
// WatcherEvent::WorkItem so downstream consumers (WebSocket, auto-assign)
|
|
|
|
|
// see a uniform stream regardless of whether the event originated from the
|
|
|
|
|
// filesystem watcher or from a CRDT sync peer.
|
|
|
|
|
{
|
|
|
|
|
let crdt_watcher_tx = watcher_tx.clone();
|
|
|
|
|
if let Some(mut crdt_rx) = crate::crdt_state::subscribe() {
|
|
|
|
|
tokio::spawn(async move {
|
|
|
|
|
while let Ok(evt) = crdt_rx.recv().await {
|
|
|
|
|
let (action, commit_msg) =
|
2026-05-13 13:17:46 +00:00
|
|
|
io::watcher::stage_metadata(&evt.to_stage, &evt.story_id);
|
2026-04-28 19:12:55 +00:00
|
|
|
let watcher_evt = io::watcher::WatcherEvent::WorkItem {
|
2026-05-13 13:17:46 +00:00
|
|
|
stage: evt.to_stage.dir_name().to_string(),
|
2026-04-28 19:12:55 +00:00
|
|
|
item_id: evt.story_id,
|
|
|
|
|
action: action.to_string(),
|
|
|
|
|
commit_msg,
|
2026-05-13 13:17:46 +00:00
|
|
|
from_stage: evt.from_stage.map(|s| s.dir_name().to_string()),
|
2026-04-28 19:12:55 +00:00
|
|
|
};
|
|
|
|
|
let _ = crdt_watcher_tx.send(watcher_evt);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2026-05-13 11:47:27 +00:00
|
|
|
// Auto-assign: trigger `auto_assign_available_work` on every work-item
|
|
|
|
|
// CRDT state-transition event. auto_assign_available_work is idempotent
|
|
|
|
|
// and noops where there is nothing to do, so firing on every transition
|
|
|
|
|
// ensures that MergeFailure and other non-"active" stages are covered
|
|
|
|
|
// without any per-stage special-casing.
|
2026-04-28 19:12:55 +00:00
|
|
|
if let Some(root) = project_root {
|
2026-05-13 21:37:07 +00:00
|
|
|
// Worktree lifecycle subscribers: create worktrees on Stage::Coding
|
|
|
|
|
// and remove them on terminal stages (Done, Archived, Abandoned, Superseded).
|
|
|
|
|
crate::agents::pool::worktree_lifecycle::spawn_worktree_create_subscriber(
|
|
|
|
|
root.clone(),
|
|
|
|
|
agents.port(),
|
|
|
|
|
);
|
|
|
|
|
crate::agents::pool::worktree_lifecycle::spawn_worktree_cleanup_subscriber(root.clone());
|
|
|
|
|
|
2026-05-13 19:29:08 +00:00
|
|
|
// Mergemaster auto-spawn subscriber: reacts to TransitionFired events for
|
|
|
|
|
// Stage::MergeFailure { kind: ConflictDetected } and spawns mergemaster
|
|
|
|
|
// directly from the typed event, eliminating the predicate-mismatch
|
|
|
|
|
// failure mode of the previous scan-loop approach (story 998).
|
|
|
|
|
crate::agents::pool::auto_assign::spawn_merge_failure_subscriber(
|
|
|
|
|
Arc::clone(&agents),
|
|
|
|
|
root.clone(),
|
|
|
|
|
);
|
|
|
|
|
|
2026-05-14 09:33:50 +00:00
|
|
|
// Consecutive-failure auto-block subscriber: blocks stories after N
|
2026-05-14 12:13:37 +01:00
|
|
|
// consecutive MergeFailure transitions (story 1018). Bug 1025: takes
|
|
|
|
|
// the agent pool so it can gate the counter on mergemaster presence —
|
|
|
|
|
// failures during active recovery iteration do not count toward block.
|
|
|
|
|
crate::agents::pool::auto_assign::spawn_merge_failure_block_subscriber(
|
|
|
|
|
Arc::clone(&agents),
|
|
|
|
|
root.clone(),
|
|
|
|
|
);
|
2026-05-14 09:33:50 +00:00
|
|
|
|
2026-05-13 22:23:46 +00:00
|
|
|
// Content-store GC subscriber: purges all ContentKey::* entries for a
|
|
|
|
|
// story when it reaches a terminal stage, preventing zombie entries from
|
|
|
|
|
// accumulating in the process heap (story 996).
|
|
|
|
|
crate::db::gc::spawn_content_gc_subscriber();
|
|
|
|
|
|
2026-05-13 23:51:12 +00:00
|
|
|
// Cost-rollup bootstrap: pre-populate the register from existing JSONL
|
|
|
|
|
// so status renderers show correct costs after a server restart.
|
|
|
|
|
crate::service::agents::cost_rollup::init_from_disk(&root);
|
|
|
|
|
|
|
|
|
|
// Cost-rollup subscriber: snapshots per-story token costs into the
|
|
|
|
|
// in-memory register whenever a story reaches a terminal stage.
|
|
|
|
|
crate::agents::pool::cost_rollup_subscriber::spawn_cost_rollup_subscriber(root.clone());
|
|
|
|
|
|
2026-05-14 08:48:11 +00:00
|
|
|
// Done→archived subscriber: archives Done stories after the configured
|
|
|
|
|
// retention period. Fires on each Stage::Done TransitionFired event and
|
|
|
|
|
// sleeps for the remaining retention time from merged_at before archiving.
|
|
|
|
|
// Replaces the periodic sweep_done_to_archived scan that ran on every
|
|
|
|
|
// sweep_interval_secs tick.
|
|
|
|
|
{
|
|
|
|
|
let done_retention = config::ProjectConfig::load(&root)
|
|
|
|
|
.map(|c| std::time::Duration::from_secs(c.watcher.done_retention_secs))
|
|
|
|
|
.unwrap_or_else(|_| std::time::Duration::from_secs(4 * 3600));
|
|
|
|
|
io::watcher::spawn_done_to_archived_subscriber(done_retention);
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-28 19:12:55 +00:00
|
|
|
let watcher_auto_rx = watcher_tx.subscribe();
|
|
|
|
|
let watcher_auto_agents = Arc::clone(&agents);
|
|
|
|
|
tokio::spawn(async move {
|
|
|
|
|
let mut rx = watcher_auto_rx;
|
|
|
|
|
while let Ok(event) = rx.recv().await {
|
2026-05-13 11:47:27 +00:00
|
|
|
if let io::watcher::WatcherEvent::WorkItem { ref stage, .. } = event {
|
2026-04-28 19:12:55 +00:00
|
|
|
crate::slog!(
|
|
|
|
|
"[auto-assign] CRDT transition detected in {stage}/; \
|
|
|
|
|
triggering auto-assign."
|
|
|
|
|
);
|
|
|
|
|
watcher_auto_agents.auto_assign_available_work(&root).await;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Spawn the unified 1-second background tick loop.
|
|
|
|
|
///
|
2026-05-14 08:48:11 +00:00
|
|
|
/// Handles only genuinely time-based work:
|
|
|
|
|
/// - **Timer tick** (every second): fires due pipeline timers by comparing the
|
|
|
|
|
/// current wall-clock time against each timer's `due_at` field. Cannot be
|
|
|
|
|
/// reactive because timers encode absolute timestamps that only become
|
|
|
|
|
/// actionable when the clock reaches them.
|
|
|
|
|
/// - **Agent watchdog** (every 30 seconds): detects orphaned `Running` agents
|
|
|
|
|
/// by comparing elapsed time since the last heartbeat. Cannot be reactive
|
|
|
|
|
/// because the absence of an event (no heartbeat) is what signals a problem;
|
|
|
|
|
/// a `TransitionFired` subscriber would never fire for a silently crashed agent.
|
|
|
|
|
///
|
|
|
|
|
/// Stage-change-reactive work (done→archived archival, worktree cleanup) has
|
|
|
|
|
/// been moved to `TransitionFired` subscribers spawned from `spawn_event_bridges`.
|
2026-04-28 19:12:55 +00:00
|
|
|
pub(crate) fn spawn_tick_loop(
|
|
|
|
|
agents: Arc<AgentPool>,
|
|
|
|
|
timer_store: Arc<service::timer::TimerStore>,
|
|
|
|
|
root: Option<PathBuf>,
|
|
|
|
|
) {
|
|
|
|
|
let pending_count = timer_store.list().len();
|
|
|
|
|
crate::slog!("[tick] Unified tick loop started; {pending_count} pending timer(s)");
|
|
|
|
|
|
|
|
|
|
tokio::spawn(async move {
|
|
|
|
|
let mut interval = tokio::time::interval(std::time::Duration::from_secs(1));
|
|
|
|
|
let mut tick_count: u64 = 0;
|
|
|
|
|
loop {
|
|
|
|
|
interval.tick().await;
|
|
|
|
|
tick_count = tick_count.wrapping_add(1);
|
|
|
|
|
|
2026-05-14 08:48:11 +00:00
|
|
|
// Time-based: timers encode absolute due timestamps; only a
|
|
|
|
|
// wall-clock comparison can determine when one is due.
|
2026-04-28 19:12:55 +00:00
|
|
|
if let Some(ref r) = root {
|
|
|
|
|
let result = service::timer::tick_once(&timer_store, &agents, r).await;
|
|
|
|
|
if let Err(msg) = result {
|
|
|
|
|
crate::slog_error!("[tick] Timer tick panicked: {msg}");
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2026-05-14 08:48:11 +00:00
|
|
|
// Time-based: the watchdog detects silence (no heartbeat within a
|
|
|
|
|
// timeout window). A `TransitionFired` subscriber cannot observe the
|
|
|
|
|
// absence of events, so this must remain on a periodic tick.
|
2026-04-28 19:12:55 +00:00
|
|
|
if tick_count.is_multiple_of(30) {
|
|
|
|
|
let found = agents.run_watchdog_pass(root.as_deref());
|
|
|
|
|
if found > 0 {
|
|
|
|
|
crate::slog!(
|
|
|
|
|
"[tick] {found} orphaned agent(s) detected; triggering auto-assign."
|
|
|
|
|
);
|
|
|
|
|
if let Some(ref r) = root {
|
|
|
|
|
agents.auto_assign_available_work(r).await;
|
|
|
|
|
}
|
|
|
|
|
}
|
2026-04-29 08:51:22 +00:00
|
|
|
agents.reap_stale_merge_jobs();
|
2026-04-28 19:12:55 +00:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Spawn the gateway relay task if `gateway_url` is configured in
|
|
|
|
|
/// `project.toml` or the `HUSKIES_GATEWAY_URL` environment variable.
|
|
|
|
|
pub(crate) fn spawn_gateway_relay(startup_root: &Option<PathBuf>, status: Arc<StatusBroadcaster>) {
|
|
|
|
|
let relay_gateway_url = startup_root
|
|
|
|
|
.as_ref()
|
|
|
|
|
.and_then(|r| config::ProjectConfig::load(r).ok())
|
|
|
|
|
.and_then(|c| c.gateway_url)
|
|
|
|
|
.or_else(|| std::env::var("HUSKIES_GATEWAY_URL").ok())
|
|
|
|
|
.unwrap_or_default();
|
|
|
|
|
|
|
|
|
|
if !relay_gateway_url.is_empty() {
|
|
|
|
|
let relay_project_name = startup_root
|
|
|
|
|
.as_ref()
|
|
|
|
|
.and_then(|r| config::ProjectConfig::load(r).ok())
|
|
|
|
|
.and_then(|c| c.gateway_project)
|
|
|
|
|
.or_else(|| std::env::var("HUSKIES_GATEWAY_PROJECT").ok())
|
|
|
|
|
.or_else(|| {
|
|
|
|
|
startup_root
|
|
|
|
|
.as_ref()
|
|
|
|
|
.and_then(|r| r.file_name())
|
|
|
|
|
.map(|n| n.to_string_lossy().into_owned())
|
|
|
|
|
})
|
|
|
|
|
.unwrap_or_else(|| "project".to_string());
|
|
|
|
|
|
|
|
|
|
gateway_relay::spawn_relay_task(
|
|
|
|
|
relay_gateway_url,
|
|
|
|
|
relay_project_name,
|
|
|
|
|
status,
|
|
|
|
|
reqwest::Client::new(),
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2026-05-13 23:46:30 +00:00
|
|
|
/// Spawn the startup reconstruction task: replay the current pipeline state
|
|
|
|
|
/// through the [`TransitionFired`][crate::pipeline_state::TransitionFired]
|
|
|
|
|
/// broadcast channel so that all existing subscribers (worktree lifecycle,
|
|
|
|
|
/// merge-failure auto-spawn, auto-assign) react identically to a live
|
|
|
|
|
/// transition, then trigger a full auto-assign pass.
|
|
|
|
|
///
|
|
|
|
|
/// Replaces the legacy scan-based `reconcile_on_startup` approach. The CRDT
|
|
|
|
|
/// is the durable source of truth; replaying it as synthetic self-transitions
|
|
|
|
|
/// is cheaper, simpler, and idempotent: a second replay produces another burst
|
|
|
|
|
/// of events that subscribers safely ignore for already-assigned stories.
|
2026-04-28 19:12:55 +00:00
|
|
|
pub(crate) fn spawn_startup_reconciliation(
|
|
|
|
|
startup_root: Option<PathBuf>,
|
|
|
|
|
startup_agents: Arc<AgentPool>,
|
|
|
|
|
startup_reconciliation_tx: broadcast::Sender<ReconciliationEvent>,
|
|
|
|
|
) {
|
|
|
|
|
if let Some(root) = startup_root {
|
|
|
|
|
tokio::spawn(async move {
|
2026-05-13 22:23:46 +00:00
|
|
|
// Purge content-store entries for stories that reached terminal
|
|
|
|
|
// stages in a previous session (before the GC subscriber was active).
|
|
|
|
|
crate::db::gc::sweep_zombie_content_on_startup();
|
2026-05-13 23:46:30 +00:00
|
|
|
crate::slog!(
|
|
|
|
|
"[startup] Replaying current pipeline state through TransitionFired channel."
|
|
|
|
|
);
|
|
|
|
|
crate::pipeline_state::replay_current_pipeline_state();
|
2026-04-28 19:12:55 +00:00
|
|
|
crate::slog!("[auto-assign] Scanning pipeline stages for unassigned work.");
|
|
|
|
|
startup_agents.auto_assign_available_work(&root).await;
|
2026-05-13 23:46:30 +00:00
|
|
|
let _ = startup_reconciliation_tx.send(ReconciliationEvent {
|
|
|
|
|
story_id: String::new(),
|
|
|
|
|
status: "done".to_string(),
|
|
|
|
|
message: "Startup event replay complete.".to_string(),
|
|
|
|
|
});
|
2026-04-28 19:12:55 +00:00
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
}
|