huskies: merge 528_story_crdt_based_peer_discovery_via_node_presence_entries

This commit is contained in:
dave
2026-04-10 16:59:17 +00:00
parent 4c8fe910a7
commit 808935b446
+147 -5
View File
@@ -78,6 +78,7 @@ static ALL_OPS: OnceLock<Mutex<Vec<String>>> = OnceLock::new();
#[derive(Clone, CrdtNode, Debug)]
pub struct PipelineDoc {
pub items: ListCrdt<PipelineItemCrdt>,
pub nodes: ListCrdt<NodePresenceCrdt>,
}
#[add_crdt_fields]
@@ -92,6 +93,20 @@ pub struct PipelineItemCrdt {
pub depends_on: LwwRegisterCrdt<String>,
}
/// CRDT node that holds a single peer's presence entry.
#[add_crdt_fields]
#[derive(Clone, CrdtNode, Debug)]
pub struct NodePresenceCrdt {
/// Hex-encoded Ed25519 public key — stable identity across restarts.
pub node_id: LwwRegisterCrdt<String>,
/// WebSocket URL this peer advertises, e.g. `ws://192.168.1.10:3001/crdt-sync`.
pub address: LwwRegisterCrdt<String>,
/// Unix timestamp (seconds) of the last heartbeat written by this node.
pub last_seen: LwwRegisterCrdt<f64>,
/// `false` once a stale-detection pass has tombstoned this node.
pub alive: LwwRegisterCrdt<bool>,
}
// ── Read-side view types ─────────────────────────────────────────────
/// A snapshot of a single pipeline item derived from the CRDT document.
@@ -106,13 +121,25 @@ pub struct PipelineItemView {
pub depends_on: Option<Vec<u32>>,
}
/// A snapshot of a single node presence entry derived from the CRDT document.
#[derive(Clone, Debug)]
pub struct NodePresenceView {
pub node_id: String,
pub address: String,
/// Unix timestamp (seconds).
pub last_seen: f64,
pub alive: bool,
}
// ── Internal state ───────────────────────────────────────────────────
struct CrdtState {
crdt: BaseCrdt<PipelineDoc>,
keypair: Ed25519KeyPair,
/// Maps story_id → index in the ListCrdt for O(1) lookup.
/// Maps story_id → index in the items ListCrdt for O(1) lookup.
index: HashMap<String, usize>,
/// Maps node_id (hex) → index in the nodes ListCrdt for O(1) lookup.
node_index: HashMap<String, usize>,
/// Channel sender for fire-and-forget op persistence.
persist_tx: mpsc::UnboundedSender<SignedOp>,
}
@@ -158,13 +185,15 @@ pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> {
}
let _ = ALL_OPS.set(Mutex::new(all_ops_vec));
// Build the index from the reconstructed state.
// Build the indices from the reconstructed state.
let index = rebuild_index(&crdt);
let node_index = rebuild_node_index(&crdt);
slog!(
"[crdt] Initialised: {} ops replayed, {} items indexed",
"[crdt] Initialised: {} ops replayed, {} items indexed, {} nodes indexed",
rows.len(),
index.len()
index.len(),
node_index.len()
);
// Spawn background persistence task.
@@ -205,6 +234,7 @@ pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> {
crdt,
keypair,
index,
node_index,
persist_tx,
};
@@ -234,11 +264,13 @@ pub fn init_for_test() {
let keypair = make_keypair();
let crdt = BaseCrdt::<PipelineDoc>::new(&keypair);
let index = HashMap::new();
let node_index = HashMap::new();
let (persist_tx, _rx) = mpsc::unbounded_channel();
let state = CrdtState {
crdt,
keypair,
index,
node_index,
persist_tx,
};
let _ = CRDT_STATE.set(Mutex::new(state));
@@ -285,6 +317,17 @@ fn rebuild_index(crdt: &BaseCrdt<PipelineDoc>) -> HashMap<String, usize> {
map
}
/// Rebuild the node_id → nodes list index mapping from the current CRDT state.
fn rebuild_node_index(crdt: &BaseCrdt<PipelineDoc>) -> HashMap<String, usize> {
let mut map = HashMap::new();
for (i, node) in crdt.doc.nodes.iter().enumerate() {
if let JsonValue::String(ref nid) = node.node_id.view() {
map.insert(nid.clone(), i);
}
}
map
}
// ── Write path ───────────────────────────────────────────────────────
/// Create a CRDT op via `op_fn`, sign it, apply it, and send it to the
@@ -490,8 +533,9 @@ pub fn apply_remote_op(op: SignedOp) -> bool {
v.push(json);
}
// Rebuild index (new items may have been inserted).
// Rebuild indices (new items or nodes may have been inserted).
state.index = rebuild_index(&state.crdt);
state.node_index = rebuild_node_index(&state.crdt);
// Detect and broadcast stage transitions.
for (sid, &idx) in &state.index {
@@ -518,6 +562,103 @@ pub fn apply_remote_op(op: SignedOp) -> bool {
true
}
// ── Node presence API ────────────────────────────────────────────────
/// Return the hex-encoded Ed25519 public key for this node.
///
/// Used as the stable identity written into the CRDT nodes list.
/// Returns `None` before `init()`.
pub fn our_node_id() -> Option<String> {
let state = CRDT_STATE.get()?.lock().ok()?;
Some(hex::encode(&state.crdt.id))
}
/// Write or update a node presence entry in the CRDT.
///
/// If a node with the given `node_id` already exists, only `last_seen`,
/// `alive`, and `address` are updated. If not, a new entry is inserted.
///
/// This is the write path for both local heartbeats and tombstoning.
pub fn write_node_presence(node_id: &str, address: &str, last_seen: f64, alive: bool) {
let Some(state_mutex) = CRDT_STATE.get() else {
return;
};
let Ok(mut state) = state_mutex.lock() else {
return;
};
if let Some(&idx) = state.node_index.get(node_id) {
// Update existing entry — three separate ops so peers can merge independently.
apply_and_persist(&mut state, |s| {
s.crdt.doc.nodes[idx].last_seen.set(last_seen)
});
apply_and_persist(&mut state, |s| {
s.crdt.doc.nodes[idx].alive.set(alive)
});
apply_and_persist(&mut state, |s| {
s.crdt.doc.nodes[idx].address.set(address.to_string())
});
} else {
// Insert new node entry.
let node_json: JsonValue = json!({
"node_id": node_id,
"address": address,
"last_seen": last_seen,
"alive": alive,
})
.into();
apply_and_persist(&mut state, |s| {
s.crdt.doc.nodes.insert(ROOT_ID, node_json)
});
// Rebuild node index after insertion.
state.node_index = rebuild_node_index(&state.crdt);
}
}
/// Read all node presence entries from the CRDT document.
///
/// Returns `None` before `init()`.
pub fn read_all_node_presence() -> Option<Vec<NodePresenceView>> {
let state_mutex = CRDT_STATE.get()?;
let state = state_mutex.lock().ok()?;
let mut nodes = Vec::new();
for node_crdt in state.crdt.doc.nodes.iter() {
if let Some(view) = extract_node_view(node_crdt) {
nodes.push(view);
}
}
Some(nodes)
}
/// Extract a `NodePresenceView` from a `NodePresenceCrdt`.
fn extract_node_view(node: &NodePresenceCrdt) -> Option<NodePresenceView> {
let node_id = match node.node_id.view() {
JsonValue::String(s) if !s.is_empty() => s,
_ => return None,
};
let address = match node.address.view() {
JsonValue::String(s) if !s.is_empty() => s,
_ => return None,
};
let last_seen = match node.last_seen.view() {
JsonValue::Number(n) => n,
_ => 0.0,
};
let alive = match node.alive.view() {
JsonValue::Bool(b) => b,
_ => true,
};
Some(NodePresenceView {
node_id,
address,
last_seen,
alive,
})
}
// ── Debug dump ───────────────────────────────────────────────────────
/// A raw dump of a single CRDT list entry, including deleted items.
@@ -1452,6 +1593,7 @@ mod tests {
crdt,
keypair: kp,
index: HashMap::new(),
node_index: HashMap::new(),
persist_tx,
};