huskies: merge 948

This commit is contained in:
dave
2026-05-13 04:43:48 +00:00
parent 2f50e2198b
commit f2943c7e69
16 changed files with 995 additions and 205 deletions
+1 -1
View File
@@ -364,7 +364,7 @@ pub(crate) async fn connect_and_sync(url: &str, token: Option<&str>) -> Result<(
if !flush_ok {
break;
}
} else if let Some(rpc_resp) = try_handle_rpc_text(text.as_ref()) {
} else if let Some(rpc_resp) = try_handle_rpc_text(text.as_ref()).await {
// RPC request from the peer — dispatch and reply.
use tokio_tungstenite::tungstenite::Message as TungsteniteMsg;
if let Ok(json) = serde_json::to_string(&rpc_resp)
+1
View File
@@ -69,6 +69,7 @@ mod wire;
pub use auth::{add_join_token, init_token_auth, init_trusted_keys};
pub(crate) use client::connect_and_sync;
pub use client::{RENDEZVOUS_ERROR_THRESHOLD, spawn_rendezvous_client};
pub use rpc::init_rpc_context;
pub(crate) use rpc::try_handle_rpc_text;
pub use server::crdt_sync_handler;
+604 -40
View File
@@ -1,7 +1,7 @@
//! RPC method registry for the `/crdt-sync` WebSocket multiplexer.
//!
//! Incoming [`RpcFrame::RpcRequest`] frames are dispatched through this
//! registry. Each method handler is a plain function that accepts a
//! registry. Each method handler is an async function that accepts a
//! `serde_json::Value` parameter bag and returns a `serde_json::Value` result.
//!
//! # Registering handlers
@@ -9,7 +9,7 @@
//! Add a new entry to the `HANDLERS` static slice:
//!
//! ```rust,ignore
//! ("my.method", handle_my_method as Handler),
//! ("my.method", |p| Box::pin(handle_my_method(p))),
//! ```
//!
//! # Unknown methods
@@ -17,28 +17,117 @@
//! [`dispatch`] returns `Err("NOT_FOUND")` for any method not present in the
//! registry. The caller should translate this into an
//! [`RpcFrame::RpcResponse`] with `ok: false, code: "NOT_FOUND"`.
//!
//! # Global context
//!
//! Many handlers need access to project state (session root, store, workflow).
//! Call [`init_rpc_context`] once at server startup to register these.
//! Handlers that require context return an error result when it has not been
//! set.
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, OnceLock};
use serde_json::Value;
use super::wire::RpcFrame;
use crate::state::SessionState;
use crate::store::JsonFileStore;
use crate::workflow::WorkflowState;
/// Signature for a synchronous RPC method handler.
pub(super) type Handler = fn(Value) -> Value;
/// Future returned by an RPC handler.
type HandlerFuture = Pin<Box<dyn Future<Output = Value> + Send>>;
/// Signature for an async RPC method handler.
pub(super) type Handler = fn(Value) -> HandlerFuture;
/// Shared state made available to all RPC handlers.
pub struct RpcState {
pub state: Arc<SessionState>,
pub store: Arc<JsonFileStore>,
pub workflow: Arc<std::sync::Mutex<WorkflowState>>,
}
/// Global RPC context, initialised once at server startup via [`init_rpc_context`].
static RPC_CTX: OnceLock<RpcState> = OnceLock::new();
/// Register the global RPC context.
///
/// Must be called before any handler that accesses project state is invoked.
/// Subsequent calls are silently ignored (OnceLock semantics).
pub fn init_rpc_context(
state: Arc<SessionState>,
store: Arc<JsonFileStore>,
workflow: Arc<std::sync::Mutex<WorkflowState>>,
) {
let _ = RPC_CTX.set(RpcState {
state,
store,
workflow,
});
}
/// Static registry mapping method names to handlers.
///
/// Add new handlers here. The registry is a plain slice — linear scan is
/// fine for the small number of methods expected.
static HANDLERS: &[(&str, Handler)] = &[
("health.check", handle_health_check),
("active_agents.list", handle_active_agents_list),
("health.check", |p| Box::pin(handle_health_check(p))),
("active_agents.list", |p| {
Box::pin(handle_active_agents_list(p))
}),
("agent_config.list", |p| {
Box::pin(handle_agent_config_list(p))
}),
("settings.get_project", |p| {
Box::pin(handle_settings_get_project(p))
}),
("settings.get_editor", |p| {
Box::pin(handle_settings_get_editor(p))
}),
("model.get_preference", |p| {
Box::pin(handle_model_get_preference(p))
}),
("project.current", |p| Box::pin(handle_project_current(p))),
("project.known", |p| Box::pin(handle_project_known(p))),
("anthropic.key_exists", |p| {
Box::pin(handle_anthropic_key_exists(p))
}),
("anthropic.list_models", |p| {
Box::pin(handle_anthropic_list_models(p))
}),
("ollama.list_models", |p| {
Box::pin(handle_ollama_list_models(p))
}),
("io.home_directory", |p| {
Box::pin(handle_io_home_directory(p))
}),
("io.list_project_files", |p| {
Box::pin(handle_io_list_project_files(p))
}),
("work_items.get", |p| Box::pin(handle_work_items_get(p))),
("work_items.test_results", |p| {
Box::pin(handle_work_items_test_results(p))
}),
("work_items.token_cost", |p| {
Box::pin(handle_work_items_token_cost(p))
}),
("token_usage.all", |p| Box::pin(handle_token_usage_all(p))),
("oauth.status", |p| Box::pin(handle_oauth_status(p))),
("bot_config.get", |p| Box::pin(handle_bot_config_get(p))),
("agents.get_output", |p| {
Box::pin(handle_agents_get_output(p))
}),
];
// ── handlers ─────────────────────────────────────────────────────────────────
/// Handler for the `health.check` method.
///
/// Returns `{"status": "ok"}` unconditionally. Used as a smoke test to
/// verify that the RPC multiplexer is wired up correctly.
fn handle_health_check(_params: Value) -> Value {
async fn handle_health_check(_params: Value) -> Value {
serde_json::json!({"status": "ok"})
}
@@ -48,12 +137,11 @@ fn handle_health_check(_params: Value) -> Value {
/// matching the shape formerly served by `GET /api/agents`. Each entry
/// contains `story_id`, `agent_name`, `status`, `session_id`, and
/// `worktree_path`.
fn handle_active_agents_list(_params: Value) -> Value {
async fn handle_active_agents_list(_params: Value) -> Value {
let entries = crate::crdt_state::read_all_active_agents().unwrap_or_default();
let list: Vec<Value> = entries
.into_iter()
.map(|view| {
// agent_id is the composite key "story_id:agent_name".
let (story_id, agent_name) = view
.agent_id
.rsplit_once(':')
@@ -71,14 +159,389 @@ fn handle_active_agents_list(_params: Value) -> Value {
Value::Array(list)
}
/// Handler for the `agent_config.list` method.
///
/// Returns the configured agent roster from `project.toml`, matching the
/// shape formerly served by `GET /api/agents/config`.
async fn handle_agent_config_list(_params: Value) -> Value {
let Some(ctx) = RPC_CTX.get() else {
return serde_json::json!({"error": "RPC context not initialised"});
};
let Ok(root) = ctx.state.get_project_root() else {
return Value::Array(vec![]);
};
let entries = crate::service::agents::get_agent_config(&root).unwrap_or_default();
let list: Vec<Value> = entries
.into_iter()
.map(|e| {
serde_json::json!({
"name": e.name,
"role": e.role,
"stage": e.stage,
"model": e.model,
"allowed_tools": e.allowed_tools,
"max_turns": e.max_turns,
"max_budget_usd": e.max_budget_usd,
})
})
.collect();
Value::Array(list)
}
/// Handler for the `settings.get_project` method.
///
/// Returns the current `project.toml` scalar settings, matching the shape
/// formerly served by `GET /api/settings`.
async fn handle_settings_get_project(_params: Value) -> Value {
let Some(ctx) = RPC_CTX.get() else {
return serde_json::json!({"error": "RPC context not initialised"});
};
let Ok(root) = ctx.state.get_project_root() else {
return serde_json::json!({"error": "No project open"});
};
match crate::service::settings::load_project_settings(&root) {
Ok(s) => serde_json::json!({
"default_qa": s.default_qa,
"default_coder_model": s.default_coder_model,
"max_coders": s.max_coders,
"max_retries": s.max_retries,
"base_branch": s.base_branch,
"rate_limit_notifications": s.rate_limit_notifications,
"timezone": s.timezone,
"rendezvous": s.rendezvous,
"watcher_sweep_interval_secs": s.watcher_sweep_interval_secs,
"watcher_done_retention_secs": s.watcher_done_retention_secs,
}),
Err(e) => serde_json::json!({"error": e.to_string()}),
}
}
/// Handler for the `settings.get_editor` method.
///
/// Returns the configured editor command from the store, matching the shape
/// formerly served by `GET /api/settings/editor`.
async fn handle_settings_get_editor(_params: Value) -> Value {
let Some(ctx) = RPC_CTX.get() else {
return serde_json::json!({"editor_command": null});
};
let cmd = crate::service::settings::get_editor_command(ctx.store.as_ref());
serde_json::json!({"editor_command": cmd})
}
/// Handler for the `model.get_preference` method.
///
/// Returns the user's saved LLM model name from the store, matching the
/// shape formerly served by `GET /api/model`.
async fn handle_model_get_preference(_params: Value) -> Value {
let Some(ctx) = RPC_CTX.get() else {
return Value::Null;
};
match crate::io::fs::get_model_preference(ctx.store.as_ref()) {
Ok(pref) => serde_json::to_value(pref).unwrap_or(Value::Null),
Err(_) => Value::Null,
}
}
/// Handler for the `project.current` method.
///
/// Returns the currently open project path (or null), matching the shape
/// formerly served by `GET /api/project`.
async fn handle_project_current(_params: Value) -> Value {
let Some(ctx) = RPC_CTX.get() else {
return Value::Null;
};
match crate::service::project::get_current_project(&ctx.state, ctx.store.as_ref()) {
Ok(path) => serde_json::to_value(path).unwrap_or(Value::Null),
Err(_) => Value::Null,
}
}
/// Handler for the `project.known` method.
///
/// Returns the list of previously-opened project paths from the store,
/// matching the shape formerly served by `GET /api/projects`.
async fn handle_project_known(_params: Value) -> Value {
let Some(ctx) = RPC_CTX.get() else {
return Value::Array(vec![]);
};
match crate::service::project::get_known_projects(ctx.store.as_ref()) {
Ok(paths) => serde_json::to_value(paths).unwrap_or(Value::Array(vec![])),
Err(_) => Value::Array(vec![]),
}
}
/// Handler for the `anthropic.key_exists` method.
///
/// Returns true when an Anthropic API key is stored, matching the shape
/// formerly served by `GET /api/anthropic/key/exists`.
async fn handle_anthropic_key_exists(_params: Value) -> Value {
let Some(ctx) = RPC_CTX.get() else {
return Value::Bool(false);
};
match crate::service::anthropic::get_api_key_exists(ctx.store.as_ref()) {
Ok(exists) => Value::Bool(exists),
Err(_) => Value::Bool(false),
}
}
/// Handler for the `anthropic.list_models` method.
///
/// Returns the available Anthropic models, matching the shape formerly
/// served by `GET /api/anthropic/models`. Surfaces upstream errors as a
/// JSON object `{"error": "..."}`.
async fn handle_anthropic_list_models(_params: Value) -> Value {
let Some(ctx) = RPC_CTX.get() else {
return serde_json::json!({"error": "RPC context not initialised"});
};
match crate::service::anthropic::list_models(ctx.store.as_ref()).await {
Ok(models) => serde_json::to_value(models).unwrap_or(Value::Array(vec![])),
Err(e) => serde_json::json!({"error": e.to_string()}),
}
}
/// Handler for the `ollama.list_models` method.
///
/// Returns the available Ollama models for the configured base URL,
/// matching the shape formerly served by `GET /api/ollama/models`.
///
/// Parameters: `{ "base_url"?: string }`.
async fn handle_ollama_list_models(params: Value) -> Value {
let base_url = params
.get("base_url")
.and_then(|v| v.as_str())
.map(str::to_string);
match crate::llm::chat::get_ollama_models(base_url).await {
Ok(models) => serde_json::to_value(models).unwrap_or(Value::Array(vec![])),
Err(_) => Value::Array(vec![]),
}
}
/// Handler for the `io.home_directory` method.
///
/// Returns the user's home directory path, matching the shape formerly
/// served by `GET /api/io/fs/home`.
async fn handle_io_home_directory(_params: Value) -> Value {
match crate::service::file_io::get_home_directory() {
Ok(home) => Value::String(home),
Err(_) => Value::Null,
}
}
/// Handler for the `io.list_project_files` method.
///
/// Returns the list of files in the currently open project, matching the
/// shape formerly served by `GET /api/io/fs/files`.
async fn handle_io_list_project_files(_params: Value) -> Value {
let Some(ctx) = RPC_CTX.get() else {
return Value::Array(vec![]);
};
match crate::service::file_io::list_project_files(&ctx.state).await {
Ok(files) => serde_json::to_value(files).unwrap_or(Value::Array(vec![])),
Err(_) => Value::Array(vec![]),
}
}
/// Handler for the `work_items.get` method.
///
/// Returns the markdown content and metadata for a work item, matching the
/// shape formerly served by `GET /api/work-items/{story_id}`.
///
/// Parameters: `{ "story_id": string }`.
async fn handle_work_items_get(params: Value) -> Value {
let Some(ctx) = RPC_CTX.get() else {
return serde_json::json!({"error": "RPC context not initialised"});
};
let Some(story_id) = params.get("story_id").and_then(|v| v.as_str()) else {
return serde_json::json!({"error": "missing story_id"});
};
let Ok(root) = ctx.state.get_project_root() else {
return serde_json::json!({"error": "No project open"});
};
match crate::service::agents::get_work_item_content(&root, story_id) {
Ok(c) => serde_json::json!({
"content": c.content,
"stage": c.stage,
"name": c.name,
"agent": c.agent,
}),
Err(e) => serde_json::json!({"error": e.to_string()}),
}
}
/// Handler for the `work_items.test_results` method.
///
/// Returns the most recent test-suite results for a story, matching the
/// shape formerly served by `GET /api/work-items/{story_id}/test-results`.
///
/// Parameters: `{ "story_id": string }`.
async fn handle_work_items_test_results(params: Value) -> Value {
let Some(ctx) = RPC_CTX.get() else {
return Value::Null;
};
let Some(story_id) = params.get("story_id").and_then(|v| v.as_str()) else {
return Value::Null;
};
let Ok(root) = ctx.state.get_project_root() else {
return Value::Null;
};
let workflow = ctx.workflow.lock().unwrap();
match crate::service::agents::get_test_results(&root, story_id, &workflow) {
Some(results) => serde_json::to_value(results).unwrap_or(Value::Null),
None => Value::Null,
}
}
/// Handler for the `work_items.token_cost` method.
///
/// Returns the aggregated LLM token cost for a story, matching the shape
/// formerly served by `GET /api/work-items/{story_id}/token-cost`.
///
/// Parameters: `{ "story_id": string }`.
async fn handle_work_items_token_cost(params: Value) -> Value {
let Some(ctx) = RPC_CTX.get() else {
return serde_json::json!({"error": "RPC context not initialised"});
};
let Some(story_id) = params.get("story_id").and_then(|v| v.as_str()) else {
return serde_json::json!({"error": "missing story_id"});
};
let Ok(root) = ctx.state.get_project_root() else {
return serde_json::json!({"error": "No project open"});
};
match crate::service::agents::get_work_item_token_cost(&root, story_id) {
Ok(summary) => serde_json::json!({
"total_cost_usd": summary.total_cost_usd,
"agents": summary.agents.into_iter().map(|a| serde_json::json!({
"agent_name": a.agent_name,
"model": a.model,
"input_tokens": a.input_tokens,
"output_tokens": a.output_tokens,
"cache_creation_input_tokens": a.cache_creation_input_tokens,
"cache_read_input_tokens": a.cache_read_input_tokens,
"total_cost_usd": a.total_cost_usd,
})).collect::<Vec<_>>(),
}),
Err(e) => serde_json::json!({"error": e.to_string()}),
}
}
/// Handler for the `token_usage.all` method.
///
/// Returns every token-usage record for the project, matching the shape
/// formerly served by `GET /api/token-usage`.
async fn handle_token_usage_all(_params: Value) -> Value {
let Some(ctx) = RPC_CTX.get() else {
return serde_json::json!({"records": []});
};
let Ok(root) = ctx.state.get_project_root() else {
return serde_json::json!({"records": []});
};
let records = crate::service::agents::get_all_token_usage(&root).unwrap_or_default();
serde_json::json!({"records": records})
}
/// Handler for the `oauth.status` method.
///
/// Returns the status of every stored OAuth account in the login pool,
/// matching the shape formerly served by `GET /oauth/status`.
async fn handle_oauth_status(_params: Value) -> Value {
let accounts = crate::service::oauth::check_all_accounts();
serde_json::json!({"accounts": accounts})
}
/// Handler for the `bot_config.get` method.
///
/// Reads the credentials stored in `.huskies/bot.toml`, matching the shape
/// formerly served by `GET /api/bot/config`.
async fn handle_bot_config_get(_params: Value) -> Value {
let Some(ctx) = RPC_CTX.get() else {
return bot_config_default();
};
let Ok(root) = ctx.state.get_project_root() else {
return bot_config_default();
};
let path = root.join(".huskies").join("bot.toml");
match std::fs::read_to_string(&path) {
Ok(s) => match toml::from_str::<Value>(&s) {
Ok(v) => merge_bot_config_defaults(v),
Err(_) => bot_config_default(),
},
Err(_) => bot_config_default(),
}
}
/// Default bot config payload — every key present, every value null.
fn bot_config_default() -> Value {
serde_json::json!({
"transport": null,
"enabled": null,
"homeserver": null,
"username": null,
"password": null,
"room_ids": null,
"slack_bot_token": null,
"slack_signing_secret": null,
"slack_channel_ids": null,
})
}
/// Fill in missing keys with `null` so the frontend always sees the full shape.
fn merge_bot_config_defaults(mut v: Value) -> Value {
let obj = v.as_object_mut();
let keys = [
"transport",
"enabled",
"homeserver",
"username",
"password",
"room_ids",
"slack_bot_token",
"slack_signing_secret",
"slack_channel_ids",
];
if let Some(map) = obj {
for k in keys {
map.entry(k.to_string()).or_insert(Value::Null);
}
}
v
}
/// Handler for the `agents.get_output` method.
///
/// Returns the concatenated output text for an agent's most recent session,
/// matching the shape formerly served by
/// `GET /api/agents/{story_id}/{agent_name}/output`.
///
/// Parameters: `{ "story_id": string, "agent_name": string }`.
async fn handle_agents_get_output(params: Value) -> Value {
let Some(ctx) = RPC_CTX.get() else {
return serde_json::json!({"error": "RPC context not initialised"});
};
let Some(story_id) = params.get("story_id").and_then(|v| v.as_str()) else {
return serde_json::json!({"error": "missing story_id"});
};
let Some(agent_name) = params.get("agent_name").and_then(|v| v.as_str()) else {
return serde_json::json!({"error": "missing agent_name"});
};
let Ok(root) = ctx.state.get_project_root() else {
return serde_json::json!({"error": "no project open"});
};
match crate::service::agents::get_agent_output(&root, story_id, agent_name) {
Ok(output) => serde_json::json!({"output": output}),
Err(e) => serde_json::json!({"error": e.to_string()}),
}
}
// ── dispatch ──────────────────────────────────────────────────────────────────
/// Dispatch an incoming RPC method call to the registered handler.
///
/// Returns `Ok(result)` on success or `Err("NOT_FOUND")` if no handler is
/// registered for `method`.
pub(super) fn dispatch(method: &str, params: Value) -> Result<Value, &'static str> {
pub(super) async fn dispatch(method: &str, params: Value) -> Result<Value, &'static str> {
for (name, handler) in HANDLERS {
if *name == method {
return Ok(handler(params));
return Ok(handler(params).await);
}
}
Err("NOT_FOUND")
@@ -89,7 +552,7 @@ pub(super) fn dispatch(method: &str, params: Value) -> Result<Value, &'static st
///
/// Returns `None` if the text is not a valid `rpc_request` frame (i.e. it
/// should be forwarded to the CRDT sync handler instead).
pub(crate) fn try_handle_rpc_text(text: &str) -> Option<RpcFrame> {
pub(crate) async fn try_handle_rpc_text(text: &str) -> Option<RpcFrame> {
let frame: RpcFrame = serde_json::from_str(text).ok()?;
match frame {
RpcFrame::RpcRequest {
@@ -99,7 +562,7 @@ pub(crate) fn try_handle_rpc_text(text: &str) -> Option<RpcFrame> {
params,
..
} => {
let response = match dispatch(&method, params) {
let response = match dispatch(&method, params).await {
Ok(result) => RpcFrame::RpcResponse {
version,
correlation_id,
@@ -128,23 +591,23 @@ pub(crate) fn try_handle_rpc_text(text: &str) -> Option<RpcFrame> {
mod tests {
use super::*;
#[test]
fn health_check_returns_ok_status() {
let result = dispatch("health.check", serde_json::json!({}));
#[tokio::test]
async fn health_check_returns_ok_status() {
let result = dispatch("health.check", serde_json::json!({})).await;
assert!(result.is_ok());
let val = result.unwrap();
assert_eq!(val["status"], "ok");
}
#[test]
fn unknown_method_returns_not_found() {
let result = dispatch("nonexistent.method", serde_json::json!({}));
#[tokio::test]
async fn unknown_method_returns_not_found() {
let result = dispatch("nonexistent.method", serde_json::json!({})).await;
assert!(result.is_err());
assert_eq!(result.unwrap_err(), "NOT_FOUND");
}
#[test]
fn try_handle_rpc_text_health_check() {
#[tokio::test]
async fn try_handle_rpc_text_health_check() {
let req = serde_json::json!({
"kind": "rpc_request",
"version": 1,
@@ -154,7 +617,9 @@ mod tests {
"params": {}
});
let text = serde_json::to_string(&req).unwrap();
let resp = try_handle_rpc_text(&text).expect("must produce a response");
let resp = try_handle_rpc_text(&text)
.await
.expect("must produce a response");
match resp {
RpcFrame::RpcResponse {
ok,
@@ -173,8 +638,8 @@ mod tests {
}
}
#[test]
fn try_handle_rpc_text_unknown_method_returns_not_found() {
#[tokio::test]
async fn try_handle_rpc_text_unknown_method_returns_not_found() {
let req = serde_json::json!({
"kind": "rpc_request",
"version": 1,
@@ -184,7 +649,9 @@ mod tests {
"params": {}
});
let text = serde_json::to_string(&req).unwrap();
let resp = try_handle_rpc_text(&text).expect("must produce a response for unknown method");
let resp = try_handle_rpc_text(&text)
.await
.expect("must produce a response for unknown method");
match resp {
RpcFrame::RpcResponse { ok, code, .. } => {
assert!(!ok, "unknown method must not succeed");
@@ -194,17 +661,14 @@ mod tests {
}
}
#[test]
fn try_handle_rpc_text_ignores_non_rpc_frames() {
// A SyncMessage::Bulk frame must not be treated as an RPC request.
#[tokio::test]
async fn try_handle_rpc_text_ignores_non_rpc_frames() {
let bulk = r#"{"type":"bulk","ops":[]}"#;
assert!(try_handle_rpc_text(bulk).is_none());
assert!(try_handle_rpc_text(bulk).await.is_none());
}
#[test]
fn try_handle_rpc_text_ignores_rpc_response_frames() {
// An incoming rpc_response (e.g. reply to our own outbound request) must
// not trigger a further response.
#[tokio::test]
async fn try_handle_rpc_text_ignores_rpc_response_frames() {
let resp = serde_json::json!({
"kind": "rpc_response",
"version": 1,
@@ -213,16 +677,16 @@ mod tests {
"result": {"status": "ok"}
});
let text = serde_json::to_string(&resp).unwrap();
assert!(try_handle_rpc_text(&text).is_none());
assert!(try_handle_rpc_text(&text).await.is_none());
}
#[test]
fn try_handle_rpc_text_ignores_invalid_json() {
assert!(try_handle_rpc_text("not json at all").is_none());
#[tokio::test]
async fn try_handle_rpc_text_ignores_invalid_json() {
assert!(try_handle_rpc_text("not json at all").await.is_none());
}
#[test]
fn rpc_response_correlation_id_mirrors_request() {
#[tokio::test]
async fn rpc_response_correlation_id_mirrors_request() {
let req = serde_json::json!({
"kind": "rpc_request",
"version": 1,
@@ -232,7 +696,7 @@ mod tests {
"params": {}
});
let text = serde_json::to_string(&req).unwrap();
let resp = try_handle_rpc_text(&text).unwrap();
let resp = try_handle_rpc_text(&text).await.unwrap();
match resp {
RpcFrame::RpcResponse { correlation_id, .. } => {
assert_eq!(correlation_id, "mirror-me");
@@ -240,4 +704,104 @@ mod tests {
_ => panic!("Expected RpcResponse"),
}
}
// ── context-dependent handlers (no context set) ──────────────────────────
#[tokio::test]
async fn agent_config_list_returns_value_without_context() {
let result = dispatch("agent_config.list", serde_json::json!({})).await;
assert!(result.is_ok());
}
#[tokio::test]
async fn settings_get_editor_returns_editor_command_key() {
let result = dispatch("settings.get_editor", serde_json::json!({})).await;
assert!(result.is_ok());
let val = result.unwrap();
assert!(val.get("editor_command").is_some());
}
#[tokio::test]
async fn model_get_preference_returns_a_value() {
let result = dispatch("model.get_preference", serde_json::json!({})).await;
assert!(result.is_ok());
}
#[tokio::test]
async fn project_current_returns_a_value() {
let result = dispatch("project.current", serde_json::json!({})).await;
assert!(result.is_ok());
}
#[tokio::test]
async fn project_known_returns_a_value() {
let result = dispatch("project.known", serde_json::json!({})).await;
assert!(result.is_ok());
}
#[tokio::test]
async fn anthropic_key_exists_returns_a_value() {
let result = dispatch("anthropic.key_exists", serde_json::json!({})).await;
assert!(result.is_ok());
let val = result.unwrap();
assert!(val.is_boolean() || val.is_object());
}
#[tokio::test]
async fn io_home_directory_returns_a_value() {
let result = dispatch("io.home_directory", serde_json::json!({})).await;
assert!(result.is_ok());
}
#[tokio::test]
async fn token_usage_all_returns_records_envelope() {
let result = dispatch("token_usage.all", serde_json::json!({})).await;
assert!(result.is_ok());
let val = result.unwrap();
assert!(val.get("records").is_some());
}
#[tokio::test]
async fn oauth_status_returns_accounts_envelope() {
let result = dispatch("oauth.status", serde_json::json!({})).await;
assert!(result.is_ok());
let val = result.unwrap();
assert!(val.get("accounts").is_some());
}
#[tokio::test]
async fn bot_config_get_returns_full_shape() {
let result = dispatch("bot_config.get", serde_json::json!({})).await;
assert!(result.is_ok());
let val = result.unwrap();
for key in [
"transport",
"enabled",
"homeserver",
"username",
"password",
"room_ids",
"slack_bot_token",
"slack_signing_secret",
"slack_channel_ids",
] {
assert!(val.get(key).is_some(), "missing key {key}");
}
}
#[tokio::test]
async fn work_items_get_missing_story_id_returns_error() {
let result = dispatch("work_items.get", serde_json::json!({})).await;
assert!(result.is_ok());
let val = result.unwrap();
assert!(val.get("error").is_some());
}
#[tokio::test]
async fn agents_get_output_missing_story_id_returns_error() {
let result = dispatch("agents.get_output", serde_json::json!({})).await;
assert!(result.is_ok());
let val = result.unwrap();
assert!(val.get("error").is_some());
}
}
+1 -1
View File
@@ -264,7 +264,7 @@ pub async fn crdt_sync_handler(
if !flush_ok {
break;
}
} else if let Some(rpc_resp) = try_handle_rpc_text(&text) {
} else if let Some(rpc_resp) = try_handle_rpc_text(&text).await {
// RPC request — dispatch to registry and send response.
if let Ok(json) = serde_json::to_string(&rpc_resp)
&& sink.send(WsMessage::Text(json)).await.is_err()
+1 -1
View File
@@ -167,7 +167,7 @@ pub fn build_routes(
#[poem::handler]
pub async fn rpc_http_handler(body: poem::web::Json<serde_json::Value>) -> poem::Response {
let text = serde_json::to_string(&body.0).unwrap_or_default();
match crate::crdt_sync::try_handle_rpc_text(&text) {
match crate::crdt_sync::try_handle_rpc_text(&text).await {
Some(response) => {
let json = serde_json::to_string(&response).unwrap_or_default();
poem::Response::builder()
+2 -2
View File
@@ -97,7 +97,7 @@ pub async fn ws_handler(ws: WebSocket, ctx: Data<&Arc<AppContext>>) -> impl poem
};
// Handle read-RPC frames (discriminated by "kind", not "type").
if let Some(rpc_resp) = crate::crdt_sync::try_handle_rpc_text(&text) {
if let Some(rpc_resp) = crate::crdt_sync::try_handle_rpc_text(&text).await {
if let Ok(resp_text) = serde_json::to_string(&rpc_resp) {
let _ = raw_tx.send(resp_text);
}
@@ -160,7 +160,7 @@ pub async fn ws_handler(ws: WebSocket, ctx: Data<&Arc<AppContext>>) -> impl poem
Some(Ok(WsMessage::Text(inner_text))) = stream.next() => {
// Handle read-RPC frames during active chat.
if let Some(rpc_resp) = crate::crdt_sync::try_handle_rpc_text(&inner_text) {
if let Some(rpc_resp) = crate::crdt_sync::try_handle_rpc_text(&inner_text).await {
if let Ok(resp_text) = serde_json::to_string(&rpc_resp) {
let _ = raw_tx.send(resp_text);
}
+4 -2
View File
@@ -102,6 +102,10 @@ async fn main() -> Result<(), std::io::Error> {
}
let store = Arc::new(JsonFileStore::from_path(store_path).map_err(std::io::Error::other)?);
// Shared workflow state — same instance is reused for HTTP handlers below.
let workflow = Arc::new(std::sync::Mutex::new(WorkflowState::default()));
crate::crdt_sync::init_rpc_context(app_state.clone(), store.clone(), Arc::clone(&workflow));
// Collect CLI args, skipping the binary name (argv[0]).
let raw_args: Vec<String> = std::env::args().skip(1).collect();
@@ -174,8 +178,6 @@ async fn main() -> Result<(), std::io::Error> {
return agent_mode::run(agent_root, rendezvous, port, join_token, agent_gateway_url).await;
}
let workflow = Arc::new(std::sync::Mutex::new(WorkflowState::default()));
// Event bus: broadcast channel for pipeline lifecycle events.
let (watcher_tx, _) = broadcast::channel::<io::watcher::WatcherEvent>(1024);
let agents = Arc::new(AgentPool::new(port, watcher_tx.clone()));