huskies: merge 563_story_build_agent_join_mechanism_agents_register_with_the_gateway_via_token

This commit is contained in:
dave
2026-04-14 12:02:17 +00:00
parent efe434ede3
commit d0d2b17484
6 changed files with 871 additions and 2 deletions
+43
View File
@@ -36,10 +36,15 @@ const SCAN_INTERVAL_SECS: u64 = 15;
///
/// This function never returns under normal operation — it runs until the
/// process is terminated (SIGINT/SIGTERM).
///
/// If `join_token` and `gateway_url` are both provided the agent will register
/// itself with the gateway on startup using the one-time token.
pub async fn run(
project_root: Option<PathBuf>,
rendezvous_url: String,
port: u16,
join_token: Option<String>,
gateway_url: Option<String>,
) -> Result<(), std::io::Error> {
let project_root = match project_root {
Some(r) => r,
@@ -127,6 +132,14 @@ pub async fn run(
// Write initial heartbeat.
write_heartbeat(&rendezvous_url, port);
// Register with gateway if a join token and gateway URL were provided.
if let (Some(token), Some(url)) = (join_token, gateway_url) {
let node_id = crdt_state::our_node_id().unwrap_or_else(|| "unknown".to_string());
let label = format!("build-agent-{}", &node_id[..node_id.len().min(8)]);
let address = format!("ws://0.0.0.0:{port}/crdt-sync");
register_with_gateway(&url, &token, &label, &address).await;
}
// Reconcile any committed work from a previous session.
{
let recon_agents = Arc::clone(&agents);
@@ -427,6 +440,36 @@ fn push_feature_branch(worktree_path: &str, story_id: &str) -> Result<(), String
}
}
// ── Gateway registration ──────────────────────────────────────────────────
/// Register this build agent with a gateway using a one-time join token.
///
/// POSTs `{ token, label, address }` to `{gateway_url}/gateway/register`. On
/// success the gateway stores the agent and it will appear in the gateway UI.
async fn register_with_gateway(gateway_url: &str, token: &str, label: &str, address: &str) {
let client = reqwest::Client::new();
let url = format!("{}/gateway/register", gateway_url.trim_end_matches('/'));
let body = serde_json::json!({
"token": token,
"label": label,
"address": address,
});
match client.post(&url).json(&body).send().await {
Ok(resp) if resp.status().is_success() => {
slog!("[agent-mode] Registered with gateway at {gateway_url}");
}
Ok(resp) => {
slog!(
"[agent-mode] Gateway registration failed: HTTP {}",
resp.status()
);
}
Err(e) => {
slog!("[agent-mode] Gateway registration error: {e}");
}
}
}
// ── Tests ────────────────────────────────────────────────────────────────
#[cfg(test)]
+345
View File
@@ -9,15 +9,18 @@
use poem::EndpointExt;
use poem::handler;
use poem::http::StatusCode;
use poem::web::Path as PoemPath;
use poem::web::{Data, Json};
use poem::{Body, Request, Response};
use reqwest::Client;
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
use tokio::sync::RwLock;
use uuid::Uuid;
// Re-export active_project type alias for clarity in gateway bot helpers.
type ActiveProject = Arc<RwLock<String>>;
@@ -48,6 +51,35 @@ impl GatewayConfig {
}
}
// ── Agent join types ─────────────────────────────────────────────────
/// A build agent that has registered with this gateway.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JoinedAgent {
/// Unique ID assigned by the gateway on registration.
pub id: String,
/// Human-readable label provided by the agent (e.g. `build-agent-abc123`).
pub label: String,
/// The agent's CRDT-sync WebSocket address (e.g. `ws://host:3001/crdt-sync`).
pub address: String,
/// Unix timestamp when the agent registered.
pub registered_at: f64,
}
/// A one-time join token that has been generated but not yet consumed.
struct PendingToken {
#[allow(dead_code)]
created_at: f64,
}
/// Request body sent by a build agent when registering with the gateway.
#[derive(Deserialize)]
struct RegisterAgentRequest {
token: String,
label: String,
address: String,
}
// ── Gateway state ────────────────────────────────────────────────────
/// Shared gateway state threaded through HTTP handlers.
@@ -59,6 +91,10 @@ pub struct GatewayState {
pub active_project: Arc<RwLock<String>>,
/// HTTP client for proxying requests to project containers.
pub client: Client,
/// Build agents that have joined this gateway.
pub joined_agents: Arc<RwLock<Vec<JoinedAgent>>>,
/// One-time join tokens that have been issued but not yet consumed.
pending_tokens: Arc<RwLock<HashMap<String, PendingToken>>>,
}
impl GatewayState {
@@ -73,6 +109,8 @@ impl GatewayState {
config,
active_project: Arc::new(RwLock::new(first)),
client: Client::new(),
joined_agents: Arc::new(RwLock::new(Vec::new())),
pending_tokens: Arc::new(RwLock::new(HashMap::new())),
})
}
@@ -509,6 +547,133 @@ async fn handle_gateway_health(state: &GatewayState) -> JsonRpcResponse {
)
}
// ── Agent join handlers ───────────────────────────────────────────────
/// `GET /gateway/mode` — returns `{"mode":"gateway"}` so clients can detect gateway mode.
#[handler]
pub async fn gateway_mode_handler() -> Response {
let body = json!({ "mode": "gateway" });
Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "application/json")
.body(Body::from(serde_json::to_vec(&body).unwrap_or_default()))
}
/// `POST /gateway/tokens` — generate a one-time join token for a build agent.
///
/// Returns `{"token": "<uuid>"}`. The token is valid until consumed by
/// `POST /gateway/register` or the process restarts.
#[handler]
pub async fn gateway_generate_token_handler(state: Data<&Arc<GatewayState>>) -> Response {
let token = Uuid::new_v4().to_string();
let now = chrono::Utc::now().timestamp() as f64;
state
.pending_tokens
.write()
.await
.insert(token.clone(), PendingToken { created_at: now });
crate::slog!("[gateway] Generated join token {:.8}…", &token);
let body = json!({ "token": token });
Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "application/json")
.body(Body::from(serde_json::to_vec(&body).unwrap_or_default()))
}
/// `POST /gateway/register` — build agent presents its join token and registers.
///
/// Expects JSON body: `{ "token": "...", "label": "...", "address": "..." }`.
/// On success returns the `JoinedAgent` record. The token is consumed immediately.
#[handler]
pub async fn gateway_register_agent_handler(
body: Body,
state: Data<&Arc<GatewayState>>,
) -> Response {
let bytes = match body.into_bytes().await {
Ok(b) => b,
Err(_) => {
return Response::builder()
.status(StatusCode::BAD_REQUEST)
.body(Body::from("could not read request body"));
}
};
let req: RegisterAgentRequest = match serde_json::from_slice(&bytes) {
Ok(r) => r,
Err(_) => {
return Response::builder()
.status(StatusCode::BAD_REQUEST)
.body(Body::from("invalid JSON body"));
}
};
// Validate and consume the token.
let mut tokens = state.pending_tokens.write().await;
if !tokens.contains_key(&req.token) {
return Response::builder()
.status(StatusCode::UNAUTHORIZED)
.body(Body::from("invalid or already-used join token"));
}
tokens.remove(&req.token);
drop(tokens);
let agent = JoinedAgent {
id: Uuid::new_v4().to_string(),
label: req.label,
address: req.address,
registered_at: chrono::Utc::now().timestamp() as f64,
};
crate::slog!(
"[gateway] Agent '{}' registered (id={})",
agent.label,
agent.id
);
state.joined_agents.write().await.push(agent.clone());
let body = serde_json::to_vec(&agent).unwrap_or_default();
Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "application/json")
.body(Body::from(body))
}
/// `GET /gateway/agents` — list all registered build agents.
#[handler]
pub async fn gateway_list_agents_handler(state: Data<&Arc<GatewayState>>) -> Response {
let agents = state.joined_agents.read().await.clone();
let body = serde_json::to_vec(&agents).unwrap_or_default();
Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "application/json")
.body(Body::from(body))
}
/// `DELETE /gateway/agents/:id` — remove a registered build agent.
#[handler]
pub async fn gateway_remove_agent_handler(
PoemPath(id): PoemPath<String>,
state: Data<&Arc<GatewayState>>,
) -> Response {
let mut agents = state.joined_agents.write().await;
let before = agents.len();
agents.retain(|a| a.id != id);
let removed = agents.len() < before;
drop(agents);
if removed {
crate::slog!("[gateway] Removed agent id={id}");
Response::builder()
.status(StatusCode::NO_CONTENT)
.body(Body::empty())
} else {
Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::from("agent not found"))
}
}
// ── Health aggregation endpoint ──────────────────────────────────────
/// HTTP GET `/health` handler for the gateway — aggregates health from all projects.
@@ -830,6 +995,28 @@ pub async fn run(config_path: &Path, port: u16) -> Result<(), std::io::Error> {
poem::post(gateway_mcp_post_handler).get(gateway_mcp_get_handler),
)
.at("/health", poem::get(gateway_health_handler))
// Agent join endpoints.
.at("/gateway/mode", poem::get(gateway_mode_handler))
.at(
"/gateway/tokens",
poem::post(gateway_generate_token_handler),
)
.at(
"/gateway/register",
poem::post(gateway_register_agent_handler),
)
.at("/gateway/agents", poem::get(gateway_list_agents_handler))
.at(
"/gateway/agents/:id",
poem::delete(gateway_remove_agent_handler),
)
// Serve the embedded React frontend so the gateway has a UI.
.at(
"/assets/*path",
poem::get(crate::http::assets::embedded_asset),
)
.at("/*path", poem::get(crate::http::assets::embedded_file))
.at("/", poem::get(crate::http::assets::embedded_index))
.data(state_arc);
let host = std::env::var("HUSKIES_HOST").unwrap_or_else(|_| "127.0.0.1".to_string());
@@ -1153,4 +1340,162 @@ enabled = false
"disabled bot.toml must return None in gateway mode"
);
}
// ── Agent join mechanism tests ───────────────────────────────────────
fn make_test_state() -> Arc<GatewayState> {
let mut projects = BTreeMap::new();
projects.insert(
"test".into(),
ProjectEntry {
url: "http://test:3001".into(),
},
);
let config = GatewayConfig { projects };
Arc::new(GatewayState::new(config).unwrap())
}
#[tokio::test]
async fn generate_token_creates_pending_token() {
let state = make_test_state();
let app = poem::Route::new()
.at(
"/gateway/tokens",
poem::post(gateway_generate_token_handler),
)
.data(state.clone());
let cli = poem::test::TestClient::new(app);
let resp = cli.post("/gateway/tokens").send().await;
assert_eq!(resp.0.status(), StatusCode::OK);
let body: Value = resp.0.into_body().into_json().await.unwrap();
let token = body["token"].as_str().unwrap();
assert!(!token.is_empty());
let tokens = state.pending_tokens.read().await;
assert!(tokens.contains_key(token));
}
#[tokio::test]
async fn register_agent_consumes_token() {
let state = make_test_state();
// Insert a token manually.
let token = "test-token-123".to_string();
state.pending_tokens.write().await.insert(
token.clone(),
PendingToken {
created_at: chrono::Utc::now().timestamp() as f64,
},
);
let app = poem::Route::new()
.at(
"/gateway/register",
poem::post(gateway_register_agent_handler),
)
.data(state.clone());
let cli = poem::test::TestClient::new(app);
let resp = cli
.post("/gateway/register")
.header("Content-Type", "application/json")
.body(
json!({
"token": token,
"label": "test-agent",
"address": "ws://localhost:3001/crdt-sync"
})
.to_string(),
)
.send()
.await;
assert_eq!(resp.0.status(), StatusCode::OK);
// Token consumed.
assert!(state.pending_tokens.read().await.is_empty());
// Agent registered.
let agents = state.joined_agents.read().await;
assert_eq!(agents.len(), 1);
assert_eq!(agents[0].label, "test-agent");
}
#[tokio::test]
async fn register_agent_rejects_invalid_token() {
let state = make_test_state();
let app = poem::Route::new()
.at(
"/gateway/register",
poem::post(gateway_register_agent_handler),
)
.data(state.clone());
let cli = poem::test::TestClient::new(app);
let resp = cli
.post("/gateway/register")
.header("Content-Type", "application/json")
.body(
json!({
"token": "bad-token",
"label": "agent",
"address": "ws://localhost:3001/crdt-sync"
})
.to_string(),
)
.send()
.await;
assert_eq!(resp.0.status(), StatusCode::UNAUTHORIZED);
assert!(state.joined_agents.read().await.is_empty());
}
#[tokio::test]
async fn list_agents_returns_registered_agents() {
let state = make_test_state();
state.joined_agents.write().await.push(JoinedAgent {
id: "id-1".into(),
label: "agent-1".into(),
address: "ws://a:3001/crdt-sync".into(),
registered_at: 0.0,
});
let app = poem::Route::new()
.at("/gateway/agents", poem::get(gateway_list_agents_handler))
.data(state.clone());
let cli = poem::test::TestClient::new(app);
let resp = cli.get("/gateway/agents").send().await;
assert_eq!(resp.0.status(), StatusCode::OK);
let agents: Vec<Value> = resp.0.into_body().into_json().await.unwrap();
assert_eq!(agents.len(), 1);
assert_eq!(agents[0]["label"], "agent-1");
}
#[tokio::test]
async fn remove_agent_deletes_by_id() {
let state = make_test_state();
state.joined_agents.write().await.push(JoinedAgent {
id: "del-id".into(),
label: "to-delete".into(),
address: "ws://x:3001/crdt-sync".into(),
registered_at: 0.0,
});
let app = poem::Route::new()
.at(
"/gateway/agents/:id",
poem::delete(gateway_remove_agent_handler),
)
.data(state.clone());
let cli = poem::test::TestClient::new(app);
let resp = cli.delete("/gateway/agents/del-id").send().await;
assert_eq!(resp.0.status(), StatusCode::NO_CONTENT);
assert!(state.joined_agents.read().await.is_empty());
}
#[tokio::test]
async fn remove_agent_unknown_id_returns_not_found() {
let state = make_test_state();
let app = poem::Route::new()
.at(
"/gateway/agents/:id",
poem::delete(gateway_remove_agent_handler),
)
.data(state.clone());
let cli = poem::test::TestClient::new(app);
let resp = cli.delete("/gateway/agents/no-such-id").send().await;
assert_eq!(resp.0.status(), StatusCode::NOT_FOUND);
}
}
+100 -1
View File
@@ -56,6 +56,10 @@ struct CliArgs {
rendezvous: Option<String>,
/// Whether `--gateway` mode was requested (proxy MCP calls to per-project containers).
gateway: bool,
/// One-time join token for registering this build agent with a gateway (`--join-token`).
join_token: Option<String>,
/// HTTP URL of the gateway to register with when a join token is provided (`--gateway-url`).
gateway_url: Option<String>,
}
/// Parse CLI arguments into `CliArgs`, or exit early for `--help` / `--version`.
@@ -66,6 +70,8 @@ fn parse_cli_args(args: &[String]) -> Result<CliArgs, String> {
let mut agent = false;
let mut gateway = false;
let mut rendezvous: Option<String> = None;
let mut join_token: Option<String> = None;
let mut gateway_url: Option<String> = None;
let mut i = 0;
while i < args.len() {
@@ -106,6 +112,26 @@ fn parse_cli_args(args: &[String]) -> Result<CliArgs, String> {
let val = &a["--rendezvous=".len()..];
rendezvous = Some(val.to_string());
}
"--join-token" => {
i += 1;
if i >= args.len() {
return Err("--join-token requires a value".to_string());
}
join_token = Some(args[i].clone());
}
a if a.starts_with("--join-token=") => {
join_token = Some(a["--join-token=".len()..].to_string());
}
"--gateway-url" => {
i += 1;
if i >= args.len() {
return Err("--gateway-url requires a value".to_string());
}
gateway_url = Some(args[i].clone());
}
a if a.starts_with("--gateway-url=") => {
gateway_url = Some(a["--gateway-url=".len()..].to_string());
}
"--gateway" => {
gateway = true;
}
@@ -139,6 +165,8 @@ fn parse_cli_args(args: &[String]) -> Result<CliArgs, String> {
agent,
rendezvous,
gateway,
join_token,
gateway_url,
})
}
@@ -172,6 +200,11 @@ fn print_help() {
println!(
" (or cwd) and proxies MCP calls to per-project containers."
);
println!(" --join-token <TOKEN> One-time token for registering this build agent with a");
println!(" gateway. Also readable from HUSKIES_JOIN_TOKEN env var.");
println!(" --gateway-url <URL> HTTP URL of the gateway to register with when");
println!(" --join-token is provided (agent mode only).");
println!(" Also readable from HUSKIES_GATEWAY_URL env var.");
}
/// Resolve the optional positional path argument into an absolute `PathBuf`.
@@ -411,7 +444,16 @@ async fn main() -> Result<(), std::io::Error> {
if is_agent {
let agent_root = app_state.project_root.lock().unwrap().clone();
let rendezvous = agent_rendezvous.expect("agent mode requires --rendezvous");
return agent_mode::run(agent_root, rendezvous, port).await;
// Join token / gateway URL can come from CLI flags or environment variables.
let join_token = cli
.join_token
.clone()
.or_else(|| std::env::var("HUSKIES_JOIN_TOKEN").ok());
let agent_gateway_url = cli
.gateway_url
.clone()
.or_else(|| std::env::var("HUSKIES_GATEWAY_URL").ok());
return agent_mode::run(agent_root, rendezvous, port, join_token, agent_gateway_url).await;
}
let workflow = Arc::new(std::sync::Mutex::new(WorkflowState::default()));
@@ -1087,6 +1129,63 @@ name = "coder"
assert_eq!(result.path, Some("/my/project".to_string()));
}
#[test]
fn parse_join_token_flag() {
let args = vec![
"agent".to_string(),
"--rendezvous".to_string(),
"ws://host:3001/crdt-sync".to_string(),
"--join-token".to_string(),
"my-secret-token".to_string(),
];
let result = parse_cli_args(&args).unwrap();
assert_eq!(result.join_token, Some("my-secret-token".to_string()));
}
#[test]
fn parse_join_token_equals_syntax() {
let args = vec![
"agent".to_string(),
"--rendezvous".to_string(),
"ws://host:3001/crdt-sync".to_string(),
"--join-token=abc123".to_string(),
];
let result = parse_cli_args(&args).unwrap();
assert_eq!(result.join_token, Some("abc123".to_string()));
}
#[test]
fn parse_gateway_url_flag() {
let args = vec![
"agent".to_string(),
"--rendezvous".to_string(),
"ws://host:3001/crdt-sync".to_string(),
"--gateway-url".to_string(),
"http://gateway:3000".to_string(),
];
let result = parse_cli_args(&args).unwrap();
assert_eq!(result.gateway_url, Some("http://gateway:3000".to_string()));
}
#[test]
fn parse_join_token_missing_value_is_error() {
let args = vec!["--join-token".to_string()];
assert!(parse_cli_args(&args).is_err());
}
#[test]
fn parse_gateway_url_missing_value_is_error() {
let args = vec!["--gateway-url".to_string()];
assert!(parse_cli_args(&args).is_err());
}
#[test]
fn parse_no_args_join_token_and_gateway_url_are_none() {
let result = parse_cli_args(&[]).unwrap();
assert_eq!(result.join_token, None);
assert_eq!(result.gateway_url, None);
}
// ── resolve_path_arg ────────────────────────────────────────────
#[test]