From f2943c7e69db491b857a918d3b945f8328749281 Mon Sep 17 00:00:00 2001 From: dave Date: Wed, 13 May 2026 04:43:48 +0000 Subject: [PATCH] huskies: merge 948 --- .../api/__test_utils__/mockRpcWebSocket.ts | 151 ++++ frontend/src/api/agents.test.ts | 37 +- frontend/src/api/agents.ts | 15 +- frontend/src/api/bot_config.ts | 9 +- frontend/src/api/client.test.ts | 31 +- frontend/src/api/client/http.ts | 75 +- frontend/src/api/rpc.ts | 133 +++- frontend/src/api/settings.test.ts | 75 +- frontend/src/api/settings.ts | 13 +- server/src/crdt_sync/client.rs | 2 +- server/src/crdt_sync/mod.rs | 1 + server/src/crdt_sync/rpc.rs | 644 ++++++++++++++++-- server/src/crdt_sync/server/mod.rs | 2 +- server/src/http/mod.rs | 2 +- server/src/http/ws/mod.rs | 4 +- server/src/main.rs | 6 +- 16 files changed, 995 insertions(+), 205 deletions(-) create mode 100644 frontend/src/api/__test_utils__/mockRpcWebSocket.ts diff --git a/frontend/src/api/__test_utils__/mockRpcWebSocket.ts b/frontend/src/api/__test_utils__/mockRpcWebSocket.ts new file mode 100644 index 00000000..7d95c101 --- /dev/null +++ b/frontend/src/api/__test_utils__/mockRpcWebSocket.ts @@ -0,0 +1,151 @@ +/** + * Test helpers for stubbing the WebSocket used by `rpcCall`. + * + * `rpcCall` opens a transient WebSocket, sends an `rpc_request` frame, and + * resolves once the matching `rpc_response` arrives. `installRpcMock` + * installs a `WebSocket` global that records sent frames and replies with + * canned responses keyed by RPC method name. + */ + +import { vi } from "vitest"; + +interface MockSocket { + url: string; + sent: string[]; + onopen: ((ev: Event) => void) | null; + onmessage: ((ev: { data: string }) => void) | null; + onerror: ((ev: Event) => void) | null; + onclose: ((ev: CloseEvent) => void) | null; + readyState: number; + send(data: string): void; + close(): void; +} + +/** + * Test handle returned by `installMockRpcWebSocket`: records sockets and calls, + * lets the test register canned responses (or override responses for specific + * methods), and restores the real `WebSocket` constructor on cleanup. + */ +export interface MockRpcInstaller { + /** All sockets created during the test, in order. */ + instances: MockSocket[]; + /** All RPC method names that were called. */ + calls: { method: string; params: Record }[]; + /** + * Register a result to be returned for `method`. If the value is a + * function, it is invoked with the request params and its return value + * (or the resolved promise) is used as the result. + */ + respond(method: string, result: unknown): void; + /** Make `method` reply with an `ok:false` response. */ + respondError(method: string, error: string, code?: string): void; +} + +/** + * Install a stub `WebSocket` global that synchronously resolves RPC calls + * with results registered via the returned [`MockRpcInstaller`]. + */ +export function installRpcMock(): MockRpcInstaller { + const instances: MockSocket[] = []; + const calls: { method: string; params: Record }[] = []; + const results = new Map(); + const errors = new Map(); + + class MockWebSocket implements MockSocket { + static readonly CONNECTING = 0; + static readonly OPEN = 1; + static readonly CLOSING = 2; + static readonly CLOSED = 3; + + url: string; + sent: string[] = []; + onopen: ((ev: Event) => void) | null = null; + onmessage: ((ev: { data: string }) => void) | null = null; + onerror: ((ev: Event) => void) | null = null; + onclose: ((ev: CloseEvent) => void) | null = null; + readyState = 0; + + constructor(url: string) { + this.url = url; + instances.push(this); + queueMicrotask(() => { + this.readyState = 1; + this.onopen?.(new Event("open")); + }); + } + + send(data: string) { + this.sent.push(data); + let frame: { + correlation_id?: string; + method?: string; + params?: Record; + }; + try { + frame = JSON.parse(data); + } catch { + return; + } + const { correlation_id, method, params } = frame; + if (!correlation_id || !method) return; + calls.push({ method, params: params ?? {} }); + queueMicrotask(() => { + const err = errors.get(method); + if (err) { + this.onmessage?.({ + data: JSON.stringify({ + kind: "rpc_response", + version: 1, + correlation_id, + ok: false, + error: err.error, + code: err.code, + }), + }); + return; + } + if (results.has(method)) { + this.onmessage?.({ + data: JSON.stringify({ + kind: "rpc_response", + version: 1, + correlation_id, + ok: true, + result: results.get(method), + }), + }); + return; + } + // No registered response — synthesise NOT_FOUND so the test fails + // loudly instead of timing out. + this.onmessage?.({ + data: JSON.stringify({ + kind: "rpc_response", + version: 1, + correlation_id, + ok: false, + error: `no mock for ${method}`, + code: "NOT_FOUND", + }), + }); + }); + } + + close() { + this.readyState = 3; + } + } + + vi.stubGlobal("WebSocket", MockWebSocket); + + return { + instances, + calls, + respond(method, result) { + results.set(method, result); + }, + respondError(method, error, code) { + errors.set(method, { error, code }); + }, + }; +} diff --git a/frontend/src/api/agents.test.ts b/frontend/src/api/agents.test.ts index c59011e6..f7848ddd 100644 --- a/frontend/src/api/agents.test.ts +++ b/frontend/src/api/agents.test.ts @@ -1,6 +1,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import type { AgentConfigInfo, AgentEvent, AgentInfo } from "./agents"; import { agentsApi, subscribeAgentStream } from "./agents"; +import { installRpcMock } from "./__test_utils__/mockRpcWebSocket"; const mockFetch = vi.fn(); @@ -133,26 +134,24 @@ describe("agentsApi", () => { }); describe("getAgentConfig", () => { - it("sends GET to /agents/config and returns config list", async () => { - mockFetch.mockResolvedValueOnce(okResponse([sampleConfig])); + it("dispatches an agent_config.list RPC and returns the config list", async () => { + const rpc = installRpcMock(); + rpc.respond("agent_config.list", [sampleConfig]); const result = await agentsApi.getAgentConfig(); - expect(mockFetch).toHaveBeenCalledWith( - "/api/agents/config", - expect.objectContaining({}), - ); + expect(rpc.calls).toEqual([ + { method: "agent_config.list", params: {} }, + ]); expect(result).toEqual([sampleConfig]); }); - it("uses custom baseUrl when provided", async () => { - mockFetch.mockResolvedValueOnce(okResponse([sampleConfig])); + it("surfaces RPC errors visibly", async () => { + const rpc = installRpcMock(); + rpc.respondError("agent_config.list", "config not found", "NOT_FOUND"); - await agentsApi.getAgentConfig("http://localhost:3002/api"); - - expect(mockFetch).toHaveBeenCalledWith( - "http://localhost:3002/api/agents/config", - expect.objectContaining({}), + await expect(agentsApi.getAgentConfig()).rejects.toThrow( + "config not found", ); }); }); @@ -183,18 +182,18 @@ describe("agentsApi", () => { }); describe("error handling", () => { - it("throws on non-ok response with body text", async () => { - mockFetch.mockResolvedValueOnce(errorResponse(404, "config not found")); + it("throws on non-ok HTTP response from startAgent", async () => { + mockFetch.mockResolvedValueOnce(errorResponse(404, "story not found")); - await expect(agentsApi.getAgentConfig()).rejects.toThrow( - "config not found", + await expect(agentsApi.startAgent("missing_story")).rejects.toThrow( + "story not found", ); }); - it("throws with status code when no body", async () => { + it("throws with status code from startAgent when body is empty", async () => { mockFetch.mockResolvedValueOnce(errorResponse(500, "")); - await expect(agentsApi.getAgentConfig()).rejects.toThrow( + await expect(agentsApi.startAgent("missing_story")).rejects.toThrow( "Request failed (500)", ); }); diff --git a/frontend/src/api/agents.ts b/frontend/src/api/agents.ts index 6d27d366..12d67224 100644 --- a/frontend/src/api/agents.ts +++ b/frontend/src/api/agents.ts @@ -100,8 +100,8 @@ export const agentsApi = { return rpcCall("active_agents.list"); }, - getAgentConfig(baseUrl?: string) { - return requestJson("/agents/config", {}, baseUrl); + getAgentConfig(_baseUrl?: string) { + return rpcCall("agent_config.list"); }, reloadConfig(baseUrl?: string) { @@ -112,12 +112,11 @@ export const agentsApi = { ); }, - getAgentOutput(storyId: string, agentName: string, baseUrl?: string) { - return requestJson<{ output: string }>( - `/agents/${encodeURIComponent(storyId)}/${encodeURIComponent(agentName)}/output`, - {}, - baseUrl, - ); + getAgentOutput(storyId: string, agentName: string, _baseUrl?: string) { + return rpcCall<{ output: string }>("agents.get_output", { + story_id: storyId, + agent_name: agentName, + }); }, }; diff --git a/frontend/src/api/bot_config.ts b/frontend/src/api/bot_config.ts index 1dff6c84..afe699b9 100644 --- a/frontend/src/api/bot_config.ts +++ b/frontend/src/api/bot_config.ts @@ -1,3 +1,8 @@ +/** + * WS-RPC client for chat-bot transport config (Matrix / Slack / WhatsApp). + */ +import { rpcCall } from "./rpc"; + export interface BotConfig { transport: string | null; enabled: boolean | null; @@ -29,8 +34,8 @@ async function requestJson( } export const botConfigApi = { - getConfig(baseUrl?: string): Promise { - return requestJson("/bot/config", {}, baseUrl); + getConfig(_baseUrl?: string): Promise { + return rpcCall("bot_config.get"); }, saveConfig(config: BotConfig, baseUrl?: string): Promise { diff --git a/frontend/src/api/client.test.ts b/frontend/src/api/client.test.ts index dbd2ff5f..4ae49ea5 100644 --- a/frontend/src/api/client.test.ts +++ b/frontend/src/api/client.test.ts @@ -1,5 +1,6 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { api, ChatWebSocket, resolveWsHost } from "./client"; +import { installRpcMock } from "./__test_utils__/mockRpcWebSocket"; const mockFetch = vi.fn(); @@ -24,20 +25,19 @@ function errorResponse(status: number, text: string) { describe("api client", () => { describe("getCurrentProject", () => { - it("sends GET to /project", async () => { - mockFetch.mockResolvedValueOnce(okResponse("/home/user/project")); + it("dispatches project.current RPC and returns the path", async () => { + const rpc = installRpcMock(); + rpc.respond("project.current", "/home/user/project"); const result = await api.getCurrentProject(); - expect(mockFetch).toHaveBeenCalledWith( - "/api/project", - expect.objectContaining({}), - ); + expect(rpc.calls).toEqual([{ method: "project.current", params: {} }]); expect(result).toBe("/home/user/project"); }); it("returns null when no project open", async () => { - mockFetch.mockResolvedValueOnce(okResponse(null)); + const rpc = installRpcMock(); + rpc.respond("project.current", null); const result = await api.getCurrentProject(); expect(result).toBeNull(); @@ -74,25 +74,28 @@ describe("api client", () => { }); describe("getKnownProjects", () => { - it("returns array of project paths", async () => { - mockFetch.mockResolvedValueOnce(okResponse(["/a", "/b"])); + it("dispatches project.known RPC and returns the path list", async () => { + const rpc = installRpcMock(); + rpc.respond("project.known", ["/a", "/b"]); const result = await api.getKnownProjects(); + expect(rpc.calls).toEqual([{ method: "project.known", params: {} }]); expect(result).toEqual(["/a", "/b"]); }); }); describe("error handling", () => { - it("throws on non-ok response with body text", async () => { - mockFetch.mockResolvedValueOnce(errorResponse(404, "Not found")); + it("surfaces RPC errors visibly", async () => { + const rpc = installRpcMock(); + rpc.respondError("project.current", "store offline", "INTERNAL"); - await expect(api.getCurrentProject()).rejects.toThrow("Not found"); + await expect(api.getCurrentProject()).rejects.toThrow("store offline"); }); - it("throws with status code when no body", async () => { + it("throws on non-ok HTTP response for legacy POST endpoints", async () => { mockFetch.mockResolvedValueOnce(errorResponse(500, "")); - await expect(api.getCurrentProject()).rejects.toThrow( + await expect(api.openProject("/some/path")).rejects.toThrow( "Request failed (500)", ); }); diff --git a/frontend/src/api/client/http.ts b/frontend/src/api/client/http.ts index aacc172f..082506ca 100644 --- a/frontend/src/api/client/http.ts +++ b/frontend/src/api/client/http.ts @@ -5,6 +5,7 @@ * object exposing all REST endpoints. */ +import { rpcCall } from "../rpc"; import type { AllTokenUsageResponse, AnthropicModelInfo, @@ -87,11 +88,11 @@ export async function callMcpTool( /** Typed REST and MCP wrappers for all Huskies server endpoints. */ export const api = { - getCurrentProject(baseUrl?: string) { - return requestJson("/project", {}, baseUrl); + getCurrentProject(_baseUrl?: string) { + return rpcCall("project.current"); }, - getKnownProjects(baseUrl?: string) { - return requestJson("/projects", {}, baseUrl); + getKnownProjects(_baseUrl?: string) { + return rpcCall("project.known"); }, forgetKnownProject(path: string, baseUrl?: string) { return requestJson( @@ -110,8 +111,8 @@ export const api = { closeProject(baseUrl?: string) { return requestJson("/project", { method: "DELETE" }, baseUrl); }, - getModelPreference(baseUrl?: string) { - return requestJson("/model", {}, baseUrl); + getModelPreference(_baseUrl?: string) { + return rpcCall("model.get_preference"); }, setModelPreference(model: string, baseUrl?: string) { return requestJson( @@ -120,21 +121,17 @@ export const api = { baseUrl, ); }, - getOllamaModels(baseUrlParam?: string, baseUrl?: string) { - const url = new URL( - buildApiUrl("/ollama/models", baseUrl), - window.location.origin, + getOllamaModels(baseUrlParam?: string, _baseUrl?: string) { + return rpcCall( + "ollama.list_models", + baseUrlParam ? { base_url: baseUrlParam } : {}, ); - if (baseUrlParam) { - url.searchParams.set("base_url", baseUrlParam); - } - return requestJson(url.pathname + url.search, {}, ""); }, - getAnthropicApiKeyExists(baseUrl?: string) { - return requestJson("/anthropic/key/exists", {}, baseUrl); + getAnthropicApiKeyExists(_baseUrl?: string) { + return rpcCall("anthropic.key_exists"); }, - getAnthropicModels(baseUrl?: string) { - return requestJson("/anthropic/models", {}, baseUrl); + getAnthropicModels(_baseUrl?: string) { + return rpcCall("anthropic.list_models"); }, setAnthropicApiKey(api_key: string, baseUrl?: string) { return requestJson( @@ -178,11 +175,11 @@ export const api = { baseUrl, ); }, - getHomeDirectory(baseUrl?: string) { - return requestJson("/io/fs/home", {}, baseUrl); + getHomeDirectory(_baseUrl?: string) { + return rpcCall("io.home_directory"); }, - listProjectFiles(baseUrl?: string) { - return requestJson("/io/fs/files", {}, baseUrl); + listProjectFiles(_baseUrl?: string) { + return rpcCall("io.list_project_files"); }, searchFiles(query: string, baseUrl?: string) { return requestJson( @@ -201,29 +198,21 @@ export const api = { cancelChat(baseUrl?: string) { return requestJson("/chat/cancel", { method: "POST" }, baseUrl); }, - getWorkItemContent(storyId: string, baseUrl?: string) { - return requestJson( - `/work-items/${encodeURIComponent(storyId)}`, - {}, - baseUrl, - ); + getWorkItemContent(storyId: string, _baseUrl?: string) { + return rpcCall("work_items.get", { story_id: storyId }); }, - getTestResults(storyId: string, baseUrl?: string) { - return requestJson( - `/work-items/${encodeURIComponent(storyId)}/test-results`, - {}, - baseUrl, - ); + getTestResults(storyId: string, _baseUrl?: string) { + return rpcCall("work_items.test_results", { + story_id: storyId, + }); }, - getTokenCost(storyId: string, baseUrl?: string) { - return requestJson( - `/work-items/${encodeURIComponent(storyId)}/token-cost`, - {}, - baseUrl, - ); + getTokenCost(storyId: string, _baseUrl?: string) { + return rpcCall("work_items.token_cost", { + story_id: storyId, + }); }, - getAllTokenUsage(baseUrl?: string) { - return requestJson("/token-usage", {}, baseUrl); + getAllTokenUsage(_baseUrl?: string) { + return rpcCall("token_usage.all"); }, /** Trigger a server rebuild and restart. */ rebuildAndRestart() { @@ -247,7 +236,7 @@ export const api = { }, /** Fetch OAuth status from the server. */ getOAuthStatus() { - return requestJson("/oauth/status", {}, ""); + return rpcCall("oauth.status"); }, /** Execute a bot slash command without LLM invocation. Returns markdown response text. */ botCommand(command: string, args: string, baseUrl?: string) { diff --git a/frontend/src/api/rpc.ts b/frontend/src/api/rpc.ts index b2125eed..76818d86 100644 --- a/frontend/src/api/rpc.ts +++ b/frontend/src/api/rpc.ts @@ -1,8 +1,13 @@ /** * Lightweight read-RPC client over the `/ws` WebSocket. * - * Opens a short-lived WebSocket, sends an `rpc_request` frame, waits for the - * matching `rpc_response`, then closes the connection. + * Each `rpcCall` opens a short-lived WebSocket, sends an `rpc_request` frame, + * waits for the matching `rpc_response`, then closes the connection. + * + * On a transient connection failure the call is retried once before rejecting, + * which lets a freshly-started backend race finish before the user sees an + * error. Failures surface as `Error` instances whose `.message` is intended + * to be visible (toast / banner) — callers must not swallow them silently. */ let correlationCounter = 0; @@ -27,26 +32,59 @@ export interface RpcResponse { code?: string; } +/** Error subclass for RPC failures so callers can recognise them. */ +export class RpcError extends Error { + constructor( + message: string, + public readonly code?: string, + public readonly method?: string, + ) { + super(message); + this.name = "RpcError"; + } +} + +/** Maximum number of automatic retries on transient WebSocket failure. */ +const MAX_RETRIES = 1; + +/** Delay between retry attempts (ms). */ +const RETRY_DELAY_MS = 250; + /** - * Send a read-RPC request over a temporary WebSocket connection and return - * the result. Rejects if the server responds with `ok: false` or if the - * connection times out. + * Internal: a single one-shot RPC attempt. Resolves with the result or + * rejects with an `RpcError`. */ -export function rpcCall( +function rpcAttempt( method: string, - params: Record = {}, - timeoutMs = 5000, + params: Record, + timeoutMs: number, ): Promise { return new Promise((resolve, reject) => { const correlationId = nextCorrelationId(); - const ws = new WebSocket(buildWsUrl()); + let ws: WebSocket; + try { + ws = new WebSocket(buildWsUrl()); + } catch (err) { + reject( + new RpcError( + `Failed to open WebSocket for ${method}: ${(err as Error).message}`, + "CONNECT_FAILED", + method, + ), + ); + return; + } let settled = false; const timer = setTimeout(() => { if (!settled) { settled = true; - ws.close(); - reject(new Error(`RPC timeout for ${method}`)); + try { + ws.close(); + } catch { + /* ignore */ + } + reject(new RpcError(`RPC timeout for ${method}`, "TIMEOUT", method)); } }, timeoutMs); @@ -66,25 +104,32 @@ export function rpcCall( ws.onmessage = (event) => { try { const data = JSON.parse(event.data); - // Only process rpc_response frames matching our correlation ID. if ( data.kind === "rpc_response" && data.correlation_id === correlationId ) { settled = true; clearTimeout(timer); - ws.close(); + try { + ws.close(); + } catch { + /* ignore */ + } if (data.ok) { resolve(data.result as T); } else { reject( - new Error(data.error || `RPC error: ${data.code || "UNKNOWN"}`), + new RpcError( + data.error || `RPC error: ${data.code || "UNKNOWN"}`, + data.code, + method, + ), ); } } - // Ignore other messages (pipeline_state, onboarding_status, etc.) + // Ignore other frames (pipeline_state, onboarding_status, etc.) } catch { - // Ignore non-JSON or unparseable messages + /* ignore non-JSON / malformed frames */ } }; @@ -92,7 +137,13 @@ export function rpcCall( if (!settled) { settled = true; clearTimeout(timer); - reject(new Error(`WebSocket error during RPC call to ${method}`)); + reject( + new RpcError( + `WebSocket error during RPC call to ${method}`, + "CONNECT_FAILED", + method, + ), + ); } }; @@ -100,8 +151,54 @@ export function rpcCall( if (!settled) { settled = true; clearTimeout(timer); - reject(new Error(`WebSocket closed before RPC response for ${method}`)); + reject( + new RpcError( + `WebSocket closed before RPC response for ${method}`, + "CONNECT_FAILED", + method, + ), + ); } }; }); } + +/** Return true if the error is one we should retry (connection-level). */ +function isRetryable(err: unknown): boolean { + return ( + err instanceof RpcError && + (err.code === "CONNECT_FAILED" || err.code === "TIMEOUT") + ); +} + +function sleep(ms: number): Promise { + return new Promise((r) => setTimeout(r, ms)); +} + +/** + * Send a read-RPC request over a temporary WebSocket connection and return + * the result. On transient connection failure the call is retried once + * before rejecting. Rejects with [`RpcError`] on server-side errors, + * timeouts, or persistent connection failures. + */ +export async function rpcCall( + method: string, + params: Record = {}, + timeoutMs = 5000, +): Promise { + let lastErr: unknown; + for (let attempt = 0; attempt <= MAX_RETRIES; attempt++) { + try { + return await rpcAttempt(method, params, timeoutMs); + } catch (err) { + lastErr = err; + if (attempt < MAX_RETRIES && isRetryable(err)) { + await sleep(RETRY_DELAY_MS); + continue; + } + throw err; + } + } + // Unreachable but TypeScript can't prove it. + throw lastErr; +} diff --git a/frontend/src/api/settings.test.ts b/frontend/src/api/settings.test.ts index 6d0d4cb1..36931d92 100644 --- a/frontend/src/api/settings.test.ts +++ b/frontend/src/api/settings.test.ts @@ -1,6 +1,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import type { ProjectSettings } from "./settings"; import { settingsApi } from "./settings"; +import { installRpcMock } from "./__test_utils__/mockRpcWebSocket"; const mockFetch = vi.fn(); @@ -38,28 +39,24 @@ const defaultProjectSettings: ProjectSettings = { describe("settingsApi", () => { describe("getProjectSettings", () => { - it("sends GET to /settings and returns project settings", async () => { - mockFetch.mockResolvedValueOnce(okResponse(defaultProjectSettings)); + it("dispatches settings.get_project RPC and returns project settings", async () => { + const rpc = installRpcMock(); + rpc.respond("settings.get_project", defaultProjectSettings); const result = await settingsApi.getProjectSettings(); - expect(mockFetch).toHaveBeenCalledWith( - "/api/settings", - expect.objectContaining({ - headers: expect.objectContaining({ - "Content-Type": "application/json", - }), - }), - ); + expect(rpc.calls).toEqual([ + { method: "settings.get_project", params: {} }, + ]); expect(result).toEqual(defaultProjectSettings); }); - it("uses custom baseUrl when provided", async () => { - mockFetch.mockResolvedValueOnce(okResponse(defaultProjectSettings)); - await settingsApi.getProjectSettings("http://localhost:4000/api"); - expect(mockFetch).toHaveBeenCalledWith( - "http://localhost:4000/api/settings", - expect.anything(), + it("surfaces RPC errors visibly", async () => { + const rpc = installRpcMock(); + rpc.respondError("settings.get_project", "no project open", "INTERNAL"); + + await expect(settingsApi.getProjectSettings()).rejects.toThrow( + "no project open", ); }); }); @@ -95,41 +92,26 @@ describe("settingsApi", () => { }); describe("getEditorCommand", () => { - it("sends GET to /settings/editor and returns editor settings", async () => { + it("dispatches settings.get_editor RPC and returns editor settings", async () => { + const rpc = installRpcMock(); const expected = { editor_command: "zed" }; - mockFetch.mockResolvedValueOnce(okResponse(expected)); + rpc.respond("settings.get_editor", expected); const result = await settingsApi.getEditorCommand(); - expect(mockFetch).toHaveBeenCalledWith( - "/api/settings/editor", - expect.objectContaining({ - headers: expect.objectContaining({ - "Content-Type": "application/json", - }), - }), - ); + expect(rpc.calls).toEqual([ + { method: "settings.get_editor", params: {} }, + ]); expect(result).toEqual(expected); }); it("returns null editor_command when not configured", async () => { - const expected = { editor_command: null }; - mockFetch.mockResolvedValueOnce(okResponse(expected)); + const rpc = installRpcMock(); + rpc.respond("settings.get_editor", { editor_command: null }); const result = await settingsApi.getEditorCommand(); expect(result.editor_command).toBeNull(); }); - - it("uses custom baseUrl when provided", async () => { - mockFetch.mockResolvedValueOnce(okResponse({ editor_command: "code" })); - - await settingsApi.getEditorCommand("http://localhost:4000/api"); - - expect(mockFetch).toHaveBeenCalledWith( - "http://localhost:4000/api/settings/editor", - expect.anything(), - ); - }); }); describe("setEditorCommand", () => { @@ -178,19 +160,12 @@ describe("settingsApi", () => { }); describe("error handling", () => { - it("throws with response body text on non-ok response", async () => { - mockFetch.mockResolvedValueOnce(errorResponse(400, "Bad Request")); + it("surfaces RPC errors for getEditorCommand", async () => { + const rpc = installRpcMock(); + rpc.respondError("settings.get_editor", "store unavailable", "INTERNAL"); await expect(settingsApi.getEditorCommand()).rejects.toThrow( - "Bad Request", - ); - }); - - it("throws with status code message when response body is empty", async () => { - mockFetch.mockResolvedValueOnce(errorResponse(500, "")); - - await expect(settingsApi.getEditorCommand()).rejects.toThrow( - "Request failed (500)", + "store unavailable", ); }); diff --git a/frontend/src/api/settings.ts b/frontend/src/api/settings.ts index 9afc3e55..026f410e 100644 --- a/frontend/src/api/settings.ts +++ b/frontend/src/api/settings.ts @@ -1,3 +1,8 @@ +/** + * WS-RPC client for editor and project settings. + */ +import { rpcCall } from "./rpc"; + export interface EditorSettings { editor_command: string | null; } @@ -47,8 +52,8 @@ async function requestJson( } export const settingsApi = { - getProjectSettings(baseUrl?: string): Promise { - return requestJson("/settings", {}, baseUrl); + getProjectSettings(_baseUrl?: string): Promise { + return rpcCall("settings.get_project"); }, putProjectSettings( @@ -62,8 +67,8 @@ export const settingsApi = { ); }, - getEditorCommand(baseUrl?: string): Promise { - return requestJson("/settings/editor", {}, baseUrl); + getEditorCommand(_baseUrl?: string): Promise { + return rpcCall("settings.get_editor"); }, setEditorCommand( diff --git a/server/src/crdt_sync/client.rs b/server/src/crdt_sync/client.rs index a3c60650..0588b3f1 100644 --- a/server/src/crdt_sync/client.rs +++ b/server/src/crdt_sync/client.rs @@ -364,7 +364,7 @@ pub(crate) async fn connect_and_sync(url: &str, token: Option<&str>) -> Result<( if !flush_ok { break; } - } else if let Some(rpc_resp) = try_handle_rpc_text(text.as_ref()) { + } else if let Some(rpc_resp) = try_handle_rpc_text(text.as_ref()).await { // RPC request from the peer — dispatch and reply. use tokio_tungstenite::tungstenite::Message as TungsteniteMsg; if let Ok(json) = serde_json::to_string(&rpc_resp) diff --git a/server/src/crdt_sync/mod.rs b/server/src/crdt_sync/mod.rs index eb99d87b..8a107306 100644 --- a/server/src/crdt_sync/mod.rs +++ b/server/src/crdt_sync/mod.rs @@ -69,6 +69,7 @@ mod wire; pub use auth::{add_join_token, init_token_auth, init_trusted_keys}; pub(crate) use client::connect_and_sync; pub use client::{RENDEZVOUS_ERROR_THRESHOLD, spawn_rendezvous_client}; +pub use rpc::init_rpc_context; pub(crate) use rpc::try_handle_rpc_text; pub use server::crdt_sync_handler; diff --git a/server/src/crdt_sync/rpc.rs b/server/src/crdt_sync/rpc.rs index 0b373d16..b6983ed7 100644 --- a/server/src/crdt_sync/rpc.rs +++ b/server/src/crdt_sync/rpc.rs @@ -1,7 +1,7 @@ //! RPC method registry for the `/crdt-sync` WebSocket multiplexer. //! //! Incoming [`RpcFrame::RpcRequest`] frames are dispatched through this -//! registry. Each method handler is a plain function that accepts a +//! registry. Each method handler is an async function that accepts a //! `serde_json::Value` parameter bag and returns a `serde_json::Value` result. //! //! # Registering handlers @@ -9,7 +9,7 @@ //! Add a new entry to the `HANDLERS` static slice: //! //! ```rust,ignore -//! ("my.method", handle_my_method as Handler), +//! ("my.method", |p| Box::pin(handle_my_method(p))), //! ``` //! //! # Unknown methods @@ -17,28 +17,117 @@ //! [`dispatch`] returns `Err("NOT_FOUND")` for any method not present in the //! registry. The caller should translate this into an //! [`RpcFrame::RpcResponse`] with `ok: false, code: "NOT_FOUND"`. +//! +//! # Global context +//! +//! Many handlers need access to project state (session root, store, workflow). +//! Call [`init_rpc_context`] once at server startup to register these. +//! Handlers that require context return an error result when it has not been +//! set. + +use std::future::Future; +use std::pin::Pin; +use std::sync::{Arc, OnceLock}; use serde_json::Value; use super::wire::RpcFrame; +use crate::state::SessionState; +use crate::store::JsonFileStore; +use crate::workflow::WorkflowState; -/// Signature for a synchronous RPC method handler. -pub(super) type Handler = fn(Value) -> Value; +/// Future returned by an RPC handler. +type HandlerFuture = Pin + Send>>; + +/// Signature for an async RPC method handler. +pub(super) type Handler = fn(Value) -> HandlerFuture; + +/// Shared state made available to all RPC handlers. +pub struct RpcState { + pub state: Arc, + pub store: Arc, + pub workflow: Arc>, +} + +/// Global RPC context, initialised once at server startup via [`init_rpc_context`]. +static RPC_CTX: OnceLock = OnceLock::new(); + +/// Register the global RPC context. +/// +/// Must be called before any handler that accesses project state is invoked. +/// Subsequent calls are silently ignored (OnceLock semantics). +pub fn init_rpc_context( + state: Arc, + store: Arc, + workflow: Arc>, +) { + let _ = RPC_CTX.set(RpcState { + state, + store, + workflow, + }); +} /// Static registry mapping method names to handlers. /// /// Add new handlers here. The registry is a plain slice — linear scan is /// fine for the small number of methods expected. static HANDLERS: &[(&str, Handler)] = &[ - ("health.check", handle_health_check), - ("active_agents.list", handle_active_agents_list), + ("health.check", |p| Box::pin(handle_health_check(p))), + ("active_agents.list", |p| { + Box::pin(handle_active_agents_list(p)) + }), + ("agent_config.list", |p| { + Box::pin(handle_agent_config_list(p)) + }), + ("settings.get_project", |p| { + Box::pin(handle_settings_get_project(p)) + }), + ("settings.get_editor", |p| { + Box::pin(handle_settings_get_editor(p)) + }), + ("model.get_preference", |p| { + Box::pin(handle_model_get_preference(p)) + }), + ("project.current", |p| Box::pin(handle_project_current(p))), + ("project.known", |p| Box::pin(handle_project_known(p))), + ("anthropic.key_exists", |p| { + Box::pin(handle_anthropic_key_exists(p)) + }), + ("anthropic.list_models", |p| { + Box::pin(handle_anthropic_list_models(p)) + }), + ("ollama.list_models", |p| { + Box::pin(handle_ollama_list_models(p)) + }), + ("io.home_directory", |p| { + Box::pin(handle_io_home_directory(p)) + }), + ("io.list_project_files", |p| { + Box::pin(handle_io_list_project_files(p)) + }), + ("work_items.get", |p| Box::pin(handle_work_items_get(p))), + ("work_items.test_results", |p| { + Box::pin(handle_work_items_test_results(p)) + }), + ("work_items.token_cost", |p| { + Box::pin(handle_work_items_token_cost(p)) + }), + ("token_usage.all", |p| Box::pin(handle_token_usage_all(p))), + ("oauth.status", |p| Box::pin(handle_oauth_status(p))), + ("bot_config.get", |p| Box::pin(handle_bot_config_get(p))), + ("agents.get_output", |p| { + Box::pin(handle_agents_get_output(p)) + }), ]; +// ── handlers ───────────────────────────────────────────────────────────────── + /// Handler for the `health.check` method. /// /// Returns `{"status": "ok"}` unconditionally. Used as a smoke test to /// verify that the RPC multiplexer is wired up correctly. -fn handle_health_check(_params: Value) -> Value { +async fn handle_health_check(_params: Value) -> Value { serde_json::json!({"status": "ok"}) } @@ -48,12 +137,11 @@ fn handle_health_check(_params: Value) -> Value { /// matching the shape formerly served by `GET /api/agents`. Each entry /// contains `story_id`, `agent_name`, `status`, `session_id`, and /// `worktree_path`. -fn handle_active_agents_list(_params: Value) -> Value { +async fn handle_active_agents_list(_params: Value) -> Value { let entries = crate::crdt_state::read_all_active_agents().unwrap_or_default(); let list: Vec = entries .into_iter() .map(|view| { - // agent_id is the composite key "story_id:agent_name". let (story_id, agent_name) = view .agent_id .rsplit_once(':') @@ -71,14 +159,389 @@ fn handle_active_agents_list(_params: Value) -> Value { Value::Array(list) } +/// Handler for the `agent_config.list` method. +/// +/// Returns the configured agent roster from `project.toml`, matching the +/// shape formerly served by `GET /api/agents/config`. +async fn handle_agent_config_list(_params: Value) -> Value { + let Some(ctx) = RPC_CTX.get() else { + return serde_json::json!({"error": "RPC context not initialised"}); + }; + let Ok(root) = ctx.state.get_project_root() else { + return Value::Array(vec![]); + }; + let entries = crate::service::agents::get_agent_config(&root).unwrap_or_default(); + let list: Vec = entries + .into_iter() + .map(|e| { + serde_json::json!({ + "name": e.name, + "role": e.role, + "stage": e.stage, + "model": e.model, + "allowed_tools": e.allowed_tools, + "max_turns": e.max_turns, + "max_budget_usd": e.max_budget_usd, + }) + }) + .collect(); + Value::Array(list) +} + +/// Handler for the `settings.get_project` method. +/// +/// Returns the current `project.toml` scalar settings, matching the shape +/// formerly served by `GET /api/settings`. +async fn handle_settings_get_project(_params: Value) -> Value { + let Some(ctx) = RPC_CTX.get() else { + return serde_json::json!({"error": "RPC context not initialised"}); + }; + let Ok(root) = ctx.state.get_project_root() else { + return serde_json::json!({"error": "No project open"}); + }; + match crate::service::settings::load_project_settings(&root) { + Ok(s) => serde_json::json!({ + "default_qa": s.default_qa, + "default_coder_model": s.default_coder_model, + "max_coders": s.max_coders, + "max_retries": s.max_retries, + "base_branch": s.base_branch, + "rate_limit_notifications": s.rate_limit_notifications, + "timezone": s.timezone, + "rendezvous": s.rendezvous, + "watcher_sweep_interval_secs": s.watcher_sweep_interval_secs, + "watcher_done_retention_secs": s.watcher_done_retention_secs, + }), + Err(e) => serde_json::json!({"error": e.to_string()}), + } +} + +/// Handler for the `settings.get_editor` method. +/// +/// Returns the configured editor command from the store, matching the shape +/// formerly served by `GET /api/settings/editor`. +async fn handle_settings_get_editor(_params: Value) -> Value { + let Some(ctx) = RPC_CTX.get() else { + return serde_json::json!({"editor_command": null}); + }; + let cmd = crate::service::settings::get_editor_command(ctx.store.as_ref()); + serde_json::json!({"editor_command": cmd}) +} + +/// Handler for the `model.get_preference` method. +/// +/// Returns the user's saved LLM model name from the store, matching the +/// shape formerly served by `GET /api/model`. +async fn handle_model_get_preference(_params: Value) -> Value { + let Some(ctx) = RPC_CTX.get() else { + return Value::Null; + }; + match crate::io::fs::get_model_preference(ctx.store.as_ref()) { + Ok(pref) => serde_json::to_value(pref).unwrap_or(Value::Null), + Err(_) => Value::Null, + } +} + +/// Handler for the `project.current` method. +/// +/// Returns the currently open project path (or null), matching the shape +/// formerly served by `GET /api/project`. +async fn handle_project_current(_params: Value) -> Value { + let Some(ctx) = RPC_CTX.get() else { + return Value::Null; + }; + match crate::service::project::get_current_project(&ctx.state, ctx.store.as_ref()) { + Ok(path) => serde_json::to_value(path).unwrap_or(Value::Null), + Err(_) => Value::Null, + } +} + +/// Handler for the `project.known` method. +/// +/// Returns the list of previously-opened project paths from the store, +/// matching the shape formerly served by `GET /api/projects`. +async fn handle_project_known(_params: Value) -> Value { + let Some(ctx) = RPC_CTX.get() else { + return Value::Array(vec![]); + }; + match crate::service::project::get_known_projects(ctx.store.as_ref()) { + Ok(paths) => serde_json::to_value(paths).unwrap_or(Value::Array(vec![])), + Err(_) => Value::Array(vec![]), + } +} + +/// Handler for the `anthropic.key_exists` method. +/// +/// Returns true when an Anthropic API key is stored, matching the shape +/// formerly served by `GET /api/anthropic/key/exists`. +async fn handle_anthropic_key_exists(_params: Value) -> Value { + let Some(ctx) = RPC_CTX.get() else { + return Value::Bool(false); + }; + match crate::service::anthropic::get_api_key_exists(ctx.store.as_ref()) { + Ok(exists) => Value::Bool(exists), + Err(_) => Value::Bool(false), + } +} + +/// Handler for the `anthropic.list_models` method. +/// +/// Returns the available Anthropic models, matching the shape formerly +/// served by `GET /api/anthropic/models`. Surfaces upstream errors as a +/// JSON object `{"error": "..."}`. +async fn handle_anthropic_list_models(_params: Value) -> Value { + let Some(ctx) = RPC_CTX.get() else { + return serde_json::json!({"error": "RPC context not initialised"}); + }; + match crate::service::anthropic::list_models(ctx.store.as_ref()).await { + Ok(models) => serde_json::to_value(models).unwrap_or(Value::Array(vec![])), + Err(e) => serde_json::json!({"error": e.to_string()}), + } +} + +/// Handler for the `ollama.list_models` method. +/// +/// Returns the available Ollama models for the configured base URL, +/// matching the shape formerly served by `GET /api/ollama/models`. +/// +/// Parameters: `{ "base_url"?: string }`. +async fn handle_ollama_list_models(params: Value) -> Value { + let base_url = params + .get("base_url") + .and_then(|v| v.as_str()) + .map(str::to_string); + match crate::llm::chat::get_ollama_models(base_url).await { + Ok(models) => serde_json::to_value(models).unwrap_or(Value::Array(vec![])), + Err(_) => Value::Array(vec![]), + } +} + +/// Handler for the `io.home_directory` method. +/// +/// Returns the user's home directory path, matching the shape formerly +/// served by `GET /api/io/fs/home`. +async fn handle_io_home_directory(_params: Value) -> Value { + match crate::service::file_io::get_home_directory() { + Ok(home) => Value::String(home), + Err(_) => Value::Null, + } +} + +/// Handler for the `io.list_project_files` method. +/// +/// Returns the list of files in the currently open project, matching the +/// shape formerly served by `GET /api/io/fs/files`. +async fn handle_io_list_project_files(_params: Value) -> Value { + let Some(ctx) = RPC_CTX.get() else { + return Value::Array(vec![]); + }; + match crate::service::file_io::list_project_files(&ctx.state).await { + Ok(files) => serde_json::to_value(files).unwrap_or(Value::Array(vec![])), + Err(_) => Value::Array(vec![]), + } +} + +/// Handler for the `work_items.get` method. +/// +/// Returns the markdown content and metadata for a work item, matching the +/// shape formerly served by `GET /api/work-items/{story_id}`. +/// +/// Parameters: `{ "story_id": string }`. +async fn handle_work_items_get(params: Value) -> Value { + let Some(ctx) = RPC_CTX.get() else { + return serde_json::json!({"error": "RPC context not initialised"}); + }; + let Some(story_id) = params.get("story_id").and_then(|v| v.as_str()) else { + return serde_json::json!({"error": "missing story_id"}); + }; + let Ok(root) = ctx.state.get_project_root() else { + return serde_json::json!({"error": "No project open"}); + }; + match crate::service::agents::get_work_item_content(&root, story_id) { + Ok(c) => serde_json::json!({ + "content": c.content, + "stage": c.stage, + "name": c.name, + "agent": c.agent, + }), + Err(e) => serde_json::json!({"error": e.to_string()}), + } +} + +/// Handler for the `work_items.test_results` method. +/// +/// Returns the most recent test-suite results for a story, matching the +/// shape formerly served by `GET /api/work-items/{story_id}/test-results`. +/// +/// Parameters: `{ "story_id": string }`. +async fn handle_work_items_test_results(params: Value) -> Value { + let Some(ctx) = RPC_CTX.get() else { + return Value::Null; + }; + let Some(story_id) = params.get("story_id").and_then(|v| v.as_str()) else { + return Value::Null; + }; + let Ok(root) = ctx.state.get_project_root() else { + return Value::Null; + }; + let workflow = ctx.workflow.lock().unwrap(); + match crate::service::agents::get_test_results(&root, story_id, &workflow) { + Some(results) => serde_json::to_value(results).unwrap_or(Value::Null), + None => Value::Null, + } +} + +/// Handler for the `work_items.token_cost` method. +/// +/// Returns the aggregated LLM token cost for a story, matching the shape +/// formerly served by `GET /api/work-items/{story_id}/token-cost`. +/// +/// Parameters: `{ "story_id": string }`. +async fn handle_work_items_token_cost(params: Value) -> Value { + let Some(ctx) = RPC_CTX.get() else { + return serde_json::json!({"error": "RPC context not initialised"}); + }; + let Some(story_id) = params.get("story_id").and_then(|v| v.as_str()) else { + return serde_json::json!({"error": "missing story_id"}); + }; + let Ok(root) = ctx.state.get_project_root() else { + return serde_json::json!({"error": "No project open"}); + }; + match crate::service::agents::get_work_item_token_cost(&root, story_id) { + Ok(summary) => serde_json::json!({ + "total_cost_usd": summary.total_cost_usd, + "agents": summary.agents.into_iter().map(|a| serde_json::json!({ + "agent_name": a.agent_name, + "model": a.model, + "input_tokens": a.input_tokens, + "output_tokens": a.output_tokens, + "cache_creation_input_tokens": a.cache_creation_input_tokens, + "cache_read_input_tokens": a.cache_read_input_tokens, + "total_cost_usd": a.total_cost_usd, + })).collect::>(), + }), + Err(e) => serde_json::json!({"error": e.to_string()}), + } +} + +/// Handler for the `token_usage.all` method. +/// +/// Returns every token-usage record for the project, matching the shape +/// formerly served by `GET /api/token-usage`. +async fn handle_token_usage_all(_params: Value) -> Value { + let Some(ctx) = RPC_CTX.get() else { + return serde_json::json!({"records": []}); + }; + let Ok(root) = ctx.state.get_project_root() else { + return serde_json::json!({"records": []}); + }; + let records = crate::service::agents::get_all_token_usage(&root).unwrap_or_default(); + serde_json::json!({"records": records}) +} + +/// Handler for the `oauth.status` method. +/// +/// Returns the status of every stored OAuth account in the login pool, +/// matching the shape formerly served by `GET /oauth/status`. +async fn handle_oauth_status(_params: Value) -> Value { + let accounts = crate::service::oauth::check_all_accounts(); + serde_json::json!({"accounts": accounts}) +} + +/// Handler for the `bot_config.get` method. +/// +/// Reads the credentials stored in `.huskies/bot.toml`, matching the shape +/// formerly served by `GET /api/bot/config`. +async fn handle_bot_config_get(_params: Value) -> Value { + let Some(ctx) = RPC_CTX.get() else { + return bot_config_default(); + }; + let Ok(root) = ctx.state.get_project_root() else { + return bot_config_default(); + }; + let path = root.join(".huskies").join("bot.toml"); + match std::fs::read_to_string(&path) { + Ok(s) => match toml::from_str::(&s) { + Ok(v) => merge_bot_config_defaults(v), + Err(_) => bot_config_default(), + }, + Err(_) => bot_config_default(), + } +} + +/// Default bot config payload — every key present, every value null. +fn bot_config_default() -> Value { + serde_json::json!({ + "transport": null, + "enabled": null, + "homeserver": null, + "username": null, + "password": null, + "room_ids": null, + "slack_bot_token": null, + "slack_signing_secret": null, + "slack_channel_ids": null, + }) +} + +/// Fill in missing keys with `null` so the frontend always sees the full shape. +fn merge_bot_config_defaults(mut v: Value) -> Value { + let obj = v.as_object_mut(); + let keys = [ + "transport", + "enabled", + "homeserver", + "username", + "password", + "room_ids", + "slack_bot_token", + "slack_signing_secret", + "slack_channel_ids", + ]; + if let Some(map) = obj { + for k in keys { + map.entry(k.to_string()).or_insert(Value::Null); + } + } + v +} + +/// Handler for the `agents.get_output` method. +/// +/// Returns the concatenated output text for an agent's most recent session, +/// matching the shape formerly served by +/// `GET /api/agents/{story_id}/{agent_name}/output`. +/// +/// Parameters: `{ "story_id": string, "agent_name": string }`. +async fn handle_agents_get_output(params: Value) -> Value { + let Some(ctx) = RPC_CTX.get() else { + return serde_json::json!({"error": "RPC context not initialised"}); + }; + let Some(story_id) = params.get("story_id").and_then(|v| v.as_str()) else { + return serde_json::json!({"error": "missing story_id"}); + }; + let Some(agent_name) = params.get("agent_name").and_then(|v| v.as_str()) else { + return serde_json::json!({"error": "missing agent_name"}); + }; + let Ok(root) = ctx.state.get_project_root() else { + return serde_json::json!({"error": "no project open"}); + }; + match crate::service::agents::get_agent_output(&root, story_id, agent_name) { + Ok(output) => serde_json::json!({"output": output}), + Err(e) => serde_json::json!({"error": e.to_string()}), + } +} + +// ── dispatch ────────────────────────────────────────────────────────────────── + /// Dispatch an incoming RPC method call to the registered handler. /// /// Returns `Ok(result)` on success or `Err("NOT_FOUND")` if no handler is /// registered for `method`. -pub(super) fn dispatch(method: &str, params: Value) -> Result { +pub(super) async fn dispatch(method: &str, params: Value) -> Result { for (name, handler) in HANDLERS { if *name == method { - return Ok(handler(params)); + return Ok(handler(params).await); } } Err("NOT_FOUND") @@ -89,7 +552,7 @@ pub(super) fn dispatch(method: &str, params: Value) -> Result Option { +pub(crate) async fn try_handle_rpc_text(text: &str) -> Option { let frame: RpcFrame = serde_json::from_str(text).ok()?; match frame { RpcFrame::RpcRequest { @@ -99,7 +562,7 @@ pub(crate) fn try_handle_rpc_text(text: &str) -> Option { params, .. } => { - let response = match dispatch(&method, params) { + let response = match dispatch(&method, params).await { Ok(result) => RpcFrame::RpcResponse { version, correlation_id, @@ -128,23 +591,23 @@ pub(crate) fn try_handle_rpc_text(text: &str) -> Option { mod tests { use super::*; - #[test] - fn health_check_returns_ok_status() { - let result = dispatch("health.check", serde_json::json!({})); + #[tokio::test] + async fn health_check_returns_ok_status() { + let result = dispatch("health.check", serde_json::json!({})).await; assert!(result.is_ok()); let val = result.unwrap(); assert_eq!(val["status"], "ok"); } - #[test] - fn unknown_method_returns_not_found() { - let result = dispatch("nonexistent.method", serde_json::json!({})); + #[tokio::test] + async fn unknown_method_returns_not_found() { + let result = dispatch("nonexistent.method", serde_json::json!({})).await; assert!(result.is_err()); assert_eq!(result.unwrap_err(), "NOT_FOUND"); } - #[test] - fn try_handle_rpc_text_health_check() { + #[tokio::test] + async fn try_handle_rpc_text_health_check() { let req = serde_json::json!({ "kind": "rpc_request", "version": 1, @@ -154,7 +617,9 @@ mod tests { "params": {} }); let text = serde_json::to_string(&req).unwrap(); - let resp = try_handle_rpc_text(&text).expect("must produce a response"); + let resp = try_handle_rpc_text(&text) + .await + .expect("must produce a response"); match resp { RpcFrame::RpcResponse { ok, @@ -173,8 +638,8 @@ mod tests { } } - #[test] - fn try_handle_rpc_text_unknown_method_returns_not_found() { + #[tokio::test] + async fn try_handle_rpc_text_unknown_method_returns_not_found() { let req = serde_json::json!({ "kind": "rpc_request", "version": 1, @@ -184,7 +649,9 @@ mod tests { "params": {} }); let text = serde_json::to_string(&req).unwrap(); - let resp = try_handle_rpc_text(&text).expect("must produce a response for unknown method"); + let resp = try_handle_rpc_text(&text) + .await + .expect("must produce a response for unknown method"); match resp { RpcFrame::RpcResponse { ok, code, .. } => { assert!(!ok, "unknown method must not succeed"); @@ -194,17 +661,14 @@ mod tests { } } - #[test] - fn try_handle_rpc_text_ignores_non_rpc_frames() { - // A SyncMessage::Bulk frame must not be treated as an RPC request. + #[tokio::test] + async fn try_handle_rpc_text_ignores_non_rpc_frames() { let bulk = r#"{"type":"bulk","ops":[]}"#; - assert!(try_handle_rpc_text(bulk).is_none()); + assert!(try_handle_rpc_text(bulk).await.is_none()); } - #[test] - fn try_handle_rpc_text_ignores_rpc_response_frames() { - // An incoming rpc_response (e.g. reply to our own outbound request) must - // not trigger a further response. + #[tokio::test] + async fn try_handle_rpc_text_ignores_rpc_response_frames() { let resp = serde_json::json!({ "kind": "rpc_response", "version": 1, @@ -213,16 +677,16 @@ mod tests { "result": {"status": "ok"} }); let text = serde_json::to_string(&resp).unwrap(); - assert!(try_handle_rpc_text(&text).is_none()); + assert!(try_handle_rpc_text(&text).await.is_none()); } - #[test] - fn try_handle_rpc_text_ignores_invalid_json() { - assert!(try_handle_rpc_text("not json at all").is_none()); + #[tokio::test] + async fn try_handle_rpc_text_ignores_invalid_json() { + assert!(try_handle_rpc_text("not json at all").await.is_none()); } - #[test] - fn rpc_response_correlation_id_mirrors_request() { + #[tokio::test] + async fn rpc_response_correlation_id_mirrors_request() { let req = serde_json::json!({ "kind": "rpc_request", "version": 1, @@ -232,7 +696,7 @@ mod tests { "params": {} }); let text = serde_json::to_string(&req).unwrap(); - let resp = try_handle_rpc_text(&text).unwrap(); + let resp = try_handle_rpc_text(&text).await.unwrap(); match resp { RpcFrame::RpcResponse { correlation_id, .. } => { assert_eq!(correlation_id, "mirror-me"); @@ -240,4 +704,104 @@ mod tests { _ => panic!("Expected RpcResponse"), } } + + // ── context-dependent handlers (no context set) ────────────────────────── + + #[tokio::test] + async fn agent_config_list_returns_value_without_context() { + let result = dispatch("agent_config.list", serde_json::json!({})).await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn settings_get_editor_returns_editor_command_key() { + let result = dispatch("settings.get_editor", serde_json::json!({})).await; + assert!(result.is_ok()); + let val = result.unwrap(); + assert!(val.get("editor_command").is_some()); + } + + #[tokio::test] + async fn model_get_preference_returns_a_value() { + let result = dispatch("model.get_preference", serde_json::json!({})).await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn project_current_returns_a_value() { + let result = dispatch("project.current", serde_json::json!({})).await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn project_known_returns_a_value() { + let result = dispatch("project.known", serde_json::json!({})).await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn anthropic_key_exists_returns_a_value() { + let result = dispatch("anthropic.key_exists", serde_json::json!({})).await; + assert!(result.is_ok()); + let val = result.unwrap(); + assert!(val.is_boolean() || val.is_object()); + } + + #[tokio::test] + async fn io_home_directory_returns_a_value() { + let result = dispatch("io.home_directory", serde_json::json!({})).await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn token_usage_all_returns_records_envelope() { + let result = dispatch("token_usage.all", serde_json::json!({})).await; + assert!(result.is_ok()); + let val = result.unwrap(); + assert!(val.get("records").is_some()); + } + + #[tokio::test] + async fn oauth_status_returns_accounts_envelope() { + let result = dispatch("oauth.status", serde_json::json!({})).await; + assert!(result.is_ok()); + let val = result.unwrap(); + assert!(val.get("accounts").is_some()); + } + + #[tokio::test] + async fn bot_config_get_returns_full_shape() { + let result = dispatch("bot_config.get", serde_json::json!({})).await; + assert!(result.is_ok()); + let val = result.unwrap(); + for key in [ + "transport", + "enabled", + "homeserver", + "username", + "password", + "room_ids", + "slack_bot_token", + "slack_signing_secret", + "slack_channel_ids", + ] { + assert!(val.get(key).is_some(), "missing key {key}"); + } + } + + #[tokio::test] + async fn work_items_get_missing_story_id_returns_error() { + let result = dispatch("work_items.get", serde_json::json!({})).await; + assert!(result.is_ok()); + let val = result.unwrap(); + assert!(val.get("error").is_some()); + } + + #[tokio::test] + async fn agents_get_output_missing_story_id_returns_error() { + let result = dispatch("agents.get_output", serde_json::json!({})).await; + assert!(result.is_ok()); + let val = result.unwrap(); + assert!(val.get("error").is_some()); + } } diff --git a/server/src/crdt_sync/server/mod.rs b/server/src/crdt_sync/server/mod.rs index 71518742..15c63f77 100644 --- a/server/src/crdt_sync/server/mod.rs +++ b/server/src/crdt_sync/server/mod.rs @@ -264,7 +264,7 @@ pub async fn crdt_sync_handler( if !flush_ok { break; } - } else if let Some(rpc_resp) = try_handle_rpc_text(&text) { + } else if let Some(rpc_resp) = try_handle_rpc_text(&text).await { // RPC request — dispatch to registry and send response. if let Ok(json) = serde_json::to_string(&rpc_resp) && sink.send(WsMessage::Text(json)).await.is_err() diff --git a/server/src/http/mod.rs b/server/src/http/mod.rs index 7febb2f8..d4b89fb2 100644 --- a/server/src/http/mod.rs +++ b/server/src/http/mod.rs @@ -167,7 +167,7 @@ pub fn build_routes( #[poem::handler] pub async fn rpc_http_handler(body: poem::web::Json) -> poem::Response { let text = serde_json::to_string(&body.0).unwrap_or_default(); - match crate::crdt_sync::try_handle_rpc_text(&text) { + match crate::crdt_sync::try_handle_rpc_text(&text).await { Some(response) => { let json = serde_json::to_string(&response).unwrap_or_default(); poem::Response::builder() diff --git a/server/src/http/ws/mod.rs b/server/src/http/ws/mod.rs index 7c8da623..b44ce7c7 100644 --- a/server/src/http/ws/mod.rs +++ b/server/src/http/ws/mod.rs @@ -97,7 +97,7 @@ pub async fn ws_handler(ws: WebSocket, ctx: Data<&Arc>) -> impl poem }; // Handle read-RPC frames (discriminated by "kind", not "type"). - if let Some(rpc_resp) = crate::crdt_sync::try_handle_rpc_text(&text) { + if let Some(rpc_resp) = crate::crdt_sync::try_handle_rpc_text(&text).await { if let Ok(resp_text) = serde_json::to_string(&rpc_resp) { let _ = raw_tx.send(resp_text); } @@ -160,7 +160,7 @@ pub async fn ws_handler(ws: WebSocket, ctx: Data<&Arc>) -> impl poem Some(Ok(WsMessage::Text(inner_text))) = stream.next() => { // Handle read-RPC frames during active chat. - if let Some(rpc_resp) = crate::crdt_sync::try_handle_rpc_text(&inner_text) { + if let Some(rpc_resp) = crate::crdt_sync::try_handle_rpc_text(&inner_text).await { if let Ok(resp_text) = serde_json::to_string(&rpc_resp) { let _ = raw_tx.send(resp_text); } diff --git a/server/src/main.rs b/server/src/main.rs index 22669402..1b61ac3f 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -102,6 +102,10 @@ async fn main() -> Result<(), std::io::Error> { } let store = Arc::new(JsonFileStore::from_path(store_path).map_err(std::io::Error::other)?); + // Shared workflow state — same instance is reused for HTTP handlers below. + let workflow = Arc::new(std::sync::Mutex::new(WorkflowState::default())); + crate::crdt_sync::init_rpc_context(app_state.clone(), store.clone(), Arc::clone(&workflow)); + // Collect CLI args, skipping the binary name (argv[0]). let raw_args: Vec = std::env::args().skip(1).collect(); @@ -174,8 +178,6 @@ async fn main() -> Result<(), std::io::Error> { return agent_mode::run(agent_root, rendezvous, port, join_token, agent_gateway_url).await; } - let workflow = Arc::new(std::sync::Mutex::new(WorkflowState::default())); - // Event bus: broadcast channel for pipeline lifecycle events. let (watcher_tx, _) = broadcast::channel::(1024); let agents = Arc::new(AgentPool::new(port, watcher_tx.clone()));