huskies: merge 891

This commit is contained in:
dave
2026-05-12 17:03:41 +00:00
parent b76633b79b
commit 148ce37beb
20 changed files with 418 additions and 262 deletions
+4 -4
View File
@@ -103,11 +103,11 @@ mod tests {
// Confirm the stale claim is in place.
let before = read_item(story_id).expect("item should exist");
assert_eq!(
before.claimed_by.as_deref(),
before.claimed_by(),
Some(stale_holder),
"pre-condition: item should be claimed by the stale holder"
);
let age = chrono::Utc::now().timestamp() as f64 - before.claimed_at.unwrap_or(0.0);
let age = chrono::Utc::now().timestamp() as f64 - before.claimed_at().unwrap_or(0.0);
assert!(
age >= CLAIM_TIMEOUT_SECS,
"pre-condition: claim age ({age}s) must exceed TTL ({CLAIM_TIMEOUT_SECS}s)"
@@ -134,12 +134,12 @@ mod tests {
let our_id = our_node_id().expect("node id should be available after init_for_test");
let after = read_item(story_id).expect("item should still exist");
assert_eq!(
after.claimed_by.as_deref(),
after.claimed_by(),
Some(our_id.as_str()),
"new claim should have displaced the stale holder"
);
assert_ne!(
after.claimed_by.as_deref(),
after.claimed_by(),
Some(stale_holder),
"stale holder must no longer own the claim"
);
+19 -17
View File
@@ -42,27 +42,28 @@ pub(super) async fn scan_and_claim(
for item in &items {
// Only claim stories in active stages.
if !crate::pipeline_state::Stage::from_dir(&item.stage).is_some_and(|s| s.is_active()) {
if !crate::pipeline_state::Stage::from_dir(item.stage_str()).is_some_and(|s| s.is_active())
{
continue;
}
// Skip blocked stories.
if item.blocked == Some(true) {
if item.blocked() {
continue;
}
// If already claimed by us, skip.
if item.claimed_by.as_deref() == Some(&our_node) {
if item.claimed_by() == Some(our_node.as_str()) {
continue;
}
// If claimed by another node, respect the claim while it is fresh.
// Once the TTL expires the claim is considered stale regardless of
// whether the holder appears alive — displacement is purely TTL-driven.
if let Some(ref claimer) = item.claimed_by
if let Some(claimer) = item.claimed_by()
&& !claimer.is_empty()
&& claimer != &our_node
&& let Some(claimed_at) = item.claimed_at
&& claimer != our_node.as_str()
&& let Some(claimed_at) = item.claimed_at()
{
let now = chrono::Utc::now().timestamp() as f64;
let age = now - claimed_at;
@@ -74,7 +75,7 @@ pub(super) async fn scan_and_claim(
slog!(
"[agent-mode] Displacing stale claim on '{}' held by {:.12}… \
(age {}s > TTL {}s)",
item.story_id,
item.story_id(),
claimer,
age as u64,
CLAIM_TIMEOUT_SECS as u64,
@@ -97,10 +98,10 @@ pub(super) async fn scan_and_claim(
})
.map(|n| n.node_id)
.collect();
if !should_self_claim(&our_node, &item.story_id, &alive_peers) {
if !should_self_claim(&our_node, item.story_id(), &alive_peers) {
slog!(
"[agent-mode] Hash tie-break: deferring claim on '{}' to lower-hash peer",
item.story_id
item.story_id()
);
continue;
}
@@ -108,11 +109,11 @@ pub(super) async fn scan_and_claim(
// Try to claim this story.
slog!(
"[agent-mode] Claiming story '{}' for this node",
item.story_id
item.story_id()
);
if crdt_state::write_claim(&item.story_id) {
if crdt_state::write_claim(item.story_id()) {
let now = chrono::Utc::now().timestamp() as f64;
our_claims.insert(item.story_id.clone(), now);
our_claims.insert(item.story_id().to_string(), now);
}
}
@@ -165,27 +166,28 @@ pub(super) fn reclaim_timed_out_work(_project_root: &Path) {
let now = chrono::Utc::now().timestamp() as f64;
for item in &items {
if !crate::pipeline_state::Stage::from_dir(&item.stage).is_some_and(|s| s.is_active()) {
if !crate::pipeline_state::Stage::from_dir(item.stage_str()).is_some_and(|s| s.is_active())
{
continue;
}
// Release the claim if the TTL has expired — regardless of whether the
// holder is still alive. A node actively working should refresh its
// claim before the TTL window closes.
if let Some(ref claimer) = item.claimed_by {
if let Some(claimer) = item.claimed_by() {
if claimer.is_empty() {
continue;
}
if let Some(claimed_at) = item.claimed_at
if let Some(claimed_at) = item.claimed_at()
&& now - claimed_at >= CLAIM_TIMEOUT_SECS
{
slog!(
"[agent-mode] Releasing stale claim on '{}' held by {:.12}… (age {}s)",
item.story_id,
item.story_id(),
claimer,
(now - claimed_at) as u64,
);
crdt_state::release_claim(&item.story_id);
crdt_state::release_claim(item.story_id());
}
}
}
@@ -19,10 +19,10 @@ pub(super) fn read_story_front_matter_agent(
story_id: &str,
) -> Option<String> {
if let Some(view) = crate::crdt_state::read_item(story_id)
&& let Some(agent) = view.agent.as_ref()
&& let Some(agent) = view.agent()
&& !agent.is_empty()
{
return Some(agent.clone());
return Some(agent.to_string());
}
use crate::db::yaml_legacy::parse_front_matter;
let contents = read_story_contents(project_root, story_id)?;
@@ -101,7 +101,7 @@ pub(super) fn has_mergemaster_attempted(
story_id: &str,
) -> bool {
crate::crdt_state::read_item(story_id)
.and_then(|view| view.mergemaster_attempted)
.map(|view| view.mergemaster_attempted())
.unwrap_or(false)
}
@@ -274,9 +274,10 @@ max_turns = 10
let item = crate::crdt_state::read_item(story_id)
.expect("story must be in CRDT after watchdog termination");
assert_eq!(
item.stage, "2_blocked",
item.stage_str(),
"2_blocked",
"story stage must be 2_blocked after limit termination with max_retries=1 — got: {}",
item.stage
item.stage_str()
);
// Sanity: the agent itself is also Failed with the right reason.
@@ -415,7 +416,8 @@ max_turns = 10
let item = crate::crdt_state::read_item(story_id)
.expect("story must be in CRDT after per-session overrun");
assert_eq!(
item.stage, "2_blocked",
item.stage_str(),
"2_blocked",
"story stage must be 2_blocked after per-session overrun with max_retries=1"
);
}
@@ -471,12 +473,12 @@ max_turns = 10
let item = crate::crdt_state::read_item(story_id).expect("story must be in CRDT");
assert_eq!(
item.retry_count,
Some(1),
item.retry_count(),
1,
"after session 1, retry_count should be 1 in CRDT"
);
assert_ne!(
item.stage.as_str(),
item.stage_str(),
"2_blocked",
"story should NOT be blocked after session 1"
);
@@ -491,12 +493,12 @@ max_turns = 10
let item = crate::crdt_state::read_item(story_id).expect("story must be in CRDT");
assert_eq!(
item.retry_count,
Some(2),
item.retry_count(),
2,
"after session 2, retry_count should be 2 in CRDT"
);
assert_ne!(
item.stage.as_str(),
item.stage_str(),
"2_blocked",
"story should NOT be blocked after session 2"
);
@@ -511,15 +513,16 @@ max_turns = 10
let item = crate::crdt_state::read_item(story_id).expect("story must be in CRDT");
assert_eq!(
item.stage, "2_blocked",
item.stage_str(),
"2_blocked",
"story must be blocked after session 3 (retry_count=3 >= max_retries=3) — got: {}",
item.stage
item.stage_str()
);
// retry_count resets to 0 on stage transition (Bug 780) — the fact
// that the story reached 2_blocked proves the retry limit was hit.
assert_eq!(
item.retry_count,
Some(0),
item.retry_count(),
0,
"retry_count should reset to 0 after stage transition to blocked"
);
}
@@ -63,7 +63,7 @@ pub(super) fn resolve_qa_mode_from_store(
) -> crate::io::story_metadata::QaMode {
// CRDT register is the authoritative source; check it before the content store.
if let Some(view) = crate::crdt_state::read_item(story_id)
&& let Some(ref s) = view.qa_mode
&& let Some(s) = view.qa_mode()
&& let Some(mode) = crate::io::story_metadata::QaMode::from_str(s)
{
return mode;
@@ -867,8 +867,8 @@ stage = "coder"
let item =
crate::crdt_state::read_item("9950_story_warm_resume").expect("story must be in CRDT");
assert!(
item.retry_count.is_some_and(|rc| rc > 0),
"retry_count must be incremented after warm-resume: got {:?}",
item.retry_count
item.retry_count() > 0,
"retry_count must be incremented after warm-resume: got {}",
item.retry_count()
);
}
+2 -3
View File
@@ -68,7 +68,7 @@ fn inject_gate_failure_section(args: &mut Vec<String>, gate_output: &str) {
/// prior failure context, even when session-resuming (story 881).
pub(super) fn maybe_inject_gate_failure(args: &mut Vec<String>, story_id: &str) {
let retry_count = crate::crdt_state::read_item(story_id)
.and_then(|item| item.retry_count)
.map(|item| item.retry_count())
.unwrap_or(0);
if retry_count > 0
&& let Some(gate_output) = crate::db::read_content(&format!("{story_id}:gate_output"))
@@ -767,8 +767,7 @@ mod tests {
// retry_count must remain 0 — the abort path never calls bump_retry_count.
let retry_count = crate::crdt_state::read_item(story_id)
.and_then(|item| item.retry_count)
.map(|r| r as u32)
.map(|item| item.retry_count())
.unwrap_or(0);
assert_eq!(
retry_count, 0,
+2 -2
View File
@@ -64,10 +64,10 @@ pub(super) fn read_front_matter_agent(story_id: &str, agent_name: Option<&str>)
// to legacy YAML parsing for stories whose CRDT entry doesn't yet have
// the field populated.
if let Some(view) = crate::crdt_state::read_item(story_id)
&& let Some(agent) = view.agent.as_ref()
&& let Some(agent) = view.agent()
&& !agent.is_empty()
{
return Some(agent.clone());
return Some(agent.to_string());
}
crate::db::read_content(story_id).and_then(|contents| {
crate::db::yaml_legacy::parse_front_matter(&contents)
+13 -13
View File
@@ -192,8 +192,8 @@ mod tests {
// CRDT register must hold the deps.
let view = crate::crdt_state::read_item("9910_story_foo").expect("CRDT should have story");
assert_eq!(
view.depends_on,
Some(vec![477, 478]),
view.depends_on(),
&[477, 478],
"CRDT register should hold [477, 478]: {view:?}"
);
// Content store YAML must NOT be mutated with depends_on.
@@ -223,8 +223,8 @@ mod tests {
);
// CRDT register must be empty after clear.
let view = crate::crdt_state::read_item("9911_story_bar").expect("CRDT should have story");
assert_eq!(
view.depends_on, None,
assert!(
view.depends_on().is_empty(),
"CRDT register should be empty after clearing: {view:?}"
);
// Content store YAML must not be mutated.
@@ -260,8 +260,8 @@ mod tests {
let view =
crate::crdt_state::read_item("8790_story_chat_dep").expect("CRDT must have chat story");
assert_eq!(
view.depends_on,
Some(vec![500, 501]),
view.depends_on(),
&[500, 501],
"CRDT must hold [500, 501]: {view:?}"
);
@@ -290,8 +290,8 @@ mod tests {
assert!(out.contains("1"), "response should mention dep 1: {out}");
let view = crate::crdt_state::read_item("9920_story_scr").expect("CRDT must have story");
assert_eq!(
view.depends_on,
Some(vec![1, 2, 3]),
view.depends_on(),
&[1, 2, 3],
"CRDT should hold [1,2,3]: {view:?}"
);
@@ -299,9 +299,9 @@ mod tests {
let out = depends_cmd_with_root(tmp.path(), "9920").unwrap();
assert!(out.contains("Cleared"), "clear should confirm: {out}");
let view = crate::crdt_state::read_item("9920_story_scr").expect("CRDT must have story");
assert_eq!(
view.depends_on, None,
"CRDT should be None after clear: {view:?}"
assert!(
view.depends_on().is_empty(),
"CRDT should be empty after clear: {view:?}"
);
// Replace with [4, 5] — must not append to old list.
@@ -309,8 +309,8 @@ mod tests {
assert!(out.contains("4"), "response should mention dep 4: {out}");
let view = crate::crdt_state::read_item("9920_story_scr").expect("CRDT must have story");
assert_eq!(
view.depends_on,
Some(vec![4, 5]),
view.depends_on(),
&[4, 5],
"CRDT should hold exactly [4,5] after replace: {view:?}"
);
}
+8 -9
View File
@@ -56,7 +56,7 @@ fn unblock_by_story_id(story_id: &str) -> String {
let crdt_item = crate::crdt_state::read_item(story_id);
let story_name = crdt_item
.as_ref()
.and_then(|i| i.name.clone())
.and_then(|i| i.name().map(str::to_string))
.unwrap_or_else(|| story_id.to_string());
// Canonical "is this story blocked?" comes from the typed pipeline state.
@@ -69,7 +69,7 @@ fn unblock_by_story_id(story_id: &str) -> String {
Some(crate::pipeline_state::Stage::MergeFailure { .. })
);
// CRDT register fallback for items not yet projected into typed state.
let crdt_blocked = crdt_item.as_ref().and_then(|i| i.blocked).unwrap_or(false);
let crdt_blocked = crdt_item.as_ref().is_some_and(|i| i.blocked());
if !typed_blocked && !crdt_blocked {
return format!("**{story_name}** ({story_id}) is not blocked. Nothing to unblock.");
@@ -271,8 +271,8 @@ mod tests {
let item = crate::crdt_state::read_item("9903_story_stuck")
.expect("story should be in CRDT after unblock");
assert_eq!(
item.retry_count,
Some(0),
item.retry_count(),
0,
"retry_count should be reset to 0 in CRDT after unblock"
);
}
@@ -334,14 +334,13 @@ mod tests {
let item = crate::crdt_state::read_item(story_id)
.expect("story should still be in CRDT after unblock");
assert_eq!(
item.retry_count,
Some(0),
item.retry_count(),
0,
"retry_count must be reset to 0 in CRDT after unblock"
);
assert!(
!item.blocked.unwrap_or(false),
"blocked flag must be cleared in CRDT after unblock: {:?}",
item.blocked
!item.blocked(),
"blocked flag must be cleared in CRDT after unblock"
);
}
+11 -6
View File
@@ -32,14 +32,19 @@ pub(crate) fn find_story_by_number(
// initialised (e.g. in unit tests or very early startup).
if let Some(items) = crate::crdt_state::read_all_items() {
for item in items {
if item.story_id.split('_').next().unwrap_or("") == number {
if item.story_id().split('_').next().unwrap_or("") == number {
let path = project_root
.join(".huskies")
.join("work")
.join(&item.stage)
.join(format!("{}.md", item.story_id));
let content = crate::db::read_content(&item.story_id);
return Some((item.story_id, item.stage, path, content));
.join(item.stage_str())
.join(format!("{}.md", item.story_id()));
let content = crate::db::read_content(item.story_id());
return Some((
item.story_id().to_string(),
item.stage_str().to_string(),
path,
content,
));
}
}
}
@@ -53,7 +58,7 @@ pub(crate) fn find_story_by_number(
continue;
}
let stage_dir = crate::crdt_state::read_item(&id)
.map(|v| v.stage)
.map(|v| v.stage_str().to_string())
.unwrap_or_else(|| "1_backlog".to_string());
let path = project_root
.join(".huskies")
+2 -2
View File
@@ -48,8 +48,8 @@ pub use state::init;
pub use types::{
ActiveAgentCrdt, ActiveAgentView, AgentThrottleCrdt, AgentThrottleView, CrdtEvent,
GatewayConfigCrdt, GatewayProjectCrdt, GatewayProjectView, MergeJobCrdt, MergeJobView,
NodePresenceCrdt, NodePresenceView, PipelineDoc, PipelineItemCrdt, PipelineItemView,
TestJobCrdt, TestJobView, TokenUsageCrdt, TokenUsageView, subscribe,
NodePresenceCrdt, NodePresenceView, PipelineDoc, PipelineItemCrdt, PipelineItemView, Stage,
TestJobCrdt, TestJobView, TokenUsageCrdt, TokenUsageView, WorkItem, subscribe,
};
pub use write::{
bump_retry_count, migrate_names_from_slugs, migrate_story_ids_to_numeric, name_from_story_id,
+8 -8
View File
@@ -406,10 +406,10 @@ pub fn check_unmet_deps_crdt(story_id: &str) -> Vec<u32> {
Some(i) => i,
None => return Vec::new(),
};
let deps = match item.depends_on {
Some(d) => d,
None => return Vec::new(),
};
let deps = item.depends_on().to_vec();
if deps.is_empty() {
return Vec::new();
}
deps.into_iter()
.filter(|&dep| !dep_is_done_crdt(dep))
.collect()
@@ -425,10 +425,10 @@ pub fn check_archived_deps_crdt(story_id: &str) -> Vec<u32> {
Some(i) => i,
None => return Vec::new(),
};
let deps = match item.depends_on {
Some(d) => d,
None => return Vec::new(),
};
let deps = item.depends_on().to_vec();
if deps.is_empty() {
return Vec::new();
}
deps.into_iter()
.filter(|&dep| dep_is_archived_crdt(dep))
.collect()
+201 -23
View File
@@ -112,31 +112,209 @@ pub struct NodePresenceCrdt {
// ── Read-side view types ─────────────────────────────────────────────
/// A snapshot of a single pipeline item derived from the CRDT document.
#[derive(Clone, Debug)]
pub struct PipelineItemView {
pub story_id: String,
pub stage: String,
pub name: Option<String>,
pub agent: Option<String>,
pub retry_count: Option<i64>,
pub blocked: Option<bool>,
pub depends_on: Option<Vec<u32>>,
/// Node ID of the node that claimed this item (hex-encoded Ed25519 pubkey).
pub claimed_by: Option<String>,
/// Unix timestamp when the item was claimed.
pub claimed_at: Option<f64>,
/// Unix timestamp (seconds) when the item was merged to master.
/// `None` for items that were never in `5_done` or for legacy items.
pub merged_at: Option<f64>,
/// QA mode override from the CRDT register: `"server"`, `"agent"`, or `"human"`.
/// `None` means the register is unset (use project default).
pub qa_mode: Option<String>,
/// Whether the auto-assigner has already spawned a mergemaster session for
/// this item. `None` means the register has never been set (treat as false).
pub mergemaster_attempted: Option<bool>,
/// Pipeline stage inferred from the CRDT `stage` register.
///
/// This is the low-level typed stage for [`WorkItem`] accessors. For rich
/// transition metadata (merge commits, timestamps, etc.) project via
/// `pipeline_state::Stage` instead.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Stage {
/// Story created but not yet triaged (`0_upcoming`).
Upcoming,
/// Waiting for dependencies or auto-assign (`1_backlog`).
Backlog,
/// Actively being coded (`2_current`).
Coding,
/// Blocked awaiting human resolution (`2_blocked`).
Blocked,
/// Coder done; gates running (`3_qa`).
Qa,
/// Gates passed; ready to merge (`4_merge`).
Merge,
/// Merge failed; awaiting intervention (`4_merge_failure`).
MergeFailure,
/// Merged to master (`5_done`).
Done,
/// Out of the active flow (`6_archived`).
Archived,
/// Frozen, awaiting human review (`7_frozen`).
Frozen,
/// An unrecognised stage string — forward-compatible catch-all.
Unknown(String),
}
impl Stage {
/// Parse a stage directory string into the typed enum.
pub fn from_dir(s: &str) -> Self {
match s {
"0_upcoming" => Stage::Upcoming,
"1_backlog" => Stage::Backlog,
"2_current" => Stage::Coding,
"2_blocked" => Stage::Blocked,
"3_qa" => Stage::Qa,
"4_merge" => Stage::Merge,
"4_merge_failure" => Stage::MergeFailure,
"5_done" => Stage::Done,
"6_archived" => Stage::Archived,
"7_frozen" => Stage::Frozen,
other => Stage::Unknown(other.to_string()),
}
}
/// Convert back to the filesystem directory name string.
pub fn as_dir(&self) -> &str {
match self {
Stage::Upcoming => "0_upcoming",
Stage::Backlog => "1_backlog",
Stage::Coding => "2_current",
Stage::Blocked => "2_blocked",
Stage::Qa => "3_qa",
Stage::Merge => "4_merge",
Stage::MergeFailure => "4_merge_failure",
Stage::Done => "5_done",
Stage::Archived => "6_archived",
Stage::Frozen => "7_frozen",
Stage::Unknown(s) => s.as_str(),
}
}
}
/// A typed snapshot of a single pipeline work item derived from the CRDT document.
///
/// Access fields exclusively through the typed accessor methods — raw field access is
/// restricted to the `crdt_state` module tree. All `JsonValue` interpretation is
/// confined to `crdt_state::read::extract_item_view`, so no `JsonValue` escapes into
/// the public API.
///
/// Adding a new field here without also reading it in an accessor produces an
/// `unused field` compiler warning, enforcing the read-side contract at compile time.
#[derive(Clone, Debug)]
pub struct WorkItem {
pub(super) story_id: String,
pub(super) stage: String,
pub(super) name: Option<String>,
pub(super) agent: Option<String>,
pub(super) retry_count: Option<i64>,
pub(super) blocked: Option<bool>,
pub(super) depends_on: Option<Vec<u32>>,
/// Node ID of the node that claimed this item (hex-encoded Ed25519 pubkey).
pub(super) claimed_by: Option<String>,
/// Unix timestamp (seconds) when the claim was written.
pub(super) claimed_at: Option<f64>,
/// Unix timestamp (seconds) when the item was merged to master.
pub(super) merged_at: Option<f64>,
/// QA mode override: `"server"`, `"agent"`, or `"human"`.
pub(super) qa_mode: Option<String>,
/// Whether the auto-assigner has already attempted a mergemaster spawn.
pub(super) mergemaster_attempted: Option<bool>,
}
impl WorkItem {
/// The story identifier (e.g. `"42"` or `"42_story_my_feature"`).
pub fn story_id(&self) -> &str {
&self.story_id
}
/// Pipeline stage as a typed enum.
pub fn stage(&self) -> Stage {
Stage::from_dir(&self.stage)
}
/// Raw stage directory string (e.g. `"2_current"`).
pub fn stage_str(&self) -> &str {
&self.stage
}
/// Human-readable story name, or `None` when unset.
pub fn name(&self) -> Option<&str> {
self.name.as_deref()
}
/// Agent name pinned to this item, or `None` when unset.
pub fn agent(&self) -> Option<&str> {
self.agent.as_deref()
}
/// Whether the item is blocked. Returns `false` when the register is unset.
pub fn blocked(&self) -> bool {
self.blocked.unwrap_or(false)
}
/// Retry counter. Returns `0` when the register is unset.
pub fn retry_count(&self) -> u32 {
self.retry_count.unwrap_or(0).max(0) as u32
}
/// Dependency story numbers. Returns an empty slice when unset.
pub fn depends_on(&self) -> &[u32] {
self.depends_on.as_deref().unwrap_or(&[])
}
/// Node ID of the current claim holder, or `None` when unclaimed.
pub fn claimed_by(&self) -> Option<&str> {
self.claimed_by.as_deref()
}
/// Unix timestamp (seconds) when the current claim was written, or `None`.
pub fn claimed_at(&self) -> Option<f64> {
self.claimed_at
}
/// Unix timestamp (seconds) when the item was merged to master, or `None`.
pub fn merged_at(&self) -> Option<f64> {
self.merged_at
}
/// QA mode override (`"server"`, `"agent"`, or `"human"`), or `None` when unset.
pub fn qa_mode(&self) -> Option<&str> {
self.qa_mode.as_deref()
}
/// Whether a mergemaster spawn has already been attempted. Returns `false` when unset.
pub fn mergemaster_attempted(&self) -> bool {
self.mergemaster_attempted.unwrap_or(false)
}
/// Construct a `WorkItem` for use in tests outside `crdt_state::*`.
///
/// Within `crdt_state` use a struct literal directly (fields are `pub(super)`).
/// Each field must be supplied — adding a new field to `WorkItem` without updating
/// this constructor produces a compile error, enforcing the read-side contract.
#[allow(clippy::too_many_arguments)]
pub fn for_test(
story_id: impl Into<String>,
stage: impl Into<String>,
name: Option<String>,
agent: Option<String>,
retry_count: Option<i64>,
blocked: Option<bool>,
depends_on: Option<Vec<u32>>,
claimed_by: Option<String>,
claimed_at: Option<f64>,
merged_at: Option<f64>,
qa_mode: Option<String>,
mergemaster_attempted: Option<bool>,
) -> Self {
Self {
story_id: story_id.into(),
stage: stage.into(),
name,
agent,
retry_count,
blocked,
depends_on,
claimed_by,
claimed_at,
merged_at,
qa_mode,
mergemaster_attempted,
}
}
}
/// Backward-compatibility alias; prefer [`WorkItem`].
pub type PipelineItemView = WorkItem;
/// A snapshot of a single node presence entry derived from the CRDT document.
#[derive(Clone, Debug, serde::Serialize)]
pub struct NodePresenceView {
+11 -9
View File
@@ -346,12 +346,12 @@ mod tests {
write_item_with_content(story_id, "2_current", content, meta);
let view = crate::crdt_state::read_item(story_id).expect("story exists in CRDT");
assert_eq!(view.stage, "2_current");
assert_eq!(view.name.as_deref(), Some("Typed Name"));
assert_eq!(view.agent.as_deref(), Some("coder-1"));
assert_eq!(view.retry_count, Some(2));
assert_eq!(view.blocked, Some(true));
assert_eq!(view.depends_on, Some(vec![100, 200]));
assert_eq!(view.stage_str(), "2_current");
assert_eq!(view.name(), Some("Typed Name"));
assert_eq!(view.agent(), Some("coder-1"));
assert_eq!(view.retry_count(), 2);
assert!(view.blocked());
assert_eq!(view.depends_on(), &[100, 200]);
// Content is stored verbatim (no parsing, no rewrite).
assert_eq!(read_content(story_id).as_deref(), Some(content));
@@ -371,13 +371,15 @@ mod tests {
write_item_with_content(story_id, "2_current", content, ItemMeta::default());
let view = crate::crdt_state::read_item(story_id).expect("story exists in CRDT");
assert_eq!(view.stage, "2_current");
assert_eq!(view.stage_str(), "2_current");
assert_eq!(
view.name, None,
view.name(),
None,
"name must come from typed meta, not parsed YAML"
);
assert_eq!(
view.agent, None,
view.agent(),
None,
"agent must come from typed meta, not parsed YAML"
);
}
+2 -2
View File
@@ -17,7 +17,7 @@ pub(super) async fn tool_merge_agent_work(
// Check CRDT stage before attempting merge — if already done or archived,
// return success immediately to avoid spurious error notifications.
if let Some(item) = crate::crdt_state::read_item(story_id)
&& crate::pipeline_state::Stage::from_dir(&item.stage).is_some_and(|s| {
&& crate::pipeline_state::Stage::from_dir(item.stage_str()).is_some_and(|s| {
matches!(
s,
crate::pipeline_state::Stage::Done { .. }
@@ -31,7 +31,7 @@ pub(super) async fn tool_merge_agent_work(
"success": true,
"message": format!(
"Story '{}' is already in '{}' — no merge needed.",
story_id, item.stage
story_id, item.stage_str()
),
}))
.map_err(|e| format!("Serialization error: {e}"));
+2 -2
View File
@@ -215,12 +215,12 @@ pub(super) async fn tool_status(args: &Value, ctx: &AppContext) -> Result<String
// pass the read_typed / 2_current check above, but the code is present
// for completeness and future-proofing).
if let Some(view) = crate::crdt_state::read_item(story_id) {
if let Some(cb) = &view.claimed_by
if let Some(cb) = view.claimed_by()
&& !cb.is_empty()
{
front_matter.insert("claimed_by".to_string(), json!(cb));
}
if let Some(ca) = view.claimed_at
if let Some(ca) = view.claimed_at()
&& ca > 0.0
{
front_matter.insert("claimed_at".to_string(), json!(ca));
+17 -14
View File
@@ -233,8 +233,8 @@ mod tests {
assert!(r.is_ok(), "set [1,2,3] should succeed: {r:?}");
let view = crate::crdt_state::read_item("888_deps_scr").expect("CRDT must have story");
assert_eq!(
view.depends_on,
Some(vec![1, 2, 3]),
view.depends_on(),
&[1, 2, 3],
"CRDT should hold [1,2,3] after set"
);
@@ -245,9 +245,9 @@ mod tests {
);
assert!(r.is_ok(), "clear [] should succeed: {r:?}");
let view = crate::crdt_state::read_item("888_deps_scr").expect("CRDT must have story");
assert_eq!(
view.depends_on, None,
"CRDT should be None after clearing to []"
assert!(
view.depends_on().is_empty(),
"CRDT should be empty after clearing to []"
);
// Replace with [4, 5] — must not append to previous [1,2,3].
@@ -258,8 +258,8 @@ mod tests {
assert!(r.is_ok(), "replace [4,5] should succeed: {r:?}");
let view = crate::crdt_state::read_item("888_deps_scr").expect("CRDT must have story");
assert_eq!(
view.depends_on,
Some(vec![4, 5]),
view.depends_on(),
&[4, 5],
"CRDT should hold exactly [4,5] after replace (not [1,2,3,4,5])"
);
}
@@ -290,7 +290,10 @@ mod tests {
);
assert!(r.is_ok(), "clear should succeed: {r:?}");
let view = crate::crdt_state::read_item("888_deps_persist").expect("CRDT must have story");
assert_eq!(view.depends_on, None, "CRDT should be None after clear");
assert!(
view.depends_on().is_empty(),
"CRDT should be empty after clear"
);
// Now update a different field — this triggers write_story_content with
// the stale YAML (which still has depends_on: [100, 200]).
@@ -300,11 +303,11 @@ mod tests {
);
assert!(r.is_ok(), "subsequent name update should succeed: {r:?}");
// The CRDT must still be None — the YAML value must not have been restored.
// The CRDT must still be empty — the YAML value must not have been restored.
let view = crate::crdt_state::read_item("888_deps_persist").expect("CRDT must have story");
assert_eq!(
view.depends_on, None,
"CRDT depends_on must remain None after unrelated update (write_story_content must not restore YAML value)"
assert!(
view.depends_on().is_empty(),
"CRDT depends_on must remain empty after unrelated update (write_story_content must not restore YAML value)"
);
}
@@ -327,8 +330,8 @@ mod tests {
// CRDT register must hold the deps.
let view = crate::crdt_state::read_item("504_arr_test").expect("CRDT must have the story");
assert_eq!(
view.depends_on,
Some(vec![490, 491]),
view.depends_on(),
&[490, 491],
"CRDT register should hold [490, 491]: {view:?}"
);
+1 -1
View File
@@ -23,7 +23,7 @@ pub fn parse_unchecked_todos(contents: &str) -> Vec<String> {
/// spikes themselves.
pub fn resolve_qa_mode(story_id: &str, default: QaMode) -> QaMode {
crate::crdt_state::read_item(story_id)
.and_then(|view| view.qa_mode)
.and_then(|view| view.qa_mode().map(str::to_string))
.as_deref()
.and_then(QaMode::from_str)
.unwrap_or(default)
+92 -127
View File
@@ -36,22 +36,22 @@ impl fmt::Display for ProjectionError {
impl std::error::Error for ProjectionError {}
// ── Projection: PipelineItemView → PipelineItem ─────────────────────────────
// ── Projection: WorkItem → PipelineItem ─────────────────────────────────────
impl TryFrom<&PipelineItemView> for PipelineItem {
type Error = ProjectionError;
fn try_from(view: &PipelineItemView) -> Result<Self, ProjectionError> {
let story_id = StoryId(view.story_id.clone());
let name = view.name.clone().unwrap_or_default();
let story_id = StoryId(view.story_id().to_string());
let name = view.name().unwrap_or("").to_string();
let depends_on: Vec<StoryId> = view
.depends_on
.as_ref()
.map(|deps| deps.iter().map(|d| StoryId(d.to_string())).collect())
.unwrap_or_default();
.depends_on()
.iter()
.map(|d| StoryId(d.to_string()))
.collect();
let retry_count = view.retry_count.unwrap_or(0).max(0) as u32;
let retry_count = view.retry_count();
let stage = project_stage(view)?;
@@ -65,11 +65,11 @@ impl TryFrom<&PipelineItemView> for PipelineItem {
}
}
/// Project the stage string + associated fields from a PipelineItemView into
/// Project the stage string + associated fields from a WorkItem into
/// a typed Stage enum. This is the one carefully-controlled boundary where
/// loose CRDT data becomes typed.
pub fn project_stage(view: &PipelineItemView) -> Result<Stage, ProjectionError> {
match view.stage.as_str() {
match view.stage_str() {
"0_upcoming" => Ok(Stage::Upcoming),
"1_backlog" => Ok(Stage::Backlog),
"2_blocked" => Ok(Stage::Blocked {
@@ -82,7 +82,7 @@ pub fn project_stage(view: &PipelineItemView) -> Result<Stage, ProjectionError>
// commits_ahead — those are computed at transition time. For
// projection from existing CRDT data, we synthesize defaults.
// The feature branch follows the naming convention.
let branch = format!("feature/story-{}", view.story_id);
let branch = format!("feature/story-{}", view.story_id());
// Existing CRDT data doesn't track commits_ahead, so we use 1 as
// a safe non-zero default (the item is in merge, so there must be
// at least one commit).
@@ -105,7 +105,7 @@ pub fn project_stage(view: &PipelineItemView) -> Result<Stage, ProjectionError>
// to UNIX_EPOCH, which makes them older than any retention window
// and therefore eligible for immediate sweep to 6_archived.
let merged_at = view
.merged_at
.merged_at()
.map(|ts| {
DateTime::from_timestamp(ts as i64, 0).unwrap_or(DateTime::<Utc>::UNIX_EPOCH)
})
@@ -117,7 +117,7 @@ pub fn project_stage(view: &PipelineItemView) -> Result<Stage, ProjectionError>
}
"6_archived" => {
// Determine the archive reason from the CRDT fields.
let reason = if view.blocked == Some(true) {
let reason = if view.blocked() {
ArchiveReason::Blocked {
reason: "migrated from legacy blocked field".to_string(),
}
@@ -133,7 +133,7 @@ pub fn project_stage(view: &PipelineItemView) -> Result<Stage, ProjectionError>
"7_frozen" => {
// The stage to resume to is stored in front matter as `resume_to_stage`.
// Fall back to Coding if the field is absent (e.g. legacy frozen items).
let resume_to = crate::db::read_content(&view.story_id)
let resume_to = crate::db::read_content(view.story_id())
.and_then(|content| {
crate::db::yaml_legacy::parse_front_matter(&content)
.ok()
@@ -186,7 +186,7 @@ pub fn read_all_typed() -> Vec<PipelineItem> {
Err(e) => {
crate::slog!(
"[pipeline_state] projection error for '{}': {e}",
v.story_id
v.story_id()
);
None
}
@@ -221,42 +221,46 @@ mod tests {
StoryId(s.to_string())
}
fn make_view(story_id: &str, stage: &str, name: Option<&str>) -> PipelineItemView {
PipelineItemView::for_test(
story_id,
stage,
name.map(str::to_string),
None,
None,
None,
None,
None,
None,
None,
None,
None,
)
}
#[test]
fn project_upcoming_item() {
let view = PipelineItemView {
story_id: "42_story_test".to_string(),
stage: "0_upcoming".to_string(),
name: Some("Test Story".to_string()),
agent: None,
retry_count: None,
blocked: None,
depends_on: None,
claimed_by: None,
claimed_at: None,
merged_at: None,
qa_mode: None,
mergemaster_attempted: None,
};
let view = make_view("42_story_test", "0_upcoming", Some("Test Story"));
let item = PipelineItem::try_from(&view).unwrap();
assert!(matches!(item.stage, Stage::Upcoming));
}
#[test]
fn project_backlog_item() {
let view = PipelineItemView {
story_id: "42_story_test".to_string(),
stage: "1_backlog".to_string(),
name: Some("Test Story".to_string()),
agent: None,
retry_count: None,
blocked: None,
depends_on: Some(vec![10, 20]),
claimed_by: None,
claimed_at: None,
merged_at: None,
qa_mode: None,
mergemaster_attempted: None,
};
let view = PipelineItemView::for_test(
"42_story_test",
"1_backlog",
Some("Test Story".to_string()),
None,
None,
None,
Some(vec![10, 20]),
None,
None,
None,
None,
None,
);
let item = PipelineItem::try_from(&view).unwrap();
assert_eq!(item.story_id, StoryId("42_story_test".to_string()));
assert_eq!(item.name, "Test Story");
@@ -267,20 +271,20 @@ mod tests {
#[test]
fn project_current_item() {
let view = PipelineItemView {
story_id: "42_story_test".to_string(),
stage: "2_current".to_string(),
name: Some("Test".to_string()),
agent: Some("coder-1".to_string()),
retry_count: Some(2),
blocked: None,
depends_on: None,
claimed_by: None,
claimed_at: None,
merged_at: None,
qa_mode: None,
mergemaster_attempted: None,
};
let view = PipelineItemView::for_test(
"42_story_test",
"2_current",
Some("Test".to_string()),
Some("coder-1".to_string()),
Some(2),
None,
None,
None,
None,
None,
None,
None,
);
let item = PipelineItem::try_from(&view).unwrap();
assert!(matches!(item.stage, Stage::Coding));
assert_eq!(item.retry_count, 2);
@@ -288,20 +292,7 @@ mod tests {
#[test]
fn project_merge_item() {
let view = PipelineItemView {
story_id: "42_story_test".to_string(),
stage: "4_merge".to_string(),
name: Some("Test".to_string()),
agent: None,
retry_count: None,
blocked: None,
depends_on: None,
claimed_by: None,
claimed_at: None,
merged_at: None,
qa_mode: None,
mergemaster_attempted: None,
};
let view = make_view("42_story_test", "4_merge", Some("Test"));
let item = PipelineItem::try_from(&view).unwrap();
assert!(matches!(item.stage, Stage::Merge { .. }));
if let Stage::Merge {
@@ -316,40 +307,27 @@ mod tests {
#[test]
fn project_blocked_item() {
let view = PipelineItemView {
story_id: "42_story_test".to_string(),
stage: "2_blocked".to_string(),
name: Some("Test".to_string()),
agent: None,
retry_count: None,
blocked: None,
depends_on: None,
claimed_by: None,
claimed_at: None,
merged_at: None,
qa_mode: None,
mergemaster_attempted: None,
};
let view = make_view("42_story_test", "2_blocked", Some("Test"));
let item = PipelineItem::try_from(&view).unwrap();
assert!(matches!(item.stage, Stage::Blocked { .. }));
}
#[test]
fn project_archived_blocked_item() {
let view = PipelineItemView {
story_id: "42_story_test".to_string(),
stage: "6_archived".to_string(),
name: Some("Test".to_string()),
agent: None,
retry_count: None,
blocked: Some(true),
depends_on: None,
claimed_by: None,
claimed_at: None,
merged_at: None,
qa_mode: None,
mergemaster_attempted: None,
};
let view = PipelineItemView::for_test(
"42_story_test",
"6_archived",
Some("Test".to_string()),
None,
None,
Some(true),
None,
None,
None,
None,
None,
None,
);
let item = PipelineItem::try_from(&view).unwrap();
assert!(matches!(
item.stage,
@@ -362,20 +340,20 @@ mod tests {
#[test]
fn project_archived_completed_item() {
let view = PipelineItemView {
story_id: "42_story_test".to_string(),
stage: "6_archived".to_string(),
name: Some("Test".to_string()),
agent: None,
retry_count: None,
blocked: Some(false),
depends_on: None,
claimed_by: None,
claimed_at: None,
merged_at: None,
qa_mode: None,
mergemaster_attempted: None,
};
let view = PipelineItemView::for_test(
"42_story_test",
"6_archived",
Some("Test".to_string()),
None,
None,
Some(false),
None,
None,
None,
None,
None,
None,
);
let item = PipelineItem::try_from(&view).unwrap();
assert!(matches!(
item.stage,
@@ -388,20 +366,7 @@ mod tests {
#[test]
fn project_unknown_stage_returns_error() {
let view = PipelineItemView {
story_id: "42_story_test".to_string(),
stage: "9_invalid".to_string(),
name: Some("Test".to_string()),
agent: None,
retry_count: None,
blocked: None,
depends_on: None,
claimed_by: None,
claimed_at: None,
merged_at: None,
qa_mode: None,
mergemaster_attempted: None,
};
let view = make_view("42_story_test", "9_invalid", Some("Test"));
let result = PipelineItem::try_from(&view);
assert!(matches!(
result,