From 9b24c2e28115c156def0c5f5c9ba22cc43aebfd1 Mon Sep 17 00:00:00 2001 From: dave Date: Mon, 27 Apr 2026 23:44:36 +0000 Subject: [PATCH] huskies: merge 743 --- server/src/crdt_state/mod.rs | 2 +- server/src/crdt_state/presence.rs | 51 +++++++++++++ server/src/crdt_state/types.rs | 122 ++++++++++++++++++++++++++++++ 3 files changed, 174 insertions(+), 1 deletion(-) diff --git a/server/src/crdt_state/mod.rs b/server/src/crdt_state/mod.rs index f206c497..81135812 100644 --- a/server/src/crdt_state/mod.rs +++ b/server/src/crdt_state/mod.rs @@ -27,7 +27,7 @@ mod write; pub use ops::{all_ops_json, apply_remote_op, ops_since, our_vector_clock, subscribe_ops}; pub use presence::{ is_claimed_by_us, our_node_id, read_all_node_presence, release_claim, sign_challenge, - sign_versioned_challenge, write_claim, write_node_presence, + sign_versioned_challenge, write_claim, write_node_metadata, write_node_presence, }; pub use read::{ CrdtItemDump, CrdtStateDump, check_archived_deps_crdt, check_unmet_deps_crdt, diff --git a/server/src/crdt_state/presence.rs b/server/src/crdt_state/presence.rs index e61036f5..b9671103 100644 --- a/server/src/crdt_state/presence.rs +++ b/server/src/crdt_state/presence.rs @@ -162,10 +162,46 @@ pub fn write_node_presence(node_id: &str, address: &str, last_seen: f64, alive: node.address.advance_seq(floor); node.last_seen.advance_seq(floor); node.alive.advance_seq(floor); + node.label.advance_seq(floor); + node.assigned_project.advance_seq(floor); + node.last_seen_ms.advance_seq(floor); } } } +/// Write agent metadata fields for an existing node presence entry. +/// +/// Updates `label`, `assigned_project`, and `last_seen_ms` for the node +/// identified by `node_id`. Does nothing if the node does not exist or the +/// CRDT is not initialised. +pub fn write_node_metadata( + node_id: &str, + label: &str, + assigned_project: Option<&str>, + last_seen_ms: f64, +) { + let Some(state_mutex) = get_crdt() else { + return; + }; + let Ok(mut state) = state_mutex.lock() else { + return; + }; + let Some(&idx) = state.node_index.get(node_id) else { + return; + }; + apply_and_persist(&mut state, |s| { + s.crdt.doc.nodes[idx].label.set(label.to_string()) + }); + apply_and_persist(&mut state, |s| { + s.crdt.doc.nodes[idx] + .assigned_project + .set(assigned_project.unwrap_or("").to_string()) + }); + apply_and_persist(&mut state, |s| { + s.crdt.doc.nodes[idx].last_seen_ms.set(last_seen_ms) + }); +} + /// Read all node presence entries from the CRDT document. /// /// Returns `None` before `init()`. @@ -200,10 +236,25 @@ fn extract_node_view(node: &NodePresenceCrdt) -> Option { JsonValue::Bool(b) => b, _ => true, }; + let label = match node.label.view() { + JsonValue::String(s) if !s.is_empty() => Some(s), + _ => None, + }; + let assigned_project = match node.assigned_project.view() { + JsonValue::String(s) if !s.is_empty() => Some(s), + _ => None, + }; + let last_seen_ms = match node.last_seen_ms.view() { + JsonValue::Number(n) if n > 0.0 => Some(n), + _ => None, + }; Some(NodePresenceView { node_id, address, last_seen, alive, + label, + assigned_project, + last_seen_ms, }) } diff --git a/server/src/crdt_state/types.rs b/server/src/crdt_state/types.rs index 07f8174e..7be09870 100644 --- a/server/src/crdt_state/types.rs +++ b/server/src/crdt_state/types.rs @@ -74,6 +74,12 @@ pub struct NodePresenceCrdt { pub last_seen: LwwRegisterCrdt, /// `false` once a stale-detection pass has tombstoned this node. pub alive: LwwRegisterCrdt, + /// Human-readable display name for this node, e.g. `"builder-1"`. + pub label: LwwRegisterCrdt, + /// Project slug this node is assigned to, or empty string if unassigned. + pub assigned_project: LwwRegisterCrdt, + /// Unix timestamp in **milliseconds** of the last heartbeat (higher precision than `last_seen`). + pub last_seen_ms: LwwRegisterCrdt, } // ── Read-side view types ───────────────────────────────────────────── @@ -105,6 +111,12 @@ pub struct NodePresenceView { /// Unix timestamp (seconds). pub last_seen: f64, pub alive: bool, + /// Human-readable display name; `None` if not yet set. + pub label: Option, + /// Project slug this node is assigned to; `None` if unassigned. + pub assigned_project: Option, + /// Unix timestamp in milliseconds; `None` if not yet set. + pub last_seen_ms: Option, } #[cfg(test)] @@ -229,6 +241,116 @@ mod tests { }); } + #[test] + fn node_metadata_fields_replicate_to_peer() { + // Create two independent CRDT instances simulating two peers. + let kp_a = make_keypair(); + let kp_b = make_keypair(); + let mut crdt_a = BaseCrdt::::new(&kp_a); + let mut crdt_b = BaseCrdt::::new(&kp_b); + + // Peer A inserts a node entry with the new metadata fields. + let node_json: JsonValue = serde_json::json!({ + "node_id": "aabbccdd", + "address": "ws://192.168.1.10:3001/crdt-sync", + "last_seen": 1_000_000.0_f64, + "alive": true, + "label": "builder-1", + "assigned_project": "my-project", + "last_seen_ms": 1_000_000_123.0_f64, + }) + .into(); + + let insert_op = crdt_a.doc.nodes.insert(ROOT_ID, node_json).sign(&kp_a); + assert_eq!(crdt_a.apply(insert_op.clone()), OpState::Ok); + + // Replicate the insert op to peer B. + assert_eq!(crdt_b.apply(insert_op), OpState::Ok); + + // Assert the node appeared on peer B. + assert_eq!(crdt_b.doc.nodes.view().len(), 1); + + let node = &crdt_b.doc.nodes[0]; + assert_eq!( + node.node_id.view(), + JsonValue::String("aabbccdd".to_string()) + ); + assert_eq!( + node.label.view(), + JsonValue::String("builder-1".to_string()) + ); + assert_eq!( + node.assigned_project.view(), + JsonValue::String("my-project".to_string()) + ); + assert_eq!(node.last_seen_ms.view(), JsonValue::Number(1_000_000_123.0)); + + // Peer A updates label and last_seen_ms via separate ops. + let label_op = crdt_a.doc.nodes[0] + .label + .set("builder-1-renamed".to_string()) + .sign(&kp_a); + let ms_op = crdt_a.doc.nodes[0] + .last_seen_ms + .set(2_000_000_000.0_f64) + .sign(&kp_a); + assert_eq!(crdt_a.apply(label_op.clone()), OpState::Ok); + assert_eq!(crdt_a.apply(ms_op.clone()), OpState::Ok); + + // Replicate updates to peer B. + assert_eq!(crdt_b.apply(label_op), OpState::Ok); + assert_eq!(crdt_b.apply(ms_op), OpState::Ok); + + // Both peers converge to the same state. + assert_eq!( + crdt_b.doc.nodes[0].label.view(), + JsonValue::String("builder-1-renamed".to_string()) + ); + assert_eq!( + crdt_b.doc.nodes[0].last_seen_ms.view(), + JsonValue::Number(2_000_000_000.0) + ); + assert_eq!( + crdt_a.doc.nodes[0].label.view(), + crdt_b.doc.nodes[0].label.view() + ); + assert_eq!( + crdt_a.doc.nodes[0].last_seen_ms.view(), + crdt_b.doc.nodes[0].last_seen_ms.view() + ); + } + + #[test] + fn node_metadata_deserializes_with_defaults_for_pre_existing_entries() { + // Pre-existing entries without new fields should deserialize with sensible defaults. + let kp = make_keypair(); + let mut crdt = BaseCrdt::::new(&kp); + + // Insert a legacy node without the new fields. + let legacy_json: JsonValue = serde_json::json!({ + "node_id": "legacy-node", + "address": "ws://10.0.0.1:3001/crdt-sync", + "last_seen": 500_000.0_f64, + "alive": true, + }) + .into(); + + let op = crdt.doc.nodes.insert(ROOT_ID, legacy_json).sign(&kp); + assert_eq!(crdt.apply(op), OpState::Ok); + + let node = &crdt.doc.nodes[0]; + // New fields default to Null (not set). + assert_eq!(node.label.view(), JsonValue::Null); + assert_eq!(node.assigned_project.view(), JsonValue::Null); + assert_eq!(node.last_seen_ms.view(), JsonValue::Null); + // Existing fields still work. + assert_eq!( + node.node_id.view(), + JsonValue::String("legacy-node".to_string()) + ); + assert_eq!(node.alive.view(), JsonValue::Bool(true)); + } + #[test] fn crdt_event_broadcast_channel_round_trip() { let (tx, mut rx) = broadcast::channel::(16);