From e82602db7748c7c7f51d7fb4a2893c495a66a796 Mon Sep 17 00:00:00 2001 From: dave Date: Fri, 15 May 2026 08:21:36 +0000 Subject: [PATCH] =?UTF-8?q?huskies:=20merge=201086=20story=20Pipeline+Stat?= =?UTF-8?q?us=20split=20=E2=80=94=20Step=20C:=20migrate=20auto-assign,=20s?= =?UTF-8?q?ubscribers,=20and=20lifecycle=20transitions=20to=20read=20Pipel?= =?UTF-8?q?ine=20+=20Status?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- script/test | 17 ++++++- server/src/agents/pool/auto_assign/backlog.rs | 30 ++++++++---- .../merge_failure_block_subscriber.rs | 9 +++- .../auto_assign/merge_failure_subscriber.rs | 12 ++++- .../src/agents/pool/cost_rollup_subscriber.rs | 18 ++++--- server/src/agents/pool/worktree_lifecycle.rs | 43 ++++++++++------- server/src/crdt_state/read.rs | 48 ++++++++----------- server/src/crdt_state/types.rs | 12 +++++ server/src/crdt_state/write/item.rs | 19 ++++++++ server/src/db/gc.rs | 20 ++++---- server/src/io/watcher/sweep.rs | 9 +++- 11 files changed, 159 insertions(+), 78 deletions(-) diff --git a/script/test b/script/test index 30b8e629..c8920f03 100755 --- a/script/test +++ b/script/test @@ -53,7 +53,22 @@ cargo run --manifest-path "$PROJECT_ROOT/Cargo.toml" -p source-map-gen --bin sou echo "=== Building frontend ===" if [ -d "$PROJECT_ROOT/frontend" ]; then cd "$PROJECT_ROOT/frontend" - npm install + # The merge gate runs in workspaces whose pre-existing `node_modules` was + # populated by an earlier `npm install --omit=dev` (or a partial install). + # In that state `npm install` reports "up to date, audited N packages" + # without actually adding the missing devDependencies, so the subsequent + # `tsc && vite build` fails with `sh: 1: tsc: not found`. + # + # Repair the install when typescript isn't reachable (story 1086 merge gate + # regression). We probe the on-disk binary rather than relying on PATH so + # this also covers the case where `node_modules/.bin/` is missing. + if [ ! -x node_modules/typescript/bin/tsc ]; then + echo "[script/test] node_modules missing typescript; performing clean install." + rm -rf node_modules + npm install --include=dev + else + npm install --include=dev + fi npm run build cd "$PROJECT_ROOT" else diff --git a/server/src/agents/pool/auto_assign/backlog.rs b/server/src/agents/pool/auto_assign/backlog.rs index 3a217d2e..8f4c934e 100644 --- a/server/src/agents/pool/auto_assign/backlog.rs +++ b/server/src/agents/pool/auto_assign/backlog.rs @@ -1,29 +1,39 @@ -//! Backlog promotion: scan `1_backlog/` and promote stories whose `depends_on` are all met. +//! Backlog promotion: scan items in `Pipeline::Backlog` and promote stories whose `depends_on` are all met. -use crate::pipeline_state::Stage; +use crate::pipeline_state::Pipeline; use crate::slog; use crate::slog_warn; use super::super::AgentPool; -use super::scan::scan_stage_items; use super::story_checks::{check_archived_dependencies, has_unmet_dependencies}; impl AgentPool { - /// Scan `1_backlog/` and promote any story whose `depends_on` are all met. + /// Scan items in `Pipeline::Backlog` and promote any story whose `depends_on` are all met. /// /// A story is only promoted if it explicitly lists `depends_on` AND every - /// listed dependency has reached `5_done` or `6_archived`. Stories with no - /// `depends_on` are left in the backlog for human scheduling. + /// listed dependency has reached `Pipeline::Done` or `Pipeline::Archived`. + /// Stories with no `depends_on` are left in the backlog for human scheduling. /// - /// **Archived dep semantics:** a dep in `6_archived` counts as satisfied (since - /// stories auto-sweep from `5_done` to `6_archived` after 4 hours, and the + /// **Archived dep semantics:** a dep in `Pipeline::Archived` counts as satisfied + /// (since stories auto-sweep from `Done` to `Archived` after 4 hours, and the /// dependent story would normally already be promoted by then). However, if a - /// dep was already in `6_archived` when the dependent story was created (e.g. it + /// dep was already archived when the dependent story was created (e.g. it /// was abandoned/superseded before the dependent existed), a prominent warning is /// logged so the user can see the promotion was triggered by an archived dep, not /// a clean completion. pub(super) fn promote_ready_backlog_stories(&self) { - let items = scan_stage_items(&Stage::Backlog); + // Story 1086: scan by Pipeline column, not Stage variant. Pipeline::Backlog + // covers Stage::Upcoming and Stage::Backlog uniformly. + let items: Vec = { + use std::collections::BTreeSet; + let mut ids = BTreeSet::new(); + for item in crate::pipeline_state::read_all_typed() { + if item.stage.pipeline() == Pipeline::Backlog { + ids.insert(item.story_id.0.clone()); + } + } + ids.into_iter().collect() + }; for story_id in &items { // Only promote stories that explicitly declare dependencies // (story 929: read from the CRDT register, not YAML). diff --git a/server/src/agents/pool/auto_assign/merge_failure_block_subscriber.rs b/server/src/agents/pool/auto_assign/merge_failure_block_subscriber.rs index 018a2c74..d67758cf 100644 --- a/server/src/agents/pool/auto_assign/merge_failure_block_subscriber.rs +++ b/server/src/agents/pool/auto_assign/merge_failure_block_subscriber.rs @@ -13,7 +13,7 @@ use std::collections::HashMap; use std::path::{Path, PathBuf}; use std::sync::Arc; -use crate::pipeline_state::{MergeFailureKind, PipelineEvent, Stage, StoryId}; +use crate::pipeline_state::{MergeFailureKind, PipelineEvent, Stage, Status, StoryId}; use crate::slog; use crate::slog_warn; @@ -95,6 +95,13 @@ fn on_transition( counters: &mut HashMap, recovery_running: bool, ) { + // Story 1086: gate on the typed `Status` projection — `Status::MergeFailure` + // is precisely the set of stages we count toward the block threshold. We + // still need the variant pattern below to read `kind`. + if fired.after.status() != Status::MergeFailure { + counters.remove(&fired.story_id); + return; + } match &fired.after { Stage::MergeFailure { kind, .. } => { if recovery_running { diff --git a/server/src/agents/pool/auto_assign/merge_failure_subscriber.rs b/server/src/agents/pool/auto_assign/merge_failure_subscriber.rs index 6a36dd4e..81604cec 100644 --- a/server/src/agents/pool/auto_assign/merge_failure_subscriber.rs +++ b/server/src/agents/pool/auto_assign/merge_failure_subscriber.rs @@ -9,7 +9,7 @@ use std::path::{Path, PathBuf}; use std::sync::Arc; -use crate::pipeline_state::{MergeFailureKind, Stage}; +use crate::pipeline_state::{MergeFailureKind, Stage, Status}; use crate::slog; use crate::slog_warn; @@ -26,6 +26,11 @@ use super::scan::{find_free_agent_for_stage, is_story_assigned_for_stage}; pub(crate) async fn reconcile_merge_failure(pool: &Arc, project_root: &Path) { use crate::pipeline_state::{MergeFailureKind, PipelineEvent, Stage, TransitionFired}; for item in crate::pipeline_state::read_all_typed() { + // Story 1086: scan via the Status projection; the variant pattern is + // still needed to read `kind`. + if item.stage.status() != Status::MergeFailure { + continue; + } if let Stage::MergeFailure { ref kind, .. } = item.stage && matches!(kind, MergeFailureKind::ConflictDetected(_)) { @@ -73,6 +78,11 @@ async fn on_merge_failure_transition( project_root: &Path, fired: &crate::pipeline_state::TransitionFired, ) { + // Story 1086: gate on the typed `Status` projection first; only the + // `MergeFailure` kind extraction needs the variant pattern. + if fired.after.status() != Status::MergeFailure { + return; + } let Stage::MergeFailure { ref kind, .. } = fired.after else { return; }; diff --git a/server/src/agents/pool/cost_rollup_subscriber.rs b/server/src/agents/pool/cost_rollup_subscriber.rs index 21a5119e..9c946f10 100644 --- a/server/src/agents/pool/cost_rollup_subscriber.rs +++ b/server/src/agents/pool/cost_rollup_subscriber.rs @@ -9,7 +9,7 @@ use std::path::{Path, PathBuf}; -use crate::pipeline_state::Stage; +use crate::pipeline_state::{Pipeline, Stage, Status}; use crate::slog; use crate::slog_warn; @@ -50,17 +50,15 @@ pub(crate) fn spawn_cost_rollup_subscriber(project_root: PathBuf) { /// Returns `true` if `stage` is a terminal pipeline stage. /// /// Terminal stages are those from which no further work is expected: -/// Done, Archived, Abandoned, Superseded, Rejected. -/// MergeFailure variants are NOT terminal — stories can recover from them. +/// Done, Archived, Abandoned, Superseded, Rejected. Story 1086 routes the +/// classification through the [`Status`] / [`Pipeline`] projection so future +/// Stage variants automatically participate. MergeFailure variants are NOT +/// terminal — stories can recover from them. fn is_terminal(stage: &Stage) -> bool { matches!( - stage, - Stage::Done { .. } - | Stage::Archived { .. } - | Stage::Abandoned { .. } - | Stage::Superseded { .. } - | Stage::Rejected { .. } - ) + stage.status(), + Status::Done | Status::Abandoned | Status::Superseded | Status::Rejected + ) || matches!(stage.pipeline(), Pipeline::Archived) } /// Snapshot the cost data for `fired.story_id` into the register when diff --git a/server/src/agents/pool/worktree_lifecycle.rs b/server/src/agents/pool/worktree_lifecycle.rs index d5bc1051..51ab35d2 100644 --- a/server/src/agents/pool/worktree_lifecycle.rs +++ b/server/src/agents/pool/worktree_lifecycle.rs @@ -6,10 +6,20 @@ use std::path::{Path, PathBuf}; -use crate::pipeline_state::Stage; +use crate::pipeline_state::{Pipeline, Stage, Status}; use crate::slog; use crate::slog_warn; +/// Story 1086: matches the set of terminal stages used by the worktree-cleanup +/// subscriber via the typed [`Status`] / [`Pipeline`] projections. Excludes +/// `Status::Rejected` so rejected stories keep their worktree for human review. +fn is_cleanup_terminal(stage: &Stage) -> bool { + matches!( + stage.status(), + Status::Done | Status::Abandoned | Status::Superseded + ) || matches!(stage.pipeline(), Pipeline::Archived) +} + /// Spawn a background task that creates a git worktree when a story enters `Stage::Coding`. /// /// Subscribes to the pipeline transition broadcast channel. On each @@ -22,7 +32,14 @@ pub(crate) fn spawn_worktree_create_subscriber(project_root: PathBuf, port: u16) loop { match rx.recv().await { Ok(fired) => { - if matches!(fired.after, Stage::Coding { .. }) { + // Story 1086: classify by Pipeline column. `Pipeline::Coding` + // covers `Stage::Coding` and `Stage::Blocked` — but Blocked has + // no worktree to create, so we still need the Stage::Coding + // payload check. Use a layered match: pipeline first for fast + // skip, then variant guard. + if fired.after.pipeline() == Pipeline::Coding + && matches!(fired.after, Stage::Coding { .. }) + { on_coding_transition(&project_root, port, &fired.story_id.0).await; } } @@ -50,13 +67,7 @@ pub(crate) fn spawn_worktree_cleanup_subscriber(project_root: PathBuf) { loop { match rx.recv().await { Ok(fired) => { - if matches!( - fired.after, - Stage::Done { .. } - | Stage::Archived { .. } - | Stage::Abandoned { .. } - | Stage::Superseded { .. } - ) { + if is_cleanup_terminal(&fired.after) { on_terminal_transition(&project_root, &fired.story_id.0).await; } } @@ -79,7 +90,11 @@ pub(crate) fn spawn_worktree_cleanup_subscriber(project_root: PathBuf) { /// so that Lagged events on the broadcast channel never leave Coding stories without worktrees. pub(crate) async fn reconcile_worktree_create(project_root: &Path, port: u16) { for item in crate::pipeline_state::read_all_typed() { - if matches!(item.stage, crate::pipeline_state::Stage::Coding { .. }) { + // Story 1086: filter by Pipeline column then narrow to the `Coding` + // variant (Blocked is in `Pipeline::Coding` but has no worktree). + if item.stage.pipeline() == Pipeline::Coding + && matches!(item.stage, crate::pipeline_state::Stage::Coding { .. }) + { on_coding_transition(project_root, port, &item.story_id.0).await; } } @@ -92,13 +107,7 @@ pub(crate) async fn reconcile_worktree_create(project_root: &Path, port: u16) { /// the broadcast channel never leave terminal stories with dangling worktrees. pub(crate) async fn reconcile_worktree_cleanup(project_root: &Path) { for item in crate::pipeline_state::read_all_typed() { - if matches!( - item.stage, - crate::pipeline_state::Stage::Done { .. } - | crate::pipeline_state::Stage::Archived { .. } - | crate::pipeline_state::Stage::Abandoned { .. } - | crate::pipeline_state::Stage::Superseded { .. } - ) { + if is_cleanup_terminal(&item.stage) { on_terminal_transition(project_root, &item.story_id.0).await; } } diff --git a/server/src/crdt_state/read.rs b/server/src/crdt_state/read.rs index 1d12724a..93429531 100644 --- a/server/src/crdt_state/read.rs +++ b/server/src/crdt_state/read.rs @@ -598,56 +598,48 @@ fn project_stage_for_view( } } -/// Check whether a dependency (by numeric ID prefix) is in `5_done` or `6_archived` -/// according to CRDT state. +/// Check whether a dependency (by numeric ID prefix) is in `Pipeline::Done` or +/// `Pipeline::Archived` according to CRDT state. /// -/// Returns `true` if the dependency is satisfied (item found in a done stage). -/// Matches both legacy slug-form IDs (`"664_story_foo"`) and numeric-only IDs -/// (`"664"`) so the check remains correct after the slug→numeric migration. -/// See `dep_is_archived_crdt` to distinguish archive-satisfied from cleanly-done. +/// Returns `true` if the dependency is satisfied (item found in a Done or +/// Archived pipeline column). Matches both legacy slug-form IDs +/// (`"664_story_foo"`) and numeric-only IDs (`"664"`) so the check remains +/// correct after the slug→numeric migration. Story 1086 routes the check +/// through the `Pipeline` projection so that future Stage variants automatically +/// participate via [`crate::pipeline_state::Stage::pipeline`]. See +/// `dep_is_archived_crdt` to distinguish archive-satisfied from cleanly-done. pub fn dep_is_done_crdt(dep_number: u32) -> bool { - use crate::pipeline_state::{Stage, read_all_typed}; + use crate::pipeline_state::{Pipeline, read_all_typed}; let exact = dep_number.to_string(); let prefix = format!("{dep_number}_"); read_all_typed().into_iter().any(|item| { (item.story_id.0 == exact || item.story_id.0.starts_with(&prefix)) - && matches!( - item.stage, - Stage::Done { .. } - | Stage::Archived { .. } - | Stage::Abandoned { .. } - | Stage::Superseded { .. } - | Stage::Rejected { .. } - ) + && matches!(item.stage.pipeline(), Pipeline::Done | Pipeline::Archived) }) } -/// Check whether a dependency (by numeric ID prefix) is specifically in `6_archived` -/// according to CRDT state. +/// Check whether a dependency (by numeric ID prefix) is specifically in +/// `Pipeline::Archived` according to CRDT state. /// /// Used to detect when a dependency is satisfied via archive rather than via a clean -/// completion through `5_done`. Returns `false` when the CRDT layer is not initialised. -/// Matches both legacy slug-form IDs (`"664_story_foo"`) and numeric-only IDs (`"664"`). +/// completion through `Pipeline::Done`. Returns `false` when the CRDT layer is not +/// initialised. Matches both legacy slug-form IDs (`"664_story_foo"`) and +/// numeric-only IDs (`"664"`). pub fn dep_is_archived_crdt(dep_number: u32) -> bool { - use crate::pipeline_state::{Stage, read_all_typed}; + use crate::pipeline_state::{Pipeline, read_all_typed}; let exact = dep_number.to_string(); let prefix = format!("{dep_number}_"); read_all_typed().into_iter().any(|item| { (item.story_id.0 == exact || item.story_id.0.starts_with(&prefix)) - && matches!( - item.stage, - Stage::Archived { .. } - | Stage::Abandoned { .. } - | Stage::Superseded { .. } - | Stage::Rejected { .. } - ) + && item.stage.pipeline() == Pipeline::Archived }) } /// Check unmet dependencies for a story by reading its `depends_on` from the /// CRDT document and checking each dependency against CRDT state. /// -/// Returns the list of dependency numbers that are NOT in `5_done` or `6_archived`. +/// Returns the list of dependency numbers whose stage is NOT in `Pipeline::Done` +/// or `Pipeline::Archived`. pub fn check_unmet_deps_crdt(story_id: &str) -> Vec { let item = match read_item(story_id) { Some(i) => i, diff --git a/server/src/crdt_state/types.rs b/server/src/crdt_state/types.rs index 8d8a2fef..42e8a496 100644 --- a/server/src/crdt_state/types.rs +++ b/server/src/crdt_state/types.rs @@ -105,6 +105,18 @@ pub struct PipelineItemCrdt { /// means no merge task is in flight. Projected into `Stage::Merge { /// server_start_time }` so callers never read this register directly. pub merge_server_start: LwwRegisterCrdt, + /// Story 1086: kebab-case wire form of the [`crate::pipeline_state::Pipeline`] + /// projection of the current `stage`. Written by `write_item` alongside + /// `stage` so display/scan code on remote peers can route by pipeline column + /// without re-deriving from the stage string. Empty string means "use the + /// value derived from `stage`" (legacy items predating 1086). + pub pipeline: LwwRegisterCrdt, + /// Story 1086: kebab-case wire form of the [`crate::pipeline_state::Status`] + /// projection of the current `stage`. Written alongside `stage` so badge + /// renderers can read the status directly without re-projecting from the + /// stage string. Empty string means "use the value derived from `stage`" + /// (legacy items predating 1086). + pub status: LwwRegisterCrdt, /// Story 1088: origin of the work item — who or what created it. /// /// Stored as a compact JSON string, e.g. diff --git a/server/src/crdt_state/write/item.rs b/server/src/crdt_state/write/item.rs index 9535ceee..6a227ece 100644 --- a/server/src/crdt_state/write/item.rs +++ b/server/src/crdt_state/write/item.rs @@ -281,6 +281,11 @@ pub fn write_item( merged_at: Option, ) { let stage_str = stage_dir_name(stage); + // Story 1086: persist the typed Pipeline + Status projections alongside + // the stage register so subscribers/display code on remote peers can route + // by them without re-deriving from the stage string. + let pipeline_str = stage.pipeline().as_str(); + let status_str = stage.status().as_str(); let claim: Option<&AgentClaim> = match stage { Stage::Coding { claim, .. } => claim.as_ref(), Stage::Merge { claim, .. } => claim.as_ref(), @@ -336,6 +341,14 @@ pub fn write_item( apply_and_persist(&mut state, |s| { s.crdt.doc.items[idx].stage.set(stage_str.to_string()) }); + // Story 1086: keep `pipeline` and `status` registers in lock-step with + // the stage write so subscribers/display can read them directly. + apply_and_persist(&mut state, |s| { + s.crdt.doc.items[idx].pipeline.set(pipeline_str.to_string()) + }); + apply_and_persist(&mut state, |s| { + s.crdt.doc.items[idx].status.set(status_str.to_string()) + }); if let Some(n) = name { apply_and_persist(&mut state, |s| { @@ -419,6 +432,9 @@ pub fn write_item( "resume_to": "", "plan_state": "", "merge_server_start": merge_server_start_val, + // Story 1086: typed Pipeline + Status projections written at insert. + "pipeline": pipeline_str, + "status": status_str, "origin": "", }) .into(); @@ -450,6 +466,9 @@ pub fn write_item( item.resume_to.advance_seq(floor); item.plan_state.advance_seq(floor); item.merge_server_start.advance_seq(floor); + // Story 1086. + item.pipeline.advance_seq(floor); + item.status.advance_seq(floor); item.origin.advance_seq(floor); } diff --git a/server/src/db/gc.rs b/server/src/db/gc.rs index e9b84eac..e7d4f7e6 100644 --- a/server/src/db/gc.rs +++ b/server/src/db/gc.rs @@ -12,7 +12,7 @@ //! zombie entries left over from sessions that predate the subscriber. use crate::db::{ContentKey, all_content_ids, delete_content}; -use crate::pipeline_state::Stage; +use crate::pipeline_state::{Pipeline, Stage, Status}; use crate::slog; use crate::slog_warn; @@ -111,16 +111,18 @@ pub(crate) fn sweep_zombie_content_on_startup() { } } -/// Return `true` when `stage` is one of the five terminal pipeline stages. +/// Return `true` when `stage` is one of the terminal pipeline classifications. +/// +/// Story 1086: matches via the [`Status`] projection (Done / Abandoned / +/// Superseded / Rejected) plus [`Pipeline::Archived`] for plain archived items +/// (which carry `Status::Active`). Future Stage variants automatically +/// participate by returning the appropriate Status / Pipeline from +/// [`Stage::status`] / [`Stage::pipeline`]. fn is_terminal_stage(stage: &Stage) -> bool { matches!( - stage, - Stage::Done { .. } - | Stage::Archived { .. } - | Stage::Abandoned { .. } - | Stage::Superseded { .. } - | Stage::Rejected { .. } - ) + stage.status(), + Status::Done | Status::Abandoned | Status::Superseded | Status::Rejected + ) || matches!(stage.pipeline(), Pipeline::Archived) } #[cfg(test)] diff --git a/server/src/io/watcher/sweep.rs b/server/src/io/watcher/sweep.rs index 8555290c..88f7fb6f 100644 --- a/server/src/io/watcher/sweep.rs +++ b/server/src/io/watcher/sweep.rs @@ -29,13 +29,20 @@ use std::time::Duration; /// /// Replaces the periodic `sweep_done_to_archived` call from the tick loop. pub(crate) fn spawn_done_to_archived_subscriber(done_retention: Duration) { - use crate::pipeline_state::{PipelineEvent, Stage, apply_transition, subscribe_transitions}; + use crate::pipeline_state::{ + PipelineEvent, Stage, Status, apply_transition, subscribe_transitions, + }; let mut rx = subscribe_transitions(); tokio::spawn(async move { loop { match rx.recv().await { Ok(fired) => { + // Story 1086: gate on the typed `Status::Done` projection; + // the variant pattern is still required to read `merged_at`. + if fired.after.status() != Status::Done { + continue; + } if let Stage::Done { merged_at, .. } = fired.after { let story_id = fired.story_id.0.clone(); let retention = done_retention;