debug: add logging to sync_crdt_stages_from_db to diagnose stale backlog

This commit is contained in:
dave
2026-04-10 20:33:04 +00:00
parent bae3619723
commit fc24da82ae
+15 -4
View File
@@ -351,9 +351,11 @@ pub fn next_item_number() -> u32 {
/// This function reads the authoritative stage from `pipeline_items` and /// This function reads the authoritative stage from `pipeline_items` and
/// calls `write_item` to correct any CRDT entries that disagree. /// calls `write_item` to correct any CRDT entries that disagree.
pub async fn sync_crdt_stages_from_db(db_path: &Path) { pub async fn sync_crdt_stages_from_db(db_path: &Path) {
slog!("[db-sync] START: sync_crdt_stages_from_db called with {}", db_path.display());
let options = SqliteConnectOptions::new().filename(db_path); let options = SqliteConnectOptions::new().filename(db_path);
let Ok(pool) = SqlitePool::connect_with(options).await else { let Ok(pool) = SqlitePool::connect_with(options).await else {
slog!("[db] CRDT stage sync: failed to connect to pipeline.db"); slog!("[db-sync] FAIL: could not connect to pipeline.db");
return; return;
}; };
@@ -365,11 +367,20 @@ pub async fn sync_crdt_stages_from_db(db_path: &Path) {
.await .await
.unwrap_or_default(); .unwrap_or_default();
slog!("[db-sync] loaded {} rows from pipeline_items", rows.len());
let mut corrected = 0u32; let mut corrected = 0u32;
let mut skipped = 0u32;
let mut first_few = 0u32;
for (story_id, db_stage, name, agent, retry_count, blocked, depends_on) in &rows { for (story_id, db_stage, name, agent, retry_count, blocked, depends_on) in &rows {
let crdt_stage = crate::crdt_state::read_item(story_id) let crdt_stage = crate::crdt_state::read_item(story_id)
.map(|v| v.stage.clone()); .map(|v| v.stage.clone());
if first_few < 5 {
slog!("[db-sync] sample: '{story_id}' crdt={crdt_stage:?} db={db_stage}");
first_few += 1;
}
if crdt_stage.as_deref() != Some(db_stage.as_str()) { if crdt_stage.as_deref() != Some(db_stage.as_str()) {
crate::crdt_state::write_item( crate::crdt_state::write_item(
story_id, story_id,
@@ -383,12 +394,12 @@ pub async fn sync_crdt_stages_from_db(db_path: &Path) {
None, None,
); );
corrected += 1; corrected += 1;
} else {
skipped += 1;
} }
} }
if corrected > 0 { slog!("[db-sync] DONE: corrected={corrected} skipped={skipped} total={}", rows.len());
slog!("[db] CRDT stage sync: corrected {corrected} items from pipeline_items");
}
} }
#[cfg(test)] #[cfg(test)]