use crate::http::context::AppContext; use poem::handler; use poem::http::StatusCode; use poem::web::{Data, Path}; use poem::{Body, IntoResponse, Response}; use std::sync::Arc; /// SSE endpoint: `GET /agents/:story_id/:agent_name/stream` /// /// Streams `AgentEvent`s as Server-Sent Events. Each event is JSON-encoded /// with `data:` prefix and double newline terminator per the SSE spec. #[handler] pub async fn agent_stream( Path((story_id, agent_name)): Path<(String, String)>, ctx: Data<&Arc>, ) -> impl IntoResponse { let mut rx = match ctx.agents.subscribe(&story_id, &agent_name) { Ok(rx) => rx, Err(e) => { return Response::builder() .status(StatusCode::NOT_FOUND) .body(Body::from_string(e)); } }; let stream = async_stream::stream! { loop { match rx.recv().await { Ok(event) => { if let Ok(json) = serde_json::to_string(&event) { yield Ok::<_, std::io::Error>(format!("data: {json}\n\n")); } // Check for terminal events match &event { crate::agents::AgentEvent::Done { .. } | crate::agents::AgentEvent::Error { .. } => break, crate::agents::AgentEvent::Status { status, .. } if status == "stopped" => break, _ => {} } } Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { let msg = format!("{{\"type\":\"warning\",\"message\":\"Skipped {n} events\"}}"); yield Ok::<_, std::io::Error>(format!("data: {msg}\n\n")); } Err(tokio::sync::broadcast::error::RecvError::Closed) => break, } } }; Response::builder() .header("Content-Type", "text/event-stream") .header("Cache-Control", "no-cache") .header("Connection", "keep-alive") .body(Body::from_bytes_stream( futures::StreamExt::map(stream, |r| r.map(bytes::Bytes::from)), )) }