From 1ca9bc1bfdec62d17df7ec7da0d81e62a1db3994 Mon Sep 17 00:00:00 2001 From: dave Date: Fri, 10 Apr 2026 15:49:04 +0000 Subject: [PATCH] huskies: merge 506_story_websocket_sync_endpoint_that_broadcasts_local_signedops_to_connected_peers --- server/src/crdt_sync.rs | 308 +++++++++++++++++++++++++++++++++++----- 1 file changed, 275 insertions(+), 33 deletions(-) diff --git a/server/src/crdt_sync.rs b/server/src/crdt_sync.rs index 32a614ce..54df4bfa 100644 --- a/server/src/crdt_sync.rs +++ b/server/src/crdt_sync.rs @@ -3,16 +3,26 @@ /// /// # Protocol /// -/// The sync protocol uses newline-delimited JSON over WebSocket text frames. -/// Each message is a JSON object with a `"type"` field: +/// The sync protocol is a hybrid of two frame types: /// +/// ## Text frames (bulk initial state) +/// A JSON object with a `"type"` field: /// - `{"type":"bulk","ops":[...]}` — Initial state dump (array of serialised /// `SignedOp` JSON strings). Sent by both sides immediately after connect. -/// - `{"type":"op","op":"..."}` — A single new `SignedOp` (serialised JSON -/// string). Streamed in real-time as local ops are created. +/// +/// ## Binary frames (real-time op broadcast) +/// Individual `SignedOp`s encoded via [`crate::crdt_wire`] (versioned JSON +/// envelope: `{"v":1,"op":{...}}`). Each locally-applied op is immediately +/// broadcast as a binary frame to all connected peers. /// /// Both the server endpoint and the rendezvous client use the same protocol, /// making the connection fully symmetric. +/// +/// ## Backpressure +/// Each connected peer has its own [`tokio::sync::broadcast`] receiver. If a +/// slow peer allows the channel to fill (indicated by a `Lagged` error), the +/// connection is dropped with a warning log. The peer can reconnect and +/// receive a fresh bulk state dump to catch up. use bft_json_crdt::json_crdt::SignedOp; use futures::{SinkExt, StreamExt}; use poem::handler; @@ -22,6 +32,7 @@ use serde::{Deserialize, Serialize}; use std::sync::Arc; use crate::crdt_state; +use crate::crdt_wire; use crate::http::context::AppContext; use crate::slog; @@ -65,21 +76,20 @@ pub async fn crdt_sync_handler( loop { tokio::select! { - // Forward new local ops to the peer. + // Forward new local ops to the peer encoded via the wire codec. result = op_rx.recv() => { match result { Ok(signed_op) => { - if let Ok(op_json) = serde_json::to_string(&signed_op) { - let msg = SyncMessage::Op { op: op_json }; - if let Ok(text) = serde_json::to_string(&msg) - && sink.send(WsMessage::Text(text)).await.is_err() - { - break; - } + let bytes = crdt_wire::encode(&signed_op); + if sink.send(WsMessage::Binary(bytes)).await.is_err() { + break; } } Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { - slog!("[crdt-sync] Lagged {n} ops, peer may need re-sync"); + // The peer cannot keep up; disconnect so it can + // reconnect and receive a fresh bulk state dump. + slog!("[crdt-sync] Slow peer lagged {n} ops; disconnecting"); + break; } Err(_) => break, } @@ -88,7 +98,12 @@ pub async fn crdt_sync_handler( frame = stream.next() => { match frame { Some(Ok(WsMessage::Text(text))) => { - handle_incoming_message(&text); + // Bulk state dump or legacy text-frame op. + handle_incoming_text(&text); + } + Some(Ok(WsMessage::Binary(bytes))) => { + // Real-time op encoded via wire codec. + handle_incoming_binary(&bytes); } Some(Ok(WsMessage::Close(_))) | None => break, _ => {} @@ -101,12 +116,15 @@ pub async fn crdt_sync_handler( }) } -/// Process an incoming sync message from a peer. -fn handle_incoming_message(text: &str) { +/// Process an incoming text-frame sync message from a peer. +/// +/// Text frames carry the bulk state dump (`SyncMessage::Bulk`) or legacy +/// single-op messages (`SyncMessage::Op`). +fn handle_incoming_text(text: &str) { let msg: SyncMessage = match serde_json::from_str(text) { Ok(m) => m, Err(e) => { - slog!("[crdt-sync] Bad message from peer: {e}"); + slog!("[crdt-sync] Bad text message from peer: {e}"); return; } }; @@ -134,6 +152,20 @@ fn handle_incoming_message(text: &str) { } } +/// Process an incoming binary-frame op from a peer. +/// +/// Binary frames carry a single `SignedOp` encoded via [`crdt_wire`]. +fn handle_incoming_binary(bytes: &[u8]) { + match crdt_wire::decode(bytes) { + Ok(signed_op) => { + crdt_state::apply_remote_op(signed_op); + } + Err(e) => { + slog!("[crdt-sync] Bad binary frame from peer: {e}"); + } + } +} + // ── Rendezvous client ─────────────────────────────────────────────── /// Spawn a background task that connects to the configured rendezvous @@ -192,18 +224,16 @@ async fn connect_and_sync(url: &str) -> Result<(), String> { result = op_rx.recv() => { match result { Ok(signed_op) => { - if let Ok(op_json) = serde_json::to_string(&signed_op) { - let msg = SyncMessage::Op { op: op_json }; - if let Ok(text) = serde_json::to_string(&msg) { - use tokio_tungstenite::tungstenite::Message as TungsteniteMsg; - if sink.send(TungsteniteMsg::Text(text.into())).await.is_err() { - break; - } - } + // Encode via wire codec and send as binary frame. + let bytes = crdt_wire::encode(&signed_op); + use tokio_tungstenite::tungstenite::Message as TungsteniteMsg; + if sink.send(TungsteniteMsg::Binary(bytes.into())).await.is_err() { + break; } } Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { - slog!("[crdt-sync] Lagged {n} ops on rendezvous link"); + slog!("[crdt-sync] Slow rendezvous link lagged {n} ops; disconnecting"); + break; } Err(_) => break, } @@ -211,7 +241,10 @@ async fn connect_and_sync(url: &str) -> Result<(), String> { frame = stream.next() => { match frame { Some(Ok(tokio_tungstenite::tungstenite::Message::Text(text))) => { - handle_incoming_message(text.as_ref()); + handle_incoming_text(text.as_ref()); + } + Some(Ok(tokio_tungstenite::tungstenite::Message::Binary(bytes))) => { + handle_incoming_binary(&bytes); } Some(Ok(tokio_tungstenite::tungstenite::Message::Close(_))) | None => break, Some(Err(e)) => { @@ -268,26 +301,36 @@ mod tests { } #[test] - fn handle_incoming_message_bad_json_does_not_panic() { - handle_incoming_message("not valid json"); + fn handle_incoming_text_bad_json_does_not_panic() { + handle_incoming_text("not valid json"); } #[test] - fn handle_incoming_message_bulk_with_invalid_ops_does_not_panic() { + fn handle_incoming_text_bulk_with_invalid_ops_does_not_panic() { let msg = SyncMessage::Bulk { ops: vec!["not a valid signed op".to_string()], }; let json = serde_json::to_string(&msg).unwrap(); - handle_incoming_message(&json); + handle_incoming_text(&json); } #[test] - fn handle_incoming_message_op_with_invalid_op_does_not_panic() { + fn handle_incoming_text_op_with_invalid_op_does_not_panic() { let msg = SyncMessage::Op { op: "garbage".to_string(), }; let json = serde_json::to_string(&msg).unwrap(); - handle_incoming_message(&json); + handle_incoming_text(&json); + } + + #[test] + fn handle_incoming_binary_bad_bytes_does_not_panic() { + handle_incoming_binary(b"not valid wire codec"); + } + + #[test] + fn handle_incoming_binary_empty_bytes_does_not_panic() { + handle_incoming_binary(b""); } #[test] @@ -515,4 +558,203 @@ name = "test" let config = crate::config::ProjectConfig::default(); assert!(config.rendezvous.is_none()); } + + // ── AC8: peer lifecycle tests ───────────────────────────────────────────── + + /// AC8: A peer that connects and then receives a subsequently-applied op + /// gets that op encoded via the wire codec (binary frame). + #[test] + fn peer_receives_op_encoded_via_wire_codec() { + use bft_json_crdt::json_crdt::BaseCrdt; + use bft_json_crdt::keypair::make_keypair; + use bft_json_crdt::op::ROOT_ID; + use serde_json::json; + + use crate::crdt_state::PipelineDoc; + use crate::crdt_wire; + + let kp = make_keypair(); + let mut crdt = BaseCrdt::::new(&kp); + let item: bft_json_crdt::json_crdt::JsonValue = json!({ + "story_id": "506_story_lifecycle_test", + "stage": "1_backlog", + "name": "Lifecycle Test", + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + }) + .into(); + let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp); + + // Simulate what the broadcast handler does: encode via wire codec. + let bytes = crdt_wire::encode(&op); + + // The bytes must be a versioned JSON envelope, not a SyncMessage wrapper. + let text = std::str::from_utf8(&bytes).expect("wire output is valid UTF-8"); + assert!( + text.contains("\"v\":1"), + "wire codec version tag must be present: {text}" + ); + assert!( + !text.contains("\"type\":\"op\""), + "must not be wrapped in SyncMessage: {text}" + ); + + // The receiving peer can decode and apply the op. + let decoded = crdt_wire::decode(&bytes).expect("decode must succeed"); + assert_eq!(op, decoded); + } + + /// AC8: Multiple connected peers all receive the same broadcast op. + #[tokio::test] + async fn multiple_peers_all_receive_broadcast_op() { + use bft_json_crdt::json_crdt::BaseCrdt; + use bft_json_crdt::keypair::make_keypair; + use bft_json_crdt::op::ROOT_ID; + use serde_json::json; + use tokio::sync::broadcast; + + use crate::crdt_state::PipelineDoc; + use crate::crdt_wire; + + // Create a broadcast channel (analogous to SYNC_TX). + let (tx, _) = broadcast::channel::(16); + let mut rx_peer1 = tx.subscribe(); + let mut rx_peer2 = tx.subscribe(); + + let kp = make_keypair(); + let mut crdt = BaseCrdt::::new(&kp); + let item: bft_json_crdt::json_crdt::JsonValue = json!({ + "story_id": "506_story_multi_peer_test", + "stage": "1_backlog", + "name": "Multi-Peer Test", + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + }) + .into(); + let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp); + + // Broadcast one op. + tx.send(op.clone()).expect("send must succeed"); + + // Both peers receive the same op. + let received1 = rx_peer1.recv().await.expect("peer 1 must receive"); + let received2 = rx_peer2.recv().await.expect("peer 2 must receive"); + assert_eq!(received1, op); + assert_eq!(received2, op); + + // Both encode identically via wire codec. + let bytes1 = crdt_wire::encode(&received1); + let bytes2 = crdt_wire::encode(&received2); + assert_eq!(bytes1, bytes2, "wire-encoded bytes must be identical"); + } + + /// AC8: A peer disconnecting mid-broadcast does not panic. + /// Simulated by dropping the receiver before the sender sends an op. + #[test] + fn disconnected_peer_does_not_panic() { + use bft_json_crdt::json_crdt::BaseCrdt; + use bft_json_crdt::keypair::make_keypair; + use bft_json_crdt::op::ROOT_ID; + use serde_json::json; + use tokio::sync::broadcast; + + use crate::crdt_state::PipelineDoc; + + let (tx, rx) = broadcast::channel::(16); + // Drop the receiver to simulate a peer that disconnected. + drop(rx); + + let kp = make_keypair(); + let mut crdt = BaseCrdt::::new(&kp); + let item: bft_json_crdt::json_crdt::JsonValue = json!({ + "story_id": "506_story_disconnect_test", + "stage": "1_backlog", + "name": "Disconnect Test", + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + }) + .into(); + let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp); + + // Sending to a channel with no receivers returns an error; must not panic. + let _ = tx.send(op); + } + + /// AC8: A lagged receiver gets a `Lagged` error (confirming the + /// disconnect-on-overflow behaviour is reachable). + #[tokio::test] + async fn lagged_peer_gets_lagged_error() { + use bft_json_crdt::json_crdt::BaseCrdt; + use bft_json_crdt::keypair::make_keypair; + use bft_json_crdt::op::ROOT_ID; + use serde_json::json; + use tokio::sync::broadcast; + + use crate::crdt_state::PipelineDoc; + + // Tiny capacity so we can trigger Lagged easily. + let (tx, mut rx) = broadcast::channel::(2); + + let kp = make_keypair(); + let mut crdt = BaseCrdt::::new(&kp); + let item: bft_json_crdt::json_crdt::JsonValue = json!({ + "story_id": "506_story_lag_test", + "stage": "1_backlog", + "name": "Lag Test", + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + }) + .into(); + let op1 = crdt.doc.items.insert(ROOT_ID, item).sign(&kp); + crdt.apply(op1.clone()); + + // Overflow the tiny buffer by sending more ops than the capacity. + let op2 = crdt.doc.items[0] + .stage + .set("2_current".to_string()) + .sign(&kp); + crdt.apply(op2.clone()); + let op3 = crdt.doc.items[0] + .stage + .set("3_qa".to_string()) + .sign(&kp); + crdt.apply(op3.clone()); + let op4 = crdt.doc.items[0] + .stage + .set("4_merge".to_string()) + .sign(&kp); + crdt.apply(op4.clone()); + + // Send more ops than the channel capacity without consuming. + let _ = tx.send(op1); + let _ = tx.send(op2); + let _ = tx.send(op3); + let _ = tx.send(op4); + + // The slow peer should now see a Lagged error on next recv. + // Consume until we hit Lagged or run out. + let mut got_lagged = false; + for _ in 0..10 { + match rx.recv().await { + Err(broadcast::error::RecvError::Lagged(_)) => { + got_lagged = true; + break; + } + Ok(_) => continue, + Err(broadcast::error::RecvError::Closed) => break, + } + } + assert!( + got_lagged, + "slow peer must receive a Lagged error when channel overflows" + ); + } }