From 311883f45d4c8dfdce04300414cdb8cf049f10bf Mon Sep 17 00:00:00 2001 From: dave Date: Thu, 14 May 2026 16:26:49 +0000 Subject: [PATCH] huskies: merge 1039 --- .huskies/source-map.json | 21 + server/src/agent_mode/context.rs | 4 + server/src/http/context.rs | 9 +- server/src/http/mcp/dispatch.rs | 6 +- server/src/http/mcp/mod.rs | 2 + server/src/http/mcp/timer_tools.rs | 324 ++++++++++++ server/src/http/mcp/tools_list/mod.rs | 5 +- .../src/http/mcp/tools_list/system_tools.rs | 60 +++ server/src/main.rs | 15 +- server/src/service/timer/mod.rs | 3 + server/src/service/timer/scheduled.rs | 464 ++++++++++++++++++ server/src/startup/tick_loop.rs | 97 +++- 12 files changed, 1005 insertions(+), 5 deletions(-) create mode 100644 server/src/http/mcp/timer_tools.rs create mode 100644 server/src/service/timer/scheduled.rs diff --git a/.huskies/source-map.json b/.huskies/source-map.json index e5a18df7..cfb8eab3 100644 --- a/.huskies/source-map.json +++ b/.huskies/source-map.json @@ -1670,6 +1670,7 @@ "mod shell_tools", "mod status_tools", "mod story_tools", + "mod timer_tools", "mod tools_list", "mod wizard_tools", "fn mcp_get_handler", @@ -1750,6 +1751,11 @@ "fn tool_update_story", "fn tool_unblock_story" ], + "server/src/http/mcp/timer_tools.rs": [ + "fn tool_schedule_timer", + "fn tool_list_timers", + "fn tool_cancel_timer" + ], "server/src/http/mcp/tools_list/agent_tools.rs": [ "fn agent_tools" ], @@ -2758,6 +2764,7 @@ "mod parse", "mod persist", "mod schedule", + "mod scheduled", "enum Error", "fn schedule_timer", "fn cancel_timer", @@ -2776,6 +2783,20 @@ "server/src/service/timer/schedule.rs": [ "fn next_occurrence_at" ], + "server/src/service/timer/scheduled.rs": [ + "enum TimerAction", + "enum TimerMode", + "struct ScheduledTimer", + "fn new_id", + "struct ScheduledTimerStore", + "fn load", + "fn add", + "fn remove_by_id", + "fn list", + "fn take_due", + "fn parse_when_str", + "fn parse_interval_str" + ], "server/src/service/wizard/io.rs": [ "fn load", "fn save", diff --git a/server/src/agent_mode/context.rs b/server/src/agent_mode/context.rs index a048dd0a..af955beb 100644 --- a/server/src/agent_mode/context.rs +++ b/server/src/agent_mode/context.rs @@ -64,6 +64,9 @@ pub(super) fn build_agent_app_context( let timer_store = Arc::new(crate::service::timer::TimerStore::load( project_root.join(".huskies").join("timers.json"), )); + let scheduled_timer_store = Arc::new(crate::service::timer::ScheduledTimerStore::load( + project_root.join(".huskies").join("scheduled_timers.json"), + )); let agents = Arc::new(AgentPool::new(port, watcher_tx.clone())); let services = Arc::new(crate::services::Services { project_root: project_root.to_path_buf(), @@ -90,5 +93,6 @@ pub(super) fn build_agent_app_context( bot_shutdown: None, matrix_shutdown_tx: None, timer_store, + scheduled_timer_store, } } diff --git a/server/src/http/context.rs b/server/src/http/context.rs index be4ad0b8..a5de829c 100644 --- a/server/src/http/context.rs +++ b/server/src/http/context.rs @@ -2,7 +2,7 @@ use crate::agents::ReconciliationEvent; use crate::io::watcher::WatcherEvent; use crate::rebuild::{BotShutdownNotifier, ShutdownReason}; -use crate::service::timer::TimerStore; +use crate::service::timer::{ScheduledTimerStore, TimerStore}; use crate::services::Services; use crate::state::SessionState; use crate::store::JsonFileStore; @@ -74,6 +74,9 @@ pub struct AppContext { /// spawned by the bot so that cancellations take effect in-memory rather /// than only on disk. pub timer_store: Arc, + /// Generic scheduled-timer store for `schedule_timer` / `list_timers` / + /// `cancel_timer` MCP tools. Persists to `.huskies/scheduled_timers.json`. + pub scheduled_timer_store: Arc, } #[cfg(test)] @@ -90,6 +93,9 @@ impl AppContext { let timer_store = Arc::new(TimerStore::load( project_root.join(".huskies").join("timers.json"), )); + let scheduled_timer_store = Arc::new(ScheduledTimerStore::load( + project_root.join(".huskies").join("scheduled_timers.json"), + )); let agents = Arc::new(AgentPool::new(3001, watcher_tx.clone())); let services = Arc::new(Services { project_root: project_root.clone(), @@ -116,6 +122,7 @@ impl AppContext { bot_shutdown: None, matrix_shutdown_tx: None, timer_store, + scheduled_timer_store, } } } diff --git a/server/src/http/mcp/dispatch.rs b/server/src/http/mcp/dispatch.rs index a725b1b1..d924c763 100644 --- a/server/src/http/mcp/dispatch.rs +++ b/server/src/http/mcp/dispatch.rs @@ -4,7 +4,7 @@ use serde_json::Value; use super::{ agent_tools, diagnostics, git_tools, merge_tools, qa_tools, shell_tools, status_tools, - story_tools, wizard_tools, + story_tools, timer_tools, wizard_tools, }; use crate::http::context::AppContext; @@ -128,6 +128,10 @@ pub async fn dispatch_tool_call( "wizard_confirm" => wizard_tools::tool_wizard_confirm(ctx), "wizard_skip" => wizard_tools::tool_wizard_skip(ctx), "wizard_retry" => wizard_tools::tool_wizard_retry(ctx), + // Scheduled timer tools + "schedule_timer" => timer_tools::tool_schedule_timer(&args, ctx), + "list_timers" => timer_tools::tool_list_timers(ctx), + "cancel_timer" => timer_tools::tool_cancel_timer(&args, ctx), _ => Err(format!("Unknown tool: {tool_name}")), } } diff --git a/server/src/http/mcp/mod.rs b/server/src/http/mcp/mod.rs index 39c4cc40..cd352dee 100644 --- a/server/src/http/mcp/mod.rs +++ b/server/src/http/mcp/mod.rs @@ -25,6 +25,8 @@ pub mod shell_tools; pub mod status_tools; /// MCP tools for creating, updating, and managing stories and bugs. pub mod story_tools; +/// MCP tools for generic scheduled timers (`schedule_timer`, `list_timers`, `cancel_timer`). +pub mod timer_tools; /// MCP tool schema definitions for `tools/list`. pub mod tools_list; /// MCP tools for the project setup wizard. diff --git a/server/src/http/mcp/timer_tools.rs b/server/src/http/mcp/timer_tools.rs new file mode 100644 index 00000000..000ea8e0 --- /dev/null +++ b/server/src/http/mcp/timer_tools.rs @@ -0,0 +1,324 @@ +//! MCP tools for generic scheduled timers: `schedule_timer`, `list_timers`, +//! `cancel_timer`. + +use chrono::Utc; +use serde_json::Value; + +use crate::http::context::AppContext; +use crate::service::timer::scheduled::{ + ScheduledTimer, TimerAction, TimerMode, parse_interval_str, parse_when_str, +}; + +// ── schedule_timer ──────────────────────────────────────────────────────────── + +/// Register a new scheduled timer. +pub fn tool_schedule_timer(args: &Value, ctx: &AppContext) -> Result { + let when = args + .get("when") + .and_then(|v| v.as_str()) + .ok_or("Missing required parameter: 'when'")?; + + let action_val = args + .get("action") + .ok_or("Missing required parameter: 'action'")?; + + let action_type = action_val + .get("type") + .and_then(|v| v.as_str()) + .ok_or("'action' must have a 'type' field: 'mcp' or 'prompt'")?; + + let action = match action_type { + "mcp" => { + let method = action_val + .get("method") + .and_then(|v| v.as_str()) + .ok_or("'mcp' action requires 'method'")? + .to_string(); + let args_val = action_val + .get("args") + .cloned() + .unwrap_or(serde_json::json!({})); + TimerAction::Mcp { + method, + args: args_val, + } + } + "prompt" => { + let text = action_val + .get("text") + .and_then(|v| v.as_str()) + .ok_or("'prompt' action requires 'text'")? + .to_string(); + TimerAction::Prompt { text } + } + other => { + return Err(format!( + "Unknown action type '{other}'. Use 'mcp' or 'prompt'." + )); + } + }; + + let mode_str = args.get("mode").and_then(|v| v.as_str()).unwrap_or("once"); + + let now = Utc::now(); + let (fire_at, inferred_interval) = parse_when_str(when, now)?; + + let mode = match mode_str { + "once" => TimerMode::Once, + "recurring" => { + // Prefer explicit `interval` arg; fall back to interval inferred from + // the relative `when` duration. + let interval_secs = if let Some(iv) = args + .get("interval") + .and_then(|v| v.as_str()) + .and_then(parse_interval_str) + { + iv + } else if let Some(iv) = inferred_interval { + iv + } else { + return Err( + "Recurring timers with an absolute 'when' require an 'interval' \ + parameter (e.g. '2h', '15 minutes')." + .to_string(), + ); + }; + TimerMode::Recurring { interval_secs } + } + other => { + return Err(format!( + "Unknown mode '{other}'. Use 'once' or 'recurring'." + )); + } + }; + + let label = args + .get("label") + .and_then(|v| v.as_str()) + .map(str::to_string); + + let timer = ScheduledTimer { + id: ScheduledTimer::new_id(), + label: label.clone(), + fire_at, + action, + mode, + created_at: now, + }; + + let id = timer.id.clone(); + ctx.scheduled_timer_store.add(timer)?; + + let label_str = label.map(|l| format!(": {l}")).unwrap_or_default(); + Ok(format!( + "Timer `{id}`{label_str} scheduled to fire at {fire_at} (UTC)." + )) +} + +// ── list_timers ─────────────────────────────────────────────────────────────── + +/// List all pending scheduled timers. +pub fn tool_list_timers(ctx: &AppContext) -> Result { + let timers = ctx.scheduled_timer_store.list(); + if timers.is_empty() { + return Ok("No pending scheduled timers.".to_string()); + } + let mut lines = vec![format!("**Pending scheduled timers ({}):**", timers.len())]; + for t in &timers { + let label = t + .label + .as_deref() + .map(|l| format!(" ({l})")) + .unwrap_or_default(); + let mode_str = match &t.mode { + TimerMode::Once => "once".to_string(), + TimerMode::Recurring { interval_secs } => { + format!("recurring every {interval_secs}s") + } + }; + let action_str = match &t.action { + TimerAction::Mcp { method, .. } => format!("mcp:{method}"), + TimerAction::Prompt { text } => { + let preview: String = text.chars().take(40).collect(); + format!("prompt:{preview}") + } + }; + lines.push(format!( + "- `{}` {}{} — fires at {} UTC [{}] action={}", + t.id, + mode_str, + label, + t.fire_at.format("%Y-%m-%dT%H:%M:%SZ"), + mode_str, + action_str + )); + } + Ok(lines.join("\n")) +} + +// ── cancel_timer ────────────────────────────────────────────────────────────── + +/// Cancel a scheduled timer by ID. +pub fn tool_cancel_timer(args: &Value, ctx: &AppContext) -> Result { + let id = args + .get("id") + .and_then(|v| v.as_str()) + .ok_or("Missing required parameter: 'id'")?; + + if ctx.scheduled_timer_store.remove_by_id(id) { + Ok(format!("Timer `{id}` cancelled.")) + } else { + Err(format!("No scheduled timer found with id '{id}'.")) + } +} + +// ── Tests ───────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + use crate::http::test_helpers::test_ctx; + + #[test] + fn schedule_timer_mcp_action_once() { + let dir = tempfile::tempdir().unwrap(); + let ctx = test_ctx(dir.path()); + let args = serde_json::json!({ + "when": "in 1 hour", + "action": { "type": "mcp", "method": "get_pipeline_status", "args": {} }, + "mode": "once", + "label": "hourly check" + }); + let result = tool_schedule_timer(&args, &ctx).unwrap(); + assert!(result.contains("tm-"), "expected timer id: {result}"); + assert!( + result.contains("scheduled to fire"), + "expected schedule msg: {result}" + ); + assert_eq!(ctx.scheduled_timer_store.list().len(), 1); + } + + #[test] + fn schedule_timer_prompt_action() { + let dir = tempfile::tempdir().unwrap(); + let ctx = test_ctx(dir.path()); + let args = serde_json::json!({ + "when": "30m", + "action": { "type": "prompt", "text": "standup time!" } + }); + let result = tool_schedule_timer(&args, &ctx).unwrap(); + assert!(result.contains("scheduled"), "unexpected: {result}"); + let list = ctx.scheduled_timer_store.list(); + assert_eq!(list.len(), 1); + assert!(matches!(&list[0].action, TimerAction::Prompt { .. })); + } + + #[test] + fn schedule_timer_recurring_relative() { + let dir = tempfile::tempdir().unwrap(); + let ctx = test_ctx(dir.path()); + let args = serde_json::json!({ + "when": "in 2 hours", + "action": { "type": "prompt", "text": "reminder" }, + "mode": "recurring" + }); + let result = tool_schedule_timer(&args, &ctx).unwrap(); + assert!(result.contains("scheduled"), "unexpected: {result}"); + let list = ctx.scheduled_timer_store.list(); + assert_eq!(list.len(), 1); + assert!( + matches!( + &list[0].mode, + TimerMode::Recurring { + interval_secs: 7200 + } + ), + "expected 7200s interval: {:?}", + list[0].mode + ); + } + + #[test] + fn schedule_timer_recurring_absolute_requires_interval() { + let dir = tempfile::tempdir().unwrap(); + let ctx = test_ctx(dir.path()); + let args = serde_json::json!({ + "when": "2026-12-01T09:00:00Z", + "action": { "type": "prompt", "text": "reminder" }, + "mode": "recurring" + // no interval + }); + assert!(tool_schedule_timer(&args, &ctx).is_err()); + } + + #[test] + fn schedule_timer_recurring_absolute_with_interval() { + let dir = tempfile::tempdir().unwrap(); + let ctx = test_ctx(dir.path()); + let args = serde_json::json!({ + "when": "2026-12-01T09:00:00Z", + "action": { "type": "prompt", "text": "daily" }, + "mode": "recurring", + "interval": "24h" + }); + let result = tool_schedule_timer(&args, &ctx).unwrap(); + assert!(result.contains("scheduled"), "unexpected: {result}"); + let list = ctx.scheduled_timer_store.list(); + assert!( + matches!( + &list[0].mode, + TimerMode::Recurring { + interval_secs: 86400 + } + ), + "expected 86400s (24h): {:?}", + list[0].mode + ); + } + + #[test] + fn list_timers_empty() { + let dir = tempfile::tempdir().unwrap(); + let ctx = test_ctx(dir.path()); + let result = tool_list_timers(&ctx).unwrap(); + assert!(result.contains("No pending"), "unexpected: {result}"); + } + + #[test] + fn list_timers_shows_entries() { + let dir = tempfile::tempdir().unwrap(); + let ctx = test_ctx(dir.path()); + let args = serde_json::json!({ + "when": "1h", + "action": { "type": "prompt", "text": "foo" }, + "label": "my-timer" + }); + tool_schedule_timer(&args, &ctx).unwrap(); + let result = tool_list_timers(&ctx).unwrap(); + assert!(result.contains("my-timer"), "unexpected: {result}"); + assert!(result.contains("tm-"), "expected timer id: {result}"); + } + + #[test] + fn cancel_timer_removes_it() { + let dir = tempfile::tempdir().unwrap(); + let ctx = test_ctx(dir.path()); + let args = serde_json::json!({ + "when": "1h", + "action": { "type": "prompt", "text": "foo" } + }); + tool_schedule_timer(&args, &ctx).unwrap(); + let id = ctx.scheduled_timer_store.list()[0].id.clone(); + + let result = tool_cancel_timer(&serde_json::json!({ "id": &id }), &ctx).unwrap(); + assert!(result.contains("cancelled"), "unexpected: {result}"); + assert!(ctx.scheduled_timer_store.list().is_empty()); + } + + #[test] + fn cancel_timer_not_found_is_err() { + let dir = tempfile::tempdir().unwrap(); + let ctx = test_ctx(dir.path()); + assert!(tool_cancel_timer(&serde_json::json!({ "id": "tm-notexist" }), &ctx).is_err()); + } +} diff --git a/server/src/http/mcp/tools_list/mod.rs b/server/src/http/mcp/tools_list/mod.rs index 54836947..38020c27 100644 --- a/server/src/http/mcp/tools_list/mod.rs +++ b/server/src/http/mcp/tools_list/mod.rs @@ -108,7 +108,10 @@ mod tests { assert!(names.contains(&"unfreeze_story")); assert!(names.contains(&"find_orphaned_items")); assert!(names.contains(&"recover_half_written_items")); - assert_eq!(tools.len(), 76); + assert!(names.contains(&"schedule_timer")); + assert!(names.contains(&"list_timers")); + assert!(names.contains(&"cancel_timer")); + assert_eq!(tools.len(), 79); } #[test] diff --git a/server/src/http/mcp/tools_list/system_tools.rs b/server/src/http/mcp/tools_list/system_tools.rs index 97f56264..b4d864d6 100644 --- a/server/src/http/mcp/tools_list/system_tools.rs +++ b/server/src/http/mcp/tools_list/system_tools.rs @@ -357,6 +357,66 @@ pub(super) fn system_tools() -> Vec { "properties": {} } }), + json!({ + "name": "schedule_timer", + "description": "Register a durable scheduled timer that fires an MCP call or reminder at a specified time. Survives server restart and rebuild. Use 'once' for a one-shot timer or 'recurring' to re-arm automatically after each fire.", + "inputSchema": { + "type": "object", + "properties": { + "when": { + "type": "string", + "description": "When to fire: relative duration ('in 2 hours', '15 minutes', '30s') or ISO 8601 absolute timestamp ('2026-05-15T10:00:00Z')" + }, + "action": { + "type": "object", + "description": "What to do when the timer fires. Either {\"type\":\"mcp\",\"method\":\"tool_name\",\"args\":{...}} for an MCP call or {\"type\":\"prompt\",\"text\":\"...\"} for a server-log reminder.", + "properties": { + "type": { "type": "string", "enum": ["mcp", "prompt"] }, + "method": { "type": "string", "description": "MCP tool name (required for type='mcp')" }, + "args": { "type": "object", "description": "MCP tool arguments (for type='mcp')" }, + "text": { "type": "string", "description": "Reminder text (required for type='prompt')" } + }, + "required": ["type"] + }, + "mode": { + "type": "string", + "enum": ["once", "recurring"], + "description": "Fire once (default) or keep recurring. For recurring with absolute 'when', also provide 'interval'." + }, + "interval": { + "type": "string", + "description": "Re-arm interval for recurring timers when 'when' is an absolute timestamp (e.g. '24h', '1 hour', '30 minutes'). Optional when 'when' is relative — interval is inferred." + }, + "label": { + "type": "string", + "description": "Optional human-readable label shown in list_timers output." + } + }, + "required": ["when", "action"] + } + }), + json!({ + "name": "list_timers", + "description": "List all pending scheduled timers registered via schedule_timer.", + "inputSchema": { + "type": "object", + "properties": {} + } + }), + json!({ + "name": "cancel_timer", + "description": "Cancel a scheduled timer by its ID. Use list_timers to find the ID.", + "inputSchema": { + "type": "object", + "properties": { + "id": { + "type": "string", + "description": "Timer ID to cancel (e.g. 'tm-a3f7b9c2')" + } + }, + "required": ["id"] + } + }), json!({ "name": "recover_half_written_items", "description": "Recover half-written (orphaned) pipeline items by lifting each onto a fresh non-tombstoned ID. For each orphan, allocates a new ID, copies the content, re-applies item_type and depends_on from front matter, verifies the new entry is live in the CRDT, then removes the orphaned row. Pass 'only' to restrict recovery to specific orphan IDs (safe for live systems); omit to recover all. Returns old_id → new_id mappings for every successful recovery.", diff --git a/server/src/main.rs b/server/src/main.rs index 567fe6d3..30963ee6 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -283,6 +283,16 @@ async fn main() -> Result<(), std::io::Error> { let timer_store_for_tick = Arc::clone(&timer_store); let timer_store_for_bot = Arc::clone(&timer_store); + // Generic scheduled-timer store for the `schedule_timer` MCP tool. + let scheduled_timer_store = + std::sync::Arc::new(crate::service::timer::ScheduledTimerStore::load( + startup_root + .as_ref() + .map(|r| r.join(".huskies").join("scheduled_timers.json")) + .unwrap_or_else(|| std::path::PathBuf::from("/tmp/huskies-scheduled-timers.json")), + )); + let scheduled_timer_store_for_tick = Arc::clone(&scheduled_timer_store); + let ctx = AppContext { state: app_state, store, @@ -295,6 +305,7 @@ async fn main() -> Result<(), std::io::Error> { bot_shutdown: bot_ctxs.shutdown_notifier.clone(), matrix_shutdown_tx: Some(Arc::clone(&bot_ctxs.matrix_shutdown_tx)), timer_store, + scheduled_timer_store, }; // Per-project event buffer for the gateway's `/api/events` poller. @@ -305,7 +316,7 @@ async fn main() -> Result<(), std::io::Error> { startup::tick_loop::spawn_gateway_relay(&startup_root, Arc::clone(&services.status)); let app = build_routes( - ctx, + ctx.clone(), bot_ctxs.whatsapp_ctx.clone(), bot_ctxs.slack_ctx.clone(), port, @@ -316,7 +327,9 @@ async fn main() -> Result<(), std::io::Error> { startup::tick_loop::spawn_tick_loop( Arc::clone(&startup_agents), timer_store_for_tick, + scheduled_timer_store_for_tick, startup_root.clone(), + ctx, ); // Optional Matrix bot. diff --git a/server/src/service/timer/mod.rs b/server/src/service/timer/mod.rs index f75c8983..6fbad4f8 100644 --- a/server/src/service/timer/mod.rs +++ b/server/src/service/timer/mod.rs @@ -15,10 +15,13 @@ pub(super) mod io; pub(super) mod parse; pub(super) mod persist; pub(super) mod schedule; +/// Generic scheduled-timer store: MCP-callable timers with `Mcp` / `Prompt` actions. +pub mod scheduled; pub use io::{TimerStore, next_occurrence_of_hhmm, spawn_rate_limit_auto_scheduler, tick_once}; pub use parse::{TimerCommand, extract_timer_command}; pub use persist::TimerEntry; +pub use scheduled::ScheduledTimerStore; use std::path::Path; diff --git a/server/src/service/timer/scheduled.rs b/server/src/service/timer/scheduled.rs new file mode 100644 index 00000000..399bee6e --- /dev/null +++ b/server/src/service/timer/scheduled.rs @@ -0,0 +1,464 @@ +//! Generic scheduled timers: fire MCP calls or prompts at a configured instant, +//! with optional recurring re-arm. +//! +//! Separate from [`crate::service::timer::TimerStore`] which handles story-scoped +//! rate-limit retry timers. This module provides a general-purpose scheduling +//! primitive with explicit action types and unique IDs. + +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use std::path::{Path, PathBuf}; +use std::sync::Mutex; +use uuid::Uuid; + +// ── Action ──────────────────────────────────────────────────────────────────── + +/// What to execute when a scheduled timer fires. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum TimerAction { + /// Call an MCP tool by name with JSON arguments. + Mcp { + /// MCP tool name (e.g. `"start_agent"`, `"create_bug"`). + method: String, + /// JSON arguments object for the tool call. + #[serde(default)] + args: serde_json::Value, + }, + /// Broadcast a reminder text to the server log. + Prompt { + /// Free-form reminder text logged when the timer fires. + text: String, + }, +} + +// ── Mode ───────────────────────────────────────────────────────────────────── + +/// Whether the timer fires once or repeatedly. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +#[serde(tag = "mode", rename_all = "snake_case")] +pub enum TimerMode { + /// Fire once, then discard. + Once, + /// Fire, then re-arm at `fire_at + interval_secs`. + Recurring { + /// Seconds between firings. + interval_secs: u64, + }, +} + +// ── Entry ───────────────────────────────────────────────────────────────────── + +/// A single generic scheduled timer entry. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct ScheduledTimer { + /// Unique stable identifier (UUID v4 short form, e.g. `"tm-a3f7b9c2"`). + pub id: String, + /// Optional human-readable label. + pub label: Option, + /// UTC instant when this timer should next fire. + pub fire_at: DateTime, + /// What to execute on fire. + pub action: TimerAction, + /// One-shot or recurring. + pub mode: TimerMode, + /// UTC instant when this timer was created. + pub created_at: DateTime, +} + +impl ScheduledTimer { + /// Generate a fresh timer ID. + pub fn new_id() -> String { + let id = Uuid::new_v4(); + let hex = id.as_simple().to_string(); + let prefix: String = hex.chars().take(8).collect(); + format!("tm-{prefix}") + } +} + +// ── Store ───────────────────────────────────────────────────────────────────── + +/// Persistent store for generic scheduled timers, backed by a JSON file. +pub struct ScheduledTimerStore { + path: PathBuf, + timers: Mutex>, +} + +impl ScheduledTimerStore { + /// Load (or create empty) store from `path`. + pub fn load(path: PathBuf) -> Self { + let timers = if path.exists() { + std::fs::read_to_string(&path) + .ok() + .and_then(|s| serde_json::from_str::>(&s).ok()) + .unwrap_or_default() + } else { + Vec::new() + }; + Self { + path, + timers: Mutex::new(timers), + } + } + + fn save(path: &Path, timers: &[ScheduledTimer]) -> Result<(), String> { + if let Some(parent) = path.parent() { + std::fs::create_dir_all(parent).map_err(|e| format!("mkdir failed: {e}"))?; + } + let content = + serde_json::to_string_pretty(timers).map_err(|e| format!("serialize failed: {e}"))?; + std::fs::write(path, content).map_err(|e| format!("write failed: {e}")) + } + + /// Add a timer. Errors if a timer with the same ID already exists. + pub fn add(&self, timer: ScheduledTimer) -> Result<(), String> { + let mut timers = self.timers.lock().unwrap(); + if timers.iter().any(|t| t.id == timer.id) { + return Err(format!("Timer with id '{}' already exists", timer.id)); + } + timers.push(timer); + Self::save(&self.path, &timers) + } + + /// Remove a timer by ID. Returns `true` if one was removed. + pub fn remove_by_id(&self, id: &str) -> bool { + let mut timers = self.timers.lock().unwrap(); + let before = timers.len(); + timers.retain(|t| t.id != id); + let removed = timers.len() < before; + if removed { + let _ = Self::save(&self.path, &timers); + } + removed + } + + /// Return all pending timers (cloned). + pub fn list(&self) -> Vec { + self.timers.lock().unwrap().clone() + } + + /// Remove and return all timers whose `fire_at` ≤ `now`. + pub fn take_due(&self, now: DateTime) -> Vec { + let mut timers = self.timers.lock().unwrap(); + let mut due = Vec::new(); + let mut remaining = Vec::new(); + for t in timers.drain(..) { + if t.fire_at <= now { + due.push(t); + } else { + remaining.push(t); + } + } + *timers = remaining; + if !due.is_empty() { + let _ = Self::save(&self.path, &timers); + } + due + } +} + +// ── When parsing ────────────────────────────────────────────────────────────── + +/// Parse a `when` string into `(fire_at, optional_interval_secs)`. +/// +/// Accepted forms: +/// - Relative: `"in 2 hours"`, `"in 15 minutes"`, `"in 30 seconds"`, +/// `"2h"`, `"15m"`, `"30s"` (the `"in "` prefix is optional) +/// - Absolute: ISO 8601 / RFC 3339 timestamp (`"2026-05-15T10:00:00Z"`) +/// +/// Returns the interval in seconds for relative durations so callers can +/// set up recurring re-arm intervals. +pub fn parse_when_str( + when: &str, + now: DateTime, +) -> Result<(DateTime, Option), String> { + let trimmed = when.trim(); + + // Strip optional "in " prefix, then attempt interval parse. + let candidate = trimmed + .strip_prefix("in ") + .or_else(|| trimmed.strip_prefix("In ")) + .unwrap_or(trimmed); + + if let Some(secs) = parse_interval_str(candidate) { + let fire_at = now + chrono::Duration::seconds(secs as i64); + return Ok((fire_at, Some(secs))); + } + + // Try ISO 8601 / RFC 3339 absolute timestamp. + if let Ok(dt) = trimmed.parse::>() { + return Ok((dt, None)); + } + if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(trimmed) { + return Ok((dt.with_timezone(&Utc), None)); + } + + Err(format!( + "Cannot parse 'when': '{when}'. \ + Use 'in 2 hours', '15 minutes', '2h', or an ISO 8601 timestamp." + )) +} + +/// Parse an interval string like `"2h"`, `"15m"`, `"30s"`, `"2 hours"`, +/// `"15 minutes"`, `"30 seconds"` into a number of seconds. +pub fn parse_interval_str(s: &str) -> Option { + let s = s.trim().to_lowercase(); + let (num_str, unit_str) = split_num_unit(&s); + let n: u64 = num_str.parse().ok()?; + if n == 0 { + return None; + } + match unit_str.trim() { + "h" | "hr" | "hrs" | "hour" | "hours" => Some(n * 3600), + "m" | "min" | "mins" | "minute" | "minutes" => Some(n * 60), + "s" | "sec" | "secs" | "second" | "seconds" => Some(n), + _ => None, + } +} + +/// Split a string like `"2hours"` or `"15 minutes"` into `("2", "hours")`. +fn split_num_unit(s: &str) -> (&str, &str) { + let idx = s.find(|c: char| !c.is_ascii_digit()).unwrap_or(s.len()); + let (num, unit) = s.split_at(idx); + (num, unit.trim_start_matches(' ')) +} + +// ── Tests ───────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + use chrono::TimeZone; + use tempfile::TempDir; + + fn fixed_now() -> DateTime { + Utc.with_ymd_and_hms(2026, 5, 15, 10, 0, 0).unwrap() + } + + // ── parse_interval_str ──────────────────────────────────────────────────── + + #[test] + fn parse_interval_hours_long() { + assert_eq!(parse_interval_str("2 hours"), Some(7200)); + } + + #[test] + fn parse_interval_hours_short() { + assert_eq!(parse_interval_str("2h"), Some(7200)); + } + + #[test] + fn parse_interval_minutes_long() { + assert_eq!(parse_interval_str("15 minutes"), Some(900)); + } + + #[test] + fn parse_interval_minutes_short() { + assert_eq!(parse_interval_str("15m"), Some(900)); + } + + #[test] + fn parse_interval_seconds_long() { + assert_eq!(parse_interval_str("30 seconds"), Some(30)); + } + + #[test] + fn parse_interval_seconds_short() { + assert_eq!(parse_interval_str("30s"), Some(30)); + } + + #[test] + fn parse_interval_zero_returns_none() { + assert_eq!(parse_interval_str("0 hours"), None); + } + + #[test] + fn parse_interval_unknown_unit_returns_none() { + assert_eq!(parse_interval_str("5 fortnights"), None); + } + + // ── parse_when_str ──────────────────────────────────────────────────────── + + #[test] + fn parse_when_relative_with_in_prefix() { + let now = fixed_now(); + let (fire_at, interval) = parse_when_str("in 2 hours", now).unwrap(); + assert_eq!(interval, Some(7200)); + assert_eq!(fire_at, now + chrono::Duration::seconds(7200)); + } + + #[test] + fn parse_when_relative_without_in_prefix() { + let now = fixed_now(); + let (fire_at, interval) = parse_when_str("15 minutes", now).unwrap(); + assert_eq!(interval, Some(900)); + assert_eq!(fire_at, now + chrono::Duration::seconds(900)); + } + + #[test] + fn parse_when_short_form() { + let now = fixed_now(); + let (fire_at, interval) = parse_when_str("2h", now).unwrap(); + assert_eq!(interval, Some(7200)); + assert_eq!(fire_at, now + chrono::Duration::seconds(7200)); + } + + #[test] + fn parse_when_iso8601() { + let now = fixed_now(); + let (fire_at, interval) = parse_when_str("2026-05-15T12:00:00Z", now).unwrap(); + assert_eq!(interval, None); + let expected = Utc.with_ymd_and_hms(2026, 5, 15, 12, 0, 0).unwrap(); + assert_eq!(fire_at, expected); + } + + #[test] + fn parse_when_invalid_returns_err() { + let now = fixed_now(); + assert!(parse_when_str("next Tuesday", now).is_err()); + } + + // ── ScheduledTimerStore ─────────────────────────────────────────────────── + + fn make_timer(id: &str, fire_at: DateTime) -> ScheduledTimer { + ScheduledTimer { + id: id.to_string(), + label: None, + fire_at, + action: TimerAction::Prompt { + text: "test".to_string(), + }, + mode: TimerMode::Once, + created_at: Utc::now(), + } + } + + #[test] + fn store_empty_on_missing_file() { + let dir = TempDir::new().unwrap(); + let store = ScheduledTimerStore::load(dir.path().join("timers.json")); + assert!(store.list().is_empty()); + } + + #[test] + fn store_add_and_list() { + let dir = TempDir::new().unwrap(); + let store = ScheduledTimerStore::load(dir.path().join("timers.json")); + let t = Utc::now() + chrono::Duration::hours(1); + store.add(make_timer("tm-aabbccdd", t)).unwrap(); + let list = store.list(); + assert_eq!(list.len(), 1); + assert_eq!(list[0].id, "tm-aabbccdd"); + } + + #[test] + fn store_add_duplicate_id_fails() { + let dir = TempDir::new().unwrap(); + let store = ScheduledTimerStore::load(dir.path().join("timers.json")); + let t = Utc::now() + chrono::Duration::hours(1); + store.add(make_timer("tm-aabbccdd", t)).unwrap(); + assert!(store.add(make_timer("tm-aabbccdd", t)).is_err()); + } + + #[test] + fn store_remove_by_id() { + let dir = TempDir::new().unwrap(); + let store = ScheduledTimerStore::load(dir.path().join("timers.json")); + let t = Utc::now() + chrono::Duration::hours(1); + store.add(make_timer("tm-aabbccdd", t)).unwrap(); + assert!(store.remove_by_id("tm-aabbccdd")); + assert!(!store.remove_by_id("tm-aabbccdd")); + assert!(store.list().is_empty()); + } + + #[test] + fn store_persists_and_reloads() { + let dir = TempDir::new().unwrap(); + let path = dir.path().join("timers.json"); + let t = Utc::now() + chrono::Duration::hours(2); + { + let store = ScheduledTimerStore::load(path.clone()); + store.add(make_timer("tm-aabbccdd", t)).unwrap(); + } + let store2 = ScheduledTimerStore::load(path); + assert_eq!(store2.list().len(), 1); + assert_eq!(store2.list()[0].id, "tm-aabbccdd"); + } + + #[test] + fn take_due_returns_only_past_entries() { + let dir = TempDir::new().unwrap(); + let store = ScheduledTimerStore::load(dir.path().join("timers.json")); + let past = Utc::now() - chrono::Duration::minutes(1); + let future = Utc::now() + chrono::Duration::hours(1); + store.add(make_timer("tm-past", past)).unwrap(); + store.add(make_timer("tm-future", future)).unwrap(); + + let due = store.take_due(Utc::now()); + assert_eq!(due.len(), 1); + assert_eq!(due[0].id, "tm-past"); + assert_eq!(store.list().len(), 1); + assert_eq!(store.list()[0].id, "tm-future"); + } + + #[test] + fn take_due_with_already_past_fires_immediately() { + let dir = TempDir::new().unwrap(); + let store = ScheduledTimerStore::load(dir.path().join("timers.json")); + // Simulate server restart: timer scheduled in past (catch-up semantics) + let way_past = Utc::now() - chrono::Duration::hours(3); + store.add(make_timer("tm-catchup", way_past)).unwrap(); + let due = store.take_due(Utc::now()); + assert_eq!(due.len(), 1, "past timer must fire on next tick"); + } + + #[test] + fn new_id_has_tm_prefix() { + let id = ScheduledTimer::new_id(); + assert!(id.starts_with("tm-"), "expected 'tm-' prefix: {id}"); + assert_eq!(id.len(), 11, "expected 'tm-' + 8 hex chars: {id}"); + } + + // ── TimerAction serde ───────────────────────────────────────────────────── + + #[test] + fn timer_action_mcp_round_trips() { + let action = TimerAction::Mcp { + method: "start_agent".to_string(), + args: serde_json::json!({ "story_id": "42_foo" }), + }; + let s = serde_json::to_string(&action).unwrap(); + let back: TimerAction = serde_json::from_str(&s).unwrap(); + assert_eq!(action, back); + } + + #[test] + fn timer_action_prompt_round_trips() { + let action = TimerAction::Prompt { + text: "daily standup reminder".to_string(), + }; + let s = serde_json::to_string(&action).unwrap(); + let back: TimerAction = serde_json::from_str(&s).unwrap(); + assert_eq!(action, back); + } + + // ── TimerMode serde ─────────────────────────────────────────────────────── + + #[test] + fn timer_mode_once_round_trips() { + let mode = TimerMode::Once; + let s = serde_json::to_string(&mode).unwrap(); + let back: TimerMode = serde_json::from_str(&s).unwrap(); + assert_eq!(mode, back); + } + + #[test] + fn timer_mode_recurring_round_trips() { + let mode = TimerMode::Recurring { + interval_secs: 3600, + }; + let s = serde_json::to_string(&mode).unwrap(); + let back: TimerMode = serde_json::from_str(&s).unwrap(); + assert_eq!(mode, back); + } +} diff --git a/server/src/startup/tick_loop.rs b/server/src/startup/tick_loop.rs index 25a7a116..8a24d3b6 100644 --- a/server/src/startup/tick_loop.rs +++ b/server/src/startup/tick_loop.rs @@ -4,9 +4,14 @@ use crate::agents::{AgentPool, ReconciliationEvent}; use crate::config; use crate::gateway_relay; +use crate::http::context::AppContext; +use crate::http::mcp::dispatch::dispatch_tool_call; use crate::io; use crate::service; use crate::service::status::StatusBroadcaster; +use crate::service::timer::scheduled::{ + ScheduledTimer, ScheduledTimerStore, TimerAction, TimerMode, +}; use std::path::PathBuf; use std::sync::Arc; use tokio::sync::broadcast; @@ -126,6 +131,8 @@ pub(crate) fn spawn_event_bridges( /// current wall-clock time against each timer's `due_at` field. Cannot be /// reactive because timers encode absolute timestamps that only become /// actionable when the clock reaches them. +/// - **Scheduled timer tick** (every second): fires generic scheduled timers +/// registered via the `schedule_timer` MCP tool. /// - **Agent watchdog** (every 30 seconds): detects orphaned `Running` agents /// by comparing elapsed time since the last heartbeat. Cannot be reactive /// because the absence of an event (no heartbeat) is what signals a problem; @@ -136,10 +143,16 @@ pub(crate) fn spawn_event_bridges( pub(crate) fn spawn_tick_loop( agents: Arc, timer_store: Arc, + scheduled_timer_store: Arc, root: Option, + ctx: AppContext, ) { let pending_count = timer_store.list().len(); - crate::slog!("[tick] Unified tick loop started; {pending_count} pending timer(s)"); + let scheduled_count = scheduled_timer_store.list().len(); + crate::slog!( + "[tick] Unified tick loop started; {pending_count} rate-limit timer(s), \ + {scheduled_count} scheduled timer(s)" + ); tokio::spawn(async move { let mut interval = tokio::time::interval(std::time::Duration::from_secs(1)); @@ -157,6 +170,9 @@ pub(crate) fn spawn_tick_loop( } } + // Generic scheduled timer tick. + tick_scheduled_timers(&scheduled_timer_store, &ctx).await; + // Time-based: the watchdog detects silence (no heartbeat within a // timeout window). A `TransitionFired` subscriber cannot observe the // absence of events, so this must remain on a periodic tick. @@ -176,6 +192,85 @@ pub(crate) fn spawn_tick_loop( }); } +/// Fire any due generic scheduled timers and re-arm recurring ones. +/// +/// Called every second from the unified tick loop. Catch-up semantics: timers +/// whose `fire_at` is already in the past fire on the next tick after boot. +async fn tick_scheduled_timers(store: &Arc, ctx: &AppContext) { + let due = store.take_due(chrono::Utc::now()); + if due.is_empty() { + return; + } + + crate::slog!("[scheduled-timer] {} timer(s) due", due.len()); + + for timer in due { + fire_scheduled_timer(&timer, ctx).await; + + // Re-arm recurring timers. + if let TimerMode::Recurring { interval_secs } = &timer.mode { + let next_fire = timer.fire_at + chrono::Duration::seconds(*interval_secs as i64); + let rearmed = ScheduledTimer { + id: timer.id.clone(), + label: timer.label.clone(), + fire_at: next_fire, + action: timer.action.clone(), + mode: timer.mode.clone(), + created_at: timer.created_at, + }; + if let Err(e) = store.add(rearmed) { + crate::slog_error!("[scheduled-timer] Failed to re-arm timer {}: {e}", timer.id); + } else { + crate::slog!( + "[scheduled-timer] Recurring timer {} re-armed for {}", + timer.id, + next_fire + ); + } + } + } +} + +/// Execute a single scheduled timer's action. +async fn fire_scheduled_timer(timer: &ScheduledTimer, ctx: &AppContext) { + let label = timer.label.as_deref().unwrap_or(&timer.id); + match &timer.action { + TimerAction::Mcp { method, args } => { + crate::slog!( + "[scheduled-timer] Firing MCP action '{}' for timer {}", + method, + timer.id + ); + match dispatch_tool_call(method, args.clone(), ctx).await { + Ok(result) => { + crate::slog!( + "[scheduled-timer] Timer {} ('{}') fired OK: {}", + timer.id, + label, + result.chars().take(200).collect::() + ); + } + Err(e) => { + crate::slog_error!( + "[scheduled-timer] Timer {} ('{}') MCP call '{}' failed: {e}", + timer.id, + label, + method + ); + } + } + } + TimerAction::Prompt { text } => { + crate::slog!( + "[scheduled-timer] Prompt timer {} ('{}') fired: {}", + timer.id, + label, + text + ); + } + } +} + /// Spawn the gateway relay task if `gateway_url` is configured in /// `project.toml` or the `HUSKIES_GATEWAY_URL` environment variable. pub(crate) fn spawn_gateway_relay(startup_root: &Option, status: Arc) {