huskies: merge 994

This commit is contained in:
dave
2026-05-13 22:34:15 +00:00
parent 1ee23e7bfe
commit a5cd3a2152
8 changed files with 121 additions and 26 deletions
+27 -2
View File
@@ -89,6 +89,30 @@ pub enum AgentStatus {
Failed, Failed,
} }
/// The execution state of a rate-limited agent session.
///
/// Replaces the legacy `throttled: bool` flag, carrying the expiry time so
/// the scheduler can decide when to allow a retry rather than skipping
/// indefinitely.
#[derive(Debug, Clone, Serialize, PartialEq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum AgentExecution {
/// The agent hit a rate-limit and is paused until `until`.
Throttled {
/// UTC instant at which the rate limit expires and the agent may resume.
until: chrono::DateTime<chrono::Utc>,
},
}
impl AgentExecution {
/// Return `true` if the throttle period has not yet elapsed.
pub fn is_active(&self) -> bool {
match self {
Self::Throttled { until } => chrono::Utc::now() < *until,
}
}
}
/// Why an agent was forcibly terminated by the watchdog. /// Why an agent was forcibly terminated by the watchdog.
#[derive(Debug, Clone, Serialize, PartialEq)] #[derive(Debug, Clone, Serialize, PartialEq)]
#[serde(rename_all = "snake_case")] #[serde(rename_all = "snake_case")]
@@ -236,8 +260,9 @@ pub struct AgentInfo {
pub completion: Option<CompletionReport>, pub completion: Option<CompletionReport>,
/// UUID identifying the persistent log file for this session. /// UUID identifying the persistent log file for this session.
pub log_session_id: Option<String>, pub log_session_id: Option<String>,
/// True when a rate-limit throttle warning was received for this agent. /// Set when the agent is rate-limited; holds the UTC expiry time.
pub throttled: bool, /// `None` when the agent is not throttled.
pub throttled: Option<chrono::DateTime<chrono::Utc>>,
/// Set when the watchdog terminates the agent for exceeding a limit. /// Set when the watchdog terminates the agent for exceeding a limit.
pub termination_reason: Option<TerminationReason>, pub termination_reason: Option<TerminationReason>,
} }
+63 -1
View File
@@ -6,7 +6,23 @@ use std::collections::HashMap;
use super::super::super::{AgentStatus, PipelineStage, agent_config_stage, pipeline_stage}; use super::super::super::{AgentStatus, PipelineStage, agent_config_stage, pipeline_stage};
use super::super::StoryAgent; use super::super::StoryAgent;
/// Return `true` if the agent has a throttle set whose expiry has already passed.
///
/// Returns `false` when the agent has no throttle, or when the throttle's
/// `until` time is still in the future (throttle is active, agent is waiting).
fn is_throttle_expired(agent: &StoryAgent) -> bool {
agent
.throttled
.as_ref()
.map(|e| !e.is_active())
.unwrap_or(false)
}
/// Return `true` if `agent_name` has no active (pending/running) entry in the pool. /// Return `true` if `agent_name` has no active (pending/running) entry in the pool.
///
/// An agent with an expired throttle is considered free even if its status
/// is still `Running` — the scheduler may retry rather than skip indefinitely.
/// Agents without any throttle (or with an active throttle) are still considered busy.
pub(in crate::agents::pool) fn is_agent_free( pub(in crate::agents::pool) fn is_agent_free(
agents: &HashMap<String, StoryAgent>, agents: &HashMap<String, StoryAgent>,
agent_name: &str, agent_name: &str,
@@ -14,6 +30,7 @@ pub(in crate::agents::pool) fn is_agent_free(
!agents.values().any(|a| { !agents.values().any(|a| {
a.agent_name == agent_name a.agent_name == agent_name
&& matches!(a.status, AgentStatus::Running | AgentStatus::Pending) && matches!(a.status, AgentStatus::Running | AgentStatus::Pending)
&& !is_throttle_expired(a)
}) })
} }
@@ -148,7 +165,7 @@ mod tests {
project_root: None, project_root: None,
log_session_id: None, log_session_id: None,
merge_failure_reported: false, merge_failure_reported: false,
throttled: false, throttled: None,
termination_reason: None, termination_reason: None,
status_buffer: None, status_buffer: None,
} }
@@ -569,6 +586,51 @@ model = "sonnet"
assert_eq!(free, Some("qa")); assert_eq!(free, Some("qa"));
} }
// ── is_agent_free: throttle-expiry behaviour ─────────────────────────
#[test]
fn is_agent_free_returns_false_for_running_agent_no_throttle() {
let mut agents = HashMap::new();
agents.insert(
"s1:coder-1".to_string(),
make_test_story_agent("coder-1", AgentStatus::Running),
);
assert!(
!is_agent_free(&agents, "coder-1"),
"running agent with no throttle should be busy"
);
}
#[test]
fn is_agent_free_returns_false_for_running_agent_with_active_throttle() {
let mut agent = make_test_story_agent("coder-1", AgentStatus::Running);
// Throttle expires far in the future → still active.
agent.throttled = Some(crate::agents::AgentExecution::Throttled {
until: chrono::Utc::now() + chrono::Duration::hours(1),
});
let mut agents = HashMap::new();
agents.insert("s1:coder-1".to_string(), agent);
assert!(
!is_agent_free(&agents, "coder-1"),
"running agent with active throttle should still be busy"
);
}
#[test]
fn is_agent_free_returns_true_for_running_agent_with_expired_throttle() {
let mut agent = make_test_story_agent("coder-1", AgentStatus::Running);
// Throttle expired in the past → agent is eligible for retry.
agent.throttled = Some(crate::agents::AgentExecution::Throttled {
until: chrono::Utc::now() - chrono::Duration::minutes(1),
});
let mut agents = HashMap::new();
agents.insert("s1:coder-1".to_string(), agent);
assert!(
is_agent_free(&agents, "coder-1"),
"running agent with expired throttle should be considered free"
);
}
// ── count_active_agents_for_stage ──────────────────────────────────── // ── count_active_agents_for_stage ────────────────────────────────────
#[test] #[test]
+12 -8
View File
@@ -61,9 +61,8 @@ impl AgentPool {
}; };
// Spawn a background task (only when inside a tokio runtime) that // Spawn a background task (only when inside a tokio runtime) that
// listens for RateLimitWarning and HardBlock events and updates the
// listens for RateLimitWarning and RateLimitHardBlock events and updates the // listens for RateLimitWarning and RateLimitHardBlock events and updates the
// throttled flag on the relevant agent so status dots stay current. // throttled field on the relevant agent so status dots stay current.
if tokio::runtime::Handle::try_current().is_ok() { if tokio::runtime::Handle::try_current().is_ok() {
let agents_clone = Arc::clone(&pool.agents); let agents_clone = Arc::clone(&pool.agents);
let watcher_tx_clone = watcher_tx.clone(); let watcher_tx_clone = watcher_tx.clone();
@@ -75,23 +74,28 @@ impl AgentPool {
Err(broadcast::error::RecvError::Closed) => break, Err(broadcast::error::RecvError::Closed) => break,
Err(broadcast::error::RecvError::Lagged(_)) => continue, Err(broadcast::error::RecvError::Lagged(_)) => continue,
}; };
let (story_id, agent_name) = match &event { let (story_id, agent_name, until) = match &event {
WatcherEvent::RateLimitWarning { WatcherEvent::RateLimitWarning {
story_id, story_id,
agent_name, agent_name,
} } => (
| WatcherEvent::RateLimitHardBlock { story_id.clone(),
agent_name.clone(),
// No explicit reset time — use a 15-minute soft timeout.
chrono::Utc::now() + chrono::Duration::minutes(15),
),
WatcherEvent::RateLimitHardBlock {
story_id, story_id,
agent_name, agent_name,
.. reset_at,
} => (story_id.clone(), agent_name.clone()), } => (story_id.clone(), agent_name.clone(), *reset_at),
_ => continue, _ => continue,
}; };
let key = composite_key(&story_id, &agent_name); let key = composite_key(&story_id, &agent_name);
if let Ok(mut agents) = agents_clone.lock() if let Ok(mut agents) = agents_clone.lock()
&& let Some(agent) = agents.get_mut(&key) && let Some(agent) = agents.get_mut(&key)
{ {
agent.throttled = true; agent.throttled = Some(crate::agents::AgentExecution::Throttled { until });
} }
let _ = watcher_tx_clone.send(WatcherEvent::AgentStateChanged); let _ = watcher_tx_clone.send(WatcherEvent::AgentStateChanged);
} }
+2 -2
View File
@@ -322,7 +322,7 @@ impl AgentPool {
project_root: Some(project_root.to_path_buf()), project_root: Some(project_root.to_path_buf()),
log_session_id: Some(log_session_id.clone()), log_session_id: Some(log_session_id.clone()),
merge_failure_reported: false, merge_failure_reported: false,
throttled: false, throttled: None,
termination_reason: None, termination_reason: None,
status_buffer: Some(status_buffer), status_buffer: Some(status_buffer),
}, },
@@ -418,7 +418,7 @@ impl AgentPool {
base_branch: None, base_branch: None,
completion: None, completion: None,
log_session_id: Some(log_session_id), log_session_id: Some(log_session_id),
throttled: false, throttled: None,
termination_reason: None, termination_reason: None,
}) })
} }
+7 -7
View File
@@ -35,7 +35,7 @@ impl AgentPool {
project_root: None, project_root: None,
log_session_id: None, log_session_id: None,
merge_failure_reported: false, merge_failure_reported: false,
throttled: false, throttled: None,
termination_reason: None, termination_reason: None,
status_buffer: None, status_buffer: None,
}, },
@@ -73,7 +73,7 @@ impl AgentPool {
project_root: None, project_root: None,
log_session_id: None, log_session_id: None,
merge_failure_reported: false, merge_failure_reported: false,
throttled: false, throttled: None,
termination_reason: None, termination_reason: None,
status_buffer: None, status_buffer: None,
}, },
@@ -108,7 +108,7 @@ impl AgentPool {
project_root: Some(project_root), project_root: Some(project_root),
log_session_id: None, log_session_id: None,
merge_failure_reported: false, merge_failure_reported: false,
throttled: false, throttled: None,
termination_reason: None, termination_reason: None,
status_buffer: None, status_buffer: None,
}, },
@@ -142,7 +142,7 @@ impl AgentPool {
project_root: None, project_root: None,
log_session_id: Some(log_session_id.to_string()), log_session_id: Some(log_session_id.to_string()),
merge_failure_reported: false, merge_failure_reported: false,
throttled: false, throttled: None,
termination_reason: None, termination_reason: None,
status_buffer: None, status_buffer: None,
}, },
@@ -176,7 +176,7 @@ impl AgentPool {
project_root: None, project_root: None,
log_session_id: None, log_session_id: None,
merge_failure_reported: false, merge_failure_reported: false,
throttled: false, throttled: None,
termination_reason: None, termination_reason: None,
status_buffer: Some(StatusEventBuffer::new(&self.status_broadcaster)), status_buffer: Some(StatusEventBuffer::new(&self.status_broadcaster)),
}, },
@@ -233,7 +233,7 @@ impl AgentPool {
project_root: Some(project_root), project_root: Some(project_root),
log_session_id: None, log_session_id: None,
merge_failure_reported: false, merge_failure_reported: false,
throttled: false, throttled: None,
termination_reason: None, termination_reason: None,
status_buffer: None, status_buffer: None,
}, },
@@ -267,7 +267,7 @@ impl AgentPool {
project_root: None, project_root: None,
log_session_id: None, log_session_id: None,
merge_failure_reported: false, merge_failure_reported: false,
throttled: false, throttled: None,
termination_reason: None, termination_reason: None,
status_buffer: None, status_buffer: None,
}, },
+6 -4
View File
@@ -81,9 +81,9 @@ pub(super) struct StoryAgent {
/// worktree (which compiles fine) and returns `gates_passed=true` even /// worktree (which compiles fine) and returns `gates_passed=true` even
/// though the code was never squash-merged onto master. /// though the code was never squash-merged onto master.
pub(super) merge_failure_reported: bool, pub(super) merge_failure_reported: bool,
/// Set to `true` when a rate-limit throttle warning was received for this agent. /// Set when a rate-limit event was received for this agent.
/// True when a rate-limit throttle warning was received for this agent. /// Carries the expiry time so the scheduler can decide when to retry.
pub(super) throttled: bool, pub(super) throttled: Option<crate::agents::AgentExecution>,
/// Set when the watchdog terminates the agent for exceeding a limit. /// Set when the watchdog terminates the agent for exceeding a limit.
pub(super) termination_reason: Option<crate::agents::TerminationReason>, pub(super) termination_reason: Option<crate::agents::TerminationReason>,
/// Passive event accumulator scoped to this session's project. /// Passive event accumulator scoped to this session's project.
@@ -113,7 +113,9 @@ pub(super) fn agent_info_from_entry(story_id: &str, agent: &StoryAgent) -> Agent
.map(|wt| wt.base_branch.clone()), .map(|wt| wt.base_branch.clone()),
completion: agent.completion.clone(), completion: agent.completion.clone(),
log_session_id: agent.log_session_id.clone(), log_session_id: agent.log_session_id.clone(),
throttled: agent.throttled, throttled: agent.throttled.as_ref().map(|e| match e {
crate::agents::AgentExecution::Throttled { until } => *until,
}),
termination_reason: agent.termination_reason.clone(), termination_reason: agent.termination_reason.clone(),
} }
} }
+1 -1
View File
@@ -70,7 +70,7 @@ impl AgentPool {
base_branch: None, base_branch: None,
completion: None, completion: None,
log_session_id: None, log_session_id: None,
throttled: false, throttled: None,
termination_reason: None, termination_reason: None,
} }
}); });
+3 -1
View File
@@ -332,7 +332,9 @@ fn render_item_line(
_ => format!(" \u{1F534} {display}{cost_suffix}{dep_suffix}\n"), _ => format!(" \u{1F534} {display}{cost_suffix}{dep_suffix}\n"),
}; };
} }
let throttled = agent.map(|a| a.throttled).unwrap_or(false); let throttled = agent
.and_then(|a| a.throttled)
.is_some_and(|until| until > chrono::Utc::now());
let dot = super::traffic_light_dot(blocked, throttled, agent.is_some()); let dot = super::traffic_light_dot(blocked, throttled, agent.is_some());
if let Some(agent) = agent { if let Some(agent) = agent {
let model_str = config let model_str = config