storkit: merge 419_bug_matrix_bot_crashes_on_transient_network_error_instead_of_retrying

dave
2026-03-28 09:08:27 +00:00
parent 05db012aaf
commit 1193b7ac9a
+121 -3
@@ -1,7 +1,8 @@
 use crate::agents::AgentPool;
 use crate::slog;
-use matrix_sdk::{Client, config::SyncSettings};
+use matrix_sdk::{Client, LoopCtrl, config::SyncSettings};
 use matrix_sdk::ruma::OwnedRoomId;
+use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
 use std::collections::{HashMap, HashSet};
 use std::path::PathBuf;
 use std::sync::Arc;
@@ -304,12 +305,129 @@ pub async fn run_bot(
slog!("[matrix-bot] Starting Matrix sync loop"); slog!("[matrix-bot] Starting Matrix sync loop");
// This blocks until the connection is terminated or an error occurs. // Retry state — shared across `Fn` closure invocations via Arc atomics.
+    const MAX_BACKOFF_SECS: u64 = 300;
+    const INITIAL_BACKOFF_SECS: u64 = 5;
+    let backoff = Arc::new(AtomicU64::new(INITIAL_BACKOFF_SECS));
+    let was_disconnected = Arc::new(AtomicBool::new(false));
+    let sync_transport = Arc::clone(&transport);
+    let sync_rooms: Vec<String> = announce_room_ids.iter().map(|r| r.to_string()).collect();
+    let sync_bot_name = announce_bot_name.clone();
+    let backoff_cb = Arc::clone(&backoff);
+    let was_disconnected_cb = Arc::clone(&was_disconnected);
+    // Use sync_with_result_callback so transient errors (network blips, DNS
+    // hiccups, temporary homeserver outages) are handled in the callback
+    // rather than bubbling up as fatal errors. Fatal errors (HTTP 401/403)
+    // still terminate the loop and propagate to the caller.
     client
-        .sync(SyncSettings::default())
+        .sync_with_result_callback(SyncSettings::default(), move |result| {
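+            // The callback is an `Fn` closure invoked once per sync result, so
+            // each invocation clones the shared state it moves into the future.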
+            let backoff = Arc::clone(&backoff_cb);
+            let was_disconnected = Arc::clone(&was_disconnected_cb);
+            let recovery_transport = Arc::clone(&sync_transport);
+            let recovery_rooms = sync_rooms.clone();
+            let recovery_bot_name = sync_bot_name.clone();
+            async move {
+                match result {
+                    Ok(_) => {
+                        // If we previously lost the connection, announce recovery.
+                        if was_disconnected.swap(false, Ordering::Relaxed) {
+                            backoff.store(INITIAL_BACKOFF_SECS, Ordering::Relaxed);
+                            slog!("[matrix-bot] Reconnected to homeserver — resuming normal operation");
+                            let msg = format!(
+                                "⚡ **{recovery_bot_name}** reconnected to homeserver."
+                            );
+                            let html = format!(
+                                "<p>⚡ <strong>{recovery_bot_name}</strong> reconnected to homeserver.</p>"
+                            );
+                            for room_id in &recovery_rooms {
+                                if let Err(e) = recovery_transport
+                                    .send_message(room_id, &msg, &html)
+                                    .await
+                                {
+                                    slog!(
+                                        "[matrix-bot] Failed to send recovery notification to {room_id}: {e}"
+                                    );
+                                }
+                            }
+                        }
+                        Ok(LoopCtrl::Continue)
+                    }
+                    Err(e) if is_fatal_sync_error(&e) => Err(e),
+                    Err(e) => {
+                        // Transient error: log, back off, and let the stream retry.
+                        let delay = backoff.load(Ordering::Relaxed);
+                        slog!("[matrix-bot] Sync warning (retrying in {delay}s): {e}");
+                        was_disconnected.store(true, Ordering::Relaxed);
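+                        // Sleeping here pauses the sync loop itself; the next
+                        // sync attempt starts only after the delay elapses.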
+                        tokio::time::sleep(std::time::Duration::from_secs(delay)).await;
+                        let new_delay = (delay * 2).min(MAX_BACKOFF_SECS);
+                        backoff.store(new_delay, Ordering::Relaxed);
+                        Ok(LoopCtrl::Continue)
+                    }
+                }
+            }
+        })
         .await
         .map_err(|e| format!("Matrix sync error: {e}"))?;
     Ok(())
 }
+
+/// Returns `true` for errors that indicate the bot's session is permanently
+/// invalid (HTTP 401 Unauthorized or 403 Forbidden). All other errors —
+/// network failures, timeouts, transient 5xx responses — are considered
+/// recoverable and should be retried with exponential back-off.
+fn is_fatal_sync_error(e: &matrix_sdk::Error) -> bool {
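+    // Transport-level failures (I/O, DNS, TLS) carry no client-API response,
+    // so `as_client_api_error()` returns `None` and they map to "not fatal".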
+    e.as_client_api_error()
+        .map(|api_err| {
+            let code = api_err.status_code.as_u16();
+            code == 401 || code == 403
+        })
+        .unwrap_or(false)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
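+
+    // NOTE: the back-off constants are duplicated from `run_bot`; keep these
+    // tests in sync if the production values ever change.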
+
+    /// An I/O error (e.g. connection refused) must NOT be treated as fatal so
+    /// that the sync loop retries rather than shutting the bot down.
+    #[test]
+    fn io_error_is_not_fatal() {
+        let e: matrix_sdk::Error =
+            std::io::Error::new(std::io::ErrorKind::ConnectionRefused, "connection refused")
+                .into();
+        assert!(!is_fatal_sync_error(&e));
+    }
+
+    /// Exponential back-off must clamp at MAX_BACKOFF_SECS (300 s) regardless
+    /// of how many consecutive failures occur.
+    #[test]
+    fn backoff_clamps_at_max() {
+        const MAX_BACKOFF_SECS: u64 = 300;
+        let mut delay = 5u64;
+        for _ in 0..20 {
+            delay = (delay * 2).min(MAX_BACKOFF_SECS);
+        }
+        assert_eq!(delay, MAX_BACKOFF_SECS);
+    }
+
+    /// Back-off must at least double each step before clamping.
+    #[test]
+    fn backoff_doubles_each_step() {
+        const MAX_BACKOFF_SECS: u64 = 300;
+        let steps: Vec<u64> = std::iter::successors(Some(5u64), |&d| {
+            let next = (d * 2).min(MAX_BACKOFF_SECS);
+            if next < MAX_BACKOFF_SECS { Some(next) } else { None }
+        })
+        .collect();
+        // First few steps: 5, 10, 20, 40, 80, 160
+        assert_eq!(steps[0], 5);
+        assert_eq!(steps[1], 10);
+        assert_eq!(steps[2], 20);
+        assert_eq!(steps[3], 40);
+    }
+}
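
For reference, a minimal standalone sketch (not part of the diff) of the back-off schedule the new loop produces, assuming the same constants as above (5 s initial delay, doubling per consecutive failure, clamped at 300 s):

    fn main() {
        const INITIAL_BACKOFF_SECS: u64 = 5;
        const MAX_BACKOFF_SECS: u64 = 300;
        let mut delay = INITIAL_BACKOFF_SECS;
        for attempt in 1..=10 {
            println!("failure {attempt}: retry in {delay}s");
            delay = (delay * 2).min(MAX_BACKOFF_SECS);
        }
        // Prints 5, 10, 20, 40, 80, 160, 300, 300, 300, 300: doubling, then clamped.
    }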