huskies: merge 726_story_notify_chat_transports_when_oauth_account_swaps_or_all_accounts_are_exhausted

This commit is contained in:
dave
2026-04-27 18:39:35 +00:00
parent 80661fa622
commit 4b64bc614f
8 changed files with 187 additions and 4 deletions
@@ -25,6 +25,7 @@ pub async fn run_bot(
services: Arc<Services>,
watcher_rx: tokio::sync::broadcast::Receiver<crate::io::watcher::WatcherEvent>,
watcher_rx_auto: tokio::sync::broadcast::Receiver<crate::io::watcher::WatcherEvent>,
watcher_tx: tokio::sync::broadcast::Sender<crate::io::watcher::WatcherEvent>,
shutdown_rx: watch::Receiver<Option<crate::rebuild::ShutdownReason>>,
gateway_active_project: Option<Arc<RwLock<String>>>,
gateway_projects: Vec<String>,
@@ -227,9 +228,12 @@ pub async fn run_bot(
let announce_bot_name = services.bot_name.clone();
// Auto-schedule timers when an agent hits a hard rate limit.
// Also emits OAuthAccountSwapped / OAuthAccountsExhausted events back into
// the watcher channel so the notification listener can forward them to chat.
crate::service::timer::spawn_rate_limit_auto_scheduler(
Arc::clone(&timer_store),
watcher_rx_auto,
watcher_tx,
);
// Subscribe to the status broadcaster if the matrix_status_consumer toggle is
+1
View File
@@ -103,6 +103,7 @@ pub fn spawn_bot(
services,
watcher_rx,
watcher_rx_auto,
watcher_tx,
shutdown_rx,
gateway_active_project,
gateway_projects,
+12
View File
@@ -85,6 +85,18 @@ pub enum WatcherEvent {
/// UTC instant at which the rate limit resets.
reset_at: chrono::DateTime<chrono::Utc>,
},
/// An OAuth account pool swap succeeded: a different account is now active.
/// Triggers a notification to chat transports naming the new account.
OAuthAccountSwapped {
/// Email address of the newly activated account.
new_email: String,
},
/// All OAuth accounts in the pool are rate-limited — no swap was possible.
/// Triggers a notification to chat transports with the earliest reset time.
OAuthAccountsExhausted {
/// Human-readable message describing when the earliest reset occurs.
earliest_reset_msg: String,
},
}
/// Return `true` if `path` is the root-level `.huskies/project.toml` or
@@ -17,6 +17,10 @@ pub enum EventAction {
RateLimitWarning,
/// Post a story-blocked notification.
StoryBlocked,
/// Post an OAuth account-swap notification naming the new account.
OAuthAccountSwapped,
/// Post an OAuth accounts-exhausted notification with the earliest reset time.
OAuthAccountsExhausted,
/// Log server-side only; do not post to chat (e.g. hard rate-limit blocks).
LogOnly,
/// Reload the project configuration.
@@ -42,6 +46,8 @@ pub fn classify(event: &WatcherEvent) -> EventAction {
WatcherEvent::StoryBlocked { .. } => EventAction::StoryBlocked,
WatcherEvent::RateLimitHardBlock { .. } => EventAction::LogOnly,
WatcherEvent::ConfigChanged => EventAction::ReloadConfig,
WatcherEvent::OAuthAccountSwapped { .. } => EventAction::OAuthAccountSwapped,
WatcherEvent::OAuthAccountsExhausted { .. } => EventAction::OAuthAccountsExhausted,
_ => EventAction::Skip,
}
}
@@ -116,4 +122,20 @@ mod tests {
EventAction::ReloadConfig
);
}
#[test]
fn oauth_account_swapped_is_classified_correctly() {
let event = WatcherEvent::OAuthAccountSwapped {
new_email: "new@example.com".to_string(),
};
assert_eq!(classify(&event), EventAction::OAuthAccountSwapped);
}
#[test]
fn oauth_accounts_exhausted_is_classified_correctly() {
let event = WatcherEvent::OAuthAccountsExhausted {
earliest_reset_msg: "All accounts rate-limited; earliest reset in 2h".to_string(),
};
assert_eq!(classify(&event), EventAction::OAuthAccountsExhausted);
}
}
@@ -93,6 +93,27 @@ pub fn format_rate_limit_notification(
(plain, html)
}
/// Format an OAuth account-swap notification message.
///
/// Sent when the pool successfully rotates to a new account after a rate-limit.
/// Returns `(plain_text, html)` suitable for `ChatTransport::send_message`.
pub fn format_oauth_account_swapped(new_email: &str) -> (String, String) {
let plain = format!("\u{1f504} OAuth account rotated \u{2014} now using {new_email}");
let html =
format!("\u{1f504} OAuth account rotated \u{2014} now using <strong>{new_email}</strong>");
(plain, html)
}
/// Format an OAuth accounts-exhausted notification message.
///
/// Sent when all pool accounts are rate-limited and no swap was possible.
/// Returns `(plain_text, html)` suitable for `ChatTransport::send_message`.
pub fn format_oauth_accounts_exhausted(earliest_reset_msg: &str) -> (String, String) {
let plain = format!("\u{26d4} {earliest_reset_msg}");
let html = format!("\u{26d4} {earliest_reset_msg}");
(plain, html)
}
#[cfg(test)]
mod tests {
use super::*;
+105 -2
View File
@@ -18,8 +18,9 @@ use tokio::sync::broadcast;
use super::events::classify;
use super::filter::{STAGE_TRANSITION_DEBOUNCE, should_send_rate_limit};
use super::format::{
format_blocked_notification, format_error_notification, format_rate_limit_notification,
format_stage_notification, stage_display_name,
format_blocked_notification, format_error_notification, format_oauth_account_swapped,
format_oauth_accounts_exhausted, format_rate_limit_notification, format_stage_notification,
stage_display_name,
};
use super::route::rooms_for_notification;
@@ -245,6 +246,39 @@ pub fn spawn_notification_listener(
}
}
}
EventAction::OAuthAccountSwapped => {
let WatcherEvent::OAuthAccountSwapped { ref new_email } = event else {
continue;
};
let (plain, html) = format_oauth_account_swapped(new_email);
slog!("[bot] Sending OAuth account-swap notification: {plain}");
for room_id in &rooms_for_notification(&get_room_ids) {
if let Err(e) = transport.send_message(room_id, &plain, &html).await {
slog!(
"[bot] Failed to send OAuth account-swap notification \
to {room_id}: {e}"
);
}
}
}
EventAction::OAuthAccountsExhausted => {
let WatcherEvent::OAuthAccountsExhausted {
ref earliest_reset_msg,
} = event
else {
continue;
};
let (plain, html) = format_oauth_accounts_exhausted(earliest_reset_msg);
slog!("[bot] Sending OAuth accounts-exhausted notification: {plain}");
for room_id in &rooms_for_notification(&get_room_ids) {
if let Err(e) = transport.send_message(room_id, &plain, &html).await {
slog!(
"[bot] Failed to send OAuth accounts-exhausted notification \
to {room_id}: {e}"
);
}
}
}
EventAction::LogOnly => {
// Hard-block: log server-side for debugging; do NOT post to chat.
// Hard-block auto-resume is normal operation — the status command
@@ -872,6 +906,75 @@ mod tests {
);
}
// ── OAuthAccountSwapped / OAuthAccountsExhausted ────────────────────────
/// AC1: OAuthAccountSwapped fires a notification naming the new account.
#[tokio::test]
async fn oauth_account_swapped_sends_notification_with_new_email() {
let tmp = tempfile::tempdir().unwrap();
let (watcher_tx, watcher_rx) = broadcast::channel::<WatcherEvent>(16);
let (transport, calls) = MockTransport::new();
spawn_notification_listener(
transport,
|| vec!["!room1:example.org".to_string()],
watcher_rx,
tmp.path().to_path_buf(),
);
watcher_tx
.send(WatcherEvent::OAuthAccountSwapped {
new_email: "alice@example.com".to_string(),
})
.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let calls = calls.lock().unwrap();
assert_eq!(calls.len(), 1, "Expected exactly one notification");
let (room_id, plain, _html) = &calls[0];
assert_eq!(room_id, "!room1:example.org");
assert!(
plain.contains("alice@example.com"),
"notification should name the new account; got: {plain}"
);
}
/// AC2: OAuthAccountsExhausted fires a notification with the reset message.
#[tokio::test]
async fn oauth_accounts_exhausted_sends_notification_with_reset_msg() {
let tmp = tempfile::tempdir().unwrap();
let (watcher_tx, watcher_rx) = broadcast::channel::<WatcherEvent>(16);
let (transport, calls) = MockTransport::new();
spawn_notification_listener(
transport,
|| vec!["!room1:example.org".to_string()],
watcher_rx,
tmp.path().to_path_buf(),
);
watcher_tx
.send(WatcherEvent::OAuthAccountsExhausted {
earliest_reset_msg: "All OAuth accounts are rate-limited; earliest reset in 3h"
.to_string(),
})
.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let calls = calls.lock().unwrap();
assert_eq!(calls.len(), 1, "Expected exactly one notification");
let (room_id, plain, _html) = &calls[0];
assert_eq!(room_id, "!room1:example.org");
assert!(
plain.contains("rate-limited"),
"notification should contain reset message; got: {plain}"
);
}
/// Stories that skip QA (qa: server) move directly from Current to Merge.
/// The notification must say "Current → Merge", not "QA → Merge".
#[tokio::test]
+19 -2
View File
@@ -305,11 +305,17 @@ fn panic_payload_to_string(payload: &Box<dyn std::any::Any + Send>) -> String {
/// Spawn a background task that listens for [`WatcherEvent::RateLimitHardBlock`]
/// events and auto-schedules a timer for the blocked story.
///
/// When an OAuth account swap succeeds, emits [`WatcherEvent::OAuthAccountSwapped`]
/// so chat transports can notify the user which account took over. When all
/// accounts are exhausted, emits [`WatcherEvent::OAuthAccountsExhausted`] with
/// the earliest reset time.
///
/// If a timer already exists for the story, it is updated to the later reset time
/// rather than creating a duplicate (via [`TimerStore::upsert`]).
pub fn spawn_rate_limit_auto_scheduler(
store: Arc<TimerStore>,
mut watcher_rx: tokio::sync::broadcast::Receiver<crate::io::watcher::WatcherEvent>,
watcher_tx: tokio::sync::broadcast::Sender<crate::io::watcher::WatcherEvent>,
) {
tokio::spawn(async move {
loop {
@@ -344,6 +350,9 @@ pub fn spawn_rate_limit_auto_scheduler(
(agent {agent_name}): now using '{new_email}'. \
Auto-assign will restart the agent with the new account."
);
let _ = watcher_tx.send(
crate::io::watcher::WatcherEvent::OAuthAccountSwapped { new_email },
);
// No timer needed — auto-assign picks up the story.
continue;
}
@@ -352,6 +361,14 @@ pub fn spawn_rate_limit_auto_scheduler(
"[timer] Account swap not possible for story {story_id}: \
{swap_err}. Falling back to timer-based retry."
);
// Notify chat transports when all accounts are exhausted.
if swap_err.contains("All OAuth accounts are rate-limited") {
let _ = watcher_tx.send(
crate::io::watcher::WatcherEvent::OAuthAccountsExhausted {
earliest_reset_msg: swap_err.clone(),
},
);
}
}
}
@@ -561,7 +578,7 @@ mod tests {
let store = Arc::new(TimerStore::load(dir.path().join("timers.json")));
let (watcher_tx, watcher_rx) = tokio::sync::broadcast::channel::<WatcherEvent>(16);
spawn_rate_limit_auto_scheduler(Arc::clone(&store), watcher_rx);
spawn_rate_limit_auto_scheduler(Arc::clone(&store), watcher_rx, watcher_tx.clone());
let reset_at = Utc::now() + Duration::hours(1);
watcher_tx
@@ -591,7 +608,7 @@ mod tests {
let store = Arc::new(TimerStore::load(dir.path().join("timers.json")));
let (watcher_tx, watcher_rx) = tokio::sync::broadcast::channel::<WatcherEvent>(16);
spawn_rate_limit_auto_scheduler(Arc::clone(&store), watcher_rx);
spawn_rate_limit_auto_scheduler(Arc::clone(&store), watcher_rx, watcher_tx.clone());
let first = Utc::now() + Duration::hours(1);
let second = Utc::now() + Duration::hours(2);
+3
View File
@@ -193,6 +193,9 @@ pub fn watcher_event_to_response(e: WatcherEvent) -> Option<WsResponse> {
WatcherEvent::RateLimitWarning { .. } => None,
WatcherEvent::StoryBlocked { .. } => None,
WatcherEvent::RateLimitHardBlock { .. } => None,
// OAuth events are forwarded to chat transports only; no WebSocket message for the frontend.
WatcherEvent::OAuthAccountSwapped { .. } => None,
WatcherEvent::OAuthAccountsExhausted { .. } => None,
}
}