From eca0ef792c4d194ec925cd4a5f932ee91fe5aa8a Mon Sep 17 00:00:00 2001 From: dave Date: Fri, 24 Apr 2026 17:39:42 +0000 Subject: [PATCH] huskies: merge 615_story_extract_timer_service --- server/src/chat/timer.rs | 1146 +---------------- .../src/chat/transport/matrix/bot/context.rs | 10 +- .../src/chat/transport/matrix/bot/messages.rs | 4 +- server/src/chat/transport/matrix/bot/run.rs | 7 +- server/src/http/context.rs | 2 +- server/src/main.rs | 4 +- server/src/service/bot_command/io.rs | 6 +- server/src/service/mod.rs | 1 + server/src/service/timer/io.rs | 650 ++++++++++ server/src/service/timer/mod.rs | 468 +++++++ server/src/service/timer/parse.rs | 179 +++ server/src/service/timer/persist.rs | 89 ++ server/src/service/timer/schedule.rs | 190 +++ 13 files changed, 1603 insertions(+), 1153 deletions(-) create mode 100644 server/src/service/timer/io.rs create mode 100644 server/src/service/timer/mod.rs create mode 100644 server/src/service/timer/parse.rs create mode 100644 server/src/service/timer/persist.rs create mode 100644 server/src/service/timer/schedule.rs diff --git a/server/src/chat/timer.rs b/server/src/chat/timer.rs index d62de925..f301cd16 100644 --- a/server/src/chat/timer.rs +++ b/server/src/chat/timer.rs @@ -1,1139 +1,9 @@ -//! Deferred agent start via one-shot timers. +//! Thin re-export shim. All timer logic now lives in [`crate::service::timer`]. //! -//! Provides [`TimerStore`] for persisting timers to `.huskies/timers.json` -//! and command parsing / handling for the `timer` bot command. -//! Due timers are fired by the unified background tick loop in `main`. - -use chrono::{DateTime, Duration, Local, NaiveTime, TimeZone, Utc}; -use chrono_tz::Tz; -use serde::{Deserialize, Serialize}; -use std::path::{Path, PathBuf}; -use std::sync::{Arc, Mutex}; - -use crate::chat::util::strip_bot_mention; - -// ── Data types ───────────────────────────────────────────────────────────── - -/// A single scheduled timer entry. -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] -pub struct TimerEntry { - /// The full story ID (filename stem, e.g. `421_story_foo`). - pub story_id: String, - /// UTC instant at which the timer should fire. - pub scheduled_at: DateTime, -} - -// ── TimerStore ───────────────────────────────────────────────────────────── - -/// Persistent store for pending timers, backed by a JSON file. -pub struct TimerStore { - path: PathBuf, - timers: Mutex>, -} - -impl TimerStore { - /// Load the timer store from `path`. Returns an empty store if the file - /// does not exist or cannot be parsed. - pub fn load(path: PathBuf) -> Self { - let timers = if path.exists() { - std::fs::read_to_string(&path) - .ok() - .and_then(|s| serde_json::from_str::>(&s).ok()) - .unwrap_or_default() - } else { - Vec::new() - }; - Self { - path, - timers: Mutex::new(timers), - } - } - - fn save_locked(path: &Path, timers: &[TimerEntry]) -> Result<(), String> { - if let Some(parent) = path.parent() { - std::fs::create_dir_all(parent) - .map_err(|e| format!("Failed to create directory: {e}"))?; - } - let content = - serde_json::to_string_pretty(timers).map_err(|e| format!("Serialize failed: {e}"))?; - std::fs::write(path, content).map_err(|e| format!("Failed to write timers: {e}")) - } - - /// Add a timer and persist to disk. - pub fn add(&self, story_id: String, scheduled_at: DateTime) -> Result<(), String> { - let mut timers = self.timers.lock().unwrap(); - timers.push(TimerEntry { - story_id, - scheduled_at, - }); - Self::save_locked(&self.path, &timers) - } - - /// Remove the timer for `story_id`. Returns `true` if one was removed. - pub fn remove(&self, story_id: &str) -> bool { - let mut timers = self.timers.lock().unwrap(); - let before = timers.len(); - timers.retain(|t| t.story_id != story_id); - let removed = timers.len() < before; - if removed { - let _ = Self::save_locked(&self.path, &timers); - } - removed - } - - /// Return all pending timers (cloned). - pub fn list(&self) -> Vec { - self.timers.lock().unwrap().clone() - } - - /// Add or update a timer for `story_id`. - /// - /// - If no timer exists for `story_id`, adds it. - /// - If a timer already exists and `scheduled_at` is **later**, updates it. - /// - If a timer already exists and `scheduled_at` is earlier or equal, no-op. - /// - /// Use this instead of [`add`] when auto-scheduling from rate-limit events to - /// avoid creating duplicates and to always keep the latest reset time. - pub fn upsert(&self, story_id: String, scheduled_at: DateTime) -> Result<(), String> { - let mut timers = self.timers.lock().unwrap(); - if let Some(existing) = timers.iter_mut().find(|t| t.story_id == story_id) { - if scheduled_at > existing.scheduled_at { - existing.scheduled_at = scheduled_at; - Self::save_locked(&self.path, &timers)?; - } - } else { - timers.push(TimerEntry { - story_id, - scheduled_at, - }); - Self::save_locked(&self.path, &timers)?; - } - Ok(()) - } - - /// Remove and return all timers whose `scheduled_at` is ≤ `now`. - /// Persists the updated list to disk if any timers were removed. - pub fn take_due(&self, now: DateTime) -> Vec { - let mut timers = self.timers.lock().unwrap(); - let mut due = Vec::new(); - let mut remaining = Vec::new(); - for t in timers.drain(..) { - if t.scheduled_at <= now { - due.push(t); - } else { - remaining.push(t); - } - } - *timers = remaining; - if !due.is_empty() { - let _ = Self::save_locked(&self.path, &timers); - } - due - } -} - -// ── Tick loop ────────────────────────────────────────────────────────────── - -/// Execute one tick of the timer loop. -/// -/// Called by the unified background tick loop every second. -/// Separated from the loop so we can catch panics at the call-site. -/// Returns `Err` only when the tick panicked (the panic message is returned). -pub(crate) async fn tick_once( - store: &Arc, - agents: &Arc, - project_root: &Path, -) -> Result<(), String> { - // take_due is sync and could panic (e.g. poisoned mutex) — catch it. - let due = { - let store_ref = Arc::clone(store); - let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || { - store_ref.take_due(Utc::now()) - })); - match result { - Ok(due) => due, - Err(e) => return Err(panic_payload_to_string(&e)), - } - }; - - if due.is_empty() { - return Ok(()); - } - - let remaining = store.list().len(); - crate::slog!("[timer] Tick: {} due, {remaining} remaining", due.len()); - - for entry in due { - crate::slog!("[timer] Timer fired for story {}", entry.story_id); - - // Bug 501: Defense-in-depth check. If the story has already advanced - // past the active-work stages (3_qa, 4_merge, 5_done, 6_archived), - // there is nothing to resume — the timer is stale and should no-op. - // The primary cancellation paths (move_story MCP → backlog, stop_agent) - // remove the timer before it fires; this guard covers the case where - // cancellation was not yet called or the story raced forward through - // the pipeline while the timer was pending. - if let Ok(Some(item)) = crate::pipeline_state::read_typed(&entry.story_id) { - use crate::pipeline_state::Stage; - match &item.stage { - Stage::Qa | Stage::Merge { .. } | Stage::Done { .. } | Stage::Archived { .. } => { - crate::slog!( - "[timer] Skipping timer for story {} — currently in '{}', \ - not in backlog/current; timer is stale", - entry.story_id, - item.stage.dir_name() - ); - continue; - } - _ => {} - } - } - - // Move from backlog to current if needed — the auto-assign - // watcher will then start an agent automatically. - if let Err(e) = - crate::agents::lifecycle::move_story_to_current(project_root, &entry.story_id) - { - crate::slog!( - "[timer] Failed to move story {} to current: {e}", - entry.story_id - ); - continue; - } - - match agents - .start_agent(project_root, &entry.story_id, None, None, None) - .await - { - Ok(info) => { - crate::slog!( - "[timer] Started agent {} for story {}", - info.agent_name, - entry.story_id - ); - } - Err(e) => { - crate::slog!( - "[timer] Failed to start agent for story {}: {e} \ - (auto-assign may pick it up)", - entry.story_id - ); - } - } - } - Ok(()) -} - -/// Extract a human-readable message from a `catch_unwind` panic payload. -fn panic_payload_to_string(payload: &Box) -> String { - if let Some(s) = payload.downcast_ref::<&str>() { - (*s).to_string() - } else if let Some(s) = payload.downcast_ref::() { - s.clone() - } else { - "unknown panic".to_string() - } -} - -/// Spawn a background task that listens for [`WatcherEvent::RateLimitHardBlock`] -/// events and auto-schedules a timer for the blocked story. -/// -/// If a timer already exists for the story, it is updated to the later reset time -/// rather than creating a duplicate (via [`TimerStore::upsert`]). -pub fn spawn_rate_limit_auto_scheduler( - store: Arc, - mut watcher_rx: tokio::sync::broadcast::Receiver, -) { - tokio::spawn(async move { - loop { - match watcher_rx.recv().await { - Ok(crate::io::watcher::WatcherEvent::RateLimitHardBlock { - story_id, - agent_name, - reset_at, - }) => { - // Skip short rate limits (≤10 min) — the CLI handles - // these internally. Only schedule timers for long - // session-level blocks where the CLI will exit. - let until_reset = reset_at.signed_duration_since(chrono::Utc::now()); - if until_reset.num_minutes() <= 10 { - crate::slog!( - "[timer] Skipping short rate limit for {story_id} \ - ({} min); CLI will handle internally", - until_reset.num_minutes() - ); - continue; - } - crate::slog!( - "[timer] Auto-scheduling timer for story {story_id} \ - (agent {agent_name}) to resume at {reset_at}" - ); - match store.upsert(story_id.clone(), reset_at) { - Ok(()) => { - crate::slog!( - "[timer] Timer upserted for story {story_id}; \ - scheduled at {reset_at}" - ); - } - Err(e) => { - crate::slog!( - "[timer] Failed to upsert timer for story {story_id}: {e}" - ); - } - } - } - Ok(_) => {} - Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { - crate::slog!("[timer] Rate-limit auto-scheduler lagged, skipped {n} events"); - } - Err(tokio::sync::broadcast::error::RecvError::Closed) => { - crate::slog!( - "[timer] Watcher channel closed, stopping rate-limit auto-scheduler" - ); - break; - } - } - } - }); -} - -// ── Command types ────────────────────────────────────────────────────────── - -/// A parsed `timer` command. -#[derive(Debug, PartialEq)] -pub enum TimerCommand { - /// `timer ` — schedule a deferred start. - Schedule { - story_number_or_id: String, - hhmm: String, - }, - /// `timer list` — list all pending timers. - List, - /// `timer cancel ` — remove a pending timer. - Cancel { story_number_or_id: String }, - /// Malformed arguments. - BadArgs, -} - -// ── Command extraction ───────────────────────────────────────────────────── - -/// Parse a `timer` command from a raw message body. -/// -/// Strips the bot mention prefix and matches the `timer` keyword. -/// Returns `None` when the message is not a timer command at all. -pub fn extract_timer_command( - message: &str, - bot_name: &str, - bot_user_id: &str, -) -> Option { - let stripped = strip_bot_mention(message, bot_name, bot_user_id); - let trimmed = stripped - .trim() - .trim_start_matches(|c: char| !c.is_alphanumeric()); - - let (cmd, args) = match trimmed.split_once(char::is_whitespace) { - Some((c, a)) => (c, a.trim()), - None => (trimmed, ""), - }; - - if !cmd.eq_ignore_ascii_case("timer") { - return None; - } - - // `timer` with no args or `timer list` - if args.is_empty() || args.eq_ignore_ascii_case("list") { - return Some(TimerCommand::List); - } - - let (sub, rest) = match args.split_once(char::is_whitespace) { - Some((s, r)) => (s, r.trim()), - None => (args, ""), - }; - - // `timer cancel ` - if sub.eq_ignore_ascii_case("cancel") { - if rest.is_empty() { - return Some(TimerCommand::BadArgs); - } - return Some(TimerCommand::Cancel { - story_number_or_id: rest.to_string(), - }); - } - - // `timer ` - if rest.is_empty() { - return Some(TimerCommand::BadArgs); - } - - Some(TimerCommand::Schedule { - story_number_or_id: sub.to_string(), - hhmm: rest.to_string(), - }) -} - -// ── Command handler ──────────────────────────────────────────────────────── - -/// Handle a parsed `timer` command. Returns a markdown-formatted response. -pub async fn handle_timer_command( - cmd: TimerCommand, - store: &TimerStore, - project_root: &Path, -) -> String { - // Load the configured timezone (if any) from project.toml. - let config_tz: Option = crate::config::ProjectConfig::load(project_root) - .ok() - .and_then(|c| c.timezone); - let tz_str: Option<&str> = config_tz.as_deref(); - - match cmd { - TimerCommand::Schedule { - story_number_or_id, - hhmm, - } => { - let story_id = match resolve_story_id(&story_number_or_id, project_root) { - Some(id) => id, - None => { - return format!("No story with number or ID **{story_number_or_id}** found."); - } - }; - - // The story must be in backlog or current. When the timer fires, - // backlog stories are moved to current automatically. - // Check CRDT state first, then fall back to filesystem. - let in_valid_stage = - if let Ok(Some(item)) = crate::pipeline_state::read_typed(&story_id) { - use crate::pipeline_state::Stage; - matches!(item.stage, Stage::Backlog | Stage::Coding) - } else { - let work_dir = project_root.join(".huskies").join("work"); - work_dir - .join("1_backlog") - .join(format!("{story_id}.md")) - .exists() - || work_dir - .join("2_current") - .join(format!("{story_id}.md")) - .exists() - }; - if !in_valid_stage { - return format!("Story **{story_id}** is not in backlog or current."); - } - - let scheduled_at = match next_occurrence_of_hhmm(&hhmm, tz_str) { - Some(t) => t, - None => { - return format!("Invalid time **{hhmm}**. Use `HH:MM` format (e.g. `14:30`)."); - } - }; - - match store.add(story_id.clone(), scheduled_at) { - Ok(()) => { - let (display_time, tz_label) = format_in_timezone(scheduled_at, tz_str); - format!("Timer set for **{story_id}** at **{display_time}** ({tz_label}).") - } - Err(e) => format!("Failed to save timer: {e}"), - } - } - TimerCommand::List => { - let timers = store.list(); - if timers.is_empty() { - return "No pending timers.".to_string(); - } - let mut lines = vec!["**Pending timers:**".to_string()]; - for t in &timers { - let (display_time, _) = format_in_timezone(t.scheduled_at, tz_str); - lines.push(format!("- **{}** → {}", t.story_id, display_time)); - } - lines.join("\n") - } - TimerCommand::Cancel { story_number_or_id } => { - let story_id = resolve_story_id(&story_number_or_id, project_root) - .unwrap_or(story_number_or_id.clone()); - if store.remove(&story_id) { - format!("Timer for **{story_id}** cancelled.") - } else { - format!("No timer found for **{story_id}**.") - } - } - TimerCommand::BadArgs => "Usage:\n\ - - `timer ` — schedule deferred start\n\ - - `timer list` — show pending timers\n\ - - `timer cancel ` — remove a timer" - .to_string(), - } -} - -// ── Helpers ──────────────────────────────────────────────────────────────── - -/// Parse `HH:MM` and return the next UTC instant at which the given timezone -/// (or the server-local clock when `timezone` is `None`) will read that time. -/// If the time has already passed today, returns tomorrow's occurrence. -pub fn next_occurrence_of_hhmm(hhmm: &str, timezone: Option<&str>) -> Option> { - let (hh, mm) = hhmm.split_once(':')?; - let hours: u32 = hh.parse().ok()?; - let minutes: u32 = mm.parse().ok()?; - if hours > 23 || minutes > 59 { - return None; - } - let target_time = NaiveTime::from_hms_opt(hours, minutes, 0)?; - - match timezone.and_then(|s| s.parse::().ok()) { - Some(tz) => { - let now_tz = Utc::now().with_timezone(&tz); - let today = now_tz.date_naive(); - let candidate = today.and_time(target_time); - let candidate_tz = tz.from_local_datetime(&candidate).single()?; - if candidate_tz > now_tz { - Some(candidate_tz.to_utc()) - } else { - let tomorrow = today + Duration::days(1); - let tomorrow_candidate = tomorrow.and_time(target_time); - let tomorrow_tz = tz.from_local_datetime(&tomorrow_candidate).single()?; - Some(tomorrow_tz.to_utc()) - } - } - None => { - // Fall back to host/container local timezone. - let now_local = Local::now(); - let today = now_local.date_naive(); - let candidate = today.and_time(target_time); - let candidate_local = Local.from_local_datetime(&candidate).single()?; - if candidate_local > now_local { - Some(candidate_local.to_utc()) - } else { - let tomorrow = today + Duration::days(1); - let tomorrow_candidate = tomorrow.and_time(target_time); - let tomorrow_local = Local.from_local_datetime(&tomorrow_candidate).single()?; - Some(tomorrow_local.to_utc()) - } - } - } -} - -/// Format a UTC instant for display in the given timezone (or local time when -/// `timezone` is `None`). Returns `(formatted_string, label)` where `label` -/// is either the IANA timezone name or `"server local time"`. -fn format_in_timezone(dt: DateTime, timezone: Option<&str>) -> (String, String) { - match timezone.and_then(|s| s.parse::().ok()) { - Some(tz) => { - let tz_time = dt.with_timezone(&tz); - (tz_time.format("%Y-%m-%d %H:%M").to_string(), tz.to_string()) - } - None => { - let local_time = dt.with_timezone(&Local); - ( - local_time.format("%Y-%m-%d %H:%M").to_string(), - "server local time".to_string(), - ) - } - } -} - -/// Resolve a story ID from a numeric story number or a full ID string. -/// -/// Searches all pipeline stages. Returns `None` only when the input is -/// numeric but no matching file is found. -fn resolve_story_id(number_or_id: &str, project_root: &Path) -> Option { - const STAGES: &[&str] = &[ - "1_backlog", - "2_current", - "3_qa", - "4_merge", - "5_done", - "6_archived", - ]; - - // Full ID (contains underscores) — return as-is; validation happens at file-check time. - if number_or_id.contains('_') { - return Some(number_or_id.to_string()); - } - - // Numeric lookup. - if !number_or_id.chars().all(|c| c.is_ascii_digit()) { - return None; - } - - // --- DB-first lookup --- - for id in crate::db::all_content_ids() { - let file_num = id.split('_').next().unwrap_or(""); - if file_num == number_or_id - && crate::pipeline_state::read_typed(&id) - .ok() - .flatten() - .is_some() - { - return Some(id); - } - } - - // --- Filesystem fallback --- - for stage in STAGES { - let dir = project_root.join(".huskies").join("work").join(stage); - if !dir.exists() { - continue; - } - if let Ok(entries) = std::fs::read_dir(&dir) { - for entry in entries.flatten() { - let path = entry.path(); - if path.extension().and_then(|e| e.to_str()) != Some("md") { - continue; - } - if let Some(stem) = path.file_stem().and_then(|s| s.to_str()) { - let file_num = stem - .split('_') - .next() - .filter(|s| !s.is_empty() && s.chars().all(|c| c.is_ascii_digit())) - .unwrap_or(""); - if file_num == number_or_id { - return Some(stem.to_string()); - } - } - } - } - } - None -} - -// ── Tests ────────────────────────────────────────────────────────────────── - -#[cfg(test)] -mod tests { - use super::*; - use tempfile::TempDir; - - // ── next_occurrence_of_hhmm ───────────────────────────────────────── - - #[test] - fn valid_hhmm_returns_some() { - let result = next_occurrence_of_hhmm("14:30", None); - assert!(result.is_some(), "valid HH:MM should return Some"); - } - - #[test] - fn invalid_hhmm_missing_colon_returns_none() { - assert!(next_occurrence_of_hhmm("1430", None).is_none()); - } - - #[test] - fn invalid_hhmm_bad_hours_returns_none() { - assert!(next_occurrence_of_hhmm("25:00", None).is_none()); - } - - #[test] - fn invalid_hhmm_bad_minutes_returns_none() { - assert!(next_occurrence_of_hhmm("12:60", None).is_none()); - } - - #[test] - fn next_occurrence_is_in_the_future() { - let result = next_occurrence_of_hhmm("14:30", None).unwrap(); - assert!(result > Utc::now(), "next occurrence must be in the future"); - } - - #[test] - fn next_occurrence_with_named_timezone_is_in_the_future() { - let result = next_occurrence_of_hhmm("14:30", Some("Europe/London")).unwrap(); - assert!( - result > Utc::now(), - "next occurrence (Europe/London) must be in the future" - ); - } - - #[test] - fn next_occurrence_with_invalid_timezone_falls_back_to_local() { - // An unrecognised timezone name falls back to chrono::Local (returns Some). - let result = next_occurrence_of_hhmm("14:30", Some("Invalid/Zone")); - assert!( - result.is_some(), - "invalid timezone should fall back to local and return Some" - ); - } - - // ── extract_timer_command ─────────────────────────────────────────── - - #[test] - fn non_timer_command_returns_none() { - assert!(extract_timer_command("Timmy help", "Timmy", "@bot:home").is_none()); - } - - #[test] - fn timer_list_no_args() { - assert_eq!( - extract_timer_command("Timmy timer", "Timmy", "@bot:home"), - Some(TimerCommand::List) - ); - } - - #[test] - fn timer_list_explicit() { - assert_eq!( - extract_timer_command("Timmy timer list", "Timmy", "@bot:home"), - Some(TimerCommand::List) - ); - } - - #[test] - fn timer_cancel_story_id() { - assert_eq!( - extract_timer_command("Timmy timer cancel 421_story_foo", "Timmy", "@bot:home"), - Some(TimerCommand::Cancel { - story_number_or_id: "421_story_foo".to_string() - }) - ); - } - - #[test] - fn timer_cancel_no_arg_is_bad_args() { - assert_eq!( - extract_timer_command("Timmy timer cancel", "Timmy", "@bot:home"), - Some(TimerCommand::BadArgs) - ); - } - - #[test] - fn timer_schedule_with_story_id() { - assert_eq!( - extract_timer_command("Timmy timer 421_story_foo 14:30", "Timmy", "@bot:home"), - Some(TimerCommand::Schedule { - story_number_or_id: "421_story_foo".to_string(), - hhmm: "14:30".to_string(), - }) - ); - } - - #[test] - fn timer_schedule_with_number() { - assert_eq!( - extract_timer_command("Timmy timer 421 14:30", "Timmy", "@bot:home"), - Some(TimerCommand::Schedule { - story_number_or_id: "421".to_string(), - hhmm: "14:30".to_string(), - }) - ); - } - - #[test] - fn timer_schedule_missing_time_is_bad_args() { - assert_eq!( - extract_timer_command("Timmy timer 421_story_foo", "Timmy", "@bot:home"), - Some(TimerCommand::BadArgs) - ); - } - - // ── TimerStore ────────────────────────────────────────────────────── - - #[test] - fn timer_store_empty_on_missing_file() { - let dir = TempDir::new().unwrap(); - let store = TimerStore::load(dir.path().join("timers.json")); - assert!(store.list().is_empty()); - } - - #[test] - fn timer_store_add_and_list() { - let dir = TempDir::new().unwrap(); - let store = TimerStore::load(dir.path().join("timers.json")); - let t = Utc::now() + Duration::hours(1); - store.add("story_1".to_string(), t).unwrap(); - let list = store.list(); - assert_eq!(list.len(), 1); - assert_eq!(list[0].story_id, "story_1"); - } - - #[test] - fn timer_store_remove() { - let dir = TempDir::new().unwrap(); - let store = TimerStore::load(dir.path().join("timers.json")); - let t = Utc::now() + Duration::hours(1); - store.add("story_1".to_string(), t).unwrap(); - assert!(store.remove("story_1")); - assert!(!store.remove("story_1")); // already gone - assert!(store.list().is_empty()); - } - - #[test] - fn timer_store_persists_and_reloads() { - let dir = TempDir::new().unwrap(); - let path = dir.path().join("timers.json"); - let t = Utc::now() + Duration::hours(2); - { - let store = TimerStore::load(path.clone()); - store.add("421_story_foo".to_string(), t).unwrap(); - } - // Reload from disk. - let store2 = TimerStore::load(path); - let list = store2.list(); - assert_eq!(list.len(), 1); - assert_eq!(list[0].story_id, "421_story_foo"); - } - - #[test] - fn take_due_returns_only_past_entries() { - let dir = TempDir::new().unwrap(); - let store = TimerStore::load(dir.path().join("timers.json")); - let past = Utc::now() - Duration::minutes(1); - let future = Utc::now() + Duration::hours(1); - store.add("past_story".to_string(), past).unwrap(); - store.add("future_story".to_string(), future).unwrap(); - - let due = store.take_due(Utc::now()); - assert_eq!(due.len(), 1); - assert_eq!(due[0].story_id, "past_story"); - assert_eq!(store.list().len(), 1); - assert_eq!(store.list()[0].story_id, "future_story"); - } - - // ── AC3: upsert ───────────────────────────────────────────────────── - - #[test] - fn upsert_adds_new_timer_when_none_exists() { - let dir = TempDir::new().unwrap(); - let store = TimerStore::load(dir.path().join("timers.json")); - let t = Utc::now() + Duration::hours(1); - store.upsert("story_1".to_string(), t).unwrap(); - let list = store.list(); - assert_eq!(list.len(), 1); - assert_eq!(list[0].story_id, "story_1"); - assert_eq!(list[0].scheduled_at, t); - } - - #[test] - fn upsert_updates_to_later_time() { - let dir = TempDir::new().unwrap(); - let store = TimerStore::load(dir.path().join("timers.json")); - let early = Utc::now() + Duration::hours(1); - let later = Utc::now() + Duration::hours(2); - store.upsert("story_1".to_string(), early).unwrap(); - store.upsert("story_1".to_string(), later).unwrap(); - let list = store.list(); - assert_eq!(list.len(), 1, "should not create duplicate"); - assert_eq!(list[0].scheduled_at, later, "should update to later time"); - } - - #[test] - fn upsert_does_not_downgrade_to_earlier_time() { - let dir = TempDir::new().unwrap(); - let store = TimerStore::load(dir.path().join("timers.json")); - let later = Utc::now() + Duration::hours(2); - let earlier = Utc::now() + Duration::hours(1); - store.upsert("story_1".to_string(), later).unwrap(); - store.upsert("story_1".to_string(), earlier).unwrap(); - let list = store.list(); - assert_eq!(list.len(), 1); - assert_eq!( - list[0].scheduled_at, later, - "should keep the later time, not downgrade" - ); - } - - // ── AC2: spawn_rate_limit_auto_scheduler ──────────────────────────── - - /// AC2: a RateLimitHardBlock event causes the auto-scheduler to add a timer. - #[tokio::test] - async fn rate_limit_auto_scheduler_adds_timer_on_hard_block() { - use crate::io::watcher::WatcherEvent; - - let dir = TempDir::new().unwrap(); - let store = Arc::new(TimerStore::load(dir.path().join("timers.json"))); - let (watcher_tx, watcher_rx) = tokio::sync::broadcast::channel::(16); - - spawn_rate_limit_auto_scheduler(Arc::clone(&store), watcher_rx); - - let reset_at = Utc::now() + Duration::hours(1); - watcher_tx - .send(WatcherEvent::RateLimitHardBlock { - story_id: "423_story_test".to_string(), - agent_name: "coder-1".to_string(), - reset_at, - }) - .unwrap(); - - // Give the spawned task time to process the event. - tokio::time::sleep(std::time::Duration::from_millis(100)).await; - - let list = store.list(); - assert_eq!(list.len(), 1, "expected one timer after hard block"); - assert_eq!(list[0].story_id, "423_story_test"); - assert_eq!(list[0].scheduled_at, reset_at); - } - - /// AC3 integration: a second hard block with a later reset_at updates the - /// existing timer rather than creating a duplicate. - #[tokio::test] - async fn rate_limit_auto_scheduler_upserts_on_repeated_hard_block() { - use crate::io::watcher::WatcherEvent; - - let dir = TempDir::new().unwrap(); - let store = Arc::new(TimerStore::load(dir.path().join("timers.json"))); - let (watcher_tx, watcher_rx) = tokio::sync::broadcast::channel::(16); - - spawn_rate_limit_auto_scheduler(Arc::clone(&store), watcher_rx); - - let first = Utc::now() + Duration::hours(1); - let second = Utc::now() + Duration::hours(2); - - watcher_tx - .send(WatcherEvent::RateLimitHardBlock { - story_id: "423_story_test".to_string(), - agent_name: "coder-1".to_string(), - reset_at: first, - }) - .unwrap(); - tokio::time::sleep(std::time::Duration::from_millis(50)).await; - watcher_tx - .send(WatcherEvent::RateLimitHardBlock { - story_id: "423_story_test".to_string(), - agent_name: "coder-1".to_string(), - reset_at: second, - }) - .unwrap(); - tokio::time::sleep(std::time::Duration::from_millis(100)).await; - - let list = store.list(); - assert_eq!(list.len(), 1, "should not create a duplicate timer"); - assert_eq!(list[0].scheduled_at, second, "should update to later time"); - } - - #[test] - fn multiple_timers_same_time_all_returned() { - let dir = TempDir::new().unwrap(); - let store = TimerStore::load(dir.path().join("timers.json")); - let past = Utc::now() - Duration::minutes(1); - store.add("story_a".to_string(), past).unwrap(); - store.add("story_b".to_string(), past).unwrap(); - - let due = store.take_due(Utc::now()); - assert_eq!(due.len(), 2, "both timers at same time must fire"); - } - - // ── handle_timer_command ──────────────────────────────────────────── - - #[tokio::test] - async fn handle_list_empty() { - let dir = TempDir::new().unwrap(); - let store = TimerStore::load(dir.path().join("timers.json")); - let result = handle_timer_command(TimerCommand::List, &store, dir.path()).await; - assert!(result.contains("No pending timers"), "unexpected: {result}"); - } - - #[tokio::test] - async fn handle_cancel_not_found() { - let dir = TempDir::new().unwrap(); - let store = TimerStore::load(dir.path().join("timers.json")); - let result = handle_timer_command( - TimerCommand::Cancel { - story_number_or_id: "421_story_foo".to_string(), - }, - &store, - dir.path(), - ) - .await; - assert!(result.contains("No timer found"), "unexpected: {result}"); - } - - #[tokio::test] - async fn handle_schedule_story_not_in_backlog_or_current() { - let dir = TempDir::new().unwrap(); - // Ensure CRDT content store is initialised so the DB-first lookup works. - crate::db::ensure_content_store(); - // No story written — "9950_story_timer_neg" should not be found. - let store = TimerStore::load(dir.path().join("timers.json")); - let result = handle_timer_command( - TimerCommand::Schedule { - story_number_or_id: "9950_story_timer_neg".to_string(), - hhmm: "14:30".to_string(), - }, - &store, - dir.path(), - ) - .await; - assert!( - result.contains("not in backlog or current"), - "unexpected: {result}" - ); - } - - #[tokio::test] - async fn handle_schedule_accepts_backlog_story() { - let dir = TempDir::new().unwrap(); - let backlog_dir = dir.path().join(".huskies/work/1_backlog"); - std::fs::create_dir_all(&backlog_dir).unwrap(); - std::fs::write( - backlog_dir.join("421_story_foo.md"), - "---\nname: Foo\n---\n", - ) - .unwrap(); - let store = TimerStore::load(dir.path().join("timers.json")); - let result = handle_timer_command( - TimerCommand::Schedule { - story_number_or_id: "421_story_foo".to_string(), - hhmm: "14:30".to_string(), - }, - &store, - dir.path(), - ) - .await; - assert!( - result.contains("Timer set"), - "backlog story should be accepted: {result}" - ); - } - - #[tokio::test] - async fn handle_schedule_success() { - let dir = TempDir::new().unwrap(); - let current_dir = dir.path().join(".huskies/work/2_current"); - std::fs::create_dir_all(¤t_dir).unwrap(); - std::fs::write(current_dir.join("421_story_foo.md"), "---\nname: Foo\n---").unwrap(); - let store = TimerStore::load(dir.path().join("timers.json")); - let result = handle_timer_command( - TimerCommand::Schedule { - story_number_or_id: "421_story_foo".to_string(), - hhmm: "23:59".to_string(), - }, - &store, - dir.path(), - ) - .await; - assert!(result.contains("Timer set for"), "unexpected: {result}"); - assert_eq!(store.list().len(), 1); - } - - #[tokio::test] - async fn handle_schedule_invalid_time() { - let dir = TempDir::new().unwrap(); - let current_dir = dir.path().join(".huskies/work/2_current"); - std::fs::create_dir_all(¤t_dir).unwrap(); - std::fs::write(current_dir.join("421_story_foo.md"), "---\nname: Foo\n---").unwrap(); - let store = TimerStore::load(dir.path().join("timers.json")); - let result = handle_timer_command( - TimerCommand::Schedule { - story_number_or_id: "421_story_foo".to_string(), - hhmm: "99:00".to_string(), - }, - &store, - dir.path(), - ) - .await; - assert!(result.contains("Invalid time"), "unexpected: {result}"); - } - - #[tokio::test] - async fn handle_cancel_existing_timer() { - let dir = TempDir::new().unwrap(); - let store = TimerStore::load(dir.path().join("timers.json")); - let future = Utc::now() + Duration::hours(1); - store.add("421_story_foo".to_string(), future).unwrap(); - let result = handle_timer_command( - TimerCommand::Cancel { - story_number_or_id: "421_story_foo".to_string(), - }, - &store, - dir.path(), - ) - .await; - assert!(result.contains("cancelled"), "unexpected: {result}"); - assert!(store.list().is_empty()); - } - - #[tokio::test] - async fn handle_list_with_entries() { - let dir = TempDir::new().unwrap(); - let store = TimerStore::load(dir.path().join("timers.json")); - let future = Utc::now() + Duration::hours(1); - store.add("421_story_foo".to_string(), future).unwrap(); - let result = handle_timer_command(TimerCommand::List, &store, dir.path()).await; - assert!(result.contains("421_story_foo"), "unexpected: {result}"); - assert!(result.contains("Pending timers"), "unexpected: {result}"); - } - - // ── AC: firing a timer for a backlog story moves it to current ─────── - - /// When a timer fires for a story in backlog, the tick loop calls - /// `move_story_to_current` before `start_agent`. This test exercises - /// that exact sequence (minus the agent pool) to prove the story ends - /// up in `2_current/` after firing. - #[test] - fn fired_timer_for_backlog_story_moves_to_current() { - use std::fs; - - let dir = TempDir::new().unwrap(); - let root = dir.path(); - let backlog = root.join(".huskies/work/1_backlog"); - let current = root.join(".huskies/work/2_current"); - fs::create_dir_all(&backlog).unwrap(); - fs::create_dir_all(¤t).unwrap(); - let content = "---\nname: Foo\n---\n"; - fs::write(backlog.join("421_story_foo.md"), content).unwrap(); - crate::db::ensure_content_store(); - crate::db::write_content("421_story_foo", content); - - // Add a past timer so take_due returns it immediately. - let store = TimerStore::load(root.join("timers.json")); - let past = Utc::now() - Duration::seconds(1); - store.add("421_story_foo".to_string(), past).unwrap(); - - // Drain due timers — same as the tick loop does. - let due = store.take_due(Utc::now()); - assert_eq!(due.len(), 1, "expected one fired timer"); - - // Apply the move-to-current step the tick loop performs. - for entry in &due { - crate::agents::lifecycle::move_story_to_current(root, &entry.story_id) - .expect("move_story_to_current should succeed for backlog story"); - } - - // Story must still be accessible in the content store after the move. - assert!( - crate::db::read_content("421_story_foo").is_some(), - "story should be in the content store after timer fires" - ); - // Timer was consumed. - assert!( - store.list().is_empty(), - "fired timer should be removed from store" - ); - } - - // ── AC4: tick_once integration test ───────────────────────────────── - - /// Create a past-due timer, run tick_once, and assert the entry is - /// consumed. start_agent will fail (no real agent binary), but - /// take_due must still drain the entry. - #[tokio::test] - async fn tick_once_consumes_past_due_entry() { - use std::fs; - - let dir = TempDir::new().unwrap(); - let root = dir.path(); - let backlog = root.join(".huskies/work/1_backlog"); - let current = root.join(".huskies/work/2_current"); - fs::create_dir_all(&backlog).unwrap(); - fs::create_dir_all(¤t).unwrap(); - let content = "---\nname: Foo\n---\n"; - fs::write(backlog.join("9905_story_foo.md"), content).unwrap(); - crate::db::ensure_content_store(); - crate::db::write_content("9905_story_foo", content); - - let store = Arc::new(TimerStore::load(root.join("timers.json"))); - let past = Utc::now() - Duration::seconds(5); - store.add("9905_story_foo".to_string(), past).unwrap(); - assert_eq!(store.list().len(), 1, "precondition: one pending timer"); - - let agents = Arc::new(crate::agents::AgentPool::new_test(19999)); - - // tick_once should drain the due entry even though start_agent - // will fail (no agent binary configured in the test pool). - let result = super::tick_once(&store, &agents, root).await; - assert!(result.is_ok(), "tick_once should not panic: {result:?}"); - assert!( - store.list().is_empty(), - "past-due timer must be consumed after tick_once" - ); - // Story should still be accessible in the content store after the move. - assert!( - crate::db::read_content("9905_story_foo").is_some(), - "story should be in the content store after tick fires" - ); - } -} +//! This module exists only for backwards compatibility during the migration. +//! Callers should use `crate::service::timer::*` directly. +#[allow(unused_imports)] +pub use crate::service::timer::{ + TimerCommand, TimerEntry, TimerStore, extract_timer_command, handle_timer_command, + next_occurrence_of_hhmm, spawn_rate_limit_auto_scheduler, tick_once, +}; diff --git a/server/src/chat/transport/matrix/bot/context.rs b/server/src/chat/transport/matrix/bot/context.rs index 1cb67a97..f9d1c0d1 100644 --- a/server/src/chat/transport/matrix/bot/context.rs +++ b/server/src/chat/transport/matrix/bot/context.rs @@ -1,8 +1,8 @@ //! Matrix bot context — shared state for the Matrix bot (rooms, history, permissions). use crate::agents::AgentPool; use crate::chat::ChatTransport; -use crate::chat::timer::TimerStore; use crate::http::context::{PermissionDecision, PermissionForward}; +use crate::service::timer::TimerStore; use matrix_sdk::ruma::{OwnedEventId, OwnedRoomId, OwnedUserId}; use std::collections::{BTreeMap, HashMap, HashSet}; use std::path::PathBuf; @@ -177,7 +177,7 @@ mod tests { "test-token".to_string(), "pipeline_notification".to_string(), )), - timer_store: Arc::new(crate::chat::timer::TimerStore::load( + timer_store: Arc::new(crate::service::timer::TimerStore::load( std::path::PathBuf::from("/tmp/timers.json"), )), gateway_active_project: None, @@ -215,7 +215,7 @@ mod tests { "test-token".to_string(), "pipeline_notification".to_string(), )), - timer_store: Arc::new(crate::chat::timer::TimerStore::load( + timer_store: Arc::new(crate::service::timer::TimerStore::load( std::path::PathBuf::from("/tmp/timers.json"), )), gateway_active_project: Some(Arc::clone(&active)), @@ -256,7 +256,7 @@ mod tests { "test-token".to_string(), "pipeline_notification".to_string(), )), - timer_store: Arc::new(crate::chat::timer::TimerStore::load( + timer_store: Arc::new(crate::service::timer::TimerStore::load( std::path::PathBuf::from("/tmp/timers.json"), )), gateway_active_project: Some(Arc::clone(&active)), @@ -306,7 +306,7 @@ mod tests { "test-token".to_string(), "pipeline_notification".to_string(), )), - timer_store: Arc::new(crate::chat::timer::TimerStore::load( + timer_store: Arc::new(crate::service::timer::TimerStore::load( std::path::PathBuf::from("/tmp/timers.json"), )), gateway_active_project: None, diff --git a/server/src/chat/transport/matrix/bot/messages.rs b/server/src/chat/transport/matrix/bot/messages.rs index dff4703d..40a75e8f 100644 --- a/server/src/chat/transport/matrix/bot/messages.rs +++ b/server/src/chat/transport/matrix/bot/messages.rs @@ -572,13 +572,13 @@ pub(super) async fn on_room_message( // Check for the timer command, which requires async file I/O and cannot // be handled by the sync command registry. - if let Some(timer_cmd) = crate::chat::timer::extract_timer_command( + if let Some(timer_cmd) = crate::service::timer::extract_timer_command( &user_message, &ctx.bot_name, ctx.bot_user_id.as_str(), ) { slog!("[matrix-bot] Handling timer command from {sender}: {timer_cmd:?}"); - let response = crate::chat::timer::handle_timer_command( + let response = crate::service::timer::handle_timer_command( timer_cmd, &ctx.timer_store, &ctx.project_root, diff --git a/server/src/chat/transport/matrix/bot/run.rs b/server/src/chat/transport/matrix/bot/run.rs index f15cbd11..525cc2e4 100644 --- a/server/src/chat/transport/matrix/bot/run.rs +++ b/server/src/chat/transport/matrix/bot/run.rs @@ -228,11 +228,14 @@ pub async fn run_bot( .unwrap_or_else(|| "Assistant".to_string()); let announce_bot_name = bot_name.clone(); - let timer_store = Arc::new(crate::chat::timer::TimerStore::load( + let timer_store = Arc::new(crate::service::timer::TimerStore::load( project_root.join(".huskies").join("timers.json"), )); // Auto-schedule timers when an agent hits a hard rate limit. - crate::chat::timer::spawn_rate_limit_auto_scheduler(Arc::clone(&timer_store), watcher_rx_auto); + crate::service::timer::spawn_rate_limit_auto_scheduler( + Arc::clone(&timer_store), + watcher_rx_auto, + ); let ctx = BotContext { bot_user_id, diff --git a/server/src/http/context.rs b/server/src/http/context.rs index 37b009eb..e88abb2c 100644 --- a/server/src/http/context.rs +++ b/server/src/http/context.rs @@ -1,8 +1,8 @@ //! Application context — shared state (`AppContext`) threaded through all HTTP handlers. use crate::agents::{AgentPool, ReconciliationEvent}; -use crate::chat::timer::TimerStore; use crate::io::watcher::WatcherEvent; use crate::rebuild::{BotShutdownNotifier, ShutdownReason}; +use crate::service::timer::TimerStore; use crate::state::SessionState; use crate::store::JsonFileStore; use crate::workflow::WorkflowState; diff --git a/server/src/main.rs b/server/src/main.rs index d54290f6..ad91cf4a 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -780,7 +780,7 @@ async fn main() -> Result<(), std::io::Error> { // in `chat::transport::matrix::bot::run::spawn_bot`. Refactor to consume this // shared instance via `AppContext.timer_store` so cancellations from MCP // tools and the bot's tick loop see the same in-memory state. - let timer_store = std::sync::Arc::new(crate::chat::timer::TimerStore::load( + let timer_store = std::sync::Arc::new(crate::service::timer::TimerStore::load( startup_root .as_ref() .map(|r| r.join(".huskies").join("timers.json")) @@ -844,7 +844,7 @@ async fn main() -> Result<(), std::io::Error> { // Timer: fire due timers every second. if let Some(ref root) = tick_root { let result = - crate::chat::timer::tick_once(&tick_timer, &tick_agents, root).await; + crate::service::timer::tick_once(&tick_timer, &tick_agents, root).await; if let Err(msg) = result { crate::slog_error!("[tick] Timer tick panicked: {msg}"); } diff --git a/server/src/service/bot_command/io.rs b/server/src/service/bot_command/io.rs index db0ec70b..cf7ea9cc 100644 --- a/server/src/service/bot_command/io.rs +++ b/server/src/service/bot_command/io.rs @@ -74,7 +74,7 @@ pub(super) async fn call_rebuild(project_root: &Path, agents: &Arc) - /// Returns `Err` with a usage string if the timer arguments cannot be parsed. pub(super) async fn call_timer(args: &str, project_root: &Path) -> Result { let synthetic = format!("__web_ui__ timer {args}"); - let timer_cmd = match crate::chat::timer::extract_timer_command( + let timer_cmd = match crate::service::timer::extract_timer_command( &synthetic, "__web_ui__", "@__web_ui__:localhost", @@ -88,8 +88,8 @@ pub(super) async fn call_timer(args: &str, project_root: &Path) -> Result>, +} + +impl TimerStore { + /// Load the timer store from `path`. Returns an empty store if the file + /// does not exist or cannot be parsed. + pub fn load(path: PathBuf) -> Self { + let timers = if path.exists() { + std::fs::read_to_string(&path) + .ok() + .and_then(|s| deserialize_timers(&s)) + .unwrap_or_default() + } else { + Vec::new() + }; + Self { + path, + timers: Mutex::new(timers), + } + } + + fn save_locked(path: &Path, timers: &[TimerEntry]) -> Result<(), String> { + if let Some(parent) = path.parent() { + std::fs::create_dir_all(parent) + .map_err(|e| format!("Failed to create directory: {e}"))?; + } + let content = serialize_timers(timers)?; + std::fs::write(path, content).map_err(|e| format!("Failed to write timers: {e}")) + } + + /// Add a timer and persist to disk. + pub fn add(&self, story_id: String, scheduled_at: DateTime) -> Result<(), String> { + let mut timers = self.timers.lock().unwrap(); + timers.push(TimerEntry { + story_id, + scheduled_at, + }); + Self::save_locked(&self.path, &timers) + } + + /// Remove the timer for `story_id`. Returns `true` if one was removed. + pub fn remove(&self, story_id: &str) -> bool { + let mut timers = self.timers.lock().unwrap(); + let before = timers.len(); + timers.retain(|t| t.story_id != story_id); + let removed = timers.len() < before; + if removed { + let _ = Self::save_locked(&self.path, &timers); + } + removed + } + + /// Return all pending timers (cloned). + pub fn list(&self) -> Vec { + self.timers.lock().unwrap().clone() + } + + /// Add or update a timer for `story_id`. + /// + /// - If no timer exists for `story_id`, adds it. + /// - If a timer already exists and `scheduled_at` is **later**, updates it. + /// - If a timer already exists and `scheduled_at` is earlier or equal, no-op. + /// + /// Use this instead of [`add`] when auto-scheduling from rate-limit events to + /// avoid creating duplicates and to always keep the latest reset time. + pub fn upsert(&self, story_id: String, scheduled_at: DateTime) -> Result<(), String> { + let mut timers = self.timers.lock().unwrap(); + if let Some(existing) = timers.iter_mut().find(|t| t.story_id == story_id) { + if scheduled_at > existing.scheduled_at { + existing.scheduled_at = scheduled_at; + Self::save_locked(&self.path, &timers)?; + } + } else { + timers.push(TimerEntry { + story_id, + scheduled_at, + }); + Self::save_locked(&self.path, &timers)?; + } + Ok(()) + } + + /// Remove and return all timers whose `scheduled_at` is ≤ `now`. + /// Persists the updated list to disk if any timers were removed. + pub fn take_due(&self, now: DateTime) -> Vec { + let mut timers = self.timers.lock().unwrap(); + let mut due = Vec::new(); + let mut remaining = Vec::new(); + for t in timers.drain(..) { + if t.scheduled_at <= now { + due.push(t); + } else { + remaining.push(t); + } + } + *timers = remaining; + if !due.is_empty() { + let _ = Self::save_locked(&self.path, &timers); + } + due + } +} + +// ── Clock-reading wrapper ─────────────────────────────────────────────────── + +/// Parse `HH:MM` and return the next UTC instant at which the given timezone +/// (or the server-local clock when `timezone` is `None`) will read that time. +/// If the time has already passed today, returns tomorrow's occurrence. +/// +/// This wrapper reads the current clock via `Utc::now()`. For pure unit tests +/// use [`crate::service::timer::schedule::next_occurrence_at`] directly. +pub fn next_occurrence_of_hhmm(hhmm: &str, timezone: Option<&str>) -> Option> { + next_occurrence_at(hhmm, timezone, Utc::now()) +} + +// ── Story-ID resolution ───────────────────────────────────────────────────── + +/// Resolve a story ID from a numeric story number or a full ID string. +/// +/// Searches all pipeline stages. Returns `None` only when the input is +/// numeric but no matching file is found. +pub(super) fn resolve_story_id(number_or_id: &str, project_root: &Path) -> Option { + const STAGES: &[&str] = &[ + "1_backlog", + "2_current", + "3_qa", + "4_merge", + "5_done", + "6_archived", + ]; + + // Full ID (contains underscores) — return as-is; validation happens at file-check time. + if number_or_id.contains('_') { + return Some(number_or_id.to_string()); + } + + // Numeric lookup. + if !number_or_id.chars().all(|c| c.is_ascii_digit()) { + return None; + } + + // --- DB-first lookup --- + for id in crate::db::all_content_ids() { + let file_num = id.split('_').next().unwrap_or(""); + if file_num == number_or_id + && crate::pipeline_state::read_typed(&id) + .ok() + .flatten() + .is_some() + { + return Some(id); + } + } + + // --- Filesystem fallback --- + for stage in STAGES { + let dir = project_root.join(".huskies").join("work").join(stage); + if !dir.exists() { + continue; + } + if let Ok(entries) = std::fs::read_dir(&dir) { + for entry in entries.flatten() { + let path = entry.path(); + if path.extension().and_then(|e| e.to_str()) != Some("md") { + continue; + } + if let Some(stem) = path.file_stem().and_then(|s| s.to_str()) { + let file_num = stem + .split('_') + .next() + .filter(|s| !s.is_empty() && s.chars().all(|c| c.is_ascii_digit())) + .unwrap_or(""); + if file_num == number_or_id { + return Some(stem.to_string()); + } + } + } + } + } + None +} + +// ── Tick loop ─────────────────────────────────────────────────────────────── + +/// Execute one tick of the timer loop. +/// +/// Called by the unified background tick loop every second. +/// Separated from the loop so we can catch panics at the call-site. +/// Returns `Err` only when the tick panicked (the panic message is returned). +pub async fn tick_once( + store: &Arc, + agents: &Arc, + project_root: &Path, +) -> Result<(), String> { + // take_due is sync and could panic (e.g. poisoned mutex) — catch it. + let due = { + let store_ref = Arc::clone(store); + let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || { + store_ref.take_due(Utc::now()) + })); + match result { + Ok(due) => due, + Err(e) => return Err(panic_payload_to_string(&e)), + } + }; + + if due.is_empty() { + return Ok(()); + } + + let remaining = store.list().len(); + crate::slog!("[timer] Tick: {} due, {remaining} remaining", due.len()); + + for entry in due { + crate::slog!("[timer] Timer fired for story {}", entry.story_id); + + // Bug 501: Defense-in-depth check. If the story has already advanced + // past the active-work stages (3_qa, 4_merge, 5_done, 6_archived), + // there is nothing to resume — the timer is stale and should no-op. + // The primary cancellation paths (move_story MCP → backlog, stop_agent) + // remove the timer before it fires; this guard covers the case where + // cancellation was not yet called or the story raced forward through + // the pipeline while the timer was pending. + if let Ok(Some(item)) = crate::pipeline_state::read_typed(&entry.story_id) { + use crate::pipeline_state::Stage; + match &item.stage { + Stage::Qa | Stage::Merge { .. } | Stage::Done { .. } | Stage::Archived { .. } => { + crate::slog!( + "[timer] Skipping timer for story {} — currently in '{}', \ + not in backlog/current; timer is stale", + entry.story_id, + item.stage.dir_name() + ); + continue; + } + _ => {} + } + } + + // Move from backlog to current if needed — the auto-assign + // watcher will then start an agent automatically. + if let Err(e) = + crate::agents::lifecycle::move_story_to_current(project_root, &entry.story_id) + { + crate::slog!( + "[timer] Failed to move story {} to current: {e}", + entry.story_id + ); + continue; + } + + match agents + .start_agent(project_root, &entry.story_id, None, None, None) + .await + { + Ok(info) => { + crate::slog!( + "[timer] Started agent {} for story {}", + info.agent_name, + entry.story_id + ); + } + Err(e) => { + crate::slog!( + "[timer] Failed to start agent for story {}: {e} \ + (auto-assign may pick it up)", + entry.story_id + ); + } + } + } + Ok(()) +} + +fn panic_payload_to_string(payload: &Box) -> String { + if let Some(s) = payload.downcast_ref::<&str>() { + (*s).to_string() + } else if let Some(s) = payload.downcast_ref::() { + s.clone() + } else { + "unknown panic".to_string() + } +} + +// ── Rate-limit auto-scheduler ─────────────────────────────────────────────── + +/// Spawn a background task that listens for [`WatcherEvent::RateLimitHardBlock`] +/// events and auto-schedules a timer for the blocked story. +/// +/// If a timer already exists for the story, it is updated to the later reset time +/// rather than creating a duplicate (via [`TimerStore::upsert`]). +pub fn spawn_rate_limit_auto_scheduler( + store: Arc, + mut watcher_rx: tokio::sync::broadcast::Receiver, +) { + tokio::spawn(async move { + loop { + match watcher_rx.recv().await { + Ok(crate::io::watcher::WatcherEvent::RateLimitHardBlock { + story_id, + agent_name, + reset_at, + }) => { + // Skip short rate limits (≤10 min) — the CLI handles + // these internally. Only schedule timers for long + // session-level blocks where the CLI will exit. + let until_reset = reset_at.signed_duration_since(chrono::Utc::now()); + if until_reset.num_minutes() <= 10 { + crate::slog!( + "[timer] Skipping short rate limit for {story_id} \ + ({} min); CLI will handle internally", + until_reset.num_minutes() + ); + continue; + } + crate::slog!( + "[timer] Auto-scheduling timer for story {story_id} \ + (agent {agent_name}) to resume at {reset_at}" + ); + match store.upsert(story_id.clone(), reset_at) { + Ok(()) => { + crate::slog!( + "[timer] Timer upserted for story {story_id}; \ + scheduled at {reset_at}" + ); + } + Err(e) => { + crate::slog!( + "[timer] Failed to upsert timer for story {story_id}: {e}" + ); + } + } + } + Ok(_) => {} + Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { + crate::slog!("[timer] Rate-limit auto-scheduler lagged, skipped {n} events"); + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => { + crate::slog!( + "[timer] Watcher channel closed, stopping rate-limit auto-scheduler" + ); + break; + } + } + } + }); +} + +// ── Tests ────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + use chrono::Duration; + use tempfile::TempDir; + + // ── next_occurrence_of_hhmm ───────────────────────────────────────── + + #[test] + fn valid_hhmm_returns_some() { + let result = next_occurrence_of_hhmm("14:30", None); + assert!(result.is_some(), "valid HH:MM should return Some"); + } + + #[test] + fn invalid_hhmm_missing_colon_returns_none() { + assert!(next_occurrence_of_hhmm("1430", None).is_none()); + } + + #[test] + fn invalid_hhmm_bad_hours_returns_none() { + assert!(next_occurrence_of_hhmm("25:00", None).is_none()); + } + + #[test] + fn invalid_hhmm_bad_minutes_returns_none() { + assert!(next_occurrence_of_hhmm("12:60", None).is_none()); + } + + #[test] + fn next_occurrence_is_in_the_future() { + let result = next_occurrence_of_hhmm("14:30", None).unwrap(); + assert!(result > Utc::now(), "next occurrence must be in the future"); + } + + #[test] + fn next_occurrence_with_named_timezone_is_in_the_future() { + let result = next_occurrence_of_hhmm("14:30", Some("Europe/London")).unwrap(); + assert!( + result > Utc::now(), + "next occurrence (Europe/London) must be in the future" + ); + } + + #[test] + fn next_occurrence_with_invalid_timezone_falls_back_to_local() { + // An unrecognised timezone name falls back to chrono::Local (returns Some). + let result = next_occurrence_of_hhmm("14:30", Some("Invalid/Zone")); + assert!( + result.is_some(), + "invalid timezone should fall back to local and return Some" + ); + } + + // ── TimerStore ────────────────────────────────────────────────────── + + #[test] + fn timer_store_empty_on_missing_file() { + let dir = TempDir::new().unwrap(); + let store = TimerStore::load(dir.path().join("timers.json")); + assert!(store.list().is_empty()); + } + + #[test] + fn timer_store_add_and_list() { + let dir = TempDir::new().unwrap(); + let store = TimerStore::load(dir.path().join("timers.json")); + let t = Utc::now() + Duration::hours(1); + store.add("story_1".to_string(), t).unwrap(); + let list = store.list(); + assert_eq!(list.len(), 1); + assert_eq!(list[0].story_id, "story_1"); + } + + #[test] + fn timer_store_remove() { + let dir = TempDir::new().unwrap(); + let store = TimerStore::load(dir.path().join("timers.json")); + let t = Utc::now() + Duration::hours(1); + store.add("story_1".to_string(), t).unwrap(); + assert!(store.remove("story_1")); + assert!(!store.remove("story_1")); // already gone + assert!(store.list().is_empty()); + } + + #[test] + fn timer_store_persists_and_reloads() { + let dir = TempDir::new().unwrap(); + let path = dir.path().join("timers.json"); + let t = Utc::now() + Duration::hours(2); + { + let store = TimerStore::load(path.clone()); + store.add("421_story_foo".to_string(), t).unwrap(); + } + // Reload from disk. + let store2 = TimerStore::load(path); + let list = store2.list(); + assert_eq!(list.len(), 1); + assert_eq!(list[0].story_id, "421_story_foo"); + } + + #[test] + fn take_due_returns_only_past_entries() { + let dir = TempDir::new().unwrap(); + let store = TimerStore::load(dir.path().join("timers.json")); + let past = Utc::now() - Duration::minutes(1); + let future = Utc::now() + Duration::hours(1); + store.add("past_story".to_string(), past).unwrap(); + store.add("future_story".to_string(), future).unwrap(); + + let due = store.take_due(Utc::now()); + assert_eq!(due.len(), 1); + assert_eq!(due[0].story_id, "past_story"); + assert_eq!(store.list().len(), 1); + assert_eq!(store.list()[0].story_id, "future_story"); + } + + // ── upsert ───────────────────────────────────────────────────────── + + #[test] + fn upsert_adds_new_timer_when_none_exists() { + let dir = TempDir::new().unwrap(); + let store = TimerStore::load(dir.path().join("timers.json")); + let t = Utc::now() + Duration::hours(1); + store.upsert("story_1".to_string(), t).unwrap(); + let list = store.list(); + assert_eq!(list.len(), 1); + assert_eq!(list[0].story_id, "story_1"); + assert_eq!(list[0].scheduled_at, t); + } + + #[test] + fn upsert_updates_to_later_time() { + let dir = TempDir::new().unwrap(); + let store = TimerStore::load(dir.path().join("timers.json")); + let early = Utc::now() + Duration::hours(1); + let later = Utc::now() + Duration::hours(2); + store.upsert("story_1".to_string(), early).unwrap(); + store.upsert("story_1".to_string(), later).unwrap(); + let list = store.list(); + assert_eq!(list.len(), 1, "should not create duplicate"); + assert_eq!(list[0].scheduled_at, later, "should update to later time"); + } + + #[test] + fn upsert_does_not_downgrade_to_earlier_time() { + let dir = TempDir::new().unwrap(); + let store = TimerStore::load(dir.path().join("timers.json")); + let later = Utc::now() + Duration::hours(2); + let earlier = Utc::now() + Duration::hours(1); + store.upsert("story_1".to_string(), later).unwrap(); + store.upsert("story_1".to_string(), earlier).unwrap(); + let list = store.list(); + assert_eq!(list.len(), 1); + assert_eq!( + list[0].scheduled_at, later, + "should keep the later time, not downgrade" + ); + } + + // ── spawn_rate_limit_auto_scheduler ───────────────────────────────── + + /// AC2: a RateLimitHardBlock event causes the auto-scheduler to add a timer. + #[tokio::test] + async fn rate_limit_auto_scheduler_adds_timer_on_hard_block() { + use crate::io::watcher::WatcherEvent; + + let dir = TempDir::new().unwrap(); + let store = Arc::new(TimerStore::load(dir.path().join("timers.json"))); + let (watcher_tx, watcher_rx) = tokio::sync::broadcast::channel::(16); + + spawn_rate_limit_auto_scheduler(Arc::clone(&store), watcher_rx); + + let reset_at = Utc::now() + Duration::hours(1); + watcher_tx + .send(WatcherEvent::RateLimitHardBlock { + story_id: "423_story_test".to_string(), + agent_name: "coder-1".to_string(), + reset_at, + }) + .unwrap(); + + // Give the spawned task time to process the event. + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + let list = store.list(); + assert_eq!(list.len(), 1, "expected one timer after hard block"); + assert_eq!(list[0].story_id, "423_story_test"); + assert_eq!(list[0].scheduled_at, reset_at); + } + + /// AC3 integration: a second hard block with a later reset_at updates the + /// existing timer rather than creating a duplicate. + #[tokio::test] + async fn rate_limit_auto_scheduler_upserts_on_repeated_hard_block() { + use crate::io::watcher::WatcherEvent; + + let dir = TempDir::new().unwrap(); + let store = Arc::new(TimerStore::load(dir.path().join("timers.json"))); + let (watcher_tx, watcher_rx) = tokio::sync::broadcast::channel::(16); + + spawn_rate_limit_auto_scheduler(Arc::clone(&store), watcher_rx); + + let first = Utc::now() + Duration::hours(1); + let second = Utc::now() + Duration::hours(2); + + watcher_tx + .send(WatcherEvent::RateLimitHardBlock { + story_id: "423_story_test".to_string(), + agent_name: "coder-1".to_string(), + reset_at: first, + }) + .unwrap(); + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + watcher_tx + .send(WatcherEvent::RateLimitHardBlock { + story_id: "423_story_test".to_string(), + agent_name: "coder-1".to_string(), + reset_at: second, + }) + .unwrap(); + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + let list = store.list(); + assert_eq!(list.len(), 1, "should not create a duplicate timer"); + assert_eq!(list[0].scheduled_at, second, "should update to later time"); + } + + #[test] + fn multiple_timers_same_time_all_returned() { + let dir = TempDir::new().unwrap(); + let store = TimerStore::load(dir.path().join("timers.json")); + let past = Utc::now() - Duration::minutes(1); + store.add("story_a".to_string(), past).unwrap(); + store.add("story_b".to_string(), past).unwrap(); + + let due = store.take_due(Utc::now()); + assert_eq!(due.len(), 2, "both timers at same time must fire"); + } + + // ── tick_once ─────────────────────────────────────────────────────── + + /// Create a past-due timer, run tick_once, and assert the entry is + /// consumed. start_agent will fail (no real agent binary), but + /// take_due must still drain the entry. + #[tokio::test] + async fn tick_once_consumes_past_due_entry() { + use std::fs; + + let dir = TempDir::new().unwrap(); + let root = dir.path(); + let backlog = root.join(".huskies/work/1_backlog"); + let current = root.join(".huskies/work/2_current"); + fs::create_dir_all(&backlog).unwrap(); + fs::create_dir_all(¤t).unwrap(); + let content = "---\nname: Foo\n---\n"; + fs::write(backlog.join("9905_story_foo.md"), content).unwrap(); + crate::db::ensure_content_store(); + crate::db::write_content("9905_story_foo", content); + + let store = Arc::new(TimerStore::load(root.join("timers.json"))); + let past = Utc::now() - Duration::seconds(5); + store.add("9905_story_foo".to_string(), past).unwrap(); + assert_eq!(store.list().len(), 1, "precondition: one pending timer"); + + let agents = Arc::new(crate::agents::AgentPool::new_test(19999)); + + // tick_once should drain the due entry even though start_agent + // will fail (no agent binary configured in the test pool). + let result = super::tick_once(&store, &agents, root).await; + assert!(result.is_ok(), "tick_once should not panic: {result:?}"); + assert!( + store.list().is_empty(), + "past-due timer must be consumed after tick_once" + ); + // Story should still be accessible in the content store after the move. + assert!( + crate::db::read_content("9905_story_foo").is_some(), + "story should be in the content store after tick fires" + ); + } +} diff --git a/server/src/service/timer/mod.rs b/server/src/service/timer/mod.rs new file mode 100644 index 00000000..8f74a669 --- /dev/null +++ b/server/src/service/timer/mod.rs @@ -0,0 +1,468 @@ +//! Timer service — deferred agent start via one-shot timers. +//! +//! Provides [`TimerStore`] for persisting timers to `.huskies/timers.json` +//! and command parsing / handling for the `timer` bot command. +//! Due timers are fired by the unified background tick loop in `main`. +//! +//! Follows service-module conventions: +//! - `mod.rs` (this file) — public API, typed [`Error`], orchestration +//! - `io.rs` — the ONLY place that performs side effects (filesystem, clock, spawn) +//! - `parse.rs` — pure: command parsing, time display formatting +//! - `persist.rs` — pure: serialisation/deserialisation of `timers.json` +//! - `schedule.rs` — pure: next-fire-time calculation given a reference instant + +pub(super) mod io; +pub(super) mod parse; +pub(super) mod persist; +pub(super) mod schedule; + +pub use io::{TimerStore, next_occurrence_of_hhmm, spawn_rate_limit_auto_scheduler, tick_once}; +pub use parse::{TimerCommand, extract_timer_command}; +pub use persist::TimerEntry; + +use std::path::Path; + +// ── Error type ──────────────────────────────────────────────────────────────── + +/// Typed errors returned by `service::timer` operations. +/// +/// HTTP handlers and bot commands may map these to user-facing messages: +/// - [`Error::Parse`] → "Invalid time format" +/// - [`Error::DuplicateSchedule`] → "Timer already scheduled" +/// - [`Error::NoSuchSchedule`] → "No timer found" +/// - [`Error::Io`] → "Internal error saving timer" +#[derive(Debug)] +#[allow(dead_code)] +pub enum Error { + /// The supplied `HH:MM` string could not be parsed or is out of range. + Parse(String), + /// A timer already exists for the given story ID. + DuplicateSchedule(String), + /// No timer exists for the given story ID. + NoSuchSchedule(String), + /// A filesystem read or write operation failed. + Io(String), +} + +impl std::fmt::Display for Error { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Parse(msg) => write!(f, "Parse error: {msg}"), + Self::DuplicateSchedule(id) => { + write!(f, "Timer already exists for story '{id}'") + } + Self::NoSuchSchedule(id) => write!(f, "No timer found for story '{id}'"), + Self::Io(msg) => write!(f, "I/O error: {msg}"), + } + } +} + +// ── Typed public API ────────────────────────────────────────────────────────── + +/// Schedule a new timer for `story_id` to fire at the next `HH:MM` occurrence. +#[allow(dead_code)] +/// +/// Returns the scheduled UTC instant on success. +/// +/// # Errors +/// - [`Error::DuplicateSchedule`] if a timer already exists for `story_id`. +/// - [`Error::Parse`] if `hhmm` is not a valid `HH:MM` string. +/// - [`Error::Io`] if persisting the timer to disk fails. +pub fn schedule_timer( + store: &TimerStore, + story_id: &str, + hhmm: &str, + timezone: Option<&str>, +) -> Result, Error> { + if store.list().iter().any(|t| t.story_id == story_id) { + return Err(Error::DuplicateSchedule(story_id.to_string())); + } + let scheduled_at = next_occurrence_of_hhmm(hhmm, timezone) + .ok_or_else(|| Error::Parse(format!("invalid HH:MM: '{hhmm}'")))?; + store + .add(story_id.to_string(), scheduled_at) + .map_err(Error::Io)?; + Ok(scheduled_at) +} + +/// Cancel an existing timer for `story_id`. +#[allow(dead_code)] +/// +/// # Errors +/// - [`Error::NoSuchSchedule`] if no timer exists for `story_id`. +pub fn cancel_timer(store: &TimerStore, story_id: &str) -> Result<(), Error> { + if store.remove(story_id) { + Ok(()) + } else { + Err(Error::NoSuchSchedule(story_id.to_string())) + } +} + +// ── Command handler ──────────────────────────────────────────────────────────── + +/// Handle a parsed `timer` command. Returns a markdown-formatted response. +pub async fn handle_timer_command( + cmd: TimerCommand, + store: &TimerStore, + project_root: &Path, +) -> String { + // Load the configured timezone (if any) from project.toml. + let config_tz: Option = crate::config::ProjectConfig::load(project_root) + .ok() + .and_then(|c| c.timezone); + let tz_str: Option<&str> = config_tz.as_deref(); + + match cmd { + TimerCommand::Schedule { + story_number_or_id, + hhmm, + } => { + let story_id = match io::resolve_story_id(&story_number_or_id, project_root) { + Some(id) => id, + None => { + return format!("No story with number or ID **{story_number_or_id}** found."); + } + }; + + // The story must be in backlog or current. When the timer fires, + // backlog stories are moved to current automatically. + // Check CRDT state first, then fall back to filesystem. + let in_valid_stage = + if let Ok(Some(item)) = crate::pipeline_state::read_typed(&story_id) { + use crate::pipeline_state::Stage; + matches!(item.stage, Stage::Backlog | Stage::Coding) + } else { + let work_dir = project_root.join(".huskies").join("work"); + work_dir + .join("1_backlog") + .join(format!("{story_id}.md")) + .exists() + || work_dir + .join("2_current") + .join(format!("{story_id}.md")) + .exists() + }; + if !in_valid_stage { + return format!("Story **{story_id}** is not in backlog or current."); + } + + let scheduled_at = match next_occurrence_of_hhmm(&hhmm, tz_str) { + Some(t) => t, + None => { + return format!("Invalid time **{hhmm}**. Use `HH:MM` format (e.g. `14:30`)."); + } + }; + + match store.add(story_id.clone(), scheduled_at) { + Ok(()) => { + let (display_time, tz_label) = parse::format_in_timezone(scheduled_at, tz_str); + format!("Timer set for **{story_id}** at **{display_time}** ({tz_label}).") + } + Err(e) => format!("Failed to save timer: {e}"), + } + } + TimerCommand::List => { + let timers = store.list(); + if timers.is_empty() { + return "No pending timers.".to_string(); + } + let mut lines = vec!["**Pending timers:**".to_string()]; + for t in &timers { + let (display_time, _) = parse::format_in_timezone(t.scheduled_at, tz_str); + lines.push(format!("- **{}** → {}", t.story_id, display_time)); + } + lines.join("\n") + } + TimerCommand::Cancel { story_number_or_id } => { + let story_id = io::resolve_story_id(&story_number_or_id, project_root) + .unwrap_or(story_number_or_id.clone()); + if store.remove(&story_id) { + format!("Timer for **{story_id}** cancelled.") + } else { + format!("No timer found for **{story_id}**.") + } + } + TimerCommand::BadArgs => "Usage:\n\ + - `timer ` — schedule deferred start\n\ + - `timer list` — show pending timers\n\ + - `timer cancel ` — remove a timer" + .to_string(), + } +} + +// ── Tests ────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + use chrono::{Duration, Utc}; + use tempfile::TempDir; + + // ── Error Display ───────────────────────────────────────────────────────── + + #[test] + fn error_parse_display() { + let e = Error::Parse("bad value".to_string()); + assert!(e.to_string().contains("Parse error")); + assert!(e.to_string().contains("bad value")); + } + + #[test] + fn error_duplicate_schedule_display() { + let e = Error::DuplicateSchedule("421_story_foo".to_string()); + assert!(e.to_string().contains("already exists")); + assert!(e.to_string().contains("421_story_foo")); + } + + #[test] + fn error_no_such_schedule_display() { + let e = Error::NoSuchSchedule("421_story_foo".to_string()); + assert!(e.to_string().contains("No timer found")); + assert!(e.to_string().contains("421_story_foo")); + } + + #[test] + fn error_io_display() { + let e = Error::Io("disk full".to_string()); + assert!(e.to_string().contains("I/O error")); + assert!(e.to_string().contains("disk full")); + } + + // ── schedule_timer ───────────────────────────────────────────────────── + + #[test] + fn schedule_timer_returns_duplicate_when_already_exists() { + let dir = TempDir::new().unwrap(); + let store = TimerStore::load(dir.path().join("timers.json")); + let t = Utc::now() + Duration::hours(1); + store.add("421_story_foo".to_string(), t).unwrap(); + let result = schedule_timer(&store, "421_story_foo", "14:30", None); + assert!( + matches!(result, Err(Error::DuplicateSchedule(_))), + "expected DuplicateSchedule: {result:?}" + ); + } + + #[test] + fn schedule_timer_returns_parse_error_for_bad_hhmm() { + let dir = TempDir::new().unwrap(); + let store = TimerStore::load(dir.path().join("timers.json")); + let result = schedule_timer(&store, "421_story_foo", "99:99", None); + assert!( + matches!(result, Err(Error::Parse(_))), + "expected Parse error: {result:?}" + ); + } + + // ── cancel_timer ─────────────────────────────────────────────────────── + + #[test] + fn cancel_timer_returns_no_such_when_missing() { + let dir = TempDir::new().unwrap(); + let store = TimerStore::load(dir.path().join("timers.json")); + let result = cancel_timer(&store, "421_story_foo"); + assert!( + matches!(result, Err(Error::NoSuchSchedule(_))), + "expected NoSuchSchedule: {result:?}" + ); + } + + #[test] + fn cancel_timer_succeeds_when_exists() { + let dir = TempDir::new().unwrap(); + let store = TimerStore::load(dir.path().join("timers.json")); + let t = Utc::now() + Duration::hours(1); + store.add("421_story_foo".to_string(), t).unwrap(); + assert!(cancel_timer(&store, "421_story_foo").is_ok()); + assert!(store.list().is_empty()); + } + + // ── handle_timer_command ──────────────────────────────────────────── + + #[tokio::test] + async fn handle_list_empty() { + let dir = TempDir::new().unwrap(); + let store = TimerStore::load(dir.path().join("timers.json")); + let result = handle_timer_command(TimerCommand::List, &store, dir.path()).await; + assert!(result.contains("No pending timers"), "unexpected: {result}"); + } + + #[tokio::test] + async fn handle_cancel_not_found() { + let dir = TempDir::new().unwrap(); + let store = TimerStore::load(dir.path().join("timers.json")); + let result = handle_timer_command( + TimerCommand::Cancel { + story_number_or_id: "421_story_foo".to_string(), + }, + &store, + dir.path(), + ) + .await; + assert!(result.contains("No timer found"), "unexpected: {result}"); + } + + #[tokio::test] + async fn handle_schedule_story_not_in_backlog_or_current() { + let dir = TempDir::new().unwrap(); + // Ensure CRDT content store is initialised so the DB-first lookup works. + crate::db::ensure_content_store(); + // No story written — "9950_story_timer_neg" should not be found. + let store = TimerStore::load(dir.path().join("timers.json")); + let result = handle_timer_command( + TimerCommand::Schedule { + story_number_or_id: "9950_story_timer_neg".to_string(), + hhmm: "14:30".to_string(), + }, + &store, + dir.path(), + ) + .await; + assert!( + result.contains("not in backlog or current"), + "unexpected: {result}" + ); + } + + #[tokio::test] + async fn handle_schedule_accepts_backlog_story() { + let dir = TempDir::new().unwrap(); + let backlog_dir = dir.path().join(".huskies/work/1_backlog"); + std::fs::create_dir_all(&backlog_dir).unwrap(); + std::fs::write( + backlog_dir.join("421_story_foo.md"), + "---\nname: Foo\n---\n", + ) + .unwrap(); + let store = TimerStore::load(dir.path().join("timers.json")); + let result = handle_timer_command( + TimerCommand::Schedule { + story_number_or_id: "421_story_foo".to_string(), + hhmm: "14:30".to_string(), + }, + &store, + dir.path(), + ) + .await; + assert!( + result.contains("Timer set"), + "backlog story should be accepted: {result}" + ); + } + + #[tokio::test] + async fn handle_schedule_success() { + let dir = TempDir::new().unwrap(); + let current_dir = dir.path().join(".huskies/work/2_current"); + std::fs::create_dir_all(¤t_dir).unwrap(); + std::fs::write(current_dir.join("421_story_foo.md"), "---\nname: Foo\n---").unwrap(); + let store = TimerStore::load(dir.path().join("timers.json")); + let result = handle_timer_command( + TimerCommand::Schedule { + story_number_or_id: "421_story_foo".to_string(), + hhmm: "23:59".to_string(), + }, + &store, + dir.path(), + ) + .await; + assert!(result.contains("Timer set for"), "unexpected: {result}"); + assert_eq!(store.list().len(), 1); + } + + #[tokio::test] + async fn handle_schedule_invalid_time() { + let dir = TempDir::new().unwrap(); + let current_dir = dir.path().join(".huskies/work/2_current"); + std::fs::create_dir_all(¤t_dir).unwrap(); + std::fs::write(current_dir.join("421_story_foo.md"), "---\nname: Foo\n---").unwrap(); + let store = TimerStore::load(dir.path().join("timers.json")); + let result = handle_timer_command( + TimerCommand::Schedule { + story_number_or_id: "421_story_foo".to_string(), + hhmm: "99:00".to_string(), + }, + &store, + dir.path(), + ) + .await; + assert!(result.contains("Invalid time"), "unexpected: {result}"); + } + + #[tokio::test] + async fn handle_cancel_existing_timer() { + let dir = TempDir::new().unwrap(); + let store = TimerStore::load(dir.path().join("timers.json")); + let future = Utc::now() + Duration::hours(1); + store.add("421_story_foo".to_string(), future).unwrap(); + let result = handle_timer_command( + TimerCommand::Cancel { + story_number_or_id: "421_story_foo".to_string(), + }, + &store, + dir.path(), + ) + .await; + assert!(result.contains("cancelled"), "unexpected: {result}"); + assert!(store.list().is_empty()); + } + + #[tokio::test] + async fn handle_list_with_entries() { + let dir = TempDir::new().unwrap(); + let store = TimerStore::load(dir.path().join("timers.json")); + let future = Utc::now() + Duration::hours(1); + store.add("421_story_foo".to_string(), future).unwrap(); + let result = handle_timer_command(TimerCommand::List, &store, dir.path()).await; + assert!(result.contains("421_story_foo"), "unexpected: {result}"); + assert!(result.contains("Pending timers"), "unexpected: {result}"); + } + + // ── firing a timer for a backlog story moves it to current ─────────── + + /// When a timer fires for a story in backlog, the tick loop calls + /// `move_story_to_current` before `start_agent`. This test exercises + /// that exact sequence (minus the agent pool) to prove the story ends + /// up in `2_current/` after firing. + #[test] + fn fired_timer_for_backlog_story_moves_to_current() { + use std::fs; + + let dir = TempDir::new().unwrap(); + let root = dir.path(); + let backlog = root.join(".huskies/work/1_backlog"); + let current = root.join(".huskies/work/2_current"); + fs::create_dir_all(&backlog).unwrap(); + fs::create_dir_all(¤t).unwrap(); + let content = "---\nname: Foo\n---\n"; + fs::write(backlog.join("421_story_foo.md"), content).unwrap(); + crate::db::ensure_content_store(); + crate::db::write_content("421_story_foo", content); + + // Add a past timer so take_due returns it immediately. + let store = TimerStore::load(root.join("timers.json")); + let past = Utc::now() - Duration::seconds(1); + store.add("421_story_foo".to_string(), past).unwrap(); + + // Drain due timers — same as the tick loop does. + let due = store.take_due(Utc::now()); + assert_eq!(due.len(), 1, "expected one fired timer"); + + // Apply the move-to-current step the tick loop performs. + for entry in &due { + crate::agents::lifecycle::move_story_to_current(root, &entry.story_id) + .expect("move_story_to_current should succeed for backlog story"); + } + + // Story must still be accessible in the content store after the move. + assert!( + crate::db::read_content("421_story_foo").is_some(), + "story should be in the content store after timer fires" + ); + // Timer was consumed. + assert!( + store.list().is_empty(), + "fired timer should be removed from store" + ); + } +} diff --git a/server/src/service/timer/parse.rs b/server/src/service/timer/parse.rs new file mode 100644 index 00000000..d8b69042 --- /dev/null +++ b/server/src/service/timer/parse.rs @@ -0,0 +1,179 @@ +//! Pure parsing logic for timer bot-commands and time display formatting. +//! +//! No side effects: no filesystem access, no clock reads. + +use chrono::{DateTime, Local, Utc}; +use chrono_tz::Tz; + +use crate::chat::util::strip_bot_mention; + +// ── Command types ────────────────────────────────────────────────────────── + +/// A parsed `timer` command. +#[derive(Debug, PartialEq)] +pub enum TimerCommand { + /// `timer ` — schedule a deferred start. + Schedule { + story_number_or_id: String, + hhmm: String, + }, + /// `timer list` — list all pending timers. + List, + /// `timer cancel ` — remove a pending timer. + Cancel { story_number_or_id: String }, + /// Malformed arguments. + BadArgs, +} + +// ── Command extraction ───────────────────────────────────────────────────── + +/// Parse a `timer` command from a raw message body. +/// +/// Strips the bot mention prefix and matches the `timer` keyword. +/// Returns `None` when the message is not a timer command at all. +pub fn extract_timer_command( + message: &str, + bot_name: &str, + bot_user_id: &str, +) -> Option { + let stripped = strip_bot_mention(message, bot_name, bot_user_id); + let trimmed = stripped + .trim() + .trim_start_matches(|c: char| !c.is_alphanumeric()); + + let (cmd, args) = match trimmed.split_once(char::is_whitespace) { + Some((c, a)) => (c, a.trim()), + None => (trimmed, ""), + }; + + if !cmd.eq_ignore_ascii_case("timer") { + return None; + } + + // `timer` with no args or `timer list` + if args.is_empty() || args.eq_ignore_ascii_case("list") { + return Some(TimerCommand::List); + } + + let (sub, rest) = match args.split_once(char::is_whitespace) { + Some((s, r)) => (s, r.trim()), + None => (args, ""), + }; + + // `timer cancel ` + if sub.eq_ignore_ascii_case("cancel") { + if rest.is_empty() { + return Some(TimerCommand::BadArgs); + } + return Some(TimerCommand::Cancel { + story_number_or_id: rest.to_string(), + }); + } + + // `timer ` + if rest.is_empty() { + return Some(TimerCommand::BadArgs); + } + + Some(TimerCommand::Schedule { + story_number_or_id: sub.to_string(), + hhmm: rest.to_string(), + }) +} + +// ── Display helpers ──────────────────────────────────────────────────────── + +/// Format a UTC instant for display in the given timezone (or local time when +/// `timezone` is `None`). Returns `(formatted_string, label)` where `label` +/// is either the IANA timezone name or `"server local time"`. +pub(super) fn format_in_timezone(dt: DateTime, timezone: Option<&str>) -> (String, String) { + match timezone.and_then(|s| s.parse::().ok()) { + Some(tz) => { + let tz_time = dt.with_timezone(&tz); + (tz_time.format("%Y-%m-%d %H:%M").to_string(), tz.to_string()) + } + None => { + let local_time = dt.with_timezone(&Local); + ( + local_time.format("%Y-%m-%d %H:%M").to_string(), + "server local time".to_string(), + ) + } + } +} + +// ── Tests ────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn non_timer_command_returns_none() { + assert!(extract_timer_command("Timmy help", "Timmy", "@bot:home").is_none()); + } + + #[test] + fn timer_list_no_args() { + assert_eq!( + extract_timer_command("Timmy timer", "Timmy", "@bot:home"), + Some(TimerCommand::List) + ); + } + + #[test] + fn timer_list_explicit() { + assert_eq!( + extract_timer_command("Timmy timer list", "Timmy", "@bot:home"), + Some(TimerCommand::List) + ); + } + + #[test] + fn timer_cancel_story_id() { + assert_eq!( + extract_timer_command("Timmy timer cancel 421_story_foo", "Timmy", "@bot:home"), + Some(TimerCommand::Cancel { + story_number_or_id: "421_story_foo".to_string() + }) + ); + } + + #[test] + fn timer_cancel_no_arg_is_bad_args() { + assert_eq!( + extract_timer_command("Timmy timer cancel", "Timmy", "@bot:home"), + Some(TimerCommand::BadArgs) + ); + } + + #[test] + fn timer_schedule_with_story_id() { + assert_eq!( + extract_timer_command("Timmy timer 421_story_foo 14:30", "Timmy", "@bot:home"), + Some(TimerCommand::Schedule { + story_number_or_id: "421_story_foo".to_string(), + hhmm: "14:30".to_string(), + }) + ); + } + + #[test] + fn timer_schedule_with_number() { + assert_eq!( + extract_timer_command("Timmy timer 421 14:30", "Timmy", "@bot:home"), + Some(TimerCommand::Schedule { + story_number_or_id: "421".to_string(), + hhmm: "14:30".to_string(), + }) + ); + } + + #[test] + fn timer_schedule_missing_time_is_bad_args() { + assert_eq!( + extract_timer_command("Timmy timer 421_story_foo", "Timmy", "@bot:home"), + Some(TimerCommand::BadArgs) + ); + } +} diff --git a/server/src/service/timer/persist.rs b/server/src/service/timer/persist.rs new file mode 100644 index 00000000..a1ab127d --- /dev/null +++ b/server/src/service/timer/persist.rs @@ -0,0 +1,89 @@ +//! Pure serialisation logic for the `timers.json` persistence format. +//! +//! No side effects: no filesystem access, no clock reads. + +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; + +// ── Data types ───────────────────────────────────────────────────────────── + +/// A single scheduled timer entry. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct TimerEntry { + /// The full story ID (filename stem, e.g. `421_story_foo`). + pub story_id: String, + /// UTC instant at which the timer should fire. + pub scheduled_at: DateTime, +} + +// ── Serialisation ────────────────────────────────────────────────────────── + +/// Serialise a slice of timer entries to a pretty-printed JSON string. +pub(super) fn serialize_timers(entries: &[TimerEntry]) -> Result { + serde_json::to_string_pretty(entries).map_err(|e| format!("Serialize failed: {e}")) +} + +/// Deserialise a JSON string into a vector of timer entries. +/// Returns `None` if the input cannot be parsed. +pub(super) fn deserialize_timers(s: &str) -> Option> { + serde_json::from_str(s).ok() +} + +// ── Tests ────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + use chrono::TimeZone; + + fn make_entry(story_id: &str, secs_from_epoch: i64) -> TimerEntry { + TimerEntry { + story_id: story_id.to_string(), + scheduled_at: Utc.timestamp_opt(secs_from_epoch, 0).unwrap(), + } + } + + #[test] + fn empty_slice_serialises_to_empty_array() { + let s = serialize_timers(&[]).unwrap(); + assert_eq!(s.trim(), "[]"); + } + + #[test] + fn single_entry_round_trips() { + let entry = make_entry("421_story_foo", 1_700_000_000); + let s = serialize_timers(std::slice::from_ref(&entry)).unwrap(); + let back = deserialize_timers(&s).unwrap(); + assert_eq!(back.len(), 1); + assert_eq!(back[0], entry); + } + + #[test] + fn multiple_entries_round_trip() { + let entries = vec![ + make_entry("421_story_foo", 1_700_000_000), + make_entry("422_story_bar", 1_700_001_000), + ]; + let s = serialize_timers(&entries).unwrap(); + let back = deserialize_timers(&s).unwrap(); + assert_eq!(back, entries); + } + + #[test] + fn deserialize_invalid_json_returns_none() { + assert!(deserialize_timers("not valid json").is_none()); + } + + #[test] + fn deserialize_wrong_schema_returns_none() { + assert!(deserialize_timers("[1, 2, 3]").is_none()); + } + + #[test] + fn story_id_preserved_exactly() { + let entry = make_entry("615_story_extract_timer_service", 1_700_000_000); + let s = serialize_timers(std::slice::from_ref(&entry)).unwrap(); + let back = deserialize_timers(&s).unwrap(); + assert_eq!(back[0].story_id, "615_story_extract_timer_service"); + } +} diff --git a/server/src/service/timer/schedule.rs b/server/src/service/timer/schedule.rs new file mode 100644 index 00000000..0f93e6f8 --- /dev/null +++ b/server/src/service/timer/schedule.rs @@ -0,0 +1,190 @@ +//! Pure schedule-math: given a `HH:MM` string and a reference instant, compute +//! the next UTC occurrence of that wall-clock time in the configured timezone. +//! +//! No side effects: no filesystem access, no clock reads. The caller is +//! responsible for supplying the current instant via `now`. + +use chrono::{DateTime, Duration, Local, NaiveTime, TimeZone, Utc}; +use chrono_tz::Tz; + +/// Return the next UTC instant at which the given timezone (or the +/// server-local timezone when `timezone` is `None`) will read `hhmm`. +/// +/// If that wall-clock time has already passed today, returns tomorrow's +/// occurrence. Returns `None` if `hhmm` is not a valid `HH:MM` string. +/// +/// `now` is the reference instant. Callers that want real-time behaviour +/// should pass `Utc::now()` — this is done automatically by the `io` layer's +/// [`crate::service::timer::next_occurrence_of_hhmm`] wrapper. +pub fn next_occurrence_at( + hhmm: &str, + timezone: Option<&str>, + now: DateTime, +) -> Option> { + let (hh, mm) = hhmm.split_once(':')?; + let hours: u32 = hh.parse().ok()?; + let minutes: u32 = mm.parse().ok()?; + if hours > 23 || minutes > 59 { + return None; + } + let target_time = NaiveTime::from_hms_opt(hours, minutes, 0)?; + + match timezone.and_then(|s| s.parse::().ok()) { + Some(tz) => { + let now_tz = now.with_timezone(&tz); + let today = now_tz.date_naive(); + let candidate = today.and_time(target_time); + let candidate_tz = tz.from_local_datetime(&candidate).single()?; + if candidate_tz > now_tz { + Some(candidate_tz.to_utc()) + } else { + let tomorrow = today + Duration::days(1); + let tomorrow_candidate = tomorrow.and_time(target_time); + let tomorrow_tz = tz.from_local_datetime(&tomorrow_candidate).single()?; + Some(tomorrow_tz.to_utc()) + } + } + None => { + // Fall back to host/container local timezone. + let now_local = now.with_timezone(&Local); + let today = now_local.date_naive(); + let candidate = today.and_time(target_time); + let candidate_local = Local.from_local_datetime(&candidate).single()?; + if candidate_local > now_local { + Some(candidate_local.to_utc()) + } else { + let tomorrow = today + Duration::days(1); + let tomorrow_candidate = tomorrow.and_time(target_time); + let tomorrow_local = Local.from_local_datetime(&tomorrow_candidate).single()?; + Some(tomorrow_local.to_utc()) + } + } + } +} + +// ── Tests ────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + use chrono::TimeZone; + + /// Fixed reference instant: 2024-01-15 10:00:00 UTC (winter, GMT = UTC+0). + fn now_utc() -> DateTime { + Utc.with_ymd_and_hms(2024, 1, 15, 10, 0, 0).unwrap() + } + + // ── parse validation ────────────────────────────────────────────────── + + #[test] + fn invalid_hhmm_missing_colon_returns_none() { + assert!(next_occurrence_at("1430", Some("UTC"), now_utc()).is_none()); + } + + #[test] + fn invalid_hhmm_bad_hours_returns_none() { + assert!(next_occurrence_at("25:00", Some("UTC"), now_utc()).is_none()); + } + + #[test] + fn invalid_hhmm_bad_minutes_returns_none() { + assert!(next_occurrence_at("12:60", Some("UTC"), now_utc()).is_none()); + } + + #[test] + fn midnight_is_valid() { + assert!(next_occurrence_at("00:00", Some("UTC"), now_utc()).is_some()); + } + + #[test] + fn last_minute_of_day_is_valid() { + assert!(next_occurrence_at("23:59", Some("UTC"), now_utc()).is_some()); + } + + // ── schedule math in UTC ────────────────────────────────────────────── + + #[test] + fn later_today_when_not_yet_passed_in_utc() { + // now = 10:00 UTC, target = 14:30 UTC → fires same day + let now = Utc.with_ymd_and_hms(2024, 1, 15, 10, 0, 0).unwrap(); + let result = next_occurrence_at("14:30", Some("UTC"), now).unwrap(); + let expected = Utc.with_ymd_and_hms(2024, 1, 15, 14, 30, 0).unwrap(); + assert_eq!(result, expected); + } + + #[test] + fn tomorrow_when_time_already_passed_in_utc() { + // now = 15:00 UTC, target = 14:30 UTC → fires tomorrow + let now = Utc.with_ymd_and_hms(2024, 1, 15, 15, 0, 0).unwrap(); + let result = next_occurrence_at("14:30", Some("UTC"), now).unwrap(); + let expected = Utc.with_ymd_and_hms(2024, 1, 16, 14, 30, 0).unwrap(); + assert_eq!(result, expected); + } + + #[test] + fn exact_minute_match_fires_tomorrow_utc() { + // now == target: not strictly in the future → fires tomorrow + let now = Utc.with_ymd_and_hms(2024, 1, 15, 14, 30, 0).unwrap(); + let result = next_occurrence_at("14:30", Some("UTC"), now).unwrap(); + let expected = Utc.with_ymd_and_hms(2024, 1, 16, 14, 30, 0).unwrap(); + assert_eq!(result, expected); + } + + #[test] + fn result_is_always_after_now() { + let now = now_utc(); + let result = next_occurrence_at("14:30", Some("UTC"), now).unwrap(); + assert!(result > now, "result {result} must be after now {now}"); + } + + // ── timezone round-trips ────────────────────────────────────────────── + + #[test] + fn europe_london_gmt_no_offset_winter() { + // Winter: GMT = UTC+0 → 14:30 London = 14:30 UTC + let now = Utc.with_ymd_and_hms(2024, 1, 15, 10, 0, 0).unwrap(); + let result = next_occurrence_at("14:30", Some("Europe/London"), now).unwrap(); + let expected = Utc.with_ymd_and_hms(2024, 1, 15, 14, 30, 0).unwrap(); + assert_eq!(result, expected); + } + + #[test] + fn europe_london_bst_plus_one_summer() { + // Summer: BST = UTC+1 → 14:30 London = 13:30 UTC + // now = 2024-07-15 10:00 UTC = 11:00 BST (before 14:30 BST) + let now = Utc.with_ymd_and_hms(2024, 7, 15, 10, 0, 0).unwrap(); + let result = next_occurrence_at("14:30", Some("Europe/London"), now).unwrap(); + let expected = Utc.with_ymd_and_hms(2024, 7, 15, 13, 30, 0).unwrap(); + assert_eq!(result, expected); + } + + #[test] + fn america_new_york_est_fires_tomorrow() { + // EST = UTC-5; now = 2024-01-15 20:00 UTC = 15:00 EST + // target = 14:30 EST already passed today → fires tomorrow 14:30 EST = 19:30 UTC + let now = Utc.with_ymd_and_hms(2024, 1, 15, 20, 0, 0).unwrap(); + let result = next_occurrence_at("14:30", Some("America/New_York"), now).unwrap(); + let expected = Utc.with_ymd_and_hms(2024, 1, 16, 19, 30, 0).unwrap(); + assert_eq!(result, expected); + } + + #[test] + fn america_new_york_est_fires_today() { + // EST = UTC-5; now = 2024-01-15 10:00 UTC = 05:00 EST + // target = 14:30 EST not yet reached → fires today: 14:30 EST = 19:30 UTC + let now = Utc.with_ymd_and_hms(2024, 1, 15, 10, 0, 0).unwrap(); + let result = next_occurrence_at("14:30", Some("America/New_York"), now).unwrap(); + let expected = Utc.with_ymd_and_hms(2024, 1, 15, 19, 30, 0).unwrap(); + assert_eq!(result, expected); + } + + #[test] + fn invalid_timezone_falls_back_to_local_and_returns_some() { + // Unrecognised timezone falls back to chrono::Local — still returns Some. + let result = next_occurrence_at("14:30", Some("Invalid/Zone"), now_utc()); + assert!( + result.is_some(), + "invalid timezone should fall back to local and return Some" + ); + } +}