From 595777f366a97aae2c6f4e8936ea284ce3eed7f0 Mon Sep 17 00:00:00 2001
From: dave
Date: Thu, 14 May 2026 18:46:35 +0000
Subject: [PATCH] huskies: merge 1054

---
 .../src/chat/transport/matrix/bot/context.rs  | 112 +++++++++++++++++-
 .../matrix/bot/messages/on_room_message.rs    |  14 +++
 server/src/chat/transport/matrix/bot/run.rs   |   3 +
 3 files changed, 128 insertions(+), 1 deletion(-)

diff --git a/server/src/chat/transport/matrix/bot/context.rs b/server/src/chat/transport/matrix/bot/context.rs
index 793ad356..582d6c6d 100644
--- a/server/src/chat/transport/matrix/bot/context.rs
+++ b/server/src/chat/transport/matrix/bot/context.rs
@@ -3,13 +3,56 @@
 use crate::chat::ChatTransport;
 use crate::service::timer::TimerStore;
 use crate::services::Services;
 use matrix_sdk::ruma::{OwnedEventId, OwnedRoomId, OwnedUserId};
-use std::collections::{BTreeMap, HashSet};
+use std::collections::{BTreeMap, HashSet, VecDeque};
 use std::sync::Arc;
 use tokio::sync::Mutex as TokioMutex;
 use tokio::sync::RwLock;
 
 use super::history::ConversationHistory;
+/// Maximum number of incoming event IDs retained for deduplication.
+///
+/// FIFO ring (VecDeque + HashSet): when full, the oldest entry is evicted.
+/// Matrix sync replays are temporally clustered — any event replayed more than
+/// 1 000 events later was long since processed, so FIFO eviction is correct.
+/// 1 000 × ~80 B ≈ 80 KB, negligible memory cost.
+pub const SEEN_EVENT_IDS_CAP: usize = 1_000;
+
+/// Bounded FIFO set for deduplicating incoming Matrix event IDs.
+pub struct SeenEventIds {
+    deque: VecDeque<OwnedEventId>,
+    set: HashSet<OwnedEventId>,
+    cap: usize,
+}
+
+impl SeenEventIds {
+    /// Create a new set with the given capacity cap.
+    pub fn new(cap: usize) -> Self {
+        Self {
+            deque: VecDeque::with_capacity(cap),
+            set: HashSet::with_capacity(cap),
+            cap,
+        }
+    }
+
+    /// Insert an event ID. Returns `true` if the ID was new (never seen),
+    /// `false` if already present (duplicate — caller should skip processing).
+    /// When the set is full, the oldest entry is evicted before inserting.
+    pub fn insert(&mut self, id: OwnedEventId) -> bool {
+        if self.set.contains(&id) {
+            return false;
+        }
+        if self.deque.len() == self.cap
+            && let Some(old) = self.deque.pop_front()
+        {
+            self.set.remove(&old);
+        }
+        self.deque.push_back(id.clone());
+        self.set.insert(id);
+        true
+    }
+}
+
 /// Shared context injected into Matrix event handlers.
 #[derive(Clone)]
 pub struct BotContext {
@@ -58,6 +101,12 @@ pub struct BotContext {
     /// `` block at the head of the next user prompt so Timmy
     /// sees pipeline activity without requiring a separate message.
     pub pending_pipeline_events: Arc>>,
+    /// Bounded FIFO set of already-handled incoming event IDs.
+    ///
+    /// The Matrix sync loop can replay events on reconnect. This set ensures
+    /// each event is processed at most once. Insert the event ID before any
+    /// side-effecting work; return early if the insert returns `false`.
+    pub handled_incoming_event_ids: Arc<TokioMutex<SeenEventIds>>,
 }
 
 impl BotContext {
@@ -244,6 +293,9 @@ mod tests {
             gateway_projects,
             gateway_project_urls,
             pending_pipeline_events: Arc::new(TokioMutex::new(Vec::new())),
+            handled_incoming_event_ids: Arc::new(TokioMutex::new(SeenEventIds::new(
+                SEEN_EVENT_IDS_CAP,
+            ))),
         }
     }
 
@@ -310,6 +362,64 @@ mod tests {
         );
     }
 
+    // -- SeenEventIds deduplication ----------------------------------------
+
+    fn make_event_id(s: &str) -> OwnedEventId {
+        s.parse().unwrap()
+    }
+
+    /// AC3: the same event_id presented twice is deduplicated — the second
+    /// insert returns false, so any downstream handler would execute only once.
+ #[test] + fn insert_same_event_id_twice_returns_false_on_second() { + let mut seen = SeenEventIds::new(10); + let id = make_event_id("$event1:example.com"); + assert!(seen.insert(id.clone()), "first insert must be new"); + assert!(!seen.insert(id), "second insert must be a duplicate"); + } + + /// AC4: two different event_ids with the same body content both return + /// true — dedupe is keyed strictly on event_id, never on content. + #[test] + fn insert_different_event_ids_same_body_both_new() { + let mut seen = SeenEventIds::new(10); + let id1 = make_event_id("$event1:example.com"); + let id2 = make_event_id("$event2:example.com"); + assert!(seen.insert(id1), "first event_id must be new"); + assert!( + seen.insert(id2), + "second event_id with identical body must also be new" + ); + } + + /// The set evicts the oldest entry (FIFO) when the cap is reached so that + /// subsequent inserts still work and memory stays bounded. + #[test] + fn seen_event_ids_evicts_oldest_at_cap() { + let cap = 3; + let mut seen = SeenEventIds::new(cap); + let id0 = make_event_id("$ev0:example.com"); + let id1 = make_event_id("$ev1:example.com"); + let id2 = make_event_id("$ev2:example.com"); + let id3 = make_event_id("$ev3:example.com"); + + assert!(seen.insert(id0.clone())); // deque: [id0] + assert!(seen.insert(id1.clone())); // deque: [id0, id1] + assert!(seen.insert(id2.clone())); // deque: [id0, id1, id2] — full + // Cap reached — inserting id3 evicts id0 (oldest). + assert!(seen.insert(id3.clone())); // deque: [id1, id2, id3] + // id0 was evicted, so re-inserting it returns true (treated as new). + assert!(seen.insert(id0), "evicted entry should be re-insertable"); + // Re-inserting id0 evicts id1 (new oldest). id2 and id3 are still present. + assert!(!seen.insert(id2.clone()), "id2 still in set"); + assert!(!seen.insert(id3), "id3 still in set"); + // id1 was evicted — treated as new again. 
+ assert!( + seen.insert(id1), + "id1 was evicted and should be re-insertable" + ); + } + #[test] fn bot_context_has_no_require_verified_devices_field() { let services = test_services(PathBuf::from("/tmp")); diff --git a/server/src/chat/transport/matrix/bot/messages/on_room_message.rs b/server/src/chat/transport/matrix/bot/messages/on_room_message.rs index e9574d7e..f0858a23 100644 --- a/server/src/chat/transport/matrix/bot/messages/on_room_message.rs +++ b/server/src/chat/transport/matrix/bot/messages/on_room_message.rs @@ -33,6 +33,20 @@ pub(in crate::chat::transport::matrix::bot) async fn on_room_message( ev.sender, ); + // Deduplicate: the Matrix sync loop replays events on reconnect. Insert the + // event_id before any side-effecting work; return early if already handled. + { + let mut seen = ctx.handled_incoming_event_ids.lock().await; + if !seen.insert(ev.event_id.clone()) { + slog!( + "[matrix-bot] Dropping duplicate event {} from {}", + ev.event_id, + ev.sender + ); + return; + } + } + // Only handle messages from rooms we are configured to listen in. if !ctx.target_room_ids.iter().any(|r| r == &incoming_room_id) { slog!("[matrix-bot] Ignoring message from unconfigured room {incoming_room_id}"); diff --git a/server/src/chat/transport/matrix/bot/run.rs b/server/src/chat/transport/matrix/bot/run.rs index 58f9ea39..f350f104 100644 --- a/server/src/chat/transport/matrix/bot/run.rs +++ b/server/src/chat/transport/matrix/bot/run.rs @@ -340,6 +340,9 @@ pub async fn run_bot( gateway_projects, gateway_project_urls, pending_pipeline_events, + handled_incoming_event_ids: Arc::new(TokioMutex::new(super::context::SeenEventIds::new( + super::context::SEEN_EVENT_IDS_CAP, + ))), }; slog!(