diff --git a/server/examples/pipeline_state_sketch_statig.rs b/server/examples/pipeline_state_sketch_statig.rs index 5b751f70..24f94041 100644 --- a/server/examples/pipeline_state_sketch_statig.rs +++ b/server/examples/pipeline_state_sketch_statig.rs @@ -266,6 +266,175 @@ impl PipelineMachine { } } +// ── Per-node execution state machine ───────────────────────────────────────── +// +// Independent of the pipeline stage machine. Tracks "what is THIS node doing +// about this story right now." Lives in its own sub-module so its generated +// `State` enum doesn't collide with `PipelineMachine`'s. +// +// In a real implementation, multiple nodes can have different ExecutionState +// for the same story_id at the same time — and that's fine, because each +// node owns its own subspace in the CRDT (keyed by node pubkey). + +#[derive(Debug, Clone)] +pub enum ExecutionEvent { + SpawnRequested { agent: AgentName }, + SpawnedSuccessfully, + Heartbeat, + HitRateLimit { resume_at: DateTime }, + Exited { exit_code: i32 }, + Stopped, + Reset, +} + +pub mod execution { + use super::{AgentName, DateTime, ExecutionEvent, Utc}; + use statig::prelude::*; + + #[derive(Default)] + pub struct ExecutionMachine; + + #[state_machine( + initial = "State::idle()", + state(derive(Debug, Clone, PartialEq, Eq)) + )] + impl ExecutionMachine { + // ── Idle: no agent on this node is working on this story ────────── + + #[state(superstate = "any")] + fn idle(event: &ExecutionEvent) -> Response { + match event { + ExecutionEvent::SpawnRequested { agent } => { + Transition(State::pending(agent.clone(), Utc::now())) + } + _ => Super, + } + } + + // ── Pending: agent has been requested but hasn't started yet ────── + + #[state(superstate = "any")] + fn pending( + agent: &mut AgentName, + _since: &mut DateTime, + event: &ExecutionEvent, + ) -> Response { + match event { + ExecutionEvent::SpawnedSuccessfully => { + let now = Utc::now(); + Transition(State::running(agent.clone(), now, now)) + } + ExecutionEvent::HitRateLimit { resume_at } => { + Transition(State::rate_limited(agent.clone(), *resume_at)) + } + ExecutionEvent::Exited { exit_code } => Transition(State::completed( + agent.clone(), + *exit_code, + Utc::now(), + )), + _ => Super, + } + } + + // ── Running: agent's subprocess is alive ────────────────────────── + // + // Heartbeat is a self-transition: we update last_heartbeat in-place + // via the &mut reference and return `Handled` (no actual stage change). + // This is statig's idiomatic way to mutate state-local data without + // transitioning. + + #[state(superstate = "any")] + fn running( + agent: &mut AgentName, + _started_at: &mut DateTime, + last_heartbeat: &mut DateTime, + event: &ExecutionEvent, + ) -> Response { + match event { + ExecutionEvent::Heartbeat => { + *last_heartbeat = Utc::now(); + Handled + } + ExecutionEvent::HitRateLimit { resume_at } => { + Transition(State::rate_limited(agent.clone(), *resume_at)) + } + ExecutionEvent::Exited { exit_code } => Transition(State::completed( + agent.clone(), + *exit_code, + Utc::now(), + )), + _ => Super, + } + } + + // ── RateLimited: waiting for the API rate-limit window to clear ─── + + #[state(superstate = "any")] + fn rate_limited( + agent: &mut AgentName, + _resume_at: &mut DateTime, + event: &ExecutionEvent, + ) -> Response { + match event { + ExecutionEvent::SpawnedSuccessfully => { + let now = Utc::now(); + Transition(State::running(agent.clone(), now, now)) + } + ExecutionEvent::Exited { exit_code } => Transition(State::completed( + agent.clone(), + *exit_code, + Utc::now(), + )), + _ => Super, + } + } + + // ── Completed: agent finished, exit code captured ───────────────── + + #[state(superstate = "any")] + fn completed( + agent: &mut AgentName, + exit_code: &mut i32, + completed_at: &mut DateTime, + event: &ExecutionEvent, + ) -> Response { + // Completed is mostly terminal; only Stopped/Reset (handled by + // the `any` superstate) returns to Idle. Field names are kept + // un-underscored so the generated State::Completed variant + // exposes them as `exit_code` etc. for test pattern matching. + let _ = (agent, exit_code, completed_at, event); + Super + } + + // ── Cross-cutting: Stopped and Reset return to Idle from anywhere ─ + + #[superstate] + fn any(event: &ExecutionEvent) -> Response { + match event { + ExecutionEvent::Stopped | ExecutionEvent::Reset => { + Transition(State::idle()) + } + _ => Handled, + } + } + } +} + +// ── Side effects via statig's entry/exit actions (alternative to EventBus) ─── +// +// The bare version uses an explicit EventBus + Subscriber trait + per-state +// publish-on-transition pattern. statig has a more native equivalent: +// `#[action]`-tagged functions that fire on state entry / exit / transition. +// +// We don't include a full action-based example here — it would roughly look +// like adding `entry_action = "log_entry"` to each #[state] attribute and +// defining `fn log_entry(...)` in the impl block. The trade-off is that +// statig's actions are tightly coupled to the state machine impl block, +// while the bare version's EventBus allows arbitrary external subscribers +// to plug in without touching the state machine code. Both patterns are +// valid; pick based on whether you want side-effect dispatch INSIDE the +// machine (statig actions) or OUTSIDE it (bare EventBus). + // ── Tests ──────────────────────────────────────────────────────────────────── #[cfg(test)] @@ -485,6 +654,90 @@ mod tests { } )); } + + // ── ExecutionMachine tests ───────────────────────────────────────────── + + use super::execution::{ExecutionMachine, State as ExecState}; + + #[test] + fn execution_happy_path() { + let mut em = ExecutionMachine.state_machine(); + assert!(matches!(em.state(), ExecState::Idle {})); + + em.handle(&ExecutionEvent::SpawnRequested { + agent: AgentName("coder-1".into()), + }); + assert!(matches!(em.state(), ExecState::Pending { .. })); + + em.handle(&ExecutionEvent::SpawnedSuccessfully); + assert!(matches!(em.state(), ExecState::Running { .. })); + + em.handle(&ExecutionEvent::Heartbeat); + // Heartbeat updates last_heartbeat in-place; we stay in Running. + assert!(matches!(em.state(), ExecState::Running { .. })); + + em.handle(&ExecutionEvent::Exited { exit_code: 0 }); + assert!(matches!(em.state(), ExecState::Completed { exit_code: 0, .. })); + } + + #[test] + fn execution_rate_limit_then_resume() { + let mut em = ExecutionMachine.state_machine(); + em.handle(&ExecutionEvent::SpawnRequested { + agent: AgentName("coder-1".into()), + }); + em.handle(&ExecutionEvent::SpawnedSuccessfully); + em.handle(&ExecutionEvent::HitRateLimit { + resume_at: Utc::now() + chrono::Duration::minutes(5), + }); + assert!(matches!(em.state(), ExecState::RateLimited { .. })); + + em.handle(&ExecutionEvent::SpawnedSuccessfully); + assert!(matches!(em.state(), ExecState::Running { .. })); + } + + #[test] + fn execution_stop_from_running_returns_idle_via_superstate() { + let mut em = ExecutionMachine.state_machine(); + em.handle(&ExecutionEvent::SpawnRequested { + agent: AgentName("coder-1".into()), + }); + em.handle(&ExecutionEvent::SpawnedSuccessfully); + em.handle(&ExecutionEvent::Stopped); + assert!(matches!(em.state(), ExecState::Idle {})); + } + + #[test] + fn execution_stop_from_pending_returns_idle_via_superstate() { + let mut em = ExecutionMachine.state_machine(); + em.handle(&ExecutionEvent::SpawnRequested { + agent: AgentName("coder-1".into()), + }); + em.handle(&ExecutionEvent::Stopped); + assert!(matches!(em.state(), ExecState::Idle {})); + } + + #[test] + fn execution_stop_from_rate_limited_returns_idle_via_superstate() { + let mut em = ExecutionMachine.state_machine(); + em.handle(&ExecutionEvent::SpawnRequested { + agent: AgentName("coder-1".into()), + }); + em.handle(&ExecutionEvent::SpawnedSuccessfully); + em.handle(&ExecutionEvent::HitRateLimit { + resume_at: Utc::now() + chrono::Duration::minutes(5), + }); + em.handle(&ExecutionEvent::Stopped); + assert!(matches!(em.state(), ExecState::Idle {})); + } + + #[test] + fn nodepubkey_type_is_constructible() { + // Just exercise the NodePubkey newtype so it isn't dead code. + // In a real implementation it'd key the per-node ExecutionState + // map inside the CRDT. + let _ = NodePubkey([0u8; 32]); + } } // ── main: a quick interactive demo ───────────────────────────────────────────