//! Pipeline state machine — design sketch (story 520) — BARE version. //! //! This is a SCRATCH EXPERIMENT, not wired into anything else in the codebase. //! "Bare" version: hand-rolled with plain Rust enums and pattern matching, //! no external state-machine library. See `pipeline_state_sketch_statig.rs` //! for a parallel version using the `statig` crate. //! //! Run with: //! cargo run --example pipeline_state_sketch_bare -p huskies //! Test with: //! cargo test --example pipeline_state_sketch_bare -p huskies //! //! Goal: demonstrate the typed pipeline state machine that should replace //! huskies's stringly-typed CRDT state. It is intentionally standalone — //! no integration with crdt_state, no persistence, no events escape this //! file. Once we agree on the shape, this becomes the foundation for the //! real implementation in src/pipeline_state.rs. //! //! The point of this version is to show that the Rust type system alone is //! enough to make impossible states unrepresentable, without needing any //! state-machine framework. use chrono::{DateTime, Utc}; use std::num::NonZeroU32; // ── Newtypes ───────────────────────────────────────────────────────────────── // // Each of these is a "wrapper around String" today, but the wrapping itself // is the point: a function that takes a `BranchName` cannot accidentally be // called with a `StoryId`. Validation can be added later (e.g. `BranchName::new` // returns `Result` and the inner `String` is private) // without changing call sites. #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct StoryId(pub String); #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct BranchName(pub String); #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct GitSha(pub String); #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct AgentName(pub String); #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub struct NodePubkey(pub [u8; 32]); // ── Synced pipeline stage (lives in CRDT, converges across nodes) ──────────── // // This is the SHARED state — every node sees the same Stage for a given story // after CRDT convergence. Local-only state (which agent is running, retry // count, rate-limit timers) lives separately in `ExecutionState` below, keyed // by node pubkey. // // Notice what is NOT a field on Stage: // - `agent` — that's local execution state, not pipeline state // - `retry_count` — also local // - `blocked` — folded into `Archived { reason: Blocked { .. } }` // // And notice what IS a field, by construction: // - Stage::Merge requires a non-zero commits_ahead (silent no-op merge is unrepresentable) // - Stage::Done requires a merge_commit (a "done" story without merge metadata is unrepresentable) // - Stage::Archived always carries a reason (no "archived but we don't know why") #[derive(Debug, Clone, PartialEq, Eq)] pub enum Stage { /// Story exists, waiting for dependencies or auto-assign promotion. Backlog, /// Story is being actively coded somewhere in the mesh. /// (Which node is local — see ExecutionState.) Coding, /// Coder has run; gates are running. Qa, /// Gates passed (or were skipped); ready to merge. /// `commits_ahead: NonZeroU32` makes "Merge with nothing to merge" structurally impossible. /// This single field eliminates today's bug 519 (silent mergemaster no-op). Merge { feature_branch: BranchName, commits_ahead: NonZeroU32, }, /// Mergemaster squashed to master. Always carries the merge metadata, /// so a "done" story is provably reachable from master. Done { merged_at: DateTime, merge_commit: GitSha, }, /// Out of the active flow. The reason explains why. Archived { archived_at: DateTime, reason: ArchiveReason, }, } #[derive(Debug, Clone, PartialEq, Eq)] pub enum ArchiveReason { /// Normal happy-path completion: accepted and filed away. Completed, /// User explicitly abandoned the story. Abandoned, /// Replaced by another story. Superseded { by: StoryId }, /// Manually blocked, awaiting human resolution. Was bug-436's `blocked: true`. Blocked { reason: String }, /// Mergemaster failed beyond the retry budget. Was bug-436's `merge_failure`. MergeFailed { reason: String }, /// Held in review at human request. Was bug-436's `review_hold`. ReviewHeld { reason: String }, } // ── Per-node execution state (lives in CRDT under node_pubkey key) ─────────── // // LOCAL-AUTHORED but GLOBALLY-READABLE. Each node only writes to entries where // node_pubkey == self, so there are no inter-author CRDT merge conflicts. Other // nodes can READ all entries to know what's happening across the mesh. // // In the real CRDT document, this would be stored as something like: // crdt.execution_state: { node_pubkey -> { story_id -> ExecutionState } } // // Why this matters operationally: // - Cross-node observability: matrix bot can show "node A is running coder-1 // on story X, node B is rate-limited on story Y" // - Heartbeat detection: if `last_heartbeat` is stale > N min, the entry is // dead (laptop closed, OOM, segfault). Other nodes can take over (story 479). // - Foundation for CRDT-based work claiming (story 479). #[derive(Debug, Clone, PartialEq, Eq)] pub enum ExecutionState { /// No agent on this node is currently working on this story. Idle, /// An agent has been requested but hasn't started its subprocess yet. Pending { agent: AgentName, since: DateTime, }, /// An agent's subprocess is alive on this node. /// `last_heartbeat` is updated periodically; if stale, the process probably died. Running { agent: AgentName, started_at: DateTime, last_heartbeat: DateTime, }, /// Agent hit a rate limit; will resume at the given time. RateLimited { agent: AgentName, resume_at: DateTime, }, /// Agent finished. exit_code disambiguates clean exit / panic / etc. Completed { agent: AgentName, exit_code: i32, completed_at: DateTime, }, } // ── Pipeline events ────────────────────────────────────────────────────────── // // Events drive Stage transitions. Each event carries any data needed to // construct the destination state, so the type signature of `transition` // guarantees we can never accidentally land in an underspecified state. // // (Compare with today's stringly-typed code, where you call // `move_story_to_merge(story_id)` and the destination state is built from // whatever happens to be in scope at the time.) #[derive(Debug, Clone)] pub enum PipelineEvent { /// All depends_on stories are in Done or Archived; promotion fires. DepsMet, /// Coder is going to start running gates. GatesStarted, /// Gates passed normally — ready to merge. Carries the data needed to /// construct Stage::Merge, so the transition can't produce a malformed merge state. GatesPassed { feature_branch: BranchName, commits_ahead: NonZeroU32, }, /// Gates failed; coder will retry. GatesFailed { reason: String }, /// QA mode is "server" — skip QA and go straight to merge. QaSkipped { feature_branch: BranchName, commits_ahead: NonZeroU32, }, /// Mergemaster successfully squashed and pushed to master. MergeSucceeded { merge_commit: GitSha }, /// Mergemaster gave up after the retry budget. MergeFailedFinal { reason: String }, /// User accepted a Done story (or auto-accept fired). Accepted, /// User explicitly blocked the story. Block { reason: String }, /// User explicitly unblocked. Unblock, /// User explicitly abandoned. Abandon, /// User marked the story as superseded by another. Supersede { by: StoryId }, /// User put the story on review hold. ReviewHold { reason: String }, } // ── Transition errors ──────────────────────────────────────────────────────── #[derive(Debug, Clone, PartialEq, Eq)] pub enum TransitionError { /// The current stage doesn't accept this event. InvalidTransition { from_stage: String, event: String, }, } // ── The transition function ────────────────────────────────────────────────── // // Pure function. Takes the current Stage and an event, returns the new Stage // or a TransitionError. The compiler enforces that every constructed Stage // has all required fields, so impossible destination states are unrepresentable. // // "What about the *side effects* of a transition?" — they don't go in here. // transition() is pure. Side effects (matrix bot notifications, file writes, // agent spawns, web UI broadcasts) are dispatched by an event bus that watches // the (before, after) tuple. See the `EventBus` sketch further down. pub fn transition(state: Stage, event: PipelineEvent) -> Result { use PipelineEvent::*; use Stage::*; let stage_label = stage_label(&state); let event_label = event_label(&event); let invalid = || TransitionError::InvalidTransition { from_stage: stage_label.to_string(), event: event_label.to_string(), }; let now = Utc::now(); match (state, event) { // ── Forward path: backlog → current → (qa →) merge → done ────────── (Backlog, DepsMet) => Ok(Coding), (Coding, GatesStarted) => Ok(Qa), (Coding, QaSkipped { feature_branch, commits_ahead }) => Ok(Merge { feature_branch, commits_ahead, }), (Qa, GatesPassed { feature_branch, commits_ahead }) => Ok(Merge { feature_branch, commits_ahead, }), // Gates failed → back to Coding for retry. (Retry-budget enforcement // lives outside this function — it's accounting on the local side.) (Qa, GatesFailed { .. }) => Ok(Coding), (Merge { .. }, MergeSucceeded { merge_commit }) => Ok(Done { merged_at: now, merge_commit, }), // ── Done → Archived(Completed) ───────────────────────────────────── (Done { .. }, Accepted) => Ok(Archived { archived_at: now, reason: ArchiveReason::Completed, }), // ── Stuck states (any active stage → Archived with a reason) ────── (Backlog, Block { reason }) | (Coding, Block { reason }) | (Qa, Block { reason }) | (Merge { .. }, Block { reason }) => Ok(Archived { archived_at: now, reason: ArchiveReason::Blocked { reason }, }), (Backlog, ReviewHold { reason }) | (Coding, ReviewHold { reason }) | (Qa, ReviewHold { reason }) | (Merge { .. }, ReviewHold { reason }) => Ok(Archived { archived_at: now, reason: ArchiveReason::ReviewHeld { reason }, }), (Merge { .. }, MergeFailedFinal { reason }) => Ok(Archived { archived_at: now, reason: ArchiveReason::MergeFailed { reason }, }), // ── Abandon / supersede from any active or done stage ────────────── (Backlog, Abandon) | (Coding, Abandon) | (Qa, Abandon) | (Merge { .. }, Abandon) | (Done { .. }, Abandon) => Ok(Archived { archived_at: now, reason: ArchiveReason::Abandoned, }), (Backlog, Supersede { by }) | (Coding, Supersede { by }) | (Qa, Supersede { by }) | (Merge { .. }, Supersede { by }) | (Done { .. }, Supersede { by }) => Ok(Archived { archived_at: now, reason: ArchiveReason::Superseded { by }, }), // ── Unblock: only from Archived(Blocked) → Backlog ───────────────── ( Archived { reason: ArchiveReason::Blocked { .. }, .. }, Unblock, ) => Ok(Backlog), // ── Everything else is invalid ───────────────────────────────────── _ => Err(invalid()), } } fn stage_label(s: &Stage) -> &'static str { match s { Stage::Backlog => "Backlog", Stage::Coding => "Coding", Stage::Qa => "Qa", Stage::Merge { .. } => "Merge", Stage::Done { .. } => "Done", Stage::Archived { .. } => "Archived", } } fn event_label(e: &PipelineEvent) -> &'static str { match e { PipelineEvent::DepsMet => "DepsMet", PipelineEvent::GatesStarted => "GatesStarted", PipelineEvent::GatesPassed { .. } => "GatesPassed", PipelineEvent::GatesFailed { .. } => "GatesFailed", PipelineEvent::QaSkipped { .. } => "QaSkipped", PipelineEvent::MergeSucceeded { .. } => "MergeSucceeded", PipelineEvent::MergeFailedFinal { .. } => "MergeFailedFinal", PipelineEvent::Accepted => "Accepted", PipelineEvent::Block { .. } => "Block", PipelineEvent::Unblock => "Unblock", PipelineEvent::Abandon => "Abandon", PipelineEvent::Supersede { .. } => "Supersede", PipelineEvent::ReviewHold { .. } => "ReviewHold", } } // ── Per-node execution state machine ───────────────────────────────────────── // // Independent of the pipeline stage machine. Tracks "what is THIS node doing // about this story right now." Multiple nodes can have different ExecutionState // for the same story_id at the same time — and that's fine, because each node // owns its own subspace in the CRDT. #[derive(Debug, Clone)] pub enum ExecutionEvent { SpawnRequested { agent: AgentName }, SpawnedSuccessfully, Heartbeat, HitRateLimit { resume_at: DateTime }, Exited { exit_code: i32 }, Stopped, Reset, } pub fn execution_transition( state: ExecutionState, event: ExecutionEvent, ) -> Result { use ExecutionEvent::*; use ExecutionState::*; let now = Utc::now(); match (state, event) { (Idle, SpawnRequested { agent }) => Ok(Pending { agent, since: now }), (Pending { agent, .. }, SpawnedSuccessfully) => Ok(Running { agent, started_at: now, last_heartbeat: now, }), ( Running { agent, started_at, .. }, Heartbeat, ) => Ok(Running { agent, started_at, last_heartbeat: now, }), (Running { agent, .. }, HitRateLimit { resume_at }) | (Pending { agent, .. }, HitRateLimit { resume_at }) => Ok(RateLimited { agent, resume_at }), (RateLimited { agent, .. }, SpawnedSuccessfully) => Ok(Running { agent, started_at: now, last_heartbeat: now, }), (Running { agent, .. }, Exited { exit_code }) | (Pending { agent, .. }, Exited { exit_code }) | (RateLimited { agent, .. }, Exited { exit_code }) => Ok(Completed { agent, exit_code, completed_at: now, }), // Stop and Reset always return to Idle, from anywhere. (_, Stopped) | (_, Reset) => Ok(Idle), _ => Err(TransitionError::InvalidTransition { from_stage: "ExecutionState".to_string(), event: "".to_string(), }), } } // ── Event bus sketch ───────────────────────────────────────────────────────── // // This is intentionally tiny — the goal is to show that the side-effect dispatch // is *separable* from the transition function. Real implementation would use // tokio broadcast channels or a proper event bus, but the pattern is the same. #[derive(Debug, Clone)] pub struct TransitionFired { pub story_id: StoryId, pub before: Stage, pub after: Stage, pub event: PipelineEvent, pub at: DateTime, } pub trait TransitionSubscriber: Send + Sync { fn name(&self) -> &'static str; fn on_transition(&self, fired: &TransitionFired); } pub struct EventBus { subscribers: Vec>, } impl EventBus { pub fn new() -> Self { Self { subscribers: Vec::new(), } } pub fn subscribe(&mut self, subscriber: S) { self.subscribers.push(Box::new(subscriber)); } pub fn fire(&self, event: TransitionFired) { for sub in &self.subscribers { sub.on_transition(&event); } } } impl Default for EventBus { fn default() -> Self { Self::new() } } // Example subscribers (just println! for the sketch): pub struct MatrixBotSub; impl TransitionSubscriber for MatrixBotSub { fn name(&self) -> &'static str { "matrix-bot" } fn on_transition(&self, f: &TransitionFired) { println!( " [matrix-bot] #{}: {} → {}", f.story_id.0, stage_label(&f.before), stage_label(&f.after) ); } } pub struct FileRendererSub; impl TransitionSubscriber for FileRendererSub { fn name(&self) -> &'static str { "filesystem" } fn on_transition(&self, f: &TransitionFired) { println!( " [filesystem] re-rendering .huskies/work/{}/{}.md", stage_dir_name(&f.after), f.story_id.0 ); } } pub struct PipelineItemsSub; impl TransitionSubscriber for PipelineItemsSub { fn name(&self) -> &'static str { "pipeline-items" } fn on_transition(&self, f: &TransitionFired) { println!( " [pipeline-items] UPDATE pipeline_items SET stage = '{}' WHERE id = '{}'", stage_dir_name(&f.after), f.story_id.0 ); } } fn stage_dir_name(s: &Stage) -> &'static str { match s { Stage::Backlog => "1_backlog", Stage::Coding => "2_current", Stage::Qa => "3_qa", Stage::Merge { .. } => "4_merge", Stage::Done { .. } => "5_done", Stage::Archived { .. } => "6_archived", } } // ── Tests ──────────────────────────────────────────────────────────────────── #[cfg(test)] mod tests { use super::*; fn nz(n: u32) -> NonZeroU32 { NonZeroU32::new(n).unwrap() } fn fb(name: &str) -> BranchName { BranchName(name.to_string()) } fn sha(s: &str) -> GitSha { GitSha(s.to_string()) } // ── Happy path ───────────────────────────────────────────────────────── #[test] fn happy_path_backlog_through_done() { let s = Stage::Backlog; let s = transition(s, PipelineEvent::DepsMet).unwrap(); assert!(matches!(s, Stage::Coding)); let s = transition( s, PipelineEvent::QaSkipped { feature_branch: fb("feature/story-1"), commits_ahead: nz(3), }, ) .unwrap(); assert!(matches!(s, Stage::Merge { .. })); let s = transition( s, PipelineEvent::MergeSucceeded { merge_commit: sha("abc123"), }, ) .unwrap(); assert!(matches!(s, Stage::Done { .. })); let s = transition(s, PipelineEvent::Accepted).unwrap(); assert!(matches!( s, Stage::Archived { reason: ArchiveReason::Completed, .. } )); } #[test] fn qa_retry_loop() { let s = Stage::Coding; let s = transition(s, PipelineEvent::GatesStarted).unwrap(); assert!(matches!(s, Stage::Qa)); let s = transition( s, PipelineEvent::GatesFailed { reason: "tests failed".into(), }, ) .unwrap(); assert!(matches!(s, Stage::Coding)); } // ── Bug 519 made unrepresentable: Merge with zero commits ahead ──────── #[test] fn merge_with_zero_commits_is_unrepresentable() { // NonZeroU32::new(0) returns None — the type system literally refuses // to construct a Merge state with no commits ahead of master. This is // bug 519's "silent mergemaster no-op" gone, structurally. assert!(NonZeroU32::new(0).is_none()); } // ── Bug 502 made unrepresentable: agent on the wrong stage ───────────── // // There's nothing to test here at the *Stage* level, because Stage doesn't // have an `agent` field at all. Agent assignment is per-node ExecutionState. // The "coder agent on a Merge stage" failure mode from bug 502 cannot be // expressed in this type system: a coder can attach to a story (writing to // its node-local ExecutionState), but the Stage::Merge variant has no slot // for an agent. The "wrong-stage agent" error is gone because the wrong // state is unrepresentable. // ── Invalid transitions return errors ────────────────────────────────── #[test] fn cannot_jump_from_backlog_to_done() { let s = Stage::Backlog; let result = transition(s, PipelineEvent::Accepted); assert!(matches!( result, Err(TransitionError::InvalidTransition { .. }) )); } #[test] fn cannot_unblock_a_done_story() { let s = Stage::Done { merged_at: Utc::now(), merge_commit: sha("abc"), }; let result = transition(s, PipelineEvent::Unblock); assert!(matches!( result, Err(TransitionError::InvalidTransition { .. }) )); } #[test] fn cannot_unblock_a_review_held_story() { // Unblock is specifically for Blocked, not for any Archived variant. let s = Stage::Archived { archived_at: Utc::now(), reason: ArchiveReason::ReviewHeld { reason: "TBD".into(), }, }; let result = transition(s, PipelineEvent::Unblock); assert!(matches!( result, Err(TransitionError::InvalidTransition { .. }) )); } // ── Block from any active stage ──────────────────────────────────────── #[test] fn block_from_any_active_stage() { for s in [Stage::Backlog, Stage::Coding, Stage::Qa] { let result = transition( s.clone(), PipelineEvent::Block { reason: "stuck".into(), }, ); assert!(matches!( result, Ok(Stage::Archived { reason: ArchiveReason::Blocked { .. }, .. }) )); } // Also from Merge: let m = Stage::Merge { feature_branch: fb("f"), commits_ahead: nz(1), }; let result = transition( m, PipelineEvent::Block { reason: "stuck".into(), }, ); assert!(matches!( result, Ok(Stage::Archived { reason: ArchiveReason::Blocked { .. }, .. }) )); } #[test] fn unblock_returns_to_backlog() { let s = Stage::Archived { archived_at: Utc::now(), reason: ArchiveReason::Blocked { reason: "test".into(), }, }; let result = transition(s, PipelineEvent::Unblock).unwrap(); assert!(matches!(result, Stage::Backlog)); } // ── Execution state ──────────────────────────────────────────────────── #[test] fn execution_happy_path() { let e = ExecutionState::Idle; let e = execution_transition( e, ExecutionEvent::SpawnRequested { agent: AgentName("coder-1".into()), }, ) .unwrap(); assert!(matches!(e, ExecutionState::Pending { .. })); let e = execution_transition(e, ExecutionEvent::SpawnedSuccessfully).unwrap(); assert!(matches!(e, ExecutionState::Running { .. })); let e = execution_transition(e, ExecutionEvent::Heartbeat).unwrap(); assert!(matches!(e, ExecutionState::Running { .. })); let e = execution_transition(e, ExecutionEvent::Exited { exit_code: 0 }).unwrap(); assert!(matches!( e, ExecutionState::Completed { exit_code: 0, .. } )); } #[test] fn execution_rate_limit_then_resume() { let e = ExecutionState::Running { agent: AgentName("coder-1".into()), started_at: Utc::now(), last_heartbeat: Utc::now(), }; let e = execution_transition( e, ExecutionEvent::HitRateLimit { resume_at: Utc::now() + chrono::Duration::minutes(5), }, ) .unwrap(); assert!(matches!(e, ExecutionState::RateLimited { .. })); let e = execution_transition(e, ExecutionEvent::SpawnedSuccessfully).unwrap(); assert!(matches!(e, ExecutionState::Running { .. })); } #[test] fn execution_stop_from_anywhere_returns_idle() { let e = ExecutionState::Running { agent: AgentName("coder-1".into()), started_at: Utc::now(), last_heartbeat: Utc::now(), }; let e = execution_transition(e, ExecutionEvent::Stopped).unwrap(); assert!(matches!(e, ExecutionState::Idle)); } } // ── main: a quick interactive demo ─────────────────────────────────────────── fn main() { println!("─── Pipeline state machine sketch (story 520) ───\n"); // Set up the event bus with three subscribers — one for each side effect. let mut bus = EventBus::new(); bus.subscribe(MatrixBotSub); bus.subscribe(PipelineItemsSub); bus.subscribe(FileRendererSub); let story_id = StoryId("100_story_demo".into()); // Helper to apply a transition + fire the bus. let mut current_stage = Stage::Backlog; let step = |bus: &EventBus, stage: &mut Stage, event: PipelineEvent| -> Result<(), TransitionError> { let before = stage.clone(); let after = transition(stage.clone(), event.clone())?; bus.fire(TransitionFired { story_id: story_id.clone(), before, after: after.clone(), event, at: Utc::now(), }); *stage = after; Ok(()) }; println!("Initial: {current_stage:?}\n"); println!("→ DepsMet"); step(&bus, &mut current_stage, PipelineEvent::DepsMet).unwrap(); println!(); println!("→ QaSkipped (qa: server, gates auto-pass)"); step( &bus, &mut current_stage, PipelineEvent::QaSkipped { feature_branch: BranchName("feature/story-100".into()), commits_ahead: NonZeroU32::new(3).unwrap(), }, ) .unwrap(); println!(); println!("→ MergeSucceeded"); step( &bus, &mut current_stage, PipelineEvent::MergeSucceeded { merge_commit: GitSha("abc1234".into()), }, ) .unwrap(); println!(); println!("→ Accepted"); step(&bus, &mut current_stage, PipelineEvent::Accepted).unwrap(); println!(); println!("Final: {current_stage:?}\n"); println!("─── Trying an invalid transition: Done → Unblock ───"); let invalid_result = transition(current_stage.clone(), PipelineEvent::Unblock); println!("Result: {invalid_result:?}"); }