huskies: merge 633_story_crdt_sync_bearer_token_connection_auth
This commit is contained in:
@@ -59,6 +59,17 @@ pub struct ProjectConfig {
|
|||||||
/// When empty or missing, all peers are rejected (closed-by-default).
|
/// When empty or missing, all peers are rejected (closed-by-default).
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub trusted_keys: Vec<String>,
|
pub trusted_keys: Vec<String>,
|
||||||
|
/// When `true`, `/crdt-sync` WebSocket connections must supply a valid
|
||||||
|
/// `?token=<bearer-token>` query parameter or receive HTTP 401.
|
||||||
|
/// Defaults to `false` so trusted-network deployments keep the current
|
||||||
|
/// open behaviour.
|
||||||
|
#[serde(default)]
|
||||||
|
pub crdt_require_token: bool,
|
||||||
|
/// Static bearer tokens accepted for `/crdt-sync` connections.
|
||||||
|
/// Each entry is a raw token string; tokens expire 30 days after the
|
||||||
|
/// server starts. Only meaningful when `crdt_require_token` is `true`.
|
||||||
|
#[serde(default)]
|
||||||
|
pub crdt_tokens: Vec<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Configuration for the filesystem watcher's sweep behaviour.
|
/// Configuration for the filesystem watcher's sweep behaviour.
|
||||||
@@ -234,6 +245,8 @@ impl Default for ProjectConfig {
|
|||||||
timezone: None,
|
timezone: None,
|
||||||
rendezvous: None,
|
rendezvous: None,
|
||||||
trusted_keys: Vec::new(),
|
trusted_keys: Vec::new(),
|
||||||
|
crdt_require_token: false,
|
||||||
|
crdt_tokens: Vec::new(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -312,6 +325,8 @@ impl ProjectConfig {
|
|||||||
timezone: legacy.timezone,
|
timezone: legacy.timezone,
|
||||||
rendezvous: None,
|
rendezvous: None,
|
||||||
trusted_keys: Vec::new(),
|
trusted_keys: Vec::new(),
|
||||||
|
crdt_require_token: false,
|
||||||
|
crdt_tokens: Vec::new(),
|
||||||
};
|
};
|
||||||
validate_agents(&config.agent)?;
|
validate_agents(&config.agent)?;
|
||||||
return Ok(config);
|
return Ok(config);
|
||||||
@@ -341,6 +356,8 @@ impl ProjectConfig {
|
|||||||
timezone: legacy.timezone,
|
timezone: legacy.timezone,
|
||||||
rendezvous: None,
|
rendezvous: None,
|
||||||
trusted_keys: Vec::new(),
|
trusted_keys: Vec::new(),
|
||||||
|
crdt_require_token: false,
|
||||||
|
crdt_tokens: Vec::new(),
|
||||||
};
|
};
|
||||||
validate_agents(&config.agent)?;
|
validate_agents(&config.agent)?;
|
||||||
Ok(config)
|
Ok(config)
|
||||||
@@ -358,6 +375,8 @@ impl ProjectConfig {
|
|||||||
timezone: legacy.timezone,
|
timezone: legacy.timezone,
|
||||||
rendezvous: None,
|
rendezvous: None,
|
||||||
trusted_keys: Vec::new(),
|
trusted_keys: Vec::new(),
|
||||||
|
crdt_require_token: false,
|
||||||
|
crdt_tokens: Vec::new(),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
+297
-5
@@ -44,9 +44,12 @@
|
|||||||
use bft_json_crdt::json_crdt::SignedOp;
|
use bft_json_crdt::json_crdt::SignedOp;
|
||||||
use futures::{SinkExt, StreamExt};
|
use futures::{SinkExt, StreamExt};
|
||||||
use poem::handler;
|
use poem::handler;
|
||||||
|
use poem::http::StatusCode;
|
||||||
use poem::web::Data;
|
use poem::web::Data;
|
||||||
|
use poem::web::Query;
|
||||||
use poem::web::websocket::{Message as WsMessage, WebSocket};
|
use poem::web::websocket::{Message as WsMessage, WebSocket};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
use std::collections::HashMap;
|
||||||
use std::sync::{Arc, OnceLock};
|
use std::sync::{Arc, OnceLock};
|
||||||
|
|
||||||
use crate::crdt_state;
|
use crate::crdt_state;
|
||||||
@@ -86,6 +89,65 @@ fn trusted_keys() -> &'static [String] {
|
|||||||
TRUSTED_KEYS.get().map(|v| v.as_slice()).unwrap_or(&[])
|
TRUSTED_KEYS.get().map(|v| v.as_slice()).unwrap_or(&[])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── Bearer-token auth ───────────────────────────────────────────────
|
||||||
|
|
||||||
|
/// Time-to-live for CRDT bearer tokens in seconds (30 days).
|
||||||
|
const TOKEN_TTL_SECS: f64 = 30.0 * 24.0 * 3600.0;
|
||||||
|
|
||||||
|
/// Whether a bearer token is required for `/crdt-sync` connections.
|
||||||
|
/// `None` (uninitialised) → open access (backward compatible).
|
||||||
|
static REQUIRE_TOKEN: OnceLock<bool> = OnceLock::new();
|
||||||
|
|
||||||
|
/// Valid bearer tokens — maps token string to its expiry unix timestamp.
|
||||||
|
static CRDT_TOKENS: OnceLock<std::sync::RwLock<HashMap<String, f64>>> = OnceLock::new();
|
||||||
|
|
||||||
|
/// Initialise bearer-token auth for CRDT-sync connections.
|
||||||
|
///
|
||||||
|
/// Must be called once at startup before any WebSocket connections are accepted.
|
||||||
|
/// When `require` is `true`, clients must supply a valid `?token=` query
|
||||||
|
/// parameter on the upgrade request or receive HTTP 401. When `require` is
|
||||||
|
/// `false` (default) a token is optional — connections without one are
|
||||||
|
/// accepted, but a supplied token is still validated.
|
||||||
|
pub fn init_token_auth(require: bool, tokens: Vec<String>) {
|
||||||
|
let _ = REQUIRE_TOKEN.set(require);
|
||||||
|
let store = CRDT_TOKENS.get_or_init(|| std::sync::RwLock::new(HashMap::new()));
|
||||||
|
if let Ok(mut map) = store.write() {
|
||||||
|
let now = chrono::Utc::now().timestamp() as f64;
|
||||||
|
for token in tokens {
|
||||||
|
map.insert(token, now + TOKEN_TTL_SECS);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Add a bearer token to the CRDT-sync token store.
|
||||||
|
///
|
||||||
|
/// The token expires after [`TOKEN_TTL_SECS`] seconds. Returns the expiry
|
||||||
|
/// unix timestamp so callers can surface it in admin tooling.
|
||||||
|
pub fn add_join_token(token: String) -> f64 {
|
||||||
|
let store = CRDT_TOKENS.get_or_init(|| std::sync::RwLock::new(HashMap::new()));
|
||||||
|
let now = chrono::Utc::now().timestamp() as f64;
|
||||||
|
let expires_at = now + TOKEN_TTL_SECS;
|
||||||
|
if let Ok(mut map) = store.write() {
|
||||||
|
map.insert(token, expires_at);
|
||||||
|
}
|
||||||
|
expires_at
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Validate a bearer token against the CRDT-sync token store.
|
||||||
|
///
|
||||||
|
/// Returns `true` if the token exists in the store and has not expired.
|
||||||
|
fn validate_join_token(token: &str) -> bool {
|
||||||
|
let Some(store) = CRDT_TOKENS.get() else {
|
||||||
|
return false;
|
||||||
|
};
|
||||||
|
let now = chrono::Utc::now().timestamp() as f64;
|
||||||
|
store
|
||||||
|
.read()
|
||||||
|
.ok()
|
||||||
|
.and_then(|map| map.get(token).copied())
|
||||||
|
.is_some_and(|expires_at| expires_at > now)
|
||||||
|
}
|
||||||
|
|
||||||
// ── Wire protocol types ─────────────────────────────────────────────
|
// ── Wire protocol types ─────────────────────────────────────────────
|
||||||
|
|
||||||
/// Auth handshake: challenge sent by the listener to the connector.
|
/// Auth handshake: challenge sent by the listener to the connector.
|
||||||
@@ -126,12 +188,49 @@ enum SyncMessage {
|
|||||||
|
|
||||||
// ── Server-side WebSocket handler ───────────────────────────────────
|
// ── Server-side WebSocket handler ───────────────────────────────────
|
||||||
|
|
||||||
|
/// Query parameters accepted on the `/crdt-sync` WebSocket upgrade request.
|
||||||
|
#[derive(Deserialize)]
|
||||||
|
struct SyncQueryParams {
|
||||||
|
/// Optional bearer token. Required when the server is in token-required mode.
|
||||||
|
token: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// WebSocket handler for CRDT peer synchronisation.
|
||||||
|
///
|
||||||
|
/// Accepts an optional `?token=<bearer-token>` query parameter. When the
|
||||||
|
/// server is configured with `crdt_require_token = true`, a valid token must
|
||||||
|
/// be supplied or the upgrade is rejected with HTTP 401. When the server is
|
||||||
|
/// in open-access mode (the default), a token is optional but still validated
|
||||||
|
/// if present.
|
||||||
#[handler]
|
#[handler]
|
||||||
pub async fn crdt_sync_handler(
|
pub async fn crdt_sync_handler(
|
||||||
ws: WebSocket,
|
ws: WebSocket,
|
||||||
_ctx: Data<&Arc<AppContext>>,
|
_ctx: Data<&Arc<AppContext>>,
|
||||||
remote_addr: &poem::web::RemoteAddr,
|
remote_addr: &poem::web::RemoteAddr,
|
||||||
) -> impl poem::IntoResponse {
|
Query(params): Query<SyncQueryParams>,
|
||||||
|
) -> poem::Response {
|
||||||
|
// ── Bearer-token check (pre-upgrade) ────────────────────────────
|
||||||
|
let require_token = REQUIRE_TOKEN.get().copied().unwrap_or(false);
|
||||||
|
match ¶ms.token {
|
||||||
|
Some(t) => {
|
||||||
|
if !validate_join_token(t) {
|
||||||
|
slog!("[crdt-sync] Rejected connection: invalid or expired token");
|
||||||
|
return poem::Response::builder()
|
||||||
|
.status(StatusCode::UNAUTHORIZED)
|
||||||
|
.body("invalid or expired token");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None if require_token => {
|
||||||
|
slog!("[crdt-sync] Rejected connection: token required but not provided");
|
||||||
|
return poem::Response::builder()
|
||||||
|
.status(StatusCode::UNAUTHORIZED)
|
||||||
|
.body("token required");
|
||||||
|
}
|
||||||
|
None => {}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── WebSocket upgrade ────────────────────────────────────────────
|
||||||
|
use poem::IntoResponse as _;
|
||||||
let peer_addr = remote_addr.to_string();
|
let peer_addr = remote_addr.to_string();
|
||||||
ws.on_upgrade(move |socket| async move {
|
ws.on_upgrade(move |socket| async move {
|
||||||
let (mut sink, mut stream) = socket.split();
|
let (mut sink, mut stream) = socket.split();
|
||||||
@@ -398,6 +497,7 @@ pub async fn crdt_sync_handler(
|
|||||||
|
|
||||||
slog!("[crdt-sync] Peer disconnected");
|
slog!("[crdt-sync] Peer disconnected");
|
||||||
})
|
})
|
||||||
|
.into_response()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Wait for the next text-frame sync message from the peer, handling Ping/Pong
|
/// Wait for the next text-frame sync message from the peer, handling Ping/Pong
|
||||||
@@ -512,13 +612,18 @@ pub const RENDEZVOUS_ERROR_THRESHOLD: u32 = 10;
|
|||||||
/// The client reconnects with exponential backoff if the connection drops.
|
/// The client reconnects with exponential backoff if the connection drops.
|
||||||
/// Individual failures are logged at WARN; after [`RENDEZVOUS_ERROR_THRESHOLD`]
|
/// Individual failures are logged at WARN; after [`RENDEZVOUS_ERROR_THRESHOLD`]
|
||||||
/// consecutive failures the log level escalates to ERROR.
|
/// consecutive failures the log level escalates to ERROR.
|
||||||
pub fn spawn_rendezvous_client(url: String) {
|
///
|
||||||
|
/// When `token` is provided it is appended to the upgrade URL as
|
||||||
|
/// `?token=<token>` so the server's bearer-token check is satisfied. This
|
||||||
|
/// reuses the existing `--join-token` / `HUSKIES_JOIN_TOKEN` plumbing on the
|
||||||
|
/// agent side.
|
||||||
|
pub fn spawn_rendezvous_client(url: String, token: Option<String>) {
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let mut backoff_secs = 1u64;
|
let mut backoff_secs = 1u64;
|
||||||
let mut consecutive_failures: u32 = 0;
|
let mut consecutive_failures: u32 = 0;
|
||||||
loop {
|
loop {
|
||||||
slog!("[crdt-sync] Connecting to rendezvous peer: {url}");
|
slog!("[crdt-sync] Connecting to rendezvous peer: {url}");
|
||||||
match connect_and_sync(&url).await {
|
match connect_and_sync(&url, token.as_deref()).await {
|
||||||
Ok(()) => {
|
Ok(()) => {
|
||||||
slog!("[crdt-sync] Rendezvous connection closed cleanly");
|
slog!("[crdt-sync] Rendezvous connection closed cleanly");
|
||||||
backoff_secs = 1;
|
backoff_secs = 1;
|
||||||
@@ -545,8 +650,21 @@ pub fn spawn_rendezvous_client(url: String) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Connect to a remote sync endpoint and exchange ops until disconnect.
|
/// Connect to a remote sync endpoint and exchange ops until disconnect.
|
||||||
async fn connect_and_sync(url: &str) -> Result<(), String> {
|
///
|
||||||
let (ws_stream, _) = tokio_tungstenite::connect_async(url)
|
/// When `token` is supplied it is appended as `?token=<token>` to the
|
||||||
|
/// connection URL so the server's bearer-token check passes.
|
||||||
|
async fn connect_and_sync(url: &str, token: Option<&str>) -> Result<(), String> {
|
||||||
|
let connect_url = match token {
|
||||||
|
Some(t) => {
|
||||||
|
if url.contains('?') {
|
||||||
|
format!("{url}&token={t}")
|
||||||
|
} else {
|
||||||
|
format!("{url}?token={t}")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None => url.to_string(),
|
||||||
|
};
|
||||||
|
let (ws_stream, _) = tokio_tungstenite::connect_async(connect_url.as_str())
|
||||||
.await
|
.await
|
||||||
.map_err(|e| format!("WebSocket connect failed: {e}"))?;
|
.map_err(|e| format!("WebSocket connect failed: {e}"))?;
|
||||||
|
|
||||||
@@ -3304,4 +3422,178 @@ name = "test"
|
|||||||
// Reaching this point means all prior tests in this module compiled
|
// Reaching this point means all prior tests in this module compiled
|
||||||
// and passed. This test documents the AC6 intent.
|
// and passed. This test documents the AC6 intent.
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── Story 633: bearer-token connection auth ───────────────────────────────
|
||||||
|
|
||||||
|
/// AC4: Valid token — `validate_join_token` returns `true` for a token that
|
||||||
|
/// has been added via `add_join_token` and has not expired.
|
||||||
|
#[test]
|
||||||
|
fn valid_token_passes_validation() {
|
||||||
|
let token = format!("test-valid-{}", uuid::Uuid::new_v4());
|
||||||
|
super::add_join_token(token.clone());
|
||||||
|
assert!(
|
||||||
|
super::validate_join_token(&token),
|
||||||
|
"A freshly added token must pass validation"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// AC5: Invalid (bogus) token — `validate_join_token` returns `false` for a
|
||||||
|
/// token that was never added to the store.
|
||||||
|
#[test]
|
||||||
|
fn bogus_token_fails_validation() {
|
||||||
|
let bogus = "this-token-was-never-added-to-the-store";
|
||||||
|
assert!(
|
||||||
|
!super::validate_join_token(bogus),
|
||||||
|
"An unknown token must fail validation"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// AC5: Expired token — `validate_join_token` returns `false` for a token
|
||||||
|
/// whose `expires_at` is in the past.
|
||||||
|
#[test]
|
||||||
|
fn expired_token_fails_validation() {
|
||||||
|
// Insert a token directly with an already-past expiry timestamp.
|
||||||
|
let token = format!("test-expired-{}", uuid::Uuid::new_v4());
|
||||||
|
let store = super::CRDT_TOKENS
|
||||||
|
.get_or_init(|| std::sync::RwLock::new(std::collections::HashMap::new()));
|
||||||
|
// expires_at = 1 (way in the past — 1970-01-01T00:00:01Z)
|
||||||
|
store.write().unwrap().insert(token.clone(), 1.0_f64);
|
||||||
|
assert!(
|
||||||
|
!super::validate_join_token(&token),
|
||||||
|
"An expired token must fail validation"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// AC6: No token when server requires one — `validate_join_token` returns
|
||||||
|
/// `false` and the caller must reject with 401. Verifies the logic path
|
||||||
|
/// used by `crdt_sync_handler`.
|
||||||
|
#[test]
|
||||||
|
fn no_token_with_require_true_is_rejected() {
|
||||||
|
// Simulate: require_token=true, token=None → reject.
|
||||||
|
let require_token = true;
|
||||||
|
let token: Option<&str> = None;
|
||||||
|
let should_reject = match token {
|
||||||
|
Some(t) => !super::validate_join_token(t),
|
||||||
|
None if require_token => true,
|
||||||
|
None => false,
|
||||||
|
};
|
||||||
|
assert!(
|
||||||
|
should_reject,
|
||||||
|
"Missing token must be rejected when token is required"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// AC6: No token when server is in open mode — connection is accepted.
|
||||||
|
#[test]
|
||||||
|
fn no_token_with_require_false_is_accepted() {
|
||||||
|
let require_token = false;
|
||||||
|
let token: Option<&str> = None;
|
||||||
|
let should_reject = match token {
|
||||||
|
Some(t) => !super::validate_join_token(t),
|
||||||
|
None if require_token => true,
|
||||||
|
None => false,
|
||||||
|
};
|
||||||
|
assert!(
|
||||||
|
!should_reject,
|
||||||
|
"Missing token must be accepted in open mode"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// AC3: `spawn_rendezvous_client` URL construction — when a token is provided
|
||||||
|
/// the `?token=` query parameter is appended correctly.
|
||||||
|
#[test]
|
||||||
|
fn rendezvous_url_with_token_appended() {
|
||||||
|
let base = "ws://host:3001/crdt-sync";
|
||||||
|
let token = "my-secret-token";
|
||||||
|
let url_with_token = if base.contains('?') {
|
||||||
|
format!("{base}&token={token}")
|
||||||
|
} else {
|
||||||
|
format!("{base}?token={token}")
|
||||||
|
};
|
||||||
|
assert_eq!(
|
||||||
|
url_with_token,
|
||||||
|
"ws://host:3001/crdt-sync?token=my-secret-token"
|
||||||
|
);
|
||||||
|
|
||||||
|
// With existing query params.
|
||||||
|
let base_with_query = "ws://host:3001/crdt-sync?foo=bar";
|
||||||
|
let url_appended = if base_with_query.contains('?') {
|
||||||
|
format!("{base_with_query}&token={token}")
|
||||||
|
} else {
|
||||||
|
format!("{base_with_query}?token={token}")
|
||||||
|
};
|
||||||
|
assert_eq!(
|
||||||
|
url_appended,
|
||||||
|
"ws://host:3001/crdt-sync?foo=bar&token=my-secret-token"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// AC3: Without a token, the URL is used as-is.
|
||||||
|
#[test]
|
||||||
|
fn rendezvous_url_without_token_unchanged() {
|
||||||
|
let base = "ws://host:3001/crdt-sync";
|
||||||
|
let token: Option<&str> = None;
|
||||||
|
let connect_url = match token {
|
||||||
|
Some(t) => format!("{base}?token={t}"),
|
||||||
|
None => base.to_string(),
|
||||||
|
};
|
||||||
|
assert_eq!(connect_url, base);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// `add_join_token` returns a future expiry timestamp that is in the future.
|
||||||
|
#[test]
|
||||||
|
fn add_join_token_returns_future_expiry() {
|
||||||
|
let token = format!("test-expiry-{}", uuid::Uuid::new_v4());
|
||||||
|
let now = chrono::Utc::now().timestamp() as f64;
|
||||||
|
let expires_at = super::add_join_token(token);
|
||||||
|
assert!(
|
||||||
|
expires_at > now,
|
||||||
|
"Expiry timestamp must be in the future (got {expires_at}, now={now})"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// TOKEN_TTL_SECS must be 30 days.
|
||||||
|
#[test]
|
||||||
|
fn token_ttl_is_thirty_days() {
|
||||||
|
assert_eq!(
|
||||||
|
super::TOKEN_TTL_SECS,
|
||||||
|
30.0 * 24.0 * 3600.0,
|
||||||
|
"TOKEN_TTL_SECS must be 30 days"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Config: `crdt_require_token` defaults to `false`.
|
||||||
|
#[test]
|
||||||
|
fn config_crdt_require_token_defaults_to_false() {
|
||||||
|
let config = crate::config::ProjectConfig::default();
|
||||||
|
assert!(
|
||||||
|
!config.crdt_require_token,
|
||||||
|
"crdt_require_token must default to false (open access)"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Config: `crdt_tokens` defaults to empty.
|
||||||
|
#[test]
|
||||||
|
fn config_crdt_tokens_defaults_to_empty() {
|
||||||
|
let config = crate::config::ProjectConfig::default();
|
||||||
|
assert!(
|
||||||
|
config.crdt_tokens.is_empty(),
|
||||||
|
"crdt_tokens must default to empty"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Config: `crdt_require_token` and `crdt_tokens` are parsed from TOML.
|
||||||
|
#[test]
|
||||||
|
fn config_crdt_token_fields_parsed_from_toml() {
|
||||||
|
let toml_str = r#"
|
||||||
|
crdt_require_token = true
|
||||||
|
crdt_tokens = ["token-abc", "token-xyz"]
|
||||||
|
|
||||||
|
[[agent]]
|
||||||
|
name = "test"
|
||||||
|
"#;
|
||||||
|
let config: crate::config::ProjectConfig = toml::from_str(toml_str).unwrap();
|
||||||
|
assert!(config.crdt_require_token);
|
||||||
|
assert_eq!(config.crdt_tokens, vec!["token-abc", "token-xyz"]);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
+30
-9
@@ -419,10 +419,19 @@ async fn main() -> Result<(), std::io::Error> {
|
|||||||
|
|
||||||
// (CRDT state layer is initialised above alongside the legacy pipeline.db.)
|
// (CRDT state layer is initialised above alongside the legacy pipeline.db.)
|
||||||
|
|
||||||
// Load trusted keys and start the CRDT sync rendezvous client if configured.
|
// Load trusted keys, token auth config, and start the CRDT sync rendezvous
|
||||||
// In agent mode, the --rendezvous flag overrides project.toml.
|
// client if configured. In agent mode, the --rendezvous flag overrides
|
||||||
|
// project.toml. The --join-token / HUSKIES_JOIN_TOKEN is appended to the
|
||||||
|
// rendezvous URL as ?token=... so the server's bearer-token check passes.
|
||||||
|
let crdt_join_token = cli
|
||||||
|
.join_token
|
||||||
|
.clone()
|
||||||
|
.or_else(|| std::env::var("HUSKIES_JOIN_TOKEN").ok());
|
||||||
|
|
||||||
let sync_config = if is_agent {
|
let sync_config = if is_agent {
|
||||||
agent_rendezvous.clone().map(|url| (url, Vec::new()))
|
agent_rendezvous
|
||||||
|
.clone()
|
||||||
|
.map(|url| (url, Vec::new(), false, Vec::new()))
|
||||||
} else {
|
} else {
|
||||||
app_state
|
app_state
|
||||||
.project_root
|
.project_root
|
||||||
@@ -430,22 +439,34 @@ async fn main() -> Result<(), std::io::Error> {
|
|||||||
.unwrap()
|
.unwrap()
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.and_then(|root| config::ProjectConfig::load(root).ok())
|
.and_then(|root| config::ProjectConfig::load(root).ok())
|
||||||
.and_then(|cfg| cfg.rendezvous.map(|url| (url, cfg.trusted_keys)))
|
.and_then(|cfg| {
|
||||||
|
cfg.rendezvous.map(|url| {
|
||||||
|
(
|
||||||
|
url,
|
||||||
|
cfg.trusted_keys,
|
||||||
|
cfg.crdt_require_token,
|
||||||
|
cfg.crdt_tokens,
|
||||||
|
)
|
||||||
|
})
|
||||||
|
})
|
||||||
};
|
};
|
||||||
if let Some((rendezvous_url, trusted_keys)) = sync_config {
|
if let Some((rendezvous_url, trusted_keys, require_token, crdt_tokens)) = sync_config {
|
||||||
crdt_sync::init_trusted_keys(trusted_keys);
|
crdt_sync::init_trusted_keys(trusted_keys);
|
||||||
crdt_sync::spawn_rendezvous_client(rendezvous_url);
|
crdt_sync::init_token_auth(require_token, crdt_tokens);
|
||||||
|
crdt_sync::spawn_rendezvous_client(rendezvous_url, crdt_join_token);
|
||||||
} else {
|
} else {
|
||||||
// Even without rendezvous, initialise trusted keys for incoming connections.
|
// Even without rendezvous, initialise trusted keys and token auth for
|
||||||
let keys = app_state
|
// incoming connections.
|
||||||
|
let (keys, require_token, crdt_tokens) = app_state
|
||||||
.project_root
|
.project_root
|
||||||
.lock()
|
.lock()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.and_then(|root| config::ProjectConfig::load(root).ok())
|
.and_then(|root| config::ProjectConfig::load(root).ok())
|
||||||
.map(|cfg| cfg.trusted_keys)
|
.map(|cfg| (cfg.trusted_keys, cfg.crdt_require_token, cfg.crdt_tokens))
|
||||||
.unwrap_or_default();
|
.unwrap_or_default();
|
||||||
crdt_sync::init_trusted_keys(keys);
|
crdt_sync::init_trusted_keys(keys);
|
||||||
|
crdt_sync::init_token_auth(require_token, crdt_tokens);
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── Agent mode: headless build agent ────────────────────────────────
|
// ── Agent mode: headless build agent ────────────────────────────────
|
||||||
|
|||||||
@@ -531,6 +531,8 @@ mod tests {
|
|||||||
timezone: None,
|
timezone: None,
|
||||||
rendezvous: None,
|
rendezvous: None,
|
||||||
trusted_keys: Vec::new(),
|
trusted_keys: Vec::new(),
|
||||||
|
crdt_require_token: false,
|
||||||
|
crdt_tokens: Vec::new(),
|
||||||
};
|
};
|
||||||
// Should complete without panic
|
// Should complete without panic
|
||||||
run_setup_commands(tmp.path(), &config).await;
|
run_setup_commands(tmp.path(), &config).await;
|
||||||
@@ -557,6 +559,8 @@ mod tests {
|
|||||||
timezone: None,
|
timezone: None,
|
||||||
rendezvous: None,
|
rendezvous: None,
|
||||||
trusted_keys: Vec::new(),
|
trusted_keys: Vec::new(),
|
||||||
|
crdt_require_token: false,
|
||||||
|
crdt_tokens: Vec::new(),
|
||||||
};
|
};
|
||||||
// Should complete without panic
|
// Should complete without panic
|
||||||
run_setup_commands(tmp.path(), &config).await;
|
run_setup_commands(tmp.path(), &config).await;
|
||||||
@@ -583,6 +587,8 @@ mod tests {
|
|||||||
timezone: None,
|
timezone: None,
|
||||||
rendezvous: None,
|
rendezvous: None,
|
||||||
trusted_keys: Vec::new(),
|
trusted_keys: Vec::new(),
|
||||||
|
crdt_require_token: false,
|
||||||
|
crdt_tokens: Vec::new(),
|
||||||
};
|
};
|
||||||
// Setup command failures are non-fatal — should not panic or propagate
|
// Setup command failures are non-fatal — should not panic or propagate
|
||||||
run_setup_commands(tmp.path(), &config).await;
|
run_setup_commands(tmp.path(), &config).await;
|
||||||
@@ -609,6 +615,8 @@ mod tests {
|
|||||||
timezone: None,
|
timezone: None,
|
||||||
rendezvous: None,
|
rendezvous: None,
|
||||||
trusted_keys: Vec::new(),
|
trusted_keys: Vec::new(),
|
||||||
|
crdt_require_token: false,
|
||||||
|
crdt_tokens: Vec::new(),
|
||||||
};
|
};
|
||||||
// Teardown failures are best-effort — should not propagate
|
// Teardown failures are best-effort — should not propagate
|
||||||
assert!(run_teardown_commands(tmp.path(), &config).await.is_ok());
|
assert!(run_teardown_commands(tmp.path(), &config).await.is_ok());
|
||||||
@@ -634,6 +642,8 @@ mod tests {
|
|||||||
timezone: None,
|
timezone: None,
|
||||||
rendezvous: None,
|
rendezvous: None,
|
||||||
trusted_keys: Vec::new(),
|
trusted_keys: Vec::new(),
|
||||||
|
crdt_require_token: false,
|
||||||
|
crdt_tokens: Vec::new(),
|
||||||
};
|
};
|
||||||
let info = create_worktree(&project_root, "42_fresh_test", &config, 3001)
|
let info = create_worktree(&project_root, "42_fresh_test", &config, 3001)
|
||||||
.await
|
.await
|
||||||
@@ -666,6 +676,8 @@ mod tests {
|
|||||||
timezone: None,
|
timezone: None,
|
||||||
rendezvous: None,
|
rendezvous: None,
|
||||||
trusted_keys: Vec::new(),
|
trusted_keys: Vec::new(),
|
||||||
|
crdt_require_token: false,
|
||||||
|
crdt_tokens: Vec::new(),
|
||||||
};
|
};
|
||||||
// First creation
|
// First creation
|
||||||
let _info1 = create_worktree(&project_root, "43_reuse_test", &config, 3001)
|
let _info1 = create_worktree(&project_root, "43_reuse_test", &config, 3001)
|
||||||
@@ -739,6 +751,8 @@ mod tests {
|
|||||||
timezone: None,
|
timezone: None,
|
||||||
rendezvous: None,
|
rendezvous: None,
|
||||||
trusted_keys: Vec::new(),
|
trusted_keys: Vec::new(),
|
||||||
|
crdt_require_token: false,
|
||||||
|
crdt_tokens: Vec::new(),
|
||||||
};
|
};
|
||||||
|
|
||||||
let result = remove_worktree_by_story_id(tmp.path(), "99_nonexistent", &config).await;
|
let result = remove_worktree_by_story_id(tmp.path(), "99_nonexistent", &config).await;
|
||||||
@@ -770,6 +784,8 @@ mod tests {
|
|||||||
timezone: None,
|
timezone: None,
|
||||||
rendezvous: None,
|
rendezvous: None,
|
||||||
trusted_keys: Vec::new(),
|
trusted_keys: Vec::new(),
|
||||||
|
crdt_require_token: false,
|
||||||
|
crdt_tokens: Vec::new(),
|
||||||
};
|
};
|
||||||
create_worktree(&project_root, "88_remove_by_id", &config, 3001)
|
create_worktree(&project_root, "88_remove_by_id", &config, 3001)
|
||||||
.await
|
.await
|
||||||
@@ -848,6 +864,8 @@ mod tests {
|
|||||||
timezone: None,
|
timezone: None,
|
||||||
rendezvous: None,
|
rendezvous: None,
|
||||||
trusted_keys: Vec::new(),
|
trusted_keys: Vec::new(),
|
||||||
|
crdt_require_token: false,
|
||||||
|
crdt_tokens: Vec::new(),
|
||||||
};
|
};
|
||||||
// Even though setup commands fail, create_worktree must succeed
|
// Even though setup commands fail, create_worktree must succeed
|
||||||
// so the agent can start and fix the problem itself.
|
// so the agent can start and fix the problem itself.
|
||||||
@@ -882,6 +900,8 @@ mod tests {
|
|||||||
timezone: None,
|
timezone: None,
|
||||||
rendezvous: None,
|
rendezvous: None,
|
||||||
trusted_keys: Vec::new(),
|
trusted_keys: Vec::new(),
|
||||||
|
crdt_require_token: false,
|
||||||
|
crdt_tokens: Vec::new(),
|
||||||
};
|
};
|
||||||
// First creation — no setup commands, should succeed
|
// First creation — no setup commands, should succeed
|
||||||
create_worktree(&project_root, "173_reuse_fail", &empty_config, 3001)
|
create_worktree(&project_root, "173_reuse_fail", &empty_config, 3001)
|
||||||
@@ -906,6 +926,8 @@ mod tests {
|
|||||||
timezone: None,
|
timezone: None,
|
||||||
rendezvous: None,
|
rendezvous: None,
|
||||||
trusted_keys: Vec::new(),
|
trusted_keys: Vec::new(),
|
||||||
|
crdt_require_token: false,
|
||||||
|
crdt_tokens: Vec::new(),
|
||||||
};
|
};
|
||||||
// Second call — worktree exists, setup commands fail, must still succeed
|
// Second call — worktree exists, setup commands fail, must still succeed
|
||||||
let result = create_worktree(&project_root, "173_reuse_fail", &failing_config, 3002).await;
|
let result = create_worktree(&project_root, "173_reuse_fail", &failing_config, 3002).await;
|
||||||
@@ -936,6 +958,8 @@ mod tests {
|
|||||||
timezone: None,
|
timezone: None,
|
||||||
rendezvous: None,
|
rendezvous: None,
|
||||||
trusted_keys: Vec::new(),
|
trusted_keys: Vec::new(),
|
||||||
|
crdt_require_token: false,
|
||||||
|
crdt_tokens: Vec::new(),
|
||||||
};
|
};
|
||||||
let info = create_worktree(&project_root, "77_remove_async", &config, 3001)
|
let info = create_worktree(&project_root, "77_remove_async", &config, 3001)
|
||||||
.await
|
.await
|
||||||
|
|||||||
Reference in New Issue
Block a user