2026-04-26 20:54:15 +00:00
|
|
|
//! Node identity, work claiming, and node presence (heartbeat) API.
|
|
|
|
|
|
2026-04-27 01:32:08 +00:00
|
|
|
#![allow(unused_imports, dead_code)]
|
2026-04-26 20:54:15 +00:00
|
|
|
use super::hex;
|
|
|
|
|
use super::read::read_item;
|
2026-04-27 01:32:08 +00:00
|
|
|
use bft_json_crdt::json_crdt::*;
|
2026-04-26 20:54:15 +00:00
|
|
|
use bft_json_crdt::lww_crdt::LwwRegisterCrdt;
|
2026-04-27 01:32:08 +00:00
|
|
|
use bft_json_crdt::op::ROOT_ID;
|
2026-04-26 20:54:15 +00:00
|
|
|
use fastcrypto::traits::{Signer, ToFromBytes};
|
|
|
|
|
use serde_json::json;
|
|
|
|
|
|
|
|
|
|
use super::state::{apply_and_persist, get_crdt, rebuild_node_index};
|
|
|
|
|
use super::types::{NodePresenceCrdt, NodePresenceView, PipelineDoc};
|
|
|
|
|
use crate::slog;
|
|
|
|
|
|
|
|
|
|
// ── Node presence API ────────────────────────────────────────────────
|
|
|
|
|
|
|
|
|
|
/// Return the hex-encoded Ed25519 public key for this node.
|
|
|
|
|
///
|
|
|
|
|
/// Used as the stable identity written into the CRDT nodes list.
|
|
|
|
|
/// Returns `None` before `init()`.
|
|
|
|
|
pub fn our_node_id() -> Option<String> {
|
|
|
|
|
let state = get_crdt()?.lock().ok()?;
|
|
|
|
|
Some(hex::encode(&state.crdt.id))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Sign a challenge nonce with this node's keypair for WebSocket mutual auth.
|
|
|
|
|
///
|
|
|
|
|
/// Returns `(pubkey_hex, signature_hex)` or `None` before `init()`.
|
|
|
|
|
pub fn sign_challenge(challenge: &str) -> Option<(String, String)> {
|
|
|
|
|
let state = get_crdt()?.lock().ok()?;
|
|
|
|
|
let pubkey_hex = crate::node_identity::public_key_hex(&state.keypair);
|
|
|
|
|
let sig_hex = crate::node_identity::sign_challenge(&state.keypair, challenge);
|
|
|
|
|
Some((pubkey_hex, sig_hex))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Write a claim on a pipeline item via CRDT.
|
|
|
|
|
///
|
|
|
|
|
/// Sets `claimed_by` to this node's ID and `claimed_at` to the current time.
|
|
|
|
|
/// The LWW register ensures deterministic conflict resolution — if two nodes
|
|
|
|
|
/// claim the same item simultaneously, both will converge to the same winner
|
|
|
|
|
/// after CRDT sync.
|
|
|
|
|
///
|
|
|
|
|
/// Returns `true` if the claim was written, `false` if the item doesn't exist
|
|
|
|
|
/// or CRDT is not initialised.
|
|
|
|
|
pub fn write_claim(story_id: &str) -> bool {
|
|
|
|
|
let Some(node_id) = our_node_id() else {
|
|
|
|
|
return false;
|
|
|
|
|
};
|
|
|
|
|
let now = chrono::Utc::now().timestamp() as f64;
|
|
|
|
|
|
|
|
|
|
let Some(state_mutex) = get_crdt() else {
|
|
|
|
|
return false;
|
|
|
|
|
};
|
|
|
|
|
let Ok(mut state) = state_mutex.lock() else {
|
|
|
|
|
return false;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let Some(&idx) = state.index.get(story_id) else {
|
|
|
|
|
return false;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
apply_and_persist(&mut state, |s| {
|
|
|
|
|
s.crdt.doc.items[idx].claimed_by.set(node_id.clone())
|
|
|
|
|
});
|
|
|
|
|
apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].claimed_at.set(now));
|
|
|
|
|
|
|
|
|
|
true
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Release a claim on a pipeline item (clear claimed_by and claimed_at).
|
|
|
|
|
pub fn release_claim(story_id: &str) {
|
|
|
|
|
let Some(state_mutex) = get_crdt() else {
|
|
|
|
|
return;
|
|
|
|
|
};
|
|
|
|
|
let Ok(mut state) = state_mutex.lock() else {
|
|
|
|
|
return;
|
|
|
|
|
};
|
|
|
|
|
let Some(&idx) = state.index.get(story_id) else {
|
|
|
|
|
return;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
apply_and_persist(&mut state, |s| {
|
|
|
|
|
s.crdt.doc.items[idx].claimed_by.set(String::new())
|
|
|
|
|
});
|
|
|
|
|
apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].claimed_at.set(0.0));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Check if this node currently holds the claim on a pipeline item.
|
|
|
|
|
pub fn is_claimed_by_us(story_id: &str) -> bool {
|
|
|
|
|
let Some(node_id) = our_node_id() else {
|
|
|
|
|
return false;
|
|
|
|
|
};
|
|
|
|
|
let Some(item) = read_item(story_id) else {
|
|
|
|
|
return false;
|
|
|
|
|
};
|
|
|
|
|
item.claimed_by.as_deref() == Some(&node_id)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Write or update a node presence entry in the CRDT.
|
|
|
|
|
///
|
|
|
|
|
/// If a node with the given `node_id` already exists, only `last_seen`,
|
|
|
|
|
/// `alive`, and `address` are updated. If not, a new entry is inserted.
|
|
|
|
|
///
|
|
|
|
|
/// This is the write path for both local heartbeats and tombstoning.
|
|
|
|
|
pub fn write_node_presence(node_id: &str, address: &str, last_seen: f64, alive: bool) {
|
|
|
|
|
let Some(state_mutex) = get_crdt() else {
|
|
|
|
|
return;
|
|
|
|
|
};
|
|
|
|
|
let Ok(mut state) = state_mutex.lock() else {
|
|
|
|
|
return;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
if let Some(&idx) = state.node_index.get(node_id) {
|
|
|
|
|
// Update existing entry — three separate ops so peers can merge independently.
|
|
|
|
|
apply_and_persist(&mut state, |s| {
|
|
|
|
|
s.crdt.doc.nodes[idx].last_seen.set(last_seen)
|
|
|
|
|
});
|
|
|
|
|
apply_and_persist(&mut state, |s| s.crdt.doc.nodes[idx].alive.set(alive));
|
|
|
|
|
apply_and_persist(&mut state, |s| {
|
|
|
|
|
s.crdt.doc.nodes[idx].address.set(address.to_string())
|
|
|
|
|
});
|
|
|
|
|
} else {
|
|
|
|
|
// Insert new node entry.
|
|
|
|
|
let node_json: JsonValue = json!({
|
|
|
|
|
"node_id": node_id,
|
|
|
|
|
"address": address,
|
|
|
|
|
"last_seen": last_seen,
|
|
|
|
|
"alive": alive,
|
|
|
|
|
})
|
|
|
|
|
.into();
|
|
|
|
|
|
|
|
|
|
apply_and_persist(&mut state, |s| s.crdt.doc.nodes.insert(ROOT_ID, node_json));
|
|
|
|
|
|
|
|
|
|
// Rebuild node index after insertion.
|
|
|
|
|
state.node_index = rebuild_node_index(&state.crdt);
|
2026-04-27 12:26:28 +00:00
|
|
|
|
|
|
|
|
// Advance the inner registers of the newly-created node to the Lamport
|
|
|
|
|
// floor so their first local ops don't re-emit low sequence numbers.
|
|
|
|
|
let floor = state.lamport_floor;
|
|
|
|
|
if floor > 0
|
|
|
|
|
&& let Some(&idx) = state.node_index.get(node_id)
|
|
|
|
|
{
|
|
|
|
|
let node = &mut state.crdt.doc.nodes[idx];
|
|
|
|
|
node.node_id.advance_seq(floor);
|
|
|
|
|
node.address.advance_seq(floor);
|
|
|
|
|
node.last_seen.advance_seq(floor);
|
|
|
|
|
node.alive.advance_seq(floor);
|
|
|
|
|
}
|
2026-04-26 20:54:15 +00:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Read all node presence entries from the CRDT document.
|
|
|
|
|
///
|
|
|
|
|
/// Returns `None` before `init()`.
|
|
|
|
|
pub fn read_all_node_presence() -> Option<Vec<NodePresenceView>> {
|
|
|
|
|
let state_mutex = get_crdt()?;
|
|
|
|
|
let state = state_mutex.lock().ok()?;
|
|
|
|
|
|
|
|
|
|
let mut nodes = Vec::new();
|
|
|
|
|
for node_crdt in state.crdt.doc.nodes.iter() {
|
|
|
|
|
if let Some(view) = extract_node_view(node_crdt) {
|
|
|
|
|
nodes.push(view);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
Some(nodes)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Extract a `NodePresenceView` from a `NodePresenceCrdt`.
|
|
|
|
|
fn extract_node_view(node: &NodePresenceCrdt) -> Option<NodePresenceView> {
|
|
|
|
|
let node_id = match node.node_id.view() {
|
|
|
|
|
JsonValue::String(s) if !s.is_empty() => s,
|
|
|
|
|
_ => return None,
|
|
|
|
|
};
|
|
|
|
|
let address = match node.address.view() {
|
|
|
|
|
JsonValue::String(s) if !s.is_empty() => s,
|
|
|
|
|
_ => return None,
|
|
|
|
|
};
|
|
|
|
|
let last_seen = match node.last_seen.view() {
|
|
|
|
|
JsonValue::Number(n) => n,
|
|
|
|
|
_ => 0.0,
|
|
|
|
|
};
|
|
|
|
|
let alive = match node.alive.view() {
|
|
|
|
|
JsonValue::Bool(b) => b,
|
|
|
|
|
_ => true,
|
|
|
|
|
};
|
|
|
|
|
Some(NodePresenceView {
|
|
|
|
|
node_id,
|
|
|
|
|
address,
|
|
|
|
|
last_seen,
|
|
|
|
|
alive,
|
|
|
|
|
})
|
|
|
|
|
}
|