huskies: merge 480_story_cryptographic_node_auth_for_distributed_mesh

This commit is contained in:
dave
2026-04-10 19:14:21 +00:00
parent 40893a8cb1
commit 2e0ed98d42
7 changed files with 418 additions and 21 deletions
+1
View File
@@ -42,6 +42,7 @@ sqlx = { workspace = true }
wait-timeout = "0.2.1"
bft-json-crdt = { path = "../crates/bft-json-crdt", default-features = false, features = ["bft"] }
fastcrypto = "0.1.8"
hex = "0.4"
indexmap = { version = "2.2.6", features = ["serde"] }
[target.'cfg(unix)'.dependencies]
+11
View File
@@ -53,6 +53,13 @@ pub struct ProjectConfig {
/// so both machines see the same pipeline state in real-time.
#[serde(default)]
pub rendezvous: Option<String>,
/// List of hex-encoded Ed25519 public keys of trusted nodes.
/// When non-empty, only nodes whose public key is in this list may
/// connect via the CRDT sync WebSocket. Nodes authenticate by signing
/// a random challenge with their private key.
/// When empty (default), the mesh is open — any node may connect.
#[serde(default)]
pub trusted_keys: Vec<String>,
}
/// Configuration for the filesystem watcher's sweep behaviour.
@@ -228,6 +235,7 @@ impl Default for ProjectConfig {
rate_limit_notifications: default_rate_limit_notifications(),
timezone: None,
rendezvous: None,
trusted_keys: Vec::new(),
}
}
}
@@ -304,6 +312,7 @@ impl ProjectConfig {
rate_limit_notifications: legacy.rate_limit_notifications,
timezone: legacy.timezone,
rendezvous: None,
trusted_keys: Vec::new(),
};
validate_agents(&config.agent)?;
return Ok(config);
@@ -332,6 +341,7 @@ impl ProjectConfig {
rate_limit_notifications: legacy.rate_limit_notifications,
timezone: legacy.timezone,
rendezvous: None,
trusted_keys: Vec::new(),
};
validate_agents(&config.agent)?;
Ok(config)
@@ -348,6 +358,7 @@ impl ProjectConfig {
rate_limit_notifications: legacy.rate_limit_notifications,
timezone: legacy.timezone,
rendezvous: None,
trusted_keys: Vec::new(),
})
}
}
+13
View File
@@ -600,6 +600,19 @@ pub fn our_node_id() -> Option<String> {
Some(hex::encode(&state.crdt.id))
}
/// Produce an Ed25519 signature over `message` using this node's keypair.
///
/// The CRDT sync auth handshake calls this to answer a remote peer's
/// challenge nonce, proving the node holds the private key behind its
/// public node ID.
///
/// Returns the raw signature bytes, or `None` when the global CRDT state
/// has not been set up yet (before `init()`) or its mutex cannot be locked.
pub fn sign_bytes(message: &[u8]) -> Option<Vec<u8>> {
    let guard = CRDT_STATE.get()?.lock().ok()?;
    let signature = bft_json_crdt::keypair::sign(&guard.keypair, message);
    let raw: Vec<u8> = signature.as_ref().to_vec();
    Some(raw)
}
/// Write a claim on a pipeline item via CRDT.
///
/// Sets `claimed_by` to this node's ID and `claimed_at` to the current time.
+375 -21
View File
@@ -3,7 +3,17 @@
///
/// # Protocol
///
/// The sync protocol is a hybrid of two frame types:
/// ## Authentication (optional)
///
/// When `trusted_keys` is configured in `project.toml`, nodes authenticate
/// on WebSocket connect via an Ed25519 challenge-response handshake:
///
/// 1. Server sends `{"type":"challenge","nonce":"<hex>"}` (32 random bytes).
/// 2. Client responds `{"type":"auth","pubkey":"<hex>","signature":"<hex>"}`.
/// 3. Server verifies the signature and checks the pubkey is in `trusted_keys`.
/// 4. On failure the connection is closed immediately.
///
/// When `trusted_keys` is empty (default), auth is skipped — the mesh is open.
///
/// ## Text frames (bulk initial state)
/// A JSON object with a `"type"` field:
@@ -47,6 +57,62 @@ enum SyncMessage {
Bulk { ops: Vec<String> },
/// A single new op.
Op { op: String },
/// Challenge sent by the server when `trusted_keys` is configured.
Challenge { nonce: String },
/// Auth response from the client: pubkey + signature over the nonce.
Auth {
pubkey: String,
signature: String,
},
}
/// Size in bytes of the random challenge nonce used by the auth handshake.
///
/// The nonce is hex-encoded before being sent, so it occupies
/// `2 * CHALLENGE_NONCE_BYTES` (64) characters in the challenge message.
pub const CHALLENGE_NONCE_BYTES: usize = 32;
/// Load the list of trusted Ed25519 public keys from the project config.
///
/// Returns an empty vec when no project root is set or no keys are configured.
fn load_trusted_keys(ctx: &AppContext) -> Vec<String> {
let root = ctx.state.project_root.lock().unwrap().clone();
let Some(root) = root else { return Vec::new() };
crate::config::ProjectConfig::load(&root)
.map(|cfg| cfg.trusted_keys)
.unwrap_or_default()
}
/// Verify an auth response against the challenge nonce and trusted keys list.
///
/// Returns `true` only when both of the following hold:
///
/// * `pubkey_hex` matches an entry of `trusted_keys`. The comparison ignores
///   ASCII case, because hex decoding accepts both `a-f` and `A-F` and the
///   same key may therefore be spelled either way in the config or on the
///   wire.
/// * `signature_hex` decodes to an Ed25519 signature over `nonce_bytes` that
///   verifies under the claimed public key.
///
/// Any malformed input (invalid hex, bytes that do not parse as an Ed25519
/// public key or signature) yields `false`. An empty `trusted_keys` slice
/// rejects every attempt; callers that want an open mesh must skip the
/// handshake rather than call this function.
pub fn verify_auth(
    trusted_keys: &[String],
    nonce_bytes: &[u8],
    pubkey_hex: &str,
    signature_hex: &str,
) -> bool {
    use bft_json_crdt::keypair::{Ed25519PublicKey, Ed25519Signature, verify};
    use fastcrypto::traits::ToFromBytes;

    // Check the pubkey is trusted before doing any decoding or crypto work.
    let is_trusted = trusted_keys
        .iter()
        .any(|k| k.eq_ignore_ascii_case(pubkey_hex));
    if !is_trusted {
        return false;
    }

    // Decode pubkey and signature from hex.
    let Ok(pubkey_bytes) = hex::decode(pubkey_hex) else {
        return false;
    };
    let Ok(sig_bytes) = hex::decode(signature_hex) else {
        return false;
    };

    // Parse the raw bytes into typed key material; bad bytes are rejected.
    let Ok(pubkey) = Ed25519PublicKey::from_bytes(&pubkey_bytes) else {
        return false;
    };
    let Ok(signature) = Ed25519Signature::from_bytes(&sig_bytes) else {
        return false;
    };

    verify(pubkey, nonce_bytes, signature)
}
// ── Server-side WebSocket handler ───────────────────────────────────
@@ -54,14 +120,60 @@ enum SyncMessage {
#[handler]
pub async fn crdt_sync_handler(
ws: WebSocket,
_ctx: Data<&Arc<AppContext>>,
ctx: Data<&Arc<AppContext>>,
) -> impl poem::IntoResponse {
ws.on_upgrade(|socket| async move {
let trusted_keys = load_trusted_keys(&ctx);
ws.on_upgrade(move |socket| async move {
let (mut sink, mut stream) = socket.split();
slog!("[crdt-sync] Peer connected");
// Send bulk state dump.
// ── Auth handshake (when trusted_keys is configured) ────────
if !trusted_keys.is_empty() {
// Generate random nonce.
let nonce_bytes: [u8; CHALLENGE_NONCE_BYTES] = {
let mut buf = [0u8; CHALLENGE_NONCE_BYTES];
use sha2::Digest;
let hash = sha2::Sha256::digest(uuid::Uuid::new_v4().as_bytes());
buf.copy_from_slice(&hash);
buf
};
let nonce_hex = hex::encode(nonce_bytes);
// Send challenge.
let challenge = SyncMessage::Challenge { nonce: nonce_hex.clone() };
if let Ok(json) = serde_json::to_string(&challenge)
&& sink.send(WsMessage::Text(json)).await.is_err()
{
return;
}
// Wait for auth response (with timeout).
let auth_result = tokio::time::timeout(
std::time::Duration::from_secs(10),
stream.next(),
).await;
let authenticated = match auth_result {
Ok(Some(Ok(WsMessage::Text(text)))) => {
match serde_json::from_str::<SyncMessage>(&text) {
Ok(SyncMessage::Auth { pubkey, signature }) => {
verify_auth(&trusted_keys, &nonce_bytes, &pubkey, &signature)
}
_ => false,
}
}
_ => false,
};
if !authenticated {
slog!("[crdt-sync] Peer failed authentication; disconnecting");
return;
}
slog!("[crdt-sync] Peer authenticated successfully");
}
// ── Bulk state dump + sync loop ─────────────────────────────
if let Some(ops) = crdt_state::all_ops_json() {
let msg = SyncMessage::Bulk { ops };
if let Ok(json) = serde_json::to_string(&msg)
@@ -118,6 +230,22 @@ pub async fn crdt_sync_handler(
})
}
/// Apply every op in a bulk dump of serialised `SignedOp` JSON strings.
///
/// Entries that fail to deserialise are skipped. Each remaining op is handed
/// to `crdt_state::apply_remote_op`, and the number of ops it reports as
/// applied is logged next to the number received.
fn handle_bulk_ops(ops: &[String]) {
    let applied: u64 = ops
        .iter()
        .filter_map(|raw| serde_json::from_str::<SignedOp>(raw).ok())
        .map(|signed_op| u64::from(crdt_state::apply_remote_op(signed_op)))
        .sum();
    slog!(
        "[crdt-sync] Bulk sync: received {} ops, applied {applied}",
        ops.len()
    );
}
/// Process an incoming text-frame sync message from a peer.
///
/// Text frames carry the bulk state dump (`SyncMessage::Bulk`) or legacy
@@ -133,24 +261,16 @@ fn handle_incoming_text(text: &str) {
match msg {
SyncMessage::Bulk { ops } => {
let mut applied = 0u64;
for op_json in &ops {
if let Ok(signed_op) = serde_json::from_str::<SignedOp>(op_json)
&& crdt_state::apply_remote_op(signed_op)
{
applied += 1;
}
}
slog!(
"[crdt-sync] Bulk sync: received {} ops, applied {applied}",
ops.len()
);
handle_bulk_ops(&ops);
}
SyncMessage::Op { op } => {
if let Ok(signed_op) = serde_json::from_str::<SignedOp>(&op) {
crdt_state::apply_remote_op(signed_op);
}
}
// Challenge/Auth are only used during the initial handshake,
// not during the main sync loop.
SyncMessage::Challenge { .. } | SyncMessage::Auth { .. } => {}
}
}
@@ -212,7 +332,13 @@ pub fn spawn_rendezvous_client(url: String) {
}
/// Connect to a remote sync endpoint and exchange ops until disconnect.
///
/// If the remote sends a `Challenge` message, this node responds with an
/// `Auth` message signed by its Ed25519 keypair. The remote then verifies
/// the signature against its `trusted_keys` list.
async fn connect_and_sync(url: &str) -> Result<(), String> {
use tokio_tungstenite::tungstenite::Message as TungsteniteMsg;
let (ws_stream, _) = tokio_tungstenite::connect_async(url)
.await
.map_err(|e| format!("WebSocket connect failed: {e}"))?;
@@ -221,11 +347,59 @@ async fn connect_and_sync(url: &str) -> Result<(), String> {
slog!("[crdt-sync] Connected to rendezvous peer");
// ── Handle auth challenge if the remote sends one ───────────
// Peek at the first message: if it's a Challenge, respond with Auth.
// If it's a Bulk (no auth required), process it immediately.
let first_frame = stream
.next()
.await
.ok_or_else(|| "connection closed before first message".to_string())?
.map_err(|e| format!("read error on first frame: {e}"))?;
match &first_frame {
TungsteniteMsg::Text(text) => {
match serde_json::from_str::<SyncMessage>(text.as_ref()) {
Ok(SyncMessage::Challenge { nonce }) => {
slog!("[crdt-sync] Received auth challenge from rendezvous peer");
let nonce_bytes = hex::decode(&nonce)
.map_err(|e| format!("bad challenge nonce hex: {e}"))?;
let pubkey_hex = crdt_state::our_node_id()
.ok_or_else(|| "CRDT not initialised".to_string())?;
let sig_bytes = crdt_state::sign_bytes(&nonce_bytes)
.ok_or_else(|| "CRDT not initialised".to_string())?;
let auth = SyncMessage::Auth {
pubkey: pubkey_hex,
signature: hex::encode(sig_bytes),
};
let json = serde_json::to_string(&auth)
.map_err(|e| format!("serialize auth: {e}"))?;
sink.send(TungsteniteMsg::Text(json.into()))
.await
.map_err(|e| format!("send auth failed: {e}"))?;
slog!("[crdt-sync] Sent auth response");
}
Ok(SyncMessage::Bulk { ops }) => {
// No auth required — process bulk immediately.
handle_bulk_ops(&ops);
}
_ => {
handle_incoming_text(text.as_ref());
}
}
}
TungsteniteMsg::Binary(bytes) => {
handle_incoming_binary(bytes);
}
TungsteniteMsg::Close(_) => {
return Ok(());
}
_ => {}
}
// Send our bulk state.
if let Some(ops) = crdt_state::all_ops_json() {
let msg = SyncMessage::Bulk { ops };
if let Ok(json) = serde_json::to_string(&msg) {
use tokio_tungstenite::tungstenite::Message as TungsteniteMsg;
sink.send(TungsteniteMsg::Text(json.into()))
.await
.map_err(|e| format!("Send bulk failed: {e}"))?;
@@ -244,7 +418,6 @@ async fn connect_and_sync(url: &str) -> Result<(), String> {
Ok(signed_op) => {
// Encode via wire codec and send as binary frame.
let bytes = crdt_wire::encode(&signed_op);
use tokio_tungstenite::tungstenite::Message as TungsteniteMsg;
if sink.send(TungsteniteMsg::Binary(bytes.into())).await.is_err() {
break;
}
@@ -258,13 +431,13 @@ async fn connect_and_sync(url: &str) -> Result<(), String> {
}
frame = stream.next() => {
match frame {
Some(Ok(tokio_tungstenite::tungstenite::Message::Text(text))) => {
Some(Ok(TungsteniteMsg::Text(text))) => {
handle_incoming_text(text.as_ref());
}
Some(Ok(tokio_tungstenite::tungstenite::Message::Binary(bytes))) => {
Some(Ok(TungsteniteMsg::Binary(bytes))) => {
handle_incoming_binary(&bytes);
}
Some(Ok(tokio_tungstenite::tungstenite::Message::Close(_))) | None => break,
Some(Ok(TungsteniteMsg::Close(_))) | None => break,
Some(Err(e)) => {
slog!("[crdt-sync] Rendezvous read error: {e}");
break;
@@ -1525,4 +1698,185 @@ name = "test"
let view_b = serde_json::to_string(&CrdtNode::view(&crdt_b.doc.items)).unwrap();
assert_eq!(view_a, view_b, "Both nodes must converge to identical state");
}
// ── Auth protocol tests ─────────────────────────────────────────────
/// A `Challenge` message keeps its type tag and nonce through a JSON round trip.
#[test]
fn sync_message_challenge_serialization_roundtrip() {
    let original = SyncMessage::Challenge {
        nonce: String::from("abcd1234"),
    };

    let encoded = serde_json::to_string(&original).unwrap();
    assert!(encoded.contains(r#""type":"challenge""#));

    let decoded: SyncMessage = serde_json::from_str(&encoded).unwrap();
    let SyncMessage::Challenge { nonce } = decoded else {
        panic!("Expected Challenge");
    };
    assert_eq!(nonce, "abcd1234");
}
/// An `Auth` message keeps its type tag, pubkey and signature through a JSON round trip.
#[test]
fn sync_message_auth_serialization_roundtrip() {
    let original = SyncMessage::Auth {
        pubkey: String::from("deadbeef"),
        signature: String::from("cafebabe"),
    };

    let encoded = serde_json::to_string(&original).unwrap();
    assert!(encoded.contains(r#""type":"auth""#));

    let decoded: SyncMessage = serde_json::from_str(&encoded).unwrap();
    let SyncMessage::Auth { pubkey, signature } = decoded else {
        panic!("Expected Auth");
    };
    assert_eq!(pubkey, "deadbeef");
    assert_eq!(signature, "cafebabe");
}
/// A correct signature from a key on the trusted list is accepted.
#[test]
fn verify_auth_accepts_valid_signature() {
    use bft_json_crdt::keypair::{make_keypair, sign};
    use fastcrypto::traits::KeyPair;

    let keypair = make_keypair();
    let challenge = b"test-challenge-nonce-1234567890ab";

    let pubkey_hex = hex::encode(keypair.public().as_ref());
    let signature = sign(&keypair, challenge);
    let sig_hex = hex::encode(signature.as_ref());

    let trusted = [pubkey_hex.clone()];
    let accepted = super::verify_auth(&trusted, challenge, &pubkey_hex, &sig_hex);
    assert!(
        accepted,
        "Valid signature from trusted key must be accepted"
    );
}
/// A correct signature is still refused when the signer's key is not trusted.
#[test]
fn verify_auth_rejects_untrusted_key() {
    use bft_json_crdt::keypair::{make_keypair, sign};
    use fastcrypto::traits::KeyPair;

    let keypair = make_keypair();
    let challenge = b"test-challenge-nonce-1234567890ab";

    let pubkey_hex = hex::encode(keypair.public().as_ref());
    let signature = sign(&keypair, challenge);
    let sig_hex = hex::encode(signature.as_ref());

    // The only trusted entry is some other key, not the signer's.
    let trusted = ["aaaa".repeat(16)];
    let accepted = super::verify_auth(&trusted, challenge, &pubkey_hex, &sig_hex);
    assert!(
        !accepted,
        "Signature from untrusted key must be rejected"
    );
}
/// A trusted key does not help when the signature itself is bogus.
#[test]
fn verify_auth_rejects_wrong_signature() {
    use bft_json_crdt::keypair::make_keypair;
    use fastcrypto::traits::KeyPair;

    let keypair = make_keypair();
    let challenge = b"test-challenge-nonce-1234567890ab";
    let pubkey_hex = hex::encode(keypair.public().as_ref());

    // 64 zero bytes: signature-sized, but not a signature of the challenge.
    let forged_sig_hex = "00".repeat(64);

    let trusted = [pubkey_hex.clone()];
    let accepted = super::verify_auth(&trusted, challenge, &pubkey_hex, &forged_sig_hex);
    assert!(
        !accepted,
        "Invalid signature must be rejected even if key is trusted"
    );
}
/// A signature made over one nonce must not validate against another nonce.
#[test]
fn verify_auth_rejects_wrong_nonce() {
    use bft_json_crdt::keypair::{make_keypair, sign};
    use fastcrypto::traits::KeyPair;

    let keypair = make_keypair();
    let pubkey_hex = hex::encode(keypair.public().as_ref());

    // The signature covers `signed_nonce`; verification is attempted with
    // `presented_nonce`.
    let signed_nonce = b"nonce-aaaaaaaaaaaaaaaaaaaaaaaaaaaa";
    let presented_nonce = b"nonce-bbbbbbbbbbbbbbbbbbbbbbbbbbbb";
    let signature = sign(&keypair, signed_nonce);
    let sig_hex = hex::encode(signature.as_ref());

    let trusted = [pubkey_hex.clone()];
    let accepted = super::verify_auth(&trusted, presented_nonce, &pubkey_hex, &sig_hex);
    assert!(
        !accepted,
        "Signature for wrong nonce must be rejected"
    );
}
/// Malformed hex in either field is rejected rather than causing a panic.
#[test]
fn verify_auth_rejects_malformed_hex() {
    let trusted = vec!["abcd".to_string()];

    // Trusted pubkey with valid hex, but the signature is not hex at all.
    assert!(!super::verify_auth(&trusted, b"nonce", "abcd", "not-hex!!"));

    // Malformed pubkey that is not on the trusted list.
    assert!(!super::verify_auth(&trusted, b"nonce", "not-hex!!", "abcd"));

    // Malformed pubkey that IS on the trusted list. The case above is turned
    // away by the trust check before any decoding happens, so this is the
    // only case that actually exercises pubkey hex decoding.
    let trusted_malformed = vec!["not-hex!!".to_string()];
    assert!(!super::verify_auth(
        &trusted_malformed,
        b"nonce",
        "not-hex!!",
        "abcd"
    ));
}
/// With no trusted keys at all, even a perfectly valid signature is refused.
#[test]
fn verify_auth_empty_trusted_keys_rejects_all() {
    use bft_json_crdt::keypair::{make_keypair, sign};
    use fastcrypto::traits::KeyPair;

    let keypair = make_keypair();
    let challenge = b"test-nonce-12345678901234567890ab";

    let pubkey_hex = hex::encode(keypair.public().as_ref());
    let signature = sign(&keypair, challenge);
    let sig_hex = hex::encode(signature.as_ref());

    let no_keys: Vec<String> = Vec::new();
    let accepted = super::verify_auth(&no_keys, challenge, &pubkey_hex, &sig_hex);
    assert!(
        !accepted,
        "Empty trusted_keys must reject all auth attempts"
    );
}
/// `trusted_keys` entries in the project TOML land in `ProjectConfig::trusted_keys` in order.
#[test]
fn config_trusted_keys_parsed_from_toml() {
    let toml_str = r#"
trusted_keys = [
"aabbccdd00112233445566778899aabbccddeeff00112233445566778899aabb",
"11223344556677889900aabbccddeeff11223344556677889900aabbccddeeff",
]
[[agent]]
name = "test"
"#;
    let parsed: crate::config::ProjectConfig = toml::from_str(toml_str).unwrap();
    let keys = &parsed.trusted_keys;
    assert_eq!(keys.len(), 2);
    assert_eq!(
        keys[0],
        "aabbccdd00112233445566778899aabbccddeeff00112233445566778899aabb"
    );
}
/// A default `ProjectConfig` carries no trusted keys.
#[test]
fn config_trusted_keys_defaults_to_empty() {
    let defaults = crate::config::ProjectConfig::default();
    let keys = &defaults.trusted_keys;
    assert!(keys.is_empty());
}
/// Handshake-only messages reaching the sync loop are ignored without panicking.
#[test]
fn handle_incoming_text_ignores_challenge_and_auth_messages() {
    // Challenge first, then Auth; neither belongs in the main sync loop.
    let handshake_frames = [
        SyncMessage::Challenge {
            nonce: "abc123".to_string(),
        },
        SyncMessage::Auth {
            pubkey: "dead".to_string(),
            signature: "beef".to_string(),
        },
    ];

    for frame in &handshake_frames {
        let encoded = serde_json::to_string(frame).unwrap();
        handle_incoming_text(&encoded); // must not panic
    }
}
}
+5
View File
@@ -337,6 +337,11 @@ async fn main() -> Result<(), std::io::Error> {
}
}
// Log the node's public key so operators can add it to trusted_keys.
if let Some(node_id) = crdt_state::our_node_id() {
slog!("[crdt] Node public key: {node_id}");
}
// (CRDT state layer is initialised above alongside the legacy pipeline.db.)
// Start the CRDT sync rendezvous client if configured in project.toml.
+12
View File
@@ -529,6 +529,7 @@ mod tests {
rate_limit_notifications: true,
timezone: None,
rendezvous: None,
trusted_keys: Vec::new(),
};
// Should complete without panic
run_setup_commands(tmp.path(), &config).await;
@@ -554,6 +555,7 @@ mod tests {
rate_limit_notifications: true,
timezone: None,
rendezvous: None,
trusted_keys: Vec::new(),
};
// Should complete without panic
run_setup_commands(tmp.path(), &config).await;
@@ -579,6 +581,7 @@ mod tests {
rate_limit_notifications: true,
timezone: None,
rendezvous: None,
trusted_keys: Vec::new(),
};
// Setup command failures are non-fatal — should not panic or propagate
run_setup_commands(tmp.path(), &config).await;
@@ -604,6 +607,7 @@ mod tests {
rate_limit_notifications: true,
timezone: None,
rendezvous: None,
trusted_keys: Vec::new(),
};
// Teardown failures are best-effort — should not propagate
assert!(run_teardown_commands(tmp.path(), &config).await.is_ok());
@@ -628,6 +632,7 @@ mod tests {
rate_limit_notifications: true,
timezone: None,
rendezvous: None,
trusted_keys: Vec::new(),
};
let info = create_worktree(&project_root, "42_fresh_test", &config, 3001)
.await
@@ -659,6 +664,7 @@ mod tests {
rate_limit_notifications: true,
timezone: None,
rendezvous: None,
trusted_keys: Vec::new(),
};
// First creation
let _info1 = create_worktree(&project_root, "43_reuse_test", &config, 3001)
@@ -731,6 +737,7 @@ mod tests {
rate_limit_notifications: true,
timezone: None,
rendezvous: None,
trusted_keys: Vec::new(),
};
let result = remove_worktree_by_story_id(tmp.path(), "99_nonexistent", &config).await;
@@ -761,6 +768,7 @@ mod tests {
rate_limit_notifications: true,
timezone: None,
rendezvous: None,
trusted_keys: Vec::new(),
};
create_worktree(&project_root, "88_remove_by_id", &config, 3001)
.await
@@ -838,6 +846,7 @@ mod tests {
rate_limit_notifications: true,
timezone: None,
rendezvous: None,
trusted_keys: Vec::new(),
};
// Even though setup commands fail, create_worktree must succeed
// so the agent can start and fix the problem itself.
@@ -871,6 +880,7 @@ mod tests {
rate_limit_notifications: true,
timezone: None,
rendezvous: None,
trusted_keys: Vec::new(),
};
// First creation — no setup commands, should succeed
create_worktree(&project_root, "173_reuse_fail", &empty_config, 3001)
@@ -894,6 +904,7 @@ mod tests {
rate_limit_notifications: true,
timezone: None,
rendezvous: None,
trusted_keys: Vec::new(),
};
// Second call — worktree exists, setup commands fail, must still succeed
let result = create_worktree(&project_root, "173_reuse_fail", &failing_config, 3002).await;
@@ -923,6 +934,7 @@ mod tests {
rate_limit_notifications: true,
timezone: None,
rendezvous: None,
trusted_keys: Vec::new(),
};
let info = create_worktree(&project_root, "77_remove_async", &config, 3001)
.await