From 91be0ac47f2499854454399b9877c9b591b6d95b Mon Sep 17 00:00:00 2001 From: dave Date: Fri, 10 Apr 2026 17:34:41 +0000 Subject: [PATCH] huskies: merge 534_refactor_unify_timer_tick_watchdog_and_watcher_sweep_into_a_single_1_second_tick_loop --- .../src/agents/pool/auto_assign/watchdog.rs | 30 ++------ server/src/chat/timer.rs | 39 ++--------- server/src/chat/transport/matrix/bot/run.rs | 5 -- server/src/io/watcher.rs | 69 ++++++------------ server/src/main.rs | 70 ++++++++++++++++--- 5 files changed, 88 insertions(+), 125 deletions(-) diff --git a/server/src/agents/pool/auto_assign/watchdog.rs b/server/src/agents/pool/auto_assign/watchdog.rs index c374c6ba..2fa40c95 100644 --- a/server/src/agents/pool/auto_assign/watchdog.rs +++ b/server/src/agents/pool/auto_assign/watchdog.rs @@ -1,8 +1,7 @@ //! Watchdog task: detects orphaned agents and triggers auto-assign. use std::collections::HashMap; -use std::path::PathBuf; -use std::sync::{Arc, Mutex}; +use std::sync::Mutex; use tokio::sync::broadcast; use crate::slog; @@ -73,30 +72,11 @@ impl AgentPool { check_orphaned_agents(&self.agents); } - /// Spawn a background watchdog task that periodically checks for Running agents - /// whose underlying task has already finished (orphaned entries). Any such agent - /// is marked Failed and an Error event is emitted so that `wait_for_agent` unblocks. + /// Run one watchdog pass and return the number of orphaned agents detected. /// - /// The watchdog runs every 30 seconds. It is a safety net for edge cases where the - /// PTY read loop exits without updating the agent status (e.g. a panic in the - /// spawn_blocking task, or an external SIGKILL that closes the PTY fd immediately). - /// - /// When orphaned agents are detected and a `project_root` is provided, auto-assign - /// is triggered so that free agents can pick up unassigned work. - pub fn spawn_watchdog(pool: Arc, project_root: Option) { - tokio::spawn(async move { - let mut interval = tokio::time::interval(std::time::Duration::from_secs(30)); - loop { - interval.tick().await; - let found = check_orphaned_agents(&pool.agents); - if found > 0 - && let Some(ref root) = project_root - { - slog!("[watchdog] {found} orphaned agent(s) detected; triggering auto-assign."); - pool.auto_assign_available_work(root).await; - } - } - }); + /// Called by the unified background tick loop every 30 ticks. + pub fn run_watchdog_pass(&self) -> usize { + check_orphaned_agents(&self.agents) } } diff --git a/server/src/chat/timer.rs b/server/src/chat/timer.rs index 8267d2a7..dbca8d28 100644 --- a/server/src/chat/timer.rs +++ b/server/src/chat/timer.rs @@ -1,8 +1,8 @@ //! Deferred agent start via one-shot timers. //! -//! Provides [`TimerStore`] for persisting timers to `.huskies/timers.json`, -//! a 30-second tick loop ([`spawn_timer_tick_loop`]) that fires due timers, +//! Provides [`TimerStore`] for persisting timers to `.huskies/timers.json` //! and command parsing / handling for the `timer` bot command. +//! Due timers are fired by the unified background tick loop in `main`. use chrono::{DateTime, Duration, Local, NaiveTime, TimeZone, Utc}; use chrono_tz::Tz; @@ -134,43 +134,12 @@ impl TimerStore { // ── Tick loop ────────────────────────────────────────────────────────────── -/// Spawn a background tokio task that fires due timers every 1 second. -/// -/// Same pattern as the watchdog in `agents::pool::auto_assign`. -/// When a timer fires, `start_agent` is called for the story. If all coders -/// are busy the story remains in `2_current/` and auto-assign will pick it up. -/// -/// The loop body is wrapped in `catch_unwind` so a panic on any single tick -/// does not silently kill the background task. -pub fn spawn_timer_tick_loop( - store: Arc, - agents: Arc, - project_root: PathBuf, -) { - let pending_count = store.list().len(); - crate::slog!( - "[timer] Tick loop started; {pending_count} pending timer(s) loaded" - ); - - tokio::spawn(async move { - let mut interval = tokio::time::interval(std::time::Duration::from_secs(1)); - loop { - interval.tick().await; - - // Wrap the tick body so a panic doesn't kill the loop. - let tick_result = tick_once(&store, &agents, &project_root).await; - if let Err(msg) = tick_result { - crate::slog_error!("[timer] Tick panicked: {msg}"); - } - } - }); -} - /// Execute one tick of the timer loop. /// +/// Called by the unified background tick loop every second. /// Separated from the loop so we can catch panics at the call-site. /// Returns `Err` only when the tick panicked (the panic message is returned). -async fn tick_once( +pub(crate) async fn tick_once( store: &Arc, agents: &Arc, project_root: &Path, diff --git a/server/src/chat/transport/matrix/bot/run.rs b/server/src/chat/transport/matrix/bot/run.rs index a64fe41b..54af521d 100644 --- a/server/src/chat/transport/matrix/bot/run.rs +++ b/server/src/chat/transport/matrix/bot/run.rs @@ -220,11 +220,6 @@ pub async fn run_bot( let timer_store = Arc::new(crate::chat::timer::TimerStore::load( project_root.join(".huskies").join("timers.json"), )); - crate::chat::timer::spawn_timer_tick_loop( - Arc::clone(&timer_store), - Arc::clone(&agents), - project_root.clone(), - ); // Auto-schedule timers when an agent hits a hard rate limit. crate::chat::timer::spawn_rate_limit_auto_scheduler( Arc::clone(&timer_store), diff --git a/server/src/io/watcher.rs b/server/src/io/watcher.rs index 2d833c35..57f49ccd 100644 --- a/server/src/io/watcher.rs +++ b/server/src/io/watcher.rs @@ -19,7 +19,6 @@ //! via exit-code inspection and silently skips the commit while still broadcasting //! the event so connected clients stay in sync. -use crate::config::{ProjectConfig, WatcherConfig}; use crate::slog; use notify::{EventKind, RecommendedWatcher, RecursiveMode, Watcher, recommended_watcher}; use serde::Serialize; @@ -328,7 +327,7 @@ fn flush_pending( /// All state is read from and written to CRDT — no filesystem access. /// Worktree pruning is handled separately by the CRDT event subscriber. pub(crate) fn sweep_done_to_archived(done_retention: Duration) { - use crate::pipeline_state::{Stage, read_all_typed}; + use crate::pipeline_state::{PipelineEvent, Stage, stage_dir_name, transition, read_all_typed}; for item in read_all_typed() { if let Stage::Done { merged_at, .. } = &item.stage { @@ -337,9 +336,24 @@ pub(crate) fn sweep_done_to_archived(done_retention: Duration) { .to_std() .unwrap_or_default(); if age >= done_retention { - let story_id = &item.story_id.0; - crate::db::move_item_stage(story_id, "6_archived", None); - slog!("[watcher] sweep: promoted {story_id} → 6_archived/"); + let story_id = item.story_id.0.clone(); + match transition(item.stage.clone(), PipelineEvent::Accepted) { + Ok(new_stage) => { + crate::crdt_state::write_item( + &story_id, + stage_dir_name(&new_stage), + None, + None, + None, + Some(false), + None, + ); + slog!("[watcher] sweep: promoted {story_id} → 6_archived/"); + } + Err(e) => { + slog!("[watcher] sweep: transition error for {story_id}: {e}"); + } + } } } } @@ -360,7 +374,6 @@ pub(crate) fn sweep_done_to_archived(done_retention: Duration) { pub fn start_watcher( git_root: PathBuf, event_tx: broadcast::Sender, - watcher_config: WatcherConfig, ) { std::thread::spawn(move || { let (notify_tx, notify_rx) = mpsc::channel::>(); @@ -389,27 +402,13 @@ pub fn start_watcher( } } - slog!("[watcher] watching config files and running sweep timer"); + slog!("[watcher] watching config files for hot-reload"); const DEBOUNCE: Duration = Duration::from_millis(300); - // Mutable sweep config — hot-reloaded when project.toml changes. - let mut sweep_interval = Duration::from_secs(watcher_config.sweep_interval_secs); - let mut done_retention = Duration::from_secs(watcher_config.done_retention_secs); - slog!( - "[watcher] sweep_interval={}s done_retention={}s", - watcher_config.sweep_interval_secs, - watcher_config.done_retention_secs - ); - // Whether a config file change is pending in the current debounce window. let mut config_changed_pending = false; let mut deadline: Option = None; - // Track when we last swept 5_done/ → 6_archived/. - // Initialise to "now minus interval" so the first sweep runs on startup. - let mut last_sweep = Instant::now() - .checked_sub(sweep_interval) - .unwrap_or_else(Instant::now); loop { // How long until the debounce window closes (or wait for next event). @@ -454,37 +453,9 @@ pub fn start_watcher( slog!("[watcher] broadcasting agent_config_changed"); let _ = event_tx.send(WatcherEvent::ConfigChanged); - // Hot-reload sweep config from project.toml. - match ProjectConfig::load(&git_root) { - Ok(cfg) => { - let new_sweep = Duration::from_secs(cfg.watcher.sweep_interval_secs); - let new_retention = - Duration::from_secs(cfg.watcher.done_retention_secs); - if new_sweep != sweep_interval || new_retention != done_retention { - slog!( - "[watcher] hot-reload: sweep_interval={}s done_retention={}s", - cfg.watcher.sweep_interval_secs, - cfg.watcher.done_retention_secs - ); - sweep_interval = new_sweep; - done_retention = new_retention; - } - } - Err(e) => { - slog!("[watcher] hot-reload: failed to parse config: {e}"); - } - } - config_changed_pending = false; } deadline = None; - - // Periodically promote old items from 5_done/ to 6_archived/. - let now = Instant::now(); - if now.duration_since(last_sweep) >= sweep_interval { - last_sweep = now; - sweep_done_to_archived(done_retention); - } } } }); diff --git a/server/src/main.rs b/server/src/main.rs index bf1b9ddb..b9cf7892 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -323,19 +323,12 @@ async fn main() -> Result<(), std::io::Error> { let (watcher_tx, _) = broadcast::channel::(1024); let agents = Arc::new(AgentPool::new(port, watcher_tx.clone())); - // Start the background watchdog that detects and cleans up orphaned Running agents. - // When orphans are found, auto-assign is triggered to reassign free agents. - let watchdog_root: Option = app_state.project_root.lock().unwrap().clone(); - AgentPool::spawn_watchdog(Arc::clone(&agents), watchdog_root); - // Filesystem watcher: watches config files (project.toml, agents.toml) for - // hot-reload and runs the CRDT-based done→archived sweep. Work-item pipeline - // events are driven by CRDT state transitions via crdt_state::subscribe(). + // hot-reload. Work-item pipeline events are driven by CRDT state transitions + // via crdt_state::subscribe(). Sweep (done→archived) is handled by the unified + // background tick loop below. if let Some(ref root) = *app_state.project_root.lock().unwrap() { - let watcher_config = config::ProjectConfig::load(root) - .map(|c| c.watcher) - .unwrap_or_default(); - io::watcher::start_watcher(root.clone(), watcher_tx.clone(), watcher_config); + io::watcher::start_watcher(root.clone(), watcher_tx.clone()); } // Bridge CRDT state-transition events to the watcher broadcast channel. @@ -655,6 +648,8 @@ async fn main() -> Result<(), std::io::Error> { .unwrap_or_else(|| std::path::PathBuf::from("/tmp/huskies-timers.json")), )); + let timer_store_for_tick = Arc::clone(&timer_store); + let ctx = AppContext { state: app_state, store, @@ -672,6 +667,59 @@ async fn main() -> Result<(), std::io::Error> { let app = build_routes(ctx, whatsapp_ctx.clone(), slack_ctx.clone(), port); + // Unified 1-second background tick loop: fires due timers, detects orphaned + // agents (watchdog), and promotes done→archived items (sweep). Replaces the + // three separate background loops that previously ran independently. + { + let tick_agents = Arc::clone(&startup_agents); + let tick_timer = timer_store_for_tick; + let tick_root = startup_root.clone(); + let sweep_cfg = tick_root + .as_ref() + .and_then(|r| config::ProjectConfig::load(r).ok()) + .map(|c| c.watcher) + .unwrap_or_default(); + let sweep_every = sweep_cfg.sweep_interval_secs.max(1); + let done_retention = std::time::Duration::from_secs(sweep_cfg.done_retention_secs); + let pending_count = tick_timer.list().len(); + crate::slog!("[tick] Unified tick loop started; {pending_count} pending timer(s)"); + tokio::spawn(async move { + let mut interval = tokio::time::interval(std::time::Duration::from_secs(1)); + let mut tick_count: u64 = 0; + loop { + interval.tick().await; + tick_count = tick_count.wrapping_add(1); + + // Timer: fire due timers every second. + if let Some(ref root) = tick_root { + let result = + crate::chat::timer::tick_once(&tick_timer, &tick_agents, root).await; + if let Err(msg) = result { + crate::slog_error!("[tick] Timer tick panicked: {msg}"); + } + } + + // Watchdog: detect orphaned Running agents every 30 ticks. + if tick_count.is_multiple_of(30) { + let found = tick_agents.run_watchdog_pass(); + if found > 0 { + crate::slog!( + "[tick] {found} orphaned agent(s) detected; triggering auto-assign." + ); + if let Some(ref root) = tick_root { + tick_agents.auto_assign_available_work(root).await; + } + } + } + + // Sweep: promote done→archived every sweep_interval_secs ticks. + if tick_count.is_multiple_of(sweep_every) { + crate::io::watcher::sweep_done_to_archived(done_retention); + } + } + }); + } + // Optional Matrix bot: connect to the homeserver and start listening for // messages if `.huskies/bot.toml` is present and enabled. if let Some(ref root) = startup_root {