refactor: extract auth handshake from crdt_sync/server.rs into handshake.rs
The 1680-line server.rs is split: - handshake.rs: perform_auth_handshake helper + close_with_auth_failed + auth tests + start_auth_listener / close_listener_auth_failed test helpers + AuthListenerResult enum - server.rs: crdt_sync_handler (now invokes perform_auth_handshake) + wait_for_sync_text + broadcast/e2e/keepalive tests Auth handshake (Steps 1-3 of the WebSocket handshake) is a self-contained sequence that takes &mut SplitSink + &mut SplitStream and returns Option<AuthMessage>. The caller observes None to mean the connection has already been closed with the appropriate close code. No behaviour change. All 63 crdt_sync tests pass; full suite green.
This commit is contained in:
@@ -0,0 +1,535 @@
|
|||||||
|
//! Auth handshake for the server-side `/crdt-sync` WebSocket.
|
||||||
|
|
||||||
|
use futures::{SinkExt, StreamExt};
|
||||||
|
use poem::web::websocket::Message as WsMessage;
|
||||||
|
|
||||||
|
use crate::node_identity;
|
||||||
|
use crate::slog;
|
||||||
|
|
||||||
|
use super::AUTH_TIMEOUT_SECS;
|
||||||
|
use super::auth::trusted_keys;
|
||||||
|
use super::wire::{AuthMessage, ChallengeMessage};
|
||||||
|
|
||||||
|
/// Perform the auth handshake for a freshly-upgraded WebSocket connection.
|
||||||
|
///
|
||||||
|
/// 1. Sends a challenge to the connecting peer.
|
||||||
|
/// 2. Waits up to [`AUTH_TIMEOUT_SECS`] for a signed reply.
|
||||||
|
/// 3. Verifies the signature and checks the pubkey against the trusted keys.
|
||||||
|
///
|
||||||
|
/// Returns `Some(AuthMessage)` on success. On failure, the connection has
|
||||||
|
/// already been closed with the appropriate close code (`auth_timeout` or
|
||||||
|
/// `auth_failed`); the caller should simply return.
|
||||||
|
pub(super) async fn perform_auth_handshake(
|
||||||
|
sink: &mut futures::stream::SplitSink<poem::web::websocket::WebSocketStream, WsMessage>,
|
||||||
|
stream: &mut futures::stream::SplitStream<poem::web::websocket::WebSocketStream>,
|
||||||
|
) -> Option<AuthMessage> {
|
||||||
|
let challenge = node_identity::generate_challenge();
|
||||||
|
let challenge_msg = ChallengeMessage {
|
||||||
|
r#type: "challenge".to_string(),
|
||||||
|
nonce: challenge.clone(),
|
||||||
|
};
|
||||||
|
let challenge_json = serde_json::to_string(&challenge_msg).ok()?;
|
||||||
|
if sink.send(WsMessage::Text(challenge_json)).await.is_err() {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
let auth_result = tokio::time::timeout(
|
||||||
|
std::time::Duration::from_secs(AUTH_TIMEOUT_SECS),
|
||||||
|
stream.next(),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
let auth_text = match auth_result {
|
||||||
|
Ok(Some(Ok(WsMessage::Text(text)))) => text,
|
||||||
|
Ok(_) | Err(_) => {
|
||||||
|
slog!("[crdt-sync] Auth timeout or connection lost during handshake");
|
||||||
|
let _ = sink
|
||||||
|
.send(WsMessage::Close(Some((
|
||||||
|
poem::web::websocket::CloseCode::from(4001),
|
||||||
|
"auth_timeout".to_string(),
|
||||||
|
))))
|
||||||
|
.await;
|
||||||
|
let _ = sink.close().await;
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let auth_msg: AuthMessage = match serde_json::from_str(&auth_text) {
|
||||||
|
Ok(m) => m,
|
||||||
|
Err(_) => {
|
||||||
|
slog!("[crdt-sync] Invalid auth message from peer");
|
||||||
|
close_with_auth_failed(sink).await;
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let sig_valid = node_identity::verify_challenge(
|
||||||
|
&auth_msg.pubkey_hex,
|
||||||
|
&challenge,
|
||||||
|
&auth_msg.signature_hex,
|
||||||
|
);
|
||||||
|
let key_trusted = trusted_keys().iter().any(|k| k == &auth_msg.pubkey_hex);
|
||||||
|
|
||||||
|
if !sig_valid || !key_trusted {
|
||||||
|
slog!(
|
||||||
|
"[crdt-sync] Auth failed for peer (sig_valid={sig_valid}, key_trusted={key_trusted})"
|
||||||
|
);
|
||||||
|
close_with_auth_failed(sink).await;
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
slog!(
|
||||||
|
"[crdt-sync] Peer authenticated: {:.12}…",
|
||||||
|
&auth_msg.pubkey_hex
|
||||||
|
);
|
||||||
|
|
||||||
|
Some(auth_msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/// Close the WebSocket with a generic `auth_failed` reason.
|
||||||
|
///
|
||||||
|
/// The close reason is intentionally the same for all auth failures
|
||||||
|
/// (bad signature, untrusted key, malformed message) to avoid leaking
|
||||||
|
/// which check failed.
|
||||||
|
async fn close_with_auth_failed(
|
||||||
|
sink: &mut futures::stream::SplitSink<poem::web::websocket::WebSocketStream, WsMessage>,
|
||||||
|
) {
|
||||||
|
let _ = sink
|
||||||
|
.send(WsMessage::Close(Some((
|
||||||
|
poem::web::websocket::CloseCode::from(4002),
|
||||||
|
"auth_failed".to_string(),
|
||||||
|
))))
|
||||||
|
.await;
|
||||||
|
let _ = sink.close().await;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Process an incoming text-frame sync message from a peer.
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use super::super::server::crdt_sync_handler;
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
enum AuthListenerResult {
|
||||||
|
Authenticated(String),
|
||||||
|
AuthFailed(String),
|
||||||
|
AuthTimeout,
|
||||||
|
ConnectionLost,
|
||||||
|
PeerClosedEarly(Option<String>),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
async fn start_auth_listener(
|
||||||
|
trusted_keys: Vec<String>,
|
||||||
|
) -> (
|
||||||
|
std::net::SocketAddr,
|
||||||
|
tokio::sync::oneshot::Receiver<AuthListenerResult>,
|
||||||
|
) {
|
||||||
|
use tokio::net::TcpListener;
|
||||||
|
use tokio_tungstenite::accept_async;
|
||||||
|
|
||||||
|
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||||
|
let addr = listener.local_addr().unwrap();
|
||||||
|
|
||||||
|
let (result_tx, result_rx) = tokio::sync::oneshot::channel();
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let (tcp_stream, _) = listener.accept().await.unwrap();
|
||||||
|
let ws_stream = accept_async(tcp_stream).await.unwrap();
|
||||||
|
let (mut sink, mut stream) = ws_stream.split();
|
||||||
|
|
||||||
|
use tokio_tungstenite::tungstenite::Message as TMsg;
|
||||||
|
|
||||||
|
// Step 1: Send challenge.
|
||||||
|
let challenge = crate::node_identity::generate_challenge();
|
||||||
|
let challenge_msg = super::ChallengeMessage {
|
||||||
|
r#type: "challenge".to_string(),
|
||||||
|
nonce: challenge.clone(),
|
||||||
|
};
|
||||||
|
let challenge_json = serde_json::to_string(&challenge_msg).unwrap();
|
||||||
|
if sink.send(TMsg::Text(challenge_json.into())).await.is_err() {
|
||||||
|
let _ = result_tx.send(AuthListenerResult::ConnectionLost);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Step 2: Await auth reply (10s timeout).
|
||||||
|
let auth_frame =
|
||||||
|
tokio::time::timeout(std::time::Duration::from_secs(10), stream.next()).await;
|
||||||
|
|
||||||
|
let auth_text = match auth_frame {
|
||||||
|
Ok(Some(Ok(TMsg::Text(t)))) => t.to_string(),
|
||||||
|
Ok(Some(Ok(TMsg::Close(reason)))) => {
|
||||||
|
let _ = result_tx.send(AuthListenerResult::PeerClosedEarly(
|
||||||
|
reason.map(|r| r.reason.to_string()),
|
||||||
|
));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
let _ = sink
|
||||||
|
.send(TMsg::Close(Some(tokio_tungstenite::tungstenite::protocol::CloseFrame {
|
||||||
|
code: tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode::from(4001),
|
||||||
|
reason: "auth_timeout".into(),
|
||||||
|
})))
|
||||||
|
.await;
|
||||||
|
let _ = result_tx.send(AuthListenerResult::AuthTimeout);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Step 3: Verify.
|
||||||
|
let auth_msg: super::AuthMessage = match serde_json::from_str(&auth_text) {
|
||||||
|
Ok(m) => m,
|
||||||
|
Err(_) => {
|
||||||
|
let _ = close_listener_auth_failed(&mut sink).await;
|
||||||
|
let _ = result_tx.send(AuthListenerResult::AuthFailed("bad_json".into()));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let sig_valid = crate::node_identity::verify_challenge(
|
||||||
|
&auth_msg.pubkey_hex,
|
||||||
|
&challenge,
|
||||||
|
&auth_msg.signature_hex,
|
||||||
|
);
|
||||||
|
let key_trusted = trusted_keys.iter().any(|k| k == &auth_msg.pubkey_hex);
|
||||||
|
|
||||||
|
if !sig_valid || !key_trusted {
|
||||||
|
let _ = close_listener_auth_failed(&mut sink).await;
|
||||||
|
let _ = result_tx.send(AuthListenerResult::AuthFailed(format!(
|
||||||
|
"sig_valid={sig_valid}, key_trusted={key_trusted}"
|
||||||
|
)));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Auth passed! Send a bulk state with one op to prove sync works.
|
||||||
|
let kp = bft_json_crdt::keypair::make_keypair();
|
||||||
|
let mut crdt =
|
||||||
|
bft_json_crdt::json_crdt::BaseCrdt::<crate::crdt_state::PipelineDoc>::new(&kp);
|
||||||
|
let item: bft_json_crdt::json_crdt::JsonValue = serde_json::json!({
|
||||||
|
"story_id": "628_auth_test_item",
|
||||||
|
"stage": "1_backlog",
|
||||||
|
"name": "Auth Test",
|
||||||
|
"agent": "",
|
||||||
|
"retry_count": 0.0,
|
||||||
|
"blocked": false,
|
||||||
|
"depends_on": "",
|
||||||
|
"claimed_by": "",
|
||||||
|
"claimed_at": 0.0,
|
||||||
|
})
|
||||||
|
.into();
|
||||||
|
let op = crdt
|
||||||
|
.doc
|
||||||
|
.items
|
||||||
|
.insert(bft_json_crdt::op::ROOT_ID, item)
|
||||||
|
.sign(&kp);
|
||||||
|
let op_json = serde_json::to_string(&op).unwrap();
|
||||||
|
let bulk = crate::crdt_sync::wire::SyncMessage::Bulk { ops: vec![op_json] };
|
||||||
|
let bulk_json = serde_json::to_string(&bulk).unwrap();
|
||||||
|
let _ = sink.send(TMsg::Text(bulk_json.into())).await;
|
||||||
|
|
||||||
|
let _ = result_tx.send(AuthListenerResult::Authenticated(auth_msg.pubkey_hex));
|
||||||
|
});
|
||||||
|
|
||||||
|
(addr, result_rx)
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
async fn close_listener_auth_failed(
|
||||||
|
sink: &mut futures::stream::SplitSink<
|
||||||
|
tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>,
|
||||||
|
tokio_tungstenite::tungstenite::Message,
|
||||||
|
>,
|
||||||
|
) {
|
||||||
|
use tokio_tungstenite::tungstenite::protocol::CloseFrame;
|
||||||
|
use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode;
|
||||||
|
let _ = sink
|
||||||
|
.send(tokio_tungstenite::tungstenite::Message::Close(Some(
|
||||||
|
CloseFrame {
|
||||||
|
code: CloseCode::from(4002),
|
||||||
|
reason: "auth_failed".into(),
|
||||||
|
},
|
||||||
|
)))
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
|
||||||
|
async fn auth_happy_path_handshake_and_sync() {
|
||||||
|
use bft_json_crdt::keypair::make_keypair;
|
||||||
|
use futures::{SinkExt, StreamExt};
|
||||||
|
use tokio_tungstenite::connect_async;
|
||||||
|
use tokio_tungstenite::tungstenite::Message as TMsg;
|
||||||
|
|
||||||
|
let connector_kp = make_keypair();
|
||||||
|
let connector_pubkey = crate::node_identity::public_key_hex(&connector_kp);
|
||||||
|
|
||||||
|
// Start listener that trusts the connector's pubkey.
|
||||||
|
let (addr, result_rx) = start_auth_listener(vec![connector_pubkey.clone()]).await;
|
||||||
|
|
||||||
|
// Connect and do the handshake.
|
||||||
|
let url = format!("ws://{addr}");
|
||||||
|
let (ws, _) = connect_async(&url).await.unwrap();
|
||||||
|
let (mut sink, mut stream) = ws.split();
|
||||||
|
|
||||||
|
// Receive challenge.
|
||||||
|
let challenge_frame = stream.next().await.unwrap().unwrap();
|
||||||
|
let challenge_text = match challenge_frame {
|
||||||
|
TMsg::Text(t) => t.to_string(),
|
||||||
|
other => panic!("Expected text frame, got {other:?}"),
|
||||||
|
};
|
||||||
|
let challenge_msg: super::ChallengeMessage = serde_json::from_str(&challenge_text).unwrap();
|
||||||
|
assert_eq!(challenge_msg.r#type, "challenge");
|
||||||
|
assert_eq!(
|
||||||
|
challenge_msg.nonce.len(),
|
||||||
|
64,
|
||||||
|
"Challenge must be 64 hex chars"
|
||||||
|
);
|
||||||
|
|
||||||
|
// Sign and reply.
|
||||||
|
let sig = crate::node_identity::sign_challenge(&connector_kp, &challenge_msg.nonce);
|
||||||
|
let auth_msg = super::AuthMessage {
|
||||||
|
r#type: "auth".to_string(),
|
||||||
|
pubkey_hex: connector_pubkey.clone(),
|
||||||
|
signature_hex: sig,
|
||||||
|
};
|
||||||
|
let auth_json = serde_json::to_string(&auth_msg).unwrap();
|
||||||
|
sink.send(TMsg::Text(auth_json.into())).await.unwrap();
|
||||||
|
|
||||||
|
// After auth, we should receive a bulk sync message with at least one op.
|
||||||
|
let bulk_frame = tokio::time::timeout(std::time::Duration::from_secs(5), stream.next())
|
||||||
|
.await
|
||||||
|
.expect("should receive bulk within 5s")
|
||||||
|
.unwrap()
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let bulk_text = match bulk_frame {
|
||||||
|
TMsg::Text(t) => t.to_string(),
|
||||||
|
other => panic!("Expected bulk text frame, got {other:?}"),
|
||||||
|
};
|
||||||
|
let bulk_msg: crate::crdt_sync::wire::SyncMessage = serde_json::from_str(&bulk_text).unwrap();
|
||||||
|
match bulk_msg {
|
||||||
|
crate::crdt_sync::wire::SyncMessage::Bulk { ops } => {
|
||||||
|
assert!(
|
||||||
|
!ops.is_empty(),
|
||||||
|
"Bulk sync must contain at least one op after successful auth"
|
||||||
|
);
|
||||||
|
// Verify we can deserialize the op.
|
||||||
|
let _signed: bft_json_crdt::json_crdt::SignedOp =
|
||||||
|
serde_json::from_str(&ops[0]).unwrap();
|
||||||
|
}
|
||||||
|
_ => panic!("Expected Bulk message after auth"),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify listener also reports success.
|
||||||
|
let listener_result = result_rx.await.unwrap();
|
||||||
|
match listener_result {
|
||||||
|
AuthListenerResult::Authenticated(pubkey) => {
|
||||||
|
assert_eq!(pubkey, connector_pubkey);
|
||||||
|
}
|
||||||
|
other => panic!("Expected Authenticated, got {other:?}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn auth_untrusted_pubkey_rejected() {
|
||||||
|
use bft_json_crdt::keypair::make_keypair;
|
||||||
|
use futures::{SinkExt, StreamExt};
|
||||||
|
use tokio_tungstenite::connect_async;
|
||||||
|
use tokio_tungstenite::tungstenite::Message as TMsg;
|
||||||
|
|
||||||
|
let connector_kp = make_keypair();
|
||||||
|
let connector_pubkey = crate::node_identity::public_key_hex(&connector_kp);
|
||||||
|
|
||||||
|
// Listener trusts a DIFFERENT key, not the connector's.
|
||||||
|
let other_kp = make_keypair();
|
||||||
|
let other_pubkey = crate::node_identity::public_key_hex(&other_kp);
|
||||||
|
|
||||||
|
let (addr, result_rx) = start_auth_listener(vec![other_pubkey]).await;
|
||||||
|
|
||||||
|
let url = format!("ws://{addr}");
|
||||||
|
let (ws, _) = connect_async(&url).await.unwrap();
|
||||||
|
let (mut sink, mut stream) = ws.split();
|
||||||
|
|
||||||
|
// Receive challenge and sign with our (untrusted) key.
|
||||||
|
let challenge_frame = stream.next().await.unwrap().unwrap();
|
||||||
|
let challenge_text = match challenge_frame {
|
||||||
|
TMsg::Text(t) => t.to_string(),
|
||||||
|
_ => panic!("Expected text frame"),
|
||||||
|
};
|
||||||
|
let challenge_msg: super::ChallengeMessage = serde_json::from_str(&challenge_text).unwrap();
|
||||||
|
|
||||||
|
let sig = crate::node_identity::sign_challenge(&connector_kp, &challenge_msg.nonce);
|
||||||
|
let auth_msg = super::AuthMessage {
|
||||||
|
r#type: "auth".to_string(),
|
||||||
|
pubkey_hex: connector_pubkey,
|
||||||
|
signature_hex: sig,
|
||||||
|
};
|
||||||
|
sink.send(TMsg::Text(serde_json::to_string(&auth_msg).unwrap().into()))
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
// Should receive a close frame with auth_failed.
|
||||||
|
let close_frame = tokio::time::timeout(std::time::Duration::from_secs(5), stream.next())
|
||||||
|
.await
|
||||||
|
.expect("should receive close within 5s");
|
||||||
|
|
||||||
|
match close_frame {
|
||||||
|
Some(Ok(TMsg::Close(Some(frame)))) => {
|
||||||
|
assert_eq!(
|
||||||
|
&*frame.reason, "auth_failed",
|
||||||
|
"Close reason must be 'auth_failed'"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
Some(Ok(TMsg::Close(None))) => {
|
||||||
|
// Some implementations omit the close frame payload — that's acceptable
|
||||||
|
// as long as no sync data was sent.
|
||||||
|
}
|
||||||
|
other => {
|
||||||
|
// Connection dropped without close frame is also acceptable.
|
||||||
|
// The key assertion is below: no ops were exchanged.
|
||||||
|
let _ = other;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify listener reports auth failure.
|
||||||
|
let listener_result = result_rx.await.unwrap();
|
||||||
|
match listener_result {
|
||||||
|
AuthListenerResult::AuthFailed(reason) => {
|
||||||
|
assert!(reason.contains("key_trusted=false"), "Reason: {reason}");
|
||||||
|
}
|
||||||
|
other => panic!("Expected AuthFailed, got {other:?}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn auth_bad_signature_rejected() {
|
||||||
|
use bft_json_crdt::keypair::make_keypair;
|
||||||
|
use futures::{SinkExt, StreamExt};
|
||||||
|
use tokio_tungstenite::connect_async;
|
||||||
|
use tokio_tungstenite::tungstenite::Message as TMsg;
|
||||||
|
|
||||||
|
let legitimate_kp = make_keypair();
|
||||||
|
let legitimate_pubkey = crate::node_identity::public_key_hex(&legitimate_kp);
|
||||||
|
|
||||||
|
// A different keypair that will sign the challenge (wrong key).
|
||||||
|
let impersonator_kp = make_keypair();
|
||||||
|
|
||||||
|
// Listener trusts the legitimate pubkey.
|
||||||
|
let (addr, result_rx) = start_auth_listener(vec![legitimate_pubkey.clone()]).await;
|
||||||
|
|
||||||
|
let url = format!("ws://{addr}");
|
||||||
|
let (ws, _) = connect_async(&url).await.unwrap();
|
||||||
|
let (mut sink, mut stream) = ws.split();
|
||||||
|
|
||||||
|
// Receive challenge.
|
||||||
|
let challenge_frame = stream.next().await.unwrap().unwrap();
|
||||||
|
let challenge_text = match challenge_frame {
|
||||||
|
TMsg::Text(t) => t.to_string(),
|
||||||
|
_ => panic!("Expected text frame"),
|
||||||
|
};
|
||||||
|
let challenge_msg: super::ChallengeMessage = serde_json::from_str(&challenge_text).unwrap();
|
||||||
|
|
||||||
|
// Sign with the WRONG keypair but claim to be the legitimate pubkey.
|
||||||
|
let bad_sig = crate::node_identity::sign_challenge(&impersonator_kp, &challenge_msg.nonce);
|
||||||
|
let auth_msg = super::AuthMessage {
|
||||||
|
r#type: "auth".to_string(),
|
||||||
|
pubkey_hex: legitimate_pubkey,
|
||||||
|
signature_hex: bad_sig,
|
||||||
|
};
|
||||||
|
sink.send(TMsg::Text(serde_json::to_string(&auth_msg).unwrap().into()))
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
// Should be rejected.
|
||||||
|
let close_frame = tokio::time::timeout(std::time::Duration::from_secs(5), stream.next())
|
||||||
|
.await
|
||||||
|
.expect("should receive close within 5s");
|
||||||
|
|
||||||
|
match close_frame {
|
||||||
|
Some(Ok(TMsg::Close(Some(frame)))) => {
|
||||||
|
assert_eq!(
|
||||||
|
&*frame.reason, "auth_failed",
|
||||||
|
"Close reason must be 'auth_failed' — same as untrusted key"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
// Connection closed is acceptable.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify listener reports auth failure with sig_valid=false.
|
||||||
|
let listener_result = result_rx.await.unwrap();
|
||||||
|
match listener_result {
|
||||||
|
AuthListenerResult::AuthFailed(reason) => {
|
||||||
|
assert!(reason.contains("sig_valid=false"), "Reason: {reason}");
|
||||||
|
}
|
||||||
|
other => panic!("Expected AuthFailed, got {other:?}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn auth_replay_protection_fresh_nonces() {
|
||||||
|
use futures::StreamExt;
|
||||||
|
use tokio::net::TcpListener;
|
||||||
|
use tokio_tungstenite::tungstenite::Message as TMsg;
|
||||||
|
use tokio_tungstenite::{accept_async, connect_async};
|
||||||
|
|
||||||
|
// Start a listener that sends challenges but doesn't complete auth.
|
||||||
|
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||||
|
let addr = listener.local_addr().unwrap();
|
||||||
|
|
||||||
|
let (nonce_tx, mut nonce_rx) = tokio::sync::mpsc::channel::<String>(2);
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
for _ in 0..2 {
|
||||||
|
let (tcp, _) = listener.accept().await.unwrap();
|
||||||
|
let ws = accept_async(tcp).await.unwrap();
|
||||||
|
let (mut sink, _stream) = ws.split();
|
||||||
|
|
||||||
|
let challenge = crate::node_identity::generate_challenge();
|
||||||
|
let msg = super::ChallengeMessage {
|
||||||
|
r#type: "challenge".to_string(),
|
||||||
|
nonce: challenge.clone(),
|
||||||
|
};
|
||||||
|
let json = serde_json::to_string(&msg).unwrap();
|
||||||
|
let _ = sink.send(TMsg::Text(json.into())).await;
|
||||||
|
let _ = nonce_tx.send(challenge).await;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Connect twice and collect the nonces.
|
||||||
|
let mut nonces = Vec::new();
|
||||||
|
for _ in 0..2 {
|
||||||
|
let url = format!("ws://{addr}");
|
||||||
|
let (ws, _) = connect_async(&url).await.unwrap();
|
||||||
|
let (_sink, mut stream) = ws.split();
|
||||||
|
|
||||||
|
let frame = stream.next().await.unwrap().unwrap();
|
||||||
|
let text = match frame {
|
||||||
|
TMsg::Text(t) => t.to_string(),
|
||||||
|
_ => panic!("Expected text"),
|
||||||
|
};
|
||||||
|
let msg: super::ChallengeMessage = serde_json::from_str(&text).unwrap();
|
||||||
|
nonces.push(msg.nonce);
|
||||||
|
// Drop connection so listener accepts the next one.
|
||||||
|
drop(stream);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Also collect nonces from the listener side.
|
||||||
|
let server_nonce_1 = nonce_rx.recv().await.unwrap();
|
||||||
|
let server_nonce_2 = nonce_rx.recv().await.unwrap();
|
||||||
|
|
||||||
|
assert_ne!(
|
||||||
|
nonces[0], nonces[1],
|
||||||
|
"Consecutive challenges must be different"
|
||||||
|
);
|
||||||
|
assert_ne!(
|
||||||
|
server_nonce_1, server_nonce_2,
|
||||||
|
"Server must generate fresh nonce per accept"
|
||||||
|
);
|
||||||
|
assert_eq!(nonces[0], server_nonce_1, "Client/server nonces must match");
|
||||||
|
assert_eq!(nonces[1], server_nonce_2, "Client/server nonces must match");
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -61,6 +61,7 @@ pub const PONG_TIMEOUT_SECS: u64 = 60;
|
|||||||
mod auth;
|
mod auth;
|
||||||
mod client;
|
mod client;
|
||||||
mod dispatch;
|
mod dispatch;
|
||||||
|
mod handshake;
|
||||||
mod server;
|
mod server;
|
||||||
mod wire;
|
mod wire;
|
||||||
|
|
||||||
|
|||||||
+12
-504
@@ -23,7 +23,8 @@ use super::dispatch::{handle_incoming_binary, handle_incoming_text};
|
|||||||
use super::wire::{AuthMessage, ChallengeMessage, SyncMessage};
|
use super::wire::{AuthMessage, ChallengeMessage, SyncMessage};
|
||||||
use super::{AUTH_TIMEOUT_SECS, PING_INTERVAL_SECS, PONG_TIMEOUT_SECS};
|
use super::{AUTH_TIMEOUT_SECS, PING_INTERVAL_SECS, PONG_TIMEOUT_SECS};
|
||||||
|
|
||||||
// ── Server-side WebSocket handler ───────────────────────────────────
|
|
||||||
|
|
||||||
|
|
||||||
/// Query parameters accepted on the `/crdt-sync` WebSocket upgrade request.
|
/// Query parameters accepted on the `/crdt-sync` WebSocket upgrade request.
|
||||||
#[derive(Deserialize)]
|
#[derive(Deserialize)]
|
||||||
@@ -40,6 +41,7 @@ struct SyncQueryParams {
|
|||||||
/// in open-access mode (the default), a token is optional but still validated
|
/// in open-access mode (the default), a token is optional but still validated
|
||||||
/// if present.
|
/// if present.
|
||||||
#[handler]
|
#[handler]
|
||||||
|
|
||||||
pub async fn crdt_sync_handler(
|
pub async fn crdt_sync_handler(
|
||||||
ws: WebSocket,
|
ws: WebSocket,
|
||||||
_ctx: Data<&Arc<AppContext>>,
|
_ctx: Data<&Arc<AppContext>>,
|
||||||
@@ -74,70 +76,10 @@ pub async fn crdt_sync_handler(
|
|||||||
|
|
||||||
slog!("[crdt-sync] Peer connected, starting auth handshake");
|
slog!("[crdt-sync] Peer connected, starting auth handshake");
|
||||||
|
|
||||||
// ── Step 1: Send challenge to the connecting peer ─────────
|
let auth_msg = match super::handshake::perform_auth_handshake(&mut sink, &mut stream).await {
|
||||||
let challenge = node_identity::generate_challenge();
|
Some(m) => m,
|
||||||
let challenge_msg = ChallengeMessage {
|
None => return,
|
||||||
r#type: "challenge".to_string(),
|
|
||||||
nonce: challenge.clone(),
|
|
||||||
};
|
};
|
||||||
let challenge_json = match serde_json::to_string(&challenge_msg) {
|
|
||||||
Ok(j) => j,
|
|
||||||
Err(_) => return,
|
|
||||||
};
|
|
||||||
if sink.send(WsMessage::Text(challenge_json)).await.is_err() {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── Step 2: Await auth reply within timeout ───────────────
|
|
||||||
let auth_result = tokio::time::timeout(
|
|
||||||
std::time::Duration::from_secs(AUTH_TIMEOUT_SECS),
|
|
||||||
stream.next(),
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
|
|
||||||
let auth_text = match auth_result {
|
|
||||||
Ok(Some(Ok(WsMessage::Text(text)))) => text,
|
|
||||||
Ok(_) | Err(_) => {
|
|
||||||
// Timeout or connection closed before auth reply.
|
|
||||||
slog!("[crdt-sync] Auth timeout or connection lost during handshake");
|
|
||||||
let _ = sink
|
|
||||||
.send(WsMessage::Close(Some((
|
|
||||||
poem::web::websocket::CloseCode::from(4001),
|
|
||||||
"auth_timeout".to_string(),
|
|
||||||
))))
|
|
||||||
.await;
|
|
||||||
let _ = sink.close().await;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// ── Step 3: Verify auth reply ─────────────────────────────
|
|
||||||
let auth_msg: AuthMessage = match serde_json::from_str(&auth_text) {
|
|
||||||
Ok(m) => m,
|
|
||||||
Err(_) => {
|
|
||||||
slog!("[crdt-sync] Invalid auth message from peer");
|
|
||||||
close_with_auth_failed(&mut sink).await;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// Verify signature AND check allow-list.
|
|
||||||
let sig_valid =
|
|
||||||
node_identity::verify_challenge(&auth_msg.pubkey_hex, &challenge, &auth_msg.signature_hex);
|
|
||||||
let key_trusted = trusted_keys().iter().any(|k| k == &auth_msg.pubkey_hex);
|
|
||||||
|
|
||||||
if !sig_valid || !key_trusted {
|
|
||||||
slog!("[crdt-sync] Auth failed for peer (sig_valid={sig_valid}, key_trusted={key_trusted})");
|
|
||||||
close_with_auth_failed(&mut sink).await;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
slog!(
|
|
||||||
"[crdt-sync] Peer authenticated: {:.12}…",
|
|
||||||
&auth_msg.pubkey_hex
|
|
||||||
);
|
|
||||||
|
|
||||||
// ── Auth passed — proceed with CRDT sync ──────────────────
|
|
||||||
|
|
||||||
// v2 protocol: send our vector clock so the peer can compute the delta.
|
// v2 protocol: send our vector clock so the peer can compute the delta.
|
||||||
let our_clock = crdt_state::our_vector_clock().unwrap_or_default();
|
let our_clock = crdt_state::our_vector_clock().unwrap_or_default();
|
||||||
@@ -351,6 +293,10 @@ pub async fn crdt_sync_handler(
|
|||||||
.into_response()
|
.into_response()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Wait for the next text-frame sync message from the peer, handling Ping/Pong
|
||||||
|
/// transparently.
|
||||||
|
///
|
||||||
|
|
||||||
/// Wait for the next text-frame sync message from the peer, handling Ping/Pong
|
/// Wait for the next text-frame sync message from the peer, handling Ping/Pong
|
||||||
/// transparently.
|
/// transparently.
|
||||||
///
|
///
|
||||||
@@ -373,40 +319,12 @@ async fn wait_for_sync_text(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Close the WebSocket with a generic `auth_failed` reason.
|
|
||||||
///
|
|
||||||
/// The close reason is intentionally the same for all auth failures
|
|
||||||
/// (bad signature, untrusted key, malformed message) to avoid leaking
|
|
||||||
/// which check failed.
|
|
||||||
async fn close_with_auth_failed(
|
|
||||||
sink: &mut futures::stream::SplitSink<poem::web::websocket::WebSocketStream, WsMessage>,
|
|
||||||
) {
|
|
||||||
let _ = sink
|
|
||||||
.send(WsMessage::Close(Some((
|
|
||||||
poem::web::websocket::CloseCode::from(4002),
|
|
||||||
"auth_failed".to_string(),
|
|
||||||
))))
|
|
||||||
.await;
|
|
||||||
let _ = sink.close().await;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Process an incoming text-frame sync message from a peer.
|
|
||||||
///
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
use super::super::wire::SyncMessagePublic;
|
||||||
|
use super::super::handshake::perform_auth_handshake;
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
#[allow(dead_code)]
|
|
||||||
enum AuthListenerResult {
|
|
||||||
Authenticated(String),
|
|
||||||
AuthFailed(String),
|
|
||||||
AuthTimeout,
|
|
||||||
ConnectionLost,
|
|
||||||
PeerClosedEarly(Option<String>),
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn peer_receives_op_encoded_via_wire_codec() {
|
fn peer_receives_op_encoded_via_wire_codec() {
|
||||||
use bft_json_crdt::json_crdt::BaseCrdt;
|
use bft_json_crdt::json_crdt::BaseCrdt;
|
||||||
use bft_json_crdt::keypair::make_keypair;
|
use bft_json_crdt::keypair::make_keypair;
|
||||||
@@ -903,417 +821,7 @@ mod tests {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn start_auth_listener(
|
|
||||||
trusted_keys: Vec<String>,
|
|
||||||
) -> (
|
|
||||||
std::net::SocketAddr,
|
|
||||||
tokio::sync::oneshot::Receiver<AuthListenerResult>,
|
|
||||||
) {
|
|
||||||
use tokio::net::TcpListener;
|
|
||||||
use tokio_tungstenite::accept_async;
|
|
||||||
|
|
||||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
|
||||||
let addr = listener.local_addr().unwrap();
|
|
||||||
|
|
||||||
let (result_tx, result_rx) = tokio::sync::oneshot::channel();
|
|
||||||
|
|
||||||
tokio::spawn(async move {
|
|
||||||
let (tcp_stream, _) = listener.accept().await.unwrap();
|
|
||||||
let ws_stream = accept_async(tcp_stream).await.unwrap();
|
|
||||||
let (mut sink, mut stream) = ws_stream.split();
|
|
||||||
|
|
||||||
use tokio_tungstenite::tungstenite::Message as TMsg;
|
|
||||||
|
|
||||||
// Step 1: Send challenge.
|
|
||||||
let challenge = crate::node_identity::generate_challenge();
|
|
||||||
let challenge_msg = super::ChallengeMessage {
|
|
||||||
r#type: "challenge".to_string(),
|
|
||||||
nonce: challenge.clone(),
|
|
||||||
};
|
|
||||||
let challenge_json = serde_json::to_string(&challenge_msg).unwrap();
|
|
||||||
if sink.send(TMsg::Text(challenge_json.into())).await.is_err() {
|
|
||||||
let _ = result_tx.send(AuthListenerResult::ConnectionLost);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Step 2: Await auth reply (10s timeout).
|
|
||||||
let auth_frame =
|
|
||||||
tokio::time::timeout(std::time::Duration::from_secs(10), stream.next()).await;
|
|
||||||
|
|
||||||
let auth_text = match auth_frame {
|
|
||||||
Ok(Some(Ok(TMsg::Text(t)))) => t.to_string(),
|
|
||||||
Ok(Some(Ok(TMsg::Close(reason)))) => {
|
|
||||||
let _ = result_tx.send(AuthListenerResult::PeerClosedEarly(
|
|
||||||
reason.map(|r| r.reason.to_string()),
|
|
||||||
));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
_ => {
|
|
||||||
let _ = sink
|
|
||||||
.send(TMsg::Close(Some(tokio_tungstenite::tungstenite::protocol::CloseFrame {
|
|
||||||
code: tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode::from(4001),
|
|
||||||
reason: "auth_timeout".into(),
|
|
||||||
})))
|
|
||||||
.await;
|
|
||||||
let _ = result_tx.send(AuthListenerResult::AuthTimeout);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// Step 3: Verify.
|
|
||||||
let auth_msg: super::AuthMessage = match serde_json::from_str(&auth_text) {
|
|
||||||
Ok(m) => m,
|
|
||||||
Err(_) => {
|
|
||||||
let _ = close_listener_auth_failed(&mut sink).await;
|
|
||||||
let _ = result_tx.send(AuthListenerResult::AuthFailed("bad_json".into()));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let sig_valid = crate::node_identity::verify_challenge(
|
|
||||||
&auth_msg.pubkey_hex,
|
|
||||||
&challenge,
|
|
||||||
&auth_msg.signature_hex,
|
|
||||||
);
|
|
||||||
let key_trusted = trusted_keys.iter().any(|k| k == &auth_msg.pubkey_hex);
|
|
||||||
|
|
||||||
if !sig_valid || !key_trusted {
|
|
||||||
let _ = close_listener_auth_failed(&mut sink).await;
|
|
||||||
let _ = result_tx.send(AuthListenerResult::AuthFailed(format!(
|
|
||||||
"sig_valid={sig_valid}, key_trusted={key_trusted}"
|
|
||||||
)));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Auth passed! Send a bulk state with one op to prove sync works.
|
|
||||||
let kp = bft_json_crdt::keypair::make_keypair();
|
|
||||||
let mut crdt =
|
|
||||||
bft_json_crdt::json_crdt::BaseCrdt::<crate::crdt_state::PipelineDoc>::new(&kp);
|
|
||||||
let item: bft_json_crdt::json_crdt::JsonValue = serde_json::json!({
|
|
||||||
"story_id": "628_auth_test_item",
|
|
||||||
"stage": "1_backlog",
|
|
||||||
"name": "Auth Test",
|
|
||||||
"agent": "",
|
|
||||||
"retry_count": 0.0,
|
|
||||||
"blocked": false,
|
|
||||||
"depends_on": "",
|
|
||||||
"claimed_by": "",
|
|
||||||
"claimed_at": 0.0,
|
|
||||||
})
|
|
||||||
.into();
|
|
||||||
let op = crdt
|
|
||||||
.doc
|
|
||||||
.items
|
|
||||||
.insert(bft_json_crdt::op::ROOT_ID, item)
|
|
||||||
.sign(&kp);
|
|
||||||
let op_json = serde_json::to_string(&op).unwrap();
|
|
||||||
let bulk = super::SyncMessage::Bulk { ops: vec![op_json] };
|
|
||||||
let bulk_json = serde_json::to_string(&bulk).unwrap();
|
|
||||||
let _ = sink.send(TMsg::Text(bulk_json.into())).await;
|
|
||||||
|
|
||||||
let _ = result_tx.send(AuthListenerResult::Authenticated(auth_msg.pubkey_hex));
|
|
||||||
});
|
|
||||||
|
|
||||||
(addr, result_rx)
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn close_listener_auth_failed(
|
|
||||||
sink: &mut futures::stream::SplitSink<
|
|
||||||
tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>,
|
|
||||||
tokio_tungstenite::tungstenite::Message,
|
|
||||||
>,
|
|
||||||
) {
|
|
||||||
use tokio_tungstenite::tungstenite::protocol::CloseFrame;
|
|
||||||
use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode;
|
|
||||||
let _ = sink
|
|
||||||
.send(tokio_tungstenite::tungstenite::Message::Close(Some(
|
|
||||||
CloseFrame {
|
|
||||||
code: CloseCode::from(4002),
|
|
||||||
reason: "auth_failed".into(),
|
|
||||||
},
|
|
||||||
)))
|
|
||||||
.await;
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn auth_happy_path_handshake_and_sync() {
|
|
||||||
use bft_json_crdt::keypair::make_keypair;
|
|
||||||
use futures::{SinkExt, StreamExt};
|
|
||||||
use tokio_tungstenite::connect_async;
|
|
||||||
use tokio_tungstenite::tungstenite::Message as TMsg;
|
|
||||||
|
|
||||||
let connector_kp = make_keypair();
|
|
||||||
let connector_pubkey = crate::node_identity::public_key_hex(&connector_kp);
|
|
||||||
|
|
||||||
// Start listener that trusts the connector's pubkey.
|
|
||||||
let (addr, result_rx) = start_auth_listener(vec![connector_pubkey.clone()]).await;
|
|
||||||
|
|
||||||
// Connect and do the handshake.
|
|
||||||
let url = format!("ws://{addr}");
|
|
||||||
let (ws, _) = connect_async(&url).await.unwrap();
|
|
||||||
let (mut sink, mut stream) = ws.split();
|
|
||||||
|
|
||||||
// Receive challenge.
|
|
||||||
let challenge_frame = stream.next().await.unwrap().unwrap();
|
|
||||||
let challenge_text = match challenge_frame {
|
|
||||||
TMsg::Text(t) => t.to_string(),
|
|
||||||
other => panic!("Expected text frame, got {other:?}"),
|
|
||||||
};
|
|
||||||
let challenge_msg: super::ChallengeMessage = serde_json::from_str(&challenge_text).unwrap();
|
|
||||||
assert_eq!(challenge_msg.r#type, "challenge");
|
|
||||||
assert_eq!(
|
|
||||||
challenge_msg.nonce.len(),
|
|
||||||
64,
|
|
||||||
"Challenge must be 64 hex chars"
|
|
||||||
);
|
|
||||||
|
|
||||||
// Sign and reply.
|
|
||||||
let sig = crate::node_identity::sign_challenge(&connector_kp, &challenge_msg.nonce);
|
|
||||||
let auth_msg = super::AuthMessage {
|
|
||||||
r#type: "auth".to_string(),
|
|
||||||
pubkey_hex: connector_pubkey.clone(),
|
|
||||||
signature_hex: sig,
|
|
||||||
};
|
|
||||||
let auth_json = serde_json::to_string(&auth_msg).unwrap();
|
|
||||||
sink.send(TMsg::Text(auth_json.into())).await.unwrap();
|
|
||||||
|
|
||||||
// After auth, we should receive a bulk sync message with at least one op.
|
|
||||||
let bulk_frame = tokio::time::timeout(std::time::Duration::from_secs(5), stream.next())
|
|
||||||
.await
|
|
||||||
.expect("should receive bulk within 5s")
|
|
||||||
.unwrap()
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
let bulk_text = match bulk_frame {
|
|
||||||
TMsg::Text(t) => t.to_string(),
|
|
||||||
other => panic!("Expected bulk text frame, got {other:?}"),
|
|
||||||
};
|
|
||||||
let bulk_msg: super::SyncMessage = serde_json::from_str(&bulk_text).unwrap();
|
|
||||||
match bulk_msg {
|
|
||||||
super::SyncMessage::Bulk { ops } => {
|
|
||||||
assert!(
|
|
||||||
!ops.is_empty(),
|
|
||||||
"Bulk sync must contain at least one op after successful auth"
|
|
||||||
);
|
|
||||||
// Verify we can deserialize the op.
|
|
||||||
let _signed: bft_json_crdt::json_crdt::SignedOp =
|
|
||||||
serde_json::from_str(&ops[0]).unwrap();
|
|
||||||
}
|
|
||||||
_ => panic!("Expected Bulk message after auth"),
|
|
||||||
}
|
|
||||||
|
|
||||||
// Verify listener also reports success.
|
|
||||||
let listener_result = result_rx.await.unwrap();
|
|
||||||
match listener_result {
|
|
||||||
AuthListenerResult::Authenticated(pubkey) => {
|
|
||||||
assert_eq!(pubkey, connector_pubkey);
|
|
||||||
}
|
|
||||||
other => panic!("Expected Authenticated, got {other:?}"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn auth_untrusted_pubkey_rejected() {
|
|
||||||
use bft_json_crdt::keypair::make_keypair;
|
|
||||||
use futures::{SinkExt, StreamExt};
|
|
||||||
use tokio_tungstenite::connect_async;
|
|
||||||
use tokio_tungstenite::tungstenite::Message as TMsg;
|
|
||||||
|
|
||||||
let connector_kp = make_keypair();
|
|
||||||
let connector_pubkey = crate::node_identity::public_key_hex(&connector_kp);
|
|
||||||
|
|
||||||
// Listener trusts a DIFFERENT key, not the connector's.
|
|
||||||
let other_kp = make_keypair();
|
|
||||||
let other_pubkey = crate::node_identity::public_key_hex(&other_kp);
|
|
||||||
|
|
||||||
let (addr, result_rx) = start_auth_listener(vec![other_pubkey]).await;
|
|
||||||
|
|
||||||
let url = format!("ws://{addr}");
|
|
||||||
let (ws, _) = connect_async(&url).await.unwrap();
|
|
||||||
let (mut sink, mut stream) = ws.split();
|
|
||||||
|
|
||||||
// Receive challenge and sign with our (untrusted) key.
|
|
||||||
let challenge_frame = stream.next().await.unwrap().unwrap();
|
|
||||||
let challenge_text = match challenge_frame {
|
|
||||||
TMsg::Text(t) => t.to_string(),
|
|
||||||
_ => panic!("Expected text frame"),
|
|
||||||
};
|
|
||||||
let challenge_msg: super::ChallengeMessage = serde_json::from_str(&challenge_text).unwrap();
|
|
||||||
|
|
||||||
let sig = crate::node_identity::sign_challenge(&connector_kp, &challenge_msg.nonce);
|
|
||||||
let auth_msg = super::AuthMessage {
|
|
||||||
r#type: "auth".to_string(),
|
|
||||||
pubkey_hex: connector_pubkey,
|
|
||||||
signature_hex: sig,
|
|
||||||
};
|
|
||||||
sink.send(TMsg::Text(serde_json::to_string(&auth_msg).unwrap().into()))
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
// Should receive a close frame with auth_failed.
|
|
||||||
let close_frame = tokio::time::timeout(std::time::Duration::from_secs(5), stream.next())
|
|
||||||
.await
|
|
||||||
.expect("should receive close within 5s");
|
|
||||||
|
|
||||||
match close_frame {
|
|
||||||
Some(Ok(TMsg::Close(Some(frame)))) => {
|
|
||||||
assert_eq!(
|
|
||||||
&*frame.reason, "auth_failed",
|
|
||||||
"Close reason must be 'auth_failed'"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
Some(Ok(TMsg::Close(None))) => {
|
|
||||||
// Some implementations omit the close frame payload — that's acceptable
|
|
||||||
// as long as no sync data was sent.
|
|
||||||
}
|
|
||||||
other => {
|
|
||||||
// Connection dropped without close frame is also acceptable.
|
|
||||||
// The key assertion is below: no ops were exchanged.
|
|
||||||
let _ = other;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Verify listener reports auth failure.
|
|
||||||
let listener_result = result_rx.await.unwrap();
|
|
||||||
match listener_result {
|
|
||||||
AuthListenerResult::AuthFailed(reason) => {
|
|
||||||
assert!(reason.contains("key_trusted=false"), "Reason: {reason}");
|
|
||||||
}
|
|
||||||
other => panic!("Expected AuthFailed, got {other:?}"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn auth_bad_signature_rejected() {
|
|
||||||
use bft_json_crdt::keypair::make_keypair;
|
|
||||||
use futures::{SinkExt, StreamExt};
|
|
||||||
use tokio_tungstenite::connect_async;
|
|
||||||
use tokio_tungstenite::tungstenite::Message as TMsg;
|
|
||||||
|
|
||||||
let legitimate_kp = make_keypair();
|
|
||||||
let legitimate_pubkey = crate::node_identity::public_key_hex(&legitimate_kp);
|
|
||||||
|
|
||||||
// A different keypair that will sign the challenge (wrong key).
|
|
||||||
let impersonator_kp = make_keypair();
|
|
||||||
|
|
||||||
// Listener trusts the legitimate pubkey.
|
|
||||||
let (addr, result_rx) = start_auth_listener(vec![legitimate_pubkey.clone()]).await;
|
|
||||||
|
|
||||||
let url = format!("ws://{addr}");
|
|
||||||
let (ws, _) = connect_async(&url).await.unwrap();
|
|
||||||
let (mut sink, mut stream) = ws.split();
|
|
||||||
|
|
||||||
// Receive challenge.
|
|
||||||
let challenge_frame = stream.next().await.unwrap().unwrap();
|
|
||||||
let challenge_text = match challenge_frame {
|
|
||||||
TMsg::Text(t) => t.to_string(),
|
|
||||||
_ => panic!("Expected text frame"),
|
|
||||||
};
|
|
||||||
let challenge_msg: super::ChallengeMessage = serde_json::from_str(&challenge_text).unwrap();
|
|
||||||
|
|
||||||
// Sign with the WRONG keypair but claim to be the legitimate pubkey.
|
|
||||||
let bad_sig = crate::node_identity::sign_challenge(&impersonator_kp, &challenge_msg.nonce);
|
|
||||||
let auth_msg = super::AuthMessage {
|
|
||||||
r#type: "auth".to_string(),
|
|
||||||
pubkey_hex: legitimate_pubkey,
|
|
||||||
signature_hex: bad_sig,
|
|
||||||
};
|
|
||||||
sink.send(TMsg::Text(serde_json::to_string(&auth_msg).unwrap().into()))
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
// Should be rejected.
|
|
||||||
let close_frame = tokio::time::timeout(std::time::Duration::from_secs(5), stream.next())
|
|
||||||
.await
|
|
||||||
.expect("should receive close within 5s");
|
|
||||||
|
|
||||||
match close_frame {
|
|
||||||
Some(Ok(TMsg::Close(Some(frame)))) => {
|
|
||||||
assert_eq!(
|
|
||||||
&*frame.reason, "auth_failed",
|
|
||||||
"Close reason must be 'auth_failed' — same as untrusted key"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
_ => {
|
|
||||||
// Connection closed is acceptable.
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Verify listener reports auth failure with sig_valid=false.
|
|
||||||
let listener_result = result_rx.await.unwrap();
|
|
||||||
match listener_result {
|
|
||||||
AuthListenerResult::AuthFailed(reason) => {
|
|
||||||
assert!(reason.contains("sig_valid=false"), "Reason: {reason}");
|
|
||||||
}
|
|
||||||
other => panic!("Expected AuthFailed, got {other:?}"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn auth_replay_protection_fresh_nonces() {
|
|
||||||
use futures::StreamExt;
|
|
||||||
use tokio::net::TcpListener;
|
|
||||||
use tokio_tungstenite::tungstenite::Message as TMsg;
|
|
||||||
use tokio_tungstenite::{accept_async, connect_async};
|
|
||||||
|
|
||||||
// Start a listener that sends challenges but doesn't complete auth.
|
|
||||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
|
||||||
let addr = listener.local_addr().unwrap();
|
|
||||||
|
|
||||||
let (nonce_tx, mut nonce_rx) = tokio::sync::mpsc::channel::<String>(2);
|
|
||||||
|
|
||||||
tokio::spawn(async move {
|
|
||||||
for _ in 0..2 {
|
|
||||||
let (tcp, _) = listener.accept().await.unwrap();
|
|
||||||
let ws = accept_async(tcp).await.unwrap();
|
|
||||||
let (mut sink, _stream) = ws.split();
|
|
||||||
|
|
||||||
let challenge = crate::node_identity::generate_challenge();
|
|
||||||
let msg = super::ChallengeMessage {
|
|
||||||
r#type: "challenge".to_string(),
|
|
||||||
nonce: challenge.clone(),
|
|
||||||
};
|
|
||||||
let json = serde_json::to_string(&msg).unwrap();
|
|
||||||
let _ = sink.send(TMsg::Text(json.into())).await;
|
|
||||||
let _ = nonce_tx.send(challenge).await;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// Connect twice and collect the nonces.
|
|
||||||
let mut nonces = Vec::new();
|
|
||||||
for _ in 0..2 {
|
|
||||||
let url = format!("ws://{addr}");
|
|
||||||
let (ws, _) = connect_async(&url).await.unwrap();
|
|
||||||
let (_sink, mut stream) = ws.split();
|
|
||||||
|
|
||||||
let frame = stream.next().await.unwrap().unwrap();
|
|
||||||
let text = match frame {
|
|
||||||
TMsg::Text(t) => t.to_string(),
|
|
||||||
_ => panic!("Expected text"),
|
|
||||||
};
|
|
||||||
let msg: super::ChallengeMessage = serde_json::from_str(&text).unwrap();
|
|
||||||
nonces.push(msg.nonce);
|
|
||||||
// Drop connection so listener accepts the next one.
|
|
||||||
drop(stream);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Also collect nonces from the listener side.
|
|
||||||
let server_nonce_1 = nonce_rx.recv().await.unwrap();
|
|
||||||
let server_nonce_2 = nonce_rx.recv().await.unwrap();
|
|
||||||
|
|
||||||
assert_ne!(
|
|
||||||
nonces[0], nonces[1],
|
|
||||||
"Consecutive challenges must be different"
|
|
||||||
);
|
|
||||||
assert_ne!(
|
|
||||||
server_nonce_1, server_nonce_2,
|
|
||||||
"Server must generate fresh nonce per accept"
|
|
||||||
);
|
|
||||||
assert_eq!(nonces[0], server_nonce_1, "Client/server nonces must match");
|
|
||||||
assert_eq!(nonces[1], server_nonce_2, "Client/server nonces must match");
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn keepalive_constants_are_correct() {
|
fn keepalive_constants_are_correct() {
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
super::super::PING_INTERVAL_SECS,
|
super::super::PING_INTERVAL_SECS,
|
||||||
|
|||||||
Reference in New Issue
Block a user