From 5678f2a5565611b6980dcc33fc18ec69b380bc22 Mon Sep 17 00:00:00 2001 From: dave Date: Thu, 14 May 2026 20:08:09 +0000 Subject: [PATCH] huskies: merge 1061 --- .../20250514000000_event_triggers.sql | 7 + server/migrations/20250514000001_timers.sql | 4 + .../20250514000002_scheduled_timers.sql | 8 + server/src/db/mod.rs | 2 +- server/src/db/shadow_write.rs | 19 ++ server/src/main.rs | 45 ++-- server/src/service/event_triggers/store.rs | 254 ++++++++++++++++-- server/src/service/timer/io.rs | 184 +++++++++++-- server/src/service/timer/scheduled.rs | 195 +++++++++++++- server/src/startup/project.rs | 113 ++++++++ server/src/startup/tick_loop.rs | 3 +- 11 files changed, 752 insertions(+), 82 deletions(-) create mode 100644 server/migrations/20250514000000_event_triggers.sql create mode 100644 server/migrations/20250514000001_timers.sql create mode 100644 server/migrations/20250514000002_scheduled_timers.sql diff --git a/server/migrations/20250514000000_event_triggers.sql b/server/migrations/20250514000000_event_triggers.sql new file mode 100644 index 00000000..0923e4c6 --- /dev/null +++ b/server/migrations/20250514000000_event_triggers.sql @@ -0,0 +1,7 @@ +CREATE TABLE IF NOT EXISTS event_triggers ( + id TEXT PRIMARY KEY, + predicate_json TEXT NOT NULL, + action_json TEXT NOT NULL, + mode TEXT NOT NULL, + created_at TEXT NOT NULL +); diff --git a/server/migrations/20250514000001_timers.sql b/server/migrations/20250514000001_timers.sql new file mode 100644 index 00000000..4dd6b91f --- /dev/null +++ b/server/migrations/20250514000001_timers.sql @@ -0,0 +1,4 @@ +CREATE TABLE IF NOT EXISTS timers ( + story_id TEXT PRIMARY KEY, + scheduled_at TEXT NOT NULL +); diff --git a/server/migrations/20250514000002_scheduled_timers.sql b/server/migrations/20250514000002_scheduled_timers.sql new file mode 100644 index 00000000..e00ef74d --- /dev/null +++ b/server/migrations/20250514000002_scheduled_timers.sql @@ -0,0 +1,8 @@ +CREATE TABLE IF NOT EXISTS scheduled_timers ( + id TEXT PRIMARY KEY, + label TEXT, + fire_at TEXT NOT NULL, + action_json TEXT NOT NULL, + mode_json TEXT NOT NULL, + created_at TEXT NOT NULL +); diff --git a/server/src/db/mod.rs b/server/src/db/mod.rs index 286c290d..20a5e01b 100644 --- a/server/src/db/mod.rs +++ b/server/src/db/mod.rs @@ -29,7 +29,7 @@ pub mod shadow_write; pub use content_store::{ContentKey, all_content_ids, delete_content, read_content, write_content}; pub use ops::{ItemMeta, delete_item, move_item_stage, next_item_number, write_item_with_content}; -pub use shadow_write::init; +pub use shadow_write::{get_shared_pool, init}; #[cfg(test)] pub use content_store::ensure_content_store; diff --git a/server/src/db/shadow_write.rs b/server/src/db/shadow_write.rs index d45a8382..0ccb5bea 100644 --- a/server/src/db/shadow_write.rs +++ b/server/src/db/shadow_write.rs @@ -3,6 +3,10 @@ //! `init` opens the database, runs migrations, loads existing content into the //! in-memory store, and spawns the write loop. All subsequent writes are sent //! over an unbounded channel so callers never block on I/O. +//! +//! The opened pool is stored in [`SHARED_POOL`] so that other subsystems +//! (event-trigger store, timer store, scheduled-timer store) can share the +//! same database connection without re-opening the file. use crate::slog; use sqlx::SqlitePool; use sqlx::sqlite::SqliteConnectOptions; @@ -11,6 +15,18 @@ use std::path::Path; use std::sync::OnceLock; use tokio::sync::mpsc; +/// The process-global SQLite pool, set once by [`init`]. +/// +/// Other modules call [`get_shared_pool`] to access the pool without needing +/// to pass it through every call-site. +static SHARED_POOL: OnceLock = OnceLock::new(); + +/// Return a reference to the shared pipeline database pool, if it has been +/// initialised by a prior call to [`init`]. +pub fn get_shared_pool() -> Option<&'static SqlitePool> { + SHARED_POOL.get() +} + /// A pending shadow write for one pipeline item. pub(super) struct PipelineWriteMsg { pub(super) story_id: String, @@ -47,6 +63,9 @@ pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> { let pool = SqlitePool::connect_with(options).await?; sqlx::migrate!("./migrations").run(&pool).await?; + // Store pool in global static so other subsystems can reuse it. + let _ = SHARED_POOL.set(pool.clone()); + // Load existing content into the in-memory store. let rows: Vec<(String, Option)> = sqlx::query_as("SELECT id, content FROM pipeline_items WHERE content IS NOT NULL") diff --git a/server/src/main.rs b/server/src/main.rs index 71f80d1c..0b78af7a 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -274,34 +274,37 @@ async fn main() -> Result<(), std::io::Error> { let matrix_shutdown_tx_for_rebuild = Arc::clone(&bot_ctxs.matrix_shutdown_tx); // Shared rate-limit retry timer store. - let timer_store = std::sync::Arc::new(crate::service::timer::TimerStore::load( - startup_root - .as_ref() - .map(|r| r.join(".huskies").join("timers.json")) - .unwrap_or_else(|| std::path::PathBuf::from("/tmp/huskies-timers.json")), - )); + let timer_store = { + let pool = crate::db::get_shared_pool().expect("db pool must be initialised before stores"); + std::sync::Arc::new( + crate::service::timer::TimerStore::from_pool(pool.clone()) + .await + .expect("failed to load timer store from db"), + ) + }; let timer_store_for_tick = Arc::clone(&timer_store); let timer_store_for_bot = Arc::clone(&timer_store); // Event-based pipeline trigger store. - let event_trigger_store = std::sync::Arc::new( - crate::service::event_triggers::store::EventTriggerStore::load( - startup_root - .as_ref() - .map(|r| r.join(".huskies").join("event_triggers.json")) - .unwrap_or_else(|| std::path::PathBuf::from("/tmp/huskies-event-triggers.json")), - ), - ); + let event_trigger_store = { + let pool = crate::db::get_shared_pool().expect("db pool must be initialised before stores"); + std::sync::Arc::new( + crate::service::event_triggers::store::EventTriggerStore::from_pool(pool.clone()) + .await + .expect("failed to load event trigger store from db"), + ) + }; let event_trigger_store_for_subscriber = Arc::clone(&event_trigger_store); // Generic scheduled-timer store for the `schedule_timer` MCP tool. - let scheduled_timer_store = - std::sync::Arc::new(crate::service::timer::ScheduledTimerStore::load( - startup_root - .as_ref() - .map(|r| r.join(".huskies").join("scheduled_timers.json")) - .unwrap_or_else(|| std::path::PathBuf::from("/tmp/huskies-scheduled-timers.json")), - )); + let scheduled_timer_store = { + let pool = crate::db::get_shared_pool().expect("db pool must be initialised before stores"); + std::sync::Arc::new( + crate::service::timer::ScheduledTimerStore::from_pool(pool.clone()) + .await + .expect("failed to load scheduled timer store from db"), + ) + }; let scheduled_timer_store_for_tick = Arc::clone(&scheduled_timer_store); let ctx = AppContext { diff --git a/server/src/service/event_triggers/store.rs b/server/src/service/event_triggers/store.rs index 165c545f..3ea30c3f 100644 --- a/server/src/service/event_triggers/store.rs +++ b/server/src/service/event_triggers/store.rs @@ -1,25 +1,164 @@ -//! Persistent store for registered event triggers, backed by a JSON file. +//! Persistent store for registered event triggers, backed by SQLite (`pipeline.db`). //! -//! Loaded at server startup and kept in sync on every mutation. Thread-safe -//! via an internal `Mutex`. +//! Production code uses [`EventTriggerStore::from_pool`] which loads from the +//! `event_triggers` table and persists every mutation via a background writer +//! task. Unit tests use [`EventTriggerStore::load`] which keeps state in +//! memory without SQLite (no file I/O in tests). -use std::path::{Path, PathBuf}; +use std::path::PathBuf; use std::sync::Mutex; use chrono::Utc; use serde_json::Value; +use sqlx::SqlitePool; +use tokio::sync::mpsc; use super::{EventTrigger, FireMode, TriggerAction, TriggerPredicate}; +// ── Background writer ──────────────────────────────────────────────────────── + +/// Commands sent to the background SQLite writer. +enum WriteCmd { + Upsert(Box), + Delete(String), +} + +fn spawn_writer(pool: SqlitePool, mut rx: mpsc::UnboundedReceiver) { + tokio::spawn(async move { + while let Some(cmd) = rx.recv().await { + match cmd { + WriteCmd::Upsert(t) => { + let predicate_json = match serde_json::to_string(&t.predicate) { + Ok(j) => j, + Err(e) => { + crate::slog!("[event-triggers] Failed to serialize predicate: {e}"); + continue; + } + }; + let action_json = match serde_json::to_string(&t.action) { + Ok(j) => j, + Err(e) => { + crate::slog!("[event-triggers] Failed to serialize action: {e}"); + continue; + } + }; + let mode_str = match t.mode { + FireMode::Once => "once", + FireMode::Persistent => "persistent", + }; + let result = sqlx::query( + "INSERT INTO event_triggers \ + (id, predicate_json, action_json, mode, created_at) \ + VALUES (?1, ?2, ?3, ?4, ?5) \ + ON CONFLICT(id) DO UPDATE SET \ + predicate_json = excluded.predicate_json, \ + action_json = excluded.action_json, \ + mode = excluded.mode", + ) + .bind(&t.id) + .bind(&predicate_json) + .bind(&action_json) + .bind(mode_str) + .bind(t.created_at.to_rfc3339()) + .execute(&pool) + .await; + if let Err(e) = result { + crate::slog!("[event-triggers] DB upsert failed for '{}': {e}", t.id); + } + } + WriteCmd::Delete(id) => { + let result = sqlx::query("DELETE FROM event_triggers WHERE id = ?1") + .bind(&id) + .execute(&pool) + .await; + if let Err(e) = result { + crate::slog!("[event-triggers] DB delete failed for '{id}': {e}"); + } + } + } + } + }); +} + +// ── Store ──────────────────────────────────────────────────────────────────── + +/// Internal persistence backend. +enum Persistence { + /// SQLite-backed (production): mutations are sent via channel. + Sqlite(mpsc::UnboundedSender), + /// JSON-file-backed (tests): mutations write to disk synchronously. + Json(PathBuf), +} + /// Persistent store for [`EventTrigger`] entries. pub struct EventTriggerStore { - path: PathBuf, triggers: Mutex>, + persistence: Persistence, } impl EventTriggerStore { - /// Load the store from `path`. Returns an empty store if the file does - /// not exist or cannot be parsed. + /// Load from SQLite using the shared pipeline database pool. + /// + /// Reads all existing rows from the `event_triggers` table, then spawns a + /// background task that handles subsequent writes. This is the production + /// constructor; tests use [`EventTriggerStore::load`]. + pub async fn from_pool(pool: SqlitePool) -> Result { + let rows: Vec<(String, String, String, String, String)> = sqlx::query_as( + "SELECT id, predicate_json, action_json, mode, created_at \ + FROM event_triggers", + ) + .fetch_all(&pool) + .await?; + + let mut triggers = Vec::with_capacity(rows.len()); + for (id, pred_json, action_json, mode_str, created_at_str) in rows { + let predicate: TriggerPredicate = match serde_json::from_str(&pred_json) { + Ok(p) => p, + Err(e) => { + crate::slog!( + "[event-triggers] Skipping malformed predicate for trigger {id}: {e}" + ); + continue; + } + }; + let action: TriggerAction = match serde_json::from_str(&action_json) { + Ok(a) => a, + Err(e) => { + crate::slog!( + "[event-triggers] Skipping malformed action for trigger {id}: {e}" + ); + continue; + } + }; + let mode = if mode_str == "persistent" { + FireMode::Persistent + } else { + FireMode::Once + }; + let created_at = created_at_str + .parse::>() + .unwrap_or_else(|_| Utc::now()); + + triggers.push(EventTrigger { + id, + predicate, + action, + mode, + created_at, + }); + } + + let (tx, rx) = mpsc::unbounded_channel(); + spawn_writer(pool, rx); + + Ok(Self { + triggers: Mutex::new(triggers), + persistence: Persistence::Sqlite(tx), + }) + } + + /// Load from a JSON file path. Used by unit tests; returns an in-memory + /// store backed by the file for persistence. pub fn load(path: PathBuf) -> Self { let triggers = if path.exists() { std::fs::read_to_string(&path) @@ -30,22 +169,21 @@ impl EventTriggerStore { Vec::new() }; Self { - path, triggers: Mutex::new(triggers), + persistence: Persistence::Json(path), } } - fn persist(path: &Path, triggers: &[EventTrigger]) -> Result<(), String> { + fn persist_json(path: &std::path::Path, triggers: &[EventTrigger]) { if let Some(parent) = path.parent() { - std::fs::create_dir_all(parent) - .map_err(|e| format!("Failed to create directory: {e}"))?; + let _ = std::fs::create_dir_all(parent); + } + if let Ok(json) = serde_json::to_string_pretty(triggers) { + let _ = std::fs::write(path, json); } - let json = serde_json::to_string_pretty(triggers) - .map_err(|e| format!("Serialization error: {e}"))?; - std::fs::write(path, json).map_err(|e| format!("Failed to write triggers: {e}")) } - /// Register a new trigger and persist to disk. + /// Register a new trigger and persist it. pub fn add( &self, predicate: TriggerPredicate, @@ -61,7 +199,30 @@ impl EventTriggerStore { }; let mut triggers = self.triggers.lock().unwrap(); triggers.push(trigger.clone()); - Self::persist(&self.path, &triggers)?; + + match &self.persistence { + Persistence::Sqlite(tx) => { + let _ = tx.send(WriteCmd::Upsert(Box::new(trigger.clone()))); + } + Persistence::Json(path) => { + Self::persist_json(path, &triggers); + } + } + + let mode_label = match trigger.mode { + FireMode::Once => "once", + FireMode::Persistent => "persistent", + }; + crate::slog!( + "[event-triggers] Registered trigger {} | predicate={:?} | mode={mode_label}", + trigger.id, + trigger + .predicate + .to_stage + .as_deref() + .or(trigger.predicate.event_kind.as_deref()) + .unwrap_or("*"), + ); Ok(trigger) } @@ -77,14 +238,22 @@ impl EventTriggerStore { triggers.retain(|t| t.id != id); let removed = triggers.len() < before; if removed { - let _ = Self::persist(&self.path, &triggers); + match &self.persistence { + Persistence::Sqlite(tx) => { + let _ = tx.send(WriteCmd::Delete(id.to_string())); + } + Persistence::Json(path) => { + Self::persist_json(path, &triggers); + } + } + crate::slog!("[event-triggers] Cancelled trigger {id} | reason=manual"); } removed } - /// Remove all triggers whose ids are in `ids` and return how many were removed. - /// - /// Used by the subscriber to delete `Once` triggers after they fire. + /// Remove all triggers whose ids are in `ids` and return how many were + /// removed. Used by the subscriber to delete `Once` triggers after they + /// fire. pub fn cancel_batch(&self, ids: &[String]) -> usize { if ids.is_empty() { return 0; @@ -94,16 +263,25 @@ impl EventTriggerStore { triggers.retain(|t| !ids.contains(&t.id)); let removed = before - triggers.len(); if removed > 0 { - let _ = Self::persist(&self.path, &triggers); + match &self.persistence { + Persistence::Sqlite(tx) => { + for id in ids { + let _ = tx.send(WriteCmd::Delete(id.clone())); + } + } + Persistence::Json(path) => { + Self::persist_json(path, &triggers); + } + } + for id in ids { + crate::slog!("[event-triggers] Cancelled trigger {id} | reason=once"); + } } removed } } -/// Generate a random UUIDv4-style identifier without pulling in the full uuid crate. -/// -/// Uses [`std::time`] entropy mixed with a thread-local counter. Not cryptographically -/// strong, but unique enough for trigger IDs. +/// Generate a random UUIDv4-style identifier. fn uuid_v4() -> String { use std::time::{SystemTime, UNIX_EPOCH}; let nanos = SystemTime::now() @@ -263,6 +441,32 @@ mod tests { assert_eq!(list[0].id, id); } + #[tokio::test] + async fn from_pool_persists_and_reloads() { + let dir = TempDir::new().unwrap(); + let db_path = dir.path().join("test.db"); + let opts = sqlx::sqlite::SqliteConnectOptions::new() + .filename(&db_path) + .create_if_missing(true); + let pool = sqlx::SqlitePool::connect_with(opts).await.unwrap(); + sqlx::migrate!("./migrations").run(&pool).await.unwrap(); + + let id = { + let store = EventTriggerStore::from_pool(pool.clone()).await.unwrap(); + let t = store + .add(basic_pred(), basic_action(), FireMode::Persistent) + .unwrap(); + // Give background writer time to flush. + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + t.id.clone() + }; + + let store2 = EventTriggerStore::from_pool(pool.clone()).await.unwrap(); + let list = store2.list(); + assert_eq!(list.len(), 1); + assert_eq!(list[0].id, id); + } + #[test] fn store_cancel_batch() { let (_dir, store) = tmp_store(); diff --git a/server/src/service/timer/io.rs b/server/src/service/timer/io.rs index d82ff5b9..dfb03783 100644 --- a/server/src/service/timer/io.rs +++ b/server/src/service/timer/io.rs @@ -1,27 +1,99 @@ -//! I/O side of the timer service: filesystem persistence, clock reads, +//! I/O side of the timer service: SQLite persistence, clock reads, //! background task spawning, and story-ID resolution. //! //! This is the **only** place inside `service/timer/` that may perform side //! effects (filesystem reads/writes, clock reads, `tokio::spawn`). use chrono::{DateTime, Utc}; +use sqlx::SqlitePool; use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; +use tokio::sync::mpsc; use super::persist::{TimerEntry, deserialize_timers, serialize_timers}; use super::schedule::next_occurrence_at; +// ── TimerStore background writer ───────────────────────────────────────────── + +enum TimerWriteCmd { + Upsert(TimerEntry), + Delete(String), +} + +fn spawn_timer_writer(pool: SqlitePool, mut rx: mpsc::UnboundedReceiver) { + tokio::spawn(async move { + while let Some(cmd) = rx.recv().await { + match cmd { + TimerWriteCmd::Upsert(e) => { + let result = sqlx::query( + "INSERT INTO timers (story_id, scheduled_at) VALUES (?1, ?2) \ + ON CONFLICT(story_id) DO UPDATE SET scheduled_at = excluded.scheduled_at", + ) + .bind(&e.story_id) + .bind(e.scheduled_at.to_rfc3339()) + .execute(&pool) + .await; + if let Err(e) = result { + crate::slog!("[timer] DB upsert failed: {e}"); + } + } + TimerWriteCmd::Delete(story_id) => { + let result = sqlx::query("DELETE FROM timers WHERE story_id = ?1") + .bind(&story_id) + .execute(&pool) + .await; + if let Err(e) = result { + crate::slog!("[timer] DB delete failed for '{story_id}': {e}"); + } + } + } + } + }); +} + // ── TimerStore ───────────────────────────────────────────────────────────── -/// Persistent store for pending timers, backed by a JSON file. +enum TimerPersistence { + Sqlite(mpsc::UnboundedSender), + Json(PathBuf), +} + +/// Persistent store for pending timers, backed by SQLite in production and +/// a JSON file in tests. pub struct TimerStore { - path: PathBuf, timers: Mutex>, + persistence: TimerPersistence, } impl TimerStore { - /// Load the timer store from `path`. Returns an empty store if the file - /// does not exist or cannot be parsed. + /// Load from the shared SQLite pool. This is the production constructor. + pub async fn from_pool(pool: SqlitePool) -> Result { + let rows: Vec<(String, String)> = + sqlx::query_as("SELECT story_id, scheduled_at FROM timers") + .fetch_all(&pool) + .await?; + + let timers = rows + .into_iter() + .filter_map(|(story_id, scheduled_at_str)| { + let scheduled_at = scheduled_at_str.parse::>().ok()?; + Some(TimerEntry { + story_id, + scheduled_at, + }) + }) + .collect(); + + let (tx, rx) = mpsc::unbounded_channel(); + spawn_timer_writer(pool, rx); + + Ok(Self { + timers: Mutex::new(timers), + persistence: TimerPersistence::Sqlite(tx), + }) + } + + /// Load the timer store from a JSON file path. Used by unit tests. pub fn load(path: PathBuf) -> Self { let timers = if path.exists() { std::fs::read_to_string(&path) @@ -32,12 +104,12 @@ impl TimerStore { Vec::new() }; Self { - path, timers: Mutex::new(timers), + persistence: TimerPersistence::Json(path), } } - fn save_locked(path: &Path, timers: &[TimerEntry]) -> Result<(), String> { + fn save_json(path: &Path, timers: &[TimerEntry]) -> Result<(), String> { if let Some(parent) = path.parent() { std::fs::create_dir_all(parent) .map_err(|e| format!("Failed to create directory: {e}"))?; @@ -46,14 +118,23 @@ impl TimerStore { std::fs::write(path, content).map_err(|e| format!("Failed to write timers: {e}")) } - /// Add a timer and persist to disk. + /// Add a timer and persist. pub fn add(&self, story_id: String, scheduled_at: DateTime) -> Result<(), String> { let mut timers = self.timers.lock().unwrap(); timers.push(TimerEntry { - story_id, + story_id: story_id.clone(), scheduled_at, }); - Self::save_locked(&self.path, &timers) + match &self.persistence { + TimerPersistence::Sqlite(tx) => { + let _ = tx.send(TimerWriteCmd::Upsert(TimerEntry { + story_id, + scheduled_at, + })); + Ok(()) + } + TimerPersistence::Json(path) => Self::save_json(path, &timers), + } } /// Remove the timer for `story_id`. Returns `true` if one was removed. @@ -63,7 +144,14 @@ impl TimerStore { timers.retain(|t| t.story_id != story_id); let removed = timers.len() < before; if removed { - let _ = Self::save_locked(&self.path, &timers); + match &self.persistence { + TimerPersistence::Sqlite(tx) => { + let _ = tx.send(TimerWriteCmd::Delete(story_id.to_string())); + } + TimerPersistence::Json(path) => { + let _ = Self::save_json(path, &timers); + } + } } removed } @@ -73,33 +161,47 @@ impl TimerStore { self.timers.lock().unwrap().clone() } - /// Add or update a timer for `story_id`. - /// - /// - If no timer exists for `story_id`, adds it. - /// - If a timer already exists and `scheduled_at` is **later**, updates it. - /// - If a timer already exists and `scheduled_at` is earlier or equal, no-op. - /// - /// Use this instead of [`add`] when auto-scheduling from rate-limit events to - /// avoid creating duplicates and to always keep the latest reset time. + /// Add or update a timer for `story_id`. When an existing timer has an + /// earlier `scheduled_at`, it is updated to the later value. When it is + /// already later, this is a no-op. pub fn upsert(&self, story_id: String, scheduled_at: DateTime) -> Result<(), String> { let mut timers = self.timers.lock().unwrap(); if let Some(existing) = timers.iter_mut().find(|t| t.story_id == story_id) { if scheduled_at > existing.scheduled_at { existing.scheduled_at = scheduled_at; - Self::save_locked(&self.path, &timers)?; + match &self.persistence { + TimerPersistence::Sqlite(tx) => { + let _ = tx.send(TimerWriteCmd::Upsert(TimerEntry { + story_id, + scheduled_at, + })); + } + TimerPersistence::Json(path) => { + Self::save_json(path, &timers)?; + } + } } } else { timers.push(TimerEntry { - story_id, + story_id: story_id.clone(), scheduled_at, }); - Self::save_locked(&self.path, &timers)?; + match &self.persistence { + TimerPersistence::Sqlite(tx) => { + let _ = tx.send(TimerWriteCmd::Upsert(TimerEntry { + story_id, + scheduled_at, + })); + } + TimerPersistence::Json(path) => { + Self::save_json(path, &timers)?; + } + } } Ok(()) } - /// Remove and return all timers whose `scheduled_at` is ≤ `now`. - /// Persists the updated list to disk if any timers were removed. + /// Remove and return all timers whose `scheduled_at` ≤ `now`. pub fn take_due(&self, now: DateTime) -> Vec { let mut timers = self.timers.lock().unwrap(); let mut due = Vec::new(); @@ -113,7 +215,16 @@ impl TimerStore { } *timers = remaining; if !due.is_empty() { - let _ = Self::save_locked(&self.path, &timers); + match &self.persistence { + TimerPersistence::Sqlite(tx) => { + for entry in &due { + let _ = tx.send(TimerWriteCmd::Delete(entry.story_id.clone())); + } + } + TimerPersistence::Json(path) => { + let _ = Self::save_json(path, &timers); + } + } } due } @@ -512,6 +623,29 @@ mod tests { assert_eq!(list[0].story_id, "421_story_foo"); } + #[tokio::test] + async fn from_pool_persists_and_reloads() { + let dir = TempDir::new().unwrap(); + let db_path = dir.path().join("test.db"); + let opts = sqlx::sqlite::SqliteConnectOptions::new() + .filename(&db_path) + .create_if_missing(true); + let pool = sqlx::SqlitePool::connect_with(opts).await.unwrap(); + sqlx::migrate!("./migrations").run(&pool).await.unwrap(); + + let t = Utc::now() + Duration::hours(2); + { + let store = TimerStore::from_pool(pool.clone()).await.unwrap(); + store.add("421_story_foo".to_string(), t).unwrap(); + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + } + + let store2 = TimerStore::from_pool(pool.clone()).await.unwrap(); + let list = store2.list(); + assert_eq!(list.len(), 1); + assert_eq!(list[0].story_id, "421_story_foo"); + } + #[test] fn take_due_returns_only_past_entries() { let dir = TempDir::new().unwrap(); diff --git a/server/src/service/timer/scheduled.rs b/server/src/service/timer/scheduled.rs index 399bee6e..9029108e 100644 --- a/server/src/service/timer/scheduled.rs +++ b/server/src/service/timer/scheduled.rs @@ -7,8 +7,10 @@ use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; +use sqlx::SqlitePool; use std::path::{Path, PathBuf}; use std::sync::Mutex; +use tokio::sync::mpsc; use uuid::Uuid; // ── Action ──────────────────────────────────────────────────────────────────── @@ -76,16 +78,138 @@ impl ScheduledTimer { } } +// ── Background writer ───────────────────────────────────────────────────────── + +enum SchedWriteCmd { + Upsert(ScheduledTimer), + Delete(String), +} + +fn spawn_sched_writer(pool: SqlitePool, mut rx: mpsc::UnboundedReceiver) { + tokio::spawn(async move { + while let Some(cmd) = rx.recv().await { + match cmd { + SchedWriteCmd::Upsert(t) => { + let action_json = match serde_json::to_string(&t.action) { + Ok(j) => j, + Err(e) => { + crate::slog!("[scheduled-timer] Serialize action failed: {e}"); + continue; + } + }; + let mode_json = match serde_json::to_string(&t.mode) { + Ok(j) => j, + Err(e) => { + crate::slog!("[scheduled-timer] Serialize mode failed: {e}"); + continue; + } + }; + let result = sqlx::query( + "INSERT INTO scheduled_timers \ + (id, label, fire_at, action_json, mode_json, created_at) \ + VALUES (?1, ?2, ?3, ?4, ?5, ?6) \ + ON CONFLICT(id) DO UPDATE SET \ + label = excluded.label, \ + fire_at = excluded.fire_at, \ + action_json = excluded.action_json, \ + mode_json = excluded.mode_json", + ) + .bind(&t.id) + .bind(&t.label) + .bind(t.fire_at.to_rfc3339()) + .bind(&action_json) + .bind(&mode_json) + .bind(t.created_at.to_rfc3339()) + .execute(&pool) + .await; + if let Err(e) = result { + crate::slog!("[scheduled-timer] DB upsert failed for '{}': {e}", t.id); + } + } + SchedWriteCmd::Delete(id) => { + let result = sqlx::query("DELETE FROM scheduled_timers WHERE id = ?1") + .bind(&id) + .execute(&pool) + .await; + if let Err(e) = result { + crate::slog!("[scheduled-timer] DB delete failed for '{id}': {e}"); + } + } + } + } + }); +} + // ── Store ───────────────────────────────────────────────────────────────────── -/// Persistent store for generic scheduled timers, backed by a JSON file. +enum SchedPersistence { + Sqlite(mpsc::UnboundedSender), + Json(PathBuf), +} + +/// Persistent store for generic scheduled timers, backed by SQLite in +/// production and a JSON file in tests. pub struct ScheduledTimerStore { - path: PathBuf, timers: Mutex>, + persistence: SchedPersistence, } impl ScheduledTimerStore { - /// Load (or create empty) store from `path`. + /// Load from the shared SQLite pool. This is the production constructor. + pub async fn from_pool(pool: SqlitePool) -> Result { + let rows: Vec<(String, Option, String, String, String, String)> = sqlx::query_as( + "SELECT id, label, fire_at, action_json, mode_json, created_at \ + FROM scheduled_timers", + ) + .fetch_all(&pool) + .await?; + + let mut timers = Vec::with_capacity(rows.len()); + for (id, label, fire_at_str, action_json, mode_json, created_at_str) in rows { + let fire_at = match fire_at_str.parse::>() { + Ok(t) => t, + Err(e) => { + crate::slog!("[scheduled-timer] Bad fire_at for {id}: {e}"); + continue; + } + }; + let action: TimerAction = match serde_json::from_str(&action_json) { + Ok(a) => a, + Err(e) => { + crate::slog!("[scheduled-timer] Bad action for {id}: {e}"); + continue; + } + }; + let mode: TimerMode = match serde_json::from_str(&mode_json) { + Ok(m) => m, + Err(e) => { + crate::slog!("[scheduled-timer] Bad mode for {id}: {e}"); + continue; + } + }; + let created_at = created_at_str + .parse::>() + .unwrap_or_else(|_| Utc::now()); + timers.push(ScheduledTimer { + id, + label, + fire_at, + action, + mode, + created_at, + }); + } + + let (tx, rx) = mpsc::unbounded_channel(); + spawn_sched_writer(pool, rx); + + Ok(Self { + timers: Mutex::new(timers), + persistence: SchedPersistence::Sqlite(tx), + }) + } + + /// Load (or create empty) store from a JSON file path. Used by unit tests. pub fn load(path: PathBuf) -> Self { let timers = if path.exists() { std::fs::read_to_string(&path) @@ -96,12 +220,12 @@ impl ScheduledTimerStore { Vec::new() }; Self { - path, timers: Mutex::new(timers), + persistence: SchedPersistence::Json(path), } } - fn save(path: &Path, timers: &[ScheduledTimer]) -> Result<(), String> { + fn save_json(path: &Path, timers: &[ScheduledTimer]) -> Result<(), String> { if let Some(parent) = path.parent() { std::fs::create_dir_all(parent).map_err(|e| format!("mkdir failed: {e}"))?; } @@ -116,8 +240,14 @@ impl ScheduledTimerStore { if timers.iter().any(|t| t.id == timer.id) { return Err(format!("Timer with id '{}' already exists", timer.id)); } - timers.push(timer); - Self::save(&self.path, &timers) + timers.push(timer.clone()); + match &self.persistence { + SchedPersistence::Sqlite(tx) => { + let _ = tx.send(SchedWriteCmd::Upsert(timer)); + Ok(()) + } + SchedPersistence::Json(path) => Self::save_json(path, &timers), + } } /// Remove a timer by ID. Returns `true` if one was removed. @@ -127,7 +257,14 @@ impl ScheduledTimerStore { timers.retain(|t| t.id != id); let removed = timers.len() < before; if removed { - let _ = Self::save(&self.path, &timers); + match &self.persistence { + SchedPersistence::Sqlite(tx) => { + let _ = tx.send(SchedWriteCmd::Delete(id.to_string())); + } + SchedPersistence::Json(path) => { + let _ = Self::save_json(path, &timers); + } + } } removed } @@ -151,7 +288,16 @@ impl ScheduledTimerStore { } *timers = remaining; if !due.is_empty() { - let _ = Self::save(&self.path, &timers); + match &self.persistence { + SchedPersistence::Sqlite(tx) => { + for entry in &due { + let _ = tx.send(SchedWriteCmd::Delete(entry.id.clone())); + } + } + SchedPersistence::Json(path) => { + let _ = Self::save_json(path, &timers); + } + } } due } @@ -412,6 +558,37 @@ mod tests { assert_eq!(due.len(), 1, "past timer must fire on next tick"); } + #[tokio::test] + async fn from_pool_persists_and_reloads() { + let dir = TempDir::new().unwrap(); + let db_path = dir.path().join("test.db"); + let opts = sqlx::sqlite::SqliteConnectOptions::new() + .filename(&db_path) + .create_if_missing(true); + let pool = sqlx::SqlitePool::connect_with(opts).await.unwrap(); + sqlx::query( + "CREATE TABLE IF NOT EXISTS scheduled_timers \ + (id TEXT PRIMARY KEY, label TEXT, fire_at TEXT NOT NULL, \ + action_json TEXT NOT NULL, mode_json TEXT NOT NULL, created_at TEXT NOT NULL)", + ) + .execute(&pool) + .await + .unwrap(); + + let t = Utc::now() + chrono::Duration::hours(1); + { + let store = ScheduledTimerStore::from_pool(pool.clone()).await.unwrap(); + store.add(make_timer("tm-aabbccdd", t)).unwrap(); + // Give the background writer a moment to flush. + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + } + + let store2 = ScheduledTimerStore::from_pool(pool).await.unwrap(); + let list = store2.list(); + assert_eq!(list.len(), 1); + assert_eq!(list[0].id, "tm-aabbccdd"); + } + #[test] fn new_id_has_tm_prefix() { let id = ScheduledTimer::new_id(); diff --git a/server/src/startup/project.rs b/server/src/startup/project.rs index 7151c445..2c56e14b 100644 --- a/server/src/startup/project.rs +++ b/server/src/startup/project.rs @@ -107,6 +107,115 @@ pub(crate) async fn open_project_root( } } +/// One-shot migration: read existing JSON store files and insert their rows +/// into the SQLite tables created by the migration scripts, then rename the +/// originals to `*.migrated` so this function is a no-op on the next startup. +/// +/// Runs after `db::init()` so the tables already exist and the shared pool is +/// available. Errors are logged but never fatal. +async fn migrate_json_stores_to_sqlite(huskies_dir: &Path) { + let pool = match crate::db::get_shared_pool() { + Some(p) => p, + None => { + crate::slog!("[db-migrate] Shared pool not available; skipping JSON migration"); + return; + } + }; + + // ── event_triggers.json ─────────────────────────────────────────────────── + let et_path = huskies_dir.join("event_triggers.json"); + if et_path.exists() { + match std::fs::read_to_string(&et_path) { + Ok(s) => { + let triggers: Vec = + serde_json::from_str(&s).unwrap_or_default(); + for t in triggers { + let predicate_json = serde_json::to_string(&t.predicate).unwrap_or_default(); + let action_json = serde_json::to_string(&t.action).unwrap_or_default(); + let mode = match t.mode { + crate::service::event_triggers::FireMode::Once => "once", + crate::service::event_triggers::FireMode::Persistent => "persistent", + }; + let created_at = t.created_at.to_rfc3339(); + let _ = sqlx::query( + "INSERT OR IGNORE INTO event_triggers \ + (id, predicate_json, action_json, mode, created_at) \ + VALUES (?1, ?2, ?3, ?4, ?5)", + ) + .bind(&t.id) + .bind(&predicate_json) + .bind(&action_json) + .bind(mode) + .bind(&created_at) + .execute(pool) + .await; + } + let migrated = huskies_dir.join("event_triggers.json.migrated"); + let _ = std::fs::rename(&et_path, &migrated); + crate::slog!("[db-migrate] Migrated event_triggers.json → SQLite"); + } + Err(e) => crate::slog!("[db-migrate] Could not read event_triggers.json: {e}"), + } + } + + // ── timers.json ─────────────────────────────────────────────────────────── + let timers_path = huskies_dir.join("timers.json"); + if timers_path.exists() { + match std::fs::read_to_string(&timers_path) { + Ok(s) => { + let entries: Vec = + serde_json::from_str(&s).unwrap_or_default(); + for e in entries { + let _ = sqlx::query( + "INSERT OR IGNORE INTO timers (story_id, scheduled_at) \ + VALUES (?1, ?2)", + ) + .bind(&e.story_id) + .bind(e.scheduled_at.to_rfc3339()) + .execute(pool) + .await; + } + let migrated = huskies_dir.join("timers.json.migrated"); + let _ = std::fs::rename(&timers_path, &migrated); + crate::slog!("[db-migrate] Migrated timers.json → SQLite"); + } + Err(e) => crate::slog!("[db-migrate] Could not read timers.json: {e}"), + } + } + + // ── scheduled_timers.json ───────────────────────────────────────────────── + let st_path = huskies_dir.join("scheduled_timers.json"); + if st_path.exists() { + match std::fs::read_to_string(&st_path) { + Ok(s) => { + use crate::service::timer::scheduled::ScheduledTimer; + let entries: Vec = serde_json::from_str(&s).unwrap_or_default(); + for t in entries { + let action_json = serde_json::to_string(&t.action).unwrap_or_default(); + let mode_json = serde_json::to_string(&t.mode).unwrap_or_default(); + let _ = sqlx::query( + "INSERT OR IGNORE INTO scheduled_timers \ + (id, label, fire_at, action_json, mode_json, created_at) \ + VALUES (?1, ?2, ?3, ?4, ?5, ?6)", + ) + .bind(&t.id) + .bind(&t.label) + .bind(t.fire_at.to_rfc3339()) + .bind(&action_json) + .bind(&mode_json) + .bind(t.created_at.to_rfc3339()) + .execute(pool) + .await; + } + let migrated = huskies_dir.join("scheduled_timers.json.migrated"); + let _ = std::fs::rename(&st_path, &migrated); + crate::slog!("[db-migrate] Migrated scheduled_timers.json → SQLite"); + } + Err(e) => crate::slog!("[db-migrate] Could not read scheduled_timers.json: {e}"), + } + } +} + /// Set up the server log file, node identity keypair, pipeline DB, and CRDT state. pub(crate) async fn init_subsystems(app_state: &Arc, cwd: &Path) { // Enable persistent server log file now that the project root is known. @@ -146,6 +255,10 @@ pub(crate) async fn init_subsystems(app_state: &Arc, cwd: &Path) { if let Some(ref db_path) = pipeline_db_path { if let Err(e) = db::init(db_path).await { crate::slog!("[db] Failed to initialise pipeline.db: {e}"); + } else { + // One-shot migration: move any existing JSON store files into SQLite. + let huskies_dir = db_path.parent().unwrap_or(db_path); + migrate_json_stores_to_sqlite(huskies_dir).await; } if let Err(e) = crdt_state::init(db_path).await { crate::slog!("[crdt] Failed to initialise CRDT state layer: {e}"); diff --git a/server/src/startup/tick_loop.rs b/server/src/startup/tick_loop.rs index 97248296..a2c2f19c 100644 --- a/server/src/startup/tick_loop.rs +++ b/server/src/startup/tick_loop.rs @@ -330,8 +330,9 @@ pub(crate) fn spawn_event_trigger_subscriber( Ok(f) => f, Err(broadcast::error::RecvError::Lagged(n)) => { crate::slog!( - "[event-triggers] Lagged {n} transition events; some triggers may have been skipped" + "[event-triggers] Lagged {n} transition events; replaying pipeline state to recover" ); + crate::pipeline_state::replay_current_pipeline_state(); continue; } Err(broadcast::error::RecvError::Closed) => {