huskies: merge 1063

This commit is contained in:
dave
2026-05-14 21:48:14 +00:00
parent 23c3301903
commit c66016394b
5 changed files with 220 additions and 30 deletions
@@ -101,6 +101,13 @@ pub struct BotContext {
/// `<system-reminder>` block at the head of the next user prompt so Timmy
/// sees pipeline activity without requiring a separate message.
pub pending_pipeline_events: Arc<TokioMutex<Vec<String>>>,
/// Gateway aggregate transition events buffered since the last LLM turn.
///
/// In gateway mode a background task appends one compact audit line per
/// `GatewayStatusEvent` received from the gateway broadcaster. Drained
/// alongside `pending_pipeline_events` on each user message. Always
/// empty in standalone (non-gateway) mode.
pub pending_gateway_events: Arc<TokioMutex<Vec<String>>>,
/// Bounded FIFO set of already-handled incoming event IDs.
///
/// The Matrix sync loop can replay events on reconnect. This set ensures
@@ -293,6 +300,7 @@ mod tests {
gateway_projects,
gateway_project_urls,
pending_pipeline_events: Arc::new(TokioMutex::new(Vec::new())),
pending_gateway_events: Arc::new(TokioMutex::new(Vec::new())),
handled_incoming_event_ids: Arc::new(TokioMutex::new(SeenEventIds::new(
SEEN_EVENT_IDS_CAP,
))),
@@ -13,7 +13,7 @@ use super::super::context::BotContext;
use super::super::format::markdown_to_html;
use super::super::history::{ConversationEntry, ConversationRole, save_history};
use super::format_user_prompt;
use super::{format_drained_events, format_user_prompt};
pub(in crate::chat::transport::matrix::bot) async fn handle_message(
room_id_str: String,
@@ -30,28 +30,18 @@ pub(in crate::chat::transport::matrix::bot) async fn handle_message(
guard.get(&room_id).and_then(|conv| conv.session_id.clone())
};
// Drain any pipeline transition events buffered since the last LLM turn and
// prepend them as a passive <system-reminder> block so Timmy sees pipeline
// activity without requiring a separate message. Capped at 20 lines to
// keep context size bounded.
const MAX_PIPELINE_EVENTS: usize = 20;
// Drain pipeline and gateway transition events buffered since the last LLM
// turn and prepend them as a passive <system-reminder> block so Timmy sees
// pipeline activity without requiring a separate message. Sled events come
// from `pending_pipeline_events`; gateway events from `pending_gateway_events`.
// In practice only one buffer is non-empty (sled mode vs gateway mode).
let system_reminder_prefix = {
let mut guard = ctx.pending_pipeline_events.lock().await;
if guard.is_empty() {
String::new()
} else {
let total = guard.len();
let lines: Vec<String> = guard.drain(..).collect();
drop(guard);
let shown_count = total.min(MAX_PIPELINE_EVENTS);
let shown = lines[..shown_count].join("\n");
let tail = if total > MAX_PIPELINE_EVENTS {
format!("\n...and {} more", total - MAX_PIPELINE_EVENTS)
} else {
String::new()
};
format!("<system-reminder>\n{shown}{tail}\n</system-reminder>\n")
}
let mut sled_guard = ctx.pending_pipeline_events.lock().await;
let mut gtw_guard = ctx.pending_gateway_events.lock().await;
let all_lines: Vec<String> = sled_guard.drain(..).chain(gtw_guard.drain(..)).collect();
drop(sled_guard);
drop(gtw_guard);
format_drained_events(all_lines)
};
// The prompt is just the current message with sender attribution.
@@ -11,6 +11,27 @@ pub(super) fn format_user_prompt(sender: &str, message: &str) -> String {
format!("{sender}: {message}")
}
/// Drain `lines` into a `<system-reminder>` block for injection at the head of
/// the next LLM prompt. Returns an empty string when `lines` is empty.
///
/// At most 20 lines are shown verbatim; excess lines are replaced with a
/// `…and N more` indicator to keep context size bounded.
pub(in crate::chat::transport::matrix::bot) fn format_drained_events(lines: Vec<String>) -> String {
if lines.is_empty() {
return String::new();
}
const MAX_PIPELINE_EVENTS: usize = 20;
let total = lines.len();
let shown_count = total.min(MAX_PIPELINE_EVENTS);
let shown = lines[..shown_count].join("\n");
let tail = if total > MAX_PIPELINE_EVENTS {
format!("\n...and {} more", total - MAX_PIPELINE_EVENTS)
} else {
String::new()
};
format!("<system-reminder>\n{shown}{tail}\n</system-reminder>\n")
}
/// Matrix event handler for room messages. Each invocation spawns an
#[cfg(test)]
mod tests {
@@ -51,6 +72,49 @@ mod tests {
assert!(crate::llm::oauth::extract_login_url_from_error(err).is_none());
}
// -- format_drained_events ----------------------------------------------
#[test]
fn format_drained_events_empty_returns_empty_string() {
assert_eq!(format_drained_events(vec![]), String::new());
}
#[test]
fn format_drained_events_wraps_in_system_reminder() {
let result = format_drained_events(vec!["audit ts=2026 id=1 event=x".to_string()]);
assert!(result.starts_with("<system-reminder>\n"), "got: {result}");
assert!(result.ends_with("</system-reminder>\n"), "got: {result}");
assert!(
result.contains("audit ts=2026 id=1 event=x"),
"got: {result}"
);
}
#[test]
fn format_drained_events_caps_at_20_with_overflow_indicator() {
let lines: Vec<String> = (0..25).map(|i| format!("line {i}")).collect();
let result = format_drained_events(lines);
assert!(result.contains("...and 5 more"), "got: {result}");
assert!(
result.contains("line 19"),
"last shown line missing; got: {result}"
);
assert!(
!result.contains("line 20"),
"line 21 must be hidden; got: {result}"
);
}
#[test]
fn format_drained_events_exactly_20_no_overflow_indicator() {
let lines: Vec<String> = (0..20).map(|i| format!("line {i}")).collect();
let result = format_drained_events(lines);
assert!(
!result.contains("...and"),
"must not show overflow when exactly 20; got: {result}"
);
}
// -- bot_name / system prompt -------------------------------------------
#[test]
+35 -7
View File
@@ -325,6 +325,38 @@ pub async fn run_bot(
});
}
// Subscribe to gateway-side status events and buffer compact audit lines for
// the LLM context. A separate resubscribed receiver is used so both the
// buffer task and the room-forwarder task receive every event independently.
let pending_gateway_events: Arc<TokioMutex<Vec<String>>> =
Arc::new(TokioMutex::new(Vec::new()));
let gateway_event_rx_for_forwarder = if let Some(event_rx) = gateway_event_rx {
// Buffer task: silently accumulate compact audit lines for Timmy's context.
{
use crate::service::gateway::polling::format_gateway_audit_line;
let buf_rx = event_rx.resubscribe();
let buf = Arc::clone(&pending_gateway_events);
tokio::spawn(async move {
let mut rx = buf_rx;
loop {
match rx.recv().await {
Ok(event) => {
let line = format_gateway_audit_line(&event.project, &event.event);
buf.lock().await.push(line);
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
slog!("[matrix-bot] gateway event buffer lagged by {n} events");
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
}
}
});
}
Some(event_rx)
} else {
None
};
let ctx = BotContext {
services,
matrix_user_id: bot_user_id,
@@ -340,6 +372,7 @@ pub async fn run_bot(
gateway_projects,
gateway_project_urls,
pending_pipeline_events,
pending_gateway_events,
handled_incoming_event_ids: Arc::new(TokioMutex::new(super::context::SeenEventIds::new(
super::context::SEEN_EVENT_IDS_CAP,
))),
@@ -379,13 +412,8 @@ pub async fn run_bot(
);
}
// In gateway mode, subscribe to the gateway-side status broadcaster and
// forward events to the configured Matrix rooms with a `[project-name]` prefix.
// This path delivers events pushed directly by project nodes over WebSocket
// (via `/gateway/events/push`), complementing the HTTP-polling path above.
// On broadcaster back-pressure (Lagged), the task re-subscribes automatically
// so it never permanently stalls.
if let Some(event_rx) = gateway_event_rx {
// Forwarder task: post gateway events to Matrix rooms with `[project-name]` prefix.
if let Some(event_rx) = gateway_event_rx_for_forwarder {
let broadcast_room_ids: Vec<String> =
announce_room_ids.iter().map(|r| r.to_string()).collect();
crate::gateway::spawn_gateway_broadcaster_forwarder(