refactor: split crdt_state.rs into 6 sub-modules with co-located tests
The 2122-line crdt_state.rs is split into a sub-module directory: - types.rs: CRDT/view types + CrdtEvent (247 lines) - state.rs: CrdtState struct, statics, init, apply_and_persist (531 lines) - ops.rs: sync API + apply_remote_op + delta-sync tests (455 lines) - write.rs: write_item + bug_511 test (273 lines) - read.rs: read API + dump + dep helpers (469 lines) - presence.rs: node identity + claim API + heartbeat (176 lines) - mod.rs: doc, sub-module decls, re-exports, hex helper (53 lines) Tests are co-located with the code they primarily exercise per Rust convention. No behaviour change. All 26 crdt_state tests pass; full suite green (2635 tests with --test-threads=1).
This commit is contained in:
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,53 @@
|
|||||||
|
//! CRDT state layer — manages pipeline state as a conflict-free replicated document backed by SQLite.
|
||||||
|
//!
|
||||||
|
//! The CRDT document is the primary source of truth for pipeline item
|
||||||
|
//! metadata (stage, name, agent, etc.). CRDT ops are persisted to SQLite so
|
||||||
|
//! state survives restarts. The filesystem `.huskies/work/` directories are
|
||||||
|
//! still updated as a secondary output for backwards compatibility.
|
||||||
|
//!
|
||||||
|
//! Stage transitions detected by `write_item()` are broadcast as [`CrdtEvent`]s
|
||||||
|
//! so subscribers (auto-assign, WebSocket, notifications) can react without
|
||||||
|
//! polling the filesystem.
|
||||||
|
|
||||||
|
use std::collections::HashMap;
|
||||||
|
|
||||||
|
/// A vector clock mapping node IDs (hex-encoded Ed25519 pubkeys) to the count
|
||||||
|
/// of ops seen from that node. Used for delta sync — a connecting peer sends
|
||||||
|
/// its clock so the other side can compute which ops are missing.
|
||||||
|
pub type VectorClock = HashMap<String, u64>;
|
||||||
|
|
||||||
|
mod ops;
|
||||||
|
mod presence;
|
||||||
|
mod read;
|
||||||
|
mod state;
|
||||||
|
mod types;
|
||||||
|
mod write;
|
||||||
|
|
||||||
|
pub use ops::{all_ops_json, apply_remote_op, ops_since, our_vector_clock, subscribe_ops};
|
||||||
|
pub use presence::{
|
||||||
|
is_claimed_by_us, our_node_id, read_all_node_presence, release_claim, sign_challenge,
|
||||||
|
write_claim, write_node_presence,
|
||||||
|
};
|
||||||
|
pub use read::{
|
||||||
|
CrdtItemDump, CrdtStateDump, check_archived_deps_crdt, check_unmet_deps_crdt,
|
||||||
|
dep_is_archived_crdt, dep_is_done_crdt, dump_crdt_state, evict_item, read_all_items,
|
||||||
|
read_item,
|
||||||
|
};
|
||||||
|
pub use state::init;
|
||||||
|
pub use types::{
|
||||||
|
CrdtEvent, NodePresenceCrdt, NodePresenceView, PipelineDoc, PipelineItemCrdt,
|
||||||
|
PipelineItemView, subscribe,
|
||||||
|
};
|
||||||
|
pub use write::write_item;
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
pub use state::init_for_test;
|
||||||
|
|
||||||
|
pub(crate) use state::{ALL_OPS, VECTOR_CLOCK};
|
||||||
|
|
||||||
|
/// Hex-encode a byte slice (no external dep needed).
pub(crate) mod hex {
    use std::fmt::Write;

    /// Lowercase hex encoding of `bytes` (two chars per byte).
    ///
    /// Pre-sizes the output and writes in place instead of allocating a
    /// temporary `String` per byte (the previous `format!().collect()` form).
    pub fn encode(bytes: &[u8]) -> String {
        let mut out = String::with_capacity(bytes.len() * 2);
        for b in bytes {
            // Writing into a String is infallible; ignore the fmt::Result.
            let _ = write!(out, "{b:02x}");
        }
        out
    }
}
|
||||||
@@ -0,0 +1,460 @@
|
|||||||
|
//! Public sync-broadcast API and remote-op ingestion.
|
||||||
|
|
||||||
|
use std::collections::HashMap;
|
||||||
|
|
||||||
|
use bft_json_crdt::json_crdt::*;
|
||||||
|
use bft_json_crdt::op::ROOT_ID;
|
||||||
|
use super::hex;
|
||||||
|
use tokio::sync::broadcast;
|
||||||
|
|
||||||
|
use super::VectorClock;
|
||||||
|
use super::state::{
|
||||||
|
ALL_OPS, SYNC_TX, VECTOR_CLOCK, apply_and_persist, emit_event, get_crdt, rebuild_index,
|
||||||
|
rebuild_node_index, track_op,
|
||||||
|
};
|
||||||
|
use super::types::{CrdtEvent, PipelineDoc};
|
||||||
|
use crate::slog;
|
||||||
|
|
||||||
|
/// Subscribe to locally-created CRDT ops for sync replication.
|
||||||
|
///
|
||||||
|
/// Each `SignedOp` broadcast here was created by *this* node and should be
|
||||||
|
/// forwarded to connected peers. Returns `None` before `init()`.
|
||||||
|
pub fn subscribe_ops() -> Option<broadcast::Receiver<SignedOp>> {
|
||||||
|
SYNC_TX.get().map(|tx| tx.subscribe())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Return all persisted `SignedOp`s in causal order (oldest first).
|
||||||
|
///
|
||||||
|
/// Used during initial sync handshake so a newly-connected peer can
|
||||||
|
/// reconstruct the full CRDT state. Returns `None` before `init()`.
|
||||||
|
pub fn all_ops_json() -> Option<Vec<String>> {
|
||||||
|
ALL_OPS.get().map(|m| m.lock().unwrap().clone())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Return this node's current vector clock.
|
||||||
|
///
|
||||||
|
/// The clock maps each author's hex-encoded Ed25519 public key to the count
|
||||||
|
/// of ops received from that author. A connecting peer sends its clock so
|
||||||
|
/// the other side can compute which ops are missing via [`ops_since`].
|
||||||
|
///
|
||||||
|
/// Returns `None` before `init()`.
|
||||||
|
pub fn our_vector_clock() -> Option<VectorClock> {
|
||||||
|
VECTOR_CLOCK.get().map(|m| m.lock().unwrap().clone())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Return only the ops that a peer with the given `peer_clock` is missing.
|
||||||
|
///
|
||||||
|
/// Iterates the local op journal and, for each author, skips the first N ops
|
||||||
|
/// (where N = `peer_clock[author]`) and returns the rest. An empty peer
|
||||||
|
/// clock returns all ops (full sync for new nodes).
|
||||||
|
///
|
||||||
|
/// Returns `None` before `init()`.
|
||||||
|
pub fn ops_since(peer_clock: &VectorClock) -> Option<Vec<String>> {
|
||||||
|
let all = ALL_OPS.get()?.lock().ok()?;
|
||||||
|
let mut author_counts: HashMap<String, u64> = HashMap::new();
|
||||||
|
let mut result = Vec::new();
|
||||||
|
|
||||||
|
for op_json in all.iter() {
|
||||||
|
if let Ok(signed_op) = serde_json::from_str::<SignedOp>(op_json) {
|
||||||
|
let author_hex = super::hex::encode(&signed_op.author());
|
||||||
|
let count = author_counts.entry(author_hex.clone()).or_insert(0);
|
||||||
|
*count += 1;
|
||||||
|
|
||||||
|
let peer_has = peer_clock.get(&author_hex).copied().unwrap_or(0);
|
||||||
|
if *count > peer_has {
|
||||||
|
result.push(op_json.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Some(result)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Remote op ingestion (from sync peers) ───────────────────────────
|
||||||
|
|
||||||
|
/// Apply a `SignedOp` received from a remote peer.
///
/// The op is validated, applied to the local CRDT, persisted to SQLite,
/// and any resulting stage transitions are broadcast as [`CrdtEvent`]s.
/// Unlike `apply_and_persist`, this does **not** re-broadcast the op on
/// the sync channel (to avoid infinite echo loops).
///
/// Returns `true` if the op was new and applied, `false` if it was a
/// duplicate or failed validation.
pub fn apply_remote_op(op: SignedOp) -> bool {
    // Nothing to apply to before init() (or if the state mutex is poisoned).
    let Some(state_mutex) = get_crdt() else {
        return false;
    };
    let Ok(mut state) = state_mutex.lock() else {
        return false;
    };

    // Snapshot stage state before applying so we can detect transitions.
    let pre_stages: HashMap<String, String> = state
        .index
        .iter()
        .filter_map(|(sid, &idx)| match state.crdt.doc.items[idx].stage.view() {
            JsonValue::String(s) => Some((sid.clone(), s)),
            _ => None,
        })
        .collect();

    let result = state.crdt.apply(op.clone());

    // Self-loop guard: op was already applied (came back via echo from peer).
    // Return false immediately — do not re-persist or re-add to ALL_OPS.
    if result == bft_json_crdt::json_crdt::OpState::AlreadySeen {
        return false;
    }

    // Accept `Ok` and `MissingCausalDependencies`; reject any other outcome
    // (e.g. failed validation) without persisting.
    if result != bft_json_crdt::json_crdt::OpState::Ok
        && result != bft_json_crdt::json_crdt::OpState::MissingCausalDependencies
    {
        return false;
    }

    // Persist the op. Failure is logged but non-fatal: the in-memory CRDT
    // already holds the op, so we keep going rather than diverge from peers.
    if let Err(e) = state.persist_tx.send(op.clone()) {
        crate::slog_error!(
            "[crdt] Failed to send remote op to persist task: {e}; persist task may be dead. \
             In-memory state is now ahead of persisted state."
        );
    }

    // Track in ALL_OPS + VECTOR_CLOCK.
    if let Ok(json) = serde_json::to_string(&op) {
        track_op(&op, json);
    }

    // Rebuild indices (new items or nodes may have been inserted).
    state.index = rebuild_index(&state.crdt);
    state.node_index = rebuild_node_index(&state.crdt);

    // Detect and broadcast stage transitions.
    for (sid, &idx) in &state.index {
        let new_stage = match state.crdt.doc.items[idx].stage.view() {
            JsonValue::String(s) => s,
            _ => continue,
        };
        let old_stage = pre_stages.get(sid).cloned();
        let changed = old_stage.as_deref() != Some(&new_stage);
        if changed {
            // Attach the item name (when non-empty) so subscribers can
            // render a human-readable event.
            let name = match state.crdt.doc.items[idx].name.view() {
                JsonValue::String(s) if !s.is_empty() => Some(s),
                _ => None,
            };
            emit_event(CrdtEvent {
                story_id: sid.clone(),
                from_stage: old_stage,
                to_stage: new_stage,
                name,
            });
        }
    }

    true
}
|
||||||
|
|
||||||
|
#[cfg(test)]
mod tests {
    // NOTE: dropped previously-unused imports (`OpState`, `NodePresenceCrdt`,
    // `PipelineItemCrdt`, `write_item`, `init_for_test`) — nothing in this
    // module references them, so they only produced `unused_imports` warnings.
    use super::*;
    use bft_json_crdt::keypair::make_keypair;
    use bft_json_crdt::op::ROOT_ID;
    use fastcrypto::ed25519::Ed25519KeyPair;
    use serde_json::json;

    #[test]
    fn signed_op_serialization_roundtrip() {
        let kp = make_keypair();
        let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);

        let item: JsonValue = json!({
            "story_id": "60_story_serde",
            "stage": "1_backlog",
            "name": "Serde Test",
            "agent": "",
            "retry_count": 0.0,
            "blocked": false,
            "depends_on": "",
            "claimed_by": "",
            "claimed_at": 0.0,
        })
        .into();

        let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp);
        let json_str = serde_json::to_string(&op).unwrap();
        let deserialized: SignedOp = serde_json::from_str(&json_str).unwrap();

        assert_eq!(op.id(), deserialized.id());
        assert_eq!(op.inner.seq, deserialized.inner.seq);
    }

    #[test]
    fn apply_remote_op_returns_false_when_not_initialised() {
        // Without the global CRDT state, apply_remote_op should return false.
        let kp = make_keypair();
        let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);
        let item: JsonValue = serde_json::json!({
            "story_id": "80_story_remote",
            "stage": "1_backlog",
            "name": "Remote",
            "agent": "",
            "retry_count": 0.0,
            "blocked": false,
            "depends_on": "",
            "claimed_by": "",
            "claimed_at": 0.0,
        })
        .into();
        let op = crdt
            .doc
            .items
            .insert(bft_json_crdt::op::ROOT_ID, item)
            .sign(&kp);
        // This uses the global state which may not be initialised in tests.
        let _ = apply_remote_op(op);
    }

    #[test]
    fn signed_op_survives_sync_serialization_roundtrip() {
        // Verify that a SignedOp serialised to JSON and back produces
        // the same op (critical for the sync wire protocol).
        let kp = make_keypair();
        let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);
        let item: JsonValue = serde_json::json!({
            "story_id": "90_story_wire",
            "stage": "2_current",
            "name": "Wire Test",
            "agent": "coder",
            "retry_count": 1.0,
            "blocked": false,
            "depends_on": "[10]",
            "claimed_by": "",
            "claimed_at": 0.0,
        })
        .into();
        let op = crdt
            .doc
            .items
            .insert(bft_json_crdt::op::ROOT_ID, item)
            .sign(&kp);

        let json1 = serde_json::to_string(&op).unwrap();
        let roundtripped: SignedOp = serde_json::from_str(&json1).unwrap();
        let json2 = serde_json::to_string(&roundtripped).unwrap();

        assert_eq!(json1, json2);
        assert_eq!(op.id(), roundtripped.id());
        assert_eq!(op.inner.seq, roundtripped.inner.seq);
        assert_eq!(op.author(), roundtripped.author());
    }

    #[test]
    fn sync_broadcast_channel_round_trip() {
        let (tx, mut rx) = broadcast::channel::<SignedOp>(16);
        let kp = make_keypair();
        let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);
        let item: JsonValue = serde_json::json!({
            "story_id": "95_story_sync_bcast",
            "stage": "1_backlog",
            "name": "",
            "agent": "",
            "retry_count": 0.0,
            "blocked": false,
            "depends_on": "",
            "claimed_by": "",
            "claimed_at": 0.0,
        })
        .into();
        let op = crdt
            .doc
            .items
            .insert(bft_json_crdt::op::ROOT_ID, item)
            .sign(&kp);
        tx.send(op.clone()).unwrap();

        let received = rx.try_recv().unwrap();
        assert_eq!(received.id(), op.id());
    }

    /// Build `count` signed insert ops (and their JSON encodings) authored
    /// by `kp`, applied to `crdt` so sequence numbers advance.
    fn make_ops(
        kp: &Ed25519KeyPair,
        crdt: &mut BaseCrdt<PipelineDoc>,
        count: usize,
        prefix: &str,
    ) -> Vec<(SignedOp, String)> {
        let mut ops = Vec::new();
        for i in 0..count {
            let item: JsonValue = json!({
                "story_id": format!("{prefix}_{i}"),
                "stage": "1_backlog",
                "name": format!("Item {i}"),
                "agent": "",
                "retry_count": 0.0,
                "blocked": false,
                "depends_on": "",
                "claimed_by": "",
                "claimed_at": 0.0,
            })
            .into();
            let op = crdt.doc.items.insert(ROOT_ID, item).sign(kp);
            crdt.apply(op.clone());
            let json = serde_json::to_string(&op).unwrap();
            ops.push((op, json));
        }
        ops
    }

    /// Derive a vector clock (author → op count) from a list of ops.
    fn build_clock(ops: &[(SignedOp, String)]) -> VectorClock {
        let mut clock = VectorClock::new();
        for (op, _) in ops {
            let author = hex::encode(&op.author());
            *clock.entry(author).or_insert(0) += 1;
        }
        clock
    }

    /// Pure-data mirror of `ops_since` used to test the delta-sync
    /// algorithm without the global state.
    fn local_ops_since(all_ops: &[(SignedOp, String)], peer_clock: &VectorClock) -> Vec<String> {
        let mut author_counts: HashMap<String, u64> = HashMap::new();
        let mut result = Vec::new();
        for (op, json) in all_ops {
            let author = hex::encode(&op.author());
            let count = author_counts.entry(author.clone()).or_insert(0);
            *count += 1;
            let peer_has = peer_clock.get(&author).copied().unwrap_or(0);
            if *count > peer_has {
                result.push(json.clone());
            }
        }
        result
    }

    #[test]
    fn delta_sync_low_bandwidth_fully_caught_up() {
        let kp_a = make_keypair();
        let mut crdt_a = BaseCrdt::<PipelineDoc>::new(&kp_a);

        let ops_a = make_ops(&kp_a, &mut crdt_a, 100, "631_low");

        // B has already seen all 100 ops (its clock matches A's journal).
        let clock_b = build_clock(&ops_a);

        // Delta should be empty.
        let delta = local_ops_since(&ops_a, &clock_b);
        assert_eq!(
            delta.len(),
            0,
            "caught-up peer should receive 0 ops, got {}",
            delta.len()
        );
    }

    #[test]
    fn delta_sync_mid_stream_partial_catch_up() {
        let kp_a = make_keypair();
        let mut crdt_a = BaseCrdt::<PipelineDoc>::new(&kp_a);

        // Phase 1: 100 ops that B has seen.
        let ops_phase1 = make_ops(&kp_a, &mut crdt_a, 100, "631_mid1");
        let clock_b = build_clock(&ops_phase1);

        // Phase 2: 50 more ops that B missed.
        let ops_phase2 = make_ops(&kp_a, &mut crdt_a, 50, "631_mid2");

        // A's full journal is phase1 + phase2.
        let mut all_ops_a: Vec<(SignedOp, String)> = ops_phase1;
        all_ops_a.extend(ops_phase2);

        let delta = local_ops_since(&all_ops_a, &clock_b);
        assert_eq!(
            delta.len(),
            50,
            "peer should receive exactly 50 missed ops, got {}",
            delta.len()
        );
    }

    #[test]
    fn delta_sync_new_node_receives_all_ops() {
        let kp_a = make_keypair();
        let mut crdt_a = BaseCrdt::<PipelineDoc>::new(&kp_a);

        let ops_phase1 = make_ops(&kp_a, &mut crdt_a, 100, "631_new1");
        let ops_phase2 = make_ops(&kp_a, &mut crdt_a, 50, "631_new2");

        let mut all_ops_a: Vec<(SignedOp, String)> = ops_phase1;
        all_ops_a.extend(ops_phase2);

        // Empty clock = new node.
        let empty_clock = VectorClock::new();
        let delta = local_ops_since(&all_ops_a, &empty_clock);
        assert_eq!(
            delta.len(),
            150,
            "new node should receive all 150 ops, got {}",
            delta.len()
        );
    }

    #[test]
    fn delta_sync_multi_author() {
        use fastcrypto::traits::KeyPair;

        let kp_a = make_keypair();
        let kp_b = make_keypair();
        let mut crdt_a = BaseCrdt::<PipelineDoc>::new(&kp_a);
        let mut crdt_b = BaseCrdt::<PipelineDoc>::new(&kp_b);

        let ops_a = make_ops(&kp_a, &mut crdt_a, 30, "631_ma_a");
        let ops_b = make_ops(&kp_b, &mut crdt_b, 20, "631_ma_b");

        // Combined journal on a hypothetical server.
        let mut all_ops: Vec<(SignedOp, String)> = ops_a.clone();
        all_ops.extend(ops_b);

        // Peer has seen all of A's ops but none of B's.
        let mut peer_clock = VectorClock::new();
        let author_a_hex = hex::encode(&kp_a.public().0.to_bytes());
        peer_clock.insert(author_a_hex, 30);

        let delta = local_ops_since(&all_ops, &peer_clock);
        assert_eq!(
            delta.len(),
            20,
            "peer should receive 20 ops from author B, got {}",
            delta.len()
        );
    }

    #[test]
    fn build_vector_clock_from_ops() {
        use fastcrypto::traits::KeyPair;

        let kp = make_keypair();
        let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);
        let ops = make_ops(&kp, &mut crdt, 10, "631_vc");

        let clock = build_clock(&ops);
        let author_hex = hex::encode(&kp.public().0.to_bytes());

        assert_eq!(clock.len(), 1, "single author should produce 1 clock entry");
        assert_eq!(clock[&author_hex], 10, "clock should show 10 ops");
    }

    #[test]
    fn clock_message_serialization_roundtrip() {
        let mut clock = VectorClock::new();
        clock.insert("aabbcc".to_string(), 42);
        clock.insert("ddeeff".to_string(), 7);

        let json = serde_json::to_value(&clock).unwrap();
        assert!(json.is_object());
        let deserialized: VectorClock = serde_json::from_value(json).unwrap();
        assert_eq!(deserialized["aabbcc"], 42);
        assert_eq!(deserialized["ddeeff"], 7);
    }
}
|
||||||
@@ -0,0 +1,179 @@
|
|||||||
|
//! Node identity, work claiming, and node presence (heartbeat) API.
|
||||||
|
|
||||||
|
use bft_json_crdt::json_crdt::*;
|
||||||
|
use bft_json_crdt::op::ROOT_ID;
|
||||||
|
use super::hex;
|
||||||
|
use super::read::read_item;
|
||||||
|
use bft_json_crdt::lww_crdt::LwwRegisterCrdt;
|
||||||
|
use fastcrypto::traits::{Signer, ToFromBytes};
|
||||||
|
use serde_json::json;
|
||||||
|
|
||||||
|
use super::state::{apply_and_persist, get_crdt, rebuild_node_index};
|
||||||
|
use super::types::{NodePresenceCrdt, NodePresenceView, PipelineDoc};
|
||||||
|
use crate::slog;
|
||||||
|
|
||||||
|
// ── Node presence API ────────────────────────────────────────────────
|
||||||
|
|
||||||
|
/// Return the hex-encoded Ed25519 public key for this node.
|
||||||
|
///
|
||||||
|
/// Used as the stable identity written into the CRDT nodes list.
|
||||||
|
/// Returns `None` before `init()`.
|
||||||
|
pub fn our_node_id() -> Option<String> {
|
||||||
|
let state = get_crdt()?.lock().ok()?;
|
||||||
|
Some(hex::encode(&state.crdt.id))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Sign a challenge nonce with this node's keypair for WebSocket mutual auth.
|
||||||
|
///
|
||||||
|
/// Returns `(pubkey_hex, signature_hex)` or `None` before `init()`.
|
||||||
|
pub fn sign_challenge(challenge: &str) -> Option<(String, String)> {
|
||||||
|
let state = get_crdt()?.lock().ok()?;
|
||||||
|
let pubkey_hex = crate::node_identity::public_key_hex(&state.keypair);
|
||||||
|
let sig_hex = crate::node_identity::sign_challenge(&state.keypair, challenge);
|
||||||
|
Some((pubkey_hex, sig_hex))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Write a claim on a pipeline item via CRDT.
///
/// Sets `claimed_by` to this node's ID and `claimed_at` to the current time.
/// The LWW register ensures deterministic conflict resolution — if two nodes
/// claim the same item simultaneously, both will converge to the same winner
/// after CRDT sync.
///
/// Returns `true` if the claim was written, `false` if the item doesn't exist
/// or CRDT is not initialised.
pub fn write_claim(story_id: &str) -> bool {
    // Resolve our node ID *before* taking the state lock: our_node_id()
    // locks the same state mutex internally, so calling it while holding
    // the lock would deadlock.
    let Some(node_id) = our_node_id() else {
        return false;
    };
    let now = chrono::Utc::now().timestamp() as f64;

    let Some(state_mutex) = get_crdt() else {
        return false;
    };
    let Ok(mut state) = state_mutex.lock() else {
        return false;
    };

    // Unknown story_id — nothing to claim.
    let Some(&idx) = state.index.get(story_id) else {
        return false;
    };

    // Two separate LWW ops (owner, then timestamp) so peers can merge
    // each field independently.
    apply_and_persist(&mut state, |s| {
        s.crdt.doc.items[idx].claimed_by.set(node_id.clone())
    });
    apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].claimed_at.set(now));

    true
}
|
||||||
|
|
||||||
|
/// Release a claim on a pipeline item (clear claimed_by and claimed_at).
|
||||||
|
pub fn release_claim(story_id: &str) {
|
||||||
|
let Some(state_mutex) = get_crdt() else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
let Ok(mut state) = state_mutex.lock() else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
let Some(&idx) = state.index.get(story_id) else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
apply_and_persist(&mut state, |s| {
|
||||||
|
s.crdt.doc.items[idx].claimed_by.set(String::new())
|
||||||
|
});
|
||||||
|
apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].claimed_at.set(0.0));
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Check if this node currently holds the claim on a pipeline item.
|
||||||
|
pub fn is_claimed_by_us(story_id: &str) -> bool {
|
||||||
|
let Some(node_id) = our_node_id() else {
|
||||||
|
return false;
|
||||||
|
};
|
||||||
|
let Some(item) = read_item(story_id) else {
|
||||||
|
return false;
|
||||||
|
};
|
||||||
|
item.claimed_by.as_deref() == Some(&node_id)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Write or update a node presence entry in the CRDT.
///
/// If a node with the given `node_id` already exists, only `last_seen`,
/// `alive`, and `address` are updated. If not, a new entry is inserted.
///
/// This is the write path for both local heartbeats and tombstoning.
pub fn write_node_presence(node_id: &str, address: &str, last_seen: f64, alive: bool) {
    // Best-effort: silently no-op before init() or on a poisoned lock.
    let Some(state_mutex) = get_crdt() else {
        return;
    };
    let Ok(mut state) = state_mutex.lock() else {
        return;
    };

    if let Some(&idx) = state.node_index.get(node_id) {
        // Update existing entry — three separate ops so peers can merge independently.
        apply_and_persist(&mut state, |s| {
            s.crdt.doc.nodes[idx].last_seen.set(last_seen)
        });
        apply_and_persist(&mut state, |s| s.crdt.doc.nodes[idx].alive.set(alive));
        apply_and_persist(&mut state, |s| {
            s.crdt.doc.nodes[idx].address.set(address.to_string())
        });
    } else {
        // Insert new node entry.
        let node_json: JsonValue = json!({
            "node_id": node_id,
            "address": address,
            "last_seen": last_seen,
            "alive": alive,
        })
        .into();

        apply_and_persist(&mut state, |s| s.crdt.doc.nodes.insert(ROOT_ID, node_json));

        // Rebuild node index after insertion so the next heartbeat for this
        // node takes the update branch instead of inserting again.
        state.node_index = rebuild_node_index(&state.crdt);
    }
}
|
||||||
|
|
||||||
|
/// Read all node presence entries from the CRDT document.
|
||||||
|
///
|
||||||
|
/// Returns `None` before `init()`.
|
||||||
|
pub fn read_all_node_presence() -> Option<Vec<NodePresenceView>> {
|
||||||
|
let state_mutex = get_crdt()?;
|
||||||
|
let state = state_mutex.lock().ok()?;
|
||||||
|
|
||||||
|
let mut nodes = Vec::new();
|
||||||
|
for node_crdt in state.crdt.doc.nodes.iter() {
|
||||||
|
if let Some(view) = extract_node_view(node_crdt) {
|
||||||
|
nodes.push(view);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Some(nodes)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Extract a `NodePresenceView` from a `NodePresenceCrdt`.
|
||||||
|
fn extract_node_view(node: &NodePresenceCrdt) -> Option<NodePresenceView> {
|
||||||
|
let node_id = match node.node_id.view() {
|
||||||
|
JsonValue::String(s) if !s.is_empty() => s,
|
||||||
|
_ => return None,
|
||||||
|
};
|
||||||
|
let address = match node.address.view() {
|
||||||
|
JsonValue::String(s) if !s.is_empty() => s,
|
||||||
|
_ => return None,
|
||||||
|
};
|
||||||
|
let last_seen = match node.last_seen.view() {
|
||||||
|
JsonValue::Number(n) => n,
|
||||||
|
_ => 0.0,
|
||||||
|
};
|
||||||
|
let alive = match node.alive.view() {
|
||||||
|
JsonValue::Bool(b) => b,
|
||||||
|
_ => true,
|
||||||
|
};
|
||||||
|
Some(NodePresenceView {
|
||||||
|
node_id,
|
||||||
|
address,
|
||||||
|
last_seen,
|
||||||
|
alive,
|
||||||
|
})
|
||||||
|
}
|
||||||
@@ -0,0 +1,472 @@
|
|||||||
|
//! Read API for pipeline items, dump introspection, and dependency helpers.
|
||||||
|
|
||||||
|
use std::collections::HashMap;
|
||||||
|
|
||||||
|
use bft_json_crdt::json_crdt::*;
|
||||||
|
|
||||||
|
use super::state::{ALL_OPS, apply_and_persist, get_crdt, rebuild_index};
|
||||||
|
use bft_json_crdt::op::ROOT_ID;
|
||||||
|
use super::types::{PipelineDoc, PipelineItemCrdt, PipelineItemView};
|
||||||
|
|
||||||
|
// ── Debug dump ───────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
/// A raw dump of a single CRDT list entry, including deleted items.
///
/// Use `content_index` (hex of the list insert `OpId`) to cross-reference
/// with rows in the `crdt_ops` SQLite table.
pub struct CrdtItemDump {
    /// Story identifier; `None` when unset or empty in the CRDT.
    pub story_id: Option<String>,
    /// Pipeline stage; `None` when unset or empty.
    pub stage: Option<String>,
    /// Human-readable item name; `None` when unset or empty.
    pub name: Option<String>,
    /// Assigned agent; `None` when unset or empty.
    pub agent: Option<String>,
    /// Retry counter; `None` when zero or unset.
    pub retry_count: Option<i64>,
    /// Blocked flag; `None` when the CRDT register holds no boolean.
    pub blocked: Option<bool>,
    /// Parsed dependency list; `None` when empty or unparseable.
    pub depends_on: Option<Vec<u32>>,
    /// Node ID holding the claim; `None` when unclaimed.
    pub claimed_by: Option<String>,
    /// Claim timestamp; `None` when zero or unset.
    pub claimed_at: Option<f64>,
    /// Hex-encoded OpId of the list insert op — cross-reference with `crdt_ops`.
    pub content_index: String,
    /// Whether this list entry has been tombstoned (deleted).
    pub is_deleted: bool,
}
|
||||||
|
|
||||||
|
/// Top-level debug dump of the in-memory CRDT state.
pub struct CrdtStateDump {
    /// Whether the global CRDT state has been initialised (`init()` ran).
    pub in_memory_state_loaded: bool,
    /// Count of non-deleted items with a valid story_id and stage.
    pub total_items: usize,
    /// Total list-level ops seen (excludes root sentinel).
    pub total_ops_in_list: usize,
    /// Highest Lamport sequence number seen across all list-level ops.
    pub max_seq_in_list: u64,
    /// Count of ops in the ALL_OPS journal (persisted ops replayed at startup).
    pub persisted_ops_count: usize,
    /// Per-entry raw dumps, including tombstoned entries.
    pub items: Vec<CrdtItemDump>,
}
|
||||||
|
|
||||||
|
/// Dump the raw in-memory CRDT state for debugging.
|
||||||
|
///
|
||||||
|
/// Unlike [`read_all_items`] this includes tombstoned (deleted) entries and
/// exposes internal op metadata (content_index, seq). Pass a `story_id`
/// filter to restrict the output to a single item.
///
/// **This is a debug tool.** For normal pipeline introspection use
/// [`read_all_items`] or the `get_pipeline_status` MCP tool instead.
pub fn dump_crdt_state(story_id_filter: Option<&str>) -> CrdtStateDump {
    let in_memory_state_loaded = get_crdt().is_some();

    // Ops in the persisted journal; 0 when the journal static is unset or
    // its lock is poisoned (this is a best-effort debug path).
    let persisted_ops_count = ALL_OPS
        .get()
        .and_then(|m| m.lock().ok().map(|v| v.len()))
        .unwrap_or(0);

    // CRDT layer not initialised: report what we can and bail.
    let Some(state_mutex) = get_crdt() else {
        return CrdtStateDump {
            in_memory_state_loaded,
            total_items: 0,
            total_ops_in_list: 0,
            max_seq_in_list: 0,
            persisted_ops_count,
            items: Vec::new(),
        };
    };

    // Poisoned lock: same degraded report rather than panicking in a debug tool.
    let Ok(state) = state_mutex.lock() else {
        return CrdtStateDump {
            in_memory_state_loaded,
            total_items: 0,
            total_ops_in_list: 0,
            max_seq_in_list: 0,
            persisted_ops_count,
            items: Vec::new(),
        };
    };

    // Count of items yielded by the list's iterator (visible entries —
    // deleted items are not re-counted after eviction; see evict_item).
    let total_items = state.crdt.doc.items.iter().count();

    let max_seq_in_list = state
        .crdt
        .doc
        .items
        .ops
        .iter()
        .map(|op| op.seq)
        .max()
        .unwrap_or(0);

    // Subtract 1 for the root sentinel.
    let total_ops_in_list = state.crdt.doc.items.ops.len().saturating_sub(1);

    // Walk the raw op list (not the visible iterator) so tombstoned entries
    // are included in the dump.
    let mut items = Vec::new();
    for op in &state.crdt.doc.items.ops {
        // Skip root sentinel (id == [0u8; 32]).
        if op.id == ROOT_ID {
            continue;
        }
        let Some(ref item_crdt) = op.content else {
            // No content — skip (orphaned slot, should not happen in normal use).
            continue;
        };

        // Empty string is the "unset" sentinel for string registers.
        let story_id = match item_crdt.story_id.view() {
            JsonValue::String(s) if !s.is_empty() => Some(s),
            _ => None,
        };

        // Apply story_id filter before doing any further work.
        if let Some(filter) = story_id_filter
            && story_id.as_deref() != Some(filter)
        {
            continue;
        }

        let stage = match item_crdt.stage.view() {
            JsonValue::String(s) if !s.is_empty() => Some(s),
            _ => None,
        };
        let name = match item_crdt.name.view() {
            JsonValue::String(s) if !s.is_empty() => Some(s),
            _ => None,
        };
        let agent = match item_crdt.agent.view() {
            JsonValue::String(s) if !s.is_empty() => Some(s),
            _ => None,
        };
        // Zero is the "unset" sentinel for numeric registers.
        let retry_count = match item_crdt.retry_count.view() {
            JsonValue::Number(n) if n > 0.0 => Some(n as i64),
            _ => None,
        };
        let blocked = match item_crdt.blocked.view() {
            JsonValue::Bool(b) => Some(b),
            _ => None,
        };
        // depends_on is stored as a JSON-encoded string, e.g. "[10,20]".
        let depends_on = match item_crdt.depends_on.view() {
            JsonValue::String(s) if !s.is_empty() => serde_json::from_str::<Vec<u32>>(&s).ok(),
            _ => None,
        };

        let claimed_by = match item_crdt.claimed_by.view() {
            JsonValue::String(s) if !s.is_empty() => Some(s),
            _ => None,
        };
        let claimed_at = match item_crdt.claimed_at.view() {
            JsonValue::Number(n) if n > 0.0 => Some(n),
            _ => None,
        };

        // Hex-encode the op id so callers can correlate entries with
        // persisted `crdt_ops` rows.
        let content_index = op.id.iter().map(|b| format!("{b:02x}")).collect::<String>();

        items.push(CrdtItemDump {
            story_id,
            stage,
            name,
            agent,
            retry_count,
            blocked,
            depends_on,
            claimed_by,
            claimed_at,
            content_index,
            is_deleted: op.is_deleted,
        });
    }

    CrdtStateDump {
        in_memory_state_loaded,
        total_items,
        total_ops_in_list,
        max_seq_in_list,
        persisted_ops_count,
        items,
    }
}
|
||||||
|
|
||||||
|
// ── Read path ────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
/// Read the full pipeline state from the CRDT document.
|
||||||
|
///
|
||||||
|
/// Returns items grouped by stage, or `None` if the CRDT layer is not
|
||||||
|
/// initialised.
|
||||||
|
pub fn read_all_items() -> Option<Vec<PipelineItemView>> {
|
||||||
|
let state_mutex = get_crdt()?;
|
||||||
|
let state = state_mutex.lock().ok()?;
|
||||||
|
|
||||||
|
// Only return items that appear in the deduplicated index.
|
||||||
|
// The index maps story_id → visible_index and represents the
|
||||||
|
// latest-wins view of each story. Iterating raw CRDT entries
|
||||||
|
// would return stale duplicates from earlier stage writes.
|
||||||
|
let mut items = Vec::with_capacity(state.index.len());
|
||||||
|
for &idx in state.index.values() {
|
||||||
|
if let Some(view) = extract_item_view(&state.crdt.doc.items[idx]) {
|
||||||
|
items.push(view);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Some(items)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Read a single pipeline item by story_id.
|
||||||
|
pub fn read_item(story_id: &str) -> Option<PipelineItemView> {
|
||||||
|
let state_mutex = get_crdt()?;
|
||||||
|
let state = state_mutex.lock().ok()?;
|
||||||
|
let &idx = state.index.get(story_id)?;
|
||||||
|
extract_item_view(&state.crdt.doc.items[idx])
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Mark a story as deleted in the in-memory CRDT and persist a tombstone op.
///
/// This is the eviction primitive for story 521 — it lets external callers
/// (e.g. the `purge_story` MCP tool, or operator scripts during incident
/// response) clear an item from the running server's in-memory state
/// without needing a full process restart.
///
/// Specifically:
/// 1. Looks up the item's CRDT `OpId` via the visible-index map.
/// 2. Constructs a delete op via the bft-json-crdt list `delete()` primitive.
/// 3. Signs it with the local node's keypair and applies it to the in-memory
///    CRDT (marking the item `is_deleted = true` so subsequent
///    `read_all_items` / `read_item` calls skip it).
/// 4. Persists the signed delete op to `crdt_ops` via the existing
///    `apply_and_persist` channel — so the eviction survives a restart.
/// 5. Rebuilds the `story_id → visible_index` map (visible indices shift
///    when an item is marked deleted).
/// 6. Drops the in-memory content-store entry for the story so the cached
///    body doesn't outlive the CRDT entry.
///
/// Returns `Ok(())` if the item was found and a tombstone op was queued,
/// or an `Err` if the CRDT layer isn't initialised or the story_id is
/// unknown to the in-memory state.
pub fn evict_item(story_id: &str) -> Result<(), String> {
    let state_mutex = get_crdt().ok_or_else(|| "CRDT layer not initialised".to_string())?;
    let mut state = state_mutex
        .lock()
        .map_err(|e| format!("CRDT lock poisoned: {e}"))?;

    // Visible index of the story; unknown story_id is a caller error.
    let idx = state
        .index
        .get(story_id)
        .copied()
        .ok_or_else(|| format!("Story '{story_id}' not found in in-memory CRDT"))?;

    // Resolve the item's OpId before the closure (the closure will mutably
    // borrow `state`, so we can't access `state.crdt.doc.items` from inside).
    let item_id =
        state.crdt.doc.items.id_at(idx).ok_or_else(|| {
            format!("Item index {idx} for '{story_id}' did not resolve to an OpId")
        })?;

    // Write the delete op via the existing apply_and_persist machinery.
    // This signs the op, applies it to the in-memory CRDT (marking the item
    // is_deleted), and sends it to the persistence task.
    apply_and_persist(&mut state, |s| s.crdt.doc.items.delete(item_id));

    // Rebuild the story_id → visible_index map; the deleted item is no
    // longer counted by the iter that rebuild_index uses.
    state.index = rebuild_index(&state.crdt);

    // Drop the content-store entry so the cached body doesn't outlive the
    // CRDT entry. (Bug 521 follow-up: when CONTENT_STORE becomes a true
    // lazy cache, this explicit eviction can go away.)
    crate::db::delete_content(story_id);

    Ok(())
}
|
||||||
|
|
||||||
|
/// Extract a `PipelineItemView` from a `PipelineItemCrdt`.
|
||||||
|
pub(super) fn extract_item_view(item: &PipelineItemCrdt) -> Option<PipelineItemView> {
|
||||||
|
let story_id = match item.story_id.view() {
|
||||||
|
JsonValue::String(s) if !s.is_empty() => s,
|
||||||
|
_ => return None,
|
||||||
|
};
|
||||||
|
let stage = match item.stage.view() {
|
||||||
|
JsonValue::String(s) if !s.is_empty() => s,
|
||||||
|
_ => return None,
|
||||||
|
};
|
||||||
|
let name = match item.name.view() {
|
||||||
|
JsonValue::String(s) if !s.is_empty() => Some(s),
|
||||||
|
_ => None,
|
||||||
|
};
|
||||||
|
let agent = match item.agent.view() {
|
||||||
|
JsonValue::String(s) if !s.is_empty() => Some(s),
|
||||||
|
_ => None,
|
||||||
|
};
|
||||||
|
let retry_count = match item.retry_count.view() {
|
||||||
|
JsonValue::Number(n) if n > 0.0 => Some(n as i64),
|
||||||
|
_ => None,
|
||||||
|
};
|
||||||
|
let blocked = match item.blocked.view() {
|
||||||
|
JsonValue::Bool(b) => Some(b),
|
||||||
|
_ => None,
|
||||||
|
};
|
||||||
|
let depends_on = match item.depends_on.view() {
|
||||||
|
JsonValue::String(s) if !s.is_empty() => serde_json::from_str::<Vec<u32>>(&s).ok(),
|
||||||
|
_ => None,
|
||||||
|
};
|
||||||
|
|
||||||
|
let claimed_by = match item.claimed_by.view() {
|
||||||
|
JsonValue::String(s) if !s.is_empty() => Some(s),
|
||||||
|
_ => None,
|
||||||
|
};
|
||||||
|
let claimed_at = match item.claimed_at.view() {
|
||||||
|
JsonValue::Number(n) if n > 0.0 => Some(n),
|
||||||
|
_ => None,
|
||||||
|
};
|
||||||
|
let merged_at = match item.merged_at.view() {
|
||||||
|
JsonValue::Number(n) if n > 0.0 => Some(n),
|
||||||
|
_ => None,
|
||||||
|
};
|
||||||
|
|
||||||
|
Some(PipelineItemView {
|
||||||
|
story_id,
|
||||||
|
stage,
|
||||||
|
name,
|
||||||
|
agent,
|
||||||
|
retry_count,
|
||||||
|
blocked,
|
||||||
|
depends_on,
|
||||||
|
claimed_by,
|
||||||
|
claimed_at,
|
||||||
|
merged_at,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Check whether a dependency (by numeric ID prefix) is in `5_done` or `6_archived`
|
||||||
|
/// according to CRDT state.
|
||||||
|
///
|
||||||
|
/// Returns `true` if the dependency is satisfied (item found in a done stage).
|
||||||
|
/// See `dep_is_archived_crdt` to distinguish archive-satisfied from cleanly-done.
|
||||||
|
pub fn dep_is_done_crdt(dep_number: u32) -> bool {
|
||||||
|
let prefix = format!("{dep_number}_");
|
||||||
|
if let Some(items) = read_all_items() {
|
||||||
|
items.iter().any(|item| {
|
||||||
|
item.story_id.starts_with(&prefix)
|
||||||
|
&& matches!(item.stage.as_str(), "5_done" | "6_archived")
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Check whether a dependency (by numeric ID prefix) is specifically in `6_archived`
|
||||||
|
/// according to CRDT state.
|
||||||
|
///
|
||||||
|
/// Used to detect when a dependency is satisfied via archive rather than via a clean
|
||||||
|
/// completion through `5_done`. Returns `false` when the CRDT layer is not initialised.
|
||||||
|
pub fn dep_is_archived_crdt(dep_number: u32) -> bool {
|
||||||
|
let prefix = format!("{dep_number}_");
|
||||||
|
if let Some(items) = read_all_items() {
|
||||||
|
items
|
||||||
|
.iter()
|
||||||
|
.any(|item| item.story_id.starts_with(&prefix) && item.stage == "6_archived")
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Check unmet dependencies for a story by reading its `depends_on` from the
|
||||||
|
/// CRDT document and checking each dependency against CRDT state.
|
||||||
|
///
|
||||||
|
/// Returns the list of dependency numbers that are NOT in `5_done` or `6_archived`.
|
||||||
|
pub fn check_unmet_deps_crdt(story_id: &str) -> Vec<u32> {
|
||||||
|
let item = match read_item(story_id) {
|
||||||
|
Some(i) => i,
|
||||||
|
None => return Vec::new(),
|
||||||
|
};
|
||||||
|
let deps = match item.depends_on {
|
||||||
|
Some(d) => d,
|
||||||
|
None => return Vec::new(),
|
||||||
|
};
|
||||||
|
deps.into_iter()
|
||||||
|
.filter(|&dep| !dep_is_done_crdt(dep))
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Return the list of dependency numbers from `story_id`'s `depends_on` that are
|
||||||
|
/// specifically in `6_archived` according to CRDT state.
|
||||||
|
///
|
||||||
|
/// Used to emit a warning when promotion fires because a dep is archived rather than
|
||||||
|
/// cleanly completed. Returns an empty `Vec` when no deps are archived.
|
||||||
|
pub fn check_archived_deps_crdt(story_id: &str) -> Vec<u32> {
|
||||||
|
let item = match read_item(story_id) {
|
||||||
|
Some(i) => i,
|
||||||
|
None => return Vec::new(),
|
||||||
|
};
|
||||||
|
let deps = match item.depends_on {
|
||||||
|
Some(d) => d,
|
||||||
|
None => return Vec::new(),
|
||||||
|
};
|
||||||
|
deps.into_iter()
|
||||||
|
.filter(|&dep| dep_is_archived_crdt(dep))
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
mod tests {
    use super::*;
    use super::super::state::init_for_test;
    use super::super::types::PipelineItemCrdt;
    use super::super::write::write_item;
    use bft_json_crdt::op::ROOT_ID;
    use super::super::state::rebuild_index;
    use bft_json_crdt::json_crdt::OpState;
    use bft_json_crdt::keypair::make_keypair;
    use serde_json::json;

    // NOTE(review): several imports above (init_for_test, write_item,
    // rebuild_index, PipelineItemCrdt, OpState) are not referenced by the
    // tests in this module — presumably leftovers from the file split;
    // confirm with `cargo check` warnings before removing.

    /// A fully-populated item should round-trip through `extract_item_view`,
    /// with "unset" sentinels (empty string, zero) mapped to `None`.
    #[test]
    fn extract_item_view_parses_crdt_item() {
        let kp = make_keypair();
        let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);

        let item_json: JsonValue = json!({
            "story_id": "40_story_view",
            "stage": "3_qa",
            "name": "View Test",
            "agent": "coder-1",
            "retry_count": 2.0,
            "blocked": true,
            "depends_on": "[10,20]",
            "claimed_by": "",
            "claimed_at": 0.0,
        })
        .into();

        let op = crdt.doc.items.insert(ROOT_ID, item_json).sign(&kp);
        crdt.apply(op);

        let view = extract_item_view(&crdt.doc.items[0]).unwrap();
        assert_eq!(view.story_id, "40_story_view");
        assert_eq!(view.stage, "3_qa");
        assert_eq!(view.name.as_deref(), Some("View Test"));
        assert_eq!(view.agent.as_deref(), Some("coder-1"));
        assert_eq!(view.retry_count, Some(2));
        assert_eq!(view.blocked, Some(true));
        assert_eq!(view.depends_on, Some(vec![10, 20]));
    }

    #[test]
    fn dep_is_done_crdt_returns_false_when_no_crdt_state() {
        // When the global CRDT state is not initialised (or in a test environment),
        // dep_is_done_crdt should return false rather than panicking.
        // Note: in the test binary the global may or may not be initialised,
        // but the function should never panic either way.
        let _ = dep_is_done_crdt(9999);
    }

    #[test]
    fn check_unmet_deps_crdt_returns_empty_when_item_not_found() {
        // Non-existent story should return empty deps.
        let result = check_unmet_deps_crdt("nonexistent_story");
        assert!(result.is_empty());
    }

    #[test]
    fn dep_is_archived_crdt_returns_false_when_no_crdt_state() {
        // When the global CRDT state is not initialised, must not panic.
        let _ = dep_is_archived_crdt(9998);
    }

    #[test]
    fn check_archived_deps_crdt_returns_empty_when_item_not_found() {
        // Non-existent story should return empty archived deps.
        let result = check_archived_deps_crdt("nonexistent_story_archived");
        assert!(result.is_empty());
    }
}
|
||||||
@@ -0,0 +1,535 @@
|
|||||||
|
//! Internal CRDT state struct, statics, initialisation, and central write primitive.
|
||||||
|
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::path::Path;
|
||||||
|
use std::sync::{Mutex, OnceLock};
|
||||||
|
|
||||||
|
use bft_json_crdt::json_crdt::*;
|
||||||
|
use bft_json_crdt::keypair::make_keypair;
|
||||||
|
use fastcrypto::ed25519::Ed25519KeyPair;
|
||||||
|
use fastcrypto::traits::ToFromBytes;
|
||||||
|
use serde_json::json;
|
||||||
|
use sqlx::SqlitePool;
|
||||||
|
use sqlx::sqlite::SqliteConnectOptions;
|
||||||
|
use tokio::sync::{broadcast, mpsc};
|
||||||
|
|
||||||
|
use super::VectorClock;
|
||||||
|
use super::hex;
|
||||||
|
use super::types::{CrdtEvent, PipelineDoc};
|
||||||
|
use crate::slog;
|
||||||
|
|
||||||
|
// ── Sync broadcast channels ──────────────────────────────────────────

/// Broadcast channel for [`CrdtEvent`]s; set once during `init` /
/// `init_for_test` and read by `emit_event` / subscribers.
pub(super) static CRDT_EVENT_TX: OnceLock<broadcast::Sender<CrdtEvent>> = OnceLock::new();

/// Broadcast channel carrying freshly signed ops out to sync peers
/// (sent from `apply_and_persist`).
pub(super) static SYNC_TX: OnceLock<broadcast::Sender<SignedOp>> = OnceLock::new();

/// All persisted ops as JSON strings, in causal (insertion) order.
///
/// Pub(crate) so that `crdt_snapshot` can access it for compaction.
pub(crate) static ALL_OPS: OnceLock<Mutex<Vec<String>>> = OnceLock::new();

/// Live vector clock tracking op counts per author.
///
/// Updated in lockstep with `ALL_OPS` — every time an op is appended to the
/// journal, the corresponding author's count is incremented here. This avoids
/// re-parsing all ops when a peer requests `our_vector_clock()`.
pub(crate) static VECTOR_CLOCK: OnceLock<Mutex<super::VectorClock>> = OnceLock::new();
|
||||||
|
|
||||||
|
/// Append an op's JSON to `ALL_OPS` and bump the author's count in `VECTOR_CLOCK`.
|
||||||
|
///
|
||||||
|
/// Centralises the bookkeeping that must stay in sync between the two statics.
|
||||||
|
pub(super) fn track_op(signed: &SignedOp, json: String) {
|
||||||
|
if let Some(all) = ALL_OPS.get()
|
||||||
|
&& let Ok(mut v) = all.lock()
|
||||||
|
{
|
||||||
|
v.push(json);
|
||||||
|
}
|
||||||
|
if let Some(vc) = VECTOR_CLOCK.get()
|
||||||
|
&& let Ok(mut clock) = vc.lock()
|
||||||
|
{
|
||||||
|
let author_hex = super::hex::encode(&signed.author());
|
||||||
|
*clock.entry(author_hex).or_insert(0) += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// In-memory CRDT state guarded by the global `CRDT_STATE` mutex.
///
/// Owns the replicated document, this node's signing keypair, and the
/// lookup indices that make item/node access O(1).
pub(super) struct CrdtState {
    // The replicated pipeline document (items and nodes lists).
    pub(super) crdt: BaseCrdt<PipelineDoc>,
    // This node's Ed25519 signing keypair (loaded/created by `init`).
    pub(super) keypair: Ed25519KeyPair,
    /// Maps story_id → index in the items ListCrdt for O(1) lookup.
    pub(super) index: HashMap<String, usize>,
    /// Maps node_id (hex) → index in the nodes ListCrdt for O(1) lookup.
    pub(super) node_index: HashMap<String, usize>,
    /// Channel sender for fire-and-forget op persistence.
    pub(super) persist_tx: mpsc::UnboundedSender<SignedOp>,
}
|
||||||
|
|
||||||
|
/// Process-wide CRDT state; set exactly once by `init`.
static CRDT_STATE: OnceLock<Mutex<CrdtState>> = OnceLock::new();

// Test builds use a per-thread state so `--test-threads` parallelism
// doesn't make tests share (and corrupt) one global CRDT.
#[cfg(test)]
thread_local! {
    static CRDT_STATE_TL: OnceLock<Mutex<CrdtState>> = const { OnceLock::new() };
}

/// Accessor for the global CRDT state (production build).
#[cfg(not(test))]
pub(super) fn get_crdt() -> Option<&'static Mutex<CrdtState>> {
    CRDT_STATE.get()
}

/// Accessor for CRDT state in test builds: prefers the thread-local state
/// (set by `init_for_test`), falling back to the process-wide global.
#[cfg(test)]
pub(super) fn get_crdt() -> Option<&'static Mutex<CrdtState>> {
    let tl = CRDT_STATE_TL.with(|lock| {
        if lock.get().is_some() {
            Some(lock as *const OnceLock<Mutex<CrdtState>>)
        } else {
            None
        }
    });
    if let Some(ptr) = tl {
        // SAFETY: The thread-local lives as long as the thread, which outlives
        // any test using it. We only need 'static for the return type.
        //
        // NOTE(review): the 'static claim is only sound while the returned
        // reference never outlives this thread (e.g. is never sent to
        // another thread or stored past thread teardown) — acceptable for
        // test-only use, but verify no test moves the reference across
        // threads.
        let lock = unsafe { &*ptr };
        lock.get()
    } else {
        CRDT_STATE.get()
    }
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/// Initialise the CRDT state layer.
///
/// Opens the SQLite database, loads or creates a node keypair, replays any
/// persisted ops to reconstruct state, and spawns a background persistence
/// task. Safe to call only once; subsequent calls are no-ops.
///
/// NOTE(review): the early-return guard below is check-then-act; two truly
/// concurrent first calls could both run the body (only one `CRDT_STATE.set`
/// wins). Presumably init() is called once from single-threaded startup —
/// confirm at the call site.
pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> {
    if CRDT_STATE.get().is_some() {
        return Ok(());
    }

    // Open (or create) the SQLite database and run migrations.
    let options = SqliteConnectOptions::new()
        .filename(db_path)
        .create_if_missing(true);
    let pool = SqlitePool::connect_with(options).await?;
    sqlx::migrate!("./migrations").run(&pool).await?;

    // Load or create the node keypair.
    let keypair = load_or_create_keypair(&pool).await?;
    let mut crdt = BaseCrdt::<PipelineDoc>::new(&keypair);

    // Replay persisted ops to reconstruct state.
    let rows: Vec<(String,)> = sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY rowid ASC")
        .fetch_all(&pool)
        .await?;

    // Build the op journal and vector clock in the same pass as the replay,
    // so ALL_OPS / VECTOR_CLOCK start in lockstep with the document.
    let mut all_ops_vec = Vec::with_capacity(rows.len());
    let mut vector_clock = VectorClock::new();
    for (op_json,) in &rows {
        if let Ok(signed_op) = serde_json::from_str::<SignedOp>(op_json) {
            let author_hex = hex::encode(&signed_op.author());
            *vector_clock.entry(author_hex).or_insert(0) += 1;
            crdt.apply(signed_op);
            all_ops_vec.push(op_json.clone());
        } else {
            // Undeserializable op: skipped (logged) rather than aborting init.
            slog!("[crdt] Warning: failed to deserialize stored op");
        }
    }
    let _ = ALL_OPS.set(Mutex::new(all_ops_vec));
    let _ = VECTOR_CLOCK.set(Mutex::new(vector_clock));

    // Build the indices from the reconstructed state.
    let index = rebuild_index(&crdt);
    let node_index = rebuild_node_index(&crdt);

    slog!(
        "[crdt] Initialised: {} ops replayed, {} items indexed, {} nodes indexed",
        rows.len(),
        index.len(),
        node_index.len()
    );

    // Spawn the background persistence task: ops are journalled to SQLite
    // fire-and-forget so writers never block on disk I/O.
    let (persist_tx, mut persist_rx) = mpsc::unbounded_channel::<SignedOp>();

    tokio::spawn(async move {
        while let Some(op) = persist_rx.recv().await {
            let op_json = match serde_json::to_string(&op) {
                Ok(j) => j,
                Err(e) => {
                    slog!("[crdt] Failed to serialize op: {e}");
                    continue;
                }
            };
            let op_id = hex::encode(&op.id());
            let seq = op.inner.seq as i64;
            let now = chrono::Utc::now().to_rfc3339();

            // ON CONFLICT DO NOTHING makes re-delivery of an
            // already-persisted op harmless.
            let result = sqlx::query(
                "INSERT INTO crdt_ops (op_id, seq, op_json, created_at) \
                 VALUES (?1, ?2, ?3, ?4) \
                 ON CONFLICT(op_id) DO NOTHING",
            )
            .bind(&op_id)
            .bind(seq)
            .bind(&op_json)
            .bind(&now)
            .execute(&pool)
            .await;

            if let Err(e) = result {
                slog!("[crdt] Failed to persist op {}: {e}", &op_id[..12]);
            }
        }
    });

    let state = CrdtState {
        crdt,
        keypair,
        index,
        node_index,
        persist_tx,
    };

    let _ = CRDT_STATE.set(Mutex::new(state));

    // Initialise the CRDT event broadcast channel.
    let (event_tx, _) = broadcast::channel::<CrdtEvent>(256);
    let _ = CRDT_EVENT_TX.set(event_tx);

    // Initialise the sync broadcast channel for outgoing ops.
    let (sync_tx, _) = broadcast::channel::<SignedOp>(1024);
    let _ = SYNC_TX.set(sync_tx);

    Ok(())
}
|
||||||
|
|
||||||
|
/// Initialise a minimal in-memory CRDT state for unit tests.
///
/// This avoids the async SQLite setup from `init()`. Ops are accepted via a
/// channel whose receiver is immediately dropped, so nothing is persisted.
/// Safe to call multiple times — subsequent calls are no-ops (OnceLock).
#[cfg(test)]
pub fn init_for_test() {
    // Initialise thread-local CRDT for test isolation.
    // Only creates a new CRDT if one isn't set yet on this thread;
    // subsequent calls are no-ops (matching the old OnceLock semantics
    // while keeping each thread isolated).
    CRDT_STATE_TL.with(|lock| {
        if lock.get().is_none() {
            let keypair = make_keypair();
            let crdt = BaseCrdt::<PipelineDoc>::new(&keypair);
            // Receiver dropped on purpose: test writes are never persisted.
            let (persist_tx, _rx) = mpsc::unbounded_channel();
            let state = CrdtState {
                crdt,
                keypair,
                index: HashMap::new(),
                node_index: HashMap::new(),
                persist_tx,
            };
            let _ = lock.set(Mutex::new(state));
        }
    });
    // Channels and journal statics are process-wide even in tests; create
    // them once and reuse across threads.
    let _ = CRDT_EVENT_TX.get_or_init(|| broadcast::channel::<CrdtEvent>(256).0);
    let _ = SYNC_TX.get_or_init(|| broadcast::channel::<SignedOp>(1024).0);
    let _ = ALL_OPS.get_or_init(|| Mutex::new(Vec::new()));
    let _ = VECTOR_CLOCK.get_or_init(|| Mutex::new(VectorClock::new()));
}
|
||||||
|
|
||||||
|
/// Load or create the Ed25519 keypair used by this node.
|
||||||
|
async fn load_or_create_keypair(pool: &SqlitePool) -> Result<Ed25519KeyPair, sqlx::Error> {
|
||||||
|
let row: Option<(Vec<u8>,)> =
|
||||||
|
sqlx::query_as("SELECT seed FROM crdt_node_identity WHERE id = 1")
|
||||||
|
.fetch_optional(pool)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
if let Some((seed,)) = row {
|
||||||
|
// Reconstruct from stored seed. The seed is the 32-byte private key.
|
||||||
|
if let Ok(kp) = Ed25519KeyPair::from_bytes(&seed) {
|
||||||
|
return Ok(kp);
|
||||||
|
}
|
||||||
|
slog!("[crdt] Stored keypair invalid, regenerating");
|
||||||
|
}
|
||||||
|
|
||||||
|
let kp = make_keypair();
|
||||||
|
let seed = kp.as_bytes().to_vec();
|
||||||
|
sqlx::query("INSERT INTO crdt_node_identity (id, seed) VALUES (1, ?1) ON CONFLICT(id) DO UPDATE SET seed = excluded.seed")
|
||||||
|
.bind(&seed)
|
||||||
|
.execute(pool)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(kp)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Rebuild the story_id → list index mapping from the current CRDT state.
|
||||||
|
pub(super) fn rebuild_index(crdt: &BaseCrdt<PipelineDoc>) -> HashMap<String, usize> {
|
||||||
|
let mut map = HashMap::new();
|
||||||
|
for (i, item) in crdt.doc.items.iter().enumerate() {
|
||||||
|
if let JsonValue::String(ref sid) = item.story_id.view() {
|
||||||
|
map.insert(sid.clone(), i);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
map
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Rebuild the node_id → nodes list index mapping from the current CRDT state.
|
||||||
|
pub(super) fn rebuild_node_index(crdt: &BaseCrdt<PipelineDoc>) -> HashMap<String, usize> {
|
||||||
|
let mut map = HashMap::new();
|
||||||
|
for (i, node) in crdt.doc.nodes.iter().enumerate() {
|
||||||
|
if let JsonValue::String(ref nid) = node.node_id.view() {
|
||||||
|
map.insert(nid.clone(), i);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
map
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Write path ───────────────────────────────────────────────────────

/// Create a CRDT op via `op_fn`, sign it, apply it, and send it to the
/// persistence channel. The closure receives `&mut CrdtState` so it can
/// mutably access the CRDT document, while `sign` only needs `&keypair`.
///
/// Ordering matters: the op is applied in-memory first, then queued for
/// persistence, then journalled (`track_op`) and broadcast to sync peers.
pub(super) fn apply_and_persist<F>(state: &mut CrdtState, op_fn: F)
where
    F: FnOnce(&mut CrdtState) -> bft_json_crdt::op::Op<JsonValue>,
{
    let raw_op = op_fn(state);
    let signed = raw_op.sign(&state.keypair);
    state.crdt.apply(signed.clone());
    // A failed send means the persist task has died; the op stays applied
    // in memory, so we log loudly rather than unwinding the write.
    if let Err(e) = state.persist_tx.send(signed.clone()) {
        crate::slog_error!(
            "[crdt] Failed to send op to persist task: {e}; persist task may be dead. \
             In-memory state is now ahead of persisted state."
        );
    }

    // Track in ALL_OPS + VECTOR_CLOCK, then broadcast to sync peers.
    if let Ok(json) = serde_json::to_string(&signed) {
        track_op(&signed, json);
    }
    if let Some(tx) = SYNC_TX.get() {
        // send() only errors when there are no subscribers — not a failure.
        let _ = tx.send(signed);
    }
}
|
||||||
|
|
||||||
|
/// Broadcast a CRDT event to all subscribers.
|
||||||
|
pub(super) fn emit_event(event: CrdtEvent) {
|
||||||
|
if let Some(tx) = CRDT_EVENT_TX.get() {
|
||||||
|
let _ = tx.send(event);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use super::super::types::PipelineItemCrdt;
|
||||||
|
use super::super::write::write_item;
|
||||||
|
use super::super::read::{extract_item_view, read_item};
|
||||||
|
use bft_json_crdt::json_crdt::OpState;
|
||||||
|
use bft_json_crdt::keypair::make_keypair;
|
||||||
|
use bft_json_crdt::op::ROOT_ID;
|
||||||
|
use super::super::hex;
|
||||||
|
use serde_json::json;
|
||||||
|
|
||||||
|
    /// Applying the same op sequence to a fresh CRDT must reproduce the
    /// original document state (the property `init`'s replay relies on).
    #[test]
    fn crdt_ops_replay_reconstructs_state() {
        let kp = make_keypair();
        let mut crdt1 = BaseCrdt::<PipelineDoc>::new(&kp);

        // Build state with a series of ops.
        let item_json: JsonValue = json!({
            "story_id": "30_story_replay",
            "stage": "1_backlog",
            "name": "Replay Test",
            "agent": "",
            "retry_count": 0.0,
            "blocked": false,
            "depends_on": "",
            "claimed_by": "",
            "claimed_at": 0.0,
        })
        .into();

        let op1 = crdt1.doc.items.insert(ROOT_ID, item_json).sign(&kp);
        crdt1.apply(op1.clone());

        let op2 = crdt1.doc.items[0]
            .stage
            .set("2_current".to_string())
            .sign(&kp);
        crdt1.apply(op2.clone());

        let op3 = crdt1.doc.items[0]
            .name
            .set("Updated Name".to_string())
            .sign(&kp);
        crdt1.apply(op3.clone());

        // Replay ops on a fresh CRDT.
        let mut crdt2 = BaseCrdt::<PipelineDoc>::new(&kp);
        crdt2.apply(op1);
        crdt2.apply(op2);
        crdt2.apply(op3);

        // Both replicas must converge on the same field views.
        assert_eq!(
            crdt1.doc.items[0].stage.view(),
            crdt2.doc.items[0].stage.view()
        );
        assert_eq!(
            crdt1.doc.items[0].name.view(),
            crdt2.doc.items[0].name.view()
        );
    }
|
||||||
|
|
||||||
|
    /// `rebuild_index` should map every visible story_id to its list index.
    #[test]
    fn rebuild_index_maps_story_ids() {
        let kp = make_keypair();
        let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);

        for (sid, stage) in &[("10_story_a", "1_backlog"), ("20_story_b", "2_current")] {
            let item: JsonValue = json!({
                "story_id": sid,
                "stage": stage,
                "name": "",
                "agent": "",
                "retry_count": 0.0,
                "blocked": false,
                "depends_on": "",
                "claimed_by": "",
                "claimed_at": 0.0,
            })
            .into();
            let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp);
            crdt.apply(op);
        }

        let index = rebuild_index(&crdt);
        assert_eq!(index.len(), 2);
        assert!(index.contains_key("10_story_a"));
        assert!(index.contains_key("20_story_b"));
    }
|
||||||
|
|
||||||
|
    /// End-to-end persistence check: an op written to `crdt_ops` can be
    /// fetched back and replayed onto a fresh CRDT, reproducing the item view.
    #[tokio::test]
    async fn init_and_write_read_roundtrip() {
        let tmp = tempfile::tempdir().unwrap();
        let db_path = tmp.path().join("crdt_test.db");

        // Init directly (not via the global singleton, for test isolation).
        let options = SqliteConnectOptions::new()
            .filename(&db_path)
            .create_if_missing(true);
        let pool = SqlitePool::connect_with(options).await.unwrap();
        sqlx::migrate!("./migrations").run(&pool).await.unwrap();

        let keypair = make_keypair();
        let mut crdt = BaseCrdt::<PipelineDoc>::new(&keypair);

        // Insert and update like write_item does.
        let item_json: JsonValue = json!({
            "story_id": "50_story_roundtrip",
            "stage": "1_backlog",
            "name": "Roundtrip",
            "agent": "",
            "retry_count": 0.0,
            "blocked": false,
            "depends_on": "",
            "claimed_by": "",
            "claimed_at": 0.0,
        })
        .into();

        let insert_op = crdt.doc.items.insert(ROOT_ID, item_json).sign(&keypair);
        crdt.apply(insert_op.clone());

        // Persist the op.
        let op_json = serde_json::to_string(&insert_op).unwrap();
        let op_id = hex::encode(&insert_op.id());
        let now = chrono::Utc::now().to_rfc3339();
        sqlx::query(
            "INSERT INTO crdt_ops (op_id, seq, op_json, created_at) VALUES (?1, ?2, ?3, ?4)",
        )
        .bind(&op_id)
        .bind(insert_op.inner.seq as i64)
        .bind(&op_json)
        .bind(&now)
        .execute(&pool)
        .await
        .unwrap();

        // Reconstruct from DB.
        let rows: Vec<(String,)> =
            sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY rowid ASC")
                .fetch_all(&pool)
                .await
                .unwrap();

        let mut crdt2 = BaseCrdt::<PipelineDoc>::new(&keypair);
        for (json_str,) in &rows {
            let op: SignedOp = serde_json::from_str(json_str).unwrap();
            crdt2.apply(op);
        }

        // The replayed replica must expose the same item view.
        let view = extract_item_view(&crdt2.doc.items[0]).unwrap();
        assert_eq!(view.story_id, "50_story_roundtrip");
        assert_eq!(view.stage, "1_backlog");
        assert_eq!(view.name.as_deref(), Some("Roundtrip"));
    }
|
||||||
|
|
||||||
|
#[test]
fn persist_tx_send_failure_logs_error() {
    let kp = make_keypair();
    let crdt = BaseCrdt::<PipelineDoc>::new(&kp);
    let (persist_tx, persist_rx) = mpsc::unbounded_channel::<SignedOp>();

    // Build a CrdtState by hand (not via init) so the test owns the channel.
    let mut state = CrdtState {
        crdt,
        keypair: kp,
        index: HashMap::new(),
        node_index: HashMap::new(),
        persist_tx,
    };

    // Drop the receiver so that the next send fails immediately.
    drop(persist_rx);

    let item_json: JsonValue = json!({
        "story_id": "518_story_persist_fail",
        "stage": "1_backlog",
        "name": "Persist Fail Test",
        "agent": "",
        "retry_count": 0.0,
        "blocked": false,
        "depends_on": "",
        "claimed_by": "",
        "claimed_at": 0.0,
    })
    .into();

    // Snapshot the current ERROR count first: the log buffer is a process
    // global, so other tests may have written entries before this one runs.
    let before_errors = crate::log_buffer::global()
        .get_recent_entries(1000, None, Some(&crate::log_buffer::LogLevel::Error))
        .len();

    apply_and_persist(&mut state, |s| s.crdt.doc.items.insert(ROOT_ID, item_json));

    let error_entries = crate::log_buffer::global().get_recent_entries(
        1000,
        None,
        Some(&crate::log_buffer::LogLevel::Error),
    );

    assert!(
        error_entries.len() > before_errors,
        "expected an ERROR log entry when persist_tx send fails, but none was added"
    );

    // The logged message must both mention persistence and flag that the
    // in-memory document is now ahead of / diverged from the persisted ops.
    let last_error = &error_entries[error_entries.len() - 1];
    assert!(
        last_error.message.contains("persist"),
        "error message should mention persist: {}",
        last_error.message
    );
    assert!(
        last_error.message.contains("ahead") || last_error.message.contains("diverged"),
        "error message should note in-memory/persisted divergence: {}",
        last_error.message
    );
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,249 @@
|
|||||||
|
//! CRDT document types, read-side view types, and CRDT-state events.
|
||||||
|
|
||||||
|
use bft_json_crdt::json_crdt::*;
|
||||||
|
use bft_json_crdt::list_crdt::ListCrdt;
|
||||||
|
use bft_json_crdt::lww_crdt::LwwRegisterCrdt;
|
||||||
|
use std::sync::OnceLock;
|
||||||
|
use tokio::sync::broadcast;
|
||||||
|
|
||||||
|
|
||||||
|
/// An event emitted when a pipeline item's stage changes in the CRDT document.
///
/// Produced by the write path and fanned out over a `tokio::sync::broadcast`
/// channel; obtain a receiver via [`subscribe`].
#[derive(Clone, Debug)]
pub struct CrdtEvent {
    /// Work item ID (e.g. `"42_story_my_feature"`).
    pub story_id: String,
    /// The stage the item was in before this transition, or `None` for new items.
    pub from_stage: Option<String>,
    /// The stage the item is now in.
    pub to_stage: String,
    /// Human-readable story name from the CRDT document.
    pub name: Option<String>,
}
|
||||||
|
|
||||||
|
/// Subscribe to CRDT state transition events.
|
||||||
|
///
|
||||||
|
/// Returns `None` if the CRDT layer has not been initialised yet.
|
||||||
|
pub fn subscribe() -> Option<broadcast::Receiver<CrdtEvent>> {
|
||||||
|
CRDT_EVENT_TX.get().map(|tx| tx.subscribe())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Global broadcast sender for [`CrdtEvent`]s; installed once at startup
/// (unset until then, which is why [`subscribe`] can return `None`).
static CRDT_EVENT_TX: OnceLock<broadcast::Sender<CrdtEvent>> = OnceLock::new();
|
||||||
|
|
||||||
|
// ── CRDT document types ──────────────────────────────────────────────
|
||||||
|
|
||||||
|
/// Root CRDT document: the replicated pipeline item list plus node presence.
#[add_crdt_fields]
#[derive(Clone, CrdtNode, Debug)]
pub struct PipelineDoc {
    /// All pipeline work items, one [`PipelineItemCrdt`] per story.
    pub items: ListCrdt<PipelineItemCrdt>,
    /// Presence entries, one [`NodePresenceCrdt`] per known peer.
    pub nodes: ListCrdt<NodePresenceCrdt>,
}
|
||||||
|
|
||||||
|
/// CRDT node for a single pipeline work item. Every field is a last-writer-wins
/// register so concurrent edits from different nodes converge deterministically.
/// Numeric fields are `f64` registers (the register's native number type);
/// readers convert back to integers where appropriate.
#[add_crdt_fields]
#[derive(Clone, CrdtNode, Debug)]
pub struct PipelineItemCrdt {
    /// Work item ID, e.g. `"42_story_my_feature"`.
    pub story_id: LwwRegisterCrdt<String>,
    /// Pipeline stage name, e.g. `"1_backlog"`, `"2_current"`.
    pub stage: LwwRegisterCrdt<String>,
    /// Human-readable story name (empty string when unset).
    pub name: LwwRegisterCrdt<String>,
    /// Assigned agent identifier (empty string when unset).
    pub agent: LwwRegisterCrdt<String>,
    /// Retry counter, stored as a float register.
    pub retry_count: LwwRegisterCrdt<f64>,
    /// Whether the item is blocked.
    pub blocked: LwwRegisterCrdt<bool>,
    /// Dependency list encoded as a string (empty when none).
    pub depends_on: LwwRegisterCrdt<String>,
    /// Node ID (hex-encoded Ed25519 pubkey) of the node that claimed this item.
    /// Used for distributed work claiming — the LWW register resolves conflicts
    /// deterministically so all nodes converge on the same claimer.
    pub claimed_by: LwwRegisterCrdt<String>,
    /// Unix timestamp (seconds) when the claim was written.
    /// Used for timeout-based reclaim: if a node crashes, other nodes can
    /// reclaim the item after the timeout expires.
    pub claimed_at: LwwRegisterCrdt<f64>,
    /// Unix timestamp (seconds) when the item was merged to master.
    /// Written once when the item transitions to `5_done`. Used by the
    /// sweep loop to determine when to promote to `6_archived`.
    pub merged_at: LwwRegisterCrdt<f64>,
}
|
||||||
|
|
||||||
|
/// CRDT node that holds a single peer's presence entry.
#[add_crdt_fields]
#[derive(Clone, CrdtNode, Debug)]
pub struct NodePresenceCrdt {
    /// Hex-encoded Ed25519 public key — stable identity across restarts.
    pub node_id: LwwRegisterCrdt<String>,
    /// WebSocket URL this peer advertises, e.g. `ws://192.168.1.10:3001/crdt-sync`.
    pub address: LwwRegisterCrdt<String>,
    /// Unix timestamp (seconds) of the last heartbeat written by this node.
    pub last_seen: LwwRegisterCrdt<f64>,
    /// `false` once a stale-detection pass has tombstoned this node.
    pub alive: LwwRegisterCrdt<bool>,
}
|
||||||
|
|
||||||
|
// ── Read-side view types ─────────────────────────────────────────────
|
||||||
|
|
||||||
|
/// A snapshot of a single pipeline item derived from the CRDT document.
///
/// Plain owned data with no CRDT machinery — safe to hand to the read API,
/// serializers, and UI layers. Optional fields are `None` when the underlying
/// register was never set (or held its empty/zero default).
#[derive(Clone, Debug)]
pub struct PipelineItemView {
    /// Work item ID, e.g. `"42_story_my_feature"`.
    pub story_id: String,
    /// Current pipeline stage name.
    pub stage: String,
    /// Human-readable story name, if set.
    pub name: Option<String>,
    /// Assigned agent, if set.
    pub agent: Option<String>,
    /// Retry counter converted back to an integer, if set.
    pub retry_count: Option<i64>,
    /// Blocked flag, if set.
    pub blocked: Option<bool>,
    /// Parsed dependency story numbers, if any.
    pub depends_on: Option<Vec<u32>>,
    /// Node ID of the node that claimed this item (hex-encoded Ed25519 pubkey).
    pub claimed_by: Option<String>,
    /// Unix timestamp when the item was claimed.
    pub claimed_at: Option<f64>,
    /// Unix timestamp (seconds) when the item was merged to master.
    /// `None` for items that were never in `5_done` or for legacy items.
    pub merged_at: Option<f64>,
}
|
||||||
|
|
||||||
|
/// A snapshot of a single node presence entry derived from the CRDT document.
///
/// Plain owned counterpart of [`NodePresenceCrdt`] for the read side.
#[derive(Clone, Debug)]
pub struct NodePresenceView {
    /// Hex-encoded Ed25519 public key of the peer.
    pub node_id: String,
    /// Advertised WebSocket URL of the peer.
    pub address: String,
    /// Unix timestamp (seconds).
    pub last_seen: f64,
    /// `false` once the peer has been tombstoned as stale.
    pub alive: bool,
}
|
||||||
|
|
||||||
|
#[cfg(test)]
mod tests {
    // NOTE: `init_for_test` and `write_item` were imported here but unused by
    // any test in this module (compiler `unused_imports` warnings); removed.
    use super::super::state::emit_event;
    use super::*;
    use bft_json_crdt::json_crdt::OpState;
    use bft_json_crdt::keypair::make_keypair;
    use bft_json_crdt::op::ROOT_ID;
    use serde_json::json;

    /// Inserting one item into the document exposes it through `view()`
    /// with all registers holding the inserted values.
    #[test]
    fn crdt_doc_insert_and_view() {
        let kp = make_keypair();
        let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);

        let item_json: JsonValue = json!({
            "story_id": "10_story_test",
            "stage": "2_current",
            "name": "Test Story",
            "agent": "coder-opus",
            "retry_count": 0.0,
            "blocked": false,
            "depends_on": "",
            "claimed_by": "",
            "claimed_at": 0.0,
        })
        .into();

        let op = crdt.doc.items.insert(ROOT_ID, item_json).sign(&kp);
        assert_eq!(crdt.apply(op), OpState::Ok);

        let view = crdt.doc.items.view();
        assert_eq!(view.len(), 1);

        let item = &crdt.doc.items[0];
        assert_eq!(
            item.story_id.view(),
            JsonValue::String("10_story_test".to_string())
        );
        assert_eq!(
            item.stage.view(),
            JsonValue::String("2_current".to_string())
        );
    }

    /// A field-level `set()` on an existing item's register is reflected
    /// in subsequent views.
    #[test]
    fn crdt_doc_update_stage() {
        let kp = make_keypair();
        let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);

        let item_json: JsonValue = json!({
            "story_id": "20_story_move",
            "stage": "1_backlog",
            "name": "Move Me",
            "agent": "",
            "retry_count": 0.0,
            "blocked": false,
            "depends_on": "",
            "claimed_by": "",
            "claimed_at": 0.0,
        })
        .into();

        let insert_op = crdt.doc.items.insert(ROOT_ID, item_json).sign(&kp);
        crdt.apply(insert_op);

        // Update stage
        let stage_op = crdt.doc.items[0]
            .stage
            .set("2_current".to_string())
            .sign(&kp);
        crdt.apply(stage_op);

        assert_eq!(
            crdt.doc.items[0].stage.view(),
            JsonValue::String("2_current".to_string())
        );
    }

    /// Field-by-field sanity check of the event struct.
    #[test]
    fn crdt_event_has_expected_fields() {
        let evt = CrdtEvent {
            story_id: "42_story_foo".to_string(),
            from_stage: Some("1_backlog".to_string()),
            to_stage: "2_current".to_string(),
            name: Some("Foo Feature".to_string()),
        };
        assert_eq!(evt.story_id, "42_story_foo");
        assert_eq!(evt.from_stage.as_deref(), Some("1_backlog"));
        assert_eq!(evt.to_stage, "2_current");
        assert_eq!(evt.name.as_deref(), Some("Foo Feature"));
    }

    /// `Clone` must deep-copy all fields, including the `None` variants.
    #[test]
    fn crdt_event_clone_preserves_data() {
        let evt = CrdtEvent {
            story_id: "10_story_bar".to_string(),
            from_stage: None,
            to_stage: "1_backlog".to_string(),
            name: None,
        };
        let cloned = evt.clone();
        assert_eq!(cloned.story_id, "10_story_bar");
        assert!(cloned.from_stage.is_none());
        assert!(cloned.name.is_none());
    }

    #[test]
    fn emit_event_is_noop_when_channel_not_initialised() {
        // Before CRDT_EVENT_TX is set, emit_event should not panic.
        // This test verifies the guard clause works. In test binaries the
        // OnceLock may already be set by another test, so we just verify
        // the function doesn't panic regardless.
        emit_event(CrdtEvent {
            story_id: "99_story_noop".to_string(),
            from_stage: None,
            to_stage: "1_backlog".to_string(),
            name: None,
        });
    }

    /// A CrdtEvent survives a send/recv round trip over the broadcast
    /// channel type used by the production event path.
    #[test]
    fn crdt_event_broadcast_channel_round_trip() {
        let (tx, mut rx) = broadcast::channel::<CrdtEvent>(16);
        let evt = CrdtEvent {
            story_id: "70_story_broadcast".to_string(),
            from_stage: Some("1_backlog".to_string()),
            to_stage: "2_current".to_string(),
            name: Some("Broadcast Test".to_string()),
        };
        tx.send(evt).unwrap();

        let received = rx.try_recv().unwrap();
        assert_eq!(received.story_id, "70_story_broadcast");
        assert_eq!(received.from_stage.as_deref(), Some("1_backlog"));
        assert_eq!(received.to_stage, "2_current");
        assert_eq!(received.name.as_deref(), Some("Broadcast Test"));
    }
}
|
||||||
@@ -0,0 +1,280 @@
|
|||||||
|
//! High-level write API for pipeline items.
|
||||||
|
|
||||||
|
use bft_json_crdt::json_crdt::*;
|
||||||
|
use bft_json_crdt::op::ROOT_ID;
|
||||||
|
use bft_json_crdt::lww_crdt::LwwRegisterCrdt;
|
||||||
|
use serde_json::json;
|
||||||
|
|
||||||
|
use super::state::{apply_and_persist, emit_event, get_crdt, rebuild_index};
|
||||||
|
use super::types::{CrdtEvent, PipelineDoc, PipelineItemCrdt};
|
||||||
|
use crate::slog;
|
||||||
|
|
||||||
|
/// Write a pipeline item state through CRDT operations.
///
/// If the item exists, updates its registers. If not, inserts a new item
/// into the list. All ops are signed and persisted to SQLite.
///
/// When the stage changes (or a new item is created), a [`CrdtEvent`] is
/// broadcast so subscribers can react to the transition.
///
/// On the update path, a `None` optional parameter means "leave that
/// register untouched"; on the insert path each `None` falls back to an
/// empty/zero default. Returns silently without writing if the CRDT layer
/// is not initialised or its mutex cannot be locked.
#[allow(clippy::too_many_arguments)]
pub fn write_item(
    story_id: &str,
    stage: &str,
    name: Option<&str>,
    agent: Option<&str>,
    retry_count: Option<i64>,
    blocked: Option<bool>,
    depends_on: Option<&str>,
    claimed_by: Option<&str>,
    claimed_at: Option<f64>,
    merged_at: Option<f64>,
) {
    let Some(state_mutex) = get_crdt() else {
        return;
    };
    let Ok(mut state) = state_mutex.lock() else {
        return;
    };

    if let Some(&idx) = state.index.get(story_id) {
        // Capture the old stage before updating so we can detect transitions.
        let old_stage = match state.crdt.doc.items[idx].stage.view() {
            JsonValue::String(s) => Some(s),
            _ => None,
        };

        // Update existing item registers. Each set() below is applied and
        // persisted as its own signed op.
        // NOTE(review): the stage register is set unconditionally, so an op
        // is emitted even when the stage value is unchanged — confirm this
        // redundant-op behaviour is intended.
        apply_and_persist(&mut state, |s| {
            s.crdt.doc.items[idx].stage.set(stage.to_string())
        });

        if let Some(n) = name {
            apply_and_persist(&mut state, |s| {
                s.crdt.doc.items[idx].name.set(n.to_string())
            });
        }
        if let Some(a) = agent {
            apply_and_persist(&mut state, |s| {
                s.crdt.doc.items[idx].agent.set(a.to_string())
            });
        }
        if let Some(rc) = retry_count {
            // Register stores f64; callers pass i64, converted here.
            apply_and_persist(&mut state, |s| {
                s.crdt.doc.items[idx].retry_count.set(rc as f64)
            });
        }
        if let Some(b) = blocked {
            apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].blocked.set(b));
        }
        if let Some(d) = depends_on {
            apply_and_persist(&mut state, |s| {
                s.crdt.doc.items[idx].depends_on.set(d.to_string())
            });
        }
        if let Some(cb) = claimed_by {
            apply_and_persist(&mut state, |s| {
                s.crdt.doc.items[idx].claimed_by.set(cb.to_string())
            });
        }
        if let Some(ca) = claimed_at {
            apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].claimed_at.set(ca));
        }
        if let Some(ma) = merged_at {
            apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].merged_at.set(ma));
        }

        // Broadcast a CrdtEvent if the stage actually changed.
        let stage_changed = old_stage.as_deref() != Some(stage);
        if stage_changed {
            // Read the current name from the CRDT document for the event,
            // treating the empty-string default as "no name".
            let current_name = match state.crdt.doc.items[idx].name.view() {
                JsonValue::String(s) if !s.is_empty() => Some(s),
                _ => None,
            };
            emit_event(CrdtEvent {
                story_id: story_id.to_string(),
                from_stage: old_stage,
                to_stage: stage.to_string(),
                name: current_name,
            });
        }
    } else {
        // Insert new item. Every register gets an explicit initial value so
        // the inserted JSON matches the PipelineItemCrdt field set.
        let item_json: JsonValue = json!({
            "story_id": story_id,
            "stage": stage,
            "name": name.unwrap_or(""),
            "agent": agent.unwrap_or(""),
            "retry_count": retry_count.unwrap_or(0) as f64,
            "blocked": blocked.unwrap_or(false),
            "depends_on": depends_on.unwrap_or(""),
            "claimed_by": claimed_by.unwrap_or(""),
            "claimed_at": claimed_at.unwrap_or(0.0),
            "merged_at": merged_at.unwrap_or(0.0),
        })
        .into();

        apply_and_persist(&mut state, |s| s.crdt.doc.items.insert(ROOT_ID, item_json));

        // Rebuild index after insertion (indices may shift).
        state.index = rebuild_index(&state.crdt);

        // Broadcast a CrdtEvent for the new item.
        emit_event(CrdtEvent {
            story_id: story_id.to_string(),
            from_stage: None,
            to_stage: stage.to_string(),
            name: name.map(String::from),
        });
    }
}
|
||||||
|
|
||||||
|
#[cfg(test)]
mod tests {
    // NOTE: `init_for_test`, `read_item`, and `OpState` were imported here
    // but unused by this module's test (compiler `unused_imports` warnings);
    // removed.
    use super::super::hex;
    use super::super::read::extract_item_view;
    use super::super::state::rebuild_index;
    use super::*;
    use bft_json_crdt::keypair::make_keypair;
    use bft_json_crdt::op::ROOT_ID;
    use serde_json::json;
    use sqlx::sqlite::SqliteConnectOptions;
    use sqlx::SqlitePool;

    /// Regression test: replaying persisted ops ordered by their field-level
    /// `seq` can apply a register update before the list insert that created
    /// its parent item, losing the update. Replaying by `rowid` (insertion
    /// order) preserves causality. Verifies both the fix and the bug.
    #[tokio::test]
    async fn bug_511_rowid_replay_preserves_field_update_after_list_insert() {
        let tmp = tempfile::tempdir().unwrap();
        let db_path = tmp.path().join("bug511.db");

        let options = SqliteConnectOptions::new()
            .filename(&db_path)
            .create_if_missing(true);
        let pool = SqlitePool::connect_with(options).await.unwrap();
        sqlx::migrate!("./migrations").run(&pool).await.unwrap();

        let kp = make_keypair();
        let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);

        // Insert 5 dummy items to advance items.our_seq to 5.
        for i in 0..5u32 {
            let sid = format!("{}_story_warmup", i);
            let item: JsonValue = json!({
                "story_id": sid,
                "stage": "1_backlog",
                "name": "",
                "agent": "",
                "retry_count": 0.0,
                "blocked": false,
                "depends_on": "",
                "claimed_by": "",
                "claimed_at": 0.0,
            })
            .into();
            let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp);
            crdt.apply(op.clone());
            // We don't persist these to the DB — they are pre-history.
        }

        // Now insert the real item. items.our_seq was 5, so this op gets seq=6.
        let target_item: JsonValue = json!({
            "story_id": "511_story_target",
            "stage": "1_backlog",
            "name": "Bug 511 target",
            "agent": "",
            "retry_count": 0.0,
            "blocked": false,
            "depends_on": "",
            "claimed_by": "",
            "claimed_at": 0.0,
        })
        .into();
        let insert_op = crdt.doc.items.insert(ROOT_ID, target_item).sign(&kp);
        crdt.apply(insert_op.clone());
        // insert_op.inner.seq == 6

        // Now update the stage. The stage LwwRegisterCrdt for this item starts
        // at our_seq=0, so this field op gets seq=1. Crucially: seq=1 < seq=6.
        let idx = rebuild_index(&crdt)["511_story_target"];
        let stage_op = crdt.doc.items[idx]
            .stage
            .set("2_current".to_string())
            .sign(&kp);
        crdt.apply(stage_op.clone());
        // stage_op.inner.seq == 1

        // Persist BOTH ops in causal order (insert first, update second).
        // This means insert_op gets rowid < stage_op rowid.
        let now = chrono::Utc::now().to_rfc3339();
        for op in [&insert_op, &stage_op] {
            let op_json = serde_json::to_string(op).unwrap();
            let op_id = hex::encode(&op.id());
            sqlx::query(
                "INSERT INTO crdt_ops (op_id, seq, op_json, created_at) VALUES (?1, ?2, ?3, ?4)",
            )
            .bind(&op_id)
            .bind(op.inner.seq as i64)
            .bind(&op_json)
            .bind(&now)
            .execute(&pool)
            .await
            .unwrap();
        }

        // Replay by rowid ASC (the fix). The insert must come before the field
        // update regardless of their field-level seq values.
        let rows: Vec<(String,)> =
            sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY rowid ASC")
                .fetch_all(&pool)
                .await
                .unwrap();

        let mut crdt2 = BaseCrdt::<PipelineDoc>::new(&kp);
        for (json_str,) in &rows {
            let op: SignedOp = serde_json::from_str(json_str).unwrap();
            crdt2.apply(op);
        }

        // The item must be in the CRDT and must reflect the stage update.
        let index2 = rebuild_index(&crdt2);
        assert!(
            index2.contains_key("511_story_target"),
            "item not found after rowid-order replay"
        );
        let idx2 = index2["511_story_target"];
        let view = extract_item_view(&crdt2.doc.items[idx2]).unwrap();
        assert_eq!(
            view.stage, "2_current",
            "stage field update lost during replay (bug 511 regression)"
        );

        // Confirm the bug is reproducible by replaying seq ASC instead.
        // With seq ASC the stage_op (seq=1) arrives before insert_op (seq=6),
        // fails ErrPathMismatch, and the item ends up at "1_backlog".
        let rows_wrong_order: Vec<(String,)> =
            sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY seq ASC")
                .fetch_all(&pool)
                .await
                .unwrap();

        let mut crdt3 = BaseCrdt::<PipelineDoc>::new(&kp);
        for (json_str,) in &rows_wrong_order {
            let op: SignedOp = serde_json::from_str(json_str).unwrap();
            crdt3.apply(op);
        }

        let index3 = rebuild_index(&crdt3);
        // With seq ASC replay, the item is created (insert_op eventually runs)
        // but the stage update is lost (it ran before the item existed).
        if let Some(idx3) = index3.get("511_story_target") {
            let view3 = extract_item_view(&crdt3.doc.items[*idx3]).unwrap();
            // The bug: stage is still "1_backlog" because the update was dropped.
            assert_eq!(
                view3.stage, "1_backlog",
                "expected seq-ASC replay to exhibit the bug (update lost)"
            );
        }
    }
}
|
||||||
Reference in New Issue
Block a user