From 5aedf94512455f242c6544d40161645d3fc5db39 Mon Sep 17 00:00:00 2001 From: dave Date: Sun, 26 Apr 2026 21:30:55 +0000 Subject: [PATCH] refactor: split pipeline_state.rs into 4 sub-modules with co-located tests The 1411-line pipeline_state.rs is split into: - mod.rs: types, transition(), execution_transition(), labels + transition tests (885 lines) - events.rs: TransitionFired, EventBus, TransitionSubscriber + event-bus tests (114 lines) - projection.rs: ProjectionError, TryFrom<&PipelineItemView>, read_typed + projection tests (379 lines) - subscribers.rs: 5 concrete TransitionSubscriber stubs (95 lines) Tests stay co-located. No behaviour change. All 42 pipeline_state tests pass; full suite green. --- server/src/pipeline_state/events.rs | 111 ++++ .../mod.rs} | 546 +----------------- server/src/pipeline_state/projection.rs | 391 +++++++++++++ server/src/pipeline_state/subscribers.rs | 94 +++ 4 files changed, 606 insertions(+), 536 deletions(-) create mode 100644 server/src/pipeline_state/events.rs rename server/src/{pipeline_state.rs => pipeline_state/mod.rs} (61%) create mode 100644 server/src/pipeline_state/projection.rs create mode 100644 server/src/pipeline_state/subscribers.rs diff --git a/server/src/pipeline_state/events.rs b/server/src/pipeline_state/events.rs new file mode 100644 index 00000000..90c69db2 --- /dev/null +++ b/server/src/pipeline_state/events.rs @@ -0,0 +1,111 @@ +//! Event bus for pipeline state transitions. + +use chrono::{DateTime, Utc}; + +use super::{BranchName, PipelineEvent, Stage, StoryId}; + +/// Fired when a pipeline stage transition completes. +#[derive(Debug, Clone)] +pub struct TransitionFired { + pub story_id: StoryId, + pub before: Stage, + pub after: Stage, + pub event: PipelineEvent, + pub at: DateTime, +} + +/// Trait for side-effect handlers that react to pipeline transitions. +pub trait TransitionSubscriber: Send + Sync { + fn name(&self) -> &'static str; + fn on_transition(&self, fired: &TransitionFired); +} + +pub struct EventBus { + subscribers: Vec>, +} + +impl EventBus { + pub fn new() -> Self { + Self { + subscribers: Vec::new(), + } + } + + pub fn subscribe(&mut self, subscriber: S) { + self.subscribers.push(Box::new(subscriber)); + } + + pub fn fire(&self, event: TransitionFired) { + for sub in &self.subscribers { + sub.on_transition(&event); + } + } +} + +impl Default for EventBus { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::{Arc, Mutex}; + use std::num::NonZeroU32; + + fn nz(n: u32) -> NonZeroU32 { NonZeroU32::new(n).unwrap() } + fn fb(name: &str) -> BranchName { BranchName(name.to_string()) } + fn sid(s: &str) -> StoryId { StoryId(s.to_string()) } + + #[test] + fn event_bus_fires_to_all_subscribers() { + use std::sync::Arc; + use std::sync::atomic::{AtomicU32, Ordering}; + + struct CountingSub(Arc); + impl TransitionSubscriber for CountingSub { + fn name(&self) -> &'static str { + "counter" + } + fn on_transition(&self, _: &TransitionFired) { + self.0.fetch_add(1, Ordering::SeqCst); + } + } + + let counter = Arc::new(AtomicU32::new(0)); + let mut bus = EventBus::new(); + bus.subscribe(CountingSub(counter.clone())); + bus.subscribe(CountingSub(counter.clone())); + + bus.fire(TransitionFired { + story_id: StoryId("test".into()), + before: Stage::Backlog, + after: Stage::Coding, + event: PipelineEvent::DepsMet, + at: Utc::now(), + }); + + assert_eq!(counter.load(Ordering::SeqCst), 2); + } + + // ── Bug 502 regression: agent field is not part of Stage ──────────── + + #[test] + fn bug_502_agent_not_in_stage() { + // Bug 502 was caused by a coder agent being assigned to a story in + // Merge stage. In the typed system, Stage has no `agent` field at all. + // Agent assignment is per-node ExecutionState. This test documents that + // the old failure mode is structurally impossible. + let merge = Stage::Merge { + feature_branch: BranchName("feature/story-1".into()), + commits_ahead: NonZeroU32::new(3).unwrap(), + }; + // Stage::Merge has exactly two fields: feature_branch and commits_ahead. + // There is no way to attach an agent name to it. The type system + // prevents bug 502 by construction. + assert!(matches!(merge, Stage::Merge { .. })); + } + + // ── TransitionError Display ───────────────────────────────────────── +} diff --git a/server/src/pipeline_state.rs b/server/src/pipeline_state/mod.rs similarity index 61% rename from server/src/pipeline_state.rs rename to server/src/pipeline_state/mod.rs index b191c229..3a190d9c 100644 --- a/server/src/pipeline_state.rs +++ b/server/src/pipeline_state/mod.rs @@ -460,308 +460,22 @@ pub fn execution_transition( } } -// ── Event bus ─────────────────────────────────────────────────────────────── -/// Fired when a pipeline stage transition completes. -#[derive(Debug, Clone)] -pub struct TransitionFired { - pub story_id: StoryId, - pub before: Stage, - pub after: Stage, - pub event: PipelineEvent, - pub at: DateTime, -} +mod events; +mod projection; +mod subscribers; -/// Trait for side-effect handlers that react to pipeline transitions. -pub trait TransitionSubscriber: Send + Sync { - fn name(&self) -> &'static str; - fn on_transition(&self, fired: &TransitionFired); -} - -pub struct EventBus { - subscribers: Vec>, -} - -impl EventBus { - pub fn new() -> Self { - Self { - subscribers: Vec::new(), - } - } - - pub fn subscribe(&mut self, subscriber: S) { - self.subscribers.push(Box::new(subscriber)); - } - - pub fn fire(&self, event: TransitionFired) { - for sub in &self.subscribers { - sub.on_transition(&event); - } - } -} - -impl Default for EventBus { - fn default() -> Self { - Self::new() - } -} - -// ── Projection errors ─────────────────────────────────────────────────────── - -/// Errors from projecting loose CRDT data into typed enums. -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum ProjectionError { - /// The stage string from the CRDT doesn't map to any known Stage variant. - UnknownStage(String), - /// A required field is missing from the CRDT data. - MissingField(&'static str), - /// A field has an invalid value. - InvalidField { field: &'static str, detail: String }, -} - -impl fmt::Display for ProjectionError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Self::UnknownStage(s) => write!(f, "unknown stage: {s:?}"), - Self::MissingField(field) => write!(f, "missing required field: {field}"), - Self::InvalidField { field, detail } => { - write!(f, "invalid field {field}: {detail}") - } - } - } -} - -impl std::error::Error for ProjectionError {} - -// ── Projection: PipelineItemView → PipelineItem ───────────────────────────── - -impl TryFrom<&PipelineItemView> for PipelineItem { - type Error = ProjectionError; - - fn try_from(view: &PipelineItemView) -> Result { - let story_id = StoryId(view.story_id.clone()); - let name = view.name.clone().unwrap_or_default(); - - let depends_on: Vec = view - .depends_on - .as_ref() - .map(|deps| deps.iter().map(|d| StoryId(d.to_string())).collect()) - .unwrap_or_default(); - - let retry_count = view.retry_count.unwrap_or(0).max(0) as u32; - - let stage = project_stage(view)?; - - Ok(PipelineItem { - story_id, - name, - stage, - depends_on, - retry_count, - }) - } -} - -/// Project the stage string + associated fields from a PipelineItemView into -/// a typed Stage enum. This is the one carefully-controlled boundary where -/// loose CRDT data becomes typed. -fn project_stage(view: &PipelineItemView) -> Result { - match view.stage.as_str() { - "1_backlog" => Ok(Stage::Backlog), - "2_current" => Ok(Stage::Coding), - "3_qa" => Ok(Stage::Qa), - "4_merge" => { - // Merge stage in the current CRDT doesn't carry feature_branch or - // commits_ahead — those are computed at transition time. For - // projection from existing CRDT data, we synthesize defaults. - // The feature branch follows the naming convention. - let branch = format!("feature/story-{}", view.story_id); - // Existing CRDT data doesn't track commits_ahead, so we use 1 as - // a safe non-zero default (the item is in merge, so there must be - // at least one commit). - Ok(Stage::Merge { - feature_branch: BranchName(branch), - commits_ahead: NonZeroU32::new(1).expect("1 is non-zero"), - }) - } - "5_done" => { - // Use the stored merged_at timestamp if present. Legacy items - // that pre-date this field have merged_at = None, so we fall back - // to UNIX_EPOCH, which makes them older than any retention window - // and therefore eligible for immediate sweep to 6_archived. - let merged_at = view - .merged_at - .map(|ts| { - DateTime::from_timestamp(ts as i64, 0).unwrap_or(DateTime::::UNIX_EPOCH) - }) - .unwrap_or(DateTime::::UNIX_EPOCH); - Ok(Stage::Done { - merged_at, - merge_commit: GitSha("legacy".to_string()), - }) - } - "6_archived" => { - // Determine the archive reason from the CRDT fields. - let reason = if view.blocked == Some(true) { - ArchiveReason::Blocked { - reason: "migrated from legacy blocked field".to_string(), - } - } else { - // Default to Completed for legacy archived items. - ArchiveReason::Completed - }; - Ok(Stage::Archived { - archived_at: Utc::now(), - reason, - }) - } - other => Err(ProjectionError::UnknownStage(other.to_string())), - } -} - -// ── Reverse projection: PipelineItem → stage dir string ───────────────────── - -impl PipelineItem { - /// Convert back to the loose fields that the CRDT write path expects. - /// Returns `(stage_dir, blocked)`. - pub fn to_crdt_fields(&self) -> (&'static str, bool) { - let dir = stage_dir_name(&self.stage); - let blocked = matches!( - self.stage, - Stage::Archived { - reason: ArchiveReason::Blocked { .. }, - .. - } - ); - (dir, blocked) - } -} - -// ── Bridge to existing CRDT reads ─────────────────────────────────────────── - -/// Read all pipeline items from the CRDT and project them into typed enums. -/// -/// Items that fail projection (e.g. unknown stage strings from a future -/// version) are logged and skipped — they don't poison the entire read. -pub fn read_all_typed() -> Vec { - let Some(views) = crate::crdt_state::read_all_items() else { - return Vec::new(); - }; - views - .iter() - .filter_map(|v| match PipelineItem::try_from(v) { - Ok(item) => Some(item), - Err(e) => { - crate::slog!( - "[pipeline_state] projection error for '{}': {e}", - v.story_id - ); - None - } - }) - .collect() -} - -/// Read a single pipeline item by story_id and project it into the typed enum. -pub fn read_typed(story_id: &str) -> Result, ProjectionError> { - let Some(view) = crate::crdt_state::read_item(story_id) else { - return Ok(None); - }; - PipelineItem::try_from(&view).map(Some) -} - -// ── Subscriber stubs (real dispatch uses these as the interface) ───────────── -// -// These are ready to wire into the event bus but not yet connected to the -// actual subsystems. Suppress dead_code until consumers are migrated. - -#[allow(dead_code)] -pub struct MatrixBotSubscriber; -#[allow(dead_code)] -impl TransitionSubscriber for MatrixBotSubscriber { - fn name(&self) -> &'static str { - "matrix-bot" - } - fn on_transition(&self, f: &TransitionFired) { - crate::slog!( - "[pipeline/matrix-bot] #{}: {} → {}", - f.story_id, - stage_label(&f.before), - stage_label(&f.after) - ); - } -} - -#[allow(dead_code)] -pub struct FileRendererSubscriber; -#[allow(dead_code)] -impl TransitionSubscriber for FileRendererSubscriber { - fn name(&self) -> &'static str { - "filesystem" - } - fn on_transition(&self, f: &TransitionFired) { - crate::slog!( - "[pipeline/filesystem] re-rendering work/{}/{}", - stage_dir_name(&f.after), - f.story_id - ); - } -} - -#[allow(dead_code)] -pub struct PipelineItemsSubscriber; -#[allow(dead_code)] -impl TransitionSubscriber for PipelineItemsSubscriber { - fn name(&self) -> &'static str { - "pipeline-items" - } - fn on_transition(&self, f: &TransitionFired) { - crate::slog!( - "[pipeline/items] UPDATE stage = '{}' WHERE id = '{}'", - stage_dir_name(&f.after), - f.story_id - ); - } -} - -#[allow(dead_code)] -pub struct AutoAssignSubscriber; -#[allow(dead_code)] -impl TransitionSubscriber for AutoAssignSubscriber { - fn name(&self) -> &'static str { - "auto-assign" - } - fn on_transition(&self, f: &TransitionFired) { - if matches!(f.after, Stage::Done { .. } | Stage::Archived { .. }) { - crate::slog!( - "[pipeline/auto-assign] story {} reached {}; checking for promotable backlog items", - f.story_id, - stage_label(&f.after) - ); - } - } -} - -#[allow(dead_code)] -pub struct WebUiBroadcastSubscriber; -#[allow(dead_code)] -impl TransitionSubscriber for WebUiBroadcastSubscriber { - fn name(&self) -> &'static str { - "web-ui-broadcast" - } - fn on_transition(&self, f: &TransitionFired) { - crate::slog!( - "[pipeline/web-ui] broadcasting #{} transition to connected clients", - f.story_id - ); - } -} - -// ── Tests ─────────────────────────────────────────────────────────────────── +pub use events::{EventBus, TransitionFired, TransitionSubscriber}; +pub use projection::{ProjectionError, project_stage, read_all_typed, read_typed}; +pub use subscribers::{ + AutoAssignSubscriber, FileRendererSubscriber, MatrixBotSubscriber, PipelineItemsSubscriber, + WebUiBroadcastSubscriber, +}; #[cfg(test)] mod tests { use super::*; + use std::num::NonZeroU32; fn nz(n: u32) -> NonZeroU32 { NonZeroU32::new(n).unwrap() @@ -776,8 +490,6 @@ mod tests { StoryId(s.to_string()) } - // ── Happy path transitions ────────────────────────────────────────── - #[test] fn happy_path_backlog_through_archived() { let s = Stage::Backlog; @@ -1142,235 +854,6 @@ mod tests { // ── Projection tests ──────────────────────────────────────────────── - #[test] - fn project_backlog_item() { - let view = PipelineItemView { - story_id: "42_story_test".to_string(), - stage: "1_backlog".to_string(), - name: Some("Test Story".to_string()), - agent: None, - retry_count: None, - blocked: None, - depends_on: Some(vec![10, 20]), - claimed_by: None, - claimed_at: None, - merged_at: None, - }; - let item = PipelineItem::try_from(&view).unwrap(); - assert_eq!(item.story_id, StoryId("42_story_test".to_string())); - assert_eq!(item.name, "Test Story"); - assert!(matches!(item.stage, Stage::Backlog)); - assert_eq!(item.depends_on.len(), 2); - assert_eq!(item.retry_count, 0); - } - - #[test] - fn project_current_item() { - let view = PipelineItemView { - story_id: "42_story_test".to_string(), - stage: "2_current".to_string(), - name: Some("Test".to_string()), - agent: Some("coder-1".to_string()), - retry_count: Some(2), - blocked: None, - depends_on: None, - claimed_by: None, - claimed_at: None, - merged_at: None, - }; - let item = PipelineItem::try_from(&view).unwrap(); - assert!(matches!(item.stage, Stage::Coding)); - assert_eq!(item.retry_count, 2); - } - - #[test] - fn project_merge_item() { - let view = PipelineItemView { - story_id: "42_story_test".to_string(), - stage: "4_merge".to_string(), - name: Some("Test".to_string()), - agent: None, - retry_count: None, - blocked: None, - depends_on: None, - claimed_by: None, - claimed_at: None, - merged_at: None, - }; - let item = PipelineItem::try_from(&view).unwrap(); - assert!(matches!(item.stage, Stage::Merge { .. })); - if let Stage::Merge { - feature_branch, - commits_ahead, - } = &item.stage - { - assert_eq!(feature_branch.0, "feature/story-42_story_test"); - assert_eq!(commits_ahead.get(), 1); - } - } - - #[test] - fn project_archived_blocked_item() { - let view = PipelineItemView { - story_id: "42_story_test".to_string(), - stage: "6_archived".to_string(), - name: Some("Test".to_string()), - agent: None, - retry_count: None, - blocked: Some(true), - depends_on: None, - claimed_by: None, - claimed_at: None, - merged_at: None, - }; - let item = PipelineItem::try_from(&view).unwrap(); - assert!(matches!( - item.stage, - Stage::Archived { - reason: ArchiveReason::Blocked { .. }, - .. - } - )); - } - - #[test] - fn project_archived_completed_item() { - let view = PipelineItemView { - story_id: "42_story_test".to_string(), - stage: "6_archived".to_string(), - name: Some("Test".to_string()), - agent: None, - retry_count: None, - blocked: Some(false), - depends_on: None, - claimed_by: None, - claimed_at: None, - merged_at: None, - }; - let item = PipelineItem::try_from(&view).unwrap(); - assert!(matches!( - item.stage, - Stage::Archived { - reason: ArchiveReason::Completed, - .. - } - )); - } - - #[test] - fn project_unknown_stage_returns_error() { - let view = PipelineItemView { - story_id: "42_story_test".to_string(), - stage: "9_invalid".to_string(), - name: Some("Test".to_string()), - agent: None, - retry_count: None, - blocked: None, - depends_on: None, - claimed_by: None, - claimed_at: None, - merged_at: None, - }; - let result = PipelineItem::try_from(&view); - assert!(matches!( - result, - Err(ProjectionError::UnknownStage(s)) if s == "9_invalid" - )); - } - - // ── Reverse projection tests ──────────────────────────────────────── - - #[test] - fn reverse_projection_stage_dirs() { - let cases: Vec<(Stage, &str, bool)> = vec![ - (Stage::Backlog, "1_backlog", false), - (Stage::Coding, "2_current", false), - (Stage::Qa, "3_qa", false), - ( - Stage::Merge { - feature_branch: fb("f"), - commits_ahead: nz(1), - }, - "4_merge", - false, - ), - ( - Stage::Done { - merged_at: Utc::now(), - merge_commit: sha("abc"), - }, - "5_done", - false, - ), - ( - Stage::Archived { - archived_at: Utc::now(), - reason: ArchiveReason::Completed, - }, - "6_archived", - false, - ), - ( - Stage::Archived { - archived_at: Utc::now(), - reason: ArchiveReason::Blocked { - reason: "stuck".into(), - }, - }, - "6_archived", - true, - ), - ]; - - for (stage, expected_dir, expected_blocked) in cases { - let item = PipelineItem { - story_id: StoryId("test".into()), - name: "test".into(), - stage, - depends_on: vec![], - retry_count: 0, - }; - let (dir, blocked) = item.to_crdt_fields(); - assert_eq!(dir, expected_dir); - assert_eq!(blocked, expected_blocked); - } - } - - // ── Event bus tests ───────────────────────────────────────────────── - - #[test] - fn event_bus_fires_to_all_subscribers() { - use std::sync::Arc; - use std::sync::atomic::{AtomicU32, Ordering}; - - struct CountingSub(Arc); - impl TransitionSubscriber for CountingSub { - fn name(&self) -> &'static str { - "counter" - } - fn on_transition(&self, _: &TransitionFired) { - self.0.fetch_add(1, Ordering::SeqCst); - } - } - - let counter = Arc::new(AtomicU32::new(0)); - let mut bus = EventBus::new(); - bus.subscribe(CountingSub(counter.clone())); - bus.subscribe(CountingSub(counter.clone())); - - bus.fire(TransitionFired { - story_id: StoryId("test".into()), - before: Stage::Backlog, - after: Stage::Coding, - event: PipelineEvent::DepsMet, - at: Utc::now(), - }); - - assert_eq!(counter.load(Ordering::SeqCst), 2); - } - - // ── Bug 502 regression: agent field is not part of Stage ──────────── - #[test] fn bug_502_agent_not_in_stage() { // Bug 502 was caused by a coder agent being assigned to a story in @@ -1399,13 +882,4 @@ mod tests { } // ── ProjectionError Display ───────────────────────────────────────── - - #[test] - fn projection_error_display() { - let err = ProjectionError::UnknownStage("9_invalid".into()); - assert_eq!(err.to_string(), "unknown stage: \"9_invalid\""); - - let err = ProjectionError::MissingField("story_id"); - assert_eq!(err.to_string(), "missing required field: story_id"); - } } diff --git a/server/src/pipeline_state/projection.rs b/server/src/pipeline_state/projection.rs new file mode 100644 index 00000000..5bb68a19 --- /dev/null +++ b/server/src/pipeline_state/projection.rs @@ -0,0 +1,391 @@ +//! Projection layer — converts loose CRDT views into typed `PipelineItem` enums. + +use chrono::{DateTime, Utc}; +use std::fmt; +use std::num::NonZeroU32; + +use crate::crdt_state::{PipelineItemView, read_all_items, read_item}; + +use super::{ + ArchiveReason, BranchName, ExecutionState, GitSha, PipelineItem, Stage, StoryId, + stage_dir_name, +}; + +/// Errors from projecting loose CRDT data into typed enums. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum ProjectionError { + /// The stage string from the CRDT doesn't map to any known Stage variant. + UnknownStage(String), + /// A required field is missing from the CRDT data. + MissingField(&'static str), + /// A field has an invalid value. + InvalidField { field: &'static str, detail: String }, +} + +impl fmt::Display for ProjectionError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::UnknownStage(s) => write!(f, "unknown stage: {s:?}"), + Self::MissingField(field) => write!(f, "missing required field: {field}"), + Self::InvalidField { field, detail } => { + write!(f, "invalid field {field}: {detail}") + } + } + } +} + +impl std::error::Error for ProjectionError {} + +// ── Projection: PipelineItemView → PipelineItem ───────────────────────────── + +impl TryFrom<&PipelineItemView> for PipelineItem { + type Error = ProjectionError; + + fn try_from(view: &PipelineItemView) -> Result { + let story_id = StoryId(view.story_id.clone()); + let name = view.name.clone().unwrap_or_default(); + + let depends_on: Vec = view + .depends_on + .as_ref() + .map(|deps| deps.iter().map(|d| StoryId(d.to_string())).collect()) + .unwrap_or_default(); + + let retry_count = view.retry_count.unwrap_or(0).max(0) as u32; + + let stage = project_stage(view)?; + + Ok(PipelineItem { + story_id, + name, + stage, + depends_on, + retry_count, + }) + } +} + +/// Project the stage string + associated fields from a PipelineItemView into +/// a typed Stage enum. This is the one carefully-controlled boundary where +/// loose CRDT data becomes typed. +pub fn project_stage(view: &PipelineItemView) -> Result { + match view.stage.as_str() { + "1_backlog" => Ok(Stage::Backlog), + "2_current" => Ok(Stage::Coding), + "3_qa" => Ok(Stage::Qa), + "4_merge" => { + // Merge stage in the current CRDT doesn't carry feature_branch or + // commits_ahead — those are computed at transition time. For + // projection from existing CRDT data, we synthesize defaults. + // The feature branch follows the naming convention. + let branch = format!("feature/story-{}", view.story_id); + // Existing CRDT data doesn't track commits_ahead, so we use 1 as + // a safe non-zero default (the item is in merge, so there must be + // at least one commit). + Ok(Stage::Merge { + feature_branch: BranchName(branch), + commits_ahead: NonZeroU32::new(1).expect("1 is non-zero"), + }) + } + "5_done" => { + // Use the stored merged_at timestamp if present. Legacy items + // that pre-date this field have merged_at = None, so we fall back + // to UNIX_EPOCH, which makes them older than any retention window + // and therefore eligible for immediate sweep to 6_archived. + let merged_at = view + .merged_at + .map(|ts| { + DateTime::from_timestamp(ts as i64, 0).unwrap_or(DateTime::::UNIX_EPOCH) + }) + .unwrap_or(DateTime::::UNIX_EPOCH); + Ok(Stage::Done { + merged_at, + merge_commit: GitSha("legacy".to_string()), + }) + } + "6_archived" => { + // Determine the archive reason from the CRDT fields. + let reason = if view.blocked == Some(true) { + ArchiveReason::Blocked { + reason: "migrated from legacy blocked field".to_string(), + } + } else { + // Default to Completed for legacy archived items. + ArchiveReason::Completed + }; + Ok(Stage::Archived { + archived_at: Utc::now(), + reason, + }) + } + other => Err(ProjectionError::UnknownStage(other.to_string())), + } +} + +// ── Reverse projection: PipelineItem → stage dir string ───────────────────── + +impl PipelineItem { + /// Convert back to the loose fields that the CRDT write path expects. + /// Returns `(stage_dir, blocked)`. + pub fn to_crdt_fields(&self) -> (&'static str, bool) { + let dir = stage_dir_name(&self.stage); + let blocked = matches!( + self.stage, + Stage::Archived { + reason: ArchiveReason::Blocked { .. }, + .. + } + ); + (dir, blocked) + } +} + +// ── Bridge to existing CRDT reads ─────────────────────────────────────────── + +/// Read all pipeline items from the CRDT and project them into typed enums. +/// +/// Items that fail projection (e.g. unknown stage strings from a future +/// version) are logged and skipped — they don't poison the entire read. +pub fn read_all_typed() -> Vec { + let Some(views) = crate::crdt_state::read_all_items() else { + return Vec::new(); + }; + views + .iter() + .filter_map(|v| match PipelineItem::try_from(v) { + Ok(item) => Some(item), + Err(e) => { + crate::slog!( + "[pipeline_state] projection error for '{}': {e}", + v.story_id + ); + None + } + }) + .collect() +} + +/// Read a single pipeline item by story_id and project it into the typed enum. +pub fn read_typed(story_id: &str) -> Result, ProjectionError> { + let Some(view) = crate::crdt_state::read_item(story_id) else { + return Ok(None); + }; + PipelineItem::try_from(&view).map(Some) +} + +#[cfg(test)] +mod tests { + use super::*; + use chrono::TimeZone; + use std::num::NonZeroU32; + + fn nz(n: u32) -> NonZeroU32 { NonZeroU32::new(n).unwrap() } + fn fb(name: &str) -> BranchName { BranchName(name.to_string()) } + fn sha(s: &str) -> GitSha { GitSha(s.to_string()) } + fn sid(s: &str) -> StoryId { StoryId(s.to_string()) } + + #[test] + fn project_backlog_item() { + let view = PipelineItemView { + story_id: "42_story_test".to_string(), + stage: "1_backlog".to_string(), + name: Some("Test Story".to_string()), + agent: None, + retry_count: None, + blocked: None, + depends_on: Some(vec![10, 20]), + claimed_by: None, + claimed_at: None, + merged_at: None, + }; + let item = PipelineItem::try_from(&view).unwrap(); + assert_eq!(item.story_id, StoryId("42_story_test".to_string())); + assert_eq!(item.name, "Test Story"); + assert!(matches!(item.stage, Stage::Backlog)); + assert_eq!(item.depends_on.len(), 2); + assert_eq!(item.retry_count, 0); + } + + #[test] + fn project_current_item() { + let view = PipelineItemView { + story_id: "42_story_test".to_string(), + stage: "2_current".to_string(), + name: Some("Test".to_string()), + agent: Some("coder-1".to_string()), + retry_count: Some(2), + blocked: None, + depends_on: None, + claimed_by: None, + claimed_at: None, + merged_at: None, + }; + let item = PipelineItem::try_from(&view).unwrap(); + assert!(matches!(item.stage, Stage::Coding)); + assert_eq!(item.retry_count, 2); + } + + #[test] + fn project_merge_item() { + let view = PipelineItemView { + story_id: "42_story_test".to_string(), + stage: "4_merge".to_string(), + name: Some("Test".to_string()), + agent: None, + retry_count: None, + blocked: None, + depends_on: None, + claimed_by: None, + claimed_at: None, + merged_at: None, + }; + let item = PipelineItem::try_from(&view).unwrap(); + assert!(matches!(item.stage, Stage::Merge { .. })); + if let Stage::Merge { + feature_branch, + commits_ahead, + } = &item.stage + { + assert_eq!(feature_branch.0, "feature/story-42_story_test"); + assert_eq!(commits_ahead.get(), 1); + } + } + + #[test] + fn project_archived_blocked_item() { + let view = PipelineItemView { + story_id: "42_story_test".to_string(), + stage: "6_archived".to_string(), + name: Some("Test".to_string()), + agent: None, + retry_count: None, + blocked: Some(true), + depends_on: None, + claimed_by: None, + claimed_at: None, + merged_at: None, + }; + let item = PipelineItem::try_from(&view).unwrap(); + assert!(matches!( + item.stage, + Stage::Archived { + reason: ArchiveReason::Blocked { .. }, + .. + } + )); + } + + #[test] + fn project_archived_completed_item() { + let view = PipelineItemView { + story_id: "42_story_test".to_string(), + stage: "6_archived".to_string(), + name: Some("Test".to_string()), + agent: None, + retry_count: None, + blocked: Some(false), + depends_on: None, + claimed_by: None, + claimed_at: None, + merged_at: None, + }; + let item = PipelineItem::try_from(&view).unwrap(); + assert!(matches!( + item.stage, + Stage::Archived { + reason: ArchiveReason::Completed, + .. + } + )); + } + + #[test] + fn project_unknown_stage_returns_error() { + let view = PipelineItemView { + story_id: "42_story_test".to_string(), + stage: "9_invalid".to_string(), + name: Some("Test".to_string()), + agent: None, + retry_count: None, + blocked: None, + depends_on: None, + claimed_by: None, + claimed_at: None, + merged_at: None, + }; + let result = PipelineItem::try_from(&view); + assert!(matches!( + result, + Err(ProjectionError::UnknownStage(s)) if s == "9_invalid" + )); + } + + // ── Reverse projection tests ──────────────────────────────────────── + + #[test] + fn reverse_projection_stage_dirs() { + let cases: Vec<(Stage, &str, bool)> = vec![ + (Stage::Backlog, "1_backlog", false), + (Stage::Coding, "2_current", false), + (Stage::Qa, "3_qa", false), + ( + Stage::Merge { + feature_branch: fb("f"), + commits_ahead: nz(1), + }, + "4_merge", + false, + ), + ( + Stage::Done { + merged_at: Utc::now(), + merge_commit: sha("abc"), + }, + "5_done", + false, + ), + ( + Stage::Archived { + archived_at: Utc::now(), + reason: ArchiveReason::Completed, + }, + "6_archived", + false, + ), + ( + Stage::Archived { + archived_at: Utc::now(), + reason: ArchiveReason::Blocked { + reason: "stuck".into(), + }, + }, + "6_archived", + true, + ), + ]; + + for (stage, expected_dir, expected_blocked) in cases { + let item = PipelineItem { + story_id: StoryId("test".into()), + name: "test".into(), + stage, + depends_on: vec![], + retry_count: 0, + }; + let (dir, blocked) = item.to_crdt_fields(); + assert_eq!(dir, expected_dir); + assert_eq!(blocked, expected_blocked); + } + } + + // ── Event bus tests ───────────────────────────────────────────────── + + #[test] + fn projection_error_display() { + let err = ProjectionError::UnknownStage("9_invalid".into()); + assert_eq!(err.to_string(), "unknown stage: \"9_invalid\""); + + let err = ProjectionError::MissingField("story_id"); + assert_eq!(err.to_string(), "missing required field: story_id"); + } +} diff --git a/server/src/pipeline_state/subscribers.rs b/server/src/pipeline_state/subscribers.rs new file mode 100644 index 00000000..2a753a07 --- /dev/null +++ b/server/src/pipeline_state/subscribers.rs @@ -0,0 +1,94 @@ +//! Concrete subscriber stubs for the event bus. + +use super::Stage; +use super::events::{TransitionFired, TransitionSubscriber}; +use super::{event_label, stage_dir_name, stage_label}; + + +// ── Subscriber stubs (real dispatch uses these as the interface) ───────────── +// +// These are ready to wire into the event bus but not yet connected to the +// actual subsystems. Suppress dead_code until consumers are migrated. + +#[allow(dead_code)] +pub struct MatrixBotSubscriber; +#[allow(dead_code)] +impl TransitionSubscriber for MatrixBotSubscriber { + fn name(&self) -> &'static str { + "matrix-bot" + } + fn on_transition(&self, f: &TransitionFired) { + crate::slog!( + "[pipeline/matrix-bot] #{}: {} → {}", + f.story_id, + stage_label(&f.before), + stage_label(&f.after) + ); + } +} + +#[allow(dead_code)] +pub struct FileRendererSubscriber; +#[allow(dead_code)] +impl TransitionSubscriber for FileRendererSubscriber { + fn name(&self) -> &'static str { + "filesystem" + } + fn on_transition(&self, f: &TransitionFired) { + crate::slog!( + "[pipeline/filesystem] re-rendering work/{}/{}", + stage_dir_name(&f.after), + f.story_id + ); + } +} + +#[allow(dead_code)] +pub struct PipelineItemsSubscriber; +#[allow(dead_code)] +impl TransitionSubscriber for PipelineItemsSubscriber { + fn name(&self) -> &'static str { + "pipeline-items" + } + fn on_transition(&self, f: &TransitionFired) { + crate::slog!( + "[pipeline/items] UPDATE stage = '{}' WHERE id = '{}'", + stage_dir_name(&f.after), + f.story_id + ); + } +} + +#[allow(dead_code)] +pub struct AutoAssignSubscriber; +#[allow(dead_code)] +impl TransitionSubscriber for AutoAssignSubscriber { + fn name(&self) -> &'static str { + "auto-assign" + } + fn on_transition(&self, f: &TransitionFired) { + if matches!(f.after, Stage::Done { .. } | Stage::Archived { .. }) { + crate::slog!( + "[pipeline/auto-assign] story {} reached {}; checking for promotable backlog items", + f.story_id, + stage_label(&f.after) + ); + } + } +} + +#[allow(dead_code)] +pub struct WebUiBroadcastSubscriber; +#[allow(dead_code)] +impl TransitionSubscriber for WebUiBroadcastSubscriber { + fn name(&self) -> &'static str { + "web-ui-broadcast" + } + fn on_transition(&self, f: &TransitionFired) { + crate::slog!( + "[pipeline/web-ui] broadcasting #{} transition to connected clients", + f.story_id + ); + } +} +