//! Write operations for the pipeline — content, stage transitions, and deletions.
//!
//! The write paths update up to three layers, in this order: the in-memory
//! content store, the CRDT (source of truth for metadata), and the SQLite
//! shadow table. The layers are updated one after another, not atomically.
//! Shadow-table writes go through the background channel and are
//! fire-and-forget, except for [`delete_item_sync`], which awaits its
//! `DELETE` when the shared pool is available.

use super::content_store::{
    ContentKey, all_content_ids, delete_content, ensure_content_store, read_content, write_content,
};
use super::shadow_write::{PIPELINE_DB, PipelineWriteMsg};

/// Typed metadata for a pipeline item write.
///
/// Story 929: callers pass metadata explicitly — no YAML parsing. Every
/// field is `Option`-typed; `None` means "leave unchanged" on update,
/// "use the default" on insert.
#[derive(Default, Clone, Debug)]
pub struct ItemMeta {
    pub name: Option<String>,
    pub agent: Option<String>,
    pub depends_on: Option<Vec<u32>>,
}

impl ItemMeta {
    /// Convenience constructor for the common "just set a name" case.
    #[cfg(test)]
    pub fn named(name: impl Into<String>) -> Self {
        Self {
            name: Some(name.into()),
            ..Self::default()
        }
    }
}

/// Normalise a stage string at the db boundary.
///
/// Accepts the clean post-934 vocabulary (passthrough) and the pre-934
/// directory-style strings (`"2_current"`, `"4_merge"`, etc.) by mapping
/// them to the clean form before handing off to `Stage::from_dir` (which
/// itself only accepts clean form after stage 6). This keeps the public
/// db API tolerant for callers that still pass legacy strings while the
/// internal type stays strict.
fn normalise_stage_str(stage: &str) -> &str {
    match stage {
        "0_upcoming" => "upcoming",
        "1_backlog" => "backlog",
        "2_current" => "coding",
        "2_blocked" => "blocked",
        "3_qa" => "qa",
        "4_merge" => "merge",
        "4_merge_failure" => "merge_failure",
        "4_merge_failure_final" => "merge_failure_final",
        "5_done" => "done",
        "6_archived" => "archived",
        "7_frozen" => "frozen",
        "7_review_hold" => "review_hold",
        other => other,
    }
}

/// Write a pipeline item from in-memory content (no filesystem access).
///
/// This is the primary write path for the DB-backed pipeline. It updates
/// the in-memory content store, the CRDT, and the SQLite shadow table.
///
/// The metadata in `meta` is authoritative: this function does NOT parse
/// `content` to extract front-matter fields. Callers must pass typed
/// metadata explicitly via `ItemMeta`.
///
/// If `stage` (after legacy-name normalisation) is not a known stage, the
/// call is logged and NO layer is modified.
pub fn write_item_with_content(story_id: &str, stage: &str, content: &str, meta: ItemMeta) {
    // Resolve the stage before touching any layer. Previously the content
    // store was written first, so an unknown stage left a body in the store
    // with no matching CRDT entry or shadow row.
    let stage = normalise_stage_str(stage);
    let Some(typed_stage) = crate::pipeline_state::Stage::from_dir(stage) else {
        crate::slog!(
            "[db] write_item_with_content: unknown stage '{stage}' for {story_id}; skipping CRDT write"
        );
        return;
    };

    let depends_on_json = meta
        .depends_on
        .as_ref()
        .and_then(|deps| serde_json::to_string(deps).ok());

    // Layer 1: in-memory content store.
    ensure_content_store();
    write_content(ContentKey::Story(story_id), content);

    // Layer 2 (primary): CRDT ops. Entering `Done` stamps the merge time.
    let merged_at_ts = matches!(typed_stage, crate::pipeline_state::Stage::Done { .. })
        .then(|| chrono::Utc::now().timestamp() as f64);
    crate::crdt_state::write_item(
        story_id,
        &typed_stage,
        meta.name.as_deref(),
        meta.agent.as_deref(),
        depends_on_json.as_deref(),
        merged_at_ts,
    );

    // Layer 3: shadow `pipeline_items` table, only when the DB is initialised.
    let Some(db) = PIPELINE_DB.get() else {
        return;
    };
    let msg = PipelineWriteMsg {
        story_id: story_id.to_string(),
        stage: stage.to_string(),
        name: meta.name,
        agent: meta.agent,
        retry_count: None,
        depends_on: depends_on_json,
        content: Some(content.to_string()),
    };
    // A send error means the background writer is gone; surface the drift
    // instead of dropping it silently.
    if db.tx.send(msg).is_err() {
        crate::slog_warn!(
            "[db] write_item_with_content: shadow channel closed; dropped write for '{story_id}'"
        );
    }
}

/// Update only the stage of an existing item (used by move operations).
///
/// Reads current content from the in-memory store, updates the CRDT stage,
/// resets the item's retry count to 0, and persists the change. Optionally
/// modifies the content (e.g. to clear front-matter fields).
pub fn move_item_stage( story_id: &str, new_stage: &str, content_transform: Option<&dyn Fn(&str) -> String>, ) { let current_content = read_content(ContentKey::Story(story_id)); let content = match (¤t_content, content_transform) { (Some(c), Some(transform)) => { let new_content = transform(c); write_content(ContentKey::Story(story_id), &new_content); Some(new_content) } (Some(c), None) => Some(c.clone()), _ => None, }; // Story 929: metadata (name/agent/depends_on) is owned by the CRDT typed // registers — no need to re-derive it from the content body's YAML front // matter on every stage transition. Pass `None` for those fields so // write_item leaves the existing registers untouched. let new_stage = normalise_stage_str(new_stage); let Some(typed_stage) = crate::pipeline_state::Stage::from_dir(new_stage) else { crate::slog!( "[db] move_item_stage: unknown stage '{new_stage}' for {story_id}; skipping CRDT write" ); return; }; let merged_at_ts = matches!(typed_stage, crate::pipeline_state::Stage::Done { .. }) .then(|| chrono::Utc::now().timestamp() as f64); crate::crdt_state::write_item(story_id, &typed_stage, None, None, None, merged_at_ts); // Bug 780: stage transitions reset retry_count to 0. retry_count tracks // attempts at THIS stage's work (coding, merging, qa); a fresh attempt at // a new stage is conceptually distinct from prior attempts at a different // stage. `blocked` is preserved — that's a human-set signal that survives // transitions. crate::crdt_state::set_retry_count(story_id, 0); // Shadow table — read current metadata from the CRDT so the SQLite // mirror stays in sync. Always reset retry_count to 0 on stage transition. 
if let Some(db) = PIPELINE_DB.get() { let view = crate::crdt_state::read_item(story_id); let name = view.as_ref().map(|v| v.name().to_string()); let agent = view.as_ref().and_then(|v| v.agent().map(|a| a.to_string())); let depends_on = view .as_ref() .map(|v| v.depends_on()) .filter(|d| !d.is_empty()) .and_then(|d| serde_json::to_string(d).ok()); let msg = PipelineWriteMsg { story_id: story_id.to_string(), stage: new_stage.to_string(), name, agent, retry_count: Some(0), depends_on, content, }; let _ = db.tx.send(msg); } } /// Shadow-write the updated agent field for an existing pipeline item. /// /// Called by [`crate::crdt_state::set_agent`] after the CRDT register is updated /// so `pipeline_items.agent` stays in sync. Reads the full current metadata from /// the CRDT (stage, name, depends_on, retry_count) to avoid overwriting other /// columns with stale values — only the `agent` column carries the new data. pub fn sync_item_agent(story_id: &str) { let Some(db) = PIPELINE_DB.get() else { return; }; let Some(view) = crate::crdt_state::read_item(story_id) else { return; }; let stage = view.stage().dir_name().to_string(); let name = Some(view.name().to_string()); let agent = view.agent().map(|a| a.as_str().to_string()); let depends_on = { let d = view.depends_on(); if d.is_empty() { None } else { serde_json::to_string(d).ok() } }; let retry_count = Some(i64::from(view.retry_count())); let msg = PipelineWriteMsg { story_id: story_id.to_string(), stage, name, agent, retry_count, depends_on, content: None, }; let _ = db.tx.send(msg); } /// Delete a story from the shadow table (fire-and-forget). pub fn delete_item(story_id: &str) { delete_content(ContentKey::Story(story_id)); if let Some(db) = PIPELINE_DB.get() { // Reuse the channel with a special "deleted" stage marker. // The background task will handle it. // Actually, we send a delete message by abusing the write — we'll // just remove it by setting stage to "deleted". 
let msg = PipelineWriteMsg { story_id: story_id.to_string(), stage: "deleted".to_string(), name: None, agent: None, retry_count: None, depends_on: None, content: None, }; let _ = db.tx.send(msg); } } /// Delete a story from the shadow table, awaiting the SQLite write. /// /// Unlike [`delete_item`], this function issues a direct `DELETE FROM /// pipeline_items` via the shared pool and awaits the result — so the row /// is gone before this function returns. Use this from async call sites /// where durability of the deletion matters (e.g. story deletion, startup /// migration). Falls back to the fire-and-forget channel when the shared /// pool is not yet initialised. pub async fn delete_item_sync(story_id: &str) { delete_content(ContentKey::Story(story_id)); if let Some(pool) = super::shadow_write::get_shared_pool() { if let Err(e) = sqlx::query("DELETE FROM pipeline_items WHERE id = ?1") .bind(story_id) .execute(pool) .await { crate::slog_warn!( "[db] Synchronous delete from pipeline_items failed for '{}': {e}", story_id ); } } else if let Some(db) = PIPELINE_DB.get() { let msg = PipelineWriteMsg { story_id: story_id.to_string(), stage: "deleted".to_string(), name: None, agent: None, retry_count: None, depends_on: None, content: None, }; let _ = db.tx.send(msg); } } /// Sync the shadow table's `name` column after a CRDT name-register write. /// /// Reads the current item from the CRDT (which already holds the new name after /// `apply_and_persist`) and sends a `PipelineWriteMsg` so the SQLite mirror /// stays in sync. All other columns (stage, agent, retry_count, depends_on) /// are preserved from the live CRDT view; `content` is left as `None` so the /// UPSERT's `COALESCE` keeps the existing value. /// /// No-ops if the DB is not initialised or the item is not in the CRDT. 
pub fn sync_item_name(story_id: &str) { let Some(db) = PIPELINE_DB.get() else { return }; let Some(view) = crate::crdt_state::read_item(story_id) else { return; }; let depends_on = { let d = view.depends_on(); if d.is_empty() { None } else { serde_json::to_string(d).ok() } }; let msg = PipelineWriteMsg { story_id: story_id.to_string(), stage: view.stage().dir_name().to_string(), name: Some(view.name().to_string()), agent: view.agent().map(|a| a.to_string()), retry_count: Some(view.retry_count() as i64), depends_on, content: None, }; let _ = db.tx.send(msg); } /// Sync the `depends_on` field of a pipeline item from the CRDT to the shadow table. /// /// Called after [`crate::crdt_state::set_depends_on`] updates the CRDT register so /// that the SQLite shadow table stays in lock-step. Reads the full current view from /// the CRDT (stage, name, agent, retry_count, depends_on) and sends a /// [`PipelineWriteMsg`] over [`PIPELINE_DB`]`.tx`. Pattern mirrors /// [`move_item_stage`] lines 157-176. No-op when the CRDT is uninitialised or the /// story_id is not found. pub fn sync_item_depends_on(story_id: &str) { let Some(db) = PIPELINE_DB.get() else { return; }; let Some(view) = crate::crdt_state::read_item(story_id) else { return; }; let depends_on = { let d = view.depends_on(); if d.is_empty() { None } else { serde_json::to_string(d).ok() } }; let msg = PipelineWriteMsg { story_id: story_id.to_string(), stage: view.stage().dir_name().to_string(), name: Some(view.name().to_string()), agent: view.agent().map(|a| a.to_string()), retry_count: Some(view.retry_count() as i64), depends_on, content: None, }; let _ = db.tx.send(msg); } /// Get the next available item number by scanning the CRDT state, the /// in-memory content store, AND the tombstone set for the highest existing /// number. /// /// Tombstoned IDs are excluded from `read_all_typed` (their CRDT entry is /// `is_deleted`) and from `all_content_ids` (their content row is cleared by /// `evict_item`). 
Without consulting the tombstone set, the allocator can /// hand out a tombstoned numeric ID; `crdt_state::write_item` would then /// silently reject the new entry while the content store and SQLite shadow /// happily accept it, producing a split-brain half-write (bug 1001). pub fn next_item_number() -> u32 { let mut max_num: u32 = 0; let parse_leading_digits = |s: &str| -> Option { let num_str: String = s.chars().take_while(|c| c.is_ascii_digit()).collect(); num_str.parse::().ok() }; // Scan CRDT items via typed projection. for item in crate::pipeline_state::read_all_typed() { if let Some(n) = parse_leading_digits(&item.story_id.0) && n > max_num { max_num = n; } } // Also scan the content store (might have items not yet in CRDT). for id in all_content_ids() { if let Some(n) = parse_leading_digits(&id) && n > max_num { max_num = n; } } // Also scan tombstones — a tombstoned ID still poisons that slot because // crdt_state::write_item rejects writes for tombstoned IDs. Without this // pass, the next allocated ID can collide with a tombstone and produce // a half-write (bug 1001). for id in crate::crdt_state::tombstoned_ids() { if let Some(n) = parse_leading_digits(&id) && n > max_num { max_num = n; } } max_num + 1 } #[cfg(test)] mod tests { use super::*; use crate::db::shadow_write; /// `shadow_write::init` spawns its background task on the calling runtime. /// Under `#[tokio::test]` that runtime is per-test and drops when the test /// ends, killing the task. This OnceLock holds a multi-thread runtime that /// persists for the lifetime of the test binary so the write loop stays alive /// across all tests that share `PIPELINE_DB`. 
static SHADOW_RT: std::sync::OnceLock = std::sync::OnceLock::new(); async fn ensure_shadow_db() { static INIT: std::sync::OnceLock<()> = std::sync::OnceLock::new(); if INIT.get().is_some() { return; } let rt = SHADOW_RT.get_or_init(|| { tokio::runtime::Builder::new_multi_thread() .worker_threads(1) .enable_all() .build() .expect("shadow rt") }); rt.spawn(async { static INNER: std::sync::OnceLock<()> = std::sync::OnceLock::new(); if INNER.get().is_some() { return; } let tmp = tempfile::tempdir().expect("tmp"); let db_path = tmp.path().join("pipeline.db"); std::mem::forget(tmp); shadow_write::init(&db_path).await.expect("shadow init"); let _ = INNER.set(()); }) .await .expect("shadow init task"); let _ = INIT.set(()); } /// Regression test for story 1097: `set_depends_on` must sync the shadow /// table. Before the fix, the CRDT register was updated but the /// `pipeline_items.depends_on` column was never written. #[tokio::test] async fn set_depends_on_syncs_shadow_table() { crate::crdt_state::init_for_test(); ensure_content_store(); ensure_shadow_db().await; let story_id = "1097_story_depends_on_shadow_drift"; // Insert the story so it exists in both the CRDT and the shadow table. write_item_with_content( story_id, "backlog", "---\nname: Depends On Shadow Drift\n---\n", ItemMeta::named("Depends On Shadow Drift"), ); // Let the initial shadow write land. tokio::time::sleep(std::time::Duration::from_millis(50)).await; // This is the write under test: it must update the shadow table. let ok = crate::crdt_state::set_depends_on(story_id, &[1, 2]); assert!(ok, "set_depends_on must return true for an existing item"); // Let the shadow write land. 
tokio::time::sleep(std::time::Duration::from_millis(50)).await; let pool = shadow_write::get_shared_pool().expect("pool must be initialised"); let row: (Option,) = sqlx::query_as("SELECT depends_on FROM pipeline_items WHERE id = ?1") .bind(story_id) .fetch_one(pool) .await .expect("row must exist in shadow table"); assert_eq!( row.0.as_deref(), Some("[1,2]"), "pipeline_items.depends_on must reflect the set_depends_on call" ); } }