diff --git a/server/src/chat/transport/matrix/bot/context.rs b/server/src/chat/transport/matrix/bot/context.rs index a3c29085..793ad356 100644 --- a/server/src/chat/transport/matrix/bot/context.rs +++ b/server/src/chat/transport/matrix/bot/context.rs @@ -51,6 +51,13 @@ pub struct BotContext { /// Used to proxy bot commands to the active project over WebSocket (`/ws`). /// Empty in standalone mode. pub gateway_project_urls: BTreeMap, + /// Pipeline transition events buffered since the last LLM turn. + /// + /// A background task appends one compact audit line per real stage + /// transition. `handle_message` drains this buffer and injects it as a + /// `` block at the head of the next user prompt so Timmy + /// sees pipeline activity without requiring a separate message. + pub pending_pipeline_events: Arc>>, } impl BotContext { @@ -236,6 +243,7 @@ mod tests { gateway_active_project, gateway_projects, gateway_project_urls, + pending_pipeline_events: Arc::new(TokioMutex::new(Vec::new())), } } diff --git a/server/src/chat/transport/matrix/bot/messages/handle_message.rs b/server/src/chat/transport/matrix/bot/messages/handle_message.rs index 8ef116cf..f42b8a71 100644 --- a/server/src/chat/transport/matrix/bot/messages/handle_message.rs +++ b/server/src/chat/transport/matrix/bot/messages/handle_message.rs @@ -30,6 +30,30 @@ pub(in crate::chat::transport::matrix::bot) async fn handle_message( guard.get(&room_id).and_then(|conv| conv.session_id.clone()) }; + // Drain any pipeline transition events buffered since the last LLM turn and + // prepend them as a passive block so Timmy sees pipeline + // activity without requiring a separate message. Capped at 20 lines to + // keep context size bounded. + const MAX_PIPELINE_EVENTS: usize = 20; + let system_reminder_prefix = { + let mut guard = ctx.pending_pipeline_events.lock().await; + if guard.is_empty() { + String::new() + } else { + let total = guard.len(); + let lines: Vec = guard.drain(..).collect(); + drop(guard); + let shown_count = total.min(MAX_PIPELINE_EVENTS); + let shown = lines[..shown_count].join("\n"); + let tail = if total > MAX_PIPELINE_EVENTS { + format!("\n...and {} more", total - MAX_PIPELINE_EVENTS) + } else { + String::new() + }; + format!("\n{shown}{tail}\n\n") + } + }; + // The prompt is just the current message with sender attribution. // Prior conversation context is carried by the Claude Code session. let bot_name = &ctx.services.bot_name; @@ -40,7 +64,7 @@ pub(in crate::chat::transport::matrix::bot) async fn handle_message( String::new() }; let prompt = format!( - "[Your name is {bot_name}. Refer to yourself as {bot_name}, not Claude.]\n{active_project_ctx}\n{}", + "{system_reminder_prefix}[Your name is {bot_name}. Refer to yourself as {bot_name}, not Claude.]\n{active_project_ctx}\n{}", format_user_prompt(&sender, &user_message) ); diff --git a/server/src/chat/transport/matrix/bot/run.rs b/server/src/chat/transport/matrix/bot/run.rs index c8746ce2..9cc43bbb 100644 --- a/server/src/chat/transport/matrix/bot/run.rs +++ b/server/src/chat/transport/matrix/bot/run.rs @@ -295,6 +295,34 @@ pub async fn run_bot( ); } + // Subscribe to pipeline stage transitions and buffer compact audit lines + // between Timmy's turns. Replay events (before == after stage label) are + // silently dropped — only real transitions are recorded. + let pending_pipeline_events: Arc>> = + Arc::new(TokioMutex::new(Vec::new())); + { + use crate::pipeline_state::{format_audit_entry, stage_label, subscribe_transitions}; + let mut rx = subscribe_transitions(); + let buf = Arc::clone(&pending_pipeline_events); + tokio::spawn(async move { + loop { + match rx.recv().await { + Ok(fired) => { + if stage_label(&fired.before) == stage_label(&fired.after) { + continue; + } + let line = format_audit_entry(&fired); + buf.lock().await.push(line); + } + Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { + slog!("[matrix-bot] pipeline event buffer lagged by {n} events"); + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => break, + } + } + }); + } + let ctx = BotContext { services, matrix_user_id: bot_user_id, @@ -309,6 +337,7 @@ pub async fn run_bot( gateway_active_project, gateway_projects, gateway_project_urls, + pending_pipeline_events, }; slog!(