huskies: merge 1144 story Gateway trampoline-restart: detached helper survives the gateway's own death
This commit is contained in:
@@ -99,6 +99,11 @@ pub struct BotContext {
|
||||
/// each event is processed at most once. Insert the event ID before any
|
||||
/// side-effecting work; return early if the insert returns `false`.
|
||||
pub handled_incoming_event_ids: Arc<TokioMutex<SeenEventIds>>,
|
||||
/// In gateway mode: the port the gateway is listening on.
|
||||
///
|
||||
/// Used by the "rebuild gateway" command to construct the health-check URL
|
||||
/// passed to the trampoline. `None` in standalone single-project mode.
|
||||
pub gateway_port: Option<u16>,
|
||||
}
|
||||
|
||||
impl BotContext {
|
||||
@@ -293,6 +298,7 @@ mod tests {
|
||||
handled_incoming_event_ids: Arc::new(TokioMutex::new(SeenEventIds::new(
|
||||
SEEN_EVENT_IDS_CAP,
|
||||
))),
|
||||
gateway_port: None,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -9,6 +9,23 @@ pub fn format_startup_announcement(bot_name: &str) -> String {
|
||||
format!("{bot_name} is online.")
|
||||
}
|
||||
|
||||
/// Format the ready announcement sent after a successful gateway trampoline restart.
|
||||
///
|
||||
/// Returns "gateway X.Y.Z ready" using the compiled-in crate version so the
|
||||
/// operator can confirm which binary is running after a rebuild.
|
||||
pub fn format_gateway_ready_announcement() -> String {
|
||||
format!("gateway {} ready", env!("CARGO_PKG_VERSION"))
|
||||
}
|
||||
|
||||
/// Build the announcement posted when the trampoline falls back to the
/// previous binary.
///
/// `reason` is the human-readable failure description supplied by the
/// trampoline (e.g. "port 3000 already in use"); it is embedded verbatim
/// between a fixed prefix and suffix.
pub fn format_gateway_rollback_announcement(reason: &str) -> String {
    [
        "Gateway rebuild failed: ",
        reason,
        ". Previous version restored.",
    ]
    .concat()
}
|
||||
|
||||
/// Convert a Markdown string to an HTML string using pulldown-cmark.
|
||||
///
|
||||
/// Enables the standard extension set (tables, footnotes, strikethrough,
|
||||
|
||||
@@ -19,6 +19,28 @@ use super::super::verification::check_sender_verified;
|
||||
|
||||
use super::handle_message;
|
||||
|
||||
/// Return `true` when the message is a "rebuild gateway" command addressed to the bot.
|
||||
///
|
||||
/// The command is recognised case-insensitively as `rebuild gateway` after stripping
|
||||
/// the bot mention prefix so both `@Timmy rebuild gateway` and `Timmy rebuild gateway`
|
||||
/// match.
|
||||
fn extract_rebuild_gateway_command(message: &str, bot_name: &str, bot_user_id: &str) -> bool {
|
||||
let stripped = crate::chat::util::strip_bot_mention(message, bot_name, bot_user_id);
|
||||
let trimmed = stripped
|
||||
.trim()
|
||||
.trim_start_matches(|c: char| !c.is_alphanumeric());
|
||||
let (cmd, rest) = match trimmed.split_once(char::is_whitespace) {
|
||||
Some((c, r)) => (c, r.trim()),
|
||||
None => return false,
|
||||
};
|
||||
cmd.eq_ignore_ascii_case("rebuild")
|
||||
&& rest
|
||||
.split_whitespace()
|
||||
.next()
|
||||
.map(|w| w.eq_ignore_ascii_case("gateway"))
|
||||
.unwrap_or(false)
|
||||
}
|
||||
|
||||
/// Evaluate a `switch <arg>` command against the live project store.
|
||||
///
|
||||
/// Reads valid project names from the store at call time so newly added
|
||||
@@ -657,6 +679,87 @@ pub(in crate::chat::transport::matrix::bot) async fn on_room_message(
|
||||
return;
|
||||
}
|
||||
|
||||
// In gateway mode, intercept "rebuild gateway" and route it through the
|
||||
// detached trampoline so the process swap survives any bash-tool kill cascade.
|
||||
if ctx.gateway_active_project.is_some()
|
||||
&& extract_rebuild_gateway_command(
|
||||
&user_message,
|
||||
&ctx.services.bot_name,
|
||||
ctx.matrix_user_id.as_str(),
|
||||
)
|
||||
{
|
||||
slog!("[matrix-bot] Handling 'rebuild gateway' command from {sender}");
|
||||
let ack = "Rebuilding gateway\u{2026} this may take a moment.";
|
||||
let ack_html = markdown_to_html(ack);
|
||||
if let Ok(msg_id) = ctx
|
||||
.transport
|
||||
.send_message(&room_id_str, ack, &ack_html)
|
||||
.await
|
||||
&& let Ok(event_id) = msg_id.parse()
|
||||
{
|
||||
ctx.bot_sent_event_ids.lock().await.insert(event_id);
|
||||
}
|
||||
let config_dir = ctx.services.project_root.clone();
|
||||
let gateway_port: u16 = ctx.gateway_port.unwrap_or(3000);
|
||||
match crate::gateway::rebuild::rebuild_gateway(&config_dir, gateway_port).await {
|
||||
Ok(()) => {
|
||||
// Trampoline is running detached — it kills this gateway and starts
|
||||
// the new one, which will post "gateway X.Y.Z ready" on startup.
|
||||
}
|
||||
Err(e) => {
|
||||
let msg = format!("Gateway rebuild failed: {e}");
|
||||
let html = markdown_to_html(&msg);
|
||||
if let Ok(msg_id) = ctx.transport.send_message(&room_id_str, &msg, &html).await
|
||||
&& let Ok(event_id) = msg_id.parse()
|
||||
{
|
||||
ctx.bot_sent_event_ids.lock().await.insert(event_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// In gateway mode, intercept "rebuild gateway" before the plain "rebuild"
|
||||
// handler so the trampoline path is used instead of a direct re-exec.
|
||||
if ctx.gateway_port.is_some()
|
||||
&& super::super::super::rebuild::extract_rebuild_gateway_command(
|
||||
&user_message,
|
||||
&ctx.services.bot_name,
|
||||
ctx.matrix_user_id.as_str(),
|
||||
)
|
||||
.is_some()
|
||||
{
|
||||
slog!("[matrix-bot] Handling rebuild-gateway command from {sender}");
|
||||
let ack = "Rebuilding gateway… this may take a moment. \
|
||||
The gateway will announce itself when the new version is ready.";
|
||||
let ack_html = markdown_to_html(ack);
|
||||
if let Ok(msg_id) = ctx
|
||||
.transport
|
||||
.send_message(&room_id_str, ack, &ack_html)
|
||||
.await
|
||||
&& let Ok(event_id) = msg_id.parse()
|
||||
{
|
||||
ctx.bot_sent_event_ids.lock().await.insert(event_id);
|
||||
}
|
||||
let port = ctx.gateway_port.unwrap_or(3000);
|
||||
match crate::gateway::rebuild::rebuild_gateway(&ctx.services.project_root, port).await {
|
||||
Ok(()) => {
|
||||
// Trampoline is running — this gateway will be killed shortly.
|
||||
// No further reply needed; the new gateway posts "gateway X.Y.Z ready".
|
||||
}
|
||||
Err(e) => {
|
||||
let msg = format!("Gateway rebuild failed: {e}");
|
||||
let html = markdown_to_html(&msg);
|
||||
if let Ok(msg_id) = ctx.transport.send_message(&room_id_str, &msg, &html).await
|
||||
&& let Ok(event_id) = msg_id.parse()
|
||||
{
|
||||
ctx.bot_sent_event_ids.lock().await.insert(event_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// Check for the rebuild command, which requires async agent and process ops
|
||||
// and cannot be handled by the sync command registry.
|
||||
if super::super::super::rebuild::extract_rebuild_command(
|
||||
|
||||
@@ -39,6 +39,7 @@ pub async fn run_bot(
|
||||
gateway_event_rx: Option<
|
||||
tokio::sync::broadcast::Receiver<crate::service::gateway::GatewayStatusEvent>,
|
||||
>,
|
||||
gateway_port: Option<u16>,
|
||||
) -> Result<(), String> {
|
||||
let project_root = &services.project_root;
|
||||
let store_path = project_root.join(".huskies").join("matrix_store");
|
||||
@@ -334,6 +335,7 @@ pub async fn run_bot(
|
||||
handled_incoming_event_ids: Arc::new(TokioMutex::new(super::context::SeenEventIds::new(
|
||||
super::context::SEEN_EVENT_IDS_CAP,
|
||||
))),
|
||||
gateway_port,
|
||||
};
|
||||
|
||||
slog!(
|
||||
@@ -408,7 +410,17 @@ pub async fn run_bot(
|
||||
// bot is online. This runs once per process start — the sync loop handles
|
||||
// reconnects internally so this code is never reached again on a network
|
||||
// blip or sync resumption.
|
||||
let announce_msg = format_startup_announcement(&announce_bot_name);
|
||||
//
|
||||
// When started by the trampoline the message is specialised:
|
||||
// - HUSKIES_TRAMPOLINE_STARTED=1 → "gateway X.Y.Z ready"
|
||||
// - HUSKIES_TRAMPOLINE_FAILURE=<reason> → rollback failure notice
|
||||
let announce_msg = if let Ok(reason) = std::env::var("HUSKIES_TRAMPOLINE_FAILURE") {
|
||||
super::format::format_gateway_rollback_announcement(&reason)
|
||||
} else if std::env::var("HUSKIES_TRAMPOLINE_STARTED").is_ok() {
|
||||
super::format::format_gateway_ready_announcement()
|
||||
} else {
|
||||
format_startup_announcement(&announce_bot_name)
|
||||
};
|
||||
let announce_html = markdown_to_html(&announce_msg);
|
||||
slog!("[matrix-bot] Sending startup announcement: {announce_msg}");
|
||||
for room_id in &announce_room_ids {
|
||||
|
||||
@@ -94,6 +94,7 @@ pub fn spawn_bot(
|
||||
gateway_event_rx: Option<
|
||||
tokio::sync::broadcast::Receiver<crate::service::gateway::GatewayStatusEvent>,
|
||||
>,
|
||||
gateway_port: Option<u16>,
|
||||
) -> Option<tokio::task::AbortHandle> {
|
||||
let config = match BotConfig::load(project_root) {
|
||||
Some(c) => c,
|
||||
@@ -132,6 +133,7 @@ pub fn spawn_bot(
|
||||
gateway_projects_store,
|
||||
timer_store,
|
||||
gateway_event_rx,
|
||||
gateway_port,
|
||||
)
|
||||
.await
|
||||
{
|
||||
|
||||
@@ -40,6 +40,43 @@ pub fn extract_rebuild_command(
|
||||
}
|
||||
}
|
||||
|
||||
/// Parse a "rebuild gateway" command from a raw message body.
|
||||
///
|
||||
/// Returns `Some(RebuildCommand)` only when the stripped message begins with
|
||||
/// "rebuild gateway" (case-insensitive). A plain "rebuild" without the
|
||||
/// "gateway" qualifier returns `None` so it falls through to the standard
|
||||
/// server rebuild handler.
|
||||
pub fn extract_rebuild_gateway_command(
|
||||
message: &str,
|
||||
bot_name: &str,
|
||||
bot_user_id: &str,
|
||||
) -> Option<RebuildCommand> {
|
||||
let stripped = strip_bot_mention(message, bot_name, bot_user_id);
|
||||
let trimmed = stripped
|
||||
.trim()
|
||||
.trim_start_matches(|c: char| !c.is_alphanumeric());
|
||||
|
||||
let (cmd, rest) = trimmed.split_once(char::is_whitespace)?;
|
||||
|
||||
if !cmd.eq_ignore_ascii_case("rebuild") {
|
||||
return None;
|
||||
}
|
||||
|
||||
let qualifier = rest
|
||||
.trim()
|
||||
.trim_start_matches(|c: char| !c.is_alphanumeric());
|
||||
let first_word = match qualifier.split_once(char::is_whitespace) {
|
||||
Some((w, _)) => w,
|
||||
None => qualifier,
|
||||
};
|
||||
|
||||
if first_word.eq_ignore_ascii_case("gateway") {
|
||||
Some(RebuildCommand)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle a rebuild command: trigger server rebuild and restart.
|
||||
///
|
||||
/// Returns a string describing the outcome. On build failure the error
|
||||
|
||||
@@ -35,6 +35,11 @@ pub(crate) struct CliArgs {
|
||||
/// `HUSKIES_BINARY_SOURCE` env var, then derives the URL from
|
||||
/// `HUSKIES_UPSTREAM_GATEWAY`.
|
||||
pub(crate) upgrade_source: Option<String>,
|
||||
/// Path to a trampoline job file (`--trampoline <path>`).
|
||||
///
|
||||
/// When set, the binary runs as a detached trampoline helper: it kills the
|
||||
/// old gateway, starts the new one, polls its health, and rolls back on failure.
|
||||
pub(crate) trampoline: Option<String>,
|
||||
}
|
||||
|
||||
/// Parse CLI arguments into `CliArgs`, or exit early for `--help` / `--version`.
|
||||
@@ -51,6 +56,7 @@ pub(crate) fn parse_cli_args(args: &[String]) -> Result<CliArgs, String> {
|
||||
let mut upstream_gateway: Option<String> = None;
|
||||
let mut upgrade = false;
|
||||
let mut upgrade_source: Option<String> = None;
|
||||
let mut trampoline: Option<String> = None;
|
||||
let mut i = 0;
|
||||
|
||||
while i < args.len() {
|
||||
@@ -143,6 +149,16 @@ pub(crate) fn parse_cli_args(args: &[String]) -> Result<CliArgs, String> {
|
||||
a if a.starts_with("--source=") => {
|
||||
upgrade_source = Some(a["--source=".len()..].to_string());
|
||||
}
|
||||
"--trampoline" => {
|
||||
i += 1;
|
||||
if i >= args.len() {
|
||||
return Err("--trampoline requires a path".to_string());
|
||||
}
|
||||
trampoline = Some(args[i].clone());
|
||||
}
|
||||
a if a.starts_with("--trampoline=") => {
|
||||
trampoline = Some(a["--trampoline=".len()..].to_string());
|
||||
}
|
||||
a if a.starts_with('-') => {
|
||||
return Err(format!("unknown option: {a}"));
|
||||
}
|
||||
@@ -172,6 +188,7 @@ pub(crate) fn parse_cli_args(args: &[String]) -> Result<CliArgs, String> {
|
||||
upstream_gateway,
|
||||
upgrade,
|
||||
upgrade_source,
|
||||
trampoline,
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -4,6 +4,9 @@
|
||||
//! Business logic lives in `service::gateway`, HTTP handlers in `http::gateway`.
|
||||
//! This file contains only the `run` entrypoint and `build_gateway_route` wiring.
|
||||
|
||||
/// Gateway rebuild — builds the new binary and launches the detached trampoline.
|
||||
pub mod rebuild;
|
||||
|
||||
use crate::http::gateway::*;
|
||||
use crate::rebuild::ShutdownReason;
|
||||
use crate::service::gateway::{self, GatewayState};
|
||||
|
||||
@@ -0,0 +1,115 @@
|
||||
//! Gateway rebuild — builds the new huskies binary and hands off to the trampoline.
|
||||
//!
|
||||
//! The trampoline is spawned as a detached process (new Unix session) so that it
|
||||
//! survives the gateway's own death. On success the gateway continues running
|
||||
//! until the trampoline kills it; the new gateway then posts "gateway X.Y.Z ready".
|
||||
|
||||
use std::path::Path;
|
||||
|
||||
/// Build the huskies binary and launch the detached trampoline to swap the gateway.
|
||||
///
|
||||
/// Returns `Err(message)` (shown to the user in chat) if the build or trampoline
|
||||
/// launch fails. On success returns `Ok(())` — the trampoline is now running
|
||||
/// in a detached process and will kill this gateway and replace it with the new
|
||||
/// binary within 10 s.
|
||||
pub async fn rebuild_gateway(config_dir: &Path, gateway_port: u16) -> Result<(), String> {
|
||||
let manifest_dir = std::path::Path::new(env!("CARGO_MANIFEST_DIR"));
|
||||
let workspace_root = manifest_dir
|
||||
.parent()
|
||||
.ok_or("cannot determine workspace root from CARGO_MANIFEST_DIR")?;
|
||||
|
||||
crate::slog!(
|
||||
"[gateway-rebuild] Building from workspace root: {}",
|
||||
workspace_root.display()
|
||||
);
|
||||
|
||||
// Rebuild the frontend bundle so rust-embed picks up the latest assets.
|
||||
let frontend_dir = workspace_root.join("frontend");
|
||||
if frontend_dir.join("package.json").exists() {
|
||||
crate::slog!("[gateway-rebuild] Building frontend");
|
||||
let fe_output = tokio::task::spawn_blocking({
|
||||
let dir = frontend_dir.clone();
|
||||
move || {
|
||||
std::process::Command::new("npm")
|
||||
.args(["run", "build"])
|
||||
.current_dir(&dir)
|
||||
.output()
|
||||
}
|
||||
})
|
||||
.await
|
||||
.map_err(|e| format!("frontend build task panicked: {e}"))?
|
||||
.map_err(|e| format!("failed to run npm run build: {e}"))?;
|
||||
|
||||
if !fe_output.status.success() {
|
||||
let stderr = String::from_utf8_lossy(&fe_output.stderr);
|
||||
return Err(format!("Frontend build failed:\n{stderr}"));
|
||||
}
|
||||
crate::slog!("[gateway-rebuild] Frontend build succeeded");
|
||||
}
|
||||
|
||||
// Build the server binary matching the current profile.
|
||||
let build_args: Vec<&str> = if cfg!(debug_assertions) {
|
||||
vec!["build", "-p", "huskies"]
|
||||
} else {
|
||||
vec!["build", "--release", "-p", "huskies"]
|
||||
};
|
||||
crate::slog!("[gateway-rebuild] cargo {}", build_args.join(" "));
|
||||
|
||||
let output = tokio::task::spawn_blocking({
|
||||
let root = workspace_root.to_path_buf();
|
||||
move || {
|
||||
std::process::Command::new("cargo")
|
||||
.args(&build_args)
|
||||
.current_dir(&root)
|
||||
.output()
|
||||
}
|
||||
})
|
||||
.await
|
||||
.map_err(|e| format!("build task panicked: {e}"))?
|
||||
.map_err(|e| format!("failed to run cargo build: {e}"))?;
|
||||
|
||||
if !output.status.success() {
|
||||
let stderr = String::from_utf8_lossy(&output.stderr);
|
||||
crate::slog!("[gateway-rebuild] Build failed");
|
||||
return Err(format!("Build failed:\n{stderr}"));
|
||||
}
|
||||
|
||||
crate::slog!("[gateway-rebuild] Build succeeded — launching trampoline");
|
||||
|
||||
// Paths for the new and old binaries.
|
||||
let new_binary = if cfg!(debug_assertions) {
|
||||
workspace_root.join("target/debug/huskies")
|
||||
} else {
|
||||
workspace_root.join("target/release/huskies")
|
||||
};
|
||||
|
||||
let old_binary =
|
||||
std::env::current_exe().map_err(|e| format!("cannot locate current binary: {e}"))?;
|
||||
|
||||
let huskies_dir = config_dir.join(".huskies");
|
||||
std::fs::create_dir_all(&huskies_dir)
|
||||
.map_err(|e| format!("cannot create .huskies dir: {e}"))?;
|
||||
let backup_binary = huskies_dir.join("huskies_backup");
|
||||
|
||||
// Current gateway args (skip argv[0]).
|
||||
let gateway_args: Vec<String> = std::env::args().skip(1).collect();
|
||||
|
||||
let job = crate::trampoline::TrampolineJob {
|
||||
gateway_pid: std::process::id(),
|
||||
new_binary_path: new_binary,
|
||||
old_binary_path: old_binary,
|
||||
backup_binary_path: backup_binary,
|
||||
gateway_args,
|
||||
health_url: format!("http://127.0.0.1:{gateway_port}/api/gateway"),
|
||||
};
|
||||
|
||||
let job_path = huskies_dir.join("trampoline.json");
|
||||
crate::trampoline::write_job_atomic(&job, &job_path)?;
|
||||
|
||||
let exe = std::env::current_exe()
|
||||
.map_err(|e| format!("cannot locate current binary for trampoline: {e}"))?;
|
||||
crate::trampoline::spawn_detached_trampoline(&exe, &job_path)?;
|
||||
|
||||
crate::slog!("[gateway-rebuild] Trampoline launched — gateway will be replaced shortly");
|
||||
Ok(())
|
||||
}
|
||||
@@ -51,6 +51,8 @@ pub mod sled_uplink;
|
||||
mod startup;
|
||||
mod state;
|
||||
mod store;
|
||||
/// Detached trampoline — kills the running gateway and starts the new binary.
|
||||
pub mod trampoline;
|
||||
/// In-container binary self-update — fetch, atomic replace, and re-exec.
|
||||
pub mod upgrade;
|
||||
/// Validated input layer — transport-agnostic newtypes and request structs for all MCP write tools.
|
||||
@@ -162,6 +164,11 @@ async fn main() -> Result<(), std::io::Error> {
|
||||
}
|
||||
}
|
||||
|
||||
// ── Trampoline mode: kill old gateway, start new one ─────────────────────
|
||||
if let Some(ref job_path) = cli.trampoline {
|
||||
trampoline::run_trampoline(std::path::Path::new(job_path)).await;
|
||||
}
|
||||
|
||||
// ── Upgrade mode: fetch new binary, replace, exit ───────────────────────
|
||||
if cli.upgrade {
|
||||
let source = cli
|
||||
@@ -414,6 +421,7 @@ async fn main() -> Result<(), std::io::Error> {
|
||||
None,
|
||||
timer_store_for_bot,
|
||||
None,
|
||||
None,
|
||||
);
|
||||
} else {
|
||||
drop(matrix_shutdown_rx);
|
||||
|
||||
@@ -579,6 +579,7 @@ pub fn spawn_gateway_bot(
|
||||
Some(gateway_projects_store),
|
||||
timer_store,
|
||||
gateway_event_rx,
|
||||
Some(port),
|
||||
);
|
||||
(handle, shutdown_tx)
|
||||
}
|
||||
|
||||
@@ -0,0 +1,334 @@
|
||||
//! Detached trampoline — kills the running gateway and launches the replacement.
|
||||
//!
|
||||
//! The trampoline is invoked as `huskies --trampoline <job-file>`. It is spawned
|
||||
//! as a new Unix session (`setsid`) so that SIGKILL/SIGTERM sent to the original
|
||||
//! bash-tool process group does not reach it.
|
||||
//!
|
||||
//! Flow:
|
||||
//! 1. Gateway writes a [`TrampolineJob`] atomically and spawns the trampoline.
|
||||
//! 2. Trampoline backs up the old binary, kills the gateway, starts the new binary.
|
||||
//! 3. If the new binary passes a health-poll within 10 s → exit 0.
|
||||
//! 4. If it fails → restore backup, start it with `HUSKIES_TRAMPOLINE_FAILURE` set.
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::time::Duration;
|
||||
|
||||
// ── Job descriptor ────────────────────────────────────────────────────────────
|
||||
|
||||
/// Descriptor atomically written by the gateway before it hands control to the trampoline.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct TrampolineJob {
|
||||
/// PID of the currently running gateway process to kill.
|
||||
pub gateway_pid: u32,
|
||||
/// Absolute path to the newly compiled binary to launch.
|
||||
pub new_binary_path: PathBuf,
|
||||
/// Absolute path of the binary currently running as the gateway (for rollback).
|
||||
pub old_binary_path: PathBuf,
|
||||
/// Where to write the backup of the old binary before killing the gateway.
|
||||
pub backup_binary_path: PathBuf,
|
||||
/// Arguments forwarded verbatim to the new/backup gateway (everything after argv[0]).
|
||||
pub gateway_args: Vec<String>,
|
||||
/// HTTP URL the trampoline polls to verify the new gateway is serving.
|
||||
/// Empty string means skip health polling (used in tests).
|
||||
pub health_url: String,
|
||||
}
|
||||
|
||||
// ── Atomic write ──────────────────────────────────────────────────────────────
|
||||
|
||||
/// Write `job` to `path` atomically: write to a sibling `.tmp` file, then rename.
|
||||
///
|
||||
/// The rename is atomic on POSIX so the trampoline never reads a half-written file.
|
||||
pub fn write_job_atomic(job: &TrampolineJob, path: &Path) -> Result<(), String> {
|
||||
let tmp = path.with_extension("tmp");
|
||||
let data = serde_json::to_vec(job).map_err(|e| format!("JSON encode failed: {e}"))?;
|
||||
std::fs::write(&tmp, &data).map_err(|e| format!("tmp write failed: {e}"))?;
|
||||
std::fs::rename(&tmp, path).map_err(|e| format!("rename failed: {e}"))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ── Spawn detached ────────────────────────────────────────────────────────────
|
||||
|
||||
/// Spawn `exe --trampoline <job_path>` as a fully detached process.
|
||||
///
|
||||
/// On Unix the child calls `setsid()` in `pre_exec` so it belongs to a new session
|
||||
/// and is unreachable by signals sent to the original process group. stdin/stdout/
|
||||
/// stderr are all redirected to `/dev/null` so the child is fully daemonised.
|
||||
pub fn spawn_detached_trampoline(exe: &Path, job_path: &Path) -> Result<(), String> {
|
||||
let mut cmd = std::process::Command::new(exe);
|
||||
cmd.arg("--trampoline").arg(job_path);
|
||||
cmd.stdin(std::process::Stdio::null());
|
||||
cmd.stdout(std::process::Stdio::null());
|
||||
cmd.stderr(std::process::Stdio::null());
|
||||
|
||||
#[cfg(unix)]
|
||||
{
|
||||
use std::os::unix::process::CommandExt;
|
||||
// SAFETY: setsid() is async-signal-safe. This is called in the child
|
||||
// between fork and exec with no other threads running in the child's
|
||||
// address space — the only safe window for pre_exec hooks.
|
||||
unsafe {
|
||||
cmd.pre_exec(|| {
|
||||
if libc::setsid() == -1 {
|
||||
return Err(std::io::Error::last_os_error());
|
||||
}
|
||||
Ok(())
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
cmd.spawn().map_err(|e| format!("spawn failed: {e}"))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ── Process management ────────────────────────────────────────────────────────
|
||||
|
||||
/// Send SIGTERM to `pid`, wait up to 3 s for it to exit, then SIGKILL.
|
||||
///
|
||||
/// After SIGKILL the process is unconditionally considered gone — SIGKILL cannot
|
||||
/// be ignored, so the process is dead even if it briefly lingers as a zombie
|
||||
/// (zombie detection via `kill(pid, 0)` is unreliable from a non-parent process).
|
||||
#[cfg(unix)]
|
||||
fn kill_gateway_process(pid: u32) -> Result<(), String> {
|
||||
use std::thread::sleep;
|
||||
|
||||
let ipid = pid as libc::pid_t;
|
||||
|
||||
// Safety: kill() is always safe to call with any pid.
|
||||
let running = || unsafe { libc::kill(ipid, 0) } == 0;
|
||||
|
||||
if !running() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
unsafe { libc::kill(ipid, libc::SIGTERM) };
|
||||
|
||||
for _ in 0..30 {
|
||||
sleep(Duration::from_millis(100));
|
||||
if !running() {
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
// SIGKILL cannot be ignored — the kernel will terminate the process.
|
||||
// We don't loop-poll after this: the process may briefly appear as a
|
||||
// zombie (still in the table, not yet reaped by its parent), in which
|
||||
// case kill(pid, 0) returns 0 even though it is effectively dead.
|
||||
unsafe { libc::kill(ipid, libc::SIGKILL) };
|
||||
sleep(Duration::from_millis(200));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(not(unix))]
|
||||
fn kill_gateway_process(pid: u32) -> Result<(), String> {
|
||||
Err(format!("kill not supported on this platform (pid {pid})"))
|
||||
}
|
||||
|
||||
// ── Health polling ────────────────────────────────────────────────────────────
|
||||
|
||||
/// Poll `url` every 500 ms until it returns HTTP 2xx or `timeout` elapses.
|
||||
async fn poll_health(url: &str, timeout: Duration) -> Result<(), String> {
|
||||
let client = reqwest::Client::builder()
|
||||
.timeout(Duration::from_secs(2))
|
||||
.build()
|
||||
.unwrap_or_else(|_| reqwest::Client::new());
|
||||
|
||||
let deadline = std::time::Instant::now() + timeout;
|
||||
while std::time::Instant::now() < deadline {
|
||||
if let Ok(resp) = client.get(url).send().await
|
||||
&& resp.status().is_success()
|
||||
{
|
||||
return Ok(());
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(500)).await;
|
||||
}
|
||||
Err(format!(
|
||||
"health check timed out after {}s: {url}",
|
||||
timeout.as_secs()
|
||||
))
|
||||
}
|
||||
|
||||
// ── Core logic (testable) ─────────────────────────────────────────────────────
|
||||
|
||||
/// Kill the old gateway, start the new one, and poll its health endpoint.
|
||||
///
|
||||
/// Returns `Ok(())` on success or `Err(reason)` when the new gateway could not
|
||||
/// be started or failed health checks. Callers are responsible for rollback.
|
||||
///
|
||||
/// When `job.health_url` is empty the health poll is skipped (for unit tests).
|
||||
pub async fn execute_trampoline_core(job: &TrampolineJob) -> Result<(), String> {
|
||||
// Back up old binary (best-effort — rollback won't work if this fails).
|
||||
if let Some(parent) = job.backup_binary_path.parent() {
|
||||
let _ = std::fs::create_dir_all(parent);
|
||||
}
|
||||
let _ = std::fs::copy(&job.old_binary_path, &job.backup_binary_path);
|
||||
|
||||
// Kill old gateway.
|
||||
kill_gateway_process(job.gateway_pid)?;
|
||||
|
||||
// Start new gateway.
|
||||
std::process::Command::new(&job.new_binary_path)
|
||||
.args(&job.gateway_args)
|
||||
.env("HUSKIES_TRAMPOLINE_STARTED", "1")
|
||||
.stdin(std::process::Stdio::null())
|
||||
.stdout(std::process::Stdio::null())
|
||||
.stderr(std::process::Stdio::null())
|
||||
.spawn()
|
||||
.map_err(|e| format!("failed to start new gateway: {e}"))?;
|
||||
|
||||
// Poll health (skip when URL is empty — used in tests).
|
||||
if !job.health_url.is_empty() {
|
||||
poll_health(&job.health_url, Duration::from_secs(10)).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ── Entry point ───────────────────────────────────────────────────────────────
|
||||
|
||||
/// Run the trampoline from a job file. This function never returns.
|
||||
///
|
||||
/// On success exits 0 (new gateway is up and will post its own "ready" message).
|
||||
/// On failure starts the backup binary with `HUSKIES_TRAMPOLINE_FAILURE` set and
|
||||
/// exits 1. On unrecoverable failure (cannot start backup either) exits 2.
|
||||
pub async fn run_trampoline(job_path: &Path) -> ! {
|
||||
let data = match std::fs::read(job_path) {
|
||||
Ok(d) => d,
|
||||
Err(e) => {
|
||||
eprintln!(
|
||||
"[trampoline] cannot read job file {}: {e}",
|
||||
job_path.display()
|
||||
);
|
||||
std::process::exit(1);
|
||||
}
|
||||
};
|
||||
|
||||
let job: TrampolineJob = match serde_json::from_slice(&data) {
|
||||
Ok(j) => j,
|
||||
Err(e) => {
|
||||
eprintln!("[trampoline] cannot parse job file: {e}");
|
||||
std::process::exit(1);
|
||||
}
|
||||
};
|
||||
|
||||
eprintln!(
|
||||
"[trampoline] killing gateway PID {} and starting {}",
|
||||
job.gateway_pid,
|
||||
job.new_binary_path.display()
|
||||
);
|
||||
|
||||
match execute_trampoline_core(&job).await {
|
||||
Ok(()) => {
|
||||
eprintln!("[trampoline] new gateway is up — exiting");
|
||||
let _ = std::fs::remove_file(job_path);
|
||||
std::process::exit(0);
|
||||
}
|
||||
Err(reason) => {
|
||||
eprintln!("[trampoline] new gateway failed ({reason}) — rolling back");
|
||||
let result = std::process::Command::new(&job.backup_binary_path)
|
||||
.args(&job.gateway_args)
|
||||
.env("HUSKIES_TRAMPOLINE_FAILURE", &reason)
|
||||
.stdin(std::process::Stdio::null())
|
||||
.stdout(std::process::Stdio::null())
|
||||
.stderr(std::process::Stdio::null())
|
||||
.spawn();
|
||||
match result {
|
||||
Ok(_) => {
|
||||
let _ = std::fs::remove_file(job_path);
|
||||
std::process::exit(1);
|
||||
}
|
||||
Err(e) => {
|
||||
eprintln!("[trampoline] FATAL: cannot start backup gateway: {e}");
|
||||
std::process::exit(2);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ── Tests ─────────────────────────────────────────────────────────────────────
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// Find the `sleep` executable, which stands in for a gateway process.
    fn find_sleep() -> PathBuf {
        ["/usr/bin/sleep", "/bin/sleep"]
            .into_iter()
            .map(PathBuf::from)
            .find(|candidate| candidate.exists())
            .expect("sleep binary not found")
    }

    /// The job file is written via a temp sibling and reads back unchanged.
    #[test]
    fn write_job_atomic_round_trips() {
        let dir = tempfile::tempdir().unwrap();
        let job_file = dir.path().join("trampoline.json");
        let original = TrampolineJob {
            gateway_pid: 12345,
            new_binary_path: PathBuf::from("/new/huskies"),
            old_binary_path: PathBuf::from("/old/huskies"),
            backup_binary_path: dir.path().join("backup"),
            gateway_args: vec!["--gateway".to_string(), "/workspace".to_string()],
            health_url: "http://127.0.0.1:3000/api/gateway".to_string(),
        };

        write_job_atomic(&original, &job_file).unwrap();

        // The temporary sibling must be gone and the final file present.
        assert!(!job_file.with_extension("tmp").exists());
        assert!(job_file.exists());

        // Deserialise what was written and compare the fields.
        let bytes = std::fs::read(&job_file).unwrap();
        let reloaded: TrampolineJob = serde_json::from_slice(&bytes).unwrap();
        assert_eq!(reloaded.gateway_pid, original.gateway_pid);
        assert_eq!(reloaded.new_binary_path, original.new_binary_path);
        assert_eq!(reloaded.gateway_args, original.gateway_args);
    }

    /// AC 5: a fake-gateway `sleep` process is killed and replaced within timeout.
    #[tokio::test]
    async fn fake_gateway_killed_and_replaced_within_timeout() {
        let sleep_exe = find_sleep();
        let dir = tempfile::tempdir().unwrap();

        // A long-lived `sleep` plays the role of the running gateway.
        let mut fake_gw = std::process::Command::new(&sleep_exe)
            .arg("60")
            .stdin(std::process::Stdio::null())
            .spawn()
            .expect("spawn fake gateway");

        let job = TrampolineJob {
            gateway_pid: fake_gw.id(),
            new_binary_path: sleep_exe.clone(),
            old_binary_path: sleep_exe.clone(),
            backup_binary_path: dir.path().join("backup"),
            gateway_args: vec!["1".to_string()],
            // An empty URL skips the health check.
            health_url: String::new(),
        };

        let started = std::time::Instant::now();
        let result = execute_trampoline_core(&job).await;
        let elapsed = started.elapsed();

        assert!(result.is_ok(), "trampoline core should succeed: {result:?}");
        assert!(
            elapsed < Duration::from_secs(10),
            "should complete well within 10s timeout, took {elapsed:?}"
        );

        // Reaping must succeed immediately: the fake gateway should be dead.
        let status = fake_gw.try_wait().expect("try_wait");
        assert!(
            status.is_some(),
            "fake gateway process should be dead after trampoline kill"
        );
    }
}
|
||||
Reference in New Issue
Block a user