From cf5424f9a6c131aac5830945c478723a8140d712 Mon Sep 17 00:00:00 2001 From: dave Date: Sat, 28 Mar 2026 08:59:36 +0000 Subject: [PATCH] storkit: merge 421_story_timer_command_for_deferred_agent_start --- server/src/chat/commands/mod.rs | 6 + server/src/chat/commands/timer.rs | 54 ++ server/src/chat/mod.rs | 1 + server/src/chat/timer.rs | 736 ++++++++++++++++++ .../src/chat/transport/matrix/bot/context.rs | 6 + .../src/chat/transport/matrix/bot/messages.rs | 23 + server/src/chat/transport/matrix/bot/run.rs | 10 + 7 files changed, 836 insertions(+) create mode 100644 server/src/chat/commands/timer.rs create mode 100644 server/src/chat/timer.rs diff --git a/server/src/chat/commands/mod.rs b/server/src/chat/commands/mod.rs index 3a788f14..213f4896 100644 --- a/server/src/chat/commands/mod.rs +++ b/server/src/chat/commands/mod.rs @@ -15,6 +15,7 @@ mod move_story; mod overview; mod show; mod status; +mod timer; mod triage; mod unreleased; @@ -160,6 +161,11 @@ pub fn commands() -> &'static [BotCommand] { description: "Rebuild the server binary and restart", handler: handle_rebuild_fallback, }, + BotCommand { + name: "timer", + description: "Schedule a deferred agent start: `timer `, `timer list`, `timer cancel `", + handler: timer::handle_timer, + }, BotCommand { name: "unreleased", description: "Show stories merged to master since the last release tag", diff --git a/server/src/chat/commands/timer.rs b/server/src/chat/commands/timer.rs new file mode 100644 index 00000000..01dbba51 --- /dev/null +++ b/server/src/chat/commands/timer.rs @@ -0,0 +1,54 @@ +//! Handler stub for the `timer` command. +//! +//! The real implementation lives in `crate::chat::timer` (async). This +//! stub exists only so that `timer` appears in the help registry — the +//! handler always returns `None` so the bot's message loop falls through to +//! the async handler. + +use super::CommandContext; + +pub(super) fn handle_timer(_ctx: &CommandContext) -> Option { + // Handled asynchronously in each transport's message dispatcher. + None +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + #[test] + fn timer_command_is_registered() { + use super::super::commands; + let found = commands().iter().any(|c| c.name == "timer"); + assert!(found, "timer command must be in the registry"); + } + + #[test] + fn timer_command_appears_in_help() { + let result = super::super::tests::try_cmd_addressed( + "Timmy", + "@timmy:homeserver.local", + "@timmy help", + ); + let output = result.unwrap(); + assert!( + output.contains("timer"), + "help should list timer command: {output}" + ); + } + + #[test] + fn timer_command_falls_through_to_none_in_registry() { + let result = super::super::tests::try_cmd_addressed( + "Timmy", + "@timmy:homeserver.local", + "@timmy timer list", + ); + assert!( + result.is_none(), + "timer should not produce a sync response (handled async): {result:?}" + ); + } +} diff --git a/server/src/chat/mod.rs b/server/src/chat/mod.rs index c96afcee..0f561921 100644 --- a/server/src/chat/mod.rs +++ b/server/src/chat/mod.rs @@ -5,6 +5,7 @@ //! notifications) to work against any chat platform — Matrix, WhatsApp, etc. pub mod commands; +pub mod timer; pub mod transport; pub mod util; diff --git a/server/src/chat/timer.rs b/server/src/chat/timer.rs new file mode 100644 index 00000000..47a2f157 --- /dev/null +++ b/server/src/chat/timer.rs @@ -0,0 +1,736 @@ +//! Deferred agent start via one-shot timers. +//! +//! Provides [`TimerStore`] for persisting timers to `.storkit/timers.json`, +//! a 30-second tick loop ([`spawn_timer_tick_loop`]) that fires due timers, +//! and command parsing / handling for the `timer` bot command. + +use chrono::{DateTime, Duration, Local, NaiveTime, TimeZone, Utc}; +use serde::{Deserialize, Serialize}; +use std::path::{Path, PathBuf}; +use std::sync::{Arc, Mutex}; + +// ── Data types ───────────────────────────────────────────────────────────── + +/// A single scheduled timer entry. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct TimerEntry { + /// The full story ID (filename stem, e.g. `421_story_foo`). + pub story_id: String, + /// UTC instant at which the timer should fire. + pub scheduled_at: DateTime, +} + +// ── TimerStore ───────────────────────────────────────────────────────────── + +/// Persistent store for pending timers, backed by a JSON file. +pub struct TimerStore { + path: PathBuf, + timers: Mutex>, +} + +impl TimerStore { + /// Load the timer store from `path`. Returns an empty store if the file + /// does not exist or cannot be parsed. + pub fn load(path: PathBuf) -> Self { + let timers = if path.exists() { + std::fs::read_to_string(&path) + .ok() + .and_then(|s| serde_json::from_str::>(&s).ok()) + .unwrap_or_default() + } else { + Vec::new() + }; + Self { + path, + timers: Mutex::new(timers), + } + } + + fn save_locked(path: &Path, timers: &[TimerEntry]) -> Result<(), String> { + if let Some(parent) = path.parent() { + std::fs::create_dir_all(parent) + .map_err(|e| format!("Failed to create directory: {e}"))?; + } + let content = + serde_json::to_string_pretty(timers).map_err(|e| format!("Serialize failed: {e}"))?; + std::fs::write(path, content).map_err(|e| format!("Failed to write timers: {e}")) + } + + /// Add a timer and persist to disk. + pub fn add(&self, story_id: String, scheduled_at: DateTime) -> Result<(), String> { + let mut timers = self.timers.lock().unwrap(); + timers.push(TimerEntry { + story_id, + scheduled_at, + }); + Self::save_locked(&self.path, &timers) + } + + /// Remove the timer for `story_id`. Returns `true` if one was removed. + pub fn remove(&self, story_id: &str) -> bool { + let mut timers = self.timers.lock().unwrap(); + let before = timers.len(); + timers.retain(|t| t.story_id != story_id); + let removed = timers.len() < before; + if removed { + let _ = Self::save_locked(&self.path, &timers); + } + removed + } + + /// Return all pending timers (cloned). + pub fn list(&self) -> Vec { + self.timers.lock().unwrap().clone() + } + + /// Remove and return all timers whose `scheduled_at` is ≤ `now`. + /// Persists the updated list to disk if any timers were removed. + pub fn take_due(&self, now: DateTime) -> Vec { + let mut timers = self.timers.lock().unwrap(); + let mut due = Vec::new(); + let mut remaining = Vec::new(); + for t in timers.drain(..) { + if t.scheduled_at <= now { + due.push(t); + } else { + remaining.push(t); + } + } + *timers = remaining; + if !due.is_empty() { + let _ = Self::save_locked(&self.path, &timers); + } + due + } +} + +// ── Tick loop ────────────────────────────────────────────────────────────── + +/// Spawn a background tokio task that fires due timers every 30 seconds. +/// +/// Same pattern as the watchdog in `agents::pool::auto_assign`. +/// When a timer fires, `start_agent` is called for the story. If all coders +/// are busy the story remains in `2_current/` and auto-assign will pick it up. +pub fn spawn_timer_tick_loop( + store: Arc, + agents: Arc, + project_root: PathBuf, +) { + tokio::spawn(async move { + let mut interval = tokio::time::interval(std::time::Duration::from_secs(30)); + loop { + interval.tick().await; + let now = Utc::now(); + let due = store.take_due(now); + for entry in due { + crate::slog!( + "[timer] Timer fired for story {}; calling start_agent", + entry.story_id + ); + match agents + .start_agent(&project_root, &entry.story_id, None, None) + .await + { + Ok(info) => { + crate::slog!( + "[timer] Started agent {} for story {}", + info.agent_name, + entry.story_id + ); + } + Err(e) => { + crate::slog!( + "[timer] Failed to start agent for story {}: {e}", + entry.story_id + ); + } + } + } + } + }); +} + +// ── Command types ────────────────────────────────────────────────────────── + +/// A parsed `timer` command. +#[derive(Debug, PartialEq)] +pub enum TimerCommand { + /// `timer ` — schedule a deferred start. + Schedule { + story_number_or_id: String, + hhmm: String, + }, + /// `timer list` — list all pending timers. + List, + /// `timer cancel ` — remove a pending timer. + Cancel { story_number_or_id: String }, + /// Malformed arguments. + BadArgs, +} + +// ── Command extraction ───────────────────────────────────────────────────── + +/// Parse a `timer` command from a raw message body. +/// +/// Strips the bot mention prefix and matches the `timer` keyword. +/// Returns `None` when the message is not a timer command at all. +pub fn extract_timer_command( + message: &str, + bot_name: &str, + bot_user_id: &str, +) -> Option { + let stripped = strip_mention(message, bot_name, bot_user_id); + let trimmed = stripped + .trim() + .trim_start_matches(|c: char| !c.is_alphanumeric()); + + let (cmd, args) = match trimmed.split_once(char::is_whitespace) { + Some((c, a)) => (c, a.trim()), + None => (trimmed, ""), + }; + + if !cmd.eq_ignore_ascii_case("timer") { + return None; + } + + // `timer` with no args or `timer list` + if args.is_empty() || args.eq_ignore_ascii_case("list") { + return Some(TimerCommand::List); + } + + let (sub, rest) = match args.split_once(char::is_whitespace) { + Some((s, r)) => (s, r.trim()), + None => (args, ""), + }; + + // `timer cancel ` + if sub.eq_ignore_ascii_case("cancel") { + if rest.is_empty() { + return Some(TimerCommand::BadArgs); + } + return Some(TimerCommand::Cancel { + story_number_or_id: rest.to_string(), + }); + } + + // `timer ` + if rest.is_empty() { + return Some(TimerCommand::BadArgs); + } + + Some(TimerCommand::Schedule { + story_number_or_id: sub.to_string(), + hhmm: rest.to_string(), + }) +} + +// ── Command handler ──────────────────────────────────────────────────────── + +/// Handle a parsed `timer` command. Returns a markdown-formatted response. +pub async fn handle_timer_command( + cmd: TimerCommand, + store: &TimerStore, + project_root: &Path, +) -> String { + match cmd { + TimerCommand::Schedule { + story_number_or_id, + hhmm, + } => { + let story_id = match resolve_story_id(&story_number_or_id, project_root) { + Some(id) => id, + None => { + return format!( + "No story with number or ID **{story_number_or_id}** found." + ); + } + }; + + // The story must already be in 2_current/ — the timer does not move stories. + let current_dir = project_root.join(".storkit").join("work").join("2_current"); + let story_file = current_dir.join(format!("{story_id}.md")); + if !story_file.exists() { + return format!( + "Story **{story_id}** is not in `work/2_current/`. \ + Move it to current before scheduling a timer." + ); + } + + let scheduled_at = match next_occurrence_of_hhmm(&hhmm) { + Some(t) => t, + None => { + return format!( + "Invalid time **{hhmm}**. Use `HH:MM` format (e.g. `14:30`)." + ); + } + }; + + match store.add(story_id.clone(), scheduled_at) { + Ok(()) => { + let local_time = scheduled_at.with_timezone(&Local); + format!( + "Timer set for **{story_id}** at **{}** (server local time).", + local_time.format("%Y-%m-%d %H:%M") + ) + } + Err(e) => format!("Failed to save timer: {e}"), + } + } + TimerCommand::List => { + let timers = store.list(); + if timers.is_empty() { + return "No pending timers.".to_string(); + } + let mut lines = vec!["**Pending timers:**".to_string()]; + for t in &timers { + let local_time = t.scheduled_at.with_timezone(&Local); + lines.push(format!( + "- **{}** → {}", + t.story_id, + local_time.format("%Y-%m-%d %H:%M") + )); + } + lines.join("\n") + } + TimerCommand::Cancel { story_number_or_id } => { + let story_id = resolve_story_id(&story_number_or_id, project_root) + .unwrap_or(story_number_or_id.clone()); + if store.remove(&story_id) { + format!("Timer for **{story_id}** cancelled.") + } else { + format!("No timer found for **{story_id}**.") + } + } + TimerCommand::BadArgs => { + "Usage:\n\ + - `timer ` — schedule deferred start\n\ + - `timer list` — show pending timers\n\ + - `timer cancel ` — remove a timer" + .to_string() + } + } +} + +// ── Helpers ──────────────────────────────────────────────────────────────── + +/// Parse `HH:MM` and return the next UTC instant at which the server-local +/// clock will read that time. If the time has already passed today, returns +/// tomorrow's occurrence. +pub fn next_occurrence_of_hhmm(hhmm: &str) -> Option> { + let (hh, mm) = hhmm.split_once(':')?; + let hours: u32 = hh.parse().ok()?; + let minutes: u32 = mm.parse().ok()?; + if hours > 23 || minutes > 59 { + return None; + } + let target_time = NaiveTime::from_hms_opt(hours, minutes, 0)?; + let now_local = Local::now(); + let today = now_local.date_naive(); + let candidate = today.and_time(target_time); + let candidate_local = Local.from_local_datetime(&candidate).single()?; + if candidate_local > now_local { + Some(candidate_local.to_utc()) + } else { + let tomorrow = today + Duration::days(1); + let tomorrow_candidate = tomorrow.and_time(target_time); + let tomorrow_local = Local.from_local_datetime(&tomorrow_candidate).single()?; + Some(tomorrow_local.to_utc()) + } +} + +/// Resolve a story ID from a numeric story number or a full ID string. +/// +/// Searches all pipeline stages. Returns `None` only when the input is +/// numeric but no matching file is found. +fn resolve_story_id(number_or_id: &str, project_root: &Path) -> Option { + const STAGES: &[&str] = &[ + "1_backlog", + "2_current", + "3_qa", + "4_merge", + "5_done", + "6_archived", + ]; + + // Full ID (contains underscores) — return as-is; validation happens at file-check time. + if number_or_id.contains('_') { + return Some(number_or_id.to_string()); + } + + // Numeric lookup. + if !number_or_id.chars().all(|c| c.is_ascii_digit()) { + return None; + } + + for stage in STAGES { + let dir = project_root.join(".storkit").join("work").join(stage); + if !dir.exists() { + continue; + } + if let Ok(entries) = std::fs::read_dir(&dir) { + for entry in entries.flatten() { + let path = entry.path(); + if path.extension().and_then(|e| e.to_str()) != Some("md") { + continue; + } + if let Some(stem) = path.file_stem().and_then(|s| s.to_str()) { + let file_num = stem + .split('_') + .next() + .filter(|s| !s.is_empty() && s.chars().all(|c| c.is_ascii_digit())) + .unwrap_or(""); + if file_num == number_or_id { + return Some(stem.to_string()); + } + } + } + } + } + None +} + +fn strip_mention<'a>(message: &'a str, bot_name: &str, bot_user_id: &str) -> &'a str { + let trimmed = message.trim(); + if let Some(rest) = strip_prefix_ci(trimmed, bot_user_id) { + return rest; + } + if let Some(localpart) = bot_user_id.split(':').next() + && let Some(rest) = strip_prefix_ci(trimmed, localpart) + { + return rest; + } + if let Some(rest) = strip_prefix_ci(trimmed, bot_name) { + return rest; + } + trimmed +} + +fn strip_prefix_ci<'a>(text: &'a str, prefix: &str) -> Option<&'a str> { + if text.len() < prefix.len() { + return None; + } + if !text[..prefix.len()].eq_ignore_ascii_case(prefix) { + return None; + } + let rest = &text[prefix.len()..]; + match rest.chars().next() { + None => Some(rest), + Some(c) if c.is_alphanumeric() || c == '-' || c == '_' => None, + _ => Some(rest), + } +} + +// ── Tests ────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::TempDir; + + // ── next_occurrence_of_hhmm ───────────────────────────────────────── + + #[test] + fn valid_hhmm_returns_some() { + let result = next_occurrence_of_hhmm("14:30"); + assert!(result.is_some(), "valid HH:MM should return Some"); + } + + #[test] + fn invalid_hhmm_missing_colon_returns_none() { + assert!(next_occurrence_of_hhmm("1430").is_none()); + } + + #[test] + fn invalid_hhmm_bad_hours_returns_none() { + assert!(next_occurrence_of_hhmm("25:00").is_none()); + } + + #[test] + fn invalid_hhmm_bad_minutes_returns_none() { + assert!(next_occurrence_of_hhmm("12:60").is_none()); + } + + #[test] + fn next_occurrence_is_in_the_future() { + let result = next_occurrence_of_hhmm("14:30").unwrap(); + assert!(result > Utc::now(), "next occurrence must be in the future"); + } + + // ── extract_timer_command ─────────────────────────────────────────── + + #[test] + fn non_timer_command_returns_none() { + assert!(extract_timer_command("Timmy help", "Timmy", "@bot:home").is_none()); + } + + #[test] + fn timer_list_no_args() { + assert_eq!( + extract_timer_command("Timmy timer", "Timmy", "@bot:home"), + Some(TimerCommand::List) + ); + } + + #[test] + fn timer_list_explicit() { + assert_eq!( + extract_timer_command("Timmy timer list", "Timmy", "@bot:home"), + Some(TimerCommand::List) + ); + } + + #[test] + fn timer_cancel_story_id() { + assert_eq!( + extract_timer_command( + "Timmy timer cancel 421_story_foo", + "Timmy", + "@bot:home" + ), + Some(TimerCommand::Cancel { + story_number_or_id: "421_story_foo".to_string() + }) + ); + } + + #[test] + fn timer_cancel_no_arg_is_bad_args() { + assert_eq!( + extract_timer_command("Timmy timer cancel", "Timmy", "@bot:home"), + Some(TimerCommand::BadArgs) + ); + } + + #[test] + fn timer_schedule_with_story_id() { + assert_eq!( + extract_timer_command( + "Timmy timer 421_story_foo 14:30", + "Timmy", + "@bot:home" + ), + Some(TimerCommand::Schedule { + story_number_or_id: "421_story_foo".to_string(), + hhmm: "14:30".to_string(), + }) + ); + } + + #[test] + fn timer_schedule_with_number() { + assert_eq!( + extract_timer_command("Timmy timer 421 14:30", "Timmy", "@bot:home"), + Some(TimerCommand::Schedule { + story_number_or_id: "421".to_string(), + hhmm: "14:30".to_string(), + }) + ); + } + + #[test] + fn timer_schedule_missing_time_is_bad_args() { + assert_eq!( + extract_timer_command( + "Timmy timer 421_story_foo", + "Timmy", + "@bot:home" + ), + Some(TimerCommand::BadArgs) + ); + } + + // ── TimerStore ────────────────────────────────────────────────────── + + #[test] + fn timer_store_empty_on_missing_file() { + let dir = TempDir::new().unwrap(); + let store = TimerStore::load(dir.path().join("timers.json")); + assert!(store.list().is_empty()); + } + + #[test] + fn timer_store_add_and_list() { + let dir = TempDir::new().unwrap(); + let store = TimerStore::load(dir.path().join("timers.json")); + let t = Utc::now() + Duration::hours(1); + store.add("story_1".to_string(), t).unwrap(); + let list = store.list(); + assert_eq!(list.len(), 1); + assert_eq!(list[0].story_id, "story_1"); + } + + #[test] + fn timer_store_remove() { + let dir = TempDir::new().unwrap(); + let store = TimerStore::load(dir.path().join("timers.json")); + let t = Utc::now() + Duration::hours(1); + store.add("story_1".to_string(), t).unwrap(); + assert!(store.remove("story_1")); + assert!(!store.remove("story_1")); // already gone + assert!(store.list().is_empty()); + } + + #[test] + fn timer_store_persists_and_reloads() { + let dir = TempDir::new().unwrap(); + let path = dir.path().join("timers.json"); + let t = Utc::now() + Duration::hours(2); + { + let store = TimerStore::load(path.clone()); + store.add("421_story_foo".to_string(), t).unwrap(); + } + // Reload from disk. + let store2 = TimerStore::load(path); + let list = store2.list(); + assert_eq!(list.len(), 1); + assert_eq!(list[0].story_id, "421_story_foo"); + } + + #[test] + fn take_due_returns_only_past_entries() { + let dir = TempDir::new().unwrap(); + let store = TimerStore::load(dir.path().join("timers.json")); + let past = Utc::now() - Duration::minutes(1); + let future = Utc::now() + Duration::hours(1); + store.add("past_story".to_string(), past).unwrap(); + store.add("future_story".to_string(), future).unwrap(); + + let due = store.take_due(Utc::now()); + assert_eq!(due.len(), 1); + assert_eq!(due[0].story_id, "past_story"); + assert_eq!(store.list().len(), 1); + assert_eq!(store.list()[0].story_id, "future_story"); + } + + #[test] + fn multiple_timers_same_time_all_returned() { + let dir = TempDir::new().unwrap(); + let store = TimerStore::load(dir.path().join("timers.json")); + let past = Utc::now() - Duration::minutes(1); + store.add("story_a".to_string(), past).unwrap(); + store.add("story_b".to_string(), past).unwrap(); + + let due = store.take_due(Utc::now()); + assert_eq!(due.len(), 2, "both timers at same time must fire"); + } + + // ── handle_timer_command ──────────────────────────────────────────── + + #[tokio::test] + async fn handle_list_empty() { + let dir = TempDir::new().unwrap(); + let store = TimerStore::load(dir.path().join("timers.json")); + let result = handle_timer_command(TimerCommand::List, &store, dir.path()).await; + assert!(result.contains("No pending timers"), "unexpected: {result}"); + } + + #[tokio::test] + async fn handle_cancel_not_found() { + let dir = TempDir::new().unwrap(); + let store = TimerStore::load(dir.path().join("timers.json")); + let result = handle_timer_command( + TimerCommand::Cancel { + story_number_or_id: "421_story_foo".to_string(), + }, + &store, + dir.path(), + ) + .await; + assert!( + result.contains("No timer found"), + "unexpected: {result}" + ); + } + + #[tokio::test] + async fn handle_schedule_story_not_in_current() { + let dir = TempDir::new().unwrap(); + // Set up directory structure with no story in 2_current + std::fs::create_dir_all(dir.path().join(".storkit/work/2_current")).unwrap(); + let store = TimerStore::load(dir.path().join("timers.json")); + let result = handle_timer_command( + TimerCommand::Schedule { + story_number_or_id: "421_story_foo".to_string(), + hhmm: "14:30".to_string(), + }, + &store, + dir.path(), + ) + .await; + assert!( + result.contains("not in `work/2_current/`"), + "unexpected: {result}" + ); + } + + #[tokio::test] + async fn handle_schedule_success() { + let dir = TempDir::new().unwrap(); + let current_dir = dir.path().join(".storkit/work/2_current"); + std::fs::create_dir_all(¤t_dir).unwrap(); + std::fs::write(current_dir.join("421_story_foo.md"), "---\nname: Foo\n---").unwrap(); + let store = TimerStore::load(dir.path().join("timers.json")); + let result = handle_timer_command( + TimerCommand::Schedule { + story_number_or_id: "421_story_foo".to_string(), + hhmm: "23:59".to_string(), + }, + &store, + dir.path(), + ) + .await; + assert!( + result.contains("Timer set for"), + "unexpected: {result}" + ); + assert_eq!(store.list().len(), 1); + } + + #[tokio::test] + async fn handle_schedule_invalid_time() { + let dir = TempDir::new().unwrap(); + let current_dir = dir.path().join(".storkit/work/2_current"); + std::fs::create_dir_all(¤t_dir).unwrap(); + std::fs::write(current_dir.join("421_story_foo.md"), "---\nname: Foo\n---").unwrap(); + let store = TimerStore::load(dir.path().join("timers.json")); + let result = handle_timer_command( + TimerCommand::Schedule { + story_number_or_id: "421_story_foo".to_string(), + hhmm: "99:00".to_string(), + }, + &store, + dir.path(), + ) + .await; + assert!(result.contains("Invalid time"), "unexpected: {result}"); + } + + #[tokio::test] + async fn handle_cancel_existing_timer() { + let dir = TempDir::new().unwrap(); + let store = TimerStore::load(dir.path().join("timers.json")); + let future = Utc::now() + Duration::hours(1); + store.add("421_story_foo".to_string(), future).unwrap(); + let result = handle_timer_command( + TimerCommand::Cancel { + story_number_or_id: "421_story_foo".to_string(), + }, + &store, + dir.path(), + ) + .await; + assert!(result.contains("cancelled"), "unexpected: {result}"); + assert!(store.list().is_empty()); + } + + #[tokio::test] + async fn handle_list_with_entries() { + let dir = TempDir::new().unwrap(); + let store = TimerStore::load(dir.path().join("timers.json")); + let future = Utc::now() + Duration::hours(1); + store.add("421_story_foo".to_string(), future).unwrap(); + let result = handle_timer_command(TimerCommand::List, &store, dir.path()).await; + assert!(result.contains("421_story_foo"), "unexpected: {result}"); + assert!(result.contains("Pending timers"), "unexpected: {result}"); + } +} diff --git a/server/src/chat/transport/matrix/bot/context.rs b/server/src/chat/transport/matrix/bot/context.rs index c2a8d004..6800da50 100644 --- a/server/src/chat/transport/matrix/bot/context.rs +++ b/server/src/chat/transport/matrix/bot/context.rs @@ -1,4 +1,5 @@ use crate::agents::AgentPool; +use crate::chat::timer::TimerStore; use crate::chat::ChatTransport; use crate::http::context::{PermissionDecision, PermissionForward}; use matrix_sdk::ruma::{OwnedEventId, OwnedRoomId, OwnedUserId}; @@ -55,6 +56,8 @@ pub struct BotContext { /// All message I/O goes through this abstraction so the bot logic works /// with any platform, not just Matrix. pub transport: Arc, + /// Persistent store for pending deferred-start timers. + pub timer_store: Arc, } // --------------------------------------------------------------------------- @@ -103,6 +106,9 @@ mod tests { "test-token".to_string(), "pipeline_notification".to_string(), )), + timer_store: Arc::new(crate::chat::timer::TimerStore::load( + std::path::PathBuf::from("/tmp/timers.json"), + )), }; // Clone must work (required by Matrix SDK event handler injection). let _cloned = ctx.clone(); diff --git a/server/src/chat/transport/matrix/bot/messages.rs b/server/src/chat/transport/matrix/bot/messages.rs index c1ba7559..8652241e 100644 --- a/server/src/chat/transport/matrix/bot/messages.rs +++ b/server/src/chat/transport/matrix/bot/messages.rs @@ -433,6 +433,29 @@ pub(super) async fn on_room_message( return; } + // Check for the timer command, which requires async file I/O and cannot + // be handled by the sync command registry. + if let Some(timer_cmd) = crate::chat::timer::extract_timer_command( + &user_message, + &ctx.bot_name, + ctx.bot_user_id.as_str(), + ) { + slog!("[matrix-bot] Handling timer command from {sender}: {timer_cmd:?}"); + let response = crate::chat::timer::handle_timer_command( + timer_cmd, + &ctx.timer_store, + &ctx.project_root, + ) + .await; + let html = markdown_to_html(&response); + if let Ok(msg_id) = ctx.transport.send_message(&room_id_str, &response, &html).await + && let Ok(event_id) = msg_id.parse() + { + ctx.bot_sent_event_ids.lock().await.insert(event_id); + } + return; + } + // Spawn a separate task so the Matrix sync loop is not blocked while we // wait for the LLM response (which can take several seconds). tokio::spawn(async move { diff --git a/server/src/chat/transport/matrix/bot/run.rs b/server/src/chat/transport/matrix/bot/run.rs index 06105191..a76c5891 100644 --- a/server/src/chat/transport/matrix/bot/run.rs +++ b/server/src/chat/transport/matrix/bot/run.rs @@ -215,6 +215,15 @@ pub async fn run_bot( .unwrap_or_else(|| "Assistant".to_string()); let announce_bot_name = bot_name.clone(); + let timer_store = Arc::new(crate::chat::timer::TimerStore::load( + project_root.join(".storkit").join("timers.json"), + )); + crate::chat::timer::spawn_timer_tick_loop( + Arc::clone(&timer_store), + Arc::clone(&agents), + project_root.clone(), + ); + let ctx = BotContext { bot_user_id, target_room_ids, @@ -231,6 +240,7 @@ pub async fn run_bot( agents, htop_sessions: Arc::new(TokioMutex::new(HashMap::new())), transport: Arc::clone(&transport), + timer_store, }; slog!("[matrix-bot] Cryptographic identity verification is always ON — commands from unencrypted rooms or unverified devices are rejected");