huskies: merge 1054
This commit is contained in:
@@ -3,13 +3,56 @@ use crate::chat::ChatTransport;
|
||||
use crate::service::timer::TimerStore;
|
||||
use crate::services::Services;
|
||||
use matrix_sdk::ruma::{OwnedEventId, OwnedRoomId, OwnedUserId};
|
||||
use std::collections::{BTreeMap, HashSet};
|
||||
use std::collections::{BTreeMap, HashSet, VecDeque};
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::Mutex as TokioMutex;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
use super::history::ConversationHistory;
|
||||
|
||||
/// Maximum number of incoming event IDs retained for deduplication.
///
/// FIFO ring (VecDeque + HashSet): when full, the oldest entry is evicted.
/// Matrix sync replays are temporally clustered — any event replayed more than
/// 1 000 events later was long since processed, so FIFO eviction is correct.
/// 1 000 × ~80 B ≈ 80 KB, negligible memory cost.
pub const SEEN_EVENT_IDS_CAP: usize = 1_000;
/// Bounded FIFO set for deduplicating incoming Matrix event IDs.
|
||||
pub struct SeenEventIds {
|
||||
deque: VecDeque<OwnedEventId>,
|
||||
set: HashSet<OwnedEventId>,
|
||||
cap: usize,
|
||||
}
|
||||
|
||||
impl SeenEventIds {
|
||||
/// Create a new set with the given capacity cap.
|
||||
pub fn new(cap: usize) -> Self {
|
||||
Self {
|
||||
deque: VecDeque::with_capacity(cap),
|
||||
set: HashSet::with_capacity(cap),
|
||||
cap,
|
||||
}
|
||||
}
|
||||
|
||||
/// Insert an event ID. Returns `true` if the ID was new (never seen),
|
||||
/// `false` if already present (duplicate — caller should skip processing).
|
||||
/// When the set is full, the oldest entry is evicted before inserting.
|
||||
pub fn insert(&mut self, id: OwnedEventId) -> bool {
|
||||
if self.set.contains(&id) {
|
||||
return false;
|
||||
}
|
||||
if self.deque.len() == self.cap
|
||||
&& let Some(old) = self.deque.pop_front()
|
||||
{
|
||||
self.set.remove(&old);
|
||||
}
|
||||
self.deque.push_back(id.clone());
|
||||
self.set.insert(id);
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
/// Shared context injected into Matrix event handlers.
|
||||
#[derive(Clone)]
|
||||
pub struct BotContext {
|
||||
@@ -58,6 +101,12 @@ pub struct BotContext {
|
||||
/// `<system-reminder>` block at the head of the next user prompt so Timmy
|
||||
/// sees pipeline activity without requiring a separate message.
|
||||
pub pending_pipeline_events: Arc<TokioMutex<Vec<String>>>,
|
||||
/// Bounded FIFO set of already-handled incoming event IDs.
|
||||
///
|
||||
/// The Matrix sync loop can replay events on reconnect. This set ensures
|
||||
/// each event is processed at most once. Insert the event ID before any
|
||||
/// side-effecting work; return early if the insert returns `false`.
|
||||
pub handled_incoming_event_ids: Arc<TokioMutex<SeenEventIds>>,
|
||||
}
|
||||
|
||||
impl BotContext {
|
||||
@@ -244,6 +293,9 @@ mod tests {
|
||||
gateway_projects,
|
||||
gateway_project_urls,
|
||||
pending_pipeline_events: Arc::new(TokioMutex::new(Vec::new())),
|
||||
handled_incoming_event_ids: Arc::new(TokioMutex::new(SeenEventIds::new(
|
||||
SEEN_EVENT_IDS_CAP,
|
||||
))),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -310,6 +362,64 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
// -- SeenEventIds deduplication ----------------------------------------
|
||||
|
||||
fn make_event_id(s: &str) -> OwnedEventId {
|
||||
s.parse().unwrap()
|
||||
}
|
||||
|
||||
/// AC3: the same event_id presented twice is deduplicated — the second
/// insert returns false, so any downstream handler would execute only once.
#[test]
fn insert_same_event_id_twice_returns_false_on_second() {
    let mut seen = SeenEventIds::new(10);
    let id = make_event_id("$event1:example.com");
    assert!(seen.insert(id.clone()), "first insert must be new");
    assert!(!seen.insert(id), "second insert must be a duplicate");
}
||||
/// AC4: two different event_ids with the same body content both return
/// true — dedupe is keyed strictly on event_id, never on content.
#[test]
fn insert_different_event_ids_same_body_both_new() {
    let mut seen = SeenEventIds::new(10);
    let id1 = make_event_id("$event1:example.com");
    let id2 = make_event_id("$event2:example.com");
    assert!(seen.insert(id1), "first event_id must be new");
    assert!(
        seen.insert(id2),
        "second event_id with identical body must also be new"
    );
}
||||
/// The set evicts the oldest entry (FIFO) when the cap is reached so that
/// subsequent inserts still work and memory stays bounded.
#[test]
fn seen_event_ids_evicts_oldest_at_cap() {
    let cap = 3;
    let mut seen = SeenEventIds::new(cap);
    let id0 = make_event_id("$ev0:example.com");
    let id1 = make_event_id("$ev1:example.com");
    let id2 = make_event_id("$ev2:example.com");
    let id3 = make_event_id("$ev3:example.com");

    assert!(seen.insert(id0.clone())); // deque: [id0]
    assert!(seen.insert(id1.clone())); // deque: [id0, id1]
    assert!(seen.insert(id2.clone())); // deque: [id0, id1, id2] — full
    // Cap reached — inserting id3 evicts id0 (oldest).
    assert!(seen.insert(id3.clone())); // deque: [id1, id2, id3]
    // id0 was evicted, so re-inserting it returns true (treated as new).
    assert!(seen.insert(id0), "evicted entry should be re-insertable");
    // Re-inserting id0 evicts id1 (new oldest). id2 and id3 are still present.
    assert!(!seen.insert(id2.clone()), "id2 still in set");
    assert!(!seen.insert(id3), "id3 still in set");
    // id1 was evicted — treated as new again.
    assert!(
        seen.insert(id1),
        "id1 was evicted and should be re-insertable"
    );
}
||||
#[test]
|
||||
fn bot_context_has_no_require_verified_devices_field() {
|
||||
let services = test_services(PathBuf::from("/tmp"));
|
||||
|
||||
@@ -33,6 +33,20 @@ pub(in crate::chat::transport::matrix::bot) async fn on_room_message(
|
||||
ev.sender,
|
||||
);
|
||||
|
||||
// Deduplicate: the Matrix sync loop replays events on reconnect. Insert the
|
||||
// event_id before any side-effecting work; return early if already handled.
|
||||
{
|
||||
let mut seen = ctx.handled_incoming_event_ids.lock().await;
|
||||
if !seen.insert(ev.event_id.clone()) {
|
||||
slog!(
|
||||
"[matrix-bot] Dropping duplicate event {} from {}",
|
||||
ev.event_id,
|
||||
ev.sender
|
||||
);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Only handle messages from rooms we are configured to listen in.
|
||||
if !ctx.target_room_ids.iter().any(|r| r == &incoming_room_id) {
|
||||
slog!("[matrix-bot] Ignoring message from unconfigured room {incoming_room_id}");
|
||||
|
||||
@@ -340,6 +340,9 @@ pub async fn run_bot(
|
||||
gateway_projects,
|
||||
gateway_project_urls,
|
||||
pending_pipeline_events,
|
||||
handled_incoming_event_ids: Arc::new(TokioMutex::new(super::context::SeenEventIds::new(
|
||||
super::context::SEEN_EVENT_IDS_CAP,
|
||||
))),
|
||||
};
|
||||
|
||||
slog!(
|
||||
|
||||
Reference in New Issue
Block a user