huskies: merge 1138 story In-container huskies self-update — huskies upgrade pulls a fresh binary without docker rebuild
This commit is contained in:
+85
-2
@@ -27,6 +27,14 @@ pub(crate) struct CliArgs {
|
||||
/// forwards all `prompt_permission` tool calls to the gateway over a WebSocket.
|
||||
/// Also readable from the `HUSKIES_UPSTREAM_GATEWAY` env var.
|
||||
pub(crate) upstream_gateway: Option<String>,
|
||||
/// Whether the `upgrade` subcommand was given.
|
||||
pub(crate) upgrade: bool,
|
||||
/// Source URL for the `upgrade` subcommand (`--source <URL>`).
|
||||
///
|
||||
/// If omitted, the upgrade subcommand falls back to
|
||||
/// `HUSKIES_BINARY_SOURCE` env var, then derives the URL from
|
||||
/// `HUSKIES_UPSTREAM_GATEWAY`.
|
||||
pub(crate) upgrade_source: Option<String>,
|
||||
}
|
||||
|
||||
/// Parse CLI arguments into `CliArgs`, or exit early for `--help` / `--version`.
|
||||
@@ -41,6 +49,8 @@ pub(crate) fn parse_cli_args(args: &[String]) -> Result<CliArgs, String> {
|
||||
let mut join_token: Option<String> = None;
|
||||
let mut gateway_url: Option<String> = None;
|
||||
let mut upstream_gateway: Option<String> = None;
|
||||
let mut upgrade = false;
|
||||
let mut upgrade_source: Option<String> = None;
|
||||
let mut i = 0;
|
||||
|
||||
while i < args.len() {
|
||||
@@ -120,6 +130,19 @@ pub(crate) fn parse_cli_args(args: &[String]) -> Result<CliArgs, String> {
|
||||
"agent" => {
|
||||
agent = true;
|
||||
}
|
||||
"upgrade" => {
|
||||
upgrade = true;
|
||||
}
|
||||
"--source" => {
|
||||
i += 1;
|
||||
if i >= args.len() {
|
||||
return Err("--source requires a value".to_string());
|
||||
}
|
||||
upgrade_source = Some(args[i].clone());
|
||||
}
|
||||
a if a.starts_with("--source=") => {
|
||||
upgrade_source = Some(a["--source=".len()..].to_string());
|
||||
}
|
||||
a if a.starts_with('-') => {
|
||||
return Err(format!("unknown option: {a}"));
|
||||
}
|
||||
@@ -147,6 +170,8 @@ pub(crate) fn parse_cli_args(args: &[String]) -> Result<CliArgs, String> {
|
||||
join_token,
|
||||
gateway_url,
|
||||
upstream_gateway,
|
||||
upgrade,
|
||||
upgrade_source,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -155,12 +180,16 @@ pub(crate) fn print_help() {
|
||||
println!("huskies init [OPTIONS] [PATH]");
|
||||
println!("huskies agent --rendezvous <URL> [OPTIONS] [PATH]");
|
||||
println!("huskies --gateway [OPTIONS] [PATH]");
|
||||
println!("huskies upgrade [--source <URL>]");
|
||||
println!();
|
||||
println!("Serve a huskies project.");
|
||||
println!();
|
||||
println!("COMMANDS:");
|
||||
println!(" init Scaffold a new .huskies/ project and start the interactive setup wizard.");
|
||||
println!(" agent Run as a headless build agent — syncs CRDT state, claims and runs work.");
|
||||
println!(" init Scaffold a new .huskies/ project and start the interactive setup wizard.");
|
||||
println!(" agent Run as a headless build agent — syncs CRDT state, claims and runs work.");
|
||||
println!(
|
||||
" upgrade Fetch a new huskies binary from SOURCE and atomically replace the current"
|
||||
);
|
||||
println!();
|
||||
println!("ARGS:");
|
||||
println!(
|
||||
@@ -190,6 +219,8 @@ pub(crate) fn print_help() {
|
||||
println!(" sled connects to WS URL and forwards all");
|
||||
println!(" prompt_permission calls via the uplink protocol.");
|
||||
println!(" Also readable from HUSKIES_UPSTREAM_GATEWAY env var.");
|
||||
println!(" --source <URL> Binary source URL for the `upgrade` subcommand.");
|
||||
println!(" Falls back to HUSKIES_BINARY_SOURCE env var.");
|
||||
}
|
||||
|
||||
/// Resolve the optional positional path argument into an absolute `PathBuf`.
|
||||
@@ -399,6 +430,58 @@ mod tests {
|
||||
assert!(parse_cli_args(&args).is_err());
|
||||
}
|
||||
|
||||
// ── upgrade subcommand ──────────────────────────────────────────
|
||||
|
||||
#[test]
|
||||
fn parse_upgrade_subcommand() {
|
||||
let args = vec!["upgrade".to_string()];
|
||||
let result = parse_cli_args(&args).unwrap();
|
||||
assert!(result.upgrade);
|
||||
assert_eq!(result.upgrade_source, None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_upgrade_with_source_flag() {
|
||||
let args = vec![
|
||||
"upgrade".to_string(),
|
||||
"--source".to_string(),
|
||||
"http://gateway:3000/api/huskies-binary".to_string(),
|
||||
];
|
||||
let result = parse_cli_args(&args).unwrap();
|
||||
assert!(result.upgrade);
|
||||
assert_eq!(
|
||||
result.upgrade_source,
|
||||
Some("http://gateway:3000/api/huskies-binary".to_string())
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_upgrade_with_source_equals_syntax() {
|
||||
let args = vec![
|
||||
"upgrade".to_string(),
|
||||
"--source=http://gw:3000/api/b".to_string(),
|
||||
];
|
||||
let result = parse_cli_args(&args).unwrap();
|
||||
assert!(result.upgrade);
|
||||
assert_eq!(
|
||||
result.upgrade_source,
|
||||
Some("http://gw:3000/api/b".to_string())
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_upgrade_source_missing_value_is_error() {
|
||||
let args = vec!["upgrade".to_string(), "--source".to_string()];
|
||||
assert!(parse_cli_args(&args).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_no_args_upgrade_is_false() {
|
||||
let result = parse_cli_args(&[]).unwrap();
|
||||
assert!(!result.upgrade);
|
||||
assert_eq!(result.upgrade_source, None);
|
||||
}
|
||||
|
||||
// ── resolve_path_arg ────────────────────────────────────────────
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -62,6 +62,11 @@ pub fn build_gateway_route(state_arc: Arc<GatewayState>) -> impl poem::Endpoint
|
||||
"/gateway/agents/:id/assign",
|
||||
poem::post(gateway_assign_agent_handler),
|
||||
)
|
||||
// Binary self-update: serve the gateway binary so sleds can download it.
|
||||
.at(
|
||||
"/api/huskies-binary",
|
||||
poem::get(crate::http::serve_binary_handler),
|
||||
)
|
||||
.data(state_arc)
|
||||
}
|
||||
|
||||
|
||||
@@ -26,6 +26,8 @@ const GATEWAY_TOOLS: &[&str] = &[
|
||||
// Handled at the gateway so the Matrix bot's perm_rx listener is used
|
||||
// rather than the container's (which has no interactive session attached).
|
||||
"prompt_permission",
|
||||
// Binary self-update: gateway serves its own binary and triggers upgrade on sleds.
|
||||
"upgrade_sled",
|
||||
];
|
||||
|
||||
/// Gateway tool definitions.
|
||||
@@ -121,6 +123,23 @@ pub(crate) fn gateway_tool_definitions() -> Vec<Value> {
|
||||
"properties": {}
|
||||
}
|
||||
}),
|
||||
json!({
|
||||
"name": "upgrade_sled",
|
||||
"description": "Trigger a binary self-update on a project sled. The sled downloads the new binary from `source_url` (defaults to this gateway's /api/huskies-binary endpoint), atomically replaces its own executable, drains CRDT persistence so no ops are lost, and re-execs. Without `project`, upgrades the active project.",
|
||||
"inputSchema": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"project": {
|
||||
"type": "string",
|
||||
"description": "Name of the project sled to upgrade. Defaults to the currently active project."
|
||||
},
|
||||
"source_url": {
|
||||
"type": "string",
|
||||
"description": "HTTP URL of the binary to install (e.g. 'http://gateway:3000/api/huskies-binary'). Defaults to this gateway's own binary endpoint."
|
||||
}
|
||||
}
|
||||
}
|
||||
}),
|
||||
]
|
||||
}
|
||||
|
||||
@@ -385,6 +404,7 @@ async fn handle_gateway_tool(
|
||||
"aggregate_pipeline_status" => handle_aggregate_pipeline_status_tool(state, id).await,
|
||||
"agents.list" => handle_agents_list_tool(id),
|
||||
"prompt_permission" => handle_prompt_permission_tool(params, state, id).await,
|
||||
"upgrade_sled" => handle_upgrade_sled_tool(params, state, id).await,
|
||||
_ => JsonRpcResponse::error(id, -32601, format!("Unknown gateway tool: {tool_name}")),
|
||||
}
|
||||
}
|
||||
@@ -769,6 +789,93 @@ fn handle_agents_list_tool(id: Option<Value>) -> JsonRpcResponse {
|
||||
)
|
||||
}
|
||||
|
||||
/// Handle the `upgrade_sled` gateway tool.
|
||||
///
|
||||
/// Posts `{"source_url": "<url>"}` to the target sled's `/api/upgrade` endpoint,
|
||||
/// which triggers the sled to download the new binary, drain CRDT persistence,
|
||||
/// and re-exec. Returns 202 text immediately — the sled connection will drop
|
||||
/// shortly after as `exec()` replaces the process.
|
||||
async fn handle_upgrade_sled_tool(
|
||||
params: &Value,
|
||||
state: &GatewayState,
|
||||
id: Option<Value>,
|
||||
) -> JsonRpcResponse {
|
||||
let args = params.get("arguments").unwrap_or(params);
|
||||
|
||||
// Resolve target project URL (explicit project arg or active project).
|
||||
let project_name = args.get("project").and_then(|v| v.as_str());
|
||||
let sled_url = if let Some(name) = project_name {
|
||||
let projects = state.projects.read().await;
|
||||
match projects.get(name).and_then(|e| e.url.clone()) {
|
||||
Some(u) => u,
|
||||
None => {
|
||||
return JsonRpcResponse::error(
|
||||
id,
|
||||
-32602,
|
||||
format!("Project '{name}' not found or has no URL configured"),
|
||||
);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
match state.active_url().await {
|
||||
Ok(u) => u,
|
||||
Err(e) => return JsonRpcResponse::error(id, -32603, e.to_string()),
|
||||
}
|
||||
};
|
||||
|
||||
// Build the binary source URL: caller-supplied or this gateway's own endpoint.
|
||||
let source_url = args
|
||||
.get("source_url")
|
||||
.and_then(|v| v.as_str())
|
||||
.map(|s| s.to_string())
|
||||
.unwrap_or_else(|| {
|
||||
// Default: the gateway serves its own binary at /api/huskies-binary.
|
||||
// Use the same host/port the gateway is bound to.
|
||||
std::env::var("HUSKIES_GATEWAY_BINARY_URL")
|
||||
.unwrap_or_else(|_| format!("http://gateway:{}/api/huskies-binary", state.port))
|
||||
});
|
||||
|
||||
let upgrade_url = format!("{sled_url}/api/upgrade");
|
||||
let body = serde_json::json!({ "source_url": source_url });
|
||||
|
||||
let active_name = project_name.map(|s| s.to_string()).unwrap_or_else(|| {
|
||||
state
|
||||
.active_project
|
||||
.try_read()
|
||||
.map(|g| g.clone())
|
||||
.unwrap_or_default()
|
||||
});
|
||||
|
||||
match state.client.post(&upgrade_url).json(&body).send().await {
|
||||
Ok(resp) if resp.status().is_success() || resp.status().as_u16() == 202 => {
|
||||
JsonRpcResponse::success(
|
||||
id,
|
||||
json!({
|
||||
"content": [{
|
||||
"type": "text",
|
||||
"text": format!(
|
||||
"Upgrade triggered on '{active_name}'. The sled is downloading the new binary from {source_url} and will re-exec momentarily."
|
||||
)
|
||||
}]
|
||||
}),
|
||||
)
|
||||
}
|
||||
Ok(resp) => JsonRpcResponse::error(
|
||||
id,
|
||||
-32603,
|
||||
format!(
|
||||
"Sled returned HTTP {} for upgrade request to {upgrade_url}",
|
||||
resp.status()
|
||||
),
|
||||
),
|
||||
Err(e) => JsonRpcResponse::error(
|
||||
id,
|
||||
-32603,
|
||||
format!("Failed to send upgrade request to {upgrade_url}: {e}"),
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle the `pipeline.get` read-RPC — returns per-project item lists in the
|
||||
/// shape expected by the gateway web UI:
|
||||
/// `{ "active": "...", "projects": { "name": { "active": [...], "backlog_count": N } } }`.
|
||||
|
||||
@@ -104,6 +104,10 @@ pub fn build_routes(
|
||||
route = route.at("/api/events", get(events::events_handler).data(buf));
|
||||
}
|
||||
|
||||
route = route
|
||||
.at("/api/upgrade", post(upgrade_trigger_handler))
|
||||
.at("/api/huskies-binary", get(serve_binary_handler));
|
||||
|
||||
if let Some(wa_ctx) = whatsapp_ctx {
|
||||
route = route.at(
|
||||
"/webhook/whatsapp",
|
||||
@@ -209,6 +213,72 @@ pub fn debug_crdt_handler(req: &poem::Request) -> poem::Response {
|
||||
.body(serde_json::to_string_pretty(&body).unwrap_or_default())
|
||||
}
|
||||
|
||||
/// `POST /api/upgrade` — trigger a self-update on the running sled.
|
||||
///
|
||||
/// Accepts `{"source_url": "http://gateway:3000/api/huskies-binary"}` and
|
||||
/// spawns the upgrade task in the background, returning 202 immediately.
|
||||
/// The connection will be dropped when `exec()` replaces the process.
|
||||
#[poem::handler]
|
||||
pub async fn upgrade_trigger_handler(
|
||||
body: poem::web::Json<serde_json::Value>,
|
||||
ctx: poem::web::Data<&std::sync::Arc<AppContext>>,
|
||||
) -> poem::Response {
|
||||
let source_url = match body
|
||||
.0
|
||||
.get("source_url")
|
||||
.and_then(|v| v.as_str())
|
||||
.map(|s| s.to_string())
|
||||
{
|
||||
Some(u) => u,
|
||||
None => {
|
||||
return poem::Response::builder()
|
||||
.status(StatusCode::BAD_REQUEST)
|
||||
.body("Missing required field: source_url");
|
||||
}
|
||||
};
|
||||
|
||||
let project_root = ctx.state.get_project_root().unwrap_or_default();
|
||||
|
||||
// Spawn upgrade in background so we can return 202 before exec() fires.
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = crate::upgrade::upgrade_and_reexec(&source_url, &project_root).await {
|
||||
crate::slog!("[upgrade] Upgrade failed: {e}");
|
||||
}
|
||||
});
|
||||
|
||||
poem::Response::builder()
|
||||
.status(StatusCode::ACCEPTED)
|
||||
.body("Upgrade triggered. The sled will re-exec momentarily.")
|
||||
}
|
||||
|
||||
/// `GET /api/huskies-binary` — serve the running binary so peer sleds can download it.
|
||||
///
|
||||
/// Streams `current_exe()` (the binary that is currently running) as an
|
||||
/// `application/octet-stream` download. Returns 500 if the path cannot be
|
||||
/// resolved or read.
|
||||
#[poem::handler]
|
||||
pub async fn serve_binary_handler() -> poem::Response {
|
||||
let exe = match std::env::current_exe() {
|
||||
Ok(p) => p,
|
||||
Err(e) => {
|
||||
return poem::Response::builder()
|
||||
.status(StatusCode::INTERNAL_SERVER_ERROR)
|
||||
.body(format!("Cannot resolve current executable: {e}"));
|
||||
}
|
||||
};
|
||||
|
||||
match tokio::fs::read(&exe).await {
|
||||
Ok(bytes) => poem::Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
.header("Content-Type", "application/octet-stream")
|
||||
.header("Content-Disposition", "attachment; filename=\"huskies\"")
|
||||
.body(bytes),
|
||||
Err(e) => poem::Response::builder()
|
||||
.status(StatusCode::INTERNAL_SERVER_ERROR)
|
||||
.body(format!("Cannot read binary at {}: {e}", exe.display())),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
@@ -49,6 +49,8 @@ pub mod sled_uplink;
|
||||
mod startup;
|
||||
mod state;
|
||||
mod store;
|
||||
/// In-container binary self-update — fetch, atomic replace, and re-exec.
|
||||
pub mod upgrade;
|
||||
/// Validated input layer — transport-agnostic newtypes and request structs for all MCP write tools.
|
||||
pub mod validation;
|
||||
mod workflow;
|
||||
@@ -72,6 +74,19 @@ mod cli;
|
||||
|
||||
use cli::{parse_cli_args, resolve_path_arg};
|
||||
|
||||
/// Convert a WebSocket gateway URL into the binary download HTTP URL.
|
||||
///
|
||||
/// `ws://gateway:3000/api/sled-uplink?token=x` → `http://gateway:3000/api/huskies-binary`
|
||||
fn derive_binary_url_from_ws(ws_url: &str) -> Option<String> {
|
||||
let http = ws_url
|
||||
.strip_prefix("wss://")
|
||||
.map(|s| format!("https://{s}"))
|
||||
.or_else(|| ws_url.strip_prefix("ws://").map(|s| format!("http://{s}")))?;
|
||||
// Strip any path and query string, then append the binary endpoint.
|
||||
let base = http.split('/').take(3).collect::<Vec<_>>().join("/");
|
||||
Some(format!("{base}/api/huskies-binary"))
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), std::io::Error> {
|
||||
// Reap zombie grandchildren on Unix (for native deployments without tini/init).
|
||||
@@ -145,6 +160,27 @@ async fn main() -> Result<(), std::io::Error> {
|
||||
}
|
||||
}
|
||||
|
||||
// ── Upgrade mode: fetch new binary, replace, exit ───────────────────────
|
||||
if cli.upgrade {
|
||||
let source = cli
|
||||
.upgrade_source
|
||||
.clone()
|
||||
.or_else(|| std::env::var("HUSKIES_BINARY_SOURCE").ok())
|
||||
.unwrap_or_else(|| {
|
||||
// Derive from HUSKIES_UPSTREAM_GATEWAY: ws://host:port/... → http://host:port/api/huskies-binary
|
||||
std::env::var("HUSKIES_UPSTREAM_GATEWAY")
|
||||
.ok()
|
||||
.and_then(|ws| derive_binary_url_from_ws(&ws))
|
||||
.unwrap_or_else(|| "http://gateway:3000/api/huskies-binary".to_string())
|
||||
});
|
||||
let target = upgrade::resolve_target_path();
|
||||
if let Err(e) = upgrade::run_cli_upgrade(&source, &target).await {
|
||||
eprintln!("error: {e}");
|
||||
std::process::exit(1);
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// ── Gateway mode: multi-project proxy ────────────────────────────────────
|
||||
if is_gateway {
|
||||
let config_dir = explicit_path.unwrap_or_else(|| cwd.clone());
|
||||
@@ -464,4 +500,28 @@ name = "coder"
|
||||
config::ProjectConfig::load(tmp.path())
|
||||
.unwrap_or_else(|e| panic!("Invalid project.toml: {e}"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn derive_binary_url_strips_ws_scheme_and_path() {
|
||||
let url = derive_binary_url_from_ws("ws://gateway:3000/api/sled-uplink?token=abc");
|
||||
assert_eq!(
|
||||
url.as_deref(),
|
||||
Some("http://gateway:3000/api/huskies-binary")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn derive_binary_url_handles_wss_scheme() {
|
||||
let url = derive_binary_url_from_ws("wss://myhost:443/path");
|
||||
assert_eq!(
|
||||
url.as_deref(),
|
||||
Some("https://myhost:443/api/huskies-binary")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn derive_binary_url_invalid_scheme_returns_none() {
|
||||
let url = derive_binary_url_from_ws("http://not-a-ws-url");
|
||||
assert!(url.is_none());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,263 @@
|
||||
//! In-container binary self-update — fetch a new `huskies` binary, atomically
|
||||
//! replace the on-disk executable, drain CRDT persistence, and re-exec.
|
||||
|
||||
use crate::slog;
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
// ── Binary fetch ─────────────────────────────────────────────────────────────
|
||||
|
||||
/// Download a binary from `source_url` and atomically replace `target_path`.
|
||||
///
|
||||
/// Writes to a sibling `.tmp` file first, then renames so the replacement is
|
||||
/// atomic on the same filesystem. Sets the execute bit before renaming so the
|
||||
/// file is runnable the moment it appears at the target location.
|
||||
pub async fn fetch_and_replace_binary(source_url: &str, target_path: &Path) -> Result<(), String> {
|
||||
slog!("[upgrade] Fetching binary from {source_url}");
|
||||
|
||||
let resp = reqwest::get(source_url)
|
||||
.await
|
||||
.map_err(|e| format!("Failed to fetch binary from {source_url}: {e}"))?;
|
||||
|
||||
if !resp.status().is_success() {
|
||||
return Err(format!(
|
||||
"Binary fetch returned HTTP {}: {source_url}",
|
||||
resp.status()
|
||||
));
|
||||
}
|
||||
|
||||
let bytes = resp
|
||||
.bytes()
|
||||
.await
|
||||
.map_err(|e| format!("Failed to read binary response body: {e}"))?;
|
||||
|
||||
if bytes.is_empty() {
|
||||
return Err("Binary fetch returned an empty body".to_string());
|
||||
}
|
||||
|
||||
// Write to a sibling temp file so the rename is atomic on the same FS.
|
||||
let tmp_path = sibling_tmp_path(target_path)?;
|
||||
std::fs::write(&tmp_path, &bytes)
|
||||
.map_err(|e| format!("Failed to write tmp binary to {}: {e}", tmp_path.display()))?;
|
||||
|
||||
set_executable(&tmp_path)?;
|
||||
|
||||
std::fs::rename(&tmp_path, target_path).map_err(|e| {
|
||||
format!(
|
||||
"Failed to rename {} → {}: {e}",
|
||||
tmp_path.display(),
|
||||
target_path.display()
|
||||
)
|
||||
})?;
|
||||
|
||||
slog!(
|
||||
"[upgrade] Binary replaced at {} ({} bytes)",
|
||||
target_path.display(),
|
||||
bytes.len()
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ── Full server upgrade (called from the running process) ─────────────────
|
||||
|
||||
/// Fetch a new binary, atomically replace the current executable, drain CRDT
|
||||
/// persistence, and re-exec the running server process with its original args.
|
||||
///
|
||||
/// This function never returns on success — `exec()` replaces the process.
|
||||
/// On failure it returns `Err(message)` so the caller can report the error
|
||||
/// while keeping the original server running.
|
||||
pub async fn upgrade_and_reexec(source_url: &str, project_root: &Path) -> Result<String, String> {
|
||||
let target = resolve_target_path();
|
||||
|
||||
fetch_and_replace_binary(source_url, &target).await?;
|
||||
|
||||
// Drain queued CRDT ops so nothing is lost when exec() replaces the process.
|
||||
crate::crdt_state::flush_persistence(std::time::Duration::from_secs(5)).await;
|
||||
|
||||
// Clean up the port file so the new process can write a fresh one.
|
||||
let port_file = project_root.join(".huskies_port");
|
||||
if port_file.exists() {
|
||||
let _ = std::fs::remove_file(&port_file);
|
||||
}
|
||||
|
||||
let args: Vec<String> = std::env::args().collect();
|
||||
slog!("[upgrade] Re-execing with new binary: {}", target.display());
|
||||
|
||||
use std::os::unix::process::CommandExt;
|
||||
let err = std::process::Command::new(&target).args(&args[1..]).exec();
|
||||
|
||||
// exec() only returns on failure.
|
||||
Err(format!(
|
||||
"Failed to exec new binary at {}: {err}",
|
||||
target.display()
|
||||
))
|
||||
}
|
||||
|
||||
// ── CLI upgrade (no re-exec) ─────────────────────────────────────────────
|
||||
|
||||
/// Run the `huskies upgrade` CLI subcommand: download, replace, and exit.
|
||||
///
|
||||
/// Unlike [`upgrade_and_reexec`], this does not flush the CRDT or re-exec
|
||||
/// because the CLI subcommand is run as a standalone command (not the server).
|
||||
/// After this returns the caller should exit.
|
||||
pub async fn run_cli_upgrade(source_url: &str, target: &Path) -> Result<(), String> {
|
||||
fetch_and_replace_binary(source_url, target).await?;
|
||||
println!(
|
||||
"Upgrade complete. New binary installed at {}.",
|
||||
target.display()
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ── Helpers ───────────────────────────────────────────────────────────────
|
||||
|
||||
/// Resolve the path to replace: `current_exe()` if accessible, else
|
||||
/// `/usr/local/bin/huskies`.
|
||||
pub fn resolve_target_path() -> PathBuf {
|
||||
std::env::current_exe().unwrap_or_else(|_| PathBuf::from("/usr/local/bin/huskies"))
|
||||
}
|
||||
|
||||
fn sibling_tmp_path(target: &Path) -> Result<PathBuf, String> {
|
||||
let parent = target
|
||||
.parent()
|
||||
.ok_or_else(|| format!("Cannot determine parent dir of {}", target.display()))?;
|
||||
Ok(parent.join(".huskies_upgrade.tmp"))
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
fn set_executable(path: &Path) -> Result<(), String> {
|
||||
use std::os::unix::fs::PermissionsExt;
|
||||
let meta =
|
||||
std::fs::metadata(path).map_err(|e| format!("Cannot stat {}: {e}", path.display()))?;
|
||||
let mut perms = meta.permissions();
|
||||
perms.set_mode(0o755);
|
||||
std::fs::set_permissions(path, perms)
|
||||
.map_err(|e| format!("Cannot chmod {}: {e}", path.display()))
|
||||
}
|
||||
|
||||
#[cfg(not(unix))]
|
||||
fn set_executable(_path: &Path) -> Result<(), String> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ── Tests ─────────────────────────────────────────────────────────────────
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
/// Start a tiny HTTP server in the background that serves `content` at `/`.
|
||||
async fn serve_bytes(content: Vec<u8>) -> (u16, tokio::task::JoinHandle<()>) {
|
||||
use std::sync::Arc;
|
||||
|
||||
let content = Arc::new(content);
|
||||
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let port = listener.local_addr().unwrap().port();
|
||||
|
||||
let handle = tokio::spawn(async move {
|
||||
loop {
|
||||
let Ok((mut stream, _)) = listener.accept().await else {
|
||||
break;
|
||||
};
|
||||
let content = Arc::clone(&content);
|
||||
tokio::spawn(async move {
|
||||
use tokio::io::AsyncWriteExt;
|
||||
// Drain the HTTP request (ignore it).
|
||||
let mut buf = [0u8; 4096];
|
||||
let _ = tokio::io::AsyncReadExt::read(&mut stream, &mut buf).await;
|
||||
// Write a minimal HTTP/1.1 200 response.
|
||||
let header = format!(
|
||||
"HTTP/1.1 200 OK\r\nContent-Length: {}\r\nContent-Type: application/octet-stream\r\n\r\n",
|
||||
content.len()
|
||||
);
|
||||
let _ = stream.write_all(header.as_bytes()).await;
|
||||
let _ = stream.write_all(&content).await;
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
(port, handle)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn fetch_and_replace_binary_downloads_and_replaces() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let target = dir.path().join("huskies");
|
||||
std::fs::write(&target, b"old binary").unwrap();
|
||||
|
||||
let content = b"new binary content v0.99.0".to_vec();
|
||||
let (port, _srv) = serve_bytes(content.clone()).await;
|
||||
|
||||
let url = format!("http://127.0.0.1:{port}/huskies");
|
||||
fetch_and_replace_binary(&url, &target).await.unwrap();
|
||||
|
||||
let on_disk = std::fs::read(&target).unwrap();
|
||||
assert_eq!(
|
||||
on_disk, content,
|
||||
"target must contain the downloaded content"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn fetch_and_replace_binary_sets_executable_bit() {
|
||||
#[cfg(unix)]
|
||||
{
|
||||
use std::os::unix::fs::PermissionsExt;
|
||||
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let target = dir.path().join("huskies");
|
||||
std::fs::write(&target, b"old").unwrap();
|
||||
|
||||
let (port, _srv) = serve_bytes(b"#!/bin/sh\nexit 0".to_vec()).await;
|
||||
let url = format!("http://127.0.0.1:{port}/huskies");
|
||||
fetch_and_replace_binary(&url, &target).await.unwrap();
|
||||
|
||||
let mode = std::fs::metadata(&target).unwrap().permissions().mode();
|
||||
assert!(mode & 0o111 != 0, "binary must be executable after upgrade");
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn fetch_and_replace_binary_empty_body_is_error() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let target = dir.path().join("huskies");
|
||||
std::fs::write(&target, b"old").unwrap();
|
||||
|
||||
let (port, _srv) = serve_bytes(vec![]).await;
|
||||
let url = format!("http://127.0.0.1:{port}/huskies");
|
||||
let err = fetch_and_replace_binary(&url, &target).await.unwrap_err();
|
||||
assert!(
|
||||
err.contains("empty"),
|
||||
"expected empty-body error, got: {err}"
|
||||
);
|
||||
|
||||
// Original must be untouched (rename never happened).
|
||||
let on_disk = std::fs::read(&target).unwrap();
|
||||
assert_eq!(on_disk, b"old");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn fetch_and_replace_binary_unreachable_url_is_error() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let target = dir.path().join("huskies");
|
||||
let err = fetch_and_replace_binary("http://127.0.0.1:1/huskies", &target)
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert!(!err.is_empty(), "expected a non-empty error");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn persisted_ops_count_does_not_decrease_after_flush() {
|
||||
// Initialise an in-process CRDT, flush it, and verify the persisted
|
||||
// count is stable (AC 5 — no ops lost across upgrade).
|
||||
crate::crdt_state::init_for_test();
|
||||
|
||||
let before = crate::crdt_state::dump_crdt_state(None).persisted_ops_count;
|
||||
crate::crdt_state::flush_persistence(std::time::Duration::from_millis(200)).await;
|
||||
let after = crate::crdt_state::dump_crdt_state(None).persisted_ops_count;
|
||||
|
||||
assert!(
|
||||
after >= before,
|
||||
"persisted_ops_count must not decrease after flush: before={before} after={after}"
|
||||
);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user