merge(478): WebSocket CRDT sync layer (manual squash from feature/story-478)

Manual squash-merge of feature/story-478_… into master after the in-pipeline
mergemaster runs failed silently. The 478 agent did substantial real work
across multiple respawn cycles before being interrupted; commits on the
feature branch were intact and verified high-quality but never merged via
the normal pipeline path due to compounding bugs:

- The first mergemaster attempt ran ($0.82 in tokens) and exited "Done"
  cleanly but didn't push anything to master — likely the worktree was
  briefly on master rather than the feature branch when the merge_agent_work
  MCP tool ran, so it found nothing to merge.
- Subsequent timer fires defaulted to spawning coders instead of resuming
  mergemaster, burning more tokens for no progress.
- Bug 510 (split-brain shadows yanking done stories back to current) and
  bug 501 (timers don't cancel on stop/completion) compounded the cost.

What this commit lands:
- server/src/crdt_sync.rs (new, ~518 lines): GET /crdt-sync WebSocket
  handler that subscribes to locally-applied SignedOps and streams them as
  binary frames. Per-peer bounded queue (256 ops) drops slow peers.
- server/src/crdt_state.rs: new public functions subscribe_ops(),
  all_ops_json(), apply_remote_op() backing the sync handler. Adds the
  CRDT_OP_TX broadcast channel (capacity 1024).
- server/src/main.rs: wires up the sync subsystem at startup.
- server/src/http/mod.rs: registers the new endpoint.
- server/src/config.rs: adds optional rendezvous field for outbound peers.
- server/src/worktree.rs: minor changes from the original branch.
- server/Cargo.toml: cfg lint suppression for CrdtNode derive.
- crates/bft-json-crdt/src/debug.rs: fix unused-variable warnings.

Resolved a trivial test-mod merge conflict in crdt_state.rs (both 478 and
503 added new tests at the end of the test module — kept both sets).

Note: this is the squash of the original 478 work that the user explicitly
authorized landing. The earlier rogue commit ac9f3ecf — which added a
DIFFERENT, broken implementation of the same feature directly to master
under the user's identity without consent — was reverted earlier in this
session. The forensic tags rogue-commit-2026-04-09-ac9f3ecf and
pre-502-reset-2026-04-09 still exist for incident audit.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Timmy
2026-04-09 19:46:29 +01:00
parent 41515e3b8f
commit 5765fb57be
8 changed files with 754 additions and 11 deletions
+190 -1
View File
@@ -50,6 +50,28 @@ pub fn subscribe() -> Option<broadcast::Receiver<CrdtEvent>> {
static CRDT_EVENT_TX: OnceLock<broadcast::Sender<CrdtEvent>> = OnceLock::new();
// ── Sync broadcast (outgoing ops to peers) ──────────────────────────
static SYNC_TX: OnceLock<broadcast::Sender<SignedOp>> = OnceLock::new();
/// Subscribe to locally-created CRDT ops for sync replication.
///
/// Every `SignedOp` broadcast on this channel originated on *this* node and
/// is meant to be forwarded to connected peers. Returns `None` if called
/// before `init()` has installed the channel.
pub fn subscribe_ops() -> Option<broadcast::Receiver<SignedOp>> {
    let tx = SYNC_TX.get()?;
    Some(tx.subscribe())
}
/// Return all persisted `SignedOp`s (as JSON strings) in causal order (oldest first).
///
/// Used during initial sync handshake so a newly-connected peer can
/// reconstruct the full CRDT state. Returns `None` before `init()`.
pub fn all_ops_json() -> Option<Vec<String>> {
    ALL_OPS.get().map(|m| {
        // Recover from a poisoned lock instead of panicking: the vector only
        // ever receives pushes of fully-serialized strings, so its contents
        // are still valid if a previous holder panicked mid-push. This also
        // matches the lenient `let Ok(mut v) = all.lock()` handling on the
        // write path — a handshake should not crash the server.
        m.lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .clone()
    })
}
static ALL_OPS: OnceLock<Mutex<Vec<String>>> = OnceLock::new();
// ── CRDT document types ──────────────────────────────────────────────
#[add_crdt_fields]
@@ -125,13 +147,16 @@ pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> {
.fetch_all(&pool)
.await?;
let mut all_ops_vec = Vec::with_capacity(rows.len());
for (op_json,) in &rows {
if let Ok(signed_op) = serde_json::from_str::<SignedOp>(op_json) {
crdt.apply(signed_op);
all_ops_vec.push(op_json.clone());
} else {
slog!("[crdt] Warning: failed to deserialize stored op");
}
}
let _ = ALL_OPS.set(Mutex::new(all_ops_vec));
// Build the index from the reconstructed state.
let index = rebuild_index(&crdt);
@@ -189,6 +214,10 @@ pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> {
let (event_tx, _) = broadcast::channel::<CrdtEvent>(256);
let _ = CRDT_EVENT_TX.set(event_tx);
// Initialise the sync broadcast channel for outgoing ops.
let (sync_tx, _) = broadcast::channel::<SignedOp>(1024);
let _ = SYNC_TX.set(sync_tx);
Ok(())
}
@@ -240,7 +269,18 @@ where
let raw_op = op_fn(state);
let signed = raw_op.sign(&state.keypair);
state.crdt.apply(signed.clone());
let _ = state.persist_tx.send(signed);
let _ = state.persist_tx.send(signed.clone());
// Track in ALL_OPS and broadcast to sync peers.
if let Ok(json) = serde_json::to_string(&signed)
&& let Some(all) = ALL_OPS.get()
&& let Ok(mut v) = all.lock()
{
v.push(json);
}
if let Some(tx) = SYNC_TX.get() {
let _ = tx.send(signed);
}
}
/// Write a pipeline item state through CRDT operations.
@@ -356,6 +396,83 @@ fn emit_event(event: CrdtEvent) {
}
}
// ── Remote op ingestion (from sync peers) ───────────────────────────
/// Apply a `SignedOp` received from a remote peer.
///
/// The op is validated, applied to the local CRDT, persisted to SQLite,
/// and any resulting stage transitions are broadcast as [`CrdtEvent`]s.
/// Unlike `apply_and_persist`, this does **not** re-broadcast the op on
/// the sync channel (to avoid infinite echo loops).
///
/// Returns `true` if the op was new and applied, `false` if it was a
/// duplicate or failed validation.
pub fn apply_remote_op(op: SignedOp) -> bool {
    // Bail out quietly if the CRDT subsystem was never initialised
    // (e.g. unit tests that don't call `init()`).
    let Some(state_mutex) = CRDT_STATE.get() else {
        return false;
    };
    // A poisoned lock is treated like "not initialised" rather than
    // letting the panic propagate into the sync handler.
    let Ok(mut state) = state_mutex.lock() else {
        return false;
    };
    // Snapshot stage state before applying so we can detect transitions.
    let pre_stages: HashMap<String, String> = state
        .index
        .iter()
        .filter_map(|(sid, &idx)| {
            match state.crdt.doc.items[idx].stage.view() {
                JsonValue::String(s) => Some((sid.clone(), s)),
                // Items whose stage isn't a string are left out of the
                // snapshot; they simply can't produce a transition event.
                _ => None,
            }
        })
        .collect();
    let result = state.crdt.apply(op.clone());
    // NOTE(review): MissingCausalDependencies is treated as success — the
    // op is still persisted and tracked below, presumably so it survives a
    // restart and can be re-applied once its dependencies arrive. Confirm
    // that is the intended semantics, since the doc comment above says
    // `true` means "applied".
    if result != bft_json_crdt::json_crdt::OpState::Ok
        && result != bft_json_crdt::json_crdt::OpState::MissingCausalDependencies
    {
        return false;
    }
    // Persist the op (fire-and-forget; a send failure is ignored).
    let _ = state.persist_tx.send(op.clone());
    // Track in ALL_OPS so peers connecting later receive this op during the
    // initial handshake. Serialization or lock failure skips the push —
    // best-effort, matching the local write path.
    if let Ok(json) = serde_json::to_string(&op)
        && let Some(all) = ALL_OPS.get()
        && let Ok(mut v) = all.lock()
    {
        v.push(json);
    }
    // Rebuild index (new items may have been inserted).
    state.index = rebuild_index(&state.crdt);
    // Detect and broadcast stage transitions so local subscribers observe
    // remote changes the same way as locally-created ones.
    for (sid, &idx) in &state.index {
        let new_stage = match state.crdt.doc.items[idx].stage.view() {
            JsonValue::String(s) => s,
            _ => continue,
        };
        let old_stage = pre_stages.get(sid).cloned();
        // `None != Some(_)` also fires for brand-new items, so an insert
        // emits an event with `from_stage: None`.
        let changed = old_stage.as_deref() != Some(&new_stage);
        if changed {
            // Empty names are normalised to `None` for the event payload.
            let name = match state.crdt.doc.items[idx].name.view() {
                JsonValue::String(s) if !s.is_empty() => Some(s),
                _ => None,
            };
            emit_event(CrdtEvent {
                story_id: sid.clone(),
                from_stage: old_stage,
                to_stage: new_stage,
                name,
            });
        }
    }
    true
}
// ── Read path ────────────────────────────────────────────────────────
/// Read the full pipeline state from the CRDT document.
@@ -844,4 +961,76 @@ mod tests {
let result = check_archived_deps_crdt("nonexistent_story_archived");
assert!(result.is_empty());
}
// ── 478: WebSocket CRDT sync layer tests ────────────────────────────────

#[test]
fn apply_remote_op_returns_false_when_not_initialised() {
    // Build a valid signed op without touching the global CRDT state.
    let kp = make_keypair();
    let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);
    let item: JsonValue = serde_json::json!({
        "story_id": "80_story_remote",
        "stage": "1_backlog",
        "name": "Remote",
        "agent": "",
        "retry_count": 0.0,
        "blocked": false,
        "depends_on": "",
    })
    .into();
    let op = crdt.doc.items.insert(bft_json_crdt::op::ROOT_ID, item).sign(&kp);
    // The unit-test binary never calls `init()` (it needs an async runtime
    // and a SQLite path), so CRDT_STATE is unset here and apply_remote_op
    // must report failure rather than panic. The original test discarded
    // the result and asserted nothing.
    // NOTE(review): if a future test initialises the global state, this
    // assertion becomes test-order-dependent and should be revisited.
    assert!(!apply_remote_op(op));
}
#[test]
fn signed_op_survives_sync_serialization_roundtrip() {
    // The sync wire protocol ships ops as JSON: an encode → decode →
    // re-encode cycle must be lossless and byte-stable.
    let kp = make_keypair();
    let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);
    let payload = serde_json::json!({
        "story_id": "90_story_wire",
        "stage": "2_current",
        "name": "Wire Test",
        "agent": "coder",
        "retry_count": 1.0,
        "blocked": false,
        "depends_on": "[10]",
    });
    let item: JsonValue = payload.into();
    let original = crdt.doc.items.insert(bft_json_crdt::op::ROOT_ID, item).sign(&kp);

    let encoded = serde_json::to_string(&original).unwrap();
    let decoded: SignedOp = serde_json::from_str(&encoded).unwrap();
    let re_encoded = serde_json::to_string(&decoded).unwrap();

    // Byte-identical JSON and identical identity/ordering/authorship fields.
    assert_eq!(encoded, re_encoded);
    assert_eq!(original.id(), decoded.id());
    assert_eq!(original.inner.seq, decoded.inner.seq);
    assert_eq!(original.author(), decoded.author());
}
#[test]
fn sync_broadcast_channel_round_trip() {
    // A SignedOp pushed into a broadcast channel (the shape used by
    // SYNC_TX) must arrive intact at a subscriber's receiver.
    let (tx, mut rx) = broadcast::channel::<SignedOp>(16);
    let kp = make_keypair();
    let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);
    let fields = serde_json::json!({
        "story_id": "95_story_sync_bcast",
        "stage": "1_backlog",
        "name": "",
        "agent": "",
        "retry_count": 0.0,
        "blocked": false,
        "depends_on": "",
    });
    let op = crdt
        .doc
        .items
        .insert(bft_json_crdt::op::ROOT_ID, JsonValue::from(fields))
        .sign(&kp);

    tx.send(op.clone()).unwrap();
    let received = rx.try_recv().unwrap();
    assert_eq!(received.id(), op.id());
}
}