From 2f0d796b38da374f3e3052ab04789ee38cb7ce28 Mon Sep 17 00:00:00 2001 From: Dave Date: Thu, 19 Mar 2026 01:29:33 +0000 Subject: [PATCH] story-kit: merge 292_story_show_server_logs_in_web_ui --- frontend/src/api/client.ts | 13 +- frontend/src/components/Chat.tsx | 7 + frontend/src/components/ServerLogsPanel.tsx | 246 ++++++++++++++++++++ server/src/http/ws.rs | 87 ++++++- server/src/log_buffer.rs | 45 +++- server/src/matrix/bot.rs | 1 + 6 files changed, 384 insertions(+), 15 deletions(-) create mode 100644 frontend/src/components/ServerLogsPanel.tsx diff --git a/frontend/src/api/client.ts b/frontend/src/api/client.ts index 5e40391..fb0e49f 100644 --- a/frontend/src/api/client.ts +++ b/frontend/src/api/client.ts @@ -85,7 +85,9 @@ export type WsResponse = /** Streaming token from a /btw side question response. */ | { type: "side_question_token"; content: string } /** Final signal that the /btw side question has been fully answered. */ - | { type: "side_question_done"; response: string }; + | { type: "side_question_done"; response: string } + /** A single server log entry (bulk on connect, then live). 
*/ + | { type: "log_entry"; timestamp: string; level: string; message: string }; export interface ProviderConfig { provider: string; @@ -376,6 +378,11 @@ export class ChatWebSocket { private onOnboardingStatus?: (needsOnboarding: boolean) => void; private onSideQuestionToken?: (content: string) => void; private onSideQuestionDone?: (response: string) => void; + private onLogEntry?: ( + timestamp: string, + level: string, + message: string, + ) => void; private connected = false; private closeTimer?: number; private wsPath = DEFAULT_WS_PATH; @@ -461,6 +468,8 @@ export class ChatWebSocket { this.onSideQuestionToken?.(data.content); if (data.type === "side_question_done") this.onSideQuestionDone?.(data.response); + if (data.type === "log_entry") + this.onLogEntry?.(data.timestamp, data.level, data.message); if (data.type === "pong") { window.clearTimeout(this.heartbeatTimeout); this.heartbeatTimeout = undefined; @@ -516,6 +525,7 @@ export class ChatWebSocket { onOnboardingStatus?: (needsOnboarding: boolean) => void; onSideQuestionToken?: (content: string) => void; onSideQuestionDone?: (response: string) => void; + onLogEntry?: (timestamp: string, level: string, message: string) => void; }, wsPath = DEFAULT_WS_PATH, ) { @@ -533,6 +543,7 @@ export class ChatWebSocket { this.onOnboardingStatus = handlers.onOnboardingStatus; this.onSideQuestionToken = handlers.onSideQuestionToken; this.onSideQuestionDone = handlers.onSideQuestionDone; + this.onLogEntry = handlers.onLogEntry; this.wsPath = wsPath; this.shouldReconnect = true; diff --git a/frontend/src/components/Chat.tsx b/frontend/src/components/Chat.tsx index b6d2c8d..e30125b 100644 --- a/frontend/src/components/Chat.tsx +++ b/frontend/src/components/Chat.tsx @@ -13,6 +13,8 @@ import { ChatInput } from "./ChatInput"; import { HelpOverlay } from "./HelpOverlay"; import { LozengeFlyProvider } from "./LozengeFlyContext"; import { MessageItem } from "./MessageItem"; +import type { LogEntry } from "./ServerLogsPanel"; +import 
{ ServerLogsPanel } from "./ServerLogsPanel"; import { SideQuestionOverlay } from "./SideQuestionOverlay"; import { StagePanel } from "./StagePanel"; import { WorkItemDetailPanel } from "./WorkItemDetailPanel"; @@ -214,6 +216,7 @@ export function Chat({ projectPath, onCloseProject }: ChatProps) { loading: boolean; } | null>(null); const [showHelp, setShowHelp] = useState(false); + const [serverLogs, setServerLogs] = useState([]); // Ref so stale WebSocket callbacks can read the current queued messages const queuedMessagesRef = useRef<{ id: string; text: string }[]>([]); const queueIdCounterRef = useRef(0); @@ -402,6 +405,9 @@ export function Chat({ projectPath, onCloseProject }: ChatProps) { prev ? { ...prev, response, loading: false } : prev, ); }, + onLogEntry: (timestamp, level, message) => { + setServerLogs((prev) => [...prev, { timestamp, level, message }]); + }, }); return () => { @@ -1021,6 +1027,7 @@ export function Chat({ projectPath, onCloseProject }: ChatProps) { items={pipeline.backlog} onItemClick={(item) => setSelectedWorkItemId(item.story_id)} /> + )} diff --git a/frontend/src/components/ServerLogsPanel.tsx b/frontend/src/components/ServerLogsPanel.tsx new file mode 100644 index 0000000..16c9a85 --- /dev/null +++ b/frontend/src/components/ServerLogsPanel.tsx @@ -0,0 +1,246 @@ +import * as React from "react"; + +const { useCallback, useEffect, useRef, useState } = React; + +export interface LogEntry { + timestamp: string; + level: string; + message: string; +} + +interface ServerLogsPanelProps { + logs: LogEntry[]; +} + +function levelColor(level: string): string { + switch (level.toUpperCase()) { + case "ERROR": + return "#e06c75"; + case "WARN": + return "#e5c07b"; + default: + return "#98c379"; + } +} + +export function ServerLogsPanel({ logs }: ServerLogsPanelProps) { + const [isOpen, setIsOpen] = useState(false); + const [filter, setFilter] = useState(""); + const [severityFilter, setSeverityFilter] = useState("ALL"); + const scrollRef = 
useRef(null); + const userScrolledUpRef = useRef(false); + const lastScrollTopRef = useRef(0); + + const filteredLogs = logs.filter((entry) => { + const matchesSeverity = + severityFilter === "ALL" || entry.level.toUpperCase() === severityFilter; + const matchesFilter = + filter === "" || + entry.message.toLowerCase().includes(filter.toLowerCase()) || + entry.timestamp.includes(filter); + return matchesSeverity && matchesFilter; + }); + + const scrollToBottom = useCallback(() => { + const el = scrollRef.current; + if (el) { + el.scrollTop = el.scrollHeight; + lastScrollTopRef.current = el.scrollTop; + } + }, []); + + // Auto-scroll when new entries arrive (unless user scrolled up). + useEffect(() => { + if (!isOpen) return; + if (!userScrolledUpRef.current) { + scrollToBottom(); + } + }, [filteredLogs.length, isOpen, scrollToBottom]); + + const handleScroll = () => { + const el = scrollRef.current; + if (!el) return; + const isAtBottom = el.scrollHeight - el.scrollTop - el.clientHeight < 5; + if (el.scrollTop < lastScrollTopRef.current) { + userScrolledUpRef.current = true; + } + if (isAtBottom) { + userScrolledUpRef.current = false; + } + lastScrollTopRef.current = el.scrollTop; + }; + + const severityButtons = ["ALL", "INFO", "WARN", "ERROR"] as const; + + return ( +
+ {/* Header / toggle */} + + + {isOpen && ( +
+ {/* Filter controls */} +
+ setFilter(e.target.value)} + placeholder="Filter logs..." + style={{ + flex: 1, + minWidth: "80px", + padding: "4px 8px", + borderRadius: "4px", + border: "1px solid #333", + background: "#161b22", + color: "#ccc", + fontSize: "0.8em", + outline: "none", + }} + /> + {severityButtons.map((sev) => ( + + ))} +
+ + {/* Log entries */} +
+ {filteredLogs.length === 0 ? ( +
+ No log entries +
+ ) : ( + filteredLogs.map((entry, idx) => ( +
+ + {entry.timestamp.replace("T", " ").replace("Z", "")} + + + {entry.level} + + + {entry.message} + +
+ )) + )} +
+
+ )} +
+ ); +} diff --git a/server/src/http/ws.rs b/server/src/http/ws.rs index 3028385..c144b08 100644 --- a/server/src/http/ws.rs +++ b/server/src/http/ws.rs @@ -4,6 +4,7 @@ use crate::io::onboarding; use crate::io::watcher::WatcherEvent; use crate::llm::chat; use crate::llm::types::Message; +use crate::log_buffer; use futures::{SinkExt, StreamExt}; use poem::handler; use poem::web::Data; @@ -132,6 +133,13 @@ enum WsResponse { SideQuestionDone { response: String, }, + /// A single server log entry. Sent in bulk on connect (recent history), + /// then streamed live as new entries arrive. + LogEntry { + timestamp: String, + level: String, + message: String, + }, } impl From for Option { @@ -208,6 +216,42 @@ pub async fn ws_handler(ws: WebSocket, ctx: Data<&Arc>) -> impl poem }); } + // Push recent server log entries so the client has history on connect. + { + let entries = log_buffer::global().get_recent_entries(100, None, None); + for entry in entries { + let _ = tx.send(WsResponse::LogEntry { + timestamp: entry.timestamp, + level: entry.level.as_str().to_string(), + message: entry.message, + }); + } + } + + // Subscribe to live log entries and forward them to the client. + let tx_logs = tx.clone(); + let mut log_rx = log_buffer::global().subscribe(); + tokio::spawn(async move { + loop { + match log_rx.recv().await { + Ok(entry) => { + if tx_logs + .send(WsResponse::LogEntry { + timestamp: entry.timestamp, + level: entry.level.as_str().to_string(), + message: entry.message, + }) + .is_err() + { + break; + } + } + Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue, + Err(tokio::sync::broadcast::error::RecvError::Closed) => break, + } + } + }); + // Subscribe to filesystem watcher events and forward them to the client. // After each work-item event, also push the updated pipeline state. // Config-changed events are forwarded as-is without a pipeline refresh. 
@@ -1136,10 +1180,30 @@ mod tests { "expected onboarding_status, got: {onboarding}" ); + // Drain any log_entry messages sent as initial history on connect. + // These are buffered before tests send their own requests. + loop { + // Use a very short timeout: if nothing arrives quickly, the burst is done. + let Ok(Some(Ok(msg))) = + tokio::time::timeout(std::time::Duration::from_millis(200), stream.next()).await + else { + break; + }; + let val: serde_json::Value = match msg { + tungstenite::Message::Text(t) => serde_json::from_str(t.as_ref()).unwrap(), + _ => break, + }; + if val["type"] != "log_entry" { + // Unexpected non-log message during drain — this shouldn't happen. + panic!("unexpected message during log drain: {val}"); + } + } + (sink, stream, initial) } - /// Read next text message from the stream with a timeout. + /// Read next non-log_entry text message from the stream with a timeout. + /// Skips any `log_entry` messages that arrive between events. async fn next_msg( stream: &mut futures::stream::SplitStream< tokio_tungstenite::WebSocketStream< @@ -1147,14 +1211,19 @@ mod tests { >, >, ) -> serde_json::Value { - let msg = tokio::time::timeout(std::time::Duration::from_secs(2), stream.next()) - .await - .expect("timeout waiting for message") - .expect("stream ended") - .expect("ws error"); - match msg { - tungstenite::Message::Text(t) => serde_json::from_str(t.as_ref()).unwrap(), - other => panic!("expected text message, got: {other:?}"), + loop { + let msg = tokio::time::timeout(std::time::Duration::from_secs(2), stream.next()) + .await + .expect("timeout waiting for message") + .expect("stream ended") + .expect("ws error"); + let val: serde_json::Value = match msg { + tungstenite::Message::Text(t) => serde_json::from_str(t.as_ref()).unwrap(), + other => panic!("expected text message, got: {other:?}"), + }; + if val["type"] != "log_entry" { + return val; + } } } diff --git a/server/src/log_buffer.rs b/server/src/log_buffer.rs index 473cb51..0e0441c 
100644 --- a/server/src/log_buffer.rs +++ b/server/src/log_buffer.rs @@ -11,6 +11,7 @@ use std::fs::OpenOptions; use std::io::Write; use std::path::PathBuf; use std::sync::{Mutex, OnceLock}; +use tokio::sync::broadcast; const CAPACITY: usize = 1000; @@ -72,16 +73,25 @@ impl LogEntry { pub struct LogBuffer { entries: Mutex>, log_file: Mutex>, + /// Broadcast channel for live log streaming to WebSocket subscribers. + broadcast_tx: broadcast::Sender, } impl LogBuffer { fn new() -> Self { + let (broadcast_tx, _) = broadcast::channel(512); Self { entries: Mutex::new(VecDeque::with_capacity(CAPACITY)), log_file: Mutex::new(None), + broadcast_tx, } } + /// Subscribe to live log entries as they are pushed. + pub fn subscribe(&self) -> broadcast::Receiver { + self.broadcast_tx.subscribe() + } + /// Set the persistent log file path. Call once at startup after the /// project root is known. pub fn set_log_file(&self, path: PathBuf) { @@ -112,8 +122,11 @@ impl LogBuffer { if buf.len() >= CAPACITY { buf.pop_front(); } - buf.push_back(entry); + buf.push_back(entry.clone()); } + + // Best-effort broadcast to WebSocket subscribers. + let _ = self.broadcast_tx.send(entry); } /// Return up to `count` recent log lines as formatted strings, @@ -140,6 +153,31 @@ impl LogBuffer { let start = filtered.len().saturating_sub(count); filtered[start..].to_vec() } + + /// Return up to `count` recent `LogEntry` structs (not formatted strings), + /// optionally filtered by substring and/or severity level. + /// Entries are returned in chronological order (oldest first). 
+ pub fn get_recent_entries( + &self, + count: usize, + filter: Option<&str>, + severity: Option<&LogLevel>, + ) -> Vec { + let buf = match self.entries.lock() { + Ok(b) => b, + Err(_) => return vec![], + }; + let filtered: Vec = buf + .iter() + .filter(|entry| { + severity.is_none_or(|s| &entry.level == s) + && filter.is_none_or(|f| entry.message.contains(f) || entry.formatted().contains(f)) + }) + .cloned() + .collect(); + let start = filtered.len().saturating_sub(count); + filtered[start..].to_vec() + } } static GLOBAL: OnceLock = OnceLock::new(); @@ -208,10 +246,7 @@ mod tests { #[test] fn evicts_oldest_at_capacity() { - let buf = LogBuffer { - entries: Mutex::new(VecDeque::with_capacity(CAPACITY)), - log_file: Mutex::new(None), - }; + let buf = LogBuffer::new(); // Fill past capacity for i in 0..=CAPACITY { buf.push_entry(LogLevel::Info, format!("line {i}"));