From 69930fb29f7eea2f3e4e3aeaf89facf458e87ba6 Mon Sep 17 00:00:00 2001 From: dave Date: Wed, 29 Apr 2026 12:01:07 +0000 Subject: [PATCH] huskies: merge 837 --- server/src/crdt_sync/handshake.rs | 918 ------------------------ server/src/crdt_sync/handshake/mod.rs | 185 +++++ server/src/crdt_sync/handshake/tests.rs | 734 +++++++++++++++++++ 3 files changed, 919 insertions(+), 918 deletions(-) delete mode 100644 server/src/crdt_sync/handshake.rs create mode 100644 server/src/crdt_sync/handshake/mod.rs create mode 100644 server/src/crdt_sync/handshake/tests.rs diff --git a/server/src/crdt_sync/handshake.rs b/server/src/crdt_sync/handshake.rs deleted file mode 100644 index a0eba4b1..00000000 --- a/server/src/crdt_sync/handshake.rs +++ /dev/null @@ -1,918 +0,0 @@ -//! Auth handshake for the server-side `/crdt-sync` WebSocket. -//! -//! # Extended mutual-auth handshake protocol -//! -//! ```text -//! Connecting peer (client): Responding node (server): -//! hello(nonce) ──────────────────► receive hello -//! ◄────────────────── server_auth(pubkey, sign("huskies-v1:{nonce}")) -//! verify server sig + trusted_keys -//! ◄────────────────── challenge(server_nonce) -//! auth(pubkey, sign(server_nonce)) ► -//! verify client sig + trusted_keys -//! ``` -//! -//! Both sides verify the peer's pubkey against `trusted_keys`. A peer whose -//! pubkey is absent from the allow-list is rejected with close code 4002. - -#![allow(unused_imports, dead_code)] -use futures::{SinkExt, StreamExt}; -use poem::web::websocket::Message as WsMessage; - -use crate::crdt_state; -use crate::node_identity; -use crate::slog; - -use super::AUTH_TIMEOUT_SECS; -use super::auth::trusted_keys; -use super::wire::{AuthMessage, ChallengeMessage, HelloMessage, ServerAuthMessage}; - -/// Perform the extended mutual-auth handshake for a freshly-upgraded WebSocket -/// connection. -/// -/// **Protocol (server/responding-node side):** -/// 1. Receive `hello` from the connecting peer (contains client nonce). -/// 2. Sign `"huskies-v1:{nonce}"` and send `server_auth` (this node's pubkey + -/// signature) back to the connecting peer. -/// 3. Send a fresh challenge nonce to the connecting peer. -/// 4. Wait up to [`AUTH_TIMEOUT_SECS`] for a signed `auth` reply. -/// 5. Verify the connecting peer's signature and check its pubkey against the -/// trusted-key allow-list. -/// -/// Returns `Some(AuthMessage)` on success. On failure the connection has -/// already been closed with the appropriate close code (`auth_timeout` or -/// `auth_failed`); the caller should simply return. -pub(super) async fn perform_auth_handshake( - sink: &mut futures::stream::SplitSink, - stream: &mut futures::stream::SplitStream, -) -> Option { - // ── Step 1: Receive hello from connecting peer ─────────────────── - let hello_result = tokio::time::timeout( - std::time::Duration::from_secs(AUTH_TIMEOUT_SECS), - stream.next(), - ) - .await; - - let hello_text = match hello_result { - Ok(Some(Ok(WsMessage::Text(text)))) => text, - Ok(_) | Err(_) => { - slog!("[crdt-sync] No hello from peer — closing"); - close_with_auth_failed(sink).await; - return None; - } - }; - - let hello: HelloMessage = match serde_json::from_str::(&hello_text) { - Ok(m) if m.r#type == "hello" => m, - _ => { - slog!("[crdt-sync] Invalid hello message from peer"); - close_with_auth_failed(sink).await; - return None; - } - }; - - // ── Step 2: Sign versioned challenge and send server_auth ──────── - let (server_pubkey_hex, server_sig_hex) = - match crdt_state::sign_versioned_challenge(&hello.nonce) { - Some(v) => v, - None => { - slog!("[crdt-sync] CRDT not initialised — cannot produce server_auth"); - close_with_auth_failed(sink).await; - return None; - } - }; - - let server_auth = ServerAuthMessage { - r#type: "server_auth".to_string(), - pubkey_hex: server_pubkey_hex, - signature_hex: server_sig_hex, - }; - let server_auth_json = serde_json::to_string(&server_auth).ok()?; - if sink.send(WsMessage::Text(server_auth_json)).await.is_err() { - return None; - } - - // ── Step 3: Send challenge nonce to connecting peer ────────────── - let challenge = node_identity::generate_challenge(); - let challenge_msg = ChallengeMessage { - r#type: "challenge".to_string(), - nonce: challenge.clone(), - }; - let challenge_json = serde_json::to_string(&challenge_msg).ok()?; - if sink.send(WsMessage::Text(challenge_json)).await.is_err() { - return None; - } - - // ── Step 4: Await signed auth reply from connecting peer ───────── - let auth_result = tokio::time::timeout( - std::time::Duration::from_secs(AUTH_TIMEOUT_SECS), - stream.next(), - ) - .await; - - let auth_text = match auth_result { - Ok(Some(Ok(WsMessage::Text(text)))) => text, - Ok(_) | Err(_) => { - slog!("[crdt-sync] Auth timeout or connection lost during handshake"); - let _ = sink - .send(WsMessage::Close(Some(( - poem::web::websocket::CloseCode::from(4001), - "auth_timeout".to_string(), - )))) - .await; - let _ = sink.close().await; - return None; - } - }; - - let auth_msg: AuthMessage = match serde_json::from_str(&auth_text) { - Ok(m) => m, - Err(_) => { - slog!("[crdt-sync] Invalid auth message from peer"); - close_with_auth_failed(sink).await; - return None; - } - }; - - // ── Step 5: Verify signature and check trusted-key allow-list ──── - let key_trusted = trusted_keys().iter().any(|k| k == &auth_msg.pubkey_hex); - if !key_trusted { - slog!( - "[crdt-sync] Auth rejected: peer pubkey not in trusted_keys: {}", - auth_msg.pubkey_hex - ); - close_with_auth_failed(sink).await; - return None; - } - - let sig_valid = - node_identity::verify_challenge(&auth_msg.pubkey_hex, &challenge, &auth_msg.signature_hex); - if !sig_valid { - slog!( - "[crdt-sync] Auth rejected: invalid signature from peer {:.12}…", - &auth_msg.pubkey_hex - ); - close_with_auth_failed(sink).await; - return None; - } - - slog!( - "[crdt-sync] Peer authenticated: {:.12}…", - &auth_msg.pubkey_hex - ); - - Some(auth_msg) -} - -/// Close the WebSocket with a generic `auth_failed` reason. -/// -/// The close reason is intentionally the same for all auth failures -/// (bad signature, untrusted key, malformed message) to avoid leaking -/// which check failed. -async fn close_with_auth_failed( - sink: &mut futures::stream::SplitSink, -) { - let _ = sink - .send(WsMessage::Close(Some(( - poem::web::websocket::CloseCode::from(4002), - "auth_failed".to_string(), - )))) - .await; - let _ = sink.close().await; -} - -#[cfg(test)] -mod tests { - use super::*; - use bft_json_crdt::keypair::make_keypair; - - // ── AuthListenerResult ─────────────────────────────────────────── - #[allow(dead_code)] - #[derive(Debug)] - enum AuthListenerResult { - Authenticated(String), - AuthFailed(String), - AuthTimeout, - ConnectionLost, - PeerClosedEarly(Option), - } - - /// Start a test server that implements the full extended mutual-auth handshake. - /// - /// The server: - /// 1. Receives `hello` from the connecting peer. - /// 2. Signs the versioned challenge with `listener_kp`. - /// 3. Sends `server_auth` back. - /// 4. Sends a challenge nonce to the connecting peer. - /// 5. Receives and verifies the connecting peer's `auth` reply. - /// 6. Checks the peer pubkey against `trusted_keys`. - /// 7. If auth passes, sends a bulk-sync message and reports success. - /// - /// Returns `(addr, listener_pubkey_hex, result_rx)`. - async fn start_auth_listener( - trusted_keys: Vec, - ) -> ( - std::net::SocketAddr, - String, - tokio::sync::oneshot::Receiver, - ) { - use tokio::net::TcpListener; - use tokio_tungstenite::accept_async; - - let listener_kp = make_keypair(); - let listener_pubkey = crate::node_identity::public_key_hex(&listener_kp); - - let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); - let addr = listener.local_addr().unwrap(); - - let (result_tx, result_rx) = tokio::sync::oneshot::channel(); - - let listener_pubkey_clone = listener_pubkey.clone(); - tokio::spawn(async move { - let (tcp_stream, _) = listener.accept().await.unwrap(); - let ws_stream = accept_async(tcp_stream).await.unwrap(); - let (mut sink, mut stream) = ws_stream.split(); - - use tokio_tungstenite::tungstenite::Message as TMsg; - - // Step 1: Receive hello from connecting peer. - let hello_frame = - tokio::time::timeout(std::time::Duration::from_secs(10), stream.next()).await; - let hello_text = match hello_frame { - Ok(Some(Ok(TMsg::Text(t)))) => t.to_string(), - Ok(Some(Ok(TMsg::Close(reason)))) => { - let _ = result_tx.send(AuthListenerResult::PeerClosedEarly( - reason.map(|r| r.reason.to_string()), - )); - return; - } - _ => { - let _ = result_tx.send(AuthListenerResult::ConnectionLost); - return; - } - }; - let hello: HelloMessage = match serde_json::from_str::(&hello_text) { - Ok(m) if m.r#type == "hello" => m, - _ => { - let _ = result_tx.send(AuthListenerResult::AuthFailed("bad_hello".into())); - return; - } - }; - - // Step 2: Sign versioned challenge and send server_auth. - let versioned = format!("huskies-v1:{}", hello.nonce); - let server_sig = crate::node_identity::sign_challenge(&listener_kp, &versioned); - let server_auth = ServerAuthMessage { - r#type: "server_auth".to_string(), - pubkey_hex: listener_pubkey_clone.clone(), - signature_hex: server_sig, - }; - let server_auth_json = serde_json::to_string(&server_auth).unwrap(); - if sink - .send(TMsg::Text(server_auth_json.into())) - .await - .is_err() - { - let _ = result_tx.send(AuthListenerResult::ConnectionLost); - return; - } - - // Step 3: Send challenge. - let challenge = crate::node_identity::generate_challenge(); - let challenge_msg = ChallengeMessage { - r#type: "challenge".to_string(), - nonce: challenge.clone(), - }; - let challenge_json = serde_json::to_string(&challenge_msg).unwrap(); - if sink.send(TMsg::Text(challenge_json.into())).await.is_err() { - let _ = result_tx.send(AuthListenerResult::ConnectionLost); - return; - } - - // Step 4: Await auth reply. - let auth_frame = - tokio::time::timeout(std::time::Duration::from_secs(10), stream.next()).await; - let auth_text = match auth_frame { - Ok(Some(Ok(TMsg::Text(t)))) => t.to_string(), - Ok(Some(Ok(TMsg::Close(reason)))) => { - let _ = result_tx.send(AuthListenerResult::PeerClosedEarly( - reason.map(|r| r.reason.to_string()), - )); - return; - } - _ => { - let _ = sink - .send(TMsg::Close(Some( - tokio_tungstenite::tungstenite::protocol::CloseFrame { - code: tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode::from(4001), - reason: "auth_timeout".into(), - }, - ))) - .await; - let _ = result_tx.send(AuthListenerResult::AuthTimeout); - return; - } - }; - - // Step 5: Verify. - let auth_msg: AuthMessage = match serde_json::from_str(&auth_text) { - Ok(m) => m, - Err(_) => { - let _ = close_listener_auth_failed(&mut sink).await; - let _ = result_tx.send(AuthListenerResult::AuthFailed("bad_json".into())); - return; - } - }; - - let sig_valid = crate::node_identity::verify_challenge( - &auth_msg.pubkey_hex, - &challenge, - &auth_msg.signature_hex, - ); - let key_trusted = trusted_keys.iter().any(|k| k == &auth_msg.pubkey_hex); - - if !sig_valid || !key_trusted { - let _ = close_listener_auth_failed(&mut sink).await; - let _ = result_tx.send(AuthListenerResult::AuthFailed(format!( - "sig_valid={sig_valid}, key_trusted={key_trusted}" - ))); - return; - } - - // Auth passed — send bulk state. - let kp = make_keypair(); - let mut crdt = - bft_json_crdt::json_crdt::BaseCrdt::::new(&kp); - let item: bft_json_crdt::json_crdt::JsonValue = serde_json::json!({ - "story_id": "628_auth_test_item", - "stage": "1_backlog", - "name": "Auth Test", - "agent": "", - "retry_count": 0.0, - "blocked": false, - "depends_on": "", - "claimed_by": "", - "claimed_at": 0.0, - }) - .into(); - let op = crdt - .doc - .items - .insert(bft_json_crdt::op::ROOT_ID, item) - .sign(&kp); - let op_json = serde_json::to_string(&op).unwrap(); - let bulk = crate::crdt_sync::wire::SyncMessage::Bulk { ops: vec![op_json] }; - let bulk_json = serde_json::to_string(&bulk).unwrap(); - let _ = sink.send(TMsg::Text(bulk_json.into())).await; - - let _ = result_tx.send(AuthListenerResult::Authenticated(auth_msg.pubkey_hex)); - }); - - (addr, listener_pubkey, result_rx) - } - - async fn close_listener_auth_failed( - sink: &mut futures::stream::SplitSink< - tokio_tungstenite::WebSocketStream, - tokio_tungstenite::tungstenite::Message, - >, - ) { - use tokio_tungstenite::tungstenite::protocol::CloseFrame; - use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode; - let _ = sink - .send(tokio_tungstenite::tungstenite::Message::Close(Some( - CloseFrame { - code: CloseCode::from(4002), - reason: "auth_failed".into(), - }, - ))) - .await; - } - - // ── Helper: perform the connecting-peer side of the handshake ──── - - /// Drive the full extended client handshake over a tungstenite stream. - /// - /// Returns `Ok(connector_pubkey_hex)` on success, or an error string - /// describing the rejection. - async fn perform_client_handshake( - sink: &mut futures::stream::SplitSink< - tokio_tungstenite::WebSocketStream< - tokio_tungstenite::MaybeTlsStream, - >, - tokio_tungstenite::tungstenite::Message, - >, - stream: &mut futures::stream::SplitStream< - tokio_tungstenite::WebSocketStream< - tokio_tungstenite::MaybeTlsStream, - >, - >, - connector_kp: &bft_json_crdt::keypair::Ed25519KeyPair, - connector_trusted_keys: &[String], - ) -> Result { - use tokio_tungstenite::tungstenite::Message as TMsg; - - // Step 1: Send hello with fresh nonce. - let client_nonce = crate::node_identity::generate_challenge(); - let hello = HelloMessage { - r#type: "hello".to_string(), - nonce: client_nonce.clone(), - }; - sink.send(TMsg::Text(serde_json::to_string(&hello).unwrap().into())) - .await - .map_err(|e| format!("Send hello failed: {e}"))?; - - // Step 2: Receive server_auth. - let sa_frame = tokio::time::timeout(std::time::Duration::from_secs(5), stream.next()) - .await - .map_err(|_| "Timeout waiting for server_auth".to_string())? - .ok_or_else(|| "Connection closed before server_auth".to_string())? - .map_err(|e| format!("WS read error: {e}"))?; - - let sa_text = match sa_frame { - TMsg::Text(t) => t.to_string(), - TMsg::Close(f) => { - return Err(format!( - "auth_failed: {}", - f.map(|f| f.reason.to_string()) - .unwrap_or_else(|| "no reason".to_string()) - )); - } - other => return Err(format!("Unexpected frame: {other:?}")), - }; - - let server_auth: ServerAuthMessage = - serde_json::from_str(&sa_text).map_err(|e| format!("Invalid server_auth: {e}"))?; - - // Step 3: Verify server's signature over versioned challenge. - let versioned = format!("huskies-v1:{}", client_nonce); - let sig_valid = crate::node_identity::verify_message_strict( - &server_auth.pubkey_hex, - versioned.as_bytes(), - &server_auth.signature_hex, - ); - let key_trusted = connector_trusted_keys - .iter() - .any(|k| k == &server_auth.pubkey_hex); - - if !sig_valid || !key_trusted { - return Err(format!( - "Server auth failed: sig_valid={sig_valid}, key_trusted={key_trusted}, \ - server_pubkey={}", - server_auth.pubkey_hex - )); - } - - // Step 4: Receive challenge from server. - let ch_frame = tokio::time::timeout(std::time::Duration::from_secs(5), stream.next()) - .await - .map_err(|_| "Timeout waiting for challenge".to_string())? - .ok_or_else(|| "Connection closed before challenge".to_string())? - .map_err(|e| format!("WS read error: {e}"))?; - - let ch_text = match ch_frame { - TMsg::Text(t) => t.to_string(), - other => return Err(format!("Expected challenge text frame, got {other:?}")), - }; - let challenge_msg: ChallengeMessage = - serde_json::from_str(&ch_text).map_err(|e| format!("Invalid challenge: {e}"))?; - - // Step 5: Sign and send auth reply. - let connector_pubkey = crate::node_identity::public_key_hex(connector_kp); - let sig = crate::node_identity::sign_challenge(connector_kp, &challenge_msg.nonce); - let auth = AuthMessage { - r#type: "auth".to_string(), - pubkey_hex: connector_pubkey.clone(), - signature_hex: sig, - }; - sink.send(TMsg::Text(serde_json::to_string(&auth).unwrap().into())) - .await - .map_err(|e| format!("Send auth failed: {e}"))?; - - Ok(connector_pubkey) - } - - // ── Tests ──────────────────────────────────────────────────────── - - #[tokio::test] - async fn auth_happy_path_handshake_and_sync() { - use futures::{SinkExt, StreamExt}; - use tokio_tungstenite::connect_async; - use tokio_tungstenite::tungstenite::Message as TMsg; - - let connector_kp = make_keypair(); - let connector_pubkey = crate::node_identity::public_key_hex(&connector_kp); - - // Start listener that trusts the connector's pubkey. - let (addr, listener_pubkey, result_rx) = - start_auth_listener(vec![connector_pubkey.clone()]).await; - - // Connect and drive the full handshake. - let url = format!("ws://{addr}"); - let (ws, _) = connect_async(&url).await.unwrap(); - let (mut sink, mut stream) = ws.split(); - - let result = perform_client_handshake( - &mut sink, - &mut stream, - &connector_kp, - &[listener_pubkey], // connector trusts the listener - ) - .await; - assert!(result.is_ok(), "Client handshake failed: {:?}", result); - assert_eq!(result.unwrap(), connector_pubkey); - - // After auth we should receive a bulk sync. - let bulk_frame = tokio::time::timeout(std::time::Duration::from_secs(5), stream.next()) - .await - .expect("should receive bulk within 5s") - .unwrap() - .unwrap(); - - let bulk_text = match bulk_frame { - TMsg::Text(t) => t.to_string(), - other => panic!("Expected bulk text frame, got {other:?}"), - }; - let bulk_msg: crate::crdt_sync::wire::SyncMessage = - serde_json::from_str(&bulk_text).unwrap(); - match bulk_msg { - crate::crdt_sync::wire::SyncMessage::Bulk { ops } => { - assert!( - !ops.is_empty(), - "Bulk sync must contain at least one op after successful auth" - ); - let _signed: bft_json_crdt::json_crdt::SignedOp = - serde_json::from_str(&ops[0]).unwrap(); - } - _ => panic!("Expected Bulk message after auth"), - } - - // Verify listener also reports success. - let listener_result = result_rx.await.unwrap(); - match listener_result { - AuthListenerResult::Authenticated(pubkey) => { - assert_eq!(pubkey, connector_pubkey); - } - other => panic!("Expected Authenticated, got {other:?}"), - } - } - - #[tokio::test] - async fn auth_untrusted_pubkey_rejected() { - use futures::{SinkExt, StreamExt}; - use tokio_tungstenite::connect_async; - - let connector_kp = make_keypair(); - let connector_pubkey = crate::node_identity::public_key_hex(&connector_kp); - - // Listener trusts a DIFFERENT key, not the connector's. - let other_kp = make_keypair(); - let other_pubkey = crate::node_identity::public_key_hex(&other_kp); - - let (addr, listener_pubkey, result_rx) = start_auth_listener(vec![other_pubkey]).await; - - let url = format!("ws://{addr}"); - let (ws, _) = connect_async(&url).await.unwrap(); - let (mut sink, mut stream) = ws.split(); - - // Connector trusts the listener (so server auth passes). - let result = - perform_client_handshake(&mut sink, &mut stream, &connector_kp, &[listener_pubkey]) - .await; - - // The server rejects the connector (untrusted pubkey) — client should see - // either a close frame or connection loss after sending auth. - // We check that the listener reports AuthFailed. - let listener_result = result_rx.await.unwrap(); - match listener_result { - AuthListenerResult::AuthFailed(reason) => { - assert!( - reason.contains("key_trusted=false"), - "Expected key_trusted=false, got: {reason}" - ); - } - // The connector might get a close frame instead of reaching Ok. - _ => { - // If perform_client_handshake returned an error, that's also fine. - let _ = result; - } - } - - // Unused connector_pubkey reference kept to satisfy borrow checker. - drop(connector_pubkey); - } - - #[tokio::test] - async fn auth_bad_signature_rejected() { - use futures::{SinkExt, StreamExt}; - use tokio_tungstenite::connect_async; - use tokio_tungstenite::tungstenite::Message as TMsg; - - let legitimate_kp = make_keypair(); - let legitimate_pubkey = crate::node_identity::public_key_hex(&legitimate_kp); - - // A different keypair that will sign the challenge (wrong key). - let impersonator_kp = make_keypair(); - - // Listener trusts the legitimate pubkey. - let (addr, listener_pubkey, result_rx) = - start_auth_listener(vec![legitimate_pubkey.clone()]).await; - - let url = format!("ws://{addr}"); - let (ws, _) = connect_async(&url).await.unwrap(); - let (mut sink, mut stream) = ws.split(); - - // Step 1: Send hello. - let client_nonce = crate::node_identity::generate_challenge(); - let hello = HelloMessage { - r#type: "hello".to_string(), - nonce: client_nonce.clone(), - }; - sink.send(TMsg::Text(serde_json::to_string(&hello).unwrap().into())) - .await - .unwrap(); - - // Step 2: Receive server_auth and verify (listener is trusted). - let sa_frame = tokio::time::timeout(std::time::Duration::from_secs(5), stream.next()) - .await - .unwrap() - .unwrap() - .unwrap(); - let sa_text = match sa_frame { - TMsg::Text(t) => t.to_string(), - other => panic!("Expected server_auth text, got {other:?}"), - }; - let server_auth: ServerAuthMessage = serde_json::from_str(&sa_text).unwrap(); - let versioned = format!("huskies-v1:{}", client_nonce); - assert!( - crate::node_identity::verify_message_strict( - &server_auth.pubkey_hex, - versioned.as_bytes(), - &server_auth.signature_hex, - ), - "Server auth should be valid in bad-sig test" - ); - assert_eq!( - server_auth.pubkey_hex, listener_pubkey, - "Server pubkey must match listener" - ); - - // Step 3: Receive challenge. - let ch_frame = tokio::time::timeout(std::time::Duration::from_secs(5), stream.next()) - .await - .unwrap() - .unwrap() - .unwrap(); - let ch_text = match ch_frame { - TMsg::Text(t) => t.to_string(), - _ => panic!("Expected challenge text frame"), - }; - let challenge_msg: ChallengeMessage = serde_json::from_str(&ch_text).unwrap(); - - // Step 4: Sign with WRONG keypair (impersonator) but claim legitimate pubkey. - let bad_sig = crate::node_identity::sign_challenge(&impersonator_kp, &challenge_msg.nonce); - let auth_msg = AuthMessage { - r#type: "auth".to_string(), - pubkey_hex: legitimate_pubkey, - signature_hex: bad_sig, - }; - sink.send(TMsg::Text(serde_json::to_string(&auth_msg).unwrap().into())) - .await - .unwrap(); - - // Should be rejected. - let close_frame = tokio::time::timeout(std::time::Duration::from_secs(5), stream.next()) - .await - .expect("should receive close within 5s"); - - match close_frame { - Some(Ok(TMsg::Close(Some(frame)))) => { - assert_eq!( - &*frame.reason, "auth_failed", - "Close reason must be 'auth_failed'" - ); - } - _ => { - // Connection closed is acceptable. - } - } - - // Verify listener reports auth failure with sig_valid=false. - let listener_result = result_rx.await.unwrap(); - match listener_result { - AuthListenerResult::AuthFailed(reason) => { - assert!(reason.contains("sig_valid=false"), "Reason: {reason}"); - } - other => panic!("Expected AuthFailed, got {other:?}"), - } - } - - #[tokio::test] - async fn auth_replay_protection_fresh_nonces() { - use futures::StreamExt; - use tokio::net::TcpListener; - use tokio_tungstenite::tungstenite::Message as TMsg; - use tokio_tungstenite::{accept_async, connect_async}; - - // Start a listener that sends challenges but doesn't complete auth. - let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); - let addr = listener.local_addr().unwrap(); - - let (nonce_tx, mut nonce_rx) = tokio::sync::mpsc::channel::(2); - - tokio::spawn(async move { - for _ in 0..2 { - let (tcp, _) = listener.accept().await.unwrap(); - let ws = accept_async(tcp).await.unwrap(); - let (mut sink, _stream) = ws.split(); - - let challenge = crate::node_identity::generate_challenge(); - let msg = ChallengeMessage { - r#type: "challenge".to_string(), - nonce: challenge.clone(), - }; - let json = serde_json::to_string(&msg).unwrap(); - let _ = sink.send(TMsg::Text(json.into())).await; - let _ = nonce_tx.send(challenge).await; - } - }); - - // Connect twice and collect the nonces. - let mut nonces = Vec::new(); - for _ in 0..2 { - let url = format!("ws://{addr}"); - let (ws, _) = connect_async(&url).await.unwrap(); - let (_sink, mut stream) = ws.split(); - - let frame = stream.next().await.unwrap().unwrap(); - let text = match frame { - TMsg::Text(t) => t.to_string(), - _ => panic!("Expected text"), - }; - let msg: ChallengeMessage = serde_json::from_str(&text).unwrap(); - nonces.push(msg.nonce); - drop(stream); - } - - let server_nonce_1 = nonce_rx.recv().await.unwrap(); - let server_nonce_2 = nonce_rx.recv().await.unwrap(); - - assert_ne!( - nonces[0], nonces[1], - "Consecutive challenges must be different" - ); - assert_ne!( - server_nonce_1, server_nonce_2, - "Server must generate fresh nonce per accept" - ); - assert_eq!(nonces[0], server_nonce_1); - assert_eq!(nonces[1], server_nonce_2); - } - - // ── New tests for AC4 ──────────────────────────────────────────── - - /// Handshake succeeds when both nodes mutually trust each other's pubkeys. - #[tokio::test] - async fn mutual_auth_handshake_succeeds() { - use futures::{SinkExt, StreamExt}; - use tokio_tungstenite::connect_async; - - let connector_kp = make_keypair(); - let connector_pubkey = crate::node_identity::public_key_hex(&connector_kp); - - let (addr, listener_pubkey, result_rx) = - start_auth_listener(vec![connector_pubkey.clone()]).await; - - let url = format!("ws://{addr}"); - let (ws, _) = connect_async(&url).await.unwrap(); - let (mut sink, mut stream) = ws.split(); - - let result = perform_client_handshake( - &mut sink, - &mut stream, - &connector_kp, - std::slice::from_ref(&listener_pubkey), // connector trusts listener (mutual) - ) - .await; - assert!(result.is_ok(), "Mutual auth handshake failed: {:?}", result); - - let listener_result = result_rx.await.unwrap(); - assert!( - matches!(listener_result, AuthListenerResult::Authenticated(_)), - "Listener should report Authenticated, got {listener_result:?}" - ); - } - - /// Handshake rejected when the responding node's pubkey is not in the - /// connecting peer's trusted_keys. Includes the offered pubkey in the - /// rejection reason. - #[tokio::test] - async fn handshake_rejected_untrusted_server_pubkey() { - use futures::{SinkExt, StreamExt}; - use tokio_tungstenite::connect_async; - - let connector_kp = make_keypair(); - let connector_pubkey = crate::node_identity::public_key_hex(&connector_kp); - - // Server trusts connector, but connector does NOT trust server. - let (addr, listener_pubkey, _result_rx) = start_auth_listener(vec![connector_pubkey]).await; - - let url = format!("ws://{addr}"); - let (ws, _) = connect_async(&url).await.unwrap(); - let (mut sink, mut stream) = ws.split(); - - // Connector passes an EMPTY trusted_keys list — no server is trusted. - let result = perform_client_handshake( - &mut sink, - &mut stream, - &connector_kp, - &[], // connector trusts nobody - ) - .await; - - assert!( - result.is_err(), - "Expected handshake to fail when server pubkey is not trusted" - ); - let err = result.unwrap_err(); - // The error must include the server's offered pubkey. - assert!( - err.contains(&listener_pubkey), - "Rejection error must include the offered server pubkey. Error: {err}" - ); - } - - /// Handshake rejected when the server's signature is valid but was produced - /// over a different nonce than the one the client sent (replay/swap defence). - #[tokio::test] - async fn handshake_rejected_wrong_nonce_in_server_response() { - use futures::{SinkExt, StreamExt}; - use tokio::net::TcpListener; - use tokio_tungstenite::tungstenite::Message as TMsg; - use tokio_tungstenite::{accept_async, connect_async}; - - // Set up a rogue server that signs a DIFFERENT nonce (not the one sent). - let rogue_kp = make_keypair(); - let rogue_pubkey = crate::node_identity::public_key_hex(&rogue_kp); - let rogue_pubkey_for_client = rogue_pubkey.clone(); - - let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); - let addr = listener.local_addr().unwrap(); - - tokio::spawn(async move { - let (tcp, _) = listener.accept().await.unwrap(); - let ws = accept_async(tcp).await.unwrap(); - let (mut sink, mut stream) = ws.split(); - - // Receive hello from connector. - let frame = stream.next().await.unwrap().unwrap(); - let _hello_text = match frame { - TMsg::Text(t) => t.to_string(), - _ => return, - }; - - // Sign a DIFFERENT nonce (not the one from hello). - let different_nonce = crate::node_identity::generate_challenge(); - let wrong_versioned = format!("huskies-v1:{different_nonce}"); - let bad_sig = crate::node_identity::sign_challenge(&rogue_kp, &wrong_versioned); - - let server_auth = ServerAuthMessage { - r#type: "server_auth".to_string(), - pubkey_hex: rogue_pubkey.clone(), - signature_hex: bad_sig, - }; - let _ = sink - .send(TMsg::Text( - serde_json::to_string(&server_auth).unwrap().into(), - )) - .await; - // Drop connection — client should reject before reaching challenge step. - }); - - let connector_kp = make_keypair(); - - let url = format!("ws://{addr}"); - let (ws, _) = connect_async(&url).await.unwrap(); - let (mut sink, mut stream) = ws.split(); - - // Connector trusts the rogue server's pubkey — only the nonce mismatch - // should cause rejection. - let result = perform_client_handshake( - &mut sink, - &mut stream, - &connector_kp, - &[rogue_pubkey_for_client], - ) - .await; - - assert!( - result.is_err(), - "Expected handshake to fail when server signs wrong nonce" - ); - let err = result.unwrap_err(); - assert!( - err.contains("sig_valid=false"), - "Rejection must report invalid signature. Error: {err}" - ); - } -} diff --git a/server/src/crdt_sync/handshake/mod.rs b/server/src/crdt_sync/handshake/mod.rs new file mode 100644 index 00000000..ac62553a --- /dev/null +++ b/server/src/crdt_sync/handshake/mod.rs @@ -0,0 +1,185 @@ +//! Auth handshake for the server-side `/crdt-sync` WebSocket. +//! +//! # Extended mutual-auth handshake protocol +//! +//! ```text +//! Connecting peer (client): Responding node (server): +//! hello(nonce) ──────────────────► receive hello +//! ◄────────────────── server_auth(pubkey, sign("huskies-v1:{nonce}")) +//! verify server sig + trusted_keys +//! ◄────────────────── challenge(server_nonce) +//! auth(pubkey, sign(server_nonce)) ► +//! verify client sig + trusted_keys +//! ``` +//! +//! Both sides verify the peer's pubkey against `trusted_keys`. A peer whose +//! pubkey is absent from the allow-list is rejected with close code 4002. + +#![allow(unused_imports, dead_code)] +use futures::{SinkExt, StreamExt}; +use poem::web::websocket::Message as WsMessage; + +use crate::crdt_state; +use crate::node_identity; +use crate::slog; + +use super::AUTH_TIMEOUT_SECS; +use super::auth::trusted_keys; +use super::wire::{AuthMessage, ChallengeMessage, HelloMessage, ServerAuthMessage}; + +/// Perform the extended mutual-auth handshake for a freshly-upgraded WebSocket +/// connection. +/// +/// **Protocol (server/responding-node side):** +/// 1. Receive `hello` from the connecting peer (contains client nonce). +/// 2. Sign `"huskies-v1:{nonce}"` and send `server_auth` (this node's pubkey + +/// signature) back to the connecting peer. +/// 3. Send a fresh challenge nonce to the connecting peer. +/// 4. Wait up to [`AUTH_TIMEOUT_SECS`] for a signed `auth` reply. +/// 5. Verify the connecting peer's signature and check its pubkey against the +/// trusted-key allow-list. +/// +/// Returns `Some(AuthMessage)` on success. On failure the connection has +/// already been closed with the appropriate close code (`auth_timeout` or +/// `auth_failed`); the caller should simply return. +pub(super) async fn perform_auth_handshake( + sink: &mut futures::stream::SplitSink, + stream: &mut futures::stream::SplitStream, +) -> Option { + // ── Step 1: Receive hello from connecting peer ─────────────────── + let hello_result = tokio::time::timeout( + std::time::Duration::from_secs(AUTH_TIMEOUT_SECS), + stream.next(), + ) + .await; + + let hello_text = match hello_result { + Ok(Some(Ok(WsMessage::Text(text)))) => text, + Ok(_) | Err(_) => { + slog!("[crdt-sync] No hello from peer — closing"); + close_with_auth_failed(sink).await; + return None; + } + }; + + let hello: HelloMessage = match serde_json::from_str::(&hello_text) { + Ok(m) if m.r#type == "hello" => m, + _ => { + slog!("[crdt-sync] Invalid hello message from peer"); + close_with_auth_failed(sink).await; + return None; + } + }; + + // ── Step 2: Sign versioned challenge and send server_auth ──────── + let (server_pubkey_hex, server_sig_hex) = + match crdt_state::sign_versioned_challenge(&hello.nonce) { + Some(v) => v, + None => { + slog!("[crdt-sync] CRDT not initialised — cannot produce server_auth"); + close_with_auth_failed(sink).await; + return None; + } + }; + + let server_auth = ServerAuthMessage { + r#type: "server_auth".to_string(), + pubkey_hex: server_pubkey_hex, + signature_hex: server_sig_hex, + }; + let server_auth_json = serde_json::to_string(&server_auth).ok()?; + if sink.send(WsMessage::Text(server_auth_json)).await.is_err() { + return None; + } + + // ── Step 3: Send challenge nonce to connecting peer ────────────── + let challenge = node_identity::generate_challenge(); + let challenge_msg = ChallengeMessage { + r#type: "challenge".to_string(), + nonce: challenge.clone(), + }; + let challenge_json = serde_json::to_string(&challenge_msg).ok()?; + if sink.send(WsMessage::Text(challenge_json)).await.is_err() { + return None; + } + + // ── Step 4: Await signed auth reply from connecting peer ───────── + let auth_result = tokio::time::timeout( + std::time::Duration::from_secs(AUTH_TIMEOUT_SECS), + stream.next(), + ) + .await; + + let auth_text = match auth_result { + Ok(Some(Ok(WsMessage::Text(text)))) => text, + Ok(_) | Err(_) => { + slog!("[crdt-sync] Auth timeout or connection lost during handshake"); + let _ = sink + .send(WsMessage::Close(Some(( + poem::web::websocket::CloseCode::from(4001), + "auth_timeout".to_string(), + )))) + .await; + let _ = sink.close().await; + return None; + } + }; + + let auth_msg: AuthMessage = match serde_json::from_str(&auth_text) { + Ok(m) => m, + Err(_) => { + slog!("[crdt-sync] Invalid auth message from peer"); + close_with_auth_failed(sink).await; + return None; + } + }; + + // ── Step 5: Verify signature and check trusted-key allow-list ──── + let key_trusted = trusted_keys().iter().any(|k| k == &auth_msg.pubkey_hex); + if !key_trusted { + slog!( + "[crdt-sync] Auth rejected: peer pubkey not in trusted_keys: {}", + auth_msg.pubkey_hex + ); + close_with_auth_failed(sink).await; + return None; + } + + let sig_valid = + node_identity::verify_challenge(&auth_msg.pubkey_hex, &challenge, &auth_msg.signature_hex); + if !sig_valid { + slog!( + "[crdt-sync] Auth rejected: invalid signature from peer {:.12}…", + &auth_msg.pubkey_hex + ); + close_with_auth_failed(sink).await; + return None; + } + + slog!( + "[crdt-sync] Peer authenticated: {:.12}…", + &auth_msg.pubkey_hex + ); + + Some(auth_msg) +} + +/// Close the WebSocket with a generic `auth_failed` reason. +/// +/// The close reason is intentionally the same for all auth failures +/// (bad signature, untrusted key, malformed message) to avoid leaking +/// which check failed. +async fn close_with_auth_failed( + sink: &mut futures::stream::SplitSink, +) { + let _ = sink + .send(WsMessage::Close(Some(( + poem::web::websocket::CloseCode::from(4002), + "auth_failed".to_string(), + )))) + .await; + let _ = sink.close().await; +} + +#[cfg(test)] +mod tests; diff --git a/server/src/crdt_sync/handshake/tests.rs b/server/src/crdt_sync/handshake/tests.rs new file mode 100644 index 00000000..d2a71a14 --- /dev/null +++ b/server/src/crdt_sync/handshake/tests.rs @@ -0,0 +1,734 @@ +//! Tests for the extended mutual-auth handshake protocol, covering happy-path +//! authentication, rejection of untrusted keys, bad signatures, and replay +//! protection via fresh nonces. + +use super::*; +use bft_json_crdt::keypair::make_keypair; + +// ── AuthListenerResult ─────────────────────────────────────────── +#[allow(dead_code)] +#[derive(Debug)] +enum AuthListenerResult { + Authenticated(String), + AuthFailed(String), + AuthTimeout, + ConnectionLost, + PeerClosedEarly(Option), +} + +/// Start a test server that implements the full extended mutual-auth handshake. +/// +/// The server: +/// 1. Receives `hello` from the connecting peer. +/// 2. Signs the versioned challenge with `listener_kp`. +/// 3. Sends `server_auth` back. +/// 4. Sends a challenge nonce to the connecting peer. +/// 5. Receives and verifies the connecting peer's `auth` reply. +/// 6. Checks the peer pubkey against `trusted_keys`. +/// 7. If auth passes, sends a bulk-sync message and reports success. +/// +/// Returns `(addr, listener_pubkey_hex, result_rx)`. +async fn start_auth_listener( + trusted_keys: Vec, +) -> ( + std::net::SocketAddr, + String, + tokio::sync::oneshot::Receiver, +) { + use tokio::net::TcpListener; + use tokio_tungstenite::accept_async; + + let listener_kp = make_keypair(); + let listener_pubkey = crate::node_identity::public_key_hex(&listener_kp); + + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + + let (result_tx, result_rx) = tokio::sync::oneshot::channel(); + + let listener_pubkey_clone = listener_pubkey.clone(); + tokio::spawn(async move { + let (tcp_stream, _) = listener.accept().await.unwrap(); + let ws_stream = accept_async(tcp_stream).await.unwrap(); + let (mut sink, mut stream) = ws_stream.split(); + + use tokio_tungstenite::tungstenite::Message as TMsg; + + // Step 1: Receive hello from connecting peer. + let hello_frame = + tokio::time::timeout(std::time::Duration::from_secs(10), stream.next()).await; + let hello_text = match hello_frame { + Ok(Some(Ok(TMsg::Text(t)))) => t.to_string(), + Ok(Some(Ok(TMsg::Close(reason)))) => { + let _ = result_tx.send(AuthListenerResult::PeerClosedEarly( + reason.map(|r| r.reason.to_string()), + )); + return; + } + _ => { + let _ = result_tx.send(AuthListenerResult::ConnectionLost); + return; + } + }; + let hello: HelloMessage = match serde_json::from_str::(&hello_text) { + Ok(m) if m.r#type == "hello" => m, + _ => { + let _ = result_tx.send(AuthListenerResult::AuthFailed("bad_hello".into())); + return; + } + }; + + // Step 2: Sign versioned challenge and send server_auth. + let versioned = format!("huskies-v1:{}", hello.nonce); + let server_sig = crate::node_identity::sign_challenge(&listener_kp, &versioned); + let server_auth = ServerAuthMessage { + r#type: "server_auth".to_string(), + pubkey_hex: listener_pubkey_clone.clone(), + signature_hex: server_sig, + }; + let server_auth_json = serde_json::to_string(&server_auth).unwrap(); + if sink + .send(TMsg::Text(server_auth_json.into())) + .await + .is_err() + { + let _ = result_tx.send(AuthListenerResult::ConnectionLost); + return; + } + + // Step 3: Send challenge. + let challenge = crate::node_identity::generate_challenge(); + let challenge_msg = ChallengeMessage { + r#type: "challenge".to_string(), + nonce: challenge.clone(), + }; + let challenge_json = serde_json::to_string(&challenge_msg).unwrap(); + if sink.send(TMsg::Text(challenge_json.into())).await.is_err() { + let _ = result_tx.send(AuthListenerResult::ConnectionLost); + return; + } + + // Step 4: Await auth reply. + let auth_frame = + tokio::time::timeout(std::time::Duration::from_secs(10), stream.next()).await; + let auth_text = match auth_frame { + Ok(Some(Ok(TMsg::Text(t)))) => t.to_string(), + Ok(Some(Ok(TMsg::Close(reason)))) => { + let _ = result_tx.send(AuthListenerResult::PeerClosedEarly( + reason.map(|r| r.reason.to_string()), + )); + return; + } + _ => { + let _ = sink + .send(TMsg::Close(Some( + tokio_tungstenite::tungstenite::protocol::CloseFrame { + code: tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode::from(4001), + reason: "auth_timeout".into(), + }, + ))) + .await; + let _ = result_tx.send(AuthListenerResult::AuthTimeout); + return; + } + }; + + // Step 5: Verify. + let auth_msg: AuthMessage = match serde_json::from_str(&auth_text) { + Ok(m) => m, + Err(_) => { + let _ = close_listener_auth_failed(&mut sink).await; + let _ = result_tx.send(AuthListenerResult::AuthFailed("bad_json".into())); + return; + } + }; + + let sig_valid = crate::node_identity::verify_challenge( + &auth_msg.pubkey_hex, + &challenge, + &auth_msg.signature_hex, + ); + let key_trusted = trusted_keys.iter().any(|k| k == &auth_msg.pubkey_hex); + + if !sig_valid || !key_trusted { + let _ = close_listener_auth_failed(&mut sink).await; + let _ = result_tx.send(AuthListenerResult::AuthFailed(format!( + "sig_valid={sig_valid}, key_trusted={key_trusted}" + ))); + return; + } + + // Auth passed — send bulk state. + let kp = make_keypair(); + let mut crdt = + bft_json_crdt::json_crdt::BaseCrdt::::new(&kp); + let item: bft_json_crdt::json_crdt::JsonValue = serde_json::json!({ + "story_id": "628_auth_test_item", + "stage": "1_backlog", + "name": "Auth Test", + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, + }) + .into(); + let op = crdt + .doc + .items + .insert(bft_json_crdt::op::ROOT_ID, item) + .sign(&kp); + let op_json = serde_json::to_string(&op).unwrap(); + let bulk = crate::crdt_sync::wire::SyncMessage::Bulk { ops: vec![op_json] }; + let bulk_json = serde_json::to_string(&bulk).unwrap(); + let _ = sink.send(TMsg::Text(bulk_json.into())).await; + + let _ = result_tx.send(AuthListenerResult::Authenticated(auth_msg.pubkey_hex)); + }); + + (addr, listener_pubkey, result_rx) +} + +async fn close_listener_auth_failed( + sink: &mut futures::stream::SplitSink< + tokio_tungstenite::WebSocketStream, + tokio_tungstenite::tungstenite::Message, + >, +) { + use tokio_tungstenite::tungstenite::protocol::CloseFrame; + use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode; + let _ = sink + .send(tokio_tungstenite::tungstenite::Message::Close(Some( + CloseFrame { + code: CloseCode::from(4002), + reason: "auth_failed".into(), + }, + ))) + .await; +} + +// ── Helper: perform the connecting-peer side of the handshake ──── + +/// Drive the full extended client handshake over a tungstenite stream. +/// +/// Returns `Ok(connector_pubkey_hex)` on success, or an error string +/// describing the rejection. +async fn perform_client_handshake( + sink: &mut futures::stream::SplitSink< + tokio_tungstenite::WebSocketStream< + tokio_tungstenite::MaybeTlsStream, + >, + tokio_tungstenite::tungstenite::Message, + >, + stream: &mut futures::stream::SplitStream< + tokio_tungstenite::WebSocketStream< + tokio_tungstenite::MaybeTlsStream, + >, + >, + connector_kp: &bft_json_crdt::keypair::Ed25519KeyPair, + connector_trusted_keys: &[String], +) -> Result { + use tokio_tungstenite::tungstenite::Message as TMsg; + + // Step 1: Send hello with fresh nonce. + let client_nonce = crate::node_identity::generate_challenge(); + let hello = HelloMessage { + r#type: "hello".to_string(), + nonce: client_nonce.clone(), + }; + sink.send(TMsg::Text(serde_json::to_string(&hello).unwrap().into())) + .await + .map_err(|e| format!("Send hello failed: {e}"))?; + + // Step 2: Receive server_auth. + let sa_frame = tokio::time::timeout(std::time::Duration::from_secs(5), stream.next()) + .await + .map_err(|_| "Timeout waiting for server_auth".to_string())? + .ok_or_else(|| "Connection closed before server_auth".to_string())? + .map_err(|e| format!("WS read error: {e}"))?; + + let sa_text = match sa_frame { + TMsg::Text(t) => t.to_string(), + TMsg::Close(f) => { + return Err(format!( + "auth_failed: {}", + f.map(|f| f.reason.to_string()) + .unwrap_or_else(|| "no reason".to_string()) + )); + } + other => return Err(format!("Unexpected frame: {other:?}")), + }; + + let server_auth: ServerAuthMessage = + serde_json::from_str(&sa_text).map_err(|e| format!("Invalid server_auth: {e}"))?; + + // Step 3: Verify server's signature over versioned challenge. + let versioned = format!("huskies-v1:{}", client_nonce); + let sig_valid = crate::node_identity::verify_message_strict( + &server_auth.pubkey_hex, + versioned.as_bytes(), + &server_auth.signature_hex, + ); + let key_trusted = connector_trusted_keys + .iter() + .any(|k| k == &server_auth.pubkey_hex); + + if !sig_valid || !key_trusted { + return Err(format!( + "Server auth failed: sig_valid={sig_valid}, key_trusted={key_trusted}, \ + server_pubkey={}", + server_auth.pubkey_hex + )); + } + + // Step 4: Receive challenge from server. + let ch_frame = tokio::time::timeout(std::time::Duration::from_secs(5), stream.next()) + .await + .map_err(|_| "Timeout waiting for challenge".to_string())? + .ok_or_else(|| "Connection closed before challenge".to_string())? + .map_err(|e| format!("WS read error: {e}"))?; + + let ch_text = match ch_frame { + TMsg::Text(t) => t.to_string(), + other => return Err(format!("Expected challenge text frame, got {other:?}")), + }; + let challenge_msg: ChallengeMessage = + serde_json::from_str(&ch_text).map_err(|e| format!("Invalid challenge: {e}"))?; + + // Step 5: Sign and send auth reply. + let connector_pubkey = crate::node_identity::public_key_hex(connector_kp); + let sig = crate::node_identity::sign_challenge(connector_kp, &challenge_msg.nonce); + let auth = AuthMessage { + r#type: "auth".to_string(), + pubkey_hex: connector_pubkey.clone(), + signature_hex: sig, + }; + sink.send(TMsg::Text(serde_json::to_string(&auth).unwrap().into())) + .await + .map_err(|e| format!("Send auth failed: {e}"))?; + + Ok(connector_pubkey) +} + +// ── Tests ──────────────────────────────────────────────────────── + +#[tokio::test] +async fn auth_happy_path_handshake_and_sync() { + use futures::{SinkExt, StreamExt}; + use tokio_tungstenite::connect_async; + use tokio_tungstenite::tungstenite::Message as TMsg; + + let connector_kp = make_keypair(); + let connector_pubkey = crate::node_identity::public_key_hex(&connector_kp); + + // Start listener that trusts the connector's pubkey. + let (addr, listener_pubkey, result_rx) = + start_auth_listener(vec![connector_pubkey.clone()]).await; + + // Connect and drive the full handshake. + let url = format!("ws://{addr}"); + let (ws, _) = connect_async(&url).await.unwrap(); + let (mut sink, mut stream) = ws.split(); + + let result = perform_client_handshake( + &mut sink, + &mut stream, + &connector_kp, + &[listener_pubkey], // connector trusts the listener + ) + .await; + assert!(result.is_ok(), "Client handshake failed: {:?}", result); + assert_eq!(result.unwrap(), connector_pubkey); + + // After auth we should receive a bulk sync. + let bulk_frame = tokio::time::timeout(std::time::Duration::from_secs(5), stream.next()) + .await + .expect("should receive bulk within 5s") + .unwrap() + .unwrap(); + + let bulk_text = match bulk_frame { + TMsg::Text(t) => t.to_string(), + other => panic!("Expected bulk text frame, got {other:?}"), + }; + let bulk_msg: crate::crdt_sync::wire::SyncMessage = serde_json::from_str(&bulk_text).unwrap(); + match bulk_msg { + crate::crdt_sync::wire::SyncMessage::Bulk { ops } => { + assert!( + !ops.is_empty(), + "Bulk sync must contain at least one op after successful auth" + ); + let _signed: bft_json_crdt::json_crdt::SignedOp = + serde_json::from_str(&ops[0]).unwrap(); + } + _ => panic!("Expected Bulk message after auth"), + } + + // Verify listener also reports success. + let listener_result = result_rx.await.unwrap(); + match listener_result { + AuthListenerResult::Authenticated(pubkey) => { + assert_eq!(pubkey, connector_pubkey); + } + other => panic!("Expected Authenticated, got {other:?}"), + } +} + +#[tokio::test] +async fn auth_untrusted_pubkey_rejected() { + use futures::{SinkExt, StreamExt}; + use tokio_tungstenite::connect_async; + + let connector_kp = make_keypair(); + let connector_pubkey = crate::node_identity::public_key_hex(&connector_kp); + + // Listener trusts a DIFFERENT key, not the connector's. + let other_kp = make_keypair(); + let other_pubkey = crate::node_identity::public_key_hex(&other_kp); + + let (addr, listener_pubkey, result_rx) = start_auth_listener(vec![other_pubkey]).await; + + let url = format!("ws://{addr}"); + let (ws, _) = connect_async(&url).await.unwrap(); + let (mut sink, mut stream) = ws.split(); + + // Connector trusts the listener (so server auth passes). + let result = + perform_client_handshake(&mut sink, &mut stream, &connector_kp, &[listener_pubkey]).await; + + // The server rejects the connector (untrusted pubkey) — client should see + // either a close frame or connection loss after sending auth. + // We check that the listener reports AuthFailed. + let listener_result = result_rx.await.unwrap(); + match listener_result { + AuthListenerResult::AuthFailed(reason) => { + assert!( + reason.contains("key_trusted=false"), + "Expected key_trusted=false, got: {reason}" + ); + } + // The connector might get a close frame instead of reaching Ok. + _ => { + // If perform_client_handshake returned an error, that's also fine. + let _ = result; + } + } + + // Unused connector_pubkey reference kept to satisfy borrow checker. + drop(connector_pubkey); +} + +#[tokio::test] +async fn auth_bad_signature_rejected() { + use futures::{SinkExt, StreamExt}; + use tokio_tungstenite::connect_async; + use tokio_tungstenite::tungstenite::Message as TMsg; + + let legitimate_kp = make_keypair(); + let legitimate_pubkey = crate::node_identity::public_key_hex(&legitimate_kp); + + // A different keypair that will sign the challenge (wrong key). + let impersonator_kp = make_keypair(); + + // Listener trusts the legitimate pubkey. + let (addr, listener_pubkey, result_rx) = + start_auth_listener(vec![legitimate_pubkey.clone()]).await; + + let url = format!("ws://{addr}"); + let (ws, _) = connect_async(&url).await.unwrap(); + let (mut sink, mut stream) = ws.split(); + + // Step 1: Send hello. + let client_nonce = crate::node_identity::generate_challenge(); + let hello = HelloMessage { + r#type: "hello".to_string(), + nonce: client_nonce.clone(), + }; + sink.send(TMsg::Text(serde_json::to_string(&hello).unwrap().into())) + .await + .unwrap(); + + // Step 2: Receive server_auth and verify (listener is trusted). + let sa_frame = tokio::time::timeout(std::time::Duration::from_secs(5), stream.next()) + .await + .unwrap() + .unwrap() + .unwrap(); + let sa_text = match sa_frame { + TMsg::Text(t) => t.to_string(), + other => panic!("Expected server_auth text, got {other:?}"), + }; + let server_auth: ServerAuthMessage = serde_json::from_str(&sa_text).unwrap(); + let versioned = format!("huskies-v1:{}", client_nonce); + assert!( + crate::node_identity::verify_message_strict( + &server_auth.pubkey_hex, + versioned.as_bytes(), + &server_auth.signature_hex, + ), + "Server auth should be valid in bad-sig test" + ); + assert_eq!( + server_auth.pubkey_hex, listener_pubkey, + "Server pubkey must match listener" + ); + + // Step 3: Receive challenge. + let ch_frame = tokio::time::timeout(std::time::Duration::from_secs(5), stream.next()) + .await + .unwrap() + .unwrap() + .unwrap(); + let ch_text = match ch_frame { + TMsg::Text(t) => t.to_string(), + _ => panic!("Expected challenge text frame"), + }; + let challenge_msg: ChallengeMessage = serde_json::from_str(&ch_text).unwrap(); + + // Step 4: Sign with WRONG keypair (impersonator) but claim legitimate pubkey. + let bad_sig = crate::node_identity::sign_challenge(&impersonator_kp, &challenge_msg.nonce); + let auth_msg = AuthMessage { + r#type: "auth".to_string(), + pubkey_hex: legitimate_pubkey, + signature_hex: bad_sig, + }; + sink.send(TMsg::Text(serde_json::to_string(&auth_msg).unwrap().into())) + .await + .unwrap(); + + // Should be rejected. + let close_frame = tokio::time::timeout(std::time::Duration::from_secs(5), stream.next()) + .await + .expect("should receive close within 5s"); + + match close_frame { + Some(Ok(TMsg::Close(Some(frame)))) => { + assert_eq!( + &*frame.reason, "auth_failed", + "Close reason must be 'auth_failed'" + ); + } + _ => { + // Connection closed is acceptable. + } + } + + // Verify listener reports auth failure with sig_valid=false. + let listener_result = result_rx.await.unwrap(); + match listener_result { + AuthListenerResult::AuthFailed(reason) => { + assert!(reason.contains("sig_valid=false"), "Reason: {reason}"); + } + other => panic!("Expected AuthFailed, got {other:?}"), + } +} + +#[tokio::test] +async fn auth_replay_protection_fresh_nonces() { + use futures::StreamExt; + use tokio::net::TcpListener; + use tokio_tungstenite::tungstenite::Message as TMsg; + use tokio_tungstenite::{accept_async, connect_async}; + + // Start a listener that sends challenges but doesn't complete auth. + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + + let (nonce_tx, mut nonce_rx) = tokio::sync::mpsc::channel::(2); + + tokio::spawn(async move { + for _ in 0..2 { + let (tcp, _) = listener.accept().await.unwrap(); + let ws = accept_async(tcp).await.unwrap(); + let (mut sink, _stream) = ws.split(); + + let challenge = crate::node_identity::generate_challenge(); + let msg = ChallengeMessage { + r#type: "challenge".to_string(), + nonce: challenge.clone(), + }; + let json = serde_json::to_string(&msg).unwrap(); + let _ = sink.send(TMsg::Text(json.into())).await; + let _ = nonce_tx.send(challenge).await; + } + }); + + // Connect twice and collect the nonces. + let mut nonces = Vec::new(); + for _ in 0..2 { + let url = format!("ws://{addr}"); + let (ws, _) = connect_async(&url).await.unwrap(); + let (_sink, mut stream) = ws.split(); + + let frame = stream.next().await.unwrap().unwrap(); + let text = match frame { + TMsg::Text(t) => t.to_string(), + _ => panic!("Expected text"), + }; + let msg: ChallengeMessage = serde_json::from_str(&text).unwrap(); + nonces.push(msg.nonce); + drop(stream); + } + + let server_nonce_1 = nonce_rx.recv().await.unwrap(); + let server_nonce_2 = nonce_rx.recv().await.unwrap(); + + assert_ne!( + nonces[0], nonces[1], + "Consecutive challenges must be different" + ); + assert_ne!( + server_nonce_1, server_nonce_2, + "Server must generate fresh nonce per accept" + ); + assert_eq!(nonces[0], server_nonce_1); + assert_eq!(nonces[1], server_nonce_2); +} + +// ── New tests for AC4 ──────────────────────────────────────────── + +/// Handshake succeeds when both nodes mutually trust each other's pubkeys. +#[tokio::test] +async fn mutual_auth_handshake_succeeds() { + use futures::{SinkExt, StreamExt}; + use tokio_tungstenite::connect_async; + + let connector_kp = make_keypair(); + let connector_pubkey = crate::node_identity::public_key_hex(&connector_kp); + + let (addr, listener_pubkey, result_rx) = + start_auth_listener(vec![connector_pubkey.clone()]).await; + + let url = format!("ws://{addr}"); + let (ws, _) = connect_async(&url).await.unwrap(); + let (mut sink, mut stream) = ws.split(); + + let result = perform_client_handshake( + &mut sink, + &mut stream, + &connector_kp, + std::slice::from_ref(&listener_pubkey), // connector trusts listener (mutual) + ) + .await; + assert!(result.is_ok(), "Mutual auth handshake failed: {:?}", result); + + let listener_result = result_rx.await.unwrap(); + assert!( + matches!(listener_result, AuthListenerResult::Authenticated(_)), + "Listener should report Authenticated, got {listener_result:?}" + ); +} + +/// Handshake rejected when the responding node's pubkey is not in the +/// connecting peer's trusted_keys. Includes the offered pubkey in the +/// rejection reason. +#[tokio::test] +async fn handshake_rejected_untrusted_server_pubkey() { + use futures::{SinkExt, StreamExt}; + use tokio_tungstenite::connect_async; + + let connector_kp = make_keypair(); + let connector_pubkey = crate::node_identity::public_key_hex(&connector_kp); + + // Server trusts connector, but connector does NOT trust server. + let (addr, listener_pubkey, _result_rx) = start_auth_listener(vec![connector_pubkey]).await; + + let url = format!("ws://{addr}"); + let (ws, _) = connect_async(&url).await.unwrap(); + let (mut sink, mut stream) = ws.split(); + + // Connector passes an EMPTY trusted_keys list — no server is trusted. + let result = perform_client_handshake( + &mut sink, + &mut stream, + &connector_kp, + &[], // connector trusts nobody + ) + .await; + + assert!( + result.is_err(), + "Expected handshake to fail when server pubkey is not trusted" + ); + let err = result.unwrap_err(); + // The error must include the server's offered pubkey. + assert!( + err.contains(&listener_pubkey), + "Rejection error must include the offered server pubkey. Error: {err}" + ); +} + +/// Handshake rejected when the server's signature is valid but was produced +/// over a different nonce than the one the client sent (replay/swap defence). +#[tokio::test] +async fn handshake_rejected_wrong_nonce_in_server_response() { + use futures::{SinkExt, StreamExt}; + use tokio::net::TcpListener; + use tokio_tungstenite::tungstenite::Message as TMsg; + use tokio_tungstenite::{accept_async, connect_async}; + + // Set up a rogue server that signs a DIFFERENT nonce (not the one sent). + let rogue_kp = make_keypair(); + let rogue_pubkey = crate::node_identity::public_key_hex(&rogue_kp); + let rogue_pubkey_for_client = rogue_pubkey.clone(); + + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + + tokio::spawn(async move { + let (tcp, _) = listener.accept().await.unwrap(); + let ws = accept_async(tcp).await.unwrap(); + let (mut sink, mut stream) = ws.split(); + + // Receive hello from connector. + let frame = stream.next().await.unwrap().unwrap(); + let _hello_text = match frame { + TMsg::Text(t) => t.to_string(), + _ => return, + }; + + // Sign a DIFFERENT nonce (not the one from hello). + let different_nonce = crate::node_identity::generate_challenge(); + let wrong_versioned = format!("huskies-v1:{different_nonce}"); + let bad_sig = crate::node_identity::sign_challenge(&rogue_kp, &wrong_versioned); + + let server_auth = ServerAuthMessage { + r#type: "server_auth".to_string(), + pubkey_hex: rogue_pubkey.clone(), + signature_hex: bad_sig, + }; + let _ = sink + .send(TMsg::Text( + serde_json::to_string(&server_auth).unwrap().into(), + )) + .await; + // Drop connection — client should reject before reaching challenge step. + }); + + let connector_kp = make_keypair(); + + let url = format!("ws://{addr}"); + let (ws, _) = connect_async(&url).await.unwrap(); + let (mut sink, mut stream) = ws.split(); + + // Connector trusts the rogue server's pubkey — only the nonce mismatch + // should cause rejection. + let result = perform_client_handshake( + &mut sink, + &mut stream, + &connector_kp, + &[rogue_pubkey_for_client], + ) + .await; + + assert!( + result.is_err(), + "Expected handshake to fail when server signs wrong nonce" + ); + let err = result.unwrap_err(); + assert!( + err.contains("sig_valid=false"), + "Rejection must report invalid signature. Error: {err}" + ); +}