fix: startup migration syncs stale CRDT stages from pipeline_items DB
510 stories had stale 1_backlog stages in the CRDT because they were imported during the filesystem→CRDT migration and then moved forward via filesystem-only moves that never wrote CRDT ops. This made done stories appear as ghost entries in the backlog. On startup, reads the authoritative stage from pipeline_items and corrects any CRDT entries that disagree. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -341,6 +341,55 @@ pub fn next_item_number() -> u32 {
|
|||||||
max_num + 1
|
max_num + 1
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// One-time migration: sync CRDT stages from the pipeline_items DB table.
|
||||||
|
///
|
||||||
|
/// During the filesystem→CRDT migration, many stories were imported into the
|
||||||
|
/// CRDT with stage `1_backlog` but then moved forward (to done/archived) via
|
||||||
|
/// filesystem-only moves that never wrote CRDT ops. This leaves stale
|
||||||
|
/// `1_backlog` entries in the CRDT for stories that are actually done.
|
||||||
|
///
|
||||||
|
/// This function reads the authoritative stage from `pipeline_items` and
|
||||||
|
/// calls `write_item` to correct any CRDT entries that disagree.
|
||||||
|
pub async fn sync_crdt_stages_from_db(db_path: &Path) {
|
||||||
|
let options = SqliteConnectOptions::new().filename(db_path);
|
||||||
|
let Ok(pool) = SqlitePool::connect_with(options).await else {
|
||||||
|
slog!("[db] CRDT stage sync: failed to connect to pipeline.db");
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
let rows: Vec<(String, String, Option<String>, Option<String>, Option<i64>, Option<bool>, Option<String>)> =
|
||||||
|
sqlx::query_as(
|
||||||
|
"SELECT id, stage, name, agent, retry_count, blocked, depends_on FROM pipeline_items"
|
||||||
|
)
|
||||||
|
.fetch_all(&pool)
|
||||||
|
.await
|
||||||
|
.unwrap_or_default();
|
||||||
|
|
||||||
|
let mut corrected = 0u32;
|
||||||
|
for (story_id, db_stage, name, agent, retry_count, blocked, depends_on) in &rows {
|
||||||
|
let crdt_stage = crate::crdt_state::read_item(story_id)
|
||||||
|
.map(|v| v.stage.clone());
|
||||||
|
|
||||||
|
if crdt_stage.as_deref() != Some(db_stage.as_str()) {
|
||||||
|
crate::crdt_state::write_item(
|
||||||
|
story_id,
|
||||||
|
db_stage,
|
||||||
|
name.as_deref(),
|
||||||
|
agent.as_deref(),
|
||||||
|
*retry_count,
|
||||||
|
*blocked,
|
||||||
|
depends_on.as_deref(),
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
corrected += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if corrected > 0 {
|
||||||
|
slog!("[db] CRDT stage sync: corrected {corrected} items from pipeline_items");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
|
|||||||
@@ -337,6 +337,12 @@ async fn main() -> Result<(), std::io::Error> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// One-time fix: sync CRDT stages from pipeline_items DB for stories
|
||||||
|
// that were migrated with stale backlog stages.
|
||||||
|
if let Some(ref db_path) = pipeline_db_path {
|
||||||
|
db::sync_crdt_stages_from_db(db_path).await;
|
||||||
|
}
|
||||||
|
|
||||||
// (CRDT state layer is initialised above alongside the legacy pipeline.db.)
|
// (CRDT state layer is initialised above alongside the legacy pipeline.db.)
|
||||||
|
|
||||||
// Start the CRDT sync rendezvous client if configured in project.toml.
|
// Start the CRDT sync rendezvous client if configured in project.toml.
|
||||||
|
|||||||
Reference in New Issue
Block a user