// NOTE(review): the lines below were web-UI scraping residue (repository
// file-listing chrome), not Rust source. Preserved as a comment so the file
// remains valid Rust:
//   Files — huskies/server/src/crdt_state.rs — 1398 lines, 48 KiB, Rust
//   (Raw / Normal View / History)
//! CRDT state layer for pipeline state, backed by SQLite.
//!
//! The CRDT document is the primary source of truth for pipeline item
//! metadata (stage, name, agent, etc.). CRDT ops are persisted to SQLite so
//! state survives restarts. The filesystem `.huskies/work/` directories are
//! still updated as a secondary output for backwards compatibility.
//!
//! Stage transitions detected by `write_item()` are broadcast as [`CrdtEvent`]s
//! so subscribers (auto-assign, WebSocket, notifications) can react without
//! polling the filesystem.
use std::collections::HashMap;
use std::sync::{Mutex, OnceLock};
use bft_json_crdt::json_crdt::*;
use bft_json_crdt::keypair::make_keypair;
use bft_json_crdt::list_crdt::ListCrdt;
use bft_json_crdt::lww_crdt::LwwRegisterCrdt;
use bft_json_crdt::op::ROOT_ID;
use fastcrypto::ed25519::Ed25519KeyPair;
use fastcrypto::traits::ToFromBytes;
use serde_json::json;
use sqlx::sqlite::SqliteConnectOptions;
use sqlx::SqlitePool;
use std::path::Path;
use tokio::sync::{broadcast, mpsc};
use crate::slog;
// ── CRDT events ─────────────────────────────────────────────────────
/// An event emitted when a pipeline item's stage changes in the CRDT document.
///
/// Broadcast on the `CRDT_EVENT_TX` channel by `emit_event()`; consumers
/// obtain a receiver via [`subscribe`].
#[derive(Clone, Debug)]
pub struct CrdtEvent {
    /// Work item ID (e.g. `"42_story_my_feature"`).
    pub story_id: String,
    /// The stage the item was in before this transition, or `None` for new items.
    pub from_stage: Option<String>,
    /// The stage the item is now in.
    pub to_stage: String,
    /// Human-readable story name from the CRDT document.
    pub name: Option<String>,
}
/// Subscribe to CRDT state transition events.
///
/// Returns `None` if the CRDT layer has not been initialised yet.
pub fn subscribe() -> Option<broadcast::Receiver<CrdtEvent>> {
    let sender = CRDT_EVENT_TX.get()?;
    Some(sender.subscribe())
}
/// Broadcast channel for stage-transition events; set once during `init()`.
static CRDT_EVENT_TX: OnceLock<broadcast::Sender<CrdtEvent>> = OnceLock::new();
// ── Sync broadcast (outgoing ops to peers) ──────────────────────────
/// Broadcast channel for locally-created ops; set once during `init()`.
static SYNC_TX: OnceLock<broadcast::Sender<SignedOp>> = OnceLock::new();
/// Subscribe to locally-created CRDT ops for sync replication.
///
/// Each `SignedOp` broadcast here was created by *this* node and should be
/// forwarded to connected peers. Returns `None` before `init()`.
pub fn subscribe_ops() -> Option<broadcast::Receiver<SignedOp>> {
    let sender = SYNC_TX.get()?;
    Some(sender.subscribe())
}
/// Return all persisted `SignedOp`s in causal order (oldest first).
///
/// Used during initial sync handshake so a newly-connected peer can
/// reconstruct the full CRDT state. Returns `None` before `init()`.
/// Panics if the journal lock is poisoned.
pub fn all_ops_json() -> Option<Vec<String>> {
    let journal = ALL_OPS.get()?;
    Some(journal.lock().unwrap().clone())
}
/// In-memory journal of every op JSON applied locally or ingested remotely.
static ALL_OPS: OnceLock<Mutex<Vec<String>>> = OnceLock::new();
// ── CRDT document types ──────────────────────────────────────────────
/// Root CRDT document: an ordered list of pipeline items.
#[add_crdt_fields]
#[derive(Clone, CrdtNode, Debug)]
pub struct PipelineDoc {
    /// All pipeline items. Deleted entries remain as tombstones at the op
    /// level; `iter()` appears to yield only visible (non-deleted) items.
    pub items: ListCrdt<PipelineItemCrdt>,
}
/// A single pipeline item; each field is an independent last-writer-wins
/// register so concurrent updates to different fields merge cleanly.
#[add_crdt_fields]
#[derive(Clone, CrdtNode, Debug)]
pub struct PipelineItemCrdt {
    /// Work item ID (e.g. `"42_story_my_feature"`).
    pub story_id: LwwRegisterCrdt<String>,
    /// Pipeline stage (e.g. `"1_backlog"`, `"2_current"`, `"5_done"`).
    pub stage: LwwRegisterCrdt<String>,
    /// Human-readable story name; `""` means unset.
    pub name: LwwRegisterCrdt<String>,
    /// Assigned agent; `""` means unset.
    pub agent: LwwRegisterCrdt<String>,
    /// Retry counter; stored as f64 because the register holds JSON numbers.
    pub retry_count: LwwRegisterCrdt<f64>,
    /// Whether the item is blocked.
    pub blocked: LwwRegisterCrdt<bool>,
    /// JSON-encoded array of dependency numbers (e.g. `"[10,20]"`); `""` means none.
    pub depends_on: LwwRegisterCrdt<String>,
}
// ── Read-side view types ─────────────────────────────────────────────
/// A snapshot of a single pipeline item derived from the CRDT document.
///
/// Produced by `extract_item_view`: empty-string registers are mapped to
/// `None`, and `depends_on` is parsed from its JSON-encoded string form.
#[derive(Clone, Debug)]
pub struct PipelineItemView {
    pub story_id: String,
    pub stage: String,
    pub name: Option<String>,
    pub agent: Option<String>,
    /// `None` when the stored retry_count is 0 or missing.
    pub retry_count: Option<i64>,
    pub blocked: Option<bool>,
    /// Parsed dependency numbers; `None` when unset or unparsable.
    pub depends_on: Option<Vec<u32>>,
}
// ── Internal state ───────────────────────────────────────────────────
/// Mutable singleton state guarded by the `CRDT_STATE` mutex.
struct CrdtState {
    /// The in-memory CRDT document — primary source of truth.
    crdt: BaseCrdt<PipelineDoc>,
    /// This node's signing keypair (loaded or created in `init()`).
    keypair: Ed25519KeyPair,
    /// Maps story_id → index in the ListCrdt for O(1) lookup.
    index: HashMap<String, usize>,
    /// Channel sender for fire-and-forget op persistence.
    persist_tx: mpsc::UnboundedSender<SignedOp>,
}
static CRDT_STATE: OnceLock<Mutex<CrdtState>> = OnceLock::new();
// ── Initialisation ───────────────────────────────────────────────────
/// Initialise the CRDT state layer.
///
/// Opens the SQLite database, loads or creates a node keypair, replays any
/// persisted ops to reconstruct state, and spawns a background persistence
/// task. Safe to call only once; subsequent calls are no-ops.
///
/// NOTE(review): the `CRDT_STATE.get().is_some()` check and the later
/// `set(...)` are not atomic — two racing `init()` calls could both pass the
/// check; the loser's `CrdtState` is silently discarded by `let _ = ...set(...)`
/// while its spawned persistence task keeps running. Harmless for a single
/// startup call, but confirm callers never race here.
pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> {
    if CRDT_STATE.get().is_some() {
        return Ok(());
    }
    let options = SqliteConnectOptions::new()
        .filename(db_path)
        .create_if_missing(true);
    let pool = SqlitePool::connect_with(options).await?;
    sqlx::migrate!("./migrations").run(&pool).await?;
    // Load or create the node keypair.
    let keypair = load_or_create_keypair(&pool).await?;
    let mut crdt = BaseCrdt::<PipelineDoc>::new(&keypair);
    // Replay persisted ops (in insertion order) to reconstruct state.
    let rows: Vec<(String,)> =
        sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY rowid ASC")
            .fetch_all(&pool)
            .await?;
    let mut all_ops_vec = Vec::with_capacity(rows.len());
    for (op_json,) in &rows {
        if let Ok(signed_op) = serde_json::from_str::<SignedOp>(op_json) {
            crdt.apply(signed_op);
            all_ops_vec.push(op_json.clone());
        } else {
            // A corrupt row is skipped rather than aborting startup.
            slog!("[crdt] Warning: failed to deserialize stored op");
        }
    }
    let _ = ALL_OPS.set(Mutex::new(all_ops_vec));
    // Build the index from the reconstructed state.
    let index = rebuild_index(&crdt);
    slog!(
        "[crdt] Initialised: {} ops replayed, {} items indexed",
        rows.len(),
        index.len()
    );
    // Spawn background persistence task. It owns `pool` and drains the
    // unbounded channel; ON CONFLICT(op_id) DO NOTHING makes re-delivery of
    // an already-stored op harmless.
    let (persist_tx, mut persist_rx) = mpsc::unbounded_channel::<SignedOp>();
    tokio::spawn(async move {
        while let Some(op) = persist_rx.recv().await {
            let op_json = match serde_json::to_string(&op) {
                Ok(j) => j,
                Err(e) => {
                    slog!("[crdt] Failed to serialize op: {e}");
                    continue;
                }
            };
            let op_id = hex::encode(&op.id());
            let seq = op.inner.seq as i64;
            let now = chrono::Utc::now().to_rfc3339();
            let result = sqlx::query(
                "INSERT INTO crdt_ops (op_id, seq, op_json, created_at) \
                 VALUES (?1, ?2, ?3, ?4) \
                 ON CONFLICT(op_id) DO NOTHING",
            )
            .bind(&op_id)
            .bind(seq)
            .bind(&op_json)
            .bind(&now)
            .execute(&pool)
            .await;
            if let Err(e) = result {
                // Persistence failure is logged, not retried.
                slog!("[crdt] Failed to persist op {}: {e}", &op_id[..12]);
            }
        }
    });
    let state = CrdtState {
        crdt,
        keypair,
        index,
        persist_tx,
    };
    let _ = CRDT_STATE.set(Mutex::new(state));
    // Initialise the CRDT event broadcast channel. Note this happens after
    // CRDT_STATE is set, so `subscribe()` may briefly return None during init.
    let (event_tx, _) = broadcast::channel::<CrdtEvent>(256);
    let _ = CRDT_EVENT_TX.set(event_tx);
    // Initialise the sync broadcast channel for outgoing ops.
    let (sync_tx, _) = broadcast::channel::<SignedOp>(1024);
    let _ = SYNC_TX.set(sync_tx);
    Ok(())
}
/// Load or create the Ed25519 keypair used by this node.
///
/// The 32-byte seed lives in `crdt_node_identity` (row id 1). An invalid
/// stored seed is logged and replaced with a freshly generated keypair.
async fn load_or_create_keypair(pool: &SqlitePool) -> Result<Ed25519KeyPair, sqlx::Error> {
    let stored: Option<(Vec<u8>,)> =
        sqlx::query_as("SELECT seed FROM crdt_node_identity WHERE id = 1")
            .fetch_optional(pool)
            .await?;
    if let Some((seed,)) = stored {
        // Reconstruct from stored seed. The seed is the 32-byte private key.
        match Ed25519KeyPair::from_bytes(&seed) {
            Ok(kp) => return Ok(kp),
            Err(_) => slog!("[crdt] Stored keypair invalid, regenerating"),
        }
    }
    let kp = make_keypair();
    let seed = kp.as_bytes().to_vec();
    // Upsert so a regenerated key replaces an invalid stored seed.
    sqlx::query("INSERT INTO crdt_node_identity (id, seed) VALUES (1, ?1) ON CONFLICT(id) DO UPDATE SET seed = excluded.seed")
        .bind(&seed)
        .execute(pool)
        .await?;
    Ok(kp)
}
/// Rebuild the story_id → list index mapping from the current CRDT state.
///
/// Only visible items with a string `story_id` register are indexed; if two
/// items share a story_id, the later one wins.
fn rebuild_index(crdt: &BaseCrdt<PipelineDoc>) -> HashMap<String, usize> {
    crdt.doc
        .items
        .iter()
        .enumerate()
        .filter_map(|(position, item)| match item.story_id.view() {
            JsonValue::String(sid) => Some((sid, position)),
            _ => None,
        })
        .collect()
}
// ── Write path ───────────────────────────────────────────────────────
/// Create a CRDT op via `op_fn`, sign it with the node keypair, apply it to
/// the local document, and fan it out: the persistence channel, the ALL_OPS
/// journal, and the sync broadcast. The closure receives `&mut CrdtState`
/// so it can mutably access the CRDT document.
fn apply_and_persist<F>(state: &mut CrdtState, op_fn: F)
where
    F: FnOnce(&mut CrdtState) -> bft_json_crdt::op::Op<JsonValue>,
{
    let signed = op_fn(state).sign(&state.keypair);
    state.crdt.apply(signed.clone());
    // Fire-and-forget: the background task spawned in init() owns DB writes.
    let _ = state.persist_tx.send(signed.clone());
    // Append to the in-memory journal used by initial-sync handshakes.
    if let Ok(json) = serde_json::to_string(&signed) {
        if let Some(journal) = ALL_OPS.get() {
            if let Ok(mut ops) = journal.lock() {
                ops.push(json);
            }
        }
    }
    // Forward to connected sync peers.
    if let Some(tx) = SYNC_TX.get() {
        let _ = tx.send(signed);
    }
}
/// Write a pipeline item state through CRDT operations.
///
/// If the item exists, updates its registers. If not, inserts a new item
/// into the list. All ops are signed and persisted to SQLite.
///
/// When the stage changes (or a new item is created), a [`CrdtEvent`] is
/// broadcast so subscribers can react to the transition.
///
/// Silently returns before `init()` or if the state lock is poisoned.
///
/// NOTE(review): for an existing item the `stage` register is set on every
/// call, even when the value is unchanged, so each call appends at least one
/// op to the log. The event broadcast is gated on an actual change, so this
/// only affects op-log growth — confirm the redundant op is intended before
/// suppressing it.
pub fn write_item(
    story_id: &str,
    stage: &str,
    name: Option<&str>,
    agent: Option<&str>,
    retry_count: Option<i64>,
    blocked: Option<bool>,
    depends_on: Option<&str>,
) {
    let Some(state_mutex) = CRDT_STATE.get() else {
        return;
    };
    let Ok(mut state) = state_mutex.lock() else {
        return;
    };
    if let Some(&idx) = state.index.get(story_id) {
        // Capture the old stage before updating so we can detect transitions.
        let old_stage = match state.crdt.doc.items[idx].stage.view() {
            JsonValue::String(s) => Some(s),
            _ => None,
        };
        // Update existing item registers. Each `Some` argument becomes one
        // signed LWW-set op; `None` arguments leave that register untouched.
        apply_and_persist(&mut state, |s| {
            s.crdt.doc.items[idx].stage.set(stage.to_string())
        });
        if let Some(n) = name {
            apply_and_persist(&mut state, |s| {
                s.crdt.doc.items[idx].name.set(n.to_string())
            });
        }
        if let Some(a) = agent {
            apply_and_persist(&mut state, |s| {
                s.crdt.doc.items[idx].agent.set(a.to_string())
            });
        }
        if let Some(rc) = retry_count {
            // Registers store JSON numbers, hence the f64 conversion.
            apply_and_persist(&mut state, |s| {
                s.crdt.doc.items[idx].retry_count.set(rc as f64)
            });
        }
        if let Some(b) = blocked {
            apply_and_persist(&mut state, |s| {
                s.crdt.doc.items[idx].blocked.set(b)
            });
        }
        if let Some(d) = depends_on {
            apply_and_persist(&mut state, |s| {
                s.crdt.doc.items[idx].depends_on.set(d.to_string())
            });
        }
        // Broadcast a CrdtEvent if the stage actually changed.
        let stage_changed = old_stage.as_deref() != Some(stage);
        if stage_changed {
            // Read the current name from the CRDT document for the event.
            let current_name = match state.crdt.doc.items[idx].name.view() {
                JsonValue::String(s) if !s.is_empty() => Some(s),
                _ => None,
            };
            emit_event(CrdtEvent {
                story_id: story_id.to_string(),
                from_stage: old_stage,
                to_stage: stage.to_string(),
                name: current_name,
            });
        }
    } else {
        // Insert new item. Unset optional fields are stored as "" / 0 / false;
        // the read side maps "" and 0 back to `None`.
        let item_json: JsonValue = json!({
            "story_id": story_id,
            "stage": stage,
            "name": name.unwrap_or(""),
            "agent": agent.unwrap_or(""),
            "retry_count": retry_count.unwrap_or(0) as f64,
            "blocked": blocked.unwrap_or(false),
            "depends_on": depends_on.unwrap_or(""),
        })
        .into();
        apply_and_persist(&mut state, |s| {
            s.crdt.doc.items.insert(ROOT_ID, item_json)
        });
        // Rebuild index after insertion (indices may shift).
        state.index = rebuild_index(&state.crdt);
        // Broadcast a CrdtEvent for the new item.
        emit_event(CrdtEvent {
            story_id: story_id.to_string(),
            from_stage: None,
            to_stage: stage.to_string(),
            name: name.map(String::from),
        });
    }
}
/// Broadcast a CRDT event to all subscribers.
///
/// No-op before the channel is initialised; send errors (e.g. no active
/// receivers) are ignored.
fn emit_event(event: CrdtEvent) {
    let Some(tx) = CRDT_EVENT_TX.get() else {
        return;
    };
    let _ = tx.send(event);
}
// ── Remote op ingestion (from sync peers) ───────────────────────────
/// Apply a `SignedOp` received from a remote peer.
///
/// The op is validated, applied to the local CRDT, persisted to SQLite,
/// and any resulting stage transitions are broadcast as [`CrdtEvent`]s.
/// Unlike `apply_and_persist`, this does **not** re-broadcast the op on
/// the sync channel (to avoid infinite echo loops).
///
/// Returns `true` if the op was new and applied, `false` if it was a
/// duplicate or failed validation.
///
/// NOTE(review): `MissingCausalDependencies` is treated as success and the
/// op is persisted/journaled anyway — presumably so a restart replay can
/// apply it once its dependencies arrive. Confirm the CRDT library's
/// handling so such ops are neither lost nor double-applied.
pub fn apply_remote_op(op: SignedOp) -> bool {
    let Some(state_mutex) = CRDT_STATE.get() else {
        return false;
    };
    let Ok(mut state) = state_mutex.lock() else {
        return false;
    };
    // Snapshot stage state before applying so we can detect transitions.
    // Together with the index rebuild below this makes each remote op
    // O(number of items).
    let pre_stages: HashMap<String, String> = state
        .index
        .iter()
        .filter_map(|(sid, &idx)| {
            match state.crdt.doc.items[idx].stage.view() {
                JsonValue::String(s) => Some((sid.clone(), s)),
                _ => None,
            }
        })
        .collect();
    let result = state.crdt.apply(op.clone());
    if result != bft_json_crdt::json_crdt::OpState::Ok
        && result != bft_json_crdt::json_crdt::OpState::MissingCausalDependencies
    {
        // Duplicate or invalid op: nothing persisted, nothing broadcast.
        return false;
    }
    // Persist the op (fire-and-forget).
    let _ = state.persist_tx.send(op.clone());
    // Track in ALL_OPS so initial-sync handshakes include remote ops too.
    if let Ok(json) = serde_json::to_string(&op)
        && let Some(all) = ALL_OPS.get()
        && let Ok(mut v) = all.lock()
    {
        v.push(json);
    }
    // Rebuild index (new items may have been inserted).
    state.index = rebuild_index(&state.crdt);
    // Detect and broadcast stage transitions by diffing against the snapshot.
    for (sid, &idx) in &state.index {
        let new_stage = match state.crdt.doc.items[idx].stage.view() {
            JsonValue::String(s) => s,
            _ => continue,
        };
        let old_stage = pre_stages.get(sid).cloned();
        let changed = old_stage.as_deref() != Some(&new_stage);
        if changed {
            let name = match state.crdt.doc.items[idx].name.view() {
                JsonValue::String(s) if !s.is_empty() => Some(s),
                _ => None,
            };
            emit_event(CrdtEvent {
                story_id: sid.clone(),
                from_stage: old_stage,
                to_stage: new_stage,
                name,
            });
        }
    }
    true
}
// ── Debug dump ───────────────────────────────────────────────────────
/// A raw dump of a single CRDT list entry, including deleted items.
///
/// Use `content_index` (hex of the list insert `OpId`) to cross-reference
/// with rows in the `crdt_ops` SQLite table.
pub struct CrdtItemDump {
    /// `None` when the register is unset or empty ("" counts as unset).
    pub story_id: Option<String>,
    pub stage: Option<String>,
    pub name: Option<String>,
    pub agent: Option<String>,
    /// `None` when the stored value is 0 or missing.
    pub retry_count: Option<i64>,
    pub blocked: Option<bool>,
    /// Parsed from the JSON-encoded `depends_on` string register.
    pub depends_on: Option<Vec<u32>>,
    /// Hex-encoded OpId of the list insert op — cross-reference with `crdt_ops`.
    pub content_index: String,
    /// True when this entry is a tombstone (deleted but retained by the CRDT).
    pub is_deleted: bool,
}
/// Top-level debug dump of the in-memory CRDT state.
pub struct CrdtStateDump {
    pub in_memory_state_loaded: bool,
    /// Count of non-deleted items with a valid story_id and stage.
    pub total_items: usize,
    /// Total list-level ops seen (excludes root sentinel).
    pub total_ops_in_list: usize,
    /// Highest Lamport sequence number seen across all list-level ops.
    pub max_seq_in_list: u64,
    /// Count of ops in the ALL_OPS journal (persisted ops replayed at startup).
    pub persisted_ops_count: usize,
    pub items: Vec<CrdtItemDump>,
}
/// Dump the raw in-memory CRDT state for debugging.
///
/// Unlike [`read_all_items`] this includes tombstoned (deleted) entries and
/// exposes internal op metadata (content_index, seq). Pass a `story_id`
/// filter to restrict the output to a single item.
///
/// **This is a debug tool.** For normal pipeline introspection use
/// [`read_all_items`] or the `get_pipeline_status` MCP tool instead.
pub fn dump_crdt_state(story_id_filter: Option<&str>) -> CrdtStateDump {
let in_memory_state_loaded = CRDT_STATE.get().is_some();
let persisted_ops_count = ALL_OPS
.get()
.and_then(|m| m.lock().ok().map(|v| v.len()))
.unwrap_or(0);
let Some(state_mutex) = CRDT_STATE.get() else {
return CrdtStateDump {
in_memory_state_loaded,
total_items: 0,
total_ops_in_list: 0,
max_seq_in_list: 0,
persisted_ops_count,
items: Vec::new(),
};
};
let Ok(state) = state_mutex.lock() else {
return CrdtStateDump {
in_memory_state_loaded,
total_items: 0,
total_ops_in_list: 0,
max_seq_in_list: 0,
persisted_ops_count,
items: Vec::new(),
};
};
let total_items = state.crdt.doc.items.iter().count();
let max_seq_in_list = state
.crdt
.doc
.items
.ops
.iter()
.map(|op| op.seq)
.max()
.unwrap_or(0);
// Subtract 1 for the root sentinel.
let total_ops_in_list = state.crdt.doc.items.ops.len().saturating_sub(1);
let mut items = Vec::new();
for op in &state.crdt.doc.items.ops {
// Skip root sentinel (id == [0u8; 32]).
if op.id == ROOT_ID {
continue;
}
let Some(ref item_crdt) = op.content else {
// No content — skip (orphaned slot, should not happen in normal use).
continue;
};
let story_id = match item_crdt.story_id.view() {
JsonValue::String(s) if !s.is_empty() => Some(s),
_ => None,
};
// Apply story_id filter before doing any further work.
if let Some(filter) = story_id_filter
&& story_id.as_deref() != Some(filter)
{
continue;
}
let stage = match item_crdt.stage.view() {
JsonValue::String(s) if !s.is_empty() => Some(s),
_ => None,
};
let name = match item_crdt.name.view() {
JsonValue::String(s) if !s.is_empty() => Some(s),
_ => None,
};
let agent = match item_crdt.agent.view() {
JsonValue::String(s) if !s.is_empty() => Some(s),
_ => None,
};
let retry_count = match item_crdt.retry_count.view() {
JsonValue::Number(n) if n > 0.0 => Some(n as i64),
_ => None,
};
let blocked = match item_crdt.blocked.view() {
JsonValue::Bool(b) => Some(b),
_ => None,
};
let depends_on = match item_crdt.depends_on.view() {
JsonValue::String(s) if !s.is_empty() => serde_json::from_str::<Vec<u32>>(&s).ok(),
_ => None,
};
let content_index = op.id.iter().map(|b| format!("{b:02x}")).collect::<String>();
items.push(CrdtItemDump {
story_id,
stage,
name,
agent,
retry_count,
blocked,
depends_on,
content_index,
is_deleted: op.is_deleted,
});
}
CrdtStateDump {
in_memory_state_loaded,
total_items,
total_ops_in_list,
max_seq_in_list,
persisted_ops_count,
items,
}
}
// ── Read path ────────────────────────────────────────────────────────
/// Read the full pipeline state from the CRDT document.
///
/// Returns items grouped by stage, or `None` if the CRDT layer is not
/// initialised.
pub fn read_all_items() -> Option<Vec<PipelineItemView>> {
let state_mutex = CRDT_STATE.get()?;
let state = state_mutex.lock().ok()?;
let mut items = Vec::new();
for item_crdt in state.crdt.doc.items.iter() {
if let Some(view) = extract_item_view(item_crdt) {
items.push(view);
}
}
Some(items)
}
/// Read a single pipeline item by story_id.
///
/// Returns `None` when the layer is uninitialised, the id is unknown, or
/// the item lacks a valid story_id/stage.
pub fn read_item(story_id: &str) -> Option<PipelineItemView> {
    let state = CRDT_STATE.get()?.lock().ok()?;
    let idx = *state.index.get(story_id)?;
    extract_item_view(&state.crdt.doc.items[idx])
}
/// Mark a story as deleted in the in-memory CRDT and persist a tombstone op.
///
/// This is the eviction primitive for story 521 — it lets external callers
/// (e.g. the `purge_story` MCP tool, or operator scripts during incident
/// response) clear an item from the running server's in-memory state
/// without a full process restart.
///
/// Steps:
/// 1. Resolve the item's CRDT `OpId` via the visible-index map.
/// 2. Build a delete op with the bft-json-crdt list `delete()` primitive.
/// 3. Sign it with the local keypair and apply it in memory (the item
///    becomes `is_deleted = true`, so `read_all_items` / `read_item` skip it).
/// 4. Persist the signed op to `crdt_ops` via `apply_and_persist`, so the
///    eviction survives a restart.
/// 5. Rebuild the `story_id → visible_index` map (visible indices shift
///    when an item is tombstoned).
/// 6. Drop the content-store entry so the cached body doesn't outlive the
///    CRDT entry.
///
/// Returns `Ok(())` when a tombstone op was queued; `Err` when the CRDT
/// layer isn't initialised or the story_id is unknown.
pub fn evict_item(story_id: &str) -> Result<(), String> {
    let state_mutex = CRDT_STATE
        .get()
        .ok_or_else(|| "CRDT layer not initialised".to_string())?;
    let mut state = state_mutex
        .lock()
        .map_err(|e| format!("CRDT lock poisoned: {e}"))?;
    let idx = match state.index.get(story_id) {
        Some(&i) => i,
        None => return Err(format!("Story '{story_id}' not found in in-memory CRDT")),
    };
    // Resolve the item's OpId up front: the apply_and_persist closure takes a
    // mutable borrow of `state`, so no other access is possible inside it.
    let item_id = match state.crdt.doc.items.id_at(idx) {
        Some(id) => id,
        None => {
            return Err(format!(
                "Item index {idx} for '{story_id}' did not resolve to an OpId"
            ))
        }
    };
    // Sign, apply in memory, and queue persistence of the delete op.
    apply_and_persist(&mut state, |s| s.crdt.doc.items.delete(item_id));
    // Visible indices shift once the item is tombstoned — rebuild the map.
    state.index = rebuild_index(&state.crdt);
    // Bug 521 follow-up: when CONTENT_STORE becomes a true lazy cache, this
    // explicit eviction can go away.
    crate::db::delete_content(story_id);
    Ok(())
}
/// Extract a `PipelineItemView` from a `PipelineItemCrdt`.
fn extract_item_view(item: &PipelineItemCrdt) -> Option<PipelineItemView> {
let story_id = match item.story_id.view() {
JsonValue::String(s) if !s.is_empty() => s,
_ => return None,
};
let stage = match item.stage.view() {
JsonValue::String(s) if !s.is_empty() => s,
_ => return None,
};
let name = match item.name.view() {
JsonValue::String(s) if !s.is_empty() => Some(s),
_ => None,
};
let agent = match item.agent.view() {
JsonValue::String(s) if !s.is_empty() => Some(s),
_ => None,
};
let retry_count = match item.retry_count.view() {
JsonValue::Number(n) if n > 0.0 => Some(n as i64),
_ => None,
};
let blocked = match item.blocked.view() {
JsonValue::Bool(b) => Some(b),
_ => None,
};
let depends_on = match item.depends_on.view() {
JsonValue::String(s) if !s.is_empty() => {
serde_json::from_str::<Vec<u32>>(&s).ok()
}
_ => None,
};
Some(PipelineItemView {
story_id,
stage,
name,
agent,
retry_count,
blocked,
depends_on,
})
}
/// Check whether a dependency (by numeric ID prefix) is in `5_done` or `6_archived`
/// according to CRDT state.
///
/// Returns `true` if the dependency is satisfied (item found in a done stage);
/// `false` when the CRDT layer is not initialised.
/// See `dep_is_archived_crdt` to distinguish archive-satisfied from cleanly-done.
pub fn dep_is_done_crdt(dep_number: u32) -> bool {
    let prefix = format!("{dep_number}_");
    read_all_items().is_some_and(|items| {
        items.iter().any(|item| {
            item.story_id.starts_with(&prefix)
                && matches!(item.stage.as_str(), "5_done" | "6_archived")
        })
    })
}
/// Check whether a dependency (by numeric ID prefix) is specifically in `6_archived`
/// according to CRDT state.
///
/// Used to detect when a dependency is satisfied via archive rather than via a clean
/// completion through `5_done`. Returns `false` when the CRDT layer is not initialised.
pub fn dep_is_archived_crdt(dep_number: u32) -> bool {
    let prefix = format!("{dep_number}_");
    read_all_items().is_some_and(|items| {
        items
            .iter()
            .any(|item| item.story_id.starts_with(&prefix) && item.stage == "6_archived")
    })
}
/// Check unmet dependencies for a story by reading its `depends_on` from the
/// CRDT document and checking each dependency against CRDT state.
///
/// Returns the list of dependency numbers that are NOT in `5_done` or
/// `6_archived`; empty when the story or its `depends_on` is missing.
pub fn check_unmet_deps_crdt(story_id: &str) -> Vec<u32> {
    let Some(item) = read_item(story_id) else {
        return Vec::new();
    };
    item.depends_on
        .unwrap_or_default()
        .into_iter()
        .filter(|&dep| !dep_is_done_crdt(dep))
        .collect()
}
/// Return the list of dependency numbers from `story_id`'s `depends_on` that are
/// specifically in `6_archived` according to CRDT state.
///
/// Used to emit a warning when promotion fires because a dep is archived rather
/// than cleanly completed. Returns an empty `Vec` when no deps are archived (or
/// when the story or its `depends_on` is missing).
pub fn check_archived_deps_crdt(story_id: &str) -> Vec<u32> {
    let Some(item) = read_item(story_id) else {
        return Vec::new();
    };
    item.depends_on
        .unwrap_or_default()
        .into_iter()
        .filter(|&dep| dep_is_archived_crdt(dep))
        .collect()
}
/// Hex-encode a byte slice (no external dep needed).
mod hex {
    /// Lowercase hex encoding of `bytes`, two digits per byte.
    pub fn encode(bytes: &[u8]) -> String {
        const DIGITS: &[u8; 16] = b"0123456789abcdef";
        let mut out = String::with_capacity(bytes.len() * 2);
        for &byte in bytes {
            out.push(DIGITS[usize::from(byte >> 4)] as char);
            out.push(DIGITS[usize::from(byte & 0x0f)] as char);
        }
        out
    }
}
// ── Tests ────────────────────────────────────────────────────────────
#[cfg(test)]
mod tests {
use super::*;
use bft_json_crdt::json_crdt::OpState;
#[test]
fn crdt_doc_insert_and_view() {
    // Insert one item as JSON and check both the visible list length and
    // the individual LWW registers.
    let kp = make_keypair();
    let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);
    let item_json: JsonValue = json!({
        "story_id": "10_story_test",
        "stage": "2_current",
        "name": "Test Story",
        "agent": "coder-opus",
        "retry_count": 0.0,
        "blocked": false,
        "depends_on": "",
    })
    .into();
    let op = crdt.doc.items.insert(ROOT_ID, item_json).sign(&kp);
    assert_eq!(crdt.apply(op), OpState::Ok);
    let view = crdt.doc.items.view();
    assert_eq!(view.len(), 1);
    let item = &crdt.doc.items[0];
    assert_eq!(item.story_id.view(), JsonValue::String("10_story_test".to_string()));
    assert_eq!(item.stage.view(), JsonValue::String("2_current".to_string()));
}
#[test]
fn crdt_doc_update_stage() {
    // A signed LWW-set op on an existing item's register must be reflected
    // by a subsequent view().
    let kp = make_keypair();
    let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);
    let item_json: JsonValue = json!({
        "story_id": "20_story_move",
        "stage": "1_backlog",
        "name": "Move Me",
        "agent": "",
        "retry_count": 0.0,
        "blocked": false,
        "depends_on": "",
    })
    .into();
    let insert_op = crdt.doc.items.insert(ROOT_ID, item_json).sign(&kp);
    crdt.apply(insert_op);
    // Update stage
    let stage_op = crdt.doc.items[0].stage.set("2_current".to_string()).sign(&kp);
    crdt.apply(stage_op);
    assert_eq!(
        crdt.doc.items[0].stage.view(),
        JsonValue::String("2_current".to_string())
    );
}
#[test]
fn crdt_ops_replay_reconstructs_state() {
    // Replaying the same signed ops, in order, on a fresh CRDT must converge
    // to the same register values — the property init() relies on.
    let kp = make_keypair();
    let mut crdt1 = BaseCrdt::<PipelineDoc>::new(&kp);
    // Build state with a series of ops.
    let item_json: JsonValue = json!({
        "story_id": "30_story_replay",
        "stage": "1_backlog",
        "name": "Replay Test",
        "agent": "",
        "retry_count": 0.0,
        "blocked": false,
        "depends_on": "",
    })
    .into();
    let op1 = crdt1.doc.items.insert(ROOT_ID, item_json).sign(&kp);
    crdt1.apply(op1.clone());
    let op2 = crdt1.doc.items[0].stage.set("2_current".to_string()).sign(&kp);
    crdt1.apply(op2.clone());
    let op3 = crdt1.doc.items[0].name.set("Updated Name".to_string()).sign(&kp);
    crdt1.apply(op3.clone());
    // Replay ops on a fresh CRDT.
    let mut crdt2 = BaseCrdt::<PipelineDoc>::new(&kp);
    crdt2.apply(op1);
    crdt2.apply(op2);
    crdt2.apply(op3);
    assert_eq!(
        crdt1.doc.items[0].stage.view(),
        crdt2.doc.items[0].stage.view()
    );
    assert_eq!(
        crdt1.doc.items[0].name.view(),
        crdt2.doc.items[0].name.view()
    );
}
#[test]
fn extract_item_view_parses_crdt_item() {
    // extract_item_view maps registers to the view type: strings through,
    // retry_count f64→i64, depends_on JSON string → Vec<u32>.
    let kp = make_keypair();
    let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);
    let item_json: JsonValue = json!({
        "story_id": "40_story_view",
        "stage": "3_qa",
        "name": "View Test",
        "agent": "coder-1",
        "retry_count": 2.0,
        "blocked": true,
        "depends_on": "[10,20]",
    })
    .into();
    let op = crdt.doc.items.insert(ROOT_ID, item_json).sign(&kp);
    crdt.apply(op);
    let view = extract_item_view(&crdt.doc.items[0]).unwrap();
    assert_eq!(view.story_id, "40_story_view");
    assert_eq!(view.stage, "3_qa");
    assert_eq!(view.name.as_deref(), Some("View Test"));
    assert_eq!(view.agent.as_deref(), Some("coder-1"));
    assert_eq!(view.retry_count, Some(2));
    assert_eq!(view.blocked, Some(true));
    assert_eq!(view.depends_on, Some(vec![10, 20]));
}
#[test]
fn rebuild_index_maps_story_ids() {
    // Each inserted item must appear in the rebuilt story_id → index map.
    let kp = make_keypair();
    let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);
    for (sid, stage) in &[("10_story_a", "1_backlog"), ("20_story_b", "2_current")] {
        let item: JsonValue = json!({
            "story_id": sid,
            "stage": stage,
            "name": "",
            "agent": "",
            "retry_count": 0.0,
            "blocked": false,
            "depends_on": "",
        })
        .into();
        let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp);
        crdt.apply(op);
    }
    let index = rebuild_index(&crdt);
    assert_eq!(index.len(), 2);
    assert!(index.contains_key("10_story_a"));
    assert!(index.contains_key("20_story_b"));
}
#[tokio::test]
async fn init_and_write_read_roundtrip() {
    // End-to-end persistence check against a real temp SQLite database:
    // apply an op, store its JSON, reload it, and confirm the reconstructed
    // CRDT yields the same item view.
    let tmp = tempfile::tempdir().unwrap();
    let db_path = tmp.path().join("crdt_test.db");
    // Init directly (not via the global singleton, for test isolation).
    let options = SqliteConnectOptions::new()
        .filename(&db_path)
        .create_if_missing(true);
    let pool = SqlitePool::connect_with(options).await.unwrap();
    sqlx::migrate!("./migrations").run(&pool).await.unwrap();
    let keypair = make_keypair();
    let mut crdt = BaseCrdt::<PipelineDoc>::new(&keypair);
    // Insert and update like write_item does.
    let item_json: JsonValue = json!({
        "story_id": "50_story_roundtrip",
        "stage": "1_backlog",
        "name": "Roundtrip",
        "agent": "",
        "retry_count": 0.0,
        "blocked": false,
        "depends_on": "",
    })
    .into();
    let insert_op = crdt.doc.items.insert(ROOT_ID, item_json).sign(&keypair);
    crdt.apply(insert_op.clone());
    // Persist the op the same way the background task in init() does.
    let op_json = serde_json::to_string(&insert_op).unwrap();
    let op_id = hex::encode(&insert_op.id());
    let now = chrono::Utc::now().to_rfc3339();
    sqlx::query(
        "INSERT INTO crdt_ops (op_id, seq, op_json, created_at) VALUES (?1, ?2, ?3, ?4)",
    )
    .bind(&op_id)
    .bind(insert_op.inner.seq as i64)
    .bind(&op_json)
    .bind(&now)
    .execute(&pool)
    .await
    .unwrap();
    // Reconstruct from DB, mirroring the replay loop in init().
    let rows: Vec<(String,)> =
        sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY rowid ASC")
            .fetch_all(&pool)
            .await
            .unwrap();
    let mut crdt2 = BaseCrdt::<PipelineDoc>::new(&keypair);
    for (json_str,) in &rows {
        let op: SignedOp = serde_json::from_str(json_str).unwrap();
        crdt2.apply(op);
    }
    let view = extract_item_view(&crdt2.doc.items[0]).unwrap();
    assert_eq!(view.story_id, "50_story_roundtrip");
    assert_eq!(view.stage, "1_backlog");
    assert_eq!(view.name.as_deref(), Some("Roundtrip"));
}
#[test]
fn signed_op_serialization_roundtrip() {
    // A SignedOp serialised to JSON and back must keep its identity (id and
    // Lamport seq) — required by both the persistence layer and sync wire.
    let kp = make_keypair();
    let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);
    let item: JsonValue = json!({
        "story_id": "60_story_serde",
        "stage": "1_backlog",
        "name": "Serde Test",
        "agent": "",
        "retry_count": 0.0,
        "blocked": false,
        "depends_on": "",
    })
    .into();
    let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp);
    let json_str = serde_json::to_string(&op).unwrap();
    let deserialized: SignedOp = serde_json::from_str(&json_str).unwrap();
    assert_eq!(op.id(), deserialized.id());
    assert_eq!(op.inner.seq, deserialized.inner.seq);
}
// ── CrdtEvent tests ─────────────────────────────────────────────────
#[test]
fn crdt_event_has_expected_fields() {
    // Every field written into the struct must read back unchanged.
    let event = CrdtEvent {
        story_id: String::from("42_story_foo"),
        from_stage: Some(String::from("1_backlog")),
        to_stage: String::from("2_current"),
        name: Some(String::from("Foo Feature")),
    };
    assert_eq!(event.story_id, "42_story_foo");
    assert_eq!(event.from_stage.as_deref(), Some("1_backlog"));
    assert_eq!(event.to_stage, "2_current");
    assert_eq!(event.name.as_deref(), Some("Foo Feature"));
}
#[test]
fn crdt_event_clone_preserves_data() {
    // Clone must be a faithful copy of all fields.
    let original = CrdtEvent {
        story_id: String::from("10_story_bar"),
        from_stage: None,
        to_stage: String::from("1_backlog"),
        name: None,
    };
    let copy = original.clone();
    assert_eq!(copy.story_id, "10_story_bar");
    assert!(copy.from_stage.is_none());
    assert!(copy.name.is_none());
}
#[test]
fn emit_event_is_noop_when_channel_not_initialised() {
    // emit_event must never panic — whether or not CRDT_EVENT_TX has been
    // set by another test running in this binary.
    let event = CrdtEvent {
        story_id: String::from("99_story_noop"),
        from_stage: None,
        to_stage: String::from("1_backlog"),
        name: None,
    };
    emit_event(event);
}
#[test]
fn crdt_event_broadcast_channel_round_trip() {
    // An event sent through a tokio broadcast channel must arrive intact.
    let (sender, mut receiver) = broadcast::channel::<CrdtEvent>(16);
    sender
        .send(CrdtEvent {
            story_id: String::from("70_story_broadcast"),
            from_stage: Some(String::from("1_backlog")),
            to_stage: String::from("2_current"),
            name: Some(String::from("Broadcast Test")),
        })
        .unwrap();
    let got = receiver.try_recv().unwrap();
    assert_eq!(got.story_id, "70_story_broadcast");
    assert_eq!(got.from_stage.as_deref(), Some("1_backlog"));
    assert_eq!(got.to_stage, "2_current");
    assert_eq!(got.name.as_deref(), Some("Broadcast Test"));
}
#[test]
fn dep_is_done_crdt_returns_false_when_no_crdt_state() {
    // The global CRDT state may or may not be initialised in the test
    // binary (another test can set it first). Either way, looking up a
    // dependency must complete without panicking; when the state is
    // absent the function is expected to report false rather than crash.
    let _ = dep_is_done_crdt(9999);
}
#[test]
fn check_unmet_deps_crdt_returns_empty_when_item_not_found() {
    // An unknown story id must yield no unmet dependencies.
    assert!(check_unmet_deps_crdt("nonexistent_story").is_empty());
}
// ── Bug 503: archived-dep visibility ─────────────────────────────────────
#[test]
fn dep_is_archived_crdt_returns_false_when_no_crdt_state() {
    // Must complete without panicking even when the global CRDT state has
    // not been initialised.
    let _ = dep_is_archived_crdt(9998);
}
#[test]
fn check_archived_deps_crdt_returns_empty_when_item_not_found() {
    // An unknown story id must yield no archived dependencies.
    assert!(check_archived_deps_crdt("nonexistent_story_archived").is_empty());
}
// ── 478: WebSocket CRDT sync layer tests ────────────────────────────────
#[test]
fn apply_remote_op_returns_false_when_not_initialised() {
    // Build a signed insert op against a throwaway document and feed it to
    // apply_remote_op. The global CRDT state may or may not be initialised
    // in the test binary, so this is a smoke test: the call must not panic.
    let keypair = make_keypair();
    let mut scratch = BaseCrdt::<PipelineDoc>::new(&keypair);
    let payload: JsonValue = serde_json::json!({
        "story_id": "80_story_remote",
        "stage": "1_backlog",
        "name": "Remote",
        "agent": "",
        "retry_count": 0.0,
        "blocked": false,
        "depends_on": "",
    })
    .into();
    let signed = scratch
        .doc
        .items
        .insert(bft_json_crdt::op::ROOT_ID, payload)
        .sign(&keypair);
    let _ = apply_remote_op(signed);
}
#[test]
fn signed_op_survives_sync_serialization_roundtrip() {
    // The sync wire protocol ships SignedOps as JSON. Encoding, decoding,
    // and re-encoding must be lossless: identical JSON both times, and the
    // decoded op must keep its id, sequence number, and author.
    let keypair = make_keypair();
    let mut scratch = BaseCrdt::<PipelineDoc>::new(&keypair);
    let payload: JsonValue = serde_json::json!({
        "story_id": "90_story_wire",
        "stage": "2_current",
        "name": "Wire Test",
        "agent": "coder",
        "retry_count": 1.0,
        "blocked": false,
        "depends_on": "[10]",
    })
    .into();
    let original = scratch
        .doc
        .items
        .insert(bft_json_crdt::op::ROOT_ID, payload)
        .sign(&keypair);
    let encoded = serde_json::to_string(&original).unwrap();
    let decoded: SignedOp = serde_json::from_str(&encoded).unwrap();
    let re_encoded = serde_json::to_string(&decoded).unwrap();
    assert_eq!(encoded, re_encoded);
    assert_eq!(original.id(), decoded.id());
    assert_eq!(original.inner.seq, decoded.inner.seq);
    assert_eq!(original.author(), decoded.author());
}
#[test]
fn sync_broadcast_channel_round_trip() {
    // A SignedOp sent through the sync broadcast channel must arrive with
    // the same op id.
    let (sender, mut receiver) = broadcast::channel::<SignedOp>(16);
    let keypair = make_keypair();
    let mut scratch = BaseCrdt::<PipelineDoc>::new(&keypair);
    let payload: JsonValue = serde_json::json!({
        "story_id": "95_story_sync_bcast",
        "stage": "1_backlog",
        "name": "",
        "agent": "",
        "retry_count": 0.0,
        "blocked": false,
        "depends_on": "",
    })
    .into();
    let signed = scratch
        .doc
        .items
        .insert(bft_json_crdt::op::ROOT_ID, payload)
        .sign(&keypair);
    sender.send(signed.clone()).unwrap();
    let received = receiver.try_recv().unwrap();
    assert_eq!(received.id(), signed.id());
}
// ── Bug 511: CRDT lamport clock resets on restart ────────────────────────
//
// Root cause: Op::sign() always produces SignedOp with depends_on = vec![],
// so the causal dependency queue never engages during replay. Field update
// ops (seq=1,2,3 from each field's LwwRegisterCrdt counter) are replayed
// before list insert ops (seq=N from the items ListCrdt counter) when
// ordered by `seq ASC`. They fail ErrPathMismatch silently, their our_seq
// is never updated, and the next field write re-uses seq=1.
//
// Fix: replay by `rowid ASC` (SQLite insertion order) instead of `seq ASC`.
// Rowid preserves the causal order ops were originally applied in, so field
// updates always come after the item insert they reference.
#[tokio::test]
async fn bug_511_rowid_replay_preserves_field_update_after_list_insert() {
    // Regression test for bug 511. It proves two things against a real
    // SQLite database: (1) replaying persisted ops ordered by rowid
    // preserves a field update that causally follows a list insert, and
    // (2) replaying ordered by the ops' own `seq` values reproduces the
    // original data-loss bug.

    // Fresh on-disk database with the project's migrations applied, so the
    // crdt_ops table matches the production schema.
    let tmp = tempfile::tempdir().unwrap();
    let db_path = tmp.path().join("bug511.db");
    let options = SqliteConnectOptions::new()
        .filename(&db_path)
        .create_if_missing(true);
    let pool = SqlitePool::connect_with(options).await.unwrap();
    sqlx::migrate!("./migrations").run(&pool).await.unwrap();
    let kp = make_keypair();
    let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);
    // Insert 5 dummy items to advance items.our_seq to 5. These warmup ops
    // are applied but deliberately NOT persisted: they model pre-history
    // that inflates the list counter so the later insert gets a seq well
    // above the field-register seq.
    for i in 0..5u32 {
        let sid = format!("{}_story_warmup", i);
        let item: JsonValue = json!({
            "story_id": sid,
            "stage": "1_backlog",
            "name": "",
            "agent": "",
            "retry_count": 0.0,
            "blocked": false,
            "depends_on": "",
        })
        .into();
        let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp);
        crdt.apply(op.clone());
        // We don't persist these to the DB — they are pre-history.
    }
    // Now insert the real item. items.our_seq was 5, so this op gets seq=6.
    let target_item: JsonValue = json!({
        "story_id": "511_story_target",
        "stage": "1_backlog",
        "name": "Bug 511 target",
        "agent": "",
        "retry_count": 0.0,
        "blocked": false,
        "depends_on": "",
    })
    .into();
    let insert_op = crdt.doc.items.insert(ROOT_ID, target_item).sign(&kp);
    crdt.apply(insert_op.clone());
    // insert_op.inner.seq == 6
    // Now update the stage. The stage LwwRegisterCrdt for this item starts
    // at our_seq=0, so this field op gets seq=1. Crucially: seq=1 < seq=6.
    let idx = rebuild_index(&crdt)["511_story_target"];
    let stage_op = crdt.doc.items[idx].stage.set("2_current".to_string()).sign(&kp);
    crdt.apply(stage_op.clone());
    // stage_op.inner.seq == 1
    // Persist BOTH ops in causal order (insert first, update second).
    // This means insert_op gets rowid < stage_op rowid.
    let now = chrono::Utc::now().to_rfc3339();
    for op in [&insert_op, &stage_op] {
        let op_json = serde_json::to_string(op).unwrap();
        let op_id = hex::encode(&op.id());
        sqlx::query(
            "INSERT INTO crdt_ops (op_id, seq, op_json, created_at) VALUES (?1, ?2, ?3, ?4)",
        )
        .bind(&op_id)
        .bind(op.inner.seq as i64)
        .bind(&op_json)
        .bind(&now)
        .execute(&pool)
        .await
        .unwrap();
    }
    // Replay by rowid ASC (the fix). The insert must come before the field
    // update regardless of their field-level seq values.
    let rows: Vec<(String,)> =
        sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY rowid ASC")
            .fetch_all(&pool)
            .await
            .unwrap();
    // Replay into a brand-new CRDT, simulating a server restart.
    let mut crdt2 = BaseCrdt::<PipelineDoc>::new(&kp);
    for (json_str,) in &rows {
        let op: SignedOp = serde_json::from_str(json_str).unwrap();
        crdt2.apply(op);
    }
    // The item must be in the CRDT and must reflect the stage update.
    let index2 = rebuild_index(&crdt2);
    assert!(
        index2.contains_key("511_story_target"),
        "item not found after rowid-order replay"
    );
    let idx2 = index2["511_story_target"];
    let view = extract_item_view(&crdt2.doc.items[idx2]).unwrap();
    assert_eq!(
        view.stage, "2_current",
        "stage field update lost during replay (bug 511 regression)"
    );
    // Confirm the bug is reproducible by replaying seq ASC instead.
    // With seq ASC the stage_op (seq=1) arrives before insert_op (seq=6),
    // fails ErrPathMismatch, and the item ends up at "1_backlog".
    let rows_wrong_order: Vec<(String,)> =
        sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY seq ASC")
            .fetch_all(&pool)
            .await
            .unwrap();
    let mut crdt3 = BaseCrdt::<PipelineDoc>::new(&kp);
    for (json_str,) in &rows_wrong_order {
        let op: SignedOp = serde_json::from_str(json_str).unwrap();
        crdt3.apply(op);
    }
    let index3 = rebuild_index(&crdt3);
    // With seq ASC replay, the item is created (insert_op eventually runs)
    // but the stage update is lost (it ran before the item existed).
    // NOTE(review): this branch is conditional — if the item is absent the
    // buggy-order check is silently skipped rather than failed.
    if let Some(idx3) = index3.get("511_story_target") {
        let view3 = extract_item_view(&crdt3.doc.items[*idx3]).unwrap();
        // The bug: stage is still "1_backlog" because the update was dropped.
        assert_eq!(
            view3.stage, "1_backlog",
            "expected seq-ASC replay to exhibit the bug (update lost)"
        );
    }
}
}