From a5cd3a215225148e2944d06555b2df21a7436095 Mon Sep 17 00:00:00 2001 From: dave Date: Wed, 13 May 2026 22:34:15 +0000 Subject: [PATCH] huskies: merge 994 --- server/src/agents/mod.rs | 29 +++++++++- server/src/agents/pool/auto_assign/scan.rs | 64 +++++++++++++++++++++- server/src/agents/pool/mod.rs | 20 ++++--- server/src/agents/pool/start/mod.rs | 4 +- server/src/agents/pool/test_helpers.rs | 14 ++--- server/src/agents/pool/types.rs | 10 ++-- server/src/agents/pool/wait.rs | 2 +- server/src/chat/commands/status/render.rs | 4 +- 8 files changed, 121 insertions(+), 26 deletions(-) diff --git a/server/src/agents/mod.rs b/server/src/agents/mod.rs index 83f9ae12..6e5cde03 100644 --- a/server/src/agents/mod.rs +++ b/server/src/agents/mod.rs @@ -89,6 +89,30 @@ pub enum AgentStatus { Failed, } +/// The execution state of a rate-limited agent session. +/// +/// Replaces the legacy `throttled: bool` flag, carrying the expiry time so +/// the scheduler can decide when to allow a retry rather than skipping +/// indefinitely. +#[derive(Debug, Clone, Serialize, PartialEq)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum AgentExecution { + /// The agent hit a rate-limit and is paused until `until`. + Throttled { + /// UTC instant at which the rate limit expires and the agent may resume. + until: chrono::DateTime, + }, +} + +impl AgentExecution { + /// Return `true` if the throttle period has not yet elapsed. + pub fn is_active(&self) -> bool { + match self { + Self::Throttled { until } => chrono::Utc::now() < *until, + } + } +} + /// Why an agent was forcibly terminated by the watchdog. #[derive(Debug, Clone, Serialize, PartialEq)] #[serde(rename_all = "snake_case")] @@ -236,8 +260,9 @@ pub struct AgentInfo { pub completion: Option, /// UUID identifying the persistent log file for this session. pub log_session_id: Option, - /// True when a rate-limit throttle warning was received for this agent. - pub throttled: bool, + /// Set when the agent is rate-limited; holds the UTC expiry time. + /// `None` when the agent is not throttled. + pub throttled: Option>, /// Set when the watchdog terminates the agent for exceeding a limit. pub termination_reason: Option, } diff --git a/server/src/agents/pool/auto_assign/scan.rs b/server/src/agents/pool/auto_assign/scan.rs index e81e0651..5ec709e8 100644 --- a/server/src/agents/pool/auto_assign/scan.rs +++ b/server/src/agents/pool/auto_assign/scan.rs @@ -6,7 +6,23 @@ use std::collections::HashMap; use super::super::super::{AgentStatus, PipelineStage, agent_config_stage, pipeline_stage}; use super::super::StoryAgent; +/// Return `true` if the agent has a throttle set whose expiry has already passed. +/// +/// Returns `false` when the agent has no throttle, or when the throttle's +/// `until` time is still in the future (throttle is active, agent is waiting). +fn is_throttle_expired(agent: &StoryAgent) -> bool { + agent + .throttled + .as_ref() + .map(|e| !e.is_active()) + .unwrap_or(false) +} + /// Return `true` if `agent_name` has no active (pending/running) entry in the pool. +/// +/// An agent with an expired throttle is considered free even if its status +/// is still `Running` — the scheduler may retry rather than skip indefinitely. +/// Agents without any throttle (or with an active throttle) are still considered busy. pub(in crate::agents::pool) fn is_agent_free( agents: &HashMap, agent_name: &str, @@ -14,6 +30,7 @@ pub(in crate::agents::pool) fn is_agent_free( !agents.values().any(|a| { a.agent_name == agent_name && matches!(a.status, AgentStatus::Running | AgentStatus::Pending) + && !is_throttle_expired(a) }) } @@ -148,7 +165,7 @@ mod tests { project_root: None, log_session_id: None, merge_failure_reported: false, - throttled: false, + throttled: None, termination_reason: None, status_buffer: None, } @@ -569,6 +586,51 @@ model = "sonnet" assert_eq!(free, Some("qa")); } + // ── is_agent_free: throttle-expiry behaviour ───────────────────────── + + #[test] + fn is_agent_free_returns_false_for_running_agent_no_throttle() { + let mut agents = HashMap::new(); + agents.insert( + "s1:coder-1".to_string(), + make_test_story_agent("coder-1", AgentStatus::Running), + ); + assert!( + !is_agent_free(&agents, "coder-1"), + "running agent with no throttle should be busy" + ); + } + + #[test] + fn is_agent_free_returns_false_for_running_agent_with_active_throttle() { + let mut agent = make_test_story_agent("coder-1", AgentStatus::Running); + // Throttle expires far in the future → still active. + agent.throttled = Some(crate::agents::AgentExecution::Throttled { + until: chrono::Utc::now() + chrono::Duration::hours(1), + }); + let mut agents = HashMap::new(); + agents.insert("s1:coder-1".to_string(), agent); + assert!( + !is_agent_free(&agents, "coder-1"), + "running agent with active throttle should still be busy" + ); + } + + #[test] + fn is_agent_free_returns_true_for_running_agent_with_expired_throttle() { + let mut agent = make_test_story_agent("coder-1", AgentStatus::Running); + // Throttle expired in the past → agent is eligible for retry. + agent.throttled = Some(crate::agents::AgentExecution::Throttled { + until: chrono::Utc::now() - chrono::Duration::minutes(1), + }); + let mut agents = HashMap::new(); + agents.insert("s1:coder-1".to_string(), agent); + assert!( + is_agent_free(&agents, "coder-1"), + "running agent with expired throttle should be considered free" + ); + } + // ── count_active_agents_for_stage ──────────────────────────────────── #[test] diff --git a/server/src/agents/pool/mod.rs b/server/src/agents/pool/mod.rs index 4f326509..02c2c3e7 100644 --- a/server/src/agents/pool/mod.rs +++ b/server/src/agents/pool/mod.rs @@ -61,9 +61,8 @@ impl AgentPool { }; // Spawn a background task (only when inside a tokio runtime) that - // listens for RateLimitWarning and HardBlock events and updates the // listens for RateLimitWarning and RateLimitHardBlock events and updates the - // throttled flag on the relevant agent so status dots stay current. + // throttled field on the relevant agent so status dots stay current. if tokio::runtime::Handle::try_current().is_ok() { let agents_clone = Arc::clone(&pool.agents); let watcher_tx_clone = watcher_tx.clone(); @@ -75,23 +74,28 @@ impl AgentPool { Err(broadcast::error::RecvError::Closed) => break, Err(broadcast::error::RecvError::Lagged(_)) => continue, }; - let (story_id, agent_name) = match &event { + let (story_id, agent_name, until) = match &event { WatcherEvent::RateLimitWarning { story_id, agent_name, - } - | WatcherEvent::RateLimitHardBlock { + } => ( + story_id.clone(), + agent_name.clone(), + // No explicit reset time — use a 15-minute soft timeout. + chrono::Utc::now() + chrono::Duration::minutes(15), + ), + WatcherEvent::RateLimitHardBlock { story_id, agent_name, - .. - } => (story_id.clone(), agent_name.clone()), + reset_at, + } => (story_id.clone(), agent_name.clone(), *reset_at), _ => continue, }; let key = composite_key(&story_id, &agent_name); if let Ok(mut agents) = agents_clone.lock() && let Some(agent) = agents.get_mut(&key) { - agent.throttled = true; + agent.throttled = Some(crate::agents::AgentExecution::Throttled { until }); } let _ = watcher_tx_clone.send(WatcherEvent::AgentStateChanged); } diff --git a/server/src/agents/pool/start/mod.rs b/server/src/agents/pool/start/mod.rs index b793c067..a66c02ac 100644 --- a/server/src/agents/pool/start/mod.rs +++ b/server/src/agents/pool/start/mod.rs @@ -322,7 +322,7 @@ impl AgentPool { project_root: Some(project_root.to_path_buf()), log_session_id: Some(log_session_id.clone()), merge_failure_reported: false, - throttled: false, + throttled: None, termination_reason: None, status_buffer: Some(status_buffer), }, @@ -418,7 +418,7 @@ impl AgentPool { base_branch: None, completion: None, log_session_id: Some(log_session_id), - throttled: false, + throttled: None, termination_reason: None, }) } diff --git a/server/src/agents/pool/test_helpers.rs b/server/src/agents/pool/test_helpers.rs index d322f7c4..8159677a 100644 --- a/server/src/agents/pool/test_helpers.rs +++ b/server/src/agents/pool/test_helpers.rs @@ -35,7 +35,7 @@ impl AgentPool { project_root: None, log_session_id: None, merge_failure_reported: false, - throttled: false, + throttled: None, termination_reason: None, status_buffer: None, }, @@ -73,7 +73,7 @@ impl AgentPool { project_root: None, log_session_id: None, merge_failure_reported: false, - throttled: false, + throttled: None, termination_reason: None, status_buffer: None, }, @@ -108,7 +108,7 @@ impl AgentPool { project_root: Some(project_root), log_session_id: None, merge_failure_reported: false, - throttled: false, + throttled: None, termination_reason: None, status_buffer: None, }, @@ -142,7 +142,7 @@ impl AgentPool { project_root: None, log_session_id: Some(log_session_id.to_string()), merge_failure_reported: false, - throttled: false, + throttled: None, termination_reason: None, status_buffer: None, }, @@ -176,7 +176,7 @@ impl AgentPool { project_root: None, log_session_id: None, merge_failure_reported: false, - throttled: false, + throttled: None, termination_reason: None, status_buffer: Some(StatusEventBuffer::new(&self.status_broadcaster)), }, @@ -233,7 +233,7 @@ impl AgentPool { project_root: Some(project_root), log_session_id: None, merge_failure_reported: false, - throttled: false, + throttled: None, termination_reason: None, status_buffer: None, }, @@ -267,7 +267,7 @@ impl AgentPool { project_root: None, log_session_id: None, merge_failure_reported: false, - throttled: false, + throttled: None, termination_reason: None, status_buffer: None, }, diff --git a/server/src/agents/pool/types.rs b/server/src/agents/pool/types.rs index e7704172..ad132eb8 100644 --- a/server/src/agents/pool/types.rs +++ b/server/src/agents/pool/types.rs @@ -81,9 +81,9 @@ pub(super) struct StoryAgent { /// worktree (which compiles fine) and returns `gates_passed=true` even /// though the code was never squash-merged onto master. pub(super) merge_failure_reported: bool, - /// Set to `true` when a rate-limit throttle warning was received for this agent. - /// True when a rate-limit throttle warning was received for this agent. - pub(super) throttled: bool, + /// Set when a rate-limit event was received for this agent. + /// Carries the expiry time so the scheduler can decide when to retry. + pub(super) throttled: Option, /// Set when the watchdog terminates the agent for exceeding a limit. pub(super) termination_reason: Option, /// Passive event accumulator scoped to this session's project. @@ -113,7 +113,9 @@ pub(super) fn agent_info_from_entry(story_id: &str, agent: &StoryAgent) -> Agent .map(|wt| wt.base_branch.clone()), completion: agent.completion.clone(), log_session_id: agent.log_session_id.clone(), - throttled: agent.throttled, + throttled: agent.throttled.as_ref().map(|e| match e { + crate::agents::AgentExecution::Throttled { until } => *until, + }), termination_reason: agent.termination_reason.clone(), } } diff --git a/server/src/agents/pool/wait.rs b/server/src/agents/pool/wait.rs index bc81665c..f347a63c 100644 --- a/server/src/agents/pool/wait.rs +++ b/server/src/agents/pool/wait.rs @@ -70,7 +70,7 @@ impl AgentPool { base_branch: None, completion: None, log_session_id: None, - throttled: false, + throttled: None, termination_reason: None, } }); diff --git a/server/src/chat/commands/status/render.rs b/server/src/chat/commands/status/render.rs index 1800b5a6..445e85c4 100644 --- a/server/src/chat/commands/status/render.rs +++ b/server/src/chat/commands/status/render.rs @@ -332,7 +332,9 @@ fn render_item_line( _ => format!(" \u{1F534} {display}{cost_suffix}{dep_suffix}\n"), }; } - let throttled = agent.map(|a| a.throttled).unwrap_or(false); + let throttled = agent + .and_then(|a| a.throttled) + .is_some_and(|until| until > chrono::Utc::now()); let dot = super::traffic_light_dot(blocked, throttled, agent.is_some()); if let Some(agent) = agent { let model_str = config