From 19b7edb60c495f027abe533e58701b4ac4a82a3d Mon Sep 17 00:00:00 2001 From: dave Date: Tue, 12 May 2026 16:30:55 +0000 Subject: [PATCH] huskies: merge 918 --- server/src/gateway/mod.rs | 22 +++++++++--- server/src/service/gateway/io.rs | 59 ++++++++++++++++++++++++++++--- server/src/service/gateway/mod.rs | 11 +++++- 3 files changed, 83 insertions(+), 9 deletions(-) diff --git a/server/src/gateway/mod.rs b/server/src/gateway/mod.rs index be00f6ef..20d77d45 100644 --- a/server/src/gateway/mod.rs +++ b/server/src/gateway/mod.rs @@ -5,6 +5,7 @@ //! This file contains only the `run` entrypoint and `build_gateway_route` wiring. use crate::http::gateway::*; +use crate::rebuild::ShutdownReason; use crate::service::gateway::{self, GatewayState}; use poem::EndpointExt; use std::path::Path; @@ -118,7 +119,7 @@ pub async fn run(config_path: &Path, port: u16) -> Result<(), std::io::Error> { .iter() .map(|(name, entry)| (name.clone(), entry.url.clone())) .collect(); - let bot_abort = gateway::io::spawn_gateway_bot( + let (bot_abort, bot_shutdown_tx) = gateway::io::spawn_gateway_bot( &config_dir, Arc::clone(&state_arc.active_project), gateway_projects, @@ -127,16 +128,29 @@ pub async fn run(config_path: &Path, port: u16) -> Result<(), std::io::Error> { Some(state_arc.event_tx.clone()), ); *state_arc.bot_handle.lock().await = bot_abort; + *state_arc.bot_shutdown_tx.lock().await = Some(bot_shutdown_tx); - let route = build_gateway_route(state_arc); + let route = build_gateway_route(state_arc.clone()); let host = std::env::var("HUSKIES_HOST").unwrap_or_else(|_| "127.0.0.1".to_string()); let addr = format!("{host}:{port}"); crate::slog!("[gateway] Listening on {addr}"); - poem::Server::new(poem::listener::TcpListener::bind(&addr)) + let result = poem::Server::new(poem::listener::TcpListener::bind(&addr)) .run(route) - .await + .await; + + // Best-effort shutdown notification: signal the Matrix bot so it can post + // "going offline" before the process exits. Mirror of main.rs:346. + { + let guard = state_arc.bot_shutdown_tx.lock().await; + if let Some(tx) = guard.as_ref() { + let _ = tx.send(Some(ShutdownReason::Manual)); + } + } + tokio::time::sleep(std::time::Duration::from_millis(1500)).await; + + result } #[cfg(test)] diff --git a/server/src/service/gateway/io.rs b/server/src/service/gateway/io.rs index ff5aa81b..f7a39d57 100644 --- a/server/src/service/gateway/io.rs +++ b/server/src/service/gateway/io.rs @@ -431,6 +431,10 @@ pub type ActiveProject = std::sync::Arc>; /// `gateway_event_tx` — when `Some`, the bot will subscribe to the gateway /// status broadcaster and forward [`super::GatewayStatusEvent`]s to its /// configured Matrix rooms with a `[project-name]` prefix. +/// +/// Returns `(abort_handle, shutdown_tx)`. The caller **must** hold +/// `shutdown_tx` for the bot's lifetime and send `Some(ShutdownReason)` on it +/// before process exit so the bot can announce "going offline" to its rooms. pub fn spawn_gateway_bot( config_dir: &Path, active_project: ActiveProject, @@ -438,7 +442,10 @@ pub fn spawn_gateway_bot( gateway_project_urls: BTreeMap, port: u16, gateway_event_tx: Option>, -) -> Option { +) -> ( + Option, + tokio::sync::watch::Sender>, +) { use crate::agents::AgentPool; use crate::services::Services; use tokio::sync::{broadcast, mpsc}; @@ -458,7 +465,8 @@ pub fn spawn_gateway_bot( let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel::>(None); - std::mem::forget(shutdown_tx); + // shutdown_tx is intentionally NOT forgotten — the caller holds it and + // sends Some(reason) on gateway shutdown so the bot announces "going offline". let agents = std::sync::Arc::new(AgentPool::new(port, watcher_tx.clone())); @@ -498,7 +506,7 @@ pub fn spawn_gateway_bot( config_dir.join(".huskies").join("timers.json"), )); let gateway_event_rx = gateway_event_tx.map(|tx| tx.subscribe()); - crate::chat::transport::matrix::spawn_bot( + let handle = crate::chat::transport::matrix::spawn_bot( config_dir, watcher_tx, services, @@ -508,5 +516,48 @@ pub fn spawn_gateway_bot( gateway_project_urls, timer_store, gateway_event_rx, - ) + ); + (handle, shutdown_tx) +} + +// ── Tests ──────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + + /// Regression test for story 918: `spawn_gateway_bot` must return a live + /// `shutdown_tx` (not one dropped via `std::mem::forget`) so that callers + /// can signal `Some(reason)` and the bot's shutdown watcher receives it. + #[tokio::test] + async fn spawn_gateway_bot_shutdown_tx_not_forgotten() { + let tmp = tempfile::tempdir().unwrap(); + let active = std::sync::Arc::new(tokio::sync::RwLock::new("proj".to_string())); + let (event_tx, _) = tokio::sync::broadcast::channel(4); + + let (handle, shutdown_tx) = spawn_gateway_bot( + tmp.path(), + active, + vec!["proj".to_string()], + std::collections::BTreeMap::new(), + 3001, + Some(event_tx), + ); + + // No bot.toml in tmp → no abort handle spawned. + assert!(handle.is_none()); + + // The shutdown_tx must be live: subscribe a receiver and verify that + // sending Some(reason) is observed — this would fail if the sender + // had been forgotten (channel closed, send returns Err). + let rx = shutdown_tx.subscribe(); + shutdown_tx + .send(Some(crate::rebuild::ShutdownReason::Manual)) + .expect("shutdown_tx must not be closed (was previously std::mem::forget'd)"); + assert_eq!( + *rx.borrow(), + Some(crate::rebuild::ShutdownReason::Manual), + "shutdown receiver must see the Manual reason" + ); + } } diff --git a/server/src/service/gateway/mod.rs b/server/src/service/gateway/mod.rs index 1c3b3b7e..e1f66156 100644 --- a/server/src/service/gateway/mod.rs +++ b/server/src/service/gateway/mod.rs @@ -22,6 +22,7 @@ pub use io::{ spawn_gateway_notification_poller, }; +use crate::rebuild::ShutdownReason; use io::Client; use std::collections::{BTreeMap, HashMap}; use std::path::PathBuf; @@ -111,6 +112,12 @@ pub struct GatewayState { pub port: u16, /// Abort handle for the running Matrix bot task (if any). pub bot_handle: Arc>>, + /// Watch sender used to signal the Matrix bot to shut down gracefully. + /// + /// Send `Some(ShutdownReason)` on this channel before process exit so the + /// bot task receives the signal and posts an "going offline" announcement. + pub bot_shutdown_tx: + Arc>>>>, /// Broadcast sender for [`GatewayStatusEvent`]s pushed by project nodes. /// /// Call `event_tx.subscribe()` to obtain a receiver for outbound fan-out. @@ -142,6 +149,7 @@ impl GatewayState { config_dir, port, bot_handle: Arc::new(TokioMutex::new(None)), + bot_shutdown_tx: Arc::new(TokioMutex::new(None)), event_tx, }) } @@ -462,7 +470,7 @@ pub async fn save_bot_config_and_restart(state: &GatewayState, content: &str) -> .map(|(name, entry)| (name.clone(), entry.url.clone())) .collect(); - let new_handle = io::spawn_gateway_bot( + let (new_handle, new_shutdown_tx) = io::spawn_gateway_bot( &state.config_dir, Arc::clone(&state.active_project), gateway_projects, @@ -471,6 +479,7 @@ pub async fn save_bot_config_and_restart(state: &GatewayState, content: &str) -> Some(state.event_tx.clone()), ); *handle = new_handle; + *state.bot_shutdown_tx.lock().await = Some(new_shutdown_tx); } crate::slog!("[gateway] Bot configuration saved; bot restarted");