diff --git a/server/src/chat/transport/matrix/bot/run.rs b/server/src/chat/transport/matrix/bot/run.rs
index a76c5891..cadd98e2 100644
--- a/server/src/chat/transport/matrix/bot/run.rs
+++ b/server/src/chat/transport/matrix/bot/run.rs
@@ -1,7 +1,8 @@
 use crate::agents::AgentPool;
 use crate::slog;
-use matrix_sdk::{Client, config::SyncSettings};
+use matrix_sdk::{Client, LoopCtrl, config::SyncSettings};
 use matrix_sdk::ruma::OwnedRoomId;
+use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
 use std::collections::{HashMap, HashSet};
 use std::path::PathBuf;
 use std::sync::Arc;
@@ -304,12 +305,129 @@ pub async fn run_bot(
 
     slog!("[matrix-bot] Starting Matrix sync loop");
 
-    // This blocks until the connection is terminated or an error occurs.
+    // Retry state — shared across `Fn` closure invocations via Arc atomics.
+    const MAX_BACKOFF_SECS: u64 = 300;
+    const INITIAL_BACKOFF_SECS: u64 = 5;
+    let backoff = Arc::new(AtomicU64::new(INITIAL_BACKOFF_SECS));
+    let was_disconnected = Arc::new(AtomicBool::new(false));
+
+    let sync_transport = Arc::clone(&transport);
+    let sync_rooms: Vec<String> = announce_room_ids.iter().map(|r| r.to_string()).collect();
+    let sync_bot_name = announce_bot_name.clone();
+
+    let backoff_cb = Arc::clone(&backoff);
+    let was_disconnected_cb = Arc::clone(&was_disconnected);
+
+    // Use sync_with_result_callback so transient errors (network blips, DNS
+    // hiccups, temporary homeserver outages) are handled in the callback
+    // rather than bubbling up as fatal errors. Fatal errors (HTTP 401/403)
+    // still terminate the loop and propagate to the caller.
     client
-        .sync(SyncSettings::default())
+        .sync_with_result_callback(SyncSettings::default(), move |result| {
+            let backoff = Arc::clone(&backoff_cb);
+            let was_disconnected = Arc::clone(&was_disconnected_cb);
+            let recovery_transport = Arc::clone(&sync_transport);
+            let recovery_rooms = sync_rooms.clone();
+            let recovery_bot_name = sync_bot_name.clone();
+            async move {
+                match result {
+                    Ok(_) => {
+                        // If we previously lost the connection, announce recovery.
+                        if was_disconnected.swap(false, Ordering::Relaxed) {
+                            backoff.store(INITIAL_BACKOFF_SECS, Ordering::Relaxed);
+                            slog!("[matrix-bot] Reconnected to homeserver — resuming normal operation");
+                            let msg = format!(
+                                "⚡ **{recovery_bot_name}** reconnected to homeserver."
+                            );
+                            let html = format!(
+                                "⚡ <strong>{recovery_bot_name}</strong> reconnected to homeserver."
+                            );
+                            for room_id in &recovery_rooms {
+                                if let Err(e) = recovery_transport
+                                    .send_message(room_id, &msg, &html)
+                                    .await
+                                {
+                                    slog!(
+                                        "[matrix-bot] Failed to send recovery notification to {room_id}: {e}"
+                                    );
+                                }
+                            }
+                        }
+                        Ok(LoopCtrl::Continue)
+                    }
+                    Err(e) if is_fatal_sync_error(&e) => Err(e),
+                    Err(e) => {
+                        // Transient error: log, back off, and let the stream retry.
+                        let delay = backoff.load(Ordering::Relaxed);
+                        slog!("[matrix-bot] Sync warning (retrying in {delay}s): {e}");
+                        was_disconnected.store(true, Ordering::Relaxed);
+                        tokio::time::sleep(std::time::Duration::from_secs(delay)).await;
+                        let new_delay = (delay * 2).min(MAX_BACKOFF_SECS);
+                        backoff.store(new_delay, Ordering::Relaxed);
+                        Ok(LoopCtrl::Continue)
+                    }
+                }
+            }
+        })
         .await
         .map_err(|e| format!("Matrix sync error: {e}"))?;
 
     Ok(())
 }
+
+/// Returns `true` for errors that indicate the bot's session is permanently
+/// invalid (HTTP 401 Unauthorized or 403 Forbidden). All other errors —
+/// network failures, timeouts, transient 5xx responses — are considered
+/// recoverable and should be retried with exponential back-off.
+fn is_fatal_sync_error(e: &matrix_sdk::Error) -> bool {
+    e.as_client_api_error()
+        .map(|api_err| {
+            let code = api_err.status_code.as_u16();
+            code == 401 || code == 403
+        })
+        .unwrap_or(false)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    /// An I/O error (e.g. connection refused) must NOT be treated as fatal so
+    /// that the sync loop retries rather than shutting the bot down.
+    #[test]
+    fn io_error_is_not_fatal() {
+        let e: matrix_sdk::Error =
+            std::io::Error::new(std::io::ErrorKind::ConnectionRefused, "connection refused")
+                .into();
+        assert!(!is_fatal_sync_error(&e));
+    }
+
+    /// Exponential back-off must clamp at MAX_BACKOFF_SECS (300 s) regardless
+    /// of how many consecutive failures occur.
+    #[test]
+    fn backoff_clamps_at_max() {
+        const MAX_BACKOFF_SECS: u64 = 300;
+        let mut delay = 5u64;
+        for _ in 0..20 {
+            delay = (delay * 2).min(MAX_BACKOFF_SECS);
+        }
+        assert_eq!(delay, MAX_BACKOFF_SECS);
+    }
+
+    /// Back-off must at least double each step before clamping.
+    #[test]
+    fn backoff_doubles_each_step() {
+        const MAX_BACKOFF_SECS: u64 = 300;
+        let steps: Vec<u64> = std::iter::successors(Some(5u64), |&d| {
+            let next = (d * 2).min(MAX_BACKOFF_SECS);
+            if next < MAX_BACKOFF_SECS { Some(next) } else { None }
+        })
+        .collect();
+        // First few steps: 5, 10, 20, 40, 80, 160
+        assert_eq!(steps[0], 5);
+        assert_eq!(steps[1], 10);
+        assert_eq!(steps[2], 20);
+        assert_eq!(steps[3], 40);
+    }
+}
+