storkit: merge 465_bug_timer_tick_loop_never_fires_due_entries
This commit is contained in:
+131
-39
@@ -138,60 +138,109 @@ impl TimerStore {
|
|||||||
/// Same pattern as the watchdog in `agents::pool::auto_assign`.
|
/// Same pattern as the watchdog in `agents::pool::auto_assign`.
|
||||||
/// When a timer fires, `start_agent` is called for the story. If all coders
|
/// When a timer fires, `start_agent` is called for the story. If all coders
|
||||||
/// are busy the story remains in `2_current/` and auto-assign will pick it up.
|
/// are busy the story remains in `2_current/` and auto-assign will pick it up.
|
||||||
|
///
|
||||||
|
/// The loop body is wrapped in `catch_unwind` so a panic on any single tick
|
||||||
|
/// does not silently kill the background task.
|
||||||
pub fn spawn_timer_tick_loop(
|
pub fn spawn_timer_tick_loop(
|
||||||
store: Arc<TimerStore>,
|
store: Arc<TimerStore>,
|
||||||
agents: Arc<crate::agents::AgentPool>,
|
agents: Arc<crate::agents::AgentPool>,
|
||||||
project_root: PathBuf,
|
project_root: PathBuf,
|
||||||
) {
|
) {
|
||||||
|
let pending_count = store.list().len();
|
||||||
|
crate::slog!(
|
||||||
|
"[timer] Tick loop started; {pending_count} pending timer(s) loaded"
|
||||||
|
);
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
|
let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
|
||||||
loop {
|
loop {
|
||||||
interval.tick().await;
|
interval.tick().await;
|
||||||
let now = Utc::now();
|
|
||||||
let due = store.take_due(now);
|
|
||||||
for entry in due {
|
|
||||||
crate::slog!(
|
|
||||||
"[timer] Timer fired for story {}",
|
|
||||||
entry.story_id
|
|
||||||
);
|
|
||||||
|
|
||||||
// Move from backlog to current if needed — the auto-assign
|
// Wrap the tick body so a panic doesn't kill the loop.
|
||||||
// watcher will then start an agent automatically.
|
let tick_result = tick_once(&store, &agents, &project_root).await;
|
||||||
if let Err(e) = crate::agents::lifecycle::move_story_to_current(
|
if let Err(msg) = tick_result {
|
||||||
&project_root,
|
crate::slog_error!("[timer] Tick panicked: {msg}");
|
||||||
&entry.story_id,
|
|
||||||
) {
|
|
||||||
crate::slog!(
|
|
||||||
"[timer] Failed to move story {} to current: {e}",
|
|
||||||
entry.story_id
|
|
||||||
);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
match agents
|
|
||||||
.start_agent(&project_root, &entry.story_id, None, None)
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
Ok(info) => {
|
|
||||||
crate::slog!(
|
|
||||||
"[timer] Started agent {} for story {}",
|
|
||||||
info.agent_name,
|
|
||||||
entry.story_id
|
|
||||||
);
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
crate::slog!(
|
|
||||||
"[timer] Failed to start agent for story {}: {e} \
|
|
||||||
(auto-assign may pick it up)",
|
|
||||||
entry.story_id
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Execute one tick of the timer loop.
|
||||||
|
///
|
||||||
|
/// Separated from the loop so we can catch panics at the call-site.
|
||||||
|
/// Returns `Err` only when the tick panicked (the panic message is returned).
|
||||||
|
async fn tick_once(
|
||||||
|
store: &Arc<TimerStore>,
|
||||||
|
agents: &Arc<crate::agents::AgentPool>,
|
||||||
|
project_root: &Path,
|
||||||
|
) -> Result<(), String> {
|
||||||
|
// take_due is sync and could panic (e.g. poisoned mutex) — catch it.
|
||||||
|
let due = {
|
||||||
|
let store_ref = Arc::clone(store);
|
||||||
|
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
|
||||||
|
store_ref.take_due(Utc::now())
|
||||||
|
}));
|
||||||
|
match result {
|
||||||
|
Ok(due) => due,
|
||||||
|
Err(e) => return Err(panic_payload_to_string(&e)),
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let remaining = store.list().len();
|
||||||
|
crate::slog!(
|
||||||
|
"[timer] Tick: {} due, {remaining} remaining",
|
||||||
|
due.len()
|
||||||
|
);
|
||||||
|
|
||||||
|
for entry in due {
|
||||||
|
crate::slog!("[timer] Timer fired for story {}", entry.story_id);
|
||||||
|
|
||||||
|
// Move from backlog to current if needed — the auto-assign
|
||||||
|
// watcher will then start an agent automatically.
|
||||||
|
if let Err(e) =
|
||||||
|
crate::agents::lifecycle::move_story_to_current(project_root, &entry.story_id)
|
||||||
|
{
|
||||||
|
crate::slog!(
|
||||||
|
"[timer] Failed to move story {} to current: {e}",
|
||||||
|
entry.story_id
|
||||||
|
);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
match agents
|
||||||
|
.start_agent(project_root, &entry.story_id, None, None)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(info) => {
|
||||||
|
crate::slog!(
|
||||||
|
"[timer] Started agent {} for story {}",
|
||||||
|
info.agent_name,
|
||||||
|
entry.story_id
|
||||||
|
);
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
crate::slog!(
|
||||||
|
"[timer] Failed to start agent for story {}: {e} \
|
||||||
|
(auto-assign may pick it up)",
|
||||||
|
entry.story_id
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Extract a human-readable message from a `catch_unwind` panic payload.
|
||||||
|
fn panic_payload_to_string(payload: &Box<dyn std::any::Any + Send>) -> String {
|
||||||
|
if let Some(s) = payload.downcast_ref::<&str>() {
|
||||||
|
(*s).to_string()
|
||||||
|
} else if let Some(s) = payload.downcast_ref::<String>() {
|
||||||
|
s.clone()
|
||||||
|
} else {
|
||||||
|
"unknown panic".to_string()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Spawn a background task that listens for [`WatcherEvent::RateLimitHardBlock`]
|
/// Spawn a background task that listens for [`WatcherEvent::RateLimitHardBlock`]
|
||||||
/// events and auto-schedules a timer for the blocked story.
|
/// events and auto-schedules a timer for the blocked story.
|
||||||
///
|
///
|
||||||
@@ -981,4 +1030,47 @@ mod tests {
|
|||||||
// Timer was consumed.
|
// Timer was consumed.
|
||||||
assert!(store.list().is_empty(), "fired timer should be removed from store");
|
assert!(store.list().is_empty(), "fired timer should be removed from store");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── AC4: tick_once integration test ─────────────────────────────────
|
||||||
|
|
||||||
|
/// Create a past-due timer, run tick_once, and assert the entry is
|
||||||
|
/// consumed. start_agent will fail (no real agent binary), but
|
||||||
|
/// take_due must still drain the entry.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn tick_once_consumes_past_due_entry() {
|
||||||
|
use std::fs;
|
||||||
|
|
||||||
|
let dir = TempDir::new().unwrap();
|
||||||
|
let root = dir.path();
|
||||||
|
let backlog = root.join(".storkit/work/1_backlog");
|
||||||
|
let current = root.join(".storkit/work/2_current");
|
||||||
|
fs::create_dir_all(&backlog).unwrap();
|
||||||
|
fs::create_dir_all(¤t).unwrap();
|
||||||
|
fs::write(
|
||||||
|
backlog.join("421_story_foo.md"),
|
||||||
|
"---\nname: Foo\n---\n",
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let store = Arc::new(TimerStore::load(root.join("timers.json")));
|
||||||
|
let past = Utc::now() - Duration::seconds(5);
|
||||||
|
store.add("421_story_foo".to_string(), past).unwrap();
|
||||||
|
assert_eq!(store.list().len(), 1, "precondition: one pending timer");
|
||||||
|
|
||||||
|
let agents = Arc::new(crate::agents::AgentPool::new_test(19999));
|
||||||
|
|
||||||
|
// tick_once should drain the due entry even though start_agent
|
||||||
|
// will fail (no agent binary configured in the test pool).
|
||||||
|
let result = super::tick_once(&store, &agents, root).await;
|
||||||
|
assert!(result.is_ok(), "tick_once should not panic: {result:?}");
|
||||||
|
assert!(
|
||||||
|
store.list().is_empty(),
|
||||||
|
"past-due timer must be consumed after tick_once"
|
||||||
|
);
|
||||||
|
// Story should have been moved to current.
|
||||||
|
assert!(
|
||||||
|
current.join("421_story_foo.md").exists(),
|
||||||
|
"story should be in 2_current/ after tick fires"
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user