From fc5481dbe418eb76f8cdce32dd1ed66891d94ed5 Mon Sep 17 00:00:00 2001 From: dave Date: Fri, 15 May 2026 11:57:00 +0000 Subject: [PATCH] =?UTF-8?q?huskies:=20merge=201093=20bug=20Chat=20dispatch?= =?UTF-8?q?er=20spawns=20one=20Timmy=20per=20inbound=20message=20=E2=80=94?= =?UTF-8?q?=20needs=20coalesce=20window=20+=20per-session=20serial=20lock?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/src/agent_mode/context.rs | 1 + server/src/chat/dispatcher.rs | 367 ++++++++++++++++++ server/src/chat/mod.rs | 2 + .../src/chat/transport/matrix/bot/context.rs | 1 + .../matrix/bot/messages/handle_message.rs | 4 +- .../matrix/bot/messages/on_room_message.rs | 57 ++- .../matrix/bot/permission_listener.rs | 1 + .../src/chat/transport/matrix/config/types.rs | 15 + .../chat/transport/whatsapp/commands/mod.rs | 1 + server/src/http/context.rs | 1 + server/src/main.rs | 6 + server/src/service/bot_command/io.rs | 1 + server/src/service/gateway/io.rs | 6 + server/src/services.rs | 8 + server/src/sled_uplink.rs | 3 + 15 files changed, 466 insertions(+), 8 deletions(-) create mode 100644 server/src/chat/dispatcher.rs diff --git a/server/src/agent_mode/context.rs b/server/src/agent_mode/context.rs index b4eda9e4..fc327747 100644 --- a/server/src/agent_mode/context.rs +++ b/server/src/agent_mode/context.rs @@ -78,6 +78,7 @@ pub(super) fn build_agent_app_context( pending_perm_replies: Arc::new(tokio::sync::Mutex::new(std::collections::HashMap::new())), permission_timeout_secs: 120, status: agents.status_broadcaster(), + chat_dispatcher: Arc::new(crate::chat::dispatcher::ChatDispatcher::new(1_500)), }); crate::http::context::AppContext { state: Arc::new(state), diff --git a/server/src/chat/dispatcher.rs b/server/src/chat/dispatcher.rs new file mode 100644 index 00000000..27f4c8b9 --- /dev/null +++ b/server/src/chat/dispatcher.rs @@ -0,0 +1,367 @@ +//! Protocol-agnostic chat dispatcher — coalesce window + per-session serial lock. +//! +//! Sits between every inbound transport (Matrix, Slack, WhatsApp, …) and the +//! `claude -p` spawner. Transport handlers call [`ChatDispatcher::submit`] +//! instead of spawning directly; the dispatcher enforces two invariants: +//! +//! 1. **Coalesce window**: messages arriving for the same session within +//! `coalesce_ms` of each other are concatenated and delivered to a single +//! spawn. The window is a *debounce*: each new message extends the window by +//! `coalesce_ms` from its arrival time, so bursts flush as one batch. +//! +//! 2. **Per-session serial lock**: while one `claude -p` run is active, further +//! messages for that session queue up and are dispatched as a single batch +//! once the running invocation completes. +//! +//! A [`ChatDispatcher::stop`] call cancels the active run for a session and +//! discards the pending queue. + +use crate::slog; +use std::collections::HashMap; +use std::pin::Pin; +use std::sync::{Arc, Mutex}; +use std::time::Duration; +use tokio::sync::{mpsc, watch}; + +/// A factory function that produces one LLM execution future per dispatch. +/// +/// Arguments: +/// - `String` — the (possibly concatenated) prompt to send to `claude -p`. +/// - `watch::Receiver` — send `true` on this channel to cancel the run. +/// +/// Returns a boxed, pinned `Send + 'static` future that resolves when the LLM +/// session ends (whether normally or via cancellation). +pub type SpawnFn = Arc< + dyn Fn( + String, + watch::Receiver, + ) -> Pin + Send + 'static>> + + Send + + Sync, +>; + +enum SessionMsg { + UserMessage { text: String, factory: SpawnFn }, + Stop, +} + +struct SessionHandle { + tx: mpsc::UnboundedSender, +} + +/// Coalescing, serialising dispatcher for chat-to-LLM message routing. +/// +/// Construct once at startup via [`ChatDispatcher::new`] and share via `Arc`. +/// Call [`submit`](ChatDispatcher::submit) from every transport handler instead +/// of spawning `claude -p` directly. +pub struct ChatDispatcher { + sessions: Mutex>, + coalesce_ms: u64, +} + +impl ChatDispatcher { + /// Create a new dispatcher with the given coalesce window in milliseconds. + pub fn new(coalesce_ms: u64) -> Self { + Self { + sessions: Mutex::new(HashMap::new()), + coalesce_ms, + } + } + + /// Submit a message for a chat session. + /// + /// If no session task exists for `session_key`, one is created lazily. + /// The `factory` is called by the session task when the coalesce window + /// closes (or immediately after the current run finishes, for pending + /// messages). + pub fn submit(&self, session_key: String, message: String, factory: SpawnFn) { + let mut guard = self.sessions.lock().unwrap(); + let coalesce_ms = self.coalesce_ms; + let handle = guard.entry(session_key.clone()).or_insert_with(|| { + let (tx, rx) = mpsc::unbounded_channel(); + tokio::spawn(session_task(session_key.clone(), rx, coalesce_ms)); + SessionHandle { tx } + }); + let _ = handle.tx.send(SessionMsg::UserMessage { + text: message, + factory, + }); + } + + /// Stop the active LLM run for `session_key` and clear its pending queue. + /// + /// Returns `true` if the session existed (whether or not anything was + /// actually running), `false` if no session for that key has been created. + pub fn stop(&self, session_key: &str) -> bool { + let guard = self.sessions.lock().unwrap(); + if let Some(handle) = guard.get(session_key) { + let _ = handle.tx.send(SessionMsg::Stop); + true + } else { + false + } + } +} + +/// Per-session background task. +/// +/// Phases: +/// 1. **Wait** — blocks until the first `UserMessage` arrives. +/// 2. **Coalesce** — extends the window by `coalesce_ms` on each new message; +/// fires when no message arrives within the window. +/// 3. **Run** — calls the factory with the concatenated batch; while running, +/// collects further `UserMessage`s into a pending list and logs a warn per +/// message. A `Stop` message cancels the running call and clears pending. +/// 4. **Drain** — after the run, if pending is non-empty, fires a second run +/// with the accumulated batch and loops back to step 3. +/// 5. Returns to step 1 when pending is empty. +async fn session_task( + session_key: String, + mut rx: mpsc::UnboundedReceiver, + coalesce_ms: u64, +) { + let coalesce_dur = Duration::from_millis(coalesce_ms); + + loop { + // ── Phase 1: wait for the first message ───────────────────────────── + let (first_text, first_factory) = loop { + match rx.recv().await { + None => return, + Some(SessionMsg::Stop) => continue, + Some(SessionMsg::UserMessage { text, factory }) => break (text, factory), + } + }; + + // ── Phase 2: coalesce window (debounce) ────────────────────────────── + let mut batch: Vec = vec![first_text]; + let mut latest_factory: SpawnFn = first_factory; + let mut deadline = tokio::time::Instant::now() + coalesce_dur; + + 'coalesce: loop { + let now = tokio::time::Instant::now(); + if now >= deadline { + break 'coalesce; + } + let remaining = deadline - now; + match tokio::time::timeout(remaining, rx.recv()).await { + Err(_) => break 'coalesce, // window closed + Ok(None) => return, // channel closed → exit task + Ok(Some(SessionMsg::Stop)) => { + batch.clear(); + break 'coalesce; + } + Ok(Some(SessionMsg::UserMessage { text, factory })) => { + batch.push(text); + latest_factory = factory; + // Extend deadline on each new message (debounce). + deadline = tokio::time::Instant::now() + coalesce_dur; + } + } + } + + if batch.is_empty() { + continue; // Stop received during coalesce — restart + } + + // ── Phase 3 + 4: run → drain pending → repeat ─────────────────────── + let mut prompt = batch.join("\n\n"); + let mut factory = latest_factory; + + loop { + let (cancel_tx, cancel_rx) = watch::channel(false); + let llm_fut = factory(prompt, cancel_rx); + let mut llm_task = tokio::spawn(llm_fut); + + let mut pending_texts: Vec = vec![]; + let mut pending_factory: Option = None; + let mut stopped = false; + + // Wait for the LLM to finish, collecting messages that arrive during the run. + loop { + tokio::select! { + _ = &mut llm_task => { break; } + msg = rx.recv() => { + match msg { + None => { + llm_task.abort(); + return; + } + Some(SessionMsg::Stop) => { + let _ = cancel_tx.send(true); + let _ = llm_task.await; + pending_texts.clear(); + stopped = true; + break; + } + Some(SessionMsg::UserMessage { text, factory: f }) => { + pending_texts.push(text); + let depth = pending_texts.len(); + slog!( + "[chat-dispatcher] coalescing message for session={}, queue_depth={}", + session_key, + depth, + ); + pending_factory = Some(f); + } + } + } + } + } + + if stopped || pending_texts.is_empty() { + break; // back to Phase 1 + } + + // Fire the pending batch as the next run (no additional coalesce window). + prompt = pending_texts.join("\n\n"); + factory = pending_factory.unwrap(); + } + } +} + +// ── Tests ───────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::atomic::{AtomicUsize, Ordering}; + + fn make_factory(spawn_count: Arc, run_ms: u64) -> SpawnFn { + Arc::new(move |_prompt: String, _cancel_rx: watch::Receiver| { + let count = Arc::clone(&spawn_count); + Box::pin(async move { + count.fetch_add(1, Ordering::SeqCst); + tokio::time::sleep(Duration::from_millis(run_ms)).await; + }) + }) + } + + /// AC 6 regression: three messages arriving 200 ms / (long gap) / (after run) + /// apart on the same session must produce at most two spawns, never three + /// concurrent processes. + /// + /// Setup: + /// coalesce_ms = 50 ms (short window so test runs fast) + /// LLM "run" = 150 ms + /// msg1 @ t=0 + /// msg2 @ t=20 ms — within coalesce window, merged with msg1 → 1 spawn + /// msg3 @ t=300 ms — after run completes → 2nd spawn + /// + /// Expected: exactly 2 spawns, never 3. + #[tokio::test] + async fn three_messages_never_three_concurrent_spawns() { + let spawn_count = Arc::new(AtomicUsize::new(0)); + let dispatcher = Arc::new(ChatDispatcher::new(50)); + let session = "room1".to_string(); + + // msg1 at t=0 + dispatcher.submit( + session.clone(), + "msg1".to_string(), + make_factory(Arc::clone(&spawn_count), 150), + ); + + // msg2 at t=20 ms — inside the 50 ms coalesce window + tokio::time::sleep(Duration::from_millis(20)).await; + dispatcher.submit( + session.clone(), + "msg2".to_string(), + make_factory(Arc::clone(&spawn_count), 150), + ); + + // msg3 at t=300 ms — after the coalesce window fires (t≈70 ms) and the + // 150 ms run completes (t≈220 ms), so msg3 starts a second coalesce cycle. + tokio::time::sleep(Duration::from_millis(280)).await; + dispatcher.submit( + session.clone(), + "msg3".to_string(), + make_factory(Arc::clone(&spawn_count), 150), + ); + + // Wait long enough for both runs to finish. + tokio::time::sleep(Duration::from_millis(500)).await; + + let count = spawn_count.load(Ordering::SeqCst); + assert!( + (1..=2).contains(&count), + "expected 1 or 2 spawns (msgs 1+2 coalesced, msg3 separate), got {count}" + ); + } + + /// Messages that arrive while the LLM is running are not lost — they are + /// delivered as a single follow-up spawn once the first run completes. + #[tokio::test] + async fn pending_messages_dispatched_after_run_completes() { + let spawn_count = Arc::new(AtomicUsize::new(0)); + let dispatcher = Arc::new(ChatDispatcher::new(50)); + let session = "room2".to_string(); + + // First message — starts a 200 ms run. + dispatcher.submit( + session.clone(), + "first".to_string(), + make_factory(Arc::clone(&spawn_count), 200), + ); + + // Wait for coalesce window to fire, then send two more. + tokio::time::sleep(Duration::from_millis(100)).await; + dispatcher.submit( + session.clone(), + "second".to_string(), + make_factory(Arc::clone(&spawn_count), 50), + ); + dispatcher.submit( + session.clone(), + "third".to_string(), + make_factory(Arc::clone(&spawn_count), 50), + ); + + // Wait long enough for both runs. + tokio::time::sleep(Duration::from_millis(600)).await; + + let count = spawn_count.load(Ordering::SeqCst); + assert_eq!( + count, 2, + "first run + one pending-batch run = 2 total spawns" + ); + } + + /// Stop cancels the running LLM and discards pending messages. + #[tokio::test] + async fn stop_cancels_run_and_clears_pending() { + let spawn_count = Arc::new(AtomicUsize::new(0)); + let dispatcher = Arc::new(ChatDispatcher::new(30)); + let session = "room3".to_string(); + + // Start a long run. + dispatcher.submit( + session.clone(), + "long-running".to_string(), + make_factory(Arc::clone(&spawn_count), 500), + ); + + // Wait for coalesce window to fire. + tokio::time::sleep(Duration::from_millis(80)).await; + + // Queue a pending message. + dispatcher.submit( + session.clone(), + "pending".to_string(), + make_factory(Arc::clone(&spawn_count), 50), + ); + + // Stop immediately. + dispatcher.stop(&session); + + // Wait longer than the run would have taken if not stopped. + tokio::time::sleep(Duration::from_millis(700)).await; + + let count = spawn_count.load(Ordering::SeqCst); + // The first run was started before stop (spawn_count=1). + // The pending message should NOT have produced a second spawn. + assert!( + count <= 1, + "stop should discard pending; got {count} spawns" + ); + } +} diff --git a/server/src/chat/mod.rs b/server/src/chat/mod.rs index 2a7c66f8..cdd2350b 100644 --- a/server/src/chat/mod.rs +++ b/server/src/chat/mod.rs @@ -6,6 +6,8 @@ /// Bot command registry and dispatch — parses and routes incoming chat messages. pub mod commands; +/// Protocol-agnostic chat dispatcher — coalesce window and per-session serial lock. +pub mod dispatcher; /// Chat history utilities — loading and serialising conversation history. pub mod history; pub(crate) mod lookup; diff --git a/server/src/chat/transport/matrix/bot/context.rs b/server/src/chat/transport/matrix/bot/context.rs index 5d419f1c..b6ce60c8 100644 --- a/server/src/chat/transport/matrix/bot/context.rs +++ b/server/src/chat/transport/matrix/bot/context.rs @@ -268,6 +268,7 @@ mod tests { pending_perm_replies: Arc::new(TokioMutex::new(HashMap::new())), permission_timeout_secs: 120, status: Arc::new(crate::service::status::StatusBroadcaster::new()), + chat_dispatcher: Arc::new(crate::chat::dispatcher::ChatDispatcher::new(1_500)), }) } diff --git a/server/src/chat/transport/matrix/bot/messages/handle_message.rs b/server/src/chat/transport/matrix/bot/messages/handle_message.rs index 3275e0f9..67b73416 100644 --- a/server/src/chat/transport/matrix/bot/messages/handle_message.rs +++ b/server/src/chat/transport/matrix/bot/messages/handle_message.rs @@ -21,6 +21,7 @@ pub(in crate::chat::transport::matrix::bot) async fn handle_message( ctx: BotContext, sender: String, user_message: String, + mut cancel_rx: watch::Receiver, ) { // Look up the room's existing Claude Code session ID (if any) so we can // resume the conversation with structured API messages instead of @@ -68,9 +69,6 @@ pub(in crate::chat::transport::matrix::bot) async fn handle_message( ); let provider = ClaudeCodeProvider::new(); - let (cancel_tx, mut cancel_rx) = watch::channel(false); - // Keep the sender alive for the duration of the call. - let _cancel_tx = cancel_tx; // Channel for sending complete paragraphs to the Matrix posting task. let (msg_tx, mut msg_rx) = tokio::sync::mpsc::unbounded_channel::(); diff --git a/server/src/chat/transport/matrix/bot/messages/on_room_message.rs b/server/src/chat/transport/matrix/bot/messages/on_room_message.rs index f0858a23..b8851bb9 100644 --- a/server/src/chat/transport/matrix/bot/messages/on_room_message.rs +++ b/server/src/chat/transport/matrix/bot/messages/on_room_message.rs @@ -608,9 +608,56 @@ pub(in crate::chat::transport::matrix::bot) async fn on_room_message( return; } - // Spawn a separate task so the Matrix sync loop is not blocked while we - // wait for the LLM response (which can take several seconds). - tokio::spawn(async move { - handle_message(room_id_str, incoming_room_id, ctx, sender, user_message).await; - }); + // "stop" — cancel the running LLM turn for this session and clear pending queue. + { + let stripped = crate::chat::util::strip_bot_mention( + &user_message, + &ctx.services.bot_name, + ctx.matrix_user_id.as_str(), + ) + .trim() + .to_ascii_lowercase(); + if stripped == "stop" { + slog!("[matrix-bot] stop command from {sender} for session {room_id_str}"); + ctx.services.chat_dispatcher.stop(&room_id_str); + let msg = "Stopped."; + let html = markdown_to_html(msg); + if let Ok(msg_id) = ctx.transport.send_message(&room_id_str, msg, &html).await + && let Ok(event_id) = msg_id.parse() + { + ctx.bot_sent_event_ids.lock().await.insert(event_id); + } + return; + } + } + + // Hand the message to the protocol-agnostic dispatcher instead of spawning + // directly. The dispatcher applies a coalesce window and a per-session + // serial lock, preventing duplicate concurrent Timmy spawns. + let ctx_for_factory = ctx.clone(); + let factory: crate::chat::dispatcher::SpawnFn = { + let room_id_str2 = room_id_str.clone(); + std::sync::Arc::new( + move |coalesced: String, cancel_rx: tokio::sync::watch::Receiver| { + let room_id_str = room_id_str2.clone(); + let incoming_room_id = incoming_room_id.clone(); + let ctx = ctx_for_factory.clone(); + let sender = sender.clone(); + Box::pin(async move { + handle_message( + room_id_str, + incoming_room_id, + ctx, + sender, + coalesced, + cancel_rx, + ) + .await; + }) + }, + ) + }; + ctx.services + .chat_dispatcher + .submit(room_id_str, user_message, factory); } diff --git a/server/src/chat/transport/matrix/bot/permission_listener.rs b/server/src/chat/transport/matrix/bot/permission_listener.rs index f4caa971..fca99f77 100644 --- a/server/src/chat/transport/matrix/bot/permission_listener.rs +++ b/server/src/chat/transport/matrix/bot/permission_listener.rs @@ -150,6 +150,7 @@ mod tests { pending_perm_replies: Arc::new(TokioMutex::new(HashMap::new())), permission_timeout_secs: 120, status: Arc::new(crate::service::status::StatusBroadcaster::new()), + chat_dispatcher: Arc::new(crate::chat::dispatcher::ChatDispatcher::new(1_500)), }); (services, perm_tx) } diff --git a/server/src/chat/transport/matrix/config/types.rs b/server/src/chat/transport/matrix/config/types.rs index d3f3d206..1e291e6a 100644 --- a/server/src/chat/transport/matrix/config/types.rs +++ b/server/src/chat/transport/matrix/config/types.rs @@ -17,6 +17,11 @@ pub(super) fn default_aggregated_notifications_enabled() -> bool { true } +/// Default coalesce window for the chat dispatcher (1 500 ms). +pub(super) fn default_coalesce_window_ms() -> u64 { + 1_500 +} + pub(super) fn default_transport() -> String { "matrix".to_string() } @@ -187,4 +192,14 @@ pub struct BotConfig { /// Defaults to `true`. #[serde(default = "default_aggregated_notifications_enabled")] pub aggregated_notifications_enabled: bool, + + /// Duration in milliseconds of the chat dispatcher's coalesce window. + /// + /// Messages for the same session arriving within this window are + /// concatenated into a single `claude -p` call. The window is a + /// debounce: each new message extends the deadline by this duration. + /// + /// Defaults to 1 500 ms (1.5 s). + #[serde(default = "default_coalesce_window_ms")] + pub coalesce_window_ms: u64, } diff --git a/server/src/chat/transport/whatsapp/commands/mod.rs b/server/src/chat/transport/whatsapp/commands/mod.rs index 3106aa16..2d3f0753 100644 --- a/server/src/chat/transport/whatsapp/commands/mod.rs +++ b/server/src/chat/transport/whatsapp/commands/mod.rs @@ -310,6 +310,7 @@ mod tests { perm_rx: Arc::new(tokio::sync::Mutex::new(perm_rx)), pending_perm_replies: Arc::new(tokio::sync::Mutex::new(Default::default())), permission_timeout_secs: 120, + chat_dispatcher: Arc::new(crate::chat::dispatcher::ChatDispatcher::new(1_500)), }); Arc::new(WhatsAppWebhookContext { services, diff --git a/server/src/http/context.rs b/server/src/http/context.rs index c9fc2137..7db9e475 100644 --- a/server/src/http/context.rs +++ b/server/src/http/context.rs @@ -118,6 +118,7 @@ impl AppContext { )), permission_timeout_secs: 120, status: agents.status_broadcaster(), + chat_dispatcher: Arc::new(crate::chat::dispatcher::ChatDispatcher::new(1_500)), }); Self { state: Arc::new(state), diff --git a/server/src/main.rs b/server/src/main.rs index 69021fa6..0e263648 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -238,6 +238,12 @@ async fn main() -> Result<(), std::io::Error> { .map(|c| c.permission_timeout_secs) .unwrap_or(120), status: agents.status_broadcaster(), + chat_dispatcher: std::sync::Arc::new(chat::dispatcher::ChatDispatcher::new( + bot_cfg + .as_ref() + .map(|c| c.coalesce_window_ms) + .unwrap_or(1_500), + )), }); // Sled uplink: forward permission requests to an upstream gateway when configured. diff --git a/server/src/service/bot_command/io.rs b/server/src/service/bot_command/io.rs index 43ba680b..7423de0a 100644 --- a/server/src/service/bot_command/io.rs +++ b/server/src/service/bot_command/io.rs @@ -147,6 +147,7 @@ pub(super) fn call_sync( pending_perm_replies: Arc::new(tokio::sync::Mutex::new(HashMap::new())), permission_timeout_secs: 120, status: Arc::new(crate::service::status::StatusBroadcaster::new()), + chat_dispatcher: Arc::new(crate::chat::dispatcher::ChatDispatcher::new(1_500)), }); let dispatch = CommandDispatch { diff --git a/server/src/service/gateway/io.rs b/server/src/service/gateway/io.rs index e41b285b..476a6494 100644 --- a/server/src/service/gateway/io.rs +++ b/server/src/service/gateway/io.rs @@ -558,6 +558,12 @@ pub fn spawn_gateway_bot( .as_ref() .map(|c| c.permission_timeout_secs) .unwrap_or(120), + chat_dispatcher: std::sync::Arc::new(crate::chat::dispatcher::ChatDispatcher::new( + bot_cfg + .as_ref() + .map(|c| c.coalesce_window_ms) + .unwrap_or(1_500), + )), }); let timer_store = std::sync::Arc::new(crate::service::timer::TimerStore::load( diff --git a/server/src/services.rs b/server/src/services.rs index e71132f4..29df2506 100644 --- a/server/src/services.rs +++ b/server/src/services.rs @@ -6,6 +6,7 @@ //! transport's context struct. use crate::agents::AgentPool; +use crate::chat::dispatcher::ChatDispatcher; use crate::http::context::{PermissionDecision, PermissionForward}; use crate::service::status::StatusBroadcaster; use std::collections::{HashMap, HashSet}; @@ -44,6 +45,12 @@ pub struct Services { /// only to subscribers of this instance, providing natural multi-project /// isolation. pub status: Arc, + /// Protocol-agnostic chat dispatcher shared by all transport handlers. + /// + /// Transport handlers call [`ChatDispatcher::submit`] instead of spawning + /// `claude -p` directly. The dispatcher applies a coalesce window and a + /// per-session serial lock, preventing duplicate concurrent spawns. + pub chat_dispatcher: Arc, } #[cfg(test)] @@ -63,6 +70,7 @@ impl Services { perm_rx: std::sync::Arc::new(TokioMutex::new(perm_rx)), pending_perm_replies: std::sync::Arc::new(TokioMutex::new(HashMap::new())), permission_timeout_secs: 120, + chat_dispatcher: std::sync::Arc::new(ChatDispatcher::new(1_500)), }) } } diff --git a/server/src/sled_uplink.rs b/server/src/sled_uplink.rs index 163efadf..4f979cfe 100644 --- a/server/src/sled_uplink.rs +++ b/server/src/sled_uplink.rs @@ -532,6 +532,7 @@ mod tests { perm_rx: Arc::new(tokio::sync::Mutex::new(perm_rx)), pending_perm_replies: Arc::new(tokio::sync::Mutex::new(HashMap::new())), permission_timeout_secs: 120, + chat_dispatcher: Arc::new(crate::chat::dispatcher::ChatDispatcher::new(1_500)), }); // Empty URL → noop; if it panicked or blocked the test would fail. spawn_uplink_task( @@ -603,6 +604,7 @@ mod tests { perm_rx: Arc::new(tokio::sync::Mutex::new(perm_rx)), pending_perm_replies: Arc::new(tokio::sync::Mutex::new(HashMap::new())), permission_timeout_secs: 120, + chat_dispatcher: Arc::new(crate::chat::dispatcher::ChatDispatcher::new(1_500)), }); spawn_uplink_task( @@ -700,6 +702,7 @@ mod tests { perm_rx: Arc::new(tokio::sync::Mutex::new(perm_rx)), pending_perm_replies: Arc::new(tokio::sync::Mutex::new(HashMap::new())), permission_timeout_secs: 120, + chat_dispatcher: Arc::new(crate::chat::dispatcher::ChatDispatcher::new(1_500)), }); spawn_uplink_task(