storkit: merge 366_story_bot_sends_shutdown_message_on_server_stop_or_rebuild

This commit is contained in:
dave
2026-03-22 19:08:41 +00:00
parent f610ef6046
commit 47173e0d3a
7 changed files with 390 additions and 20 deletions

View File

@@ -1,5 +1,6 @@
use crate::agents::{AgentPool, ReconciliationEvent}; use crate::agents::{AgentPool, ReconciliationEvent};
use crate::io::watcher::WatcherEvent; use crate::io::watcher::WatcherEvent;
use crate::rebuild::{BotShutdownNotifier, ShutdownReason};
use crate::state::SessionState; use crate::state::SessionState;
use crate::store::JsonFileStore; use crate::store::JsonFileStore;
use crate::workflow::WorkflowState; use crate::workflow::WorkflowState;
@@ -52,6 +53,20 @@ pub struct AppContext {
/// Child process of the QA app launched for manual testing. /// Child process of the QA app launched for manual testing.
/// Only one instance runs at a time. /// Only one instance runs at a time.
pub qa_app_process: Arc<std::sync::Mutex<Option<std::process::Child>>>, pub qa_app_process: Arc<std::sync::Mutex<Option<std::process::Child>>>,
/// Best-effort shutdown notifier for active bot channels (Slack / WhatsApp).
///
/// When set, the MCP `rebuild_and_restart` tool uses this to announce the
/// shutdown to configured channels before re-execing the server binary.
/// `None` when no webhook-based bot transport is configured.
pub bot_shutdown: Option<Arc<BotShutdownNotifier>>,
/// Watch sender used to signal the Matrix bot task that the server is
/// shutting down (rebuild path). The bot task listens for this signal and
/// sends a shutdown announcement to all configured rooms.
///
/// Wrapped in `Arc` so `AppContext` can implement `Clone`.
/// `None` when no Matrix bot is configured.
pub matrix_shutdown_tx:
Option<Arc<tokio::sync::watch::Sender<Option<ShutdownReason>>>>,
} }
#[cfg(test)] #[cfg(test)]
@@ -73,6 +88,8 @@ impl AppContext {
perm_tx, perm_tx,
perm_rx: Arc::new(tokio::sync::Mutex::new(perm_rx)), perm_rx: Arc::new(tokio::sync::Mutex::new(perm_rx)),
qa_app_process: Arc::new(std::sync::Mutex::new(None)), qa_app_process: Arc::new(std::sync::Mutex::new(None)),
bot_shutdown: None,
matrix_shutdown_tx: None,
} }
} }
} }

View File

@@ -29,8 +29,18 @@ pub(super) fn tool_get_server_logs(args: &Value) -> Result<String, String> {
/// Rebuild the server binary and re-exec (delegates to `crate::rebuild`). /// Rebuild the server binary and re-exec (delegates to `crate::rebuild`).
pub(super) async fn tool_rebuild_and_restart(ctx: &AppContext) -> Result<String, String> { pub(super) async fn tool_rebuild_and_restart(ctx: &AppContext) -> Result<String, String> {
slog!("[rebuild] Rebuild and restart requested via MCP tool"); slog!("[rebuild] Rebuild and restart requested via MCP tool");
// Signal the Matrix bot (if active) so it can send its own shutdown
// announcement before the process is replaced. Best-effort: we wait up
// to 1.5 s for the message to be delivered.
if let Some(ref tx) = ctx.matrix_shutdown_tx {
let _ = tx.send(Some(crate::rebuild::ShutdownReason::Rebuild));
tokio::time::sleep(std::time::Duration::from_millis(1500)).await;
}
let project_root = ctx.state.get_project_root().unwrap_or_default(); let project_root = ctx.state.get_project_root().unwrap_or_default();
crate::rebuild::rebuild_and_restart(&ctx.agents, &project_root).await let notifier = ctx.bot_shutdown.as_deref();
crate::rebuild::rebuild_and_restart(&ctx.agents, &project_root, notifier).await
} }
/// Generate a Claude Code permission rule string for the given tool name and input. /// Generate a Claude Code permission rule string for the given tool name and input.

View File

@@ -24,6 +24,7 @@ use crate::http::build_routes;
use crate::http::context::AppContext; use crate::http::context::AppContext;
use crate::http::{remove_port_file, resolve_port, write_port_file}; use crate::http::{remove_port_file, resolve_port, write_port_file};
use crate::io::fs::find_story_kit_root; use crate::io::fs::find_story_kit_root;
use crate::rebuild::{BotShutdownNotifier, ShutdownReason};
use crate::state::SessionState; use crate::state::SessionState;
use crate::store::JsonFileStore; use crate::store::JsonFileStore;
use crate::workflow::WorkflowState; use crate::workflow::WorkflowState;
@@ -177,17 +178,6 @@ async fn main() -> Result<(), std::io::Error> {
let startup_reconciliation_tx = reconciliation_tx.clone(); let startup_reconciliation_tx = reconciliation_tx.clone();
// Clone for shutdown cleanup — kill orphaned PTY children before exiting. // Clone for shutdown cleanup — kill orphaned PTY children before exiting.
let agents_for_shutdown = Arc::clone(&agents); let agents_for_shutdown = Arc::clone(&agents);
let ctx = AppContext {
state: app_state,
store,
workflow,
agents,
watcher_tx,
reconciliation_tx,
perm_tx,
perm_rx,
qa_app_process: Arc::new(std::sync::Mutex::new(None)),
};
// Build WhatsApp webhook context if bot.toml configures transport = "whatsapp". // Build WhatsApp webhook context if bot.toml configures transport = "whatsapp".
let whatsapp_ctx: Option<Arc<whatsapp::WhatsAppWebhookContext>> = startup_root let whatsapp_ctx: Option<Arc<whatsapp::WhatsAppWebhookContext>> = startup_root
@@ -255,7 +245,50 @@ async fn main() -> Result<(), std::io::Error> {
}) })
}); });
let app = build_routes(ctx, whatsapp_ctx, slack_ctx); // Build a best-effort shutdown notifier for webhook-based transports.
//
// • Slack: channels are fixed at startup (channel_ids from bot.toml).
// • WhatsApp: active senders are tracked at runtime in ambient_rooms.
// We keep the WhatsApp context Arc so we can read the rooms at shutdown.
// • Matrix: the bot task manages its own announcement via matrix_shutdown_tx.
let bot_shutdown_notifier: Option<Arc<BotShutdownNotifier>> =
if let Some(ref ctx) = slack_ctx {
let channels: Vec<String> = ctx.channel_ids.iter().cloned().collect();
Some(Arc::new(BotShutdownNotifier::new(
Arc::clone(&ctx.transport) as Arc<dyn crate::transport::ChatTransport>,
channels,
ctx.bot_name.clone(),
)))
} else {
None
};
// Retain a reference to the WhatsApp context for shutdown notifications.
// At shutdown time we read ambient_rooms to get the current set of active senders.
let whatsapp_ctx_for_shutdown: Option<Arc<whatsapp::WhatsAppWebhookContext>> =
whatsapp_ctx.clone();
// Watch channel: signals the Matrix bot task to send a shutdown announcement.
// `None` initial value means "server is running".
let (matrix_shutdown_tx, matrix_shutdown_rx) =
tokio::sync::watch::channel::<Option<ShutdownReason>>(None);
let matrix_shutdown_tx = Arc::new(matrix_shutdown_tx);
let matrix_shutdown_tx_for_rebuild = Arc::clone(&matrix_shutdown_tx);
let ctx = AppContext {
state: app_state,
store,
workflow,
agents,
watcher_tx,
reconciliation_tx,
perm_tx,
perm_rx,
qa_app_process: Arc::new(std::sync::Mutex::new(None)),
bot_shutdown: bot_shutdown_notifier.clone(),
matrix_shutdown_tx: Some(Arc::clone(&matrix_shutdown_tx)),
};
let app = build_routes(ctx, whatsapp_ctx.clone(), slack_ctx.clone());
// Optional Matrix bot: connect to the homeserver and start listening for // Optional Matrix bot: connect to the homeserver and start listening for
// messages if `.storkit/bot.toml` is present and enabled. // messages if `.storkit/bot.toml` is present and enabled.
@@ -265,7 +298,11 @@ async fn main() -> Result<(), std::io::Error> {
watcher_tx_for_bot, watcher_tx_for_bot,
perm_rx_for_bot, perm_rx_for_bot,
Arc::clone(&startup_agents), Arc::clone(&startup_agents),
matrix_shutdown_rx,
); );
} else {
// Keep the receiver alive (drop it) so the sender never errors.
drop(matrix_shutdown_rx);
} }
// On startup: // On startup:
@@ -295,6 +332,36 @@ async fn main() -> Result<(), std::io::Error> {
let result = Server::new(TcpListener::bind(&addr)).run(app).await; let result = Server::new(TcpListener::bind(&addr)).run(app).await;
// ── Shutdown notifications (best-effort) ─────────────────────────────
//
// The server is stopping (SIGINT / SIGTERM). Notify active bot channels
// so participants know the bot is going offline. We do this before killing
// PTY children so network I/O can still complete.
// Slack: notifier holds the fixed channel list.
if let Some(ref notifier) = bot_shutdown_notifier {
notifier.notify(ShutdownReason::Manual).await;
}
// WhatsApp: read the current set of ambient rooms and notify each sender.
if let Some(ref ctx) = whatsapp_ctx_for_shutdown {
let rooms: Vec<String> = ctx.ambient_rooms.lock().unwrap().iter().cloned().collect();
if !rooms.is_empty() {
let wa_notifier = BotShutdownNotifier::new(
Arc::clone(&ctx.transport) as Arc<dyn crate::transport::ChatTransport>,
rooms,
ctx.bot_name.clone(),
);
wa_notifier.notify(ShutdownReason::Manual).await;
}
}
// Matrix: signal the bot task and give it a short window to send its message.
let _ = matrix_shutdown_tx_for_rebuild.send(Some(ShutdownReason::Manual));
tokio::time::sleep(std::time::Duration::from_millis(1500)).await;
// ── Cleanup ──────────────────────────────────────────────────────────
// Kill all active PTY child processes before exiting to prevent orphaned // Kill all active PTY child processes before exiting to prevent orphaned
// Claude Code processes from running after the server restarts. // Claude Code processes from running after the server restarts.
agents_for_shutdown.kill_all_children(); agents_for_shutdown.kill_all_children();

View File

@@ -213,6 +213,7 @@ pub async fn run_bot(
watcher_rx: tokio::sync::broadcast::Receiver<crate::io::watcher::WatcherEvent>, watcher_rx: tokio::sync::broadcast::Receiver<crate::io::watcher::WatcherEvent>,
perm_rx: Arc<TokioMutex<mpsc::UnboundedReceiver<PermissionForward>>>, perm_rx: Arc<TokioMutex<mpsc::UnboundedReceiver<PermissionForward>>>,
agents: Arc<AgentPool>, agents: Arc<AgentPool>,
shutdown_rx: tokio::sync::watch::Receiver<Option<crate::rebuild::ShutdownReason>>,
) -> Result<(), String> { ) -> Result<(), String> {
let store_path = project_root.join(".storkit").join("matrix_store"); let store_path = project_root.join(".storkit").join("matrix_store");
let client = Client::builder() let client = Client::builder()
@@ -426,6 +427,30 @@ pub async fn run_bot(
notif_project_root, notif_project_root,
); );
// Spawn a shutdown watcher that sends a best-effort goodbye message to all
// configured rooms when the server is about to stop (SIGINT/SIGTERM or rebuild).
{
let shutdown_transport = Arc::clone(&transport);
let shutdown_rooms: Vec<String> =
announce_room_ids.iter().map(|r| r.to_string()).collect();
let shutdown_bot_name = announce_bot_name.clone();
let mut rx = shutdown_rx;
tokio::spawn(async move {
// Wait until the channel holds Some(reason).
if rx.wait_for(|v| v.is_some()).await.is_ok() {
let reason = rx.borrow().clone();
let notifier = crate::rebuild::BotShutdownNotifier::new(
shutdown_transport,
shutdown_rooms,
shutdown_bot_name,
);
if let Some(r) = reason {
notifier.notify(r).await;
}
}
});
}
// Send a startup announcement to each configured room so users know the // Send a startup announcement to each configured room so users know the
// bot is online. This runs once per process start — the sync loop handles // bot is online. This runs once per process start — the sync loop handles
// reconnects internally so this code is never reached again on a network // reconnects internally so this code is never reached again on a network

View File

@@ -32,9 +32,10 @@ pub use config::BotConfig;
use crate::agents::AgentPool; use crate::agents::AgentPool;
use crate::http::context::PermissionForward; use crate::http::context::PermissionForward;
use crate::io::watcher::WatcherEvent; use crate::io::watcher::WatcherEvent;
use crate::rebuild::ShutdownReason;
use std::path::Path; use std::path::Path;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::{Mutex as TokioMutex, broadcast, mpsc}; use tokio::sync::{Mutex as TokioMutex, broadcast, mpsc, watch};
/// Attempt to start the Matrix bot. /// Attempt to start the Matrix bot.
/// ///
@@ -50,12 +51,17 @@ use tokio::sync::{Mutex as TokioMutex, broadcast, mpsc};
/// `prompt_permission` tool. The bot locks it during active chat sessions /// `prompt_permission` tool. The bot locks it during active chat sessions
/// to surface permission prompts to the Matrix room and relay user decisions. /// to surface permission prompts to the Matrix room and relay user decisions.
/// ///
/// `shutdown_rx` is a watch channel that delivers a `ShutdownReason` when the
/// server is about to stop (SIGINT/SIGTERM or rebuild). The bot uses this to
/// announce the shutdown to all configured rooms before the process exits.
///
/// Must be called from within a Tokio runtime context (e.g., from `main`). /// Must be called from within a Tokio runtime context (e.g., from `main`).
pub fn spawn_bot( pub fn spawn_bot(
project_root: &Path, project_root: &Path,
watcher_tx: broadcast::Sender<WatcherEvent>, watcher_tx: broadcast::Sender<WatcherEvent>,
perm_rx: Arc<TokioMutex<mpsc::UnboundedReceiver<PermissionForward>>>, perm_rx: Arc<TokioMutex<mpsc::UnboundedReceiver<PermissionForward>>>,
agents: Arc<AgentPool>, agents: Arc<AgentPool>,
shutdown_rx: watch::Receiver<Option<ShutdownReason>>,
) { ) {
let config = match BotConfig::load(project_root) { let config = match BotConfig::load(project_root) {
Some(c) => c, Some(c) => c,
@@ -83,7 +89,8 @@ pub fn spawn_bot(
let root = project_root.to_path_buf(); let root = project_root.to_path_buf();
let watcher_rx = watcher_tx.subscribe(); let watcher_rx = watcher_tx.subscribe();
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = bot::run_bot(config, root, watcher_rx, perm_rx, agents).await { if let Err(e) = bot::run_bot(config, root, watcher_rx, perm_rx, agents, shutdown_rx).await
{
crate::slog!("[matrix-bot] Fatal error: {e}"); crate::slog!("[matrix-bot] Fatal error: {e}");
} }
}); });

View File

@@ -50,7 +50,7 @@ pub async fn handle_rebuild(
agents: &Arc<AgentPool>, agents: &Arc<AgentPool>,
) -> String { ) -> String {
crate::slog!("[matrix-bot] rebuild command received (bot={bot_name})"); crate::slog!("[matrix-bot] rebuild command received (bot={bot_name})");
match crate::rebuild::rebuild_and_restart(agents, project_root).await { match crate::rebuild::rebuild_and_restart(agents, project_root, None).await {
Ok(msg) => msg, Ok(msg) => msg,
Err(e) => format!("Rebuild failed: {e}"), Err(e) => format!("Rebuild failed: {e}"),
} }

View File

@@ -2,7 +2,72 @@
use crate::agents::AgentPool; use crate::agents::AgentPool;
use crate::slog; use crate::slog;
use crate::transport::ChatTransport;
use std::path::Path; use std::path::Path;
use std::sync::Arc;
// ── Shutdown notification ────────────────────────────────────────────────
/// The reason the server is shutting down.
///
/// Used to select the appropriate shutdown message sent to active bot channels.
#[derive(Clone, Debug, PartialEq)]
pub enum ShutdownReason {
/// The operator stopped the server manually (SIGINT / SIGTERM / ctrl-c).
Manual,
/// A rebuild-and-restart was requested (via MCP tool or bot command).
Rebuild,
}
/// Sends a shutdown announcement to all configured bot channels.
///
/// Wraps a [`ChatTransport`] together with the list of channel/room IDs the
/// bot is active in. Calling [`notify`] is best-effort — failures are logged
/// but never propagate, so shutdown is never blocked by a failed send.
pub struct BotShutdownNotifier {
transport: Arc<dyn ChatTransport>,
channels: Vec<String>,
bot_name: String,
}
impl BotShutdownNotifier {
pub fn new(
transport: Arc<dyn ChatTransport>,
channels: Vec<String>,
bot_name: String,
) -> Self {
Self {
transport,
channels,
bot_name,
}
}
/// Send a shutdown message to all configured channels.
///
/// Errors from individual sends are logged and ignored so that a single
/// failing channel does not prevent messages from reaching the rest.
pub async fn notify(&self, reason: ShutdownReason) {
let msg = match reason {
ShutdownReason::Manual => {
format!("{} is going offline (server stopped).", self.bot_name)
}
ShutdownReason::Rebuild => {
format!(
"{} is going offline to pick up a new build.",
self.bot_name
)
}
};
for channel in &self.channels {
if let Err(e) = self.transport.send_message(channel, &msg, &msg).await {
slog!("[shutdown] Failed to send shutdown message to {channel}: {e}");
}
}
}
}
// ── Rebuild ──────────────────────────────────────────────────────────────
/// Rebuild the server binary and re-exec. /// Rebuild the server binary and re-exec.
/// ///
@@ -10,9 +75,14 @@ use std::path::Path;
/// 2. Runs `cargo build [-p storkit]` from the workspace root, matching /// 2. Runs `cargo build [-p storkit]` from the workspace root, matching
/// the current build profile (debug or release). /// the current build profile (debug or release).
/// 3. If the build fails, returns the build error (server stays up). /// 3. If the build fails, returns the build error (server stays up).
/// 4. If the build succeeds, re-execs the process with the new binary via /// 4. If the build succeeds, sends a best-effort shutdown notification (if a
/// `std::os::unix::process::CommandExt::exec()`. /// [`BotShutdownNotifier`] is provided), then re-execs the process with
pub async fn rebuild_and_restart(agents: &AgentPool, project_root: &Path) -> Result<String, String> { /// the new binary via `std::os::unix::process::CommandExt::exec()`.
pub async fn rebuild_and_restart(
agents: &AgentPool,
project_root: &Path,
notifier: Option<&BotShutdownNotifier>,
) -> Result<String, String> {
slog!("[rebuild] Rebuild and restart requested"); slog!("[rebuild] Rebuild and restart requested");
// 1. Gracefully stop all running agents. // 1. Gracefully stop all running agents.
@@ -69,7 +139,14 @@ pub async fn rebuild_and_restart(agents: &AgentPool, project_root: &Path) -> Res
slog!("[rebuild] Build succeeded, re-execing with new binary"); slog!("[rebuild] Build succeeded, re-execing with new binary");
// 4. Re-exec with the new binary. // 4. Send shutdown notification before replacing the process so that chat
// participants know the bot is going offline. Best-effort only — we
// do not abort the rebuild if the send fails.
if let Some(n) = notifier {
n.notify(ShutdownReason::Rebuild).await;
}
// 5. Re-exec with the new binary.
// Use the cargo output path rather than current_exe() so that rebuilds // Use the cargo output path rather than current_exe() so that rebuilds
// inside Docker work correctly — the running binary may be installed at // inside Docker work correctly — the running binary may be installed at
// /usr/local/bin/storkit (read-only) while cargo writes the new binary // /usr/local/bin/storkit (read-only) while cargo writes the new binary
@@ -102,3 +179,170 @@ pub async fn rebuild_and_restart(agents: &AgentPool, project_root: &Path) -> Res
// If we get here, exec() failed. // If we get here, exec() failed.
Err(format!("Failed to exec new binary: {err}")) Err(format!("Failed to exec new binary: {err}"))
} }
// ── Tests ────────────────────────────────────────────────────────────────
#[cfg(test)]
mod tests {
use super::*;
use async_trait::async_trait;
use crate::transport::MessageId;
use std::sync::Mutex;
/// In-memory transport that records sent messages.
struct CapturingTransport {
sent: Mutex<Vec<(String, String)>>,
fail: bool,
}
impl CapturingTransport {
fn new() -> Self {
Self {
sent: Mutex::new(Vec::new()),
fail: false,
}
}
fn failing() -> Self {
Self {
sent: Mutex::new(Vec::new()),
fail: true,
}
}
fn messages(&self) -> Vec<(String, String)> {
self.sent.lock().unwrap().clone()
}
}
#[async_trait]
impl ChatTransport for CapturingTransport {
async fn send_message(
&self,
room_id: &str,
plain: &str,
_html: &str,
) -> Result<MessageId, String> {
if self.fail {
return Err("send failed".to_string());
}
self.sent
.lock()
.unwrap()
.push((room_id.to_string(), plain.to_string()));
Ok("msg-id".to_string())
}
async fn edit_message(
&self,
_room_id: &str,
_original_message_id: &str,
_plain: &str,
_html: &str,
) -> Result<(), String> {
Ok(())
}
async fn send_typing(&self, _room_id: &str, _typing: bool) -> Result<(), String> {
Ok(())
}
}
#[tokio::test]
async fn notify_manual_sends_to_all_channels() {
let transport = Arc::new(CapturingTransport::new());
let notifier = BotShutdownNotifier::new(
Arc::clone(&transport) as Arc<dyn ChatTransport>,
vec!["#channel1".to_string(), "#channel2".to_string()],
"Timmy".to_string(),
);
notifier.notify(ShutdownReason::Manual).await;
let msgs = transport.messages();
assert_eq!(msgs.len(), 2);
assert_eq!(msgs[0].0, "#channel1");
assert_eq!(msgs[1].0, "#channel2");
// Message must indicate manual stop.
assert!(
msgs[0].1.contains("offline"),
"expected 'offline' in manual message: {}",
msgs[0].1
);
assert!(
msgs[0].1.contains("stopped") || msgs[0].1.contains("manual"),
"expected reason in manual message: {}",
msgs[0].1
);
}
#[tokio::test]
async fn notify_rebuild_sends_rebuild_reason() {
let transport = Arc::new(CapturingTransport::new());
let notifier = BotShutdownNotifier::new(
Arc::clone(&transport) as Arc<dyn ChatTransport>,
vec!["#general".to_string()],
"Timmy".to_string(),
);
notifier.notify(ShutdownReason::Rebuild).await;
let msgs = transport.messages();
assert_eq!(msgs.len(), 1);
// Message must indicate rebuild, not manual stop.
assert!(
msgs[0].1.contains("build") || msgs[0].1.contains("rebuild"),
"expected rebuild reason in message: {}",
msgs[0].1
);
}
#[tokio::test]
async fn notify_manual_and_rebuild_messages_are_distinct() {
let transport_a = Arc::new(CapturingTransport::new());
let notifier_a = BotShutdownNotifier::new(
Arc::clone(&transport_a) as Arc<dyn ChatTransport>,
vec!["C1".to_string()],
"Bot".to_string(),
);
notifier_a.notify(ShutdownReason::Manual).await;
let transport_b = Arc::new(CapturingTransport::new());
let notifier_b = BotShutdownNotifier::new(
Arc::clone(&transport_b) as Arc<dyn ChatTransport>,
vec!["C1".to_string()],
"Bot".to_string(),
);
notifier_b.notify(ShutdownReason::Rebuild).await;
let manual_msg = &transport_a.messages()[0].1;
let rebuild_msg = &transport_b.messages()[0].1;
assert_ne!(manual_msg, rebuild_msg, "manual and rebuild messages must differ");
}
#[tokio::test]
async fn notify_is_best_effort_failing_send_does_not_panic() {
// A transport that always fails should not cause notify() to panic or
// return an error — the failure is swallowed silently.
let transport = Arc::new(CapturingTransport::failing());
let notifier = BotShutdownNotifier::new(
Arc::clone(&transport) as Arc<dyn ChatTransport>,
vec!["#channel".to_string()],
"Timmy".to_string(),
);
// Should complete without panicking.
notifier.notify(ShutdownReason::Manual).await;
}
#[tokio::test]
async fn notify_with_no_channels_is_noop() {
let transport = Arc::new(CapturingTransport::new());
let notifier = BotShutdownNotifier::new(
Arc::clone(&transport) as Arc<dyn ChatTransport>,
vec![],
"Timmy".to_string(),
);
notifier.notify(ShutdownReason::Manual).await;
assert!(transport.messages().is_empty());
}
}