fix: add --all to cargo fmt in script/test and autoformat codebase

cargo fmt without --all fails with "Failed to find targets" in
workspace repos. This was blocking every story's gates. Also ran
cargo fmt --all to fix all existing formatting issues.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
dave
2026-04-13 14:07:08 +00:00
parent ed2526ce41
commit 845b85e7a7
128 changed files with 3566 additions and 2395 deletions
+75 -60
View File
@@ -20,8 +20,8 @@ use bft_json_crdt::op::ROOT_ID;
use fastcrypto::ed25519::Ed25519KeyPair;
use fastcrypto::traits::ToFromBytes;
use serde_json::json;
use sqlx::sqlite::SqliteConnectOptions;
use sqlx::SqlitePool;
use sqlx::sqlite::SqliteConnectOptions;
use std::path::Path;
use tokio::sync::{broadcast, mpsc};
@@ -218,10 +218,9 @@ pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> {
let mut crdt = BaseCrdt::<PipelineDoc>::new(&keypair);
// Replay persisted ops to reconstruct state.
let rows: Vec<(String,)> =
sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY rowid ASC")
.fetch_all(&pool)
.await?;
let rows: Vec<(String,)> = sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY rowid ASC")
.fetch_all(&pool)
.await?;
let mut all_ops_vec = Vec::with_capacity(rows.len());
for (op_json,) in &rows {
@@ -316,7 +315,13 @@ pub fn init_for_test() {
let keypair = make_keypair();
let crdt = BaseCrdt::<PipelineDoc>::new(&keypair);
let (persist_tx, _rx) = mpsc::unbounded_channel();
let state = CrdtState { crdt, keypair, index: HashMap::new(), node_index: HashMap::new(), persist_tx };
let state = CrdtState {
crdt,
keypair,
index: HashMap::new(),
node_index: HashMap::new(),
persist_tx,
};
let _ = lock.set(Mutex::new(state));
}
});
@@ -458,9 +463,7 @@ pub fn write_item(
});
}
if let Some(b) = blocked {
apply_and_persist(&mut state, |s| {
s.crdt.doc.items[idx].blocked.set(b)
});
apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].blocked.set(b));
}
if let Some(d) = depends_on {
apply_and_persist(&mut state, |s| {
@@ -473,14 +476,10 @@ pub fn write_item(
});
}
if let Some(ca) = claimed_at {
apply_and_persist(&mut state, |s| {
s.crdt.doc.items[idx].claimed_at.set(ca)
});
apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].claimed_at.set(ca));
}
if let Some(ma) = merged_at {
apply_and_persist(&mut state, |s| {
s.crdt.doc.items[idx].merged_at.set(ma)
});
apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].merged_at.set(ma));
}
// Broadcast a CrdtEvent if the stage actually changed.
@@ -514,9 +513,7 @@ pub fn write_item(
})
.into();
apply_and_persist(&mut state, |s| {
s.crdt.doc.items.insert(ROOT_ID, item_json)
});
apply_and_persist(&mut state, |s| s.crdt.doc.items.insert(ROOT_ID, item_json));
// Rebuild index after insertion (indices may shift).
state.index = rebuild_index(&state.crdt);
@@ -561,11 +558,9 @@ pub fn apply_remote_op(op: SignedOp) -> bool {
let pre_stages: HashMap<String, String> = state
.index
.iter()
.filter_map(|(sid, &idx)| {
match state.crdt.doc.items[idx].stage.view() {
JsonValue::String(s) => Some((sid.clone(), s)),
_ => None,
}
.filter_map(|(sid, &idx)| match state.crdt.doc.items[idx].stage.view() {
JsonValue::String(s) => Some((sid.clone(), s)),
_ => None,
})
.collect();
@@ -668,9 +663,7 @@ pub fn write_claim(story_id: &str) -> bool {
apply_and_persist(&mut state, |s| {
s.crdt.doc.items[idx].claimed_by.set(node_id.clone())
});
apply_and_persist(&mut state, |s| {
s.crdt.doc.items[idx].claimed_at.set(now)
});
apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].claimed_at.set(now));
true
}
@@ -690,9 +683,7 @@ pub fn release_claim(story_id: &str) {
apply_and_persist(&mut state, |s| {
s.crdt.doc.items[idx].claimed_by.set(String::new())
});
apply_and_persist(&mut state, |s| {
s.crdt.doc.items[idx].claimed_at.set(0.0)
});
apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].claimed_at.set(0.0));
}
/// Check if this node currently holds the claim on a pipeline item.
@@ -725,9 +716,7 @@ pub fn write_node_presence(node_id: &str, address: &str, last_seen: f64, alive:
apply_and_persist(&mut state, |s| {
s.crdt.doc.nodes[idx].last_seen.set(last_seen)
});
apply_and_persist(&mut state, |s| {
s.crdt.doc.nodes[idx].alive.set(alive)
});
apply_and_persist(&mut state, |s| s.crdt.doc.nodes[idx].alive.set(alive));
apply_and_persist(&mut state, |s| {
s.crdt.doc.nodes[idx].address.set(address.to_string())
});
@@ -741,9 +730,7 @@ pub fn write_node_presence(node_id: &str, address: &str, last_seen: f64, alive:
})
.into();
apply_and_persist(&mut state, |s| {
s.crdt.doc.nodes.insert(ROOT_ID, node_json)
});
apply_and_persist(&mut state, |s| s.crdt.doc.nodes.insert(ROOT_ID, node_json));
// Rebuild node index after insertion.
state.node_index = rebuild_node_index(&state.crdt);
@@ -1019,8 +1006,7 @@ pub fn read_item(story_id: &str) -> Option<PipelineItemView> {
/// or an `Err` if the CRDT layer isn't initialised or the story_id is
/// unknown to the in-memory state.
pub fn evict_item(story_id: &str) -> Result<(), String> {
let state_mutex = get_crdt()
.ok_or_else(|| "CRDT layer not initialised".to_string())?;
let state_mutex = get_crdt().ok_or_else(|| "CRDT layer not initialised".to_string())?;
let mut state = state_mutex
.lock()
.map_err(|e| format!("CRDT lock poisoned: {e}"))?;
@@ -1033,12 +1019,10 @@ pub fn evict_item(story_id: &str) -> Result<(), String> {
// Resolve the item's OpId before the closure (the closure will mutably
// borrow `state`, so we can't access `state.crdt.doc.items` from inside).
let item_id = state
.crdt
.doc
.items
.id_at(idx)
.ok_or_else(|| format!("Item index {idx} for '{story_id}' did not resolve to an OpId"))?;
let item_id =
state.crdt.doc.items.id_at(idx).ok_or_else(|| {
format!("Item index {idx} for '{story_id}' did not resolve to an OpId")
})?;
// Write the delete op via the existing apply_and_persist machinery.
// This signs the op, applies it to the in-memory CRDT (marking the item
@@ -1084,9 +1068,7 @@ fn extract_item_view(item: &PipelineItemCrdt) -> Option<PipelineItemView> {
_ => None,
};
let depends_on = match item.depends_on.view() {
JsonValue::String(s) if !s.is_empty() => {
serde_json::from_str::<Vec<u32>>(&s).ok()
}
JsonValue::String(s) if !s.is_empty() => serde_json::from_str::<Vec<u32>>(&s).ok(),
_ => None,
};
@@ -1142,9 +1124,9 @@ pub fn dep_is_done_crdt(dep_number: u32) -> bool {
pub fn dep_is_archived_crdt(dep_number: u32) -> bool {
let prefix = format!("{dep_number}_");
if let Some(items) = read_all_items() {
items.iter().any(|item| {
item.story_id.starts_with(&prefix) && item.stage == "6_archived"
})
items
.iter()
.any(|item| item.story_id.starts_with(&prefix) && item.stage == "6_archived")
} else {
false
}
@@ -1226,8 +1208,14 @@ mod tests {
assert_eq!(view.len(), 1);
let item = &crdt.doc.items[0];
assert_eq!(item.story_id.view(), JsonValue::String("10_story_test".to_string()));
assert_eq!(item.stage.view(), JsonValue::String("2_current".to_string()));
assert_eq!(
item.story_id.view(),
JsonValue::String("10_story_test".to_string())
);
assert_eq!(
item.stage.view(),
JsonValue::String("2_current".to_string())
);
}
#[test]
@@ -1252,7 +1240,10 @@ mod tests {
crdt.apply(insert_op);
// Update stage
let stage_op = crdt.doc.items[0].stage.set("2_current".to_string()).sign(&kp);
let stage_op = crdt.doc.items[0]
.stage
.set("2_current".to_string())
.sign(&kp);
crdt.apply(stage_op);
assert_eq!(
@@ -1283,10 +1274,16 @@ mod tests {
let op1 = crdt1.doc.items.insert(ROOT_ID, item_json).sign(&kp);
crdt1.apply(op1.clone());
let op2 = crdt1.doc.items[0].stage.set("2_current".to_string()).sign(&kp);
let op2 = crdt1.doc.items[0]
.stage
.set("2_current".to_string())
.sign(&kp);
crdt1.apply(op2.clone());
let op3 = crdt1.doc.items[0].name.set("Updated Name".to_string()).sign(&kp);
let op3 = crdt1.doc.items[0]
.name
.set("Updated Name".to_string())
.sign(&kp);
crdt1.apply(op3.clone());
// Replay ops on a fresh CRDT.
@@ -1568,7 +1565,11 @@ mod tests {
"claimed_at": 0.0,
})
.into();
let op = crdt.doc.items.insert(bft_json_crdt::op::ROOT_ID, item).sign(&kp);
let op = crdt
.doc
.items
.insert(bft_json_crdt::op::ROOT_ID, item)
.sign(&kp);
// This uses the global state which may not be initialised in tests.
let _ = apply_remote_op(op);
}
@@ -1591,7 +1592,11 @@ mod tests {
"claimed_at": 0.0,
})
.into();
let op = crdt.doc.items.insert(bft_json_crdt::op::ROOT_ID, item).sign(&kp);
let op = crdt
.doc
.items
.insert(bft_json_crdt::op::ROOT_ID, item)
.sign(&kp);
let json1 = serde_json::to_string(&op).unwrap();
let roundtripped: SignedOp = serde_json::from_str(&json1).unwrap();
@@ -1620,7 +1625,11 @@ mod tests {
"claimed_at": 0.0,
})
.into();
let op = crdt.doc.items.insert(bft_json_crdt::op::ROOT_ID, item).sign(&kp);
let op = crdt
.doc
.items
.insert(bft_json_crdt::op::ROOT_ID, item)
.sign(&kp);
tx.send(op.clone()).unwrap();
let received = rx.try_recv().unwrap();
@@ -1693,7 +1702,10 @@ mod tests {
// Now update the stage. The stage LwwRegisterCrdt for this item starts
// at our_seq=0, so this field op gets seq=1. Crucially: seq=1 < seq=6.
let idx = rebuild_index(&crdt)["511_story_target"];
let stage_op = crdt.doc.items[idx].stage.set("2_current".to_string()).sign(&kp);
let stage_op = crdt.doc.items[idx]
.stage
.set("2_current".to_string())
.sign(&kp);
crdt.apply(stage_op.clone());
// stage_op.inner.seq == 1
@@ -1808,8 +1820,11 @@ mod tests {
apply_and_persist(&mut state, |s| s.crdt.doc.items.insert(ROOT_ID, item_json));
let error_entries = crate::log_buffer::global()
.get_recent_entries(1000, None, Some(&crate::log_buffer::LogLevel::Error));
let error_entries = crate::log_buffer::global().get_recent_entries(
1000,
None,
Some(&crate::log_buffer::LogLevel::Error),
);
assert!(
error_entries.len() > before_errors,