//! Read/write helpers for the `agent_throttle` LWW-map collection. //! //! Agent-throttle entries are keyed by `node_id` and track per-node sliding- //! window launch counts used to enforce agent concurrency limits. use bft_json_crdt::json_crdt::{JsonValue, *}; use bft_json_crdt::op::ROOT_ID; use serde_json::json; use super::super::state::{apply_and_persist, get_crdt, rebuild_agent_throttle_index}; use super::super::types::{AgentThrottleCrdt, AgentThrottleView}; use super::list_id_at; /// Write or update an agent-throttle entry keyed by `node_id`. pub fn write_agent_throttle(node_id: &str, window_start: f64, count: f64, limit: f64) { let Some(state_mutex) = get_crdt() else { return; }; let Ok(mut state) = state_mutex.lock() else { return; }; if let Some(&idx) = state.agent_throttle_index.get(node_id) { apply_and_persist(&mut state, |s| { s.crdt.doc.agent_throttle[idx] .window_start .set(window_start) }); apply_and_persist(&mut state, |s| { s.crdt.doc.agent_throttle[idx].count.set(count) }); apply_and_persist(&mut state, |s| { s.crdt.doc.agent_throttle[idx].limit.set(limit) }); } else { let entry: JsonValue = json!({ "node_id": node_id, "window_start": window_start, "count": count, "limit": limit, }) .into(); apply_and_persist(&mut state, |s| { s.crdt.doc.agent_throttle.insert(ROOT_ID, entry) }); state.agent_throttle_index = rebuild_agent_throttle_index(&state.crdt); } } /// Read all agent-throttle entries. pub fn read_all_agent_throttles() -> Option> { let state_mutex = get_crdt()?; let state = state_mutex.lock().ok()?; let mut out = Vec::new(); for entry in state.crdt.doc.agent_throttle.iter() { if let Some(v) = extract_agent_throttle_view(entry) { out.push(v); } } Some(out) } /// Read a single agent-throttle entry by `node_id`. pub fn read_agent_throttle(node_id: &str) -> Option { let state_mutex = get_crdt()?; let state = state_mutex.lock().ok()?; let &idx = state.agent_throttle_index.get(node_id)?; extract_agent_throttle_view(&state.crdt.doc.agent_throttle[idx]) } /// Tombstone an agent-throttle entry by `node_id`. pub fn delete_agent_throttle(node_id: &str) -> bool { let Some(state_mutex) = get_crdt() else { return false; }; let Ok(mut state) = state_mutex.lock() else { return false; }; let Some(&idx) = state.agent_throttle_index.get(node_id) else { return false; }; let Some(op_id) = list_id_at(&state.crdt.doc.agent_throttle, idx) else { return false; }; apply_and_persist(&mut state, |s| s.crdt.doc.agent_throttle.delete(op_id)); state.agent_throttle_index = rebuild_agent_throttle_index(&state.crdt); true } /// Convert a CRDT agent-throttle entry into its read-only view representation. pub(super) fn extract_agent_throttle_view(entry: &AgentThrottleCrdt) -> Option { let node_id = match entry.node_id.view() { JsonValue::String(s) if !s.is_empty() => s, _ => return None, }; let window_start = match entry.window_start.view() { JsonValue::Number(n) => n, _ => 0.0, }; let count = match entry.count.view() { JsonValue::Number(n) => n, _ => 0.0, }; let limit = match entry.limit.view() { JsonValue::Number(n) => n, _ => 0.0, }; Some(AgentThrottleView { node_id, window_start, count, limit, }) }