diff --git a/server/src/http/mcp/mod.rs b/server/src/http/mcp/mod.rs index dcc5e44a..39c4cc40 100644 --- a/server/src/http/mcp/mod.rs +++ b/server/src/http/mcp/mod.rs @@ -14,6 +14,9 @@ pub mod dispatch; pub mod git_tools; /// MCP tools for merge status and merge-to-master operations. pub mod merge_tools; +/// Task-local progress emitter used to deliver `notifications/progress` +/// during long-running tool calls so MCP clients' socket timers reset. +pub mod progress; /// MCP tools for QA request, approve, and reject workflows. pub mod qa_tools; /// MCP tools for running shell commands and test suites. @@ -36,10 +39,12 @@ use crate::http::gateway::jsonrpc::JsonRpcResponse; use poem::handler; use poem::http::StatusCode; use poem::web::Data; -use poem::{Body, Request, Response}; +use poem::web::sse::{Event, SSE}; +use poem::{Body, IntoResponse, Request, Response}; use serde::Deserialize; use serde_json::{Value, json}; use std::sync::Arc; +use std::time::Duration; #[derive(Deserialize)] struct JsonRpcRequest { @@ -102,6 +107,24 @@ pub async fn mcp_post_handler(req: &Request, body: Body, ctx: Data<&Arc handle_initialize(rpc.id), "tools/list" => { @@ -114,6 +137,110 @@ pub async fn mcp_post_handler(req: &Request, body: Body, ctx: Data<&Arc, + params: Value, + progress_token: Value, + ctx: Arc, +) -> Response { + use tokio::sync::mpsc::unbounded_channel; + + let tool_name = params + .get("name") + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + let args = params.get("arguments").cloned().unwrap_or(json!({})); + + let (tx, mut rx) = unbounded_channel::(); + let emitter = progress::ProgressEmitter { + token: progress_token, + tx, + }; + + // Spawn dispatch with the emitter installed in the task-local. The + // task runs to completion regardless of whether the SSE consumer is + // still listening (so the test job, etc. still finishes and writes + // its final state to the CRDT even on client disconnect). + let dispatch_ctx = Arc::clone(&ctx); + let dispatch_handle = tokio::spawn(async move { + progress::EMITTER + .scope(emitter, async move { + dispatch::dispatch_tool_call(&tool_name, args, &dispatch_ctx).await + }) + .await + }); + + let id_for_final = id; + let stream = async_stream::stream! { + let mut dispatch_handle = Some(dispatch_handle); + loop { + tokio::select! { + biased; + Some(notification) = rx.recv() => { + let data = serde_json::to_string(¬ification).unwrap_or_default(); + yield Event::message(data); + } + join_result = async { + match dispatch_handle.as_mut() { + Some(h) => h.await, + None => std::future::pending().await, + } + }, if dispatch_handle.is_some() => { + // Mark the handle taken so the select arm's guard + // disables on the next iteration (we break before + // looping but the explicit take is clearer). + let _ = dispatch_handle.take(); + // Drain any progress events that landed while we were + // racing the select arm — they would otherwise sit in + // the receiver and never be flushed. + while let Ok(notification) = rx.try_recv() { + let data = serde_json::to_string(¬ification).unwrap_or_default(); + yield Event::message(data); + } + let final_resp = match join_result { + Ok(Ok(content)) => JsonRpcResponse::success( + id_for_final.clone(), + json!({ "content": [{ "type": "text", "text": content }] }), + ), + Ok(Err(msg)) => { + crate::slog_warn!("[mcp/sse] Tool call failed: {msg}"); + JsonRpcResponse::success( + id_for_final.clone(), + json!({ + "content": [{ "type": "text", "text": msg }], + "isError": true + }), + ) + } + Err(join_err) => { + crate::slog_warn!("[mcp/sse] Tool dispatch task panicked: {join_err}"); + JsonRpcResponse::error( + id_for_final.clone(), + -32603, + format!("internal error: {join_err}"), + ) + } + }; + let data = serde_json::to_string(&final_resp).unwrap_or_default(); + yield Event::message(data); + break; + } + } + } + }; + + SSE::new(stream) + .keep_alive(Duration::from_secs(15)) + .into_response() +} + fn json_response(resp: JsonRpcResponse) -> Response { let body = serde_json::to_vec(&resp).unwrap_or_default(); Response::builder() diff --git a/server/src/http/mcp/progress.rs b/server/src/http/mcp/progress.rs new file mode 100644 index 00000000..2babfa96 --- /dev/null +++ b/server/src/http/mcp/progress.rs @@ -0,0 +1,123 @@ +//! Task-local plumbing for MCP `notifications/progress` events. +//! +//! Long-running tool handlers (notably `tool_run_tests`) emit progress events +//! during execution so the MCP client's transport timer resets and the call +//! never surfaces as a tool-call error to the agent. The HTTP MCP handler +//! installs an emitter via `tokio::task_local!` before dispatching the tool +//! and converts emitted events into SSE messages on the response stream. +//! +//! Tool handlers call [`emit_progress`] unconditionally — when no emitter is +//! installed (plain JSON response path, or test code), the call is a no-op. +//! That keeps handler code free of `if let Some(...) =` ceremony for the +//! progress-aware branch. +//! +//! Wire format follows MCP 2025-03-26: the emitter wraps the data in a +//! standard JSON-RPC notification with method `notifications/progress` and a +//! `progressToken` echoed back to the client. + +use serde_json::{Value, json}; +use tokio::sync::mpsc::UnboundedSender; + +/// Per-request channel for emitting progress notifications back to an MCP +/// client. Installed in a `tokio::task_local!` scope by the SSE response +/// path and consumed by the SSE stream producer. +#[derive(Clone)] +pub struct ProgressEmitter { + /// The client-supplied opaque token from `params._meta.progressToken`. + /// Echoed back unchanged in every progress notification so the client + /// can correlate progress with the originating request. + pub token: Value, + /// Channel into the SSE response stream. Each value sent is a + /// fully-formed `notifications/progress` JSON-RPC message ready to be + /// serialised as an SSE `data:` field. + pub tx: UnboundedSender, +} + +tokio::task_local! { + /// Set by the SSE response path before dispatching a tool call. Unset + /// in the plain JSON path and in tests, where [`emit_progress`] no-ops. + pub static EMITTER: ProgressEmitter; +} + +/// Emit a progress notification to the current request's SSE stream, if one +/// is attached. No-op when no emitter is in scope (plain JSON path). +/// +/// `progress` is a monotonically increasing value (typically seconds elapsed +/// for long-running tools); `total` is optional and omitted when unknown; +/// `message` is a short human-readable description of the current state. +pub fn emit_progress(progress: f64, total: Option, message: Option<&str>) { + let _ = EMITTER.try_with(|e| { + let mut params = json!({ + "progressToken": e.token, + "progress": progress, + }); + if let Some(t) = total { + params["total"] = json!(t); + } + if let Some(m) = message { + params["message"] = json!(m); + } + let notification = json!({ + "jsonrpc": "2.0", + "method": "notifications/progress", + "params": params, + }); + // Send is fire-and-forget. If the receiver dropped (client + // disconnected mid-stream), we don't care — the tool dispatch + // task keeps running and writes its final state to the CRDT. + let _ = e.tx.send(notification); + }); +} + +#[cfg(test)] +mod tests { + use super::*; + use tokio::sync::mpsc::unbounded_channel; + + #[tokio::test] + async fn emit_progress_no_op_without_emitter() { + // Calling outside a task_local scope must not panic. + emit_progress(1.0, None, Some("hello")); + } + + #[tokio::test] + async fn emit_progress_sends_notification_when_emitter_installed() { + let (tx, mut rx) = unbounded_channel(); + let emitter = ProgressEmitter { + token: json!("test-token"), + tx, + }; + EMITTER + .scope(emitter, async { + emit_progress(5.0, Some(10.0), Some("halfway")); + }) + .await; + + let notif = rx.recv().await.expect("notification must be delivered"); + assert_eq!(notif["method"], "notifications/progress"); + assert_eq!(notif["params"]["progressToken"], "test-token"); + assert_eq!(notif["params"]["progress"], 5.0); + assert_eq!(notif["params"]["total"], 10.0); + assert_eq!(notif["params"]["message"], "halfway"); + } + + #[tokio::test] + async fn emit_progress_omits_optional_fields() { + let (tx, mut rx) = unbounded_channel(); + let emitter = ProgressEmitter { + token: json!(42), + tx, + }; + EMITTER + .scope(emitter, async { + emit_progress(1.0, None, None); + }) + .await; + + let notif = rx.recv().await.unwrap(); + assert_eq!(notif["params"]["progressToken"], 42); + assert_eq!(notif["params"]["progress"], 1.0); + assert!(notif["params"].get("total").is_none()); + assert!(notif["params"].get("message").is_none()); + } +} diff --git a/server/src/http/mcp/shell_tools/script.rs b/server/src/http/mcp/shell_tools/script.rs index 43091286..41824b67 100644 --- a/server/src/http/mcp/shell_tools/script.rs +++ b/server/src/http/mcp/shell_tools/script.rs @@ -13,6 +13,10 @@ use super::exec::validate_working_dir; const TEST_TIMEOUT_SECS: u64 = 1200; const MAX_OUTPUT_LINES: usize = 100; +/// How often `tool_run_tests` emits a `notifications/progress` event while a +/// test job is in flight. Chosen well below typical MCP HTTP transport +/// timeouts (~60s) so the client's socket timer resets before it can fire. +const PROGRESS_INTERVAL_SECS: u64 = 25; // ── In-flight process registry ─────────────────────────────────────────────── // @@ -142,9 +146,24 @@ pub(crate) async fn tool_run_tests(args: &Value, ctx: &AppContext) -> Result= PROGRESS_INTERVAL_SECS { + crate::http::mcp::progress::emit_progress( + elapsed_secs as f64, + None, + Some(&format!("running tests: {elapsed_secs}s elapsed")), + ); + last_progress_emit_secs = elapsed_secs; + } + let mut jobs = active_jobs().lock().map_err(|e| e.to_string())?; let job = match jobs.get_mut(&working_dir) { Some(j) => j, @@ -165,9 +184,18 @@ pub(crate) async fn tool_run_tests(args: &Value, ctx: &AppContext) -> Result Result Result Result Result { let start = std::time::Instant::now(); + let mut last_progress_emit_secs: u64 = 0; loop { match crate::crdt_state::read_test_job(sid) { None => { @@ -273,7 +303,21 @@ async fn attach_to_in_flight_test_job(sid: &str) -> Result { } } - if start.elapsed().as_secs() > TEST_TIMEOUT_SECS { + // Keep the SSE consumer's MCP socket alive while we wait. No-op when + // no emitter is installed (plain JSON path). + let elapsed_secs = start.elapsed().as_secs(); + if elapsed_secs.saturating_sub(last_progress_emit_secs) >= PROGRESS_INTERVAL_SECS { + crate::http::mcp::progress::emit_progress( + elapsed_secs as f64, + None, + Some(&format!( + "attached to in-flight test job: {elapsed_secs}s elapsed" + )), + ); + last_progress_emit_secs = elapsed_secs; + } + + if elapsed_secs > TEST_TIMEOUT_SECS { return Err(format!( "Attached test job for '{sid}' did not complete within {TEST_TIMEOUT_SECS}s" ));