diff --git a/server/src/crdt_state/mod.rs b/server/src/crdt_state/mod.rs index 35656941..d11bd39d 100644 --- a/server/src/crdt_state/mod.rs +++ b/server/src/crdt_state/mod.rs @@ -45,6 +45,7 @@ pub use read::{ dep_is_archived_crdt, dep_is_done_crdt, dump_crdt_state, evict_item, is_tombstoned, read_all_items, read_item, tombstoned_ids, }; +pub(crate) use state::flush_persistence; pub use state::{init, subscribe}; pub use types::{ ActiveAgentCrdt, ActiveAgentView, AgentThrottleCrdt, AgentThrottleView, CrdtEvent, EpicId, diff --git a/server/src/crdt_state/ops.rs b/server/src/crdt_state/ops.rs index cc14901a..45ed2f83 100644 --- a/server/src/crdt_state/ops.rs +++ b/server/src/crdt_state/ops.rs @@ -2,6 +2,7 @@ #![allow(unused_imports, dead_code)] use std::collections::HashMap; +use std::sync::atomic::Ordering; use super::hex; use bft_json_crdt::json_crdt::*; @@ -10,9 +11,10 @@ use tokio::sync::broadcast; use super::VectorClock; use super::state::{ - SYNC_TX, all_ops_lock, apply_and_persist, emit_event, get_crdt, rebuild_active_agent_index, - rebuild_agent_throttle_index, rebuild_index, rebuild_merge_job_index, rebuild_node_index, - rebuild_test_job_index, rebuild_token_index, track_op, vector_clock_lock, + PERSIST_PENDING, PersistMsg, SYNC_TX, all_ops_lock, apply_and_persist, emit_event, get_crdt, + rebuild_active_agent_index, rebuild_agent_throttle_index, rebuild_index, + rebuild_merge_job_index, rebuild_node_index, rebuild_test_job_index, rebuild_token_index, + track_op, vector_clock_lock, }; use super::types::{CrdtEvent, PipelineDoc}; use crate::slog; @@ -116,9 +118,18 @@ pub fn apply_remote_op(op: SignedOp) -> bool { } // Persist the op. - if let Err(e) = state.persist_tx.send(op.clone()) { + // Count the op before sending: the persist task decrements after handling it, so + // incrementing afterwards could let the counter wrap below zero. + PERSIST_PENDING.fetch_add(1, Ordering::Relaxed); + if state + .persist_tx + .send(PersistMsg::Op(Box::new(op.clone()))) + .is_err() + { + // Nothing was enqueued, so undo the reservation. + PERSIST_PENDING.fetch_sub(1, Ordering::Relaxed); crate::slog_error!( - "[crdt] Failed to send remote op to persist task: {e}; persist task may be dead. 
\ + "[crdt] Failed to send remote op to persist task; persist task may be dead. \ In-memory state is now ahead of persisted state." ); } diff --git a/server/src/crdt_state/read.rs b/server/src/crdt_state/read.rs index 93429531..71473024 100644 --- a/server/src/crdt_state/read.rs +++ b/server/src/crdt_state/read.rs @@ -6,7 +6,9 @@ use std::collections::HashMap; use bft_json_crdt::json_crdt::*; use bft_json_crdt::op::{OpId, ROOT_ID}; -use super::state::{all_ops_lock, apply_and_persist, get_crdt, rebuild_index}; +use std::sync::atomic::Ordering; + +use super::state::{PERSIST_PENDING, all_ops_lock, apply_and_persist, get_crdt, rebuild_index}; use super::types::{PipelineDoc, PipelineItemCrdt, PipelineItemView}; // ── Debug dump ─────────────────────────────────────────────────────── @@ -44,6 +46,8 @@ pub struct CrdtStateDump { pub max_seq_in_list: u64, /// Count of ops in the ALL_OPS journal (persisted ops replayed at startup). pub persisted_ops_count: usize, + /// Count of ops queued in the persistence channel not yet written to SQLite. 
+ pub pending_persist_ops_count: usize, pub items: Vec, } @@ -61,6 +65,7 @@ pub fn dump_crdt_state(story_id_filter: Option<&str>) -> CrdtStateDump { let persisted_ops_count = all_ops_lock() .and_then(|m| m.lock().ok().map(|v| v.len())) .unwrap_or(0); + let pending_persist_ops_count = PERSIST_PENDING.load(Ordering::Relaxed); let Some(state_mutex) = get_crdt() else { return CrdtStateDump { @@ -69,6 +74,7 @@ pub fn dump_crdt_state(story_id_filter: Option<&str>) -> CrdtStateDump { total_ops_in_list: 0, max_seq_in_list: 0, persisted_ops_count, + pending_persist_ops_count, items: Vec::new(), }; }; @@ -80,6 +86,7 @@ pub fn dump_crdt_state(story_id_filter: Option<&str>) -> CrdtStateDump { total_ops_in_list: 0, max_seq_in_list: 0, persisted_ops_count, + pending_persist_ops_count, items: Vec::new(), }; }; @@ -179,6 +186,7 @@ pub fn dump_crdt_state(story_id_filter: Option<&str>) -> CrdtStateDump { total_ops_in_list, max_seq_in_list, persisted_ops_count, + pending_persist_ops_count, items, } } diff --git a/server/src/crdt_state/state/apply.rs b/server/src/crdt_state/state/apply.rs index a98e11e9..7ee7bacd 100644 --- a/server/src/crdt_state/state/apply.rs +++ b/server/src/crdt_state/state/apply.rs @@ -5,11 +5,13 @@ //! it to the live document, sends it to the persistence channel, and broadcasts //! it to sync peers via [`super::SYNC_TX`]. +use std::sync::atomic::Ordering; + use bft_json_crdt::json_crdt::JsonValue; use bft_json_crdt::op::Op; use super::super::types::CrdtEvent; -use super::{CrdtState, statics}; +use super::{CrdtState, init::PersistMsg, statics}; /// Create a CRDT op via `op_fn`, sign it, apply it, and send it to the /// persistence channel. 
The closure receives `&mut CrdtState` so it can @@ -21,7 +23,16 @@ where let raw_op = op_fn(state); let signed = raw_op.sign(&state.keypair); state.crdt.apply(signed.clone()); - if state.persist_tx.send(signed.clone()).is_err() { + // Count the op before sending: the persist task decrements after handling it, so + // incrementing afterwards could let the counter wrap below zero. + statics::PERSIST_PENDING.fetch_add(1, Ordering::Relaxed); + if state + .persist_tx + .send(PersistMsg::Op(Box::new(signed.clone()))) + .is_err() + { + // Nothing was enqueued, so undo the reservation. + statics::PERSIST_PENDING.fetch_sub(1, Ordering::Relaxed); let op_type = if signed.inner.is_deleted { "Delete" } else { diff --git a/server/src/crdt_state/state/init.rs b/server/src/crdt_state/state/init.rs index 64c3d00e..36bc9df8 100644 --- a/server/src/crdt_state/state/init.rs +++ b/server/src/crdt_state/state/init.rs @@ -8,12 +8,13 @@ use std::collections::{HashMap, HashSet}; use std::path::Path; use std::sync::Mutex; +use std::sync::atomic::Ordering; use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, JsonValue, SignedOp}; use bft_json_crdt::keypair::{Ed25519KeyPair, make_keypair}; use sqlx::SqlitePool; use sqlx::sqlite::SqliteConnectOptions; -use tokio::sync::{broadcast, mpsc}; +use tokio::sync::{broadcast, mpsc, oneshot}; use super::super::VectorClock; use super::super::hex; @@ -23,10 +24,18 @@ use super::indices::{ rebuild_index, rebuild_merge_job_index, rebuild_node_index, rebuild_test_job_index, rebuild_token_index, }; -use super::statics::{ALL_OPS, CRDT_EVENT_TX, SYNC_TX, VECTOR_CLOCK}; +use super::statics::{ALL_OPS, CRDT_EVENT_TX, PERSIST_PENDING, SYNC_TX, VECTOR_CLOCK}; use super::{CRDT_STATE, CrdtState}; use crate::slog; +/// Message type for the persistence background channel. +pub(crate) enum PersistMsg { + /// Persist this op to SQLite. + Op(Box<SignedOp>), + /// Drain: signal the sender after all preceding ops are committed. + Flush(oneshot::Sender<()>), +} + /// Initialise the CRDT state layer. /// /// Opens the SQLite database, loads or creates a node keypair, replays any @@ -119,35 +128,46 @@ pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> { ); // Spawn background persistence task. 
- let (persist_tx, mut persist_rx) = mpsc::unbounded_channel::<SignedOp>(); + let (persist_tx, mut persist_rx) = mpsc::unbounded_channel::<PersistMsg>(); tokio::spawn(async move { - while let Some(op) = persist_rx.recv().await { - let op_json = match serde_json::to_string(&op) { - Ok(j) => j, - Err(e) => { - slog!("[crdt] Failed to serialize op: {e}"); - continue; + while let Some(msg) = persist_rx.recv().await { + match msg { + PersistMsg::Op(op) => { + let op = *op; + let op_json = match serde_json::to_string(&op) { + Ok(j) => j, + Err(e) => { + slog!("[crdt] Failed to serialize op: {e}"); + PERSIST_PENDING.fetch_sub(1, Ordering::Relaxed); + continue; + } + }; + let op_id = hex::encode(&op.id()); + let seq = op.inner.seq as i64; + let now = chrono::Utc::now().to_rfc3339(); + + let result = sqlx::query( + "INSERT INTO crdt_ops (op_id, seq, op_json, created_at) \ + VALUES (?1, ?2, ?3, ?4) \ + ON CONFLICT(op_id) DO NOTHING", + ) + .bind(&op_id) + .bind(seq) + .bind(&op_json) + .bind(&now) + .execute(&pool) + .await; + + if let Err(e) = result { + slog!("[crdt] Failed to persist op {}: {e}", &op_id[..12]); + } + PERSIST_PENDING.fetch_sub(1, Ordering::Relaxed); + } + PersistMsg::Flush(reply) => { + // All ops queued before this message have already been processed. + let _ = reply.send(()); } - }; - let op_id = hex::encode(&op.id()); - let seq = op.inner.seq as i64; - let now = chrono::Utc::now().to_rfc3339(); - - let result = sqlx::query( - "INSERT INTO crdt_ops (op_id, seq, op_json, created_at) \ - VALUES (?1, ?2, ?3, ?4) \ - ON CONFLICT(op_id) DO NOTHING", - ) - .bind(&op_id) - .bind(seq) - .bind(&op_json) - .bind(&now) - .execute(&pool) - .await; - - if let Err(e) = result { - slog!("[crdt] Failed to persist op {}: {e}", &op_id[..12]); } } }); @@ -181,6 +201,53 @@ pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> { Ok(()) } +/// Signal the persistence background task to drain and wait until all currently-queued +/// ops have been written to SQLite, or until `timeout` elapses. 
+/// +/// Because the persistence channel is FIFO, a `Flush` sentinel processed by the task +/// guarantees that every `Op` sent before it has already been handled (written to SQLite, +/// or its failure logged). On timeout, or if the task exits without acknowledging, a +/// warning is logged with the queue depth so regressions are visible in logs. +pub(crate) async fn flush_persistence(timeout: std::time::Duration) { + let Some(state_mutex) = super::get_crdt() else { + return; + }; + let persist_tx = { + let Ok(state) = state_mutex.lock() else { + return; + }; + state.persist_tx.clone() + }; + let pending_at_send = PERSIST_PENDING.load(Ordering::Relaxed); + let (tx, rx) = oneshot::channel(); + if persist_tx.send(PersistMsg::Flush(tx)).is_err() { + slog!("[rebuild] Persistence channel closed — skipping flush"); + return; + } + match tokio::time::timeout(timeout, rx).await { + Ok(Ok(())) => { + slog!("[rebuild] Persistence channel drained ({pending_at_send} ops flushed)"); + } + Ok(Err(_)) => { + // The reply sender was dropped without an ack, so the task went away before + // reaching the sentinel; ops queued ahead of it may not have been written. + let pending_now = PERSIST_PENDING.load(Ordering::Relaxed); + slog!( + "[rebuild] WARNING: persistence task exited before acknowledging flush; \ + queue_depth_at_send={pending_at_send} queue_depth_now={pending_now}" + ); + } + Err(_) => { + let pending_now = PERSIST_PENDING.load(Ordering::Relaxed); + slog!( + "[rebuild] WARNING: persistence flush timed out after {}ms; \ + queue_depth_at_send={pending_at_send} queue_depth_now={pending_now}", + timeout.as_millis() + ); + } + } +} + /// Load or create the Ed25519 keypair used by this node. async fn load_or_create_keypair(pool: &SqlitePool) -> Result { let row: Option<(Vec,)> = diff --git a/server/src/crdt_state/state/mod.rs b/server/src/crdt_state/state/mod.rs index b00f89eb..8e0ff9e6 100644 --- a/server/src/crdt_state/state/mod.rs +++ b/server/src/crdt_state/state/mod.rs @@ -27,6 +27,7 @@ mod tests; // ── Re-exports for crdt_state siblings ────────────────────────────── pub use init::init; +pub(crate) use init::{PersistMsg, flush_persistence}; /// Subscribe to CRDT state-transition events. 
/// @@ -41,8 +42,8 @@ pub(super) use indices::{ rebuild_index, rebuild_merge_job_index, rebuild_node_index, rebuild_test_job_index, rebuild_token_index, }; +pub(crate) use statics::{PERSIST_PENDING, all_ops_lock, vector_clock_lock}; pub(super) use statics::{SYNC_TX, track_op}; -pub(crate) use statics::{all_ops_lock, vector_clock_lock}; // ── CrdtState struct ───────────────────────────────────────────────── @@ -66,8 +67,8 @@ pub(super) struct CrdtState { pub(super) agent_throttle_index: HashMap, /// Maps project name → index in the gateway_projects ListCrdt for O(1) lookup. pub(super) gateway_project_index: HashMap, - /// Channel sender for fire-and-forget op persistence. - pub(super) persist_tx: mpsc::UnboundedSender, + /// Channel sender for op persistence and drain signalling. + pub(super) persist_tx: mpsc::UnboundedSender, /// Max sequence number seen across all ops during init() replay. /// /// Newly-created registers (post-init) must have their Lamport clock @@ -128,7 +129,7 @@ pub(super) fn get_crdt() -> Option<&'static Mutex> { pub fn init_for_test() { let keypair = make_keypair(); let crdt = BaseCrdt::::new(&keypair); - let (persist_tx, rx) = mpsc::unbounded_channel(); + let (persist_tx, rx) = mpsc::unbounded_channel::(); // Leak the receiver so the channel stays open: apply_and_persist // can then send without error, preventing [crdt_persist] WARNs // from racing with other tests that watch the global log buffer. diff --git a/server/src/crdt_state/state/statics.rs b/server/src/crdt_state/state/statics.rs index 161c2018..78e1026e 100644 --- a/server/src/crdt_state/state/statics.rs +++ b/server/src/crdt_state/state/statics.rs @@ -10,6 +10,7 @@ //! tests do not share `ALL_OPS` — preventing one test's `apply_compaction` //! from pruning another test's freshly-written ops. 
+use std::sync::atomic::AtomicUsize; use std::sync::{Mutex, OnceLock}; use bft_json_crdt::json_crdt::SignedOp; @@ -19,6 +20,14 @@ use super::super::VectorClock; use super::super::hex; use super::super::types::CrdtEvent; +/// Count of ops queued in the persistence channel that have not yet been written to SQLite. +/// +/// Incremented when an op is sent into the channel; decremented after the +/// persistence task commits it. Exposed via `dump_crdt_state` as +/// `pending_persist_ops_count` so operators can tell whether there is a flush +/// backlog before calling `rebuild_and_restart`. +pub(crate) static PERSIST_PENDING: AtomicUsize = AtomicUsize::new(0); + /// Broadcast channel for CRDT events (stage transitions, etc.). pub(super) static CRDT_EVENT_TX: OnceLock> = OnceLock::new(); diff --git a/server/src/crdt_state/state/tests.rs b/server/src/crdt_state/state/tests.rs index 3e37c8f8..5a2e8f1d 100644 --- a/server/src/crdt_state/state/tests.rs +++ b/server/src/crdt_state/state/tests.rs @@ -6,6 +6,7 @@ use super::super::hex; use super::super::read::extract_item_view; use super::super::types::PipelineDoc; +use super::init::PersistMsg; use super::*; use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, JsonValue, SignedOp}; use bft_json_crdt::keypair::make_keypair; @@ -222,7 +223,7 @@ async fn init_and_write_read_roundtrip() { fn persist_tx_send_failure_logs_warn_with_op_type_and_seq() { let kp = make_keypair(); let crdt = BaseCrdt::::new(&kp); - let (persist_tx, persist_rx) = mpsc::unbounded_channel::(); + let (persist_tx, persist_rx) = mpsc::unbounded_channel::(); let mut state = CrdtState { crdt, @@ -296,7 +297,7 @@ fn persist_tx_send_failure_logs_warn_with_op_type_and_seq() { fn persist_tx_send_success_emits_no_warn() { let kp = make_keypair(); let crdt = BaseCrdt::::new(&kp); - let (persist_tx, _persist_rx) = mpsc::unbounded_channel::(); + let (persist_tx, _persist_rx) = mpsc::unbounded_channel::(); let mut state = CrdtState { crdt, @@ -485,3 +486,102 @@ async fn 
restart_new_register_resumes_from_lamport_floor() { max_seq, ); } + +/// Regression test for story 1116: ops sent before `flush_persistence` must all be +/// present in the `crdt_ops` SQLite table after the flush completes. +/// +/// Bug: `rebuild_and_restart` called `exec()` before the persistence task had +/// a chance to drain the unbounded channel, silently dropping queued ops. +/// +/// Reproducer: apply N ops → call `rebuild_and_restart` → the process re-execs +/// and on the next startup `persisted_ops_count` is < N (lost ops). +/// Fixed by: send a `Flush` sentinel through the channel before `exec()`; the +/// task echoes back only after all preceding `Op` messages are committed. +#[tokio::test] +async fn flush_persistence_drains_all_ops_before_ack() { + use std::sync::atomic::Ordering; + use tokio::sync::oneshot; + + let tmp = tempfile::tempdir().unwrap(); + let db_path = tmp.path().join("flush_drain_test.db"); + + let options = SqliteConnectOptions::new() + .filename(&db_path) + .create_if_missing(true); + let pool = SqlitePool::connect_with(options).await.unwrap(); + sqlx::migrate!("./migrations").run(&pool).await.unwrap(); + + let kp = make_keypair(); + let mut crdt = BaseCrdt::::new(&kp); + + // Spawn an isolated persistence task — same logic as init() but without + // touching the global singleton (keeping this test fully self-contained). 
+ let (tx, mut rx) = mpsc::unbounded_channel::(); + let pool_clone = pool.clone(); + tokio::spawn(async move { + use std::sync::atomic::AtomicUsize; + let counter = AtomicUsize::new(0); + while let Some(msg) = rx.recv().await { + match msg { + PersistMsg::Op(op) => { + let op_json = serde_json::to_string(&op).unwrap(); + let op_id = hex::encode(&op.id()); + let seq = op.inner.seq as i64; + let now = chrono::Utc::now().to_rfc3339(); + sqlx::query( + "INSERT INTO crdt_ops (op_id, seq, op_json, created_at) \ + VALUES (?1, ?2, ?3, ?4) ON CONFLICT(op_id) DO NOTHING", + ) + .bind(&op_id) + .bind(seq) + .bind(&op_json) + .bind(&now) + .execute(&pool_clone) + .await + .unwrap(); + counter.fetch_add(1, Ordering::Relaxed); + } + PersistMsg::Flush(reply) => { + let _ = reply.send(()); + } + } + } + }); + + const N: usize = 10; + for i in 0..N { + let item: JsonValue = json!({ + "story_id": format!("1116_drain_{i}"), + "stage": "1_backlog", + "name": format!("Drain Test {i}"), + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, + }) + .into(); + let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp); + crdt.apply(op.clone()); + tx.send(PersistMsg::Op(Box::new(op))).unwrap(); + } + + // Send flush sentinel and wait — all N ops must be committed first. + let (flush_tx, flush_rx) = oneshot::channel(); + tx.send(PersistMsg::Flush(flush_tx)).unwrap(); + tokio::time::timeout(std::time::Duration::from_secs(5), flush_rx) + .await + .expect("flush timed out — persistence task did not drain within 5 s") + .expect("flush oneshot dropped unexpectedly"); + + // Verify all N ops are in the database. 
+ let (count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM crdt_ops") + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!( + count as usize, N, + "all {N} ops must be in crdt_ops after flush; got {count}" + ); +} diff --git a/server/src/http/mcp/diagnostics/mod.rs b/server/src/http/mcp/diagnostics/mod.rs index 7421cb88..b15da0e4 100644 --- a/server/src/http/mcp/diagnostics/mod.rs +++ b/server/src/http/mcp/diagnostics/mod.rs @@ -126,7 +126,7 @@ pub(crate) fn tool_dump_crdt(args: &Value) -> Result { "total_ops_in_list": dump.total_ops_in_list, "max_seq_in_list": dump.max_seq_in_list, "persisted_ops_count": dump.persisted_ops_count, - "pending_persist_ops_count": null, + "pending_persist_ops_count": dump.pending_persist_ops_count, }, "items": items, })) diff --git a/server/src/http/mod.rs b/server/src/http/mod.rs index edbdf6d7..bc13afcf 100644 --- a/server/src/http/mod.rs +++ b/server/src/http/mod.rs @@ -203,7 +203,7 @@ pub fn debug_crdt_handler(req: &poem::Request) -> poem::Response { "total_ops_in_list": dump.total_ops_in_list, "max_seq_in_list": dump.max_seq_in_list, "persisted_ops_count": dump.persisted_ops_count, - "pending_persist_ops_count": null, + "pending_persist_ops_count": dump.pending_persist_ops_count, }, "items": items, }); diff --git a/server/src/rebuild.rs b/server/src/rebuild.rs index 20ea1a25..761e9405 100644 --- a/server/src/rebuild.rs +++ b/server/src/rebuild.rs @@ -189,6 +189,11 @@ pub async fn rebuild_and_restart( n.notify(ShutdownReason::Rebuild).await; } + // 5b. Drain the persistence channel so no queued ops are lost when exec() + // replaces this process. Times out after 5 s with a logged warning + // naming the queue depth so any regression is visible in logs. + crate::crdt_state::flush_persistence(std::time::Duration::from_secs(5)).await; + // 6. Re-exec with the new binary. // Use the cargo output path rather than current_exe() so that rebuilds // inside Docker work correctly — the running binary may be installed at