huskies: merge 1116 story rebuild_and_restart loses pending CRDT ops by calling exec() before persistence channel drains

This commit is contained in:
dave
2026-05-17 17:43:46 +00:00
parent 49af014a84
commit 7de167b21b
11 changed files with 241 additions and 44 deletions
+1
View File
@@ -45,6 +45,7 @@ pub use read::{
dep_is_archived_crdt, dep_is_done_crdt, dump_crdt_state, evict_item, is_tombstoned,
read_all_items, read_item, tombstoned_ids,
};
pub(crate) use state::flush_persistence;
pub use state::{init, subscribe};
pub use types::{
ActiveAgentCrdt, ActiveAgentView, AgentThrottleCrdt, AgentThrottleView, CrdtEvent, EpicId,
+13 -5
View File
@@ -2,6 +2,7 @@
#![allow(unused_imports, dead_code)]
use std::collections::HashMap;
use std::sync::atomic::Ordering;
use super::hex;
use bft_json_crdt::json_crdt::*;
@@ -10,9 +11,10 @@ use tokio::sync::broadcast;
use super::VectorClock;
use super::state::{
SYNC_TX, all_ops_lock, apply_and_persist, emit_event, get_crdt, rebuild_active_agent_index,
rebuild_agent_throttle_index, rebuild_index, rebuild_merge_job_index, rebuild_node_index,
rebuild_test_job_index, rebuild_token_index, track_op, vector_clock_lock,
PERSIST_PENDING, PersistMsg, SYNC_TX, all_ops_lock, apply_and_persist, emit_event, get_crdt,
rebuild_active_agent_index, rebuild_agent_throttle_index, rebuild_index,
rebuild_merge_job_index, rebuild_node_index, rebuild_test_job_index, rebuild_token_index,
track_op, vector_clock_lock,
};
use super::types::{CrdtEvent, PipelineDoc};
use crate::slog;
@@ -116,9 +118,15 @@ pub fn apply_remote_op(op: SignedOp) -> bool {
}
// Persist the op.
if let Err(e) = state.persist_tx.send(op.clone()) {
if state
.persist_tx
.send(PersistMsg::Op(Box::new(op.clone())))
.is_ok()
{
PERSIST_PENDING.fetch_add(1, Ordering::Relaxed);
} else {
crate::slog_error!(
"[crdt] Failed to send remote op to persist task: {e}; persist task may be dead. \
"[crdt] Failed to send remote op to persist task; persist task may be dead. \
In-memory state is now ahead of persisted state."
);
}
+9 -1
View File
@@ -6,7 +6,9 @@ use std::collections::HashMap;
use bft_json_crdt::json_crdt::*;
use bft_json_crdt::op::{OpId, ROOT_ID};
use super::state::{all_ops_lock, apply_and_persist, get_crdt, rebuild_index};
use std::sync::atomic::Ordering;
use super::state::{PERSIST_PENDING, all_ops_lock, apply_and_persist, get_crdt, rebuild_index};
use super::types::{PipelineDoc, PipelineItemCrdt, PipelineItemView};
// ── Debug dump ───────────────────────────────────────────────────────
@@ -44,6 +46,8 @@ pub struct CrdtStateDump {
pub max_seq_in_list: u64,
/// Count of ops in the ALL_OPS journal (persisted ops replayed at startup).
pub persisted_ops_count: usize,
/// Count of ops queued in the persistence channel not yet written to SQLite.
pub pending_persist_ops_count: usize,
pub items: Vec<CrdtItemDump>,
}
@@ -61,6 +65,7 @@ pub fn dump_crdt_state(story_id_filter: Option<&str>) -> CrdtStateDump {
let persisted_ops_count = all_ops_lock()
.and_then(|m| m.lock().ok().map(|v| v.len()))
.unwrap_or(0);
let pending_persist_ops_count = PERSIST_PENDING.load(Ordering::Relaxed);
let Some(state_mutex) = get_crdt() else {
return CrdtStateDump {
@@ -69,6 +74,7 @@ pub fn dump_crdt_state(story_id_filter: Option<&str>) -> CrdtStateDump {
total_ops_in_list: 0,
max_seq_in_list: 0,
persisted_ops_count,
pending_persist_ops_count,
items: Vec::new(),
};
};
@@ -80,6 +86,7 @@ pub fn dump_crdt_state(story_id_filter: Option<&str>) -> CrdtStateDump {
total_ops_in_list: 0,
max_seq_in_list: 0,
persisted_ops_count,
pending_persist_ops_count,
items: Vec::new(),
};
};
@@ -179,6 +186,7 @@ pub fn dump_crdt_state(story_id_filter: Option<&str>) -> CrdtStateDump {
total_ops_in_list,
max_seq_in_list,
persisted_ops_count,
pending_persist_ops_count,
items,
}
}
+10 -2
View File
@@ -5,11 +5,13 @@
//! it to the live document, sends it to the persistence channel, and broadcasts
//! it to sync peers via [`super::SYNC_TX`].
use std::sync::atomic::Ordering;
use bft_json_crdt::json_crdt::JsonValue;
use bft_json_crdt::op::Op;
use super::super::types::CrdtEvent;
use super::{CrdtState, statics};
use super::{CrdtState, init::PersistMsg, statics};
/// Create a CRDT op via `op_fn`, sign it, apply it, and send it to the
/// persistence channel. The closure receives `&mut CrdtState` so it can
@@ -21,7 +23,13 @@ where
let raw_op = op_fn(state);
let signed = raw_op.sign(&state.keypair);
state.crdt.apply(signed.clone());
if state.persist_tx.send(signed.clone()).is_err() {
if state
.persist_tx
.send(PersistMsg::Op(Box::new(signed.clone())))
.is_ok()
{
statics::PERSIST_PENDING.fetch_add(1, Ordering::Relaxed);
} else {
let op_type = if signed.inner.is_deleted {
"Delete"
} else {
+85 -28
View File
@@ -8,12 +8,13 @@
use std::collections::{HashMap, HashSet};
use std::path::Path;
use std::sync::Mutex;
use std::sync::atomic::Ordering;
use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, JsonValue, SignedOp};
use bft_json_crdt::keypair::{Ed25519KeyPair, make_keypair};
use sqlx::SqlitePool;
use sqlx::sqlite::SqliteConnectOptions;
use tokio::sync::{broadcast, mpsc};
use tokio::sync::{broadcast, mpsc, oneshot};
use super::super::VectorClock;
use super::super::hex;
@@ -23,10 +24,18 @@ use super::indices::{
rebuild_index, rebuild_merge_job_index, rebuild_node_index, rebuild_test_job_index,
rebuild_token_index,
};
use super::statics::{ALL_OPS, CRDT_EVENT_TX, SYNC_TX, VECTOR_CLOCK};
use super::statics::{ALL_OPS, CRDT_EVENT_TX, PERSIST_PENDING, SYNC_TX, VECTOR_CLOCK};
use super::{CRDT_STATE, CrdtState};
use crate::slog;
/// Message type for the persistence background channel.
pub(crate) enum PersistMsg {
/// Persist this op to SQLite.
Op(Box<SignedOp>),
/// Drain: signal the sender after all preceding ops are committed.
Flush(oneshot::Sender<()>),
}
/// Initialise the CRDT state layer.
///
/// Opens the SQLite database, loads or creates a node keypair, replays any
@@ -119,35 +128,46 @@ pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> {
);
// Spawn background persistence task.
let (persist_tx, mut persist_rx) = mpsc::unbounded_channel::<SignedOp>();
let (persist_tx, mut persist_rx) = mpsc::unbounded_channel::<PersistMsg>();
tokio::spawn(async move {
while let Some(op) = persist_rx.recv().await {
let op_json = match serde_json::to_string(&op) {
Ok(j) => j,
Err(e) => {
slog!("[crdt] Failed to serialize op: {e}");
continue;
while let Some(msg) = persist_rx.recv().await {
match msg {
PersistMsg::Op(op) => {
let op = *op;
let op_json = match serde_json::to_string(&op) {
Ok(j) => j,
Err(e) => {
slog!("[crdt] Failed to serialize op: {e}");
PERSIST_PENDING.fetch_sub(1, Ordering::Relaxed);
continue;
}
};
let op_id = hex::encode(&op.id());
let seq = op.inner.seq as i64;
let now = chrono::Utc::now().to_rfc3339();
let result = sqlx::query(
"INSERT INTO crdt_ops (op_id, seq, op_json, created_at) \
VALUES (?1, ?2, ?3, ?4) \
ON CONFLICT(op_id) DO NOTHING",
)
.bind(&op_id)
.bind(seq)
.bind(&op_json)
.bind(&now)
.execute(&pool)
.await;
if let Err(e) = result {
slog!("[crdt] Failed to persist op {}: {e}", &op_id[..12]);
}
PERSIST_PENDING.fetch_sub(1, Ordering::Relaxed);
}
PersistMsg::Flush(reply) => {
// All ops queued before this message have already been processed.
let _ = reply.send(());
}
};
let op_id = hex::encode(&op.id());
let seq = op.inner.seq as i64;
let now = chrono::Utc::now().to_rfc3339();
let result = sqlx::query(
"INSERT INTO crdt_ops (op_id, seq, op_json, created_at) \
VALUES (?1, ?2, ?3, ?4) \
ON CONFLICT(op_id) DO NOTHING",
)
.bind(&op_id)
.bind(seq)
.bind(&op_json)
.bind(&now)
.execute(&pool)
.await;
if let Err(e) = result {
slog!("[crdt] Failed to persist op {}: {e}", &op_id[..12]);
}
}
});
@@ -181,6 +201,43 @@ pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> {
Ok(())
}
/// Signal the persistence background task to drain and wait until all currently-queued
/// ops have been written to SQLite, or until `timeout` elapses.
///
/// Because the persistence channel is FIFO, a `Flush` sentinel processed by the task
/// guarantees that every `Op` sent before it has already been committed. On timeout a
/// warning is logged with the queue depth so regressions are visible in logs.
pub(crate) async fn flush_persistence(timeout: std::time::Duration) {
let Some(state_mutex) = super::get_crdt() else {
return;
};
let persist_tx = {
let Ok(state) = state_mutex.lock() else {
return;
};
state.persist_tx.clone()
};
let pending_at_send = PERSIST_PENDING.load(Ordering::Relaxed);
let (tx, rx) = oneshot::channel();
if persist_tx.send(PersistMsg::Flush(tx)).is_err() {
slog!("[rebuild] Persistence channel closed — skipping flush");
return;
}
match tokio::time::timeout(timeout, rx).await {
Ok(_) => {
slog!("[rebuild] Persistence channel drained ({pending_at_send} ops flushed)");
}
Err(_) => {
let pending_now = PERSIST_PENDING.load(Ordering::Relaxed);
slog!(
"[rebuild] WARNING: persistence flush timed out after {}ms; \
queue_depth_at_send={pending_at_send} queue_depth_now={pending_now}",
timeout.as_millis()
);
}
}
}
/// Load or create the Ed25519 keypair used by this node.
async fn load_or_create_keypair(pool: &SqlitePool) -> Result<Ed25519KeyPair, sqlx::Error> {
let row: Option<(Vec<u8>,)> =
+5 -4
View File
@@ -27,6 +27,7 @@ mod tests;
// ── Re-exports for crdt_state siblings ──────────────────────────────
pub use init::init;
pub(crate) use init::{PersistMsg, flush_persistence};
/// Subscribe to CRDT state-transition events.
///
@@ -41,8 +42,8 @@ pub(super) use indices::{
rebuild_index, rebuild_merge_job_index, rebuild_node_index, rebuild_test_job_index,
rebuild_token_index,
};
pub(crate) use statics::{PERSIST_PENDING, all_ops_lock, vector_clock_lock};
pub(super) use statics::{SYNC_TX, track_op};
pub(crate) use statics::{all_ops_lock, vector_clock_lock};
// ── CrdtState struct ─────────────────────────────────────────────────
@@ -66,8 +67,8 @@ pub(super) struct CrdtState {
pub(super) agent_throttle_index: HashMap<String, usize>,
/// Maps project name → index in the gateway_projects ListCrdt for O(1) lookup.
pub(super) gateway_project_index: HashMap<String, usize>,
/// Channel sender for fire-and-forget op persistence.
pub(super) persist_tx: mpsc::UnboundedSender<SignedOp>,
/// Channel sender for op persistence and drain signalling.
pub(super) persist_tx: mpsc::UnboundedSender<init::PersistMsg>,
/// Max sequence number seen across all ops during init() replay.
///
/// Newly-created registers (post-init) must have their Lamport clock
@@ -128,7 +129,7 @@ pub(super) fn get_crdt() -> Option<&'static Mutex<CrdtState>> {
pub fn init_for_test() {
let keypair = make_keypair();
let crdt = BaseCrdt::<PipelineDoc>::new(&keypair);
let (persist_tx, rx) = mpsc::unbounded_channel();
let (persist_tx, rx) = mpsc::unbounded_channel::<init::PersistMsg>();
// Leak the receiver so the channel stays open: apply_and_persist
// can then send without error, preventing [crdt_persist] WARNs
// from racing with other tests that watch the global log buffer.
+9
View File
@@ -10,6 +10,7 @@
//! tests do not share `ALL_OPS` — preventing one test's `apply_compaction`
//! from pruning another test's freshly-written ops.
use std::sync::atomic::AtomicUsize;
use std::sync::{Mutex, OnceLock};
use bft_json_crdt::json_crdt::SignedOp;
@@ -19,6 +20,14 @@ use super::super::VectorClock;
use super::super::hex;
use super::super::types::CrdtEvent;
/// Count of ops queued in the persistence channel that have not yet been written to SQLite.
///
/// Incremented when an op is sent into the channel; decremented after the
/// persistence task commits it. Exposed via `dump_crdt_state` as
/// `pending_persist_ops_count` so operators can tell whether there is a flush
/// backlog before calling `rebuild_and_restart`.
pub(crate) static PERSIST_PENDING: AtomicUsize = AtomicUsize::new(0);
/// Broadcast channel for CRDT events (stage transitions, etc.).
pub(super) static CRDT_EVENT_TX: OnceLock<broadcast::Sender<CrdtEvent>> = OnceLock::new();
+102 -2
View File
@@ -6,6 +6,7 @@
use super::super::hex;
use super::super::read::extract_item_view;
use super::super::types::PipelineDoc;
use super::init::PersistMsg;
use super::*;
use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, JsonValue, SignedOp};
use bft_json_crdt::keypair::make_keypair;
@@ -222,7 +223,7 @@ async fn init_and_write_read_roundtrip() {
fn persist_tx_send_failure_logs_warn_with_op_type_and_seq() {
let kp = make_keypair();
let crdt = BaseCrdt::<PipelineDoc>::new(&kp);
let (persist_tx, persist_rx) = mpsc::unbounded_channel::<SignedOp>();
let (persist_tx, persist_rx) = mpsc::unbounded_channel::<PersistMsg>();
let mut state = CrdtState {
crdt,
@@ -296,7 +297,7 @@ fn persist_tx_send_failure_logs_warn_with_op_type_and_seq() {
fn persist_tx_send_success_emits_no_warn() {
let kp = make_keypair();
let crdt = BaseCrdt::<PipelineDoc>::new(&kp);
let (persist_tx, _persist_rx) = mpsc::unbounded_channel::<SignedOp>();
let (persist_tx, _persist_rx) = mpsc::unbounded_channel::<PersistMsg>();
let mut state = CrdtState {
crdt,
@@ -485,3 +486,102 @@ async fn restart_new_register_resumes_from_lamport_floor() {
max_seq,
);
}
/// Regression test for story 1116: ops sent before `flush_persistence` must all be
/// present in the `crdt_ops` SQLite table after the flush completes.
///
/// Bug: `rebuild_and_restart` called `exec()` before the persistence task had
/// a chance to drain the unbounded channel, silently dropping queued ops.
///
/// Reproducer: apply N ops → call `rebuild_and_restart` → the process re-execs
/// and on the next startup `persisted_ops_count` is < N (lost ops).
/// Fixed by: send a `Flush` sentinel through the channel before `exec()`; the
/// task echoes back only after all preceding `Op` messages are committed.
#[tokio::test]
async fn flush_persistence_drains_all_ops_before_ack() {
use std::sync::atomic::Ordering;
use tokio::sync::oneshot;
let tmp = tempfile::tempdir().unwrap();
let db_path = tmp.path().join("flush_drain_test.db");
let options = SqliteConnectOptions::new()
.filename(&db_path)
.create_if_missing(true);
let pool = SqlitePool::connect_with(options).await.unwrap();
sqlx::migrate!("./migrations").run(&pool).await.unwrap();
let kp = make_keypair();
let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);
// Spawn an isolated persistence task — same logic as init() but without
// touching the global singleton (keeping this test fully self-contained).
let (tx, mut rx) = mpsc::unbounded_channel::<PersistMsg>();
let pool_clone = pool.clone();
tokio::spawn(async move {
use std::sync::atomic::AtomicUsize;
let counter = AtomicUsize::new(0);
while let Some(msg) = rx.recv().await {
match msg {
PersistMsg::Op(op) => {
let op_json = serde_json::to_string(&op).unwrap();
let op_id = hex::encode(&op.id());
let seq = op.inner.seq as i64;
let now = chrono::Utc::now().to_rfc3339();
sqlx::query(
"INSERT INTO crdt_ops (op_id, seq, op_json, created_at) \
VALUES (?1, ?2, ?3, ?4) ON CONFLICT(op_id) DO NOTHING",
)
.bind(&op_id)
.bind(seq)
.bind(&op_json)
.bind(&now)
.execute(&pool_clone)
.await
.unwrap();
counter.fetch_add(1, Ordering::Relaxed);
}
PersistMsg::Flush(reply) => {
let _ = reply.send(());
}
}
}
});
const N: usize = 10;
for i in 0..N {
let item: JsonValue = json!({
"story_id": format!("1116_drain_{i}"),
"stage": "1_backlog",
"name": format!("Drain Test {i}"),
"agent": "",
"retry_count": 0.0,
"blocked": false,
"depends_on": "",
"claimed_by": "",
"claimed_at": 0.0,
})
.into();
let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp);
crdt.apply(op.clone());
tx.send(PersistMsg::Op(Box::new(op))).unwrap();
}
// Send flush sentinel and wait — all N ops must be committed first.
let (flush_tx, flush_rx) = oneshot::channel();
tx.send(PersistMsg::Flush(flush_tx)).unwrap();
tokio::time::timeout(std::time::Duration::from_secs(5), flush_rx)
.await
.expect("flush timed out — persistence task did not drain within 5 s")
.expect("flush oneshot dropped unexpectedly");
// Verify all N ops are in the database.
let (count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM crdt_ops")
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(
count as usize, N,
"all {N} ops must be in crdt_ops after flush; got {count}"
);
}
+1 -1
View File
@@ -126,7 +126,7 @@ pub(crate) fn tool_dump_crdt(args: &Value) -> Result<String, String> {
"total_ops_in_list": dump.total_ops_in_list,
"max_seq_in_list": dump.max_seq_in_list,
"persisted_ops_count": dump.persisted_ops_count,
"pending_persist_ops_count": null,
"pending_persist_ops_count": dump.pending_persist_ops_count,
},
"items": items,
}))
+1 -1
View File
@@ -203,7 +203,7 @@ pub fn debug_crdt_handler(req: &poem::Request) -> poem::Response {
"total_ops_in_list": dump.total_ops_in_list,
"max_seq_in_list": dump.max_seq_in_list,
"persisted_ops_count": dump.persisted_ops_count,
"pending_persist_ops_count": null,
"pending_persist_ops_count": dump.pending_persist_ops_count,
},
"items": items,
});
+5
View File
@@ -189,6 +189,11 @@ pub async fn rebuild_and_restart(
n.notify(ShutdownReason::Rebuild).await;
}
// 5b. Drain the persistence channel so no queued ops are lost when exec()
// replaces this process. Times out after 5 s with a logged warning
// naming the queue depth so any regression is visible in logs.
crate::crdt_state::flush_persistence(std::time::Duration::from_secs(5)).await;
// 6. Re-exec with the new binary.
// Use the cargo output path rather than current_exe() so that rebuilds
// inside Docker work correctly — the running binary may be installed at