ddc4228b10
Follow-up to bug 903. The attach fix made run_tests retries safe, but
agents still observed the underlying MCP transport timeout as a
tool-call error and had to handle it via retry. Implement the proper
fix: MCP `notifications/progress` events keep the client's transport
timer alive so the call never errors from the agent's perspective.
What changed:
server/src/http/mcp/progress.rs (new)
- `ProgressEmitter` (progressToken + mpsc sender) installed in a
`tokio::task_local!` scope by the SSE response path.
- `emit_progress(progress, total, message)` builds a JSON-RPC
`notifications/progress` message and sends it via the channel.
No-op when no emitter is in scope (plain JSON path / tests / API
runtimes), so tool handlers can call it unconditionally.
server/src/http/mcp/mod.rs
- mcp_post_handler now detects `Accept: text/event-stream` AND a
`params._meta.progressToken` on tools/call. When both are present, the
request is routed through `sse_tools_call` instead of the plain JSON path.
- sse_tools_call: spawns the dispatch task with the emitter installed and
builds an SSE stream that forwards incoming progress events followed by
the final JSON-RPC response, with a 15s keep-alive interval as a backstop
for tools that don't emit their own progress.
- Plain JSON behaviour is unchanged for non-SSE clients and for
everything other than tools/call.
server/src/http/mcp/shell_tools/script.rs
- tool_run_tests poll loop emits `notifications/progress` every 25s
of elapsed time (well below the typical ~60s MCP transport
timeout). Attached callers (the bug 903 fix path) also emit so
their MCP socket stays alive while waiting for the in-flight job.
- Output filtering: on a passing run the response now returns a
one-line summary ("All N tests passed.") instead of the full
`cargo test` stdout, which was pure noise that burned agent
tokens. Failure output is unchanged (truncated tail with the
`failures:` section and final test_result line). CRDT entry
stores the same filtered value so attached callers see it too.
Tests (3 new):
- emit_progress_no_op_without_emitter — calling outside scope is safe
- emit_progress_sends_notification_when_emitter_installed — full path
- emit_progress_omits_optional_fields — total/message optional
Not changed: coder system_prompts still tell agents to retry on
transport-timeout errors. That advice is now belt-and-braces — if
claude-code's HTTP MCP client honours progress notifications, no agent
will ever observe the error; if not, retry is still safe post-903. We
can drop the retry advice once we've observed the SSE path working in
the field.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
284 lines
10 KiB
Rust
//! MCP HTTP endpoint and tool dispatch module.
//!
//! Local Claude Code agents connect to `POST /mcp` with a JSON-RPC 2.0
//! envelope. Tool dispatch is also invoked directly by API-based runtimes
//! (Gemini, OpenAI) via [`dispatch::dispatch_tool_call`].

/// MCP tools for agent start, stop, wait, list, and inspect.
pub mod agent_tools;
/// MCP tools for server logs, CRDT dump, version, and story movement.
pub mod diagnostics;
/// MCP tool dispatch — routes a tool name to the appropriate handler module.
pub mod dispatch;
/// MCP tools for git operations scoped to agent worktrees.
pub mod git_tools;
/// MCP tools for merge status and merge-to-master operations.
pub mod merge_tools;
/// Task-local progress emitter used to deliver `notifications/progress`
/// during long-running tool calls so MCP clients' socket timers reset.
pub mod progress;
/// MCP tools for QA request, approve, and reject workflows.
pub mod qa_tools;
/// MCP tools for running shell commands and test suites.
pub mod shell_tools;
/// MCP tools for pipeline status, story todos, and triage dump.
pub mod status_tools;
/// MCP tools for creating, updating, and managing stories and bugs.
pub mod story_tools;
/// MCP tool schema definitions for `tools/list`.
pub mod tools_list;
/// MCP tools for the project setup wizard.
pub mod wizard_tools;

// Re-export for test code in submodules that references `super::super::handle_tools_list`.
#[cfg(test)]
pub(crate) use tools_list::handle_tools_list;

use crate::http::context::AppContext;
use crate::http::gateway::jsonrpc::JsonRpcResponse;
use poem::handler;
use poem::http::StatusCode;
use poem::web::Data;
use poem::web::sse::{Event, SSE};
use poem::{Body, IntoResponse, Request, Response};
use serde::Deserialize;
use serde_json::{Value, json};
use std::sync::Arc;
use std::time::Duration;

#[derive(Deserialize)]
struct JsonRpcRequest {
    jsonrpc: String,
    id: Option<Value>,
    method: String,
    #[serde(default)]
    params: Value,
}

/// `GET /mcp` — method not allowed (MCP uses POST).
#[handler]
pub async fn mcp_get_handler() -> Response {
    Response::builder()
        .status(StatusCode::METHOD_NOT_ALLOWED)
        .body(Body::empty())
}

/// `POST /mcp` — JSON-RPC 2.0 entry point for `initialize`, `tools/list`,
/// `tools/call`, and `notifications/*`.
#[handler]
pub async fn mcp_post_handler(req: &Request, body: Body, ctx: Data<&Arc<AppContext>>) -> Response {
    let content_type = req.header("content-type").unwrap_or("");
    if !content_type.is_empty() && !content_type.contains("application/json") {
        return json_response(JsonRpcResponse::error(
            None,
            -32700,
            "Unsupported Content-Type; expected application/json".into(),
        ));
    }

    let bytes = match body.into_bytes().await {
        Ok(b) => b,
        Err(_) => {
            return json_response(JsonRpcResponse::error(None, -32700, "Parse error".into()));
        }
    };

    let rpc: JsonRpcRequest = match serde_json::from_slice(&bytes) {
        Ok(r) => r,
        Err(_) => {
            return json_response(JsonRpcResponse::error(None, -32700, "Parse error".into()));
        }
    };

    if rpc.jsonrpc != "2.0" {
        return json_response(JsonRpcResponse::error(
            rpc.id,
            -32600,
            "Invalid JSON-RPC version".into(),
        ));
    }

    if rpc.id.is_none() || rpc.id.as_ref() == Some(&Value::Null) {
        if rpc.method.starts_with("notifications/") {
            return Response::builder()
                .status(StatusCode::ACCEPTED)
                .body(Body::empty());
        }
        return json_response(JsonRpcResponse::error(None, -32600, "Missing id".into()));
    }

    // Progress-aware path: only `tools/call` is long-running, and only when
    // the client supplies a `progressToken` AND accepts `text/event-stream`
    // do we take the SSE branch. Everything else returns plain JSON as before.
    if rpc.method == "tools/call" {
        let accepts_sse = req
            .header("accept")
            .map(|h| h.contains("text/event-stream"))
            .unwrap_or(false);
        let progress_token = rpc
            .params
            .get("_meta")
            .and_then(|m| m.get("progressToken"))
            .cloned();
        if let (true, Some(token)) = (accepts_sse, progress_token) {
            return sse_tools_call(rpc.id, rpc.params, token, Arc::clone(&ctx)).await;
        }
    }

    let resp = match rpc.method.as_str() {
        "initialize" => handle_initialize(rpc.id),
        "tools/list" => {
            JsonRpcResponse::success(rpc.id, json!({ "tools": tools_list::list_tools() }))
        }
        "tools/call" => handle_tools_call(rpc.id, &rpc.params, &ctx).await,
        _ => JsonRpcResponse::error(rpc.id, -32601, format!("Unknown method: {}", rpc.method)),
    };

    json_response(resp)
}

/// SSE variant of `tools/call`. Installs a progress emitter in a
/// `tokio::task_local!` scope before dispatching the tool, then streams
/// `notifications/progress` events as they arrive followed by the final
/// JSON-RPC response. Keep-alive comments every 15s keep idle connections
/// from being closed by intermediate proxies even before the tool emits its
/// first progress event.
async fn sse_tools_call(
    id: Option<Value>,
    params: Value,
    progress_token: Value,
    ctx: Arc<AppContext>,
) -> Response {
    use tokio::sync::mpsc::unbounded_channel;

    let tool_name = params
        .get("name")
        .and_then(|v| v.as_str())
        .unwrap_or("")
        .to_string();
    let args = params.get("arguments").cloned().unwrap_or(json!({}));

    let (tx, mut rx) = unbounded_channel::<Value>();
    let emitter = progress::ProgressEmitter {
        token: progress_token,
        tx,
    };

    // Spawn dispatch with the emitter installed in the task-local. The
    // task runs to completion regardless of whether the SSE consumer is
    // still listening (so the test job, etc. still finishes and writes
    // its final state to the CRDT even on client disconnect).
    let dispatch_ctx = Arc::clone(&ctx);
    let dispatch_handle = tokio::spawn(async move {
        progress::EMITTER
            .scope(emitter, async move {
                dispatch::dispatch_tool_call(&tool_name, args, &dispatch_ctx).await
            })
            .await
    });

    let id_for_final = id;
    let stream = async_stream::stream! {
        let mut dispatch_handle = Some(dispatch_handle);
        loop {
            tokio::select! {
                biased;
                Some(notification) = rx.recv() => {
                    let data = serde_json::to_string(&notification).unwrap_or_default();
                    yield Event::message(data);
                }
                join_result = async {
                    match dispatch_handle.as_mut() {
                        Some(h) => h.await,
                        None => std::future::pending().await,
                    }
                }, if dispatch_handle.is_some() => {
                    // Mark the handle taken so the select arm's guard
                    // disables on the next iteration (we break before
                    // looping but the explicit take is clearer).
                    let _ = dispatch_handle.take();
                    // Drain any progress events that landed while we were
                    // racing the select arm — they would otherwise sit in
                    // the receiver and never be flushed.
                    while let Ok(notification) = rx.try_recv() {
                        let data = serde_json::to_string(&notification).unwrap_or_default();
                        yield Event::message(data);
                    }
                    let final_resp = match join_result {
                        Ok(Ok(content)) => JsonRpcResponse::success(
                            id_for_final.clone(),
                            json!({ "content": [{ "type": "text", "text": content }] }),
                        ),
                        Ok(Err(msg)) => {
                            crate::slog_warn!("[mcp/sse] Tool call failed: {msg}");
                            JsonRpcResponse::success(
                                id_for_final.clone(),
                                json!({
                                    "content": [{ "type": "text", "text": msg }],
                                    "isError": true
                                }),
                            )
                        }
                        Err(join_err) => {
                            crate::slog_warn!("[mcp/sse] Tool dispatch task panicked: {join_err}");
                            JsonRpcResponse::error(
                                id_for_final.clone(),
                                -32603,
                                format!("internal error: {join_err}"),
                            )
                        }
                    };
                    let data = serde_json::to_string(&final_resp).unwrap_or_default();
                    yield Event::message(data);
                    break;
                }
            }
        }
    };

    SSE::new(stream)
        .keep_alive(Duration::from_secs(15))
        .into_response()
}

fn json_response(resp: JsonRpcResponse) -> Response {
    let body = serde_json::to_vec(&resp).unwrap_or_default();
    Response::builder()
        .status(StatusCode::OK)
        .header("Content-Type", "application/json")
        .body(Body::from(body))
}

fn handle_initialize(id: Option<Value>) -> JsonRpcResponse {
    JsonRpcResponse::success(
        id,
        json!({
            "protocolVersion": "2025-03-26",
            "capabilities": { "tools": {} },
            "serverInfo": { "name": "huskies", "version": "1.0.0" }
        }),
    )
}

async fn handle_tools_call(id: Option<Value>, params: &Value, ctx: &AppContext) -> JsonRpcResponse {
    let tool_name = params.get("name").and_then(|v| v.as_str()).unwrap_or("");
    let args = params.get("arguments").cloned().unwrap_or(json!({}));

    match dispatch::dispatch_tool_call(tool_name, args, ctx).await {
        Ok(content) => JsonRpcResponse::success(
            id,
            json!({ "content": [{ "type": "text", "text": content }] }),
        ),
        Err(msg) => {
            crate::slog_warn!("[mcp] Tool call failed: tool={tool_name} error={msg}");
            JsonRpcResponse::success(
                id,
                json!({
                    "content": [{ "type": "text", "text": msg }],
                    "isError": true
                }),
            )
        }
    }
}