diff --git a/server/src/crdt_sync/server/mod.rs b/server/src/crdt_sync/server/mod.rs index 477828a9..71518742 100644 --- a/server/src/crdt_sync/server/mod.rs +++ b/server/src/crdt_sync/server/mod.rs @@ -4,16 +4,13 @@ use bft_json_crdt::json_crdt::SignedOp; use futures::{SinkExt, StreamExt}; use poem::handler; use poem::http::StatusCode; -use poem::web::Data; use poem::web::Query; use poem::web::websocket::{Message as WsMessage, WebSocket}; use serde::Deserialize; -use std::sync::Arc; use crate::crdt_snapshot; use crate::crdt_state; use crate::crdt_wire; -use crate::http::context::AppContext; use crate::slog; use crate::slog_warn; @@ -41,7 +38,6 @@ struct SyncQueryParams { pub async fn crdt_sync_handler( ws: WebSocket, - _ctx: Data<&Arc>, remote_addr: &poem::web::RemoteAddr, Query(params): Query, ) -> poem::Response { diff --git a/server/src/gateway.rs b/server/src/gateway.rs index 2866583a..dc8623ee 100644 --- a/server/src/gateway.rs +++ b/server/src/gateway.rs @@ -12,7 +12,7 @@ use std::sync::Arc; // Re-export public types that callers reference as `crate::gateway::*`. pub use crate::service::gateway::{ - GatewayConfig, GatewayState as GatewayStateType, GatewayStatusEvent, JoinedAgent, ProjectEntry, + GatewayConfig, GatewayState as GatewayStateType, GatewayStatusEvent, ProjectEntry, broadcast_status_event, fetch_all_project_pipeline_statuses, format_aggregate_status_compact, spawn_gateway_broadcaster_forwarder, spawn_gateway_notification_poller, subscribe_status_events, @@ -54,23 +54,6 @@ pub fn build_gateway_route(state_arc: Arc) -> impl poem::Endpoint "/gateway/tokens", poem::post(gateway_generate_token_handler), ) - .at( - "/gateway/register", - poem::post(gateway_register_agent_handler), - ) - .at("/gateway/agents", poem::get(gateway_list_agents_handler)) - .at( - "/gateway/agents/:id", - poem::delete(gateway_remove_agent_handler), - ) - .at( - "/gateway/agents/:id/assign", - poem::post(gateway_assign_agent_handler), - ) - .at( - "/gateway/agents/:id/heartbeat", - poem::post(gateway_heartbeat_handler), - ) .at( "/gateway/events/push", poem::get(gateway_event_push_handler), @@ -197,184 +180,6 @@ mod tests { assert!(tokens.contains_key(token)); } - #[tokio::test] - async fn register_agent_consumes_token() { - let state = make_test_state(); - - let token = "test-token-123".to_string(); - state.pending_tokens.write().await.insert( - token.clone(), - gateway::PendingToken { - created_at: chrono::Utc::now().timestamp() as f64, - }, - ); - - let app = poem::Route::new() - .at( - "/gateway/register", - poem::post(gateway_register_agent_handler), - ) - .data(state.clone()); - let cli = poem::test::TestClient::new(app); - let resp = cli - .post("/gateway/register") - .header("Content-Type", "application/json") - .body( - serde_json::json!({ - "token": token, - "label": "test-agent", - "address": "ws://localhost:3001/crdt-sync" - }) - .to_string(), - ) - .send() - .await; - assert_eq!(resp.0.status(), poem::http::StatusCode::OK); - assert!(state.pending_tokens.read().await.is_empty()); - let agents = state.joined_agents.read().await; - assert_eq!(agents.len(), 1); - assert_eq!(agents[0].label, "test-agent"); - } - - #[tokio::test] - async fn register_agent_rejects_invalid_token() { - let state = make_test_state(); - let app = poem::Route::new() - .at( - "/gateway/register", - poem::post(gateway_register_agent_handler), - ) - .data(state.clone()); - let cli = poem::test::TestClient::new(app); - let resp = cli - .post("/gateway/register") - .header("Content-Type", "application/json") - .body( - serde_json::json!({ - "token": "bad-token", - "label": "agent", - "address": "ws://localhost:3001/crdt-sync" - }) - .to_string(), - ) - .send() - .await; - assert_eq!(resp.0.status(), poem::http::StatusCode::UNAUTHORIZED); - assert!(state.joined_agents.read().await.is_empty()); - } - - #[tokio::test] - async fn list_agents_returns_registered_agents() { - let state = make_test_state(); - state - .joined_agents - .write() - .await - .push(gateway::JoinedAgent { - id: "id-1".into(), - label: "agent-1".into(), - address: "ws://a:3001/crdt-sync".into(), - registered_at: 0.0, - last_seen: 0.0, - assigned_project: None, - }); - let app = poem::Route::new() - .at("/gateway/agents", poem::get(gateway_list_agents_handler)) - .data(state.clone()); - let cli = poem::test::TestClient::new(app); - let resp = cli.get("/gateway/agents").send().await; - assert_eq!(resp.0.status(), poem::http::StatusCode::OK); - let agents: Vec = resp.0.into_body().into_json().await.unwrap(); - assert_eq!(agents.len(), 1); - assert_eq!(agents[0]["label"], "agent-1"); - } - - #[tokio::test] - async fn remove_agent_deletes_by_id() { - let state = make_test_state(); - state - .joined_agents - .write() - .await - .push(gateway::JoinedAgent { - id: "del-id".into(), - label: "to-delete".into(), - address: "ws://x:3001/crdt-sync".into(), - registered_at: 0.0, - last_seen: 0.0, - assigned_project: None, - }); - let app = poem::Route::new() - .at( - "/gateway/agents/:id", - poem::delete(gateway_remove_agent_handler), - ) - .data(state.clone()); - let cli = poem::test::TestClient::new(app); - let resp = cli.delete("/gateway/agents/del-id").send().await; - assert_eq!(resp.0.status(), poem::http::StatusCode::NO_CONTENT); - assert!(state.joined_agents.read().await.is_empty()); - } - - #[tokio::test] - async fn remove_agent_unknown_id_returns_not_found() { - let state = make_test_state(); - let app = poem::Route::new() - .at( - "/gateway/agents/:id", - poem::delete(gateway_remove_agent_handler), - ) - .data(state.clone()); - let cli = poem::test::TestClient::new(app); - let resp = cli.delete("/gateway/agents/no-such-id").send().await; - assert_eq!(resp.0.status(), poem::http::StatusCode::NOT_FOUND); - } - - #[tokio::test] - async fn heartbeat_updates_last_seen() { - let state = make_test_state(); - state - .joined_agents - .write() - .await - .push(gateway::JoinedAgent { - id: "hb-id".into(), - label: "hb-agent".into(), - address: "ws://hb:3001/crdt-sync".into(), - registered_at: 0.0, - last_seen: 0.0, - assigned_project: None, - }); - let app = poem::Route::new() - .at( - "/gateway/agents/:id/heartbeat", - poem::post(gateway_heartbeat_handler), - ) - .data(state.clone()); - let cli = poem::test::TestClient::new(app); - let resp = cli.post("/gateway/agents/hb-id/heartbeat").send().await; - assert_eq!(resp.0.status(), poem::http::StatusCode::NO_CONTENT); - let agents = state.joined_agents.read().await; - assert!(agents[0].last_seen > 0.0); - } - - #[tokio::test] - async fn heartbeat_unknown_id_returns_not_found() { - let state = make_test_state(); - let app = poem::Route::new() - .at( - "/gateway/agents/:id/heartbeat", - poem::post(gateway_heartbeat_handler), - ) - .data(state.clone()); - let cli = poem::test::TestClient::new(app); - let resp = cli - .post("/gateway/agents/no-such-id/heartbeat") - .send() - .await; - assert_eq!(resp.0.status(), poem::http::StatusCode::NOT_FOUND); - } - // ── Notification poller integration tests ──────────────────────────── #[tokio::test] diff --git a/server/src/http/gateway.rs b/server/src/http/gateway.rs index b5ccdaee..ac78e2cd 100644 --- a/server/src/http/gateway.rs +++ b/server/src/http/gateway.rs @@ -513,127 +513,6 @@ pub async fn gateway_generate_token_handler(state: Data<&Arc>) -> .body(Body::from(serde_json::to_vec(&body).unwrap_or_default())) } -/// Request body sent by a build agent when registering with the gateway. -#[derive(Deserialize)] -struct RegisterAgentRequest { - token: String, - label: String, - address: String, -} - -/// `POST /gateway/register` — build agent presents its join token and registers. -#[handler] -pub async fn gateway_register_agent_handler( - body: Body, - state: Data<&Arc>, -) -> Response { - let bytes = match body.into_bytes().await { - Ok(b) => b, - Err(_) => { - return Response::builder() - .status(StatusCode::BAD_REQUEST) - .body(Body::from("could not read request body")); - } - }; - - let req: RegisterAgentRequest = match serde_json::from_slice(&bytes) { - Ok(r) => r, - Err(_) => { - return Response::builder() - .status(StatusCode::BAD_REQUEST) - .body(Body::from("invalid JSON body")); - } - }; - - match gateway::register_agent(&state, &req.token, req.label, req.address).await { - Ok(agent) => { - let body = serde_json::to_vec(&agent).unwrap_or_default(); - Response::builder() - .status(StatusCode::OK) - .header("Content-Type", "application/json") - .body(Body::from(body)) - } - Err(_) => Response::builder() - .status(StatusCode::UNAUTHORIZED) - .body(Body::from("invalid or already-used join token")), - } -} - -/// `GET /gateway/agents` — list all registered build agents. -#[handler] -pub async fn gateway_list_agents_handler(state: Data<&Arc>) -> Response { - let agents = state.joined_agents.read().await.clone(); - let body = serde_json::to_vec(&agents).unwrap_or_default(); - Response::builder() - .status(StatusCode::OK) - .header("Content-Type", "application/json") - .body(Body::from(body)) -} - -/// `DELETE /gateway/agents/:id` — remove a registered build agent. -#[handler] -pub async fn gateway_remove_agent_handler( - PoemPath(id): PoemPath, - state: Data<&Arc>, -) -> Response { - if gateway::remove_agent(&state, &id).await { - Response::builder() - .status(StatusCode::NO_CONTENT) - .body(Body::empty()) - } else { - Response::builder() - .status(StatusCode::NOT_FOUND) - .body(Body::from("agent not found")) - } -} - -/// Request body for assigning an agent to a project. -#[derive(Deserialize)] -struct AssignAgentRequest { - project: Option, -} - -/// `POST /gateway/agents/:id/assign` — assign or unassign an agent to a project. -#[handler] -pub async fn gateway_assign_agent_handler( - PoemPath(id): PoemPath, - body: Json, - state: Data<&Arc>, -) -> Response { - match gateway::assign_agent(&state, &id, body.0.project).await { - Ok(agent) => { - let body = serde_json::to_vec(&agent).unwrap_or_default(); - Response::builder() - .status(StatusCode::OK) - .header("Content-Type", "application/json") - .body(Body::from(body)) - } - Err(gateway::Error::ProjectNotFound(msg)) => Response::builder() - .status(StatusCode::BAD_REQUEST) - .body(Body::from(msg)), - Err(_) => Response::builder() - .status(StatusCode::NOT_FOUND) - .body(Body::from("agent not found")), - } -} - -/// `POST /gateway/agents/:id/heartbeat` — update an agent's last-seen timestamp. -#[handler] -pub async fn gateway_heartbeat_handler( - PoemPath(id): PoemPath, - state: Data<&Arc>, -) -> Response { - if gateway::heartbeat_agent(&state, &id).await { - Response::builder() - .status(StatusCode::NO_CONTENT) - .body(Body::empty()) - } else { - Response::builder() - .status(StatusCode::NOT_FOUND) - .body(Body::from("agent not found")) - } -} - // ── Event-push WebSocket handler ──────────────────────────────────────────── /// Query parameters accepted on the `/gateway/events/push` WebSocket upgrade. diff --git a/server/src/service/gateway/io.rs b/server/src/service/gateway/io.rs index 7a5f89da..875859af 100644 --- a/server/src/service/gateway/io.rs +++ b/server/src/service/gateway/io.rs @@ -5,7 +5,6 @@ //! spawning the Matrix bot task, and the notification poller background task. use super::config::{GatewayConfig, ProjectEntry}; -use super::registration::JoinedAgent; pub use reqwest::Client; use serde_json::{Value, json}; use std::collections::{BTreeMap, HashMap}; @@ -20,16 +19,6 @@ pub fn load_config(path: &Path) -> Result { toml::from_str(&contents).map_err(|e| format!("invalid projects.toml: {e}")) } -/// Load persisted agents from `/gateway_agents.json`. -/// Returns an empty list if the file does not exist or cannot be parsed. -pub fn load_agents(config_dir: &Path) -> Vec { - let path = config_dir.join("gateway_agents.json"); - match std::fs::read(&path) { - Ok(data) => serde_json::from_slice(&data).unwrap_or_default(), - Err(_) => Vec::new(), - } -} - /// Persist the current projects map to `/projects.toml`. /// Silently ignores write errors or skips when `config_dir` is empty. pub async fn save_config(projects: &BTreeMap, config_dir: &Path) { @@ -45,18 +34,6 @@ pub async fn save_config(projects: &BTreeMap, config_dir: } } -/// Persist the current agent list to `/gateway_agents.json`. -/// Silently ignores write errors. -pub async fn save_agents(agents: &[JoinedAgent], config_dir: &Path) { - if config_dir == Path::new("") { - return; - } - let path = config_dir.join("gateway_agents.json"); - if let Ok(data) = serde_json::to_vec_pretty(agents) { - let _ = tokio::fs::write(&path, data).await; - } -} - // ── Bot config I/O ────────────────────────────────────────────────────────── /// Read the current raw bot.toml as key/value pairs for the configuration UI. diff --git a/server/src/service/gateway/mod.rs b/server/src/service/gateway/mod.rs index 64e93c28..a0c664ea 100644 --- a/server/src/service/gateway/mod.rs +++ b/server/src/service/gateway/mod.rs @@ -4,7 +4,6 @@ //! - `mod.rs` (this file) — public API, typed [`Error`], orchestration, `GatewayState` //! - `io.rs` — the ONLY place that performs side effects (filesystem, network, process spawn) //! - `config.rs` — pure config types and validation -//! - `registration.rs` — pure agent registration logic //! - `aggregation.rs` — pure cross-project pipeline formatting //! - `polling.rs` — pure notification event formatting @@ -12,7 +11,6 @@ pub mod aggregation; pub mod config; pub(crate) mod io; pub mod polling; -pub mod registration; pub use aggregation::format_aggregate_status_compact; pub use config::{GatewayConfig, ProjectEntry}; @@ -20,7 +18,6 @@ pub use io::{ fetch_all_project_pipeline_statuses, spawn_gateway_broadcaster_forwarder, spawn_gateway_notification_poller, }; -pub use registration::JoinedAgent; use io::Client; use std::collections::{BTreeMap, HashMap}; @@ -29,6 +26,8 @@ use std::sync::Arc; use tokio::sync::Mutex as TokioMutex; use tokio::sync::RwLock; +pub use crate::crdt_state::NodePresenceView; + // ── Status event broadcaster ──────────────────────────────────────────────── /// Capacity of the gateway status event broadcast channel. @@ -101,8 +100,6 @@ pub struct GatewayState { pub active_project: Arc>, /// HTTP client for proxying requests to project containers. pub client: Client, - /// Build agents that have joined this gateway. - pub joined_agents: Arc>>, /// One-time join tokens that have been issued but not yet consumed. pub(crate) pending_tokens: Arc>>, /// Directory containing `projects.toml` and the `.huskies/` subfolder. @@ -121,20 +118,18 @@ impl GatewayState { /// Create a new gateway state from a config and config directory. /// /// The first project in the config becomes the active project by default. - /// Previously registered agents are loaded from `gateway_agents.json`. + /// Agent registrations are stored in the CRDT nodes collection. pub fn new( gateway_config: GatewayConfig, config_dir: PathBuf, port: u16, ) -> Result { let first = config::validate_config(&gateway_config)?; - let agents = io::load_agents(&config_dir); let (event_tx, _) = tokio::sync::broadcast::channel(EVENT_CHANNEL_CAPACITY); Ok(Self { projects: Arc::new(RwLock::new(gateway_config.projects)), active_project: Arc::new(RwLock::new(first)), client: Client::new(), - joined_agents: Arc::new(RwLock::new(agents)), pending_tokens: Arc::new(RwLock::new(HashMap::new())), config_dir, port, @@ -187,82 +182,118 @@ pub async fn generate_join_token(state: &GatewayState) -> String { token } -/// Register a build agent with a join token. +/// Register a new build agent using a one-time join token. +/// +/// Validates and consumes the token, then writes the agent's node presence +/// and metadata to the CRDT collection. Returns the newly-created node view. +#[allow(dead_code)] pub async fn register_agent( state: &GatewayState, token: &str, label: String, address: String, -) -> Result { - // Validate and consume the token. - let mut tokens = state.pending_tokens.write().await; - if !tokens.contains_key(token) { - return Err(Error::DuplicateToken( - "invalid or already-used join token".into(), - )); +) -> Result { + { + let mut tokens = state.pending_tokens.write().await; + if !tokens.contains_key(token) { + return Err(Error::InvalidAgent( + "invalid or already-used join token".into(), + )); + } + tokens.remove(token); } - tokens.remove(token); - drop(tokens); + let node_id = uuid::Uuid::new_v4().to_string(); let now = chrono::Utc::now().timestamp() as f64; - let agent = registration::create_agent(uuid::Uuid::new_v4().to_string(), label, address, now); + let now_ms = chrono::Utc::now().timestamp_millis() as f64; + + crate::crdt_state::write_node_presence(&node_id, &address, now, true); + crate::crdt_state::write_node_metadata(&node_id, &label, None, now_ms); crate::slog!( - "[gateway] Agent '{}' registered (id={})", - agent.label, - agent.id + "[gateway] Registered agent '{label}' node_id={:.12}…", + &node_id ); - { - let mut agents = state.joined_agents.write().await; - agents.push(agent.clone()); - io::save_agents(&agents, &state.config_dir).await; - } - - Ok(agent) + crate::crdt_state::read_all_node_presence() + .unwrap_or_default() + .into_iter() + .find(|n| n.node_id == node_id) + .ok_or_else(|| Error::Upstream("node write did not persist".into())) } -/// Remove a registered agent by ID. Returns `true` if found and removed. -pub async fn remove_agent(state: &GatewayState, id: &str) -> bool { - let mut agents = state.joined_agents.write().await; - let removed = registration::remove_agent(&mut agents, id); - if removed { - io::save_agents(&agents, &state.config_dir).await; - crate::slog!("[gateway] Removed agent id={id}"); - } - removed +/// Tombstone a registered agent in the CRDT (set `alive = false`). +/// +/// Returns `true` if the node was found and tombstoned. +#[allow(dead_code)] +pub fn remove_agent(node_id: &str) -> bool { + let nodes = crate::crdt_state::read_all_node_presence().unwrap_or_default(); + let Some(node) = nodes.iter().find(|n| n.node_id == node_id) else { + return false; + }; + let now = chrono::Utc::now().timestamp() as f64; + crate::crdt_state::write_node_presence(node_id, &node.address, now, false); + true } -/// Assign or unassign an agent to a project. +/// Assign (or unassign) an agent to a project in the CRDT. +/// +/// Validates that the project exists in the gateway config (when assigning), +/// then writes the updated `assigned_project` field to the CRDT. +#[allow(dead_code)] pub async fn assign_agent( state: &GatewayState, - id: &str, + node_id: &str, project: Option, -) -> Result { - let project_clean = project.and_then(|p| if p.is_empty() { None } else { Some(p) }); - - let updated = { +) -> Result { + if let Some(ref p) = project { let projects = state.projects.read().await; - let mut agents = state.joined_agents.write().await; - registration::assign_agent(&mut agents, id, project_clean, &projects)? - }; + if !projects.contains_key(p.as_str()) { + return Err(Error::ProjectNotFound(format!("unknown project '{p}'"))); + } + } - crate::slog!( - "[gateway] Agent '{}' (id={}) assigned to {:?}", - updated.label, - updated.id, - updated.assigned_project + let nodes = crate::crdt_state::read_all_node_presence().unwrap_or_default(); + let node = nodes + .iter() + .find(|n| n.node_id == node_id) + .ok_or_else(|| Error::InvalidAgent(format!("agent not found: {node_id}")))?; + + let now_ms = chrono::Utc::now().timestamp_millis() as f64; + crate::crdt_state::write_node_metadata( + node_id, + node.label.as_deref().unwrap_or(""), + project.as_deref(), + now_ms, ); - let agents = state.joined_agents.read().await.clone(); - io::save_agents(&agents, &state.config_dir).await; - Ok(updated) + + crate::crdt_state::read_all_node_presence() + .unwrap_or_default() + .into_iter() + .find(|n| n.node_id == node_id) + .ok_or_else(|| Error::Upstream("node write did not persist".into())) } -/// Update an agent's heartbeat. Returns `true` if found. -pub async fn heartbeat_agent(state: &GatewayState, id: &str) -> bool { +/// Update an agent's heartbeat via CRDT. Returns `true` if the node was found. +#[allow(dead_code)] +pub fn heartbeat_agent(id: &str) -> bool { let now = chrono::Utc::now().timestamp() as f64; - let mut agents = state.joined_agents.write().await; - registration::heartbeat(&mut agents, id, now) + let nodes = crate::crdt_state::read_all_node_presence().unwrap_or_default(); + let Some(node) = nodes.iter().find(|n| n.node_id == id) else { + return false; + }; + crate::crdt_state::write_node_presence(id, &node.address, now, node.alive); + true +} + +/// List all registered build agents from the CRDT nodes collection. +#[allow(dead_code)] +pub fn list_agents() -> Vec { + crate::crdt_state::read_all_node_presence() + .unwrap_or_default() + .into_iter() + .filter(|n| n.alive) + .collect() } /// Add a new project to the gateway config. @@ -561,16 +592,18 @@ mod tests { } #[tokio::test] - async fn generate_and_register_agent() { + async fn register_agent_consumes_token_and_writes_crdt() { + crate::crdt_state::init_for_test(); let config = make_config(&[("test", "http://test:3001")]); let state = GatewayState::new(config, PathBuf::new(), 3000).unwrap(); let token = generate_join_token(&state).await; - let agent = register_agent(&state, &token, "test-agent".into(), "ws://a".into()) + let node = register_agent(&state, &token, "test-agent".into(), "ws://a".into()) .await .unwrap(); - assert_eq!(agent.label, "test-agent"); + assert_eq!(node.label.as_deref(), Some("test-agent")); assert!(state.pending_tokens.read().await.is_empty()); - assert_eq!(state.joined_agents.read().await.len(), 1); + let agents = list_agents(); + assert!(agents.iter().any(|n| n.node_id == node.node_id)); } #[tokio::test] @@ -582,31 +615,29 @@ mod tests { } #[tokio::test] - async fn remove_agent_success() { + async fn remove_agent_tombstones_crdt_node() { + crate::crdt_state::init_for_test(); let config = make_config(&[("test", "http://test:3001")]); let state = GatewayState::new(config, PathBuf::new(), 3000).unwrap(); let token = generate_join_token(&state).await; - let agent = register_agent(&state, &token, "a".into(), "ws://a".into()) + let node = register_agent(&state, &token, "a".into(), "ws://a".into()) .await .unwrap(); - assert!(remove_agent(&state, &agent.id).await); - assert!(state.joined_agents.read().await.is_empty()); + assert!(remove_agent(&node.node_id)); + let alive = list_agents(); + assert!(!alive.iter().any(|n| n.node_id == node.node_id)); } #[tokio::test] - async fn heartbeat_agent_updates_timestamp() { + async fn heartbeat_agent_returns_true_for_known_node() { + crate::crdt_state::init_for_test(); let config = make_config(&[("test", "http://test:3001")]); let state = GatewayState::new(config, PathBuf::new(), 3000).unwrap(); let token = generate_join_token(&state).await; - let agent = register_agent(&state, &token, "a".into(), "ws://a".into()) + let node = register_agent(&state, &token, "a".into(), "ws://a".into()) .await .unwrap(); - let old_ts = agent.last_seen; - // Small sleep to ensure timestamp differs. - tokio::time::sleep(std::time::Duration::from_millis(10)).await; - assert!(heartbeat_agent(&state, &agent.id).await); - let agents = state.joined_agents.read().await; - assert!(agents[0].last_seen >= old_ts); + assert!(heartbeat_agent(&node.node_id)); } #[tokio::test] diff --git a/server/src/service/gateway/registration.rs b/server/src/service/gateway/registration.rs deleted file mode 100644 index 6f00f1de..00000000 --- a/server/src/service/gateway/registration.rs +++ /dev/null @@ -1,165 +0,0 @@ -//! Gateway agent registration — pure logic for managing build agents. -//! -//! Contains `JoinedAgent` and functions that validate and manipulate agent -//! state in memory. All persistence (disk I/O) lives in `io.rs`. - -use serde::{Deserialize, Serialize}; -use std::collections::BTreeMap; - -use super::config::ProjectEntry; - -/// A build agent that has registered with this gateway. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct JoinedAgent { - /// Unique ID assigned by the gateway on registration. - pub id: String, - /// Human-readable label provided by the agent (e.g. `build-agent-abc123`). - pub label: String, - /// The agent's CRDT-sync WebSocket address (e.g. `ws://host:3001/crdt-sync`). - pub address: String, - /// Unix timestamp when the agent registered. - pub registered_at: f64, - /// Unix timestamp of the last heartbeat from this agent. - #[serde(default)] - pub last_seen: f64, - /// Project this agent is assigned to, if any. - #[serde(default, skip_serializing_if = "Option::is_none")] - pub assigned_project: Option, -} - -/// Create a new `JoinedAgent` from registration data. -pub fn create_agent(id: String, label: String, address: String, now: f64) -> JoinedAgent { - JoinedAgent { - id, - label, - address, - registered_at: now, - last_seen: now, - assigned_project: None, - } -} - -/// Remove an agent by ID from the list. Returns `true` if found and removed. -pub fn remove_agent(agents: &mut Vec, id: &str) -> bool { - let before = agents.len(); - agents.retain(|a| a.id != id); - agents.len() < before -} - -/// Assign (or unassign) an agent to a project. -/// -/// Returns the updated agent on success, or an error if the agent or project -/// is not found. -pub fn assign_agent( - agents: &mut [JoinedAgent], - id: &str, - project: Option, - projects: &BTreeMap, -) -> Result { - // Validate project exists if assigning. - if let Some(ref p) = project - && !projects.contains_key(p.as_str()) - { - return Err(super::Error::ProjectNotFound(format!( - "unknown project '{p}'" - ))); - } - - match agents.iter_mut().find(|a| a.id == id) { - None => Err(super::Error::InvalidAgent(format!("agent not found: {id}"))), - Some(a) => { - a.assigned_project = project; - Ok(a.clone()) - } - } -} - -/// Update an agent's last-seen timestamp. Returns `true` if the agent was found. -pub fn heartbeat(agents: &mut [JoinedAgent], id: &str, now: f64) -> bool { - match agents.iter_mut().find(|a| a.id == id) { - None => false, - Some(a) => { - a.last_seen = now; - true - } - } -} - -// ── Tests ──────────────────────────────────────────────────────────────────── - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn create_agent_sets_fields() { - let agent = create_agent("id-1".into(), "lbl".into(), "ws://a".into(), 100.0); - assert_eq!(agent.id, "id-1"); - assert_eq!(agent.label, "lbl"); - assert_eq!(agent.address, "ws://a"); - assert_eq!(agent.registered_at, 100.0); - assert_eq!(agent.last_seen, 100.0); - assert!(agent.assigned_project.is_none()); - } - - #[test] - fn remove_agent_by_id() { - let mut agents = vec![ - create_agent("a".into(), "A".into(), "ws://a".into(), 0.0), - create_agent("b".into(), "B".into(), "ws://b".into(), 0.0), - ]; - assert!(remove_agent(&mut agents, "a")); - assert_eq!(agents.len(), 1); - assert_eq!(agents[0].id, "b"); - } - - #[test] - fn remove_agent_missing_returns_false() { - let mut agents = vec![]; - assert!(!remove_agent(&mut agents, "x")); - } - - #[test] - fn assign_agent_to_valid_project() { - let mut projects = BTreeMap::new(); - projects.insert( - "proj".into(), - ProjectEntry { - url: "http://p".into(), - }, - ); - let mut agents = vec![create_agent("a".into(), "A".into(), "ws://a".into(), 0.0)]; - let result = assign_agent(&mut agents, "a", Some("proj".into()), &projects); - assert!(result.is_ok()); - assert_eq!(result.unwrap().assigned_project, Some("proj".into())); - } - - #[test] - fn assign_agent_to_unknown_project_fails() { - let projects = BTreeMap::new(); - let mut agents = vec![create_agent("a".into(), "A".into(), "ws://a".into(), 0.0)]; - let result = assign_agent(&mut agents, "a", Some("nope".into()), &projects); - assert!(result.is_err()); - } - - #[test] - fn assign_agent_unknown_id_fails() { - let projects = BTreeMap::new(); - let mut agents: Vec = vec![]; - let result = assign_agent(&mut agents, "x", None, &projects); - assert!(result.is_err()); - } - - #[test] - fn heartbeat_updates_last_seen() { - let mut agents = vec![create_agent("a".into(), "A".into(), "ws://a".into(), 0.0)]; - assert!(heartbeat(&mut agents, "a", 999.0)); - assert_eq!(agents[0].last_seen, 999.0); - } - - #[test] - fn heartbeat_unknown_id_returns_false() { - let mut agents: Vec = vec![]; - assert!(!heartbeat(&mut agents, "x", 1.0)); - } -}