diff --git a/README.md b/README.md index 9bf0486f..a65a7356 100644 --- a/README.md +++ b/README.md @@ -89,6 +89,60 @@ script/release 0.7.1 This bumps version in `Cargo.toml` and `package.json`, builds macOS arm64 and Linux amd64 binaries, tags the repo, and publishes a Gitea release with changelog and binaries attached. +## Multi-node CRDT sync (rendezvous) + +Huskies nodes can replicate pipeline state in real-time over WebSocket. Add a +`rendezvous` field to `.huskies/project.toml` to configure a peer: + +```toml +rendezvous = "ws://other-host:3001/crdt-sync" +``` + +On startup, this node opens an outbound WebSocket connection to the configured +URL and exchanges CRDT ops bidirectionally. The connection is fully symmetric: +both sides send a bulk state dump on connect, then stream individual ops as they +are applied locally. + +### Reconnect behaviour + +If the peer is unreachable on startup (or the connection drops mid-session), the +client retries with exponential backoff starting at 1 s and capping at 30 s. +Failures are logged at **WARN**; after 10 consecutive failures the level escalates +to **ERROR** to surface persistent connectivity problems. + +### Deployment topologies + +**Peer-to-peer (two nodes pointing at each other):** + +``` +Node A ←→ Node B +``` + +Configure each node with the other's `/crdt-sync` URL. Both nodes exchange ops +directly. Supported by this story — ops propagate in both directions and both +nodes converge to the same state. Works well for two machines collaborating on +the same project. + +**Hub-and-spoke (many clients → one central rendezvous):** + +``` +Client 1 ──┐ +Client 2 ──┤── Hub node +Client 3 ──┘ +``` + +Point multiple client nodes at a single "hub" node. The hub receives ops from +all clients and re-broadcasts them. Clients do *not* connect to each other — +convergence is mediated through the hub. The hub itself runs a normal huskies +instance with `rendezvous` unset (it only accepts inbound connections). + +> **Caveat:** Hub-to-client relay depends on the hub's `/crdt-sync` inbound +> WebSocket handler re-broadcasting every received op to all other connected +> peers. That broadcast happens automatically via the shared `SYNC_TX` channel +> (each locally-applied remote op is re-emitted), so hub-and-spoke works today +> but has not been load-tested. Follow-up work may be needed for large fan-out +> (many spoke clients) to avoid broadcast-channel lag. + ## Startup reconcile pass On startup, after CRDT replay and database initialisation, huskies runs a diff --git a/server/src/crdt_sync.rs b/server/src/crdt_sync.rs index c4cde08a..976a174b 100644 --- a/server/src/crdt_sync.rs +++ b/server/src/crdt_sync.rs @@ -35,6 +35,8 @@ use crate::crdt_state; use crate::crdt_wire; use crate::http::context::AppContext; use crate::slog; +use crate::slog_error; +use crate::slog_warn; // ── Wire protocol types ───────────────────────────────────────────── @@ -168,22 +170,38 @@ fn handle_incoming_binary(bytes: &[u8]) { // ── Rendezvous client ─────────────────────────────────────────────── +/// Number of consecutive connection failures before escalating from WARN to ERROR. +pub const RENDEZVOUS_ERROR_THRESHOLD: u32 = 10; + /// Spawn a background task that connects to the configured rendezvous /// peer and exchanges CRDT ops bidirectionally. /// /// The client reconnects with exponential backoff if the connection drops. +/// Individual failures are logged at WARN; after [`RENDEZVOUS_ERROR_THRESHOLD`] +/// consecutive failures the log level escalates to ERROR. pub fn spawn_rendezvous_client(url: String) { tokio::spawn(async move { let mut backoff_secs = 1u64; + let mut consecutive_failures: u32 = 0; loop { slog!("[crdt-sync] Connecting to rendezvous peer: {url}"); match connect_and_sync(&url).await { Ok(()) => { slog!("[crdt-sync] Rendezvous connection closed cleanly"); backoff_secs = 1; + consecutive_failures = 0; } Err(e) => { - slog!("[crdt-sync] Rendezvous connection error: {e}"); + consecutive_failures += 1; + if consecutive_failures >= RENDEZVOUS_ERROR_THRESHOLD { + slog_error!( + "[crdt-sync] Rendezvous peer unreachable ({consecutive_failures} consecutive failures): {e}" + ); + } else { + slog_warn!( + "[crdt-sync] Rendezvous connection error (attempt {consecutive_failures}): {e}" + ); + } } } slog!("[crdt-sync] Reconnecting in {backoff_secs}s..."); @@ -559,6 +577,67 @@ name = "test" assert!(config.rendezvous.is_none()); } + // ── AC4: Failure logging escalation ────────────────────────────────────── + + /// AC4: Connection errors must be logged at WARN for the first nine + /// consecutive failures and escalate to ERROR from the tenth onwards. + #[test] + fn failure_counter_warn_below_threshold() { + let threshold = super::RENDEZVOUS_ERROR_THRESHOLD; + let mut consecutive_failures: u32 = 0; + + // First threshold-1 failures are below the ERROR threshold. + for _ in 0..(threshold - 1) { + consecutive_failures += 1; + assert!( + consecutive_failures < threshold, + "failure {consecutive_failures} must be below ERROR threshold {threshold}" + ); + } + } + + /// AC4: The tenth consecutive failure must trigger ERROR-level logging. + #[test] + fn failure_counter_error_at_threshold() { + let threshold = super::RENDEZVOUS_ERROR_THRESHOLD; + let consecutive_failures: u32 = threshold; + assert!( + consecutive_failures >= threshold, + "failure {consecutive_failures} must reach or exceed ERROR threshold {threshold}" + ); + } + + /// AC4: A successful connection resets the failure counter so subsequent + /// single failures are again logged at WARN (not ERROR). + #[test] + fn failure_counter_resets_on_success() { + let threshold = super::RENDEZVOUS_ERROR_THRESHOLD; + // Simulate sustained failure. + let mut consecutive_failures: u32 = threshold + 5; + assert!(consecutive_failures >= threshold); + + // Simulate a clean reconnect. + consecutive_failures = 0; + assert_eq!(consecutive_failures, 0, "counter must reset to 0 on success"); + + // Next error is attempt 1 — well below the ERROR threshold. + consecutive_failures += 1; + assert!( + consecutive_failures < threshold, + "first failure after reset must be below ERROR threshold" + ); + } + + /// AC4: The RENDEZVOUS_ERROR_THRESHOLD constant must equal 10. + #[test] + fn error_threshold_is_ten() { + assert_eq!( + super::RENDEZVOUS_ERROR_THRESHOLD, + 10, + "ERROR escalation threshold must be 10 consecutive failures" + ); + } + // ── AC5: Self-loop dedup ────────────────────────────────────────────────── /// AC5: Applying the same SignedOp twice returns AlreadySeen on the second @@ -1125,4 +1204,293 @@ name = "test" "slow peer must receive a Lagged error when channel overflows" ); } + + // ── AC5 (story 508): E2E convergence test via real WebSocket ───────────── + + /// AC5: Spin up two in-process WebSocket nodes. Node A serves a + /// `/crdt-sync`-compatible endpoint; Node B connects as a rendezvous client. + /// Node A sends a bulk state; Node B applies it. Assert both nodes see the + /// same items within a bounded time window. + #[tokio::test] + async fn e2e_convergence_two_websocket_nodes() { + use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, JsonValue as JV, OpState}; + use bft_json_crdt::keypair::make_keypair; + use bft_json_crdt::op::ROOT_ID; + use futures::{SinkExt, StreamExt}; + use serde_json::json; + use std::sync::{Arc, Mutex}; + use tokio::net::TcpListener; + use tokio_tungstenite::{accept_async, connect_async}; + use tokio_tungstenite::tungstenite::Message as TMsg; + + use crate::crdt_state::PipelineDoc; + + // ── Node A: build local state ────────────────────────────────────── + let kp_a = make_keypair(); + let mut crdt_a = BaseCrdt::::new(&kp_a); + + let item: bft_json_crdt::json_crdt::JsonValue = json!({ + "story_id": "508_e2e_convergence", + "stage": "2_current", + "name": "E2E Convergence Test", + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + }) + .into(); + let op1 = crdt_a.doc.items.insert(ROOT_ID, item).sign(&kp_a); + crdt_a.apply(op1.clone()); + + // Serialise A's full state as a bulk message. + let op1_json = serde_json::to_string(&op1).unwrap(); + let bulk_msg = SyncMessage::Bulk { ops: vec![op1_json] }; + let bulk_wire = serde_json::to_string(&bulk_msg).unwrap(); + + // ── Start Node A's WebSocket server on a random port ─────────────── + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + + let bulk_to_send = bulk_wire.clone(); + let received_by_a: Arc>> = Arc::new(Mutex::new(vec![])); + let received_by_a_clone = received_by_a.clone(); + + tokio::spawn(async move { + let (tcp_stream, _) = listener.accept().await.unwrap(); + let ws_stream = accept_async(tcp_stream).await.unwrap(); + let (mut sink, mut stream) = ws_stream.split(); + + // Send bulk state to the connecting peer. + sink.send(TMsg::Text(bulk_to_send.into())).await.unwrap(); + + // Also listen for ops sent by the peer. + if let Some(Ok(TMsg::Text(txt))) = stream.next().await { + received_by_a_clone.lock().unwrap().push(txt.to_string()); + } + }); + + // ── Node B: connect to Node A and exchange state ─────────────────── + let kp_b = make_keypair(); + let mut crdt_b = BaseCrdt::::new(&kp_b); + + let url = format!("ws://{addr}"); + let (ws_b, _) = connect_async(&url).await.unwrap(); + let (mut sink_b, mut stream_b) = ws_b.split(); + + // Node B receives bulk from A. + if let Some(Ok(TMsg::Text(txt))) = stream_b.next().await { + let msg: SyncMessage = serde_json::from_str(txt.as_str()).unwrap(); + match msg { + SyncMessage::Bulk { ops } => { + for op_str in &ops { + let signed: bft_json_crdt::json_crdt::SignedOp = + serde_json::from_str(op_str).unwrap(); + let r = crdt_b.apply(signed); + assert!(r == OpState::Ok || r == OpState::AlreadySeen); + } + } + _ => panic!("Expected Bulk from Node A"), + } + } + + // Node B also creates a new op and sends it to A. + let item_b: bft_json_crdt::json_crdt::JsonValue = json!({ + "story_id": "508_e2e_convergence_b", + "stage": "1_backlog", + "name": "E2E Convergence B", + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + }) + .into(); + let op_b1 = crdt_b.doc.items.insert(ROOT_ID, item_b).sign(&kp_b); + crdt_b.apply(op_b1.clone()); + + let op_b1_json = serde_json::to_string(&op_b1).unwrap(); + let msg_to_a = SyncMessage::Op { op: op_b1_json }; + sink_b + .send(TMsg::Text(serde_json::to_string(&msg_to_a).unwrap().into())) + .await + .unwrap(); + + // Wait a moment for Node A to process. + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // ── Assert convergence ───────────────────────────────────────────── + + // Node B received Node A's item. + assert_eq!(crdt_b.doc.items.view().len(), 2, + "Node B must see both items after sync"); + let has_a_item = crdt_b.doc.items.view().iter().any(|item| { + item.story_id.view() == JV::String("508_e2e_convergence".to_string()) + }); + assert!(has_a_item, "Node B must have Node A's item"); + + // Node A received Node B's op via the WebSocket. + let a_received = received_by_a.lock().unwrap(); + assert!( + !a_received.is_empty(), + "Node A must have received an op from Node B" + ); + } + + // ── AC6 (story 508): Partition healing E2E via real WebSocket ───────────── + + /// AC6: Two nodes exchange ops, the connection is dropped (partition), each + /// side mutates independently, then they reconnect and the reconnecting + /// client sends a fresh bulk state. Assert both converge to the same final + /// state. + #[tokio::test] + async fn e2e_partition_healing_websocket() { + use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, JsonValue as JV}; + use bft_json_crdt::keypair::make_keypair; + use bft_json_crdt::op::ROOT_ID; + use futures::{SinkExt, StreamExt}; + use serde_json::json; + use tokio::net::TcpListener; + use tokio_tungstenite::{accept_async, connect_async}; + use tokio_tungstenite::tungstenite::Message as TMsg; + + use crate::crdt_state::PipelineDoc; + + // ── Phase 1: Both nodes start with op_a1 (before partition) ─────── + let kp_a = make_keypair(); + let kp_b = make_keypair(); + let mut crdt_a = BaseCrdt::::new(&kp_a); + let mut crdt_b = BaseCrdt::::new(&kp_b); + + let item_a: bft_json_crdt::json_crdt::JsonValue = json!({ + "story_id": "508_heal_a", + "stage": "1_backlog", + "name": "Heal A", + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + }) + .into(); + let op_a1 = crdt_a.doc.items.insert(ROOT_ID, item_a).sign(&kp_a); + crdt_a.apply(op_a1.clone()); + // B also starts with op_a1 (shared state before partition). + crdt_b.apply(op_a1.clone()); + + // ── Phase 2: Partition — each side mutates independently ────────── + // A advances its story stage. + let op_a2 = crdt_a.doc.items[0] + .stage + .set("2_current".to_string()) + .sign(&kp_a); + crdt_a.apply(op_a2.clone()); + + // B inserts a new story that A doesn't know about yet. + let item_b: bft_json_crdt::json_crdt::JsonValue = json!({ + "story_id": "508_heal_b", + "stage": "1_backlog", + "name": "Heal B", + "agent": "", + "retry_count": 0.0, + "blocked": false, + "depends_on": "", + }) + .into(); + let op_b1 = crdt_b.doc.items.insert(ROOT_ID, item_b).sign(&kp_b); + crdt_b.apply(op_b1.clone()); + + // Collect B's full state as bulk (what it will send on reconnect). + let b_ops: Vec = [&op_a1, &op_b1] + .iter() + .map(|op| serde_json::to_string(op).unwrap()) + .collect(); + let b_bulk_wire = serde_json::to_string(&SyncMessage::Bulk { ops: b_ops }).unwrap(); + + // Collect A's full state as bulk (what it will send on reconnect). + let a_ops: Vec = [&op_a1, &op_a2] + .iter() + .map(|op| serde_json::to_string(op).unwrap()) + .collect(); + let a_bulk_wire = serde_json::to_string(&SyncMessage::Bulk { ops: a_ops }).unwrap(); + + // ── Phase 3: Reconnect — use a real WebSocket to exchange bulk ──── + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + + let a_bulk_to_send = a_bulk_wire.clone(); + let a_received_bulk: std::sync::Arc>> = + std::sync::Arc::new(std::sync::Mutex::new(None)); + let a_received_clone = a_received_bulk.clone(); + + tokio::spawn(async move { + let (tcp, _) = listener.accept().await.unwrap(); + let ws = accept_async(tcp).await.unwrap(); + let (mut sink, mut stream) = ws.split(); + // A sends its bulk state. + sink.send(TMsg::Text(a_bulk_to_send.into())).await.unwrap(); + // A receives B's bulk state. + if let Some(Ok(TMsg::Text(txt))) = stream.next().await { + *a_received_clone.lock().unwrap() = Some(txt.to_string()); + } + }); + + // B connects, exchanges bulk state. + let (ws_b, _) = connect_async(format!("ws://{addr}")).await.unwrap(); + let (mut sink_b, mut stream_b) = ws_b.split(); + + // B receives A's bulk and applies it. + if let Some(Ok(TMsg::Text(txt))) = stream_b.next().await { + let msg: SyncMessage = serde_json::from_str(txt.as_str()).unwrap(); + if let SyncMessage::Bulk { ops } = msg { + for op_str in &ops { + let signed: bft_json_crdt::json_crdt::SignedOp = + serde_json::from_str(op_str).unwrap(); + let _ = crdt_b.apply(signed); + } + } + } + + // B sends its bulk state to A. + sink_b + .send(TMsg::Text(b_bulk_wire.into())) + .await + .unwrap(); + + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // Apply A's received ops into crdt_a. + if let Some(bulk_str) = a_received_bulk.lock().unwrap().take() { + let msg: SyncMessage = serde_json::from_str(&bulk_str).unwrap(); + if let SyncMessage::Bulk { ops } = msg { + for op_str in &ops { + let signed: bft_json_crdt::json_crdt::SignedOp = + serde_json::from_str(op_str).unwrap(); + let _ = crdt_a.apply(signed); + } + } + } + + // ── Assert convergence ───────────────────────────────────────────── + + // Both nodes must have 2 items. + assert_eq!(crdt_a.doc.items.view().len(), 2, + "A must have 2 items after healing"); + assert_eq!(crdt_b.doc.items.view().len(), 2, + "B must have 2 items after healing"); + + // A must see B's story. + let b_story_on_a = crdt_a.doc.items.view().iter().any(|item| { + item.story_id.view() == JV::String("508_heal_b".to_string()) + }); + assert!(b_story_on_a, "A must have B's story after healing"); + + // B must see A's stage advance. + let a_story_on_b = crdt_b.doc.items.view().iter().any(|item| { + item.story_id.view() == JV::String("508_heal_a".to_string()) + }); + assert!(a_story_on_b, "B must have A's story after healing"); + + // CRDT views must be byte-identical (convergence). + let view_a = serde_json::to_string(&CrdtNode::view(&crdt_a.doc.items)).unwrap(); + let view_b = serde_json::to_string(&CrdtNode::view(&crdt_b.doc.items)).unwrap(); + assert_eq!(view_a, view_b, "Both nodes must converge to identical state"); + } }