//! SQLite storage layer — content store, shadow writes, and CRDT op persistence.
//!
//! The CRDT layer (`crdt_state`) is the primary source of truth for pipeline
//! metadata (stage, name, agent, etc.). This module provides:
//!
//! 1. **Content store** — an in-memory `HashMap<story_id, markdown>` backed
//!    by the `pipeline_items.content` column. Provides fast synchronous
//!    reads for MCP tools and other callers.
//!
//! 2. **Shadow-write channel** — a fire-and-forget background task that
//!    upserts `pipeline_items` rows so the database always has a full copy
//!    of story content plus metadata.
//!
//! On startup, existing content is loaded from the database into memory so
//! no filesystem scan is needed after migration.
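//!
//! A minimal usage sketch of the content-store API re-exported below
//! (illustrative; the story ID is hypothetical):
//!
//! ```ignore
//! use crate::db::{delete_content, read_content, write_content, ContentKey};
//!
//! write_content(ContentKey::Story("12_story_example"), "# Example\n");
//! assert_eq!(
//!     read_content(ContentKey::Story("12_story_example")).as_deref(),
//!     Some("# Example\n"),
//! );
//! delete_content(ContentKey::Story("12_story_example"));
//! ```
/// In-memory content store keyed by story ID, backed by the
/// `pipeline_items.content` column.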
pub mod content_store;
/// Content-store garbage collection: TransitionFired subscriber and startup sweep.
pub mod gc;
/// Write operations for the pipeline — content, stage transitions, and deletions.
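///
/// A typical create-then-promote sequence, mirroring the tests below
/// (illustrative; the story ID and stage strings are examples):
///
/// ```ignore
/// use crate::db::{move_item_stage, write_item_with_content, ItemMeta};
///
/// write_item_with_content(
///     "12_story_example",
///     "1_backlog",
///     "# Body\n",
///     ItemMeta { name: Some("Example".into()), ..ItemMeta::default() },
/// );
/// // Trailing `None` matches how the tests below invoke it.
/// move_item_stage("12_story_example", "2_current", None);
/// ```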
pub mod ops;
/// Recovery for half-written pipeline items (bug 1001 backfill).
///
/// Exposed via `diagnostics::tool_find_orphaned_items` and
/// `diagnostics::tool_recover_half_written_items` MCP tools.
pub mod recover;
/// Background shadow-write task — persists pipeline items to SQLite asynchronously.
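///
/// `check_schema_drift` (re-exported below) reports migration rows present
/// in the database but unknown to this binary. Illustrative use (the report
/// fields are those the tests assert on):
///
/// ```ignore
/// let drift = check_schema_drift(&pool).await;
/// for m in &drift {
///     eprintln!("unknown migration {} ({})", m.version, m.description);
/// }
/// ```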
pub mod shadow_write;
pub use content_store::{ContentKey, all_content_ids, delete_content, read_content, write_content};
pub use ops::{ItemMeta, delete_item, move_item_stage, next_item_number, write_item_with_content};
pub use shadow_write::{check_schema_drift, get_shared_pool, init};
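// Test-only re-export so tests can initialise the content store directly.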
#[cfg(test)]
pub use content_store::ensure_content_store;
#[cfg(test)]
mod tests {
use super::*;
use std::fs;
/// Helper: write a story `.md` file with the given content under `dir`.
fn write_story(dir: &std::path::Path, filename: &str, content: &str) {
fs::write(dir.join(filename), content).unwrap();
}
#[tokio::test]
async fn shadow_write_inserts_row_into_sqlite() {
let tmp = tempfile::tempdir().unwrap();
let db_path = tmp.path().join("pipeline.db");
// Initialise the DB in an isolated pool (not the global singleton, to
// keep tests hermetic).
let options = sqlx::sqlite::SqliteConnectOptions::new()
.filename(&db_path)
.create_if_missing(true);
let pool = sqlx::SqlitePool::connect_with(options).await.unwrap();
sqlx::migrate!("./migrations").run(&pool).await.unwrap();
// Write a story file in a temp stage dir.
let stage_dir = tmp.path().join("2_current");
fs::create_dir_all(&stage_dir).unwrap();
let story_path = stage_dir.join("10_story_shadow_test.md");
write_story(
&stage_dir,
"10_story_shadow_test.md",
"---\nname: Shadow Test\nagent: coder-opus\nretry_count: 2\n---\n# Story\n",
);
// Perform the upsert directly (bypass the global singleton).
let now = chrono::Utc::now().to_rfc3339();
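// Note: `?8` is bound once and used for both created_at and updated_at,
// and COALESCE keeps the existing content whenever the upsert passes NULL.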
sqlx::query(
"INSERT INTO pipeline_items \
(id, name, stage, agent, retry_count, depends_on, content, created_at, updated_at) \
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?8) \
ON CONFLICT(id) DO UPDATE SET \
name = excluded.name, \
stage = excluded.stage, \
agent = excluded.agent, \
retry_count = excluded.retry_count, \
depends_on = excluded.depends_on, \
content = COALESCE(excluded.content, pipeline_items.content), \
updated_at = excluded.updated_at",
)
.bind("10_story_shadow_test")
.bind("Shadow Test")
.bind("2_current")
.bind("coder-opus")
.bind(2_i64)
.bind(Option::<String>::None)
.bind("---\nname: Shadow Test\n---\n# Story\n")
.bind(&now)
.execute(&pool)
.await
.unwrap();
// Query back and verify.
let row: (String, Option<String>, String) =
sqlx::query_as("SELECT id, name, stage FROM pipeline_items WHERE id = ?1")
.bind("10_story_shadow_test")
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(row.0, "10_story_shadow_test");
assert_eq!(row.1.as_deref(), Some("Shadow Test"));
assert_eq!(row.2, "2_current");
// The shadow row's name + retry_count came through from the INSERT
// params above; that's what the test exercises. Story 929 dropped
// the redundant "re-parse the YAML body to double-check" step that
// used to live here.
let _ = story_path;
}
#[tokio::test]
async fn pipeline_items_table_has_content_column() {
let tmp = tempfile::tempdir().unwrap();
let db_path = tmp.path().join("pipeline.db");
let options = sqlx::sqlite::SqliteConnectOptions::new()
.filename(&db_path)
.create_if_missing(true);
let pool = sqlx::SqlitePool::connect_with(options).await.unwrap();
sqlx::migrate!("./migrations").run(&pool).await.unwrap();
// Verify content column exists by inserting a full row.
let now = chrono::Utc::now().to_rfc3339();
let content = "---\nname: Test\n---\n# Story\n";
sqlx::query(
"INSERT INTO pipeline_items \
(id, name, stage, agent, retry_count, depends_on, content, created_at, updated_at) \
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?8)",
)
.bind("99_story_col_test")
.bind(Option::<String>::None)
.bind("1_backlog")
.bind(Option::<String>::None)
.bind(Option::<i64>::None)
.bind(Option::<String>::None)
.bind(content)
.bind(&now)
.execute(&pool)
.await
.unwrap();
let row: (Option<String>,) =
sqlx::query_as("SELECT content FROM pipeline_items WHERE id = ?1")
.bind("99_story_col_test")
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(row.0.as_deref(), Some(content));
}
#[tokio::test]
async fn upsert_updates_stage_on_move() {
let tmp = tempfile::tempdir().unwrap();
let db_path = tmp.path().join("pipeline.db");
let options = sqlx::sqlite::SqliteConnectOptions::new()
.filename(&db_path)
.create_if_missing(true);
let pool = sqlx::SqlitePool::connect_with(options).await.unwrap();
sqlx::migrate!("./migrations").run(&pool).await.unwrap();
let now = chrono::Utc::now().to_rfc3339();
// Insert initial row in backlog.
sqlx::query(
"INSERT INTO pipeline_items \
(id, name, stage, agent, retry_count, depends_on, content, created_at, updated_at) \
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?8)",
)
.bind("5_story_move")
.bind("Move Me")
.bind("1_backlog")
.bind(Option::<String>::None)
.bind(Option::<i64>::None)
.bind(Option::<String>::None)
.bind("---\nname: Move Me\n---\n")
.bind(&now)
.execute(&pool)
.await
.unwrap();
// Upsert with new stage (simulating move to current).
sqlx::query(
"INSERT INTO pipeline_items \
(id, name, stage, agent, retry_count, depends_on, content, created_at, updated_at) \
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?8) \
ON CONFLICT(id) DO UPDATE SET \
name = excluded.name, \
stage = excluded.stage, \
agent = excluded.agent, \
retry_count = excluded.retry_count, \
depends_on = excluded.depends_on, \
content = COALESCE(excluded.content, pipeline_items.content), \
updated_at = excluded.updated_at",
)
.bind("5_story_move")
.bind("Move Me")
.bind("2_current")
.bind(Option::<String>::None)
.bind(Option::<i64>::None)
.bind(Option::<String>::None)
.bind(Option::<String>::None) // content NULL → COALESCE preserves existing
.bind(&now)
.execute(&pool)
.await
.unwrap();
let row: (String,) = sqlx::query_as("SELECT stage FROM pipeline_items WHERE id = ?1")
.bind("5_story_move")
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(row.0, "2_current");
}
#[test]
fn content_store_read_write_delete() {
ensure_content_store();
let story_id = "100_story_content_test";
let markdown = "---\nname: Content Test\n---\n# Story\n";
// Write.
write_content(ContentKey::Story(story_id), markdown);
assert_eq!(
read_content(ContentKey::Story(story_id)).as_deref(),
Some(markdown)
);
// Overwrite.
let updated = "---\nname: Updated\n---\n# Updated Story\n";
write_content(ContentKey::Story(story_id), updated);
assert_eq!(
read_content(ContentKey::Story(story_id)).as_deref(),
Some(updated)
);
// Delete.
delete_content(ContentKey::Story(story_id));
assert!(read_content(ContentKey::Story(story_id)).is_none());
}
#[test]
fn next_item_number_is_at_least_1() {
// `next_item_number` must hand out a positive ID even when this test
// seeds nothing. The global CRDT/content store is shared across tests
// and may or may not be initialised, so we assert a lower bound rather
// than an exact value.
let n = next_item_number();
assert!(n >= 1);
}
/// Regression test for bug 1001: `next_item_number` must not hand out a
/// tombstoned ID. Without this fix, an `evict_item` followed by a fresh
/// create reuses the tombstoned numeric slot, producing a half-written
/// item (content store + shadow DB accept; CRDT silently rejects).
#[test]
fn next_item_number_skips_tombstoned_ids() {
crate::crdt_state::init_for_test();
ensure_content_store();
// Seed a high-numbered item via the normal write path so it lands in
// the CRDT index.
let high_id = "9990";
write_item_with_content(
high_id,
"1_backlog",
"---\nname: To Be Tombstoned\n---\n",
ItemMeta::named("To Be Tombstoned"),
);
// Tombstone it. This adds 9990 to state.tombstones and clears the
// content-store row, so without the fix `next_item_number` would
// return 9990 again because it's invisible to both visible-index and
// content-id scans.
crate::crdt_state::evict_item(high_id).expect("evict should succeed");
assert!(crate::crdt_state::is_tombstoned(high_id));
let next = next_item_number();
assert!(
next > 9990,
"next_item_number must skip past tombstoned id 9990, got {next}"
);
}
/// is_tombstoned reflects the post-evict state.
#[test]
fn is_tombstoned_returns_true_after_evict() {
crate::crdt_state::init_for_test();
ensure_content_store();
let id = "9991";
write_item_with_content(
id,
"1_backlog",
"---\nname: Soon To Vanish\n---\n",
ItemMeta::named("Soon To Vanish"),
);
assert!(!crate::crdt_state::is_tombstoned(id));
crate::crdt_state::evict_item(id).expect("evict should succeed");
assert!(crate::crdt_state::is_tombstoned(id));
}
/// Regression test for bug 537: `delete_item` must issue a real SQL DELETE
/// rather than upserting stage = "deleted". A "deleted" shadow row that
/// survives a restart would be picked up by `sync_crdt_stages_from_db` and
/// re-inserted into the CRDT with stage "deleted" — resurrecting a
/// tombstoned story.
#[tokio::test]
async fn delete_item_removes_row_not_sets_deleted_stage() {
let tmp = tempfile::tempdir().unwrap();
let db_path = tmp.path().join("pipeline.db");
let options = sqlx::sqlite::SqliteConnectOptions::new()
.filename(&db_path)
.create_if_missing(true);
let pool = sqlx::SqlitePool::connect_with(options).await.unwrap();
sqlx::migrate!("./migrations").run(&pool).await.unwrap();
let now = chrono::Utc::now().to_rfc3339();
// Insert a live row.
sqlx::query(
"INSERT INTO pipeline_items \
(id, name, stage, agent, retry_count, depends_on, content, created_at, updated_at) \
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?8)",
)
.bind("42_story_to_delete")
.bind("Delete Me")
.bind("2_current")
.bind(Option::<String>::None)
.bind(Option::<i64>::None)
.bind(Option::<String>::None)
.bind("---\nname: Delete Me\n---\n")
.bind(&now)
.execute(&pool)
.await
.unwrap();
// Simulate what the background task does when it receives a "deleted"
// sentinel message — it must DELETE the row, not upsert it.
sqlx::query("DELETE FROM pipeline_items WHERE id = ?1")
.bind("42_story_to_delete")
.execute(&pool)
.await
.unwrap();
// The row must be gone — not present with stage = "deleted".
let row: Option<(String,)> =
sqlx::query_as("SELECT stage FROM pipeline_items WHERE id = ?1")
.bind("42_story_to_delete")
.fetch_optional(&pool)
.await
.unwrap();
assert!(
row.is_none(),
"delete_item must remove the row; found stage = {:?}",
row.map(|r| r.0)
);
}
/// Story 864: `write_item_with_content` no longer parses YAML front-matter
/// from `content`. The CRDT metadata reflects ONLY what the caller passes
/// via `ItemMeta`. This test writes a body without any front-matter at
/// all, sets metadata explicitly, and asserts the CRDT picks up the typed
/// values, not anything derived from `content`.
#[test]
fn write_item_typed_meta_takes_precedence_over_content() {
crate::crdt_state::init_for_test();
ensure_content_store();
let story_id = "9864_story_typed_meta";
// Body has NO YAML header — just plain markdown.
let content = "# Just a heading\n\nNo front matter here.\n";
let meta = ItemMeta {
name: Some("Typed Name".into()),
agent: Some("coder-1".into()),
depends_on: Some(vec![100, 200]),
};
write_item_with_content(story_id, "2_current", content, meta);
let view = crate::crdt_state::read_item(story_id).expect("story exists in CRDT");
assert_eq!(view.stage().dir_name(), "coding");
assert_eq!(view.name(), "Typed Name");
assert_eq!(view.agent(), Some(crate::config::AgentName::Coder1));
assert_eq!(view.retry_count(), 0);
assert_eq!(view.depends_on(), &[100, 200]);
// Content is stored verbatim (no parsing, no rewrite).
assert_eq!(
read_content(ContentKey::Story(story_id)).as_deref(),
Some(content)
);
}
/// Regression: root cause of the 2026-05-14 21:07 production outage.
///
/// A headless agent on a feature branch (whose binary includes a new
/// sqlx migration) must NEVER apply that migration to the production
/// pipeline.db. Verify that opening an agent-local DB and running
/// migrations on it leaves the production DB's `_sqlx_migrations` table
/// unchanged.
///
/// The enforcement mechanism is in `init_subsystems(is_agent=true)`, which
/// redirects to a temp path. This test validates the SQLite isolation
/// property: migrations applied to one file are confined to that file.
#[tokio::test]
async fn agent_db_isolation_does_not_affect_production_db() {
let tmp = tempfile::tempdir().unwrap();
let prod_db_path = tmp.path().join("production.db");
let agent_db_path = tmp.path().join("agent_temp.db");
// Set up the production DB — apply the current compiled-in migrations.
let prod_opts = sqlx::sqlite::SqliteConnectOptions::new()
.filename(&prod_db_path)
.create_if_missing(true);
let prod_pool = sqlx::SqlitePool::connect_with(prod_opts).await.unwrap();
sqlx::migrate!("./migrations")
.run(&prod_pool)
.await
.unwrap();
// Record the migration versions present in the production DB.
let before: Vec<(i64,)> =
sqlx::query_as("SELECT version FROM _sqlx_migrations ORDER BY version")
.fetch_all(&prod_pool)
.await
.unwrap();
// Simulate the agent opening its own isolated DB and running migrations.
let agent_opts = sqlx::sqlite::SqliteConnectOptions::new()
.filename(&agent_db_path)
.create_if_missing(true);
let agent_pool = sqlx::SqlitePool::connect_with(agent_opts).await.unwrap();
sqlx::migrate!("./migrations")
.run(&agent_pool)
.await
.unwrap();
// Production DB must be completely unaffected by the agent's migration run.
let after: Vec<(i64,)> =
sqlx::query_as("SELECT version FROM _sqlx_migrations ORDER BY version")
.fetch_all(&prod_pool)
.await
.unwrap();
assert_eq!(
before, after,
"agent opening its own DB must not alter the production DB migration table"
);
}
/// Verify that `check_schema_drift` returns an empty list when all
/// migrations in the database are recognised by this binary.
#[tokio::test]
async fn check_schema_drift_empty_when_all_known() {
let tmp = tempfile::tempdir().unwrap();
let db_path = tmp.path().join("drift_test.db");
let opts = sqlx::sqlite::SqliteConnectOptions::new()
.filename(&db_path)
.create_if_missing(true);
let pool = sqlx::SqlitePool::connect_with(opts).await.unwrap();
sqlx::migrate!("./migrations").run(&pool).await.unwrap();
let drift = super::shadow_write::check_schema_drift(&pool).await;
assert!(
drift.is_empty(),
"no drift expected when DB matches the compiled-in migration set"
);
}
/// Verify that `check_schema_drift` identifies a manually-inserted
/// migration row that is not part of the compiled-in set.
#[tokio::test]
async fn check_schema_drift_detects_unknown_migration() {
let tmp = tempfile::tempdir().unwrap();
let db_path = tmp.path().join("drift_future.db");
let opts = sqlx::sqlite::SqliteConnectOptions::new()
.filename(&db_path)
.create_if_missing(true);
let pool = sqlx::SqlitePool::connect_with(opts).await.unwrap();
sqlx::migrate!("./migrations").run(&pool).await.unwrap();
// Inject a fake "future" migration that no binary compiled today would know.
let fake_checksum: Vec<u8> = vec![0u8; 20];
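// Any blob works for the checksum: this test asserts only on the
// injected row's version and description.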
sqlx::query(
"INSERT INTO _sqlx_migrations \
(version, description, installed_on, success, checksum, execution_time) \
VALUES (99999999999999, 'future_migration', '2099-01-01T00:00:00Z', 1, ?1, 0)",
)
.bind(&fake_checksum)
.execute(&pool)
.await
.unwrap();
let drift = super::shadow_write::check_schema_drift(&pool).await;
assert_eq!(drift.len(), 1, "exactly one unknown migration expected");
assert_eq!(drift[0].version, 99999999999999_i64);
assert_eq!(drift[0].description, "future_migration");
}
/// Story 864: passing `ItemMeta::default()` against a content blob that
/// LOOKS like front-matter must NOT silently extract metadata into the
/// CRDT. The whole point of removing the implicit YAML round-trip is
/// that metadata only flows in through the typed `ItemMeta` arg.
#[test]
fn write_item_default_meta_ignores_yaml_in_content() {
crate::crdt_state::init_for_test();
ensure_content_store();
let story_id = "9864_story_yaml_ignored";
let content = "---\nname: Should Not Appear\nagent: ghost\n---\n# Body\n";
write_item_with_content(story_id, "2_current", content, ItemMeta::default());
// Nameless items are filtered out by read_item (AC 5: nameless = malformed).
assert!(
crate::crdt_state::read_item(story_id).is_none(),
"name must come from typed meta, not parsed YAML — nameless items must not be surfaced"
);
}
/// Regression (story 894): startup with no YAML migration call leaves
/// existing clean content readable and unmodified. Verifies that removing
/// `db::yaml_migration::run()` from the startup path does not break reads.
#[test]
fn startup_reads_clean_content_unchanged() {
crate::crdt_state::init_for_test();
ensure_content_store();
let story_id = "9894_story_clean_content";
// Plain body — no YAML block, representing post-migration state.
let body = "# Story Heading\n\nSome content.\n";
write_item_with_content(
story_id,
"2_current",
body,
ItemMeta {
name: Some("Clean".into()),
..ItemMeta::default()
},
);
let read_back = read_content(ContentKey::Story(story_id)).expect("content present");
assert_eq!(read_back, body, "plain content must be readable as-is");
assert!(
!read_back.trim_start().starts_with("---"),
"no YAML header should appear"
);
}
/// Bug 780: stage transitions must reset retry_count to 0 in the CRDT.
/// Carryover from prior-stage retries was tripping the auto-assigner's
/// deterministic-merge skip logic.
#[test]
fn move_item_stage_resets_retry_count_to_zero() {
crate::crdt_state::init_for_test();
ensure_content_store();
let story_id = "9870_story_780_retry_reset";
// Seed the story in 2_current with retry_count = 3 (a coder that
// burned all its retries).
crate::crdt_state::write_item_str(
story_id,
"2_current",
Some("Retry reset test"),
None,
None,
None,
);
crate::crdt_state::set_retry_count(story_id, 3);
let typed = crate::pipeline_state::read_typed(story_id)
.expect("read should succeed")
.expect("story exists in CRDT");
assert_eq!(typed.retry_count(), 3);
// Promote to 4_merge. retry_count must reset.
move_item_stage(story_id, "4_merge", None);
let typed_after = crate::pipeline_state::read_typed(story_id)
.expect("read should succeed")
.expect("story exists in CRDT");
assert_eq!(typed_after.stage.dir_name(), "merge");
assert_eq!(
typed_after.retry_count(),
0,
"retry_count must reset to 0 on stage transition"
);
}
/// Story 1087, AC2: the split-stage migration projects every supported
/// wire-form `stage` string into the canonical `(pipeline, status)` pair.
/// The fixture covers each Stage variant (and the legacy numeric-prefix
/// directory names retained for back-compat).
#[tokio::test]
async fn split_stage_migration_backfills_pipeline_and_status_for_every_variant() {
let tmp = tempfile::tempdir().unwrap();
let db_path = tmp.path().join("pipeline.db");
let opts = sqlx::sqlite::SqliteConnectOptions::new()
.filename(&db_path)
.create_if_missing(true);
let pool = sqlx::SqlitePool::connect_with(opts).await.unwrap();
sqlx::migrate!("./migrations").run(&pool).await.unwrap();
// (stage written by older code, expected pipeline, expected status)
let fixture: &[(&str, &str, &str)] = &[
("upcoming", "backlog", "active"),
("backlog", "backlog", "active"),
("coding", "coding", "active"),
("blocked", "coding", "blocked"),
("qa", "qa", "active"),
("review_hold", "qa", "review-hold"),
("merge", "merge", "active"),
("merge_failure", "merge", "merge-failure"),
("merge_failure_final", "merge", "merge-failure-final"),
("done", "done", "done"),
("abandoned", "closed", "abandoned"),
("superseded", "closed", "superseded"),
("rejected", "closed", "rejected"),
("archived", "archived", "active"),
("frozen", "coding", "frozen"),
// Legacy numeric-prefix directory names.
("1_backlog", "backlog", "active"),
("2_current", "coding", "active"),
("3_qa", "qa", "active"),
("4_merge", "merge", "active"),
("5_done", "done", "done"),
("6_archived", "archived", "active"),
];
let now = chrono::Utc::now().to_rfc3339();
for (idx, (stage, _, _)) in fixture.iter().enumerate() {
let id = format!("1087_fixture_{idx}");
sqlx::query(
"INSERT INTO pipeline_items \
(id, name, stage, agent, retry_count, depends_on, content, created_at, updated_at) \
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?8)",
)
.bind(&id)
.bind("fixture")
.bind(*stage)
.bind(Option::<String>::None)
.bind(Option::<i64>::None)
.bind(Option::<String>::None)
.bind("---\nname: fixture\n---\n")
.bind(&now)
.execute(&pool)
.await
.unwrap();
}
// Force the split-stage backfill to run against the rows we just
// inserted. In production this is `sqlx::migrate!`'s job, but the
// migrator runs each migration once per DB, and here it already ran at
// the top of the test, before any rows existed. So we reissue the
// backfill statements by hand; they are the migration logic under test.
sqlx::query(
"UPDATE pipeline_items SET pipeline = CASE stage \
WHEN 'upcoming' THEN 'backlog' \
WHEN 'backlog' THEN 'backlog' \
WHEN '1_backlog' THEN 'backlog' \
WHEN 'coding' THEN 'coding' \
WHEN 'blocked' THEN 'coding' \
WHEN '2_current' THEN 'coding' \
WHEN 'qa' THEN 'qa' \
WHEN 'review_hold' THEN 'qa' \
WHEN '3_qa' THEN 'qa' \
WHEN 'merge' THEN 'merge' \
WHEN 'merge_failure' THEN 'merge' \
WHEN 'merge_failure_final' THEN 'merge' \
WHEN '4_merge' THEN 'merge' \
WHEN 'done' THEN 'done' \
WHEN '5_done' THEN 'done' \
WHEN 'abandoned' THEN 'closed' \
WHEN 'superseded' THEN 'closed' \
WHEN 'rejected' THEN 'closed' \
WHEN 'archived' THEN 'archived' \
WHEN '6_archived' THEN 'archived' \
WHEN 'frozen' THEN 'coding' \
ELSE '' END",
)
.execute(&pool)
.await
.unwrap();
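// Second backfill pass: `status`. Anything not explicitly mapped
// becomes 'active'.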
sqlx::query(
"UPDATE pipeline_items SET status = CASE stage \
WHEN 'frozen' THEN 'frozen' \
WHEN 'review_hold' THEN 'review-hold' \
WHEN 'blocked' THEN 'blocked' \
WHEN 'merge_failure' THEN 'merge-failure' \
WHEN 'merge_failure_final' THEN 'merge-failure-final' \
WHEN 'abandoned' THEN 'abandoned' \
WHEN 'superseded' THEN 'superseded' \
WHEN 'rejected' THEN 'rejected' \
WHEN 'done' THEN 'done' \
WHEN '5_done' THEN 'done' \
ELSE 'active' END",
)
.execute(&pool)
.await
.unwrap();
for (idx, (stage_input, expect_pipeline, expect_status)) in fixture.iter().enumerate() {
let id = format!("1087_fixture_{idx}");
let row: (String, String) =
sqlx::query_as("SELECT pipeline, status FROM pipeline_items WHERE id = ?1")
.bind(&id)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(
row.0, *expect_pipeline,
"stage {stage_input:?} should backfill pipeline to {expect_pipeline:?}, got {:?}",
row.0
);
assert_eq!(
row.1, *expect_status,
"stage {stage_input:?} should backfill status to {expect_status:?}, got {:?}",
row.1
);
}
}
/// Story 1087, AC1: `shadow_write::init` writes a timestamped backup of
/// pipeline.db before the split-stage migration applies, and skips the
/// backup on subsequent restarts (after the migration is recorded).
#[tokio::test]
async fn pre_pipeline_status_backup_only_runs_once() {
let tmp = tempfile::tempdir().unwrap();
let db_path = tmp.path().join("pipeline.db");
// Seed a "pre-1087" DB: open without applying the split-stage migration.
// We do this by opening with `create_if_missing` and running only the
// legacy migrations — but the simplest way to simulate that here is to
// hand-craft a DB containing an `_sqlx_migrations` table that lists
// every migration EXCEPT the split-stage one.
let opts = sqlx::sqlite::SqliteConnectOptions::new()
.filename(&db_path)
.create_if_missing(true);
let pool = sqlx::SqlitePool::connect_with(opts).await.unwrap();
// Apply migrations the normal way, then delete the split-stage row so
// the backup branch fires on the next `init`.
sqlx::migrate!("./migrations").run(&pool).await.unwrap();
sqlx::query("DELETE FROM _sqlx_migrations WHERE version = 20260515000000")
.execute(&pool)
.await
.unwrap();
pool.close().await;
// First call: backup branch fires, side-car file appears.
super::shadow_write::backup_pre_pipeline_status(&db_path).await;
let backups: Vec<_> = std::fs::read_dir(tmp.path())
.unwrap()
.filter_map(Result::ok)
.filter(|e| {
e.file_name()
.to_string_lossy()
.contains(".pre-pipeline-status.")
})
.collect();
assert_eq!(
backups.len(),
1,
"expected exactly one .pre-pipeline-status backup, got {}",
backups.len()
);
// Re-apply the migration so the marker row is back, simulating a
// post-migration server restart.
let opts = sqlx::sqlite::SqliteConnectOptions::new()
.filename(&db_path)
.create_if_missing(false);
let pool = sqlx::SqlitePool::connect_with(opts).await.unwrap();
let fake_checksum: Vec<u8> = vec![0u8; 20];
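// Arbitrary checksum bytes; the backup guard only needs the marker row
// to exist.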
sqlx::query(
"INSERT INTO _sqlx_migrations \
(version, description, installed_on, success, checksum, execution_time) \
VALUES (20260515000000, 'split_stage_into_pipeline_status', '2026-05-15T00:00:00Z', 1, ?1, 0)",
)
.bind(&fake_checksum)
.execute(&pool)
.await
.unwrap();
pool.close().await;
// Second call: no new backup written.
super::shadow_write::backup_pre_pipeline_status(&db_path).await;
let backups_after: Vec<_> = std::fs::read_dir(tmp.path())
.unwrap()
.filter_map(Result::ok)
.filter(|e| {
e.file_name()
.to_string_lossy()
.contains(".pre-pipeline-status.")
})
.collect();
assert_eq!(
backups_after.len(),
1,
"post-migration init must not create another backup; got {} backups",
backups_after.len()
);
}
}