diff --git a/server/src/crdt_state/lww_maps/llm_sessions.rs b/server/src/crdt_state/lww_maps/llm_sessions.rs index a1a29640..9ba70e2c 100644 --- a/server/src/crdt_state/lww_maps/llm_sessions.rs +++ b/server/src/crdt_state/lww_maps/llm_sessions.rs @@ -6,14 +6,14 @@ //! only events the LLM has not yet seen and advance the marks atomically within //! a single CRDT lock acquisition. -use std::collections::BTreeMap; +use std::collections::{BTreeMap, BTreeSet}; use bft_json_crdt::json_crdt::{JsonValue, *}; use bft_json_crdt::op::ROOT_ID; use serde_json::json; use super::super::state::{apply_and_persist, get_crdt, rebuild_llm_session_index}; -use super::super::types::{LlmSessionCrdt, LlmSessionView}; +use super::super::types::{LlmSessionCrdt, LlmSessionView, ScopeFilter}; use super::event_log::GAP_PIPELINE_EVENT; /// Write or upsert an LLM session entry keyed by `session_id`. @@ -22,6 +22,9 @@ use super::event_log::GAP_PIPELINE_EVENT; /// `persona_name` and `scope` on an existing entry. The `high_water` /// register is not touched by this function — use `assemble_and_advance_session` /// to advance it atomically. +/// +/// The `scope` string must be in wire form: `"all"` for [`ScopeFilter::All`] +/// or `"sleds:hex1,hex2"` for [`ScopeFilter::Sleds`]. pub fn write_llm_session(session_id: &str, persona_name: &str, scope: &str) { let Some(state_mutex) = get_crdt() else { return; @@ -66,17 +69,20 @@ pub fn read_llm_session(session_id: &str) -> Option { /// high-water marks, render them as a block of audit lines, and advance the /// marks to prevent double-injection on the next call. /// -/// Scope is "single-sled": only events recorded by the local node (identified -/// via [`crate::crdt_state::our_node_id`]) are included. Events from other -/// sleds are ignored in this story. +/// The set of sleds whose events are collected is determined by the session's +/// [`ScopeFilter`]: +/// - [`ScopeFilter::All`]: events from every sled present in the event log are +/// included — this is the gateway-level persona default that gives a full +/// cross-sled view. +/// - [`ScopeFilter::Sleds`]: only events whose `sled_id` is in the stored set +/// are included. When the stored set is empty (legacy `"single-sled"` rows or +/// freshly created sessions with no explicit scope), the local node's sled ID +/// is used as the sole member, preserving prior single-sled behaviour. /// /// Returns an empty `Vec` when there are no new events or the CRDT is not /// initialised. pub fn assemble_and_advance_session(session_id: &str) -> Vec { let local_sled_id = crate::crdt_state::our_node_id().unwrap_or_default(); - if local_sled_id.is_empty() { - return Vec::new(); - } let Some(state_mutex) = get_crdt() else { return Vec::new(); @@ -85,33 +91,81 @@ pub fn assemble_and_advance_session(session_id: &str) -> Vec { return Vec::new(); }; - // Read the current high-water map for this session. - let current_high_water: BTreeMap = { - match state.llm_session_index.get(session_id).copied() { - Some(idx) => parse_high_water(&state.crdt.doc.llm_sessions[idx]), - None => BTreeMap::new(), + // Determine the session's scope filter and current high-water map. + let (scope_filter, current_high_water) = match state.llm_session_index.get(session_id).copied() + { + Some(idx) => { + let filter = parse_scope(&state.crdt.doc.llm_sessions[idx], &local_sled_id); + let hw = parse_high_water(&state.crdt.doc.llm_sessions[idx]); + (filter, hw) + } + None => { + // New session with no stored entry: default to local sled only. + let mut ids = BTreeSet::new(); + if !local_sled_id.is_empty() { + ids.insert(local_sled_id.clone()); + } + (ScopeFilter::Sleds(ids), BTreeMap::new()) } }; - let last_seen = current_high_water.get(&local_sled_id).copied(); + // Build the set of sled IDs to collect events from. + let target_sleds: BTreeSet = match &scope_filter { + ScopeFilter::All => { + // Collect every unique sled_id present in the event log at this moment + // (live, not snapshotted — picks up newly adopted sleds automatically). + state + .crdt + .doc + .event_log + .iter() + .filter_map(|e| match e.sled_id.view() { + JsonValue::String(s) if !s.is_empty() => Some(s), + _ => None, + }) + .collect() + } + ScopeFilter::Sleds(ids) if ids.is_empty() => { + // Empty set → legacy fallback: local sled only. + if local_sled_id.is_empty() { + return Vec::new(); + } + std::iter::once(local_sled_id.clone()).collect() + } + ScopeFilter::Sleds(ids) => ids.clone(), + }; - // Collect new events from the local sled past the high-water mark. - let new_events: Vec<(u64, String, String, String, String)> = state + if target_sleds.is_empty() { + return Vec::new(); + } + + // Collect new events from each target sled past its high-water mark. + let mut new_events: Vec<(f64, String, String, String, String, String)> = state .crdt .doc .event_log .iter() - .filter_map(|e| extract_new_event(e, &local_sled_id, last_seen)) + .filter_map(|e| extract_new_event_multi(e, &target_sleds, ¤t_high_water)) .collect(); if new_events.is_empty() { return Vec::new(); } - // Advance the high-water mark to the maximum new event_seq. - let new_max_seq = new_events.iter().map(|(seq, ..)| *seq).max().unwrap_or(0); + // Sort by (sled_id, event_seq) for deterministic ordering. + new_events.sort_by(|a, b| { + a.1.cmp(&b.1) + .then(a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal)) + }); + + // Advance the high-water mark for each sled that had new events. let mut new_high_water = current_high_water; - new_high_water.insert(local_sled_id.clone(), new_max_seq); + for (seq, sled_id, ..) in &new_events { + let entry = new_high_water.entry(sled_id.clone()).or_insert(0); + if *seq as u64 > *entry { + *entry = *seq as u64; + } + } let new_hw_json = serde_json::to_string(&new_high_water).unwrap_or_else(|_| "{}".to_string()); // Upsert the session entry with the new high-water value. @@ -123,10 +177,11 @@ pub fn assemble_and_advance_session(session_id: &str) -> Vec { .set(new_hw_json.clone()) }); } else { + let scope_str = scope_filter.to_scope_str(); let entry: JsonValue = json!({ "session_id": session_id, "persona_name": "", - "scope": "single-sled", + "scope": scope_str, "high_water": new_hw_json, }) .into(); @@ -136,13 +191,14 @@ pub fn assemble_and_advance_session(session_id: &str) -> Vec { state.llm_session_index = rebuild_llm_session_index(&state.crdt); } - // Observability: log event-log size and gap count for this sled. + // Observability: log event-log size and gap count across the session's + // target sleds (the scope actually assembled for this session). let total_entries = state .crdt .doc .event_log .iter() - .filter(|e| matches!(e.sled_id.view(), JsonValue::String(s) if s == local_sled_id)) + .filter(|e| matches!(e.sled_id.view(), JsonValue::String(s) if target_sleds.contains(&s))) .count(); let gap_count = state .crdt @@ -150,7 +206,7 @@ pub fn assemble_and_advance_session(session_id: &str) -> Vec { .event_log .iter() .filter(|e| { - matches!(e.sled_id.view(), JsonValue::String(s) if s == local_sled_id) + matches!(e.sled_id.view(), JsonValue::String(s) if target_sleds.contains(&s)) && matches!(e.pipeline_event.view(), JsonValue::String(s) if s == GAP_PIPELINE_EVENT) }) .count(); @@ -162,16 +218,18 @@ pub fn assemble_and_advance_session(session_id: &str) -> Vec { // human-readable message so the LLM is never presented with raw field data. new_events .into_iter() - .map(|(_, story_id, from_stage, to_stage, pipeline_event)| { - if pipeline_event == GAP_PIPELINE_EVENT { - format!("events between {from_stage} and {to_stage} were dropped") - } else { - format!( - "pipeline_event story_id=\"{story_id}\" from=\"{from_stage}\" \ - to=\"{to_stage}\" event=\"{pipeline_event}\"" - ) - } - }) + .map( + |(_, sled_id, story_id, from_stage, to_stage, pipeline_event)| { + if pipeline_event == GAP_PIPELINE_EVENT { + format!("events between {from_stage} and {to_stage} were dropped") + } else { + format!( + "pipeline_event sled_id=\"{sled_id}\" story_id=\"{story_id}\" \ + from=\"{from_stage}\" to=\"{to_stage}\" event=\"{pipeline_event}\"" + ) + } + }, + ) .collect() } @@ -185,24 +243,47 @@ fn parse_high_water(entry: &LlmSessionCrdt) -> BTreeMap { } } -/// Extract one event log entry if it belongs to `sled_id` and has an -/// `event_seq` strictly greater than `last_seen` (or `last_seen` is `None`, -/// meaning all events from this sled are new). -fn extract_new_event( +/// Parse the scope filter from an `LlmSessionCrdt` entry, falling back to +/// a single-element set containing `local_sled_id` for legacy / empty scope strings. +fn parse_scope(entry: &LlmSessionCrdt, local_sled_id: &str) -> ScopeFilter { + let raw = match entry.scope.view() { + JsonValue::String(s) => s, + _ => String::new(), + }; + let filter = ScopeFilter::from_scope_str(&raw); + // For a Sleds filter with an empty set (legacy "single-sled" or ""), + // fall back to the local sled. + if let ScopeFilter::Sleds(ref ids) = filter + && ids.is_empty() + && !local_sled_id.is_empty() + { + let mut fallback = BTreeSet::new(); + fallback.insert(local_sled_id.to_string()); + return ScopeFilter::Sleds(fallback); + } + filter +} + +/// Extract one event log entry if its `sled_id` is in `target_sleds` and its +/// `event_seq` is strictly greater than the matching high-water value (or no +/// high-water has been recorded yet for that sled). +/// +/// Returns `(event_seq, sled_id, story_id, from_stage, to_stage, pipeline_event)`. +fn extract_new_event_multi( e: &crate::crdt_state::types::EventLogEntryCrdt, - sled_id: &str, - last_seen: Option, -) -> Option<(u64, String, String, String, String)> { - let entry_sled = match e.sled_id.view() { - JsonValue::String(s) if s == sled_id => s, + target_sleds: &BTreeSet, + high_water: &BTreeMap, +) -> Option<(f64, String, String, String, String, String)> { + let sled_id = match e.sled_id.view() { + JsonValue::String(s) if !s.is_empty() && target_sleds.contains(&s) => s, _ => return None, }; let event_seq = match e.event_seq.view() { - JsonValue::Number(n) => n as u64, + JsonValue::Number(n) => n, _ => return None, }; - // Skip if we've already injected this event. - if last_seen.is_some_and(|last| event_seq <= last) { + let last_seen = high_water.get(&sled_id).copied(); + if last_seen.is_some_and(|last| event_seq as u64 <= last) { return None; } let story_id = match e.story_id.view() { @@ -221,8 +302,14 @@ fn extract_new_event( JsonValue::String(s) => s, _ => String::new(), }; - let _ = entry_sled; // used only for filtering above - Some((event_seq, story_id, from_stage, to_stage, pipeline_event)) + Some(( + event_seq, + sled_id, + story_id, + from_stage, + to_stage, + pipeline_event, + )) } /// Convert a CRDT LLM session entry into its read-only view representation. @@ -235,15 +322,13 @@ pub(super) fn extract_llm_session_view(entry: &LlmSessionCrdt) -> Option s, _ => String::new(), }; - let scope = match entry.scope.view() { - JsonValue::String(s) => s, - _ => String::new(), - }; + let local_sled_id = crate::crdt_state::our_node_id().unwrap_or_default(); + let scope_filter = parse_scope(entry, &local_sled_id); let high_water = parse_high_water(entry); Some(LlmSessionView { session_id, persona_name, - scope, + scope_filter, high_water, }) } diff --git a/server/src/crdt_state/mod.rs b/server/src/crdt_state/mod.rs index ed7aae85..1e9a14ca 100644 --- a/server/src/crdt_state/mod.rs +++ b/server/src/crdt_state/mod.rs @@ -53,8 +53,8 @@ pub use types::{ ActiveAgentCrdt, ActiveAgentView, AgentThrottleCrdt, AgentThrottleView, CrdtEvent, EpicId, EventLogEntryCrdt, GatewayConfigCrdt, GatewayProjectCrdt, GatewayProjectView, LlmSessionCrdt, LlmSessionView, MergeJobCrdt, MergeJobView, NodePresenceCrdt, NodePresenceView, PipelineDoc, - PipelineItemCrdt, PipelineItemView, TestJobCrdt, TestJobView, TokenUsageCrdt, TokenUsageView, - WorkItem, + PipelineItemCrdt, PipelineItemView, ScopeFilter, TestJobCrdt, TestJobView, TokenUsageCrdt, + TokenUsageView, WorkItem, }; pub use write::{ bump_retry_count, migrate_legacy_stage_strings, migrate_merge_job, migrate_names_from_slugs, diff --git a/server/src/crdt_state/types.rs b/server/src/crdt_state/types.rs index f4c834d1..60723fc5 100644 --- a/server/src/crdt_state/types.rs +++ b/server/src/crdt_state/types.rs @@ -90,21 +90,75 @@ pub struct LlmSessionCrdt { pub session_id: LwwRegisterCrdt, /// Human-readable persona name (e.g. `"Timmy"`). pub persona_name: LwwRegisterCrdt, - /// Scope tag: `"single-sled"` for now; extended in future stories. + /// Scope wire string parsed by [`ScopeFilter::from_scope_str`]: `"all"`, + /// `"sleds:hex1,hex2"`, or legacy `"single-sled"` / empty (→ local sled). pub scope: LwwRegisterCrdt, /// JSON-serialised `BTreeMap` tracking how far /// each sled's event stream has been injected into this session's prompts. pub high_water: LwwRegisterCrdt, } +/// Which sleds' events an LLM session may see. +/// +/// Stored as a compact string in the CRDT register and parsed at read time. +/// The default for a freshly-created session with no stored scope is +/// [`ScopeFilter::LocalOnly`], which preserves prior single-sled behaviour. +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum ScopeFilter { + /// Include events from every sled present in the CRDT event log. + /// + /// Default for gateway-level personas (e.g. Timmy in multi-project mode). + All, + /// Include only events whose `sled_id` is in the given set. + /// + /// Default for sled-level personas: the set contains only the sled's own ID. + Sleds(std::collections::BTreeSet), +} + +impl ScopeFilter { + /// Parse a wire-form scope string stored in the CRDT register. + /// + /// Recognised forms: + /// - `"all"` → [`ScopeFilter::All`] + /// - `"sleds:hex1,hex2,…"` → [`ScopeFilter::Sleds`] + /// - Anything else (including legacy `"single-sled"` and empty) → + /// [`ScopeFilter::Sleds`] with an empty set; callers should fall back + /// to the local sled ID in that case. + pub fn from_scope_str(s: &str) -> Self { + if s == "all" { + return ScopeFilter::All; + } + if let Some(rest) = s.strip_prefix("sleds:") { + let ids = rest + .split(',') + .filter(|id| !id.is_empty()) + .map(|id| id.to_string()) + .collect(); + return ScopeFilter::Sleds(ids); + } + ScopeFilter::Sleds(std::collections::BTreeSet::new()) + } + + /// Encode this filter as the compact wire string stored in the CRDT. + pub fn to_scope_str(&self) -> String { + match self { + ScopeFilter::All => "all".to_string(), + ScopeFilter::Sleds(ids) => { + let joined = ids.iter().map(|s| s.as_str()).collect::>().join(","); + format!("sleds:{joined}") + } + } + } +} + /// Read-side snapshot of a single LLM session entry. pub struct LlmSessionView { /// Stable session identifier. pub session_id: String, /// Persona name for the bot in this session. pub persona_name: String, - /// Scope tag (e.g. `"single-sled"`). - pub scope: String, + /// Parsed event-scope filter derived from the `scope` CRDT register. + pub scope_filter: ScopeFilter, /// Decoded high-water map: sled_id → last seen event_seq. pub high_water: std::collections::BTreeMap, } diff --git a/server/src/llm_session/mod.rs b/server/src/llm_session/mod.rs index 5fb6e8da..4e777215 100644 --- a/server/src/llm_session/mod.rs +++ b/server/src/llm_session/mod.rs @@ -155,4 +155,123 @@ mod tests { let ctx = assemble_prompt_context("room-empty"); assert!(ctx.is_empty(), "must be empty with no events; got: {ctx}"); } + + /// AC 4: two sleds each fire one transition; a session scoped `All` sees + /// both events; a session scoped `Sleds([sled-A])` sees only sled-A's event. + /// + /// Simulates the gateway aggregate view by directly calling + /// `append_event_log_entry` with two distinct sled IDs, then asserting + /// scope-filtered assembly behaves correctly. + #[test] + fn scope_filter_all_sees_both_sleds_filter_sees_one() { + crate::crdt_state::init_for_test(); + + let sled_a = "aaaaaaaaaaaaaaaa"; + let sled_b = "bbbbbbbbbbbbbbbb"; + + // Each sled fires one pipeline transition. + crate::crdt_state::append_event_log_entry( + sled_a, + 1_000_000.0, + "10_story_alpha", + "1_backlog", + "2_current", + "DepsMet", + ); + crate::crdt_state::append_event_log_entry( + sled_b, + 1_000_001.0, + "20_story_beta", + "2_current", + "3_qa", + "AgentCompleted", + ); + + // Set up a session scoped to ALL sleds. + crate::crdt_state::write_llm_session("room-scope-all", "Timmy", "all"); + // Set up a session scoped to sled-A only. + let sled_a_scope = format!("sleds:{sled_a}"); + crate::crdt_state::write_llm_session("room-scope-sled-a", "Sally", &sled_a_scope); + + // All-scope session: both events must appear. + let ctx_all = assemble_prompt_context("room-scope-all"); + assert!( + ctx_all.contains("10_story_alpha"), + "All scope must contain sled-A event; got: {ctx_all}" + ); + assert!( + ctx_all.contains("20_story_beta"), + "All scope must contain sled-B event; got: {ctx_all}" + ); + + // Sled-A-only session: only sled-A's event visible. + let ctx_a = assemble_prompt_context("room-scope-sled-a"); + assert!( + ctx_a.contains("10_story_alpha"), + "Sleds filter must contain sled-A event; got: {ctx_a}" + ); + assert!( + !ctx_a.contains("20_story_beta"), + "Sleds filter must NOT contain sled-B event; got: {ctx_a}" + ); + + // Second call on both sessions: nothing new (high-water advanced). + let ctx_all2 = assemble_prompt_context("room-scope-all"); + assert!( + ctx_all2.is_empty(), + "All scope second call must be empty; got: {ctx_all2}" + ); + let ctx_a2 = assemble_prompt_context("room-scope-sled-a"); + assert!( + ctx_a2.is_empty(), + "Sleds filter second call must be empty; got: {ctx_a2}" + ); + } + + /// Newly-added sled events appear in an All-scope session without + /// restarting (AC 5 runtime pickup). + #[test] + fn scope_filter_all_picks_up_new_sled_at_runtime() { + crate::crdt_state::init_for_test(); + + let sled_a = "cccccccccccccccc"; + let sled_new = "dddddddddddddddd"; + + // Only sled-A exists initially. + crate::crdt_state::append_event_log_entry( + sled_a, + 2_000_000.0, + "30_story_first", + "1_backlog", + "2_current", + "DepsMet", + ); + crate::crdt_state::write_llm_session("room-runtime-pickup", "Timmy", "all"); + + let ctx1 = assemble_prompt_context("room-runtime-pickup"); + assert!( + ctx1.contains("30_story_first"), + "first event must appear; got: {ctx1}" + ); + + // sled_new is adopted at runtime — its event is appended without restart. + crate::crdt_state::append_event_log_entry( + sled_new, + 2_000_001.0, + "40_story_second", + "2_current", + "3_qa", + "AgentCompleted", + ); + + let ctx2 = assemble_prompt_context("room-runtime-pickup"); + assert!( + ctx2.contains("40_story_second"), + "newly adopted sled event must appear; got: {ctx2}" + ); + assert!( + !ctx2.contains("30_story_first"), + "old event must not reappear; got: {ctx2}" + ); + } } diff --git a/server/src/service/gateway/mod.rs b/server/src/service/gateway/mod.rs index 38900ac4..fe17c65c 100644 --- a/server/src/service/gateway/mod.rs +++ b/server/src/service/gateway/mod.rs @@ -613,6 +613,12 @@ pub async fn init_project( /// Broadcast a status event received from a project node to all local subscribers. /// +/// For [`crate::service::events::StoredEvent::StageTransition`] events the +/// transition is also appended to the gateway's CRDT event log using the +/// project name as the `sled_id`. This builds the live tail-merged aggregate +/// view that [`crate::crdt_state::assemble_and_advance_session`] reads when a +/// session's [`crate::crdt_state::ScopeFilter`] is set to `All`. +/// /// Returns the number of active receivers that received the event. /// A return value of zero means no subscribers are currently connected. pub fn broadcast_status_event( @@ -620,6 +626,26 @@ pub fn broadcast_status_event( project: String, event: crate::service::events::StoredEvent, ) -> usize { + // Append StageTransition events to the gateway CRDT so the session + // assembler can deliver a unified cross-sled event stream. + if let crate::service::events::StoredEvent::StageTransition { + ref story_id, + ref from_stage, + ref to_stage, + timestamp_ms, + .. + } = event + { + let timestamp_secs = timestamp_ms as f64 / 1_000.0_f64; + crate::crdt_state::append_event_log_entry( + &project, + timestamp_secs, + story_id, + from_stage, + to_stage, + "StageTransition", + ); + } let msg = GatewayStatusEvent { project, event }; state.event_tx.send(msg).unwrap_or(0) }