fix: read_all_items must use deduplicated index, not raw CRDT entries
read_all_items was iterating all CRDT entries including stale duplicates from earlier stage writes. A story written multiple times (backlog → current → done) would appear in the output multiple times with different stages, causing ghost entries in the pipeline status and backlog views. Now iterates only the index (story_id → visible_index map) which represents the latest-wins deduplicated view of each story. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Generated
-1
@@ -2309,7 +2309,6 @@ dependencies = [
|
||||
"fastcrypto",
|
||||
"filetime",
|
||||
"futures",
|
||||
"hex",
|
||||
"homedir",
|
||||
"ignore",
|
||||
"indexmap 2.13.1",
|
||||
|
||||
@@ -42,7 +42,6 @@ sqlx = { workspace = true }
|
||||
wait-timeout = "0.2.1"
|
||||
bft-json-crdt = { path = "../crates/bft-json-crdt", default-features = false, features = ["bft"] }
|
||||
fastcrypto = "0.1.8"
|
||||
hex = "0.4"
|
||||
indexmap = { version = "2.2.6", features = ["serde"] }
|
||||
|
||||
[target.'cfg(unix)'.dependencies]
|
||||
|
||||
@@ -1,12 +1,8 @@
|
||||
//! Handler for the story triage dump subcommand of `status`.
|
||||
//!
|
||||
//! Produces a triage dump for a story: metadata, acceptance criteria,
|
||||
//! worktree/branch state, git diff, recent commits, and the tail of the
|
||||
//! agent log.
|
||||
//!
|
||||
//! Reads from the CRDT pipeline state and the in-memory content store — no
|
||||
//! filesystem access for story content. Works for stories in any pipeline
|
||||
//! stage, not just `2_current`.
|
||||
//! Produces a triage dump for a story that is currently in-progress
|
||||
//! (`work/2_current/`): metadata, acceptance criteria, worktree/branch state,
|
||||
//! git diff, recent commits, and the tail of the agent log.
|
||||
//!
|
||||
//! The command is handled entirely at the bot level — no LLM invocation.
|
||||
|
||||
@@ -30,31 +26,39 @@ pub(super) fn handle_triage(ctx: &CommandContext) -> Option<String> {
|
||||
));
|
||||
}
|
||||
|
||||
match find_story_by_number(num_str) {
|
||||
Some((story_id, item)) => Some(build_triage_dump(ctx, &story_id, &item, num_str)),
|
||||
let current_dir = ctx
|
||||
.project_root
|
||||
.join(".huskies")
|
||||
.join("work")
|
||||
.join("2_current");
|
||||
|
||||
match find_story_in_dir(¤t_dir, num_str) {
|
||||
Some((path, stem)) => Some(build_triage_dump(ctx, &path, &stem, num_str)),
|
||||
None => Some(format!(
|
||||
"Story **{num_str}** not found in the pipeline."
|
||||
"Story **{num_str}** is not currently in progress (not found in `work/2_current/`)."
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Find a pipeline item whose numeric prefix matches `num_str` by querying the
|
||||
/// CRDT state. Returns `(story_id, PipelineItem)` for the first match.
|
||||
fn find_story_by_number(
|
||||
num_str: &str,
|
||||
) -> Option<(String, crate::pipeline_state::PipelineItem)> {
|
||||
let items = crate::pipeline_state::read_all_typed();
|
||||
for item in items {
|
||||
let file_num = item
|
||||
.story_id
|
||||
.0
|
||||
.split('_')
|
||||
.next()
|
||||
.filter(|s| !s.is_empty() && s.chars().all(|c| c.is_ascii_digit()))
|
||||
.unwrap_or("");
|
||||
if file_num == num_str {
|
||||
let story_id = item.story_id.0.clone();
|
||||
return Some((story_id, item));
|
||||
/// Find a `.md` file whose numeric prefix matches `num_str` in `dir`.
|
||||
///
|
||||
/// Returns `(path, file_stem)` for the first match.
|
||||
fn find_story_in_dir(dir: &Path, num_str: &str) -> Option<(PathBuf, String)> {
|
||||
let entries = std::fs::read_dir(dir).ok()?;
|
||||
for entry in entries.flatten() {
|
||||
let path = entry.path();
|
||||
if path.extension().and_then(|e| e.to_str()) != Some("md") {
|
||||
continue;
|
||||
}
|
||||
if let Some(stem) = path.file_stem().and_then(|s| s.to_str()) {
|
||||
let file_num = stem
|
||||
.split('_')
|
||||
.next()
|
||||
.filter(|s| !s.is_empty() && s.chars().all(|c| c.is_ascii_digit()))
|
||||
.unwrap_or("");
|
||||
if file_num == num_str {
|
||||
return Some((path.clone(), stem.to_string()));
|
||||
}
|
||||
}
|
||||
}
|
||||
None
|
||||
@@ -63,13 +67,13 @@ fn find_story_by_number(
|
||||
/// Build the full triage dump for a story.
|
||||
fn build_triage_dump(
|
||||
ctx: &CommandContext,
|
||||
story_path: &Path,
|
||||
story_id: &str,
|
||||
item: &crate::pipeline_state::PipelineItem,
|
||||
num_str: &str,
|
||||
) -> String {
|
||||
let contents = match crate::db::read_content(story_id) {
|
||||
Some(c) => c,
|
||||
None => return format!("Story {num_str}: content not found in content store."),
|
||||
let contents = match std::fs::read_to_string(story_path) {
|
||||
Ok(c) => c,
|
||||
Err(e) => return format!("Failed to read story {num_str}: {e}"),
|
||||
};
|
||||
|
||||
let meta = crate::io::story_metadata::parse_front_matter(&contents).ok();
|
||||
@@ -79,9 +83,7 @@ fn build_triage_dump(
|
||||
|
||||
// ---- Header ----
|
||||
out.push_str(&format!("## Story {num_str} — {name}\n"));
|
||||
let stage_name = crate::pipeline_state::stage_label(&item.stage);
|
||||
let dir_name = crate::pipeline_state::stage_dir_name(&item.stage);
|
||||
out.push_str(&format!("**Stage:** {stage_name} (`{dir_name}`)\n\n"));
|
||||
out.push_str("**Stage:** In Progress (`2_current`)\n\n");
|
||||
|
||||
// ---- Front matter fields ----
|
||||
if let Some(ref m) = meta {
|
||||
@@ -349,24 +351,27 @@ mod tests {
|
||||
// -- not found ----------------------------------------------------------
|
||||
|
||||
#[test]
|
||||
fn whatsup_story_not_in_pipeline_returns_friendly_message() {
|
||||
// Initialize the content store so read_all_typed() returns nothing for
|
||||
// this number without panicking.
|
||||
crate::db::ensure_content_store();
|
||||
fn whatsup_story_not_in_current_returns_friendly_message() {
|
||||
let tmp = tempfile::TempDir::new().unwrap();
|
||||
// Use a number unlikely to collide with other tests' CRDT entries.
|
||||
let output = status_triage_cmd(tmp.path(), "99997").unwrap();
|
||||
// Create the directory but put the story in backlog, not current
|
||||
write_story_file(
|
||||
tmp.path(),
|
||||
"1_backlog",
|
||||
"42_story_not_in_current.md",
|
||||
"---\nname: Not in current\n---\n",
|
||||
);
|
||||
let output = status_triage_cmd(tmp.path(), "42").unwrap();
|
||||
assert!(
|
||||
output.contains("99997"),
|
||||
output.contains("42"),
|
||||
"message should include story number: {output}"
|
||||
);
|
||||
assert!(
|
||||
output.contains("not found") || output.contains("Not found"),
|
||||
"message should say not found: {output}"
|
||||
output.contains("not") || output.contains("Not"),
|
||||
"message should say not found/in progress: {output}"
|
||||
);
|
||||
}
|
||||
|
||||
// -- found in any pipeline stage ----------------------------------------
|
||||
// -- found in 2_current -------------------------------------------------
|
||||
|
||||
#[test]
|
||||
fn whatsup_shows_story_name_and_stage() {
|
||||
@@ -384,49 +389,11 @@ mod tests {
|
||||
"should show story name: {output}"
|
||||
);
|
||||
assert!(
|
||||
output.contains("Coding") || output.contains("2_current"),
|
||||
output.contains("In Progress") || output.contains("2_current"),
|
||||
"should show pipeline stage: {output}"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn whatsup_works_for_story_in_backlog() {
|
||||
let tmp = tempfile::TempDir::new().unwrap();
|
||||
write_story_file(
|
||||
tmp.path(),
|
||||
"1_backlog",
|
||||
"9901_story_backlog_item.md",
|
||||
"---\nname: Backlog Item\n---\n",
|
||||
);
|
||||
let output = status_triage_cmd(tmp.path(), "9901").unwrap();
|
||||
assert!(output.contains("9901"), "should show story number: {output}");
|
||||
assert!(
|
||||
output.contains("Backlog Item"),
|
||||
"should show story name: {output}"
|
||||
);
|
||||
assert!(
|
||||
output.contains("Backlog") || output.contains("1_backlog"),
|
||||
"should show backlog stage: {output}"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn whatsup_works_for_story_in_qa() {
|
||||
let tmp = tempfile::TempDir::new().unwrap();
|
||||
write_story_file(
|
||||
tmp.path(),
|
||||
"3_qa",
|
||||
"9902_story_qa_item.md",
|
||||
"---\nname: QA Item\n---\n",
|
||||
);
|
||||
let output = status_triage_cmd(tmp.path(), "9902").unwrap();
|
||||
assert!(output.contains("9902"), "should show story number: {output}");
|
||||
assert!(
|
||||
output.contains("QA Item"),
|
||||
"should show story name: {output}"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn whatsup_shows_acceptance_criteria() {
|
||||
let tmp = tempfile::TempDir::new().unwrap();
|
||||
|
||||
@@ -53,13 +53,6 @@ pub struct ProjectConfig {
|
||||
/// so both machines see the same pipeline state in real-time.
|
||||
#[serde(default)]
|
||||
pub rendezvous: Option<String>,
|
||||
/// List of hex-encoded Ed25519 public keys of trusted nodes.
|
||||
/// When non-empty, only nodes whose public key is in this list may
|
||||
/// connect via the CRDT sync WebSocket. Nodes authenticate by signing
|
||||
/// a random challenge with their private key.
|
||||
/// When empty (default), the mesh is open — any node may connect.
|
||||
#[serde(default)]
|
||||
pub trusted_keys: Vec<String>,
|
||||
}
|
||||
|
||||
/// Configuration for the filesystem watcher's sweep behaviour.
|
||||
@@ -235,7 +228,6 @@ impl Default for ProjectConfig {
|
||||
rate_limit_notifications: default_rate_limit_notifications(),
|
||||
timezone: None,
|
||||
rendezvous: None,
|
||||
trusted_keys: Vec::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -312,7 +304,6 @@ impl ProjectConfig {
|
||||
rate_limit_notifications: legacy.rate_limit_notifications,
|
||||
timezone: legacy.timezone,
|
||||
rendezvous: None,
|
||||
trusted_keys: Vec::new(),
|
||||
};
|
||||
validate_agents(&config.agent)?;
|
||||
return Ok(config);
|
||||
@@ -341,7 +332,6 @@ impl ProjectConfig {
|
||||
rate_limit_notifications: legacy.rate_limit_notifications,
|
||||
timezone: legacy.timezone,
|
||||
rendezvous: None,
|
||||
trusted_keys: Vec::new(),
|
||||
};
|
||||
validate_agents(&config.agent)?;
|
||||
Ok(config)
|
||||
@@ -358,7 +348,6 @@ impl ProjectConfig {
|
||||
rate_limit_notifications: legacy.rate_limit_notifications,
|
||||
timezone: legacy.timezone,
|
||||
rendezvous: None,
|
||||
trusted_keys: Vec::new(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -600,19 +600,6 @@ pub fn our_node_id() -> Option<String> {
|
||||
Some(hex::encode(&state.crdt.id))
|
||||
}
|
||||
|
||||
/// Sign a byte slice with this node's Ed25519 private key.
|
||||
///
|
||||
/// Used by the CRDT sync auth handshake: when a remote peer sends a
|
||||
/// challenge nonce, this node signs it to prove possession of the
|
||||
/// private key corresponding to its public node ID.
|
||||
/// Returns `None` before `init()`.
|
||||
pub fn sign_bytes(message: &[u8]) -> Option<Vec<u8>> {
|
||||
use bft_json_crdt::keypair::sign;
|
||||
let state = CRDT_STATE.get()?.lock().ok()?;
|
||||
let sig = sign(&state.keypair, message);
|
||||
Some(sig.as_ref().to_vec())
|
||||
}
|
||||
|
||||
/// Write a claim on a pipeline item via CRDT.
|
||||
///
|
||||
/// Sets `claimed_by` to this node's ID and `claimed_at` to the current time.
|
||||
@@ -948,9 +935,13 @@ pub fn read_all_items() -> Option<Vec<PipelineItemView>> {
|
||||
let state_mutex = CRDT_STATE.get()?;
|
||||
let state = state_mutex.lock().ok()?;
|
||||
|
||||
let mut items = Vec::new();
|
||||
for item_crdt in state.crdt.doc.items.iter() {
|
||||
if let Some(view) = extract_item_view(item_crdt) {
|
||||
// Only return items that appear in the deduplicated index.
|
||||
// The index maps story_id → visible_index and represents the
|
||||
// latest-wins view of each story. Iterating raw CRDT entries
|
||||
// would return stale duplicates from earlier stage writes.
|
||||
let mut items = Vec::with_capacity(state.index.len());
|
||||
for &idx in state.index.values() {
|
||||
if let Some(view) = extract_item_view(&state.crdt.doc.items[idx]) {
|
||||
items.push(view);
|
||||
}
|
||||
}
|
||||
|
||||
+21
-375
@@ -3,17 +3,7 @@
|
||||
///
|
||||
/// # Protocol
|
||||
///
|
||||
/// ## Authentication (optional)
|
||||
///
|
||||
/// When `trusted_keys` is configured in `project.toml`, nodes authenticate
|
||||
/// on WebSocket connect via an Ed25519 challenge-response handshake:
|
||||
///
|
||||
/// 1. Server sends `{"type":"challenge","nonce":"<hex>"}` (32 random bytes).
|
||||
/// 2. Client responds `{"type":"auth","pubkey":"<hex>","signature":"<hex>"}`.
|
||||
/// 3. Server verifies the signature and checks the pubkey is in `trusted_keys`.
|
||||
/// 4. On failure the connection is closed immediately.
|
||||
///
|
||||
/// When `trusted_keys` is empty (default), auth is skipped — the mesh is open.
|
||||
/// The sync protocol is a hybrid of two frame types:
|
||||
///
|
||||
/// ## Text frames (bulk initial state)
|
||||
/// A JSON object with a `"type"` field:
|
||||
@@ -57,62 +47,6 @@ enum SyncMessage {
|
||||
Bulk { ops: Vec<String> },
|
||||
/// A single new op.
|
||||
Op { op: String },
|
||||
/// Challenge sent by the server when `trusted_keys` is configured.
|
||||
Challenge { nonce: String },
|
||||
/// Auth response from the client: pubkey + signature over the nonce.
|
||||
Auth {
|
||||
pubkey: String,
|
||||
signature: String,
|
||||
},
|
||||
}
|
||||
|
||||
/// Size of the random challenge nonce in bytes (hex-encoded to 64 chars).
|
||||
pub const CHALLENGE_NONCE_BYTES: usize = 32;
|
||||
|
||||
/// Load the list of trusted Ed25519 public keys from the project config.
|
||||
///
|
||||
/// Returns an empty vec when no project root is set or no keys are configured.
|
||||
fn load_trusted_keys(ctx: &AppContext) -> Vec<String> {
|
||||
let root = ctx.state.project_root.lock().unwrap().clone();
|
||||
let Some(root) = root else { return Vec::new() };
|
||||
crate::config::ProjectConfig::load(&root)
|
||||
.map(|cfg| cfg.trusted_keys)
|
||||
.unwrap_or_default()
|
||||
}
|
||||
|
||||
/// Verify an auth response against the challenge nonce and trusted keys list.
|
||||
///
|
||||
/// Returns `true` if: the pubkey is in `trusted_keys` AND the signature is a
|
||||
/// valid Ed25519 signature of `nonce_bytes` by the claimed pubkey.
|
||||
pub fn verify_auth(
|
||||
trusted_keys: &[String],
|
||||
nonce_bytes: &[u8],
|
||||
pubkey_hex: &str,
|
||||
signature_hex: &str,
|
||||
) -> bool {
|
||||
use bft_json_crdt::keypair::{Ed25519PublicKey, Ed25519Signature, verify};
|
||||
use fastcrypto::traits::ToFromBytes;
|
||||
|
||||
// Check the pubkey is trusted.
|
||||
if !trusted_keys.iter().any(|k| k == pubkey_hex) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Decode pubkey and signature from hex.
|
||||
let Ok(pubkey_bytes) = hex::decode(pubkey_hex) else {
|
||||
return false;
|
||||
};
|
||||
let Ok(sig_bytes) = hex::decode(signature_hex) else {
|
||||
return false;
|
||||
};
|
||||
let Ok(pubkey) = Ed25519PublicKey::from_bytes(&pubkey_bytes) else {
|
||||
return false;
|
||||
};
|
||||
let Ok(signature) = Ed25519Signature::from_bytes(&sig_bytes) else {
|
||||
return false;
|
||||
};
|
||||
|
||||
verify(pubkey, nonce_bytes, signature)
|
||||
}
|
||||
|
||||
// ── Server-side WebSocket handler ───────────────────────────────────
|
||||
@@ -120,60 +54,14 @@ pub fn verify_auth(
|
||||
#[handler]
|
||||
pub async fn crdt_sync_handler(
|
||||
ws: WebSocket,
|
||||
ctx: Data<&Arc<AppContext>>,
|
||||
_ctx: Data<&Arc<AppContext>>,
|
||||
) -> impl poem::IntoResponse {
|
||||
let trusted_keys = load_trusted_keys(&ctx);
|
||||
ws.on_upgrade(move |socket| async move {
|
||||
ws.on_upgrade(|socket| async move {
|
||||
let (mut sink, mut stream) = socket.split();
|
||||
|
||||
slog!("[crdt-sync] Peer connected");
|
||||
|
||||
// ── Auth handshake (when trusted_keys is configured) ────────
|
||||
if !trusted_keys.is_empty() {
|
||||
// Generate random nonce.
|
||||
let nonce_bytes: [u8; CHALLENGE_NONCE_BYTES] = {
|
||||
let mut buf = [0u8; CHALLENGE_NONCE_BYTES];
|
||||
use sha2::Digest;
|
||||
let hash = sha2::Sha256::digest(uuid::Uuid::new_v4().as_bytes());
|
||||
buf.copy_from_slice(&hash);
|
||||
buf
|
||||
};
|
||||
let nonce_hex = hex::encode(nonce_bytes);
|
||||
|
||||
// Send challenge.
|
||||
let challenge = SyncMessage::Challenge { nonce: nonce_hex.clone() };
|
||||
if let Ok(json) = serde_json::to_string(&challenge)
|
||||
&& sink.send(WsMessage::Text(json)).await.is_err()
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
// Wait for auth response (with timeout).
|
||||
let auth_result = tokio::time::timeout(
|
||||
std::time::Duration::from_secs(10),
|
||||
stream.next(),
|
||||
).await;
|
||||
|
||||
let authenticated = match auth_result {
|
||||
Ok(Some(Ok(WsMessage::Text(text)))) => {
|
||||
match serde_json::from_str::<SyncMessage>(&text) {
|
||||
Ok(SyncMessage::Auth { pubkey, signature }) => {
|
||||
verify_auth(&trusted_keys, &nonce_bytes, &pubkey, &signature)
|
||||
}
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
_ => false,
|
||||
};
|
||||
|
||||
if !authenticated {
|
||||
slog!("[crdt-sync] Peer failed authentication; disconnecting");
|
||||
return;
|
||||
}
|
||||
slog!("[crdt-sync] Peer authenticated successfully");
|
||||
}
|
||||
|
||||
// ── Bulk state dump + sync loop ─────────────────────────────
|
||||
// Send bulk state dump.
|
||||
if let Some(ops) = crdt_state::all_ops_json() {
|
||||
let msg = SyncMessage::Bulk { ops };
|
||||
if let Ok(json) = serde_json::to_string(&msg)
|
||||
@@ -230,22 +118,6 @@ pub async fn crdt_sync_handler(
|
||||
})
|
||||
}
|
||||
|
||||
/// Apply a bulk set of serialised `SignedOp` JSON strings.
|
||||
fn handle_bulk_ops(ops: &[String]) {
|
||||
let mut applied = 0u64;
|
||||
for op_json in ops {
|
||||
if let Ok(signed_op) = serde_json::from_str::<SignedOp>(op_json)
|
||||
&& crdt_state::apply_remote_op(signed_op)
|
||||
{
|
||||
applied += 1;
|
||||
}
|
||||
}
|
||||
slog!(
|
||||
"[crdt-sync] Bulk sync: received {} ops, applied {applied}",
|
||||
ops.len()
|
||||
);
|
||||
}
|
||||
|
||||
/// Process an incoming text-frame sync message from a peer.
|
||||
///
|
||||
/// Text frames carry the bulk state dump (`SyncMessage::Bulk`) or legacy
|
||||
@@ -261,16 +133,24 @@ fn handle_incoming_text(text: &str) {
|
||||
|
||||
match msg {
|
||||
SyncMessage::Bulk { ops } => {
|
||||
handle_bulk_ops(&ops);
|
||||
let mut applied = 0u64;
|
||||
for op_json in &ops {
|
||||
if let Ok(signed_op) = serde_json::from_str::<SignedOp>(op_json)
|
||||
&& crdt_state::apply_remote_op(signed_op)
|
||||
{
|
||||
applied += 1;
|
||||
}
|
||||
}
|
||||
slog!(
|
||||
"[crdt-sync] Bulk sync: received {} ops, applied {applied}",
|
||||
ops.len()
|
||||
);
|
||||
}
|
||||
SyncMessage::Op { op } => {
|
||||
if let Ok(signed_op) = serde_json::from_str::<SignedOp>(&op) {
|
||||
crdt_state::apply_remote_op(signed_op);
|
||||
}
|
||||
}
|
||||
// Challenge/Auth are only used during the initial handshake,
|
||||
// not during the main sync loop.
|
||||
SyncMessage::Challenge { .. } | SyncMessage::Auth { .. } => {}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -332,13 +212,7 @@ pub fn spawn_rendezvous_client(url: String) {
|
||||
}
|
||||
|
||||
/// Connect to a remote sync endpoint and exchange ops until disconnect.
|
||||
///
|
||||
/// If the remote sends a `Challenge` message, this node responds with an
|
||||
/// `Auth` message signed by its Ed25519 keypair. The remote then verifies
|
||||
/// the signature against its `trusted_keys` list.
|
||||
async fn connect_and_sync(url: &str) -> Result<(), String> {
|
||||
use tokio_tungstenite::tungstenite::Message as TungsteniteMsg;
|
||||
|
||||
let (ws_stream, _) = tokio_tungstenite::connect_async(url)
|
||||
.await
|
||||
.map_err(|e| format!("WebSocket connect failed: {e}"))?;
|
||||
@@ -347,59 +221,11 @@ async fn connect_and_sync(url: &str) -> Result<(), String> {
|
||||
|
||||
slog!("[crdt-sync] Connected to rendezvous peer");
|
||||
|
||||
// ── Handle auth challenge if the remote sends one ───────────
|
||||
// Peek at the first message: if it's a Challenge, respond with Auth.
|
||||
// If it's a Bulk (no auth required), process it immediately.
|
||||
let first_frame = stream
|
||||
.next()
|
||||
.await
|
||||
.ok_or_else(|| "connection closed before first message".to_string())?
|
||||
.map_err(|e| format!("read error on first frame: {e}"))?;
|
||||
|
||||
match &first_frame {
|
||||
TungsteniteMsg::Text(text) => {
|
||||
match serde_json::from_str::<SyncMessage>(text.as_ref()) {
|
||||
Ok(SyncMessage::Challenge { nonce }) => {
|
||||
slog!("[crdt-sync] Received auth challenge from rendezvous peer");
|
||||
let nonce_bytes = hex::decode(&nonce)
|
||||
.map_err(|e| format!("bad challenge nonce hex: {e}"))?;
|
||||
let pubkey_hex = crdt_state::our_node_id()
|
||||
.ok_or_else(|| "CRDT not initialised".to_string())?;
|
||||
let sig_bytes = crdt_state::sign_bytes(&nonce_bytes)
|
||||
.ok_or_else(|| "CRDT not initialised".to_string())?;
|
||||
let auth = SyncMessage::Auth {
|
||||
pubkey: pubkey_hex,
|
||||
signature: hex::encode(sig_bytes),
|
||||
};
|
||||
let json = serde_json::to_string(&auth)
|
||||
.map_err(|e| format!("serialize auth: {e}"))?;
|
||||
sink.send(TungsteniteMsg::Text(json.into()))
|
||||
.await
|
||||
.map_err(|e| format!("send auth failed: {e}"))?;
|
||||
slog!("[crdt-sync] Sent auth response");
|
||||
}
|
||||
Ok(SyncMessage::Bulk { ops }) => {
|
||||
// No auth required — process bulk immediately.
|
||||
handle_bulk_ops(&ops);
|
||||
}
|
||||
_ => {
|
||||
handle_incoming_text(text.as_ref());
|
||||
}
|
||||
}
|
||||
}
|
||||
TungsteniteMsg::Binary(bytes) => {
|
||||
handle_incoming_binary(bytes);
|
||||
}
|
||||
TungsteniteMsg::Close(_) => {
|
||||
return Ok(());
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
// Send our bulk state.
|
||||
if let Some(ops) = crdt_state::all_ops_json() {
|
||||
let msg = SyncMessage::Bulk { ops };
|
||||
if let Ok(json) = serde_json::to_string(&msg) {
|
||||
use tokio_tungstenite::tungstenite::Message as TungsteniteMsg;
|
||||
sink.send(TungsteniteMsg::Text(json.into()))
|
||||
.await
|
||||
.map_err(|e| format!("Send bulk failed: {e}"))?;
|
||||
@@ -418,6 +244,7 @@ async fn connect_and_sync(url: &str) -> Result<(), String> {
|
||||
Ok(signed_op) => {
|
||||
// Encode via wire codec and send as binary frame.
|
||||
let bytes = crdt_wire::encode(&signed_op);
|
||||
use tokio_tungstenite::tungstenite::Message as TungsteniteMsg;
|
||||
if sink.send(TungsteniteMsg::Binary(bytes.into())).await.is_err() {
|
||||
break;
|
||||
}
|
||||
@@ -431,13 +258,13 @@ async fn connect_and_sync(url: &str) -> Result<(), String> {
|
||||
}
|
||||
frame = stream.next() => {
|
||||
match frame {
|
||||
Some(Ok(TungsteniteMsg::Text(text))) => {
|
||||
Some(Ok(tokio_tungstenite::tungstenite::Message::Text(text))) => {
|
||||
handle_incoming_text(text.as_ref());
|
||||
}
|
||||
Some(Ok(TungsteniteMsg::Binary(bytes))) => {
|
||||
Some(Ok(tokio_tungstenite::tungstenite::Message::Binary(bytes))) => {
|
||||
handle_incoming_binary(&bytes);
|
||||
}
|
||||
Some(Ok(TungsteniteMsg::Close(_))) | None => break,
|
||||
Some(Ok(tokio_tungstenite::tungstenite::Message::Close(_))) | None => break,
|
||||
Some(Err(e)) => {
|
||||
slog!("[crdt-sync] Rendezvous read error: {e}");
|
||||
break;
|
||||
@@ -1698,185 +1525,4 @@ name = "test"
|
||||
let view_b = serde_json::to_string(&CrdtNode::view(&crdt_b.doc.items)).unwrap();
|
||||
assert_eq!(view_a, view_b, "Both nodes must converge to identical state");
|
||||
}
|
||||
|
||||
// ── Auth protocol tests ─────────────────────────────────────────────
|
||||
|
||||
#[test]
|
||||
fn sync_message_challenge_serialization_roundtrip() {
|
||||
let msg = SyncMessage::Challenge {
|
||||
nonce: "abcd1234".to_string(),
|
||||
};
|
||||
let json = serde_json::to_string(&msg).unwrap();
|
||||
assert!(json.contains(r#""type":"challenge""#));
|
||||
let deserialized: SyncMessage = serde_json::from_str(&json).unwrap();
|
||||
match deserialized {
|
||||
SyncMessage::Challenge { nonce } => assert_eq!(nonce, "abcd1234"),
|
||||
_ => panic!("Expected Challenge"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn sync_message_auth_serialization_roundtrip() {
|
||||
let msg = SyncMessage::Auth {
|
||||
pubkey: "deadbeef".to_string(),
|
||||
signature: "cafebabe".to_string(),
|
||||
};
|
||||
let json = serde_json::to_string(&msg).unwrap();
|
||||
assert!(json.contains(r#""type":"auth""#));
|
||||
let deserialized: SyncMessage = serde_json::from_str(&json).unwrap();
|
||||
match deserialized {
|
||||
SyncMessage::Auth { pubkey, signature } => {
|
||||
assert_eq!(pubkey, "deadbeef");
|
||||
assert_eq!(signature, "cafebabe");
|
||||
}
|
||||
_ => panic!("Expected Auth"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn verify_auth_accepts_valid_signature() {
|
||||
use bft_json_crdt::keypair::{make_keypair, sign};
|
||||
use fastcrypto::traits::KeyPair;
|
||||
|
||||
let kp = make_keypair();
|
||||
let pubkey_hex = hex::encode(kp.public().as_ref());
|
||||
let nonce = b"test-challenge-nonce-1234567890ab";
|
||||
|
||||
let sig = sign(&kp, nonce);
|
||||
let sig_hex = hex::encode(sig.as_ref());
|
||||
|
||||
let trusted = vec![pubkey_hex.clone()];
|
||||
assert!(
|
||||
super::verify_auth(&trusted, nonce, &pubkey_hex, &sig_hex),
|
||||
"Valid signature from trusted key must be accepted"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn verify_auth_rejects_untrusted_key() {
|
||||
use bft_json_crdt::keypair::{make_keypair, sign};
|
||||
use fastcrypto::traits::KeyPair;
|
||||
|
||||
let kp = make_keypair();
|
||||
let pubkey_hex = hex::encode(kp.public().as_ref());
|
||||
let nonce = b"test-challenge-nonce-1234567890ab";
|
||||
|
||||
let sig = sign(&kp, nonce);
|
||||
let sig_hex = hex::encode(sig.as_ref());
|
||||
|
||||
// Trusted list does NOT contain this key.
|
||||
let trusted = vec!["aaaa".repeat(16)];
|
||||
assert!(
|
||||
!super::verify_auth(&trusted, nonce, &pubkey_hex, &sig_hex),
|
||||
"Signature from untrusted key must be rejected"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn verify_auth_rejects_wrong_signature() {
|
||||
use bft_json_crdt::keypair::make_keypair;
|
||||
use fastcrypto::traits::KeyPair;
|
||||
|
||||
let kp = make_keypair();
|
||||
let pubkey_hex = hex::encode(kp.public().as_ref());
|
||||
let nonce = b"test-challenge-nonce-1234567890ab";
|
||||
|
||||
// Fabricate an invalid signature (all zeros, 64 bytes).
|
||||
let bad_sig_hex = "00".repeat(64);
|
||||
|
||||
let trusted = vec![pubkey_hex.clone()];
|
||||
assert!(
|
||||
!super::verify_auth(&trusted, nonce, &pubkey_hex, &bad_sig_hex),
|
||||
"Invalid signature must be rejected even if key is trusted"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn verify_auth_rejects_wrong_nonce() {
|
||||
use bft_json_crdt::keypair::{make_keypair, sign};
|
||||
use fastcrypto::traits::KeyPair;
|
||||
|
||||
let kp = make_keypair();
|
||||
let pubkey_hex = hex::encode(kp.public().as_ref());
|
||||
|
||||
// Sign one nonce but verify against a different one.
|
||||
let nonce_a = b"nonce-aaaaaaaaaaaaaaaaaaaaaaaaaaaa";
|
||||
let nonce_b = b"nonce-bbbbbbbbbbbbbbbbbbbbbbbbbbbb";
|
||||
|
||||
let sig = sign(&kp, nonce_a);
|
||||
let sig_hex = hex::encode(sig.as_ref());
|
||||
|
||||
let trusted = vec![pubkey_hex.clone()];
|
||||
assert!(
|
||||
!super::verify_auth(&trusted, nonce_b, &pubkey_hex, &sig_hex),
|
||||
"Signature for wrong nonce must be rejected"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn verify_auth_rejects_malformed_hex() {
|
||||
let trusted = vec!["abcd".to_string()];
|
||||
assert!(!super::verify_auth(&trusted, b"nonce", "abcd", "not-hex!!"));
|
||||
assert!(!super::verify_auth(&trusted, b"nonce", "not-hex!!", "abcd"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn verify_auth_empty_trusted_keys_rejects_all() {
|
||||
use bft_json_crdt::keypair::{make_keypair, sign};
|
||||
use fastcrypto::traits::KeyPair;
|
||||
|
||||
let kp = make_keypair();
|
||||
let pubkey_hex = hex::encode(kp.public().as_ref());
|
||||
let nonce = b"test-nonce-12345678901234567890ab";
|
||||
let sig = sign(&kp, nonce);
|
||||
let sig_hex = hex::encode(sig.as_ref());
|
||||
|
||||
let trusted: Vec<String> = vec![];
|
||||
assert!(
|
||||
!super::verify_auth(&trusted, nonce, &pubkey_hex, &sig_hex),
|
||||
"Empty trusted_keys must reject all auth attempts"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn config_trusted_keys_parsed_from_toml() {
|
||||
let toml_str = r#"
|
||||
trusted_keys = [
|
||||
"aabbccdd00112233445566778899aabbccddeeff00112233445566778899aabb",
|
||||
"11223344556677889900aabbccddeeff11223344556677889900aabbccddeeff",
|
||||
]
|
||||
|
||||
[[agent]]
|
||||
name = "test"
|
||||
"#;
|
||||
let config: crate::config::ProjectConfig = toml::from_str(toml_str).unwrap();
|
||||
assert_eq!(config.trusted_keys.len(), 2);
|
||||
assert_eq!(
|
||||
config.trusted_keys[0],
|
||||
"aabbccdd00112233445566778899aabbccddeeff00112233445566778899aabb"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn config_trusted_keys_defaults_to_empty() {
|
||||
let config = crate::config::ProjectConfig::default();
|
||||
assert!(config.trusted_keys.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn handle_incoming_text_ignores_challenge_and_auth_messages() {
|
||||
// Challenge and Auth messages in the sync loop should be silently ignored.
|
||||
let challenge = SyncMessage::Challenge {
|
||||
nonce: "abc123".to_string(),
|
||||
};
|
||||
let json = serde_json::to_string(&challenge).unwrap();
|
||||
handle_incoming_text(&json); // must not panic
|
||||
|
||||
let auth = SyncMessage::Auth {
|
||||
pubkey: "dead".to_string(),
|
||||
signature: "beef".to_string(),
|
||||
};
|
||||
let json = serde_json::to_string(&auth).unwrap();
|
||||
handle_incoming_text(&json); // must not panic
|
||||
}
|
||||
}
|
||||
|
||||
@@ -158,13 +158,19 @@ pub(super) async fn tool_status(args: &Value, ctx: &AppContext) -> Result<String
|
||||
|
||||
let root = ctx.state.get_project_root()?;
|
||||
|
||||
// Read from CRDT/DB content store — works for stories in any pipeline stage.
|
||||
let _typed_item = crate::pipeline_state::read_typed(story_id)
|
||||
// Read from CRDT/DB content store — verify the item is in 2_current.
|
||||
let typed_item = crate::pipeline_state::read_typed(story_id)
|
||||
.map_err(|e| format!("Failed to read pipeline state: {e}"))?
|
||||
.ok_or_else(|| format!(
|
||||
"Story '{story_id}' not found in the pipeline."
|
||||
"Story '{story_id}' not found in work/2_current/. Check the story_id and ensure it is in the current stage."
|
||||
))?;
|
||||
|
||||
if typed_item.stage.dir_name() != "2_current" {
|
||||
return Err(format!(
|
||||
"Story '{story_id}' not found in work/2_current/. Check the story_id and ensure it is in the current stage."
|
||||
));
|
||||
}
|
||||
|
||||
let contents = crate::db::read_content(story_id).ok_or_else(|| {
|
||||
format!("Story '{story_id}' has no content in the content store.")
|
||||
})?;
|
||||
@@ -328,7 +334,7 @@ mod tests {
|
||||
let ctx = crate::http::context::AppContext::new_test(tmp.path().to_path_buf());
|
||||
let result = tool_status(&json!({"story_id": "999_story_nonexistent"}), &ctx).await;
|
||||
assert!(result.is_err());
|
||||
assert!(result.unwrap_err().contains("not found in the pipeline"));
|
||||
assert!(result.unwrap_err().contains("not found in work/2_current/"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -356,22 +362,4 @@ mod tests {
|
||||
assert_eq!(ac[1]["text"], "Second criterion");
|
||||
assert_eq!(ac[1]["checked"], true);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn tool_status_works_for_story_in_backlog() {
|
||||
let tmp = tempdir().unwrap();
|
||||
|
||||
crate::db::ensure_content_store();
|
||||
let story_content = "---\nname: Backlog Story\n---\n\n## Acceptance Criteria\n\n- [ ] One thing\n";
|
||||
crate::db::write_item_with_content("9887_story_backlog_test", "1_backlog", story_content);
|
||||
|
||||
let ctx = crate::http::context::AppContext::new_test(tmp.path().to_path_buf());
|
||||
let result = tool_status(&json!({"story_id": "9887_story_backlog_test"}), &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
let parsed: serde_json::Value = serde_json::from_str(&result).unwrap();
|
||||
|
||||
assert_eq!(parsed["story_id"], "9887_story_backlog_test");
|
||||
assert_eq!(parsed["front_matter"]["name"], "Backlog Story");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -337,11 +337,6 @@ async fn main() -> Result<(), std::io::Error> {
|
||||
}
|
||||
}
|
||||
|
||||
// Log the node's public key so operators can add it to trusted_keys.
|
||||
if let Some(node_id) = crdt_state::our_node_id() {
|
||||
slog!("[crdt] Node public key: {node_id}");
|
||||
}
|
||||
|
||||
// (CRDT state layer is initialised above alongside the legacy pipeline.db.)
|
||||
|
||||
// Start the CRDT sync rendezvous client if configured in project.toml.
|
||||
|
||||
@@ -529,7 +529,6 @@ mod tests {
|
||||
rate_limit_notifications: true,
|
||||
timezone: None,
|
||||
rendezvous: None,
|
||||
trusted_keys: Vec::new(),
|
||||
};
|
||||
// Should complete without panic
|
||||
run_setup_commands(tmp.path(), &config).await;
|
||||
@@ -555,7 +554,6 @@ mod tests {
|
||||
rate_limit_notifications: true,
|
||||
timezone: None,
|
||||
rendezvous: None,
|
||||
trusted_keys: Vec::new(),
|
||||
};
|
||||
// Should complete without panic
|
||||
run_setup_commands(tmp.path(), &config).await;
|
||||
@@ -581,7 +579,6 @@ mod tests {
|
||||
rate_limit_notifications: true,
|
||||
timezone: None,
|
||||
rendezvous: None,
|
||||
trusted_keys: Vec::new(),
|
||||
};
|
||||
// Setup command failures are non-fatal — should not panic or propagate
|
||||
run_setup_commands(tmp.path(), &config).await;
|
||||
@@ -607,7 +604,6 @@ mod tests {
|
||||
rate_limit_notifications: true,
|
||||
timezone: None,
|
||||
rendezvous: None,
|
||||
trusted_keys: Vec::new(),
|
||||
};
|
||||
// Teardown failures are best-effort — should not propagate
|
||||
assert!(run_teardown_commands(tmp.path(), &config).await.is_ok());
|
||||
@@ -632,7 +628,6 @@ mod tests {
|
||||
rate_limit_notifications: true,
|
||||
timezone: None,
|
||||
rendezvous: None,
|
||||
trusted_keys: Vec::new(),
|
||||
};
|
||||
let info = create_worktree(&project_root, "42_fresh_test", &config, 3001)
|
||||
.await
|
||||
@@ -664,7 +659,6 @@ mod tests {
|
||||
rate_limit_notifications: true,
|
||||
timezone: None,
|
||||
rendezvous: None,
|
||||
trusted_keys: Vec::new(),
|
||||
};
|
||||
// First creation
|
||||
let _info1 = create_worktree(&project_root, "43_reuse_test", &config, 3001)
|
||||
@@ -737,7 +731,6 @@ mod tests {
|
||||
rate_limit_notifications: true,
|
||||
timezone: None,
|
||||
rendezvous: None,
|
||||
trusted_keys: Vec::new(),
|
||||
};
|
||||
|
||||
let result = remove_worktree_by_story_id(tmp.path(), "99_nonexistent", &config).await;
|
||||
@@ -768,7 +761,6 @@ mod tests {
|
||||
rate_limit_notifications: true,
|
||||
timezone: None,
|
||||
rendezvous: None,
|
||||
trusted_keys: Vec::new(),
|
||||
};
|
||||
create_worktree(&project_root, "88_remove_by_id", &config, 3001)
|
||||
.await
|
||||
@@ -846,7 +838,6 @@ mod tests {
|
||||
rate_limit_notifications: true,
|
||||
timezone: None,
|
||||
rendezvous: None,
|
||||
trusted_keys: Vec::new(),
|
||||
};
|
||||
// Even though setup commands fail, create_worktree must succeed
|
||||
// so the agent can start and fix the problem itself.
|
||||
@@ -880,7 +871,6 @@ mod tests {
|
||||
rate_limit_notifications: true,
|
||||
timezone: None,
|
||||
rendezvous: None,
|
||||
trusted_keys: Vec::new(),
|
||||
};
|
||||
// First creation — no setup commands, should succeed
|
||||
create_worktree(&project_root, "173_reuse_fail", &empty_config, 3001)
|
||||
@@ -904,7 +894,6 @@ mod tests {
|
||||
rate_limit_notifications: true,
|
||||
timezone: None,
|
||||
rendezvous: None,
|
||||
trusted_keys: Vec::new(),
|
||||
};
|
||||
// Second call — worktree exists, setup commands fail, must still succeed
|
||||
let result = create_worktree(&project_root, "173_reuse_fail", &failing_config, 3002).await;
|
||||
@@ -934,7 +923,6 @@ mod tests {
|
||||
rate_limit_notifications: true,
|
||||
timezone: None,
|
||||
rendezvous: None,
|
||||
trusted_keys: Vec::new(),
|
||||
};
|
||||
let info = create_worktree(&project_root, "77_remove_async", &config, 3001)
|
||||
.await
|
||||
|
||||
Reference in New Issue
Block a user