From 2f50e2198b96973f47dd352d8efd39127a54668f Mon Sep 17 00:00:00 2001 From: dave Date: Wed, 13 May 2026 04:28:30 +0000 Subject: [PATCH] huskies: merge 951 --- .../watchdog/tests/limits_tests.rs | 14 +- server/src/chat/lookup.rs | 4 +- server/src/crdt_state/mod.rs | 2 +- server/src/crdt_state/read.rs | 97 +++++++++++- server/src/crdt_state/state/tests.rs | 4 +- server/src/crdt_state/types.rs | 89 +---------- server/src/crdt_state/write/migrations.rs | 21 ++- server/src/crdt_state/write/tests.rs | 16 +- server/src/db/mod.rs | 4 +- server/src/http/mcp/merge_tools.rs | 5 +- server/src/pipeline_state/mod.rs | 2 +- server/src/pipeline_state/projection.rs | 138 +++++------------- 12 files changed, 178 insertions(+), 218 deletions(-) diff --git a/server/src/agents/pool/auto_assign/watchdog/tests/limits_tests.rs b/server/src/agents/pool/auto_assign/watchdog/tests/limits_tests.rs index af520cd0..c48e7398 100644 --- a/server/src/agents/pool/auto_assign/watchdog/tests/limits_tests.rs +++ b/server/src/agents/pool/auto_assign/watchdog/tests/limits_tests.rs @@ -274,10 +274,10 @@ max_turns = 10 let item = crate::crdt_state::read_item(story_id) .expect("story must be in CRDT after watchdog termination"); assert_eq!( - item.stage().as_dir(), + item.stage().dir_name(), "blocked", "story stage must be 2_blocked after limit termination with max_retries=1 — got: {}", - item.stage().as_dir() + item.stage().dir_name() ); // Sanity: the agent itself is also Failed with the right reason. @@ -416,7 +416,7 @@ max_turns = 10 let item = crate::crdt_state::read_item(story_id) .expect("story must be in CRDT after per-session overrun"); assert_eq!( - item.stage().as_dir(), + item.stage().dir_name(), "blocked", "story stage must be 2_blocked after per-session overrun with max_retries=1" ); @@ -478,7 +478,7 @@ max_turns = 10 "after session 1, retry_count should be 1 in CRDT" ); assert_ne!( - item.stage().as_dir(), + item.stage().dir_name(), "blocked", "story should NOT be blocked after session 1" ); @@ -498,7 +498,7 @@ max_turns = 10 "after session 2, retry_count should be 2 in CRDT" ); assert_ne!( - item.stage().as_dir(), + item.stage().dir_name(), "blocked", "story should NOT be blocked after session 2" ); @@ -513,10 +513,10 @@ max_turns = 10 let item = crate::crdt_state::read_item(story_id).expect("story must be in CRDT"); assert_eq!( - item.stage().as_dir(), + item.stage().dir_name(), "blocked", "story must be blocked after session 3 (retry_count=3 >= max_retries=3) — got: {}", - item.stage().as_dir() + item.stage().dir_name() ); // retry_count resets to 0 on stage transition (Bug 780) — the fact // that the story reached 2_blocked proves the retry limit was hit. diff --git a/server/src/chat/lookup.rs b/server/src/chat/lookup.rs index 73fb0438..fb4a48cb 100644 --- a/server/src/chat/lookup.rs +++ b/server/src/chat/lookup.rs @@ -33,7 +33,7 @@ pub(crate) fn find_story_by_number( if let Some(items) = crate::crdt_state::read_all_items() { for item in items { if item.story_id().split('_').next().unwrap_or("") == number { - let stage_dir = item.stage().as_dir().to_string(); + let stage_dir = item.stage().dir_name().to_string(); let path = project_root .join(".huskies") .join("work") @@ -54,7 +54,7 @@ pub(crate) fn find_story_by_number( continue; } let stage_dir = crate::crdt_state::read_item(&id) - .map(|v| v.stage().as_dir().to_string()) + .map(|v| v.stage().dir_name().to_string()) .unwrap_or_else(|| "backlog".to_string()); let path = project_root .join(".huskies") diff --git a/server/src/crdt_state/mod.rs b/server/src/crdt_state/mod.rs index bc797a78..0e1ea71d 100644 --- a/server/src/crdt_state/mod.rs +++ b/server/src/crdt_state/mod.rs @@ -48,7 +48,7 @@ pub use state::{init, subscribe}; pub use types::{ ActiveAgentCrdt, ActiveAgentView, AgentThrottleCrdt, AgentThrottleView, CrdtEvent, GatewayConfigCrdt, GatewayProjectCrdt, GatewayProjectView, MergeJobCrdt, MergeJobView, - NodePresenceCrdt, NodePresenceView, PipelineDoc, PipelineItemCrdt, PipelineItemView, Stage, + NodePresenceCrdt, NodePresenceView, PipelineDoc, PipelineItemCrdt, PipelineItemView, TestJobCrdt, TestJobView, TokenUsageCrdt, TokenUsageView, WorkItem, }; pub use write::{ diff --git a/server/src/crdt_state/read.rs b/server/src/crdt_state/read.rs index d19c4d39..99c5326b 100644 --- a/server/src/crdt_state/read.rs +++ b/server/src/crdt_state/read.rs @@ -295,12 +295,17 @@ pub fn evict_item(story_id: &str) -> Result<(), String> { } /// Extract a `PipelineItemView` from a `PipelineItemCrdt`. +/// +/// Projects the loose CRDT `stage` register into a typed +/// [`crate::pipeline_state::Stage`]. Items with an unknown or missing stage +/// string are filtered out (`None`), so every `WorkItem` that escapes the +/// read path carries a valid typed stage. pub(super) fn extract_item_view(item: &PipelineItemCrdt) -> Option { let story_id = match item.story_id.view() { JsonValue::String(s) if !s.is_empty() => s, _ => return None, }; - let stage = match item.stage.view() { + let stage_str = match item.stage.view() { JsonValue::String(s) if !s.is_empty() => s, _ => return None, }; @@ -368,6 +373,8 @@ pub(super) fn extract_item_view(item: &PipelineItemCrdt) -> Option None, }; + let stage = project_stage_for_view(&stage_str, &story_id, merged_at, blocked)?; + Some(PipelineItemView { story_id, stage, @@ -388,6 +395,90 @@ pub(super) fn extract_item_view(item: &PipelineItemCrdt) -> Option, + blocked: Option, +) -> Option { + use crate::pipeline_state::{ArchiveReason, BranchName, GitSha, Stage}; + use chrono::{DateTime, Utc}; + use std::num::NonZeroU32; + + // Normalise legacy directory-style strings to their clean wire form so + // the match below stays single-shape. + let clean = match stage_str { + "0_upcoming" => "upcoming", + "1_backlog" => "backlog", + "2_current" => "coding", + "2_blocked" => "blocked", + "3_qa" => "qa", + "4_merge" => "merge", + "4_merge_failure" => "merge_failure", + "5_done" => "done", + "6_archived" => "archived", + // Pre-934 `7_frozen` collapses to backlog (the frozen flag is an + // orthogonal CRDT register since story 934 stage 4). + "7_frozen" => "backlog", + other => other, + }; + + match clean { + "upcoming" => Some(Stage::Upcoming), + "backlog" => Some(Stage::Backlog), + "coding" => Some(Stage::Coding), + "qa" => Some(Stage::Qa), + "blocked" => Some(Stage::Blocked { + reason: String::new(), + }), + "merge" => Some(Stage::Merge { + feature_branch: BranchName(format!("feature/story-{story_id}")), + commits_ahead: NonZeroU32::new(1).expect("1 is non-zero"), + }), + "merge_failure" => Some(Stage::MergeFailure { + reason: String::new(), + }), + "done" => { + let merged_at = merged_at + .map(|ts| { + DateTime::from_timestamp(ts as i64, 0).unwrap_or(DateTime::::UNIX_EPOCH) + }) + .unwrap_or(DateTime::::UNIX_EPOCH); + Some(Stage::Done { + merged_at, + merge_commit: GitSha("legacy".to_string()), + }) + } + "archived" => { + let reason = if blocked.unwrap_or(false) { + ArchiveReason::Blocked { + reason: "migrated from legacy blocked field".to_string(), + } + } else { + ArchiveReason::Completed + }; + Some(Stage::Archived { + archived_at: Utc::now(), + reason, + }) + } + _ => None, + } +} + /// Check whether a dependency (by numeric ID prefix) is in `5_done` or `6_archived` /// according to CRDT state. /// @@ -478,7 +569,7 @@ mod tests { let item_json: JsonValue = json!({ "story_id": "40_story_view", - "stage": "3_qa", + "stage": "qa", "name": "View Test", "agent": "coder-1", "retry_count": 2.0, @@ -494,7 +585,7 @@ mod tests { let view = extract_item_view(&crdt.doc.items[0]).unwrap(); assert_eq!(view.story_id, "40_story_view"); - assert_eq!(view.stage, "3_qa"); + assert!(matches!(view.stage, crate::pipeline_state::Stage::Qa)); assert_eq!(view.name.as_deref(), Some("View Test")); assert_eq!(view.agent.as_deref(), Some("coder-1")); assert_eq!(view.retry_count, Some(2)); diff --git a/server/src/crdt_state/state/tests.rs b/server/src/crdt_state/state/tests.rs index b151cc69..59d0c580 100644 --- a/server/src/crdt_state/state/tests.rs +++ b/server/src/crdt_state/state/tests.rs @@ -165,7 +165,7 @@ async fn init_and_write_read_roundtrip() { // Insert and update like write_item does. let item_json: JsonValue = json!({ "story_id": "50_story_roundtrip", - "stage": "1_backlog", + "stage": "backlog", "name": "Roundtrip", "agent": "", "retry_count": 0.0, @@ -206,7 +206,7 @@ async fn init_and_write_read_roundtrip() { let view = extract_item_view(&crdt2.doc.items[0]).unwrap(); assert_eq!(view.story_id, "50_story_roundtrip"); - assert_eq!(view.stage, "1_backlog"); + assert!(matches!(view.stage, crate::pipeline_state::Stage::Backlog)); assert_eq!(view.name.as_deref(), Some("Roundtrip")); } diff --git a/server/src/crdt_state/types.rs b/server/src/crdt_state/types.rs index 9718af8d..f42d4eef 100644 --- a/server/src/crdt_state/types.rs +++ b/server/src/crdt_state/types.rs @@ -124,83 +124,6 @@ pub struct NodePresenceCrdt { // ── Read-side view types ───────────────────────────────────────────── -/// Pipeline stage inferred from the CRDT `stage` register. -/// -/// This is the low-level typed stage for [`WorkItem`] accessors. For rich -/// transition metadata (merge commits, timestamps, etc.) project via -/// `pipeline_state::Stage` instead. -#[derive(Clone, Debug, PartialEq, Eq)] -pub enum Stage { - /// Story created but not yet triaged (`0_upcoming`). - Upcoming, - /// Waiting for dependencies or auto-assign (`1_backlog`). - Backlog, - /// Actively being coded (`2_current`). - Coding, - /// Blocked awaiting human resolution (`2_blocked`). - Blocked, - /// Coder done; gates running (`3_qa`). - Qa, - /// Gates passed; ready to merge (`4_merge`). - Merge, - /// Merge failed; awaiting intervention (`4_merge_failure`). - MergeFailure, - /// Merged to master (`5_done`). - Done, - /// Out of the active flow (`6_archived`). - Archived, - /// An unrecognised stage string — forward-compatible catch-all. - Unknown(String), -} - -impl Stage { - /// Parse a stage wire string into the typed enum. - /// - /// Accepts only the post-934 clean vocabulary (`"backlog"`, `"coding"`, - /// `"qa"`, `"merge"`, `"merge_failure"`, `"blocked"`, `"done"`, - /// `"archived"`, `"upcoming"`). Pre-934 directory-style strings - /// (`"2_current"`, `"4_merge"`, etc.) are no longer accepted — they are - /// rewritten at startup by `migrate_legacy_stage_strings`. - pub fn from_dir(s: &str) -> Self { - match s { - "upcoming" => Stage::Upcoming, - "backlog" => Stage::Backlog, - "coding" => Stage::Coding, - "blocked" => Stage::Blocked, - "qa" => Stage::Qa, - "merge" => Stage::Merge, - "merge_failure" => Stage::MergeFailure, - "done" => Stage::Done, - "archived" => Stage::Archived, - other => Stage::Unknown(other.to_string()), - } - } - - /// Convert back to the wire string for persistence into the CRDT. - /// - /// Post-934: clean vocabulary (no numeric prefixes); the strings only - /// survive at this single CRDT-serialisation boundary. - pub fn as_dir(&self) -> &str { - match self { - Stage::Upcoming => "upcoming", - Stage::Backlog => "backlog", - Stage::Coding => "coding", - Stage::Blocked => "blocked", - Stage::Qa => "qa", - Stage::Merge => "merge", - Stage::MergeFailure => "merge_failure", - Stage::Done => "done", - Stage::Archived => "archived", - Stage::Unknown(s) => s.as_str(), - } - } - - /// `true` if this is an "active" stage (`Coding`, `Qa`, or `Merge`). - pub fn is_active(&self) -> bool { - matches!(self, Stage::Coding | Stage::Qa | Stage::Merge) - } -} - /// A typed snapshot of a single pipeline work item derived from the CRDT document. /// /// Access fields exclusively through the typed accessor methods — raw field access is @@ -213,7 +136,7 @@ impl Stage { #[derive(Clone, Debug)] pub struct WorkItem { pub(super) story_id: String, - pub(super) stage: String, + pub(super) stage: crate::pipeline_state::Stage, pub(super) name: Option, pub(super) agent: Option, pub(super) retry_count: Option, @@ -245,9 +168,9 @@ impl WorkItem { &self.story_id } - /// Pipeline stage as a typed enum. - pub fn stage(&self) -> Stage { - Stage::from_dir(&self.stage) + /// Pipeline stage as a typed [`crate::pipeline_state::Stage`]. + pub fn stage(&self) -> &crate::pipeline_state::Stage { + &self.stage } /// Human-readable story name, or `None` when unset. @@ -331,7 +254,7 @@ impl WorkItem { #[allow(clippy::too_many_arguments)] pub fn for_test( story_id: impl Into, - stage: impl Into, + stage: crate::pipeline_state::Stage, name: Option, agent: Option, retry_count: Option, @@ -349,7 +272,7 @@ impl WorkItem { ) -> Self { Self { story_id: story_id.into(), - stage: stage.into(), + stage, name, agent, retry_count, diff --git a/server/src/crdt_state/write/migrations.rs b/server/src/crdt_state/write/migrations.rs index 29d2d233..5806b76f 100644 --- a/server/src/crdt_state/write/migrations.rs +++ b/server/src/crdt_state/write/migrations.rs @@ -343,10 +343,9 @@ mod stage_migration_tests { for (id, _, expected_variant) in cases { let view = read_item(id).expect("item must still exist after migration"); - let projected: Stage = crate::pipeline_state::project_stage(&view) - .expect("projection must succeed after migration"); + let projected: &Stage = view.stage(); assert_eq!( - std::mem::discriminant(&projected), + std::mem::discriminant(projected), std::mem::discriminant(expected_variant), "stage for {id} should project to {expected_variant:?} after migration, got {projected:?}", ); @@ -367,7 +366,7 @@ mod stage_migration_tests { let after = read_item(story_id).expect("item must still exist after migration"); assert!( - matches!(after.stage(), crate::crdt_state::Stage::Backlog), + matches!(after.stage(), crate::pipeline_state::Stage::Backlog), "7_frozen should collapse to Backlog: got {:?}", after.stage() ); @@ -400,8 +399,14 @@ mod stage_migration_tests { // Clean item is unchanged; legacy item is now clean too. let clean = read_item("9520_already_clean").unwrap(); let migrated = read_item("9521_needs_migration").unwrap(); - assert!(matches!(clean.stage(), crate::crdt_state::Stage::Coding)); - assert!(matches!(migrated.stage(), crate::crdt_state::Stage::Coding)); + assert!(matches!( + clean.stage(), + crate::pipeline_state::Stage::Coding + )); + assert!(matches!( + migrated.stage(), + crate::pipeline_state::Stage::Coding + )); } #[test] @@ -413,7 +418,7 @@ mod stage_migration_tests { let after_first = read_item("9530_idempotent").unwrap(); assert!(matches!( after_first.stage(), - crate::crdt_state::Stage::Merge + crate::pipeline_state::Stage::Merge { .. } )); // Second call must be a no-op — the filter pass returns empty. @@ -421,7 +426,7 @@ mod stage_migration_tests { let after_second = read_item("9530_idempotent").unwrap(); assert!(matches!( after_second.stage(), - crate::crdt_state::Stage::Merge + crate::pipeline_state::Stage::Merge { .. } )); } diff --git a/server/src/crdt_state/write/tests.rs b/server/src/crdt_state/write/tests.rs index f1dfbe9d..87f564eb 100644 --- a/server/src/crdt_state/write/tests.rs +++ b/server/src/crdt_state/write/tests.rs @@ -216,7 +216,7 @@ fn migrate_story_ids_to_numeric_preserves_stage_and_name() { migrate_story_ids_to_numeric(); let item = read_item("45").expect("item must be accessible by numeric ID"); - assert_eq!(item.stage, "coding"); + assert!(matches!(item.stage, crate::pipeline_state::Stage::Coding)); assert_eq!(item.name.as_deref(), Some("Crash Bug")); assert_eq!(item.agent.as_deref(), Some("coder-1")); } @@ -620,7 +620,7 @@ async fn bug_511_rowid_replay_preserves_field_update_after_list_insert() { let sid = format!("{}_story_warmup", i); let item: JsonValue = json!({ "story_id": sid, - "stage": "1_backlog", + "stage": "backlog", "name": "", "agent": "", "retry_count": 0.0, @@ -657,7 +657,7 @@ async fn bug_511_rowid_replay_preserves_field_update_after_list_insert() { let idx = rebuild_index(&crdt)["511_story_target"]; let stage_op = crdt.doc.items[idx] .stage - .set("2_current".to_string()) + .set("coding".to_string()) .sign(&kp); crdt.apply(stage_op.clone()); // stage_op.inner.seq == 1 @@ -701,8 +701,8 @@ async fn bug_511_rowid_replay_preserves_field_update_after_list_insert() { ); let idx2 = index2["511_story_target"]; let view = extract_item_view(&crdt2.doc.items[idx2]).unwrap(); - assert_eq!( - view.stage, "2_current", + assert!( + matches!(view.stage, crate::pipeline_state::Stage::Coding), "stage field update lost during replay (bug 511 regression)" ); @@ -726,9 +726,9 @@ async fn bug_511_rowid_replay_preserves_field_update_after_list_insert() { // but the stage update is lost (it ran before the item existed). if let Some(idx3) = index3.get("511_story_target") { let view3 = extract_item_view(&crdt3.doc.items[*idx3]).unwrap(); - // The bug: stage is still "1_backlog" because the update was dropped. - assert_eq!( - view3.stage, "1_backlog", + // The bug: stage is still "backlog" because the update was dropped. + assert!( + matches!(view3.stage, crate::pipeline_state::Stage::Backlog), "expected seq-ASC replay to exhibit the bug (update lost)" ); } diff --git a/server/src/db/mod.rs b/server/src/db/mod.rs index 915601c3..ed3bac7a 100644 --- a/server/src/db/mod.rs +++ b/server/src/db/mod.rs @@ -328,7 +328,7 @@ mod tests { write_item_with_content(story_id, "2_current", content, meta); let view = crate::crdt_state::read_item(story_id).expect("story exists in CRDT"); - assert_eq!(view.stage().as_dir(), "coding"); + assert_eq!(view.stage().dir_name(), "coding"); assert_eq!(view.name(), Some("Typed Name")); assert_eq!(view.agent(), Some("coder-1")); assert_eq!(view.retry_count(), 2); @@ -353,7 +353,7 @@ mod tests { write_item_with_content(story_id, "2_current", content, ItemMeta::default()); let view = crate::crdt_state::read_item(story_id).expect("story exists in CRDT"); - assert_eq!(view.stage().as_dir(), "coding"); + assert_eq!(view.stage().dir_name(), "coding"); assert_eq!( view.name(), None, diff --git a/server/src/http/mcp/merge_tools.rs b/server/src/http/mcp/merge_tools.rs index 4a9983c9..5021e944 100644 --- a/server/src/http/mcp/merge_tools.rs +++ b/server/src/http/mcp/merge_tools.rs @@ -19,10 +19,11 @@ pub(super) async fn tool_merge_agent_work( if let Some(item) = crate::crdt_state::read_item(story_id) && matches!( item.stage(), - crate::crdt_state::Stage::Done | crate::crdt_state::Stage::Archived + crate::pipeline_state::Stage::Done { .. } + | crate::pipeline_state::Stage::Archived { .. } ) { - let stage_name = item.stage().as_dir().to_string(); + let stage_name = item.stage().dir_name().to_string(); return serde_json::to_string_pretty(&json!({ "story_id": story_id, "status": "completed", diff --git a/server/src/pipeline_state/mod.rs b/server/src/pipeline_state/mod.rs index 5854fa93..5a8ddc83 100644 --- a/server/src/pipeline_state/mod.rs +++ b/server/src/pipeline_state/mod.rs @@ -53,7 +53,7 @@ pub use transition::{ pub use events::{EventBus, TransitionFired, TransitionSubscriber}; #[allow(unused_imports)] -pub use projection::{ProjectionError, project_stage}; +pub use projection::ProjectionError; pub use projection::{read_all_typed, read_typed}; #[allow(unused_imports)] diff --git a/server/src/pipeline_state/projection.rs b/server/src/pipeline_state/projection.rs index 207e7473..3b591a02 100644 --- a/server/src/pipeline_state/projection.rs +++ b/server/src/pipeline_state/projection.rs @@ -1,12 +1,16 @@ //! Projection layer — converts loose CRDT views into typed `PipelineItem` enums. +//! +//! Story 944: the view layer (`PipelineItemView`) now carries a typed +//! [`Stage`] directly, so this projection is mechanical — no more stage-string +//! parsing or payload synthesis happens here. [`TryFrom`] is kept for +//! backwards-compatible callers (apply.rs threads `ProjectionError` through +//! `ApplyError`), but the impl is infallible in practice. -use chrono::{DateTime, Utc}; use std::fmt; -use std::num::NonZeroU32; use crate::crdt_state::PipelineItemView; -use super::{ArchiveReason, BranchName, GitSha, PipelineItem, Stage, StoryId, stage_dir_name}; +use super::{ArchiveReason, PipelineItem, Stage, StoryId, stage_dir_name}; /// Errors from projecting loose CRDT data into typed enums. #[derive(Debug, Clone, PartialEq, Eq)] @@ -50,12 +54,10 @@ impl TryFrom<&PipelineItemView> for PipelineItem { let retry_count = view.retry_count(); - let stage = project_stage(view)?; - Ok(PipelineItem { story_id, name, - stage, + stage: view.stage().clone(), depends_on, retry_count, frozen: view.frozen(), @@ -63,78 +65,6 @@ impl TryFrom<&PipelineItemView> for PipelineItem { } } -/// Project the typed low-level [`crdt_state::Stage`] plus the view's -/// associated fields into a rich [`Stage`] with payload defaults. -/// -/// This is the one carefully-controlled boundary where the CRDT's -/// stringly-typed stage register gains payload fields (merge metadata, -/// archive reason, etc.) synthesised from sibling registers and sane -/// defaults. Unknown stage strings (forward-compat aliases) surface as -/// [`ProjectionError::UnknownStage`]. -pub fn project_stage(view: &PipelineItemView) -> Result { - use crate::crdt_state::Stage as LowStage; - match view.stage() { - LowStage::Upcoming => Ok(Stage::Upcoming), - LowStage::Backlog => Ok(Stage::Backlog), - LowStage::Blocked => Ok(Stage::Blocked { - reason: String::new(), - }), - LowStage::Coding => Ok(Stage::Coding), - LowStage::Qa => Ok(Stage::Qa), - LowStage::Merge => { - // Merge stage in the current CRDT doesn't carry feature_branch or - // commits_ahead — those are computed at transition time. For - // projection from existing CRDT data, we synthesize defaults. - let branch = format!("feature/story-{}", view.story_id()); - // Existing CRDT data doesn't track commits_ahead, so we use 1 as - // a safe non-zero default (the item is in merge, so there must be - // at least one commit). - Ok(Stage::Merge { - feature_branch: BranchName(branch), - commits_ahead: NonZeroU32::new(1).expect("1 is non-zero"), - }) - } - LowStage::MergeFailure => { - // The reason is persisted in the content body but is not part of - // the raw CRDT view; the projection uses an empty string here. - // Consumers that need the reason should read content directly. - Ok(Stage::MergeFailure { - reason: String::new(), - }) - } - LowStage::Done => { - // Use the stored merged_at timestamp if present. Legacy items - // that pre-date this field have merged_at = None, so we fall back - // to UNIX_EPOCH, which makes them older than any retention window - // and therefore eligible for immediate sweep to archived. - let merged_at = view - .merged_at() - .map(|ts| { - DateTime::from_timestamp(ts as i64, 0).unwrap_or(DateTime::::UNIX_EPOCH) - }) - .unwrap_or(DateTime::::UNIX_EPOCH); - Ok(Stage::Done { - merged_at, - merge_commit: GitSha("legacy".to_string()), - }) - } - LowStage::Archived => { - let reason = if view.blocked() { - ArchiveReason::Blocked { - reason: "migrated from legacy blocked field".to_string(), - } - } else { - ArchiveReason::Completed - }; - Ok(Stage::Archived { - archived_at: Utc::now(), - reason, - }) - } - LowStage::Unknown(s) => Err(ProjectionError::UnknownStage(s)), - } -} - // ── Reverse projection: PipelineItem → stage dir string ───────────────────── impl PipelineItem { @@ -190,6 +120,8 @@ pub fn read_typed(story_id: &str) -> Result, ProjectionErro #[cfg(test)] mod tests { use super::*; + use crate::pipeline_state::{BranchName, GitSha}; + use chrono::Utc; use std::num::NonZeroU32; fn nz(n: u32) -> NonZeroU32 { @@ -201,11 +133,8 @@ mod tests { fn sha(s: &str) -> GitSha { GitSha(s.to_string()) } - fn sid(s: &str) -> StoryId { - StoryId(s.to_string()) - } - fn make_view(story_id: &str, stage: &str, name: Option<&str>) -> PipelineItemView { + fn make_view(story_id: &str, stage: Stage, name: Option<&str>) -> PipelineItemView { PipelineItemView::for_test( story_id, stage, @@ -228,7 +157,7 @@ mod tests { #[test] fn project_upcoming_item() { - let view = make_view("42_story_test", "upcoming", Some("Test Story")); + let view = make_view("42_story_test", Stage::Upcoming, Some("Test Story")); let item = PipelineItem::try_from(&view).unwrap(); assert!(matches!(item.stage, Stage::Upcoming)); } @@ -237,7 +166,7 @@ mod tests { fn project_backlog_item() { let view = PipelineItemView::for_test( "42_story_test", - "backlog", + Stage::Backlog, Some("Test Story".to_string()), None, None, @@ -265,7 +194,7 @@ mod tests { fn project_current_item() { let view = PipelineItemView::for_test( "42_story_test", - "coding", + Stage::Coding, Some("Test".to_string()), Some("coder-1".to_string()), Some(2), @@ -288,7 +217,14 @@ mod tests { #[test] fn project_merge_item() { - let view = make_view("42_story_test", "merge", Some("Test")); + let view = make_view( + "42_story_test", + Stage::Merge { + feature_branch: fb("feature/story-42_story_test"), + commits_ahead: nz(1), + }, + Some("Test"), + ); let item = PipelineItem::try_from(&view).unwrap(); assert!(matches!(item.stage, Stage::Merge { .. })); if let Stage::Merge { @@ -303,7 +239,13 @@ mod tests { #[test] fn project_blocked_item() { - let view = make_view("42_story_test", "blocked", Some("Test")); + let view = make_view( + "42_story_test", + Stage::Blocked { + reason: String::new(), + }, + Some("Test"), + ); let item = PipelineItem::try_from(&view).unwrap(); assert!(matches!(item.stage, Stage::Blocked { .. })); } @@ -312,7 +254,12 @@ mod tests { fn project_archived_blocked_item() { let view = PipelineItemView::for_test( "42_story_test", - "archived", + Stage::Archived { + archived_at: Utc::now(), + reason: ArchiveReason::Blocked { + reason: "migrated from legacy blocked field".to_string(), + }, + }, Some("Test".to_string()), None, None, @@ -342,7 +289,10 @@ mod tests { fn project_archived_completed_item() { let view = PipelineItemView::for_test( "42_story_test", - "archived", + Stage::Archived { + archived_at: Utc::now(), + reason: ArchiveReason::Completed, + }, Some("Test".to_string()), None, None, @@ -368,16 +318,6 @@ mod tests { )); } - #[test] - fn project_unknown_stage_returns_error() { - let view = make_view("42_story_test", "9_invalid", Some("Test")); - let result = PipelineItem::try_from(&view); - assert!(matches!( - result, - Err(ProjectionError::UnknownStage(s)) if s == "9_invalid" - )); - } - // ── Reverse projection tests ──────────────────────────────────────── #[test]