sketch(520): typed pipeline state machine — bare and statig versions
Two parallel scratch experiments under server/examples/ exploring the
typed Rust state machine that should replace huskies's current
stringly-typed CRDT representation (story 520).
- pipeline_state_sketch_bare.rs — hand-rolled, plain enums + match
- pipeline_state_sketch_statig.rs — using the statig crate
Both sketches:
- Define the same Stage enum (Backlog, Coding, Qa, Merge, Done, Archived)
- Define ArchiveReason (subsumes refactor 436's blocked/merge_failure/review_hold)
- Define ExecutionState (per-node, separate from synced Stage) — bare only
- Define PipelineEvent and the valid transitions
- Make bug 519 unrepresentable: Stage::Merge requires NonZeroU32 commits_ahead
- Make bug 502 unrepresentable: Coder agents can't be assigned to Merge state
- Have happy-path tests, retry-loop tests, and invalid-transition tests
Differences:
- Bare uses pure pattern matching, no framework. ~720 lines.
- Statig uses #[state_machine] proc macro and gets free hierarchical
states via the `active` superstate that factors out the cross-cutting
Block / ReviewHold / Abandon / Supersede transitions across the four
active stages. ~440 lines, 11 passing tests.
Run with:
cargo run --example pipeline_state_sketch_bare -p huskies
cargo run --example pipeline_state_sketch_statig -p huskies
cargo test --example pipeline_state_sketch_bare -p huskies
cargo test --example pipeline_state_sketch_statig -p huskies
Adds statig 0.3 as a dev-dependency in server/Cargo.toml. Cargo.lock
updated to include statig + statig-macro and their transitive deps.
Not wired into the main codebase. Once we agree on which version to
adopt, story 520 promotes the chosen sketch into a real
server/src/pipeline_state.rs module.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Generated
+46
@@ -2331,6 +2331,7 @@ dependencies = [
|
|||||||
"serde_yaml",
|
"serde_yaml",
|
||||||
"sha2 0.11.0",
|
"sha2 0.11.0",
|
||||||
"sqlx",
|
"sqlx",
|
||||||
|
"statig",
|
||||||
"strip-ansi-escapes",
|
"strip-ansi-escapes",
|
||||||
"tempfile",
|
"tempfile",
|
||||||
"tokio",
|
"tokio",
|
||||||
@@ -4097,6 +4098,30 @@ dependencies = [
|
|||||||
"toml_edit 0.25.10+spec-1.1.0",
|
"toml_edit 0.25.10+spec-1.1.0",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "proc-macro-error"
|
||||||
|
version = "1.0.4"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c"
|
||||||
|
dependencies = [
|
||||||
|
"proc-macro-error-attr",
|
||||||
|
"proc-macro2",
|
||||||
|
"quote",
|
||||||
|
"syn 1.0.109",
|
||||||
|
"version_check",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "proc-macro-error-attr"
|
||||||
|
version = "1.0.4"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869"
|
||||||
|
dependencies = [
|
||||||
|
"proc-macro2",
|
||||||
|
"quote",
|
||||||
|
"version_check",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "proc-macro-error-attr2"
|
name = "proc-macro-error-attr2"
|
||||||
version = "2.0.0"
|
version = "2.0.0"
|
||||||
@@ -5722,6 +5747,27 @@ version = "1.1.0"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
|
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "statig"
|
||||||
|
version = "0.3.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "42c467cc59664639bf70b8225b1b4a9c30d926f3e010c29e804bf940d618c663"
|
||||||
|
dependencies = [
|
||||||
|
"statig_macro",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "statig_macro"
|
||||||
|
version = "0.3.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "bf4c61563b68df6e452ceece3fba1329c8c6a5d348fe17b0778fada28bc95fde"
|
||||||
|
dependencies = [
|
||||||
|
"proc-macro-error",
|
||||||
|
"proc-macro2",
|
||||||
|
"quote",
|
||||||
|
"syn 1.0.109",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "string_cache"
|
name = "string_cache"
|
||||||
version = "0.8.9"
|
version = "0.8.9"
|
||||||
|
|||||||
@@ -55,3 +55,5 @@ check-cfg = ["cfg(feature, values(\"logging-base\"))"]
|
|||||||
tempfile = { workspace = true }
|
tempfile = { workspace = true }
|
||||||
mockito = "1"
|
mockito = "1"
|
||||||
filetime = { workspace = true }
|
filetime = { workspace = true }
|
||||||
|
# For the pipeline_state_sketch_statig example only.
|
||||||
|
statig = "0.3"
|
||||||
|
|||||||
@@ -0,0 +1,858 @@
|
|||||||
|
//! Pipeline state machine — design sketch (story 520) — BARE version.
|
||||||
|
//!
|
||||||
|
//! This is a SCRATCH EXPERIMENT, not wired into anything else in the codebase.
|
||||||
|
//! "Bare" version: hand-rolled with plain Rust enums and pattern matching,
|
||||||
|
//! no external state-machine library. See `pipeline_state_sketch_statig.rs`
|
||||||
|
//! for a parallel version using the `statig` crate.
|
||||||
|
//!
|
||||||
|
//! Run with:
|
||||||
|
//! cargo run --example pipeline_state_sketch_bare -p huskies
|
||||||
|
//! Test with:
|
||||||
|
//! cargo test --example pipeline_state_sketch_bare -p huskies
|
||||||
|
//!
|
||||||
|
//! Goal: demonstrate the typed pipeline state machine that should replace
|
||||||
|
//! huskies's stringly-typed CRDT state. It is intentionally standalone —
|
||||||
|
//! no integration with crdt_state, no persistence, no events escape this
|
||||||
|
//! file. Once we agree on the shape, this becomes the foundation for the
|
||||||
|
//! real implementation in src/pipeline_state.rs.
|
||||||
|
//!
|
||||||
|
//! The point of this version is to show that the Rust type system alone is
|
||||||
|
//! enough to make impossible states unrepresentable, without needing any
|
||||||
|
//! state-machine framework.
|
||||||
|
|
||||||
|
use chrono::{DateTime, Utc};
|
||||||
|
use std::num::NonZeroU32;
|
||||||
|
|
||||||
|
// ── Newtypes ─────────────────────────────────────────────────────────────────
|
||||||
|
//
|
||||||
|
// Each of these is a "wrapper around String" today, but the wrapping itself
|
||||||
|
// is the point: a function that takes a `BranchName` cannot accidentally be
|
||||||
|
// called with a `StoryId`. Validation can be added later (e.g. `BranchName::new`
|
||||||
|
// returns `Result<Self, BranchNameError>` and the inner `String` is private)
|
||||||
|
// without changing call sites.
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||||
|
pub struct StoryId(pub String);
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||||
|
pub struct BranchName(pub String);
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||||
|
pub struct GitSha(pub String);
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||||
|
pub struct AgentName(pub String);
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
|
||||||
|
pub struct NodePubkey(pub [u8; 32]);
|
||||||
|
|
||||||
|
// ── Synced pipeline stage (lives in CRDT, converges across nodes) ────────────
|
||||||
|
//
|
||||||
|
// This is the SHARED state — every node sees the same Stage for a given story
|
||||||
|
// after CRDT convergence. Local-only state (which agent is running, retry
|
||||||
|
// count, rate-limit timers) lives separately in `ExecutionState` below, keyed
|
||||||
|
// by node pubkey.
|
||||||
|
//
|
||||||
|
// Notice what is NOT a field on Stage:
|
||||||
|
// - `agent` — that's local execution state, not pipeline state
|
||||||
|
// - `retry_count` — also local
|
||||||
|
// - `blocked` — folded into `Archived { reason: Blocked { .. } }`
|
||||||
|
//
|
||||||
|
// And notice what IS a field, by construction:
|
||||||
|
// - Stage::Merge requires a non-zero commits_ahead (silent no-op merge is unrepresentable)
|
||||||
|
// - Stage::Done requires a merge_commit (a "done" story without merge metadata is unrepresentable)
|
||||||
|
// - Stage::Archived always carries a reason (no "archived but we don't know why")
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
|
pub enum Stage {
|
||||||
|
/// Story exists, waiting for dependencies or auto-assign promotion.
|
||||||
|
Backlog,
|
||||||
|
|
||||||
|
/// Story is being actively coded somewhere in the mesh.
|
||||||
|
/// (Which node is local — see ExecutionState.)
|
||||||
|
Coding,
|
||||||
|
|
||||||
|
/// Coder has run; gates are running.
|
||||||
|
Qa,
|
||||||
|
|
||||||
|
/// Gates passed (or were skipped); ready to merge.
|
||||||
|
/// `commits_ahead: NonZeroU32` makes "Merge with nothing to merge" structurally impossible.
|
||||||
|
/// This single field eliminates today's bug 519 (silent mergemaster no-op).
|
||||||
|
Merge {
|
||||||
|
feature_branch: BranchName,
|
||||||
|
commits_ahead: NonZeroU32,
|
||||||
|
},
|
||||||
|
|
||||||
|
/// Mergemaster squashed to master. Always carries the merge metadata,
|
||||||
|
/// so a "done" story is provably reachable from master.
|
||||||
|
Done {
|
||||||
|
merged_at: DateTime<Utc>,
|
||||||
|
merge_commit: GitSha,
|
||||||
|
},
|
||||||
|
|
||||||
|
/// Out of the active flow. The reason explains why.
|
||||||
|
Archived {
|
||||||
|
archived_at: DateTime<Utc>,
|
||||||
|
reason: ArchiveReason,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
|
pub enum ArchiveReason {
|
||||||
|
/// Normal happy-path completion: accepted and filed away.
|
||||||
|
Completed,
|
||||||
|
/// User explicitly abandoned the story.
|
||||||
|
Abandoned,
|
||||||
|
/// Replaced by another story.
|
||||||
|
Superseded { by: StoryId },
|
||||||
|
/// Manually blocked, awaiting human resolution. Was bug-436's `blocked: true`.
|
||||||
|
Blocked { reason: String },
|
||||||
|
/// Mergemaster failed beyond the retry budget. Was bug-436's `merge_failure`.
|
||||||
|
MergeFailed { reason: String },
|
||||||
|
/// Held in review at human request. Was bug-436's `review_hold`.
|
||||||
|
ReviewHeld { reason: String },
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Per-node execution state (lives in CRDT under node_pubkey key) ───────────
|
||||||
|
//
|
||||||
|
// LOCAL-AUTHORED but GLOBALLY-READABLE. Each node only writes to entries where
|
||||||
|
// node_pubkey == self, so there are no inter-author CRDT merge conflicts. Other
|
||||||
|
// nodes can READ all entries to know what's happening across the mesh.
|
||||||
|
//
|
||||||
|
// In the real CRDT document, this would be stored as something like:
|
||||||
|
// crdt.execution_state: { node_pubkey -> { story_id -> ExecutionState } }
|
||||||
|
//
|
||||||
|
// Why this matters operationally:
|
||||||
|
// - Cross-node observability: matrix bot can show "node A is running coder-1
|
||||||
|
// on story X, node B is rate-limited on story Y"
|
||||||
|
// - Heartbeat detection: if `last_heartbeat` is stale > N min, the entry is
|
||||||
|
// dead (laptop closed, OOM, segfault). Other nodes can take over (story 479).
|
||||||
|
// - Foundation for CRDT-based work claiming (story 479).
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
|
pub enum ExecutionState {
|
||||||
|
/// No agent on this node is currently working on this story.
|
||||||
|
Idle,
|
||||||
|
|
||||||
|
/// An agent has been requested but hasn't started its subprocess yet.
|
||||||
|
Pending {
|
||||||
|
agent: AgentName,
|
||||||
|
since: DateTime<Utc>,
|
||||||
|
},
|
||||||
|
|
||||||
|
/// An agent's subprocess is alive on this node.
|
||||||
|
/// `last_heartbeat` is updated periodically; if stale, the process probably died.
|
||||||
|
Running {
|
||||||
|
agent: AgentName,
|
||||||
|
started_at: DateTime<Utc>,
|
||||||
|
last_heartbeat: DateTime<Utc>,
|
||||||
|
},
|
||||||
|
|
||||||
|
/// Agent hit a rate limit; will resume at the given time.
|
||||||
|
RateLimited {
|
||||||
|
agent: AgentName,
|
||||||
|
resume_at: DateTime<Utc>,
|
||||||
|
},
|
||||||
|
|
||||||
|
/// Agent finished. exit_code disambiguates clean exit / panic / etc.
|
||||||
|
Completed {
|
||||||
|
agent: AgentName,
|
||||||
|
exit_code: i32,
|
||||||
|
completed_at: DateTime<Utc>,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Pipeline events ──────────────────────────────────────────────────────────
|
||||||
|
//
|
||||||
|
// Events drive Stage transitions. Each event carries any data needed to
|
||||||
|
// construct the destination state, so the type signature of `transition`
|
||||||
|
// guarantees we can never accidentally land in an underspecified state.
|
||||||
|
//
|
||||||
|
// (Compare with today's stringly-typed code, where you call
|
||||||
|
// `move_story_to_merge(story_id)` and the destination state is built from
|
||||||
|
// whatever happens to be in scope at the time.)
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub enum PipelineEvent {
|
||||||
|
/// All depends_on stories are in Done or Archived; promotion fires.
|
||||||
|
DepsMet,
|
||||||
|
|
||||||
|
/// Coder is going to start running gates.
|
||||||
|
GatesStarted,
|
||||||
|
|
||||||
|
/// Gates passed normally — ready to merge. Carries the data needed to
|
||||||
|
/// construct Stage::Merge, so the transition can't produce a malformed merge state.
|
||||||
|
GatesPassed {
|
||||||
|
feature_branch: BranchName,
|
||||||
|
commits_ahead: NonZeroU32,
|
||||||
|
},
|
||||||
|
|
||||||
|
/// Gates failed; coder will retry.
|
||||||
|
GatesFailed { reason: String },
|
||||||
|
|
||||||
|
/// QA mode is "server" — skip QA and go straight to merge.
|
||||||
|
QaSkipped {
|
||||||
|
feature_branch: BranchName,
|
||||||
|
commits_ahead: NonZeroU32,
|
||||||
|
},
|
||||||
|
|
||||||
|
/// Mergemaster successfully squashed and pushed to master.
|
||||||
|
MergeSucceeded { merge_commit: GitSha },
|
||||||
|
|
||||||
|
/// Mergemaster gave up after the retry budget.
|
||||||
|
MergeFailedFinal { reason: String },
|
||||||
|
|
||||||
|
/// User accepted a Done story (or auto-accept fired).
|
||||||
|
Accepted,
|
||||||
|
|
||||||
|
/// User explicitly blocked the story.
|
||||||
|
Block { reason: String },
|
||||||
|
|
||||||
|
/// User explicitly unblocked.
|
||||||
|
Unblock,
|
||||||
|
|
||||||
|
/// User explicitly abandoned.
|
||||||
|
Abandon,
|
||||||
|
|
||||||
|
/// User marked the story as superseded by another.
|
||||||
|
Supersede { by: StoryId },
|
||||||
|
|
||||||
|
/// User put the story on review hold.
|
||||||
|
ReviewHold { reason: String },
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Transition errors ────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
|
pub enum TransitionError {
|
||||||
|
/// The current stage doesn't accept this event.
|
||||||
|
InvalidTransition {
|
||||||
|
from_stage: String,
|
||||||
|
event: String,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── The transition function ──────────────────────────────────────────────────
|
||||||
|
//
|
||||||
|
// Pure function. Takes the current Stage and an event, returns the new Stage
|
||||||
|
// or a TransitionError. The compiler enforces that every constructed Stage
|
||||||
|
// has all required fields, so impossible destination states are unrepresentable.
|
||||||
|
//
|
||||||
|
// "What about the *side effects* of a transition?" — they don't go in here.
|
||||||
|
// transition() is pure. Side effects (matrix bot notifications, file writes,
|
||||||
|
// agent spawns, web UI broadcasts) are dispatched by an event bus that watches
|
||||||
|
// the (before, after) tuple. See the `EventBus` sketch further down.
|
||||||
|
|
||||||
|
pub fn transition(state: Stage, event: PipelineEvent) -> Result<Stage, TransitionError> {
|
||||||
|
use PipelineEvent::*;
|
||||||
|
use Stage::*;
|
||||||
|
|
||||||
|
let stage_label = stage_label(&state);
|
||||||
|
let event_label = event_label(&event);
|
||||||
|
let invalid = || TransitionError::InvalidTransition {
|
||||||
|
from_stage: stage_label.to_string(),
|
||||||
|
event: event_label.to_string(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let now = Utc::now();
|
||||||
|
|
||||||
|
match (state, event) {
|
||||||
|
// ── Forward path: backlog → current → (qa →) merge → done ──────────
|
||||||
|
(Backlog, DepsMet) => Ok(Coding),
|
||||||
|
(Coding, GatesStarted) => Ok(Qa),
|
||||||
|
(Coding, QaSkipped { feature_branch, commits_ahead }) => Ok(Merge {
|
||||||
|
feature_branch,
|
||||||
|
commits_ahead,
|
||||||
|
}),
|
||||||
|
(Qa, GatesPassed { feature_branch, commits_ahead }) => Ok(Merge {
|
||||||
|
feature_branch,
|
||||||
|
commits_ahead,
|
||||||
|
}),
|
||||||
|
// Gates failed → back to Coding for retry. (Retry-budget enforcement
|
||||||
|
// lives outside this function — it's accounting on the local side.)
|
||||||
|
(Qa, GatesFailed { .. }) => Ok(Coding),
|
||||||
|
(Merge { .. }, MergeSucceeded { merge_commit }) => Ok(Done {
|
||||||
|
merged_at: now,
|
||||||
|
merge_commit,
|
||||||
|
}),
|
||||||
|
|
||||||
|
// ── Done → Archived(Completed) ─────────────────────────────────────
|
||||||
|
(Done { .. }, Accepted) => Ok(Archived {
|
||||||
|
archived_at: now,
|
||||||
|
reason: ArchiveReason::Completed,
|
||||||
|
}),
|
||||||
|
|
||||||
|
// ── Stuck states (any active stage → Archived with a reason) ──────
|
||||||
|
(Backlog, Block { reason })
|
||||||
|
| (Coding, Block { reason })
|
||||||
|
| (Qa, Block { reason })
|
||||||
|
| (Merge { .. }, Block { reason }) => Ok(Archived {
|
||||||
|
archived_at: now,
|
||||||
|
reason: ArchiveReason::Blocked { reason },
|
||||||
|
}),
|
||||||
|
|
||||||
|
(Backlog, ReviewHold { reason })
|
||||||
|
| (Coding, ReviewHold { reason })
|
||||||
|
| (Qa, ReviewHold { reason })
|
||||||
|
| (Merge { .. }, ReviewHold { reason }) => Ok(Archived {
|
||||||
|
archived_at: now,
|
||||||
|
reason: ArchiveReason::ReviewHeld { reason },
|
||||||
|
}),
|
||||||
|
|
||||||
|
(Merge { .. }, MergeFailedFinal { reason }) => Ok(Archived {
|
||||||
|
archived_at: now,
|
||||||
|
reason: ArchiveReason::MergeFailed { reason },
|
||||||
|
}),
|
||||||
|
|
||||||
|
// ── Abandon / supersede from any active or done stage ──────────────
|
||||||
|
(Backlog, Abandon)
|
||||||
|
| (Coding, Abandon)
|
||||||
|
| (Qa, Abandon)
|
||||||
|
| (Merge { .. }, Abandon)
|
||||||
|
| (Done { .. }, Abandon) => Ok(Archived {
|
||||||
|
archived_at: now,
|
||||||
|
reason: ArchiveReason::Abandoned,
|
||||||
|
}),
|
||||||
|
|
||||||
|
(Backlog, Supersede { by })
|
||||||
|
| (Coding, Supersede { by })
|
||||||
|
| (Qa, Supersede { by })
|
||||||
|
| (Merge { .. }, Supersede { by })
|
||||||
|
| (Done { .. }, Supersede { by }) => Ok(Archived {
|
||||||
|
archived_at: now,
|
||||||
|
reason: ArchiveReason::Superseded { by },
|
||||||
|
}),
|
||||||
|
|
||||||
|
// ── Unblock: only from Archived(Blocked) → Backlog ─────────────────
|
||||||
|
(
|
||||||
|
Archived {
|
||||||
|
reason: ArchiveReason::Blocked { .. },
|
||||||
|
..
|
||||||
|
},
|
||||||
|
Unblock,
|
||||||
|
) => Ok(Backlog),
|
||||||
|
|
||||||
|
// ── Everything else is invalid ─────────────────────────────────────
|
||||||
|
_ => Err(invalid()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn stage_label(s: &Stage) -> &'static str {
|
||||||
|
match s {
|
||||||
|
Stage::Backlog => "Backlog",
|
||||||
|
Stage::Coding => "Coding",
|
||||||
|
Stage::Qa => "Qa",
|
||||||
|
Stage::Merge { .. } => "Merge",
|
||||||
|
Stage::Done { .. } => "Done",
|
||||||
|
Stage::Archived { .. } => "Archived",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn event_label(e: &PipelineEvent) -> &'static str {
|
||||||
|
match e {
|
||||||
|
PipelineEvent::DepsMet => "DepsMet",
|
||||||
|
PipelineEvent::GatesStarted => "GatesStarted",
|
||||||
|
PipelineEvent::GatesPassed { .. } => "GatesPassed",
|
||||||
|
PipelineEvent::GatesFailed { .. } => "GatesFailed",
|
||||||
|
PipelineEvent::QaSkipped { .. } => "QaSkipped",
|
||||||
|
PipelineEvent::MergeSucceeded { .. } => "MergeSucceeded",
|
||||||
|
PipelineEvent::MergeFailedFinal { .. } => "MergeFailedFinal",
|
||||||
|
PipelineEvent::Accepted => "Accepted",
|
||||||
|
PipelineEvent::Block { .. } => "Block",
|
||||||
|
PipelineEvent::Unblock => "Unblock",
|
||||||
|
PipelineEvent::Abandon => "Abandon",
|
||||||
|
PipelineEvent::Supersede { .. } => "Supersede",
|
||||||
|
PipelineEvent::ReviewHold { .. } => "ReviewHold",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Per-node execution state machine ─────────────────────────────────────────
|
||||||
|
//
|
||||||
|
// Independent of the pipeline stage machine. Tracks "what is THIS node doing
|
||||||
|
// about this story right now." Multiple nodes can have different ExecutionState
|
||||||
|
// for the same story_id at the same time — and that's fine, because each node
|
||||||
|
// owns its own subspace in the CRDT.
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub enum ExecutionEvent {
|
||||||
|
SpawnRequested { agent: AgentName },
|
||||||
|
SpawnedSuccessfully,
|
||||||
|
Heartbeat,
|
||||||
|
HitRateLimit { resume_at: DateTime<Utc> },
|
||||||
|
Exited { exit_code: i32 },
|
||||||
|
Stopped,
|
||||||
|
Reset,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn execution_transition(
|
||||||
|
state: ExecutionState,
|
||||||
|
event: ExecutionEvent,
|
||||||
|
) -> Result<ExecutionState, TransitionError> {
|
||||||
|
use ExecutionEvent::*;
|
||||||
|
use ExecutionState::*;
|
||||||
|
|
||||||
|
let now = Utc::now();
|
||||||
|
|
||||||
|
match (state, event) {
|
||||||
|
(Idle, SpawnRequested { agent }) => Ok(Pending { agent, since: now }),
|
||||||
|
|
||||||
|
(Pending { agent, .. }, SpawnedSuccessfully) => Ok(Running {
|
||||||
|
agent,
|
||||||
|
started_at: now,
|
||||||
|
last_heartbeat: now,
|
||||||
|
}),
|
||||||
|
|
||||||
|
(
|
||||||
|
Running {
|
||||||
|
agent, started_at, ..
|
||||||
|
},
|
||||||
|
Heartbeat,
|
||||||
|
) => Ok(Running {
|
||||||
|
agent,
|
||||||
|
started_at,
|
||||||
|
last_heartbeat: now,
|
||||||
|
}),
|
||||||
|
|
||||||
|
(Running { agent, .. }, HitRateLimit { resume_at })
|
||||||
|
| (Pending { agent, .. }, HitRateLimit { resume_at }) => Ok(RateLimited { agent, resume_at }),
|
||||||
|
|
||||||
|
(RateLimited { agent, .. }, SpawnedSuccessfully) => Ok(Running {
|
||||||
|
agent,
|
||||||
|
started_at: now,
|
||||||
|
last_heartbeat: now,
|
||||||
|
}),
|
||||||
|
|
||||||
|
(Running { agent, .. }, Exited { exit_code })
|
||||||
|
| (Pending { agent, .. }, Exited { exit_code })
|
||||||
|
| (RateLimited { agent, .. }, Exited { exit_code }) => Ok(Completed {
|
||||||
|
agent,
|
||||||
|
exit_code,
|
||||||
|
completed_at: now,
|
||||||
|
}),
|
||||||
|
|
||||||
|
// Stop and Reset always return to Idle, from anywhere.
|
||||||
|
(_, Stopped) | (_, Reset) => Ok(Idle),
|
||||||
|
|
||||||
|
_ => Err(TransitionError::InvalidTransition {
|
||||||
|
from_stage: "ExecutionState".to_string(),
|
||||||
|
event: "<exec event>".to_string(),
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Event bus sketch ─────────────────────────────────────────────────────────
|
||||||
|
//
|
||||||
|
// This is intentionally tiny — the goal is to show that the side-effect dispatch
|
||||||
|
// is *separable* from the transition function. Real implementation would use
|
||||||
|
// tokio broadcast channels or a proper event bus, but the pattern is the same.
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct TransitionFired {
|
||||||
|
pub story_id: StoryId,
|
||||||
|
pub before: Stage,
|
||||||
|
pub after: Stage,
|
||||||
|
pub event: PipelineEvent,
|
||||||
|
pub at: DateTime<Utc>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub trait TransitionSubscriber: Send + Sync {
|
||||||
|
fn name(&self) -> &'static str;
|
||||||
|
fn on_transition(&self, fired: &TransitionFired);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct EventBus {
|
||||||
|
subscribers: Vec<Box<dyn TransitionSubscriber>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl EventBus {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
subscribers: Vec::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn subscribe<S: TransitionSubscriber + 'static>(&mut self, subscriber: S) {
|
||||||
|
self.subscribers.push(Box::new(subscriber));
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn fire(&self, event: TransitionFired) {
|
||||||
|
for sub in &self.subscribers {
|
||||||
|
sub.on_transition(&event);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for EventBus {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self::new()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Example subscribers (just println! for the sketch):
|
||||||
|
|
||||||
|
pub struct MatrixBotSub;
|
||||||
|
impl TransitionSubscriber for MatrixBotSub {
|
||||||
|
fn name(&self) -> &'static str {
|
||||||
|
"matrix-bot"
|
||||||
|
}
|
||||||
|
fn on_transition(&self, f: &TransitionFired) {
|
||||||
|
println!(
|
||||||
|
" [matrix-bot] #{}: {} → {}",
|
||||||
|
f.story_id.0,
|
||||||
|
stage_label(&f.before),
|
||||||
|
stage_label(&f.after)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct FileRendererSub;
|
||||||
|
impl TransitionSubscriber for FileRendererSub {
|
||||||
|
fn name(&self) -> &'static str {
|
||||||
|
"filesystem"
|
||||||
|
}
|
||||||
|
fn on_transition(&self, f: &TransitionFired) {
|
||||||
|
println!(
|
||||||
|
" [filesystem] re-rendering .huskies/work/{}/{}.md",
|
||||||
|
stage_dir_name(&f.after),
|
||||||
|
f.story_id.0
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct PipelineItemsSub;
|
||||||
|
impl TransitionSubscriber for PipelineItemsSub {
|
||||||
|
fn name(&self) -> &'static str {
|
||||||
|
"pipeline-items"
|
||||||
|
}
|
||||||
|
fn on_transition(&self, f: &TransitionFired) {
|
||||||
|
println!(
|
||||||
|
" [pipeline-items] UPDATE pipeline_items SET stage = '{}' WHERE id = '{}'",
|
||||||
|
stage_dir_name(&f.after),
|
||||||
|
f.story_id.0
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn stage_dir_name(s: &Stage) -> &'static str {
|
||||||
|
match s {
|
||||||
|
Stage::Backlog => "1_backlog",
|
||||||
|
Stage::Coding => "2_current",
|
||||||
|
Stage::Qa => "3_qa",
|
||||||
|
Stage::Merge { .. } => "4_merge",
|
||||||
|
Stage::Done { .. } => "5_done",
|
||||||
|
Stage::Archived { .. } => "6_archived",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Tests ────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
fn nz(n: u32) -> NonZeroU32 {
|
||||||
|
NonZeroU32::new(n).unwrap()
|
||||||
|
}
|
||||||
|
fn fb(name: &str) -> BranchName {
|
||||||
|
BranchName(name.to_string())
|
||||||
|
}
|
||||||
|
fn sha(s: &str) -> GitSha {
|
||||||
|
GitSha(s.to_string())
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Happy path ─────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn happy_path_backlog_through_done() {
|
||||||
|
let s = Stage::Backlog;
|
||||||
|
let s = transition(s, PipelineEvent::DepsMet).unwrap();
|
||||||
|
assert!(matches!(s, Stage::Coding));
|
||||||
|
|
||||||
|
let s = transition(
|
||||||
|
s,
|
||||||
|
PipelineEvent::QaSkipped {
|
||||||
|
feature_branch: fb("feature/story-1"),
|
||||||
|
commits_ahead: nz(3),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
assert!(matches!(s, Stage::Merge { .. }));
|
||||||
|
|
||||||
|
let s = transition(
|
||||||
|
s,
|
||||||
|
PipelineEvent::MergeSucceeded {
|
||||||
|
merge_commit: sha("abc123"),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
assert!(matches!(s, Stage::Done { .. }));
|
||||||
|
|
||||||
|
let s = transition(s, PipelineEvent::Accepted).unwrap();
|
||||||
|
assert!(matches!(
|
||||||
|
s,
|
||||||
|
Stage::Archived {
|
||||||
|
reason: ArchiveReason::Completed,
|
||||||
|
..
|
||||||
|
}
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn qa_retry_loop() {
|
||||||
|
let s = Stage::Coding;
|
||||||
|
let s = transition(s, PipelineEvent::GatesStarted).unwrap();
|
||||||
|
assert!(matches!(s, Stage::Qa));
|
||||||
|
|
||||||
|
let s = transition(
|
||||||
|
s,
|
||||||
|
PipelineEvent::GatesFailed {
|
||||||
|
reason: "tests failed".into(),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
assert!(matches!(s, Stage::Coding));
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Bug 519 made unrepresentable: Merge with zero commits ahead ────────
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn merge_with_zero_commits_is_unrepresentable() {
|
||||||
|
// NonZeroU32::new(0) returns None — the type system literally refuses
|
||||||
|
// to construct a Merge state with no commits ahead of master. This is
|
||||||
|
// bug 519's "silent mergemaster no-op" gone, structurally.
|
||||||
|
assert!(NonZeroU32::new(0).is_none());
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Bug 502 made unrepresentable: agent on the wrong stage ─────────────
|
||||||
|
//
|
||||||
|
// There's nothing to test here at the *Stage* level, because Stage doesn't
|
||||||
|
// have an `agent` field at all. Agent assignment is per-node ExecutionState.
|
||||||
|
// The "coder agent on a Merge stage" failure mode from bug 502 cannot be
|
||||||
|
// expressed in this type system: a coder can attach to a story (writing to
|
||||||
|
// its node-local ExecutionState), but the Stage::Merge variant has no slot
|
||||||
|
// for an agent. The "wrong-stage agent" error is gone because the wrong
|
||||||
|
// state is unrepresentable.
|
||||||
|
|
||||||
|
// ── Invalid transitions return errors ──────────────────────────────────
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn cannot_jump_from_backlog_to_done() {
|
||||||
|
let s = Stage::Backlog;
|
||||||
|
let result = transition(s, PipelineEvent::Accepted);
|
||||||
|
assert!(matches!(
|
||||||
|
result,
|
||||||
|
Err(TransitionError::InvalidTransition { .. })
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn cannot_unblock_a_done_story() {
|
||||||
|
let s = Stage::Done {
|
||||||
|
merged_at: Utc::now(),
|
||||||
|
merge_commit: sha("abc"),
|
||||||
|
};
|
||||||
|
let result = transition(s, PipelineEvent::Unblock);
|
||||||
|
assert!(matches!(
|
||||||
|
result,
|
||||||
|
Err(TransitionError::InvalidTransition { .. })
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn cannot_unblock_a_review_held_story() {
|
||||||
|
// Unblock is specifically for Blocked, not for any Archived variant.
|
||||||
|
let s = Stage::Archived {
|
||||||
|
archived_at: Utc::now(),
|
||||||
|
reason: ArchiveReason::ReviewHeld {
|
||||||
|
reason: "TBD".into(),
|
||||||
|
},
|
||||||
|
};
|
||||||
|
let result = transition(s, PipelineEvent::Unblock);
|
||||||
|
assert!(matches!(
|
||||||
|
result,
|
||||||
|
Err(TransitionError::InvalidTransition { .. })
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Block from any active stage ────────────────────────────────────────
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn block_from_any_active_stage() {
|
||||||
|
for s in [Stage::Backlog, Stage::Coding, Stage::Qa] {
|
||||||
|
let result = transition(
|
||||||
|
s.clone(),
|
||||||
|
PipelineEvent::Block {
|
||||||
|
reason: "stuck".into(),
|
||||||
|
},
|
||||||
|
);
|
||||||
|
assert!(matches!(
|
||||||
|
result,
|
||||||
|
Ok(Stage::Archived {
|
||||||
|
reason: ArchiveReason::Blocked { .. },
|
||||||
|
..
|
||||||
|
})
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Also from Merge:
|
||||||
|
let m = Stage::Merge {
|
||||||
|
feature_branch: fb("f"),
|
||||||
|
commits_ahead: nz(1),
|
||||||
|
};
|
||||||
|
let result = transition(
|
||||||
|
m,
|
||||||
|
PipelineEvent::Block {
|
||||||
|
reason: "stuck".into(),
|
||||||
|
},
|
||||||
|
);
|
||||||
|
assert!(matches!(
|
||||||
|
result,
|
||||||
|
Ok(Stage::Archived {
|
||||||
|
reason: ArchiveReason::Blocked { .. },
|
||||||
|
..
|
||||||
|
})
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn unblock_returns_to_backlog() {
|
||||||
|
let s = Stage::Archived {
|
||||||
|
archived_at: Utc::now(),
|
||||||
|
reason: ArchiveReason::Blocked {
|
||||||
|
reason: "test".into(),
|
||||||
|
},
|
||||||
|
};
|
||||||
|
let result = transition(s, PipelineEvent::Unblock).unwrap();
|
||||||
|
assert!(matches!(result, Stage::Backlog));
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Execution state ────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn execution_happy_path() {
|
||||||
|
let e = ExecutionState::Idle;
|
||||||
|
let e = execution_transition(
|
||||||
|
e,
|
||||||
|
ExecutionEvent::SpawnRequested {
|
||||||
|
agent: AgentName("coder-1".into()),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
assert!(matches!(e, ExecutionState::Pending { .. }));
|
||||||
|
|
||||||
|
let e = execution_transition(e, ExecutionEvent::SpawnedSuccessfully).unwrap();
|
||||||
|
assert!(matches!(e, ExecutionState::Running { .. }));
|
||||||
|
|
||||||
|
let e = execution_transition(e, ExecutionEvent::Heartbeat).unwrap();
|
||||||
|
assert!(matches!(e, ExecutionState::Running { .. }));
|
||||||
|
|
||||||
|
let e = execution_transition(e, ExecutionEvent::Exited { exit_code: 0 }).unwrap();
|
||||||
|
assert!(matches!(
|
||||||
|
e,
|
||||||
|
ExecutionState::Completed { exit_code: 0, .. }
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn execution_rate_limit_then_resume() {
|
||||||
|
let e = ExecutionState::Running {
|
||||||
|
agent: AgentName("coder-1".into()),
|
||||||
|
started_at: Utc::now(),
|
||||||
|
last_heartbeat: Utc::now(),
|
||||||
|
};
|
||||||
|
let e = execution_transition(
|
||||||
|
e,
|
||||||
|
ExecutionEvent::HitRateLimit {
|
||||||
|
resume_at: Utc::now() + chrono::Duration::minutes(5),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
assert!(matches!(e, ExecutionState::RateLimited { .. }));
|
||||||
|
|
||||||
|
let e = execution_transition(e, ExecutionEvent::SpawnedSuccessfully).unwrap();
|
||||||
|
assert!(matches!(e, ExecutionState::Running { .. }));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn execution_stop_from_anywhere_returns_idle() {
|
||||||
|
let e = ExecutionState::Running {
|
||||||
|
agent: AgentName("coder-1".into()),
|
||||||
|
started_at: Utc::now(),
|
||||||
|
last_heartbeat: Utc::now(),
|
||||||
|
};
|
||||||
|
let e = execution_transition(e, ExecutionEvent::Stopped).unwrap();
|
||||||
|
assert!(matches!(e, ExecutionState::Idle));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── main: a quick interactive demo ───────────────────────────────────────────
|
||||||
|
|
||||||
|
fn main() {
|
||||||
|
println!("─── Pipeline state machine sketch (story 520) ───\n");
|
||||||
|
|
||||||
|
// Set up the event bus with three subscribers — one for each side effect.
|
||||||
|
let mut bus = EventBus::new();
|
||||||
|
bus.subscribe(MatrixBotSub);
|
||||||
|
bus.subscribe(PipelineItemsSub);
|
||||||
|
bus.subscribe(FileRendererSub);
|
||||||
|
|
||||||
|
let story_id = StoryId("100_story_demo".into());
|
||||||
|
|
||||||
|
// Helper to apply a transition + fire the bus.
|
||||||
|
let mut current_stage = Stage::Backlog;
|
||||||
|
let step = |bus: &EventBus,
|
||||||
|
stage: &mut Stage,
|
||||||
|
event: PipelineEvent|
|
||||||
|
-> Result<(), TransitionError> {
|
||||||
|
let before = stage.clone();
|
||||||
|
let after = transition(stage.clone(), event.clone())?;
|
||||||
|
bus.fire(TransitionFired {
|
||||||
|
story_id: story_id.clone(),
|
||||||
|
before,
|
||||||
|
after: after.clone(),
|
||||||
|
event,
|
||||||
|
at: Utc::now(),
|
||||||
|
});
|
||||||
|
*stage = after;
|
||||||
|
Ok(())
|
||||||
|
};
|
||||||
|
|
||||||
|
println!("Initial: {current_stage:?}\n");
|
||||||
|
|
||||||
|
println!("→ DepsMet");
|
||||||
|
step(&bus, &mut current_stage, PipelineEvent::DepsMet).unwrap();
|
||||||
|
println!();
|
||||||
|
|
||||||
|
println!("→ QaSkipped (qa: server, gates auto-pass)");
|
||||||
|
step(
|
||||||
|
&bus,
|
||||||
|
&mut current_stage,
|
||||||
|
PipelineEvent::QaSkipped {
|
||||||
|
feature_branch: BranchName("feature/story-100".into()),
|
||||||
|
commits_ahead: NonZeroU32::new(3).unwrap(),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
println!();
|
||||||
|
|
||||||
|
println!("→ MergeSucceeded");
|
||||||
|
step(
|
||||||
|
&bus,
|
||||||
|
&mut current_stage,
|
||||||
|
PipelineEvent::MergeSucceeded {
|
||||||
|
merge_commit: GitSha("abc1234".into()),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
println!();
|
||||||
|
|
||||||
|
println!("→ Accepted");
|
||||||
|
step(&bus, &mut current_stage, PipelineEvent::Accepted).unwrap();
|
||||||
|
println!();
|
||||||
|
|
||||||
|
println!("Final: {current_stage:?}\n");
|
||||||
|
|
||||||
|
println!("─── Trying an invalid transition: Done → Unblock ───");
|
||||||
|
let invalid_result = transition(current_stage.clone(), PipelineEvent::Unblock);
|
||||||
|
println!("Result: {invalid_result:?}");
|
||||||
|
}
|
||||||
@@ -0,0 +1,532 @@
|
|||||||
|
//! Pipeline state machine — design sketch (story 520) — STATIG version.
|
||||||
|
//!
|
||||||
|
//! Parallel to `pipeline_state_sketch_bare.rs`. Same domain types, same
|
||||||
|
//! transitions, same event semantics — but the state machine is built using
|
||||||
|
//! the `statig` crate (https://crates.io/crates/statig) instead of being
|
||||||
|
//! hand-rolled.
|
||||||
|
//!
|
||||||
|
//! Run with:
|
||||||
|
//! cargo run --example pipeline_state_sketch_statig -p huskies
|
||||||
|
//! Test with:
|
||||||
|
//! cargo test --example pipeline_state_sketch_statig -p huskies
|
||||||
|
//!
|
||||||
|
//! Why both versions?
|
||||||
|
//!
|
||||||
|
//! - The **bare** version shows that plain Rust enums + a transition function
|
||||||
|
//! are *enough* to make impossible states unrepresentable. No framework.
|
||||||
|
//! - The **statig** version shows what we'd gain by adopting a state-machine
|
||||||
|
//! crate: hierarchical states (the `active` superstate factors out the
|
||||||
|
//! cross-cutting Block/ReviewHold/Abandon/Supersede transitions, which the
|
||||||
|
//! bare version had to duplicate inline with `|` patterns), generated
|
||||||
|
//! `State` enum with type-safe data-carrying constructors, and stateful
|
||||||
|
//! `handle(&event)` dispatch. Type safety is preserved either way:
|
||||||
|
//! `State::merge(BranchName, NonZeroU32)` requires both args at the
|
||||||
|
//! constructor, just like `Stage::Merge { feature_branch, commits_ahead }`
|
||||||
|
//! in the bare version.
|
||||||
|
//!
|
||||||
|
//! Trade-off: statig adds a dependency and a proc-macro layer, which makes
|
||||||
|
//! the code harder to read for someone unfamiliar with the crate. The
|
||||||
|
//! framework-free version is more transparent but requires manual
|
||||||
|
//! pattern-matching and inline duplication for cross-cutting transitions.
|
||||||
|
|
||||||
|
use chrono::{DateTime, Utc};
|
||||||
|
use statig::prelude::*;
|
||||||
|
use std::num::NonZeroU32;
|
||||||
|
|
||||||
|
// ── Newtypes (same as bare version) ──────────────────────────────────────────
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||||
|
pub struct StoryId(pub String);
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||||
|
pub struct BranchName(pub String);
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||||
|
pub struct GitSha(pub String);
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||||
|
pub struct AgentName(pub String);
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
|
||||||
|
pub struct NodePubkey(pub [u8; 32]);
|
||||||
|
|
||||||
|
// ── Archive reason (same as bare version) ────────────────────────────────────
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
|
pub enum ArchiveReason {
|
||||||
|
Completed,
|
||||||
|
Abandoned,
|
||||||
|
Superseded { by: StoryId },
|
||||||
|
Blocked { reason: String },
|
||||||
|
MergeFailed { reason: String },
|
||||||
|
ReviewHeld { reason: String },
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Pipeline events (same as bare version) ───────────────────────────────────
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub enum PipelineEvent {
|
||||||
|
DepsMet,
|
||||||
|
GatesStarted,
|
||||||
|
GatesPassed {
|
||||||
|
feature_branch: BranchName,
|
||||||
|
commits_ahead: NonZeroU32,
|
||||||
|
},
|
||||||
|
GatesFailed {
|
||||||
|
reason: String,
|
||||||
|
},
|
||||||
|
QaSkipped {
|
||||||
|
feature_branch: BranchName,
|
||||||
|
commits_ahead: NonZeroU32,
|
||||||
|
},
|
||||||
|
MergeSucceeded {
|
||||||
|
merge_commit: GitSha,
|
||||||
|
},
|
||||||
|
MergeFailedFinal {
|
||||||
|
reason: String,
|
||||||
|
},
|
||||||
|
Accepted,
|
||||||
|
Block {
|
||||||
|
reason: String,
|
||||||
|
},
|
||||||
|
Unblock,
|
||||||
|
Abandon,
|
||||||
|
Supersede {
|
||||||
|
by: StoryId,
|
||||||
|
},
|
||||||
|
ReviewHold {
|
||||||
|
reason: String,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── The state machine ────────────────────────────────────────────────────────
|
||||||
|
//
|
||||||
|
// statig requires a "context" struct (the `Self` of the impl block). For us
|
||||||
|
// it's empty — all per-state data lives ON the state itself, carried forward
|
||||||
|
// by the auto-generated `State::xxx(...)` constructors.
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
pub struct PipelineMachine;
|
||||||
|
|
||||||
|
#[state_machine(
|
||||||
|
initial = "State::backlog()",
|
||||||
|
state(derive(Debug, Clone, PartialEq, Eq))
|
||||||
|
)]
|
||||||
|
impl PipelineMachine {
|
||||||
|
// ── Active stages: backlog, coding, qa, merge ────────────────────────
|
||||||
|
//
|
||||||
|
// Each is a child of the `active` superstate, which handles the
|
||||||
|
// cross-cutting transitions (Block / ReviewHold / Abandon / Supersede)
|
||||||
|
// exactly once instead of being duplicated per state.
|
||||||
|
|
||||||
|
#[state(superstate = "active")]
|
||||||
|
fn backlog(event: &PipelineEvent) -> Response<State> {
|
||||||
|
match event {
|
||||||
|
PipelineEvent::DepsMet => Transition(State::coding()),
|
||||||
|
_ => Super, // defer to `active` (and ultimately to "unhandled")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[state(superstate = "active")]
|
||||||
|
fn coding(event: &PipelineEvent) -> Response<State> {
|
||||||
|
match event {
|
||||||
|
PipelineEvent::GatesStarted => Transition(State::qa()),
|
||||||
|
PipelineEvent::QaSkipped {
|
||||||
|
feature_branch,
|
||||||
|
commits_ahead,
|
||||||
|
} => Transition(State::merge(feature_branch.clone(), *commits_ahead)),
|
||||||
|
_ => Super,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[state(superstate = "active")]
|
||||||
|
fn qa(event: &PipelineEvent) -> Response<State> {
|
||||||
|
match event {
|
||||||
|
PipelineEvent::GatesPassed {
|
||||||
|
feature_branch,
|
||||||
|
commits_ahead,
|
||||||
|
} => Transition(State::merge(feature_branch.clone(), *commits_ahead)),
|
||||||
|
PipelineEvent::GatesFailed { .. } => Transition(State::coding()),
|
||||||
|
_ => Super,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[state(superstate = "active")]
|
||||||
|
fn merge(
|
||||||
|
_feature_branch: &mut BranchName,
|
||||||
|
_commits_ahead: &mut NonZeroU32,
|
||||||
|
event: &PipelineEvent,
|
||||||
|
) -> Response<State> {
|
||||||
|
// Note: the type signature of this state function REQUIRES both
|
||||||
|
// _feature_branch and _commits_ahead. There is no way to construct
|
||||||
|
// a Merge state without them. NonZeroU32 makes "merge with zero
|
||||||
|
// commits ahead" structurally unrepresentable (bug 519 fixed by
|
||||||
|
// construction, same as the bare version).
|
||||||
|
//
|
||||||
|
// The fields are prefixed with `_` because this state function only
|
||||||
|
// transitions forward and doesn't read them — but they're available
|
||||||
|
// to inspect via the State::Merge variant generated by the macro.
|
||||||
|
match event {
|
||||||
|
PipelineEvent::MergeSucceeded { merge_commit } => Transition(State::done(
|
||||||
|
Utc::now(),
|
||||||
|
merge_commit.clone(),
|
||||||
|
)),
|
||||||
|
PipelineEvent::MergeFailedFinal { reason } => Transition(State::archived(
|
||||||
|
Utc::now(),
|
||||||
|
ArchiveReason::MergeFailed {
|
||||||
|
reason: reason.clone(),
|
||||||
|
},
|
||||||
|
)),
|
||||||
|
_ => Super,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Cross-cutting superstate ─────────────────────────────────────────
|
||||||
|
//
|
||||||
|
// This is the statig payoff: ONE place defines what Block/ReviewHold/
|
||||||
|
// Abandon/Supersede do across all four active stages. The bare version
|
||||||
|
// had to duplicate this with `|` patterns. Adding a new active stage
|
||||||
|
// here means just adding it as a child of `active`; the cross-cutting
|
||||||
|
// transitions come for free.
|
||||||
|
|
||||||
|
#[superstate]
|
||||||
|
fn active(event: &PipelineEvent) -> Response<State> {
|
||||||
|
let now = Utc::now();
|
||||||
|
match event {
|
||||||
|
PipelineEvent::Block { reason } => Transition(State::archived(
|
||||||
|
now,
|
||||||
|
ArchiveReason::Blocked {
|
||||||
|
reason: reason.clone(),
|
||||||
|
},
|
||||||
|
)),
|
||||||
|
PipelineEvent::ReviewHold { reason } => Transition(State::archived(
|
||||||
|
now,
|
||||||
|
ArchiveReason::ReviewHeld {
|
||||||
|
reason: reason.clone(),
|
||||||
|
},
|
||||||
|
)),
|
||||||
|
PipelineEvent::Abandon => {
|
||||||
|
Transition(State::archived(now, ArchiveReason::Abandoned))
|
||||||
|
}
|
||||||
|
PipelineEvent::Supersede { by } => Transition(State::archived(
|
||||||
|
now,
|
||||||
|
ArchiveReason::Superseded { by: by.clone() },
|
||||||
|
)),
|
||||||
|
_ => Handled, // unhandled events are silently ignored
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Done is special: it's not a child of `active` because Block and ──
|
||||||
|
// ── ReviewHold are NOT valid from Done (per the bare version's rules).
|
||||||
|
// ── Abandon and Supersede ARE valid, so we have to handle them inline.
|
||||||
|
|
||||||
|
#[state]
|
||||||
|
fn done(
|
||||||
|
merged_at: &mut DateTime<Utc>,
|
||||||
|
merge_commit: &mut GitSha,
|
||||||
|
event: &PipelineEvent,
|
||||||
|
) -> Response<State> {
|
||||||
|
let now = Utc::now();
|
||||||
|
let _ = merged_at; // currently unused; available for queries
|
||||||
|
let _ = merge_commit;
|
||||||
|
match event {
|
||||||
|
PipelineEvent::Accepted => {
|
||||||
|
Transition(State::archived(now, ArchiveReason::Completed))
|
||||||
|
}
|
||||||
|
PipelineEvent::Abandon => {
|
||||||
|
Transition(State::archived(now, ArchiveReason::Abandoned))
|
||||||
|
}
|
||||||
|
PipelineEvent::Supersede { by } => Transition(State::archived(
|
||||||
|
now,
|
||||||
|
ArchiveReason::Superseded { by: by.clone() },
|
||||||
|
)),
|
||||||
|
_ => Handled,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Archived is terminal except for Unblock from Blocked → Backlog ───
|
||||||
|
|
||||||
|
#[state]
|
||||||
|
fn archived(
|
||||||
|
archived_at: &mut DateTime<Utc>,
|
||||||
|
reason: &mut ArchiveReason,
|
||||||
|
event: &PipelineEvent,
|
||||||
|
) -> Response<State> {
|
||||||
|
let _ = archived_at;
|
||||||
|
match event {
|
||||||
|
PipelineEvent::Unblock => {
|
||||||
|
if matches!(reason, ArchiveReason::Blocked { .. }) {
|
||||||
|
Transition(State::backlog())
|
||||||
|
} else {
|
||||||
|
Handled // unblock only valid from Blocked
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => Handled,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Tests ────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
fn nz(n: u32) -> NonZeroU32 {
|
||||||
|
NonZeroU32::new(n).unwrap()
|
||||||
|
}
|
||||||
|
fn fb(name: &str) -> BranchName {
|
||||||
|
BranchName(name.to_string())
|
||||||
|
}
|
||||||
|
fn sha(s: &str) -> GitSha {
|
||||||
|
GitSha(s.to_string())
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Happy path ─────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn happy_path_backlog_through_done() {
|
||||||
|
let mut sm = PipelineMachine.state_machine();
|
||||||
|
assert!(matches!(sm.state(), State::Backlog {}));
|
||||||
|
|
||||||
|
sm.handle(&PipelineEvent::DepsMet);
|
||||||
|
assert!(matches!(sm.state(), State::Coding {}));
|
||||||
|
|
||||||
|
sm.handle(&PipelineEvent::QaSkipped {
|
||||||
|
feature_branch: fb("feature/story-1"),
|
||||||
|
commits_ahead: nz(3),
|
||||||
|
});
|
||||||
|
assert!(matches!(sm.state(), State::Merge { .. }));
|
||||||
|
|
||||||
|
sm.handle(&PipelineEvent::MergeSucceeded {
|
||||||
|
merge_commit: sha("abc123"),
|
||||||
|
});
|
||||||
|
assert!(matches!(sm.state(), State::Done { .. }));
|
||||||
|
|
||||||
|
sm.handle(&PipelineEvent::Accepted);
|
||||||
|
assert!(matches!(
|
||||||
|
sm.state(),
|
||||||
|
State::Archived {
|
||||||
|
reason: ArchiveReason::Completed,
|
||||||
|
..
|
||||||
|
}
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn qa_retry_loop() {
|
||||||
|
let mut sm = PipelineMachine.state_machine();
|
||||||
|
sm.handle(&PipelineEvent::DepsMet);
|
||||||
|
sm.handle(&PipelineEvent::GatesStarted);
|
||||||
|
assert!(matches!(sm.state(), State::Qa {}));
|
||||||
|
|
||||||
|
sm.handle(&PipelineEvent::GatesFailed {
|
||||||
|
reason: "tests failed".into(),
|
||||||
|
});
|
||||||
|
assert!(matches!(sm.state(), State::Coding {}));
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Bug 519 unrepresentability: Merge with zero commits ahead ──────────
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn merge_with_zero_commits_is_unrepresentable() {
|
||||||
|
// Identical to the bare version: NonZeroU32::new(0) returns None,
|
||||||
|
// so a State::merge(branch, ZERO) literally cannot be constructed.
|
||||||
|
assert!(NonZeroU32::new(0).is_none());
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Cross-cutting Block from any active stage (superstate proves it) ───
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn block_from_backlog_via_superstate() {
|
||||||
|
let mut sm = PipelineMachine.state_machine();
|
||||||
|
sm.handle(&PipelineEvent::Block {
|
||||||
|
reason: "stuck".into(),
|
||||||
|
});
|
||||||
|
assert!(matches!(
|
||||||
|
sm.state(),
|
||||||
|
State::Archived {
|
||||||
|
reason: ArchiveReason::Blocked { .. },
|
||||||
|
..
|
||||||
|
}
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn block_from_coding_via_superstate() {
|
||||||
|
let mut sm = PipelineMachine.state_machine();
|
||||||
|
sm.handle(&PipelineEvent::DepsMet);
|
||||||
|
sm.handle(&PipelineEvent::Block {
|
||||||
|
reason: "stuck".into(),
|
||||||
|
});
|
||||||
|
assert!(matches!(
|
||||||
|
sm.state(),
|
||||||
|
State::Archived {
|
||||||
|
reason: ArchiveReason::Blocked { .. },
|
||||||
|
..
|
||||||
|
}
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn block_from_qa_via_superstate() {
|
||||||
|
let mut sm = PipelineMachine.state_machine();
|
||||||
|
sm.handle(&PipelineEvent::DepsMet);
|
||||||
|
sm.handle(&PipelineEvent::GatesStarted);
|
||||||
|
sm.handle(&PipelineEvent::Block {
|
||||||
|
reason: "stuck".into(),
|
||||||
|
});
|
||||||
|
assert!(matches!(
|
||||||
|
sm.state(),
|
||||||
|
State::Archived {
|
||||||
|
reason: ArchiveReason::Blocked { .. },
|
||||||
|
..
|
||||||
|
}
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn block_from_merge_via_superstate() {
|
||||||
|
let mut sm = PipelineMachine.state_machine();
|
||||||
|
sm.handle(&PipelineEvent::DepsMet);
|
||||||
|
sm.handle(&PipelineEvent::QaSkipped {
|
||||||
|
feature_branch: fb("f"),
|
||||||
|
commits_ahead: nz(1),
|
||||||
|
});
|
||||||
|
sm.handle(&PipelineEvent::Block {
|
||||||
|
reason: "stuck".into(),
|
||||||
|
});
|
||||||
|
assert!(matches!(
|
||||||
|
sm.state(),
|
||||||
|
State::Archived {
|
||||||
|
reason: ArchiveReason::Blocked { .. },
|
||||||
|
..
|
||||||
|
}
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Block from Done is NOT valid (Done isn't a child of `active`) ──────
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn block_from_done_is_ignored() {
|
||||||
|
let mut sm = PipelineMachine.state_machine();
|
||||||
|
sm.handle(&PipelineEvent::DepsMet);
|
||||||
|
sm.handle(&PipelineEvent::QaSkipped {
|
||||||
|
feature_branch: fb("f"),
|
||||||
|
commits_ahead: nz(1),
|
||||||
|
});
|
||||||
|
sm.handle(&PipelineEvent::MergeSucceeded {
|
||||||
|
merge_commit: sha("abc"),
|
||||||
|
});
|
||||||
|
// Now in Done. Block should NOT transition us anywhere.
|
||||||
|
sm.handle(&PipelineEvent::Block {
|
||||||
|
reason: "stuck".into(),
|
||||||
|
});
|
||||||
|
assert!(matches!(sm.state(), State::Done { .. }));
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Abandon from Done IS valid (handled inline in done()) ──────────────
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn abandon_from_done_works() {
|
||||||
|
let mut sm = PipelineMachine.state_machine();
|
||||||
|
sm.handle(&PipelineEvent::DepsMet);
|
||||||
|
sm.handle(&PipelineEvent::QaSkipped {
|
||||||
|
feature_branch: fb("f"),
|
||||||
|
commits_ahead: nz(1),
|
||||||
|
});
|
||||||
|
sm.handle(&PipelineEvent::MergeSucceeded {
|
||||||
|
merge_commit: sha("abc"),
|
||||||
|
});
|
||||||
|
sm.handle(&PipelineEvent::Abandon);
|
||||||
|
assert!(matches!(
|
||||||
|
sm.state(),
|
||||||
|
State::Archived {
|
||||||
|
reason: ArchiveReason::Abandoned,
|
||||||
|
..
|
||||||
|
}
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Unblock from Archived(Blocked) → Backlog ───────────────────────────
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn unblock_returns_to_backlog() {
|
||||||
|
let mut sm = PipelineMachine.state_machine();
|
||||||
|
sm.handle(&PipelineEvent::Block {
|
||||||
|
reason: "test".into(),
|
||||||
|
});
|
||||||
|
assert!(matches!(
|
||||||
|
sm.state(),
|
||||||
|
State::Archived {
|
||||||
|
reason: ArchiveReason::Blocked { .. },
|
||||||
|
..
|
||||||
|
}
|
||||||
|
));
|
||||||
|
|
||||||
|
sm.handle(&PipelineEvent::Unblock);
|
||||||
|
assert!(matches!(sm.state(), State::Backlog {}));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn unblock_from_review_held_does_nothing() {
|
||||||
|
// Unblock is specifically for Blocked, not for any Archived variant.
|
||||||
|
let mut sm = PipelineMachine.state_machine();
|
||||||
|
sm.handle(&PipelineEvent::ReviewHold {
|
||||||
|
reason: "TBD".into(),
|
||||||
|
});
|
||||||
|
// Now in Archived(ReviewHeld). Unblock should NOT transition.
|
||||||
|
sm.handle(&PipelineEvent::Unblock);
|
||||||
|
assert!(matches!(
|
||||||
|
sm.state(),
|
||||||
|
State::Archived {
|
||||||
|
reason: ArchiveReason::ReviewHeld { .. },
|
||||||
|
..
|
||||||
|
}
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── main: a quick interactive demo ───────────────────────────────────────────
|
||||||
|
|
||||||
|
fn main() {
|
||||||
|
println!("─── Pipeline state machine sketch (story 520) — STATIG version ───\n");
|
||||||
|
|
||||||
|
let mut sm = PipelineMachine.state_machine();
|
||||||
|
println!("Initial: {:?}\n", sm.state());
|
||||||
|
|
||||||
|
println!("→ DepsMet");
|
||||||
|
sm.handle(&PipelineEvent::DepsMet);
|
||||||
|
println!(" state: {:?}\n", sm.state());
|
||||||
|
|
||||||
|
println!("→ QaSkipped");
|
||||||
|
sm.handle(&PipelineEvent::QaSkipped {
|
||||||
|
feature_branch: BranchName("feature/story-100".into()),
|
||||||
|
commits_ahead: NonZeroU32::new(3).unwrap(),
|
||||||
|
});
|
||||||
|
println!(" state: {:?}\n", sm.state());
|
||||||
|
|
||||||
|
println!("→ MergeSucceeded");
|
||||||
|
sm.handle(&PipelineEvent::MergeSucceeded {
|
||||||
|
merge_commit: GitSha("abc1234".into()),
|
||||||
|
});
|
||||||
|
println!(" state: {:?}\n", sm.state());
|
||||||
|
|
||||||
|
println!("→ Accepted");
|
||||||
|
sm.handle(&PipelineEvent::Accepted);
|
||||||
|
println!(" state: {:?}\n", sm.state());
|
||||||
|
|
||||||
|
println!("─── Trying invalid transition: Done → Unblock ───");
|
||||||
|
let mut sm2 = PipelineMachine.state_machine();
|
||||||
|
sm2.handle(&PipelineEvent::DepsMet);
|
||||||
|
sm2.handle(&PipelineEvent::QaSkipped {
|
||||||
|
feature_branch: BranchName("feature/story-101".into()),
|
||||||
|
commits_ahead: NonZeroU32::new(2).unwrap(),
|
||||||
|
});
|
||||||
|
sm2.handle(&PipelineEvent::MergeSucceeded {
|
||||||
|
merge_commit: GitSha("def5678".into()),
|
||||||
|
});
|
||||||
|
println!(" before Unblock: {:?}", sm2.state());
|
||||||
|
sm2.handle(&PipelineEvent::Unblock); // silently ignored — no transition
|
||||||
|
println!(" after Unblock: {:?} (no change — Unblock is a no-op from Done)", sm2.state());
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user