huskies: merge 1127 story Migrate all LLM-invoking transports onto assemble_prompt_context; delete legacy Vec

This commit is contained in:
dave
2026-05-17 22:23:15 +00:00
parent c97b7c841f
commit fe00fe6a25
9 changed files with 94 additions and 286 deletions
+5 -174
View File
@@ -303,93 +303,11 @@ pub async fn run_bot(
);
}
// Subscribe to pipeline stage transitions and buffer compact audit lines
// between Timmy's turns. Replay events (before == after stage label) are
// silently dropped — only real transitions are recorded.
let pending_pipeline_events: Arc<TokioMutex<Vec<String>>> =
Arc::new(TokioMutex::new(Vec::new()));
{
use crate::pipeline_state::{format_audit_entry, stage_label, subscribe_transitions};
let mut rx = subscribe_transitions();
let buf = Arc::clone(&pending_pipeline_events);
tokio::spawn(async move {
loop {
match rx.recv().await {
Ok(fired) => {
if stage_label(&fired.before) == stage_label(&fired.after) {
continue;
}
let line = format_audit_entry(&fired);
buf.lock().await.push(line);
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
slog!("[matrix-bot] pipeline event buffer lagged by {n} events");
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
}
}
});
}
// Subscribe to gateway-side status events and buffer compact audit lines for
// the LLM context.
//
// Investigation log (story 1078) — hypotheses ruled out:
// (A) gateway_event_rx is None: impossible — spawn_gateway_bot always passes
// Some(state.event_tx.clone()) in gateway mode (gateway/mod.rs:130).
// (B) recv() never returns: buf task uses the ORIGINAL event_rx (subscribed
// before Matrix init) so any events buffered during init are visible;
// future events arrive normally via the shared broadcast channel.
// (C) Different Arc: buf and ctx.pending_gateway_events are both clones of
// the same Arc<TokioMutex<Vec<String>>> — writes in the buf task are
// immediately visible to handle_message.
// (D) format_drained_events empty on non-empty input: the function is
// pure/tested; the drain slog in handle_message now makes the count
// observable so we can confirm it is non-zero when events arrive.
//
// Bug fixed here: previously the buffer task held `event_rx.resubscribe()`,
// which starts at the *current tail* (next unsent message) and silently
// discards every event that arrived during the Matrix login / room-join /
// cross-signing phase (~530 s window). The forwarder now gets the
// resubscribed receiver (only needs live events going forward); the buffer
// task holds the original `event_rx` so it drains the init-window backlog
// on first poll.
let pending_gateway_events: Arc<TokioMutex<Vec<String>>> =
Arc::new(TokioMutex::new(Vec::new()));
let gateway_event_rx_for_forwarder = if let Some(event_rx) = gateway_event_rx {
// The forwarder only needs live (future) events — resubscribe is fine.
let forwarder_rx = event_rx.resubscribe();
// Buffer task: hold the *original* receiver so init-window events are
// not lost. Silently accumulate compact audit lines for Timmy's context.
{
use crate::service::gateway::polling::format_gateway_audit_line;
let buf = Arc::clone(&pending_gateway_events);
slog!("[matrix-bot] subscribed to gateway events; buffer task starting");
tokio::spawn(async move {
let mut rx = event_rx;
loop {
match rx.recv().await {
Ok(event) => {
slog!(
"[matrix-bot] buffered audit line for project={} id={}",
event.project,
event.event.timestamp_ms()
);
let line = format_gateway_audit_line(&event.project, &event.event);
buf.lock().await.push(line);
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
slog!("[matrix-bot] gateway event buffer lagged by {n} events");
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
}
}
});
}
Some(forwarder_rx)
} else {
None
};
// The forwarder only needs live (future) events — resubscribe is fine.
// Pipeline-transition context is now delivered to the LLM via
// `assemble_prompt_context` (CRDT event log) rather than these in-memory
// buffers, so the buffer tasks are gone; only the forwarder remains.
let gateway_event_rx_for_forwarder = gateway_event_rx.map(|rx| rx.resubscribe());
let ctx = BotContext {
services,
@@ -405,8 +323,6 @@ pub async fn run_bot(
gateway_active_project,
gateway_project_urls,
gateway_projects_store,
pending_pipeline_events,
pending_gateway_events,
handled_incoming_event_ids: Arc::new(TokioMutex::new(super::context::SeenEventIds::new(
super::context::SEEN_EVENT_IDS_CAP,
))),
@@ -626,89 +542,4 @@ mod tests {
assert_eq!(steps[2], 20);
assert_eq!(steps[3], 40);
}
/// Regression test (story 1078): gateway broadcast events must reach
/// `pending_gateway_events` and produce an `audit ts=…` line in the
/// `format_drained_events` output that is prepended to Timmy's prompt.
///
/// The test spins up a mock `event_tx` broadcaster, sends one
/// `StageTransition` event, lets the buffer task process it, drains the
/// buffer, and asserts the result contains the expected audit prefix.
#[tokio::test]
async fn gateway_buffer_task_injects_audit_line_into_context() {
use super::super::messages::format_drained_events;
use crate::service::events::StoredEvent;
use crate::service::gateway::GatewayStatusEvent;
use crate::service::gateway::polling::format_gateway_audit_line;
let (event_tx, event_rx) = tokio::sync::broadcast::channel::<GatewayStatusEvent>(16);
// pending_gateway_events shared between buffer task and drain site.
let pending: Arc<TokioMutex<Vec<String>>> = Arc::new(TokioMutex::new(Vec::new()));
// Spawn a minimal buffer task — same logic as run_bot uses.
{
let buf = Arc::clone(&pending);
tokio::spawn(async move {
let mut rx = event_rx;
loop {
match rx.recv().await {
Ok(event) => {
let line = format_gateway_audit_line(&event.project, &event.event);
buf.lock().await.push(line);
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {}
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
}
}
});
}
// Send one stage-transition event, as a project node would.
let evt = GatewayStatusEvent {
project: "huskies".to_string(),
event: StoredEvent::StageTransition {
story_id: "42_story_feat".to_string(),
story_name: String::new(),
from_stage: "2_current".to_string(),
to_stage: "3_qa".to_string(),
timestamp_ms: 1_000_000,
},
};
let receivers = event_tx.send(evt).unwrap_or(0);
assert!(
receivers > 0,
"event must have at least one active receiver"
);
// Wait for the buffer task to process the event.
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2);
loop {
if !pending.lock().await.is_empty() {
break;
}
assert!(
std::time::Instant::now() < deadline,
"buffer task did not receive the event within 2 s"
);
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}
// Drain and format — mirrors what handle_message does.
let lines: Vec<String> = pending.lock().await.drain(..).collect();
let prefix = format_drained_events(lines);
assert!(
prefix.contains("audit ts="),
"prompt prefix must contain 'audit ts='; got: {prefix}"
);
assert!(
prefix.contains("project=huskies"),
"prompt prefix must name the project; got: {prefix}"
);
assert!(
prefix.starts_with("<system-reminder>\n"),
"prefix must open with <system-reminder>; got: {prefix}"
);
}
}