huskies: merge 1094 bug delete_story leaks zombie rows in pipeline_items shadow table — 176 tombstoned items still report non-terminal stages

This commit is contained in:
dave
2026-05-15 12:21:17 +00:00
parent d944885ce9
commit 2857c3b46b
7 changed files with 201 additions and 8 deletions
+4 -4
View File
@@ -54,10 +54,10 @@ pub use types::{
};
pub use write::{
bump_retry_count, migrate_legacy_stage_strings, migrate_merge_job, migrate_names_from_slugs,
migrate_node_claims_to_agent_claims, migrate_story_ids_to_numeric, name_from_story_id,
purge_done_stage_merge_jobs, set_agent, set_depends_on, set_epic, set_item_type, set_name,
set_origin, set_plan_state, set_qa_mode, set_resume_to, set_resume_to_raw, set_retry_count,
write_item,
migrate_node_claims_to_agent_claims, migrate_story_ids_to_numeric,
migrate_zombie_pipeline_rows, name_from_story_id, purge_done_stage_merge_jobs, set_agent,
set_depends_on, set_epic, set_item_type, set_name, set_origin, set_plan_state, set_qa_mode,
set_resume_to, set_resume_to_raw, set_retry_count, write_item,
};
#[cfg(test)]
+150
View File
@@ -705,6 +705,59 @@ pub fn purge_done_stage_merge_jobs() {
slog!("[crdt] Purged {count} stale MergeJob entries for terminal-stage stories");
}
/// Delete `pipeline_items` rows that correspond to CRDT-tombstoned stories.
///
/// Pre-1094 code deleted pipeline_items via a fire-and-forget channel that
/// could be lost on an abrupt restart, leaving rows with non-terminal stage
/// values for stories that no longer exist in the CRDT. This migration
/// removes those zombie rows on startup.
///
/// Idempotent: rows already absent are unaffected; running twice produces the
/// same result.
pub async fn migrate_zombie_pipeline_rows() {
let pool = match crate::db::get_shared_pool() {
Some(p) => p,
None => return,
};
let tombstone_ids = crate::crdt_state::tombstoned_ids();
sweep_zombie_rows(pool, &tombstone_ids).await;
}
/// Inner sweep used by [`migrate_zombie_pipeline_rows`] and its tests.
///
/// Deletes every `pipeline_items` row in `ids` whose stage is not already a
/// terminal value. Returns the number of rows deleted.
#[cfg_attr(test, allow(dead_code))]
pub(crate) async fn sweep_zombie_rows(pool: &sqlx::SqlitePool, ids: &[String]) -> u32 {
if ids.is_empty() {
return 0;
}
let mut cleaned = 0u32;
for story_id in ids {
match sqlx::query(
"DELETE FROM pipeline_items WHERE id = ?1 AND stage NOT IN \
('done','archived','abandoned','superseded','rejected')",
)
.bind(story_id)
.execute(pool)
.await
{
Ok(r) if r.rows_affected() > 0 => cleaned += 1,
Ok(_) => {}
Err(e) => {
slog!(
"[crdt] migrate_zombie_pipeline_rows: failed to delete '{}': {e}",
story_id
);
}
}
}
if cleaned > 0 {
slog!("[crdt] Swept {cleaned} zombie pipeline_items rows for tombstoned stories");
}
cleaned
}
#[cfg(test)]
mod merge_job_migration_tests {
use super::super::super::state::init_for_test;
@@ -909,3 +962,100 @@ mod merge_job_migration_tests {
migrate_merge_job(std::path::Path::new("/nonexistent/pipeline.db"));
}
}
#[cfg(test)]
mod zombie_row_migration_tests {
use super::super::super::state::init_for_test;
use super::*;
use sqlx::Row as _;
async fn make_pool() -> sqlx::SqlitePool {
let options = sqlx::sqlite::SqliteConnectOptions::new()
.filename(":memory:")
.create_if_missing(true);
let pool = sqlx::pool::PoolOptions::new()
.max_connections(1)
.connect_with(options)
.await
.unwrap();
sqlx::migrate!("./migrations").run(&pool).await.unwrap();
pool
}
async fn insert_row(pool: &sqlx::SqlitePool, story_id: &str, stage: &str) {
let now = chrono::Utc::now().to_rfc3339();
sqlx::query(
"INSERT INTO pipeline_items \
(id, name, stage, agent, retry_count, depends_on, content, created_at, updated_at) \
VALUES (?1, ?2, ?3, NULL, 0, NULL, NULL, ?4, ?4)",
)
.bind(story_id)
.bind(story_id)
.bind(stage)
.bind(&now)
.execute(pool)
.await
.unwrap();
}
async fn row_stage(pool: &sqlx::SqlitePool, story_id: &str) -> Option<String> {
sqlx::query("SELECT stage FROM pipeline_items WHERE id = ?1")
.bind(story_id)
.fetch_optional(pool)
.await
.unwrap()
.map(|r| r.get(0))
}
/// Bug 1094 regression: delete a story in `coding` stage, assert the
/// `pipeline_items` row is gone; then re-run the sweep and confirm no
/// further changes (idempotent).
#[tokio::test]
async fn sweep_removes_zombie_coding_row_and_is_idempotent() {
init_for_test();
let pool = make_pool().await;
let story_id = "1094_zombie_regression";
// Seed: insert a pipeline_items row in the "coding" stage.
insert_row(&pool, story_id, "coding").await;
assert_eq!(row_stage(&pool, story_id).await.as_deref(), Some("coding"));
// Tombstone the story in the CRDT (simulate evict_item outcome).
crate::crdt_state::write_item_str(
story_id,
"coding",
Some("Zombie regression story"),
None,
None,
None,
);
crate::crdt_state::evict_item(story_id).ok();
// Run the sweep — row must be deleted.
let deleted = sweep_zombie_rows(&pool, &[story_id.to_string()]).await;
assert_eq!(deleted, 1, "expected one zombie row to be cleaned");
assert!(
row_stage(&pool, story_id).await.is_none(),
"pipeline_items row must be gone after sweep"
);
// Re-run is a no-op (idempotent).
let second = sweep_zombie_rows(&pool, &[story_id.to_string()]).await;
assert_eq!(second, 0, "second sweep must be a no-op");
}
/// Rows already in a terminal stage must be left alone.
#[tokio::test]
async fn sweep_skips_terminal_stage_rows() {
let pool = make_pool().await;
let story_id = "1094_terminal_skip";
insert_row(&pool, story_id, "done").await;
let deleted = sweep_zombie_rows(&pool, &[story_id.to_string()]).await;
assert_eq!(deleted, 0, "terminal-stage row must not be deleted");
assert!(
row_stage(&pool, story_id).await.is_some(),
"terminal-stage row must survive sweep"
);
}
}
+2 -2
View File
@@ -18,6 +18,6 @@ pub use item::{
pub use item::write_item_str;
pub use migrations::{
migrate_legacy_stage_strings, migrate_merge_job, migrate_names_from_slugs,
migrate_node_claims_to_agent_claims, migrate_story_ids_to_numeric, name_from_story_id,
purge_done_stage_merge_jobs,
migrate_node_claims_to_agent_claims, migrate_story_ids_to_numeric,
migrate_zombie_pipeline_rows, name_from_story_id, purge_done_stage_merge_jobs,
};
+1 -1
View File
@@ -29,7 +29,7 @@ pub mod shadow_write;
pub use content_store::{ContentKey, all_content_ids, delete_content, read_content, write_content};
pub use ops::{
ItemMeta, delete_item, move_item_stage, next_item_number, sync_item_name,
ItemMeta, delete_item, delete_item_sync, move_item_stage, next_item_number, sync_item_name,
write_item_with_content,
};
pub use shadow_write::{check_schema_drift, get_shared_pool, init};
+36
View File
@@ -198,6 +198,42 @@ pub fn delete_item(story_id: &str) {
}
}
/// Delete a story from the shadow table, awaiting the SQLite write.
///
/// Unlike [`delete_item`], this function issues a direct `DELETE FROM
/// pipeline_items` via the shared pool and awaits the result — so the row
/// is gone before this function returns. Use this from async call sites
/// where durability of the deletion matters (e.g. story deletion, startup
/// migration). Falls back to the fire-and-forget channel when the shared
/// pool is not yet initialised.
pub async fn delete_item_sync(story_id: &str) {
delete_content(ContentKey::Story(story_id));
if let Some(pool) = super::shadow_write::get_shared_pool() {
if let Err(e) = sqlx::query("DELETE FROM pipeline_items WHERE id = ?1")
.bind(story_id)
.execute(pool)
.await
{
crate::slog_warn!(
"[db] Synchronous delete from pipeline_items failed for '{}': {e}",
story_id
);
}
} else if let Some(db) = PIPELINE_DB.get() {
let msg = PipelineWriteMsg {
story_id: story_id.to_string(),
stage: "deleted".to_string(),
name: None,
agent: None,
retry_count: None,
depends_on: None,
content: None,
};
let _ = db.tx.send(msg);
}
}
/// Sync the shadow table's `name` column after a CRDT name-register write.
///
/// Reads the current item from the CRDT (which already holds the new name after
+4 -1
View File
@@ -142,7 +142,10 @@ pub async fn delete_work_item(
}
// 5. Delete from database content store and shadow table.
crate::db::delete_item(story_id);
// Use the synchronous variant so the pipeline_items row is gone before we
// return — the fire-and-forget channel cannot guarantee the DELETE commits
// before a restart, which leaves zombie rows (bug 1094).
crate::db::delete_item_sync(story_id).await;
slog_warn!("[delete_work_item] Deleted '{story_id}' from content store / shadow table");
// 6. Remove the filesystem shadow file from work/N_stage/.
+4
View File
@@ -360,6 +360,10 @@ pub(crate) async fn init_subsystems(app_state: &Arc<SessionState>, cwd: &Path, i
// Story 1052: remove stale MergeJob entries for terminal-stage
// stories so they can never cause "FAILED" labels in the UI.
crdt_state::purge_done_stage_merge_jobs();
// Story 1094: delete pipeline_items rows whose CRDT entry is
// tombstoned but whose row survived with a non-terminal stage
// (pre-1094 fire-and-forget delete could be lost on restart).
crdt_state::migrate_zombie_pipeline_rows().await;
}
}
}