//! SQLite storage layer — content store, shadow writes, and CRDT op persistence.
|
|
/// SQLite storage layer for pipeline state and story content.
|
|
///
|
|
/// The CRDT layer (`crdt_state`) is the primary source of truth for pipeline
|
|
/// metadata (stage, name, agent, etc.). This module provides:
|
|
///
|
|
/// 1. **Content store** — an in-memory `HashMap<story_id, markdown>` backed
|
|
/// by the `pipeline_items.content` column. Provides fast synchronous
|
|
/// reads for MCP tools and other callers.
|
|
///
|
|
/// 2. **Shadow-write channel** — a fire-and-forget background task that
|
|
/// upserts `pipeline_items` rows so the database always has a full copy
|
|
/// of story content plus metadata.
|
|
///
|
|
/// On startup, existing content is loaded from the database into memory so
|
|
/// no filesystem scan is needed after migration.
|
|
pub mod content_store;
|
|
/// Write operations for the pipeline — content, stage transitions, and deletions.
|
|
pub mod ops;
|
|
/// Background shadow-write task — persists pipeline items to SQLite asynchronously.
|
|
pub mod shadow_write;
|
|
|
|
pub use content_store::{all_content_ids, delete_content, read_content, write_content};
|
|
pub use ops::{ItemMeta, delete_item, move_item_stage, next_item_number, write_item_with_content};
|
|
pub use shadow_write::init;
|
|
|
|
#[cfg(test)]
|
|
pub use content_store::ensure_content_store;
|
|
|
|
#[cfg(test)]
mod tests {
    use super::*;

    /// Shadow-write upsert statement, mirroring what the background task
    /// executes: insert a full row, or on id conflict refresh metadata and
    /// keep the existing content when the incoming content is NULL.
    ///
    /// Extracted to a constant so the two tests that exercise upsert
    /// semantics cannot drift out of sync with each other.
    const UPSERT_SQL: &str = "INSERT INTO pipeline_items \
         (id, name, stage, agent, retry_count, depends_on, content, created_at, updated_at) \
         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?8) \
         ON CONFLICT(id) DO UPDATE SET \
         name = excluded.name, \
         stage = excluded.stage, \
         agent = excluded.agent, \
         retry_count = excluded.retry_count, \
         depends_on = excluded.depends_on, \
         content = COALESCE(excluded.content, pipeline_items.content), \
         updated_at = excluded.updated_at";

    /// Plain INSERT used to seed rows where conflict handling is irrelevant.
    /// Note that `?8` is deliberately bound to both created_at and updated_at.
    const INSERT_SQL: &str = "INSERT INTO pipeline_items \
         (id, name, stage, agent, retry_count, depends_on, content, created_at, updated_at) \
         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?8)";

    /// Helper: open an isolated SQLite pool at `db_path` (NOT the global
    /// singleton, to keep tests hermetic) and run all migrations on it.
    ///
    /// Shared by every DB-backed test below; previously this boilerplate was
    /// copy-pasted four times.
    async fn migrated_pool(db_path: &std::path::Path) -> sqlx::SqlitePool {
        let options = sqlx::sqlite::SqliteConnectOptions::new()
            .filename(db_path)
            .create_if_missing(true);
        let pool = sqlx::SqlitePool::connect_with(options).await.unwrap();
        sqlx::migrate!("./migrations").run(&pool).await.unwrap();
        pool
    }

    /// The shadow-write upsert must land a fully-populated row that can be
    /// read back by id with the bound metadata intact.
    #[tokio::test]
    async fn shadow_write_inserts_row_into_sqlite() {
        let tmp = tempfile::tempdir().unwrap();
        let pool = migrated_pool(&tmp.path().join("pipeline.db")).await;

        // Perform the upsert directly (bypass the global singleton).
        //
        // Story 929 dropped the redundant "re-parse the YAML body to
        // double-check" step that used to live here; the on-disk story file
        // this test formerly wrote into a temp stage dir was a leftover from
        // that era (nothing below ever read it) and has been removed.
        let now = chrono::Utc::now().to_rfc3339();
        sqlx::query(UPSERT_SQL)
            .bind("10_story_shadow_test")
            .bind("Shadow Test")
            .bind("2_current")
            .bind("coder-opus")
            .bind(2_i64)
            .bind(Option::<String>::None)
            .bind("---\nname: Shadow Test\n---\n# Story\n")
            .bind(&now)
            .execute(&pool)
            .await
            .unwrap();

        // Query back and verify the row landed with the bound metadata.
        let row: (String, Option<String>, String) =
            sqlx::query_as("SELECT id, name, stage FROM pipeline_items WHERE id = ?1")
                .bind("10_story_shadow_test")
                .fetch_one(&pool)
                .await
                .unwrap();

        assert_eq!(row.0, "10_story_shadow_test");
        assert_eq!(row.1.as_deref(), Some("Shadow Test"));
        assert_eq!(row.2, "2_current");
    }

    /// Schema check: the `content` column exists and round-trips a markdown
    /// blob verbatim (inserted with every nullable column left NULL).
    #[tokio::test]
    async fn pipeline_items_table_has_content_column() {
        let tmp = tempfile::tempdir().unwrap();
        let pool = migrated_pool(&tmp.path().join("pipeline.db")).await;

        // Verify content column exists by inserting a full row.
        let now = chrono::Utc::now().to_rfc3339();
        let content = "---\nname: Test\n---\n# Story\n";
        sqlx::query(INSERT_SQL)
            .bind("99_story_col_test")
            .bind(Option::<String>::None)
            .bind("1_backlog")
            .bind(Option::<String>::None)
            .bind(Option::<i64>::None)
            .bind(Option::<String>::None)
            .bind(content)
            .bind(&now)
            .execute(&pool)
            .await
            .unwrap();

        let row: (Option<String>,) =
            sqlx::query_as("SELECT content FROM pipeline_items WHERE id = ?1")
                .bind("99_story_col_test")
                .fetch_one(&pool)
                .await
                .unwrap();
        assert_eq!(row.0.as_deref(), Some(content));
    }

    /// An upsert over an existing row must update its stage, and a NULL
    /// content bind must preserve the stored content via COALESCE.
    #[tokio::test]
    async fn upsert_updates_stage_on_move() {
        let tmp = tempfile::tempdir().unwrap();
        let pool = migrated_pool(&tmp.path().join("pipeline.db")).await;

        let now = chrono::Utc::now().to_rfc3339();

        // Insert initial row in backlog.
        sqlx::query(INSERT_SQL)
            .bind("5_story_move")
            .bind("Move Me")
            .bind("1_backlog")
            .bind(Option::<String>::None)
            .bind(Option::<i64>::None)
            .bind(Option::<String>::None)
            .bind("---\nname: Move Me\n---\n")
            .bind(&now)
            .execute(&pool)
            .await
            .unwrap();

        // Upsert with new stage (simulating move to current).
        sqlx::query(UPSERT_SQL)
            .bind("5_story_move")
            .bind("Move Me")
            .bind("2_current")
            .bind(Option::<String>::None)
            .bind(Option::<i64>::None)
            .bind(Option::<String>::None)
            .bind(Option::<String>::None) // content NULL → COALESCE preserves existing
            .bind(&now)
            .execute(&pool)
            .await
            .unwrap();

        let row: (String,) = sqlx::query_as("SELECT stage FROM pipeline_items WHERE id = ?1")
            .bind("5_story_move")
            .fetch_one(&pool)
            .await
            .unwrap();

        assert_eq!(row.0, "2_current");
    }

    /// Round-trip the in-memory content store: write → read back, overwrite
    /// → read back, delete → gone.
    #[test]
    fn content_store_read_write_delete() {
        ensure_content_store();

        let story_id = "100_story_content_test";
        let markdown = "---\nname: Content Test\n---\n# Story\n";

        // Write.
        write_content(story_id, markdown);
        assert_eq!(read_content(story_id).as_deref(), Some(markdown));

        // Overwrite.
        let updated = "---\nname: Updated\n---\n# Updated Story\n";
        write_content(story_id, updated);
        assert_eq!(read_content(story_id).as_deref(), Some(updated));

        // Delete.
        delete_content(story_id);
        assert!(read_content(story_id).is_none());
    }

    #[test]
    fn next_item_number_returns_1_when_empty() {
        // When no items exist, should return 1.
        // Note: in test context the global CRDT/content store may or may not
        // be initialised (other tests in this binary share the globals), so
        // the assertion is deliberately the weaker `>= 1` rather than `== 1`;
        // the function falls back gracefully either way.
        let n = next_item_number();
        assert!(n >= 1);
    }

    /// Regression test for bug 537: `delete_item` must issue a real SQL DELETE
    /// rather than upserting stage = "deleted". A "deleted" shadow row that
    /// survives a restart would be picked up by `sync_crdt_stages_from_db` and
    /// re-inserted into the CRDT with stage "deleted" — resurrecting a
    /// tombstoned story.
    #[tokio::test]
    async fn delete_item_removes_row_not_sets_deleted_stage() {
        let tmp = tempfile::tempdir().unwrap();
        let pool = migrated_pool(&tmp.path().join("pipeline.db")).await;

        let now = chrono::Utc::now().to_rfc3339();

        // Insert a live row.
        sqlx::query(INSERT_SQL)
            .bind("42_story_to_delete")
            .bind("Delete Me")
            .bind("2_current")
            .bind(Option::<String>::None)
            .bind(Option::<i64>::None)
            .bind(Option::<String>::None)
            .bind("---\nname: Delete Me\n---\n")
            .bind(&now)
            .execute(&pool)
            .await
            .unwrap();

        // Simulate what the background task does when it receives a "deleted"
        // sentinel message — it must DELETE the row, not upsert it.
        sqlx::query("DELETE FROM pipeline_items WHERE id = ?1")
            .bind("42_story_to_delete")
            .execute(&pool)
            .await
            .unwrap();

        // The row must be gone — not present with stage = "deleted".
        let row: Option<(String,)> =
            sqlx::query_as("SELECT stage FROM pipeline_items WHERE id = ?1")
                .bind("42_story_to_delete")
                .fetch_optional(&pool)
                .await
                .unwrap();

        assert!(
            row.is_none(),
            "delete_item must remove the row; found stage = {:?}",
            row.map(|r| r.0)
        );
    }

    /// Story 864: `write_item_with_content` no longer parses YAML front-matter
    /// from `content`. The CRDT metadata reflects ONLY what the caller passes
    /// via `ItemMeta`. This test writes a body without any front-matter at
    /// all, sets metadata explicitly, and asserts the CRDT picks up the typed
    /// values, not anything derived from `content`.
    #[test]
    fn write_item_typed_meta_takes_precedence_over_content() {
        crate::crdt_state::init_for_test();
        ensure_content_store();
        let story_id = "9864_story_typed_meta";

        // Body has NO YAML header — just plain markdown.
        let content = "# Just a heading\n\nNo front matter here.\n";
        let meta = ItemMeta {
            name: Some("Typed Name".into()),
            agent: Some("coder-1".into()),
            retry_count: Some(2),
            depends_on: Some(vec![100, 200]),
        };
        write_item_with_content(story_id, "2_current", content, meta);

        let view = crate::crdt_state::read_item(story_id).expect("story exists in CRDT");
        assert_eq!(view.stage().dir_name(), "coding");
        assert_eq!(view.name(), Some("Typed Name"));
        assert_eq!(view.agent(), Some("coder-1"));
        assert_eq!(view.retry_count(), 2);
        assert_eq!(view.depends_on(), &[100, 200]);

        // Content is stored verbatim (no parsing, no rewrite).
        assert_eq!(read_content(story_id).as_deref(), Some(content));
    }

    /// Story 864: passing `ItemMeta::default()` against a content blob that
    /// LOOKS like front-matter must NOT silently extract metadata into the
    /// CRDT. The whole point of removing the implicit YAML round-trip is
    /// that metadata only flows in through the typed `ItemMeta` arg.
    #[test]
    fn write_item_default_meta_ignores_yaml_in_content() {
        crate::crdt_state::init_for_test();
        ensure_content_store();
        let story_id = "9864_story_yaml_ignored";

        let content = "---\nname: Should Not Appear\nagent: ghost\n---\n# Body\n";
        write_item_with_content(story_id, "2_current", content, ItemMeta::default());

        let view = crate::crdt_state::read_item(story_id).expect("story exists in CRDT");
        assert_eq!(view.stage().dir_name(), "coding");
        assert_eq!(
            view.name(),
            None,
            "name must come from typed meta, not parsed YAML"
        );
        assert_eq!(
            view.agent(),
            None,
            "agent must come from typed meta, not parsed YAML"
        );
    }

    /// Regression (story 894): startup with no YAML migration call leaves
    /// existing clean content readable and unmodified. Verifies that removing
    /// `db::yaml_migration::run()` from the startup path does not break reads.
    #[test]
    fn startup_reads_clean_content_unchanged() {
        crate::crdt_state::init_for_test();
        ensure_content_store();
        let story_id = "9894_story_clean_content";

        // Plain body — no YAML block, representing post-migration state.
        let body = "# Story Heading\n\nSome content.\n";
        write_item_with_content(
            story_id,
            "2_current",
            body,
            ItemMeta {
                name: Some("Clean".into()),
                ..ItemMeta::default()
            },
        );

        let read_back = read_content(story_id).expect("content present");
        assert_eq!(read_back, body, "plain content must be readable as-is");
        assert!(
            !read_back.trim_start().starts_with("---"),
            "no YAML header should appear"
        );
    }

    /// Bug 780: stage transitions must reset retry_count to 0 in the CRDT.
    /// Carryover from prior-stage retries was tripping the auto-assigner's
    /// deterministic-merge skip logic.
    #[test]
    fn move_item_stage_resets_retry_count_to_zero() {
        crate::crdt_state::init_for_test();
        ensure_content_store();
        let story_id = "9870_story_780_retry_reset";

        // Seed the story in 2_current with retry_count = 3 (a coder that
        // burned all its retries).
        crate::crdt_state::write_item_str(
            story_id,
            "2_current",
            Some("Retry reset test"),
            None,
            Some(3),
            None,
            None,
            None,
            None,
        );
        write_content(
            story_id,
            "---\nname: Retry reset test\nretry_count: 3\n---\n",
        );
        let typed = crate::pipeline_state::read_typed(story_id)
            .expect("read should succeed")
            .expect("story exists in CRDT");
        assert_eq!(typed.retry_count, 3);

        // Promote to 4_merge. retry_count must reset.
        move_item_stage(story_id, "4_merge", None);

        let typed_after = crate::pipeline_state::read_typed(story_id)
            .expect("read should succeed")
            .expect("story exists in CRDT");
        assert_eq!(typed_after.stage.dir_name(), "merge");
        assert_eq!(
            typed_after.retry_count, 0,
            "retry_count must reset to 0 on stage transition"
        );
    }
}
|