diff --git a/server/src/chat/transport/matrix/bot/messages/handle_message.rs b/server/src/chat/transport/matrix/bot/messages/handle_message.rs index 0d56cc73..3275e0f9 100644 --- a/server/src/chat/transport/matrix/bot/messages/handle_message.rs +++ b/server/src/chat/transport/matrix/bot/messages/handle_message.rs @@ -41,7 +41,16 @@ pub(in crate::chat::transport::matrix::bot) async fn handle_message( let all_lines: Vec = sled_guard.drain(..).chain(gtw_guard.drain(..)).collect(); drop(sled_guard); drop(gtw_guard); - format_drained_events(all_lines) + slog!( + "[matrix-bot] drained {} gateway audit lines for LLM context", + all_lines.len() + ); + let prefix = format_drained_events(all_lines); + slog!( + "[matrix-bot] format_drained_events output: {} bytes", + prefix.len() + ); + prefix }; // The prompt is just the current message with sender attribution. diff --git a/server/src/chat/transport/matrix/bot/run.rs b/server/src/chat/transport/matrix/bot/run.rs index 1b1fe990..0616840a 100644 --- a/server/src/chat/transport/matrix/bot/run.rs +++ b/server/src/chat/transport/matrix/bot/run.rs @@ -326,21 +326,49 @@ pub async fn run_bot( } // Subscribe to gateway-side status events and buffer compact audit lines for - // the LLM context. A separate resubscribed receiver is used so both the - // buffer task and the room-forwarder task receive every event independently. + // the LLM context. + // + // Investigation log (story 1078) — hypotheses ruled out: + // (A) gateway_event_rx is None: impossible — spawn_gateway_bot always passes + // Some(state.event_tx.clone()) in gateway mode (gateway/mod.rs:130). + // (B) recv() never returns: buf task uses the ORIGINAL event_rx (subscribed + // before Matrix init) so any events buffered during init are visible; + // future events arrive normally via the shared broadcast channel. + // (C) Different Arc: buf and ctx.pending_gateway_events are both clones of + // the same Arc>> — writes in the buf task are + // immediately visible to handle_message. + // (D) format_drained_events empty on non-empty input: the function is + // pure/tested; the drain slog in handle_message now makes the count + // observable so we can confirm it is non-zero when events arrive. + // + // Bug fixed here: previously the buffer task held `event_rx.resubscribe()`, + // which starts at the *current tail* (next unsent message) and silently + // discards every event that arrived during the Matrix login / room-join / + // cross-signing phase (~5–30 s window). The forwarder now gets the + // resubscribed receiver (only needs live events going forward); the buffer + // task holds the original `event_rx` so it drains the init-window backlog + // on first poll. let pending_gateway_events: Arc>> = Arc::new(TokioMutex::new(Vec::new())); let gateway_event_rx_for_forwarder = if let Some(event_rx) = gateway_event_rx { - // Buffer task: silently accumulate compact audit lines for Timmy's context. + // The forwarder only needs live (future) events — resubscribe is fine. + let forwarder_rx = event_rx.resubscribe(); + // Buffer task: hold the *original* receiver so init-window events are + // not lost. Silently accumulate compact audit lines for Timmy's context. { use crate::service::gateway::polling::format_gateway_audit_line; - let buf_rx = event_rx.resubscribe(); let buf = Arc::clone(&pending_gateway_events); + slog!("[matrix-bot] subscribed to gateway events; buffer task starting"); tokio::spawn(async move { - let mut rx = buf_rx; + let mut rx = event_rx; loop { match rx.recv().await { Ok(event) => { + slog!( + "[matrix-bot] buffered audit line for project={} id={}", + event.project, + event.event.timestamp_ms() + ); let line = format_gateway_audit_line(&event.project, &event.event); buf.lock().await.push(line); } @@ -352,7 +380,7 @@ pub async fn run_bot( } }); } - Some(event_rx) + Some(forwarder_rx) } else { None }; @@ -592,4 +620,89 @@ mod tests { assert_eq!(steps[2], 20); assert_eq!(steps[3], 40); } + + /// Regression test (story 1078): gateway broadcast events must reach + /// `pending_gateway_events` and produce an `audit ts=…` line in the + /// `format_drained_events` output that is prepended to Timmy's prompt. + /// + /// The test spins up a mock `event_tx` broadcaster, sends one + /// `StageTransition` event, lets the buffer task process it, drains the + /// buffer, and asserts the result contains the expected audit prefix. + #[tokio::test] + async fn gateway_buffer_task_injects_audit_line_into_context() { + use super::super::messages::format_drained_events; + use crate::service::events::StoredEvent; + use crate::service::gateway::GatewayStatusEvent; + use crate::service::gateway::polling::format_gateway_audit_line; + + let (event_tx, event_rx) = tokio::sync::broadcast::channel::(16); + + // pending_gateway_events shared between buffer task and drain site. + let pending: Arc>> = Arc::new(TokioMutex::new(Vec::new())); + + // Spawn a minimal buffer task — same logic as run_bot uses. + { + let buf = Arc::clone(&pending); + tokio::spawn(async move { + let mut rx = event_rx; + loop { + match rx.recv().await { + Ok(event) => { + let line = format_gateway_audit_line(&event.project, &event.event); + buf.lock().await.push(line); + } + Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {} + Err(tokio::sync::broadcast::error::RecvError::Closed) => break, + } + } + }); + } + + // Send one stage-transition event, as a project node would. + let evt = GatewayStatusEvent { + project: "huskies".to_string(), + event: StoredEvent::StageTransition { + story_id: "42_story_feat".to_string(), + story_name: String::new(), + from_stage: "2_current".to_string(), + to_stage: "3_qa".to_string(), + timestamp_ms: 1_000_000, + }, + }; + let receivers = event_tx.send(evt).unwrap_or(0); + assert!( + receivers > 0, + "event must have at least one active receiver" + ); + + // Wait for the buffer task to process the event. + let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2); + loop { + if !pending.lock().await.is_empty() { + break; + } + assert!( + std::time::Instant::now() < deadline, + "buffer task did not receive the event within 2 s" + ); + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + } + + // Drain and format — mirrors what handle_message does. + let lines: Vec = pending.lock().await.drain(..).collect(); + let prefix = format_drained_events(lines); + + assert!( + prefix.contains("audit ts="), + "prompt prefix must contain 'audit ts='; got: {prefix}" + ); + assert!( + prefix.contains("project=huskies"), + "prompt prefix must name the project; got: {prefix}" + ); + assert!( + prefix.starts_with("\n"), + "prefix must open with ; got: {prefix}" + ); + } }