Revert "refactor: split top-5 largest files into mod.rs + tests.rs"

This reverts commit 65a3767a7a.
Author: dave
Date:   2026-04-26 20:15:58 +00:00
parent 65a3767a7a
commit 795b172bba
13 changed files with 8918 additions and 8904 deletions
@@ -1,854 +0,0 @@
use super::*;
use bft_json_crdt::json_crdt::OpState;
#[test]
fn crdt_doc_insert_and_view() {
let kp = make_keypair();
let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);
let item_json: JsonValue = json!({
"story_id": "10_story_test",
"stage": "2_current",
"name": "Test Story",
"agent": "coder-opus",
"retry_count": 0.0,
"blocked": false,
"depends_on": "",
"claimed_by": "",
"claimed_at": 0.0,
})
.into();
let op = crdt.doc.items.insert(ROOT_ID, item_json).sign(&kp);
assert_eq!(crdt.apply(op), OpState::Ok);
let view = crdt.doc.items.view();
assert_eq!(view.len(), 1);
let item = &crdt.doc.items[0];
assert_eq!(
item.story_id.view(),
JsonValue::String("10_story_test".to_string())
);
assert_eq!(
item.stage.view(),
JsonValue::String("2_current".to_string())
);
}
#[test]
fn crdt_doc_update_stage() {
let kp = make_keypair();
let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);
let item_json: JsonValue = json!({
"story_id": "20_story_move",
"stage": "1_backlog",
"name": "Move Me",
"agent": "",
"retry_count": 0.0,
"blocked": false,
"depends_on": "",
"claimed_by": "",
"claimed_at": 0.0,
})
.into();
let insert_op = crdt.doc.items.insert(ROOT_ID, item_json).sign(&kp);
crdt.apply(insert_op);
// Update stage
let stage_op = crdt.doc.items[0]
.stage
.set("2_current".to_string())
.sign(&kp);
crdt.apply(stage_op);
assert_eq!(
crdt.doc.items[0].stage.view(),
JsonValue::String("2_current".to_string())
);
}
#[test]
fn crdt_ops_replay_reconstructs_state() {
let kp = make_keypair();
let mut crdt1 = BaseCrdt::<PipelineDoc>::new(&kp);
// Build state with a series of ops.
let item_json: JsonValue = json!({
"story_id": "30_story_replay",
"stage": "1_backlog",
"name": "Replay Test",
"agent": "",
"retry_count": 0.0,
"blocked": false,
"depends_on": "",
"claimed_by": "",
"claimed_at": 0.0,
})
.into();
let op1 = crdt1.doc.items.insert(ROOT_ID, item_json).sign(&kp);
crdt1.apply(op1.clone());
let op2 = crdt1.doc.items[0]
.stage
.set("2_current".to_string())
.sign(&kp);
crdt1.apply(op2.clone());
let op3 = crdt1.doc.items[0]
.name
.set("Updated Name".to_string())
.sign(&kp);
crdt1.apply(op3.clone());
// Replay ops on a fresh CRDT.
let mut crdt2 = BaseCrdt::<PipelineDoc>::new(&kp);
crdt2.apply(op1);
crdt2.apply(op2);
crdt2.apply(op3);
assert_eq!(
crdt1.doc.items[0].stage.view(),
crdt2.doc.items[0].stage.view()
);
assert_eq!(
crdt1.doc.items[0].name.view(),
crdt2.doc.items[0].name.view()
);
}
#[test]
fn extract_item_view_parses_crdt_item() {
let kp = make_keypair();
let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);
let item_json: JsonValue = json!({
"story_id": "40_story_view",
"stage": "3_qa",
"name": "View Test",
"agent": "coder-1",
"retry_count": 2.0,
"blocked": true,
"depends_on": "[10,20]",
"claimed_by": "",
"claimed_at": 0.0,
})
.into();
let op = crdt.doc.items.insert(ROOT_ID, item_json).sign(&kp);
crdt.apply(op);
let view = extract_item_view(&crdt.doc.items[0]).unwrap();
assert_eq!(view.story_id, "40_story_view");
assert_eq!(view.stage, "3_qa");
assert_eq!(view.name.as_deref(), Some("View Test"));
assert_eq!(view.agent.as_deref(), Some("coder-1"));
assert_eq!(view.retry_count, Some(2));
assert_eq!(view.blocked, Some(true));
assert_eq!(view.depends_on, Some(vec![10, 20]));
}
#[test]
fn rebuild_index_maps_story_ids() {
let kp = make_keypair();
let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);
for (sid, stage) in &[("10_story_a", "1_backlog"), ("20_story_b", "2_current")] {
let item: JsonValue = json!({
"story_id": sid,
"stage": stage,
"name": "",
"agent": "",
"retry_count": 0.0,
"blocked": false,
"depends_on": "",
"claimed_by": "",
"claimed_at": 0.0,
})
.into();
let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp);
crdt.apply(op);
}
let index = rebuild_index(&crdt);
assert_eq!(index.len(), 2);
assert!(index.contains_key("10_story_a"));
assert!(index.contains_key("20_story_b"));
}
#[tokio::test]
async fn init_and_write_read_roundtrip() {
let tmp = tempfile::tempdir().unwrap();
let db_path = tmp.path().join("crdt_test.db");
// Init directly (not via the global singleton, for test isolation).
let options = SqliteConnectOptions::new()
.filename(&db_path)
.create_if_missing(true);
let pool = SqlitePool::connect_with(options).await.unwrap();
sqlx::migrate!("./migrations").run(&pool).await.unwrap();
let keypair = make_keypair();
let mut crdt = BaseCrdt::<PipelineDoc>::new(&keypair);
    // Insert an item the same way write_item does.
let item_json: JsonValue = json!({
"story_id": "50_story_roundtrip",
"stage": "1_backlog",
"name": "Roundtrip",
"agent": "",
"retry_count": 0.0,
"blocked": false,
"depends_on": "",
"claimed_by": "",
"claimed_at": 0.0,
})
.into();
let insert_op = crdt.doc.items.insert(ROOT_ID, item_json).sign(&keypair);
crdt.apply(insert_op.clone());
// Persist the op.
let op_json = serde_json::to_string(&insert_op).unwrap();
let op_id = hex::encode(&insert_op.id());
let now = chrono::Utc::now().to_rfc3339();
sqlx::query("INSERT INTO crdt_ops (op_id, seq, op_json, created_at) VALUES (?1, ?2, ?3, ?4)")
.bind(&op_id)
.bind(insert_op.inner.seq as i64)
.bind(&op_json)
.bind(&now)
.execute(&pool)
.await
.unwrap();
// Reconstruct from DB.
let rows: Vec<(String,)> = sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY rowid ASC")
.fetch_all(&pool)
.await
.unwrap();
let mut crdt2 = BaseCrdt::<PipelineDoc>::new(&keypair);
for (json_str,) in &rows {
let op: SignedOp = serde_json::from_str(json_str).unwrap();
crdt2.apply(op);
}
let view = extract_item_view(&crdt2.doc.items[0]).unwrap();
assert_eq!(view.story_id, "50_story_roundtrip");
assert_eq!(view.stage, "1_backlog");
assert_eq!(view.name.as_deref(), Some("Roundtrip"));
}
#[test]
fn signed_op_serialization_roundtrip() {
let kp = make_keypair();
let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);
let item: JsonValue = json!({
"story_id": "60_story_serde",
"stage": "1_backlog",
"name": "Serde Test",
"agent": "",
"retry_count": 0.0,
"blocked": false,
"depends_on": "",
"claimed_by": "",
"claimed_at": 0.0,
})
.into();
let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp);
let json_str = serde_json::to_string(&op).unwrap();
let deserialized: SignedOp = serde_json::from_str(&json_str).unwrap();
assert_eq!(op.id(), deserialized.id());
assert_eq!(op.inner.seq, deserialized.inner.seq);
}
// ── CrdtEvent tests ─────────────────────────────────────────────────
#[test]
fn crdt_event_has_expected_fields() {
let evt = CrdtEvent {
story_id: "42_story_foo".to_string(),
from_stage: Some("1_backlog".to_string()),
to_stage: "2_current".to_string(),
name: Some("Foo Feature".to_string()),
};
assert_eq!(evt.story_id, "42_story_foo");
assert_eq!(evt.from_stage.as_deref(), Some("1_backlog"));
assert_eq!(evt.to_stage, "2_current");
assert_eq!(evt.name.as_deref(), Some("Foo Feature"));
}
#[test]
fn crdt_event_clone_preserves_data() {
let evt = CrdtEvent {
story_id: "10_story_bar".to_string(),
from_stage: None,
to_stage: "1_backlog".to_string(),
name: None,
};
let cloned = evt.clone();
assert_eq!(cloned.story_id, "10_story_bar");
assert!(cloned.from_stage.is_none());
assert!(cloned.name.is_none());
}
#[test]
fn emit_event_is_noop_when_channel_not_initialised() {
// Before CRDT_EVENT_TX is set, emit_event should not panic.
// This test verifies the guard clause works. In test binaries the
// OnceLock may already be set by another test, so we just verify
// the function doesn't panic regardless.
emit_event(CrdtEvent {
story_id: "99_story_noop".to_string(),
from_stage: None,
to_stage: "1_backlog".to_string(),
name: None,
});
}
#[test]
fn crdt_event_broadcast_channel_round_trip() {
let (tx, mut rx) = broadcast::channel::<CrdtEvent>(16);
let evt = CrdtEvent {
story_id: "70_story_broadcast".to_string(),
from_stage: Some("1_backlog".to_string()),
to_stage: "2_current".to_string(),
name: Some("Broadcast Test".to_string()),
};
tx.send(evt).unwrap();
let received = rx.try_recv().unwrap();
assert_eq!(received.story_id, "70_story_broadcast");
assert_eq!(received.from_stage.as_deref(), Some("1_backlog"));
assert_eq!(received.to_stage, "2_current");
assert_eq!(received.name.as_deref(), Some("Broadcast Test"));
}
#[test]
fn dep_is_done_crdt_returns_false_when_no_crdt_state() {
// When the global CRDT state is not initialised (or in a test environment),
// dep_is_done_crdt should return false rather than panicking.
// Note: in the test binary the global may or may not be initialised,
// but the function should never panic either way.
let _ = dep_is_done_crdt(9999);
}
#[test]
fn check_unmet_deps_crdt_returns_empty_when_item_not_found() {
// Non-existent story should return empty deps.
let result = check_unmet_deps_crdt("nonexistent_story");
assert!(result.is_empty());
}
// ── Bug 503: archived-dep visibility ─────────────────────────────────────
#[test]
fn dep_is_archived_crdt_returns_false_when_no_crdt_state() {
// When the global CRDT state is not initialised, must not panic.
let _ = dep_is_archived_crdt(9998);
}
#[test]
fn check_archived_deps_crdt_returns_empty_when_item_not_found() {
// Non-existent story should return empty archived deps.
let result = check_archived_deps_crdt("nonexistent_story_archived");
assert!(result.is_empty());
}
// ── 478: WebSocket CRDT sync layer tests ────────────────────────────────
#[test]
fn apply_remote_op_returns_false_when_not_initialised() {
// Without the global CRDT state, apply_remote_op should return false.
let kp = make_keypair();
let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);
let item: JsonValue = serde_json::json!({
"story_id": "80_story_remote",
"stage": "1_backlog",
"name": "Remote",
"agent": "",
"retry_count": 0.0,
"blocked": false,
"depends_on": "",
"claimed_by": "",
"claimed_at": 0.0,
})
.into();
let op = crdt
.doc
.items
.insert(bft_json_crdt::op::ROOT_ID, item)
.sign(&kp);
// This uses the global state which may not be initialised in tests.
let _ = apply_remote_op(op);
}
#[test]
fn signed_op_survives_sync_serialization_roundtrip() {
// Verify that a SignedOp serialised to JSON and back produces
// the same op (critical for the sync wire protocol).
let kp = make_keypair();
let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);
let item: JsonValue = serde_json::json!({
"story_id": "90_story_wire",
"stage": "2_current",
"name": "Wire Test",
"agent": "coder",
"retry_count": 1.0,
"blocked": false,
"depends_on": "[10]",
"claimed_by": "",
"claimed_at": 0.0,
})
.into();
let op = crdt
.doc
.items
.insert(bft_json_crdt::op::ROOT_ID, item)
.sign(&kp);
let json1 = serde_json::to_string(&op).unwrap();
let roundtripped: SignedOp = serde_json::from_str(&json1).unwrap();
let json2 = serde_json::to_string(&roundtripped).unwrap();
assert_eq!(json1, json2);
assert_eq!(op.id(), roundtripped.id());
assert_eq!(op.inner.seq, roundtripped.inner.seq);
assert_eq!(op.author(), roundtripped.author());
}
#[test]
fn sync_broadcast_channel_round_trip() {
let (tx, mut rx) = broadcast::channel::<SignedOp>(16);
let kp = make_keypair();
let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);
let item: JsonValue = serde_json::json!({
"story_id": "95_story_sync_bcast",
"stage": "1_backlog",
"name": "",
"agent": "",
"retry_count": 0.0,
"blocked": false,
"depends_on": "",
"claimed_by": "",
"claimed_at": 0.0,
})
.into();
let op = crdt
.doc
.items
.insert(bft_json_crdt::op::ROOT_ID, item)
.sign(&kp);
tx.send(op.clone()).unwrap();
let received = rx.try_recv().unwrap();
assert_eq!(received.id(), op.id());
}
// ── Bug 511: CRDT lamport clock resets on restart ────────────────────────
//
// Root cause: Op::sign() always produces SignedOp with depends_on = vec![],
// so the causal dependency queue never engages during replay. Field update
// ops (seq=1,2,3 from each field's LwwRegisterCrdt counter) are replayed
// before list insert ops (seq=N from the items ListCrdt counter) when
// ordered by `seq ASC`. They fail ErrPathMismatch silently, their our_seq
// is never updated, and the next field write re-uses seq=1.
//
// Fix: replay by `rowid ASC` (SQLite insertion order) instead of `seq ASC`.
// Rowid preserves the causal order ops were originally applied in, so field
// updates always come after the item insert they reference.
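//
// A minimal sketch of the loader-side change this implies (hypothetical
// `replay_ops` helper; the real production loader may be shaped differently).
// Only the ORDER BY clause changes:
//
//     async fn replay_ops(pool: &SqlitePool, crdt: &mut BaseCrdt<PipelineDoc>) {
//         // Buggy ordering was "ORDER BY seq ASC"; rowid preserves the
//         // causal insertion order instead.
//         let rows: Vec<(String,)> =
//             sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY rowid ASC")
//                 .fetch_all(pool)
//                 .await
//                 .unwrap();
//         for (json_str,) in &rows {
//             let op: SignedOp = serde_json::from_str(json_str).unwrap();
//             crdt.apply(op);
//         }
//     }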
#[tokio::test]
async fn bug_511_rowid_replay_preserves_field_update_after_list_insert() {
let tmp = tempfile::tempdir().unwrap();
let db_path = tmp.path().join("bug511.db");
let options = SqliteConnectOptions::new()
.filename(&db_path)
.create_if_missing(true);
let pool = SqlitePool::connect_with(options).await.unwrap();
sqlx::migrate!("./migrations").run(&pool).await.unwrap();
let kp = make_keypair();
let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);
// Insert 5 dummy items to advance items.our_seq to 5.
for i in 0..5u32 {
let sid = format!("{}_story_warmup", i);
let item: JsonValue = json!({
"story_id": sid,
"stage": "1_backlog",
"name": "",
"agent": "",
"retry_count": 0.0,
"blocked": false,
"depends_on": "",
"claimed_by": "",
"claimed_at": 0.0,
})
.into();
let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp);
crdt.apply(op.clone());
// We don't persist these to the DB — they are pre-history.
}
// Now insert the real item. items.our_seq was 5, so this op gets seq=6.
let target_item: JsonValue = json!({
"story_id": "511_story_target",
"stage": "1_backlog",
"name": "Bug 511 target",
"agent": "",
"retry_count": 0.0,
"blocked": false,
"depends_on": "",
"claimed_by": "",
"claimed_at": 0.0,
})
.into();
let insert_op = crdt.doc.items.insert(ROOT_ID, target_item).sign(&kp);
crdt.apply(insert_op.clone());
// insert_op.inner.seq == 6
// Now update the stage. The stage LwwRegisterCrdt for this item starts
// at our_seq=0, so this field op gets seq=1. Crucially: seq=1 < seq=6.
let idx = rebuild_index(&crdt)["511_story_target"];
let stage_op = crdt.doc.items[idx]
.stage
.set("2_current".to_string())
.sign(&kp);
crdt.apply(stage_op.clone());
// stage_op.inner.seq == 1
// Persist BOTH ops in causal order (insert first, update second).
// This means insert_op gets rowid < stage_op rowid.
let now = chrono::Utc::now().to_rfc3339();
for op in [&insert_op, &stage_op] {
let op_json = serde_json::to_string(op).unwrap();
let op_id = hex::encode(&op.id());
sqlx::query(
"INSERT INTO crdt_ops (op_id, seq, op_json, created_at) VALUES (?1, ?2, ?3, ?4)",
)
.bind(&op_id)
.bind(op.inner.seq as i64)
.bind(&op_json)
.bind(&now)
.execute(&pool)
.await
.unwrap();
}
// Replay by rowid ASC (the fix). The insert must come before the field
// update regardless of their field-level seq values.
let rows: Vec<(String,)> = sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY rowid ASC")
.fetch_all(&pool)
.await
.unwrap();
let mut crdt2 = BaseCrdt::<PipelineDoc>::new(&kp);
for (json_str,) in &rows {
let op: SignedOp = serde_json::from_str(json_str).unwrap();
crdt2.apply(op);
}
// The item must be in the CRDT and must reflect the stage update.
let index2 = rebuild_index(&crdt2);
assert!(
index2.contains_key("511_story_target"),
"item not found after rowid-order replay"
);
let idx2 = index2["511_story_target"];
let view = extract_item_view(&crdt2.doc.items[idx2]).unwrap();
assert_eq!(
view.stage, "2_current",
"stage field update lost during replay (bug 511 regression)"
);
// Confirm the bug is reproducible by replaying seq ASC instead.
// With seq ASC the stage_op (seq=1) arrives before insert_op (seq=6),
// fails ErrPathMismatch, and the item ends up at "1_backlog".
let rows_wrong_order: Vec<(String,)> =
sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY seq ASC")
.fetch_all(&pool)
.await
.unwrap();
let mut crdt3 = BaseCrdt::<PipelineDoc>::new(&kp);
for (json_str,) in &rows_wrong_order {
let op: SignedOp = serde_json::from_str(json_str).unwrap();
crdt3.apply(op);
}
let index3 = rebuild_index(&crdt3);
// With seq ASC replay, the item is created (insert_op eventually runs)
// but the stage update is lost (it ran before the item existed).
if let Some(idx3) = index3.get("511_story_target") {
let view3 = extract_item_view(&crdt3.doc.items[*idx3]).unwrap();
// The bug: stage is still "1_backlog" because the update was dropped.
assert_eq!(
view3.stage, "1_backlog",
"expected seq-ASC replay to exhibit the bug (update lost)"
);
}
}
// ── Story 518: persist_tx send failure logging ───────────────────────────
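//
// Shape of the behaviour under test, inferred from this test alone rather
// than copied from the production code: apply_and_persist builds an op via
// the closure, signs and applies it to the in-memory doc, then forwards it
// over persist_tx to the persistence task. If that send fails (receiver
// dropped), the in-memory doc is already ahead of what will ever reach the
// DB, so an ERROR entry noting the divergence is pushed to the global log
// buffer instead of panicking.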
#[test]
fn persist_tx_send_failure_logs_error() {
let kp = make_keypair();
let crdt = BaseCrdt::<PipelineDoc>::new(&kp);
let (persist_tx, persist_rx) = mpsc::unbounded_channel::<SignedOp>();
let mut state = CrdtState {
crdt,
keypair: kp,
index: HashMap::new(),
node_index: HashMap::new(),
persist_tx,
};
// Drop the receiver so that the next send fails immediately.
drop(persist_rx);
let item_json: JsonValue = json!({
"story_id": "518_story_persist_fail",
"stage": "1_backlog",
"name": "Persist Fail Test",
"agent": "",
"retry_count": 0.0,
"blocked": false,
"depends_on": "",
"claimed_by": "",
"claimed_at": 0.0,
})
.into();
let before_errors = crate::log_buffer::global()
.get_recent_entries(1000, None, Some(&crate::log_buffer::LogLevel::Error))
.len();
apply_and_persist(&mut state, |s| s.crdt.doc.items.insert(ROOT_ID, item_json));
let error_entries = crate::log_buffer::global().get_recent_entries(
1000,
None,
Some(&crate::log_buffer::LogLevel::Error),
);
assert!(
error_entries.len() > before_errors,
"expected an ERROR log entry when persist_tx send fails, but none was added"
);
let last_error = &error_entries[error_entries.len() - 1];
assert!(
last_error.message.contains("persist"),
"error message should mention persist: {}",
last_error.message
);
assert!(
last_error.message.contains("ahead") || last_error.message.contains("diverged"),
"error message should note in-memory/persisted divergence: {}",
last_error.message
);
}
// ── Story 631: vector clock delta sync tests ────────────────────────
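//
// Protocol shape exercised by the helpers and tests below (helper names are
// local to the tests, not the production API): a reconnecting peer sends its
// vector clock, a map from author-pubkey-hex to the number of ops it has
// seen from that author. The other side walks its op journal in insertion
// order, counts ops per author, and sends back only those ops whose running
// count exceeds the peer's clock entry for that author. For example, with a
// peer clock of {A: 100} and a journal of 150 ops authored by A, ops 101
// through 150 are sent; an empty clock falls back to a full sync.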
/// Helper: create N signed insert ops on a CRDT and return them with their JSON.
fn make_ops(
kp: &Ed25519KeyPair,
crdt: &mut BaseCrdt<PipelineDoc>,
count: usize,
prefix: &str,
) -> Vec<(SignedOp, String)> {
let mut ops = Vec::new();
for i in 0..count {
let item: JsonValue = json!({
"story_id": format!("{prefix}_{i}"),
"stage": "1_backlog",
"name": format!("Item {i}"),
"agent": "",
"retry_count": 0.0,
"blocked": false,
"depends_on": "",
"claimed_by": "",
"claimed_at": 0.0,
})
.into();
let op = crdt.doc.items.insert(ROOT_ID, item).sign(kp);
crdt.apply(op.clone());
let json = serde_json::to_string(&op).unwrap();
ops.push((op, json));
}
ops
}
/// Build a vector clock from a list of (SignedOp, json) pairs.
fn build_clock(ops: &[(SignedOp, String)]) -> VectorClock {
let mut clock = VectorClock::new();
for (op, _) in ops {
let author = hex::encode(&op.author());
*clock.entry(author).or_insert(0) += 1;
}
clock
}
/// Compute ops_since against a local journal and peer clock.
///
/// Mirrors the production `ops_since` logic but operates on a local Vec
/// instead of the global `ALL_OPS` static.
fn local_ops_since(all_ops: &[(SignedOp, String)], peer_clock: &VectorClock) -> Vec<String> {
let mut author_counts: HashMap<String, u64> = HashMap::new();
let mut result = Vec::new();
for (op, json) in all_ops {
let author = hex::encode(&op.author());
let count = author_counts.entry(author.clone()).or_insert(0);
*count += 1;
let peer_has = peer_clock.get(&author).copied().unwrap_or(0);
if *count > peer_has {
result.push(json.clone());
}
}
result
}
/// Integration test (low-bandwidth sync): two nodes, A applies 100 ops,
/// B reconnects with a current clock — B receives 0 ops on the bulk phase.
#[test]
fn delta_sync_low_bandwidth_fully_caught_up() {
let kp_a = make_keypair();
let mut crdt_a = BaseCrdt::<PipelineDoc>::new(&kp_a);
let ops_a = make_ops(&kp_a, &mut crdt_a, 100, "631_low");
// B has already seen all 100 ops (its clock matches A's journal).
let clock_b = build_clock(&ops_a);
// Delta should be empty.
let delta = local_ops_since(&ops_a, &clock_b);
assert_eq!(
delta.len(),
0,
"caught-up peer should receive 0 ops, got {}",
delta.len()
);
}
/// Integration test (mid-stream): A applies 100 ops, B disconnects,
/// A applies 50 more ops, B reconnects — B receives exactly the 50 missed ops.
#[test]
fn delta_sync_mid_stream_partial_catch_up() {
let kp_a = make_keypair();
let mut crdt_a = BaseCrdt::<PipelineDoc>::new(&kp_a);
// Phase 1: 100 ops that B has seen.
let ops_phase1 = make_ops(&kp_a, &mut crdt_a, 100, "631_mid1");
let clock_b = build_clock(&ops_phase1);
// Phase 2: 50 more ops that B missed.
let ops_phase2 = make_ops(&kp_a, &mut crdt_a, 50, "631_mid2");
// A's full journal is phase1 + phase2.
let mut all_ops_a: Vec<(SignedOp, String)> = ops_phase1;
all_ops_a.extend(ops_phase2);
let delta = local_ops_since(&all_ops_a, &clock_b);
assert_eq!(
delta.len(),
50,
"peer should receive exactly 50 missed ops, got {}",
delta.len()
);
}
/// Integration test (new node): C connects with empty clock,
/// receives all 150 ops — verifies fallback behaviour.
#[test]
fn delta_sync_new_node_receives_all_ops() {
let kp_a = make_keypair();
let mut crdt_a = BaseCrdt::<PipelineDoc>::new(&kp_a);
let ops_phase1 = make_ops(&kp_a, &mut crdt_a, 100, "631_new1");
let ops_phase2 = make_ops(&kp_a, &mut crdt_a, 50, "631_new2");
let mut all_ops_a: Vec<(SignedOp, String)> = ops_phase1;
all_ops_a.extend(ops_phase2);
// Empty clock = new node.
let empty_clock = VectorClock::new();
let delta = local_ops_since(&all_ops_a, &empty_clock);
assert_eq!(
delta.len(),
150,
"new node should receive all 150 ops, got {}",
delta.len()
);
}
/// Multi-author delta sync: ops from two different nodes, peer has seen
/// all of one author but none of the other.
#[test]
fn delta_sync_multi_author() {
use fastcrypto::traits::KeyPair;
let kp_a = make_keypair();
let kp_b = make_keypair();
let mut crdt_a = BaseCrdt::<PipelineDoc>::new(&kp_a);
let mut crdt_b = BaseCrdt::<PipelineDoc>::new(&kp_b);
let ops_a = make_ops(&kp_a, &mut crdt_a, 30, "631_ma_a");
let ops_b = make_ops(&kp_b, &mut crdt_b, 20, "631_ma_b");
// Combined journal on a hypothetical server.
let mut all_ops: Vec<(SignedOp, String)> = ops_a.clone();
all_ops.extend(ops_b);
// Peer has seen all of A's ops but none of B's.
let mut peer_clock = VectorClock::new();
let author_a_hex = hex::encode(&kp_a.public().0.to_bytes());
peer_clock.insert(author_a_hex, 30);
let delta = local_ops_since(&all_ops, &peer_clock);
assert_eq!(
delta.len(),
20,
"peer should receive 20 ops from author B, got {}",
delta.len()
);
}
/// Vector clock construction from ops.
#[test]
fn build_vector_clock_from_ops() {
use fastcrypto::traits::KeyPair;
let kp = make_keypair();
let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);
let ops = make_ops(&kp, &mut crdt, 10, "631_vc");
let clock = build_clock(&ops);
let author_hex = hex::encode(&kp.public().0.to_bytes());
assert_eq!(clock.len(), 1, "single author should produce 1 clock entry");
assert_eq!(clock[&author_hex], 10, "clock should show 10 ops");
}
/// Wire format: clock message serialization round-trip.
#[test]
fn clock_message_serialization_roundtrip() {
let mut clock = VectorClock::new();
clock.insert("aabbcc".to_string(), 42);
clock.insert("ddeeff".to_string(), 7);
let json = serde_json::to_value(&clock).unwrap();
assert!(json.is_object());
let deserialized: VectorClock = serde_json::from_value(json).unwrap();
assert_eq!(deserialized["aabbcc"], 42);
assert_eq!(deserialized["ddeeff"], 7);
}