45f1096b96
In gateway mode the bot has no local CRDT or project filesystem, so all bot commands (status, backlog, start, assign, etc.) returned empty or broken results. Now the gateway bot proxies non-local commands via HTTP to the active project's /api/bot/command endpoint, which already exists on every project server. Only a small set of gateway-local commands (help, ambient, reset, switch) are still handled directly by the gateway. Everything else is forwarded automatically, so new commands added in the future will work through the proxy without additional gateway changes. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
444 lines
18 KiB
Rust
444 lines
18 KiB
Rust
//! Matrix bot run loop — connects to the homeserver and processes sync events.
|
|
use crate::agents::AgentPool;
|
|
use crate::slog;
|
|
use matrix_sdk::ruma::OwnedRoomId;
|
|
use matrix_sdk::{Client, LoopCtrl, config::SyncSettings};
|
|
use std::collections::{HashMap, HashSet};
|
|
use std::path::PathBuf;
|
|
use std::sync::Arc;
|
|
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
|
|
use tokio::sync::Mutex as TokioMutex;
|
|
use tokio::sync::{RwLock, mpsc, watch};
|
|
|
|
use super::context::BotContext;
|
|
use super::format::{format_startup_announcement, markdown_to_html};
|
|
use super::history::load_history;
|
|
use super::messages::on_room_message;
|
|
use super::verification::{on_room_verification_request, on_to_device_verification_request};
|
|
|
|
/// Connect to the Matrix homeserver, join all configured rooms, and start
|
|
/// listening for messages. Runs the full Matrix sync loop — call from a
|
|
/// `tokio::spawn` task so it doesn't block the main thread.
|
|
#[allow(clippy::too_many_arguments)]
|
|
pub async fn run_bot(
|
|
config: super::super::config::BotConfig,
|
|
project_root: PathBuf,
|
|
watcher_rx: tokio::sync::broadcast::Receiver<crate::io::watcher::WatcherEvent>,
|
|
watcher_rx_auto: tokio::sync::broadcast::Receiver<crate::io::watcher::WatcherEvent>,
|
|
perm_rx: Arc<TokioMutex<mpsc::UnboundedReceiver<crate::http::context::PermissionForward>>>,
|
|
agents: Arc<AgentPool>,
|
|
shutdown_rx: watch::Receiver<Option<crate::rebuild::ShutdownReason>>,
|
|
gateway_active_project: Option<Arc<RwLock<String>>>,
|
|
gateway_projects: Vec<String>,
|
|
gateway_project_urls: std::collections::BTreeMap<String, String>,
|
|
) -> Result<(), String> {
|
|
let store_path = project_root.join(".huskies").join("matrix_store");
|
|
let client = Client::builder()
|
|
.homeserver_url(config.homeserver.as_deref().unwrap_or_default())
|
|
.sqlite_store(&store_path, None)
|
|
.build()
|
|
.await
|
|
.map_err(|e| format!("Failed to build Matrix client: {e}"))?;
|
|
|
|
// Persist device ID so E2EE crypto state survives restarts.
|
|
let device_id_path = project_root.join(".huskies").join("matrix_device_id");
|
|
let saved_device_id: Option<String> = std::fs::read_to_string(&device_id_path)
|
|
.ok()
|
|
.map(|s| s.trim().to_string())
|
|
.filter(|s| !s.is_empty());
|
|
|
|
let mut login_builder = client
|
|
.matrix_auth()
|
|
.login_username(
|
|
config.username.as_deref().unwrap_or_default(),
|
|
config.password.as_deref().unwrap_or_default(),
|
|
)
|
|
.initial_device_display_name("Huskies Bot");
|
|
|
|
if let Some(ref device_id) = saved_device_id {
|
|
login_builder = login_builder.device_id(device_id);
|
|
}
|
|
|
|
let login_response = login_builder
|
|
.await
|
|
.map_err(|e| format!("Matrix login failed: {e}"))?;
|
|
|
|
// Save device ID on first login so subsequent restarts reuse the same device.
|
|
if saved_device_id.is_none() {
|
|
let _ = std::fs::write(&device_id_path, &login_response.device_id);
|
|
slog!(
|
|
"[matrix-bot] Saved device ID {} for future restarts",
|
|
login_response.device_id
|
|
);
|
|
}
|
|
|
|
let bot_user_id = client
|
|
.user_id()
|
|
.ok_or_else(|| "No user ID after login".to_string())?
|
|
.to_owned();
|
|
|
|
slog!(
|
|
"[matrix-bot] Logged in as {bot_user_id} (device: {})",
|
|
login_response.device_id
|
|
);
|
|
|
|
// Bootstrap cross-signing keys for E2EE verification support.
|
|
// Pass the bot's password for UIA (User-Interactive Authentication) —
|
|
// the homeserver requires proof of identity before accepting cross-signing keys.
|
|
{
|
|
use matrix_sdk::ruma::api::client::uiaa;
|
|
let password_auth = uiaa::AuthData::Password(uiaa::Password::new(
|
|
uiaa::UserIdentifier::UserIdOrLocalpart(config.username.clone().unwrap_or_default()),
|
|
config.password.clone().unwrap_or_default(),
|
|
));
|
|
if let Err(e) = client
|
|
.encryption()
|
|
.bootstrap_cross_signing(Some(password_auth))
|
|
.await
|
|
{
|
|
slog!("[matrix-bot] Cross-signing bootstrap note: {e}");
|
|
}
|
|
}
|
|
|
|
// Self-sign own device keys so other clients don't show
|
|
// "encrypted by a device not verified by its owner" warnings.
|
|
match client.encryption().get_own_device().await {
|
|
Ok(Some(own_device)) => {
|
|
if own_device.is_cross_signed_by_owner() {
|
|
slog!("[matrix-bot] Device already self-signed");
|
|
} else {
|
|
slog!("[matrix-bot] Device not self-signed, signing now...");
|
|
match own_device.verify().await {
|
|
Ok(()) => slog!("[matrix-bot] Successfully self-signed device keys"),
|
|
Err(e) => slog!("[matrix-bot] Failed to self-sign device keys: {e}"),
|
|
}
|
|
}
|
|
}
|
|
Ok(None) => slog!("[matrix-bot] Could not find own device in crypto store"),
|
|
Err(e) => slog!("[matrix-bot] Error retrieving own device: {e}"),
|
|
}
|
|
|
|
if config.allowed_users.is_empty() {
|
|
return Err(
|
|
"allowed_users is empty in bot.toml — refusing to start (fail-closed). \
|
|
Add at least one Matrix user ID to allowed_users."
|
|
.to_string(),
|
|
);
|
|
}
|
|
|
|
slog!("[matrix-bot] Allowed users: {:?}", config.allowed_users);
|
|
|
|
// Parse and join all configured rooms.
|
|
let mut target_room_ids: Vec<OwnedRoomId> = Vec::new();
|
|
for room_id_str in config.effective_room_ids() {
|
|
let room_id: OwnedRoomId = room_id_str
|
|
.parse()
|
|
.map_err(|_| format!("Invalid room ID '{room_id_str}'"))?;
|
|
|
|
// Try to join with a timeout. Conduit sometimes hangs or returns
|
|
// errors on join if the bot is already a member.
|
|
match tokio::time::timeout(
|
|
std::time::Duration::from_secs(10),
|
|
client.join_room_by_id(&room_id),
|
|
)
|
|
.await
|
|
{
|
|
Ok(Ok(_)) => slog!("[matrix-bot] Joined room {room_id}"),
|
|
Ok(Err(e)) => {
|
|
slog!("[matrix-bot] Join room error (may already be a member): {e}")
|
|
}
|
|
Err(_) => slog!("[matrix-bot] Join room timed out (may already be a member)"),
|
|
}
|
|
|
|
target_room_ids.push(room_id);
|
|
}
|
|
|
|
if target_room_ids.is_empty() {
|
|
return Err("No valid room IDs configured — cannot start".to_string());
|
|
}
|
|
|
|
slog!(
|
|
"[matrix-bot] Listening in {} room(s): {:?}",
|
|
target_room_ids.len(),
|
|
target_room_ids
|
|
);
|
|
|
|
// Clone values needed by the notification listener and startup announcement
|
|
// before they are moved into BotContext.
|
|
let notif_room_ids = target_room_ids.clone();
|
|
let notif_project_root = project_root.clone();
|
|
let announce_room_ids = target_room_ids.clone();
|
|
|
|
let persisted = load_history(&project_root);
|
|
slog!(
|
|
"[matrix-bot] Loaded persisted conversation history for {} room(s)",
|
|
persisted.len()
|
|
);
|
|
|
|
// Restore persisted ambient rooms from config.
|
|
let persisted_ambient: HashSet<String> = config.ambient_rooms.iter().cloned().collect();
|
|
if !persisted_ambient.is_empty() {
|
|
slog!(
|
|
"[matrix-bot] Restored ambient mode for {} room(s): {:?}",
|
|
persisted_ambient.len(),
|
|
persisted_ambient
|
|
);
|
|
}
|
|
|
|
// Create the transport abstraction based on the configured transport type.
|
|
let transport: Arc<dyn crate::chat::ChatTransport> = match config.transport.as_str() {
|
|
"whatsapp" => {
|
|
if config.whatsapp_provider == "twilio" {
|
|
slog!("[matrix-bot] Using WhatsApp/Twilio transport");
|
|
Arc::new(
|
|
crate::chat::transport::whatsapp::TwilioWhatsAppTransport::new(
|
|
config.twilio_account_sid.clone().unwrap_or_default(),
|
|
config.twilio_auth_token.clone().unwrap_or_default(),
|
|
config.twilio_whatsapp_number.clone().unwrap_or_default(),
|
|
),
|
|
)
|
|
} else {
|
|
slog!("[matrix-bot] Using WhatsApp/Meta transport");
|
|
Arc::new(crate::chat::transport::whatsapp::WhatsAppTransport::new(
|
|
config.whatsapp_phone_number_id.clone().unwrap_or_default(),
|
|
config.whatsapp_access_token.clone().unwrap_or_default(),
|
|
config
|
|
.whatsapp_notification_template
|
|
.clone()
|
|
.unwrap_or_else(|| "pipeline_notification".to_string()),
|
|
))
|
|
}
|
|
}
|
|
_ => {
|
|
slog!("[matrix-bot] Using Matrix transport");
|
|
Arc::new(super::super::transport_impl::MatrixTransport::new(
|
|
client.clone(),
|
|
))
|
|
}
|
|
};
|
|
|
|
let bot_name = config
|
|
.display_name
|
|
.clone()
|
|
.unwrap_or_else(|| "Assistant".to_string());
|
|
let announce_bot_name = bot_name.clone();
|
|
|
|
let timer_store = Arc::new(crate::chat::timer::TimerStore::load(
|
|
project_root.join(".huskies").join("timers.json"),
|
|
));
|
|
// Auto-schedule timers when an agent hits a hard rate limit.
|
|
crate::chat::timer::spawn_rate_limit_auto_scheduler(Arc::clone(&timer_store), watcher_rx_auto);
|
|
|
|
let ctx = BotContext {
|
|
bot_user_id,
|
|
target_room_ids,
|
|
project_root,
|
|
allowed_users: config.allowed_users,
|
|
history: Arc::new(TokioMutex::new(persisted)),
|
|
history_size: config.history_size,
|
|
bot_sent_event_ids: Arc::new(TokioMutex::new(HashSet::new())),
|
|
perm_rx,
|
|
pending_perm_replies: Arc::new(TokioMutex::new(HashMap::new())),
|
|
permission_timeout_secs: config.permission_timeout_secs,
|
|
bot_name,
|
|
ambient_rooms: Arc::new(std::sync::Mutex::new(persisted_ambient)),
|
|
agents,
|
|
htop_sessions: Arc::new(TokioMutex::new(HashMap::new())),
|
|
transport: Arc::clone(&transport),
|
|
timer_store,
|
|
gateway_active_project,
|
|
gateway_projects,
|
|
gateway_project_urls,
|
|
};
|
|
|
|
slog!(
|
|
"[matrix-bot] Cryptographic identity verification is always ON — commands from unencrypted rooms or unverified devices are rejected"
|
|
);
|
|
|
|
// Register event handlers and inject shared context.
|
|
client.add_event_handler_context(ctx);
|
|
client.add_event_handler(on_room_message);
|
|
client.add_event_handler(on_to_device_verification_request);
|
|
client.add_event_handler(on_room_verification_request);
|
|
|
|
// Spawn the stage-transition notification listener before entering the
|
|
// sync loop so it starts receiving watcher events immediately.
|
|
let notif_room_id_strings: Vec<String> = notif_room_ids.iter().map(|r| r.to_string()).collect();
|
|
super::super::notifications::spawn_notification_listener(
|
|
Arc::clone(&transport),
|
|
move || notif_room_id_strings.clone(),
|
|
watcher_rx,
|
|
notif_project_root,
|
|
);
|
|
|
|
// Spawn a shutdown watcher that sends a best-effort goodbye message to all
|
|
// configured rooms when the server is about to stop (SIGINT/SIGTERM or rebuild).
|
|
{
|
|
let shutdown_transport = Arc::clone(&transport);
|
|
let shutdown_rooms: Vec<String> = announce_room_ids.iter().map(|r| r.to_string()).collect();
|
|
let shutdown_bot_name = announce_bot_name.clone();
|
|
let mut rx = shutdown_rx;
|
|
tokio::spawn(async move {
|
|
// Wait until the channel holds Some(reason).
|
|
if rx.wait_for(|v| v.is_some()).await.is_ok() {
|
|
let reason = rx.borrow().clone();
|
|
let notifier = crate::rebuild::BotShutdownNotifier::new(
|
|
shutdown_transport,
|
|
shutdown_rooms,
|
|
shutdown_bot_name,
|
|
);
|
|
if let Some(r) = reason {
|
|
notifier.notify(r).await;
|
|
}
|
|
}
|
|
});
|
|
}
|
|
|
|
// Send a startup announcement to each configured room so users know the
|
|
// bot is online. This runs once per process start — the sync loop handles
|
|
// reconnects internally so this code is never reached again on a network
|
|
// blip or sync resumption.
|
|
let announce_msg = format_startup_announcement(&announce_bot_name);
|
|
let announce_html = markdown_to_html(&announce_msg);
|
|
slog!("[matrix-bot] Sending startup announcement: {announce_msg}");
|
|
for room_id in &announce_room_ids {
|
|
let room_id_str = room_id.to_string();
|
|
if let Err(e) = transport
|
|
.send_message(&room_id_str, &announce_msg, &announce_html)
|
|
.await
|
|
{
|
|
slog!("[matrix-bot] Failed to send startup announcement to {room_id}: {e}");
|
|
}
|
|
}
|
|
|
|
slog!("[matrix-bot] Starting Matrix sync loop");
|
|
|
|
// Retry state — shared across `Fn` closure invocations via Arc atomics.
|
|
const MAX_BACKOFF_SECS: u64 = 300;
|
|
const INITIAL_BACKOFF_SECS: u64 = 5;
|
|
let backoff = Arc::new(AtomicU64::new(INITIAL_BACKOFF_SECS));
|
|
let was_disconnected = Arc::new(AtomicBool::new(false));
|
|
|
|
let sync_transport = Arc::clone(&transport);
|
|
let sync_rooms: Vec<String> = announce_room_ids.iter().map(|r| r.to_string()).collect();
|
|
let sync_bot_name = announce_bot_name.clone();
|
|
|
|
let backoff_cb = Arc::clone(&backoff);
|
|
let was_disconnected_cb = Arc::clone(&was_disconnected);
|
|
|
|
// Use sync_with_result_callback so transient errors (network blips, DNS
|
|
// hiccups, temporary homeserver outages) are handled in the callback
|
|
// rather than bubbling up as fatal errors. Fatal errors (HTTP 401/403)
|
|
// still terminate the loop and propagate to the caller.
|
|
client
|
|
.sync_with_result_callback(SyncSettings::default(), move |result| {
|
|
let backoff = Arc::clone(&backoff_cb);
|
|
let was_disconnected = Arc::clone(&was_disconnected_cb);
|
|
let recovery_transport = Arc::clone(&sync_transport);
|
|
let recovery_rooms = sync_rooms.clone();
|
|
let recovery_bot_name = sync_bot_name.clone();
|
|
async move {
|
|
match result {
|
|
Ok(_) => {
|
|
// If we previously lost the connection, announce recovery.
|
|
if was_disconnected.swap(false, Ordering::Relaxed) {
|
|
backoff.store(INITIAL_BACKOFF_SECS, Ordering::Relaxed);
|
|
slog!("[matrix-bot] Reconnected to homeserver — resuming normal operation");
|
|
let msg = format!(
|
|
"⚡ **{recovery_bot_name}** reconnected to homeserver."
|
|
);
|
|
let html = format!(
|
|
"<p>⚡ <strong>{recovery_bot_name}</strong> reconnected to homeserver.</p>"
|
|
);
|
|
for room_id in &recovery_rooms {
|
|
if let Err(e) = recovery_transport
|
|
.send_message(room_id, &msg, &html)
|
|
.await
|
|
{
|
|
slog!(
|
|
"[matrix-bot] Failed to send recovery notification to {room_id}: {e}"
|
|
);
|
|
}
|
|
}
|
|
}
|
|
Ok(LoopCtrl::Continue)
|
|
}
|
|
Err(e) if is_fatal_sync_error(&e) => Err(e),
|
|
Err(e) => {
|
|
// Transient error: log, back off, and let the stream retry.
|
|
let delay = backoff.load(Ordering::Relaxed);
|
|
slog!("[matrix-bot] Sync warning (retrying in {delay}s): {e}");
|
|
was_disconnected.store(true, Ordering::Relaxed);
|
|
tokio::time::sleep(std::time::Duration::from_secs(delay)).await;
|
|
let new_delay = (delay * 2).min(MAX_BACKOFF_SECS);
|
|
backoff.store(new_delay, Ordering::Relaxed);
|
|
Ok(LoopCtrl::Continue)
|
|
}
|
|
}
|
|
}
|
|
})
|
|
.await
|
|
.map_err(|e| format!("Matrix sync error: {e}"))?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Returns `true` for errors that indicate the bot's session is permanently
|
|
/// invalid (HTTP 401 Unauthorized or 403 Forbidden). All other errors —
|
|
/// network failures, timeouts, transient 5xx responses — are considered
|
|
/// recoverable and should be retried with exponential back-off.
|
|
fn is_fatal_sync_error(e: &matrix_sdk::Error) -> bool {
|
|
e.as_client_api_error()
|
|
.map(|api_err| {
|
|
let code = api_err.status_code.as_u16();
|
|
code == 401 || code == 403
|
|
})
|
|
.unwrap_or(false)
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use super::*;
|
|
|
|
/// An I/O error (e.g. connection refused) must NOT be treated as fatal so
|
|
/// that the sync loop retries rather than shutting the bot down.
|
|
#[test]
|
|
fn io_error_is_not_fatal() {
|
|
let e: matrix_sdk::Error =
|
|
std::io::Error::new(std::io::ErrorKind::ConnectionRefused, "connection refused").into();
|
|
assert!(!is_fatal_sync_error(&e));
|
|
}
|
|
|
|
/// Exponential back-off must clamp at MAX_BACKOFF_SECS (300 s) regardless
|
|
/// of how many consecutive failures occur.
|
|
#[test]
|
|
fn backoff_clamps_at_max() {
|
|
const MAX_BACKOFF_SECS: u64 = 300;
|
|
let mut delay = 5u64;
|
|
for _ in 0..20 {
|
|
delay = (delay * 2).min(MAX_BACKOFF_SECS);
|
|
}
|
|
assert_eq!(delay, MAX_BACKOFF_SECS);
|
|
}
|
|
|
|
/// Back-off must at least double each step before clamping.
|
|
#[test]
|
|
fn backoff_doubles_each_step() {
|
|
const MAX_BACKOFF_SECS: u64 = 300;
|
|
let steps: Vec<u64> = std::iter::successors(Some(5u64), |&d| {
|
|
let next = (d * 2).min(MAX_BACKOFF_SECS);
|
|
if next < MAX_BACKOFF_SECS {
|
|
Some(next)
|
|
} else {
|
|
None
|
|
}
|
|
})
|
|
.collect();
|
|
// First few steps: 5, 10, 20, 40, 80, 160
|
|
assert_eq!(steps[0], 5);
|
|
assert_eq!(steps[1], 10);
|
|
assert_eq!(steps[2], 20);
|
|
assert_eq!(steps[3], 40);
|
|
}
|
|
}
|