//! Background shadow-write task — persists pipeline items to SQLite asynchronously. //! //! `init` opens the database, runs migrations, loads existing content into the //! in-memory store, and spawns the write loop. All subsequent writes are sent //! over an unbounded channel so callers never block on I/O. //! //! The opened pool is stored in [`SHARED_POOL`] so that other subsystems //! (event-trigger store, timer store, scheduled-timer store) can share the //! same database connection without re-opening the file. use crate::slog; use sqlx::SqlitePool; use sqlx::sqlite::SqliteConnectOptions; use std::collections::HashMap; use std::collections::HashSet; use std::path::Path; use std::sync::OnceLock; use tokio::sync::mpsc; /// One migration row in the live database that is not in the compiled-in set. /// /// Returned by [`check_schema_drift`] for each unknown migration. pub struct UnknownMigration { /// sqlx migration version number (derived from the filename timestamp). pub version: i64, /// Human-readable description from the migration filename. pub description: String, /// When the migration was applied, as stored in `_sqlx_migrations.installed_on`. pub installed_on: String, } /// The process-global SQLite pool, set once by [`init`]. /// /// Other modules call [`get_shared_pool`] to access the pool without needing /// to pass it through every call-site. static SHARED_POOL: OnceLock = OnceLock::new(); /// Return a reference to the shared pipeline database pool, if it has been /// initialised by a prior call to [`init`]. pub fn get_shared_pool() -> Option<&'static SqlitePool> { SHARED_POOL.get() } /// A pending shadow write for one pipeline item. pub(super) struct PipelineWriteMsg { pub(super) story_id: String, pub(super) stage: String, pub(super) name: Option, pub(super) agent: Option, pub(super) retry_count: Option, pub(super) depends_on: Option, pub(super) content: Option, } /// Handle to the background shadow-write task. pub struct PipelineDb { pub(super) tx: mpsc::UnboundedSender, } /// Process-global handle to the background shadow-write task, set once during `init`. pub(super) static PIPELINE_DB: OnceLock = OnceLock::new(); /// Initialise the pipeline database. /// /// Opens (or creates) the SQLite file at `db_path`, runs embedded migrations, /// loads existing story content into the in-memory store, and spawns the /// background write task. Safe to call only once; subsequent calls are no-ops. pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> { if PIPELINE_DB.get().is_some() { return Ok(()); } // Story 1087: before running the migration that splits `stage` into // (`pipeline`, `status`), take a timestamped side-car copy of the live DB // so the pre-split state is recoverable. Skip the copy when the file does // not yet exist (fresh installs) or when the split-stage migration has // already been applied (subsequent restarts). backup_pre_pipeline_status(db_path).await; let options = SqliteConnectOptions::new() .filename(db_path) .create_if_missing(true); let pool = SqlitePool::connect_with(options).await?; sqlx::migrate!("./migrations").run(&pool).await?; // Store pool in global static so other subsystems can reuse it. let _ = SHARED_POOL.set(pool.clone()); // Load existing content into the in-memory store. let rows: Vec<(String, Option)> = sqlx::query_as("SELECT id, content FROM pipeline_items WHERE content IS NOT NULL") .fetch_all(&pool) .await?; let mut content_map = HashMap::new(); for (id, content) in rows { if let Some(c) = content { content_map.insert(id, c); } } super::content_store::init_content_store(content_map); let (tx, mut rx) = mpsc::unbounded_channel::(); tokio::spawn(async move { while let Some(msg) = rx.recv().await { // The "deleted" sentinel means the caller wants the row gone. // Issue a real DELETE so the shadow table stays clean and // sync_crdt_stages_from_db cannot resurrect a tombstoned item on // the next restart. if msg.stage == "deleted" { let result = sqlx::query("DELETE FROM pipeline_items WHERE id = ?1") .bind(&msg.story_id) .execute(&pool) .await; if let Err(e) = result { slog!("[db] Shadow delete failed for '{}': {e}", msg.story_id); } continue; } let now = chrono::Utc::now().to_rfc3339(); let result = sqlx::query( "INSERT INTO pipeline_items \ (id, name, stage, agent, retry_count, depends_on, content, created_at, updated_at) \ VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?8) \ ON CONFLICT(id) DO UPDATE SET \ name = excluded.name, \ stage = excluded.stage, \ agent = excluded.agent, \ retry_count = excluded.retry_count, \ depends_on = excluded.depends_on, \ content = COALESCE(excluded.content, pipeline_items.content), \ updated_at = excluded.updated_at", ) .bind(&msg.story_id) .bind(&msg.name) .bind(&msg.stage) .bind(&msg.agent) .bind(msg.retry_count) .bind(&msg.depends_on) .bind(&msg.content) .bind(&now) .execute(&pool) .await; if let Err(e) = result { slog!("[db] Shadow write failed for '{}': {e}", msg.story_id); } } }); let _ = PIPELINE_DB.set(PipelineDb { tx }); Ok(()) } /// Story 1087: file name of the split-stage migration. The version prefix is /// the same `i64` sqlx assigns to that migration on `installed_on` rows in /// `_sqlx_migrations`. const SPLIT_STAGE_MIGRATION_VERSION: i64 = 20260515000000; /// Story 1087: take a timestamped side-car copy of `pipeline.db` if and only if /// the split-stage migration has not yet been applied. This is the AC1 backup /// — `pipeline.db.pre-pipeline-status..bak` next to the live file. /// /// Failures are logged but never propagated: a missing backup must not block /// the server from starting (a corrupt source file or a read-only directory /// will be surfaced by the migration step itself). pub(crate) async fn backup_pre_pipeline_status(db_path: &Path) { if !db_path.exists() { return; } // Cheap pre-check: open the DB read-only and see whether the split-stage // migration version is recorded in `_sqlx_migrations`. If it is, the // backup has already been taken on a previous start and there is nothing // to do. let options = SqliteConnectOptions::new() .filename(db_path) .read_only(true) .create_if_missing(false); let probe = SqlitePool::connect_with(options).await; if let Ok(pool) = probe { let already_split: Result, _> = sqlx::query_as("SELECT version FROM _sqlx_migrations WHERE version = ?1 LIMIT 1") .bind(SPLIT_STAGE_MIGRATION_VERSION) .fetch_optional(&pool) .await; pool.close().await; if let Ok(Some(_)) = already_split { return; } } let ts = chrono::Utc::now().timestamp(); let mut backup = db_path.as_os_str().to_owned(); backup.push(format!(".pre-pipeline-status.{ts}.bak")); let backup_path = std::path::PathBuf::from(backup); match tokio::fs::copy(db_path, &backup_path).await { Ok(_) => slog!( "[db] Wrote pre-pipeline-status backup of {} to {}", db_path.display(), backup_path.display(), ), Err(e) => slog!( "[db] Failed to write pre-pipeline-status backup of {}: {e}", db_path.display(), ), } } /// Compare the live `_sqlx_migrations` table against the compiled-in migration /// set and return any rows whose version is not known to this binary. /// /// A non-empty result means the database was previously opened by a newer /// binary that applied additional migrations. The server must refuse to start /// in that state because the schema may contain tables or columns that this /// binary does not understand. pub async fn check_schema_drift(pool: &SqlitePool) -> Vec { let migrator = sqlx::migrate!("./migrations"); let known: HashSet = migrator.migrations.iter().map(|m| m.version).collect(); let rows: Vec<(i64, String, String)> = sqlx::query_as( "SELECT version, description, installed_on FROM _sqlx_migrations ORDER BY version", ) .fetch_all(pool) .await .unwrap_or_default(); rows.into_iter() .filter(|(v, _, _)| !known.contains(v)) .map(|(version, description, installed_on)| UnknownMigration { version, description, installed_on, }) .collect() }