//! I/O side of the timer service: filesystem persistence, clock reads,
//! background task spawning, and story-ID resolution.
//!
//! This is the **only** place inside `service/timer/` that may perform side
//! effects (filesystem reads/writes, clock reads, `tokio::spawn`).

use chrono::{DateTime, Utc};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};

use super::persist::{TimerEntry, deserialize_timers, serialize_timers};
use super::schedule::next_occurrence_at;

// ── TimerStore ─────────────────────────────────────────────────────────────

/// Persistent store for pending timers, backed by a JSON file.
pub struct TimerStore {
    path: PathBuf,
    timers: Mutex<Vec<TimerEntry>>,
}

impl TimerStore {
    /// Load the timer store from `path`. Returns an empty store if the file
    /// does not exist or cannot be parsed.
    pub fn load(path: PathBuf) -> Self {
        let timers = if path.exists() {
            std::fs::read_to_string(&path)
                .ok()
                .and_then(|s| deserialize_timers(&s))
                .unwrap_or_default()
        } else {
            Vec::new()
        };
        Self {
            path,
            timers: Mutex::new(timers),
        }
    }

    fn save_locked(path: &Path, timers: &[TimerEntry]) -> Result<(), String> {
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent)
                .map_err(|e| format!("Failed to create directory: {e}"))?;
        }
        let content = serialize_timers(timers)?;
        std::fs::write(path, content).map_err(|e| format!("Failed to write timers: {e}"))
    }

    /// Add a timer and persist to disk.
    pub fn add(&self, story_id: String, scheduled_at: DateTime<Utc>) -> Result<(), String> {
        let mut timers = self.timers.lock().unwrap();
        timers.push(TimerEntry {
            story_id,
            scheduled_at,
        });
        Self::save_locked(&self.path, &timers)
    }

    /// Remove the timer for `story_id`. Returns `true` if one was removed.
    pub fn remove(&self, story_id: &str) -> bool {
        let mut timers = self.timers.lock().unwrap();
        let before = timers.len();
        timers.retain(|t| t.story_id != story_id);
        let removed = timers.len() < before;
        if removed {
            let _ = Self::save_locked(&self.path, &timers);
        }
        removed
    }

    /// Return all pending timers (cloned).
    pub fn list(&self) -> Vec<TimerEntry> {
        self.timers.lock().unwrap().clone()
    }

    /// Add or update a timer for `story_id`.
    ///
    /// - If no timer exists for `story_id`, adds it.
    /// - If a timer already exists and `scheduled_at` is **later**, updates it.
    /// - If a timer already exists and `scheduled_at` is earlier or equal, no-op.
    ///
    /// Use this instead of [`add`] when auto-scheduling from rate-limit events to
    /// avoid creating duplicates and to always keep the latest reset time.
    pub fn upsert(&self, story_id: String, scheduled_at: DateTime<Utc>) -> Result<(), String> {
        let mut timers = self.timers.lock().unwrap();
        if let Some(existing) = timers.iter_mut().find(|t| t.story_id == story_id) {
            if scheduled_at > existing.scheduled_at {
                existing.scheduled_at = scheduled_at;
                Self::save_locked(&self.path, &timers)?;
            }
        } else {
            timers.push(TimerEntry {
                story_id,
                scheduled_at,
            });
            Self::save_locked(&self.path, &timers)?;
        }
        Ok(())
    }

    /// Remove and return all timers whose `scheduled_at` is ≤ `now`.
    /// Persists the updated list to disk if any timers were removed.
    pub fn take_due(&self, now: DateTime<Utc>) -> Vec<TimerEntry> {
        let mut timers = self.timers.lock().unwrap();
        let mut due = Vec::new();
        let mut remaining = Vec::new();
        for t in timers.drain(..) {
            if t.scheduled_at <= now {
                due.push(t);
            } else {
                remaining.push(t);
            }
        }
        *timers = remaining;
        if !due.is_empty() {
            let _ = Self::save_locked(&self.path, &timers);
        }
        due
    }
}
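// Illustrative on-disk shape; the authoritative encoding is whatever
// `serialize_timers` / `deserialize_timers` in `persist.rs` actually produce:
//
//   [
//     { "story_id": "421_story_foo", "scheduled_at": "2024-06-01T12:00:00Z" }
//   ]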
// ── Clock-reading wrapper ───────────────────────────────────────────────────

/// Parse `HH:MM` and return the next UTC instant at which the given timezone
/// (or the server-local clock when `timezone` is `None`) will read that time.
/// If the time has already passed today, returns tomorrow's occurrence.
///
/// This wrapper reads the current clock via `Utc::now()`. For pure unit tests
/// use [`crate::service::timer::schedule::next_occurrence_at`] directly.
pub fn next_occurrence_of_hhmm(hhmm: &str, timezone: Option<&str>) -> Option<DateTime<Utc>> {
    next_occurrence_at(hhmm, timezone, Utc::now())
}

// ── Story-ID resolution ─────────────────────────────────────────────────────

/// Resolve a story ID from a numeric story number or a full ID string.
///
/// Searches all pipeline stages. Full IDs (containing underscores) are
/// returned as-is; numeric inputs are matched against story-number prefixes.
/// Returns `None` when the input is neither, or when it is numeric but no
/// matching story is found.
pub(super) fn resolve_story_id(number_or_id: &str, project_root: &Path) -> Option<String> {
    const STAGES: &[&str] = &[
        "1_backlog",
        "2_current",
        "3_qa",
        "4_merge",
        "5_done",
        "6_archived",
    ];

    // Full ID (contains underscores) — return as-is; validation happens at file-check time.
    if number_or_id.contains('_') {
        return Some(number_or_id.to_string());
    }

    // Numeric lookup.
    if !number_or_id.chars().all(|c| c.is_ascii_digit()) {
        return None;
    }

    // --- DB-first lookup ---
    for id in crate::db::all_content_ids() {
        let file_num = id.split('_').next().unwrap_or("");
        if file_num == number_or_id
            && crate::pipeline_state::read_typed(&id)
                .ok()
                .flatten()
                .is_some()
        {
            return Some(id);
        }
    }

    // --- Filesystem fallback ---
    for stage in STAGES {
        let dir = project_root.join(".huskies").join("work").join(stage);
        if !dir.exists() {
            continue;
        }
        if let Ok(entries) = std::fs::read_dir(&dir) {
            for entry in entries.flatten() {
                let path = entry.path();
                if path.extension().and_then(|e| e.to_str()) != Some("md") {
                    continue;
                }
                if let Some(stem) = path.file_stem().and_then(|s| s.to_str()) {
                    let file_num = stem
                        .split('_')
                        .next()
                        .filter(|s| !s.is_empty() && s.chars().all(|c| c.is_ascii_digit()))
                        .unwrap_or("");
                    if file_num == number_or_id {
                        return Some(stem.to_string());
                    }
                }
            }
        }
    }
    None
}
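// Illustrative resolutions (hypothetical story IDs; `root` is the project root):
//
//   resolve_story_id("421_story_foo", root)  // Some("421_story_foo"): full IDs pass through
//   resolve_story_id("421", root)            // Some("421_story_foo") if story 421 exists
//   resolve_story_id("999", root)            // None: numeric, but no matching story
//   resolve_story_id("foo", root)            // None: neither numeric nor a full ID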
// ── Tick loop ───────────────────────────────────────────────────────────────

/// Execute one tick of the timer loop.
///
/// Called by the unified background tick loop every second.
/// Separated from the loop so we can catch panics at the call-site.
/// Returns `Err` only when the tick panicked (the panic message is returned).
pub async fn tick_once(
    store: &Arc<TimerStore>,
    agents: &Arc<crate::agents::AgentPool>,
    project_root: &Path,
) -> Result<(), String> {
    // take_due is sync and could panic (e.g. poisoned mutex) — catch it.
    let due = {
        let store_ref = Arc::clone(store);
        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
            store_ref.take_due(Utc::now())
        }));
        match result {
            Ok(due) => due,
            Err(e) => return Err(panic_payload_to_string(&e)),
        }
    };
    if due.is_empty() {
        return Ok(());
    }

    let remaining = store.list().len();
    crate::slog!("[timer] Tick: {} due, {remaining} remaining", due.len());

    for entry in due {
        crate::slog!("[timer] Timer fired for story {}", entry.story_id);

        // Bug 501: Defense-in-depth check. If the story has already advanced
        // past the active-work stages (3_qa, 4_merge, 5_done, 6_archived),
        // there is nothing to resume — the timer is stale and should no-op.
        // The primary cancellation paths (move_story MCP → backlog, stop_agent)
        // remove the timer before it fires; this guard covers the case where
        // cancellation was not yet called or the story raced forward through
        // the pipeline while the timer was pending.
        if let Ok(Some(item)) = crate::pipeline_state::read_typed(&entry.story_id) {
            use crate::pipeline_state::Stage;
            match &item.stage {
                Stage::Qa | Stage::Merge { .. } | Stage::Done { .. } | Stage::Archived { .. } => {
                    crate::slog!(
                        "[timer] Skipping timer for story {} — currently in '{}', \
                         not in backlog/current; timer is stale",
                        entry.story_id,
                        item.stage.dir_name()
                    );
                    continue;
                }
                _ => {}
            }
        }

        // Move from backlog to current if needed — the auto-assign
        // watcher will then start an agent automatically.
        if let Err(e) =
            crate::agents::lifecycle::move_story_to_current(project_root, &entry.story_id)
        {
            crate::slog!(
                "[timer] Failed to move story {} to current: {e}",
                entry.story_id
            );
            continue;
        }

        match agents
            .start_agent(project_root, &entry.story_id, None, None, None)
            .await
        {
            Ok(info) => {
                crate::slog!(
                    "[timer] Started agent {} for story {}",
                    info.agent_name,
                    entry.story_id
                );
            }
            Err(e) => {
                crate::slog!(
                    "[timer] Failed to start agent for story {}: {e} \
                     (auto-assign may pick it up)",
                    entry.story_id
                );
            }
        }
    }
    Ok(())
}

fn panic_payload_to_string(payload: &Box<dyn std::any::Any + Send>) -> String {
    if let Some(s) = payload.downcast_ref::<&str>() {
        (*s).to_string()
    } else if let Some(s) = payload.downcast_ref::<String>() {
        s.clone()
    } else {
        "unknown panic".to_string()
    }
}
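// Payload shapes this covers (standard panic machinery): `panic!("literal")`
// carries a `&str`, `panic!("{}", x)` carries a `String`, and anything raised
// via `std::panic::panic_any` with another type falls through to "unknown panic".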
// ── Rate-limit auto-scheduler ───────────────────────────────────────────────

/// Spawn a background task that listens for [`WatcherEvent::RateLimitHardBlock`]
/// events and auto-schedules a timer for the blocked story.
///
/// When an OAuth account swap succeeds, emits [`WatcherEvent::OAuthAccountSwapped`]
/// so chat transports can notify the user which account took over. When all
/// accounts are exhausted, emits [`WatcherEvent::OAuthAccountsExhausted`] with
/// the earliest reset time.
///
/// If a timer already exists for the story, it is updated to the later reset time
/// rather than creating a duplicate (via [`TimerStore::upsert`]).
pub fn spawn_rate_limit_auto_scheduler(
    store: Arc<TimerStore>,
    mut watcher_rx: tokio::sync::broadcast::Receiver<crate::io::watcher::WatcherEvent>,
    watcher_tx: tokio::sync::broadcast::Sender<crate::io::watcher::WatcherEvent>,
) {
    tokio::spawn(async move {
        loop {
            match watcher_rx.recv().await {
                Ok(crate::io::watcher::WatcherEvent::RateLimitHardBlock {
                    story_id,
                    agent_name,
                    reset_at,
                }) => {
                    // Skip short rate limits (≤10 min) — the CLI handles
                    // these internally. Only schedule timers for long
                    // session-level blocks where the CLI will exit.
                    let until_reset = reset_at.signed_duration_since(chrono::Utc::now());
                    if until_reset.num_minutes() <= 10 {
                        crate::slog!(
                            "[timer] Skipping short rate limit for {story_id} \
                             ({} min); CLI will handle internally",
                            until_reset.num_minutes()
                        );
                        continue;
                    }

                    // Try to swap to the next available OAuth account. On
                    // success the next agent start (via auto-assign) will use
                    // the freshly activated account — no long timer needed.
                    // On failure (no pool or all accounts exhausted) fall back
                    // to the existing timer-based retry path.
                    match crate::llm::oauth::swap_to_next_available_account(reset_at).await {
                        Ok(new_email) => {
                            crate::slog!(
                                "[timer] Account swap successful for story {story_id} \
                                 (agent {agent_name}): now using '{new_email}'. \
                                 Auto-assign will restart the agent with the new account."
                            );
                            let _ = watcher_tx.send(
                                crate::io::watcher::WatcherEvent::OAuthAccountSwapped { new_email },
                            );
                            // No timer needed — auto-assign picks up the story.
                            continue;
                        }
                        Err(swap_err) => {
                            crate::slog!(
                                "[timer] Account swap not possible for story {story_id}: \
                                 {swap_err}. Falling back to timer-based retry."
                            );
                            // Notify chat transports when all accounts are exhausted.
                            if swap_err.contains("All OAuth accounts are rate-limited") {
                                let _ = watcher_tx.send(
                                    crate::io::watcher::WatcherEvent::OAuthAccountsExhausted {
                                        earliest_reset_msg: swap_err.clone(),
                                    },
                                );
                            }
                        }
                    }

                    crate::slog!(
                        "[timer] Auto-scheduling timer for story {story_id} \
                         (agent {agent_name}) to resume at {reset_at}"
                    );
                    match store.upsert(story_id.clone(), reset_at) {
                        Ok(()) => {
                            crate::slog!(
                                "[timer] Timer upserted for story {story_id}; \
                                 scheduled at {reset_at}"
                            );
                        }
                        Err(e) => {
                            crate::slog!(
                                "[timer] Failed to upsert timer for story {story_id}: {e}"
                            );
                        }
                    }
                }
                Ok(_) => {}
                Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
                    crate::slog!("[timer] Rate-limit auto-scheduler lagged, skipped {n} events");
                }
                Err(tokio::sync::broadcast::error::RecvError::Closed) => {
                    crate::slog!(
                        "[timer] Watcher channel closed, stopping rate-limit auto-scheduler"
                    );
                    break;
                }
            }
        }
    });
}
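// Call-site sketch (hypothetical bootstrap code; the real wiring lives with the
// service start-up, and the channel capacity here is illustrative):
//
//   let store = Arc::new(TimerStore::load(data_dir.join("timers.json")));
//   let (watcher_tx, watcher_rx) = tokio::sync::broadcast::channel(16);
//   spawn_rate_limit_auto_scheduler(Arc::clone(&store), watcher_rx, watcher_tx.clone());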
let result = next_occurrence_of_hhmm("14:30", Some("Invalid/Zone")); assert!( result.is_some(), "invalid timezone should fall back to local and return Some" ); } // ── TimerStore ────────────────────────────────────────────────────── #[test] fn timer_store_empty_on_missing_file() { let dir = TempDir::new().unwrap(); let store = TimerStore::load(dir.path().join("timers.json")); assert!(store.list().is_empty()); } #[test] fn timer_store_add_and_list() { let dir = TempDir::new().unwrap(); let store = TimerStore::load(dir.path().join("timers.json")); let t = Utc::now() + Duration::hours(1); store.add("story_1".to_string(), t).unwrap(); let list = store.list(); assert_eq!(list.len(), 1); assert_eq!(list[0].story_id, "story_1"); } #[test] fn timer_store_remove() { let dir = TempDir::new().unwrap(); let store = TimerStore::load(dir.path().join("timers.json")); let t = Utc::now() + Duration::hours(1); store.add("story_1".to_string(), t).unwrap(); assert!(store.remove("story_1")); assert!(!store.remove("story_1")); // already gone assert!(store.list().is_empty()); } #[test] fn timer_store_persists_and_reloads() { let dir = TempDir::new().unwrap(); let path = dir.path().join("timers.json"); let t = Utc::now() + Duration::hours(2); { let store = TimerStore::load(path.clone()); store.add("421_story_foo".to_string(), t).unwrap(); } // Reload from disk. let store2 = TimerStore::load(path); let list = store2.list(); assert_eq!(list.len(), 1); assert_eq!(list[0].story_id, "421_story_foo"); } #[test] fn take_due_returns_only_past_entries() { let dir = TempDir::new().unwrap(); let store = TimerStore::load(dir.path().join("timers.json")); let past = Utc::now() - Duration::minutes(1); let future = Utc::now() + Duration::hours(1); store.add("past_story".to_string(), past).unwrap(); store.add("future_story".to_string(), future).unwrap(); let due = store.take_due(Utc::now()); assert_eq!(due.len(), 1); assert_eq!(due[0].story_id, "past_story"); assert_eq!(store.list().len(), 1); assert_eq!(store.list()[0].story_id, "future_story"); } // ── upsert ───────────────────────────────────────────────────────── #[test] fn upsert_adds_new_timer_when_none_exists() { let dir = TempDir::new().unwrap(); let store = TimerStore::load(dir.path().join("timers.json")); let t = Utc::now() + Duration::hours(1); store.upsert("story_1".to_string(), t).unwrap(); let list = store.list(); assert_eq!(list.len(), 1); assert_eq!(list[0].story_id, "story_1"); assert_eq!(list[0].scheduled_at, t); } #[test] fn upsert_updates_to_later_time() { let dir = TempDir::new().unwrap(); let store = TimerStore::load(dir.path().join("timers.json")); let early = Utc::now() + Duration::hours(1); let later = Utc::now() + Duration::hours(2); store.upsert("story_1".to_string(), early).unwrap(); store.upsert("story_1".to_string(), later).unwrap(); let list = store.list(); assert_eq!(list.len(), 1, "should not create duplicate"); assert_eq!(list[0].scheduled_at, later, "should update to later time"); } #[test] fn upsert_does_not_downgrade_to_earlier_time() { let dir = TempDir::new().unwrap(); let store = TimerStore::load(dir.path().join("timers.json")); let later = Utc::now() + Duration::hours(2); let earlier = Utc::now() + Duration::hours(1); store.upsert("story_1".to_string(), later).unwrap(); store.upsert("story_1".to_string(), earlier).unwrap(); let list = store.list(); assert_eq!(list.len(), 1); assert_eq!( list[0].scheduled_at, later, "should keep the later time, not downgrade" ); } // ── spawn_rate_limit_auto_scheduler ───────────────────────────────── 
    // ── spawn_rate_limit_auto_scheduler ─────────────────────────────────

    /// AC2: a RateLimitHardBlock event causes the auto-scheduler to add a timer.
    #[tokio::test]
    async fn rate_limit_auto_scheduler_adds_timer_on_hard_block() {
        use crate::io::watcher::WatcherEvent;

        let dir = TempDir::new().unwrap();
        let store = Arc::new(TimerStore::load(dir.path().join("timers.json")));
        let (watcher_tx, watcher_rx) = tokio::sync::broadcast::channel::<WatcherEvent>(16);
        spawn_rate_limit_auto_scheduler(Arc::clone(&store), watcher_rx, watcher_tx.clone());

        let reset_at = Utc::now() + Duration::hours(1);
        watcher_tx
            .send(WatcherEvent::RateLimitHardBlock {
                story_id: "423_story_test".to_string(),
                agent_name: "coder-1".to_string(),
                reset_at,
            })
            .unwrap();

        // Give the spawned task time to process the event.
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        let list = store.list();
        assert_eq!(list.len(), 1, "expected one timer after hard block");
        assert_eq!(list[0].story_id, "423_story_test");
        assert_eq!(list[0].scheduled_at, reset_at);
    }

    /// AC3 integration: a second hard block with a later reset_at updates the
    /// existing timer rather than creating a duplicate.
    #[tokio::test]
    async fn rate_limit_auto_scheduler_upserts_on_repeated_hard_block() {
        use crate::io::watcher::WatcherEvent;

        let dir = TempDir::new().unwrap();
        let store = Arc::new(TimerStore::load(dir.path().join("timers.json")));
        let (watcher_tx, watcher_rx) = tokio::sync::broadcast::channel::<WatcherEvent>(16);
        spawn_rate_limit_auto_scheduler(Arc::clone(&store), watcher_rx, watcher_tx.clone());

        let first = Utc::now() + Duration::hours(1);
        let second = Utc::now() + Duration::hours(2);
        watcher_tx
            .send(WatcherEvent::RateLimitHardBlock {
                story_id: "423_story_test".to_string(),
                agent_name: "coder-1".to_string(),
                reset_at: first,
            })
            .unwrap();
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        watcher_tx
            .send(WatcherEvent::RateLimitHardBlock {
                story_id: "423_story_test".to_string(),
                agent_name: "coder-1".to_string(),
                reset_at: second,
            })
            .unwrap();
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        let list = store.list();
        assert_eq!(list.len(), 1, "should not create a duplicate timer");
        assert_eq!(list[0].scheduled_at, second, "should update to later time");
    }

    #[test]
    fn multiple_timers_same_time_all_returned() {
        let dir = TempDir::new().unwrap();
        let store = TimerStore::load(dir.path().join("timers.json"));
        let past = Utc::now() - Duration::minutes(1);
        store.add("story_a".to_string(), past).unwrap();
        store.add("story_b".to_string(), past).unwrap();
        let due = store.take_due(Utc::now());
        assert_eq!(due.len(), 2, "both timers at same time must fire");
    }

    // ── tick_once ───────────────────────────────────────────────────────
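    /// With no due timers, tick_once returns Ok without touching the store.
    /// A sketch: `new_test(19998)` mirrors the harness in the test below,
    /// with the argument assumed to be a free test port.
    #[tokio::test]
    async fn tick_once_no_due_timers_is_noop() {
        let dir = TempDir::new().unwrap();
        let store = Arc::new(TimerStore::load(dir.path().join("timers.json")));
        let future = Utc::now() + Duration::hours(1);
        store.add("9906_story_bar".to_string(), future).unwrap();
        let agents = Arc::new(crate::agents::AgentPool::new_test(19998));
        let result = super::tick_once(&store, &agents, dir.path()).await;
        assert!(result.is_ok(), "no-due tick must succeed: {result:?}");
        assert_eq!(store.list().len(), 1, "future timer must remain pending");
    }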
    /// Create a past-due timer, run tick_once, and assert the entry is
    /// consumed. start_agent will fail (no real agent binary), but
    /// take_due must still drain the entry.
    #[tokio::test]
    async fn tick_once_consumes_past_due_entry() {
        use std::fs;

        let dir = TempDir::new().unwrap();
        let root = dir.path();
        let backlog = root.join(".huskies/work/1_backlog");
        let current = root.join(".huskies/work/2_current");
        fs::create_dir_all(&backlog).unwrap();
        fs::create_dir_all(&current).unwrap();
        let content = "---\nname: Foo\n---\n";
        fs::write(backlog.join("9905_story_foo.md"), content).unwrap();
        crate::db::ensure_content_store();
        crate::db::write_content("9905_story_foo", content);

        let store = Arc::new(TimerStore::load(root.join("timers.json")));
        let past = Utc::now() - Duration::seconds(5);
        store.add("9905_story_foo".to_string(), past).unwrap();
        assert_eq!(store.list().len(), 1, "precondition: one pending timer");

        let agents = Arc::new(crate::agents::AgentPool::new_test(19999));

        // tick_once should drain the due entry even though start_agent
        // will fail (no agent binary configured in the test pool).
        let result = super::tick_once(&store, &agents, root).await;
        assert!(result.is_ok(), "tick_once should not panic: {result:?}");
        assert!(
            store.list().is_empty(),
            "past-due timer must be consumed after tick_once"
        );

        // Story should still be accessible in the content store after the move.
        assert!(
            crate::db::read_content("9905_story_foo").is_some(),
            "story should be in the content store after tick fires"
        );
    }
}