From c66016394b3b73184d8e2bb7111aca12da53563e Mon Sep 17 00:00:00 2001 From: dave Date: Thu, 14 May 2026 21:48:14 +0000 Subject: [PATCH] huskies: merge 1063 --- .../src/chat/transport/matrix/bot/context.rs | 8 ++ .../matrix/bot/messages/handle_message.rs | 34 +++--- .../chat/transport/matrix/bot/messages/mod.rs | 64 +++++++++++ server/src/chat/transport/matrix/bot/run.rs | 42 ++++++-- server/src/service/gateway/polling.rs | 102 +++++++++++++++++- 5 files changed, 220 insertions(+), 30 deletions(-) diff --git a/server/src/chat/transport/matrix/bot/context.rs b/server/src/chat/transport/matrix/bot/context.rs index 582d6c6d..5d419f1c 100644 --- a/server/src/chat/transport/matrix/bot/context.rs +++ b/server/src/chat/transport/matrix/bot/context.rs @@ -101,6 +101,13 @@ pub struct BotContext { /// `` block at the head of the next user prompt so Timmy /// sees pipeline activity without requiring a separate message. pub pending_pipeline_events: Arc>>, + /// Gateway aggregate transition events buffered since the last LLM turn. + /// + /// In gateway mode a background task appends one compact audit line per + /// `GatewayStatusEvent` received from the gateway broadcaster. Drained + /// alongside `pending_pipeline_events` on each user message. Always + /// empty in standalone (non-gateway) mode. + pub pending_gateway_events: Arc>>, /// Bounded FIFO set of already-handled incoming event IDs. /// /// The Matrix sync loop can replay events on reconnect. This set ensures @@ -293,6 +300,7 @@ mod tests { gateway_projects, gateway_project_urls, pending_pipeline_events: Arc::new(TokioMutex::new(Vec::new())), + pending_gateway_events: Arc::new(TokioMutex::new(Vec::new())), handled_incoming_event_ids: Arc::new(TokioMutex::new(SeenEventIds::new( SEEN_EVENT_IDS_CAP, ))), diff --git a/server/src/chat/transport/matrix/bot/messages/handle_message.rs b/server/src/chat/transport/matrix/bot/messages/handle_message.rs index f42b8a71..0d56cc73 100644 --- a/server/src/chat/transport/matrix/bot/messages/handle_message.rs +++ b/server/src/chat/transport/matrix/bot/messages/handle_message.rs @@ -13,7 +13,7 @@ use super::super::context::BotContext; use super::super::format::markdown_to_html; use super::super::history::{ConversationEntry, ConversationRole, save_history}; -use super::format_user_prompt; +use super::{format_drained_events, format_user_prompt}; pub(in crate::chat::transport::matrix::bot) async fn handle_message( room_id_str: String, @@ -30,28 +30,18 @@ pub(in crate::chat::transport::matrix::bot) async fn handle_message( guard.get(&room_id).and_then(|conv| conv.session_id.clone()) }; - // Drain any pipeline transition events buffered since the last LLM turn and - // prepend them as a passive block so Timmy sees pipeline - // activity without requiring a separate message. Capped at 20 lines to - // keep context size bounded. - const MAX_PIPELINE_EVENTS: usize = 20; + // Drain pipeline and gateway transition events buffered since the last LLM + // turn and prepend them as a passive block so Timmy sees + // pipeline activity without requiring a separate message. Sled events come + // from `pending_pipeline_events`; gateway events from `pending_gateway_events`. + // In practice only one buffer is non-empty (sled mode vs gateway mode). let system_reminder_prefix = { - let mut guard = ctx.pending_pipeline_events.lock().await; - if guard.is_empty() { - String::new() - } else { - let total = guard.len(); - let lines: Vec = guard.drain(..).collect(); - drop(guard); - let shown_count = total.min(MAX_PIPELINE_EVENTS); - let shown = lines[..shown_count].join("\n"); - let tail = if total > MAX_PIPELINE_EVENTS { - format!("\n...and {} more", total - MAX_PIPELINE_EVENTS) - } else { - String::new() - }; - format!("\n{shown}{tail}\n\n") - } + let mut sled_guard = ctx.pending_pipeline_events.lock().await; + let mut gtw_guard = ctx.pending_gateway_events.lock().await; + let all_lines: Vec = sled_guard.drain(..).chain(gtw_guard.drain(..)).collect(); + drop(sled_guard); + drop(gtw_guard); + format_drained_events(all_lines) }; // The prompt is just the current message with sender attribution. diff --git a/server/src/chat/transport/matrix/bot/messages/mod.rs b/server/src/chat/transport/matrix/bot/messages/mod.rs index a3f92049..e3509efa 100644 --- a/server/src/chat/transport/matrix/bot/messages/mod.rs +++ b/server/src/chat/transport/matrix/bot/messages/mod.rs @@ -11,6 +11,27 @@ pub(super) fn format_user_prompt(sender: &str, message: &str) -> String { format!("{sender}: {message}") } +/// Drain `lines` into a `` block for injection at the head of +/// the next LLM prompt. Returns an empty string when `lines` is empty. +/// +/// At most 20 lines are shown verbatim; excess lines are replaced with a +/// `…and N more` indicator to keep context size bounded. +pub(in crate::chat::transport::matrix::bot) fn format_drained_events(lines: Vec) -> String { + if lines.is_empty() { + return String::new(); + } + const MAX_PIPELINE_EVENTS: usize = 20; + let total = lines.len(); + let shown_count = total.min(MAX_PIPELINE_EVENTS); + let shown = lines[..shown_count].join("\n"); + let tail = if total > MAX_PIPELINE_EVENTS { + format!("\n...and {} more", total - MAX_PIPELINE_EVENTS) + } else { + String::new() + }; + format!("\n{shown}{tail}\n\n") +} + /// Matrix event handler for room messages. Each invocation spawns an #[cfg(test)] mod tests { @@ -51,6 +72,49 @@ mod tests { assert!(crate::llm::oauth::extract_login_url_from_error(err).is_none()); } + // -- format_drained_events ---------------------------------------------- + + #[test] + fn format_drained_events_empty_returns_empty_string() { + assert_eq!(format_drained_events(vec![]), String::new()); + } + + #[test] + fn format_drained_events_wraps_in_system_reminder() { + let result = format_drained_events(vec!["audit ts=2026 id=1 event=x".to_string()]); + assert!(result.starts_with("\n"), "got: {result}"); + assert!(result.ends_with("\n"), "got: {result}"); + assert!( + result.contains("audit ts=2026 id=1 event=x"), + "got: {result}" + ); + } + + #[test] + fn format_drained_events_caps_at_20_with_overflow_indicator() { + let lines: Vec = (0..25).map(|i| format!("line {i}")).collect(); + let result = format_drained_events(lines); + assert!(result.contains("...and 5 more"), "got: {result}"); + assert!( + result.contains("line 19"), + "last shown line missing; got: {result}" + ); + assert!( + !result.contains("line 20"), + "line 21 must be hidden; got: {result}" + ); + } + + #[test] + fn format_drained_events_exactly_20_no_overflow_indicator() { + let lines: Vec = (0..20).map(|i| format!("line {i}")).collect(); + let result = format_drained_events(lines); + assert!( + !result.contains("...and"), + "must not show overflow when exactly 20; got: {result}" + ); + } + // -- bot_name / system prompt ------------------------------------------- #[test] diff --git a/server/src/chat/transport/matrix/bot/run.rs b/server/src/chat/transport/matrix/bot/run.rs index f350f104..1b1fe990 100644 --- a/server/src/chat/transport/matrix/bot/run.rs +++ b/server/src/chat/transport/matrix/bot/run.rs @@ -325,6 +325,38 @@ pub async fn run_bot( }); } + // Subscribe to gateway-side status events and buffer compact audit lines for + // the LLM context. A separate resubscribed receiver is used so both the + // buffer task and the room-forwarder task receive every event independently. + let pending_gateway_events: Arc>> = + Arc::new(TokioMutex::new(Vec::new())); + let gateway_event_rx_for_forwarder = if let Some(event_rx) = gateway_event_rx { + // Buffer task: silently accumulate compact audit lines for Timmy's context. + { + use crate::service::gateway::polling::format_gateway_audit_line; + let buf_rx = event_rx.resubscribe(); + let buf = Arc::clone(&pending_gateway_events); + tokio::spawn(async move { + let mut rx = buf_rx; + loop { + match rx.recv().await { + Ok(event) => { + let line = format_gateway_audit_line(&event.project, &event.event); + buf.lock().await.push(line); + } + Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { + slog!("[matrix-bot] gateway event buffer lagged by {n} events"); + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => break, + } + } + }); + } + Some(event_rx) + } else { + None + }; + let ctx = BotContext { services, matrix_user_id: bot_user_id, @@ -340,6 +372,7 @@ pub async fn run_bot( gateway_projects, gateway_project_urls, pending_pipeline_events, + pending_gateway_events, handled_incoming_event_ids: Arc::new(TokioMutex::new(super::context::SeenEventIds::new( super::context::SEEN_EVENT_IDS_CAP, ))), @@ -379,13 +412,8 @@ pub async fn run_bot( ); } - // In gateway mode, subscribe to the gateway-side status broadcaster and - // forward events to the configured Matrix rooms with a `[project-name]` prefix. - // This path delivers events pushed directly by project nodes over WebSocket - // (via `/gateway/events/push`), complementing the HTTP-polling path above. - // On broadcaster back-pressure (Lagged), the task re-subscribes automatically - // so it never permanently stalls. - if let Some(event_rx) = gateway_event_rx { + // Forwarder task: post gateway events to Matrix rooms with `[project-name]` prefix. + if let Some(event_rx) = gateway_event_rx_for_forwarder { let broadcast_room_ids: Vec = announce_room_ids.iter().map(|r| r.to_string()).collect(); crate::gateway::spawn_gateway_broadcaster_forwarder( diff --git a/server/src/service/gateway/polling.rs b/server/src/service/gateway/polling.rs index ce7e5763..5dfac222 100644 --- a/server/src/service/gateway/polling.rs +++ b/server/src/service/gateway/polling.rs @@ -4,7 +4,7 @@ //! with `[project-name]` prefixes. The actual I/O (HTTP polling, spawning //! tasks, sending messages) lives in `io.rs`. -use crate::pipeline_state::Stage; +use crate::pipeline_state::{Stage, stage_label}; use crate::service::events::StoredEvent; use crate::service::notifications::{ format_blocked_notification, format_error_notification, format_stage_notification, @@ -52,6 +52,46 @@ pub fn format_gateway_event(project_name: &str, event: &StoredEvent) -> (String, } } +/// Format a [`StoredEvent`] from a project as a compact audit line for LLM context. +/// +/// Produces a structured one-line entry with stable `key=value` fields, including +/// the project name, mirroring the sled-side `format_audit_entry` format. +pub fn format_gateway_audit_line(project: &str, event: &StoredEvent) -> String { + let ts_ms = event.timestamp_ms(); + let ts = chrono::DateTime::from_timestamp_millis(ts_ms as i64) + .unwrap_or_else(chrono::Utc::now) + .to_rfc3339_opts(chrono::SecondsFormat::Secs, true); + + match event { + StoredEvent::StageTransition { + story_id, + from_stage, + to_stage, + .. + } => { + let from_label = stage_label(&Stage::from_dir(from_stage).unwrap_or(Stage::Upcoming)); + let to_label = stage_label(&Stage::from_dir(to_stage).unwrap_or(Stage::Upcoming)); + format!( + "audit ts={ts} project={project} id={story_id} from={from_label} to={to_label} event=stage_transition" + ) + } + StoredEvent::MergeFailure { + story_id, reason, .. + } => { + format!( + "audit ts={ts} project={project} id={story_id} event=merge_failure reason={reason}" + ) + } + StoredEvent::StoryBlocked { + story_id, reason, .. + } => { + format!( + "audit ts={ts} project={project} id={story_id} event=story_blocked reason={reason}" + ) + } + } +} + // ── Tests ──────────────────────────────────────────────────────────────────── #[cfg(test)] @@ -188,4 +228,64 @@ mod tests { "should not have double spaces from empty name; got: {plain}" ); } + + // -- format_gateway_audit_line ------------------------------------------- + + #[test] + fn audit_line_stage_transition_contains_project_and_stages() { + let event = StoredEvent::StageTransition { + story_id: "42_story_feat".to_string(), + story_name: String::new(), + from_stage: "2_current".to_string(), + to_stage: "3_qa".to_string(), + timestamp_ms: 1_000_000, + }; + let line = format_gateway_audit_line("huskies", &event); + assert!( + line.starts_with("audit ts="), + "must start with audit ts=; got: {line}" + ); + assert!( + line.contains("project=huskies"), + "must contain project; got: {line}" + ); + assert!( + line.contains("id=42_story_feat"), + "must contain story id; got: {line}" + ); + assert!( + line.contains("event=stage_transition"), + "must name event; got: {line}" + ); + assert!(line.contains("from="), "must have from field; got: {line}"); + assert!(line.contains("to="), "must have to field; got: {line}"); + } + + #[test] + fn audit_line_merge_failure_contains_reason() { + let event = StoredEvent::MergeFailure { + story_id: "7_story_bar".to_string(), + story_name: String::new(), + reason: "conflict in main.rs".to_string(), + timestamp_ms: 2_000_000, + }; + let line = format_gateway_audit_line("robot-studio", &event); + assert!(line.contains("project=robot-studio"), "got: {line}"); + assert!(line.contains("event=merge_failure"), "got: {line}"); + assert!(line.contains("reason=conflict in main.rs"), "got: {line}"); + } + + #[test] + fn audit_line_story_blocked_contains_reason() { + let event = StoredEvent::StoryBlocked { + story_id: "3_story_baz".to_string(), + story_name: String::new(), + reason: "retry limit exceeded".to_string(), + timestamp_ms: 3_000_000, + }; + let line = format_gateway_audit_line("proj", &event); + assert!(line.contains("project=proj"), "got: {line}"); + assert!(line.contains("event=story_blocked"), "got: {line}"); + assert!(line.contains("reason=retry limit exceeded"), "got: {line}"); + } }