From dd35a8a5309d8cd4f1a320fc456f3c645bb719e1 Mon Sep 17 00:00:00 2001 From: dave Date: Tue, 28 Apr 2026 20:56:22 +0000 Subject: [PATCH] huskies: merge 799 --- server/src/pipeline_state/mod.rs | 910 +----------------------- server/src/pipeline_state/tests.rs | 408 +++++++++++ server/src/pipeline_state/transition.rs | 254 +++++++ server/src/pipeline_state/types.rs | 235 ++++++ 4 files changed, 924 insertions(+), 883 deletions(-) create mode 100644 server/src/pipeline_state/tests.rs create mode 100644 server/src/pipeline_state/transition.rs create mode 100644 server/src/pipeline_state/types.rs diff --git a/server/src/pipeline_state/mod.rs b/server/src/pipeline_state/mod.rs index ebada0b4..f56e6c7a 100644 --- a/server/src/pipeline_state/mod.rs +++ b/server/src/pipeline_state/mod.rs @@ -11,905 +11,49 @@ //! This is a foundation module: the types, transitions, projection layer, and //! event bus are fully defined and tested here. Consumers will be migrated to //! the typed API incrementally in follow-up stories. +//! +//! ## Module layout +//! - [`types`] — newtypes, `Stage`, `ExecutionState`, `PipelineItem`, errors, label helpers +//! - [`transition`] — `PipelineEvent`, `ExecutionEvent`, `transition`, `execution_transition` +//! - [`events`] — `EventBus`, `TransitionFired`, `TransitionSubscriber` +//! - [`projection`] — CRDT → typed projection layer (`read_typed`, `read_all_typed`) +//! - [`subscribers`] — concrete subscriber stubs // Some items are exercised by tests or used only in non-active code paths; // the dead_code lint is suppressed for the module. #![allow(dead_code)] -use chrono::{DateTime, Utc}; -use serde::{Deserialize, Serialize}; -use std::fmt; -use std::num::NonZeroU32; - -// ── Newtypes ──────────────────────────────────────────────────────────────── - -#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] -pub struct StoryId(pub String); - -#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] -pub struct BranchName(pub String); - -#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] -pub struct GitSha(pub String); - -#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] -pub struct AgentName(pub String); - -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] -pub struct NodePubkey(pub [u8; 32]); - -impl fmt::Display for StoryId { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str(&self.0) - } -} - -impl fmt::Display for AgentName { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str(&self.0) - } -} - -// ── Synced pipeline stage (lives in CRDT, converges across nodes) ─────────── - -/// The pipeline stage for a work item. -/// -/// This is the SHARED state — every node sees the same Stage for a given story -/// after CRDT convergence. Notice what is NOT a field: -/// - `agent` — local execution state, not pipeline state -/// - `retry_count` — also local -/// - `blocked` — folded into `Archived { reason: Blocked { .. } }` -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub enum Stage { - /// Story exists, waiting for dependencies or auto-assign promotion. - Backlog, - - /// Story is being actively coded somewhere in the mesh. - Coding, - - /// Coder has run; gates are running. - Qa, - - /// Gates passed; ready to merge. - /// `commits_ahead: NonZeroU32` makes "Merge with nothing to merge" - /// structurally impossible (eliminates bug 519). - Merge { - feature_branch: BranchName, - commits_ahead: NonZeroU32, - }, - - /// Mergemaster squashed to master. Always carries merge metadata. - Done { - merged_at: DateTime, - merge_commit: GitSha, - }, - - /// Out of the active flow. The reason explains why. - Archived { - archived_at: DateTime, - reason: ArchiveReason, - }, -} - -/// Why a story was archived. Subsumes the old `blocked`, `merge_failure`, -/// and `review_hold` front-matter fields (story 436). -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub enum ArchiveReason { - /// Normal happy-path completion. - Completed, - /// User explicitly abandoned the story. - Abandoned, - /// Replaced by another story. - Superseded { by: StoryId }, - /// Manually blocked, awaiting human resolution. - Blocked { reason: String }, - /// Mergemaster failed beyond the retry budget. - MergeFailed { reason: String }, - /// Held in review at human request. - ReviewHeld { reason: String }, -} - -// ── Stage convenience methods ────────────────────────────────────────────── - -impl Stage { - /// Returns true if this stage is an "active" stage (Coding, Qa, or Merge). - pub fn is_active(&self) -> bool { - matches!(self, Stage::Coding | Stage::Qa | Stage::Merge { .. }) - } - - /// Returns the filesystem directory name for this stage. - pub fn dir_name(&self) -> &'static str { - stage_dir_name(self) - } - - /// Returns true if this is the Archived(Blocked) variant. - pub fn is_blocked(&self) -> bool { - matches!( - self, - Stage::Archived { - reason: ArchiveReason::Blocked { .. }, - .. - } - ) - } - - /// Parse a stage from its filesystem directory name. - /// - /// This is the single canonical conversion boundary for turning a loose - /// stage-directory string (from CRDT fields or watcher events) into a - /// typed `Stage`. Rich variants (`Done`, `Archived`, `Merge`) are - /// synthesised with zero-value fields — callers should use this only for - /// stage *classification* (e.g. `is_active()`, `matches!`), not for - /// accessing the rich metadata fields. - pub fn from_dir(s: &str) -> Option { - match s { - "1_backlog" => Some(Stage::Backlog), - "2_current" => Some(Stage::Coding), - "3_qa" => Some(Stage::Qa), - "4_merge" => Some(Stage::Merge { - feature_branch: BranchName(String::new()), - commits_ahead: NonZeroU32::new(1).expect("1 is non-zero"), - }), - "5_done" => Some(Stage::Done { - merged_at: DateTime::::UNIX_EPOCH, - merge_commit: GitSha(String::new()), - }), - "6_archived" => Some(Stage::Archived { - archived_at: DateTime::::UNIX_EPOCH, - reason: ArchiveReason::Completed, - }), - _ => None, - } - } -} - -// ── Per-node execution state ──────────────────────────────────────────────── - -/// Per-node execution tracking, stored in the CRDT under each node's pubkey. -/// -/// Each node only writes to entries where `node_pubkey == self.pubkey`, so -/// there are no inter-author CRDT merge conflicts. All nodes can READ all -/// entries to know what's happening across the mesh. -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub enum ExecutionState { - Idle, - Pending { - agent: AgentName, - since: DateTime, - }, - Running { - agent: AgentName, - started_at: DateTime, - last_heartbeat: DateTime, - }, - RateLimited { - agent: AgentName, - resume_at: DateTime, - }, - Completed { - agent: AgentName, - exit_code: i32, - completed_at: DateTime, - }, -} - -// ── Pipeline item (the aggregate) ─────────────────────────────────────────── - -/// A fully typed pipeline item. Every field is validated by construction. -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct PipelineItem { - pub story_id: StoryId, - pub name: String, - pub stage: Stage, - pub depends_on: Vec, - pub retry_count: u32, -} - -// ── Pipeline events ───────────────────────────────────────────────────────── - -/// Events that drive Stage transitions. Each variant carries the data needed -/// to construct the destination state, so the transition function can never -/// accidentally land in an underspecified state. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub enum PipelineEvent { - /// Dependencies met; promote from backlog. - DepsMet, - /// Coder starting gates. - GatesStarted, - /// Gates passed — ready to merge. - GatesPassed { - feature_branch: BranchName, - commits_ahead: NonZeroU32, - }, - /// Gates failed; retry. - GatesFailed { reason: String }, - /// QA mode is "server" — skip QA, go straight to merge. - QaSkipped { - feature_branch: BranchName, - commits_ahead: NonZeroU32, - }, - /// Mergemaster squash succeeded. - MergeSucceeded { merge_commit: GitSha }, - /// Mergemaster gave up after retry budget. - MergeFailedFinal { reason: String }, - /// Story accepted (Done → Archived). - Accepted, - /// User blocked the story. - Block { reason: String }, - /// User unblocked. - Unblock, - /// User abandoned. - Abandon, - /// Story superseded by another. - Supersede { by: StoryId }, - /// Story put on review hold. - ReviewHold { reason: String }, -} - -// ── Transition errors ─────────────────────────────────────────────────────── - -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub enum TransitionError { - InvalidTransition { from_stage: String, event: String }, -} - -impl fmt::Display for TransitionError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Self::InvalidTransition { from_stage, event } => { - write!(f, "invalid transition: {from_stage} + {event}") - } - } - } -} - -impl std::error::Error for TransitionError {} - -// ── The transition function ───────────────────────────────────────────────── - -/// Pure state transition. Takes the current Stage and an event, returns the -/// new Stage or a TransitionError. Side effects are dispatched separately -/// via the event bus. -pub fn transition(state: Stage, event: PipelineEvent) -> Result { - use PipelineEvent::*; - use Stage::*; - - let sl = stage_label(&state); - let el = event_label(&event); - let invalid = || TransitionError::InvalidTransition { - from_stage: sl.to_string(), - event: el.to_string(), - }; - - let now = Utc::now(); - - match (state, event) { - // ── Forward path ──────────────────────────────────────────────── - (Backlog, DepsMet) => Ok(Coding), - (Coding, GatesStarted) => Ok(Qa), - ( - Coding, - QaSkipped { - feature_branch, - commits_ahead, - }, - ) => Ok(Merge { - feature_branch, - commits_ahead, - }), - ( - Qa, - GatesPassed { - feature_branch, - commits_ahead, - }, - ) => Ok(Merge { - feature_branch, - commits_ahead, - }), - (Qa, GatesFailed { .. }) => Ok(Coding), - (Merge { .. }, MergeSucceeded { merge_commit }) => Ok(Done { - merged_at: now, - merge_commit, - }), - - // ── Done → Archived(Completed) ────────────────────────────────── - (Done { .. }, Accepted) => Ok(Archived { - archived_at: now, - reason: ArchiveReason::Completed, - }), - - // ── Stuck states (any active → Archived) ─────────────────────── - (Backlog, Block { reason }) - | (Coding, Block { reason }) - | (Qa, Block { reason }) - | (Merge { .. }, Block { reason }) => Ok(Archived { - archived_at: now, - reason: ArchiveReason::Blocked { reason }, - }), - - (Backlog, ReviewHold { reason }) - | (Coding, ReviewHold { reason }) - | (Qa, ReviewHold { reason }) - | (Merge { .. }, ReviewHold { reason }) => Ok(Archived { - archived_at: now, - reason: ArchiveReason::ReviewHeld { reason }, - }), - - (Merge { .. }, MergeFailedFinal { reason }) => Ok(Archived { - archived_at: now, - reason: ArchiveReason::MergeFailed { reason }, - }), - - // ── Abandon / supersede from any active or done stage ─────────── - (Backlog, Abandon) - | (Coding, Abandon) - | (Qa, Abandon) - | (Merge { .. }, Abandon) - | (Done { .. }, Abandon) => Ok(Archived { - archived_at: now, - reason: ArchiveReason::Abandoned, - }), - - (Backlog, Supersede { by }) - | (Coding, Supersede { by }) - | (Qa, Supersede { by }) - | (Merge { .. }, Supersede { by }) - | (Done { .. }, Supersede { by }) => Ok(Archived { - archived_at: now, - reason: ArchiveReason::Superseded { by }, - }), - - // ── Unblock: only from Archived(Blocked) → Backlog ───────────── - ( - Archived { - reason: ArchiveReason::Blocked { .. }, - .. - }, - Unblock, - ) => Ok(Backlog), - - // ── Everything else is invalid ────────────────────────────────── - _ => Err(invalid()), - } -} - -// ── Label helpers ─────────────────────────────────────────────────────────── - -pub fn stage_label(s: &Stage) -> &'static str { - match s { - Stage::Backlog => "Backlog", - Stage::Coding => "Coding", - Stage::Qa => "Qa", - Stage::Merge { .. } => "Merge", - Stage::Done { .. } => "Done", - Stage::Archived { .. } => "Archived", - } -} - -pub fn event_label(e: &PipelineEvent) -> &'static str { - match e { - PipelineEvent::DepsMet => "DepsMet", - PipelineEvent::GatesStarted => "GatesStarted", - PipelineEvent::GatesPassed { .. } => "GatesPassed", - PipelineEvent::GatesFailed { .. } => "GatesFailed", - PipelineEvent::QaSkipped { .. } => "QaSkipped", - PipelineEvent::MergeSucceeded { .. } => "MergeSucceeded", - PipelineEvent::MergeFailedFinal { .. } => "MergeFailedFinal", - PipelineEvent::Accepted => "Accepted", - PipelineEvent::Block { .. } => "Block", - PipelineEvent::Unblock => "Unblock", - PipelineEvent::Abandon => "Abandon", - PipelineEvent::Supersede { .. } => "Supersede", - PipelineEvent::ReviewHold { .. } => "ReviewHold", - } -} - -/// Map a Stage to the filesystem directory name used by the work pipeline. -pub fn stage_dir_name(s: &Stage) -> &'static str { - match s { - Stage::Backlog => "1_backlog", - Stage::Coding => "2_current", - Stage::Qa => "3_qa", - Stage::Merge { .. } => "4_merge", - Stage::Done { .. } => "5_done", - Stage::Archived { .. } => "6_archived", - } -} - -// ── Per-node execution state machine ──────────────────────────────────────── - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub enum ExecutionEvent { - SpawnRequested { agent: AgentName }, - SpawnedSuccessfully, - Heartbeat, - HitRateLimit { resume_at: DateTime }, - Exited { exit_code: i32 }, - Stopped, - Reset, -} - -pub fn execution_transition( - state: ExecutionState, - event: ExecutionEvent, -) -> Result { - use ExecutionEvent::*; - use ExecutionState::*; - - let now = Utc::now(); - - match (state, event) { - (Idle, SpawnRequested { agent }) => Ok(Pending { agent, since: now }), - - (Pending { agent, .. }, SpawnedSuccessfully) => Ok(Running { - agent, - started_at: now, - last_heartbeat: now, - }), - - ( - Running { - agent, started_at, .. - }, - Heartbeat, - ) => Ok(Running { - agent, - started_at, - last_heartbeat: now, - }), - - (Running { agent, .. }, HitRateLimit { resume_at }) - | (Pending { agent, .. }, HitRateLimit { resume_at }) => { - Ok(RateLimited { agent, resume_at }) - } - - (RateLimited { agent, .. }, SpawnedSuccessfully) => Ok(Running { - agent, - started_at: now, - last_heartbeat: now, - }), - - (Running { agent, .. }, Exited { exit_code }) - | (Pending { agent, .. }, Exited { exit_code }) - | (RateLimited { agent, .. }, Exited { exit_code }) => Ok(Completed { - agent, - exit_code, - completed_at: now, - }), - - (_, Stopped) | (_, Reset) => Ok(Idle), - - _ => Err(TransitionError::InvalidTransition { - from_stage: "ExecutionState".to_string(), - event: "".to_string(), - }), - } -} - mod events; mod projection; mod subscribers; +mod transition; +mod types; + +#[cfg(test)] +mod tests; + +// ── Public re-exports ─────────────────────────────────────────────────────── + +#[allow(unused_imports)] +pub use types::{ + AgentName, ArchiveReason, BranchName, ExecutionState, GitSha, NodePubkey, PipelineItem, Stage, + StoryId, TransitionError, stage_dir_name, stage_label, +}; + +#[allow(unused_imports)] +pub use transition::{ + ExecutionEvent, PipelineEvent, event_label, execution_transition, transition, +}; #[allow(unused_imports)] pub use events::{EventBus, TransitionFired, TransitionSubscriber}; + #[allow(unused_imports)] pub use projection::{ProjectionError, project_stage}; pub use projection::{read_all_typed, read_typed}; + #[allow(unused_imports)] pub use subscribers::{ AutoAssignSubscriber, FileRendererSubscriber, MatrixBotSubscriber, PipelineItemsSubscriber, WebUiBroadcastSubscriber, }; - -#[cfg(test)] -mod tests { - use super::*; - use std::num::NonZeroU32; - - fn nz(n: u32) -> NonZeroU32 { - NonZeroU32::new(n).unwrap() - } - fn fb(name: &str) -> BranchName { - BranchName(name.to_string()) - } - fn sha(s: &str) -> GitSha { - GitSha(s.to_string()) - } - fn sid(s: &str) -> StoryId { - StoryId(s.to_string()) - } - - #[test] - fn happy_path_backlog_through_archived() { - let s = Stage::Backlog; - let s = transition(s, PipelineEvent::DepsMet).unwrap(); - assert!(matches!(s, Stage::Coding)); - - let s = transition( - s, - PipelineEvent::QaSkipped { - feature_branch: fb("feature/story-1"), - commits_ahead: nz(3), - }, - ) - .unwrap(); - assert!(matches!(s, Stage::Merge { .. })); - - let s = transition( - s, - PipelineEvent::MergeSucceeded { - merge_commit: sha("abc123"), - }, - ) - .unwrap(); - assert!(matches!(s, Stage::Done { .. })); - - let s = transition(s, PipelineEvent::Accepted).unwrap(); - assert!(matches!( - s, - Stage::Archived { - reason: ArchiveReason::Completed, - .. - } - )); - } - - #[test] - fn happy_path_with_qa() { - let s = Stage::Coding; - let s = transition(s, PipelineEvent::GatesStarted).unwrap(); - assert!(matches!(s, Stage::Qa)); - - let s = transition( - s, - PipelineEvent::GatesPassed { - feature_branch: fb("feature/story-2"), - commits_ahead: nz(5), - }, - ) - .unwrap(); - assert!(matches!(s, Stage::Merge { .. })); - } - - #[test] - fn qa_retry_loop() { - let s = Stage::Coding; - let s = transition(s, PipelineEvent::GatesStarted).unwrap(); - assert!(matches!(s, Stage::Qa)); - - let s = transition( - s, - PipelineEvent::GatesFailed { - reason: "tests failed".into(), - }, - ) - .unwrap(); - assert!(matches!(s, Stage::Coding)); - } - - // ── Bug 519: Merge with zero commits is unrepresentable ───────────── - - #[test] - fn merge_with_zero_commits_is_unrepresentable() { - assert!(NonZeroU32::new(0).is_none()); - } - - // ── Invalid transitions ───────────────────────────────────────────── - - #[test] - fn cannot_jump_backlog_to_done() { - let result = transition(Stage::Backlog, PipelineEvent::Accepted); - assert!(matches!( - result, - Err(TransitionError::InvalidTransition { .. }) - )); - } - - #[test] - fn cannot_unblock_done_story() { - let s = Stage::Done { - merged_at: Utc::now(), - merge_commit: sha("abc"), - }; - let result = transition(s, PipelineEvent::Unblock); - assert!(matches!( - result, - Err(TransitionError::InvalidTransition { .. }) - )); - } - - #[test] - fn cannot_unblock_review_held_story() { - let s = Stage::Archived { - archived_at: Utc::now(), - reason: ArchiveReason::ReviewHeld { - reason: "TBD".into(), - }, - }; - let result = transition(s, PipelineEvent::Unblock); - assert!(matches!( - result, - Err(TransitionError::InvalidTransition { .. }) - )); - } - - #[test] - fn cannot_merge_from_backlog() { - let result = transition( - Stage::Backlog, - PipelineEvent::MergeSucceeded { - merge_commit: sha("abc"), - }, - ); - assert!(matches!( - result, - Err(TransitionError::InvalidTransition { .. }) - )); - } - - #[test] - fn cannot_start_gates_from_backlog() { - let result = transition(Stage::Backlog, PipelineEvent::GatesStarted); - assert!(matches!( - result, - Err(TransitionError::InvalidTransition { .. }) - )); - } - - #[test] - fn cannot_accept_from_coding() { - let result = transition(Stage::Coding, PipelineEvent::Accepted); - assert!(matches!( - result, - Err(TransitionError::InvalidTransition { .. }) - )); - } - - // ── Block from any active stage ───────────────────────────────────── - - #[test] - fn block_from_any_active_stage() { - for s in [Stage::Backlog, Stage::Coding, Stage::Qa] { - let result = transition( - s.clone(), - PipelineEvent::Block { - reason: "stuck".into(), - }, - ); - assert!(matches!( - result, - Ok(Stage::Archived { - reason: ArchiveReason::Blocked { .. }, - .. - }) - )); - } - - let m = Stage::Merge { - feature_branch: fb("f"), - commits_ahead: nz(1), - }; - let result = transition( - m, - PipelineEvent::Block { - reason: "stuck".into(), - }, - ); - assert!(matches!( - result, - Ok(Stage::Archived { - reason: ArchiveReason::Blocked { .. }, - .. - }) - )); - } - - #[test] - fn unblock_returns_to_backlog() { - let s = Stage::Archived { - archived_at: Utc::now(), - reason: ArchiveReason::Blocked { - reason: "test".into(), - }, - }; - let result = transition(s, PipelineEvent::Unblock).unwrap(); - assert!(matches!(result, Stage::Backlog)); - } - - // ── Abandon / supersede ───────────────────────────────────────────── - - #[test] - fn abandon_from_any_active_or_done() { - for s in [ - Stage::Backlog, - Stage::Coding, - Stage::Qa, - Stage::Done { - merged_at: Utc::now(), - merge_commit: sha("x"), - }, - ] { - let result = transition(s, PipelineEvent::Abandon); - assert!(matches!( - result, - Ok(Stage::Archived { - reason: ArchiveReason::Abandoned, - .. - }) - )); - } - } - - #[test] - fn supersede_from_any_active_or_done() { - for s in [ - Stage::Backlog, - Stage::Coding, - Stage::Qa, - Stage::Done { - merged_at: Utc::now(), - merge_commit: sha("x"), - }, - ] { - let result = transition( - s, - PipelineEvent::Supersede { - by: sid("999_story_new"), - }, - ); - assert!(matches!( - result, - Ok(Stage::Archived { - reason: ArchiveReason::Superseded { .. }, - .. - }) - )); - } - } - - // ── Review hold ───────────────────────────────────────────────────── - - #[test] - fn review_hold_from_active_stages() { - for s in [Stage::Backlog, Stage::Coding, Stage::Qa] { - let result = transition( - s.clone(), - PipelineEvent::ReviewHold { - reason: "review".into(), - }, - ); - assert!(matches!( - result, - Ok(Stage::Archived { - reason: ArchiveReason::ReviewHeld { .. }, - .. - }) - )); - } - } - - // ── Merge failed final ────────────────────────────────────────────── - - #[test] - fn merge_failed_final() { - let s = Stage::Merge { - feature_branch: fb("f"), - commits_ahead: nz(1), - }; - let result = transition( - s, - PipelineEvent::MergeFailedFinal { - reason: "conflicts".into(), - }, - ) - .unwrap(); - assert!(matches!( - result, - Stage::Archived { - reason: ArchiveReason::MergeFailed { .. }, - .. - } - )); - } - - #[test] - fn merge_failed_only_from_merge() { - let result = transition( - Stage::Coding, - PipelineEvent::MergeFailedFinal { - reason: "conflicts".into(), - }, - ); - assert!(matches!( - result, - Err(TransitionError::InvalidTransition { .. }) - )); - } - - // ── Execution state machine ───────────────────────────────────────── - - #[test] - fn execution_happy_path() { - let e = ExecutionState::Idle; - let e = execution_transition( - e, - ExecutionEvent::SpawnRequested { - agent: AgentName("coder-1".into()), - }, - ) - .unwrap(); - assert!(matches!(e, ExecutionState::Pending { .. })); - - let e = execution_transition(e, ExecutionEvent::SpawnedSuccessfully).unwrap(); - assert!(matches!(e, ExecutionState::Running { .. })); - - let e = execution_transition(e, ExecutionEvent::Heartbeat).unwrap(); - assert!(matches!(e, ExecutionState::Running { .. })); - - let e = execution_transition(e, ExecutionEvent::Exited { exit_code: 0 }).unwrap(); - assert!(matches!(e, ExecutionState::Completed { exit_code: 0, .. })); - } - - #[test] - fn execution_rate_limit_then_resume() { - let e = ExecutionState::Running { - agent: AgentName("coder-1".into()), - started_at: Utc::now(), - last_heartbeat: Utc::now(), - }; - let e = execution_transition( - e, - ExecutionEvent::HitRateLimit { - resume_at: Utc::now() + chrono::Duration::minutes(5), - }, - ) - .unwrap(); - assert!(matches!(e, ExecutionState::RateLimited { .. })); - - let e = execution_transition(e, ExecutionEvent::SpawnedSuccessfully).unwrap(); - assert!(matches!(e, ExecutionState::Running { .. })); - } - - #[test] - fn execution_stop_from_anywhere() { - let e = ExecutionState::Running { - agent: AgentName("coder-1".into()), - started_at: Utc::now(), - last_heartbeat: Utc::now(), - }; - let e = execution_transition(e, ExecutionEvent::Stopped).unwrap(); - assert!(matches!(e, ExecutionState::Idle)); - } - - // ── Projection tests ──────────────────────────────────────────────── - - #[test] - fn bug_502_agent_not_in_stage() { - // Bug 502 was caused by a coder agent being assigned to a story in - // Merge stage. In the typed system, Stage has no `agent` field at all. - // Agent assignment is per-node ExecutionState. This test documents that - // the old failure mode is structurally impossible. - let merge = Stage::Merge { - feature_branch: BranchName("feature/story-1".into()), - commits_ahead: NonZeroU32::new(3).unwrap(), - }; - // Stage::Merge has exactly two fields: feature_branch and commits_ahead. - // There is no way to attach an agent name to it. The type system - // prevents bug 502 by construction. - assert!(matches!(merge, Stage::Merge { .. })); - } - - // ── TransitionError Display ───────────────────────────────────────── - - #[test] - fn transition_error_display() { - let err = TransitionError::InvalidTransition { - from_stage: "Backlog".into(), - event: "Accepted".into(), - }; - assert_eq!(err.to_string(), "invalid transition: Backlog + Accepted"); - } - - // ── ProjectionError Display ───────────────────────────────────────── -} diff --git a/server/src/pipeline_state/tests.rs b/server/src/pipeline_state/tests.rs new file mode 100644 index 00000000..f476bb09 --- /dev/null +++ b/server/src/pipeline_state/tests.rs @@ -0,0 +1,408 @@ +use super::*; +use std::num::NonZeroU32; + +fn nz(n: u32) -> NonZeroU32 { + NonZeroU32::new(n).unwrap() +} +fn fb(name: &str) -> BranchName { + BranchName(name.to_string()) +} +fn sha(s: &str) -> GitSha { + GitSha(s.to_string()) +} +fn sid(s: &str) -> StoryId { + StoryId(s.to_string()) +} + +#[test] +fn happy_path_backlog_through_archived() { + let s = Stage::Backlog; + let s = transition(s, PipelineEvent::DepsMet).unwrap(); + assert!(matches!(s, Stage::Coding)); + + let s = transition( + s, + PipelineEvent::QaSkipped { + feature_branch: fb("feature/story-1"), + commits_ahead: nz(3), + }, + ) + .unwrap(); + assert!(matches!(s, Stage::Merge { .. })); + + let s = transition( + s, + PipelineEvent::MergeSucceeded { + merge_commit: sha("abc123"), + }, + ) + .unwrap(); + assert!(matches!(s, Stage::Done { .. })); + + let s = transition(s, PipelineEvent::Accepted).unwrap(); + assert!(matches!( + s, + Stage::Archived { + reason: ArchiveReason::Completed, + .. + } + )); +} + +#[test] +fn happy_path_with_qa() { + let s = Stage::Coding; + let s = transition(s, PipelineEvent::GatesStarted).unwrap(); + assert!(matches!(s, Stage::Qa)); + + let s = transition( + s, + PipelineEvent::GatesPassed { + feature_branch: fb("feature/story-2"), + commits_ahead: nz(5), + }, + ) + .unwrap(); + assert!(matches!(s, Stage::Merge { .. })); +} + +#[test] +fn qa_retry_loop() { + let s = Stage::Coding; + let s = transition(s, PipelineEvent::GatesStarted).unwrap(); + assert!(matches!(s, Stage::Qa)); + + let s = transition( + s, + PipelineEvent::GatesFailed { + reason: "tests failed".into(), + }, + ) + .unwrap(); + assert!(matches!(s, Stage::Coding)); +} + +// ── Bug 519: Merge with zero commits is unrepresentable ───────────── + +#[test] +fn merge_with_zero_commits_is_unrepresentable() { + assert!(NonZeroU32::new(0).is_none()); +} + +// ── Invalid transitions ───────────────────────────────────────────── + +#[test] +fn cannot_jump_backlog_to_done() { + let result = transition(Stage::Backlog, PipelineEvent::Accepted); + assert!(matches!( + result, + Err(TransitionError::InvalidTransition { .. }) + )); +} + +#[test] +fn cannot_unblock_done_story() { + let s = Stage::Done { + merged_at: chrono::Utc::now(), + merge_commit: sha("abc"), + }; + let result = transition(s, PipelineEvent::Unblock); + assert!(matches!( + result, + Err(TransitionError::InvalidTransition { .. }) + )); +} + +#[test] +fn cannot_unblock_review_held_story() { + let s = Stage::Archived { + archived_at: chrono::Utc::now(), + reason: ArchiveReason::ReviewHeld { + reason: "TBD".into(), + }, + }; + let result = transition(s, PipelineEvent::Unblock); + assert!(matches!( + result, + Err(TransitionError::InvalidTransition { .. }) + )); +} + +#[test] +fn cannot_merge_from_backlog() { + let result = transition( + Stage::Backlog, + PipelineEvent::MergeSucceeded { + merge_commit: sha("abc"), + }, + ); + assert!(matches!( + result, + Err(TransitionError::InvalidTransition { .. }) + )); +} + +#[test] +fn cannot_start_gates_from_backlog() { + let result = transition(Stage::Backlog, PipelineEvent::GatesStarted); + assert!(matches!( + result, + Err(TransitionError::InvalidTransition { .. }) + )); +} + +#[test] +fn cannot_accept_from_coding() { + let result = transition(Stage::Coding, PipelineEvent::Accepted); + assert!(matches!( + result, + Err(TransitionError::InvalidTransition { .. }) + )); +} + +// ── Block from any active stage ───────────────────────────────────── + +#[test] +fn block_from_any_active_stage() { + for s in [Stage::Backlog, Stage::Coding, Stage::Qa] { + let result = transition( + s.clone(), + PipelineEvent::Block { + reason: "stuck".into(), + }, + ); + assert!(matches!( + result, + Ok(Stage::Archived { + reason: ArchiveReason::Blocked { .. }, + .. + }) + )); + } + + let m = Stage::Merge { + feature_branch: fb("f"), + commits_ahead: nz(1), + }; + let result = transition( + m, + PipelineEvent::Block { + reason: "stuck".into(), + }, + ); + assert!(matches!( + result, + Ok(Stage::Archived { + reason: ArchiveReason::Blocked { .. }, + .. + }) + )); +} + +#[test] +fn unblock_returns_to_backlog() { + let s = Stage::Archived { + archived_at: chrono::Utc::now(), + reason: ArchiveReason::Blocked { + reason: "test".into(), + }, + }; + let result = transition(s, PipelineEvent::Unblock).unwrap(); + assert!(matches!(result, Stage::Backlog)); +} + +// ── Abandon / supersede ───────────────────────────────────────────── + +#[test] +fn abandon_from_any_active_or_done() { + for s in [ + Stage::Backlog, + Stage::Coding, + Stage::Qa, + Stage::Done { + merged_at: chrono::Utc::now(), + merge_commit: sha("x"), + }, + ] { + let result = transition(s, PipelineEvent::Abandon); + assert!(matches!( + result, + Ok(Stage::Archived { + reason: ArchiveReason::Abandoned, + .. + }) + )); + } +} + +#[test] +fn supersede_from_any_active_or_done() { + for s in [ + Stage::Backlog, + Stage::Coding, + Stage::Qa, + Stage::Done { + merged_at: chrono::Utc::now(), + merge_commit: sha("x"), + }, + ] { + let result = transition( + s, + PipelineEvent::Supersede { + by: sid("999_story_new"), + }, + ); + assert!(matches!( + result, + Ok(Stage::Archived { + reason: ArchiveReason::Superseded { .. }, + .. + }) + )); + } +} + +// ── Review hold ───────────────────────────────────────────────────── + +#[test] +fn review_hold_from_active_stages() { + for s in [Stage::Backlog, Stage::Coding, Stage::Qa] { + let result = transition( + s.clone(), + PipelineEvent::ReviewHold { + reason: "review".into(), + }, + ); + assert!(matches!( + result, + Ok(Stage::Archived { + reason: ArchiveReason::ReviewHeld { .. }, + .. + }) + )); + } +} + +// ── Merge failed final ────────────────────────────────────────────── + +#[test] +fn merge_failed_final() { + let s = Stage::Merge { + feature_branch: fb("f"), + commits_ahead: nz(1), + }; + let result = transition( + s, + PipelineEvent::MergeFailedFinal { + reason: "conflicts".into(), + }, + ) + .unwrap(); + assert!(matches!( + result, + Stage::Archived { + reason: ArchiveReason::MergeFailed { .. }, + .. + } + )); +} + +#[test] +fn merge_failed_only_from_merge() { + let result = transition( + Stage::Coding, + PipelineEvent::MergeFailedFinal { + reason: "conflicts".into(), + }, + ); + assert!(matches!( + result, + Err(TransitionError::InvalidTransition { .. }) + )); +} + +// ── Execution state machine ───────────────────────────────────────── + +#[test] +fn execution_happy_path() { + let e = ExecutionState::Idle; + let e = execution_transition( + e, + ExecutionEvent::SpawnRequested { + agent: AgentName("coder-1".into()), + }, + ) + .unwrap(); + assert!(matches!(e, ExecutionState::Pending { .. })); + + let e = execution_transition(e, ExecutionEvent::SpawnedSuccessfully).unwrap(); + assert!(matches!(e, ExecutionState::Running { .. })); + + let e = execution_transition(e, ExecutionEvent::Heartbeat).unwrap(); + assert!(matches!(e, ExecutionState::Running { .. })); + + let e = execution_transition(e, ExecutionEvent::Exited { exit_code: 0 }).unwrap(); + assert!(matches!(e, ExecutionState::Completed { exit_code: 0, .. })); +} + +#[test] +fn execution_rate_limit_then_resume() { + let e = ExecutionState::Running { + agent: AgentName("coder-1".into()), + started_at: chrono::Utc::now(), + last_heartbeat: chrono::Utc::now(), + }; + let e = execution_transition( + e, + ExecutionEvent::HitRateLimit { + resume_at: chrono::Utc::now() + chrono::Duration::minutes(5), + }, + ) + .unwrap(); + assert!(matches!(e, ExecutionState::RateLimited { .. })); + + let e = execution_transition(e, ExecutionEvent::SpawnedSuccessfully).unwrap(); + assert!(matches!(e, ExecutionState::Running { .. })); +} + +#[test] +fn execution_stop_from_anywhere() { + let e = ExecutionState::Running { + agent: AgentName("coder-1".into()), + started_at: chrono::Utc::now(), + last_heartbeat: chrono::Utc::now(), + }; + let e = execution_transition(e, ExecutionEvent::Stopped).unwrap(); + assert!(matches!(e, ExecutionState::Idle)); +} + +// ── Projection tests ──────────────────────────────────────────────── + +#[test] +fn bug_502_agent_not_in_stage() { + // Bug 502 was caused by a coder agent being assigned to a story in + // Merge stage. In the typed system, Stage has no `agent` field at all. + // Agent assignment is per-node ExecutionState. This test documents that + // the old failure mode is structurally impossible. + let merge = Stage::Merge { + feature_branch: BranchName("feature/story-1".into()), + commits_ahead: NonZeroU32::new(3).unwrap(), + }; + // Stage::Merge has exactly two fields: feature_branch and commits_ahead. + // There is no way to attach an agent name to it. The type system + // prevents bug 502 by construction. + assert!(matches!(merge, Stage::Merge { .. })); +} + +// ── TransitionError Display ───────────────────────────────────────── + +#[test] +fn transition_error_display() { + let err = TransitionError::InvalidTransition { + from_stage: "Backlog".into(), + event: "Accepted".into(), + }; + assert_eq!(err.to_string(), "invalid transition: Backlog + Accepted"); +} + +// ── ProjectionError Display ───────────────────────────────────────── diff --git a/server/src/pipeline_state/transition.rs b/server/src/pipeline_state/transition.rs new file mode 100644 index 00000000..8685fc35 --- /dev/null +++ b/server/src/pipeline_state/transition.rs @@ -0,0 +1,254 @@ +//! Pure state transition functions for pipeline and execution state machines. + +use chrono::Utc; +use serde::{Deserialize, Serialize}; + +use super::{ + AgentName, ArchiveReason, BranchName, ExecutionState, GitSha, Stage, StoryId, TransitionError, + stage_label, +}; + +// ── Pipeline events ───────────────────────────────────────────────────────── + +/// Events that drive Stage transitions. Each variant carries the data needed +/// to construct the destination state, so the transition function can never +/// accidentally land in an underspecified state. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum PipelineEvent { + /// Dependencies met; promote from backlog. + DepsMet, + /// Coder starting gates. + GatesStarted, + /// Gates passed — ready to merge. + GatesPassed { + feature_branch: BranchName, + commits_ahead: std::num::NonZeroU32, + }, + /// Gates failed; retry. + GatesFailed { reason: String }, + /// QA mode is "server" — skip QA, go straight to merge. + QaSkipped { + feature_branch: BranchName, + commits_ahead: std::num::NonZeroU32, + }, + /// Mergemaster squash succeeded. + MergeSucceeded { merge_commit: GitSha }, + /// Mergemaster gave up after retry budget. + MergeFailedFinal { reason: String }, + /// Story accepted (Done → Archived). + Accepted, + /// User blocked the story. + Block { reason: String }, + /// User unblocked. + Unblock, + /// User abandoned. + Abandon, + /// Story superseded by another. + Supersede { by: StoryId }, + /// Story put on review hold. + ReviewHold { reason: String }, +} + +// ── Per-node execution events ─────────────────────────────────────────────── + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum ExecutionEvent { + SpawnRequested { agent: AgentName }, + SpawnedSuccessfully, + Heartbeat, + HitRateLimit { resume_at: chrono::DateTime }, + Exited { exit_code: i32 }, + Stopped, + Reset, +} + +// ── Label helper ──────────────────────────────────────────────────────────── + +/// Human-readable label for a `PipelineEvent` variant. +pub fn event_label(e: &PipelineEvent) -> &'static str { + match e { + PipelineEvent::DepsMet => "DepsMet", + PipelineEvent::GatesStarted => "GatesStarted", + PipelineEvent::GatesPassed { .. } => "GatesPassed", + PipelineEvent::GatesFailed { .. } => "GatesFailed", + PipelineEvent::QaSkipped { .. } => "QaSkipped", + PipelineEvent::MergeSucceeded { .. } => "MergeSucceeded", + PipelineEvent::MergeFailedFinal { .. } => "MergeFailedFinal", + PipelineEvent::Accepted => "Accepted", + PipelineEvent::Block { .. } => "Block", + PipelineEvent::Unblock => "Unblock", + PipelineEvent::Abandon => "Abandon", + PipelineEvent::Supersede { .. } => "Supersede", + PipelineEvent::ReviewHold { .. } => "ReviewHold", + } +} + +// ── The pipeline transition function ──────────────────────────────────────── + +/// Pure state transition. Takes the current Stage and an event, returns the +/// new Stage or a TransitionError. Side effects are dispatched separately +/// via the event bus. +pub fn transition(state: Stage, event: PipelineEvent) -> Result { + use PipelineEvent::*; + use Stage::*; + + let sl = stage_label(&state); + let el = event_label(&event); + let invalid = || TransitionError::InvalidTransition { + from_stage: sl.to_string(), + event: el.to_string(), + }; + + let now = Utc::now(); + + match (state, event) { + // ── Forward path ──────────────────────────────────────────────── + (Backlog, DepsMet) => Ok(Coding), + (Coding, GatesStarted) => Ok(Qa), + ( + Coding, + QaSkipped { + feature_branch, + commits_ahead, + }, + ) => Ok(Merge { + feature_branch, + commits_ahead, + }), + ( + Qa, + GatesPassed { + feature_branch, + commits_ahead, + }, + ) => Ok(Merge { + feature_branch, + commits_ahead, + }), + (Qa, GatesFailed { .. }) => Ok(Coding), + (Merge { .. }, MergeSucceeded { merge_commit }) => Ok(Done { + merged_at: now, + merge_commit, + }), + + // ── Done → Archived(Completed) ────────────────────────────────── + (Done { .. }, Accepted) => Ok(Archived { + archived_at: now, + reason: ArchiveReason::Completed, + }), + + // ── Stuck states (any active → Archived) ─────────────────────── + (Backlog, Block { reason }) + | (Coding, Block { reason }) + | (Qa, Block { reason }) + | (Merge { .. }, Block { reason }) => Ok(Archived { + archived_at: now, + reason: ArchiveReason::Blocked { reason }, + }), + + (Backlog, ReviewHold { reason }) + | (Coding, ReviewHold { reason }) + | (Qa, ReviewHold { reason }) + | (Merge { .. }, ReviewHold { reason }) => Ok(Archived { + archived_at: now, + reason: ArchiveReason::ReviewHeld { reason }, + }), + + (Merge { .. }, MergeFailedFinal { reason }) => Ok(Archived { + archived_at: now, + reason: ArchiveReason::MergeFailed { reason }, + }), + + // ── Abandon / supersede from any active or done stage ─────────── + (Backlog, Abandon) + | (Coding, Abandon) + | (Qa, Abandon) + | (Merge { .. }, Abandon) + | (Done { .. }, Abandon) => Ok(Archived { + archived_at: now, + reason: ArchiveReason::Abandoned, + }), + + (Backlog, Supersede { by }) + | (Coding, Supersede { by }) + | (Qa, Supersede { by }) + | (Merge { .. }, Supersede { by }) + | (Done { .. }, Supersede { by }) => Ok(Archived { + archived_at: now, + reason: ArchiveReason::Superseded { by }, + }), + + // ── Unblock: only from Archived(Blocked) → Backlog ───────────── + ( + Archived { + reason: ArchiveReason::Blocked { .. }, + .. + }, + Unblock, + ) => Ok(Backlog), + + // ── Everything else is invalid ────────────────────────────────── + _ => Err(invalid()), + } +} + +// ── The execution state transition function ───────────────────────────────── + +/// Pure execution-state transition. Takes the current ExecutionState and an +/// ExecutionEvent, returns the new ExecutionState or a TransitionError. +pub fn execution_transition( + state: ExecutionState, + event: ExecutionEvent, +) -> Result { + use ExecutionEvent::*; + use ExecutionState::*; + + let now = Utc::now(); + + match (state, event) { + (Idle, SpawnRequested { agent }) => Ok(Pending { agent, since: now }), + + (Pending { agent, .. }, SpawnedSuccessfully) => Ok(Running { + agent, + started_at: now, + last_heartbeat: now, + }), + + ( + Running { + agent, started_at, .. + }, + Heartbeat, + ) => Ok(Running { + agent, + started_at, + last_heartbeat: now, + }), + + (Running { agent, .. }, HitRateLimit { resume_at }) + | (Pending { agent, .. }, HitRateLimit { resume_at }) => { + Ok(RateLimited { agent, resume_at }) + } + + (RateLimited { agent, .. }, SpawnedSuccessfully) => Ok(Running { + agent, + started_at: now, + last_heartbeat: now, + }), + + (Running { agent, .. }, Exited { exit_code }) + | (Pending { agent, .. }, Exited { exit_code }) + | (RateLimited { agent, .. }, Exited { exit_code }) => Ok(Completed { + agent, + exit_code, + completed_at: now, + }), + + (_, Stopped) | (_, Reset) => Ok(Idle), + + _ => Err(TransitionError::InvalidTransition { + from_stage: "ExecutionState".to_string(), + event: "".to_string(), + }), + } +} diff --git a/server/src/pipeline_state/types.rs b/server/src/pipeline_state/types.rs new file mode 100644 index 00000000..f8c58fb0 --- /dev/null +++ b/server/src/pipeline_state/types.rs @@ -0,0 +1,235 @@ +//! Core pipeline types: newtypes, stage and execution-state enums, and errors. + +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use std::fmt; +use std::num::NonZeroU32; + +// ── Newtypes ──────────────────────────────────────────────────────────────── + +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct StoryId(pub String); + +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct BranchName(pub String); + +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct GitSha(pub String); + +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct AgentName(pub String); + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct NodePubkey(pub [u8; 32]); + +impl fmt::Display for StoryId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(&self.0) + } +} + +impl fmt::Display for AgentName { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(&self.0) + } +} + +// ── Synced pipeline stage (lives in CRDT, converges across nodes) ─────────── + +/// The pipeline stage for a work item. +/// +/// This is the SHARED state — every node sees the same Stage for a given story +/// after CRDT convergence. Notice what is NOT a field: +/// - `agent` — local execution state, not pipeline state +/// - `retry_count` — also local +/// - `blocked` — folded into `Archived { reason: Blocked { .. } }` +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub enum Stage { + /// Story exists, waiting for dependencies or auto-assign promotion. + Backlog, + + /// Story is being actively coded somewhere in the mesh. + Coding, + + /// Coder has run; gates are running. + Qa, + + /// Gates passed; ready to merge. + /// `commits_ahead: NonZeroU32` makes "Merge with nothing to merge" + /// structurally impossible (eliminates bug 519). + Merge { + feature_branch: BranchName, + commits_ahead: NonZeroU32, + }, + + /// Mergemaster squashed to master. Always carries merge metadata. + Done { + merged_at: DateTime, + merge_commit: GitSha, + }, + + /// Out of the active flow. The reason explains why. + Archived { + archived_at: DateTime, + reason: ArchiveReason, + }, +} + +/// Why a story was archived. Subsumes the old `blocked`, `merge_failure`, +/// and `review_hold` front-matter fields (story 436). +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub enum ArchiveReason { + /// Normal happy-path completion. + Completed, + /// User explicitly abandoned the story. + Abandoned, + /// Replaced by another story. + Superseded { by: StoryId }, + /// Manually blocked, awaiting human resolution. + Blocked { reason: String }, + /// Mergemaster failed beyond the retry budget. + MergeFailed { reason: String }, + /// Held in review at human request. + ReviewHeld { reason: String }, +} + +// ── Stage convenience methods ────────────────────────────────────────────── + +impl Stage { + /// Returns true if this stage is an "active" stage (Coding, Qa, or Merge). + pub fn is_active(&self) -> bool { + matches!(self, Stage::Coding | Stage::Qa | Stage::Merge { .. }) + } + + /// Returns the filesystem directory name for this stage. + pub fn dir_name(&self) -> &'static str { + stage_dir_name(self) + } + + /// Returns true if this is the Archived(Blocked) variant. + pub fn is_blocked(&self) -> bool { + matches!( + self, + Stage::Archived { + reason: ArchiveReason::Blocked { .. }, + .. + } + ) + } + + /// Parse a stage from its filesystem directory name. + /// + /// This is the single canonical conversion boundary for turning a loose + /// stage-directory string (from CRDT fields or watcher events) into a + /// typed `Stage`. Rich variants (`Done`, `Archived`, `Merge`) are + /// synthesised with zero-value fields — callers should use this only for + /// stage *classification* (e.g. `is_active()`, `matches!`), not for + /// accessing the rich metadata fields. + pub fn from_dir(s: &str) -> Option { + match s { + "1_backlog" => Some(Stage::Backlog), + "2_current" => Some(Stage::Coding), + "3_qa" => Some(Stage::Qa), + "4_merge" => Some(Stage::Merge { + feature_branch: BranchName(String::new()), + commits_ahead: NonZeroU32::new(1).expect("1 is non-zero"), + }), + "5_done" => Some(Stage::Done { + merged_at: DateTime::::UNIX_EPOCH, + merge_commit: GitSha(String::new()), + }), + "6_archived" => Some(Stage::Archived { + archived_at: DateTime::::UNIX_EPOCH, + reason: ArchiveReason::Completed, + }), + _ => None, + } + } +} + +// ── Per-node execution state ──────────────────────────────────────────────── + +/// Per-node execution tracking, stored in the CRDT under each node's pubkey. +/// +/// Each node only writes to entries where `node_pubkey == self.pubkey`, so +/// there are no inter-author CRDT merge conflicts. All nodes can READ all +/// entries to know what's happening across the mesh. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub enum ExecutionState { + Idle, + Pending { + agent: AgentName, + since: DateTime, + }, + Running { + agent: AgentName, + started_at: DateTime, + last_heartbeat: DateTime, + }, + RateLimited { + agent: AgentName, + resume_at: DateTime, + }, + Completed { + agent: AgentName, + exit_code: i32, + completed_at: DateTime, + }, +} + +// ── Pipeline item (the aggregate) ─────────────────────────────────────────── + +/// A fully typed pipeline item. Every field is validated by construction. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct PipelineItem { + pub story_id: StoryId, + pub name: String, + pub stage: Stage, + pub depends_on: Vec, + pub retry_count: u32, +} + +// ── Transition errors ─────────────────────────────────────────────────────── + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub enum TransitionError { + InvalidTransition { from_stage: String, event: String }, +} + +impl fmt::Display for TransitionError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::InvalidTransition { from_stage, event } => { + write!(f, "invalid transition: {from_stage} + {event}") + } + } + } +} + +impl std::error::Error for TransitionError {} + +// ── Label helpers ─────────────────────────────────────────────────────────── + +/// Human-readable label for a `Stage` variant. +pub fn stage_label(s: &Stage) -> &'static str { + match s { + Stage::Backlog => "Backlog", + Stage::Coding => "Coding", + Stage::Qa => "Qa", + Stage::Merge { .. } => "Merge", + Stage::Done { .. } => "Done", + Stage::Archived { .. } => "Archived", + } +} + +/// Map a Stage to the filesystem directory name used by the work pipeline. +pub fn stage_dir_name(s: &Stage) -> &'static str { + match s { + Stage::Backlog => "1_backlog", + Stage::Coding => "2_current", + Stage::Qa => "3_qa", + Stage::Merge { .. } => "4_merge", + Stage::Done { .. } => "5_done", + Stage::Archived { .. } => "6_archived", + } +}