From 2857c3b46bad9ad9c517275932e0159c2805c2ff Mon Sep 17 00:00:00 2001 From: dave Date: Fri, 15 May 2026 12:21:17 +0000 Subject: [PATCH] =?UTF-8?q?huskies:=20merge=201094=20bug=20delete=5Fstory?= =?UTF-8?q?=20leaks=20zombie=20rows=20in=20pipeline=5Fitems=20shadow=20tab?= =?UTF-8?q?le=20=E2=80=94=20176=20tombstoned=20items=20still=20report=20no?= =?UTF-8?q?n-terminal=20stages?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/src/crdt_state/mod.rs | 8 +- server/src/crdt_state/write/migrations.rs | 150 ++++++++++++++++++++++ server/src/crdt_state/write/mod.rs | 4 +- server/src/db/mod.rs | 2 +- server/src/db/ops.rs | 36 ++++++ server/src/service/work_item/delete.rs | 5 +- server/src/startup/project.rs | 4 + 7 files changed, 201 insertions(+), 8 deletions(-) diff --git a/server/src/crdt_state/mod.rs b/server/src/crdt_state/mod.rs index 264a81c9..35656941 100644 --- a/server/src/crdt_state/mod.rs +++ b/server/src/crdt_state/mod.rs @@ -54,10 +54,10 @@ pub use types::{ }; pub use write::{ bump_retry_count, migrate_legacy_stage_strings, migrate_merge_job, migrate_names_from_slugs, - migrate_node_claims_to_agent_claims, migrate_story_ids_to_numeric, name_from_story_id, - purge_done_stage_merge_jobs, set_agent, set_depends_on, set_epic, set_item_type, set_name, - set_origin, set_plan_state, set_qa_mode, set_resume_to, set_resume_to_raw, set_retry_count, - write_item, + migrate_node_claims_to_agent_claims, migrate_story_ids_to_numeric, + migrate_zombie_pipeline_rows, name_from_story_id, purge_done_stage_merge_jobs, set_agent, + set_depends_on, set_epic, set_item_type, set_name, set_origin, set_plan_state, set_qa_mode, + set_resume_to, set_resume_to_raw, set_retry_count, write_item, }; #[cfg(test)] diff --git a/server/src/crdt_state/write/migrations.rs b/server/src/crdt_state/write/migrations.rs index f3f2f2ba..07bc8dd7 100644 --- a/server/src/crdt_state/write/migrations.rs +++ b/server/src/crdt_state/write/migrations.rs @@ 
-705,6 +705,59 @@ pub fn purge_done_stage_merge_jobs() { slog!("[crdt] Purged {count} stale MergeJob entries for terminal-stage stories"); } +/// Delete `pipeline_items` rows that correspond to CRDT-tombstoned stories. +/// +/// Pre-1094 code deleted pipeline_items via a fire-and-forget channel that +/// could be lost on an abrupt restart, leaving rows with non-terminal stage +/// values for stories that no longer exist in the CRDT. This migration +/// removes those zombie rows on startup. +/// +/// Idempotent: rows already absent are unaffected; running twice produces the +/// same result. +pub async fn migrate_zombie_pipeline_rows() { + let pool = match crate::db::get_shared_pool() { + Some(p) => p, + None => return, + }; + let tombstone_ids = crate::crdt_state::tombstoned_ids(); + sweep_zombie_rows(pool, &tombstone_ids).await; +} + +/// Inner sweep used by [`migrate_zombie_pipeline_rows`] and its tests. +/// +/// Deletes every `pipeline_items` row in `ids` whose stage is not already a +/// terminal value. Returns the number of rows deleted. 
+#[cfg_attr(test, allow(dead_code))] +pub(crate) async fn sweep_zombie_rows(pool: &sqlx::SqlitePool, ids: &[String]) -> u32 { + if ids.is_empty() { + return 0; + } + let mut cleaned = 0u32; + for story_id in ids { + match sqlx::query( + "DELETE FROM pipeline_items WHERE id = ?1 AND stage NOT IN \ + ('done','archived','abandoned','superseded','rejected')", + ) + .bind(story_id) + .execute(pool) + .await + { + Ok(r) if r.rows_affected() > 0 => cleaned += 1, + Ok(_) => {} + Err(e) => { + slog!( + "[crdt] migrate_zombie_pipeline_rows: failed to delete '{}': {e}", + story_id + ); + } + } + } + if cleaned > 0 { + slog!("[crdt] Swept {cleaned} zombie pipeline_items rows for tombstoned stories"); + } + cleaned +} + #[cfg(test)] mod merge_job_migration_tests { use super::super::super::state::init_for_test; @@ -909,3 +962,100 @@ mod merge_job_migration_tests { migrate_merge_job(std::path::Path::new("/nonexistent/pipeline.db")); } } + +#[cfg(test)] +mod zombie_row_migration_tests { + use super::super::super::state::init_for_test; + use super::*; + use sqlx::Row as _; + + async fn make_pool() -> sqlx::SqlitePool { + let options = sqlx::sqlite::SqliteConnectOptions::new() + .filename(":memory:") + .create_if_missing(true); + let pool = sqlx::pool::PoolOptions::new() + .max_connections(1) + .connect_with(options) + .await + .unwrap(); + sqlx::migrate!("./migrations").run(&pool).await.unwrap(); + pool + } + + async fn insert_row(pool: &sqlx::SqlitePool, story_id: &str, stage: &str) { + let now = chrono::Utc::now().to_rfc3339(); + sqlx::query( + "INSERT INTO pipeline_items \ + (id, name, stage, agent, retry_count, depends_on, content, created_at, updated_at) \ + VALUES (?1, ?2, ?3, NULL, 0, NULL, NULL, ?4, ?4)", + ) + .bind(story_id) + .bind(story_id) + .bind(stage) + .bind(&now) + .execute(pool) + .await + .unwrap(); + } + + async fn row_stage(pool: &sqlx::SqlitePool, story_id: &str) -> Option<String> { + sqlx::query("SELECT stage FROM pipeline_items WHERE id = ?1") + .bind(story_id) + 
.fetch_optional(pool) + .await + .unwrap() + .map(|r| r.get(0)) + } + + /// Bug 1094 regression: delete a story in `coding` stage, assert the + /// `pipeline_items` row is gone; then re-run the sweep and confirm no + /// further changes (idempotent). + #[tokio::test] + async fn sweep_removes_zombie_coding_row_and_is_idempotent() { + init_for_test(); + let pool = make_pool().await; + let story_id = "1094_zombie_regression"; + + // Seed: insert a pipeline_items row in the "coding" stage. + insert_row(&pool, story_id, "coding").await; + assert_eq!(row_stage(&pool, story_id).await.as_deref(), Some("coding")); + + // Tombstone the story in the CRDT (simulate evict_item outcome). + crate::crdt_state::write_item_str( + story_id, + "coding", + Some("Zombie regression story"), + None, + None, + None, + ); + crate::crdt_state::evict_item(story_id).ok(); + + // Run the sweep — row must be deleted. + let deleted = sweep_zombie_rows(&pool, &[story_id.to_string()]).await; + assert_eq!(deleted, 1, "expected one zombie row to be cleaned"); + assert!( + row_stage(&pool, story_id).await.is_none(), + "pipeline_items row must be gone after sweep" + ); + + // Re-run is a no-op (idempotent). + let second = sweep_zombie_rows(&pool, &[story_id.to_string()]).await; + assert_eq!(second, 0, "second sweep must be a no-op"); + } + + /// Rows already in a terminal stage must be left alone. 
+ #[tokio::test] + async fn sweep_skips_terminal_stage_rows() { + let pool = make_pool().await; + let story_id = "1094_terminal_skip"; + insert_row(&pool, story_id, "done").await; + + let deleted = sweep_zombie_rows(&pool, &[story_id.to_string()]).await; + assert_eq!(deleted, 0, "terminal-stage row must not be deleted"); + assert!( + row_stage(&pool, story_id).await.is_some(), + "terminal-stage row must survive sweep" + ); + } +} diff --git a/server/src/crdt_state/write/mod.rs b/server/src/crdt_state/write/mod.rs index 3ad85917..fb72aa4d 100644 --- a/server/src/crdt_state/write/mod.rs +++ b/server/src/crdt_state/write/mod.rs @@ -18,6 +18,6 @@ pub use item::{ pub use item::write_item_str; pub use migrations::{ migrate_legacy_stage_strings, migrate_merge_job, migrate_names_from_slugs, - migrate_node_claims_to_agent_claims, migrate_story_ids_to_numeric, name_from_story_id, - purge_done_stage_merge_jobs, + migrate_node_claims_to_agent_claims, migrate_story_ids_to_numeric, + migrate_zombie_pipeline_rows, name_from_story_id, purge_done_stage_merge_jobs, }; diff --git a/server/src/db/mod.rs b/server/src/db/mod.rs index f204ced9..5b89c843 100644 --- a/server/src/db/mod.rs +++ b/server/src/db/mod.rs @@ -29,7 +29,7 @@ pub mod shadow_write; pub use content_store::{ContentKey, all_content_ids, delete_content, read_content, write_content}; pub use ops::{ - ItemMeta, delete_item, move_item_stage, next_item_number, sync_item_name, + ItemMeta, delete_item, delete_item_sync, move_item_stage, next_item_number, sync_item_name, write_item_with_content, }; pub use shadow_write::{check_schema_drift, get_shared_pool, init}; diff --git a/server/src/db/ops.rs b/server/src/db/ops.rs index 25394378..23d717b1 100644 --- a/server/src/db/ops.rs +++ b/server/src/db/ops.rs @@ -198,6 +198,42 @@ pub fn delete_item(story_id: &str) { } } +/// Delete a story from the shadow table, awaiting the SQLite write. 
+/// +/// Unlike [`delete_item`], this function issues a direct `DELETE FROM +/// pipeline_items` via the shared pool and awaits the result — so the row +/// is gone before this function returns. Use this from async call sites +/// where durability of the deletion matters (e.g. story deletion, startup +/// migration). Falls back to the fire-and-forget channel when the shared +/// pool is not yet initialised. +pub async fn delete_item_sync(story_id: &str) { + delete_content(ContentKey::Story(story_id)); + + if let Some(pool) = super::shadow_write::get_shared_pool() { + if let Err(e) = sqlx::query("DELETE FROM pipeline_items WHERE id = ?1") + .bind(story_id) + .execute(pool) + .await + { + crate::slog_warn!( + "[db] Synchronous delete from pipeline_items failed for '{}': {e}", + story_id + ); + } + } else if let Some(db) = PIPELINE_DB.get() { + let msg = PipelineWriteMsg { + story_id: story_id.to_string(), + stage: "deleted".to_string(), + name: None, + agent: None, + retry_count: None, + depends_on: None, + content: None, + }; + let _ = db.tx.send(msg); + } +} + /// Sync the shadow table's `name` column after a CRDT name-register write. /// /// Reads the current item from the CRDT (which already holds the new name after diff --git a/server/src/service/work_item/delete.rs b/server/src/service/work_item/delete.rs index 02a40b60..5a0ac9d9 100644 --- a/server/src/service/work_item/delete.rs +++ b/server/src/service/work_item/delete.rs @@ -142,7 +142,10 @@ pub async fn delete_work_item( } // 5. Delete from database content store and shadow table. - crate::db::delete_item(story_id); + // Use the synchronous variant so the pipeline_items row is gone before we + // return — the fire-and-forget channel cannot guarantee the DELETE commits + // before a restart, which leaves zombie rows (bug 1094). + crate::db::delete_item_sync(story_id).await; slog_warn!("[delete_work_item] Deleted '{story_id}' from content store / shadow table"); // 6. 
Remove the filesystem shadow file from work/N_stage/. diff --git a/server/src/startup/project.rs b/server/src/startup/project.rs index f9b0ea4b..3c1d2dec 100644 --- a/server/src/startup/project.rs +++ b/server/src/startup/project.rs @@ -360,6 +360,10 @@ pub(crate) async fn init_subsystems(app_state: &Arc, cwd: &Path, i // Story 1052: remove stale MergeJob entries for terminal-stage // stories so they can never cause "FAILED" labels in the UI. crdt_state::purge_done_stage_merge_jobs(); + // Story 1094: delete pipeline_items rows whose CRDT entry is + // tombstoned but whose row survived with a non-terminal stage + // (pre-1094 fire-and-forget delete could be lost on restart). + crdt_state::migrate_zombie_pipeline_rows().await; } } }