From fb4e52dd09714cf399f1343ac89089e33a83b045 Mon Sep 17 00:00:00 2001 From: dave Date: Mon, 18 May 2026 16:47:31 +0000 Subject: [PATCH] =?UTF-8?q?huskies:=20merge=201143=20story=20Decouple=20LL?= =?UTF-8?q?M=20environmental=20awareness=20from=20chat=20transport=20?= =?UTF-8?q?=E2=80=94=20persona-keyed=20sessions=20and=20a=20real-time=20ev?= =?UTF-8?q?ent=20subscription?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/src/chat/transport/discord/commands.rs | 12 +-- .../matrix/bot/messages/handle_message.rs | 9 +- .../src/chat/transport/slack/commands/llm.rs | 5 +- .../chat/transport/whatsapp/commands/llm.rs | 4 +- .../src/crdt_state/lww_maps/llm_sessions.rs | 60 +++++------ server/src/event_log/mod.rs | 10 ++ server/src/http/ws/mod.rs | 10 ++ server/src/llm/chat/run.rs | 15 +-- server/src/llm_session/mod.rs | 100 ++++++++++++++---- server/src/main.rs | 7 ++ server/src/pipeline_event_bus.rs | 84 +++++++++++++++ server/src/service/ws/io.rs | 28 +++++ server/src/service/ws/message/response.rs | 7 ++ server/src/service/ws/mod.rs | 3 +- server/src/startup/tick_loop.rs | 4 + 15 files changed, 281 insertions(+), 77 deletions(-) create mode 100644 server/src/pipeline_event_bus.rs diff --git a/server/src/chat/transport/discord/commands.rs b/server/src/chat/transport/discord/commands.rs index dc5a3d0a..621276e0 100644 --- a/server/src/chat/transport/discord/commands.rs +++ b/server/src/chat/transport/discord/commands.rs @@ -303,12 +303,12 @@ pub(super) async fn handle_incoming_message( /// Build the prompt for a Discord LLM turn, prepending any pending /// CRDT pipeline-transition events as a `` block. fn build_discord_llm_prompt( - session_id: &str, + persona: &str, bot_name: &str, user: &str, user_message: &str, ) -> String { - let event_ctx = crate::llm_session::assemble_prompt_context(session_id); + let event_ctx = crate::llm_session::assemble_prompt_context(persona); format!( "{event_ctx}[Your name is {bot_name}. Refer to yourself as {bot_name}, not Claude.]\n\n{user}: {user_message}" ) @@ -328,12 +328,8 @@ async fn handle_llm_message(ctx: &DiscordContext, channel: &str, user: &str, use }; let bot_name = &ctx.services.bot_name; - let prompt = build_discord_llm_prompt( - resume_session_id.as_deref().unwrap_or(channel), - bot_name, - user, - user_message, - ); + let persona = bot_name.to_lowercase(); + let prompt = build_discord_llm_prompt(&persona, bot_name, user, user_message); let provider = ClaudeCodeProvider::new(); let (_cancel_tx, mut cancel_rx) = watch::channel(false); diff --git a/server/src/chat/transport/matrix/bot/messages/handle_message.rs b/server/src/chat/transport/matrix/bot/messages/handle_message.rs index c04d8514..aedf4659 100644 --- a/server/src/chat/transport/matrix/bot/messages/handle_message.rs +++ b/server/src/chat/transport/matrix/bot/messages/handle_message.rs @@ -32,9 +32,12 @@ pub(in crate::chat::transport::matrix::bot) async fn handle_message( }; // Pull new pipeline-transition events from the CRDT event log for this - // session and atomically advance the high-water marks so the same events - // are not re-injected on the next turn. - let event_log_ctx = crate::llm_session::assemble_prompt_context(&room_id_str); + // persona and atomically advance the high-water marks so the same events + // are not re-injected on the next turn. All transports share the same + // persona key so events are visible regardless of which transport handles + // the next turn. + let persona = ctx.services.bot_name.to_lowercase(); + let event_log_ctx = crate::llm_session::assemble_prompt_context(&persona); // The prompt is just the current message with sender attribution. // Prior conversation context is carried by the Claude Code session. diff --git a/server/src/chat/transport/slack/commands/llm.rs b/server/src/chat/transport/slack/commands/llm.rs index d10834dc..d234ed26 100644 --- a/server/src/chat/transport/slack/commands/llm.rs +++ b/server/src/chat/transport/slack/commands/llm.rs @@ -29,9 +29,8 @@ pub(super) async fn handle_llm_message( }; let bot_name = &ctx.services.bot_name; - let event_ctx = crate::llm_session::assemble_prompt_context( - resume_session_id.as_deref().unwrap_or(channel), - ); + let persona = bot_name.to_lowercase(); + let event_ctx = crate::llm_session::assemble_prompt_context(&persona); let prompt = format!( "{event_ctx}[Your name is {bot_name}. Refer to yourself as {bot_name}, not Claude.]\n\n{user}: {user_message}" ); diff --git a/server/src/chat/transport/whatsapp/commands/llm.rs b/server/src/chat/transport/whatsapp/commands/llm.rs index 6f43b7be..5a88c096 100644 --- a/server/src/chat/transport/whatsapp/commands/llm.rs +++ b/server/src/chat/transport/whatsapp/commands/llm.rs @@ -27,8 +27,8 @@ pub(super) async fn handle_llm_message( }; let bot_name = &ctx.services.bot_name; - let event_ctx = - crate::llm_session::assemble_prompt_context(resume_session_id.as_deref().unwrap_or(sender)); + let persona = bot_name.to_lowercase(); + let event_ctx = crate::llm_session::assemble_prompt_context(&persona); let prompt = format!( "{event_ctx}[Your name is {bot_name}. Refer to yourself as {bot_name}, not Claude.]\n\n{sender}: {user_message}" ); diff --git a/server/src/crdt_state/lww_maps/llm_sessions.rs b/server/src/crdt_state/lww_maps/llm_sessions.rs index 9ba70e2c..3600620a 100644 --- a/server/src/crdt_state/lww_maps/llm_sessions.rs +++ b/server/src/crdt_state/lww_maps/llm_sessions.rs @@ -1,10 +1,10 @@ //! Read/write helpers for the `llm_sessions` LWW-map collection, including the -//! atomic `assemble_and_advance_session` helper used by the Matrix bot. +//! atomic `assemble_and_advance_session` helper used by every chat transport. //! -//! LLM sessions are keyed by `session_id` (typically a Matrix room ID) and track -//! per-sled high-water marks so that `assemble_and_advance_session` can inject -//! only events the LLM has not yet seen and advance the marks atomically within -//! a single CRDT lock acquisition. +//! LLM sessions are keyed by **persona name** (e.g. `"timmy"` for the +//! gateway-level bot) and track per-sled high-water marks so that +//! `assemble_and_advance_session` can inject only events the LLM has not yet +//! seen and advance the marks atomically within a single CRDT lock acquisition. use std::collections::{BTreeMap, BTreeSet}; @@ -16,16 +16,15 @@ use super::super::state::{apply_and_persist, get_crdt, rebuild_llm_session_index use super::super::types::{LlmSessionCrdt, LlmSessionView, ScopeFilter}; use super::event_log::GAP_PIPELINE_EVENT; -/// Write or upsert an LLM session entry keyed by `session_id`. +/// Write or upsert an LLM session entry keyed by `persona`. /// -/// Creates a new entry if `session_id` is not yet present; updates -/// `persona_name` and `scope` on an existing entry. The `high_water` -/// register is not touched by this function — use `assemble_and_advance_session` -/// to advance it atomically. +/// Creates a new entry if `persona` is not yet present; updates `scope` on an +/// existing entry. The `high_water` register is not touched by this function — +/// use `assemble_and_advance_session` to advance it atomically. /// /// The `scope` string must be in wire form: `"all"` for [`ScopeFilter::All`] /// or `"sleds:hex1,hex2"` for [`ScopeFilter::Sleds`]. -pub fn write_llm_session(session_id: &str, persona_name: &str, scope: &str) { +pub fn write_llm_session(persona: &str, scope: &str) { let Some(state_mutex) = get_crdt() else { return; }; @@ -33,19 +32,19 @@ pub fn write_llm_session(session_id: &str, persona_name: &str, scope: &str) { return; }; - if let Some(&idx) = state.llm_session_index.get(session_id) { + if let Some(&idx) = state.llm_session_index.get(persona) { apply_and_persist(&mut state, |s| { s.crdt.doc.llm_sessions[idx] .persona_name - .set(persona_name.to_string()) + .set(persona.to_string()) }); apply_and_persist(&mut state, |s| { s.crdt.doc.llm_sessions[idx].scope.set(scope.to_string()) }); } else { let entry: JsonValue = json!({ - "session_id": session_id, - "persona_name": persona_name, + "session_id": persona, + "persona_name": persona, "scope": scope, "high_water": "{}", }) @@ -57,19 +56,19 @@ pub fn write_llm_session(session_id: &str, persona_name: &str, scope: &str) { } } -/// Read a single LLM session entry by `session_id`. -pub fn read_llm_session(session_id: &str) -> Option { +/// Read a single LLM session entry by persona name. +pub fn read_llm_session(persona: &str) -> Option { let state_mutex = get_crdt()?; let state = state_mutex.lock().ok()?; - let &idx = state.llm_session_index.get(session_id)?; + let &idx = state.llm_session_index.get(persona)?; extract_llm_session_view(&state.crdt.doc.llm_sessions[idx]) } -/// Atomically read new event-log entries for `session_id` past the stored +/// Atomically read new event-log entries for `persona` past the stored /// high-water marks, render them as a block of audit lines, and advance the /// marks to prevent double-injection on the next call. /// -/// The set of sleds whose events are collected is determined by the session's +/// The set of sleds whose events are collected is determined by the persona's /// [`ScopeFilter`]: /// - [`ScopeFilter::All`]: events from every sled present in the event log are /// included — this is the gateway-level persona default that gives a full @@ -81,7 +80,7 @@ pub fn read_llm_session(session_id: &str) -> Option { /// /// Returns an empty `Vec` when there are no new events or the CRDT is not /// initialised. -pub fn assemble_and_advance_session(session_id: &str) -> Vec { +pub fn assemble_and_advance_session(persona: &str) -> Vec { let local_sled_id = crate::crdt_state::our_node_id().unwrap_or_default(); let Some(state_mutex) = get_crdt() else { @@ -91,9 +90,8 @@ pub fn assemble_and_advance_session(session_id: &str) -> Vec { return Vec::new(); }; - // Determine the session's scope filter and current high-water map. - let (scope_filter, current_high_water) = match state.llm_session_index.get(session_id).copied() - { + // Determine the persona's scope filter and current high-water map. + let (scope_filter, current_high_water) = match state.llm_session_index.get(persona).copied() { Some(idx) => { let filter = parse_scope(&state.crdt.doc.llm_sessions[idx], &local_sled_id); let hw = parse_high_water(&state.crdt.doc.llm_sessions[idx]); @@ -168,8 +166,8 @@ pub fn assemble_and_advance_session(session_id: &str) -> Vec { } let new_hw_json = serde_json::to_string(&new_high_water).unwrap_or_else(|_| "{}".to_string()); - // Upsert the session entry with the new high-water value. - let idx_opt = state.llm_session_index.get(session_id).copied(); + // Upsert the persona entry with the new high-water value. + let idx_opt = state.llm_session_index.get(persona).copied(); if let Some(idx) = idx_opt { apply_and_persist(&mut state, |s| { s.crdt.doc.llm_sessions[idx] @@ -179,8 +177,8 @@ pub fn assemble_and_advance_session(session_id: &str) -> Vec { } else { let scope_str = scope_filter.to_scope_str(); let entry: JsonValue = json!({ - "session_id": session_id, - "persona_name": "", + "session_id": persona, + "persona_name": persona, "scope": scope_str, "high_water": new_hw_json, }) @@ -191,8 +189,8 @@ pub fn assemble_and_advance_session(session_id: &str) -> Vec { state.llm_session_index = rebuild_llm_session_index(&state.crdt); } - // Observability: log event-log size and gap count across the session's - // target sleds (the scope actually assembled for this session). + // Observability: log event-log size and gap count across the persona's + // target sleds (the scope actually assembled for this persona). let total_entries = state .crdt .doc @@ -211,7 +209,7 @@ pub fn assemble_and_advance_session(session_id: &str) -> Vec { }) .count(); crate::slog!( - "[event-log] assemble session={session_id} sled_entries={total_entries} gap_count={gap_count}" + "[event-log] assemble persona={persona} sled_entries={total_entries} gap_count={gap_count}" ); // Render each new event as a compact audit line; gap sentinels get a diff --git a/server/src/event_log/mod.rs b/server/src/event_log/mod.rs index fb7dfbca..b20274af 100644 --- a/server/src/event_log/mod.rs +++ b/server/src/event_log/mod.rs @@ -66,6 +66,15 @@ pub fn log_transition_event(fired: &crate::pipeline_state::TransitionFired) { to_stage, pipeline_event, ); + + // Real-time push to per-persona WebSocket subscribers. + crate::pipeline_event_bus::broadcast(crate::pipeline_event_bus::BusEvent { + sled_id, + story_id: fired.story_id.0.clone(), + from_stage: crate::pipeline_state::stage_label(&fired.before).to_string(), + to_stage: crate::pipeline_state::stage_label(&fired.after).to_string(), + pipeline_event: crate::pipeline_state::event_label(&fired.event).to_string(), + }); } /// Read all persisted events from the CRDT event log. @@ -121,6 +130,7 @@ pub fn spawn_event_log_subscriber() { loop { match rx.recv().await { Ok(fired) => { + // log_transition_event also broadcasts to the pipeline_event_bus. log_transition_event(&fired); next_logical_seq += 1; } diff --git a/server/src/http/ws/mod.rs b/server/src/http/ws/mod.rs index 69f42b24..079f4714 100644 --- a/server/src/http/ws/mod.rs +++ b/server/src/http/ws/mod.rs @@ -86,6 +86,14 @@ pub async fn ws_handler(ws: WebSocket, ctx: Data<&Arc>) -> impl poem ws::subscribe_status(tx.clone(), ctx.services.status.subscribe()); } + // Subscribe to real-time pipeline-transition events for this persona. + // Events that arrived while no client was connected are caught up by + // assemble_prompt_context at turn time. + ws::subscribe_persona_pipeline_events( + tx.clone(), + ctx.services.bot_name.to_lowercase(), + ); + // Map of pending permission request_id -> oneshot responder. let mut pending_perms: HashMap> = HashMap::new(); @@ -109,9 +117,11 @@ pub async fn ws_handler(ws: WebSocket, ctx: Data<&Arc>) -> impl poem let tx_activity = tx.clone(); let ctx_clone = ctx.clone(); + let persona = ctx_clone.services.bot_name.to_lowercase(); let chat_fut = chat::chat( messages, config, + &persona, &ctx_clone.state, ctx_clone.store.as_ref(), move |history| { diff --git a/server/src/llm/chat/run.rs b/server/src/llm/chat/run.rs index 9de4d72f..778d58e7 100644 --- a/server/src/llm/chat/run.rs +++ b/server/src/llm/chat/run.rs @@ -113,10 +113,13 @@ pub fn cancel_chat(state: &SessionState) -> Result<(), String> { } /// Run a multi-turn chat with tool calling against the configured provider. +/// +/// `persona` is the persona name used to key CRDT event-log assembly (e.g. `"timmy"`). #[allow(clippy::too_many_arguments)] pub async fn chat( mut messages: Vec, config: ProviderConfig, + persona: &str, state: &SessionState, store: &dyn StoreOps, mut on_update: F, @@ -140,12 +143,9 @@ where inject_received_at(&mut messages, &received_at); // Assemble CRDT pipeline-transition events once per turn and advance the - // high-water mark. Uses the Claude Code session_id when available so the - // same event stream key is used for resumable sessions; falls back to - // "web-ui" for Anthropic/Ollama turns which have no persistent session. - let event_ctx = crate::llm_session::assemble_prompt_context( - config.session_id.as_deref().unwrap_or("web-ui"), - ); + // high-water mark. Uses the caller-supplied persona so all transports share + // the same event stream regardless of transport-specific session identifiers. + let event_ctx = crate::llm_session::assemble_prompt_context(persona); let _ = state.cancel_tx.send(false); let mut cancel_rx = state.cancel_rx.clone(); @@ -628,6 +628,7 @@ mod tests { let result = chat( messages, config, + "timmy", &state, &store, |_| {}, @@ -672,6 +673,7 @@ mod tests { let result = chat( messages, config, + "timmy", &state, &store, |_| {}, @@ -712,6 +714,7 @@ mod tests { let result = chat( messages, config, + "timmy", &state, &store, |_| {}, diff --git a/server/src/llm_session/mod.rs b/server/src/llm_session/mod.rs index 4e777215..8a283f6e 100644 --- a/server/src/llm_session/mod.rs +++ b/server/src/llm_session/mod.rs @@ -1,23 +1,23 @@ //! LLM session management — CRDT-backed context assembly for bot prompts. //! //! The central export is [`assemble_prompt_context`], which reads new pipeline -//! transition events from the CRDT event log past the session's stored high-water +//! transition events from the CRDT event log past the persona's stored high-water //! marks, wraps them in a `` block for injection at the head of //! the next LLM prompt, and atomically advances the marks so a mid-turn crash //! cannot double-inject the same events. /// Assemble a `` block containing new pipeline-transition events -/// for `session_id` and atomically advance the high-water marks. +/// for `persona` and atomically advance the high-water marks. /// -/// Reads events from the local sled's CRDT event log that have not yet been -/// injected into this session (tracked via per-sled high-water marks stored in -/// the `LlmSessionCrdt` entity). Returns an empty string when there are no new -/// events or the CRDT is not yet initialised. -pub fn assemble_prompt_context(session_id: &str) -> String { - let lines = crate::crdt_state::assemble_and_advance_session(session_id); +/// All chat transports call this with the same persona name (e.g. `"timmy"`) +/// so that events are visible to whichever transport handles the next turn, +/// regardless of transport-specific session identifiers. Returns an empty +/// string when there are no new events or the CRDT is not yet initialised. +pub fn assemble_prompt_context(persona: &str) -> String { + let lines = crate::crdt_state::assemble_and_advance_session(persona); let event_count = lines.len(); crate::slog!( - "[llm-session] assemble_prompt_context session={session_id} new_events={event_count}" + "[llm-session] assemble_prompt_context persona={persona} new_events={event_count}" ); if lines.is_empty() { return String::new(); @@ -187,14 +187,14 @@ mod tests { "AgentCompleted", ); - // Set up a session scoped to ALL sleds. - crate::crdt_state::write_llm_session("room-scope-all", "Timmy", "all"); - // Set up a session scoped to sled-A only. + // Set up a persona scoped to ALL sleds. + crate::crdt_state::write_llm_session("timmy", "all"); + // Set up a persona scoped to sled-A only. let sled_a_scope = format!("sleds:{sled_a}"); - crate::crdt_state::write_llm_session("room-scope-sled-a", "Sally", &sled_a_scope); + crate::crdt_state::write_llm_session("sally", &sled_a_scope); - // All-scope session: both events must appear. - let ctx_all = assemble_prompt_context("room-scope-all"); + // All-scope persona: both events must appear. + let ctx_all = assemble_prompt_context("timmy"); assert!( ctx_all.contains("10_story_alpha"), "All scope must contain sled-A event; got: {ctx_all}" @@ -204,8 +204,8 @@ mod tests { "All scope must contain sled-B event; got: {ctx_all}" ); - // Sled-A-only session: only sled-A's event visible. - let ctx_a = assemble_prompt_context("room-scope-sled-a"); + // Sled-A-only persona: only sled-A's event visible. + let ctx_a = assemble_prompt_context("sally"); assert!( ctx_a.contains("10_story_alpha"), "Sleds filter must contain sled-A event; got: {ctx_a}" @@ -215,19 +215,73 @@ mod tests { "Sleds filter must NOT contain sled-B event; got: {ctx_a}" ); - // Second call on both sessions: nothing new (high-water advanced). - let ctx_all2 = assemble_prompt_context("room-scope-all"); + // Second call on both personas: nothing new (high-water advanced). + let ctx_all2 = assemble_prompt_context("timmy"); assert!( ctx_all2.is_empty(), "All scope second call must be empty; got: {ctx_all2}" ); - let ctx_a2 = assemble_prompt_context("room-scope-sled-a"); + let ctx_a2 = assemble_prompt_context("sally"); assert!( ctx_a2.is_empty(), "Sleds filter second call must be empty; got: {ctx_a2}" ); } + /// AC 5 e2e: fire a pipeline transition, then verify that calling + /// `assemble_prompt_context` with the same persona key from any "transport" + /// (simulated by different caller labels) sees the event. The persona is + /// transport-agnostic; subsequent transports sharing the persona see their + /// own new events independently via independent calls (each drains a fresh + /// batch). + #[test] + fn persona_key_is_transport_agnostic() { + crate::crdt_state::init_for_test(); + crate::crdt_state::write_llm_session("timmy", "all"); + + // Fire event 1. + crate::event_log::log_transition_event(&make_fired("e2e_story_1")); + + // Matrix turn: see event 1. + let matrix_ctx = assemble_prompt_context("timmy"); + assert!( + matrix_ctx.contains("e2e_story_1"), + "Matrix turn must see event 1; got: {matrix_ctx}" + ); + + // Fire event 2. + crate::event_log::log_transition_event(&make_fired("e2e_story_2")); + + // Web-UI turn (same persona): see event 2 only (event 1 high-water already advanced). + let web_ui_ctx = assemble_prompt_context("timmy"); + assert!( + web_ui_ctx.contains("e2e_story_2"), + "Web-UI turn must see event 2; got: {web_ui_ctx}" + ); + assert!( + !web_ui_ctx.contains("e2e_story_1"), + "Web-UI turn must NOT re-see event 1; got: {web_ui_ctx}" + ); + + // Fire event 3. + crate::event_log::log_transition_event(&make_fired("e2e_story_3")); + + // CLI turn (same persona): see event 3 only. + let cli_ctx = assemble_prompt_context("timmy"); + assert!( + cli_ctx.contains("e2e_story_3"), + "CLI turn must see event 3; got: {cli_ctx}" + ); + assert!( + !cli_ctx.contains("e2e_story_1"), + "CLI turn must NOT re-see event 1; got: {cli_ctx}" + ); + assert!( + !cli_ctx.contains("e2e_story_2"), + "CLI turn must NOT re-see event 2; got: {cli_ctx}" + ); + } + /// Newly-added sled events appear in an All-scope session without /// restarting (AC 5 runtime pickup). #[test] @@ -246,9 +300,9 @@ mod tests { "2_current", "DepsMet", ); - crate::crdt_state::write_llm_session("room-runtime-pickup", "Timmy", "all"); + crate::crdt_state::write_llm_session("timmy", "all"); - let ctx1 = assemble_prompt_context("room-runtime-pickup"); + let ctx1 = assemble_prompt_context("timmy"); assert!( ctx1.contains("30_story_first"), "first event must appear; got: {ctx1}" @@ -264,7 +318,7 @@ mod tests { "AgentCompleted", ); - let ctx2 = assemble_prompt_context("room-runtime-pickup"); + let ctx2 = assemble_prompt_context("timmy"); assert!( ctx2.contains("40_story_second"), "newly adopted sled event must appear; got: {ctx2}" diff --git a/server/src/main.rs b/server/src/main.rs index 2aac2709..332fc27a 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -36,6 +36,8 @@ pub mod log_buffer; pub mod mesh; /// Node identity — Ed25519 keypair generation and stable node ID management. pub mod node_identity; +/// Pipeline event bus — real-time broadcast of pipeline-transition events to persona subscribers. +pub(crate) mod pipeline_event_bus; pub(crate) mod pipeline_state; /// Reliable process-termination primitives shared across the server. pub mod process_kill; @@ -286,6 +288,11 @@ async fn main() -> Result<(), std::io::Error> { )), }); + // Register the bot's persona in the CRDT so all transports share a single + // event-log high-water mark keyed by name rather than transport ids. + // scope="all" gives the gateway persona a cross-sled view of pipeline events. + crate::crdt_state::write_llm_session(&services.bot_name.to_lowercase(), "all"); + // Sled uplink: forward permission requests to an upstream gateway when configured. let upstream_gateway = cli .upstream_gateway diff --git a/server/src/pipeline_event_bus.rs b/server/src/pipeline_event_bus.rs new file mode 100644 index 00000000..217eb904 --- /dev/null +++ b/server/src/pipeline_event_bus.rs @@ -0,0 +1,84 @@ +//! Real-time pipeline event bus — broadcasts `TransitionFired` events to +//! per-persona WebSocket subscribers as they happen. +//! +//! Turn-time [`crate::llm_session::assemble_prompt_context`] still works as +//! a fallback catch-up mechanism for any events that accumulated while no +//! subscriber was connected. + +use std::sync::OnceLock; +use tokio::sync::broadcast; + +/// Capacity of the per-persona event bus broadcast channel. +const BUS_CAPACITY: usize = 256; + +/// A raw pipeline-transition event forwarded from `log_transition_event`. +#[derive(Clone, Debug)] +pub struct BusEvent { + /// Hex-encoded sled ID that fired the transition. + pub sled_id: String, + /// Story identifier (e.g. `"42_story_foo"`). + pub story_id: String, + /// Human-readable label of the stage before the transition. + pub from_stage: String, + /// Human-readable label of the stage after the transition. + pub to_stage: String, + /// String label of the `PipelineEvent` variant. + pub pipeline_event: String, +} + +static BUS_TX: OnceLock> = OnceLock::new(); + +/// Initialise the pipeline event bus. No-op on subsequent calls. +pub fn init() { + let _ = BUS_TX.get_or_init(|| broadcast::channel::(BUS_CAPACITY).0); +} + +/// Broadcast a pipeline transition event to all active subscribers. +/// +/// No-op if the bus has not been initialised or there are no subscribers. +pub fn broadcast(event: BusEvent) { + if let Some(tx) = BUS_TX.get() { + let _ = tx.send(event); + } +} + +/// Subscribe to the pipeline event bus. +/// +/// Returns `None` if the bus has not been initialised yet. +pub fn subscribe() -> Option> { + BUS_TX.get().map(|tx| tx.subscribe()) +} + +/// Render a [`BusEvent`] as the same compact audit line used in +/// `assemble_and_advance_session`. +pub fn render_event(event: &BusEvent) -> String { + if event.pipeline_event == crate::crdt_state::GAP_PIPELINE_EVENT { + format!( + "events between {} and {} were dropped", + event.from_stage, event.to_stage + ) + } else { + format!( + "pipeline_event sled_id=\"{}\" story_id=\"{}\" from=\"{}\" to=\"{}\" event=\"{}\"", + event.sled_id, event.story_id, event.from_stage, event.to_stage, event.pipeline_event + ) + } +} + +/// Returns `true` if `event` should be delivered to `persona` based on the +/// persona's stored scope filter in the CRDT. +/// +/// Falls back to local-sled-only if no session entry exists for `persona`. +pub fn event_matches_persona(event: &BusEvent, persona: &str) -> bool { + use crate::crdt_state::ScopeFilter; + match crate::crdt_state::read_llm_session(persona) { + Some(session) => match &session.scope_filter { + ScopeFilter::All => true, + ScopeFilter::Sleds(ids) => ids.contains(&event.sled_id), + }, + None => { + let local = crate::crdt_state::our_node_id().unwrap_or_default(); + !local.is_empty() && event.sled_id == local + } + } +} diff --git a/server/src/service/ws/io.rs b/server/src/service/ws/io.rs index 95b5b13d..908684ea 100644 --- a/server/src/service/ws/io.rs +++ b/server/src/service/ws/io.rs @@ -132,6 +132,34 @@ pub fn subscribe_status(tx: mpsc::UnboundedSender, mut subscription: }); } +/// Spawn a background task that forwards real-time pipeline-transition events to +/// the client, filtered to those visible to `persona` based on its scope filter. +/// +/// Each matching event is delivered as a [`WsResponse::PipelineEvent`] frame. +/// Events that occur while no subscriber is connected are NOT delivered here; +/// [`crate::llm_session::assemble_prompt_context`] catches up on those at turn time. +pub fn subscribe_persona_pipeline_events(tx: mpsc::UnboundedSender, persona: String) { + let Some(mut rx) = crate::pipeline_event_bus::subscribe() else { + return; + }; + tokio::spawn(async move { + loop { + match rx.recv().await { + Ok(event) => { + if crate::pipeline_event_bus::event_matches_persona(&event, &persona) { + let line = crate::pipeline_event_bus::render_event(&event); + if tx.send(WsResponse::PipelineEvent { line }).is_err() { + break; + } + } + } + Err(broadcast::error::RecvError::Lagged(_)) => continue, + Err(broadcast::error::RecvError::Closed) => break, + } + } + }); +} + /// Spawn a background task that forwards reconciliation events to the client. pub fn subscribe_reconciliation( tx: mpsc::UnboundedSender, diff --git a/server/src/service/ws/message/response.rs b/server/src/service/ws/message/response.rs index 1003d687..169e5abf 100644 --- a/server/src/service/ws/message/response.rs +++ b/server/src/service/ws/message/response.rs @@ -127,6 +127,13 @@ pub enum WsResponse { StatusUpdate { event: StatusEvent, }, + /// A real-time pipeline-transition event pushed to the client as it happens. + /// + /// Carries the same compact audit-line format used in `` + /// blocks so that LLM-aware clients can consume it without additional parsing. + PipelineEvent { + line: String, + }, } #[cfg(test)] diff --git a/server/src/service/ws/mod.rs b/server/src/service/ws/mod.rs index 0eef977d..98730e60 100644 --- a/server/src/service/ws/mod.rs +++ b/server/src/service/ws/mod.rs @@ -23,6 +23,7 @@ pub use dispatch::{ }; pub use io::{ check_onboarding, load_initial_pipeline_state, load_recent_logs, load_wizard_state, - subscribe_logs, subscribe_reconciliation, subscribe_status, subscribe_watcher, + subscribe_logs, subscribe_persona_pipeline_events, subscribe_reconciliation, subscribe_status, + subscribe_watcher, }; pub use message::{WizardStepInfo, WsResponse}; diff --git a/server/src/startup/tick_loop.rs b/server/src/startup/tick_loop.rs index 954eaf2f..9345a231 100644 --- a/server/src/startup/tick_loop.rs +++ b/server/src/startup/tick_loop.rs @@ -28,6 +28,10 @@ pub(crate) fn spawn_event_bridges( // Audit log subscriber: write one structured line per pipeline transition. crate::pipeline_state::spawn_audit_log_subscriber(); + // Pipeline event bus: initialise before the event-log subscriber so that + // real-time broadcasts are ready before the first transition fires. + crate::pipeline_event_bus::init(); + // Event log subscriber: persist every transition to the CRDT event log so // the history survives rebuild_and_restart and replicates across nodes. crate::event_log::spawn_event_log_subscriber();