diff --git a/server/src/db/mod.rs b/server/src/db/mod.rs index 6d1e9f31..360ad212 100644 --- a/server/src/db/mod.rs +++ b/server/src/db/mod.rs @@ -351,9 +351,11 @@ pub fn next_item_number() -> u32 { /// This function reads the authoritative stage from `pipeline_items` and /// calls `write_item` to correct any CRDT entries that disagree. pub async fn sync_crdt_stages_from_db(db_path: &Path) { + slog!("[db-sync] START: sync_crdt_stages_from_db called with {}", db_path.display()); + let options = SqliteConnectOptions::new().filename(db_path); let Ok(pool) = SqlitePool::connect_with(options).await else { - slog!("[db] CRDT stage sync: failed to connect to pipeline.db"); + slog!("[db-sync] FAIL: could not connect to pipeline.db"); return; }; @@ -365,11 +367,20 @@ pub async fn sync_crdt_stages_from_db(db_path: &Path) { .await .unwrap_or_default(); + slog!("[db-sync] loaded {} rows from pipeline_items", rows.len()); + let mut corrected = 0u32; + let mut skipped = 0u32; + let mut first_few = 0u32; for (story_id, db_stage, name, agent, retry_count, blocked, depends_on) in &rows { let crdt_stage = crate::crdt_state::read_item(story_id) .map(|v| v.stage.clone()); + if first_few < 5 { + slog!("[db-sync] sample: '{story_id}' crdt={crdt_stage:?} db={db_stage}"); + first_few += 1; + } + if crdt_stage.as_deref() != Some(db_stage.as_str()) { crate::crdt_state::write_item( story_id, @@ -383,12 +394,12 @@ pub async fn sync_crdt_stages_from_db(db_path: &Path) { None, ); corrected += 1; + } else { + skipped += 1; } } - if corrected > 0 { - slog!("[db] CRDT stage sync: corrected {corrected} items from pipeline_items"); - } + slog!("[db-sync] DONE: corrected={corrected} skipped={skipped} total={}", rows.len()); } #[cfg(test)]