feat(904): MCP progress notifications + SSE for long-running tool calls

Follow-up to bug 903. The attach fix made run_tests retries safe, but
agents still observed the underlying MCP transport timeout as a
tool-call error and had to handle it via retry. Implement the proper
fix: MCP `notifications/progress` events keep the client's transport
timer alive so the call never errors from the agent's perspective.

What changed:

server/src/http/mcp/progress.rs (new)
  - `ProgressEmitter` (progressToken + mpsc sender) installed in a
    `tokio::task_local!` scope by the SSE response path.
  - `emit_progress(progress, total, message)` builds a JSON-RPC
    `notifications/progress` message and sends it via the channel.
    No-op when no emitter is in scope (plain JSON path / tests / API
    runtimes), so tool handlers can call it unconditionally.
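The emitter pattern described above can be sketched without the tokio dependency — here `tokio::task_local!` and the mpsc sender are swapped for a `thread_local!` `RefCell` and a `Vec` sink so the sketch runs standalone; the shape of the no-op behaviour and the `notifications/progress` payload is what matters:

```rust
use std::cell::RefCell;

// Stand-in for the task-local emitter slot. The real code uses
// tokio::task_local! so the scope follows the spawned dispatch task;
// a thread_local RefCell is the closest stdlib analogue.
thread_local! {
    static EMITTER: RefCell<Option<ProgressEmitter>> = RefCell::new(None);
}

struct ProgressEmitter {
    token: String,     // progressToken echoed back to the client
    sink: Vec<String>, // stands in for the mpsc sender
}

/// Build a JSON-RPC `notifications/progress` message and send it if an
/// emitter is in scope; silently no-op otherwise, so tool handlers can
/// call this unconditionally (plain JSON path / tests / API runtimes).
fn emit_progress(progress: u64, total: Option<u64>, message: Option<&str>) {
    EMITTER.with(|e| {
        if let Some(em) = e.borrow_mut().as_mut() {
            // Hand-rolled JSON to keep the sketch stdlib-only; the real
            // code uses serde_json::json!.
            let mut params = format!(
                "{{\"progressToken\":\"{}\",\"progress\":{}",
                em.token, progress
            );
            if let Some(t) = total {
                params.push_str(&format!(",\"total\":{t}"));
            }
            if let Some(m) = message {
                params.push_str(&format!(",\"message\":\"{m}\""));
            }
            params.push('}');
            em.sink.push(format!(
                "{{\"jsonrpc\":\"2.0\",\"method\":\"notifications/progress\",\"params\":{params}}}"
            ));
        }
    });
}
```

Keeping `total` and `message` optional matches the MCP progress-notification shape, where only `progressToken` and `progress` are required.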

server/src/http/mcp/mod.rs
  - mcp_post_handler now detects `Accept: text/event-stream` AND a
    `params._meta.progressToken` on tools/call. When both are present,
    routes through `sse_tools_call` instead of the plain JSON path.
  - sse_tools_call: spawns the dispatch task with the emitter installed,
    builds an SSE stream that interleaves incoming progress events with
    the final JSON-RPC response, with a 15s keep-alive interval as a
    backstop for tools that don't emit their own progress.
  - Plain JSON behaviour is unchanged for non-SSE clients and for
    everything other than tools/call.
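Concretely, the SSE branch triggers only for a request shaped like the following, sent with an `Accept: text/event-stream` header (the `id` and token values here are invented for illustration):

```json
{
  "jsonrpc": "2.0",
  "id": 7,
  "method": "tools/call",
  "params": {
    "name": "run_tests",
    "arguments": {},
    "_meta": { "progressToken": "tok-1" }
  }
}
```

Any other method, or a `tools/call` missing either the header or the token, falls through to the plain JSON path.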

server/src/http/mcp/shell_tools/script.rs
  - tool_run_tests poll loop emits `notifications/progress` every 25s
    of elapsed time (well below the typical ~60s MCP transport
    timeout). Attached callers (the bug 903 fix path) also emit so
    their MCP socket stays alive while waiting for the in-flight job.
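The emission cadence reduces to a small pure check that the poll loop can run on each iteration (hypothetical helper name; the real loop tracks elapsed time around its poll sleep):

```rust
use std::time::Duration;

/// Emit once every PROGRESS_INTERVAL of elapsed wait time — chosen to
/// sit comfortably below the typical ~60s MCP transport timeout, so
/// the client's timer is always reset before it can fire.
const PROGRESS_INTERVAL: Duration = Duration::from_secs(25);

/// `elapsed` is total time spent waiting so far; `last_emit` is the
/// elapsed value at which we last emitted progress.
fn should_emit_progress(elapsed: Duration, last_emit: Duration) -> bool {
    elapsed.saturating_sub(last_emit) >= PROGRESS_INTERVAL
}
```

Because `emit_progress` is a no-op without an emitter in scope, the loop can call it on this cadence unconditionally, including on the attached-caller path from bug 903.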
  - Output filtering: on a passing run the response now returns a
    one-line summary ("All N tests passed.") instead of the full
    `cargo test` stdout, which was pure noise that burned agent
    tokens. Failure output is unchanged (truncated tail with the
    `failures:` section and final test_result line). CRDT entry
    stores the same filtered value so attached callers see it too.

Tests (3 new):
  - emit_progress_no_op_without_emitter — calling outside scope is safe
  - emit_progress_sends_notification_when_emitter_installed — full path
  - emit_progress_omits_optional_fields — total/message optional

Not changed: coder system_prompts still tell agents to retry on
transport-timeout errors. That advice is now belt-and-braces — if
claude-code's HTTP MCP client honours progress notifications, no agent
will ever observe the error; if not, retry is still safe post-903. We
can drop the retry advice once we've observed the SSE path working in
the field.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Author: Timmy
Date:   2026-05-12 15:05:04 +01:00
parent  a97a10fba2
commit  ddc4228b10
3 changed files with 300 additions and 6 deletions

server/src/http/mcp/mod.rs (+128 -1):
@@ -14,6 +14,9 @@ pub mod dispatch;
pub mod git_tools;
/// MCP tools for merge status and merge-to-master operations.
pub mod merge_tools;
/// Task-local progress emitter used to deliver `notifications/progress`
/// during long-running tool calls so MCP clients' socket timers reset.
pub mod progress;
/// MCP tools for QA request, approve, and reject workflows.
pub mod qa_tools;
/// MCP tools for running shell commands and test suites.
@@ -36,10 +39,12 @@ use crate::http::gateway::jsonrpc::JsonRpcResponse;
use poem::handler;
use poem::http::StatusCode;
use poem::web::Data;
use poem::web::sse::{Event, SSE};
use poem::{Body, IntoResponse, Request, Response};
use serde::Deserialize;
use serde_json::{Value, json};
use std::sync::Arc;
use std::time::Duration;
#[derive(Deserialize)]
struct JsonRpcRequest {
@@ -102,6 +107,24 @@ pub async fn mcp_post_handler(req: &Request, body: Body, ctx: Data<&Arc<AppConte
return json_response(JsonRpcResponse::error(None, -32600, "Missing id".into()));
}
// Progress-aware path: only `tools/call` is long-running, and only when
// the client supplies a `progressToken` AND accepts `text/event-stream`
// do we take the SSE branch. Everything else returns plain JSON as before.
if rpc.method == "tools/call" {
let accepts_sse = req
.header("accept")
.map(|h| h.contains("text/event-stream"))
.unwrap_or(false);
let progress_token = rpc
.params
.get("_meta")
.and_then(|m| m.get("progressToken"))
.cloned();
if let (true, Some(token)) = (accepts_sse, progress_token) {
return sse_tools_call(rpc.id, rpc.params, token, Arc::clone(&ctx)).await;
}
}
let resp = match rpc.method.as_str() {
"initialize" => handle_initialize(rpc.id),
"tools/list" => {
@@ -114,6 +137,110 @@ pub async fn mcp_post_handler(req: &Request, body: Body, ctx: Data<&Arc<AppConte
json_response(resp)
}
/// SSE variant of `tools/call`. Installs a progress emitter in a
/// `tokio::task_local!` scope before dispatching the tool, then streams
/// `notifications/progress` events as they arrive followed by the final
/// JSON-RPC response. Keep-alive comments every 15s keep idle connections
/// from being closed by intermediate proxies even before the tool emits its
/// first progress event.
async fn sse_tools_call(
id: Option<Value>,
params: Value,
progress_token: Value,
ctx: Arc<AppContext>,
) -> Response {
use tokio::sync::mpsc::unbounded_channel;
let tool_name = params
.get("name")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let args = params.get("arguments").cloned().unwrap_or(json!({}));
let (tx, mut rx) = unbounded_channel::<Value>();
let emitter = progress::ProgressEmitter {
token: progress_token,
tx,
};
// Spawn dispatch with the emitter installed in the task-local. The
// task runs to completion regardless of whether the SSE consumer is
// still listening (so the test job, etc. still finishes and writes
// its final state to the CRDT even on client disconnect).
let dispatch_ctx = Arc::clone(&ctx);
let dispatch_handle = tokio::spawn(async move {
progress::EMITTER
.scope(emitter, async move {
dispatch::dispatch_tool_call(&tool_name, args, &dispatch_ctx).await
})
.await
});
let id_for_final = id;
let stream = async_stream::stream! {
let mut dispatch_handle = Some(dispatch_handle);
loop {
tokio::select! {
biased;
Some(notification) = rx.recv() => {
let data = serde_json::to_string(&notification).unwrap_or_default();
yield Event::message(data);
}
join_result = async {
match dispatch_handle.as_mut() {
Some(h) => h.await,
None => std::future::pending().await,
}
}, if dispatch_handle.is_some() => {
// Mark the handle taken so the select arm's guard
// disables on the next iteration (we break before
// looping but the explicit take is clearer).
let _ = dispatch_handle.take();
// Drain any progress events that landed while we were
// racing the select arm — they would otherwise sit in
// the receiver and never be flushed.
while let Ok(notification) = rx.try_recv() {
let data = serde_json::to_string(&notification).unwrap_or_default();
yield Event::message(data);
}
let final_resp = match join_result {
Ok(Ok(content)) => JsonRpcResponse::success(
id_for_final.clone(),
json!({ "content": [{ "type": "text", "text": content }] }),
),
Ok(Err(msg)) => {
crate::slog_warn!("[mcp/sse] Tool call failed: {msg}");
JsonRpcResponse::success(
id_for_final.clone(),
json!({
"content": [{ "type": "text", "text": msg }],
"isError": true
}),
)
}
Err(join_err) => {
crate::slog_warn!("[mcp/sse] Tool dispatch task panicked: {join_err}");
JsonRpcResponse::error(
id_for_final.clone(),
-32603,
format!("internal error: {join_err}"),
)
}
};
let data = serde_json::to_string(&final_resp).unwrap_or_default();
yield Event::message(data);
break;
}
}
}
};
SSE::new(stream)
.keep_alive(Duration::from_secs(15))
.into_response()
}
fn json_response(resp: JsonRpcResponse) -> Response {
let body = serde_json::to_vec(&resp).unwrap_or_default();
Response::builder()