huskies: merge 1039
This commit is contained in:
@@ -4,9 +4,14 @@
|
||||
use crate::agents::{AgentPool, ReconciliationEvent};
|
||||
use crate::config;
|
||||
use crate::gateway_relay;
|
||||
use crate::http::context::AppContext;
|
||||
use crate::http::mcp::dispatch::dispatch_tool_call;
|
||||
use crate::io;
|
||||
use crate::service;
|
||||
use crate::service::status::StatusBroadcaster;
|
||||
use crate::service::timer::scheduled::{
|
||||
ScheduledTimer, ScheduledTimerStore, TimerAction, TimerMode,
|
||||
};
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::broadcast;
|
||||
@@ -126,6 +131,8 @@ pub(crate) fn spawn_event_bridges(
|
||||
/// current wall-clock time against each timer's `due_at` field. Cannot be
|
||||
/// reactive because timers encode absolute timestamps that only become
|
||||
/// actionable when the clock reaches them.
|
||||
/// - **Scheduled timer tick** (every second): fires generic scheduled timers
|
||||
/// registered via the `schedule_timer` MCP tool.
|
||||
/// - **Agent watchdog** (every 30 seconds): detects orphaned `Running` agents
|
||||
/// by comparing elapsed time since the last heartbeat. Cannot be reactive
|
||||
/// because the absence of an event (no heartbeat) is what signals a problem;
|
||||
@@ -136,10 +143,16 @@ pub(crate) fn spawn_event_bridges(
|
||||
pub(crate) fn spawn_tick_loop(
|
||||
agents: Arc<AgentPool>,
|
||||
timer_store: Arc<service::timer::TimerStore>,
|
||||
scheduled_timer_store: Arc<ScheduledTimerStore>,
|
||||
root: Option<PathBuf>,
|
||||
ctx: AppContext,
|
||||
) {
|
||||
let pending_count = timer_store.list().len();
|
||||
crate::slog!("[tick] Unified tick loop started; {pending_count} pending timer(s)");
|
||||
let scheduled_count = scheduled_timer_store.list().len();
|
||||
crate::slog!(
|
||||
"[tick] Unified tick loop started; {pending_count} rate-limit timer(s), \
|
||||
{scheduled_count} scheduled timer(s)"
|
||||
);
|
||||
|
||||
tokio::spawn(async move {
|
||||
let mut interval = tokio::time::interval(std::time::Duration::from_secs(1));
|
||||
@@ -157,6 +170,9 @@ pub(crate) fn spawn_tick_loop(
|
||||
}
|
||||
}
|
||||
|
||||
// Generic scheduled timer tick.
|
||||
tick_scheduled_timers(&scheduled_timer_store, &ctx).await;
|
||||
|
||||
// Time-based: the watchdog detects silence (no heartbeat within a
|
||||
// timeout window). A `TransitionFired` subscriber cannot observe the
|
||||
// absence of events, so this must remain on a periodic tick.
|
||||
@@ -176,6 +192,85 @@ pub(crate) fn spawn_tick_loop(
|
||||
});
|
||||
}
|
||||
|
||||
/// Fire any due generic scheduled timers and re-arm recurring ones.
|
||||
///
|
||||
/// Called every second from the unified tick loop. Catch-up semantics: timers
|
||||
/// whose `fire_at` is already in the past fire on the next tick after boot.
|
||||
async fn tick_scheduled_timers(store: &Arc<ScheduledTimerStore>, ctx: &AppContext) {
|
||||
let due = store.take_due(chrono::Utc::now());
|
||||
if due.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
crate::slog!("[scheduled-timer] {} timer(s) due", due.len());
|
||||
|
||||
for timer in due {
|
||||
fire_scheduled_timer(&timer, ctx).await;
|
||||
|
||||
// Re-arm recurring timers.
|
||||
if let TimerMode::Recurring { interval_secs } = &timer.mode {
|
||||
let next_fire = timer.fire_at + chrono::Duration::seconds(*interval_secs as i64);
|
||||
let rearmed = ScheduledTimer {
|
||||
id: timer.id.clone(),
|
||||
label: timer.label.clone(),
|
||||
fire_at: next_fire,
|
||||
action: timer.action.clone(),
|
||||
mode: timer.mode.clone(),
|
||||
created_at: timer.created_at,
|
||||
};
|
||||
if let Err(e) = store.add(rearmed) {
|
||||
crate::slog_error!("[scheduled-timer] Failed to re-arm timer {}: {e}", timer.id);
|
||||
} else {
|
||||
crate::slog!(
|
||||
"[scheduled-timer] Recurring timer {} re-armed for {}",
|
||||
timer.id,
|
||||
next_fire
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Execute a single scheduled timer's action.
|
||||
async fn fire_scheduled_timer(timer: &ScheduledTimer, ctx: &AppContext) {
|
||||
let label = timer.label.as_deref().unwrap_or(&timer.id);
|
||||
match &timer.action {
|
||||
TimerAction::Mcp { method, args } => {
|
||||
crate::slog!(
|
||||
"[scheduled-timer] Firing MCP action '{}' for timer {}",
|
||||
method,
|
||||
timer.id
|
||||
);
|
||||
match dispatch_tool_call(method, args.clone(), ctx).await {
|
||||
Ok(result) => {
|
||||
crate::slog!(
|
||||
"[scheduled-timer] Timer {} ('{}') fired OK: {}",
|
||||
timer.id,
|
||||
label,
|
||||
result.chars().take(200).collect::<String>()
|
||||
);
|
||||
}
|
||||
Err(e) => {
|
||||
crate::slog_error!(
|
||||
"[scheduled-timer] Timer {} ('{}') MCP call '{}' failed: {e}",
|
||||
timer.id,
|
||||
label,
|
||||
method
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
TimerAction::Prompt { text } => {
|
||||
crate::slog!(
|
||||
"[scheduled-timer] Prompt timer {} ('{}') fired: {}",
|
||||
timer.id,
|
||||
label,
|
||||
text
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawn the gateway relay task if `gateway_url` is configured in
|
||||
/// `project.toml` or the `HUSKIES_GATEWAY_URL` environment variable.
|
||||
pub(crate) fn spawn_gateway_relay(startup_root: &Option<PathBuf>, status: Arc<StatusBroadcaster>) {
|
||||
|
||||
Reference in New Issue
Block a user