//! Bounded in-memory ring buffer for server log output.
//!
//! Use the [`slog!`] macro (INFO), [`slog_warn!`] (WARN), or [`slog_error!`]
//! (ERROR) as drop-in replacements for `eprintln!`. Each call writes to stderr
//! with an ISO 8601 timestamp + severity prefix, and simultaneously appends
//! the entry to the global ring buffer, making it retrievable via the
//! `get_server_logs` MCP tool.
use std::collections::VecDeque;
use std::fs::OpenOptions;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::sync::{Mutex, OnceLock};
use tokio::sync::broadcast;
/// Maximum number of log entries retained in the in-memory ring buffer.
const CAPACITY: usize = 1000;
/// Number of daily log files to keep on disk before pruning older ones.
/// Pruning runs when the log dir is first set and again on each UTC day rollover.
const DEFAULT_KEEP_DAYS: u64 = 7;
/// Severity level for a log entry.
///
/// Levels are small, stateless tags, so `Copy` is derived alongside the
/// comparison/hashing traits.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum LogLevel {
    Error,
    Warn,
    Info,
}

impl LogLevel {
    /// Return the uppercase string label for this level (`"ERROR"`, `"WARN"`, or `"INFO"`).
    pub fn as_str(&self) -> &'static str {
        match self {
            LogLevel::Error => "ERROR",
            LogLevel::Warn => "WARN",
            LogLevel::Info => "INFO",
        }
    }

    /// Parse from a case-insensitive string. Returns `None` for unknown levels.
    ///
    /// Uses `eq_ignore_ascii_case` so no intermediate uppercase `String` is
    /// allocated per call (level names are ASCII-only, so ASCII folding is
    /// sufficient here).
    pub fn from_str_ci(s: &str) -> Option<Self> {
        if s.eq_ignore_ascii_case("ERROR") {
            Some(LogLevel::Error)
        } else if s.eq_ignore_ascii_case("WARN") {
            Some(LogLevel::Warn)
        } else if s.eq_ignore_ascii_case("INFO") {
            Some(LogLevel::Info)
        } else {
            None
        }
    }
}
/// A single captured log entry.
#[derive(Debug, Clone)]
pub struct LogEntry {
pub level: LogLevel,
/// ISO 8601 UTC timestamp.
pub timestamp: String,
pub message: String,
}
impl LogEntry {
/// Format the entry as a single log line: `{timestamp} [{LEVEL}] {message}`.
pub fn formatted(&self) -> String {
format!(
"{} [{}] {}",
self.timestamp,
self.level.as_str(),
self.message
)
}
/// Format with ANSI color codes for terminal output.
/// WARN is yellow, ERROR is red, INFO has no color.
fn colored_formatted(&self) -> String {
let line = self.formatted();
match self.level {
LogLevel::Warn => format!("\x1b[33m{line}\x1b[0m"),
LogLevel::Error => format!("\x1b[31m{line}\x1b[0m"),
LogLevel::Info => line,
}
}
}
/// Internal state for the on-disk log: directory and last-written date.
struct LogDiskState {
    /// Target directory for daily `server-YYYY-MM-DD.log` files.
    /// `None` until `set_log_dir` is called; disk persistence is disabled while unset.
    dir: Option<PathBuf>,
    /// `YYYY-MM-DD` of the last written entry — used to detect day rollover.
    last_date: String,
}
/// Bounded in-memory ring buffer holding recent log entries and a broadcast channel for live streaming.
pub struct LogBuffer {
entries: Mutex<VecDeque<LogEntry>>,
2026-05-14 11:19:15 +00:00
disk: Mutex<LogDiskState>,
/// Broadcast channel for live log streaming to WebSocket subscribers.
broadcast_tx: broadcast::Sender<LogEntry>,
}
impl LogBuffer {
fn new() -> Self {
let (broadcast_tx, _) = broadcast::channel(512);
Self {
entries: Mutex::new(VecDeque::with_capacity(CAPACITY)),
2026-05-14 11:19:15 +00:00
disk: Mutex::new(LogDiskState {
dir: None,
last_date: String::new(),
}),
broadcast_tx,
}
}
/// Subscribe to live log entries as they are pushed.
pub fn subscribe(&self) -> broadcast::Receiver<LogEntry> {
self.broadcast_tx.subscribe()
}
2026-05-14 11:19:15 +00:00
/// Set the directory for daily-rotated persistent log files.
///
/// Files are written as `server-YYYY-MM-DD.log` inside `dir`. Files older
/// than [`DEFAULT_KEEP_DAYS`] are pruned immediately and again on each day
/// rollover. Call once at startup after the project root is known.
pub fn set_log_dir(&self, dir: PathBuf) {
prune_old_logs(&dir, DEFAULT_KEEP_DAYS);
if let Ok(mut state) = self.disk.lock() {
state.dir = Some(dir);
}
}
/// Append a log entry, evicting the oldest when at capacity.
pub fn push_entry(&self, level: LogLevel, message: String) {
let timestamp = chrono::Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string();
let entry = LogEntry {
level,
timestamp,
message,
};
eprintln!("{}", entry.colored_formatted());
2026-05-14 11:19:15 +00:00
// Compute today's log-file path and detect day rollover under a single lock.
let (log_path, prune_dir) = {
match self.disk.lock() {
Ok(mut state) => {
if let Some(dir) = state.dir.clone() {
let today = chrono::Utc::now().format("%Y-%m-%d").to_string();
let path = dir.join(format!("server-{today}.log"));
let maybe_prune = if state.last_date != today {
state.last_date = today;
Some(dir)
} else {
None
};
(Some(path), maybe_prune)
} else {
(None, None)
}
}
Err(_) => (None, None),
}
};
// Prune old files after releasing the lock (filesystem I/O outside the lock).
if let Some(ref dir) = prune_dir {
prune_old_logs(dir, DEFAULT_KEEP_DAYS);
}
// Append to the current day's log file (best-effort).
if let Some(ref path) = log_path
&& let Ok(mut file) = OpenOptions::new().create(true).append(true).open(path)
{
let _ = writeln!(file, "{}", entry.formatted());
}
if let Ok(mut buf) = self.entries.lock() {
if buf.len() >= CAPACITY {
buf.pop_front();
}
buf.push_back(entry.clone());
}
// Best-effort broadcast to WebSocket subscribers.
let _ = self.broadcast_tx.send(entry);
}
2026-05-14 11:19:15 +00:00
/// Read log lines from on-disk files with optional substring filter, offset, and limit.
///
/// Reads all `server-YYYY-MM-DD.log` files inside the configured log directory
/// in chronological order. Returns up to `limit` matching lines, skipping the
/// first `offset` matches. Useful for accessing history beyond the in-memory ring.
pub fn read_from_disk(&self, filter: Option<&str>, offset: usize, limit: usize) -> Vec<String> {
if limit == 0 {
return vec![];
}
let dir = match self.disk.lock() {
Ok(s) => s.dir.clone(),
Err(_) => return vec![],
};
let dir = match dir {
Some(d) => d,
None => return vec![],
};
read_log_dir(&dir, filter, offset, limit)
}
/// Return up to `count` recent log lines as formatted strings,
/// optionally filtered by substring and/or severity level.
/// Lines are returned in chronological order (oldest first).
pub fn get_recent(
&self,
count: usize,
filter: Option<&str>,
severity: Option<&LogLevel>,
) -> Vec<String> {
let buf = match self.entries.lock() {
Ok(b) => b,
Err(_) => return vec![],
};
let filtered: Vec<String> = buf
.iter()
.filter(|entry| {
severity.is_none_or(|s| &entry.level == s)
&& filter
.is_none_or(|f| entry.message.contains(f) || entry.formatted().contains(f))
})
.map(|entry| entry.formatted())
.collect();
let start = filtered.len().saturating_sub(count);
filtered[start..].to_vec()
}
/// Return up to `count` recent `LogEntry` structs (not formatted strings),
/// optionally filtered by substring and/or severity level.
/// Entries are returned in chronological order (oldest first).
pub fn get_recent_entries(
&self,
count: usize,
filter: Option<&str>,
severity: Option<&LogLevel>,
) -> Vec<LogEntry> {
let buf = match self.entries.lock() {
Ok(b) => b,
Err(_) => return vec![],
};
let filtered: Vec<LogEntry> = buf
.iter()
.filter(|entry| {
severity.is_none_or(|s| &entry.level == s)
&& filter
.is_none_or(|f| entry.message.contains(f) || entry.formatted().contains(f))
})
.cloned()
.collect();
let start = filtered.len().saturating_sub(count);
filtered[start..].to_vec()
}
}
// Lazily-initialized singleton backing the `slog!` family of macros.
static GLOBAL: OnceLock<LogBuffer> = OnceLock::new();

/// Access the process-wide log ring buffer.
///
/// Initialized on first use; safe to call from any thread.
pub fn global() -> &'static LogBuffer {
    GLOBAL.get_or_init(LogBuffer::new)
}
/// Delete daily log files older than `keep_days` from `dir`.
///
/// Only files named exactly `server-YYYY-MM-DD.log` are considered. The date
/// comparison is lexicographic, which is chronological for zero-padded ISO
/// dates. All failures (unreadable dir, unremovable file) are ignored —
/// pruning is best-effort.
fn prune_old_logs(dir: &Path, keep_days: u64) {
    // If the subtraction underflows (absurd `keep_days`), `cutoff` stays empty
    // and the `date < cutoff` test below is always false, i.e. nothing is pruned.
    let cutoff = chrono::Utc::now()
        .checked_sub_signed(chrono::Duration::days(keep_days as i64))
        .map(|t| t.format("%Y-%m-%d").to_string())
        .unwrap_or_default();
    let Ok(entries) = std::fs::read_dir(dir) else {
        return;
    };
    for entry in entries.filter_map(|e| e.ok()) {
        let path = entry.path();
        let Some(name) = path.file_name().and_then(|n| n.to_str()) else {
            continue;
        };
        // Extract the `YYYY-MM-DD` part of `server-YYYY-MM-DD.log`; the length
        // check rejects names with extra characters between prefix and suffix.
        if let Some(date) = name
            .strip_prefix("server-")
            .and_then(|rest| rest.strip_suffix(".log"))
            && date.len() == 10
            && date < cutoff.as_str()
        {
            let _ = std::fs::remove_file(&path);
        }
    }
}
/// Read log lines from all daily files in `dir`, applying filter/offset/limit.
///
/// Files named `server-*.log` are visited in sorted (chronological) order;
/// `offset` counts only lines that pass the filter, and collection stops as
/// soon as `limit` matches have been gathered.
fn read_log_dir(dir: &Path, filter: Option<&str>, offset: usize, limit: usize) -> Vec<String> {
    use std::io::BufRead;
    let entries = match std::fs::read_dir(dir) {
        Ok(e) => e,
        Err(_) => return Vec::new(),
    };
    // Gather the candidate daily files, then sort so dates come out in order.
    let mut files: Vec<PathBuf> = entries
        .filter_map(|e| e.ok())
        .map(|e| e.path())
        .filter(|path| {
            path.file_name()
                .and_then(|n| n.to_str())
                .is_some_and(|n| n.starts_with("server-") && n.ends_with(".log"))
        })
        .collect();
    files.sort();
    let mut to_skip = offset;
    let mut out = Vec::new();
    for path in files {
        let Ok(file) = std::fs::File::open(&path) else {
            continue;
        };
        // `flatten` silently drops unreadable lines, matching best-effort reads.
        for line in std::io::BufReader::new(file).lines().flatten() {
            if filter.is_some_and(|f| !line.contains(f)) {
                continue;
            }
            if to_skip > 0 {
                to_skip -= 1;
                continue;
            }
            out.push(line);
            if out.len() >= limit {
                return out;
            }
        }
    }
    out
}
/// Write an INFO log to stderr **and** capture it in the ring buffer.
///
/// Usage is identical to `eprintln!`:
/// ```ignore
/// slog!("agent {} started", name);
/// ```
#[macro_export]
macro_rules! slog {
    ($($arg:tt)*) => {{
        // One call fans out to stderr, the ring buffer, the daily log file
        // (if configured), and live broadcast subscribers.
        $crate::log_buffer::global().push_entry(
            $crate::log_buffer::LogLevel::Info,
            format!($($arg)*),
        );
    }};
}
/// Write a WARN log to stderr **and** capture it in the ring buffer.
///
/// Usage is identical to `eprintln!`:
/// ```ignore
/// slog_warn!("retrying request: {}", err);
/// ```
#[macro_export]
macro_rules! slog_warn {
    ($($arg:tt)*) => {{
        $crate::log_buffer::global().push_entry(
            $crate::log_buffer::LogLevel::Warn,
            format!($($arg)*),
        );
    }};
}
/// Write an ERROR log to stderr **and** capture it in the ring buffer.
///
/// Usage is identical to `eprintln!`:
/// ```ignore
/// slog_error!("failed to bind {}: {}", addr, err);
/// ```
#[macro_export]
macro_rules! slog_error {
    ($($arg:tt)*) => {{
        $crate::log_buffer::global().push_entry(
            $crate::log_buffer::LogLevel::Error,
            format!($($arg)*),
        );
    }};
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Build an isolated buffer (no log dir, no subscribers).
    fn fresh_buffer() -> LogBuffer {
        LogBuffer::new()
    }

    #[test]
    fn push_and_retrieve() {
        let buf = fresh_buffer();
        buf.push_entry(LogLevel::Info, "line one".into());
        buf.push_entry(LogLevel::Info, "line two".into());
        let recent = buf.get_recent(10, None, None);
        assert_eq!(recent.len(), 2);
        assert!(recent[0].contains("[INFO]") && recent[0].contains("line one"));
        assert!(recent[1].contains("[INFO]") && recent[1].contains("line two"));
    }

    #[test]
    fn evicts_oldest_at_capacity() {
        let buf = LogBuffer::new();
        // Fill past capacity
        for i in 0..=CAPACITY {
            buf.push_entry(LogLevel::Info, format!("line {i}"));
        }
        let recent = buf.get_recent(CAPACITY + 1, None, None);
        // Should have exactly CAPACITY lines
        assert_eq!(recent.len(), CAPACITY);
        // The oldest (line 0) should have been evicted. `ends_with` avoids the
        // substring trap where "line 0" also matches "line 10", "line 100", ...
        assert!(!recent.iter().any(|l| l.ends_with("line 0")));
        // The newest should be present
        assert!(
            recent
                .iter()
                .any(|l| l.contains(&format!("line {CAPACITY}")))
        );
    }

    #[test]
    fn filter_by_substring() {
        let buf = fresh_buffer();
        buf.push_entry(LogLevel::Info, "watcher started".into());
        buf.push_entry(LogLevel::Info, "mcp call received".into());
        buf.push_entry(LogLevel::Info, "watcher event".into());
        let filtered = buf.get_recent(100, Some("watcher"), None);
        assert_eq!(filtered.len(), 2);
        assert!(filtered[0].contains("watcher started"));
        assert!(filtered[1].contains("watcher event"));
    }

    #[test]
    fn count_limits_results() {
        let buf = fresh_buffer();
        for i in 0..10 {
            buf.push_entry(LogLevel::Info, format!("line {i}"));
        }
        let recent = buf.get_recent(3, None, None);
        assert_eq!(recent.len(), 3);
        // Most recent 3
        assert!(recent[0].contains("line 7"));
        assert!(recent[1].contains("line 8"));
        assert!(recent[2].contains("line 9"));
    }

    #[test]
    fn empty_buffer_returns_empty() {
        let buf = fresh_buffer();
        assert!(buf.get_recent(10, None, None).is_empty());
    }

    #[test]
    fn log_lines_include_iso8601_timestamp() {
        let buf = fresh_buffer();
        buf.push_entry(LogLevel::Info, "timestamped message".into());
        let recent = buf.get_recent(1, None, None);
        assert_eq!(recent.len(), 1);
        // Timestamp format: YYYY-MM-DDTHH:MM:SSZ
        let line = &recent[0];
        assert!(line.len() > 20, "Line should have timestamp prefix: {line}");
        // Check it starts with a 4-digit year
        assert!(line.chars().next().unwrap().is_ascii_digit());
        assert!(line.contains('T'));
        assert!(line.contains('Z'));
    }

    #[test]
    fn filter_by_severity_error_only() {
        let buf = fresh_buffer();
        buf.push_entry(LogLevel::Info, "info message".into());
        buf.push_entry(LogLevel::Warn, "warn message".into());
        buf.push_entry(LogLevel::Error, "error message".into());
        let errors = buf.get_recent(100, None, Some(&LogLevel::Error));
        assert_eq!(errors.len(), 1);
        assert!(errors[0].contains("[ERROR]"));
        assert!(errors[0].contains("error message"));
    }

    #[test]
    fn filter_by_severity_warn_only() {
        let buf = fresh_buffer();
        buf.push_entry(LogLevel::Info, "info message".into());
        buf.push_entry(LogLevel::Warn, "warn message".into());
        buf.push_entry(LogLevel::Error, "error message".into());
        let warns = buf.get_recent(100, None, Some(&LogLevel::Warn));
        assert_eq!(warns.len(), 1);
        assert!(warns[0].contains("[WARN]"));
        assert!(warns[0].contains("warn message"));
    }

    #[test]
    fn severity_levels_appear_in_formatted_output() {
        let buf = fresh_buffer();
        buf.push_entry(LogLevel::Info, "info".into());
        buf.push_entry(LogLevel::Warn, "warn".into());
        buf.push_entry(LogLevel::Error, "error".into());
        let all = buf.get_recent(10, None, None);
        assert_eq!(all.len(), 3);
        assert!(all[0].contains("[INFO]"));
        assert!(all[1].contains("[WARN]"));
        assert!(all[2].contains("[ERROR]"));
    }

    #[test]
    fn loglevel_from_str_ci() {
        assert_eq!(LogLevel::from_str_ci("ERROR"), Some(LogLevel::Error));
        assert_eq!(LogLevel::from_str_ci("error"), Some(LogLevel::Error));
        assert_eq!(LogLevel::from_str_ci("WARN"), Some(LogLevel::Warn));
        assert_eq!(LogLevel::from_str_ci("warn"), Some(LogLevel::Warn));
        assert_eq!(LogLevel::from_str_ci("INFO"), Some(LogLevel::Info));
        assert_eq!(LogLevel::from_str_ci("info"), Some(LogLevel::Info));
        assert_eq!(LogLevel::from_str_ci("DEBUG"), None);
    }

    #[test]
    fn colored_formatted_warn_has_yellow_ansi() {
        let entry = LogEntry {
            level: LogLevel::Warn,
            timestamp: "2026-01-01T00:00:00Z".into(),
            message: "test warning".into(),
        };
        let colored = entry.colored_formatted();
        assert!(
            colored.starts_with("\x1b[33m"),
            "WARN should start with yellow ANSI code"
        );
        assert!(
            colored.ends_with("\x1b[0m"),
            "WARN should end with ANSI reset"
        );
        assert!(colored.contains("[WARN]"));
        assert!(colored.contains("test warning"));
    }

    #[test]
    fn colored_formatted_error_has_red_ansi() {
        let entry = LogEntry {
            level: LogLevel::Error,
            timestamp: "2026-01-01T00:00:00Z".into(),
            message: "test error".into(),
        };
        let colored = entry.colored_formatted();
        assert!(
            colored.starts_with("\x1b[31m"),
            "ERROR should start with red ANSI code"
        );
        assert!(
            colored.ends_with("\x1b[0m"),
            "ERROR should end with ANSI reset"
        );
        assert!(colored.contains("[ERROR]"));
        assert!(colored.contains("test error"));
    }

    #[test]
    fn colored_formatted_info_has_no_ansi() {
        let entry = LogEntry {
            level: LogLevel::Info,
            timestamp: "2026-01-01T00:00:00Z".into(),
            message: "test info".into(),
        };
        let colored = entry.colored_formatted();
        assert!(
            !colored.contains("\x1b["),
            "INFO should have no ANSI escape codes"
        );
        assert!(colored.contains("[INFO]"));
        assert!(colored.contains("test info"));
    }

    #[test]
    fn read_from_disk_returns_history_beyond_ring() {
        // Create a unique temp dir so parallel test runs do not interfere.
        let tmp = std::env::temp_dir().join(format!(
            "huskies_log_test_{}",
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_nanos()
        ));
        std::fs::create_dir_all(&tmp).unwrap();
        let buf = LogBuffer::new();
        buf.set_log_dir(tmp.clone());
        // Write CAPACITY + 1 entries so entry 0 is evicted from the ring buffer.
        for i in 0..=CAPACITY {
            buf.push_entry(LogLevel::Info, format!("history-line-{i}"));
        }
        // The ring has evicted "history-line-0" (replaced by the newest entry).
        let ring = buf.get_recent(CAPACITY + 1, None, None);
        assert_eq!(
            ring.len(),
            CAPACITY,
            "ring must hold exactly CAPACITY entries"
        );
        assert!(
            !ring.iter().any(|l| l.ends_with("history-line-0")),
            "history-line-0 must have been evicted from the ring"
        );
        // The on-disk path still has it.
        let disk = buf.read_from_disk(Some("history-line-0"), 0, 10);
        let _ = std::fs::remove_dir_all(&tmp);
        // "history-line-0" as a substring only matches the entry whose message
        // is exactly "history-line-0", not "history-line-100" etc.
        let matching: Vec<_> = disk
            .iter()
            .filter(|l| l.ends_with("history-line-0"))
            .collect();
        assert!(
            !matching.is_empty(),
            "disk log must surface evicted line 0 (disk results: {:?})",
            &disk[..disk.len().min(5)]
        );
    }

    #[test]
    fn read_from_disk_offset_skips_lines() {
        let tmp = std::env::temp_dir().join(format!(
            "huskies_log_test_offset_{}",
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_nanos()
        ));
        std::fs::create_dir_all(&tmp).unwrap();
        let buf = LogBuffer::new();
        buf.set_log_dir(tmp.clone());
        for i in 0..5 {
            buf.push_entry(LogLevel::Info, format!("offset-test-{i}"));
        }
        let all = buf.read_from_disk(Some("offset-test-"), 0, 10);
        let skipped = buf.read_from_disk(Some("offset-test-"), 2, 10);
        let _ = std::fs::remove_dir_all(&tmp);
        assert_eq!(all.len(), 5);
        assert_eq!(skipped.len(), 3, "offset=2 should skip first 2 matches");
        assert!(skipped[0].contains("offset-test-2"));
    }

    #[test]
    fn read_from_disk_no_dir_returns_empty() {
        let buf = LogBuffer::new();
        // No set_log_dir call — dir is None
        let result = buf.read_from_disk(None, 0, 100);
        assert!(result.is_empty());
    }

    #[test]
    fn ring_buffer_entries_have_no_ansi_codes() {
        let buf = fresh_buffer();
        buf.push_entry(LogLevel::Info, "info msg".into());
        buf.push_entry(LogLevel::Warn, "warn msg".into());
        buf.push_entry(LogLevel::Error, "error msg".into());
        let recent = buf.get_recent(10, None, None);
        assert_eq!(recent.len(), 3);
        for line in &recent {
            assert!(
                !line.contains("\x1b["),
                "Ring buffer entry should not contain ANSI codes: {line}"
            );
        }
    }
}