huskies: merge 1078

This commit is contained in:
dave
2026-05-15 01:27:30 +00:00
parent a06bf6778b
commit 56179d712e
2 changed files with 129 additions and 7 deletions
@@ -41,7 +41,16 @@ pub(in crate::chat::transport::matrix::bot) async fn handle_message(
let all_lines: Vec<String> = sled_guard.drain(..).chain(gtw_guard.drain(..)).collect(); let all_lines: Vec<String> = sled_guard.drain(..).chain(gtw_guard.drain(..)).collect();
drop(sled_guard); drop(sled_guard);
drop(gtw_guard); drop(gtw_guard);
format_drained_events(all_lines) slog!(
"[matrix-bot] drained {} gateway audit lines for LLM context",
all_lines.len()
);
let prefix = format_drained_events(all_lines);
slog!(
"[matrix-bot] format_drained_events output: {} bytes",
prefix.len()
);
prefix
}; };
// The prompt is just the current message with sender attribution. // The prompt is just the current message with sender attribution.
+119 -6
View File
@@ -326,21 +326,49 @@ pub async fn run_bot(
} }
// Subscribe to gateway-side status events and buffer compact audit lines for // Subscribe to gateway-side status events and buffer compact audit lines for
// the LLM context. A separate resubscribed receiver is used so both the // the LLM context.
// buffer task and the room-forwarder task receive every event independently. //
// Investigation log (story 1078) — hypotheses ruled out:
// (A) gateway_event_rx is None: impossible — spawn_gateway_bot always passes
// Some(state.event_tx.clone()) in gateway mode (gateway/mod.rs:130).
// (B) recv() never returns: buf task uses the ORIGINAL event_rx (subscribed
// before Matrix init) so any events buffered during init are visible;
// future events arrive normally via the shared broadcast channel.
// (C) Different Arc: buf and ctx.pending_gateway_events are both clones of
// the same Arc<TokioMutex<Vec<String>>> — writes in the buf task are
// immediately visible to handle_message.
// (D) format_drained_events empty on non-empty input: the function is
// pure/tested; the drain slog in handle_message now makes the count
// observable so we can confirm it is non-zero when events arrive.
//
// Bug fixed here: previously the buffer task held `event_rx.resubscribe()`,
// which starts at the *current tail* (next unsent message) and silently
// discards every event that arrived during the Matrix login / room-join /
// cross-signing phase (~530 s window). The forwarder now gets the
// resubscribed receiver (only needs live events going forward); the buffer
// task holds the original `event_rx` so it drains the init-window backlog
// on first poll.
let pending_gateway_events: Arc<TokioMutex<Vec<String>>> = let pending_gateway_events: Arc<TokioMutex<Vec<String>>> =
Arc::new(TokioMutex::new(Vec::new())); Arc::new(TokioMutex::new(Vec::new()));
let gateway_event_rx_for_forwarder = if let Some(event_rx) = gateway_event_rx { let gateway_event_rx_for_forwarder = if let Some(event_rx) = gateway_event_rx {
// Buffer task: silently accumulate compact audit lines for Timmy's context. // The forwarder only needs live (future) events — resubscribe is fine.
let forwarder_rx = event_rx.resubscribe();
// Buffer task: hold the *original* receiver so init-window events are
// not lost. Silently accumulate compact audit lines for Timmy's context.
{ {
use crate::service::gateway::polling::format_gateway_audit_line; use crate::service::gateway::polling::format_gateway_audit_line;
let buf_rx = event_rx.resubscribe();
let buf = Arc::clone(&pending_gateway_events); let buf = Arc::clone(&pending_gateway_events);
slog!("[matrix-bot] subscribed to gateway events; buffer task starting");
tokio::spawn(async move { tokio::spawn(async move {
let mut rx = buf_rx; let mut rx = event_rx;
loop { loop {
match rx.recv().await { match rx.recv().await {
Ok(event) => { Ok(event) => {
slog!(
"[matrix-bot] buffered audit line for project={} id={}",
event.project,
event.event.timestamp_ms()
);
let line = format_gateway_audit_line(&event.project, &event.event); let line = format_gateway_audit_line(&event.project, &event.event);
buf.lock().await.push(line); buf.lock().await.push(line);
} }
@@ -352,7 +380,7 @@ pub async fn run_bot(
} }
}); });
} }
Some(event_rx) Some(forwarder_rx)
} else { } else {
None None
}; };
@@ -592,4 +620,89 @@ mod tests {
assert_eq!(steps[2], 20); assert_eq!(steps[2], 20);
assert_eq!(steps[3], 40); assert_eq!(steps[3], 40);
} }
/// Regression test (story 1078): gateway broadcast events must reach
/// `pending_gateway_events` and produce an `audit ts=…` line in the
/// `format_drained_events` output that is prepended to Timmy's prompt.
///
/// The test spins up a mock `event_tx` broadcaster, sends one
/// `StageTransition` event, lets the buffer task process it, drains the
/// buffer, and asserts the result contains the expected audit prefix.
#[tokio::test]
async fn gateway_buffer_task_injects_audit_line_into_context() {
use super::super::messages::format_drained_events;
use crate::service::events::StoredEvent;
use crate::service::gateway::GatewayStatusEvent;
use crate::service::gateway::polling::format_gateway_audit_line;
let (event_tx, event_rx) = tokio::sync::broadcast::channel::<GatewayStatusEvent>(16);
// pending_gateway_events shared between buffer task and drain site.
let pending: Arc<TokioMutex<Vec<String>>> = Arc::new(TokioMutex::new(Vec::new()));
// Spawn a minimal buffer task — same logic as run_bot uses.
{
let buf = Arc::clone(&pending);
tokio::spawn(async move {
let mut rx = event_rx;
loop {
match rx.recv().await {
Ok(event) => {
let line = format_gateway_audit_line(&event.project, &event.event);
buf.lock().await.push(line);
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {}
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
}
}
});
}
// Send one stage-transition event, as a project node would.
let evt = GatewayStatusEvent {
project: "huskies".to_string(),
event: StoredEvent::StageTransition {
story_id: "42_story_feat".to_string(),
story_name: String::new(),
from_stage: "2_current".to_string(),
to_stage: "3_qa".to_string(),
timestamp_ms: 1_000_000,
},
};
let receivers = event_tx.send(evt).unwrap_or(0);
assert!(
receivers > 0,
"event must have at least one active receiver"
);
// Wait for the buffer task to process the event.
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2);
loop {
if !pending.lock().await.is_empty() {
break;
}
assert!(
std::time::Instant::now() < deadline,
"buffer task did not receive the event within 2 s"
);
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}
// Drain and format — mirrors what handle_message does.
let lines: Vec<String> = pending.lock().await.drain(..).collect();
let prefix = format_drained_events(lines);
assert!(
prefix.contains("audit ts="),
"prompt prefix must contain 'audit ts='; got: {prefix}"
);
assert!(
prefix.contains("project=huskies"),
"prompt prefix must name the project; got: {prefix}"
);
assert!(
prefix.starts_with("<system-reminder>\n"),
"prefix must open with <system-reminder>; got: {prefix}"
);
}
} }