story-kit: merge 292_story_show_server_logs_in_web_ui
This commit is contained in:
@@ -4,6 +4,7 @@ use crate::io::onboarding;
|
||||
use crate::io::watcher::WatcherEvent;
|
||||
use crate::llm::chat;
|
||||
use crate::llm::types::Message;
|
||||
use crate::log_buffer;
|
||||
use futures::{SinkExt, StreamExt};
|
||||
use poem::handler;
|
||||
use poem::web::Data;
|
||||
@@ -132,6 +133,13 @@ enum WsResponse {
|
||||
SideQuestionDone {
|
||||
response: String,
|
||||
},
|
||||
/// A single server log entry. Sent in bulk on connect (recent history),
|
||||
/// then streamed live as new entries arrive.
|
||||
LogEntry {
|
||||
timestamp: String,
|
||||
level: String,
|
||||
message: String,
|
||||
},
|
||||
}
|
||||
|
||||
impl From<WatcherEvent> for Option<WsResponse> {
|
||||
@@ -208,6 +216,42 @@ pub async fn ws_handler(ws: WebSocket, ctx: Data<&Arc<AppContext>>) -> impl poem
|
||||
});
|
||||
}
|
||||
|
||||
// Push recent server log entries so the client has history on connect.
|
||||
{
|
||||
let entries = log_buffer::global().get_recent_entries(100, None, None);
|
||||
for entry in entries {
|
||||
let _ = tx.send(WsResponse::LogEntry {
|
||||
timestamp: entry.timestamp,
|
||||
level: entry.level.as_str().to_string(),
|
||||
message: entry.message,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Subscribe to live log entries and forward them to the client.
|
||||
let tx_logs = tx.clone();
|
||||
let mut log_rx = log_buffer::global().subscribe();
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
match log_rx.recv().await {
|
||||
Ok(entry) => {
|
||||
if tx_logs
|
||||
.send(WsResponse::LogEntry {
|
||||
timestamp: entry.timestamp,
|
||||
level: entry.level.as_str().to_string(),
|
||||
message: entry.message,
|
||||
})
|
||||
.is_err()
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
|
||||
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Subscribe to filesystem watcher events and forward them to the client.
|
||||
// After each work-item event, also push the updated pipeline state.
|
||||
// Config-changed events are forwarded as-is without a pipeline refresh.
|
||||
@@ -1136,10 +1180,30 @@ mod tests {
|
||||
"expected onboarding_status, got: {onboarding}"
|
||||
);
|
||||
|
||||
// Drain any log_entry messages sent as initial history on connect.
|
||||
// These are buffered before tests send their own requests.
|
||||
loop {
|
||||
// Use a very short timeout: if nothing arrives quickly, the burst is done.
|
||||
let Ok(Some(Ok(msg))) =
|
||||
tokio::time::timeout(std::time::Duration::from_millis(200), stream.next()).await
|
||||
else {
|
||||
break;
|
||||
};
|
||||
let val: serde_json::Value = match msg {
|
||||
tungstenite::Message::Text(t) => serde_json::from_str(t.as_ref()).unwrap(),
|
||||
_ => break,
|
||||
};
|
||||
if val["type"] != "log_entry" {
|
||||
// Unexpected non-log message during drain — this shouldn't happen.
|
||||
panic!("unexpected message during log drain: {val}");
|
||||
}
|
||||
}
|
||||
|
||||
(sink, stream, initial)
|
||||
}
|
||||
|
||||
/// Read next text message from the stream with a timeout.
|
||||
/// Read next non-log_entry text message from the stream with a timeout.
|
||||
/// Skips any `log_entry` messages that arrive between events.
|
||||
async fn next_msg(
|
||||
stream: &mut futures::stream::SplitStream<
|
||||
tokio_tungstenite::WebSocketStream<
|
||||
@@ -1147,14 +1211,19 @@ mod tests {
|
||||
>,
|
||||
>,
|
||||
) -> serde_json::Value {
|
||||
let msg = tokio::time::timeout(std::time::Duration::from_secs(2), stream.next())
|
||||
.await
|
||||
.expect("timeout waiting for message")
|
||||
.expect("stream ended")
|
||||
.expect("ws error");
|
||||
match msg {
|
||||
tungstenite::Message::Text(t) => serde_json::from_str(t.as_ref()).unwrap(),
|
||||
other => panic!("expected text message, got: {other:?}"),
|
||||
loop {
|
||||
let msg = tokio::time::timeout(std::time::Duration::from_secs(2), stream.next())
|
||||
.await
|
||||
.expect("timeout waiting for message")
|
||||
.expect("stream ended")
|
||||
.expect("ws error");
|
||||
let val: serde_json::Value = match msg {
|
||||
tungstenite::Message::Text(t) => serde_json::from_str(t.as_ref()).unwrap(),
|
||||
other => panic!("expected text message, got: {other:?}"),
|
||||
};
|
||||
if val["type"] != "log_entry" {
|
||||
return val;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -11,6 +11,7 @@ use std::fs::OpenOptions;
|
||||
use std::io::Write;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::{Mutex, OnceLock};
|
||||
use tokio::sync::broadcast;
|
||||
|
||||
const CAPACITY: usize = 1000;
|
||||
|
||||
@@ -72,16 +73,25 @@ impl LogEntry {
|
||||
pub struct LogBuffer {
|
||||
entries: Mutex<VecDeque<LogEntry>>,
|
||||
log_file: Mutex<Option<PathBuf>>,
|
||||
/// Broadcast channel for live log streaming to WebSocket subscribers.
|
||||
broadcast_tx: broadcast::Sender<LogEntry>,
|
||||
}
|
||||
|
||||
impl LogBuffer {
|
||||
fn new() -> Self {
|
||||
let (broadcast_tx, _) = broadcast::channel(512);
|
||||
Self {
|
||||
entries: Mutex::new(VecDeque::with_capacity(CAPACITY)),
|
||||
log_file: Mutex::new(None),
|
||||
broadcast_tx,
|
||||
}
|
||||
}
|
||||
|
||||
/// Subscribe to live log entries as they are pushed.
|
||||
pub fn subscribe(&self) -> broadcast::Receiver<LogEntry> {
|
||||
self.broadcast_tx.subscribe()
|
||||
}
|
||||
|
||||
/// Set the persistent log file path. Call once at startup after the
|
||||
/// project root is known.
|
||||
pub fn set_log_file(&self, path: PathBuf) {
|
||||
@@ -112,8 +122,11 @@ impl LogBuffer {
|
||||
if buf.len() >= CAPACITY {
|
||||
buf.pop_front();
|
||||
}
|
||||
buf.push_back(entry);
|
||||
buf.push_back(entry.clone());
|
||||
}
|
||||
|
||||
// Best-effort broadcast to WebSocket subscribers.
|
||||
let _ = self.broadcast_tx.send(entry);
|
||||
}
|
||||
|
||||
/// Return up to `count` recent log lines as formatted strings,
|
||||
@@ -140,6 +153,31 @@ impl LogBuffer {
|
||||
let start = filtered.len().saturating_sub(count);
|
||||
filtered[start..].to_vec()
|
||||
}
|
||||
|
||||
/// Return up to `count` recent `LogEntry` structs (not formatted strings),
|
||||
/// optionally filtered by substring and/or severity level.
|
||||
/// Entries are returned in chronological order (oldest first).
|
||||
pub fn get_recent_entries(
|
||||
&self,
|
||||
count: usize,
|
||||
filter: Option<&str>,
|
||||
severity: Option<&LogLevel>,
|
||||
) -> Vec<LogEntry> {
|
||||
let buf = match self.entries.lock() {
|
||||
Ok(b) => b,
|
||||
Err(_) => return vec![],
|
||||
};
|
||||
let filtered: Vec<LogEntry> = buf
|
||||
.iter()
|
||||
.filter(|entry| {
|
||||
severity.is_none_or(|s| &entry.level == s)
|
||||
&& filter.is_none_or(|f| entry.message.contains(f) || entry.formatted().contains(f))
|
||||
})
|
||||
.cloned()
|
||||
.collect();
|
||||
let start = filtered.len().saturating_sub(count);
|
||||
filtered[start..].to_vec()
|
||||
}
|
||||
}
|
||||
|
||||
static GLOBAL: OnceLock<LogBuffer> = OnceLock::new();
|
||||
@@ -208,10 +246,7 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn evicts_oldest_at_capacity() {
|
||||
let buf = LogBuffer {
|
||||
entries: Mutex::new(VecDeque::with_capacity(CAPACITY)),
|
||||
log_file: Mutex::new(None),
|
||||
};
|
||||
let buf = LogBuffer::new();
|
||||
// Fill past capacity
|
||||
for i in 0..=CAPACITY {
|
||||
buf.push_entry(LogLevel::Info, format!("line {i}"));
|
||||
|
||||
@@ -2116,6 +2116,7 @@ mod tests {
|
||||
out.contains("**Pipeline Status**"),
|
||||
"missing bold title: {out}"
|
||||
);
|
||||
assert!(out.contains("**Pipeline Status**"), "missing bold title: {out}");
|
||||
assert!(out.contains("**Backlog**"), "stage should use bold: {out}");
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user