//! Background shadow-write task — persists pipeline items to SQLite asynchronously. //! //! `init` opens the database, runs migrations, loads existing content into the //! in-memory store, and spawns the write loop. All subsequent writes are sent //! over an unbounded channel so callers never block on I/O. use crate::slog; use sqlx::SqlitePool; use sqlx::sqlite::SqliteConnectOptions; use std::collections::HashMap; use std::path::Path; use std::sync::OnceLock; use tokio::sync::mpsc; /// A pending shadow write for one pipeline item. pub(super) struct PipelineWriteMsg { pub(super) story_id: String, pub(super) stage: String, pub(super) name: Option, pub(super) agent: Option, pub(super) retry_count: Option, pub(super) depends_on: Option, pub(super) content: Option, } /// Handle to the background shadow-write task. pub struct PipelineDb { pub(super) tx: mpsc::UnboundedSender, } /// Process-global handle to the background shadow-write task, set once during `init`. pub(super) static PIPELINE_DB: OnceLock = OnceLock::new(); /// Initialise the pipeline database. /// /// Opens (or creates) the SQLite file at `db_path`, runs embedded migrations, /// loads existing story content into the in-memory store, and spawns the /// background write task. Safe to call only once; subsequent calls are no-ops. pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> { if PIPELINE_DB.get().is_some() { return Ok(()); } let options = SqliteConnectOptions::new() .filename(db_path) .create_if_missing(true); let pool = SqlitePool::connect_with(options).await?; sqlx::migrate!("./migrations").run(&pool).await?; // Load existing content into the in-memory store. let rows: Vec<(String, Option)> = sqlx::query_as("SELECT id, content FROM pipeline_items WHERE content IS NOT NULL") .fetch_all(&pool) .await?; let mut content_map = HashMap::new(); for (id, content) in rows { if let Some(c) = content { content_map.insert(id, c); } } super::content_store::init_content_store(content_map); let (tx, mut rx) = mpsc::unbounded_channel::(); tokio::spawn(async move { while let Some(msg) = rx.recv().await { // The "deleted" sentinel means the caller wants the row gone. // Issue a real DELETE so the shadow table stays clean and // sync_crdt_stages_from_db cannot resurrect a tombstoned item on // the next restart. if msg.stage == "deleted" { let result = sqlx::query("DELETE FROM pipeline_items WHERE id = ?1") .bind(&msg.story_id) .execute(&pool) .await; if let Err(e) = result { slog!("[db] Shadow delete failed for '{}': {e}", msg.story_id); } continue; } let now = chrono::Utc::now().to_rfc3339(); let result = sqlx::query( "INSERT INTO pipeline_items \ (id, name, stage, agent, retry_count, depends_on, content, created_at, updated_at) \ VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?8) \ ON CONFLICT(id) DO UPDATE SET \ name = excluded.name, \ stage = excluded.stage, \ agent = excluded.agent, \ retry_count = excluded.retry_count, \ depends_on = excluded.depends_on, \ content = COALESCE(excluded.content, pipeline_items.content), \ updated_at = excluded.updated_at", ) .bind(&msg.story_id) .bind(&msg.name) .bind(&msg.stage) .bind(&msg.agent) .bind(msg.retry_count) .bind(&msg.depends_on) .bind(&msg.content) .bind(&now) .execute(&pool) .await; if let Err(e) = result { slog!("[db] Shadow write failed for '{}': {e}", msg.story_id); } } }); let _ = PIPELINE_DB.set(PipelineDb { tx }); Ok(()) }