From 186cb38eeb9a5c2bb51f800073f5e057f8cee11c Mon Sep 17 00:00:00 2001 From: dave Date: Wed, 29 Apr 2026 11:45:33 +0000 Subject: [PATCH] huskies: merge 836 --- server/src/crdt_state/lww_maps.rs | 993 ------------------ .../src/crdt_state/lww_maps/active_agents.rs | 116 ++ .../src/crdt_state/lww_maps/agent_throttle.rs | 114 ++ .../crdt_state/lww_maps/gateway_projects.rs | 98 ++ server/src/crdt_state/lww_maps/merge_jobs.rs | 129 +++ server/src/crdt_state/lww_maps/mod.rs | 56 + server/src/crdt_state/lww_maps/test_jobs.rs | 129 +++ server/src/crdt_state/lww_maps/tests.rs | 297 ++++++ server/src/crdt_state/lww_maps/tokens.rs | 130 +++ 9 files changed, 1069 insertions(+), 993 deletions(-) delete mode 100644 server/src/crdt_state/lww_maps.rs create mode 100644 server/src/crdt_state/lww_maps/active_agents.rs create mode 100644 server/src/crdt_state/lww_maps/agent_throttle.rs create mode 100644 server/src/crdt_state/lww_maps/gateway_projects.rs create mode 100644 server/src/crdt_state/lww_maps/merge_jobs.rs create mode 100644 server/src/crdt_state/lww_maps/mod.rs create mode 100644 server/src/crdt_state/lww_maps/test_jobs.rs create mode 100644 server/src/crdt_state/lww_maps/tests.rs create mode 100644 server/src/crdt_state/lww_maps/tokens.rs diff --git a/server/src/crdt_state/lww_maps.rs b/server/src/crdt_state/lww_maps.rs deleted file mode 100644 index 474cdd50..00000000 --- a/server/src/crdt_state/lww_maps.rs +++ /dev/null @@ -1,993 +0,0 @@ -//! Read/write helpers for the five LWW-map CRDT collections: -//! `tokens`, `merge_jobs`, `active_agents`, `test_jobs`, and `agent_throttle`. -//! -//! Each collection is backed by a `ListCrdt` with a primary-key field. -//! A secondary index in [`super::state::CrdtState`] provides O(1) lookup by -//! key. The write functions follow the same insert-or-update pattern used by -//! [`super::presence`] for the `nodes` collection. - -#![allow(dead_code)] - -use bft_json_crdt::json_crdt::*; -use bft_json_crdt::list_crdt::ListCrdt; -use bft_json_crdt::op::{OpId, ROOT_ID}; -use serde_json::json; - -/// Return the `OpId` of the entry at iter-position `idx`, consistent with -/// `ListCrdt::iter()` and the `[]` index operator. -/// -/// The built-in `id_at` counts non-deleted entries including the root sentinel -/// (which has `content = None`), causing an off-by-one mismatch with -/// `iter().enumerate()`. This helper requires `content.is_some()` so that the -/// secondary-index position maps correctly to an OpId. -fn list_id_at(list: &ListCrdt, idx: usize) -> Option { - let mut i = 0; - for op in &list.ops { - if !op.is_deleted && op.content.is_some() { - if i == idx { - return Some(op.id); - } - i += 1; - } - } - None -} - -use super::state::{ - apply_and_persist, get_crdt, rebuild_active_agent_index, rebuild_agent_throttle_index, - rebuild_gateway_project_index, rebuild_merge_job_index, rebuild_test_job_index, - rebuild_token_index, -}; -use super::types::{ - ActiveAgentCrdt, ActiveAgentView, AgentThrottleCrdt, AgentThrottleView, GatewayProjectCrdt, - GatewayProjectView, MergeJobCrdt, MergeJobView, TestJobCrdt, TestJobView, TokenUsageCrdt, - TokenUsageView, -}; - -// ── tokens ─────────────────────────────────────────────────────────── - -/// Write or update a token-usage entry keyed by `agent_id`. -/// -/// If an entry for `agent_id` already exists it is updated in place; -/// otherwise a new entry is inserted. -pub fn write_token_usage( - agent_id: &str, - story_id: &str, - input_tokens: f64, - output_tokens: f64, - timestamp: f64, -) { - let Some(state_mutex) = get_crdt() else { - return; - }; - let Ok(mut state) = state_mutex.lock() else { - return; - }; - - if let Some(&idx) = state.token_index.get(agent_id) { - apply_and_persist(&mut state, |s| { - s.crdt.doc.tokens[idx].story_id.set(story_id.to_string()) - }); - apply_and_persist(&mut state, |s| { - s.crdt.doc.tokens[idx].input_tokens.set(input_tokens) - }); - apply_and_persist(&mut state, |s| { - s.crdt.doc.tokens[idx].output_tokens.set(output_tokens) - }); - apply_and_persist(&mut state, |s| { - s.crdt.doc.tokens[idx].timestamp.set(timestamp) - }); - } else { - let entry: JsonValue = json!({ - "agent_id": agent_id, - "story_id": story_id, - "input_tokens": input_tokens, - "output_tokens": output_tokens, - "timestamp": timestamp, - }) - .into(); - apply_and_persist(&mut state, |s| s.crdt.doc.tokens.insert(ROOT_ID, entry)); - state.token_index = rebuild_token_index(&state.crdt); - } -} - -/// Read all token-usage entries. -pub fn read_all_token_usage() -> Option> { - let state_mutex = get_crdt()?; - let state = state_mutex.lock().ok()?; - let mut out = Vec::new(); - for entry in state.crdt.doc.tokens.iter() { - if let Some(v) = extract_token_view(entry) { - out.push(v); - } - } - Some(out) -} - -/// Read a single token-usage entry by `agent_id`. -pub fn read_token_usage(agent_id: &str) -> Option { - let state_mutex = get_crdt()?; - let state = state_mutex.lock().ok()?; - let &idx = state.token_index.get(agent_id)?; - extract_token_view(&state.crdt.doc.tokens[idx]) -} - -/// Tombstone a token-usage entry by `agent_id`. -/// -/// Returns `true` if the entry existed and a delete op was issued. -pub fn delete_token_usage(agent_id: &str) -> bool { - let Some(state_mutex) = get_crdt() else { - return false; - }; - let Ok(mut state) = state_mutex.lock() else { - return false; - }; - let Some(&idx) = state.token_index.get(agent_id) else { - return false; - }; - let Some(op_id) = list_id_at(&state.crdt.doc.tokens, idx) else { - return false; - }; - apply_and_persist(&mut state, |s| s.crdt.doc.tokens.delete(op_id)); - state.token_index = rebuild_token_index(&state.crdt); - true -} - -fn extract_token_view(entry: &TokenUsageCrdt) -> Option { - let agent_id = match entry.agent_id.view() { - JsonValue::String(s) if !s.is_empty() => s, - _ => return None, - }; - let story_id = match entry.story_id.view() { - JsonValue::String(s) if !s.is_empty() => Some(s), - _ => None, - }; - let input_tokens = match entry.input_tokens.view() { - JsonValue::Number(n) => n, - _ => 0.0, - }; - let output_tokens = match entry.output_tokens.view() { - JsonValue::Number(n) => n, - _ => 0.0, - }; - let timestamp = match entry.timestamp.view() { - JsonValue::Number(n) => n, - _ => 0.0, - }; - Some(TokenUsageView { - agent_id, - story_id, - input_tokens, - output_tokens, - timestamp, - }) -} - -// ── merge_jobs ──────────────────────────────────────────────────────── - -/// Write or update a merge-job entry keyed by `story_id`. -pub fn write_merge_job( - story_id: &str, - status: &str, - started_at: f64, - finished_at: Option, - error: Option<&str>, -) { - let Some(state_mutex) = get_crdt() else { - return; - }; - let Ok(mut state) = state_mutex.lock() else { - return; - }; - - if let Some(&idx) = state.merge_job_index.get(story_id) { - apply_and_persist(&mut state, |s| { - s.crdt.doc.merge_jobs[idx].status.set(status.to_string()) - }); - apply_and_persist(&mut state, |s| { - s.crdt.doc.merge_jobs[idx].started_at.set(started_at) - }); - if let Some(fa) = finished_at { - apply_and_persist(&mut state, |s| { - s.crdt.doc.merge_jobs[idx].finished_at.set(fa) - }); - } - if let Some(e) = error { - apply_and_persist(&mut state, |s| { - s.crdt.doc.merge_jobs[idx].error.set(e.to_string()) - }); - } - } else { - let entry: JsonValue = json!({ - "story_id": story_id, - "status": status, - "started_at": started_at, - "finished_at": finished_at.unwrap_or(0.0), - "error": error.unwrap_or(""), - }) - .into(); - apply_and_persist(&mut state, |s| s.crdt.doc.merge_jobs.insert(ROOT_ID, entry)); - state.merge_job_index = rebuild_merge_job_index(&state.crdt); - } -} - -/// Read all merge-job entries. -pub fn read_all_merge_jobs() -> Option> { - let state_mutex = get_crdt()?; - let state = state_mutex.lock().ok()?; - let mut out = Vec::new(); - for entry in state.crdt.doc.merge_jobs.iter() { - if let Some(v) = extract_merge_job_view(entry) { - out.push(v); - } - } - Some(out) -} - -/// Read a single merge-job entry by `story_id`. -pub fn read_merge_job(story_id: &str) -> Option { - let state_mutex = get_crdt()?; - let state = state_mutex.lock().ok()?; - let &idx = state.merge_job_index.get(story_id)?; - extract_merge_job_view(&state.crdt.doc.merge_jobs[idx]) -} - -/// Tombstone a merge-job entry by `story_id`. -pub fn delete_merge_job(story_id: &str) -> bool { - let Some(state_mutex) = get_crdt() else { - return false; - }; - let Ok(mut state) = state_mutex.lock() else { - return false; - }; - let Some(&idx) = state.merge_job_index.get(story_id) else { - return false; - }; - let Some(op_id) = list_id_at(&state.crdt.doc.merge_jobs, idx) else { - return false; - }; - apply_and_persist(&mut state, |s| s.crdt.doc.merge_jobs.delete(op_id)); - state.merge_job_index = rebuild_merge_job_index(&state.crdt); - true -} - -fn extract_merge_job_view(entry: &MergeJobCrdt) -> Option { - let story_id = match entry.story_id.view() { - JsonValue::String(s) if !s.is_empty() => s, - _ => return None, - }; - let status = match entry.status.view() { - JsonValue::String(s) => s, - _ => String::new(), - }; - let started_at = match entry.started_at.view() { - JsonValue::Number(n) => n, - _ => 0.0, - }; - let finished_at = match entry.finished_at.view() { - JsonValue::Number(n) if n > 0.0 => Some(n), - _ => None, - }; - let error = match entry.error.view() { - JsonValue::String(s) if !s.is_empty() => Some(s), - _ => None, - }; - Some(MergeJobView { - story_id, - status, - started_at, - finished_at, - error, - }) -} - -// ── active_agents ───────────────────────────────────────────────────── - -/// Write or update an active-agent entry keyed by `agent_id`. -pub fn write_active_agent(agent_id: &str, story_id: &str, node_id: &str, started_at: f64) { - let Some(state_mutex) = get_crdt() else { - return; - }; - let Ok(mut state) = state_mutex.lock() else { - return; - }; - - if let Some(&idx) = state.active_agent_index.get(agent_id) { - apply_and_persist(&mut state, |s| { - s.crdt.doc.active_agents[idx] - .story_id - .set(story_id.to_string()) - }); - apply_and_persist(&mut state, |s| { - s.crdt.doc.active_agents[idx] - .node_id - .set(node_id.to_string()) - }); - apply_and_persist(&mut state, |s| { - s.crdt.doc.active_agents[idx].started_at.set(started_at) - }); - } else { - let entry: JsonValue = json!({ - "agent_id": agent_id, - "story_id": story_id, - "node_id": node_id, - "started_at": started_at, - }) - .into(); - apply_and_persist(&mut state, |s| { - s.crdt.doc.active_agents.insert(ROOT_ID, entry) - }); - state.active_agent_index = rebuild_active_agent_index(&state.crdt); - } -} - -/// Read all active-agent entries. -pub fn read_all_active_agents() -> Option> { - let state_mutex = get_crdt()?; - let state = state_mutex.lock().ok()?; - let mut out = Vec::new(); - for entry in state.crdt.doc.active_agents.iter() { - if let Some(v) = extract_active_agent_view(entry) { - out.push(v); - } - } - Some(out) -} - -/// Read a single active-agent entry by `agent_id`. -pub fn read_active_agent(agent_id: &str) -> Option { - let state_mutex = get_crdt()?; - let state = state_mutex.lock().ok()?; - let &idx = state.active_agent_index.get(agent_id)?; - extract_active_agent_view(&state.crdt.doc.active_agents[idx]) -} - -/// Tombstone an active-agent entry by `agent_id`. -pub fn delete_active_agent(agent_id: &str) -> bool { - let Some(state_mutex) = get_crdt() else { - return false; - }; - let Ok(mut state) = state_mutex.lock() else { - return false; - }; - let Some(&idx) = state.active_agent_index.get(agent_id) else { - return false; - }; - let Some(op_id) = list_id_at(&state.crdt.doc.active_agents, idx) else { - return false; - }; - apply_and_persist(&mut state, |s| s.crdt.doc.active_agents.delete(op_id)); - state.active_agent_index = rebuild_active_agent_index(&state.crdt); - true -} - -fn extract_active_agent_view(entry: &ActiveAgentCrdt) -> Option { - let agent_id = match entry.agent_id.view() { - JsonValue::String(s) if !s.is_empty() => s, - _ => return None, - }; - let story_id = match entry.story_id.view() { - JsonValue::String(s) if !s.is_empty() => Some(s), - _ => None, - }; - let node_id = match entry.node_id.view() { - JsonValue::String(s) if !s.is_empty() => Some(s), - _ => None, - }; - let started_at = match entry.started_at.view() { - JsonValue::Number(n) => n, - _ => 0.0, - }; - Some(ActiveAgentView { - agent_id, - story_id, - node_id, - started_at, - }) -} - -// ── test_jobs ───────────────────────────────────────────────────────── - -/// Write or update a test-job entry keyed by `story_id`. -pub fn write_test_job( - story_id: &str, - status: &str, - started_at: f64, - finished_at: Option, - output: Option<&str>, -) { - let Some(state_mutex) = get_crdt() else { - return; - }; - let Ok(mut state) = state_mutex.lock() else { - return; - }; - - if let Some(&idx) = state.test_job_index.get(story_id) { - apply_and_persist(&mut state, |s| { - s.crdt.doc.test_jobs[idx].status.set(status.to_string()) - }); - apply_and_persist(&mut state, |s| { - s.crdt.doc.test_jobs[idx].started_at.set(started_at) - }); - if let Some(fa) = finished_at { - apply_and_persist(&mut state, |s| { - s.crdt.doc.test_jobs[idx].finished_at.set(fa) - }); - } - if let Some(o) = output { - apply_and_persist(&mut state, |s| { - s.crdt.doc.test_jobs[idx].output.set(o.to_string()) - }); - } - } else { - let entry: JsonValue = json!({ - "story_id": story_id, - "status": status, - "started_at": started_at, - "finished_at": finished_at.unwrap_or(0.0), - "output": output.unwrap_or(""), - }) - .into(); - apply_and_persist(&mut state, |s| s.crdt.doc.test_jobs.insert(ROOT_ID, entry)); - state.test_job_index = rebuild_test_job_index(&state.crdt); - } -} - -/// Read all test-job entries. -pub fn read_all_test_jobs() -> Option> { - let state_mutex = get_crdt()?; - let state = state_mutex.lock().ok()?; - let mut out = Vec::new(); - for entry in state.crdt.doc.test_jobs.iter() { - if let Some(v) = extract_test_job_view(entry) { - out.push(v); - } - } - Some(out) -} - -/// Read a single test-job entry by `story_id`. -pub fn read_test_job(story_id: &str) -> Option { - let state_mutex = get_crdt()?; - let state = state_mutex.lock().ok()?; - let &idx = state.test_job_index.get(story_id)?; - extract_test_job_view(&state.crdt.doc.test_jobs[idx]) -} - -/// Tombstone a test-job entry by `story_id`. -pub fn delete_test_job(story_id: &str) -> bool { - let Some(state_mutex) = get_crdt() else { - return false; - }; - let Ok(mut state) = state_mutex.lock() else { - return false; - }; - let Some(&idx) = state.test_job_index.get(story_id) else { - return false; - }; - let Some(op_id) = list_id_at(&state.crdt.doc.test_jobs, idx) else { - return false; - }; - apply_and_persist(&mut state, |s| s.crdt.doc.test_jobs.delete(op_id)); - state.test_job_index = rebuild_test_job_index(&state.crdt); - true -} - -fn extract_test_job_view(entry: &TestJobCrdt) -> Option { - let story_id = match entry.story_id.view() { - JsonValue::String(s) if !s.is_empty() => s, - _ => return None, - }; - let status = match entry.status.view() { - JsonValue::String(s) => s, - _ => String::new(), - }; - let started_at = match entry.started_at.view() { - JsonValue::Number(n) => n, - _ => 0.0, - }; - let finished_at = match entry.finished_at.view() { - JsonValue::Number(n) if n > 0.0 => Some(n), - _ => None, - }; - let output = match entry.output.view() { - JsonValue::String(s) if !s.is_empty() => Some(s), - _ => None, - }; - Some(TestJobView { - story_id, - status, - started_at, - finished_at, - output, - }) -} - -// ── agent_throttle ──────────────────────────────────────────────────── - -/// Write or update an agent-throttle entry keyed by `node_id`. -pub fn write_agent_throttle(node_id: &str, window_start: f64, count: f64, limit: f64) { - let Some(state_mutex) = get_crdt() else { - return; - }; - let Ok(mut state) = state_mutex.lock() else { - return; - }; - - if let Some(&idx) = state.agent_throttle_index.get(node_id) { - apply_and_persist(&mut state, |s| { - s.crdt.doc.agent_throttle[idx] - .window_start - .set(window_start) - }); - apply_and_persist(&mut state, |s| { - s.crdt.doc.agent_throttle[idx].count.set(count) - }); - apply_and_persist(&mut state, |s| { - s.crdt.doc.agent_throttle[idx].limit.set(limit) - }); - } else { - let entry: JsonValue = json!({ - "node_id": node_id, - "window_start": window_start, - "count": count, - "limit": limit, - }) - .into(); - apply_and_persist(&mut state, |s| { - s.crdt.doc.agent_throttle.insert(ROOT_ID, entry) - }); - state.agent_throttle_index = rebuild_agent_throttle_index(&state.crdt); - } -} - -/// Read all agent-throttle entries. -pub fn read_all_agent_throttles() -> Option> { - let state_mutex = get_crdt()?; - let state = state_mutex.lock().ok()?; - let mut out = Vec::new(); - for entry in state.crdt.doc.agent_throttle.iter() { - if let Some(v) = extract_agent_throttle_view(entry) { - out.push(v); - } - } - Some(out) -} - -/// Read a single agent-throttle entry by `node_id`. -pub fn read_agent_throttle(node_id: &str) -> Option { - let state_mutex = get_crdt()?; - let state = state_mutex.lock().ok()?; - let &idx = state.agent_throttle_index.get(node_id)?; - extract_agent_throttle_view(&state.crdt.doc.agent_throttle[idx]) -} - -/// Tombstone an agent-throttle entry by `node_id`. -pub fn delete_agent_throttle(node_id: &str) -> bool { - let Some(state_mutex) = get_crdt() else { - return false; - }; - let Ok(mut state) = state_mutex.lock() else { - return false; - }; - let Some(&idx) = state.agent_throttle_index.get(node_id) else { - return false; - }; - let Some(op_id) = list_id_at(&state.crdt.doc.agent_throttle, idx) else { - return false; - }; - apply_and_persist(&mut state, |s| s.crdt.doc.agent_throttle.delete(op_id)); - state.agent_throttle_index = rebuild_agent_throttle_index(&state.crdt); - true -} - -fn extract_agent_throttle_view(entry: &AgentThrottleCrdt) -> Option { - let node_id = match entry.node_id.view() { - JsonValue::String(s) if !s.is_empty() => s, - _ => return None, - }; - let window_start = match entry.window_start.view() { - JsonValue::Number(n) => n, - _ => 0.0, - }; - let count = match entry.count.view() { - JsonValue::Number(n) => n, - _ => 0.0, - }; - let limit = match entry.limit.view() { - JsonValue::Number(n) => n, - _ => 0.0, - }; - Some(AgentThrottleView { - node_id, - window_start, - count, - limit, - }) -} - -// ── gateway_projects ────────────────────────────────────────────────── - -/// Write or update a gateway-project entry keyed by `name`. -/// -/// If an entry for `name` already exists it is updated in place; -/// otherwise a new entry is inserted. -pub fn write_gateway_project(name: &str, url: &str) { - let Some(state_mutex) = get_crdt() else { - return; - }; - let Ok(mut state) = state_mutex.lock() else { - return; - }; - - if let Some(&idx) = state.gateway_project_index.get(name) { - apply_and_persist(&mut state, |s| { - s.crdt.doc.gateway_projects[idx].url.set(url.to_string()) - }); - } else { - let entry: JsonValue = json!({ - "name": name, - "url": url, - }) - .into(); - apply_and_persist(&mut state, |s| { - s.crdt.doc.gateway_projects.insert(ROOT_ID, entry) - }); - state.gateway_project_index = rebuild_gateway_project_index(&state.crdt); - } -} - -/// Read all gateway-project entries. -pub fn read_all_gateway_projects() -> Option> { - let state_mutex = get_crdt()?; - let state = state_mutex.lock().ok()?; - let mut out = Vec::new(); - for entry in state.crdt.doc.gateway_projects.iter() { - if let Some(v) = extract_gateway_project_view(entry) { - out.push(v); - } - } - Some(out) -} - -/// Read a single gateway-project entry by `name`. -pub fn read_gateway_project(name: &str) -> Option { - let state_mutex = get_crdt()?; - let state = state_mutex.lock().ok()?; - let &idx = state.gateway_project_index.get(name)?; - extract_gateway_project_view(&state.crdt.doc.gateway_projects[idx]) -} - -/// Tombstone a gateway-project entry by `name`. -/// -/// Returns `true` if the entry existed and a delete op was issued. -pub fn delete_gateway_project(name: &str) -> bool { - let Some(state_mutex) = get_crdt() else { - return false; - }; - let Ok(mut state) = state_mutex.lock() else { - return false; - }; - let Some(&idx) = state.gateway_project_index.get(name) else { - return false; - }; - let Some(op_id) = list_id_at(&state.crdt.doc.gateway_projects, idx) else { - return false; - }; - apply_and_persist(&mut state, |s| s.crdt.doc.gateway_projects.delete(op_id)); - state.gateway_project_index = rebuild_gateway_project_index(&state.crdt); - true -} - -fn extract_gateway_project_view(entry: &GatewayProjectCrdt) -> Option { - let name = match entry.name.view() { - JsonValue::String(s) if !s.is_empty() => s, - _ => return None, - }; - let url = match entry.url.view() { - JsonValue::String(s) => s, - _ => String::new(), - }; - Some(GatewayProjectView { name, url }) -} - -// ── Tests ───────────────────────────────────────────────────────────── -// -// The `tokens` collection is used as the representative collection for the -// required CRDT semantics tests (insert, update, delete-via-tombstone, and -// concurrent write semantics). - -#[cfg(test)] -mod tests { - use super::super::state::init_for_test; - use super::*; - use bft_json_crdt::json_crdt::{BaseCrdt, JsonValue, OpState}; - use bft_json_crdt::keypair::make_keypair; - use bft_json_crdt::op::ROOT_ID; - - use super::super::types::PipelineDoc; - - // ── insert ──────────────────────────────────────────────────────── - - #[test] - fn token_insert_is_visible_via_read() { - init_for_test(); - write_token_usage("coder-1:42", "42", 100.0, 200.0, 1_000_000.0); - - let view = read_token_usage("coder-1:42").expect("entry must exist after insert"); - assert_eq!(view.agent_id, "coder-1:42"); - assert_eq!(view.story_id.as_deref(), Some("42")); - assert!((view.input_tokens - 100.0).abs() < f64::EPSILON); - assert!((view.output_tokens - 200.0).abs() < f64::EPSILON); - assert!((view.timestamp - 1_000_000.0).abs() < f64::EPSILON); - } - - #[test] - fn token_read_all_returns_inserted_entries() { - init_for_test(); - write_token_usage("coder-a:10", "10", 10.0, 20.0, 1.0); - write_token_usage("coder-b:10", "10", 30.0, 40.0, 2.0); - - let all = read_all_token_usage().unwrap_or_default(); - let ids: Vec<&str> = all.iter().map(|v| v.agent_id.as_str()).collect(); - assert!( - ids.contains(&"coder-a:10"), - "coder-a:10 must be in read_all" - ); - assert!( - ids.contains(&"coder-b:10"), - "coder-b:10 must be in read_all" - ); - } - - // ── update ──────────────────────────────────────────────────────── - - #[test] - fn token_update_overwrites_fields() { - init_for_test(); - write_token_usage("coder-2:55", "55", 50.0, 60.0, 2_000_000.0); - // Update with new token counts. - write_token_usage("coder-2:55", "55", 500.0, 600.0, 3_000_000.0); - - let view = read_token_usage("coder-2:55").expect("entry must exist after update"); - assert!((view.input_tokens - 500.0).abs() < f64::EPSILON); - assert!((view.output_tokens - 600.0).abs() < f64::EPSILON); - assert!((view.timestamp - 3_000_000.0).abs() < f64::EPSILON); - } - - // ── delete-via-tombstone ────────────────────────────────────────── - - #[test] - fn token_delete_removes_entry_from_read() { - init_for_test(); - write_token_usage("coder-3:77", "77", 1.0, 2.0, 9_999.0); - assert!( - read_token_usage("coder-3:77").is_some(), - "entry must exist before delete" - ); - - let deleted = delete_token_usage("coder-3:77"); - assert!(deleted, "delete must return true for a known entry"); - - assert!( - read_token_usage("coder-3:77").is_none(), - "entry must be absent after tombstone" - ); - } - - #[test] - fn token_delete_nonexistent_returns_false() { - init_for_test(); - assert!(!delete_token_usage("no-such-agent")); - } - - #[test] - fn token_delete_not_returned_by_read_all() { - init_for_test(); - write_token_usage("coder-4:88", "88", 5.0, 10.0, 1.0); - delete_token_usage("coder-4:88"); - - let all = read_all_token_usage().unwrap_or_default(); - assert!( - !all.iter().any(|v| v.agent_id == "coder-4:88"), - "deleted entry must not appear in read_all" - ); - } - - // ── concurrent write semantics (LWW convergence) ────────────────── - - #[test] - fn token_concurrent_writes_converge_via_lww() { - // Two independent CRDTs simulate two nodes writing concurrently. - let kp_a = make_keypair(); - let kp_b = make_keypair(); - let mut crdt_a = BaseCrdt::::new(&kp_a); - let mut crdt_b = BaseCrdt::::new(&kp_b); - - // Node A inserts a token entry. - let entry_a: JsonValue = json!({ - "agent_id": "coder-x:99", - "story_id": "99", - "input_tokens": 10.0, - "output_tokens": 20.0, - "timestamp": 1.0, - }) - .into(); - let insert_a = crdt_a.doc.tokens.insert(ROOT_ID, entry_a).sign(&kp_a); - assert_eq!(crdt_a.apply(insert_a.clone()), OpState::Ok); - - // Node B inserts the same key with different values. - let entry_b: JsonValue = json!({ - "agent_id": "coder-x:99", - "story_id": "99", - "input_tokens": 999.0, - "output_tokens": 888.0, - "timestamp": 1.0, - }) - .into(); - let insert_b = crdt_b.doc.tokens.insert(ROOT_ID, entry_b).sign(&kp_b); - assert_eq!(crdt_b.apply(insert_b.clone()), OpState::Ok); - - // Both nodes update input_tokens concurrently. - let update_a = crdt_a.doc.tokens[0].input_tokens.set(111.0).sign(&kp_a); - let update_b = crdt_b.doc.tokens[0].input_tokens.set(222.0).sign(&kp_b); - - assert_eq!(crdt_a.apply(update_a.clone()), OpState::Ok); - assert_eq!(crdt_b.apply(update_b.clone()), OpState::Ok); - - // Cross-apply: A gets B's ops, B gets A's ops. - assert_eq!(crdt_a.apply(insert_b), OpState::Ok); - assert_eq!(crdt_a.apply(update_b), OpState::Ok); - assert_eq!(crdt_b.apply(insert_a), OpState::Ok); - assert_eq!(crdt_b.apply(update_a), OpState::Ok); - - // Both CRDTs must converge to the same view — compare field by field. - assert_eq!( - crdt_a.doc.tokens.view().len(), - crdt_b.doc.tokens.view().len(), - "both peers must have the same number of token entries" - ); - assert_eq!( - crdt_a.doc.tokens[0].input_tokens.view(), - crdt_b.doc.tokens[0].input_tokens.view(), - "concurrent writes to input_tokens must converge to the same value" - ); - assert_eq!( - crdt_a.doc.tokens[0].output_tokens.view(), - crdt_b.doc.tokens[0].output_tokens.view(), - "concurrent writes must converge for output_tokens" - ); - } - - // ── smoke tests for the other four collections ──────────────────── - - #[test] - fn merge_job_insert_update_delete() { - init_for_test(); - write_merge_job("100", "pending", 1.0, None, None); - let v = read_merge_job("100").expect("merge job must exist"); - assert_eq!(v.status, "pending"); - - write_merge_job("100", "done", 1.0, Some(2.0), None); - let v2 = read_merge_job("100").expect("merge job must exist after update"); - assert_eq!(v2.status, "done"); - assert_eq!(v2.finished_at, Some(2.0)); - - assert!(delete_merge_job("100")); - assert!(read_merge_job("100").is_none()); - } - - #[test] - fn active_agent_insert_update_delete() { - init_for_test(); - write_active_agent("coder-5", "200", "node-abc", 5.0); - let v = read_active_agent("coder-5").expect("active agent must exist"); - assert_eq!(v.story_id.as_deref(), Some("200")); - - write_active_agent("coder-5", "201", "node-abc", 6.0); - let v2 = read_active_agent("coder-5").expect("active agent must exist after update"); - assert_eq!(v2.story_id.as_deref(), Some("201")); - - assert!(delete_active_agent("coder-5")); - assert!(read_active_agent("coder-5").is_none()); - } - - #[test] - fn test_job_insert_update_delete() { - init_for_test(); - write_test_job("300", "running", 7.0, None, None); - let v = read_test_job("300").expect("test job must exist"); - assert_eq!(v.status, "running"); - - write_test_job("300", "pass", 7.0, Some(8.0), Some("all green")); - let v2 = read_test_job("300").expect("test job must exist after update"); - assert_eq!(v2.status, "pass"); - assert_eq!(v2.output.as_deref(), Some("all green")); - - assert!(delete_test_job("300")); - assert!(read_test_job("300").is_none()); - } - - #[test] - fn agent_throttle_insert_update_delete() { - init_for_test(); - write_agent_throttle("node-z", 1000.0, 2.0, 5.0); - let v = read_agent_throttle("node-z").expect("throttle must exist"); - assert!((v.count - 2.0).abs() < f64::EPSILON); - assert!((v.limit - 5.0).abs() < f64::EPSILON); - - write_agent_throttle("node-z", 1000.0, 4.0, 5.0); - let v2 = read_agent_throttle("node-z").expect("throttle must exist after update"); - assert!((v2.count - 4.0).abs() < f64::EPSILON); - - assert!(delete_agent_throttle("node-z")); - assert!(read_agent_throttle("node-z").is_none()); - } - - // ── merge_jobs: concurrent-write LWW resolution ─────────────────── - - #[test] - fn merge_job_concurrent_writes_converge_via_lww() { - // Two independent CRDTs simulate two nodes writing concurrently. - let kp_a = make_keypair(); - let kp_b = make_keypair(); - let mut crdt_a = BaseCrdt::::new(&kp_a); - let mut crdt_b = BaseCrdt::::new(&kp_b); - - // Node A inserts a merge-job entry. - let entry_a: JsonValue = json!({ - "story_id": "500_story_concurrent", - "status": "pending", - "started_at": 1.0, - "finished_at": 0.0, - "error": "", - }) - .into(); - let insert_a = crdt_a.doc.merge_jobs.insert(ROOT_ID, entry_a).sign(&kp_a); - assert_eq!(crdt_a.apply(insert_a.clone()), OpState::Ok); - - // Node B inserts the same story_id with a different status concurrently. - let entry_b: JsonValue = json!({ - "story_id": "500_story_concurrent", - "status": "running", - "started_at": 1.0, - "finished_at": 0.0, - "error": "", - }) - .into(); - let insert_b = crdt_b.doc.merge_jobs.insert(ROOT_ID, entry_b).sign(&kp_b); - assert_eq!(crdt_b.apply(insert_b.clone()), OpState::Ok); - - // Both nodes concurrently update the status field. - let update_a = crdt_a.doc.merge_jobs[0] - .status - .set("done".to_string()) - .sign(&kp_a); - let update_b = crdt_b.doc.merge_jobs[0] - .status - .set("failed".to_string()) - .sign(&kp_b); - - assert_eq!(crdt_a.apply(update_a.clone()), OpState::Ok); - assert_eq!(crdt_b.apply(update_b.clone()), OpState::Ok); - - // Cross-apply: A gets B's ops, B gets A's ops. - assert_eq!(crdt_a.apply(insert_b), OpState::Ok); - assert_eq!(crdt_a.apply(update_b), OpState::Ok); - assert_eq!(crdt_b.apply(insert_a), OpState::Ok); - assert_eq!(crdt_b.apply(update_a), OpState::Ok); - - // Both CRDTs must converge to the same view. - assert_eq!( - crdt_a.doc.merge_jobs.view().len(), - crdt_b.doc.merge_jobs.view().len(), - "both peers must have the same number of merge_job entries" - ); - assert_eq!( - crdt_a.doc.merge_jobs[0].status.view(), - crdt_b.doc.merge_jobs[0].status.view(), - "concurrent writes to status must converge to the same value via LWW" - ); - } -} diff --git a/server/src/crdt_state/lww_maps/active_agents.rs b/server/src/crdt_state/lww_maps/active_agents.rs new file mode 100644 index 00000000..bbc463af --- /dev/null +++ b/server/src/crdt_state/lww_maps/active_agents.rs @@ -0,0 +1,116 @@ +//! Read/write helpers for the `active_agents` LWW-map collection. +//! +//! Active-agent entries are keyed by `agent_id` and track which story and node +//! an agent is currently working on, along with when it started. + +use bft_json_crdt::json_crdt::{JsonValue, *}; +use bft_json_crdt::op::ROOT_ID; +use serde_json::json; + +use super::super::state::{apply_and_persist, get_crdt, rebuild_active_agent_index}; +use super::super::types::{ActiveAgentCrdt, ActiveAgentView}; +use super::list_id_at; + +/// Write or update an active-agent entry keyed by `agent_id`. +pub fn write_active_agent(agent_id: &str, story_id: &str, node_id: &str, started_at: f64) { + let Some(state_mutex) = get_crdt() else { + return; + }; + let Ok(mut state) = state_mutex.lock() else { + return; + }; + + if let Some(&idx) = state.active_agent_index.get(agent_id) { + apply_and_persist(&mut state, |s| { + s.crdt.doc.active_agents[idx] + .story_id + .set(story_id.to_string()) + }); + apply_and_persist(&mut state, |s| { + s.crdt.doc.active_agents[idx] + .node_id + .set(node_id.to_string()) + }); + apply_and_persist(&mut state, |s| { + s.crdt.doc.active_agents[idx].started_at.set(started_at) + }); + } else { + let entry: JsonValue = json!({ + "agent_id": agent_id, + "story_id": story_id, + "node_id": node_id, + "started_at": started_at, + }) + .into(); + apply_and_persist(&mut state, |s| { + s.crdt.doc.active_agents.insert(ROOT_ID, entry) + }); + state.active_agent_index = rebuild_active_agent_index(&state.crdt); + } +} + +/// Read all active-agent entries. +pub fn read_all_active_agents() -> Option> { + let state_mutex = get_crdt()?; + let state = state_mutex.lock().ok()?; + let mut out = Vec::new(); + for entry in state.crdt.doc.active_agents.iter() { + if let Some(v) = extract_active_agent_view(entry) { + out.push(v); + } + } + Some(out) +} + +/// Read a single active-agent entry by `agent_id`. +pub fn read_active_agent(agent_id: &str) -> Option { + let state_mutex = get_crdt()?; + let state = state_mutex.lock().ok()?; + let &idx = state.active_agent_index.get(agent_id)?; + extract_active_agent_view(&state.crdt.doc.active_agents[idx]) +} + +/// Tombstone an active-agent entry by `agent_id`. +pub fn delete_active_agent(agent_id: &str) -> bool { + let Some(state_mutex) = get_crdt() else { + return false; + }; + let Ok(mut state) = state_mutex.lock() else { + return false; + }; + let Some(&idx) = state.active_agent_index.get(agent_id) else { + return false; + }; + let Some(op_id) = list_id_at(&state.crdt.doc.active_agents, idx) else { + return false; + }; + apply_and_persist(&mut state, |s| s.crdt.doc.active_agents.delete(op_id)); + state.active_agent_index = rebuild_active_agent_index(&state.crdt); + true +} + +/// Convert a CRDT active-agent entry into its read-only view representation. +pub(super) fn extract_active_agent_view(entry: &ActiveAgentCrdt) -> Option { + let agent_id = match entry.agent_id.view() { + JsonValue::String(s) if !s.is_empty() => s, + _ => return None, + }; + let story_id = match entry.story_id.view() { + JsonValue::String(s) if !s.is_empty() => Some(s), + _ => None, + }; + let node_id = match entry.node_id.view() { + JsonValue::String(s) if !s.is_empty() => Some(s), + _ => None, + }; + let started_at = match entry.started_at.view() { + JsonValue::Number(n) => n, + _ => 0.0, + }; + Some(ActiveAgentView { + agent_id, + story_id, + node_id, + started_at, + }) +} diff --git a/server/src/crdt_state/lww_maps/agent_throttle.rs b/server/src/crdt_state/lww_maps/agent_throttle.rs new file mode 100644 index 00000000..bff80d1d --- /dev/null +++ b/server/src/crdt_state/lww_maps/agent_throttle.rs @@ -0,0 +1,114 @@ +//! Read/write helpers for the `agent_throttle` LWW-map collection. +//! +//! Agent-throttle entries are keyed by `node_id` and track per-node sliding- +//! window launch counts used to enforce agent concurrency limits. + +use bft_json_crdt::json_crdt::{JsonValue, *}; +use bft_json_crdt::op::ROOT_ID; +use serde_json::json; + +use super::super::state::{apply_and_persist, get_crdt, rebuild_agent_throttle_index}; +use super::super::types::{AgentThrottleCrdt, AgentThrottleView}; +use super::list_id_at; + +/// Write or update an agent-throttle entry keyed by `node_id`. +pub fn write_agent_throttle(node_id: &str, window_start: f64, count: f64, limit: f64) { + let Some(state_mutex) = get_crdt() else { + return; + }; + let Ok(mut state) = state_mutex.lock() else { + return; + }; + + if let Some(&idx) = state.agent_throttle_index.get(node_id) { + apply_and_persist(&mut state, |s| { + s.crdt.doc.agent_throttle[idx] + .window_start + .set(window_start) + }); + apply_and_persist(&mut state, |s| { + s.crdt.doc.agent_throttle[idx].count.set(count) + }); + apply_and_persist(&mut state, |s| { + s.crdt.doc.agent_throttle[idx].limit.set(limit) + }); + } else { + let entry: JsonValue = json!({ + "node_id": node_id, + "window_start": window_start, + "count": count, + "limit": limit, + }) + .into(); + apply_and_persist(&mut state, |s| { + s.crdt.doc.agent_throttle.insert(ROOT_ID, entry) + }); + state.agent_throttle_index = rebuild_agent_throttle_index(&state.crdt); + } +} + +/// Read all agent-throttle entries. +pub fn read_all_agent_throttles() -> Option> { + let state_mutex = get_crdt()?; + let state = state_mutex.lock().ok()?; + let mut out = Vec::new(); + for entry in state.crdt.doc.agent_throttle.iter() { + if let Some(v) = extract_agent_throttle_view(entry) { + out.push(v); + } + } + Some(out) +} + +/// Read a single agent-throttle entry by `node_id`. +pub fn read_agent_throttle(node_id: &str) -> Option { + let state_mutex = get_crdt()?; + let state = state_mutex.lock().ok()?; + let &idx = state.agent_throttle_index.get(node_id)?; + extract_agent_throttle_view(&state.crdt.doc.agent_throttle[idx]) +} + +/// Tombstone an agent-throttle entry by `node_id`. +pub fn delete_agent_throttle(node_id: &str) -> bool { + let Some(state_mutex) = get_crdt() else { + return false; + }; + let Ok(mut state) = state_mutex.lock() else { + return false; + }; + let Some(&idx) = state.agent_throttle_index.get(node_id) else { + return false; + }; + let Some(op_id) = list_id_at(&state.crdt.doc.agent_throttle, idx) else { + return false; + }; + apply_and_persist(&mut state, |s| s.crdt.doc.agent_throttle.delete(op_id)); + state.agent_throttle_index = rebuild_agent_throttle_index(&state.crdt); + true +} + +/// Convert a CRDT agent-throttle entry into its read-only view representation. +pub(super) fn extract_agent_throttle_view(entry: &AgentThrottleCrdt) -> Option { + let node_id = match entry.node_id.view() { + JsonValue::String(s) if !s.is_empty() => s, + _ => return None, + }; + let window_start = match entry.window_start.view() { + JsonValue::Number(n) => n, + _ => 0.0, + }; + let count = match entry.count.view() { + JsonValue::Number(n) => n, + _ => 0.0, + }; + let limit = match entry.limit.view() { + JsonValue::Number(n) => n, + _ => 0.0, + }; + Some(AgentThrottleView { + node_id, + window_start, + count, + limit, + }) +} diff --git a/server/src/crdt_state/lww_maps/gateway_projects.rs b/server/src/crdt_state/lww_maps/gateway_projects.rs new file mode 100644 index 00000000..4af647d4 --- /dev/null +++ b/server/src/crdt_state/lww_maps/gateway_projects.rs @@ -0,0 +1,98 @@ +//! Read/write helpers for the `gateway_projects` LWW-map collection. +//! +//! Gateway-project entries are keyed by `name` and store the URL of each +//! project container registered with the gateway. + +use bft_json_crdt::json_crdt::{JsonValue, *}; +use bft_json_crdt::op::ROOT_ID; +use serde_json::json; + +use super::super::state::{apply_and_persist, get_crdt, rebuild_gateway_project_index}; +use super::super::types::{GatewayProjectCrdt, GatewayProjectView}; +use super::list_id_at; + +/// Write or update a gateway-project entry keyed by `name`. +/// +/// If an entry for `name` already exists it is updated in place; +/// otherwise a new entry is inserted. +pub fn write_gateway_project(name: &str, url: &str) { + let Some(state_mutex) = get_crdt() else { + return; + }; + let Ok(mut state) = state_mutex.lock() else { + return; + }; + + if let Some(&idx) = state.gateway_project_index.get(name) { + apply_and_persist(&mut state, |s| { + s.crdt.doc.gateway_projects[idx].url.set(url.to_string()) + }); + } else { + let entry: JsonValue = json!({ + "name": name, + "url": url, + }) + .into(); + apply_and_persist(&mut state, |s| { + s.crdt.doc.gateway_projects.insert(ROOT_ID, entry) + }); + state.gateway_project_index = rebuild_gateway_project_index(&state.crdt); + } +} + +/// Read all gateway-project entries. +pub fn read_all_gateway_projects() -> Option> { + let state_mutex = get_crdt()?; + let state = state_mutex.lock().ok()?; + let mut out = Vec::new(); + for entry in state.crdt.doc.gateway_projects.iter() { + if let Some(v) = extract_gateway_project_view(entry) { + out.push(v); + } + } + Some(out) +} + +/// Read a single gateway-project entry by `name`. +pub fn read_gateway_project(name: &str) -> Option { + let state_mutex = get_crdt()?; + let state = state_mutex.lock().ok()?; + let &idx = state.gateway_project_index.get(name)?; + extract_gateway_project_view(&state.crdt.doc.gateway_projects[idx]) +} + +/// Tombstone a gateway-project entry by `name`. +/// +/// Returns `true` if the entry existed and a delete op was issued. +pub fn delete_gateway_project(name: &str) -> bool { + let Some(state_mutex) = get_crdt() else { + return false; + }; + let Ok(mut state) = state_mutex.lock() else { + return false; + }; + let Some(&idx) = state.gateway_project_index.get(name) else { + return false; + }; + let Some(op_id) = list_id_at(&state.crdt.doc.gateway_projects, idx) else { + return false; + }; + apply_and_persist(&mut state, |s| s.crdt.doc.gateway_projects.delete(op_id)); + state.gateway_project_index = rebuild_gateway_project_index(&state.crdt); + true +} + +/// Convert a CRDT gateway-project entry into its read-only view representation. +pub(super) fn extract_gateway_project_view( + entry: &GatewayProjectCrdt, +) -> Option { + let name = match entry.name.view() { + JsonValue::String(s) if !s.is_empty() => s, + _ => return None, + }; + let url = match entry.url.view() { + JsonValue::String(s) => s, + _ => String::new(), + }; + Some(GatewayProjectView { name, url }) +} diff --git a/server/src/crdt_state/lww_maps/merge_jobs.rs b/server/src/crdt_state/lww_maps/merge_jobs.rs new file mode 100644 index 00000000..2e138a6a --- /dev/null +++ b/server/src/crdt_state/lww_maps/merge_jobs.rs @@ -0,0 +1,129 @@ +//! Read/write helpers for the `merge_jobs` LWW-map collection. +//! +//! Merge-job entries are keyed by `story_id` and track the status, timing, +//! and optional error for each story merge operation. + +use bft_json_crdt::json_crdt::{JsonValue, *}; +use bft_json_crdt::op::ROOT_ID; +use serde_json::json; + +use super::super::state::{apply_and_persist, get_crdt, rebuild_merge_job_index}; +use super::super::types::{MergeJobCrdt, MergeJobView}; +use super::list_id_at; + +/// Write or update a merge-job entry keyed by `story_id`. +pub fn write_merge_job( + story_id: &str, + status: &str, + started_at: f64, + finished_at: Option, + error: Option<&str>, +) { + let Some(state_mutex) = get_crdt() else { + return; + }; + let Ok(mut state) = state_mutex.lock() else { + return; + }; + + if let Some(&idx) = state.merge_job_index.get(story_id) { + apply_and_persist(&mut state, |s| { + s.crdt.doc.merge_jobs[idx].status.set(status.to_string()) + }); + apply_and_persist(&mut state, |s| { + s.crdt.doc.merge_jobs[idx].started_at.set(started_at) + }); + if let Some(fa) = finished_at { + apply_and_persist(&mut state, |s| { + s.crdt.doc.merge_jobs[idx].finished_at.set(fa) + }); + } + if let Some(e) = error { + apply_and_persist(&mut state, |s| { + s.crdt.doc.merge_jobs[idx].error.set(e.to_string()) + }); + } + } else { + let entry: JsonValue = json!({ + "story_id": story_id, + "status": status, + "started_at": started_at, + "finished_at": finished_at.unwrap_or(0.0), + "error": error.unwrap_or(""), + }) + .into(); + apply_and_persist(&mut state, |s| s.crdt.doc.merge_jobs.insert(ROOT_ID, entry)); + state.merge_job_index = rebuild_merge_job_index(&state.crdt); + } +} + +/// Read all merge-job entries. +pub fn read_all_merge_jobs() -> Option> { + let state_mutex = get_crdt()?; + let state = state_mutex.lock().ok()?; + let mut out = Vec::new(); + for entry in state.crdt.doc.merge_jobs.iter() { + if let Some(v) = extract_merge_job_view(entry) { + out.push(v); + } + } + Some(out) +} + +/// Read a single merge-job entry by `story_id`. +pub fn read_merge_job(story_id: &str) -> Option { + let state_mutex = get_crdt()?; + let state = state_mutex.lock().ok()?; + let &idx = state.merge_job_index.get(story_id)?; + extract_merge_job_view(&state.crdt.doc.merge_jobs[idx]) +} + +/// Tombstone a merge-job entry by `story_id`. +pub fn delete_merge_job(story_id: &str) -> bool { + let Some(state_mutex) = get_crdt() else { + return false; + }; + let Ok(mut state) = state_mutex.lock() else { + return false; + }; + let Some(&idx) = state.merge_job_index.get(story_id) else { + return false; + }; + let Some(op_id) = list_id_at(&state.crdt.doc.merge_jobs, idx) else { + return false; + }; + apply_and_persist(&mut state, |s| s.crdt.doc.merge_jobs.delete(op_id)); + state.merge_job_index = rebuild_merge_job_index(&state.crdt); + true +} + +/// Convert a CRDT merge-job entry into its read-only view representation. +pub(super) fn extract_merge_job_view(entry: &MergeJobCrdt) -> Option { + let story_id = match entry.story_id.view() { + JsonValue::String(s) if !s.is_empty() => s, + _ => return None, + }; + let status = match entry.status.view() { + JsonValue::String(s) => s, + _ => String::new(), + }; + let started_at = match entry.started_at.view() { + JsonValue::Number(n) => n, + _ => 0.0, + }; + let finished_at = match entry.finished_at.view() { + JsonValue::Number(n) if n > 0.0 => Some(n), + _ => None, + }; + let error = match entry.error.view() { + JsonValue::String(s) if !s.is_empty() => Some(s), + _ => None, + }; + Some(MergeJobView { + story_id, + status, + started_at, + finished_at, + error, + }) +} diff --git a/server/src/crdt_state/lww_maps/mod.rs b/server/src/crdt_state/lww_maps/mod.rs new file mode 100644 index 00000000..d25bc656 --- /dev/null +++ b/server/src/crdt_state/lww_maps/mod.rs @@ -0,0 +1,56 @@ +//! Read/write helpers for the five LWW-map CRDT collections: +//! `tokens`, `merge_jobs`, `active_agents`, `test_jobs`, and `agent_throttle`. +//! +//! Each collection is backed by a `ListCrdt` with a primary-key field. +//! A secondary index in [`super::state::CrdtState`] provides O(1) lookup by +//! key. The write functions follow the same insert-or-update pattern used by +//! [`super::presence`] for the `nodes` collection. + +#![allow(dead_code)] + +use bft_json_crdt::json_crdt::CrdtNode; +use bft_json_crdt::list_crdt::ListCrdt; +use bft_json_crdt::op::OpId; + +mod active_agents; +mod agent_throttle; +mod gateway_projects; +mod merge_jobs; +mod test_jobs; +mod tokens; + +#[cfg(test)] +mod tests; + +pub use active_agents::{ + delete_active_agent, read_active_agent, read_all_active_agents, write_active_agent, +}; +pub use agent_throttle::{ + delete_agent_throttle, read_agent_throttle, read_all_agent_throttles, write_agent_throttle, +}; +pub use gateway_projects::{ + delete_gateway_project, read_all_gateway_projects, read_gateway_project, write_gateway_project, +}; +pub use merge_jobs::{delete_merge_job, read_all_merge_jobs, read_merge_job, write_merge_job}; +pub use test_jobs::{delete_test_job, read_all_test_jobs, read_test_job, write_test_job}; +pub use tokens::{delete_token_usage, read_all_token_usage, read_token_usage, write_token_usage}; + +/// Return the `OpId` of the entry at iter-position `idx`, consistent with +/// `ListCrdt::iter()` and the `[]` index operator. +/// +/// The built-in `id_at` counts non-deleted entries including the root sentinel +/// (which has `content = None`), causing an off-by-one mismatch with +/// `iter().enumerate()`. This helper requires `content.is_some()` so that the +/// secondary-index position maps correctly to an OpId. +pub(super) fn list_id_at(list: &ListCrdt, idx: usize) -> Option { + let mut i = 0; + for op in &list.ops { + if !op.is_deleted && op.content.is_some() { + if i == idx { + return Some(op.id); + } + i += 1; + } + } + None +} diff --git a/server/src/crdt_state/lww_maps/test_jobs.rs b/server/src/crdt_state/lww_maps/test_jobs.rs new file mode 100644 index 00000000..5930aa75 --- /dev/null +++ b/server/src/crdt_state/lww_maps/test_jobs.rs @@ -0,0 +1,129 @@ +//! Read/write helpers for the `test_jobs` LWW-map collection. +//! +//! Test-job entries are keyed by `story_id` and track the status, timing, +//! and captured output for each test run. + +use bft_json_crdt::json_crdt::{JsonValue, *}; +use bft_json_crdt::op::ROOT_ID; +use serde_json::json; + +use super::super::state::{apply_and_persist, get_crdt, rebuild_test_job_index}; +use super::super::types::{TestJobCrdt, TestJobView}; +use super::list_id_at; + +/// Write or update a test-job entry keyed by `story_id`. +pub fn write_test_job( + story_id: &str, + status: &str, + started_at: f64, + finished_at: Option, + output: Option<&str>, +) { + let Some(state_mutex) = get_crdt() else { + return; + }; + let Ok(mut state) = state_mutex.lock() else { + return; + }; + + if let Some(&idx) = state.test_job_index.get(story_id) { + apply_and_persist(&mut state, |s| { + s.crdt.doc.test_jobs[idx].status.set(status.to_string()) + }); + apply_and_persist(&mut state, |s| { + s.crdt.doc.test_jobs[idx].started_at.set(started_at) + }); + if let Some(fa) = finished_at { + apply_and_persist(&mut state, |s| { + s.crdt.doc.test_jobs[idx].finished_at.set(fa) + }); + } + if let Some(o) = output { + apply_and_persist(&mut state, |s| { + s.crdt.doc.test_jobs[idx].output.set(o.to_string()) + }); + } + } else { + let entry: JsonValue = json!({ + "story_id": story_id, + "status": status, + "started_at": started_at, + "finished_at": finished_at.unwrap_or(0.0), + "output": output.unwrap_or(""), + }) + .into(); + apply_and_persist(&mut state, |s| s.crdt.doc.test_jobs.insert(ROOT_ID, entry)); + state.test_job_index = rebuild_test_job_index(&state.crdt); + } +} + +/// Read all test-job entries. +pub fn read_all_test_jobs() -> Option> { + let state_mutex = get_crdt()?; + let state = state_mutex.lock().ok()?; + let mut out = Vec::new(); + for entry in state.crdt.doc.test_jobs.iter() { + if let Some(v) = extract_test_job_view(entry) { + out.push(v); + } + } + Some(out) +} + +/// Read a single test-job entry by `story_id`. +pub fn read_test_job(story_id: &str) -> Option { + let state_mutex = get_crdt()?; + let state = state_mutex.lock().ok()?; + let &idx = state.test_job_index.get(story_id)?; + extract_test_job_view(&state.crdt.doc.test_jobs[idx]) +} + +/// Tombstone a test-job entry by `story_id`. +pub fn delete_test_job(story_id: &str) -> bool { + let Some(state_mutex) = get_crdt() else { + return false; + }; + let Ok(mut state) = state_mutex.lock() else { + return false; + }; + let Some(&idx) = state.test_job_index.get(story_id) else { + return false; + }; + let Some(op_id) = list_id_at(&state.crdt.doc.test_jobs, idx) else { + return false; + }; + apply_and_persist(&mut state, |s| s.crdt.doc.test_jobs.delete(op_id)); + state.test_job_index = rebuild_test_job_index(&state.crdt); + true +} + +/// Convert a CRDT test-job entry into its read-only view representation. +pub(super) fn extract_test_job_view(entry: &TestJobCrdt) -> Option { + let story_id = match entry.story_id.view() { + JsonValue::String(s) if !s.is_empty() => s, + _ => return None, + }; + let status = match entry.status.view() { + JsonValue::String(s) => s, + _ => String::new(), + }; + let started_at = match entry.started_at.view() { + JsonValue::Number(n) => n, + _ => 0.0, + }; + let finished_at = match entry.finished_at.view() { + JsonValue::Number(n) if n > 0.0 => Some(n), + _ => None, + }; + let output = match entry.output.view() { + JsonValue::String(s) if !s.is_empty() => Some(s), + _ => None, + }; + Some(TestJobView { + story_id, + status, + started_at, + finished_at, + output, + }) +} diff --git a/server/src/crdt_state/lww_maps/tests.rs b/server/src/crdt_state/lww_maps/tests.rs new file mode 100644 index 00000000..c029f800 --- /dev/null +++ b/server/src/crdt_state/lww_maps/tests.rs @@ -0,0 +1,297 @@ +//! Integration tests for the LWW-map CRDT collections. +//! +//! The `tokens` collection is used as the representative collection for the +//! required CRDT semantics tests (insert, update, delete-via-tombstone, and +//! concurrent write semantics). The remaining four collections have smoke tests. + +use super::super::state::init_for_test; +use super::*; +use bft_json_crdt::json_crdt::{BaseCrdt, JsonValue, OpState}; +use bft_json_crdt::keypair::make_keypair; +use bft_json_crdt::op::ROOT_ID; +use serde_json::json; + +use super::super::types::PipelineDoc; + +// ── insert ──────────────────────────────────────────────────────── + +#[test] +fn token_insert_is_visible_via_read() { + init_for_test(); + write_token_usage("coder-1:42", "42", 100.0, 200.0, 1_000_000.0); + + let view = read_token_usage("coder-1:42").expect("entry must exist after insert"); + assert_eq!(view.agent_id, "coder-1:42"); + assert_eq!(view.story_id.as_deref(), Some("42")); + assert!((view.input_tokens - 100.0).abs() < f64::EPSILON); + assert!((view.output_tokens - 200.0).abs() < f64::EPSILON); + assert!((view.timestamp - 1_000_000.0).abs() < f64::EPSILON); +} + +#[test] +fn token_read_all_returns_inserted_entries() { + init_for_test(); + write_token_usage("coder-a:10", "10", 10.0, 20.0, 1.0); + write_token_usage("coder-b:10", "10", 30.0, 40.0, 2.0); + + let all = read_all_token_usage().unwrap_or_default(); + let ids: Vec<&str> = all.iter().map(|v| v.agent_id.as_str()).collect(); + assert!( + ids.contains(&"coder-a:10"), + "coder-a:10 must be in read_all" + ); + assert!( + ids.contains(&"coder-b:10"), + "coder-b:10 must be in read_all" + ); +} + +// ── update ──────────────────────────────────────────────────────── + +#[test] +fn token_update_overwrites_fields() { + init_for_test(); + write_token_usage("coder-2:55", "55", 50.0, 60.0, 2_000_000.0); + // Update with new token counts. + write_token_usage("coder-2:55", "55", 500.0, 600.0, 3_000_000.0); + + let view = read_token_usage("coder-2:55").expect("entry must exist after update"); + assert!((view.input_tokens - 500.0).abs() < f64::EPSILON); + assert!((view.output_tokens - 600.0).abs() < f64::EPSILON); + assert!((view.timestamp - 3_000_000.0).abs() < f64::EPSILON); +} + +// ── delete-via-tombstone ────────────────────────────────────────── + +#[test] +fn token_delete_removes_entry_from_read() { + init_for_test(); + write_token_usage("coder-3:77", "77", 1.0, 2.0, 9_999.0); + assert!( + read_token_usage("coder-3:77").is_some(), + "entry must exist before delete" + ); + + let deleted = delete_token_usage("coder-3:77"); + assert!(deleted, "delete must return true for a known entry"); + + assert!( + read_token_usage("coder-3:77").is_none(), + "entry must be absent after tombstone" + ); +} + +#[test] +fn token_delete_nonexistent_returns_false() { + init_for_test(); + assert!(!delete_token_usage("no-such-agent")); +} + +#[test] +fn token_delete_not_returned_by_read_all() { + init_for_test(); + write_token_usage("coder-4:88", "88", 5.0, 10.0, 1.0); + delete_token_usage("coder-4:88"); + + let all = read_all_token_usage().unwrap_or_default(); + assert!( + !all.iter().any(|v| v.agent_id == "coder-4:88"), + "deleted entry must not appear in read_all" + ); +} + +// ── concurrent write semantics (LWW convergence) ────────────────── + +#[test] +fn token_concurrent_writes_converge_via_lww() { + // Two independent CRDTs simulate two nodes writing concurrently. + let kp_a = make_keypair(); + let kp_b = make_keypair(); + let mut crdt_a = BaseCrdt::::new(&kp_a); + let mut crdt_b = BaseCrdt::::new(&kp_b); + + // Node A inserts a token entry. + let entry_a: JsonValue = json!({ + "agent_id": "coder-x:99", + "story_id": "99", + "input_tokens": 10.0, + "output_tokens": 20.0, + "timestamp": 1.0, + }) + .into(); + let insert_a = crdt_a.doc.tokens.insert(ROOT_ID, entry_a).sign(&kp_a); + assert_eq!(crdt_a.apply(insert_a.clone()), OpState::Ok); + + // Node B inserts the same key with different values. + let entry_b: JsonValue = json!({ + "agent_id": "coder-x:99", + "story_id": "99", + "input_tokens": 999.0, + "output_tokens": 888.0, + "timestamp": 1.0, + }) + .into(); + let insert_b = crdt_b.doc.tokens.insert(ROOT_ID, entry_b).sign(&kp_b); + assert_eq!(crdt_b.apply(insert_b.clone()), OpState::Ok); + + // Both nodes update input_tokens concurrently. + let update_a = crdt_a.doc.tokens[0].input_tokens.set(111.0).sign(&kp_a); + let update_b = crdt_b.doc.tokens[0].input_tokens.set(222.0).sign(&kp_b); + + assert_eq!(crdt_a.apply(update_a.clone()), OpState::Ok); + assert_eq!(crdt_b.apply(update_b.clone()), OpState::Ok); + + // Cross-apply: A gets B's ops, B gets A's ops. + assert_eq!(crdt_a.apply(insert_b), OpState::Ok); + assert_eq!(crdt_a.apply(update_b), OpState::Ok); + assert_eq!(crdt_b.apply(insert_a), OpState::Ok); + assert_eq!(crdt_b.apply(update_a), OpState::Ok); + + // Both CRDTs must converge to the same view — compare field by field. + assert_eq!( + crdt_a.doc.tokens.view().len(), + crdt_b.doc.tokens.view().len(), + "both peers must have the same number of token entries" + ); + assert_eq!( + crdt_a.doc.tokens[0].input_tokens.view(), + crdt_b.doc.tokens[0].input_tokens.view(), + "concurrent writes to input_tokens must converge to the same value" + ); + assert_eq!( + crdt_a.doc.tokens[0].output_tokens.view(), + crdt_b.doc.tokens[0].output_tokens.view(), + "concurrent writes must converge for output_tokens" + ); +} + +// ── smoke tests for the other four collections ──────────────────── + +#[test] +fn merge_job_insert_update_delete() { + init_for_test(); + write_merge_job("100", "pending", 1.0, None, None); + let v = read_merge_job("100").expect("merge job must exist"); + assert_eq!(v.status, "pending"); + + write_merge_job("100", "done", 1.0, Some(2.0), None); + let v2 = read_merge_job("100").expect("merge job must exist after update"); + assert_eq!(v2.status, "done"); + assert_eq!(v2.finished_at, Some(2.0)); + + assert!(delete_merge_job("100")); + assert!(read_merge_job("100").is_none()); +} + +#[test] +fn active_agent_insert_update_delete() { + init_for_test(); + write_active_agent("coder-5", "200", "node-abc", 5.0); + let v = read_active_agent("coder-5").expect("active agent must exist"); + assert_eq!(v.story_id.as_deref(), Some("200")); + + write_active_agent("coder-5", "201", "node-abc", 6.0); + let v2 = read_active_agent("coder-5").expect("active agent must exist after update"); + assert_eq!(v2.story_id.as_deref(), Some("201")); + + assert!(delete_active_agent("coder-5")); + assert!(read_active_agent("coder-5").is_none()); +} + +#[test] +fn test_job_insert_update_delete() { + init_for_test(); + write_test_job("300", "running", 7.0, None, None); + let v = read_test_job("300").expect("test job must exist"); + assert_eq!(v.status, "running"); + + write_test_job("300", "pass", 7.0, Some(8.0), Some("all green")); + let v2 = read_test_job("300").expect("test job must exist after update"); + assert_eq!(v2.status, "pass"); + assert_eq!(v2.output.as_deref(), Some("all green")); + + assert!(delete_test_job("300")); + assert!(read_test_job("300").is_none()); +} + +#[test] +fn agent_throttle_insert_update_delete() { + init_for_test(); + write_agent_throttle("node-z", 1000.0, 2.0, 5.0); + let v = read_agent_throttle("node-z").expect("throttle must exist"); + assert!((v.count - 2.0).abs() < f64::EPSILON); + assert!((v.limit - 5.0).abs() < f64::EPSILON); + + write_agent_throttle("node-z", 1000.0, 4.0, 5.0); + let v2 = read_agent_throttle("node-z").expect("throttle must exist after update"); + assert!((v2.count - 4.0).abs() < f64::EPSILON); + + assert!(delete_agent_throttle("node-z")); + assert!(read_agent_throttle("node-z").is_none()); +} + +// ── merge_jobs: concurrent-write LWW resolution ─────────────────── + +#[test] +fn merge_job_concurrent_writes_converge_via_lww() { + // Two independent CRDTs simulate two nodes writing concurrently. + let kp_a = make_keypair(); + let kp_b = make_keypair(); + let mut crdt_a = BaseCrdt::::new(&kp_a); + let mut crdt_b = BaseCrdt::::new(&kp_b); + + // Node A inserts a merge-job entry. + let entry_a: JsonValue = json!({ + "story_id": "500_story_concurrent", + "status": "pending", + "started_at": 1.0, + "finished_at": 0.0, + "error": "", + }) + .into(); + let insert_a = crdt_a.doc.merge_jobs.insert(ROOT_ID, entry_a).sign(&kp_a); + assert_eq!(crdt_a.apply(insert_a.clone()), OpState::Ok); + + // Node B inserts the same story_id with a different status concurrently. + let entry_b: JsonValue = json!({ + "story_id": "500_story_concurrent", + "status": "running", + "started_at": 1.0, + "finished_at": 0.0, + "error": "", + }) + .into(); + let insert_b = crdt_b.doc.merge_jobs.insert(ROOT_ID, entry_b).sign(&kp_b); + assert_eq!(crdt_b.apply(insert_b.clone()), OpState::Ok); + + // Both nodes concurrently update the status field. + let update_a = crdt_a.doc.merge_jobs[0] + .status + .set("done".to_string()) + .sign(&kp_a); + let update_b = crdt_b.doc.merge_jobs[0] + .status + .set("failed".to_string()) + .sign(&kp_b); + + assert_eq!(crdt_a.apply(update_a.clone()), OpState::Ok); + assert_eq!(crdt_b.apply(update_b.clone()), OpState::Ok); + + // Cross-apply: A gets B's ops, B gets A's ops. + assert_eq!(crdt_a.apply(insert_b), OpState::Ok); + assert_eq!(crdt_a.apply(update_b), OpState::Ok); + assert_eq!(crdt_b.apply(insert_a), OpState::Ok); + assert_eq!(crdt_b.apply(update_a), OpState::Ok); + + // Both CRDTs must converge to the same view. + assert_eq!( + crdt_a.doc.merge_jobs.view().len(), + crdt_b.doc.merge_jobs.view().len(), + "both peers must have the same number of merge_job entries" + ); + assert_eq!( + crdt_a.doc.merge_jobs[0].status.view(), + crdt_b.doc.merge_jobs[0].status.view(), + "concurrent writes to status must converge to the same value via LWW" + ); +} diff --git a/server/src/crdt_state/lww_maps/tokens.rs b/server/src/crdt_state/lww_maps/tokens.rs new file mode 100644 index 00000000..afc125f8 --- /dev/null +++ b/server/src/crdt_state/lww_maps/tokens.rs @@ -0,0 +1,130 @@ +//! Read/write helpers for the `tokens` LWW-map collection. +//! +//! Token-usage entries are keyed by `agent_id` and track per-agent input/output +//! token counts and a timestamp for the most recent write. + +use bft_json_crdt::json_crdt::{JsonValue, *}; +use bft_json_crdt::op::ROOT_ID; +use serde_json::json; + +use super::super::state::{apply_and_persist, get_crdt, rebuild_token_index}; +use super::super::types::{TokenUsageCrdt, TokenUsageView}; +use super::list_id_at; + +/// Write or update a token-usage entry keyed by `agent_id`. +/// +/// If an entry for `agent_id` already exists it is updated in place; +/// otherwise a new entry is inserted. +pub fn write_token_usage( + agent_id: &str, + story_id: &str, + input_tokens: f64, + output_tokens: f64, + timestamp: f64, +) { + let Some(state_mutex) = get_crdt() else { + return; + }; + let Ok(mut state) = state_mutex.lock() else { + return; + }; + + if let Some(&idx) = state.token_index.get(agent_id) { + apply_and_persist(&mut state, |s| { + s.crdt.doc.tokens[idx].story_id.set(story_id.to_string()) + }); + apply_and_persist(&mut state, |s| { + s.crdt.doc.tokens[idx].input_tokens.set(input_tokens) + }); + apply_and_persist(&mut state, |s| { + s.crdt.doc.tokens[idx].output_tokens.set(output_tokens) + }); + apply_and_persist(&mut state, |s| { + s.crdt.doc.tokens[idx].timestamp.set(timestamp) + }); + } else { + let entry: JsonValue = json!({ + "agent_id": agent_id, + "story_id": story_id, + "input_tokens": input_tokens, + "output_tokens": output_tokens, + "timestamp": timestamp, + }) + .into(); + apply_and_persist(&mut state, |s| s.crdt.doc.tokens.insert(ROOT_ID, entry)); + state.token_index = rebuild_token_index(&state.crdt); + } +} + +/// Read all token-usage entries. +pub fn read_all_token_usage() -> Option> { + let state_mutex = get_crdt()?; + let state = state_mutex.lock().ok()?; + let mut out = Vec::new(); + for entry in state.crdt.doc.tokens.iter() { + if let Some(v) = extract_token_view(entry) { + out.push(v); + } + } + Some(out) +} + +/// Read a single token-usage entry by `agent_id`. +pub fn read_token_usage(agent_id: &str) -> Option { + let state_mutex = get_crdt()?; + let state = state_mutex.lock().ok()?; + let &idx = state.token_index.get(agent_id)?; + extract_token_view(&state.crdt.doc.tokens[idx]) +} + +/// Tombstone a token-usage entry by `agent_id`. +/// +/// Returns `true` if the entry existed and a delete op was issued. +pub fn delete_token_usage(agent_id: &str) -> bool { + let Some(state_mutex) = get_crdt() else { + return false; + }; + let Ok(mut state) = state_mutex.lock() else { + return false; + }; + let Some(&idx) = state.token_index.get(agent_id) else { + return false; + }; + let Some(op_id) = list_id_at(&state.crdt.doc.tokens, idx) else { + return false; + }; + apply_and_persist(&mut state, |s| s.crdt.doc.tokens.delete(op_id)); + state.token_index = rebuild_token_index(&state.crdt); + true +} + +/// Convert a CRDT token-usage entry into its read-only view representation. +pub(super) fn extract_token_view(entry: &TokenUsageCrdt) -> Option { + let agent_id = match entry.agent_id.view() { + JsonValue::String(s) if !s.is_empty() => s, + _ => return None, + }; + let story_id = match entry.story_id.view() { + JsonValue::String(s) if !s.is_empty() => Some(s), + _ => None, + }; + let input_tokens = match entry.input_tokens.view() { + JsonValue::Number(n) => n, + _ => 0.0, + }; + let output_tokens = match entry.output_tokens.view() { + JsonValue::Number(n) => n, + _ => 0.0, + }; + let timestamp = match entry.timestamp.view() { + JsonValue::Number(n) => n, + _ => 0.0, + }; + Some(TokenUsageView { + agent_id, + story_id, + input_tokens, + output_tokens, + timestamp, + }) +}