From cd411ba44305f58498c3d9894154af1a08a4b8f4 Mon Sep 17 00:00:00 2001 From: Timmy Date: Wed, 13 May 2026 19:16:05 +0100 Subject: [PATCH] feat(1001): recover_half_written_items MCP tool Adds db::recover, a discovery + recovery layer for pipeline items that got half-written before the Stage 1 fix landed (content in content store + SQLite shadow, no live CRDT entry). For each orphan, the content body is re-anchored to a fresh non-tombstoned id and the old id's content row is cleared. Exposed as the recover_half_written_items MCP tool. dry_run defaults to true so the caller can review what would change before mutating. YAML front-matter parsing is hand-rolled and scoped to the three fields the create_*_file path emits (name, type, depends_on). It tolerates missing or malformed lines by falling back to safe defaults; the orphan is recovered with the best metadata we can pull from the body and the rest is left to the operator to fix up. The discovery step is read-only and idempotent. Recovery is also idempotent in the sense that once an orphan is lifted, the next discovery pass won't see it. Co-Authored-By: Claude Opus 4.7 (1M context) --- server/src/db/mod.rs | 3 + server/src/db/recover.rs | 391 ++++++++++++++++++ server/src/http/mcp/diagnostics/mod.rs | 40 ++ server/src/http/mcp/dispatch.rs | 2 + server/src/http/mcp/tools_list/mod.rs | 3 +- .../src/http/mcp/tools_list/system_tools.rs | 14 + 6 files changed, 452 insertions(+), 1 deletion(-) create mode 100644 server/src/db/recover.rs diff --git a/server/src/db/mod.rs b/server/src/db/mod.rs index b8e08d0a..d1c005c7 100644 --- a/server/src/db/mod.rs +++ b/server/src/db/mod.rs @@ -17,11 +17,14 @@ pub mod content_store; /// Write operations for the pipeline — content, stage transitions, and deletions. pub mod ops; +/// Recovery for half-written pipeline items (bug 1001 backfill). +pub mod recover; /// Background shadow-write task — persists pipeline items to SQLite asynchronously. pub mod shadow_write; pub use content_store::{ContentKey, all_content_ids, delete_content, read_content, write_content}; pub use ops::{ItemMeta, delete_item, move_item_stage, next_item_number, write_item_with_content}; +pub use recover::{find_half_written_items, recover_half_written_items}; pub use shadow_write::init; #[cfg(test)] diff --git a/server/src/db/recover.rs b/server/src/db/recover.rs new file mode 100644 index 00000000..d1b3363b --- /dev/null +++ b/server/src/db/recover.rs @@ -0,0 +1,391 @@ +//! Recovery for half-written pipeline items (bug 1001). +//! +//! A half-written item is one whose content row exists in the content store / +//! SQLite shadow but whose CRDT entry is absent or tombstoned. These items +//! are invisible to every CRDT-driven read path (`list_refactors`, +//! `get_pipeline_status`, `read_item`, …), and `update_story` / `delete_story` +//! / `purge_story` all refuse to operate on them. They're effectively +//! orphaned. +//! +//! This module discovers them and lifts each one onto a fresh non-tombstoned +//! ID so the content becomes addressable again. +//! +//! The Stage 1 commit (`fix(1001):`) closes the door on *new* half-writes; +//! this module exists to clean up the rows that were created before that +//! commit landed. + +use crate::crdt_state; +use crate::db::content_store::{ContentKey, all_content_ids, read_content}; +use crate::db::ops::{ItemMeta, delete_item, next_item_number, write_item_with_content}; +use crate::io::story_metadata::ItemType; + +/// A pipeline item whose content row exists but whose CRDT entry is missing +/// or tombstoned. +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)] +pub struct HalfWritten { + /// The orphaned numeric story_id (as a string, matching CRDT keys). + pub story_id: String, + /// Best-effort human name pulled from the YAML front matter. Empty if + /// the front matter was unparseable. + pub name: String, + /// True if the CRDT has a tombstone for this id; false if the id is + /// simply absent (which can happen if the CRDT op was lost or never + /// applied for reasons other than a tombstone). + pub tombstoned: bool, +} + +/// The outcome of recovering a single half-written item. +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)] +pub struct RecoveryResult { + /// The orphan's numeric id at the time of discovery. + pub old_id: String, + /// The fresh id the content was re-anchored to. + pub new_id: String, + /// Best-effort human name pulled from the YAML front matter. + pub name: String, +} + +/// Find all half-written items currently present on this node. +/// +/// Scans every id known to the in-memory content store and returns the ones +/// for which `crdt_state::read_item` is `None` (i.e. there is no live CRDT +/// entry). The `tombstoned` flag distinguishes the most common cause +/// (allocator collided with a tombstone) from rarer split-brain causes. +pub fn find_half_written_items() -> Vec { + let mut out = Vec::new(); + for id in all_content_ids() { + if crdt_state::read_item(&id).is_some() { + continue; + } + // No live CRDT entry — orphan. + let content = read_content(ContentKey::Story(&id)).unwrap_or_default(); + let name = parse_name_from_front_matter(&content).unwrap_or_default(); + out.push(HalfWritten { + story_id: id.clone(), + name, + tombstoned: crdt_state::is_tombstoned(&id), + }); + } + out.sort_by(|a, b| a.story_id.cmp(&b.story_id)); + out +} + +/// Move every half-written item's content onto a fresh non-tombstoned id. +/// +/// For each orphan, this: +/// 1. Allocates a fresh id via `next_item_number` (which now skips +/// tombstones). +/// 2. Re-inserts the same content body at the new id with stage = backlog +/// and the parsed name. +/// 3. Re-applies `item_type` and `depends_on` from the original YAML front +/// matter where parseable. +/// 4. Verifies the new entry is visible via `read_item`. On verification +/// failure, the new id's content is rolled back (defence-in-depth) and +/// the orphan is left in place. +/// 5. Deletes the orphan's content-store + shadow-DB rows via `delete_item`. +/// +/// Returns a (`old_id`, `new_id`, `name`) mapping for every successful +/// recovery. Orphans that failed to recover are simply omitted from the +/// returned list and logged at WARN level. +pub fn recover_half_written_items() -> Vec { + let orphans = find_half_written_items(); + let mut results = Vec::with_capacity(orphans.len()); + for orphan in orphans { + match recover_one(&orphan) { + Ok(result) => results.push(result), + Err(e) => { + crate::slog_warn!( + "[db::recover] could not recover half-written id '{id}': {e}", + id = orphan.story_id + ); + } + } + } + results +} + +fn recover_one(orphan: &HalfWritten) -> Result { + let content = read_content(ContentKey::Story(&orphan.story_id)) + .ok_or_else(|| "content store row vanished between discovery and recovery".to_string())?; + + // Best-effort metadata extraction from the YAML front matter. These all + // fall back to safe defaults if the line is missing or malformed. + let name = parse_name_from_front_matter(&content).unwrap_or_default(); + let item_type = parse_type_from_front_matter(&content); + let depends_on = parse_depends_on_from_front_matter(&content); + + let new_number = next_item_number(); + let new_id = new_number.to_string(); + + // Sanity: the allocator should never hand back a tombstone, but bail if + // it somehow did so we don't reproduce the original bug. + if crdt_state::is_tombstoned(&new_id) { + return Err(format!( + "allocator returned tombstoned id '{new_id}' for recovery — refusing to write" + )); + } + + write_item_with_content( + &new_id, + "1_backlog", + &content, + ItemMeta { + name: if name.is_empty() { + None + } else { + Some(name.clone()) + }, + ..Default::default() + }, + ); + if let Some(t) = item_type { + crdt_state::set_item_type(&new_id, Some(t)); + } + if !depends_on.is_empty() { + crdt_state::set_depends_on(&new_id, &depends_on); + } + + // Verify the new id materialised before we drop the orphan row. + if crdt_state::read_item(&new_id).is_none() { + // The new id didn't take — extremely unlikely after the Stage 1 + // fixes, but if it happens, roll back the new write and leave the + // orphan in place for a human to look at. + delete_item(&new_id); + return Err(format!( + "newly-allocated id '{new_id}' did not register in CRDT" + )); + } + + // Drop the orphan content/shadow row. + delete_item(&orphan.story_id); + + Ok(RecoveryResult { + old_id: orphan.story_id.clone(), + new_id, + name, + }) +} + +/// Pull the YAML front-matter block (everything between the first two `---` +/// lines, if any) as a slice of source lines. +fn front_matter_lines(content: &str) -> Vec<&str> { + let mut iter = content.lines(); + let Some(first) = iter.next() else { + return Vec::new(); + }; + if first.trim() != "---" { + return Vec::new(); + } + let mut out = Vec::new(); + for line in iter { + if line.trim() == "---" { + return out; + } + out.push(line); + } + Vec::new() // no closing `---` +} + +/// Parse `name: "..."` (or unquoted) from the YAML front matter. +fn parse_name_from_front_matter(content: &str) -> Option { + for line in front_matter_lines(content) { + let trimmed = line.trim_start(); + if let Some(rest) = trimmed.strip_prefix("name:") { + let value = rest.trim(); + // Strip optional surrounding quotes. + let unquoted = value + .strip_prefix('"') + .and_then(|s| s.strip_suffix('"')) + .unwrap_or(value); + if unquoted.is_empty() { + return None; + } + return Some(unquoted.to_string()); + } + } + None +} + +/// Parse `type: ` from the YAML front matter. +fn parse_type_from_front_matter(content: &str) -> Option { + for line in front_matter_lines(content) { + let trimmed = line.trim_start(); + if let Some(rest) = trimmed.strip_prefix("type:") { + let value = rest.trim(); + return ItemType::from_str(value); + } + } + None +} + +/// Parse `depends_on: [a, b, c]` from the YAML front matter. +/// +/// Tolerates trailing whitespace and odd spacing; ignores entries that don't +/// parse as `u32`. +fn parse_depends_on_from_front_matter(content: &str) -> Vec { + for line in front_matter_lines(content) { + let trimmed = line.trim_start(); + if let Some(rest) = trimmed.strip_prefix("depends_on:") { + let value = rest.trim(); + // Expect a `[ ... ]` array literal. + let inner = value + .strip_prefix('[') + .and_then(|s| s.strip_suffix(']')) + .unwrap_or(""); + return inner + .split(',') + .filter_map(|s| s.trim().parse::().ok()) + .collect(); + } + } + Vec::new() +} + +#[cfg(test)] +mod tests { + use super::*; + + /// Build a story body that mirrors what create_*_file writes. + fn body(name: &str, item_type: &str, depends_on: Option<&[u32]>) -> String { + let mut s = String::from("---\n"); + s.push_str(&format!("type: {item_type}\n")); + s.push_str(&format!("name: \"{}\"\n", name.replace('"', "\\\""))); + if let Some(deps) = depends_on + && !deps.is_empty() + { + let parts: Vec = deps.iter().map(u32::to_string).collect(); + s.push_str(&format!("depends_on: [{}]\n", parts.join(", "))); + } + s.push_str("---\n\n# Heading\n\n## Acceptance Criteria\n\n- [ ] Real AC\n"); + s + } + + #[test] + fn front_matter_parsers_extract_name_type_and_depends_on() { + let content = body("My Refactor: typed Stage", "refactor", Some(&[42, 99])); + assert_eq!( + parse_name_from_front_matter(&content).as_deref(), + Some("My Refactor: typed Stage") + ); + assert_eq!( + parse_type_from_front_matter(&content), + Some(ItemType::Refactor) + ); + assert_eq!(parse_depends_on_from_front_matter(&content), vec![42, 99]); + } + + #[test] + fn front_matter_parsers_handle_missing_optional_fields() { + let content = body("Plain Story", "story", None); + assert_eq!( + parse_name_from_front_matter(&content).as_deref(), + Some("Plain Story") + ); + assert_eq!( + parse_depends_on_from_front_matter(&content), + Vec::::new() + ); + } + + #[test] + fn front_matter_parsers_handle_no_front_matter() { + let content = "no front matter here\n"; + assert!(parse_name_from_front_matter(content).is_none()); + assert!(parse_type_from_front_matter(content).is_none()); + assert!(parse_depends_on_from_front_matter(content).is_empty()); + } + + #[test] + fn find_returns_empty_when_all_items_have_crdt_entries() { + crdt_state::init_for_test(); + crate::db::ensure_content_store(); + write_item_with_content( + "9800", + "1_backlog", + &body("Healthy", "refactor", None), + ItemMeta::named("Healthy"), + ); + let half = find_half_written_items(); + assert!( + half.iter().all(|h| h.story_id != "9800"), + "9800 should not be reported as half-written: {half:?}" + ); + } + + #[test] + fn recover_lifts_half_written_item_to_a_fresh_id() { + crdt_state::init_for_test(); + crate::db::ensure_content_store(); + + // Seed an item, tombstone it (this leaves a tombstone in the CRDT + // set), then manually write content at the tombstoned id to + // simulate a pre-fix half-write. + let old_id = "9801"; + let content = body("Tombstoned Then Half-Written", "refactor", Some(&[100])); + write_item_with_content( + old_id, + "1_backlog", + &content, + ItemMeta::named("Tombstoned Then Half-Written"), + ); + crdt_state::evict_item(old_id).expect("evict should succeed"); + // Manually re-write content at the now-tombstoned id to mimic the + // pre-fix half-write (write_item will silently reject the CRDT side). + crate::db::content_store::write_content(ContentKey::Story(old_id), &content); + crdt_state::write_item( + old_id, + &crate::pipeline_state::Stage::Backlog, + Some("Tombstoned Then Half-Written"), + None, + None, + None, + None, + None, + None, + ); + assert!( + crdt_state::read_item(old_id).is_none(), + "test setup is wrong — old id should still be invisible after the tombstone-blocked write_item" + ); + + // Sanity: find_half_written_items picks it up. + let half = find_half_written_items(); + let entry = half + .iter() + .find(|h| h.story_id == old_id) + .expect("half-written discovery should find the orphan"); + assert!(entry.tombstoned, "orphan should be flagged as tombstoned"); + assert_eq!(entry.name, "Tombstoned Then Half-Written"); + + // Run the recovery and inspect the mapping. + let results = recover_half_written_items(); + let mapping = results + .iter() + .find(|r| r.old_id == old_id) + .expect("recovery should produce a mapping for the orphan"); + assert_ne!(mapping.new_id, mapping.old_id); + assert_eq!(mapping.name, "Tombstoned Then Half-Written"); + + // The new id should be visible in the CRDT. + let view = crdt_state::read_item(&mapping.new_id) + .expect("new id must be live in CRDT after recovery"); + assert_eq!(view.name(), "Tombstoned Then Half-Written"); + // depends_on was carried across. + assert_eq!(view.depends_on(), &[100]); + // item_type was carried across. + assert_eq!(view.item_type(), Some(ItemType::Refactor)); + + // The old orphan row is gone. + assert!( + read_content(ContentKey::Story(old_id)).is_none(), + "orphan content should be cleared after recovery" + ); + + // Re-running recovery is a no-op (no orphans left). + let again = recover_half_written_items(); + assert!( + again.iter().all(|r| r.old_id != old_id), + "recovery should be idempotent for the same orphan" + ); + } +} diff --git a/server/src/http/mcp/diagnostics/mod.rs b/server/src/http/mcp/diagnostics/mod.rs index ad013c38..56d16746 100644 --- a/server/src/http/mcp/diagnostics/mod.rs +++ b/server/src/http/mcp/diagnostics/mod.rs @@ -115,6 +115,46 @@ pub(crate) fn tool_dump_crdt(args: &Value) -> Result { .map_err(|e| format!("Serialization error: {e}")) } +/// MCP tool: discover or recover half-written pipeline items (bug 1001). +/// +/// A half-written item is one whose content row is in the content store / +/// SQLite shadow but whose CRDT entry is absent or tombstoned. Such items +/// are invisible to every CRDT-driven read path and to `delete_story` / +/// `purge_story`, so they need explicit recovery. +/// +/// With `dry_run = true` (default), returns the list of discovered half- +/// writes without mutating anything. With `dry_run = false`, lifts each +/// orphan onto a fresh non-tombstoned id and returns the old→new mapping. +pub(crate) fn tool_recover_half_written_items(args: &Value) -> Result { + let dry_run = args + .get("dry_run") + .and_then(|v| v.as_bool()) + .unwrap_or(true); + + if dry_run { + let half = crate::db::find_half_written_items(); + return serde_json::to_string_pretty(&json!({ + "dry_run": true, + "found": half, + "count": half.len(), + "message": format!( + "Discovered {} half-written item(s). Re-run with dry_run=false to recover them.", + half.len() + ), + })) + .map_err(|e| format!("Serialization error: {e}")); + } + + let results = crate::db::recover_half_written_items(); + serde_json::to_string_pretty(&json!({ + "dry_run": false, + "recovered": results, + "count": results.len(), + "message": format!("Recovered {} half-written item(s).", results.len()), + })) + .map_err(|e| format!("Serialization error: {e}")) +} + /// MCP tool: return the server version, build hash, and running port. pub(crate) fn tool_get_version(ctx: &AppContext) -> Result { let build_hash = diff --git a/server/src/http/mcp/dispatch.rs b/server/src/http/mcp/dispatch.rs index ad5cecf6..62bb1e73 100644 --- a/server/src/http/mcp/dispatch.rs +++ b/server/src/http/mcp/dispatch.rs @@ -93,6 +93,8 @@ pub async fn dispatch_tool_call( "purge_story" => story_tools::tool_purge_story(&args, ctx), // Debug CRDT dump (story 515) "dump_crdt" => diagnostics::tool_dump_crdt(&args), + // Recover half-written pipeline items (bug 1001) + "recover_half_written_items" => diagnostics::tool_recover_half_written_items(&args), // Read-only peer mesh diagnostics (story 720) "mesh_status" => diagnostics::tool_mesh_status(&args), // Arbitrary pipeline movement diff --git a/server/src/http/mcp/tools_list/mod.rs b/server/src/http/mcp/tools_list/mod.rs index 3234c9ab..971492a9 100644 --- a/server/src/http/mcp/tools_list/mod.rs +++ b/server/src/http/mcp/tools_list/mod.rs @@ -96,6 +96,7 @@ mod tests { assert!(names.contains(&"status")); assert!(names.contains(&"loc_file")); assert!(names.contains(&"dump_crdt")); + assert!(names.contains(&"recover_half_written_items")); assert!(names.contains(&"get_version")); assert!(names.contains(&"remove_criterion")); assert!(names.contains(&"mesh_status")); @@ -106,7 +107,7 @@ mod tests { assert!(names.contains(&"show_epic")); assert!(names.contains(&"freeze_story")); assert!(names.contains(&"unfreeze_story")); - assert_eq!(tools.len(), 74); + assert_eq!(tools.len(), 75); } #[test] diff --git a/server/src/http/mcp/tools_list/system_tools.rs b/server/src/http/mcp/tools_list/system_tools.rs index 7e0f09d5..ef269aa4 100644 --- a/server/src/http/mcp/tools_list/system_tools.rs +++ b/server/src/http/mcp/tools_list/system_tools.rs @@ -288,6 +288,20 @@ pub(super) fn system_tools() -> Vec { "required": [] } }), + json!({ + "name": "recover_half_written_items", + "description": "Discover (and optionally recover) half-written pipeline items — rows whose content is in the SQLite shadow and content store but whose CRDT entry is absent or tombstoned. These are invisible to every CRDT-driven read path and can't be cleaned up by delete_story / purge_story. Recovery moves each orphan's content onto a fresh non-tombstoned id and reports the old→new mapping. Defaults to dry_run=true so you can see what would change before mutating.", + "inputSchema": { + "type": "object", + "properties": { + "dry_run": { + "type": "boolean", + "description": "When true (default), only discover half-written items and report them. When false, perform the recovery." + } + }, + "required": [] + } + }), json!({ "name": "wizard_status", "description": "Return the current setup wizard state: which step is active, and which are done/skipped/pending. Use this to inspect progress before calling wizard_generate, wizard_confirm, wizard_skip, or wizard_retry.",