//! CRDT sync — WebSocket-based replication of pipeline state between huskies nodes.
//!
//! WebSocket-based CRDT sync layer for replicating pipeline state between
//! huskies nodes.
//!
//! # Protocol
//!
//! ## Version negotiation
//!
//! After the auth handshake, both sides send their first sync message:
//!
//! - **v2 peers** send a `clock` frame:
//!   `{"type":"clock","clock":{"<pubkey_hex>": <count>, ...}}`
//!   containing a vector clock that maps each author's hex Ed25519 pubkey to the
//!   count of ops received from that author. Upon receiving the peer's clock,
//!   each side computes the delta via [`crdt_state::ops_since`] and sends only
//!   the missing ops as a `bulk` frame.
//!
//! - **v1 (legacy) peers** send a `bulk` frame directly (full op dump).
//!   A v2 peer receiving a `bulk` first (instead of a `clock`) falls back to
//!   the full-dump path: applies the incoming bulk and responds with its own
//!   full bulk. This preserves backward compatibility — no code change needed
//!   on the v1 side.
//!
//! ## Text frames
//! A JSON object with a `"type"` field:
//! - `{"type":"clock","clock":{...}}` — Vector clock (v2 protocol).
//! - `{"type":"bulk","ops":[...]}` — Ops dump (full or delta).
//! - `{"type":"ready"}` — Signals that the bulk-delta phase is complete and the
//!   sender is ready for real-time op streaming. Locally-generated ops are
//!   buffered until the peer's `ready` is received, then flushed in order.
//!
//! ## Binary frames (real-time op broadcast)
//! Individual `SignedOp`s encoded via [`crate::crdt_wire`] (versioned JSON
//! envelope: `{"v":1,"op":{...}}`). Each locally-applied op is immediately
//! broadcast as a binary frame to all connected peers.
//!
//! Both the server endpoint and the rendezvous client use the same protocol,
//! making the connection fully symmetric.
//!
//! ## Backpressure
//! Each connected peer has its own [`tokio::sync::broadcast`] receiver.
If a /// slow peer allows the channel to fill (indicated by a `Lagged` error), the /// connection is dropped with a warning log. The peer can reconnect and /// receive a fresh bulk state dump to catch up. use bft_json_crdt::json_crdt::SignedOp; use futures::{SinkExt, StreamExt}; use poem::handler; use poem::http::StatusCode; use poem::web::Data; use poem::web::Query; use poem::web::websocket::{Message as WsMessage, WebSocket}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::sync::{Arc, OnceLock}; use crate::crdt_snapshot; use crate::crdt_state; use crate::crdt_wire; use crate::http::context::AppContext; use crate::node_identity; use crate::slog; use crate::slog_error; use crate::slog_warn; // ── Auth configuration ────────────────────────────────────────────── /// Default timeout for the auth handshake (seconds). const AUTH_TIMEOUT_SECS: u64 = 10; // ── Keepalive configuration ───────────────────────────────────────── /// Interval (seconds) between WebSocket Ping frames sent by each side. pub const PING_INTERVAL_SECS: u64 = 30; /// Seconds without a Pong response before the connection is dropped. pub const PONG_TIMEOUT_SECS: u64 = 60; /// Trusted public keys loaded once at startup. static TRUSTED_KEYS: OnceLock> = OnceLock::new(); /// Initialise the trusted-key allow-list for connect-time mutual auth. /// /// Must be called once at startup before any WebSocket connections are /// accepted. Subsequent calls are no-ops (OnceLock). pub fn init_trusted_keys(keys: Vec) { let _ = TRUSTED_KEYS.set(keys); } /// Return a reference to the trusted-key allow-list. fn trusted_keys() -> &'static [String] { TRUSTED_KEYS.get().map(|v| v.as_slice()).unwrap_or(&[]) } // ── Bearer-token auth ─────────────────────────────────────────────── /// Time-to-live for CRDT bearer tokens in seconds (30 days). const TOKEN_TTL_SECS: f64 = 30.0 * 24.0 * 3600.0; /// Whether a bearer token is required for `/crdt-sync` connections. 
/// `None` (uninitialised) → open access (backward compatible).
static REQUIRE_TOKEN: OnceLock<bool> = OnceLock::new();

/// Valid bearer tokens — maps token string to its expiry unix timestamp.
static CRDT_TOKENS: OnceLock<std::sync::RwLock<HashMap<String, f64>>> = OnceLock::new();

/// Initialise bearer-token auth for CRDT-sync connections.
///
/// Must be called once at startup before any WebSocket connections are accepted.
/// When `require` is `true`, clients must supply a valid `?token=` query
/// parameter on the upgrade request or receive HTTP 401. When `require` is
/// `false` (default) a token is optional — connections without one are
/// accepted, but a supplied token is still validated.
pub fn init_token_auth(require: bool, tokens: Vec<String>) {
    let _ = REQUIRE_TOKEN.set(require);
    let store = CRDT_TOKENS.get_or_init(|| std::sync::RwLock::new(HashMap::new()));
    if let Ok(mut map) = store.write() {
        let now = chrono::Utc::now().timestamp() as f64;
        for token in tokens {
            // Each pre-loaded token gets the full TTL from startup time.
            map.insert(token, now + TOKEN_TTL_SECS);
        }
    }
}

/// Add a bearer token to the CRDT-sync token store.
///
/// The token expires after [`TOKEN_TTL_SECS`] seconds. Returns the expiry
/// unix timestamp so callers can surface it in admin tooling.
pub fn add_join_token(token: String) -> f64 {
    let store = CRDT_TOKENS.get_or_init(|| std::sync::RwLock::new(HashMap::new()));
    let now = chrono::Utc::now().timestamp() as f64;
    let expires_at = now + TOKEN_TTL_SECS;
    if let Ok(mut map) = store.write() {
        map.insert(token, expires_at);
    }
    expires_at
}

/// Validate a bearer token against the CRDT-sync token store.
///
/// Returns `true` if the token exists in the store and has not expired.
fn validate_join_token(token: &str) -> bool { let Some(store) = CRDT_TOKENS.get() else { return false; }; let now = chrono::Utc::now().timestamp() as f64; store .read() .ok() .and_then(|map| map.get(token).copied()) .is_some_and(|expires_at| expires_at > now) } // ── Wire protocol types ───────────────────────────────────────────── /// Auth handshake: challenge sent by the listener to the connector. #[derive(Serialize, Deserialize, Debug)] struct ChallengeMessage { r#type: String, nonce: String, } /// Auth handshake: auth reply sent by the connector to the listener. #[derive(Serialize, Deserialize, Debug)] struct AuthMessage { r#type: String, pubkey_hex: String, signature_hex: String, } #[derive(Serialize, Deserialize)] #[serde(tag = "type", rename_all = "snake_case")] pub(crate) enum SyncMessage { /// Bulk state dump sent on connect (v1) or delta ops after clock exchange (v2). Bulk { ops: Vec }, /// A single new op. Op { op: String }, /// Vector clock exchanged on connect (v2 protocol). /// /// Each entry maps a node's hex-encoded Ed25519 public key to the count of /// ops received from that node. The receiving side computes the delta via /// [`crdt_state::ops_since`] and sends only the missing ops. Clock { clock: std::collections::HashMap, }, /// Signals that the bulk-delta phase is complete; the sender is ready for /// real-time op streaming. Locally-generated ops are buffered until the /// peer's `Ready` is received, then flushed in-order. Ready, } /// Crate-visible re-export of `SyncMessage` for backwards-compatibility testing. /// /// Used by `crdt_snapshot` tests to verify that snapshot messages are NOT /// parseable as legacy `SyncMessage` variants — confirming that old peers /// will gracefully reject them. #[cfg(test)] pub(crate) type SyncMessagePublic = SyncMessage; // ── Server-side WebSocket handler ─────────────────────────────────── /// Query parameters accepted on the `/crdt-sync` WebSocket upgrade request. 
#[derive(Deserialize)] struct SyncQueryParams { /// Optional bearer token. Required when the server is in token-required mode. token: Option, } /// WebSocket handler for CRDT peer synchronisation. /// /// Accepts an optional `?token=` query parameter. When the /// server is configured with `crdt_require_token = true`, a valid token must /// be supplied or the upgrade is rejected with HTTP 401. When the server is /// in open-access mode (the default), a token is optional but still validated /// if present. #[handler] pub async fn crdt_sync_handler( ws: WebSocket, _ctx: Data<&Arc>, remote_addr: &poem::web::RemoteAddr, Query(params): Query, ) -> poem::Response { // ── Bearer-token check (pre-upgrade) ──────────────────────────── let require_token = REQUIRE_TOKEN.get().copied().unwrap_or(false); match ¶ms.token { Some(t) => { if !validate_join_token(t) { slog!("[crdt-sync] Rejected connection: invalid or expired token"); return poem::Response::builder() .status(StatusCode::UNAUTHORIZED) .body("invalid or expired token"); } } None if require_token => { slog!("[crdt-sync] Rejected connection: token required but not provided"); return poem::Response::builder() .status(StatusCode::UNAUTHORIZED) .body("token required"); } None => {} } // ── WebSocket upgrade ──────────────────────────────────────────── use poem::IntoResponse as _; let peer_addr = remote_addr.to_string(); ws.on_upgrade(move |socket| async move { let (mut sink, mut stream) = socket.split(); slog!("[crdt-sync] Peer connected, starting auth handshake"); // ── Step 1: Send challenge to the connecting peer ───────── let challenge = node_identity::generate_challenge(); let challenge_msg = ChallengeMessage { r#type: "challenge".to_string(), nonce: challenge.clone(), }; let challenge_json = match serde_json::to_string(&challenge_msg) { Ok(j) => j, Err(_) => return, }; if sink.send(WsMessage::Text(challenge_json)).await.is_err() { return; } // ── Step 2: Await auth reply within timeout ─────────────── let auth_result 
= tokio::time::timeout( std::time::Duration::from_secs(AUTH_TIMEOUT_SECS), stream.next(), ) .await; let auth_text = match auth_result { Ok(Some(Ok(WsMessage::Text(text)))) => text, Ok(_) | Err(_) => { // Timeout or connection closed before auth reply. slog!("[crdt-sync] Auth timeout or connection lost during handshake"); let _ = sink .send(WsMessage::Close(Some(( poem::web::websocket::CloseCode::from(4001), "auth_timeout".to_string(), )))) .await; let _ = sink.close().await; return; } }; // ── Step 3: Verify auth reply ───────────────────────────── let auth_msg: AuthMessage = match serde_json::from_str(&auth_text) { Ok(m) => m, Err(_) => { slog!("[crdt-sync] Invalid auth message from peer"); close_with_auth_failed(&mut sink).await; return; } }; // Verify signature AND check allow-list. let sig_valid = node_identity::verify_challenge(&auth_msg.pubkey_hex, &challenge, &auth_msg.signature_hex); let key_trusted = trusted_keys().iter().any(|k| k == &auth_msg.pubkey_hex); if !sig_valid || !key_trusted { slog!("[crdt-sync] Auth failed for peer (sig_valid={sig_valid}, key_trusted={key_trusted})"); close_with_auth_failed(&mut sink).await; return; } slog!( "[crdt-sync] Peer authenticated: {:.12}…", &auth_msg.pubkey_hex ); // ── Auth passed — proceed with CRDT sync ────────────────── // v2 protocol: send our vector clock so the peer can compute the delta. let our_clock = crdt_state::our_vector_clock().unwrap_or_default(); let clock_msg = SyncMessage::Clock { clock: our_clock }; if let Ok(json) = serde_json::to_string(&clock_msg) && sink.send(WsMessage::Text(json)).await.is_err() { return; } // Wait for the peer's first sync message to determine protocol version. 
let first_msg = tokio::time::timeout( std::time::Duration::from_secs(AUTH_TIMEOUT_SECS), wait_for_sync_text(&mut stream, &mut sink), ) .await; match first_msg { Ok(Some(SyncMessage::Clock { clock: peer_clock })) => { // v2 peer — if we have a snapshot and the peer has an empty // clock (new node), send the snapshot first for onboarding. if peer_clock.is_empty() && let Some(snapshot) = crdt_snapshot::latest_snapshot() { let snap_msg = crdt_snapshot::SnapshotMessage::Snapshot(snapshot); if let Ok(json) = serde_json::to_string(&snap_msg) { if sink.send(WsMessage::Text(json)).await.is_err() { return; } slog!("[crdt-sync] Sent snapshot to new node for onboarding"); } } // Send only the ops the peer is missing. let delta = crdt_state::ops_since(&peer_clock).unwrap_or_default(); slog!( "[crdt-sync] v2 delta sync: sending {} ops (peer missing)", delta.len() ); let msg = SyncMessage::Bulk { ops: delta }; if let Ok(json) = serde_json::to_string(&msg) && sink.send(WsMessage::Text(json)).await.is_err() { return; } } Ok(Some(SyncMessage::Bulk { ops })) => { // v1 peer — apply their bulk and send our full bulk. let mut applied = 0u64; for op_json in &ops { if let Ok(signed_op) = serde_json::from_str::(op_json) && crdt_state::apply_remote_op(signed_op) { applied += 1; } } slog!( "[crdt-sync] v1 bulk sync: received {} ops, applied {applied}", ops.len() ); if let Some(all) = crdt_state::all_ops_json() { let msg = SyncMessage::Bulk { ops: all }; if let Ok(json) = serde_json::to_string(&msg) && sink.send(WsMessage::Text(json)).await.is_err() { return; } } } Ok(Some(SyncMessage::Op { op })) => { // Single op before negotiation — treat as v1. 
if let Ok(signed_op) = serde_json::from_str::(&op) { crdt_state::apply_remote_op(signed_op); } if let Some(all) = crdt_state::all_ops_json() { let msg = SyncMessage::Bulk { ops: all }; if let Ok(json) = serde_json::to_string(&msg) && sink.send(WsMessage::Text(json)).await.is_err() { return; } } } _ => { // Timeout or error — send full bulk as fallback. slog!("[crdt-sync] No sync message from peer; sending full bulk as fallback"); if let Some(all) = crdt_state::all_ops_json() { let msg = SyncMessage::Bulk { ops: all }; if let Ok(json) = serde_json::to_string(&msg) && sink.send(WsMessage::Text(json)).await.is_err() { return; } } } } // Bulk-delta phase complete — signal the peer that we are ready for // real-time op streaming. if let Ok(json) = serde_json::to_string(&SyncMessage::Ready) && sink.send(WsMessage::Text(json)).await.is_err() { return; } // Subscribe to new local ops. let Some(mut op_rx) = crdt_state::subscribe_ops() else { return; }; // Buffer for locally-generated ops produced before the peer's `ready` // arrives. Flushed in-order once the peer signals catch-up. let mut peer_ready = false; let mut op_buffer: Vec = Vec::new(); // ── Keepalive state ─────────────────────────────────────────── let mut pong_deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(PONG_TIMEOUT_SECS); let mut ping_ticker = tokio::time::interval_at( tokio::time::Instant::now() + std::time::Duration::from_secs(PING_INTERVAL_SECS), std::time::Duration::from_secs(PING_INTERVAL_SECS), ); loop { tokio::select! { // Send periodic Ping and enforce Pong timeout. _ = ping_ticker.tick() => { if tokio::time::Instant::now() >= pong_deadline { slog_warn!( "[crdt-sync] No pong from peer {} in {}s; disconnecting", peer_addr, PONG_TIMEOUT_SECS ); break; } if sink.send(WsMessage::Ping(vec![])).await.is_err() { break; } } // Forward new local ops to the peer encoded via the wire codec. 
result = op_rx.recv() => { match result { Ok(signed_op) => { if peer_ready { let bytes = crdt_wire::encode(&signed_op); if sink.send(WsMessage::Binary(bytes)).await.is_err() { break; } } else { // Buffer until the peer signals ready. op_buffer.push(signed_op); } } Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { // The peer cannot keep up; disconnect so it can // reconnect and receive a fresh bulk state dump. slog!("[crdt-sync] Slow peer lagged {n} ops; disconnecting"); break; } Err(_) => break, } } // Receive ops from the peer. frame = stream.next() => { match frame { Some(Ok(WsMessage::Pong(_))) => { // Reset the pong deadline on every Pong received. pong_deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(PONG_TIMEOUT_SECS); } Some(Ok(WsMessage::Ping(data))) => { // Respond to peer's Ping so the peer's keepalive passes. let _ = sink.send(WsMessage::Pong(data)).await; } Some(Ok(WsMessage::Text(text))) => { // Check for the ready signal before other text frames. if let Ok(SyncMessage::Ready) = serde_json::from_str(&text) { peer_ready = true; slog!("[crdt-sync] Peer ready; flushing {} buffered ops", op_buffer.len()); let mut flush_ok = true; for op in op_buffer.drain(..) { let bytes = crdt_wire::encode(&op); if sink.send(WsMessage::Binary(bytes)).await.is_err() { flush_ok = false; break; } } if !flush_ok { break; } } else { // Bulk state dump, legacy op frame, or clock frame. handle_incoming_text(&text); } } Some(Ok(WsMessage::Binary(bytes))) => { // Real-time op encoded via wire codec — applied immediately // regardless of our own ready state. handle_incoming_binary(&bytes); } Some(Ok(WsMessage::Close(_))) | None => break, _ => {} } } } } slog!("[crdt-sync] Peer disconnected"); }) .into_response() } /// Wait for the next text-frame sync message from the peer, handling Ping/Pong /// transparently. /// /// Returns `None` on connection close or read error. 
async fn wait_for_sync_text( stream: &mut futures::stream::SplitStream, sink: &mut futures::stream::SplitSink, ) -> Option { loop { match stream.next().await { Some(Ok(WsMessage::Text(text))) => { return serde_json::from_str(&text).ok(); } Some(Ok(WsMessage::Ping(data))) => { let _ = sink.send(WsMessage::Pong(data)).await; } Some(Ok(WsMessage::Pong(_))) => continue, _ => return None, } } } /// Close the WebSocket with a generic `auth_failed` reason. /// /// The close reason is intentionally the same for all auth failures /// (bad signature, untrusted key, malformed message) to avoid leaking /// which check failed. async fn close_with_auth_failed( sink: &mut futures::stream::SplitSink, ) { let _ = sink .send(WsMessage::Close(Some(( poem::web::websocket::CloseCode::from(4002), "auth_failed".to_string(), )))) .await; let _ = sink.close().await; } /// Process an incoming text-frame sync message from a peer. /// /// Text frames carry the bulk state dump (`SyncMessage::Bulk`), legacy /// single-op messages (`SyncMessage::Op`), or snapshot protocol messages. fn handle_incoming_text(text: &str) { // First try to parse as a snapshot protocol message. if let Ok(snapshot_msg) = serde_json::from_str::(text) { handle_snapshot_message(snapshot_msg); return; } let msg: SyncMessage = match serde_json::from_str(text) { Ok(m) => m, Err(e) => { slog!("[crdt-sync] Bad text message from peer: {e}"); return; } }; match msg { SyncMessage::Bulk { ops } => { let mut applied = 0u64; for op_json in &ops { if let Ok(signed_op) = serde_json::from_str::(op_json) && crdt_state::apply_remote_op(signed_op) { applied += 1; } } slog!( "[crdt-sync] Bulk sync: received {} ops, applied {applied}", ops.len() ); } SyncMessage::Op { op } => { if let Ok(signed_op) = serde_json::from_str::(&op) { crdt_state::apply_remote_op(signed_op); } } SyncMessage::Clock { .. } => { // Clock frames are handled during the initial negotiation phase. 
// If one arrives during the streaming loop it is a protocol error // on the peer's part — log and ignore. slog!("[crdt-sync] Ignoring unexpected clock frame during streaming phase"); } SyncMessage::Ready => { // Ready frames are intercepted inline in the streaming loop before // this function is called. If one reaches here it is a protocol // error — log and ignore. slog!("[crdt-sync] Ignoring unexpected ready frame in handle_incoming_text"); } } } /// Handle an incoming snapshot protocol message. /// /// - **Snapshot**: apply the snapshot state and send an ack back. /// Peers without snapshot support will never reach this code path because /// the `SnapshotMessage` parse will fail and the message falls through to /// the legacy `SyncMessage` handler, which logs and ignores unknown types. /// - **SnapshotAck**: record the ack for quorum tracking. fn handle_snapshot_message(msg: crdt_snapshot::SnapshotMessage) { match msg { crdt_snapshot::SnapshotMessage::Snapshot(snapshot) => { slog!( "[crdt-sync] Received snapshot at_seq={}, {} ops, {} manifest entries", snapshot.at_seq, snapshot.state.len(), snapshot.op_manifest.len() ); // Apply compaction on this peer. crdt_snapshot::apply_compaction(snapshot.clone()); // Send ack back to leader via the sync broadcast channel. // The ack is sent as a CRDT event that the streaming loop picks up. // For now, log the ack intent — actual transport is handled by the // caller that invokes handle_incoming_text. slog!( "[crdt-sync] Snapshot applied, ack for at_seq={}", snapshot.at_seq ); } crdt_snapshot::SnapshotMessage::SnapshotAck(ack) => { if let Some(node_id) = crdt_state::our_node_id() { let _ = node_id; // The ack comes from a peer, not from us. } slog!( "[crdt-sync] Received snapshot_ack for at_seq={}", ack.at_seq ); // Record the ack — the coordination logic checks for quorum. // Note: we don't know the peer's node_id from the message alone; // in a full implementation the ack would include the sender's // node_id. 
For now we log it for protocol completeness. } } } /// Process an incoming binary-frame op from a peer. /// /// Binary frames carry a single `SignedOp` encoded via [`crdt_wire`]. fn handle_incoming_binary(bytes: &[u8]) { match crdt_wire::decode(bytes) { Ok(signed_op) => { crdt_state::apply_remote_op(signed_op); } Err(e) => { slog!("[crdt-sync] Bad binary frame from peer: {e}"); } } } // ── Rendezvous client ─────────────────────────────────────────────── /// Number of consecutive connection failures before escalating from WARN to ERROR. pub const RENDEZVOUS_ERROR_THRESHOLD: u32 = 10; /// Spawn a background task that connects to the configured rendezvous /// peer and exchanges CRDT ops bidirectionally. /// /// The client reconnects with exponential backoff if the connection drops. /// Individual failures are logged at WARN; after [`RENDEZVOUS_ERROR_THRESHOLD`] /// consecutive failures the log level escalates to ERROR. /// /// When `token` is provided it is appended to the upgrade URL as /// `?token=` so the server's bearer-token check is satisfied. This /// reuses the existing `--join-token` / `HUSKIES_JOIN_TOKEN` plumbing on the /// agent side. 
pub fn spawn_rendezvous_client(url: String, token: Option) { tokio::spawn(async move { let mut backoff_secs = 1u64; let mut consecutive_failures: u32 = 0; loop { slog!("[crdt-sync] Connecting to rendezvous peer: {url}"); match connect_and_sync(&url, token.as_deref()).await { Ok(()) => { slog!("[crdt-sync] Rendezvous connection closed cleanly"); backoff_secs = 1; consecutive_failures = 0; } Err(e) => { consecutive_failures += 1; if consecutive_failures >= RENDEZVOUS_ERROR_THRESHOLD { slog_error!( "[crdt-sync] Rendezvous peer unreachable ({consecutive_failures} consecutive failures): {e}" ); } else { slog_warn!( "[crdt-sync] Rendezvous connection error (attempt {consecutive_failures}): {e}" ); } } } slog!("[crdt-sync] Reconnecting in {backoff_secs}s..."); tokio::time::sleep(std::time::Duration::from_secs(backoff_secs)).await; backoff_secs = (backoff_secs * 2).min(30); } }); } /// Connect to a remote sync endpoint and exchange ops until disconnect. /// /// When `token` is supplied it is appended as `?token=` to the /// connection URL so the server's bearer-token check passes. pub(crate) async fn connect_and_sync(url: &str, token: Option<&str>) -> Result<(), String> { let connect_url = match token { Some(t) => { if url.contains('?') { format!("{url}&token={t}") } else { format!("{url}?token={t}") } } None => url.to_string(), }; let (ws_stream, _) = tokio_tungstenite::connect_async(connect_url.as_str()) .await .map_err(|e| format!("WebSocket connect failed: {e}"))?; let (mut sink, mut stream) = ws_stream.split(); slog!("[crdt-sync] Connected to rendezvous peer, awaiting challenge"); // ── Step 1: Receive challenge from listener ─────────────────── use tokio_tungstenite::tungstenite::Message as TungsteniteMsg; let challenge_frame = tokio::time::timeout( std::time::Duration::from_secs(AUTH_TIMEOUT_SECS), stream.next(), ) .await .map_err(|_| "Auth timeout waiting for challenge".to_string())? .ok_or_else(|| "Connection closed before challenge".to_string())? 
.map_err(|e| format!("WebSocket read error: {e}"))?; let challenge_text = match challenge_frame { TungsteniteMsg::Text(t) => t.to_string(), _ => return Err("Expected text frame for challenge".to_string()), }; let challenge_msg: ChallengeMessage = serde_json::from_str(&challenge_text) .map_err(|e| format!("Invalid challenge message: {e}"))?; if challenge_msg.r#type != "challenge" { return Err(format!( "Expected challenge message, got type={}", challenge_msg.r#type )); } // ── Step 2: Sign challenge and send auth reply ──────────────── let (pubkey_hex, signature_hex) = crdt_state::sign_challenge(&challenge_msg.nonce) .ok_or_else(|| "CRDT not initialised — cannot sign challenge".to_string())?; let auth_msg = AuthMessage { r#type: "auth".to_string(), pubkey_hex, signature_hex, }; let auth_json = serde_json::to_string(&auth_msg).map_err(|e| format!("Serialize auth: {e}"))?; sink.send(TungsteniteMsg::Text(auth_json.into())) .await .map_err(|e| format!("Send auth failed: {e}"))?; slog!("[crdt-sync] Auth reply sent, waiting for sync data"); // v2 protocol: send our vector clock. let our_clock = crdt_state::our_vector_clock().unwrap_or_default(); let clock_msg = SyncMessage::Clock { clock: our_clock }; if let Ok(json) = serde_json::to_string(&clock_msg) { sink.send(TungsteniteMsg::Text(json.into())) .await .map_err(|e| format!("Send clock failed: {e}"))?; } // Wait for the server's first sync message. let first_msg = tokio::time::timeout( std::time::Duration::from_secs(AUTH_TIMEOUT_SECS), wait_for_rendezvous_sync_text(&mut stream), ) .await .map_err(|_| "Timeout waiting for server sync message".to_string())?; match first_msg { Some(SyncMessage::Clock { clock: peer_clock }) => { // v2 server — send only the ops the server is missing. 
let delta = crdt_state::ops_since(&peer_clock).unwrap_or_default(); slog!( "[crdt-sync] v2 delta sync: sending {} ops to server (server missing)", delta.len() ); let msg = SyncMessage::Bulk { ops: delta }; if let Ok(json) = serde_json::to_string(&msg) { sink.send(TungsteniteMsg::Text(json.into())) .await .map_err(|e| format!("Send delta failed: {e}"))?; } } Some(SyncMessage::Bulk { ops }) => { // v1 server — apply their bulk and send our full bulk. let mut applied = 0u64; for op_json in &ops { if let Ok(signed_op) = serde_json::from_str::(op_json) && crdt_state::apply_remote_op(signed_op) { applied += 1; } } slog!( "[crdt-sync] v1 bulk sync: received {} ops from server, applied {applied}", ops.len() ); if let Some(all) = crdt_state::all_ops_json() { let msg = SyncMessage::Bulk { ops: all }; if let Ok(json) = serde_json::to_string(&msg) { sink.send(TungsteniteMsg::Text(json.into())) .await .map_err(|e| format!("Send bulk failed: {e}"))?; } } } _ => { // Fallback — send full bulk. slog!("[crdt-sync] No sync message from server; sending full bulk as fallback"); if let Some(all) = crdt_state::all_ops_json() { let msg = SyncMessage::Bulk { ops: all }; if let Ok(json) = serde_json::to_string(&msg) { sink.send(TungsteniteMsg::Text(json.into())) .await .map_err(|e| format!("Send bulk failed: {e}"))?; } } } } // Bulk-delta phase complete — signal the server that we are ready for // real-time op streaming. if let Ok(json) = serde_json::to_string(&SyncMessage::Ready) { sink.send(TungsteniteMsg::Text(json.into())) .await .map_err(|e| format!("Send ready failed: {e}"))?; } // Subscribe to new local ops. let Some(mut op_rx) = crdt_state::subscribe_ops() else { return Err("CRDT not initialised".to_string()); }; // Buffer for locally-generated ops produced before the server's `ready` // arrives. Flushed in-order once the server signals catch-up. 
let mut peer_ready = false; let mut op_buffer: Vec = Vec::new(); // ── Keepalive state ─────────────────────────────────────────────── let mut pong_deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(PONG_TIMEOUT_SECS); let mut ping_ticker = tokio::time::interval_at( tokio::time::Instant::now() + std::time::Duration::from_secs(PING_INTERVAL_SECS), std::time::Duration::from_secs(PING_INTERVAL_SECS), ); loop { tokio::select! { // Send periodic Ping and enforce Pong timeout. _ = ping_ticker.tick() => { if tokio::time::Instant::now() >= pong_deadline { slog_warn!( "[crdt-sync] No pong from rendezvous peer {} in {}s; disconnecting", url, PONG_TIMEOUT_SECS ); return Err(format!( "Keepalive timeout: no pong from {url} in {PONG_TIMEOUT_SECS}s" )); } use tokio_tungstenite::tungstenite::Message as TungsteniteMsg; if sink.send(TungsteniteMsg::Ping(bytes::Bytes::new())).await.is_err() { break; } } result = op_rx.recv() => { match result { Ok(signed_op) => { if peer_ready { // Encode via wire codec and send as binary frame. let bytes = crdt_wire::encode(&signed_op); use tokio_tungstenite::tungstenite::Message as TungsteniteMsg; if sink.send(TungsteniteMsg::Binary(bytes.into())).await.is_err() { break; } } else { // Buffer until the server signals ready. op_buffer.push(signed_op); } } Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { slog!("[crdt-sync] Slow rendezvous link lagged {n} ops; disconnecting"); break; } Err(_) => break, } } frame = stream.next() => { match frame { Some(Ok(tokio_tungstenite::tungstenite::Message::Pong(_))) => { // Reset the pong deadline on every Pong received. pong_deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(PONG_TIMEOUT_SECS); } Some(Ok(tokio_tungstenite::tungstenite::Message::Ping(_))) => { // tungstenite auto-responds to Ping with Pong at the // protocol level; no manual response needed here. 
} Some(Ok(tokio_tungstenite::tungstenite::Message::Text(text))) => { // Check for the ready signal before other text frames. if let Ok(SyncMessage::Ready) = serde_json::from_str(text.as_ref()) { peer_ready = true; slog!("[crdt-sync] Server ready; flushing {} buffered ops", op_buffer.len()); let mut flush_ok = true; for op in op_buffer.drain(..) { let bytes = crdt_wire::encode(&op); use tokio_tungstenite::tungstenite::Message as TungsteniteMsg; if sink.send(TungsteniteMsg::Binary(bytes.into())).await.is_err() { flush_ok = false; break; } } if !flush_ok { break; } } else { handle_incoming_text(text.as_ref()); } } Some(Ok(tokio_tungstenite::tungstenite::Message::Binary(bytes))) => { // Real-time op — applied immediately regardless of ready state. handle_incoming_binary(&bytes); } Some(Ok(tokio_tungstenite::tungstenite::Message::Close(_))) | None => break, Some(Err(e)) => { slog!("[crdt-sync] Rendezvous read error: {e}"); break; } _ => {} } } } } Ok(()) } /// Wait for the next text-frame sync message from a tungstenite stream, /// handling Ping/Pong transparently. /// /// Returns `None` on connection close or read error. async fn wait_for_rendezvous_sync_text( stream: &mut futures::stream::SplitStream< tokio_tungstenite::WebSocketStream< tokio_tungstenite::MaybeTlsStream, >, >, ) -> Option { use tokio_tungstenite::tungstenite::Message as TungsteniteMsg; loop { match stream.next().await { Some(Ok(TungsteniteMsg::Text(text))) => { return serde_json::from_str(text.as_ref()).ok(); } Some(Ok(TungsteniteMsg::Ping(_) | TungsteniteMsg::Pong(_))) => continue, _ => return None, } } } // ── Tests ──────────────────────────────────────────────────────────── #[cfg(test)] mod tests;