huskies: merge 506_story_websocket_sync_endpoint_that_broadcasts_local_signedops_to_connected_peers
This commit is contained in:
+272
-30
@@ -3,16 +3,26 @@
|
||||
///
|
||||
/// # Protocol
|
||||
///
|
||||
/// The sync protocol uses newline-delimited JSON over WebSocket text frames.
|
||||
/// Each message is a JSON object with a `"type"` field:
|
||||
/// The sync protocol is a hybrid of two frame types:
|
||||
///
|
||||
/// ## Text frames (bulk initial state)
|
||||
/// A JSON object with a `"type"` field:
|
||||
/// - `{"type":"bulk","ops":[...]}` — Initial state dump (array of serialised
|
||||
/// `SignedOp` JSON strings). Sent by both sides immediately after connect.
|
||||
/// - `{"type":"op","op":"..."}` — A single new `SignedOp` (serialised JSON
|
||||
/// string). Streamed in real-time as local ops are created.
|
||||
///
|
||||
/// ## Binary frames (real-time op broadcast)
|
||||
/// Individual `SignedOp`s encoded via [`crate::crdt_wire`] (versioned JSON
|
||||
/// envelope: `{"v":1,"op":{...}}`). Each locally-applied op is immediately
|
||||
/// broadcast as a binary frame to all connected peers.
|
||||
///
|
||||
/// Both the server endpoint and the rendezvous client use the same protocol,
|
||||
/// making the connection fully symmetric.
|
||||
///
|
||||
/// ## Backpressure
|
||||
/// Each connected peer has its own [`tokio::sync::broadcast`] receiver. If a
|
||||
/// slow peer allows the channel to fill (indicated by a `Lagged` error), the
|
||||
/// connection is dropped with a warning log. The peer can reconnect and
|
||||
/// receive a fresh bulk state dump to catch up.
|
||||
use bft_json_crdt::json_crdt::SignedOp;
|
||||
use futures::{SinkExt, StreamExt};
|
||||
use poem::handler;
|
||||
@@ -22,6 +32,7 @@ use serde::{Deserialize, Serialize};
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::crdt_state;
|
||||
use crate::crdt_wire;
|
||||
use crate::http::context::AppContext;
|
||||
use crate::slog;
|
||||
|
||||
@@ -65,21 +76,20 @@ pub async fn crdt_sync_handler(
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
// Forward new local ops to the peer.
|
||||
// Forward new local ops to the peer encoded via the wire codec.
|
||||
result = op_rx.recv() => {
|
||||
match result {
|
||||
Ok(signed_op) => {
|
||||
if let Ok(op_json) = serde_json::to_string(&signed_op) {
|
||||
let msg = SyncMessage::Op { op: op_json };
|
||||
if let Ok(text) = serde_json::to_string(&msg)
|
||||
&& sink.send(WsMessage::Text(text)).await.is_err()
|
||||
{
|
||||
let bytes = crdt_wire::encode(&signed_op);
|
||||
if sink.send(WsMessage::Binary(bytes)).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
|
||||
slog!("[crdt-sync] Lagged {n} ops, peer may need re-sync");
|
||||
// The peer cannot keep up; disconnect so it can
|
||||
// reconnect and receive a fresh bulk state dump.
|
||||
slog!("[crdt-sync] Slow peer lagged {n} ops; disconnecting");
|
||||
break;
|
||||
}
|
||||
Err(_) => break,
|
||||
}
|
||||
@@ -88,7 +98,12 @@ pub async fn crdt_sync_handler(
|
||||
frame = stream.next() => {
|
||||
match frame {
|
||||
Some(Ok(WsMessage::Text(text))) => {
|
||||
handle_incoming_message(&text);
|
||||
// Bulk state dump or legacy text-frame op.
|
||||
handle_incoming_text(&text);
|
||||
}
|
||||
Some(Ok(WsMessage::Binary(bytes))) => {
|
||||
// Real-time op encoded via wire codec.
|
||||
handle_incoming_binary(&bytes);
|
||||
}
|
||||
Some(Ok(WsMessage::Close(_))) | None => break,
|
||||
_ => {}
|
||||
@@ -101,12 +116,15 @@ pub async fn crdt_sync_handler(
|
||||
})
|
||||
}
|
||||
|
||||
/// Process an incoming sync message from a peer.
|
||||
fn handle_incoming_message(text: &str) {
|
||||
/// Process an incoming text-frame sync message from a peer.
|
||||
///
|
||||
/// Text frames carry the bulk state dump (`SyncMessage::Bulk`) or legacy
|
||||
/// single-op messages (`SyncMessage::Op`).
|
||||
fn handle_incoming_text(text: &str) {
|
||||
let msg: SyncMessage = match serde_json::from_str(text) {
|
||||
Ok(m) => m,
|
||||
Err(e) => {
|
||||
slog!("[crdt-sync] Bad message from peer: {e}");
|
||||
slog!("[crdt-sync] Bad text message from peer: {e}");
|
||||
return;
|
||||
}
|
||||
};
|
||||
@@ -134,6 +152,20 @@ fn handle_incoming_message(text: &str) {
|
||||
}
|
||||
}
|
||||
|
||||
/// Process an incoming binary-frame op from a peer.
|
||||
///
|
||||
/// Binary frames carry a single `SignedOp` encoded via [`crdt_wire`].
|
||||
fn handle_incoming_binary(bytes: &[u8]) {
|
||||
match crdt_wire::decode(bytes) {
|
||||
Ok(signed_op) => {
|
||||
crdt_state::apply_remote_op(signed_op);
|
||||
}
|
||||
Err(e) => {
|
||||
slog!("[crdt-sync] Bad binary frame from peer: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ── Rendezvous client ───────────────────────────────────────────────
|
||||
|
||||
/// Spawn a background task that connects to the configured rendezvous
|
||||
@@ -192,18 +224,16 @@ async fn connect_and_sync(url: &str) -> Result<(), String> {
|
||||
result = op_rx.recv() => {
|
||||
match result {
|
||||
Ok(signed_op) => {
|
||||
if let Ok(op_json) = serde_json::to_string(&signed_op) {
|
||||
let msg = SyncMessage::Op { op: op_json };
|
||||
if let Ok(text) = serde_json::to_string(&msg) {
|
||||
// Encode via wire codec and send as binary frame.
|
||||
let bytes = crdt_wire::encode(&signed_op);
|
||||
use tokio_tungstenite::tungstenite::Message as TungsteniteMsg;
|
||||
if sink.send(TungsteniteMsg::Text(text.into())).await.is_err() {
|
||||
if sink.send(TungsteniteMsg::Binary(bytes.into())).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
|
||||
slog!("[crdt-sync] Lagged {n} ops on rendezvous link");
|
||||
slog!("[crdt-sync] Slow rendezvous link lagged {n} ops; disconnecting");
|
||||
break;
|
||||
}
|
||||
Err(_) => break,
|
||||
}
|
||||
@@ -211,7 +241,10 @@ async fn connect_and_sync(url: &str) -> Result<(), String> {
|
||||
frame = stream.next() => {
|
||||
match frame {
|
||||
Some(Ok(tokio_tungstenite::tungstenite::Message::Text(text))) => {
|
||||
handle_incoming_message(text.as_ref());
|
||||
handle_incoming_text(text.as_ref());
|
||||
}
|
||||
Some(Ok(tokio_tungstenite::tungstenite::Message::Binary(bytes))) => {
|
||||
handle_incoming_binary(&bytes);
|
||||
}
|
||||
Some(Ok(tokio_tungstenite::tungstenite::Message::Close(_))) | None => break,
|
||||
Some(Err(e)) => {
|
||||
@@ -268,26 +301,36 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn handle_incoming_message_bad_json_does_not_panic() {
|
||||
handle_incoming_message("not valid json");
|
||||
fn handle_incoming_text_bad_json_does_not_panic() {
|
||||
handle_incoming_text("not valid json");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn handle_incoming_message_bulk_with_invalid_ops_does_not_panic() {
|
||||
fn handle_incoming_text_bulk_with_invalid_ops_does_not_panic() {
|
||||
let msg = SyncMessage::Bulk {
|
||||
ops: vec!["not a valid signed op".to_string()],
|
||||
};
|
||||
let json = serde_json::to_string(&msg).unwrap();
|
||||
handle_incoming_message(&json);
|
||||
handle_incoming_text(&json);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn handle_incoming_message_op_with_invalid_op_does_not_panic() {
|
||||
fn handle_incoming_text_op_with_invalid_op_does_not_panic() {
|
||||
let msg = SyncMessage::Op {
|
||||
op: "garbage".to_string(),
|
||||
};
|
||||
let json = serde_json::to_string(&msg).unwrap();
|
||||
handle_incoming_message(&json);
|
||||
handle_incoming_text(&json);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn handle_incoming_binary_bad_bytes_does_not_panic() {
|
||||
handle_incoming_binary(b"not valid wire codec");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn handle_incoming_binary_empty_bytes_does_not_panic() {
|
||||
handle_incoming_binary(b"");
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -515,4 +558,203 @@ name = "test"
|
||||
let config = crate::config::ProjectConfig::default();
|
||||
assert!(config.rendezvous.is_none());
|
||||
}
|
||||
|
||||
// ── AC8: peer lifecycle tests ─────────────────────────────────────────────
|
||||
|
||||
/// AC8: A peer that connects and then receives a subsequently-applied op
|
||||
/// gets that op encoded via the wire codec (binary frame).
|
||||
#[test]
|
||||
fn peer_receives_op_encoded_via_wire_codec() {
|
||||
use bft_json_crdt::json_crdt::BaseCrdt;
|
||||
use bft_json_crdt::keypair::make_keypair;
|
||||
use bft_json_crdt::op::ROOT_ID;
|
||||
use serde_json::json;
|
||||
|
||||
use crate::crdt_state::PipelineDoc;
|
||||
use crate::crdt_wire;
|
||||
|
||||
let kp = make_keypair();
|
||||
let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);
|
||||
let item: bft_json_crdt::json_crdt::JsonValue = json!({
|
||||
"story_id": "506_story_lifecycle_test",
|
||||
"stage": "1_backlog",
|
||||
"name": "Lifecycle Test",
|
||||
"agent": "",
|
||||
"retry_count": 0.0,
|
||||
"blocked": false,
|
||||
"depends_on": "",
|
||||
})
|
||||
.into();
|
||||
let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp);
|
||||
|
||||
// Simulate what the broadcast handler does: encode via wire codec.
|
||||
let bytes = crdt_wire::encode(&op);
|
||||
|
||||
// The bytes must be a versioned JSON envelope, not a SyncMessage wrapper.
|
||||
let text = std::str::from_utf8(&bytes).expect("wire output is valid UTF-8");
|
||||
assert!(
|
||||
text.contains("\"v\":1"),
|
||||
"wire codec version tag must be present: {text}"
|
||||
);
|
||||
assert!(
|
||||
!text.contains("\"type\":\"op\""),
|
||||
"must not be wrapped in SyncMessage: {text}"
|
||||
);
|
||||
|
||||
// The receiving peer can decode and apply the op.
|
||||
let decoded = crdt_wire::decode(&bytes).expect("decode must succeed");
|
||||
assert_eq!(op, decoded);
|
||||
}
|
||||
|
||||
/// AC8: Multiple connected peers all receive the same broadcast op.
|
||||
#[tokio::test]
|
||||
async fn multiple_peers_all_receive_broadcast_op() {
|
||||
use bft_json_crdt::json_crdt::BaseCrdt;
|
||||
use bft_json_crdt::keypair::make_keypair;
|
||||
use bft_json_crdt::op::ROOT_ID;
|
||||
use serde_json::json;
|
||||
use tokio::sync::broadcast;
|
||||
|
||||
use crate::crdt_state::PipelineDoc;
|
||||
use crate::crdt_wire;
|
||||
|
||||
// Create a broadcast channel (analogous to SYNC_TX).
|
||||
let (tx, _) = broadcast::channel::<SignedOp>(16);
|
||||
let mut rx_peer1 = tx.subscribe();
|
||||
let mut rx_peer2 = tx.subscribe();
|
||||
|
||||
let kp = make_keypair();
|
||||
let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);
|
||||
let item: bft_json_crdt::json_crdt::JsonValue = json!({
|
||||
"story_id": "506_story_multi_peer_test",
|
||||
"stage": "1_backlog",
|
||||
"name": "Multi-Peer Test",
|
||||
"agent": "",
|
||||
"retry_count": 0.0,
|
||||
"blocked": false,
|
||||
"depends_on": "",
|
||||
})
|
||||
.into();
|
||||
let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp);
|
||||
|
||||
// Broadcast one op.
|
||||
tx.send(op.clone()).expect("send must succeed");
|
||||
|
||||
// Both peers receive the same op.
|
||||
let received1 = rx_peer1.recv().await.expect("peer 1 must receive");
|
||||
let received2 = rx_peer2.recv().await.expect("peer 2 must receive");
|
||||
assert_eq!(received1, op);
|
||||
assert_eq!(received2, op);
|
||||
|
||||
// Both encode identically via wire codec.
|
||||
let bytes1 = crdt_wire::encode(&received1);
|
||||
let bytes2 = crdt_wire::encode(&received2);
|
||||
assert_eq!(bytes1, bytes2, "wire-encoded bytes must be identical");
|
||||
}
|
||||
|
||||
/// AC8: A peer disconnecting mid-broadcast does not panic.
|
||||
/// Simulated by dropping the receiver before the sender sends an op.
|
||||
#[test]
|
||||
fn disconnected_peer_does_not_panic() {
|
||||
use bft_json_crdt::json_crdt::BaseCrdt;
|
||||
use bft_json_crdt::keypair::make_keypair;
|
||||
use bft_json_crdt::op::ROOT_ID;
|
||||
use serde_json::json;
|
||||
use tokio::sync::broadcast;
|
||||
|
||||
use crate::crdt_state::PipelineDoc;
|
||||
|
||||
let (tx, rx) = broadcast::channel::<SignedOp>(16);
|
||||
// Drop the receiver to simulate a peer that disconnected.
|
||||
drop(rx);
|
||||
|
||||
let kp = make_keypair();
|
||||
let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);
|
||||
let item: bft_json_crdt::json_crdt::JsonValue = json!({
|
||||
"story_id": "506_story_disconnect_test",
|
||||
"stage": "1_backlog",
|
||||
"name": "Disconnect Test",
|
||||
"agent": "",
|
||||
"retry_count": 0.0,
|
||||
"blocked": false,
|
||||
"depends_on": "",
|
||||
})
|
||||
.into();
|
||||
let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp);
|
||||
|
||||
// Sending to a channel with no receivers returns an error; must not panic.
|
||||
let _ = tx.send(op);
|
||||
}
|
||||
|
||||
/// AC8: A lagged receiver gets a `Lagged` error (confirming the
|
||||
/// disconnect-on-overflow behaviour is reachable).
|
||||
#[tokio::test]
|
||||
async fn lagged_peer_gets_lagged_error() {
|
||||
use bft_json_crdt::json_crdt::BaseCrdt;
|
||||
use bft_json_crdt::keypair::make_keypair;
|
||||
use bft_json_crdt::op::ROOT_ID;
|
||||
use serde_json::json;
|
||||
use tokio::sync::broadcast;
|
||||
|
||||
use crate::crdt_state::PipelineDoc;
|
||||
|
||||
// Tiny capacity so we can trigger Lagged easily.
|
||||
let (tx, mut rx) = broadcast::channel::<SignedOp>(2);
|
||||
|
||||
let kp = make_keypair();
|
||||
let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);
|
||||
let item: bft_json_crdt::json_crdt::JsonValue = json!({
|
||||
"story_id": "506_story_lag_test",
|
||||
"stage": "1_backlog",
|
||||
"name": "Lag Test",
|
||||
"agent": "",
|
||||
"retry_count": 0.0,
|
||||
"blocked": false,
|
||||
"depends_on": "",
|
||||
})
|
||||
.into();
|
||||
let op1 = crdt.doc.items.insert(ROOT_ID, item).sign(&kp);
|
||||
crdt.apply(op1.clone());
|
||||
|
||||
// Overflow the tiny buffer by sending more ops than the capacity.
|
||||
let op2 = crdt.doc.items[0]
|
||||
.stage
|
||||
.set("2_current".to_string())
|
||||
.sign(&kp);
|
||||
crdt.apply(op2.clone());
|
||||
let op3 = crdt.doc.items[0]
|
||||
.stage
|
||||
.set("3_qa".to_string())
|
||||
.sign(&kp);
|
||||
crdt.apply(op3.clone());
|
||||
let op4 = crdt.doc.items[0]
|
||||
.stage
|
||||
.set("4_merge".to_string())
|
||||
.sign(&kp);
|
||||
crdt.apply(op4.clone());
|
||||
|
||||
// Send more ops than the channel capacity without consuming.
|
||||
let _ = tx.send(op1);
|
||||
let _ = tx.send(op2);
|
||||
let _ = tx.send(op3);
|
||||
let _ = tx.send(op4);
|
||||
|
||||
// The slow peer should now see a Lagged error on next recv.
|
||||
// Consume until we hit Lagged or run out.
|
||||
let mut got_lagged = false;
|
||||
for _ in 0..10 {
|
||||
match rx.recv().await {
|
||||
Err(broadcast::error::RecvError::Lagged(_)) => {
|
||||
got_lagged = true;
|
||||
break;
|
||||
}
|
||||
Ok(_) => continue,
|
||||
Err(broadcast::error::RecvError::Closed) => break,
|
||||
}
|
||||
}
|
||||
assert!(
|
||||
got_lagged,
|
||||
"slow peer must receive a Lagged error when channel overflows"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user