diff --git a/server/src/chat/transport/matrix/bot/messages/handle_message.rs b/server/src/chat/transport/matrix/bot/messages/handle_message.rs index 67b73416..0010d83b 100644 --- a/server/src/chat/transport/matrix/bot/messages/handle_message.rs +++ b/server/src/chat/transport/matrix/bot/messages/handle_message.rs @@ -54,6 +54,11 @@ pub(in crate::chat::transport::matrix::bot) async fn handle_message( prefix }; + // Pull new pipeline-transition events from the CRDT event log for this + // session and atomically advance the high-water marks so the same events + // are not re-injected on the next turn. + let event_log_ctx = crate::llm_session::assemble_prompt_context(&room_id_str); + // The prompt is just the current message with sender attribution. // Prior conversation context is carried by the Claude Code session. let bot_name = &ctx.services.bot_name; @@ -64,7 +69,7 @@ pub(in crate::chat::transport::matrix::bot) async fn handle_message( String::new() }; let prompt = format!( - "{system_reminder_prefix}[Your name is {bot_name}. Refer to yourself as {bot_name}, not Claude.]\n{active_project_ctx}\n{}", + "{system_reminder_prefix}{event_log_ctx}[Your name is {bot_name}. Refer to yourself as {bot_name}, not Claude.]\n{active_project_ctx}\n{}", format_user_prompt(&sender, &user_message) ); diff --git a/server/src/crdt_state/lww_maps/llm_sessions.rs b/server/src/crdt_state/lww_maps/llm_sessions.rs new file mode 100644 index 00000000..3e2d07bb --- /dev/null +++ b/server/src/crdt_state/lww_maps/llm_sessions.rs @@ -0,0 +1,221 @@ +//! Read/write helpers for the `llm_sessions` LWW-map collection, including the +//! atomic `assemble_and_advance_session` helper used by the Matrix bot. +//! +//! LLM sessions are keyed by `session_id` (typically a Matrix room ID) and track +//! per-sled high-water marks so that `assemble_and_advance_session` can inject +//! only events the LLM has not yet seen and advance the marks atomically within +//! a single CRDT lock acquisition. + +use std::collections::BTreeMap; + +use bft_json_crdt::json_crdt::{JsonValue, *}; +use bft_json_crdt::op::ROOT_ID; +use serde_json::json; + +use super::super::state::{apply_and_persist, get_crdt, rebuild_llm_session_index}; +use super::super::types::{LlmSessionCrdt, LlmSessionView}; + +/// Write or upsert an LLM session entry keyed by `session_id`. +/// +/// Creates a new entry if `session_id` is not yet present; updates +/// `persona_name` and `scope` on an existing entry. The `high_water` +/// register is not touched by this function — use `assemble_and_advance_session` +/// to advance it atomically. +pub fn write_llm_session(session_id: &str, persona_name: &str, scope: &str) { + let Some(state_mutex) = get_crdt() else { + return; + }; + let Ok(mut state) = state_mutex.lock() else { + return; + }; + + if let Some(&idx) = state.llm_session_index.get(session_id) { + apply_and_persist(&mut state, |s| { + s.crdt.doc.llm_sessions[idx] + .persona_name + .set(persona_name.to_string()) + }); + apply_and_persist(&mut state, |s| { + s.crdt.doc.llm_sessions[idx].scope.set(scope.to_string()) + }); + } else { + let entry: JsonValue = json!({ + "session_id": session_id, + "persona_name": persona_name, + "scope": scope, + "high_water": "{}", + }) + .into(); + apply_and_persist(&mut state, |s| { + s.crdt.doc.llm_sessions.insert(ROOT_ID, entry) + }); + state.llm_session_index = rebuild_llm_session_index(&state.crdt); + } +} + +/// Read a single LLM session entry by `session_id`. +pub fn read_llm_session(session_id: &str) -> Option { + let state_mutex = get_crdt()?; + let state = state_mutex.lock().ok()?; + let &idx = state.llm_session_index.get(session_id)?; + extract_llm_session_view(&state.crdt.doc.llm_sessions[idx]) +} + +/// Atomically read new event-log entries for `session_id` past the stored +/// high-water marks, render them as a block of audit lines, and advance the +/// marks to prevent double-injection on the next call. +/// +/// Scope is "single-sled": only events recorded by the local node (identified +/// via [`crate::crdt_state::our_node_id`]) are included. Events from other +/// sleds are ignored in this story. +/// +/// Returns an empty `Vec` when there are no new events or the CRDT is not +/// initialised. +pub fn assemble_and_advance_session(session_id: &str) -> Vec { + let local_sled_id = crate::crdt_state::our_node_id().unwrap_or_default(); + if local_sled_id.is_empty() { + return Vec::new(); + } + + let Some(state_mutex) = get_crdt() else { + return Vec::new(); + }; + let Ok(mut state) = state_mutex.lock() else { + return Vec::new(); + }; + + // Read the current high-water map for this session. + let current_high_water: BTreeMap = { + match state.llm_session_index.get(session_id).copied() { + Some(idx) => parse_high_water(&state.crdt.doc.llm_sessions[idx]), + None => BTreeMap::new(), + } + }; + + let last_seen = current_high_water.get(&local_sled_id).copied(); + + // Collect new events from the local sled past the high-water mark. + let new_events: Vec<(u64, String, String, String, String)> = state + .crdt + .doc + .event_log + .iter() + .filter_map(|e| extract_new_event(e, &local_sled_id, last_seen)) + .collect(); + + if new_events.is_empty() { + return Vec::new(); + } + + // Advance the high-water mark to the maximum new event_seq. + let new_max_seq = new_events.iter().map(|(seq, ..)| *seq).max().unwrap_or(0); + let mut new_high_water = current_high_water; + new_high_water.insert(local_sled_id, new_max_seq); + let new_hw_json = serde_json::to_string(&new_high_water).unwrap_or_else(|_| "{}".to_string()); + + // Upsert the session entry with the new high-water value. + let idx_opt = state.llm_session_index.get(session_id).copied(); + if let Some(idx) = idx_opt { + apply_and_persist(&mut state, |s| { + s.crdt.doc.llm_sessions[idx] + .high_water + .set(new_hw_json.clone()) + }); + } else { + let entry: JsonValue = json!({ + "session_id": session_id, + "persona_name": "", + "scope": "single-sled", + "high_water": new_hw_json, + }) + .into(); + apply_and_persist(&mut state, |s| { + s.crdt.doc.llm_sessions.insert(ROOT_ID, entry) + }); + state.llm_session_index = rebuild_llm_session_index(&state.crdt); + } + + // Render each new event as a compact audit line. + new_events + .into_iter() + .map(|(_, story_id, from_stage, to_stage, pipeline_event)| { + format!( + "pipeline_event story_id=\"{story_id}\" from=\"{from_stage}\" \ + to=\"{to_stage}\" event=\"{pipeline_event}\"" + ) + }) + .collect() +} + +/// Decode the high-water JSON string from an `LlmSessionCrdt` entry. +fn parse_high_water(entry: &LlmSessionCrdt) -> BTreeMap { + match entry.high_water.view() { + JsonValue::String(s) if !s.is_empty() && s != "{}" => { + serde_json::from_str(&s).unwrap_or_default() + } + _ => BTreeMap::new(), + } +} + +/// Extract one event log entry if it belongs to `sled_id` and has an +/// `event_seq` strictly greater than `last_seen` (or `last_seen` is `None`, +/// meaning all events from this sled are new). +fn extract_new_event( + e: &crate::crdt_state::types::EventLogEntryCrdt, + sled_id: &str, + last_seen: Option, +) -> Option<(u64, String, String, String, String)> { + let entry_sled = match e.sled_id.view() { + JsonValue::String(s) if s == sled_id => s, + _ => return None, + }; + let event_seq = match e.event_seq.view() { + JsonValue::Number(n) => n as u64, + _ => return None, + }; + // Skip if we've already injected this event. + if last_seen.is_some_and(|last| event_seq <= last) { + return None; + } + let story_id = match e.story_id.view() { + JsonValue::String(s) => s, + _ => String::new(), + }; + let from_stage = match e.from_stage.view() { + JsonValue::String(s) => s, + _ => String::new(), + }; + let to_stage = match e.to_stage.view() { + JsonValue::String(s) => s, + _ => String::new(), + }; + let pipeline_event = match e.pipeline_event.view() { + JsonValue::String(s) => s, + _ => String::new(), + }; + let _ = entry_sled; // used only for filtering above + Some((event_seq, story_id, from_stage, to_stage, pipeline_event)) +} + +/// Convert a CRDT LLM session entry into its read-only view representation. +pub(super) fn extract_llm_session_view(entry: &LlmSessionCrdt) -> Option { + let session_id = match entry.session_id.view() { + JsonValue::String(s) if !s.is_empty() => s, + _ => return None, + }; + let persona_name = match entry.persona_name.view() { + JsonValue::String(s) => s, + _ => String::new(), + }; + let scope = match entry.scope.view() { + JsonValue::String(s) => s, + _ => String::new(), + }; + let high_water = parse_high_water(entry); + Some(LlmSessionView { + session_id, + persona_name, + scope, + high_water, + }) +} diff --git a/server/src/crdt_state/lww_maps/mod.rs b/server/src/crdt_state/lww_maps/mod.rs index b605a4f1..ebd472b3 100644 --- a/server/src/crdt_state/lww_maps/mod.rs +++ b/server/src/crdt_state/lww_maps/mod.rs @@ -16,6 +16,7 @@ mod active_agents; mod agent_throttle; mod event_log; mod gateway_projects; +mod llm_sessions; mod merge_jobs; mod test_jobs; mod tokens; @@ -33,6 +34,7 @@ pub use event_log::{EventLogEntryRaw, append_event_log_entry, read_all_event_log pub use gateway_projects::{ delete_gateway_project, read_all_gateway_projects, read_gateway_project, write_gateway_project, }; +pub use llm_sessions::{assemble_and_advance_session, read_llm_session, write_llm_session}; pub use merge_jobs::{delete_merge_job, read_all_merge_jobs, read_merge_job, write_merge_job}; pub use test_jobs::{delete_test_job, read_all_test_jobs, read_test_job, write_test_job}; pub use tokens::{delete_token_usage, read_all_token_usage, read_token_usage, write_token_usage}; diff --git a/server/src/crdt_state/mod.rs b/server/src/crdt_state/mod.rs index 37f4db81..548c41ad 100644 --- a/server/src/crdt_state/mod.rs +++ b/server/src/crdt_state/mod.rs @@ -28,12 +28,13 @@ mod write; pub use gateway_config::{read_gateway_active_project, write_gateway_active_project}; pub use lww_maps::{ - EventLogEntryRaw, append_event_log_entry, delete_active_agent, delete_agent_throttle, - delete_gateway_project, delete_merge_job, delete_test_job, delete_token_usage, - read_active_agent, read_agent_throttle, read_all_active_agents, read_all_agent_throttles, - read_all_event_log_entries, read_all_gateway_projects, read_all_merge_jobs, read_all_test_jobs, - read_all_token_usage, read_gateway_project, read_merge_job, read_test_job, read_token_usage, - write_active_agent, write_agent_throttle, write_gateway_project, write_merge_job, + EventLogEntryRaw, append_event_log_entry, assemble_and_advance_session, delete_active_agent, + delete_agent_throttle, delete_gateway_project, delete_merge_job, delete_test_job, + delete_token_usage, read_active_agent, read_agent_throttle, read_all_active_agents, + read_all_agent_throttles, read_all_event_log_entries, read_all_gateway_projects, + read_all_merge_jobs, read_all_test_jobs, read_all_token_usage, read_gateway_project, + read_llm_session, read_merge_job, read_test_job, read_token_usage, write_active_agent, + write_agent_throttle, write_gateway_project, write_llm_session, write_merge_job, write_test_job, write_token_usage, }; pub use ops::{all_ops_json, apply_remote_op, ops_since, our_vector_clock, subscribe_ops}; @@ -50,9 +51,10 @@ pub(crate) use state::flush_persistence; pub use state::{init, subscribe}; pub use types::{ ActiveAgentCrdt, ActiveAgentView, AgentThrottleCrdt, AgentThrottleView, CrdtEvent, EpicId, - EventLogEntryCrdt, GatewayConfigCrdt, GatewayProjectCrdt, GatewayProjectView, MergeJobCrdt, - MergeJobView, NodePresenceCrdt, NodePresenceView, PipelineDoc, PipelineItemCrdt, - PipelineItemView, TestJobCrdt, TestJobView, TokenUsageCrdt, TokenUsageView, WorkItem, + EventLogEntryCrdt, GatewayConfigCrdt, GatewayProjectCrdt, GatewayProjectView, LlmSessionCrdt, + LlmSessionView, MergeJobCrdt, MergeJobView, NodePresenceCrdt, NodePresenceView, PipelineDoc, + PipelineItemCrdt, PipelineItemView, TestJobCrdt, TestJobView, TokenUsageCrdt, TokenUsageView, + WorkItem, }; pub use write::{ bump_retry_count, migrate_legacy_stage_strings, migrate_merge_job, migrate_names_from_slugs, diff --git a/server/src/crdt_state/state/indices.rs b/server/src/crdt_state/state/indices.rs index 31f801ec..50f5d794 100644 --- a/server/src/crdt_state/state/indices.rs +++ b/server/src/crdt_state/state/indices.rs @@ -113,3 +113,16 @@ pub(in crate::crdt_state) fn rebuild_gateway_project_index( } map } + +/// Rebuild the session_id → llm_sessions list index. +pub(in crate::crdt_state) fn rebuild_llm_session_index( + crdt: &BaseCrdt, +) -> HashMap { + let mut map = HashMap::new(); + for (i, entry) in crdt.doc.llm_sessions.iter().enumerate() { + if let JsonValue::String(ref k) = entry.session_id.view() { + map.insert(k.clone(), i); + } + } + map +} diff --git a/server/src/crdt_state/state/init.rs b/server/src/crdt_state/state/init.rs index 36bc9df8..65e62f05 100644 --- a/server/src/crdt_state/state/init.rs +++ b/server/src/crdt_state/state/init.rs @@ -21,8 +21,8 @@ use super::super::hex; use super::super::types::{CrdtEvent, PipelineDoc}; use super::indices::{ rebuild_active_agent_index, rebuild_agent_throttle_index, rebuild_gateway_project_index, - rebuild_index, rebuild_merge_job_index, rebuild_node_index, rebuild_test_job_index, - rebuild_token_index, + rebuild_index, rebuild_llm_session_index, rebuild_merge_job_index, rebuild_node_index, + rebuild_test_job_index, rebuild_token_index, }; use super::statics::{ALL_OPS, CRDT_EVENT_TX, PERSIST_PENDING, SYNC_TX, VECTOR_CLOCK}; use super::{CRDT_STATE, CrdtState}; @@ -103,6 +103,7 @@ pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> { let test_job_index = rebuild_test_job_index(&crdt); let agent_throttle_index = rebuild_agent_throttle_index(&crdt); let gateway_project_index = rebuild_gateway_project_index(&crdt); + let llm_session_index = rebuild_llm_session_index(&crdt); // Advance the top-level list clocks to the Lamport floor so that // list-level inserts don't re-emit low seq numbers. @@ -114,6 +115,7 @@ pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> { crdt.doc.test_jobs.advance_seq(lamport_floor); crdt.doc.agent_throttle.advance_seq(lamport_floor); crdt.doc.gateway_projects.advance_seq(lamport_floor); + crdt.doc.llm_sessions.advance_seq(lamport_floor); crdt.doc .gateway_config .active_project @@ -183,6 +185,7 @@ pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> { test_job_index, agent_throttle_index, gateway_project_index, + llm_session_index, persist_tx, lamport_floor, tombstones, diff --git a/server/src/crdt_state/state/mod.rs b/server/src/crdt_state/state/mod.rs index 8e0ff9e6..af3672ba 100644 --- a/server/src/crdt_state/state/mod.rs +++ b/server/src/crdt_state/state/mod.rs @@ -39,8 +39,8 @@ pub fn subscribe() -> Option> { pub(super) use apply::{apply_and_persist, emit_event}; pub(super) use indices::{ rebuild_active_agent_index, rebuild_agent_throttle_index, rebuild_gateway_project_index, - rebuild_index, rebuild_merge_job_index, rebuild_node_index, rebuild_test_job_index, - rebuild_token_index, + rebuild_index, rebuild_llm_session_index, rebuild_merge_job_index, rebuild_node_index, + rebuild_test_job_index, rebuild_token_index, }; pub(crate) use statics::{PERSIST_PENDING, all_ops_lock, vector_clock_lock}; pub(super) use statics::{SYNC_TX, track_op}; @@ -67,6 +67,8 @@ pub(super) struct CrdtState { pub(super) agent_throttle_index: HashMap, /// Maps project name → index in the gateway_projects ListCrdt for O(1) lookup. pub(super) gateway_project_index: HashMap, + /// Maps session_id → index in the llm_sessions ListCrdt for O(1) lookup. + pub(super) llm_session_index: HashMap, /// Channel sender for op persistence and drain signalling. pub(super) persist_tx: mpsc::UnboundedSender, /// Max sequence number seen across all ops during init() replay. @@ -145,6 +147,7 @@ pub fn init_for_test() { test_job_index: HashMap::new(), agent_throttle_index: HashMap::new(), gateway_project_index: HashMap::new(), + llm_session_index: HashMap::new(), persist_tx, lamport_floor: 0, tombstones: HashSet::new(), diff --git a/server/src/crdt_state/state/tests.rs b/server/src/crdt_state/state/tests.rs index 5a2e8f1d..b55d5a12 100644 --- a/server/src/crdt_state/state/tests.rs +++ b/server/src/crdt_state/state/tests.rs @@ -236,6 +236,7 @@ fn persist_tx_send_failure_logs_warn_with_op_type_and_seq() { test_job_index: HashMap::new(), agent_throttle_index: HashMap::new(), gateway_project_index: HashMap::new(), + llm_session_index: HashMap::new(), persist_tx, lamport_floor: 0, tombstones: std::collections::HashSet::new(), @@ -310,6 +311,7 @@ fn persist_tx_send_success_emits_no_warn() { test_job_index: HashMap::new(), agent_throttle_index: HashMap::new(), gateway_project_index: HashMap::new(), + llm_session_index: HashMap::new(), persist_tx, lamport_floor: 0, tombstones: std::collections::HashSet::new(), diff --git a/server/src/crdt_state/types.rs b/server/src/crdt_state/types.rs index 01e75aac..f4c834d1 100644 --- a/server/src/crdt_state/types.rs +++ b/server/src/crdt_state/types.rs @@ -48,6 +48,8 @@ pub struct PipelineDoc { pub gateway_config: GatewayConfigCrdt, /// Append-only log of every pipeline transition, persisted as CRDT ops. pub event_log: ListCrdt, + /// Per-session LLM context state (high-water marks for event log injection). + pub llm_sessions: ListCrdt, } /// CRDT entry representing a single persisted pipeline stage-transition event. @@ -76,6 +78,37 @@ pub struct EventLogEntryCrdt { pub pipeline_event: LwwRegisterCrdt, } +/// CRDT entry tracking an LLM session's event-log injection state. +/// +/// Each session (keyed by `session_id`, typically a Matrix room ID) records the +/// per-sled high-water marks so that `assemble_prompt_context` can inject only +/// events the LLM has not yet seen and then advance the marks atomically. +#[add_crdt_fields] +#[derive(Clone, CrdtNode, Debug)] +pub struct LlmSessionCrdt { + /// Stable session identifier (e.g. Matrix room ID). + pub session_id: LwwRegisterCrdt, + /// Human-readable persona name (e.g. `"Timmy"`). + pub persona_name: LwwRegisterCrdt, + /// Scope tag: `"single-sled"` for now; extended in future stories. + pub scope: LwwRegisterCrdt, + /// JSON-serialised `BTreeMap` tracking how far + /// each sled's event stream has been injected into this session's prompts. + pub high_water: LwwRegisterCrdt, +} + +/// Read-side snapshot of a single LLM session entry. +pub struct LlmSessionView { + /// Stable session identifier. + pub session_id: String, + /// Persona name for the bot in this session. + pub persona_name: String, + /// Scope tag (e.g. `"single-sled"`). + pub scope: String, + /// Decoded high-water map: sled_id → last seen event_seq. + pub high_water: std::collections::BTreeMap, +} + /// CRDT sub-document representing a single pipeline work item with LWW fields for stage, agent, etc. /// /// Story 945: the `blocked`, `review_hold`, `frozen`, and diff --git a/server/src/llm_session/mod.rs b/server/src/llm_session/mod.rs new file mode 100644 index 00000000..11c5de7f --- /dev/null +++ b/server/src/llm_session/mod.rs @@ -0,0 +1,154 @@ +//! LLM session management — CRDT-backed context assembly for bot prompts. +//! +//! The central export is [`assemble_prompt_context`], which reads new pipeline +//! transition events from the CRDT event log past the session's stored high-water +//! marks, wraps them in a `` block for injection at the head of +//! the next LLM prompt, and atomically advances the marks so a mid-turn crash +//! cannot double-inject the same events. + +/// Assemble a `` block containing new pipeline-transition events +/// for `session_id` and atomically advance the high-water marks. +/// +/// Reads events from the local sled's CRDT event log that have not yet been +/// injected into this session (tracked via per-sled high-water marks stored in +/// the `LlmSessionCrdt` entity). Returns an empty string when there are no new +/// events or the CRDT is not yet initialised. +pub fn assemble_prompt_context(session_id: &str) -> String { + let lines = crate::crdt_state::assemble_and_advance_session(session_id); + if lines.is_empty() { + return String::new(); + } + let body = lines.join("\n"); + format!("\n{body}\n\n") +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::pipeline_state::{PipelineEvent, PlanState, Stage, StoryId, TransitionFired}; + + fn make_fired(story_id: &str) -> TransitionFired { + TransitionFired { + story_id: StoryId(story_id.to_string()), + before: Stage::Backlog, + after: Stage::Coding { + claim: None, + plan: PlanState::Missing, + retries: 0, + }, + event: PipelineEvent::DepsMet, + at: chrono::Utc::now(), + } + } + + /// AC 4: fire a `TransitionFired` event, call `assemble_prompt_context` via + /// the session helper, assert the rendered output contains the event details. + /// A second call must return empty because the high-water was advanced. + #[test] + fn assemble_prompt_context_includes_new_events_and_advances_high_water() { + crate::crdt_state::init_for_test(); + + // Log two transition events for different stories. + crate::event_log::log_transition_event(&make_fired("42_story_foo")); + crate::event_log::log_transition_event(&make_fired("99_story_bar")); + + let ctx = assemble_prompt_context("room-test-1"); + + // Must be wrapped in a block. + assert!( + ctx.starts_with("\n"), + "missing opening tag; got: {ctx}" + ); + assert!( + ctx.ends_with("\n"), + "missing closing tag; got: {ctx}" + ); + + // Both story IDs must appear in the rendered block. + assert!( + ctx.contains("42_story_foo"), + "first story missing; got: {ctx}" + ); + assert!( + ctx.contains("99_story_bar"), + "second story missing; got: {ctx}" + ); + + // The pipeline_event label must appear. + assert!(ctx.contains("DepsMet"), "event label missing; got: {ctx}"); + + // Second call: high-water was advanced — no new events, returns empty. + let ctx2 = assemble_prompt_context("room-test-1"); + assert!( + ctx2.is_empty(), + "second call must be empty after high-water advance; got: {ctx2}" + ); + } + + /// Different session IDs have independent high-water marks. + #[test] + fn assemble_prompt_context_sessions_are_independent() { + crate::crdt_state::init_for_test(); + + crate::event_log::log_transition_event(&make_fired("77_story_x")); + + // Session A sees the event. + let ctx_a = assemble_prompt_context("room-session-a"); + assert!( + ctx_a.contains("77_story_x"), + "session A must see the event; got: {ctx_a}" + ); + + // Session B also sees it (independent high-water). + let ctx_b = assemble_prompt_context("room-session-b"); + assert!( + ctx_b.contains("77_story_x"), + "session B must see the event; got: {ctx_b}" + ); + + // Second call on A: already advanced. + let ctx_a2 = assemble_prompt_context("room-session-a"); + assert!( + ctx_a2.is_empty(), + "session A second call must be empty; got: {ctx_a2}" + ); + + // But B's second call is also empty. + let ctx_b2 = assemble_prompt_context("room-session-b"); + assert!( + ctx_b2.is_empty(), + "session B second call must be empty; got: {ctx_b2}" + ); + } + + /// Events logged after a prior advance are included in the next call. + #[test] + fn assemble_prompt_context_includes_events_logged_after_advance() { + crate::crdt_state::init_for_test(); + + crate::event_log::log_transition_event(&make_fired("10_story_old")); + // First call drains and advances. + let ctx1 = assemble_prompt_context("room-incremental"); + assert!(ctx1.contains("10_story_old"), "got: {ctx1}"); + + // Log a new event after the advance. + crate::event_log::log_transition_event(&make_fired("20_story_new")); + let ctx2 = assemble_prompt_context("room-incremental"); + assert!( + ctx2.contains("20_story_new"), + "new event must appear; got: {ctx2}" + ); + assert!( + !ctx2.contains("10_story_old"), + "old event must not reappear; got: {ctx2}" + ); + } + + /// `assemble_prompt_context` returns empty string when there are no events. + #[test] + fn assemble_prompt_context_empty_when_no_events() { + crate::crdt_state::init_for_test(); + let ctx = assemble_prompt_context("room-empty"); + assert!(ctx.is_empty(), "must be empty with no events; got: {ctx}"); + } +} diff --git a/server/src/main.rs b/server/src/main.rs index e21fbc16..7cc442d9 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -28,6 +28,8 @@ mod gateway_relay; mod http; mod io; mod llm; +/// LLM session management — CRDT-backed context assembly for bot prompts. +pub(crate) mod llm_session; /// Log buffer — in-memory ring buffer for recent server-side log lines. pub mod log_buffer; /// Mesh — peer discovery and multi-hop CRDT replication over WebSocket.