From 05c3b11e571ba5f88f2dbbf49f1dc95ce4cd28bc Mon Sep 17 00:00:00 2001 From: dave Date: Sun, 12 Apr 2026 17:46:55 +0000 Subject: [PATCH] huskies: merge 551_bug_get_agent_output_mcp_tool_returns_fetch_failed_for_running_agents --- server/src/http/mcp/agent_tools.rs | 9 +- server/src/http/mcp/mod.rs | 160 ----------------------------- 2 files changed, 7 insertions(+), 162 deletions(-) diff --git a/server/src/http/mcp/agent_tools.rs b/server/src/http/mcp/agent_tools.rs index 8856f429..8fd6c9d0 100644 --- a/server/src/http/mcp/agent_tools.rs +++ b/server/src/http/mcp/agent_tools.rs @@ -142,8 +142,13 @@ pub(super) async fn tool_get_agent_output( all_lines.push(String::new()); // blank line between sessions } - // Append buffered live events when a specific agent is requested. - if let Some(agent_name) = agent_name_filter + // Append buffered live events only when no disk logs exist yet. + // emit_event() writes to disk synchronously, so disk is always up-to-date + // when log files are present. Showing live events alongside disk logs would + // produce duplicates. Only fall back to in-memory events when the log + // writer failed and nothing was persisted to disk. + if log_files.is_empty() + && let Some(agent_name) = agent_name_filter && let Ok(live_events) = ctx.agents.drain_events(story_id, agent_name) && !live_events.is_empty() { diff --git a/server/src/http/mcp/mod.rs b/server/src/http/mcp/mod.rs index fd5eaf88..8c161744 100644 --- a/server/src/http/mcp/mod.rs +++ b/server/src/http/mcp/mod.rs @@ -134,19 +134,6 @@ pub async fn mcp_post_handler(req: &Request, body: Body, ctx: Data<&Arc Response { .body(Body::from_string(sse_body)) } -/// Stream agent events as SSE — each event is a separate JSON-RPC notification, -/// followed by a final JSON-RPC response with the matching request id. -fn handle_agent_output_sse( - id: Option, - params: &Value, - ctx: &AppContext, -) -> Response { - let args = params.get("arguments").cloned().unwrap_or(json!({})); - let story_id = match args.get("story_id").and_then(|v| v.as_str()) { - Some(s) => s.to_string(), - None => return to_sse_response(JsonRpcResponse::error( - id, - -32602, - "Missing required argument: story_id".into(), - )), - }; - let agent_name = match args.get("agent_name").and_then(|v| v.as_str()) { - Some(s) => s.to_string(), - None => return to_sse_response(JsonRpcResponse::error( - id, - -32602, - "Missing required argument: agent_name".into(), - )), - }; - let timeout_ms = args - .get("timeout_ms") - .and_then(|v| v.as_u64()) - .unwrap_or(10000) - .min(30000); - - let mut rx = match ctx.agents.subscribe(&story_id, &agent_name) { - Ok(rx) => rx, - Err(_) => { - // Agent not in pool (exited or never started) — fall back to disk logs. - let text = if let Ok(project_root) = ctx.agents.get_project_root(&ctx.state) { - use crate::agent_log; - let log_files = agent_log::list_story_log_files( - &project_root, - &story_id, - Some(&agent_name), - ); - if log_files.is_empty() { - format!("No log files found for story '{story_id}' agent '{agent_name}'.") - } else { - let mut all_lines: Vec = Vec::new(); - for path in &log_files { - let file_name = - path.file_name().and_then(|n| n.to_str()).unwrap_or("?"); - all_lines.push(format!( - "=== {} ===", - file_name.trim_end_matches(".log") - )); - match agent_log::read_log_as_readable_lines(path) { - Ok(lines) => all_lines.extend(lines), - Err(e) => all_lines.push(format!("[ERROR reading log: {e}]")), - } - all_lines.push(String::new()); - } - all_lines.join("\n") - } - } else { - format!("No log files found for story '{story_id}' agent '{agent_name}'.") - }; - return to_sse_response(JsonRpcResponse::success( - id, - json!({ "content": [{"type": "text", "text": text}] }), - )); - } - }; - - let final_id = id; - let stream = async_stream::stream! { - let deadline = tokio::time::Instant::now() - + std::time::Duration::from_millis(timeout_ms); - let mut done = false; - - loop { - let remaining = deadline.saturating_duration_since(tokio::time::Instant::now()); - if remaining.is_zero() { - break; - } - - match tokio::time::timeout(remaining, rx.recv()).await { - Ok(Ok(event)) => { - let is_terminal = matches!( - &event, - crate::agents::AgentEvent::Done { .. } - | crate::agents::AgentEvent::Error { .. } - ); - // Send each event as a JSON-RPC notification (no id) - if let Ok(event_json) = serde_json::to_value(&event) { - let notification = json!({ - "jsonrpc": "2.0", - "method": "notifications/tools/progress", - "params": { "event": event_json } - }); - if let Ok(s) = serde_json::to_string(¬ification) { - yield Ok::<_, std::io::Error>(format!("data: {s}\n\n")); - } - } - if is_terminal { - done = true; - break; - } - } - Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(n))) => { - let notification = json!({ - "jsonrpc": "2.0", - "method": "notifications/tools/progress", - "params": { "event": {"type": "warning", "message": format!("Skipped {n} events")} } - }); - if let Ok(s) = serde_json::to_string(¬ification) { - yield Ok::<_, std::io::Error>(format!("data: {s}\n\n")); - } - } - Ok(Err(tokio::sync::broadcast::error::RecvError::Closed)) => { - done = true; - break; - } - Err(_) => break, // timeout - } - } - - // Final response with the request id - let final_resp = JsonRpcResponse::success( - final_id, - json!({ - "content": [{ - "type": "text", - "text": if done { "Agent stream ended." } else { "Stream timed out; call again to continue." } - }] - }), - ); - if let Ok(s) = serde_json::to_string(&final_resp) { - yield Ok::<_, std::io::Error>(format!("data: {s}\n\n")); - } - }; - - Response::builder() - .status(StatusCode::OK) - .header("Content-Type", "text/event-stream") - .header("Cache-Control", "no-cache") - .body(Body::from_bytes_stream( - futures::StreamExt::map(stream, |r| r.map(bytes::Bytes::from)), - )) -} - // ── MCP protocol handlers ───────────────────────────────────────── fn handle_initialize(id: Option, params: &Value) -> JsonRpcResponse {