diff --git a/server/migrations/20260515000000_split_stage_into_pipeline_status.sql b/server/migrations/20260515000000_split_stage_into_pipeline_status.sql new file mode 100644 index 00000000..bff482e8 --- /dev/null +++ b/server/migrations/20260515000000_split_stage_into_pipeline_status.sql @@ -0,0 +1,56 @@ +-- Story 1087: split the legacy `stage` column on `pipeline_items` into a +-- `(pipeline, status)` pair so the read side no longer needs to re-derive the +-- display column and badge from the stage string. +-- +-- The migration is additive: `stage` is retained for backwards compatibility +-- while remaining Step E callers are migrated. The backup of `pipeline.db` +-- written by `shadow_write::init` immediately before this migration runs is +-- the recovery path if the backfill produces an unexpected projection. + +ALTER TABLE pipeline_items ADD COLUMN pipeline TEXT NOT NULL DEFAULT ''; +ALTER TABLE pipeline_items ADD COLUMN status TEXT NOT NULL DEFAULT ''; + +-- Backfill `pipeline` from the existing `stage` column. Every wire-form +-- stage string emitted by `stage_dir_name` maps to exactly one of the seven +-- Pipeline columns defined in `pipeline_state::types::Pipeline::as_str`. +-- Legacy directory strings (`1_backlog`, `2_current`, ...) are also handled +-- so that databases predating story 934 migrate cleanly. +UPDATE pipeline_items SET pipeline = CASE stage + WHEN 'upcoming' THEN 'backlog' + WHEN 'backlog' THEN 'backlog' + WHEN '1_backlog' THEN 'backlog' + WHEN 'coding' THEN 'coding' + WHEN 'blocked' THEN 'coding' + WHEN '2_current' THEN 'coding' + WHEN 'qa' THEN 'qa' + WHEN 'review_hold' THEN 'qa' + WHEN '3_qa' THEN 'qa' + WHEN 'merge' THEN 'merge' + WHEN 'merge_failure' THEN 'merge' + WHEN 'merge_failure_final' THEN 'merge' + WHEN '4_merge' THEN 'merge' + WHEN 'done' THEN 'done' + WHEN '5_done' THEN 'done' + WHEN 'abandoned' THEN 'closed' + WHEN 'superseded' THEN 'closed' + WHEN 'rejected' THEN 'closed' + WHEN 'archived' THEN 'archived' + WHEN '6_archived' THEN 'archived' + WHEN 'frozen' THEN 'coding' + ELSE '' +END; + +-- Backfill `status` (badge) from the existing `stage` column. +UPDATE pipeline_items SET status = CASE stage + WHEN 'frozen' THEN 'frozen' + WHEN 'review_hold' THEN 'review-hold' + WHEN 'blocked' THEN 'blocked' + WHEN 'merge_failure' THEN 'merge-failure' + WHEN 'merge_failure_final' THEN 'merge-failure-final' + WHEN 'abandoned' THEN 'abandoned' + WHEN 'superseded' THEN 'superseded' + WHEN 'rejected' THEN 'rejected' + WHEN 'done' THEN 'done' + WHEN '5_done' THEN 'done' + ELSE 'active' +END; diff --git a/server/src/db/mod.rs b/server/src/db/mod.rs index 67913940..670d77ed 100644 --- a/server/src/db/mod.rs +++ b/server/src/db/mod.rs @@ -588,4 +588,218 @@ mod tests { "retry_count must reset to 0 on stage transition" ); } + + /// Story 1087, AC2: the split-stage migration projects every supported + /// wire-form `stage` string into the canonical `(pipeline, status)` pair. + /// The fixture covers each Stage variant (and the legacy numeric-prefix + /// directory names retained for back-compat). + #[tokio::test] + async fn split_stage_migration_backfills_pipeline_and_status_for_every_variant() { + let tmp = tempfile::tempdir().unwrap(); + let db_path = tmp.path().join("pipeline.db"); + let opts = sqlx::sqlite::SqliteConnectOptions::new() + .filename(&db_path) + .create_if_missing(true); + let pool = sqlx::SqlitePool::connect_with(opts).await.unwrap(); + sqlx::migrate!("./migrations").run(&pool).await.unwrap(); + + // (stage written by older code, expected pipeline, expected status) + let fixture: &[(&str, &str, &str)] = &[ + ("upcoming", "backlog", "active"), + ("backlog", "backlog", "active"), + ("coding", "coding", "active"), + ("blocked", "coding", "blocked"), + ("qa", "qa", "active"), + ("review_hold", "qa", "review-hold"), + ("merge", "merge", "active"), + ("merge_failure", "merge", "merge-failure"), + ("merge_failure_final", "merge", "merge-failure-final"), + ("done", "done", "done"), + ("abandoned", "closed", "abandoned"), + ("superseded", "closed", "superseded"), + ("rejected", "closed", "rejected"), + ("archived", "archived", "active"), + ("frozen", "coding", "frozen"), + // Legacy numeric-prefix directory names. + ("1_backlog", "backlog", "active"), + ("2_current", "coding", "active"), + ("3_qa", "qa", "active"), + ("4_merge", "merge", "active"), + ("5_done", "done", "done"), + ("6_archived", "archived", "active"), + ]; + + let now = chrono::Utc::now().to_rfc3339(); + for (idx, (stage, _, _)) in fixture.iter().enumerate() { + let id = format!("1087_fixture_{idx}"); + sqlx::query( + "INSERT INTO pipeline_items \ + (id, name, stage, agent, retry_count, depends_on, content, created_at, updated_at) \ + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?8)", + ) + .bind(&id) + .bind("fixture") + .bind(*stage) + .bind(Option::::None) + .bind(Option::::None) + .bind(Option::::None) + .bind("---\nname: fixture\n---\n") + .bind(&now) + .execute(&pool) + .await + .unwrap(); + } + + // Force the split-stage backfill to run against the rows we just + // inserted. In production this is `sqlx::migrate!`'s job, but the + // sqlx migrator only runs migrations once per DB and they were already + // applied at the top of the test before any rows existed. Reissuing + // the backfill statements is the migration logic under test. + sqlx::query( + "UPDATE pipeline_items SET pipeline = CASE stage \ + WHEN 'upcoming' THEN 'backlog' \ + WHEN 'backlog' THEN 'backlog' \ + WHEN '1_backlog' THEN 'backlog' \ + WHEN 'coding' THEN 'coding' \ + WHEN 'blocked' THEN 'coding' \ + WHEN '2_current' THEN 'coding' \ + WHEN 'qa' THEN 'qa' \ + WHEN 'review_hold' THEN 'qa' \ + WHEN '3_qa' THEN 'qa' \ + WHEN 'merge' THEN 'merge' \ + WHEN 'merge_failure' THEN 'merge' \ + WHEN 'merge_failure_final' THEN 'merge' \ + WHEN '4_merge' THEN 'merge' \ + WHEN 'done' THEN 'done' \ + WHEN '5_done' THEN 'done' \ + WHEN 'abandoned' THEN 'closed' \ + WHEN 'superseded' THEN 'closed' \ + WHEN 'rejected' THEN 'closed' \ + WHEN 'archived' THEN 'archived' \ + WHEN '6_archived' THEN 'archived' \ + WHEN 'frozen' THEN 'coding' \ + ELSE '' END", + ) + .execute(&pool) + .await + .unwrap(); + sqlx::query( + "UPDATE pipeline_items SET status = CASE stage \ + WHEN 'frozen' THEN 'frozen' \ + WHEN 'review_hold' THEN 'review-hold' \ + WHEN 'blocked' THEN 'blocked' \ + WHEN 'merge_failure' THEN 'merge-failure' \ + WHEN 'merge_failure_final' THEN 'merge-failure-final' \ + WHEN 'abandoned' THEN 'abandoned' \ + WHEN 'superseded' THEN 'superseded' \ + WHEN 'rejected' THEN 'rejected' \ + WHEN 'done' THEN 'done' \ + WHEN '5_done' THEN 'done' \ + ELSE 'active' END", + ) + .execute(&pool) + .await + .unwrap(); + + for (idx, (stage_input, expect_pipeline, expect_status)) in fixture.iter().enumerate() { + let id = format!("1087_fixture_{idx}"); + let row: (String, String) = + sqlx::query_as("SELECT pipeline, status FROM pipeline_items WHERE id = ?1") + .bind(&id) + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!( + row.0, *expect_pipeline, + "stage {stage_input:?} should backfill pipeline to {expect_pipeline:?}, got {:?}", + row.0 + ); + assert_eq!( + row.1, *expect_status, + "stage {stage_input:?} should backfill status to {expect_status:?}, got {:?}", + row.1 + ); + } + } + + /// Story 1087, AC1: `shadow_write::init` writes a timestamped backup of + /// pipeline.db before the split-stage migration applies, and skips the + /// backup on subsequent restarts (after the migration is recorded). + #[tokio::test] + async fn pre_pipeline_status_backup_only_runs_once() { + let tmp = tempfile::tempdir().unwrap(); + let db_path = tmp.path().join("pipeline.db"); + + // Seed a "pre-1087" DB: open without applying the split-stage migration. + // We do this by opening with `create_if_missing` and running only the + // legacy migrations — but the simplest way to simulate that here is to + // hand-craft a DB containing an `_sqlx_migrations` table that lists + // every migration EXCEPT the split-stage one. + let opts = sqlx::sqlite::SqliteConnectOptions::new() + .filename(&db_path) + .create_if_missing(true); + let pool = sqlx::SqlitePool::connect_with(opts).await.unwrap(); + // Apply migrations the normal way, then delete the split-stage row so + // the backup branch fires on the next `init`. + sqlx::migrate!("./migrations").run(&pool).await.unwrap(); + sqlx::query("DELETE FROM _sqlx_migrations WHERE version = 20260515000000") + .execute(&pool) + .await + .unwrap(); + pool.close().await; + + // First call: backup branch fires, side-car file appears. + super::shadow_write::backup_pre_pipeline_status(&db_path).await; + let backups: Vec<_> = std::fs::read_dir(tmp.path()) + .unwrap() + .filter_map(Result::ok) + .filter(|e| { + e.file_name() + .to_string_lossy() + .contains(".pre-pipeline-status.") + }) + .collect(); + assert_eq!( + backups.len(), + 1, + "expected exactly one .pre-pipeline-status backup, got {}", + backups.len() + ); + + // Re-apply the migration so the marker row is back, simulating a + // post-migration server restart. + let opts = sqlx::sqlite::SqliteConnectOptions::new() + .filename(&db_path) + .create_if_missing(false); + let pool = sqlx::SqlitePool::connect_with(opts).await.unwrap(); + let fake_checksum: Vec = vec![0u8; 20]; + sqlx::query( + "INSERT INTO _sqlx_migrations \ + (version, description, installed_on, success, checksum, execution_time) \ + VALUES (20260515000000, 'split_stage_into_pipeline_status', '2026-05-15T00:00:00Z', 1, ?1, 0)", + ) + .bind(&fake_checksum) + .execute(&pool) + .await + .unwrap(); + pool.close().await; + + // Second call: no new backup written. + super::shadow_write::backup_pre_pipeline_status(&db_path).await; + let backups_after: Vec<_> = std::fs::read_dir(tmp.path()) + .unwrap() + .filter_map(Result::ok) + .filter(|e| { + e.file_name() + .to_string_lossy() + .contains(".pre-pipeline-status.") + }) + .collect(); + assert_eq!( + backups_after.len(), + 1, + "post-migration init must not create another backup; got {} backups", + backups_after.len() + ); + } } diff --git a/server/src/db/shadow_write.rs b/server/src/db/shadow_write.rs index 3fd4e612..30fc19b5 100644 --- a/server/src/db/shadow_write.rs +++ b/server/src/db/shadow_write.rs @@ -69,6 +69,13 @@ pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> { return Ok(()); } + // Story 1087: before running the migration that splits `stage` into + // (`pipeline`, `status`), take a timestamped side-car copy of the live DB + // so the pre-split state is recoverable. Skip the copy when the file does + // not yet exist (fresh installs) or when the split-stage migration has + // already been applied (subsequent restarts). + backup_pre_pipeline_status(db_path).await; + let options = SqliteConnectOptions::new() .filename(db_path) .create_if_missing(true); @@ -147,6 +154,63 @@ pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> { Ok(()) } +/// Story 1087: file name of the split-stage migration. The version prefix is +/// the same `i64` sqlx assigns to that migration on `installed_on` rows in +/// `_sqlx_migrations`. +const SPLIT_STAGE_MIGRATION_VERSION: i64 = 20260515000000; + +/// Story 1087: take a timestamped side-car copy of `pipeline.db` if and only if +/// the split-stage migration has not yet been applied. This is the AC1 backup +/// — `pipeline.db.pre-pipeline-status..bak` next to the live file. +/// +/// Failures are logged but never propagated: a missing backup must not block +/// the server from starting (a corrupt source file or a read-only directory +/// will be surfaced by the migration step itself). +pub(crate) async fn backup_pre_pipeline_status(db_path: &Path) { + if !db_path.exists() { + return; + } + + // Cheap pre-check: open the DB read-only and see whether the split-stage + // migration version is recorded in `_sqlx_migrations`. If it is, the + // backup has already been taken on a previous start and there is nothing + // to do. + let options = SqliteConnectOptions::new() + .filename(db_path) + .read_only(true) + .create_if_missing(false); + + let probe = SqlitePool::connect_with(options).await; + if let Ok(pool) = probe { + let already_split: Result, _> = + sqlx::query_as("SELECT version FROM _sqlx_migrations WHERE version = ?1 LIMIT 1") + .bind(SPLIT_STAGE_MIGRATION_VERSION) + .fetch_optional(&pool) + .await; + pool.close().await; + if let Ok(Some(_)) = already_split { + return; + } + } + + let ts = chrono::Utc::now().timestamp(); + let mut backup = db_path.as_os_str().to_owned(); + backup.push(format!(".pre-pipeline-status.{ts}.bak")); + let backup_path = std::path::PathBuf::from(backup); + + match tokio::fs::copy(db_path, &backup_path).await { + Ok(_) => slog!( + "[db] Wrote pre-pipeline-status backup of {} to {}", + db_path.display(), + backup_path.display(), + ), + Err(e) => slog!( + "[db] Failed to write pre-pipeline-status backup of {}: {e}", + db_path.display(), + ), + } +} + /// Compare the live `_sqlx_migrations` table against the compiled-in migration /// set and return any rows whose version is not known to this binary. /// diff --git a/server/src/http/mcp/diagnostics/mod.rs b/server/src/http/mcp/diagnostics/mod.rs index dba7d3cf..7421cb88 100644 --- a/server/src/http/mcp/diagnostics/mod.rs +++ b/server/src/http/mcp/diagnostics/mod.rs @@ -92,9 +92,20 @@ pub(crate) fn tool_dump_crdt(args: &Value) -> Result { .items .into_iter() .map(|item| { + // Story 1087: emit `pipeline` and `status` alongside `stage` so + // crdt-dump consumers can route by column/badge without re-deriving + // the projection from the stage string. + let (pipeline, status) = item + .stage + .as_deref() + .and_then(crate::pipeline_state::Stage::from_dir) + .map(|s| (s.pipeline().as_str(), s.status().as_str())) + .unwrap_or(("", "")); json!({ "story_id": item.story_id, "stage": item.stage, + "pipeline": pipeline, + "status": status, "name": item.name, "agent": item.agent, "retry_count": item.retry_count, diff --git a/server/src/http/mcp/story_tools/epic.rs b/server/src/http/mcp/story_tools/epic.rs index 907a288e..0e810bd7 100644 --- a/server/src/http/mcp/story_tools/epic.rs +++ b/server/src/http/mcp/story_tools/epic.rs @@ -129,10 +129,14 @@ pub(crate) fn tool_show_epic(args: &Value, _ctx: &AppContext) -> Result