diff --git a/server/src/agents/mod.rs b/server/src/agents/mod.rs index c5661a55..81bbb379 100644 --- a/server/src/agents/mod.rs +++ b/server/src/agents/mod.rs @@ -188,6 +188,8 @@ pub struct AgentInfo { pub completion: Option, /// UUID identifying the persistent log file for this session. pub log_session_id: Option, + /// True when a rate-limit throttle warning was received for this agent. + pub throttled: bool, } #[cfg(test)] diff --git a/server/src/agents/pool/lifecycle.rs b/server/src/agents/pool/lifecycle.rs index b8fa5d15..dbe11183 100644 --- a/server/src/agents/pool/lifecycle.rs +++ b/server/src/agents/pool/lifecycle.rs @@ -271,6 +271,7 @@ impl AgentPool { project_root: Some(project_root.to_path_buf()), log_session_id: Some(log_session_id.clone()), merge_failure_reported: false, + throttled: false, }, ); } diff --git a/server/src/agents/pool/mod.rs b/server/src/agents/pool/mod.rs index a7610f11..e68381eb 100644 --- a/server/src/agents/pool/mod.rs +++ b/server/src/agents/pool/mod.rs @@ -41,13 +41,47 @@ pub struct AgentPool { impl AgentPool { pub fn new(port: u16, watcher_tx: broadcast::Sender) -> Self { - Self { + let pool = Self { agents: Arc::new(Mutex::new(HashMap::new())), port, child_killers: Arc::new(Mutex::new(HashMap::new())), - watcher_tx, + watcher_tx: watcher_tx.clone(), merge_jobs: Arc::new(Mutex::new(HashMap::new())), + }; + + // Spawn a background task (only when inside a tokio runtime) that + // listens for RateLimitWarning and HardBlock events and updates the + // throttled flag on the relevant agent so status dots stay current. + if tokio::runtime::Handle::try_current().is_ok() { + let agents_clone = Arc::clone(&pool.agents); + let watcher_tx_clone = watcher_tx.clone(); + let mut rx = watcher_tx.subscribe(); + tokio::spawn(async move { + loop { + let event = match rx.recv().await { + Ok(e) => e, + Err(broadcast::error::RecvError::Closed) => break, + Err(broadcast::error::RecvError::Lagged(_)) => continue, + }; + let (story_id, agent_name) = match &event { + WatcherEvent::RateLimitWarning { story_id, agent_name } + | WatcherEvent::HardBlock { story_id, agent_name, .. } => { + (story_id.clone(), agent_name.clone()) + } + _ => continue, + }; + let key = composite_key(&story_id, &agent_name); + if let Ok(mut agents) = agents_clone.lock() { + if let Some(agent) = agents.get_mut(&key) { + agent.throttled = true; + } + } + let _ = watcher_tx_clone.send(WatcherEvent::AgentStateChanged); + } + }); } + + pool } pub fn port(&self) -> u16 { diff --git a/server/src/agents/pool/types.rs b/server/src/agents/pool/types.rs index 7d2d8367..345ace0a 100644 --- a/server/src/agents/pool/types.rs +++ b/server/src/agents/pool/types.rs @@ -80,6 +80,8 @@ pub(super) struct StoryAgent { /// worktree (which compiles fine) and returns `gates_passed=true` even /// though the code was never squash-merged onto master. pub(super) merge_failure_reported: bool, + /// Set to `true` when a rate-limit throttle warning was received for this agent. + pub(super) throttled: bool, } /// Build an `AgentInfo` snapshot from a `StoryAgent` map entry. @@ -99,5 +101,6 @@ pub(super) fn agent_info_from_entry(story_id: &str, agent: &StoryAgent) -> Agent .map(|wt| wt.base_branch.clone()), completion: agent.completion.clone(), log_session_id: agent.log_session_id.clone(), + throttled: agent.throttled, } } diff --git a/server/src/chat/commands/status.rs b/server/src/chat/commands/status.rs index f56dbb05..34272076 100644 --- a/server/src/chat/commands/status.rs +++ b/server/src/chat/commands/status.rs @@ -49,6 +49,44 @@ pub(super) fn story_short_label(stem: &str, name: Option<&str>) -> String { } } +/// Read the `blocked` flag from a story file's YAML front matter. +/// +/// Returns `true` when the story has `blocked: true` set (retry limit reached). +fn read_story_blocked(project_root: &std::path::Path, stage_dir: &str, stem: &str) -> bool { + let path = project_root + .join(".storkit") + .join("work") + .join(stage_dir) + .join(format!("{stem}.md")); + std::fs::read_to_string(path) + .ok() + .and_then(|c| crate::io::story_metadata::parse_front_matter(&c).ok()) + .and_then(|m| m.blocked) + .unwrap_or(false) +} + +/// Choose the traffic-light dot for a work item. +/// +/// Priority: blocked > throttled > running > idle. +/// Uses compact Unicode characters (not large emoji) so the output stays +/// readable in plain-text chat clients. +/// +/// - `●` running normally (active agent, no throttle) +/// - `◑` throttled (rate-limit warning received) +/// - `✗` hard-blocked (retry limit exceeded) +/// - `○` idle / no active agent +pub(super) fn traffic_light_dot(blocked: bool, throttled: bool, has_agent: bool) -> &'static str { + if blocked { + "\u{2717} " // ✗ — hard blocked + } else if throttled { + "\u{25D1} " // ◑ — throttled + } else if has_agent { + "\u{25CF} " // ● — running normally + } else { + "\u{25CB} " // ○ — idle / no agent + } +} + /// Read all story IDs and names from a pipeline stage directory. fn read_stage_items( project_root: &std::path::Path, @@ -130,18 +168,22 @@ pub(super) fn build_pipeline_status(project_root: &std::path::Path, agents: &Age .filter(|&&c| c > 0.0) .map(|c| format!(" — ${c:.2}")) .unwrap_or_default(); - if let Some(agent) = active_map.get(story_id) { + let blocked = read_story_blocked(project_root, dir, story_id); + let agent = active_map.get(story_id); + let throttled = agent.map(|a| a.throttled).unwrap_or(false); + let dot = traffic_light_dot(blocked, throttled, agent.is_some()); + if let Some(agent) = agent { let model_str = config .as_ref() .and_then(|cfg| cfg.find_agent(&agent.agent_name)) .and_then(|ac| ac.model.as_deref()) .unwrap_or("?"); out.push_str(&format!( - " • {display}{cost_suffix} — {} ({model_str})\n", + " {dot}{display}{cost_suffix} — {} ({model_str})\n", agent.agent_name )); } else { - out.push_str(&format!(" • {display}{cost_suffix}\n")); + out.push_str(&format!(" {dot}{display}{cost_suffix}\n")); } } } @@ -399,4 +441,107 @@ mod tests { "output must show aggregated cost: {output}" ); } + + // -- traffic_light_dot -------------------------------------------------- + + #[test] + fn dot_idle_when_no_agent() { + assert_eq!(traffic_light_dot(false, false, false), "\u{25CB} "); // ○ + } + + #[test] + fn dot_running_when_agent_not_throttled() { + assert_eq!(traffic_light_dot(false, false, true), "\u{25CF} "); // ● + } + + #[test] + fn dot_throttled_when_agent_throttled() { + assert_eq!(traffic_light_dot(false, true, true), "\u{25D1} "); // ◑ + } + + #[test] + fn dot_blocked_takes_priority_over_throttled() { + assert_eq!(traffic_light_dot(true, true, true), "\u{2717} "); // ✗ + } + + #[test] + fn dot_blocked_when_no_agent_but_blocked_flag() { + assert_eq!(traffic_light_dot(true, false, false), "\u{2717} "); // ✗ + } + + // -- read_story_blocked -------------------------------------------------- + + #[test] + fn read_story_blocked_returns_true_when_blocked() { + use tempfile::TempDir; + let tmp = TempDir::new().unwrap(); + let stage_dir = tmp.path().join(".storkit/work/2_current"); + std::fs::create_dir_all(&stage_dir).unwrap(); + std::fs::write( + stage_dir.join("42_story_foo.md"), + "---\nname: Foo\nblocked: true\n---\n", + ) + .unwrap(); + assert!(read_story_blocked(tmp.path(), "2_current", "42_story_foo")); + } + + #[test] + fn read_story_blocked_returns_false_when_not_blocked() { + use tempfile::TempDir; + let tmp = TempDir::new().unwrap(); + let stage_dir = tmp.path().join(".storkit/work/2_current"); + std::fs::create_dir_all(&stage_dir).unwrap(); + std::fs::write( + stage_dir.join("42_story_foo.md"), + "---\nname: Foo\n---\n", + ) + .unwrap(); + assert!(!read_story_blocked(tmp.path(), "2_current", "42_story_foo")); + } + + // -- status output shows idle dot for items with no active agent -------- + + #[test] + fn status_shows_idle_dot_for_unassigned_story() { + use std::io::Write; + use tempfile::TempDir; + + let tmp = TempDir::new().unwrap(); + let stage_dir = tmp.path().join(".storkit/work/2_current"); + std::fs::create_dir_all(&stage_dir).unwrap(); + + let story_path = stage_dir.join("42_story_idle.md"); + let mut f = std::fs::File::create(&story_path).unwrap(); + writeln!(f, "---\nname: Idle Story\n---\n").unwrap(); + + let agents = AgentPool::new_test(3000); + let output = build_pipeline_status(tmp.path(), &agents); + + assert!( + output.contains("\u{25CB} "), // ○ + "idle story should show empty-circle dot: {output}" + ); + } + + #[test] + fn status_shows_blocked_dot_for_blocked_story() { + use std::io::Write; + use tempfile::TempDir; + + let tmp = TempDir::new().unwrap(); + let stage_dir = tmp.path().join(".storkit/work/2_current"); + std::fs::create_dir_all(&stage_dir).unwrap(); + + let story_path = stage_dir.join("42_story_blocked.md"); + let mut f = std::fs::File::create(&story_path).unwrap(); + writeln!(f, "---\nname: Blocked Story\nblocked: true\n---\n").unwrap(); + + let agents = AgentPool::new_test(3000); + let output = build_pipeline_status(tmp.path(), &agents); + + assert!( + output.contains("\u{2717} "), // ✗ + "blocked story should show X dot: {output}" + ); + } }