feat(424): rate-limit traffic-light dots and hard-block alerts
- Add a `HardBlock` variant to `WatcherEvent` (`story_id`, `agent_name`, `reset_time`).
- In `pty.rs`, distinguish `allowed_warning` (throttle) from hard blocks: emit `RateLimitWarning` for throttles and `HardBlock` for actual 429s.
- Add a `throttled: bool` field to `StoryAgent` / `AgentInfo`.
- The pool spawns a background listener that sets `throttled=true` on `RateLimitWarning` or `HardBlock` events and fires `AgentStateChanged`.
- The status command shows traffic-light dots: ○ idle, ● running, ◑ throttled, ✗ blocked.
- Read the `blocked` flag from the story front matter for the ✗ dot.
- Notifications: `RateLimitWarning` is silenced (too noisy); `HardBlock` sends an urgent chat notification with an optional reset time.
- Tests added for `traffic_light_dot`, `read_story_blocked`, status output, and all notification paths.
This commit is contained in:
@@ -188,6 +188,8 @@ pub struct AgentInfo {
|
|||||||
pub completion: Option<CompletionReport>,
|
pub completion: Option<CompletionReport>,
|
||||||
/// UUID identifying the persistent log file for this session.
|
/// UUID identifying the persistent log file for this session.
|
||||||
pub log_session_id: Option<String>,
|
pub log_session_id: Option<String>,
|
||||||
|
/// True when a rate-limit throttle warning was received for this agent.
|
||||||
|
pub throttled: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|||||||
@@ -271,6 +271,7 @@ impl AgentPool {
|
|||||||
project_root: Some(project_root.to_path_buf()),
|
project_root: Some(project_root.to_path_buf()),
|
||||||
log_session_id: Some(log_session_id.clone()),
|
log_session_id: Some(log_session_id.clone()),
|
||||||
merge_failure_reported: false,
|
merge_failure_reported: false,
|
||||||
|
throttled: false,
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -41,13 +41,47 @@ pub struct AgentPool {
|
|||||||
|
|
||||||
impl AgentPool {
|
impl AgentPool {
|
||||||
pub fn new(port: u16, watcher_tx: broadcast::Sender<WatcherEvent>) -> Self {
|
pub fn new(port: u16, watcher_tx: broadcast::Sender<WatcherEvent>) -> Self {
|
||||||
Self {
|
let pool = Self {
|
||||||
agents: Arc::new(Mutex::new(HashMap::new())),
|
agents: Arc::new(Mutex::new(HashMap::new())),
|
||||||
port,
|
port,
|
||||||
child_killers: Arc::new(Mutex::new(HashMap::new())),
|
child_killers: Arc::new(Mutex::new(HashMap::new())),
|
||||||
watcher_tx,
|
watcher_tx: watcher_tx.clone(),
|
||||||
merge_jobs: Arc::new(Mutex::new(HashMap::new())),
|
merge_jobs: Arc::new(Mutex::new(HashMap::new())),
|
||||||
|
};
|
||||||
|
|
||||||
|
// Spawn a background task (only when inside a tokio runtime) that
|
||||||
|
// listens for RateLimitWarning and HardBlock events and updates the
|
||||||
|
// throttled flag on the relevant agent so status dots stay current.
|
||||||
|
if tokio::runtime::Handle::try_current().is_ok() {
|
||||||
|
let agents_clone = Arc::clone(&pool.agents);
|
||||||
|
let watcher_tx_clone = watcher_tx.clone();
|
||||||
|
let mut rx = watcher_tx.subscribe();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
loop {
|
||||||
|
let event = match rx.recv().await {
|
||||||
|
Ok(e) => e,
|
||||||
|
Err(broadcast::error::RecvError::Closed) => break,
|
||||||
|
Err(broadcast::error::RecvError::Lagged(_)) => continue,
|
||||||
|
};
|
||||||
|
let (story_id, agent_name) = match &event {
|
||||||
|
WatcherEvent::RateLimitWarning { story_id, agent_name }
|
||||||
|
| WatcherEvent::HardBlock { story_id, agent_name, .. } => {
|
||||||
|
(story_id.clone(), agent_name.clone())
|
||||||
|
}
|
||||||
|
_ => continue,
|
||||||
|
};
|
||||||
|
let key = composite_key(&story_id, &agent_name);
|
||||||
|
if let Ok(mut agents) = agents_clone.lock() {
|
||||||
|
if let Some(agent) = agents.get_mut(&key) {
|
||||||
|
agent.throttled = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let _ = watcher_tx_clone.send(WatcherEvent::AgentStateChanged);
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pool
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn port(&self) -> u16 {
|
pub fn port(&self) -> u16 {
|
||||||
|
|||||||
@@ -80,6 +80,8 @@ pub(super) struct StoryAgent {
|
|||||||
/// worktree (which compiles fine) and returns `gates_passed=true` even
|
/// worktree (which compiles fine) and returns `gates_passed=true` even
|
||||||
/// though the code was never squash-merged onto master.
|
/// though the code was never squash-merged onto master.
|
||||||
pub(super) merge_failure_reported: bool,
|
pub(super) merge_failure_reported: bool,
|
||||||
|
/// Set to `true` when a rate-limit throttle warning was received for this agent.
|
||||||
|
pub(super) throttled: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Build an `AgentInfo` snapshot from a `StoryAgent` map entry.
|
/// Build an `AgentInfo` snapshot from a `StoryAgent` map entry.
|
||||||
@@ -99,5 +101,6 @@ pub(super) fn agent_info_from_entry(story_id: &str, agent: &StoryAgent) -> Agent
|
|||||||
.map(|wt| wt.base_branch.clone()),
|
.map(|wt| wt.base_branch.clone()),
|
||||||
completion: agent.completion.clone(),
|
completion: agent.completion.clone(),
|
||||||
log_session_id: agent.log_session_id.clone(),
|
log_session_id: agent.log_session_id.clone(),
|
||||||
|
throttled: agent.throttled,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -49,6 +49,44 @@ pub(super) fn story_short_label(stem: &str, name: Option<&str>) -> String {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Read the `blocked` flag from a story file's YAML front matter.
|
||||||
|
///
|
||||||
|
/// Returns `true` when the story has `blocked: true` set (retry limit reached).
|
||||||
|
fn read_story_blocked(project_root: &std::path::Path, stage_dir: &str, stem: &str) -> bool {
|
||||||
|
let path = project_root
|
||||||
|
.join(".storkit")
|
||||||
|
.join("work")
|
||||||
|
.join(stage_dir)
|
||||||
|
.join(format!("{stem}.md"));
|
||||||
|
std::fs::read_to_string(path)
|
||||||
|
.ok()
|
||||||
|
.and_then(|c| crate::io::story_metadata::parse_front_matter(&c).ok())
|
||||||
|
.and_then(|m| m.blocked)
|
||||||
|
.unwrap_or(false)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Choose the traffic-light dot for a work item.
|
||||||
|
///
|
||||||
|
/// Priority: blocked > throttled > running > idle.
|
||||||
|
/// Uses compact Unicode characters (not large emoji) so the output stays
|
||||||
|
/// readable in plain-text chat clients.
|
||||||
|
///
|
||||||
|
/// - `●` running normally (active agent, no throttle)
|
||||||
|
/// - `◑` throttled (rate-limit warning received)
|
||||||
|
/// - `✗` hard-blocked (retry limit exceeded)
|
||||||
|
/// - `○` idle / no active agent
|
||||||
|
pub(super) fn traffic_light_dot(blocked: bool, throttled: bool, has_agent: bool) -> &'static str {
|
||||||
|
if blocked {
|
||||||
|
"\u{2717} " // ✗ — hard blocked
|
||||||
|
} else if throttled {
|
||||||
|
"\u{25D1} " // ◑ — throttled
|
||||||
|
} else if has_agent {
|
||||||
|
"\u{25CF} " // ● — running normally
|
||||||
|
} else {
|
||||||
|
"\u{25CB} " // ○ — idle / no agent
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Read all story IDs and names from a pipeline stage directory.
|
/// Read all story IDs and names from a pipeline stage directory.
|
||||||
fn read_stage_items(
|
fn read_stage_items(
|
||||||
project_root: &std::path::Path,
|
project_root: &std::path::Path,
|
||||||
@@ -130,18 +168,22 @@ pub(super) fn build_pipeline_status(project_root: &std::path::Path, agents: &Age
|
|||||||
.filter(|&&c| c > 0.0)
|
.filter(|&&c| c > 0.0)
|
||||||
.map(|c| format!(" — ${c:.2}"))
|
.map(|c| format!(" — ${c:.2}"))
|
||||||
.unwrap_or_default();
|
.unwrap_or_default();
|
||||||
if let Some(agent) = active_map.get(story_id) {
|
let blocked = read_story_blocked(project_root, dir, story_id);
|
||||||
|
let agent = active_map.get(story_id);
|
||||||
|
let throttled = agent.map(|a| a.throttled).unwrap_or(false);
|
||||||
|
let dot = traffic_light_dot(blocked, throttled, agent.is_some());
|
||||||
|
if let Some(agent) = agent {
|
||||||
let model_str = config
|
let model_str = config
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.and_then(|cfg| cfg.find_agent(&agent.agent_name))
|
.and_then(|cfg| cfg.find_agent(&agent.agent_name))
|
||||||
.and_then(|ac| ac.model.as_deref())
|
.and_then(|ac| ac.model.as_deref())
|
||||||
.unwrap_or("?");
|
.unwrap_or("?");
|
||||||
out.push_str(&format!(
|
out.push_str(&format!(
|
||||||
" • {display}{cost_suffix} — {} ({model_str})\n",
|
" {dot}{display}{cost_suffix} — {} ({model_str})\n",
|
||||||
agent.agent_name
|
agent.agent_name
|
||||||
));
|
));
|
||||||
} else {
|
} else {
|
||||||
out.push_str(&format!(" • {display}{cost_suffix}\n"));
|
out.push_str(&format!(" {dot}{display}{cost_suffix}\n"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -399,4 +441,107 @@ mod tests {
|
|||||||
"output must show aggregated cost: {output}"
|
"output must show aggregated cost: {output}"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// -- traffic_light_dot --------------------------------------------------
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn dot_idle_when_no_agent() {
|
||||||
|
assert_eq!(traffic_light_dot(false, false, false), "\u{25CB} "); // ○
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn dot_running_when_agent_not_throttled() {
|
||||||
|
assert_eq!(traffic_light_dot(false, false, true), "\u{25CF} "); // ●
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn dot_throttled_when_agent_throttled() {
|
||||||
|
assert_eq!(traffic_light_dot(false, true, true), "\u{25D1} "); // ◑
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn dot_blocked_takes_priority_over_throttled() {
|
||||||
|
assert_eq!(traffic_light_dot(true, true, true), "\u{2717} "); // ✗
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn dot_blocked_when_no_agent_but_blocked_flag() {
|
||||||
|
assert_eq!(traffic_light_dot(true, false, false), "\u{2717} "); // ✗
|
||||||
|
}
|
||||||
|
|
||||||
|
// -- read_story_blocked --------------------------------------------------
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn read_story_blocked_returns_true_when_blocked() {
|
||||||
|
use tempfile::TempDir;
|
||||||
|
let tmp = TempDir::new().unwrap();
|
||||||
|
let stage_dir = tmp.path().join(".storkit/work/2_current");
|
||||||
|
std::fs::create_dir_all(&stage_dir).unwrap();
|
||||||
|
std::fs::write(
|
||||||
|
stage_dir.join("42_story_foo.md"),
|
||||||
|
"---\nname: Foo\nblocked: true\n---\n",
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
assert!(read_story_blocked(tmp.path(), "2_current", "42_story_foo"));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn read_story_blocked_returns_false_when_not_blocked() {
|
||||||
|
use tempfile::TempDir;
|
||||||
|
let tmp = TempDir::new().unwrap();
|
||||||
|
let stage_dir = tmp.path().join(".storkit/work/2_current");
|
||||||
|
std::fs::create_dir_all(&stage_dir).unwrap();
|
||||||
|
std::fs::write(
|
||||||
|
stage_dir.join("42_story_foo.md"),
|
||||||
|
"---\nname: Foo\n---\n",
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
assert!(!read_story_blocked(tmp.path(), "2_current", "42_story_foo"));
|
||||||
|
}
|
||||||
|
|
||||||
|
// -- status output shows idle dot for items with no active agent --------
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn status_shows_idle_dot_for_unassigned_story() {
|
||||||
|
use std::io::Write;
|
||||||
|
use tempfile::TempDir;
|
||||||
|
|
||||||
|
let tmp = TempDir::new().unwrap();
|
||||||
|
let stage_dir = tmp.path().join(".storkit/work/2_current");
|
||||||
|
std::fs::create_dir_all(&stage_dir).unwrap();
|
||||||
|
|
||||||
|
let story_path = stage_dir.join("42_story_idle.md");
|
||||||
|
let mut f = std::fs::File::create(&story_path).unwrap();
|
||||||
|
writeln!(f, "---\nname: Idle Story\n---\n").unwrap();
|
||||||
|
|
||||||
|
let agents = AgentPool::new_test(3000);
|
||||||
|
let output = build_pipeline_status(tmp.path(), &agents);
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
output.contains("\u{25CB} "), // ○
|
||||||
|
"idle story should show empty-circle dot: {output}"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn status_shows_blocked_dot_for_blocked_story() {
|
||||||
|
use std::io::Write;
|
||||||
|
use tempfile::TempDir;
|
||||||
|
|
||||||
|
let tmp = TempDir::new().unwrap();
|
||||||
|
let stage_dir = tmp.path().join(".storkit/work/2_current");
|
||||||
|
std::fs::create_dir_all(&stage_dir).unwrap();
|
||||||
|
|
||||||
|
let story_path = stage_dir.join("42_story_blocked.md");
|
||||||
|
let mut f = std::fs::File::create(&story_path).unwrap();
|
||||||
|
writeln!(f, "---\nname: Blocked Story\nblocked: true\n---\n").unwrap();
|
||||||
|
|
||||||
|
let agents = AgentPool::new_test(3000);
|
||||||
|
let output = build_pipeline_status(tmp.path(), &agents);
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
output.contains("\u{2717} "), // ✗
|
||||||
|
"blocked story should show X dot: {output}"
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user