huskies: merge 539_bug_crdt_event_bridge_still_writes_filesystem_shadow_files_after_530_eliminated_filesystem_state
This commit is contained in:
@@ -404,6 +404,78 @@ pub fn next_item_number() -> u32 {
|
||||
max_num + 1
|
||||
}
|
||||
|
||||
/// One-time migration: sync CRDT stages from the pipeline_items DB table.
|
||||
///
|
||||
/// During the filesystem→CRDT migration, many stories were imported into the
|
||||
/// CRDT with stage `1_backlog` but then moved forward (to done/archived) via
|
||||
/// filesystem-only moves that never wrote CRDT ops. This leaves stale
|
||||
/// `1_backlog` entries in the CRDT for stories that are actually done.
|
||||
///
|
||||
/// This function reads the authoritative stage from `pipeline_items` and
|
||||
/// calls `write_item` to correct any CRDT entries that disagree.
|
||||
#[cfg(test)]
|
||||
pub async fn sync_crdt_stages_from_db(db_path: &Path) {
|
||||
slog!("[db-sync] START: sync_crdt_stages_from_db called with {}", db_path.display());
|
||||
|
||||
let options = SqliteConnectOptions::new().filename(db_path);
|
||||
let Ok(pool) = SqlitePool::connect_with(options).await else {
|
||||
slog!("[db-sync] FAIL: could not connect to pipeline.db");
|
||||
return;
|
||||
};
|
||||
|
||||
type SyncRow = (String, String, Option<String>, Option<String>, Option<i64>, Option<bool>, Option<String>);
|
||||
let rows: Vec<SyncRow> =
|
||||
sqlx::query_as(
|
||||
"SELECT id, stage, name, agent, retry_count, blocked, depends_on FROM pipeline_items"
|
||||
)
|
||||
.fetch_all(&pool)
|
||||
.await
|
||||
.unwrap_or_default();
|
||||
|
||||
slog!("[db-sync] loaded {} rows from pipeline_items", rows.len());
|
||||
|
||||
let mut corrected = 0u32;
|
||||
let mut skipped = 0u32;
|
||||
let mut first_few = 0u32;
|
||||
for (story_id, db_stage, name, agent, retry_count, blocked, depends_on) in &rows {
|
||||
let crdt_stage = crate::crdt_state::read_item(story_id)
|
||||
.map(|v| v.stage.clone());
|
||||
|
||||
if first_few < 5 {
|
||||
slog!("[db-sync] sample: '{story_id}' crdt={crdt_stage:?} db={db_stage}");
|
||||
first_few += 1;
|
||||
}
|
||||
|
||||
// Skip stale "deleted" shadow rows left by old code that used the
|
||||
// "deleted" sentinel as a soft-delete instead of issuing a real SQL
|
||||
// DELETE. Syncing these back into the CRDT would resurrect tombstoned
|
||||
// items with stage = "deleted".
|
||||
if db_stage == "deleted" {
|
||||
skipped += 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
if crdt_stage.as_deref() != Some(db_stage.as_str()) {
|
||||
crate::crdt_state::write_item(
|
||||
story_id,
|
||||
db_stage,
|
||||
name.as_deref(),
|
||||
agent.as_deref(),
|
||||
*retry_count,
|
||||
*blocked,
|
||||
depends_on.as_deref(),
|
||||
None,
|
||||
None,
|
||||
None, // merged_at unknown for migrated items; epoch fallback sweeps them
|
||||
);
|
||||
corrected += 1;
|
||||
} else {
|
||||
skipped += 1;
|
||||
}
|
||||
}
|
||||
|
||||
slog!("[db-sync] DONE: corrected={corrected} skipped={skipped} total={}", rows.len());
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
Reference in New Issue
Block a user