diff --git a/server/src/agent_mode/claim.rs b/server/src/agent_mode/claim.rs index 8f86aa37..3d50129b 100644 --- a/server/src/agent_mode/claim.rs +++ b/server/src/agent_mode/claim.rs @@ -103,11 +103,11 @@ mod tests { // Confirm the stale claim is in place. let before = read_item(story_id).expect("item should exist"); assert_eq!( - before.claimed_by.as_deref(), + before.claimed_by(), Some(stale_holder), "pre-condition: item should be claimed by the stale holder" ); - let age = chrono::Utc::now().timestamp() as f64 - before.claimed_at.unwrap_or(0.0); + let age = chrono::Utc::now().timestamp() as f64 - before.claimed_at().unwrap_or(0.0); assert!( age >= CLAIM_TIMEOUT_SECS, "pre-condition: claim age ({age}s) must exceed TTL ({CLAIM_TIMEOUT_SECS}s)" @@ -134,12 +134,12 @@ mod tests { let our_id = our_node_id().expect("node id should be available after init_for_test"); let after = read_item(story_id).expect("item should still exist"); assert_eq!( - after.claimed_by.as_deref(), + after.claimed_by(), Some(our_id.as_str()), "new claim should have displaced the stale holder" ); assert_ne!( - after.claimed_by.as_deref(), + after.claimed_by(), Some(stale_holder), "stale holder must no longer own the claim" ); diff --git a/server/src/agent_mode/loop_ops.rs b/server/src/agent_mode/loop_ops.rs index bffa2d1e..03edb054 100644 --- a/server/src/agent_mode/loop_ops.rs +++ b/server/src/agent_mode/loop_ops.rs @@ -42,27 +42,28 @@ pub(super) async fn scan_and_claim( for item in &items { // Only claim stories in active stages. - if !crate::pipeline_state::Stage::from_dir(&item.stage).is_some_and(|s| s.is_active()) { + if !crate::pipeline_state::Stage::from_dir(item.stage_str()).is_some_and(|s| s.is_active()) + { continue; } // Skip blocked stories. - if item.blocked == Some(true) { + if item.blocked() { continue; } // If already claimed by us, skip. - if item.claimed_by.as_deref() == Some(&our_node) { + if item.claimed_by() == Some(our_node.as_str()) { continue; } // If claimed by another node, respect the claim while it is fresh. // Once the TTL expires the claim is considered stale regardless of // whether the holder appears alive — displacement is purely TTL-driven. - if let Some(ref claimer) = item.claimed_by + if let Some(claimer) = item.claimed_by() && !claimer.is_empty() - && claimer != &our_node - && let Some(claimed_at) = item.claimed_at + && claimer != our_node.as_str() + && let Some(claimed_at) = item.claimed_at() { let now = chrono::Utc::now().timestamp() as f64; let age = now - claimed_at; @@ -74,7 +75,7 @@ pub(super) async fn scan_and_claim( slog!( "[agent-mode] Displacing stale claim on '{}' held by {:.12}… \ (age {}s > TTL {}s)", - item.story_id, + item.story_id(), claimer, age as u64, CLAIM_TIMEOUT_SECS as u64, @@ -97,10 +98,10 @@ pub(super) async fn scan_and_claim( }) .map(|n| n.node_id) .collect(); - if !should_self_claim(&our_node, &item.story_id, &alive_peers) { + if !should_self_claim(&our_node, item.story_id(), &alive_peers) { slog!( "[agent-mode] Hash tie-break: deferring claim on '{}' to lower-hash peer", - item.story_id + item.story_id() ); continue; } @@ -108,11 +109,11 @@ pub(super) async fn scan_and_claim( // Try to claim this story. slog!( "[agent-mode] Claiming story '{}' for this node", - item.story_id + item.story_id() ); - if crdt_state::write_claim(&item.story_id) { + if crdt_state::write_claim(item.story_id()) { let now = chrono::Utc::now().timestamp() as f64; - our_claims.insert(item.story_id.clone(), now); + our_claims.insert(item.story_id().to_string(), now); } } @@ -165,27 +166,28 @@ pub(super) fn reclaim_timed_out_work(_project_root: &Path) { let now = chrono::Utc::now().timestamp() as f64; for item in &items { - if !crate::pipeline_state::Stage::from_dir(&item.stage).is_some_and(|s| s.is_active()) { + if !crate::pipeline_state::Stage::from_dir(item.stage_str()).is_some_and(|s| s.is_active()) + { continue; } // Release the claim if the TTL has expired — regardless of whether the // holder is still alive. A node actively working should refresh its // claim before the TTL window closes. - if let Some(ref claimer) = item.claimed_by { + if let Some(claimer) = item.claimed_by() { if claimer.is_empty() { continue; } - if let Some(claimed_at) = item.claimed_at + if let Some(claimed_at) = item.claimed_at() && now - claimed_at >= CLAIM_TIMEOUT_SECS { slog!( "[agent-mode] Releasing stale claim on '{}' held by {:.12}… (age {}s)", - item.story_id, + item.story_id(), claimer, (now - claimed_at) as u64, ); - crdt_state::release_claim(&item.story_id); + crdt_state::release_claim(item.story_id()); } } } diff --git a/server/src/agents/pool/auto_assign/story_checks.rs b/server/src/agents/pool/auto_assign/story_checks.rs index 4ca7db3e..7c74946b 100644 --- a/server/src/agents/pool/auto_assign/story_checks.rs +++ b/server/src/agents/pool/auto_assign/story_checks.rs @@ -19,10 +19,10 @@ pub(super) fn read_story_front_matter_agent( story_id: &str, ) -> Option { if let Some(view) = crate::crdt_state::read_item(story_id) - && let Some(agent) = view.agent.as_ref() + && let Some(agent) = view.agent() && !agent.is_empty() { - return Some(agent.clone()); + return Some(agent.to_string()); } use crate::db::yaml_legacy::parse_front_matter; let contents = read_story_contents(project_root, story_id)?; @@ -101,7 +101,7 @@ pub(super) fn has_mergemaster_attempted( story_id: &str, ) -> bool { crate::crdt_state::read_item(story_id) - .and_then(|view| view.mergemaster_attempted) + .map(|view| view.mergemaster_attempted()) .unwrap_or(false) } diff --git a/server/src/agents/pool/auto_assign/watchdog/tests/limits_tests.rs b/server/src/agents/pool/auto_assign/watchdog/tests/limits_tests.rs index f68ab556..78cde8fd 100644 --- a/server/src/agents/pool/auto_assign/watchdog/tests/limits_tests.rs +++ b/server/src/agents/pool/auto_assign/watchdog/tests/limits_tests.rs @@ -274,9 +274,10 @@ max_turns = 10 let item = crate::crdt_state::read_item(story_id) .expect("story must be in CRDT after watchdog termination"); assert_eq!( - item.stage, "2_blocked", + item.stage_str(), + "2_blocked", "story stage must be 2_blocked after limit termination with max_retries=1 — got: {}", - item.stage + item.stage_str() ); // Sanity: the agent itself is also Failed with the right reason. @@ -415,7 +416,8 @@ max_turns = 10 let item = crate::crdt_state::read_item(story_id) .expect("story must be in CRDT after per-session overrun"); assert_eq!( - item.stage, "2_blocked", + item.stage_str(), + "2_blocked", "story stage must be 2_blocked after per-session overrun with max_retries=1" ); } @@ -471,12 +473,12 @@ max_turns = 10 let item = crate::crdt_state::read_item(story_id).expect("story must be in CRDT"); assert_eq!( - item.retry_count, - Some(1), + item.retry_count(), + 1, "after session 1, retry_count should be 1 in CRDT" ); assert_ne!( - item.stage.as_str(), + item.stage_str(), "2_blocked", "story should NOT be blocked after session 1" ); @@ -491,12 +493,12 @@ max_turns = 10 let item = crate::crdt_state::read_item(story_id).expect("story must be in CRDT"); assert_eq!( - item.retry_count, - Some(2), + item.retry_count(), + 2, "after session 2, retry_count should be 2 in CRDT" ); assert_ne!( - item.stage.as_str(), + item.stage_str(), "2_blocked", "story should NOT be blocked after session 2" ); @@ -511,15 +513,16 @@ max_turns = 10 let item = crate::crdt_state::read_item(story_id).expect("story must be in CRDT"); assert_eq!( - item.stage, "2_blocked", + item.stage_str(), + "2_blocked", "story must be blocked after session 3 (retry_count=3 >= max_retries=3) — got: {}", - item.stage + item.stage_str() ); // retry_count resets to 0 on stage transition (Bug 780) — the fact // that the story reached 2_blocked proves the retry limit was hit. assert_eq!( - item.retry_count, - Some(0), + item.retry_count(), + 0, "retry_count should reset to 0 after stage transition to blocked" ); } diff --git a/server/src/agents/pool/pipeline/advance/helpers.rs b/server/src/agents/pool/pipeline/advance/helpers.rs index 91da3ba4..05f0a4c9 100644 --- a/server/src/agents/pool/pipeline/advance/helpers.rs +++ b/server/src/agents/pool/pipeline/advance/helpers.rs @@ -63,7 +63,7 @@ pub(super) fn resolve_qa_mode_from_store( ) -> crate::io::story_metadata::QaMode { // CRDT register is the authoritative source; check it before the content store. if let Some(view) = crate::crdt_state::read_item(story_id) - && let Some(ref s) = view.qa_mode + && let Some(s) = view.qa_mode() && let Some(mode) = crate::io::story_metadata::QaMode::from_str(s) { return mode; diff --git a/server/src/agents/pool/pipeline/advance/tests_regression.rs b/server/src/agents/pool/pipeline/advance/tests_regression.rs index c757dd18..e8201c36 100644 --- a/server/src/agents/pool/pipeline/advance/tests_regression.rs +++ b/server/src/agents/pool/pipeline/advance/tests_regression.rs @@ -867,8 +867,8 @@ stage = "coder" let item = crate::crdt_state::read_item("9950_story_warm_resume").expect("story must be in CRDT"); assert!( - item.retry_count.is_some_and(|rc| rc > 0), - "retry_count must be incremented after warm-resume: got {:?}", - item.retry_count + item.retry_count() > 0, + "retry_count must be incremented after warm-resume: got {}", + item.retry_count() ); } diff --git a/server/src/agents/pool/start/spawn.rs b/server/src/agents/pool/start/spawn.rs index c36e095d..91e17eb7 100644 --- a/server/src/agents/pool/start/spawn.rs +++ b/server/src/agents/pool/start/spawn.rs @@ -68,7 +68,7 @@ fn inject_gate_failure_section(args: &mut Vec, gate_output: &str) { /// prior failure context, even when session-resuming (story 881). pub(super) fn maybe_inject_gate_failure(args: &mut Vec, story_id: &str) { let retry_count = crate::crdt_state::read_item(story_id) - .and_then(|item| item.retry_count) + .map(|item| item.retry_count()) .unwrap_or(0); if retry_count > 0 && let Some(gate_output) = crate::db::read_content(&format!("{story_id}:gate_output")) @@ -767,8 +767,7 @@ mod tests { // retry_count must remain 0 — the abort path never calls bump_retry_count. let retry_count = crate::crdt_state::read_item(story_id) - .and_then(|item| item.retry_count) - .map(|r| r as u32) + .map(|item| item.retry_count()) .unwrap_or(0); assert_eq!( retry_count, 0, diff --git a/server/src/agents/pool/start/validation.rs b/server/src/agents/pool/start/validation.rs index ae5e61f0..233fbedd 100644 --- a/server/src/agents/pool/start/validation.rs +++ b/server/src/agents/pool/start/validation.rs @@ -64,10 +64,10 @@ pub(super) fn read_front_matter_agent(story_id: &str, agent_name: Option<&str>) // to legacy YAML parsing for stories whose CRDT entry doesn't yet have // the field populated. if let Some(view) = crate::crdt_state::read_item(story_id) - && let Some(agent) = view.agent.as_ref() + && let Some(agent) = view.agent() && !agent.is_empty() { - return Some(agent.clone()); + return Some(agent.to_string()); } crate::db::read_content(story_id).and_then(|contents| { crate::db::yaml_legacy::parse_front_matter(&contents) diff --git a/server/src/chat/commands/depends.rs b/server/src/chat/commands/depends.rs index 2d396e8c..ad1e8f02 100644 --- a/server/src/chat/commands/depends.rs +++ b/server/src/chat/commands/depends.rs @@ -192,8 +192,8 @@ mod tests { // CRDT register must hold the deps. let view = crate::crdt_state::read_item("9910_story_foo").expect("CRDT should have story"); assert_eq!( - view.depends_on, - Some(vec![477, 478]), + view.depends_on(), + &[477, 478], "CRDT register should hold [477, 478]: {view:?}" ); // Content store YAML must NOT be mutated with depends_on. @@ -223,8 +223,8 @@ mod tests { ); // CRDT register must be empty after clear. let view = crate::crdt_state::read_item("9911_story_bar").expect("CRDT should have story"); - assert_eq!( - view.depends_on, None, + assert!( + view.depends_on().is_empty(), "CRDT register should be empty after clearing: {view:?}" ); // Content store YAML must not be mutated. @@ -260,8 +260,8 @@ mod tests { let view = crate::crdt_state::read_item("8790_story_chat_dep").expect("CRDT must have chat story"); assert_eq!( - view.depends_on, - Some(vec![500, 501]), + view.depends_on(), + &[500, 501], "CRDT must hold [500, 501]: {view:?}" ); @@ -290,8 +290,8 @@ mod tests { assert!(out.contains("1"), "response should mention dep 1: {out}"); let view = crate::crdt_state::read_item("9920_story_scr").expect("CRDT must have story"); assert_eq!( - view.depends_on, - Some(vec![1, 2, 3]), + view.depends_on(), + &[1, 2, 3], "CRDT should hold [1,2,3]: {view:?}" ); @@ -299,9 +299,9 @@ mod tests { let out = depends_cmd_with_root(tmp.path(), "9920").unwrap(); assert!(out.contains("Cleared"), "clear should confirm: {out}"); let view = crate::crdt_state::read_item("9920_story_scr").expect("CRDT must have story"); - assert_eq!( - view.depends_on, None, - "CRDT should be None after clear: {view:?}" + assert!( + view.depends_on().is_empty(), + "CRDT should be empty after clear: {view:?}" ); // Replace with [4, 5] — must not append to old list. @@ -309,8 +309,8 @@ mod tests { assert!(out.contains("4"), "response should mention dep 4: {out}"); let view = crate::crdt_state::read_item("9920_story_scr").expect("CRDT must have story"); assert_eq!( - view.depends_on, - Some(vec![4, 5]), + view.depends_on(), + &[4, 5], "CRDT should hold exactly [4,5] after replace: {view:?}" ); } diff --git a/server/src/chat/commands/unblock.rs b/server/src/chat/commands/unblock.rs index 84d71fb1..c219a82c 100644 --- a/server/src/chat/commands/unblock.rs +++ b/server/src/chat/commands/unblock.rs @@ -56,7 +56,7 @@ fn unblock_by_story_id(story_id: &str) -> String { let crdt_item = crate::crdt_state::read_item(story_id); let story_name = crdt_item .as_ref() - .and_then(|i| i.name.clone()) + .and_then(|i| i.name().map(str::to_string)) .unwrap_or_else(|| story_id.to_string()); // Canonical "is this story blocked?" comes from the typed pipeline state. @@ -69,7 +69,7 @@ fn unblock_by_story_id(story_id: &str) -> String { Some(crate::pipeline_state::Stage::MergeFailure { .. }) ); // CRDT register fallback for items not yet projected into typed state. - let crdt_blocked = crdt_item.as_ref().and_then(|i| i.blocked).unwrap_or(false); + let crdt_blocked = crdt_item.as_ref().is_some_and(|i| i.blocked()); if !typed_blocked && !crdt_blocked { return format!("**{story_name}** ({story_id}) is not blocked. Nothing to unblock."); @@ -271,8 +271,8 @@ mod tests { let item = crate::crdt_state::read_item("9903_story_stuck") .expect("story should be in CRDT after unblock"); assert_eq!( - item.retry_count, - Some(0), + item.retry_count(), + 0, "retry_count should be reset to 0 in CRDT after unblock" ); } @@ -334,14 +334,13 @@ mod tests { let item = crate::crdt_state::read_item(story_id) .expect("story should still be in CRDT after unblock"); assert_eq!( - item.retry_count, - Some(0), + item.retry_count(), + 0, "retry_count must be reset to 0 in CRDT after unblock" ); assert!( - !item.blocked.unwrap_or(false), - "blocked flag must be cleared in CRDT after unblock: {:?}", - item.blocked + !item.blocked(), + "blocked flag must be cleared in CRDT after unblock" ); } diff --git a/server/src/chat/lookup.rs b/server/src/chat/lookup.rs index 2da9a22f..f6749151 100644 --- a/server/src/chat/lookup.rs +++ b/server/src/chat/lookup.rs @@ -32,14 +32,19 @@ pub(crate) fn find_story_by_number( // initialised (e.g. in unit tests or very early startup). if let Some(items) = crate::crdt_state::read_all_items() { for item in items { - if item.story_id.split('_').next().unwrap_or("") == number { + if item.story_id().split('_').next().unwrap_or("") == number { let path = project_root .join(".huskies") .join("work") - .join(&item.stage) - .join(format!("{}.md", item.story_id)); - let content = crate::db::read_content(&item.story_id); - return Some((item.story_id, item.stage, path, content)); + .join(item.stage_str()) + .join(format!("{}.md", item.story_id())); + let content = crate::db::read_content(item.story_id()); + return Some(( + item.story_id().to_string(), + item.stage_str().to_string(), + path, + content, + )); } } } @@ -53,7 +58,7 @@ pub(crate) fn find_story_by_number( continue; } let stage_dir = crate::crdt_state::read_item(&id) - .map(|v| v.stage) + .map(|v| v.stage_str().to_string()) .unwrap_or_else(|| "1_backlog".to_string()); let path = project_root .join(".huskies") diff --git a/server/src/crdt_state/mod.rs b/server/src/crdt_state/mod.rs index 5ae73560..f1e376dd 100644 --- a/server/src/crdt_state/mod.rs +++ b/server/src/crdt_state/mod.rs @@ -48,8 +48,8 @@ pub use state::init; pub use types::{ ActiveAgentCrdt, ActiveAgentView, AgentThrottleCrdt, AgentThrottleView, CrdtEvent, GatewayConfigCrdt, GatewayProjectCrdt, GatewayProjectView, MergeJobCrdt, MergeJobView, - NodePresenceCrdt, NodePresenceView, PipelineDoc, PipelineItemCrdt, PipelineItemView, - TestJobCrdt, TestJobView, TokenUsageCrdt, TokenUsageView, subscribe, + NodePresenceCrdt, NodePresenceView, PipelineDoc, PipelineItemCrdt, PipelineItemView, Stage, + TestJobCrdt, TestJobView, TokenUsageCrdt, TokenUsageView, WorkItem, subscribe, }; pub use write::{ bump_retry_count, migrate_names_from_slugs, migrate_story_ids_to_numeric, name_from_story_id, diff --git a/server/src/crdt_state/read.rs b/server/src/crdt_state/read.rs index 039f93f5..b0ea1ebf 100644 --- a/server/src/crdt_state/read.rs +++ b/server/src/crdt_state/read.rs @@ -406,10 +406,10 @@ pub fn check_unmet_deps_crdt(story_id: &str) -> Vec { Some(i) => i, None => return Vec::new(), }; - let deps = match item.depends_on { - Some(d) => d, - None => return Vec::new(), - }; + let deps = item.depends_on().to_vec(); + if deps.is_empty() { + return Vec::new(); + } deps.into_iter() .filter(|&dep| !dep_is_done_crdt(dep)) .collect() @@ -425,10 +425,10 @@ pub fn check_archived_deps_crdt(story_id: &str) -> Vec { Some(i) => i, None => return Vec::new(), }; - let deps = match item.depends_on { - Some(d) => d, - None => return Vec::new(), - }; + let deps = item.depends_on().to_vec(); + if deps.is_empty() { + return Vec::new(); + } deps.into_iter() .filter(|&dep| dep_is_archived_crdt(dep)) .collect() diff --git a/server/src/crdt_state/types.rs b/server/src/crdt_state/types.rs index e60526d3..b6162ef2 100644 --- a/server/src/crdt_state/types.rs +++ b/server/src/crdt_state/types.rs @@ -112,31 +112,209 @@ pub struct NodePresenceCrdt { // ── Read-side view types ───────────────────────────────────────────── -/// A snapshot of a single pipeline item derived from the CRDT document. -#[derive(Clone, Debug)] -pub struct PipelineItemView { - pub story_id: String, - pub stage: String, - pub name: Option, - pub agent: Option, - pub retry_count: Option, - pub blocked: Option, - pub depends_on: Option>, - /// Node ID of the node that claimed this item (hex-encoded Ed25519 pubkey). - pub claimed_by: Option, - /// Unix timestamp when the item was claimed. - pub claimed_at: Option, - /// Unix timestamp (seconds) when the item was merged to master. - /// `None` for items that were never in `5_done` or for legacy items. - pub merged_at: Option, - /// QA mode override from the CRDT register: `"server"`, `"agent"`, or `"human"`. - /// `None` means the register is unset (use project default). - pub qa_mode: Option, - /// Whether the auto-assigner has already spawned a mergemaster session for - /// this item. `None` means the register has never been set (treat as false). - pub mergemaster_attempted: Option, +/// Pipeline stage inferred from the CRDT `stage` register. +/// +/// This is the low-level typed stage for [`WorkItem`] accessors. For rich +/// transition metadata (merge commits, timestamps, etc.) project via +/// `pipeline_state::Stage` instead. +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum Stage { + /// Story created but not yet triaged (`0_upcoming`). + Upcoming, + /// Waiting for dependencies or auto-assign (`1_backlog`). + Backlog, + /// Actively being coded (`2_current`). + Coding, + /// Blocked awaiting human resolution (`2_blocked`). + Blocked, + /// Coder done; gates running (`3_qa`). + Qa, + /// Gates passed; ready to merge (`4_merge`). + Merge, + /// Merge failed; awaiting intervention (`4_merge_failure`). + MergeFailure, + /// Merged to master (`5_done`). + Done, + /// Out of the active flow (`6_archived`). + Archived, + /// Frozen, awaiting human review (`7_frozen`). + Frozen, + /// An unrecognised stage string — forward-compatible catch-all. + Unknown(String), } +impl Stage { + /// Parse a stage directory string into the typed enum. + pub fn from_dir(s: &str) -> Self { + match s { + "0_upcoming" => Stage::Upcoming, + "1_backlog" => Stage::Backlog, + "2_current" => Stage::Coding, + "2_blocked" => Stage::Blocked, + "3_qa" => Stage::Qa, + "4_merge" => Stage::Merge, + "4_merge_failure" => Stage::MergeFailure, + "5_done" => Stage::Done, + "6_archived" => Stage::Archived, + "7_frozen" => Stage::Frozen, + other => Stage::Unknown(other.to_string()), + } + } + + /// Convert back to the filesystem directory name string. + pub fn as_dir(&self) -> &str { + match self { + Stage::Upcoming => "0_upcoming", + Stage::Backlog => "1_backlog", + Stage::Coding => "2_current", + Stage::Blocked => "2_blocked", + Stage::Qa => "3_qa", + Stage::Merge => "4_merge", + Stage::MergeFailure => "4_merge_failure", + Stage::Done => "5_done", + Stage::Archived => "6_archived", + Stage::Frozen => "7_frozen", + Stage::Unknown(s) => s.as_str(), + } + } +} + +/// A typed snapshot of a single pipeline work item derived from the CRDT document. +/// +/// Access fields exclusively through the typed accessor methods — raw field access is +/// restricted to the `crdt_state` module tree. All `JsonValue` interpretation is +/// confined to `crdt_state::read::extract_item_view`, so no `JsonValue` escapes into +/// the public API. +/// +/// Adding a new field here without also reading it in an accessor produces an +/// `unused field` compiler warning, enforcing the read-side contract at compile time. +#[derive(Clone, Debug)] +pub struct WorkItem { + pub(super) story_id: String, + pub(super) stage: String, + pub(super) name: Option, + pub(super) agent: Option, + pub(super) retry_count: Option, + pub(super) blocked: Option, + pub(super) depends_on: Option>, + /// Node ID of the node that claimed this item (hex-encoded Ed25519 pubkey). + pub(super) claimed_by: Option, + /// Unix timestamp (seconds) when the claim was written. + pub(super) claimed_at: Option, + /// Unix timestamp (seconds) when the item was merged to master. + pub(super) merged_at: Option, + /// QA mode override: `"server"`, `"agent"`, or `"human"`. + pub(super) qa_mode: Option, + /// Whether the auto-assigner has already attempted a mergemaster spawn. + pub(super) mergemaster_attempted: Option, +} + +impl WorkItem { + /// The story identifier (e.g. `"42"` or `"42_story_my_feature"`). + pub fn story_id(&self) -> &str { + &self.story_id + } + + /// Pipeline stage as a typed enum. + pub fn stage(&self) -> Stage { + Stage::from_dir(&self.stage) + } + + /// Raw stage directory string (e.g. `"2_current"`). + pub fn stage_str(&self) -> &str { + &self.stage + } + + /// Human-readable story name, or `None` when unset. + pub fn name(&self) -> Option<&str> { + self.name.as_deref() + } + + /// Agent name pinned to this item, or `None` when unset. + pub fn agent(&self) -> Option<&str> { + self.agent.as_deref() + } + + /// Whether the item is blocked. Returns `false` when the register is unset. + pub fn blocked(&self) -> bool { + self.blocked.unwrap_or(false) + } + + /// Retry counter. Returns `0` when the register is unset. + pub fn retry_count(&self) -> u32 { + self.retry_count.unwrap_or(0).max(0) as u32 + } + + /// Dependency story numbers. Returns an empty slice when unset. + pub fn depends_on(&self) -> &[u32] { + self.depends_on.as_deref().unwrap_or(&[]) + } + + /// Node ID of the current claim holder, or `None` when unclaimed. + pub fn claimed_by(&self) -> Option<&str> { + self.claimed_by.as_deref() + } + + /// Unix timestamp (seconds) when the current claim was written, or `None`. + pub fn claimed_at(&self) -> Option { + self.claimed_at + } + + /// Unix timestamp (seconds) when the item was merged to master, or `None`. + pub fn merged_at(&self) -> Option { + self.merged_at + } + + /// QA mode override (`"server"`, `"agent"`, or `"human"`), or `None` when unset. + pub fn qa_mode(&self) -> Option<&str> { + self.qa_mode.as_deref() + } + + /// Whether a mergemaster spawn has already been attempted. Returns `false` when unset. + pub fn mergemaster_attempted(&self) -> bool { + self.mergemaster_attempted.unwrap_or(false) + } + + /// Construct a `WorkItem` for use in tests outside `crdt_state::*`. + /// + /// Within `crdt_state` use a struct literal directly (fields are `pub(super)`). + /// Each field must be supplied — adding a new field to `WorkItem` without updating + /// this constructor produces a compile error, enforcing the read-side contract. + #[allow(clippy::too_many_arguments)] + pub fn for_test( + story_id: impl Into, + stage: impl Into, + name: Option, + agent: Option, + retry_count: Option, + blocked: Option, + depends_on: Option>, + claimed_by: Option, + claimed_at: Option, + merged_at: Option, + qa_mode: Option, + mergemaster_attempted: Option, + ) -> Self { + Self { + story_id: story_id.into(), + stage: stage.into(), + name, + agent, + retry_count, + blocked, + depends_on, + claimed_by, + claimed_at, + merged_at, + qa_mode, + mergemaster_attempted, + } + } +} + +/// Backward-compatibility alias; prefer [`WorkItem`]. +pub type PipelineItemView = WorkItem; + /// A snapshot of a single node presence entry derived from the CRDT document. #[derive(Clone, Debug, serde::Serialize)] pub struct NodePresenceView { diff --git a/server/src/db/mod.rs b/server/src/db/mod.rs index 8bc902c0..4aec9895 100644 --- a/server/src/db/mod.rs +++ b/server/src/db/mod.rs @@ -346,12 +346,12 @@ mod tests { write_item_with_content(story_id, "2_current", content, meta); let view = crate::crdt_state::read_item(story_id).expect("story exists in CRDT"); - assert_eq!(view.stage, "2_current"); - assert_eq!(view.name.as_deref(), Some("Typed Name")); - assert_eq!(view.agent.as_deref(), Some("coder-1")); - assert_eq!(view.retry_count, Some(2)); - assert_eq!(view.blocked, Some(true)); - assert_eq!(view.depends_on, Some(vec![100, 200])); + assert_eq!(view.stage_str(), "2_current"); + assert_eq!(view.name(), Some("Typed Name")); + assert_eq!(view.agent(), Some("coder-1")); + assert_eq!(view.retry_count(), 2); + assert!(view.blocked()); + assert_eq!(view.depends_on(), &[100, 200]); // Content is stored verbatim (no parsing, no rewrite). assert_eq!(read_content(story_id).as_deref(), Some(content)); @@ -371,13 +371,15 @@ mod tests { write_item_with_content(story_id, "2_current", content, ItemMeta::default()); let view = crate::crdt_state::read_item(story_id).expect("story exists in CRDT"); - assert_eq!(view.stage, "2_current"); + assert_eq!(view.stage_str(), "2_current"); assert_eq!( - view.name, None, + view.name(), + None, "name must come from typed meta, not parsed YAML" ); assert_eq!( - view.agent, None, + view.agent(), + None, "agent must come from typed meta, not parsed YAML" ); } diff --git a/server/src/http/mcp/merge_tools.rs b/server/src/http/mcp/merge_tools.rs index 253dd6c7..c7d18d01 100644 --- a/server/src/http/mcp/merge_tools.rs +++ b/server/src/http/mcp/merge_tools.rs @@ -17,7 +17,7 @@ pub(super) async fn tool_merge_agent_work( // Check CRDT stage before attempting merge — if already done or archived, // return success immediately to avoid spurious error notifications. if let Some(item) = crate::crdt_state::read_item(story_id) - && crate::pipeline_state::Stage::from_dir(&item.stage).is_some_and(|s| { + && crate::pipeline_state::Stage::from_dir(item.stage_str()).is_some_and(|s| { matches!( s, crate::pipeline_state::Stage::Done { .. } @@ -31,7 +31,7 @@ pub(super) async fn tool_merge_agent_work( "success": true, "message": format!( "Story '{}' is already in '{}' — no merge needed.", - story_id, item.stage + story_id, item.stage_str() ), })) .map_err(|e| format!("Serialization error: {e}")); diff --git a/server/src/http/mcp/status_tools.rs b/server/src/http/mcp/status_tools.rs index 890056e2..f6f1915b 100644 --- a/server/src/http/mcp/status_tools.rs +++ b/server/src/http/mcp/status_tools.rs @@ -215,12 +215,12 @@ pub(super) async fn tool_status(args: &Value, ctx: &AppContext) -> Result 0.0 { front_matter.insert("claimed_at".to_string(), json!(ca)); diff --git a/server/src/http/mcp/story_tools/story/update.rs b/server/src/http/mcp/story_tools/story/update.rs index b63cea1f..0a57b8e5 100644 --- a/server/src/http/mcp/story_tools/story/update.rs +++ b/server/src/http/mcp/story_tools/story/update.rs @@ -233,8 +233,8 @@ mod tests { assert!(r.is_ok(), "set [1,2,3] should succeed: {r:?}"); let view = crate::crdt_state::read_item("888_deps_scr").expect("CRDT must have story"); assert_eq!( - view.depends_on, - Some(vec![1, 2, 3]), + view.depends_on(), + &[1, 2, 3], "CRDT should hold [1,2,3] after set" ); @@ -245,9 +245,9 @@ mod tests { ); assert!(r.is_ok(), "clear [] should succeed: {r:?}"); let view = crate::crdt_state::read_item("888_deps_scr").expect("CRDT must have story"); - assert_eq!( - view.depends_on, None, - "CRDT should be None after clearing to []" + assert!( + view.depends_on().is_empty(), + "CRDT should be empty after clearing to []" ); // Replace with [4, 5] — must not append to previous [1,2,3]. @@ -258,8 +258,8 @@ mod tests { assert!(r.is_ok(), "replace [4,5] should succeed: {r:?}"); let view = crate::crdt_state::read_item("888_deps_scr").expect("CRDT must have story"); assert_eq!( - view.depends_on, - Some(vec![4, 5]), + view.depends_on(), + &[4, 5], "CRDT should hold exactly [4,5] after replace (not [1,2,3,4,5])" ); } @@ -290,7 +290,10 @@ mod tests { ); assert!(r.is_ok(), "clear should succeed: {r:?}"); let view = crate::crdt_state::read_item("888_deps_persist").expect("CRDT must have story"); - assert_eq!(view.depends_on, None, "CRDT should be None after clear"); + assert!( + view.depends_on().is_empty(), + "CRDT should be empty after clear" + ); // Now update a different field — this triggers write_story_content with // the stale YAML (which still has depends_on: [100, 200]). @@ -300,11 +303,11 @@ mod tests { ); assert!(r.is_ok(), "subsequent name update should succeed: {r:?}"); - // The CRDT must still be None — the YAML value must not have been restored. + // The CRDT must still be empty — the YAML value must not have been restored. let view = crate::crdt_state::read_item("888_deps_persist").expect("CRDT must have story"); - assert_eq!( - view.depends_on, None, - "CRDT depends_on must remain None after unrelated update (write_story_content must not restore YAML value)" + assert!( + view.depends_on().is_empty(), + "CRDT depends_on must remain empty after unrelated update (write_story_content must not restore YAML value)" ); } @@ -327,8 +330,8 @@ mod tests { // CRDT register must hold the deps. let view = crate::crdt_state::read_item("504_arr_test").expect("CRDT must have the story"); assert_eq!( - view.depends_on, - Some(vec![490, 491]), + view.depends_on(), + &[490, 491], "CRDT register should hold [490, 491]: {view:?}" ); diff --git a/server/src/io/story_metadata/parser.rs b/server/src/io/story_metadata/parser.rs index 3001da10..891d7eb5 100644 --- a/server/src/io/story_metadata/parser.rs +++ b/server/src/io/story_metadata/parser.rs @@ -23,7 +23,7 @@ pub fn parse_unchecked_todos(contents: &str) -> Vec { /// spikes themselves. pub fn resolve_qa_mode(story_id: &str, default: QaMode) -> QaMode { crate::crdt_state::read_item(story_id) - .and_then(|view| view.qa_mode) + .and_then(|view| view.qa_mode().map(str::to_string)) .as_deref() .and_then(QaMode::from_str) .unwrap_or(default) diff --git a/server/src/pipeline_state/projection.rs b/server/src/pipeline_state/projection.rs index 16d29318..064d28c7 100644 --- a/server/src/pipeline_state/projection.rs +++ b/server/src/pipeline_state/projection.rs @@ -36,22 +36,22 @@ impl fmt::Display for ProjectionError { impl std::error::Error for ProjectionError {} -// ── Projection: PipelineItemView → PipelineItem ───────────────────────────── +// ── Projection: WorkItem → PipelineItem ───────────────────────────────────── impl TryFrom<&PipelineItemView> for PipelineItem { type Error = ProjectionError; fn try_from(view: &PipelineItemView) -> Result { - let story_id = StoryId(view.story_id.clone()); - let name = view.name.clone().unwrap_or_default(); + let story_id = StoryId(view.story_id().to_string()); + let name = view.name().unwrap_or("").to_string(); let depends_on: Vec = view - .depends_on - .as_ref() - .map(|deps| deps.iter().map(|d| StoryId(d.to_string())).collect()) - .unwrap_or_default(); + .depends_on() + .iter() + .map(|d| StoryId(d.to_string())) + .collect(); - let retry_count = view.retry_count.unwrap_or(0).max(0) as u32; + let retry_count = view.retry_count(); let stage = project_stage(view)?; @@ -65,11 +65,11 @@ impl TryFrom<&PipelineItemView> for PipelineItem { } } -/// Project the stage string + associated fields from a PipelineItemView into +/// Project the stage string + associated fields from a WorkItem into /// a typed Stage enum. This is the one carefully-controlled boundary where /// loose CRDT data becomes typed. pub fn project_stage(view: &PipelineItemView) -> Result { - match view.stage.as_str() { + match view.stage_str() { "0_upcoming" => Ok(Stage::Upcoming), "1_backlog" => Ok(Stage::Backlog), "2_blocked" => Ok(Stage::Blocked { @@ -82,7 +82,7 @@ pub fn project_stage(view: &PipelineItemView) -> Result // commits_ahead — those are computed at transition time. For // projection from existing CRDT data, we synthesize defaults. // The feature branch follows the naming convention. - let branch = format!("feature/story-{}", view.story_id); + let branch = format!("feature/story-{}", view.story_id()); // Existing CRDT data doesn't track commits_ahead, so we use 1 as // a safe non-zero default (the item is in merge, so there must be // at least one commit). @@ -105,7 +105,7 @@ pub fn project_stage(view: &PipelineItemView) -> Result // to UNIX_EPOCH, which makes them older than any retention window // and therefore eligible for immediate sweep to 6_archived. let merged_at = view - .merged_at + .merged_at() .map(|ts| { DateTime::from_timestamp(ts as i64, 0).unwrap_or(DateTime::::UNIX_EPOCH) }) @@ -117,7 +117,7 @@ pub fn project_stage(view: &PipelineItemView) -> Result } "6_archived" => { // Determine the archive reason from the CRDT fields. - let reason = if view.blocked == Some(true) { + let reason = if view.blocked() { ArchiveReason::Blocked { reason: "migrated from legacy blocked field".to_string(), } @@ -133,7 +133,7 @@ pub fn project_stage(view: &PipelineItemView) -> Result "7_frozen" => { // The stage to resume to is stored in front matter as `resume_to_stage`. // Fall back to Coding if the field is absent (e.g. legacy frozen items). - let resume_to = crate::db::read_content(&view.story_id) + let resume_to = crate::db::read_content(view.story_id()) .and_then(|content| { crate::db::yaml_legacy::parse_front_matter(&content) .ok() @@ -186,7 +186,7 @@ pub fn read_all_typed() -> Vec { Err(e) => { crate::slog!( "[pipeline_state] projection error for '{}': {e}", - v.story_id + v.story_id() ); None } @@ -221,42 +221,46 @@ mod tests { StoryId(s.to_string()) } + fn make_view(story_id: &str, stage: &str, name: Option<&str>) -> PipelineItemView { + PipelineItemView::for_test( + story_id, + stage, + name.map(str::to_string), + None, + None, + None, + None, + None, + None, + None, + None, + None, + ) + } + #[test] fn project_upcoming_item() { - let view = PipelineItemView { - story_id: "42_story_test".to_string(), - stage: "0_upcoming".to_string(), - name: Some("Test Story".to_string()), - agent: None, - retry_count: None, - blocked: None, - depends_on: None, - claimed_by: None, - claimed_at: None, - merged_at: None, - qa_mode: None, - mergemaster_attempted: None, - }; + let view = make_view("42_story_test", "0_upcoming", Some("Test Story")); let item = PipelineItem::try_from(&view).unwrap(); assert!(matches!(item.stage, Stage::Upcoming)); } #[test] fn project_backlog_item() { - let view = PipelineItemView { - story_id: "42_story_test".to_string(), - stage: "1_backlog".to_string(), - name: Some("Test Story".to_string()), - agent: None, - retry_count: None, - blocked: None, - depends_on: Some(vec![10, 20]), - claimed_by: None, - claimed_at: None, - merged_at: None, - qa_mode: None, - mergemaster_attempted: None, - }; + let view = PipelineItemView::for_test( + "42_story_test", + "1_backlog", + Some("Test Story".to_string()), + None, + None, + None, + Some(vec![10, 20]), + None, + None, + None, + None, + None, + ); let item = PipelineItem::try_from(&view).unwrap(); assert_eq!(item.story_id, StoryId("42_story_test".to_string())); assert_eq!(item.name, "Test Story"); @@ -267,20 +271,20 @@ mod tests { #[test] fn project_current_item() { - let view = PipelineItemView { - story_id: "42_story_test".to_string(), - stage: "2_current".to_string(), - name: Some("Test".to_string()), - agent: Some("coder-1".to_string()), - retry_count: Some(2), - blocked: None, - depends_on: None, - claimed_by: None, - claimed_at: None, - merged_at: None, - qa_mode: None, - mergemaster_attempted: None, - }; + let view = PipelineItemView::for_test( + "42_story_test", + "2_current", + Some("Test".to_string()), + Some("coder-1".to_string()), + Some(2), + None, + None, + None, + None, + None, + None, + None, + ); let item = PipelineItem::try_from(&view).unwrap(); assert!(matches!(item.stage, Stage::Coding)); assert_eq!(item.retry_count, 2); @@ -288,20 +292,7 @@ mod tests { #[test] fn project_merge_item() { - let view = PipelineItemView { - story_id: "42_story_test".to_string(), - stage: "4_merge".to_string(), - name: Some("Test".to_string()), - agent: None, - retry_count: None, - blocked: None, - depends_on: None, - claimed_by: None, - claimed_at: None, - merged_at: None, - qa_mode: None, - mergemaster_attempted: None, - }; + let view = make_view("42_story_test", "4_merge", Some("Test")); let item = PipelineItem::try_from(&view).unwrap(); assert!(matches!(item.stage, Stage::Merge { .. })); if let Stage::Merge { @@ -316,40 +307,27 @@ mod tests { #[test] fn project_blocked_item() { - let view = PipelineItemView { - story_id: "42_story_test".to_string(), - stage: "2_blocked".to_string(), - name: Some("Test".to_string()), - agent: None, - retry_count: None, - blocked: None, - depends_on: None, - claimed_by: None, - claimed_at: None, - merged_at: None, - qa_mode: None, - mergemaster_attempted: None, - }; + let view = make_view("42_story_test", "2_blocked", Some("Test")); let item = PipelineItem::try_from(&view).unwrap(); assert!(matches!(item.stage, Stage::Blocked { .. })); } #[test] fn project_archived_blocked_item() { - let view = PipelineItemView { - story_id: "42_story_test".to_string(), - stage: "6_archived".to_string(), - name: Some("Test".to_string()), - agent: None, - retry_count: None, - blocked: Some(true), - depends_on: None, - claimed_by: None, - claimed_at: None, - merged_at: None, - qa_mode: None, - mergemaster_attempted: None, - }; + let view = PipelineItemView::for_test( + "42_story_test", + "6_archived", + Some("Test".to_string()), + None, + None, + Some(true), + None, + None, + None, + None, + None, + None, + ); let item = PipelineItem::try_from(&view).unwrap(); assert!(matches!( item.stage, @@ -362,20 +340,20 @@ mod tests { #[test] fn project_archived_completed_item() { - let view = PipelineItemView { - story_id: "42_story_test".to_string(), - stage: "6_archived".to_string(), - name: Some("Test".to_string()), - agent: None, - retry_count: None, - blocked: Some(false), - depends_on: None, - claimed_by: None, - claimed_at: None, - merged_at: None, - qa_mode: None, - mergemaster_attempted: None, - }; + let view = PipelineItemView::for_test( + "42_story_test", + "6_archived", + Some("Test".to_string()), + None, + None, + Some(false), + None, + None, + None, + None, + None, + None, + ); let item = PipelineItem::try_from(&view).unwrap(); assert!(matches!( item.stage, @@ -388,20 +366,7 @@ mod tests { #[test] fn project_unknown_stage_returns_error() { - let view = PipelineItemView { - story_id: "42_story_test".to_string(), - stage: "9_invalid".to_string(), - name: Some("Test".to_string()), - agent: None, - retry_count: None, - blocked: None, - depends_on: None, - claimed_by: None, - claimed_at: None, - merged_at: None, - qa_mode: None, - mergemaster_attempted: None, - }; + let view = make_view("42_story_test", "9_invalid", Some("Test")); let result = PipelineItem::try_from(&view); assert!(matches!( result,