//! Chat-transport context construction, startup announcements, stage-transition //! notification listeners, and shutdown notifications. use crate::chat; use crate::config; use crate::rebuild::{BotShutdownNotifier, ShutdownReason}; use crate::service; use crate::services::Services; use std::path::PathBuf; use std::sync::Arc; use tokio::sync::broadcast; /// All chat-transport contexts built at startup, plus the Matrix shutdown channel. pub(crate) struct BotContexts { pub(crate) whatsapp_ctx: Option>, pub(crate) slack_ctx: Option>, pub(crate) discord_ctx: Option>, /// Best-effort shutdown notifier for Slack / Discord (fixed channel list). pub(crate) shutdown_notifier: Option>, /// Retained for shutdown-time WhatsApp notifications (reads ambient_rooms). pub(crate) whatsapp_ctx_for_shutdown: Option>, /// Sender used to signal the Matrix bot task at shutdown. pub(crate) matrix_shutdown_tx: Arc>>, } /// Build WhatsApp, Slack, and Discord contexts from `bot.toml`, along with /// the shutdown notifier and Matrix watch channel. /// /// Returns the `BotContexts` struct and the Matrix shutdown receiver separately /// so the receiver can be moved into `spawn_bot` without partially moving the struct. 
pub(crate) fn build_bot_contexts(
    startup_root: &Option,
    services: &Arc,
) -> (
    BotContexts,
    tokio::sync::watch::Receiver>,
) {
    // NOTE(review): generic arguments (on `Option`, `Arc`, `Vec`, `HashSet`,
    // `Receiver`, and the `as Arc` casts) appear to have been stripped from
    // this listing — restore them from the real file. Tokens are kept as found.

    // WhatsApp: built only when a bot config loads from `startup_root` and its
    // `transport` field equals "whatsapp".
    let whatsapp_ctx: Option> = startup_root
        .as_ref()
        .and_then(|root| chat::transport::matrix::BotConfig::load(root))
        .filter(|cfg| cfg.transport == "whatsapp")
        .map(|cfg| {
            let provider = cfg.whatsapp_provider.clone();
            // Provider "twilio" selects the Twilio transport; any other value
            // falls through to `WhatsAppTransport`. Missing credentials are
            // replaced by their `Default` value rather than treated as errors.
            let transport: Arc = if provider == "twilio" {
                Arc::new(chat::transport::whatsapp::TwilioWhatsAppTransport::new(
                    cfg.twilio_account_sid.clone().unwrap_or_default(),
                    cfg.twilio_auth_token.clone().unwrap_or_default(),
                    cfg.twilio_whatsapp_number.clone().unwrap_or_default(),
                ))
            } else {
                // Template name defaults to "pipeline_notification" when unset.
                let template_name = cfg
                    .whatsapp_notification_template
                    .clone()
                    .unwrap_or_else(|| "pipeline_notification".to_string());
                Arc::new(chat::transport::whatsapp::WhatsAppTransport::new(
                    cfg.whatsapp_phone_number_id.clone().unwrap_or_default(),
                    cfg.whatsapp_access_token.clone().unwrap_or_default(),
                    template_name,
                ))
            };
            // This closure only runs when `startup_root.as_ref()` yielded
            // `Some`, so the `unwrap` cannot panic here.
            let root = startup_root.clone().unwrap();
            let history = chat::transport::whatsapp::load_whatsapp_history(&root);
            Arc::new(chat::transport::whatsapp::WhatsAppWebhookContext {
                services: Arc::clone(services),
                verify_token: cfg.whatsapp_verify_token.clone().unwrap_or_default(),
                provider,
                transport,
                history: std::sync::Arc::new(tokio::sync::Mutex::new(history)),
                history_size: cfg.history_size,
                window_tracker: Arc::new(chat::transport::whatsapp::MessagingWindowTracker::new()),
                allowed_phones: cfg.whatsapp_allowed_phones.clone(),
                app_secret: cfg.whatsapp_app_secret.clone().unwrap_or_default(),
            })
        });

    // Slack: same pattern, gated on `transport == "slack"`. The bot config is
    // loaded again here rather than shared with the WhatsApp branch.
    let slack_ctx: Option> = startup_root
        .as_ref()
        .and_then(|root| chat::transport::matrix::BotConfig::load(root))
        .filter(|cfg| cfg.transport == "slack")
        .map(|cfg| {
            let transport = Arc::new(chat::transport::slack::SlackTransport::new(
                cfg.slack_bot_token.clone().unwrap_or_default(),
            ));
            // Safe for the same reason as above: we are inside the `Some` path.
            let root = startup_root.clone().unwrap();
            let history = chat::transport::slack::load_slack_history(&root);
            // Collect the configured channel ids into a set.
            let channel_ids: std::collections::HashSet =
                cfg.slack_channel_ids.iter().cloned().collect();
            Arc::new(chat::transport::slack::SlackWebhookContext {
                services: Arc::clone(services),
                signing_secret: cfg.slack_signing_secret.clone().unwrap_or_default(),
                transport,
                history: std::sync::Arc::new(tokio::sync::Mutex::new(history)),
                history_size: cfg.history_size,
                channel_ids,
            })
        });

    // Discord: same pattern, gated on `transport == "discord"`.
    let discord_ctx: Option> = startup_root
        .as_ref()
        .and_then(|root| chat::transport::matrix::BotConfig::load(root))
        .filter(|cfg| cfg.transport == "discord")
        .map(|cfg| {
            let transport = Arc::new(chat::transport::discord::DiscordTransport::new(
                cfg.discord_bot_token.clone().unwrap_or_default(),
            ));
            // Safe for the same reason as above: we are inside the `Some` path.
            let root = startup_root.clone().unwrap();
            let history = chat::transport::discord::load_discord_history(&root);
            let channel_ids: std::collections::HashSet =
                cfg.discord_channel_ids.iter().cloned().collect();
            let allowed_users: std::collections::HashSet =
                cfg.discord_allowed_users.iter().cloned().collect();
            Arc::new(chat::transport::discord::DiscordContext {
                services: Arc::clone(services),
                // The token is stored on the context as well as being handed
                // to the transport above.
                bot_token: cfg.discord_bot_token.clone().unwrap_or_default(),
                transport,
                history: std::sync::Arc::new(tokio::sync::Mutex::new(history)),
                history_size: cfg.history_size,
                channel_ids,
                allowed_users,
            })
        });

    // Build a best-effort shutdown notifier.
    // Slack and Discord have fixed channel lists; WhatsApp rooms are tracked at
    // runtime via ambient_rooms and handled separately in `notify_shutdown`.
    // Slack is checked first; Discord is only considered when no Slack context
    // exists.
    let shutdown_notifier: Option> = if let Some(ref ctx) = slack_ctx {
        let channels: Vec = ctx.channel_ids.iter().cloned().collect();
        Some(Arc::new(BotShutdownNotifier::new(
            Arc::clone(&ctx.transport) as Arc,
            channels,
            ctx.services.bot_name.clone(),
        )))
    } else if let Some(ref ctx) = discord_ctx {
        let channels: Vec = ctx.channel_ids.iter().cloned().collect();
        Some(Arc::new(BotShutdownNotifier::new(
            Arc::clone(&ctx.transport) as Arc,
            channels,
            ctx.services.bot_name.clone(),
        )))
    } else {
        None
    };

    // Second handle to the WhatsApp context, kept for `notify_shutdown`.
    let whatsapp_ctx_for_shutdown = whatsapp_ctx.clone();

    // Watch channel for the Matrix bot task; its initial value is `None`.
    let (matrix_shutdown_tx, matrix_shutdown_rx) =
        tokio::sync::watch::channel::>(None);
    let matrix_shutdown_tx = Arc::new(matrix_shutdown_tx);

    (
        BotContexts {
            whatsapp_ctx,
            slack_ctx,
            discord_ctx,
            shutdown_notifier,
            whatsapp_ctx_for_shutdown,
            matrix_shutdown_tx,
        },
        matrix_shutdown_rx,
    )
}

/// Send `"{bot_name} is online."` to all known WhatsApp, Slack, and Discord
/// contacts so users know the bot is ready after a (re)start.
///
/// One task is spawned per configured transport and none of them is awaited,
/// so this function returns immediately. WhatsApp recipients are the keys of
/// the context's conversation history; Slack and Discord recipients are the
/// context's `channel_ids`. A transport with no recipients sends nothing.
// NOTE(review): the exact message text is produced by
// `BotShutdownNotifier::notify_startup`, which is not visible here — confirm.
pub(crate) fn spawn_startup_announcements(ctxs: &BotContexts) {
    use chat::transport::whatsapp::WhatsAppConversationHistory;

    if let Some(ref ctx) = ctxs.whatsapp_ctx {
        let transport = Arc::clone(&ctx.transport);
        let bot_name = ctx.services.bot_name.clone();
        let history: WhatsAppConversationHistory = Arc::clone(&ctx.history);
        tokio::spawn(async move {
            // The history lock is awaited inside the task, so the recipient
            // list reflects the history at the time the task runs.
            let senders: Vec = history.lock().await.keys().cloned().collect();
            if senders.is_empty() {
                return;
            }
            let notifier = BotShutdownNotifier::new(transport, senders, bot_name);
            notifier.notify_startup().await;
        });
    }
    if let Some(ref ctx) = ctxs.slack_ctx {
        let transport = Arc::clone(&ctx.transport) as Arc;
        let bot_name = ctx.services.bot_name.clone();
        // Channel list is copied before spawning so the task owns its data.
        let channels: Vec = ctx.channel_ids.iter().cloned().collect();
        tokio::spawn(async move {
            if channels.is_empty() {
                return;
            }
            let notifier = BotShutdownNotifier::new(transport, channels, bot_name);
            notifier.notify_startup().await;
        });
    }
    if let Some(ref ctx) = ctxs.discord_ctx {
        let transport = Arc::clone(&ctx.transport) as Arc;
        let bot_name = ctx.services.bot_name.clone();
        let channels: Vec = ctx.channel_ids.iter().cloned().collect();
        tokio::spawn(async move {
            if channels.is_empty() {
                return;
            }
            let notifier = BotShutdownNotifier::new(transport, channels, bot_name);
            notifier.notify_startup().await;
        });
    }
}

/// Spawn stage-transition notification listeners and status-event consumers for
/// all configured chat transports (WhatsApp, Slack, Discord).
///
/// Watcher receivers that have no matching transport are dropped immediately.
///
/// A transport is only wired up when both its context and `startup_root` are
/// present. For each such transport this:
///
/// * hands the transport, a closure yielding the current recipient list, the
///   matching watcher receiver, and the project root to
///   `service::notifications::spawn_notification_listener`;
/// * spawns a task that forwards formatted status events to every recipient,
///   unless the transport's `*_status_consumer` project-config flag is false
///   (the flag is treated as `true` when `ProjectConfig::load` yields no config).
///
/// The Discord branch additionally starts the Discord gateway first.
pub(crate) fn spawn_notification_listeners(
    ctxs: &BotContexts,
    startup_root: &Option,
    watcher_rx_for_whatsapp: broadcast::Receiver,
    watcher_rx_for_slack: broadcast::Receiver,
    watcher_rx_for_discord: broadcast::Receiver,
) {
    if let (Some(ctx), Some(root)) = (&ctxs.whatsapp_ctx, startup_root) {
        // WhatsApp recipients are read from `ambient_rooms` on every call, so
        // the listener sees rooms that are added after startup.
        let ambient_rooms = Arc::clone(&ctx.services.ambient_rooms);
        service::notifications::spawn_notification_listener(
            Arc::clone(&ctx.transport),
            move || ambient_rooms.lock().unwrap().iter().cloned().collect(),
            watcher_rx_for_whatsapp,
            root.clone(),
        );
        {
            use crate::service::status::format::format_status_event;
            let status_enabled = config::ProjectConfig::load(root)
                .map(|c| c.whatsapp_status_consumer)
                .unwrap_or(true);
            if status_enabled {
                let mut sub = ctx.services.status.subscribe();
                let transport = Arc::clone(&ctx.transport) as Arc;
                let ambient_rooms = Arc::clone(&ctx.services.ambient_rooms);
                tokio::spawn(async move {
                    // Loop ends when `recv` returns `None`.
                    while let Some(event) = sub.recv().await {
                        let plain = format_status_event(&event);
                        // The lock guard is a temporary dropped at the end of
                        // this statement, so it is not held across the
                        // `.await` in the loop below.
                        let rooms: Vec =
                            ambient_rooms.lock().unwrap().iter().cloned().collect();
                        for room in &rooms {
                            // A failed send is logged and the remaining rooms
                            // are still attempted.
                            // NOTE(review): meaning of the empty third argument
                            // is defined by the transport trait — confirm.
                            if let Err(e) = transport.send_message(room, &plain, "").await {
                                crate::slog!(
                                    "[whatsapp] Failed to send status event to {room}: {e}"
                                );
                            }
                        }
                    }
                    crate::slog!("[whatsapp] Status subscriber task exiting — broadcaster dropped");
                });
            }
        }
    } else {
        drop(watcher_rx_for_whatsapp);
    }

    if let (Some(ctx), Some(root)) = (&ctxs.slack_ctx, startup_root) {
        // Slack recipients are a snapshot of `channel_ids` taken here; the
        // closure returns a clone of that same list on every call.
        let channel_ids: Vec = ctx.channel_ids.iter().cloned().collect();
        service::notifications::spawn_notification_listener(
            Arc::clone(&ctx.transport) as Arc,
            move || channel_ids.clone(),
            watcher_rx_for_slack,
            root.clone(),
        );
        {
            use crate::service::status::format::format_status_event;
            let status_enabled = config::ProjectConfig::load(root)
                .map(|c| c.slack_status_consumer)
                .unwrap_or(true);
            if status_enabled {
                let mut sub = ctx.services.status.subscribe();
                let transport = Arc::clone(&ctx.transport) as Arc;
                let channels: Vec = ctx.channel_ids.iter().cloned().collect();
                tokio::spawn(async move {
                    // Loop ends when `recv` returns `None`.
                    while let Some(event) = sub.recv().await {
                        let plain = format_status_event(&event);
                        for channel in &channels {
                            // A failed send is logged and the remaining
                            // channels are still attempted.
                            if let Err(e) = transport.send_message(channel, &plain, "").await {
                                crate::slog!(
                                    "[slack] Failed to send status event to {channel}: {e}"
                                );
                            }
                        }
                    }
                    crate::slog!("[slack] Status subscriber task exiting — broadcaster dropped");
                });
            }
        }
    } else {
        drop(watcher_rx_for_slack);
    }

    if let (Some(ctx), Some(root)) = (&ctxs.discord_ctx, startup_root) {
        // Discord is the only transport that also starts a gateway here.
        chat::transport::discord::gateway::spawn_gateway(Arc::clone(ctx));
        // Recipients are a snapshot of `channel_ids`, as for Slack.
        let channel_ids: Vec = ctx.channel_ids.iter().cloned().collect();
        service::notifications::spawn_notification_listener(
            Arc::clone(&ctx.transport) as Arc,
            move || channel_ids.clone(),
            watcher_rx_for_discord,
            root.clone(),
        );
        {
            use crate::service::status::format::format_status_event;
            let status_enabled = config::ProjectConfig::load(root)
                .map(|c| c.discord_status_consumer)
                .unwrap_or(true);
            if status_enabled {
                let mut sub = ctx.services.status.subscribe();
                let transport = Arc::clone(&ctx.transport) as Arc;
                let channels: Vec = ctx.channel_ids.iter().cloned().collect();
                tokio::spawn(async move {
                    // Loop ends when `recv` returns `None`.
                    while let Some(event) = sub.recv().await {
                        let plain = format_status_event(&event);
                        for channel in &channels {
                            // A failed send is logged and the remaining
                            // channels are still attempted.
                            if let Err(e) = transport.send_message(channel, &plain, "").await {
                                crate::slog!(
                                    "[discord] Failed to send status event to {channel}: {e}"
                                );
                            }
                        }
                    }
                    crate::slog!("[discord] Status subscriber task exiting — broadcaster dropped");
                });
            }
        }
    } else {
        drop(watcher_rx_for_discord);
    }
}

/// Send shutdown notifications to all active bot channels (best-effort).
///
/// Called after the HTTP server stops accepting connections so that
/// network I/O can still complete before PTY children are killed.
///
/// Both notification paths are awaited in sequence and always report
/// `ShutdownReason::Manual`. Nothing is returned to the caller.
pub(crate) async fn notify_shutdown(ctxs: &BotContexts) {
    // Slack / Discord: notifier holds the fixed channel list.
    if let Some(ref notifier) = ctxs.shutdown_notifier {
        notifier.notify(ShutdownReason::Manual).await;
    }
    // WhatsApp: read the current set of ambient rooms and notify each sender.
    if let Some(ref ctx) = ctxs.whatsapp_ctx_for_shutdown {
        // The lock guard is a temporary dropped at the end of this statement,
        // so it is not held across the `.await` below.
        let rooms: Vec = ctx
            .services
            .ambient_rooms
            .lock()
            .unwrap()
            .iter()
            .cloned()
            .collect();
        // Skip building a notifier when there is nobody to notify.
        if !rooms.is_empty() {
            let wa_notifier = BotShutdownNotifier::new(
                Arc::clone(&ctx.transport) as Arc,
                rooms,
                ctx.services.bot_name.clone(),
            );
            wa_notifier.notify(ShutdownReason::Manual).await;
        }
    }
}