From 7548486a53367e4e3702b1a92daf56821e5f7d42 Mon Sep 17 00:00:00 2001 From: dave Date: Sat, 25 Apr 2026 22:09:31 +0000 Subject: [PATCH] huskies: merge 633_story_crdt_sync_bearer_token_connection_auth --- server/src/config.rs | 19 +++ server/src/crdt_sync.rs | 302 +++++++++++++++++++++++++++++++++++++++- server/src/main.rs | 39 ++++-- server/src/worktree.rs | 24 ++++ 4 files changed, 370 insertions(+), 14 deletions(-) diff --git a/server/src/config.rs b/server/src/config.rs index 49c8b6ba..a80b4b92 100644 --- a/server/src/config.rs +++ b/server/src/config.rs @@ -59,6 +59,17 @@ pub struct ProjectConfig { /// When empty or missing, all peers are rejected (closed-by-default). #[serde(default)] pub trusted_keys: Vec, + /// When `true`, `/crdt-sync` WebSocket connections must supply a valid + /// `?token=` query parameter or receive HTTP 401. + /// Defaults to `false` so trusted-network deployments keep the current + /// open behaviour. + #[serde(default)] + pub crdt_require_token: bool, + /// Static bearer tokens accepted for `/crdt-sync` connections. + /// Each entry is a raw token string; tokens expire 30 days after the + /// server starts. Only meaningful when `crdt_require_token` is `true`. + #[serde(default)] + pub crdt_tokens: Vec, } /// Configuration for the filesystem watcher's sweep behaviour. @@ -234,6 +245,8 @@ impl Default for ProjectConfig { timezone: None, rendezvous: None, trusted_keys: Vec::new(), + crdt_require_token: false, + crdt_tokens: Vec::new(), } } } @@ -312,6 +325,8 @@ impl ProjectConfig { timezone: legacy.timezone, rendezvous: None, trusted_keys: Vec::new(), + crdt_require_token: false, + crdt_tokens: Vec::new(), }; validate_agents(&config.agent)?; return Ok(config); @@ -341,6 +356,8 @@ impl ProjectConfig { timezone: legacy.timezone, rendezvous: None, trusted_keys: Vec::new(), + crdt_require_token: false, + crdt_tokens: Vec::new(), }; validate_agents(&config.agent)?; Ok(config) @@ -358,6 +375,8 @@ impl ProjectConfig { timezone: legacy.timezone, rendezvous: None, trusted_keys: Vec::new(), + crdt_require_token: false, + crdt_tokens: Vec::new(), }) } } diff --git a/server/src/crdt_sync.rs b/server/src/crdt_sync.rs index 4eae175e..0cf6ce1d 100644 --- a/server/src/crdt_sync.rs +++ b/server/src/crdt_sync.rs @@ -44,9 +44,12 @@ use bft_json_crdt::json_crdt::SignedOp; use futures::{SinkExt, StreamExt}; use poem::handler; +use poem::http::StatusCode; use poem::web::Data; +use poem::web::Query; use poem::web::websocket::{Message as WsMessage, WebSocket}; use serde::{Deserialize, Serialize}; +use std::collections::HashMap; use std::sync::{Arc, OnceLock}; use crate::crdt_state; @@ -86,6 +89,65 @@ fn trusted_keys() -> &'static [String] { TRUSTED_KEYS.get().map(|v| v.as_slice()).unwrap_or(&[]) } +// ── Bearer-token auth ─────────────────────────────────────────────── + +/// Time-to-live for CRDT bearer tokens in seconds (30 days). +const TOKEN_TTL_SECS: f64 = 30.0 * 24.0 * 3600.0; + +/// Whether a bearer token is required for `/crdt-sync` connections. +/// `None` (uninitialised) → open access (backward compatible). +static REQUIRE_TOKEN: OnceLock = OnceLock::new(); + +/// Valid bearer tokens — maps token string to its expiry unix timestamp. +static CRDT_TOKENS: OnceLock>> = OnceLock::new(); + +/// Initialise bearer-token auth for CRDT-sync connections. +/// +/// Must be called once at startup before any WebSocket connections are accepted. +/// When `require` is `true`, clients must supply a valid `?token=` query +/// parameter on the upgrade request or receive HTTP 401. When `require` is +/// `false` (default) a token is optional — connections without one are +/// accepted, but a supplied token is still validated. +pub fn init_token_auth(require: bool, tokens: Vec) { + let _ = REQUIRE_TOKEN.set(require); + let store = CRDT_TOKENS.get_or_init(|| std::sync::RwLock::new(HashMap::new())); + if let Ok(mut map) = store.write() { + let now = chrono::Utc::now().timestamp() as f64; + for token in tokens { + map.insert(token, now + TOKEN_TTL_SECS); + } + } +} + +/// Add a bearer token to the CRDT-sync token store. +/// +/// The token expires after [`TOKEN_TTL_SECS`] seconds. Returns the expiry +/// unix timestamp so callers can surface it in admin tooling. +pub fn add_join_token(token: String) -> f64 { + let store = CRDT_TOKENS.get_or_init(|| std::sync::RwLock::new(HashMap::new())); + let now = chrono::Utc::now().timestamp() as f64; + let expires_at = now + TOKEN_TTL_SECS; + if let Ok(mut map) = store.write() { + map.insert(token, expires_at); + } + expires_at +} + +/// Validate a bearer token against the CRDT-sync token store. +/// +/// Returns `true` if the token exists in the store and has not expired. +fn validate_join_token(token: &str) -> bool { + let Some(store) = CRDT_TOKENS.get() else { + return false; + }; + let now = chrono::Utc::now().timestamp() as f64; + store + .read() + .ok() + .and_then(|map| map.get(token).copied()) + .is_some_and(|expires_at| expires_at > now) +} + // ── Wire protocol types ───────────────────────────────────────────── /// Auth handshake: challenge sent by the listener to the connector. @@ -126,12 +188,49 @@ enum SyncMessage { // ── Server-side WebSocket handler ─────────────────────────────────── +/// Query parameters accepted on the `/crdt-sync` WebSocket upgrade request. +#[derive(Deserialize)] +struct SyncQueryParams { + /// Optional bearer token. Required when the server is in token-required mode. + token: Option, +} + +/// WebSocket handler for CRDT peer synchronisation. +/// +/// Accepts an optional `?token=` query parameter. When the +/// server is configured with `crdt_require_token = true`, a valid token must +/// be supplied or the upgrade is rejected with HTTP 401. When the server is +/// in open-access mode (the default), a token is optional but still validated +/// if present. #[handler] pub async fn crdt_sync_handler( ws: WebSocket, _ctx: Data<&Arc>, remote_addr: &poem::web::RemoteAddr, -) -> impl poem::IntoResponse { + Query(params): Query, +) -> poem::Response { + // ── Bearer-token check (pre-upgrade) ──────────────────────────── + let require_token = REQUIRE_TOKEN.get().copied().unwrap_or(false); + match ¶ms.token { + Some(t) => { + if !validate_join_token(t) { + slog!("[crdt-sync] Rejected connection: invalid or expired token"); + return poem::Response::builder() + .status(StatusCode::UNAUTHORIZED) + .body("invalid or expired token"); + } + } + None if require_token => { + slog!("[crdt-sync] Rejected connection: token required but not provided"); + return poem::Response::builder() + .status(StatusCode::UNAUTHORIZED) + .body("token required"); + } + None => {} + } + + // ── WebSocket upgrade ──────────────────────────────────────────── + use poem::IntoResponse as _; let peer_addr = remote_addr.to_string(); ws.on_upgrade(move |socket| async move { let (mut sink, mut stream) = socket.split(); @@ -398,6 +497,7 @@ pub async fn crdt_sync_handler( slog!("[crdt-sync] Peer disconnected"); }) + .into_response() } /// Wait for the next text-frame sync message from the peer, handling Ping/Pong @@ -512,13 +612,18 @@ pub const RENDEZVOUS_ERROR_THRESHOLD: u32 = 10; /// The client reconnects with exponential backoff if the connection drops. /// Individual failures are logged at WARN; after [`RENDEZVOUS_ERROR_THRESHOLD`] /// consecutive failures the log level escalates to ERROR. -pub fn spawn_rendezvous_client(url: String) { +/// +/// When `token` is provided it is appended to the upgrade URL as +/// `?token=` so the server's bearer-token check is satisfied. This +/// reuses the existing `--join-token` / `HUSKIES_JOIN_TOKEN` plumbing on the +/// agent side. +pub fn spawn_rendezvous_client(url: String, token: Option) { tokio::spawn(async move { let mut backoff_secs = 1u64; let mut consecutive_failures: u32 = 0; loop { slog!("[crdt-sync] Connecting to rendezvous peer: {url}"); - match connect_and_sync(&url).await { + match connect_and_sync(&url, token.as_deref()).await { Ok(()) => { slog!("[crdt-sync] Rendezvous connection closed cleanly"); backoff_secs = 1; @@ -545,8 +650,21 @@ pub fn spawn_rendezvous_client(url: String) { } /// Connect to a remote sync endpoint and exchange ops until disconnect. -async fn connect_and_sync(url: &str) -> Result<(), String> { - let (ws_stream, _) = tokio_tungstenite::connect_async(url) +/// +/// When `token` is supplied it is appended as `?token=` to the +/// connection URL so the server's bearer-token check passes. +async fn connect_and_sync(url: &str, token: Option<&str>) -> Result<(), String> { + let connect_url = match token { + Some(t) => { + if url.contains('?') { + format!("{url}&token={t}") + } else { + format!("{url}?token={t}") + } + } + None => url.to_string(), + }; + let (ws_stream, _) = tokio_tungstenite::connect_async(connect_url.as_str()) .await .map_err(|e| format!("WebSocket connect failed: {e}"))?; @@ -3304,4 +3422,178 @@ name = "test" // Reaching this point means all prior tests in this module compiled // and passed. This test documents the AC6 intent. } + + // ── Story 633: bearer-token connection auth ─────────────────────────────── + + /// AC4: Valid token — `validate_join_token` returns `true` for a token that + /// has been added via `add_join_token` and has not expired. + #[test] + fn valid_token_passes_validation() { + let token = format!("test-valid-{}", uuid::Uuid::new_v4()); + super::add_join_token(token.clone()); + assert!( + super::validate_join_token(&token), + "A freshly added token must pass validation" + ); + } + + /// AC5: Invalid (bogus) token — `validate_join_token` returns `false` for a + /// token that was never added to the store. + #[test] + fn bogus_token_fails_validation() { + let bogus = "this-token-was-never-added-to-the-store"; + assert!( + !super::validate_join_token(bogus), + "An unknown token must fail validation" + ); + } + + /// AC5: Expired token — `validate_join_token` returns `false` for a token + /// whose `expires_at` is in the past. + #[test] + fn expired_token_fails_validation() { + // Insert a token directly with an already-past expiry timestamp. + let token = format!("test-expired-{}", uuid::Uuid::new_v4()); + let store = super::CRDT_TOKENS + .get_or_init(|| std::sync::RwLock::new(std::collections::HashMap::new())); + // expires_at = 1 (way in the past — 1970-01-01T00:00:01Z) + store.write().unwrap().insert(token.clone(), 1.0_f64); + assert!( + !super::validate_join_token(&token), + "An expired token must fail validation" + ); + } + + /// AC6: No token when server requires one — `validate_join_token` returns + /// `false` and the caller must reject with 401. Verifies the logic path + /// used by `crdt_sync_handler`. + #[test] + fn no_token_with_require_true_is_rejected() { + // Simulate: require_token=true, token=None → reject. + let require_token = true; + let token: Option<&str> = None; + let should_reject = match token { + Some(t) => !super::validate_join_token(t), + None if require_token => true, + None => false, + }; + assert!( + should_reject, + "Missing token must be rejected when token is required" + ); + } + + /// AC6: No token when server is in open mode — connection is accepted. + #[test] + fn no_token_with_require_false_is_accepted() { + let require_token = false; + let token: Option<&str> = None; + let should_reject = match token { + Some(t) => !super::validate_join_token(t), + None if require_token => true, + None => false, + }; + assert!( + !should_reject, + "Missing token must be accepted in open mode" + ); + } + + /// AC3: `spawn_rendezvous_client` URL construction — when a token is provided + /// the `?token=` query parameter is appended correctly. + #[test] + fn rendezvous_url_with_token_appended() { + let base = "ws://host:3001/crdt-sync"; + let token = "my-secret-token"; + let url_with_token = if base.contains('?') { + format!("{base}&token={token}") + } else { + format!("{base}?token={token}") + }; + assert_eq!( + url_with_token, + "ws://host:3001/crdt-sync?token=my-secret-token" + ); + + // With existing query params. + let base_with_query = "ws://host:3001/crdt-sync?foo=bar"; + let url_appended = if base_with_query.contains('?') { + format!("{base_with_query}&token={token}") + } else { + format!("{base_with_query}?token={token}") + }; + assert_eq!( + url_appended, + "ws://host:3001/crdt-sync?foo=bar&token=my-secret-token" + ); + } + + /// AC3: Without a token, the URL is used as-is. + #[test] + fn rendezvous_url_without_token_unchanged() { + let base = "ws://host:3001/crdt-sync"; + let token: Option<&str> = None; + let connect_url = match token { + Some(t) => format!("{base}?token={t}"), + None => base.to_string(), + }; + assert_eq!(connect_url, base); + } + + /// `add_join_token` returns a future expiry timestamp that is in the future. + #[test] + fn add_join_token_returns_future_expiry() { + let token = format!("test-expiry-{}", uuid::Uuid::new_v4()); + let now = chrono::Utc::now().timestamp() as f64; + let expires_at = super::add_join_token(token); + assert!( + expires_at > now, + "Expiry timestamp must be in the future (got {expires_at}, now={now})" + ); + } + + /// TOKEN_TTL_SECS must be 30 days. + #[test] + fn token_ttl_is_thirty_days() { + assert_eq!( + super::TOKEN_TTL_SECS, + 30.0 * 24.0 * 3600.0, + "TOKEN_TTL_SECS must be 30 days" + ); + } + + /// Config: `crdt_require_token` defaults to `false`. + #[test] + fn config_crdt_require_token_defaults_to_false() { + let config = crate::config::ProjectConfig::default(); + assert!( + !config.crdt_require_token, + "crdt_require_token must default to false (open access)" + ); + } + + /// Config: `crdt_tokens` defaults to empty. + #[test] + fn config_crdt_tokens_defaults_to_empty() { + let config = crate::config::ProjectConfig::default(); + assert!( + config.crdt_tokens.is_empty(), + "crdt_tokens must default to empty" + ); + } + + /// Config: `crdt_require_token` and `crdt_tokens` are parsed from TOML. + #[test] + fn config_crdt_token_fields_parsed_from_toml() { + let toml_str = r#" +crdt_require_token = true +crdt_tokens = ["token-abc", "token-xyz"] + +[[agent]] +name = "test" +"#; + let config: crate::config::ProjectConfig = toml::from_str(toml_str).unwrap(); + assert!(config.crdt_require_token); + assert_eq!(config.crdt_tokens, vec!["token-abc", "token-xyz"]); + } } diff --git a/server/src/main.rs b/server/src/main.rs index 670d8bb1..7870e1f4 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -419,10 +419,19 @@ async fn main() -> Result<(), std::io::Error> { // (CRDT state layer is initialised above alongside the legacy pipeline.db.) - // Load trusted keys and start the CRDT sync rendezvous client if configured. - // In agent mode, the --rendezvous flag overrides project.toml. + // Load trusted keys, token auth config, and start the CRDT sync rendezvous + // client if configured. In agent mode, the --rendezvous flag overrides + // project.toml. The --join-token / HUSKIES_JOIN_TOKEN is appended to the + // rendezvous URL as ?token=... so the server's bearer-token check passes. + let crdt_join_token = cli + .join_token + .clone() + .or_else(|| std::env::var("HUSKIES_JOIN_TOKEN").ok()); + let sync_config = if is_agent { - agent_rendezvous.clone().map(|url| (url, Vec::new())) + agent_rendezvous + .clone() + .map(|url| (url, Vec::new(), false, Vec::new())) } else { app_state .project_root @@ -430,22 +439,34 @@ async fn main() -> Result<(), std::io::Error> { .unwrap() .as_ref() .and_then(|root| config::ProjectConfig::load(root).ok()) - .and_then(|cfg| cfg.rendezvous.map(|url| (url, cfg.trusted_keys))) + .and_then(|cfg| { + cfg.rendezvous.map(|url| { + ( + url, + cfg.trusted_keys, + cfg.crdt_require_token, + cfg.crdt_tokens, + ) + }) + }) }; - if let Some((rendezvous_url, trusted_keys)) = sync_config { + if let Some((rendezvous_url, trusted_keys, require_token, crdt_tokens)) = sync_config { crdt_sync::init_trusted_keys(trusted_keys); - crdt_sync::spawn_rendezvous_client(rendezvous_url); + crdt_sync::init_token_auth(require_token, crdt_tokens); + crdt_sync::spawn_rendezvous_client(rendezvous_url, crdt_join_token); } else { - // Even without rendezvous, initialise trusted keys for incoming connections. - let keys = app_state + // Even without rendezvous, initialise trusted keys and token auth for + // incoming connections. + let (keys, require_token, crdt_tokens) = app_state .project_root .lock() .unwrap() .as_ref() .and_then(|root| config::ProjectConfig::load(root).ok()) - .map(|cfg| cfg.trusted_keys) + .map(|cfg| (cfg.trusted_keys, cfg.crdt_require_token, cfg.crdt_tokens)) .unwrap_or_default(); crdt_sync::init_trusted_keys(keys); + crdt_sync::init_token_auth(require_token, crdt_tokens); } // ── Agent mode: headless build agent ──────────────────────────────── diff --git a/server/src/worktree.rs b/server/src/worktree.rs index 94de189e..64c036cc 100644 --- a/server/src/worktree.rs +++ b/server/src/worktree.rs @@ -531,6 +531,8 @@ mod tests { timezone: None, rendezvous: None, trusted_keys: Vec::new(), + crdt_require_token: false, + crdt_tokens: Vec::new(), }; // Should complete without panic run_setup_commands(tmp.path(), &config).await; @@ -557,6 +559,8 @@ mod tests { timezone: None, rendezvous: None, trusted_keys: Vec::new(), + crdt_require_token: false, + crdt_tokens: Vec::new(), }; // Should complete without panic run_setup_commands(tmp.path(), &config).await; @@ -583,6 +587,8 @@ mod tests { timezone: None, rendezvous: None, trusted_keys: Vec::new(), + crdt_require_token: false, + crdt_tokens: Vec::new(), }; // Setup command failures are non-fatal — should not panic or propagate run_setup_commands(tmp.path(), &config).await; @@ -609,6 +615,8 @@ mod tests { timezone: None, rendezvous: None, trusted_keys: Vec::new(), + crdt_require_token: false, + crdt_tokens: Vec::new(), }; // Teardown failures are best-effort — should not propagate assert!(run_teardown_commands(tmp.path(), &config).await.is_ok()); @@ -634,6 +642,8 @@ mod tests { timezone: None, rendezvous: None, trusted_keys: Vec::new(), + crdt_require_token: false, + crdt_tokens: Vec::new(), }; let info = create_worktree(&project_root, "42_fresh_test", &config, 3001) .await @@ -666,6 +676,8 @@ mod tests { timezone: None, rendezvous: None, trusted_keys: Vec::new(), + crdt_require_token: false, + crdt_tokens: Vec::new(), }; // First creation let _info1 = create_worktree(&project_root, "43_reuse_test", &config, 3001) @@ -739,6 +751,8 @@ mod tests { timezone: None, rendezvous: None, trusted_keys: Vec::new(), + crdt_require_token: false, + crdt_tokens: Vec::new(), }; let result = remove_worktree_by_story_id(tmp.path(), "99_nonexistent", &config).await; @@ -770,6 +784,8 @@ mod tests { timezone: None, rendezvous: None, trusted_keys: Vec::new(), + crdt_require_token: false, + crdt_tokens: Vec::new(), }; create_worktree(&project_root, "88_remove_by_id", &config, 3001) .await @@ -848,6 +864,8 @@ mod tests { timezone: None, rendezvous: None, trusted_keys: Vec::new(), + crdt_require_token: false, + crdt_tokens: Vec::new(), }; // Even though setup commands fail, create_worktree must succeed // so the agent can start and fix the problem itself. @@ -882,6 +900,8 @@ mod tests { timezone: None, rendezvous: None, trusted_keys: Vec::new(), + crdt_require_token: false, + crdt_tokens: Vec::new(), }; // First creation — no setup commands, should succeed create_worktree(&project_root, "173_reuse_fail", &empty_config, 3001) @@ -906,6 +926,8 @@ mod tests { timezone: None, rendezvous: None, trusted_keys: Vec::new(), + crdt_require_token: false, + crdt_tokens: Vec::new(), }; // Second call — worktree exists, setup commands fail, must still succeed let result = create_worktree(&project_root, "173_reuse_fail", &failing_config, 3002).await; @@ -936,6 +958,8 @@ mod tests { timezone: None, rendezvous: None, trusted_keys: Vec::new(), + crdt_require_token: false, + crdt_tokens: Vec::new(), }; let info = create_worktree(&project_root, "77_remove_async", &config, 3001) .await