From c7a7cb42817f565f58369a8fd6241b1d02eeaef1 Mon Sep 17 00:00:00 2001 From: dave Date: Thu, 14 May 2026 11:01:06 +0000 Subject: [PATCH] huskies: merge 997 --- server/src/agent_mode/claim.rs | 2 +- server/src/agents/pool/auto_assign/merge.rs | 1 + .../merge_failure_block_subscriber.rs | 2 + .../auto_assign/merge_failure_subscriber.rs | 1 + .../src/agents/pool/auto_assign/pipeline.rs | 1 + server/src/agents/pool/auto_assign/scan.rs | 1 + .../agents/pool/auto_assign/story_checks.rs | 19 +-- .../watchdog/tests/limits_tests.rs | 12 +- .../src/agents/pool/cost_rollup_subscriber.rs | 3 + .../src/agents/pool/start/tests_selection.rs | 1 - server/src/chat/commands/backlog.rs | 4 +- server/src/chat/commands/status/tests.rs | 18 ++- server/src/chat/commands/unblock.rs | 12 +- server/src/chat/transport/matrix/assign.rs | 3 - server/src/chat/transport/matrix/delete.rs | 1 - server/src/crdt_snapshot/tests.rs | 44 +----- server/src/crdt_state/ops.rs | 10 +- server/src/crdt_state/read.rs | 20 ++- server/src/crdt_state/state/tests.rs | 11 +- server/src/crdt_state/types.rs | 17 ++- server/src/crdt_state/write/item.rs | 130 +++++++++++------- server/src/crdt_state/write/migrations.rs | 5 +- server/src/crdt_state/write/tests.rs | 55 ++------ server/src/db/mod.rs | 14 +- server/src/db/ops.rs | 6 +- server/src/db/recover.rs | 1 - server/src/http/mcp/merge_tools.rs | 2 - server/src/http/workflow/pipeline.rs | 9 +- server/src/io/watcher/tests.rs | 3 +- server/src/pipeline_state/events.rs | 2 + server/src/pipeline_state/projection.rs | 17 +-- server/src/pipeline_state/tests.rs | 25 ++++ server/src/pipeline_state/transition.rs | 10 ++ server/src/pipeline_state/types.rs | 29 +++- server/src/service/agents/mod.rs | 2 +- server/src/service/notifications/format.rs | 8 +- server/src/service/work_item/assign.rs | 2 - server/src/service/work_item/delete.rs | 1 - server/src/worktree/cleanup.rs | 1 + server/src/worktree/sweep.rs | 4 + 40 files changed, 256 insertions(+), 253 deletions(-) diff --git a/server/src/agent_mode/claim.rs b/server/src/agent_mode/claim.rs index 1da638cd..d5c2c49a 100644 --- a/server/src/agent_mode/claim.rs +++ b/server/src/agent_mode/claim.rs @@ -101,12 +101,12 @@ mod tests { .unwrap(), }), plan: Default::default(), + retries: 0, }, Some("Stale Claim Displacement Test"), None, None, None, - None, ); // Confirm the stale claim is in place. diff --git a/server/src/agents/pool/auto_assign/merge.rs b/server/src/agents/pool/auto_assign/merge.rs index c5d88446..8051882f 100644 --- a/server/src/agents/pool/auto_assign/merge.rs +++ b/server/src/agents/pool/auto_assign/merge.rs @@ -39,6 +39,7 @@ impl AgentPool { feature_branch: BranchName(String::new()), commits_ahead: NonZeroU32::new(1).expect("1 is non-zero"), claim: None, + retries: 0, }; let merge_items = scan_stage_items(&merge_stage); for story_id in &merge_items { diff --git a/server/src/agents/pool/auto_assign/merge_failure_block_subscriber.rs b/server/src/agents/pool/auto_assign/merge_failure_block_subscriber.rs index 19bf4130..aaae6be1 100644 --- a/server/src/agents/pool/auto_assign/merge_failure_block_subscriber.rs +++ b/server/src/agents/pool/auto_assign/merge_failure_block_subscriber.rs @@ -144,6 +144,7 @@ mod tests { feature_branch: BranchName("feature/test".to_string()), commits_ahead: NonZeroU32::new(1).unwrap(), claim: None, + retries: 0, }, after: Stage::MergeFailure { kind: kind.clone(), @@ -166,6 +167,7 @@ mod tests { after: Stage::Coding { claim: None, plan: Default::default(), + retries: 0, }, event: PipelineEvent::FixupRequested, at: chrono::Utc::now(), diff --git a/server/src/agents/pool/auto_assign/merge_failure_subscriber.rs b/server/src/agents/pool/auto_assign/merge_failure_subscriber.rs index 79498048..55610e95 100644 --- a/server/src/agents/pool/auto_assign/merge_failure_subscriber.rs +++ b/server/src/agents/pool/auto_assign/merge_failure_subscriber.rs @@ -161,6 +161,7 @@ mod tests { feature_branch: BranchName("feature/test".to_string()), commits_ahead: NonZeroU32::new(1).unwrap(), claim: None, + retries: 0, }, after: crate::pipeline_state::Stage::MergeFailure { kind: kind.clone(), diff --git a/server/src/agents/pool/auto_assign/pipeline.rs b/server/src/agents/pool/auto_assign/pipeline.rs index a07b1313..f46d79e9 100644 --- a/server/src/agents/pool/auto_assign/pipeline.rs +++ b/server/src/agents/pool/auto_assign/pipeline.rs @@ -31,6 +31,7 @@ impl AgentPool { Stage::Coding { claim: None, plan: Default::default(), + retries: 0, }, PipelineStage::Coder, ), diff --git a/server/src/agents/pool/auto_assign/scan.rs b/server/src/agents/pool/auto_assign/scan.rs index 1d360bc0..29212932 100644 --- a/server/src/agents/pool/auto_assign/scan.rs +++ b/server/src/agents/pool/auto_assign/scan.rs @@ -222,6 +222,7 @@ mod tests { let items = scan_stage_items(&Stage::Coding { claim: None, plan: Default::default(), + retries: 0, }); // The global CRDT may contain items from other tests, so check // that our three items are present and appear in sorted order. diff --git a/server/src/agents/pool/auto_assign/story_checks.rs b/server/src/agents/pool/auto_assign/story_checks.rs index b9385cb1..a1384044 100644 --- a/server/src/agents/pool/auto_assign/story_checks.rs +++ b/server/src/agents/pool/auto_assign/story_checks.rs @@ -96,7 +96,6 @@ mod tests { None, None, None, - None, ); assert!(has_review_hold("890_spike_held")); } @@ -112,7 +111,6 @@ mod tests { None, None, None, - None, ); assert!(!has_review_hold("890_spike_active_qa")); } @@ -186,7 +184,6 @@ mod tests { "2_current", Some("Blocked"), None, - None, Some("[999]"), None, ); @@ -196,21 +193,12 @@ mod tests { #[test] fn has_unmet_dependencies_returns_false_when_dep_done() { crate::crdt_state::init_for_test(); - crate::crdt_state::write_item_str( - "999_story_dep", - "5_done", - Some("Dep"), - None, - None, - None, - None, - ); + crate::crdt_state::write_item_str("999_story_dep", "5_done", Some("Dep"), None, None, None); crate::crdt_state::write_item_str( "10_story_ok", "2_current", Some("Ok"), None, - None, Some("[999]"), None, ); @@ -227,7 +215,6 @@ mod tests { None, None, None, - None, ); assert!(!has_unmet_dependencies("5_story_free")); } @@ -245,14 +232,12 @@ mod tests { None, None, None, - None, ); crate::crdt_state::write_item_str( "503_story_dependent", "1_backlog", Some("Dependent"), None, - None, Some("[500]"), None, ); @@ -271,14 +256,12 @@ mod tests { None, None, None, - None, ); crate::crdt_state::write_item_str( "503_story_waiting", "1_backlog", Some("Waiting"), None, - None, Some("[490]"), None, ); diff --git a/server/src/agents/pool/auto_assign/watchdog/tests/limits_tests.rs b/server/src/agents/pool/auto_assign/watchdog/tests/limits_tests.rs index c0447c22..501c0071 100644 --- a/server/src/agents/pool/auto_assign/watchdog/tests/limits_tests.rs +++ b/server/src/agents/pool/auto_assign/watchdog/tests/limits_tests.rs @@ -250,7 +250,6 @@ max_turns = 10 None, None, None, - None, ); // 12 turns in a single session exceeds the configured max of 10. @@ -378,7 +377,6 @@ max_turns = 10 None, None, None, - None, ); // Prior session with 5 turns (under limit alone). @@ -448,15 +446,7 @@ max_turns = 10 let initial = "---\nname: Retry Test\n---\n"; crate::crdt_state::init_for_test(); crate::db::write_content(crate::db::ContentKey::Story(story_id), initial); - crate::crdt_state::write_item_str( - story_id, - "2_current", - Some("Retry Test"), - None, - None, - None, - None, - ); + crate::crdt_state::write_item_str(story_id, "2_current", Some("Retry Test"), None, None, None); // Session 1: exceeds limit → retry_count=1 in CRDT, NOT blocked. { diff --git a/server/src/agents/pool/cost_rollup_subscriber.rs b/server/src/agents/pool/cost_rollup_subscriber.rs index 6424ebf6..ed19c6ab 100644 --- a/server/src/agents/pool/cost_rollup_subscriber.rs +++ b/server/src/agents/pool/cost_rollup_subscriber.rs @@ -135,6 +135,7 @@ mod tests { feature_branch: BranchName("feature/test".to_string()), commits_ahead: NonZeroU32::new(1).unwrap(), claim: None, + retries: 0, }, after: Stage::Done { merged_at: Utc::now(), @@ -153,6 +154,7 @@ mod tests { before: Stage::Coding { claim: None, plan: PlanState::Missing, + retries: 0, }, after: Stage::Abandoned { ts: Utc::now() }, event: PipelineEvent::Abandon, @@ -167,6 +169,7 @@ mod tests { after: Stage::Coding { claim: None, plan: PlanState::Missing, + retries: 0, }, event: PipelineEvent::DepsMet, at: Utc::now(), diff --git a/server/src/agents/pool/start/tests_selection.rs b/server/src/agents/pool/start/tests_selection.rs index 214fac3f..405e005f 100644 --- a/server/src/agents/pool/start/tests_selection.rs +++ b/server/src/agents/pool/start/tests_selection.rs @@ -293,7 +293,6 @@ stage = "coder" Some("coder-opus"), None, None, - None, ); let pool = AgentPool::new_test(3011); diff --git a/server/src/chat/commands/backlog.rs b/server/src/chat/commands/backlog.rs index 1dd4146c..21e03347 100644 --- a/server/src/chat/commands/backlog.rs +++ b/server/src/chat/commands/backlog.rs @@ -70,7 +70,6 @@ mod tests { name: name.to_string(), stage, depends_on: Vec::new(), - retry_count: 0, } } @@ -80,7 +79,6 @@ mod tests { name: name.to_string(), stage, depends_on: deps.iter().map(|n| StoryId(n.to_string())).collect(), - retry_count: 0, } } @@ -96,6 +94,7 @@ mod tests { Stage::Coding { claim: None, plan: Default::default(), + retries: 0, }, ), make_item("30_story_in_qa", "In QA", Stage::Qa), @@ -240,6 +239,7 @@ mod tests { Stage::Coding { claim: None, plan: Default::default(), + retries: 0, }, )]; let output = build_backlog_from_items(&items); diff --git a/server/src/chat/commands/status/tests.rs b/server/src/chat/commands/status/tests.rs index ac862f24..3f02908a 100644 --- a/server/src/chat/commands/status/tests.rs +++ b/server/src/chat/commands/status/tests.rs @@ -12,7 +12,6 @@ fn make_item(id: &str, name: &str, stage: Stage) -> PipelineItem { name: name.to_string(), stage, depends_on: Vec::new(), - retry_count: 0, } } @@ -23,7 +22,6 @@ fn make_item_with_deps(id: &str, name: &str, stage: Stage, deps: Vec) -> Pi name: name.to_string(), stage, depends_on: deps.iter().map(|n| StoryId(n.to_string())).collect(), - retry_count: 0, } } @@ -140,6 +138,7 @@ fn status_does_not_show_full_filename_stem() { Stage::Coding { claim: None, plan: Default::default(), + retries: 0, }, )]; @@ -169,6 +168,7 @@ fn status_shows_cost_when_token_usage_exists() { Stage::Coding { claim: None, plan: Default::default(), + retries: 0, }, )]; @@ -209,6 +209,7 @@ fn status_no_cost_when_no_usage() { Stage::Coding { claim: None, plan: Default::default(), + retries: 0, }, )]; @@ -232,6 +233,7 @@ fn status_aggregates_multiple_records_per_story() { Stage::Coding { claim: None, plan: Default::default(), + retries: 0, }, )]; @@ -279,6 +281,7 @@ fn status_shows_waiting_on_for_story_with_unmet_deps() { Stage::Coding { claim: None, plan: Default::default(), + retries: 0, }, vec![999], ), @@ -307,6 +310,7 @@ fn status_does_not_show_waiting_on_when_dep_is_done() { Stage::Coding { claim: None, plan: Default::default(), + retries: 0, }, vec![999], ), @@ -340,6 +344,7 @@ fn status_shows_no_waiting_info_when_no_deps() { Stage::Coding { claim: None, plan: Default::default(), + retries: 0, }, )]; @@ -406,7 +411,8 @@ fn stage_is_blocked_returns_false_for_coding() { assert!(!matches!( Stage::Coding { claim: None, - plan: Default::default() + plan: Default::default(), + retries: 0, }, Stage::Blocked { .. } | Stage::MergeFailure { .. } @@ -449,6 +455,7 @@ fn status_shows_idle_dot_for_unassigned_story() { Stage::Coding { claim: None, plan: Default::default(), + retries: 0, }, )]; @@ -541,6 +548,7 @@ fn merge_stage() -> Stage { feature_branch: BranchName("feature/test".to_string()), commits_ahead: std::num::NonZeroU32::new(1).unwrap(), claim: None, + retries: 0, } } @@ -823,6 +831,7 @@ fn in_progress_count_includes_blocked_items() { Stage::Coding { claim: None, plan: Default::default(), + retries: 0, }, ), make_item( @@ -858,6 +867,7 @@ fn frozen_coding_item_appears_in_in_progress_section() { resume_to: Box::new(Stage::Coding { claim: None, plan: Default::default(), + retries: 0, }), }, )]; @@ -919,6 +929,7 @@ fn frozen_item_shows_snowflake_indicator() { resume_to: Box::new(Stage::Coding { claim: None, plan: Default::default(), + retries: 0, }), }, )]; @@ -952,6 +963,7 @@ fn frozen_and_blocked_use_distinct_indicators() { resume_to: Box::new(Stage::Coding { claim: None, plan: Default::default(), + retries: 0, }), }, ), diff --git a/server/src/chat/commands/unblock.rs b/server/src/chat/commands/unblock.rs index 07a31c12..f62157fa 100644 --- a/server/src/chat/commands/unblock.rs +++ b/server/src/chat/commands/unblock.rs @@ -218,7 +218,6 @@ mod tests { "2_blocked", Some("Stuck Story"), None, - Some(5), None, None, ); @@ -289,15 +288,7 @@ mod tests { ); // Seed CRDT registers: blocked=true, retry_count=5, with a name so the // response can echo it back instead of falling through to the raw id. - crate::crdt_state::write_item_str( - story_id, - stage, - Some("Stuck Story"), - None, - Some(5), - None, - None, - ); + crate::crdt_state::write_item_str(story_id, stage, Some("Stuck Story"), None, None, None); let output = unblock_cmd_with_root(tmp.path(), "9904").unwrap(); @@ -351,7 +342,6 @@ mod tests { "blocked", Some("In QA"), None, - Some(3), None, None, ); diff --git a/server/src/chat/transport/matrix/assign.rs b/server/src/chat/transport/matrix/assign.rs index 3d742918..704ae676 100644 --- a/server/src/chat/transport/matrix/assign.rs +++ b/server/src/chat/transport/matrix/assign.rs @@ -323,7 +323,6 @@ mod tests { None, None, None, - None, ); let agents = std::sync::Arc::new(AgentPool::new_test(3000)); @@ -376,7 +375,6 @@ mod tests { None, None, None, - None, ); let agents = std::sync::Arc::new(AgentPool::new_test(3000)); @@ -424,7 +422,6 @@ mod tests { Some("coder-sonnet"), None, None, - None, ); let agents = std::sync::Arc::new(AgentPool::new_test(3000)); diff --git a/server/src/chat/transport/matrix/delete.rs b/server/src/chat/transport/matrix/delete.rs index d3cfb8b4..e1d4d951 100644 --- a/server/src/chat/transport/matrix/delete.rs +++ b/server/src/chat/transport/matrix/delete.rs @@ -253,7 +253,6 @@ mod tests { None, None, None, - None, ); // Seed in content store so find_story_by_number can resolve it. diff --git a/server/src/crdt_snapshot/tests.rs b/server/src/crdt_snapshot/tests.rs index 64d7637a..08b11922 100644 --- a/server/src/crdt_snapshot/tests.rs +++ b/server/src/crdt_snapshot/tests.rs @@ -231,24 +231,8 @@ fn snapshot_generation_includes_manifest() { crate::crdt_state::init_for_test(); // Write some items to populate ALL_OPS. - crate::crdt_state::write_item_str( - "636_test_a", - "1_backlog", - Some("Test A"), - None, - None, - None, - None, - ); - crate::crdt_state::write_item_str( - "636_test_b", - "2_current", - Some("Test B"), - None, - None, - None, - None, - ); + crate::crdt_state::write_item_str("636_test_a", "1_backlog", Some("Test A"), None, None, None); + crate::crdt_state::write_item_str("636_test_b", "2_current", Some("Test B"), None, None, None); let snapshot = generate_snapshot(); assert!(snapshot.is_some()); @@ -277,7 +261,6 @@ fn attribution_query_by_story_id() { None, None, None, - None, ); let snapshot = generate_snapshot().unwrap(); @@ -312,7 +295,6 @@ fn compaction_reduces_ops() { None, None, None, - None, ); } @@ -348,7 +330,6 @@ fn latest_snapshot_available_after_compaction() { None, None, None, - None, ); let snapshot = generate_snapshot().unwrap(); @@ -618,26 +599,9 @@ fn attribution_preserved_after_compaction() { Some("coder-opus"), None, None, - None, - ); - crate::crdt_state::write_item_str( - "636_archived_story", - "2_current", - None, - None, - None, - None, - None, - ); - crate::crdt_state::write_item_str( - "636_archived_story", - "6_archived", - None, - None, - None, - None, - None, ); + crate::crdt_state::write_item_str("636_archived_story", "2_current", None, None, None, None); + crate::crdt_state::write_item_str("636_archived_story", "6_archived", None, None, None, None); // Generate snapshot. let snapshot = generate_snapshot().unwrap(); diff --git a/server/src/crdt_state/ops.rs b/server/src/crdt_state/ops.rs index a6669d2d..cc14901a 100644 --- a/server/src/crdt_state/ops.rs +++ b/server/src/crdt_state/ops.rs @@ -542,15 +542,7 @@ mod tests { ); // Attempt resurrection via write_item — must be rejected by tombstone check. - write_item_str( - story_id, - "1_backlog", - Some("Resurrected"), - None, - None, - None, - None, - ); + write_item_str(story_id, "1_backlog", Some("Resurrected"), None, None, None); assert!( read_item(story_id).is_none(), "tombstoned story must not be resurrected by write_item after remote delete" diff --git a/server/src/crdt_state/read.rs b/server/src/crdt_state/read.rs index d8848cd0..a4197d58 100644 --- a/server/src/crdt_state/read.rs +++ b/server/src/crdt_state/read.rs @@ -348,7 +348,7 @@ pub(super) fn extract_item_view(item: &PipelineItemCrdt) -> Option s.parse::().ok(), _ => None, }; - let retry_count = match item.retry_count.view() { + let retry_count_register = match item.retry_count.view() { JsonValue::Number(n) if n >= 0.0 => n as u32, _ => 0u32, }; @@ -411,6 +411,7 @@ pub(super) fn extract_item_view(item: &PipelineItemCrdt) -> Option Option Option, claim_ts_secs: Option, plan_state_str: Option<&str>, + retries: u32, ) -> Option { use crate::pipeline_state::{ AgentClaim, AgentName, ArchiveReason, BranchName, GitSha, PlanState, Stage, @@ -482,6 +484,7 @@ fn project_stage_for_view( .unwrap_or(Stage::Coding { claim: None, plan: PlanState::Missing, + retries: 0, }), ) }; @@ -504,6 +507,7 @@ fn project_stage_for_view( "coding" => Some(Stage::Coding { claim, plan: PlanState::from_str(plan_state_str.unwrap_or("")), + retries, }), "qa" => Some(Stage::Qa), "blocked" => Some(Stage::Blocked { @@ -513,6 +517,7 @@ fn project_stage_for_view( feature_branch: BranchName(format!("feature/story-{story_id}")), commits_ahead: NonZeroU32::new(1).expect("1 is non-zero"), claim, + retries, }), "merge_failure" => { // Story 986: read the typed kind directly from ContentKey::MergeFailureKind @@ -675,7 +680,7 @@ mod tests { let item_json: JsonValue = json!({ "story_id": "40_story_view", - "stage": "qa", + "stage": "2_current", "name": "View Test", "agent": "coder-1", "retry_count": 2.0, @@ -691,10 +696,13 @@ mod tests { let view = extract_item_view(&crdt.doc.items[0]).unwrap(); assert_eq!(view.story_id, "40_story_view"); - assert!(matches!(view.stage, crate::pipeline_state::Stage::Qa)); + assert!(matches!( + view.stage, + crate::pipeline_state::Stage::Coding { .. } + )); assert_eq!(view.name, "View Test"); assert_eq!(view.agent.map(|a| a.as_str()), Some("coder-1")); - assert_eq!(view.retry_count, 2u32); + assert_eq!(view.retry_count(), 2u32); assert_eq!(view.depends_on, vec![10u32, 20u32]); } @@ -749,7 +757,6 @@ mod tests { None, None, None, - None, ); // The story is live on this node. @@ -817,7 +824,6 @@ mod tests { None, None, None, - None, ); assert!( read_item(story_id).is_none(), diff --git a/server/src/crdt_state/state/tests.rs b/server/src/crdt_state/state/tests.rs index e195354a..3e37c8f8 100644 --- a/server/src/crdt_state/state/tests.rs +++ b/server/src/crdt_state/state/tests.rs @@ -116,7 +116,6 @@ async fn subscribe_receives_stage_transition_events() { None, None, None, - None, ); // Drain any stale events from concurrent tests until we see ours. @@ -136,15 +135,7 @@ async fn subscribe_receives_stage_transition_events() { )); // Update stage — emit_event fires again with the real from_stage. - write_item_str( - "906_story_subscribe", - "2_current", - None, - None, - None, - None, - None, - ); + write_item_str("906_story_subscribe", "2_current", None, None, None, None); let evt: CrdtEvent = loop { let e = rx diff --git a/server/src/crdt_state/types.rs b/server/src/crdt_state/types.rs index be175c06..b9c24de9 100644 --- a/server/src/crdt_state/types.rs +++ b/server/src/crdt_state/types.rs @@ -190,8 +190,6 @@ pub struct WorkItem { /// treated as malformed, not surfaced with an empty string. pub(super) name: String, pub(super) agent: Option, - /// Retry counter — `0` when the CRDT register is unset. - pub(super) retry_count: u32, /// Dependency story numbers — empty `Vec` when the register is unset. pub(super) depends_on: Vec, /// QA mode override. `None` means "use the project default". @@ -226,9 +224,16 @@ impl WorkItem { self.agent } - /// Retry counter. Returns `0` when the register is unset. + /// Retry counter projected from the Stage variant. + /// + /// Returns `retries` from `Stage::Coding` or `Stage::Merge`; `0` for all + /// other stages (QA, Backlog, Done, etc. carry no retry state). pub fn retry_count(&self) -> u32 { - self.retry_count + match &self.stage { + crate::pipeline_state::Stage::Coding { retries, .. } => *retries, + crate::pipeline_state::Stage::Merge { retries, .. } => *retries, + _ => 0, + } } /// Dependency story numbers. Returns an empty slice when unset. @@ -262,7 +267,6 @@ impl WorkItem { stage: crate::pipeline_state::Stage, name: impl Into, agent: Option, - retry_count: u32, depends_on: Vec, qa_mode: Option, item_type: Option, @@ -273,7 +277,6 @@ impl WorkItem { stage, name: name.into(), agent, - retry_count, depends_on, qa_mode, item_type, @@ -525,6 +528,7 @@ mod tests { to_stage: crate::pipeline_state::Stage::Coding { claim: None, plan: crate::pipeline_state::PlanState::Missing, + retries: 0, }, name: "Foo Feature".to_string(), }; @@ -688,6 +692,7 @@ mod tests { to_stage: Stage::Coding { claim: None, plan: crate::pipeline_state::PlanState::Missing, + retries: 0, }, name: "Broadcast Test".to_string(), }; diff --git a/server/src/crdt_state/write/item.rs b/server/src/crdt_state/write/item.rs index 34291eae..be3e729e 100644 --- a/server/src/crdt_state/write/item.rs +++ b/server/src/crdt_state/write/item.rs @@ -245,12 +245,13 @@ pub fn set_plan_state(story_id: &str, state: crate::pipeline_state::PlanState) - /// /// `stage` is the typed pipeline state; it is serialised to the canonical /// clean wire form (story 934) via [`stage_dir_name`] at the CRDT boundary. +/// The `retries` count embedded in `Stage::Coding` / `Stage::Merge` is +/// automatically written to the `retry_count` CRDT register (story 997). pub fn write_item( story_id: &str, stage: &Stage, name: Option<&str>, agent: Option<&str>, - retry_count: Option, depends_on: Option<&str>, merged_at: Option, ) { @@ -260,6 +261,12 @@ pub fn write_item( Stage::Merge { claim, .. } => claim.as_ref(), _ => None, }; + // Extract retries from the Stage payload; non-Coding/Merge stages store 0. + let stage_retries: f64 = match stage { + Stage::Coding { retries, .. } => *retries as f64, + Stage::Merge { retries, .. } => *retries as f64, + _ => 0.0, + }; let Some(state_mutex) = get_crdt() else { return; }; @@ -307,11 +314,9 @@ pub fn write_item( s.crdt.doc.items[idx].agent.set(a.to_string()) }); } - if let Some(rc) = retry_count { - apply_and_persist(&mut state, |s| { - s.crdt.doc.items[idx].retry_count.set(rc as f64) - }); - } + apply_and_persist(&mut state, |s| { + s.crdt.doc.items[idx].retry_count.set(stage_retries) + }); if let Some(d) = depends_on { apply_and_persist(&mut state, |s| { s.crdt.doc.items[idx].depends_on.set(d.to_string()) @@ -365,7 +370,7 @@ pub fn write_item( "stage": stage_str, "name": name.unwrap_or(""), "agent": agent.unwrap_or(""), - "retry_count": retry_count.unwrap_or(0) as f64, + "retry_count": stage_retries, "depends_on": depends_on.unwrap_or(""), "claim_agent": insert_claim_agent, "claim_ts": insert_claim_ts, @@ -429,7 +434,6 @@ pub fn write_item_str( stage: &str, name: Option<&str>, agent: Option<&str>, - retry_count: Option, depends_on: Option<&str>, merged_at: Option, ) { @@ -453,58 +457,88 @@ pub fn write_item_str( crate::slog!("[crdt_state] write_item_str: unknown stage '{stage}' for {story_id}"); return; }; - write_item( - story_id, - &typed, - name, - agent, - retry_count, - depends_on, - merged_at, - ); + write_item(story_id, &typed, name, agent, depends_on, merged_at); } -/// Set `retry_count` to an explicit value for a pipeline item. +/// Set `retries` to an explicit value for a pipeline item via a Stage transition. /// -/// Pure metadata operation — the item's stage is not changed. -/// Call `set_retry_count(story_id, 0)` to reset the counter after a -/// stage transition or an explicit unblock. +/// Reads the current Stage from the CRDT, updates the `retries` field (only +/// meaningful for `Stage::Coding` and `Stage::Merge`), and writes back via +/// `write_item`. No-op for items not in a Coding or Merge stage. pub fn set_retry_count(story_id: &str, count: i64) { - let Some(state_mutex) = get_crdt() else { + let Some(item) = super::super::read::read_item(story_id) else { return; }; - let Ok(mut state) = state_mutex.lock() else { - return; + let new_stage = match item.stage().clone() { + Stage::Coding { + claim, + plan, + retries: _, + } => Stage::Coding { + claim, + plan, + retries: count.max(0) as u32, + }, + Stage::Merge { + feature_branch, + commits_ahead, + claim, + retries: _, + } => Stage::Merge { + feature_branch, + commits_ahead, + claim, + retries: count.max(0) as u32, + }, + _ => return, }; - if let Some(&idx) = state.index.get(story_id) { - apply_and_persist(&mut state, |s| { - s.crdt.doc.items[idx].retry_count.set(count as f64) - }); - } + write_item(story_id, &new_stage, None, None, None, None); } -/// Increment `retry_count` by 1 and return the new value. +/// Increment `retries` by 1 and return the new value. /// -/// Pure metadata operation — the item's stage is not changed. -/// Returns 0 if the item is not found in the CRDT (no-op in that case). -/// Use the returned value to decide whether the story should be blocked. +/// Reads the current Stage, increments the embedded `retries` field, and +/// writes back via `write_item`. Returns `0` if the item is not found or is +/// not in a Coding or Merge stage (no-op in that case). pub fn bump_retry_count(story_id: &str) -> i64 { - let Some(state_mutex) = get_crdt() else { + let Some(item) = super::super::read::read_item(story_id) else { return 0; }; - let Ok(mut state) = state_mutex.lock() else { - return 0; + let (new_stage, new_retries) = match item.stage().clone() { + Stage::Coding { + claim, + plan, + retries, + } => { + let n = retries + 1; + ( + Stage::Coding { + claim, + plan, + retries: n, + }, + n, + ) + } + Stage::Merge { + feature_branch, + commits_ahead, + claim, + retries, + } => { + let n = retries + 1; + ( + Stage::Merge { + feature_branch, + commits_ahead, + claim, + retries: n, + }, + n, + ) + } + _ => return 0, }; - let Some(&idx) = state.index.get(story_id) else { - return 0; - }; - let current = match state.crdt.doc.items[idx].retry_count.view() { - JsonValue::Number(n) => n as i64, - _ => 0, - }; - let new_count = current + 1; - apply_and_persist(&mut state, |s| { - s.crdt.doc.items[idx].retry_count.set(new_count as f64) - }); - new_count + write_item(story_id, &new_stage, None, None, None, None); + new_retries as i64 } diff --git a/server/src/crdt_state/write/migrations.rs b/server/src/crdt_state/write/migrations.rs index 72d158a5..f092f5b1 100644 --- a/server/src/crdt_state/write/migrations.rs +++ b/server/src/crdt_state/write/migrations.rs @@ -349,7 +349,6 @@ mod stage_migration_tests { None, None, None, - None, ); // Then overwrite the stage register with the raw legacy string, // bypassing `db::normalise_stage_str` / `write_item_str`'s mapping. @@ -373,6 +372,7 @@ mod stage_migration_tests { Stage::Coding { claim: None, plan: PlanState::Missing, + retries: 0, }, ), ( @@ -390,6 +390,7 @@ mod stage_migration_tests { feature_branch: BranchName(String::new()), commits_ahead: NonZeroU32::new(1).unwrap(), claim: None, + retries: 0, }, ), ( @@ -458,12 +459,12 @@ mod stage_migration_tests { &Stage::Coding { claim: None, plan: PlanState::Missing, + retries: 0, }, Some("Already Clean"), None, None, None, - None, ); seed_with_raw_stage("9521_needs_migration", "2_current"); diff --git a/server/src/crdt_state/write/tests.rs b/server/src/crdt_state/write/tests.rs index cb018229..1d366781 100644 --- a/server/src/crdt_state/write/tests.rs +++ b/server/src/crdt_state/write/tests.rs @@ -97,7 +97,6 @@ fn migrate_story_ids_to_numeric_rewrites_slug_ids() { None, None, None, - None, ); let result = migrate_story_ids_to_numeric(); @@ -120,15 +119,7 @@ fn migrate_story_ids_to_numeric_rewrites_slug_ids() { fn migrate_story_ids_to_numeric_is_idempotent() { init_for_test(); - write_item_str( - "43", - "1_backlog", - Some("Already Numeric"), - None, - None, - None, - None, - ); + write_item_str("43", "1_backlog", Some("Already Numeric"), None, None, None); // First call — nothing to migrate. let r1 = migrate_story_ids_to_numeric(); @@ -154,17 +145,8 @@ fn migrate_story_ids_to_numeric_skips_conflict() { None, None, None, - None, - ); - write_item_str( - "44", - "2_current", - Some("Foo numeric"), - None, - None, - None, - None, ); + write_item_str("44", "2_current", Some("Foo numeric"), None, None, None); let result = migrate_story_ids_to_numeric(); // The slug entry must NOT be migrated because "44" is already occupied. @@ -195,7 +177,6 @@ fn migrate_story_ids_to_numeric_preserves_stage_and_name() { Some("coder-1"), None, None, - None, ); migrate_story_ids_to_numeric(); @@ -214,15 +195,7 @@ fn migrate_names_from_slugs_fills_empty_names() { init_for_test(); // Write an item without a name. - write_item_str( - "42_story_my_feature", - "1_backlog", - None, - None, - None, - None, - None, - ); + write_item_str("42_story_my_feature", "1_backlog", None, None, None, None); // Before migration: nameless item is filtered by read_item (AC 5). assert!( @@ -251,7 +224,6 @@ fn migrate_names_from_slugs_leaves_existing_names_unchanged() { None, None, None, - None, ); migrate_names_from_slugs(); @@ -285,7 +257,6 @@ fn set_depends_on_round_trip_and_clear() { None, None, None, - None, ); // Set depends_on to [837] and verify CRDT register holds the list. @@ -338,7 +309,6 @@ fn set_agent_some_writes_name() { None, None, None, - None, ); let found = set_agent( @@ -366,7 +336,6 @@ fn set_agent_none_clears_register() { Some("coder-2"), None, None, - None, ); // Confirm agent is set. @@ -412,7 +381,6 @@ fn set_qa_mode_round_trip_server_then_human() { None, None, None, - None, ); // Set qa=server via typed path and assert CRDT register reflects it. @@ -465,7 +433,6 @@ fn set_qa_mode_round_trip_all_variants() { None, None, None, - None, ); for mode in [QaMode::Server, QaMode::Agent, QaMode::Human] { @@ -501,7 +468,6 @@ fn bump_retry_count_increments_by_one() { None, None, None, - None, ); let v1 = bump_retry_count("9001_story_bump_test"); @@ -511,7 +477,11 @@ fn bump_retry_count_increments_by_one() { assert_eq!(v2, 2, "second bump should return 2"); let item = read_item("9001_story_bump_test").expect("item must exist"); - assert_eq!(item.retry_count, 2u32, "CRDT must reflect final bump value"); + assert_eq!( + item.retry_count(), + 2u32, + "CRDT must reflect final bump value" + ); } #[test] @@ -522,7 +492,6 @@ fn set_retry_count_resets_to_zero() { "2_current", Some("Set Test"), None, - Some(5), None, None, ); @@ -530,7 +499,11 @@ fn set_retry_count_resets_to_zero() { set_retry_count("9002_story_set_test", 0); let item = read_item("9002_story_set_test").expect("item must exist"); - assert_eq!(item.retry_count, 0u32, "set_retry_count(0) must reset to 0"); + assert_eq!( + item.retry_count(), + 0u32, + "set_retry_count(0) must reset to 0" + ); } #[test] @@ -701,7 +674,6 @@ async fn tombstone_survives_concurrent_writes() { None, None, None, - None, ); assert!( read_item(story_id).is_some(), @@ -720,7 +692,6 @@ async fn tombstone_survives_concurrent_writes() { None, None, None, - None, ); tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; } diff --git a/server/src/db/mod.rs b/server/src/db/mod.rs index 9244dcef..286c290d 100644 --- a/server/src/db/mod.rs +++ b/server/src/db/mod.rs @@ -377,7 +377,6 @@ mod tests { let meta = ItemMeta { name: Some("Typed Name".into()), agent: Some("coder-1".into()), - retry_count: Some(2), depends_on: Some(vec![100, 200]), }; write_item_with_content(story_id, "2_current", content, meta); @@ -386,7 +385,7 @@ mod tests { assert_eq!(view.stage().dir_name(), "coding"); assert_eq!(view.name(), "Typed Name"); assert_eq!(view.agent(), Some(crate::config::AgentName::Coder1)); - assert_eq!(view.retry_count(), 2); + assert_eq!(view.retry_count(), 0); assert_eq!(view.depends_on(), &[100, 200]); // Content is stored verbatim (no parsing, no rewrite). @@ -461,18 +460,14 @@ mod tests { "2_current", Some("Retry reset test"), None, - Some(3), None, None, ); - write_content( - ContentKey::Story(story_id), - "---\nname: Retry reset test\nretry_count: 3\n---\n", - ); + crate::crdt_state::set_retry_count(story_id, 3); let typed = crate::pipeline_state::read_typed(story_id) .expect("read should succeed") .expect("story exists in CRDT"); - assert_eq!(typed.retry_count, 3); + assert_eq!(typed.retry_count(), 3); // Promote to 4_merge. retry_count must reset. move_item_stage(story_id, "4_merge", None); @@ -482,7 +477,8 @@ mod tests { .expect("story exists in CRDT"); assert_eq!(typed_after.stage.dir_name(), "merge"); assert_eq!( - typed_after.retry_count, 0, + typed_after.retry_count(), + 0, "retry_count must reset to 0 on stage transition" ); } diff --git a/server/src/db/ops.rs b/server/src/db/ops.rs index 3f473122..0ba9817f 100644 --- a/server/src/db/ops.rs +++ b/server/src/db/ops.rs @@ -17,7 +17,6 @@ use super::shadow_write::{PIPELINE_DB, PipelineWriteMsg}; pub struct ItemMeta { pub name: Option, pub agent: Option, - pub retry_count: Option, pub depends_on: Option>, } @@ -91,7 +90,6 @@ pub fn write_item_with_content(story_id: &str, stage: &str, content: &str, meta: &typed_stage, meta.name.as_deref(), meta.agent.as_deref(), - meta.retry_count, depends_on_json.as_deref(), merged_at_ts, ); @@ -103,7 +101,7 @@ pub fn write_item_with_content(story_id: &str, stage: &str, content: &str, meta: stage: stage.to_string(), name: meta.name, agent: meta.agent, - retry_count: meta.retry_count, + retry_count: None, depends_on: depends_on_json, content: Some(content.to_string()), }; @@ -146,7 +144,7 @@ pub fn move_item_stage( }; let merged_at_ts = matches!(typed_stage, crate::pipeline_state::Stage::Done { .. }) .then(|| chrono::Utc::now().timestamp() as f64); - crate::crdt_state::write_item(story_id, &typed_stage, None, None, None, None, merged_at_ts); + crate::crdt_state::write_item(story_id, &typed_stage, None, None, None, merged_at_ts); // Bug 780: stage transitions reset retry_count to 0. retry_count tracks // attempts at THIS stage's work (coding, merging, qa); a fresh attempt at // a new stage is conceptually distinct from prior attempts at a different diff --git a/server/src/db/recover.rs b/server/src/db/recover.rs index f06cc59c..055a7ae3 100644 --- a/server/src/db/recover.rs +++ b/server/src/db/recover.rs @@ -349,7 +349,6 @@ mod tests { None, None, None, - None, ); assert!( crdt_state::read_item(old_id).is_none(), diff --git a/server/src/http/mcp/merge_tools.rs b/server/src/http/mcp/merge_tools.rs index 7264b98a..939f1235 100644 --- a/server/src/http/mcp/merge_tools.rs +++ b/server/src/http/mcp/merge_tools.rs @@ -304,7 +304,6 @@ mod tests { None, None, None, - None, ); let tmp = tempfile::tempdir().unwrap(); let ctx = test_ctx(tmp.path()); @@ -328,7 +327,6 @@ mod tests { None, None, None, - None, ); let tmp = tempfile::tempdir().unwrap(); let ctx = test_ctx(tmp.path()); diff --git a/server/src/http/workflow/pipeline.rs b/server/src/http/workflow/pipeline.rs index 6f52028b..c7764229 100644 --- a/server/src/http/workflow/pipeline.rs +++ b/server/src/http/workflow/pipeline.rs @@ -132,8 +132,8 @@ pub fn load_pipeline_state(ctx: &AppContext) -> Result { agent, review_hold, qa, - retry_count: if item.retry_count > 0 { - Some(item.retry_count) + retry_count: if item.retry_count() > 0 { + Some(item.retry_count()) } else { None }, @@ -258,6 +258,7 @@ pub fn load_upcoming_stories(_ctx: &AppContext) -> Result, St .map(|item| { let sid = &item.story_id.0; let epic_id = crate::crdt_state::read_item(sid).and_then(|v| v.epic()); + let item_retry_count = item.retry_count(); UpcomingStory { story_id: item.story_id.0.clone(), name: item.name, @@ -266,8 +267,8 @@ pub fn load_upcoming_stories(_ctx: &AppContext) -> Result, St agent: None, review_hold: None, qa: None, - retry_count: if item.retry_count > 0 { - Some(item.retry_count) + retry_count: if item_retry_count > 0 { + Some(item_retry_count) } else { None }, diff --git a/server/src/io/watcher/tests.rs b/server/src/io/watcher/tests.rs index a4f4b5f8..bbf25035 100644 --- a/server/src/io/watcher/tests.rs +++ b/server/src/io/watcher/tests.rs @@ -118,6 +118,7 @@ fn stage_metadata_returns_correct_actions() { &Stage::Coding { claim: None, plan: Default::default(), + retries: 0, }, "42_story_foo", ); @@ -242,7 +243,6 @@ fn sweep_uses_crdt_merged_at_not_utc_now() { Some("merged_at test"), None, None, - None, Some(ten_seconds_ago), ); @@ -314,7 +314,6 @@ fn sweep_keeps_item_newer_than_retention() { Some("recent merged_at test"), None, None, - None, Some(one_second_ago), ); diff --git a/server/src/pipeline_state/events.rs b/server/src/pipeline_state/events.rs index 82252353..a1533fb2 100644 --- a/server/src/pipeline_state/events.rs +++ b/server/src/pipeline_state/events.rs @@ -152,6 +152,7 @@ mod tests { after: Stage::Coding { claim: None, plan: PlanState::Missing, + retries: 0, }, event: PipelineEvent::DepsMet, at: Utc::now(), @@ -172,6 +173,7 @@ mod tests { feature_branch: BranchName("feature/story-1".into()), commits_ahead: NonZeroU32::new(3).unwrap(), claim: None, + retries: 0, }; // Stage::Merge has exactly two fields: feature_branch and commits_ahead. // There is no way to attach an agent name to it. The type system diff --git a/server/src/pipeline_state/projection.rs b/server/src/pipeline_state/projection.rs index 71f8839b..322b4fbd 100644 --- a/server/src/pipeline_state/projection.rs +++ b/server/src/pipeline_state/projection.rs @@ -52,14 +52,11 @@ impl TryFrom<&PipelineItemView> for PipelineItem { .map(|d| StoryId(d.to_string())) .collect(); - let retry_count = view.retry_count(); - Ok(PipelineItem { story_id, name, stage: view.stage().clone(), depends_on, - retry_count, }) } } @@ -117,7 +114,6 @@ mod tests { stage, name.unwrap_or("(unnamed)"), None, - 0u32, vec![], None, None, @@ -139,7 +135,6 @@ mod tests { Stage::Backlog, "Test Story", None, - 0u32, vec![10, 20], None, None, @@ -150,7 +145,6 @@ mod tests { assert_eq!(item.name, "Test Story"); assert!(matches!(item.stage, Stage::Backlog)); assert_eq!(item.depends_on.len(), 2); - assert_eq!(item.retry_count, 0); } #[test] @@ -160,10 +154,10 @@ mod tests { Stage::Coding { claim: None, plan: PlanState::Missing, + retries: 2, }, "Test", Some(crate::config::AgentName::Coder1), - 2u32, vec![], None, None, @@ -171,7 +165,7 @@ mod tests { ); let item = PipelineItem::try_from(&view).unwrap(); assert!(matches!(item.stage, Stage::Coding { .. })); - assert_eq!(item.retry_count, 2); + assert_eq!(item.retry_count(), 2); } #[test] @@ -182,6 +176,7 @@ mod tests { feature_branch: fb("feature/story-42_story_test"), commits_ahead: nz(1), claim: None, + retries: 0, }, Some("Test"), ); @@ -223,7 +218,6 @@ mod tests { }, "Test", None, - 0u32, vec![], None, None, @@ -249,7 +243,6 @@ mod tests { }, "Test", None, - 0u32, vec![], None, None, @@ -273,6 +266,7 @@ mod tests { resume_to: Box::new(Stage::Coding { claim: None, plan: PlanState::Missing, + retries: 0, }), }, Some("Frozen Story"), @@ -308,6 +302,7 @@ mod tests { Stage::Coding { claim: None, plan: PlanState::Missing, + retries: 0, }, Some("Test"), ); @@ -328,6 +323,7 @@ mod tests { Stage::Coding { claim: None, plan: PlanState::Drafted, + retries: 0, }, Some("Test"), ); @@ -348,6 +344,7 @@ mod tests { Stage::Coding { claim: None, plan: PlanState::Confirmed, + retries: 0, }, Some("Test"), ); diff --git a/server/src/pipeline_state/tests.rs b/server/src/pipeline_state/tests.rs index 03002c1b..86750af3 100644 --- a/server/src/pipeline_state/tests.rs +++ b/server/src/pipeline_state/tests.rs @@ -55,6 +55,7 @@ fn happy_path_with_qa() { let s = Stage::Coding { claim: None, plan: PlanState::Missing, + retries: 0, }; let s = transition(s, PipelineEvent::GatesStarted).unwrap(); assert!(matches!(s, Stage::Qa)); @@ -75,6 +76,7 @@ fn qa_retry_loop() { let s = Stage::Coding { claim: None, plan: PlanState::Missing, + retries: 0, }; let s = transition(s, PipelineEvent::GatesStarted).unwrap(); assert!(matches!(s, Stage::Qa)); @@ -164,6 +166,7 @@ fn cannot_accept_from_coding() { Stage::Coding { claim: None, plan: PlanState::Missing, + retries: 0, }, PipelineEvent::Accepted, ); @@ -182,6 +185,7 @@ fn block_from_any_active_stage() { Stage::Coding { claim: None, plan: PlanState::Missing, + retries: 0, }, Stage::Qa, ] { @@ -198,6 +202,7 @@ fn block_from_any_active_stage() { feature_branch: fb("f"), commits_ahead: nz(1), claim: None, + retries: 0, }; let result = transition( m, @@ -274,6 +279,7 @@ fn abandon_from_any_active_or_done() { Stage::Coding { claim: None, plan: PlanState::Missing, + retries: 0, }, Stage::Qa, Stage::Done { @@ -293,6 +299,7 @@ fn supersede_from_any_active_or_done() { Stage::Coding { claim: None, plan: PlanState::Missing, + retries: 0, }, Stage::Qa, Stage::Done { @@ -322,6 +329,7 @@ fn review_hold_from_active_stages() { Stage::Coding { claim: None, plan: PlanState::Missing, + retries: 0, }, Stage::Qa, ] { @@ -350,6 +358,7 @@ fn merge_failed_final() { feature_branch: fb("f"), commits_ahead: nz(1), claim: None, + retries: 0, }; let result = transition( s, @@ -373,6 +382,7 @@ fn merge_failed_only_from_merge() { Stage::Coding { claim: None, plan: PlanState::Missing, + retries: 0, }, PipelineEvent::MergeFailedFinal { reason: "conflicts".into(), @@ -451,6 +461,7 @@ fn bug_502_agent_not_in_stage() { feature_branch: BranchName("feature/story-1".into()), commits_ahead: NonZeroU32::new(3).unwrap(), claim: None, + retries: 0, }; // Stage::Merge has exactly two fields: feature_branch and commits_ahead. // There is no way to attach an agent name to it. The type system @@ -523,6 +534,7 @@ fn reject_from_active_stages() { Stage::Coding { claim: None, plan: PlanState::Missing, + retries: 0, }, Stage::Qa, ] { @@ -539,6 +551,7 @@ fn reject_from_active_stages() { feature_branch: fb("f"), commits_ahead: nz(1), claim: None, + retries: 0, }; let result = transition( m, @@ -931,6 +944,7 @@ fn merge_aborted_returns_to_coding() { feature_branch: fb("feature/story-73"), commits_ahead: nz(2), claim: None, + retries: 0, }; let result = transition(s, PipelineEvent::MergeAborted).unwrap(); assert!( @@ -1034,12 +1048,14 @@ fn hotfix_requested_rejected_from_non_done_stages() { Stage::Coding { claim: None, plan: PlanState::Missing, + retries: 0, }, Stage::Qa, Stage::Merge { feature_branch: fb("feature/story-1"), commits_ahead: nz(1), claim: None, + retries: 0, }, ] { let result = transition(stage.clone(), PipelineEvent::HotfixRequested); @@ -1064,6 +1080,7 @@ fn audit_entry_backlog_to_coding_exact_format() { after: Stage::Coding { claim: None, plan: PlanState::Missing, + retries: 0, }, event: PipelineEvent::DepsMet, at, @@ -1083,6 +1100,7 @@ fn audit_entry_is_single_line_with_all_fields() { feature_branch: fb("feature/story-42"), commits_ahead: nz(3), claim: None, + retries: 0, }, event: PipelineEvent::GatesPassed { feature_branch: fb("feature/story-42"), @@ -1120,6 +1138,7 @@ fn audit_entry_merge_to_done() { feature_branch: fb("f"), commits_ahead: nz(1), claim: None, + retries: 0, }, after: Stage::Done { merged_at: chrono::Utc::now(), @@ -1167,6 +1186,7 @@ fn audit_entry_coding_to_blocked() { before: Stage::Coding { claim: None, plan: PlanState::Missing, + retries: 0, }, after: Stage::Blocked { reason: "waiting".into(), @@ -1192,6 +1212,7 @@ fn audit_entry_blocked_to_coding() { after: Stage::Coding { claim: None, plan: PlanState::Missing, + retries: 0, }, event: PipelineEvent::Unblock, at: chrono::Utc::now(), @@ -1210,6 +1231,7 @@ fn audit_entry_merge_to_merge_failure() { feature_branch: fb("f"), commits_ahead: nz(1), claim: None, + retries: 0, }, after: Stage::MergeFailure { kind: MergeFailureKind::Other("conflicts".into()), @@ -1234,11 +1256,13 @@ fn audit_entry_coding_to_frozen() { before: Stage::Coding { claim: None, plan: PlanState::Missing, + retries: 0, }, after: Stage::Frozen { resume_to: Box::new(Stage::Coding { claim: None, plan: PlanState::Missing, + retries: 0, }), }, event: PipelineEvent::Freeze, @@ -1257,6 +1281,7 @@ fn audit_entry_coding_to_abandoned() { before: Stage::Coding { claim: None, plan: PlanState::Missing, + retries: 0, }, after: Stage::Abandoned { ts: chrono::Utc::now(), diff --git a/server/src/pipeline_state/transition.rs b/server/src/pipeline_state/transition.rs index 12035258..811f7ee3 100644 --- a/server/src/pipeline_state/transition.rs +++ b/server/src/pipeline_state/transition.rs @@ -152,6 +152,7 @@ pub fn transition(state: Stage, event: PipelineEvent) -> Result Ok(Coding { claim: None, plan: PlanState::Missing, + retries: 0, }), (Coding { .. }, GatesStarted) => Ok(Qa), ( @@ -164,6 +165,7 @@ pub fn transition(state: Stage, event: PipelineEvent) -> Result Result Ok(Coding { claim: None, plan: PlanState::Missing, + retries: 0, }), (Merge { .. }, MergeSucceeded { merge_commit }) => Ok(Done { merged_at: now, @@ -323,6 +327,7 @@ pub fn transition(state: Stage, event: PipelineEvent) -> Result Ok(Coding { claim: None, plan: PlanState::Missing, + retries: 0, }), // ── FixupRequested: MergeFailureFinal → Coding (operator override) @@ -335,6 +340,7 @@ pub fn transition(state: Stage, event: PipelineEvent) -> Result Ok(Coding { claim: None, plan: PlanState::Missing, + retries: 0, }), // ── ReQueuedForQa: MergeFailure → Qa (re-review) ──────────────── @@ -344,6 +350,7 @@ pub fn transition(state: Stage, event: PipelineEvent) -> Result Ok(Coding { claim: None, plan: PlanState::Missing, + retries: 0, }), // ── HotfixRequested: Done → Coding (post-merge hotfix) ─────────── @@ -353,6 +360,7 @@ pub fn transition(state: Stage, event: PipelineEvent) -> Result Ok(Coding { claim: None, plan: PlanState::Missing, + retries: 0, }), // ── MergemasterAttempted: MergeFailure → MergeFailureFinal ───── @@ -367,6 +375,7 @@ pub fn transition(state: Stage, event: PipelineEvent) -> Result Ok(Coding { claim: None, plan: PlanState::Missing, + retries: 0, }), // ── Unblock MergeFailure → Merge (re-attempt) ──────────────────── @@ -384,6 +393,7 @@ pub fn transition(state: Stage, event: PipelineEvent) -> Result, plan: PlanState, + /// Number of coder restarts for this item. Zero on the first attempt. + retries: u32, }, /// Coder has run; gates are running. @@ -225,11 +230,16 @@ pub enum Stage { /// `commits_ahead: NonZeroU32` makes "Merge with nothing to merge" /// structurally impossible (eliminates bug 519). The optional /// [`AgentClaim`] carries the mergemaster agent that owns this merge. + /// + /// `retries` counts how many times the mergemaster agent has been restarted + /// for this item. Replaces the separate `retry_count` CRDT register (story 997). Merge { feature_branch: BranchName, commits_ahead: NonZeroU32, /// Agent currently running the merge, or `None` when unclaimed. claim: Option, + /// Number of mergemaster restarts for this item. Zero on the first attempt. + retries: u32, }, /// Mergemaster squashed to master. Always carries merge metadata. @@ -350,6 +360,7 @@ impl Stage { "coding" => Some(Stage::Coding { claim: None, plan: PlanState::Missing, + retries: 0, }), "blocked" => Some(Stage::Blocked { reason: String::new(), @@ -359,6 +370,7 @@ impl Stage { feature_branch: BranchName(String::new()), commits_ahead: NonZeroU32::new(1).expect("1 is non-zero"), claim: None, + retries: 0, }), "merge_failure" => Some(Stage::MergeFailure { kind: MergeFailureKind::Other(String::new()), @@ -372,12 +384,14 @@ impl Stage { resume_to: Box::new(Stage::Coding { claim: None, plan: PlanState::Missing, + retries: 0, }), }), "review_hold" => Some(Stage::ReviewHold { resume_to: Box::new(Stage::Coding { claim: None, plan: PlanState::Missing, + retries: 0, }), reason: String::new(), }), @@ -438,13 +452,26 @@ pub enum ExecutionState { // ── Pipeline item (the aggregate) ─────────────────────────────────────────── /// A fully typed pipeline item. Every field is validated by construction. +/// +/// The retry count is no longer a top-level field — callers read it from the +/// Stage variant (`Stage::Coding { retries }` / `Stage::Merge { retries }`). #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct PipelineItem { pub story_id: StoryId, pub name: String, pub stage: Stage, pub depends_on: Vec, - pub retry_count: u32, +} + +impl PipelineItem { + /// Returns the retry count embedded in the stage payload. + pub fn retry_count(&self) -> u32 { + match &self.stage { + Stage::Coding { retries, .. } => *retries, + Stage::Merge { retries, .. } => *retries, + _ => 0, + } + } } // ── Transition errors ─────────────────────────────────────────────────────── diff --git a/server/src/service/agents/mod.rs b/server/src/service/agents/mod.rs index 89d9cdce..d24d0945 100644 --- a/server/src/service/agents/mod.rs +++ b/server/src/service/agents/mod.rs @@ -149,6 +149,7 @@ pub fn get_work_item_content( Stage::Coding { claim: None, plan: Default::default(), + retries: 0, }, ), ("3_qa", Stage::Qa), @@ -331,7 +332,6 @@ max_budget_usd = 5.0 None, None, None, - None, ); let item = get_work_item_content(tmp.path(), "42_story_foo").unwrap(); assert!(item.content.contains("Some content.")); diff --git a/server/src/service/notifications/format.rs b/server/src/service/notifications/format.rs index e50f8641..467cbdc6 100644 --- a/server/src/service/notifications/format.rs +++ b/server/src/service/notifications/format.rs @@ -256,7 +256,8 @@ mod tests { assert_eq!( stage_display_name(&Stage::Coding { claim: None, - plan: Default::default() + plan: Default::default(), + retries: 0, }), "Current" ); @@ -299,6 +300,7 @@ mod tests { &Stage::Coding { claim: None, plan: Default::default(), + retries: 0, }, ); assert!(!plain.contains("\u{1f389}")); @@ -313,6 +315,7 @@ mod tests { &Stage::Coding { claim: None, plan: Default::default(), + retries: 0, }, ); assert_eq!( @@ -333,6 +336,7 @@ mod tests { &Stage::Coding { claim: None, plan: Default::default(), + retries: 0, }, &Stage::Qa, ); @@ -359,6 +363,7 @@ mod tests { &Stage::Coding { claim: None, plan: Default::default(), + retries: 0, }, &Stage::Qa, ); @@ -373,6 +378,7 @@ mod tests { &Stage::Coding { claim: None, plan: Default::default(), + retries: 0, }, &Stage::Qa, ); diff --git a/server/src/service/work_item/assign.rs b/server/src/service/work_item/assign.rs index 96782ff9..03e8fe67 100644 --- a/server/src/service/work_item/assign.rs +++ b/server/src/service/work_item/assign.rs @@ -66,7 +66,6 @@ mod tests { None, None, None, - None, ); let tmp = tempfile::tempdir().unwrap(); @@ -105,7 +104,6 @@ mod tests { None, None, None, - None, ); } diff --git a/server/src/service/work_item/delete.rs b/server/src/service/work_item/delete.rs index 4f83addd..02a40b60 100644 --- a/server/src/service/work_item/delete.rs +++ b/server/src/service/work_item/delete.rs @@ -203,7 +203,6 @@ mod tests { None, None, None, - None, ); // Seed content store. diff --git a/server/src/worktree/cleanup.rs b/server/src/worktree/cleanup.rs index e34612eb..d36c2225 100644 --- a/server/src/worktree/cleanup.rs +++ b/server/src/worktree/cleanup.rs @@ -319,6 +319,7 @@ mod tests { Some(Stage::Coding { claim: None, plan: Default::default(), + retries: 0, }) } else { None diff --git a/server/src/worktree/sweep.rs b/server/src/worktree/sweep.rs index 7ea2d9ba..a2be93f2 100644 --- a/server/src/worktree/sweep.rs +++ b/server/src/worktree/sweep.rs @@ -180,6 +180,7 @@ mod tests { assert!(!worktree_should_be_swept(Some(&Stage::Coding { claim: None, plan: Default::default(), + retries: 0, }))); } @@ -194,6 +195,7 @@ mod tests { feature_branch: crate::pipeline_state::BranchName("feature/x".to_string()), commits_ahead: NonZeroU32::new(1).unwrap(), claim: None, + retries: 0, }; assert!(!worktree_should_be_swept(Some(&stage))); } @@ -306,6 +308,7 @@ mod tests { Some(Stage::Coding { claim: None, plan: Default::default(), + retries: 0, }) } else { None @@ -376,6 +379,7 @@ mod tests { ), commits_ahead: NonZeroU32::new(1).unwrap(), claim: None, + retries: 0, }) } else { None