huskies: merge 836
This commit is contained in:
@@ -1,993 +0,0 @@
|
|||||||
//! Read/write helpers for the six LWW-map CRDT collections:
//! `tokens`, `merge_jobs`, `active_agents`, `test_jobs`, `agent_throttle`,
//! and `gateway_projects`.
//!
//! Each collection is backed by a `ListCrdt` with a primary-key field.
//! A secondary index in [`super::state::CrdtState`] provides O(1) lookup by
//! key. The write functions follow the same insert-or-update pattern used by
//! [`super::presence`] for the `nodes` collection.
|
|
||||||
|
|
||||||
#![allow(dead_code)]
|
|
||||||
|
|
||||||
use bft_json_crdt::json_crdt::*;
|
|
||||||
use bft_json_crdt::list_crdt::ListCrdt;
|
|
||||||
use bft_json_crdt::op::{OpId, ROOT_ID};
|
|
||||||
use serde_json::json;
|
|
||||||
|
|
||||||
/// Return the `OpId` of the entry at iter-position `idx`, consistent with
|
|
||||||
/// `ListCrdt::iter()` and the `[]` index operator.
|
|
||||||
///
|
|
||||||
/// The built-in `id_at` counts non-deleted entries including the root sentinel
|
|
||||||
/// (which has `content = None`), causing an off-by-one mismatch with
|
|
||||||
/// `iter().enumerate()`. This helper requires `content.is_some()` so that the
|
|
||||||
/// secondary-index position maps correctly to an OpId.
|
|
||||||
fn list_id_at<T: CrdtNode>(list: &ListCrdt<T>, idx: usize) -> Option<OpId> {
|
|
||||||
let mut i = 0;
|
|
||||||
for op in &list.ops {
|
|
||||||
if !op.is_deleted && op.content.is_some() {
|
|
||||||
if i == idx {
|
|
||||||
return Some(op.id);
|
|
||||||
}
|
|
||||||
i += 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
None
|
|
||||||
}
|
|
||||||
|
|
||||||
use super::state::{
|
|
||||||
apply_and_persist, get_crdt, rebuild_active_agent_index, rebuild_agent_throttle_index,
|
|
||||||
rebuild_gateway_project_index, rebuild_merge_job_index, rebuild_test_job_index,
|
|
||||||
rebuild_token_index,
|
|
||||||
};
|
|
||||||
use super::types::{
|
|
||||||
ActiveAgentCrdt, ActiveAgentView, AgentThrottleCrdt, AgentThrottleView, GatewayProjectCrdt,
|
|
||||||
GatewayProjectView, MergeJobCrdt, MergeJobView, TestJobCrdt, TestJobView, TokenUsageCrdt,
|
|
||||||
TokenUsageView,
|
|
||||||
};
|
|
||||||
|
|
||||||
// ── tokens ───────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
/// Write or update a token-usage entry keyed by `agent_id`.
|
|
||||||
///
|
|
||||||
/// If an entry for `agent_id` already exists it is updated in place;
|
|
||||||
/// otherwise a new entry is inserted.
|
|
||||||
pub fn write_token_usage(
|
|
||||||
agent_id: &str,
|
|
||||||
story_id: &str,
|
|
||||||
input_tokens: f64,
|
|
||||||
output_tokens: f64,
|
|
||||||
timestamp: f64,
|
|
||||||
) {
|
|
||||||
let Some(state_mutex) = get_crdt() else {
|
|
||||||
return;
|
|
||||||
};
|
|
||||||
let Ok(mut state) = state_mutex.lock() else {
|
|
||||||
return;
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Some(&idx) = state.token_index.get(agent_id) {
|
|
||||||
apply_and_persist(&mut state, |s| {
|
|
||||||
s.crdt.doc.tokens[idx].story_id.set(story_id.to_string())
|
|
||||||
});
|
|
||||||
apply_and_persist(&mut state, |s| {
|
|
||||||
s.crdt.doc.tokens[idx].input_tokens.set(input_tokens)
|
|
||||||
});
|
|
||||||
apply_and_persist(&mut state, |s| {
|
|
||||||
s.crdt.doc.tokens[idx].output_tokens.set(output_tokens)
|
|
||||||
});
|
|
||||||
apply_and_persist(&mut state, |s| {
|
|
||||||
s.crdt.doc.tokens[idx].timestamp.set(timestamp)
|
|
||||||
});
|
|
||||||
} else {
|
|
||||||
let entry: JsonValue = json!({
|
|
||||||
"agent_id": agent_id,
|
|
||||||
"story_id": story_id,
|
|
||||||
"input_tokens": input_tokens,
|
|
||||||
"output_tokens": output_tokens,
|
|
||||||
"timestamp": timestamp,
|
|
||||||
})
|
|
||||||
.into();
|
|
||||||
apply_and_persist(&mut state, |s| s.crdt.doc.tokens.insert(ROOT_ID, entry));
|
|
||||||
state.token_index = rebuild_token_index(&state.crdt);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Read all token-usage entries.
|
|
||||||
pub fn read_all_token_usage() -> Option<Vec<TokenUsageView>> {
|
|
||||||
let state_mutex = get_crdt()?;
|
|
||||||
let state = state_mutex.lock().ok()?;
|
|
||||||
let mut out = Vec::new();
|
|
||||||
for entry in state.crdt.doc.tokens.iter() {
|
|
||||||
if let Some(v) = extract_token_view(entry) {
|
|
||||||
out.push(v);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Some(out)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Read a single token-usage entry by `agent_id`.
|
|
||||||
pub fn read_token_usage(agent_id: &str) -> Option<TokenUsageView> {
|
|
||||||
let state_mutex = get_crdt()?;
|
|
||||||
let state = state_mutex.lock().ok()?;
|
|
||||||
let &idx = state.token_index.get(agent_id)?;
|
|
||||||
extract_token_view(&state.crdt.doc.tokens[idx])
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Tombstone a token-usage entry by `agent_id`.
|
|
||||||
///
|
|
||||||
/// Returns `true` if the entry existed and a delete op was issued.
|
|
||||||
pub fn delete_token_usage(agent_id: &str) -> bool {
|
|
||||||
let Some(state_mutex) = get_crdt() else {
|
|
||||||
return false;
|
|
||||||
};
|
|
||||||
let Ok(mut state) = state_mutex.lock() else {
|
|
||||||
return false;
|
|
||||||
};
|
|
||||||
let Some(&idx) = state.token_index.get(agent_id) else {
|
|
||||||
return false;
|
|
||||||
};
|
|
||||||
let Some(op_id) = list_id_at(&state.crdt.doc.tokens, idx) else {
|
|
||||||
return false;
|
|
||||||
};
|
|
||||||
apply_and_persist(&mut state, |s| s.crdt.doc.tokens.delete(op_id));
|
|
||||||
state.token_index = rebuild_token_index(&state.crdt);
|
|
||||||
true
|
|
||||||
}
|
|
||||||
|
|
||||||
fn extract_token_view(entry: &TokenUsageCrdt) -> Option<TokenUsageView> {
|
|
||||||
let agent_id = match entry.agent_id.view() {
|
|
||||||
JsonValue::String(s) if !s.is_empty() => s,
|
|
||||||
_ => return None,
|
|
||||||
};
|
|
||||||
let story_id = match entry.story_id.view() {
|
|
||||||
JsonValue::String(s) if !s.is_empty() => Some(s),
|
|
||||||
_ => None,
|
|
||||||
};
|
|
||||||
let input_tokens = match entry.input_tokens.view() {
|
|
||||||
JsonValue::Number(n) => n,
|
|
||||||
_ => 0.0,
|
|
||||||
};
|
|
||||||
let output_tokens = match entry.output_tokens.view() {
|
|
||||||
JsonValue::Number(n) => n,
|
|
||||||
_ => 0.0,
|
|
||||||
};
|
|
||||||
let timestamp = match entry.timestamp.view() {
|
|
||||||
JsonValue::Number(n) => n,
|
|
||||||
_ => 0.0,
|
|
||||||
};
|
|
||||||
Some(TokenUsageView {
|
|
||||||
agent_id,
|
|
||||||
story_id,
|
|
||||||
input_tokens,
|
|
||||||
output_tokens,
|
|
||||||
timestamp,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── merge_jobs ────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
/// Write or update a merge-job entry keyed by `story_id`.
|
|
||||||
pub fn write_merge_job(
|
|
||||||
story_id: &str,
|
|
||||||
status: &str,
|
|
||||||
started_at: f64,
|
|
||||||
finished_at: Option<f64>,
|
|
||||||
error: Option<&str>,
|
|
||||||
) {
|
|
||||||
let Some(state_mutex) = get_crdt() else {
|
|
||||||
return;
|
|
||||||
};
|
|
||||||
let Ok(mut state) = state_mutex.lock() else {
|
|
||||||
return;
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Some(&idx) = state.merge_job_index.get(story_id) {
|
|
||||||
apply_and_persist(&mut state, |s| {
|
|
||||||
s.crdt.doc.merge_jobs[idx].status.set(status.to_string())
|
|
||||||
});
|
|
||||||
apply_and_persist(&mut state, |s| {
|
|
||||||
s.crdt.doc.merge_jobs[idx].started_at.set(started_at)
|
|
||||||
});
|
|
||||||
if let Some(fa) = finished_at {
|
|
||||||
apply_and_persist(&mut state, |s| {
|
|
||||||
s.crdt.doc.merge_jobs[idx].finished_at.set(fa)
|
|
||||||
});
|
|
||||||
}
|
|
||||||
if let Some(e) = error {
|
|
||||||
apply_and_persist(&mut state, |s| {
|
|
||||||
s.crdt.doc.merge_jobs[idx].error.set(e.to_string())
|
|
||||||
});
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
let entry: JsonValue = json!({
|
|
||||||
"story_id": story_id,
|
|
||||||
"status": status,
|
|
||||||
"started_at": started_at,
|
|
||||||
"finished_at": finished_at.unwrap_or(0.0),
|
|
||||||
"error": error.unwrap_or(""),
|
|
||||||
})
|
|
||||||
.into();
|
|
||||||
apply_and_persist(&mut state, |s| s.crdt.doc.merge_jobs.insert(ROOT_ID, entry));
|
|
||||||
state.merge_job_index = rebuild_merge_job_index(&state.crdt);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Read all merge-job entries.
|
|
||||||
pub fn read_all_merge_jobs() -> Option<Vec<MergeJobView>> {
|
|
||||||
let state_mutex = get_crdt()?;
|
|
||||||
let state = state_mutex.lock().ok()?;
|
|
||||||
let mut out = Vec::new();
|
|
||||||
for entry in state.crdt.doc.merge_jobs.iter() {
|
|
||||||
if let Some(v) = extract_merge_job_view(entry) {
|
|
||||||
out.push(v);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Some(out)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Read a single merge-job entry by `story_id`.
|
|
||||||
pub fn read_merge_job(story_id: &str) -> Option<MergeJobView> {
|
|
||||||
let state_mutex = get_crdt()?;
|
|
||||||
let state = state_mutex.lock().ok()?;
|
|
||||||
let &idx = state.merge_job_index.get(story_id)?;
|
|
||||||
extract_merge_job_view(&state.crdt.doc.merge_jobs[idx])
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Tombstone a merge-job entry by `story_id`.
|
|
||||||
pub fn delete_merge_job(story_id: &str) -> bool {
|
|
||||||
let Some(state_mutex) = get_crdt() else {
|
|
||||||
return false;
|
|
||||||
};
|
|
||||||
let Ok(mut state) = state_mutex.lock() else {
|
|
||||||
return false;
|
|
||||||
};
|
|
||||||
let Some(&idx) = state.merge_job_index.get(story_id) else {
|
|
||||||
return false;
|
|
||||||
};
|
|
||||||
let Some(op_id) = list_id_at(&state.crdt.doc.merge_jobs, idx) else {
|
|
||||||
return false;
|
|
||||||
};
|
|
||||||
apply_and_persist(&mut state, |s| s.crdt.doc.merge_jobs.delete(op_id));
|
|
||||||
state.merge_job_index = rebuild_merge_job_index(&state.crdt);
|
|
||||||
true
|
|
||||||
}
|
|
||||||
|
|
||||||
fn extract_merge_job_view(entry: &MergeJobCrdt) -> Option<MergeJobView> {
|
|
||||||
let story_id = match entry.story_id.view() {
|
|
||||||
JsonValue::String(s) if !s.is_empty() => s,
|
|
||||||
_ => return None,
|
|
||||||
};
|
|
||||||
let status = match entry.status.view() {
|
|
||||||
JsonValue::String(s) => s,
|
|
||||||
_ => String::new(),
|
|
||||||
};
|
|
||||||
let started_at = match entry.started_at.view() {
|
|
||||||
JsonValue::Number(n) => n,
|
|
||||||
_ => 0.0,
|
|
||||||
};
|
|
||||||
let finished_at = match entry.finished_at.view() {
|
|
||||||
JsonValue::Number(n) if n > 0.0 => Some(n),
|
|
||||||
_ => None,
|
|
||||||
};
|
|
||||||
let error = match entry.error.view() {
|
|
||||||
JsonValue::String(s) if !s.is_empty() => Some(s),
|
|
||||||
_ => None,
|
|
||||||
};
|
|
||||||
Some(MergeJobView {
|
|
||||||
story_id,
|
|
||||||
status,
|
|
||||||
started_at,
|
|
||||||
finished_at,
|
|
||||||
error,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── active_agents ─────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
/// Write or update an active-agent entry keyed by `agent_id`.
|
|
||||||
pub fn write_active_agent(agent_id: &str, story_id: &str, node_id: &str, started_at: f64) {
|
|
||||||
let Some(state_mutex) = get_crdt() else {
|
|
||||||
return;
|
|
||||||
};
|
|
||||||
let Ok(mut state) = state_mutex.lock() else {
|
|
||||||
return;
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Some(&idx) = state.active_agent_index.get(agent_id) {
|
|
||||||
apply_and_persist(&mut state, |s| {
|
|
||||||
s.crdt.doc.active_agents[idx]
|
|
||||||
.story_id
|
|
||||||
.set(story_id.to_string())
|
|
||||||
});
|
|
||||||
apply_and_persist(&mut state, |s| {
|
|
||||||
s.crdt.doc.active_agents[idx]
|
|
||||||
.node_id
|
|
||||||
.set(node_id.to_string())
|
|
||||||
});
|
|
||||||
apply_and_persist(&mut state, |s| {
|
|
||||||
s.crdt.doc.active_agents[idx].started_at.set(started_at)
|
|
||||||
});
|
|
||||||
} else {
|
|
||||||
let entry: JsonValue = json!({
|
|
||||||
"agent_id": agent_id,
|
|
||||||
"story_id": story_id,
|
|
||||||
"node_id": node_id,
|
|
||||||
"started_at": started_at,
|
|
||||||
})
|
|
||||||
.into();
|
|
||||||
apply_and_persist(&mut state, |s| {
|
|
||||||
s.crdt.doc.active_agents.insert(ROOT_ID, entry)
|
|
||||||
});
|
|
||||||
state.active_agent_index = rebuild_active_agent_index(&state.crdt);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Read all active-agent entries.
|
|
||||||
pub fn read_all_active_agents() -> Option<Vec<ActiveAgentView>> {
|
|
||||||
let state_mutex = get_crdt()?;
|
|
||||||
let state = state_mutex.lock().ok()?;
|
|
||||||
let mut out = Vec::new();
|
|
||||||
for entry in state.crdt.doc.active_agents.iter() {
|
|
||||||
if let Some(v) = extract_active_agent_view(entry) {
|
|
||||||
out.push(v);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Some(out)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Read a single active-agent entry by `agent_id`.
|
|
||||||
pub fn read_active_agent(agent_id: &str) -> Option<ActiveAgentView> {
|
|
||||||
let state_mutex = get_crdt()?;
|
|
||||||
let state = state_mutex.lock().ok()?;
|
|
||||||
let &idx = state.active_agent_index.get(agent_id)?;
|
|
||||||
extract_active_agent_view(&state.crdt.doc.active_agents[idx])
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Tombstone an active-agent entry by `agent_id`.
|
|
||||||
pub fn delete_active_agent(agent_id: &str) -> bool {
|
|
||||||
let Some(state_mutex) = get_crdt() else {
|
|
||||||
return false;
|
|
||||||
};
|
|
||||||
let Ok(mut state) = state_mutex.lock() else {
|
|
||||||
return false;
|
|
||||||
};
|
|
||||||
let Some(&idx) = state.active_agent_index.get(agent_id) else {
|
|
||||||
return false;
|
|
||||||
};
|
|
||||||
let Some(op_id) = list_id_at(&state.crdt.doc.active_agents, idx) else {
|
|
||||||
return false;
|
|
||||||
};
|
|
||||||
apply_and_persist(&mut state, |s| s.crdt.doc.active_agents.delete(op_id));
|
|
||||||
state.active_agent_index = rebuild_active_agent_index(&state.crdt);
|
|
||||||
true
|
|
||||||
}
|
|
||||||
|
|
||||||
fn extract_active_agent_view(entry: &ActiveAgentCrdt) -> Option<ActiveAgentView> {
|
|
||||||
let agent_id = match entry.agent_id.view() {
|
|
||||||
JsonValue::String(s) if !s.is_empty() => s,
|
|
||||||
_ => return None,
|
|
||||||
};
|
|
||||||
let story_id = match entry.story_id.view() {
|
|
||||||
JsonValue::String(s) if !s.is_empty() => Some(s),
|
|
||||||
_ => None,
|
|
||||||
};
|
|
||||||
let node_id = match entry.node_id.view() {
|
|
||||||
JsonValue::String(s) if !s.is_empty() => Some(s),
|
|
||||||
_ => None,
|
|
||||||
};
|
|
||||||
let started_at = match entry.started_at.view() {
|
|
||||||
JsonValue::Number(n) => n,
|
|
||||||
_ => 0.0,
|
|
||||||
};
|
|
||||||
Some(ActiveAgentView {
|
|
||||||
agent_id,
|
|
||||||
story_id,
|
|
||||||
node_id,
|
|
||||||
started_at,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── test_jobs ─────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
/// Write or update a test-job entry keyed by `story_id`.
|
|
||||||
pub fn write_test_job(
|
|
||||||
story_id: &str,
|
|
||||||
status: &str,
|
|
||||||
started_at: f64,
|
|
||||||
finished_at: Option<f64>,
|
|
||||||
output: Option<&str>,
|
|
||||||
) {
|
|
||||||
let Some(state_mutex) = get_crdt() else {
|
|
||||||
return;
|
|
||||||
};
|
|
||||||
let Ok(mut state) = state_mutex.lock() else {
|
|
||||||
return;
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Some(&idx) = state.test_job_index.get(story_id) {
|
|
||||||
apply_and_persist(&mut state, |s| {
|
|
||||||
s.crdt.doc.test_jobs[idx].status.set(status.to_string())
|
|
||||||
});
|
|
||||||
apply_and_persist(&mut state, |s| {
|
|
||||||
s.crdt.doc.test_jobs[idx].started_at.set(started_at)
|
|
||||||
});
|
|
||||||
if let Some(fa) = finished_at {
|
|
||||||
apply_and_persist(&mut state, |s| {
|
|
||||||
s.crdt.doc.test_jobs[idx].finished_at.set(fa)
|
|
||||||
});
|
|
||||||
}
|
|
||||||
if let Some(o) = output {
|
|
||||||
apply_and_persist(&mut state, |s| {
|
|
||||||
s.crdt.doc.test_jobs[idx].output.set(o.to_string())
|
|
||||||
});
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
let entry: JsonValue = json!({
|
|
||||||
"story_id": story_id,
|
|
||||||
"status": status,
|
|
||||||
"started_at": started_at,
|
|
||||||
"finished_at": finished_at.unwrap_or(0.0),
|
|
||||||
"output": output.unwrap_or(""),
|
|
||||||
})
|
|
||||||
.into();
|
|
||||||
apply_and_persist(&mut state, |s| s.crdt.doc.test_jobs.insert(ROOT_ID, entry));
|
|
||||||
state.test_job_index = rebuild_test_job_index(&state.crdt);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Read all test-job entries.
|
|
||||||
pub fn read_all_test_jobs() -> Option<Vec<TestJobView>> {
|
|
||||||
let state_mutex = get_crdt()?;
|
|
||||||
let state = state_mutex.lock().ok()?;
|
|
||||||
let mut out = Vec::new();
|
|
||||||
for entry in state.crdt.doc.test_jobs.iter() {
|
|
||||||
if let Some(v) = extract_test_job_view(entry) {
|
|
||||||
out.push(v);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Some(out)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Read a single test-job entry by `story_id`.
|
|
||||||
pub fn read_test_job(story_id: &str) -> Option<TestJobView> {
|
|
||||||
let state_mutex = get_crdt()?;
|
|
||||||
let state = state_mutex.lock().ok()?;
|
|
||||||
let &idx = state.test_job_index.get(story_id)?;
|
|
||||||
extract_test_job_view(&state.crdt.doc.test_jobs[idx])
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Tombstone a test-job entry by `story_id`.
|
|
||||||
pub fn delete_test_job(story_id: &str) -> bool {
|
|
||||||
let Some(state_mutex) = get_crdt() else {
|
|
||||||
return false;
|
|
||||||
};
|
|
||||||
let Ok(mut state) = state_mutex.lock() else {
|
|
||||||
return false;
|
|
||||||
};
|
|
||||||
let Some(&idx) = state.test_job_index.get(story_id) else {
|
|
||||||
return false;
|
|
||||||
};
|
|
||||||
let Some(op_id) = list_id_at(&state.crdt.doc.test_jobs, idx) else {
|
|
||||||
return false;
|
|
||||||
};
|
|
||||||
apply_and_persist(&mut state, |s| s.crdt.doc.test_jobs.delete(op_id));
|
|
||||||
state.test_job_index = rebuild_test_job_index(&state.crdt);
|
|
||||||
true
|
|
||||||
}
|
|
||||||
|
|
||||||
fn extract_test_job_view(entry: &TestJobCrdt) -> Option<TestJobView> {
|
|
||||||
let story_id = match entry.story_id.view() {
|
|
||||||
JsonValue::String(s) if !s.is_empty() => s,
|
|
||||||
_ => return None,
|
|
||||||
};
|
|
||||||
let status = match entry.status.view() {
|
|
||||||
JsonValue::String(s) => s,
|
|
||||||
_ => String::new(),
|
|
||||||
};
|
|
||||||
let started_at = match entry.started_at.view() {
|
|
||||||
JsonValue::Number(n) => n,
|
|
||||||
_ => 0.0,
|
|
||||||
};
|
|
||||||
let finished_at = match entry.finished_at.view() {
|
|
||||||
JsonValue::Number(n) if n > 0.0 => Some(n),
|
|
||||||
_ => None,
|
|
||||||
};
|
|
||||||
let output = match entry.output.view() {
|
|
||||||
JsonValue::String(s) if !s.is_empty() => Some(s),
|
|
||||||
_ => None,
|
|
||||||
};
|
|
||||||
Some(TestJobView {
|
|
||||||
story_id,
|
|
||||||
status,
|
|
||||||
started_at,
|
|
||||||
finished_at,
|
|
||||||
output,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── agent_throttle ────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
/// Write or update an agent-throttle entry keyed by `node_id`.
|
|
||||||
pub fn write_agent_throttle(node_id: &str, window_start: f64, count: f64, limit: f64) {
|
|
||||||
let Some(state_mutex) = get_crdt() else {
|
|
||||||
return;
|
|
||||||
};
|
|
||||||
let Ok(mut state) = state_mutex.lock() else {
|
|
||||||
return;
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Some(&idx) = state.agent_throttle_index.get(node_id) {
|
|
||||||
apply_and_persist(&mut state, |s| {
|
|
||||||
s.crdt.doc.agent_throttle[idx]
|
|
||||||
.window_start
|
|
||||||
.set(window_start)
|
|
||||||
});
|
|
||||||
apply_and_persist(&mut state, |s| {
|
|
||||||
s.crdt.doc.agent_throttle[idx].count.set(count)
|
|
||||||
});
|
|
||||||
apply_and_persist(&mut state, |s| {
|
|
||||||
s.crdt.doc.agent_throttle[idx].limit.set(limit)
|
|
||||||
});
|
|
||||||
} else {
|
|
||||||
let entry: JsonValue = json!({
|
|
||||||
"node_id": node_id,
|
|
||||||
"window_start": window_start,
|
|
||||||
"count": count,
|
|
||||||
"limit": limit,
|
|
||||||
})
|
|
||||||
.into();
|
|
||||||
apply_and_persist(&mut state, |s| {
|
|
||||||
s.crdt.doc.agent_throttle.insert(ROOT_ID, entry)
|
|
||||||
});
|
|
||||||
state.agent_throttle_index = rebuild_agent_throttle_index(&state.crdt);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Read all agent-throttle entries.
|
|
||||||
pub fn read_all_agent_throttles() -> Option<Vec<AgentThrottleView>> {
|
|
||||||
let state_mutex = get_crdt()?;
|
|
||||||
let state = state_mutex.lock().ok()?;
|
|
||||||
let mut out = Vec::new();
|
|
||||||
for entry in state.crdt.doc.agent_throttle.iter() {
|
|
||||||
if let Some(v) = extract_agent_throttle_view(entry) {
|
|
||||||
out.push(v);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Some(out)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Read a single agent-throttle entry by `node_id`.
|
|
||||||
pub fn read_agent_throttle(node_id: &str) -> Option<AgentThrottleView> {
|
|
||||||
let state_mutex = get_crdt()?;
|
|
||||||
let state = state_mutex.lock().ok()?;
|
|
||||||
let &idx = state.agent_throttle_index.get(node_id)?;
|
|
||||||
extract_agent_throttle_view(&state.crdt.doc.agent_throttle[idx])
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Tombstone an agent-throttle entry by `node_id`.
|
|
||||||
pub fn delete_agent_throttle(node_id: &str) -> bool {
|
|
||||||
let Some(state_mutex) = get_crdt() else {
|
|
||||||
return false;
|
|
||||||
};
|
|
||||||
let Ok(mut state) = state_mutex.lock() else {
|
|
||||||
return false;
|
|
||||||
};
|
|
||||||
let Some(&idx) = state.agent_throttle_index.get(node_id) else {
|
|
||||||
return false;
|
|
||||||
};
|
|
||||||
let Some(op_id) = list_id_at(&state.crdt.doc.agent_throttle, idx) else {
|
|
||||||
return false;
|
|
||||||
};
|
|
||||||
apply_and_persist(&mut state, |s| s.crdt.doc.agent_throttle.delete(op_id));
|
|
||||||
state.agent_throttle_index = rebuild_agent_throttle_index(&state.crdt);
|
|
||||||
true
|
|
||||||
}
|
|
||||||
|
|
||||||
fn extract_agent_throttle_view(entry: &AgentThrottleCrdt) -> Option<AgentThrottleView> {
|
|
||||||
let node_id = match entry.node_id.view() {
|
|
||||||
JsonValue::String(s) if !s.is_empty() => s,
|
|
||||||
_ => return None,
|
|
||||||
};
|
|
||||||
let window_start = match entry.window_start.view() {
|
|
||||||
JsonValue::Number(n) => n,
|
|
||||||
_ => 0.0,
|
|
||||||
};
|
|
||||||
let count = match entry.count.view() {
|
|
||||||
JsonValue::Number(n) => n,
|
|
||||||
_ => 0.0,
|
|
||||||
};
|
|
||||||
let limit = match entry.limit.view() {
|
|
||||||
JsonValue::Number(n) => n,
|
|
||||||
_ => 0.0,
|
|
||||||
};
|
|
||||||
Some(AgentThrottleView {
|
|
||||||
node_id,
|
|
||||||
window_start,
|
|
||||||
count,
|
|
||||||
limit,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── gateway_projects ──────────────────────────────────────────────────
|
|
||||||
|
|
||||||
/// Write or update a gateway-project entry keyed by `name`.
|
|
||||||
///
|
|
||||||
/// If an entry for `name` already exists it is updated in place;
|
|
||||||
/// otherwise a new entry is inserted.
|
|
||||||
pub fn write_gateway_project(name: &str, url: &str) {
|
|
||||||
let Some(state_mutex) = get_crdt() else {
|
|
||||||
return;
|
|
||||||
};
|
|
||||||
let Ok(mut state) = state_mutex.lock() else {
|
|
||||||
return;
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Some(&idx) = state.gateway_project_index.get(name) {
|
|
||||||
apply_and_persist(&mut state, |s| {
|
|
||||||
s.crdt.doc.gateway_projects[idx].url.set(url.to_string())
|
|
||||||
});
|
|
||||||
} else {
|
|
||||||
let entry: JsonValue = json!({
|
|
||||||
"name": name,
|
|
||||||
"url": url,
|
|
||||||
})
|
|
||||||
.into();
|
|
||||||
apply_and_persist(&mut state, |s| {
|
|
||||||
s.crdt.doc.gateway_projects.insert(ROOT_ID, entry)
|
|
||||||
});
|
|
||||||
state.gateway_project_index = rebuild_gateway_project_index(&state.crdt);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Read all gateway-project entries.
|
|
||||||
pub fn read_all_gateway_projects() -> Option<Vec<GatewayProjectView>> {
|
|
||||||
let state_mutex = get_crdt()?;
|
|
||||||
let state = state_mutex.lock().ok()?;
|
|
||||||
let mut out = Vec::new();
|
|
||||||
for entry in state.crdt.doc.gateway_projects.iter() {
|
|
||||||
if let Some(v) = extract_gateway_project_view(entry) {
|
|
||||||
out.push(v);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Some(out)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Read a single gateway-project entry by `name`.
|
|
||||||
pub fn read_gateway_project(name: &str) -> Option<GatewayProjectView> {
|
|
||||||
let state_mutex = get_crdt()?;
|
|
||||||
let state = state_mutex.lock().ok()?;
|
|
||||||
let &idx = state.gateway_project_index.get(name)?;
|
|
||||||
extract_gateway_project_view(&state.crdt.doc.gateway_projects[idx])
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Tombstone a gateway-project entry by `name`.
|
|
||||||
///
|
|
||||||
/// Returns `true` if the entry existed and a delete op was issued.
|
|
||||||
pub fn delete_gateway_project(name: &str) -> bool {
|
|
||||||
let Some(state_mutex) = get_crdt() else {
|
|
||||||
return false;
|
|
||||||
};
|
|
||||||
let Ok(mut state) = state_mutex.lock() else {
|
|
||||||
return false;
|
|
||||||
};
|
|
||||||
let Some(&idx) = state.gateway_project_index.get(name) else {
|
|
||||||
return false;
|
|
||||||
};
|
|
||||||
let Some(op_id) = list_id_at(&state.crdt.doc.gateway_projects, idx) else {
|
|
||||||
return false;
|
|
||||||
};
|
|
||||||
apply_and_persist(&mut state, |s| s.crdt.doc.gateway_projects.delete(op_id));
|
|
||||||
state.gateway_project_index = rebuild_gateway_project_index(&state.crdt);
|
|
||||||
true
|
|
||||||
}
|
|
||||||
|
|
||||||
fn extract_gateway_project_view(entry: &GatewayProjectCrdt) -> Option<GatewayProjectView> {
|
|
||||||
let name = match entry.name.view() {
|
|
||||||
JsonValue::String(s) if !s.is_empty() => s,
|
|
||||||
_ => return None,
|
|
||||||
};
|
|
||||||
let url = match entry.url.view() {
|
|
||||||
JsonValue::String(s) => s,
|
|
||||||
_ => String::new(),
|
|
||||||
};
|
|
||||||
Some(GatewayProjectView { name, url })
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── Tests ─────────────────────────────────────────────────────────────
|
|
||||||
//
|
|
||||||
// The `tokens` collection is used as the representative collection for the
|
|
||||||
// required CRDT semantics tests (insert, update, delete-via-tombstone, and
|
|
||||||
// concurrent write semantics).
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use super::super::state::init_for_test;
|
|
||||||
use super::*;
|
|
||||||
use bft_json_crdt::json_crdt::{BaseCrdt, JsonValue, OpState};
|
|
||||||
use bft_json_crdt::keypair::make_keypair;
|
|
||||||
use bft_json_crdt::op::ROOT_ID;
|
|
||||||
|
|
||||||
use super::super::types::PipelineDoc;
|
|
||||||
|
|
||||||
// ── insert ────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn token_insert_is_visible_via_read() {
|
|
||||||
init_for_test();
|
|
||||||
write_token_usage("coder-1:42", "42", 100.0, 200.0, 1_000_000.0);
|
|
||||||
|
|
||||||
let view = read_token_usage("coder-1:42").expect("entry must exist after insert");
|
|
||||||
assert_eq!(view.agent_id, "coder-1:42");
|
|
||||||
assert_eq!(view.story_id.as_deref(), Some("42"));
|
|
||||||
assert!((view.input_tokens - 100.0).abs() < f64::EPSILON);
|
|
||||||
assert!((view.output_tokens - 200.0).abs() < f64::EPSILON);
|
|
||||||
assert!((view.timestamp - 1_000_000.0).abs() < f64::EPSILON);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn token_read_all_returns_inserted_entries() {
|
|
||||||
init_for_test();
|
|
||||||
write_token_usage("coder-a:10", "10", 10.0, 20.0, 1.0);
|
|
||||||
write_token_usage("coder-b:10", "10", 30.0, 40.0, 2.0);
|
|
||||||
|
|
||||||
let all = read_all_token_usage().unwrap_or_default();
|
|
||||||
let ids: Vec<&str> = all.iter().map(|v| v.agent_id.as_str()).collect();
|
|
||||||
assert!(
|
|
||||||
ids.contains(&"coder-a:10"),
|
|
||||||
"coder-a:10 must be in read_all"
|
|
||||||
);
|
|
||||||
assert!(
|
|
||||||
ids.contains(&"coder-b:10"),
|
|
||||||
"coder-b:10 must be in read_all"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── update ────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn token_update_overwrites_fields() {
|
|
||||||
init_for_test();
|
|
||||||
write_token_usage("coder-2:55", "55", 50.0, 60.0, 2_000_000.0);
|
|
||||||
// Update with new token counts.
|
|
||||||
write_token_usage("coder-2:55", "55", 500.0, 600.0, 3_000_000.0);
|
|
||||||
|
|
||||||
let view = read_token_usage("coder-2:55").expect("entry must exist after update");
|
|
||||||
assert!((view.input_tokens - 500.0).abs() < f64::EPSILON);
|
|
||||||
assert!((view.output_tokens - 600.0).abs() < f64::EPSILON);
|
|
||||||
assert!((view.timestamp - 3_000_000.0).abs() < f64::EPSILON);
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── delete-via-tombstone ──────────────────────────────────────────
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn token_delete_removes_entry_from_read() {
|
|
||||||
init_for_test();
|
|
||||||
write_token_usage("coder-3:77", "77", 1.0, 2.0, 9_999.0);
|
|
||||||
assert!(
|
|
||||||
read_token_usage("coder-3:77").is_some(),
|
|
||||||
"entry must exist before delete"
|
|
||||||
);
|
|
||||||
|
|
||||||
let deleted = delete_token_usage("coder-3:77");
|
|
||||||
assert!(deleted, "delete must return true for a known entry");
|
|
||||||
|
|
||||||
assert!(
|
|
||||||
read_token_usage("coder-3:77").is_none(),
|
|
||||||
"entry must be absent after tombstone"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn token_delete_nonexistent_returns_false() {
|
|
||||||
init_for_test();
|
|
||||||
assert!(!delete_token_usage("no-such-agent"));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn token_delete_not_returned_by_read_all() {
|
|
||||||
init_for_test();
|
|
||||||
write_token_usage("coder-4:88", "88", 5.0, 10.0, 1.0);
|
|
||||||
delete_token_usage("coder-4:88");
|
|
||||||
|
|
||||||
let all = read_all_token_usage().unwrap_or_default();
|
|
||||||
assert!(
|
|
||||||
!all.iter().any(|v| v.agent_id == "coder-4:88"),
|
|
||||||
"deleted entry must not appear in read_all"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── concurrent write semantics (LWW convergence) ──────────────────
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn token_concurrent_writes_converge_via_lww() {
|
|
||||||
// Two independent CRDTs simulate two nodes writing concurrently.
|
|
||||||
let kp_a = make_keypair();
|
|
||||||
let kp_b = make_keypair();
|
|
||||||
let mut crdt_a = BaseCrdt::<PipelineDoc>::new(&kp_a);
|
|
||||||
let mut crdt_b = BaseCrdt::<PipelineDoc>::new(&kp_b);
|
|
||||||
|
|
||||||
// Node A inserts a token entry.
|
|
||||||
let entry_a: JsonValue = json!({
|
|
||||||
"agent_id": "coder-x:99",
|
|
||||||
"story_id": "99",
|
|
||||||
"input_tokens": 10.0,
|
|
||||||
"output_tokens": 20.0,
|
|
||||||
"timestamp": 1.0,
|
|
||||||
})
|
|
||||||
.into();
|
|
||||||
let insert_a = crdt_a.doc.tokens.insert(ROOT_ID, entry_a).sign(&kp_a);
|
|
||||||
assert_eq!(crdt_a.apply(insert_a.clone()), OpState::Ok);
|
|
||||||
|
|
||||||
// Node B inserts the same key with different values.
|
|
||||||
let entry_b: JsonValue = json!({
|
|
||||||
"agent_id": "coder-x:99",
|
|
||||||
"story_id": "99",
|
|
||||||
"input_tokens": 999.0,
|
|
||||||
"output_tokens": 888.0,
|
|
||||||
"timestamp": 1.0,
|
|
||||||
})
|
|
||||||
.into();
|
|
||||||
let insert_b = crdt_b.doc.tokens.insert(ROOT_ID, entry_b).sign(&kp_b);
|
|
||||||
assert_eq!(crdt_b.apply(insert_b.clone()), OpState::Ok);
|
|
||||||
|
|
||||||
// Both nodes update input_tokens concurrently.
|
|
||||||
let update_a = crdt_a.doc.tokens[0].input_tokens.set(111.0).sign(&kp_a);
|
|
||||||
let update_b = crdt_b.doc.tokens[0].input_tokens.set(222.0).sign(&kp_b);
|
|
||||||
|
|
||||||
assert_eq!(crdt_a.apply(update_a.clone()), OpState::Ok);
|
|
||||||
assert_eq!(crdt_b.apply(update_b.clone()), OpState::Ok);
|
|
||||||
|
|
||||||
// Cross-apply: A gets B's ops, B gets A's ops.
|
|
||||||
assert_eq!(crdt_a.apply(insert_b), OpState::Ok);
|
|
||||||
assert_eq!(crdt_a.apply(update_b), OpState::Ok);
|
|
||||||
assert_eq!(crdt_b.apply(insert_a), OpState::Ok);
|
|
||||||
assert_eq!(crdt_b.apply(update_a), OpState::Ok);
|
|
||||||
|
|
||||||
// Both CRDTs must converge to the same view — compare field by field.
|
|
||||||
assert_eq!(
|
|
||||||
crdt_a.doc.tokens.view().len(),
|
|
||||||
crdt_b.doc.tokens.view().len(),
|
|
||||||
"both peers must have the same number of token entries"
|
|
||||||
);
|
|
||||||
assert_eq!(
|
|
||||||
crdt_a.doc.tokens[0].input_tokens.view(),
|
|
||||||
crdt_b.doc.tokens[0].input_tokens.view(),
|
|
||||||
"concurrent writes to input_tokens must converge to the same value"
|
|
||||||
);
|
|
||||||
assert_eq!(
|
|
||||||
crdt_a.doc.tokens[0].output_tokens.view(),
|
|
||||||
crdt_b.doc.tokens[0].output_tokens.view(),
|
|
||||||
"concurrent writes must converge for output_tokens"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── smoke tests for the other four collections ────────────────────
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn merge_job_insert_update_delete() {
|
|
||||||
init_for_test();
|
|
||||||
write_merge_job("100", "pending", 1.0, None, None);
|
|
||||||
let v = read_merge_job("100").expect("merge job must exist");
|
|
||||||
assert_eq!(v.status, "pending");
|
|
||||||
|
|
||||||
write_merge_job("100", "done", 1.0, Some(2.0), None);
|
|
||||||
let v2 = read_merge_job("100").expect("merge job must exist after update");
|
|
||||||
assert_eq!(v2.status, "done");
|
|
||||||
assert_eq!(v2.finished_at, Some(2.0));
|
|
||||||
|
|
||||||
assert!(delete_merge_job("100"));
|
|
||||||
assert!(read_merge_job("100").is_none());
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn active_agent_insert_update_delete() {
|
|
||||||
init_for_test();
|
|
||||||
write_active_agent("coder-5", "200", "node-abc", 5.0);
|
|
||||||
let v = read_active_agent("coder-5").expect("active agent must exist");
|
|
||||||
assert_eq!(v.story_id.as_deref(), Some("200"));
|
|
||||||
|
|
||||||
write_active_agent("coder-5", "201", "node-abc", 6.0);
|
|
||||||
let v2 = read_active_agent("coder-5").expect("active agent must exist after update");
|
|
||||||
assert_eq!(v2.story_id.as_deref(), Some("201"));
|
|
||||||
|
|
||||||
assert!(delete_active_agent("coder-5"));
|
|
||||||
assert!(read_active_agent("coder-5").is_none());
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_job_insert_update_delete() {
|
|
||||||
init_for_test();
|
|
||||||
write_test_job("300", "running", 7.0, None, None);
|
|
||||||
let v = read_test_job("300").expect("test job must exist");
|
|
||||||
assert_eq!(v.status, "running");
|
|
||||||
|
|
||||||
write_test_job("300", "pass", 7.0, Some(8.0), Some("all green"));
|
|
||||||
let v2 = read_test_job("300").expect("test job must exist after update");
|
|
||||||
assert_eq!(v2.status, "pass");
|
|
||||||
assert_eq!(v2.output.as_deref(), Some("all green"));
|
|
||||||
|
|
||||||
assert!(delete_test_job("300"));
|
|
||||||
assert!(read_test_job("300").is_none());
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn agent_throttle_insert_update_delete() {
|
|
||||||
init_for_test();
|
|
||||||
write_agent_throttle("node-z", 1000.0, 2.0, 5.0);
|
|
||||||
let v = read_agent_throttle("node-z").expect("throttle must exist");
|
|
||||||
assert!((v.count - 2.0).abs() < f64::EPSILON);
|
|
||||||
assert!((v.limit - 5.0).abs() < f64::EPSILON);
|
|
||||||
|
|
||||||
write_agent_throttle("node-z", 1000.0, 4.0, 5.0);
|
|
||||||
let v2 = read_agent_throttle("node-z").expect("throttle must exist after update");
|
|
||||||
assert!((v2.count - 4.0).abs() < f64::EPSILON);
|
|
||||||
|
|
||||||
assert!(delete_agent_throttle("node-z"));
|
|
||||||
assert!(read_agent_throttle("node-z").is_none());
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── merge_jobs: concurrent-write LWW resolution ───────────────────
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn merge_job_concurrent_writes_converge_via_lww() {
|
|
||||||
// Two independent CRDTs simulate two nodes writing concurrently.
|
|
||||||
let kp_a = make_keypair();
|
|
||||||
let kp_b = make_keypair();
|
|
||||||
let mut crdt_a = BaseCrdt::<PipelineDoc>::new(&kp_a);
|
|
||||||
let mut crdt_b = BaseCrdt::<PipelineDoc>::new(&kp_b);
|
|
||||||
|
|
||||||
// Node A inserts a merge-job entry.
|
|
||||||
let entry_a: JsonValue = json!({
|
|
||||||
"story_id": "500_story_concurrent",
|
|
||||||
"status": "pending",
|
|
||||||
"started_at": 1.0,
|
|
||||||
"finished_at": 0.0,
|
|
||||||
"error": "",
|
|
||||||
})
|
|
||||||
.into();
|
|
||||||
let insert_a = crdt_a.doc.merge_jobs.insert(ROOT_ID, entry_a).sign(&kp_a);
|
|
||||||
assert_eq!(crdt_a.apply(insert_a.clone()), OpState::Ok);
|
|
||||||
|
|
||||||
// Node B inserts the same story_id with a different status concurrently.
|
|
||||||
let entry_b: JsonValue = json!({
|
|
||||||
"story_id": "500_story_concurrent",
|
|
||||||
"status": "running",
|
|
||||||
"started_at": 1.0,
|
|
||||||
"finished_at": 0.0,
|
|
||||||
"error": "",
|
|
||||||
})
|
|
||||||
.into();
|
|
||||||
let insert_b = crdt_b.doc.merge_jobs.insert(ROOT_ID, entry_b).sign(&kp_b);
|
|
||||||
assert_eq!(crdt_b.apply(insert_b.clone()), OpState::Ok);
|
|
||||||
|
|
||||||
// Both nodes concurrently update the status field.
|
|
||||||
let update_a = crdt_a.doc.merge_jobs[0]
|
|
||||||
.status
|
|
||||||
.set("done".to_string())
|
|
||||||
.sign(&kp_a);
|
|
||||||
let update_b = crdt_b.doc.merge_jobs[0]
|
|
||||||
.status
|
|
||||||
.set("failed".to_string())
|
|
||||||
.sign(&kp_b);
|
|
||||||
|
|
||||||
assert_eq!(crdt_a.apply(update_a.clone()), OpState::Ok);
|
|
||||||
assert_eq!(crdt_b.apply(update_b.clone()), OpState::Ok);
|
|
||||||
|
|
||||||
// Cross-apply: A gets B's ops, B gets A's ops.
|
|
||||||
assert_eq!(crdt_a.apply(insert_b), OpState::Ok);
|
|
||||||
assert_eq!(crdt_a.apply(update_b), OpState::Ok);
|
|
||||||
assert_eq!(crdt_b.apply(insert_a), OpState::Ok);
|
|
||||||
assert_eq!(crdt_b.apply(update_a), OpState::Ok);
|
|
||||||
|
|
||||||
// Both CRDTs must converge to the same view.
|
|
||||||
assert_eq!(
|
|
||||||
crdt_a.doc.merge_jobs.view().len(),
|
|
||||||
crdt_b.doc.merge_jobs.view().len(),
|
|
||||||
"both peers must have the same number of merge_job entries"
|
|
||||||
);
|
|
||||||
assert_eq!(
|
|
||||||
crdt_a.doc.merge_jobs[0].status.view(),
|
|
||||||
crdt_b.doc.merge_jobs[0].status.view(),
|
|
||||||
"concurrent writes to status must converge to the same value via LWW"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -0,0 +1,116 @@
|
|||||||
|
//! Read/write helpers for the `active_agents` LWW-map collection.
|
||||||
|
//!
|
||||||
|
//! Active-agent entries are keyed by `agent_id` and track which story and node
|
||||||
|
//! an agent is currently working on, along with when it started.
|
||||||
|
|
||||||
|
use bft_json_crdt::json_crdt::{JsonValue, *};
|
||||||
|
use bft_json_crdt::op::ROOT_ID;
|
||||||
|
use serde_json::json;
|
||||||
|
|
||||||
|
use super::super::state::{apply_and_persist, get_crdt, rebuild_active_agent_index};
|
||||||
|
use super::super::types::{ActiveAgentCrdt, ActiveAgentView};
|
||||||
|
use super::list_id_at;
|
||||||
|
|
||||||
|
/// Write or update an active-agent entry keyed by `agent_id`.
|
||||||
|
pub fn write_active_agent(agent_id: &str, story_id: &str, node_id: &str, started_at: f64) {
|
||||||
|
let Some(state_mutex) = get_crdt() else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
let Ok(mut state) = state_mutex.lock() else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some(&idx) = state.active_agent_index.get(agent_id) {
|
||||||
|
apply_and_persist(&mut state, |s| {
|
||||||
|
s.crdt.doc.active_agents[idx]
|
||||||
|
.story_id
|
||||||
|
.set(story_id.to_string())
|
||||||
|
});
|
||||||
|
apply_and_persist(&mut state, |s| {
|
||||||
|
s.crdt.doc.active_agents[idx]
|
||||||
|
.node_id
|
||||||
|
.set(node_id.to_string())
|
||||||
|
});
|
||||||
|
apply_and_persist(&mut state, |s| {
|
||||||
|
s.crdt.doc.active_agents[idx].started_at.set(started_at)
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
let entry: JsonValue = json!({
|
||||||
|
"agent_id": agent_id,
|
||||||
|
"story_id": story_id,
|
||||||
|
"node_id": node_id,
|
||||||
|
"started_at": started_at,
|
||||||
|
})
|
||||||
|
.into();
|
||||||
|
apply_and_persist(&mut state, |s| {
|
||||||
|
s.crdt.doc.active_agents.insert(ROOT_ID, entry)
|
||||||
|
});
|
||||||
|
state.active_agent_index = rebuild_active_agent_index(&state.crdt);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Read all active-agent entries.
|
||||||
|
pub fn read_all_active_agents() -> Option<Vec<ActiveAgentView>> {
|
||||||
|
let state_mutex = get_crdt()?;
|
||||||
|
let state = state_mutex.lock().ok()?;
|
||||||
|
let mut out = Vec::new();
|
||||||
|
for entry in state.crdt.doc.active_agents.iter() {
|
||||||
|
if let Some(v) = extract_active_agent_view(entry) {
|
||||||
|
out.push(v);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Some(out)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Read a single active-agent entry by `agent_id`.
|
||||||
|
pub fn read_active_agent(agent_id: &str) -> Option<ActiveAgentView> {
|
||||||
|
let state_mutex = get_crdt()?;
|
||||||
|
let state = state_mutex.lock().ok()?;
|
||||||
|
let &idx = state.active_agent_index.get(agent_id)?;
|
||||||
|
extract_active_agent_view(&state.crdt.doc.active_agents[idx])
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Tombstone an active-agent entry by `agent_id`.
|
||||||
|
pub fn delete_active_agent(agent_id: &str) -> bool {
|
||||||
|
let Some(state_mutex) = get_crdt() else {
|
||||||
|
return false;
|
||||||
|
};
|
||||||
|
let Ok(mut state) = state_mutex.lock() else {
|
||||||
|
return false;
|
||||||
|
};
|
||||||
|
let Some(&idx) = state.active_agent_index.get(agent_id) else {
|
||||||
|
return false;
|
||||||
|
};
|
||||||
|
let Some(op_id) = list_id_at(&state.crdt.doc.active_agents, idx) else {
|
||||||
|
return false;
|
||||||
|
};
|
||||||
|
apply_and_persist(&mut state, |s| s.crdt.doc.active_agents.delete(op_id));
|
||||||
|
state.active_agent_index = rebuild_active_agent_index(&state.crdt);
|
||||||
|
true
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Convert a CRDT active-agent entry into its read-only view representation.
|
||||||
|
pub(super) fn extract_active_agent_view(entry: &ActiveAgentCrdt) -> Option<ActiveAgentView> {
|
||||||
|
let agent_id = match entry.agent_id.view() {
|
||||||
|
JsonValue::String(s) if !s.is_empty() => s,
|
||||||
|
_ => return None,
|
||||||
|
};
|
||||||
|
let story_id = match entry.story_id.view() {
|
||||||
|
JsonValue::String(s) if !s.is_empty() => Some(s),
|
||||||
|
_ => None,
|
||||||
|
};
|
||||||
|
let node_id = match entry.node_id.view() {
|
||||||
|
JsonValue::String(s) if !s.is_empty() => Some(s),
|
||||||
|
_ => None,
|
||||||
|
};
|
||||||
|
let started_at = match entry.started_at.view() {
|
||||||
|
JsonValue::Number(n) => n,
|
||||||
|
_ => 0.0,
|
||||||
|
};
|
||||||
|
Some(ActiveAgentView {
|
||||||
|
agent_id,
|
||||||
|
story_id,
|
||||||
|
node_id,
|
||||||
|
started_at,
|
||||||
|
})
|
||||||
|
}
|
||||||
@@ -0,0 +1,114 @@
|
|||||||
|
//! Read/write helpers for the `agent_throttle` LWW-map collection.
|
||||||
|
//!
|
||||||
|
//! Agent-throttle entries are keyed by `node_id` and track per-node sliding-
|
||||||
|
//! window launch counts used to enforce agent concurrency limits.
|
||||||
|
|
||||||
|
use bft_json_crdt::json_crdt::{JsonValue, *};
|
||||||
|
use bft_json_crdt::op::ROOT_ID;
|
||||||
|
use serde_json::json;
|
||||||
|
|
||||||
|
use super::super::state::{apply_and_persist, get_crdt, rebuild_agent_throttle_index};
|
||||||
|
use super::super::types::{AgentThrottleCrdt, AgentThrottleView};
|
||||||
|
use super::list_id_at;
|
||||||
|
|
||||||
|
/// Write or update an agent-throttle entry keyed by `node_id`.
|
||||||
|
pub fn write_agent_throttle(node_id: &str, window_start: f64, count: f64, limit: f64) {
|
||||||
|
let Some(state_mutex) = get_crdt() else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
let Ok(mut state) = state_mutex.lock() else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some(&idx) = state.agent_throttle_index.get(node_id) {
|
||||||
|
apply_and_persist(&mut state, |s| {
|
||||||
|
s.crdt.doc.agent_throttle[idx]
|
||||||
|
.window_start
|
||||||
|
.set(window_start)
|
||||||
|
});
|
||||||
|
apply_and_persist(&mut state, |s| {
|
||||||
|
s.crdt.doc.agent_throttle[idx].count.set(count)
|
||||||
|
});
|
||||||
|
apply_and_persist(&mut state, |s| {
|
||||||
|
s.crdt.doc.agent_throttle[idx].limit.set(limit)
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
let entry: JsonValue = json!({
|
||||||
|
"node_id": node_id,
|
||||||
|
"window_start": window_start,
|
||||||
|
"count": count,
|
||||||
|
"limit": limit,
|
||||||
|
})
|
||||||
|
.into();
|
||||||
|
apply_and_persist(&mut state, |s| {
|
||||||
|
s.crdt.doc.agent_throttle.insert(ROOT_ID, entry)
|
||||||
|
});
|
||||||
|
state.agent_throttle_index = rebuild_agent_throttle_index(&state.crdt);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Read all agent-throttle entries.
|
||||||
|
pub fn read_all_agent_throttles() -> Option<Vec<AgentThrottleView>> {
|
||||||
|
let state_mutex = get_crdt()?;
|
||||||
|
let state = state_mutex.lock().ok()?;
|
||||||
|
let mut out = Vec::new();
|
||||||
|
for entry in state.crdt.doc.agent_throttle.iter() {
|
||||||
|
if let Some(v) = extract_agent_throttle_view(entry) {
|
||||||
|
out.push(v);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Some(out)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Read a single agent-throttle entry by `node_id`.
|
||||||
|
pub fn read_agent_throttle(node_id: &str) -> Option<AgentThrottleView> {
|
||||||
|
let state_mutex = get_crdt()?;
|
||||||
|
let state = state_mutex.lock().ok()?;
|
||||||
|
let &idx = state.agent_throttle_index.get(node_id)?;
|
||||||
|
extract_agent_throttle_view(&state.crdt.doc.agent_throttle[idx])
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Tombstone an agent-throttle entry by `node_id`.
|
||||||
|
pub fn delete_agent_throttle(node_id: &str) -> bool {
|
||||||
|
let Some(state_mutex) = get_crdt() else {
|
||||||
|
return false;
|
||||||
|
};
|
||||||
|
let Ok(mut state) = state_mutex.lock() else {
|
||||||
|
return false;
|
||||||
|
};
|
||||||
|
let Some(&idx) = state.agent_throttle_index.get(node_id) else {
|
||||||
|
return false;
|
||||||
|
};
|
||||||
|
let Some(op_id) = list_id_at(&state.crdt.doc.agent_throttle, idx) else {
|
||||||
|
return false;
|
||||||
|
};
|
||||||
|
apply_and_persist(&mut state, |s| s.crdt.doc.agent_throttle.delete(op_id));
|
||||||
|
state.agent_throttle_index = rebuild_agent_throttle_index(&state.crdt);
|
||||||
|
true
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Convert a CRDT agent-throttle entry into its read-only view representation.
|
||||||
|
pub(super) fn extract_agent_throttle_view(entry: &AgentThrottleCrdt) -> Option<AgentThrottleView> {
|
||||||
|
let node_id = match entry.node_id.view() {
|
||||||
|
JsonValue::String(s) if !s.is_empty() => s,
|
||||||
|
_ => return None,
|
||||||
|
};
|
||||||
|
let window_start = match entry.window_start.view() {
|
||||||
|
JsonValue::Number(n) => n,
|
||||||
|
_ => 0.0,
|
||||||
|
};
|
||||||
|
let count = match entry.count.view() {
|
||||||
|
JsonValue::Number(n) => n,
|
||||||
|
_ => 0.0,
|
||||||
|
};
|
||||||
|
let limit = match entry.limit.view() {
|
||||||
|
JsonValue::Number(n) => n,
|
||||||
|
_ => 0.0,
|
||||||
|
};
|
||||||
|
Some(AgentThrottleView {
|
||||||
|
node_id,
|
||||||
|
window_start,
|
||||||
|
count,
|
||||||
|
limit,
|
||||||
|
})
|
||||||
|
}
|
||||||
@@ -0,0 +1,98 @@
|
|||||||
|
//! Read/write helpers for the `gateway_projects` LWW-map collection.
|
||||||
|
//!
|
||||||
|
//! Gateway-project entries are keyed by `name` and store the URL of each
|
||||||
|
//! project container registered with the gateway.
|
||||||
|
|
||||||
|
use bft_json_crdt::json_crdt::{JsonValue, *};
|
||||||
|
use bft_json_crdt::op::ROOT_ID;
|
||||||
|
use serde_json::json;
|
||||||
|
|
||||||
|
use super::super::state::{apply_and_persist, get_crdt, rebuild_gateway_project_index};
|
||||||
|
use super::super::types::{GatewayProjectCrdt, GatewayProjectView};
|
||||||
|
use super::list_id_at;
|
||||||
|
|
||||||
|
/// Write or update a gateway-project entry keyed by `name`.
|
||||||
|
///
|
||||||
|
/// If an entry for `name` already exists it is updated in place;
|
||||||
|
/// otherwise a new entry is inserted.
|
||||||
|
pub fn write_gateway_project(name: &str, url: &str) {
|
||||||
|
let Some(state_mutex) = get_crdt() else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
let Ok(mut state) = state_mutex.lock() else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some(&idx) = state.gateway_project_index.get(name) {
|
||||||
|
apply_and_persist(&mut state, |s| {
|
||||||
|
s.crdt.doc.gateway_projects[idx].url.set(url.to_string())
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
let entry: JsonValue = json!({
|
||||||
|
"name": name,
|
||||||
|
"url": url,
|
||||||
|
})
|
||||||
|
.into();
|
||||||
|
apply_and_persist(&mut state, |s| {
|
||||||
|
s.crdt.doc.gateway_projects.insert(ROOT_ID, entry)
|
||||||
|
});
|
||||||
|
state.gateway_project_index = rebuild_gateway_project_index(&state.crdt);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Read all gateway-project entries.
|
||||||
|
pub fn read_all_gateway_projects() -> Option<Vec<GatewayProjectView>> {
|
||||||
|
let state_mutex = get_crdt()?;
|
||||||
|
let state = state_mutex.lock().ok()?;
|
||||||
|
let mut out = Vec::new();
|
||||||
|
for entry in state.crdt.doc.gateway_projects.iter() {
|
||||||
|
if let Some(v) = extract_gateway_project_view(entry) {
|
||||||
|
out.push(v);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Some(out)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Read a single gateway-project entry by `name`.
|
||||||
|
pub fn read_gateway_project(name: &str) -> Option<GatewayProjectView> {
|
||||||
|
let state_mutex = get_crdt()?;
|
||||||
|
let state = state_mutex.lock().ok()?;
|
||||||
|
let &idx = state.gateway_project_index.get(name)?;
|
||||||
|
extract_gateway_project_view(&state.crdt.doc.gateway_projects[idx])
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Tombstone a gateway-project entry by `name`.
|
||||||
|
///
|
||||||
|
/// Returns `true` if the entry existed and a delete op was issued.
|
||||||
|
pub fn delete_gateway_project(name: &str) -> bool {
|
||||||
|
let Some(state_mutex) = get_crdt() else {
|
||||||
|
return false;
|
||||||
|
};
|
||||||
|
let Ok(mut state) = state_mutex.lock() else {
|
||||||
|
return false;
|
||||||
|
};
|
||||||
|
let Some(&idx) = state.gateway_project_index.get(name) else {
|
||||||
|
return false;
|
||||||
|
};
|
||||||
|
let Some(op_id) = list_id_at(&state.crdt.doc.gateway_projects, idx) else {
|
||||||
|
return false;
|
||||||
|
};
|
||||||
|
apply_and_persist(&mut state, |s| s.crdt.doc.gateway_projects.delete(op_id));
|
||||||
|
state.gateway_project_index = rebuild_gateway_project_index(&state.crdt);
|
||||||
|
true
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Convert a CRDT gateway-project entry into its read-only view representation.
|
||||||
|
pub(super) fn extract_gateway_project_view(
|
||||||
|
entry: &GatewayProjectCrdt,
|
||||||
|
) -> Option<GatewayProjectView> {
|
||||||
|
let name = match entry.name.view() {
|
||||||
|
JsonValue::String(s) if !s.is_empty() => s,
|
||||||
|
_ => return None,
|
||||||
|
};
|
||||||
|
let url = match entry.url.view() {
|
||||||
|
JsonValue::String(s) => s,
|
||||||
|
_ => String::new(),
|
||||||
|
};
|
||||||
|
Some(GatewayProjectView { name, url })
|
||||||
|
}
|
||||||
@@ -0,0 +1,129 @@
|
|||||||
|
//! Read/write helpers for the `merge_jobs` LWW-map collection.
|
||||||
|
//!
|
||||||
|
//! Merge-job entries are keyed by `story_id` and track the status, timing,
|
||||||
|
//! and optional error for each story merge operation.
|
||||||
|
|
||||||
|
use bft_json_crdt::json_crdt::{JsonValue, *};
|
||||||
|
use bft_json_crdt::op::ROOT_ID;
|
||||||
|
use serde_json::json;
|
||||||
|
|
||||||
|
use super::super::state::{apply_and_persist, get_crdt, rebuild_merge_job_index};
|
||||||
|
use super::super::types::{MergeJobCrdt, MergeJobView};
|
||||||
|
use super::list_id_at;
|
||||||
|
|
||||||
|
/// Write or update a merge-job entry keyed by `story_id`.
|
||||||
|
pub fn write_merge_job(
|
||||||
|
story_id: &str,
|
||||||
|
status: &str,
|
||||||
|
started_at: f64,
|
||||||
|
finished_at: Option<f64>,
|
||||||
|
error: Option<&str>,
|
||||||
|
) {
|
||||||
|
let Some(state_mutex) = get_crdt() else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
let Ok(mut state) = state_mutex.lock() else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some(&idx) = state.merge_job_index.get(story_id) {
|
||||||
|
apply_and_persist(&mut state, |s| {
|
||||||
|
s.crdt.doc.merge_jobs[idx].status.set(status.to_string())
|
||||||
|
});
|
||||||
|
apply_and_persist(&mut state, |s| {
|
||||||
|
s.crdt.doc.merge_jobs[idx].started_at.set(started_at)
|
||||||
|
});
|
||||||
|
if let Some(fa) = finished_at {
|
||||||
|
apply_and_persist(&mut state, |s| {
|
||||||
|
s.crdt.doc.merge_jobs[idx].finished_at.set(fa)
|
||||||
|
});
|
||||||
|
}
|
||||||
|
if let Some(e) = error {
|
||||||
|
apply_and_persist(&mut state, |s| {
|
||||||
|
s.crdt.doc.merge_jobs[idx].error.set(e.to_string())
|
||||||
|
});
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
let entry: JsonValue = json!({
|
||||||
|
"story_id": story_id,
|
||||||
|
"status": status,
|
||||||
|
"started_at": started_at,
|
||||||
|
"finished_at": finished_at.unwrap_or(0.0),
|
||||||
|
"error": error.unwrap_or(""),
|
||||||
|
})
|
||||||
|
.into();
|
||||||
|
apply_and_persist(&mut state, |s| s.crdt.doc.merge_jobs.insert(ROOT_ID, entry));
|
||||||
|
state.merge_job_index = rebuild_merge_job_index(&state.crdt);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Read all merge-job entries.
|
||||||
|
pub fn read_all_merge_jobs() -> Option<Vec<MergeJobView>> {
|
||||||
|
let state_mutex = get_crdt()?;
|
||||||
|
let state = state_mutex.lock().ok()?;
|
||||||
|
let mut out = Vec::new();
|
||||||
|
for entry in state.crdt.doc.merge_jobs.iter() {
|
||||||
|
if let Some(v) = extract_merge_job_view(entry) {
|
||||||
|
out.push(v);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Some(out)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Read a single merge-job entry by `story_id`.
|
||||||
|
pub fn read_merge_job(story_id: &str) -> Option<MergeJobView> {
|
||||||
|
let state_mutex = get_crdt()?;
|
||||||
|
let state = state_mutex.lock().ok()?;
|
||||||
|
let &idx = state.merge_job_index.get(story_id)?;
|
||||||
|
extract_merge_job_view(&state.crdt.doc.merge_jobs[idx])
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Tombstone a merge-job entry by `story_id`.
|
||||||
|
pub fn delete_merge_job(story_id: &str) -> bool {
|
||||||
|
let Some(state_mutex) = get_crdt() else {
|
||||||
|
return false;
|
||||||
|
};
|
||||||
|
let Ok(mut state) = state_mutex.lock() else {
|
||||||
|
return false;
|
||||||
|
};
|
||||||
|
let Some(&idx) = state.merge_job_index.get(story_id) else {
|
||||||
|
return false;
|
||||||
|
};
|
||||||
|
let Some(op_id) = list_id_at(&state.crdt.doc.merge_jobs, idx) else {
|
||||||
|
return false;
|
||||||
|
};
|
||||||
|
apply_and_persist(&mut state, |s| s.crdt.doc.merge_jobs.delete(op_id));
|
||||||
|
state.merge_job_index = rebuild_merge_job_index(&state.crdt);
|
||||||
|
true
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Convert a CRDT merge-job entry into its read-only view representation.
|
||||||
|
pub(super) fn extract_merge_job_view(entry: &MergeJobCrdt) -> Option<MergeJobView> {
|
||||||
|
let story_id = match entry.story_id.view() {
|
||||||
|
JsonValue::String(s) if !s.is_empty() => s,
|
||||||
|
_ => return None,
|
||||||
|
};
|
||||||
|
let status = match entry.status.view() {
|
||||||
|
JsonValue::String(s) => s,
|
||||||
|
_ => String::new(),
|
||||||
|
};
|
||||||
|
let started_at = match entry.started_at.view() {
|
||||||
|
JsonValue::Number(n) => n,
|
||||||
|
_ => 0.0,
|
||||||
|
};
|
||||||
|
let finished_at = match entry.finished_at.view() {
|
||||||
|
JsonValue::Number(n) if n > 0.0 => Some(n),
|
||||||
|
_ => None,
|
||||||
|
};
|
||||||
|
let error = match entry.error.view() {
|
||||||
|
JsonValue::String(s) if !s.is_empty() => Some(s),
|
||||||
|
_ => None,
|
||||||
|
};
|
||||||
|
Some(MergeJobView {
|
||||||
|
story_id,
|
||||||
|
status,
|
||||||
|
started_at,
|
||||||
|
finished_at,
|
||||||
|
error,
|
||||||
|
})
|
||||||
|
}
|
||||||
@@ -0,0 +1,56 @@
|
|||||||
|
//! Read/write helpers for the five LWW-map CRDT collections:
|
||||||
|
//! `tokens`, `merge_jobs`, `active_agents`, `test_jobs`, and `agent_throttle`.
|
||||||
|
//!
|
||||||
|
//! Each collection is backed by a `ListCrdt` with a primary-key field.
|
||||||
|
//! A secondary index in [`super::state::CrdtState`] provides O(1) lookup by
|
||||||
|
//! key. The write functions follow the same insert-or-update pattern used by
|
||||||
|
//! [`super::presence`] for the `nodes` collection.
|
||||||
|
|
||||||
|
#![allow(dead_code)]
|
||||||
|
|
||||||
|
use bft_json_crdt::json_crdt::CrdtNode;
|
||||||
|
use bft_json_crdt::list_crdt::ListCrdt;
|
||||||
|
use bft_json_crdt::op::OpId;
|
||||||
|
|
||||||
|
mod active_agents;
|
||||||
|
mod agent_throttle;
|
||||||
|
mod gateway_projects;
|
||||||
|
mod merge_jobs;
|
||||||
|
mod test_jobs;
|
||||||
|
mod tokens;
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests;
|
||||||
|
|
||||||
|
pub use active_agents::{
|
||||||
|
delete_active_agent, read_active_agent, read_all_active_agents, write_active_agent,
|
||||||
|
};
|
||||||
|
pub use agent_throttle::{
|
||||||
|
delete_agent_throttle, read_agent_throttle, read_all_agent_throttles, write_agent_throttle,
|
||||||
|
};
|
||||||
|
pub use gateway_projects::{
|
||||||
|
delete_gateway_project, read_all_gateway_projects, read_gateway_project, write_gateway_project,
|
||||||
|
};
|
||||||
|
pub use merge_jobs::{delete_merge_job, read_all_merge_jobs, read_merge_job, write_merge_job};
|
||||||
|
pub use test_jobs::{delete_test_job, read_all_test_jobs, read_test_job, write_test_job};
|
||||||
|
pub use tokens::{delete_token_usage, read_all_token_usage, read_token_usage, write_token_usage};
|
||||||
|
|
||||||
|
/// Return the `OpId` of the entry at iter-position `idx`, consistent with
|
||||||
|
/// `ListCrdt::iter()` and the `[]` index operator.
|
||||||
|
///
|
||||||
|
/// The built-in `id_at` counts non-deleted entries including the root sentinel
|
||||||
|
/// (which has `content = None`), causing an off-by-one mismatch with
|
||||||
|
/// `iter().enumerate()`. This helper requires `content.is_some()` so that the
|
||||||
|
/// secondary-index position maps correctly to an OpId.
|
||||||
|
pub(super) fn list_id_at<T: CrdtNode>(list: &ListCrdt<T>, idx: usize) -> Option<OpId> {
|
||||||
|
let mut i = 0;
|
||||||
|
for op in &list.ops {
|
||||||
|
if !op.is_deleted && op.content.is_some() {
|
||||||
|
if i == idx {
|
||||||
|
return Some(op.id);
|
||||||
|
}
|
||||||
|
i += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None
|
||||||
|
}
|
||||||
@@ -0,0 +1,129 @@
|
|||||||
|
//! Read/write helpers for the `test_jobs` LWW-map collection.
|
||||||
|
//!
|
||||||
|
//! Test-job entries are keyed by `story_id` and track the status, timing,
|
||||||
|
//! and captured output for each test run.
|
||||||
|
|
||||||
|
use bft_json_crdt::json_crdt::{JsonValue, *};
|
||||||
|
use bft_json_crdt::op::ROOT_ID;
|
||||||
|
use serde_json::json;
|
||||||
|
|
||||||
|
use super::super::state::{apply_and_persist, get_crdt, rebuild_test_job_index};
|
||||||
|
use super::super::types::{TestJobCrdt, TestJobView};
|
||||||
|
use super::list_id_at;
|
||||||
|
|
||||||
|
/// Write or update a test-job entry keyed by `story_id`.
|
||||||
|
pub fn write_test_job(
|
||||||
|
story_id: &str,
|
||||||
|
status: &str,
|
||||||
|
started_at: f64,
|
||||||
|
finished_at: Option<f64>,
|
||||||
|
output: Option<&str>,
|
||||||
|
) {
|
||||||
|
let Some(state_mutex) = get_crdt() else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
let Ok(mut state) = state_mutex.lock() else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some(&idx) = state.test_job_index.get(story_id) {
|
||||||
|
apply_and_persist(&mut state, |s| {
|
||||||
|
s.crdt.doc.test_jobs[idx].status.set(status.to_string())
|
||||||
|
});
|
||||||
|
apply_and_persist(&mut state, |s| {
|
||||||
|
s.crdt.doc.test_jobs[idx].started_at.set(started_at)
|
||||||
|
});
|
||||||
|
if let Some(fa) = finished_at {
|
||||||
|
apply_and_persist(&mut state, |s| {
|
||||||
|
s.crdt.doc.test_jobs[idx].finished_at.set(fa)
|
||||||
|
});
|
||||||
|
}
|
||||||
|
if let Some(o) = output {
|
||||||
|
apply_and_persist(&mut state, |s| {
|
||||||
|
s.crdt.doc.test_jobs[idx].output.set(o.to_string())
|
||||||
|
});
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
let entry: JsonValue = json!({
|
||||||
|
"story_id": story_id,
|
||||||
|
"status": status,
|
||||||
|
"started_at": started_at,
|
||||||
|
"finished_at": finished_at.unwrap_or(0.0),
|
||||||
|
"output": output.unwrap_or(""),
|
||||||
|
})
|
||||||
|
.into();
|
||||||
|
apply_and_persist(&mut state, |s| s.crdt.doc.test_jobs.insert(ROOT_ID, entry));
|
||||||
|
state.test_job_index = rebuild_test_job_index(&state.crdt);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Read all test-job entries.
|
||||||
|
pub fn read_all_test_jobs() -> Option<Vec<TestJobView>> {
|
||||||
|
let state_mutex = get_crdt()?;
|
||||||
|
let state = state_mutex.lock().ok()?;
|
||||||
|
let mut out = Vec::new();
|
||||||
|
for entry in state.crdt.doc.test_jobs.iter() {
|
||||||
|
if let Some(v) = extract_test_job_view(entry) {
|
||||||
|
out.push(v);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Some(out)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Read a single test-job entry by `story_id`.
|
||||||
|
pub fn read_test_job(story_id: &str) -> Option<TestJobView> {
|
||||||
|
let state_mutex = get_crdt()?;
|
||||||
|
let state = state_mutex.lock().ok()?;
|
||||||
|
let &idx = state.test_job_index.get(story_id)?;
|
||||||
|
extract_test_job_view(&state.crdt.doc.test_jobs[idx])
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Tombstone a test-job entry by `story_id`.
|
||||||
|
pub fn delete_test_job(story_id: &str) -> bool {
|
||||||
|
let Some(state_mutex) = get_crdt() else {
|
||||||
|
return false;
|
||||||
|
};
|
||||||
|
let Ok(mut state) = state_mutex.lock() else {
|
||||||
|
return false;
|
||||||
|
};
|
||||||
|
let Some(&idx) = state.test_job_index.get(story_id) else {
|
||||||
|
return false;
|
||||||
|
};
|
||||||
|
let Some(op_id) = list_id_at(&state.crdt.doc.test_jobs, idx) else {
|
||||||
|
return false;
|
||||||
|
};
|
||||||
|
apply_and_persist(&mut state, |s| s.crdt.doc.test_jobs.delete(op_id));
|
||||||
|
state.test_job_index = rebuild_test_job_index(&state.crdt);
|
||||||
|
true
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Convert a CRDT test-job entry into its read-only view representation.
|
||||||
|
pub(super) fn extract_test_job_view(entry: &TestJobCrdt) -> Option<TestJobView> {
|
||||||
|
let story_id = match entry.story_id.view() {
|
||||||
|
JsonValue::String(s) if !s.is_empty() => s,
|
||||||
|
_ => return None,
|
||||||
|
};
|
||||||
|
let status = match entry.status.view() {
|
||||||
|
JsonValue::String(s) => s,
|
||||||
|
_ => String::new(),
|
||||||
|
};
|
||||||
|
let started_at = match entry.started_at.view() {
|
||||||
|
JsonValue::Number(n) => n,
|
||||||
|
_ => 0.0,
|
||||||
|
};
|
||||||
|
let finished_at = match entry.finished_at.view() {
|
||||||
|
JsonValue::Number(n) if n > 0.0 => Some(n),
|
||||||
|
_ => None,
|
||||||
|
};
|
||||||
|
let output = match entry.output.view() {
|
||||||
|
JsonValue::String(s) if !s.is_empty() => Some(s),
|
||||||
|
_ => None,
|
||||||
|
};
|
||||||
|
Some(TestJobView {
|
||||||
|
story_id,
|
||||||
|
status,
|
||||||
|
started_at,
|
||||||
|
finished_at,
|
||||||
|
output,
|
||||||
|
})
|
||||||
|
}
|
||||||
@@ -0,0 +1,297 @@
|
|||||||
|
//! Integration tests for the LWW-map CRDT collections.
|
||||||
|
//!
|
||||||
|
//! The `tokens` collection is used as the representative collection for the
|
||||||
|
//! required CRDT semantics tests (insert, update, delete-via-tombstone, and
|
||||||
|
//! concurrent write semantics). The remaining four collections have smoke tests.
|
||||||
|
|
||||||
|
use super::super::state::init_for_test;
|
||||||
|
use super::*;
|
||||||
|
use bft_json_crdt::json_crdt::{BaseCrdt, JsonValue, OpState};
|
||||||
|
use bft_json_crdt::keypair::make_keypair;
|
||||||
|
use bft_json_crdt::op::ROOT_ID;
|
||||||
|
use serde_json::json;
|
||||||
|
|
||||||
|
use super::super::types::PipelineDoc;
|
||||||
|
|
||||||
|
// ── insert ────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn token_insert_is_visible_via_read() {
|
||||||
|
init_for_test();
|
||||||
|
write_token_usage("coder-1:42", "42", 100.0, 200.0, 1_000_000.0);
|
||||||
|
|
||||||
|
let view = read_token_usage("coder-1:42").expect("entry must exist after insert");
|
||||||
|
assert_eq!(view.agent_id, "coder-1:42");
|
||||||
|
assert_eq!(view.story_id.as_deref(), Some("42"));
|
||||||
|
assert!((view.input_tokens - 100.0).abs() < f64::EPSILON);
|
||||||
|
assert!((view.output_tokens - 200.0).abs() < f64::EPSILON);
|
||||||
|
assert!((view.timestamp - 1_000_000.0).abs() < f64::EPSILON);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn token_read_all_returns_inserted_entries() {
|
||||||
|
init_for_test();
|
||||||
|
write_token_usage("coder-a:10", "10", 10.0, 20.0, 1.0);
|
||||||
|
write_token_usage("coder-b:10", "10", 30.0, 40.0, 2.0);
|
||||||
|
|
||||||
|
let all = read_all_token_usage().unwrap_or_default();
|
||||||
|
let ids: Vec<&str> = all.iter().map(|v| v.agent_id.as_str()).collect();
|
||||||
|
assert!(
|
||||||
|
ids.contains(&"coder-a:10"),
|
||||||
|
"coder-a:10 must be in read_all"
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
ids.contains(&"coder-b:10"),
|
||||||
|
"coder-b:10 must be in read_all"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── update ────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn token_update_overwrites_fields() {
|
||||||
|
init_for_test();
|
||||||
|
write_token_usage("coder-2:55", "55", 50.0, 60.0, 2_000_000.0);
|
||||||
|
// Update with new token counts.
|
||||||
|
write_token_usage("coder-2:55", "55", 500.0, 600.0, 3_000_000.0);
|
||||||
|
|
||||||
|
let view = read_token_usage("coder-2:55").expect("entry must exist after update");
|
||||||
|
assert!((view.input_tokens - 500.0).abs() < f64::EPSILON);
|
||||||
|
assert!((view.output_tokens - 600.0).abs() < f64::EPSILON);
|
||||||
|
assert!((view.timestamp - 3_000_000.0).abs() < f64::EPSILON);
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── delete-via-tombstone ──────────────────────────────────────────
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn token_delete_removes_entry_from_read() {
|
||||||
|
init_for_test();
|
||||||
|
write_token_usage("coder-3:77", "77", 1.0, 2.0, 9_999.0);
|
||||||
|
assert!(
|
||||||
|
read_token_usage("coder-3:77").is_some(),
|
||||||
|
"entry must exist before delete"
|
||||||
|
);
|
||||||
|
|
||||||
|
let deleted = delete_token_usage("coder-3:77");
|
||||||
|
assert!(deleted, "delete must return true for a known entry");
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
read_token_usage("coder-3:77").is_none(),
|
||||||
|
"entry must be absent after tombstone"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn token_delete_nonexistent_returns_false() {
|
||||||
|
init_for_test();
|
||||||
|
assert!(!delete_token_usage("no-such-agent"));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn token_delete_not_returned_by_read_all() {
|
||||||
|
init_for_test();
|
||||||
|
write_token_usage("coder-4:88", "88", 5.0, 10.0, 1.0);
|
||||||
|
delete_token_usage("coder-4:88");
|
||||||
|
|
||||||
|
let all = read_all_token_usage().unwrap_or_default();
|
||||||
|
assert!(
|
||||||
|
!all.iter().any(|v| v.agent_id == "coder-4:88"),
|
||||||
|
"deleted entry must not appear in read_all"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── concurrent write semantics (LWW convergence) ──────────────────
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn token_concurrent_writes_converge_via_lww() {
|
||||||
|
// Two independent CRDTs simulate two nodes writing concurrently.
|
||||||
|
let kp_a = make_keypair();
|
||||||
|
let kp_b = make_keypair();
|
||||||
|
let mut crdt_a = BaseCrdt::<PipelineDoc>::new(&kp_a);
|
||||||
|
let mut crdt_b = BaseCrdt::<PipelineDoc>::new(&kp_b);
|
||||||
|
|
||||||
|
// Node A inserts a token entry.
|
||||||
|
let entry_a: JsonValue = json!({
|
||||||
|
"agent_id": "coder-x:99",
|
||||||
|
"story_id": "99",
|
||||||
|
"input_tokens": 10.0,
|
||||||
|
"output_tokens": 20.0,
|
||||||
|
"timestamp": 1.0,
|
||||||
|
})
|
||||||
|
.into();
|
||||||
|
let insert_a = crdt_a.doc.tokens.insert(ROOT_ID, entry_a).sign(&kp_a);
|
||||||
|
assert_eq!(crdt_a.apply(insert_a.clone()), OpState::Ok);
|
||||||
|
|
||||||
|
// Node B inserts the same key with different values.
|
||||||
|
let entry_b: JsonValue = json!({
|
||||||
|
"agent_id": "coder-x:99",
|
||||||
|
"story_id": "99",
|
||||||
|
"input_tokens": 999.0,
|
||||||
|
"output_tokens": 888.0,
|
||||||
|
"timestamp": 1.0,
|
||||||
|
})
|
||||||
|
.into();
|
||||||
|
let insert_b = crdt_b.doc.tokens.insert(ROOT_ID, entry_b).sign(&kp_b);
|
||||||
|
assert_eq!(crdt_b.apply(insert_b.clone()), OpState::Ok);
|
||||||
|
|
||||||
|
// Both nodes update input_tokens concurrently.
|
||||||
|
let update_a = crdt_a.doc.tokens[0].input_tokens.set(111.0).sign(&kp_a);
|
||||||
|
let update_b = crdt_b.doc.tokens[0].input_tokens.set(222.0).sign(&kp_b);
|
||||||
|
|
||||||
|
assert_eq!(crdt_a.apply(update_a.clone()), OpState::Ok);
|
||||||
|
assert_eq!(crdt_b.apply(update_b.clone()), OpState::Ok);
|
||||||
|
|
||||||
|
// Cross-apply: A gets B's ops, B gets A's ops.
|
||||||
|
assert_eq!(crdt_a.apply(insert_b), OpState::Ok);
|
||||||
|
assert_eq!(crdt_a.apply(update_b), OpState::Ok);
|
||||||
|
assert_eq!(crdt_b.apply(insert_a), OpState::Ok);
|
||||||
|
assert_eq!(crdt_b.apply(update_a), OpState::Ok);
|
||||||
|
|
||||||
|
// Both CRDTs must converge to the same view — compare field by field.
|
||||||
|
assert_eq!(
|
||||||
|
crdt_a.doc.tokens.view().len(),
|
||||||
|
crdt_b.doc.tokens.view().len(),
|
||||||
|
"both peers must have the same number of token entries"
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
crdt_a.doc.tokens[0].input_tokens.view(),
|
||||||
|
crdt_b.doc.tokens[0].input_tokens.view(),
|
||||||
|
"concurrent writes to input_tokens must converge to the same value"
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
crdt_a.doc.tokens[0].output_tokens.view(),
|
||||||
|
crdt_b.doc.tokens[0].output_tokens.view(),
|
||||||
|
"concurrent writes must converge for output_tokens"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── smoke tests for the other four collections ────────────────────
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn merge_job_insert_update_delete() {
|
||||||
|
init_for_test();
|
||||||
|
write_merge_job("100", "pending", 1.0, None, None);
|
||||||
|
let v = read_merge_job("100").expect("merge job must exist");
|
||||||
|
assert_eq!(v.status, "pending");
|
||||||
|
|
||||||
|
write_merge_job("100", "done", 1.0, Some(2.0), None);
|
||||||
|
let v2 = read_merge_job("100").expect("merge job must exist after update");
|
||||||
|
assert_eq!(v2.status, "done");
|
||||||
|
assert_eq!(v2.finished_at, Some(2.0));
|
||||||
|
|
||||||
|
assert!(delete_merge_job("100"));
|
||||||
|
assert!(read_merge_job("100").is_none());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn active_agent_insert_update_delete() {
|
||||||
|
init_for_test();
|
||||||
|
write_active_agent("coder-5", "200", "node-abc", 5.0);
|
||||||
|
let v = read_active_agent("coder-5").expect("active agent must exist");
|
||||||
|
assert_eq!(v.story_id.as_deref(), Some("200"));
|
||||||
|
|
||||||
|
write_active_agent("coder-5", "201", "node-abc", 6.0);
|
||||||
|
let v2 = read_active_agent("coder-5").expect("active agent must exist after update");
|
||||||
|
assert_eq!(v2.story_id.as_deref(), Some("201"));
|
||||||
|
|
||||||
|
assert!(delete_active_agent("coder-5"));
|
||||||
|
assert!(read_active_agent("coder-5").is_none());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_job_insert_update_delete() {
|
||||||
|
init_for_test();
|
||||||
|
write_test_job("300", "running", 7.0, None, None);
|
||||||
|
let v = read_test_job("300").expect("test job must exist");
|
||||||
|
assert_eq!(v.status, "running");
|
||||||
|
|
||||||
|
write_test_job("300", "pass", 7.0, Some(8.0), Some("all green"));
|
||||||
|
let v2 = read_test_job("300").expect("test job must exist after update");
|
||||||
|
assert_eq!(v2.status, "pass");
|
||||||
|
assert_eq!(v2.output.as_deref(), Some("all green"));
|
||||||
|
|
||||||
|
assert!(delete_test_job("300"));
|
||||||
|
assert!(read_test_job("300").is_none());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn agent_throttle_insert_update_delete() {
|
||||||
|
init_for_test();
|
||||||
|
write_agent_throttle("node-z", 1000.0, 2.0, 5.0);
|
||||||
|
let v = read_agent_throttle("node-z").expect("throttle must exist");
|
||||||
|
assert!((v.count - 2.0).abs() < f64::EPSILON);
|
||||||
|
assert!((v.limit - 5.0).abs() < f64::EPSILON);
|
||||||
|
|
||||||
|
write_agent_throttle("node-z", 1000.0, 4.0, 5.0);
|
||||||
|
let v2 = read_agent_throttle("node-z").expect("throttle must exist after update");
|
||||||
|
assert!((v2.count - 4.0).abs() < f64::EPSILON);
|
||||||
|
|
||||||
|
assert!(delete_agent_throttle("node-z"));
|
||||||
|
assert!(read_agent_throttle("node-z").is_none());
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── merge_jobs: concurrent-write LWW resolution ───────────────────
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn merge_job_concurrent_writes_converge_via_lww() {
|
||||||
|
// Two independent CRDTs simulate two nodes writing concurrently.
|
||||||
|
let kp_a = make_keypair();
|
||||||
|
let kp_b = make_keypair();
|
||||||
|
let mut crdt_a = BaseCrdt::<PipelineDoc>::new(&kp_a);
|
||||||
|
let mut crdt_b = BaseCrdt::<PipelineDoc>::new(&kp_b);
|
||||||
|
|
||||||
|
// Node A inserts a merge-job entry.
|
||||||
|
let entry_a: JsonValue = json!({
|
||||||
|
"story_id": "500_story_concurrent",
|
||||||
|
"status": "pending",
|
||||||
|
"started_at": 1.0,
|
||||||
|
"finished_at": 0.0,
|
||||||
|
"error": "",
|
||||||
|
})
|
||||||
|
.into();
|
||||||
|
let insert_a = crdt_a.doc.merge_jobs.insert(ROOT_ID, entry_a).sign(&kp_a);
|
||||||
|
assert_eq!(crdt_a.apply(insert_a.clone()), OpState::Ok);
|
||||||
|
|
||||||
|
// Node B inserts the same story_id with a different status concurrently.
|
||||||
|
let entry_b: JsonValue = json!({
|
||||||
|
"story_id": "500_story_concurrent",
|
||||||
|
"status": "running",
|
||||||
|
"started_at": 1.0,
|
||||||
|
"finished_at": 0.0,
|
||||||
|
"error": "",
|
||||||
|
})
|
||||||
|
.into();
|
||||||
|
let insert_b = crdt_b.doc.merge_jobs.insert(ROOT_ID, entry_b).sign(&kp_b);
|
||||||
|
assert_eq!(crdt_b.apply(insert_b.clone()), OpState::Ok);
|
||||||
|
|
||||||
|
// Both nodes concurrently update the status field.
|
||||||
|
let update_a = crdt_a.doc.merge_jobs[0]
|
||||||
|
.status
|
||||||
|
.set("done".to_string())
|
||||||
|
.sign(&kp_a);
|
||||||
|
let update_b = crdt_b.doc.merge_jobs[0]
|
||||||
|
.status
|
||||||
|
.set("failed".to_string())
|
||||||
|
.sign(&kp_b);
|
||||||
|
|
||||||
|
assert_eq!(crdt_a.apply(update_a.clone()), OpState::Ok);
|
||||||
|
assert_eq!(crdt_b.apply(update_b.clone()), OpState::Ok);
|
||||||
|
|
||||||
|
// Cross-apply: A gets B's ops, B gets A's ops.
|
||||||
|
assert_eq!(crdt_a.apply(insert_b), OpState::Ok);
|
||||||
|
assert_eq!(crdt_a.apply(update_b), OpState::Ok);
|
||||||
|
assert_eq!(crdt_b.apply(insert_a), OpState::Ok);
|
||||||
|
assert_eq!(crdt_b.apply(update_a), OpState::Ok);
|
||||||
|
|
||||||
|
// Both CRDTs must converge to the same view.
|
||||||
|
assert_eq!(
|
||||||
|
crdt_a.doc.merge_jobs.view().len(),
|
||||||
|
crdt_b.doc.merge_jobs.view().len(),
|
||||||
|
"both peers must have the same number of merge_job entries"
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
crdt_a.doc.merge_jobs[0].status.view(),
|
||||||
|
crdt_b.doc.merge_jobs[0].status.view(),
|
||||||
|
"concurrent writes to status must converge to the same value via LWW"
|
||||||
|
);
|
||||||
|
}
|
||||||
@@ -0,0 +1,130 @@
|
|||||||
|
//! Read/write helpers for the `tokens` LWW-map collection.
|
||||||
|
//!
|
||||||
|
//! Token-usage entries are keyed by `agent_id` and track per-agent input/output
|
||||||
|
//! token counts and a timestamp for the most recent write.
|
||||||
|
|
||||||
|
use bft_json_crdt::json_crdt::{JsonValue, *};
|
||||||
|
use bft_json_crdt::op::ROOT_ID;
|
||||||
|
use serde_json::json;
|
||||||
|
|
||||||
|
use super::super::state::{apply_and_persist, get_crdt, rebuild_token_index};
|
||||||
|
use super::super::types::{TokenUsageCrdt, TokenUsageView};
|
||||||
|
use super::list_id_at;
|
||||||
|
|
||||||
|
/// Write or update a token-usage entry keyed by `agent_id`.
|
||||||
|
///
|
||||||
|
/// If an entry for `agent_id` already exists it is updated in place;
|
||||||
|
/// otherwise a new entry is inserted.
|
||||||
|
pub fn write_token_usage(
|
||||||
|
agent_id: &str,
|
||||||
|
story_id: &str,
|
||||||
|
input_tokens: f64,
|
||||||
|
output_tokens: f64,
|
||||||
|
timestamp: f64,
|
||||||
|
) {
|
||||||
|
let Some(state_mutex) = get_crdt() else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
let Ok(mut state) = state_mutex.lock() else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some(&idx) = state.token_index.get(agent_id) {
|
||||||
|
apply_and_persist(&mut state, |s| {
|
||||||
|
s.crdt.doc.tokens[idx].story_id.set(story_id.to_string())
|
||||||
|
});
|
||||||
|
apply_and_persist(&mut state, |s| {
|
||||||
|
s.crdt.doc.tokens[idx].input_tokens.set(input_tokens)
|
||||||
|
});
|
||||||
|
apply_and_persist(&mut state, |s| {
|
||||||
|
s.crdt.doc.tokens[idx].output_tokens.set(output_tokens)
|
||||||
|
});
|
||||||
|
apply_and_persist(&mut state, |s| {
|
||||||
|
s.crdt.doc.tokens[idx].timestamp.set(timestamp)
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
let entry: JsonValue = json!({
|
||||||
|
"agent_id": agent_id,
|
||||||
|
"story_id": story_id,
|
||||||
|
"input_tokens": input_tokens,
|
||||||
|
"output_tokens": output_tokens,
|
||||||
|
"timestamp": timestamp,
|
||||||
|
})
|
||||||
|
.into();
|
||||||
|
apply_and_persist(&mut state, |s| s.crdt.doc.tokens.insert(ROOT_ID, entry));
|
||||||
|
state.token_index = rebuild_token_index(&state.crdt);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Read all token-usage entries.
|
||||||
|
pub fn read_all_token_usage() -> Option<Vec<TokenUsageView>> {
|
||||||
|
let state_mutex = get_crdt()?;
|
||||||
|
let state = state_mutex.lock().ok()?;
|
||||||
|
let mut out = Vec::new();
|
||||||
|
for entry in state.crdt.doc.tokens.iter() {
|
||||||
|
if let Some(v) = extract_token_view(entry) {
|
||||||
|
out.push(v);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Some(out)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Read a single token-usage entry by `agent_id`.
|
||||||
|
pub fn read_token_usage(agent_id: &str) -> Option<TokenUsageView> {
|
||||||
|
let state_mutex = get_crdt()?;
|
||||||
|
let state = state_mutex.lock().ok()?;
|
||||||
|
let &idx = state.token_index.get(agent_id)?;
|
||||||
|
extract_token_view(&state.crdt.doc.tokens[idx])
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Tombstone a token-usage entry by `agent_id`.
|
||||||
|
///
|
||||||
|
/// Returns `true` if the entry existed and a delete op was issued.
|
||||||
|
pub fn delete_token_usage(agent_id: &str) -> bool {
|
||||||
|
let Some(state_mutex) = get_crdt() else {
|
||||||
|
return false;
|
||||||
|
};
|
||||||
|
let Ok(mut state) = state_mutex.lock() else {
|
||||||
|
return false;
|
||||||
|
};
|
||||||
|
let Some(&idx) = state.token_index.get(agent_id) else {
|
||||||
|
return false;
|
||||||
|
};
|
||||||
|
let Some(op_id) = list_id_at(&state.crdt.doc.tokens, idx) else {
|
||||||
|
return false;
|
||||||
|
};
|
||||||
|
apply_and_persist(&mut state, |s| s.crdt.doc.tokens.delete(op_id));
|
||||||
|
state.token_index = rebuild_token_index(&state.crdt);
|
||||||
|
true
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Convert a CRDT token-usage entry into its read-only view representation.
|
||||||
|
pub(super) fn extract_token_view(entry: &TokenUsageCrdt) -> Option<TokenUsageView> {
|
||||||
|
let agent_id = match entry.agent_id.view() {
|
||||||
|
JsonValue::String(s) if !s.is_empty() => s,
|
||||||
|
_ => return None,
|
||||||
|
};
|
||||||
|
let story_id = match entry.story_id.view() {
|
||||||
|
JsonValue::String(s) if !s.is_empty() => Some(s),
|
||||||
|
_ => None,
|
||||||
|
};
|
||||||
|
let input_tokens = match entry.input_tokens.view() {
|
||||||
|
JsonValue::Number(n) => n,
|
||||||
|
_ => 0.0,
|
||||||
|
};
|
||||||
|
let output_tokens = match entry.output_tokens.view() {
|
||||||
|
JsonValue::Number(n) => n,
|
||||||
|
_ => 0.0,
|
||||||
|
};
|
||||||
|
let timestamp = match entry.timestamp.view() {
|
||||||
|
JsonValue::Number(n) => n,
|
||||||
|
_ => 0.0,
|
||||||
|
};
|
||||||
|
Some(TokenUsageView {
|
||||||
|
agent_id,
|
||||||
|
story_id,
|
||||||
|
input_tokens,
|
||||||
|
output_tokens,
|
||||||
|
timestamp,
|
||||||
|
})
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user