huskies: merge 551_bug_get_agent_output_mcp_tool_returns_fetch_failed_for_running_agents

This commit is contained in:
dave
2026-04-12 17:46:55 +00:00
parent ae4cacefe8
commit 05c3b11e57
2 changed files with 7 additions and 162 deletions
+7 -2
View File
@@ -142,8 +142,13 @@ pub(super) async fn tool_get_agent_output(
all_lines.push(String::new()); // blank line between sessions all_lines.push(String::new()); // blank line between sessions
} }
// Append buffered live events when a specific agent is requested. // Append buffered live events only when no disk logs exist yet.
if let Some(agent_name) = agent_name_filter // emit_event() writes to disk synchronously, so disk is always up-to-date
// when log files are present. Showing live events alongside disk logs would
// produce duplicates. Only fall back to in-memory events when the log
// writer failed and nothing was persisted to disk.
if log_files.is_empty()
&& let Some(agent_name) = agent_name_filter
&& let Ok(live_events) = ctx.agents.drain_events(story_id, agent_name) && let Ok(live_events) = ctx.agents.drain_events(story_id, agent_name)
&& !live_events.is_empty() && !live_events.is_empty()
{ {
-160
View File
@@ -134,19 +134,6 @@ pub async fn mcp_post_handler(req: &Request, body: Body, ctx: Data<&Arc<AppConte
.get("name") .get("name")
.and_then(|v| v.as_str()) .and_then(|v| v.as_str())
.unwrap_or(""); .unwrap_or("");
if tool_name == "get_agent_output" {
// Only use live SSE streaming when agent_name is explicitly provided.
// Without agent_name, fall through to the disk-based handler below.
let has_agent_name = rpc
.params
.get("arguments")
.and_then(|a| a.get("agent_name"))
.and_then(|v| v.as_str())
.is_some();
if has_agent_name {
return handle_agent_output_sse(rpc.id, &rpc.params, &ctx);
}
}
if tool_name == "run_command" { if tool_name == "run_command" {
return shell_tools::handle_run_command_sse(rpc.id, &rpc.params, &ctx); return shell_tools::handle_run_command_sse(rpc.id, &rpc.params, &ctx);
} }
@@ -188,153 +175,6 @@ pub(super) fn to_sse_response(resp: JsonRpcResponse) -> Response {
.body(Body::from_string(sse_body)) .body(Body::from_string(sse_body))
} }
/// Stream agent events as SSE — each event is a separate JSON-RPC notification,
/// followed by a final JSON-RPC response with the matching request id.
fn handle_agent_output_sse(
id: Option<Value>,
params: &Value,
ctx: &AppContext,
) -> Response {
let args = params.get("arguments").cloned().unwrap_or(json!({}));
let story_id = match args.get("story_id").and_then(|v| v.as_str()) {
Some(s) => s.to_string(),
None => return to_sse_response(JsonRpcResponse::error(
id,
-32602,
"Missing required argument: story_id".into(),
)),
};
let agent_name = match args.get("agent_name").and_then(|v| v.as_str()) {
Some(s) => s.to_string(),
None => return to_sse_response(JsonRpcResponse::error(
id,
-32602,
"Missing required argument: agent_name".into(),
)),
};
let timeout_ms = args
.get("timeout_ms")
.and_then(|v| v.as_u64())
.unwrap_or(10000)
.min(30000);
let mut rx = match ctx.agents.subscribe(&story_id, &agent_name) {
Ok(rx) => rx,
Err(_) => {
// Agent not in pool (exited or never started) — fall back to disk logs.
let text = if let Ok(project_root) = ctx.agents.get_project_root(&ctx.state) {
use crate::agent_log;
let log_files = agent_log::list_story_log_files(
&project_root,
&story_id,
Some(&agent_name),
);
if log_files.is_empty() {
format!("No log files found for story '{story_id}' agent '{agent_name}'.")
} else {
let mut all_lines: Vec<String> = Vec::new();
for path in &log_files {
let file_name =
path.file_name().and_then(|n| n.to_str()).unwrap_or("?");
all_lines.push(format!(
"=== {} ===",
file_name.trim_end_matches(".log")
));
match agent_log::read_log_as_readable_lines(path) {
Ok(lines) => all_lines.extend(lines),
Err(e) => all_lines.push(format!("[ERROR reading log: {e}]")),
}
all_lines.push(String::new());
}
all_lines.join("\n")
}
} else {
format!("No log files found for story '{story_id}' agent '{agent_name}'.")
};
return to_sse_response(JsonRpcResponse::success(
id,
json!({ "content": [{"type": "text", "text": text}] }),
));
}
};
let final_id = id;
let stream = async_stream::stream! {
let deadline = tokio::time::Instant::now()
+ std::time::Duration::from_millis(timeout_ms);
let mut done = false;
loop {
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
if remaining.is_zero() {
break;
}
match tokio::time::timeout(remaining, rx.recv()).await {
Ok(Ok(event)) => {
let is_terminal = matches!(
&event,
crate::agents::AgentEvent::Done { .. }
| crate::agents::AgentEvent::Error { .. }
);
// Send each event as a JSON-RPC notification (no id)
if let Ok(event_json) = serde_json::to_value(&event) {
let notification = json!({
"jsonrpc": "2.0",
"method": "notifications/tools/progress",
"params": { "event": event_json }
});
if let Ok(s) = serde_json::to_string(&notification) {
yield Ok::<_, std::io::Error>(format!("data: {s}\n\n"));
}
}
if is_terminal {
done = true;
break;
}
}
Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(n))) => {
let notification = json!({
"jsonrpc": "2.0",
"method": "notifications/tools/progress",
"params": { "event": {"type": "warning", "message": format!("Skipped {n} events")} }
});
if let Ok(s) = serde_json::to_string(&notification) {
yield Ok::<_, std::io::Error>(format!("data: {s}\n\n"));
}
}
Ok(Err(tokio::sync::broadcast::error::RecvError::Closed)) => {
done = true;
break;
}
Err(_) => break, // timeout
}
}
// Final response with the request id
let final_resp = JsonRpcResponse::success(
final_id,
json!({
"content": [{
"type": "text",
"text": if done { "Agent stream ended." } else { "Stream timed out; call again to continue." }
}]
}),
);
if let Ok(s) = serde_json::to_string(&final_resp) {
yield Ok::<_, std::io::Error>(format!("data: {s}\n\n"));
}
};
Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "text/event-stream")
.header("Cache-Control", "no-cache")
.body(Body::from_bytes_stream(
futures::StreamExt::map(stream, |r| r.map(bytes::Bytes::from)),
))
}
// ── MCP protocol handlers ───────────────────────────────────────── // ── MCP protocol handlers ─────────────────────────────────────────
fn handle_initialize(id: Option<Value>, params: &Value) -> JsonRpcResponse { fn handle_initialize(id: Option<Value>, params: &Value) -> JsonRpcResponse {