diff --git a/server/src/llm/oauth.rs b/server/src/llm/oauth.rs
index 38616ee7..76ec4ee5 100644
--- a/server/src/llm/oauth.rs
+++ b/server/src/llm/oauth.rs
@@ -114,6 +114,11 @@ pub struct PoolAccount {
     /// Whether the server has observed a rate-limit response for this account.
     #[serde(default)]
     pub rate_limited: bool,
+    /// Unix-epoch milliseconds when the rate limit resets, if known.
+    ///
+    /// Used to compute "time until earliest reset" when all accounts are exhausted.
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub rate_limit_reset_at: Option<u64>,
 }
 
 /// The multi-account login pool, stored in `~/.claude/oauth_pool.json`.
@@ -173,22 +178,10 @@ pub fn upsert_pool_account(account: PoolAccount) -> Result<(), String> {
     write_pool(&pool)
 }
 
-/// Refresh the OAuth access token using the stored refresh token.
+/// Perform a token refresh HTTP call using `refresh_token`.
 ///
-/// On success, updates `~/.claude/.credentials.json` with the new access
-/// token and expiry, then returns `Ok(())`.
-///
-/// On failure (e.g. refresh token expired), returns an error string.
-pub async fn refresh_access_token() -> Result<(), String> {
-    slog!("[oauth] Attempting to refresh OAuth access token");
-
-    let mut creds = read_credentials()?;
-    let refresh_token = creds.claude_ai_oauth.refresh_token.clone();
-
-    if refresh_token.is_empty() {
-        return Err("No refresh token found. Run `claude login` to authenticate.".to_string());
-    }
-
+/// Returns `(new_access_token, new_expires_at_ms)` on success.
+async fn do_token_refresh(refresh_token: &str) -> Result<(String, u64), String> {
     let client = reqwest::Client::new();
     let resp = client
         .post(TOKEN_ENDPOINT)
@@ -208,7 +201,6 @@ pub async fn refresh_access_token() -> Result<(), String> {
         .map_err(|e| format!("Failed to read refresh response: {e}"))?;
 
     if !status.is_success() {
-        // Try to parse a structured error
         if let Ok(err) = serde_json::from_str::(&body) {
             let desc = err
                 .error_description
@@ -231,8 +223,30 @@ pub async fn refresh_access_token() -> Result<(), String> {
         .map(|d| d.as_millis() as u64)
         .unwrap_or(0);
 
-    creds.claude_ai_oauth.access_token = token_resp.access_token;
-    creds.claude_ai_oauth.expires_at = now_ms + (token_resp.expires_in * 1000);
+    let expires_at = now_ms + (token_resp.expires_in * 1000);
+    Ok((token_resp.access_token, expires_at))
+}
+
+/// Refresh the OAuth access token using the stored refresh token.
+///
+/// On success, updates `~/.claude/.credentials.json` with the new access
+/// token and expiry, then returns `Ok(())`.
+///
+/// On failure (e.g. refresh token expired), returns an error string.
+pub async fn refresh_access_token() -> Result<(), String> {
+    slog!("[oauth] Attempting to refresh OAuth access token");
+
+    let mut creds = read_credentials()?;
+    let refresh_token = creds.claude_ai_oauth.refresh_token.clone();
+
+    if refresh_token.is_empty() {
+        return Err("No refresh token found. Run `claude login` to authenticate.".to_string());
+    }
+
+    let (new_access_token, new_expires_at) = do_token_refresh(&refresh_token).await?;
+
+    creds.claude_ai_oauth.access_token = new_access_token;
+    creds.claude_ai_oauth.expires_at = new_expires_at;
 
     write_credentials(&creds)?;
 
@@ -244,6 +258,153 @@ pub async fn refresh_access_token() -> Result<(), String> {
     Ok(())
 }
 
+// ── Account pool swap ─────────────────────────────────────────────────────────
+
+/// Select the next usable account for swapping, given the current
+/// credentials' refresh token.
+///
+/// Marks the current account (identified by `current_refresh_token`) as
+/// rate-limited with `reset_at_ms`, then returns a clone of the first other
+/// account that is not rate-limited or whose recorded reset time has passed.
+///
+/// Returns:
+/// - `Ok(account)` — the next available pool account to switch to
+/// - `Err(msg)` — all accounts are rate-limited (includes earliest reset time
+///   in the message), or the pool has fewer than 2 accounts
+///
+/// Performs no I/O (it only reads the clock), so it can be unit-tested
+/// directly. The async [`swap_to_next_available_account`] calls this and then
+/// performs the HTTP token refresh.
+pub(crate) fn select_next_account_for_swap(
+    pool: &mut AccountPool,
+    current_refresh_token: &str,
+    reset_at_ms: u64,
+) -> Result<PoolAccount, String> {
+    if pool.accounts.len() < 2 {
+        return Err(
+            "No account pool available for automatic account rotation (fewer than 2 accounts)."
+                .to_string(),
+        );
+    }
+
+    // Mark the current account as rate-limited.
+    for account in pool.accounts.values_mut() {
+        if account.refresh_token == current_refresh_token {
+            account.rate_limited = true;
+            account.rate_limit_reset_at = Some(reset_at_ms);
+            break;
+        }
+    }
+
+    // A rate-limit flag only blocks an account until its recorded reset time,
+    // so accounts flagged by an earlier swap become selectable again. The
+    // current account is skipped explicitly because `reset_at_ms` may already
+    // be in the past.
+    let now_ms = chrono::Utc::now().timestamp_millis().max(0) as u64;
+    let is_available = |a: &PoolAccount| {
+        a.refresh_token != current_refresh_token
+            && (!a.rate_limited || matches!(a.rate_limit_reset_at, Some(t) if t <= now_ms))
+    };
+    if let Some(next) = pool.accounts.values().find(|&a| is_available(a)).cloned() {
+        return Ok(next);
+    }
+
+    // All accounts are rate-limited — compute the earliest reset time.
+    let earliest_reset_ms = pool
+        .accounts
+        .values()
+        .filter_map(|a| a.rate_limit_reset_at)
+        .min();
+
+    let msg = match earliest_reset_ms {
+        Some(ms) => {
+            let reset_secs = (ms / 1000) as i64;
+            let reset_dt =
+                chrono::DateTime::from_timestamp(reset_secs, 0).unwrap_or_else(chrono::Utc::now);
+            let until = reset_dt.signed_duration_since(chrono::Utc::now());
+            let mins = until.num_minutes().max(0);
+            format!(
+                "All OAuth accounts are rate-limited. \
+                 Earliest reset in {} minute{} (at {} UTC).",
+                mins,
+                if mins == 1 { "" } else { "s" },
+                reset_dt.format("%H:%M")
+            )
+        }
+        None => "All OAuth accounts are rate-limited. No reset time available.".to_string(),
+    };
+
+    Err(msg)
+}
+
+/// Swap to the next available OAuth account in the pool on rate-limit.
+///
+/// Identifies the currently active account (via `~/.claude/.credentials.json`),
+/// marks it as rate-limited, then refreshes and activates the next available
+/// account by writing its credentials to `~/.claude/.credentials.json`.
+///
+/// Returns the email of the newly activated account on success, or an error
+/// message when no account pool exists or all accounts are exhausted.
+pub async fn swap_to_next_available_account(
+    reset_at: chrono::DateTime<chrono::Utc>,
+) -> Result<String, String> {
+    slog!("[oauth] Rate limit hit — attempting account pool swap");
+
+    let mut pool = read_pool()?;
+
+    // Clamp first: a negative timestamp would wrap to a huge value under `as u64`.
+    let reset_at_ms = reset_at.timestamp_millis().max(0) as u64;
+
+    // Unreadable credentials fall back to an empty refresh token, so only a
+    // pool entry whose own refresh token is empty could be treated as current.
+    let current_refresh_token = read_credentials()
+        .map(|creds| creds.claude_ai_oauth.refresh_token)
+        .unwrap_or_default();
+
+    let next_account =
+        select_next_account_for_swap(&mut pool, &current_refresh_token, reset_at_ms)?;
+
+    slog!(
+        "[oauth] Swapping to account '{}' — refreshing token",
+        next_account.email
+    );
+
+    // Refresh the next account's token via HTTP.
+    let (new_access_token, new_expires_at) = do_token_refresh(&next_account.refresh_token).await?;
+
+    // Update the pool account with the fresh token and clear rate-limited flag.
+    if let Some(pool_account) = pool.accounts.get_mut(&next_account.email) {
+        pool_account.access_token = new_access_token.clone();
+        pool_account.expires_at = new_expires_at;
+        pool_account.rate_limited = false;
+        pool_account.rate_limit_reset_at = None;
+    }
+
+    // Persist the updated pool (rate-limited flag on old account, fresh token on new).
+    write_pool(&pool)?;
+
+    // Write the new account's credentials to .credentials.json so the next
+    // Claude Code CLI invocation uses the freshly activated account.
+    let new_creds = CredentialsFile {
+        claude_ai_oauth: OAuthCredentials {
+            access_token: new_access_token,
+            refresh_token: next_account.refresh_token.clone(),
+            expires_at: new_expires_at,
+            scopes: next_account.scopes.clone(),
+            subscription_type: next_account.subscription_type.clone(),
+            rate_limit_tier: next_account.rate_limit_tier.clone(),
+        },
+    };
+    write_credentials(&new_creds)?;
+
+    slog!(
+        "[oauth] Account swap complete — now using '{}'",
+        next_account.email
+    );
+
+    Ok(next_account.email)
+}
+
 /// Extract the OAuth login URL from an error message produced by the Claude Code provider.
 ///
 /// The provider returns errors like:
@@ -331,4 +492,131 @@ mod tests {
             assert!(path.ends_with(".claude/.credentials.json"));
         }
     }
+
+    // ── select_next_account_for_swap ─────────────────────────────────────────
+
+    fn make_account(email: &str, refresh_token: &str, rate_limited: bool) -> PoolAccount {
+        PoolAccount {
+            email: email.to_string(),
+            access_token: format!("access-{email}"),
+            refresh_token: refresh_token.to_string(),
+            expires_at: 9_999_999_999_999,
+            scopes: vec![],
+            subscription_type: None,
+            rate_limit_tier: None,
+            rate_limited,
+            rate_limit_reset_at: None,
+        }
+    }
+
+    fn make_pool(accounts: Vec<PoolAccount>) -> AccountPool {
+        let mut pool = AccountPool::default();
+        for a in accounts {
+            pool.accounts.insert(a.email.clone(), a);
+        }
+        pool
+    }
+
+    /// AC3a: single rate-limit — current account is marked limited, swap to next.
+    #[test]
+    fn swap_selects_next_account_when_current_is_rate_limited() {
+        let account_a = make_account("a@example.com", "refresh-a", false);
+        let account_b = make_account("b@example.com", "refresh-b", false);
+        let mut pool = make_pool(vec![account_a, account_b]);
+
+        let reset_ms = 1_700_000_000_000u64;
+        let result = select_next_account_for_swap(&mut pool, "refresh-a", reset_ms);
+
+        assert!(result.is_ok(), "should find account B: {result:?}");
+        let next = result.unwrap();
+        assert_eq!(next.email, "b@example.com");
+
+        // Account A should now be marked rate-limited.
+        let a = pool.accounts.get("a@example.com").unwrap();
+        assert!(a.rate_limited, "account A should be marked rate-limited");
+        assert_eq!(a.rate_limit_reset_at, Some(reset_ms));
+    }
+
+    /// AC3b: cascade swap — next account is already rate-limited, picks the one after.
+    #[test]
+    fn swap_skips_already_rate_limited_account_and_picks_next_available() {
+        let account_a = make_account("a@example.com", "refresh-a", false);
+        let mut account_b = make_account("b@example.com", "refresh-b", false);
+        account_b.rate_limited = true; // B is already rate-limited
+        account_b.rate_limit_reset_at =
+            Some((chrono::Utc::now() + chrono::Duration::hours(2)).timestamp_millis() as u64);
+        let account_c = make_account("c@example.com", "refresh-c", false);
+
+        let mut pool = make_pool(vec![account_a, account_b, account_c]);
+
+        let reset_ms = 1_700_000_001_000u64;
+        let result = select_next_account_for_swap(&mut pool, "refresh-a", reset_ms);
+
+        assert!(result.is_ok(), "should find account C: {result:?}");
+        let next = result.unwrap();
+        assert_eq!(next.email, "c@example.com");
+    }
+
+    /// An account whose recorded reset time has passed is selectable again.
+    #[test]
+    fn swap_reuses_account_whose_rate_limit_has_expired() {
+        let account_a = make_account("a@example.com", "refresh-a", false);
+        let mut account_b = make_account("b@example.com", "refresh-b", true);
+        account_b.rate_limit_reset_at = Some(1_700_000_000_000); // long past
+        let mut pool = make_pool(vec![account_a, account_b]);
+
+        let reset_ms = (chrono::Utc::now() + chrono::Duration::hours(1)).timestamp_millis() as u64;
+        let result = select_next_account_for_swap(&mut pool, "refresh-a", reset_ms);
+
+        assert_eq!(result.map(|a| a.email), Ok("b@example.com".to_string()));
+    }
+
+    /// AC3c: all-exhausted error path — error message includes reset time.
+    #[test]
+    fn swap_returns_error_with_reset_time_when_all_accounts_exhausted() {
+        let account_a = make_account("a@example.com", "refresh-a", false);
+        let mut account_b = make_account("b@example.com", "refresh-b", false);
+        // B is already rate-limited with a known reset time.
+        account_b.rate_limited = true;
+        account_b.rate_limit_reset_at =
+            Some((chrono::Utc::now() + chrono::Duration::hours(2)).timestamp_millis() as u64);
+
+        let mut pool = make_pool(vec![account_a, account_b]);
+
+        // A also hits rate limit now → all exhausted.
+        let reset_ms = (chrono::Utc::now() + chrono::Duration::hours(1)).timestamp_millis() as u64;
+        let result = select_next_account_for_swap(&mut pool, "refresh-a", reset_ms);
+
+        assert!(result.is_err(), "should return exhausted error");
+        let msg = result.unwrap_err();
+        assert!(
+            msg.contains("All OAuth accounts are rate-limited"),
+            "error should say all exhausted: {msg}"
+        );
+        assert!(
+            msg.contains("minute"),
+            "error should include time until reset: {msg}"
+        );
+    }
+
+    /// Pool with fewer than 2 accounts returns no-pool error.
+    #[test]
+    fn swap_returns_error_when_pool_has_fewer_than_two_accounts() {
+        let mut pool = make_pool(vec![make_account("a@example.com", "refresh-a", false)]);
+        let result = select_next_account_for_swap(&mut pool, "refresh-a", 0);
+        assert!(result.is_err());
+        let msg = result.unwrap_err();
+        assert!(
+            msg.contains("fewer than 2 accounts"),
+            "should explain pool too small: {msg}"
+        );
+    }
+
+    /// Empty pool also returns no-pool error.
+    #[test]
+    fn swap_returns_error_for_empty_pool() {
+        let mut pool = AccountPool::default();
+        let result = select_next_account_for_swap(&mut pool, "any-token", 0);
+        assert!(result.is_err());
+    }
 }
diff --git a/server/src/service/oauth/io.rs b/server/src/service/oauth/io.rs
index 9e44b651..745083ab 100644
--- a/server/src/service/oauth/io.rs
+++ b/server/src/service/oauth/io.rs
@@ -126,6 +126,7 @@ pub(super) fn save_credentials(
         subscription_type: None,
         rate_limit_tier: None,
         rate_limited: false,
+        rate_limit_reset_at: None,
     };
 
     oauth::upsert_pool_account(account).map_err(Error::TokenStorage)?;
diff --git a/server/src/service/timer/io.rs b/server/src/service/timer/io.rs
index 7eed77ff..83b43cb2 100644
--- a/server/src/service/timer/io.rs
+++ b/server/src/service/timer/io.rs
@@ -331,6 +331,30 @@ pub fn spawn_rate_limit_auto_scheduler(
                         );
                         continue;
                     }
+
+                    // Try to swap to the next available OAuth account. On
+                    // success the next agent start (via auto-assign) will use
+                    // the freshly activated account — no long timer needed.
+                    // On failure (no pool or all accounts exhausted) fall back
+                    // to the existing timer-based retry path.
+                    match crate::llm::oauth::swap_to_next_available_account(reset_at).await {
+                        Ok(new_email) => {
+                            crate::slog!(
+                                "[timer] Account swap successful for story {story_id} \
+                                 (agent {agent_name}): now using '{new_email}'. \
+                                 Auto-assign will restart the agent with the new account."
+                            );
+                            // No timer needed — auto-assign picks up the story.
+                            continue;
+                        }
+                        Err(swap_err) => {
+                            crate::slog!(
+                                "[timer] Account swap not possible for story {story_id}: \
+                                 {swap_err}. Falling back to timer-based retry."
+                            );
+                        }
+                    }
+
                     crate::slog!(
                         "[timer] Auto-scheduling timer for story {story_id} \
                          (agent {agent_name}) to resume at {reset_at}"