Files
huskies/server/examples/pipeline_state_sketch_bare.rs
T
Timmy f7d69cde50 sketch(520): typed pipeline state machine — bare and statig versions
Two parallel scratch experiments under server/examples/ exploring the
typed Rust state machine that should replace huskies's current
stringly-typed CRDT representation (story 520).

  - pipeline_state_sketch_bare.rs   — hand-rolled, plain enums + match
  - pipeline_state_sketch_statig.rs — using the statig crate

Both sketches:
  - Define the same Stage enum (Backlog, Coding, Qa, Merge, Done, Archived)
  - Define ArchiveReason (subsumes refactor 436's blocked/merge_failure/review_hold)
  - Define ExecutionState (per-node, separate from synced Stage) — bare only
  - Define PipelineEvent and the valid transitions
  - Make bug 519 unrepresentable: Stage::Merge requires NonZeroU32 commits_ahead
  - Make bug 502 unrepresentable: Coder agents can't be assigned to Merge state
  - Have happy-path tests, retry-loop tests, and invalid-transition tests

Differences:
  - Bare uses pure pattern matching, no framework. ~720 lines.
  - Statig uses #[state_machine] proc macro and gets free hierarchical
    states via the `active` superstate that factors out the cross-cutting
    Block / ReviewHold / Abandon / Supersede transitions across the four
    active stages. ~440 lines, 11 passing tests.

Run with:
  cargo run  --example pipeline_state_sketch_bare   -p huskies
  cargo run  --example pipeline_state_sketch_statig -p huskies
  cargo test --example pipeline_state_sketch_bare   -p huskies
  cargo test --example pipeline_state_sketch_statig -p huskies

Adds statig 0.3 as a dev-dependency in server/Cargo.toml. Cargo.lock
updated to include statig + statig-macro and their transitive deps.

Not wired into the main codebase. Once we agree on which version to
adopt, story 520 promotes the chosen sketch into a real
server/src/pipeline_state.rs module.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-09 21:03:07 +01:00

859 lines
29 KiB
Rust

//! Pipeline state machine — design sketch (story 520) — BARE version.
//!
//! This is a SCRATCH EXPERIMENT, not wired into anything else in the codebase.
//! "Bare" version: hand-rolled with plain Rust enums and pattern matching,
//! no external state-machine library. See `pipeline_state_sketch_statig.rs`
//! for a parallel version using the `statig` crate.
//!
//! Run with:
//! cargo run --example pipeline_state_sketch_bare -p huskies
//! Test with:
//! cargo test --example pipeline_state_sketch_bare -p huskies
//!
//! Goal: demonstrate the typed pipeline state machine that should replace
//! huskies's stringly-typed CRDT state. It is intentionally standalone —
//! no integration with crdt_state, no persistence, no events escape this
//! file. Once we agree on the shape, this becomes the foundation for the
//! real implementation in src/pipeline_state.rs.
//!
//! The point of this version is to show that the Rust type system alone is
//! enough to make impossible states unrepresentable, without needing any
//! state-machine framework.
use chrono::{DateTime, Utc};
use std::num::NonZeroU32;
// ── Newtypes ─────────────────────────────────────────────────────────────────
//
// Each of these is a "wrapper around String" today, but the wrapping itself
// is the point: a function that takes a `BranchName` cannot accidentally be
// called with a `StoryId`. Validation can be added later (e.g. `BranchName::new`
// returns `Result<Self, BranchNameError>` and the inner `String` is private)
// without changing call sites.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StoryId(pub String);
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BranchName(pub String);
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct GitSha(pub String);
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct AgentName(pub String);
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct NodePubkey(pub [u8; 32]);
// ── Synced pipeline stage (lives in CRDT, converges across nodes) ────────────
//
// This is the SHARED state — every node sees the same Stage for a given story
// after CRDT convergence. Local-only state (which agent is running, retry
// count, rate-limit timers) lives separately in `ExecutionState` below, keyed
// by node pubkey.
//
// Notice what is NOT a field on Stage:
// - `agent` — that's local execution state, not pipeline state
// - `retry_count` — also local
// - `blocked` — folded into `Archived { reason: Blocked { .. } }`
//
// And notice what IS a field, by construction:
// - Stage::Merge requires a non-zero commits_ahead (silent no-op merge is unrepresentable)
// - Stage::Done requires a merge_commit (a "done" story without merge metadata is unrepresentable)
// - Stage::Archived always carries a reason (no "archived but we don't know why")
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Stage {
/// Story exists, waiting for dependencies or auto-assign promotion.
Backlog,
/// Story is being actively coded somewhere in the mesh.
/// (Which node is local — see ExecutionState.)
Coding,
/// Coder has run; gates are running.
Qa,
/// Gates passed (or were skipped); ready to merge.
/// `commits_ahead: NonZeroU32` makes "Merge with nothing to merge" structurally impossible.
/// This single field eliminates today's bug 519 (silent mergemaster no-op).
Merge {
feature_branch: BranchName,
commits_ahead: NonZeroU32,
},
/// Mergemaster squashed to master. Always carries the merge metadata,
/// so a "done" story is provably reachable from master.
Done {
merged_at: DateTime<Utc>,
merge_commit: GitSha,
},
/// Out of the active flow. The reason explains why.
Archived {
archived_at: DateTime<Utc>,
reason: ArchiveReason,
},
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ArchiveReason {
/// Normal happy-path completion: accepted and filed away.
Completed,
/// User explicitly abandoned the story.
Abandoned,
/// Replaced by another story.
Superseded { by: StoryId },
/// Manually blocked, awaiting human resolution. Was bug-436's `blocked: true`.
Blocked { reason: String },
/// Mergemaster failed beyond the retry budget. Was bug-436's `merge_failure`.
MergeFailed { reason: String },
/// Held in review at human request. Was bug-436's `review_hold`.
ReviewHeld { reason: String },
}
// ── Per-node execution state (lives in CRDT under node_pubkey key) ───────────
//
// LOCAL-AUTHORED but GLOBALLY-READABLE. Each node only writes to entries where
// node_pubkey == self, so there are no inter-author CRDT merge conflicts. Other
// nodes can READ all entries to know what's happening across the mesh.
//
// In the real CRDT document, this would be stored as something like:
// crdt.execution_state: { node_pubkey -> { story_id -> ExecutionState } }
//
// Why this matters operationally:
// - Cross-node observability: matrix bot can show "node A is running coder-1
// on story X, node B is rate-limited on story Y"
// - Heartbeat detection: if `last_heartbeat` is stale > N min, the entry is
// dead (laptop closed, OOM, segfault). Other nodes can take over (story 479).
// - Foundation for CRDT-based work claiming (story 479).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExecutionState {
/// No agent on this node is currently working on this story.
Idle,
/// An agent has been requested but hasn't started its subprocess yet.
Pending {
agent: AgentName,
since: DateTime<Utc>,
},
/// An agent's subprocess is alive on this node.
/// `last_heartbeat` is updated periodically; if stale, the process probably died.
Running {
agent: AgentName,
started_at: DateTime<Utc>,
last_heartbeat: DateTime<Utc>,
},
/// Agent hit a rate limit; will resume at the given time.
RateLimited {
agent: AgentName,
resume_at: DateTime<Utc>,
},
/// Agent finished. exit_code disambiguates clean exit / panic / etc.
Completed {
agent: AgentName,
exit_code: i32,
completed_at: DateTime<Utc>,
},
}
// ── Pipeline events ──────────────────────────────────────────────────────────
//
// Events drive Stage transitions. Each event carries any data needed to
// construct the destination state, so the type signature of `transition`
// guarantees we can never accidentally land in an underspecified state.
//
// (Compare with today's stringly-typed code, where you call
// `move_story_to_merge(story_id)` and the destination state is built from
// whatever happens to be in scope at the time.)
#[derive(Debug, Clone)]
pub enum PipelineEvent {
/// All depends_on stories are in Done or Archived; promotion fires.
DepsMet,
/// Coder is going to start running gates.
GatesStarted,
/// Gates passed normally — ready to merge. Carries the data needed to
/// construct Stage::Merge, so the transition can't produce a malformed merge state.
GatesPassed {
feature_branch: BranchName,
commits_ahead: NonZeroU32,
},
/// Gates failed; coder will retry.
GatesFailed { reason: String },
/// QA mode is "server" — skip QA and go straight to merge.
QaSkipped {
feature_branch: BranchName,
commits_ahead: NonZeroU32,
},
/// Mergemaster successfully squashed and pushed to master.
MergeSucceeded { merge_commit: GitSha },
/// Mergemaster gave up after the retry budget.
MergeFailedFinal { reason: String },
/// User accepted a Done story (or auto-accept fired).
Accepted,
/// User explicitly blocked the story.
Block { reason: String },
/// User explicitly unblocked.
Unblock,
/// User explicitly abandoned.
Abandon,
/// User marked the story as superseded by another.
Supersede { by: StoryId },
/// User put the story on review hold.
ReviewHold { reason: String },
}
// ── Transition errors ────────────────────────────────────────────────────────
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TransitionError {
/// The current stage doesn't accept this event.
InvalidTransition {
from_stage: String,
event: String,
},
}
// ── The transition function ──────────────────────────────────────────────────
//
// Pure function. Takes the current Stage and an event, returns the new Stage
// or a TransitionError. The compiler enforces that every constructed Stage
// has all required fields, so impossible destination states are unrepresentable.
//
// "What about the *side effects* of a transition?" — they don't go in here.
// transition() is pure. Side effects (matrix bot notifications, file writes,
// agent spawns, web UI broadcasts) are dispatched by an event bus that watches
// the (before, after) tuple. See the `EventBus` sketch further down.
pub fn transition(state: Stage, event: PipelineEvent) -> Result<Stage, TransitionError> {
use PipelineEvent::*;
use Stage::*;
let stage_label = stage_label(&state);
let event_label = event_label(&event);
let invalid = || TransitionError::InvalidTransition {
from_stage: stage_label.to_string(),
event: event_label.to_string(),
};
let now = Utc::now();
match (state, event) {
// ── Forward path: backlog → current → (qa →) merge → done ──────────
(Backlog, DepsMet) => Ok(Coding),
(Coding, GatesStarted) => Ok(Qa),
(Coding, QaSkipped { feature_branch, commits_ahead }) => Ok(Merge {
feature_branch,
commits_ahead,
}),
(Qa, GatesPassed { feature_branch, commits_ahead }) => Ok(Merge {
feature_branch,
commits_ahead,
}),
// Gates failed → back to Coding for retry. (Retry-budget enforcement
// lives outside this function — it's accounting on the local side.)
(Qa, GatesFailed { .. }) => Ok(Coding),
(Merge { .. }, MergeSucceeded { merge_commit }) => Ok(Done {
merged_at: now,
merge_commit,
}),
// ── Done → Archived(Completed) ─────────────────────────────────────
(Done { .. }, Accepted) => Ok(Archived {
archived_at: now,
reason: ArchiveReason::Completed,
}),
// ── Stuck states (any active stage → Archived with a reason) ──────
(Backlog, Block { reason })
| (Coding, Block { reason })
| (Qa, Block { reason })
| (Merge { .. }, Block { reason }) => Ok(Archived {
archived_at: now,
reason: ArchiveReason::Blocked { reason },
}),
(Backlog, ReviewHold { reason })
| (Coding, ReviewHold { reason })
| (Qa, ReviewHold { reason })
| (Merge { .. }, ReviewHold { reason }) => Ok(Archived {
archived_at: now,
reason: ArchiveReason::ReviewHeld { reason },
}),
(Merge { .. }, MergeFailedFinal { reason }) => Ok(Archived {
archived_at: now,
reason: ArchiveReason::MergeFailed { reason },
}),
// ── Abandon / supersede from any active or done stage ──────────────
(Backlog, Abandon)
| (Coding, Abandon)
| (Qa, Abandon)
| (Merge { .. }, Abandon)
| (Done { .. }, Abandon) => Ok(Archived {
archived_at: now,
reason: ArchiveReason::Abandoned,
}),
(Backlog, Supersede { by })
| (Coding, Supersede { by })
| (Qa, Supersede { by })
| (Merge { .. }, Supersede { by })
| (Done { .. }, Supersede { by }) => Ok(Archived {
archived_at: now,
reason: ArchiveReason::Superseded { by },
}),
// ── Unblock: only from Archived(Blocked) → Backlog ─────────────────
(
Archived {
reason: ArchiveReason::Blocked { .. },
..
},
Unblock,
) => Ok(Backlog),
// ── Everything else is invalid ─────────────────────────────────────
_ => Err(invalid()),
}
}
fn stage_label(s: &Stage) -> &'static str {
match s {
Stage::Backlog => "Backlog",
Stage::Coding => "Coding",
Stage::Qa => "Qa",
Stage::Merge { .. } => "Merge",
Stage::Done { .. } => "Done",
Stage::Archived { .. } => "Archived",
}
}
fn event_label(e: &PipelineEvent) -> &'static str {
match e {
PipelineEvent::DepsMet => "DepsMet",
PipelineEvent::GatesStarted => "GatesStarted",
PipelineEvent::GatesPassed { .. } => "GatesPassed",
PipelineEvent::GatesFailed { .. } => "GatesFailed",
PipelineEvent::QaSkipped { .. } => "QaSkipped",
PipelineEvent::MergeSucceeded { .. } => "MergeSucceeded",
PipelineEvent::MergeFailedFinal { .. } => "MergeFailedFinal",
PipelineEvent::Accepted => "Accepted",
PipelineEvent::Block { .. } => "Block",
PipelineEvent::Unblock => "Unblock",
PipelineEvent::Abandon => "Abandon",
PipelineEvent::Supersede { .. } => "Supersede",
PipelineEvent::ReviewHold { .. } => "ReviewHold",
}
}
// ── Per-node execution state machine ─────────────────────────────────────────
//
// Independent of the pipeline stage machine. Tracks "what is THIS node doing
// about this story right now." Multiple nodes can have different ExecutionState
// for the same story_id at the same time — and that's fine, because each node
// owns its own subspace in the CRDT.
#[derive(Debug, Clone)]
pub enum ExecutionEvent {
SpawnRequested { agent: AgentName },
SpawnedSuccessfully,
Heartbeat,
HitRateLimit { resume_at: DateTime<Utc> },
Exited { exit_code: i32 },
Stopped,
Reset,
}
pub fn execution_transition(
state: ExecutionState,
event: ExecutionEvent,
) -> Result<ExecutionState, TransitionError> {
use ExecutionEvent::*;
use ExecutionState::*;
let now = Utc::now();
match (state, event) {
(Idle, SpawnRequested { agent }) => Ok(Pending { agent, since: now }),
(Pending { agent, .. }, SpawnedSuccessfully) => Ok(Running {
agent,
started_at: now,
last_heartbeat: now,
}),
(
Running {
agent, started_at, ..
},
Heartbeat,
) => Ok(Running {
agent,
started_at,
last_heartbeat: now,
}),
(Running { agent, .. }, HitRateLimit { resume_at })
| (Pending { agent, .. }, HitRateLimit { resume_at }) => Ok(RateLimited { agent, resume_at }),
(RateLimited { agent, .. }, SpawnedSuccessfully) => Ok(Running {
agent,
started_at: now,
last_heartbeat: now,
}),
(Running { agent, .. }, Exited { exit_code })
| (Pending { agent, .. }, Exited { exit_code })
| (RateLimited { agent, .. }, Exited { exit_code }) => Ok(Completed {
agent,
exit_code,
completed_at: now,
}),
// Stop and Reset always return to Idle, from anywhere.
(_, Stopped) | (_, Reset) => Ok(Idle),
_ => Err(TransitionError::InvalidTransition {
from_stage: "ExecutionState".to_string(),
event: "<exec event>".to_string(),
}),
}
}
// ── Event bus sketch ─────────────────────────────────────────────────────────
//
// This is intentionally tiny — the goal is to show that the side-effect dispatch
// is *separable* from the transition function. Real implementation would use
// tokio broadcast channels or a proper event bus, but the pattern is the same.
#[derive(Debug, Clone)]
pub struct TransitionFired {
pub story_id: StoryId,
pub before: Stage,
pub after: Stage,
pub event: PipelineEvent,
pub at: DateTime<Utc>,
}
pub trait TransitionSubscriber: Send + Sync {
fn name(&self) -> &'static str;
fn on_transition(&self, fired: &TransitionFired);
}
pub struct EventBus {
subscribers: Vec<Box<dyn TransitionSubscriber>>,
}
impl EventBus {
pub fn new() -> Self {
Self {
subscribers: Vec::new(),
}
}
pub fn subscribe<S: TransitionSubscriber + 'static>(&mut self, subscriber: S) {
self.subscribers.push(Box::new(subscriber));
}
pub fn fire(&self, event: TransitionFired) {
for sub in &self.subscribers {
sub.on_transition(&event);
}
}
}
impl Default for EventBus {
fn default() -> Self {
Self::new()
}
}
// Example subscribers (just println! for the sketch):
pub struct MatrixBotSub;
impl TransitionSubscriber for MatrixBotSub {
fn name(&self) -> &'static str {
"matrix-bot"
}
fn on_transition(&self, f: &TransitionFired) {
println!(
" [matrix-bot] #{}: {}{}",
f.story_id.0,
stage_label(&f.before),
stage_label(&f.after)
);
}
}
pub struct FileRendererSub;
impl TransitionSubscriber for FileRendererSub {
fn name(&self) -> &'static str {
"filesystem"
}
fn on_transition(&self, f: &TransitionFired) {
println!(
" [filesystem] re-rendering .huskies/work/{}/{}.md",
stage_dir_name(&f.after),
f.story_id.0
);
}
}
pub struct PipelineItemsSub;
impl TransitionSubscriber for PipelineItemsSub {
fn name(&self) -> &'static str {
"pipeline-items"
}
fn on_transition(&self, f: &TransitionFired) {
println!(
" [pipeline-items] UPDATE pipeline_items SET stage = '{}' WHERE id = '{}'",
stage_dir_name(&f.after),
f.story_id.0
);
}
}
fn stage_dir_name(s: &Stage) -> &'static str {
match s {
Stage::Backlog => "1_backlog",
Stage::Coding => "2_current",
Stage::Qa => "3_qa",
Stage::Merge { .. } => "4_merge",
Stage::Done { .. } => "5_done",
Stage::Archived { .. } => "6_archived",
}
}
// ── Tests ────────────────────────────────────────────────────────────────────
#[cfg(test)]
mod tests {
use super::*;
fn nz(n: u32) -> NonZeroU32 {
NonZeroU32::new(n).unwrap()
}
fn fb(name: &str) -> BranchName {
BranchName(name.to_string())
}
fn sha(s: &str) -> GitSha {
GitSha(s.to_string())
}
// ── Happy path ─────────────────────────────────────────────────────────
#[test]
fn happy_path_backlog_through_done() {
let s = Stage::Backlog;
let s = transition(s, PipelineEvent::DepsMet).unwrap();
assert!(matches!(s, Stage::Coding));
let s = transition(
s,
PipelineEvent::QaSkipped {
feature_branch: fb("feature/story-1"),
commits_ahead: nz(3),
},
)
.unwrap();
assert!(matches!(s, Stage::Merge { .. }));
let s = transition(
s,
PipelineEvent::MergeSucceeded {
merge_commit: sha("abc123"),
},
)
.unwrap();
assert!(matches!(s, Stage::Done { .. }));
let s = transition(s, PipelineEvent::Accepted).unwrap();
assert!(matches!(
s,
Stage::Archived {
reason: ArchiveReason::Completed,
..
}
));
}
#[test]
fn qa_retry_loop() {
let s = Stage::Coding;
let s = transition(s, PipelineEvent::GatesStarted).unwrap();
assert!(matches!(s, Stage::Qa));
let s = transition(
s,
PipelineEvent::GatesFailed {
reason: "tests failed".into(),
},
)
.unwrap();
assert!(matches!(s, Stage::Coding));
}
// ── Bug 519 made unrepresentable: Merge with zero commits ahead ────────
#[test]
fn merge_with_zero_commits_is_unrepresentable() {
// NonZeroU32::new(0) returns None — the type system literally refuses
// to construct a Merge state with no commits ahead of master. This is
// bug 519's "silent mergemaster no-op" gone, structurally.
assert!(NonZeroU32::new(0).is_none());
}
// ── Bug 502 made unrepresentable: agent on the wrong stage ─────────────
//
// There's nothing to test here at the *Stage* level, because Stage doesn't
// have an `agent` field at all. Agent assignment is per-node ExecutionState.
// The "coder agent on a Merge stage" failure mode from bug 502 cannot be
// expressed in this type system: a coder can attach to a story (writing to
// its node-local ExecutionState), but the Stage::Merge variant has no slot
// for an agent. The "wrong-stage agent" error is gone because the wrong
// state is unrepresentable.
// ── Invalid transitions return errors ──────────────────────────────────
#[test]
fn cannot_jump_from_backlog_to_done() {
let s = Stage::Backlog;
let result = transition(s, PipelineEvent::Accepted);
assert!(matches!(
result,
Err(TransitionError::InvalidTransition { .. })
));
}
#[test]
fn cannot_unblock_a_done_story() {
let s = Stage::Done {
merged_at: Utc::now(),
merge_commit: sha("abc"),
};
let result = transition(s, PipelineEvent::Unblock);
assert!(matches!(
result,
Err(TransitionError::InvalidTransition { .. })
));
}
#[test]
fn cannot_unblock_a_review_held_story() {
// Unblock is specifically for Blocked, not for any Archived variant.
let s = Stage::Archived {
archived_at: Utc::now(),
reason: ArchiveReason::ReviewHeld {
reason: "TBD".into(),
},
};
let result = transition(s, PipelineEvent::Unblock);
assert!(matches!(
result,
Err(TransitionError::InvalidTransition { .. })
));
}
// ── Block from any active stage ────────────────────────────────────────
#[test]
fn block_from_any_active_stage() {
for s in [Stage::Backlog, Stage::Coding, Stage::Qa] {
let result = transition(
s.clone(),
PipelineEvent::Block {
reason: "stuck".into(),
},
);
assert!(matches!(
result,
Ok(Stage::Archived {
reason: ArchiveReason::Blocked { .. },
..
})
));
}
// Also from Merge:
let m = Stage::Merge {
feature_branch: fb("f"),
commits_ahead: nz(1),
};
let result = transition(
m,
PipelineEvent::Block {
reason: "stuck".into(),
},
);
assert!(matches!(
result,
Ok(Stage::Archived {
reason: ArchiveReason::Blocked { .. },
..
})
));
}
#[test]
fn unblock_returns_to_backlog() {
let s = Stage::Archived {
archived_at: Utc::now(),
reason: ArchiveReason::Blocked {
reason: "test".into(),
},
};
let result = transition(s, PipelineEvent::Unblock).unwrap();
assert!(matches!(result, Stage::Backlog));
}
// ── Execution state ────────────────────────────────────────────────────
#[test]
fn execution_happy_path() {
let e = ExecutionState::Idle;
let e = execution_transition(
e,
ExecutionEvent::SpawnRequested {
agent: AgentName("coder-1".into()),
},
)
.unwrap();
assert!(matches!(e, ExecutionState::Pending { .. }));
let e = execution_transition(e, ExecutionEvent::SpawnedSuccessfully).unwrap();
assert!(matches!(e, ExecutionState::Running { .. }));
let e = execution_transition(e, ExecutionEvent::Heartbeat).unwrap();
assert!(matches!(e, ExecutionState::Running { .. }));
let e = execution_transition(e, ExecutionEvent::Exited { exit_code: 0 }).unwrap();
assert!(matches!(
e,
ExecutionState::Completed { exit_code: 0, .. }
));
}
#[test]
fn execution_rate_limit_then_resume() {
let e = ExecutionState::Running {
agent: AgentName("coder-1".into()),
started_at: Utc::now(),
last_heartbeat: Utc::now(),
};
let e = execution_transition(
e,
ExecutionEvent::HitRateLimit {
resume_at: Utc::now() + chrono::Duration::minutes(5),
},
)
.unwrap();
assert!(matches!(e, ExecutionState::RateLimited { .. }));
let e = execution_transition(e, ExecutionEvent::SpawnedSuccessfully).unwrap();
assert!(matches!(e, ExecutionState::Running { .. }));
}
#[test]
fn execution_stop_from_anywhere_returns_idle() {
let e = ExecutionState::Running {
agent: AgentName("coder-1".into()),
started_at: Utc::now(),
last_heartbeat: Utc::now(),
};
let e = execution_transition(e, ExecutionEvent::Stopped).unwrap();
assert!(matches!(e, ExecutionState::Idle));
}
}
// ── main: a quick interactive demo ───────────────────────────────────────────
fn main() {
println!("─── Pipeline state machine sketch (story 520) ───\n");
// Set up the event bus with three subscribers — one for each side effect.
let mut bus = EventBus::new();
bus.subscribe(MatrixBotSub);
bus.subscribe(PipelineItemsSub);
bus.subscribe(FileRendererSub);
let story_id = StoryId("100_story_demo".into());
// Helper to apply a transition + fire the bus.
let mut current_stage = Stage::Backlog;
let step = |bus: &EventBus,
stage: &mut Stage,
event: PipelineEvent|
-> Result<(), TransitionError> {
let before = stage.clone();
let after = transition(stage.clone(), event.clone())?;
bus.fire(TransitionFired {
story_id: story_id.clone(),
before,
after: after.clone(),
event,
at: Utc::now(),
});
*stage = after;
Ok(())
};
println!("Initial: {current_stage:?}\n");
println!("→ DepsMet");
step(&bus, &mut current_stage, PipelineEvent::DepsMet).unwrap();
println!();
println!("→ QaSkipped (qa: server, gates auto-pass)");
step(
&bus,
&mut current_stage,
PipelineEvent::QaSkipped {
feature_branch: BranchName("feature/story-100".into()),
commits_ahead: NonZeroU32::new(3).unwrap(),
},
)
.unwrap();
println!();
println!("→ MergeSucceeded");
step(
&bus,
&mut current_stage,
PipelineEvent::MergeSucceeded {
merge_commit: GitSha("abc1234".into()),
},
)
.unwrap();
println!();
println!("→ Accepted");
step(&bus, &mut current_stage, PipelineEvent::Accepted).unwrap();
println!();
println!("Final: {current_stage:?}\n");
println!("─── Trying an invalid transition: Done → Unblock ───");
let invalid_result = transition(current_stage.clone(), PipelineEvent::Unblock);
println!("Result: {invalid_result:?}");
}