From 4b64bc614fd096c8a19546fad39f7b546c265e69 Mon Sep 17 00:00:00 2001 From: dave Date: Mon, 27 Apr 2026 18:39:35 +0000 Subject: [PATCH] huskies: merge 726_story_notify_chat_transports_when_oauth_account_swaps_or_all_accounts_are_exhausted --- server/src/chat/transport/matrix/bot/run.rs | 4 + server/src/chat/transport/matrix/mod.rs | 1 + server/src/io/watcher.rs | 12 +++ server/src/service/notifications/events.rs | 22 ++++ server/src/service/notifications/format.rs | 21 ++++ server/src/service/notifications/io.rs | 107 +++++++++++++++++++- server/src/service/timer/io.rs | 21 +++- server/src/service/ws/message.rs | 3 + 8 files changed, 187 insertions(+), 4 deletions(-) diff --git a/server/src/chat/transport/matrix/bot/run.rs b/server/src/chat/transport/matrix/bot/run.rs index 488ec4fe..c5541abb 100644 --- a/server/src/chat/transport/matrix/bot/run.rs +++ b/server/src/chat/transport/matrix/bot/run.rs @@ -25,6 +25,7 @@ pub async fn run_bot( services: Arc, watcher_rx: tokio::sync::broadcast::Receiver, watcher_rx_auto: tokio::sync::broadcast::Receiver, + watcher_tx: tokio::sync::broadcast::Sender, shutdown_rx: watch::Receiver>, gateway_active_project: Option>>, gateway_projects: Vec, @@ -227,9 +228,12 @@ pub async fn run_bot( let announce_bot_name = services.bot_name.clone(); // Auto-schedule timers when an agent hits a hard rate limit. + // Also emits OAuthAccountSwapped / OAuthAccountsExhausted events back into + // the watcher channel so the notification listener can forward them to chat. crate::service::timer::spawn_rate_limit_auto_scheduler( Arc::clone(&timer_store), watcher_rx_auto, + watcher_tx, ); // Subscribe to the status broadcaster if the matrix_status_consumer toggle is diff --git a/server/src/chat/transport/matrix/mod.rs b/server/src/chat/transport/matrix/mod.rs index 57b59a15..68135629 100644 --- a/server/src/chat/transport/matrix/mod.rs +++ b/server/src/chat/transport/matrix/mod.rs @@ -103,6 +103,7 @@ pub fn spawn_bot( services, watcher_rx, watcher_rx_auto, + watcher_tx, shutdown_rx, gateway_active_project, gateway_projects, diff --git a/server/src/io/watcher.rs b/server/src/io/watcher.rs index 570199bb..e19d5a39 100644 --- a/server/src/io/watcher.rs +++ b/server/src/io/watcher.rs @@ -85,6 +85,18 @@ pub enum WatcherEvent { /// UTC instant at which the rate limit resets. reset_at: chrono::DateTime, }, + /// An OAuth account pool swap succeeded: a different account is now active. + /// Triggers a notification to chat transports naming the new account. + OAuthAccountSwapped { + /// Email address of the newly activated account. + new_email: String, + }, + /// All OAuth accounts in the pool are rate-limited — no swap was possible. + /// Triggers a notification to chat transports with the earliest reset time. + OAuthAccountsExhausted { + /// Human-readable message describing when the earliest reset occurs. + earliest_reset_msg: String, + }, } /// Return `true` if `path` is the root-level `.huskies/project.toml` or diff --git a/server/src/service/notifications/events.rs b/server/src/service/notifications/events.rs index 340582df..67bcbc17 100644 --- a/server/src/service/notifications/events.rs +++ b/server/src/service/notifications/events.rs @@ -17,6 +17,10 @@ pub enum EventAction { RateLimitWarning, /// Post a story-blocked notification. StoryBlocked, + /// Post an OAuth account-swap notification naming the new account. + OAuthAccountSwapped, + /// Post an OAuth accounts-exhausted notification with the earliest reset time. + OAuthAccountsExhausted, /// Log server-side only; do not post to chat (e.g. hard rate-limit blocks). LogOnly, /// Reload the project configuration. @@ -42,6 +46,8 @@ pub fn classify(event: &WatcherEvent) -> EventAction { WatcherEvent::StoryBlocked { .. } => EventAction::StoryBlocked, WatcherEvent::RateLimitHardBlock { .. } => EventAction::LogOnly, WatcherEvent::ConfigChanged => EventAction::ReloadConfig, + WatcherEvent::OAuthAccountSwapped { .. } => EventAction::OAuthAccountSwapped, + WatcherEvent::OAuthAccountsExhausted { .. } => EventAction::OAuthAccountsExhausted, _ => EventAction::Skip, } } @@ -116,4 +122,20 @@ mod tests { EventAction::ReloadConfig ); } + + #[test] + fn oauth_account_swapped_is_classified_correctly() { + let event = WatcherEvent::OAuthAccountSwapped { + new_email: "new@example.com".to_string(), + }; + assert_eq!(classify(&event), EventAction::OAuthAccountSwapped); + } + + #[test] + fn oauth_accounts_exhausted_is_classified_correctly() { + let event = WatcherEvent::OAuthAccountsExhausted { + earliest_reset_msg: "All accounts rate-limited; earliest reset in 2h".to_string(), + }; + assert_eq!(classify(&event), EventAction::OAuthAccountsExhausted); + } } diff --git a/server/src/service/notifications/format.rs b/server/src/service/notifications/format.rs index c75f6b1b..d8cb3182 100644 --- a/server/src/service/notifications/format.rs +++ b/server/src/service/notifications/format.rs @@ -93,6 +93,27 @@ pub fn format_rate_limit_notification( (plain, html) } +/// Format an OAuth account-swap notification message. +/// +/// Sent when the pool successfully rotates to a new account after a rate-limit. +/// Returns `(plain_text, html)` suitable for `ChatTransport::send_message`. +pub fn format_oauth_account_swapped(new_email: &str) -> (String, String) { + let plain = format!("\u{1f504} OAuth account rotated \u{2014} now using {new_email}"); + let html = + format!("\u{1f504} OAuth account rotated \u{2014} now using {new_email}"); + (plain, html) +} + +/// Format an OAuth accounts-exhausted notification message. +/// +/// Sent when all pool accounts are rate-limited and no swap was possible. +/// Returns `(plain_text, html)` suitable for `ChatTransport::send_message`. +pub fn format_oauth_accounts_exhausted(earliest_reset_msg: &str) -> (String, String) { + let plain = format!("\u{26d4} {earliest_reset_msg}"); + let html = format!("\u{26d4} {earliest_reset_msg}"); + (plain, html) +} + #[cfg(test)] mod tests { use super::*; diff --git a/server/src/service/notifications/io.rs b/server/src/service/notifications/io.rs index 04556172..d3b69395 100644 --- a/server/src/service/notifications/io.rs +++ b/server/src/service/notifications/io.rs @@ -18,8 +18,9 @@ use tokio::sync::broadcast; use super::events::classify; use super::filter::{STAGE_TRANSITION_DEBOUNCE, should_send_rate_limit}; use super::format::{ - format_blocked_notification, format_error_notification, format_rate_limit_notification, - format_stage_notification, stage_display_name, + format_blocked_notification, format_error_notification, format_oauth_account_swapped, + format_oauth_accounts_exhausted, format_rate_limit_notification, format_stage_notification, + stage_display_name, }; use super::route::rooms_for_notification; @@ -245,6 +246,39 @@ pub fn spawn_notification_listener( } } } + EventAction::OAuthAccountSwapped => { + let WatcherEvent::OAuthAccountSwapped { ref new_email } = event else { + continue; + }; + let (plain, html) = format_oauth_account_swapped(new_email); + slog!("[bot] Sending OAuth account-swap notification: {plain}"); + for room_id in &rooms_for_notification(&get_room_ids) { + if let Err(e) = transport.send_message(room_id, &plain, &html).await { + slog!( + "[bot] Failed to send OAuth account-swap notification \ + to {room_id}: {e}" + ); + } + } + } + EventAction::OAuthAccountsExhausted => { + let WatcherEvent::OAuthAccountsExhausted { + ref earliest_reset_msg, + } = event + else { + continue; + }; + let (plain, html) = format_oauth_accounts_exhausted(earliest_reset_msg); + slog!("[bot] Sending OAuth accounts-exhausted notification: {plain}"); + for room_id in &rooms_for_notification(&get_room_ids) { + if let Err(e) = transport.send_message(room_id, &plain, &html).await { + slog!( + "[bot] Failed to send OAuth accounts-exhausted notification \ + to {room_id}: {e}" + ); + } + } + } EventAction::LogOnly => { // Hard-block: log server-side for debugging; do NOT post to chat. // Hard-block auto-resume is normal operation — the status command @@ -872,6 +906,75 @@ mod tests { ); } + // ── OAuthAccountSwapped / OAuthAccountsExhausted ──────────────────────── + + /// AC1: OAuthAccountSwapped fires a notification naming the new account. + #[tokio::test] + async fn oauth_account_swapped_sends_notification_with_new_email() { + let tmp = tempfile::tempdir().unwrap(); + + let (watcher_tx, watcher_rx) = broadcast::channel::(16); + let (transport, calls) = MockTransport::new(); + + spawn_notification_listener( + transport, + || vec!["!room1:example.org".to_string()], + watcher_rx, + tmp.path().to_path_buf(), + ); + + watcher_tx + .send(WatcherEvent::OAuthAccountSwapped { + new_email: "alice@example.com".to_string(), + }) + .unwrap(); + + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + let calls = calls.lock().unwrap(); + assert_eq!(calls.len(), 1, "Expected exactly one notification"); + let (room_id, plain, _html) = &calls[0]; + assert_eq!(room_id, "!room1:example.org"); + assert!( + plain.contains("alice@example.com"), + "notification should name the new account; got: {plain}" + ); + } + + /// AC2: OAuthAccountsExhausted fires a notification with the reset message. + #[tokio::test] + async fn oauth_accounts_exhausted_sends_notification_with_reset_msg() { + let tmp = tempfile::tempdir().unwrap(); + + let (watcher_tx, watcher_rx) = broadcast::channel::(16); + let (transport, calls) = MockTransport::new(); + + spawn_notification_listener( + transport, + || vec!["!room1:example.org".to_string()], + watcher_rx, + tmp.path().to_path_buf(), + ); + + watcher_tx + .send(WatcherEvent::OAuthAccountsExhausted { + earliest_reset_msg: "All OAuth accounts are rate-limited; earliest reset in 3h" + .to_string(), + }) + .unwrap(); + + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + let calls = calls.lock().unwrap(); + assert_eq!(calls.len(), 1, "Expected exactly one notification"); + let (room_id, plain, _html) = &calls[0]; + assert_eq!(room_id, "!room1:example.org"); + assert!( + plain.contains("rate-limited"), + "notification should contain reset message; got: {plain}" + ); + } + /// Stories that skip QA (qa: server) move directly from Current to Merge. /// The notification must say "Current → Merge", not "QA → Merge". #[tokio::test] diff --git a/server/src/service/timer/io.rs b/server/src/service/timer/io.rs index 83b43cb2..b19273df 100644 --- a/server/src/service/timer/io.rs +++ b/server/src/service/timer/io.rs @@ -305,11 +305,17 @@ fn panic_payload_to_string(payload: &Box) -> String { /// Spawn a background task that listens for [`WatcherEvent::RateLimitHardBlock`] /// events and auto-schedules a timer for the blocked story. /// +/// When an OAuth account swap succeeds, emits [`WatcherEvent::OAuthAccountSwapped`] +/// so chat transports can notify the user which account took over. When all +/// accounts are exhausted, emits [`WatcherEvent::OAuthAccountsExhausted`] with +/// the earliest reset time. +/// /// If a timer already exists for the story, it is updated to the later reset time /// rather than creating a duplicate (via [`TimerStore::upsert`]). pub fn spawn_rate_limit_auto_scheduler( store: Arc, mut watcher_rx: tokio::sync::broadcast::Receiver, + watcher_tx: tokio::sync::broadcast::Sender, ) { tokio::spawn(async move { loop { @@ -344,6 +350,9 @@ pub fn spawn_rate_limit_auto_scheduler( (agent {agent_name}): now using '{new_email}'. \ Auto-assign will restart the agent with the new account." ); + let _ = watcher_tx.send( + crate::io::watcher::WatcherEvent::OAuthAccountSwapped { new_email }, + ); // No timer needed — auto-assign picks up the story. continue; } @@ -352,6 +361,14 @@ pub fn spawn_rate_limit_auto_scheduler( "[timer] Account swap not possible for story {story_id}: \ {swap_err}. Falling back to timer-based retry." ); + // Notify chat transports when all accounts are exhausted. + if swap_err.contains("All OAuth accounts are rate-limited") { + let _ = watcher_tx.send( + crate::io::watcher::WatcherEvent::OAuthAccountsExhausted { + earliest_reset_msg: swap_err.clone(), + }, + ); + } } } @@ -561,7 +578,7 @@ mod tests { let store = Arc::new(TimerStore::load(dir.path().join("timers.json"))); let (watcher_tx, watcher_rx) = tokio::sync::broadcast::channel::(16); - spawn_rate_limit_auto_scheduler(Arc::clone(&store), watcher_rx); + spawn_rate_limit_auto_scheduler(Arc::clone(&store), watcher_rx, watcher_tx.clone()); let reset_at = Utc::now() + Duration::hours(1); watcher_tx @@ -591,7 +608,7 @@ mod tests { let store = Arc::new(TimerStore::load(dir.path().join("timers.json"))); let (watcher_tx, watcher_rx) = tokio::sync::broadcast::channel::(16); - spawn_rate_limit_auto_scheduler(Arc::clone(&store), watcher_rx); + spawn_rate_limit_auto_scheduler(Arc::clone(&store), watcher_rx, watcher_tx.clone()); let first = Utc::now() + Duration::hours(1); let second = Utc::now() + Duration::hours(2); diff --git a/server/src/service/ws/message.rs b/server/src/service/ws/message.rs index 174b5a27..ec6d9d99 100644 --- a/server/src/service/ws/message.rs +++ b/server/src/service/ws/message.rs @@ -193,6 +193,9 @@ pub fn watcher_event_to_response(e: WatcherEvent) -> Option { WatcherEvent::RateLimitWarning { .. } => None, WatcherEvent::StoryBlocked { .. } => None, WatcherEvent::RateLimitHardBlock { .. } => None, + // OAuth events are forwarded to chat transports only; no WebSocket message for the frontend. + WatcherEvent::OAuthAccountSwapped { .. } => None, + WatcherEvent::OAuthAccountsExhausted { .. } => None, } }