//! Pipeline state machine — design sketch (story 520) — STATIG version.
//!
//! Parallel to `pipeline_state_sketch_bare.rs`. Same domain types, same
//! transitions, same event semantics — but the state machine is built using
//! the `statig` crate (<https://crates.io/crates/statig>) instead of being
//! hand-rolled.
//!
//! Run with:
//!
//! ```text
//! cargo run --example pipeline_state_sketch_statig -p huskies
//! ```
//!
//! Test with:
//!
//! ```text
//! cargo test --example pipeline_state_sketch_statig -p huskies
//! ```
//!
//! Why both versions?
//!
//! - The **bare** version shows that plain Rust enums + a transition function
//!   are *enough* to make impossible states unrepresentable. No framework.
//! - The **statig** version shows what we'd gain by adopting a state-machine
//!   crate: hierarchical states (the `active` superstate factors out the
//!   cross-cutting Block/ReviewHold/Abandon/Supersede transitions, which the
//!   bare version had to duplicate inline with `|` patterns), generated
//!   `State` enum with type-safe data-carrying constructors, and stateful
//!   `handle(&event)` dispatch. Type safety is preserved either way:
//!   `State::merge(BranchName, NonZeroU32)` requires both args at the
//!   constructor, just like `Stage::Merge { feature_branch, commits_ahead }`
//!   in the bare version.
//!
//! Trade-off: statig adds a dependency and a proc-macro layer, which makes
//! the code harder to read for someone unfamiliar with the crate. The
//! framework-free version is more transparent but requires manual
//! pattern-matching and inline duplication for cross-cutting transitions.
use chrono::{DateTime, Utc}; use statig::prelude::*; use std::num::NonZeroU32; // ── Newtypes (same as bare version) ────────────────────────────────────────── #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct StoryId(pub String); #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct BranchName(pub String); #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct GitSha(pub String); #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct AgentName(pub String); #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub struct NodePubkey(pub [u8; 32]); // ── Archive reason (same as bare version) ──────────────────────────────────── #[derive(Debug, Clone, PartialEq, Eq)] pub enum ArchiveReason { Completed, Abandoned, Superseded { by: StoryId }, Blocked { reason: String }, MergeFailed { reason: String }, ReviewHeld { reason: String }, } // ── Pipeline events (same as bare version) ─────────────────────────────────── #[derive(Debug, Clone)] pub enum PipelineEvent { DepsMet, GatesStarted, GatesPassed { feature_branch: BranchName, commits_ahead: NonZeroU32, }, GatesFailed { reason: String, }, QaSkipped { feature_branch: BranchName, commits_ahead: NonZeroU32, }, MergeSucceeded { merge_commit: GitSha, }, MergeFailedFinal { reason: String, }, Accepted, Block { reason: String, }, Unblock, Abandon, Supersede { by: StoryId, }, ReviewHold { reason: String, }, } // ── The state machine ──────────────────────────────────────────────────────── // // statig requires a "context" struct (the `Self` of the impl block). For us // it's empty — all per-state data lives ON the state itself, carried forward // by the auto-generated `State::xxx(...)` constructors. 
#[derive(Default)] pub struct PipelineMachine; #[state_machine( initial = "State::backlog()", state(derive(Debug, Clone, PartialEq, Eq)) )] impl PipelineMachine { // ── Active stages: backlog, coding, qa, merge ──────────────────────── // // Each is a child of the `active` superstate, which handles the // cross-cutting transitions (Block / ReviewHold / Abandon / Supersede) // exactly once instead of being duplicated per state. #[state(superstate = "active")] fn backlog(event: &PipelineEvent) -> Response { match event { PipelineEvent::DepsMet => Transition(State::coding()), _ => Super, // defer to `active` (and ultimately to "unhandled") } } #[state(superstate = "active")] fn coding(event: &PipelineEvent) -> Response { match event { PipelineEvent::GatesStarted => Transition(State::qa()), PipelineEvent::QaSkipped { feature_branch, commits_ahead, } => Transition(State::merge(feature_branch.clone(), *commits_ahead)), _ => Super, } } #[state(superstate = "active")] fn qa(event: &PipelineEvent) -> Response { match event { PipelineEvent::GatesPassed { feature_branch, commits_ahead, } => Transition(State::merge(feature_branch.clone(), *commits_ahead)), PipelineEvent::GatesFailed { .. } => Transition(State::coding()), _ => Super, } } #[state(superstate = "active")] fn merge( _feature_branch: &mut BranchName, _commits_ahead: &mut NonZeroU32, event: &PipelineEvent, ) -> Response { // Note: the type signature of this state function REQUIRES both // _feature_branch and _commits_ahead. There is no way to construct // a Merge state without them. NonZeroU32 makes "merge with zero // commits ahead" structurally unrepresentable (bug 519 fixed by // construction, same as the bare version). // // The fields are prefixed with `_` because this state function only // transitions forward and doesn't read them — but they're available // to inspect via the State::Merge variant generated by the macro. 
match event { PipelineEvent::MergeSucceeded { merge_commit } => { Transition(State::done(Utc::now(), merge_commit.clone())) } PipelineEvent::MergeFailedFinal { reason } => Transition(State::archived( Utc::now(), ArchiveReason::MergeFailed { reason: reason.clone(), }, )), _ => Super, } } // ── Cross-cutting superstate ───────────────────────────────────────── // // This is the statig payoff: ONE place defines what Block/ReviewHold/ // Abandon/Supersede do across all four active stages. The bare version // had to duplicate this with `|` patterns. Adding a new active stage // here means just adding it as a child of `active`; the cross-cutting // transitions come for free. #[superstate] fn active(event: &PipelineEvent) -> Response { let now = Utc::now(); match event { PipelineEvent::Block { reason } => Transition(State::archived( now, ArchiveReason::Blocked { reason: reason.clone(), }, )), PipelineEvent::ReviewHold { reason } => Transition(State::archived( now, ArchiveReason::ReviewHeld { reason: reason.clone(), }, )), PipelineEvent::Abandon => Transition(State::archived(now, ArchiveReason::Abandoned)), PipelineEvent::Supersede { by } => Transition(State::archived( now, ArchiveReason::Superseded { by: by.clone() }, )), _ => Handled, // unhandled events are silently ignored } } // ── Done is special: it's not a child of `active` because Block and ── // ── ReviewHold are NOT valid from Done (per the bare version's rules). // ── Abandon and Supersede ARE valid, so we have to handle them inline. 
#[state] fn done( merged_at: &mut DateTime, merge_commit: &mut GitSha, event: &PipelineEvent, ) -> Response { let now = Utc::now(); let _ = merged_at; // currently unused; available for queries let _ = merge_commit; match event { PipelineEvent::Accepted => Transition(State::archived(now, ArchiveReason::Completed)), PipelineEvent::Abandon => Transition(State::archived(now, ArchiveReason::Abandoned)), PipelineEvent::Supersede { by } => Transition(State::archived( now, ArchiveReason::Superseded { by: by.clone() }, )), _ => Handled, } } // ── Archived is terminal except for Unblock from Blocked → Backlog ─── #[state] fn archived( archived_at: &mut DateTime, reason: &mut ArchiveReason, event: &PipelineEvent, ) -> Response { let _ = archived_at; match event { PipelineEvent::Unblock => { if matches!(reason, ArchiveReason::Blocked { .. }) { Transition(State::backlog()) } else { Handled // unblock only valid from Blocked } } _ => Handled, } } } // ── Per-node execution state machine ───────────────────────────────────────── // // Independent of the pipeline stage machine. Tracks "what is THIS node doing // about this story right now." Lives in its own sub-module so its generated // `State` enum doesn't collide with `PipelineMachine`'s. // // In a real implementation, multiple nodes can have different ExecutionState // for the same story_id at the same time — and that's fine, because each // node owns its own subspace in the CRDT (keyed by node pubkey). 
#[derive(Debug, Clone)] pub enum ExecutionEvent { SpawnRequested { agent: AgentName }, SpawnedSuccessfully, Heartbeat, HitRateLimit { resume_at: DateTime }, Exited { exit_code: i32 }, Stopped, Reset, } pub mod execution { use super::{AgentName, DateTime, ExecutionEvent, Utc}; use statig::prelude::*; #[derive(Default)] pub struct ExecutionMachine; #[state_machine(initial = "State::idle()", state(derive(Debug, Clone, PartialEq, Eq)))] impl ExecutionMachine { // ── Idle: no agent on this node is working on this story ────────── #[state(superstate = "any")] fn idle(event: &ExecutionEvent) -> Response { match event { ExecutionEvent::SpawnRequested { agent } => { Transition(State::pending(agent.clone(), Utc::now())) } _ => Super, } } // ── Pending: agent has been requested but hasn't started yet ────── #[state(superstate = "any")] fn pending( agent: &mut AgentName, _since: &mut DateTime, event: &ExecutionEvent, ) -> Response { match event { ExecutionEvent::SpawnedSuccessfully => { let now = Utc::now(); Transition(State::running(agent.clone(), now, now)) } ExecutionEvent::HitRateLimit { resume_at } => { Transition(State::rate_limited(agent.clone(), *resume_at)) } ExecutionEvent::Exited { exit_code } => { Transition(State::completed(agent.clone(), *exit_code, Utc::now())) } _ => Super, } } // ── Running: agent's subprocess is alive ────────────────────────── // // Heartbeat is a self-transition: we update last_heartbeat in-place // via the &mut reference and return `Handled` (no actual stage change). // This is statig's idiomatic way to mutate state-local data without // transitioning. 
#[state(superstate = "any")] fn running( agent: &mut AgentName, _started_at: &mut DateTime, last_heartbeat: &mut DateTime, event: &ExecutionEvent, ) -> Response { match event { ExecutionEvent::Heartbeat => { *last_heartbeat = Utc::now(); Handled } ExecutionEvent::HitRateLimit { resume_at } => { Transition(State::rate_limited(agent.clone(), *resume_at)) } ExecutionEvent::Exited { exit_code } => { Transition(State::completed(agent.clone(), *exit_code, Utc::now())) } _ => Super, } } // ── RateLimited: waiting for the API rate-limit window to clear ─── #[state(superstate = "any")] fn rate_limited( agent: &mut AgentName, _resume_at: &mut DateTime, event: &ExecutionEvent, ) -> Response { match event { ExecutionEvent::SpawnedSuccessfully => { let now = Utc::now(); Transition(State::running(agent.clone(), now, now)) } ExecutionEvent::Exited { exit_code } => { Transition(State::completed(agent.clone(), *exit_code, Utc::now())) } _ => Super, } } // ── Completed: agent finished, exit code captured ───────────────── #[state(superstate = "any")] fn completed( agent: &mut AgentName, exit_code: &mut i32, completed_at: &mut DateTime, event: &ExecutionEvent, ) -> Response { // Completed is mostly terminal; only Stopped/Reset (handled by // the `any` superstate) returns to Idle. Field names are kept // un-underscored so the generated State::Completed variant // exposes them as `exit_code` etc. for test pattern matching. let _ = (agent, exit_code, completed_at, event); Super } // ── Cross-cutting: Stopped and Reset return to Idle from anywhere ─ #[superstate] fn any(event: &ExecutionEvent) -> Response { match event { ExecutionEvent::Stopped | ExecutionEvent::Reset => Transition(State::idle()), _ => Handled, } } } } // ── Side effects via statig's entry/exit actions (alternative to EventBus) ─── // // The bare version uses an explicit EventBus + Subscriber trait + per-state // publish-on-transition pattern. 
statig has a more native equivalent: // `#[action]`-tagged functions that fire on state entry / exit / transition. // // We don't include a full action-based example here — it would roughly look // like adding `entry_action = "log_entry"` to each #[state] attribute and // defining `fn log_entry(...)` in the impl block. The trade-off is that // statig's actions are tightly coupled to the state machine impl block, // while the bare version's EventBus allows arbitrary external subscribers // to plug in without touching the state machine code. Both patterns are // valid; pick based on whether you want side-effect dispatch INSIDE the // machine (statig actions) or OUTSIDE it (bare EventBus). // ── Tests ──────────────────────────────────────────────────────────────────── #[cfg(test)] mod tests { use super::*; fn nz(n: u32) -> NonZeroU32 { NonZeroU32::new(n).unwrap() } fn fb(name: &str) -> BranchName { BranchName(name.to_string()) } fn sha(s: &str) -> GitSha { GitSha(s.to_string()) } // ── Happy path ───────────────────────────────────────────────────────── #[test] fn happy_path_backlog_through_done() { let mut sm = PipelineMachine.state_machine(); assert!(matches!(sm.state(), State::Backlog {})); sm.handle(&PipelineEvent::DepsMet); assert!(matches!(sm.state(), State::Coding {})); sm.handle(&PipelineEvent::QaSkipped { feature_branch: fb("feature/story-1"), commits_ahead: nz(3), }); assert!(matches!(sm.state(), State::Merge { .. })); sm.handle(&PipelineEvent::MergeSucceeded { merge_commit: sha("abc123"), }); assert!(matches!(sm.state(), State::Done { .. })); sm.handle(&PipelineEvent::Accepted); assert!(matches!( sm.state(), State::Archived { reason: ArchiveReason::Completed, .. 
} )); } #[test] fn qa_retry_loop() { let mut sm = PipelineMachine.state_machine(); sm.handle(&PipelineEvent::DepsMet); sm.handle(&PipelineEvent::GatesStarted); assert!(matches!(sm.state(), State::Qa {})); sm.handle(&PipelineEvent::GatesFailed { reason: "tests failed".into(), }); assert!(matches!(sm.state(), State::Coding {})); } // ── Bug 519 unrepresentability: Merge with zero commits ahead ────────── #[test] fn merge_with_zero_commits_is_unrepresentable() { // Identical to the bare version: NonZeroU32::new(0) returns None, // so a State::merge(branch, ZERO) literally cannot be constructed. assert!(NonZeroU32::new(0).is_none()); } // ── Cross-cutting Block from any active stage (superstate proves it) ─── #[test] fn block_from_backlog_via_superstate() { let mut sm = PipelineMachine.state_machine(); sm.handle(&PipelineEvent::Block { reason: "stuck".into(), }); assert!(matches!( sm.state(), State::Archived { reason: ArchiveReason::Blocked { .. }, .. } )); } #[test] fn block_from_coding_via_superstate() { let mut sm = PipelineMachine.state_machine(); sm.handle(&PipelineEvent::DepsMet); sm.handle(&PipelineEvent::Block { reason: "stuck".into(), }); assert!(matches!( sm.state(), State::Archived { reason: ArchiveReason::Blocked { .. }, .. } )); } #[test] fn block_from_qa_via_superstate() { let mut sm = PipelineMachine.state_machine(); sm.handle(&PipelineEvent::DepsMet); sm.handle(&PipelineEvent::GatesStarted); sm.handle(&PipelineEvent::Block { reason: "stuck".into(), }); assert!(matches!( sm.state(), State::Archived { reason: ArchiveReason::Blocked { .. }, .. } )); } #[test] fn block_from_merge_via_superstate() { let mut sm = PipelineMachine.state_machine(); sm.handle(&PipelineEvent::DepsMet); sm.handle(&PipelineEvent::QaSkipped { feature_branch: fb("f"), commits_ahead: nz(1), }); sm.handle(&PipelineEvent::Block { reason: "stuck".into(), }); assert!(matches!( sm.state(), State::Archived { reason: ArchiveReason::Blocked { .. }, .. 
} )); } // ── Block from Done is NOT valid (Done isn't a child of `active`) ────── #[test] fn block_from_done_is_ignored() { let mut sm = PipelineMachine.state_machine(); sm.handle(&PipelineEvent::DepsMet); sm.handle(&PipelineEvent::QaSkipped { feature_branch: fb("f"), commits_ahead: nz(1), }); sm.handle(&PipelineEvent::MergeSucceeded { merge_commit: sha("abc"), }); // Now in Done. Block should NOT transition us anywhere. sm.handle(&PipelineEvent::Block { reason: "stuck".into(), }); assert!(matches!(sm.state(), State::Done { .. })); } // ── Abandon from Done IS valid (handled inline in done()) ────────────── #[test] fn abandon_from_done_works() { let mut sm = PipelineMachine.state_machine(); sm.handle(&PipelineEvent::DepsMet); sm.handle(&PipelineEvent::QaSkipped { feature_branch: fb("f"), commits_ahead: nz(1), }); sm.handle(&PipelineEvent::MergeSucceeded { merge_commit: sha("abc"), }); sm.handle(&PipelineEvent::Abandon); assert!(matches!( sm.state(), State::Archived { reason: ArchiveReason::Abandoned, .. } )); } // ── Unblock from Archived(Blocked) → Backlog ─────────────────────────── #[test] fn unblock_returns_to_backlog() { let mut sm = PipelineMachine.state_machine(); sm.handle(&PipelineEvent::Block { reason: "test".into(), }); assert!(matches!( sm.state(), State::Archived { reason: ArchiveReason::Blocked { .. }, .. } )); sm.handle(&PipelineEvent::Unblock); assert!(matches!(sm.state(), State::Backlog {})); } #[test] fn unblock_from_review_held_does_nothing() { // Unblock is specifically for Blocked, not for any Archived variant. let mut sm = PipelineMachine.state_machine(); sm.handle(&PipelineEvent::ReviewHold { reason: "TBD".into(), }); // Now in Archived(ReviewHeld). Unblock should NOT transition. sm.handle(&PipelineEvent::Unblock); assert!(matches!( sm.state(), State::Archived { reason: ArchiveReason::ReviewHeld { .. }, .. 
} )); } // ── ExecutionMachine tests ───────────────────────────────────────────── use super::execution::{ExecutionMachine, State as ExecState}; #[test] fn execution_happy_path() { let mut em = ExecutionMachine.state_machine(); assert!(matches!(em.state(), ExecState::Idle {})); em.handle(&ExecutionEvent::SpawnRequested { agent: AgentName("coder-1".into()), }); assert!(matches!(em.state(), ExecState::Pending { .. })); em.handle(&ExecutionEvent::SpawnedSuccessfully); assert!(matches!(em.state(), ExecState::Running { .. })); em.handle(&ExecutionEvent::Heartbeat); // Heartbeat updates last_heartbeat in-place; we stay in Running. assert!(matches!(em.state(), ExecState::Running { .. })); em.handle(&ExecutionEvent::Exited { exit_code: 0 }); assert!(matches!( em.state(), ExecState::Completed { exit_code: 0, .. } )); } #[test] fn execution_rate_limit_then_resume() { let mut em = ExecutionMachine.state_machine(); em.handle(&ExecutionEvent::SpawnRequested { agent: AgentName("coder-1".into()), }); em.handle(&ExecutionEvent::SpawnedSuccessfully); em.handle(&ExecutionEvent::HitRateLimit { resume_at: Utc::now() + chrono::Duration::minutes(5), }); assert!(matches!(em.state(), ExecState::RateLimited { .. })); em.handle(&ExecutionEvent::SpawnedSuccessfully); assert!(matches!(em.state(), ExecState::Running { .. 
})); } #[test] fn execution_stop_from_running_returns_idle_via_superstate() { let mut em = ExecutionMachine.state_machine(); em.handle(&ExecutionEvent::SpawnRequested { agent: AgentName("coder-1".into()), }); em.handle(&ExecutionEvent::SpawnedSuccessfully); em.handle(&ExecutionEvent::Stopped); assert!(matches!(em.state(), ExecState::Idle {})); } #[test] fn execution_stop_from_pending_returns_idle_via_superstate() { let mut em = ExecutionMachine.state_machine(); em.handle(&ExecutionEvent::SpawnRequested { agent: AgentName("coder-1".into()), }); em.handle(&ExecutionEvent::Stopped); assert!(matches!(em.state(), ExecState::Idle {})); } #[test] fn execution_stop_from_rate_limited_returns_idle_via_superstate() { let mut em = ExecutionMachine.state_machine(); em.handle(&ExecutionEvent::SpawnRequested { agent: AgentName("coder-1".into()), }); em.handle(&ExecutionEvent::SpawnedSuccessfully); em.handle(&ExecutionEvent::HitRateLimit { resume_at: Utc::now() + chrono::Duration::minutes(5), }); em.handle(&ExecutionEvent::Stopped); assert!(matches!(em.state(), ExecState::Idle {})); } #[test] fn nodepubkey_type_is_constructible() { // Just exercise the NodePubkey newtype so it isn't dead code. // In a real implementation it'd key the per-node ExecutionState // map inside the CRDT. 
let _ = NodePubkey([0u8; 32]); } } // ── main: a quick interactive demo ─────────────────────────────────────────── fn main() { println!("─── Pipeline state machine sketch (story 520) — STATIG version ───\n"); let mut sm = PipelineMachine.state_machine(); println!("Initial: {:?}\n", sm.state()); println!("→ DepsMet"); sm.handle(&PipelineEvent::DepsMet); println!(" state: {:?}\n", sm.state()); println!("→ QaSkipped"); sm.handle(&PipelineEvent::QaSkipped { feature_branch: BranchName("feature/story-100".into()), commits_ahead: NonZeroU32::new(3).unwrap(), }); println!(" state: {:?}\n", sm.state()); println!("→ MergeSucceeded"); sm.handle(&PipelineEvent::MergeSucceeded { merge_commit: GitSha("abc1234".into()), }); println!(" state: {:?}\n", sm.state()); println!("→ Accepted"); sm.handle(&PipelineEvent::Accepted); println!(" state: {:?}\n", sm.state()); println!("─── Trying invalid transition: Done → Unblock ───"); let mut sm2 = PipelineMachine.state_machine(); sm2.handle(&PipelineEvent::DepsMet); sm2.handle(&PipelineEvent::QaSkipped { feature_branch: BranchName("feature/story-101".into()), commits_ahead: NonZeroU32::new(2).unwrap(), }); sm2.handle(&PipelineEvent::MergeSucceeded { merge_commit: GitSha("def5678".into()), }); println!(" before Unblock: {:?}", sm2.state()); sm2.handle(&PipelineEvent::Unblock); // silently ignored — no transition println!( " after Unblock: {:?} (no change — Unblock is a no-op from Done)", sm2.state() ); }