huskies: merge 1143 story Decouple LLM environmental awareness from chat transport — persona-keyed sessions and a real-time event subscription

This commit is contained in:
dave
2026-05-18 16:47:31 +00:00
parent e58ff4465a
commit fb4e52dd09
15 changed files with 281 additions and 77 deletions
@@ -303,12 +303,12 @@ pub(super) async fn handle_incoming_message(
/// Build the prompt for a Discord LLM turn, prepending any pending /// Build the prompt for a Discord LLM turn, prepending any pending
/// CRDT pipeline-transition events as a `<system-reminder>` block. /// CRDT pipeline-transition events as a `<system-reminder>` block.
fn build_discord_llm_prompt( fn build_discord_llm_prompt(
session_id: &str, persona: &str,
bot_name: &str, bot_name: &str,
user: &str, user: &str,
user_message: &str, user_message: &str,
) -> String { ) -> String {
let event_ctx = crate::llm_session::assemble_prompt_context(session_id); let event_ctx = crate::llm_session::assemble_prompt_context(persona);
format!( format!(
"{event_ctx}[Your name is {bot_name}. Refer to yourself as {bot_name}, not Claude.]\n\n{user}: {user_message}" "{event_ctx}[Your name is {bot_name}. Refer to yourself as {bot_name}, not Claude.]\n\n{user}: {user_message}"
) )
@@ -328,12 +328,8 @@ async fn handle_llm_message(ctx: &DiscordContext, channel: &str, user: &str, use
}; };
let bot_name = &ctx.services.bot_name; let bot_name = &ctx.services.bot_name;
let prompt = build_discord_llm_prompt( let persona = bot_name.to_lowercase();
resume_session_id.as_deref().unwrap_or(channel), let prompt = build_discord_llm_prompt(&persona, bot_name, user, user_message);
bot_name,
user,
user_message,
);
let provider = ClaudeCodeProvider::new(); let provider = ClaudeCodeProvider::new();
let (_cancel_tx, mut cancel_rx) = watch::channel(false); let (_cancel_tx, mut cancel_rx) = watch::channel(false);
@@ -32,9 +32,12 @@ pub(in crate::chat::transport::matrix::bot) async fn handle_message(
}; };
// Pull new pipeline-transition events from the CRDT event log for this // Pull new pipeline-transition events from the CRDT event log for this
// session and atomically advance the high-water marks so the same events // persona and atomically advance the high-water marks so the same events
// are not re-injected on the next turn. // are not re-injected on the next turn. All transports share the same
let event_log_ctx = crate::llm_session::assemble_prompt_context(&room_id_str); // persona key so events are visible regardless of which transport handles
// the next turn.
let persona = ctx.services.bot_name.to_lowercase();
let event_log_ctx = crate::llm_session::assemble_prompt_context(&persona);
// The prompt is just the current message with sender attribution. // The prompt is just the current message with sender attribution.
// Prior conversation context is carried by the Claude Code session. // Prior conversation context is carried by the Claude Code session.
@@ -29,9 +29,8 @@ pub(super) async fn handle_llm_message(
}; };
let bot_name = &ctx.services.bot_name; let bot_name = &ctx.services.bot_name;
let event_ctx = crate::llm_session::assemble_prompt_context( let persona = bot_name.to_lowercase();
resume_session_id.as_deref().unwrap_or(channel), let event_ctx = crate::llm_session::assemble_prompt_context(&persona);
);
let prompt = format!( let prompt = format!(
"{event_ctx}[Your name is {bot_name}. Refer to yourself as {bot_name}, not Claude.]\n\n{user}: {user_message}" "{event_ctx}[Your name is {bot_name}. Refer to yourself as {bot_name}, not Claude.]\n\n{user}: {user_message}"
); );
@@ -27,8 +27,8 @@ pub(super) async fn handle_llm_message(
}; };
let bot_name = &ctx.services.bot_name; let bot_name = &ctx.services.bot_name;
let event_ctx = let persona = bot_name.to_lowercase();
crate::llm_session::assemble_prompt_context(resume_session_id.as_deref().unwrap_or(sender)); let event_ctx = crate::llm_session::assemble_prompt_context(&persona);
let prompt = format!( let prompt = format!(
"{event_ctx}[Your name is {bot_name}. Refer to yourself as {bot_name}, not Claude.]\n\n{sender}: {user_message}" "{event_ctx}[Your name is {bot_name}. Refer to yourself as {bot_name}, not Claude.]\n\n{sender}: {user_message}"
); );
+29 -31
View File
@@ -1,10 +1,10 @@
//! Read/write helpers for the `llm_sessions` LWW-map collection, including the //! Read/write helpers for the `llm_sessions` LWW-map collection, including the
//! atomic `assemble_and_advance_session` helper used by the Matrix bot. //! atomic `assemble_and_advance_session` helper used by every chat transport.
//! //!
//! LLM sessions are keyed by `session_id` (typically a Matrix room ID) and track //! LLM sessions are keyed by **persona name** (e.g. `"timmy"` for the
//! per-sled high-water marks so that `assemble_and_advance_session` can inject //! gateway-level bot) and track per-sled high-water marks so that
//! only events the LLM has not yet seen and advance the marks atomically within //! `assemble_and_advance_session` can inject only events the LLM has not yet
//! a single CRDT lock acquisition. //! seen and advance the marks atomically within a single CRDT lock acquisition.
use std::collections::{BTreeMap, BTreeSet}; use std::collections::{BTreeMap, BTreeSet};
@@ -16,16 +16,15 @@ use super::super::state::{apply_and_persist, get_crdt, rebuild_llm_session_index
use super::super::types::{LlmSessionCrdt, LlmSessionView, ScopeFilter}; use super::super::types::{LlmSessionCrdt, LlmSessionView, ScopeFilter};
use super::event_log::GAP_PIPELINE_EVENT; use super::event_log::GAP_PIPELINE_EVENT;
/// Write or upsert an LLM session entry keyed by `session_id`. /// Write or upsert an LLM session entry keyed by `persona`.
/// ///
/// Creates a new entry if `session_id` is not yet present; updates /// Creates a new entry if `persona` is not yet present; updates `scope` on an
/// `persona_name` and `scope` on an existing entry. The `high_water` /// existing entry. The `high_water` register is not touched by this function —
/// register is not touched by this function — use `assemble_and_advance_session` /// use `assemble_and_advance_session` to advance it atomically.
/// to advance it atomically.
/// ///
/// The `scope` string must be in wire form: `"all"` for [`ScopeFilter::All`] /// The `scope` string must be in wire form: `"all"` for [`ScopeFilter::All`]
/// or `"sleds:hex1,hex2"` for [`ScopeFilter::Sleds`]. /// or `"sleds:hex1,hex2"` for [`ScopeFilter::Sleds`].
pub fn write_llm_session(session_id: &str, persona_name: &str, scope: &str) { pub fn write_llm_session(persona: &str, scope: &str) {
let Some(state_mutex) = get_crdt() else { let Some(state_mutex) = get_crdt() else {
return; return;
}; };
@@ -33,19 +32,19 @@ pub fn write_llm_session(session_id: &str, persona_name: &str, scope: &str) {
return; return;
}; };
if let Some(&idx) = state.llm_session_index.get(session_id) { if let Some(&idx) = state.llm_session_index.get(persona) {
apply_and_persist(&mut state, |s| { apply_and_persist(&mut state, |s| {
s.crdt.doc.llm_sessions[idx] s.crdt.doc.llm_sessions[idx]
.persona_name .persona_name
.set(persona_name.to_string()) .set(persona.to_string())
}); });
apply_and_persist(&mut state, |s| { apply_and_persist(&mut state, |s| {
s.crdt.doc.llm_sessions[idx].scope.set(scope.to_string()) s.crdt.doc.llm_sessions[idx].scope.set(scope.to_string())
}); });
} else { } else {
let entry: JsonValue = json!({ let entry: JsonValue = json!({
"session_id": session_id, "session_id": persona,
"persona_name": persona_name, "persona_name": persona,
"scope": scope, "scope": scope,
"high_water": "{}", "high_water": "{}",
}) })
@@ -57,19 +56,19 @@ pub fn write_llm_session(session_id: &str, persona_name: &str, scope: &str) {
} }
} }
/// Read a single LLM session entry by `session_id`. /// Read a single LLM session entry by persona name.
pub fn read_llm_session(session_id: &str) -> Option<LlmSessionView> { pub fn read_llm_session(persona: &str) -> Option<LlmSessionView> {
let state_mutex = get_crdt()?; let state_mutex = get_crdt()?;
let state = state_mutex.lock().ok()?; let state = state_mutex.lock().ok()?;
let &idx = state.llm_session_index.get(session_id)?; let &idx = state.llm_session_index.get(persona)?;
extract_llm_session_view(&state.crdt.doc.llm_sessions[idx]) extract_llm_session_view(&state.crdt.doc.llm_sessions[idx])
} }
/// Atomically read new event-log entries for `session_id` past the stored /// Atomically read new event-log entries for `persona` past the stored
/// high-water marks, render them as a block of audit lines, and advance the /// high-water marks, render them as a block of audit lines, and advance the
/// marks to prevent double-injection on the next call. /// marks to prevent double-injection on the next call.
/// ///
/// The set of sleds whose events are collected is determined by the session's /// The set of sleds whose events are collected is determined by the persona's
/// [`ScopeFilter`]: /// [`ScopeFilter`]:
/// - [`ScopeFilter::All`]: events from every sled present in the event log are /// - [`ScopeFilter::All`]: events from every sled present in the event log are
/// included — this is the gateway-level persona default that gives a full /// included — this is the gateway-level persona default that gives a full
@@ -81,7 +80,7 @@ pub fn read_llm_session(session_id: &str) -> Option<LlmSessionView> {
/// ///
/// Returns an empty `Vec` when there are no new events or the CRDT is not /// Returns an empty `Vec` when there are no new events or the CRDT is not
/// initialised. /// initialised.
pub fn assemble_and_advance_session(session_id: &str) -> Vec<String> { pub fn assemble_and_advance_session(persona: &str) -> Vec<String> {
let local_sled_id = crate::crdt_state::our_node_id().unwrap_or_default(); let local_sled_id = crate::crdt_state::our_node_id().unwrap_or_default();
let Some(state_mutex) = get_crdt() else { let Some(state_mutex) = get_crdt() else {
@@ -91,9 +90,8 @@ pub fn assemble_and_advance_session(session_id: &str) -> Vec<String> {
return Vec::new(); return Vec::new();
}; };
// Determine the session's scope filter and current high-water map. // Determine the persona's scope filter and current high-water map.
let (scope_filter, current_high_water) = match state.llm_session_index.get(session_id).copied() let (scope_filter, current_high_water) = match state.llm_session_index.get(persona).copied() {
{
Some(idx) => { Some(idx) => {
let filter = parse_scope(&state.crdt.doc.llm_sessions[idx], &local_sled_id); let filter = parse_scope(&state.crdt.doc.llm_sessions[idx], &local_sled_id);
let hw = parse_high_water(&state.crdt.doc.llm_sessions[idx]); let hw = parse_high_water(&state.crdt.doc.llm_sessions[idx]);
@@ -168,8 +166,8 @@ pub fn assemble_and_advance_session(session_id: &str) -> Vec<String> {
} }
let new_hw_json = serde_json::to_string(&new_high_water).unwrap_or_else(|_| "{}".to_string()); let new_hw_json = serde_json::to_string(&new_high_water).unwrap_or_else(|_| "{}".to_string());
// Upsert the session entry with the new high-water value. // Upsert the persona entry with the new high-water value.
let idx_opt = state.llm_session_index.get(session_id).copied(); let idx_opt = state.llm_session_index.get(persona).copied();
if let Some(idx) = idx_opt { if let Some(idx) = idx_opt {
apply_and_persist(&mut state, |s| { apply_and_persist(&mut state, |s| {
s.crdt.doc.llm_sessions[idx] s.crdt.doc.llm_sessions[idx]
@@ -179,8 +177,8 @@ pub fn assemble_and_advance_session(session_id: &str) -> Vec<String> {
} else { } else {
let scope_str = scope_filter.to_scope_str(); let scope_str = scope_filter.to_scope_str();
let entry: JsonValue = json!({ let entry: JsonValue = json!({
"session_id": session_id, "session_id": persona,
"persona_name": "", "persona_name": persona,
"scope": scope_str, "scope": scope_str,
"high_water": new_hw_json, "high_water": new_hw_json,
}) })
@@ -191,8 +189,8 @@ pub fn assemble_and_advance_session(session_id: &str) -> Vec<String> {
state.llm_session_index = rebuild_llm_session_index(&state.crdt); state.llm_session_index = rebuild_llm_session_index(&state.crdt);
} }
// Observability: log event-log size and gap count across the session's // Observability: log event-log size and gap count across the persona's
// target sleds (the scope actually assembled for this session). // target sleds (the scope actually assembled for this persona).
let total_entries = state let total_entries = state
.crdt .crdt
.doc .doc
@@ -211,7 +209,7 @@ pub fn assemble_and_advance_session(session_id: &str) -> Vec<String> {
}) })
.count(); .count();
crate::slog!( crate::slog!(
"[event-log] assemble session={session_id} sled_entries={total_entries} gap_count={gap_count}" "[event-log] assemble persona={persona} sled_entries={total_entries} gap_count={gap_count}"
); );
// Render each new event as a compact audit line; gap sentinels get a // Render each new event as a compact audit line; gap sentinels get a
+10
View File
@@ -66,6 +66,15 @@ pub fn log_transition_event(fired: &crate::pipeline_state::TransitionFired) {
to_stage, to_stage,
pipeline_event, pipeline_event,
); );
// Real-time push to per-persona WebSocket subscribers.
crate::pipeline_event_bus::broadcast(crate::pipeline_event_bus::BusEvent {
sled_id,
story_id: fired.story_id.0.clone(),
from_stage: crate::pipeline_state::stage_label(&fired.before).to_string(),
to_stage: crate::pipeline_state::stage_label(&fired.after).to_string(),
pipeline_event: crate::pipeline_state::event_label(&fired.event).to_string(),
});
} }
/// Read all persisted events from the CRDT event log. /// Read all persisted events from the CRDT event log.
@@ -121,6 +130,7 @@ pub fn spawn_event_log_subscriber() {
loop { loop {
match rx.recv().await { match rx.recv().await {
Ok(fired) => { Ok(fired) => {
// log_transition_event also broadcasts to the pipeline_event_bus.
log_transition_event(&fired); log_transition_event(&fired);
next_logical_seq += 1; next_logical_seq += 1;
} }
+10
View File
@@ -86,6 +86,14 @@ pub async fn ws_handler(ws: WebSocket, ctx: Data<&Arc<AppContext>>) -> impl poem
ws::subscribe_status(tx.clone(), ctx.services.status.subscribe()); ws::subscribe_status(tx.clone(), ctx.services.status.subscribe());
} }
// Subscribe to real-time pipeline-transition events for this persona.
// Events that arrived while no client was connected are caught up by
// assemble_prompt_context at turn time.
ws::subscribe_persona_pipeline_events(
tx.clone(),
ctx.services.bot_name.to_lowercase(),
);
// Map of pending permission request_id -> oneshot responder. // Map of pending permission request_id -> oneshot responder.
let mut pending_perms: HashMap<String, oneshot::Sender<PermissionDecision>> = let mut pending_perms: HashMap<String, oneshot::Sender<PermissionDecision>> =
HashMap::new(); HashMap::new();
@@ -109,9 +117,11 @@ pub async fn ws_handler(ws: WebSocket, ctx: Data<&Arc<AppContext>>) -> impl poem
let tx_activity = tx.clone(); let tx_activity = tx.clone();
let ctx_clone = ctx.clone(); let ctx_clone = ctx.clone();
let persona = ctx_clone.services.bot_name.to_lowercase();
let chat_fut = chat::chat( let chat_fut = chat::chat(
messages, messages,
config, config,
&persona,
&ctx_clone.state, &ctx_clone.state,
ctx_clone.store.as_ref(), ctx_clone.store.as_ref(),
move |history| { move |history| {
+9 -6
View File
@@ -113,10 +113,13 @@ pub fn cancel_chat(state: &SessionState) -> Result<(), String> {
} }
/// Run a multi-turn chat with tool calling against the configured provider. /// Run a multi-turn chat with tool calling against the configured provider.
///
/// `persona` is the persona name used to key CRDT event-log assembly (e.g. `"timmy"`).
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
pub async fn chat<F, U, T, A>( pub async fn chat<F, U, T, A>(
mut messages: Vec<Message>, mut messages: Vec<Message>,
config: ProviderConfig, config: ProviderConfig,
persona: &str,
state: &SessionState, state: &SessionState,
store: &dyn StoreOps, store: &dyn StoreOps,
mut on_update: F, mut on_update: F,
@@ -140,12 +143,9 @@ where
inject_received_at(&mut messages, &received_at); inject_received_at(&mut messages, &received_at);
// Assemble CRDT pipeline-transition events once per turn and advance the // Assemble CRDT pipeline-transition events once per turn and advance the
// high-water mark. Uses the Claude Code session_id when available so the // high-water mark. Uses the caller-supplied persona so all transports share
// same event stream key is used for resumable sessions; falls back to // the same event stream regardless of transport-specific session identifiers.
// "web-ui" for Anthropic/Ollama turns which have no persistent session. let event_ctx = crate::llm_session::assemble_prompt_context(persona);
let event_ctx = crate::llm_session::assemble_prompt_context(
config.session_id.as_deref().unwrap_or("web-ui"),
);
let _ = state.cancel_tx.send(false); let _ = state.cancel_tx.send(false);
let mut cancel_rx = state.cancel_rx.clone(); let mut cancel_rx = state.cancel_rx.clone();
@@ -628,6 +628,7 @@ mod tests {
let result = chat( let result = chat(
messages, messages,
config, config,
"timmy",
&state, &state,
&store, &store,
|_| {}, |_| {},
@@ -672,6 +673,7 @@ mod tests {
let result = chat( let result = chat(
messages, messages,
config, config,
"timmy",
&state, &state,
&store, &store,
|_| {}, |_| {},
@@ -712,6 +714,7 @@ mod tests {
let result = chat( let result = chat(
messages, messages,
config, config,
"timmy",
&state, &state,
&store, &store,
|_| {}, |_| {},
+77 -23
View File
@@ -1,23 +1,23 @@
//! LLM session management — CRDT-backed context assembly for bot prompts. //! LLM session management — CRDT-backed context assembly for bot prompts.
//! //!
//! The central export is [`assemble_prompt_context`], which reads new pipeline //! The central export is [`assemble_prompt_context`], which reads new pipeline
//! transition events from the CRDT event log past the session's stored high-water //! transition events from the CRDT event log past the persona's stored high-water
//! marks, wraps them in a `<system-reminder>` block for injection at the head of //! marks, wraps them in a `<system-reminder>` block for injection at the head of
//! the next LLM prompt, and atomically advances the marks so a mid-turn crash //! the next LLM prompt, and atomically advances the marks so a mid-turn crash
//! cannot double-inject the same events. //! cannot double-inject the same events.
/// Assemble a `<system-reminder>` block containing new pipeline-transition events /// Assemble a `<system-reminder>` block containing new pipeline-transition events
/// for `session_id` and atomically advance the high-water marks. /// for `persona` and atomically advance the high-water marks.
/// ///
/// Reads events from the local sled's CRDT event log that have not yet been /// All chat transports call this with the same persona name (e.g. `"timmy"`)
/// injected into this session (tracked via per-sled high-water marks stored in /// so that events are visible to whichever transport handles the next turn,
/// the `LlmSessionCrdt` entity). Returns an empty string when there are no new /// regardless of transport-specific session identifiers. Returns an empty
/// events or the CRDT is not yet initialised. /// string when there are no new events or the CRDT is not yet initialised.
pub fn assemble_prompt_context(session_id: &str) -> String { pub fn assemble_prompt_context(persona: &str) -> String {
let lines = crate::crdt_state::assemble_and_advance_session(session_id); let lines = crate::crdt_state::assemble_and_advance_session(persona);
let event_count = lines.len(); let event_count = lines.len();
crate::slog!( crate::slog!(
"[llm-session] assemble_prompt_context session={session_id} new_events={event_count}" "[llm-session] assemble_prompt_context persona={persona} new_events={event_count}"
); );
if lines.is_empty() { if lines.is_empty() {
return String::new(); return String::new();
@@ -187,14 +187,14 @@ mod tests {
"AgentCompleted", "AgentCompleted",
); );
// Set up a session scoped to ALL sleds. // Set up a persona scoped to ALL sleds.
crate::crdt_state::write_llm_session("room-scope-all", "Timmy", "all"); crate::crdt_state::write_llm_session("timmy", "all");
// Set up a session scoped to sled-A only. // Set up a persona scoped to sled-A only.
let sled_a_scope = format!("sleds:{sled_a}"); let sled_a_scope = format!("sleds:{sled_a}");
crate::crdt_state::write_llm_session("room-scope-sled-a", "Sally", &sled_a_scope); crate::crdt_state::write_llm_session("sally", &sled_a_scope);
// All-scope session: both events must appear. // All-scope persona: both events must appear.
let ctx_all = assemble_prompt_context("room-scope-all"); let ctx_all = assemble_prompt_context("timmy");
assert!( assert!(
ctx_all.contains("10_story_alpha"), ctx_all.contains("10_story_alpha"),
"All scope must contain sled-A event; got: {ctx_all}" "All scope must contain sled-A event; got: {ctx_all}"
@@ -204,8 +204,8 @@ mod tests {
"All scope must contain sled-B event; got: {ctx_all}" "All scope must contain sled-B event; got: {ctx_all}"
); );
// Sled-A-only session: only sled-A's event visible. // Sled-A-only persona: only sled-A's event visible.
let ctx_a = assemble_prompt_context("room-scope-sled-a"); let ctx_a = assemble_prompt_context("sally");
assert!( assert!(
ctx_a.contains("10_story_alpha"), ctx_a.contains("10_story_alpha"),
"Sleds filter must contain sled-A event; got: {ctx_a}" "Sleds filter must contain sled-A event; got: {ctx_a}"
@@ -215,19 +215,73 @@ mod tests {
"Sleds filter must NOT contain sled-B event; got: {ctx_a}" "Sleds filter must NOT contain sled-B event; got: {ctx_a}"
); );
// Second call on both sessions: nothing new (high-water advanced). // Second call on both personas: nothing new (high-water advanced).
let ctx_all2 = assemble_prompt_context("room-scope-all"); let ctx_all2 = assemble_prompt_context("timmy");
assert!( assert!(
ctx_all2.is_empty(), ctx_all2.is_empty(),
"All scope second call must be empty; got: {ctx_all2}" "All scope second call must be empty; got: {ctx_all2}"
); );
let ctx_a2 = assemble_prompt_context("room-scope-sled-a"); let ctx_a2 = assemble_prompt_context("sally");
assert!( assert!(
ctx_a2.is_empty(), ctx_a2.is_empty(),
"Sleds filter second call must be empty; got: {ctx_a2}" "Sleds filter second call must be empty; got: {ctx_a2}"
); );
} }
/// AC 5 e2e: fire a pipeline transition, then verify that calling
/// `assemble_prompt_context` with the same persona key from any "transport"
/// (simulated by different caller labels) sees the event. The persona is
/// transport-agnostic; subsequent transports sharing the persona see their
/// own new events independently via independent calls (each drains a fresh
/// batch).
#[test]
fn persona_key_is_transport_agnostic() {
crate::crdt_state::init_for_test();
crate::crdt_state::write_llm_session("timmy", "all");
// Fire event 1.
crate::event_log::log_transition_event(&make_fired("e2e_story_1"));
// Matrix turn: see event 1.
let matrix_ctx = assemble_prompt_context("timmy");
assert!(
matrix_ctx.contains("e2e_story_1"),
"Matrix turn must see event 1; got: {matrix_ctx}"
);
// Fire event 2.
crate::event_log::log_transition_event(&make_fired("e2e_story_2"));
// Web-UI turn (same persona): see event 2 only (event 1 high-water already advanced).
let web_ui_ctx = assemble_prompt_context("timmy");
assert!(
web_ui_ctx.contains("e2e_story_2"),
"Web-UI turn must see event 2; got: {web_ui_ctx}"
);
assert!(
!web_ui_ctx.contains("e2e_story_1"),
"Web-UI turn must NOT re-see event 1; got: {web_ui_ctx}"
);
// Fire event 3.
crate::event_log::log_transition_event(&make_fired("e2e_story_3"));
// CLI turn (same persona): see event 3 only.
let cli_ctx = assemble_prompt_context("timmy");
assert!(
cli_ctx.contains("e2e_story_3"),
"CLI turn must see event 3; got: {cli_ctx}"
);
assert!(
!cli_ctx.contains("e2e_story_1"),
"CLI turn must NOT re-see event 1; got: {cli_ctx}"
);
assert!(
!cli_ctx.contains("e2e_story_2"),
"CLI turn must NOT re-see event 2; got: {cli_ctx}"
);
}
/// Newly-added sled events appear in an All-scope session without /// Newly-added sled events appear in an All-scope session without
/// restarting (AC 5 runtime pickup). /// restarting (AC 5 runtime pickup).
#[test] #[test]
@@ -246,9 +300,9 @@ mod tests {
"2_current", "2_current",
"DepsMet", "DepsMet",
); );
crate::crdt_state::write_llm_session("room-runtime-pickup", "Timmy", "all"); crate::crdt_state::write_llm_session("timmy", "all");
let ctx1 = assemble_prompt_context("room-runtime-pickup"); let ctx1 = assemble_prompt_context("timmy");
assert!( assert!(
ctx1.contains("30_story_first"), ctx1.contains("30_story_first"),
"first event must appear; got: {ctx1}" "first event must appear; got: {ctx1}"
@@ -264,7 +318,7 @@ mod tests {
"AgentCompleted", "AgentCompleted",
); );
let ctx2 = assemble_prompt_context("room-runtime-pickup"); let ctx2 = assemble_prompt_context("timmy");
assert!( assert!(
ctx2.contains("40_story_second"), ctx2.contains("40_story_second"),
"newly adopted sled event must appear; got: {ctx2}" "newly adopted sled event must appear; got: {ctx2}"
+7
View File
@@ -36,6 +36,8 @@ pub mod log_buffer;
pub mod mesh; pub mod mesh;
/// Node identity — Ed25519 keypair generation and stable node ID management. /// Node identity — Ed25519 keypair generation and stable node ID management.
pub mod node_identity; pub mod node_identity;
/// Pipeline event bus — real-time broadcast of pipeline-transition events to persona subscribers.
pub(crate) mod pipeline_event_bus;
pub(crate) mod pipeline_state; pub(crate) mod pipeline_state;
/// Reliable process-termination primitives shared across the server. /// Reliable process-termination primitives shared across the server.
pub mod process_kill; pub mod process_kill;
@@ -286,6 +288,11 @@ async fn main() -> Result<(), std::io::Error> {
)), )),
}); });
// Register the bot's persona in the CRDT so all transports share a single
// event-log high-water mark keyed by name rather than transport ids.
// scope="all" gives the gateway persona a cross-sled view of pipeline events.
crate::crdt_state::write_llm_session(&services.bot_name.to_lowercase(), "all");
// Sled uplink: forward permission requests to an upstream gateway when configured. // Sled uplink: forward permission requests to an upstream gateway when configured.
let upstream_gateway = cli let upstream_gateway = cli
.upstream_gateway .upstream_gateway
+84
View File
@@ -0,0 +1,84 @@
//! Real-time pipeline event bus — broadcasts `TransitionFired` events to
//! per-persona WebSocket subscribers as they happen.
//!
//! Turn-time [`crate::llm_session::assemble_prompt_context`] still works as
//! a fallback catch-up mechanism for any events that accumulated while no
//! subscriber was connected.
use std::sync::OnceLock;
use tokio::sync::broadcast;
/// Capacity of the per-persona event bus broadcast channel.
const BUS_CAPACITY: usize = 256;
/// A raw pipeline-transition event forwarded from `log_transition_event`.
#[derive(Clone, Debug)]
pub struct BusEvent {
/// Hex-encoded sled ID that fired the transition.
pub sled_id: String,
/// Story identifier (e.g. `"42_story_foo"`).
pub story_id: String,
/// Human-readable label of the stage before the transition.
pub from_stage: String,
/// Human-readable label of the stage after the transition.
pub to_stage: String,
/// String label of the `PipelineEvent` variant.
pub pipeline_event: String,
}
static BUS_TX: OnceLock<broadcast::Sender<BusEvent>> = OnceLock::new();
/// Initialise the pipeline event bus. No-op on subsequent calls.
pub fn init() {
let _ = BUS_TX.get_or_init(|| broadcast::channel::<BusEvent>(BUS_CAPACITY).0);
}
/// Broadcast a pipeline transition event to all active subscribers.
///
/// No-op if the bus has not been initialised or there are no subscribers.
pub fn broadcast(event: BusEvent) {
if let Some(tx) = BUS_TX.get() {
let _ = tx.send(event);
}
}
/// Subscribe to the pipeline event bus.
///
/// Returns `None` if the bus has not been initialised yet.
pub fn subscribe() -> Option<broadcast::Receiver<BusEvent>> {
BUS_TX.get().map(|tx| tx.subscribe())
}
/// Render a [`BusEvent`] as the same compact audit line used in
/// `assemble_and_advance_session`.
pub fn render_event(event: &BusEvent) -> String {
if event.pipeline_event == crate::crdt_state::GAP_PIPELINE_EVENT {
format!(
"events between {} and {} were dropped",
event.from_stage, event.to_stage
)
} else {
format!(
"pipeline_event sled_id=\"{}\" story_id=\"{}\" from=\"{}\" to=\"{}\" event=\"{}\"",
event.sled_id, event.story_id, event.from_stage, event.to_stage, event.pipeline_event
)
}
}
/// Returns `true` if `event` should be delivered to `persona` based on the
/// persona's stored scope filter in the CRDT.
///
/// Falls back to local-sled-only if no session entry exists for `persona`.
pub fn event_matches_persona(event: &BusEvent, persona: &str) -> bool {
use crate::crdt_state::ScopeFilter;
match crate::crdt_state::read_llm_session(persona) {
Some(session) => match &session.scope_filter {
ScopeFilter::All => true,
ScopeFilter::Sleds(ids) => ids.contains(&event.sled_id),
},
None => {
let local = crate::crdt_state::our_node_id().unwrap_or_default();
!local.is_empty() && event.sled_id == local
}
}
}
+28
View File
@@ -132,6 +132,34 @@ pub fn subscribe_status(tx: mpsc::UnboundedSender<WsResponse>, mut subscription:
}); });
} }
/// Spawn a background task that forwards real-time pipeline-transition events to
/// the client, filtered to those visible to `persona` based on its scope filter.
///
/// Each matching event is delivered as a [`WsResponse::PipelineEvent`] frame.
/// Events that occur while no subscriber is connected are NOT delivered here;
/// [`crate::llm_session::assemble_prompt_context`] catches up on those at turn time.
pub fn subscribe_persona_pipeline_events(tx: mpsc::UnboundedSender<WsResponse>, persona: String) {
let Some(mut rx) = crate::pipeline_event_bus::subscribe() else {
return;
};
tokio::spawn(async move {
loop {
match rx.recv().await {
Ok(event) => {
if crate::pipeline_event_bus::event_matches_persona(&event, &persona) {
let line = crate::pipeline_event_bus::render_event(&event);
if tx.send(WsResponse::PipelineEvent { line }).is_err() {
break;
}
}
}
Err(broadcast::error::RecvError::Lagged(_)) => continue,
Err(broadcast::error::RecvError::Closed) => break,
}
}
});
}
/// Spawn a background task that forwards reconciliation events to the client. /// Spawn a background task that forwards reconciliation events to the client.
pub fn subscribe_reconciliation( pub fn subscribe_reconciliation(
tx: mpsc::UnboundedSender<WsResponse>, tx: mpsc::UnboundedSender<WsResponse>,
@@ -127,6 +127,13 @@ pub enum WsResponse {
StatusUpdate { StatusUpdate {
event: StatusEvent, event: StatusEvent,
}, },
/// A real-time pipeline-transition event pushed to the client as it happens.
///
/// Carries the same compact audit-line format used in `<system-reminder>`
/// blocks so that LLM-aware clients can consume it without additional parsing.
PipelineEvent {
line: String,
},
} }
#[cfg(test)] #[cfg(test)]
+2 -1
View File
@@ -23,6 +23,7 @@ pub use dispatch::{
}; };
pub use io::{ pub use io::{
check_onboarding, load_initial_pipeline_state, load_recent_logs, load_wizard_state, check_onboarding, load_initial_pipeline_state, load_recent_logs, load_wizard_state,
subscribe_logs, subscribe_reconciliation, subscribe_status, subscribe_watcher, subscribe_logs, subscribe_persona_pipeline_events, subscribe_reconciliation, subscribe_status,
subscribe_watcher,
}; };
pub use message::{WizardStepInfo, WsResponse}; pub use message::{WizardStepInfo, WsResponse};
+4
View File
@@ -28,6 +28,10 @@ pub(crate) fn spawn_event_bridges(
// Audit log subscriber: write one structured line per pipeline transition. // Audit log subscriber: write one structured line per pipeline transition.
crate::pipeline_state::spawn_audit_log_subscriber(); crate::pipeline_state::spawn_audit_log_subscriber();
// Pipeline event bus: initialise before the event-log subscriber so that
// real-time broadcasts are ready before the first transition fires.
crate::pipeline_event_bus::init();
// Event log subscriber: persist every transition to the CRDT event log so // Event log subscriber: persist every transition to the CRDT event log so
// the history survives rebuild_and_restart and replicates across nodes. // the history survives rebuild_and_restart and replicates across nodes.
crate::event_log::spawn_event_log_subscriber(); crate::event_log::spawn_event_log_subscriber();