huskies: merge 1125 story LLM session entity + assemble_prompt_context helper, wired into Matrix bot
This commit is contained in:
@@ -54,6 +54,11 @@ pub(in crate::chat::transport::matrix::bot) async fn handle_message(
|
|||||||
prefix
|
prefix
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Pull new pipeline-transition events from the CRDT event log for this
|
||||||
|
// session and atomically advance the high-water marks so the same events
|
||||||
|
// are not re-injected on the next turn.
|
||||||
|
let event_log_ctx = crate::llm_session::assemble_prompt_context(&room_id_str);
|
||||||
|
|
||||||
// The prompt is just the current message with sender attribution.
|
// The prompt is just the current message with sender attribution.
|
||||||
// Prior conversation context is carried by the Claude Code session.
|
// Prior conversation context is carried by the Claude Code session.
|
||||||
let bot_name = &ctx.services.bot_name;
|
let bot_name = &ctx.services.bot_name;
|
||||||
@@ -64,7 +69,7 @@ pub(in crate::chat::transport::matrix::bot) async fn handle_message(
|
|||||||
String::new()
|
String::new()
|
||||||
};
|
};
|
||||||
let prompt = format!(
|
let prompt = format!(
|
||||||
"{system_reminder_prefix}[Your name is {bot_name}. Refer to yourself as {bot_name}, not Claude.]\n{active_project_ctx}\n{}",
|
"{system_reminder_prefix}{event_log_ctx}[Your name is {bot_name}. Refer to yourself as {bot_name}, not Claude.]\n{active_project_ctx}\n{}",
|
||||||
format_user_prompt(&sender, &user_message)
|
format_user_prompt(&sender, &user_message)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,221 @@
|
|||||||
|
//! Read/write helpers for the `llm_sessions` LWW-map collection, including the
|
||||||
|
//! atomic `assemble_and_advance_session` helper used by the Matrix bot.
|
||||||
|
//!
|
||||||
|
//! LLM sessions are keyed by `session_id` (typically a Matrix room ID) and track
|
||||||
|
//! per-sled high-water marks so that `assemble_and_advance_session` can inject
|
||||||
|
//! only events the LLM has not yet seen and advance the marks atomically within
|
||||||
|
//! a single CRDT lock acquisition.
|
||||||
|
|
||||||
|
use std::collections::BTreeMap;
|
||||||
|
|
||||||
|
use bft_json_crdt::json_crdt::{JsonValue, *};
|
||||||
|
use bft_json_crdt::op::ROOT_ID;
|
||||||
|
use serde_json::json;
|
||||||
|
|
||||||
|
use super::super::state::{apply_and_persist, get_crdt, rebuild_llm_session_index};
|
||||||
|
use super::super::types::{LlmSessionCrdt, LlmSessionView};
|
||||||
|
|
||||||
|
/// Write or upsert an LLM session entry keyed by `session_id`.
|
||||||
|
///
|
||||||
|
/// Creates a new entry if `session_id` is not yet present; updates
|
||||||
|
/// `persona_name` and `scope` on an existing entry. The `high_water`
|
||||||
|
/// register is not touched by this function — use `assemble_and_advance_session`
|
||||||
|
/// to advance it atomically.
|
||||||
|
pub fn write_llm_session(session_id: &str, persona_name: &str, scope: &str) {
|
||||||
|
let Some(state_mutex) = get_crdt() else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
let Ok(mut state) = state_mutex.lock() else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some(&idx) = state.llm_session_index.get(session_id) {
|
||||||
|
apply_and_persist(&mut state, |s| {
|
||||||
|
s.crdt.doc.llm_sessions[idx]
|
||||||
|
.persona_name
|
||||||
|
.set(persona_name.to_string())
|
||||||
|
});
|
||||||
|
apply_and_persist(&mut state, |s| {
|
||||||
|
s.crdt.doc.llm_sessions[idx].scope.set(scope.to_string())
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
let entry: JsonValue = json!({
|
||||||
|
"session_id": session_id,
|
||||||
|
"persona_name": persona_name,
|
||||||
|
"scope": scope,
|
||||||
|
"high_water": "{}",
|
||||||
|
})
|
||||||
|
.into();
|
||||||
|
apply_and_persist(&mut state, |s| {
|
||||||
|
s.crdt.doc.llm_sessions.insert(ROOT_ID, entry)
|
||||||
|
});
|
||||||
|
state.llm_session_index = rebuild_llm_session_index(&state.crdt);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Read a single LLM session entry by `session_id`.
|
||||||
|
pub fn read_llm_session(session_id: &str) -> Option<LlmSessionView> {
|
||||||
|
let state_mutex = get_crdt()?;
|
||||||
|
let state = state_mutex.lock().ok()?;
|
||||||
|
let &idx = state.llm_session_index.get(session_id)?;
|
||||||
|
extract_llm_session_view(&state.crdt.doc.llm_sessions[idx])
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Atomically read new event-log entries for `session_id` past the stored
|
||||||
|
/// high-water marks, render them as a block of audit lines, and advance the
|
||||||
|
/// marks to prevent double-injection on the next call.
|
||||||
|
///
|
||||||
|
/// Scope is "single-sled": only events recorded by the local node (identified
|
||||||
|
/// via [`crate::crdt_state::our_node_id`]) are included. Events from other
|
||||||
|
/// sleds are ignored in this story.
|
||||||
|
///
|
||||||
|
/// Returns an empty `Vec` when there are no new events or the CRDT is not
|
||||||
|
/// initialised.
|
||||||
|
pub fn assemble_and_advance_session(session_id: &str) -> Vec<String> {
|
||||||
|
let local_sled_id = crate::crdt_state::our_node_id().unwrap_or_default();
|
||||||
|
if local_sled_id.is_empty() {
|
||||||
|
return Vec::new();
|
||||||
|
}
|
||||||
|
|
||||||
|
let Some(state_mutex) = get_crdt() else {
|
||||||
|
return Vec::new();
|
||||||
|
};
|
||||||
|
let Ok(mut state) = state_mutex.lock() else {
|
||||||
|
return Vec::new();
|
||||||
|
};
|
||||||
|
|
||||||
|
// Read the current high-water map for this session.
|
||||||
|
let current_high_water: BTreeMap<String, u64> = {
|
||||||
|
match state.llm_session_index.get(session_id).copied() {
|
||||||
|
Some(idx) => parse_high_water(&state.crdt.doc.llm_sessions[idx]),
|
||||||
|
None => BTreeMap::new(),
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let last_seen = current_high_water.get(&local_sled_id).copied();
|
||||||
|
|
||||||
|
// Collect new events from the local sled past the high-water mark.
|
||||||
|
let new_events: Vec<(u64, String, String, String, String)> = state
|
||||||
|
.crdt
|
||||||
|
.doc
|
||||||
|
.event_log
|
||||||
|
.iter()
|
||||||
|
.filter_map(|e| extract_new_event(e, &local_sled_id, last_seen))
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
if new_events.is_empty() {
|
||||||
|
return Vec::new();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Advance the high-water mark to the maximum new event_seq.
|
||||||
|
let new_max_seq = new_events.iter().map(|(seq, ..)| *seq).max().unwrap_or(0);
|
||||||
|
let mut new_high_water = current_high_water;
|
||||||
|
new_high_water.insert(local_sled_id, new_max_seq);
|
||||||
|
let new_hw_json = serde_json::to_string(&new_high_water).unwrap_or_else(|_| "{}".to_string());
|
||||||
|
|
||||||
|
// Upsert the session entry with the new high-water value.
|
||||||
|
let idx_opt = state.llm_session_index.get(session_id).copied();
|
||||||
|
if let Some(idx) = idx_opt {
|
||||||
|
apply_and_persist(&mut state, |s| {
|
||||||
|
s.crdt.doc.llm_sessions[idx]
|
||||||
|
.high_water
|
||||||
|
.set(new_hw_json.clone())
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
let entry: JsonValue = json!({
|
||||||
|
"session_id": session_id,
|
||||||
|
"persona_name": "",
|
||||||
|
"scope": "single-sled",
|
||||||
|
"high_water": new_hw_json,
|
||||||
|
})
|
||||||
|
.into();
|
||||||
|
apply_and_persist(&mut state, |s| {
|
||||||
|
s.crdt.doc.llm_sessions.insert(ROOT_ID, entry)
|
||||||
|
});
|
||||||
|
state.llm_session_index = rebuild_llm_session_index(&state.crdt);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Render each new event as a compact audit line.
|
||||||
|
new_events
|
||||||
|
.into_iter()
|
||||||
|
.map(|(_, story_id, from_stage, to_stage, pipeline_event)| {
|
||||||
|
format!(
|
||||||
|
"pipeline_event story_id=\"{story_id}\" from=\"{from_stage}\" \
|
||||||
|
to=\"{to_stage}\" event=\"{pipeline_event}\""
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Decode the high-water JSON string from an `LlmSessionCrdt` entry.
|
||||||
|
fn parse_high_water(entry: &LlmSessionCrdt) -> BTreeMap<String, u64> {
|
||||||
|
match entry.high_water.view() {
|
||||||
|
JsonValue::String(s) if !s.is_empty() && s != "{}" => {
|
||||||
|
serde_json::from_str(&s).unwrap_or_default()
|
||||||
|
}
|
||||||
|
_ => BTreeMap::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Extract one event log entry if it belongs to `sled_id` and has an
|
||||||
|
/// `event_seq` strictly greater than `last_seen` (or `last_seen` is `None`,
|
||||||
|
/// meaning all events from this sled are new).
|
||||||
|
fn extract_new_event(
|
||||||
|
e: &crate::crdt_state::types::EventLogEntryCrdt,
|
||||||
|
sled_id: &str,
|
||||||
|
last_seen: Option<u64>,
|
||||||
|
) -> Option<(u64, String, String, String, String)> {
|
||||||
|
let entry_sled = match e.sled_id.view() {
|
||||||
|
JsonValue::String(s) if s == sled_id => s,
|
||||||
|
_ => return None,
|
||||||
|
};
|
||||||
|
let event_seq = match e.event_seq.view() {
|
||||||
|
JsonValue::Number(n) => n as u64,
|
||||||
|
_ => return None,
|
||||||
|
};
|
||||||
|
// Skip if we've already injected this event.
|
||||||
|
if last_seen.is_some_and(|last| event_seq <= last) {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
let story_id = match e.story_id.view() {
|
||||||
|
JsonValue::String(s) => s,
|
||||||
|
_ => String::new(),
|
||||||
|
};
|
||||||
|
let from_stage = match e.from_stage.view() {
|
||||||
|
JsonValue::String(s) => s,
|
||||||
|
_ => String::new(),
|
||||||
|
};
|
||||||
|
let to_stage = match e.to_stage.view() {
|
||||||
|
JsonValue::String(s) => s,
|
||||||
|
_ => String::new(),
|
||||||
|
};
|
||||||
|
let pipeline_event = match e.pipeline_event.view() {
|
||||||
|
JsonValue::String(s) => s,
|
||||||
|
_ => String::new(),
|
||||||
|
};
|
||||||
|
let _ = entry_sled; // used only for filtering above
|
||||||
|
Some((event_seq, story_id, from_stage, to_stage, pipeline_event))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Convert a CRDT LLM session entry into its read-only view representation.
|
||||||
|
pub(super) fn extract_llm_session_view(entry: &LlmSessionCrdt) -> Option<LlmSessionView> {
|
||||||
|
let session_id = match entry.session_id.view() {
|
||||||
|
JsonValue::String(s) if !s.is_empty() => s,
|
||||||
|
_ => return None,
|
||||||
|
};
|
||||||
|
let persona_name = match entry.persona_name.view() {
|
||||||
|
JsonValue::String(s) => s,
|
||||||
|
_ => String::new(),
|
||||||
|
};
|
||||||
|
let scope = match entry.scope.view() {
|
||||||
|
JsonValue::String(s) => s,
|
||||||
|
_ => String::new(),
|
||||||
|
};
|
||||||
|
let high_water = parse_high_water(entry);
|
||||||
|
Some(LlmSessionView {
|
||||||
|
session_id,
|
||||||
|
persona_name,
|
||||||
|
scope,
|
||||||
|
high_water,
|
||||||
|
})
|
||||||
|
}
|
||||||
@@ -16,6 +16,7 @@ mod active_agents;
|
|||||||
mod agent_throttle;
|
mod agent_throttle;
|
||||||
mod event_log;
|
mod event_log;
|
||||||
mod gateway_projects;
|
mod gateway_projects;
|
||||||
|
mod llm_sessions;
|
||||||
mod merge_jobs;
|
mod merge_jobs;
|
||||||
mod test_jobs;
|
mod test_jobs;
|
||||||
mod tokens;
|
mod tokens;
|
||||||
@@ -33,6 +34,7 @@ pub use event_log::{EventLogEntryRaw, append_event_log_entry, read_all_event_log
|
|||||||
pub use gateway_projects::{
|
pub use gateway_projects::{
|
||||||
delete_gateway_project, read_all_gateway_projects, read_gateway_project, write_gateway_project,
|
delete_gateway_project, read_all_gateway_projects, read_gateway_project, write_gateway_project,
|
||||||
};
|
};
|
||||||
|
pub use llm_sessions::{assemble_and_advance_session, read_llm_session, write_llm_session};
|
||||||
pub use merge_jobs::{delete_merge_job, read_all_merge_jobs, read_merge_job, write_merge_job};
|
pub use merge_jobs::{delete_merge_job, read_all_merge_jobs, read_merge_job, write_merge_job};
|
||||||
pub use test_jobs::{delete_test_job, read_all_test_jobs, read_test_job, write_test_job};
|
pub use test_jobs::{delete_test_job, read_all_test_jobs, read_test_job, write_test_job};
|
||||||
pub use tokens::{delete_token_usage, read_all_token_usage, read_token_usage, write_token_usage};
|
pub use tokens::{delete_token_usage, read_all_token_usage, read_token_usage, write_token_usage};
|
||||||
|
|||||||
@@ -28,12 +28,13 @@ mod write;
|
|||||||
|
|
||||||
pub use gateway_config::{read_gateway_active_project, write_gateway_active_project};
|
pub use gateway_config::{read_gateway_active_project, write_gateway_active_project};
|
||||||
pub use lww_maps::{
|
pub use lww_maps::{
|
||||||
EventLogEntryRaw, append_event_log_entry, delete_active_agent, delete_agent_throttle,
|
EventLogEntryRaw, append_event_log_entry, assemble_and_advance_session, delete_active_agent,
|
||||||
delete_gateway_project, delete_merge_job, delete_test_job, delete_token_usage,
|
delete_agent_throttle, delete_gateway_project, delete_merge_job, delete_test_job,
|
||||||
read_active_agent, read_agent_throttle, read_all_active_agents, read_all_agent_throttles,
|
delete_token_usage, read_active_agent, read_agent_throttle, read_all_active_agents,
|
||||||
read_all_event_log_entries, read_all_gateway_projects, read_all_merge_jobs, read_all_test_jobs,
|
read_all_agent_throttles, read_all_event_log_entries, read_all_gateway_projects,
|
||||||
read_all_token_usage, read_gateway_project, read_merge_job, read_test_job, read_token_usage,
|
read_all_merge_jobs, read_all_test_jobs, read_all_token_usage, read_gateway_project,
|
||||||
write_active_agent, write_agent_throttle, write_gateway_project, write_merge_job,
|
read_llm_session, read_merge_job, read_test_job, read_token_usage, write_active_agent,
|
||||||
|
write_agent_throttle, write_gateway_project, write_llm_session, write_merge_job,
|
||||||
write_test_job, write_token_usage,
|
write_test_job, write_token_usage,
|
||||||
};
|
};
|
||||||
pub use ops::{all_ops_json, apply_remote_op, ops_since, our_vector_clock, subscribe_ops};
|
pub use ops::{all_ops_json, apply_remote_op, ops_since, our_vector_clock, subscribe_ops};
|
||||||
@@ -50,9 +51,10 @@ pub(crate) use state::flush_persistence;
|
|||||||
pub use state::{init, subscribe};
|
pub use state::{init, subscribe};
|
||||||
pub use types::{
|
pub use types::{
|
||||||
ActiveAgentCrdt, ActiveAgentView, AgentThrottleCrdt, AgentThrottleView, CrdtEvent, EpicId,
|
ActiveAgentCrdt, ActiveAgentView, AgentThrottleCrdt, AgentThrottleView, CrdtEvent, EpicId,
|
||||||
EventLogEntryCrdt, GatewayConfigCrdt, GatewayProjectCrdt, GatewayProjectView, MergeJobCrdt,
|
EventLogEntryCrdt, GatewayConfigCrdt, GatewayProjectCrdt, GatewayProjectView, LlmSessionCrdt,
|
||||||
MergeJobView, NodePresenceCrdt, NodePresenceView, PipelineDoc, PipelineItemCrdt,
|
LlmSessionView, MergeJobCrdt, MergeJobView, NodePresenceCrdt, NodePresenceView, PipelineDoc,
|
||||||
PipelineItemView, TestJobCrdt, TestJobView, TokenUsageCrdt, TokenUsageView, WorkItem,
|
PipelineItemCrdt, PipelineItemView, TestJobCrdt, TestJobView, TokenUsageCrdt, TokenUsageView,
|
||||||
|
WorkItem,
|
||||||
};
|
};
|
||||||
pub use write::{
|
pub use write::{
|
||||||
bump_retry_count, migrate_legacy_stage_strings, migrate_merge_job, migrate_names_from_slugs,
|
bump_retry_count, migrate_legacy_stage_strings, migrate_merge_job, migrate_names_from_slugs,
|
||||||
|
|||||||
@@ -113,3 +113,16 @@ pub(in crate::crdt_state) fn rebuild_gateway_project_index(
|
|||||||
}
|
}
|
||||||
map
|
map
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Rebuild the session_id → llm_sessions list index.
|
||||||
|
pub(in crate::crdt_state) fn rebuild_llm_session_index(
|
||||||
|
crdt: &BaseCrdt<PipelineDoc>,
|
||||||
|
) -> HashMap<String, usize> {
|
||||||
|
let mut map = HashMap::new();
|
||||||
|
for (i, entry) in crdt.doc.llm_sessions.iter().enumerate() {
|
||||||
|
if let JsonValue::String(ref k) = entry.session_id.view() {
|
||||||
|
map.insert(k.clone(), i);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
map
|
||||||
|
}
|
||||||
|
|||||||
@@ -21,8 +21,8 @@ use super::super::hex;
|
|||||||
use super::super::types::{CrdtEvent, PipelineDoc};
|
use super::super::types::{CrdtEvent, PipelineDoc};
|
||||||
use super::indices::{
|
use super::indices::{
|
||||||
rebuild_active_agent_index, rebuild_agent_throttle_index, rebuild_gateway_project_index,
|
rebuild_active_agent_index, rebuild_agent_throttle_index, rebuild_gateway_project_index,
|
||||||
rebuild_index, rebuild_merge_job_index, rebuild_node_index, rebuild_test_job_index,
|
rebuild_index, rebuild_llm_session_index, rebuild_merge_job_index, rebuild_node_index,
|
||||||
rebuild_token_index,
|
rebuild_test_job_index, rebuild_token_index,
|
||||||
};
|
};
|
||||||
use super::statics::{ALL_OPS, CRDT_EVENT_TX, PERSIST_PENDING, SYNC_TX, VECTOR_CLOCK};
|
use super::statics::{ALL_OPS, CRDT_EVENT_TX, PERSIST_PENDING, SYNC_TX, VECTOR_CLOCK};
|
||||||
use super::{CRDT_STATE, CrdtState};
|
use super::{CRDT_STATE, CrdtState};
|
||||||
@@ -103,6 +103,7 @@ pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> {
|
|||||||
let test_job_index = rebuild_test_job_index(&crdt);
|
let test_job_index = rebuild_test_job_index(&crdt);
|
||||||
let agent_throttle_index = rebuild_agent_throttle_index(&crdt);
|
let agent_throttle_index = rebuild_agent_throttle_index(&crdt);
|
||||||
let gateway_project_index = rebuild_gateway_project_index(&crdt);
|
let gateway_project_index = rebuild_gateway_project_index(&crdt);
|
||||||
|
let llm_session_index = rebuild_llm_session_index(&crdt);
|
||||||
|
|
||||||
// Advance the top-level list clocks to the Lamport floor so that
|
// Advance the top-level list clocks to the Lamport floor so that
|
||||||
// list-level inserts don't re-emit low seq numbers.
|
// list-level inserts don't re-emit low seq numbers.
|
||||||
@@ -114,6 +115,7 @@ pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> {
|
|||||||
crdt.doc.test_jobs.advance_seq(lamport_floor);
|
crdt.doc.test_jobs.advance_seq(lamport_floor);
|
||||||
crdt.doc.agent_throttle.advance_seq(lamport_floor);
|
crdt.doc.agent_throttle.advance_seq(lamport_floor);
|
||||||
crdt.doc.gateway_projects.advance_seq(lamport_floor);
|
crdt.doc.gateway_projects.advance_seq(lamport_floor);
|
||||||
|
crdt.doc.llm_sessions.advance_seq(lamport_floor);
|
||||||
crdt.doc
|
crdt.doc
|
||||||
.gateway_config
|
.gateway_config
|
||||||
.active_project
|
.active_project
|
||||||
@@ -183,6 +185,7 @@ pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> {
|
|||||||
test_job_index,
|
test_job_index,
|
||||||
agent_throttle_index,
|
agent_throttle_index,
|
||||||
gateway_project_index,
|
gateway_project_index,
|
||||||
|
llm_session_index,
|
||||||
persist_tx,
|
persist_tx,
|
||||||
lamport_floor,
|
lamport_floor,
|
||||||
tombstones,
|
tombstones,
|
||||||
|
|||||||
@@ -39,8 +39,8 @@ pub fn subscribe() -> Option<broadcast::Receiver<super::types::CrdtEvent>> {
|
|||||||
pub(super) use apply::{apply_and_persist, emit_event};
|
pub(super) use apply::{apply_and_persist, emit_event};
|
||||||
pub(super) use indices::{
|
pub(super) use indices::{
|
||||||
rebuild_active_agent_index, rebuild_agent_throttle_index, rebuild_gateway_project_index,
|
rebuild_active_agent_index, rebuild_agent_throttle_index, rebuild_gateway_project_index,
|
||||||
rebuild_index, rebuild_merge_job_index, rebuild_node_index, rebuild_test_job_index,
|
rebuild_index, rebuild_llm_session_index, rebuild_merge_job_index, rebuild_node_index,
|
||||||
rebuild_token_index,
|
rebuild_test_job_index, rebuild_token_index,
|
||||||
};
|
};
|
||||||
pub(crate) use statics::{PERSIST_PENDING, all_ops_lock, vector_clock_lock};
|
pub(crate) use statics::{PERSIST_PENDING, all_ops_lock, vector_clock_lock};
|
||||||
pub(super) use statics::{SYNC_TX, track_op};
|
pub(super) use statics::{SYNC_TX, track_op};
|
||||||
@@ -67,6 +67,8 @@ pub(super) struct CrdtState {
|
|||||||
pub(super) agent_throttle_index: HashMap<String, usize>,
|
pub(super) agent_throttle_index: HashMap<String, usize>,
|
||||||
/// Maps project name → index in the gateway_projects ListCrdt for O(1) lookup.
|
/// Maps project name → index in the gateway_projects ListCrdt for O(1) lookup.
|
||||||
pub(super) gateway_project_index: HashMap<String, usize>,
|
pub(super) gateway_project_index: HashMap<String, usize>,
|
||||||
|
/// Maps session_id → index in the llm_sessions ListCrdt for O(1) lookup.
|
||||||
|
pub(super) llm_session_index: HashMap<String, usize>,
|
||||||
/// Channel sender for op persistence and drain signalling.
|
/// Channel sender for op persistence and drain signalling.
|
||||||
pub(super) persist_tx: mpsc::UnboundedSender<init::PersistMsg>,
|
pub(super) persist_tx: mpsc::UnboundedSender<init::PersistMsg>,
|
||||||
/// Max sequence number seen across all ops during init() replay.
|
/// Max sequence number seen across all ops during init() replay.
|
||||||
@@ -145,6 +147,7 @@ pub fn init_for_test() {
|
|||||||
test_job_index: HashMap::new(),
|
test_job_index: HashMap::new(),
|
||||||
agent_throttle_index: HashMap::new(),
|
agent_throttle_index: HashMap::new(),
|
||||||
gateway_project_index: HashMap::new(),
|
gateway_project_index: HashMap::new(),
|
||||||
|
llm_session_index: HashMap::new(),
|
||||||
persist_tx,
|
persist_tx,
|
||||||
lamport_floor: 0,
|
lamport_floor: 0,
|
||||||
tombstones: HashSet::new(),
|
tombstones: HashSet::new(),
|
||||||
|
|||||||
@@ -236,6 +236,7 @@ fn persist_tx_send_failure_logs_warn_with_op_type_and_seq() {
|
|||||||
test_job_index: HashMap::new(),
|
test_job_index: HashMap::new(),
|
||||||
agent_throttle_index: HashMap::new(),
|
agent_throttle_index: HashMap::new(),
|
||||||
gateway_project_index: HashMap::new(),
|
gateway_project_index: HashMap::new(),
|
||||||
|
llm_session_index: HashMap::new(),
|
||||||
persist_tx,
|
persist_tx,
|
||||||
lamport_floor: 0,
|
lamport_floor: 0,
|
||||||
tombstones: std::collections::HashSet::new(),
|
tombstones: std::collections::HashSet::new(),
|
||||||
@@ -310,6 +311,7 @@ fn persist_tx_send_success_emits_no_warn() {
|
|||||||
test_job_index: HashMap::new(),
|
test_job_index: HashMap::new(),
|
||||||
agent_throttle_index: HashMap::new(),
|
agent_throttle_index: HashMap::new(),
|
||||||
gateway_project_index: HashMap::new(),
|
gateway_project_index: HashMap::new(),
|
||||||
|
llm_session_index: HashMap::new(),
|
||||||
persist_tx,
|
persist_tx,
|
||||||
lamport_floor: 0,
|
lamport_floor: 0,
|
||||||
tombstones: std::collections::HashSet::new(),
|
tombstones: std::collections::HashSet::new(),
|
||||||
|
|||||||
@@ -48,6 +48,8 @@ pub struct PipelineDoc {
|
|||||||
pub gateway_config: GatewayConfigCrdt,
|
pub gateway_config: GatewayConfigCrdt,
|
||||||
/// Append-only log of every pipeline transition, persisted as CRDT ops.
|
/// Append-only log of every pipeline transition, persisted as CRDT ops.
|
||||||
pub event_log: ListCrdt<EventLogEntryCrdt>,
|
pub event_log: ListCrdt<EventLogEntryCrdt>,
|
||||||
|
/// Per-session LLM context state (high-water marks for event log injection).
|
||||||
|
pub llm_sessions: ListCrdt<LlmSessionCrdt>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// CRDT entry representing a single persisted pipeline stage-transition event.
|
/// CRDT entry representing a single persisted pipeline stage-transition event.
|
||||||
@@ -76,6 +78,37 @@ pub struct EventLogEntryCrdt {
|
|||||||
pub pipeline_event: LwwRegisterCrdt<String>,
|
pub pipeline_event: LwwRegisterCrdt<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// CRDT entry tracking an LLM session's event-log injection state.
|
||||||
|
///
|
||||||
|
/// Each session (keyed by `session_id`, typically a Matrix room ID) records the
|
||||||
|
/// per-sled high-water marks so that `assemble_prompt_context` can inject only
|
||||||
|
/// events the LLM has not yet seen and then advance the marks atomically.
|
||||||
|
#[add_crdt_fields]
|
||||||
|
#[derive(Clone, CrdtNode, Debug)]
|
||||||
|
pub struct LlmSessionCrdt {
|
||||||
|
/// Stable session identifier (e.g. Matrix room ID).
|
||||||
|
pub session_id: LwwRegisterCrdt<String>,
|
||||||
|
/// Human-readable persona name (e.g. `"Timmy"`).
|
||||||
|
pub persona_name: LwwRegisterCrdt<String>,
|
||||||
|
/// Scope tag: `"single-sled"` for now; extended in future stories.
|
||||||
|
pub scope: LwwRegisterCrdt<String>,
|
||||||
|
/// JSON-serialised `BTreeMap<sled_id, last_seen_event_seq>` tracking how far
|
||||||
|
/// each sled's event stream has been injected into this session's prompts.
|
||||||
|
pub high_water: LwwRegisterCrdt<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Read-side snapshot of a single LLM session entry.
///
/// Plain owned data decoded from an `LlmSessionCrdt`. It derives `Debug`,
/// `Clone`, `PartialEq` and `Eq` so callers and tests can log, copy and
/// compare snapshots directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LlmSessionView {
    /// Stable session identifier.
    pub session_id: String,
    /// Persona name for the bot in this session.
    pub persona_name: String,
    /// Scope tag (e.g. `"single-sled"`).
    pub scope: String,
    /// Decoded high-water map: sled_id → last seen event_seq.
    pub high_water: std::collections::BTreeMap<String, u64>,
}
|
||||||
|
|
||||||
/// CRDT sub-document representing a single pipeline work item with LWW fields for stage, agent, etc.
|
/// CRDT sub-document representing a single pipeline work item with LWW fields for stage, agent, etc.
|
||||||
///
|
///
|
||||||
/// Story 945: the `blocked`, `review_hold`, `frozen`, and
|
/// Story 945: the `blocked`, `review_hold`, `frozen`, and
|
||||||
|
|||||||
@@ -0,0 +1,154 @@
|
|||||||
|
//! LLM session management — CRDT-backed context assembly for bot prompts.
|
||||||
|
//!
|
||||||
|
//! The central export is [`assemble_prompt_context`], which reads new pipeline
|
||||||
|
//! transition events from the CRDT event log past the session's stored high-water
|
||||||
|
//! marks, wraps them in a `<system-reminder>` block for injection at the head of
|
||||||
|
//! the next LLM prompt, and atomically advances the marks so a mid-turn crash
|
||||||
|
//! cannot double-inject the same events.
|
||||||
|
|
||||||
|
/// Assemble a `<system-reminder>` block containing new pipeline-transition events
|
||||||
|
/// for `session_id` and atomically advance the high-water marks.
|
||||||
|
///
|
||||||
|
/// Reads events from the local sled's CRDT event log that have not yet been
|
||||||
|
/// injected into this session (tracked via per-sled high-water marks stored in
|
||||||
|
/// the `LlmSessionCrdt` entity). Returns an empty string when there are no new
|
||||||
|
/// events or the CRDT is not yet initialised.
|
||||||
|
pub fn assemble_prompt_context(session_id: &str) -> String {
|
||||||
|
let lines = crate::crdt_state::assemble_and_advance_session(session_id);
|
||||||
|
if lines.is_empty() {
|
||||||
|
return String::new();
|
||||||
|
}
|
||||||
|
let body = lines.join("\n");
|
||||||
|
format!("<system-reminder>\n{body}\n</system-reminder>\n")
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
mod tests {
    use super::*;
    use crate::pipeline_state::{PipelineEvent, PlanState, Stage, StoryId, TransitionFired};

    /// Build a Backlog → Coding transition fired by `DepsMet` for `story_id`.
    fn make_fired(story_id: &str) -> TransitionFired {
        let before = Stage::Backlog;
        let after = Stage::Coding {
            claim: None,
            plan: PlanState::Missing,
            retries: 0,
        };
        TransitionFired {
            story_id: StoryId(story_id.to_string()),
            before,
            after,
            event: PipelineEvent::DepsMet,
            at: chrono::Utc::now(),
        }
    }

    /// Record one transition for `story_id` in the event log.
    fn log_story(story_id: &str) {
        crate::event_log::log_transition_event(&make_fired(story_id));
    }

    /// AC 4: after logging `TransitionFired` events, the rendered context is a
    /// `<system-reminder>` block carrying the event details, and an immediate
    /// second call is empty because the high-water mark moved forward.
    #[test]
    fn assemble_prompt_context_includes_new_events_and_advances_high_water() {
        crate::crdt_state::init_for_test();

        // Two transitions, one per story.
        log_story("42_story_foo");
        log_story("99_story_bar");

        let ctx = assemble_prompt_context("room-test-1");

        // Wrapper tags.
        assert!(
            ctx.starts_with("<system-reminder>\n"),
            "missing opening tag; got: {ctx}"
        );
        assert!(
            ctx.ends_with("</system-reminder>\n"),
            "missing closing tag; got: {ctx}"
        );

        // Both stories are mentioned.
        assert!(
            ctx.contains("42_story_foo"),
            "first story missing; got: {ctx}"
        );
        assert!(
            ctx.contains("99_story_bar"),
            "second story missing; got: {ctx}"
        );

        // The pipeline event label is rendered.
        assert!(ctx.contains("DepsMet"), "event label missing; got: {ctx}");

        // Nothing new on the second call.
        let ctx2 = assemble_prompt_context("room-test-1");
        assert!(
            ctx2.is_empty(),
            "second call must be empty after high-water advance; got: {ctx2}"
        );
    }

    /// Each session ID tracks its own high-water mark.
    #[test]
    fn assemble_prompt_context_sessions_are_independent() {
        crate::crdt_state::init_for_test();

        log_story("77_story_x");

        // First read from session A.
        let ctx_a = assemble_prompt_context("room-session-a");
        assert!(
            ctx_a.contains("77_story_x"),
            "session A must see the event; got: {ctx_a}"
        );

        // Session B has not consumed it yet, so it sees it too.
        let ctx_b = assemble_prompt_context("room-session-b");
        assert!(
            ctx_b.contains("77_story_x"),
            "session B must see the event; got: {ctx_b}"
        );

        // A has already advanced.
        let ctx_a2 = assemble_prompt_context("room-session-a");
        assert!(
            ctx_a2.is_empty(),
            "session A second call must be empty; got: {ctx_a2}"
        );

        // And so has B.
        let ctx_b2 = assemble_prompt_context("room-session-b");
        assert!(
            ctx_b2.is_empty(),
            "session B second call must be empty; got: {ctx_b2}"
        );
    }

    /// An event logged after an advance shows up in the next call, alone.
    #[test]
    fn assemble_prompt_context_includes_events_logged_after_advance() {
        crate::crdt_state::init_for_test();

        log_story("10_story_old");
        // Drain the first event and move the mark.
        let ctx1 = assemble_prompt_context("room-incremental");
        assert!(ctx1.contains("10_story_old"), "got: {ctx1}");

        // A later event must be the only one reported next time.
        log_story("20_story_new");
        let ctx2 = assemble_prompt_context("room-incremental");
        assert!(
            ctx2.contains("20_story_new"),
            "new event must appear; got: {ctx2}"
        );
        assert!(
            !ctx2.contains("10_story_old"),
            "old event must not reappear; got: {ctx2}"
        );
    }

    /// With an empty event log the helper returns an empty string.
    #[test]
    fn assemble_prompt_context_empty_when_no_events() {
        crate::crdt_state::init_for_test();
        let ctx = assemble_prompt_context("room-empty");
        assert!(ctx.is_empty(), "must be empty with no events; got: {ctx}");
    }
}
|
||||||
@@ -28,6 +28,8 @@ mod gateway_relay;
|
|||||||
mod http;
|
mod http;
|
||||||
mod io;
|
mod io;
|
||||||
mod llm;
|
mod llm;
|
||||||
|
/// LLM session management — CRDT-backed context assembly for bot prompts.
|
||||||
|
pub(crate) mod llm_session;
|
||||||
/// Log buffer — in-memory ring buffer for recent server-side log lines.
|
/// Log buffer — in-memory ring buffer for recent server-side log lines.
|
||||||
pub mod log_buffer;
|
pub mod log_buffer;
|
||||||
/// Mesh — peer discovery and multi-hop CRDT replication over WebSocket.
|
/// Mesh — peer discovery and multi-hop CRDT replication over WebSocket.
|
||||||
|
|||||||
Reference in New Issue
Block a user