From f63464852b5ae4ab5e7d0be00f6333b8290c0c5b Mon Sep 17 00:00:00 2001 From: dave Date: Tue, 28 Apr 2026 15:31:29 +0000 Subject: [PATCH] huskies: merge 770 --- frontend/src/api/agents.test.ts | 40 ++------- frontend/src/api/agents.ts | 6 +- frontend/src/api/rpc.ts | 107 +++++++++++++++++++++++++ frontend/src/setupTests.ts | 2 +- server/src/crdt_sync/mod.rs | 1 + server/src/crdt_sync/rpc.rs | 36 ++++++++- server/src/http/agents/mod.rs | 30 ------- server/src/http/agents/tests.rs | 52 ------------ server/src/http/mod.rs | 23 ++++++ server/src/http/ws.rs | 42 ++++++++-- server/src/service/agents/io.rs | 7 -- server/src/service/agents/mod.rs | 55 ------------- server/src/service/agents/selection.rs | 77 ------------------ 13 files changed, 212 insertions(+), 266 deletions(-) create mode 100644 frontend/src/api/rpc.ts diff --git a/frontend/src/api/agents.test.ts b/frontend/src/api/agents.test.ts index ca86f6ad..c59011e6 100644 --- a/frontend/src/api/agents.test.ts +++ b/frontend/src/api/agents.test.ts @@ -132,38 +132,6 @@ describe("agentsApi", () => { }); }); - describe("listAgents", () => { - it("sends GET to /agents and returns agent list", async () => { - mockFetch.mockResolvedValueOnce(okResponse([sampleAgent])); - - const result = await agentsApi.listAgents(); - - expect(mockFetch).toHaveBeenCalledWith( - "/api/agents", - expect.objectContaining({}), - ); - expect(result).toEqual([sampleAgent]); - }); - - it("returns empty array when no agents running", async () => { - mockFetch.mockResolvedValueOnce(okResponse([])); - - const result = await agentsApi.listAgents(); - expect(result).toEqual([]); - }); - - it("uses custom baseUrl when provided", async () => { - mockFetch.mockResolvedValueOnce(okResponse([])); - - await agentsApi.listAgents("http://localhost:3002/api"); - - expect(mockFetch).toHaveBeenCalledWith( - "http://localhost:3002/api/agents", - expect.objectContaining({}), - ); - }); - }); - describe("getAgentConfig", () => { it("sends GET to /agents/config and returns config list", async () => { mockFetch.mockResolvedValueOnce(okResponse([sampleConfig])); @@ -216,15 +184,17 @@ describe("agentsApi", () => { describe("error handling", () => { it("throws on non-ok response with body text", async () => { - mockFetch.mockResolvedValueOnce(errorResponse(404, "agent not found")); + mockFetch.mockResolvedValueOnce(errorResponse(404, "config not found")); - await expect(agentsApi.listAgents()).rejects.toThrow("agent not found"); + await expect(agentsApi.getAgentConfig()).rejects.toThrow( + "config not found", + ); }); it("throws with status code when no body", async () => { mockFetch.mockResolvedValueOnce(errorResponse(500, "")); - await expect(agentsApi.listAgents()).rejects.toThrow( + await expect(agentsApi.getAgentConfig()).rejects.toThrow( "Request failed (500)", ); }); diff --git a/frontend/src/api/agents.ts b/frontend/src/api/agents.ts index ca1a1393..6d27d366 100644 --- a/frontend/src/api/agents.ts +++ b/frontend/src/api/agents.ts @@ -1,3 +1,5 @@ +import { rpcCall } from "./rpc"; + export type AgentStatusValue = "pending" | "running" | "completed" | "failed"; export interface AgentInfo { @@ -94,8 +96,8 @@ export const agentsApi = { ); }, - listAgents(baseUrl?: string) { - return requestJson("/agents", {}, baseUrl); + listAgents(_baseUrl?: string) { + return rpcCall("active_agents.list"); }, getAgentConfig(baseUrl?: string) { diff --git a/frontend/src/api/rpc.ts b/frontend/src/api/rpc.ts new file mode 100644 index 00000000..b2125eed --- /dev/null +++ b/frontend/src/api/rpc.ts @@ -0,0 +1,107 @@ +/** + * Lightweight read-RPC client over the `/ws` WebSocket. + * + * Opens a short-lived WebSocket, sends an `rpc_request` frame, waits for the + * matching `rpc_response`, then closes the connection. + */ + +let correlationCounter = 0; + +function nextCorrelationId(): string { + return `rpc-${Date.now()}-${++correlationCounter}`; +} + +/** + * Build the WebSocket URL for the `/ws` endpoint, deriving the protocol + * (ws/wss) and host from the current page location. + */ +function buildWsUrl(): string { + const proto = window.location.protocol === "https:" ? "wss:" : "ws:"; + return `${proto}//${window.location.host}/ws`; +} + +export interface RpcResponse { + ok: boolean; + result?: T; + error?: string; + code?: string; +} + +/** + * Send a read-RPC request over a temporary WebSocket connection and return + * the result. Rejects if the server responds with `ok: false` or if the + * connection times out. + */ +export function rpcCall( + method: string, + params: Record = {}, + timeoutMs = 5000, +): Promise { + return new Promise((resolve, reject) => { + const correlationId = nextCorrelationId(); + const ws = new WebSocket(buildWsUrl()); + let settled = false; + + const timer = setTimeout(() => { + if (!settled) { + settled = true; + ws.close(); + reject(new Error(`RPC timeout for ${method}`)); + } + }, timeoutMs); + + ws.onopen = () => { + ws.send( + JSON.stringify({ + kind: "rpc_request", + version: 1, + correlation_id: correlationId, + ttl_ms: timeoutMs, + method, + params, + }), + ); + }; + + ws.onmessage = (event) => { + try { + const data = JSON.parse(event.data); + // Only process rpc_response frames matching our correlation ID. + if ( + data.kind === "rpc_response" && + data.correlation_id === correlationId + ) { + settled = true; + clearTimeout(timer); + ws.close(); + if (data.ok) { + resolve(data.result as T); + } else { + reject( + new Error(data.error || `RPC error: ${data.code || "UNKNOWN"}`), + ); + } + } + // Ignore other messages (pipeline_state, onboarding_status, etc.) + } catch { + // Ignore non-JSON or unparseable messages + } + }; + + ws.onerror = () => { + if (!settled) { + settled = true; + clearTimeout(timer); + reject(new Error(`WebSocket error during RPC call to ${method}`)); + } + }; + + ws.onclose = () => { + if (!settled) { + settled = true; + clearTimeout(timer); + reject(new Error(`WebSocket closed before RPC response for ${method}`)); + } + }; + }); +} diff --git a/frontend/src/setupTests.ts b/frontend/src/setupTests.ts index 0f1a560a..1c857b4a 100644 --- a/frontend/src/setupTests.ts +++ b/frontend/src/setupTests.ts @@ -10,7 +10,7 @@ beforeEach(() => { vi.fn((input: string | URL | Request) => { const url = typeof input === "string" ? input : input.toString(); // Endpoints that return arrays need [] not {} to avoid "not iterable" errors. - const arrayEndpoints = ["/agents", "/agents/config"]; + const arrayEndpoints = ["/agents/config"]; const body = arrayEndpoints.some((ep) => url.endsWith(ep)) ? JSON.stringify([]) : JSON.stringify({}); diff --git a/server/src/crdt_sync/mod.rs b/server/src/crdt_sync/mod.rs index 9d129805..eb99d87b 100644 --- a/server/src/crdt_sync/mod.rs +++ b/server/src/crdt_sync/mod.rs @@ -69,6 +69,7 @@ mod wire; pub use auth::{add_join_token, init_token_auth, init_trusted_keys}; pub(crate) use client::connect_and_sync; pub use client::{RENDEZVOUS_ERROR_THRESHOLD, spawn_rendezvous_client}; +pub(crate) use rpc::try_handle_rpc_text; pub use server::crdt_sync_handler; // Test-only re-export used by `crdt_snapshot` tests. diff --git a/server/src/crdt_sync/rpc.rs b/server/src/crdt_sync/rpc.rs index f1642659..0b373d16 100644 --- a/server/src/crdt_sync/rpc.rs +++ b/server/src/crdt_sync/rpc.rs @@ -29,7 +29,10 @@ pub(super) type Handler = fn(Value) -> Value; /// /// Add new handlers here. The registry is a plain slice — linear scan is /// fine for the small number of methods expected. -static HANDLERS: &[(&str, Handler)] = &[("health.check", handle_health_check)]; +static HANDLERS: &[(&str, Handler)] = &[ + ("health.check", handle_health_check), + ("active_agents.list", handle_active_agents_list), +]; /// Handler for the `health.check` method. /// @@ -39,6 +42,35 @@ fn handle_health_check(_params: Value) -> Value { serde_json::json!({"status": "ok"}) } +/// Handler for the `active_agents.list` method. +/// +/// Reads the `active_agents` collection from the CRDT and returns an array +/// matching the shape formerly served by `GET /api/agents`. Each entry +/// contains `story_id`, `agent_name`, `status`, `session_id`, and +/// `worktree_path`. +fn handle_active_agents_list(_params: Value) -> Value { + let entries = crate::crdt_state::read_all_active_agents().unwrap_or_default(); + let list: Vec = entries + .into_iter() + .map(|view| { + // agent_id is the composite key "story_id:agent_name". + let (story_id, agent_name) = view + .agent_id + .rsplit_once(':') + .map(|(s, a)| (s.to_string(), a.to_string())) + .unwrap_or_else(|| (view.story_id.unwrap_or_default(), view.agent_id.clone())); + serde_json::json!({ + "story_id": story_id, + "agent_name": agent_name, + "status": "running", + "session_id": null, + "worktree_path": null, + }) + }) + .collect(); + Value::Array(list) +} + /// Dispatch an incoming RPC method call to the registered handler. /// /// Returns `Ok(result)` on success or `Err("NOT_FOUND")` if no handler is @@ -57,7 +89,7 @@ pub(super) fn dispatch(method: &str, params: Value) -> Result Option { +pub(crate) fn try_handle_rpc_text(text: &str) -> Option { let frame: RpcFrame = serde_json::from_str(text).ok()?; match frame { RpcFrame::RpcRequest { diff --git a/server/src/http/agents/mod.rs b/server/src/http/agents/mod.rs index 9d9fd9b0..c94d51c3 100644 --- a/server/src/http/agents/mod.rs +++ b/server/src/http/agents/mod.rs @@ -262,36 +262,6 @@ impl AgentsApi { Ok(Json(true)) } - /// List all agents with their status. - /// - /// Agents for stories that have been completed (`work/5_done/` or `work/6_archived/`) are - /// excluded so the agents panel is not cluttered with old completed items - /// on frontend startup. - #[oai(path = "/agents", method = "get")] - async fn list_agents(&self) -> OpenApiResult>> { - let project_root = self - .ctx - .services - .agents - .get_project_root(&self.ctx.state) - .ok(); - let agents = svc::list_agents(&self.ctx.services.agents, project_root.as_deref()) - .map_err(map_svc_error)?; - - Ok(Json( - agents - .into_iter() - .map(|info| AgentInfoResponse { - story_id: info.story_id, - agent_name: info.agent_name, - status: info.status.to_string(), - session_id: info.session_id, - worktree_path: info.worktree_path, - }) - .collect(), - )) - } - /// Get the configured agent roster from project.toml. #[oai(path = "/agents/config", method = "get")] async fn get_agent_config(&self) -> OpenApiResult>> { diff --git a/server/src/http/agents/tests.rs b/server/src/http/agents/tests.rs index 7994bae4..051b8569 100644 --- a/server/src/http/agents/tests.rs +++ b/server/src/http/agents/tests.rs @@ -42,58 +42,6 @@ fn story_is_archived_true_when_file_in_6_archived() { assert!(svc::is_archived(&root, "79_story_foo")); } -#[tokio::test] -async fn list_agents_excludes_archived_stories() { - let tmp = TempDir::new().unwrap(); - let root = make_work_dirs(&tmp); - - // Place an archived story file in 6_archived - std::fs::write( - root.join(".huskies/work/6_archived/79_story_archived.md"), - "---\nname: archived story\n---\n", - ) - .unwrap(); - - let ctx = AppContext::new_test(root); - // Inject an agent for the archived story (completed) and one for an active story - ctx.services - .agents - .inject_test_agent("79_story_archived", "coder-1", AgentStatus::Completed); - ctx.services - .agents - .inject_test_agent("80_story_active", "coder-1", AgentStatus::Running); - - let api = AgentsApi { ctx: Arc::new(ctx) }; - let result = api.list_agents().await.unwrap().0; - - // Archived story's agent should not appear - assert!( - !result.iter().any(|a| a.story_id == "79_story_archived"), - "archived story agent should be excluded from list_agents" - ); - // Active story's agent should still appear - assert!( - result.iter().any(|a| a.story_id == "80_story_active"), - "active story agent should be included in list_agents" - ); -} - -#[tokio::test] -async fn list_agents_includes_all_when_no_project_root() { - // When no project root is configured, all agents are returned (safe default). - let tmp = TempDir::new().unwrap(); - let ctx = AppContext::new_test(tmp.path().to_path_buf()); - // Clear the project_root so get_project_root returns Err - *ctx.state.project_root.lock().unwrap() = None; - ctx.services - .agents - .inject_test_agent("42_story_whatever", "coder-1", AgentStatus::Completed); - - let api = AgentsApi { ctx: Arc::new(ctx) }; - let result = api.list_agents().await.unwrap().0; - assert!(result.iter().any(|a| a.story_id == "42_story_whatever")); -} - fn make_project_toml(root: &path::Path, content: &str) { let sk_dir = root.join(".huskies"); std::fs::create_dir_all(&sk_dir).unwrap(); diff --git a/server/src/http/mod.rs b/server/src/http/mod.rs index d2e2ee32..12a137d5 100644 --- a/server/src/http/mod.rs +++ b/server/src/http/mod.rs @@ -82,6 +82,7 @@ pub fn build_routes( .nest("/docs", docs_service.swagger_ui()) .at("/ws", get(ws::ws_handler)) .at("/crdt-sync", get(crate::crdt_sync::crdt_sync_handler)) + .at("/rpc", post(rpc_http_handler)) .at( "/agents/:story_id/:agent_name/stream", get(agents_sse::agent_stream), @@ -133,6 +134,28 @@ pub fn build_routes( route.data(ctx_arc) } +/// HTTP bridge for the read-RPC protocol. +/// +/// Accepts a JSON [`RpcFrame::RpcRequest`] body and returns the corresponding +/// [`RpcFrame::RpcResponse`]. This allows HTTP clients (e.g. the frontend) to +/// call read-RPC methods without maintaining a `/crdt-sync` WebSocket connection. +#[poem::handler] +pub async fn rpc_http_handler(body: poem::web::Json) -> poem::Response { + let text = serde_json::to_string(&body.0).unwrap_or_default(); + match crate::crdt_sync::try_handle_rpc_text(&text) { + Some(response) => { + let json = serde_json::to_string(&response).unwrap_or_default(); + poem::Response::builder() + .status(poem::http::StatusCode::OK) + .header(poem::http::header::CONTENT_TYPE, "application/json") + .body(json) + } + None => poem::Response::builder() + .status(poem::http::StatusCode::BAD_REQUEST) + .body("Invalid RPC request"), + } +} + /// Debug HTTP endpoint: `GET /debug/crdt[?story_id=]` /// /// Returns the raw in-memory CRDT state as JSON. Accepts an optional diff --git a/server/src/http/ws.rs b/server/src/http/ws.rs index c7292168..d0e6f83d 100644 --- a/server/src/http/ws.rs +++ b/server/src/http/ws.rs @@ -29,13 +29,30 @@ pub async fn ws_handler(ws: WebSocket, ctx: Data<&Arc>) -> impl poem ws.on_upgrade(move |socket| async move { let (mut sink, mut stream) = socket.split(); let (tx, mut rx) = mpsc::unbounded_channel::(); + // Separate channel for pre-serialized messages (e.g. RPC responses). + let (raw_tx, mut raw_rx) = mpsc::unbounded_channel::(); let forward = tokio::spawn(async move { - while let Some(msg) = rx.recv().await { - if let Ok(text) = serde_json::to_string(&msg) - && sink.send(WsMessage::Text(text)).await.is_err() - { - break; + loop { + tokio::select! { + msg = rx.recv() => match msg { + Some(msg) => { + if let Ok(text) = serde_json::to_string(&msg) + && sink.send(WsMessage::Text(text)).await.is_err() + { + break; + } + } + None => break, + }, + raw = raw_rx.recv() => match raw { + Some(text) => { + if sink.send(WsMessage::Text(text)).await.is_err() { + break; + } + } + None => break, + }, } } }); @@ -79,6 +96,14 @@ pub async fn ws_handler(ws: WebSocket, ctx: Data<&Arc>) -> impl poem break; }; + // Handle read-RPC frames (discriminated by "kind", not "type"). + if let Some(rpc_resp) = crate::crdt_sync::try_handle_rpc_text(&text) { + if let Ok(resp_text) = serde_json::to_string(&rpc_resp) { + let _ = raw_tx.send(resp_text); + } + continue; + } + match ws::dispatch_outer(&text) { ws::DispatchResult::StartChat { messages, config } => { let tx_updates = tx.clone(); @@ -134,6 +159,13 @@ pub async fn ws_handler(ws: WebSocket, ctx: Data<&Arc>) -> impl poem } Some(Ok(WsMessage::Text(inner_text))) = stream.next() => { + // Handle read-RPC frames during active chat. + if let Some(rpc_resp) = crate::crdt_sync::try_handle_rpc_text(&inner_text) { + if let Ok(resp_text) = serde_json::to_string(&rpc_resp) { + let _ = raw_tx.send(resp_text); + } + continue; + } match ws::dispatch_inner(&inner_text, &mut pending_perms) { ws::InnerDispatchResult::CancelChat => { let _ = chat::cancel_chat(&ctx.state); diff --git a/server/src/service/agents/io.rs b/server/src/service/agents/io.rs index 24fc777c..25acc1e2 100644 --- a/server/src/service/agents/io.rs +++ b/server/src/service/agents/io.rs @@ -104,13 +104,6 @@ pub mod test_helpers { std::fs::create_dir_all(tmp.path().join(".huskies")).unwrap(); } - /// Create the `5_done` and `6_archived` work-stage directories. - pub fn make_work_dirs(tmp: &TempDir) { - for stage in &["5_done", "6_archived"] { - std::fs::create_dir_all(tmp.path().join(".huskies").join("work").join(stage)).unwrap(); - } - } - /// Create all six pipeline stage directories under `.huskies/work/`. pub fn make_stage_dirs(tmp: &TempDir) { for stage in &[ diff --git a/server/src/service/agents/mod.rs b/server/src/service/agents/mod.rs index 9f461862..db5280fc 100644 --- a/server/src/service/agents/mod.rs +++ b/server/src/service/agents/mod.rs @@ -113,20 +113,6 @@ pub async fn stop_agent( .map_err(Error::AgentNotFound) } -/// List all agents, optionally filtering out those belonging to archived stories. -/// -/// When `project_root` is `None` the archive filter is skipped and all agents -/// are returned (safe default when the server is not yet fully configured). -pub fn list_agents(pool: &AgentPool, project_root: Option<&Path>) -> Result, Error> { - let agents = pool.list_agents().map_err(Error::Io)?; - match project_root { - Some(root) => Ok(selection::filter_non_archived(agents, |id| { - io::is_archived(root, id) - })), - None => Ok(agents), - } -} - /// Create a git worktree for a story. pub async fn create_worktree( pool: &AgentPool, @@ -289,50 +275,9 @@ fn config_to_entries(config: &ProjectConfig) -> Vec { #[cfg(test)] mod tests { use super::*; - use crate::agents::AgentStatus; use io::test_helpers::*; - use std::sync::Arc; use tempfile::TempDir; - fn make_pool(tmp: &TempDir) -> Arc { - let (tx, _) = tokio::sync::broadcast::channel(64); - let pool = AgentPool::new(3001, tx); - let state = crate::state::SessionState::default(); - *state.project_root.lock().unwrap() = Some(tmp.path().to_path_buf()); - Arc::new(pool) - } - - // ── list_agents ─────────────────────────────────────────────────────────── - - #[tokio::test] - async fn list_agents_excludes_archived_stories() { - let tmp = TempDir::new().unwrap(); - make_work_dirs(&tmp); - write_story_file( - &tmp, - ".huskies/work/6_archived/79_story_archived.md", - "---\nname: archived\n---\n", - ); - - let pool = make_pool(&tmp); - pool.inject_test_agent("79_story_archived", "coder-1", AgentStatus::Completed); - pool.inject_test_agent("80_story_active", "coder-1", AgentStatus::Running); - - let agents = list_agents(&pool, Some(tmp.path())).unwrap(); - assert!(!agents.iter().any(|a| a.story_id == "79_story_archived")); - assert!(agents.iter().any(|a| a.story_id == "80_story_active")); - } - - #[tokio::test] - async fn list_agents_includes_all_when_no_project_root() { - let tmp = TempDir::new().unwrap(); - let pool = make_pool(&tmp); - pool.inject_test_agent("42_story_whatever", "coder-1", AgentStatus::Completed); - - let agents = list_agents(&pool, None).unwrap(); - assert!(agents.iter().any(|a| a.story_id == "42_story_whatever")); - } - // ── get_agent_config ────────────────────────────────────────────────────── #[test] diff --git a/server/src/service/agents/selection.rs b/server/src/service/agents/selection.rs index 0b9d9cc1..c6e4215b 100644 --- a/server/src/service/agents/selection.rs +++ b/server/src/service/agents/selection.rs @@ -4,22 +4,6 @@ //! return a result without touching the filesystem, network, or any mutable //! global state. This makes them fast to test without tempdirs or async runtimes. use crate::agent_log::LogEntry; -use crate::agents::AgentInfo; - -/// Filter a list of agents, removing any whose story is archived. -/// -/// `is_archived` is a predicate injected by the caller — typically a closure -/// over the project root that calls `io::is_archived`. This keeps the function -/// pure: it never touches the filesystem itself. -pub fn filter_non_archived(agents: Vec, is_archived: F) -> Vec -where - F: Fn(&str) -> bool, -{ - agents - .into_iter() - .filter(|info| !is_archived(&info.story_id)) - .collect() -} /// Concatenate the text of all `output` events from an agent log. /// @@ -42,22 +26,6 @@ pub fn collect_output_text(entries: &[LogEntry]) -> String { #[cfg(test)] mod tests { use super::*; - use crate::agents::AgentStatus; - - fn make_agent(story_id: &str) -> AgentInfo { - AgentInfo { - story_id: story_id.to_string(), - agent_name: "coder-1".to_string(), - status: AgentStatus::Running, - session_id: None, - worktree_path: None, - base_branch: None, - completion: None, - log_session_id: None, - throttled: false, - termination_reason: None, - } - } fn make_log_entry(event_type: &str, text: Option<&str>) -> LogEntry { let mut obj = serde_json::Map::new(); @@ -74,51 +42,6 @@ mod tests { } } - // ── filter_non_archived ─────────────────────────────────────────────────── - - #[test] - fn filter_keeps_non_archived_agents() { - let agents = vec![make_agent("10_active"), make_agent("11_active")]; - let result = filter_non_archived(agents, |_| false); - assert_eq!(result.len(), 2); - } - - #[test] - fn filter_removes_archived_agents() { - let agents = vec![make_agent("10_archived"), make_agent("11_active")]; - let result = filter_non_archived(agents, |id| id == "10_archived"); - assert_eq!(result.len(), 1); - assert_eq!(result[0].story_id, "11_active"); - } - - #[test] - fn filter_removes_all_when_all_archived() { - let agents = vec![make_agent("10_a"), make_agent("11_b")]; - let result = filter_non_archived(agents, |_| true); - assert!(result.is_empty()); - } - - #[test] - fn filter_returns_empty_for_empty_input() { - let result = filter_non_archived(vec![], |_| false); - assert!(result.is_empty()); - } - - #[test] - fn filter_preserves_order() { - let agents = vec![ - make_agent("1_a"), - make_agent("2_b"), - make_agent("3_c"), - make_agent("4_d"), - ]; - let result = filter_non_archived(agents, |id| id == "2_b"); - assert_eq!(result.len(), 3); - assert_eq!(result[0].story_id, "1_a"); - assert_eq!(result[1].story_id, "3_c"); - assert_eq!(result[2].story_id, "4_d"); - } - // ── collect_output_text ─────────────────────────────────────────────────── #[test]