huskies: merge 764

This commit is contained in:
dave
2026-04-28 09:49:57 +00:00
parent 3d986a733b
commit d2d5ef8afa
5 changed files with 1147 additions and 6 deletions
+906
View File
@@ -0,0 +1,906 @@
//! Read/write helpers for the five LWW-map CRDT collections:
//! `tokens`, `merge_jobs`, `active_agents`, `test_jobs`, and `agent_throttle`.
//!
//! Each collection is backed by a `ListCrdt` with a primary-key field.
//! A secondary index in [`super::state::CrdtState`] provides O(1) lookup by
//! key. The write functions follow the same insert-or-update pattern used by
//! [`super::presence`] for the `nodes` collection.
#![allow(dead_code)]
use bft_json_crdt::json_crdt::*;
use bft_json_crdt::list_crdt::ListCrdt;
use bft_json_crdt::op::{OpId, ROOT_ID};
use serde_json::json;
/// Map an iter-position `idx` (as used by `ListCrdt::iter()` and the `[]`
/// operator) to the `OpId` stored at that position.
///
/// The built-in `id_at` counts non-deleted entries *including* the root
/// sentinel (whose `content` is `None`), which puts it off by one relative to
/// `iter().enumerate()`. Skipping both tombstones and content-less ops keeps
/// this helper aligned with the secondary indices in
/// [`super::state::CrdtState`].
fn list_id_at<T: CrdtNode>(list: &ListCrdt<T>, idx: usize) -> Option<OpId> {
    (&list.ops)
        .into_iter()
        .filter(|op| !op.is_deleted && op.content.is_some())
        .nth(idx)
        .map(|op| op.id)
}
use super::state::{
apply_and_persist, get_crdt, rebuild_active_agent_index, rebuild_agent_throttle_index,
rebuild_merge_job_index, rebuild_test_job_index, rebuild_token_index,
};
use super::types::{
ActiveAgentCrdt, ActiveAgentView, AgentThrottleCrdt, AgentThrottleView, MergeJobCrdt,
MergeJobView, TestJobCrdt, TestJobView, TokenUsageCrdt, TokenUsageView,
};
// ── tokens ───────────────────────────────────────────────────────────
/// Write or update a token-usage entry keyed by `agent_id`.
///
/// If an entry for `agent_id` already exists it is updated in place;
/// otherwise a new entry is inserted.
///
/// Silently no-ops when the CRDT has not been initialised or its lock is
/// poisoned.
pub fn write_token_usage(
    agent_id: &str,
    story_id: &str,
    input_tokens: f64,
    output_tokens: f64,
    timestamp: f64,
) {
    // CRDT not initialised yet: drop the write.
    let Some(state_mutex) = get_crdt() else {
        return;
    };
    // Poisoned lock: drop the write rather than panic.
    let Ok(mut state) = state_mutex.lock() else {
        return;
    };
    if let Some(&idx) = state.token_index.get(agent_id) {
        // Update path: one signed op per field, so each field behaves as an
        // independent LWW register when concurrent writers merge.
        apply_and_persist(&mut state, |s| {
            s.crdt.doc.tokens[idx].story_id.set(story_id.to_string())
        });
        apply_and_persist(&mut state, |s| {
            s.crdt.doc.tokens[idx].input_tokens.set(input_tokens)
        });
        apply_and_persist(&mut state, |s| {
            s.crdt.doc.tokens[idx].output_tokens.set(output_tokens)
        });
        apply_and_persist(&mut state, |s| {
            s.crdt.doc.tokens[idx].timestamp.set(timestamp)
        });
    } else {
        // Insert path: append a fresh entry, then rebuild the secondary index
        // so subsequent lookups by `agent_id` resolve to the new position.
        let entry: JsonValue = json!({
            "agent_id": agent_id,
            "story_id": story_id,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "timestamp": timestamp,
        })
        .into();
        apply_and_persist(&mut state, |s| s.crdt.doc.tokens.insert(ROOT_ID, entry));
        state.token_index = rebuild_token_index(&state.crdt);
    }
}
/// Read all token-usage entries.
pub fn read_all_token_usage() -> Option<Vec<TokenUsageView>> {
let state_mutex = get_crdt()?;
let state = state_mutex.lock().ok()?;
let mut out = Vec::new();
for entry in state.crdt.doc.tokens.iter() {
if let Some(v) = extract_token_view(entry) {
out.push(v);
}
}
Some(out)
}
/// Read a single token-usage entry by `agent_id`.
///
/// Returns `None` when the CRDT is unavailable or no entry matches.
pub fn read_token_usage(agent_id: &str) -> Option<TokenUsageView> {
    let state_mutex = get_crdt()?;
    let state = state_mutex.lock().ok()?;
    let idx = state.token_index.get(agent_id).copied()?;
    extract_token_view(&state.crdt.doc.tokens[idx])
}
/// Tombstone a token-usage entry by `agent_id`.
///
/// Returns `true` if the entry existed and a delete op was issued.
pub fn delete_token_usage(agent_id: &str) -> bool {
    // Option-based inner path: any missing precondition short-circuits to
    // `None`, which the caller sees as `false`.
    let attempt = || -> Option<()> {
        let state_mutex = get_crdt()?;
        let mut state = state_mutex.lock().ok()?;
        let idx = state.token_index.get(agent_id).copied()?;
        let op_id = list_id_at(&state.crdt.doc.tokens, idx)?;
        apply_and_persist(&mut state, |s| s.crdt.doc.tokens.delete(op_id));
        state.token_index = rebuild_token_index(&state.crdt);
        Some(())
    };
    attempt().is_some()
}
fn extract_token_view(entry: &TokenUsageCrdt) -> Option<TokenUsageView> {
let agent_id = match entry.agent_id.view() {
JsonValue::String(s) if !s.is_empty() => s,
_ => return None,
};
let story_id = match entry.story_id.view() {
JsonValue::String(s) if !s.is_empty() => Some(s),
_ => None,
};
let input_tokens = match entry.input_tokens.view() {
JsonValue::Number(n) => n,
_ => 0.0,
};
let output_tokens = match entry.output_tokens.view() {
JsonValue::Number(n) => n,
_ => 0.0,
};
let timestamp = match entry.timestamp.view() {
JsonValue::Number(n) => n,
_ => 0.0,
};
Some(TokenUsageView {
agent_id,
story_id,
input_tokens,
output_tokens,
timestamp,
})
}
// ── merge_jobs ────────────────────────────────────────────────────────
/// Write or update a merge-job entry keyed by `story_id`.
///
/// Silently no-ops when the CRDT has not been initialised or its lock is
/// poisoned.
pub fn write_merge_job(
    story_id: &str,
    status: &str,
    started_at: f64,
    finished_at: Option<f64>,
    error: Option<&str>,
) {
    let Some(state_mutex) = get_crdt() else {
        return;
    };
    let Ok(mut state) = state_mutex.lock() else {
        return;
    };
    if let Some(&idx) = state.merge_job_index.get(story_id) {
        // Update path: one signed op per field (field-level LWW registers).
        apply_and_persist(&mut state, |s| {
            s.crdt.doc.merge_jobs[idx].status.set(status.to_string())
        });
        apply_and_persist(&mut state, |s| {
            s.crdt.doc.merge_jobs[idx].started_at.set(started_at)
        });
        // `finished_at` / `error` are only written when provided, so passing
        // `None` here leaves any previously stored value in place.
        // NOTE(review): the insert path below writes 0.0/"" for `None` —
        // confirm the keep-old-value semantics on update are intentional.
        if let Some(fa) = finished_at {
            apply_and_persist(&mut state, |s| {
                s.crdt.doc.merge_jobs[idx].finished_at.set(fa)
            });
        }
        if let Some(e) = error {
            apply_and_persist(&mut state, |s| {
                s.crdt.doc.merge_jobs[idx].error.set(e.to_string())
            });
        }
    } else {
        // Insert path: 0.0 / "" act as the "not finished" / "no error"
        // sentinels that the read path maps back to `None`.
        let entry: JsonValue = json!({
            "story_id": story_id,
            "status": status,
            "started_at": started_at,
            "finished_at": finished_at.unwrap_or(0.0),
            "error": error.unwrap_or(""),
        })
        .into();
        apply_and_persist(&mut state, |s| s.crdt.doc.merge_jobs.insert(ROOT_ID, entry));
        state.merge_job_index = rebuild_merge_job_index(&state.crdt);
    }
}
/// Read all merge-job entries.
pub fn read_all_merge_jobs() -> Option<Vec<MergeJobView>> {
let state_mutex = get_crdt()?;
let state = state_mutex.lock().ok()?;
let mut out = Vec::new();
for entry in state.crdt.doc.merge_jobs.iter() {
if let Some(v) = extract_merge_job_view(entry) {
out.push(v);
}
}
Some(out)
}
/// Read a single merge-job entry by `story_id`.
///
/// Returns `None` when the CRDT is unavailable or no entry matches.
pub fn read_merge_job(story_id: &str) -> Option<MergeJobView> {
    let state_mutex = get_crdt()?;
    let state = state_mutex.lock().ok()?;
    let idx = state.merge_job_index.get(story_id).copied()?;
    extract_merge_job_view(&state.crdt.doc.merge_jobs[idx])
}
/// Tombstone a merge-job entry by `story_id`.
///
/// Returns `true` if the entry existed and a delete op was issued.
pub fn delete_merge_job(story_id: &str) -> bool {
    // Option-based inner path: any missing precondition becomes `None`.
    let attempt = || -> Option<()> {
        let state_mutex = get_crdt()?;
        let mut state = state_mutex.lock().ok()?;
        let idx = state.merge_job_index.get(story_id).copied()?;
        let op_id = list_id_at(&state.crdt.doc.merge_jobs, idx)?;
        apply_and_persist(&mut state, |s| s.crdt.doc.merge_jobs.delete(op_id));
        state.merge_job_index = rebuild_merge_job_index(&state.crdt);
        Some(())
    };
    attempt().is_some()
}
fn extract_merge_job_view(entry: &MergeJobCrdt) -> Option<MergeJobView> {
let story_id = match entry.story_id.view() {
JsonValue::String(s) if !s.is_empty() => s,
_ => return None,
};
let status = match entry.status.view() {
JsonValue::String(s) => s,
_ => String::new(),
};
let started_at = match entry.started_at.view() {
JsonValue::Number(n) => n,
_ => 0.0,
};
let finished_at = match entry.finished_at.view() {
JsonValue::Number(n) if n > 0.0 => Some(n),
_ => None,
};
let error = match entry.error.view() {
JsonValue::String(s) if !s.is_empty() => Some(s),
_ => None,
};
Some(MergeJobView {
story_id,
status,
started_at,
finished_at,
error,
})
}
// ── active_agents ─────────────────────────────────────────────────────
/// Write or update an active-agent entry keyed by `agent_id`.
///
/// Silently no-ops when the CRDT has not been initialised or its lock is
/// poisoned.
pub fn write_active_agent(agent_id: &str, story_id: &str, node_id: &str, started_at: f64) {
    let Some(state_mutex) = get_crdt() else {
        return;
    };
    let Ok(mut state) = state_mutex.lock() else {
        return;
    };
    if let Some(&idx) = state.active_agent_index.get(agent_id) {
        // Update path: one signed op per field (field-level LWW registers).
        apply_and_persist(&mut state, |s| {
            s.crdt.doc.active_agents[idx]
                .story_id
                .set(story_id.to_string())
        });
        apply_and_persist(&mut state, |s| {
            s.crdt.doc.active_agents[idx]
                .node_id
                .set(node_id.to_string())
        });
        apply_and_persist(&mut state, |s| {
            s.crdt.doc.active_agents[idx].started_at.set(started_at)
        });
    } else {
        // Insert path: append a fresh entry and rebuild the secondary index.
        let entry: JsonValue = json!({
            "agent_id": agent_id,
            "story_id": story_id,
            "node_id": node_id,
            "started_at": started_at,
        })
        .into();
        apply_and_persist(&mut state, |s| {
            s.crdt.doc.active_agents.insert(ROOT_ID, entry)
        });
        state.active_agent_index = rebuild_active_agent_index(&state.crdt);
    }
}
/// Read all active-agent entries.
pub fn read_all_active_agents() -> Option<Vec<ActiveAgentView>> {
let state_mutex = get_crdt()?;
let state = state_mutex.lock().ok()?;
let mut out = Vec::new();
for entry in state.crdt.doc.active_agents.iter() {
if let Some(v) = extract_active_agent_view(entry) {
out.push(v);
}
}
Some(out)
}
/// Read a single active-agent entry by `agent_id`.
///
/// Returns `None` when the CRDT is unavailable or no entry matches.
pub fn read_active_agent(agent_id: &str) -> Option<ActiveAgentView> {
    let state_mutex = get_crdt()?;
    let state = state_mutex.lock().ok()?;
    let idx = state.active_agent_index.get(agent_id).copied()?;
    extract_active_agent_view(&state.crdt.doc.active_agents[idx])
}
/// Tombstone an active-agent entry by `agent_id`.
///
/// Returns `true` if the entry existed and a delete op was issued.
pub fn delete_active_agent(agent_id: &str) -> bool {
    // Option-based inner path: any missing precondition becomes `None`.
    let attempt = || -> Option<()> {
        let state_mutex = get_crdt()?;
        let mut state = state_mutex.lock().ok()?;
        let idx = state.active_agent_index.get(agent_id).copied()?;
        let op_id = list_id_at(&state.crdt.doc.active_agents, idx)?;
        apply_and_persist(&mut state, |s| s.crdt.doc.active_agents.delete(op_id));
        state.active_agent_index = rebuild_active_agent_index(&state.crdt);
        Some(())
    };
    attempt().is_some()
}
fn extract_active_agent_view(entry: &ActiveAgentCrdt) -> Option<ActiveAgentView> {
let agent_id = match entry.agent_id.view() {
JsonValue::String(s) if !s.is_empty() => s,
_ => return None,
};
let story_id = match entry.story_id.view() {
JsonValue::String(s) if !s.is_empty() => Some(s),
_ => None,
};
let node_id = match entry.node_id.view() {
JsonValue::String(s) if !s.is_empty() => Some(s),
_ => None,
};
let started_at = match entry.started_at.view() {
JsonValue::Number(n) => n,
_ => 0.0,
};
Some(ActiveAgentView {
agent_id,
story_id,
node_id,
started_at,
})
}
// ── test_jobs ─────────────────────────────────────────────────────────
/// Write or update a test-job entry keyed by `story_id`.
///
/// Silently no-ops when the CRDT has not been initialised or its lock is
/// poisoned.
pub fn write_test_job(
    story_id: &str,
    status: &str,
    started_at: f64,
    finished_at: Option<f64>,
    output: Option<&str>,
) {
    let Some(state_mutex) = get_crdt() else {
        return;
    };
    let Ok(mut state) = state_mutex.lock() else {
        return;
    };
    if let Some(&idx) = state.test_job_index.get(story_id) {
        // Update path: one signed op per field (field-level LWW registers).
        apply_and_persist(&mut state, |s| {
            s.crdt.doc.test_jobs[idx].status.set(status.to_string())
        });
        apply_and_persist(&mut state, |s| {
            s.crdt.doc.test_jobs[idx].started_at.set(started_at)
        });
        // `finished_at` / `output` are only written when provided, so passing
        // `None` here leaves any previously stored value in place.
        // NOTE(review): the insert path below writes 0.0/"" for `None` —
        // confirm the keep-old-value semantics on update are intentional.
        if let Some(fa) = finished_at {
            apply_and_persist(&mut state, |s| {
                s.crdt.doc.test_jobs[idx].finished_at.set(fa)
            });
        }
        if let Some(o) = output {
            apply_and_persist(&mut state, |s| {
                s.crdt.doc.test_jobs[idx].output.set(o.to_string())
            });
        }
    } else {
        // Insert path: 0.0 / "" act as the "not finished" / "no output"
        // sentinels that the read path maps back to `None`.
        let entry: JsonValue = json!({
            "story_id": story_id,
            "status": status,
            "started_at": started_at,
            "finished_at": finished_at.unwrap_or(0.0),
            "output": output.unwrap_or(""),
        })
        .into();
        apply_and_persist(&mut state, |s| s.crdt.doc.test_jobs.insert(ROOT_ID, entry));
        state.test_job_index = rebuild_test_job_index(&state.crdt);
    }
}
/// Read all test-job entries.
pub fn read_all_test_jobs() -> Option<Vec<TestJobView>> {
let state_mutex = get_crdt()?;
let state = state_mutex.lock().ok()?;
let mut out = Vec::new();
for entry in state.crdt.doc.test_jobs.iter() {
if let Some(v) = extract_test_job_view(entry) {
out.push(v);
}
}
Some(out)
}
/// Read a single test-job entry by `story_id`.
///
/// Returns `None` when the CRDT is unavailable or no entry matches.
pub fn read_test_job(story_id: &str) -> Option<TestJobView> {
    let state_mutex = get_crdt()?;
    let state = state_mutex.lock().ok()?;
    let idx = state.test_job_index.get(story_id).copied()?;
    extract_test_job_view(&state.crdt.doc.test_jobs[idx])
}
/// Tombstone a test-job entry by `story_id`.
///
/// Returns `true` if the entry existed and a delete op was issued.
pub fn delete_test_job(story_id: &str) -> bool {
    // Option-based inner path: any missing precondition becomes `None`.
    let attempt = || -> Option<()> {
        let state_mutex = get_crdt()?;
        let mut state = state_mutex.lock().ok()?;
        let idx = state.test_job_index.get(story_id).copied()?;
        let op_id = list_id_at(&state.crdt.doc.test_jobs, idx)?;
        apply_and_persist(&mut state, |s| s.crdt.doc.test_jobs.delete(op_id));
        state.test_job_index = rebuild_test_job_index(&state.crdt);
        Some(())
    };
    attempt().is_some()
}
fn extract_test_job_view(entry: &TestJobCrdt) -> Option<TestJobView> {
let story_id = match entry.story_id.view() {
JsonValue::String(s) if !s.is_empty() => s,
_ => return None,
};
let status = match entry.status.view() {
JsonValue::String(s) => s,
_ => String::new(),
};
let started_at = match entry.started_at.view() {
JsonValue::Number(n) => n,
_ => 0.0,
};
let finished_at = match entry.finished_at.view() {
JsonValue::Number(n) if n > 0.0 => Some(n),
_ => None,
};
let output = match entry.output.view() {
JsonValue::String(s) if !s.is_empty() => Some(s),
_ => None,
};
Some(TestJobView {
story_id,
status,
started_at,
finished_at,
output,
})
}
// ── agent_throttle ────────────────────────────────────────────────────
/// Write or update an agent-throttle entry keyed by `node_id`.
///
/// Silently no-ops when the CRDT has not been initialised or its lock is
/// poisoned.
pub fn write_agent_throttle(node_id: &str, window_start: f64, count: f64, limit: f64) {
    let Some(state_mutex) = get_crdt() else {
        return;
    };
    let Ok(mut state) = state_mutex.lock() else {
        return;
    };
    if let Some(&idx) = state.agent_throttle_index.get(node_id) {
        // Update path: one signed op per field (field-level LWW registers).
        apply_and_persist(&mut state, |s| {
            s.crdt.doc.agent_throttle[idx]
                .window_start
                .set(window_start)
        });
        apply_and_persist(&mut state, |s| {
            s.crdt.doc.agent_throttle[idx].count.set(count)
        });
        apply_and_persist(&mut state, |s| {
            s.crdt.doc.agent_throttle[idx].limit.set(limit)
        });
    } else {
        // Insert path: append a fresh entry and rebuild the secondary index.
        let entry: JsonValue = json!({
            "node_id": node_id,
            "window_start": window_start,
            "count": count,
            "limit": limit,
        })
        .into();
        apply_and_persist(&mut state, |s| {
            s.crdt.doc.agent_throttle.insert(ROOT_ID, entry)
        });
        state.agent_throttle_index = rebuild_agent_throttle_index(&state.crdt);
    }
}
/// Read all agent-throttle entries.
pub fn read_all_agent_throttles() -> Option<Vec<AgentThrottleView>> {
let state_mutex = get_crdt()?;
let state = state_mutex.lock().ok()?;
let mut out = Vec::new();
for entry in state.crdt.doc.agent_throttle.iter() {
if let Some(v) = extract_agent_throttle_view(entry) {
out.push(v);
}
}
Some(out)
}
/// Read a single agent-throttle entry by `node_id`.
///
/// Returns `None` when the CRDT is unavailable or no entry matches.
pub fn read_agent_throttle(node_id: &str) -> Option<AgentThrottleView> {
    let state_mutex = get_crdt()?;
    let state = state_mutex.lock().ok()?;
    let idx = state.agent_throttle_index.get(node_id).copied()?;
    extract_agent_throttle_view(&state.crdt.doc.agent_throttle[idx])
}
/// Tombstone an agent-throttle entry by `node_id`.
///
/// Returns `true` if the entry existed and a delete op was issued.
pub fn delete_agent_throttle(node_id: &str) -> bool {
    // Option-based inner path: any missing precondition becomes `None`.
    let attempt = || -> Option<()> {
        let state_mutex = get_crdt()?;
        let mut state = state_mutex.lock().ok()?;
        let idx = state.agent_throttle_index.get(node_id).copied()?;
        let op_id = list_id_at(&state.crdt.doc.agent_throttle, idx)?;
        apply_and_persist(&mut state, |s| s.crdt.doc.agent_throttle.delete(op_id));
        state.agent_throttle_index = rebuild_agent_throttle_index(&state.crdt);
        Some(())
    };
    attempt().is_some()
}
fn extract_agent_throttle_view(entry: &AgentThrottleCrdt) -> Option<AgentThrottleView> {
let node_id = match entry.node_id.view() {
JsonValue::String(s) if !s.is_empty() => s,
_ => return None,
};
let window_start = match entry.window_start.view() {
JsonValue::Number(n) => n,
_ => 0.0,
};
let count = match entry.count.view() {
JsonValue::Number(n) => n,
_ => 0.0,
};
let limit = match entry.limit.view() {
JsonValue::Number(n) => n,
_ => 0.0,
};
Some(AgentThrottleView {
node_id,
window_start,
count,
limit,
})
}
// ── Tests ─────────────────────────────────────────────────────────────
//
// The `tokens` collection is used as the representative collection for the
// required CRDT semantics tests (insert, update, delete-via-tombstone, and
// concurrent write semantics).
#[cfg(test)]
mod tests {
    use super::super::state::init_for_test;
    use super::*;
    use bft_json_crdt::json_crdt::{BaseCrdt, JsonValue, OpState};
    use bft_json_crdt::keypair::make_keypair;
    use bft_json_crdt::op::ROOT_ID;

    use super::super::types::PipelineDoc;

    // NOTE(review): `init_for_test` sets up a shared global CrdtState, so each
    // test uses unique keys to avoid cross-test interference — confirm the
    // tests are safe to run in parallel against that shared state.

    // ── insert ────────────────────────────────────────────────────────
    #[test]
    fn token_insert_is_visible_via_read() {
        init_for_test();
        write_token_usage("coder-1:42", "42", 100.0, 200.0, 1_000_000.0);
        let view = read_token_usage("coder-1:42").expect("entry must exist after insert");
        assert_eq!(view.agent_id, "coder-1:42");
        assert_eq!(view.story_id.as_deref(), Some("42"));
        // Float fields are compared within EPSILON rather than with `==`.
        assert!((view.input_tokens - 100.0).abs() < f64::EPSILON);
        assert!((view.output_tokens - 200.0).abs() < f64::EPSILON);
        assert!((view.timestamp - 1_000_000.0).abs() < f64::EPSILON);
    }

    #[test]
    fn token_read_all_returns_inserted_entries() {
        init_for_test();
        write_token_usage("coder-a:10", "10", 10.0, 20.0, 1.0);
        write_token_usage("coder-b:10", "10", 30.0, 40.0, 2.0);
        let all = read_all_token_usage().unwrap_or_default();
        let ids: Vec<&str> = all.iter().map(|v| v.agent_id.as_str()).collect();
        // Containment (not length) is asserted: the shared state may hold
        // entries from other tests.
        assert!(
            ids.contains(&"coder-a:10"),
            "coder-a:10 must be in read_all"
        );
        assert!(
            ids.contains(&"coder-b:10"),
            "coder-b:10 must be in read_all"
        );
    }

    // ── update ────────────────────────────────────────────────────────
    #[test]
    fn token_update_overwrites_fields() {
        init_for_test();
        write_token_usage("coder-2:55", "55", 50.0, 60.0, 2_000_000.0);
        // Update with new token counts.
        write_token_usage("coder-2:55", "55", 500.0, 600.0, 3_000_000.0);
        let view = read_token_usage("coder-2:55").expect("entry must exist after update");
        assert!((view.input_tokens - 500.0).abs() < f64::EPSILON);
        assert!((view.output_tokens - 600.0).abs() < f64::EPSILON);
        assert!((view.timestamp - 3_000_000.0).abs() < f64::EPSILON);
    }

    // ── delete-via-tombstone ──────────────────────────────────────────
    #[test]
    fn token_delete_removes_entry_from_read() {
        init_for_test();
        write_token_usage("coder-3:77", "77", 1.0, 2.0, 9_999.0);
        assert!(
            read_token_usage("coder-3:77").is_some(),
            "entry must exist before delete"
        );
        let deleted = delete_token_usage("coder-3:77");
        assert!(deleted, "delete must return true for a known entry");
        assert!(
            read_token_usage("coder-3:77").is_none(),
            "entry must be absent after tombstone"
        );
    }

    #[test]
    fn token_delete_nonexistent_returns_false() {
        init_for_test();
        assert!(!delete_token_usage("no-such-agent"));
    }

    #[test]
    fn token_delete_not_returned_by_read_all() {
        init_for_test();
        write_token_usage("coder-4:88", "88", 5.0, 10.0, 1.0);
        delete_token_usage("coder-4:88");
        let all = read_all_token_usage().unwrap_or_default();
        assert!(
            !all.iter().any(|v| v.agent_id == "coder-4:88"),
            "deleted entry must not appear in read_all"
        );
    }

    // ── concurrent write semantics (LWW convergence) ──────────────────
    #[test]
    fn token_concurrent_writes_converge_via_lww() {
        // Two independent CRDTs simulate two nodes writing concurrently.
        let kp_a = make_keypair();
        let kp_b = make_keypair();
        let mut crdt_a = BaseCrdt::<PipelineDoc>::new(&kp_a);
        let mut crdt_b = BaseCrdt::<PipelineDoc>::new(&kp_b);
        // Node A inserts a token entry.
        let entry_a: JsonValue = json!({
            "agent_id": "coder-x:99",
            "story_id": "99",
            "input_tokens": 10.0,
            "output_tokens": 20.0,
            "timestamp": 1.0,
        })
        .into();
        let insert_a = crdt_a.doc.tokens.insert(ROOT_ID, entry_a).sign(&kp_a);
        assert_eq!(crdt_a.apply(insert_a.clone()), OpState::Ok);
        // Node B inserts the same key with different values.
        let entry_b: JsonValue = json!({
            "agent_id": "coder-x:99",
            "story_id": "99",
            "input_tokens": 999.0,
            "output_tokens": 888.0,
            "timestamp": 1.0,
        })
        .into();
        let insert_b = crdt_b.doc.tokens.insert(ROOT_ID, entry_b).sign(&kp_b);
        assert_eq!(crdt_b.apply(insert_b.clone()), OpState::Ok);
        // Both nodes update input_tokens concurrently.
        let update_a = crdt_a.doc.tokens[0].input_tokens.set(111.0).sign(&kp_a);
        let update_b = crdt_b.doc.tokens[0].input_tokens.set(222.0).sign(&kp_b);
        assert_eq!(crdt_a.apply(update_a.clone()), OpState::Ok);
        assert_eq!(crdt_b.apply(update_b.clone()), OpState::Ok);
        // Cross-apply: A gets B's ops, B gets A's ops.
        assert_eq!(crdt_a.apply(insert_b), OpState::Ok);
        assert_eq!(crdt_a.apply(update_b), OpState::Ok);
        assert_eq!(crdt_b.apply(insert_a), OpState::Ok);
        assert_eq!(crdt_b.apply(update_a), OpState::Ok);
        // Both CRDTs must converge to the same view — compare field by field.
        assert_eq!(
            crdt_a.doc.tokens.view().len(),
            crdt_b.doc.tokens.view().len(),
            "both peers must have the same number of token entries"
        );
        assert_eq!(
            crdt_a.doc.tokens[0].input_tokens.view(),
            crdt_b.doc.tokens[0].input_tokens.view(),
            "concurrent writes to input_tokens must converge to the same value"
        );
        assert_eq!(
            crdt_a.doc.tokens[0].output_tokens.view(),
            crdt_b.doc.tokens[0].output_tokens.view(),
            "concurrent writes must converge for output_tokens"
        );
    }

    // ── smoke tests for the other four collections ────────────────────
    #[test]
    fn merge_job_insert_update_delete() {
        init_for_test();
        write_merge_job("100", "pending", 1.0, None, None);
        let v = read_merge_job("100").expect("merge job must exist");
        assert_eq!(v.status, "pending");
        write_merge_job("100", "done", 1.0, Some(2.0), None);
        let v2 = read_merge_job("100").expect("merge job must exist after update");
        assert_eq!(v2.status, "done");
        assert_eq!(v2.finished_at, Some(2.0));
        assert!(delete_merge_job("100"));
        assert!(read_merge_job("100").is_none());
    }

    #[test]
    fn active_agent_insert_update_delete() {
        init_for_test();
        write_active_agent("coder-5", "200", "node-abc", 5.0);
        let v = read_active_agent("coder-5").expect("active agent must exist");
        assert_eq!(v.story_id.as_deref(), Some("200"));
        write_active_agent("coder-5", "201", "node-abc", 6.0);
        let v2 = read_active_agent("coder-5").expect("active agent must exist after update");
        assert_eq!(v2.story_id.as_deref(), Some("201"));
        assert!(delete_active_agent("coder-5"));
        assert!(read_active_agent("coder-5").is_none());
    }

    #[test]
    fn test_job_insert_update_delete() {
        init_for_test();
        write_test_job("300", "running", 7.0, None, None);
        let v = read_test_job("300").expect("test job must exist");
        assert_eq!(v.status, "running");
        write_test_job("300", "pass", 7.0, Some(8.0), Some("all green"));
        let v2 = read_test_job("300").expect("test job must exist after update");
        assert_eq!(v2.status, "pass");
        assert_eq!(v2.output.as_deref(), Some("all green"));
        assert!(delete_test_job("300"));
        assert!(read_test_job("300").is_none());
    }

    #[test]
    fn agent_throttle_insert_update_delete() {
        init_for_test();
        write_agent_throttle("node-z", 1000.0, 2.0, 5.0);
        let v = read_agent_throttle("node-z").expect("throttle must exist");
        assert!((v.count - 2.0).abs() < f64::EPSILON);
        assert!((v.limit - 5.0).abs() < f64::EPSILON);
        write_agent_throttle("node-z", 1000.0, 4.0, 5.0);
        let v2 = read_agent_throttle("node-z").expect("throttle must exist after update");
        assert!((v2.count - 4.0).abs() < f64::EPSILON);
        assert!(delete_agent_throttle("node-z"));
        assert!(read_agent_throttle("node-z").is_none());
    }

    // ── merge_jobs: concurrent-write LWW resolution ───────────────────
    #[test]
    fn merge_job_concurrent_writes_converge_via_lww() {
        // Two independent CRDTs simulate two nodes writing concurrently.
        let kp_a = make_keypair();
        let kp_b = make_keypair();
        let mut crdt_a = BaseCrdt::<PipelineDoc>::new(&kp_a);
        let mut crdt_b = BaseCrdt::<PipelineDoc>::new(&kp_b);
        // Node A inserts a merge-job entry.
        let entry_a: JsonValue = json!({
            "story_id": "500_story_concurrent",
            "status": "pending",
            "started_at": 1.0,
            "finished_at": 0.0,
            "error": "",
        })
        .into();
        let insert_a = crdt_a.doc.merge_jobs.insert(ROOT_ID, entry_a).sign(&kp_a);
        assert_eq!(crdt_a.apply(insert_a.clone()), OpState::Ok);
        // Node B inserts the same story_id with a different status concurrently.
        let entry_b: JsonValue = json!({
            "story_id": "500_story_concurrent",
            "status": "running",
            "started_at": 1.0,
            "finished_at": 0.0,
            "error": "",
        })
        .into();
        let insert_b = crdt_b.doc.merge_jobs.insert(ROOT_ID, entry_b).sign(&kp_b);
        assert_eq!(crdt_b.apply(insert_b.clone()), OpState::Ok);
        // Both nodes concurrently update the status field.
        let update_a = crdt_a.doc.merge_jobs[0]
            .status
            .set("done".to_string())
            .sign(&kp_a);
        let update_b = crdt_b.doc.merge_jobs[0]
            .status
            .set("failed".to_string())
            .sign(&kp_b);
        assert_eq!(crdt_a.apply(update_a.clone()), OpState::Ok);
        assert_eq!(crdt_b.apply(update_b.clone()), OpState::Ok);
        // Cross-apply: A gets B's ops, B gets A's ops.
        assert_eq!(crdt_a.apply(insert_b), OpState::Ok);
        assert_eq!(crdt_a.apply(update_b), OpState::Ok);
        assert_eq!(crdt_b.apply(insert_a), OpState::Ok);
        assert_eq!(crdt_b.apply(update_a), OpState::Ok);
        // Both CRDTs must converge to the same view.
        assert_eq!(
            crdt_a.doc.merge_jobs.view().len(),
            crdt_b.doc.merge_jobs.view().len(),
            "both peers must have the same number of merge_job entries"
        );
        assert_eq!(
            crdt_a.doc.merge_jobs[0].status.view(),
            crdt_b.doc.merge_jobs[0].status.view(),
            "concurrent writes to status must converge to the same value via LWW"
        );
    }
}
+11 -2
View File
@@ -17,6 +17,7 @@ use std::collections::HashMap;
/// its clock so the other side can compute which ops are missing.
pub type VectorClock = HashMap<String, u64>;
mod lww_maps;
mod ops;
mod presence;
mod read;
@@ -24,6 +25,13 @@ mod state;
mod types;
mod write;
pub use lww_maps::{
delete_active_agent, delete_agent_throttle, delete_merge_job, delete_test_job,
delete_token_usage, read_active_agent, read_agent_throttle, read_all_active_agents,
read_all_agent_throttles, read_all_merge_jobs, read_all_test_jobs, read_all_token_usage,
read_merge_job, read_test_job, read_token_usage, write_active_agent, write_agent_throttle,
write_merge_job, write_test_job, write_token_usage,
};
pub use ops::{all_ops_json, apply_remote_op, ops_since, our_vector_clock, subscribe_ops};
pub use presence::{
is_claimed_by_us, our_node_id, read_all_node_presence, release_claim, sign_challenge,
@@ -35,8 +43,9 @@ pub use read::{
};
pub use state::init;
pub use types::{
CrdtEvent, NodePresenceCrdt, NodePresenceView, PipelineDoc, PipelineItemCrdt, PipelineItemView,
subscribe,
ActiveAgentCrdt, ActiveAgentView, AgentThrottleCrdt, AgentThrottleView, CrdtEvent,
MergeJobCrdt, MergeJobView, NodePresenceCrdt, NodePresenceView, PipelineDoc, PipelineItemCrdt,
PipelineItemView, TestJobCrdt, TestJobView, TokenUsageCrdt, TokenUsageView, subscribe,
};
pub use write::{
migrate_names_from_slugs, migrate_story_ids_to_numeric, name_from_story_id, write_item,
+10 -3
View File
@@ -10,8 +10,10 @@ use tokio::sync::broadcast;
use super::VectorClock;
use super::state::{
ALL_OPS, SYNC_TX, VECTOR_CLOCK, apply_and_persist, emit_event, get_crdt, rebuild_index,
rebuild_node_index, track_op,
ALL_OPS, SYNC_TX, VECTOR_CLOCK, apply_and_persist, emit_event, get_crdt,
rebuild_active_agent_index, rebuild_agent_throttle_index, rebuild_index,
rebuild_merge_job_index, rebuild_node_index, rebuild_test_job_index, rebuild_token_index,
track_op,
};
use super::types::{CrdtEvent, PipelineDoc};
use crate::slog;
@@ -127,9 +129,14 @@ pub fn apply_remote_op(op: SignedOp) -> bool {
track_op(&op, json);
}
// Rebuild indices (new items or nodes may have been inserted).
// Rebuild indices (new items, nodes, or LWW-map entries may have been inserted).
state.index = rebuild_index(&state.crdt);
state.node_index = rebuild_node_index(&state.crdt);
state.token_index = rebuild_token_index(&state.crdt);
state.merge_job_index = rebuild_merge_job_index(&state.crdt);
state.active_agent_index = rebuild_active_agent_index(&state.crdt);
state.test_job_index = rebuild_test_job_index(&state.crdt);
state.agent_throttle_index = rebuild_agent_throttle_index(&state.crdt);
// Detect and broadcast stage transitions.
for (sid, &idx) in &state.index {
+96 -1
View File
@@ -61,6 +61,16 @@ pub(super) struct CrdtState {
pub(super) index: HashMap<String, usize>,
/// Maps node_id (hex) → index in the nodes ListCrdt for O(1) lookup.
pub(super) node_index: HashMap<String, usize>,
/// Maps agent_id → index in the tokens ListCrdt for O(1) lookup.
pub(super) token_index: HashMap<String, usize>,
/// Maps story_id → index in the merge_jobs ListCrdt for O(1) lookup.
pub(super) merge_job_index: HashMap<String, usize>,
/// Maps agent_id → index in the active_agents ListCrdt for O(1) lookup.
pub(super) active_agent_index: HashMap<String, usize>,
/// Maps story_id → index in the test_jobs ListCrdt for O(1) lookup.
pub(super) test_job_index: HashMap<String, usize>,
/// Maps node_id → index in the agent_throttle ListCrdt for O(1) lookup.
pub(super) agent_throttle_index: HashMap<String, usize>,
/// Channel sender for fire-and-forget op persistence.
pub(super) persist_tx: mpsc::UnboundedSender<SignedOp>,
/// Max sequence number seen across all ops during init() replay.
@@ -146,11 +156,21 @@ pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> {
// Build the indices from the reconstructed state.
let index = rebuild_index(&crdt);
let node_index = rebuild_node_index(&crdt);
let token_index = rebuild_token_index(&crdt);
let merge_job_index = rebuild_merge_job_index(&crdt);
let active_agent_index = rebuild_active_agent_index(&crdt);
let test_job_index = rebuild_test_job_index(&crdt);
let agent_throttle_index = rebuild_agent_throttle_index(&crdt);
// Advance the top-level list clocks to the Lamport floor so that
// list-level inserts (new items / new nodes) don't re-emit low seq numbers.
// list-level inserts don't re-emit low seq numbers.
crdt.doc.items.advance_seq(lamport_floor);
crdt.doc.nodes.advance_seq(lamport_floor);
crdt.doc.tokens.advance_seq(lamport_floor);
crdt.doc.merge_jobs.advance_seq(lamport_floor);
crdt.doc.active_agents.advance_seq(lamport_floor);
crdt.doc.test_jobs.advance_seq(lamport_floor);
crdt.doc.agent_throttle.advance_seq(lamport_floor);
slog!(
"[crdt] Initialised: {} ops replayed, {} items indexed, {} nodes indexed, lamport_floor={}",
@@ -199,6 +219,11 @@ pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> {
keypair,
index,
node_index,
token_index,
merge_job_index,
active_agent_index,
test_job_index,
agent_throttle_index,
persist_tx,
lamport_floor,
};
@@ -237,6 +262,11 @@ pub fn init_for_test() {
keypair,
index: HashMap::new(),
node_index: HashMap::new(),
token_index: HashMap::new(),
merge_job_index: HashMap::new(),
active_agent_index: HashMap::new(),
test_job_index: HashMap::new(),
agent_throttle_index: HashMap::new(),
persist_tx,
lamport_floor: 0,
};
@@ -296,6 +326,61 @@ pub(super) fn rebuild_node_index(crdt: &BaseCrdt<PipelineDoc>) -> HashMap<String
map
}
/// Rebuild the agent_id → tokens list index.
///
/// Later entries with a duplicate key overwrite earlier ones.
pub(super) fn rebuild_token_index(crdt: &BaseCrdt<PipelineDoc>) -> HashMap<String, usize> {
    crdt.doc
        .tokens
        .iter()
        .enumerate()
        .filter_map(|(i, entry)| match entry.agent_id.view() {
            JsonValue::String(k) => Some((k, i)),
            _ => None,
        })
        .collect()
}
/// Rebuild the story_id → merge_jobs list index.
///
/// Later entries with a duplicate key overwrite earlier ones.
pub(super) fn rebuild_merge_job_index(crdt: &BaseCrdt<PipelineDoc>) -> HashMap<String, usize> {
    crdt.doc
        .merge_jobs
        .iter()
        .enumerate()
        .filter_map(|(i, entry)| match entry.story_id.view() {
            JsonValue::String(k) => Some((k, i)),
            _ => None,
        })
        .collect()
}
/// Rebuild the agent_id → active_agents list index.
///
/// Later entries with a duplicate key overwrite earlier ones.
pub(super) fn rebuild_active_agent_index(crdt: &BaseCrdt<PipelineDoc>) -> HashMap<String, usize> {
    crdt.doc
        .active_agents
        .iter()
        .enumerate()
        .filter_map(|(i, entry)| match entry.agent_id.view() {
            JsonValue::String(k) => Some((k, i)),
            _ => None,
        })
        .collect()
}
/// Rebuild the story_id → test_jobs list index.
///
/// Later entries with a duplicate key overwrite earlier ones.
pub(super) fn rebuild_test_job_index(crdt: &BaseCrdt<PipelineDoc>) -> HashMap<String, usize> {
    crdt.doc
        .test_jobs
        .iter()
        .enumerate()
        .filter_map(|(i, entry)| match entry.story_id.view() {
            JsonValue::String(k) => Some((k, i)),
            _ => None,
        })
        .collect()
}
/// Rebuild the node_id → agent_throttle list index.
pub(super) fn rebuild_agent_throttle_index(crdt: &BaseCrdt<PipelineDoc>) -> HashMap<String, usize> {
    // Index positions match `iter().enumerate()` so lookups via the
    // secondary index stay consistent with list iteration order.
    crdt.doc
        .agent_throttle
        .iter()
        .enumerate()
        .filter_map(|(pos, entry)| {
            if let JsonValue::String(ref key) = entry.node_id.view() {
                Some((key.clone(), pos))
            } else {
                None
            }
        })
        .collect()
}
// ── Write path ───────────────────────────────────────────────────────
/// Create a CRDT op via `op_fn`, sign it, apply it, and send it to the
@@ -503,6 +588,11 @@ mod tests {
keypair: kp,
index: HashMap::new(),
node_index: HashMap::new(),
token_index: HashMap::new(),
merge_job_index: HashMap::new(),
active_agent_index: HashMap::new(),
test_job_index: HashMap::new(),
agent_throttle_index: HashMap::new(),
persist_tx,
lamport_floor: 0,
};
@@ -570,6 +660,11 @@ mod tests {
keypair: kp,
index: HashMap::new(),
node_index: HashMap::new(),
token_index: HashMap::new(),
merge_job_index: HashMap::new(),
active_agent_index: HashMap::new(),
test_job_index: HashMap::new(),
agent_throttle_index: HashMap::new(),
persist_tx,
lamport_floor: 0,
};
+124
View File
@@ -36,6 +36,11 @@ static CRDT_EVENT_TX: OnceLock<broadcast::Sender<CrdtEvent>> = OnceLock::new();
pub struct PipelineDoc {
    /// Pipeline items.
    pub items: ListCrdt<PipelineItemCrdt>,
    /// Node-presence entries.
    pub nodes: ListCrdt<NodePresenceCrdt>,
    /// Per-agent token usage; secondary index keyed by `agent_id`.
    pub tokens: ListCrdt<TokenUsageCrdt>,
    /// Merge jobs; secondary index keyed by `story_id`.
    pub merge_jobs: ListCrdt<MergeJobCrdt>,
    /// Currently-running agents; secondary index keyed by `agent_id`.
    pub active_agents: ListCrdt<ActiveAgentCrdt>,
    /// Test jobs; secondary index keyed by `story_id`.
    pub test_jobs: ListCrdt<TestJobCrdt>,
    /// Per-node agent-start throttle state; secondary index keyed by `node_id`.
    pub agent_throttle: ListCrdt<AgentThrottleCrdt>,
}
#[add_crdt_fields]
@@ -119,6 +124,125 @@ pub struct NodePresenceView {
pub last_seen_ms: Option<f64>,
}
// ── LWW-map CRDT types ───────────────────────────────────────────────
// Each collection is a `ListCrdt` with a primary-key field. A secondary
// index in `CrdtState` provides O(1) lookup by that key.
/// CRDT entry holding per-agent token-usage metrics.
#[add_crdt_fields]
#[derive(Clone, CrdtNode, Debug)]
pub struct TokenUsageCrdt {
    /// Unique key (e.g. `"coder-1:42_story_foo"`).
    pub agent_id: LwwRegisterCrdt<String>,
    /// Story this usage record is associated with.
    pub story_id: LwwRegisterCrdt<String>,
    /// Input-token count for this record.
    pub input_tokens: LwwRegisterCrdt<f64>,
    /// Output-token count for this record.
    pub output_tokens: LwwRegisterCrdt<f64>,
    /// Unix timestamp (seconds) of this record.
    pub timestamp: LwwRegisterCrdt<f64>,
}
/// CRDT entry describing a merge job.
#[add_crdt_fields]
#[derive(Clone, CrdtNode, Debug)]
pub struct MergeJobCrdt {
    /// Unique key: the story being merged.
    pub story_id: LwwRegisterCrdt<String>,
    /// Status: `"pending"`, `"running"`, `"done"`, or `"failed"`.
    pub status: LwwRegisterCrdt<String>,
    /// Start time (Unix seconds, per this file's timestamp convention).
    pub started_at: LwwRegisterCrdt<f64>,
    /// Finish time (Unix seconds); meaningful once the job has ended.
    pub finished_at: LwwRegisterCrdt<f64>,
    /// Error message — presumably set for `"failed"` jobs; confirm at writer.
    pub error: LwwRegisterCrdt<String>,
}
/// CRDT entry for a currently-running agent instance.
#[add_crdt_fields]
#[derive(Clone, CrdtNode, Debug)]
pub struct ActiveAgentCrdt {
    /// Unique key (e.g. `"coder-1"`).
    pub agent_id: LwwRegisterCrdt<String>,
    /// Story the agent is working on.
    pub story_id: LwwRegisterCrdt<String>,
    /// Node the agent is running on.
    pub node_id: LwwRegisterCrdt<String>,
    /// Start time (Unix seconds, per this file's timestamp convention).
    pub started_at: LwwRegisterCrdt<f64>,
}
/// CRDT entry describing a test job.
#[add_crdt_fields]
#[derive(Clone, CrdtNode, Debug)]
pub struct TestJobCrdt {
    /// Unique key: the story under test.
    pub story_id: LwwRegisterCrdt<String>,
    /// Status: `"pending"`, `"running"`, `"pass"`, or `"fail"`.
    pub status: LwwRegisterCrdt<String>,
    /// Start time (Unix seconds, per this file's timestamp convention).
    pub started_at: LwwRegisterCrdt<f64>,
    /// Finish time (Unix seconds); meaningful once the job has ended.
    pub finished_at: LwwRegisterCrdt<f64>,
    /// Captured test output.
    pub output: LwwRegisterCrdt<String>,
}
/// CRDT entry holding per-node agent-throttle state.
#[add_crdt_fields]
#[derive(Clone, CrdtNode, Debug)]
pub struct AgentThrottleCrdt {
    /// Unique key: the node whose throttle this tracks.
    pub node_id: LwwRegisterCrdt<String>,
    /// Unix timestamp (seconds) of the throttle window start.
    pub window_start: LwwRegisterCrdt<f64>,
    /// Agent starts recorded in the current window.
    pub count: LwwRegisterCrdt<f64>,
    /// Maximum starts allowed per window.
    pub limit: LwwRegisterCrdt<f64>,
}
// ── LWW-map view types ───────────────────────────────────────────────
/// Snapshot of a single token-usage entry.
#[derive(Clone, Debug)]
pub struct TokenUsageView {
    /// Unique key (mirrors `TokenUsageCrdt::agent_id`).
    pub agent_id: String,
    /// Associated story; `None` presumably when the register was never set — confirm at reader.
    pub story_id: Option<String>,
    /// Input-token count for this record.
    pub input_tokens: f64,
    /// Output-token count for this record.
    pub output_tokens: f64,
    /// Unix timestamp (seconds).
    pub timestamp: f64,
}
/// Snapshot of a single merge-job entry.
#[derive(Clone, Debug)]
pub struct MergeJobView {
    /// Unique key (mirrors `MergeJobCrdt::story_id`).
    pub story_id: String,
    /// Status: `"pending"`, `"running"`, `"done"`, or `"failed"`.
    pub status: String,
    /// Start time (Unix seconds, per this file's timestamp convention).
    pub started_at: f64,
    /// Finish time; `None` presumably while the job is still running — confirm at reader.
    pub finished_at: Option<f64>,
    /// Error message, when one was recorded.
    pub error: Option<String>,
}
/// Snapshot of a single active-agent entry.
#[derive(Clone, Debug)]
pub struct ActiveAgentView {
    /// Unique key (mirrors `ActiveAgentCrdt::agent_id`).
    pub agent_id: String,
    /// Story the agent is working on, when recorded.
    pub story_id: Option<String>,
    /// Node the agent is running on, when recorded.
    pub node_id: Option<String>,
    /// Start time (Unix seconds, per this file's timestamp convention).
    pub started_at: f64,
}
/// Snapshot of a single test-job entry.
#[derive(Clone, Debug)]
pub struct TestJobView {
    /// Unique key (mirrors `TestJobCrdt::story_id`).
    pub story_id: String,
    /// Status: `"pending"`, `"running"`, `"pass"`, or `"fail"`.
    pub status: String,
    /// Start time (Unix seconds, per this file's timestamp convention).
    pub started_at: f64,
    /// Finish time; `None` presumably while the job is still running — confirm at reader.
    pub finished_at: Option<f64>,
    /// Captured test output, when recorded.
    pub output: Option<String>,
}
/// Snapshot of a single agent-throttle entry.
#[derive(Clone, Debug)]
pub struct AgentThrottleView {
    /// Unique key (mirrors `AgentThrottleCrdt::node_id`).
    pub node_id: String,
    /// Unix timestamp (seconds) of the throttle window start.
    pub window_start: f64,
    /// Agent starts recorded in the current window.
    pub count: f64,
    /// Maximum starts allowed per window.
    pub limit: f64,
}
#[cfg(test)]
mod tests {
use super::super::state::emit_event;