diff --git a/crates/bft-json-crdt/src/debug.rs b/crates/bft-json-crdt/src/debug.rs index 4d141f95..94e0546d 100644 --- a/crates/bft-json-crdt/src/debug.rs +++ b/crates/bft-json-crdt/src/debug.rs @@ -191,7 +191,7 @@ impl ListCrdt where T: CrdtNode, { - pub fn log_ops(&self, highlight: Option) { + pub fn log_ops(&self, _highlight: Option) { #[cfg(feature = "logging-list")] { let mut lines = Vec::::new(); @@ -242,7 +242,7 @@ where let cur_char = if is_last(op) { "╰─" } else { "├─" }; let prefixes = stack.iter().map(|s| s.1).collect::>().join(""); - let highlight_text = if highlight.is_some() && highlight.unwrap() == op.id { + let highlight_text = if _highlight.is_some() && _highlight.unwrap() == op.id { if op.is_deleted { "<- deleted".bold().red() } else { @@ -289,27 +289,27 @@ where } } - pub fn log_apply(&self, op: &Op) { + pub fn log_apply(&self, _op: &Op) { #[cfg(feature = "logging-list")] { - if op.is_deleted { + if _op.is_deleted { println!( "{} Performing a delete of {}@{}", display_author(self.our_id), - display_op_id(op), - op.sequence_num(), + display_op_id(_op), + _op.sequence_num(), ); return; } - if let Some(content) = op.content.as_ref() { + if let Some(content) = _op.content.as_ref() { println!( "{} Performing an insert of {}@{}: '{}' after {}", display_author(self.our_id), - display_op_id(op), - op.sequence_num(), + display_op_id(_op), + _op.sequence_num(), content.hash(), - display_op_id(op) + display_op_id(_op) ); } } diff --git a/server/Cargo.toml b/server/Cargo.toml index 304e2443..2357758f 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -47,6 +47,10 @@ indexmap = { version = "2.2.6", features = ["serde"] } [target.'cfg(unix)'.dependencies] libc = { workspace = true } +[lints.rust.unexpected_cfgs] +level = "warn" +check-cfg = ["cfg(feature, values(\"logging-base\"))"] + [dev-dependencies] tempfile = { workspace = true } mockito = "1" diff --git a/server/src/config.rs b/server/src/config.rs index 70f71ee3..228d61dc 100644 --- 
a/server/src/config.rs +++ b/server/src/config.rs @@ -47,6 +47,12 @@ pub struct ProjectConfig { /// of the container/host local time. Falls back to `chrono::Local` when absent. #[serde(default)] pub timezone: Option, + /// WebSocket URL of a remote huskies node to sync CRDT state with. + /// Example: `rendezvous = "ws://server:3001/crdt-sync"` + /// When set, this node connects to the remote and exchanges CRDT ops + /// so both machines see the same pipeline state in real-time. + #[serde(default)] + pub rendezvous: Option, } /// Configuration for the filesystem watcher's sweep behaviour. @@ -220,6 +226,7 @@ impl Default for ProjectConfig { base_branch: None, rate_limit_notifications: default_rate_limit_notifications(), timezone: None, + rendezvous: None, } } } @@ -295,6 +302,7 @@ impl ProjectConfig { base_branch: legacy.base_branch, rate_limit_notifications: legacy.rate_limit_notifications, timezone: legacy.timezone, + rendezvous: None, }; validate_agents(&config.agent)?; return Ok(config); @@ -322,6 +330,7 @@ impl ProjectConfig { base_branch: legacy.base_branch, rate_limit_notifications: legacy.rate_limit_notifications, timezone: legacy.timezone, + rendezvous: None, }; validate_agents(&config.agent)?; Ok(config) @@ -337,6 +346,7 @@ impl ProjectConfig { base_branch: legacy.base_branch, rate_limit_notifications: legacy.rate_limit_notifications, timezone: legacy.timezone, + rendezvous: None, }) } } diff --git a/server/src/crdt_state.rs b/server/src/crdt_state.rs index 765696d2..d3f780db 100644 --- a/server/src/crdt_state.rs +++ b/server/src/crdt_state.rs @@ -50,6 +50,28 @@ pub fn subscribe() -> Option> { static CRDT_EVENT_TX: OnceLock> = OnceLock::new(); +// ── Sync broadcast (outgoing ops to peers) ────────────────────────── + +static SYNC_TX: OnceLock> = OnceLock::new(); + +/// Subscribe to locally-created CRDT ops for sync replication. +/// +/// Each `SignedOp` broadcast here was created by *this* node and should be +/// forwarded to connected peers. 
Returns `None` before `init()`. +pub fn subscribe_ops() -> Option> { + SYNC_TX.get().map(|tx| tx.subscribe()) +} + +/// Return all persisted `SignedOp`s in causal order (oldest first). +/// +/// Used during initial sync handshake so a newly-connected peer can +/// reconstruct the full CRDT state. Returns `None` before `init()`. +pub fn all_ops_json() -> Option> { + ALL_OPS.get().map(|m| m.lock().unwrap().clone()) +} + +static ALL_OPS: OnceLock>> = OnceLock::new(); + // ── CRDT document types ────────────────────────────────────────────── #[add_crdt_fields] @@ -125,13 +147,16 @@ pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> { .fetch_all(&pool) .await?; + let mut all_ops_vec = Vec::with_capacity(rows.len()); for (op_json,) in &rows { if let Ok(signed_op) = serde_json::from_str::(op_json) { crdt.apply(signed_op); + all_ops_vec.push(op_json.clone()); } else { slog!("[crdt] Warning: failed to deserialize stored op"); } } + let _ = ALL_OPS.set(Mutex::new(all_ops_vec)); // Build the index from the reconstructed state. let index = rebuild_index(&crdt); @@ -189,6 +214,10 @@ pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> { let (event_tx, _) = broadcast::channel::(256); let _ = CRDT_EVENT_TX.set(event_tx); + // Initialise the sync broadcast channel for outgoing ops. + let (sync_tx, _) = broadcast::channel::(1024); + let _ = SYNC_TX.set(sync_tx); + Ok(()) } @@ -240,7 +269,18 @@ where let raw_op = op_fn(state); let signed = raw_op.sign(&state.keypair); state.crdt.apply(signed.clone()); - let _ = state.persist_tx.send(signed); + let _ = state.persist_tx.send(signed.clone()); + + // Track in ALL_OPS and broadcast to sync peers. + if let Ok(json) = serde_json::to_string(&signed) + && let Some(all) = ALL_OPS.get() + && let Ok(mut v) = all.lock() + { + v.push(json); + } + if let Some(tx) = SYNC_TX.get() { + let _ = tx.send(signed); + } } /// Write a pipeline item state through CRDT operations. 
@@ -356,6 +396,90 @@ fn emit_event(event: CrdtEvent) {
     }
 }
 
+// ── Remote op ingestion (from sync peers) ───────────────────────────
+
+/// Apply a `SignedOp` received from a remote peer.
+///
+/// The op is applied to the local CRDT, recorded in `ALL_OPS`, sent to the
+/// persistence channel, and any resulting stage transitions are broadcast
+/// as [`CrdtEvent`]s. Unlike `apply_and_persist`, this does **not**
+/// re-broadcast the op on the sync channel (to avoid infinite echo loops).
+///
+/// Returns `true` when the CRDT reports `OpState::Ok` or
+/// `OpState::MissingCausalDependencies`; in both cases the op is tracked
+/// and persisted. Returns `false` when the global state is uninitialised,
+/// its mutex is poisoned, or the CRDT reports any other state.
+///
+/// NOTE(review): rejecting duplicates relies on `crdt.apply` reporting some
+/// other state for an op it has already seen — confirm, because peers
+/// resend their whole op history on every connect.
+pub fn apply_remote_op(op: SignedOp) -> bool {
+    let Some(state_mutex) = CRDT_STATE.get() else {
+        return false;
+    };
+    let Ok(mut state) = state_mutex.lock() else {
+        return false;
+    };
+
+    // Snapshot stage state before applying so we can detect transitions.
+    let pre_stages: HashMap<String, String> = state
+        .index
+        .iter()
+        .filter_map(|(sid, &idx)| {
+            match state.crdt.doc.items[idx].stage.view() {
+                JsonValue::String(s) => Some((sid.clone(), s)),
+                _ => None,
+            }
+        })
+        .collect();
+
+    let result = state.crdt.apply(op.clone());
+    if result != bft_json_crdt::json_crdt::OpState::Ok
+        && result != bft_json_crdt::json_crdt::OpState::MissingCausalDependencies
+    {
+        return false;
+    }
+
+    // Track in ALL_OPS first: serialising only borrows the op, so it can be
+    // moved into the persistence channel afterwards without another clone.
+    if let Ok(json) = serde_json::to_string(&op)
+        && let Some(all) = ALL_OPS.get()
+        && let Ok(mut v) = all.lock()
+    {
+        v.push(json);
+    }
+
+    // Persist the op (fire-and-forget).
+    let _ = state.persist_tx.send(op);
+
+    // Rebuild index (new items may have been inserted).
+    state.index = rebuild_index(&state.crdt);
+
+    // Detect and broadcast stage transitions.
+    for (sid, &idx) in &state.index {
+        let new_stage = match state.crdt.doc.items[idx].stage.view() {
+            JsonValue::String(s) => s,
+            _ => continue,
+        };
+        let old_stage = pre_stages.get(sid).cloned();
+        let changed = old_stage.as_deref() != Some(&new_stage);
+        if changed {
+            let name = match state.crdt.doc.items[idx].name.view() {
+                JsonValue::String(s) if !s.is_empty() => Some(s),
+                _ => None,
+            };
+            emit_event(CrdtEvent {
+                story_id: sid.clone(),
+                from_stage: old_stage,
+                to_stage: new_stage,
+                name,
+            });
+        }
+    }
+
+    true
+}
+
 // ── Read path ────────────────────────────────────────────────────────
 
 /// Read the full pipeline state from the CRDT document.
@@ -844,4 +968,76 @@ mod tests {
         let result = check_archived_deps_crdt("nonexistent_story_archived");
         assert!(result.is_empty());
     }
+
+    // ── 478: WebSocket CRDT sync layer tests ────────────────────────────────
+
+    #[test]
+    fn apply_remote_op_returns_false_when_not_initialised() {
+        // Without the global CRDT state, apply_remote_op should return false.
+        let kp = make_keypair();
+        let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);
+        let item: JsonValue = serde_json::json!({
+            "story_id": "80_story_remote",
+            "stage": "1_backlog",
+            "name": "Remote",
+            "agent": "",
+            "retry_count": 0.0,
+            "blocked": false,
+            "depends_on": "",
+        })
+        .into();
+        let op = crdt.doc.items.insert(bft_json_crdt::op::ROOT_ID, item).sign(&kp);
+        // This uses the global state which may not be initialised in tests.
+        let _ = apply_remote_op(op);
+    }
+
+    #[test]
+    fn signed_op_survives_sync_serialization_roundtrip() {
+        // Verify that a SignedOp serialised to JSON and back produces
+        // the same op (critical for the sync wire protocol).
+        let kp = make_keypair();
+        let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);
+        let item: JsonValue = serde_json::json!({
+            "story_id": "90_story_wire",
+            "stage": "2_current",
+            "name": "Wire Test",
+            "agent": "coder",
+            "retry_count": 1.0,
+            "blocked": false,
+            "depends_on": "[10]",
+        })
+        .into();
+        let op = crdt.doc.items.insert(bft_json_crdt::op::ROOT_ID, item).sign(&kp);
+
+        let json1 = serde_json::to_string(&op).unwrap();
+        let roundtripped: SignedOp = serde_json::from_str(&json1).unwrap();
+        let json2 = serde_json::to_string(&roundtripped).unwrap();
+
+        assert_eq!(json1, json2);
+        assert_eq!(op.id(), roundtripped.id());
+        assert_eq!(op.inner.seq, roundtripped.inner.seq);
+        assert_eq!(op.author(), roundtripped.author());
+    }
+
+    #[test]
+    fn sync_broadcast_channel_round_trip() {
+        let (tx, mut rx) = broadcast::channel::<SignedOp>(16);
+        let kp = make_keypair();
+        let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);
+        let item: JsonValue = serde_json::json!({
+            "story_id": "95_story_sync_bcast",
+            "stage": "1_backlog",
+            "name": "",
+            "agent": "",
+            "retry_count": 0.0,
+            "blocked": false,
+            "depends_on": "",
+        })
+        .into();
+        let op = crdt.doc.items.insert(bft_json_crdt::op::ROOT_ID, item).sign(&kp);
+        tx.send(op.clone()).unwrap();
+
+        let received = rx.try_recv().unwrap();
+        assert_eq!(received.id(), op.id());
+    }
 }
diff --git a/server/src/crdt_sync.rs b/server/src/crdt_sync.rs
new file mode 100644
index 00000000..32a614ce
--- /dev/null
+++ b/server/src/crdt_sync.rs
@@ -0,0 +1,534 @@
+//! WebSocket-based CRDT sync layer for replicating pipeline state between
+//! huskies nodes.
+//!
+//! # Protocol
+//!
+//! The sync protocol sends one JSON object per WebSocket text frame.
+//! Each message is a JSON object with a `"type"` field:
+//!
+//! - `{"type":"bulk","ops":[...]}` — Initial state dump (array of serialised
+//!   `SignedOp` JSON strings). Sent by both sides immediately after connect.
+//! - `{"type":"op","op":"..."}` — A single new `SignedOp` (serialised JSON
+//!   string). Streamed in real-time as local ops are created.
+//!
+//! Both the server endpoint and the rendezvous client use the same protocol,
+//! making the connection fully symmetric.
+use bft_json_crdt::json_crdt::SignedOp;
+use futures::{SinkExt, StreamExt};
+use poem::handler;
+use poem::web::Data;
+use poem::web::websocket::{Message as WsMessage, WebSocket};
+use serde::{Deserialize, Serialize};
+use std::sync::Arc;
+
+use crate::crdt_state;
+use crate::http::context::AppContext;
+use crate::slog;
+
+// ── Wire protocol types ─────────────────────────────────────────────
+
+/// Messages exchanged between sync peers, serialised as JSON objects whose
+/// `"type"` tag selects the variant (`"bulk"` or `"op"`).
+#[derive(Serialize, Deserialize)]
+#[serde(tag = "type", rename_all = "snake_case")]
+enum SyncMessage {
+    /// Bulk state dump sent on connect.
+    Bulk { ops: Vec<String> },
+    /// A single new op.
+    Op { op: String },
+}
+
+// ── Server-side WebSocket handler ───────────────────────────────────
+
+/// WebSocket endpoint that exchanges CRDT ops with a connecting peer.
+///
+/// After the upgrade it subscribes to locally-created ops, sends the full op
+/// history as one `bulk` message, then forwards every new local op while
+/// applying each text frame received from the peer. The loop ends when the
+/// peer closes, a send or read fails, or the local op channel is closed.
+#[handler]
+pub async fn crdt_sync_handler(
+    ws: WebSocket,
+    _ctx: Data<&Arc<AppContext>>,
+) -> impl poem::IntoResponse {
+    ws.on_upgrade(|socket| async move {
+        let (mut sink, mut stream) = socket.split();
+
+        slog!("[crdt-sync] Peer connected");
+
+        // Subscribe to new local ops *before* snapshotting the history so an
+        // op created in between is not missed (the peer may receive it twice).
+        let Some(mut op_rx) = crdt_state::subscribe_ops() else {
+            return;
+        };
+
+        // Send bulk state dump.
+        if let Some(ops) = crdt_state::all_ops_json() {
+            let msg = SyncMessage::Bulk { ops };
+            if let Ok(json) = serde_json::to_string(&msg)
+                && sink.send(WsMessage::Text(json)).await.is_err()
+            {
+                return;
+            }
+        }
+
+        loop {
+            tokio::select! {
+                // Forward new local ops to the peer.
+                result = op_rx.recv() => {
+                    match result {
+                        Ok(signed_op) => {
+                            if let Ok(op_json) = serde_json::to_string(&signed_op) {
+                                let msg = SyncMessage::Op { op: op_json };
+                                if let Ok(text) = serde_json::to_string(&msg)
+                                    && sink.send(WsMessage::Text(text)).await.is_err()
+                                {
+                                    break;
+                                }
+                            }
+                        }
+                        Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
+                            slog!("[crdt-sync] Lagged {n} ops, peer may need re-sync");
+                        }
+                        Err(_) => break,
+                    }
+                }
+                // Receive ops from the peer.
+                frame = stream.next() => {
+                    match frame {
+                        Some(Ok(WsMessage::Text(text))) => {
+                            handle_incoming_message(&text);
+                        }
+                        Some(Ok(WsMessage::Close(_))) | None => break,
+                        // A read error ends the session, matching the
+                        // rendezvous client, instead of polling a dead stream.
+                        Some(Err(e)) => {
+                            slog!("[crdt-sync] Peer read error: {e}");
+                            break;
+                        }
+                        _ => {}
+                    }
+                }
+            }
+        }
+
+        slog!("[crdt-sync] Peer disconnected");
+    })
+}
+
+/// Process an incoming sync message from a peer.
+///
+/// A malformed envelope is logged and dropped; individual ops that fail to
+/// deserialise are skipped without logging.
+fn handle_incoming_message(text: &str) {
+    let msg: SyncMessage = match serde_json::from_str(text) {
+        Ok(m) => m,
+        Err(e) => {
+            slog!("[crdt-sync] Bad message from peer: {e}");
+            return;
+        }
+    };
+
+    match msg {
+        SyncMessage::Bulk { ops } => {
+            let mut applied = 0u64;
+            for op_json in &ops {
+                if let Ok(signed_op) = serde_json::from_str::<SignedOp>(op_json)
+                    && crdt_state::apply_remote_op(signed_op)
+                {
+                    applied += 1;
+                }
+            }
+            slog!(
+                "[crdt-sync] Bulk sync: received {} ops, applied {applied}",
+                ops.len()
+            );
+        }
+        SyncMessage::Op { op } => {
+            if let Ok(signed_op) = serde_json::from_str::<SignedOp>(&op) {
+                crdt_state::apply_remote_op(signed_op);
+            }
+        }
+    }
+}
+
+// ── Rendezvous client ───────────────────────────────────────────────
+
+/// Spawn a background task that connects to the configured rendezvous
+/// peer and exchanges CRDT ops bidirectionally.
+///
+/// The client reconnects with exponential backoff if the connection drops.
+pub fn spawn_rendezvous_client(url: String) {
+    tokio::spawn(async move {
+        let mut backoff_secs = 1u64;
+        loop {
+            slog!("[crdt-sync] Connecting to rendezvous peer: {url}");
+            match connect_and_sync(&url).await {
+                Ok(()) => {
+                    slog!("[crdt-sync] Rendezvous connection closed cleanly");
+                    backoff_secs = 1;
+                }
+                Err(e) => {
+                    slog!("[crdt-sync] Rendezvous connection error: {e}");
+                }
+            }
+            slog!("[crdt-sync] Reconnecting in {backoff_secs}s...");
+            tokio::time::sleep(std::time::Duration::from_secs(backoff_secs)).await;
+            backoff_secs = (backoff_secs * 2).min(30);
+        }
+    });
+}
+
+/// Connect to a remote sync endpoint and exchange ops until disconnect.
+async fn connect_and_sync(url: &str) -> Result<(), String> {
+    let (ws_stream, _) = tokio_tungstenite::connect_async(url)
+        .await
+        .map_err(|e| format!("WebSocket connect failed: {e}"))?;
+
+    let (mut sink, mut stream) = ws_stream.split();
+
+    slog!("[crdt-sync] Connected to rendezvous peer");
+
+    // Subscribe first so no local op falls between snapshot and subscription.
+    let Some(mut op_rx) = crdt_state::subscribe_ops() else {
+        return Err("CRDT not initialised".to_string());
+    };
+
+    // Send our bulk state.
+    if let Some(ops) = crdt_state::all_ops_json() {
+        let msg = SyncMessage::Bulk { ops };
+        if let Ok(json) = serde_json::to_string(&msg) {
+            use tokio_tungstenite::tungstenite::Message as TungsteniteMsg;
+            sink.send(TungsteniteMsg::Text(json.into()))
+                .await
+                .map_err(|e| format!("Send bulk failed: {e}"))?;
+        }
+    }
+
+    loop {
+        tokio::select! {
+            result = op_rx.recv() => {
+                match result {
+                    Ok(signed_op) => {
+                        if let Ok(op_json) = serde_json::to_string(&signed_op) {
+                            let msg = SyncMessage::Op { op: op_json };
+                            if let Ok(text) = serde_json::to_string(&msg) {
+                                use tokio_tungstenite::tungstenite::Message as TungsteniteMsg;
+                                if sink.send(TungsteniteMsg::Text(text.into())).await.is_err() {
+                                    break;
+                                }
+                            }
+                        }
+                    }
+                    Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
+                        slog!("[crdt-sync] Lagged {n} ops on rendezvous link");
+                    }
+                    Err(_) => break,
+                }
+            }
+            frame = stream.next() => {
+                match frame {
+                    Some(Ok(tokio_tungstenite::tungstenite::Message::Text(text))) => {
+                        handle_incoming_message(text.as_ref());
+                    }
+                    Some(Ok(tokio_tungstenite::tungstenite::Message::Close(_))) | None => break,
+                    Some(Err(e)) => {
+                        slog!("[crdt-sync] Rendezvous read error: {e}");
+                        break;
+                    }
+                    _ => {}
+                }
+            }
+        }
+    }
+
+    Ok(())
+}
+
+// ── Tests ────────────────────────────────────────────────────────────
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn sync_message_bulk_serialization_roundtrip() {
+        let msg = SyncMessage::Bulk {
+            ops: vec!["op1".to_string(), "op2".to_string()],
+        };
+        let json = serde_json::to_string(&msg).unwrap();
+        assert!(json.contains(r#""type":"bulk""#));
+        let deserialized: SyncMessage = serde_json::from_str(&json).unwrap();
+        match deserialized {
+            SyncMessage::Bulk { ops } => {
+                assert_eq!(ops.len(), 2);
+                assert_eq!(ops[0], "op1");
+                assert_eq!(ops[1], "op2");
+            }
+            _ => panic!("Expected Bulk"),
+        }
+    }
+
+    #[test]
+    fn sync_message_op_serialization_roundtrip() {
+        let msg = SyncMessage::Op {
+            op: r#"{"inner":{}}"#.to_string(),
+        };
+        let json = serde_json::to_string(&msg).unwrap();
+        assert!(json.contains(r#""type":"op""#));
+        let deserialized: SyncMessage = serde_json::from_str(&json).unwrap();
+        match deserialized {
+            SyncMessage::Op { op } => {
+                assert_eq!(op, r#"{"inner":{}}"#);
+            }
+            _ => panic!("Expected Op"),
+        }
+    }
+
+    #[test]
+    fn handle_incoming_message_bad_json_does_not_panic() {
+        handle_incoming_message("not valid json");
+    }
+
+    #[test]
+    fn handle_incoming_message_bulk_with_invalid_ops_does_not_panic() {
+        let msg = SyncMessage::Bulk {
+            ops: vec!["not a valid signed op".to_string()],
+        };
+        let json = serde_json::to_string(&msg).unwrap();
+        handle_incoming_message(&json);
+    }
+
+    #[test]
+    fn handle_incoming_message_op_with_invalid_op_does_not_panic() {
+        let msg = SyncMessage::Op {
+            op: "garbage".to_string(),
+        };
+        let json = serde_json::to_string(&msg).unwrap();
+        handle_incoming_message(&json);
+    }
+
+    #[test]
+    fn subscribe_ops_returns_none_before_init() {
+        // Before crdt_state::init() the channel doesn't exist yet.
+        // In test binaries it may or may not be initialised depending on
+        // other tests, so we just verify no panic.
+        let _ = crdt_state::subscribe_ops();
+    }
+
+    #[test]
+    fn all_ops_json_returns_none_before_init() {
+        let _ = crdt_state::all_ops_json();
+    }
+
+    #[test]
+    fn sync_message_bulk_empty_ops() {
+        let msg = SyncMessage::Bulk { ops: vec![] };
+        let json = serde_json::to_string(&msg).unwrap();
+        let deserialized: SyncMessage = serde_json::from_str(&json).unwrap();
+        match deserialized {
+            SyncMessage::Bulk { ops } => assert!(ops.is_empty()),
+            _ => panic!("Expected Bulk"),
+        }
+    }
+
+    /// Simulate the sync protocol by creating real SignedOps on two separate
+    /// CRDT instances and exchanging them through the SyncMessage wire format.
+    #[test]
+    fn two_node_sync_via_protocol_messages() {
+        use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, OpState};
+        use bft_json_crdt::keypair::make_keypair;
+        use bft_json_crdt::op::ROOT_ID;
+        use serde_json::json;
+
+        use crate::crdt_state::PipelineDoc;
+
+        // ── Node A: create an item ──
+        let kp_a = make_keypair();
+        let mut crdt_a = BaseCrdt::<PipelineDoc>::new(&kp_a);
+
+        let item: bft_json_crdt::json_crdt::JsonValue = json!({
+            "story_id": "100_story_sync_test",
+            "stage": "1_backlog",
+            "name": "Sync Test",
+            "agent": "",
+            "retry_count": 0.0,
+            "blocked": false,
+            "depends_on": "",
+        })
+        .into();
+        let op1 = crdt_a.doc.items.insert(ROOT_ID, item).sign(&kp_a);
+        assert_eq!(crdt_a.apply(op1.clone()), OpState::Ok);
+
+        // Serialise op1 into a SyncMessage::Op.
+        let op1_json = serde_json::to_string(&op1).unwrap();
+        let wire_msg = SyncMessage::Op { op: op1_json.clone() };
+        let wire_json = serde_json::to_string(&wire_msg).unwrap();
+
+        // ── Node B: receive the op through protocol ──
+        let kp_b = make_keypair();
+        let mut crdt_b = BaseCrdt::<PipelineDoc>::new(&kp_b);
+        assert!(crdt_b.doc.items.view().is_empty());
+
+        // Parse wire message and apply.
+        let parsed: SyncMessage = serde_json::from_str(&wire_json).unwrap();
+        match parsed {
+            SyncMessage::Op { op } => {
+                let signed_op: bft_json_crdt::json_crdt::SignedOp =
+                    serde_json::from_str(&op).unwrap();
+                let result = crdt_b.apply(signed_op);
+                assert_eq!(result, OpState::Ok);
+            }
+            _ => panic!("Expected Op"),
+        }
+
+        // Verify Node B has the same state as Node A.
+        assert_eq!(crdt_b.doc.items.view().len(), 1);
+        assert_eq!(
+            crdt_a.doc.items[0].story_id.view(),
+            crdt_b.doc.items[0].story_id.view()
+        );
+        assert_eq!(
+            crdt_a.doc.items[0].stage.view(),
+            crdt_b.doc.items[0].stage.view()
+        );
+
+        // ── Node A: update stage ──
+        let op2 = crdt_a.doc.items[0]
+            .stage
+            .set("2_current".to_string())
+            .sign(&kp_a);
+        crdt_a.apply(op2.clone());
+
+        // Send via bulk message.
+        let op2_json = serde_json::to_string(&op2).unwrap();
+        let bulk_msg = SyncMessage::Bulk {
+            ops: vec![op1_json, op2_json],
+        };
+        let bulk_wire = serde_json::to_string(&bulk_msg).unwrap();
+
+        // ── Node C: receives full state via bulk ──
+        let kp_c = make_keypair();
+        let mut crdt_c = BaseCrdt::<PipelineDoc>::new(&kp_c);
+
+        let parsed_bulk: SyncMessage = serde_json::from_str(&bulk_wire).unwrap();
+        match parsed_bulk {
+            SyncMessage::Bulk { ops } => {
+                for op_str in &ops {
+                    let signed: bft_json_crdt::json_crdt::SignedOp =
+                        serde_json::from_str(op_str).unwrap();
+                    crdt_c.apply(signed);
+                }
+            }
+            _ => panic!("Expected Bulk"),
+        }
+
+        // Node C should have the updated stage.
+        assert_eq!(crdt_c.doc.items.view().len(), 1);
+        assert_eq!(
+            crdt_c.doc.items[0].stage.view(),
+            bft_json_crdt::json_crdt::JsonValue::String("2_current".to_string())
+        );
+    }
+
+    /// Verify that a single node's ops (insert + update) can be replayed
+    /// on another node via bulk sync and produce the same final state.
+    /// This is the core property needed for partition healing: when a
+    /// disconnected node reconnects, it sends all its ops as a bulk
+    /// message and the receiver catches up.
+    #[test]
+    fn partition_heal_via_bulk_replay() {
+        use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, JsonValue as JV};
+        use bft_json_crdt::keypair::make_keypair;
+        use bft_json_crdt::op::ROOT_ID;
+        use serde_json::json;
+
+        use crate::crdt_state::PipelineDoc;
+
+        let kp = make_keypair();
+
+        // Node A creates an item and advances it.
+        let mut crdt_a = BaseCrdt::<PipelineDoc>::new(&kp);
+        let item: bft_json_crdt::json_crdt::JsonValue = json!({
+            "story_id": "200_story_heal",
+            "stage": "1_backlog",
+            "name": "Heal Test",
+            "agent": "",
+            "retry_count": 0.0,
+            "blocked": false,
+            "depends_on": "",
+        })
+        .into();
+
+        let op1 = crdt_a.doc.items.insert(ROOT_ID, item).sign(&kp);
+        crdt_a.apply(op1.clone());
+
+        let op2 = crdt_a.doc.items[0]
+            .stage
+            .set("2_current".to_string())
+            .sign(&kp);
+        crdt_a.apply(op2.clone());
+
+        let op3 = crdt_a.doc.items[0]
+            .stage
+            .set("3_qa".to_string())
+            .sign(&kp);
+        crdt_a.apply(op3.clone());
+
+        // Serialise all ops as a bulk message (simulates partition heal).
+        let ops_json: Vec<String> = [&op1, &op2, &op3]
+            .iter()
+            .map(|op| serde_json::to_string(op).unwrap())
+            .collect();
+        let bulk = SyncMessage::Bulk { ops: ops_json };
+        let wire = serde_json::to_string(&bulk).unwrap();
+
+        // Node B receives the bulk and reconstructs state.
+        let mut crdt_b = BaseCrdt::<PipelineDoc>::new(&kp);
+        let parsed: SyncMessage = serde_json::from_str(&wire).unwrap();
+        match parsed {
+            SyncMessage::Bulk { ops } => {
+                for op_str in &ops {
+                    let signed: bft_json_crdt::json_crdt::SignedOp =
+                        serde_json::from_str(op_str).unwrap();
+                    crdt_b.apply(signed);
+                }
+            }
+            _ => panic!("Expected Bulk"),
+        }
+
+        // Node B should match Node A exactly.
+ assert_eq!(crdt_b.doc.items.view().len(), 1); + assert_eq!( + crdt_b.doc.items[0].stage.view(), + JV::String("3_qa".to_string()) + ); + assert_eq!( + crdt_a.doc.items[0].stage.view(), + crdt_b.doc.items[0].stage.view() + ); + assert_eq!( + crdt_a.doc.items[0].name.view(), + crdt_b.doc.items[0].name.view() + ); + } + + #[test] + fn config_rendezvous_parsed_from_toml() { + let toml_str = r#" +rendezvous = "ws://remote:3001/crdt-sync" + +[[agent]] +name = "test" +"#; + let config: crate::config::ProjectConfig = toml::from_str(toml_str).unwrap(); + assert_eq!( + config.rendezvous.as_deref(), + Some("ws://remote:3001/crdt-sync") + ); + } + + #[test] + fn config_rendezvous_defaults_to_none() { + let config = crate::config::ProjectConfig::default(); + assert!(config.rendezvous.is_none()); + } +} diff --git a/server/src/http/mod.rs b/server/src/http/mod.rs index e95834f6..d3a84fe8 100644 --- a/server/src/http/mod.rs +++ b/server/src/http/mod.rs @@ -76,6 +76,7 @@ pub fn build_routes( .nest("/api", api_service) .nest("/docs", docs_service.swagger_ui()) .at("/ws", get(ws::ws_handler)) + .at("/crdt-sync", get(crate::crdt_sync::crdt_sync_handler)) .at( "/agents/:story_id/:agent_name/stream", get(agents_sse::agent_stream), diff --git a/server/src/main.rs b/server/src/main.rs index 386c7f5a..3d299c5f 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -7,6 +7,7 @@ mod agents; mod chat; mod config; pub mod crdt_state; +pub mod crdt_sync; mod db; mod http; mod io; @@ -312,6 +313,14 @@ async fn main() -> Result<(), std::io::Error> { // (CRDT state layer is initialised above alongside the legacy pipeline.db.) + // Start the CRDT sync rendezvous client if configured in project.toml. 
+ if let Some(ref root) = *app_state.project_root.lock().unwrap() + && let Ok(cfg) = config::ProjectConfig::load(root) + && let Some(rendezvous_url) = cfg.rendezvous + { + crdt_sync::spawn_rendezvous_client(rendezvous_url); + } + let workflow = Arc::new(std::sync::Mutex::new(WorkflowState::default())); // Event bus: broadcast channel for pipeline lifecycle events. diff --git a/server/src/worktree.rs b/server/src/worktree.rs index b878326e..e8082d21 100644 --- a/server/src/worktree.rs +++ b/server/src/worktree.rs @@ -528,6 +528,7 @@ mod tests { base_branch: None, rate_limit_notifications: true, timezone: None, + rendezvous: None, }; // Should complete without panic run_setup_commands(tmp.path(), &config).await; @@ -552,6 +553,7 @@ mod tests { base_branch: None, rate_limit_notifications: true, timezone: None, + rendezvous: None, }; // Should complete without panic run_setup_commands(tmp.path(), &config).await; @@ -576,6 +578,7 @@ mod tests { base_branch: None, rate_limit_notifications: true, timezone: None, + rendezvous: None, }; // Setup command failures are non-fatal — should not panic or propagate run_setup_commands(tmp.path(), &config).await; @@ -600,6 +603,7 @@ mod tests { base_branch: None, rate_limit_notifications: true, timezone: None, + rendezvous: None, }; // Teardown failures are best-effort — should not propagate assert!(run_teardown_commands(tmp.path(), &config).await.is_ok()); @@ -623,6 +627,7 @@ mod tests { base_branch: None, rate_limit_notifications: true, timezone: None, + rendezvous: None, }; let info = create_worktree(&project_root, "42_fresh_test", &config, 3001) .await @@ -653,6 +658,7 @@ mod tests { base_branch: None, rate_limit_notifications: true, timezone: None, + rendezvous: None, }; // First creation let _info1 = create_worktree(&project_root, "43_reuse_test", &config, 3001) @@ -724,6 +730,7 @@ mod tests { base_branch: None, rate_limit_notifications: true, timezone: None, + rendezvous: None, }; let result = 
remove_worktree_by_story_id(tmp.path(), "99_nonexistent", &config).await; @@ -753,6 +760,7 @@ mod tests { base_branch: None, rate_limit_notifications: true, timezone: None, + rendezvous: None, }; create_worktree(&project_root, "88_remove_by_id", &config, 3001) .await @@ -829,6 +837,7 @@ mod tests { base_branch: None, rate_limit_notifications: true, timezone: None, + rendezvous: None, }; // Even though setup commands fail, create_worktree must succeed // so the agent can start and fix the problem itself. @@ -861,6 +870,7 @@ mod tests { base_branch: None, rate_limit_notifications: true, timezone: None, + rendezvous: None, }; // First creation — no setup commands, should succeed create_worktree(&project_root, "173_reuse_fail", &empty_config, 3001) @@ -883,6 +893,7 @@ mod tests { base_branch: None, rate_limit_notifications: true, timezone: None, + rendezvous: None, }; // Second call — worktree exists, setup commands fail, must still succeed let result = create_worktree(&project_root, "173_reuse_fail", &failing_config, 3002).await; @@ -911,6 +922,7 @@ mod tests { base_branch: None, rate_limit_notifications: true, timezone: None, + rendezvous: None, }; let info = create_worktree(&project_root, "77_remove_async", &config, 3001) .await