sketch(520): add ExecutionMachine to the statig sketch for parity with bare
The statig version was missing the per-node ExecutionState machine that
the bare version has. This commit adds it as a sub-module so its
generated `State` enum doesn't collide with the top-level PipelineMachine's
`State` enum.
Adds:
- ExecutionEvent enum (top-level, alongside PipelineEvent)
- mod execution { … } sub-module containing ExecutionMachine
- States: idle, pending, running, rate_limited, completed
- Cross-cutting `any` superstate that handles Stopped/Reset → Idle
- 6 new tests covering the happy path, rate-limit + resume, and
stop-from-anywhere via the superstate
Also adds a small note about how statig's `#[action]` entry/exit hooks
would replace the bare version's external EventBus pattern (without
implementing it — we'd pick one or the other based on whether side
effects should live inside or outside the state machine).
Test count: 11 → 17 (all passing).
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -266,6 +266,175 @@ impl PipelineMachine {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── Per-node execution state machine ─────────────────────────────────────────
|
||||||
|
//
|
||||||
|
// Independent of the pipeline stage machine. Tracks "what is THIS node doing
|
||||||
|
// about this story right now." Lives in its own sub-module so its generated
|
||||||
|
// `State` enum doesn't collide with `PipelineMachine`'s.
|
||||||
|
//
|
||||||
|
// In a real implementation, multiple nodes can have different ExecutionState
|
||||||
|
// for the same story_id at the same time — and that's fine, because each
|
||||||
|
// node owns its own subspace in the CRDT (keyed by node pubkey).
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub enum ExecutionEvent {
|
||||||
|
SpawnRequested { agent: AgentName },
|
||||||
|
SpawnedSuccessfully,
|
||||||
|
Heartbeat,
|
||||||
|
HitRateLimit { resume_at: DateTime<Utc> },
|
||||||
|
Exited { exit_code: i32 },
|
||||||
|
Stopped,
|
||||||
|
Reset,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub mod execution {
|
||||||
|
use super::{AgentName, DateTime, ExecutionEvent, Utc};
|
||||||
|
use statig::prelude::*;
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
pub struct ExecutionMachine;
|
||||||
|
|
||||||
|
#[state_machine(
|
||||||
|
initial = "State::idle()",
|
||||||
|
state(derive(Debug, Clone, PartialEq, Eq))
|
||||||
|
)]
|
||||||
|
impl ExecutionMachine {
|
||||||
|
// ── Idle: no agent on this node is working on this story ──────────
|
||||||
|
|
||||||
|
#[state(superstate = "any")]
|
||||||
|
fn idle(event: &ExecutionEvent) -> Response<State> {
|
||||||
|
match event {
|
||||||
|
ExecutionEvent::SpawnRequested { agent } => {
|
||||||
|
Transition(State::pending(agent.clone(), Utc::now()))
|
||||||
|
}
|
||||||
|
_ => Super,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Pending: agent has been requested but hasn't started yet ──────
|
||||||
|
|
||||||
|
#[state(superstate = "any")]
|
||||||
|
fn pending(
|
||||||
|
agent: &mut AgentName,
|
||||||
|
_since: &mut DateTime<Utc>,
|
||||||
|
event: &ExecutionEvent,
|
||||||
|
) -> Response<State> {
|
||||||
|
match event {
|
||||||
|
ExecutionEvent::SpawnedSuccessfully => {
|
||||||
|
let now = Utc::now();
|
||||||
|
Transition(State::running(agent.clone(), now, now))
|
||||||
|
}
|
||||||
|
ExecutionEvent::HitRateLimit { resume_at } => {
|
||||||
|
Transition(State::rate_limited(agent.clone(), *resume_at))
|
||||||
|
}
|
||||||
|
ExecutionEvent::Exited { exit_code } => Transition(State::completed(
|
||||||
|
agent.clone(),
|
||||||
|
*exit_code,
|
||||||
|
Utc::now(),
|
||||||
|
)),
|
||||||
|
_ => Super,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Running: agent's subprocess is alive ──────────────────────────
|
||||||
|
//
|
||||||
|
// Heartbeat is a self-transition: we update last_heartbeat in-place
|
||||||
|
// via the &mut reference and return `Handled` (no actual stage change).
|
||||||
|
// This is statig's idiomatic way to mutate state-local data without
|
||||||
|
// transitioning.
|
||||||
|
|
||||||
|
#[state(superstate = "any")]
|
||||||
|
fn running(
|
||||||
|
agent: &mut AgentName,
|
||||||
|
_started_at: &mut DateTime<Utc>,
|
||||||
|
last_heartbeat: &mut DateTime<Utc>,
|
||||||
|
event: &ExecutionEvent,
|
||||||
|
) -> Response<State> {
|
||||||
|
match event {
|
||||||
|
ExecutionEvent::Heartbeat => {
|
||||||
|
*last_heartbeat = Utc::now();
|
||||||
|
Handled
|
||||||
|
}
|
||||||
|
ExecutionEvent::HitRateLimit { resume_at } => {
|
||||||
|
Transition(State::rate_limited(agent.clone(), *resume_at))
|
||||||
|
}
|
||||||
|
ExecutionEvent::Exited { exit_code } => Transition(State::completed(
|
||||||
|
agent.clone(),
|
||||||
|
*exit_code,
|
||||||
|
Utc::now(),
|
||||||
|
)),
|
||||||
|
_ => Super,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── RateLimited: waiting for the API rate-limit window to clear ───
|
||||||
|
|
||||||
|
#[state(superstate = "any")]
|
||||||
|
fn rate_limited(
|
||||||
|
agent: &mut AgentName,
|
||||||
|
_resume_at: &mut DateTime<Utc>,
|
||||||
|
event: &ExecutionEvent,
|
||||||
|
) -> Response<State> {
|
||||||
|
match event {
|
||||||
|
ExecutionEvent::SpawnedSuccessfully => {
|
||||||
|
let now = Utc::now();
|
||||||
|
Transition(State::running(agent.clone(), now, now))
|
||||||
|
}
|
||||||
|
ExecutionEvent::Exited { exit_code } => Transition(State::completed(
|
||||||
|
agent.clone(),
|
||||||
|
*exit_code,
|
||||||
|
Utc::now(),
|
||||||
|
)),
|
||||||
|
_ => Super,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Completed: agent finished, exit code captured ─────────────────
|
||||||
|
|
||||||
|
#[state(superstate = "any")]
|
||||||
|
fn completed(
|
||||||
|
agent: &mut AgentName,
|
||||||
|
exit_code: &mut i32,
|
||||||
|
completed_at: &mut DateTime<Utc>,
|
||||||
|
event: &ExecutionEvent,
|
||||||
|
) -> Response<State> {
|
||||||
|
// Completed is mostly terminal; only Stopped/Reset (handled by
|
||||||
|
// the `any` superstate) returns to Idle. Field names are kept
|
||||||
|
// un-underscored so the generated State::Completed variant
|
||||||
|
// exposes them as `exit_code` etc. for test pattern matching.
|
||||||
|
let _ = (agent, exit_code, completed_at, event);
|
||||||
|
Super
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Cross-cutting: Stopped and Reset return to Idle from anywhere ─
|
||||||
|
|
||||||
|
#[superstate]
|
||||||
|
fn any(event: &ExecutionEvent) -> Response<State> {
|
||||||
|
match event {
|
||||||
|
ExecutionEvent::Stopped | ExecutionEvent::Reset => {
|
||||||
|
Transition(State::idle())
|
||||||
|
}
|
||||||
|
_ => Handled,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Side effects via statig's entry/exit actions (alternative to EventBus) ───
|
||||||
|
//
|
||||||
|
// The bare version uses an explicit EventBus + Subscriber trait + per-state
|
||||||
|
// publish-on-transition pattern. statig has a more native equivalent:
|
||||||
|
// `#[action]`-tagged functions that fire on state entry / exit / transition.
|
||||||
|
//
|
||||||
|
// We don't include a full action-based example here — it would roughly look
|
||||||
|
// like adding `entry_action = "log_entry"` to each #[state] attribute and
|
||||||
|
// defining `fn log_entry(...)` in the impl block. The trade-off is that
|
||||||
|
// statig's actions are tightly coupled to the state machine impl block,
|
||||||
|
// while the bare version's EventBus allows arbitrary external subscribers
|
||||||
|
// to plug in without touching the state machine code. Both patterns are
|
||||||
|
// valid; pick based on whether you want side-effect dispatch INSIDE the
|
||||||
|
// machine (statig actions) or OUTSIDE it (bare EventBus).
|
||||||
|
|
||||||
// ── Tests ────────────────────────────────────────────────────────────────────
|
// ── Tests ────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
@@ -485,6 +654,90 @@ mod tests {
|
|||||||
}
|
}
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── ExecutionMachine tests ─────────────────────────────────────────────
|
||||||
|
|
||||||
|
use super::execution::{ExecutionMachine, State as ExecState};
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn execution_happy_path() {
|
||||||
|
let mut em = ExecutionMachine.state_machine();
|
||||||
|
assert!(matches!(em.state(), ExecState::Idle {}));
|
||||||
|
|
||||||
|
em.handle(&ExecutionEvent::SpawnRequested {
|
||||||
|
agent: AgentName("coder-1".into()),
|
||||||
|
});
|
||||||
|
assert!(matches!(em.state(), ExecState::Pending { .. }));
|
||||||
|
|
||||||
|
em.handle(&ExecutionEvent::SpawnedSuccessfully);
|
||||||
|
assert!(matches!(em.state(), ExecState::Running { .. }));
|
||||||
|
|
||||||
|
em.handle(&ExecutionEvent::Heartbeat);
|
||||||
|
// Heartbeat updates last_heartbeat in-place; we stay in Running.
|
||||||
|
assert!(matches!(em.state(), ExecState::Running { .. }));
|
||||||
|
|
||||||
|
em.handle(&ExecutionEvent::Exited { exit_code: 0 });
|
||||||
|
assert!(matches!(em.state(), ExecState::Completed { exit_code: 0, .. }));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn execution_rate_limit_then_resume() {
|
||||||
|
let mut em = ExecutionMachine.state_machine();
|
||||||
|
em.handle(&ExecutionEvent::SpawnRequested {
|
||||||
|
agent: AgentName("coder-1".into()),
|
||||||
|
});
|
||||||
|
em.handle(&ExecutionEvent::SpawnedSuccessfully);
|
||||||
|
em.handle(&ExecutionEvent::HitRateLimit {
|
||||||
|
resume_at: Utc::now() + chrono::Duration::minutes(5),
|
||||||
|
});
|
||||||
|
assert!(matches!(em.state(), ExecState::RateLimited { .. }));
|
||||||
|
|
||||||
|
em.handle(&ExecutionEvent::SpawnedSuccessfully);
|
||||||
|
assert!(matches!(em.state(), ExecState::Running { .. }));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn execution_stop_from_running_returns_idle_via_superstate() {
|
||||||
|
let mut em = ExecutionMachine.state_machine();
|
||||||
|
em.handle(&ExecutionEvent::SpawnRequested {
|
||||||
|
agent: AgentName("coder-1".into()),
|
||||||
|
});
|
||||||
|
em.handle(&ExecutionEvent::SpawnedSuccessfully);
|
||||||
|
em.handle(&ExecutionEvent::Stopped);
|
||||||
|
assert!(matches!(em.state(), ExecState::Idle {}));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn execution_stop_from_pending_returns_idle_via_superstate() {
|
||||||
|
let mut em = ExecutionMachine.state_machine();
|
||||||
|
em.handle(&ExecutionEvent::SpawnRequested {
|
||||||
|
agent: AgentName("coder-1".into()),
|
||||||
|
});
|
||||||
|
em.handle(&ExecutionEvent::Stopped);
|
||||||
|
assert!(matches!(em.state(), ExecState::Idle {}));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn execution_stop_from_rate_limited_returns_idle_via_superstate() {
|
||||||
|
let mut em = ExecutionMachine.state_machine();
|
||||||
|
em.handle(&ExecutionEvent::SpawnRequested {
|
||||||
|
agent: AgentName("coder-1".into()),
|
||||||
|
});
|
||||||
|
em.handle(&ExecutionEvent::SpawnedSuccessfully);
|
||||||
|
em.handle(&ExecutionEvent::HitRateLimit {
|
||||||
|
resume_at: Utc::now() + chrono::Duration::minutes(5),
|
||||||
|
});
|
||||||
|
em.handle(&ExecutionEvent::Stopped);
|
||||||
|
assert!(matches!(em.state(), ExecState::Idle {}));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn nodepubkey_type_is_constructible() {
|
||||||
|
// Just exercise the NodePubkey newtype so it isn't dead code.
|
||||||
|
// In a real implementation it'd key the per-node ExecutionState
|
||||||
|
// map inside the CRDT.
|
||||||
|
let _ = NodePubkey([0u8; 32]);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── main: a quick interactive demo ───────────────────────────────────────────
|
// ── main: a quick interactive demo ───────────────────────────────────────────
|
||||||
|
|||||||
Reference in New Issue
Block a user