huskies: merge 855

This commit is contained in:
dave
2026-04-29 21:35:55 +00:00
parent a7b1572693
commit 4d24b5b661
17 changed files with 204 additions and 973 deletions
+2 -1
View File
@@ -9,7 +9,8 @@
//! - [`websocket`] — WebSocket handlers (CRDT-sync, event push)
//! - [`rest`] — REST API handlers (agents, projects, bot config, pipeline)
mod jsonrpc;
/// JSON-RPC 2.0 request/response types shared across gateway handlers.
pub(crate) mod jsonrpc;
mod mcp;
mod rest;
mod websocket;
+42 -31
View File
@@ -1,26 +1,25 @@
//! `tools/call` MCP method — dispatches a tool name to the appropriate `*_tools` module.
use serde_json::{Value, json};
use serde_json::Value;
use super::JsonRpcResponse;
use super::{
agent_tools, diagnostics, git_tools, merge_tools, qa_tools, shell_tools, status_tools,
story_tools, wizard_tools,
};
use crate::http::context::AppContext;
use crate::slog_warn;
// ── Tool dispatch ─────────────────────────────────────────────────
pub(super) async fn handle_tools_call(
id: Option<Value>,
params: &Value,
/// Execute an MCP tool by name, returning the text result or an error string.
///
/// This is the shared dispatch entry point used by both the WebSocket
/// rendezvous channel and API-based agent runtimes (Gemini, OpenAI).
pub async fn dispatch_tool_call(
tool_name: &str,
args: Value,
ctx: &AppContext,
) -> JsonRpcResponse {
let tool_name = params.get("name").and_then(|v| v.as_str()).unwrap_or("");
let args = params.get("arguments").cloned().unwrap_or(json!({}));
let result = match tool_name {
) -> Result<String, String> {
match tool_name {
// Workflow tools
"create_story" => story_tools::tool_create_story(&args, ctx),
"validate_stories" => story_tools::tool_validate_stories(ctx),
@@ -120,31 +119,43 @@ pub(super) async fn handle_tools_call(
"wizard_skip" => wizard_tools::tool_wizard_skip(ctx),
"wizard_retry" => wizard_tools::tool_wizard_retry(ctx),
_ => Err(format!("Unknown tool: {tool_name}")),
};
match result {
Ok(content) => JsonRpcResponse::success(
id,
json!({
"content": [{ "type": "text", "text": content }]
}),
),
Err(msg) => {
slog_warn!("[mcp] Tool call failed: tool={tool_name} error={msg}");
JsonRpcResponse::success(
id,
json!({
"content": [{ "type": "text", "text": msg }],
"isError": true
}),
)
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::http::gateway::jsonrpc::JsonRpcResponse;
use crate::slog_warn;
use serde_json::json;
/// Test helper: invoke a `tools/call` JSON-RPC request and return the response.
pub(in crate::http::mcp) async fn handle_tools_call(
id: Option<serde_json::Value>,
params: &serde_json::Value,
ctx: &crate::http::context::AppContext,
) -> JsonRpcResponse {
let tool_name = params.get("name").and_then(|v| v.as_str()).unwrap_or("");
let args = params.get("arguments").cloned().unwrap_or(json!({}));
match super::dispatch_tool_call(tool_name, args, ctx).await {
Ok(content) => JsonRpcResponse::success(
id,
json!({
"content": [{ "type": "text", "text": content }]
}),
),
Err(msg) => {
slog_warn!("[mcp] Tool call failed: tool={tool_name} error={msg}");
JsonRpcResponse::success(
id,
json!({
"content": [{ "type": "text", "text": msg }],
"isError": true
}),
)
}
}
}
use crate::http::test_helpers::test_ctx;
#[test]
+11 -571
View File
@@ -1,18 +1,15 @@
//! HTTP MCP server module.
use crate::http::context::AppContext;
use poem::handler;
use poem::http::StatusCode;
use poem::web::Data;
use poem::{Body, Request, Response};
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use std::sync::Arc;
//! MCP tool dispatch and schema module.
//!
//! Agents no longer connect via an HTTP `/mcp` endpoint. Tool dispatch
//! is invoked directly by API-based runtimes (Gemini, OpenAI) and by
//! the WebSocket-based `/crdt-sync` rendezvous channel.
/// MCP tools for agent start, stop, wait, list, and inspect.
pub mod agent_tools;
/// MCP tools for server logs, CRDT dump, version, and story movement.
pub mod diagnostics;
/// MCP tool dispatch — routes a tool name to the appropriate handler module.
pub mod dispatch;
/// MCP tools for git operations scoped to agent worktrees.
pub mod git_tools;
/// MCP tools for merge status and merge-to-master operations.
@@ -25,568 +22,11 @@ pub mod shell_tools;
pub mod status_tools;
/// MCP tools for creating, updating, and managing stories and bugs.
pub mod story_tools;
/// MCP tool schema definitions for `tools/list`.
pub mod tools_list;
/// MCP tools for the project setup wizard.
pub mod wizard_tools;
mod dispatch;
mod tools_list;
use dispatch::handle_tools_call;
use tools_list::handle_tools_list;
/// Returns true when the Accept header includes text/event-stream.
fn wants_sse(req: &Request) -> bool {
req.header("accept")
.unwrap_or("")
.contains("text/event-stream")
}
// ── JSON-RPC structs ──────────────────────────────────────────────
#[derive(Deserialize)]
struct JsonRpcRequest {
jsonrpc: String,
id: Option<Value>,
method: String,
#[serde(default)]
params: Value,
}
#[derive(Serialize)]
pub(super) struct JsonRpcResponse {
jsonrpc: &'static str,
#[serde(skip_serializing_if = "Option::is_none")]
id: Option<Value>,
#[serde(skip_serializing_if = "Option::is_none")]
result: Option<Value>,
#[serde(skip_serializing_if = "Option::is_none")]
error: Option<JsonRpcError>,
}
#[derive(Serialize)]
struct JsonRpcError {
code: i64,
message: String,
#[serde(skip_serializing_if = "Option::is_none")]
data: Option<Value>,
}
impl JsonRpcResponse {
pub(super) fn success(id: Option<Value>, result: Value) -> Self {
Self {
jsonrpc: "2.0",
id,
result: Some(result),
error: None,
}
}
pub(super) fn error(id: Option<Value>, code: i64, message: String) -> Self {
Self {
jsonrpc: "2.0",
id,
result: None,
error: Some(JsonRpcError {
code,
message,
data: None,
}),
}
}
}
// ── Poem handlers ─────────────────────────────────────────────────
#[handler]
pub async fn mcp_get_handler() -> Response {
Response::builder()
.status(StatusCode::METHOD_NOT_ALLOWED)
.body(Body::empty())
}
#[handler]
pub async fn mcp_post_handler(req: &Request, body: Body, ctx: Data<&Arc<AppContext>>) -> Response {
// Validate Content-Type
let content_type = req.header("content-type").unwrap_or("");
if !content_type.is_empty() && !content_type.contains("application/json") {
return json_rpc_error_response(
None,
-32700,
"Unsupported Content-Type; expected application/json".into(),
);
}
let bytes = match body.into_bytes().await {
Ok(b) => b,
Err(_) => return json_rpc_error_response(None, -32700, "Parse error".into()),
};
let rpc: JsonRpcRequest = match serde_json::from_slice(&bytes) {
Ok(r) => r,
Err(_) => return json_rpc_error_response(None, -32700, "Parse error".into()),
};
if rpc.jsonrpc != "2.0" {
return json_rpc_error_response(rpc.id, -32600, "Invalid JSON-RPC version".into());
}
// Notifications (no id) — accept silently
if rpc.id.is_none() || rpc.id.as_ref() == Some(&Value::Null) {
if rpc.method.starts_with("notifications/") {
return Response::builder()
.status(StatusCode::ACCEPTED)
.body(Body::empty());
}
return json_rpc_error_response(None, -32600, "Missing id".into());
}
let sse = wants_sse(req);
// Streaming agent output over SSE
if sse && rpc.method == "tools/call" {
let tool_name = rpc
.params
.get("name")
.and_then(|v| v.as_str())
.unwrap_or("");
if tool_name == "run_command" {
return shell_tools::handle_run_command_sse(rpc.id, &rpc.params, &ctx);
}
}
let resp = match rpc.method.as_str() {
"initialize" => handle_initialize(rpc.id, &rpc.params),
"tools/list" => handle_tools_list(rpc.id),
"tools/call" => handle_tools_call(rpc.id, &rpc.params, &ctx).await,
_ => JsonRpcResponse::error(rpc.id, -32601, format!("Unknown method: {}", rpc.method)),
};
if sse {
to_sse_response(resp)
} else {
to_json_response(resp)
}
}
fn json_rpc_error_response(id: Option<Value>, code: i64, message: String) -> Response {
to_json_response(JsonRpcResponse::error(id, code, message))
}
fn to_json_response(resp: JsonRpcResponse) -> Response {
let body = serde_json::to_vec(&resp).unwrap_or_default();
Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "application/json")
.body(Body::from(body))
}
pub(super) fn to_sse_response(resp: JsonRpcResponse) -> Response {
let json = serde_json::to_string(&resp).unwrap_or_default();
let sse_body = format!("data: {json}\n\n");
Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "text/event-stream")
.header("Cache-Control", "no-cache")
.body(Body::from_string(sse_body))
}
// ── MCP protocol handlers ─────────────────────────────────────────
fn handle_initialize(id: Option<Value>, params: &Value) -> JsonRpcResponse {
let _protocol_version = params
.get("protocolVersion")
.and_then(|v| v.as_str())
.unwrap_or("2025-03-26");
JsonRpcResponse::success(
id,
json!({
"protocolVersion": "2025-03-26",
"capabilities": {
"tools": {}
},
"serverInfo": {
"name": "huskies",
"version": "1.0.0"
}
}),
)
}
// Re-export for test code in submodules that references `super::super::handle_tools_list`.
#[cfg(test)]
mod tests {
use super::*;
use crate::http::test_helpers::test_ctx;
#[test]
fn json_rpc_response_serializes_success() {
let resp = JsonRpcResponse::success(Some(json!(1)), json!({"ok": true}));
let s = serde_json::to_string(&resp).unwrap();
assert!(s.contains("\"result\""));
assert!(!s.contains("\"error\""));
}
#[test]
fn json_rpc_response_serializes_error() {
let resp = JsonRpcResponse::error(Some(json!(1)), -32600, "bad".into());
let s = serde_json::to_string(&resp).unwrap();
assert!(s.contains("\"error\""));
assert!(!s.contains("\"result\""));
}
#[test]
fn initialize_returns_capabilities() {
let resp = handle_initialize(
Some(json!(1)),
&json!({"protocolVersion": "2025-03-26", "capabilities": {}, "clientInfo": {"name": "test", "version": "1.0"}}),
);
let result = resp.result.unwrap();
assert_eq!(result["protocolVersion"], "2025-03-26");
assert!(result["capabilities"]["tools"].is_object());
assert_eq!(result["serverInfo"]["name"], "huskies");
}
#[test]
fn to_sse_response_wraps_in_data_prefix() {
let resp = JsonRpcResponse::success(Some(json!(1)), json!({"ok": true}));
let http_resp = to_sse_response(resp);
assert_eq!(
http_resp.headers().get("content-type").unwrap(),
"text/event-stream"
);
}
#[test]
fn wants_sse_detects_accept_header() {
// Can't easily construct a Request in tests without TestClient,
// so test the logic indirectly via to_sse_response format
let resp = JsonRpcResponse::success(Some(json!(1)), json!("ok"));
let json_resp = to_json_response(resp);
assert_eq!(
json_resp.headers().get("content-type").unwrap(),
"application/json"
);
}
#[test]
fn json_rpc_error_response_builds_json_response() {
let resp = json_rpc_error_response(Some(json!(42)), -32600, "test error".into());
assert_eq!(resp.status(), poem::http::StatusCode::OK);
assert_eq!(
resp.headers().get("content-type").unwrap(),
"application/json"
);
}
// ── HTTP handler tests (TestClient) ───────────────────────────
fn test_mcp_app(ctx: std::sync::Arc<AppContext>) -> impl poem::Endpoint {
use poem::EndpointExt;
poem::Route::new()
.at("/mcp", poem::post(mcp_post_handler).get(mcp_get_handler))
.data(ctx)
}
async fn read_body_json(resp: poem::test::TestResponse) -> Value {
let body = resp.0.into_body().into_string().await.unwrap();
serde_json::from_str(&body).unwrap()
}
async fn post_json_mcp<E: poem::Endpoint>(
cli: &poem::test::TestClient<E>,
payload: &str,
) -> Value {
let resp = cli
.post("/mcp")
.header("content-type", "application/json")
.body(payload.to_string())
.send()
.await;
read_body_json(resp).await
}
#[tokio::test]
async fn mcp_get_handler_returns_405() {
let tmp = tempfile::tempdir().unwrap();
let ctx = std::sync::Arc::new(test_ctx(tmp.path()));
let cli = poem::test::TestClient::new(test_mcp_app(ctx));
let resp = cli.get("/mcp").send().await;
assert_eq!(resp.0.status(), poem::http::StatusCode::METHOD_NOT_ALLOWED);
}
#[tokio::test]
async fn mcp_post_invalid_content_type_returns_error() {
let tmp = tempfile::tempdir().unwrap();
let ctx = std::sync::Arc::new(test_ctx(tmp.path()));
let cli = poem::test::TestClient::new(test_mcp_app(ctx));
let resp = cli
.post("/mcp")
.header("content-type", "text/plain")
.body("{}")
.send()
.await;
let body = read_body_json(resp).await;
assert!(body.get("error").is_some(), "expected error field: {body}");
}
#[tokio::test]
async fn mcp_post_invalid_json_returns_parse_error() {
let tmp = tempfile::tempdir().unwrap();
let ctx = std::sync::Arc::new(test_ctx(tmp.path()));
let cli = poem::test::TestClient::new(test_mcp_app(ctx));
let resp = cli
.post("/mcp")
.header("content-type", "application/json")
.body("not-valid-json")
.send()
.await;
let body = read_body_json(resp).await;
assert!(body.get("error").is_some(), "expected error field: {body}");
}
#[tokio::test]
async fn mcp_post_wrong_jsonrpc_version_returns_error() {
let tmp = tempfile::tempdir().unwrap();
let ctx = std::sync::Arc::new(test_ctx(tmp.path()));
let cli = poem::test::TestClient::new(test_mcp_app(ctx));
let body = post_json_mcp(
&cli,
r#"{"jsonrpc":"1.0","id":1,"method":"initialize","params":{}}"#,
)
.await;
assert!(
body["error"]["message"]
.as_str()
.unwrap_or("")
.contains("version"),
"expected version error: {body}"
);
}
#[tokio::test]
async fn mcp_post_notification_with_null_id_returns_accepted() {
let tmp = tempfile::tempdir().unwrap();
let ctx = std::sync::Arc::new(test_ctx(tmp.path()));
let cli = poem::test::TestClient::new(test_mcp_app(ctx));
let resp = cli
.post("/mcp")
.header("content-type", "application/json")
.body(r#"{"jsonrpc":"2.0","method":"notifications/initialized","params":{}}"#)
.send()
.await;
assert_eq!(resp.0.status(), poem::http::StatusCode::ACCEPTED);
}
#[tokio::test]
async fn mcp_post_notification_with_explicit_null_id_returns_accepted() {
let tmp = tempfile::tempdir().unwrap();
let ctx = std::sync::Arc::new(test_ctx(tmp.path()));
let cli = poem::test::TestClient::new(test_mcp_app(ctx));
let resp = cli
.post("/mcp")
.header("content-type", "application/json")
.body(r#"{"jsonrpc":"2.0","id":null,"method":"notifications/initialized","params":{}}"#)
.send()
.await;
assert_eq!(resp.0.status(), poem::http::StatusCode::ACCEPTED);
}
#[tokio::test]
async fn mcp_post_missing_id_non_notification_returns_error() {
let tmp = tempfile::tempdir().unwrap();
let ctx = std::sync::Arc::new(test_ctx(tmp.path()));
let cli = poem::test::TestClient::new(test_mcp_app(ctx));
let body = post_json_mcp(
&cli,
r#"{"jsonrpc":"2.0","method":"initialize","params":{}}"#,
)
.await;
assert!(body.get("error").is_some(), "expected error: {body}");
}
#[tokio::test]
async fn mcp_post_unknown_method_returns_error() {
let tmp = tempfile::tempdir().unwrap();
let ctx = std::sync::Arc::new(test_ctx(tmp.path()));
let cli = poem::test::TestClient::new(test_mcp_app(ctx));
let body = post_json_mcp(
&cli,
r#"{"jsonrpc":"2.0","id":1,"method":"bogus/method","params":{}}"#,
)
.await;
assert!(
body["error"]["message"]
.as_str()
.unwrap_or("")
.contains("Unknown method"),
"expected unknown method error: {body}"
);
}
#[tokio::test]
async fn mcp_post_initialize_returns_capabilities() {
let tmp = tempfile::tempdir().unwrap();
let ctx = std::sync::Arc::new(test_ctx(tmp.path()));
let cli = poem::test::TestClient::new(test_mcp_app(ctx));
let body = post_json_mcp(
&cli,
r#"{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":"2025-03-26","capabilities":{},"clientInfo":{"name":"test","version":"1.0"}}}"#,
)
.await;
assert_eq!(body["result"]["protocolVersion"], "2025-03-26");
assert_eq!(body["result"]["serverInfo"]["name"], "huskies");
}
#[tokio::test]
async fn mcp_post_tools_list_returns_tools() {
let tmp = tempfile::tempdir().unwrap();
let ctx = std::sync::Arc::new(test_ctx(tmp.path()));
let cli = poem::test::TestClient::new(test_mcp_app(ctx));
let body = post_json_mcp(
&cli,
r#"{"jsonrpc":"2.0","id":1,"method":"tools/list","params":{}}"#,
)
.await;
assert!(body["result"]["tools"].is_array());
}
#[tokio::test]
async fn mcp_post_sse_returns_event_stream_content_type() {
let tmp = tempfile::tempdir().unwrap();
let ctx = std::sync::Arc::new(test_ctx(tmp.path()));
let cli = poem::test::TestClient::new(test_mcp_app(ctx));
let resp = cli
.post("/mcp")
.header("content-type", "application/json")
.header("accept", "text/event-stream")
.body(r#"{"jsonrpc":"2.0","id":1,"method":"tools/list","params":{}}"#)
.send()
.await;
assert_eq!(
resp.0.headers().get("content-type").unwrap(),
"text/event-stream"
);
}
#[tokio::test]
async fn mcp_post_sse_get_agent_output_missing_story_id() {
let tmp = tempfile::tempdir().unwrap();
let ctx = std::sync::Arc::new(test_ctx(tmp.path()));
let cli = poem::test::TestClient::new(test_mcp_app(ctx));
let resp = cli
.post("/mcp")
.header("content-type", "application/json")
.header("accept", "text/event-stream")
.body(r#"{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"get_agent_output","arguments":{}}}"#)
.send()
.await;
assert_eq!(
resp.0.headers().get("content-type").unwrap(),
"text/event-stream",
"expected SSE content-type"
);
}
#[tokio::test]
async fn mcp_post_sse_get_agent_output_without_agent_name_returns_disk_content() {
// Without agent_name the SSE live-streaming intercept is skipped and
// the disk-based handler runs. The transport still wraps the result in
// SSE format (data: …\n\n) because the client sent Accept: text/event-stream,
// but the content should be a valid JSON-RPC result, not a subscribe error.
let tmp = tempfile::tempdir().unwrap();
let ctx = std::sync::Arc::new(test_ctx(tmp.path()));
let cli = poem::test::TestClient::new(test_mcp_app(ctx));
let resp = cli
.post("/mcp")
.header("content-type", "application/json")
.header("accept", "text/event-stream")
.body(r#"{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"get_agent_output","arguments":{"story_id":"1_test"}}}"#)
.send()
.await;
let body = resp.0.into_body().into_string().await.unwrap();
// Body is SSE-wrapped: "data: {…}\n\n" — strip the prefix and verify it's
// a valid JSON-RPC result (not an error about missing agent_name).
let json_part = body
.trim_start_matches("data: ")
.trim_end_matches("\n\n")
.trim();
let parsed: serde_json::Value = serde_json::from_str(json_part)
.unwrap_or_else(|_| panic!("expected JSON-RPC in SSE body, got: {body}"));
assert!(
parsed.get("result").is_some(),
"expected JSON-RPC result (disk-based handler ran): {parsed}"
);
// Must NOT be an error about missing agent_name (agent_name is now optional)
assert!(
parsed.get("error").is_none(),
"unexpected error when agent_name omitted: {parsed}"
);
}
#[tokio::test]
async fn mcp_post_sse_get_agent_output_no_agent_no_logs_returns_not_found() {
// Agent not in pool and no log files → SSE success with "No log files found" message.
let tmp = tempfile::tempdir().unwrap();
let ctx = std::sync::Arc::new(test_ctx(tmp.path()));
let cli = poem::test::TestClient::new(test_mcp_app(ctx));
let resp = cli
.post("/mcp")
.header("content-type", "application/json")
.header("accept", "text/event-stream")
.body(r#"{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"get_agent_output","arguments":{"story_id":"99_nope","agent_name":"bot"}}}"#)
.send()
.await;
assert_eq!(
resp.0.headers().get("content-type").unwrap(),
"text/event-stream"
);
let body = resp.0.into_body().into_string().await.unwrap();
assert!(body.contains("data:"), "expected SSE data prefix: {body}");
// Must NOT return isError — should be a success result with "No log files found"
assert!(
!body.contains("isError"),
"expected no isError for missing agent: {body}"
);
assert!(
body.contains("No log files found"),
"expected not-found message: {body}"
);
}
#[tokio::test]
async fn mcp_post_sse_get_agent_output_exited_agent_reads_disk_logs() {
use crate::agent_log::AgentLogWriter;
use crate::agents::AgentEvent;
// Agent has exited (not in pool) but wrote logs to disk.
let tmp = tempfile::tempdir().unwrap();
let root = tmp.path();
let mut writer = AgentLogWriter::new(root, "42_story_foo", "coder-1", "sess-sse").unwrap();
writer
.write_event(&AgentEvent::Output {
story_id: "42_story_foo".to_string(),
agent_name: "coder-1".to_string(),
text: "disk output".to_string(),
})
.unwrap();
drop(writer);
let ctx = std::sync::Arc::new(test_ctx(root));
let cli = poem::test::TestClient::new(test_mcp_app(ctx));
let resp = cli
.post("/mcp")
.header("content-type", "application/json")
.header("accept", "text/event-stream")
.body(r#"{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"get_agent_output","arguments":{"story_id":"42_story_foo","agent_name":"coder-1"}}}"#)
.send()
.await;
let body = resp.0.into_body().into_string().await.unwrap();
assert!(
body.contains("disk output"),
"expected disk log content in SSE response: {body}"
);
assert!(
!body.contains("isError"),
"expected no error for exited agent with logs: {body}"
);
}
}
pub(crate) use tools_list::handle_tools_list;
+1 -167
View File
@@ -1,8 +1,5 @@
//! MCP shell command execution: tool_run_command + SSE streaming variant.
//! MCP shell command execution: `tool_run_command`.
use bytes::Bytes;
use futures::StreamExt;
use poem::{Body, Response};
use serde_json::{Value, json};
use std::path::PathBuf;
@@ -89,169 +86,6 @@ pub(crate) async fn tool_run_command(args: &Value, ctx: &AppContext) -> Result<S
}
}
/// SSE streaming run_command: spawns the process and emits stdout/stderr lines
pub(crate) fn handle_run_command_sse(
id: Option<Value>,
params: &Value,
ctx: &AppContext,
) -> Response {
use super::super::{JsonRpcResponse, to_sse_response};
let args = params.get("arguments").cloned().unwrap_or(json!({}));
let command = match args.get("command").and_then(|v| v.as_str()) {
Some(c) => c.to_string(),
None => {
return to_sse_response(JsonRpcResponse::error(
id,
-32602,
"Missing required argument: command".into(),
));
}
};
let working_dir = match args.get("working_dir").and_then(|v| v.as_str()) {
Some(d) => d.to_string(),
None => {
return to_sse_response(JsonRpcResponse::error(
id,
-32602,
"Missing required argument: working_dir".into(),
));
}
};
let timeout_secs = args
.get("timeout")
.and_then(|v| v.as_u64())
.unwrap_or(DEFAULT_TIMEOUT_SECS)
.min(MAX_TIMEOUT_SECS);
if let Some(reason) = is_dangerous(&command) {
return to_sse_response(JsonRpcResponse::error(id, -32602, reason));
}
let canonical_dir = match validate_working_dir(&working_dir, ctx) {
Ok(d) => d,
Err(e) => return to_sse_response(JsonRpcResponse::error(id, -32602, e)),
};
let final_id = id;
let stream = async_stream::stream! {
use tokio::io::AsyncBufReadExt;
let mut child = match tokio::process::Command::new("bash")
.arg("-c")
.arg(&command)
.current_dir(&canonical_dir)
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()
{
Ok(c) => c,
Err(e) => {
let resp = JsonRpcResponse::success(
final_id,
json!({
"content": [{"type": "text", "text": format!("Failed to spawn process: {e}")}],
"isError": true
}),
);
if let Ok(s) = serde_json::to_string(&resp) {
yield Ok::<_, std::io::Error>(format!("data: {s}\n\n"));
}
return;
}
};
let stdout = child.stdout.take().expect("stdout piped");
let stderr = child.stderr.take().expect("stderr piped");
let mut stdout_lines = tokio::io::BufReader::new(stdout).lines();
let mut stderr_lines = tokio::io::BufReader::new(stderr).lines();
let deadline = tokio::time::Instant::now()
+ std::time::Duration::from_secs(timeout_secs);
let mut stdout_done = false;
let mut stderr_done = false;
let mut timed_out = false;
loop {
if stdout_done && stderr_done {
break;
}
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
if remaining.is_zero() {
timed_out = true;
let _ = child.kill().await;
break;
}
tokio::select! {
line = stdout_lines.next_line(), if !stdout_done => {
match line {
Ok(Some(l)) => {
let notif = json!({
"jsonrpc": "2.0",
"method": "notifications/tools/progress",
"params": { "stream": "stdout", "line": l }
});
if let Ok(s) = serde_json::to_string(&notif) {
yield Ok::<_, std::io::Error>(format!("data: {s}\n\n"));
}
}
_ => { stdout_done = true; }
}
}
line = stderr_lines.next_line(), if !stderr_done => {
match line {
Ok(Some(l)) => {
let notif = json!({
"jsonrpc": "2.0",
"method": "notifications/tools/progress",
"params": { "stream": "stderr", "line": l }
});
if let Ok(s) = serde_json::to_string(&notif) {
yield Ok::<_, std::io::Error>(format!("data: {s}\n\n"));
}
}
_ => { stderr_done = true; }
}
}
_ = tokio::time::sleep(remaining) => {
timed_out = true;
let _ = child.kill().await;
break;
}
}
}
let exit_code = child.wait().await.ok().and_then(|s| s.code()).unwrap_or(-1);
let summary = json!({
"exit_code": exit_code,
"timed_out": timed_out,
});
let final_resp = JsonRpcResponse::success(
final_id,
json!({
"content": [{"type": "text", "text": summary.to_string()}]
}),
);
if let Ok(s) = serde_json::to_string(&final_resp) {
yield Ok::<_, std::io::Error>(format!("data: {s}\n\n"));
}
};
Response::builder()
.status(poem::http::StatusCode::OK)
.header("Content-Type", "text/event-stream")
.header("Cache-Control", "no-cache")
.body(Body::from_bytes_stream(stream.map(|r| r.map(Bytes::from))))
}
/// Run the project's test suite (`script/test`) and block until complete.
///
/// Spawns the test process, then polls every second server-side until the
+1 -1
View File
@@ -6,7 +6,7 @@
mod exec;
mod script;
pub(crate) use exec::{handle_run_command_sse, tool_run_command};
pub(crate) use exec::tool_run_command;
pub(crate) use script::{
tool_get_test_result, tool_run_build, tool_run_check, tool_run_lint, tool_run_tests,
};
+17 -5
View File
@@ -1,24 +1,36 @@
//! `tools/list` MCP method — returns the static schema for every tool the server exposes.
use serde_json::{Value, json};
use super::JsonRpcResponse;
use serde_json::Value;
mod agent_tools;
mod story_tools;
mod system_tools;
pub(super) fn handle_tools_list(id: Option<Value>) -> JsonRpcResponse {
/// Return the full list of MCP tool definitions (name, description, inputSchema).
///
/// Used by API-based runtimes (Gemini, OpenAI) that need tool schemas
/// without going through the network.
pub fn list_tools() -> Vec<Value> {
let mut tools = Vec::new();
tools.extend(story_tools::story_tools());
tools.extend(agent_tools::agent_tools());
tools.extend(system_tools::system_tools());
JsonRpcResponse::success(id, json!({ "tools": tools }))
tools
}
/// Wrap `list_tools()` in a JSON-RPC response (test-only helper).
#[cfg(test)]
pub(crate) fn handle_tools_list(
id: Option<Value>,
) -> crate::http::gateway::jsonrpc::JsonRpcResponse {
use serde_json::json;
crate::http::gateway::jsonrpc::JsonRpcResponse::success(id, json!({ "tools": list_tools() }))
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
#[test]
fn tools_list_returns_all_tools() {
-4
View File
@@ -112,10 +112,6 @@ pub fn build_routes(
"/agents/:story_id/:agent_name/stream",
get(agents_sse::agent_stream),
)
.at(
"/mcp",
post(mcp::mcp_post_handler).get(mcp::mcp_get_handler),
)
.at("/identity", get(identity::identity_handler))
.at(
"/oauth/authorize",