//! Recovery for half-written pipeline items (bug 1001). //! //! A half-written item is one whose content row exists in the content store / //! SQLite shadow but whose CRDT entry is absent or tombstoned. These items //! are invisible to every CRDT-driven read path (`list_refactors`, //! `get_pipeline_status`, `read_item`, …), and `update_story` / `delete_story` //! / `purge_story` all refuse to operate on them. They're effectively //! orphaned. //! //! This module discovers them and lifts each one onto a fresh non-tombstoned //! ID so the content becomes addressable again. //! //! The Stage 1 commit (`fix(1001):`) closes the door on *new* half-writes; //! this module exists to clean up the rows that were created before that //! commit landed. use crate::crdt_state; use crate::db::content_store::{ContentKey, all_content_ids, read_content}; use crate::db::ops::{ItemMeta, delete_item, next_item_number, write_item_with_content}; use crate::io::story_metadata::ItemType; /// A pipeline item whose content row exists but whose CRDT entry is missing /// or tombstoned. #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)] pub struct HalfWritten { /// The orphaned numeric story_id (as a string, matching CRDT keys). pub story_id: String, /// Best-effort human name pulled from the YAML front matter. Empty if /// the front matter was unparseable. pub name: String, /// True if the CRDT has a tombstone for this id; false if the id is /// simply absent (which can happen if the CRDT op was lost or never /// applied for reasons other than a tombstone). pub tombstoned: bool, } /// The outcome of recovering a single half-written item. #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)] pub struct RecoveryResult { /// The orphan's numeric id at the time of discovery. pub old_id: String, /// The fresh id the content was re-anchored to. pub new_id: String, /// Best-effort human name pulled from the YAML front matter. pub name: String, } /// Find all half-written items currently present on this node. /// /// Scans every id known to the in-memory content store and returns the ones /// for which `crdt_state::read_item` is `None` (i.e. there is no live CRDT /// entry). The `tombstoned` flag distinguishes the most common cause /// (allocator collided with a tombstone) from rarer split-brain causes. pub fn find_half_written_items() -> Vec { let mut out = Vec::new(); for id in all_content_ids() { if crdt_state::read_item(&id).is_some() { continue; } // No live CRDT entry — orphan. let content = read_content(ContentKey::Story(&id)).unwrap_or_default(); let name = parse_name_from_front_matter(&content).unwrap_or_default(); out.push(HalfWritten { story_id: id.clone(), name, tombstoned: crdt_state::is_tombstoned(&id), }); } out.sort_by(|a, b| a.story_id.cmp(&b.story_id)); out } /// Move every half-written item's content onto a fresh non-tombstoned id. /// /// For each orphan, this: /// 1. Allocates a fresh id via `next_item_number` (which now skips /// tombstones). /// 2. Re-inserts the same content body at the new id with stage = backlog /// and the parsed name. /// 3. Re-applies `item_type` and `depends_on` from the original YAML front /// matter where parseable. /// 4. Verifies the new entry is visible via `read_item`. On verification /// failure, the new id's content is rolled back (defence-in-depth) and /// the orphan is left in place. /// 5. Deletes the orphan's content-store + shadow-DB rows via `delete_item`. /// /// If `only` is provided, recovery operates only on orphans whose `story_id` /// is in the list — everything else is left alone. This is the safe default /// for a live system that may have many historic purged ids visible as /// orphans alongside the genuinely-recent half-writes. /// /// Returns a (`old_id`, `new_id`, `name`) mapping for every successful /// recovery. Orphans that failed to recover are simply omitted from the /// returned list and logged at WARN level. pub fn recover_half_written_items(only: Option<&[String]>) -> Vec { let orphans = find_half_written_items(); let mut results = Vec::with_capacity(orphans.len()); for orphan in orphans { if let Some(filter) = only && !filter.iter().any(|f| f == &orphan.story_id) { continue; } match recover_one(&orphan) { Ok(result) => results.push(result), Err(e) => { crate::slog_warn!( "[db::recover] could not recover half-written id '{id}': {e}", id = orphan.story_id ); } } } results } fn recover_one(orphan: &HalfWritten) -> Result { let content = read_content(ContentKey::Story(&orphan.story_id)) .ok_or_else(|| "content store row vanished between discovery and recovery".to_string())?; // Best-effort metadata extraction from the YAML front matter. These all // fall back to safe defaults if the line is missing or malformed. let name = parse_name_from_front_matter(&content).unwrap_or_default(); let item_type = parse_type_from_front_matter(&content); let depends_on = parse_depends_on_from_front_matter(&content); let new_number = next_item_number(); let new_id = new_number.to_string(); // Sanity: the allocator should never hand back a tombstone, but bail if // it somehow did so we don't reproduce the original bug. if crdt_state::is_tombstoned(&new_id) { return Err(format!( "allocator returned tombstoned id '{new_id}' for recovery — refusing to write" )); } write_item_with_content( &new_id, "1_backlog", &content, ItemMeta { name: if name.is_empty() { None } else { Some(name.clone()) }, ..Default::default() }, ); if let Some(t) = item_type { crdt_state::set_item_type(&new_id, Some(t)); } if !depends_on.is_empty() { crdt_state::set_depends_on(&new_id, &depends_on); } // Verify the new id materialised before we drop the orphan row. if crdt_state::read_item(&new_id).is_none() { // The new id didn't take — extremely unlikely after the Stage 1 // fixes, but if it happens, roll back the new write and leave the // orphan in place for a human to look at. delete_item(&new_id); return Err(format!( "newly-allocated id '{new_id}' did not register in CRDT" )); } // Drop the orphan content/shadow row. delete_item(&orphan.story_id); Ok(RecoveryResult { old_id: orphan.story_id.clone(), new_id, name, }) } /// Pull the YAML front-matter block (everything between the first two `---` /// lines, if any) as a slice of source lines. fn front_matter_lines(content: &str) -> Vec<&str> { let mut iter = content.lines(); let Some(first) = iter.next() else { return Vec::new(); }; if first.trim() != "---" { return Vec::new(); } let mut out = Vec::new(); for line in iter { if line.trim() == "---" { return out; } out.push(line); } Vec::new() // no closing `---` } /// Parse `name: "..."` (or unquoted) from the YAML front matter. fn parse_name_from_front_matter(content: &str) -> Option { for line in front_matter_lines(content) { let trimmed = line.trim_start(); if let Some(rest) = trimmed.strip_prefix("name:") { let value = rest.trim(); // Strip optional surrounding quotes. let unquoted = value .strip_prefix('"') .and_then(|s| s.strip_suffix('"')) .unwrap_or(value); if unquoted.is_empty() { return None; } return Some(unquoted.to_string()); } } None } /// Parse `type: ` from the YAML front matter. fn parse_type_from_front_matter(content: &str) -> Option { for line in front_matter_lines(content) { let trimmed = line.trim_start(); if let Some(rest) = trimmed.strip_prefix("type:") { let value = rest.trim(); return ItemType::from_str(value); } } None } /// Parse `depends_on: [a, b, c]` from the YAML front matter. /// /// Tolerates trailing whitespace and odd spacing; ignores entries that don't /// parse as `u32`. fn parse_depends_on_from_front_matter(content: &str) -> Vec { for line in front_matter_lines(content) { let trimmed = line.trim_start(); if let Some(rest) = trimmed.strip_prefix("depends_on:") { let value = rest.trim(); // Expect a `[ ... ]` array literal. let inner = value .strip_prefix('[') .and_then(|s| s.strip_suffix(']')) .unwrap_or(""); return inner .split(',') .filter_map(|s| s.trim().parse::().ok()) .collect(); } } Vec::new() } #[cfg(test)] mod tests { use super::*; /// Build a story body that mirrors what create_*_file writes. fn body(name: &str, item_type: &str, depends_on: Option<&[u32]>) -> String { let mut s = String::from("---\n"); s.push_str(&format!("type: {item_type}\n")); s.push_str(&format!("name: \"{}\"\n", name.replace('"', "\\\""))); if let Some(deps) = depends_on && !deps.is_empty() { let parts: Vec = deps.iter().map(u32::to_string).collect(); s.push_str(&format!("depends_on: [{}]\n", parts.join(", "))); } s.push_str("---\n\n# Heading\n\n## Acceptance Criteria\n\n- [ ] Real AC\n"); s } #[test] fn front_matter_parsers_extract_name_type_and_depends_on() { let content = body("My Refactor: typed Stage", "refactor", Some(&[42, 99])); assert_eq!( parse_name_from_front_matter(&content).as_deref(), Some("My Refactor: typed Stage") ); assert_eq!( parse_type_from_front_matter(&content), Some(ItemType::Refactor) ); assert_eq!(parse_depends_on_from_front_matter(&content), vec![42, 99]); } #[test] fn front_matter_parsers_handle_missing_optional_fields() { let content = body("Plain Story", "story", None); assert_eq!( parse_name_from_front_matter(&content).as_deref(), Some("Plain Story") ); assert_eq!( parse_depends_on_from_front_matter(&content), Vec::::new() ); } #[test] fn front_matter_parsers_handle_no_front_matter() { let content = "no front matter here\n"; assert!(parse_name_from_front_matter(content).is_none()); assert!(parse_type_from_front_matter(content).is_none()); assert!(parse_depends_on_from_front_matter(content).is_empty()); } #[test] fn find_returns_empty_when_all_items_have_crdt_entries() { crdt_state::init_for_test(); crate::db::ensure_content_store(); write_item_with_content( "9800", "1_backlog", &body("Healthy", "refactor", None), ItemMeta::named("Healthy"), ); let half = find_half_written_items(); assert!( half.iter().all(|h| h.story_id != "9800"), "9800 should not be reported as half-written: {half:?}" ); } #[test] fn recover_lifts_half_written_item_to_a_fresh_id() { crdt_state::init_for_test(); crate::db::ensure_content_store(); // Seed an item, tombstone it (this leaves a tombstone in the CRDT // set), then manually write content at the tombstoned id to // simulate a pre-fix half-write. let old_id = "9801"; let content = body("Tombstoned Then Half-Written", "refactor", Some(&[100])); write_item_with_content( old_id, "1_backlog", &content, ItemMeta::named("Tombstoned Then Half-Written"), ); crdt_state::evict_item(old_id).expect("evict should succeed"); // Manually re-write content at the now-tombstoned id to mimic the // pre-fix half-write (write_item will silently reject the CRDT side). crate::db::content_store::write_content(ContentKey::Story(old_id), &content); crdt_state::write_item( old_id, &crate::pipeline_state::Stage::Backlog, Some("Tombstoned Then Half-Written"), None, None, None, None, None, None, ); assert!( crdt_state::read_item(old_id).is_none(), "test setup is wrong — old id should still be invisible after the tombstone-blocked write_item" ); // Sanity: find_half_written_items picks it up. let half = find_half_written_items(); let entry = half .iter() .find(|h| h.story_id == old_id) .expect("half-written discovery should find the orphan"); assert!(entry.tombstoned, "orphan should be flagged as tombstoned"); assert_eq!(entry.name, "Tombstoned Then Half-Written"); // Run the recovery and inspect the mapping. let results = recover_half_written_items(None); let mapping = results .iter() .find(|r| r.old_id == old_id) .expect("recovery should produce a mapping for the orphan"); assert_ne!(mapping.new_id, mapping.old_id); assert_eq!(mapping.name, "Tombstoned Then Half-Written"); // The new id should be visible in the CRDT. let view = crdt_state::read_item(&mapping.new_id) .expect("new id must be live in CRDT after recovery"); assert_eq!(view.name(), "Tombstoned Then Half-Written"); // depends_on was carried across. assert_eq!(view.depends_on(), &[100]); // item_type was carried across. assert_eq!(view.item_type(), Some(ItemType::Refactor)); // The old orphan row is gone. assert!( read_content(ContentKey::Story(old_id)).is_none(), "orphan content should be cleared after recovery" ); // Re-running recovery is a no-op (no orphans left). let again = recover_half_written_items(None); assert!( again.iter().all(|r| r.old_id != old_id), "recovery should be idempotent for the same orphan" ); } /// The `only` filter restricts recovery to a specific id set; orphans /// outside the filter are left alone. #[test] fn recover_only_filter_restricts_recovery_to_named_ids() { crdt_state::init_for_test(); crate::db::ensure_content_store(); // Two orphans, only one in the filter. let recover_id = "9810"; let keep_id = "9811"; for id in [recover_id, keep_id] { let name = format!("Item {id}"); let content = body(&name, "refactor", None); write_item_with_content(id, "1_backlog", &content, ItemMeta::named(&name)); crdt_state::evict_item(id).expect("evict should succeed"); crate::db::content_store::write_content(ContentKey::Story(id), &content); } let results = recover_half_written_items(Some(&[recover_id.to_string()])); assert_eq!(results.len(), 1, "exactly one orphan should be recovered"); assert_eq!(results[0].old_id, recover_id); // The filtered-out orphan must still be a half-written item. let half = find_half_written_items(); assert!( half.iter().any(|h| h.story_id == keep_id), "filtered-out orphan should remain" ); assert!( half.iter().all(|h| h.story_id != recover_id), "recovered orphan should be gone" ); } }