diff --git a/server/src/db/mod.rs b/server/src/db/mod.rs index e436cc44..6d1e9f31 100644 --- a/server/src/db/mod.rs +++ b/server/src/db/mod.rs @@ -341,6 +341,55 @@ pub fn next_item_number() -> u32 { max_num + 1 } +/// One-time migration: sync CRDT stages from the pipeline_items DB table. +/// +/// During the filesystem→CRDT migration, many stories were imported into the +/// CRDT with stage `1_backlog` but then moved forward (to done/archived) via +/// filesystem-only moves that never wrote CRDT ops. This leaves stale +/// `1_backlog` entries in the CRDT for stories that are actually done. +/// +/// This function reads the authoritative stage from `pipeline_items` and +/// calls `write_item` to correct any CRDT entries that disagree. +pub async fn sync_crdt_stages_from_db(db_path: &Path) { + let options = SqliteConnectOptions::new().filename(db_path); + let Ok(pool) = SqlitePool::connect_with(options).await else { + slog!("[db] CRDT stage sync: failed to connect to pipeline.db"); + return; + }; + + let rows: Vec<(String, String, Option, Option, Option, Option, Option)> = + sqlx::query_as( + "SELECT id, stage, name, agent, retry_count, blocked, depends_on FROM pipeline_items" + ) + .fetch_all(&pool) + .await + .unwrap_or_default(); + + let mut corrected = 0u32; + for (story_id, db_stage, name, agent, retry_count, blocked, depends_on) in &rows { + let crdt_stage = crate::crdt_state::read_item(story_id) + .map(|v| v.stage.clone()); + + if crdt_stage.as_deref() != Some(db_stage.as_str()) { + crate::crdt_state::write_item( + story_id, + db_stage, + name.as_deref(), + agent.as_deref(), + *retry_count, + *blocked, + depends_on.as_deref(), + None, + None, + ); + corrected += 1; + } + } + + if corrected > 0 { + slog!("[db] CRDT stage sync: corrected {corrected} items from pipeline_items"); + } +} #[cfg(test)] mod tests { diff --git a/server/src/main.rs b/server/src/main.rs index 9c02d514..f1383196 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -337,6 +337,12 @@ async fn main() -> Result<(), std::io::Error> { } } + // One-time fix: sync CRDT stages from pipeline_items DB for stories + // that were migrated with stale backlog stages. + if let Some(ref db_path) = pipeline_db_path { + db::sync_crdt_stages_from_db(db_path).await; + } + // (CRDT state layer is initialised above alongside the legacy pipeline.db.) // Start the CRDT sync rendezvous client if configured in project.toml.