huskies: merge 1061

This commit is contained in:
dave
2026-05-14 20:08:09 +00:00
parent 54d9737428
commit 5678f2a556
11 changed files with 752 additions and 82 deletions
@@ -0,0 +1,7 @@
-- Registered event triggers: predicate/action pairs evaluated against
-- pipeline transition events.
-- predicate_json / action_json hold serde-serialized TriggerPredicate /
-- TriggerAction values; mode is the plain text 'once' or 'persistent';
-- created_at is an RFC 3339 timestamp string.
CREATE TABLE IF NOT EXISTS event_triggers (
id TEXT PRIMARY KEY,
predicate_json TEXT NOT NULL,
action_json TEXT NOT NULL,
mode TEXT NOT NULL,
created_at TEXT NOT NULL
);
@@ -0,0 +1,4 @@
-- Rate-limit retry timers: at most one pending timer per story
-- (story_id is the primary key, so upserts replace the old schedule).
-- scheduled_at is an RFC 3339 timestamp string.
CREATE TABLE IF NOT EXISTS timers (
story_id TEXT PRIMARY KEY,
scheduled_at TEXT NOT NULL
);
@@ -0,0 +1,8 @@
-- Generic scheduled timers created via the `schedule_timer` MCP tool.
-- label is optional human-readable text; fire_at / created_at are RFC 3339
-- timestamp strings; action_json / mode_json hold serde-serialized
-- TimerAction / TimerMode values.
CREATE TABLE IF NOT EXISTS scheduled_timers (
id TEXT PRIMARY KEY,
label TEXT,
fire_at TEXT NOT NULL,
action_json TEXT NOT NULL,
mode_json TEXT NOT NULL,
created_at TEXT NOT NULL
);
+1 -1
View File
@@ -29,7 +29,7 @@ pub mod shadow_write;
pub use content_store::{ContentKey, all_content_ids, delete_content, read_content, write_content};
pub use ops::{ItemMeta, delete_item, move_item_stage, next_item_number, write_item_with_content};
pub use shadow_write::init;
pub use shadow_write::{get_shared_pool, init};
#[cfg(test)]
pub use content_store::ensure_content_store;
+19
View File
@@ -3,6 +3,10 @@
//! `init` opens the database, runs migrations, loads existing content into the
//! in-memory store, and spawns the write loop. All subsequent writes are sent
//! over an unbounded channel so callers never block on I/O.
//!
//! The opened pool is stored in [`SHARED_POOL`] so that other subsystems
//! (event-trigger store, timer store, scheduled-timer store) can share the
//! same database connection without re-opening the file.
use crate::slog;
use sqlx::SqlitePool;
use sqlx::sqlite::SqliteConnectOptions;
@@ -11,6 +15,18 @@ use std::path::Path;
use std::sync::OnceLock;
use tokio::sync::mpsc;
/// The process-global SQLite pool, set once by [`init`].
///
/// Other modules call [`get_shared_pool`] to access the pool without needing
/// to pass it through every call-site.
static SHARED_POOL: OnceLock<SqlitePool> = OnceLock::new();
/// Return a reference to the shared pipeline database pool, if it has been
/// initialised by a prior call to [`init`].
///
/// Returns `None` until [`init`] has stored the pool, so callers are
/// responsible for startup ordering (stores must be built after `db::init`).
pub fn get_shared_pool() -> Option<&'static SqlitePool> {
// OnceLock::get never blocks; it simply reports whether `set` has run.
SHARED_POOL.get()
}
/// A pending shadow write for one pipeline item.
pub(super) struct PipelineWriteMsg {
pub(super) story_id: String,
@@ -47,6 +63,9 @@ pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> {
let pool = SqlitePool::connect_with(options).await?;
sqlx::migrate!("./migrations").run(&pool).await?;
// Store pool in global static so other subsystems can reuse it.
let _ = SHARED_POOL.set(pool.clone());
// Load existing content into the in-memory store.
let rows: Vec<(String, Option<String>)> =
sqlx::query_as("SELECT id, content FROM pipeline_items WHERE content IS NOT NULL")
+24 -21
View File
@@ -274,34 +274,37 @@ async fn main() -> Result<(), std::io::Error> {
let matrix_shutdown_tx_for_rebuild = Arc::clone(&bot_ctxs.matrix_shutdown_tx);
// Shared rate-limit retry timer store.
let timer_store = std::sync::Arc::new(crate::service::timer::TimerStore::load(
startup_root
.as_ref()
.map(|r| r.join(".huskies").join("timers.json"))
.unwrap_or_else(|| std::path::PathBuf::from("/tmp/huskies-timers.json")),
));
let timer_store = {
let pool = crate::db::get_shared_pool().expect("db pool must be initialised before stores");
std::sync::Arc::new(
crate::service::timer::TimerStore::from_pool(pool.clone())
.await
.expect("failed to load timer store from db"),
)
};
let timer_store_for_tick = Arc::clone(&timer_store);
let timer_store_for_bot = Arc::clone(&timer_store);
// Event-based pipeline trigger store.
let event_trigger_store = std::sync::Arc::new(
crate::service::event_triggers::store::EventTriggerStore::load(
startup_root
.as_ref()
.map(|r| r.join(".huskies").join("event_triggers.json"))
.unwrap_or_else(|| std::path::PathBuf::from("/tmp/huskies-event-triggers.json")),
),
);
let event_trigger_store = {
let pool = crate::db::get_shared_pool().expect("db pool must be initialised before stores");
std::sync::Arc::new(
crate::service::event_triggers::store::EventTriggerStore::from_pool(pool.clone())
.await
.expect("failed to load event trigger store from db"),
)
};
let event_trigger_store_for_subscriber = Arc::clone(&event_trigger_store);
// Generic scheduled-timer store for the `schedule_timer` MCP tool.
let scheduled_timer_store =
std::sync::Arc::new(crate::service::timer::ScheduledTimerStore::load(
startup_root
.as_ref()
.map(|r| r.join(".huskies").join("scheduled_timers.json"))
.unwrap_or_else(|| std::path::PathBuf::from("/tmp/huskies-scheduled-timers.json")),
));
let scheduled_timer_store = {
let pool = crate::db::get_shared_pool().expect("db pool must be initialised before stores");
std::sync::Arc::new(
crate::service::timer::ScheduledTimerStore::from_pool(pool.clone())
.await
.expect("failed to load scheduled timer store from db"),
)
};
let scheduled_timer_store_for_tick = Arc::clone(&scheduled_timer_store);
let ctx = AppContext {
+229 -25
View File
@@ -1,25 +1,164 @@
//! Persistent store for registered event triggers, backed by a JSON file.
//! Persistent store for registered event triggers, backed by SQLite (`pipeline.db`).
//!
//! Loaded at server startup and kept in sync on every mutation. Thread-safe
//! via an internal `Mutex`.
//! Production code uses [`EventTriggerStore::from_pool`] which loads from the
//! `event_triggers` table and persists every mutation via a background writer
//! task. Unit tests use [`EventTriggerStore::load`] which keeps state in
//! memory without SQLite (no file I/O in tests).
use std::path::{Path, PathBuf};
use std::path::PathBuf;
use std::sync::Mutex;
use chrono::Utc;
use serde_json::Value;
use sqlx::SqlitePool;
use tokio::sync::mpsc;
use super::{EventTrigger, FireMode, TriggerAction, TriggerPredicate};
// ── Background writer ────────────────────────────────────────────────────────
/// Commands sent to the background SQLite writer.
enum WriteCmd {
/// Insert the trigger, or overwrite an existing row with the same id.
/// Boxed to keep the enum small on the channel.
Upsert(Box<EventTrigger>),
/// Delete the row whose id equals the payload.
Delete(String),
}
/// Spawn the detached background task that applies [`WriteCmd`]s to SQLite.
///
/// The task runs until the sending half of `rx` is dropped. Serialization
/// and DB errors are logged and the offending command is skipped — the loop
/// itself never aborts, so one bad row cannot stall later writes.
fn spawn_writer(pool: SqlitePool, mut rx: mpsc::UnboundedReceiver<WriteCmd>) {
tokio::spawn(async move {
while let Some(cmd) = rx.recv().await {
match cmd {
WriteCmd::Upsert(t) => {
let predicate_json = match serde_json::to_string(&t.predicate) {
Ok(j) => j,
Err(e) => {
crate::slog!("[event-triggers] Failed to serialize predicate: {e}");
continue;
}
};
let action_json = match serde_json::to_string(&t.action) {
Ok(j) => j,
Err(e) => {
crate::slog!("[event-triggers] Failed to serialize action: {e}");
continue;
}
};
// Stored as plain text; `from_pool` maps anything other than
// "persistent" back to FireMode::Once.
let mode_str = match t.mode {
FireMode::Once => "once",
FireMode::Persistent => "persistent",
};
// Upsert: created_at is intentionally NOT updated on conflict,
// so the original registration time is preserved.
let result = sqlx::query(
"INSERT INTO event_triggers \
(id, predicate_json, action_json, mode, created_at) \
VALUES (?1, ?2, ?3, ?4, ?5) \
ON CONFLICT(id) DO UPDATE SET \
predicate_json = excluded.predicate_json, \
action_json = excluded.action_json, \
mode = excluded.mode",
)
.bind(&t.id)
.bind(&predicate_json)
.bind(&action_json)
.bind(mode_str)
.bind(t.created_at.to_rfc3339())
.execute(&pool)
.await;
if let Err(e) = result {
crate::slog!("[event-triggers] DB upsert failed for '{}': {e}", t.id);
}
}
WriteCmd::Delete(id) => {
// Deleting a non-existent id is a silent no-op at the SQL level.
let result = sqlx::query("DELETE FROM event_triggers WHERE id = ?1")
.bind(&id)
.execute(&pool)
.await;
if let Err(e) = result {
crate::slog!("[event-triggers] DB delete failed for '{id}': {e}");
}
}
}
}
});
}
// ── Store ────────────────────────────────────────────────────────────────────
/// Internal persistence backend.
enum Persistence {
/// SQLite-backed (production): mutations are sent via channel.
Sqlite(mpsc::UnboundedSender<WriteCmd>),
/// JSON-file-backed (tests): mutations write to disk synchronously.
Json(PathBuf),
}
/// Persistent store for [`EventTrigger`] entries.
pub struct EventTriggerStore {
path: PathBuf,
triggers: Mutex<Vec<EventTrigger>>,
persistence: Persistence,
}
impl EventTriggerStore {
/// Load the store from `path`. Returns an empty store if the file does
/// not exist or cannot be parsed.
/// Load from SQLite using the shared pipeline database pool.
///
/// Reads all existing rows from the `event_triggers` table, then spawns a
/// background task that handles subsequent writes. This is the production
/// constructor; tests use [`EventTriggerStore::load`].
///
/// Rows with malformed predicate/action JSON are skipped (logged), so a
/// single corrupt row cannot prevent startup.
pub async fn from_pool(pool: SqlitePool) -> Result<Self, sqlx::Error> {
let rows: Vec<(String, String, String, String, String)> = sqlx::query_as(
"SELECT id, predicate_json, action_json, mode, created_at \
FROM event_triggers",
)
.fetch_all(&pool)
.await?;
let mut triggers = Vec::with_capacity(rows.len());
for (id, pred_json, action_json, mode_str, created_at_str) in rows {
let predicate: TriggerPredicate = match serde_json::from_str(&pred_json) {
Ok(p) => p,
Err(e) => {
crate::slog!(
"[event-triggers] Skipping malformed predicate for trigger {id}: {e}"
);
continue;
}
};
let action: TriggerAction = match serde_json::from_str(&action_json) {
Ok(a) => a,
Err(e) => {
crate::slog!(
"[event-triggers] Skipping malformed action for trigger {id}: {e}"
);
continue;
}
};
// Anything other than the exact string "persistent" (including an
// unexpected value) is treated as a one-shot trigger.
let mode = if mode_str == "persistent" {
FireMode::Persistent
} else {
FireMode::Once
};
// Unparsable timestamps fall back to "now" rather than dropping the row.
let created_at = created_at_str
.parse::<chrono::DateTime<Utc>>()
.unwrap_or_else(|_| Utc::now());
triggers.push(EventTrigger {
id,
predicate,
action,
mode,
created_at,
});
}
// Channel + detached writer task: all subsequent mutations are applied
// asynchronously so callers never block on SQLite I/O.
let (tx, rx) = mpsc::unbounded_channel();
spawn_writer(pool, rx);
Ok(Self {
triggers: Mutex::new(triggers),
persistence: Persistence::Sqlite(tx),
})
}
/// Load from a JSON file path. Used by unit tests; returns an in-memory
/// store backed by the file for persistence.
pub fn load(path: PathBuf) -> Self {
let triggers = if path.exists() {
std::fs::read_to_string(&path)
@@ -30,22 +169,21 @@ impl EventTriggerStore {
Vec::new()
};
Self {
path,
triggers: Mutex::new(triggers),
persistence: Persistence::Json(path),
}
}
fn persist(path: &Path, triggers: &[EventTrigger]) -> Result<(), String> {
fn persist_json(path: &std::path::Path, triggers: &[EventTrigger]) {
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent)
.map_err(|e| format!("Failed to create directory: {e}"))?;
let _ = std::fs::create_dir_all(parent);
}
if let Ok(json) = serde_json::to_string_pretty(triggers) {
let _ = std::fs::write(path, json);
}
let json = serde_json::to_string_pretty(triggers)
.map_err(|e| format!("Serialization error: {e}"))?;
std::fs::write(path, json).map_err(|e| format!("Failed to write triggers: {e}"))
}
/// Register a new trigger and persist to disk.
/// Register a new trigger and persist it.
pub fn add(
&self,
predicate: TriggerPredicate,
@@ -61,7 +199,30 @@ impl EventTriggerStore {
};
let mut triggers = self.triggers.lock().unwrap();
triggers.push(trigger.clone());
Self::persist(&self.path, &triggers)?;
match &self.persistence {
Persistence::Sqlite(tx) => {
let _ = tx.send(WriteCmd::Upsert(Box::new(trigger.clone())));
}
Persistence::Json(path) => {
Self::persist_json(path, &triggers);
}
}
let mode_label = match trigger.mode {
FireMode::Once => "once",
FireMode::Persistent => "persistent",
};
crate::slog!(
"[event-triggers] Registered trigger {} | predicate={:?} | mode={mode_label}",
trigger.id,
trigger
.predicate
.to_stage
.as_deref()
.or(trigger.predicate.event_kind.as_deref())
.unwrap_or("*"),
);
Ok(trigger)
}
@@ -77,14 +238,22 @@ impl EventTriggerStore {
triggers.retain(|t| t.id != id);
let removed = triggers.len() < before;
if removed {
let _ = Self::persist(&self.path, &triggers);
match &self.persistence {
Persistence::Sqlite(tx) => {
let _ = tx.send(WriteCmd::Delete(id.to_string()));
}
Persistence::Json(path) => {
Self::persist_json(path, &triggers);
}
}
crate::slog!("[event-triggers] Cancelled trigger {id} | reason=manual");
}
removed
}
/// Remove all triggers whose ids are in `ids` and return how many were removed.
///
/// Used by the subscriber to delete `Once` triggers after they fire.
/// Remove all triggers whose ids are in `ids` and return how many were
/// removed. Used by the subscriber to delete `Once` triggers after they
/// fire.
pub fn cancel_batch(&self, ids: &[String]) -> usize {
if ids.is_empty() {
return 0;
@@ -94,16 +263,25 @@ impl EventTriggerStore {
triggers.retain(|t| !ids.contains(&t.id));
let removed = before - triggers.len();
if removed > 0 {
let _ = Self::persist(&self.path, &triggers);
match &self.persistence {
Persistence::Sqlite(tx) => {
for id in ids {
let _ = tx.send(WriteCmd::Delete(id.clone()));
}
}
Persistence::Json(path) => {
Self::persist_json(path, &triggers);
}
}
for id in ids {
crate::slog!("[event-triggers] Cancelled trigger {id} | reason=once");
}
}
removed
}
}
/// Generate a random UUIDv4-style identifier without pulling in the full uuid crate.
///
/// Uses [`std::time`] entropy mixed with a thread-local counter. Not cryptographically
/// strong, but unique enough for trigger IDs.
/// Generate a random UUIDv4-style identifier.
fn uuid_v4() -> String {
use std::time::{SystemTime, UNIX_EPOCH};
let nanos = SystemTime::now()
@@ -263,6 +441,32 @@ mod tests {
assert_eq!(list[0].id, id);
}
#[tokio::test]
// Round-trip: a trigger added through the store (and flushed by the
// background writer) must be visible to a fresh `from_pool` load of the
// same database.
async fn from_pool_persists_and_reloads() {
let dir = TempDir::new().unwrap();
let db_path = dir.path().join("test.db");
let opts = sqlx::sqlite::SqliteConnectOptions::new()
.filename(&db_path)
.create_if_missing(true);
let pool = sqlx::SqlitePool::connect_with(opts).await.unwrap();
sqlx::migrate!("./migrations").run(&pool).await.unwrap();
let id = {
let store = EventTriggerStore::from_pool(pool.clone()).await.unwrap();
let t = store
.add(basic_pred(), basic_action(), FireMode::Persistent)
.unwrap();
// Give background writer time to flush.
// NOTE(review): fixed-delay sleep races the writer task; could be
// flaky on a loaded CI machine — consider a flush/ack mechanism.
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
t.id.clone()
};
let store2 = EventTriggerStore::from_pool(pool.clone()).await.unwrap();
let list = store2.list();
assert_eq!(list.len(), 1);
assert_eq!(list[0].id, id);
}
#[test]
fn store_cancel_batch() {
let (_dir, store) = tmp_store();
+159 -25
View File
@@ -1,27 +1,99 @@
//! I/O side of the timer service: filesystem persistence, clock reads,
//! I/O side of the timer service: SQLite persistence, clock reads,
//! background task spawning, and story-ID resolution.
//!
//! This is the **only** place inside `service/timer/` that may perform side
//! effects (filesystem reads/writes, clock reads, `tokio::spawn`).
use chrono::{DateTime, Utc};
use sqlx::SqlitePool;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc;
use super::persist::{TimerEntry, deserialize_timers, serialize_timers};
use super::schedule::next_occurrence_at;
// ── TimerStore background writer ─────────────────────────────────────────────
enum TimerWriteCmd {
Upsert(TimerEntry),
Delete(String),
}
/// Spawn the detached background task that applies [`TimerWriteCmd`]s to the
/// `timers` table.
///
/// Runs until the sending half of `rx` is dropped. DB errors are logged and
/// the command skipped; the loop never aborts.
fn spawn_timer_writer(pool: SqlitePool, mut rx: mpsc::UnboundedReceiver<TimerWriteCmd>) {
tokio::spawn(async move {
while let Some(cmd) = rx.recv().await {
match cmd {
TimerWriteCmd::Upsert(e) => {
// story_id is the primary key, so a conflict simply
// replaces the previously scheduled time.
let result = sqlx::query(
"INSERT INTO timers (story_id, scheduled_at) VALUES (?1, ?2) \
ON CONFLICT(story_id) DO UPDATE SET scheduled_at = excluded.scheduled_at",
)
.bind(&e.story_id)
.bind(e.scheduled_at.to_rfc3339())
.execute(&pool)
.await;
if let Err(e) = result {
crate::slog!("[timer] DB upsert failed: {e}");
}
}
TimerWriteCmd::Delete(story_id) => {
let result = sqlx::query("DELETE FROM timers WHERE story_id = ?1")
.bind(&story_id)
.execute(&pool)
.await;
if let Err(e) = result {
crate::slog!("[timer] DB delete failed for '{story_id}': {e}");
}
}
}
}
});
}
// ── TimerStore ─────────────────────────────────────────────────────────────
/// Persistent store for pending timers, backed by a JSON file.
enum TimerPersistence {
Sqlite(mpsc::UnboundedSender<TimerWriteCmd>),
Json(PathBuf),
}
/// Persistent store for pending timers, backed by SQLite in production and
/// a JSON file in tests.
pub struct TimerStore {
path: PathBuf,
timers: Mutex<Vec<TimerEntry>>,
persistence: TimerPersistence,
}
impl TimerStore {
/// Load the timer store from `path`. Returns an empty store if the file
/// does not exist or cannot be parsed.
/// Load from the shared SQLite pool. This is the production constructor.
///
/// Reads all rows from the `timers` table, then spawns the background
/// writer that applies subsequent mutations asynchronously.
pub async fn from_pool(pool: SqlitePool) -> Result<Self, sqlx::Error> {
let rows: Vec<(String, String)> =
sqlx::query_as("SELECT story_id, scheduled_at FROM timers")
.fetch_all(&pool)
.await?;
// Rows whose scheduled_at fails to parse as RFC 3339 are silently
// dropped here (filter_map + ok()?), with no log line — unlike the
// event-trigger loader, which logs each skipped row.
let timers = rows
.into_iter()
.filter_map(|(story_id, scheduled_at_str)| {
let scheduled_at = scheduled_at_str.parse::<DateTime<Utc>>().ok()?;
Some(TimerEntry {
story_id,
scheduled_at,
})
})
.collect();
let (tx, rx) = mpsc::unbounded_channel();
spawn_timer_writer(pool, rx);
Ok(Self {
timers: Mutex::new(timers),
persistence: TimerPersistence::Sqlite(tx),
})
}
/// Load the timer store from a JSON file path. Used by unit tests.
pub fn load(path: PathBuf) -> Self {
let timers = if path.exists() {
std::fs::read_to_string(&path)
@@ -32,12 +104,12 @@ impl TimerStore {
Vec::new()
};
Self {
path,
timers: Mutex::new(timers),
persistence: TimerPersistence::Json(path),
}
}
fn save_locked(path: &Path, timers: &[TimerEntry]) -> Result<(), String> {
fn save_json(path: &Path, timers: &[TimerEntry]) -> Result<(), String> {
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent)
.map_err(|e| format!("Failed to create directory: {e}"))?;
@@ -46,14 +118,23 @@ impl TimerStore {
std::fs::write(path, content).map_err(|e| format!("Failed to write timers: {e}"))
}
/// Add a timer and persist to disk.
/// Add a timer and persist.
pub fn add(&self, story_id: String, scheduled_at: DateTime<Utc>) -> Result<(), String> {
let mut timers = self.timers.lock().unwrap();
timers.push(TimerEntry {
story_id,
story_id: story_id.clone(),
scheduled_at,
});
Self::save_locked(&self.path, &timers)
match &self.persistence {
TimerPersistence::Sqlite(tx) => {
let _ = tx.send(TimerWriteCmd::Upsert(TimerEntry {
story_id,
scheduled_at,
}));
Ok(())
}
TimerPersistence::Json(path) => Self::save_json(path, &timers),
}
}
/// Remove the timer for `story_id`. Returns `true` if one was removed.
@@ -63,7 +144,14 @@ impl TimerStore {
timers.retain(|t| t.story_id != story_id);
let removed = timers.len() < before;
if removed {
let _ = Self::save_locked(&self.path, &timers);
match &self.persistence {
TimerPersistence::Sqlite(tx) => {
let _ = tx.send(TimerWriteCmd::Delete(story_id.to_string()));
}
TimerPersistence::Json(path) => {
let _ = Self::save_json(path, &timers);
}
}
}
removed
}
@@ -73,33 +161,47 @@ impl TimerStore {
self.timers.lock().unwrap().clone()
}
/// Add or update a timer for `story_id`.
///
/// - If no timer exists for `story_id`, adds it.
/// - If a timer already exists and `scheduled_at` is **later**, updates it.
/// - If a timer already exists and `scheduled_at` is earlier or equal, no-op.
///
/// Use this instead of [`add`] when auto-scheduling from rate-limit events to
/// avoid creating duplicates and to always keep the latest reset time.
/// Add or update a timer for `story_id`. When an existing timer has an
/// earlier `scheduled_at`, it is updated to the later value. When it is
/// already later, this is a no-op.
pub fn upsert(&self, story_id: String, scheduled_at: DateTime<Utc>) -> Result<(), String> {
let mut timers = self.timers.lock().unwrap();
if let Some(existing) = timers.iter_mut().find(|t| t.story_id == story_id) {
if scheduled_at > existing.scheduled_at {
existing.scheduled_at = scheduled_at;
Self::save_locked(&self.path, &timers)?;
match &self.persistence {
TimerPersistence::Sqlite(tx) => {
let _ = tx.send(TimerWriteCmd::Upsert(TimerEntry {
story_id,
scheduled_at,
}));
}
TimerPersistence::Json(path) => {
Self::save_json(path, &timers)?;
}
}
}
} else {
timers.push(TimerEntry {
story_id,
story_id: story_id.clone(),
scheduled_at,
});
Self::save_locked(&self.path, &timers)?;
match &self.persistence {
TimerPersistence::Sqlite(tx) => {
let _ = tx.send(TimerWriteCmd::Upsert(TimerEntry {
story_id,
scheduled_at,
}));
}
TimerPersistence::Json(path) => {
Self::save_json(path, &timers)?;
}
}
}
Ok(())
}
/// Remove and return all timers whose `scheduled_at` is ≤ `now`.
/// Persists the updated list to disk if any timers were removed.
/// Remove and return all timers whose `scheduled_at` ≤ `now`.
pub fn take_due(&self, now: DateTime<Utc>) -> Vec<TimerEntry> {
let mut timers = self.timers.lock().unwrap();
let mut due = Vec::new();
@@ -113,7 +215,16 @@ impl TimerStore {
}
*timers = remaining;
if !due.is_empty() {
let _ = Self::save_locked(&self.path, &timers);
match &self.persistence {
TimerPersistence::Sqlite(tx) => {
for entry in &due {
let _ = tx.send(TimerWriteCmd::Delete(entry.story_id.clone()));
}
}
TimerPersistence::Json(path) => {
let _ = Self::save_json(path, &timers);
}
}
}
due
}
@@ -512,6 +623,29 @@ mod tests {
assert_eq!(list[0].story_id, "421_story_foo");
}
#[tokio::test]
// Round-trip: a timer added through the store must survive a fresh
// `from_pool` load of the same database file.
async fn from_pool_persists_and_reloads() {
let dir = TempDir::new().unwrap();
let db_path = dir.path().join("test.db");
let opts = sqlx::sqlite::SqliteConnectOptions::new()
.filename(&db_path)
.create_if_missing(true);
let pool = sqlx::SqlitePool::connect_with(opts).await.unwrap();
sqlx::migrate!("./migrations").run(&pool).await.unwrap();
let t = Utc::now() + Duration::hours(2);
{
let store = TimerStore::from_pool(pool.clone()).await.unwrap();
store.add("421_story_foo".to_string(), t).unwrap();
// NOTE(review): fixed-delay flush wait; racy under heavy load.
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}
let store2 = TimerStore::from_pool(pool.clone()).await.unwrap();
let list = store2.list();
assert_eq!(list.len(), 1);
assert_eq!(list[0].story_id, "421_story_foo");
}
#[test]
fn take_due_returns_only_past_entries() {
let dir = TempDir::new().unwrap();
+186 -9
View File
@@ -7,8 +7,10 @@
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sqlx::SqlitePool;
use std::path::{Path, PathBuf};
use std::sync::Mutex;
use tokio::sync::mpsc;
use uuid::Uuid;
// ── Action ────────────────────────────────────────────────────────────────────
@@ -76,16 +78,138 @@ impl ScheduledTimer {
}
}
// ── Background writer ─────────────────────────────────────────────────────────
enum SchedWriteCmd {
Upsert(ScheduledTimer),
Delete(String),
}
/// Spawn the detached background task that applies [`SchedWriteCmd`]s to the
/// `scheduled_timers` table.
///
/// Runs until the sending half of `rx` is dropped. Serialization and DB
/// errors are logged and the command skipped; the loop never aborts.
fn spawn_sched_writer(pool: SqlitePool, mut rx: mpsc::UnboundedReceiver<SchedWriteCmd>) {
tokio::spawn(async move {
while let Some(cmd) = rx.recv().await {
match cmd {
SchedWriteCmd::Upsert(t) => {
let action_json = match serde_json::to_string(&t.action) {
Ok(j) => j,
Err(e) => {
crate::slog!("[scheduled-timer] Serialize action failed: {e}");
continue;
}
};
let mode_json = match serde_json::to_string(&t.mode) {
Ok(j) => j,
Err(e) => {
crate::slog!("[scheduled-timer] Serialize mode failed: {e}");
continue;
}
};
// Upsert: created_at is intentionally NOT updated on conflict.
let result = sqlx::query(
"INSERT INTO scheduled_timers \
(id, label, fire_at, action_json, mode_json, created_at) \
VALUES (?1, ?2, ?3, ?4, ?5, ?6) \
ON CONFLICT(id) DO UPDATE SET \
label = excluded.label, \
fire_at = excluded.fire_at, \
action_json = excluded.action_json, \
mode_json = excluded.mode_json",
)
.bind(&t.id)
.bind(&t.label)
.bind(t.fire_at.to_rfc3339())
.bind(&action_json)
.bind(&mode_json)
.bind(t.created_at.to_rfc3339())
.execute(&pool)
.await;
if let Err(e) = result {
crate::slog!("[scheduled-timer] DB upsert failed for '{}': {e}", t.id);
}
}
SchedWriteCmd::Delete(id) => {
let result = sqlx::query("DELETE FROM scheduled_timers WHERE id = ?1")
.bind(&id)
.execute(&pool)
.await;
if let Err(e) = result {
crate::slog!("[scheduled-timer] DB delete failed for '{id}': {e}");
}
}
}
}
});
}
// ── Store ─────────────────────────────────────────────────────────────────────
/// Persistent store for generic scheduled timers, backed by a JSON file.
enum SchedPersistence {
Sqlite(mpsc::UnboundedSender<SchedWriteCmd>),
Json(PathBuf),
}
/// Persistent store for generic scheduled timers, backed by SQLite in
/// production and a JSON file in tests.
pub struct ScheduledTimerStore {
path: PathBuf,
timers: Mutex<Vec<ScheduledTimer>>,
persistence: SchedPersistence,
}
impl ScheduledTimerStore {
/// Load (or create empty) store from `path`.
/// Load from the shared SQLite pool. This is the production constructor.
///
/// Reads all rows from the `scheduled_timers` table, skipping (and logging)
/// any row whose timestamp or JSON payload fails to parse, then spawns the
/// background writer for subsequent mutations.
pub async fn from_pool(pool: SqlitePool) -> Result<Self, sqlx::Error> {
let rows: Vec<(String, Option<String>, String, String, String, String)> = sqlx::query_as(
"SELECT id, label, fire_at, action_json, mode_json, created_at \
FROM scheduled_timers",
)
.fetch_all(&pool)
.await?;
let mut timers = Vec::with_capacity(rows.len());
for (id, label, fire_at_str, action_json, mode_json, created_at_str) in rows {
// A bad fire_at drops the row: firing at a guessed time would be
// worse than not firing at all.
let fire_at = match fire_at_str.parse::<DateTime<Utc>>() {
Ok(t) => t,
Err(e) => {
crate::slog!("[scheduled-timer] Bad fire_at for {id}: {e}");
continue;
}
};
let action: TimerAction = match serde_json::from_str(&action_json) {
Ok(a) => a,
Err(e) => {
crate::slog!("[scheduled-timer] Bad action for {id}: {e}");
continue;
}
};
let mode: TimerMode = match serde_json::from_str(&mode_json) {
Ok(m) => m,
Err(e) => {
crate::slog!("[scheduled-timer] Bad mode for {id}: {e}");
continue;
}
};
// created_at is informational only, so a bad value falls back to
// "now" instead of dropping the row.
let created_at = created_at_str
.parse::<DateTime<Utc>>()
.unwrap_or_else(|_| Utc::now());
timers.push(ScheduledTimer {
id,
label,
fire_at,
action,
mode,
created_at,
});
}
let (tx, rx) = mpsc::unbounded_channel();
spawn_sched_writer(pool, rx);
Ok(Self {
timers: Mutex::new(timers),
persistence: SchedPersistence::Sqlite(tx),
})
}
/// Load (or create empty) store from a JSON file path. Used by unit tests.
pub fn load(path: PathBuf) -> Self {
let timers = if path.exists() {
std::fs::read_to_string(&path)
@@ -96,12 +220,12 @@ impl ScheduledTimerStore {
Vec::new()
};
Self {
path,
timers: Mutex::new(timers),
persistence: SchedPersistence::Json(path),
}
}
fn save(path: &Path, timers: &[ScheduledTimer]) -> Result<(), String> {
fn save_json(path: &Path, timers: &[ScheduledTimer]) -> Result<(), String> {
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent).map_err(|e| format!("mkdir failed: {e}"))?;
}
@@ -116,8 +240,14 @@ impl ScheduledTimerStore {
if timers.iter().any(|t| t.id == timer.id) {
return Err(format!("Timer with id '{}' already exists", timer.id));
}
timers.push(timer);
Self::save(&self.path, &timers)
timers.push(timer.clone());
match &self.persistence {
SchedPersistence::Sqlite(tx) => {
let _ = tx.send(SchedWriteCmd::Upsert(timer));
Ok(())
}
SchedPersistence::Json(path) => Self::save_json(path, &timers),
}
}
/// Remove a timer by ID. Returns `true` if one was removed.
@@ -127,7 +257,14 @@ impl ScheduledTimerStore {
timers.retain(|t| t.id != id);
let removed = timers.len() < before;
if removed {
let _ = Self::save(&self.path, &timers);
match &self.persistence {
SchedPersistence::Sqlite(tx) => {
let _ = tx.send(SchedWriteCmd::Delete(id.to_string()));
}
SchedPersistence::Json(path) => {
let _ = Self::save_json(path, &timers);
}
}
}
removed
}
@@ -151,7 +288,16 @@ impl ScheduledTimerStore {
}
*timers = remaining;
if !due.is_empty() {
let _ = Self::save(&self.path, &timers);
match &self.persistence {
SchedPersistence::Sqlite(tx) => {
for entry in &due {
let _ = tx.send(SchedWriteCmd::Delete(entry.id.clone()));
}
}
SchedPersistence::Json(path) => {
let _ = Self::save_json(path, &timers);
}
}
}
due
}
@@ -412,6 +558,37 @@ mod tests {
assert_eq!(due.len(), 1, "past timer must fire on next tick");
}
#[tokio::test]
// Round-trip: a scheduled timer added through the store must survive a
// fresh `from_pool` load of the same database.
async fn from_pool_persists_and_reloads() {
let dir = TempDir::new().unwrap();
let db_path = dir.path().join("test.db");
let opts = sqlx::sqlite::SqliteConnectOptions::new()
.filename(&db_path)
.create_if_missing(true);
let pool = sqlx::SqlitePool::connect_with(opts).await.unwrap();
// NOTE(review): this test creates the table by hand rather than running
// `sqlx::migrate!` like the sibling tests — it can drift from the real
// migration script; consider using the migrator here too.
sqlx::query(
"CREATE TABLE IF NOT EXISTS scheduled_timers \
(id TEXT PRIMARY KEY, label TEXT, fire_at TEXT NOT NULL, \
action_json TEXT NOT NULL, mode_json TEXT NOT NULL, created_at TEXT NOT NULL)",
)
.execute(&pool)
.await
.unwrap();
let t = Utc::now() + chrono::Duration::hours(1);
{
let store = ScheduledTimerStore::from_pool(pool.clone()).await.unwrap();
store.add(make_timer("tm-aabbccdd", t)).unwrap();
// Give the background writer a moment to flush.
// NOTE(review): fixed-delay flush wait; racy under heavy load.
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}
let store2 = ScheduledTimerStore::from_pool(pool).await.unwrap();
let list = store2.list();
assert_eq!(list.len(), 1);
assert_eq!(list[0].id, "tm-aabbccdd");
}
#[test]
fn new_id_has_tm_prefix() {
let id = ScheduledTimer::new_id();
+113
View File
@@ -107,6 +107,115 @@ pub(crate) async fn open_project_root(
}
}
/// One-shot migration: read existing JSON store files and insert their rows
/// into the SQLite tables created by the migration scripts, then rename the
/// originals to `*.migrated` so this function is a no-op on the next startup.
///
/// Runs after `db::init()` so the tables already exist and the shared pool is
/// available. Errors are logged but never fatal.
///
/// Uses `INSERT OR IGNORE` throughout so re-running against a partially
/// migrated database never duplicates or overwrites rows.
async fn migrate_json_stores_to_sqlite(huskies_dir: &Path) {
let pool = match crate::db::get_shared_pool() {
Some(p) => p,
None => {
crate::slog!("[db-migrate] Shared pool not available; skipping JSON migration");
return;
}
};
// ── event_triggers.json ───────────────────────────────────────────────────
let et_path = huskies_dir.join("event_triggers.json");
if et_path.exists() {
match std::fs::read_to_string(&et_path) {
Ok(s) => {
// NOTE(review): a JSON parse failure yields an empty Vec here,
// and the file is still renamed to *.migrated below — malformed
// files are silently discarded. Confirm this is intended.
let triggers: Vec<crate::service::event_triggers::EventTrigger> =
serde_json::from_str(&s).unwrap_or_default();
for t in triggers {
// NOTE(review): unwrap_or_default on a serialize failure
// inserts an empty string into a NOT NULL json column;
// from_pool would later skip such a row as malformed.
let predicate_json = serde_json::to_string(&t.predicate).unwrap_or_default();
let action_json = serde_json::to_string(&t.action).unwrap_or_default();
let mode = match t.mode {
crate::service::event_triggers::FireMode::Once => "once",
crate::service::event_triggers::FireMode::Persistent => "persistent",
};
let created_at = t.created_at.to_rfc3339();
let _ = sqlx::query(
"INSERT OR IGNORE INTO event_triggers \
(id, predicate_json, action_json, mode, created_at) \
VALUES (?1, ?2, ?3, ?4, ?5)",
)
.bind(&t.id)
.bind(&predicate_json)
.bind(&action_json)
.bind(mode)
.bind(&created_at)
.execute(pool)
.await;
}
// Rename (not delete) so the original data remains recoverable.
let migrated = huskies_dir.join("event_triggers.json.migrated");
let _ = std::fs::rename(&et_path, &migrated);
crate::slog!("[db-migrate] Migrated event_triggers.json → SQLite");
}
Err(e) => crate::slog!("[db-migrate] Could not read event_triggers.json: {e}"),
}
}
// ── timers.json ───────────────────────────────────────────────────────────
let timers_path = huskies_dir.join("timers.json");
if timers_path.exists() {
match std::fs::read_to_string(&timers_path) {
Ok(s) => {
// Same silent-discard behavior on parse failure as above.
let entries: Vec<crate::service::timer::TimerEntry> =
serde_json::from_str(&s).unwrap_or_default();
for e in entries {
let _ = sqlx::query(
"INSERT OR IGNORE INTO timers (story_id, scheduled_at) \
VALUES (?1, ?2)",
)
.bind(&e.story_id)
.bind(e.scheduled_at.to_rfc3339())
.execute(pool)
.await;
}
let migrated = huskies_dir.join("timers.json.migrated");
let _ = std::fs::rename(&timers_path, &migrated);
crate::slog!("[db-migrate] Migrated timers.json → SQLite");
}
Err(e) => crate::slog!("[db-migrate] Could not read timers.json: {e}"),
}
}
// ── scheduled_timers.json ─────────────────────────────────────────────────
let st_path = huskies_dir.join("scheduled_timers.json");
if st_path.exists() {
match std::fs::read_to_string(&st_path) {
Ok(s) => {
use crate::service::timer::scheduled::ScheduledTimer;
// Same silent-discard behavior on parse failure as above.
let entries: Vec<ScheduledTimer> = serde_json::from_str(&s).unwrap_or_default();
for t in entries {
let action_json = serde_json::to_string(&t.action).unwrap_or_default();
let mode_json = serde_json::to_string(&t.mode).unwrap_or_default();
let _ = sqlx::query(
"INSERT OR IGNORE INTO scheduled_timers \
(id, label, fire_at, action_json, mode_json, created_at) \
VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
)
.bind(&t.id)
.bind(&t.label)
.bind(t.fire_at.to_rfc3339())
.bind(&action_json)
.bind(&mode_json)
.bind(t.created_at.to_rfc3339())
.execute(pool)
.await;
}
let migrated = huskies_dir.join("scheduled_timers.json.migrated");
let _ = std::fs::rename(&st_path, &migrated);
crate::slog!("[db-migrate] Migrated scheduled_timers.json → SQLite");
}
Err(e) => crate::slog!("[db-migrate] Could not read scheduled_timers.json: {e}"),
}
}
}
/// Set up the server log file, node identity keypair, pipeline DB, and CRDT state.
pub(crate) async fn init_subsystems(app_state: &Arc<SessionState>, cwd: &Path) {
// Enable persistent server log file now that the project root is known.
@@ -146,6 +255,10 @@ pub(crate) async fn init_subsystems(app_state: &Arc<SessionState>, cwd: &Path) {
if let Some(ref db_path) = pipeline_db_path {
if let Err(e) = db::init(db_path).await {
crate::slog!("[db] Failed to initialise pipeline.db: {e}");
} else {
// One-shot migration: move any existing JSON store files into SQLite.
let huskies_dir = db_path.parent().unwrap_or(db_path);
migrate_json_stores_to_sqlite(huskies_dir).await;
}
if let Err(e) = crdt_state::init(db_path).await {
crate::slog!("[crdt] Failed to initialise CRDT state layer: {e}");
+2 -1
View File
@@ -330,8 +330,9 @@ pub(crate) fn spawn_event_trigger_subscriber(
Ok(f) => f,
Err(broadcast::error::RecvError::Lagged(n)) => {
crate::slog!(
"[event-triggers] Lagged {n} transition events; some triggers may have been skipped"
"[event-triggers] Lagged {n} transition events; replaying pipeline state to recover"
);
crate::pipeline_state::replay_current_pipeline_state();
continue;
}
Err(broadcast::error::RecvError::Closed) => {