chore: switch mergemaster to opus and add cargo fmt guidance
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
+137
-24
@@ -17,7 +17,7 @@ use serde::{Deserialize, Serialize};
|
||||
use serde_json::{Value, json};
|
||||
use std::collections::BTreeMap;
|
||||
use std::collections::HashMap;
|
||||
use std::path::Path;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::RwLock;
|
||||
use uuid::Uuid;
|
||||
@@ -64,6 +64,9 @@ pub struct JoinedAgent {
|
||||
pub address: String,
|
||||
/// Unix timestamp when the agent registered.
|
||||
pub registered_at: f64,
|
||||
/// Project this agent is assigned to, if any.
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub assigned_project: Option<String>,
|
||||
}
|
||||
|
||||
/// A one-time join token that has been generated but not yet consumed.
|
||||
@@ -80,6 +83,14 @@ struct RegisterAgentRequest {
|
||||
address: String,
|
||||
}
|
||||
|
||||
/// Request body for assigning or reassigning an agent to a project.
|
||||
///
|
||||
/// Send `{"project": "my-project"}` to assign, or `{"project": null}` to unassign.
|
||||
#[derive(Deserialize)]
|
||||
struct AssignAgentRequest {
|
||||
project: Option<String>,
|
||||
}
|
||||
|
||||
// ── Gateway state ────────────────────────────────────────────────────
|
||||
|
||||
/// Shared gateway state threaded through HTTP handlers.
|
||||
@@ -95,22 +106,51 @@ pub struct GatewayState {
|
||||
pub joined_agents: Arc<RwLock<Vec<JoinedAgent>>>,
|
||||
/// One-time join tokens that have been issued but not yet consumed.
|
||||
pending_tokens: Arc<RwLock<HashMap<String, PendingToken>>>,
|
||||
/// Directory containing `projects.toml`, used for persisting agent data.
|
||||
pub config_dir: PathBuf,
|
||||
}
|
||||
|
||||
/// Load persisted agents from `<config_dir>/gateway_agents.json`.
|
||||
/// Returns an empty list if the file does not exist or cannot be parsed.
|
||||
fn load_agents(config_dir: &Path) -> Vec<JoinedAgent> {
|
||||
let path = config_dir.join("gateway_agents.json");
|
||||
match std::fs::read(&path) {
|
||||
Ok(data) => serde_json::from_slice(&data).unwrap_or_default(),
|
||||
Err(_) => Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Persist the current agent list to `<config_dir>/gateway_agents.json`.
|
||||
/// Silently ignores write errors (e.g. read-only filesystem or empty path).
|
||||
async fn save_agents(agents: &[JoinedAgent], config_dir: &Path) {
|
||||
if config_dir == Path::new("") {
|
||||
return;
|
||||
}
|
||||
let path = config_dir.join("gateway_agents.json");
|
||||
if let Ok(data) = serde_json::to_vec_pretty(agents) {
|
||||
let _ = tokio::fs::write(&path, data).await;
|
||||
}
|
||||
}
|
||||
|
||||
impl GatewayState {
|
||||
/// Create a new gateway state from a config. The first project in the config
|
||||
/// becomes the active project by default.
|
||||
pub fn new(config: GatewayConfig) -> Result<Self, String> {
|
||||
/// Create a new gateway state from a config and config directory.
|
||||
///
|
||||
/// The first project in the config becomes the active project by default.
|
||||
/// Previously registered agents are loaded from `gateway_agents.json` in
|
||||
/// `config_dir` if the file exists.
|
||||
pub fn new(config: GatewayConfig, config_dir: PathBuf) -> Result<Self, String> {
|
||||
if config.projects.is_empty() {
|
||||
return Err("projects.toml must define at least one project".to_string());
|
||||
}
|
||||
let first = config.projects.keys().next().unwrap().clone();
|
||||
let agents = load_agents(&config_dir);
|
||||
Ok(Self {
|
||||
config,
|
||||
active_project: Arc::new(RwLock::new(first)),
|
||||
client: Client::new(),
|
||||
joined_agents: Arc::new(RwLock::new(Vec::new())),
|
||||
joined_agents: Arc::new(RwLock::new(agents)),
|
||||
pending_tokens: Arc::new(RwLock::new(HashMap::new())),
|
||||
config_dir,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -622,6 +662,7 @@ pub async fn gateway_register_agent_handler(
|
||||
label: req.label,
|
||||
address: req.address,
|
||||
registered_at: chrono::Utc::now().timestamp() as f64,
|
||||
assigned_project: None,
|
||||
};
|
||||
|
||||
crate::slog!(
|
||||
@@ -630,7 +671,11 @@ pub async fn gateway_register_agent_handler(
|
||||
agent.id
|
||||
);
|
||||
|
||||
state.joined_agents.write().await.push(agent.clone());
|
||||
{
|
||||
let mut agents = state.joined_agents.write().await;
|
||||
agents.push(agent.clone());
|
||||
save_agents(&agents, &state.config_dir).await;
|
||||
}
|
||||
|
||||
let body = serde_json::to_vec(&agent).unwrap_or_default();
|
||||
Response::builder()
|
||||
@@ -656,11 +701,16 @@ pub async fn gateway_remove_agent_handler(
|
||||
PoemPath(id): PoemPath<String>,
|
||||
state: Data<&Arc<GatewayState>>,
|
||||
) -> Response {
|
||||
let mut agents = state.joined_agents.write().await;
|
||||
let before = agents.len();
|
||||
agents.retain(|a| a.id != id);
|
||||
let removed = agents.len() < before;
|
||||
drop(agents);
|
||||
let removed = {
|
||||
let mut agents = state.joined_agents.write().await;
|
||||
let before = agents.len();
|
||||
agents.retain(|a| a.id != id);
|
||||
let removed = agents.len() < before;
|
||||
if removed {
|
||||
save_agents(&agents, &state.config_dir).await;
|
||||
}
|
||||
removed
|
||||
};
|
||||
|
||||
if removed {
|
||||
crate::slog!("[gateway] Removed agent id={id}");
|
||||
@@ -674,6 +724,63 @@ pub async fn gateway_remove_agent_handler(
|
||||
}
|
||||
}
|
||||
|
||||
/// `POST /gateway/agents/:id/assign` — assign or unassign an agent to a project.
|
||||
///
|
||||
/// Body: `{ "project": "my-project" }` to assign, or `{ "project": null }` to unassign.
|
||||
/// Returns the updated `JoinedAgent` on success. The assignment is persisted to disk
|
||||
/// so it survives gateway restarts.
|
||||
#[handler]
|
||||
pub async fn gateway_assign_agent_handler(
|
||||
PoemPath(id): PoemPath<String>,
|
||||
body: Json<AssignAgentRequest>,
|
||||
state: Data<&Arc<GatewayState>>,
|
||||
) -> Response {
|
||||
let project = body
|
||||
.0
|
||||
.project
|
||||
.and_then(|p| if p.is_empty() { None } else { Some(p) });
|
||||
|
||||
if let Some(ref p) = project
|
||||
&& !state.config.projects.contains_key(p.as_str())
|
||||
{
|
||||
return Response::builder()
|
||||
.status(StatusCode::BAD_REQUEST)
|
||||
.body(Body::from(format!("unknown project '{p}'")));
|
||||
}
|
||||
|
||||
let updated = {
|
||||
let mut agents = state.joined_agents.write().await;
|
||||
match agents.iter_mut().find(|a| a.id == id) {
|
||||
None => None,
|
||||
Some(a) => {
|
||||
a.assigned_project = project;
|
||||
Some(a.clone())
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
match updated {
|
||||
None => Response::builder()
|
||||
.status(StatusCode::NOT_FOUND)
|
||||
.body(Body::from("agent not found")),
|
||||
Some(agent) => {
|
||||
crate::slog!(
|
||||
"[gateway] Agent '{}' (id={}) assigned to {:?}",
|
||||
agent.label,
|
||||
agent.id,
|
||||
agent.assigned_project
|
||||
);
|
||||
let agents = state.joined_agents.read().await.clone();
|
||||
save_agents(&agents, &state.config_dir).await;
|
||||
let body = serde_json::to_vec(&agent).unwrap_or_default();
|
||||
Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
.header("Content-Type", "application/json")
|
||||
.body(Body::from(body))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ── Health aggregation endpoint ──────────────────────────────────────
|
||||
|
||||
/// HTTP GET `/health` handler for the gateway — aggregates health from all projects.
|
||||
@@ -948,8 +1055,14 @@ pub async fn gateway_switch_handler(
|
||||
|
||||
/// Start the gateway HTTP server. This is the entry point when `--gateway` is used.
|
||||
pub async fn run(config_path: &Path, port: u16) -> Result<(), std::io::Error> {
|
||||
// Locate the gateway config directory (parent of `projects.toml`).
|
||||
let config_dir = config_path
|
||||
.parent()
|
||||
.unwrap_or(std::path::Path::new("."))
|
||||
.to_path_buf();
|
||||
|
||||
let config = GatewayConfig::load(config_path).map_err(std::io::Error::other)?;
|
||||
let state = GatewayState::new(config).map_err(std::io::Error::other)?;
|
||||
let state = GatewayState::new(config, config_dir.clone()).map_err(std::io::Error::other)?;
|
||||
let state_arc = Arc::new(state);
|
||||
|
||||
let active = state_arc.active_project.read().await.clone();
|
||||
@@ -965,12 +1078,6 @@ pub async fn run(config_path: &Path, port: u16) -> Result<(), std::io::Error> {
|
||||
.join(", ")
|
||||
);
|
||||
|
||||
// Locate the gateway config directory (parent of `projects.toml`).
|
||||
let config_dir = config_path
|
||||
.parent()
|
||||
.unwrap_or(std::path::Path::new("."))
|
||||
.to_path_buf();
|
||||
|
||||
// Write `.mcp.json` so that the gateway's Matrix bot's Claude Code CLI
|
||||
// connects to this gateway's MCP endpoint (which proxies to the active project).
|
||||
if let Err(e) = write_gateway_mcp_json(&config_dir, port) {
|
||||
@@ -1010,6 +1117,10 @@ pub async fn run(config_path: &Path, port: u16) -> Result<(), std::io::Error> {
|
||||
"/gateway/agents/:id",
|
||||
poem::delete(gateway_remove_agent_handler),
|
||||
)
|
||||
.at(
|
||||
"/gateway/agents/:id/assign",
|
||||
poem::post(gateway_assign_agent_handler),
|
||||
)
|
||||
// Serve the embedded React frontend so the gateway has a UI.
|
||||
.at(
|
||||
"/assets/*path",
|
||||
@@ -1127,7 +1238,7 @@ url = "http://localhost:3002"
|
||||
let config = GatewayConfig {
|
||||
projects: BTreeMap::new(),
|
||||
};
|
||||
assert!(GatewayState::new(config).is_err());
|
||||
assert!(GatewayState::new(config, PathBuf::new()).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -1146,7 +1257,7 @@ url = "http://localhost:3002"
|
||||
},
|
||||
);
|
||||
let config = GatewayConfig { projects };
|
||||
let state = GatewayState::new(config).unwrap();
|
||||
let state = GatewayState::new(config, PathBuf::new()).unwrap();
|
||||
let active = state.active_project.blocking_read().clone();
|
||||
assert_eq!(active, "alpha"); // BTreeMap sorts alphabetically.
|
||||
}
|
||||
@@ -1179,7 +1290,7 @@ url = "http://localhost:3002"
|
||||
},
|
||||
);
|
||||
let config = GatewayConfig { projects };
|
||||
let state = GatewayState::new(config).unwrap();
|
||||
let state = GatewayState::new(config, PathBuf::new()).unwrap();
|
||||
|
||||
let params = json!({ "arguments": { "project": "beta" } });
|
||||
let resp = handle_switch_project(¶ms, &state).await;
|
||||
@@ -1199,7 +1310,7 @@ url = "http://localhost:3002"
|
||||
},
|
||||
);
|
||||
let config = GatewayConfig { projects };
|
||||
let state = GatewayState::new(config).unwrap();
|
||||
let state = GatewayState::new(config, PathBuf::new()).unwrap();
|
||||
|
||||
let params = json!({ "arguments": { "project": "nonexistent" } });
|
||||
let resp = handle_switch_project(¶ms, &state).await;
|
||||
@@ -1216,7 +1327,7 @@ url = "http://localhost:3002"
|
||||
},
|
||||
);
|
||||
let config = GatewayConfig { projects };
|
||||
let state = GatewayState::new(config).unwrap();
|
||||
let state = GatewayState::new(config, PathBuf::new()).unwrap();
|
||||
|
||||
let url = state.active_url().await.unwrap();
|
||||
assert_eq!(url, "http://my:3001");
|
||||
@@ -1352,7 +1463,7 @@ enabled = false
|
||||
},
|
||||
);
|
||||
let config = GatewayConfig { projects };
|
||||
Arc::new(GatewayState::new(config).unwrap())
|
||||
Arc::new(GatewayState::new(config, PathBuf::new()).unwrap())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -1452,6 +1563,7 @@ enabled = false
|
||||
label: "agent-1".into(),
|
||||
address: "ws://a:3001/crdt-sync".into(),
|
||||
registered_at: 0.0,
|
||||
assigned_project: None,
|
||||
});
|
||||
let app = poem::Route::new()
|
||||
.at("/gateway/agents", poem::get(gateway_list_agents_handler))
|
||||
@@ -1472,6 +1584,7 @@ enabled = false
|
||||
label: "to-delete".into(),
|
||||
address: "ws://x:3001/crdt-sync".into(),
|
||||
registered_at: 0.0,
|
||||
assigned_project: None,
|
||||
});
|
||||
let app = poem::Route::new()
|
||||
.at(
|
||||
|
||||
Reference in New Issue
Block a user