diff --git a/server/src/crdt_state.rs b/server/src/crdt_state.rs index 317ab5eb..0a9ac38a 100644 --- a/server/src/crdt_state.rs +++ b/server/src/crdt_state.rs @@ -78,6 +78,7 @@ static ALL_OPS: OnceLock>> = OnceLock::new(); #[derive(Clone, CrdtNode, Debug)] pub struct PipelineDoc { pub items: ListCrdt, + pub nodes: ListCrdt, } #[add_crdt_fields] @@ -92,6 +93,20 @@ pub struct PipelineItemCrdt { pub depends_on: LwwRegisterCrdt, } +/// CRDT node that holds a single peer's presence entry. +#[add_crdt_fields] +#[derive(Clone, CrdtNode, Debug)] +pub struct NodePresenceCrdt { + /// Hex-encoded Ed25519 public key — stable identity across restarts. + pub node_id: LwwRegisterCrdt, + /// WebSocket URL this peer advertises, e.g. `ws://192.168.1.10:3001/crdt-sync`. + pub address: LwwRegisterCrdt, + /// Unix timestamp (seconds) of the last heartbeat written by this node. + pub last_seen: LwwRegisterCrdt, + /// `false` once a stale-detection pass has tombstoned this node. + pub alive: LwwRegisterCrdt, +} + // ── Read-side view types ───────────────────────────────────────────── /// A snapshot of a single pipeline item derived from the CRDT document. @@ -106,13 +121,25 @@ pub struct PipelineItemView { pub depends_on: Option>, } +/// A snapshot of a single node presence entry derived from the CRDT document. +#[derive(Clone, Debug)] +pub struct NodePresenceView { + pub node_id: String, + pub address: String, + /// Unix timestamp (seconds). + pub last_seen: f64, + pub alive: bool, +} + // ── Internal state ─────────────────────────────────────────────────── struct CrdtState { crdt: BaseCrdt, keypair: Ed25519KeyPair, - /// Maps story_id → index in the ListCrdt for O(1) lookup. + /// Maps story_id → index in the items ListCrdt for O(1) lookup. index: HashMap, + /// Maps node_id (hex) → index in the nodes ListCrdt for O(1) lookup. + node_index: HashMap, /// Channel sender for fire-and-forget op persistence. persist_tx: mpsc::UnboundedSender, } @@ -158,13 +185,15 @@ pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> { } let _ = ALL_OPS.set(Mutex::new(all_ops_vec)); - // Build the index from the reconstructed state. + // Build the indices from the reconstructed state. let index = rebuild_index(&crdt); + let node_index = rebuild_node_index(&crdt); slog!( - "[crdt] Initialised: {} ops replayed, {} items indexed", + "[crdt] Initialised: {} ops replayed, {} items indexed, {} nodes indexed", rows.len(), - index.len() + index.len(), + node_index.len() ); // Spawn background persistence task. @@ -205,6 +234,7 @@ pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> { crdt, keypair, index, + node_index, persist_tx, }; @@ -234,11 +264,13 @@ pub fn init_for_test() { let keypair = make_keypair(); let crdt = BaseCrdt::::new(&keypair); let index = HashMap::new(); + let node_index = HashMap::new(); let (persist_tx, _rx) = mpsc::unbounded_channel(); let state = CrdtState { crdt, keypair, index, + node_index, persist_tx, }; let _ = CRDT_STATE.set(Mutex::new(state)); @@ -285,6 +317,17 @@ fn rebuild_index(crdt: &BaseCrdt) -> HashMap { map } +/// Rebuild the node_id → nodes list index mapping from the current CRDT state. +fn rebuild_node_index(crdt: &BaseCrdt) -> HashMap { + let mut map = HashMap::new(); + for (i, node) in crdt.doc.nodes.iter().enumerate() { + if let JsonValue::String(ref nid) = node.node_id.view() { + map.insert(nid.clone(), i); + } + } + map +} + // ── Write path ─────────────────────────────────────────────────────── /// Create a CRDT op via `op_fn`, sign it, apply it, and send it to the @@ -490,8 +533,9 @@ pub fn apply_remote_op(op: SignedOp) -> bool { v.push(json); } - // Rebuild index (new items may have been inserted). + // Rebuild indices (new items or nodes may have been inserted). state.index = rebuild_index(&state.crdt); + state.node_index = rebuild_node_index(&state.crdt); // Detect and broadcast stage transitions. for (sid, &idx) in &state.index { @@ -518,6 +562,103 @@ pub fn apply_remote_op(op: SignedOp) -> bool { true } +// ── Node presence API ──────────────────────────────────────────────── + +/// Return the hex-encoded Ed25519 public key for this node. +/// +/// Used as the stable identity written into the CRDT nodes list. +/// Returns `None` before `init()`. +pub fn our_node_id() -> Option { + let state = CRDT_STATE.get()?.lock().ok()?; + Some(hex::encode(&state.crdt.id)) +} + +/// Write or update a node presence entry in the CRDT. +/// +/// If a node with the given `node_id` already exists, only `last_seen`, +/// `alive`, and `address` are updated. If not, a new entry is inserted. +/// +/// This is the write path for both local heartbeats and tombstoning. +pub fn write_node_presence(node_id: &str, address: &str, last_seen: f64, alive: bool) { + let Some(state_mutex) = CRDT_STATE.get() else { + return; + }; + let Ok(mut state) = state_mutex.lock() else { + return; + }; + + if let Some(&idx) = state.node_index.get(node_id) { + // Update existing entry — three separate ops so peers can merge independently. + apply_and_persist(&mut state, |s| { + s.crdt.doc.nodes[idx].last_seen.set(last_seen) + }); + apply_and_persist(&mut state, |s| { + s.crdt.doc.nodes[idx].alive.set(alive) + }); + apply_and_persist(&mut state, |s| { + s.crdt.doc.nodes[idx].address.set(address.to_string()) + }); + } else { + // Insert new node entry. + let node_json: JsonValue = json!({ + "node_id": node_id, + "address": address, + "last_seen": last_seen, + "alive": alive, + }) + .into(); + + apply_and_persist(&mut state, |s| { + s.crdt.doc.nodes.insert(ROOT_ID, node_json) + }); + + // Rebuild node index after insertion. + state.node_index = rebuild_node_index(&state.crdt); + } +} + +/// Read all node presence entries from the CRDT document. +/// +/// Returns `None` before `init()`. +pub fn read_all_node_presence() -> Option> { + let state_mutex = CRDT_STATE.get()?; + let state = state_mutex.lock().ok()?; + + let mut nodes = Vec::new(); + for node_crdt in state.crdt.doc.nodes.iter() { + if let Some(view) = extract_node_view(node_crdt) { + nodes.push(view); + } + } + Some(nodes) +} + +/// Extract a `NodePresenceView` from a `NodePresenceCrdt`. +fn extract_node_view(node: &NodePresenceCrdt) -> Option { + let node_id = match node.node_id.view() { + JsonValue::String(s) if !s.is_empty() => s, + _ => return None, + }; + let address = match node.address.view() { + JsonValue::String(s) if !s.is_empty() => s, + _ => return None, + }; + let last_seen = match node.last_seen.view() { + JsonValue::Number(n) => n, + _ => 0.0, + }; + let alive = match node.alive.view() { + JsonValue::Bool(b) => b, + _ => true, + }; + Some(NodePresenceView { + node_id, + address, + last_seen, + alive, + }) +} + // ── Debug dump ─────────────────────────────────────────────────────── /// A raw dump of a single CRDT list entry, including deleted items. @@ -1452,6 +1593,7 @@ mod tests { crdt, keypair: kp, index: HashMap::new(), + node_index: HashMap::new(), persist_tx, };