huskies: merge 615_story_extract_timer_service
@@ -0,0 +1,650 @@
//! I/O side of the timer service: filesystem persistence, clock reads,
//! background task spawning, and story-ID resolution.
//!
//! This is the **only** place inside `service/timer/` that may perform side
//! effects (filesystem reads/writes, clock reads, `tokio::spawn`).

use chrono::{DateTime, Utc};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};

use super::persist::{TimerEntry, deserialize_timers, serialize_timers};
use super::schedule::next_occurrence_at;

// ── TimerStore ─────────────────────────────────────────────────────────────

/// Persistent store for pending timers, backed by a JSON file.
pub struct TimerStore {
    path: PathBuf,
    timers: Mutex<Vec<TimerEntry>>,
}

impl TimerStore {
    /// Load the timer store from `path`. Returns an empty store if the file
    /// does not exist or cannot be parsed.
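    ///
    /// # Example (sketch)
    ///
    /// A minimal round-trip under an assumed writable path; illustrative only:
    ///
    /// ```ignore
    /// let store = TimerStore::load(std::path::PathBuf::from("/tmp/timers.json"));
    /// println!("{} pending timer(s)", store.list().len());
    /// ```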
    pub fn load(path: PathBuf) -> Self {
        let timers = if path.exists() {
            std::fs::read_to_string(&path)
                .ok()
                .and_then(|s| deserialize_timers(&s))
                .unwrap_or_default()
        } else {
            Vec::new()
        };
        Self {
            path,
            timers: Mutex::new(timers),
        }
    }

    fn save_locked(path: &Path, timers: &[TimerEntry]) -> Result<(), String> {
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent)
                .map_err(|e| format!("Failed to create directory: {e}"))?;
        }
        let content = serialize_timers(timers)?;
        std::fs::write(path, content).map_err(|e| format!("Failed to write timers: {e}"))
    }

    /// Add a timer and persist to disk.
    pub fn add(&self, story_id: String, scheduled_at: DateTime<Utc>) -> Result<(), String> {
        let mut timers = self.timers.lock().unwrap();
        timers.push(TimerEntry {
            story_id,
            scheduled_at,
        });
        Self::save_locked(&self.path, &timers)
    }

    /// Remove the timer for `story_id`. Returns `true` if one was removed.
    pub fn remove(&self, story_id: &str) -> bool {
        let mut timers = self.timers.lock().unwrap();
        let before = timers.len();
        timers.retain(|t| t.story_id != story_id);
        let removed = timers.len() < before;
        if removed {
            let _ = Self::save_locked(&self.path, &timers);
        }
        removed
    }

    /// Return all pending timers (cloned).
    pub fn list(&self) -> Vec<TimerEntry> {
        self.timers.lock().unwrap().clone()
    }

    /// Add or update a timer for `story_id`.
    ///
    /// - If no timer exists for `story_id`, adds it.
    /// - If a timer already exists and `scheduled_at` is **later**, updates it.
    /// - If a timer already exists and `scheduled_at` is earlier or equal, no-op.
    ///
    /// Use this instead of [`Self::add`] when auto-scheduling from rate-limit events to
    /// avoid creating duplicates and to always keep the latest reset time.
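    ///
    /// # Example (sketch)
    ///
    /// Illustrates the keep-the-later-time rule; the store binding and the
    /// times here are illustrative, not taken from real usage:
    ///
    /// ```ignore
    /// let early = Utc::now() + chrono::Duration::hours(1);
    /// let later = Utc::now() + chrono::Duration::hours(2);
    /// store.upsert("story_1".to_string(), later)?;
    /// store.upsert("story_1".to_string(), early)?; // no-op: earlier than stored
    /// assert_eq!(store.list()[0].scheduled_at, later);
    /// ```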
    pub fn upsert(&self, story_id: String, scheduled_at: DateTime<Utc>) -> Result<(), String> {
        let mut timers = self.timers.lock().unwrap();
        if let Some(existing) = timers.iter_mut().find(|t| t.story_id == story_id) {
            if scheduled_at > existing.scheduled_at {
                existing.scheduled_at = scheduled_at;
                Self::save_locked(&self.path, &timers)?;
            }
        } else {
            timers.push(TimerEntry {
                story_id,
                scheduled_at,
            });
            Self::save_locked(&self.path, &timers)?;
        }
        Ok(())
    }

    /// Remove and return all timers whose `scheduled_at` is ≤ `now`.
    /// Persists the updated list to disk if any timers were removed.
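    ///
    /// # Example (sketch)
    ///
    /// Assumed call pattern (mirrors what `tick_once` does):
    ///
    /// ```ignore
    /// for entry in store.take_due(Utc::now()) {
    ///     println!("timer fired for {}", entry.story_id);
    /// }
    /// ```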
    pub fn take_due(&self, now: DateTime<Utc>) -> Vec<TimerEntry> {
        let mut timers = self.timers.lock().unwrap();
        let mut due = Vec::new();
        let mut remaining = Vec::new();
        for t in timers.drain(..) {
            if t.scheduled_at <= now {
                due.push(t);
            } else {
                remaining.push(t);
            }
        }
        *timers = remaining;
        if !due.is_empty() {
            let _ = Self::save_locked(&self.path, &timers);
        }
        due
    }
}

// ── Clock-reading wrapper ───────────────────────────────────────────────────

/// Parse `HH:MM` and return the next UTC instant at which the given timezone
/// (or the server-local clock when `timezone` is `None`) will read that time.
/// If the time has already passed today, returns tomorrow's occurrence.
///
/// This wrapper reads the current clock via `Utc::now()`. For pure unit tests
/// use [`crate::service::timer::schedule::next_occurrence_at`] directly.
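///
/// # Example (sketch)
///
/// Illustrative only; the exact instant depends on the clock at call time:
///
/// ```ignore
/// let next = next_occurrence_of_hhmm("06:00", Some("Europe/London"));
/// assert!(next.is_some() && next.unwrap() > Utc::now());
/// ```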
pub fn next_occurrence_of_hhmm(hhmm: &str, timezone: Option<&str>) -> Option<DateTime<Utc>> {
    next_occurrence_at(hhmm, timezone, Utc::now())
}

// ── Story-ID resolution ─────────────────────────────────────────────────────

/// Resolve a story ID from a numeric story number or a full ID string.
///
/// Searches all pipeline stages. Returns `None` only when the input is
/// numeric but no matching file is found.
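///
/// # Example (sketch)
///
/// Assumed inputs; a full ID passes through unchanged, a bare number is
/// looked up against the DB and the stage directories:
///
/// ```ignore
/// assert_eq!(
///     resolve_story_id("421_story_foo", project_root),
///     Some("421_story_foo".to_string())
/// );
/// let by_number = resolve_story_id("421", project_root); // Some(..) only if 421 exists
/// ```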
pub(super) fn resolve_story_id(number_or_id: &str, project_root: &Path) -> Option<String> {
    const STAGES: &[&str] = &[
        "1_backlog",
        "2_current",
        "3_qa",
        "4_merge",
        "5_done",
        "6_archived",
    ];

    // Full ID (contains underscores) — return as-is; validation happens at file-check time.
    if number_or_id.contains('_') {
        return Some(number_or_id.to_string());
    }

    // Numeric lookup.
    if !number_or_id.chars().all(|c| c.is_ascii_digit()) {
        return None;
    }

    // --- DB-first lookup ---
    for id in crate::db::all_content_ids() {
        let file_num = id.split('_').next().unwrap_or("");
        if file_num == number_or_id
            && crate::pipeline_state::read_typed(&id)
                .ok()
                .flatten()
                .is_some()
        {
            return Some(id);
        }
    }

    // --- Filesystem fallback ---
    for stage in STAGES {
        let dir = project_root.join(".huskies").join("work").join(stage);
        if !dir.exists() {
            continue;
        }
        if let Ok(entries) = std::fs::read_dir(&dir) {
            for entry in entries.flatten() {
                let path = entry.path();
                if path.extension().and_then(|e| e.to_str()) != Some("md") {
                    continue;
                }
                if let Some(stem) = path.file_stem().and_then(|s| s.to_str()) {
                    let file_num = stem
                        .split('_')
                        .next()
                        .filter(|s| !s.is_empty() && s.chars().all(|c| c.is_ascii_digit()))
                        .unwrap_or("");
                    if file_num == number_or_id {
                        return Some(stem.to_string());
                    }
                }
            }
        }
    }
    None
}

// ── Tick loop ───────────────────────────────────────────────────────────────

/// Execute one tick of the timer loop.
///
/// Called by the unified background tick loop every second.
/// Separated from the loop so we can catch panics at the call-site.
/// Returns `Err` only when the tick panicked (the panic message is returned).
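///
/// # Example (sketch)
///
/// Assumed shape of the caller's loop (the real unified tick loop lives
/// elsewhere; the bindings here are illustrative):
///
/// ```ignore
/// loop {
///     if let Err(panic_msg) = tick_once(&store, &agents, &project_root).await {
///         crate::slog!("[timer] tick panicked: {panic_msg}");
///     }
///     tokio::time::sleep(std::time::Duration::from_secs(1)).await;
/// }
/// ```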
pub async fn tick_once(
    store: &Arc<TimerStore>,
    agents: &Arc<crate::agents::AgentPool>,
    project_root: &Path,
) -> Result<(), String> {
    // take_due is sync and could panic (e.g. poisoned mutex) — catch it.
    let due = {
        let store_ref = Arc::clone(store);
        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
            store_ref.take_due(Utc::now())
        }));
        match result {
            Ok(due) => due,
            Err(e) => return Err(panic_payload_to_string(&e)),
        }
    };

    if due.is_empty() {
        return Ok(());
    }

    let remaining = store.list().len();
    crate::slog!("[timer] Tick: {} due, {remaining} remaining", due.len());

    for entry in due {
        crate::slog!("[timer] Timer fired for story {}", entry.story_id);

        // Bug 501: Defense-in-depth check. If the story has already advanced
        // past the active-work stages (3_qa, 4_merge, 5_done, 6_archived),
        // there is nothing to resume — the timer is stale and should no-op.
        // The primary cancellation paths (move_story MCP → backlog, stop_agent)
        // remove the timer before it fires; this guard covers the case where
        // cancellation was not yet called or the story raced forward through
        // the pipeline while the timer was pending.
        if let Ok(Some(item)) = crate::pipeline_state::read_typed(&entry.story_id) {
            use crate::pipeline_state::Stage;
            match &item.stage {
                Stage::Qa | Stage::Merge { .. } | Stage::Done { .. } | Stage::Archived { .. } => {
                    crate::slog!(
                        "[timer] Skipping timer for story {} — currently in '{}', \
                         not in backlog/current; timer is stale",
                        entry.story_id,
                        item.stage.dir_name()
                    );
                    continue;
                }
                _ => {}
            }
        }

        // Move from backlog to current if needed — the auto-assign
        // watcher will then start an agent automatically.
        if let Err(e) =
            crate::agents::lifecycle::move_story_to_current(project_root, &entry.story_id)
        {
            crate::slog!(
                "[timer] Failed to move story {} to current: {e}",
                entry.story_id
            );
            continue;
        }

        match agents
            .start_agent(project_root, &entry.story_id, None, None, None)
            .await
        {
            Ok(info) => {
                crate::slog!(
                    "[timer] Started agent {} for story {}",
                    info.agent_name,
                    entry.story_id
                );
            }
            Err(e) => {
                crate::slog!(
                    "[timer] Failed to start agent for story {}: {e} \
                     (auto-assign may pick it up)",
                    entry.story_id
                );
            }
        }
    }
    Ok(())
}

fn panic_payload_to_string(payload: &Box<dyn std::any::Any + Send>) -> String {
    if let Some(s) = payload.downcast_ref::<&str>() {
        (*s).to_string()
    } else if let Some(s) = payload.downcast_ref::<String>() {
        s.clone()
    } else {
        "unknown panic".to_string()
    }
}

// ── Rate-limit auto-scheduler ───────────────────────────────────────────────

/// Spawn a background task that listens for [`WatcherEvent::RateLimitHardBlock`]
/// events and auto-schedules a timer for the blocked story.
///
/// If a timer already exists for the story, it is updated to the later reset time
/// rather than creating a duplicate (via [`TimerStore::upsert`]).
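///
/// # Example (sketch)
///
/// Assumed wiring at service startup; `watcher_tx` is an illustrative
/// broadcast sender owned by the watcher:
///
/// ```ignore
/// let rx = watcher_tx.subscribe();
/// spawn_rate_limit_auto_scheduler(Arc::clone(&store), rx);
/// ```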
pub fn spawn_rate_limit_auto_scheduler(
    store: Arc<TimerStore>,
    mut watcher_rx: tokio::sync::broadcast::Receiver<crate::io::watcher::WatcherEvent>,
) {
    tokio::spawn(async move {
        loop {
            match watcher_rx.recv().await {
                Ok(crate::io::watcher::WatcherEvent::RateLimitHardBlock {
                    story_id,
                    agent_name,
                    reset_at,
                }) => {
                    // Skip short rate limits (≤10 min) — the CLI handles
                    // these internally. Only schedule timers for long
                    // session-level blocks where the CLI will exit.
                    let until_reset = reset_at.signed_duration_since(chrono::Utc::now());
                    if until_reset.num_minutes() <= 10 {
                        crate::slog!(
                            "[timer] Skipping short rate limit for {story_id} \
                             ({} min); CLI will handle internally",
                            until_reset.num_minutes()
                        );
                        continue;
                    }
                    crate::slog!(
                        "[timer] Auto-scheduling timer for story {story_id} \
                         (agent {agent_name}) to resume at {reset_at}"
                    );
                    match store.upsert(story_id.clone(), reset_at) {
                        Ok(()) => {
                            crate::slog!(
                                "[timer] Timer upserted for story {story_id}; \
                                 scheduled at {reset_at}"
                            );
                        }
                        Err(e) => {
                            crate::slog!(
                                "[timer] Failed to upsert timer for story {story_id}: {e}"
                            );
                        }
                    }
                }
                Ok(_) => {}
                Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
                    crate::slog!("[timer] Rate-limit auto-scheduler lagged, skipped {n} events");
                }
                Err(tokio::sync::broadcast::error::RecvError::Closed) => {
                    crate::slog!(
                        "[timer] Watcher channel closed, stopping rate-limit auto-scheduler"
                    );
                    break;
                }
            }
        }
    });
}

// ── Tests ──────────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Duration;
    use tempfile::TempDir;

    // ── next_occurrence_of_hhmm ─────────────────────────────────────────

    #[test]
    fn valid_hhmm_returns_some() {
        let result = next_occurrence_of_hhmm("14:30", None);
        assert!(result.is_some(), "valid HH:MM should return Some");
    }

    #[test]
    fn invalid_hhmm_missing_colon_returns_none() {
        assert!(next_occurrence_of_hhmm("1430", None).is_none());
    }

    #[test]
    fn invalid_hhmm_bad_hours_returns_none() {
        assert!(next_occurrence_of_hhmm("25:00", None).is_none());
    }

    #[test]
    fn invalid_hhmm_bad_minutes_returns_none() {
        assert!(next_occurrence_of_hhmm("12:60", None).is_none());
    }

    #[test]
    fn next_occurrence_is_in_the_future() {
        let result = next_occurrence_of_hhmm("14:30", None).unwrap();
        assert!(result > Utc::now(), "next occurrence must be in the future");
    }

    #[test]
    fn next_occurrence_with_named_timezone_is_in_the_future() {
        let result = next_occurrence_of_hhmm("14:30", Some("Europe/London")).unwrap();
        assert!(
            result > Utc::now(),
            "next occurrence (Europe/London) must be in the future"
        );
    }

    #[test]
    fn next_occurrence_with_invalid_timezone_falls_back_to_local() {
        // An unrecognised timezone name falls back to chrono::Local (returns Some).
        let result = next_occurrence_of_hhmm("14:30", Some("Invalid/Zone"));
        assert!(
            result.is_some(),
            "invalid timezone should fall back to local and return Some"
        );
    }

    // ── TimerStore ──────────────────────────────────────────────────────

    #[test]
    fn timer_store_empty_on_missing_file() {
        let dir = TempDir::new().unwrap();
        let store = TimerStore::load(dir.path().join("timers.json"));
        assert!(store.list().is_empty());
    }

    #[test]
    fn timer_store_add_and_list() {
        let dir = TempDir::new().unwrap();
        let store = TimerStore::load(dir.path().join("timers.json"));
        let t = Utc::now() + Duration::hours(1);
        store.add("story_1".to_string(), t).unwrap();
        let list = store.list();
        assert_eq!(list.len(), 1);
        assert_eq!(list[0].story_id, "story_1");
    }

    #[test]
    fn timer_store_remove() {
        let dir = TempDir::new().unwrap();
        let store = TimerStore::load(dir.path().join("timers.json"));
        let t = Utc::now() + Duration::hours(1);
        store.add("story_1".to_string(), t).unwrap();
        assert!(store.remove("story_1"));
        assert!(!store.remove("story_1")); // already gone
        assert!(store.list().is_empty());
    }

    #[test]
    fn timer_store_persists_and_reloads() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("timers.json");
        let t = Utc::now() + Duration::hours(2);
        {
            let store = TimerStore::load(path.clone());
            store.add("421_story_foo".to_string(), t).unwrap();
        }
        // Reload from disk.
        let store2 = TimerStore::load(path);
        let list = store2.list();
        assert_eq!(list.len(), 1);
        assert_eq!(list[0].story_id, "421_story_foo");
    }

    #[test]
    fn take_due_returns_only_past_entries() {
        let dir = TempDir::new().unwrap();
        let store = TimerStore::load(dir.path().join("timers.json"));
        let past = Utc::now() - Duration::minutes(1);
        let future = Utc::now() + Duration::hours(1);
        store.add("past_story".to_string(), past).unwrap();
        store.add("future_story".to_string(), future).unwrap();

        let due = store.take_due(Utc::now());
        assert_eq!(due.len(), 1);
        assert_eq!(due[0].story_id, "past_story");
        assert_eq!(store.list().len(), 1);
        assert_eq!(store.list()[0].story_id, "future_story");
    }

    // ── upsert ─────────────────────────────────────────────────────────

    #[test]
    fn upsert_adds_new_timer_when_none_exists() {
        let dir = TempDir::new().unwrap();
        let store = TimerStore::load(dir.path().join("timers.json"));
        let t = Utc::now() + Duration::hours(1);
        store.upsert("story_1".to_string(), t).unwrap();
        let list = store.list();
        assert_eq!(list.len(), 1);
        assert_eq!(list[0].story_id, "story_1");
        assert_eq!(list[0].scheduled_at, t);
    }

    #[test]
    fn upsert_updates_to_later_time() {
        let dir = TempDir::new().unwrap();
        let store = TimerStore::load(dir.path().join("timers.json"));
        let early = Utc::now() + Duration::hours(1);
        let later = Utc::now() + Duration::hours(2);
        store.upsert("story_1".to_string(), early).unwrap();
        store.upsert("story_1".to_string(), later).unwrap();
        let list = store.list();
        assert_eq!(list.len(), 1, "should not create duplicate");
        assert_eq!(list[0].scheduled_at, later, "should update to later time");
    }

    #[test]
    fn upsert_does_not_downgrade_to_earlier_time() {
        let dir = TempDir::new().unwrap();
        let store = TimerStore::load(dir.path().join("timers.json"));
        let later = Utc::now() + Duration::hours(2);
        let earlier = Utc::now() + Duration::hours(1);
        store.upsert("story_1".to_string(), later).unwrap();
        store.upsert("story_1".to_string(), earlier).unwrap();
        let list = store.list();
        assert_eq!(list.len(), 1);
        assert_eq!(
            list[0].scheduled_at, later,
            "should keep the later time, not downgrade"
        );
    }

    // ── spawn_rate_limit_auto_scheduler ─────────────────────────────────

    /// AC2: a RateLimitHardBlock event causes the auto-scheduler to add a timer.
    #[tokio::test]
    async fn rate_limit_auto_scheduler_adds_timer_on_hard_block() {
        use crate::io::watcher::WatcherEvent;

        let dir = TempDir::new().unwrap();
        let store = Arc::new(TimerStore::load(dir.path().join("timers.json")));
        let (watcher_tx, watcher_rx) = tokio::sync::broadcast::channel::<WatcherEvent>(16);

        spawn_rate_limit_auto_scheduler(Arc::clone(&store), watcher_rx);

        let reset_at = Utc::now() + Duration::hours(1);
        watcher_tx
            .send(WatcherEvent::RateLimitHardBlock {
                story_id: "423_story_test".to_string(),
                agent_name: "coder-1".to_string(),
                reset_at,
            })
            .unwrap();

        // Give the spawned task time to process the event.
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        let list = store.list();
        assert_eq!(list.len(), 1, "expected one timer after hard block");
        assert_eq!(list[0].story_id, "423_story_test");
        assert_eq!(list[0].scheduled_at, reset_at);
    }

    /// AC3 integration: a second hard block with a later reset_at updates the
    /// existing timer rather than creating a duplicate.
    #[tokio::test]
    async fn rate_limit_auto_scheduler_upserts_on_repeated_hard_block() {
        use crate::io::watcher::WatcherEvent;

        let dir = TempDir::new().unwrap();
        let store = Arc::new(TimerStore::load(dir.path().join("timers.json")));
        let (watcher_tx, watcher_rx) = tokio::sync::broadcast::channel::<WatcherEvent>(16);

        spawn_rate_limit_auto_scheduler(Arc::clone(&store), watcher_rx);

        let first = Utc::now() + Duration::hours(1);
        let second = Utc::now() + Duration::hours(2);

        watcher_tx
            .send(WatcherEvent::RateLimitHardBlock {
                story_id: "423_story_test".to_string(),
                agent_name: "coder-1".to_string(),
                reset_at: first,
            })
            .unwrap();
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        watcher_tx
            .send(WatcherEvent::RateLimitHardBlock {
                story_id: "423_story_test".to_string(),
                agent_name: "coder-1".to_string(),
                reset_at: second,
            })
            .unwrap();
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        let list = store.list();
        assert_eq!(list.len(), 1, "should not create a duplicate timer");
        assert_eq!(list[0].scheduled_at, second, "should update to later time");
    }

    #[test]
    fn multiple_timers_same_time_all_returned() {
        let dir = TempDir::new().unwrap();
        let store = TimerStore::load(dir.path().join("timers.json"));
        let past = Utc::now() - Duration::minutes(1);
        store.add("story_a".to_string(), past).unwrap();
        store.add("story_b".to_string(), past).unwrap();

        let due = store.take_due(Utc::now());
        assert_eq!(due.len(), 2, "both timers at same time must fire");
    }

    // ── tick_once ───────────────────────────────────────────────────────

    /// Create a past-due timer, run tick_once, and assert the entry is
    /// consumed. start_agent will fail (no real agent binary), but
    /// take_due must still drain the entry.
    #[tokio::test]
    async fn tick_once_consumes_past_due_entry() {
        use std::fs;

        let dir = TempDir::new().unwrap();
        let root = dir.path();
        let backlog = root.join(".huskies/work/1_backlog");
        let current = root.join(".huskies/work/2_current");
        fs::create_dir_all(&backlog).unwrap();
        fs::create_dir_all(&current).unwrap();
        let content = "---\nname: Foo\n---\n";
        fs::write(backlog.join("9905_story_foo.md"), content).unwrap();
        crate::db::ensure_content_store();
        crate::db::write_content("9905_story_foo", content);

        let store = Arc::new(TimerStore::load(root.join("timers.json")));
        let past = Utc::now() - Duration::seconds(5);
        store.add("9905_story_foo".to_string(), past).unwrap();
        assert_eq!(store.list().len(), 1, "precondition: one pending timer");

        let agents = Arc::new(crate::agents::AgentPool::new_test(19999));

        // tick_once should drain the due entry even though start_agent
        // will fail (no agent binary configured in the test pool).
        let result = super::tick_once(&store, &agents, root).await;
        assert!(result.is_ok(), "tick_once should not panic: {result:?}");
        assert!(
            store.list().is_empty(),
            "past-due timer must be consumed after tick_once"
        );
        // Story should still be accessible in the content store after the move.
        assert!(
            crate::db::read_content("9905_story_foo").is_some(),
            "story should be in the content store after tick fires"
        );
    }
}