huskies: merge 951

This commit is contained in:
dave
2026-05-13 04:28:30 +00:00
parent c5abc44a63
commit 2f50e2198b
12 changed files with 178 additions and 218 deletions
+1 -1
View File
@@ -48,7 +48,7 @@ pub use state::{init, subscribe};
pub use types::{
ActiveAgentCrdt, ActiveAgentView, AgentThrottleCrdt, AgentThrottleView, CrdtEvent,
GatewayConfigCrdt, GatewayProjectCrdt, GatewayProjectView, MergeJobCrdt, MergeJobView,
NodePresenceCrdt, NodePresenceView, PipelineDoc, PipelineItemCrdt, PipelineItemView, Stage,
NodePresenceCrdt, NodePresenceView, PipelineDoc, PipelineItemCrdt, PipelineItemView,
TestJobCrdt, TestJobView, TokenUsageCrdt, TokenUsageView, WorkItem,
};
pub use write::{
+94 -3
View File
@@ -295,12 +295,17 @@ pub fn evict_item(story_id: &str) -> Result<(), String> {
}
/// Extract a `PipelineItemView` from a `PipelineItemCrdt`.
///
/// Projects the loose CRDT `stage` register into a typed
/// [`crate::pipeline_state::Stage`]. Items with an unknown or missing stage
/// string are filtered out (`None`), so every `WorkItem` that escapes the
/// read path carries a valid typed stage.
pub(super) fn extract_item_view(item: &PipelineItemCrdt) -> Option<PipelineItemView> {
let story_id = match item.story_id.view() {
JsonValue::String(s) if !s.is_empty() => s,
_ => return None,
};
let stage = match item.stage.view() {
let stage_str = match item.stage.view() {
JsonValue::String(s) if !s.is_empty() => s,
_ => return None,
};
@@ -368,6 +373,8 @@ pub(super) fn extract_item_view(item: &PipelineItemCrdt) -> Option<PipelineItemV
_ => None,
};
let stage = project_stage_for_view(&stage_str, &story_id, merged_at, blocked)?;
Some(PipelineItemView {
story_id,
stage,
@@ -388,6 +395,90 @@ pub(super) fn extract_item_view(item: &PipelineItemCrdt) -> Option<PipelineItemV
})
}
/// Project the loose `stage` string from the CRDT into a typed
/// [`crate::pipeline_state::Stage`].
///
/// Rich variants synthesise their payload fields from sibling registers
/// (`merged_at`, `blocked`) or from sane defaults. Unknown stage strings
/// yield `None`, so the read path drops the entry — no caller ever sees a
/// stage value it cannot pattern-match against.
///
/// Both the clean post-934 wire vocabulary (`"backlog"`, `"coding"`,
/// `"qa"`, etc.) and the pre-934 directory-style spellings (`"1_backlog"`,
/// `"2_current"`, etc.) are accepted; each legacy spelling projects to the
/// same variant as its clean form. This keeps remote ops from older nodes
/// (and raw-CRDT test inserts that bypass `migrate_legacy_stage_strings`)
/// from silently disappearing from the typed read path.
fn project_stage_for_view(
    stage_str: &str,
    story_id: &str,
    merged_at: Option<f64>,
    blocked: Option<bool>,
) -> Option<crate::pipeline_state::Stage> {
    use crate::pipeline_state::{ArchiveReason, BranchName, GitSha, Stage};
    use chrono::{DateTime, Utc};
    use std::num::NonZeroU32;
    // Each arm lists the clean wire string first, then any legacy
    // directory-style spellings that collapse to the same variant.
    match stage_str {
        "upcoming" | "0_upcoming" => Some(Stage::Upcoming),
        // Pre-934 `7_frozen` collapses to backlog (the frozen flag is an
        // orthogonal CRDT register since story 934 stage 4).
        "backlog" | "1_backlog" | "7_frozen" => Some(Stage::Backlog),
        "coding" | "2_current" => Some(Stage::Coding),
        "qa" | "3_qa" => Some(Stage::Qa),
        "blocked" | "2_blocked" => Some(Stage::Blocked {
            reason: String::new(),
        }),
        "merge" | "4_merge" => Some(Stage::Merge {
            // Payload is synthesised: the CRDT carries no branch/commit
            // registers for this stage, so use the conventional branch name
            // and a minimal non-zero commit count.
            feature_branch: BranchName(format!("feature/story-{story_id}")),
            commits_ahead: NonZeroU32::new(1).expect("1 is non-zero"),
        }),
        "merge_failure" | "4_merge_failure" => Some(Stage::MergeFailure {
            reason: String::new(),
        }),
        "done" | "5_done" => {
            // Fall back to the Unix epoch both when the register is absent
            // and when the stored timestamp is outside chrono's range.
            let merged_at = merged_at
                .map(|ts| {
                    DateTime::from_timestamp(ts as i64, 0).unwrap_or(DateTime::<Utc>::UNIX_EPOCH)
                })
                .unwrap_or(DateTime::<Utc>::UNIX_EPOCH);
            Some(Stage::Done {
                merged_at,
                merge_commit: GitSha("legacy".to_string()),
            })
        }
        "archived" | "6_archived" => {
            let reason = if blocked.unwrap_or(false) {
                ArchiveReason::Blocked {
                    reason: "migrated from legacy blocked field".to_string(),
                }
            } else {
                ArchiveReason::Completed
            };
            Some(Stage::Archived {
                archived_at: Utc::now(),
                reason,
            })
        }
        // Unknown vocabulary: filtered out of the typed read path.
        _ => None,
    }
}
/// Check whether a dependency (by numeric ID prefix) is in `5_done` or `6_archived`
/// according to CRDT state.
///
@@ -478,7 +569,7 @@ mod tests {
let item_json: JsonValue = json!({
"story_id": "40_story_view",
"stage": "3_qa",
"stage": "qa",
"name": "View Test",
"agent": "coder-1",
"retry_count": 2.0,
@@ -494,7 +585,7 @@ mod tests {
let view = extract_item_view(&crdt.doc.items[0]).unwrap();
assert_eq!(view.story_id, "40_story_view");
assert_eq!(view.stage, "3_qa");
assert!(matches!(view.stage, crate::pipeline_state::Stage::Qa));
assert_eq!(view.name.as_deref(), Some("View Test"));
assert_eq!(view.agent.as_deref(), Some("coder-1"));
assert_eq!(view.retry_count, Some(2));
+2 -2
View File
@@ -165,7 +165,7 @@ async fn init_and_write_read_roundtrip() {
// Insert and update like write_item does.
let item_json: JsonValue = json!({
"story_id": "50_story_roundtrip",
"stage": "1_backlog",
"stage": "backlog",
"name": "Roundtrip",
"agent": "",
"retry_count": 0.0,
@@ -206,7 +206,7 @@ async fn init_and_write_read_roundtrip() {
let view = extract_item_view(&crdt2.doc.items[0]).unwrap();
assert_eq!(view.story_id, "50_story_roundtrip");
assert_eq!(view.stage, "1_backlog");
assert!(matches!(view.stage, crate::pipeline_state::Stage::Backlog));
assert_eq!(view.name.as_deref(), Some("Roundtrip"));
}
+6 -83
View File
@@ -124,83 +124,6 @@ pub struct NodePresenceCrdt {
// ── Read-side view types ─────────────────────────────────────────────
/// Pipeline stage inferred from the CRDT `stage` register.
///
/// Low-level typed stage used by [`WorkItem`] accessors. For rich
/// transition metadata (merge commits, timestamps, etc.) project via
/// `pipeline_state::Stage` instead.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Stage {
    /// Story created but not yet triaged (wire string `"upcoming"`).
    Upcoming,
    /// Waiting for dependencies or auto-assign (`"backlog"`).
    Backlog,
    /// Actively being coded (`"coding"`).
    Coding,
    /// Blocked awaiting human resolution (`"blocked"`).
    Blocked,
    /// Coder done; gates running (`"qa"`).
    Qa,
    /// Gates passed; ready to merge (`"merge"`).
    Merge,
    /// Merge failed; awaiting intervention (`"merge_failure"`).
    MergeFailure,
    /// Merged to master (`"done"`).
    Done,
    /// Out of the active flow (`"archived"`).
    Archived,
    /// An unrecognised stage string — forward-compatible catch-all.
    Unknown(String),
}
impl Stage {
    /// Parse a stage wire string into the typed enum.
    ///
    /// Only the post-934 clean vocabulary is recognised (`"backlog"`,
    /// `"coding"`, `"qa"`, `"merge"`, `"merge_failure"`, `"blocked"`,
    /// `"done"`, `"archived"`, `"upcoming"`). Pre-934 directory-style
    /// strings (`"2_current"`, `"4_merge"`, etc.) are no longer accepted —
    /// they are rewritten at startup by `migrate_legacy_stage_strings` —
    /// so they land in [`Stage::Unknown`] like any other stray string.
    pub fn from_dir(wire: &str) -> Self {
        match wire {
            "upcoming" => Self::Upcoming,
            "backlog" => Self::Backlog,
            "coding" => Self::Coding,
            "blocked" => Self::Blocked,
            "qa" => Self::Qa,
            "merge" => Self::Merge,
            "merge_failure" => Self::MergeFailure,
            "done" => Self::Done,
            "archived" => Self::Archived,
            other => Self::Unknown(other.to_string()),
        }
    }
    /// Convert back to the wire string for persistence into the CRDT.
    ///
    /// Post-934 the clean vocabulary (no numeric prefixes) is the only
    /// spelling written, and these strings survive only at this single
    /// CRDT-serialisation boundary.
    pub fn as_dir(&self) -> &str {
        match self {
            Self::Upcoming => "upcoming",
            Self::Backlog => "backlog",
            Self::Coding => "coding",
            Self::Blocked => "blocked",
            Self::Qa => "qa",
            Self::Merge => "merge",
            Self::MergeFailure => "merge_failure",
            Self::Done => "done",
            Self::Archived => "archived",
            // Unknown round-trips its raw string unchanged.
            Self::Unknown(raw) => raw.as_str(),
        }
    }
    /// `true` if this is an "active" stage (`Coding`, `Qa`, or `Merge`).
    pub fn is_active(&self) -> bool {
        matches!(self, Self::Coding | Self::Qa | Self::Merge)
    }
}
/// A typed snapshot of a single pipeline work item derived from the CRDT document.
///
/// Access fields exclusively through the typed accessor methods — raw field access is
@@ -213,7 +136,7 @@ impl Stage {
#[derive(Clone, Debug)]
pub struct WorkItem {
pub(super) story_id: String,
pub(super) stage: String,
pub(super) stage: crate::pipeline_state::Stage,
pub(super) name: Option<String>,
pub(super) agent: Option<String>,
pub(super) retry_count: Option<i64>,
@@ -245,9 +168,9 @@ impl WorkItem {
&self.story_id
}
/// Pipeline stage as a typed enum.
pub fn stage(&self) -> Stage {
Stage::from_dir(&self.stage)
/// Pipeline stage as a typed [`crate::pipeline_state::Stage`].
pub fn stage(&self) -> &crate::pipeline_state::Stage {
&self.stage
}
/// Human-readable story name, or `None` when unset.
@@ -331,7 +254,7 @@ impl WorkItem {
#[allow(clippy::too_many_arguments)]
pub fn for_test(
story_id: impl Into<String>,
stage: impl Into<String>,
stage: crate::pipeline_state::Stage,
name: Option<String>,
agent: Option<String>,
retry_count: Option<i64>,
@@ -349,7 +272,7 @@ impl WorkItem {
) -> Self {
Self {
story_id: story_id.into(),
stage: stage.into(),
stage,
name,
agent,
retry_count,
+13 -8
View File
@@ -343,10 +343,9 @@ mod stage_migration_tests {
for (id, _, expected_variant) in cases {
let view = read_item(id).expect("item must still exist after migration");
let projected: Stage = crate::pipeline_state::project_stage(&view)
.expect("projection must succeed after migration");
let projected: &Stage = view.stage();
assert_eq!(
std::mem::discriminant(&projected),
std::mem::discriminant(projected),
std::mem::discriminant(expected_variant),
"stage for {id} should project to {expected_variant:?} after migration, got {projected:?}",
);
@@ -367,7 +366,7 @@ mod stage_migration_tests {
let after = read_item(story_id).expect("item must still exist after migration");
assert!(
matches!(after.stage(), crate::crdt_state::Stage::Backlog),
matches!(after.stage(), crate::pipeline_state::Stage::Backlog),
"7_frozen should collapse to Backlog: got {:?}",
after.stage()
);
@@ -400,8 +399,14 @@ mod stage_migration_tests {
// Clean item is unchanged; legacy item is now clean too.
let clean = read_item("9520_already_clean").unwrap();
let migrated = read_item("9521_needs_migration").unwrap();
assert!(matches!(clean.stage(), crate::crdt_state::Stage::Coding));
assert!(matches!(migrated.stage(), crate::crdt_state::Stage::Coding));
assert!(matches!(
clean.stage(),
crate::pipeline_state::Stage::Coding
));
assert!(matches!(
migrated.stage(),
crate::pipeline_state::Stage::Coding
));
}
#[test]
@@ -413,7 +418,7 @@ mod stage_migration_tests {
let after_first = read_item("9530_idempotent").unwrap();
assert!(matches!(
after_first.stage(),
crate::crdt_state::Stage::Merge
crate::pipeline_state::Stage::Merge { .. }
));
// Second call must be a no-op — the filter pass returns empty.
@@ -421,7 +426,7 @@ mod stage_migration_tests {
let after_second = read_item("9530_idempotent").unwrap();
assert!(matches!(
after_second.stage(),
crate::crdt_state::Stage::Merge
crate::pipeline_state::Stage::Merge { .. }
));
}
+8 -8
View File
@@ -216,7 +216,7 @@ fn migrate_story_ids_to_numeric_preserves_stage_and_name() {
migrate_story_ids_to_numeric();
let item = read_item("45").expect("item must be accessible by numeric ID");
assert_eq!(item.stage, "coding");
assert!(matches!(item.stage, crate::pipeline_state::Stage::Coding));
assert_eq!(item.name.as_deref(), Some("Crash Bug"));
assert_eq!(item.agent.as_deref(), Some("coder-1"));
}
@@ -620,7 +620,7 @@ async fn bug_511_rowid_replay_preserves_field_update_after_list_insert() {
let sid = format!("{}_story_warmup", i);
let item: JsonValue = json!({
"story_id": sid,
"stage": "1_backlog",
"stage": "backlog",
"name": "",
"agent": "",
"retry_count": 0.0,
@@ -657,7 +657,7 @@ async fn bug_511_rowid_replay_preserves_field_update_after_list_insert() {
let idx = rebuild_index(&crdt)["511_story_target"];
let stage_op = crdt.doc.items[idx]
.stage
.set("2_current".to_string())
.set("coding".to_string())
.sign(&kp);
crdt.apply(stage_op.clone());
// stage_op.inner.seq == 1
@@ -701,8 +701,8 @@ async fn bug_511_rowid_replay_preserves_field_update_after_list_insert() {
);
let idx2 = index2["511_story_target"];
let view = extract_item_view(&crdt2.doc.items[idx2]).unwrap();
assert_eq!(
view.stage, "2_current",
assert!(
matches!(view.stage, crate::pipeline_state::Stage::Coding),
"stage field update lost during replay (bug 511 regression)"
);
@@ -726,9 +726,9 @@ async fn bug_511_rowid_replay_preserves_field_update_after_list_insert() {
// but the stage update is lost (it ran before the item existed).
if let Some(idx3) = index3.get("511_story_target") {
let view3 = extract_item_view(&crdt3.doc.items[*idx3]).unwrap();
// The bug: stage is still "1_backlog" because the update was dropped.
assert_eq!(
view3.stage, "1_backlog",
// The bug: stage is still "backlog" because the update was dropped.
assert!(
matches!(view3.stage, crate::pipeline_state::Stage::Backlog),
"expected seq-ASC replay to exhibit the bug (update lost)"
);
}