From 73614fe5e83f6e16ab4a94258f7d2a6591dbba32 Mon Sep 17 00:00:00 2001 From: Dave Date: Tue, 24 Feb 2026 13:48:25 +0000 Subject: [PATCH] story-kit: merge 141_story_improve_server_logging_with_timestamps_and_error_visibility --- server/src/agents.rs | 40 ++-- server/src/http/mcp.rs | 42 +++-- server/src/llm/providers/claude_code.rs | 6 +- server/src/log_buffer.rs | 238 ++++++++++++++++++++---- 4 files changed, 254 insertions(+), 72 deletions(-) diff --git a/server/src/agents.rs b/server/src/agents.rs index 3dfb171..a71023f 100644 --- a/server/src/agents.rs +++ b/server/src/agents.rs @@ -1,5 +1,7 @@ use crate::agent_log::AgentLogWriter; use crate::slog; +use crate::slog_error; +use crate::slog_warn; use crate::config::ProjectConfig; use crate::worktree::{self, WorktreeInfo}; use portable_pty::{CommandBuilder, PtySize, native_pty_system}; @@ -689,7 +691,7 @@ impl AgentPool { let agents = match self.agents.lock() { Ok(a) => a, Err(e) => { - slog!("[pipeline] Failed to lock agents for '{story_id}:{agent_name}': {e}"); + slog_error!("[pipeline] Failed to lock agents for '{story_id}:{agent_name}': {e}"); return; } }; @@ -707,14 +709,14 @@ impl AgentPool { let completion = match completion { Some(c) => c, None => { - slog!("[pipeline] No completion report for '{story_id}:{agent_name}'"); + slog_warn!("[pipeline] No completion report for '{story_id}:{agent_name}'"); return; } }; let project_root = match project_root { Some(p) => p, None => { - slog!("[pipeline] No project_root for '{story_id}:{agent_name}'"); + slog_warn!("[pipeline] No project_root for '{story_id}:{agent_name}'"); return; } }; @@ -731,14 +733,14 @@ impl AgentPool { "[pipeline] Coder '{agent_name}' passed gates for '{story_id}'. Moving to QA." ); if let Err(e) = move_story_to_qa(&project_root, story_id) { - slog!("[pipeline] Failed to move '{story_id}' to 3_qa/: {e}"); + slog_error!("[pipeline] Failed to move '{story_id}' to 3_qa/: {e}"); return; } if let Err(e) = self .start_agent(&project_root, story_id, Some("qa"), None) .await { - slog!("[pipeline] Failed to start qa agent for '{story_id}': {e}"); + slog_error!("[pipeline] Failed to start qa agent for '{story_id}': {e}"); } // Coder slot is now free — pick up any other unassigned work in 2_current/. self.auto_assign_available_work(&project_root).await; @@ -756,7 +758,7 @@ impl AgentPool { .start_agent(&project_root, story_id, Some(agent_name), Some(&context)) .await { - slog!( + slog_error!( "[pipeline] Failed to restart coder '{agent_name}' for '{story_id}': {e}" ); } @@ -771,7 +773,7 @@ impl AgentPool { tokio::task::spawn_blocking(move || run_coverage_gate(&cp)) .await .unwrap_or_else(|e| { - slog!("[pipeline] Coverage gate task panicked: {e}"); + slog_warn!("[pipeline] Coverage gate task panicked: {e}"); Ok((false, format!("Coverage gate task panicked: {e}"))) }); let (coverage_passed, coverage_output) = match coverage_result { @@ -784,14 +786,14 @@ impl AgentPool { "[pipeline] QA passed gates and coverage for '{story_id}'. Moving to merge." ); if let Err(e) = move_story_to_merge(&project_root, story_id) { - slog!("[pipeline] Failed to move '{story_id}' to 4_merge/: {e}"); + slog_error!("[pipeline] Failed to move '{story_id}' to 4_merge/: {e}"); return; } if let Err(e) = self .start_agent(&project_root, story_id, Some("mergemaster"), None) .await { - slog!("[pipeline] Failed to start mergemaster for '{story_id}': {e}"); + slog_error!("[pipeline] Failed to start mergemaster for '{story_id}': {e}"); } // QA slot is now free — pick up any other unassigned work in 3_qa/. self.auto_assign_available_work(&project_root).await; @@ -809,7 +811,7 @@ impl AgentPool { .start_agent(&project_root, story_id, Some("qa"), Some(&context)) .await { - slog!("[pipeline] Failed to restart qa for '{story_id}': {e}"); + slog_error!("[pipeline] Failed to restart qa for '{story_id}': {e}"); } } } else { @@ -826,7 +828,7 @@ impl AgentPool { .start_agent(&project_root, story_id, Some("qa"), Some(&context)) .await { - slog!("[pipeline] Failed to restart qa for '{story_id}': {e}"); + slog_error!("[pipeline] Failed to restart qa for '{story_id}': {e}"); } } } @@ -839,7 +841,7 @@ impl AgentPool { let test_result = tokio::task::spawn_blocking(move || run_project_tests(&root)) .await .unwrap_or_else(|e| { - slog!("[pipeline] Post-merge test task panicked: {e}"); + slog_warn!("[pipeline] Post-merge test task panicked: {e}"); Ok((false, format!("Test task panicked: {e}"))) }); let (passed, output) = match test_result { @@ -852,7 +854,7 @@ impl AgentPool { "[pipeline] Post-merge tests passed for '{story_id}'. Archiving." ); if let Err(e) = move_story_to_archived(&project_root, story_id) { - slog!("[pipeline] Failed to archive '{story_id}': {e}"); + slog_error!("[pipeline] Failed to archive '{story_id}': {e}"); } self.remove_agents_for_story(story_id); // Mergemaster slot is now free — pick up any other items in 4_merge/. @@ -886,7 +888,7 @@ impl AgentPool { .start_agent(&project_root, story_id, Some("mergemaster"), Some(&context)) .await { - slog!( + slog_error!( "[pipeline] Failed to restart mergemaster for '{story_id}': {e}" ); } @@ -1211,7 +1213,7 @@ impl AgentPool { let config = match ProjectConfig::load(project_root) { Ok(c) => c, Err(e) => { - slog!("[auto-assign] Failed to load project config: {e}"); + slog_warn!("[auto-assign] Failed to load project config: {e}"); return; } }; @@ -1236,7 +1238,7 @@ impl AgentPool { let agents = match self.agents.lock() { Ok(a) => a, Err(e) => { - slog!("[auto-assign] Failed to lock agents: {e}"); + slog_error!("[auto-assign] Failed to lock agents: {e}"); break; } }; @@ -1612,7 +1614,7 @@ impl AgentPool { let mut agents = match self.agents.lock() { Ok(a) => a, Err(e) => { - slog!("[agents] Failed to lock pool for cleanup of '{story_id}': {e}"); + slog_error!("[agents] Failed to lock pool for cleanup of '{story_id}': {e}"); return 0; } }; @@ -1638,7 +1640,7 @@ impl AgentPool { let mut agents = match self.agents.lock() { Ok(a) => a, Err(e) => { - slog!("[reaper] Failed to lock pool for TTL reaping: {e}"); + slog_warn!("[reaper] Failed to lock pool for TTL reaping: {e}"); return 0; } }; @@ -2988,7 +2990,7 @@ fn run_agent_pty_blocking( break; } Err(std::sync::mpsc::RecvTimeoutError::Timeout) => { - slog!( + slog_warn!( "[agent:{story_id}:{agent_name}] Inactivity timeout after \ {inactivity_timeout_secs}s with no output. Killing process." ); diff --git a/server/src/http/mcp.rs b/server/src/http/mcp.rs index fafe36e..f679d18 100644 --- a/server/src/http/mcp.rs +++ b/server/src/http/mcp.rs @@ -1,6 +1,7 @@ use crate::agents::{close_bug_to_archive, move_story_to_archived, move_story_to_merge, move_story_to_qa}; use crate::config::ProjectConfig; use crate::log_buffer; +use crate::slog_warn; use crate::http::context::AppContext; use crate::http::settings::get_editor_command_from_store; use crate::http::workflow::{ @@ -757,6 +758,10 @@ fn handle_tools_list(id: Option) -> JsonRpcResponse { "filter": { "type": "string", "description": "Optional substring filter (e.g. 'watcher', 'mcp', 'permission')" + }, + "severity": { + "type": "string", + "description": "Filter by severity level: ERROR, WARN, or INFO. Returns only entries at that level." } } } @@ -848,13 +853,16 @@ async fn handle_tools_call( "content": [{ "type": "text", "text": content }] }), ), - Err(msg) => JsonRpcResponse::success( - id, - json!({ - "content": [{ "type": "text", "text": msg }], - "isError": true - }), - ), + Err(msg) => { + slog_warn!("[mcp] Tool call failed: tool={tool_name} error={msg}"); + JsonRpcResponse::success( + id, + json!({ + "content": [{ "type": "text", "text": msg }], + "isError": true + }), + ) + } } } @@ -1577,12 +1585,15 @@ fn tool_get_server_logs(args: &Value) -> Result { .map(|n| n.min(1000) as usize) .unwrap_or(100); let filter = args.get("filter").and_then(|v| v.as_str()); + let severity = args + .get("severity") + .and_then(|v| v.as_str()) + .and_then(log_buffer::LogLevel::from_str_ci); - // Fetch extra buffer entries to account for multi-line entries within each - let fetch = lines_count.saturating_mul(4).min(4000); - let recent = log_buffer::global().get_recent(fetch, filter); - // Flatten buffer entries into individual lines, then take the last lines_count - let all_lines: Vec<&str> = recent.iter().flat_map(|s| s.lines()).collect(); + let recent = log_buffer::global().get_recent(lines_count, filter, severity.as_ref()); + let joined = recent.join("\n"); + // Clamp to lines_count actual lines in case any entry contains embedded newlines. + let all_lines: Vec<&str> = joined.lines().collect(); let start = all_lines.len().saturating_sub(lines_count); Ok(all_lines[start..].join("\n")) } @@ -1620,12 +1631,17 @@ async fn tool_prompt_permission(args: &Value, ctx: &AppContext) -> Result ( + type Channels = ( tokio::sync::mpsc::UnboundedSender, tokio::sync::mpsc::UnboundedReceiver, tokio::sync::mpsc::UnboundedSender, tokio::sync::mpsc::UnboundedReceiver, std::sync::mpsc::Sender, std::sync::mpsc::Receiver, - ) { + ); + + fn make_channels() -> Channels { let (tok_tx, tok_rx) = tokio::sync::mpsc::unbounded_channel(); let (act_tx, act_rx) = tokio::sync::mpsc::unbounded_channel(); let (msg_tx, msg_rx) = std::sync::mpsc::channel(); diff --git a/server/src/log_buffer.rs b/server/src/log_buffer.rs index 5a06a3a..a06f05d 100644 --- a/server/src/log_buffer.rs +++ b/server/src/log_buffer.rs @@ -1,48 +1,112 @@ //! Bounded in-memory ring buffer for server log output. //! -//! Use the [`slog!`] macro as a drop-in replacement for `eprintln!`. It writes -//! to stderr (same as before) and simultaneously appends the line to the global -//! ring buffer, making it retrievable via the `get_server_logs` MCP tool. +//! Use the [`slog!`] macro (INFO), [`slog_warn!`] (WARN), or [`slog_error!`] +//! (ERROR) as drop-in replacements for `eprintln!`. Each call writes to stderr +//! with an ISO 8601 timestamp + severity prefix, and simultaneously appends +//! the entry to the global ring buffer, making it retrievable via the +//! `get_server_logs` MCP tool. use std::collections::VecDeque; use std::sync::{Mutex, OnceLock}; const CAPACITY: usize = 1000; +/// Severity level for a log entry. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum LogLevel { + Error, + Warn, + Info, +} + +impl LogLevel { + pub fn as_str(&self) -> &'static str { + match self { + LogLevel::Error => "ERROR", + LogLevel::Warn => "WARN", + LogLevel::Info => "INFO", + } + } + + /// Parse from a case-insensitive string. Returns `None` for unknown levels. + pub fn from_str_ci(s: &str) -> Option { + match s.to_uppercase().as_str() { + "ERROR" => Some(LogLevel::Error), + "WARN" => Some(LogLevel::Warn), + "INFO" => Some(LogLevel::Info), + _ => None, + } + } +} + +/// A single captured log entry. +#[derive(Debug, Clone)] +pub struct LogEntry { + pub level: LogLevel, + /// ISO 8601 UTC timestamp. + pub timestamp: String, + pub message: String, +} + +impl LogEntry { + /// Format the entry as a single log line: `{timestamp} [{LEVEL}] {message}`. + pub fn formatted(&self) -> String { + format!("{} [{}] {}", self.timestamp, self.level.as_str(), self.message) + } +} + pub struct LogBuffer { - lines: Mutex>, + entries: Mutex>, } impl LogBuffer { fn new() -> Self { Self { - lines: Mutex::new(VecDeque::with_capacity(CAPACITY)), + entries: Mutex::new(VecDeque::with_capacity(CAPACITY)), } } - /// Append a log line, evicting the oldest entry when at capacity. - pub fn push(&self, line: String) { - if let Ok(mut buf) = self.lines.lock() { + /// Append a log entry, evicting the oldest when at capacity. + pub fn push_entry(&self, level: LogLevel, message: String) { + let timestamp = chrono::Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string(); + let entry = LogEntry { + level, + timestamp, + message, + }; + let line = entry.formatted(); + eprintln!("{line}"); + if let Ok(mut buf) = self.entries.lock() { if buf.len() >= CAPACITY { buf.pop_front(); } - buf.push_back(line); + buf.push_back(entry); } } - /// Return up to `count` recent lines, optionally filtered by a substring. + /// Return up to `count` recent log lines as formatted strings, + /// optionally filtered by substring and/or severity level. /// Lines are returned in chronological order (oldest first). - pub fn get_recent(&self, count: usize, filter: Option<&str>) -> Vec { - let buf = match self.lines.lock() { + pub fn get_recent( + &self, + count: usize, + filter: Option<&str>, + severity: Option<&LogLevel>, + ) -> Vec { + let buf = match self.entries.lock() { Ok(b) => b, Err(_) => return vec![], }; - let filtered: Vec<&String> = buf + let filtered: Vec = buf .iter() - .filter(|line| filter.is_none_or(|f| line.contains(f))) + .filter(|entry| { + severity.is_none_or(|s| &entry.level == s) + && filter.is_none_or(|f| entry.message.contains(f) || entry.formatted().contains(f)) + }) + .map(|entry| entry.formatted()) .collect(); let start = filtered.len().saturating_sub(count); - filtered[start..].iter().map(|s| (*s).clone()).collect() + filtered[start..].to_vec() } } @@ -53,7 +117,7 @@ pub fn global() -> &'static LogBuffer { GLOBAL.get_or_init(LogBuffer::new) } -/// Write a formatted message to stderr **and** capture it in the ring buffer. +/// Write an INFO log to stderr **and** capture it in the ring buffer. /// /// Usage is identical to `eprintln!`: /// ```ignore @@ -62,9 +126,32 @@ pub fn global() -> &'static LogBuffer { #[macro_export] macro_rules! slog { ($($arg:tt)*) => {{ - let _line = format!($($arg)*); - eprintln!("{}", _line); - $crate::log_buffer::global().push(_line); + $crate::log_buffer::global().push_entry( + $crate::log_buffer::LogLevel::Info, + format!($($arg)*), + ); + }}; +} + +/// Write a WARN log to stderr **and** capture it in the ring buffer. +#[macro_export] +macro_rules! slog_warn { + ($($arg:tt)*) => {{ + $crate::log_buffer::global().push_entry( + $crate::log_buffer::LogLevel::Warn, + format!($($arg)*), + ); + }}; +} + +/// Write an ERROR log to stderr **and** capture it in the ring buffer. +#[macro_export] +macro_rules! slog_error { + ($($arg:tt)*) => {{ + $crate::log_buffer::global().push_entry( + $crate::log_buffer::LogLevel::Error, + format!($($arg)*), + ); }}; } @@ -79,58 +166,133 @@ mod tests { #[test] fn push_and_retrieve() { let buf = fresh_buffer(); - buf.push("line one".into()); - buf.push("line two".into()); - let recent = buf.get_recent(10, None); - assert_eq!(recent, vec!["line one", "line two"]); + buf.push_entry(LogLevel::Info, "line one".into()); + buf.push_entry(LogLevel::Info, "line two".into()); + let recent = buf.get_recent(10, None, None); + assert_eq!(recent.len(), 2); + assert!(recent[0].contains("[INFO]") && recent[0].contains("line one")); + assert!(recent[1].contains("[INFO]") && recent[1].contains("line two")); } #[test] fn evicts_oldest_at_capacity() { let buf = LogBuffer { - lines: Mutex::new(VecDeque::with_capacity(CAPACITY)), + entries: Mutex::new(VecDeque::with_capacity(CAPACITY)), }; // Fill past capacity for i in 0..=CAPACITY { - buf.push(format!("line {i}")); + buf.push_entry(LogLevel::Info, format!("line {i}")); } - let recent = buf.get_recent(CAPACITY + 1, None); + let recent = buf.get_recent(CAPACITY + 1, None, None); // Should have exactly CAPACITY lines assert_eq!(recent.len(), CAPACITY); // The oldest (line 0) should have been evicted - assert!(!recent.iter().any(|l| l == "line 0")); + assert!(!recent.iter().any(|l| l.contains("line 0") && !l.contains("line 10"))); // The newest should be present - assert!(recent.iter().any(|l| l == &format!("line {CAPACITY}"))); + assert!(recent + .iter() + .any(|l| l.contains(&format!("line {CAPACITY}")))); } #[test] fn filter_by_substring() { let buf = fresh_buffer(); - buf.push("watcher started".into()); - buf.push("mcp call received".into()); - buf.push("watcher event".into()); + buf.push_entry(LogLevel::Info, "watcher started".into()); + buf.push_entry(LogLevel::Info, "mcp call received".into()); + buf.push_entry(LogLevel::Info, "watcher event".into()); - let filtered = buf.get_recent(100, Some("watcher")); + let filtered = buf.get_recent(100, Some("watcher"), None); assert_eq!(filtered.len(), 2); - assert_eq!(filtered[0], "watcher started"); - assert_eq!(filtered[1], "watcher event"); + assert!(filtered[0].contains("watcher started")); + assert!(filtered[1].contains("watcher event")); } #[test] fn count_limits_results() { let buf = fresh_buffer(); for i in 0..10 { - buf.push(format!("line {i}")); + buf.push_entry(LogLevel::Info, format!("line {i}")); } - let recent = buf.get_recent(3, None); + let recent = buf.get_recent(3, None, None); assert_eq!(recent.len(), 3); // Most recent 3 - assert_eq!(recent, vec!["line 7", "line 8", "line 9"]); + assert!(recent[0].contains("line 7")); + assert!(recent[1].contains("line 8")); + assert!(recent[2].contains("line 9")); } #[test] fn empty_buffer_returns_empty() { let buf = fresh_buffer(); - assert!(buf.get_recent(10, None).is_empty()); + assert!(buf.get_recent(10, None, None).is_empty()); + } + + #[test] + fn log_lines_include_iso8601_timestamp() { + let buf = fresh_buffer(); + buf.push_entry(LogLevel::Info, "timestamped message".into()); + let recent = buf.get_recent(1, None, None); + assert_eq!(recent.len(), 1); + // Timestamp format: YYYY-MM-DDTHH:MM:SSZ + let line = &recent[0]; + assert!( + line.len() > 20, + "Line should have timestamp prefix: {line}" + ); + // Check it starts with a 4-digit year + assert!(line.chars().next().unwrap().is_ascii_digit()); + assert!(line.contains('T')); + assert!(line.contains('Z')); + } + + #[test] + fn filter_by_severity_error_only() { + let buf = fresh_buffer(); + buf.push_entry(LogLevel::Info, "info message".into()); + buf.push_entry(LogLevel::Warn, "warn message".into()); + buf.push_entry(LogLevel::Error, "error message".into()); + + let errors = buf.get_recent(100, None, Some(&LogLevel::Error)); + assert_eq!(errors.len(), 1); + assert!(errors[0].contains("[ERROR]")); + assert!(errors[0].contains("error message")); + } + + #[test] + fn filter_by_severity_warn_only() { + let buf = fresh_buffer(); + buf.push_entry(LogLevel::Info, "info message".into()); + buf.push_entry(LogLevel::Warn, "warn message".into()); + buf.push_entry(LogLevel::Error, "error message".into()); + + let warns = buf.get_recent(100, None, Some(&LogLevel::Warn)); + assert_eq!(warns.len(), 1); + assert!(warns[0].contains("[WARN]")); + assert!(warns[0].contains("warn message")); + } + + #[test] + fn severity_levels_appear_in_formatted_output() { + let buf = fresh_buffer(); + buf.push_entry(LogLevel::Info, "info".into()); + buf.push_entry(LogLevel::Warn, "warn".into()); + buf.push_entry(LogLevel::Error, "error".into()); + + let all = buf.get_recent(10, None, None); + assert_eq!(all.len(), 3); + assert!(all[0].contains("[INFO]")); + assert!(all[1].contains("[WARN]")); + assert!(all[2].contains("[ERROR]")); + } + + #[test] + fn loglevel_from_str_ci() { + assert_eq!(LogLevel::from_str_ci("ERROR"), Some(LogLevel::Error)); + assert_eq!(LogLevel::from_str_ci("error"), Some(LogLevel::Error)); + assert_eq!(LogLevel::from_str_ci("WARN"), Some(LogLevel::Warn)); + assert_eq!(LogLevel::from_str_ci("warn"), Some(LogLevel::Warn)); + assert_eq!(LogLevel::from_str_ci("INFO"), Some(LogLevel::Info)); + assert_eq!(LogLevel::from_str_ci("info"), Some(LogLevel::Info)); + assert_eq!(LogLevel::from_str_ci("DEBUG"), None); } }