refactor: split pipeline_state.rs into 4 sub-modules with co-located tests
The 1411-line pipeline_state.rs is split into: - mod.rs: types, transition(), execution_transition(), labels + transition tests (885 lines) - events.rs: TransitionFired, EventBus, TransitionSubscriber + event-bus tests (114 lines) - projection.rs: ProjectionError, TryFrom<&PipelineItemView>, read_typed + projection tests (379 lines) - subscribers.rs: 5 concrete TransitionSubscriber stubs (95 lines) Tests stay co-located. No behaviour change. All 42 pipeline_state tests pass; full suite green.
This commit is contained in:
@@ -0,0 +1,111 @@
|
||||
//! Event bus for pipeline state transitions.
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
|
||||
use super::{BranchName, PipelineEvent, Stage, StoryId};
|
||||
|
||||
/// Fired when a pipeline stage transition completes.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct TransitionFired {
|
||||
pub story_id: StoryId,
|
||||
pub before: Stage,
|
||||
pub after: Stage,
|
||||
pub event: PipelineEvent,
|
||||
pub at: DateTime<Utc>,
|
||||
}
|
||||
|
||||
/// Trait for side-effect handlers that react to pipeline transitions.
|
||||
pub trait TransitionSubscriber: Send + Sync {
|
||||
fn name(&self) -> &'static str;
|
||||
fn on_transition(&self, fired: &TransitionFired);
|
||||
}
|
||||
|
||||
pub struct EventBus {
|
||||
subscribers: Vec<Box<dyn TransitionSubscriber>>,
|
||||
}
|
||||
|
||||
impl EventBus {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
subscribers: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn subscribe<S: TransitionSubscriber + 'static>(&mut self, subscriber: S) {
|
||||
self.subscribers.push(Box::new(subscriber));
|
||||
}
|
||||
|
||||
pub fn fire(&self, event: TransitionFired) {
|
||||
for sub in &self.subscribers {
|
||||
sub.on_transition(&event);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for EventBus {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::num::NonZeroU32;
|
||||
|
||||
fn nz(n: u32) -> NonZeroU32 { NonZeroU32::new(n).unwrap() }
|
||||
fn fb(name: &str) -> BranchName { BranchName(name.to_string()) }
|
||||
fn sid(s: &str) -> StoryId { StoryId(s.to_string()) }
|
||||
|
||||
#[test]
|
||||
fn event_bus_fires_to_all_subscribers() {
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicU32, Ordering};
|
||||
|
||||
struct CountingSub(Arc<AtomicU32>);
|
||||
impl TransitionSubscriber for CountingSub {
|
||||
fn name(&self) -> &'static str {
|
||||
"counter"
|
||||
}
|
||||
fn on_transition(&self, _: &TransitionFired) {
|
||||
self.0.fetch_add(1, Ordering::SeqCst);
|
||||
}
|
||||
}
|
||||
|
||||
let counter = Arc::new(AtomicU32::new(0));
|
||||
let mut bus = EventBus::new();
|
||||
bus.subscribe(CountingSub(counter.clone()));
|
||||
bus.subscribe(CountingSub(counter.clone()));
|
||||
|
||||
bus.fire(TransitionFired {
|
||||
story_id: StoryId("test".into()),
|
||||
before: Stage::Backlog,
|
||||
after: Stage::Coding,
|
||||
event: PipelineEvent::DepsMet,
|
||||
at: Utc::now(),
|
||||
});
|
||||
|
||||
assert_eq!(counter.load(Ordering::SeqCst), 2);
|
||||
}
|
||||
|
||||
// ── Bug 502 regression: agent field is not part of Stage ────────────
|
||||
|
||||
#[test]
|
||||
fn bug_502_agent_not_in_stage() {
|
||||
// Bug 502 was caused by a coder agent being assigned to a story in
|
||||
// Merge stage. In the typed system, Stage has no `agent` field at all.
|
||||
// Agent assignment is per-node ExecutionState. This test documents that
|
||||
// the old failure mode is structurally impossible.
|
||||
let merge = Stage::Merge {
|
||||
feature_branch: BranchName("feature/story-1".into()),
|
||||
commits_ahead: NonZeroU32::new(3).unwrap(),
|
||||
};
|
||||
// Stage::Merge has exactly two fields: feature_branch and commits_ahead.
|
||||
// There is no way to attach an agent name to it. The type system
|
||||
// prevents bug 502 by construction.
|
||||
assert!(matches!(merge, Stage::Merge { .. }));
|
||||
}
|
||||
|
||||
// ── TransitionError Display ─────────────────────────────────────────
|
||||
}
|
||||
@@ -0,0 +1,885 @@
|
||||
//! Typed pipeline state machine (story 520).
|
||||
//!
|
||||
//! Replaces the stringly-typed CRDT views with strict Rust enums so that
|
||||
//! impossible states (e.g. `Stage::Merge` with zero commits, a "done" story
|
||||
//! with no merge metadata) are unrepresentable at compile time.
|
||||
//!
|
||||
//! The CRDT stays loose at the persistence layer — that's what makes it merge
|
||||
//! correctly across nodes. Every consumer above the CRDT operates on these
|
||||
//! strict typed enums, bridged by the projection layer (`TryFrom` / `From`).
|
||||
//!
|
||||
//! This is a foundation module: the types, transitions, projection layer, and
|
||||
//! event bus are fully defined and tested here. Consumers will be migrated to
|
||||
//! the typed API incrementally in follow-up stories.
|
||||
|
||||
// Foundation module — all items are exercised by tests but not yet called from
|
||||
// non-test code. The dead_code lint is suppressed until consumer migration.
|
||||
#![allow(dead_code)]
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::fmt;
|
||||
use std::num::NonZeroU32;
|
||||
|
||||
use crate::crdt_state::PipelineItemView;
|
||||
|
||||
// ── Newtypes ────────────────────────────────────────────────────────────────
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
|
||||
pub struct StoryId(pub String);
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
|
||||
pub struct BranchName(pub String);
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
|
||||
pub struct GitSha(pub String);
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
|
||||
pub struct AgentName(pub String);
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
|
||||
pub struct NodePubkey(pub [u8; 32]);
|
||||
|
||||
impl fmt::Display for StoryId {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.write_str(&self.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for AgentName {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.write_str(&self.0)
|
||||
}
|
||||
}
|
||||
|
||||
// ── Synced pipeline stage (lives in CRDT, converges across nodes) ───────────
|
||||
|
||||
/// The pipeline stage for a work item.
|
||||
///
|
||||
/// This is the SHARED state — every node sees the same Stage for a given story
|
||||
/// after CRDT convergence. Notice what is NOT a field:
|
||||
/// - `agent` — local execution state, not pipeline state
|
||||
/// - `retry_count` — also local
|
||||
/// - `blocked` — folded into `Archived { reason: Blocked { .. } }`
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub enum Stage {
|
||||
/// Story exists, waiting for dependencies or auto-assign promotion.
|
||||
Backlog,
|
||||
|
||||
/// Story is being actively coded somewhere in the mesh.
|
||||
Coding,
|
||||
|
||||
/// Coder has run; gates are running.
|
||||
Qa,
|
||||
|
||||
/// Gates passed; ready to merge.
|
||||
/// `commits_ahead: NonZeroU32` makes "Merge with nothing to merge"
|
||||
/// structurally impossible (eliminates bug 519).
|
||||
Merge {
|
||||
feature_branch: BranchName,
|
||||
commits_ahead: NonZeroU32,
|
||||
},
|
||||
|
||||
/// Mergemaster squashed to master. Always carries merge metadata.
|
||||
Done {
|
||||
merged_at: DateTime<Utc>,
|
||||
merge_commit: GitSha,
|
||||
},
|
||||
|
||||
/// Out of the active flow. The reason explains why.
|
||||
Archived {
|
||||
archived_at: DateTime<Utc>,
|
||||
reason: ArchiveReason,
|
||||
},
|
||||
}
|
||||
|
||||
/// Why a story was archived. Subsumes the old `blocked`, `merge_failure`,
|
||||
/// and `review_hold` front-matter fields (story 436).
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub enum ArchiveReason {
|
||||
/// Normal happy-path completion.
|
||||
Completed,
|
||||
/// User explicitly abandoned the story.
|
||||
Abandoned,
|
||||
/// Replaced by another story.
|
||||
Superseded { by: StoryId },
|
||||
/// Manually blocked, awaiting human resolution.
|
||||
Blocked { reason: String },
|
||||
/// Mergemaster failed beyond the retry budget.
|
||||
MergeFailed { reason: String },
|
||||
/// Held in review at human request.
|
||||
ReviewHeld { reason: String },
|
||||
}
|
||||
|
||||
// ── Stage convenience methods ──────────────────────────────────────────────
|
||||
|
||||
impl Stage {
|
||||
/// Returns true if this stage is an "active" stage (Coding, Qa, or Merge).
|
||||
pub fn is_active(&self) -> bool {
|
||||
matches!(self, Stage::Coding | Stage::Qa | Stage::Merge { .. })
|
||||
}
|
||||
|
||||
/// Returns the filesystem directory name for this stage.
|
||||
pub fn dir_name(&self) -> &'static str {
|
||||
stage_dir_name(self)
|
||||
}
|
||||
|
||||
/// Returns true if this is the Archived(Blocked) variant.
|
||||
pub fn is_blocked(&self) -> bool {
|
||||
matches!(
|
||||
self,
|
||||
Stage::Archived {
|
||||
reason: ArchiveReason::Blocked { .. },
|
||||
..
|
||||
}
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// ── Per-node execution state ────────────────────────────────────────────────
|
||||
|
||||
/// Per-node execution tracking, stored in the CRDT under each node's pubkey.
|
||||
///
|
||||
/// Each node only writes to entries where `node_pubkey == self.pubkey`, so
|
||||
/// there are no inter-author CRDT merge conflicts. All nodes can READ all
|
||||
/// entries to know what's happening across the mesh.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub enum ExecutionState {
|
||||
Idle,
|
||||
Pending {
|
||||
agent: AgentName,
|
||||
since: DateTime<Utc>,
|
||||
},
|
||||
Running {
|
||||
agent: AgentName,
|
||||
started_at: DateTime<Utc>,
|
||||
last_heartbeat: DateTime<Utc>,
|
||||
},
|
||||
RateLimited {
|
||||
agent: AgentName,
|
||||
resume_at: DateTime<Utc>,
|
||||
},
|
||||
Completed {
|
||||
agent: AgentName,
|
||||
exit_code: i32,
|
||||
completed_at: DateTime<Utc>,
|
||||
},
|
||||
}
|
||||
|
||||
// ── Pipeline item (the aggregate) ───────────────────────────────────────────
|
||||
|
||||
/// A fully typed pipeline item. Every field is validated by construction.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct PipelineItem {
|
||||
pub story_id: StoryId,
|
||||
pub name: String,
|
||||
pub stage: Stage,
|
||||
pub depends_on: Vec<StoryId>,
|
||||
pub retry_count: u32,
|
||||
}
|
||||
|
||||
// ── Pipeline events ─────────────────────────────────────────────────────────
|
||||
|
||||
/// Events that drive Stage transitions. Each variant carries the data needed
|
||||
/// to construct the destination state, so the transition function can never
|
||||
/// accidentally land in an underspecified state.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub enum PipelineEvent {
|
||||
/// Dependencies met; promote from backlog.
|
||||
DepsMet,
|
||||
/// Coder starting gates.
|
||||
GatesStarted,
|
||||
/// Gates passed — ready to merge.
|
||||
GatesPassed {
|
||||
feature_branch: BranchName,
|
||||
commits_ahead: NonZeroU32,
|
||||
},
|
||||
/// Gates failed; retry.
|
||||
GatesFailed { reason: String },
|
||||
/// QA mode is "server" — skip QA, go straight to merge.
|
||||
QaSkipped {
|
||||
feature_branch: BranchName,
|
||||
commits_ahead: NonZeroU32,
|
||||
},
|
||||
/// Mergemaster squash succeeded.
|
||||
MergeSucceeded { merge_commit: GitSha },
|
||||
/// Mergemaster gave up after retry budget.
|
||||
MergeFailedFinal { reason: String },
|
||||
/// Story accepted (Done → Archived).
|
||||
Accepted,
|
||||
/// User blocked the story.
|
||||
Block { reason: String },
|
||||
/// User unblocked.
|
||||
Unblock,
|
||||
/// User abandoned.
|
||||
Abandon,
|
||||
/// Story superseded by another.
|
||||
Supersede { by: StoryId },
|
||||
/// Story put on review hold.
|
||||
ReviewHold { reason: String },
|
||||
}
|
||||
|
||||
// ── Transition errors ───────────────────────────────────────────────────────
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub enum TransitionError {
|
||||
InvalidTransition { from_stage: String, event: String },
|
||||
}
|
||||
|
||||
impl fmt::Display for TransitionError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
Self::InvalidTransition { from_stage, event } => {
|
||||
write!(f, "invalid transition: {from_stage} + {event}")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for TransitionError {}
|
||||
|
||||
// ── The transition function ─────────────────────────────────────────────────
|
||||
|
||||
/// Pure state transition. Takes the current Stage and an event, returns the
|
||||
/// new Stage or a TransitionError. Side effects are dispatched separately
|
||||
/// via the event bus.
|
||||
pub fn transition(state: Stage, event: PipelineEvent) -> Result<Stage, TransitionError> {
|
||||
use PipelineEvent::*;
|
||||
use Stage::*;
|
||||
|
||||
let sl = stage_label(&state);
|
||||
let el = event_label(&event);
|
||||
let invalid = || TransitionError::InvalidTransition {
|
||||
from_stage: sl.to_string(),
|
||||
event: el.to_string(),
|
||||
};
|
||||
|
||||
let now = Utc::now();
|
||||
|
||||
match (state, event) {
|
||||
// ── Forward path ────────────────────────────────────────────────
|
||||
(Backlog, DepsMet) => Ok(Coding),
|
||||
(Coding, GatesStarted) => Ok(Qa),
|
||||
(
|
||||
Coding,
|
||||
QaSkipped {
|
||||
feature_branch,
|
||||
commits_ahead,
|
||||
},
|
||||
) => Ok(Merge {
|
||||
feature_branch,
|
||||
commits_ahead,
|
||||
}),
|
||||
(
|
||||
Qa,
|
||||
GatesPassed {
|
||||
feature_branch,
|
||||
commits_ahead,
|
||||
},
|
||||
) => Ok(Merge {
|
||||
feature_branch,
|
||||
commits_ahead,
|
||||
}),
|
||||
(Qa, GatesFailed { .. }) => Ok(Coding),
|
||||
(Merge { .. }, MergeSucceeded { merge_commit }) => Ok(Done {
|
||||
merged_at: now,
|
||||
merge_commit,
|
||||
}),
|
||||
|
||||
// ── Done → Archived(Completed) ──────────────────────────────────
|
||||
(Done { .. }, Accepted) => Ok(Archived {
|
||||
archived_at: now,
|
||||
reason: ArchiveReason::Completed,
|
||||
}),
|
||||
|
||||
// ── Stuck states (any active → Archived) ───────────────────────
|
||||
(Backlog, Block { reason })
|
||||
| (Coding, Block { reason })
|
||||
| (Qa, Block { reason })
|
||||
| (Merge { .. }, Block { reason }) => Ok(Archived {
|
||||
archived_at: now,
|
||||
reason: ArchiveReason::Blocked { reason },
|
||||
}),
|
||||
|
||||
(Backlog, ReviewHold { reason })
|
||||
| (Coding, ReviewHold { reason })
|
||||
| (Qa, ReviewHold { reason })
|
||||
| (Merge { .. }, ReviewHold { reason }) => Ok(Archived {
|
||||
archived_at: now,
|
||||
reason: ArchiveReason::ReviewHeld { reason },
|
||||
}),
|
||||
|
||||
(Merge { .. }, MergeFailedFinal { reason }) => Ok(Archived {
|
||||
archived_at: now,
|
||||
reason: ArchiveReason::MergeFailed { reason },
|
||||
}),
|
||||
|
||||
// ── Abandon / supersede from any active or done stage ───────────
|
||||
(Backlog, Abandon)
|
||||
| (Coding, Abandon)
|
||||
| (Qa, Abandon)
|
||||
| (Merge { .. }, Abandon)
|
||||
| (Done { .. }, Abandon) => Ok(Archived {
|
||||
archived_at: now,
|
||||
reason: ArchiveReason::Abandoned,
|
||||
}),
|
||||
|
||||
(Backlog, Supersede { by })
|
||||
| (Coding, Supersede { by })
|
||||
| (Qa, Supersede { by })
|
||||
| (Merge { .. }, Supersede { by })
|
||||
| (Done { .. }, Supersede { by }) => Ok(Archived {
|
||||
archived_at: now,
|
||||
reason: ArchiveReason::Superseded { by },
|
||||
}),
|
||||
|
||||
// ── Unblock: only from Archived(Blocked) → Backlog ─────────────
|
||||
(
|
||||
Archived {
|
||||
reason: ArchiveReason::Blocked { .. },
|
||||
..
|
||||
},
|
||||
Unblock,
|
||||
) => Ok(Backlog),
|
||||
|
||||
// ── Everything else is invalid ──────────────────────────────────
|
||||
_ => Err(invalid()),
|
||||
}
|
||||
}
|
||||
|
||||
// ── Label helpers ───────────────────────────────────────────────────────────
|
||||
|
||||
pub fn stage_label(s: &Stage) -> &'static str {
|
||||
match s {
|
||||
Stage::Backlog => "Backlog",
|
||||
Stage::Coding => "Coding",
|
||||
Stage::Qa => "Qa",
|
||||
Stage::Merge { .. } => "Merge",
|
||||
Stage::Done { .. } => "Done",
|
||||
Stage::Archived { .. } => "Archived",
|
||||
}
|
||||
}
|
||||
|
||||
pub fn event_label(e: &PipelineEvent) -> &'static str {
|
||||
match e {
|
||||
PipelineEvent::DepsMet => "DepsMet",
|
||||
PipelineEvent::GatesStarted => "GatesStarted",
|
||||
PipelineEvent::GatesPassed { .. } => "GatesPassed",
|
||||
PipelineEvent::GatesFailed { .. } => "GatesFailed",
|
||||
PipelineEvent::QaSkipped { .. } => "QaSkipped",
|
||||
PipelineEvent::MergeSucceeded { .. } => "MergeSucceeded",
|
||||
PipelineEvent::MergeFailedFinal { .. } => "MergeFailedFinal",
|
||||
PipelineEvent::Accepted => "Accepted",
|
||||
PipelineEvent::Block { .. } => "Block",
|
||||
PipelineEvent::Unblock => "Unblock",
|
||||
PipelineEvent::Abandon => "Abandon",
|
||||
PipelineEvent::Supersede { .. } => "Supersede",
|
||||
PipelineEvent::ReviewHold { .. } => "ReviewHold",
|
||||
}
|
||||
}
|
||||
|
||||
/// Map a Stage to the filesystem directory name used by the work pipeline.
|
||||
pub fn stage_dir_name(s: &Stage) -> &'static str {
|
||||
match s {
|
||||
Stage::Backlog => "1_backlog",
|
||||
Stage::Coding => "2_current",
|
||||
Stage::Qa => "3_qa",
|
||||
Stage::Merge { .. } => "4_merge",
|
||||
Stage::Done { .. } => "5_done",
|
||||
Stage::Archived { .. } => "6_archived",
|
||||
}
|
||||
}
|
||||
|
||||
// ── Per-node execution state machine ────────────────────────────────────────
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub enum ExecutionEvent {
|
||||
SpawnRequested { agent: AgentName },
|
||||
SpawnedSuccessfully,
|
||||
Heartbeat,
|
||||
HitRateLimit { resume_at: DateTime<Utc> },
|
||||
Exited { exit_code: i32 },
|
||||
Stopped,
|
||||
Reset,
|
||||
}
|
||||
|
||||
pub fn execution_transition(
|
||||
state: ExecutionState,
|
||||
event: ExecutionEvent,
|
||||
) -> Result<ExecutionState, TransitionError> {
|
||||
use ExecutionEvent::*;
|
||||
use ExecutionState::*;
|
||||
|
||||
let now = Utc::now();
|
||||
|
||||
match (state, event) {
|
||||
(Idle, SpawnRequested { agent }) => Ok(Pending { agent, since: now }),
|
||||
|
||||
(Pending { agent, .. }, SpawnedSuccessfully) => Ok(Running {
|
||||
agent,
|
||||
started_at: now,
|
||||
last_heartbeat: now,
|
||||
}),
|
||||
|
||||
(
|
||||
Running {
|
||||
agent, started_at, ..
|
||||
},
|
||||
Heartbeat,
|
||||
) => Ok(Running {
|
||||
agent,
|
||||
started_at,
|
||||
last_heartbeat: now,
|
||||
}),
|
||||
|
||||
(Running { agent, .. }, HitRateLimit { resume_at })
|
||||
| (Pending { agent, .. }, HitRateLimit { resume_at }) => {
|
||||
Ok(RateLimited { agent, resume_at })
|
||||
}
|
||||
|
||||
(RateLimited { agent, .. }, SpawnedSuccessfully) => Ok(Running {
|
||||
agent,
|
||||
started_at: now,
|
||||
last_heartbeat: now,
|
||||
}),
|
||||
|
||||
(Running { agent, .. }, Exited { exit_code })
|
||||
| (Pending { agent, .. }, Exited { exit_code })
|
||||
| (RateLimited { agent, .. }, Exited { exit_code }) => Ok(Completed {
|
||||
agent,
|
||||
exit_code,
|
||||
completed_at: now,
|
||||
}),
|
||||
|
||||
(_, Stopped) | (_, Reset) => Ok(Idle),
|
||||
|
||||
_ => Err(TransitionError::InvalidTransition {
|
||||
from_stage: "ExecutionState".to_string(),
|
||||
event: "<exec event>".to_string(),
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
mod events;
|
||||
mod projection;
|
||||
mod subscribers;
|
||||
|
||||
pub use events::{EventBus, TransitionFired, TransitionSubscriber};
|
||||
pub use projection::{ProjectionError, project_stage, read_all_typed, read_typed};
|
||||
pub use subscribers::{
|
||||
AutoAssignSubscriber, FileRendererSubscriber, MatrixBotSubscriber, PipelineItemsSubscriber,
|
||||
WebUiBroadcastSubscriber,
|
||||
};
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::num::NonZeroU32;
|
||||
|
||||
fn nz(n: u32) -> NonZeroU32 {
|
||||
NonZeroU32::new(n).unwrap()
|
||||
}
|
||||
fn fb(name: &str) -> BranchName {
|
||||
BranchName(name.to_string())
|
||||
}
|
||||
fn sha(s: &str) -> GitSha {
|
||||
GitSha(s.to_string())
|
||||
}
|
||||
fn sid(s: &str) -> StoryId {
|
||||
StoryId(s.to_string())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn happy_path_backlog_through_archived() {
|
||||
let s = Stage::Backlog;
|
||||
let s = transition(s, PipelineEvent::DepsMet).unwrap();
|
||||
assert!(matches!(s, Stage::Coding));
|
||||
|
||||
let s = transition(
|
||||
s,
|
||||
PipelineEvent::QaSkipped {
|
||||
feature_branch: fb("feature/story-1"),
|
||||
commits_ahead: nz(3),
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
assert!(matches!(s, Stage::Merge { .. }));
|
||||
|
||||
let s = transition(
|
||||
s,
|
||||
PipelineEvent::MergeSucceeded {
|
||||
merge_commit: sha("abc123"),
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
assert!(matches!(s, Stage::Done { .. }));
|
||||
|
||||
let s = transition(s, PipelineEvent::Accepted).unwrap();
|
||||
assert!(matches!(
|
||||
s,
|
||||
Stage::Archived {
|
||||
reason: ArchiveReason::Completed,
|
||||
..
|
||||
}
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn happy_path_with_qa() {
|
||||
let s = Stage::Coding;
|
||||
let s = transition(s, PipelineEvent::GatesStarted).unwrap();
|
||||
assert!(matches!(s, Stage::Qa));
|
||||
|
||||
let s = transition(
|
||||
s,
|
||||
PipelineEvent::GatesPassed {
|
||||
feature_branch: fb("feature/story-2"),
|
||||
commits_ahead: nz(5),
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
assert!(matches!(s, Stage::Merge { .. }));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn qa_retry_loop() {
|
||||
let s = Stage::Coding;
|
||||
let s = transition(s, PipelineEvent::GatesStarted).unwrap();
|
||||
assert!(matches!(s, Stage::Qa));
|
||||
|
||||
let s = transition(
|
||||
s,
|
||||
PipelineEvent::GatesFailed {
|
||||
reason: "tests failed".into(),
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
assert!(matches!(s, Stage::Coding));
|
||||
}
|
||||
|
||||
// ── Bug 519: Merge with zero commits is unrepresentable ─────────────
|
||||
|
||||
#[test]
|
||||
fn merge_with_zero_commits_is_unrepresentable() {
|
||||
assert!(NonZeroU32::new(0).is_none());
|
||||
}
|
||||
|
||||
// ── Invalid transitions ─────────────────────────────────────────────
|
||||
|
||||
#[test]
|
||||
fn cannot_jump_backlog_to_done() {
|
||||
let result = transition(Stage::Backlog, PipelineEvent::Accepted);
|
||||
assert!(matches!(
|
||||
result,
|
||||
Err(TransitionError::InvalidTransition { .. })
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cannot_unblock_done_story() {
|
||||
let s = Stage::Done {
|
||||
merged_at: Utc::now(),
|
||||
merge_commit: sha("abc"),
|
||||
};
|
||||
let result = transition(s, PipelineEvent::Unblock);
|
||||
assert!(matches!(
|
||||
result,
|
||||
Err(TransitionError::InvalidTransition { .. })
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cannot_unblock_review_held_story() {
|
||||
let s = Stage::Archived {
|
||||
archived_at: Utc::now(),
|
||||
reason: ArchiveReason::ReviewHeld {
|
||||
reason: "TBD".into(),
|
||||
},
|
||||
};
|
||||
let result = transition(s, PipelineEvent::Unblock);
|
||||
assert!(matches!(
|
||||
result,
|
||||
Err(TransitionError::InvalidTransition { .. })
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cannot_merge_from_backlog() {
|
||||
let result = transition(
|
||||
Stage::Backlog,
|
||||
PipelineEvent::MergeSucceeded {
|
||||
merge_commit: sha("abc"),
|
||||
},
|
||||
);
|
||||
assert!(matches!(
|
||||
result,
|
||||
Err(TransitionError::InvalidTransition { .. })
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cannot_start_gates_from_backlog() {
|
||||
let result = transition(Stage::Backlog, PipelineEvent::GatesStarted);
|
||||
assert!(matches!(
|
||||
result,
|
||||
Err(TransitionError::InvalidTransition { .. })
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cannot_accept_from_coding() {
|
||||
let result = transition(Stage::Coding, PipelineEvent::Accepted);
|
||||
assert!(matches!(
|
||||
result,
|
||||
Err(TransitionError::InvalidTransition { .. })
|
||||
));
|
||||
}
|
||||
|
||||
// ── Block from any active stage ─────────────────────────────────────
|
||||
|
||||
#[test]
|
||||
fn block_from_any_active_stage() {
|
||||
for s in [Stage::Backlog, Stage::Coding, Stage::Qa] {
|
||||
let result = transition(
|
||||
s.clone(),
|
||||
PipelineEvent::Block {
|
||||
reason: "stuck".into(),
|
||||
},
|
||||
);
|
||||
assert!(matches!(
|
||||
result,
|
||||
Ok(Stage::Archived {
|
||||
reason: ArchiveReason::Blocked { .. },
|
||||
..
|
||||
})
|
||||
));
|
||||
}
|
||||
|
||||
let m = Stage::Merge {
|
||||
feature_branch: fb("f"),
|
||||
commits_ahead: nz(1),
|
||||
};
|
||||
let result = transition(
|
||||
m,
|
||||
PipelineEvent::Block {
|
||||
reason: "stuck".into(),
|
||||
},
|
||||
);
|
||||
assert!(matches!(
|
||||
result,
|
||||
Ok(Stage::Archived {
|
||||
reason: ArchiveReason::Blocked { .. },
|
||||
..
|
||||
})
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn unblock_returns_to_backlog() {
|
||||
let s = Stage::Archived {
|
||||
archived_at: Utc::now(),
|
||||
reason: ArchiveReason::Blocked {
|
||||
reason: "test".into(),
|
||||
},
|
||||
};
|
||||
let result = transition(s, PipelineEvent::Unblock).unwrap();
|
||||
assert!(matches!(result, Stage::Backlog));
|
||||
}
|
||||
|
||||
// ── Abandon / supersede ─────────────────────────────────────────────
|
||||
|
||||
#[test]
|
||||
fn abandon_from_any_active_or_done() {
|
||||
for s in [
|
||||
Stage::Backlog,
|
||||
Stage::Coding,
|
||||
Stage::Qa,
|
||||
Stage::Done {
|
||||
merged_at: Utc::now(),
|
||||
merge_commit: sha("x"),
|
||||
},
|
||||
] {
|
||||
let result = transition(s, PipelineEvent::Abandon);
|
||||
assert!(matches!(
|
||||
result,
|
||||
Ok(Stage::Archived {
|
||||
reason: ArchiveReason::Abandoned,
|
||||
..
|
||||
})
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn supersede_from_any_active_or_done() {
|
||||
for s in [
|
||||
Stage::Backlog,
|
||||
Stage::Coding,
|
||||
Stage::Qa,
|
||||
Stage::Done {
|
||||
merged_at: Utc::now(),
|
||||
merge_commit: sha("x"),
|
||||
},
|
||||
] {
|
||||
let result = transition(
|
||||
s,
|
||||
PipelineEvent::Supersede {
|
||||
by: sid("999_story_new"),
|
||||
},
|
||||
);
|
||||
assert!(matches!(
|
||||
result,
|
||||
Ok(Stage::Archived {
|
||||
reason: ArchiveReason::Superseded { .. },
|
||||
..
|
||||
})
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
// ── Review hold ─────────────────────────────────────────────────────
|
||||
|
||||
#[test]
|
||||
fn review_hold_from_active_stages() {
|
||||
for s in [Stage::Backlog, Stage::Coding, Stage::Qa] {
|
||||
let result = transition(
|
||||
s.clone(),
|
||||
PipelineEvent::ReviewHold {
|
||||
reason: "review".into(),
|
||||
},
|
||||
);
|
||||
assert!(matches!(
|
||||
result,
|
||||
Ok(Stage::Archived {
|
||||
reason: ArchiveReason::ReviewHeld { .. },
|
||||
..
|
||||
})
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
// ── Merge failed final ──────────────────────────────────────────────
|
||||
|
||||
#[test]
|
||||
fn merge_failed_final() {
|
||||
let s = Stage::Merge {
|
||||
feature_branch: fb("f"),
|
||||
commits_ahead: nz(1),
|
||||
};
|
||||
let result = transition(
|
||||
s,
|
||||
PipelineEvent::MergeFailedFinal {
|
||||
reason: "conflicts".into(),
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
assert!(matches!(
|
||||
result,
|
||||
Stage::Archived {
|
||||
reason: ArchiveReason::MergeFailed { .. },
|
||||
..
|
||||
}
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn merge_failed_only_from_merge() {
|
||||
let result = transition(
|
||||
Stage::Coding,
|
||||
PipelineEvent::MergeFailedFinal {
|
||||
reason: "conflicts".into(),
|
||||
},
|
||||
);
|
||||
assert!(matches!(
|
||||
result,
|
||||
Err(TransitionError::InvalidTransition { .. })
|
||||
));
|
||||
}
|
||||
|
||||
// ── Execution state machine ─────────────────────────────────────────
|
||||
|
||||
#[test]
|
||||
fn execution_happy_path() {
|
||||
let e = ExecutionState::Idle;
|
||||
let e = execution_transition(
|
||||
e,
|
||||
ExecutionEvent::SpawnRequested {
|
||||
agent: AgentName("coder-1".into()),
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
assert!(matches!(e, ExecutionState::Pending { .. }));
|
||||
|
||||
let e = execution_transition(e, ExecutionEvent::SpawnedSuccessfully).unwrap();
|
||||
assert!(matches!(e, ExecutionState::Running { .. }));
|
||||
|
||||
let e = execution_transition(e, ExecutionEvent::Heartbeat).unwrap();
|
||||
assert!(matches!(e, ExecutionState::Running { .. }));
|
||||
|
||||
let e = execution_transition(e, ExecutionEvent::Exited { exit_code: 0 }).unwrap();
|
||||
assert!(matches!(e, ExecutionState::Completed { exit_code: 0, .. }));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn execution_rate_limit_then_resume() {
|
||||
let e = ExecutionState::Running {
|
||||
agent: AgentName("coder-1".into()),
|
||||
started_at: Utc::now(),
|
||||
last_heartbeat: Utc::now(),
|
||||
};
|
||||
let e = execution_transition(
|
||||
e,
|
||||
ExecutionEvent::HitRateLimit {
|
||||
resume_at: Utc::now() + chrono::Duration::minutes(5),
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
assert!(matches!(e, ExecutionState::RateLimited { .. }));
|
||||
|
||||
let e = execution_transition(e, ExecutionEvent::SpawnedSuccessfully).unwrap();
|
||||
assert!(matches!(e, ExecutionState::Running { .. }));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn execution_stop_from_anywhere() {
|
||||
let e = ExecutionState::Running {
|
||||
agent: AgentName("coder-1".into()),
|
||||
started_at: Utc::now(),
|
||||
last_heartbeat: Utc::now(),
|
||||
};
|
||||
let e = execution_transition(e, ExecutionEvent::Stopped).unwrap();
|
||||
assert!(matches!(e, ExecutionState::Idle));
|
||||
}
|
||||
|
||||
// ── Projection tests ────────────────────────────────────────────────
|
||||
|
||||
#[test]
|
||||
fn bug_502_agent_not_in_stage() {
|
||||
// Bug 502 was caused by a coder agent being assigned to a story in
|
||||
// Merge stage. In the typed system, Stage has no `agent` field at all.
|
||||
// Agent assignment is per-node ExecutionState. This test documents that
|
||||
// the old failure mode is structurally impossible.
|
||||
let merge = Stage::Merge {
|
||||
feature_branch: BranchName("feature/story-1".into()),
|
||||
commits_ahead: NonZeroU32::new(3).unwrap(),
|
||||
};
|
||||
// Stage::Merge has exactly two fields: feature_branch and commits_ahead.
|
||||
// There is no way to attach an agent name to it. The type system
|
||||
// prevents bug 502 by construction.
|
||||
assert!(matches!(merge, Stage::Merge { .. }));
|
||||
}
|
||||
|
||||
// ── TransitionError Display ─────────────────────────────────────────
|
||||
|
||||
#[test]
|
||||
fn transition_error_display() {
|
||||
let err = TransitionError::InvalidTransition {
|
||||
from_stage: "Backlog".into(),
|
||||
event: "Accepted".into(),
|
||||
};
|
||||
assert_eq!(err.to_string(), "invalid transition: Backlog + Accepted");
|
||||
}
|
||||
|
||||
// ── ProjectionError Display ─────────────────────────────────────────
|
||||
}
|
||||
@@ -0,0 +1,391 @@
|
||||
//! Projection layer — converts loose CRDT views into typed `PipelineItem` enums.
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
use std::fmt;
|
||||
use std::num::NonZeroU32;
|
||||
|
||||
use crate::crdt_state::{PipelineItemView, read_all_items, read_item};
|
||||
|
||||
use super::{
|
||||
ArchiveReason, BranchName, ExecutionState, GitSha, PipelineItem, Stage, StoryId,
|
||||
stage_dir_name,
|
||||
};
|
||||
|
||||
/// Errors from projecting loose CRDT data into typed enums.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub enum ProjectionError {
|
||||
/// The stage string from the CRDT doesn't map to any known Stage variant.
|
||||
UnknownStage(String),
|
||||
/// A required field is missing from the CRDT data.
|
||||
MissingField(&'static str),
|
||||
/// A field has an invalid value.
|
||||
InvalidField { field: &'static str, detail: String },
|
||||
}
|
||||
|
||||
impl fmt::Display for ProjectionError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
Self::UnknownStage(s) => write!(f, "unknown stage: {s:?}"),
|
||||
Self::MissingField(field) => write!(f, "missing required field: {field}"),
|
||||
Self::InvalidField { field, detail } => {
|
||||
write!(f, "invalid field {field}: {detail}")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for ProjectionError {}
|
||||
|
||||
// ── Projection: PipelineItemView → PipelineItem ─────────────────────────────
|
||||
|
||||
impl TryFrom<&PipelineItemView> for PipelineItem {
|
||||
type Error = ProjectionError;
|
||||
|
||||
fn try_from(view: &PipelineItemView) -> Result<Self, ProjectionError> {
|
||||
let story_id = StoryId(view.story_id.clone());
|
||||
let name = view.name.clone().unwrap_or_default();
|
||||
|
||||
let depends_on: Vec<StoryId> = view
|
||||
.depends_on
|
||||
.as_ref()
|
||||
.map(|deps| deps.iter().map(|d| StoryId(d.to_string())).collect())
|
||||
.unwrap_or_default();
|
||||
|
||||
let retry_count = view.retry_count.unwrap_or(0).max(0) as u32;
|
||||
|
||||
let stage = project_stage(view)?;
|
||||
|
||||
Ok(PipelineItem {
|
||||
story_id,
|
||||
name,
|
||||
stage,
|
||||
depends_on,
|
||||
retry_count,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Project the stage string + associated fields from a PipelineItemView into
|
||||
/// a typed Stage enum. This is the one carefully-controlled boundary where
|
||||
/// loose CRDT data becomes typed.
|
||||
pub fn project_stage(view: &PipelineItemView) -> Result<Stage, ProjectionError> {
|
||||
match view.stage.as_str() {
|
||||
"1_backlog" => Ok(Stage::Backlog),
|
||||
"2_current" => Ok(Stage::Coding),
|
||||
"3_qa" => Ok(Stage::Qa),
|
||||
"4_merge" => {
|
||||
// Merge stage in the current CRDT doesn't carry feature_branch or
|
||||
// commits_ahead — those are computed at transition time. For
|
||||
// projection from existing CRDT data, we synthesize defaults.
|
||||
// The feature branch follows the naming convention.
|
||||
let branch = format!("feature/story-{}", view.story_id);
|
||||
// Existing CRDT data doesn't track commits_ahead, so we use 1 as
|
||||
// a safe non-zero default (the item is in merge, so there must be
|
||||
// at least one commit).
|
||||
Ok(Stage::Merge {
|
||||
feature_branch: BranchName(branch),
|
||||
commits_ahead: NonZeroU32::new(1).expect("1 is non-zero"),
|
||||
})
|
||||
}
|
||||
"5_done" => {
|
||||
// Use the stored merged_at timestamp if present. Legacy items
|
||||
// that pre-date this field have merged_at = None, so we fall back
|
||||
// to UNIX_EPOCH, which makes them older than any retention window
|
||||
// and therefore eligible for immediate sweep to 6_archived.
|
||||
let merged_at = view
|
||||
.merged_at
|
||||
.map(|ts| {
|
||||
DateTime::from_timestamp(ts as i64, 0).unwrap_or(DateTime::<Utc>::UNIX_EPOCH)
|
||||
})
|
||||
.unwrap_or(DateTime::<Utc>::UNIX_EPOCH);
|
||||
Ok(Stage::Done {
|
||||
merged_at,
|
||||
merge_commit: GitSha("legacy".to_string()),
|
||||
})
|
||||
}
|
||||
"6_archived" => {
|
||||
// Determine the archive reason from the CRDT fields.
|
||||
let reason = if view.blocked == Some(true) {
|
||||
ArchiveReason::Blocked {
|
||||
reason: "migrated from legacy blocked field".to_string(),
|
||||
}
|
||||
} else {
|
||||
// Default to Completed for legacy archived items.
|
||||
ArchiveReason::Completed
|
||||
};
|
||||
Ok(Stage::Archived {
|
||||
archived_at: Utc::now(),
|
||||
reason,
|
||||
})
|
||||
}
|
||||
other => Err(ProjectionError::UnknownStage(other.to_string())),
|
||||
}
|
||||
}
|
||||
|
||||
// ── Reverse projection: PipelineItem → stage dir string ─────────────────────
|
||||
|
||||
impl PipelineItem {
|
||||
/// Convert back to the loose fields that the CRDT write path expects.
|
||||
/// Returns `(stage_dir, blocked)`.
|
||||
pub fn to_crdt_fields(&self) -> (&'static str, bool) {
|
||||
let dir = stage_dir_name(&self.stage);
|
||||
let blocked = matches!(
|
||||
self.stage,
|
||||
Stage::Archived {
|
||||
reason: ArchiveReason::Blocked { .. },
|
||||
..
|
||||
}
|
||||
);
|
||||
(dir, blocked)
|
||||
}
|
||||
}
|
||||
|
||||
// ── Bridge to existing CRDT reads ───────────────────────────────────────────
|
||||
|
||||
/// Read all pipeline items from the CRDT and project them into typed enums.
|
||||
///
|
||||
/// Items that fail projection (e.g. unknown stage strings from a future
|
||||
/// version) are logged and skipped — they don't poison the entire read.
|
||||
pub fn read_all_typed() -> Vec<PipelineItem> {
|
||||
let Some(views) = crate::crdt_state::read_all_items() else {
|
||||
return Vec::new();
|
||||
};
|
||||
views
|
||||
.iter()
|
||||
.filter_map(|v| match PipelineItem::try_from(v) {
|
||||
Ok(item) => Some(item),
|
||||
Err(e) => {
|
||||
crate::slog!(
|
||||
"[pipeline_state] projection error for '{}': {e}",
|
||||
v.story_id
|
||||
);
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Read a single pipeline item by story_id and project it into the typed enum.
|
||||
pub fn read_typed(story_id: &str) -> Result<Option<PipelineItem>, ProjectionError> {
|
||||
let Some(view) = crate::crdt_state::read_item(story_id) else {
|
||||
return Ok(None);
|
||||
};
|
||||
PipelineItem::try_from(&view).map(Some)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use chrono::TimeZone;
|
||||
use std::num::NonZeroU32;
|
||||
|
||||
fn nz(n: u32) -> NonZeroU32 { NonZeroU32::new(n).unwrap() }
|
||||
fn fb(name: &str) -> BranchName { BranchName(name.to_string()) }
|
||||
fn sha(s: &str) -> GitSha { GitSha(s.to_string()) }
|
||||
fn sid(s: &str) -> StoryId { StoryId(s.to_string()) }
|
||||
|
||||
#[test]
|
||||
fn project_backlog_item() {
|
||||
let view = PipelineItemView {
|
||||
story_id: "42_story_test".to_string(),
|
||||
stage: "1_backlog".to_string(),
|
||||
name: Some("Test Story".to_string()),
|
||||
agent: None,
|
||||
retry_count: None,
|
||||
blocked: None,
|
||||
depends_on: Some(vec![10, 20]),
|
||||
claimed_by: None,
|
||||
claimed_at: None,
|
||||
merged_at: None,
|
||||
};
|
||||
let item = PipelineItem::try_from(&view).unwrap();
|
||||
assert_eq!(item.story_id, StoryId("42_story_test".to_string()));
|
||||
assert_eq!(item.name, "Test Story");
|
||||
assert!(matches!(item.stage, Stage::Backlog));
|
||||
assert_eq!(item.depends_on.len(), 2);
|
||||
assert_eq!(item.retry_count, 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn project_current_item() {
|
||||
let view = PipelineItemView {
|
||||
story_id: "42_story_test".to_string(),
|
||||
stage: "2_current".to_string(),
|
||||
name: Some("Test".to_string()),
|
||||
agent: Some("coder-1".to_string()),
|
||||
retry_count: Some(2),
|
||||
blocked: None,
|
||||
depends_on: None,
|
||||
claimed_by: None,
|
||||
claimed_at: None,
|
||||
merged_at: None,
|
||||
};
|
||||
let item = PipelineItem::try_from(&view).unwrap();
|
||||
assert!(matches!(item.stage, Stage::Coding));
|
||||
assert_eq!(item.retry_count, 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn project_merge_item() {
|
||||
let view = PipelineItemView {
|
||||
story_id: "42_story_test".to_string(),
|
||||
stage: "4_merge".to_string(),
|
||||
name: Some("Test".to_string()),
|
||||
agent: None,
|
||||
retry_count: None,
|
||||
blocked: None,
|
||||
depends_on: None,
|
||||
claimed_by: None,
|
||||
claimed_at: None,
|
||||
merged_at: None,
|
||||
};
|
||||
let item = PipelineItem::try_from(&view).unwrap();
|
||||
assert!(matches!(item.stage, Stage::Merge { .. }));
|
||||
if let Stage::Merge {
|
||||
feature_branch,
|
||||
commits_ahead,
|
||||
} = &item.stage
|
||||
{
|
||||
assert_eq!(feature_branch.0, "feature/story-42_story_test");
|
||||
assert_eq!(commits_ahead.get(), 1);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn project_archived_blocked_item() {
|
||||
let view = PipelineItemView {
|
||||
story_id: "42_story_test".to_string(),
|
||||
stage: "6_archived".to_string(),
|
||||
name: Some("Test".to_string()),
|
||||
agent: None,
|
||||
retry_count: None,
|
||||
blocked: Some(true),
|
||||
depends_on: None,
|
||||
claimed_by: None,
|
||||
claimed_at: None,
|
||||
merged_at: None,
|
||||
};
|
||||
let item = PipelineItem::try_from(&view).unwrap();
|
||||
assert!(matches!(
|
||||
item.stage,
|
||||
Stage::Archived {
|
||||
reason: ArchiveReason::Blocked { .. },
|
||||
..
|
||||
}
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn project_archived_completed_item() {
|
||||
let view = PipelineItemView {
|
||||
story_id: "42_story_test".to_string(),
|
||||
stage: "6_archived".to_string(),
|
||||
name: Some("Test".to_string()),
|
||||
agent: None,
|
||||
retry_count: None,
|
||||
blocked: Some(false),
|
||||
depends_on: None,
|
||||
claimed_by: None,
|
||||
claimed_at: None,
|
||||
merged_at: None,
|
||||
};
|
||||
let item = PipelineItem::try_from(&view).unwrap();
|
||||
assert!(matches!(
|
||||
item.stage,
|
||||
Stage::Archived {
|
||||
reason: ArchiveReason::Completed,
|
||||
..
|
||||
}
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn project_unknown_stage_returns_error() {
|
||||
let view = PipelineItemView {
|
||||
story_id: "42_story_test".to_string(),
|
||||
stage: "9_invalid".to_string(),
|
||||
name: Some("Test".to_string()),
|
||||
agent: None,
|
||||
retry_count: None,
|
||||
blocked: None,
|
||||
depends_on: None,
|
||||
claimed_by: None,
|
||||
claimed_at: None,
|
||||
merged_at: None,
|
||||
};
|
||||
let result = PipelineItem::try_from(&view);
|
||||
assert!(matches!(
|
||||
result,
|
||||
Err(ProjectionError::UnknownStage(s)) if s == "9_invalid"
|
||||
));
|
||||
}
|
||||
|
||||
// ── Reverse projection tests ────────────────────────────────────────
|
||||
|
||||
#[test]
|
||||
fn reverse_projection_stage_dirs() {
|
||||
let cases: Vec<(Stage, &str, bool)> = vec![
|
||||
(Stage::Backlog, "1_backlog", false),
|
||||
(Stage::Coding, "2_current", false),
|
||||
(Stage::Qa, "3_qa", false),
|
||||
(
|
||||
Stage::Merge {
|
||||
feature_branch: fb("f"),
|
||||
commits_ahead: nz(1),
|
||||
},
|
||||
"4_merge",
|
||||
false,
|
||||
),
|
||||
(
|
||||
Stage::Done {
|
||||
merged_at: Utc::now(),
|
||||
merge_commit: sha("abc"),
|
||||
},
|
||||
"5_done",
|
||||
false,
|
||||
),
|
||||
(
|
||||
Stage::Archived {
|
||||
archived_at: Utc::now(),
|
||||
reason: ArchiveReason::Completed,
|
||||
},
|
||||
"6_archived",
|
||||
false,
|
||||
),
|
||||
(
|
||||
Stage::Archived {
|
||||
archived_at: Utc::now(),
|
||||
reason: ArchiveReason::Blocked {
|
||||
reason: "stuck".into(),
|
||||
},
|
||||
},
|
||||
"6_archived",
|
||||
true,
|
||||
),
|
||||
];
|
||||
|
||||
for (stage, expected_dir, expected_blocked) in cases {
|
||||
let item = PipelineItem {
|
||||
story_id: StoryId("test".into()),
|
||||
name: "test".into(),
|
||||
stage,
|
||||
depends_on: vec![],
|
||||
retry_count: 0,
|
||||
};
|
||||
let (dir, blocked) = item.to_crdt_fields();
|
||||
assert_eq!(dir, expected_dir);
|
||||
assert_eq!(blocked, expected_blocked);
|
||||
}
|
||||
}
|
||||
|
||||
// ── Event bus tests ─────────────────────────────────────────────────
|
||||
|
||||
#[test]
|
||||
fn projection_error_display() {
|
||||
let err = ProjectionError::UnknownStage("9_invalid".into());
|
||||
assert_eq!(err.to_string(), "unknown stage: \"9_invalid\"");
|
||||
|
||||
let err = ProjectionError::MissingField("story_id");
|
||||
assert_eq!(err.to_string(), "missing required field: story_id");
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,94 @@
|
||||
//! Concrete subscriber stubs for the event bus.
|
||||
|
||||
use super::Stage;
|
||||
use super::events::{TransitionFired, TransitionSubscriber};
|
||||
use super::{event_label, stage_dir_name, stage_label};
|
||||
|
||||
|
||||
// ── Subscriber stubs (real dispatch uses these as the interface) ─────────────
|
||||
//
|
||||
// These are ready to wire into the event bus but not yet connected to the
|
||||
// actual subsystems. Suppress dead_code until consumers are migrated.
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub struct MatrixBotSubscriber;
|
||||
#[allow(dead_code)]
|
||||
impl TransitionSubscriber for MatrixBotSubscriber {
|
||||
fn name(&self) -> &'static str {
|
||||
"matrix-bot"
|
||||
}
|
||||
fn on_transition(&self, f: &TransitionFired) {
|
||||
crate::slog!(
|
||||
"[pipeline/matrix-bot] #{}: {} → {}",
|
||||
f.story_id,
|
||||
stage_label(&f.before),
|
||||
stage_label(&f.after)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub struct FileRendererSubscriber;
|
||||
#[allow(dead_code)]
|
||||
impl TransitionSubscriber for FileRendererSubscriber {
|
||||
fn name(&self) -> &'static str {
|
||||
"filesystem"
|
||||
}
|
||||
fn on_transition(&self, f: &TransitionFired) {
|
||||
crate::slog!(
|
||||
"[pipeline/filesystem] re-rendering work/{}/{}",
|
||||
stage_dir_name(&f.after),
|
||||
f.story_id
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub struct PipelineItemsSubscriber;
|
||||
#[allow(dead_code)]
|
||||
impl TransitionSubscriber for PipelineItemsSubscriber {
|
||||
fn name(&self) -> &'static str {
|
||||
"pipeline-items"
|
||||
}
|
||||
fn on_transition(&self, f: &TransitionFired) {
|
||||
crate::slog!(
|
||||
"[pipeline/items] UPDATE stage = '{}' WHERE id = '{}'",
|
||||
stage_dir_name(&f.after),
|
||||
f.story_id
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub struct AutoAssignSubscriber;
|
||||
#[allow(dead_code)]
|
||||
impl TransitionSubscriber for AutoAssignSubscriber {
|
||||
fn name(&self) -> &'static str {
|
||||
"auto-assign"
|
||||
}
|
||||
fn on_transition(&self, f: &TransitionFired) {
|
||||
if matches!(f.after, Stage::Done { .. } | Stage::Archived { .. }) {
|
||||
crate::slog!(
|
||||
"[pipeline/auto-assign] story {} reached {}; checking for promotable backlog items",
|
||||
f.story_id,
|
||||
stage_label(&f.after)
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub struct WebUiBroadcastSubscriber;
|
||||
#[allow(dead_code)]
|
||||
impl TransitionSubscriber for WebUiBroadcastSubscriber {
|
||||
fn name(&self) -> &'static str {
|
||||
"web-ui-broadcast"
|
||||
}
|
||||
fn on_transition(&self, f: &TransitionFired) {
|
||||
crate::slog!(
|
||||
"[pipeline/web-ui] broadcasting #{} transition to connected clients",
|
||||
f.story_id
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user