feat(1001): recover_half_written_items MCP tool
Adds db::recover, a discovery + recovery layer for pipeline items that got half-written before the Stage 1 fix landed (content in content store + SQLite shadow, no live CRDT entry). For each orphan, the content body is re-anchored to a fresh non-tombstoned id and the old id's content row is cleared. Exposed as the recover_half_written_items MCP tool. dry_run defaults to true so the caller can review what would change before mutating. YAML front-matter parsing is hand-rolled and scoped to the three fields the create_*_file path emits (name, type, depends_on). It tolerates missing or malformed lines by falling back to safe defaults; the orphan is recovered with the best metadata we can pull from the body and the rest is left to the operator to fix up. The discovery step is read-only and idempotent. Recovery is also idempotent in the sense that once an orphan is lifted, the next discovery pass won't see it. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -17,11 +17,14 @@
|
||||
pub mod content_store;
|
||||
/// Write operations for the pipeline — content, stage transitions, and deletions.
|
||||
pub mod ops;
|
||||
/// Recovery for half-written pipeline items (bug 1001 backfill).
|
||||
pub mod recover;
|
||||
/// Background shadow-write task — persists pipeline items to SQLite asynchronously.
|
||||
pub mod shadow_write;
|
||||
|
||||
pub use content_store::{ContentKey, all_content_ids, delete_content, read_content, write_content};
|
||||
pub use ops::{ItemMeta, delete_item, move_item_stage, next_item_number, write_item_with_content};
|
||||
pub use recover::{find_half_written_items, recover_half_written_items};
|
||||
pub use shadow_write::init;
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -0,0 +1,391 @@
|
||||
//! Recovery for half-written pipeline items (bug 1001).
|
||||
//!
|
||||
//! A half-written item is one whose content row exists in the content store /
|
||||
//! SQLite shadow but whose CRDT entry is absent or tombstoned. These items
|
||||
//! are invisible to every CRDT-driven read path (`list_refactors`,
|
||||
//! `get_pipeline_status`, `read_item`, …), and `update_story` / `delete_story`
|
||||
//! / `purge_story` all refuse to operate on them. They're effectively
|
||||
//! orphaned.
|
||||
//!
|
||||
//! This module discovers them and lifts each one onto a fresh non-tombstoned
|
||||
//! ID so the content becomes addressable again.
|
||||
//!
|
||||
//! The Stage 1 commit (`fix(1001):`) closes the door on *new* half-writes;
|
||||
//! this module exists to clean up the rows that were created before that
|
||||
//! commit landed.
|
||||
|
||||
use crate::crdt_state;
|
||||
use crate::db::content_store::{ContentKey, all_content_ids, read_content};
|
||||
use crate::db::ops::{ItemMeta, delete_item, next_item_number, write_item_with_content};
|
||||
use crate::io::story_metadata::ItemType;
|
||||
|
||||
/// A pipeline item whose content row exists but whose CRDT entry is missing
|
||||
/// or tombstoned.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)]
|
||||
pub struct HalfWritten {
|
||||
/// The orphaned numeric story_id (as a string, matching CRDT keys).
|
||||
pub story_id: String,
|
||||
/// Best-effort human name pulled from the YAML front matter. Empty if
|
||||
/// the front matter was unparseable.
|
||||
pub name: String,
|
||||
/// True if the CRDT has a tombstone for this id; false if the id is
|
||||
/// simply absent (which can happen if the CRDT op was lost or never
|
||||
/// applied for reasons other than a tombstone).
|
||||
pub tombstoned: bool,
|
||||
}
|
||||
|
||||
/// The outcome of recovering a single half-written item.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)]
|
||||
pub struct RecoveryResult {
|
||||
/// The orphan's numeric id at the time of discovery.
|
||||
pub old_id: String,
|
||||
/// The fresh id the content was re-anchored to.
|
||||
pub new_id: String,
|
||||
/// Best-effort human name pulled from the YAML front matter.
|
||||
pub name: String,
|
||||
}
|
||||
|
||||
/// Find all half-written items currently present on this node.
|
||||
///
|
||||
/// Scans every id known to the in-memory content store and returns the ones
|
||||
/// for which `crdt_state::read_item` is `None` (i.e. there is no live CRDT
|
||||
/// entry). The `tombstoned` flag distinguishes the most common cause
|
||||
/// (allocator collided with a tombstone) from rarer split-brain causes.
|
||||
pub fn find_half_written_items() -> Vec<HalfWritten> {
|
||||
let mut out = Vec::new();
|
||||
for id in all_content_ids() {
|
||||
if crdt_state::read_item(&id).is_some() {
|
||||
continue;
|
||||
}
|
||||
// No live CRDT entry — orphan.
|
||||
let content = read_content(ContentKey::Story(&id)).unwrap_or_default();
|
||||
let name = parse_name_from_front_matter(&content).unwrap_or_default();
|
||||
out.push(HalfWritten {
|
||||
story_id: id.clone(),
|
||||
name,
|
||||
tombstoned: crdt_state::is_tombstoned(&id),
|
||||
});
|
||||
}
|
||||
out.sort_by(|a, b| a.story_id.cmp(&b.story_id));
|
||||
out
|
||||
}
|
||||
|
||||
/// Move every half-written item's content onto a fresh non-tombstoned id.
|
||||
///
|
||||
/// For each orphan, this:
|
||||
/// 1. Allocates a fresh id via `next_item_number` (which now skips
|
||||
/// tombstones).
|
||||
/// 2. Re-inserts the same content body at the new id with stage = backlog
|
||||
/// and the parsed name.
|
||||
/// 3. Re-applies `item_type` and `depends_on` from the original YAML front
|
||||
/// matter where parseable.
|
||||
/// 4. Verifies the new entry is visible via `read_item`. On verification
|
||||
/// failure, the new id's content is rolled back (defence-in-depth) and
|
||||
/// the orphan is left in place.
|
||||
/// 5. Deletes the orphan's content-store + shadow-DB rows via `delete_item`.
|
||||
///
|
||||
/// Returns a (`old_id`, `new_id`, `name`) mapping for every successful
|
||||
/// recovery. Orphans that failed to recover are simply omitted from the
|
||||
/// returned list and logged at WARN level.
|
||||
pub fn recover_half_written_items() -> Vec<RecoveryResult> {
|
||||
let orphans = find_half_written_items();
|
||||
let mut results = Vec::with_capacity(orphans.len());
|
||||
for orphan in orphans {
|
||||
match recover_one(&orphan) {
|
||||
Ok(result) => results.push(result),
|
||||
Err(e) => {
|
||||
crate::slog_warn!(
|
||||
"[db::recover] could not recover half-written id '{id}': {e}",
|
||||
id = orphan.story_id
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
results
|
||||
}
|
||||
|
||||
fn recover_one(orphan: &HalfWritten) -> Result<RecoveryResult, String> {
|
||||
let content = read_content(ContentKey::Story(&orphan.story_id))
|
||||
.ok_or_else(|| "content store row vanished between discovery and recovery".to_string())?;
|
||||
|
||||
// Best-effort metadata extraction from the YAML front matter. These all
|
||||
// fall back to safe defaults if the line is missing or malformed.
|
||||
let name = parse_name_from_front_matter(&content).unwrap_or_default();
|
||||
let item_type = parse_type_from_front_matter(&content);
|
||||
let depends_on = parse_depends_on_from_front_matter(&content);
|
||||
|
||||
let new_number = next_item_number();
|
||||
let new_id = new_number.to_string();
|
||||
|
||||
// Sanity: the allocator should never hand back a tombstone, but bail if
|
||||
// it somehow did so we don't reproduce the original bug.
|
||||
if crdt_state::is_tombstoned(&new_id) {
|
||||
return Err(format!(
|
||||
"allocator returned tombstoned id '{new_id}' for recovery — refusing to write"
|
||||
));
|
||||
}
|
||||
|
||||
write_item_with_content(
|
||||
&new_id,
|
||||
"1_backlog",
|
||||
&content,
|
||||
ItemMeta {
|
||||
name: if name.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(name.clone())
|
||||
},
|
||||
..Default::default()
|
||||
},
|
||||
);
|
||||
if let Some(t) = item_type {
|
||||
crdt_state::set_item_type(&new_id, Some(t));
|
||||
}
|
||||
if !depends_on.is_empty() {
|
||||
crdt_state::set_depends_on(&new_id, &depends_on);
|
||||
}
|
||||
|
||||
// Verify the new id materialised before we drop the orphan row.
|
||||
if crdt_state::read_item(&new_id).is_none() {
|
||||
// The new id didn't take — extremely unlikely after the Stage 1
|
||||
// fixes, but if it happens, roll back the new write and leave the
|
||||
// orphan in place for a human to look at.
|
||||
delete_item(&new_id);
|
||||
return Err(format!(
|
||||
"newly-allocated id '{new_id}' did not register in CRDT"
|
||||
));
|
||||
}
|
||||
|
||||
// Drop the orphan content/shadow row.
|
||||
delete_item(&orphan.story_id);
|
||||
|
||||
Ok(RecoveryResult {
|
||||
old_id: orphan.story_id.clone(),
|
||||
new_id,
|
||||
name,
|
||||
})
|
||||
}
|
||||
|
||||
/// Pull the YAML front-matter block (everything between the first two `---`
|
||||
/// lines, if any) as a slice of source lines.
|
||||
fn front_matter_lines(content: &str) -> Vec<&str> {
|
||||
let mut iter = content.lines();
|
||||
let Some(first) = iter.next() else {
|
||||
return Vec::new();
|
||||
};
|
||||
if first.trim() != "---" {
|
||||
return Vec::new();
|
||||
}
|
||||
let mut out = Vec::new();
|
||||
for line in iter {
|
||||
if line.trim() == "---" {
|
||||
return out;
|
||||
}
|
||||
out.push(line);
|
||||
}
|
||||
Vec::new() // no closing `---`
|
||||
}
|
||||
|
||||
/// Parse `name: "..."` (or unquoted) from the YAML front matter.
|
||||
fn parse_name_from_front_matter(content: &str) -> Option<String> {
|
||||
for line in front_matter_lines(content) {
|
||||
let trimmed = line.trim_start();
|
||||
if let Some(rest) = trimmed.strip_prefix("name:") {
|
||||
let value = rest.trim();
|
||||
// Strip optional surrounding quotes.
|
||||
let unquoted = value
|
||||
.strip_prefix('"')
|
||||
.and_then(|s| s.strip_suffix('"'))
|
||||
.unwrap_or(value);
|
||||
if unquoted.is_empty() {
|
||||
return None;
|
||||
}
|
||||
return Some(unquoted.to_string());
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
/// Parse `type: <story|bug|spike|refactor|epic>` from the YAML front matter.
|
||||
fn parse_type_from_front_matter(content: &str) -> Option<ItemType> {
|
||||
for line in front_matter_lines(content) {
|
||||
let trimmed = line.trim_start();
|
||||
if let Some(rest) = trimmed.strip_prefix("type:") {
|
||||
let value = rest.trim();
|
||||
return ItemType::from_str(value);
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
/// Parse `depends_on: [a, b, c]` from the YAML front matter.
|
||||
///
|
||||
/// Tolerates trailing whitespace and odd spacing; ignores entries that don't
|
||||
/// parse as `u32`.
|
||||
fn parse_depends_on_from_front_matter(content: &str) -> Vec<u32> {
|
||||
for line in front_matter_lines(content) {
|
||||
let trimmed = line.trim_start();
|
||||
if let Some(rest) = trimmed.strip_prefix("depends_on:") {
|
||||
let value = rest.trim();
|
||||
// Expect a `[ ... ]` array literal.
|
||||
let inner = value
|
||||
.strip_prefix('[')
|
||||
.and_then(|s| s.strip_suffix(']'))
|
||||
.unwrap_or("");
|
||||
return inner
|
||||
.split(',')
|
||||
.filter_map(|s| s.trim().parse::<u32>().ok())
|
||||
.collect();
|
||||
}
|
||||
}
|
||||
Vec::new()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
/// Build a story body that mirrors what create_*_file writes.
|
||||
fn body(name: &str, item_type: &str, depends_on: Option<&[u32]>) -> String {
|
||||
let mut s = String::from("---\n");
|
||||
s.push_str(&format!("type: {item_type}\n"));
|
||||
s.push_str(&format!("name: \"{}\"\n", name.replace('"', "\\\"")));
|
||||
if let Some(deps) = depends_on
|
||||
&& !deps.is_empty()
|
||||
{
|
||||
let parts: Vec<String> = deps.iter().map(u32::to_string).collect();
|
||||
s.push_str(&format!("depends_on: [{}]\n", parts.join(", ")));
|
||||
}
|
||||
s.push_str("---\n\n# Heading\n\n## Acceptance Criteria\n\n- [ ] Real AC\n");
|
||||
s
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn front_matter_parsers_extract_name_type_and_depends_on() {
|
||||
let content = body("My Refactor: typed Stage", "refactor", Some(&[42, 99]));
|
||||
assert_eq!(
|
||||
parse_name_from_front_matter(&content).as_deref(),
|
||||
Some("My Refactor: typed Stage")
|
||||
);
|
||||
assert_eq!(
|
||||
parse_type_from_front_matter(&content),
|
||||
Some(ItemType::Refactor)
|
||||
);
|
||||
assert_eq!(parse_depends_on_from_front_matter(&content), vec![42, 99]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn front_matter_parsers_handle_missing_optional_fields() {
|
||||
let content = body("Plain Story", "story", None);
|
||||
assert_eq!(
|
||||
parse_name_from_front_matter(&content).as_deref(),
|
||||
Some("Plain Story")
|
||||
);
|
||||
assert_eq!(
|
||||
parse_depends_on_from_front_matter(&content),
|
||||
Vec::<u32>::new()
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn front_matter_parsers_handle_no_front_matter() {
|
||||
let content = "no front matter here\n";
|
||||
assert!(parse_name_from_front_matter(content).is_none());
|
||||
assert!(parse_type_from_front_matter(content).is_none());
|
||||
assert!(parse_depends_on_from_front_matter(content).is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn find_returns_empty_when_all_items_have_crdt_entries() {
|
||||
crdt_state::init_for_test();
|
||||
crate::db::ensure_content_store();
|
||||
write_item_with_content(
|
||||
"9800",
|
||||
"1_backlog",
|
||||
&body("Healthy", "refactor", None),
|
||||
ItemMeta::named("Healthy"),
|
||||
);
|
||||
let half = find_half_written_items();
|
||||
assert!(
|
||||
half.iter().all(|h| h.story_id != "9800"),
|
||||
"9800 should not be reported as half-written: {half:?}"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn recover_lifts_half_written_item_to_a_fresh_id() {
|
||||
crdt_state::init_for_test();
|
||||
crate::db::ensure_content_store();
|
||||
|
||||
// Seed an item, tombstone it (this leaves a tombstone in the CRDT
|
||||
// set), then manually write content at the tombstoned id to
|
||||
// simulate a pre-fix half-write.
|
||||
let old_id = "9801";
|
||||
let content = body("Tombstoned Then Half-Written", "refactor", Some(&[100]));
|
||||
write_item_with_content(
|
||||
old_id,
|
||||
"1_backlog",
|
||||
&content,
|
||||
ItemMeta::named("Tombstoned Then Half-Written"),
|
||||
);
|
||||
crdt_state::evict_item(old_id).expect("evict should succeed");
|
||||
// Manually re-write content at the now-tombstoned id to mimic the
|
||||
// pre-fix half-write (write_item will silently reject the CRDT side).
|
||||
crate::db::content_store::write_content(ContentKey::Story(old_id), &content);
|
||||
crdt_state::write_item(
|
||||
old_id,
|
||||
&crate::pipeline_state::Stage::Backlog,
|
||||
Some("Tombstoned Then Half-Written"),
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
);
|
||||
assert!(
|
||||
crdt_state::read_item(old_id).is_none(),
|
||||
"test setup is wrong — old id should still be invisible after the tombstone-blocked write_item"
|
||||
);
|
||||
|
||||
// Sanity: find_half_written_items picks it up.
|
||||
let half = find_half_written_items();
|
||||
let entry = half
|
||||
.iter()
|
||||
.find(|h| h.story_id == old_id)
|
||||
.expect("half-written discovery should find the orphan");
|
||||
assert!(entry.tombstoned, "orphan should be flagged as tombstoned");
|
||||
assert_eq!(entry.name, "Tombstoned Then Half-Written");
|
||||
|
||||
// Run the recovery and inspect the mapping.
|
||||
let results = recover_half_written_items();
|
||||
let mapping = results
|
||||
.iter()
|
||||
.find(|r| r.old_id == old_id)
|
||||
.expect("recovery should produce a mapping for the orphan");
|
||||
assert_ne!(mapping.new_id, mapping.old_id);
|
||||
assert_eq!(mapping.name, "Tombstoned Then Half-Written");
|
||||
|
||||
// The new id should be visible in the CRDT.
|
||||
let view = crdt_state::read_item(&mapping.new_id)
|
||||
.expect("new id must be live in CRDT after recovery");
|
||||
assert_eq!(view.name(), "Tombstoned Then Half-Written");
|
||||
// depends_on was carried across.
|
||||
assert_eq!(view.depends_on(), &[100]);
|
||||
// item_type was carried across.
|
||||
assert_eq!(view.item_type(), Some(ItemType::Refactor));
|
||||
|
||||
// The old orphan row is gone.
|
||||
assert!(
|
||||
read_content(ContentKey::Story(old_id)).is_none(),
|
||||
"orphan content should be cleared after recovery"
|
||||
);
|
||||
|
||||
// Re-running recovery is a no-op (no orphans left).
|
||||
let again = recover_half_written_items();
|
||||
assert!(
|
||||
again.iter().all(|r| r.old_id != old_id),
|
||||
"recovery should be idempotent for the same orphan"
|
||||
);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user