Revert "fix(crdt_snapshot): serialise tests that share global SNAPSHOT_STATE / ALL_OPS / VECTOR_CLOCK (bug 669)"
This reverts commit 8e608feec1.
This commit is contained in:
@@ -439,12 +439,6 @@ mod tests {
|
|||||||
|
|
||||||
use crate::crdt_state::PipelineDoc;
|
use crate::crdt_state::PipelineDoc;
|
||||||
|
|
||||||
/// Serialises tests that mutate the global SNAPSHOT_STATE / ALL_OPS / VECTOR_CLOCK
|
|
||||||
/// statics. These statics are shared across test threads (only the per-thread
|
|
||||||
/// CRDT is thread-local), so without this lock parallel tests interleave their
|
|
||||||
/// op writes and snapshot generation.
|
|
||||||
static GLOBAL_STATE_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());
|
|
||||||
|
|
||||||
// ── Wire message tests ──────────────────────────────────────────────
|
// ── Wire message tests ──────────────────────────────────────────────
|
||||||
|
|
||||||
/// Snapshot wire message serialization round-trip.
|
/// Snapshot wire message serialization round-trip.
|
||||||
@@ -522,14 +516,12 @@ mod tests {
|
|||||||
/// Single node is always leader.
|
/// Single node is always leader.
|
||||||
#[test]
|
#[test]
|
||||||
fn single_node_is_always_leader() {
|
fn single_node_is_always_leader() {
|
||||||
let _g = GLOBAL_STATE_LOCK.lock().unwrap_or_else(|e| e.into_inner());
|
|
||||||
assert!(is_snapshot_leader("node_a", &[]));
|
assert!(is_snapshot_leader("node_a", &[]));
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Leader is the node with the lowest hash.
|
/// Leader is the node with the lowest hash.
|
||||||
#[test]
|
#[test]
|
||||||
fn leader_is_lowest_hash_node() {
|
fn leader_is_lowest_hash_node() {
|
||||||
let _g = GLOBAL_STATE_LOCK.lock().unwrap_or_else(|e| e.into_inner());
|
|
||||||
let nodes = vec![
|
let nodes = vec![
|
||||||
"node_alpha".to_string(),
|
"node_alpha".to_string(),
|
||||||
"node_beta".to_string(),
|
"node_beta".to_string(),
|
||||||
@@ -561,7 +553,6 @@ mod tests {
|
|||||||
/// Leader selection is deterministic.
|
/// Leader selection is deterministic.
|
||||||
#[test]
|
#[test]
|
||||||
fn leader_selection_is_deterministic() {
|
fn leader_selection_is_deterministic() {
|
||||||
let _g = GLOBAL_STATE_LOCK.lock().unwrap_or_else(|e| e.into_inner());
|
|
||||||
let peers = vec!["a".to_string(), "b".to_string(), "c".to_string()];
|
let peers = vec!["a".to_string(), "b".to_string(), "c".to_string()];
|
||||||
let result1 = is_snapshot_leader("a", &peers);
|
let result1 = is_snapshot_leader("a", &peers);
|
||||||
let result2 = is_snapshot_leader("a", &peers);
|
let result2 = is_snapshot_leader("a", &peers);
|
||||||
@@ -573,7 +564,6 @@ mod tests {
|
|||||||
/// Threshold is configurable.
|
/// Threshold is configurable.
|
||||||
#[test]
|
#[test]
|
||||||
fn snapshot_threshold_default() {
|
fn snapshot_threshold_default() {
|
||||||
let _g = GLOBAL_STATE_LOCK.lock().unwrap_or_else(|e| e.into_inner());
|
|
||||||
// When not set, defaults to DEFAULT_SNAPSHOT_THRESHOLD.
|
// When not set, defaults to DEFAULT_SNAPSHOT_THRESHOLD.
|
||||||
assert_eq!(snapshot_threshold(), DEFAULT_SNAPSHOT_THRESHOLD);
|
assert_eq!(snapshot_threshold(), DEFAULT_SNAPSHOT_THRESHOLD);
|
||||||
}
|
}
|
||||||
@@ -583,7 +573,6 @@ mod tests {
|
|||||||
/// Record ack returns true when all peers have acked.
|
/// Record ack returns true when all peers have acked.
|
||||||
#[test]
|
#[test]
|
||||||
fn coordination_quorum_reached() {
|
fn coordination_quorum_reached() {
|
||||||
let _g = GLOBAL_STATE_LOCK.lock().unwrap_or_else(|e| e.into_inner());
|
|
||||||
init();
|
init();
|
||||||
let snapshot = Snapshot {
|
let snapshot = Snapshot {
|
||||||
at_seq: 10,
|
at_seq: 10,
|
||||||
@@ -604,7 +593,6 @@ mod tests {
|
|||||||
/// Ack for wrong at_seq is rejected.
|
/// Ack for wrong at_seq is rejected.
|
||||||
#[test]
|
#[test]
|
||||||
fn coordination_ack_wrong_seq_rejected() {
|
fn coordination_ack_wrong_seq_rejected() {
|
||||||
let _g = GLOBAL_STATE_LOCK.lock().unwrap_or_else(|e| e.into_inner());
|
|
||||||
init();
|
init();
|
||||||
let snapshot = Snapshot {
|
let snapshot = Snapshot {
|
||||||
at_seq: 20,
|
at_seq: 20,
|
||||||
@@ -622,7 +610,6 @@ mod tests {
|
|||||||
/// Abort clears pending state.
|
/// Abort clears pending state.
|
||||||
#[test]
|
#[test]
|
||||||
fn coordination_abort_clears_state() {
|
fn coordination_abort_clears_state() {
|
||||||
let _g = GLOBAL_STATE_LOCK.lock().unwrap_or_else(|e| e.into_inner());
|
|
||||||
init();
|
init();
|
||||||
let snapshot = Snapshot {
|
let snapshot = Snapshot {
|
||||||
at_seq: 30,
|
at_seq: 30,
|
||||||
@@ -643,7 +630,6 @@ mod tests {
|
|||||||
/// Unacked peers are reported correctly.
|
/// Unacked peers are reported correctly.
|
||||||
#[test]
|
#[test]
|
||||||
fn unacked_peers_reported() {
|
fn unacked_peers_reported() {
|
||||||
let _g = GLOBAL_STATE_LOCK.lock().unwrap_or_else(|e| e.into_inner());
|
|
||||||
init();
|
init();
|
||||||
let snapshot = Snapshot {
|
let snapshot = Snapshot {
|
||||||
at_seq: 40,
|
at_seq: 40,
|
||||||
@@ -674,7 +660,6 @@ mod tests {
|
|||||||
/// Snapshot generation from ops includes attribution manifest.
|
/// Snapshot generation from ops includes attribution manifest.
|
||||||
#[test]
|
#[test]
|
||||||
fn snapshot_generation_includes_manifest() {
|
fn snapshot_generation_includes_manifest() {
|
||||||
let _g = GLOBAL_STATE_LOCK.lock().unwrap_or_else(|e| e.into_inner());
|
|
||||||
crdt_state::init_for_test();
|
crdt_state::init_for_test();
|
||||||
|
|
||||||
// Write some items to populate ALL_OPS.
|
// Write some items to populate ALL_OPS.
|
||||||
@@ -720,7 +705,6 @@ mod tests {
|
|||||||
/// Attribution can be queried by story_id after snapshot.
|
/// Attribution can be queried by story_id after snapshot.
|
||||||
#[test]
|
#[test]
|
||||||
fn attribution_query_by_story_id() {
|
fn attribution_query_by_story_id() {
|
||||||
let _g = GLOBAL_STATE_LOCK.lock().unwrap_or_else(|e| e.into_inner());
|
|
||||||
crdt_state::init_for_test();
|
crdt_state::init_for_test();
|
||||||
init();
|
init();
|
||||||
|
|
||||||
@@ -757,7 +741,6 @@ mod tests {
|
|||||||
/// After compaction, ALL_OPS size is reduced.
|
/// After compaction, ALL_OPS size is reduced.
|
||||||
#[test]
|
#[test]
|
||||||
fn compaction_reduces_ops() {
|
fn compaction_reduces_ops() {
|
||||||
let _g = GLOBAL_STATE_LOCK.lock().unwrap_or_else(|e| e.into_inner());
|
|
||||||
crdt_state::init_for_test();
|
crdt_state::init_for_test();
|
||||||
init();
|
init();
|
||||||
|
|
||||||
@@ -799,7 +782,6 @@ mod tests {
|
|||||||
/// Latest snapshot is available after compaction.
|
/// Latest snapshot is available after compaction.
|
||||||
#[test]
|
#[test]
|
||||||
fn latest_snapshot_available_after_compaction() {
|
fn latest_snapshot_available_after_compaction() {
|
||||||
let _g = GLOBAL_STATE_LOCK.lock().unwrap_or_else(|e| e.into_inner());
|
|
||||||
crdt_state::init_for_test();
|
crdt_state::init_for_test();
|
||||||
init();
|
init();
|
||||||
|
|
||||||
@@ -831,7 +813,6 @@ mod tests {
|
|||||||
/// the same `at_seq` and pruned ops disappear from each node's log.
|
/// the same `at_seq` and pruned ops disappear from each node's log.
|
||||||
#[test]
|
#[test]
|
||||||
fn three_node_compaction_convergence() {
|
fn three_node_compaction_convergence() {
|
||||||
let _g = GLOBAL_STATE_LOCK.lock().unwrap_or_else(|e| e.into_inner());
|
|
||||||
use bft_json_crdt::json_crdt::BaseCrdt;
|
use bft_json_crdt::json_crdt::BaseCrdt;
|
||||||
|
|
||||||
// Simulate 3 independent nodes.
|
// Simulate 3 independent nodes.
|
||||||
@@ -943,7 +924,6 @@ mod tests {
|
|||||||
/// abort the compaction cleanly (no half-applied snapshot state).
|
/// abort the compaction cleanly (no half-applied snapshot state).
|
||||||
#[test]
|
#[test]
|
||||||
fn failure_mode_node_offline_aborts_cleanly() {
|
fn failure_mode_node_offline_aborts_cleanly() {
|
||||||
let _g = GLOBAL_STATE_LOCK.lock().unwrap_or_else(|e| e.into_inner());
|
|
||||||
init();
|
init();
|
||||||
|
|
||||||
let snapshot = Snapshot {
|
let snapshot = Snapshot {
|
||||||
@@ -989,7 +969,6 @@ mod tests {
|
|||||||
/// + ops with seq >= at_seq.
|
/// + ops with seq >= at_seq.
|
||||||
#[test]
|
#[test]
|
||||||
fn new_node_onboarding_with_snapshot() {
|
fn new_node_onboarding_with_snapshot() {
|
||||||
let _g = GLOBAL_STATE_LOCK.lock().unwrap_or_else(|e| e.into_inner());
|
|
||||||
let kp = make_keypair();
|
let kp = make_keypair();
|
||||||
let mut crdt_existing = BaseCrdt::<PipelineDoc>::new(&kp);
|
let mut crdt_existing = BaseCrdt::<PipelineDoc>::new(&kp);
|
||||||
|
|
||||||
@@ -1047,7 +1026,6 @@ mod tests {
|
|||||||
/// Peers without snapshot support fall back to vector-clock-based full sync.
|
/// Peers without snapshot support fall back to vector-clock-based full sync.
|
||||||
#[test]
|
#[test]
|
||||||
fn backwards_compat_unknown_snapshot_message_ignored() {
|
fn backwards_compat_unknown_snapshot_message_ignored() {
|
||||||
let _g = GLOBAL_STATE_LOCK.lock().unwrap_or_else(|e| e.into_inner());
|
|
||||||
// A peer that doesn't understand snapshot messages should be able to
|
// A peer that doesn't understand snapshot messages should be able to
|
||||||
// parse them as unknown variants and ignore them gracefully.
|
// parse them as unknown variants and ignore them gracefully.
|
||||||
let snapshot_json = r#"{"type":"snapshot","at_seq":100,"state":["op1"],"op_manifest":[]}"#;
|
let snapshot_json = r#"{"type":"snapshot","at_seq":100,"state":["op1"],"op_manifest":[]}"#;
|
||||||
@@ -1076,7 +1054,6 @@ mod tests {
|
|||||||
/// After compaction, an archived story's attribution can be reconstructed.
|
/// After compaction, an archived story's attribution can be reconstructed.
|
||||||
#[test]
|
#[test]
|
||||||
fn attribution_preserved_after_compaction() {
|
fn attribution_preserved_after_compaction() {
|
||||||
let _g = GLOBAL_STATE_LOCK.lock().unwrap_or_else(|e| e.into_inner());
|
|
||||||
crdt_state::init_for_test();
|
crdt_state::init_for_test();
|
||||||
init();
|
init();
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user