diff --git a/server/src/cli.rs b/server/src/cli.rs index 7a6f1baf..07961773 100644 --- a/server/src/cli.rs +++ b/server/src/cli.rs @@ -27,6 +27,14 @@ pub(crate) struct CliArgs { /// forwards all `prompt_permission` tool calls to the gateway over a WebSocket. /// Also readable from the `HUSKIES_UPSTREAM_GATEWAY` env var. pub(crate) upstream_gateway: Option, + /// Whether the `upgrade` subcommand was given. + pub(crate) upgrade: bool, + /// Source URL for the `upgrade` subcommand (`--source `). + /// + /// If omitted, the upgrade subcommand falls back to + /// `HUSKIES_BINARY_SOURCE` env var, then derives the URL from + /// `HUSKIES_UPSTREAM_GATEWAY`. + pub(crate) upgrade_source: Option, } /// Parse CLI arguments into `CliArgs`, or exit early for `--help` / `--version`. @@ -41,6 +49,8 @@ pub(crate) fn parse_cli_args(args: &[String]) -> Result { let mut join_token: Option = None; let mut gateway_url: Option = None; let mut upstream_gateway: Option = None; + let mut upgrade = false; + let mut upgrade_source: Option = None; let mut i = 0; while i < args.len() { @@ -120,6 +130,19 @@ pub(crate) fn parse_cli_args(args: &[String]) -> Result { "agent" => { agent = true; } + "upgrade" => { + upgrade = true; + } + "--source" => { + i += 1; + if i >= args.len() { + return Err("--source requires a value".to_string()); + } + upgrade_source = Some(args[i].clone()); + } + a if a.starts_with("--source=") => { + upgrade_source = Some(a["--source=".len()..].to_string()); + } a if a.starts_with('-') => { return Err(format!("unknown option: {a}")); } @@ -147,6 +170,8 @@ pub(crate) fn parse_cli_args(args: &[String]) -> Result { join_token, gateway_url, upstream_gateway, + upgrade, + upgrade_source, }) } @@ -155,12 +180,16 @@ pub(crate) fn print_help() { println!("huskies init [OPTIONS] [PATH]"); println!("huskies agent --rendezvous [OPTIONS] [PATH]"); println!("huskies --gateway [OPTIONS] [PATH]"); + println!("huskies upgrade [--source ]"); println!(); println!("Serve a huskies project."); println!(); println!("COMMANDS:"); - println!(" init Scaffold a new .huskies/ project and start the interactive setup wizard."); - println!(" agent Run as a headless build agent — syncs CRDT state, claims and runs work."); + println!(" init Scaffold a new .huskies/ project and start the interactive setup wizard."); + println!(" agent Run as a headless build agent — syncs CRDT state, claims and runs work."); + println!( + " upgrade Fetch a new huskies binary from SOURCE and atomically replace the current" + ); println!(); println!("ARGS:"); println!( @@ -190,6 +219,8 @@ pub(crate) fn print_help() { println!(" sled connects to WS URL and forwards all"); println!(" prompt_permission calls via the uplink protocol."); println!(" Also readable from HUSKIES_UPSTREAM_GATEWAY env var."); + println!(" --source Binary source URL for the `upgrade` subcommand."); + println!(" Falls back to HUSKIES_BINARY_SOURCE env var."); } /// Resolve the optional positional path argument into an absolute `PathBuf`. @@ -399,6 +430,58 @@ mod tests { assert!(parse_cli_args(&args).is_err()); } + // ── upgrade subcommand ────────────────────────────────────────── + + #[test] + fn parse_upgrade_subcommand() { + let args = vec!["upgrade".to_string()]; + let result = parse_cli_args(&args).unwrap(); + assert!(result.upgrade); + assert_eq!(result.upgrade_source, None); + } + + #[test] + fn parse_upgrade_with_source_flag() { + let args = vec![ + "upgrade".to_string(), + "--source".to_string(), + "http://gateway:3000/api/huskies-binary".to_string(), + ]; + let result = parse_cli_args(&args).unwrap(); + assert!(result.upgrade); + assert_eq!( + result.upgrade_source, + Some("http://gateway:3000/api/huskies-binary".to_string()) + ); + } + + #[test] + fn parse_upgrade_with_source_equals_syntax() { + let args = vec![ + "upgrade".to_string(), + "--source=http://gw:3000/api/b".to_string(), + ]; + let result = parse_cli_args(&args).unwrap(); + assert!(result.upgrade); + assert_eq!( + result.upgrade_source, + Some("http://gw:3000/api/b".to_string()) + ); + } + + #[test] + fn parse_upgrade_source_missing_value_is_error() { + let args = vec!["upgrade".to_string(), "--source".to_string()]; + assert!(parse_cli_args(&args).is_err()); + } + + #[test] + fn parse_no_args_upgrade_is_false() { + let result = parse_cli_args(&[]).unwrap(); + assert!(!result.upgrade); + assert_eq!(result.upgrade_source, None); + } + // ── resolve_path_arg ──────────────────────────────────────────── #[test] diff --git a/server/src/gateway/mod.rs b/server/src/gateway/mod.rs index 3bdc403e..fc4fbacc 100644 --- a/server/src/gateway/mod.rs +++ b/server/src/gateway/mod.rs @@ -62,6 +62,11 @@ pub fn build_gateway_route(state_arc: Arc) -> impl poem::Endpoint "/gateway/agents/:id/assign", poem::post(gateway_assign_agent_handler), ) + // Binary self-update: serve the gateway binary so sleds can download it. + .at( + "/api/huskies-binary", + poem::get(crate::http::serve_binary_handler), + ) .data(state_arc) } diff --git a/server/src/http/gateway/mcp.rs b/server/src/http/gateway/mcp.rs index d4231ed2..a12f31b5 100644 --- a/server/src/http/gateway/mcp.rs +++ b/server/src/http/gateway/mcp.rs @@ -26,6 +26,8 @@ const GATEWAY_TOOLS: &[&str] = &[ // Handled at the gateway so the Matrix bot's perm_rx listener is used // rather than the container's (which has no interactive session attached). "prompt_permission", + // Binary self-update: gateway serves its own binary and triggers upgrade on sleds. + "upgrade_sled", ]; /// Gateway tool definitions. @@ -121,6 +123,23 @@ pub(crate) fn gateway_tool_definitions() -> Vec { "properties": {} } }), + json!({ + "name": "upgrade_sled", + "description": "Trigger a binary self-update on a project sled. The sled downloads the new binary from `source_url` (defaults to this gateway's /api/huskies-binary endpoint), atomically replaces its own executable, drains CRDT persistence so no ops are lost, and re-execs. Without `project`, upgrades the active project.", + "inputSchema": { + "type": "object", + "properties": { + "project": { + "type": "string", + "description": "Name of the project sled to upgrade. Defaults to the currently active project." + }, + "source_url": { + "type": "string", + "description": "HTTP URL of the binary to install (e.g. 'http://gateway:3000/api/huskies-binary'). Defaults to this gateway's own binary endpoint." + } + } + } + }), ] } @@ -385,6 +404,7 @@ async fn handle_gateway_tool( "aggregate_pipeline_status" => handle_aggregate_pipeline_status_tool(state, id).await, "agents.list" => handle_agents_list_tool(id), "prompt_permission" => handle_prompt_permission_tool(params, state, id).await, + "upgrade_sled" => handle_upgrade_sled_tool(params, state, id).await, _ => JsonRpcResponse::error(id, -32601, format!("Unknown gateway tool: {tool_name}")), } } @@ -769,6 +789,93 @@ fn handle_agents_list_tool(id: Option) -> JsonRpcResponse { ) } +/// Handle the `upgrade_sled` gateway tool. +/// +/// Posts `{"source_url": ""}` to the target sled's `/api/upgrade` endpoint, +/// which triggers the sled to download the new binary, drain CRDT persistence, +/// and re-exec. Returns 202 text immediately — the sled connection will drop +/// shortly after as `exec()` replaces the process. +async fn handle_upgrade_sled_tool( + params: &Value, + state: &GatewayState, + id: Option, +) -> JsonRpcResponse { + let args = params.get("arguments").unwrap_or(params); + + // Resolve target project URL (explicit project arg or active project). + let project_name = args.get("project").and_then(|v| v.as_str()); + let sled_url = if let Some(name) = project_name { + let projects = state.projects.read().await; + match projects.get(name).and_then(|e| e.url.clone()) { + Some(u) => u, + None => { + return JsonRpcResponse::error( + id, + -32602, + format!("Project '{name}' not found or has no URL configured"), + ); + } + } + } else { + match state.active_url().await { + Ok(u) => u, + Err(e) => return JsonRpcResponse::error(id, -32603, e.to_string()), + } + }; + + // Build the binary source URL: caller-supplied or this gateway's own endpoint. + let source_url = args + .get("source_url") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()) + .unwrap_or_else(|| { + // Default: the gateway serves its own binary at /api/huskies-binary. + // Use the same host/port the gateway is bound to. + std::env::var("HUSKIES_GATEWAY_BINARY_URL") + .unwrap_or_else(|_| format!("http://gateway:{}/api/huskies-binary", state.port)) + }); + + let upgrade_url = format!("{sled_url}/api/upgrade"); + let body = serde_json::json!({ "source_url": source_url }); + + let active_name = project_name.map(|s| s.to_string()).unwrap_or_else(|| { + state + .active_project + .try_read() + .map(|g| g.clone()) + .unwrap_or_default() + }); + + match state.client.post(&upgrade_url).json(&body).send().await { + Ok(resp) if resp.status().is_success() || resp.status().as_u16() == 202 => { + JsonRpcResponse::success( + id, + json!({ + "content": [{ + "type": "text", + "text": format!( + "Upgrade triggered on '{active_name}'. The sled is downloading the new binary from {source_url} and will re-exec momentarily." + ) + }] + }), + ) + } + Ok(resp) => JsonRpcResponse::error( + id, + -32603, + format!( + "Sled returned HTTP {} for upgrade request to {upgrade_url}", + resp.status() + ), + ), + Err(e) => JsonRpcResponse::error( + id, + -32603, + format!("Failed to send upgrade request to {upgrade_url}: {e}"), + ), + } +} + /// Handle the `pipeline.get` read-RPC — returns per-project item lists in the /// shape expected by the gateway web UI: /// `{ "active": "...", "projects": { "name": { "active": [...], "backlog_count": N } } }`. diff --git a/server/src/http/mod.rs b/server/src/http/mod.rs index 07dcfab7..b3a6a670 100644 --- a/server/src/http/mod.rs +++ b/server/src/http/mod.rs @@ -104,6 +104,10 @@ pub fn build_routes( route = route.at("/api/events", get(events::events_handler).data(buf)); } + route = route + .at("/api/upgrade", post(upgrade_trigger_handler)) + .at("/api/huskies-binary", get(serve_binary_handler)); + if let Some(wa_ctx) = whatsapp_ctx { route = route.at( "/webhook/whatsapp", @@ -209,6 +213,72 @@ pub fn debug_crdt_handler(req: &poem::Request) -> poem::Response { .body(serde_json::to_string_pretty(&body).unwrap_or_default()) } +/// `POST /api/upgrade` — trigger a self-update on the running sled. +/// +/// Accepts `{"source_url": "http://gateway:3000/api/huskies-binary"}` and +/// spawns the upgrade task in the background, returning 202 immediately. +/// The connection will be dropped when `exec()` replaces the process. +#[poem::handler] +pub async fn upgrade_trigger_handler( + body: poem::web::Json, + ctx: poem::web::Data<&std::sync::Arc>, +) -> poem::Response { + let source_url = match body + .0 + .get("source_url") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()) + { + Some(u) => u, + None => { + return poem::Response::builder() + .status(StatusCode::BAD_REQUEST) + .body("Missing required field: source_url"); + } + }; + + let project_root = ctx.state.get_project_root().unwrap_or_default(); + + // Spawn upgrade in background so we can return 202 before exec() fires. + tokio::spawn(async move { + if let Err(e) = crate::upgrade::upgrade_and_reexec(&source_url, &project_root).await { + crate::slog!("[upgrade] Upgrade failed: {e}"); + } + }); + + poem::Response::builder() + .status(StatusCode::ACCEPTED) + .body("Upgrade triggered. The sled will re-exec momentarily.") +} + +/// `GET /api/huskies-binary` — serve the running binary so peer sleds can download it. +/// +/// Streams `current_exe()` (the binary that is currently running) as an +/// `application/octet-stream` download. Returns 500 if the path cannot be +/// resolved or read. +#[poem::handler] +pub async fn serve_binary_handler() -> poem::Response { + let exe = match std::env::current_exe() { + Ok(p) => p, + Err(e) => { + return poem::Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .body(format!("Cannot resolve current executable: {e}")); + } + }; + + match tokio::fs::read(&exe).await { + Ok(bytes) => poem::Response::builder() + .status(StatusCode::OK) + .header("Content-Type", "application/octet-stream") + .header("Content-Disposition", "attachment; filename=\"huskies\"") + .body(bytes), + Err(e) => poem::Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .body(format!("Cannot read binary at {}: {e}", exe.display())), + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/server/src/main.rs b/server/src/main.rs index a95ee792..2aac2709 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -49,6 +49,8 @@ pub mod sled_uplink; mod startup; mod state; mod store; +/// In-container binary self-update — fetch, atomic replace, and re-exec. +pub mod upgrade; /// Validated input layer — transport-agnostic newtypes and request structs for all MCP write tools. pub mod validation; mod workflow; @@ -72,6 +74,19 @@ mod cli; use cli::{parse_cli_args, resolve_path_arg}; +/// Convert a WebSocket gateway URL into the binary download HTTP URL. +/// +/// `ws://gateway:3000/api/sled-uplink?token=x` → `http://gateway:3000/api/huskies-binary` +fn derive_binary_url_from_ws(ws_url: &str) -> Option { + let http = ws_url + .strip_prefix("wss://") + .map(|s| format!("https://{s}")) + .or_else(|| ws_url.strip_prefix("ws://").map(|s| format!("http://{s}")))?; + // Strip any path and query string, then append the binary endpoint. + let base = http.split('/').take(3).collect::>().join("/"); + Some(format!("{base}/api/huskies-binary")) +} + #[tokio::main] async fn main() -> Result<(), std::io::Error> { // Reap zombie grandchildren on Unix (for native deployments without tini/init). @@ -145,6 +160,27 @@ async fn main() -> Result<(), std::io::Error> { } } + // ── Upgrade mode: fetch new binary, replace, exit ─────────────────────── + if cli.upgrade { + let source = cli + .upgrade_source + .clone() + .or_else(|| std::env::var("HUSKIES_BINARY_SOURCE").ok()) + .unwrap_or_else(|| { + // Derive from HUSKIES_UPSTREAM_GATEWAY: ws://host:port/... → http://host:port/api/huskies-binary + std::env::var("HUSKIES_UPSTREAM_GATEWAY") + .ok() + .and_then(|ws| derive_binary_url_from_ws(&ws)) + .unwrap_or_else(|| "http://gateway:3000/api/huskies-binary".to_string()) + }); + let target = upgrade::resolve_target_path(); + if let Err(e) = upgrade::run_cli_upgrade(&source, &target).await { + eprintln!("error: {e}"); + std::process::exit(1); + } + return Ok(()); + } + // ── Gateway mode: multi-project proxy ──────────────────────────────────── if is_gateway { let config_dir = explicit_path.unwrap_or_else(|| cwd.clone()); @@ -464,4 +500,28 @@ name = "coder" config::ProjectConfig::load(tmp.path()) .unwrap_or_else(|e| panic!("Invalid project.toml: {e}")); } + + #[test] + fn derive_binary_url_strips_ws_scheme_and_path() { + let url = derive_binary_url_from_ws("ws://gateway:3000/api/sled-uplink?token=abc"); + assert_eq!( + url.as_deref(), + Some("http://gateway:3000/api/huskies-binary") + ); + } + + #[test] + fn derive_binary_url_handles_wss_scheme() { + let url = derive_binary_url_from_ws("wss://myhost:443/path"); + assert_eq!( + url.as_deref(), + Some("https://myhost:443/api/huskies-binary") + ); + } + + #[test] + fn derive_binary_url_invalid_scheme_returns_none() { + let url = derive_binary_url_from_ws("http://not-a-ws-url"); + assert!(url.is_none()); + } } diff --git a/server/src/upgrade.rs b/server/src/upgrade.rs new file mode 100644 index 00000000..2c014578 --- /dev/null +++ b/server/src/upgrade.rs @@ -0,0 +1,263 @@ +//! In-container binary self-update — fetch a new `huskies` binary, atomically +//! replace the on-disk executable, drain CRDT persistence, and re-exec. + +use crate::slog; +use std::path::{Path, PathBuf}; + +// ── Binary fetch ───────────────────────────────────────────────────────────── + +/// Download a binary from `source_url` and atomically replace `target_path`. +/// +/// Writes to a sibling `.tmp` file first, then renames so the replacement is +/// atomic on the same filesystem. Sets the execute bit before renaming so the +/// file is runnable the moment it appears at the target location. +pub async fn fetch_and_replace_binary(source_url: &str, target_path: &Path) -> Result<(), String> { + slog!("[upgrade] Fetching binary from {source_url}"); + + let resp = reqwest::get(source_url) + .await + .map_err(|e| format!("Failed to fetch binary from {source_url}: {e}"))?; + + if !resp.status().is_success() { + return Err(format!( + "Binary fetch returned HTTP {}: {source_url}", + resp.status() + )); + } + + let bytes = resp + .bytes() + .await + .map_err(|e| format!("Failed to read binary response body: {e}"))?; + + if bytes.is_empty() { + return Err("Binary fetch returned an empty body".to_string()); + } + + // Write to a sibling temp file so the rename is atomic on the same FS. + let tmp_path = sibling_tmp_path(target_path)?; + std::fs::write(&tmp_path, &bytes) + .map_err(|e| format!("Failed to write tmp binary to {}: {e}", tmp_path.display()))?; + + set_executable(&tmp_path)?; + + std::fs::rename(&tmp_path, target_path).map_err(|e| { + format!( + "Failed to rename {} → {}: {e}", + tmp_path.display(), + target_path.display() + ) + })?; + + slog!( + "[upgrade] Binary replaced at {} ({} bytes)", + target_path.display(), + bytes.len() + ); + Ok(()) +} + +// ── Full server upgrade (called from the running process) ───────────────── + +/// Fetch a new binary, atomically replace the current executable, drain CRDT +/// persistence, and re-exec the running server process with its original args. +/// +/// This function never returns on success — `exec()` replaces the process. +/// On failure it returns `Err(message)` so the caller can report the error +/// while keeping the original server running. +pub async fn upgrade_and_reexec(source_url: &str, project_root: &Path) -> Result { + let target = resolve_target_path(); + + fetch_and_replace_binary(source_url, &target).await?; + + // Drain queued CRDT ops so nothing is lost when exec() replaces the process. + crate::crdt_state::flush_persistence(std::time::Duration::from_secs(5)).await; + + // Clean up the port file so the new process can write a fresh one. + let port_file = project_root.join(".huskies_port"); + if port_file.exists() { + let _ = std::fs::remove_file(&port_file); + } + + let args: Vec = std::env::args().collect(); + slog!("[upgrade] Re-execing with new binary: {}", target.display()); + + use std::os::unix::process::CommandExt; + let err = std::process::Command::new(&target).args(&args[1..]).exec(); + + // exec() only returns on failure. + Err(format!( + "Failed to exec new binary at {}: {err}", + target.display() + )) +} + +// ── CLI upgrade (no re-exec) ───────────────────────────────────────────── + +/// Run the `huskies upgrade` CLI subcommand: download, replace, and exit. +/// +/// Unlike [`upgrade_and_reexec`], this does not flush the CRDT or re-exec +/// because the CLI subcommand is run as a standalone command (not the server). +/// After this returns the caller should exit. +pub async fn run_cli_upgrade(source_url: &str, target: &Path) -> Result<(), String> { + fetch_and_replace_binary(source_url, target).await?; + println!( + "Upgrade complete. New binary installed at {}.", + target.display() + ); + Ok(()) +} + +// ── Helpers ─────────────────────────────────────────────────────────────── + +/// Resolve the path to replace: `current_exe()` if accessible, else +/// `/usr/local/bin/huskies`. +pub fn resolve_target_path() -> PathBuf { + std::env::current_exe().unwrap_or_else(|_| PathBuf::from("/usr/local/bin/huskies")) +} + +fn sibling_tmp_path(target: &Path) -> Result { + let parent = target + .parent() + .ok_or_else(|| format!("Cannot determine parent dir of {}", target.display()))?; + Ok(parent.join(".huskies_upgrade.tmp")) +} + +#[cfg(unix)] +fn set_executable(path: &Path) -> Result<(), String> { + use std::os::unix::fs::PermissionsExt; + let meta = + std::fs::metadata(path).map_err(|e| format!("Cannot stat {}: {e}", path.display()))?; + let mut perms = meta.permissions(); + perms.set_mode(0o755); + std::fs::set_permissions(path, perms) + .map_err(|e| format!("Cannot chmod {}: {e}", path.display())) +} + +#[cfg(not(unix))] +fn set_executable(_path: &Path) -> Result<(), String> { + Ok(()) +} + +// ── Tests ───────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + + /// Start a tiny HTTP server in the background that serves `content` at `/`. + async fn serve_bytes(content: Vec) -> (u16, tokio::task::JoinHandle<()>) { + use std::sync::Arc; + + let content = Arc::new(content); + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let port = listener.local_addr().unwrap().port(); + + let handle = tokio::spawn(async move { + loop { + let Ok((mut stream, _)) = listener.accept().await else { + break; + }; + let content = Arc::clone(&content); + tokio::spawn(async move { + use tokio::io::AsyncWriteExt; + // Drain the HTTP request (ignore it). + let mut buf = [0u8; 4096]; + let _ = tokio::io::AsyncReadExt::read(&mut stream, &mut buf).await; + // Write a minimal HTTP/1.1 200 response. + let header = format!( + "HTTP/1.1 200 OK\r\nContent-Length: {}\r\nContent-Type: application/octet-stream\r\n\r\n", + content.len() + ); + let _ = stream.write_all(header.as_bytes()).await; + let _ = stream.write_all(&content).await; + }); + } + }); + + (port, handle) + } + + #[tokio::test] + async fn fetch_and_replace_binary_downloads_and_replaces() { + let dir = tempfile::tempdir().unwrap(); + let target = dir.path().join("huskies"); + std::fs::write(&target, b"old binary").unwrap(); + + let content = b"new binary content v0.99.0".to_vec(); + let (port, _srv) = serve_bytes(content.clone()).await; + + let url = format!("http://127.0.0.1:{port}/huskies"); + fetch_and_replace_binary(&url, &target).await.unwrap(); + + let on_disk = std::fs::read(&target).unwrap(); + assert_eq!( + on_disk, content, + "target must contain the downloaded content" + ); + } + + #[tokio::test] + async fn fetch_and_replace_binary_sets_executable_bit() { + #[cfg(unix)] + { + use std::os::unix::fs::PermissionsExt; + + let dir = tempfile::tempdir().unwrap(); + let target = dir.path().join("huskies"); + std::fs::write(&target, b"old").unwrap(); + + let (port, _srv) = serve_bytes(b"#!/bin/sh\nexit 0".to_vec()).await; + let url = format!("http://127.0.0.1:{port}/huskies"); + fetch_and_replace_binary(&url, &target).await.unwrap(); + + let mode = std::fs::metadata(&target).unwrap().permissions().mode(); + assert!(mode & 0o111 != 0, "binary must be executable after upgrade"); + } + } + + #[tokio::test] + async fn fetch_and_replace_binary_empty_body_is_error() { + let dir = tempfile::tempdir().unwrap(); + let target = dir.path().join("huskies"); + std::fs::write(&target, b"old").unwrap(); + + let (port, _srv) = serve_bytes(vec![]).await; + let url = format!("http://127.0.0.1:{port}/huskies"); + let err = fetch_and_replace_binary(&url, &target).await.unwrap_err(); + assert!( + err.contains("empty"), + "expected empty-body error, got: {err}" + ); + + // Original must be untouched (rename never happened). + let on_disk = std::fs::read(&target).unwrap(); + assert_eq!(on_disk, b"old"); + } + + #[tokio::test] + async fn fetch_and_replace_binary_unreachable_url_is_error() { + let dir = tempfile::tempdir().unwrap(); + let target = dir.path().join("huskies"); + let err = fetch_and_replace_binary("http://127.0.0.1:1/huskies", &target) + .await + .unwrap_err(); + assert!(!err.is_empty(), "expected a non-empty error"); + } + + #[tokio::test] + async fn persisted_ops_count_does_not_decrease_after_flush() { + // Initialise an in-process CRDT, flush it, and verify the persisted + // count is stable (AC 5 — no ops lost across upgrade). + crate::crdt_state::init_for_test(); + + let before = crate::crdt_state::dump_crdt_state(None).persisted_ops_count; + crate::crdt_state::flush_persistence(std::time::Duration::from_millis(200)).await; + let after = crate::crdt_state::dump_crdt_state(None).persisted_ops_count; + + assert!( + after >= before, + "persisted_ops_count must not decrease after flush: before={before} after={after}" + ); + } +}