huskies: merge 776
This commit is contained in:
@@ -35,11 +35,13 @@ fn list_id_at<T: CrdtNode>(list: &ListCrdt<T>, idx: usize) -> Option<OpId> {
|
||||
|
||||
use super::state::{
|
||||
apply_and_persist, get_crdt, rebuild_active_agent_index, rebuild_agent_throttle_index,
|
||||
rebuild_merge_job_index, rebuild_test_job_index, rebuild_token_index,
|
||||
rebuild_gateway_project_index, rebuild_merge_job_index, rebuild_test_job_index,
|
||||
rebuild_token_index,
|
||||
};
|
||||
use super::types::{
|
||||
ActiveAgentCrdt, ActiveAgentView, AgentThrottleCrdt, AgentThrottleView, MergeJobCrdt,
|
||||
MergeJobView, TestJobCrdt, TestJobView, TokenUsageCrdt, TokenUsageView,
|
||||
ActiveAgentCrdt, ActiveAgentView, AgentThrottleCrdt, AgentThrottleView, GatewayProjectCrdt,
|
||||
GatewayProjectView, MergeJobCrdt, MergeJobView, TestJobCrdt, TestJobView, TokenUsageCrdt,
|
||||
TokenUsageView,
|
||||
};
|
||||
|
||||
// ── tokens ───────────────────────────────────────────────────────────
|
||||
@@ -605,6 +607,91 @@ fn extract_agent_throttle_view(entry: &AgentThrottleCrdt) -> Option<AgentThrottl
|
||||
})
|
||||
}
|
||||
|
||||
// ── gateway_projects ──────────────────────────────────────────────────
|
||||
|
||||
/// Write or update a gateway-project entry keyed by `name`.
|
||||
///
|
||||
/// If an entry for `name` already exists it is updated in place;
|
||||
/// otherwise a new entry is inserted.
|
||||
pub fn write_gateway_project(name: &str, url: &str) {
|
||||
let Some(state_mutex) = get_crdt() else {
|
||||
return;
|
||||
};
|
||||
let Ok(mut state) = state_mutex.lock() else {
|
||||
return;
|
||||
};
|
||||
|
||||
if let Some(&idx) = state.gateway_project_index.get(name) {
|
||||
apply_and_persist(&mut state, |s| {
|
||||
s.crdt.doc.gateway_projects[idx].url.set(url.to_string())
|
||||
});
|
||||
} else {
|
||||
let entry: JsonValue = json!({
|
||||
"name": name,
|
||||
"url": url,
|
||||
})
|
||||
.into();
|
||||
apply_and_persist(&mut state, |s| {
|
||||
s.crdt.doc.gateway_projects.insert(ROOT_ID, entry)
|
||||
});
|
||||
state.gateway_project_index = rebuild_gateway_project_index(&state.crdt);
|
||||
}
|
||||
}
|
||||
|
||||
/// Read all gateway-project entries.
|
||||
pub fn read_all_gateway_projects() -> Option<Vec<GatewayProjectView>> {
|
||||
let state_mutex = get_crdt()?;
|
||||
let state = state_mutex.lock().ok()?;
|
||||
let mut out = Vec::new();
|
||||
for entry in state.crdt.doc.gateway_projects.iter() {
|
||||
if let Some(v) = extract_gateway_project_view(entry) {
|
||||
out.push(v);
|
||||
}
|
||||
}
|
||||
Some(out)
|
||||
}
|
||||
|
||||
/// Read a single gateway-project entry by `name`.
|
||||
pub fn read_gateway_project(name: &str) -> Option<GatewayProjectView> {
|
||||
let state_mutex = get_crdt()?;
|
||||
let state = state_mutex.lock().ok()?;
|
||||
let &idx = state.gateway_project_index.get(name)?;
|
||||
extract_gateway_project_view(&state.crdt.doc.gateway_projects[idx])
|
||||
}
|
||||
|
||||
/// Tombstone a gateway-project entry by `name`.
|
||||
///
|
||||
/// Returns `true` if the entry existed and a delete op was issued.
|
||||
pub fn delete_gateway_project(name: &str) -> bool {
|
||||
let Some(state_mutex) = get_crdt() else {
|
||||
return false;
|
||||
};
|
||||
let Ok(mut state) = state_mutex.lock() else {
|
||||
return false;
|
||||
};
|
||||
let Some(&idx) = state.gateway_project_index.get(name) else {
|
||||
return false;
|
||||
};
|
||||
let Some(op_id) = list_id_at(&state.crdt.doc.gateway_projects, idx) else {
|
||||
return false;
|
||||
};
|
||||
apply_and_persist(&mut state, |s| s.crdt.doc.gateway_projects.delete(op_id));
|
||||
state.gateway_project_index = rebuild_gateway_project_index(&state.crdt);
|
||||
true
|
||||
}
|
||||
|
||||
fn extract_gateway_project_view(entry: &GatewayProjectCrdt) -> Option<GatewayProjectView> {
|
||||
let name = match entry.name.view() {
|
||||
JsonValue::String(s) if !s.is_empty() => s,
|
||||
_ => return None,
|
||||
};
|
||||
let url = match entry.url.view() {
|
||||
JsonValue::String(s) => s,
|
||||
_ => String::new(),
|
||||
};
|
||||
Some(GatewayProjectView { name, url })
|
||||
}
|
||||
|
||||
// ── Tests ─────────────────────────────────────────────────────────────
|
||||
//
|
||||
// The `tokens` collection is used as the representative collection for the
|
||||
|
||||
@@ -28,11 +28,12 @@ mod write;
|
||||
|
||||
pub use gateway_config::{read_gateway_active_project, write_gateway_active_project};
|
||||
pub use lww_maps::{
|
||||
delete_active_agent, delete_agent_throttle, delete_merge_job, delete_test_job,
|
||||
delete_token_usage, read_active_agent, read_agent_throttle, read_all_active_agents,
|
||||
read_all_agent_throttles, read_all_merge_jobs, read_all_test_jobs, read_all_token_usage,
|
||||
delete_active_agent, delete_agent_throttle, delete_gateway_project, delete_merge_job,
|
||||
delete_test_job, delete_token_usage, read_active_agent, read_agent_throttle,
|
||||
read_all_active_agents, read_all_agent_throttles, read_all_gateway_projects,
|
||||
read_all_merge_jobs, read_all_test_jobs, read_all_token_usage, read_gateway_project,
|
||||
read_merge_job, read_test_job, read_token_usage, write_active_agent, write_agent_throttle,
|
||||
write_merge_job, write_test_job, write_token_usage,
|
||||
write_gateway_project, write_merge_job, write_test_job, write_token_usage,
|
||||
};
|
||||
pub use ops::{all_ops_json, apply_remote_op, ops_since, our_vector_clock, subscribe_ops};
|
||||
pub use presence::{
|
||||
@@ -46,9 +47,9 @@ pub use read::{
|
||||
pub use state::init;
|
||||
pub use types::{
|
||||
ActiveAgentCrdt, ActiveAgentView, AgentThrottleCrdt, AgentThrottleView, CrdtEvent,
|
||||
GatewayConfigCrdt, MergeJobCrdt, MergeJobView, NodePresenceCrdt, NodePresenceView, PipelineDoc,
|
||||
PipelineItemCrdt, PipelineItemView, TestJobCrdt, TestJobView, TokenUsageCrdt, TokenUsageView,
|
||||
subscribe,
|
||||
GatewayConfigCrdt, GatewayProjectCrdt, GatewayProjectView, MergeJobCrdt, MergeJobView,
|
||||
NodePresenceCrdt, NodePresenceView, PipelineDoc, PipelineItemCrdt, PipelineItemView,
|
||||
TestJobCrdt, TestJobView, TokenUsageCrdt, TokenUsageView, subscribe,
|
||||
};
|
||||
pub use write::{
|
||||
migrate_names_from_slugs, migrate_story_ids_to_numeric, name_from_story_id, write_item,
|
||||
|
||||
@@ -71,6 +71,8 @@ pub(super) struct CrdtState {
|
||||
pub(super) test_job_index: HashMap<String, usize>,
|
||||
/// Maps node_id → index in the agent_throttle ListCrdt for O(1) lookup.
|
||||
pub(super) agent_throttle_index: HashMap<String, usize>,
|
||||
/// Maps project name → index in the gateway_projects ListCrdt for O(1) lookup.
|
||||
pub(super) gateway_project_index: HashMap<String, usize>,
|
||||
/// Channel sender for fire-and-forget op persistence.
|
||||
pub(super) persist_tx: mpsc::UnboundedSender<SignedOp>,
|
||||
/// Max sequence number seen across all ops during init() replay.
|
||||
@@ -161,6 +163,7 @@ pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> {
|
||||
let active_agent_index = rebuild_active_agent_index(&crdt);
|
||||
let test_job_index = rebuild_test_job_index(&crdt);
|
||||
let agent_throttle_index = rebuild_agent_throttle_index(&crdt);
|
||||
let gateway_project_index = rebuild_gateway_project_index(&crdt);
|
||||
|
||||
// Advance the top-level list clocks to the Lamport floor so that
|
||||
// list-level inserts don't re-emit low seq numbers.
|
||||
@@ -171,6 +174,7 @@ pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> {
|
||||
crdt.doc.active_agents.advance_seq(lamport_floor);
|
||||
crdt.doc.test_jobs.advance_seq(lamport_floor);
|
||||
crdt.doc.agent_throttle.advance_seq(lamport_floor);
|
||||
crdt.doc.gateway_projects.advance_seq(lamport_floor);
|
||||
crdt.doc
|
||||
.gateway_config
|
||||
.active_project
|
||||
@@ -228,6 +232,7 @@ pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> {
|
||||
active_agent_index,
|
||||
test_job_index,
|
||||
agent_throttle_index,
|
||||
gateway_project_index,
|
||||
persist_tx,
|
||||
lamport_floor,
|
||||
};
|
||||
@@ -271,6 +276,7 @@ pub fn init_for_test() {
|
||||
active_agent_index: HashMap::new(),
|
||||
test_job_index: HashMap::new(),
|
||||
agent_throttle_index: HashMap::new(),
|
||||
gateway_project_index: HashMap::new(),
|
||||
persist_tx,
|
||||
lamport_floor: 0,
|
||||
};
|
||||
@@ -385,6 +391,19 @@ pub(super) fn rebuild_agent_throttle_index(crdt: &BaseCrdt<PipelineDoc>) -> Hash
|
||||
map
|
||||
}
|
||||
|
||||
/// Rebuild the project name → gateway_projects list index.
|
||||
pub(super) fn rebuild_gateway_project_index(
|
||||
crdt: &BaseCrdt<PipelineDoc>,
|
||||
) -> HashMap<String, usize> {
|
||||
let mut map = HashMap::new();
|
||||
for (i, entry) in crdt.doc.gateway_projects.iter().enumerate() {
|
||||
if let JsonValue::String(ref k) = entry.name.view() {
|
||||
map.insert(k.clone(), i);
|
||||
}
|
||||
}
|
||||
map
|
||||
}
|
||||
|
||||
// ── Write path ───────────────────────────────────────────────────────
|
||||
|
||||
/// Create a CRDT op via `op_fn`, sign it, apply it, and send it to the
|
||||
@@ -597,6 +616,7 @@ mod tests {
|
||||
active_agent_index: HashMap::new(),
|
||||
test_job_index: HashMap::new(),
|
||||
agent_throttle_index: HashMap::new(),
|
||||
gateway_project_index: HashMap::new(),
|
||||
persist_tx,
|
||||
lamport_floor: 0,
|
||||
};
|
||||
@@ -669,6 +689,7 @@ mod tests {
|
||||
active_agent_index: HashMap::new(),
|
||||
test_job_index: HashMap::new(),
|
||||
agent_throttle_index: HashMap::new(),
|
||||
gateway_project_index: HashMap::new(),
|
||||
persist_tx,
|
||||
lamport_floor: 0,
|
||||
};
|
||||
|
||||
@@ -53,6 +53,7 @@ pub struct PipelineDoc {
|
||||
pub active_agents: ListCrdt<ActiveAgentCrdt>,
|
||||
pub test_jobs: ListCrdt<TestJobCrdt>,
|
||||
pub agent_throttle: ListCrdt<AgentThrottleCrdt>,
|
||||
pub gateway_projects: ListCrdt<GatewayProjectCrdt>,
|
||||
pub gateway_config: GatewayConfigCrdt,
|
||||
}
|
||||
|
||||
@@ -205,6 +206,16 @@ pub struct AgentThrottleCrdt {
|
||||
pub limit: LwwRegisterCrdt<f64>,
|
||||
}
|
||||
|
||||
/// CRDT entry for a gateway project registered in `gateway_config.projects`.
|
||||
#[add_crdt_fields]
|
||||
#[derive(Clone, CrdtNode, Debug)]
|
||||
pub struct GatewayProjectCrdt {
|
||||
/// Unique key: project name (e.g. `"huskies"`).
|
||||
pub name: LwwRegisterCrdt<String>,
|
||||
/// Container base URL (e.g. `"http://huskies:3001"`).
|
||||
pub url: LwwRegisterCrdt<String>,
|
||||
}
|
||||
|
||||
// ── LWW-map view types ───────────────────────────────────────────────
|
||||
|
||||
/// Snapshot of a single token-usage entry.
|
||||
@@ -256,6 +267,13 @@ pub struct AgentThrottleView {
|
||||
pub limit: f64,
|
||||
}
|
||||
|
||||
/// Snapshot of a single gateway-project entry.
|
||||
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
|
||||
pub struct GatewayProjectView {
|
||||
pub name: String,
|
||||
pub url: String,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::super::state::emit_event;
|
||||
|
||||
Reference in New Issue
Block a user