diff --git a/Cargo.lock b/Cargo.lock index 162d38c2..1b034e9a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2331,6 +2331,7 @@ dependencies = [ "serde_yaml", "sha2 0.11.0", "sqlx", + "statig", "strip-ansi-escapes", "tempfile", "tokio", @@ -4097,6 +4098,30 @@ dependencies = [ "toml_edit 0.25.10+spec-1.1.0", ] +[[package]] +name = "proc-macro-error" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" +dependencies = [ + "proc-macro-error-attr", + "proc-macro2", + "quote", + "syn 1.0.109", + "version_check", +] + +[[package]] +name = "proc-macro-error-attr" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" +dependencies = [ + "proc-macro2", + "quote", + "version_check", +] + [[package]] name = "proc-macro-error-attr2" version = "2.0.0" @@ -5722,6 +5747,27 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" +[[package]] +name = "statig" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42c467cc59664639bf70b8225b1b4a9c30d926f3e010c29e804bf940d618c663" +dependencies = [ + "statig_macro", +] + +[[package]] +name = "statig_macro" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf4c61563b68df6e452ceece3fba1329c8c6a5d348fe17b0778fada28bc95fde" +dependencies = [ + "proc-macro-error", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "string_cache" version = "0.8.9" diff --git a/server/Cargo.toml b/server/Cargo.toml index 2357758f..d3cf2867 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -55,3 +55,5 @@ check-cfg = ["cfg(feature, values(\"logging-base\"))"] tempfile = { workspace = true } mockito = "1" filetime = { workspace = true } +# For the pipeline_state_sketch_statig example only. +statig = "0.3" diff --git a/server/examples/pipeline_state_sketch_bare.rs b/server/examples/pipeline_state_sketch_bare.rs new file mode 100644 index 00000000..2dd241fa --- /dev/null +++ b/server/examples/pipeline_state_sketch_bare.rs @@ -0,0 +1,858 @@ +//! Pipeline state machine — design sketch (story 520) — BARE version. +//! +//! This is a SCRATCH EXPERIMENT, not wired into anything else in the codebase. +//! "Bare" version: hand-rolled with plain Rust enums and pattern matching, +//! no external state-machine library. See `pipeline_state_sketch_statig.rs` +//! for a parallel version using the `statig` crate. +//! +//! Run with: +//! cargo run --example pipeline_state_sketch_bare -p huskies +//! Test with: +//! cargo test --example pipeline_state_sketch_bare -p huskies +//! +//! Goal: demonstrate the typed pipeline state machine that should replace +//! huskies's stringly-typed CRDT state. It is intentionally standalone — +//! no integration with crdt_state, no persistence, no events escape this +//! file. Once we agree on the shape, this becomes the foundation for the +//! real implementation in src/pipeline_state.rs. +//! +//! The point of this version is to show that the Rust type system alone is +//! enough to make impossible states unrepresentable, without needing any +//! state-machine framework. + +use chrono::{DateTime, Utc}; +use std::num::NonZeroU32; + +// ── Newtypes ───────────────────────────────────────────────────────────────── +// +// Each of these is a "wrapper around String" today, but the wrapping itself +// is the point: a function that takes a `BranchName` cannot accidentally be +// called with a `StoryId`. Validation can be added later (e.g. `BranchName::new` +// returns `Result` and the inner `String` is private) +// without changing call sites. + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct StoryId(pub String); + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct BranchName(pub String); + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct GitSha(pub String); + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct AgentName(pub String); + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct NodePubkey(pub [u8; 32]); + +// ── Synced pipeline stage (lives in CRDT, converges across nodes) ──────────── +// +// This is the SHARED state — every node sees the same Stage for a given story +// after CRDT convergence. Local-only state (which agent is running, retry +// count, rate-limit timers) lives separately in `ExecutionState` below, keyed +// by node pubkey. +// +// Notice what is NOT a field on Stage: +// - `agent` — that's local execution state, not pipeline state +// - `retry_count` — also local +// - `blocked` — folded into `Archived { reason: Blocked { .. } }` +// +// And notice what IS a field, by construction: +// - Stage::Merge requires a non-zero commits_ahead (silent no-op merge is unrepresentable) +// - Stage::Done requires a merge_commit (a "done" story without merge metadata is unrepresentable) +// - Stage::Archived always carries a reason (no "archived but we don't know why") + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum Stage { + /// Story exists, waiting for dependencies or auto-assign promotion. + Backlog, + + /// Story is being actively coded somewhere in the mesh. + /// (Which node is local — see ExecutionState.) + Coding, + + /// Coder has run; gates are running. + Qa, + + /// Gates passed (or were skipped); ready to merge. + /// `commits_ahead: NonZeroU32` makes "Merge with nothing to merge" structurally impossible. + /// This single field eliminates today's bug 519 (silent mergemaster no-op). + Merge { + feature_branch: BranchName, + commits_ahead: NonZeroU32, + }, + + /// Mergemaster squashed to master. Always carries the merge metadata, + /// so a "done" story is provably reachable from master. + Done { + merged_at: DateTime, + merge_commit: GitSha, + }, + + /// Out of the active flow. The reason explains why. + Archived { + archived_at: DateTime, + reason: ArchiveReason, + }, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum ArchiveReason { + /// Normal happy-path completion: accepted and filed away. + Completed, + /// User explicitly abandoned the story. + Abandoned, + /// Replaced by another story. + Superseded { by: StoryId }, + /// Manually blocked, awaiting human resolution. Was bug-436's `blocked: true`. + Blocked { reason: String }, + /// Mergemaster failed beyond the retry budget. Was bug-436's `merge_failure`. + MergeFailed { reason: String }, + /// Held in review at human request. Was bug-436's `review_hold`. + ReviewHeld { reason: String }, +} + +// ── Per-node execution state (lives in CRDT under node_pubkey key) ─────────── +// +// LOCAL-AUTHORED but GLOBALLY-READABLE. Each node only writes to entries where +// node_pubkey == self, so there are no inter-author CRDT merge conflicts. Other +// nodes can READ all entries to know what's happening across the mesh. +// +// In the real CRDT document, this would be stored as something like: +// crdt.execution_state: { node_pubkey -> { story_id -> ExecutionState } } +// +// Why this matters operationally: +// - Cross-node observability: matrix bot can show "node A is running coder-1 +// on story X, node B is rate-limited on story Y" +// - Heartbeat detection: if `last_heartbeat` is stale > N min, the entry is +// dead (laptop closed, OOM, segfault). Other nodes can take over (story 479). +// - Foundation for CRDT-based work claiming (story 479). + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum ExecutionState { + /// No agent on this node is currently working on this story. + Idle, + + /// An agent has been requested but hasn't started its subprocess yet. + Pending { + agent: AgentName, + since: DateTime, + }, + + /// An agent's subprocess is alive on this node. + /// `last_heartbeat` is updated periodically; if stale, the process probably died. + Running { + agent: AgentName, + started_at: DateTime, + last_heartbeat: DateTime, + }, + + /// Agent hit a rate limit; will resume at the given time. + RateLimited { + agent: AgentName, + resume_at: DateTime, + }, + + /// Agent finished. exit_code disambiguates clean exit / panic / etc. + Completed { + agent: AgentName, + exit_code: i32, + completed_at: DateTime, + }, +} + +// ── Pipeline events ────────────────────────────────────────────────────────── +// +// Events drive Stage transitions. Each event carries any data needed to +// construct the destination state, so the type signature of `transition` +// guarantees we can never accidentally land in an underspecified state. +// +// (Compare with today's stringly-typed code, where you call +// `move_story_to_merge(story_id)` and the destination state is built from +// whatever happens to be in scope at the time.) + +#[derive(Debug, Clone)] +pub enum PipelineEvent { + /// All depends_on stories are in Done or Archived; promotion fires. + DepsMet, + + /// Coder is going to start running gates. + GatesStarted, + + /// Gates passed normally — ready to merge. Carries the data needed to + /// construct Stage::Merge, so the transition can't produce a malformed merge state. + GatesPassed { + feature_branch: BranchName, + commits_ahead: NonZeroU32, + }, + + /// Gates failed; coder will retry. + GatesFailed { reason: String }, + + /// QA mode is "server" — skip QA and go straight to merge. + QaSkipped { + feature_branch: BranchName, + commits_ahead: NonZeroU32, + }, + + /// Mergemaster successfully squashed and pushed to master. + MergeSucceeded { merge_commit: GitSha }, + + /// Mergemaster gave up after the retry budget. + MergeFailedFinal { reason: String }, + + /// User accepted a Done story (or auto-accept fired). + Accepted, + + /// User explicitly blocked the story. + Block { reason: String }, + + /// User explicitly unblocked. + Unblock, + + /// User explicitly abandoned. + Abandon, + + /// User marked the story as superseded by another. + Supersede { by: StoryId }, + + /// User put the story on review hold. + ReviewHold { reason: String }, +} + +// ── Transition errors ──────────────────────────────────────────────────────── + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum TransitionError { + /// The current stage doesn't accept this event. + InvalidTransition { + from_stage: String, + event: String, + }, +} + +// ── The transition function ────────────────────────────────────────────────── +// +// Pure function. Takes the current Stage and an event, returns the new Stage +// or a TransitionError. The compiler enforces that every constructed Stage +// has all required fields, so impossible destination states are unrepresentable. +// +// "What about the *side effects* of a transition?" — they don't go in here. +// transition() is pure. Side effects (matrix bot notifications, file writes, +// agent spawns, web UI broadcasts) are dispatched by an event bus that watches +// the (before, after) tuple. See the `EventBus` sketch further down. + +pub fn transition(state: Stage, event: PipelineEvent) -> Result { + use PipelineEvent::*; + use Stage::*; + + let stage_label = stage_label(&state); + let event_label = event_label(&event); + let invalid = || TransitionError::InvalidTransition { + from_stage: stage_label.to_string(), + event: event_label.to_string(), + }; + + let now = Utc::now(); + + match (state, event) { + // ── Forward path: backlog → current → (qa →) merge → done ────────── + (Backlog, DepsMet) => Ok(Coding), + (Coding, GatesStarted) => Ok(Qa), + (Coding, QaSkipped { feature_branch, commits_ahead }) => Ok(Merge { + feature_branch, + commits_ahead, + }), + (Qa, GatesPassed { feature_branch, commits_ahead }) => Ok(Merge { + feature_branch, + commits_ahead, + }), + // Gates failed → back to Coding for retry. (Retry-budget enforcement + // lives outside this function — it's accounting on the local side.) + (Qa, GatesFailed { .. }) => Ok(Coding), + (Merge { .. }, MergeSucceeded { merge_commit }) => Ok(Done { + merged_at: now, + merge_commit, + }), + + // ── Done → Archived(Completed) ───────────────────────────────────── + (Done { .. }, Accepted) => Ok(Archived { + archived_at: now, + reason: ArchiveReason::Completed, + }), + + // ── Stuck states (any active stage → Archived with a reason) ────── + (Backlog, Block { reason }) + | (Coding, Block { reason }) + | (Qa, Block { reason }) + | (Merge { .. }, Block { reason }) => Ok(Archived { + archived_at: now, + reason: ArchiveReason::Blocked { reason }, + }), + + (Backlog, ReviewHold { reason }) + | (Coding, ReviewHold { reason }) + | (Qa, ReviewHold { reason }) + | (Merge { .. }, ReviewHold { reason }) => Ok(Archived { + archived_at: now, + reason: ArchiveReason::ReviewHeld { reason }, + }), + + (Merge { .. }, MergeFailedFinal { reason }) => Ok(Archived { + archived_at: now, + reason: ArchiveReason::MergeFailed { reason }, + }), + + // ── Abandon / supersede from any active or done stage ────────────── + (Backlog, Abandon) + | (Coding, Abandon) + | (Qa, Abandon) + | (Merge { .. }, Abandon) + | (Done { .. }, Abandon) => Ok(Archived { + archived_at: now, + reason: ArchiveReason::Abandoned, + }), + + (Backlog, Supersede { by }) + | (Coding, Supersede { by }) + | (Qa, Supersede { by }) + | (Merge { .. }, Supersede { by }) + | (Done { .. }, Supersede { by }) => Ok(Archived { + archived_at: now, + reason: ArchiveReason::Superseded { by }, + }), + + // ── Unblock: only from Archived(Blocked) → Backlog ───────────────── + ( + Archived { + reason: ArchiveReason::Blocked { .. }, + .. + }, + Unblock, + ) => Ok(Backlog), + + // ── Everything else is invalid ───────────────────────────────────── + _ => Err(invalid()), + } +} + +fn stage_label(s: &Stage) -> &'static str { + match s { + Stage::Backlog => "Backlog", + Stage::Coding => "Coding", + Stage::Qa => "Qa", + Stage::Merge { .. } => "Merge", + Stage::Done { .. } => "Done", + Stage::Archived { .. } => "Archived", + } +} + +fn event_label(e: &PipelineEvent) -> &'static str { + match e { + PipelineEvent::DepsMet => "DepsMet", + PipelineEvent::GatesStarted => "GatesStarted", + PipelineEvent::GatesPassed { .. } => "GatesPassed", + PipelineEvent::GatesFailed { .. } => "GatesFailed", + PipelineEvent::QaSkipped { .. } => "QaSkipped", + PipelineEvent::MergeSucceeded { .. } => "MergeSucceeded", + PipelineEvent::MergeFailedFinal { .. } => "MergeFailedFinal", + PipelineEvent::Accepted => "Accepted", + PipelineEvent::Block { .. } => "Block", + PipelineEvent::Unblock => "Unblock", + PipelineEvent::Abandon => "Abandon", + PipelineEvent::Supersede { .. } => "Supersede", + PipelineEvent::ReviewHold { .. } => "ReviewHold", + } +} + +// ── Per-node execution state machine ───────────────────────────────────────── +// +// Independent of the pipeline stage machine. Tracks "what is THIS node doing +// about this story right now." Multiple nodes can have different ExecutionState +// for the same story_id at the same time — and that's fine, because each node +// owns its own subspace in the CRDT. + +#[derive(Debug, Clone)] +pub enum ExecutionEvent { + SpawnRequested { agent: AgentName }, + SpawnedSuccessfully, + Heartbeat, + HitRateLimit { resume_at: DateTime }, + Exited { exit_code: i32 }, + Stopped, + Reset, +} + +pub fn execution_transition( + state: ExecutionState, + event: ExecutionEvent, +) -> Result { + use ExecutionEvent::*; + use ExecutionState::*; + + let now = Utc::now(); + + match (state, event) { + (Idle, SpawnRequested { agent }) => Ok(Pending { agent, since: now }), + + (Pending { agent, .. }, SpawnedSuccessfully) => Ok(Running { + agent, + started_at: now, + last_heartbeat: now, + }), + + ( + Running { + agent, started_at, .. + }, + Heartbeat, + ) => Ok(Running { + agent, + started_at, + last_heartbeat: now, + }), + + (Running { agent, .. }, HitRateLimit { resume_at }) + | (Pending { agent, .. }, HitRateLimit { resume_at }) => Ok(RateLimited { agent, resume_at }), + + (RateLimited { agent, .. }, SpawnedSuccessfully) => Ok(Running { + agent, + started_at: now, + last_heartbeat: now, + }), + + (Running { agent, .. }, Exited { exit_code }) + | (Pending { agent, .. }, Exited { exit_code }) + | (RateLimited { agent, .. }, Exited { exit_code }) => Ok(Completed { + agent, + exit_code, + completed_at: now, + }), + + // Stop and Reset always return to Idle, from anywhere. + (_, Stopped) | (_, Reset) => Ok(Idle), + + _ => Err(TransitionError::InvalidTransition { + from_stage: "ExecutionState".to_string(), + event: "".to_string(), + }), + } +} + +// ── Event bus sketch ───────────────────────────────────────────────────────── +// +// This is intentionally tiny — the goal is to show that the side-effect dispatch +// is *separable* from the transition function. Real implementation would use +// tokio broadcast channels or a proper event bus, but the pattern is the same. + +#[derive(Debug, Clone)] +pub struct TransitionFired { + pub story_id: StoryId, + pub before: Stage, + pub after: Stage, + pub event: PipelineEvent, + pub at: DateTime, +} + +pub trait TransitionSubscriber: Send + Sync { + fn name(&self) -> &'static str; + fn on_transition(&self, fired: &TransitionFired); +} + +pub struct EventBus { + subscribers: Vec>, +} + +impl EventBus { + pub fn new() -> Self { + Self { + subscribers: Vec::new(), + } + } + + pub fn subscribe(&mut self, subscriber: S) { + self.subscribers.push(Box::new(subscriber)); + } + + pub fn fire(&self, event: TransitionFired) { + for sub in &self.subscribers { + sub.on_transition(&event); + } + } +} + +impl Default for EventBus { + fn default() -> Self { + Self::new() + } +} + +// Example subscribers (just println! for the sketch): + +pub struct MatrixBotSub; +impl TransitionSubscriber for MatrixBotSub { + fn name(&self) -> &'static str { + "matrix-bot" + } + fn on_transition(&self, f: &TransitionFired) { + println!( + " [matrix-bot] #{}: {} → {}", + f.story_id.0, + stage_label(&f.before), + stage_label(&f.after) + ); + } +} + +pub struct FileRendererSub; +impl TransitionSubscriber for FileRendererSub { + fn name(&self) -> &'static str { + "filesystem" + } + fn on_transition(&self, f: &TransitionFired) { + println!( + " [filesystem] re-rendering .huskies/work/{}/{}.md", + stage_dir_name(&f.after), + f.story_id.0 + ); + } +} + +pub struct PipelineItemsSub; +impl TransitionSubscriber for PipelineItemsSub { + fn name(&self) -> &'static str { + "pipeline-items" + } + fn on_transition(&self, f: &TransitionFired) { + println!( + " [pipeline-items] UPDATE pipeline_items SET stage = '{}' WHERE id = '{}'", + stage_dir_name(&f.after), + f.story_id.0 + ); + } +} + +fn stage_dir_name(s: &Stage) -> &'static str { + match s { + Stage::Backlog => "1_backlog", + Stage::Coding => "2_current", + Stage::Qa => "3_qa", + Stage::Merge { .. } => "4_merge", + Stage::Done { .. } => "5_done", + Stage::Archived { .. } => "6_archived", + } +} + +// ── Tests ──────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + + fn nz(n: u32) -> NonZeroU32 { + NonZeroU32::new(n).unwrap() + } + fn fb(name: &str) -> BranchName { + BranchName(name.to_string()) + } + fn sha(s: &str) -> GitSha { + GitSha(s.to_string()) + } + + // ── Happy path ───────────────────────────────────────────────────────── + + #[test] + fn happy_path_backlog_through_done() { + let s = Stage::Backlog; + let s = transition(s, PipelineEvent::DepsMet).unwrap(); + assert!(matches!(s, Stage::Coding)); + + let s = transition( + s, + PipelineEvent::QaSkipped { + feature_branch: fb("feature/story-1"), + commits_ahead: nz(3), + }, + ) + .unwrap(); + assert!(matches!(s, Stage::Merge { .. })); + + let s = transition( + s, + PipelineEvent::MergeSucceeded { + merge_commit: sha("abc123"), + }, + ) + .unwrap(); + assert!(matches!(s, Stage::Done { .. })); + + let s = transition(s, PipelineEvent::Accepted).unwrap(); + assert!(matches!( + s, + Stage::Archived { + reason: ArchiveReason::Completed, + .. + } + )); + } + + #[test] + fn qa_retry_loop() { + let s = Stage::Coding; + let s = transition(s, PipelineEvent::GatesStarted).unwrap(); + assert!(matches!(s, Stage::Qa)); + + let s = transition( + s, + PipelineEvent::GatesFailed { + reason: "tests failed".into(), + }, + ) + .unwrap(); + assert!(matches!(s, Stage::Coding)); + } + + // ── Bug 519 made unrepresentable: Merge with zero commits ahead ──────── + + #[test] + fn merge_with_zero_commits_is_unrepresentable() { + // NonZeroU32::new(0) returns None — the type system literally refuses + // to construct a Merge state with no commits ahead of master. This is + // bug 519's "silent mergemaster no-op" gone, structurally. + assert!(NonZeroU32::new(0).is_none()); + } + + // ── Bug 502 made unrepresentable: agent on the wrong stage ───────────── + // + // There's nothing to test here at the *Stage* level, because Stage doesn't + // have an `agent` field at all. Agent assignment is per-node ExecutionState. + // The "coder agent on a Merge stage" failure mode from bug 502 cannot be + // expressed in this type system: a coder can attach to a story (writing to + // its node-local ExecutionState), but the Stage::Merge variant has no slot + // for an agent. The "wrong-stage agent" error is gone because the wrong + // state is unrepresentable. + + // ── Invalid transitions return errors ────────────────────────────────── + + #[test] + fn cannot_jump_from_backlog_to_done() { + let s = Stage::Backlog; + let result = transition(s, PipelineEvent::Accepted); + assert!(matches!( + result, + Err(TransitionError::InvalidTransition { .. }) + )); + } + + #[test] + fn cannot_unblock_a_done_story() { + let s = Stage::Done { + merged_at: Utc::now(), + merge_commit: sha("abc"), + }; + let result = transition(s, PipelineEvent::Unblock); + assert!(matches!( + result, + Err(TransitionError::InvalidTransition { .. }) + )); + } + + #[test] + fn cannot_unblock_a_review_held_story() { + // Unblock is specifically for Blocked, not for any Archived variant. + let s = Stage::Archived { + archived_at: Utc::now(), + reason: ArchiveReason::ReviewHeld { + reason: "TBD".into(), + }, + }; + let result = transition(s, PipelineEvent::Unblock); + assert!(matches!( + result, + Err(TransitionError::InvalidTransition { .. }) + )); + } + + // ── Block from any active stage ──────────────────────────────────────── + + #[test] + fn block_from_any_active_stage() { + for s in [Stage::Backlog, Stage::Coding, Stage::Qa] { + let result = transition( + s.clone(), + PipelineEvent::Block { + reason: "stuck".into(), + }, + ); + assert!(matches!( + result, + Ok(Stage::Archived { + reason: ArchiveReason::Blocked { .. }, + .. + }) + )); + } + + // Also from Merge: + let m = Stage::Merge { + feature_branch: fb("f"), + commits_ahead: nz(1), + }; + let result = transition( + m, + PipelineEvent::Block { + reason: "stuck".into(), + }, + ); + assert!(matches!( + result, + Ok(Stage::Archived { + reason: ArchiveReason::Blocked { .. }, + .. + }) + )); + } + + #[test] + fn unblock_returns_to_backlog() { + let s = Stage::Archived { + archived_at: Utc::now(), + reason: ArchiveReason::Blocked { + reason: "test".into(), + }, + }; + let result = transition(s, PipelineEvent::Unblock).unwrap(); + assert!(matches!(result, Stage::Backlog)); + } + + // ── Execution state ──────────────────────────────────────────────────── + + #[test] + fn execution_happy_path() { + let e = ExecutionState::Idle; + let e = execution_transition( + e, + ExecutionEvent::SpawnRequested { + agent: AgentName("coder-1".into()), + }, + ) + .unwrap(); + assert!(matches!(e, ExecutionState::Pending { .. })); + + let e = execution_transition(e, ExecutionEvent::SpawnedSuccessfully).unwrap(); + assert!(matches!(e, ExecutionState::Running { .. })); + + let e = execution_transition(e, ExecutionEvent::Heartbeat).unwrap(); + assert!(matches!(e, ExecutionState::Running { .. })); + + let e = execution_transition(e, ExecutionEvent::Exited { exit_code: 0 }).unwrap(); + assert!(matches!( + e, + ExecutionState::Completed { exit_code: 0, .. } + )); + } + + #[test] + fn execution_rate_limit_then_resume() { + let e = ExecutionState::Running { + agent: AgentName("coder-1".into()), + started_at: Utc::now(), + last_heartbeat: Utc::now(), + }; + let e = execution_transition( + e, + ExecutionEvent::HitRateLimit { + resume_at: Utc::now() + chrono::Duration::minutes(5), + }, + ) + .unwrap(); + assert!(matches!(e, ExecutionState::RateLimited { .. })); + + let e = execution_transition(e, ExecutionEvent::SpawnedSuccessfully).unwrap(); + assert!(matches!(e, ExecutionState::Running { .. })); + } + + #[test] + fn execution_stop_from_anywhere_returns_idle() { + let e = ExecutionState::Running { + agent: AgentName("coder-1".into()), + started_at: Utc::now(), + last_heartbeat: Utc::now(), + }; + let e = execution_transition(e, ExecutionEvent::Stopped).unwrap(); + assert!(matches!(e, ExecutionState::Idle)); + } +} + +// ── main: a quick interactive demo ─────────────────────────────────────────── + +fn main() { + println!("─── Pipeline state machine sketch (story 520) ───\n"); + + // Set up the event bus with three subscribers — one for each side effect. + let mut bus = EventBus::new(); + bus.subscribe(MatrixBotSub); + bus.subscribe(PipelineItemsSub); + bus.subscribe(FileRendererSub); + + let story_id = StoryId("100_story_demo".into()); + + // Helper to apply a transition + fire the bus. + let mut current_stage = Stage::Backlog; + let step = |bus: &EventBus, + stage: &mut Stage, + event: PipelineEvent| + -> Result<(), TransitionError> { + let before = stage.clone(); + let after = transition(stage.clone(), event.clone())?; + bus.fire(TransitionFired { + story_id: story_id.clone(), + before, + after: after.clone(), + event, + at: Utc::now(), + }); + *stage = after; + Ok(()) + }; + + println!("Initial: {current_stage:?}\n"); + + println!("→ DepsMet"); + step(&bus, &mut current_stage, PipelineEvent::DepsMet).unwrap(); + println!(); + + println!("→ QaSkipped (qa: server, gates auto-pass)"); + step( + &bus, + &mut current_stage, + PipelineEvent::QaSkipped { + feature_branch: BranchName("feature/story-100".into()), + commits_ahead: NonZeroU32::new(3).unwrap(), + }, + ) + .unwrap(); + println!(); + + println!("→ MergeSucceeded"); + step( + &bus, + &mut current_stage, + PipelineEvent::MergeSucceeded { + merge_commit: GitSha("abc1234".into()), + }, + ) + .unwrap(); + println!(); + + println!("→ Accepted"); + step(&bus, &mut current_stage, PipelineEvent::Accepted).unwrap(); + println!(); + + println!("Final: {current_stage:?}\n"); + + println!("─── Trying an invalid transition: Done → Unblock ───"); + let invalid_result = transition(current_stage.clone(), PipelineEvent::Unblock); + println!("Result: {invalid_result:?}"); +} diff --git a/server/examples/pipeline_state_sketch_statig.rs b/server/examples/pipeline_state_sketch_statig.rs new file mode 100644 index 00000000..5b751f70 --- /dev/null +++ b/server/examples/pipeline_state_sketch_statig.rs @@ -0,0 +1,532 @@ +//! Pipeline state machine — design sketch (story 520) — STATIG version. +//! +//! Parallel to `pipeline_state_sketch_bare.rs`. Same domain types, same +//! transitions, same event semantics — but the state machine is built using +//! the `statig` crate (https://crates.io/crates/statig) instead of being +//! hand-rolled. +//! +//! Run with: +//! cargo run --example pipeline_state_sketch_statig -p huskies +//! Test with: +//! cargo test --example pipeline_state_sketch_statig -p huskies +//! +//! Why both versions? +//! +//! - The **bare** version shows that plain Rust enums + a transition function +//! are *enough* to make impossible states unrepresentable. No framework. +//! - The **statig** version shows what we'd gain by adopting a state-machine +//! crate: hierarchical states (the `active` superstate factors out the +//! cross-cutting Block/ReviewHold/Abandon/Supersede transitions, which the +//! bare version had to duplicate inline with `|` patterns), generated +//! `State` enum with type-safe data-carrying constructors, and stateful +//! `handle(&event)` dispatch. Type safety is preserved either way: +//! `State::merge(BranchName, NonZeroU32)` requires both args at the +//! constructor, just like `Stage::Merge { feature_branch, commits_ahead }` +//! in the bare version. +//! +//! Trade-off: statig adds a dependency and a proc-macro layer, which makes +//! the code harder to read for someone unfamiliar with the crate. The +//! framework-free version is more transparent but requires manual +//! pattern-matching and inline duplication for cross-cutting transitions. + +use chrono::{DateTime, Utc}; +use statig::prelude::*; +use std::num::NonZeroU32; + +// ── Newtypes (same as bare version) ────────────────────────────────────────── + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct StoryId(pub String); + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct BranchName(pub String); + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct GitSha(pub String); + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct AgentName(pub String); + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct NodePubkey(pub [u8; 32]); + +// ── Archive reason (same as bare version) ──────────────────────────────────── + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum ArchiveReason { + Completed, + Abandoned, + Superseded { by: StoryId }, + Blocked { reason: String }, + MergeFailed { reason: String }, + ReviewHeld { reason: String }, +} + +// ── Pipeline events (same as bare version) ─────────────────────────────────── + +#[derive(Debug, Clone)] +pub enum PipelineEvent { + DepsMet, + GatesStarted, + GatesPassed { + feature_branch: BranchName, + commits_ahead: NonZeroU32, + }, + GatesFailed { + reason: String, + }, + QaSkipped { + feature_branch: BranchName, + commits_ahead: NonZeroU32, + }, + MergeSucceeded { + merge_commit: GitSha, + }, + MergeFailedFinal { + reason: String, + }, + Accepted, + Block { + reason: String, + }, + Unblock, + Abandon, + Supersede { + by: StoryId, + }, + ReviewHold { + reason: String, + }, +} + +// ── The state machine ──────────────────────────────────────────────────────── +// +// statig requires a "context" struct (the `Self` of the impl block). For us +// it's empty — all per-state data lives ON the state itself, carried forward +// by the auto-generated `State::xxx(...)` constructors. + +#[derive(Default)] +pub struct PipelineMachine; + +#[state_machine( + initial = "State::backlog()", + state(derive(Debug, Clone, PartialEq, Eq)) +)] +impl PipelineMachine { + // ── Active stages: backlog, coding, qa, merge ──────────────────────── + // + // Each is a child of the `active` superstate, which handles the + // cross-cutting transitions (Block / ReviewHold / Abandon / Supersede) + // exactly once instead of being duplicated per state. + + #[state(superstate = "active")] + fn backlog(event: &PipelineEvent) -> Response { + match event { + PipelineEvent::DepsMet => Transition(State::coding()), + _ => Super, // defer to `active` (and ultimately to "unhandled") + } + } + + #[state(superstate = "active")] + fn coding(event: &PipelineEvent) -> Response { + match event { + PipelineEvent::GatesStarted => Transition(State::qa()), + PipelineEvent::QaSkipped { + feature_branch, + commits_ahead, + } => Transition(State::merge(feature_branch.clone(), *commits_ahead)), + _ => Super, + } + } + + #[state(superstate = "active")] + fn qa(event: &PipelineEvent) -> Response { + match event { + PipelineEvent::GatesPassed { + feature_branch, + commits_ahead, + } => Transition(State::merge(feature_branch.clone(), *commits_ahead)), + PipelineEvent::GatesFailed { .. } => Transition(State::coding()), + _ => Super, + } + } + + #[state(superstate = "active")] + fn merge( + _feature_branch: &mut BranchName, + _commits_ahead: &mut NonZeroU32, + event: &PipelineEvent, + ) -> Response { + // Note: the type signature of this state function REQUIRES both + // _feature_branch and _commits_ahead. There is no way to construct + // a Merge state without them. NonZeroU32 makes "merge with zero + // commits ahead" structurally unrepresentable (bug 519 fixed by + // construction, same as the bare version). + // + // The fields are prefixed with `_` because this state function only + // transitions forward and doesn't read them — but they're available + // to inspect via the State::Merge variant generated by the macro. + match event { + PipelineEvent::MergeSucceeded { merge_commit } => Transition(State::done( + Utc::now(), + merge_commit.clone(), + )), + PipelineEvent::MergeFailedFinal { reason } => Transition(State::archived( + Utc::now(), + ArchiveReason::MergeFailed { + reason: reason.clone(), + }, + )), + _ => Super, + } + } + + // ── Cross-cutting superstate ───────────────────────────────────────── + // + // This is the statig payoff: ONE place defines what Block/ReviewHold/ + // Abandon/Supersede do across all four active stages. The bare version + // had to duplicate this with `|` patterns. Adding a new active stage + // here means just adding it as a child of `active`; the cross-cutting + // transitions come for free. + + #[superstate] + fn active(event: &PipelineEvent) -> Response { + let now = Utc::now(); + match event { + PipelineEvent::Block { reason } => Transition(State::archived( + now, + ArchiveReason::Blocked { + reason: reason.clone(), + }, + )), + PipelineEvent::ReviewHold { reason } => Transition(State::archived( + now, + ArchiveReason::ReviewHeld { + reason: reason.clone(), + }, + )), + PipelineEvent::Abandon => { + Transition(State::archived(now, ArchiveReason::Abandoned)) + } + PipelineEvent::Supersede { by } => Transition(State::archived( + now, + ArchiveReason::Superseded { by: by.clone() }, + )), + _ => Handled, // unhandled events are silently ignored + } + } + + // ── Done is special: it's not a child of `active` because Block and ── + // ── ReviewHold are NOT valid from Done (per the bare version's rules). + // ── Abandon and Supersede ARE valid, so we have to handle them inline. + + #[state] + fn done( + merged_at: &mut DateTime, + merge_commit: &mut GitSha, + event: &PipelineEvent, + ) -> Response { + let now = Utc::now(); + let _ = merged_at; // currently unused; available for queries + let _ = merge_commit; + match event { + PipelineEvent::Accepted => { + Transition(State::archived(now, ArchiveReason::Completed)) + } + PipelineEvent::Abandon => { + Transition(State::archived(now, ArchiveReason::Abandoned)) + } + PipelineEvent::Supersede { by } => Transition(State::archived( + now, + ArchiveReason::Superseded { by: by.clone() }, + )), + _ => Handled, + } + } + + // ── Archived is terminal except for Unblock from Blocked → Backlog ─── + + #[state] + fn archived( + archived_at: &mut DateTime, + reason: &mut ArchiveReason, + event: &PipelineEvent, + ) -> Response { + let _ = archived_at; + match event { + PipelineEvent::Unblock => { + if matches!(reason, ArchiveReason::Blocked { .. }) { + Transition(State::backlog()) + } else { + Handled // unblock only valid from Blocked + } + } + _ => Handled, + } + } +} + +// ── Tests ──────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + + fn nz(n: u32) -> NonZeroU32 { + NonZeroU32::new(n).unwrap() + } + fn fb(name: &str) -> BranchName { + BranchName(name.to_string()) + } + fn sha(s: &str) -> GitSha { + GitSha(s.to_string()) + } + + // ── Happy path ───────────────────────────────────────────────────────── + + #[test] + fn happy_path_backlog_through_done() { + let mut sm = PipelineMachine.state_machine(); + assert!(matches!(sm.state(), State::Backlog {})); + + sm.handle(&PipelineEvent::DepsMet); + assert!(matches!(sm.state(), State::Coding {})); + + sm.handle(&PipelineEvent::QaSkipped { + feature_branch: fb("feature/story-1"), + commits_ahead: nz(3), + }); + assert!(matches!(sm.state(), State::Merge { .. })); + + sm.handle(&PipelineEvent::MergeSucceeded { + merge_commit: sha("abc123"), + }); + assert!(matches!(sm.state(), State::Done { .. })); + + sm.handle(&PipelineEvent::Accepted); + assert!(matches!( + sm.state(), + State::Archived { + reason: ArchiveReason::Completed, + .. + } + )); + } + + #[test] + fn qa_retry_loop() { + let mut sm = PipelineMachine.state_machine(); + sm.handle(&PipelineEvent::DepsMet); + sm.handle(&PipelineEvent::GatesStarted); + assert!(matches!(sm.state(), State::Qa {})); + + sm.handle(&PipelineEvent::GatesFailed { + reason: "tests failed".into(), + }); + assert!(matches!(sm.state(), State::Coding {})); + } + + // ── Bug 519 unrepresentability: Merge with zero commits ahead ────────── + + #[test] + fn merge_with_zero_commits_is_unrepresentable() { + // Identical to the bare version: NonZeroU32::new(0) returns None, + // so a State::merge(branch, ZERO) literally cannot be constructed. + assert!(NonZeroU32::new(0).is_none()); + } + + // ── Cross-cutting Block from any active stage (superstate proves it) ─── + + #[test] + fn block_from_backlog_via_superstate() { + let mut sm = PipelineMachine.state_machine(); + sm.handle(&PipelineEvent::Block { + reason: "stuck".into(), + }); + assert!(matches!( + sm.state(), + State::Archived { + reason: ArchiveReason::Blocked { .. }, + .. + } + )); + } + + #[test] + fn block_from_coding_via_superstate() { + let mut sm = PipelineMachine.state_machine(); + sm.handle(&PipelineEvent::DepsMet); + sm.handle(&PipelineEvent::Block { + reason: "stuck".into(), + }); + assert!(matches!( + sm.state(), + State::Archived { + reason: ArchiveReason::Blocked { .. }, + .. + } + )); + } + + #[test] + fn block_from_qa_via_superstate() { + let mut sm = PipelineMachine.state_machine(); + sm.handle(&PipelineEvent::DepsMet); + sm.handle(&PipelineEvent::GatesStarted); + sm.handle(&PipelineEvent::Block { + reason: "stuck".into(), + }); + assert!(matches!( + sm.state(), + State::Archived { + reason: ArchiveReason::Blocked { .. }, + .. + } + )); + } + + #[test] + fn block_from_merge_via_superstate() { + let mut sm = PipelineMachine.state_machine(); + sm.handle(&PipelineEvent::DepsMet); + sm.handle(&PipelineEvent::QaSkipped { + feature_branch: fb("f"), + commits_ahead: nz(1), + }); + sm.handle(&PipelineEvent::Block { + reason: "stuck".into(), + }); + assert!(matches!( + sm.state(), + State::Archived { + reason: ArchiveReason::Blocked { .. }, + .. + } + )); + } + + // ── Block from Done is NOT valid (Done isn't a child of `active`) ────── + + #[test] + fn block_from_done_is_ignored() { + let mut sm = PipelineMachine.state_machine(); + sm.handle(&PipelineEvent::DepsMet); + sm.handle(&PipelineEvent::QaSkipped { + feature_branch: fb("f"), + commits_ahead: nz(1), + }); + sm.handle(&PipelineEvent::MergeSucceeded { + merge_commit: sha("abc"), + }); + // Now in Done. Block should NOT transition us anywhere. + sm.handle(&PipelineEvent::Block { + reason: "stuck".into(), + }); + assert!(matches!(sm.state(), State::Done { .. })); + } + + // ── Abandon from Done IS valid (handled inline in done()) ────────────── + + #[test] + fn abandon_from_done_works() { + let mut sm = PipelineMachine.state_machine(); + sm.handle(&PipelineEvent::DepsMet); + sm.handle(&PipelineEvent::QaSkipped { + feature_branch: fb("f"), + commits_ahead: nz(1), + }); + sm.handle(&PipelineEvent::MergeSucceeded { + merge_commit: sha("abc"), + }); + sm.handle(&PipelineEvent::Abandon); + assert!(matches!( + sm.state(), + State::Archived { + reason: ArchiveReason::Abandoned, + .. + } + )); + } + + // ── Unblock from Archived(Blocked) → Backlog ─────────────────────────── + + #[test] + fn unblock_returns_to_backlog() { + let mut sm = PipelineMachine.state_machine(); + sm.handle(&PipelineEvent::Block { + reason: "test".into(), + }); + assert!(matches!( + sm.state(), + State::Archived { + reason: ArchiveReason::Blocked { .. }, + .. + } + )); + + sm.handle(&PipelineEvent::Unblock); + assert!(matches!(sm.state(), State::Backlog {})); + } + + #[test] + fn unblock_from_review_held_does_nothing() { + // Unblock is specifically for Blocked, not for any Archived variant. + let mut sm = PipelineMachine.state_machine(); + sm.handle(&PipelineEvent::ReviewHold { + reason: "TBD".into(), + }); + // Now in Archived(ReviewHeld). Unblock should NOT transition. + sm.handle(&PipelineEvent::Unblock); + assert!(matches!( + sm.state(), + State::Archived { + reason: ArchiveReason::ReviewHeld { .. }, + .. + } + )); + } +} + +// ── main: a quick interactive demo ─────────────────────────────────────────── + +fn main() { + println!("─── Pipeline state machine sketch (story 520) — STATIG version ───\n"); + + let mut sm = PipelineMachine.state_machine(); + println!("Initial: {:?}\n", sm.state()); + + println!("→ DepsMet"); + sm.handle(&PipelineEvent::DepsMet); + println!(" state: {:?}\n", sm.state()); + + println!("→ QaSkipped"); + sm.handle(&PipelineEvent::QaSkipped { + feature_branch: BranchName("feature/story-100".into()), + commits_ahead: NonZeroU32::new(3).unwrap(), + }); + println!(" state: {:?}\n", sm.state()); + + println!("→ MergeSucceeded"); + sm.handle(&PipelineEvent::MergeSucceeded { + merge_commit: GitSha("abc1234".into()), + }); + println!(" state: {:?}\n", sm.state()); + + println!("→ Accepted"); + sm.handle(&PipelineEvent::Accepted); + println!(" state: {:?}\n", sm.state()); + + println!("─── Trying invalid transition: Done → Unblock ───"); + let mut sm2 = PipelineMachine.state_machine(); + sm2.handle(&PipelineEvent::DepsMet); + sm2.handle(&PipelineEvent::QaSkipped { + feature_branch: BranchName("feature/story-101".into()), + commits_ahead: NonZeroU32::new(2).unwrap(), + }); + sm2.handle(&PipelineEvent::MergeSucceeded { + merge_commit: GitSha("def5678".into()), + }); + println!(" before Unblock: {:?}", sm2.state()); + sm2.handle(&PipelineEvent::Unblock); // silently ignored — no transition + println!(" after Unblock: {:?} (no change — Unblock is a no-op from Done)", sm2.state()); +}