huskies: merge 918
This commit is contained in:
@@ -5,6 +5,7 @@
|
|||||||
//! This file contains only the `run` entrypoint and `build_gateway_route` wiring.
|
//! This file contains only the `run` entrypoint and `build_gateway_route` wiring.
|
||||||
|
|
||||||
use crate::http::gateway::*;
|
use crate::http::gateway::*;
|
||||||
|
use crate::rebuild::ShutdownReason;
|
||||||
use crate::service::gateway::{self, GatewayState};
|
use crate::service::gateway::{self, GatewayState};
|
||||||
use poem::EndpointExt;
|
use poem::EndpointExt;
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
@@ -118,7 +119,7 @@ pub async fn run(config_path: &Path, port: u16) -> Result<(), std::io::Error> {
|
|||||||
.iter()
|
.iter()
|
||||||
.map(|(name, entry)| (name.clone(), entry.url.clone()))
|
.map(|(name, entry)| (name.clone(), entry.url.clone()))
|
||||||
.collect();
|
.collect();
|
||||||
let bot_abort = gateway::io::spawn_gateway_bot(
|
let (bot_abort, bot_shutdown_tx) = gateway::io::spawn_gateway_bot(
|
||||||
&config_dir,
|
&config_dir,
|
||||||
Arc::clone(&state_arc.active_project),
|
Arc::clone(&state_arc.active_project),
|
||||||
gateway_projects,
|
gateway_projects,
|
||||||
@@ -127,16 +128,29 @@ pub async fn run(config_path: &Path, port: u16) -> Result<(), std::io::Error> {
|
|||||||
Some(state_arc.event_tx.clone()),
|
Some(state_arc.event_tx.clone()),
|
||||||
);
|
);
|
||||||
*state_arc.bot_handle.lock().await = bot_abort;
|
*state_arc.bot_handle.lock().await = bot_abort;
|
||||||
|
*state_arc.bot_shutdown_tx.lock().await = Some(bot_shutdown_tx);
|
||||||
|
|
||||||
let route = build_gateway_route(state_arc);
|
let route = build_gateway_route(state_arc.clone());
|
||||||
|
|
||||||
let host = std::env::var("HUSKIES_HOST").unwrap_or_else(|_| "127.0.0.1".to_string());
|
let host = std::env::var("HUSKIES_HOST").unwrap_or_else(|_| "127.0.0.1".to_string());
|
||||||
let addr = format!("{host}:{port}");
|
let addr = format!("{host}:{port}");
|
||||||
crate::slog!("[gateway] Listening on {addr}");
|
crate::slog!("[gateway] Listening on {addr}");
|
||||||
|
|
||||||
poem::Server::new(poem::listener::TcpListener::bind(&addr))
|
let result = poem::Server::new(poem::listener::TcpListener::bind(&addr))
|
||||||
.run(route)
|
.run(route)
|
||||||
.await
|
.await;
|
||||||
|
|
||||||
|
// Best-effort shutdown notification: signal the Matrix bot so it can post
|
||||||
|
// "going offline" before the process exits. Mirror of main.rs:346.
|
||||||
|
{
|
||||||
|
let guard = state_arc.bot_shutdown_tx.lock().await;
|
||||||
|
if let Some(tx) = guard.as_ref() {
|
||||||
|
let _ = tx.send(Some(ShutdownReason::Manual));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
tokio::time::sleep(std::time::Duration::from_millis(1500)).await;
|
||||||
|
|
||||||
|
result
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|||||||
@@ -431,6 +431,10 @@ pub type ActiveProject = std::sync::Arc<tokio::sync::RwLock<String>>;
|
|||||||
/// `gateway_event_tx` — when `Some`, the bot will subscribe to the gateway
|
/// `gateway_event_tx` — when `Some`, the bot will subscribe to the gateway
|
||||||
/// status broadcaster and forward [`super::GatewayStatusEvent`]s to its
|
/// status broadcaster and forward [`super::GatewayStatusEvent`]s to its
|
||||||
/// configured Matrix rooms with a `[project-name]` prefix.
|
/// configured Matrix rooms with a `[project-name]` prefix.
|
||||||
|
///
|
||||||
|
/// Returns `(abort_handle, shutdown_tx)`. The caller **must** hold
|
||||||
|
/// `shutdown_tx` for the bot's lifetime and send `Some(ShutdownReason)` on it
|
||||||
|
/// before process exit so the bot can announce "going offline" to its rooms.
|
||||||
pub fn spawn_gateway_bot(
|
pub fn spawn_gateway_bot(
|
||||||
config_dir: &Path,
|
config_dir: &Path,
|
||||||
active_project: ActiveProject,
|
active_project: ActiveProject,
|
||||||
@@ -438,7 +442,10 @@ pub fn spawn_gateway_bot(
|
|||||||
gateway_project_urls: BTreeMap<String, String>,
|
gateway_project_urls: BTreeMap<String, String>,
|
||||||
port: u16,
|
port: u16,
|
||||||
gateway_event_tx: Option<tokio::sync::broadcast::Sender<super::GatewayStatusEvent>>,
|
gateway_event_tx: Option<tokio::sync::broadcast::Sender<super::GatewayStatusEvent>>,
|
||||||
) -> Option<tokio::task::AbortHandle> {
|
) -> (
|
||||||
|
Option<tokio::task::AbortHandle>,
|
||||||
|
tokio::sync::watch::Sender<Option<crate::rebuild::ShutdownReason>>,
|
||||||
|
) {
|
||||||
use crate::agents::AgentPool;
|
use crate::agents::AgentPool;
|
||||||
use crate::services::Services;
|
use crate::services::Services;
|
||||||
use tokio::sync::{broadcast, mpsc};
|
use tokio::sync::{broadcast, mpsc};
|
||||||
@@ -458,7 +465,8 @@ pub fn spawn_gateway_bot(
|
|||||||
|
|
||||||
let (shutdown_tx, shutdown_rx) =
|
let (shutdown_tx, shutdown_rx) =
|
||||||
tokio::sync::watch::channel::<Option<crate::rebuild::ShutdownReason>>(None);
|
tokio::sync::watch::channel::<Option<crate::rebuild::ShutdownReason>>(None);
|
||||||
std::mem::forget(shutdown_tx);
|
// shutdown_tx is intentionally NOT forgotten — the caller holds it and
|
||||||
|
// sends Some(reason) on gateway shutdown so the bot announces "going offline".
|
||||||
|
|
||||||
let agents = std::sync::Arc::new(AgentPool::new(port, watcher_tx.clone()));
|
let agents = std::sync::Arc::new(AgentPool::new(port, watcher_tx.clone()));
|
||||||
|
|
||||||
@@ -498,7 +506,7 @@ pub fn spawn_gateway_bot(
|
|||||||
config_dir.join(".huskies").join("timers.json"),
|
config_dir.join(".huskies").join("timers.json"),
|
||||||
));
|
));
|
||||||
let gateway_event_rx = gateway_event_tx.map(|tx| tx.subscribe());
|
let gateway_event_rx = gateway_event_tx.map(|tx| tx.subscribe());
|
||||||
crate::chat::transport::matrix::spawn_bot(
|
let handle = crate::chat::transport::matrix::spawn_bot(
|
||||||
config_dir,
|
config_dir,
|
||||||
watcher_tx,
|
watcher_tx,
|
||||||
services,
|
services,
|
||||||
@@ -508,5 +516,48 @@ pub fn spawn_gateway_bot(
|
|||||||
gateway_project_urls,
|
gateway_project_urls,
|
||||||
timer_store,
|
timer_store,
|
||||||
gateway_event_rx,
|
gateway_event_rx,
|
||||||
)
|
);
|
||||||
|
(handle, shutdown_tx)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Tests ────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
/// Regression test for story 918: `spawn_gateway_bot` must return a live
|
||||||
|
/// `shutdown_tx` (not one dropped via `std::mem::forget`) so that callers
|
||||||
|
/// can signal `Some(reason)` and the bot's shutdown watcher receives it.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn spawn_gateway_bot_shutdown_tx_not_forgotten() {
|
||||||
|
let tmp = tempfile::tempdir().unwrap();
|
||||||
|
let active = std::sync::Arc::new(tokio::sync::RwLock::new("proj".to_string()));
|
||||||
|
let (event_tx, _) = tokio::sync::broadcast::channel(4);
|
||||||
|
|
||||||
|
let (handle, shutdown_tx) = spawn_gateway_bot(
|
||||||
|
tmp.path(),
|
||||||
|
active,
|
||||||
|
vec!["proj".to_string()],
|
||||||
|
std::collections::BTreeMap::new(),
|
||||||
|
3001,
|
||||||
|
Some(event_tx),
|
||||||
|
);
|
||||||
|
|
||||||
|
// No bot.toml in tmp → no abort handle spawned.
|
||||||
|
assert!(handle.is_none());
|
||||||
|
|
||||||
|
// The shutdown_tx must be live: subscribe a receiver and verify that
|
||||||
|
// sending Some(reason) is observed — this would fail if the sender
|
||||||
|
// had been forgotten (channel closed, send returns Err).
|
||||||
|
let rx = shutdown_tx.subscribe();
|
||||||
|
shutdown_tx
|
||||||
|
.send(Some(crate::rebuild::ShutdownReason::Manual))
|
||||||
|
.expect("shutdown_tx must not be closed (was previously std::mem::forget'd)");
|
||||||
|
assert_eq!(
|
||||||
|
*rx.borrow(),
|
||||||
|
Some(crate::rebuild::ShutdownReason::Manual),
|
||||||
|
"shutdown receiver must see the Manual reason"
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -22,6 +22,7 @@ pub use io::{
|
|||||||
spawn_gateway_notification_poller,
|
spawn_gateway_notification_poller,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use crate::rebuild::ShutdownReason;
|
||||||
use io::Client;
|
use io::Client;
|
||||||
use std::collections::{BTreeMap, HashMap};
|
use std::collections::{BTreeMap, HashMap};
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
@@ -111,6 +112,12 @@ pub struct GatewayState {
|
|||||||
pub port: u16,
|
pub port: u16,
|
||||||
/// Abort handle for the running Matrix bot task (if any).
|
/// Abort handle for the running Matrix bot task (if any).
|
||||||
pub bot_handle: Arc<TokioMutex<Option<tokio::task::AbortHandle>>>,
|
pub bot_handle: Arc<TokioMutex<Option<tokio::task::AbortHandle>>>,
|
||||||
|
/// Watch sender used to signal the Matrix bot to shut down gracefully.
|
||||||
|
///
|
||||||
|
/// Send `Some(ShutdownReason)` on this channel before process exit so the
|
||||||
|
/// bot task receives the signal and posts an "going offline" announcement.
|
||||||
|
pub bot_shutdown_tx:
|
||||||
|
Arc<TokioMutex<Option<tokio::sync::watch::Sender<Option<ShutdownReason>>>>>,
|
||||||
/// Broadcast sender for [`GatewayStatusEvent`]s pushed by project nodes.
|
/// Broadcast sender for [`GatewayStatusEvent`]s pushed by project nodes.
|
||||||
///
|
///
|
||||||
/// Call `event_tx.subscribe()` to obtain a receiver for outbound fan-out.
|
/// Call `event_tx.subscribe()` to obtain a receiver for outbound fan-out.
|
||||||
@@ -142,6 +149,7 @@ impl GatewayState {
|
|||||||
config_dir,
|
config_dir,
|
||||||
port,
|
port,
|
||||||
bot_handle: Arc::new(TokioMutex::new(None)),
|
bot_handle: Arc::new(TokioMutex::new(None)),
|
||||||
|
bot_shutdown_tx: Arc::new(TokioMutex::new(None)),
|
||||||
event_tx,
|
event_tx,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@@ -462,7 +470,7 @@ pub async fn save_bot_config_and_restart(state: &GatewayState, content: &str) ->
|
|||||||
.map(|(name, entry)| (name.clone(), entry.url.clone()))
|
.map(|(name, entry)| (name.clone(), entry.url.clone()))
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
let new_handle = io::spawn_gateway_bot(
|
let (new_handle, new_shutdown_tx) = io::spawn_gateway_bot(
|
||||||
&state.config_dir,
|
&state.config_dir,
|
||||||
Arc::clone(&state.active_project),
|
Arc::clone(&state.active_project),
|
||||||
gateway_projects,
|
gateway_projects,
|
||||||
@@ -471,6 +479,7 @@ pub async fn save_bot_config_and_restart(state: &GatewayState, content: &str) ->
|
|||||||
Some(state.event_tx.clone()),
|
Some(state.event_tx.clone()),
|
||||||
);
|
);
|
||||||
*handle = new_handle;
|
*handle = new_handle;
|
||||||
|
*state.bot_shutdown_tx.lock().await = Some(new_shutdown_tx);
|
||||||
}
|
}
|
||||||
|
|
||||||
crate::slog!("[gateway] Bot configuration saved; bot restarted");
|
crate::slog!("[gateway] Bot configuration saved; bot restarted");
|
||||||
|
|||||||
Reference in New Issue
Block a user