huskies: merge 838
This commit is contained in:
@@ -1,869 +0,0 @@
|
||||
//! Internal CRDT state struct, statics, initialisation, and central write primitive.
|
||||
|
||||
#![allow(unused_imports, dead_code)]
|
||||
use std::collections::HashMap;
|
||||
use std::path::Path;
|
||||
use std::sync::{Mutex, OnceLock};
|
||||
|
||||
use bft_json_crdt::json_crdt::*;
|
||||
use bft_json_crdt::keypair::make_keypair;
|
||||
use fastcrypto::ed25519::Ed25519KeyPair;
|
||||
use fastcrypto::traits::ToFromBytes;
|
||||
use serde_json::json;
|
||||
use sqlx::SqlitePool;
|
||||
use sqlx::sqlite::SqliteConnectOptions;
|
||||
use tokio::sync::{broadcast, mpsc};
|
||||
|
||||
use super::VectorClock;
|
||||
use super::hex;
|
||||
use super::types::{CrdtEvent, PipelineDoc};
|
||||
use crate::slog;
|
||||
|
||||
// ── Sync broadcast channels ──────────────────────────────────────────
|
||||
|
||||
pub(super) static CRDT_EVENT_TX: OnceLock<broadcast::Sender<CrdtEvent>> = OnceLock::new();
|
||||
|
||||
pub(super) static SYNC_TX: OnceLock<broadcast::Sender<SignedOp>> = OnceLock::new();
|
||||
|
||||
/// All persisted ops as JSON strings, in causal (insertion) order.
|
||||
///
|
||||
/// Pub(crate) so that `crdt_snapshot` can access it for compaction.
|
||||
pub(crate) static ALL_OPS: OnceLock<Mutex<Vec<String>>> = OnceLock::new();
|
||||
|
||||
/// Live vector clock tracking op counts per author.
|
||||
///
|
||||
/// Updated in lockstep with `ALL_OPS` — every time an op is appended to the
|
||||
/// journal, the corresponding author's count is incremented here. This avoids
|
||||
/// re-parsing all ops when a peer requests `our_vector_clock()`.
|
||||
pub(crate) static VECTOR_CLOCK: OnceLock<Mutex<super::VectorClock>> = OnceLock::new();
|
||||
|
||||
/// Append an op's JSON to `ALL_OPS` and bump the author's count in `VECTOR_CLOCK`.
|
||||
///
|
||||
/// Centralises the bookkeeping that must stay in sync between the two statics.
|
||||
pub(super) fn track_op(signed: &SignedOp, json: String) {
|
||||
if let Some(all) = ALL_OPS.get()
|
||||
&& let Ok(mut v) = all.lock()
|
||||
{
|
||||
v.push(json);
|
||||
}
|
||||
if let Some(vc) = VECTOR_CLOCK.get()
|
||||
&& let Ok(mut clock) = vc.lock()
|
||||
{
|
||||
let author_hex = super::hex::encode(&signed.author());
|
||||
*clock.entry(author_hex).or_insert(0) += 1;
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) struct CrdtState {
|
||||
pub(super) crdt: BaseCrdt<PipelineDoc>,
|
||||
pub(super) keypair: Ed25519KeyPair,
|
||||
/// Maps story_id → index in the items ListCrdt for O(1) lookup.
|
||||
pub(super) index: HashMap<String, usize>,
|
||||
/// Maps node_id (hex) → index in the nodes ListCrdt for O(1) lookup.
|
||||
pub(super) node_index: HashMap<String, usize>,
|
||||
/// Maps agent_id → index in the tokens ListCrdt for O(1) lookup.
|
||||
pub(super) token_index: HashMap<String, usize>,
|
||||
/// Maps story_id → index in the merge_jobs ListCrdt for O(1) lookup.
|
||||
pub(super) merge_job_index: HashMap<String, usize>,
|
||||
/// Maps agent_id → index in the active_agents ListCrdt for O(1) lookup.
|
||||
pub(super) active_agent_index: HashMap<String, usize>,
|
||||
/// Maps story_id → index in the test_jobs ListCrdt for O(1) lookup.
|
||||
pub(super) test_job_index: HashMap<String, usize>,
|
||||
/// Maps node_id → index in the agent_throttle ListCrdt for O(1) lookup.
|
||||
pub(super) agent_throttle_index: HashMap<String, usize>,
|
||||
/// Maps project name → index in the gateway_projects ListCrdt for O(1) lookup.
|
||||
pub(super) gateway_project_index: HashMap<String, usize>,
|
||||
/// Channel sender for fire-and-forget op persistence.
|
||||
pub(super) persist_tx: mpsc::UnboundedSender<SignedOp>,
|
||||
/// Max sequence number seen across all ops during init() replay.
|
||||
///
|
||||
/// Newly-created registers (post-init) must have their Lamport clock
|
||||
/// advanced to this floor so they don't re-emit low sequence numbers.
|
||||
pub(super) lamport_floor: u64,
|
||||
}
|
||||
|
||||
static CRDT_STATE: OnceLock<Mutex<CrdtState>> = OnceLock::new();
|
||||
|
||||
#[cfg(test)]
|
||||
thread_local! {
|
||||
static CRDT_STATE_TL: OnceLock<Mutex<CrdtState>> = const { OnceLock::new() };
|
||||
}
|
||||
|
||||
#[cfg(not(test))]
|
||||
pub(super) fn get_crdt() -> Option<&'static Mutex<CrdtState>> {
|
||||
CRDT_STATE.get()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(super) fn get_crdt() -> Option<&'static Mutex<CrdtState>> {
|
||||
let tl = CRDT_STATE_TL.with(|lock| {
|
||||
if lock.get().is_some() {
|
||||
Some(lock as *const OnceLock<Mutex<CrdtState>>)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
});
|
||||
if let Some(ptr) = tl {
|
||||
// SAFETY: The thread-local lives as long as the thread, which outlives
|
||||
// any test using it. We only need 'static for the return type.
|
||||
let lock = unsafe { &*ptr };
|
||||
lock.get()
|
||||
} else {
|
||||
CRDT_STATE.get()
|
||||
}
|
||||
}
|
||||
|
||||
/// Initialise the CRDT state layer.
|
||||
///
|
||||
/// Opens the SQLite database, loads or creates a node keypair, replays any
|
||||
/// persisted ops to reconstruct state, and spawns a background persistence
|
||||
/// task. Safe to call only once; subsequent calls are no-ops.
|
||||
pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> {
|
||||
if CRDT_STATE.get().is_some() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let options = SqliteConnectOptions::new()
|
||||
.filename(db_path)
|
||||
.create_if_missing(true);
|
||||
let pool = SqlitePool::connect_with(options).await?;
|
||||
sqlx::migrate!("./migrations").run(&pool).await?;
|
||||
|
||||
// Load or create the node keypair.
|
||||
let keypair = load_or_create_keypair(&pool).await?;
|
||||
let mut crdt = BaseCrdt::<PipelineDoc>::new(&keypair);
|
||||
|
||||
// Replay persisted ops to reconstruct state.
|
||||
let rows: Vec<(String,)> = sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY rowid ASC")
|
||||
.fetch_all(&pool)
|
||||
.await?;
|
||||
|
||||
let mut all_ops_vec = Vec::with_capacity(rows.len());
|
||||
let mut vector_clock = VectorClock::new();
|
||||
let mut lamport_floor: u64 = 0;
|
||||
for (op_json,) in &rows {
|
||||
if let Ok(signed_op) = serde_json::from_str::<SignedOp>(op_json) {
|
||||
let author_hex = hex::encode(&signed_op.author());
|
||||
*vector_clock.entry(author_hex).or_insert(0) += 1;
|
||||
lamport_floor = lamport_floor.max(signed_op.inner.seq);
|
||||
crdt.apply(signed_op);
|
||||
all_ops_vec.push(op_json.clone());
|
||||
} else {
|
||||
slog!("[crdt] Warning: failed to deserialize stored op");
|
||||
}
|
||||
}
|
||||
let _ = ALL_OPS.set(Mutex::new(all_ops_vec));
|
||||
let _ = VECTOR_CLOCK.set(Mutex::new(vector_clock));
|
||||
|
||||
// Build the indices from the reconstructed state.
|
||||
let index = rebuild_index(&crdt);
|
||||
let node_index = rebuild_node_index(&crdt);
|
||||
let token_index = rebuild_token_index(&crdt);
|
||||
let merge_job_index = rebuild_merge_job_index(&crdt);
|
||||
let active_agent_index = rebuild_active_agent_index(&crdt);
|
||||
let test_job_index = rebuild_test_job_index(&crdt);
|
||||
let agent_throttle_index = rebuild_agent_throttle_index(&crdt);
|
||||
let gateway_project_index = rebuild_gateway_project_index(&crdt);
|
||||
|
||||
// Advance the top-level list clocks to the Lamport floor so that
|
||||
// list-level inserts don't re-emit low seq numbers.
|
||||
crdt.doc.items.advance_seq(lamport_floor);
|
||||
crdt.doc.nodes.advance_seq(lamport_floor);
|
||||
crdt.doc.tokens.advance_seq(lamport_floor);
|
||||
crdt.doc.merge_jobs.advance_seq(lamport_floor);
|
||||
crdt.doc.active_agents.advance_seq(lamport_floor);
|
||||
crdt.doc.test_jobs.advance_seq(lamport_floor);
|
||||
crdt.doc.agent_throttle.advance_seq(lamport_floor);
|
||||
crdt.doc.gateway_projects.advance_seq(lamport_floor);
|
||||
crdt.doc
|
||||
.gateway_config
|
||||
.active_project
|
||||
.advance_seq(lamport_floor);
|
||||
|
||||
slog!(
|
||||
"[crdt] Initialised: {} ops replayed, {} items indexed, {} nodes indexed, lamport_floor={}",
|
||||
rows.len(),
|
||||
index.len(),
|
||||
node_index.len(),
|
||||
lamport_floor,
|
||||
);
|
||||
|
||||
// Spawn background persistence task.
|
||||
let (persist_tx, mut persist_rx) = mpsc::unbounded_channel::<SignedOp>();
|
||||
|
||||
tokio::spawn(async move {
|
||||
while let Some(op) = persist_rx.recv().await {
|
||||
let op_json = match serde_json::to_string(&op) {
|
||||
Ok(j) => j,
|
||||
Err(e) => {
|
||||
slog!("[crdt] Failed to serialize op: {e}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let op_id = hex::encode(&op.id());
|
||||
let seq = op.inner.seq as i64;
|
||||
let now = chrono::Utc::now().to_rfc3339();
|
||||
|
||||
let result = sqlx::query(
|
||||
"INSERT INTO crdt_ops (op_id, seq, op_json, created_at) \
|
||||
VALUES (?1, ?2, ?3, ?4) \
|
||||
ON CONFLICT(op_id) DO NOTHING",
|
||||
)
|
||||
.bind(&op_id)
|
||||
.bind(seq)
|
||||
.bind(&op_json)
|
||||
.bind(&now)
|
||||
.execute(&pool)
|
||||
.await;
|
||||
|
||||
if let Err(e) = result {
|
||||
slog!("[crdt] Failed to persist op {}: {e}", &op_id[..12]);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let state = CrdtState {
|
||||
crdt,
|
||||
keypair,
|
||||
index,
|
||||
node_index,
|
||||
token_index,
|
||||
merge_job_index,
|
||||
active_agent_index,
|
||||
test_job_index,
|
||||
agent_throttle_index,
|
||||
gateway_project_index,
|
||||
persist_tx,
|
||||
lamport_floor,
|
||||
};
|
||||
|
||||
let _ = CRDT_STATE.set(Mutex::new(state));
|
||||
|
||||
// Initialise the CRDT event broadcast channel.
|
||||
let (event_tx, _) = broadcast::channel::<CrdtEvent>(256);
|
||||
let _ = CRDT_EVENT_TX.set(event_tx);
|
||||
|
||||
// Initialise the sync broadcast channel for outgoing ops.
|
||||
let (sync_tx, _) = broadcast::channel::<SignedOp>(1024);
|
||||
let _ = SYNC_TX.set(sync_tx);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Initialise a minimal in-memory CRDT state for unit tests.
|
||||
///
|
||||
/// This avoids the async SQLite setup from `init()`. Ops are accepted via a
|
||||
/// channel whose receiver is immediately dropped, so nothing is persisted.
|
||||
/// Safe to call multiple times — subsequent calls are no-ops (OnceLock).
|
||||
#[cfg(test)]
|
||||
pub fn init_for_test() {
|
||||
// Initialise thread-local CRDT for test isolation.
|
||||
// Only creates a new CRDT if one isn't set yet on this thread;
|
||||
// subsequent calls are no-ops (matching the old OnceLock semantics
|
||||
// while keeping each thread isolated).
|
||||
CRDT_STATE_TL.with(|lock| {
|
||||
if lock.get().is_none() {
|
||||
let keypair = make_keypair();
|
||||
let crdt = BaseCrdt::<PipelineDoc>::new(&keypair);
|
||||
let (persist_tx, _rx) = mpsc::unbounded_channel();
|
||||
let state = CrdtState {
|
||||
crdt,
|
||||
keypair,
|
||||
index: HashMap::new(),
|
||||
node_index: HashMap::new(),
|
||||
token_index: HashMap::new(),
|
||||
merge_job_index: HashMap::new(),
|
||||
active_agent_index: HashMap::new(),
|
||||
test_job_index: HashMap::new(),
|
||||
agent_throttle_index: HashMap::new(),
|
||||
gateway_project_index: HashMap::new(),
|
||||
persist_tx,
|
||||
lamport_floor: 0,
|
||||
};
|
||||
let _ = lock.set(Mutex::new(state));
|
||||
}
|
||||
});
|
||||
let _ = CRDT_EVENT_TX.get_or_init(|| broadcast::channel::<CrdtEvent>(256).0);
|
||||
let _ = SYNC_TX.get_or_init(|| broadcast::channel::<SignedOp>(1024).0);
|
||||
let _ = ALL_OPS.get_or_init(|| Mutex::new(Vec::new()));
|
||||
let _ = VECTOR_CLOCK.get_or_init(|| Mutex::new(VectorClock::new()));
|
||||
}
|
||||
|
||||
/// Load or create the Ed25519 keypair used by this node.
|
||||
async fn load_or_create_keypair(pool: &SqlitePool) -> Result<Ed25519KeyPair, sqlx::Error> {
|
||||
let row: Option<(Vec<u8>,)> =
|
||||
sqlx::query_as("SELECT seed FROM crdt_node_identity WHERE id = 1")
|
||||
.fetch_optional(pool)
|
||||
.await?;
|
||||
|
||||
if let Some((seed,)) = row {
|
||||
// Reconstruct from stored seed. The seed is the 32-byte private key.
|
||||
if let Ok(kp) = Ed25519KeyPair::from_bytes(&seed) {
|
||||
return Ok(kp);
|
||||
}
|
||||
slog!("[crdt] Stored keypair invalid, regenerating");
|
||||
}
|
||||
|
||||
let kp = make_keypair();
|
||||
let seed = kp.as_bytes().to_vec();
|
||||
sqlx::query("INSERT INTO crdt_node_identity (id, seed) VALUES (1, ?1) ON CONFLICT(id) DO UPDATE SET seed = excluded.seed")
|
||||
.bind(&seed)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
|
||||
Ok(kp)
|
||||
}
|
||||
|
||||
/// Rebuild the story_id → list index mapping from the current CRDT state.
|
||||
pub(super) fn rebuild_index(crdt: &BaseCrdt<PipelineDoc>) -> HashMap<String, usize> {
|
||||
let mut map = HashMap::new();
|
||||
for (i, item) in crdt.doc.items.iter().enumerate() {
|
||||
if let JsonValue::String(ref sid) = item.story_id.view() {
|
||||
map.insert(sid.clone(), i);
|
||||
}
|
||||
}
|
||||
map
|
||||
}
|
||||
|
||||
/// Rebuild the node_id → nodes list index mapping from the current CRDT state.
|
||||
pub(super) fn rebuild_node_index(crdt: &BaseCrdt<PipelineDoc>) -> HashMap<String, usize> {
|
||||
let mut map = HashMap::new();
|
||||
for (i, node) in crdt.doc.nodes.iter().enumerate() {
|
||||
if let JsonValue::String(ref nid) = node.node_id.view() {
|
||||
map.insert(nid.clone(), i);
|
||||
}
|
||||
}
|
||||
map
|
||||
}
|
||||
|
||||
/// Rebuild the agent_id → tokens list index.
|
||||
pub(super) fn rebuild_token_index(crdt: &BaseCrdt<PipelineDoc>) -> HashMap<String, usize> {
|
||||
let mut map = HashMap::new();
|
||||
for (i, entry) in crdt.doc.tokens.iter().enumerate() {
|
||||
if let JsonValue::String(ref k) = entry.agent_id.view() {
|
||||
map.insert(k.clone(), i);
|
||||
}
|
||||
}
|
||||
map
|
||||
}
|
||||
|
||||
/// Rebuild the story_id → merge_jobs list index.
|
||||
pub(super) fn rebuild_merge_job_index(crdt: &BaseCrdt<PipelineDoc>) -> HashMap<String, usize> {
|
||||
let mut map = HashMap::new();
|
||||
for (i, entry) in crdt.doc.merge_jobs.iter().enumerate() {
|
||||
if let JsonValue::String(ref k) = entry.story_id.view() {
|
||||
map.insert(k.clone(), i);
|
||||
}
|
||||
}
|
||||
map
|
||||
}
|
||||
|
||||
/// Rebuild the agent_id → active_agents list index.
|
||||
pub(super) fn rebuild_active_agent_index(crdt: &BaseCrdt<PipelineDoc>) -> HashMap<String, usize> {
|
||||
let mut map = HashMap::new();
|
||||
for (i, entry) in crdt.doc.active_agents.iter().enumerate() {
|
||||
if let JsonValue::String(ref k) = entry.agent_id.view() {
|
||||
map.insert(k.clone(), i);
|
||||
}
|
||||
}
|
||||
map
|
||||
}
|
||||
|
||||
/// Rebuild the story_id → test_jobs list index.
|
||||
pub(super) fn rebuild_test_job_index(crdt: &BaseCrdt<PipelineDoc>) -> HashMap<String, usize> {
|
||||
let mut map = HashMap::new();
|
||||
for (i, entry) in crdt.doc.test_jobs.iter().enumerate() {
|
||||
if let JsonValue::String(ref k) = entry.story_id.view() {
|
||||
map.insert(k.clone(), i);
|
||||
}
|
||||
}
|
||||
map
|
||||
}
|
||||
|
||||
/// Rebuild the node_id → agent_throttle list index.
|
||||
pub(super) fn rebuild_agent_throttle_index(crdt: &BaseCrdt<PipelineDoc>) -> HashMap<String, usize> {
|
||||
let mut map = HashMap::new();
|
||||
for (i, entry) in crdt.doc.agent_throttle.iter().enumerate() {
|
||||
if let JsonValue::String(ref k) = entry.node_id.view() {
|
||||
map.insert(k.clone(), i);
|
||||
}
|
||||
}
|
||||
map
|
||||
}
|
||||
|
||||
/// Rebuild the project name → gateway_projects list index.
|
||||
pub(super) fn rebuild_gateway_project_index(
|
||||
crdt: &BaseCrdt<PipelineDoc>,
|
||||
) -> HashMap<String, usize> {
|
||||
let mut map = HashMap::new();
|
||||
for (i, entry) in crdt.doc.gateway_projects.iter().enumerate() {
|
||||
if let JsonValue::String(ref k) = entry.name.view() {
|
||||
map.insert(k.clone(), i);
|
||||
}
|
||||
}
|
||||
map
|
||||
}
|
||||
|
||||
// ── Write path ───────────────────────────────────────────────────────
|
||||
|
||||
/// Create a CRDT op via `op_fn`, sign it, apply it, and send it to the
|
||||
/// persistence channel. The closure receives `&mut CrdtState` so it can
|
||||
/// mutably access the CRDT document, while `sign` only needs `&keypair`.
|
||||
pub(super) fn apply_and_persist<F>(state: &mut CrdtState, op_fn: F)
|
||||
where
|
||||
F: FnOnce(&mut CrdtState) -> bft_json_crdt::op::Op<JsonValue>,
|
||||
{
|
||||
let raw_op = op_fn(state);
|
||||
let signed = raw_op.sign(&state.keypair);
|
||||
state.crdt.apply(signed.clone());
|
||||
if state.persist_tx.send(signed.clone()).is_err() {
|
||||
let op_type = if signed.inner.is_deleted {
|
||||
"Delete"
|
||||
} else {
|
||||
"Insert"
|
||||
};
|
||||
let seq = signed.inner.seq;
|
||||
crate::slog_warn!(
|
||||
"[crdt_persist] persist channel send failed: op_type={op_type} seq={seq}"
|
||||
);
|
||||
}
|
||||
|
||||
// Track in ALL_OPS + VECTOR_CLOCK, then broadcast to sync peers.
|
||||
if let Ok(json) = serde_json::to_string(&signed) {
|
||||
track_op(&signed, json);
|
||||
}
|
||||
if let Some(tx) = SYNC_TX.get() {
|
||||
let _ = tx.send(signed);
|
||||
}
|
||||
}
|
||||
|
||||
/// Broadcast a CRDT event to all subscribers.
|
||||
pub(super) fn emit_event(event: CrdtEvent) {
|
||||
if let Some(tx) = CRDT_EVENT_TX.get() {
|
||||
let _ = tx.send(event);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::super::hex;
|
||||
use super::super::read::{extract_item_view, read_item};
|
||||
use super::super::types::PipelineItemCrdt;
|
||||
use super::super::write::write_item;
|
||||
use super::*;
|
||||
use bft_json_crdt::json_crdt::OpState;
|
||||
use bft_json_crdt::keypair::make_keypair;
|
||||
use bft_json_crdt::op::ROOT_ID;
|
||||
use serde_json::json;
|
||||
|
||||
#[test]
|
||||
fn crdt_ops_replay_reconstructs_state() {
|
||||
let kp = make_keypair();
|
||||
let mut crdt1 = BaseCrdt::<PipelineDoc>::new(&kp);
|
||||
|
||||
// Build state with a series of ops.
|
||||
let item_json: JsonValue = json!({
|
||||
"story_id": "30_story_replay",
|
||||
"stage": "1_backlog",
|
||||
"name": "Replay Test",
|
||||
"agent": "",
|
||||
"retry_count": 0.0,
|
||||
"blocked": false,
|
||||
"depends_on": "",
|
||||
"claimed_by": "",
|
||||
"claimed_at": 0.0,
|
||||
})
|
||||
.into();
|
||||
|
||||
let op1 = crdt1.doc.items.insert(ROOT_ID, item_json).sign(&kp);
|
||||
crdt1.apply(op1.clone());
|
||||
|
||||
let op2 = crdt1.doc.items[0]
|
||||
.stage
|
||||
.set("2_current".to_string())
|
||||
.sign(&kp);
|
||||
crdt1.apply(op2.clone());
|
||||
|
||||
let op3 = crdt1.doc.items[0]
|
||||
.name
|
||||
.set("Updated Name".to_string())
|
||||
.sign(&kp);
|
||||
crdt1.apply(op3.clone());
|
||||
|
||||
// Replay ops on a fresh CRDT.
|
||||
let mut crdt2 = BaseCrdt::<PipelineDoc>::new(&kp);
|
||||
crdt2.apply(op1);
|
||||
crdt2.apply(op2);
|
||||
crdt2.apply(op3);
|
||||
|
||||
assert_eq!(
|
||||
crdt1.doc.items[0].stage.view(),
|
||||
crdt2.doc.items[0].stage.view()
|
||||
);
|
||||
assert_eq!(
|
||||
crdt1.doc.items[0].name.view(),
|
||||
crdt2.doc.items[0].name.view()
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rebuild_index_maps_story_ids() {
|
||||
let kp = make_keypair();
|
||||
let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);
|
||||
|
||||
for (sid, stage) in &[("10_story_a", "1_backlog"), ("20_story_b", "2_current")] {
|
||||
let item: JsonValue = json!({
|
||||
"story_id": sid,
|
||||
"stage": stage,
|
||||
"name": "",
|
||||
"agent": "",
|
||||
"retry_count": 0.0,
|
||||
"blocked": false,
|
||||
"depends_on": "",
|
||||
"claimed_by": "",
|
||||
"claimed_at": 0.0,
|
||||
})
|
||||
.into();
|
||||
let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp);
|
||||
crdt.apply(op);
|
||||
}
|
||||
|
||||
let index = rebuild_index(&crdt);
|
||||
assert_eq!(index.len(), 2);
|
||||
assert!(index.contains_key("10_story_a"));
|
||||
assert!(index.contains_key("20_story_b"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn init_and_write_read_roundtrip() {
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
let db_path = tmp.path().join("crdt_test.db");
|
||||
|
||||
// Init directly (not via the global singleton, for test isolation).
|
||||
let options = SqliteConnectOptions::new()
|
||||
.filename(&db_path)
|
||||
.create_if_missing(true);
|
||||
let pool = SqlitePool::connect_with(options).await.unwrap();
|
||||
sqlx::migrate!("./migrations").run(&pool).await.unwrap();
|
||||
|
||||
let keypair = make_keypair();
|
||||
let mut crdt = BaseCrdt::<PipelineDoc>::new(&keypair);
|
||||
|
||||
// Insert and update like write_item does.
|
||||
let item_json: JsonValue = json!({
|
||||
"story_id": "50_story_roundtrip",
|
||||
"stage": "1_backlog",
|
||||
"name": "Roundtrip",
|
||||
"agent": "",
|
||||
"retry_count": 0.0,
|
||||
"blocked": false,
|
||||
"depends_on": "",
|
||||
"claimed_by": "",
|
||||
"claimed_at": 0.0,
|
||||
})
|
||||
.into();
|
||||
|
||||
let insert_op = crdt.doc.items.insert(ROOT_ID, item_json).sign(&keypair);
|
||||
crdt.apply(insert_op.clone());
|
||||
|
||||
// Persist the op.
|
||||
let op_json = serde_json::to_string(&insert_op).unwrap();
|
||||
let op_id = hex::encode(&insert_op.id());
|
||||
let now = chrono::Utc::now().to_rfc3339();
|
||||
sqlx::query(
|
||||
"INSERT INTO crdt_ops (op_id, seq, op_json, created_at) VALUES (?1, ?2, ?3, ?4)",
|
||||
)
|
||||
.bind(&op_id)
|
||||
.bind(insert_op.inner.seq as i64)
|
||||
.bind(&op_json)
|
||||
.bind(&now)
|
||||
.execute(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Reconstruct from DB.
|
||||
let rows: Vec<(String,)> =
|
||||
sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY rowid ASC")
|
||||
.fetch_all(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut crdt2 = BaseCrdt::<PipelineDoc>::new(&keypair);
|
||||
for (json_str,) in &rows {
|
||||
let op: SignedOp = serde_json::from_str(json_str).unwrap();
|
||||
crdt2.apply(op);
|
||||
}
|
||||
|
||||
let view = extract_item_view(&crdt2.doc.items[0]).unwrap();
|
||||
assert_eq!(view.story_id, "50_story_roundtrip");
|
||||
assert_eq!(view.stage, "1_backlog");
|
||||
assert_eq!(view.name.as_deref(), Some("Roundtrip"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn persist_tx_send_failure_logs_warn_with_op_type_and_seq() {
|
||||
let kp = make_keypair();
|
||||
let crdt = BaseCrdt::<PipelineDoc>::new(&kp);
|
||||
let (persist_tx, persist_rx) = mpsc::unbounded_channel::<SignedOp>();
|
||||
|
||||
let mut state = CrdtState {
|
||||
crdt,
|
||||
keypair: kp,
|
||||
index: HashMap::new(),
|
||||
node_index: HashMap::new(),
|
||||
token_index: HashMap::new(),
|
||||
merge_job_index: HashMap::new(),
|
||||
active_agent_index: HashMap::new(),
|
||||
test_job_index: HashMap::new(),
|
||||
agent_throttle_index: HashMap::new(),
|
||||
gateway_project_index: HashMap::new(),
|
||||
persist_tx,
|
||||
lamport_floor: 0,
|
||||
};
|
||||
|
||||
// Drop the receiver so that the next send fails immediately.
|
||||
drop(persist_rx);
|
||||
|
||||
let item_json: JsonValue = json!({
|
||||
"story_id": "676_story_persist_fail",
|
||||
"stage": "1_backlog",
|
||||
"name": "Persist Fail Test",
|
||||
"agent": "",
|
||||
"retry_count": 0.0,
|
||||
"blocked": false,
|
||||
"depends_on": "",
|
||||
"claimed_by": "",
|
||||
"claimed_at": 0.0,
|
||||
})
|
||||
.into();
|
||||
|
||||
let before_warns = crate::log_buffer::global()
|
||||
.get_recent_entries(1000, None, Some(&crate::log_buffer::LogLevel::Warn))
|
||||
.len();
|
||||
|
||||
apply_and_persist(&mut state, |s| s.crdt.doc.items.insert(ROOT_ID, item_json));
|
||||
|
||||
let warn_entries = crate::log_buffer::global().get_recent_entries(
|
||||
1000,
|
||||
None,
|
||||
Some(&crate::log_buffer::LogLevel::Warn),
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
warn_entries.len(),
|
||||
before_warns + 1,
|
||||
"expected exactly one WARN log entry when persist_tx send fails"
|
||||
);
|
||||
|
||||
let warn = &warn_entries[warn_entries.len() - 1];
|
||||
assert!(
|
||||
warn.message.contains("[crdt_persist]"),
|
||||
"WARN message must be prefixed [crdt_persist]: {}",
|
||||
warn.message
|
||||
);
|
||||
assert!(
|
||||
warn.message.contains("op_type="),
|
||||
"WARN message must include op_type: {}",
|
||||
warn.message
|
||||
);
|
||||
assert!(
|
||||
warn.message.contains("seq="),
|
||||
"WARN message must include seq: {}",
|
||||
warn.message
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn persist_tx_send_success_emits_no_warn() {
|
||||
let kp = make_keypair();
|
||||
let crdt = BaseCrdt::<PipelineDoc>::new(&kp);
|
||||
let (persist_tx, _persist_rx) = mpsc::unbounded_channel::<SignedOp>();
|
||||
|
||||
let mut state = CrdtState {
|
||||
crdt,
|
||||
keypair: kp,
|
||||
index: HashMap::new(),
|
||||
node_index: HashMap::new(),
|
||||
token_index: HashMap::new(),
|
||||
merge_job_index: HashMap::new(),
|
||||
active_agent_index: HashMap::new(),
|
||||
test_job_index: HashMap::new(),
|
||||
agent_throttle_index: HashMap::new(),
|
||||
gateway_project_index: HashMap::new(),
|
||||
persist_tx,
|
||||
lamport_floor: 0,
|
||||
};
|
||||
|
||||
let item_json: JsonValue = json!({
|
||||
"story_id": "676_story_happy_path",
|
||||
"stage": "1_backlog",
|
||||
"name": "Happy Path Test",
|
||||
"agent": "",
|
||||
"retry_count": 0.0,
|
||||
"blocked": false,
|
||||
"depends_on": "",
|
||||
"claimed_by": "",
|
||||
"claimed_at": 0.0,
|
||||
})
|
||||
.into();
|
||||
|
||||
let before_warns = crate::log_buffer::global()
|
||||
.get_recent_entries(
|
||||
1000,
|
||||
Some("[crdt_persist]"),
|
||||
Some(&crate::log_buffer::LogLevel::Warn),
|
||||
)
|
||||
.len();
|
||||
|
||||
apply_and_persist(&mut state, |s| s.crdt.doc.items.insert(ROOT_ID, item_json));
|
||||
|
||||
let after_warns = crate::log_buffer::global()
|
||||
.get_recent_entries(
|
||||
1000,
|
||||
Some("[crdt_persist]"),
|
||||
Some(&crate::log_buffer::LogLevel::Warn),
|
||||
)
|
||||
.len();
|
||||
|
||||
assert_eq!(
|
||||
after_warns, before_warns,
|
||||
"no [crdt_persist] WARN should be emitted when persist_tx send succeeds"
|
||||
);
|
||||
}
|
||||
|
||||
/// After replaying ops from a journal, a brand-new register created
|
||||
/// post-init must emit its first local op with seq = lamport_floor + 1,
|
||||
/// not seq = 1. This is the Phase C integration test.
|
||||
#[tokio::test]
|
||||
async fn restart_new_register_resumes_from_lamport_floor() {
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
let db_path = tmp.path().join("lamport_floor.db");
|
||||
|
||||
let options = SqliteConnectOptions::new()
|
||||
.filename(&db_path)
|
||||
.create_if_missing(true);
|
||||
let pool = SqlitePool::connect_with(options).await.unwrap();
|
||||
sqlx::migrate!("./migrations").run(&pool).await.unwrap();
|
||||
|
||||
let kp = make_keypair();
|
||||
let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);
|
||||
|
||||
// Insert an item and update its stage a few times to push seq up.
|
||||
let item: JsonValue = json!({
|
||||
"story_id": "664_story_original",
|
||||
"stage": "1_backlog",
|
||||
"name": "Original",
|
||||
"agent": "",
|
||||
"retry_count": 0.0,
|
||||
"blocked": false,
|
||||
"depends_on": "",
|
||||
"claimed_by": "",
|
||||
"claimed_at": 0.0,
|
||||
"merged_at": 0.0,
|
||||
})
|
||||
.into();
|
||||
|
||||
let mut ops = Vec::new();
|
||||
|
||||
let op1 = crdt.doc.items.insert(ROOT_ID, item).sign(&kp);
|
||||
crdt.apply(op1.clone());
|
||||
ops.push(op1);
|
||||
|
||||
let idx = rebuild_index(&crdt)["664_story_original"];
|
||||
let op2 = crdt.doc.items[idx]
|
||||
.stage
|
||||
.set("2_current".to_string())
|
||||
.sign(&kp);
|
||||
crdt.apply(op2.clone());
|
||||
ops.push(op2);
|
||||
|
||||
let op3 = crdt.doc.items[idx]
|
||||
.stage
|
||||
.set("3_review".to_string())
|
||||
.sign(&kp);
|
||||
crdt.apply(op3.clone());
|
||||
ops.push(op3);
|
||||
|
||||
// Record the max seq across all persisted ops — this is the floor.
|
||||
let max_seq = ops.iter().map(|o| o.inner.seq).max().unwrap();
|
||||
|
||||
// Persist all ops.
|
||||
let now = chrono::Utc::now().to_rfc3339();
|
||||
for op in &ops {
|
||||
let op_json = serde_json::to_string(op).unwrap();
|
||||
let op_id = hex::encode(&op.id());
|
||||
sqlx::query(
|
||||
"INSERT INTO crdt_ops (op_id, seq, op_json, created_at) VALUES (?1, ?2, ?3, ?4)",
|
||||
)
|
||||
.bind(&op_id)
|
||||
.bind(op.inner.seq as i64)
|
||||
.bind(&op_json)
|
||||
.bind(&now)
|
||||
.execute(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
// --- Simulate restart: replay from journal into a fresh CRDT ---
|
||||
let rows: Vec<(String,)> =
|
||||
sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY rowid ASC")
|
||||
.fetch_all(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut crdt2 = BaseCrdt::<PipelineDoc>::new(&kp);
|
||||
let mut lamport_floor: u64 = 0;
|
||||
for (json_str,) in &rows {
|
||||
let signed: SignedOp = serde_json::from_str(json_str).unwrap();
|
||||
lamport_floor = lamport_floor.max(signed.inner.seq);
|
||||
crdt2.apply(signed);
|
||||
}
|
||||
|
||||
// Advance top-level lists (mirrors what init() does).
|
||||
crdt2.doc.items.advance_seq(lamport_floor);
|
||||
crdt2.doc.nodes.advance_seq(lamport_floor);
|
||||
|
||||
assert_eq!(lamport_floor, max_seq);
|
||||
|
||||
// Insert a brand-new item — simulating a new story arriving after restart.
|
||||
let new_item: JsonValue = json!({
|
||||
"story_id": "664_story_new_after_restart",
|
||||
"stage": "1_backlog",
|
||||
"name": "New After Restart",
|
||||
"agent": "",
|
||||
"retry_count": 0.0,
|
||||
"blocked": false,
|
||||
"depends_on": "",
|
||||
"claimed_by": "",
|
||||
"claimed_at": 0.0,
|
||||
"merged_at": 0.0,
|
||||
})
|
||||
.into();
|
||||
|
||||
let insert_op = crdt2.doc.items.insert(ROOT_ID, new_item);
|
||||
// The list-level insert must have seq > lamport_floor.
|
||||
assert!(
|
||||
insert_op.seq > max_seq,
|
||||
"list insert seq ({}) must be > lamport_floor ({})",
|
||||
insert_op.seq,
|
||||
max_seq,
|
||||
);
|
||||
let insert_signed = insert_op.sign(&kp);
|
||||
crdt2.apply(insert_signed);
|
||||
|
||||
// Advance the new item's inner registers to the floor (mirrors write_item).
|
||||
let idx2 = rebuild_index(&crdt2)["664_story_new_after_restart"];
|
||||
let new_crdt_item = &mut crdt2.doc.items[idx2];
|
||||
new_crdt_item.stage.advance_seq(lamport_floor);
|
||||
|
||||
// Now update the stage — the first field-level op must also be > floor.
|
||||
let stage_op = crdt2.doc.items[idx2].stage.set("2_current".to_string());
|
||||
assert!(
|
||||
stage_op.seq > max_seq,
|
||||
"first field op seq ({}) on new register must be > lamport_floor ({}); \
|
||||
got seq = 1 means the register reset its clock on restart",
|
||||
stage_op.seq,
|
||||
max_seq,
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,50 @@
|
||||
//! Write path: create, sign, apply, persist, and broadcast a CRDT op.
|
||||
//!
|
||||
//! [`apply_and_persist`] is the single entry point for all CRDT mutations.
|
||||
//! It invokes the caller's op-factory closure, signs the resulting op, applies
|
||||
//! it to the live document, sends it to the persistence channel, and broadcasts
|
||||
//! it to sync peers via [`super::SYNC_TX`].
|
||||
|
||||
use bft_json_crdt::json_crdt::JsonValue;
|
||||
use bft_json_crdt::op::Op;
|
||||
|
||||
use super::super::types::CrdtEvent;
|
||||
use super::{CrdtState, statics};
|
||||
|
||||
/// Create a CRDT op via `op_fn`, sign it, apply it, and send it to the
|
||||
/// persistence channel. The closure receives `&mut CrdtState` so it can
|
||||
/// mutably access the CRDT document, while `sign` only needs `&keypair`.
|
||||
pub(in crate::crdt_state) fn apply_and_persist<F>(state: &mut CrdtState, op_fn: F)
|
||||
where
|
||||
F: FnOnce(&mut CrdtState) -> Op<JsonValue>,
|
||||
{
|
||||
let raw_op = op_fn(state);
|
||||
let signed = raw_op.sign(&state.keypair);
|
||||
state.crdt.apply(signed.clone());
|
||||
if state.persist_tx.send(signed.clone()).is_err() {
|
||||
let op_type = if signed.inner.is_deleted {
|
||||
"Delete"
|
||||
} else {
|
||||
"Insert"
|
||||
};
|
||||
let seq = signed.inner.seq;
|
||||
crate::slog_warn!(
|
||||
"[crdt_persist] persist channel send failed: op_type={op_type} seq={seq}"
|
||||
);
|
||||
}
|
||||
|
||||
// Track in ALL_OPS + VECTOR_CLOCK, then broadcast to sync peers.
|
||||
if let Ok(json) = serde_json::to_string(&signed) {
|
||||
statics::track_op(&signed, json);
|
||||
}
|
||||
if let Some(tx) = statics::SYNC_TX.get() {
|
||||
let _ = tx.send(signed);
|
||||
}
|
||||
}
|
||||
|
||||
/// Broadcast a CRDT event to all subscribers.
|
||||
pub(in crate::crdt_state) fn emit_event(event: CrdtEvent) {
|
||||
if let Some(tx) = statics::CRDT_EVENT_TX.get() {
|
||||
let _ = tx.send(event);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,115 @@
|
||||
//! Index-rebuild helpers: map domain keys to `ListCrdt` positions for O(1) lookup.
|
||||
//!
|
||||
//! Each function scans the corresponding `ListCrdt` in the CRDT document and
|
||||
//! returns a fresh `HashMap` from the entry's primary key to its position index.
|
||||
//! These are called once during [`super::init::init`] to populate the secondary
|
||||
//! indices in [`super::CrdtState`], and again by write helpers whenever an entry
|
||||
//! is inserted.
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, JsonValue};
|
||||
|
||||
use super::super::types::PipelineDoc;
|
||||
|
||||
/// Rebuild the story_id → list index mapping from the current CRDT state.
|
||||
pub(in crate::crdt_state) fn rebuild_index(crdt: &BaseCrdt<PipelineDoc>) -> HashMap<String, usize> {
|
||||
let mut map = HashMap::new();
|
||||
for (i, item) in crdt.doc.items.iter().enumerate() {
|
||||
if let JsonValue::String(ref sid) = item.story_id.view() {
|
||||
map.insert(sid.clone(), i);
|
||||
}
|
||||
}
|
||||
map
|
||||
}
|
||||
|
||||
/// Rebuild the node_id → nodes list index mapping from the current CRDT state.
|
||||
pub(in crate::crdt_state) fn rebuild_node_index(
|
||||
crdt: &BaseCrdt<PipelineDoc>,
|
||||
) -> HashMap<String, usize> {
|
||||
let mut map = HashMap::new();
|
||||
for (i, node) in crdt.doc.nodes.iter().enumerate() {
|
||||
if let JsonValue::String(ref nid) = node.node_id.view() {
|
||||
map.insert(nid.clone(), i);
|
||||
}
|
||||
}
|
||||
map
|
||||
}
|
||||
|
||||
/// Rebuild the agent_id → tokens list index.
|
||||
pub(in crate::crdt_state) fn rebuild_token_index(
|
||||
crdt: &BaseCrdt<PipelineDoc>,
|
||||
) -> HashMap<String, usize> {
|
||||
let mut map = HashMap::new();
|
||||
for (i, entry) in crdt.doc.tokens.iter().enumerate() {
|
||||
if let JsonValue::String(ref k) = entry.agent_id.view() {
|
||||
map.insert(k.clone(), i);
|
||||
}
|
||||
}
|
||||
map
|
||||
}
|
||||
|
||||
/// Rebuild the story_id → merge_jobs list index.
|
||||
pub(in crate::crdt_state) fn rebuild_merge_job_index(
|
||||
crdt: &BaseCrdt<PipelineDoc>,
|
||||
) -> HashMap<String, usize> {
|
||||
let mut map = HashMap::new();
|
||||
for (i, entry) in crdt.doc.merge_jobs.iter().enumerate() {
|
||||
if let JsonValue::String(ref k) = entry.story_id.view() {
|
||||
map.insert(k.clone(), i);
|
||||
}
|
||||
}
|
||||
map
|
||||
}
|
||||
|
||||
/// Rebuild the agent_id → active_agents list index.
|
||||
pub(in crate::crdt_state) fn rebuild_active_agent_index(
|
||||
crdt: &BaseCrdt<PipelineDoc>,
|
||||
) -> HashMap<String, usize> {
|
||||
let mut map = HashMap::new();
|
||||
for (i, entry) in crdt.doc.active_agents.iter().enumerate() {
|
||||
if let JsonValue::String(ref k) = entry.agent_id.view() {
|
||||
map.insert(k.clone(), i);
|
||||
}
|
||||
}
|
||||
map
|
||||
}
|
||||
|
||||
/// Rebuild the story_id → test_jobs list index.
|
||||
pub(in crate::crdt_state) fn rebuild_test_job_index(
|
||||
crdt: &BaseCrdt<PipelineDoc>,
|
||||
) -> HashMap<String, usize> {
|
||||
let mut map = HashMap::new();
|
||||
for (i, entry) in crdt.doc.test_jobs.iter().enumerate() {
|
||||
if let JsonValue::String(ref k) = entry.story_id.view() {
|
||||
map.insert(k.clone(), i);
|
||||
}
|
||||
}
|
||||
map
|
||||
}
|
||||
|
||||
/// Rebuild the node_id → agent_throttle list index.
|
||||
pub(in crate::crdt_state) fn rebuild_agent_throttle_index(
|
||||
crdt: &BaseCrdt<PipelineDoc>,
|
||||
) -> HashMap<String, usize> {
|
||||
let mut map = HashMap::new();
|
||||
for (i, entry) in crdt.doc.agent_throttle.iter().enumerate() {
|
||||
if let JsonValue::String(ref k) = entry.node_id.view() {
|
||||
map.insert(k.clone(), i);
|
||||
}
|
||||
}
|
||||
map
|
||||
}
|
||||
|
||||
/// Rebuild the project name → gateway_projects list index.
|
||||
pub(in crate::crdt_state) fn rebuild_gateway_project_index(
|
||||
crdt: &BaseCrdt<PipelineDoc>,
|
||||
) -> HashMap<String, usize> {
|
||||
let mut map = HashMap::new();
|
||||
for (i, entry) in crdt.doc.gateway_projects.iter().enumerate() {
|
||||
if let JsonValue::String(ref k) = entry.name.view() {
|
||||
map.insert(k.clone(), i);
|
||||
}
|
||||
}
|
||||
map
|
||||
}
|
||||
@@ -0,0 +1,192 @@
|
||||
//! CRDT initialisation: async startup and keypair persistence.
|
||||
//!
|
||||
//! [`init`] opens the SQLite database, loads or creates the node keypair,
|
||||
//! replays all persisted ops to reconstruct state, and spawns a background
|
||||
//! persistence task. It is safe to call only once; subsequent calls are
|
||||
//! no-ops (guarded by [`super::CRDT_STATE`]).
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::path::Path;
|
||||
use std::sync::Mutex;
|
||||
|
||||
use bft_json_crdt::json_crdt::{BaseCrdt, SignedOp};
|
||||
use bft_json_crdt::keypair::make_keypair;
|
||||
use fastcrypto::ed25519::Ed25519KeyPair;
|
||||
use fastcrypto::traits::ToFromBytes;
|
||||
use sqlx::SqlitePool;
|
||||
use sqlx::sqlite::SqliteConnectOptions;
|
||||
use tokio::sync::{broadcast, mpsc};
|
||||
|
||||
use super::super::VectorClock;
|
||||
use super::super::hex;
|
||||
use super::super::types::{CrdtEvent, PipelineDoc};
|
||||
use super::indices::{
|
||||
rebuild_active_agent_index, rebuild_agent_throttle_index, rebuild_gateway_project_index,
|
||||
rebuild_index, rebuild_merge_job_index, rebuild_node_index, rebuild_test_job_index,
|
||||
rebuild_token_index,
|
||||
};
|
||||
use super::statics::{ALL_OPS, CRDT_EVENT_TX, SYNC_TX, VECTOR_CLOCK};
|
||||
use super::{CRDT_STATE, CrdtState};
|
||||
use crate::slog;
|
||||
|
||||
/// Initialise the CRDT state layer.
|
||||
///
|
||||
/// Opens the SQLite database, loads or creates a node keypair, replays any
|
||||
/// persisted ops to reconstruct state, and spawns a background persistence
|
||||
/// task. Safe to call only once; subsequent calls are no-ops.
|
||||
pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> {
|
||||
if CRDT_STATE.get().is_some() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let options = SqliteConnectOptions::new()
|
||||
.filename(db_path)
|
||||
.create_if_missing(true);
|
||||
let pool = SqlitePool::connect_with(options).await?;
|
||||
sqlx::migrate!("./migrations").run(&pool).await?;
|
||||
|
||||
// Load or create the node keypair.
|
||||
let keypair = load_or_create_keypair(&pool).await?;
|
||||
let mut crdt = BaseCrdt::<PipelineDoc>::new(&keypair);
|
||||
|
||||
// Replay persisted ops to reconstruct state.
|
||||
let rows: Vec<(String,)> = sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY rowid ASC")
|
||||
.fetch_all(&pool)
|
||||
.await?;
|
||||
|
||||
let mut all_ops_vec = Vec::with_capacity(rows.len());
|
||||
let mut vector_clock = VectorClock::new();
|
||||
let mut lamport_floor: u64 = 0;
|
||||
for (op_json,) in &rows {
|
||||
if let Ok(signed_op) = serde_json::from_str::<SignedOp>(op_json) {
|
||||
let author_hex = hex::encode(&signed_op.author());
|
||||
*vector_clock.entry(author_hex).or_insert(0) += 1;
|
||||
lamport_floor = lamport_floor.max(signed_op.inner.seq);
|
||||
crdt.apply(signed_op);
|
||||
all_ops_vec.push(op_json.clone());
|
||||
} else {
|
||||
slog!("[crdt] Warning: failed to deserialize stored op");
|
||||
}
|
||||
}
|
||||
let _ = ALL_OPS.set(Mutex::new(all_ops_vec));
|
||||
let _ = VECTOR_CLOCK.set(Mutex::new(vector_clock));
|
||||
|
||||
// Build the indices from the reconstructed state.
|
||||
let index = rebuild_index(&crdt);
|
||||
let node_index = rebuild_node_index(&crdt);
|
||||
let token_index = rebuild_token_index(&crdt);
|
||||
let merge_job_index = rebuild_merge_job_index(&crdt);
|
||||
let active_agent_index = rebuild_active_agent_index(&crdt);
|
||||
let test_job_index = rebuild_test_job_index(&crdt);
|
||||
let agent_throttle_index = rebuild_agent_throttle_index(&crdt);
|
||||
let gateway_project_index = rebuild_gateway_project_index(&crdt);
|
||||
|
||||
// Advance the top-level list clocks to the Lamport floor so that
|
||||
// list-level inserts don't re-emit low seq numbers.
|
||||
crdt.doc.items.advance_seq(lamport_floor);
|
||||
crdt.doc.nodes.advance_seq(lamport_floor);
|
||||
crdt.doc.tokens.advance_seq(lamport_floor);
|
||||
crdt.doc.merge_jobs.advance_seq(lamport_floor);
|
||||
crdt.doc.active_agents.advance_seq(lamport_floor);
|
||||
crdt.doc.test_jobs.advance_seq(lamport_floor);
|
||||
crdt.doc.agent_throttle.advance_seq(lamport_floor);
|
||||
crdt.doc.gateway_projects.advance_seq(lamport_floor);
|
||||
crdt.doc
|
||||
.gateway_config
|
||||
.active_project
|
||||
.advance_seq(lamport_floor);
|
||||
|
||||
slog!(
|
||||
"[crdt] Initialised: {} ops replayed, {} items indexed, {} nodes indexed, lamport_floor={}",
|
||||
rows.len(),
|
||||
index.len(),
|
||||
node_index.len(),
|
||||
lamport_floor,
|
||||
);
|
||||
|
||||
// Spawn background persistence task.
|
||||
let (persist_tx, mut persist_rx) = mpsc::unbounded_channel::<SignedOp>();
|
||||
|
||||
tokio::spawn(async move {
|
||||
while let Some(op) = persist_rx.recv().await {
|
||||
let op_json = match serde_json::to_string(&op) {
|
||||
Ok(j) => j,
|
||||
Err(e) => {
|
||||
slog!("[crdt] Failed to serialize op: {e}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let op_id = hex::encode(&op.id());
|
||||
let seq = op.inner.seq as i64;
|
||||
let now = chrono::Utc::now().to_rfc3339();
|
||||
|
||||
let result = sqlx::query(
|
||||
"INSERT INTO crdt_ops (op_id, seq, op_json, created_at) \
|
||||
VALUES (?1, ?2, ?3, ?4) \
|
||||
ON CONFLICT(op_id) DO NOTHING",
|
||||
)
|
||||
.bind(&op_id)
|
||||
.bind(seq)
|
||||
.bind(&op_json)
|
||||
.bind(&now)
|
||||
.execute(&pool)
|
||||
.await;
|
||||
|
||||
if let Err(e) = result {
|
||||
slog!("[crdt] Failed to persist op {}: {e}", &op_id[..12]);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let state = CrdtState {
|
||||
crdt,
|
||||
keypair,
|
||||
index,
|
||||
node_index,
|
||||
token_index,
|
||||
merge_job_index,
|
||||
active_agent_index,
|
||||
test_job_index,
|
||||
agent_throttle_index,
|
||||
gateway_project_index,
|
||||
persist_tx,
|
||||
lamport_floor,
|
||||
};
|
||||
|
||||
let _ = CRDT_STATE.set(Mutex::new(state));
|
||||
|
||||
// Initialise the CRDT event broadcast channel.
|
||||
let (event_tx, _) = broadcast::channel::<CrdtEvent>(256);
|
||||
let _ = CRDT_EVENT_TX.set(event_tx);
|
||||
|
||||
// Initialise the sync broadcast channel for outgoing ops.
|
||||
let (sync_tx, _) = broadcast::channel::<SignedOp>(1024);
|
||||
let _ = SYNC_TX.set(sync_tx);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Load or create the Ed25519 keypair used by this node.
|
||||
async fn load_or_create_keypair(pool: &SqlitePool) -> Result<Ed25519KeyPair, sqlx::Error> {
|
||||
let row: Option<(Vec<u8>,)> =
|
||||
sqlx::query_as("SELECT seed FROM crdt_node_identity WHERE id = 1")
|
||||
.fetch_optional(pool)
|
||||
.await?;
|
||||
|
||||
if let Some((seed,)) = row {
|
||||
// Reconstruct from stored seed. The seed is the 32-byte private key.
|
||||
if let Ok(kp) = Ed25519KeyPair::from_bytes(&seed) {
|
||||
return Ok(kp);
|
||||
}
|
||||
slog!("[crdt] Stored keypair invalid, regenerating");
|
||||
}
|
||||
|
||||
let kp = make_keypair();
|
||||
let seed = kp.as_bytes().to_vec();
|
||||
sqlx::query("INSERT INTO crdt_node_identity (id, seed) VALUES (1, ?1) ON CONFLICT(id) DO UPDATE SET seed = excluded.seed")
|
||||
.bind(&seed)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
|
||||
Ok(kp)
|
||||
}
|
||||
@@ -0,0 +1,145 @@
|
||||
//! Internal CRDT state struct, statics, initialisation, and central write primitive.
|
||||
//!
|
||||
//! This module is split into focused submodules:
|
||||
//! - [`statics`]: broadcast channels and op-tracking statics
|
||||
//! - [`indices`]: index-rebuild helpers for O(1) key lookup
|
||||
//! - [`init`]: async startup and keypair persistence
|
||||
//! - [`apply`]: write path (sign, apply, persist, broadcast)
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::{Mutex, OnceLock};
|
||||
|
||||
use bft_json_crdt::json_crdt::{BaseCrdt, SignedOp};
|
||||
use bft_json_crdt::keypair::make_keypair;
|
||||
use fastcrypto::ed25519::Ed25519KeyPair;
|
||||
use tokio::sync::{broadcast, mpsc};
|
||||
|
||||
use super::VectorClock;
|
||||
use super::types::{CrdtEvent, PipelineDoc};
|
||||
|
||||
mod apply;
|
||||
mod indices;
|
||||
mod init;
|
||||
mod statics;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
// ── Re-exports for crdt_state siblings ──────────────────────────────
|
||||
|
||||
pub use init::init;
|
||||
|
||||
pub(super) use apply::{apply_and_persist, emit_event};
|
||||
pub(super) use indices::{
|
||||
rebuild_active_agent_index, rebuild_agent_throttle_index, rebuild_gateway_project_index,
|
||||
rebuild_index, rebuild_merge_job_index, rebuild_node_index, rebuild_test_job_index,
|
||||
rebuild_token_index,
|
||||
};
|
||||
pub(crate) use statics::{ALL_OPS, VECTOR_CLOCK};
|
||||
pub(super) use statics::{SYNC_TX, track_op};
|
||||
|
||||
// ── CrdtState struct ─────────────────────────────────────────────────
|
||||
|
||||
/// Holds the core CRDT document, signing keypair, and all O(1) lookup indices.
|
||||
pub(super) struct CrdtState {
|
||||
pub(super) crdt: BaseCrdt<PipelineDoc>,
|
||||
pub(super) keypair: Ed25519KeyPair,
|
||||
/// Maps story_id → index in the items ListCrdt for O(1) lookup.
|
||||
pub(super) index: HashMap<String, usize>,
|
||||
/// Maps node_id (hex) → index in the nodes ListCrdt for O(1) lookup.
|
||||
pub(super) node_index: HashMap<String, usize>,
|
||||
/// Maps agent_id → index in the tokens ListCrdt for O(1) lookup.
|
||||
pub(super) token_index: HashMap<String, usize>,
|
||||
/// Maps story_id → index in the merge_jobs ListCrdt for O(1) lookup.
|
||||
pub(super) merge_job_index: HashMap<String, usize>,
|
||||
/// Maps agent_id → index in the active_agents ListCrdt for O(1) lookup.
|
||||
pub(super) active_agent_index: HashMap<String, usize>,
|
||||
/// Maps story_id → index in the test_jobs ListCrdt for O(1) lookup.
|
||||
pub(super) test_job_index: HashMap<String, usize>,
|
||||
/// Maps node_id → index in the agent_throttle ListCrdt for O(1) lookup.
|
||||
pub(super) agent_throttle_index: HashMap<String, usize>,
|
||||
/// Maps project name → index in the gateway_projects ListCrdt for O(1) lookup.
|
||||
pub(super) gateway_project_index: HashMap<String, usize>,
|
||||
/// Channel sender for fire-and-forget op persistence.
|
||||
pub(super) persist_tx: mpsc::UnboundedSender<SignedOp>,
|
||||
/// Max sequence number seen across all ops during init() replay.
|
||||
///
|
||||
/// Newly-created registers (post-init) must have their Lamport clock
|
||||
/// advanced to this floor so they don't re-emit low sequence numbers.
|
||||
pub(super) lamport_floor: u64,
|
||||
}
|
||||
|
||||
// ── Singleton and accessor ───────────────────────────────────────────
|
||||
|
||||
/// Process-wide singleton holding the initialised [`CrdtState`].
|
||||
pub(super) static CRDT_STATE: OnceLock<Mutex<CrdtState>> = OnceLock::new();
|
||||
|
||||
#[cfg(test)]
|
||||
thread_local! {
|
||||
static CRDT_STATE_TL: OnceLock<Mutex<CrdtState>> = const { OnceLock::new() };
|
||||
}
|
||||
|
||||
/// Returns a reference to the global [`CrdtState`] mutex, if initialised.
|
||||
#[cfg(not(test))]
|
||||
pub(super) fn get_crdt() -> Option<&'static Mutex<CrdtState>> {
|
||||
CRDT_STATE.get()
|
||||
}
|
||||
|
||||
/// Returns the thread-local [`CrdtState`] if set, otherwise the global one (test variant).
|
||||
#[cfg(test)]
|
||||
pub(super) fn get_crdt() -> Option<&'static Mutex<CrdtState>> {
|
||||
let tl = CRDT_STATE_TL.with(|lock| {
|
||||
if lock.get().is_some() {
|
||||
Some(lock as *const OnceLock<Mutex<CrdtState>>)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
});
|
||||
if let Some(ptr) = tl {
|
||||
// SAFETY: The thread-local lives as long as the thread, which outlives
|
||||
// any test using it. We only need 'static for the return type.
|
||||
let lock = unsafe { &*ptr };
|
||||
lock.get()
|
||||
} else {
|
||||
CRDT_STATE.get()
|
||||
}
|
||||
}
|
||||
|
||||
/// Initialise a minimal in-memory CRDT state for unit tests.
|
||||
///
|
||||
/// This avoids the async SQLite setup from `init()`. Ops are accepted via a
|
||||
/// channel whose receiver is immediately dropped, so nothing is persisted.
|
||||
/// Safe to call multiple times — subsequent calls are no-ops (OnceLock).
|
||||
#[cfg(test)]
|
||||
pub fn init_for_test() {
|
||||
// Initialise thread-local CRDT for test isolation.
|
||||
// Only creates a new CRDT if one isn't set yet on this thread;
|
||||
// subsequent calls are no-ops (matching the old OnceLock semantics
|
||||
// while keeping each thread isolated).
|
||||
CRDT_STATE_TL.with(|lock| {
|
||||
if lock.get().is_none() {
|
||||
let keypair = make_keypair();
|
||||
let crdt = BaseCrdt::<PipelineDoc>::new(&keypair);
|
||||
let (persist_tx, _rx) = mpsc::unbounded_channel();
|
||||
let state = CrdtState {
|
||||
crdt,
|
||||
keypair,
|
||||
index: HashMap::new(),
|
||||
node_index: HashMap::new(),
|
||||
token_index: HashMap::new(),
|
||||
merge_job_index: HashMap::new(),
|
||||
active_agent_index: HashMap::new(),
|
||||
test_job_index: HashMap::new(),
|
||||
agent_throttle_index: HashMap::new(),
|
||||
gateway_project_index: HashMap::new(),
|
||||
persist_tx,
|
||||
lamport_floor: 0,
|
||||
};
|
||||
let _ = lock.set(Mutex::new(state));
|
||||
}
|
||||
});
|
||||
let _ = statics::CRDT_EVENT_TX.get_or_init(|| broadcast::channel::<CrdtEvent>(256).0);
|
||||
let _ = statics::SYNC_TX.get_or_init(|| broadcast::channel::<SignedOp>(1024).0);
|
||||
let _ = statics::ALL_OPS.get_or_init(|| Mutex::new(Vec::new()));
|
||||
let _ = statics::VECTOR_CLOCK.get_or_init(|| Mutex::new(VectorClock::new()));
|
||||
}
|
||||
@@ -0,0 +1,50 @@
|
||||
//! Broadcast channels and op-tracking statics for the CRDT state layer.
|
||||
//!
|
||||
//! Provides the outbound sync channel ([`SYNC_TX`]), the event broadcast
|
||||
//! channel ([`CRDT_EVENT_TX`]), and the in-memory op journal
|
||||
//! ([`ALL_OPS`] / [`VECTOR_CLOCK`]) that tracks every applied op for
|
||||
//! delta-sync.
|
||||
|
||||
use std::sync::{Mutex, OnceLock};
|
||||
|
||||
use bft_json_crdt::json_crdt::SignedOp;
|
||||
use tokio::sync::broadcast;
|
||||
|
||||
use super::super::VectorClock;
|
||||
use super::super::hex;
|
||||
use super::super::types::CrdtEvent;
|
||||
|
||||
/// Broadcast channel for CRDT events (stage transitions, etc.).
|
||||
pub(super) static CRDT_EVENT_TX: OnceLock<broadcast::Sender<CrdtEvent>> = OnceLock::new();
|
||||
|
||||
/// Broadcast channel for outbound ops to sync peers.
|
||||
pub(in crate::crdt_state) static SYNC_TX: OnceLock<broadcast::Sender<SignedOp>> = OnceLock::new();
|
||||
|
||||
/// All persisted ops as JSON strings, in causal (insertion) order.
|
||||
///
|
||||
/// Pub(crate) so that `crdt_snapshot` can access it for compaction.
|
||||
pub(crate) static ALL_OPS: OnceLock<Mutex<Vec<String>>> = OnceLock::new();
|
||||
|
||||
/// Live vector clock tracking op counts per author.
|
||||
///
|
||||
/// Updated in lockstep with `ALL_OPS` — every time an op is appended to the
|
||||
/// journal, the corresponding author's count is incremented here. This avoids
|
||||
/// re-parsing all ops when a peer requests `our_vector_clock()`.
|
||||
pub(crate) static VECTOR_CLOCK: OnceLock<Mutex<VectorClock>> = OnceLock::new();
|
||||
|
||||
/// Append an op's JSON to `ALL_OPS` and bump the author's count in `VECTOR_CLOCK`.
|
||||
///
|
||||
/// Centralises the bookkeeping that must stay in sync between the two statics.
|
||||
pub(in crate::crdt_state) fn track_op(signed: &SignedOp, json: String) {
|
||||
if let Some(all) = ALL_OPS.get()
|
||||
&& let Ok(mut v) = all.lock()
|
||||
{
|
||||
v.push(json);
|
||||
}
|
||||
if let Some(vc) = VECTOR_CLOCK.get()
|
||||
&& let Ok(mut clock) = vc.lock()
|
||||
{
|
||||
let author_hex = hex::encode(&signed.author());
|
||||
*clock.entry(author_hex).or_insert(0) += 1;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,424 @@
|
||||
//! Unit and integration tests for the core CRDT state module.
|
||||
//!
|
||||
//! Covers op replay, index rebuild, round-trip persistence, the
|
||||
//! Lamport-floor invariant, and the persist-channel error path.
|
||||
|
||||
use super::super::hex;
|
||||
use super::super::read::extract_item_view;
|
||||
use super::super::types::PipelineDoc;
|
||||
use super::*;
|
||||
use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, JsonValue, SignedOp};
|
||||
use bft_json_crdt::keypair::make_keypair;
|
||||
use bft_json_crdt::op::ROOT_ID;
|
||||
use serde_json::json;
|
||||
use sqlx::SqlitePool;
|
||||
use sqlx::sqlite::SqliteConnectOptions;
|
||||
use std::collections::HashMap;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
#[test]
|
||||
fn crdt_ops_replay_reconstructs_state() {
|
||||
let kp = make_keypair();
|
||||
let mut crdt1 = BaseCrdt::<PipelineDoc>::new(&kp);
|
||||
|
||||
// Build state with a series of ops.
|
||||
let item_json: JsonValue = json!({
|
||||
"story_id": "30_story_replay",
|
||||
"stage": "1_backlog",
|
||||
"name": "Replay Test",
|
||||
"agent": "",
|
||||
"retry_count": 0.0,
|
||||
"blocked": false,
|
||||
"depends_on": "",
|
||||
"claimed_by": "",
|
||||
"claimed_at": 0.0,
|
||||
})
|
||||
.into();
|
||||
|
||||
let op1 = crdt1.doc.items.insert(ROOT_ID, item_json).sign(&kp);
|
||||
crdt1.apply(op1.clone());
|
||||
|
||||
let op2 = crdt1.doc.items[0]
|
||||
.stage
|
||||
.set("2_current".to_string())
|
||||
.sign(&kp);
|
||||
crdt1.apply(op2.clone());
|
||||
|
||||
let op3 = crdt1.doc.items[0]
|
||||
.name
|
||||
.set("Updated Name".to_string())
|
||||
.sign(&kp);
|
||||
crdt1.apply(op3.clone());
|
||||
|
||||
// Replay ops on a fresh CRDT.
|
||||
let mut crdt2 = BaseCrdt::<PipelineDoc>::new(&kp);
|
||||
crdt2.apply(op1);
|
||||
crdt2.apply(op2);
|
||||
crdt2.apply(op3);
|
||||
|
||||
assert_eq!(
|
||||
crdt1.doc.items[0].stage.view(),
|
||||
crdt2.doc.items[0].stage.view()
|
||||
);
|
||||
assert_eq!(
|
||||
crdt1.doc.items[0].name.view(),
|
||||
crdt2.doc.items[0].name.view()
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rebuild_index_maps_story_ids() {
|
||||
let kp = make_keypair();
|
||||
let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);
|
||||
|
||||
for (sid, stage) in &[("10_story_a", "1_backlog"), ("20_story_b", "2_current")] {
|
||||
let item: JsonValue = json!({
|
||||
"story_id": sid,
|
||||
"stage": stage,
|
||||
"name": "",
|
||||
"agent": "",
|
||||
"retry_count": 0.0,
|
||||
"blocked": false,
|
||||
"depends_on": "",
|
||||
"claimed_by": "",
|
||||
"claimed_at": 0.0,
|
||||
})
|
||||
.into();
|
||||
let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp);
|
||||
crdt.apply(op);
|
||||
}
|
||||
|
||||
let index = rebuild_index(&crdt);
|
||||
assert_eq!(index.len(), 2);
|
||||
assert!(index.contains_key("10_story_a"));
|
||||
assert!(index.contains_key("20_story_b"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn init_and_write_read_roundtrip() {
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
let db_path = tmp.path().join("crdt_test.db");
|
||||
|
||||
// Init directly (not via the global singleton, for test isolation).
|
||||
let options = SqliteConnectOptions::new()
|
||||
.filename(&db_path)
|
||||
.create_if_missing(true);
|
||||
let pool = SqlitePool::connect_with(options).await.unwrap();
|
||||
sqlx::migrate!("./migrations").run(&pool).await.unwrap();
|
||||
|
||||
let keypair = make_keypair();
|
||||
let mut crdt = BaseCrdt::<PipelineDoc>::new(&keypair);
|
||||
|
||||
// Insert and update like write_item does.
|
||||
let item_json: JsonValue = json!({
|
||||
"story_id": "50_story_roundtrip",
|
||||
"stage": "1_backlog",
|
||||
"name": "Roundtrip",
|
||||
"agent": "",
|
||||
"retry_count": 0.0,
|
||||
"blocked": false,
|
||||
"depends_on": "",
|
||||
"claimed_by": "",
|
||||
"claimed_at": 0.0,
|
||||
})
|
||||
.into();
|
||||
|
||||
let insert_op = crdt.doc.items.insert(ROOT_ID, item_json).sign(&keypair);
|
||||
crdt.apply(insert_op.clone());
|
||||
|
||||
// Persist the op.
|
||||
let op_json = serde_json::to_string(&insert_op).unwrap();
|
||||
let op_id = hex::encode(&insert_op.id());
|
||||
let now = chrono::Utc::now().to_rfc3339();
|
||||
sqlx::query("INSERT INTO crdt_ops (op_id, seq, op_json, created_at) VALUES (?1, ?2, ?3, ?4)")
|
||||
.bind(&op_id)
|
||||
.bind(insert_op.inner.seq as i64)
|
||||
.bind(&op_json)
|
||||
.bind(&now)
|
||||
.execute(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Reconstruct from DB.
|
||||
let rows: Vec<(String,)> = sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY rowid ASC")
|
||||
.fetch_all(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut crdt2 = BaseCrdt::<PipelineDoc>::new(&keypair);
|
||||
for (json_str,) in &rows {
|
||||
let op: SignedOp = serde_json::from_str(json_str).unwrap();
|
||||
crdt2.apply(op);
|
||||
}
|
||||
|
||||
let view = extract_item_view(&crdt2.doc.items[0]).unwrap();
|
||||
assert_eq!(view.story_id, "50_story_roundtrip");
|
||||
assert_eq!(view.stage, "1_backlog");
|
||||
assert_eq!(view.name.as_deref(), Some("Roundtrip"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn persist_tx_send_failure_logs_warn_with_op_type_and_seq() {
|
||||
let kp = make_keypair();
|
||||
let crdt = BaseCrdt::<PipelineDoc>::new(&kp);
|
||||
let (persist_tx, persist_rx) = mpsc::unbounded_channel::<SignedOp>();
|
||||
|
||||
let mut state = CrdtState {
|
||||
crdt,
|
||||
keypair: kp,
|
||||
index: HashMap::new(),
|
||||
node_index: HashMap::new(),
|
||||
token_index: HashMap::new(),
|
||||
merge_job_index: HashMap::new(),
|
||||
active_agent_index: HashMap::new(),
|
||||
test_job_index: HashMap::new(),
|
||||
agent_throttle_index: HashMap::new(),
|
||||
gateway_project_index: HashMap::new(),
|
||||
persist_tx,
|
||||
lamport_floor: 0,
|
||||
};
|
||||
|
||||
// Drop the receiver so that the next send fails immediately.
|
||||
drop(persist_rx);
|
||||
|
||||
let item_json: JsonValue = json!({
|
||||
"story_id": "676_story_persist_fail",
|
||||
"stage": "1_backlog",
|
||||
"name": "Persist Fail Test",
|
||||
"agent": "",
|
||||
"retry_count": 0.0,
|
||||
"blocked": false,
|
||||
"depends_on": "",
|
||||
"claimed_by": "",
|
||||
"claimed_at": 0.0,
|
||||
})
|
||||
.into();
|
||||
|
||||
let before_warns = crate::log_buffer::global()
|
||||
.get_recent_entries(1000, None, Some(&crate::log_buffer::LogLevel::Warn))
|
||||
.len();
|
||||
|
||||
apply_and_persist(&mut state, |s| s.crdt.doc.items.insert(ROOT_ID, item_json));
|
||||
|
||||
let warn_entries = crate::log_buffer::global().get_recent_entries(
|
||||
1000,
|
||||
None,
|
||||
Some(&crate::log_buffer::LogLevel::Warn),
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
warn_entries.len(),
|
||||
before_warns + 1,
|
||||
"expected exactly one WARN log entry when persist_tx send fails"
|
||||
);
|
||||
|
||||
let warn = &warn_entries[warn_entries.len() - 1];
|
||||
assert!(
|
||||
warn.message.contains("[crdt_persist]"),
|
||||
"WARN message must be prefixed [crdt_persist]: {}",
|
||||
warn.message
|
||||
);
|
||||
assert!(
|
||||
warn.message.contains("op_type="),
|
||||
"WARN message must include op_type: {}",
|
||||
warn.message
|
||||
);
|
||||
assert!(
|
||||
warn.message.contains("seq="),
|
||||
"WARN message must include seq: {}",
|
||||
warn.message
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn persist_tx_send_success_emits_no_warn() {
|
||||
let kp = make_keypair();
|
||||
let crdt = BaseCrdt::<PipelineDoc>::new(&kp);
|
||||
let (persist_tx, _persist_rx) = mpsc::unbounded_channel::<SignedOp>();
|
||||
|
||||
let mut state = CrdtState {
|
||||
crdt,
|
||||
keypair: kp,
|
||||
index: HashMap::new(),
|
||||
node_index: HashMap::new(),
|
||||
token_index: HashMap::new(),
|
||||
merge_job_index: HashMap::new(),
|
||||
active_agent_index: HashMap::new(),
|
||||
test_job_index: HashMap::new(),
|
||||
agent_throttle_index: HashMap::new(),
|
||||
gateway_project_index: HashMap::new(),
|
||||
persist_tx,
|
||||
lamport_floor: 0,
|
||||
};
|
||||
|
||||
let item_json: JsonValue = json!({
|
||||
"story_id": "676_story_happy_path",
|
||||
"stage": "1_backlog",
|
||||
"name": "Happy Path Test",
|
||||
"agent": "",
|
||||
"retry_count": 0.0,
|
||||
"blocked": false,
|
||||
"depends_on": "",
|
||||
"claimed_by": "",
|
||||
"claimed_at": 0.0,
|
||||
})
|
||||
.into();
|
||||
|
||||
let before_warns = crate::log_buffer::global()
|
||||
.get_recent_entries(
|
||||
1000,
|
||||
Some("[crdt_persist]"),
|
||||
Some(&crate::log_buffer::LogLevel::Warn),
|
||||
)
|
||||
.len();
|
||||
|
||||
apply_and_persist(&mut state, |s| s.crdt.doc.items.insert(ROOT_ID, item_json));
|
||||
|
||||
let after_warns = crate::log_buffer::global()
|
||||
.get_recent_entries(
|
||||
1000,
|
||||
Some("[crdt_persist]"),
|
||||
Some(&crate::log_buffer::LogLevel::Warn),
|
||||
)
|
||||
.len();
|
||||
|
||||
assert_eq!(
|
||||
after_warns, before_warns,
|
||||
"no [crdt_persist] WARN should be emitted when persist_tx send succeeds"
|
||||
);
|
||||
}
|
||||
|
||||
/// After replaying ops from a journal, a brand-new register created
|
||||
/// post-init must emit its first local op with seq = lamport_floor + 1,
|
||||
/// not seq = 1. This is the Phase C integration test.
|
||||
#[tokio::test]
|
||||
async fn restart_new_register_resumes_from_lamport_floor() {
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
let db_path = tmp.path().join("lamport_floor.db");
|
||||
|
||||
let options = SqliteConnectOptions::new()
|
||||
.filename(&db_path)
|
||||
.create_if_missing(true);
|
||||
let pool = SqlitePool::connect_with(options).await.unwrap();
|
||||
sqlx::migrate!("./migrations").run(&pool).await.unwrap();
|
||||
|
||||
let kp = make_keypair();
|
||||
let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);
|
||||
|
||||
// Insert an item and update its stage a few times to push seq up.
|
||||
let item: JsonValue = json!({
|
||||
"story_id": "664_story_original",
|
||||
"stage": "1_backlog",
|
||||
"name": "Original",
|
||||
"agent": "",
|
||||
"retry_count": 0.0,
|
||||
"blocked": false,
|
||||
"depends_on": "",
|
||||
"claimed_by": "",
|
||||
"claimed_at": 0.0,
|
||||
"merged_at": 0.0,
|
||||
})
|
||||
.into();
|
||||
|
||||
let mut ops = Vec::new();
|
||||
|
||||
let op1 = crdt.doc.items.insert(ROOT_ID, item).sign(&kp);
|
||||
crdt.apply(op1.clone());
|
||||
ops.push(op1);
|
||||
|
||||
let idx = rebuild_index(&crdt)["664_story_original"];
|
||||
let op2 = crdt.doc.items[idx]
|
||||
.stage
|
||||
.set("2_current".to_string())
|
||||
.sign(&kp);
|
||||
crdt.apply(op2.clone());
|
||||
ops.push(op2);
|
||||
|
||||
let op3 = crdt.doc.items[idx]
|
||||
.stage
|
||||
.set("3_review".to_string())
|
||||
.sign(&kp);
|
||||
crdt.apply(op3.clone());
|
||||
ops.push(op3);
|
||||
|
||||
// Record the max seq across all persisted ops — this is the floor.
|
||||
let max_seq = ops.iter().map(|o| o.inner.seq).max().unwrap();
|
||||
|
||||
// Persist all ops.
|
||||
let now = chrono::Utc::now().to_rfc3339();
|
||||
for op in &ops {
|
||||
let op_json = serde_json::to_string(op).unwrap();
|
||||
let op_id = hex::encode(&op.id());
|
||||
sqlx::query(
|
||||
"INSERT INTO crdt_ops (op_id, seq, op_json, created_at) VALUES (?1, ?2, ?3, ?4)",
|
||||
)
|
||||
.bind(&op_id)
|
||||
.bind(op.inner.seq as i64)
|
||||
.bind(&op_json)
|
||||
.bind(&now)
|
||||
.execute(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
// --- Simulate restart: replay from journal into a fresh CRDT ---
|
||||
let rows: Vec<(String,)> = sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY rowid ASC")
|
||||
.fetch_all(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut crdt2 = BaseCrdt::<PipelineDoc>::new(&kp);
|
||||
let mut lamport_floor: u64 = 0;
|
||||
for (json_str,) in &rows {
|
||||
let signed: SignedOp = serde_json::from_str(json_str).unwrap();
|
||||
lamport_floor = lamport_floor.max(signed.inner.seq);
|
||||
crdt2.apply(signed);
|
||||
}
|
||||
|
||||
// Advance top-level lists (mirrors what init() does).
|
||||
crdt2.doc.items.advance_seq(lamport_floor);
|
||||
crdt2.doc.nodes.advance_seq(lamport_floor);
|
||||
|
||||
assert_eq!(lamport_floor, max_seq);
|
||||
|
||||
// Insert a brand-new item — simulating a new story arriving after restart.
|
||||
let new_item: JsonValue = json!({
|
||||
"story_id": "664_story_new_after_restart",
|
||||
"stage": "1_backlog",
|
||||
"name": "New After Restart",
|
||||
"agent": "",
|
||||
"retry_count": 0.0,
|
||||
"blocked": false,
|
||||
"depends_on": "",
|
||||
"claimed_by": "",
|
||||
"claimed_at": 0.0,
|
||||
"merged_at": 0.0,
|
||||
})
|
||||
.into();
|
||||
|
||||
let insert_op = crdt2.doc.items.insert(ROOT_ID, new_item);
|
||||
// The list-level insert must have seq > lamport_floor.
|
||||
assert!(
|
||||
insert_op.seq > max_seq,
|
||||
"list insert seq ({}) must be > lamport_floor ({})",
|
||||
insert_op.seq,
|
||||
max_seq,
|
||||
);
|
||||
let insert_signed = insert_op.sign(&kp);
|
||||
crdt2.apply(insert_signed);
|
||||
|
||||
// Advance the new item's inner registers to the floor (mirrors write_item).
|
||||
let idx2 = rebuild_index(&crdt2)["664_story_new_after_restart"];
|
||||
let new_crdt_item = &mut crdt2.doc.items[idx2];
|
||||
new_crdt_item.stage.advance_seq(lamport_floor);
|
||||
|
||||
// Now update the stage — the first field-level op must also be > floor.
|
||||
let stage_op = crdt2.doc.items[idx2].stage.set("2_current".to_string());
|
||||
assert!(
|
||||
stage_op.seq > max_seq,
|
||||
"first field op seq ({}) on new register must be > lamport_floor ({}); \
|
||||
got seq = 1 means the register reset its clock on restart",
|
||||
stage_op.seq,
|
||||
max_seq,
|
||||
);
|
||||
}
|
||||
Reference in New Issue
Block a user