huskies: merge 725_story_auto_swap_oauth_account_on_rate_limit

This commit is contained in:
dave
2026-04-27 18:06:47 +00:00
parent 272a592a4d
commit c6bc6f07f7
3 changed files with 313 additions and 18 deletions
+288 -18
View File
@@ -114,6 +114,11 @@ pub struct PoolAccount {
/// Whether the server has observed a rate-limit response for this account. /// Whether the server has observed a rate-limit response for this account.
#[serde(default)] #[serde(default)]
pub rate_limited: bool, pub rate_limited: bool,
/// Unix-epoch milliseconds when the rate limit resets, if known.
///
/// Used to compute "time until earliest reset" when all accounts are exhausted.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub rate_limit_reset_at: Option<u64>,
} }
/// The multi-account login pool, stored in `~/.claude/oauth_pool.json`. /// The multi-account login pool, stored in `~/.claude/oauth_pool.json`.
@@ -173,22 +178,10 @@ pub fn upsert_pool_account(account: PoolAccount) -> Result<(), String> {
write_pool(&pool) write_pool(&pool)
} }
/// Refresh the OAuth access token using the stored refresh token. /// Perform a token refresh HTTP call using `refresh_token`.
/// ///
/// On success, updates `~/.claude/.credentials.json` with the new access /// Returns `(new_access_token, new_expires_at_ms)` on success.
/// token and expiry, then returns `Ok(())`. async fn do_token_refresh(refresh_token: &str) -> Result<(String, u64), String> {
///
/// On failure (e.g. refresh token expired), returns an error string.
pub async fn refresh_access_token() -> Result<(), String> {
slog!("[oauth] Attempting to refresh OAuth access token");
let mut creds = read_credentials()?;
let refresh_token = creds.claude_ai_oauth.refresh_token.clone();
if refresh_token.is_empty() {
return Err("No refresh token found. Run `claude login` to authenticate.".to_string());
}
let client = reqwest::Client::new(); let client = reqwest::Client::new();
let resp = client let resp = client
.post(TOKEN_ENDPOINT) .post(TOKEN_ENDPOINT)
@@ -208,7 +201,6 @@ pub async fn refresh_access_token() -> Result<(), String> {
.map_err(|e| format!("Failed to read refresh response: {e}"))?; .map_err(|e| format!("Failed to read refresh response: {e}"))?;
if !status.is_success() { if !status.is_success() {
// Try to parse a structured error
if let Ok(err) = serde_json::from_str::<TokenRefreshError>(&body) { if let Ok(err) = serde_json::from_str::<TokenRefreshError>(&body) {
let desc = err let desc = err
.error_description .error_description
@@ -231,8 +223,30 @@ pub async fn refresh_access_token() -> Result<(), String> {
.map(|d| d.as_millis() as u64) .map(|d| d.as_millis() as u64)
.unwrap_or(0); .unwrap_or(0);
creds.claude_ai_oauth.access_token = token_resp.access_token; let expires_at = now_ms + (token_resp.expires_in * 1000);
creds.claude_ai_oauth.expires_at = now_ms + (token_resp.expires_in * 1000); Ok((token_resp.access_token, expires_at))
}
/// Refresh the OAuth access token using the stored refresh token.
///
/// On success, updates `~/.claude/.credentials.json` with the new access
/// token and expiry, then returns `Ok(())`.
///
/// On failure (e.g. refresh token expired), returns an error string.
pub async fn refresh_access_token() -> Result<(), String> {
slog!("[oauth] Attempting to refresh OAuth access token");
let mut creds = read_credentials()?;
let refresh_token = creds.claude_ai_oauth.refresh_token.clone();
if refresh_token.is_empty() {
return Err("No refresh token found. Run `claude login` to authenticate.".to_string());
}
let (new_access_token, new_expires_at) = do_token_refresh(&refresh_token).await?;
creds.claude_ai_oauth.access_token = new_access_token;
creds.claude_ai_oauth.expires_at = new_expires_at;
write_credentials(&creds)?; write_credentials(&creds)?;
@@ -244,6 +258,150 @@ pub async fn refresh_access_token() -> Result<(), String> {
Ok(()) Ok(())
} }
// ── Account pool swap ─────────────────────────────────────────────────────────
/// Select the next non-rate-limited account for swapping, given the current
/// credentials' refresh token.
///
/// Marks the current account (identified by `current_refresh_token`) as
/// rate-limited with `reset_at_ms`, then returns a clone of the first
/// non-rate-limited account in the pool.
///
/// Returns:
/// - `Ok(account)` — the next available pool account to switch to
/// - `Err(msg)` — all accounts are rate-limited (includes earliest reset time
/// in the message), or the pool has fewer than 2 accounts
///
/// This is a pure function exposed for unit testing. The async
/// [`swap_to_next_available_account`] calls this and then performs the HTTP
/// token refresh.
pub(crate) fn select_next_account_for_swap(
pool: &mut AccountPool,
current_refresh_token: &str,
reset_at_ms: u64,
) -> Result<PoolAccount, String> {
if pool.accounts.len() < 2 {
return Err(
"No account pool available for automatic account rotation (fewer than 2 accounts)."
.to_string(),
);
}
// Mark the current account as rate-limited.
for account in pool.accounts.values_mut() {
if account.refresh_token == current_refresh_token {
account.rate_limited = true;
account.rate_limit_reset_at = Some(reset_at_ms);
break;
}
}
// Find the next non-rate-limited account.
if let Some(next) = pool.accounts.values().find(|a| !a.rate_limited).cloned() {
return Ok(next);
}
// All accounts are rate-limited — compute the earliest reset time.
let earliest_reset_ms = pool
.accounts
.values()
.filter_map(|a| a.rate_limit_reset_at)
.min();
let msg = match earliest_reset_ms {
Some(ms) => {
let reset_secs = (ms / 1000) as i64;
let reset_dt =
chrono::DateTime::from_timestamp(reset_secs, 0).unwrap_or_else(chrono::Utc::now);
let until = reset_dt.signed_duration_since(chrono::Utc::now());
let mins = until.num_minutes().max(0);
format!(
"All OAuth accounts are rate-limited. \
Earliest reset in {} minute{} (at {} UTC).",
mins,
if mins == 1 { "" } else { "s" },
reset_dt.format("%H:%M")
)
}
None => "All OAuth accounts are rate-limited. No reset time available.".to_string(),
};
Err(msg)
}
/// Swap to the next available OAuth account in the pool on rate-limit.
///
/// Identifies the currently active account (via `~/.claude/.credentials.json`),
/// marks it as rate-limited, then refreshes and activates the next available
/// account by writing its credentials to `~/.claude/.credentials.json`.
///
/// Returns the email of the newly activated account on success, or an error
/// message when no account pool exists or all accounts are exhausted.
pub async fn swap_to_next_available_account(
reset_at: chrono::DateTime<chrono::Utc>,
) -> Result<String, String> {
slog!("[oauth] Rate limit hit — attempting account pool swap");
let mut pool = read_pool()?;
let reset_at_ms = reset_at.timestamp_millis() as u64;
let creds = read_credentials().unwrap_or_else(|_| CredentialsFile {
claude_ai_oauth: OAuthCredentials {
access_token: String::new(),
refresh_token: String::new(),
expires_at: 0,
scopes: vec![],
subscription_type: None,
rate_limit_tier: None,
},
});
let current_refresh_token = creds.claude_ai_oauth.refresh_token.clone();
let next_account =
select_next_account_for_swap(&mut pool, &current_refresh_token, reset_at_ms)?;
slog!(
"[oauth] Swapping to account '{}' — refreshing token",
next_account.email
);
// Refresh the next account's token via HTTP.
let (new_access_token, new_expires_at) = do_token_refresh(&next_account.refresh_token).await?;
// Update the pool account with the fresh token and clear rate-limited flag.
if let Some(pool_account) = pool.accounts.get_mut(&next_account.email) {
pool_account.access_token = new_access_token.clone();
pool_account.expires_at = new_expires_at;
pool_account.rate_limited = false;
pool_account.rate_limit_reset_at = None;
}
// Persist the updated pool (rate-limited flag on old account, fresh token on new).
write_pool(&pool)?;
// Write the new account's credentials to .credentials.json so the next
// Claude Code CLI invocation uses the freshly activated account.
let new_creds = CredentialsFile {
claude_ai_oauth: OAuthCredentials {
access_token: new_access_token,
refresh_token: next_account.refresh_token.clone(),
expires_at: new_expires_at,
scopes: next_account.scopes.clone(),
subscription_type: next_account.subscription_type.clone(),
rate_limit_tier: next_account.rate_limit_tier.clone(),
},
};
write_credentials(&new_creds)?;
slog!(
"[oauth] Account swap complete — now using '{}'",
next_account.email
);
Ok(next_account.email)
}
/// Extract the OAuth login URL from an error message produced by the Claude Code provider. /// Extract the OAuth login URL from an error message produced by the Claude Code provider.
/// ///
/// The provider returns errors like: /// The provider returns errors like:
@@ -331,4 +489,116 @@ mod tests {
assert!(path.ends_with(".claude/.credentials.json")); assert!(path.ends_with(".claude/.credentials.json"));
} }
} }
// ── select_next_account_for_swap ─────────────────────────────────────────
fn make_account(email: &str, refresh_token: &str, rate_limited: bool) -> PoolAccount {
PoolAccount {
email: email.to_string(),
access_token: format!("access-{email}"),
refresh_token: refresh_token.to_string(),
expires_at: 9_999_999_999_999,
scopes: vec![],
subscription_type: None,
rate_limit_tier: None,
rate_limited,
rate_limit_reset_at: None,
}
}
fn make_pool(accounts: Vec<PoolAccount>) -> AccountPool {
let mut pool = AccountPool::default();
for a in accounts {
pool.accounts.insert(a.email.clone(), a);
}
pool
}
/// AC3a: single rate-limit — current account is marked limited, swap to next.
#[test]
fn swap_selects_next_account_when_current_is_rate_limited() {
let account_a = make_account("a@example.com", "refresh-a", false);
let account_b = make_account("b@example.com", "refresh-b", false);
let mut pool = make_pool(vec![account_a, account_b]);
let reset_ms = 1_700_000_000_000u64;
let result = select_next_account_for_swap(&mut pool, "refresh-a", reset_ms);
assert!(result.is_ok(), "should find account B: {result:?}");
let next = result.unwrap();
assert_eq!(next.email, "b@example.com");
// Account A should now be marked rate-limited.
let a = pool.accounts.get("a@example.com").unwrap();
assert!(a.rate_limited, "account A should be marked rate-limited");
assert_eq!(a.rate_limit_reset_at, Some(reset_ms));
}
/// AC3b: cascade swap — next account is already rate-limited, picks the one after.
#[test]
fn swap_skips_already_rate_limited_account_and_picks_next_available() {
let account_a = make_account("a@example.com", "refresh-a", false);
let mut account_b = make_account("b@example.com", "refresh-b", false);
account_b.rate_limited = true; // B is already rate-limited
account_b.rate_limit_reset_at = Some(1_700_000_000_000);
let account_c = make_account("c@example.com", "refresh-c", false);
let mut pool = make_pool(vec![account_a, account_b, account_c]);
let reset_ms = 1_700_000_001_000u64;
let result = select_next_account_for_swap(&mut pool, "refresh-a", reset_ms);
assert!(result.is_ok(), "should find account C: {result:?}");
let next = result.unwrap();
assert_eq!(next.email, "c@example.com");
}
/// AC3c: all-exhausted error path — error message includes reset time.
#[test]
fn swap_returns_error_with_reset_time_when_all_accounts_exhausted() {
let account_a = make_account("a@example.com", "refresh-a", false);
let mut account_b = make_account("b@example.com", "refresh-b", false);
// B is already rate-limited with a known reset time.
account_b.rate_limited = true;
account_b.rate_limit_reset_at =
Some((chrono::Utc::now() + chrono::Duration::hours(2)).timestamp_millis() as u64);
let mut pool = make_pool(vec![account_a, account_b]);
// A also hits rate limit now → all exhausted.
let reset_ms = (chrono::Utc::now() + chrono::Duration::hours(1)).timestamp_millis() as u64;
let result = select_next_account_for_swap(&mut pool, "refresh-a", reset_ms);
assert!(result.is_err(), "should return exhausted error");
let msg = result.unwrap_err();
assert!(
msg.contains("All OAuth accounts are rate-limited"),
"error should say all exhausted: {msg}"
);
assert!(
msg.contains("minute"),
"error should include time until reset: {msg}"
);
}
/// Pool with fewer than 2 accounts returns no-pool error.
#[test]
fn swap_returns_error_when_pool_has_fewer_than_two_accounts() {
let mut pool = make_pool(vec![make_account("a@example.com", "refresh-a", false)]);
let result = select_next_account_for_swap(&mut pool, "refresh-a", 0);
assert!(result.is_err());
let msg = result.unwrap_err();
assert!(
msg.contains("fewer than 2 accounts"),
"should explain pool too small: {msg}"
);
}
/// Empty pool also returns no-pool error.
#[test]
fn swap_returns_error_for_empty_pool() {
let mut pool = AccountPool::default();
let result = select_next_account_for_swap(&mut pool, "any-token", 0);
assert!(result.is_err());
}
} }
+1
View File
@@ -126,6 +126,7 @@ pub(super) fn save_credentials(
subscription_type: None, subscription_type: None,
rate_limit_tier: None, rate_limit_tier: None,
rate_limited: false, rate_limited: false,
rate_limit_reset_at: None,
}; };
oauth::upsert_pool_account(account).map_err(Error::TokenStorage)?; oauth::upsert_pool_account(account).map_err(Error::TokenStorage)?;
+24
View File
@@ -331,6 +331,30 @@ pub fn spawn_rate_limit_auto_scheduler(
); );
continue; continue;
} }
// Try to swap to the next available OAuth account. On
// success the next agent start (via auto-assign) will use
// the freshly activated account — no long timer needed.
// On failure (no pool or all accounts exhausted) fall back
// to the existing timer-based retry path.
match crate::llm::oauth::swap_to_next_available_account(reset_at).await {
Ok(new_email) => {
crate::slog!(
"[timer] Account swap successful for story {story_id} \
(agent {agent_name}): now using '{new_email}'. \
Auto-assign will restart the agent with the new account."
);
// No timer needed — auto-assign picks up the story.
continue;
}
Err(swap_err) => {
crate::slog!(
"[timer] Account swap not possible for story {story_id}: \
{swap_err}. Falling back to timer-based retry."
);
}
}
crate::slog!( crate::slog!(
"[timer] Auto-scheduling timer for story {story_id} \ "[timer] Auto-scheduling timer for story {story_id} \
(agent {agent_name}) to resume at {reset_at}" (agent {agent_name}) to resume at {reset_at}"