huskies: merge 1009

This commit is contained in:
dave
2026-05-13 22:50:13 +00:00
parent a5cd3a2152
commit 4e007bb770
56 changed files with 453 additions and 384 deletions
+30 -12
View File
@@ -78,39 +78,52 @@ mod tests {
#[test]
#[allow(clippy::string_slice)] // stale_holder is a hex/ASCII string literal; [..12] always valid
fn stale_claim_displaced_and_logged() {
use crate::crdt_state::{
init_for_test, our_node_id, read_item, write_claim, write_item_str,
};
use crate::crdt_state::{init_for_test, our_node_id, read_item, write_claim, write_item};
use crate::pipeline_state::{AgentClaim, AgentName, Stage};
use chrono::TimeZone;
init_for_test();
let story_id = "718_test_stale_displacement";
let stale_holder = "staledeadbeef0000000000000000000000000000";
// Place claimed_at well beyond the TTL so the claim is unambiguously stale.
let stale_time = chrono::Utc::now().timestamp() as f64 - CLAIM_TIMEOUT_SECS - 300.0;
let stale_time = chrono::Utc::now().timestamp() as u64 - CLAIM_TIMEOUT_SECS as u64 - 300;
// Seed the story with a stale claim from a foreign node.
write_item_str(
write_item(
story_id,
"2_current",
&Stage::Coding {
claim: Some(AgentClaim {
agent: AgentName(stale_holder.to_string()),
claimed_at: chrono::Utc
.timestamp_opt(stale_time as i64, 0)
.single()
.unwrap(),
}),
},
Some("Stale Claim Displacement Test"),
None,
None,
None,
Some(stale_holder),
Some(stale_time),
None,
);
// Confirm the stale claim is in place.
let before = read_item(story_id).expect("item should exist");
let before_claim = match before.stage() {
Stage::Coding { claim } => claim.as_ref(),
Stage::Merge { claim, .. } => claim.as_ref(),
_ => None,
};
assert_eq!(
before.claim().map(|c| c.node.as_str()),
before_claim.map(|c| c.agent.0.as_str()),
Some(stale_holder),
"pre-condition: item should be claimed by the stale holder"
);
let age = chrono::Utc::now().timestamp() as f64
- before.claim().map(|c| c.at as f64).unwrap_or(0.0);
- before_claim
.map(|c| c.claimed_at.timestamp() as f64)
.unwrap_or(0.0);
assert!(
age >= CLAIM_TIMEOUT_SECS,
"pre-condition: claim age ({age}s) must exceed TTL ({CLAIM_TIMEOUT_SECS}s)"
@@ -136,13 +149,18 @@ mod tests {
// Verify the new claim belongs to this node, not the stale holder.
let our_id = our_node_id().expect("node id should be available after init_for_test");
let after = read_item(story_id).expect("item should still exist");
let after_claim = match after.stage() {
Stage::Coding { claim } => claim.as_ref(),
Stage::Merge { claim, .. } => claim.as_ref(),
_ => None,
};
assert_eq!(
after.claim().map(|c| c.node.as_str()),
after_claim.map(|c| c.agent.0.as_str()),
Some(our_id.as_str()),
"new claim should have displaced the stale holder"
);
assert_ne!(
after.claim().map(|c| c.node.as_str()),
after_claim.map(|c| c.agent.0.as_str()),
Some(stale_holder),
"stale holder must no longer own the claim"
);
+22 -10
View File
@@ -44,7 +44,7 @@ pub(super) async fn scan_and_claim(
// Only claim stories in execution stages (Coding, Qa, Merge).
if !matches!(
item.stage(),
crate::pipeline_state::Stage::Coding
crate::pipeline_state::Stage::Coding { .. }
| crate::pipeline_state::Stage::Qa
| crate::pipeline_state::Stage::Merge { .. }
) {
@@ -65,19 +65,25 @@ pub(super) async fn scan_and_claim(
continue;
}
let item_claim = match item.stage() {
crate::pipeline_state::Stage::Coding { claim } => claim.as_ref(),
crate::pipeline_state::Stage::Merge { claim, .. } => claim.as_ref(),
_ => None,
};
// If already claimed by us, skip.
if item.claim().is_some_and(|c| c.node == our_node) {
if item_claim.is_some_and(|c| c.agent.0 == our_node) {
continue;
}
// If claimed by another node, respect the claim while it is fresh.
// Once the TTL expires the claim is considered stale regardless of
// whether the holder appears alive — displacement is purely TTL-driven.
if let Some(claim) = item.claim()
&& claim.node != our_node
if let Some(claim) = item_claim
&& claim.agent.0 != our_node
{
let now = chrono::Utc::now().timestamp() as u64;
let age = now.saturating_sub(claim.at) as f64;
let age = now.saturating_sub(claim.claimed_at.timestamp() as u64) as f64;
if age < CLAIM_TIMEOUT_SECS {
// Claim is still fresh — respect it.
continue;
@@ -87,7 +93,7 @@ pub(super) async fn scan_and_claim(
"[agent-mode] Displacing stale claim on '{}' held by {:.12}… \
(age {}s > TTL {}s)",
item.story_id(),
claim.node,
claim.agent.0,
age as u64,
CLAIM_TIMEOUT_SECS as u64,
);
@@ -179,7 +185,7 @@ pub(super) fn reclaim_timed_out_work(_project_root: &Path) {
for item in &items {
if !matches!(
item.stage(),
crate::pipeline_state::Stage::Coding
crate::pipeline_state::Stage::Coding { .. }
| crate::pipeline_state::Stage::Qa
| crate::pipeline_state::Stage::Merge { .. }
) {
@@ -189,13 +195,19 @@ pub(super) fn reclaim_timed_out_work(_project_root: &Path) {
// Release the claim if the TTL has expired — regardless of whether the
// holder is still alive. A node actively working should refresh its
// claim before the TTL window closes.
if let Some(claim) = item.claim() {
let age = now as u64 - claim.at.min(now as u64);
let reclaim_claim = match item.stage() {
crate::pipeline_state::Stage::Coding { claim } => claim.as_ref(),
crate::pipeline_state::Stage::Merge { claim, .. } => claim.as_ref(),
_ => None,
};
if let Some(claim) = reclaim_claim {
let claim_ts = claim.claimed_at.timestamp() as u64;
let age = now as u64 - claim_ts.min(now as u64);
if age as f64 >= CLAIM_TIMEOUT_SECS {
slog!(
"[agent-mode] Releasing stale claim on '{}' held by {:.12}… (age {}s)",
item.story_id(),
claim.node,
claim.agent.0,
age,
);
crdt_state::release_claim(item.story_id());
+8 -8
View File
@@ -122,7 +122,7 @@ pub fn move_story_to_done(story_id: &str) -> Result<(), String> {
merge_commit: GitSha("accepted".to_string()),
},
Stage::MergeFailure { .. } => PipelineEvent::Accepted,
Stage::Coding | Stage::Qa | Stage::Backlog => PipelineEvent::Close,
Stage::Coding { .. } | Stage::Qa | Stage::Backlog => PipelineEvent::Close,
_ => {
return Err(format!(
"Work item '{story_id}' is in {} — cannot move to done.",
@@ -160,7 +160,7 @@ pub fn move_story_to_merge(story_id: &str) -> Result<(), String> {
let commits = NonZeroU32::new(1).expect("1 is non-zero");
let event = match &item.stage {
Stage::Coding => PipelineEvent::QaSkipped {
Stage::Coding { .. } => PipelineEvent::QaSkipped {
feature_branch: branch,
commits_ahead: commits,
},
@@ -383,8 +383,8 @@ fn map_stage_move_to_event(
match (from, target) {
(Stage::Upcoming, "backlog") => Ok(PipelineEvent::Triage),
(Stage::Backlog, "current") => Ok(PipelineEvent::DepsMet),
(Stage::Coding, "qa") => Ok(PipelineEvent::GatesStarted),
(Stage::Coding, "merge") => Ok(PipelineEvent::QaSkipped {
(Stage::Coding { .. }, "qa") => Ok(PipelineEvent::GatesStarted),
(Stage::Coding { .. }, "merge") => Ok(PipelineEvent::QaSkipped {
feature_branch: branch(),
commits_ahead: nz1(),
}),
@@ -392,7 +392,7 @@ fn map_stage_move_to_event(
feature_branch: branch(),
commits_ahead: nz1(),
}),
(Stage::Coding, "backlog")
(Stage::Coding { .. }, "backlog")
| (Stage::Qa, "backlog")
| (Stage::Merge { .. }, "backlog")
| (Stage::Blocked { .. }, "backlog") => Ok(PipelineEvent::Demote),
@@ -402,7 +402,7 @@ fn map_stage_move_to_event(
(Stage::Merge { .. }, "done") => Ok(PipelineEvent::MergeSucceeded {
merge_commit: GitSha("manual".to_string()),
}),
(Stage::Coding | Stage::Qa | Stage::Backlog, "done") => Ok(PipelineEvent::Close),
(Stage::Coding { .. } | Stage::Qa | Stage::Backlog, "done") => Ok(PipelineEvent::Close),
(Stage::Blocked { .. }, "current") => Ok(PipelineEvent::Unblock),
// Story 919: MergeFailure + Unblock goes to Merge (re-attempt); manual
// demotion to backlog uses Demote to park it without a retry.
@@ -530,7 +530,7 @@ fn stage_to_name(s: &Stage) -> &'static str {
match s {
Stage::Upcoming => "upcoming",
Stage::Backlog => "backlog",
Stage::Coding => "current",
Stage::Coding { .. } => "current",
Stage::Blocked { .. } => "blocked",
Stage::Qa => "qa",
Stage::Merge { .. } => "merge",
@@ -714,7 +714,7 @@ mod tests {
"should return to coding after unblock"
);
assert!(
matches!(item.stage, Stage::Coding),
matches!(item.stage, Stage::Coding { .. }),
"stage should be Stage::Coding after unblock"
);
}
@@ -38,6 +38,7 @@ impl AgentPool {
let merge_stage = Stage::Merge {
feature_branch: BranchName(String::new()),
commits_ahead: NonZeroU32::new(1).expect("1 is non-zero"),
claim: None,
};
let merge_items = scan_stage_items(&merge_stage);
for story_id in &merge_items {
@@ -160,6 +160,7 @@ mod tests {
before: crate::pipeline_state::Stage::Merge {
feature_branch: BranchName("feature/test".to_string()),
commits_ahead: NonZeroU32::new(1).unwrap(),
claim: None,
},
after: crate::pipeline_state::Stage::MergeFailure {
kind: kind.clone(),
@@ -27,7 +27,7 @@ impl AgentPool {
/// here as well.
pub(super) async fn assign_pipeline_stages(&self, project_root: &Path, config: &ProjectConfig) {
let stages: [(Stage, PipelineStage); 2] = [
(Stage::Coding, PipelineStage::Coder),
(Stage::Coding { claim: None }, PipelineStage::Coder),
(Stage::Qa, PipelineStage::Qa),
];
@@ -149,7 +149,7 @@ impl AgentPool {
stage.dir_name()
);
if matches!(stage, Stage::Coding) {
if matches!(stage, Stage::Coding { .. }) {
// Coder stage — determine qa mode to decide next step.
let qa_mode = {
let item_type = crate::agents::lifecycle::item_type_from_id(story_id);
+1 -1
View File
@@ -220,7 +220,7 @@ mod tests {
crate::db::ItemMeta::named("baz"),
);
let items = scan_stage_items(&Stage::Coding);
let items = scan_stage_items(&Stage::Coding { claim: None });
// The global CRDT may contain items from other tests, so check
// that our three items are present and appear in sorted order.
assert!(
@@ -97,8 +97,6 @@ mod tests {
None,
None,
None,
None,
None,
);
assert!(has_review_hold("890_spike_held"));
}
@@ -115,8 +113,6 @@ mod tests {
None,
None,
None,
None,
None,
);
assert!(!has_review_hold("890_spike_active_qa"));
}
@@ -193,8 +189,6 @@ mod tests {
None,
Some("[999]"),
None,
None,
None,
);
assert!(has_unmet_dependencies("10_story_blocked"));
}
@@ -210,8 +204,6 @@ mod tests {
None,
None,
None,
None,
None,
);
crate::crdt_state::write_item_str(
"10_story_ok",
@@ -221,8 +213,6 @@ mod tests {
None,
Some("[999]"),
None,
None,
None,
);
assert!(!has_unmet_dependencies("10_story_ok"));
}
@@ -238,8 +228,6 @@ mod tests {
None,
None,
None,
None,
None,
);
assert!(!has_unmet_dependencies("5_story_free"));
}
@@ -258,8 +246,6 @@ mod tests {
None,
None,
None,
None,
None,
);
crate::crdt_state::write_item_str(
"503_story_dependent",
@@ -269,8 +255,6 @@ mod tests {
None,
Some("[500]"),
None,
None,
None,
);
let archived_deps = check_archived_dependencies("503_story_dependent");
assert_eq!(archived_deps, vec![500]);
@@ -288,8 +272,6 @@ mod tests {
None,
None,
None,
None,
None,
);
crate::crdt_state::write_item_str(
"503_story_waiting",
@@ -299,8 +281,6 @@ mod tests {
None,
Some("[490]"),
None,
None,
None,
);
let archived_deps = check_archived_dependencies("503_story_waiting");
assert!(archived_deps.is_empty());
@@ -251,8 +251,6 @@ max_turns = 10
None,
None,
None,
None,
None,
);
// 12 turns in a single session exceeds the configured max of 10.
@@ -381,8 +379,6 @@ max_turns = 10
None,
None,
None,
None,
None,
);
// Prior session with 5 turns (under limit alone).
@@ -460,8 +456,6 @@ max_turns = 10
None,
None,
None,
None,
None,
);
// Session 1: exceeds limit → retry_count=1 in CRDT, NOT blocked.
@@ -294,8 +294,6 @@ stage = "coder"
None,
None,
None,
None,
None,
);
let pool = AgentPool::new_test(3011);
+1 -1
View File
@@ -35,7 +35,7 @@ pub(super) fn validate_agent_stage(
return Ok(());
};
let expected_stage = match story_stage {
Stage::Coding => PipelineStage::Coder,
Stage::Coding { .. } => PipelineStage::Coder,
Stage::Qa => PipelineStage::Qa,
Stage::Merge { .. } => PipelineStage::Mergemaster,
_ => PipelineStage::Other,
+2 -2
View File
@@ -30,7 +30,7 @@ pub(super) fn find_active_story_stage(
if let Ok(Some(item)) = crate::pipeline_state::read_typed(story_id)
&& matches!(
item.stage,
crate::pipeline_state::Stage::Coding
crate::pipeline_state::Stage::Coding { .. }
| crate::pipeline_state::Stage::Qa
| crate::pipeline_state::Stage::Merge { .. }
)
@@ -56,7 +56,7 @@ mod tests {
let tmp = tempfile::tempdir().unwrap();
assert!(matches!(
find_active_story_stage(tmp.path(), "10_story_test"),
Some(crate::pipeline_state::Stage::Coding)
Some(crate::pipeline_state::Stage::Coding { .. })
));
}
+1 -1
View File
@@ -22,7 +22,7 @@ pub(crate) fn spawn_worktree_create_subscriber(project_root: PathBuf, port: u16)
loop {
match rx.recv().await {
Ok(fired) => {
if matches!(fired.after, Stage::Coding) {
if matches!(fired.after, Stage::Coding { .. }) {
on_coding_transition(&project_root, port, &fired.story_id.0).await;
}
}
+10 -2
View File
@@ -90,7 +90,11 @@ mod tests {
fn backlog_shows_only_backlog_stage_items() {
let items = vec![
make_item("10_story_in_backlog", "In Backlog", Stage::Backlog),
make_item("20_story_in_progress", "In Progress", Stage::Coding),
make_item(
"20_story_in_progress",
"In Progress",
Stage::Coding { claim: None },
),
make_item("30_story_in_qa", "In QA", Stage::Qa),
];
let output = build_backlog_from_items(&items);
@@ -227,7 +231,11 @@ mod tests {
#[test]
fn backlog_shows_none_when_empty() {
let items = vec![make_item("1_story_done", "Done", Stage::Coding)];
let items = vec![make_item(
"1_story_done",
"Done",
Stage::Coding { claim: None },
)];
let output = build_backlog_from_items(&items);
assert!(
output.contains("*(none)*"),
+1 -1
View File
@@ -239,7 +239,7 @@ mod tests {
.expect("read_typed should succeed")
.expect("item should be present");
assert!(
matches!(item.stage, crate::pipeline_state::Stage::Coding),
matches!(item.stage, crate::pipeline_state::Stage::Coding { .. }),
"stage should be restored to Coding: {:?}",
item.stage
);
+1 -1
View File
@@ -16,7 +16,7 @@ use std::collections::{HashMap, HashSet};
pub(crate) fn display_section(s: &Stage) -> Option<&'static str> {
match s {
Stage::Upcoming | Stage::Backlog => Some("Backlog"),
Stage::Coding
Stage::Coding { .. }
| Stage::Blocked { .. }
| Stage::Archived {
reason: ArchiveReason::Blocked { .. },
+26 -13
View File
@@ -137,7 +137,7 @@ fn status_does_not_show_full_filename_stem() {
let items = vec![make_item(
"293_story_register_all_bot_commands",
"Register all bot commands",
Stage::Coding,
Stage::Coding { claim: None },
)];
let agents = AgentPool::new_test(3000);
@@ -163,7 +163,7 @@ fn status_shows_cost_when_token_usage_exists() {
let items = vec![make_item(
"293_story_register_all_bot_commands",
"Register all bot commands",
Stage::Coding,
Stage::Coding { claim: None },
)];
// Write token usage for this story.
@@ -199,7 +199,7 @@ fn status_no_cost_when_no_usage() {
let items = vec![make_item(
"293_story_register_all_bot_commands",
"Register all bot commands",
Stage::Coding,
Stage::Coding { claim: None },
)];
let agents = AgentPool::new_test(3000);
@@ -219,7 +219,7 @@ fn status_aggregates_multiple_records_per_story() {
let items = vec![make_item(
"293_story_register_all_bot_commands",
"Register all bot commands",
Stage::Coding,
Stage::Coding { claim: None },
)];
// Write two records for the same story — costs should be summed.
@@ -262,7 +262,7 @@ fn status_shows_waiting_on_for_story_with_unmet_deps() {
make_item_with_deps(
"10_story_waiting",
"Waiting Story",
Stage::Coding,
Stage::Coding { claim: None },
vec![999],
),
make_item("999_story_dep", "Dep Story", Stage::Backlog),
@@ -287,7 +287,7 @@ fn status_does_not_show_waiting_on_when_dep_is_done() {
make_item_with_deps(
"10_story_unblocked",
"Unblocked Story",
Stage::Coding,
Stage::Coding { claim: None },
vec![999],
),
make_item(
@@ -314,7 +314,11 @@ fn status_shows_no_waiting_info_when_no_deps() {
use tempfile::TempDir;
let tmp = TempDir::new().unwrap();
let items = vec![make_item("42_story_nodeps", "No Deps Story", Stage::Coding)];
let items = vec![make_item(
"42_story_nodeps",
"No Deps Story",
Stage::Coding { claim: None },
)];
let agents = AgentPool::new_test(3000);
let output = build_status_from_items(tmp.path(), &agents, &items);
@@ -377,7 +381,7 @@ fn stage_is_blocked_returns_true_for_archived_blocked() {
#[test]
fn stage_is_blocked_returns_false_for_coding() {
assert!(!matches!(
Stage::Coding,
Stage::Coding { claim: None },
Stage::Blocked { .. }
| Stage::MergeFailure { .. }
| Stage::MergeFailureFinal { .. }
@@ -413,7 +417,11 @@ fn status_shows_idle_dot_for_unassigned_story() {
use tempfile::TempDir;
let tmp = TempDir::new().unwrap();
let items = vec![make_item("42_story_idle", "Idle Story", Stage::Coding)];
let items = vec![make_item(
"42_story_idle",
"Idle Story",
Stage::Coding { claim: None },
)];
let agents = AgentPool::new_test(3000);
let output = build_status_from_items(tmp.path(), &agents, &items);
@@ -503,6 +511,7 @@ fn merge_stage() -> Stage {
Stage::Merge {
feature_branch: BranchName("feature/test".to_string()),
commits_ahead: std::num::NonZeroU32::new(1).unwrap(),
claim: None,
}
}
@@ -779,7 +788,11 @@ fn in_progress_count_includes_blocked_items() {
let tmp = TempDir::new().unwrap();
let items = vec![
make_item("10_story_coding", "Coding Story", Stage::Coding),
make_item(
"10_story_coding",
"Coding Story",
Stage::Coding { claim: None },
),
make_item(
"11_story_blocked",
"Blocked Story",
@@ -810,7 +823,7 @@ fn frozen_coding_item_appears_in_in_progress_section() {
"60_story_frozen",
"Frozen Coding Story",
Stage::Frozen {
resume_to: Box::new(Stage::Coding),
resume_to: Box::new(Stage::Coding { claim: None }),
},
)];
@@ -868,7 +881,7 @@ fn frozen_item_shows_snowflake_indicator() {
"80_story_frozen_flake",
"Frozen Flake Story",
Stage::Frozen {
resume_to: Box::new(Stage::Coding),
resume_to: Box::new(Stage::Coding { claim: None }),
},
)];
@@ -898,7 +911,7 @@ fn frozen_and_blocked_use_distinct_indicators() {
"91_story_frozen_ind",
"Frozen Story",
Stage::Frozen {
resume_to: Box::new(Stage::Coding),
resume_to: Box::new(Stage::Coding { claim: None }),
},
),
];
-6
View File
@@ -221,8 +221,6 @@ mod tests {
Some(5),
None,
None,
None,
None,
);
let output = unblock_cmd_with_root(tmp.path(), "9903").unwrap();
@@ -299,8 +297,6 @@ mod tests {
Some(5),
None,
None,
None,
None,
);
let output = unblock_cmd_with_root(tmp.path(), "9904").unwrap();
@@ -358,8 +354,6 @@ mod tests {
Some(3),
None,
None,
None,
None,
);
let output = unblock_cmd_with_root(tmp.path(), "9901").unwrap();
@@ -324,8 +324,6 @@ mod tests {
None,
None,
None,
None,
None,
);
let agents = std::sync::Arc::new(AgentPool::new_test(3000));
@@ -379,8 +377,6 @@ mod tests {
None,
None,
None,
None,
None,
);
let agents = std::sync::Arc::new(AgentPool::new_test(3000));
@@ -429,8 +425,6 @@ mod tests {
None,
None,
None,
None,
None,
);
let agents = std::sync::Arc::new(AgentPool::new_test(3000));
+1 -3
View File
@@ -108,7 +108,7 @@ fn stage_display_label(stage: &crate::pipeline_state::Stage) -> &'static str {
match stage {
Stage::Upcoming => "upcoming",
Stage::Backlog => "backlog",
Stage::Coding => "in-progress",
Stage::Coding { .. } => "in-progress",
Stage::Blocked { .. } => "blocked",
Stage::Qa => "QA",
Stage::Merge { .. } => "merge",
@@ -254,8 +254,6 @@ mod tests {
None,
None,
None,
None,
None,
);
// Seed in content store so find_story_by_number can resolve it.
-16
View File
@@ -239,8 +239,6 @@ fn snapshot_generation_includes_manifest() {
None,
None,
None,
None,
None,
);
crate::crdt_state::write_item_str(
"636_test_b",
@@ -250,8 +248,6 @@ fn snapshot_generation_includes_manifest() {
None,
None,
None,
None,
None,
);
let snapshot = generate_snapshot();
@@ -282,8 +278,6 @@ fn attribution_query_by_story_id() {
None,
None,
None,
None,
None,
);
let snapshot = generate_snapshot().unwrap();
@@ -319,8 +313,6 @@ fn compaction_reduces_ops() {
None,
None,
None,
None,
None,
);
}
@@ -357,8 +349,6 @@ fn latest_snapshot_available_after_compaction() {
None,
None,
None,
None,
None,
);
let snapshot = generate_snapshot().unwrap();
@@ -629,8 +619,6 @@ fn attribution_preserved_after_compaction() {
None,
None,
None,
None,
None,
);
crate::crdt_state::write_item_str(
"636_archived_story",
@@ -640,8 +628,6 @@ fn attribution_preserved_after_compaction() {
None,
None,
None,
None,
None,
);
crate::crdt_state::write_item_str(
"636_archived_story",
@@ -651,8 +637,6 @@ fn attribution_preserved_after_compaction() {
None,
None,
None,
None,
None,
);
// Generate snapshot.
+5 -5
View File
@@ -47,16 +47,16 @@ pub use read::{
};
pub use state::{init, subscribe};
pub use types::{
ActiveAgentCrdt, ActiveAgentView, AgentThrottleCrdt, AgentThrottleView, Claim, CrdtEvent,
EpicId, GatewayConfigCrdt, GatewayProjectCrdt, GatewayProjectView, MergeJobCrdt, MergeJobView,
ActiveAgentCrdt, ActiveAgentView, AgentThrottleCrdt, AgentThrottleView, CrdtEvent, EpicId,
GatewayConfigCrdt, GatewayProjectCrdt, GatewayProjectView, MergeJobCrdt, MergeJobView,
NodePresenceCrdt, NodePresenceView, PipelineDoc, PipelineItemCrdt, PipelineItemView,
TestJobCrdt, TestJobView, TokenUsageCrdt, TokenUsageView, WorkItem,
};
pub use write::{
bump_retry_count, migrate_legacy_stage_strings, migrate_merge_job, migrate_names_from_slugs,
migrate_story_ids_to_numeric, name_from_story_id, set_agent, set_depends_on, set_epic,
set_item_type, set_name, set_qa_mode, set_resume_to, set_resume_to_raw, set_retry_count,
write_item,
migrate_node_claims_to_agent_claims, migrate_story_ids_to_numeric, name_from_story_id,
set_agent, set_depends_on, set_epic, set_item_type, set_name, set_qa_mode, set_resume_to,
set_resume_to_raw, set_retry_count, write_item,
};
#[cfg(test)]
-2
View File
@@ -550,8 +550,6 @@ mod tests {
None,
None,
None,
None,
None,
);
assert!(
read_item(story_id).is_none(),
+12 -7
View File
@@ -51,7 +51,7 @@ pub fn sign_versioned_challenge(nonce: &str) -> Option<(String, String)> {
/// Write a claim on a pipeline item via CRDT.
///
/// Sets `claimed_by` to this node's ID and `claimed_at` to the current time.
/// Sets `claim_agent` to this node's ID and `claim_ts` to the current time.
/// The LWW register ensures deterministic conflict resolution — if two nodes
/// claim the same item simultaneously, both will converge to the same winner
/// after CRDT sync.
@@ -76,14 +76,14 @@ pub fn write_claim(story_id: &str) -> bool {
};
apply_and_persist(&mut state, |s| {
s.crdt.doc.items[idx].claimed_by.set(node_id.clone())
s.crdt.doc.items[idx].claim_agent.set(node_id.clone())
});
apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].claimed_at.set(now));
apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].claim_ts.set(now));
true
}
/// Release a claim on a pipeline item (clear claimed_by and claimed_at).
/// Release a claim on a pipeline item (clear claim_agent and claim_ts).
pub fn release_claim(story_id: &str) {
let Some(state_mutex) = get_crdt() else {
return;
@@ -96,9 +96,9 @@ pub fn release_claim(story_id: &str) {
};
apply_and_persist(&mut state, |s| {
s.crdt.doc.items[idx].claimed_by.set(String::new())
s.crdt.doc.items[idx].claim_agent.set(String::new())
});
apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].claimed_at.set(0.0));
apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].claim_ts.set(0.0));
}
/// Check if this node currently holds the claim on a pipeline item.
@@ -109,7 +109,12 @@ pub fn is_claimed_by_us(story_id: &str) -> bool {
let Some(item) = read_item(story_id) else {
return false;
};
item.claim().is_some_and(|c| c.node == node_id)
let claim = match item.stage() {
crate::pipeline_state::Stage::Coding { claim } => claim.as_ref(),
crate::pipeline_state::Stage::Merge { claim, .. } => claim.as_ref(),
_ => None,
};
claim.is_some_and(|c| c.agent.0 == node_id)
}
/// Write or update a node presence entry in the CRDT.
+47 -25
View File
@@ -22,8 +22,10 @@ pub struct CrdtItemDump {
pub agent: Option<String>,
pub retry_count: Option<i64>,
pub depends_on: Option<Vec<u32>>,
pub claimed_by: Option<String>,
pub claimed_at: Option<f64>,
/// Agent name holding the claim, or `None` when unclaimed.
pub claim_agent: Option<String>,
/// Unix timestamp (seconds) when the claim was written.
pub claim_ts: Option<f64>,
/// Hex-encoded OpId of the list insert op — cross-reference with `crdt_ops`.
pub content_index: String,
pub is_deleted: bool,
@@ -139,11 +141,11 @@ pub fn dump_crdt_state(story_id_filter: Option<&str>) -> CrdtStateDump {
_ => None,
};
let claimed_by = match item_crdt.claimed_by.view() {
let claim_agent = match item_crdt.claim_agent.view() {
JsonValue::String(s) if !s.is_empty() => Some(s),
_ => None,
};
let claimed_at = match item_crdt.claimed_at.view() {
let claim_ts = match item_crdt.claim_ts.view() {
JsonValue::Number(n) if n > 0.0 => Some(n),
_ => None,
};
@@ -157,8 +159,8 @@ pub fn dump_crdt_state(story_id_filter: Option<&str>) -> CrdtStateDump {
agent,
retry_count,
depends_on,
claimed_by,
claimed_at,
claim_agent,
claim_ts,
content_index,
is_deleted: op.is_deleted,
});
@@ -326,7 +328,7 @@ pub fn evict_item(story_id: &str) -> Result<(), String> {
/// string, or with no name set, are filtered out (`None`) — a nameless item
/// is treated as malformed and never surfaces to callers.
pub(super) fn extract_item_view(item: &PipelineItemCrdt) -> Option<PipelineItemView> {
use super::types::{Claim, EpicId};
use super::types::EpicId;
use crate::io::story_metadata::{ItemType, QaMode};
let story_id = match item.story_id.view() {
@@ -357,18 +359,17 @@ pub(super) fn extract_item_view(item: &PipelineItemCrdt) -> Option<PipelineItemV
_ => Vec::new(),
};
let claimed_by = match item.claimed_by.view() {
// `claim_agent`/`claim_ts` are read only to embed in Stage::Coding /
// Stage::Merge via `project_stage_for_view`; they are not stored on
// `WorkItem` directly (story 1009: readers project from the Stage variant).
let claim_agent = match item.claim_agent.view() {
JsonValue::String(s) if !s.is_empty() => Some(s),
_ => None,
};
let claimed_at_secs = match item.claimed_at.view() {
let claim_ts_secs = match item.claim_ts.view() {
JsonValue::Number(n) if n > 0.0 => Some(n as u64),
_ => None,
};
let claim = match (claimed_by, claimed_at_secs) {
(Some(node), Some(at)) => Some(Claim { node, at }),
_ => None,
};
// `merged_at` is read only to project into `Stage::Done`; it is not
// stored on `WorkItem` (callers access it via `Stage::Done { merged_at }`).
@@ -397,8 +398,14 @@ pub(super) fn extract_item_view(item: &PipelineItemCrdt) -> Option<PipelineItemV
_ => None,
};
let stage =
project_stage_for_view(&stage_str, &story_id, merged_at_float, resume_to.as_deref())?;
let stage = project_stage_for_view(
&stage_str,
&story_id,
merged_at_float,
resume_to.as_deref(),
claim_agent.as_deref(),
claim_ts_secs,
)?;
Some(PipelineItemView {
story_id,
@@ -407,7 +414,6 @@ pub(super) fn extract_item_view(item: &PipelineItemCrdt) -> Option<PipelineItemV
agent,
retry_count,
depends_on,
claim,
qa_mode,
item_type,
epic,
@@ -432,9 +438,11 @@ fn project_stage_for_view(
story_id: &str,
merged_at: Option<f64>,
resume_to: Option<&str>,
claim_agent: Option<&str>,
claim_ts_secs: Option<u64>,
) -> Option<crate::pipeline_state::Stage> {
use crate::pipeline_state::{ArchiveReason, BranchName, GitSha, Stage};
use chrono::{DateTime, Utc};
use crate::pipeline_state::{AgentClaim, AgentName, ArchiveReason, BranchName, GitSha, Stage};
use chrono::{DateTime, TimeZone, Utc};
use std::num::NonZeroU32;
// Normalise legacy directory-style strings to their clean wire form so
@@ -458,13 +466,30 @@ fn project_stage_for_view(
// Story 945: resume target for `Frozen` / `ReviewHold` variants is stored
// in the sibling `resume_to` register. Fall back to `Coding` when the
// register is empty or holds an unrecognised value.
let resume_target =
|| -> Box<Stage> { Box::new(resume_to.and_then(Stage::from_dir).unwrap_or(Stage::Coding)) };
let resume_target = || -> Box<Stage> {
Box::new(
resume_to
.and_then(Stage::from_dir)
.unwrap_or(Stage::Coding { claim: None }),
)
};
// Story 1009: reconstruct AgentClaim from `claim_agent`/`claim_ts` registers.
let claim = match (claim_agent, claim_ts_secs) {
(Some(agent_str), Some(ts)) => Some(AgentClaim {
agent: AgentName(agent_str.to_string()),
claimed_at: Utc
.timestamp_opt(ts as i64, 0)
.single()
.unwrap_or(DateTime::<Utc>::UNIX_EPOCH),
}),
_ => None,
};
match clean {
"upcoming" => Some(Stage::Upcoming),
"backlog" => Some(Stage::Backlog),
"coding" => Some(Stage::Coding),
"coding" => Some(Stage::Coding { claim }),
"qa" => Some(Stage::Qa),
"blocked" => Some(Stage::Blocked {
reason: String::new(),
@@ -472,6 +497,7 @@ fn project_stage_for_view(
"merge" => Some(Stage::Merge {
feature_branch: BranchName(format!("feature/story-{story_id}")),
commits_ahead: NonZeroU32::new(1).expect("1 is non-zero"),
claim,
}),
"merge_failure" => {
// Story 986: read the typed kind directly from ContentKey::MergeFailureKind
@@ -709,8 +735,6 @@ mod tests {
None,
None,
None,
None,
None,
);
// The story is live on this node.
@@ -779,8 +803,6 @@ mod tests {
None,
None,
None,
None,
None,
);
assert!(
read_item(story_id).is_none(),
+4 -5
View File
@@ -117,8 +117,6 @@ async fn subscribe_receives_stage_transition_events() {
None,
None,
None,
None,
None,
);
let evt: CrdtEvent = rx.try_recv().expect("expected CrdtEvent on insert");
@@ -138,8 +136,6 @@ async fn subscribe_receives_stage_transition_events() {
None,
None,
None,
None,
None,
);
let evt: CrdtEvent = rx.try_recv().expect("expected CrdtEvent on stage change");
@@ -148,7 +144,10 @@ async fn subscribe_receives_stage_transition_events() {
evt.from_stage,
Some(crate::pipeline_state::Stage::Backlog)
));
assert!(matches!(evt.to_stage, crate::pipeline_state::Stage::Coding));
assert!(matches!(
evt.to_stage,
crate::pipeline_state::Stage::Coding { .. }
));
}
#[tokio::test]
+17 -37
View File
@@ -65,14 +65,13 @@ pub struct PipelineItemCrdt {
pub agent: LwwRegisterCrdt<String>,
pub retry_count: LwwRegisterCrdt<f64>,
pub depends_on: LwwRegisterCrdt<String>,
/// Node ID (hex-encoded Ed25519 pubkey) of the node that claimed this item.
/// Used for distributed work claiming — the LWW register resolves conflicts
/// deterministically so all nodes converge on the same claimer.
pub claimed_by: LwwRegisterCrdt<String>,
/// Name of the agent (e.g. `"coder-1"`) that has claimed this item.
/// Empty string means the item is unclaimed. Replaces the legacy
/// `claimed_by` node-hex register (story 1009).
pub claim_agent: LwwRegisterCrdt<String>,
/// Unix timestamp (seconds) when the claim was written.
/// Used for timeout-based reclaim: if a node crashes, other nodes can
/// reclaim the item after the timeout expires.
pub claimed_at: LwwRegisterCrdt<f64>,
/// Zero means no active claim. Previously named `claimed_at`.
pub claim_ts: LwwRegisterCrdt<f64>,
/// Unix timestamp (seconds) when the item was merged to master.
/// Written once when the item transitions to `5_done`. Used by the
/// sweep loop to determine when to promote to `6_archived`.
@@ -121,18 +120,6 @@ pub struct NodePresenceCrdt {
// ── Read-side view types ─────────────────────────────────────────────
/// Active claim on a pipeline item — node that owns it and when the claim was written.
///
/// Both fields must be present for a claim to be valid; a partial claim (node
/// but no timestamp, or vice versa) is treated as absent by `extract_item_view`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Claim {
/// Hex-encoded Ed25519 public key of the node that holds the claim.
pub node: String,
/// Unix timestamp (seconds, integer) when the claim was written.
pub at: u64,
}
/// Numeric identifier for an epic work item.
///
/// The numeric prefix of the epic's story_id (e.g. `EpicId(9990)` for the
@@ -203,9 +190,6 @@ pub struct WorkItem {
pub(super) retry_count: u32,
/// Dependency story numbers — empty `Vec` when the register is unset.
pub(super) depends_on: Vec<u32>,
/// Active claim (node + timestamp). `None` when the item is unclaimed or
/// when only one of the two companion registers is set.
pub(super) claim: Option<Claim>,
/// QA mode override. `None` means "use the project default".
pub(super) qa_mode: Option<crate::io::story_metadata::QaMode>,
/// Item type. `None` means "infer from the story_id slug prefix".
@@ -248,11 +232,6 @@ impl WorkItem {
&self.depends_on
}
/// Active claim on this item, or `None` when unclaimed.
pub fn claim(&self) -> Option<&Claim> {
self.claim.as_ref()
}
/// QA mode override, or `None` when the register is unset (use project default).
pub fn qa_mode(&self) -> Option<crate::io::story_metadata::QaMode> {
self.qa_mode
@@ -281,7 +260,6 @@ impl WorkItem {
agent: Option<crate::config::AgentName>,
retry_count: u32,
depends_on: Vec<u32>,
claim: Option<Claim>,
qa_mode: Option<crate::io::story_metadata::QaMode>,
item_type: Option<crate::io::story_metadata::ItemType>,
epic: Option<EpicId>,
@@ -293,7 +271,6 @@ impl WorkItem {
agent,
retry_count,
depends_on,
claim,
qa_mode,
item_type,
epic,
@@ -480,8 +457,8 @@ mod tests {
"retry_count": 0.0,
"blocked": false,
"depends_on": "",
"claimed_by": "",
"claimed_at": 0.0,
"claim_agent": "",
"claim_ts": 0.0,
})
.into();
@@ -515,8 +492,8 @@ mod tests {
"retry_count": 0.0,
"blocked": false,
"depends_on": "",
"claimed_by": "",
"claimed_at": 0.0,
"claim_agent": "",
"claim_ts": 0.0,
})
.into();
@@ -541,7 +518,7 @@ mod tests {
let evt = CrdtEvent {
story_id: "42_story_foo".to_string(),
from_stage: Some(crate::pipeline_state::Stage::Backlog),
to_stage: crate::pipeline_state::Stage::Coding,
to_stage: crate::pipeline_state::Stage::Coding { claim: None },
name: "Foo Feature".to_string(),
};
assert_eq!(evt.story_id, "42_story_foo");
@@ -549,7 +526,10 @@ mod tests {
evt.from_stage,
Some(crate::pipeline_state::Stage::Backlog)
));
assert!(matches!(evt.to_stage, crate::pipeline_state::Stage::Coding));
assert!(matches!(
evt.to_stage,
crate::pipeline_state::Stage::Coding { .. }
));
assert_eq!(evt.name, "Foo Feature");
}
@@ -698,7 +678,7 @@ mod tests {
let evt = CrdtEvent {
story_id: "70_story_broadcast".to_string(),
from_stage: Some(Stage::Backlog),
to_stage: Stage::Coding,
to_stage: Stage::Coding { claim: None },
name: "Broadcast Test".to_string(),
};
tx.send(evt).unwrap();
@@ -706,7 +686,7 @@ mod tests {
let received = rx.try_recv().unwrap();
assert_eq!(received.story_id, "70_story_broadcast");
assert!(matches!(received.from_stage, Some(Stage::Backlog)));
assert!(matches!(received.to_stage, Stage::Coding));
assert!(matches!(received.to_stage, Stage::Coding { .. }));
assert_eq!(received.name, "Broadcast Test");
}
}
+30 -21
View File
@@ -11,7 +11,7 @@ use serde_json::json;
use super::super::state::{apply_and_persist, emit_event, get_crdt, rebuild_index};
use super::super::types::CrdtEvent;
use crate::io::story_metadata::QaMode;
use crate::pipeline_state::{Stage, stage_dir_name};
use crate::pipeline_state::{AgentClaim, Stage, stage_dir_name};
/// Set the typed `depends_on` CRDT register for a pipeline item.
///
@@ -221,7 +221,6 @@ pub fn set_qa_mode(story_id: &str, mode: Option<QaMode>) -> bool {
///
/// `stage` is the typed pipeline state; it is serialised to the canonical
/// clean wire form (story 934) via [`stage_dir_name`] at the CRDT boundary.
#[allow(clippy::too_many_arguments)]
pub fn write_item(
story_id: &str,
stage: &Stage,
@@ -229,11 +228,14 @@ pub fn write_item(
agent: Option<&str>,
retry_count: Option<i64>,
depends_on: Option<&str>,
claimed_by: Option<&str>,
claimed_at: Option<f64>,
merged_at: Option<f64>,
) {
let stage_str = stage_dir_name(stage);
let claim: Option<&AgentClaim> = match stage {
Stage::Coding { claim } => claim.as_ref(),
Stage::Merge { claim, .. } => claim.as_ref(),
_ => None,
};
let Some(state_mutex) = get_crdt() else {
return;
};
@@ -291,14 +293,19 @@ pub fn write_item(
s.crdt.doc.items[idx].depends_on.set(d.to_string())
});
}
if let Some(cb) = claimed_by {
apply_and_persist(&mut state, |s| {
s.crdt.doc.items[idx].claimed_by.set(cb.to_string())
});
}
if let Some(ca) = claimed_at {
apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].claimed_at.set(ca));
}
let (claim_agent_str, claim_ts_val) = match claim {
Some(c) => (
c.agent.0.as_str().to_string(),
c.claimed_at.timestamp() as f64,
),
None => (String::new(), 0.0),
};
apply_and_persist(&mut state, |s| {
s.crdt.doc.items[idx].claim_agent.set(claim_agent_str)
});
apply_and_persist(&mut state, |s| {
s.crdt.doc.items[idx].claim_ts.set(claim_ts_val)
});
if let Some(ma) = merged_at {
apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].merged_at.set(ma));
}
@@ -322,6 +329,13 @@ pub fn write_item(
}
} else {
// Insert new item.
let (insert_claim_agent, insert_claim_ts) = match claim {
Some(c) => (
c.agent.0.as_str().to_string(),
c.claimed_at.timestamp() as f64,
),
None => (String::new(), 0.0),
};
let item_json: JsonValue = json!({
"story_id": story_id,
"stage": stage_str,
@@ -329,8 +343,8 @@ pub fn write_item(
"agent": agent.unwrap_or(""),
"retry_count": retry_count.unwrap_or(0) as f64,
"depends_on": depends_on.unwrap_or(""),
"claimed_by": claimed_by.unwrap_or(""),
"claimed_at": claimed_at.unwrap_or(0.0),
"claim_agent": insert_claim_agent,
"claim_ts": insert_claim_ts,
"merged_at": merged_at.unwrap_or(0.0),
"qa_mode": "",
"item_type": "",
@@ -357,8 +371,8 @@ pub fn write_item(
item.agent.advance_seq(floor);
item.retry_count.advance_seq(floor);
item.depends_on.advance_seq(floor);
item.claimed_by.advance_seq(floor);
item.claimed_at.advance_seq(floor);
item.claim_agent.advance_seq(floor);
item.claim_ts.advance_seq(floor);
item.merged_at.advance_seq(floor);
item.qa_mode.advance_seq(floor);
item.item_type.advance_seq(floor);
@@ -384,7 +398,6 @@ pub fn write_item(
/// Stages are normalised through [`Stage::from_dir`]: unknown strings cause
/// the write to be skipped (with a log line).
#[cfg(test)]
#[allow(clippy::too_many_arguments)]
pub fn write_item_str(
story_id: &str,
stage: &str,
@@ -392,8 +405,6 @@ pub fn write_item_str(
agent: Option<&str>,
retry_count: Option<i64>,
depends_on: Option<&str>,
claimed_by: Option<&str>,
claimed_at: Option<f64>,
merged_at: Option<f64>,
) {
// Normalise pre-934 directory-style strings to clean wire form so
@@ -423,8 +434,6 @@ pub fn write_item_str(
agent,
retry_count,
depends_on,
claimed_by,
claimed_at,
merged_at,
);
}
+60 -8
View File
@@ -276,6 +276,57 @@ pub fn migrate_legacy_stage_strings() {
);
}
/// Clear legacy node-hex claims from `claim_agent` and `claim_ts` registers.
///
/// Pre-1009 nodes wrote the Ed25519 hex pubkey as `claimed_by`. That value
/// cannot be converted to an `AgentName`, so the safe migration is to wipe
/// any existing claim rather than carry over a semantically invalid string.
///
/// Only clears entries where `claim_agent` looks like a legacy node hex value
/// (64 hex chars). Entries that are already empty or contain an agent-name
/// string (shorter, mixed case) are left untouched.
pub fn migrate_node_claims_to_agent_claims() {
    let Some(state_mutex) = get_crdt() else {
        return;
    };
    // Hold a single lock for the scan AND the clear. The previous version
    // collected indices under one lock, released it, then re-locked to
    // mutate — leaving a window in which a concurrent writer could reshuffle
    // the index and make the cached `usize` indices point at the wrong item.
    let Ok(mut state) = state_mutex.lock() else {
        return;
    };
    // Indices of items whose claim_agent still holds a 64-char hex node key.
    let stale_indices: Vec<usize> = state
        .index
        .values()
        .copied()
        .filter(|&idx| {
            let item = &state.crdt.doc.items[idx];
            match item.claim_agent.view() {
                JsonValue::String(s) => {
                    s.len() == 64 && s.chars().all(|c| c.is_ascii_hexdigit())
                }
                _ => false,
            }
        })
        .collect();
    if stale_indices.is_empty() {
        return;
    }
    let count = stale_indices.len();
    for idx in stale_indices {
        apply_and_persist(&mut state, |s| {
            s.crdt.doc.items[idx].claim_agent.set(String::new())
        });
        apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].claim_ts.set(0.0));
    }
    slog!("[crdt] Cleared {count} legacy node-hex claim(s) from claim_agent/claim_ts");
}
#[cfg(test)]
mod stage_migration_tests {
use super::super::super::state::init_for_test;
@@ -299,8 +350,6 @@ mod stage_migration_tests {
None,
None,
None,
None,
None,
);
// Then overwrite the stage register with the raw legacy string,
// bypassing `db::normalise_stage_str` / `write_item_str`'s mapping.
@@ -318,7 +367,11 @@ mod stage_migration_tests {
let cases: &[(&str, &str, Stage)] = &[
("9501_legacy_upcoming", "0_upcoming", Stage::Upcoming),
("9502_legacy_backlog", "1_backlog", Stage::Backlog),
("9503_legacy_coding", "2_current", Stage::Coding),
(
"9503_legacy_coding",
"2_current",
Stage::Coding { claim: None },
),
(
"9504_legacy_blocked",
"2_blocked",
@@ -333,6 +386,7 @@ mod stage_migration_tests {
Stage::Merge {
feature_branch: BranchName(String::new()),
commits_ahead: NonZeroU32::new(1).unwrap(),
claim: None,
},
),
(
@@ -398,14 +452,12 @@ mod stage_migration_tests {
// Seed two items: one already in clean form, one in legacy form.
write_item(
"9520_already_clean",
&Stage::Coding,
&Stage::Coding { claim: None },
Some("Already Clean"),
None,
None,
None,
None,
None,
None,
);
seed_with_raw_stage("9521_needs_migration", "2_current");
@@ -416,11 +468,11 @@ mod stage_migration_tests {
let migrated = read_item("9521_needs_migration").unwrap();
assert!(matches!(
clean.stage(),
crate::pipeline_state::Stage::Coding
crate::pipeline_state::Stage::Coding { .. }
));
assert!(matches!(
migrated.stage(),
crate::pipeline_state::Stage::Coding
crate::pipeline_state::Stage::Coding { .. }
));
}
+1 -1
View File
@@ -18,5 +18,5 @@ pub use item::{
pub use item::write_item_str;
pub use migrations::{
migrate_legacy_stage_strings, migrate_merge_job, migrate_names_from_slugs,
migrate_story_ids_to_numeric, name_from_story_id,
migrate_node_claims_to_agent_claims, migrate_story_ids_to_numeric, name_from_story_id,
};
+5 -34
View File
@@ -98,8 +98,6 @@ fn migrate_story_ids_to_numeric_rewrites_slug_ids() {
None,
None,
None,
None,
None,
);
let result = migrate_story_ids_to_numeric();
@@ -130,8 +128,6 @@ fn migrate_story_ids_to_numeric_is_idempotent() {
None,
None,
None,
None,
None,
);
// First call — nothing to migrate.
@@ -159,8 +155,6 @@ fn migrate_story_ids_to_numeric_skips_conflict() {
None,
None,
None,
None,
None,
);
write_item_str(
"44",
@@ -170,8 +164,6 @@ fn migrate_story_ids_to_numeric_skips_conflict() {
None,
None,
None,
None,
None,
);
let result = migrate_story_ids_to_numeric();
@@ -204,14 +196,15 @@ fn migrate_story_ids_to_numeric_preserves_stage_and_name() {
None,
None,
None,
None,
None,
);
migrate_story_ids_to_numeric();
let item = read_item("45").expect("item must be accessible by numeric ID");
assert!(matches!(item.stage, crate::pipeline_state::Stage::Coding));
assert!(matches!(
item.stage,
crate::pipeline_state::Stage::Coding { .. }
));
assert_eq!(item.name, "Crash Bug");
assert_eq!(item.agent.map(|a| a.as_str()), Some("coder-1"));
}
@@ -229,8 +222,6 @@ fn migrate_names_from_slugs_fills_empty_names() {
None,
None,
None,
None,
None,
);
// Before migration: nameless item is filtered by read_item (AC 5).
@@ -261,8 +252,6 @@ fn migrate_names_from_slugs_leaves_existing_names_unchanged() {
None,
None,
None,
None,
None,
);
migrate_names_from_slugs();
@@ -297,8 +286,6 @@ fn set_depends_on_round_trip_and_clear() {
None,
None,
None,
None,
None,
);
// Set depends_on to [837] and verify CRDT register holds the list.
@@ -352,8 +339,6 @@ fn set_agent_some_writes_name() {
None,
None,
None,
None,
None,
);
let found = set_agent(
@@ -382,8 +367,6 @@ fn set_agent_none_clears_register() {
None,
None,
None,
None,
None,
);
// Confirm agent is set.
@@ -430,8 +413,6 @@ fn set_qa_mode_round_trip_server_then_human() {
None,
None,
None,
None,
None,
);
// Set qa=server via typed path and assert CRDT register reflects it.
@@ -485,8 +466,6 @@ fn set_qa_mode_round_trip_all_variants() {
None,
None,
None,
None,
None,
);
for mode in [QaMode::Server, QaMode::Agent, QaMode::Human] {
@@ -523,8 +502,6 @@ fn bump_retry_count_increments_by_one() {
None,
None,
None,
None,
None,
);
let v1 = bump_retry_count("9001_story_bump_test");
@@ -548,8 +525,6 @@ fn set_retry_count_resets_to_zero() {
Some(5),
None,
None,
None,
None,
);
set_retry_count("9002_story_set_test", 0);
@@ -666,7 +641,7 @@ async fn bug_511_rowid_replay_preserves_field_update_after_list_insert() {
let idx2 = index2["511_story_target"];
let view = extract_item_view(&crdt2.doc.items[idx2]).unwrap();
assert!(
matches!(view.stage, crate::pipeline_state::Stage::Coding),
matches!(view.stage, crate::pipeline_state::Stage::Coding { .. }),
"stage field update lost during replay (bug 511 regression)"
);
@@ -727,8 +702,6 @@ async fn tombstone_survives_concurrent_writes() {
None,
None,
None,
None,
None,
);
assert!(
read_item(story_id).is_some(),
@@ -748,8 +721,6 @@ async fn tombstone_survives_concurrent_writes() {
None,
None,
None,
None,
None,
);
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
}
-2
View File
@@ -466,8 +466,6 @@ mod tests {
Some(3),
None,
None,
None,
None,
);
write_content(
ContentKey::Story(story_id),
+1 -13
View File
@@ -93,8 +93,6 @@ pub fn write_item_with_content(story_id: &str, stage: &str, content: &str, meta:
meta.agent.as_deref(),
meta.retry_count,
depends_on_json.as_deref(),
None,
None,
merged_at_ts,
);
@@ -148,17 +146,7 @@ pub fn move_item_stage(
};
let merged_at_ts = matches!(typed_stage, crate::pipeline_state::Stage::Done { .. })
.then(|| chrono::Utc::now().timestamp() as f64);
crate::crdt_state::write_item(
story_id,
&typed_stage,
None,
None,
None,
None,
None,
None,
merged_at_ts,
);
crate::crdt_state::write_item(story_id, &typed_stage, None, None, None, None, merged_at_ts);
// Bug 780: stage transitions reset retry_count to 0. retry_count tracks
// attempts at THIS stage's work (coding, merging, qa); a fresh attempt at
// a new stage is conceptually distinct from prior attempts at a different
-2
View File
@@ -350,8 +350,6 @@ mod tests {
None,
None,
None,
None,
None,
);
assert!(
crdt_state::read_item(old_id).is_none(),
+2 -2
View File
@@ -93,8 +93,8 @@ pub(crate) fn tool_dump_crdt(args: &Value) -> Result<String, String> {
"agent": item.agent,
"retry_count": item.retry_count,
"depends_on": item.depends_on,
"claimed_by": item.claimed_by,
"claimed_at": item.claimed_at,
"claimed_by": item.claim_agent,
"claimed_at": item.claim_ts,
"content_index": item.content_index,
"is_deleted": item.is_deleted,
})
-4
View File
@@ -305,8 +305,6 @@ mod tests {
None,
None,
None,
None,
None,
);
let tmp = tempfile::tempdir().unwrap();
let ctx = test_ctx(tmp.path());
@@ -331,8 +329,6 @@ mod tests {
None,
None,
None,
None,
None,
);
let tmp = tempfile::tempdir().unwrap();
let ctx = test_ctx(tmp.path());
+15 -4
View File
@@ -165,7 +165,10 @@ pub(super) async fn tool_status(args: &Value, ctx: &AppContext) -> Result<String
"Story '{story_id}' not found in coding stage. Check the story_id and ensure it is in the current stage."
))?;
if !matches!(typed_item.stage, crate::pipeline_state::Stage::Coding) {
if !matches!(
typed_item.stage,
crate::pipeline_state::Stage::Coding { .. }
) {
return Err(format!(
"Story '{story_id}' not found in coding stage. Check the story_id and ensure it is in the current stage."
));
@@ -192,9 +195,17 @@ pub(super) async fn tool_status(args: &Value, ctx: &AppContext) -> Result<String
if !deps.is_empty() {
front_matter.insert("depends_on".to_string(), json!(deps));
}
if let Some(claim) = view.claim() {
front_matter.insert("claimed_by".to_string(), json!(claim.node));
front_matter.insert("claimed_at".to_string(), json!(claim.at));
let stage_claim = match &typed_item.stage {
crate::pipeline_state::Stage::Coding { claim } => claim.as_ref(),
crate::pipeline_state::Stage::Merge { claim, .. } => claim.as_ref(),
_ => None,
};
if let Some(claim) = stage_claim {
front_matter.insert("claimed_by".to_string(), json!(claim.agent.0.as_str()));
front_matter.insert(
"claimed_at".to_string(),
json!(claim.claimed_at.timestamp() as f64),
);
}
}
+2 -2
View File
@@ -188,8 +188,8 @@ pub fn debug_crdt_handler(req: &poem::Request) -> poem::Response {
"agent": item.agent,
"retry_count": item.retry_count,
"depends_on": item.depends_on,
"claimed_by": item.claimed_by,
"claimed_at": item.claimed_at,
"claimed_by": item.claim_agent,
"claimed_at": item.claim_ts,
"content_index": item.content_index,
"is_deleted": item.is_deleted,
})
+3 -3
View File
@@ -171,7 +171,7 @@ pub fn load_pipeline_state(ctx: &AppContext) -> Result<PipelineState, String> {
match &item.stage {
Stage::Upcoming => state.backlog.push(story), // upcoming shown with backlog
Stage::Backlog => state.backlog.push(story),
Stage::Coding => state.current.push(story),
Stage::Coding { .. } => state.current.push(story),
Stage::Blocked { .. } => state.current.push(story), // blocked shown with current
Stage::Qa => state.qa.push(story),
Stage::Merge { .. } => state.merge.push(story),
@@ -182,7 +182,7 @@ pub fn load_pipeline_state(ctx: &AppContext) -> Result<PipelineState, String> {
// Route to the section matching the stage that was active when
// the item was frozen, so it appears in-place.
match unwrap_frozen(resume_to) {
Stage::Coding | Stage::Blocked { .. } => state.current.push(story),
Stage::Coding { .. } | Stage::Blocked { .. } => state.current.push(story),
Stage::Qa | Stage::ReviewHold { .. } => state.qa.push(story),
Stage::Merge { .. }
| Stage::MergeFailure { .. }
@@ -324,7 +324,7 @@ pub fn validate_story_dirs(_root: &Path) -> Result<Vec<StoryValidationResult>, S
let mut results = Vec::new();
for item in crate::pipeline_state::read_all_typed() {
if !matches!(item.stage, Stage::Backlog | Stage::Coding) {
if !matches!(item.stage, Stage::Backlog | Stage::Coding { .. }) {
continue;
}
results.push(StoryValidationResult {
+1 -1
View File
@@ -54,7 +54,7 @@ pub fn stage_metadata(
match stage {
Stage::Upcoming => ("create", format!("huskies: triage {item_id}")),
Stage::Backlog => ("create", format!("huskies: create {item_id}")),
Stage::Coding => ("start", format!("huskies: start {item_id}")),
Stage::Coding { .. } => ("start", format!("huskies: start {item_id}")),
Stage::Blocked { .. } => ("block", format!("huskies: block {item_id}")),
Stage::Qa => ("qa", format!("huskies: queue {item_id} for QA")),
Stage::Merge { .. } => ("merge", format!("huskies: queue {item_id} for merge")),
+1 -5
View File
@@ -54,7 +54,7 @@ fn stage_metadata_returns_correct_actions() {
use crate::pipeline_state::{GitSha, Stage};
use chrono::Utc;
let (action, msg) = stage_metadata(&Stage::Coding, "42_story_foo");
let (action, msg) = stage_metadata(&Stage::Coding { claim: None }, "42_story_foo");
assert_eq!(action, "start");
assert_eq!(msg, "huskies: start 42_story_foo");
@@ -177,8 +177,6 @@ fn sweep_uses_crdt_merged_at_not_utc_now() {
None,
None,
None,
None,
None,
Some(ten_seconds_ago),
);
@@ -209,8 +207,6 @@ fn sweep_keeps_item_newer_than_retention() {
None,
None,
None,
None,
None,
Some(one_second_ago),
);
+2 -1
View File
@@ -123,7 +123,7 @@ mod tests {
bus.fire(TransitionFired {
story_id: StoryId("test".into()),
before: Stage::Backlog,
after: Stage::Coding,
after: Stage::Coding { claim: None },
event: PipelineEvent::DepsMet,
at: Utc::now(),
});
@@ -142,6 +142,7 @@ mod tests {
let merge = Stage::Merge {
feature_branch: BranchName("feature/story-1".into()),
commits_ahead: NonZeroU32::new(3).unwrap(),
claim: None,
};
// Stage::Merge now carries three fields: feature_branch, commits_ahead, and an
// optional claim. There is no way to attach a bare agent name to it. The type system
+2 -2
View File
@@ -40,8 +40,8 @@ mod tests;
#[allow(unused_imports)]
pub use types::{
AgentName, ArchiveReason, BranchName, ExecutionState, GitSha, MergeFailureKind, NodePubkey,
PipelineItem, Stage, StoryId, TransitionError, stage_dir_name, stage_label,
AgentClaim, AgentName, ArchiveReason, BranchName, ExecutionState, GitSha, MergeFailureKind,
NodePubkey, PipelineItem, Stage, StoryId, TransitionError, stage_dir_name, stage_label,
};
#[allow(unused_imports)]
+5 -8
View File
@@ -122,7 +122,6 @@ mod tests {
None,
None,
None,
None,
)
}
@@ -145,7 +144,6 @@ mod tests {
None,
None,
None,
None,
);
let item = PipelineItem::try_from(&view).unwrap();
assert_eq!(item.story_id, StoryId("42_story_test".to_string()));
@@ -159,7 +157,7 @@ mod tests {
fn project_current_item() {
let view = PipelineItemView::for_test(
"42_story_test",
Stage::Coding,
Stage::Coding { claim: None },
"Test",
Some(crate::config::AgentName::Coder1),
2u32,
@@ -167,10 +165,9 @@ mod tests {
None,
None,
None,
None,
);
let item = PipelineItem::try_from(&view).unwrap();
assert!(matches!(item.stage, Stage::Coding));
assert!(matches!(item.stage, Stage::Coding { .. }));
assert_eq!(item.retry_count, 2);
}
@@ -181,6 +178,7 @@ mod tests {
Stage::Merge {
feature_branch: fb("feature/story-42_story_test"),
commits_ahead: nz(1),
claim: None,
},
Some("Test"),
);
@@ -189,6 +187,7 @@ mod tests {
if let Stage::Merge {
feature_branch,
commits_ahead,
..
} = &item.stage
{
assert_eq!(feature_branch.0, "feature/story-42_story_test");
@@ -226,7 +225,6 @@ mod tests {
None,
None,
None,
None,
);
let item = PipelineItem::try_from(&view).unwrap();
assert!(matches!(
@@ -253,7 +251,6 @@ mod tests {
None,
None,
None,
None,
);
let item = PipelineItem::try_from(&view).unwrap();
assert!(matches!(
@@ -270,7 +267,7 @@ mod tests {
let view = make_view(
"42_story_test",
Stage::Frozen {
resume_to: Box::new(Stage::Coding),
resume_to: Box::new(Stage::Coding { claim: None }),
},
Some("Frozen Story"),
);
+26 -20
View File
@@ -19,7 +19,7 @@ fn sid(s: &str) -> StoryId {
fn happy_path_backlog_through_archived() {
let s = Stage::Backlog;
let s = transition(s, PipelineEvent::DepsMet).unwrap();
assert!(matches!(s, Stage::Coding));
assert!(matches!(s, Stage::Coding { .. }));
let s = transition(
s,
@@ -52,7 +52,7 @@ fn happy_path_backlog_through_archived() {
#[test]
fn happy_path_with_qa() {
let s = Stage::Coding;
let s = Stage::Coding { claim: None };
let s = transition(s, PipelineEvent::GatesStarted).unwrap();
assert!(matches!(s, Stage::Qa));
@@ -69,7 +69,7 @@ fn happy_path_with_qa() {
#[test]
fn qa_retry_loop() {
let s = Stage::Coding;
let s = Stage::Coding { claim: None };
let s = transition(s, PipelineEvent::GatesStarted).unwrap();
assert!(matches!(s, Stage::Qa));
@@ -80,7 +80,7 @@ fn qa_retry_loop() {
},
)
.unwrap();
assert!(matches!(s, Stage::Coding));
assert!(matches!(s, Stage::Coding { .. }));
}
// ── Bug 519: Merge with zero commits is unrepresentable ─────────────
@@ -154,7 +154,7 @@ fn cannot_start_gates_from_backlog() {
#[test]
fn cannot_accept_from_coding() {
let result = transition(Stage::Coding, PipelineEvent::Accepted);
let result = transition(Stage::Coding { claim: None }, PipelineEvent::Accepted);
assert!(matches!(
result,
Err(TransitionError::InvalidTransition { .. })
@@ -165,7 +165,7 @@ fn cannot_accept_from_coding() {
#[test]
fn block_from_any_active_stage() {
for s in [Stage::Backlog, Stage::Coding, Stage::Qa] {
for s in [Stage::Backlog, Stage::Coding { claim: None }, Stage::Qa] {
let result = transition(
s.clone(),
PipelineEvent::Block {
@@ -178,6 +178,7 @@ fn block_from_any_active_stage() {
let m = Stage::Merge {
feature_branch: fb("f"),
commits_ahead: nz(1),
claim: None,
};
let result = transition(
m,
@@ -194,7 +195,7 @@ fn unblock_returns_to_coding() {
reason: "test".into(),
};
let result = transition(s, PipelineEvent::Unblock).unwrap();
assert!(matches!(result, Stage::Coding));
assert!(matches!(result, Stage::Coding { .. }));
}
#[test]
@@ -251,7 +252,7 @@ fn legacy_unblock_archived_blocked_returns_to_backlog() {
fn abandon_from_any_active_or_done() {
for s in [
Stage::Backlog,
Stage::Coding,
Stage::Coding { claim: None },
Stage::Qa,
Stage::Done {
merged_at: chrono::Utc::now(),
@@ -267,7 +268,7 @@ fn abandon_from_any_active_or_done() {
fn supersede_from_any_active_or_done() {
for s in [
Stage::Backlog,
Stage::Coding,
Stage::Coding { claim: None },
Stage::Qa,
Stage::Done {
merged_at: chrono::Utc::now(),
@@ -291,7 +292,7 @@ fn review_hold_from_active_stages() {
// Story 945: `ReviewHold` transitions to `Stage::ReviewHold { resume_to }`
// with the resume_to set to the originating stage, replacing the legacy
// boolean flag.
for s in [Stage::Backlog, Stage::Coding, Stage::Qa] {
for s in [Stage::Backlog, Stage::Coding { claim: None }, Stage::Qa] {
let result = transition(
s.clone(),
PipelineEvent::ReviewHold {
@@ -316,6 +317,7 @@ fn merge_failed_final() {
let s = Stage::Merge {
feature_branch: fb("f"),
commits_ahead: nz(1),
claim: None,
};
let result = transition(
s,
@@ -336,7 +338,7 @@ fn merge_failed_final() {
#[test]
fn merge_failed_only_from_merge() {
let result = transition(
Stage::Coding,
Stage::Coding { claim: None },
PipelineEvent::MergeFailedFinal {
reason: "conflicts".into(),
},
@@ -413,6 +415,7 @@ fn bug_502_agent_not_in_stage() {
let merge = Stage::Merge {
feature_branch: BranchName("feature/story-1".into()),
commits_ahead: NonZeroU32::new(3).unwrap(),
claim: None,
};
// Stage::Merge now carries three fields: feature_branch, commits_ahead, and an
// optional claim. There is no way to attach a bare agent name to it. The type system
@@ -480,7 +483,7 @@ fn cannot_deps_met_from_upcoming() {
#[test]
fn reject_from_active_stages() {
for s in [Stage::Backlog, Stage::Coding, Stage::Qa] {
for s in [Stage::Backlog, Stage::Coding { claim: None }, Stage::Qa] {
let result = transition(
s.clone(),
PipelineEvent::Reject {
@@ -493,6 +496,7 @@ fn reject_from_active_stages() {
let m = Stage::Merge {
feature_branch: fb("f"),
commits_ahead: nz(1),
claim: None,
};
let result = transition(
m,
@@ -561,7 +565,7 @@ fn freeze_transitions_to_frozen_variant_with_resume_to() {
);
let item = read_typed(story_id).unwrap().unwrap();
assert!(matches!(item.stage, Stage::Coding));
assert!(matches!(item.stage, Stage::Coding { .. }));
assert!(!matches!(item.stage, Stage::Frozen { .. }));
super::apply::transition_to_frozen(story_id).expect("freeze should succeed");
@@ -569,7 +573,7 @@ fn freeze_transitions_to_frozen_variant_with_resume_to() {
let item = read_typed(story_id).unwrap().unwrap();
match &item.stage {
Stage::Frozen { resume_to } => assert!(
matches!(**resume_to, Stage::Coding),
matches!(**resume_to, Stage::Coding { .. }),
"resume_to should preserve the previous stage; got {resume_to:?}"
),
other => panic!("stage should be Stage::Frozen after freeze; got {other:?}"),
@@ -583,7 +587,7 @@ fn freeze_transitions_to_frozen_variant_with_resume_to() {
let item = read_typed(story_id).unwrap().unwrap();
assert!(
matches!(item.stage, Stage::Coding),
matches!(item.stage, Stage::Coding { .. }),
"stage should return to Coding after unfreeze: {:?}",
item.stage
);
@@ -884,10 +888,11 @@ fn merge_aborted_returns_to_coding() {
let s = Stage::Merge {
feature_branch: fb("feature/story-73"),
commits_ahead: nz(2),
claim: None,
};
let result = transition(s, PipelineEvent::MergeAborted).unwrap();
assert!(
matches!(result, Stage::Coding),
matches!(result, Stage::Coding { .. }),
"Merge + MergeAborted should return to Coding, got: {result:?}"
);
}
@@ -915,7 +920,7 @@ fn merge_aborted_moves_to_coding_via_crdt() {
fired.before
);
assert!(
matches!(fired.after, Stage::Coding),
matches!(fired.after, Stage::Coding { .. }),
"fired.after should be Coding: {:?}",
fired.after
);
@@ -958,7 +963,7 @@ fn move_story_merge_to_current_succeeds() {
.expect("CRDT read should succeed")
.expect("item should exist");
assert!(
matches!(item.stage, Stage::Coding),
matches!(item.stage, Stage::Coding { .. }),
"story should be in Coding after move_story_to_stage(merge → current): {:?}",
item.stage
);
@@ -974,7 +979,7 @@ fn hotfix_requested_from_done_lands_in_coding() {
};
let result = transition(done, PipelineEvent::HotfixRequested).unwrap();
assert!(
matches!(result, Stage::Coding),
matches!(result, Stage::Coding { .. }),
"Done + HotfixRequested must land in Coding; got: {:?}",
result
);
@@ -984,11 +989,12 @@ fn hotfix_requested_from_done_lands_in_coding() {
fn hotfix_requested_rejected_from_non_done_stages() {
for stage in [
Stage::Backlog,
Stage::Coding,
Stage::Coding { claim: None },
Stage::Qa,
Stage::Merge {
feature_branch: fb("feature/story-1"),
commits_ahead: nz(1),
claim: None,
},
] {
let result = transition(stage.clone(), PipelineEvent::HotfixRequested);
+35 -27
View File
@@ -149,10 +149,10 @@ pub fn transition(state: Stage, event: PipelineEvent) -> Result<Stage, Transitio
(Upcoming, Triage) => Ok(Backlog),
// ── Forward path ────────────────────────────────────────────────
(Backlog, DepsMet) => Ok(Coding),
(Coding, GatesStarted) => Ok(Qa),
(Backlog, DepsMet) => Ok(Coding { claim: None }),
(Coding { .. }, GatesStarted) => Ok(Qa),
(
Coding,
Coding { .. },
QaSkipped {
feature_branch,
commits_ahead,
@@ -160,6 +160,7 @@ pub fn transition(state: Stage, event: PipelineEvent) -> Result<Stage, Transitio
) => Ok(Merge {
feature_branch,
commits_ahead,
claim: None,
}),
(
Qa,
@@ -170,8 +171,9 @@ pub fn transition(state: Stage, event: PipelineEvent) -> Result<Stage, Transitio
) => Ok(Merge {
feature_branch,
commits_ahead,
claim: None,
}),
(Qa, GatesFailed { .. }) => Ok(Coding),
(Qa, GatesFailed { .. }) => Ok(Coding { claim: None }),
(Merge { .. }, MergeSucceeded { merge_commit }) => Ok(Done {
merged_at: now,
merge_commit,
@@ -193,7 +195,7 @@ pub fn transition(state: Stage, event: PipelineEvent) -> Result<Stage, Transitio
// ── Block: any active → Blocked ──────────────────────────────
(Backlog, Block { reason })
| (Coding, Block { reason })
| (Coding { .. }, Block { reason })
| (Qa, Block { reason })
| (Merge { .. }, Block { reason }) => Ok(Blocked { reason }),
@@ -201,18 +203,20 @@ pub fn transition(state: Stage, event: PipelineEvent) -> Result<Stage, Transitio
// story to `Stage::ReviewHold { resume_to, reason }`, preserving the
// current stage as the resume target so a reviewer can clear the
// hold and continue.
(s @ (Backlog | Coding | Qa | Merge { .. }), PipelineEvent::ReviewHold { reason }) => {
Ok(Stage::ReviewHold {
resume_to: Box::new(s),
reason,
})
}
(
s @ (Backlog | Coding { .. } | Qa | Merge { .. }),
PipelineEvent::ReviewHold { reason },
) => Ok(Stage::ReviewHold {
resume_to: Box::new(s),
reason,
}),
// ── MergeFailed: Merge → MergeFailure (recoverable intermediate) ──
(
Merge {
feature_branch,
commits_ahead,
..
},
MergeFailed { kind },
) => Ok(MergeFailure {
@@ -246,14 +250,14 @@ pub fn transition(state: Stage, event: PipelineEvent) -> Result<Stage, Transitio
// ── Abandon / supersede from any active or done stage ───────────
(Upcoming, Abandon)
| (Backlog, Abandon)
| (Coding, Abandon)
| (Coding { .. }, Abandon)
| (Qa, Abandon)
| (Merge { .. }, Abandon)
| (Done { .. }, Abandon) => Ok(Abandoned { ts: now }),
(Upcoming, Supersede { by })
| (Backlog, Supersede { by })
| (Coding, Supersede { by })
| (Coding { .. }, Supersede { by })
| (Qa, Supersede { by })
| (Merge { .. }, Supersede { by })
| (Done { .. }, Supersede { by }) => Ok(Superseded {
@@ -263,7 +267,7 @@ pub fn transition(state: Stage, event: PipelineEvent) -> Result<Stage, Transitio
// ── Reject from any active stage or QA ──────────────────────────
(Backlog, Reject { reason })
| (Coding, Reject { reason })
| (Coding { .. }, Reject { reason })
| (Qa, Reject { reason })
| (Merge { .. }, Reject { reason }) => Ok(Rejected { ts: now, reason }),
@@ -272,21 +276,24 @@ pub fn transition(state: Stage, event: PipelineEvent) -> Result<Stage, Transitio
// the backlog while waiting on dependent fixes, without losing it to
// Archived. Unlike `Unblock` (Blocked → Coding), this does not
// re-enter the active flow.
(Coding, Demote) | (Qa, Demote) | (Merge { .. }, Demote) | (Blocked { .. }, Demote) => {
Ok(Backlog)
}
(Coding { .. }, Demote)
| (Qa, Demote)
| (Merge { .. }, Demote)
| (Blocked { .. }, Demote) => Ok(Backlog),
// ── Close: direct completion from any active stage ─────────────
(Backlog, Close) | (Coding, Close) | (Qa, Close) | (Merge { .. }, Close) => Ok(Done {
merged_at: now,
merge_commit: GitSha("closed".to_string()),
}),
(Backlog, Close) | (Coding { .. }, Close) | (Qa, Close) | (Merge { .. }, Close) => {
Ok(Done {
merged_at: now,
merge_commit: GitSha("closed".to_string()),
})
}
// ── Freeze: any non-terminal stage → Frozen { resume_to } ──────
(
s @ (Upcoming
| Backlog
| Coding
| Coding { .. }
| Qa
| Merge { .. }
| Blocked { .. }
@@ -305,7 +312,7 @@ pub fn transition(state: Stage, event: PipelineEvent) -> Result<Stage, Transitio
(Stage::ReviewHold { resume_to, .. }, ReviewHoldCleared) => Ok(*resume_to),
// ── FixupRequested: MergeFailure → Coding (coder fixup) ────────
(MergeFailure { .. }, FixupRequested) => Ok(Coding),
(MergeFailure { .. }, FixupRequested) => Ok(Coding { claim: None }),
// ── FixupRequested: MergeFailureFinal → Coding (operator override)
//
@@ -314,19 +321,19 @@ pub fn transition(state: Stage, event: PipelineEvent) -> Result<Stage, Transitio
// the gate failure is fixable and send the story back for another
// coder attempt. The budget counter is a mergemaster bookkeeping
// detail, not a hard ceiling.
(MergeFailureFinal { .. }, FixupRequested) => Ok(Coding),
(MergeFailureFinal { .. }, FixupRequested) => Ok(Coding { claim: None }),
// ── ReQueuedForQa: MergeFailure → Qa (re-review) ────────────────
(MergeFailure { .. }, ReQueuedForQa) => Ok(Qa),
// ── MergeAborted: Merge → Coding (abort in-flight merge) ─────────
(Merge { .. }, MergeAborted) => Ok(Coding),
(Merge { .. }, MergeAborted) => Ok(Coding { claim: None }),
// ── HotfixRequested: Done → Coding (post-merge hotfix) ───────────
// Allows reopening a completed story so a coder can apply a hotfix.
// A fresh feature branch is forked from master when auto-assign spawns
// the coder.
(Done { .. }, HotfixRequested) => Ok(Coding),
(Done { .. }, HotfixRequested) => Ok(Coding { claim: None }),
// ── MergemasterAttempted: MergeFailure → MergeFailureFinal ─────
(MergeFailure { kind, .. }, MergemasterAttempted) => Ok(MergeFailureFinal { kind }),
@@ -337,7 +344,7 @@ pub fn transition(state: Stage, event: PipelineEvent) -> Result<Stage, Transitio
(Stage::ReviewHold { resume_to, .. }, Unblock) => Ok(*resume_to),
// ── Unblock: Blocked → Coding ─────────────────────────────────
(Blocked { .. }, Unblock) => Ok(Coding),
(Blocked { .. }, Unblock) => Ok(Coding { claim: None }),
// ── Unblock MergeFailure → Merge (re-attempt) ────────────────────
// `unblock_story` on a failed merge re-queues it for merge, restoring
@@ -353,6 +360,7 @@ pub fn transition(state: Stage, event: PipelineEvent) -> Result<Stage, Transitio
) => Ok(Merge {
feature_branch,
commits_ahead,
claim: None,
}),
// ── Demote MergeFailure → Backlog (manual parking) ───────────────
+33 -7
View File
@@ -109,6 +109,22 @@ impl MergeFailureKind {
}
}
// ── Agent claim payload ────────────────────────────────────────────────────
/// Active claim on a pipeline item by a specific agent.
///
/// Embedded directly in [`Stage::Coding`] and [`Stage::Merge`] rather than
/// stored in separate CRDT registers. Readers access the claim via
/// `item.stage()` rather than through a separate `item.claim()` accessor
/// (story 1009).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AgentClaim {
/// The agent (e.g. `"coder-1"`) that has claimed this work item.
pub agent: AgentName,
/// When the claim was written.
pub claimed_at: DateTime<Utc>,
}
// ── Synced pipeline stage (lives in CRDT, converges across nodes) ───────────
/// The pipeline stage for a work item.
@@ -146,17 +162,26 @@ pub enum Stage {
Backlog,
/// Story is being actively coded somewhere in the mesh.
Coding,
///
/// Carries an optional [`AgentClaim`] identifying which agent is currently
/// working on this item. `None` means the item is in the coding stage but
/// no agent has claimed it yet (e.g. just transitioned from Backlog and
/// waiting for an agent to pick it up).
Coding { claim: Option<AgentClaim> },
/// Coder has run; gates are running.
Qa,
/// Gates passed; ready to merge.
///
/// `commits_ahead: NonZeroU32` makes "Merge with nothing to merge"
/// structurally impossible (eliminates bug 519).
/// structurally impossible (eliminates bug 519). The optional
/// [`AgentClaim`] carries the mergemaster agent that owns this merge.
Merge {
feature_branch: BranchName,
commits_ahead: NonZeroU32,
/// Agent currently running the merge, or `None` when unclaimed.
claim: Option<AgentClaim>,
},
/// Mergemaster squashed to master. Always carries merge metadata.
@@ -274,7 +299,7 @@ impl Stage {
match s {
"upcoming" => Some(Stage::Upcoming),
"backlog" => Some(Stage::Backlog),
"coding" => Some(Stage::Coding),
"coding" => Some(Stage::Coding { claim: None }),
"blocked" => Some(Stage::Blocked {
reason: String::new(),
}),
@@ -282,6 +307,7 @@ impl Stage {
"merge" => Some(Stage::Merge {
feature_branch: BranchName(String::new()),
commits_ahead: NonZeroU32::new(1).expect("1 is non-zero"),
claim: None,
}),
"merge_failure" => Some(Stage::MergeFailure {
kind: MergeFailureKind::Other(String::new()),
@@ -292,10 +318,10 @@ impl Stage {
kind: MergeFailureKind::Other(String::new()),
}),
"frozen" => Some(Stage::Frozen {
resume_to: Box::new(Stage::Coding),
resume_to: Box::new(Stage::Coding { claim: None }),
}),
"review_hold" => Some(Stage::ReviewHold {
resume_to: Box::new(Stage::Coding),
resume_to: Box::new(Stage::Coding { claim: None }),
reason: String::new(),
}),
"done" => Some(Stage::Done {
@@ -391,7 +417,7 @@ pub fn stage_label(s: &Stage) -> &'static str {
match s {
Stage::Upcoming => "Upcoming",
Stage::Backlog => "Backlog",
Stage::Coding => "Coding",
Stage::Coding { .. } => "Coding",
Stage::Qa => "Qa",
Stage::Merge { .. } => "Merge",
Stage::MergeFailure { .. } => "MergeFailure",
@@ -416,7 +442,7 @@ pub fn stage_dir_name(s: &Stage) -> &'static str {
match s {
Stage::Upcoming => "upcoming",
Stage::Backlog => "backlog",
Stage::Coding => "coding",
Stage::Coding { .. } => "coding",
Stage::Blocked { .. } => "blocked",
Stage::Qa => "qa",
Stage::Merge { .. } => "merge",
+1 -3
View File
@@ -142,7 +142,7 @@ pub fn get_work_item_content(
let stages = [
("1_backlog", Stage::Backlog),
("2_current", Stage::Coding),
("2_current", Stage::Coding { claim: None }),
("3_qa", Stage::Qa),
(
"4_merge",
@@ -317,8 +317,6 @@ max_budget_usd = 5.0
None,
None,
None,
None,
None,
);
let item = get_work_item_content(tmp.path(), "42_story_foo").unwrap();
assert!(item.content.contains("Some content."));
+25 -10
View File
@@ -12,7 +12,7 @@ pub fn stage_display_name(stage: &Stage) -> &'static str {
match stage {
Stage::Upcoming => "Upcoming",
Stage::Backlog => "Backlog",
Stage::Coding => "Current",
Stage::Coding { .. } => "Current",
Stage::Blocked { .. } => "Blocked",
Stage::Qa => "QA",
Stage::Merge { .. } => "Merge",
@@ -253,7 +253,10 @@ mod tests {
#[test]
fn stage_display_name_maps_all_known_stages() {
assert_eq!(stage_display_name(&Stage::Backlog), "Backlog");
assert_eq!(stage_display_name(&Stage::Coding), "Current");
assert_eq!(
stage_display_name(&Stage::Coding { claim: None }),
"Current"
);
assert_eq!(stage_display_name(&Stage::Qa), "QA");
assert_eq!(stage_display_name(&merge_stage()), "Merge");
assert_eq!(stage_display_name(&done_stage()), "Done");
@@ -290,7 +293,7 @@ mod tests {
"42_story_thing",
"Some Story",
&Stage::Backlog,
&Stage::Coding,
&Stage::Coding { claim: None },
);
assert!(!plain.contains("\u{1f389}"));
}
@@ -301,7 +304,7 @@ mod tests {
"261_story_bot_notifications",
"Bot notifications",
&Stage::Upcoming,
&Stage::Coding,
&Stage::Coding { claim: None },
);
assert_eq!(
plain,
@@ -315,8 +318,12 @@ mod tests {
#[test]
fn format_stage_notification_without_story_name_falls_back_to_number() {
let (plain, html) =
format_stage_notification("42_bug_fix_thing", "", &Stage::Coding, &Stage::Qa);
let (plain, html) = format_stage_notification(
"42_bug_fix_thing",
"",
&Stage::Coding { claim: None },
&Stage::Qa,
);
assert_eq!(plain, "#42 \u{2014} Current \u{2192} QA");
assert_eq!(html, "<strong>#42</strong> \u{2014} Current \u{2192} QA");
}
@@ -334,15 +341,23 @@ mod tests {
#[test]
fn format_stage_notification_long_name_is_preserved() {
let long_name = "A".repeat(300);
let (plain, _html) =
format_stage_notification("1_story_long", &long_name, &Stage::Coding, &Stage::Qa);
let (plain, _html) = format_stage_notification(
"1_story_long",
&long_name,
&Stage::Coding { claim: None },
&Stage::Qa,
);
assert!(plain.contains(&long_name));
}
#[test]
fn format_stage_notification_empty_story_name_falls_back_to_number() {
let (plain, html) =
format_stage_notification("42_story_empty", "", &Stage::Coding, &Stage::Qa);
let (plain, html) = format_stage_notification(
"42_story_empty",
"",
&Stage::Coding { claim: None },
&Stage::Qa,
);
assert_eq!(plain, "#42 \u{2014} Current \u{2192} QA");
assert_eq!(html, "<strong>#42</strong> \u{2014} Current \u{2192} QA");
}
+1 -1
View File
@@ -130,7 +130,7 @@ pub async fn handle_timer_command(
let in_valid_stage =
if let Ok(Some(item)) = crate::pipeline_state::read_typed(&story_id) {
use crate::pipeline_state::Stage;
matches!(item.stage, Stage::Backlog | Stage::Coding)
matches!(item.stage, Stage::Backlog | Stage::Coding { .. })
} else {
let work_dir = project_root.join(".huskies").join("work");
work_dir
-4
View File
@@ -67,8 +67,6 @@ mod tests {
None,
None,
None,
None,
None,
);
let tmp = tempfile::tempdir().unwrap();
@@ -108,8 +106,6 @@ mod tests {
None,
None,
None,
None,
None,
);
}
-2
View File
@@ -204,8 +204,6 @@ mod tests {
None,
None,
None,
None,
None,
);
// Seed content store.
+2
View File
@@ -165,6 +165,8 @@ pub(crate) async fn init_subsystems(app_state: &Arc<SessionState>, cwd: &Path) {
}
// Story 987: upgrade four-bool MergeJob entries to typed MergeResult enum.
crdt_state::migrate_merge_job(db_path);
// Story 1009: drop legacy node-hex claims that can't be converted to AgentName.
crdt_state::migrate_node_claims_to_agent_claims();
}
}
}
+1 -1
View File
@@ -315,7 +315,7 @@ mod tests {
let config = empty_config();
let report = run_cleanup_with_lookup(&project_root, &config, true, |id| {
if id == story_id {
Some(Stage::Coding)
Some(Stage::Coding { claim: None })
} else {
None
}
+6 -2
View File
@@ -183,7 +183,9 @@ mod tests {
#[test]
fn should_not_sweep_coding() {
assert!(!worktree_should_be_swept(Some(&Stage::Coding)));
assert!(!worktree_should_be_swept(Some(&Stage::Coding {
claim: None
})));
}
#[test]
@@ -196,6 +198,7 @@ mod tests {
let stage = Stage::Merge {
feature_branch: crate::pipeline_state::BranchName("feature/x".to_string()),
commits_ahead: NonZeroU32::new(1).unwrap(),
claim: None,
};
assert!(!worktree_should_be_swept(Some(&stage)));
}
@@ -305,7 +308,7 @@ mod tests {
let removed = sweep_with_lookup(&project_root, &config, |id| {
if id == story_id {
Some(Stage::Coding)
Some(Stage::Coding { claim: None })
} else {
None
}
@@ -374,6 +377,7 @@ mod tests {
"feature/story-104_merge_story".to_string(),
),
commits_ahead: NonZeroU32::new(1).unwrap(),
claim: None,
})
} else {
None