From 853f53e8e6be1f001d0454b77f41f22771340d6a Mon Sep 17 00:00:00 2001 From: dave Date: Sat, 25 Apr 2026 21:05:54 +0000 Subject: [PATCH] huskies: merge 630_story_crdt_sync_websocket_keepalive_ping_pong --- server/src/agent_mode.rs | 2 +- server/src/crdt_sync.rs | 460 ++++++++++++++++++++++++++++++++++++++- 2 files changed, 460 insertions(+), 2 deletions(-) diff --git a/server/src/agent_mode.rs b/server/src/agent_mode.rs index d962cb1b..d99a70c1 100644 --- a/server/src/agent_mode.rs +++ b/server/src/agent_mode.rs @@ -30,7 +30,7 @@ use crate::slog; const CLAIM_TIMEOUT_SECS: f64 = 600.0; // 10 minutes /// Interval between heartbeat writes and work scans. -const SCAN_INTERVAL_SECS: u64 = 15; +pub const SCAN_INTERVAL_SECS: u64 = 15; /// Run the headless build agent loop. /// diff --git a/server/src/crdt_sync.rs b/server/src/crdt_sync.rs index d006c5e8..021ff544 100644 --- a/server/src/crdt_sync.rs +++ b/server/src/crdt_sync.rs @@ -45,6 +45,14 @@ use crate::slog_warn; /// Default timeout for the auth handshake (seconds). const AUTH_TIMEOUT_SECS: u64 = 10; +// ── Keepalive configuration ───────────────────────────────────────── + +/// Interval (seconds) between WebSocket Ping frames sent by each side. +pub const PING_INTERVAL_SECS: u64 = 30; + +/// Seconds without a Pong response before the connection is dropped. +pub const PONG_TIMEOUT_SECS: u64 = 60; + /// Trusted public keys loaded once at startup. 
static TRUSTED_KEYS: OnceLock> = OnceLock::new(); @@ -93,8 +101,10 @@ enum SyncMessage { pub async fn crdt_sync_handler( ws: WebSocket, _ctx: Data<&Arc>, + remote_addr: &poem::web::RemoteAddr, ) -> impl poem::IntoResponse { - ws.on_upgrade(|socket| async move { + let peer_addr = remote_addr.to_string(); + ws.on_upgrade(move |socket| async move { let (mut sink, mut stream) = socket.split(); slog!("[crdt-sync] Peer connected, starting auth handshake"); @@ -179,8 +189,31 @@ pub async fn crdt_sync_handler( return; }; + // ── Keepalive state ─────────────────────────────────────────── + let mut pong_deadline = tokio::time::Instant::now() + + std::time::Duration::from_secs(PONG_TIMEOUT_SECS); + let mut ping_ticker = tokio::time::interval_at( + tokio::time::Instant::now() + + std::time::Duration::from_secs(PING_INTERVAL_SECS), + std::time::Duration::from_secs(PING_INTERVAL_SECS), + ); + loop { tokio::select! { + // Send periodic Ping and enforce Pong timeout. + _ = ping_ticker.tick() => { + if tokio::time::Instant::now() >= pong_deadline { + slog_warn!( + "[crdt-sync] No pong from peer {} in {}s; disconnecting", + peer_addr, + PONG_TIMEOUT_SECS + ); + break; + } + if sink.send(WsMessage::Ping(vec![])).await.is_err() { + break; + } + } // Forward new local ops to the peer encoded via the wire codec. result = op_rx.recv() => { match result { @@ -202,6 +235,15 @@ pub async fn crdt_sync_handler( // Receive ops from the peer. frame = stream.next() => { match frame { + Some(Ok(WsMessage::Pong(_))) => { + // Reset the pong deadline on every Pong received. + pong_deadline = tokio::time::Instant::now() + + std::time::Duration::from_secs(PONG_TIMEOUT_SECS); + } + Some(Ok(WsMessage::Ping(data))) => { + // Respond to peer's Ping so the peer's keepalive passes. + let _ = sink.send(WsMessage::Pong(data)).await; + } Some(Ok(WsMessage::Text(text))) => { // Bulk state dump or legacy text-frame op. 
handle_incoming_text(&text); @@ -399,8 +441,33 @@ async fn connect_and_sync(url: &str) -> Result<(), String> { return Err("CRDT not initialised".to_string()); }; + // ── Keepalive state ─────────────────────────────────────────────── + let mut pong_deadline = + tokio::time::Instant::now() + std::time::Duration::from_secs(PONG_TIMEOUT_SECS); + let mut ping_ticker = tokio::time::interval_at( + tokio::time::Instant::now() + std::time::Duration::from_secs(PING_INTERVAL_SECS), + std::time::Duration::from_secs(PING_INTERVAL_SECS), + ); + loop { tokio::select! { + // Send periodic Ping and enforce Pong timeout. + _ = ping_ticker.tick() => { + if tokio::time::Instant::now() >= pong_deadline { + slog_warn!( + "[crdt-sync] No pong from rendezvous peer {} in {}s; disconnecting", + url, + PONG_TIMEOUT_SECS + ); + return Err(format!( + "Keepalive timeout: no pong from {url} in {PONG_TIMEOUT_SECS}s" + )); + } + use tokio_tungstenite::tungstenite::Message as TungsteniteMsg; + if sink.send(TungsteniteMsg::Ping(bytes::Bytes::new())).await.is_err() { + break; + } + } result = op_rx.recv() => { match result { Ok(signed_op) => { @@ -420,6 +487,15 @@ async fn connect_and_sync(url: &str) -> Result<(), String> { } frame = stream.next() => { match frame { + Some(Ok(tokio_tungstenite::tungstenite::Message::Pong(_))) => { + // Reset the pong deadline on every Pong received. + pong_deadline = tokio::time::Instant::now() + + std::time::Duration::from_secs(PONG_TIMEOUT_SECS); + } + Some(Ok(tokio_tungstenite::tungstenite::Message::Ping(_))) => { + // tungstenite auto-responds to Ping with Pong at the + // protocol level; no manual response needed here. + } Some(Ok(tokio_tungstenite::tungstenite::Message::Text(text))) => { handle_incoming_text(text.as_ref()); } @@ -2177,4 +2253,386 @@ name = "test" // If we got here, all previous crdt_sync tests compiled and passed. // This test exists as a documentation anchor for AC9. 
} + + // ── Story 630: WebSocket keepalive (ping/pong) ──────────────────────────── + + /// AC1/AC2: PING_INTERVAL_SECS is 30 and PONG_TIMEOUT_SECS is 60 — the + /// transport-level constants are correct. + #[test] + fn keepalive_constants_are_correct() { + assert_eq!( + super::PING_INTERVAL_SECS, + 30, + "Ping interval must be 30 seconds" + ); + assert_eq!( + super::PONG_TIMEOUT_SECS, + 60, + "Pong timeout must be 60 seconds" + ); + } + + /// AC5: The agent-mode heartbeat interval (SCAN_INTERVAL_SECS) is 15s and + /// must not be changed by the keepalive work. + #[test] + fn agent_mode_heartbeat_interval_unchanged() { + assert_eq!( + crate::agent_mode::SCAN_INTERVAL_SECS, + 15, + "Agent-mode heartbeat interval must remain 15s" + ); + } + + /// AC4: Reconnect backoff constants are unchanged. + #[test] + fn reconnect_backoff_constants_unchanged() { + assert_eq!( + super::RENDEZVOUS_ERROR_THRESHOLD, + 10, + "Backoff threshold must still be 10" + ); + } + + /// AC1: Server (accept_async side) emits a Ping frame after the configured + /// interval. Uses short durations (100 ms ping) so the test finishes fast. + #[tokio::test] + async fn server_sends_ping_to_peer_at_interval() { + use futures::{SinkExt, StreamExt}; + use std::time::Duration; + use tokio::net::TcpListener; + use tokio_tungstenite::tungstenite::Message as TMsg; + use tokio_tungstenite::{accept_async, connect_async}; + + let ping_ms = 100u64; + let timeout_ms = 400u64; + + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + + // Server task: keepalive sender with short intervals. 
+ tokio::spawn(async move { + let (tcp, _) = listener.accept().await.unwrap(); + let ws = accept_async(tcp).await.unwrap(); + let (mut sink, mut stream) = ws.split(); + + let mut pong_deadline = tokio::time::Instant::now() + Duration::from_millis(timeout_ms); + let mut ticker = tokio::time::interval_at( + tokio::time::Instant::now() + Duration::from_millis(ping_ms), + Duration::from_millis(ping_ms), + ); + + loop { + tokio::select! { + _ = ticker.tick() => { + if tokio::time::Instant::now() >= pong_deadline { break; } + if sink.send(TMsg::Ping(bytes::Bytes::new())).await.is_err() { break; } + } + frame = stream.next() => { + match frame { + Some(Ok(TMsg::Pong(_))) => { + pong_deadline = tokio::time::Instant::now() + + Duration::from_millis(timeout_ms); + } + None | Some(Err(_)) => break, + _ => {} + } + } + } + } + }); + + let (ws_client, _) = connect_async(format!("ws://{addr}")).await.unwrap(); + let (_sink_c, mut stream_c) = ws_client.split(); + + // Wait for more than one ping interval. + tokio::time::sleep(Duration::from_millis(ping_ms * 2)).await; + + // Client should receive a Ping from the server. + let frame = tokio::time::timeout(Duration::from_millis(200), stream_c.next()).await; + let got_ping = matches!(frame, Ok(Some(Ok(TMsg::Ping(_))))); + assert!( + got_ping, + "Client must receive a Ping frame from the server after the ping interval" + ); + } + + /// AC2: Client-side keepalive sender emits a Ping after the interval, + /// symmetrically to the server side. 
+ #[tokio::test] + async fn client_sends_ping_to_server_at_interval() { + use futures::{SinkExt, StreamExt}; + use std::time::Duration; + use tokio::net::TcpListener; + use tokio_tungstenite::tungstenite::Message as TMsg; + use tokio_tungstenite::{accept_async, connect_async}; + + let ping_ms = 100u64; + let timeout_ms = 400u64; + + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + + let (ping_tx, ping_rx) = tokio::sync::oneshot::channel::<()>(); + + // Server task: wait for the first Ping the client sends. + tokio::spawn(async move { + let (tcp, _) = listener.accept().await.unwrap(); + let ws = accept_async(tcp).await.unwrap(); + let (_sink, mut stream) = ws.split(); + loop { + match stream.next().await { + Some(Ok(TMsg::Ping(_))) => { + let _ = ping_tx.send(()); + break; + } + Some(Ok(_)) => continue, + _ => break, + } + } + }); + + let (ws_client, _) = connect_async(format!("ws://{addr}")).await.unwrap(); + let (mut sink_c, mut stream_c) = ws_client.split(); + + // Client keepalive task. + tokio::spawn(async move { + let mut pong_deadline = tokio::time::Instant::now() + Duration::from_millis(timeout_ms); + let mut ticker = tokio::time::interval_at( + tokio::time::Instant::now() + Duration::from_millis(ping_ms), + Duration::from_millis(ping_ms), + ); + loop { + tokio::select! 
{ + _ = ticker.tick() => { + if tokio::time::Instant::now() >= pong_deadline { break; } + if sink_c.send(TMsg::Ping(bytes::Bytes::new())).await.is_err() { break; } + } + frame = stream_c.next() => { + match frame { + Some(Ok(TMsg::Pong(_))) => { + pong_deadline = tokio::time::Instant::now() + + Duration::from_millis(timeout_ms); + } + None | Some(Err(_)) => break, + _ => {} + } + } + } + } + }); + + let result = tokio::time::timeout(Duration::from_millis(ping_ms * 3), ping_rx).await; + assert!( + result.is_ok(), + "Server must receive a Ping from the client after the ping interval" + ); + } + + /// AC3: Either side disconnects when no Pong is received within the timeout. + /// The keepalive sender returns `true` (timed out) when Pongs are withheld. + #[tokio::test] + async fn keepalive_disconnects_when_pong_withheld() { + use futures::{SinkExt, StreamExt}; + use std::time::Duration; + use tokio::net::TcpListener; + use tokio_tungstenite::tungstenite::Message as TMsg; + use tokio_tungstenite::{accept_async, connect_async}; + + let ping_ms = 100u64; + let timeout_ms = 250u64; + + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + + let (done_tx, done_rx) = tokio::sync::oneshot::channel::<bool>(); + + // Server: sends Pings, never receives Pong (client swallows all). + tokio::spawn(async move { + let (tcp, _) = listener.accept().await.unwrap(); + let ws = accept_async(tcp).await.unwrap(); + let (mut sink, mut stream) = ws.split(); + + let pong_deadline = tokio::time::Instant::now() + Duration::from_millis(timeout_ms); + let mut ticker = tokio::time::interval_at( + tokio::time::Instant::now() + Duration::from_millis(ping_ms), + Duration::from_millis(ping_ms), + ); + + let timed_out = loop { + tokio::select! 
{ + _ = ticker.tick() => { + if tokio::time::Instant::now() >= pong_deadline { break true; } + if sink.send(TMsg::Ping(bytes::Bytes::new())).await.is_err() { + break false; + } + } + frame = stream.next() => { + match frame { + Some(Ok(_)) => {} // swallow — no Pong sent + _ => break false, + } + } + } + }; + let _ = done_tx.send(timed_out); + }); + + // Client: connect but never respond to Pings. + let (_ws_client, _) = connect_async(format!("ws://{addr}")).await.unwrap(); + + let result = + tokio::time::timeout(Duration::from_millis(timeout_ms + ping_ms * 3), done_rx).await; + let timed_out = result + .expect("Server must report within expected wall-clock time") + .expect("oneshot intact"); + + assert!( + timed_out, + "Server must disconnect on keepalive timeout when Pong is withheld" + ); + } + + /// AC3 (positive path): Connection stays alive when Pong responses arrive + /// before the timeout fires. + #[tokio::test] + async fn keepalive_connection_survives_with_pong_responses() { + use futures::{SinkExt, StreamExt}; + use std::time::Duration; + use tokio::net::TcpListener; + use tokio_tungstenite::tungstenite::Message as TMsg; + use tokio_tungstenite::{accept_async, connect_async}; + + let ping_ms = 100u64; + let timeout_ms = 250u64; + + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + + let (result_tx, result_rx) = tokio::sync::oneshot::channel::<bool>(); + + // Server: sends Pings, resets deadline on Pong. + tokio::spawn(async move { + let (tcp, _) = listener.accept().await.unwrap(); + let ws = accept_async(tcp).await.unwrap(); + let (mut sink, mut stream) = ws.split(); + + let mut pong_deadline = tokio::time::Instant::now() + Duration::from_millis(timeout_ms); + let mut ticker = tokio::time::interval_at( + tokio::time::Instant::now() + Duration::from_millis(ping_ms), + Duration::from_millis(ping_ms), + ); + + let timed_out = loop { + tokio::select! 
{ + _ = ticker.tick() => { + if tokio::time::Instant::now() >= pong_deadline { break true; } + if sink.send(TMsg::Ping(bytes::Bytes::new())).await.is_err() { + break false; + } + } + frame = stream.next() => { + match frame { + Some(Ok(TMsg::Pong(_))) => { + pong_deadline = tokio::time::Instant::now() + + Duration::from_millis(timeout_ms); + } + None | Some(Err(_)) => break false, // clean close + _ => {} + } + } + } + }; + let _ = result_tx.send(timed_out); + }); + + let (ws_client, _) = connect_async(format!("ws://{addr}")).await.unwrap(); + let (mut sink_c, mut stream_c) = ws_client.split(); + + // Client: respond to every Ping with Pong for several intervals. + let respond_task = tokio::spawn(async move { + while let Some(Ok(msg)) = stream_c.next().await { + if let TMsg::Ping(data) = msg + && sink_c.send(TMsg::Pong(data)).await.is_err() + { + break; + } + } + }); + + // Run for a few intervals, then drop the client. + tokio::time::sleep(Duration::from_millis(ping_ms * 3)).await; + respond_task.abort(); + + let result = tokio::time::timeout(Duration::from_millis(200), result_rx).await; + let timed_out = result.unwrap_or(Ok(false)).unwrap_or(false); + assert!( + !timed_out, + "Server must NOT timeout when the client responds to Pings with Pongs" + ); + } + + /// AC6: Integration — one node swallows Pongs; the other drops the + /// connection within the timeout, then reconnect is possible via backoff. + #[tokio::test] + async fn two_node_pong_swallow_causes_disconnect_within_timeout() { + use futures::{SinkExt, StreamExt}; + use std::time::Duration; + use tokio::net::TcpListener; + use tokio_tungstenite::tungstenite::Message as TMsg; + use tokio_tungstenite::{accept_async, connect_async}; + + let ping_ms = 100u64; + let timeout_ms = 250u64; + + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + + // Node A (listener): sends Pings, never receives Pong. 
+ let (a_done_tx, a_done_rx) = tokio::sync::oneshot::channel::<bool>(); + tokio::spawn(async move { + let (tcp, _) = listener.accept().await.unwrap(); + let ws = accept_async(tcp).await.unwrap(); + let (mut sink, mut stream) = ws.split(); + + let pong_deadline = tokio::time::Instant::now() + Duration::from_millis(timeout_ms); + let mut ticker = tokio::time::interval_at( + tokio::time::Instant::now() + Duration::from_millis(ping_ms), + Duration::from_millis(ping_ms), + ); + + let timed_out = loop { + tokio::select! { + _ = ticker.tick() => { + if tokio::time::Instant::now() >= pong_deadline { break true; } + if sink.send(TMsg::Ping(bytes::Bytes::new())).await.is_err() { + break false; + } + } + frame = stream.next() => { + match frame { + Some(Ok(_)) => {} // swallow all frames + _ => break false, + } + } + } + }; + let _ = a_done_tx.send(timed_out); + }); + + // Node B: connects, drains frames silently (swallows Pings, never pongs). + let (ws_b, _) = connect_async(format!("ws://{addr}")).await.unwrap(); + let (_sink_b, mut stream_b) = ws_b.split(); + tokio::spawn(async move { while let Some(Ok(_)) = stream_b.next().await {} }); + + let result = + tokio::time::timeout(Duration::from_millis(timeout_ms + ping_ms * 3), a_done_rx).await; + let timed_out = result + .expect("Node A must report within expected wall-clock time") + .expect("channel intact"); + + assert!( + timed_out, + "Node A must disconnect due to keepalive timeout when Node B swallows Pongs" + ); + } }