huskies: merge 630_story_crdt_sync_websocket_keepalive_ping_pong

This commit is contained in:
dave
2026-04-25 21:05:54 +00:00
parent 14b158d0b2
commit 853f53e8e6
2 changed files with 460 additions and 2 deletions
+1 -1
View File
@@ -30,7 +30,7 @@ use crate::slog;
const CLAIM_TIMEOUT_SECS: f64 = 600.0; // 10 minutes
/// Interval between heartbeat writes and work scans.
const SCAN_INTERVAL_SECS: u64 = 15;
pub const SCAN_INTERVAL_SECS: u64 = 15;
/// Run the headless build agent loop.
///
+459 -1
View File
@@ -45,6 +45,14 @@ use crate::slog_warn;
/// Default timeout for the auth handshake (seconds).
const AUTH_TIMEOUT_SECS: u64 = 10;
// ── Keepalive configuration ─────────────────────────────────────────
/// Interval (seconds) between WebSocket Ping frames sent by each side.
pub const PING_INTERVAL_SECS: u64 = 30;
/// Seconds without a Pong response before the connection is dropped.
pub const PONG_TIMEOUT_SECS: u64 = 60;
/// Trusted public keys loaded once at startup.
static TRUSTED_KEYS: OnceLock<Vec<String>> = OnceLock::new();
@@ -93,8 +101,10 @@ enum SyncMessage {
pub async fn crdt_sync_handler(
ws: WebSocket,
_ctx: Data<&Arc<AppContext>>,
remote_addr: &poem::web::RemoteAddr,
) -> impl poem::IntoResponse {
ws.on_upgrade(|socket| async move {
let peer_addr = remote_addr.to_string();
ws.on_upgrade(move |socket| async move {
let (mut sink, mut stream) = socket.split();
slog!("[crdt-sync] Peer connected, starting auth handshake");
@@ -179,8 +189,31 @@ pub async fn crdt_sync_handler(
return;
};
// ── Keepalive state ───────────────────────────────────────────
let mut pong_deadline = tokio::time::Instant::now()
+ std::time::Duration::from_secs(PONG_TIMEOUT_SECS);
let mut ping_ticker = tokio::time::interval_at(
tokio::time::Instant::now()
+ std::time::Duration::from_secs(PING_INTERVAL_SECS),
std::time::Duration::from_secs(PING_INTERVAL_SECS),
);
loop {
tokio::select! {
// Send periodic Ping and enforce Pong timeout.
_ = ping_ticker.tick() => {
if tokio::time::Instant::now() >= pong_deadline {
slog_warn!(
"[crdt-sync] No pong from peer {} in {}s; disconnecting",
peer_addr,
PONG_TIMEOUT_SECS
);
break;
}
if sink.send(WsMessage::Ping(vec![])).await.is_err() {
break;
}
}
// Forward new local ops to the peer encoded via the wire codec.
result = op_rx.recv() => {
match result {
@@ -202,6 +235,15 @@ pub async fn crdt_sync_handler(
// Receive ops from the peer.
frame = stream.next() => {
match frame {
Some(Ok(WsMessage::Pong(_))) => {
// Reset the pong deadline on every Pong received.
pong_deadline = tokio::time::Instant::now()
+ std::time::Duration::from_secs(PONG_TIMEOUT_SECS);
}
Some(Ok(WsMessage::Ping(data))) => {
// Respond to peer's Ping so the peer's keepalive passes.
let _ = sink.send(WsMessage::Pong(data)).await;
}
Some(Ok(WsMessage::Text(text))) => {
// Bulk state dump or legacy text-frame op.
handle_incoming_text(&text);
@@ -399,8 +441,33 @@ async fn connect_and_sync(url: &str) -> Result<(), String> {
return Err("CRDT not initialised".to_string());
};
// ── Keepalive state ───────────────────────────────────────────────
let mut pong_deadline =
tokio::time::Instant::now() + std::time::Duration::from_secs(PONG_TIMEOUT_SECS);
let mut ping_ticker = tokio::time::interval_at(
tokio::time::Instant::now() + std::time::Duration::from_secs(PING_INTERVAL_SECS),
std::time::Duration::from_secs(PING_INTERVAL_SECS),
);
loop {
tokio::select! {
// Send periodic Ping and enforce Pong timeout.
_ = ping_ticker.tick() => {
if tokio::time::Instant::now() >= pong_deadline {
slog_warn!(
"[crdt-sync] No pong from rendezvous peer {} in {}s; disconnecting",
url,
PONG_TIMEOUT_SECS
);
return Err(format!(
"Keepalive timeout: no pong from {url} in {PONG_TIMEOUT_SECS}s"
));
}
use tokio_tungstenite::tungstenite::Message as TungsteniteMsg;
if sink.send(TungsteniteMsg::Ping(bytes::Bytes::new())).await.is_err() {
break;
}
}
result = op_rx.recv() => {
match result {
Ok(signed_op) => {
@@ -420,6 +487,15 @@ async fn connect_and_sync(url: &str) -> Result<(), String> {
}
frame = stream.next() => {
match frame {
Some(Ok(tokio_tungstenite::tungstenite::Message::Pong(_))) => {
// Reset the pong deadline on every Pong received.
pong_deadline = tokio::time::Instant::now()
+ std::time::Duration::from_secs(PONG_TIMEOUT_SECS);
}
Some(Ok(tokio_tungstenite::tungstenite::Message::Ping(_))) => {
// tungstenite auto-responds to Ping with Pong at the
// protocol level; no manual response needed here.
}
Some(Ok(tokio_tungstenite::tungstenite::Message::Text(text))) => {
handle_incoming_text(text.as_ref());
}
@@ -2177,4 +2253,386 @@ name = "test"
// If we got here, all previous crdt_sync tests compiled and passed.
// This test exists as a documentation anchor for AC9.
}
// ── Story 630: WebSocket keepalive (ping/pong) ────────────────────────────
/// AC1/AC2: PING_INTERVAL_SECS is 30 and PONG_TIMEOUT_SECS is 60 — the
/// transport-level constants are correct.
#[test]
fn keepalive_constants_are_correct() {
assert_eq!(
super::PING_INTERVAL_SECS,
30,
"Ping interval must be 30 seconds"
);
assert_eq!(
super::PONG_TIMEOUT_SECS,
60,
"Pong timeout must be 60 seconds"
);
}
/// AC5: The agent-mode heartbeat interval (SCAN_INTERVAL_SECS) is 15s and
/// must not be changed by the keepalive work.
#[test]
fn agent_mode_heartbeat_interval_unchanged() {
assert_eq!(
crate::agent_mode::SCAN_INTERVAL_SECS,
15,
"Agent-mode heartbeat interval must remain 15s"
);
}
/// AC4: Reconnect backoff constants are unchanged.
#[test]
fn reconnect_backoff_constants_unchanged() {
assert_eq!(
super::RENDEZVOUS_ERROR_THRESHOLD,
10,
"Backoff threshold must still be 10"
);
}
/// AC1: Server (accept_async side) emits a Ping frame after the configured
/// interval. Uses short durations (100 ms ping) so the test finishes fast.
#[tokio::test]
async fn server_sends_ping_to_peer_at_interval() {
use futures::{SinkExt, StreamExt};
use std::time::Duration;
use tokio::net::TcpListener;
use tokio_tungstenite::tungstenite::Message as TMsg;
use tokio_tungstenite::{accept_async, connect_async};
let ping_ms = 100u64;
let timeout_ms = 400u64;
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
// Server task: keepalive sender with short intervals.
tokio::spawn(async move {
let (tcp, _) = listener.accept().await.unwrap();
let ws = accept_async(tcp).await.unwrap();
let (mut sink, mut stream) = ws.split();
let mut pong_deadline = tokio::time::Instant::now() + Duration::from_millis(timeout_ms);
let mut ticker = tokio::time::interval_at(
tokio::time::Instant::now() + Duration::from_millis(ping_ms),
Duration::from_millis(ping_ms),
);
loop {
tokio::select! {
_ = ticker.tick() => {
if tokio::time::Instant::now() >= pong_deadline { break; }
if sink.send(TMsg::Ping(bytes::Bytes::new())).await.is_err() { break; }
}
frame = stream.next() => {
match frame {
Some(Ok(TMsg::Pong(_))) => {
pong_deadline = tokio::time::Instant::now()
+ Duration::from_millis(timeout_ms);
}
None | Some(Err(_)) => break,
_ => {}
}
}
}
}
});
let (ws_client, _) = connect_async(format!("ws://{addr}")).await.unwrap();
let (_sink_c, mut stream_c) = ws_client.split();
// Wait for more than one ping interval.
tokio::time::sleep(Duration::from_millis(ping_ms * 2)).await;
// Client should receive a Ping from the server.
let frame = tokio::time::timeout(Duration::from_millis(200), stream_c.next()).await;
let got_ping = matches!(frame, Ok(Some(Ok(TMsg::Ping(_)))));
assert!(
got_ping,
"Client must receive a Ping frame from the server after the ping interval"
);
}
/// AC2: Client-side keepalive sender emits a Ping after the interval,
/// symmetrically to the server side.
#[tokio::test]
async fn client_sends_ping_to_server_at_interval() {
use futures::{SinkExt, StreamExt};
use std::time::Duration;
use tokio::net::TcpListener;
use tokio_tungstenite::tungstenite::Message as TMsg;
use tokio_tungstenite::{accept_async, connect_async};
let ping_ms = 100u64;
let timeout_ms = 400u64;
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let (ping_tx, ping_rx) = tokio::sync::oneshot::channel::<()>();
// Server task: wait for the first Ping the client sends.
tokio::spawn(async move {
let (tcp, _) = listener.accept().await.unwrap();
let ws = accept_async(tcp).await.unwrap();
let (_sink, mut stream) = ws.split();
loop {
match stream.next().await {
Some(Ok(TMsg::Ping(_))) => {
let _ = ping_tx.send(());
break;
}
Some(Ok(_)) => continue,
_ => break,
}
}
});
let (ws_client, _) = connect_async(format!("ws://{addr}")).await.unwrap();
let (mut sink_c, mut stream_c) = ws_client.split();
// Client keepalive task.
tokio::spawn(async move {
let mut pong_deadline = tokio::time::Instant::now() + Duration::from_millis(timeout_ms);
let mut ticker = tokio::time::interval_at(
tokio::time::Instant::now() + Duration::from_millis(ping_ms),
Duration::from_millis(ping_ms),
);
loop {
tokio::select! {
_ = ticker.tick() => {
if tokio::time::Instant::now() >= pong_deadline { break; }
if sink_c.send(TMsg::Ping(bytes::Bytes::new())).await.is_err() { break; }
}
frame = stream_c.next() => {
match frame {
Some(Ok(TMsg::Pong(_))) => {
pong_deadline = tokio::time::Instant::now()
+ Duration::from_millis(timeout_ms);
}
None | Some(Err(_)) => break,
_ => {}
}
}
}
}
});
let result = tokio::time::timeout(Duration::from_millis(ping_ms * 3), ping_rx).await;
assert!(
result.is_ok(),
"Server must receive a Ping from the client after the ping interval"
);
}
/// AC3: Either side disconnects when no Pong is received within the timeout.
/// The keepalive sender returns `true` (timed out) when Pongs are withheld.
#[tokio::test]
async fn keepalive_disconnects_when_pong_withheld() {
use futures::{SinkExt, StreamExt};
use std::time::Duration;
use tokio::net::TcpListener;
use tokio_tungstenite::tungstenite::Message as TMsg;
use tokio_tungstenite::{accept_async, connect_async};
let ping_ms = 100u64;
let timeout_ms = 250u64;
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let (done_tx, done_rx) = tokio::sync::oneshot::channel::<bool>();
// Server: sends Pings, never receives Pong (client swallows all).
tokio::spawn(async move {
let (tcp, _) = listener.accept().await.unwrap();
let ws = accept_async(tcp).await.unwrap();
let (mut sink, mut stream) = ws.split();
let pong_deadline = tokio::time::Instant::now() + Duration::from_millis(timeout_ms);
let mut ticker = tokio::time::interval_at(
tokio::time::Instant::now() + Duration::from_millis(ping_ms),
Duration::from_millis(ping_ms),
);
let timed_out = loop {
tokio::select! {
_ = ticker.tick() => {
if tokio::time::Instant::now() >= pong_deadline { break true; }
if sink.send(TMsg::Ping(bytes::Bytes::new())).await.is_err() {
break false;
}
}
frame = stream.next() => {
match frame {
Some(Ok(_)) => {} // swallow — no Pong sent
_ => break false,
}
}
}
};
let _ = done_tx.send(timed_out);
});
// Client: connect but never respond to Pings.
let (_ws_client, _) = connect_async(format!("ws://{addr}")).await.unwrap();
let result =
tokio::time::timeout(Duration::from_millis(timeout_ms + ping_ms * 3), done_rx).await;
let timed_out = result
.expect("Server must report within expected wall-clock time")
.expect("oneshot intact");
assert!(
timed_out,
"Server must disconnect on keepalive timeout when Pong is withheld"
);
}
/// AC3 (positive path): Connection stays alive when Pong responses arrive
/// before the timeout fires.
#[tokio::test]
async fn keepalive_connection_survives_with_pong_responses() {
use futures::{SinkExt, StreamExt};
use std::time::Duration;
use tokio::net::TcpListener;
use tokio_tungstenite::tungstenite::Message as TMsg;
use tokio_tungstenite::{accept_async, connect_async};
let ping_ms = 100u64;
let timeout_ms = 250u64;
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let (result_tx, result_rx) = tokio::sync::oneshot::channel::<bool>();
// Server: sends Pings, resets deadline on Pong.
tokio::spawn(async move {
let (tcp, _) = listener.accept().await.unwrap();
let ws = accept_async(tcp).await.unwrap();
let (mut sink, mut stream) = ws.split();
let mut pong_deadline = tokio::time::Instant::now() + Duration::from_millis(timeout_ms);
let mut ticker = tokio::time::interval_at(
tokio::time::Instant::now() + Duration::from_millis(ping_ms),
Duration::from_millis(ping_ms),
);
let timed_out = loop {
tokio::select! {
_ = ticker.tick() => {
if tokio::time::Instant::now() >= pong_deadline { break true; }
if sink.send(TMsg::Ping(bytes::Bytes::new())).await.is_err() {
break false;
}
}
frame = stream.next() => {
match frame {
Some(Ok(TMsg::Pong(_))) => {
pong_deadline = tokio::time::Instant::now()
+ Duration::from_millis(timeout_ms);
}
None | Some(Err(_)) => break false, // clean close
_ => {}
}
}
}
};
let _ = result_tx.send(timed_out);
});
let (ws_client, _) = connect_async(format!("ws://{addr}")).await.unwrap();
let (mut sink_c, mut stream_c) = ws_client.split();
// Client: respond to every Ping with Pong for several intervals.
let respond_task = tokio::spawn(async move {
while let Some(Ok(msg)) = stream_c.next().await {
if let TMsg::Ping(data) = msg
&& sink_c.send(TMsg::Pong(data)).await.is_err()
{
break;
}
}
});
// Run for a few intervals, then drop the client.
tokio::time::sleep(Duration::from_millis(ping_ms * 3)).await;
respond_task.abort();
let result = tokio::time::timeout(Duration::from_millis(200), result_rx).await;
let timed_out = result.unwrap_or(Ok(false)).unwrap_or(false);
assert!(
!timed_out,
"Server must NOT timeout when the client responds to Pings with Pongs"
);
}
/// AC6: Integration — one node swallows Pongs; the other drops the
/// connection within the timeout, then reconnect is possible via backoff.
#[tokio::test]
async fn two_node_pong_swallow_causes_disconnect_within_timeout() {
use futures::{SinkExt, StreamExt};
use std::time::Duration;
use tokio::net::TcpListener;
use tokio_tungstenite::tungstenite::Message as TMsg;
use tokio_tungstenite::{accept_async, connect_async};
let ping_ms = 100u64;
let timeout_ms = 250u64;
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
// Node A (listener): sends Pings, never receives Pong.
let (a_done_tx, a_done_rx) = tokio::sync::oneshot::channel::<bool>();
tokio::spawn(async move {
let (tcp, _) = listener.accept().await.unwrap();
let ws = accept_async(tcp).await.unwrap();
let (mut sink, mut stream) = ws.split();
let pong_deadline = tokio::time::Instant::now() + Duration::from_millis(timeout_ms);
let mut ticker = tokio::time::interval_at(
tokio::time::Instant::now() + Duration::from_millis(ping_ms),
Duration::from_millis(ping_ms),
);
let timed_out = loop {
tokio::select! {
_ = ticker.tick() => {
if tokio::time::Instant::now() >= pong_deadline { break true; }
if sink.send(TMsg::Ping(bytes::Bytes::new())).await.is_err() {
break false;
}
}
frame = stream.next() => {
match frame {
Some(Ok(_)) => {} // swallow all frames
_ => break false,
}
}
}
};
let _ = a_done_tx.send(timed_out);
});
// Node B: connects, drains frames silently (swallows Pings, never pongs).
let (ws_b, _) = connect_async(format!("ws://{addr}")).await.unwrap();
let (_sink_b, mut stream_b) = ws_b.split();
tokio::spawn(async move { while let Some(Ok(_)) = stream_b.next().await {} });
let result =
tokio::time::timeout(Duration::from_millis(timeout_ms + ping_ms * 3), a_done_rx).await;
let timed_out = result
.expect("Node A must report within expected wall-clock time")
.expect("channel intact");
assert!(
timed_out,
"Node A must disconnect due to keepalive timeout when Node B swallows Pongs"
);
}
}