//! Rendezvous client: connect to a remote peer, authenticate, and exchange CRDT ops. use bft_json_crdt::json_crdt::SignedOp; use futures::{SinkExt, StreamExt}; use crate::crdt_state; use crate::crdt_wire; use crate::slog; use crate::slog_error; use crate::slog_warn; use super::auth; use super::dispatch::{handle_incoming_binary, handle_incoming_text}; use super::rpc::try_handle_rpc_text; use super::wire::{AuthMessage, ChallengeMessage, HelloMessage, ServerAuthMessage, SyncMessage}; use super::{AUTH_TIMEOUT_SECS, PING_INTERVAL_SECS, PONG_TIMEOUT_SECS}; #[allow(unused_imports)] use auth::{add_join_token, init_token_auth}; // needed by tests // ── Rendezvous client ─────────────────────────────────────────────── /// Number of consecutive connection failures before escalating from WARN to ERROR. pub const RENDEZVOUS_ERROR_THRESHOLD: u32 = 10; /// Spawn a background task that connects to the configured rendezvous /// peer and exchanges CRDT ops bidirectionally. /// /// The client reconnects with exponential backoff if the connection drops. /// Individual failures are logged at WARN; after [`RENDEZVOUS_ERROR_THRESHOLD`] /// consecutive failures the log level escalates to ERROR. /// /// When `token` is provided it is appended to the upgrade URL as /// `?token=` so the server's bearer-token check is satisfied. This /// reuses the existing `--join-token` / `HUSKIES_JOIN_TOKEN` plumbing on the /// agent side. 
///
/// # Arguments
///
/// * `url` — WebSocket URL of the rendezvous peer; moved into the spawned task
///   and reused for every reconnect attempt.
/// * `token` — optional join token forwarded to [`connect_and_sync`] on each
///   attempt.
///
/// The spawned task loops forever; its `JoinHandle` is dropped, so the task is
/// detached.
pub fn spawn_rendezvous_client(url: String, token: Option<String>) {
    tokio::spawn(async move {
        // Delay before the next attempt, doubled after every attempt and
        // capped at 30 seconds. Reset to 1 after a cleanly closed session.
        let mut backoff_secs = 1u64;
        let mut consecutive_failures: u32 = 0;

        loop {
            slog!("[crdt-sync] Connecting to rendezvous peer: {url}");

            match connect_and_sync(&url, token.as_deref()).await {
                Ok(()) => {
                    slog!("[crdt-sync] Rendezvous connection closed cleanly");
                    backoff_secs = 1;
                    consecutive_failures = 0;
                }
                Err(e) => {
                    // Saturate rather than overflow: this task is long-lived
                    // and an overflow panic would silently end reconnection.
                    consecutive_failures = consecutive_failures.saturating_add(1);
                    if consecutive_failures >= RENDEZVOUS_ERROR_THRESHOLD {
                        slog_error!(
                            "[crdt-sync] Rendezvous peer unreachable ({consecutive_failures} consecutive failures): {e}"
                        );
                    } else {
                        slog_warn!(
                            "[crdt-sync] Rendezvous connection error (attempt {consecutive_failures}): {e}"
                        );
                    }
                }
            }

            slog!("[crdt-sync] Reconnecting in {backoff_secs}s...");
            tokio::time::sleep(std::time::Duration::from_secs(backoff_secs)).await;
            backoff_secs = (backoff_secs * 2).min(30);
        }
    });
}

/// Connect to a remote sync endpoint and exchange ops until disconnect.
///
/// When `token` is supplied it is appended as `?token=<token>` to the
/// connection URL so the server's bearer-token check passes.
pub(crate) async fn connect_and_sync(url: &str, token: Option<&str>) -> Result<(), String> { let connect_url = match token { Some(t) => { if url.contains('?') { format!("{url}&token={t}") } else { format!("{url}?token={t}") } } None => url.to_string(), }; let (ws_stream, _) = tokio_tungstenite::connect_async(connect_url.as_str()) .await .map_err(|e| format!("WebSocket connect failed: {e}"))?; let (mut sink, mut stream) = ws_stream.split(); slog!("[crdt-sync] Connected to rendezvous peer, starting mutual-auth handshake"); use tokio_tungstenite::tungstenite::Message as TungsteniteMsg; // ── Step 1: Send hello with a fresh client nonce ────────────── let client_nonce = crate::node_identity::generate_challenge(); let hello = HelloMessage { r#type: "hello".to_string(), nonce: client_nonce.clone(), }; let hello_json = serde_json::to_string(&hello).map_err(|e| format!("Serialize hello: {e}"))?; sink.send(TungsteniteMsg::Text(hello_json.into())) .await .map_err(|e| format!("Send hello failed: {e}"))?; slog!("[crdt-sync] Hello sent, awaiting server_auth"); // ── Step 2: Receive server_auth from the responding node ────── let server_auth_frame = tokio::time::timeout( std::time::Duration::from_secs(AUTH_TIMEOUT_SECS), stream.next(), ) .await .map_err(|_| "Auth timeout waiting for server_auth".to_string())? .ok_or_else(|| "Connection closed before server_auth".to_string())? 
.map_err(|e| format!("WebSocket read error: {e}"))?; let server_auth_text = match server_auth_frame { TungsteniteMsg::Text(t) => t.to_string(), _ => return Err("Expected text frame for server_auth".to_string()), }; let server_auth: ServerAuthMessage = serde_json::from_str(&server_auth_text) .map_err(|e| format!("Invalid server_auth message: {e}"))?; if server_auth.r#type != "server_auth" { return Err(format!( "Expected server_auth message, got type={}", server_auth.r#type )); } // ── Step 3: Verify server's signature and check trusted-key list ─ let versioned_challenge = format!("huskies-v1:{client_nonce}"); let server_sig_valid = crate::node_identity::verify_message_strict( &server_auth.pubkey_hex, versioned_challenge.as_bytes(), &server_auth.signature_hex, ); let server_key_trusted = auth::trusted_keys() .iter() .any(|k| k == &server_auth.pubkey_hex); if !server_sig_valid || !server_key_trusted { slog!( "[crdt-sync] Server auth failed \ (sig_valid={server_sig_valid}, key_trusted={server_key_trusted}, \ server_pubkey={})", server_auth.pubkey_hex ); return Err(format!( "Server auth rejected: sig_valid={server_sig_valid}, \ key_trusted={server_key_trusted}, server_pubkey={}", server_auth.pubkey_hex )); } slog!( "[crdt-sync] Server authenticated: {:.12}…", &server_auth.pubkey_hex ); // ── Step 4: Receive challenge from the responding node ──────── let challenge_frame = tokio::time::timeout( std::time::Duration::from_secs(AUTH_TIMEOUT_SECS), stream.next(), ) .await .map_err(|_| "Auth timeout waiting for challenge".to_string())? .ok_or_else(|| "Connection closed before challenge".to_string())? 
.map_err(|e| format!("WebSocket read error: {e}"))?; let challenge_text = match challenge_frame { TungsteniteMsg::Text(t) => t.to_string(), _ => return Err("Expected text frame for challenge".to_string()), }; let challenge_msg: ChallengeMessage = serde_json::from_str(&challenge_text) .map_err(|e| format!("Invalid challenge message: {e}"))?; if challenge_msg.r#type != "challenge" { return Err(format!( "Expected challenge message, got type={}", challenge_msg.r#type )); } // ── Step 5: Sign challenge and send auth reply ──────────────── let (pubkey_hex, signature_hex) = crdt_state::sign_challenge(&challenge_msg.nonce) .ok_or_else(|| "CRDT not initialised — cannot sign challenge".to_string())?; let auth_msg = AuthMessage { r#type: "auth".to_string(), pubkey_hex, signature_hex, }; let auth_json = serde_json::to_string(&auth_msg).map_err(|e| format!("Serialize auth: {e}"))?; sink.send(TungsteniteMsg::Text(auth_json.into())) .await .map_err(|e| format!("Send auth failed: {e}"))?; slog!("[crdt-sync] Auth reply sent, waiting for sync data"); // v2 protocol: send our vector clock. let our_clock = crdt_state::our_vector_clock().unwrap_or_default(); let clock_msg = SyncMessage::Clock { clock: our_clock }; if let Ok(json) = serde_json::to_string(&clock_msg) { sink.send(TungsteniteMsg::Text(json.into())) .await .map_err(|e| format!("Send clock failed: {e}"))?; } // Wait for the server's first sync message. let first_msg = tokio::time::timeout( std::time::Duration::from_secs(AUTH_TIMEOUT_SECS), wait_for_rendezvous_sync_text(&mut stream), ) .await .map_err(|_| "Timeout waiting for server sync message".to_string())?; match first_msg { Some(SyncMessage::Clock { clock: peer_clock }) => { // v2 server — send only the ops the server is missing. 
let delta = crdt_state::ops_since(&peer_clock).unwrap_or_default(); slog!( "[crdt-sync] v2 delta sync: sending {} ops to server (server missing)", delta.len() ); let msg = SyncMessage::Bulk { ops: delta }; if let Ok(json) = serde_json::to_string(&msg) { sink.send(TungsteniteMsg::Text(json.into())) .await .map_err(|e| format!("Send delta failed: {e}"))?; } } Some(SyncMessage::Bulk { ops }) => { // v1 server — apply their bulk and send our full bulk. let mut applied = 0u64; for op_json in &ops { if let Ok(signed_op) = serde_json::from_str::(op_json) && crdt_state::apply_remote_op(signed_op) { applied += 1; } } slog!( "[crdt-sync] v1 bulk sync: received {} ops from server, applied {applied}", ops.len() ); if let Some(all) = crdt_state::all_ops_json() { let msg = SyncMessage::Bulk { ops: all }; if let Ok(json) = serde_json::to_string(&msg) { sink.send(TungsteniteMsg::Text(json.into())) .await .map_err(|e| format!("Send bulk failed: {e}"))?; } } } _ => { // Fallback — send full bulk. slog!("[crdt-sync] No sync message from server; sending full bulk as fallback"); if let Some(all) = crdt_state::all_ops_json() { let msg = SyncMessage::Bulk { ops: all }; if let Ok(json) = serde_json::to_string(&msg) { sink.send(TungsteniteMsg::Text(json.into())) .await .map_err(|e| format!("Send bulk failed: {e}"))?; } } } } // Bulk-delta phase complete — signal the server that we are ready for // real-time op streaming. if let Ok(json) = serde_json::to_string(&SyncMessage::Ready) { sink.send(TungsteniteMsg::Text(json.into())) .await .map_err(|e| format!("Send ready failed: {e}"))?; } // Subscribe to new local ops. let Some(mut op_rx) = crdt_state::subscribe_ops() else { return Err("CRDT not initialised".to_string()); }; // Buffer for locally-generated ops produced before the server's `ready` // arrives. Flushed in-order once the server signals catch-up. 
let mut peer_ready = false; let mut op_buffer: Vec = Vec::new(); // ── Keepalive state ─────────────────────────────────────────────── let mut pong_deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(PONG_TIMEOUT_SECS); let mut ping_ticker = tokio::time::interval_at( tokio::time::Instant::now() + std::time::Duration::from_secs(PING_INTERVAL_SECS), std::time::Duration::from_secs(PING_INTERVAL_SECS), ); loop { tokio::select! { // Send periodic Ping and enforce Pong timeout. _ = ping_ticker.tick() => { if tokio::time::Instant::now() >= pong_deadline { slog_warn!( "[crdt-sync] No pong from rendezvous peer {} in {}s; disconnecting", url, PONG_TIMEOUT_SECS ); return Err(format!( "Keepalive timeout: no pong from {url} in {PONG_TIMEOUT_SECS}s" )); } use tokio_tungstenite::tungstenite::Message as TungsteniteMsg; if sink.send(TungsteniteMsg::Ping(bytes::Bytes::new())).await.is_err() { break; } } result = op_rx.recv() => { match result { Ok(signed_op) => { if peer_ready { // Encode via wire codec and send as binary frame. let bytes = crdt_wire::encode(&signed_op); use tokio_tungstenite::tungstenite::Message as TungsteniteMsg; if sink.send(TungsteniteMsg::Binary(bytes.into())).await.is_err() { break; } } else { // Buffer until the server signals ready. op_buffer.push(signed_op); } } Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { slog!("[crdt-sync] Slow rendezvous link lagged {n} ops; disconnecting"); break; } Err(_) => break, } } frame = stream.next() => { match frame { Some(Ok(tokio_tungstenite::tungstenite::Message::Pong(_))) => { // Reset the pong deadline on every Pong received. pong_deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(PONG_TIMEOUT_SECS); } Some(Ok(tokio_tungstenite::tungstenite::Message::Ping(_))) => { // tungstenite auto-responds to Ping with Pong at the // protocol level; no manual response needed here. 
} Some(Ok(tokio_tungstenite::tungstenite::Message::Text(text))) => { // Check for the ready signal before other text frames. if let Ok(SyncMessage::Ready) = serde_json::from_str(text.as_ref()) { peer_ready = true; slog!("[crdt-sync] Server ready; flushing {} buffered ops", op_buffer.len()); let mut flush_ok = true; for op in op_buffer.drain(..) { let bytes = crdt_wire::encode(&op); use tokio_tungstenite::tungstenite::Message as TungsteniteMsg; if sink.send(TungsteniteMsg::Binary(bytes.into())).await.is_err() { flush_ok = false; break; } } if !flush_ok { break; } } else if let Some(rpc_resp) = try_handle_rpc_text(text.as_ref()).await { // RPC request from the peer — dispatch and reply. use tokio_tungstenite::tungstenite::Message as TungsteniteMsg; if let Ok(json) = serde_json::to_string(&rpc_resp) && sink.send(TungsteniteMsg::Text(json.into())).await.is_err() { break; } } else { handle_incoming_text(text.as_ref()); } } Some(Ok(tokio_tungstenite::tungstenite::Message::Binary(bytes))) => { // Real-time op — applied immediately regardless of ready state. handle_incoming_binary(&bytes); } Some(Ok(tokio_tungstenite::tungstenite::Message::Close(_))) | None => break, Some(Err(e)) => { slog!("[crdt-sync] Rendezvous read error: {e}"); break; } _ => {} } } } } Ok(()) } /// Wait for the next text-frame sync message from a tungstenite stream, /// handling Ping/Pong transparently. /// /// Returns `None` on connection close or read error. 
async fn wait_for_rendezvous_sync_text( stream: &mut futures::stream::SplitStream< tokio_tungstenite::WebSocketStream< tokio_tungstenite::MaybeTlsStream, >, >, ) -> Option { use tokio_tungstenite::tungstenite::Message as TungsteniteMsg; loop { match stream.next().await { Some(Ok(TungsteniteMsg::Text(text))) => { return serde_json::from_str(text.as_ref()).ok(); } Some(Ok(TungsteniteMsg::Ping(_) | TungsteniteMsg::Pong(_))) => continue, _ => return None, } } } // ── Tests ──────────────────────────────────────────────────────────── #[cfg(test)] mod tests { #[allow(unused_imports)] use super::*; #[test] fn config_rendezvous_parsed_from_toml() { let toml_str = r#" rendezvous = "ws://remote:3001/crdt-sync" [[agent]] name = "test" "#; let config: crate::config::ProjectConfig = toml::from_str(toml_str).unwrap(); assert_eq!( config.rendezvous.as_deref(), Some("ws://remote:3001/crdt-sync") ); } #[test] fn config_rendezvous_defaults_to_none() { let config = crate::config::ProjectConfig::default(); assert!(config.rendezvous.is_none()); } #[test] fn failure_counter_warn_below_threshold() { let threshold = RENDEZVOUS_ERROR_THRESHOLD; let mut consecutive_failures: u32 = 0; // First threshold-1 failures are below the ERROR threshold. for _ in 0..(threshold - 1) { consecutive_failures += 1; assert!( consecutive_failures < threshold, "failure {consecutive_failures} must be below ERROR threshold {threshold}" ); } } #[test] fn failure_counter_error_at_threshold() { let threshold = RENDEZVOUS_ERROR_THRESHOLD; let consecutive_failures: u32 = threshold; assert!( consecutive_failures >= threshold, "failure {consecutive_failures} must reach or exceed ERROR threshold {threshold}" ); } #[test] fn failure_counter_resets_on_success() { let threshold = RENDEZVOUS_ERROR_THRESHOLD; // Simulate sustained failure. let mut consecutive_failures: u32 = threshold + 5; assert!(consecutive_failures >= threshold); // Simulate a clean reconnect. 
consecutive_failures = 0; assert_eq!( consecutive_failures, 0, "counter must reset to 0 on success" ); // Next error is attempt 1 — well below the ERROR threshold. consecutive_failures += 1; assert!( consecutive_failures < threshold, "first failure after reset must be below ERROR threshold" ); } #[test] fn error_threshold_is_ten() { assert_eq!( RENDEZVOUS_ERROR_THRESHOLD, 10, "ERROR escalation threshold must be 10 consecutive failures" ); } #[test] fn rendezvous_url_with_token_appended() { let base = "ws://host:3001/crdt-sync"; let token = "my-secret-token"; let url_with_token = if base.contains('?') { format!("{base}&token={token}") } else { format!("{base}?token={token}") }; assert_eq!( url_with_token, "ws://host:3001/crdt-sync?token=my-secret-token" ); // With existing query params. let base_with_query = "ws://host:3001/crdt-sync?foo=bar"; let url_appended = if base_with_query.contains('?') { format!("{base_with_query}&token={token}") } else { format!("{base_with_query}?token={token}") }; assert_eq!( url_appended, "ws://host:3001/crdt-sync?foo=bar&token=my-secret-token" ); } #[test] fn rendezvous_url_without_token_unchanged() { let base = "ws://host:3001/crdt-sync"; let token: Option<&str> = None; let connect_url = match token { Some(t) => format!("{base}?token={t}"), None => base.to_string(), }; assert_eq!(connect_url, base); } }