From 47173e0d3a59f154415a79239dd322fdf33251f4 Mon Sep 17 00:00:00 2001 From: dave Date: Sun, 22 Mar 2026 19:08:41 +0000 Subject: [PATCH] storkit: merge 366_story_bot_sends_shutdown_message_on_server_stop_or_rebuild --- server/src/http/context.rs | 17 ++ server/src/http/mcp/diagnostics.rs | 12 +- server/src/main.rs | 91 +++++++++-- server/src/matrix/bot.rs | 25 +++ server/src/matrix/mod.rs | 11 +- server/src/matrix/rebuild.rs | 2 +- server/src/rebuild.rs | 252 ++++++++++++++++++++++++++++- 7 files changed, 390 insertions(+), 20 deletions(-) diff --git a/server/src/http/context.rs b/server/src/http/context.rs index c0150c2..08f796f 100644 --- a/server/src/http/context.rs +++ b/server/src/http/context.rs @@ -1,5 +1,6 @@ use crate::agents::{AgentPool, ReconciliationEvent}; use crate::io::watcher::WatcherEvent; +use crate::rebuild::{BotShutdownNotifier, ShutdownReason}; use crate::state::SessionState; use crate::store::JsonFileStore; use crate::workflow::WorkflowState; @@ -52,6 +53,20 @@ pub struct AppContext { /// Child process of the QA app launched for manual testing. /// Only one instance runs at a time. pub qa_app_process: Arc>>, + /// Best-effort shutdown notifier for active bot channels (Slack / WhatsApp). + /// + /// When set, the MCP `rebuild_and_restart` tool uses this to announce the + /// shutdown to configured channels before re-execing the server binary. + /// `None` when no webhook-based bot transport is configured. + pub bot_shutdown: Option>, + /// Watch sender used to signal the Matrix bot task that the server is + /// shutting down (rebuild path). The bot task listens for this signal and + /// sends a shutdown announcement to all configured rooms. + /// + /// Wrapped in `Arc` so `AppContext` can implement `Clone`. + /// `None` when no Matrix bot is configured. + pub matrix_shutdown_tx: + Option>>>, } #[cfg(test)] @@ -73,6 +88,8 @@ impl AppContext { perm_tx, perm_rx: Arc::new(tokio::sync::Mutex::new(perm_rx)), qa_app_process: Arc::new(std::sync::Mutex::new(None)), + bot_shutdown: None, + matrix_shutdown_tx: None, } } } diff --git a/server/src/http/mcp/diagnostics.rs b/server/src/http/mcp/diagnostics.rs index b173fff..6c87d93 100644 --- a/server/src/http/mcp/diagnostics.rs +++ b/server/src/http/mcp/diagnostics.rs @@ -29,8 +29,18 @@ pub(super) fn tool_get_server_logs(args: &Value) -> Result { /// Rebuild the server binary and re-exec (delegates to `crate::rebuild`). pub(super) async fn tool_rebuild_and_restart(ctx: &AppContext) -> Result { slog!("[rebuild] Rebuild and restart requested via MCP tool"); + + // Signal the Matrix bot (if active) so it can send its own shutdown + // announcement before the process is replaced. Best-effort: we wait up + // to 1.5 s for the message to be delivered. + if let Some(ref tx) = ctx.matrix_shutdown_tx { + let _ = tx.send(Some(crate::rebuild::ShutdownReason::Rebuild)); + tokio::time::sleep(std::time::Duration::from_millis(1500)).await; + } + let project_root = ctx.state.get_project_root().unwrap_or_default(); - crate::rebuild::rebuild_and_restart(&ctx.agents, &project_root).await + let notifier = ctx.bot_shutdown.as_deref(); + crate::rebuild::rebuild_and_restart(&ctx.agents, &project_root, notifier).await } /// Generate a Claude Code permission rule string for the given tool name and input. diff --git a/server/src/main.rs b/server/src/main.rs index 20543ba..dbd2fd4 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -24,6 +24,7 @@ use crate::http::build_routes; use crate::http::context::AppContext; use crate::http::{remove_port_file, resolve_port, write_port_file}; use crate::io::fs::find_story_kit_root; +use crate::rebuild::{BotShutdownNotifier, ShutdownReason}; use crate::state::SessionState; use crate::store::JsonFileStore; use crate::workflow::WorkflowState; @@ -177,17 +178,6 @@ async fn main() -> Result<(), std::io::Error> { let startup_reconciliation_tx = reconciliation_tx.clone(); // Clone for shutdown cleanup — kill orphaned PTY children before exiting. let agents_for_shutdown = Arc::clone(&agents); - let ctx = AppContext { - state: app_state, - store, - workflow, - agents, - watcher_tx, - reconciliation_tx, - perm_tx, - perm_rx, - qa_app_process: Arc::new(std::sync::Mutex::new(None)), - }; // Build WhatsApp webhook context if bot.toml configures transport = "whatsapp". let whatsapp_ctx: Option> = startup_root @@ -255,7 +245,50 @@ async fn main() -> Result<(), std::io::Error> { }) }); - let app = build_routes(ctx, whatsapp_ctx, slack_ctx); + // Build a best-effort shutdown notifier for webhook-based transports. + // + // • Slack: channels are fixed at startup (channel_ids from bot.toml). + // • WhatsApp: active senders are tracked at runtime in ambient_rooms. + // We keep the WhatsApp context Arc so we can read the rooms at shutdown. + // • Matrix: the bot task manages its own announcement via matrix_shutdown_tx. + let bot_shutdown_notifier: Option> = + if let Some(ref ctx) = slack_ctx { + let channels: Vec = ctx.channel_ids.iter().cloned().collect(); + Some(Arc::new(BotShutdownNotifier::new( + Arc::clone(&ctx.transport) as Arc, + channels, + ctx.bot_name.clone(), + ))) + } else { + None + }; + // Retain a reference to the WhatsApp context for shutdown notifications. + // At shutdown time we read ambient_rooms to get the current set of active senders. + let whatsapp_ctx_for_shutdown: Option> = + whatsapp_ctx.clone(); + + // Watch channel: signals the Matrix bot task to send a shutdown announcement. + // `None` initial value means "server is running". + let (matrix_shutdown_tx, matrix_shutdown_rx) = + tokio::sync::watch::channel::>(None); + let matrix_shutdown_tx = Arc::new(matrix_shutdown_tx); + let matrix_shutdown_tx_for_rebuild = Arc::clone(&matrix_shutdown_tx); + + let ctx = AppContext { + state: app_state, + store, + workflow, + agents, + watcher_tx, + reconciliation_tx, + perm_tx, + perm_rx, + qa_app_process: Arc::new(std::sync::Mutex::new(None)), + bot_shutdown: bot_shutdown_notifier.clone(), + matrix_shutdown_tx: Some(Arc::clone(&matrix_shutdown_tx)), + }; + + let app = build_routes(ctx, whatsapp_ctx.clone(), slack_ctx.clone()); // Optional Matrix bot: connect to the homeserver and start listening for // messages if `.storkit/bot.toml` is present and enabled. @@ -265,7 +298,11 @@ async fn main() -> Result<(), std::io::Error> { watcher_tx_for_bot, perm_rx_for_bot, Arc::clone(&startup_agents), + matrix_shutdown_rx, ); + } else { + // Keep the receiver alive (drop it) so the sender never errors. + drop(matrix_shutdown_rx); } // On startup: @@ -295,6 +332,36 @@ async fn main() -> Result<(), std::io::Error> { let result = Server::new(TcpListener::bind(&addr)).run(app).await; + // ── Shutdown notifications (best-effort) ───────────────────────────── + // + // The server is stopping (SIGINT / SIGTERM). Notify active bot channels + // so participants know the bot is going offline. We do this before killing + // PTY children so network I/O can still complete. + + // Slack: notifier holds the fixed channel list. + if let Some(ref notifier) = bot_shutdown_notifier { + notifier.notify(ShutdownReason::Manual).await; + } + + // WhatsApp: read the current set of ambient rooms and notify each sender. + if let Some(ref ctx) = whatsapp_ctx_for_shutdown { + let rooms: Vec = ctx.ambient_rooms.lock().unwrap().iter().cloned().collect(); + if !rooms.is_empty() { + let wa_notifier = BotShutdownNotifier::new( + Arc::clone(&ctx.transport) as Arc, + rooms, + ctx.bot_name.clone(), + ); + wa_notifier.notify(ShutdownReason::Manual).await; + } + } + + // Matrix: signal the bot task and give it a short window to send its message. + let _ = matrix_shutdown_tx_for_rebuild.send(Some(ShutdownReason::Manual)); + tokio::time::sleep(std::time::Duration::from_millis(1500)).await; + + // ── Cleanup ────────────────────────────────────────────────────────── + // Kill all active PTY child processes before exiting to prevent orphaned // Claude Code processes from running after the server restarts. agents_for_shutdown.kill_all_children(); diff --git a/server/src/matrix/bot.rs b/server/src/matrix/bot.rs index 877181c..0222d1b 100644 --- a/server/src/matrix/bot.rs +++ b/server/src/matrix/bot.rs @@ -213,6 +213,7 @@ pub async fn run_bot( watcher_rx: tokio::sync::broadcast::Receiver, perm_rx: Arc>>, agents: Arc, + shutdown_rx: tokio::sync::watch::Receiver>, ) -> Result<(), String> { let store_path = project_root.join(".storkit").join("matrix_store"); let client = Client::builder() @@ -426,6 +427,30 @@ pub async fn run_bot( notif_project_root, ); + // Spawn a shutdown watcher that sends a best-effort goodbye message to all + // configured rooms when the server is about to stop (SIGINT/SIGTERM or rebuild). + { + let shutdown_transport = Arc::clone(&transport); + let shutdown_rooms: Vec = + announce_room_ids.iter().map(|r| r.to_string()).collect(); + let shutdown_bot_name = announce_bot_name.clone(); + let mut rx = shutdown_rx; + tokio::spawn(async move { + // Wait until the channel holds Some(reason). + if rx.wait_for(|v| v.is_some()).await.is_ok() { + let reason = rx.borrow().clone(); + let notifier = crate::rebuild::BotShutdownNotifier::new( + shutdown_transport, + shutdown_rooms, + shutdown_bot_name, + ); + if let Some(r) = reason { + notifier.notify(r).await; + } + } + }); + } + // Send a startup announcement to each configured room so users know the // bot is online. This runs once per process start — the sync loop handles // reconnects internally so this code is never reached again on a network diff --git a/server/src/matrix/mod.rs b/server/src/matrix/mod.rs index a60439f..377c4ea 100644 --- a/server/src/matrix/mod.rs +++ b/server/src/matrix/mod.rs @@ -32,9 +32,10 @@ pub use config::BotConfig; use crate::agents::AgentPool; use crate::http::context::PermissionForward; use crate::io::watcher::WatcherEvent; +use crate::rebuild::ShutdownReason; use std::path::Path; use std::sync::Arc; -use tokio::sync::{Mutex as TokioMutex, broadcast, mpsc}; +use tokio::sync::{Mutex as TokioMutex, broadcast, mpsc, watch}; /// Attempt to start the Matrix bot. /// @@ -50,12 +51,17 @@ use tokio::sync::{Mutex as TokioMutex, broadcast, mpsc}; /// `prompt_permission` tool. The bot locks it during active chat sessions /// to surface permission prompts to the Matrix room and relay user decisions. /// +/// `shutdown_rx` is a watch channel that delivers a `ShutdownReason` when the +/// server is about to stop (SIGINT/SIGTERM or rebuild). The bot uses this to +/// announce the shutdown to all configured rooms before the process exits. +/// /// Must be called from within a Tokio runtime context (e.g., from `main`). pub fn spawn_bot( project_root: &Path, watcher_tx: broadcast::Sender, perm_rx: Arc>>, agents: Arc, + shutdown_rx: watch::Receiver>, ) { let config = match BotConfig::load(project_root) { Some(c) => c, @@ -83,7 +89,8 @@ pub fn spawn_bot( let root = project_root.to_path_buf(); let watcher_rx = watcher_tx.subscribe(); tokio::spawn(async move { - if let Err(e) = bot::run_bot(config, root, watcher_rx, perm_rx, agents).await { + if let Err(e) = bot::run_bot(config, root, watcher_rx, perm_rx, agents, shutdown_rx).await + { crate::slog!("[matrix-bot] Fatal error: {e}"); } }); diff --git a/server/src/matrix/rebuild.rs b/server/src/matrix/rebuild.rs index 278750e..9da471d 100644 --- a/server/src/matrix/rebuild.rs +++ b/server/src/matrix/rebuild.rs @@ -50,7 +50,7 @@ pub async fn handle_rebuild( agents: &Arc, ) -> String { crate::slog!("[matrix-bot] rebuild command received (bot={bot_name})"); - match crate::rebuild::rebuild_and_restart(agents, project_root).await { + match crate::rebuild::rebuild_and_restart(agents, project_root, None).await { Ok(msg) => msg, Err(e) => format!("Rebuild failed: {e}"), } diff --git a/server/src/rebuild.rs b/server/src/rebuild.rs index 374ae0d..596267b 100644 --- a/server/src/rebuild.rs +++ b/server/src/rebuild.rs @@ -2,7 +2,72 @@ use crate::agents::AgentPool; use crate::slog; +use crate::transport::ChatTransport; use std::path::Path; +use std::sync::Arc; + +// ── Shutdown notification ──────────────────────────────────────────────── + +/// The reason the server is shutting down. +/// +/// Used to select the appropriate shutdown message sent to active bot channels. +#[derive(Clone, Debug, PartialEq)] +pub enum ShutdownReason { + /// The operator stopped the server manually (SIGINT / SIGTERM / ctrl-c). + Manual, + /// A rebuild-and-restart was requested (via MCP tool or bot command). + Rebuild, +} + +/// Sends a shutdown announcement to all configured bot channels. +/// +/// Wraps a [`ChatTransport`] together with the list of channel/room IDs the +/// bot is active in. Calling [`notify`] is best-effort — failures are logged +/// but never propagate, so shutdown is never blocked by a failed send. +pub struct BotShutdownNotifier { + transport: Arc, + channels: Vec, + bot_name: String, +} + +impl BotShutdownNotifier { + pub fn new( + transport: Arc, + channels: Vec, + bot_name: String, + ) -> Self { + Self { + transport, + channels, + bot_name, + } + } + + /// Send a shutdown message to all configured channels. + /// + /// Errors from individual sends are logged and ignored so that a single + /// failing channel does not prevent messages from reaching the rest. + pub async fn notify(&self, reason: ShutdownReason) { + let msg = match reason { + ShutdownReason::Manual => { + format!("{} is going offline (server stopped).", self.bot_name) + } + ShutdownReason::Rebuild => { + format!( + "{} is going offline to pick up a new build.", + self.bot_name + ) + } + }; + for channel in &self.channels { + if let Err(e) = self.transport.send_message(channel, &msg, &msg).await { + slog!("[shutdown] Failed to send shutdown message to {channel}: {e}"); + } + } + } +} + +// ── Rebuild ────────────────────────────────────────────────────────────── /// Rebuild the server binary and re-exec. /// @@ -10,9 +75,14 @@ use std::path::Path; /// 2. Runs `cargo build [-p storkit]` from the workspace root, matching /// the current build profile (debug or release). /// 3. If the build fails, returns the build error (server stays up). -/// 4. If the build succeeds, re-execs the process with the new binary via -/// `std::os::unix::process::CommandExt::exec()`. -pub async fn rebuild_and_restart(agents: &AgentPool, project_root: &Path) -> Result { +/// 4. If the build succeeds, sends a best-effort shutdown notification (if a +/// [`BotShutdownNotifier`] is provided), then re-execs the process with +/// the new binary via `std::os::unix::process::CommandExt::exec()`. +pub async fn rebuild_and_restart( + agents: &AgentPool, + project_root: &Path, + notifier: Option<&BotShutdownNotifier>, +) -> Result { slog!("[rebuild] Rebuild and restart requested"); // 1. Gracefully stop all running agents. @@ -69,7 +139,14 @@ pub async fn rebuild_and_restart(agents: &AgentPool, project_root: &Path) -> Res slog!("[rebuild] Build succeeded, re-execing with new binary"); - // 4. Re-exec with the new binary. + // 4. Send shutdown notification before replacing the process so that chat + // participants know the bot is going offline. Best-effort only — we + // do not abort the rebuild if the send fails. + if let Some(n) = notifier { + n.notify(ShutdownReason::Rebuild).await; + } + + // 5. Re-exec with the new binary. // Use the cargo output path rather than current_exe() so that rebuilds // inside Docker work correctly — the running binary may be installed at // /usr/local/bin/storkit (read-only) while cargo writes the new binary @@ -102,3 +179,170 @@ pub async fn rebuild_and_restart(agents: &AgentPool, project_root: &Path) -> Res // If we get here, exec() failed. Err(format!("Failed to exec new binary: {err}")) } + +// ── Tests ──────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + use async_trait::async_trait; + use crate::transport::MessageId; + use std::sync::Mutex; + + /// In-memory transport that records sent messages. + struct CapturingTransport { + sent: Mutex>, + fail: bool, + } + + impl CapturingTransport { + fn new() -> Self { + Self { + sent: Mutex::new(Vec::new()), + fail: false, + } + } + + fn failing() -> Self { + Self { + sent: Mutex::new(Vec::new()), + fail: true, + } + } + + fn messages(&self) -> Vec<(String, String)> { + self.sent.lock().unwrap().clone() + } + } + + #[async_trait] + impl ChatTransport for CapturingTransport { + async fn send_message( + &self, + room_id: &str, + plain: &str, + _html: &str, + ) -> Result { + if self.fail { + return Err("send failed".to_string()); + } + self.sent + .lock() + .unwrap() + .push((room_id.to_string(), plain.to_string())); + Ok("msg-id".to_string()) + } + + async fn edit_message( + &self, + _room_id: &str, + _original_message_id: &str, + _plain: &str, + _html: &str, + ) -> Result<(), String> { + Ok(()) + } + + async fn send_typing(&self, _room_id: &str, _typing: bool) -> Result<(), String> { + Ok(()) + } + } + + #[tokio::test] + async fn notify_manual_sends_to_all_channels() { + let transport = Arc::new(CapturingTransport::new()); + let notifier = BotShutdownNotifier::new( + Arc::clone(&transport) as Arc, + vec!["#channel1".to_string(), "#channel2".to_string()], + "Timmy".to_string(), + ); + + notifier.notify(ShutdownReason::Manual).await; + + let msgs = transport.messages(); + assert_eq!(msgs.len(), 2); + assert_eq!(msgs[0].0, "#channel1"); + assert_eq!(msgs[1].0, "#channel2"); + // Message must indicate manual stop. + assert!( + msgs[0].1.contains("offline"), + "expected 'offline' in manual message: {}", + msgs[0].1 + ); + assert!( + msgs[0].1.contains("stopped") || msgs[0].1.contains("manual"), + "expected reason in manual message: {}", + msgs[0].1 + ); + } + + #[tokio::test] + async fn notify_rebuild_sends_rebuild_reason() { + let transport = Arc::new(CapturingTransport::new()); + let notifier = BotShutdownNotifier::new( + Arc::clone(&transport) as Arc, + vec!["#general".to_string()], + "Timmy".to_string(), + ); + + notifier.notify(ShutdownReason::Rebuild).await; + + let msgs = transport.messages(); + assert_eq!(msgs.len(), 1); + // Message must indicate rebuild, not manual stop. + assert!( + msgs[0].1.contains("build") || msgs[0].1.contains("rebuild"), + "expected rebuild reason in message: {}", + msgs[0].1 + ); + } + + #[tokio::test] + async fn notify_manual_and_rebuild_messages_are_distinct() { + let transport_a = Arc::new(CapturingTransport::new()); + let notifier_a = BotShutdownNotifier::new( + Arc::clone(&transport_a) as Arc, + vec!["C1".to_string()], + "Bot".to_string(), + ); + notifier_a.notify(ShutdownReason::Manual).await; + + let transport_b = Arc::new(CapturingTransport::new()); + let notifier_b = BotShutdownNotifier::new( + Arc::clone(&transport_b) as Arc, + vec!["C1".to_string()], + "Bot".to_string(), + ); + notifier_b.notify(ShutdownReason::Rebuild).await; + + let manual_msg = &transport_a.messages()[0].1; + let rebuild_msg = &transport_b.messages()[0].1; + assert_ne!(manual_msg, rebuild_msg, "manual and rebuild messages must differ"); + } + + #[tokio::test] + async fn notify_is_best_effort_failing_send_does_not_panic() { + // A transport that always fails should not cause notify() to panic or + // return an error — the failure is swallowed silently. + let transport = Arc::new(CapturingTransport::failing()); + let notifier = BotShutdownNotifier::new( + Arc::clone(&transport) as Arc, + vec!["#channel".to_string()], + "Timmy".to_string(), + ); + // Should complete without panicking. + notifier.notify(ShutdownReason::Manual).await; + } + + #[tokio::test] + async fn notify_with_no_channels_is_noop() { + let transport = Arc::new(CapturingTransport::new()); + let notifier = BotShutdownNotifier::new( + Arc::clone(&transport) as Arc, + vec![], + "Timmy".to_string(), + ); + notifier.notify(ShutdownReason::Manual).await; + assert!(transport.messages().is_empty()); + } +}