diff --git a/server/src/chat/transport/discord/commands.rs b/server/src/chat/transport/discord/commands.rs index 0b2d4d41..dc5a3d0a 100644 --- a/server/src/chat/transport/discord/commands.rs +++ b/server/src/chat/transport/discord/commands.rs @@ -300,6 +300,20 @@ pub(super) async fn handle_incoming_message( handle_llm_message(ctx, channel, user, message).await; } +/// Build the prompt for a Discord LLM turn, prepending any pending +/// CRDT pipeline-transition events as a `` block. +fn build_discord_llm_prompt( + session_id: &str, + bot_name: &str, + user: &str, + user_message: &str, +) -> String { + let event_ctx = crate::llm_session::assemble_prompt_context(session_id); + format!( + "{event_ctx}[Your name is {bot_name}. Refer to yourself as {bot_name}, not Claude.]\n\n{user}: {user_message}" + ) +} + /// Forward a message to Claude Code and send the response back via Discord. async fn handle_llm_message(ctx: &DiscordContext, channel: &str, user: &str, user_message: &str) { use crate::chat::util::drain_complete_paragraphs; @@ -314,8 +328,11 @@ async fn handle_llm_message(ctx: &DiscordContext, channel: &str, user: &str, use }; let bot_name = &ctx.services.bot_name; - let prompt = format!( - "[Your name is {bot_name}. Refer to yourself as {bot_name}, not Claude.]\n\n{user}: {user_message}" + let prompt = build_discord_llm_prompt( + resume_session_id.as_deref().unwrap_or(channel), + bot_name, + user, + user_message, ); let provider = ClaudeCodeProvider::new(); @@ -604,4 +621,40 @@ mod tests { assert!(conv.session_id.is_none(), "session_id should be cleared"); assert!(conv.entries.is_empty(), "entries should be cleared"); } + + /// AC 4: fire a `TransitionFired` event, simulate a Discord user turn, and + /// assert the assembled prompt contains the event (end-to-end non-Matrix test). + #[test] + fn discord_prompt_includes_transition_event() { + use crate::pipeline_state::{PipelineEvent, PlanState, Stage, StoryId, TransitionFired}; + crate::crdt_state::init_for_test(); + + crate::event_log::log_transition_event(&TransitionFired { + story_id: StoryId("77_discord_test".to_string()), + before: Stage::Backlog, + after: Stage::Coding { + claim: None, + plan: PlanState::Missing, + retries: 0, + }, + event: PipelineEvent::DepsMet, + at: chrono::Utc::now(), + }); + + let prompt = + build_discord_llm_prompt("discord-ch-test", "Timmy", "@alice", "what is the status?"); + + assert!( + prompt.contains(""), + "assembled prompt must include system-reminder block; got: {prompt}" + ); + assert!( + prompt.contains("77_discord_test"), + "assembled prompt must contain story id; got: {prompt}" + ); + assert!( + prompt.contains("what is the status?"), + "assembled prompt must contain user message; got: {prompt}" + ); + } } diff --git a/server/src/chat/transport/matrix/bot/context.rs b/server/src/chat/transport/matrix/bot/context.rs index f26ee918..29822293 100644 --- a/server/src/chat/transport/matrix/bot/context.rs +++ b/server/src/chat/transport/matrix/bot/context.rs @@ -97,20 +97,6 @@ pub struct BotContext { /// The `new project` command writes here so HTTP handlers see the new entry /// immediately without requiring a gateway restart. `None` in standalone mode. pub gateway_projects_store: Option>>>, - /// Pipeline transition events buffered since the last LLM turn. - /// - /// A background task appends one compact audit line per real stage - /// transition. `handle_message` drains this buffer and injects it as a - /// `` block at the head of the next user prompt so Timmy - /// sees pipeline activity without requiring a separate message. - pub pending_pipeline_events: Arc>>, - /// Gateway aggregate transition events buffered since the last LLM turn. - /// - /// In gateway mode a background task appends one compact audit line per - /// `GatewayStatusEvent` received from the gateway broadcaster. Drained - /// alongside `pending_pipeline_events` on each user message. Always - /// empty in standalone (non-gateway) mode. - pub pending_gateway_events: Arc>>, /// Bounded FIFO set of already-handled incoming event IDs. /// /// The Matrix sync loop can replay events on reconnect. This set ensures @@ -302,8 +288,6 @@ mod tests { gateway_active_project, gateway_project_urls, gateway_projects_store: None, - pending_pipeline_events: Arc::new(TokioMutex::new(Vec::new())), - pending_gateway_events: Arc::new(TokioMutex::new(Vec::new())), handled_incoming_event_ids: Arc::new(TokioMutex::new(SeenEventIds::new( SEEN_EVENT_IDS_CAP, ))), diff --git a/server/src/chat/transport/matrix/bot/messages/handle_message.rs b/server/src/chat/transport/matrix/bot/messages/handle_message.rs index 0010d83b..c04d8514 100644 --- a/server/src/chat/transport/matrix/bot/messages/handle_message.rs +++ b/server/src/chat/transport/matrix/bot/messages/handle_message.rs @@ -13,7 +13,7 @@ use super::super::context::BotContext; use super::super::format::markdown_to_html; use super::super::history::{ConversationEntry, ConversationRole, save_history}; -use super::{format_drained_events, format_user_prompt}; +use super::format_user_prompt; pub(in crate::chat::transport::matrix::bot) async fn handle_message( room_id_str: String, @@ -31,29 +31,6 @@ pub(in crate::chat::transport::matrix::bot) async fn handle_message( guard.get(&room_id).and_then(|conv| conv.session_id.clone()) }; - // Drain pipeline and gateway transition events buffered since the last LLM - // turn and prepend them as a passive block so Timmy sees - // pipeline activity without requiring a separate message. Sled events come - // from `pending_pipeline_events`; gateway events from `pending_gateway_events`. - // In practice only one buffer is non-empty (sled mode vs gateway mode). - let system_reminder_prefix = { - let mut sled_guard = ctx.pending_pipeline_events.lock().await; - let mut gtw_guard = ctx.pending_gateway_events.lock().await; - let all_lines: Vec = sled_guard.drain(..).chain(gtw_guard.drain(..)).collect(); - drop(sled_guard); - drop(gtw_guard); - slog!( - "[matrix-bot] drained {} gateway audit lines for LLM context", - all_lines.len() - ); - let prefix = format_drained_events(all_lines); - slog!( - "[matrix-bot] format_drained_events output: {} bytes", - prefix.len() - ); - prefix - }; - // Pull new pipeline-transition events from the CRDT event log for this // session and atomically advance the high-water marks so the same events // are not re-injected on the next turn. @@ -69,7 +46,7 @@ pub(in crate::chat::transport::matrix::bot) async fn handle_message( String::new() }; let prompt = format!( - "{system_reminder_prefix}{event_log_ctx}[Your name is {bot_name}. Refer to yourself as {bot_name}, not Claude.]\n{active_project_ctx}\n{}", + "{event_log_ctx}[Your name is {bot_name}. Refer to yourself as {bot_name}, not Claude.]\n{active_project_ctx}\n{}", format_user_prompt(&sender, &user_message) ); diff --git a/server/src/chat/transport/matrix/bot/messages/mod.rs b/server/src/chat/transport/matrix/bot/messages/mod.rs index e3509efa..a3f92049 100644 --- a/server/src/chat/transport/matrix/bot/messages/mod.rs +++ b/server/src/chat/transport/matrix/bot/messages/mod.rs @@ -11,27 +11,6 @@ pub(super) fn format_user_prompt(sender: &str, message: &str) -> String { format!("{sender}: {message}") } -/// Drain `lines` into a `` block for injection at the head of -/// the next LLM prompt. Returns an empty string when `lines` is empty. -/// -/// At most 20 lines are shown verbatim; excess lines are replaced with a -/// `…and N more` indicator to keep context size bounded. -pub(in crate::chat::transport::matrix::bot) fn format_drained_events(lines: Vec) -> String { - if lines.is_empty() { - return String::new(); - } - const MAX_PIPELINE_EVENTS: usize = 20; - let total = lines.len(); - let shown_count = total.min(MAX_PIPELINE_EVENTS); - let shown = lines[..shown_count].join("\n"); - let tail = if total > MAX_PIPELINE_EVENTS { - format!("\n...and {} more", total - MAX_PIPELINE_EVENTS) - } else { - String::new() - }; - format!("\n{shown}{tail}\n\n") -} - /// Matrix event handler for room messages. Each invocation spawns an #[cfg(test)] mod tests { @@ -72,49 +51,6 @@ mod tests { assert!(crate::llm::oauth::extract_login_url_from_error(err).is_none()); } - // -- format_drained_events ---------------------------------------------- - - #[test] - fn format_drained_events_empty_returns_empty_string() { - assert_eq!(format_drained_events(vec![]), String::new()); - } - - #[test] - fn format_drained_events_wraps_in_system_reminder() { - let result = format_drained_events(vec!["audit ts=2026 id=1 event=x".to_string()]); - assert!(result.starts_with("\n"), "got: {result}"); - assert!(result.ends_with("\n"), "got: {result}"); - assert!( - result.contains("audit ts=2026 id=1 event=x"), - "got: {result}" - ); - } - - #[test] - fn format_drained_events_caps_at_20_with_overflow_indicator() { - let lines: Vec = (0..25).map(|i| format!("line {i}")).collect(); - let result = format_drained_events(lines); - assert!(result.contains("...and 5 more"), "got: {result}"); - assert!( - result.contains("line 19"), - "last shown line missing; got: {result}" - ); - assert!( - !result.contains("line 20"), - "line 21 must be hidden; got: {result}" - ); - } - - #[test] - fn format_drained_events_exactly_20_no_overflow_indicator() { - let lines: Vec = (0..20).map(|i| format!("line {i}")).collect(); - let result = format_drained_events(lines); - assert!( - !result.contains("...and"), - "must not show overflow when exactly 20; got: {result}" - ); - } - // -- bot_name / system prompt ------------------------------------------- #[test] diff --git a/server/src/chat/transport/matrix/bot/run.rs b/server/src/chat/transport/matrix/bot/run.rs index 8dc3a58a..32c5253c 100644 --- a/server/src/chat/transport/matrix/bot/run.rs +++ b/server/src/chat/transport/matrix/bot/run.rs @@ -303,93 +303,11 @@ pub async fn run_bot( ); } - // Subscribe to pipeline stage transitions and buffer compact audit lines - // between Timmy's turns. Replay events (before == after stage label) are - // silently dropped — only real transitions are recorded. - let pending_pipeline_events: Arc>> = - Arc::new(TokioMutex::new(Vec::new())); - { - use crate::pipeline_state::{format_audit_entry, stage_label, subscribe_transitions}; - let mut rx = subscribe_transitions(); - let buf = Arc::clone(&pending_pipeline_events); - tokio::spawn(async move { - loop { - match rx.recv().await { - Ok(fired) => { - if stage_label(&fired.before) == stage_label(&fired.after) { - continue; - } - let line = format_audit_entry(&fired); - buf.lock().await.push(line); - } - Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { - slog!("[matrix-bot] pipeline event buffer lagged by {n} events"); - } - Err(tokio::sync::broadcast::error::RecvError::Closed) => break, - } - } - }); - } - - // Subscribe to gateway-side status events and buffer compact audit lines for - // the LLM context. - // - // Investigation log (story 1078) — hypotheses ruled out: - // (A) gateway_event_rx is None: impossible — spawn_gateway_bot always passes - // Some(state.event_tx.clone()) in gateway mode (gateway/mod.rs:130). - // (B) recv() never returns: buf task uses the ORIGINAL event_rx (subscribed - // before Matrix init) so any events buffered during init are visible; - // future events arrive normally via the shared broadcast channel. - // (C) Different Arc: buf and ctx.pending_gateway_events are both clones of - // the same Arc>> — writes in the buf task are - // immediately visible to handle_message. - // (D) format_drained_events empty on non-empty input: the function is - // pure/tested; the drain slog in handle_message now makes the count - // observable so we can confirm it is non-zero when events arrive. - // - // Bug fixed here: previously the buffer task held `event_rx.resubscribe()`, - // which starts at the *current tail* (next unsent message) and silently - // discards every event that arrived during the Matrix login / room-join / - // cross-signing phase (~5–30 s window). The forwarder now gets the - // resubscribed receiver (only needs live events going forward); the buffer - // task holds the original `event_rx` so it drains the init-window backlog - // on first poll. - let pending_gateway_events: Arc>> = - Arc::new(TokioMutex::new(Vec::new())); - let gateway_event_rx_for_forwarder = if let Some(event_rx) = gateway_event_rx { - // The forwarder only needs live (future) events — resubscribe is fine. - let forwarder_rx = event_rx.resubscribe(); - // Buffer task: hold the *original* receiver so init-window events are - // not lost. Silently accumulate compact audit lines for Timmy's context. - { - use crate::service::gateway::polling::format_gateway_audit_line; - let buf = Arc::clone(&pending_gateway_events); - slog!("[matrix-bot] subscribed to gateway events; buffer task starting"); - tokio::spawn(async move { - let mut rx = event_rx; - loop { - match rx.recv().await { - Ok(event) => { - slog!( - "[matrix-bot] buffered audit line for project={} id={}", - event.project, - event.event.timestamp_ms() - ); - let line = format_gateway_audit_line(&event.project, &event.event); - buf.lock().await.push(line); - } - Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { - slog!("[matrix-bot] gateway event buffer lagged by {n} events"); - } - Err(tokio::sync::broadcast::error::RecvError::Closed) => break, - } - } - }); - } - Some(forwarder_rx) - } else { - None - }; + // The forwarder only needs live (future) events — resubscribe is fine. + // Pipeline-transition context is now delivered to the LLM via + // `assemble_prompt_context` (CRDT event log) rather than these in-memory + // buffers, so the buffer tasks are gone; only the forwarder remains. + let gateway_event_rx_for_forwarder = gateway_event_rx.map(|rx| rx.resubscribe()); let ctx = BotContext { services, @@ -405,8 +323,6 @@ pub async fn run_bot( gateway_active_project, gateway_project_urls, gateway_projects_store, - pending_pipeline_events, - pending_gateway_events, handled_incoming_event_ids: Arc::new(TokioMutex::new(super::context::SeenEventIds::new( super::context::SEEN_EVENT_IDS_CAP, ))), @@ -626,89 +542,4 @@ mod tests { assert_eq!(steps[2], 20); assert_eq!(steps[3], 40); } - - /// Regression test (story 1078): gateway broadcast events must reach - /// `pending_gateway_events` and produce an `audit ts=…` line in the - /// `format_drained_events` output that is prepended to Timmy's prompt. - /// - /// The test spins up a mock `event_tx` broadcaster, sends one - /// `StageTransition` event, lets the buffer task process it, drains the - /// buffer, and asserts the result contains the expected audit prefix. - #[tokio::test] - async fn gateway_buffer_task_injects_audit_line_into_context() { - use super::super::messages::format_drained_events; - use crate::service::events::StoredEvent; - use crate::service::gateway::GatewayStatusEvent; - use crate::service::gateway::polling::format_gateway_audit_line; - - let (event_tx, event_rx) = tokio::sync::broadcast::channel::(16); - - // pending_gateway_events shared between buffer task and drain site. - let pending: Arc>> = Arc::new(TokioMutex::new(Vec::new())); - - // Spawn a minimal buffer task — same logic as run_bot uses. - { - let buf = Arc::clone(&pending); - tokio::spawn(async move { - let mut rx = event_rx; - loop { - match rx.recv().await { - Ok(event) => { - let line = format_gateway_audit_line(&event.project, &event.event); - buf.lock().await.push(line); - } - Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {} - Err(tokio::sync::broadcast::error::RecvError::Closed) => break, - } - } - }); - } - - // Send one stage-transition event, as a project node would. - let evt = GatewayStatusEvent { - project: "huskies".to_string(), - event: StoredEvent::StageTransition { - story_id: "42_story_feat".to_string(), - story_name: String::new(), - from_stage: "2_current".to_string(), - to_stage: "3_qa".to_string(), - timestamp_ms: 1_000_000, - }, - }; - let receivers = event_tx.send(evt).unwrap_or(0); - assert!( - receivers > 0, - "event must have at least one active receiver" - ); - - // Wait for the buffer task to process the event. - let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2); - loop { - if !pending.lock().await.is_empty() { - break; - } - assert!( - std::time::Instant::now() < deadline, - "buffer task did not receive the event within 2 s" - ); - tokio::time::sleep(std::time::Duration::from_millis(10)).await; - } - - // Drain and format — mirrors what handle_message does. - let lines: Vec = pending.lock().await.drain(..).collect(); - let prefix = format_drained_events(lines); - - assert!( - prefix.contains("audit ts="), - "prompt prefix must contain 'audit ts='; got: {prefix}" - ); - assert!( - prefix.contains("project=huskies"), - "prompt prefix must name the project; got: {prefix}" - ); - assert!( - prefix.starts_with("\n"), - "prefix must open with ; got: {prefix}" - ); - } } diff --git a/server/src/chat/transport/slack/commands/llm.rs b/server/src/chat/transport/slack/commands/llm.rs index 32c2a5c8..d10834dc 100644 --- a/server/src/chat/transport/slack/commands/llm.rs +++ b/server/src/chat/transport/slack/commands/llm.rs @@ -29,8 +29,11 @@ pub(super) async fn handle_llm_message( }; let bot_name = &ctx.services.bot_name; + let event_ctx = crate::llm_session::assemble_prompt_context( + resume_session_id.as_deref().unwrap_or(channel), + ); let prompt = format!( - "[Your name is {bot_name}. Refer to yourself as {bot_name}, not Claude.]\n\n{user}: {user_message}" + "{event_ctx}[Your name is {bot_name}. Refer to yourself as {bot_name}, not Claude.]\n\n{user}: {user_message}" ); let provider = ClaudeCodeProvider::new(); diff --git a/server/src/chat/transport/whatsapp/commands/llm.rs b/server/src/chat/transport/whatsapp/commands/llm.rs index 76973ba6..6f43b7be 100644 --- a/server/src/chat/transport/whatsapp/commands/llm.rs +++ b/server/src/chat/transport/whatsapp/commands/llm.rs @@ -27,8 +27,10 @@ pub(super) async fn handle_llm_message( }; let bot_name = &ctx.services.bot_name; + let event_ctx = + crate::llm_session::assemble_prompt_context(resume_session_id.as_deref().unwrap_or(sender)); let prompt = format!( - "[Your name is {bot_name}. Refer to yourself as {bot_name}, not Claude.]\n\n{sender}: {user_message}" + "{event_ctx}[Your name is {bot_name}. Refer to yourself as {bot_name}, not Claude.]\n\n{sender}: {user_message}" ); let provider = ClaudeCodeProvider::new(); diff --git a/server/src/llm/chat/run.rs b/server/src/llm/chat/run.rs index 8c8dc2c6..9de4d72f 100644 --- a/server/src/llm/chat/run.rs +++ b/server/src/llm/chat/run.rs @@ -139,6 +139,14 @@ where let received_at = Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string(); inject_received_at(&mut messages, &received_at); + // Assemble CRDT pipeline-transition events once per turn and advance the + // high-water mark. Uses the Claude Code session_id when available so the + // same event stream key is used for resumable sessions; falls back to + // "web-ui" for Anthropic/Ollama turns which have no persistent session. + let event_ctx = crate::llm_session::assemble_prompt_context( + config.session_id.as_deref().unwrap_or("web-ui"), + ); + let _ = state.cancel_tx.send(false); let mut cancel_rx = state.cancel_rx.clone(); cancel_rx.borrow_and_update(); @@ -177,10 +185,14 @@ where // would be lost because Claude Code only receives a single prompt // string. In that case, prepend the conversation history so the LLM // retains full context even though the session cannot be resumed. + // In both cases, prepend any pending CRDT pipeline-transition events. let user_message = if config.session_id.is_some() { - latest_user_content + format!("{event_ctx}{latest_user_content}") } else { - build_claude_code_context_prompt(&messages, &latest_user_content) + format!( + "{event_ctx}{}", + build_claude_code_context_prompt(&messages, &latest_user_content) + ) }; let project_root = state @@ -233,6 +245,14 @@ where &[] }; + // Prepend pipeline-transition events to the last user message so Anthropic + // and Ollama providers also receive the CRDT context on every turn. + if !event_ctx.is_empty() + && let Some(msg) = messages.iter_mut().rev().find(|m| m.role == Role::User) + { + msg.content = format!("{event_ctx}{}", msg.content); + } + let mut current_history = messages.clone(); // Build the system prompt — append onboarding instructions when the diff --git a/server/src/service/gateway/polling.rs b/server/src/service/gateway/polling.rs index 5dfac222..a963b06a 100644 --- a/server/src/service/gateway/polling.rs +++ b/server/src/service/gateway/polling.rs @@ -4,7 +4,7 @@ //! with `[project-name]` prefixes. The actual I/O (HTTP polling, spawning //! tasks, sending messages) lives in `io.rs`. -use crate::pipeline_state::{Stage, stage_label}; +use crate::pipeline_state::Stage; use crate::service::events::StoredEvent; use crate::service::notifications::{ format_blocked_notification, format_error_notification, format_stage_notification, @@ -56,7 +56,9 @@ pub fn format_gateway_event(project_name: &str, event: &StoredEvent) -> (String, /// /// Produces a structured one-line entry with stable `key=value` fields, including /// the project name, mirroring the sled-side `format_audit_entry` format. +#[cfg(test)] pub fn format_gateway_audit_line(project: &str, event: &StoredEvent) -> String { + use crate::pipeline_state::stage_label; let ts_ms = event.timestamp_ms(); let ts = chrono::DateTime::from_timestamp_millis(ts_ms as i64) .unwrap_or_else(chrono::Utc::now)