huskies: merge 1093 bug Chat dispatcher spawns one Timmy per inbound message — needs coalesce window + per-session serial lock

This commit is contained in:
dave
2026-05-15 11:57:00 +00:00
parent 01e60a670c
commit fc5481dbe4
15 changed files with 466 additions and 8 deletions
+1
View File
@@ -78,6 +78,7 @@ pub(super) fn build_agent_app_context(
pending_perm_replies: Arc::new(tokio::sync::Mutex::new(std::collections::HashMap::new())),
permission_timeout_secs: 120,
status: agents.status_broadcaster(),
chat_dispatcher: Arc::new(crate::chat::dispatcher::ChatDispatcher::new(1_500)),
});
crate::http::context::AppContext {
state: Arc::new(state),
+367
View File
@@ -0,0 +1,367 @@
//! Protocol-agnostic chat dispatcher — coalesce window + per-session serial lock.
//!
//! Sits between every inbound transport (Matrix, Slack, WhatsApp, …) and the
//! `claude -p` spawner. Transport handlers call [`ChatDispatcher::submit`]
//! instead of spawning directly; the dispatcher enforces two invariants:
//!
//! 1. **Coalesce window**: messages arriving for the same session within
//! `coalesce_ms` of each other are concatenated and delivered to a single
//! spawn. The window is a *debounce*: each new message extends the window by
//! `coalesce_ms` from its arrival time, so bursts flush as one batch.
//!
//! 2. **Per-session serial lock**: while one `claude -p` run is active, further
//! messages for that session queue up and are dispatched as a single batch
//! once the running invocation completes.
//!
//! A [`ChatDispatcher::stop`] call cancels the active run for a session and
//! discards the pending queue.
use crate::slog;
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::sync::{mpsc, watch};
/// A factory function that produces one LLM execution future per dispatch.
///
/// Arguments:
/// - `String` — the (possibly concatenated) prompt to send to `claude -p`.
/// - `watch::Receiver<bool>` — send `true` on this channel to cancel the run.
///
/// Returns a boxed, pinned `Send + 'static` future that resolves when the LLM
/// session ends (whether normally or via cancellation).
pub type SpawnFn = Arc<
dyn Fn(
String,
watch::Receiver<bool>,
) -> Pin<Box<dyn std::future::Future<Output = ()> + Send + 'static>>
+ Send
+ Sync,
>;
enum SessionMsg {
UserMessage { text: String, factory: SpawnFn },
Stop,
}
struct SessionHandle {
tx: mpsc::UnboundedSender<SessionMsg>,
}
/// Coalescing, serialising dispatcher for chat-to-LLM message routing.
///
/// Construct once at startup via [`ChatDispatcher::new`] and share via `Arc`.
/// Call [`submit`](ChatDispatcher::submit) from every transport handler instead
/// of spawning `claude -p` directly.
pub struct ChatDispatcher {
sessions: Mutex<HashMap<String, SessionHandle>>,
coalesce_ms: u64,
}
impl ChatDispatcher {
/// Create a new dispatcher with the given coalesce window in milliseconds.
pub fn new(coalesce_ms: u64) -> Self {
Self {
sessions: Mutex::new(HashMap::new()),
coalesce_ms,
}
}
/// Submit a message for a chat session.
///
/// If no session task exists for `session_key`, one is created lazily.
/// The `factory` is called by the session task when the coalesce window
/// closes (or immediately after the current run finishes, for pending
/// messages).
pub fn submit(&self, session_key: String, message: String, factory: SpawnFn) {
let mut guard = self.sessions.lock().unwrap();
let coalesce_ms = self.coalesce_ms;
let handle = guard.entry(session_key.clone()).or_insert_with(|| {
let (tx, rx) = mpsc::unbounded_channel();
tokio::spawn(session_task(session_key.clone(), rx, coalesce_ms));
SessionHandle { tx }
});
let _ = handle.tx.send(SessionMsg::UserMessage {
text: message,
factory,
});
}
/// Stop the active LLM run for `session_key` and clear its pending queue.
///
/// Returns `true` if the session existed (whether or not anything was
/// actually running), `false` if no session for that key has been created.
pub fn stop(&self, session_key: &str) -> bool {
let guard = self.sessions.lock().unwrap();
if let Some(handle) = guard.get(session_key) {
let _ = handle.tx.send(SessionMsg::Stop);
true
} else {
false
}
}
}
/// Per-session background task.
///
/// Phases:
/// 1. **Wait** — blocks until the first `UserMessage` arrives.
/// 2. **Coalesce** — extends the window by `coalesce_ms` on each new message;
/// fires when no message arrives within the window.
/// 3. **Run** — calls the factory with the concatenated batch; while running,
/// collects further `UserMessage`s into a pending list and logs a warn per
/// message. A `Stop` message cancels the running call and clears pending.
/// 4. **Drain** — after the run, if pending is non-empty, fires a second run
/// with the accumulated batch and loops back to step 3.
/// 5. Returns to step 1 when pending is empty.
async fn session_task(
session_key: String,
mut rx: mpsc::UnboundedReceiver<SessionMsg>,
coalesce_ms: u64,
) {
let coalesce_dur = Duration::from_millis(coalesce_ms);
loop {
// ── Phase 1: wait for the first message ─────────────────────────────
let (first_text, first_factory) = loop {
match rx.recv().await {
None => return,
Some(SessionMsg::Stop) => continue,
Some(SessionMsg::UserMessage { text, factory }) => break (text, factory),
}
};
// ── Phase 2: coalesce window (debounce) ──────────────────────────────
let mut batch: Vec<String> = vec![first_text];
let mut latest_factory: SpawnFn = first_factory;
let mut deadline = tokio::time::Instant::now() + coalesce_dur;
'coalesce: loop {
let now = tokio::time::Instant::now();
if now >= deadline {
break 'coalesce;
}
let remaining = deadline - now;
match tokio::time::timeout(remaining, rx.recv()).await {
Err(_) => break 'coalesce, // window closed
Ok(None) => return, // channel closed → exit task
Ok(Some(SessionMsg::Stop)) => {
batch.clear();
break 'coalesce;
}
Ok(Some(SessionMsg::UserMessage { text, factory })) => {
batch.push(text);
latest_factory = factory;
// Extend deadline on each new message (debounce).
deadline = tokio::time::Instant::now() + coalesce_dur;
}
}
}
if batch.is_empty() {
continue; // Stop received during coalesce — restart
}
// ── Phase 3 + 4: run → drain pending → repeat ───────────────────────
let mut prompt = batch.join("\n\n");
let mut factory = latest_factory;
loop {
let (cancel_tx, cancel_rx) = watch::channel(false);
let llm_fut = factory(prompt, cancel_rx);
let mut llm_task = tokio::spawn(llm_fut);
let mut pending_texts: Vec<String> = vec![];
let mut pending_factory: Option<SpawnFn> = None;
let mut stopped = false;
// Wait for the LLM to finish, collecting messages that arrive during the run.
loop {
tokio::select! {
_ = &mut llm_task => { break; }
msg = rx.recv() => {
match msg {
None => {
llm_task.abort();
return;
}
Some(SessionMsg::Stop) => {
let _ = cancel_tx.send(true);
let _ = llm_task.await;
pending_texts.clear();
stopped = true;
break;
}
Some(SessionMsg::UserMessage { text, factory: f }) => {
pending_texts.push(text);
let depth = pending_texts.len();
slog!(
"[chat-dispatcher] coalescing message for session={}, queue_depth={}",
session_key,
depth,
);
pending_factory = Some(f);
}
}
}
}
}
if stopped || pending_texts.is_empty() {
break; // back to Phase 1
}
// Fire the pending batch as the next run (no additional coalesce window).
prompt = pending_texts.join("\n\n");
factory = pending_factory.unwrap();
}
}
}
// ── Tests ─────────────────────────────────────────────────────────────────────
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicUsize, Ordering};
fn make_factory(spawn_count: Arc<AtomicUsize>, run_ms: u64) -> SpawnFn {
Arc::new(move |_prompt: String, _cancel_rx: watch::Receiver<bool>| {
let count = Arc::clone(&spawn_count);
Box::pin(async move {
count.fetch_add(1, Ordering::SeqCst);
tokio::time::sleep(Duration::from_millis(run_ms)).await;
})
})
}
/// AC 6 regression: three messages arriving 200 ms / (long gap) / (after run)
/// apart on the same session must produce at most two spawns, never three
/// concurrent processes.
///
/// Setup:
/// coalesce_ms = 50 ms (short window so test runs fast)
/// LLM "run" = 150 ms
/// msg1 @ t=0
/// msg2 @ t=20 ms — within coalesce window, merged with msg1 → 1 spawn
/// msg3 @ t=300 ms — after run completes → 2nd spawn
///
/// Expected: exactly 2 spawns, never 3.
#[tokio::test]
async fn three_messages_never_three_concurrent_spawns() {
let spawn_count = Arc::new(AtomicUsize::new(0));
let dispatcher = Arc::new(ChatDispatcher::new(50));
let session = "room1".to_string();
// msg1 at t=0
dispatcher.submit(
session.clone(),
"msg1".to_string(),
make_factory(Arc::clone(&spawn_count), 150),
);
// msg2 at t=20 ms — inside the 50 ms coalesce window
tokio::time::sleep(Duration::from_millis(20)).await;
dispatcher.submit(
session.clone(),
"msg2".to_string(),
make_factory(Arc::clone(&spawn_count), 150),
);
// msg3 at t=300 ms — after the coalesce window fires (t≈70 ms) and the
// 150 ms run completes (t≈220 ms), so msg3 starts a second coalesce cycle.
tokio::time::sleep(Duration::from_millis(280)).await;
dispatcher.submit(
session.clone(),
"msg3".to_string(),
make_factory(Arc::clone(&spawn_count), 150),
);
// Wait long enough for both runs to finish.
tokio::time::sleep(Duration::from_millis(500)).await;
let count = spawn_count.load(Ordering::SeqCst);
assert!(
(1..=2).contains(&count),
"expected 1 or 2 spawns (msgs 1+2 coalesced, msg3 separate), got {count}"
);
}
/// Messages that arrive while the LLM is running are not lost — they are
/// delivered as a single follow-up spawn once the first run completes.
#[tokio::test]
async fn pending_messages_dispatched_after_run_completes() {
let spawn_count = Arc::new(AtomicUsize::new(0));
let dispatcher = Arc::new(ChatDispatcher::new(50));
let session = "room2".to_string();
// First message — starts a 200 ms run.
dispatcher.submit(
session.clone(),
"first".to_string(),
make_factory(Arc::clone(&spawn_count), 200),
);
// Wait for coalesce window to fire, then send two more.
tokio::time::sleep(Duration::from_millis(100)).await;
dispatcher.submit(
session.clone(),
"second".to_string(),
make_factory(Arc::clone(&spawn_count), 50),
);
dispatcher.submit(
session.clone(),
"third".to_string(),
make_factory(Arc::clone(&spawn_count), 50),
);
// Wait long enough for both runs.
tokio::time::sleep(Duration::from_millis(600)).await;
let count = spawn_count.load(Ordering::SeqCst);
assert_eq!(
count, 2,
"first run + one pending-batch run = 2 total spawns"
);
}
/// Stop cancels the running LLM and discards pending messages.
#[tokio::test]
async fn stop_cancels_run_and_clears_pending() {
let spawn_count = Arc::new(AtomicUsize::new(0));
let dispatcher = Arc::new(ChatDispatcher::new(30));
let session = "room3".to_string();
// Start a long run.
dispatcher.submit(
session.clone(),
"long-running".to_string(),
make_factory(Arc::clone(&spawn_count), 500),
);
// Wait for coalesce window to fire.
tokio::time::sleep(Duration::from_millis(80)).await;
// Queue a pending message.
dispatcher.submit(
session.clone(),
"pending".to_string(),
make_factory(Arc::clone(&spawn_count), 50),
);
// Stop immediately.
dispatcher.stop(&session);
// Wait longer than the run would have taken if not stopped.
tokio::time::sleep(Duration::from_millis(700)).await;
let count = spawn_count.load(Ordering::SeqCst);
// The first run was started before stop (spawn_count=1).
// The pending message should NOT have produced a second spawn.
assert!(
count <= 1,
"stop should discard pending; got {count} spawns"
);
}
}
+2
View File
@@ -6,6 +6,8 @@
/// Bot command registry and dispatch — parses and routes incoming chat messages.
pub mod commands;
/// Protocol-agnostic chat dispatcher — coalesce window and per-session serial lock.
pub mod dispatcher;
/// Chat history utilities — loading and serialising conversation history.
pub mod history;
pub(crate) mod lookup;
@@ -268,6 +268,7 @@ mod tests {
pending_perm_replies: Arc::new(TokioMutex::new(HashMap::new())),
permission_timeout_secs: 120,
status: Arc::new(crate::service::status::StatusBroadcaster::new()),
chat_dispatcher: Arc::new(crate::chat::dispatcher::ChatDispatcher::new(1_500)),
})
}
@@ -21,6 +21,7 @@ pub(in crate::chat::transport::matrix::bot) async fn handle_message(
ctx: BotContext,
sender: String,
user_message: String,
mut cancel_rx: watch::Receiver<bool>,
) {
// Look up the room's existing Claude Code session ID (if any) so we can
// resume the conversation with structured API messages instead of
@@ -68,9 +69,6 @@ pub(in crate::chat::transport::matrix::bot) async fn handle_message(
);
let provider = ClaudeCodeProvider::new();
let (cancel_tx, mut cancel_rx) = watch::channel(false);
// Keep the sender alive for the duration of the call.
let _cancel_tx = cancel_tx;
// Channel for sending complete paragraphs to the Matrix posting task.
let (msg_tx, mut msg_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
@@ -608,9 +608,56 @@ pub(in crate::chat::transport::matrix::bot) async fn on_room_message(
return;
}
// Spawn a separate task so the Matrix sync loop is not blocked while we
// wait for the LLM response (which can take several seconds).
tokio::spawn(async move {
handle_message(room_id_str, incoming_room_id, ctx, sender, user_message).await;
});
// "stop" — cancel the running LLM turn for this session and clear pending queue.
{
let stripped = crate::chat::util::strip_bot_mention(
&user_message,
&ctx.services.bot_name,
ctx.matrix_user_id.as_str(),
)
.trim()
.to_ascii_lowercase();
if stripped == "stop" {
slog!("[matrix-bot] stop command from {sender} for session {room_id_str}");
ctx.services.chat_dispatcher.stop(&room_id_str);
let msg = "Stopped.";
let html = markdown_to_html(msg);
if let Ok(msg_id) = ctx.transport.send_message(&room_id_str, msg, &html).await
&& let Ok(event_id) = msg_id.parse()
{
ctx.bot_sent_event_ids.lock().await.insert(event_id);
}
return;
}
}
// Hand the message to the protocol-agnostic dispatcher instead of spawning
// directly. The dispatcher applies a coalesce window and a per-session
// serial lock, preventing duplicate concurrent Timmy spawns.
let ctx_for_factory = ctx.clone();
let factory: crate::chat::dispatcher::SpawnFn = {
let room_id_str2 = room_id_str.clone();
std::sync::Arc::new(
move |coalesced: String, cancel_rx: tokio::sync::watch::Receiver<bool>| {
let room_id_str = room_id_str2.clone();
let incoming_room_id = incoming_room_id.clone();
let ctx = ctx_for_factory.clone();
let sender = sender.clone();
Box::pin(async move {
handle_message(
room_id_str,
incoming_room_id,
ctx,
sender,
coalesced,
cancel_rx,
)
.await;
})
},
)
};
ctx.services
.chat_dispatcher
.submit(room_id_str, user_message, factory);
}
@@ -150,6 +150,7 @@ mod tests {
pending_perm_replies: Arc::new(TokioMutex::new(HashMap::new())),
permission_timeout_secs: 120,
status: Arc::new(crate::service::status::StatusBroadcaster::new()),
chat_dispatcher: Arc::new(crate::chat::dispatcher::ChatDispatcher::new(1_500)),
});
(services, perm_tx)
}
@@ -17,6 +17,11 @@ pub(super) fn default_aggregated_notifications_enabled() -> bool {
true
}
/// Default coalesce window for the chat dispatcher (1 500 ms).
pub(super) fn default_coalesce_window_ms() -> u64 {
1_500
}
pub(super) fn default_transport() -> String {
"matrix".to_string()
}
@@ -187,4 +192,14 @@ pub struct BotConfig {
/// Defaults to `true`.
#[serde(default = "default_aggregated_notifications_enabled")]
pub aggregated_notifications_enabled: bool,
/// Duration in milliseconds of the chat dispatcher's coalesce window.
///
/// Messages for the same session arriving within this window are
/// concatenated into a single `claude -p` call. The window is a
/// debounce: each new message extends the deadline by this duration.
///
/// Defaults to 1 500 ms (1.5 s).
#[serde(default = "default_coalesce_window_ms")]
pub coalesce_window_ms: u64,
}
@@ -310,6 +310,7 @@ mod tests {
perm_rx: Arc::new(tokio::sync::Mutex::new(perm_rx)),
pending_perm_replies: Arc::new(tokio::sync::Mutex::new(Default::default())),
permission_timeout_secs: 120,
chat_dispatcher: Arc::new(crate::chat::dispatcher::ChatDispatcher::new(1_500)),
});
Arc::new(WhatsAppWebhookContext {
services,
+1
View File
@@ -118,6 +118,7 @@ impl AppContext {
)),
permission_timeout_secs: 120,
status: agents.status_broadcaster(),
chat_dispatcher: Arc::new(crate::chat::dispatcher::ChatDispatcher::new(1_500)),
});
Self {
state: Arc::new(state),
+6
View File
@@ -238,6 +238,12 @@ async fn main() -> Result<(), std::io::Error> {
.map(|c| c.permission_timeout_secs)
.unwrap_or(120),
status: agents.status_broadcaster(),
chat_dispatcher: std::sync::Arc::new(chat::dispatcher::ChatDispatcher::new(
bot_cfg
.as_ref()
.map(|c| c.coalesce_window_ms)
.unwrap_or(1_500),
)),
});
// Sled uplink: forward permission requests to an upstream gateway when configured.
+1
View File
@@ -147,6 +147,7 @@ pub(super) fn call_sync(
pending_perm_replies: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
permission_timeout_secs: 120,
status: Arc::new(crate::service::status::StatusBroadcaster::new()),
chat_dispatcher: Arc::new(crate::chat::dispatcher::ChatDispatcher::new(1_500)),
});
let dispatch = CommandDispatch {
+6
View File
@@ -558,6 +558,12 @@ pub fn spawn_gateway_bot(
.as_ref()
.map(|c| c.permission_timeout_secs)
.unwrap_or(120),
chat_dispatcher: std::sync::Arc::new(crate::chat::dispatcher::ChatDispatcher::new(
bot_cfg
.as_ref()
.map(|c| c.coalesce_window_ms)
.unwrap_or(1_500),
)),
});
let timer_store = std::sync::Arc::new(crate::service::timer::TimerStore::load(
+8
View File
@@ -6,6 +6,7 @@
//! transport's context struct.
use crate::agents::AgentPool;
use crate::chat::dispatcher::ChatDispatcher;
use crate::http::context::{PermissionDecision, PermissionForward};
use crate::service::status::StatusBroadcaster;
use std::collections::{HashMap, HashSet};
@@ -44,6 +45,12 @@ pub struct Services {
/// only to subscribers of this instance, providing natural multi-project
/// isolation.
pub status: Arc<StatusBroadcaster>,
/// Protocol-agnostic chat dispatcher shared by all transport handlers.
///
/// Transport handlers call [`ChatDispatcher::submit`] instead of spawning
/// `claude -p` directly. The dispatcher applies a coalesce window and a
/// per-session serial lock, preventing duplicate concurrent spawns.
pub chat_dispatcher: Arc<ChatDispatcher>,
}
#[cfg(test)]
@@ -63,6 +70,7 @@ impl Services {
perm_rx: std::sync::Arc::new(TokioMutex::new(perm_rx)),
pending_perm_replies: std::sync::Arc::new(TokioMutex::new(HashMap::new())),
permission_timeout_secs: 120,
chat_dispatcher: std::sync::Arc::new(ChatDispatcher::new(1_500)),
})
}
}
+3
View File
@@ -532,6 +532,7 @@ mod tests {
perm_rx: Arc::new(tokio::sync::Mutex::new(perm_rx)),
pending_perm_replies: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
permission_timeout_secs: 120,
chat_dispatcher: Arc::new(crate::chat::dispatcher::ChatDispatcher::new(1_500)),
});
// Empty URL → noop; if it panicked or blocked the test would fail.
spawn_uplink_task(
@@ -603,6 +604,7 @@ mod tests {
perm_rx: Arc::new(tokio::sync::Mutex::new(perm_rx)),
pending_perm_replies: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
permission_timeout_secs: 120,
chat_dispatcher: Arc::new(crate::chat::dispatcher::ChatDispatcher::new(1_500)),
});
spawn_uplink_task(
@@ -700,6 +702,7 @@ mod tests {
perm_rx: Arc::new(tokio::sync::Mutex::new(perm_rx)),
pending_perm_replies: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
permission_timeout_secs: 120,
chat_dispatcher: Arc::new(crate::chat::dispatcher::ChatDispatcher::new(1_500)),
});
spawn_uplink_task(