story-kit: merge 227_bug_thinking_traces_visible_in_agents_panel_in_release_builds_only
This commit is contained in:
@@ -9,6 +9,9 @@ use std::sync::Arc;
|
||||
///
|
||||
/// Streams `AgentEvent`s as Server-Sent Events. Each event is JSON-encoded
|
||||
/// with `data:` prefix and double newline terminator per the SSE spec.
|
||||
///
|
||||
/// `AgentEvent::Thinking` events are intentionally excluded — thinking traces
|
||||
/// are internal model state and must never be displayed in the UI.
|
||||
#[handler]
|
||||
pub async fn agent_stream(
|
||||
Path((story_id, agent_name)): Path<(String, String)>,
|
||||
@@ -27,6 +30,11 @@ pub async fn agent_stream(
|
||||
loop {
|
||||
match rx.recv().await {
|
||||
Ok(event) => {
|
||||
// Never forward thinking traces to the UI — they are
|
||||
// internal model state and must not be displayed.
|
||||
if matches!(event, crate::agents::AgentEvent::Thinking { .. }) {
|
||||
continue;
|
||||
}
|
||||
if let Ok(json) = serde_json::to_string(&event) {
|
||||
yield Ok::<_, std::io::Error>(format!("data: {json}\n\n"));
|
||||
}
|
||||
@@ -56,3 +64,145 @@ pub async fn agent_stream(
|
||||
futures::StreamExt::map(stream, |r| r.map(bytes::Bytes::from)),
|
||||
))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::agents::{AgentEvent, AgentStatus};
|
||||
use crate::http::context::AppContext;
|
||||
use poem::{EndpointExt, Route, get};
|
||||
use std::sync::Arc;
|
||||
use tempfile::tempdir;
|
||||
|
||||
fn test_app(ctx: Arc<AppContext>) -> impl poem::Endpoint {
|
||||
Route::new()
|
||||
.at(
|
||||
"/agents/:story_id/:agent_name/stream",
|
||||
get(agent_stream),
|
||||
)
|
||||
.data(ctx)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thinking_events_are_not_forwarded_via_sse() {
|
||||
let tmp = tempdir().unwrap();
|
||||
let ctx = Arc::new(AppContext::new_test(tmp.path().to_path_buf()));
|
||||
|
||||
// Inject a running agent and get its broadcast sender.
|
||||
let tx = ctx
|
||||
.agents
|
||||
.inject_test_agent("1_story", "coder-1", AgentStatus::Running);
|
||||
|
||||
// Spawn a task that sends events after the SSE connection is established.
|
||||
let tx_clone = tx.clone();
|
||||
tokio::spawn(async move {
|
||||
// Brief pause so the SSE handler has subscribed before we emit.
|
||||
tokio::time::sleep(std::time::Duration::from_millis(5)).await;
|
||||
|
||||
// Thinking event — must be filtered out.
|
||||
let _ = tx_clone.send(AgentEvent::Thinking {
|
||||
story_id: "1_story".to_string(),
|
||||
agent_name: "coder-1".to_string(),
|
||||
text: "secret thinking text".to_string(),
|
||||
});
|
||||
|
||||
// Output event — must be forwarded.
|
||||
let _ = tx_clone.send(AgentEvent::Output {
|
||||
story_id: "1_story".to_string(),
|
||||
agent_name: "coder-1".to_string(),
|
||||
text: "visible output".to_string(),
|
||||
});
|
||||
|
||||
// Done event — closes the stream.
|
||||
let _ = tx_clone.send(AgentEvent::Done {
|
||||
story_id: "1_story".to_string(),
|
||||
agent_name: "coder-1".to_string(),
|
||||
session_id: None,
|
||||
});
|
||||
});
|
||||
|
||||
let cli = poem::test::TestClient::new(test_app(ctx));
|
||||
let resp = cli
|
||||
.get("/agents/1_story/coder-1/stream")
|
||||
.send()
|
||||
.await;
|
||||
|
||||
let body = resp.0.into_body().into_string().await.unwrap();
|
||||
|
||||
// Thinking content must not appear anywhere in the SSE output.
|
||||
assert!(
|
||||
!body.contains("secret thinking text"),
|
||||
"Thinking text must not be forwarded via SSE: {body}"
|
||||
);
|
||||
assert!(
|
||||
!body.contains("\"type\":\"thinking\""),
|
||||
"Thinking event type must not appear in SSE output: {body}"
|
||||
);
|
||||
|
||||
// Output event must be present.
|
||||
assert!(
|
||||
body.contains("visible output"),
|
||||
"Output event must be forwarded via SSE: {body}"
|
||||
);
|
||||
assert!(
|
||||
body.contains("\"type\":\"output\""),
|
||||
"Output event type must appear in SSE output: {body}"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn output_and_done_events_are_forwarded_via_sse() {
|
||||
let tmp = tempdir().unwrap();
|
||||
let ctx = Arc::new(AppContext::new_test(tmp.path().to_path_buf()));
|
||||
|
||||
let tx = ctx
|
||||
.agents
|
||||
.inject_test_agent("2_story", "coder-1", AgentStatus::Running);
|
||||
|
||||
let tx_clone = tx.clone();
|
||||
tokio::spawn(async move {
|
||||
tokio::time::sleep(std::time::Duration::from_millis(5)).await;
|
||||
|
||||
let _ = tx_clone.send(AgentEvent::Output {
|
||||
story_id: "2_story".to_string(),
|
||||
agent_name: "coder-1".to_string(),
|
||||
text: "step 1 output".to_string(),
|
||||
});
|
||||
|
||||
let _ = tx_clone.send(AgentEvent::Done {
|
||||
story_id: "2_story".to_string(),
|
||||
agent_name: "coder-1".to_string(),
|
||||
session_id: Some("sess-abc".to_string()),
|
||||
});
|
||||
});
|
||||
|
||||
let cli = poem::test::TestClient::new(test_app(ctx));
|
||||
let resp = cli
|
||||
.get("/agents/2_story/coder-1/stream")
|
||||
.send()
|
||||
.await;
|
||||
|
||||
let body = resp.0.into_body().into_string().await.unwrap();
|
||||
|
||||
assert!(body.contains("step 1 output"), "Output must be forwarded: {body}");
|
||||
assert!(body.contains("\"type\":\"done\""), "Done event must be forwarded: {body}");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn unknown_agent_returns_404() {
|
||||
let tmp = tempdir().unwrap();
|
||||
let ctx = Arc::new(AppContext::new_test(tmp.path().to_path_buf()));
|
||||
|
||||
let cli = poem::test::TestClient::new(test_app(ctx));
|
||||
let resp = cli
|
||||
.get("/agents/nonexistent/coder-1/stream")
|
||||
.send()
|
||||
.await;
|
||||
|
||||
assert_eq!(
|
||||
resp.0.status(),
|
||||
poem::http::StatusCode::NOT_FOUND,
|
||||
"Unknown agent must return 404"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user