storkit: merge 421_story_timer_command_for_deferred_agent_start

This commit is contained in:
dave
2026-03-28 08:59:36 +00:00
parent 1ec9aaab8a
commit cf5424f9a6
7 changed files with 836 additions and 0 deletions
+6
View File
@@ -15,6 +15,7 @@ mod move_story;
mod overview; mod overview;
mod show; mod show;
mod status; mod status;
mod timer;
mod triage; mod triage;
mod unreleased; mod unreleased;
@@ -160,6 +161,11 @@ pub fn commands() -> &'static [BotCommand] {
description: "Rebuild the server binary and restart", description: "Rebuild the server binary and restart",
handler: handle_rebuild_fallback, handler: handle_rebuild_fallback,
}, },
BotCommand {
name: "timer",
description: "Schedule a deferred agent start: `timer <story_id> <HH:MM>`, `timer list`, `timer cancel <story_id>`",
handler: timer::handle_timer,
},
BotCommand { BotCommand {
name: "unreleased", name: "unreleased",
description: "Show stories merged to master since the last release tag", description: "Show stories merged to master since the last release tag",
+54
View File
@@ -0,0 +1,54 @@
//! Handler stub for the `timer` command.
//!
//! The real implementation lives in `crate::chat::timer` (async). This
//! stub exists only so that `timer` appears in the help registry — the
//! handler always returns `None` so the bot's message loop falls through to
//! the async handler.
use super::CommandContext;
pub(super) fn handle_timer(_ctx: &CommandContext) -> Option<String> {
// Handled asynchronously in each transport's message dispatcher.
None
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
#[cfg(test)]
mod tests {
#[test]
fn timer_command_is_registered() {
use super::super::commands;
let found = commands().iter().any(|c| c.name == "timer");
assert!(found, "timer command must be in the registry");
}
#[test]
fn timer_command_appears_in_help() {
let result = super::super::tests::try_cmd_addressed(
"Timmy",
"@timmy:homeserver.local",
"@timmy help",
);
let output = result.unwrap();
assert!(
output.contains("timer"),
"help should list timer command: {output}"
);
}
#[test]
fn timer_command_falls_through_to_none_in_registry() {
let result = super::super::tests::try_cmd_addressed(
"Timmy",
"@timmy:homeserver.local",
"@timmy timer list",
);
assert!(
result.is_none(),
"timer should not produce a sync response (handled async): {result:?}"
);
}
}
+1
View File
@@ -5,6 +5,7 @@
//! notifications) to work against any chat platform — Matrix, WhatsApp, etc. //! notifications) to work against any chat platform — Matrix, WhatsApp, etc.
pub mod commands; pub mod commands;
pub mod timer;
pub mod transport; pub mod transport;
pub mod util; pub mod util;
+736
View File
@@ -0,0 +1,736 @@
//! Deferred agent start via one-shot timers.
//!
//! Provides [`TimerStore`] for persisting timers to `.storkit/timers.json`,
//! a 30-second tick loop ([`spawn_timer_tick_loop`]) that fires due timers,
//! and command parsing / handling for the `timer` bot command.
use chrono::{DateTime, Duration, Local, NaiveTime, TimeZone, Utc};
use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
// ── Data types ─────────────────────────────────────────────────────────────
/// A single scheduled timer entry.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct TimerEntry {
/// The full story ID (filename stem, e.g. `421_story_foo`).
pub story_id: String,
/// UTC instant at which the timer should fire.
pub scheduled_at: DateTime<Utc>,
}
// ── TimerStore ─────────────────────────────────────────────────────────────
/// Persistent store for pending timers, backed by a JSON file.
pub struct TimerStore {
path: PathBuf,
timers: Mutex<Vec<TimerEntry>>,
}
impl TimerStore {
/// Load the timer store from `path`. Returns an empty store if the file
/// does not exist or cannot be parsed.
pub fn load(path: PathBuf) -> Self {
let timers = if path.exists() {
std::fs::read_to_string(&path)
.ok()
.and_then(|s| serde_json::from_str::<Vec<TimerEntry>>(&s).ok())
.unwrap_or_default()
} else {
Vec::new()
};
Self {
path,
timers: Mutex::new(timers),
}
}
fn save_locked(path: &Path, timers: &[TimerEntry]) -> Result<(), String> {
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent)
.map_err(|e| format!("Failed to create directory: {e}"))?;
}
let content =
serde_json::to_string_pretty(timers).map_err(|e| format!("Serialize failed: {e}"))?;
std::fs::write(path, content).map_err(|e| format!("Failed to write timers: {e}"))
}
/// Add a timer and persist to disk.
pub fn add(&self, story_id: String, scheduled_at: DateTime<Utc>) -> Result<(), String> {
let mut timers = self.timers.lock().unwrap();
timers.push(TimerEntry {
story_id,
scheduled_at,
});
Self::save_locked(&self.path, &timers)
}
/// Remove the timer for `story_id`. Returns `true` if one was removed.
pub fn remove(&self, story_id: &str) -> bool {
let mut timers = self.timers.lock().unwrap();
let before = timers.len();
timers.retain(|t| t.story_id != story_id);
let removed = timers.len() < before;
if removed {
let _ = Self::save_locked(&self.path, &timers);
}
removed
}
/// Return all pending timers (cloned).
pub fn list(&self) -> Vec<TimerEntry> {
self.timers.lock().unwrap().clone()
}
/// Remove and return all timers whose `scheduled_at` is ≤ `now`.
/// Persists the updated list to disk if any timers were removed.
pub fn take_due(&self, now: DateTime<Utc>) -> Vec<TimerEntry> {
let mut timers = self.timers.lock().unwrap();
let mut due = Vec::new();
let mut remaining = Vec::new();
for t in timers.drain(..) {
if t.scheduled_at <= now {
due.push(t);
} else {
remaining.push(t);
}
}
*timers = remaining;
if !due.is_empty() {
let _ = Self::save_locked(&self.path, &timers);
}
due
}
}
// ── Tick loop ──────────────────────────────────────────────────────────────
/// Spawn a background tokio task that fires due timers every 30 seconds.
///
/// Same pattern as the watchdog in `agents::pool::auto_assign`.
/// When a timer fires, `start_agent` is called for the story. If all coders
/// are busy the story remains in `2_current/` and auto-assign will pick it up.
pub fn spawn_timer_tick_loop(
store: Arc<TimerStore>,
agents: Arc<crate::agents::AgentPool>,
project_root: PathBuf,
) {
tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
loop {
interval.tick().await;
let now = Utc::now();
let due = store.take_due(now);
for entry in due {
crate::slog!(
"[timer] Timer fired for story {}; calling start_agent",
entry.story_id
);
match agents
.start_agent(&project_root, &entry.story_id, None, None)
.await
{
Ok(info) => {
crate::slog!(
"[timer] Started agent {} for story {}",
info.agent_name,
entry.story_id
);
}
Err(e) => {
crate::slog!(
"[timer] Failed to start agent for story {}: {e}",
entry.story_id
);
}
}
}
}
});
}
// ── Command types ──────────────────────────────────────────────────────────
/// A parsed `timer` command.
#[derive(Debug, PartialEq)]
pub enum TimerCommand {
/// `timer <story_id_or_number> <HH:MM>` — schedule a deferred start.
Schedule {
story_number_or_id: String,
hhmm: String,
},
/// `timer list` — list all pending timers.
List,
/// `timer cancel <story_id_or_number>` — remove a pending timer.
Cancel { story_number_or_id: String },
/// Malformed arguments.
BadArgs,
}
// ── Command extraction ─────────────────────────────────────────────────────
/// Parse a `timer` command from a raw message body.
///
/// Strips the bot mention prefix and matches the `timer` keyword.
/// Returns `None` when the message is not a timer command at all.
pub fn extract_timer_command(
message: &str,
bot_name: &str,
bot_user_id: &str,
) -> Option<TimerCommand> {
let stripped = strip_mention(message, bot_name, bot_user_id);
let trimmed = stripped
.trim()
.trim_start_matches(|c: char| !c.is_alphanumeric());
let (cmd, args) = match trimmed.split_once(char::is_whitespace) {
Some((c, a)) => (c, a.trim()),
None => (trimmed, ""),
};
if !cmd.eq_ignore_ascii_case("timer") {
return None;
}
// `timer` with no args or `timer list`
if args.is_empty() || args.eq_ignore_ascii_case("list") {
return Some(TimerCommand::List);
}
let (sub, rest) = match args.split_once(char::is_whitespace) {
Some((s, r)) => (s, r.trim()),
None => (args, ""),
};
// `timer cancel <id>`
if sub.eq_ignore_ascii_case("cancel") {
if rest.is_empty() {
return Some(TimerCommand::BadArgs);
}
return Some(TimerCommand::Cancel {
story_number_or_id: rest.to_string(),
});
}
// `timer <id> <HH:MM>`
if rest.is_empty() {
return Some(TimerCommand::BadArgs);
}
Some(TimerCommand::Schedule {
story_number_or_id: sub.to_string(),
hhmm: rest.to_string(),
})
}
// ── Command handler ────────────────────────────────────────────────────────
/// Handle a parsed `timer` command. Returns a markdown-formatted response.
pub async fn handle_timer_command(
cmd: TimerCommand,
store: &TimerStore,
project_root: &Path,
) -> String {
match cmd {
TimerCommand::Schedule {
story_number_or_id,
hhmm,
} => {
let story_id = match resolve_story_id(&story_number_or_id, project_root) {
Some(id) => id,
None => {
return format!(
"No story with number or ID **{story_number_or_id}** found."
);
}
};
// The story must already be in 2_current/ — the timer does not move stories.
let current_dir = project_root.join(".storkit").join("work").join("2_current");
let story_file = current_dir.join(format!("{story_id}.md"));
if !story_file.exists() {
return format!(
"Story **{story_id}** is not in `work/2_current/`. \
Move it to current before scheduling a timer."
);
}
let scheduled_at = match next_occurrence_of_hhmm(&hhmm) {
Some(t) => t,
None => {
return format!(
"Invalid time **{hhmm}**. Use `HH:MM` format (e.g. `14:30`)."
);
}
};
match store.add(story_id.clone(), scheduled_at) {
Ok(()) => {
let local_time = scheduled_at.with_timezone(&Local);
format!(
"Timer set for **{story_id}** at **{}** (server local time).",
local_time.format("%Y-%m-%d %H:%M")
)
}
Err(e) => format!("Failed to save timer: {e}"),
}
}
TimerCommand::List => {
let timers = store.list();
if timers.is_empty() {
return "No pending timers.".to_string();
}
let mut lines = vec!["**Pending timers:**".to_string()];
for t in &timers {
let local_time = t.scheduled_at.with_timezone(&Local);
lines.push(format!(
"- **{}** → {}",
t.story_id,
local_time.format("%Y-%m-%d %H:%M")
));
}
lines.join("\n")
}
TimerCommand::Cancel { story_number_or_id } => {
let story_id = resolve_story_id(&story_number_or_id, project_root)
.unwrap_or(story_number_or_id.clone());
if store.remove(&story_id) {
format!("Timer for **{story_id}** cancelled.")
} else {
format!("No timer found for **{story_id}**.")
}
}
TimerCommand::BadArgs => {
"Usage:\n\
- `timer <story_id> <HH:MM>` — schedule deferred start\n\
- `timer list` — show pending timers\n\
- `timer cancel <story_id>` — remove a timer"
.to_string()
}
}
}
// ── Helpers ────────────────────────────────────────────────────────────────
/// Parse `HH:MM` and return the next UTC instant at which the server-local
/// clock will read that time. If the time has already passed today, returns
/// tomorrow's occurrence.
pub fn next_occurrence_of_hhmm(hhmm: &str) -> Option<DateTime<Utc>> {
let (hh, mm) = hhmm.split_once(':')?;
let hours: u32 = hh.parse().ok()?;
let minutes: u32 = mm.parse().ok()?;
if hours > 23 || minutes > 59 {
return None;
}
let target_time = NaiveTime::from_hms_opt(hours, minutes, 0)?;
let now_local = Local::now();
let today = now_local.date_naive();
let candidate = today.and_time(target_time);
let candidate_local = Local.from_local_datetime(&candidate).single()?;
if candidate_local > now_local {
Some(candidate_local.to_utc())
} else {
let tomorrow = today + Duration::days(1);
let tomorrow_candidate = tomorrow.and_time(target_time);
let tomorrow_local = Local.from_local_datetime(&tomorrow_candidate).single()?;
Some(tomorrow_local.to_utc())
}
}
/// Resolve a story ID from a numeric story number or a full ID string.
///
/// Searches all pipeline stages. Returns `None` only when the input is
/// numeric but no matching file is found.
fn resolve_story_id(number_or_id: &str, project_root: &Path) -> Option<String> {
const STAGES: &[&str] = &[
"1_backlog",
"2_current",
"3_qa",
"4_merge",
"5_done",
"6_archived",
];
// Full ID (contains underscores) — return as-is; validation happens at file-check time.
if number_or_id.contains('_') {
return Some(number_or_id.to_string());
}
// Numeric lookup.
if !number_or_id.chars().all(|c| c.is_ascii_digit()) {
return None;
}
for stage in STAGES {
let dir = project_root.join(".storkit").join("work").join(stage);
if !dir.exists() {
continue;
}
if let Ok(entries) = std::fs::read_dir(&dir) {
for entry in entries.flatten() {
let path = entry.path();
if path.extension().and_then(|e| e.to_str()) != Some("md") {
continue;
}
if let Some(stem) = path.file_stem().and_then(|s| s.to_str()) {
let file_num = stem
.split('_')
.next()
.filter(|s| !s.is_empty() && s.chars().all(|c| c.is_ascii_digit()))
.unwrap_or("");
if file_num == number_or_id {
return Some(stem.to_string());
}
}
}
}
}
None
}
fn strip_mention<'a>(message: &'a str, bot_name: &str, bot_user_id: &str) -> &'a str {
let trimmed = message.trim();
if let Some(rest) = strip_prefix_ci(trimmed, bot_user_id) {
return rest;
}
if let Some(localpart) = bot_user_id.split(':').next()
&& let Some(rest) = strip_prefix_ci(trimmed, localpart)
{
return rest;
}
if let Some(rest) = strip_prefix_ci(trimmed, bot_name) {
return rest;
}
trimmed
}
fn strip_prefix_ci<'a>(text: &'a str, prefix: &str) -> Option<&'a str> {
if text.len() < prefix.len() {
return None;
}
if !text[..prefix.len()].eq_ignore_ascii_case(prefix) {
return None;
}
let rest = &text[prefix.len()..];
match rest.chars().next() {
None => Some(rest),
Some(c) if c.is_alphanumeric() || c == '-' || c == '_' => None,
_ => Some(rest),
}
}
// ── Tests ──────────────────────────────────────────────────────────────────
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
// ── next_occurrence_of_hhmm ─────────────────────────────────────────
#[test]
fn valid_hhmm_returns_some() {
let result = next_occurrence_of_hhmm("14:30");
assert!(result.is_some(), "valid HH:MM should return Some");
}
#[test]
fn invalid_hhmm_missing_colon_returns_none() {
assert!(next_occurrence_of_hhmm("1430").is_none());
}
#[test]
fn invalid_hhmm_bad_hours_returns_none() {
assert!(next_occurrence_of_hhmm("25:00").is_none());
}
#[test]
fn invalid_hhmm_bad_minutes_returns_none() {
assert!(next_occurrence_of_hhmm("12:60").is_none());
}
#[test]
fn next_occurrence_is_in_the_future() {
let result = next_occurrence_of_hhmm("14:30").unwrap();
assert!(result > Utc::now(), "next occurrence must be in the future");
}
// ── extract_timer_command ───────────────────────────────────────────
#[test]
fn non_timer_command_returns_none() {
assert!(extract_timer_command("Timmy help", "Timmy", "@bot:home").is_none());
}
#[test]
fn timer_list_no_args() {
assert_eq!(
extract_timer_command("Timmy timer", "Timmy", "@bot:home"),
Some(TimerCommand::List)
);
}
#[test]
fn timer_list_explicit() {
assert_eq!(
extract_timer_command("Timmy timer list", "Timmy", "@bot:home"),
Some(TimerCommand::List)
);
}
#[test]
fn timer_cancel_story_id() {
assert_eq!(
extract_timer_command(
"Timmy timer cancel 421_story_foo",
"Timmy",
"@bot:home"
),
Some(TimerCommand::Cancel {
story_number_or_id: "421_story_foo".to_string()
})
);
}
#[test]
fn timer_cancel_no_arg_is_bad_args() {
assert_eq!(
extract_timer_command("Timmy timer cancel", "Timmy", "@bot:home"),
Some(TimerCommand::BadArgs)
);
}
#[test]
fn timer_schedule_with_story_id() {
assert_eq!(
extract_timer_command(
"Timmy timer 421_story_foo 14:30",
"Timmy",
"@bot:home"
),
Some(TimerCommand::Schedule {
story_number_or_id: "421_story_foo".to_string(),
hhmm: "14:30".to_string(),
})
);
}
#[test]
fn timer_schedule_with_number() {
assert_eq!(
extract_timer_command("Timmy timer 421 14:30", "Timmy", "@bot:home"),
Some(TimerCommand::Schedule {
story_number_or_id: "421".to_string(),
hhmm: "14:30".to_string(),
})
);
}
#[test]
fn timer_schedule_missing_time_is_bad_args() {
assert_eq!(
extract_timer_command(
"Timmy timer 421_story_foo",
"Timmy",
"@bot:home"
),
Some(TimerCommand::BadArgs)
);
}
// ── TimerStore ──────────────────────────────────────────────────────
#[test]
fn timer_store_empty_on_missing_file() {
let dir = TempDir::new().unwrap();
let store = TimerStore::load(dir.path().join("timers.json"));
assert!(store.list().is_empty());
}
#[test]
fn timer_store_add_and_list() {
let dir = TempDir::new().unwrap();
let store = TimerStore::load(dir.path().join("timers.json"));
let t = Utc::now() + Duration::hours(1);
store.add("story_1".to_string(), t).unwrap();
let list = store.list();
assert_eq!(list.len(), 1);
assert_eq!(list[0].story_id, "story_1");
}
#[test]
fn timer_store_remove() {
let dir = TempDir::new().unwrap();
let store = TimerStore::load(dir.path().join("timers.json"));
let t = Utc::now() + Duration::hours(1);
store.add("story_1".to_string(), t).unwrap();
assert!(store.remove("story_1"));
assert!(!store.remove("story_1")); // already gone
assert!(store.list().is_empty());
}
#[test]
fn timer_store_persists_and_reloads() {
let dir = TempDir::new().unwrap();
let path = dir.path().join("timers.json");
let t = Utc::now() + Duration::hours(2);
{
let store = TimerStore::load(path.clone());
store.add("421_story_foo".to_string(), t).unwrap();
}
// Reload from disk.
let store2 = TimerStore::load(path);
let list = store2.list();
assert_eq!(list.len(), 1);
assert_eq!(list[0].story_id, "421_story_foo");
}
#[test]
fn take_due_returns_only_past_entries() {
let dir = TempDir::new().unwrap();
let store = TimerStore::load(dir.path().join("timers.json"));
let past = Utc::now() - Duration::minutes(1);
let future = Utc::now() + Duration::hours(1);
store.add("past_story".to_string(), past).unwrap();
store.add("future_story".to_string(), future).unwrap();
let due = store.take_due(Utc::now());
assert_eq!(due.len(), 1);
assert_eq!(due[0].story_id, "past_story");
assert_eq!(store.list().len(), 1);
assert_eq!(store.list()[0].story_id, "future_story");
}
#[test]
fn multiple_timers_same_time_all_returned() {
let dir = TempDir::new().unwrap();
let store = TimerStore::load(dir.path().join("timers.json"));
let past = Utc::now() - Duration::minutes(1);
store.add("story_a".to_string(), past).unwrap();
store.add("story_b".to_string(), past).unwrap();
let due = store.take_due(Utc::now());
assert_eq!(due.len(), 2, "both timers at same time must fire");
}
// ── handle_timer_command ────────────────────────────────────────────
#[tokio::test]
async fn handle_list_empty() {
let dir = TempDir::new().unwrap();
let store = TimerStore::load(dir.path().join("timers.json"));
let result = handle_timer_command(TimerCommand::List, &store, dir.path()).await;
assert!(result.contains("No pending timers"), "unexpected: {result}");
}
#[tokio::test]
async fn handle_cancel_not_found() {
let dir = TempDir::new().unwrap();
let store = TimerStore::load(dir.path().join("timers.json"));
let result = handle_timer_command(
TimerCommand::Cancel {
story_number_or_id: "421_story_foo".to_string(),
},
&store,
dir.path(),
)
.await;
assert!(
result.contains("No timer found"),
"unexpected: {result}"
);
}
#[tokio::test]
async fn handle_schedule_story_not_in_current() {
let dir = TempDir::new().unwrap();
// Set up directory structure with no story in 2_current
std::fs::create_dir_all(dir.path().join(".storkit/work/2_current")).unwrap();
let store = TimerStore::load(dir.path().join("timers.json"));
let result = handle_timer_command(
TimerCommand::Schedule {
story_number_or_id: "421_story_foo".to_string(),
hhmm: "14:30".to_string(),
},
&store,
dir.path(),
)
.await;
assert!(
result.contains("not in `work/2_current/`"),
"unexpected: {result}"
);
}
#[tokio::test]
async fn handle_schedule_success() {
let dir = TempDir::new().unwrap();
let current_dir = dir.path().join(".storkit/work/2_current");
std::fs::create_dir_all(&current_dir).unwrap();
std::fs::write(current_dir.join("421_story_foo.md"), "---\nname: Foo\n---").unwrap();
let store = TimerStore::load(dir.path().join("timers.json"));
let result = handle_timer_command(
TimerCommand::Schedule {
story_number_or_id: "421_story_foo".to_string(),
hhmm: "23:59".to_string(),
},
&store,
dir.path(),
)
.await;
assert!(
result.contains("Timer set for"),
"unexpected: {result}"
);
assert_eq!(store.list().len(), 1);
}
#[tokio::test]
async fn handle_schedule_invalid_time() {
let dir = TempDir::new().unwrap();
let current_dir = dir.path().join(".storkit/work/2_current");
std::fs::create_dir_all(&current_dir).unwrap();
std::fs::write(current_dir.join("421_story_foo.md"), "---\nname: Foo\n---").unwrap();
let store = TimerStore::load(dir.path().join("timers.json"));
let result = handle_timer_command(
TimerCommand::Schedule {
story_number_or_id: "421_story_foo".to_string(),
hhmm: "99:00".to_string(),
},
&store,
dir.path(),
)
.await;
assert!(result.contains("Invalid time"), "unexpected: {result}");
}
#[tokio::test]
async fn handle_cancel_existing_timer() {
let dir = TempDir::new().unwrap();
let store = TimerStore::load(dir.path().join("timers.json"));
let future = Utc::now() + Duration::hours(1);
store.add("421_story_foo".to_string(), future).unwrap();
let result = handle_timer_command(
TimerCommand::Cancel {
story_number_or_id: "421_story_foo".to_string(),
},
&store,
dir.path(),
)
.await;
assert!(result.contains("cancelled"), "unexpected: {result}");
assert!(store.list().is_empty());
}
#[tokio::test]
async fn handle_list_with_entries() {
let dir = TempDir::new().unwrap();
let store = TimerStore::load(dir.path().join("timers.json"));
let future = Utc::now() + Duration::hours(1);
store.add("421_story_foo".to_string(), future).unwrap();
let result = handle_timer_command(TimerCommand::List, &store, dir.path()).await;
assert!(result.contains("421_story_foo"), "unexpected: {result}");
assert!(result.contains("Pending timers"), "unexpected: {result}");
}
}
@@ -1,4 +1,5 @@
use crate::agents::AgentPool; use crate::agents::AgentPool;
use crate::chat::timer::TimerStore;
use crate::chat::ChatTransport; use crate::chat::ChatTransport;
use crate::http::context::{PermissionDecision, PermissionForward}; use crate::http::context::{PermissionDecision, PermissionForward};
use matrix_sdk::ruma::{OwnedEventId, OwnedRoomId, OwnedUserId}; use matrix_sdk::ruma::{OwnedEventId, OwnedRoomId, OwnedUserId};
@@ -55,6 +56,8 @@ pub struct BotContext {
/// All message I/O goes through this abstraction so the bot logic works /// All message I/O goes through this abstraction so the bot logic works
/// with any platform, not just Matrix. /// with any platform, not just Matrix.
pub transport: Arc<dyn ChatTransport>, pub transport: Arc<dyn ChatTransport>,
/// Persistent store for pending deferred-start timers.
pub timer_store: Arc<TimerStore>,
} }
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@@ -103,6 +106,9 @@ mod tests {
"test-token".to_string(), "test-token".to_string(),
"pipeline_notification".to_string(), "pipeline_notification".to_string(),
)), )),
timer_store: Arc::new(crate::chat::timer::TimerStore::load(
std::path::PathBuf::from("/tmp/timers.json"),
)),
}; };
// Clone must work (required by Matrix SDK event handler injection). // Clone must work (required by Matrix SDK event handler injection).
let _cloned = ctx.clone(); let _cloned = ctx.clone();
@@ -433,6 +433,29 @@ pub(super) async fn on_room_message(
return; return;
} }
// Check for the timer command, which requires async file I/O and cannot
// be handled by the sync command registry.
if let Some(timer_cmd) = crate::chat::timer::extract_timer_command(
&user_message,
&ctx.bot_name,
ctx.bot_user_id.as_str(),
) {
slog!("[matrix-bot] Handling timer command from {sender}: {timer_cmd:?}");
let response = crate::chat::timer::handle_timer_command(
timer_cmd,
&ctx.timer_store,
&ctx.project_root,
)
.await;
let html = markdown_to_html(&response);
if let Ok(msg_id) = ctx.transport.send_message(&room_id_str, &response, &html).await
&& let Ok(event_id) = msg_id.parse()
{
ctx.bot_sent_event_ids.lock().await.insert(event_id);
}
return;
}
// Spawn a separate task so the Matrix sync loop is not blocked while we // Spawn a separate task so the Matrix sync loop is not blocked while we
// wait for the LLM response (which can take several seconds). // wait for the LLM response (which can take several seconds).
tokio::spawn(async move { tokio::spawn(async move {
@@ -215,6 +215,15 @@ pub async fn run_bot(
.unwrap_or_else(|| "Assistant".to_string()); .unwrap_or_else(|| "Assistant".to_string());
let announce_bot_name = bot_name.clone(); let announce_bot_name = bot_name.clone();
let timer_store = Arc::new(crate::chat::timer::TimerStore::load(
project_root.join(".storkit").join("timers.json"),
));
crate::chat::timer::spawn_timer_tick_loop(
Arc::clone(&timer_store),
Arc::clone(&agents),
project_root.clone(),
);
let ctx = BotContext { let ctx = BotContext {
bot_user_id, bot_user_id,
target_room_ids, target_room_ids,
@@ -231,6 +240,7 @@ pub async fn run_bot(
agents, agents,
htop_sessions: Arc::new(TokioMutex::new(HashMap::new())), htop_sessions: Arc::new(TokioMutex::new(HashMap::new())),
transport: Arc::clone(&transport), transport: Arc::clone(&transport),
timer_store,
}; };
slog!("[matrix-bot] Cryptographic identity verification is always ON — commands from unencrypted rooms or unverified devices are rejected"); slog!("[matrix-bot] Cryptographic identity verification is always ON — commands from unencrypted rooms or unverified devices are rejected");