huskies: merge 1126 story Gateway event aggregator with per-session scope filters (Timmy=All, Sally=single sled)

This commit is contained in:
dave
2026-05-17 20:57:14 +00:00
parent 71d3047ef0
commit 2d0387fe63
5 changed files with 342 additions and 58 deletions
+138 -53
View File
@@ -6,14 +6,14 @@
//! only events the LLM has not yet seen and advance the marks atomically within
//! a single CRDT lock acquisition.
use std::collections::BTreeMap;
use std::collections::{BTreeMap, BTreeSet};
use bft_json_crdt::json_crdt::{JsonValue, *};
use bft_json_crdt::op::ROOT_ID;
use serde_json::json;
use super::super::state::{apply_and_persist, get_crdt, rebuild_llm_session_index};
use super::super::types::{LlmSessionCrdt, LlmSessionView};
use super::super::types::{LlmSessionCrdt, LlmSessionView, ScopeFilter};
use super::event_log::GAP_PIPELINE_EVENT;
/// Write or upsert an LLM session entry keyed by `session_id`.
@@ -22,6 +22,9 @@ use super::event_log::GAP_PIPELINE_EVENT;
/// `persona_name` and `scope` on an existing entry. The `high_water`
/// register is not touched by this function — use `assemble_and_advance_session`
/// to advance it atomically.
///
/// The `scope` string must be in wire form: `"all"` for [`ScopeFilter::All`]
/// or `"sleds:hex1,hex2"` for [`ScopeFilter::Sleds`].
pub fn write_llm_session(session_id: &str, persona_name: &str, scope: &str) {
let Some(state_mutex) = get_crdt() else {
return;
@@ -66,17 +69,20 @@ pub fn read_llm_session(session_id: &str) -> Option<LlmSessionView> {
/// high-water marks, render them as a block of audit lines, and advance the
/// marks to prevent double-injection on the next call.
///
/// Scope is "single-sled": only events recorded by the local node (identified
/// via [`crate::crdt_state::our_node_id`]) are included. Events from other
/// sleds are ignored in this story.
/// The set of sleds whose events are collected is determined by the session's
/// [`ScopeFilter`]:
/// - [`ScopeFilter::All`]: events from every sled present in the event log are
/// included — this is the gateway-level persona default that gives a full
/// cross-sled view.
/// - [`ScopeFilter::Sleds`]: only events whose `sled_id` is in the stored set
/// are included. When the stored set is empty (legacy `"single-sled"` rows or
/// freshly created sessions with no explicit scope), the local node's sled ID
/// is used as the sole member, preserving prior single-sled behaviour.
///
/// Returns an empty `Vec` when there are no new events or the CRDT is not
/// initialised.
pub fn assemble_and_advance_session(session_id: &str) -> Vec<String> {
let local_sled_id = crate::crdt_state::our_node_id().unwrap_or_default();
if local_sled_id.is_empty() {
return Vec::new();
}
let Some(state_mutex) = get_crdt() else {
return Vec::new();
@@ -85,33 +91,81 @@ pub fn assemble_and_advance_session(session_id: &str) -> Vec<String> {
return Vec::new();
};
// Read the current high-water map for this session.
let current_high_water: BTreeMap<String, u64> = {
match state.llm_session_index.get(session_id).copied() {
Some(idx) => parse_high_water(&state.crdt.doc.llm_sessions[idx]),
None => BTreeMap::new(),
// Determine the session's scope filter and current high-water map.
let (scope_filter, current_high_water) = match state.llm_session_index.get(session_id).copied()
{
Some(idx) => {
let filter = parse_scope(&state.crdt.doc.llm_sessions[idx], &local_sled_id);
let hw = parse_high_water(&state.crdt.doc.llm_sessions[idx]);
(filter, hw)
}
None => {
// New session with no stored entry: default to local sled only.
let mut ids = BTreeSet::new();
if !local_sled_id.is_empty() {
ids.insert(local_sled_id.clone());
}
(ScopeFilter::Sleds(ids), BTreeMap::new())
}
};
let last_seen = current_high_water.get(&local_sled_id).copied();
// Build the set of sled IDs to collect events from.
let target_sleds: BTreeSet<String> = match &scope_filter {
ScopeFilter::All => {
// Collect every unique sled_id present in the event log at this moment
// (live, not snapshotted — picks up newly adopted sleds automatically).
state
.crdt
.doc
.event_log
.iter()
.filter_map(|e| match e.sled_id.view() {
JsonValue::String(s) if !s.is_empty() => Some(s),
_ => None,
})
.collect()
}
ScopeFilter::Sleds(ids) if ids.is_empty() => {
// Empty set → legacy fallback: local sled only.
if local_sled_id.is_empty() {
return Vec::new();
}
std::iter::once(local_sled_id.clone()).collect()
}
ScopeFilter::Sleds(ids) => ids.clone(),
};
// Collect new events from the local sled past the high-water mark.
let new_events: Vec<(u64, String, String, String, String)> = state
if target_sleds.is_empty() {
return Vec::new();
}
// Collect new events from each target sled past its high-water mark.
let mut new_events: Vec<(f64, String, String, String, String, String)> = state
.crdt
.doc
.event_log
.iter()
.filter_map(|e| extract_new_event(e, &local_sled_id, last_seen))
.filter_map(|e| extract_new_event_multi(e, &target_sleds, &current_high_water))
.collect();
if new_events.is_empty() {
return Vec::new();
}
// Advance the high-water mark to the maximum new event_seq.
let new_max_seq = new_events.iter().map(|(seq, ..)| *seq).max().unwrap_or(0);
// Sort by (sled_id, event_seq) for deterministic ordering.
new_events.sort_by(|a, b| {
a.1.cmp(&b.1)
.then(a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal))
});
// Advance the high-water mark for each sled that had new events.
let mut new_high_water = current_high_water;
new_high_water.insert(local_sled_id.clone(), new_max_seq);
for (seq, sled_id, ..) in &new_events {
let entry = new_high_water.entry(sled_id.clone()).or_insert(0);
if *seq as u64 > *entry {
*entry = *seq as u64;
}
}
let new_hw_json = serde_json::to_string(&new_high_water).unwrap_or_else(|_| "{}".to_string());
// Upsert the session entry with the new high-water value.
@@ -123,10 +177,11 @@ pub fn assemble_and_advance_session(session_id: &str) -> Vec<String> {
.set(new_hw_json.clone())
});
} else {
let scope_str = scope_filter.to_scope_str();
let entry: JsonValue = json!({
"session_id": session_id,
"persona_name": "",
"scope": "single-sled",
"scope": scope_str,
"high_water": new_hw_json,
})
.into();
@@ -136,13 +191,14 @@ pub fn assemble_and_advance_session(session_id: &str) -> Vec<String> {
state.llm_session_index = rebuild_llm_session_index(&state.crdt);
}
// Observability: log event-log size and gap count for this sled.
// Observability: log event-log size and gap count across the session's
// target sleds (the scope actually assembled for this session).
let total_entries = state
.crdt
.doc
.event_log
.iter()
.filter(|e| matches!(e.sled_id.view(), JsonValue::String(s) if s == local_sled_id))
.filter(|e| matches!(e.sled_id.view(), JsonValue::String(s) if target_sleds.contains(&s)))
.count();
let gap_count = state
.crdt
@@ -150,7 +206,7 @@ pub fn assemble_and_advance_session(session_id: &str) -> Vec<String> {
.event_log
.iter()
.filter(|e| {
matches!(e.sled_id.view(), JsonValue::String(s) if s == local_sled_id)
matches!(e.sled_id.view(), JsonValue::String(s) if target_sleds.contains(&s))
&& matches!(e.pipeline_event.view(), JsonValue::String(s) if s == GAP_PIPELINE_EVENT)
})
.count();
@@ -162,16 +218,18 @@ pub fn assemble_and_advance_session(session_id: &str) -> Vec<String> {
// human-readable message so the LLM is never presented with raw field data.
new_events
.into_iter()
.map(|(_, story_id, from_stage, to_stage, pipeline_event)| {
if pipeline_event == GAP_PIPELINE_EVENT {
format!("events between {from_stage} and {to_stage} were dropped")
} else {
format!(
"pipeline_event story_id=\"{story_id}\" from=\"{from_stage}\" \
to=\"{to_stage}\" event=\"{pipeline_event}\""
)
}
})
.map(
|(_, sled_id, story_id, from_stage, to_stage, pipeline_event)| {
if pipeline_event == GAP_PIPELINE_EVENT {
format!("events between {from_stage} and {to_stage} were dropped")
} else {
format!(
"pipeline_event sled_id=\"{sled_id}\" story_id=\"{story_id}\" \
from=\"{from_stage}\" to=\"{to_stage}\" event=\"{pipeline_event}\""
)
}
},
)
.collect()
}
@@ -185,24 +243,47 @@ fn parse_high_water(entry: &LlmSessionCrdt) -> BTreeMap<String, u64> {
}
}
/// Extract one event log entry if it belongs to `sled_id` and has an
/// `event_seq` strictly greater than `last_seen` (or `last_seen` is `None`,
/// meaning all events from this sled are new).
fn extract_new_event(
/// Parse the scope filter from an `LlmSessionCrdt` entry, falling back to
/// a single-element set containing `local_sled_id` for legacy / empty scope strings.
fn parse_scope(entry: &LlmSessionCrdt, local_sled_id: &str) -> ScopeFilter {
let raw = match entry.scope.view() {
JsonValue::String(s) => s,
_ => String::new(),
};
let filter = ScopeFilter::from_scope_str(&raw);
// For a Sleds filter with an empty set (legacy "single-sled" or ""),
// fall back to the local sled.
if let ScopeFilter::Sleds(ref ids) = filter
&& ids.is_empty()
&& !local_sled_id.is_empty()
{
let mut fallback = BTreeSet::new();
fallback.insert(local_sled_id.to_string());
return ScopeFilter::Sleds(fallback);
}
filter
}
/// Extract one event log entry if its `sled_id` is in `target_sleds` and its
/// `event_seq` is strictly greater than the matching high-water value (or no
/// high-water has been recorded yet for that sled).
///
/// Returns `(event_seq, sled_id, story_id, from_stage, to_stage, pipeline_event)`.
fn extract_new_event_multi(
e: &crate::crdt_state::types::EventLogEntryCrdt,
sled_id: &str,
last_seen: Option<u64>,
) -> Option<(u64, String, String, String, String)> {
let entry_sled = match e.sled_id.view() {
JsonValue::String(s) if s == sled_id => s,
target_sleds: &BTreeSet<String>,
high_water: &BTreeMap<String, u64>,
) -> Option<(f64, String, String, String, String, String)> {
let sled_id = match e.sled_id.view() {
JsonValue::String(s) if !s.is_empty() && target_sleds.contains(&s) => s,
_ => return None,
};
let event_seq = match e.event_seq.view() {
JsonValue::Number(n) => n as u64,
JsonValue::Number(n) => n,
_ => return None,
};
// Skip if we've already injected this event.
if last_seen.is_some_and(|last| event_seq <= last) {
let last_seen = high_water.get(&sled_id).copied();
if last_seen.is_some_and(|last| event_seq as u64 <= last) {
return None;
}
let story_id = match e.story_id.view() {
@@ -221,8 +302,14 @@ fn extract_new_event(
JsonValue::String(s) => s,
_ => String::new(),
};
let _ = entry_sled; // used only for filtering above
Some((event_seq, story_id, from_stage, to_stage, pipeline_event))
Some((
event_seq,
sled_id,
story_id,
from_stage,
to_stage,
pipeline_event,
))
}
/// Convert a CRDT LLM session entry into its read-only view representation.
@@ -235,15 +322,13 @@ pub(super) fn extract_llm_session_view(entry: &LlmSessionCrdt) -> Option<LlmSess
JsonValue::String(s) => s,
_ => String::new(),
};
let scope = match entry.scope.view() {
JsonValue::String(s) => s,
_ => String::new(),
};
let local_sled_id = crate::crdt_state::our_node_id().unwrap_or_default();
let scope_filter = parse_scope(entry, &local_sled_id);
let high_water = parse_high_water(entry);
Some(LlmSessionView {
session_id,
persona_name,
scope,
scope_filter,
high_water,
})
}
+2 -2
View File
@@ -53,8 +53,8 @@ pub use types::{
ActiveAgentCrdt, ActiveAgentView, AgentThrottleCrdt, AgentThrottleView, CrdtEvent, EpicId,
EventLogEntryCrdt, GatewayConfigCrdt, GatewayProjectCrdt, GatewayProjectView, LlmSessionCrdt,
LlmSessionView, MergeJobCrdt, MergeJobView, NodePresenceCrdt, NodePresenceView, PipelineDoc,
PipelineItemCrdt, PipelineItemView, TestJobCrdt, TestJobView, TokenUsageCrdt, TokenUsageView,
WorkItem,
PipelineItemCrdt, PipelineItemView, ScopeFilter, TestJobCrdt, TestJobView, TokenUsageCrdt,
TokenUsageView, WorkItem,
};
pub use write::{
bump_retry_count, migrate_legacy_stage_strings, migrate_merge_job, migrate_names_from_slugs,
+57 -3
View File
@@ -90,21 +90,75 @@ pub struct LlmSessionCrdt {
pub session_id: LwwRegisterCrdt<String>,
/// Human-readable persona name (e.g. `"Timmy"`).
pub persona_name: LwwRegisterCrdt<String>,
/// Scope tag: `"single-sled"` for now; extended in future stories.
/// Scope wire string parsed by [`ScopeFilter::from_scope_str`]: `"all"`,
/// `"sleds:hex1,hex2"`, or legacy `"single-sled"` / empty (→ local sled).
pub scope: LwwRegisterCrdt<String>,
/// JSON-serialised `BTreeMap<sled_id, last_seen_event_seq>` tracking how far
/// each sled's event stream has been injected into this session's prompts.
pub high_water: LwwRegisterCrdt<String>,
}
/// Which sleds' events an LLM session may see.
///
/// Stored as a compact string in the CRDT register and parsed at read time.
/// The default for a freshly-created session with no stored scope is
/// [`ScopeFilter::LocalOnly`], which preserves prior single-sled behaviour.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ScopeFilter {
/// Include events from every sled present in the CRDT event log.
///
/// Default for gateway-level personas (e.g. Timmy in multi-project mode).
All,
/// Include only events whose `sled_id` is in the given set.
///
/// Default for sled-level personas: the set contains only the sled's own ID.
Sleds(std::collections::BTreeSet<String>),
}
impl ScopeFilter {
/// Parse a wire-form scope string stored in the CRDT register.
///
/// Recognised forms:
/// - `"all"` → [`ScopeFilter::All`]
/// - `"sleds:hex1,hex2,…"` → [`ScopeFilter::Sleds`]
/// - Anything else (including legacy `"single-sled"` and empty) →
/// [`ScopeFilter::Sleds`] with an empty set; callers should fall back
/// to the local sled ID in that case.
pub fn from_scope_str(s: &str) -> Self {
if s == "all" {
return ScopeFilter::All;
}
if let Some(rest) = s.strip_prefix("sleds:") {
let ids = rest
.split(',')
.filter(|id| !id.is_empty())
.map(|id| id.to_string())
.collect();
return ScopeFilter::Sleds(ids);
}
ScopeFilter::Sleds(std::collections::BTreeSet::new())
}
/// Encode this filter as the compact wire string stored in the CRDT.
pub fn to_scope_str(&self) -> String {
match self {
ScopeFilter::All => "all".to_string(),
ScopeFilter::Sleds(ids) => {
let joined = ids.iter().map(|s| s.as_str()).collect::<Vec<_>>().join(",");
format!("sleds:{joined}")
}
}
}
}
/// Read-side snapshot of a single LLM session entry.
pub struct LlmSessionView {
/// Stable session identifier.
pub session_id: String,
/// Persona name for the bot in this session.
pub persona_name: String,
/// Scope tag (e.g. `"single-sled"`).
pub scope: String,
/// Parsed event-scope filter derived from the `scope` CRDT register.
pub scope_filter: ScopeFilter,
/// Decoded high-water map: sled_id → last seen event_seq.
pub high_water: std::collections::BTreeMap<String, u64>,
}
+119
View File
@@ -155,4 +155,123 @@ mod tests {
let ctx = assemble_prompt_context("room-empty");
assert!(ctx.is_empty(), "must be empty with no events; got: {ctx}");
}
/// AC 4: two sleds each fire one transition; a session scoped `All` sees
/// both events; a session scoped `Sleds([sled-A])` sees only sled-A's event.
///
/// Simulates the gateway aggregate view by directly calling
/// `append_event_log_entry` with two distinct sled IDs, then asserting
/// scope-filtered assembly behaves correctly.
#[test]
fn scope_filter_all_sees_both_sleds_filter_sees_one() {
crate::crdt_state::init_for_test();
let sled_a = "aaaaaaaaaaaaaaaa";
let sled_b = "bbbbbbbbbbbbbbbb";
// Each sled fires one pipeline transition.
crate::crdt_state::append_event_log_entry(
sled_a,
1_000_000.0,
"10_story_alpha",
"1_backlog",
"2_current",
"DepsMet",
);
crate::crdt_state::append_event_log_entry(
sled_b,
1_000_001.0,
"20_story_beta",
"2_current",
"3_qa",
"AgentCompleted",
);
// Set up a session scoped to ALL sleds.
crate::crdt_state::write_llm_session("room-scope-all", "Timmy", "all");
// Set up a session scoped to sled-A only.
let sled_a_scope = format!("sleds:{sled_a}");
crate::crdt_state::write_llm_session("room-scope-sled-a", "Sally", &sled_a_scope);
// All-scope session: both events must appear.
let ctx_all = assemble_prompt_context("room-scope-all");
assert!(
ctx_all.contains("10_story_alpha"),
"All scope must contain sled-A event; got: {ctx_all}"
);
assert!(
ctx_all.contains("20_story_beta"),
"All scope must contain sled-B event; got: {ctx_all}"
);
// Sled-A-only session: only sled-A's event visible.
let ctx_a = assemble_prompt_context("room-scope-sled-a");
assert!(
ctx_a.contains("10_story_alpha"),
"Sleds filter must contain sled-A event; got: {ctx_a}"
);
assert!(
!ctx_a.contains("20_story_beta"),
"Sleds filter must NOT contain sled-B event; got: {ctx_a}"
);
// Second call on both sessions: nothing new (high-water advanced).
let ctx_all2 = assemble_prompt_context("room-scope-all");
assert!(
ctx_all2.is_empty(),
"All scope second call must be empty; got: {ctx_all2}"
);
let ctx_a2 = assemble_prompt_context("room-scope-sled-a");
assert!(
ctx_a2.is_empty(),
"Sleds filter second call must be empty; got: {ctx_a2}"
);
}
/// Newly-added sled events appear in an All-scope session without
/// restarting (AC 5 runtime pickup).
#[test]
fn scope_filter_all_picks_up_new_sled_at_runtime() {
crate::crdt_state::init_for_test();
let sled_a = "cccccccccccccccc";
let sled_new = "dddddddddddddddd";
// Only sled-A exists initially.
crate::crdt_state::append_event_log_entry(
sled_a,
2_000_000.0,
"30_story_first",
"1_backlog",
"2_current",
"DepsMet",
);
crate::crdt_state::write_llm_session("room-runtime-pickup", "Timmy", "all");
let ctx1 = assemble_prompt_context("room-runtime-pickup");
assert!(
ctx1.contains("30_story_first"),
"first event must appear; got: {ctx1}"
);
// sled_new is adopted at runtime — its event is appended without restart.
crate::crdt_state::append_event_log_entry(
sled_new,
2_000_001.0,
"40_story_second",
"2_current",
"3_qa",
"AgentCompleted",
);
let ctx2 = assemble_prompt_context("room-runtime-pickup");
assert!(
ctx2.contains("40_story_second"),
"newly adopted sled event must appear; got: {ctx2}"
);
assert!(
!ctx2.contains("30_story_first"),
"old event must not reappear; got: {ctx2}"
);
}
}
+26
View File
@@ -613,6 +613,12 @@ pub async fn init_project(
/// Broadcast a status event received from a project node to all local subscribers.
///
/// For [`crate::service::events::StoredEvent::StageTransition`] events the
/// transition is also appended to the gateway's CRDT event log using the
/// project name as the `sled_id`. This builds the live tail-merged aggregate
/// view that [`crate::crdt_state::assemble_and_advance_session`] reads when a
/// session's [`crate::crdt_state::ScopeFilter`] is set to `All`.
///
/// Returns the number of active receivers that received the event.
/// A return value of zero means no subscribers are currently connected.
pub fn broadcast_status_event(
@@ -620,6 +626,26 @@ pub fn broadcast_status_event(
project: String,
event: crate::service::events::StoredEvent,
) -> usize {
// Append StageTransition events to the gateway CRDT so the session
// assembler can deliver a unified cross-sled event stream.
if let crate::service::events::StoredEvent::StageTransition {
ref story_id,
ref from_stage,
ref to_stage,
timestamp_ms,
..
} = event
{
let timestamp_secs = timestamp_ms as f64 / 1_000.0_f64;
crate::crdt_state::append_event_log_entry(
&project,
timestamp_secs,
story_id,
from_stage,
to_stage,
"StageTransition",
);
}
let msg = GatewayStatusEvent { project, event };
state.event_tx.send(msg).unwrap_or(0)
}