feat(423): auto-schedule timer on rate limit to resume after reset
- pty.rs: detect rate_limit_event hard blocks, parse reset_at, emit WatcherEvent::RateLimitHardBlock with story_id, agent_name, reset_at - watcher.rs: add RateLimitHardBlock variant to WatcherEvent enum - timer.rs: add TimerStore::upsert (add-or-update-to-later) and spawn_rate_limit_auto_scheduler (listens for RateLimitHardBlock, upserts timer for the blocked story) - notifications.rs: handle RateLimitHardBlock events with a debounced chat notification including the scheduled resume time; add format_rate_limit_hard_block_notification helper - matrix/mod.rs: subscribe second watcher_rx for auto-scheduler, pass it to run_bot - matrix/bot/run.rs: wire spawn_rate_limit_auto_scheduler into bot startup Tests cover: AC1 (hard block detection in pty), AC2 (auto-scheduler adds timer), AC3 (upsert deduplication), AC5 (chat notification sent), AC6 (worktree preserved — timer fires start_agent on existing worktree)
This commit is contained in:
+102
-7
@@ -347,13 +347,49 @@ fn run_agent_pty_blocking(
|
||||
// The raw JSON is still forwarded as AgentJson below.
|
||||
"assistant" | "user" => {}
|
||||
"rate_limit_event" => {
|
||||
slog!(
|
||||
"[agent:{story_id}:{agent_name}] API rate limit warning received"
|
||||
);
|
||||
let _ = watcher_tx.send(WatcherEvent::RateLimitWarning {
|
||||
story_id: story_id.to_string(),
|
||||
agent_name: agent_name.to_string(),
|
||||
});
|
||||
let rate_limit_info = json.get("rate_limit_info");
|
||||
let status = rate_limit_info
|
||||
.and_then(|i| i.get("status"))
|
||||
.and_then(|s| s.as_str())
|
||||
.unwrap_or("");
|
||||
let is_hard_block = !status.is_empty() && status != "allowed_warning";
|
||||
let reset_at = rate_limit_info
|
||||
.and_then(|i| i.get("reset_at"))
|
||||
.and_then(|r| r.as_str())
|
||||
.and_then(|r| chrono::DateTime::parse_from_rfc3339(r).ok())
|
||||
.map(|dt| dt.with_timezone(&chrono::Utc));
|
||||
|
||||
if is_hard_block {
|
||||
if let Some(reset_at) = reset_at {
|
||||
slog!(
|
||||
"[agent:{story_id}:{agent_name}] API rate limit hard block \
|
||||
(status={status}); resets at {reset_at}"
|
||||
);
|
||||
let _ = watcher_tx.send(WatcherEvent::RateLimitHardBlock {
|
||||
story_id: story_id.to_string(),
|
||||
agent_name: agent_name.to_string(),
|
||||
reset_at,
|
||||
});
|
||||
} else {
|
||||
slog!(
|
||||
"[agent:{story_id}:{agent_name}] API rate limit hard block \
|
||||
(status={status}); no reset_at in rate_limit_info"
|
||||
);
|
||||
let _ = watcher_tx.send(WatcherEvent::RateLimitWarning {
|
||||
story_id: story_id.to_string(),
|
||||
agent_name: agent_name.to_string(),
|
||||
});
|
||||
}
|
||||
} else {
|
||||
slog!(
|
||||
"[agent:{story_id}:{agent_name}] API rate limit warning received \
|
||||
(status={status})"
|
||||
);
|
||||
let _ = watcher_tx.send(WatcherEvent::RateLimitWarning {
|
||||
story_id: story_id.to_string(),
|
||||
agent_name: agent_name.to_string(),
|
||||
});
|
||||
}
|
||||
}
|
||||
"result" => {
|
||||
// Extract token usage from the result event.
|
||||
@@ -468,6 +504,65 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
/// AC1: hard block with `reset_at` emits `RateLimitHardBlock` with the
|
||||
/// correct story_id, agent_name, and parsed reset_at timestamp.
|
||||
#[tokio::test]
|
||||
async fn rate_limit_hard_block_sends_watcher_hard_block_event() {
|
||||
use std::os::unix::fs::PermissionsExt;
|
||||
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
let script = tmp.path().join("emit_hard_block.sh");
|
||||
std::fs::write(
|
||||
&script,
|
||||
"#!/bin/sh\nprintf '%s\\n' '{\"type\":\"rate_limit_event\",\"rate_limit_info\":{\"status\":\"hard_block\",\"reset_at\":\"2099-01-01T12:00:00Z\"}}'\n",
|
||||
)
|
||||
.unwrap();
|
||||
std::fs::set_permissions(&script, std::fs::Permissions::from_mode(0o755)).unwrap();
|
||||
|
||||
let (tx, _rx) = broadcast::channel::<AgentEvent>(64);
|
||||
let (watcher_tx, mut watcher_rx) = broadcast::channel::<WatcherEvent>(16);
|
||||
let event_log = Arc::new(Mutex::new(Vec::new()));
|
||||
let child_killers = Arc::new(Mutex::new(HashMap::new()));
|
||||
|
||||
let result = run_agent_pty_streaming(
|
||||
"423_story_rate_limit",
|
||||
"coder-1",
|
||||
"sh",
|
||||
&[script.to_string_lossy().to_string()],
|
||||
"--",
|
||||
"/tmp",
|
||||
&tx,
|
||||
&event_log,
|
||||
None,
|
||||
0,
|
||||
child_killers,
|
||||
watcher_tx,
|
||||
)
|
||||
.await;
|
||||
|
||||
assert!(result.is_ok(), "PTY run should succeed: {:?}", result.err());
|
||||
|
||||
let evt = watcher_rx
|
||||
.try_recv()
|
||||
.expect("Expected a RateLimitHardBlock to be sent on watcher_tx");
|
||||
match evt {
|
||||
WatcherEvent::RateLimitHardBlock {
|
||||
story_id,
|
||||
agent_name,
|
||||
reset_at,
|
||||
} => {
|
||||
assert_eq!(story_id, "423_story_rate_limit");
|
||||
assert_eq!(agent_name, "coder-1");
|
||||
assert_eq!(
|
||||
reset_at.to_rfc3339(),
|
||||
"2099-01-01T12:00:00+00:00",
|
||||
"reset_at should match the parsed timestamp"
|
||||
);
|
||||
}
|
||||
other => panic!("Expected RateLimitHardBlock, got: {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_emit_event_writes_to_log_writer() {
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
|
||||
@@ -83,6 +83,31 @@ impl TimerStore {
|
||||
self.timers.lock().unwrap().clone()
|
||||
}
|
||||
|
||||
/// Add or update a timer for `story_id`.
|
||||
///
|
||||
/// - If no timer exists for `story_id`, adds it.
|
||||
/// - If a timer already exists and `scheduled_at` is **later**, updates it.
|
||||
/// - If a timer already exists and `scheduled_at` is earlier or equal, no-op.
|
||||
///
|
||||
/// Use this instead of [`add`] when auto-scheduling from rate-limit events to
|
||||
/// avoid creating duplicates and to always keep the latest reset time.
|
||||
pub fn upsert(&self, story_id: String, scheduled_at: DateTime<Utc>) -> Result<(), String> {
|
||||
let mut timers = self.timers.lock().unwrap();
|
||||
if let Some(existing) = timers.iter_mut().find(|t| t.story_id == story_id) {
|
||||
if scheduled_at > existing.scheduled_at {
|
||||
existing.scheduled_at = scheduled_at;
|
||||
Self::save_locked(&self.path, &timers)?;
|
||||
}
|
||||
} else {
|
||||
timers.push(TimerEntry {
|
||||
story_id,
|
||||
scheduled_at,
|
||||
});
|
||||
Self::save_locked(&self.path, &timers)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Remove and return all timers whose `scheduled_at` is ≤ `now`.
|
||||
/// Persists the updated list to disk if any timers were removed.
|
||||
pub fn take_due(&self, now: DateTime<Utc>) -> Vec<TimerEntry> {
|
||||
@@ -150,6 +175,58 @@ pub fn spawn_timer_tick_loop(
|
||||
});
|
||||
}
|
||||
|
||||
/// Spawn a background task that listens for [`WatcherEvent::RateLimitHardBlock`]
|
||||
/// events and auto-schedules a timer for the blocked story.
|
||||
///
|
||||
/// If a timer already exists for the story, it is updated to the later reset time
|
||||
/// rather than creating a duplicate (via [`TimerStore::upsert`]).
|
||||
pub fn spawn_rate_limit_auto_scheduler(
|
||||
store: Arc<TimerStore>,
|
||||
mut watcher_rx: tokio::sync::broadcast::Receiver<crate::io::watcher::WatcherEvent>,
|
||||
) {
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
match watcher_rx.recv().await {
|
||||
Ok(crate::io::watcher::WatcherEvent::RateLimitHardBlock {
|
||||
story_id,
|
||||
agent_name,
|
||||
reset_at,
|
||||
}) => {
|
||||
crate::slog!(
|
||||
"[timer] Auto-scheduling timer for story {story_id} \
|
||||
(agent {agent_name}) to resume at {reset_at}"
|
||||
);
|
||||
match store.upsert(story_id.clone(), reset_at) {
|
||||
Ok(()) => {
|
||||
crate::slog!(
|
||||
"[timer] Timer upserted for story {story_id}; \
|
||||
scheduled at {reset_at}"
|
||||
);
|
||||
}
|
||||
Err(e) => {
|
||||
crate::slog!(
|
||||
"[timer] Failed to upsert timer for story {story_id}: {e}"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(_) => {}
|
||||
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
|
||||
crate::slog!(
|
||||
"[timer] Rate-limit auto-scheduler lagged, skipped {n} events"
|
||||
);
|
||||
}
|
||||
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
|
||||
crate::slog!(
|
||||
"[timer] Watcher channel closed, stopping rate-limit auto-scheduler"
|
||||
);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// ── Command types ──────────────────────────────────────────────────────────
|
||||
|
||||
/// A parsed `timer` command.
|
||||
@@ -602,6 +679,117 @@ mod tests {
|
||||
assert_eq!(store.list()[0].story_id, "future_story");
|
||||
}
|
||||
|
||||
// ── AC3: upsert ─────────────────────────────────────────────────────

#[test]
fn upsert_adds_new_timer_when_none_exists() {
    let dir = TempDir::new().unwrap();
    let store = TimerStore::load(dir.path().join("timers.json"));

    // Upserting into an empty store behaves like a plain add.
    let when = Utc::now() + Duration::hours(1);
    store.upsert("story_1".to_string(), when).unwrap();

    let timers = store.list();
    assert_eq!(timers.len(), 1);
    assert_eq!(timers[0].story_id, "story_1");
    assert_eq!(timers[0].scheduled_at, when);
}
|
||||
|
||||
#[test]
fn upsert_updates_to_later_time() {
    let dir = TempDir::new().unwrap();
    let store = TimerStore::load(dir.path().join("timers.json"));
    let earlier = Utc::now() + Duration::hours(1);
    let later = Utc::now() + Duration::hours(2);

    // A second upsert carrying a later time must move the single
    // existing entry forward instead of adding a second one.
    store.upsert("story_1".to_string(), earlier).unwrap();
    store.upsert("story_1".to_string(), later).unwrap();

    let timers = store.list();
    assert_eq!(timers.len(), 1, "should not create duplicate");
    assert_eq!(timers[0].scheduled_at, later, "should update to later time");
}
|
||||
|
||||
#[test]
fn upsert_does_not_downgrade_to_earlier_time() {
    let dir = TempDir::new().unwrap();
    let store = TimerStore::load(dir.path().join("timers.json"));
    let late = Utc::now() + Duration::hours(2);
    let early = Utc::now() + Duration::hours(1);

    // An earlier reset time must never move an existing timer backwards.
    store.upsert("story_1".to_string(), late).unwrap();
    store.upsert("story_1".to_string(), early).unwrap();

    let timers = store.list();
    assert_eq!(timers.len(), 1);
    assert_eq!(
        timers[0].scheduled_at, late,
        "should keep the later time, not downgrade"
    );
}
|
||||
|
||||
// ── AC2: spawn_rate_limit_auto_scheduler ────────────────────────────
|
||||
|
||||
/// AC2: a RateLimitHardBlock event causes the auto-scheduler to add a timer.
|
||||
#[tokio::test]
|
||||
async fn rate_limit_auto_scheduler_adds_timer_on_hard_block() {
|
||||
use crate::io::watcher::WatcherEvent;
|
||||
|
||||
let dir = TempDir::new().unwrap();
|
||||
let store = Arc::new(TimerStore::load(dir.path().join("timers.json")));
|
||||
let (watcher_tx, watcher_rx) = tokio::sync::broadcast::channel::<WatcherEvent>(16);
|
||||
|
||||
spawn_rate_limit_auto_scheduler(Arc::clone(&store), watcher_rx);
|
||||
|
||||
let reset_at = Utc::now() + Duration::hours(1);
|
||||
watcher_tx
|
||||
.send(WatcherEvent::RateLimitHardBlock {
|
||||
story_id: "423_story_test".to_string(),
|
||||
agent_name: "coder-1".to_string(),
|
||||
reset_at,
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
// Give the spawned task time to process the event.
|
||||
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
||||
|
||||
let list = store.list();
|
||||
assert_eq!(list.len(), 1, "expected one timer after hard block");
|
||||
assert_eq!(list[0].story_id, "423_story_test");
|
||||
assert_eq!(list[0].scheduled_at, reset_at);
|
||||
}
|
||||
|
||||
/// AC3 integration: a second hard block with a later reset_at updates the
|
||||
/// existing timer rather than creating a duplicate.
|
||||
#[tokio::test]
|
||||
async fn rate_limit_auto_scheduler_upserts_on_repeated_hard_block() {
|
||||
use crate::io::watcher::WatcherEvent;
|
||||
|
||||
let dir = TempDir::new().unwrap();
|
||||
let store = Arc::new(TimerStore::load(dir.path().join("timers.json")));
|
||||
let (watcher_tx, watcher_rx) = tokio::sync::broadcast::channel::<WatcherEvent>(16);
|
||||
|
||||
spawn_rate_limit_auto_scheduler(Arc::clone(&store), watcher_rx);
|
||||
|
||||
let first = Utc::now() + Duration::hours(1);
|
||||
let second = Utc::now() + Duration::hours(2);
|
||||
|
||||
watcher_tx
|
||||
.send(WatcherEvent::RateLimitHardBlock {
|
||||
story_id: "423_story_test".to_string(),
|
||||
agent_name: "coder-1".to_string(),
|
||||
reset_at: first,
|
||||
})
|
||||
.unwrap();
|
||||
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
|
||||
watcher_tx
|
||||
.send(WatcherEvent::RateLimitHardBlock {
|
||||
story_id: "423_story_test".to_string(),
|
||||
agent_name: "coder-1".to_string(),
|
||||
reset_at: second,
|
||||
})
|
||||
.unwrap();
|
||||
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
||||
|
||||
let list = store.list();
|
||||
assert_eq!(list.len(), 1, "should not create a duplicate timer");
|
||||
assert_eq!(list[0].scheduled_at, second, "should update to later time");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn multiple_timers_same_time_all_returned() {
|
||||
let dir = TempDir::new().unwrap();
|
||||
|
||||
@@ -22,6 +22,7 @@ pub async fn run_bot(
|
||||
config: super::super::config::BotConfig,
|
||||
project_root: PathBuf,
|
||||
watcher_rx: tokio::sync::broadcast::Receiver<crate::io::watcher::WatcherEvent>,
|
||||
watcher_rx_auto: tokio::sync::broadcast::Receiver<crate::io::watcher::WatcherEvent>,
|
||||
perm_rx: Arc<TokioMutex<mpsc::UnboundedReceiver<crate::http::context::PermissionForward>>>,
|
||||
agents: Arc<AgentPool>,
|
||||
shutdown_rx: watch::Receiver<Option<crate::rebuild::ShutdownReason>>,
|
||||
@@ -224,6 +225,11 @@ pub async fn run_bot(
|
||||
Arc::clone(&agents),
|
||||
project_root.clone(),
|
||||
);
|
||||
// Auto-schedule timers when an agent hits a hard rate limit.
|
||||
crate::chat::timer::spawn_rate_limit_auto_scheduler(
|
||||
Arc::clone(&timer_store),
|
||||
watcher_rx_auto,
|
||||
);
|
||||
|
||||
let ctx = BotContext {
|
||||
bot_user_id,
|
||||
|
||||
@@ -90,8 +90,11 @@ pub fn spawn_bot(
|
||||
|
||||
let root = project_root.to_path_buf();
|
||||
let watcher_rx = watcher_tx.subscribe();
|
||||
let watcher_rx_auto = watcher_tx.subscribe();
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = bot::run_bot(config, root, watcher_rx, perm_rx, agents, shutdown_rx).await
|
||||
if let Err(e) =
|
||||
bot::run_bot(config, root, watcher_rx, watcher_rx_auto, perm_rx, agents, shutdown_rx)
|
||||
.await
|
||||
{
|
||||
crate::slog!("[matrix-bot] Fatal error: {e}");
|
||||
}
|
||||
|
||||
@@ -136,6 +136,31 @@ pub fn format_blocked_notification(
|
||||
/// Minimum time between rate-limit notifications for the same agent.
|
||||
const RATE_LIMIT_DEBOUNCE: Duration = Duration::from_secs(60);
|
||||
|
||||
/// Format a rate limit hard block notification message with scheduled resume time.
|
||||
///
|
||||
/// Returns `(plain_text, html)` suitable for `ChatTransport::send_message`.
|
||||
pub fn format_rate_limit_hard_block_notification(
|
||||
item_id: &str,
|
||||
story_name: Option<&str>,
|
||||
agent_name: &str,
|
||||
resume_at: chrono::DateTime<chrono::Utc>,
|
||||
) -> (String, String) {
|
||||
let number = extract_story_number(item_id).unwrap_or(item_id);
|
||||
let name = story_name.unwrap_or(item_id);
|
||||
let local_time = resume_at.with_timezone(&chrono::Local);
|
||||
let resume_str = local_time.format("%Y-%m-%d %H:%M").to_string();
|
||||
|
||||
let plain = format!(
|
||||
"\u{1f6d1} #{number} {name} \u{2014} {agent_name} hit a hard rate limit; \
|
||||
will auto-resume at {resume_str}"
|
||||
);
|
||||
let html = format!(
|
||||
"\u{1f6d1} <strong>#{number}</strong> <em>{name}</em> \u{2014} \
|
||||
{agent_name} hit a hard rate limit; will auto-resume at {resume_str}"
|
||||
);
|
||||
(plain, html)
|
||||
}
|
||||
|
||||
/// Format a rate limit warning notification message.
|
||||
///
|
||||
/// Returns `(plain_text, html)` suitable for `ChatTransport::send_message`.
|
||||
@@ -288,6 +313,45 @@ pub fn spawn_notification_listener(
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(WatcherEvent::RateLimitHardBlock {
|
||||
ref story_id,
|
||||
ref agent_name,
|
||||
reset_at,
|
||||
}) => {
|
||||
// Debounce: reuse the same key as RateLimitWarning so both
|
||||
// types are rate-limited together for the same agent.
|
||||
let debounce_key = format!("{story_id}:{agent_name}");
|
||||
let now = Instant::now();
|
||||
if let Some(&last) = rate_limit_last_notified.get(&debounce_key)
|
||||
&& now.duration_since(last) < RATE_LIMIT_DEBOUNCE
|
||||
{
|
||||
slog!(
|
||||
"[bot] Rate-limit hard-block notification debounced for \
|
||||
{story_id}:{agent_name}"
|
||||
);
|
||||
continue;
|
||||
}
|
||||
rate_limit_last_notified.insert(debounce_key, now);
|
||||
|
||||
let story_name = find_story_name_any_stage(&project_root, story_id);
|
||||
let (plain, html) = format_rate_limit_hard_block_notification(
|
||||
story_id,
|
||||
story_name.as_deref(),
|
||||
agent_name,
|
||||
reset_at,
|
||||
);
|
||||
|
||||
slog!("[bot] Sending rate-limit hard-block notification: {plain}");
|
||||
|
||||
for room_id in &get_room_ids() {
|
||||
if let Err(e) = transport.send_message(room_id, &plain, &html).await {
|
||||
slog!(
|
||||
"[bot] Failed to send rate-limit hard-block notification \
|
||||
to {room_id}: {e}"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(_) => {} // Ignore non-work-item events
|
||||
Err(broadcast::error::RecvError::Lagged(n)) => {
|
||||
slog!(
|
||||
|
||||
@@ -75,6 +75,16 @@ pub enum WatcherEvent {
|
||||
/// Human-readable reason the story was blocked.
|
||||
reason: String,
|
||||
},
|
||||
/// An agent hit a hard API rate limit and will be blocked until `reset_at`.
|
||||
/// Triggers auto-scheduling of a timer and a notification with the resume time.
|
||||
RateLimitHardBlock {
|
||||
/// Work item ID the agent is working on.
|
||||
story_id: String,
|
||||
/// Name of the agent that hit the hard rate limit.
|
||||
agent_name: String,
|
||||
/// UTC instant at which the rate limit resets.
|
||||
reset_at: chrono::DateTime<chrono::Utc>,
|
||||
},
|
||||
}
|
||||
|
||||
/// Return `true` if `path` is the root-level `.storkit/project.toml`, i.e.
|
||||
|
||||
Reference in New Issue
Block a user