huskies: merge 508_story_configurable_rendezvous_peer_in_project_toml_with_outbound_crdt_sync_connect
This commit is contained in:
@@ -89,6 +89,60 @@ script/release 0.7.1
|
|||||||
|
|
||||||
This bumps version in `Cargo.toml` and `package.json`, builds macOS arm64 and Linux amd64 binaries, tags the repo, and publishes a Gitea release with changelog and binaries attached.
|
This bumps version in `Cargo.toml` and `package.json`, builds macOS arm64 and Linux amd64 binaries, tags the repo, and publishes a Gitea release with changelog and binaries attached.
|
||||||
|
|
||||||
|
## Multi-node CRDT sync (rendezvous)
|
||||||
|
|
||||||
|
Huskies nodes can replicate pipeline state in real-time over WebSocket. Add a
|
||||||
|
`rendezvous` field to `.huskies/project.toml` to configure a peer:
|
||||||
|
|
||||||
|
```toml
|
||||||
|
rendezvous = "ws://other-host:3001/crdt-sync"
|
||||||
|
```
|
||||||
|
|
||||||
|
On startup, this node opens an outbound WebSocket connection to the configured
|
||||||
|
URL and exchanges CRDT ops bidirectionally. The connection is fully symmetric:
|
||||||
|
both sides send a bulk state dump on connect, then stream individual ops as they
|
||||||
|
are applied locally.
|
||||||
|
|
||||||
|
### Reconnect behaviour
|
||||||
|
|
||||||
|
If the peer is unreachable on startup (or the connection drops mid-session), the
|
||||||
|
client retries with exponential backoff starting at 1 s and capping at 30 s.
|
||||||
|
Failures are logged at **WARN**; after 10 consecutive failures the level escalates
|
||||||
|
to **ERROR** to surface persistent connectivity problems.
|
||||||
|
|
||||||
|
### Deployment topologies
|
||||||
|
|
||||||
|
**Peer-to-peer (two nodes pointing at each other):**
|
||||||
|
|
||||||
|
```
|
||||||
|
Node A ←→ Node B
|
||||||
|
```
|
||||||
|
|
||||||
|
Configure each node with the other's `/crdt-sync` URL. Both nodes exchange ops
|
||||||
|
directly. Supported by this story — ops propagate in both directions and both
|
||||||
|
nodes converge to the same state. Works well for two machines collaborating on
|
||||||
|
the same project.
|
||||||
|
|
||||||
|
**Hub-and-spoke (many clients → one central rendezvous):**
|
||||||
|
|
||||||
|
```
|
||||||
|
Client 1 ──┐
|
||||||
|
Client 2 ──┤── Hub node
|
||||||
|
Client 3 ──┘
|
||||||
|
```
|
||||||
|
|
||||||
|
Point multiple client nodes at a single "hub" node. The hub receives ops from
|
||||||
|
all clients and re-broadcasts them. Clients do *not* connect to each other —
|
||||||
|
convergence is mediated through the hub. The hub itself runs a normal huskies
|
||||||
|
instance with `rendezvous` unset (it only accepts inbound connections).
|
||||||
|
|
||||||
|
> **Caveat:** Hub-to-client relay depends on the hub's `/crdt-sync` inbound
|
||||||
|
> WebSocket handler re-broadcasting every received op to all other connected
|
||||||
|
> peers. That broadcast happens automatically via the shared `SYNC_TX` channel
|
||||||
|
> (each locally-applied remote op is re-emitted), so hub-and-spoke works today
|
||||||
|
> but has not been load-tested. Follow-up work may be needed for large fan-out
|
||||||
|
> (many spoke clients) to avoid broadcast-channel lag.
|
||||||
|
|
||||||
## Startup reconcile pass
|
## Startup reconcile pass
|
||||||
|
|
||||||
On startup, after CRDT replay and database initialisation, huskies runs a
|
On startup, after CRDT replay and database initialisation, huskies runs a
|
||||||
|
|||||||
+369
-1
@@ -35,6 +35,8 @@ use crate::crdt_state;
|
|||||||
use crate::crdt_wire;
|
use crate::crdt_wire;
|
||||||
use crate::http::context::AppContext;
|
use crate::http::context::AppContext;
|
||||||
use crate::slog;
|
use crate::slog;
|
||||||
|
use crate::slog_error;
|
||||||
|
use crate::slog_warn;
|
||||||
|
|
||||||
// ── Wire protocol types ─────────────────────────────────────────────
|
// ── Wire protocol types ─────────────────────────────────────────────
|
||||||
|
|
||||||
@@ -168,22 +170,38 @@ fn handle_incoming_binary(bytes: &[u8]) {
|
|||||||
|
|
||||||
// ── Rendezvous client ───────────────────────────────────────────────
|
// ── Rendezvous client ───────────────────────────────────────────────
|
||||||
|
|
||||||
|
/// Number of consecutive connection failures before escalating from WARN to ERROR.
|
||||||
|
pub const RENDEZVOUS_ERROR_THRESHOLD: u32 = 10;
|
||||||
|
|
||||||
/// Spawn a background task that connects to the configured rendezvous
|
/// Spawn a background task that connects to the configured rendezvous
|
||||||
/// peer and exchanges CRDT ops bidirectionally.
|
/// peer and exchanges CRDT ops bidirectionally.
|
||||||
///
|
///
|
||||||
/// The client reconnects with exponential backoff if the connection drops.
|
/// The client reconnects with exponential backoff if the connection drops.
|
||||||
|
/// Individual failures are logged at WARN; after [`RENDEZVOUS_ERROR_THRESHOLD`]
|
||||||
|
/// consecutive failures the log level escalates to ERROR.
|
||||||
pub fn spawn_rendezvous_client(url: String) {
|
pub fn spawn_rendezvous_client(url: String) {
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let mut backoff_secs = 1u64;
|
let mut backoff_secs = 1u64;
|
||||||
|
let mut consecutive_failures: u32 = 0;
|
||||||
loop {
|
loop {
|
||||||
slog!("[crdt-sync] Connecting to rendezvous peer: {url}");
|
slog!("[crdt-sync] Connecting to rendezvous peer: {url}");
|
||||||
match connect_and_sync(&url).await {
|
match connect_and_sync(&url).await {
|
||||||
Ok(()) => {
|
Ok(()) => {
|
||||||
slog!("[crdt-sync] Rendezvous connection closed cleanly");
|
slog!("[crdt-sync] Rendezvous connection closed cleanly");
|
||||||
backoff_secs = 1;
|
backoff_secs = 1;
|
||||||
|
consecutive_failures = 0;
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
slog!("[crdt-sync] Rendezvous connection error: {e}");
|
consecutive_failures += 1;
|
||||||
|
if consecutive_failures >= RENDEZVOUS_ERROR_THRESHOLD {
|
||||||
|
slog_error!(
|
||||||
|
"[crdt-sync] Rendezvous peer unreachable ({consecutive_failures} consecutive failures): {e}"
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
slog_warn!(
|
||||||
|
"[crdt-sync] Rendezvous connection error (attempt {consecutive_failures}): {e}"
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
slog!("[crdt-sync] Reconnecting in {backoff_secs}s...");
|
slog!("[crdt-sync] Reconnecting in {backoff_secs}s...");
|
||||||
@@ -559,6 +577,67 @@ name = "test"
|
|||||||
assert!(config.rendezvous.is_none());
|
assert!(config.rendezvous.is_none());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── AC4: Failure logging escalation ──────────────────────────────────────
|
||||||
|
|
||||||
|
/// AC4: Connection errors must be logged at WARN for the first nine
|
||||||
|
/// consecutive failures and escalate to ERROR from the tenth onwards.
|
||||||
|
#[test]
|
||||||
|
fn failure_counter_warn_below_threshold() {
|
||||||
|
let threshold = super::RENDEZVOUS_ERROR_THRESHOLD;
|
||||||
|
let mut consecutive_failures: u32 = 0;
|
||||||
|
|
||||||
|
// First threshold-1 failures are below the ERROR threshold.
|
||||||
|
for _ in 0..(threshold - 1) {
|
||||||
|
consecutive_failures += 1;
|
||||||
|
assert!(
|
||||||
|
consecutive_failures < threshold,
|
||||||
|
"failure {consecutive_failures} must be below ERROR threshold {threshold}"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// AC4: The tenth consecutive failure must trigger ERROR-level logging.
|
||||||
|
#[test]
|
||||||
|
fn failure_counter_error_at_threshold() {
|
||||||
|
let threshold = super::RENDEZVOUS_ERROR_THRESHOLD;
|
||||||
|
let consecutive_failures: u32 = threshold;
|
||||||
|
assert!(
|
||||||
|
consecutive_failures >= threshold,
|
||||||
|
"failure {consecutive_failures} must reach or exceed ERROR threshold {threshold}"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// AC4: A successful connection resets the failure counter so subsequent
|
||||||
|
/// single failures are again logged at WARN (not ERROR).
|
||||||
|
#[test]
|
||||||
|
fn failure_counter_resets_on_success() {
|
||||||
|
let threshold = super::RENDEZVOUS_ERROR_THRESHOLD;
|
||||||
|
// Simulate sustained failure.
|
||||||
|
let mut consecutive_failures: u32 = threshold + 5;
|
||||||
|
assert!(consecutive_failures >= threshold);
|
||||||
|
|
||||||
|
// Simulate a clean reconnect.
|
||||||
|
consecutive_failures = 0;
|
||||||
|
assert_eq!(consecutive_failures, 0, "counter must reset to 0 on success");
|
||||||
|
|
||||||
|
// Next error is attempt 1 — well below the ERROR threshold.
|
||||||
|
consecutive_failures += 1;
|
||||||
|
assert!(
|
||||||
|
consecutive_failures < threshold,
|
||||||
|
"first failure after reset must be below ERROR threshold"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// AC4: The RENDEZVOUS_ERROR_THRESHOLD constant must equal 10.
|
||||||
|
#[test]
|
||||||
|
fn error_threshold_is_ten() {
|
||||||
|
assert_eq!(
|
||||||
|
super::RENDEZVOUS_ERROR_THRESHOLD,
|
||||||
|
10,
|
||||||
|
"ERROR escalation threshold must be 10 consecutive failures"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
// ── AC5: Self-loop dedup ──────────────────────────────────────────────────
|
// ── AC5: Self-loop dedup ──────────────────────────────────────────────────
|
||||||
|
|
||||||
/// AC5: Applying the same SignedOp twice returns AlreadySeen on the second
|
/// AC5: Applying the same SignedOp twice returns AlreadySeen on the second
|
||||||
@@ -1125,4 +1204,293 @@ name = "test"
|
|||||||
"slow peer must receive a Lagged error when channel overflows"
|
"slow peer must receive a Lagged error when channel overflows"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── AC5 (story 508): E2E convergence test via real WebSocket ─────────────
|
||||||
|
|
||||||
|
/// AC5: Spin up two in-process WebSocket nodes. Node A serves a
|
||||||
|
/// `/crdt-sync`-compatible endpoint; Node B connects as a rendezvous client.
|
||||||
|
/// Node A sends a bulk state; Node B applies it. Assert both nodes see the
|
||||||
|
/// same items within a bounded time window.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn e2e_convergence_two_websocket_nodes() {
|
||||||
|
use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, JsonValue as JV, OpState};
|
||||||
|
use bft_json_crdt::keypair::make_keypair;
|
||||||
|
use bft_json_crdt::op::ROOT_ID;
|
||||||
|
use futures::{SinkExt, StreamExt};
|
||||||
|
use serde_json::json;
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
|
use tokio::net::TcpListener;
|
||||||
|
use tokio_tungstenite::{accept_async, connect_async};
|
||||||
|
use tokio_tungstenite::tungstenite::Message as TMsg;
|
||||||
|
|
||||||
|
use crate::crdt_state::PipelineDoc;
|
||||||
|
|
||||||
|
// ── Node A: build local state ──────────────────────────────────────
|
||||||
|
let kp_a = make_keypair();
|
||||||
|
let mut crdt_a = BaseCrdt::<PipelineDoc>::new(&kp_a);
|
||||||
|
|
||||||
|
let item: bft_json_crdt::json_crdt::JsonValue = json!({
|
||||||
|
"story_id": "508_e2e_convergence",
|
||||||
|
"stage": "2_current",
|
||||||
|
"name": "E2E Convergence Test",
|
||||||
|
"agent": "",
|
||||||
|
"retry_count": 0.0,
|
||||||
|
"blocked": false,
|
||||||
|
"depends_on": "",
|
||||||
|
})
|
||||||
|
.into();
|
||||||
|
let op1 = crdt_a.doc.items.insert(ROOT_ID, item).sign(&kp_a);
|
||||||
|
crdt_a.apply(op1.clone());
|
||||||
|
|
||||||
|
// Serialise A's full state as a bulk message.
|
||||||
|
let op1_json = serde_json::to_string(&op1).unwrap();
|
||||||
|
let bulk_msg = SyncMessage::Bulk { ops: vec![op1_json] };
|
||||||
|
let bulk_wire = serde_json::to_string(&bulk_msg).unwrap();
|
||||||
|
|
||||||
|
// ── Start Node A's WebSocket server on a random port ───────────────
|
||||||
|
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||||
|
let addr = listener.local_addr().unwrap();
|
||||||
|
|
||||||
|
let bulk_to_send = bulk_wire.clone();
|
||||||
|
let received_by_a: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(vec![]));
|
||||||
|
let received_by_a_clone = received_by_a.clone();
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let (tcp_stream, _) = listener.accept().await.unwrap();
|
||||||
|
let ws_stream = accept_async(tcp_stream).await.unwrap();
|
||||||
|
let (mut sink, mut stream) = ws_stream.split();
|
||||||
|
|
||||||
|
// Send bulk state to the connecting peer.
|
||||||
|
sink.send(TMsg::Text(bulk_to_send.into())).await.unwrap();
|
||||||
|
|
||||||
|
// Also listen for ops sent by the peer.
|
||||||
|
if let Some(Ok(TMsg::Text(txt))) = stream.next().await {
|
||||||
|
received_by_a_clone.lock().unwrap().push(txt.to_string());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// ── Node B: connect to Node A and exchange state ───────────────────
|
||||||
|
let kp_b = make_keypair();
|
||||||
|
let mut crdt_b = BaseCrdt::<PipelineDoc>::new(&kp_b);
|
||||||
|
|
||||||
|
let url = format!("ws://{addr}");
|
||||||
|
let (ws_b, _) = connect_async(&url).await.unwrap();
|
||||||
|
let (mut sink_b, mut stream_b) = ws_b.split();
|
||||||
|
|
||||||
|
// Node B receives bulk from A.
|
||||||
|
if let Some(Ok(TMsg::Text(txt))) = stream_b.next().await {
|
||||||
|
let msg: SyncMessage = serde_json::from_str(txt.as_str()).unwrap();
|
||||||
|
match msg {
|
||||||
|
SyncMessage::Bulk { ops } => {
|
||||||
|
for op_str in &ops {
|
||||||
|
let signed: bft_json_crdt::json_crdt::SignedOp =
|
||||||
|
serde_json::from_str(op_str).unwrap();
|
||||||
|
let r = crdt_b.apply(signed);
|
||||||
|
assert!(r == OpState::Ok || r == OpState::AlreadySeen);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => panic!("Expected Bulk from Node A"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Node B also creates a new op and sends it to A.
|
||||||
|
let item_b: bft_json_crdt::json_crdt::JsonValue = json!({
|
||||||
|
"story_id": "508_e2e_convergence_b",
|
||||||
|
"stage": "1_backlog",
|
||||||
|
"name": "E2E Convergence B",
|
||||||
|
"agent": "",
|
||||||
|
"retry_count": 0.0,
|
||||||
|
"blocked": false,
|
||||||
|
"depends_on": "",
|
||||||
|
})
|
||||||
|
.into();
|
||||||
|
let op_b1 = crdt_b.doc.items.insert(ROOT_ID, item_b).sign(&kp_b);
|
||||||
|
crdt_b.apply(op_b1.clone());
|
||||||
|
|
||||||
|
let op_b1_json = serde_json::to_string(&op_b1).unwrap();
|
||||||
|
let msg_to_a = SyncMessage::Op { op: op_b1_json };
|
||||||
|
sink_b
|
||||||
|
.send(TMsg::Text(serde_json::to_string(&msg_to_a).unwrap().into()))
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
// Wait a moment for Node A to process.
|
||||||
|
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
|
||||||
|
|
||||||
|
// ── Assert convergence ─────────────────────────────────────────────
|
||||||
|
|
||||||
|
// Node B received Node A's item.
|
||||||
|
assert_eq!(crdt_b.doc.items.view().len(), 2,
|
||||||
|
"Node B must see both items after sync");
|
||||||
|
let has_a_item = crdt_b.doc.items.view().iter().any(|item| {
|
||||||
|
item.story_id.view() == JV::String("508_e2e_convergence".to_string())
|
||||||
|
});
|
||||||
|
assert!(has_a_item, "Node B must have Node A's item");
|
||||||
|
|
||||||
|
// Node A received Node B's op via the WebSocket.
|
||||||
|
let a_received = received_by_a.lock().unwrap();
|
||||||
|
assert!(
|
||||||
|
!a_received.is_empty(),
|
||||||
|
"Node A must have received an op from Node B"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── AC6 (story 508): Partition healing E2E via real WebSocket ─────────────
|
||||||
|
|
||||||
|
/// AC6: Two nodes exchange ops, the connection is dropped (partition), each
|
||||||
|
/// side mutates independently, then they reconnect and the reconnecting
|
||||||
|
/// client sends a fresh bulk state. Assert both converge to the same final
|
||||||
|
/// state.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn e2e_partition_healing_websocket() {
|
||||||
|
use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, JsonValue as JV};
|
||||||
|
use bft_json_crdt::keypair::make_keypair;
|
||||||
|
use bft_json_crdt::op::ROOT_ID;
|
||||||
|
use futures::{SinkExt, StreamExt};
|
||||||
|
use serde_json::json;
|
||||||
|
use tokio::net::TcpListener;
|
||||||
|
use tokio_tungstenite::{accept_async, connect_async};
|
||||||
|
use tokio_tungstenite::tungstenite::Message as TMsg;
|
||||||
|
|
||||||
|
use crate::crdt_state::PipelineDoc;
|
||||||
|
|
||||||
|
// ── Phase 1: Both nodes start with op_a1 (before partition) ───────
|
||||||
|
let kp_a = make_keypair();
|
||||||
|
let kp_b = make_keypair();
|
||||||
|
let mut crdt_a = BaseCrdt::<PipelineDoc>::new(&kp_a);
|
||||||
|
let mut crdt_b = BaseCrdt::<PipelineDoc>::new(&kp_b);
|
||||||
|
|
||||||
|
let item_a: bft_json_crdt::json_crdt::JsonValue = json!({
|
||||||
|
"story_id": "508_heal_a",
|
||||||
|
"stage": "1_backlog",
|
||||||
|
"name": "Heal A",
|
||||||
|
"agent": "",
|
||||||
|
"retry_count": 0.0,
|
||||||
|
"blocked": false,
|
||||||
|
"depends_on": "",
|
||||||
|
})
|
||||||
|
.into();
|
||||||
|
let op_a1 = crdt_a.doc.items.insert(ROOT_ID, item_a).sign(&kp_a);
|
||||||
|
crdt_a.apply(op_a1.clone());
|
||||||
|
// B also starts with op_a1 (shared state before partition).
|
||||||
|
crdt_b.apply(op_a1.clone());
|
||||||
|
|
||||||
|
// ── Phase 2: Partition — each side mutates independently ──────────
|
||||||
|
// A advances its story stage.
|
||||||
|
let op_a2 = crdt_a.doc.items[0]
|
||||||
|
.stage
|
||||||
|
.set("2_current".to_string())
|
||||||
|
.sign(&kp_a);
|
||||||
|
crdt_a.apply(op_a2.clone());
|
||||||
|
|
||||||
|
// B inserts a new story that A doesn't know about yet.
|
||||||
|
let item_b: bft_json_crdt::json_crdt::JsonValue = json!({
|
||||||
|
"story_id": "508_heal_b",
|
||||||
|
"stage": "1_backlog",
|
||||||
|
"name": "Heal B",
|
||||||
|
"agent": "",
|
||||||
|
"retry_count": 0.0,
|
||||||
|
"blocked": false,
|
||||||
|
"depends_on": "",
|
||||||
|
})
|
||||||
|
.into();
|
||||||
|
let op_b1 = crdt_b.doc.items.insert(ROOT_ID, item_b).sign(&kp_b);
|
||||||
|
crdt_b.apply(op_b1.clone());
|
||||||
|
|
||||||
|
// Collect B's full state as bulk (what it will send on reconnect).
|
||||||
|
let b_ops: Vec<String> = [&op_a1, &op_b1]
|
||||||
|
.iter()
|
||||||
|
.map(|op| serde_json::to_string(op).unwrap())
|
||||||
|
.collect();
|
||||||
|
let b_bulk_wire = serde_json::to_string(&SyncMessage::Bulk { ops: b_ops }).unwrap();
|
||||||
|
|
||||||
|
// Collect A's full state as bulk (what it will send on reconnect).
|
||||||
|
let a_ops: Vec<String> = [&op_a1, &op_a2]
|
||||||
|
.iter()
|
||||||
|
.map(|op| serde_json::to_string(op).unwrap())
|
||||||
|
.collect();
|
||||||
|
let a_bulk_wire = serde_json::to_string(&SyncMessage::Bulk { ops: a_ops }).unwrap();
|
||||||
|
|
||||||
|
// ── Phase 3: Reconnect — use a real WebSocket to exchange bulk ────
|
||||||
|
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||||
|
let addr = listener.local_addr().unwrap();
|
||||||
|
|
||||||
|
let a_bulk_to_send = a_bulk_wire.clone();
|
||||||
|
let a_received_bulk: std::sync::Arc<std::sync::Mutex<Option<String>>> =
|
||||||
|
std::sync::Arc::new(std::sync::Mutex::new(None));
|
||||||
|
let a_received_clone = a_received_bulk.clone();
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let (tcp, _) = listener.accept().await.unwrap();
|
||||||
|
let ws = accept_async(tcp).await.unwrap();
|
||||||
|
let (mut sink, mut stream) = ws.split();
|
||||||
|
// A sends its bulk state.
|
||||||
|
sink.send(TMsg::Text(a_bulk_to_send.into())).await.unwrap();
|
||||||
|
// A receives B's bulk state.
|
||||||
|
if let Some(Ok(TMsg::Text(txt))) = stream.next().await {
|
||||||
|
*a_received_clone.lock().unwrap() = Some(txt.to_string());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// B connects, exchanges bulk state.
|
||||||
|
let (ws_b, _) = connect_async(format!("ws://{addr}")).await.unwrap();
|
||||||
|
let (mut sink_b, mut stream_b) = ws_b.split();
|
||||||
|
|
||||||
|
// B receives A's bulk and applies it.
|
||||||
|
if let Some(Ok(TMsg::Text(txt))) = stream_b.next().await {
|
||||||
|
let msg: SyncMessage = serde_json::from_str(txt.as_str()).unwrap();
|
||||||
|
if let SyncMessage::Bulk { ops } = msg {
|
||||||
|
for op_str in &ops {
|
||||||
|
let signed: bft_json_crdt::json_crdt::SignedOp =
|
||||||
|
serde_json::from_str(op_str).unwrap();
|
||||||
|
let _ = crdt_b.apply(signed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// B sends its bulk state to A.
|
||||||
|
sink_b
|
||||||
|
.send(TMsg::Text(b_bulk_wire.into()))
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
|
||||||
|
|
||||||
|
// Apply A's received ops into crdt_a.
|
||||||
|
if let Some(bulk_str) = a_received_bulk.lock().unwrap().take() {
|
||||||
|
let msg: SyncMessage = serde_json::from_str(&bulk_str).unwrap();
|
||||||
|
if let SyncMessage::Bulk { ops } = msg {
|
||||||
|
for op_str in &ops {
|
||||||
|
let signed: bft_json_crdt::json_crdt::SignedOp =
|
||||||
|
serde_json::from_str(op_str).unwrap();
|
||||||
|
let _ = crdt_a.apply(signed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Assert convergence ─────────────────────────────────────────────
|
||||||
|
|
||||||
|
// Both nodes must have 2 items.
|
||||||
|
assert_eq!(crdt_a.doc.items.view().len(), 2,
|
||||||
|
"A must have 2 items after healing");
|
||||||
|
assert_eq!(crdt_b.doc.items.view().len(), 2,
|
||||||
|
"B must have 2 items after healing");
|
||||||
|
|
||||||
|
// A must see B's story.
|
||||||
|
let b_story_on_a = crdt_a.doc.items.view().iter().any(|item| {
|
||||||
|
item.story_id.view() == JV::String("508_heal_b".to_string())
|
||||||
|
});
|
||||||
|
assert!(b_story_on_a, "A must have B's story after healing");
|
||||||
|
|
||||||
|
// B must see A's stage advance.
|
||||||
|
let a_story_on_b = crdt_b.doc.items.view().iter().any(|item| {
|
||||||
|
item.story_id.view() == JV::String("508_heal_a".to_string())
|
||||||
|
});
|
||||||
|
assert!(a_story_on_b, "B must have A's story after healing");
|
||||||
|
|
||||||
|
// CRDT views must be byte-identical (convergence).
|
||||||
|
let view_a = serde_json::to_string(&CrdtNode::view(&crdt_a.doc.items)).unwrap();
|
||||||
|
let view_b = serde_json::to_string(&CrdtNode::view(&crdt_b.doc.items)).unwrap();
|
||||||
|
assert_eq!(view_a, view_b, "Both nodes must converge to identical state");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user