From de638603cda6b9d62732d90727124e2f54134452 Mon Sep 17 00:00:00 2001 From: dave Date: Tue, 19 May 2026 18:07:59 +0000 Subject: [PATCH] huskies: merge 1144 story Gateway trampoline-restart: detached helper survives the gateway's own death --- .../src/chat/transport/matrix/bot/context.rs | 6 + .../src/chat/transport/matrix/bot/format.rs | 17 + .../matrix/bot/messages/on_room_message.rs | 103 ++++++ server/src/chat/transport/matrix/bot/run.rs | 14 +- server/src/chat/transport/matrix/mod.rs | 2 + server/src/chat/transport/matrix/rebuild.rs | 37 ++ server/src/cli.rs | 17 + server/src/gateway/mod.rs | 3 + server/src/gateway/rebuild.rs | 115 ++++++ server/src/main.rs | 8 + server/src/service/gateway/io.rs | 1 + server/src/trampoline.rs | 334 ++++++++++++++++++ 12 files changed, 656 insertions(+), 1 deletion(-) create mode 100644 server/src/gateway/rebuild.rs create mode 100644 server/src/trampoline.rs diff --git a/server/src/chat/transport/matrix/bot/context.rs b/server/src/chat/transport/matrix/bot/context.rs index 28a6d496..b19128a0 100644 --- a/server/src/chat/transport/matrix/bot/context.rs +++ b/server/src/chat/transport/matrix/bot/context.rs @@ -99,6 +99,11 @@ pub struct BotContext { /// each event is processed at most once. Insert the event ID before any /// side-effecting work; return early if the insert returns `false`. pub handled_incoming_event_ids: Arc>, + /// In gateway mode: the port the gateway is listening on. + /// + /// Used by the "rebuild gateway" command to construct the health-check URL + /// passed to the trampoline. `None` in standalone single-project mode. + pub gateway_port: Option, } impl BotContext { @@ -293,6 +298,7 @@ mod tests { handled_incoming_event_ids: Arc::new(TokioMutex::new(SeenEventIds::new( SEEN_EVENT_IDS_CAP, ))), + gateway_port: None, } } diff --git a/server/src/chat/transport/matrix/bot/format.rs b/server/src/chat/transport/matrix/bot/format.rs index b3ec969d..93413721 100644 --- a/server/src/chat/transport/matrix/bot/format.rs +++ b/server/src/chat/transport/matrix/bot/format.rs @@ -9,6 +9,23 @@ pub fn format_startup_announcement(bot_name: &str) -> String { format!("{bot_name} is online.") } +/// Format the ready announcement sent after a successful gateway trampoline restart. +/// +/// Returns "gateway X.Y.Z ready" using the compiled-in crate version so the +/// operator can confirm which binary is running after a rebuild. +pub fn format_gateway_ready_announcement() -> String { + format!("gateway {} ready", env!("CARGO_PKG_VERSION")) +} + +/// Format the failure announcement sent when the trampoline rolls back to the +/// previous binary. +/// +/// `reason` is the human-readable failure description from the trampoline +/// (e.g. "port 3000 already in use"). +pub fn format_gateway_rollback_announcement(reason: &str) -> String { + format!("Gateway rebuild failed: {reason}. Previous version restored.") +} + /// Convert a Markdown string to an HTML string using pulldown-cmark. /// /// Enables the standard extension set (tables, footnotes, strikethrough, diff --git a/server/src/chat/transport/matrix/bot/messages/on_room_message.rs b/server/src/chat/transport/matrix/bot/messages/on_room_message.rs index ba4bcd93..fa14ce7a 100644 --- a/server/src/chat/transport/matrix/bot/messages/on_room_message.rs +++ b/server/src/chat/transport/matrix/bot/messages/on_room_message.rs @@ -19,6 +19,28 @@ use super::super::verification::check_sender_verified; use super::handle_message; +/// Return `true` when the message is a "rebuild gateway" command addressed to the bot. +/// +/// The command is recognised case-insensitively as `rebuild gateway` after stripping +/// the bot mention prefix so both `@Timmy rebuild gateway` and `Timmy rebuild gateway` +/// match. +fn extract_rebuild_gateway_command(message: &str, bot_name: &str, bot_user_id: &str) -> bool { + let stripped = crate::chat::util::strip_bot_mention(message, bot_name, bot_user_id); + let trimmed = stripped + .trim() + .trim_start_matches(|c: char| !c.is_alphanumeric()); + let (cmd, rest) = match trimmed.split_once(char::is_whitespace) { + Some((c, r)) => (c, r.trim()), + None => return false, + }; + cmd.eq_ignore_ascii_case("rebuild") + && rest + .split_whitespace() + .next() + .map(|w| w.eq_ignore_ascii_case("gateway")) + .unwrap_or(false) +} + /// Evaluate a `switch ` command against the live project store. /// /// Reads valid project names from the store at call time so newly added @@ -657,6 +679,87 @@ pub(in crate::chat::transport::matrix::bot) async fn on_room_message( return; } + // In gateway mode, intercept "rebuild gateway" and route it through the + // detached trampoline so the process swap survives any bash-tool kill cascade. + if ctx.gateway_active_project.is_some() + && extract_rebuild_gateway_command( + &user_message, + &ctx.services.bot_name, + ctx.matrix_user_id.as_str(), + ) + { + slog!("[matrix-bot] Handling 'rebuild gateway' command from {sender}"); + let ack = "Rebuilding gateway\u{2026} this may take a moment."; + let ack_html = markdown_to_html(ack); + if let Ok(msg_id) = ctx + .transport + .send_message(&room_id_str, ack, &ack_html) + .await + && let Ok(event_id) = msg_id.parse() + { + ctx.bot_sent_event_ids.lock().await.insert(event_id); + } + let config_dir = ctx.services.project_root.clone(); + let gateway_port: u16 = ctx.gateway_port.unwrap_or(3000); + match crate::gateway::rebuild::rebuild_gateway(&config_dir, gateway_port).await { + Ok(()) => { + // Trampoline is running detached — it kills this gateway and starts + // the new one, which will post "gateway X.Y.Z ready" on startup. + } + Err(e) => { + let msg = format!("Gateway rebuild failed: {e}"); + let html = markdown_to_html(&msg); + if let Ok(msg_id) = ctx.transport.send_message(&room_id_str, &msg, &html).await + && let Ok(event_id) = msg_id.parse() + { + ctx.bot_sent_event_ids.lock().await.insert(event_id); + } + } + } + return; + } + + // In gateway mode, intercept "rebuild gateway" before the plain "rebuild" + // handler so the trampoline path is used instead of a direct re-exec. + if ctx.gateway_port.is_some() + && super::super::super::rebuild::extract_rebuild_gateway_command( + &user_message, + &ctx.services.bot_name, + ctx.matrix_user_id.as_str(), + ) + .is_some() + { + slog!("[matrix-bot] Handling rebuild-gateway command from {sender}"); + let ack = "Rebuilding gateway… this may take a moment. \ + The gateway will announce itself when the new version is ready."; + let ack_html = markdown_to_html(ack); + if let Ok(msg_id) = ctx + .transport + .send_message(&room_id_str, ack, &ack_html) + .await + && let Ok(event_id) = msg_id.parse() + { + ctx.bot_sent_event_ids.lock().await.insert(event_id); + } + let port = ctx.gateway_port.unwrap_or(3000); + match crate::gateway::rebuild::rebuild_gateway(&ctx.services.project_root, port).await { + Ok(()) => { + // Trampoline is running — this gateway will be killed shortly. + // No further reply needed; the new gateway posts "gateway X.Y.Z ready". + } + Err(e) => { + let msg = format!("Gateway rebuild failed: {e}"); + let html = markdown_to_html(&msg); + if let Ok(msg_id) = ctx.transport.send_message(&room_id_str, &msg, &html).await + && let Ok(event_id) = msg_id.parse() + { + ctx.bot_sent_event_ids.lock().await.insert(event_id); + } + } + } + return; + } + // Check for the rebuild command, which requires async agent and process ops // and cannot be handled by the sync command registry. if super::super::super::rebuild::extract_rebuild_command( diff --git a/server/src/chat/transport/matrix/bot/run.rs b/server/src/chat/transport/matrix/bot/run.rs index 653f898d..14cbc33b 100644 --- a/server/src/chat/transport/matrix/bot/run.rs +++ b/server/src/chat/transport/matrix/bot/run.rs @@ -39,6 +39,7 @@ pub async fn run_bot( gateway_event_rx: Option< tokio::sync::broadcast::Receiver, >, + gateway_port: Option, ) -> Result<(), String> { let project_root = &services.project_root; let store_path = project_root.join(".huskies").join("matrix_store"); @@ -334,6 +335,7 @@ pub async fn run_bot( handled_incoming_event_ids: Arc::new(TokioMutex::new(super::context::SeenEventIds::new( super::context::SEEN_EVENT_IDS_CAP, ))), + gateway_port, }; slog!( @@ -408,7 +410,17 @@ pub async fn run_bot( // bot is online. This runs once per process start — the sync loop handles // reconnects internally so this code is never reached again on a network // blip or sync resumption. - let announce_msg = format_startup_announcement(&announce_bot_name); + // + // When started by the trampoline the message is specialised: + // - HUSKIES_TRAMPOLINE_STARTED=1 → "gateway X.Y.Z ready" + // - HUSKIES_TRAMPOLINE_FAILURE= → rollback failure notice + let announce_msg = if let Ok(reason) = std::env::var("HUSKIES_TRAMPOLINE_FAILURE") { + super::format::format_gateway_rollback_announcement(&reason) + } else if std::env::var("HUSKIES_TRAMPOLINE_STARTED").is_ok() { + super::format::format_gateway_ready_announcement() + } else { + format_startup_announcement(&announce_bot_name) + }; let announce_html = markdown_to_html(&announce_msg); slog!("[matrix-bot] Sending startup announcement: {announce_msg}"); for room_id in &announce_room_ids { diff --git a/server/src/chat/transport/matrix/mod.rs b/server/src/chat/transport/matrix/mod.rs index 1b3be2fc..37726629 100644 --- a/server/src/chat/transport/matrix/mod.rs +++ b/server/src/chat/transport/matrix/mod.rs @@ -94,6 +94,7 @@ pub fn spawn_bot( gateway_event_rx: Option< tokio::sync::broadcast::Receiver, >, + gateway_port: Option, ) -> Option { let config = match BotConfig::load(project_root) { Some(c) => c, @@ -132,6 +133,7 @@ pub fn spawn_bot( gateway_projects_store, timer_store, gateway_event_rx, + gateway_port, ) .await { diff --git a/server/src/chat/transport/matrix/rebuild.rs b/server/src/chat/transport/matrix/rebuild.rs index 24e767bc..65ffb7f8 100644 --- a/server/src/chat/transport/matrix/rebuild.rs +++ b/server/src/chat/transport/matrix/rebuild.rs @@ -40,6 +40,43 @@ pub fn extract_rebuild_command( } } +/// Parse a "rebuild gateway" command from a raw message body. +/// +/// Returns `Some(RebuildCommand)` only when the stripped message begins with +/// "rebuild gateway" (case-insensitive). A plain "rebuild" without the +/// "gateway" qualifier returns `None` so it falls through to the standard +/// server rebuild handler. +pub fn extract_rebuild_gateway_command( + message: &str, + bot_name: &str, + bot_user_id: &str, +) -> Option { + let stripped = strip_bot_mention(message, bot_name, bot_user_id); + let trimmed = stripped + .trim() + .trim_start_matches(|c: char| !c.is_alphanumeric()); + + let (cmd, rest) = trimmed.split_once(char::is_whitespace)?; + + if !cmd.eq_ignore_ascii_case("rebuild") { + return None; + } + + let qualifier = rest + .trim() + .trim_start_matches(|c: char| !c.is_alphanumeric()); + let first_word = match qualifier.split_once(char::is_whitespace) { + Some((w, _)) => w, + None => qualifier, + }; + + if first_word.eq_ignore_ascii_case("gateway") { + Some(RebuildCommand) + } else { + None + } +} + /// Handle a rebuild command: trigger server rebuild and restart. /// /// Returns a string describing the outcome. On build failure the error diff --git a/server/src/cli.rs b/server/src/cli.rs index 07961773..b8e0c287 100644 --- a/server/src/cli.rs +++ b/server/src/cli.rs @@ -35,6 +35,11 @@ pub(crate) struct CliArgs { /// `HUSKIES_BINARY_SOURCE` env var, then derives the URL from /// `HUSKIES_UPSTREAM_GATEWAY`. pub(crate) upgrade_source: Option, + /// Path to a trampoline job file (`--trampoline `). + /// + /// When set, the binary runs as a detached trampoline helper: it kills the + /// old gateway, starts the new one, polls its health, and rolls back on failure. + pub(crate) trampoline: Option, } /// Parse CLI arguments into `CliArgs`, or exit early for `--help` / `--version`. @@ -51,6 +56,7 @@ pub(crate) fn parse_cli_args(args: &[String]) -> Result { let mut upstream_gateway: Option = None; let mut upgrade = false; let mut upgrade_source: Option = None; + let mut trampoline: Option = None; let mut i = 0; while i < args.len() { @@ -143,6 +149,16 @@ pub(crate) fn parse_cli_args(args: &[String]) -> Result { a if a.starts_with("--source=") => { upgrade_source = Some(a["--source=".len()..].to_string()); } + "--trampoline" => { + i += 1; + if i >= args.len() { + return Err("--trampoline requires a path".to_string()); + } + trampoline = Some(args[i].clone()); + } + a if a.starts_with("--trampoline=") => { + trampoline = Some(a["--trampoline=".len()..].to_string()); + } a if a.starts_with('-') => { return Err(format!("unknown option: {a}")); } @@ -172,6 +188,7 @@ pub(crate) fn parse_cli_args(args: &[String]) -> Result { upstream_gateway, upgrade, upgrade_source, + trampoline, }) } diff --git a/server/src/gateway/mod.rs b/server/src/gateway/mod.rs index fc4fbacc..c49e36c0 100644 --- a/server/src/gateway/mod.rs +++ b/server/src/gateway/mod.rs @@ -4,6 +4,9 @@ //! Business logic lives in `service::gateway`, HTTP handlers in `http::gateway`. //! This file contains only the `run` entrypoint and `build_gateway_route` wiring. +/// Gateway rebuild — builds the new binary and launches the detached trampoline. +pub mod rebuild; + use crate::http::gateway::*; use crate::rebuild::ShutdownReason; use crate::service::gateway::{self, GatewayState}; diff --git a/server/src/gateway/rebuild.rs b/server/src/gateway/rebuild.rs new file mode 100644 index 00000000..b791c10c --- /dev/null +++ b/server/src/gateway/rebuild.rs @@ -0,0 +1,115 @@ +//! Gateway rebuild — builds the new huskies binary and hands off to the trampoline. +//! +//! The trampoline is spawned as a detached process (new Unix session) so that it +//! survives the gateway's own death. On success the gateway continues running +//! until the trampoline kills it; the new gateway then posts "gateway X.Y.Z ready". + +use std::path::Path; + +/// Build the huskies binary and launch the detached trampoline to swap the gateway. +/// +/// Returns `Err(message)` (shown to the user in chat) if the build or trampoline +/// launch fails. On success returns `Ok(())` — the trampoline is now running +/// in a detached process and will kill this gateway and replace it with the new +/// binary within 10 s. +pub async fn rebuild_gateway(config_dir: &Path, gateway_port: u16) -> Result<(), String> { + let manifest_dir = std::path::Path::new(env!("CARGO_MANIFEST_DIR")); + let workspace_root = manifest_dir + .parent() + .ok_or("cannot determine workspace root from CARGO_MANIFEST_DIR")?; + + crate::slog!( + "[gateway-rebuild] Building from workspace root: {}", + workspace_root.display() + ); + + // Rebuild the frontend bundle so rust-embed picks up the latest assets. + let frontend_dir = workspace_root.join("frontend"); + if frontend_dir.join("package.json").exists() { + crate::slog!("[gateway-rebuild] Building frontend"); + let fe_output = tokio::task::spawn_blocking({ + let dir = frontend_dir.clone(); + move || { + std::process::Command::new("npm") + .args(["run", "build"]) + .current_dir(&dir) + .output() + } + }) + .await + .map_err(|e| format!("frontend build task panicked: {e}"))? + .map_err(|e| format!("failed to run npm run build: {e}"))?; + + if !fe_output.status.success() { + let stderr = String::from_utf8_lossy(&fe_output.stderr); + return Err(format!("Frontend build failed:\n{stderr}")); + } + crate::slog!("[gateway-rebuild] Frontend build succeeded"); + } + + // Build the server binary matching the current profile. + let build_args: Vec<&str> = if cfg!(debug_assertions) { + vec!["build", "-p", "huskies"] + } else { + vec!["build", "--release", "-p", "huskies"] + }; + crate::slog!("[gateway-rebuild] cargo {}", build_args.join(" ")); + + let output = tokio::task::spawn_blocking({ + let root = workspace_root.to_path_buf(); + move || { + std::process::Command::new("cargo") + .args(&build_args) + .current_dir(&root) + .output() + } + }) + .await + .map_err(|e| format!("build task panicked: {e}"))? + .map_err(|e| format!("failed to run cargo build: {e}"))?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + crate::slog!("[gateway-rebuild] Build failed"); + return Err(format!("Build failed:\n{stderr}")); + } + + crate::slog!("[gateway-rebuild] Build succeeded — launching trampoline"); + + // Paths for the new and old binaries. + let new_binary = if cfg!(debug_assertions) { + workspace_root.join("target/debug/huskies") + } else { + workspace_root.join("target/release/huskies") + }; + + let old_binary = + std::env::current_exe().map_err(|e| format!("cannot locate current binary: {e}"))?; + + let huskies_dir = config_dir.join(".huskies"); + std::fs::create_dir_all(&huskies_dir) + .map_err(|e| format!("cannot create .huskies dir: {e}"))?; + let backup_binary = huskies_dir.join("huskies_backup"); + + // Current gateway args (skip argv[0]). + let gateway_args: Vec = std::env::args().skip(1).collect(); + + let job = crate::trampoline::TrampolineJob { + gateway_pid: std::process::id(), + new_binary_path: new_binary, + old_binary_path: old_binary, + backup_binary_path: backup_binary, + gateway_args, + health_url: format!("http://127.0.0.1:{gateway_port}/api/gateway"), + }; + + let job_path = huskies_dir.join("trampoline.json"); + crate::trampoline::write_job_atomic(&job, &job_path)?; + + let exe = std::env::current_exe() + .map_err(|e| format!("cannot locate current binary for trampoline: {e}"))?; + crate::trampoline::spawn_detached_trampoline(&exe, &job_path)?; + + crate::slog!("[gateway-rebuild] Trampoline launched — gateway will be replaced shortly"); + Ok(()) +} diff --git a/server/src/main.rs b/server/src/main.rs index 332fc27a..42dc6900 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -51,6 +51,8 @@ pub mod sled_uplink; mod startup; mod state; mod store; +/// Detached trampoline — kills the running gateway and starts the new binary. +pub mod trampoline; /// In-container binary self-update — fetch, atomic replace, and re-exec. pub mod upgrade; /// Validated input layer — transport-agnostic newtypes and request structs for all MCP write tools. @@ -162,6 +164,11 @@ async fn main() -> Result<(), std::io::Error> { } } + // ── Trampoline mode: kill old gateway, start new one ───────────────────── + if let Some(ref job_path) = cli.trampoline { + trampoline::run_trampoline(std::path::Path::new(job_path)).await; + } + // ── Upgrade mode: fetch new binary, replace, exit ─────────────────────── if cli.upgrade { let source = cli @@ -414,6 +421,7 @@ async fn main() -> Result<(), std::io::Error> { None, timer_store_for_bot, None, + None, ); } else { drop(matrix_shutdown_rx); diff --git a/server/src/service/gateway/io.rs b/server/src/service/gateway/io.rs index 09d90114..84c7cdc6 100644 --- a/server/src/service/gateway/io.rs +++ b/server/src/service/gateway/io.rs @@ -579,6 +579,7 @@ pub fn spawn_gateway_bot( Some(gateway_projects_store), timer_store, gateway_event_rx, + Some(port), ); (handle, shutdown_tx) } diff --git a/server/src/trampoline.rs b/server/src/trampoline.rs new file mode 100644 index 00000000..a7443ed2 --- /dev/null +++ b/server/src/trampoline.rs @@ -0,0 +1,334 @@ +//! Detached trampoline — kills the running gateway and launches the replacement. +//! +//! The trampoline is invoked as `huskies --trampoline `. It is spawned +//! as a new Unix session (`setsid`) so that SIGKILL/SIGTERM sent to the original +//! bash-tool process group does not reach it. +//! +//! Flow: +//! 1. Gateway writes a [`TrampolineJob`] atomically and spawns the trampoline. +//! 2. Trampoline backs up the old binary, kills the gateway, starts the new binary. +//! 3. If the new binary passes a health-poll within 10 s → exit 0. +//! 4. If it fails → restore backup, start it with `HUSKIES_TRAMPOLINE_FAILURE` set. + +use serde::{Deserialize, Serialize}; +use std::path::{Path, PathBuf}; +use std::time::Duration; + +// ── Job descriptor ──────────────────────────────────────────────────────────── + +/// Descriptor atomically written by the gateway before it hands control to the trampoline. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TrampolineJob { + /// PID of the currently running gateway process to kill. + pub gateway_pid: u32, + /// Absolute path to the newly compiled binary to launch. + pub new_binary_path: PathBuf, + /// Absolute path of the binary currently running as the gateway (for rollback). + pub old_binary_path: PathBuf, + /// Where to write the backup of the old binary before killing the gateway. + pub backup_binary_path: PathBuf, + /// Arguments forwarded verbatim to the new/backup gateway (everything after argv[0]). + pub gateway_args: Vec, + /// HTTP URL the trampoline polls to verify the new gateway is serving. + /// Empty string means skip health polling (used in tests). + pub health_url: String, +} + +// ── Atomic write ────────────────────────────────────────────────────────────── + +/// Write `job` to `path` atomically: write to a sibling `.tmp` file, then rename. +/// +/// The rename is atomic on POSIX so the trampoline never reads a half-written file. +pub fn write_job_atomic(job: &TrampolineJob, path: &Path) -> Result<(), String> { + let tmp = path.with_extension("tmp"); + let data = serde_json::to_vec(job).map_err(|e| format!("JSON encode failed: {e}"))?; + std::fs::write(&tmp, &data).map_err(|e| format!("tmp write failed: {e}"))?; + std::fs::rename(&tmp, path).map_err(|e| format!("rename failed: {e}"))?; + Ok(()) +} + +// ── Spawn detached ──────────────────────────────────────────────────────────── + +/// Spawn `exe --trampoline ` as a fully detached process. +/// +/// On Unix the child calls `setsid()` in `pre_exec` so it belongs to a new session +/// and is unreachable by signals sent to the original process group. stdin/stdout/ +/// stderr are all redirected to `/dev/null` so the child is fully daemonised. +pub fn spawn_detached_trampoline(exe: &Path, job_path: &Path) -> Result<(), String> { + let mut cmd = std::process::Command::new(exe); + cmd.arg("--trampoline").arg(job_path); + cmd.stdin(std::process::Stdio::null()); + cmd.stdout(std::process::Stdio::null()); + cmd.stderr(std::process::Stdio::null()); + + #[cfg(unix)] + { + use std::os::unix::process::CommandExt; + // SAFETY: setsid() is async-signal-safe. This is called in the child + // between fork and exec with no other threads running in the child's + // address space — the only safe window for pre_exec hooks. + unsafe { + cmd.pre_exec(|| { + if libc::setsid() == -1 { + return Err(std::io::Error::last_os_error()); + } + Ok(()) + }); + } + } + + cmd.spawn().map_err(|e| format!("spawn failed: {e}"))?; + Ok(()) +} + +// ── Process management ──────────────────────────────────────────────────────── + +/// Send SIGTERM to `pid`, wait up to 3 s for it to exit, then SIGKILL. +/// +/// After SIGKILL the process is unconditionally considered gone — SIGKILL cannot +/// be ignored, so the process is dead even if it briefly lingers as a zombie +/// (zombie detection via `kill(pid, 0)` is unreliable from a non-parent process). +#[cfg(unix)] +fn kill_gateway_process(pid: u32) -> Result<(), String> { + use std::thread::sleep; + + let ipid = pid as libc::pid_t; + + // Safety: kill() is always safe to call with any pid. + let running = || unsafe { libc::kill(ipid, 0) } == 0; + + if !running() { + return Ok(()); + } + + unsafe { libc::kill(ipid, libc::SIGTERM) }; + + for _ in 0..30 { + sleep(Duration::from_millis(100)); + if !running() { + return Ok(()); + } + } + + // SIGKILL cannot be ignored — the kernel will terminate the process. + // We don't loop-poll after this: the process may briefly appear as a + // zombie (still in the table, not yet reaped by its parent), in which + // case kill(pid, 0) returns 0 even though it is effectively dead. + unsafe { libc::kill(ipid, libc::SIGKILL) }; + sleep(Duration::from_millis(200)); + Ok(()) +} + +#[cfg(not(unix))] +fn kill_gateway_process(pid: u32) -> Result<(), String> { + Err(format!("kill not supported on this platform (pid {pid})")) +} + +// ── Health polling ──────────────────────────────────────────────────────────── + +/// Poll `url` every 500 ms until it returns HTTP 2xx or `timeout` elapses. +async fn poll_health(url: &str, timeout: Duration) -> Result<(), String> { + let client = reqwest::Client::builder() + .timeout(Duration::from_secs(2)) + .build() + .unwrap_or_else(|_| reqwest::Client::new()); + + let deadline = std::time::Instant::now() + timeout; + while std::time::Instant::now() < deadline { + if let Ok(resp) = client.get(url).send().await + && resp.status().is_success() + { + return Ok(()); + } + tokio::time::sleep(Duration::from_millis(500)).await; + } + Err(format!( + "health check timed out after {}s: {url}", + timeout.as_secs() + )) +} + +// ── Core logic (testable) ───────────────────────────────────────────────────── + +/// Kill the old gateway, start the new one, and poll its health endpoint. +/// +/// Returns `Ok(())` on success or `Err(reason)` when the new gateway could not +/// be started or failed health checks. Callers are responsible for rollback. +/// +/// When `job.health_url` is empty the health poll is skipped (for unit tests). +pub async fn execute_trampoline_core(job: &TrampolineJob) -> Result<(), String> { + // Back up old binary (best-effort — rollback won't work if this fails). + if let Some(parent) = job.backup_binary_path.parent() { + let _ = std::fs::create_dir_all(parent); + } + let _ = std::fs::copy(&job.old_binary_path, &job.backup_binary_path); + + // Kill old gateway. + kill_gateway_process(job.gateway_pid)?; + + // Start new gateway. + std::process::Command::new(&job.new_binary_path) + .args(&job.gateway_args) + .env("HUSKIES_TRAMPOLINE_STARTED", "1") + .stdin(std::process::Stdio::null()) + .stdout(std::process::Stdio::null()) + .stderr(std::process::Stdio::null()) + .spawn() + .map_err(|e| format!("failed to start new gateway: {e}"))?; + + // Poll health (skip when URL is empty — used in tests). + if !job.health_url.is_empty() { + poll_health(&job.health_url, Duration::from_secs(10)).await?; + } + + Ok(()) +} + +// ── Entry point ─────────────────────────────────────────────────────────────── + +/// Run the trampoline from a job file. This function never returns. +/// +/// On success exits 0 (new gateway is up and will post its own "ready" message). +/// On failure starts the backup binary with `HUSKIES_TRAMPOLINE_FAILURE` set and +/// exits 1. On unrecoverable failure (cannot start backup either) exits 2. +pub async fn run_trampoline(job_path: &Path) -> ! { + let data = match std::fs::read(job_path) { + Ok(d) => d, + Err(e) => { + eprintln!( + "[trampoline] cannot read job file {}: {e}", + job_path.display() + ); + std::process::exit(1); + } + }; + + let job: TrampolineJob = match serde_json::from_slice(&data) { + Ok(j) => j, + Err(e) => { + eprintln!("[trampoline] cannot parse job file: {e}"); + std::process::exit(1); + } + }; + + eprintln!( + "[trampoline] killing gateway PID {} and starting {}", + job.gateway_pid, + job.new_binary_path.display() + ); + + match execute_trampoline_core(&job).await { + Ok(()) => { + eprintln!("[trampoline] new gateway is up — exiting"); + let _ = std::fs::remove_file(job_path); + std::process::exit(0); + } + Err(reason) => { + eprintln!("[trampoline] new gateway failed ({reason}) — rolling back"); + let result = std::process::Command::new(&job.backup_binary_path) + .args(&job.gateway_args) + .env("HUSKIES_TRAMPOLINE_FAILURE", &reason) + .stdin(std::process::Stdio::null()) + .stdout(std::process::Stdio::null()) + .stderr(std::process::Stdio::null()) + .spawn(); + match result { + Ok(_) => { + let _ = std::fs::remove_file(job_path); + std::process::exit(1); + } + Err(e) => { + eprintln!("[trampoline] FATAL: cannot start backup gateway: {e}"); + std::process::exit(2); + } + } + } + } +} + +// ── Tests ───────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + use std::path::PathBuf; + + /// Locate `sleep` on the current platform (needed for a portable fake-gateway). + fn find_sleep() -> PathBuf { + for candidate in ["/usr/bin/sleep", "/bin/sleep"] { + let p = PathBuf::from(candidate); + if p.exists() { + return p; + } + } + panic!("sleep binary not found"); + } + + #[test] + fn write_job_atomic_round_trips() { + let tmp = tempfile::tempdir().unwrap(); + let job = TrampolineJob { + gateway_pid: 12345, + new_binary_path: PathBuf::from("/new/huskies"), + old_binary_path: PathBuf::from("/old/huskies"), + backup_binary_path: tmp.path().join("backup"), + gateway_args: vec!["--gateway".to_string(), "/workspace".to_string()], + health_url: "http://127.0.0.1:3000/api/gateway".to_string(), + }; + let path = tmp.path().join("trampoline.json"); + write_job_atomic(&job, &path).unwrap(); + + // No .tmp file should remain. + assert!(!path.with_extension("tmp").exists()); + // Final file must exist. + assert!(path.exists()); + + // Round-trip: deserialise and compare fields. + let data = std::fs::read(&path).unwrap(); + let loaded: TrampolineJob = serde_json::from_slice(&data).unwrap(); + assert_eq!(loaded.gateway_pid, job.gateway_pid); + assert_eq!(loaded.new_binary_path, job.new_binary_path); + assert_eq!(loaded.gateway_args, job.gateway_args); + } + + /// AC 5: a fake-gateway `sleep` process is killed and replaced within timeout. + #[tokio::test] + async fn fake_gateway_killed_and_replaced_within_timeout() { + let sleep_exe = find_sleep(); + let tmp = tempfile::tempdir().unwrap(); + + // Spawn the fake gateway (a long-lived sleep process). + let mut fake_gw = std::process::Command::new(&sleep_exe) + .arg("60") + .stdin(std::process::Stdio::null()) + .spawn() + .expect("spawn fake gateway"); + let fake_pid = fake_gw.id(); + + let job = TrampolineJob { + gateway_pid: fake_pid, + new_binary_path: sleep_exe.clone(), + old_binary_path: sleep_exe.clone(), + backup_binary_path: tmp.path().join("backup"), + gateway_args: vec!["1".to_string()], + health_url: String::new(), // skip health check in test + }; + + let start = std::time::Instant::now(); + let result = execute_trampoline_core(&job).await; + let elapsed = start.elapsed(); + + assert!(result.is_ok(), "trampoline core should succeed: {result:?}"); + assert!( + elapsed < Duration::from_secs(10), + "should complete well within 10s timeout, took {elapsed:?}" + ); + + // Reap the zombie — should be dead now. + let status = fake_gw.try_wait().expect("try_wait"); + assert!( + status.is_some(), + "fake gateway process should be dead after trampoline kill" + ); + } +}