//! Write operations for the pipeline — content, stage transitions, and deletions. //! //! Each function updates three layers atomically in order: the in-memory //! content store, the CRDT (source of truth for metadata), and the SQLite //! shadow table (via the background channel). use super::content_store::{ all_content_ids, delete_content, ensure_content_store, read_content, write_content, }; use super::shadow_write::{PIPELINE_DB, PipelineWriteMsg}; use super::yaml_legacy::parse_front_matter; /// Typed metadata for a pipeline item write. /// /// Replaces the prior YAML-parsing write path (story 864): callers now pass /// metadata explicitly instead of round-tripping it through a serialized /// front-matter blob. Every field is `Option`-typed; `None` means /// "leave unchanged" on update, "use the default" on insert. #[derive(Default, Clone, Debug)] pub struct ItemMeta { pub name: Option, pub agent: Option, pub retry_count: Option, pub blocked: Option, pub depends_on: Option>, } impl ItemMeta { /// Convenience constructor for the common "just set a name" case. #[cfg(test)] pub fn named(name: impl Into) -> Self { Self { name: Some(name.into()), ..Self::default() } } /// Parse YAML front-matter from a content string into typed metadata. /// /// This is an explicit caller-side conversion — the write path itself /// no longer parses YAML. Use this when the caller has a raw content /// string with front-matter and wants the metadata to flow into the /// CRDT. Returns `Self::default()` if parsing fails or there is no /// front-matter present. pub fn from_yaml(content: &str) -> Self { match parse_front_matter(content) { Ok(m) => Self { name: m.name, agent: m.agent, retry_count: m.retry_count.map(|r| r as i64), blocked: m.blocked, depends_on: m.depends_on, }, Err(_) => Self::default(), } } } /// Write a pipeline item from in-memory content (no filesystem access). /// /// This is the primary write path for the DB-backed pipeline. It updates /// the CRDT, the in-memory content store, and the SQLite shadow table. /// /// The metadata in `meta` is authoritative: this function does NOT parse /// `content` to extract front-matter fields. Callers must pass typed /// metadata explicitly via `ItemMeta`. pub fn write_item_with_content(story_id: &str, stage: &str, content: &str, meta: ItemMeta) { let depends_on_json = meta .depends_on .as_ref() .and_then(|d| serde_json::to_string(d).ok()); // Update in-memory content store. ensure_content_store(); write_content(story_id, content); // Primary: CRDT ops. let merged_at_ts = if crate::pipeline_state::Stage::from_dir(stage) .is_some_and(|s| matches!(s, crate::pipeline_state::Stage::Done { .. })) { Some(chrono::Utc::now().timestamp() as f64) } else { None }; crate::crdt_state::write_item( story_id, stage, meta.name.as_deref(), meta.agent.as_deref(), meta.retry_count, meta.blocked, depends_on_json.as_deref(), None, None, merged_at_ts, ); // Shadow: pipeline_items table (only when DB is initialised). if let Some(db) = PIPELINE_DB.get() { let msg = PipelineWriteMsg { story_id: story_id.to_string(), stage: stage.to_string(), name: meta.name, agent: meta.agent, retry_count: meta.retry_count, blocked: meta.blocked, depends_on: depends_on_json, content: Some(content.to_string()), }; let _ = db.tx.send(msg); } } /// Update only the stage of an existing item (used by move operations). /// /// Reads current content from the in-memory store, updates the CRDT stage, /// and persists the change. Optionally modifies the content (e.g. to clear /// front-matter fields). pub fn move_item_stage( story_id: &str, new_stage: &str, content_transform: Option<&dyn Fn(&str) -> String>, ) { let current_content = read_content(story_id); let content = match (¤t_content, content_transform) { (Some(c), Some(transform)) => { let new_content = transform(c); write_content(story_id, &new_content); Some(new_content) } (Some(c), None) => Some(c.clone()), _ => None, }; let (name, agent, _ignored_retry_count, blocked, depends_on) = content .as_deref() .or(current_content.as_deref()) .and_then(|c| parse_front_matter(c).ok()) .map(|meta| { ( meta.name, meta.agent, meta.retry_count.map(|r| r as i64), meta.blocked, meta.depends_on .as_ref() .and_then(|d| serde_json::to_string(d).ok()), ) }) .unwrap_or((None, None, None, None, None)); // CRDT stage transition. let merged_at_ts = if crate::pipeline_state::Stage::from_dir(new_stage) .is_some_and(|s| matches!(s, crate::pipeline_state::Stage::Done { .. })) { Some(chrono::Utc::now().timestamp() as f64) } else { None }; crate::crdt_state::write_item( story_id, new_stage, name.as_deref(), agent.as_deref(), None, blocked, depends_on.as_deref(), None, None, merged_at_ts, ); // Bug 780: stage transitions reset retry_count to 0. retry_count tracks // attempts at THIS stage's work (coding, merging, qa); a fresh attempt at // a new stage is conceptually distinct from prior attempts at a different // stage. `blocked` is preserved — that's a human-set signal that survives // transitions. crate::crdt_state::set_retry_count(story_id, 0); // Shadow table — always reset retry_count to 0 on stage transition. let retry_count: Option = Some(0); if let Some(db) = PIPELINE_DB.get() { let msg = PipelineWriteMsg { story_id: story_id.to_string(), stage: new_stage.to_string(), name, agent, retry_count, blocked, depends_on, content, }; let _ = db.tx.send(msg); } } /// Delete a story from the shadow table (fire-and-forget). pub fn delete_item(story_id: &str) { delete_content(story_id); if let Some(db) = PIPELINE_DB.get() { // Reuse the channel with a special "deleted" stage marker. // The background task will handle it. // Actually, we send a delete message by abusing the write — we'll // just remove it by setting stage to "deleted". let msg = PipelineWriteMsg { story_id: story_id.to_string(), stage: "deleted".to_string(), name: None, agent: None, retry_count: None, blocked: None, depends_on: None, content: None, }; let _ = db.tx.send(msg); } } /// Get the next available item number by scanning both the CRDT state /// and the in-memory content store for the highest existing number. pub fn next_item_number() -> u32 { let mut max_num: u32 = 0; // Scan CRDT items via typed projection. for item in crate::pipeline_state::read_all_typed() { let num_str: String = item .story_id .0 .chars() .take_while(|c| c.is_ascii_digit()) .collect(); if let Ok(n) = num_str.parse::() && n > max_num { max_num = n; } } // Also scan the content store (might have items not yet in CRDT). for id in all_content_ids() { let num_str: String = id.chars().take_while(|c| c.is_ascii_digit()).collect(); if let Ok(n) = num_str.parse::() && n > max_num { max_num = n; } } max_num + 1 }