diff --git a/Cargo.lock b/Cargo.lock
index 1b034e9a..ba345617 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2309,6 +2309,7 @@ dependencies = [
   "fastcrypto",
   "filetime",
   "futures",
+ "hex",
   "homedir",
   "ignore",
   "indexmap 2.13.1",
diff --git a/server/Cargo.toml b/server/Cargo.toml
index d3cf2867..f92e5288 100644
--- a/server/Cargo.toml
+++ b/server/Cargo.toml
@@ -42,6 +42,7 @@ sqlx = { workspace = true }
 wait-timeout = "0.2.1"
 bft-json-crdt = { path = "../crates/bft-json-crdt", default-features = false, features = ["bft"] }
 fastcrypto = "0.1.8"
+hex = "0.4"
 indexmap = { version = "2.2.6", features = ["serde"] }
 
 [target.'cfg(unix)'.dependencies]
diff --git a/server/src/config.rs b/server/src/config.rs
index 2b5f6025..c05972e6 100644
--- a/server/src/config.rs
+++ b/server/src/config.rs
@@ -53,6 +53,13 @@ pub struct ProjectConfig {
     /// so both machines see the same pipeline state in real-time.
     #[serde(default)]
     pub rendezvous: Option<String>,
+    /// List of hex-encoded Ed25519 public keys of trusted nodes.
+    /// When non-empty, only nodes whose public key is in this list may
+    /// connect via the CRDT sync WebSocket. Nodes authenticate by signing
+    /// a random challenge with their private key.
+    /// When empty (default), the mesh is open — any node may connect.
+    #[serde(default)]
+    pub trusted_keys: Vec<String>,
 }
 
 /// Configuration for the filesystem watcher's sweep behaviour.
@@ -228,6 +235,7 @@ impl Default for ProjectConfig {
             rate_limit_notifications: default_rate_limit_notifications(),
             timezone: None,
             rendezvous: None,
+            trusted_keys: Vec::new(),
         }
     }
 }
@@ -304,6 +312,7 @@ impl ProjectConfig {
                     rate_limit_notifications: legacy.rate_limit_notifications,
                     timezone: legacy.timezone,
                     rendezvous: None,
+                    trusted_keys: Vec::new(),
                 };
                 validate_agents(&config.agent)?;
                 return Ok(config);
@@ -332,6 +341,7 @@ impl ProjectConfig {
                 rate_limit_notifications: legacy.rate_limit_notifications,
                 timezone: legacy.timezone,
                 rendezvous: None,
+                trusted_keys: Vec::new(),
             };
             validate_agents(&config.agent)?;
             Ok(config)
@@ -348,6 +358,7 @@ impl ProjectConfig {
                 rate_limit_notifications: legacy.rate_limit_notifications,
                 timezone: legacy.timezone,
                 rendezvous: None,
+                trusted_keys: Vec::new(),
             })
         }
     }
diff --git a/server/src/crdt_state.rs b/server/src/crdt_state.rs
index 4d1dccd0..fd16306c 100644
--- a/server/src/crdt_state.rs
+++ b/server/src/crdt_state.rs
@@ -600,6 +600,19 @@ pub fn our_node_id() -> Option<String> {
     Some(hex::encode(&state.crdt.id))
 }
 
+/// Sign a byte slice with this node's Ed25519 private key.
+///
+/// Used by the CRDT sync auth handshake: when a remote peer sends a
+/// challenge nonce, this node signs it to prove possession of the
+/// private key corresponding to its public node ID.
+/// Returns `None` before `init()` or when the state lock is poisoned.
+pub fn sign_bytes(message: &[u8]) -> Option<Vec<u8>> {
+    use bft_json_crdt::keypair::sign;
+    let state = CRDT_STATE.get()?.lock().ok()?;
+    let sig = sign(&state.keypair, message);
+    Some(sig.as_ref().to_vec())
+}
+
 /// Write a claim on a pipeline item via CRDT.
 ///
 /// Sets `claimed_by` to this node's ID and `claimed_at` to the current time.
diff --git a/server/src/crdt_sync.rs b/server/src/crdt_sync.rs
index 17ce32e1..403ad5d6 100644
--- a/server/src/crdt_sync.rs
+++ b/server/src/crdt_sync.rs
@@ -3,7 +3,17 @@
 ///
 /// # Protocol
 ///
-/// The sync protocol is a hybrid of two frame types:
+/// ## Authentication (optional)
+///
+/// When `trusted_keys` is configured in `project.toml`, nodes authenticate
+/// on WebSocket connect via an Ed25519 challenge-response handshake:
+///
+/// 1. Server sends `{"type":"challenge","nonce":"<hex>"}` (32 random bytes).
+/// 2. Client responds `{"type":"auth","pubkey":"<hex>","signature":"<hex>"}`.
+/// 3. Server verifies the signature and checks the pubkey is in `trusted_keys`.
+/// 4. On failure the connection is closed immediately.
+///
+/// When `trusted_keys` is empty (default), auth is skipped — the mesh is open.
 ///
 /// ## Text frames (bulk initial state)
 /// A JSON object with a `"type"` field:
@@ -47,6 +57,67 @@ enum SyncMessage {
     Bulk { ops: Vec<String> },
     /// A single new op.
     Op { op: String },
+    /// Challenge sent by the server when `trusted_keys` is configured.
+    Challenge { nonce: String },
+    /// Auth response from the client: pubkey + signature over the nonce.
+    Auth {
+        pubkey: String,
+        signature: String,
+    },
+}
+
+/// Size of the random challenge nonce in bytes (hex-encoded to 64 chars).
+pub const CHALLENGE_NONCE_BYTES: usize = 32;
+
+/// Load the list of trusted Ed25519 public keys from the project config.
+///
+/// Returns an empty vec when no project root is set, when the project config
+/// fails to load (logged, because an empty list means auth is skipped), or
+/// when no keys are configured.
+fn load_trusted_keys(ctx: &AppContext) -> Vec<String> {
+    let root = ctx.state.project_root.lock().unwrap().clone();
+    let Some(root) = root else { return Vec::new() };
+    crate::config::ProjectConfig::load(&root)
+        .map(|cfg| cfg.trusted_keys)
+        .inspect_err(|_| {
+            slog!("[crdt-sync] project config failed to load; no trusted_keys, auth skipped");
+        })
+        .unwrap_or_default()
+}
+
+/// Verify an auth response against the challenge nonce and trusted keys list.
+///
+/// Returns `true` if: the pubkey is in `trusted_keys` AND the signature is a
+/// valid Ed25519 signature of `nonce_bytes` by the claimed pubkey.
+pub fn verify_auth(
+    trusted_keys: &[String],
+    nonce_bytes: &[u8],
+    pubkey_hex: &str,
+    signature_hex: &str,
+) -> bool {
+    use bft_json_crdt::keypair::{Ed25519PublicKey, Ed25519Signature, verify};
+    use fastcrypto::traits::ToFromBytes;
+
+    // Check the pubkey is trusted (hex text is matched case-insensitively).
+    if !trusted_keys.iter().any(|k| k.eq_ignore_ascii_case(pubkey_hex)) {
+        return false;
+    }
+
+    // Decode pubkey and signature from hex.
+    let Ok(pubkey_bytes) = hex::decode(pubkey_hex) else {
+        return false;
+    };
+    let Ok(sig_bytes) = hex::decode(signature_hex) else {
+        return false;
+    };
+    let Ok(pubkey) = Ed25519PublicKey::from_bytes(&pubkey_bytes) else {
+        return false;
+    };
+    let Ok(signature) = Ed25519Signature::from_bytes(&sig_bytes) else {
+        return false;
+    };
+
+    verify(pubkey, nonce_bytes, signature)
 }
 
 // ── Server-side WebSocket handler ───────────────────────────────────
@@ -54,14 +120,60 @@ enum SyncMessage {
 #[handler]
 pub async fn crdt_sync_handler(
     ws: WebSocket,
-    _ctx: Data<&Arc<AppContext>>,
+    ctx: Data<&Arc<AppContext>>,
 ) -> impl poem::IntoResponse {
-    ws.on_upgrade(|socket| async move {
+    let trusted_keys = load_trusted_keys(&ctx);
+    ws.on_upgrade(move |socket| async move {
         let (mut sink, mut stream) = socket.split();
 
         slog!("[crdt-sync] Peer connected");
 
-        // Send bulk state dump.
+        // ── Auth handshake (when trusted_keys is configured) ────────
+        if !trusted_keys.is_empty() {
+            // Generate random nonce.
+            let nonce_bytes: [u8; CHALLENGE_NONCE_BYTES] = {
+                let mut buf = [0u8; CHALLENGE_NONCE_BYTES];
+                use sha2::Digest;
+                let hash = sha2::Sha256::digest(uuid::Uuid::new_v4().as_bytes());
+                buf.copy_from_slice(&hash);
+                buf
+            };
+            let nonce_hex = hex::encode(nonce_bytes);
+
+            // Send challenge.
+            let challenge = SyncMessage::Challenge { nonce: nonce_hex };
+            if let Ok(json) = serde_json::to_string(&challenge)
+                && sink.send(WsMessage::Text(json)).await.is_err()
+            {
+                return;
+            }
+
+            // Wait for auth response (with timeout).
+            let auth_result = tokio::time::timeout(
+                std::time::Duration::from_secs(10),
+                stream.next(),
+            ).await;
+
+            let authenticated = match auth_result {
+                Ok(Some(Ok(WsMessage::Text(text)))) => {
+                    match serde_json::from_str::<SyncMessage>(&text) {
+                        Ok(SyncMessage::Auth { pubkey, signature }) => {
+                            verify_auth(&trusted_keys, &nonce_bytes, &pubkey, &signature)
+                        }
+                        _ => false,
+                    }
+                }
+                _ => false,
+            };
+
+            if !authenticated {
+                slog!("[crdt-sync] Peer failed authentication; disconnecting");
+                return;
+            }
+            slog!("[crdt-sync] Peer authenticated successfully");
+        }
+
+        // ── Bulk state dump + sync loop ─────────────────────────────
         if let Some(ops) = crdt_state::all_ops_json() {
             let msg = SyncMessage::Bulk { ops };
             if let Ok(json) = serde_json::to_string(&msg)
@@ -118,6 +230,22 @@ pub async fn crdt_sync_handler(
     })
 }
 
+/// Apply a bulk set of serialised `SignedOp` JSON strings.
+fn handle_bulk_ops(ops: &[String]) {
+    let mut applied = 0u64;
+    for op_json in ops {
+        if let Ok(signed_op) = serde_json::from_str::<SignedOp>(op_json)
+            && crdt_state::apply_remote_op(signed_op)
+        {
+            applied += 1;
+        }
+    }
+    slog!(
+        "[crdt-sync] Bulk sync: received {} ops, applied {applied}",
+        ops.len()
+    );
+}
+
 /// Process an incoming text-frame sync message from a peer.
/// /// Text frames carry the bulk state dump (`SyncMessage::Bulk`) or legacy @@ -133,24 +261,16 @@ fn handle_incoming_text(text: &str) { match msg { SyncMessage::Bulk { ops } => { - let mut applied = 0u64; - for op_json in &ops { - if let Ok(signed_op) = serde_json::from_str::(op_json) - && crdt_state::apply_remote_op(signed_op) - { - applied += 1; - } - } - slog!( - "[crdt-sync] Bulk sync: received {} ops, applied {applied}", - ops.len() - ); + handle_bulk_ops(&ops); } SyncMessage::Op { op } => { if let Ok(signed_op) = serde_json::from_str::(&op) { crdt_state::apply_remote_op(signed_op); } } + // Challenge/Auth are only used during the initial handshake, + // not during the main sync loop. + SyncMessage::Challenge { .. } | SyncMessage::Auth { .. } => {} } } @@ -212,7 +332,13 @@ pub fn spawn_rendezvous_client(url: String) { } /// Connect to a remote sync endpoint and exchange ops until disconnect. +/// +/// If the remote sends a `Challenge` message, this node responds with an +/// `Auth` message signed by its Ed25519 keypair. The remote then verifies +/// the signature against its `trusted_keys` list. async fn connect_and_sync(url: &str) -> Result<(), String> { + use tokio_tungstenite::tungstenite::Message as TungsteniteMsg; + let (ws_stream, _) = tokio_tungstenite::connect_async(url) .await .map_err(|e| format!("WebSocket connect failed: {e}"))?; @@ -221,11 +347,59 @@ async fn connect_and_sync(url: &str) -> Result<(), String> { slog!("[crdt-sync] Connected to rendezvous peer"); + // ── Handle auth challenge if the remote sends one ─────────── + // Peek at the first message: if it's a Challenge, respond with Auth. + // If it's a Bulk (no auth required), process it immediately. + let first_frame = stream + .next() + .await + .ok_or_else(|| "connection closed before first message".to_string())? 
+ .map_err(|e| format!("read error on first frame: {e}"))?; + + match &first_frame { + TungsteniteMsg::Text(text) => { + match serde_json::from_str::(text.as_ref()) { + Ok(SyncMessage::Challenge { nonce }) => { + slog!("[crdt-sync] Received auth challenge from rendezvous peer"); + let nonce_bytes = hex::decode(&nonce) + .map_err(|e| format!("bad challenge nonce hex: {e}"))?; + let pubkey_hex = crdt_state::our_node_id() + .ok_or_else(|| "CRDT not initialised".to_string())?; + let sig_bytes = crdt_state::sign_bytes(&nonce_bytes) + .ok_or_else(|| "CRDT not initialised".to_string())?; + let auth = SyncMessage::Auth { + pubkey: pubkey_hex, + signature: hex::encode(sig_bytes), + }; + let json = serde_json::to_string(&auth) + .map_err(|e| format!("serialize auth: {e}"))?; + sink.send(TungsteniteMsg::Text(json.into())) + .await + .map_err(|e| format!("send auth failed: {e}"))?; + slog!("[crdt-sync] Sent auth response"); + } + Ok(SyncMessage::Bulk { ops }) => { + // No auth required — process bulk immediately. + handle_bulk_ops(&ops); + } + _ => { + handle_incoming_text(text.as_ref()); + } + } + } + TungsteniteMsg::Binary(bytes) => { + handle_incoming_binary(bytes); + } + TungsteniteMsg::Close(_) => { + return Ok(()); + } + _ => {} + } + // Send our bulk state. if let Some(ops) = crdt_state::all_ops_json() { let msg = SyncMessage::Bulk { ops }; if let Ok(json) = serde_json::to_string(&msg) { - use tokio_tungstenite::tungstenite::Message as TungsteniteMsg; sink.send(TungsteniteMsg::Text(json.into())) .await .map_err(|e| format!("Send bulk failed: {e}"))?; @@ -244,7 +418,6 @@ async fn connect_and_sync(url: &str) -> Result<(), String> { Ok(signed_op) => { // Encode via wire codec and send as binary frame. 
let bytes = crdt_wire::encode(&signed_op); - use tokio_tungstenite::tungstenite::Message as TungsteniteMsg; if sink.send(TungsteniteMsg::Binary(bytes.into())).await.is_err() { break; } @@ -258,13 +431,13 @@ async fn connect_and_sync(url: &str) -> Result<(), String> { } frame = stream.next() => { match frame { - Some(Ok(tokio_tungstenite::tungstenite::Message::Text(text))) => { + Some(Ok(TungsteniteMsg::Text(text))) => { handle_incoming_text(text.as_ref()); } - Some(Ok(tokio_tungstenite::tungstenite::Message::Binary(bytes))) => { + Some(Ok(TungsteniteMsg::Binary(bytes))) => { handle_incoming_binary(&bytes); } - Some(Ok(tokio_tungstenite::tungstenite::Message::Close(_))) | None => break, + Some(Ok(TungsteniteMsg::Close(_))) | None => break, Some(Err(e)) => { slog!("[crdt-sync] Rendezvous read error: {e}"); break; @@ -1525,4 +1698,185 @@ name = "test" let view_b = serde_json::to_string(&CrdtNode::view(&crdt_b.doc.items)).unwrap(); assert_eq!(view_a, view_b, "Both nodes must converge to identical state"); } + + // ── Auth protocol tests ───────────────────────────────────────────── + + #[test] + fn sync_message_challenge_serialization_roundtrip() { + let msg = SyncMessage::Challenge { + nonce: "abcd1234".to_string(), + }; + let json = serde_json::to_string(&msg).unwrap(); + assert!(json.contains(r#""type":"challenge""#)); + let deserialized: SyncMessage = serde_json::from_str(&json).unwrap(); + match deserialized { + SyncMessage::Challenge { nonce } => assert_eq!(nonce, "abcd1234"), + _ => panic!("Expected Challenge"), + } + } + + #[test] + fn sync_message_auth_serialization_roundtrip() { + let msg = SyncMessage::Auth { + pubkey: "deadbeef".to_string(), + signature: "cafebabe".to_string(), + }; + let json = serde_json::to_string(&msg).unwrap(); + assert!(json.contains(r#""type":"auth""#)); + let deserialized: SyncMessage = serde_json::from_str(&json).unwrap(); + match deserialized { + SyncMessage::Auth { pubkey, signature } => { + assert_eq!(pubkey, "deadbeef"); + 
assert_eq!(signature, "cafebabe"); + } + _ => panic!("Expected Auth"), + } + } + + #[test] + fn verify_auth_accepts_valid_signature() { + use bft_json_crdt::keypair::{make_keypair, sign}; + use fastcrypto::traits::KeyPair; + + let kp = make_keypair(); + let pubkey_hex = hex::encode(kp.public().as_ref()); + let nonce = b"test-challenge-nonce-1234567890ab"; + + let sig = sign(&kp, nonce); + let sig_hex = hex::encode(sig.as_ref()); + + let trusted = vec![pubkey_hex.clone()]; + assert!( + super::verify_auth(&trusted, nonce, &pubkey_hex, &sig_hex), + "Valid signature from trusted key must be accepted" + ); + } + + #[test] + fn verify_auth_rejects_untrusted_key() { + use bft_json_crdt::keypair::{make_keypair, sign}; + use fastcrypto::traits::KeyPair; + + let kp = make_keypair(); + let pubkey_hex = hex::encode(kp.public().as_ref()); + let nonce = b"test-challenge-nonce-1234567890ab"; + + let sig = sign(&kp, nonce); + let sig_hex = hex::encode(sig.as_ref()); + + // Trusted list does NOT contain this key. + let trusted = vec!["aaaa".repeat(16)]; + assert!( + !super::verify_auth(&trusted, nonce, &pubkey_hex, &sig_hex), + "Signature from untrusted key must be rejected" + ); + } + + #[test] + fn verify_auth_rejects_wrong_signature() { + use bft_json_crdt::keypair::make_keypair; + use fastcrypto::traits::KeyPair; + + let kp = make_keypair(); + let pubkey_hex = hex::encode(kp.public().as_ref()); + let nonce = b"test-challenge-nonce-1234567890ab"; + + // Fabricate an invalid signature (all zeros, 64 bytes). 
+        let bad_sig_hex = "00".repeat(64);
+
+        let trusted = vec![pubkey_hex.clone()];
+        assert!(
+            !super::verify_auth(&trusted, nonce, &pubkey_hex, &bad_sig_hex),
+            "Invalid signature must be rejected even if key is trusted"
+        );
+    }
+
+    #[test]
+    fn verify_auth_rejects_wrong_nonce() {
+        use bft_json_crdt::keypair::{make_keypair, sign};
+        use fastcrypto::traits::KeyPair;
+
+        let kp = make_keypair();
+        let pubkey_hex = hex::encode(kp.public().as_ref());
+
+        // Sign one nonce but verify against a different one.
+        let nonce_a = b"nonce-aaaaaaaaaaaaaaaaaaaaaaaaaaaa";
+        let nonce_b = b"nonce-bbbbbbbbbbbbbbbbbbbbbbbbbbbb";
+
+        let sig = sign(&kp, nonce_a);
+        let sig_hex = hex::encode(sig.as_ref());
+
+        let trusted = vec![pubkey_hex.clone()];
+        assert!(
+            !super::verify_auth(&trusted, nonce_b, &pubkey_hex, &sig_hex),
+            "Signature for wrong nonce must be rejected"
+        );
+    }
+
+    #[test]
+    fn verify_auth_rejects_malformed_hex() {
+        let trusted = vec!["abcd".to_string(), "not-hex!!".to_string()];
+        assert!(!super::verify_auth(&trusted, b"nonce", "abcd", "not-hex!!"));
+        assert!(!super::verify_auth(&trusted, b"nonce", "not-hex!!", "abcd"));
+    }
+
+    #[test]
+    fn verify_auth_empty_trusted_keys_rejects_all() {
+        use bft_json_crdt::keypair::{make_keypair, sign};
+        use fastcrypto::traits::KeyPair;
+
+        let kp = make_keypair();
+        let pubkey_hex = hex::encode(kp.public().as_ref());
+        let nonce = b"test-nonce-12345678901234567890ab";
+        let sig = sign(&kp, nonce);
+        let sig_hex = hex::encode(sig.as_ref());
+
+        let trusted: Vec<String> = vec![];
+        assert!(
+            !super::verify_auth(&trusted, nonce, &pubkey_hex, &sig_hex),
+            "Empty trusted_keys must reject all auth attempts"
+        );
+    }
+
+    #[test]
+    fn config_trusted_keys_parsed_from_toml() {
+        let toml_str = r#"
+trusted_keys = [
+    "aabbccdd00112233445566778899aabbccddeeff00112233445566778899aabb",
+    "11223344556677889900aabbccddeeff11223344556677889900aabbccddeeff",
+]
+
+[[agent]]
+name = "test"
+"#;
+        let config: crate::config::ProjectConfig = toml::from_str(toml_str).unwrap();
+ assert_eq!(config.trusted_keys.len(), 2); + assert_eq!( + config.trusted_keys[0], + "aabbccdd00112233445566778899aabbccddeeff00112233445566778899aabb" + ); + } + + #[test] + fn config_trusted_keys_defaults_to_empty() { + let config = crate::config::ProjectConfig::default(); + assert!(config.trusted_keys.is_empty()); + } + + #[test] + fn handle_incoming_text_ignores_challenge_and_auth_messages() { + // Challenge and Auth messages in the sync loop should be silently ignored. + let challenge = SyncMessage::Challenge { + nonce: "abc123".to_string(), + }; + let json = serde_json::to_string(&challenge).unwrap(); + handle_incoming_text(&json); // must not panic + + let auth = SyncMessage::Auth { + pubkey: "dead".to_string(), + signature: "beef".to_string(), + }; + let json = serde_json::to_string(&auth).unwrap(); + handle_incoming_text(&json); // must not panic + } } diff --git a/server/src/main.rs b/server/src/main.rs index 9c02d514..109fe120 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -337,6 +337,11 @@ async fn main() -> Result<(), std::io::Error> { } } + // Log the node's public key so operators can add it to trusted_keys. + if let Some(node_id) = crdt_state::our_node_id() { + slog!("[crdt] Node public key: {node_id}"); + } + // (CRDT state layer is initialised above alongside the legacy pipeline.db.) // Start the CRDT sync rendezvous client if configured in project.toml. 
diff --git a/server/src/worktree.rs b/server/src/worktree.rs index e8082d21..2f859c51 100644 --- a/server/src/worktree.rs +++ b/server/src/worktree.rs @@ -529,6 +529,7 @@ mod tests { rate_limit_notifications: true, timezone: None, rendezvous: None, + trusted_keys: Vec::new(), }; // Should complete without panic run_setup_commands(tmp.path(), &config).await; @@ -554,6 +555,7 @@ mod tests { rate_limit_notifications: true, timezone: None, rendezvous: None, + trusted_keys: Vec::new(), }; // Should complete without panic run_setup_commands(tmp.path(), &config).await; @@ -579,6 +581,7 @@ mod tests { rate_limit_notifications: true, timezone: None, rendezvous: None, + trusted_keys: Vec::new(), }; // Setup command failures are non-fatal — should not panic or propagate run_setup_commands(tmp.path(), &config).await; @@ -604,6 +607,7 @@ mod tests { rate_limit_notifications: true, timezone: None, rendezvous: None, + trusted_keys: Vec::new(), }; // Teardown failures are best-effort — should not propagate assert!(run_teardown_commands(tmp.path(), &config).await.is_ok()); @@ -628,6 +632,7 @@ mod tests { rate_limit_notifications: true, timezone: None, rendezvous: None, + trusted_keys: Vec::new(), }; let info = create_worktree(&project_root, "42_fresh_test", &config, 3001) .await @@ -659,6 +664,7 @@ mod tests { rate_limit_notifications: true, timezone: None, rendezvous: None, + trusted_keys: Vec::new(), }; // First creation let _info1 = create_worktree(&project_root, "43_reuse_test", &config, 3001) @@ -731,6 +737,7 @@ mod tests { rate_limit_notifications: true, timezone: None, rendezvous: None, + trusted_keys: Vec::new(), }; let result = remove_worktree_by_story_id(tmp.path(), "99_nonexistent", &config).await; @@ -761,6 +768,7 @@ mod tests { rate_limit_notifications: true, timezone: None, rendezvous: None, + trusted_keys: Vec::new(), }; create_worktree(&project_root, "88_remove_by_id", &config, 3001) .await @@ -838,6 +846,7 @@ mod tests { rate_limit_notifications: true, timezone: 
None, rendezvous: None, + trusted_keys: Vec::new(), }; // Even though setup commands fail, create_worktree must succeed // so the agent can start and fix the problem itself. @@ -871,6 +880,7 @@ mod tests { rate_limit_notifications: true, timezone: None, rendezvous: None, + trusted_keys: Vec::new(), }; // First creation — no setup commands, should succeed create_worktree(&project_root, "173_reuse_fail", &empty_config, 3001) @@ -894,6 +904,7 @@ mod tests { rate_limit_notifications: true, timezone: None, rendezvous: None, + trusted_keys: Vec::new(), }; // Second call — worktree exists, setup commands fail, must still succeed let result = create_worktree(&project_root, "173_reuse_fail", &failing_config, 3002).await; @@ -923,6 +934,7 @@ mod tests { rate_limit_notifications: true, timezone: None, rendezvous: None, + trusted_keys: Vec::new(), }; let info = create_worktree(&project_root, "77_remove_async", &config, 3001) .await