Files
huskies/server/examples/pipeline_state_sketch_statig.rs
T

774 lines
27 KiB
Rust
Raw Normal View History

//! Pipeline state machine — design sketch (story 520) — STATIG version.
//!
//! Parallel to `pipeline_state_sketch_bare.rs`. Same domain types, same
//! transitions, same event semantics — but the state machine is built using
//! the `statig` crate (https://crates.io/crates/statig) instead of being
//! hand-rolled.
//!
//! Run with:
//! cargo run --example pipeline_state_sketch_statig -p huskies
//! Test with:
//! cargo test --example pipeline_state_sketch_statig -p huskies
//!
//! Why both versions?
//!
//! - The **bare** version shows that plain Rust enums + a transition function
//! are *enough* to make impossible states unrepresentable. No framework.
//! - The **statig** version shows what we'd gain by adopting a state-machine
//! crate: hierarchical states (the `active` superstate factors out the
//! cross-cutting Block/ReviewHold/Abandon/Supersede transitions, which the
//! bare version had to duplicate inline with `|` patterns), generated
//! `State` enum with type-safe data-carrying constructors, and stateful
//! `handle(&event)` dispatch. Type safety is preserved either way:
//! `State::merge(BranchName, NonZeroU32)` requires both args at the
//! constructor, just like `Stage::Merge { feature_branch, commits_ahead }`
//! in the bare version.
//!
//! Trade-off: statig adds a dependency and a proc-macro layer, which makes
//! the code harder to read for someone unfamiliar with the crate. The
//! framework-free version is more transparent but requires manual
//! pattern-matching and inline duplication for cross-cutting transitions.
use chrono::{DateTime, Utc};
use statig::prelude::*;
use std::num::NonZeroU32;
// ── Newtypes (same as bare version) ──────────────────────────────────────────
/// Identifier of a pipeline story (e.g. the successor recorded in
/// `ArchiveReason::Superseded`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StoryId(pub String);
/// Name of a git branch — the feature branch carried by the Merge stage.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BranchName(pub String);
/// A git commit SHA kept as text — the merge commit recorded on Done.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct GitSha(pub String);
/// Name of an agent, as tracked by the per-node execution machine.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct AgentName(pub String);
/// A node's 32-byte public key. Neither state machine in this sketch uses it;
/// per the execution-machine notes it would key per-node execution state.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct NodePubkey(pub [u8; 32]);
// ── Archive reason (same as bare version) ────────────────────────────────────
/// Why a story ended up in the `Archived` pipeline state.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ArchiveReason {
    /// `Accepted` while in Done.
    Completed,
    /// `Abandon` from an active stage or Done.
    Abandoned,
    /// `Supersede` from an active stage or Done; `by` is the replacing story.
    Superseded { by: StoryId },
    /// `Block` from an active stage. The only archive reason that `Unblock`
    /// can revive (back to Backlog).
    Blocked { reason: String },
    /// `MergeFailedFinal` while in Merge.
    MergeFailed { reason: String },
    /// `ReviewHold` from an active stage.
    ReviewHeld { reason: String },
}
// ── Pipeline events (same as bare version) ───────────────────────────────────
/// Inputs to [`PipelineMachine`]. Each stage decides which events it reacts
/// to; an event no stage (or superstate) handles is silently ignored.
#[derive(Debug, Clone)]
pub enum PipelineEvent {
    /// Dependencies satisfied: Backlog → Coding.
    DepsMet,
    /// Quality gates kicked off: Coding → Qa.
    GatesStarted,
    /// Gates passed: Qa → Merge, carrying the branch and its (non-zero)
    /// number of commits ahead.
    GatesPassed {
        feature_branch: BranchName,
        commits_ahead: NonZeroU32,
    },
    /// Gates failed: Qa → Coding. The machine does not store `reason`.
    GatesFailed {
        reason: String,
    },
    /// QA bypassed: Coding → Merge, with the same payload as `GatesPassed`.
    QaSkipped {
        feature_branch: BranchName,
        commits_ahead: NonZeroU32,
    },
    /// Merge landed: Merge → Done, recording the merge commit.
    MergeSucceeded {
        merge_commit: GitSha,
    },
    /// Merge gave up: Merge → Archived(`MergeFailed`).
    MergeFailedFinal {
        reason: String,
    },
    /// Done → Archived(`Completed`).
    Accepted,
    /// Any active stage → Archived(`Blocked`). Ignored in Done.
    Block {
        reason: String,
    },
    /// Archived(`Blocked`) → Backlog; ignored everywhere else.
    Unblock,
    /// Any active stage or Done → Archived(`Abandoned`).
    Abandon,
    /// Any active stage or Done → Archived(`Superseded`).
    Supersede {
        by: StoryId,
    },
    /// Any active stage → Archived(`ReviewHeld`). Ignored in Done.
    ReviewHold {
        reason: String,
    },
}
// ── The state machine ────────────────────────────────────────────────────────
//
// statig requires a "context" struct (the `Self` of the impl block). For us
// it's empty — all per-state data lives ON the state itself, carried forward
// by the auto-generated `State::xxx(...)` constructors.
#[derive(Default)]
pub struct PipelineMachine;
#[state_machine(
initial = "State::backlog()",
state(derive(Debug, Clone, PartialEq, Eq))
)]
impl PipelineMachine {
// ── Active stages: backlog, coding, qa, merge ────────────────────────
//
// Each is a child of the `active` superstate, which handles the
// cross-cutting transitions (Block / ReviewHold / Abandon / Supersede)
// exactly once instead of being duplicated per state.
#[state(superstate = "active")]
fn backlog(event: &PipelineEvent) -> Response<State> {
match event {
PipelineEvent::DepsMet => Transition(State::coding()),
_ => Super, // defer to `active` (and ultimately to "unhandled")
}
}
#[state(superstate = "active")]
fn coding(event: &PipelineEvent) -> Response<State> {
match event {
PipelineEvent::GatesStarted => Transition(State::qa()),
PipelineEvent::QaSkipped {
feature_branch,
commits_ahead,
} => Transition(State::merge(feature_branch.clone(), *commits_ahead)),
_ => Super,
}
}
#[state(superstate = "active")]
fn qa(event: &PipelineEvent) -> Response<State> {
match event {
PipelineEvent::GatesPassed {
feature_branch,
commits_ahead,
} => Transition(State::merge(feature_branch.clone(), *commits_ahead)),
PipelineEvent::GatesFailed { .. } => Transition(State::coding()),
_ => Super,
}
}
#[state(superstate = "active")]
fn merge(
_feature_branch: &mut BranchName,
_commits_ahead: &mut NonZeroU32,
event: &PipelineEvent,
) -> Response<State> {
// Note: the type signature of this state function REQUIRES both
// _feature_branch and _commits_ahead. There is no way to construct
// a Merge state without them. NonZeroU32 makes "merge with zero
// commits ahead" structurally unrepresentable (bug 519 fixed by
// construction, same as the bare version).
//
// The fields are prefixed with `_` because this state function only
// transitions forward and doesn't read them — but they're available
// to inspect via the State::Merge variant generated by the macro.
match event {
PipelineEvent::MergeSucceeded { merge_commit } => {
Transition(State::done(Utc::now(), merge_commit.clone()))
}
PipelineEvent::MergeFailedFinal { reason } => Transition(State::archived(
Utc::now(),
ArchiveReason::MergeFailed {
reason: reason.clone(),
},
)),
_ => Super,
}
}
// ── Cross-cutting superstate ─────────────────────────────────────────
//
// This is the statig payoff: ONE place defines what Block/ReviewHold/
// Abandon/Supersede do across all four active stages. The bare version
// had to duplicate this with `|` patterns. Adding a new active stage
// here means just adding it as a child of `active`; the cross-cutting
// transitions come for free.
#[superstate]
fn active(event: &PipelineEvent) -> Response<State> {
let now = Utc::now();
match event {
PipelineEvent::Block { reason } => Transition(State::archived(
now,
ArchiveReason::Blocked {
reason: reason.clone(),
},
)),
PipelineEvent::ReviewHold { reason } => Transition(State::archived(
now,
ArchiveReason::ReviewHeld {
reason: reason.clone(),
},
)),
PipelineEvent::Abandon => Transition(State::archived(now, ArchiveReason::Abandoned)),
PipelineEvent::Supersede { by } => Transition(State::archived(
now,
ArchiveReason::Superseded { by: by.clone() },
)),
_ => Handled, // unhandled events are silently ignored
}
}
// ── Done is special: it's not a child of `active` because Block and ──
// ── ReviewHold are NOT valid from Done (per the bare version's rules).
// ── Abandon and Supersede ARE valid, so we have to handle them inline.
#[state]
fn done(
merged_at: &mut DateTime<Utc>,
merge_commit: &mut GitSha,
event: &PipelineEvent,
) -> Response<State> {
let now = Utc::now();
let _ = merged_at; // currently unused; available for queries
let _ = merge_commit;
match event {
PipelineEvent::Accepted => Transition(State::archived(now, ArchiveReason::Completed)),
PipelineEvent::Abandon => Transition(State::archived(now, ArchiveReason::Abandoned)),
PipelineEvent::Supersede { by } => Transition(State::archived(
now,
ArchiveReason::Superseded { by: by.clone() },
)),
_ => Handled,
}
}
// ── Archived is terminal except for Unblock from Blocked → Backlog ───
#[state]
fn archived(
archived_at: &mut DateTime<Utc>,
reason: &mut ArchiveReason,
event: &PipelineEvent,
) -> Response<State> {
let _ = archived_at;
match event {
PipelineEvent::Unblock => {
if matches!(reason, ArchiveReason::Blocked { .. }) {
Transition(State::backlog())
} else {
Handled // unblock only valid from Blocked
}
}
_ => Handled,
}
}
}
// ── Per-node execution state machine ─────────────────────────────────────────
//
// Independent of the pipeline stage machine. Tracks "what is THIS node doing
// about this story right now." Lives in its own sub-module so its generated
// `State` enum doesn't collide with `PipelineMachine`'s.
//
// In a real implementation, multiple nodes can have different ExecutionState
// for the same story_id at the same time — and that's fine, because each
// node owns its own subspace in the CRDT (keyed by node pubkey).
/// Inputs to the per-node [`execution::ExecutionMachine`]. Events a state
/// does not handle are silently ignored.
#[derive(Debug, Clone)]
pub enum ExecutionEvent {
    /// Start `agent` on this story: Idle → Pending.
    SpawnRequested { agent: AgentName },
    /// The agent process came up: Pending or RateLimited → Running.
    SpawnedSuccessfully,
    /// Liveness ping: refreshes `last_heartbeat` in place while Running.
    Heartbeat,
    /// Rate limit hit: Pending or Running → RateLimited until `resume_at`.
    HitRateLimit { resume_at: DateTime<Utc> },
    /// The agent process exited: Pending, Running or RateLimited → Completed.
    Exited { exit_code: i32 },
    /// Returns to Idle from any state.
    Stopped,
    /// Returns to Idle from any state.
    Reset,
}
pub mod execution {
    //! Per-node execution state machine. Kept in its own module so the
    //! `State` enum that statig generates here does not clash with the
    //! pipeline machine's `State`.

    use super::{AgentName, DateTime, ExecutionEvent, Utc};
    use statig::prelude::*;

    /// statig context for the execution machine. Empty: every state carries
    /// its own data (agent, timestamps, exit code) on the generated `State`
    /// variant.
    #[derive(Default)]
    pub struct ExecutionMachine;

    // ── Shared transitions ────────────────────────────────────────────────
    //
    // Several states answer the same event the same way; these helpers build
    // that response once. They live outside the `#[state_machine]` impl so
    // the macro never sees them.

    /// Enter `Running` for `agent`, stamping start time and first heartbeat
    /// with the same instant.
    fn start_running(agent: &AgentName) -> Response<State> {
        let started = Utc::now();
        Transition(State::running(agent.clone(), started, started))
    }

    /// Enter `RateLimited` for `agent` until `resume_at`.
    fn pause_until(agent: &AgentName, resume_at: DateTime<Utc>) -> Response<State> {
        Transition(State::rate_limited(agent.clone(), resume_at))
    }

    /// Enter `Completed` for `agent` with `exit_code`, stamped with now.
    fn finish(agent: &AgentName, exit_code: i32) -> Response<State> {
        Transition(State::completed(agent.clone(), exit_code, Utc::now()))
    }

    #[state_machine(initial = "State::idle()", state(derive(Debug, Clone, PartialEq, Eq)))]
    impl ExecutionMachine {
        // ── Idle: no agent on this node is working on this story ──────────
        #[state(superstate = "any")]
        fn idle(event: &ExecutionEvent) -> Response<State> {
            if let ExecutionEvent::SpawnRequested { agent } = event {
                Transition(State::pending(agent.clone(), Utc::now()))
            } else {
                Super
            }
        }
        // ── Pending: spawn requested, process not up yet ──────────────────
        #[state(superstate = "any")]
        fn pending(
            agent: &mut AgentName,
            _since: &mut DateTime<Utc>,
            event: &ExecutionEvent,
        ) -> Response<State> {
            match event {
                ExecutionEvent::SpawnedSuccessfully => start_running(agent),
                ExecutionEvent::HitRateLimit { resume_at } => pause_until(agent, *resume_at),
                ExecutionEvent::Exited { exit_code } => finish(agent, *exit_code),
                _ => Super,
            }
        }
        // ── Running: the agent's subprocess is alive ──────────────────────
        //
        // Heartbeat does not change state: it overwrites last_heartbeat
        // through the `&mut` handle and answers `Handled`, which is how
        // statig mutates state-local data without a transition.
        #[state(superstate = "any")]
        fn running(
            agent: &mut AgentName,
            _started_at: &mut DateTime<Utc>,
            last_heartbeat: &mut DateTime<Utc>,
            event: &ExecutionEvent,
        ) -> Response<State> {
            match event {
                ExecutionEvent::Heartbeat => {
                    *last_heartbeat = Utc::now();
                    Handled
                }
                ExecutionEvent::HitRateLimit { resume_at } => pause_until(agent, *resume_at),
                ExecutionEvent::Exited { exit_code } => finish(agent, *exit_code),
                _ => Super,
            }
        }
        // ── RateLimited: waiting for the API rate-limit window to clear ───
        #[state(superstate = "any")]
        fn rate_limited(
            agent: &mut AgentName,
            _resume_at: &mut DateTime<Utc>,
            event: &ExecutionEvent,
        ) -> Response<State> {
            match event {
                ExecutionEvent::SpawnedSuccessfully => start_running(agent),
                ExecutionEvent::Exited { exit_code } => finish(agent, *exit_code),
                _ => Super,
            }
        }
        // ── Completed: agent finished, exit code captured ─────────────────
        //
        // Everything is deferred to `any`, so only Stopped/Reset leave this
        // state. The parameters stay un-underscored so the generated
        // State::Completed variant exposes `exit_code` etc. by those names.
        #[state(superstate = "any")]
        fn completed(
            agent: &mut AgentName,
            exit_code: &mut i32,
            completed_at: &mut DateTime<Utc>,
            event: &ExecutionEvent,
        ) -> Response<State> {
            let _ = (agent, exit_code, completed_at, event);
            Super
        }
        // ── Cross-cutting: Stopped and Reset return to Idle from anywhere ─
        #[superstate]
        fn any(event: &ExecutionEvent) -> Response<State> {
            if matches!(event, ExecutionEvent::Stopped | ExecutionEvent::Reset) {
                Transition(State::idle())
            } else {
                Handled
            }
        }
    }
}
// ── Side effects via statig's entry/exit actions (alternative to EventBus) ───
//
// The bare version uses an explicit EventBus + Subscriber trait + per-state
// publish-on-transition pattern. statig has a more native equivalent:
// `#[action]`-tagged functions that fire on state entry / exit, plus an
// optional machine-wide `on_transition` callback for observing transitions.
//
// We don't include a full action-based example here — it would roughly look
// like adding `entry_action = "log_entry"` to each #[state] attribute and
// defining `fn log_entry(...)` in the impl block. The trade-off is that
// statig's actions are tightly coupled to the state machine impl block,
// while the bare version's EventBus allows arbitrary external subscribers
// to plug in without touching the state machine code. Both patterns are
// valid; pick based on whether you want side-effect dispatch INSIDE the
// machine (statig actions) or OUTSIDE it (bare EventBus).
// ── Tests ────────────────────────────────────────────────────────────────────
#[cfg(test)]
mod tests {
    use super::execution::{ExecutionMachine, State as ExecState};
    use super::*;

    // ── Helpers ────────────────────────────────────────────────────────────
    fn nz(n: u32) -> NonZeroU32 {
        NonZeroU32::new(n).unwrap()
    }
    fn fb(name: &str) -> BranchName {
        BranchName(name.to_string())
    }
    fn sha(s: &str) -> GitSha {
        GitSha(s.to_string())
    }
    fn block(reason: &str) -> PipelineEvent {
        PipelineEvent::Block {
            reason: reason.into(),
        }
    }

    /// Feeds `events` to a fresh pipeline machine and returns the final state.
    fn pipeline_after(events: &[PipelineEvent]) -> State {
        let mut sm = PipelineMachine.state_machine();
        for event in events {
            sm.handle(event);
        }
        sm.state().clone()
    }

    /// Events that take a fresh machine Backlog → Coding → Merge (QA skipped).
    fn to_merge() -> Vec<PipelineEvent> {
        vec![
            PipelineEvent::DepsMet,
            PipelineEvent::QaSkipped {
                feature_branch: fb("f"),
                commits_ahead: nz(1),
            },
        ]
    }

    /// Events that take a fresh machine all the way to Done.
    fn to_done() -> Vec<PipelineEvent> {
        then(
            to_merge(),
            PipelineEvent::MergeSucceeded {
                merge_commit: sha("abc"),
            },
        )
    }

    /// `prefix` with `last` appended.
    fn then(mut prefix: Vec<PipelineEvent>, last: PipelineEvent) -> Vec<PipelineEvent> {
        prefix.push(last);
        prefix
    }

    fn is_blocked(state: &State) -> bool {
        matches!(
            state,
            State::Archived {
                reason: ArchiveReason::Blocked { .. },
                ..
            }
        )
    }

    // ── Happy path ─────────────────────────────────────────────────────────
    #[test]
    fn happy_path_backlog_through_done() {
        let mut sm = PipelineMachine.state_machine();
        assert!(matches!(sm.state(), State::Backlog {}));
        sm.handle(&PipelineEvent::DepsMet);
        assert!(matches!(sm.state(), State::Coding {}));
        sm.handle(&PipelineEvent::QaSkipped {
            feature_branch: fb("feature/story-1"),
            commits_ahead: nz(3),
        });
        assert!(matches!(sm.state(), State::Merge { .. }));
        sm.handle(&PipelineEvent::MergeSucceeded {
            merge_commit: sha("abc123"),
        });
        assert!(matches!(sm.state(), State::Done { .. }));
        sm.handle(&PipelineEvent::Accepted);
        assert!(matches!(
            sm.state(),
            State::Archived {
                reason: ArchiveReason::Completed,
                ..
            }
        ));
    }
    #[test]
    fn qa_retry_loop() {
        let mut sm = PipelineMachine.state_machine();
        sm.handle(&PipelineEvent::DepsMet);
        sm.handle(&PipelineEvent::GatesStarted);
        assert!(matches!(sm.state(), State::Qa {}));
        sm.handle(&PipelineEvent::GatesFailed {
            reason: "tests failed".into(),
        });
        assert!(matches!(sm.state(), State::Coding {}));
    }
    #[test]
    fn accepted_before_done_is_ignored() {
        assert!(matches!(
            pipeline_after(&[PipelineEvent::Accepted]),
            State::Backlog {}
        ));
    }

    // ── Bug 519 unrepresentability: Merge with zero commits ahead ──────────
    #[test]
    fn merge_with_zero_commits_is_unrepresentable() {
        // Identical to the bare version: NonZeroU32::new(0) returns None,
        // so a State::merge(branch, ZERO) literally cannot be constructed.
        assert!(NonZeroU32::new(0).is_none());
        // ...while the smallest legal value does reach Merge.
        assert!(matches!(pipeline_after(&to_merge()), State::Merge { .. }));
    }
    #[test]
    fn merge_failed_final_archives_with_reason() {
        let state = pipeline_after(&then(
            to_merge(),
            PipelineEvent::MergeFailedFinal {
                reason: "conflict".into(),
            },
        ));
        assert!(matches!(
            &state,
            State::Archived {
                reason: ArchiveReason::MergeFailed { reason },
                ..
            } if reason.as_str() == "conflict"
        ));
    }

    // ── Cross-cutting transitions from every active stage (superstates) ────
    #[test]
    fn block_from_backlog_via_superstate() {
        assert!(is_blocked(&pipeline_after(&[block("stuck")])));
    }
    #[test]
    fn block_from_coding_via_superstate() {
        assert!(is_blocked(&pipeline_after(&[
            PipelineEvent::DepsMet,
            block("stuck"),
        ])));
    }
    #[test]
    fn block_from_qa_via_superstate() {
        assert!(is_blocked(&pipeline_after(&[
            PipelineEvent::DepsMet,
            PipelineEvent::GatesStarted,
            block("stuck"),
        ])));
    }
    #[test]
    fn block_from_merge_via_superstate() {
        assert!(is_blocked(&pipeline_after(&then(to_merge(), block("stuck")))));
    }
    #[test]
    fn review_hold_from_qa_via_superstate() {
        let state = pipeline_after(&[
            PipelineEvent::DepsMet,
            PipelineEvent::GatesStarted,
            PipelineEvent::ReviewHold {
                reason: "needs review".into(),
            },
        ]);
        assert!(matches!(
            state,
            State::Archived {
                reason: ArchiveReason::ReviewHeld { .. },
                ..
            }
        ));
    }
    #[test]
    fn supersede_from_coding_records_successor() {
        let state = pipeline_after(&[
            PipelineEvent::DepsMet,
            PipelineEvent::Supersede {
                by: StoryId("s-2".into()),
            },
        ]);
        assert!(matches!(
            &state,
            State::Archived {
                reason: ArchiveReason::Superseded { by },
                ..
            } if *by == StoryId("s-2".into())
        ));
    }

    // ── Done: Block/ReviewHold/Unblock ignored, Abandon/Supersede valid ────
    #[test]
    fn block_from_done_is_ignored() {
        assert!(matches!(
            pipeline_after(&then(to_done(), block("stuck"))),
            State::Done { .. }
        ));
    }
    #[test]
    fn unblock_from_done_is_ignored() {
        assert!(matches!(
            pipeline_after(&then(to_done(), PipelineEvent::Unblock)),
            State::Done { .. }
        ));
    }
    #[test]
    fn abandon_from_done_works() {
        assert!(matches!(
            pipeline_after(&then(to_done(), PipelineEvent::Abandon)),
            State::Archived {
                reason: ArchiveReason::Abandoned,
                ..
            }
        ));
    }
    #[test]
    fn supersede_from_done_works() {
        let state = pipeline_after(&then(
            to_done(),
            PipelineEvent::Supersede {
                by: StoryId("s-3".into()),
            },
        ));
        assert!(matches!(
            &state,
            State::Archived {
                reason: ArchiveReason::Superseded { by },
                ..
            } if *by == StoryId("s-3".into())
        ));
    }

    // ── Unblock from Archived(Blocked) → Backlog ───────────────────────────
    #[test]
    fn unblock_returns_to_backlog() {
        assert!(is_blocked(&pipeline_after(&[block("test")])));
        assert!(matches!(
            pipeline_after(&[block("test"), PipelineEvent::Unblock]),
            State::Backlog {}
        ));
    }
    #[test]
    fn unblock_from_review_held_does_nothing() {
        // Unblock is specifically for Blocked, not for any Archived variant.
        let state = pipeline_after(&[
            PipelineEvent::ReviewHold {
                reason: "TBD".into(),
            },
            PipelineEvent::Unblock,
        ]);
        assert!(matches!(
            state,
            State::Archived {
                reason: ArchiveReason::ReviewHeld { .. },
                ..
            }
        ));
    }

    // ── ExecutionMachine tests ─────────────────────────────────────────────

    /// Feeds `events` to a fresh execution machine and returns the final state.
    fn execution_after(events: &[ExecutionEvent]) -> ExecState {
        let mut em = ExecutionMachine.state_machine();
        for event in events {
            em.handle(event);
        }
        em.state().clone()
    }
    fn spawn(agent: &str) -> ExecutionEvent {
        ExecutionEvent::SpawnRequested {
            agent: AgentName(agent.into()),
        }
    }
    fn rate_limit() -> ExecutionEvent {
        ExecutionEvent::HitRateLimit {
            resume_at: Utc::now() + chrono::Duration::minutes(5),
        }
    }

    #[test]
    fn execution_happy_path() {
        let mut em = ExecutionMachine.state_machine();
        assert!(matches!(em.state(), ExecState::Idle {}));
        em.handle(&spawn("coder-1"));
        assert!(matches!(em.state(), ExecState::Pending { .. }));
        em.handle(&ExecutionEvent::SpawnedSuccessfully);
        assert!(matches!(em.state(), ExecState::Running { .. }));
        em.handle(&ExecutionEvent::Heartbeat);
        // Heartbeat updates last_heartbeat in-place; we stay in Running.
        assert!(matches!(em.state(), ExecState::Running { .. }));
        em.handle(&ExecutionEvent::Exited { exit_code: 0 });
        assert!(matches!(
            em.state(),
            ExecState::Completed { exit_code: 0, .. }
        ));
    }
    #[test]
    fn execution_heartbeat_refreshes_last_heartbeat() {
        let mut em = ExecutionMachine.state_machine();
        em.handle(&spawn("coder-1"));
        em.handle(&ExecutionEvent::SpawnedSuccessfully);
        let heartbeat_of = |state: &ExecState| match state {
            ExecState::Running { last_heartbeat, .. } => *last_heartbeat,
            other => panic!("expected Running, got {:?}", other),
        };
        let before = heartbeat_of(em.state());
        em.handle(&ExecutionEvent::Heartbeat);
        assert!(heartbeat_of(em.state()) >= before);
    }
    #[test]
    fn execution_spawn_requested_while_running_is_ignored() {
        let state = execution_after(&[
            spawn("coder-1"),
            ExecutionEvent::SpawnedSuccessfully,
            spawn("coder-2"),
        ]);
        assert!(matches!(
            &state,
            ExecState::Running { agent, .. } if *agent == AgentName("coder-1".into())
        ));
    }
    #[test]
    fn execution_rate_limit_then_resume() {
        let mut em = ExecutionMachine.state_machine();
        em.handle(&spawn("coder-1"));
        em.handle(&ExecutionEvent::SpawnedSuccessfully);
        em.handle(&rate_limit());
        assert!(matches!(em.state(), ExecState::RateLimited { .. }));
        em.handle(&ExecutionEvent::SpawnedSuccessfully);
        assert!(matches!(em.state(), ExecState::Running { .. }));
    }
    #[test]
    fn execution_exit_from_pending_completes() {
        let state = execution_after(&[spawn("coder-1"), ExecutionEvent::Exited { exit_code: 2 }]);
        assert!(matches!(state, ExecState::Completed { exit_code: 2, .. }));
    }
    #[test]
    fn execution_stop_from_running_returns_idle_via_superstate() {
        let state = execution_after(&[
            spawn("coder-1"),
            ExecutionEvent::SpawnedSuccessfully,
            ExecutionEvent::Stopped,
        ]);
        assert!(matches!(state, ExecState::Idle {}));
    }
    #[test]
    fn execution_stop_from_pending_returns_idle_via_superstate() {
        let state = execution_after(&[spawn("coder-1"), ExecutionEvent::Stopped]);
        assert!(matches!(state, ExecState::Idle {}));
    }
    #[test]
    fn execution_stop_from_rate_limited_returns_idle_via_superstate() {
        let state = execution_after(&[
            spawn("coder-1"),
            ExecutionEvent::SpawnedSuccessfully,
            rate_limit(),
            ExecutionEvent::Stopped,
        ]);
        assert!(matches!(state, ExecState::Idle {}));
    }
    #[test]
    fn execution_reset_from_completed_returns_idle_via_superstate() {
        let state = execution_after(&[
            spawn("coder-1"),
            ExecutionEvent::SpawnedSuccessfully,
            ExecutionEvent::Exited { exit_code: 0 },
            ExecutionEvent::Reset,
        ]);
        assert!(matches!(state, ExecState::Idle {}));
    }
    #[test]
    fn nodepubkey_type_is_constructible() {
        // Just exercise the NodePubkey newtype so it isn't dead code.
        // In a real implementation it'd key the per-node ExecutionState
        // map inside the CRDT.
        let _ = NodePubkey([0u8; 32]);
    }
}
// ── main: a quick interactive demo ───────────────────────────────────────────
/// Interactive demo: walks one story along the happy path, then shows that an
/// event the current stage does not handle (Unblock in Done) is a no-op.
fn main() {
    println!("─── Pipeline state machine sketch (story 520) — STATIG version ───\n");

    // Part 1: Backlog → Coding → Merge → Done → Archived, printing each step.
    let mut machine = PipelineMachine.state_machine();
    println!("Initial: {:?}\n", machine.state());
    let happy_path = [
        ("DepsMet", PipelineEvent::DepsMet),
        (
            "QaSkipped",
            PipelineEvent::QaSkipped {
                feature_branch: BranchName("feature/story-100".into()),
                commits_ahead: NonZeroU32::new(3).unwrap(),
            },
        ),
        (
            "MergeSucceeded",
            PipelineEvent::MergeSucceeded {
                merge_commit: GitSha("abc1234".into()),
            },
        ),
        ("Accepted", PipelineEvent::Accepted),
    ];
    for (label, event) in happy_path.iter() {
        println!("→ {}", label);
        machine.handle(event);
        println!(" state: {:?}\n", machine.state());
    }

    // Part 2: reach Done silently, then fire an event Done does not handle.
    println!("─── Trying invalid transition: Done → Unblock ───");
    let mut done_story = PipelineMachine.state_machine();
    let reach_done = [
        PipelineEvent::DepsMet,
        PipelineEvent::QaSkipped {
            feature_branch: BranchName("feature/story-101".into()),
            commits_ahead: NonZeroU32::new(2).unwrap(),
        },
        PipelineEvent::MergeSucceeded {
            merge_commit: GitSha("def5678".into()),
        },
    ];
    for event in reach_done.iter() {
        done_story.handle(event);
    }
    println!(" before Unblock: {:?}", done_story.state());
    done_story.handle(&PipelineEvent::Unblock); // ignored by Done — no transition
    println!(
        " after Unblock: {:?} (no change — Unblock is a no-op from Done)",
        done_story.state()
    );
}