huskies: merge 770

This commit is contained in:
dave
2026-04-28 15:31:29 +00:00
parent 1946709681
commit f63464852b
13 changed files with 212 additions and 266 deletions
+5 -35
View File
@@ -132,38 +132,6 @@ describe("agentsApi", () => {
});
});
describe("listAgents", () => {
it("sends GET to /agents and returns agent list", async () => {
mockFetch.mockResolvedValueOnce(okResponse([sampleAgent]));
const result = await agentsApi.listAgents();
expect(mockFetch).toHaveBeenCalledWith(
"/api/agents",
expect.objectContaining({}),
);
expect(result).toEqual([sampleAgent]);
});
it("returns empty array when no agents running", async () => {
mockFetch.mockResolvedValueOnce(okResponse([]));
const result = await agentsApi.listAgents();
expect(result).toEqual([]);
});
it("uses custom baseUrl when provided", async () => {
mockFetch.mockResolvedValueOnce(okResponse([]));
await agentsApi.listAgents("http://localhost:3002/api");
expect(mockFetch).toHaveBeenCalledWith(
"http://localhost:3002/api/agents",
expect.objectContaining({}),
);
});
});
describe("getAgentConfig", () => {
it("sends GET to /agents/config and returns config list", async () => {
mockFetch.mockResolvedValueOnce(okResponse([sampleConfig]));
@@ -216,15 +184,17 @@ describe("agentsApi", () => {
describe("error handling", () => {
it("throws on non-ok response with body text", async () => {
mockFetch.mockResolvedValueOnce(errorResponse(404, "agent not found"));
mockFetch.mockResolvedValueOnce(errorResponse(404, "config not found"));
await expect(agentsApi.listAgents()).rejects.toThrow("agent not found");
await expect(agentsApi.getAgentConfig()).rejects.toThrow(
"config not found",
);
});
it("throws with status code when no body", async () => {
mockFetch.mockResolvedValueOnce(errorResponse(500, ""));
await expect(agentsApi.listAgents()).rejects.toThrow(
await expect(agentsApi.getAgentConfig()).rejects.toThrow(
"Request failed (500)",
);
});
+4 -2
View File
@@ -1,3 +1,5 @@
import { rpcCall } from "./rpc";
export type AgentStatusValue = "pending" | "running" | "completed" | "failed";
export interface AgentInfo {
@@ -94,8 +96,8 @@ export const agentsApi = {
);
},
listAgents(baseUrl?: string) {
return requestJson<AgentInfo[]>("/agents", {}, baseUrl);
listAgents(_baseUrl?: string) {
return rpcCall<AgentInfo[]>("active_agents.list");
},
getAgentConfig(baseUrl?: string) {
+107
View File
@@ -0,0 +1,107 @@
/**
* Lightweight read-RPC client over the `/ws` WebSocket.
*
* Opens a short-lived WebSocket, sends an `rpc_request` frame, waits for the
* matching `rpc_response`, then closes the connection.
*/
let correlationCounter = 0;
function nextCorrelationId(): string {
return `rpc-${Date.now()}-${++correlationCounter}`;
}
/**
* Build the WebSocket URL for the `/ws` endpoint, deriving the protocol
* (ws/wss) and host from the current page location.
*/
function buildWsUrl(): string {
const proto = window.location.protocol === "https:" ? "wss:" : "ws:";
return `${proto}//${window.location.host}/ws`;
}
export interface RpcResponse<T = unknown> {
ok: boolean;
result?: T;
error?: string;
code?: string;
}
/**
* Send a read-RPC request over a temporary WebSocket connection and return
* the result. Rejects if the server responds with `ok: false` or if the
* connection times out.
*/
export function rpcCall<T = unknown>(
method: string,
params: Record<string, unknown> = {},
timeoutMs = 5000,
): Promise<T> {
return new Promise<T>((resolve, reject) => {
const correlationId = nextCorrelationId();
const ws = new WebSocket(buildWsUrl());
let settled = false;
const timer = setTimeout(() => {
if (!settled) {
settled = true;
ws.close();
reject(new Error(`RPC timeout for ${method}`));
}
}, timeoutMs);
ws.onopen = () => {
ws.send(
JSON.stringify({
kind: "rpc_request",
version: 1,
correlation_id: correlationId,
ttl_ms: timeoutMs,
method,
params,
}),
);
};
ws.onmessage = (event) => {
try {
const data = JSON.parse(event.data);
// Only process rpc_response frames matching our correlation ID.
if (
data.kind === "rpc_response" &&
data.correlation_id === correlationId
) {
settled = true;
clearTimeout(timer);
ws.close();
if (data.ok) {
resolve(data.result as T);
} else {
reject(
new Error(data.error || `RPC error: ${data.code || "UNKNOWN"}`),
);
}
}
// Ignore other messages (pipeline_state, onboarding_status, etc.)
} catch {
// Ignore non-JSON or unparseable messages
}
};
ws.onerror = () => {
if (!settled) {
settled = true;
clearTimeout(timer);
reject(new Error(`WebSocket error during RPC call to ${method}`));
}
};
ws.onclose = () => {
if (!settled) {
settled = true;
clearTimeout(timer);
reject(new Error(`WebSocket closed before RPC response for ${method}`));
}
};
});
}
+1 -1
View File
@@ -10,7 +10,7 @@ beforeEach(() => {
vi.fn((input: string | URL | Request) => {
const url = typeof input === "string" ? input : input.toString();
// Endpoints that return arrays need [] not {} to avoid "not iterable" errors.
const arrayEndpoints = ["/agents", "/agents/config"];
const arrayEndpoints = ["/agents/config"];
const body = arrayEndpoints.some((ep) => url.endsWith(ep))
? JSON.stringify([])
: JSON.stringify({});
+1
View File
@@ -69,6 +69,7 @@ mod wire;
pub use auth::{add_join_token, init_token_auth, init_trusted_keys};
pub(crate) use client::connect_and_sync;
pub use client::{RENDEZVOUS_ERROR_THRESHOLD, spawn_rendezvous_client};
pub(crate) use rpc::try_handle_rpc_text;
pub use server::crdt_sync_handler;
// Test-only re-export used by `crdt_snapshot` tests.
+34 -2
View File
@@ -29,7 +29,10 @@ pub(super) type Handler = fn(Value) -> Value;
///
/// Add new handlers here. The registry is a plain slice — linear scan is
/// fine for the small number of methods expected.
static HANDLERS: &[(&str, Handler)] = &[("health.check", handle_health_check)];
static HANDLERS: &[(&str, Handler)] = &[
("health.check", handle_health_check),
("active_agents.list", handle_active_agents_list),
];
/// Handler for the `health.check` method.
///
@@ -39,6 +42,35 @@ fn handle_health_check(_params: Value) -> Value {
serde_json::json!({"status": "ok"})
}
/// Handler for the `active_agents.list` method.
///
/// Reads the `active_agents` collection from the CRDT and returns an array
/// matching the shape formerly served by `GET /api/agents`. Each entry
/// contains `story_id`, `agent_name`, `status`, `session_id`, and
/// `worktree_path`.
fn handle_active_agents_list(_params: Value) -> Value {
let entries = crate::crdt_state::read_all_active_agents().unwrap_or_default();
let list: Vec<Value> = entries
.into_iter()
.map(|view| {
// agent_id is the composite key "story_id:agent_name".
let (story_id, agent_name) = view
.agent_id
.rsplit_once(':')
.map(|(s, a)| (s.to_string(), a.to_string()))
.unwrap_or_else(|| (view.story_id.unwrap_or_default(), view.agent_id.clone()));
serde_json::json!({
"story_id": story_id,
"agent_name": agent_name,
"status": "running",
"session_id": null,
"worktree_path": null,
})
})
.collect();
Value::Array(list)
}
/// Dispatch an incoming RPC method call to the registered handler.
///
/// Returns `Ok(result)` on success or `Err("NOT_FOUND")` if no handler is
@@ -57,7 +89,7 @@ pub(super) fn dispatch(method: &str, params: Value) -> Result<Value, &'static st
///
/// Returns `None` if the text is not a valid `rpc_request` frame (i.e. it
/// should be forwarded to the CRDT sync handler instead).
pub(super) fn try_handle_rpc_text(text: &str) -> Option<RpcFrame> {
pub(crate) fn try_handle_rpc_text(text: &str) -> Option<RpcFrame> {
let frame: RpcFrame = serde_json::from_str(text).ok()?;
match frame {
RpcFrame::RpcRequest {
-30
View File
@@ -262,36 +262,6 @@ impl AgentsApi {
Ok(Json(true))
}
/// List all agents with their status.
///
/// Agents for stories that have been completed (`work/5_done/` or `work/6_archived/`) are
/// excluded so the agents panel is not cluttered with old completed items
/// on frontend startup.
#[oai(path = "/agents", method = "get")]
async fn list_agents(&self) -> OpenApiResult<Json<Vec<AgentInfoResponse>>> {
let project_root = self
.ctx
.services
.agents
.get_project_root(&self.ctx.state)
.ok();
let agents = svc::list_agents(&self.ctx.services.agents, project_root.as_deref())
.map_err(map_svc_error)?;
Ok(Json(
agents
.into_iter()
.map(|info| AgentInfoResponse {
story_id: info.story_id,
agent_name: info.agent_name,
status: info.status.to_string(),
session_id: info.session_id,
worktree_path: info.worktree_path,
})
.collect(),
))
}
/// Get the configured agent roster from project.toml.
#[oai(path = "/agents/config", method = "get")]
async fn get_agent_config(&self) -> OpenApiResult<Json<Vec<AgentConfigInfoResponse>>> {
-52
View File
@@ -42,58 +42,6 @@ fn story_is_archived_true_when_file_in_6_archived() {
assert!(svc::is_archived(&root, "79_story_foo"));
}
#[tokio::test]
async fn list_agents_excludes_archived_stories() {
let tmp = TempDir::new().unwrap();
let root = make_work_dirs(&tmp);
// Place an archived story file in 6_archived
std::fs::write(
root.join(".huskies/work/6_archived/79_story_archived.md"),
"---\nname: archived story\n---\n",
)
.unwrap();
let ctx = AppContext::new_test(root);
// Inject an agent for the archived story (completed) and one for an active story
ctx.services
.agents
.inject_test_agent("79_story_archived", "coder-1", AgentStatus::Completed);
ctx.services
.agents
.inject_test_agent("80_story_active", "coder-1", AgentStatus::Running);
let api = AgentsApi { ctx: Arc::new(ctx) };
let result = api.list_agents().await.unwrap().0;
// Archived story's agent should not appear
assert!(
!result.iter().any(|a| a.story_id == "79_story_archived"),
"archived story agent should be excluded from list_agents"
);
// Active story's agent should still appear
assert!(
result.iter().any(|a| a.story_id == "80_story_active"),
"active story agent should be included in list_agents"
);
}
#[tokio::test]
async fn list_agents_includes_all_when_no_project_root() {
// When no project root is configured, all agents are returned (safe default).
let tmp = TempDir::new().unwrap();
let ctx = AppContext::new_test(tmp.path().to_path_buf());
// Clear the project_root so get_project_root returns Err
*ctx.state.project_root.lock().unwrap() = None;
ctx.services
.agents
.inject_test_agent("42_story_whatever", "coder-1", AgentStatus::Completed);
let api = AgentsApi { ctx: Arc::new(ctx) };
let result = api.list_agents().await.unwrap().0;
assert!(result.iter().any(|a| a.story_id == "42_story_whatever"));
}
fn make_project_toml(root: &path::Path, content: &str) {
let sk_dir = root.join(".huskies");
std::fs::create_dir_all(&sk_dir).unwrap();
+23
View File
@@ -82,6 +82,7 @@ pub fn build_routes(
.nest("/docs", docs_service.swagger_ui())
.at("/ws", get(ws::ws_handler))
.at("/crdt-sync", get(crate::crdt_sync::crdt_sync_handler))
.at("/rpc", post(rpc_http_handler))
.at(
"/agents/:story_id/:agent_name/stream",
get(agents_sse::agent_stream),
@@ -133,6 +134,28 @@ pub fn build_routes(
route.data(ctx_arc)
}
/// HTTP bridge for the read-RPC protocol.
///
/// Accepts a JSON [`RpcFrame::RpcRequest`] body and returns the corresponding
/// [`RpcFrame::RpcResponse`]. This allows HTTP clients (e.g. the frontend) to
/// call read-RPC methods without maintaining a `/crdt-sync` WebSocket connection.
#[poem::handler]
pub async fn rpc_http_handler(body: poem::web::Json<serde_json::Value>) -> poem::Response {
let text = serde_json::to_string(&body.0).unwrap_or_default();
match crate::crdt_sync::try_handle_rpc_text(&text) {
Some(response) => {
let json = serde_json::to_string(&response).unwrap_or_default();
poem::Response::builder()
.status(poem::http::StatusCode::OK)
.header(poem::http::header::CONTENT_TYPE, "application/json")
.body(json)
}
None => poem::Response::builder()
.status(poem::http::StatusCode::BAD_REQUEST)
.body("Invalid RPC request"),
}
}
/// Debug HTTP endpoint: `GET /debug/crdt[?story_id=<id>]`
///
/// Returns the raw in-memory CRDT state as JSON. Accepts an optional
+37 -5
View File
@@ -29,13 +29,30 @@ pub async fn ws_handler(ws: WebSocket, ctx: Data<&Arc<AppContext>>) -> impl poem
ws.on_upgrade(move |socket| async move {
let (mut sink, mut stream) = socket.split();
let (tx, mut rx) = mpsc::unbounded_channel::<WsResponse>();
// Separate channel for pre-serialized messages (e.g. RPC responses).
let (raw_tx, mut raw_rx) = mpsc::unbounded_channel::<String>();
let forward = tokio::spawn(async move {
while let Some(msg) = rx.recv().await {
if let Ok(text) = serde_json::to_string(&msg)
&& sink.send(WsMessage::Text(text)).await.is_err()
{
break;
loop {
tokio::select! {
msg = rx.recv() => match msg {
Some(msg) => {
if let Ok(text) = serde_json::to_string(&msg)
&& sink.send(WsMessage::Text(text)).await.is_err()
{
break;
}
}
None => break,
},
raw = raw_rx.recv() => match raw {
Some(text) => {
if sink.send(WsMessage::Text(text)).await.is_err() {
break;
}
}
None => break,
},
}
}
});
@@ -79,6 +96,14 @@ pub async fn ws_handler(ws: WebSocket, ctx: Data<&Arc<AppContext>>) -> impl poem
break;
};
// Handle read-RPC frames (discriminated by "kind", not "type").
if let Some(rpc_resp) = crate::crdt_sync::try_handle_rpc_text(&text) {
if let Ok(resp_text) = serde_json::to_string(&rpc_resp) {
let _ = raw_tx.send(resp_text);
}
continue;
}
match ws::dispatch_outer(&text) {
ws::DispatchResult::StartChat { messages, config } => {
let tx_updates = tx.clone();
@@ -134,6 +159,13 @@ pub async fn ws_handler(ws: WebSocket, ctx: Data<&Arc<AppContext>>) -> impl poem
}
Some(Ok(WsMessage::Text(inner_text))) = stream.next() => {
// Handle read-RPC frames during active chat.
if let Some(rpc_resp) = crate::crdt_sync::try_handle_rpc_text(&inner_text) {
if let Ok(resp_text) = serde_json::to_string(&rpc_resp) {
let _ = raw_tx.send(resp_text);
}
continue;
}
match ws::dispatch_inner(&inner_text, &mut pending_perms) {
ws::InnerDispatchResult::CancelChat => {
let _ = chat::cancel_chat(&ctx.state);
-7
View File
@@ -104,13 +104,6 @@ pub mod test_helpers {
std::fs::create_dir_all(tmp.path().join(".huskies")).unwrap();
}
/// Create the `5_done` and `6_archived` work-stage directories.
pub fn make_work_dirs(tmp: &TempDir) {
for stage in &["5_done", "6_archived"] {
std::fs::create_dir_all(tmp.path().join(".huskies").join("work").join(stage)).unwrap();
}
}
/// Create all six pipeline stage directories under `.huskies/work/`.
pub fn make_stage_dirs(tmp: &TempDir) {
for stage in &[
-55
View File
@@ -113,20 +113,6 @@ pub async fn stop_agent(
.map_err(Error::AgentNotFound)
}
/// List all agents, optionally filtering out those belonging to archived stories.
///
/// When `project_root` is `None` the archive filter is skipped and all agents
/// are returned (safe default when the server is not yet fully configured).
pub fn list_agents(pool: &AgentPool, project_root: Option<&Path>) -> Result<Vec<AgentInfo>, Error> {
let agents = pool.list_agents().map_err(Error::Io)?;
match project_root {
Some(root) => Ok(selection::filter_non_archived(agents, |id| {
io::is_archived(root, id)
})),
None => Ok(agents),
}
}
/// Create a git worktree for a story.
pub async fn create_worktree(
pool: &AgentPool,
@@ -289,50 +275,9 @@ fn config_to_entries(config: &ProjectConfig) -> Vec<AgentConfigEntry> {
#[cfg(test)]
mod tests {
use super::*;
use crate::agents::AgentStatus;
use io::test_helpers::*;
use std::sync::Arc;
use tempfile::TempDir;
fn make_pool(tmp: &TempDir) -> Arc<AgentPool> {
let (tx, _) = tokio::sync::broadcast::channel(64);
let pool = AgentPool::new(3001, tx);
let state = crate::state::SessionState::default();
*state.project_root.lock().unwrap() = Some(tmp.path().to_path_buf());
Arc::new(pool)
}
// ── list_agents ───────────────────────────────────────────────────────────
#[tokio::test]
async fn list_agents_excludes_archived_stories() {
let tmp = TempDir::new().unwrap();
make_work_dirs(&tmp);
write_story_file(
&tmp,
".huskies/work/6_archived/79_story_archived.md",
"---\nname: archived\n---\n",
);
let pool = make_pool(&tmp);
pool.inject_test_agent("79_story_archived", "coder-1", AgentStatus::Completed);
pool.inject_test_agent("80_story_active", "coder-1", AgentStatus::Running);
let agents = list_agents(&pool, Some(tmp.path())).unwrap();
assert!(!agents.iter().any(|a| a.story_id == "79_story_archived"));
assert!(agents.iter().any(|a| a.story_id == "80_story_active"));
}
#[tokio::test]
async fn list_agents_includes_all_when_no_project_root() {
let tmp = TempDir::new().unwrap();
let pool = make_pool(&tmp);
pool.inject_test_agent("42_story_whatever", "coder-1", AgentStatus::Completed);
let agents = list_agents(&pool, None).unwrap();
assert!(agents.iter().any(|a| a.story_id == "42_story_whatever"));
}
// ── get_agent_config ──────────────────────────────────────────────────────
#[test]
-77
View File
@@ -4,22 +4,6 @@
//! return a result without touching the filesystem, network, or any mutable
//! global state. This makes them fast to test without tempdirs or async runtimes.
use crate::agent_log::LogEntry;
use crate::agents::AgentInfo;
/// Filter a list of agents, removing any whose story is archived.
///
/// `is_archived` is a predicate injected by the caller — typically a closure
/// over the project root that calls `io::is_archived`. This keeps the function
/// pure: it never touches the filesystem itself.
pub fn filter_non_archived<F>(agents: Vec<AgentInfo>, is_archived: F) -> Vec<AgentInfo>
where
F: Fn(&str) -> bool,
{
agents
.into_iter()
.filter(|info| !is_archived(&info.story_id))
.collect()
}
/// Concatenate the text of all `output` events from an agent log.
///
@@ -42,22 +26,6 @@ pub fn collect_output_text(entries: &[LogEntry]) -> String {
#[cfg(test)]
mod tests {
use super::*;
use crate::agents::AgentStatus;
fn make_agent(story_id: &str) -> AgentInfo {
AgentInfo {
story_id: story_id.to_string(),
agent_name: "coder-1".to_string(),
status: AgentStatus::Running,
session_id: None,
worktree_path: None,
base_branch: None,
completion: None,
log_session_id: None,
throttled: false,
termination_reason: None,
}
}
fn make_log_entry(event_type: &str, text: Option<&str>) -> LogEntry {
let mut obj = serde_json::Map::new();
@@ -74,51 +42,6 @@ mod tests {
}
}
// ── filter_non_archived ───────────────────────────────────────────────────
#[test]
fn filter_keeps_non_archived_agents() {
let agents = vec![make_agent("10_active"), make_agent("11_active")];
let result = filter_non_archived(agents, |_| false);
assert_eq!(result.len(), 2);
}
#[test]
fn filter_removes_archived_agents() {
let agents = vec![make_agent("10_archived"), make_agent("11_active")];
let result = filter_non_archived(agents, |id| id == "10_archived");
assert_eq!(result.len(), 1);
assert_eq!(result[0].story_id, "11_active");
}
#[test]
fn filter_removes_all_when_all_archived() {
let agents = vec![make_agent("10_a"), make_agent("11_b")];
let result = filter_non_archived(agents, |_| true);
assert!(result.is_empty());
}
#[test]
fn filter_returns_empty_for_empty_input() {
let result = filter_non_archived(vec![], |_| false);
assert!(result.is_empty());
}
#[test]
fn filter_preserves_order() {
let agents = vec![
make_agent("1_a"),
make_agent("2_b"),
make_agent("3_c"),
make_agent("4_d"),
];
let result = filter_non_archived(agents, |id| id == "2_b");
assert_eq!(result.len(), 3);
assert_eq!(result[0].story_id, "1_a");
assert_eq!(result[1].story_id, "3_c");
assert_eq!(result[2].story_id, "4_d");
}
// ── collect_output_text ───────────────────────────────────────────────────
#[test]