//! Node identity, work claiming, and node presence (heartbeat) API. #![allow(unused_imports, dead_code)] use super::hex; use super::read::read_item; use bft_json_crdt::json_crdt::*; use bft_json_crdt::lww_crdt::LwwRegisterCrdt; use bft_json_crdt::op::ROOT_ID; use fastcrypto::traits::{Signer, ToFromBytes}; use serde_json::json; use super::state::{apply_and_persist, get_crdt, rebuild_node_index}; use super::types::{NodePresenceCrdt, NodePresenceView, PipelineDoc}; use crate::slog; // ── Node presence API ──────────────────────────────────────────────── /// Return the hex-encoded Ed25519 public key for this node. /// /// Used as the stable identity written into the CRDT nodes list. /// Returns `None` before `init()`. pub fn our_node_id() -> Option { let state = get_crdt()?.lock().ok()?; Some(hex::encode(&state.crdt.id)) } /// Sign a challenge nonce with this node's keypair for WebSocket mutual auth. /// /// Returns `(pubkey_hex, signature_hex)` or `None` before `init()`. pub fn sign_challenge(challenge: &str) -> Option<(String, String)> { let state = get_crdt()?.lock().ok()?; let pubkey_hex = crate::node_identity::public_key_hex(&state.keypair); let sig_hex = crate::node_identity::sign_challenge(&state.keypair, challenge); Some((pubkey_hex, sig_hex)) } /// Sign a versioned challenge `"huskies-v1:{nonce}"` for the extended WebSocket /// mutual-auth handshake (responding node side). /// /// The signature covers the full versioned string (not just the nonce), so an /// attacker cannot replay a signature from a previous handshake or a different /// protocol version. /// /// Returns `(pubkey_hex, signature_hex)` or `None` before `init()`. 
pub fn sign_versioned_challenge(nonce: &str) -> Option<(String, String)> {
    let guard = get_crdt()?.lock().ok()?;
    let pubkey_hex = crate::node_identity::public_key_hex(&guard.keypair);
    // Sign the version-prefixed string rather than the bare nonce.
    let payload = format!("huskies-v1:{nonce}");
    let sig_hex = crate::node_identity::sign_challenge(&guard.keypair, &payload);
    Some((pubkey_hex, sig_hex))
}

/// Claim a pipeline item for this node through the CRDT.
///
/// Writes this node's ID into `claimed_by` and the current Unix time (whole
/// seconds, stored as `f64`) into `claimed_at`, as two separate persisted ops.
/// The LWW register ensures deterministic conflict resolution — if two nodes
/// claim the same item simultaneously, both will converge to the same winner
/// after CRDT sync.
///
/// Returns `true` once both fields were written; `false` if the item is not
/// in the index, the CRDT is not initialised, or the state lock is unavailable.
pub fn write_claim(story_id: &str) -> bool {
    // Resolve our identity before locking below: `our_node_id` acquires the
    // CRDT state lock on its own.
    let claimant = match our_node_id() {
        Some(id) => id,
        None => return false,
    };
    let claimed_at = chrono::Utc::now().timestamp() as f64;

    let mutex = match get_crdt() {
        Some(m) => m,
        None => return false,
    };
    let mut guard = match mutex.lock() {
        Ok(g) => g,
        Err(_) => return false,
    };
    let idx = match guard.index.get(story_id) {
        Some(&i) => i,
        None => return false,
    };

    apply_and_persist(&mut guard, |s| {
        s.crdt.doc.items[idx].claimed_by.set(claimant.clone())
    });
    apply_and_persist(&mut guard, |s| {
        s.crdt.doc.items[idx].claimed_at.set(claimed_at)
    });
    true
}

/// Drop the claim on a pipeline item.
///
/// Resets `claimed_by` to the empty string and `claimed_at` to `0.0`.
/// Silently does nothing when the CRDT is not initialised, the state lock is
/// unavailable, or `story_id` is not in the index.
pub fn release_claim(story_id: &str) {
    let mutex = match get_crdt() {
        Some(m) => m,
        None => return,
    };
    let mut guard = match mutex.lock() {
        Ok(g) => g,
        Err(_) => return,
    };
    let idx = match guard.index.get(story_id) {
        Some(&i) => i,
        None => return,
    };

    apply_and_persist(&mut guard, |s| {
        s.crdt.doc.items[idx].claimed_by.set(String::new())
    });
    apply_and_persist(&mut guard, |s| s.crdt.doc.items[idx].claimed_at.set(0.0));
}

/// Check if this node currently holds the claim on a pipeline item.
pub fn is_claimed_by_us(story_id: &str) -> bool { let Some(node_id) = our_node_id() else { return false; }; let Some(item) = read_item(story_id) else { return false; }; item.claimed_by.as_deref() == Some(&node_id) } /// Write or update a node presence entry in the CRDT. /// /// If a node with the given `node_id` already exists, only `last_seen`, /// `alive`, and `address` are updated. If not, a new entry is inserted. /// /// This is the write path for both local heartbeats and tombstoning. pub fn write_node_presence(node_id: &str, address: &str, last_seen: f64, alive: bool) { let Some(state_mutex) = get_crdt() else { return; }; let Ok(mut state) = state_mutex.lock() else { return; }; if let Some(&idx) = state.node_index.get(node_id) { // Update existing entry — three separate ops so peers can merge independently. apply_and_persist(&mut state, |s| { s.crdt.doc.nodes[idx].last_seen.set(last_seen) }); apply_and_persist(&mut state, |s| s.crdt.doc.nodes[idx].alive.set(alive)); apply_and_persist(&mut state, |s| { s.crdt.doc.nodes[idx].address.set(address.to_string()) }); } else { // Insert new node entry. let node_json: JsonValue = json!({ "node_id": node_id, "address": address, "last_seen": last_seen, "alive": alive, }) .into(); apply_and_persist(&mut state, |s| s.crdt.doc.nodes.insert(ROOT_ID, node_json)); // Rebuild node index after insertion. state.node_index = rebuild_node_index(&state.crdt); // Advance the inner registers of the newly-created node to the Lamport // floor so their first local ops don't re-emit low sequence numbers. 
let floor = state.lamport_floor; if floor > 0 && let Some(&idx) = state.node_index.get(node_id) { let node = &mut state.crdt.doc.nodes[idx]; node.node_id.advance_seq(floor); node.address.advance_seq(floor); node.last_seen.advance_seq(floor); node.alive.advance_seq(floor); node.label.advance_seq(floor); node.assigned_project.advance_seq(floor); node.last_seen_ms.advance_seq(floor); } } } /// Write agent metadata fields for an existing node presence entry. /// /// Updates `label`, `assigned_project`, and `last_seen_ms` for the node /// identified by `node_id`. Does nothing if the node does not exist or the /// CRDT is not initialised. pub fn write_node_metadata( node_id: &str, label: &str, assigned_project: Option<&str>, last_seen_ms: f64, ) { let Some(state_mutex) = get_crdt() else { return; }; let Ok(mut state) = state_mutex.lock() else { return; }; let Some(&idx) = state.node_index.get(node_id) else { return; }; apply_and_persist(&mut state, |s| { s.crdt.doc.nodes[idx].label.set(label.to_string()) }); apply_and_persist(&mut state, |s| { s.crdt.doc.nodes[idx] .assigned_project .set(assigned_project.unwrap_or("").to_string()) }); apply_and_persist(&mut state, |s| { s.crdt.doc.nodes[idx].last_seen_ms.set(last_seen_ms) }); } /// Read all node presence entries from the CRDT document. /// /// Returns `None` before `init()`. pub fn read_all_node_presence() -> Option> { let state_mutex = get_crdt()?; let state = state_mutex.lock().ok()?; let mut nodes = Vec::new(); for node_crdt in state.crdt.doc.nodes.iter() { if let Some(view) = extract_node_view(node_crdt) { nodes.push(view); } } Some(nodes) } /// Extract a `NodePresenceView` from a `NodePresenceCrdt`. 
fn extract_node_view(node: &NodePresenceCrdt) -> Option { let node_id = match node.node_id.view() { JsonValue::String(s) if !s.is_empty() => s, _ => return None, }; let address = match node.address.view() { JsonValue::String(s) if !s.is_empty() => s, _ => return None, }; let last_seen = match node.last_seen.view() { JsonValue::Number(n) => n, _ => 0.0, }; let alive = match node.alive.view() { JsonValue::Bool(b) => b, _ => true, }; let label = match node.label.view() { JsonValue::String(s) if !s.is_empty() => Some(s), _ => None, }; let assigned_project = match node.assigned_project.view() { JsonValue::String(s) if !s.is_empty() => Some(s), _ => None, }; let last_seen_ms = match node.last_seen_ms.view() { JsonValue::Number(n) if n > 0.0 => Some(n), _ => None, }; Some(NodePresenceView { node_id, address, last_seen, alive, label, assigned_project, last_seen_ms, }) }