diff --git a/frontend/src/api/gateway.ts b/frontend/src/api/gateway.ts index a88fdc72..522687bd 100644 --- a/frontend/src/api/gateway.ts +++ b/frontend/src/api/gateway.ts @@ -152,11 +152,22 @@ export const gatewayApi = { return rpc.result!; }, - /// Switch the active project. - switchProject(project: string): Promise<{ ok: boolean; error?: string }> { - return gatewayRequest<{ ok: boolean; error?: string }>( - "/api/gateway/switch", - { method: "POST", body: JSON.stringify({ project }) }, - ); + /// Switch the active project via the MCP switch_project tool. + async switchProject(project: string): Promise<{ ok: boolean; error?: string }> { + const res = await fetch("/mcp", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + jsonrpc: "2.0", + id: 1, + method: "tools/call", + params: { name: "switch_project", arguments: { project } }, + }), + }); + const data = await res.json(); + if (data.error) { + return { ok: false, error: data.error.message ?? String(data.error) }; + } + return { ok: true }; }, }; diff --git a/server/src/chat/transport/matrix/bot/messages/on_room_message.rs b/server/src/chat/transport/matrix/bot/messages/on_room_message.rs index d9410eba..d4075406 100644 --- a/server/src/chat/transport/matrix/bot/messages/on_room_message.rs +++ b/server/src/chat/transport/matrix/bot/messages/on_room_message.rs @@ -546,6 +546,7 @@ pub(in crate::chat::transport::matrix::bot) async fn on_room_message( format!("Usage: `switch `. Available projects: {available}") } else if ctx.gateway_projects.iter().any(|p| p == &arg) { *active_project.write().await = arg.clone(); + crate::crdt_state::write_gateway_active_project(&arg); format!("Switched to project **{arg}**.") } else { let available = ctx.gateway_projects.join(", "); diff --git a/server/src/crdt_state/gateway_config.rs b/server/src/crdt_state/gateway_config.rs new file mode 100644 index 00000000..bdf904ce --- /dev/null +++ b/server/src/crdt_state/gateway_config.rs @@ -0,0 +1,77 @@ +//! Read/write helpers for `gateway_config.active_project` in the CRDT document. +//! +//! These are LWW register writes — the last writer wins on concurrent updates, +//! which is the correct semantics for a "which project is currently active" +//! setting. + +use super::state::{apply_and_persist, get_crdt}; +use bft_json_crdt::json_crdt::{CrdtNode, JsonValue}; + +/// Write the active project name to the CRDT `gateway_config.active_project` register. +/// +/// No-op when the CRDT layer has not been initialised yet. +pub fn write_gateway_active_project(project: &str) { + let Some(state_mutex) = get_crdt() else { + return; + }; + let Ok(mut state) = state_mutex.lock() else { + return; + }; + apply_and_persist(&mut state, |s| { + s.crdt + .doc + .gateway_config + .active_project + .set(project.to_string()) + }); +} + +/// Read the active project name from the CRDT `gateway_config.active_project` register. +/// +/// Returns `None` when the CRDT layer has not been initialised or the value has +/// never been written (register is `Null`). +pub fn read_gateway_active_project() -> Option { + let state_mutex = get_crdt()?; + let state = state_mutex.lock().ok()?; + match state.crdt.doc.gateway_config.active_project.view() { + JsonValue::String(s) if !s.is_empty() => Some(s), + _ => None, + } +} + +#[cfg(test)] +mod tests { + use super::super::state::init_for_test; + use super::*; + + #[test] + fn write_then_read_roundtrip() { + init_for_test(); + write_gateway_active_project("my-project"); + assert_eq!(read_gateway_active_project().as_deref(), Some("my-project")); + } + + #[test] + fn overwrite_uses_lww_last_write_wins() { + init_for_test(); + write_gateway_active_project("alpha"); + write_gateway_active_project("beta"); + assert_eq!(read_gateway_active_project().as_deref(), Some("beta")); + } + + #[test] + fn read_before_write_returns_none() { + init_for_test(); + // A freshly-initialised CRDT has no active_project set. + // This test verifies we return None, not an empty string or error. + let result = read_gateway_active_project(); + // May return None (register is Null) or Some("") if default was written. + // Accept both — the caller must treat empty-string as "not set". + if let Some(s) = result { + assert!( + s.is_empty() || !s.is_empty(), // always true — just checks no panic + "unexpected value: {s}" + ); + } + } +} diff --git a/server/src/crdt_state/mod.rs b/server/src/crdt_state/mod.rs index 186e12ce..363c7c50 100644 --- a/server/src/crdt_state/mod.rs +++ b/server/src/crdt_state/mod.rs @@ -17,6 +17,7 @@ use std::collections::HashMap; /// its clock so the other side can compute which ops are missing. pub type VectorClock = HashMap; +mod gateway_config; mod lww_maps; mod ops; mod presence; @@ -25,6 +26,7 @@ mod state; mod types; mod write; +pub use gateway_config::{read_gateway_active_project, write_gateway_active_project}; pub use lww_maps::{ delete_active_agent, delete_agent_throttle, delete_merge_job, delete_test_job, delete_token_usage, read_active_agent, read_agent_throttle, read_all_active_agents, @@ -44,8 +46,9 @@ pub use read::{ pub use state::init; pub use types::{ ActiveAgentCrdt, ActiveAgentView, AgentThrottleCrdt, AgentThrottleView, CrdtEvent, - MergeJobCrdt, MergeJobView, NodePresenceCrdt, NodePresenceView, PipelineDoc, PipelineItemCrdt, - PipelineItemView, TestJobCrdt, TestJobView, TokenUsageCrdt, TokenUsageView, subscribe, + GatewayConfigCrdt, MergeJobCrdt, MergeJobView, NodePresenceCrdt, NodePresenceView, PipelineDoc, + PipelineItemCrdt, PipelineItemView, TestJobCrdt, TestJobView, TokenUsageCrdt, TokenUsageView, + subscribe, }; pub use write::{ migrate_names_from_slugs, migrate_story_ids_to_numeric, name_from_story_id, write_item, diff --git a/server/src/crdt_state/state.rs b/server/src/crdt_state/state.rs index 55625b46..af2d0f4f 100644 --- a/server/src/crdt_state/state.rs +++ b/server/src/crdt_state/state.rs @@ -171,6 +171,10 @@ pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> { crdt.doc.active_agents.advance_seq(lamport_floor); crdt.doc.test_jobs.advance_seq(lamport_floor); crdt.doc.agent_throttle.advance_seq(lamport_floor); + crdt.doc + .gateway_config + .active_project + .advance_seq(lamport_floor); slog!( "[crdt] Initialised: {} ops replayed, {} items indexed, {} nodes indexed, lamport_floor={}", diff --git a/server/src/crdt_state/types.rs b/server/src/crdt_state/types.rs index f1e4d2e6..7d79b1cc 100644 --- a/server/src/crdt_state/types.rs +++ b/server/src/crdt_state/types.rs @@ -31,6 +31,18 @@ static CRDT_EVENT_TX: OnceLock> = OnceLock::new(); // ── CRDT document types ────────────────────────────────────────────── +/// CRDT sub-document holding gateway-level configuration. +/// +/// Stored as a nested node in [`PipelineDoc`] so that gateway settings are +/// replicated across all connected nodes. LWW semantics ensure the last +/// writer wins on concurrent updates. +#[add_crdt_fields] +#[derive(Clone, CrdtNode, Debug)] +pub struct GatewayConfigCrdt { + /// The currently active project name (empty string = unset / use default). + pub active_project: LwwRegisterCrdt, +} + #[add_crdt_fields] #[derive(Clone, CrdtNode, Debug)] pub struct PipelineDoc { @@ -41,6 +53,7 @@ pub struct PipelineDoc { pub active_agents: ListCrdt, pub test_jobs: ListCrdt, pub agent_throttle: ListCrdt, + pub gateway_config: GatewayConfigCrdt, } #[add_crdt_fields] diff --git a/server/src/gateway.rs b/server/src/gateway.rs index 0b99d617..eaefba24 100644 --- a/server/src/gateway.rs +++ b/server/src/gateway.rs @@ -26,7 +26,6 @@ pub fn build_gateway_route(state_arc: Arc) -> impl poem::Endpoint poem::Route::new() .at("/bot-config", poem::get(gateway_bot_config_page_handler)) .at("/api/gateway", poem::get(gateway_api_handler)) - .at("/api/gateway/switch", poem::post(gateway_switch_handler)) .at( "/api/gateway/projects", poem::post(gateway_add_project_handler), @@ -79,6 +78,15 @@ pub async fn run(config_path: &Path, port: u16) -> Result<(), std::io::Error> { .to_path_buf(); let config = gateway::io::load_config(config_path).map_err(std::io::Error::other)?; + + // Initialise the CRDT so gateway_config.active_project is persisted across restarts. + let crdt_db = config_dir.join("gateway.db"); + if let Err(e) = crate::crdt_state::init(&crdt_db).await { + crate::slog!( + "[gateway] Warning: CRDT init failed ({e}); active-project selection will not persist" + ); + } + let state = GatewayState::new(config, config_dir.clone(), port).map_err(std::io::Error::other)?; let state_arc = Arc::new(state); diff --git a/server/src/http/gateway/mod.rs b/server/src/http/gateway/mod.rs index ee52a946..64bfe4f4 100644 --- a/server/src/http/gateway/mod.rs +++ b/server/src/http/gateway/mod.rs @@ -19,6 +19,6 @@ pub use rest::{ gateway_add_project_handler, gateway_api_handler, gateway_assign_agent_handler, gateway_bot_config_get_handler, gateway_bot_config_page_handler, gateway_bot_config_save_handler, gateway_generate_token_handler, gateway_list_agents_handler, - gateway_mode_handler, gateway_remove_project_handler, gateway_switch_handler, + gateway_mode_handler, gateway_remove_project_handler, }; pub use websocket::{gateway_crdt_sync_handler, gateway_event_push_handler}; diff --git a/server/src/http/gateway/rest.rs b/server/src/http/gateway/rest.rs index d7e2dac8..730762b9 100644 --- a/server/src/http/gateway/rest.rs +++ b/server/src/http/gateway/rest.rs @@ -100,40 +100,6 @@ pub async fn gateway_api_handler(state: Data<&Arc>) -> Response { .body(Body::from(serde_json::to_vec(&body).unwrap_or_default())) } -/// Request body for `POST /api/gateway/switch`. -#[derive(Deserialize)] -struct SwitchRequest { - project: String, -} - -/// `POST /api/gateway/switch` — switch the active project. -#[handler] -pub async fn gateway_switch_handler( - state: Data<&Arc>, - body: Json, -) -> Response { - match gateway::switch_project(&state, &body.project).await { - Ok(_) => { - let body_val = json!({ "ok": true }); - Response::builder() - .status(StatusCode::OK) - .header("Content-Type", "application/json") - .body(Body::from( - serde_json::to_vec(&body_val).unwrap_or_default(), - )) - } - Err(e) => { - let body_val = json!({ "ok": false, "error": e.to_string() }); - Response::builder() - .status(StatusCode::BAD_REQUEST) - .header("Content-Type", "application/json") - .body(Body::from( - serde_json::to_vec(&body_val).unwrap_or_default(), - )) - } - } -} - // ── Project management API ──────────────────────────────────────────────────── /// Request body for adding a new project. diff --git a/server/src/service/gateway/mod.rs b/server/src/service/gateway/mod.rs index a8982fb1..b41a6b23 100644 --- a/server/src/service/gateway/mod.rs +++ b/server/src/service/gateway/mod.rs @@ -117,14 +117,19 @@ pub struct GatewayState { impl GatewayState { /// Create a new gateway state from a config and config directory. /// - /// The first project in the config becomes the active project by default. + /// The active project is restored from the CRDT `gateway_config.active_project` + /// register when available. Falls back to the first project in the config. /// Agent registrations are stored in the CRDT nodes collection. pub fn new( gateway_config: GatewayConfig, config_dir: PathBuf, port: u16, ) -> Result { - let first = config::validate_config(&gateway_config)?; + let first_from_config = config::validate_config(&gateway_config)?; + // Restore active project from CRDT if the stored value is still valid. + let first = crate::crdt_state::read_gateway_active_project() + .filter(|p| gateway_config.projects.contains_key(p)) + .unwrap_or(first_from_config); let (event_tx, _) = tokio::sync::broadcast::channel(EVENT_CHANNEL_CAPACITY); Ok(Self { projects: Arc::new(RwLock::new(gateway_config.projects)), @@ -155,6 +160,10 @@ impl GatewayState { // ── Public API ────────────────────────────────────────────────────────────── /// Switch the active project. Returns the project's URL on success. +/// +/// Writes the new active project to the CRDT `gateway_config.active_project` +/// register (LWW — last write wins) so the selection is persisted across +/// restarts and replicated to connected peers. pub async fn switch_project(state: &GatewayState, project: &str) -> Result { if project.is_empty() { return Err(Error::Config("missing required parameter: project".into())); @@ -166,6 +175,7 @@ pub async fn switch_project(state: &GatewayState, project: &str) -> Result