From 4e007bb7708d5901a53d4227dfa3e1512c57d0c0 Mon Sep 17 00:00:00 2001 From: dave Date: Wed, 13 May 2026 22:50:13 +0000 Subject: [PATCH] huskies: merge 1009 --- server/src/agent_mode/claim.rs | 42 +++++++---- server/src/agent_mode/loop_ops.rs | 32 ++++++--- server/src/agents/lifecycle.rs | 16 ++--- server/src/agents/pool/auto_assign/merge.rs | 1 + .../auto_assign/merge_failure_subscriber.rs | 1 + .../src/agents/pool/auto_assign/pipeline.rs | 2 +- .../src/agents/pool/auto_assign/reconcile.rs | 2 +- server/src/agents/pool/auto_assign/scan.rs | 2 +- .../agents/pool/auto_assign/story_checks.rs | 20 ------ .../watchdog/tests/limits_tests.rs | 6 -- .../src/agents/pool/start/tests_selection.rs | 2 - server/src/agents/pool/start/validation.rs | 2 +- server/src/agents/pool/worktree.rs | 4 +- server/src/agents/pool/worktree_lifecycle.rs | 2 +- server/src/chat/commands/backlog.rs | 12 +++- server/src/chat/commands/freeze.rs | 2 +- server/src/chat/commands/status/render.rs | 2 +- server/src/chat/commands/status/tests.rs | 39 ++++++---- server/src/chat/commands/unblock.rs | 6 -- server/src/chat/transport/matrix/assign.rs | 6 -- server/src/chat/transport/matrix/delete.rs | 4 +- server/src/crdt_snapshot/tests.rs | 16 ----- server/src/crdt_state/mod.rs | 10 +-- server/src/crdt_state/ops.rs | 2 - server/src/crdt_state/presence.rs | 19 +++-- server/src/crdt_state/read.rs | 72 ++++++++++++------- server/src/crdt_state/state/tests.rs | 9 ++- server/src/crdt_state/types.rs | 54 +++++--------- server/src/crdt_state/write/item.rs | 51 +++++++------ server/src/crdt_state/write/migrations.rs | 68 +++++++++++++++--- server/src/crdt_state/write/mod.rs | 2 +- server/src/crdt_state/write/tests.rs | 39 ++-------- server/src/db/mod.rs | 2 - server/src/db/ops.rs | 14 +--- server/src/db/recover.rs | 2 - server/src/http/mcp/diagnostics/mod.rs | 4 +- server/src/http/mcp/merge_tools.rs | 4 -- server/src/http/mcp/status_tools.rs | 19 +++-- server/src/http/mod.rs | 4 +- server/src/http/workflow/pipeline.rs | 6 +- server/src/io/watcher/mod.rs | 2 +- server/src/io/watcher/tests.rs | 6 +- server/src/pipeline_state/events.rs | 3 +- server/src/pipeline_state/mod.rs | 4 +- server/src/pipeline_state/projection.rs | 13 ++-- server/src/pipeline_state/tests.rs | 46 ++++++------ server/src/pipeline_state/transition.rs | 62 +++++++++------- server/src/pipeline_state/types.rs | 40 +++++++++-- server/src/service/agents/mod.rs | 4 +- server/src/service/notifications/format.rs | 35 ++++++--- server/src/service/timer/mod.rs | 2 +- server/src/service/work_item/assign.rs | 4 -- server/src/service/work_item/delete.rs | 2 - server/src/startup/project.rs | 2 + server/src/worktree/cleanup.rs | 2 +- server/src/worktree/sweep.rs | 8 ++- 56 files changed, 453 insertions(+), 384 deletions(-) diff --git a/server/src/agent_mode/claim.rs b/server/src/agent_mode/claim.rs index 922e9386..13450fea 100644 --- a/server/src/agent_mode/claim.rs +++ b/server/src/agent_mode/claim.rs @@ -78,39 +78,52 @@ mod tests { #[test] #[allow(clippy::string_slice)] // stale_holder is a hex/ASCII string literal; [..12] always valid fn stale_claim_displaced_and_logged() { - use crate::crdt_state::{ - init_for_test, our_node_id, read_item, write_claim, write_item_str, - }; + use crate::crdt_state::{init_for_test, our_node_id, read_item, write_claim, write_item}; + use crate::pipeline_state::{AgentClaim, AgentName, Stage}; + use chrono::TimeZone; init_for_test(); let story_id = "718_test_stale_displacement"; let stale_holder = "staledeadbeef0000000000000000000000000000"; // Place claimed_at well beyond the TTL so the claim is unambiguously stale. - let stale_time = chrono::Utc::now().timestamp() as f64 - CLAIM_TIMEOUT_SECS - 300.0; + let stale_time = chrono::Utc::now().timestamp() as u64 - CLAIM_TIMEOUT_SECS as u64 - 300; // Seed the story with a stale claim from a foreign node. - write_item_str( + write_item( story_id, - "2_current", + &Stage::Coding { + claim: Some(AgentClaim { + agent: AgentName(stale_holder.to_string()), + claimed_at: chrono::Utc + .timestamp_opt(stale_time as i64, 0) + .single() + .unwrap(), + }), + }, Some("Stale Claim Displacement Test"), None, None, None, - Some(stale_holder), - Some(stale_time), None, ); // Confirm the stale claim is in place. let before = read_item(story_id).expect("item should exist"); + let before_claim = match before.stage() { + Stage::Coding { claim } => claim.as_ref(), + Stage::Merge { claim, .. } => claim.as_ref(), + _ => None, + }; assert_eq!( - before.claim().map(|c| c.node.as_str()), + before_claim.map(|c| c.agent.0.as_str()), Some(stale_holder), "pre-condition: item should be claimed by the stale holder" ); let age = chrono::Utc::now().timestamp() as f64 - - before.claim().map(|c| c.at as f64).unwrap_or(0.0); + - before_claim + .map(|c| c.claimed_at.timestamp() as f64) + .unwrap_or(0.0); assert!( age >= CLAIM_TIMEOUT_SECS, "pre-condition: claim age ({age}s) must exceed TTL ({CLAIM_TIMEOUT_SECS}s)" @@ -136,13 +149,18 @@ mod tests { // Verify the new claim belongs to this node, not the stale holder. let our_id = our_node_id().expect("node id should be available after init_for_test"); let after = read_item(story_id).expect("item should still exist"); + let after_claim = match after.stage() { + Stage::Coding { claim } => claim.as_ref(), + Stage::Merge { claim, .. } => claim.as_ref(), + _ => None, + }; assert_eq!( - after.claim().map(|c| c.node.as_str()), + after_claim.map(|c| c.agent.0.as_str()), Some(our_id.as_str()), "new claim should have displaced the stale holder" ); assert_ne!( - after.claim().map(|c| c.node.as_str()), + after_claim.map(|c| c.agent.0.as_str()), Some(stale_holder), "stale holder must no longer own the claim" ); diff --git a/server/src/agent_mode/loop_ops.rs b/server/src/agent_mode/loop_ops.rs index 6e18e005..273afaa1 100644 --- a/server/src/agent_mode/loop_ops.rs +++ b/server/src/agent_mode/loop_ops.rs @@ -44,7 +44,7 @@ pub(super) async fn scan_and_claim( // Only claim stories in execution stages (Coding, Qa, Merge). if !matches!( item.stage(), - crate::pipeline_state::Stage::Coding + crate::pipeline_state::Stage::Coding { .. } | crate::pipeline_state::Stage::Qa | crate::pipeline_state::Stage::Merge { .. } ) { @@ -65,19 +65,25 @@ pub(super) async fn scan_and_claim( continue; } + let item_claim = match item.stage() { + crate::pipeline_state::Stage::Coding { claim } => claim.as_ref(), + crate::pipeline_state::Stage::Merge { claim, .. } => claim.as_ref(), + _ => None, + }; + // If already claimed by us, skip. - if item.claim().is_some_and(|c| c.node == our_node) { + if item_claim.is_some_and(|c| c.agent.0 == our_node) { continue; } // If claimed by another node, respect the claim while it is fresh. // Once the TTL expires the claim is considered stale regardless of // whether the holder appears alive — displacement is purely TTL-driven. - if let Some(claim) = item.claim() - && claim.node != our_node + if let Some(claim) = item_claim + && claim.agent.0 != our_node { let now = chrono::Utc::now().timestamp() as u64; - let age = now.saturating_sub(claim.at) as f64; + let age = now.saturating_sub(claim.claimed_at.timestamp() as u64) as f64; if age < CLAIM_TIMEOUT_SECS { // Claim is still fresh — respect it. continue; @@ -87,7 +93,7 @@ pub(super) async fn scan_and_claim( "[agent-mode] Displacing stale claim on '{}' held by {:.12}… \ (age {}s > TTL {}s)", item.story_id(), - claim.node, + claim.agent.0, age as u64, CLAIM_TIMEOUT_SECS as u64, ); @@ -179,7 +185,7 @@ pub(super) fn reclaim_timed_out_work(_project_root: &Path) { for item in &items { if !matches!( item.stage(), - crate::pipeline_state::Stage::Coding + crate::pipeline_state::Stage::Coding { .. } | crate::pipeline_state::Stage::Qa | crate::pipeline_state::Stage::Merge { .. } ) { @@ -189,13 +195,19 @@ pub(super) fn reclaim_timed_out_work(_project_root: &Path) { // Release the claim if the TTL has expired — regardless of whether the // holder is still alive. A node actively working should refresh its // claim before the TTL window closes. - if let Some(claim) = item.claim() { - let age = now as u64 - claim.at.min(now as u64); + let reclaim_claim = match item.stage() { + crate::pipeline_state::Stage::Coding { claim } => claim.as_ref(), + crate::pipeline_state::Stage::Merge { claim, .. } => claim.as_ref(), + _ => None, + }; + if let Some(claim) = reclaim_claim { + let claim_ts = claim.claimed_at.timestamp() as u64; + let age = now as u64 - claim_ts.min(now as u64); if age as f64 >= CLAIM_TIMEOUT_SECS { slog!( "[agent-mode] Releasing stale claim on '{}' held by {:.12}… (age {}s)", item.story_id(), - claim.node, + claim.agent.0, age, ); crdt_state::release_claim(item.story_id()); diff --git a/server/src/agents/lifecycle.rs b/server/src/agents/lifecycle.rs index 0e42e2f2..f44f4e3a 100644 --- a/server/src/agents/lifecycle.rs +++ b/server/src/agents/lifecycle.rs @@ -122,7 +122,7 @@ pub fn move_story_to_done(story_id: &str) -> Result<(), String> { merge_commit: GitSha("accepted".to_string()), }, Stage::MergeFailure { .. } => PipelineEvent::Accepted, - Stage::Coding | Stage::Qa | Stage::Backlog => PipelineEvent::Close, + Stage::Coding { .. } | Stage::Qa | Stage::Backlog => PipelineEvent::Close, _ => { return Err(format!( "Work item '{story_id}' is in {} — cannot move to done.", @@ -160,7 +160,7 @@ pub fn move_story_to_merge(story_id: &str) -> Result<(), String> { let commits = NonZeroU32::new(1).expect("1 is non-zero"); let event = match &item.stage { - Stage::Coding => PipelineEvent::QaSkipped { + Stage::Coding { .. } => PipelineEvent::QaSkipped { feature_branch: branch, commits_ahead: commits, }, @@ -383,8 +383,8 @@ fn map_stage_move_to_event( match (from, target) { (Stage::Upcoming, "backlog") => Ok(PipelineEvent::Triage), (Stage::Backlog, "current") => Ok(PipelineEvent::DepsMet), - (Stage::Coding, "qa") => Ok(PipelineEvent::GatesStarted), - (Stage::Coding, "merge") => Ok(PipelineEvent::QaSkipped { + (Stage::Coding { .. }, "qa") => Ok(PipelineEvent::GatesStarted), + (Stage::Coding { .. }, "merge") => Ok(PipelineEvent::QaSkipped { feature_branch: branch(), commits_ahead: nz1(), }), @@ -392,7 +392,7 @@ fn map_stage_move_to_event( feature_branch: branch(), commits_ahead: nz1(), }), - (Stage::Coding, "backlog") + (Stage::Coding { .. }, "backlog") | (Stage::Qa, "backlog") | (Stage::Merge { .. }, "backlog") | (Stage::Blocked { .. }, "backlog") => Ok(PipelineEvent::Demote), @@ -402,7 +402,7 @@ fn map_stage_move_to_event( (Stage::Merge { .. }, "done") => Ok(PipelineEvent::MergeSucceeded { merge_commit: GitSha("manual".to_string()), }), - (Stage::Coding | Stage::Qa | Stage::Backlog, "done") => Ok(PipelineEvent::Close), + (Stage::Coding { .. } | Stage::Qa | Stage::Backlog, "done") => Ok(PipelineEvent::Close), (Stage::Blocked { .. }, "current") => Ok(PipelineEvent::Unblock), // Story 919: MergeFailure + Unblock goes to Merge (re-attempt); manual // demotion to backlog uses Demote to park it without a retry. @@ -530,7 +530,7 @@ fn stage_to_name(s: &Stage) -> &'static str { match s { Stage::Upcoming => "upcoming", Stage::Backlog => "backlog", - Stage::Coding => "current", + Stage::Coding { .. } => "current", Stage::Blocked { .. } => "blocked", Stage::Qa => "qa", Stage::Merge { .. } => "merge", @@ -714,7 +714,7 @@ mod tests { "should return to coding after unblock" ); assert!( - matches!(item.stage, Stage::Coding), + matches!(item.stage, Stage::Coding { .. }), "stage should be Stage::Coding after unblock" ); } diff --git a/server/src/agents/pool/auto_assign/merge.rs b/server/src/agents/pool/auto_assign/merge.rs index 6091b199..c5d88446 100644 --- a/server/src/agents/pool/auto_assign/merge.rs +++ b/server/src/agents/pool/auto_assign/merge.rs @@ -38,6 +38,7 @@ impl AgentPool { let merge_stage = Stage::Merge { feature_branch: BranchName(String::new()), commits_ahead: NonZeroU32::new(1).expect("1 is non-zero"), + claim: None, }; let merge_items = scan_stage_items(&merge_stage); for story_id in &merge_items { diff --git a/server/src/agents/pool/auto_assign/merge_failure_subscriber.rs b/server/src/agents/pool/auto_assign/merge_failure_subscriber.rs index d57bafd9..79498048 100644 --- a/server/src/agents/pool/auto_assign/merge_failure_subscriber.rs +++ b/server/src/agents/pool/auto_assign/merge_failure_subscriber.rs @@ -160,6 +160,7 @@ mod tests { before: crate::pipeline_state::Stage::Merge { feature_branch: BranchName("feature/test".to_string()), commits_ahead: NonZeroU32::new(1).unwrap(), + claim: None, }, after: crate::pipeline_state::Stage::MergeFailure { kind: kind.clone(), diff --git a/server/src/agents/pool/auto_assign/pipeline.rs b/server/src/agents/pool/auto_assign/pipeline.rs index 0aa534e5..a78bf70b 100644 --- a/server/src/agents/pool/auto_assign/pipeline.rs +++ b/server/src/agents/pool/auto_assign/pipeline.rs @@ -27,7 +27,7 @@ impl AgentPool { /// here as well. pub(super) async fn assign_pipeline_stages(&self, project_root: &Path, config: &ProjectConfig) { let stages: [(Stage, PipelineStage); 2] = [ - (Stage::Coding, PipelineStage::Coder), + (Stage::Coding { claim: None }, PipelineStage::Coder), (Stage::Qa, PipelineStage::Qa), ]; diff --git a/server/src/agents/pool/auto_assign/reconcile.rs b/server/src/agents/pool/auto_assign/reconcile.rs index db509a00..b38398ed 100644 --- a/server/src/agents/pool/auto_assign/reconcile.rs +++ b/server/src/agents/pool/auto_assign/reconcile.rs @@ -149,7 +149,7 @@ impl AgentPool { stage.dir_name() ); - if matches!(stage, Stage::Coding) { + if matches!(stage, Stage::Coding { .. }) { // Coder stage — determine qa mode to decide next step. let qa_mode = { let item_type = crate::agents::lifecycle::item_type_from_id(story_id); diff --git a/server/src/agents/pool/auto_assign/scan.rs b/server/src/agents/pool/auto_assign/scan.rs index 5ec709e8..0faa2273 100644 --- a/server/src/agents/pool/auto_assign/scan.rs +++ b/server/src/agents/pool/auto_assign/scan.rs @@ -220,7 +220,7 @@ mod tests { crate::db::ItemMeta::named("baz"), ); - let items = scan_stage_items(&Stage::Coding); + let items = scan_stage_items(&Stage::Coding { claim: None }); // The global CRDT may contain items from other tests, so check // that our three items are present and appear in sorted order. assert!( diff --git a/server/src/agents/pool/auto_assign/story_checks.rs b/server/src/agents/pool/auto_assign/story_checks.rs index 49066e98..b9385cb1 100644 --- a/server/src/agents/pool/auto_assign/story_checks.rs +++ b/server/src/agents/pool/auto_assign/story_checks.rs @@ -97,8 +97,6 @@ mod tests { None, None, None, - None, - None, ); assert!(has_review_hold("890_spike_held")); } @@ -115,8 +113,6 @@ mod tests { None, None, None, - None, - None, ); assert!(!has_review_hold("890_spike_active_qa")); } @@ -193,8 +189,6 @@ mod tests { None, Some("[999]"), None, - None, - None, ); assert!(has_unmet_dependencies("10_story_blocked")); } @@ -210,8 +204,6 @@ mod tests { None, None, None, - None, - None, ); crate::crdt_state::write_item_str( "10_story_ok", @@ -221,8 +213,6 @@ mod tests { None, Some("[999]"), None, - None, - None, ); assert!(!has_unmet_dependencies("10_story_ok")); } @@ -238,8 +228,6 @@ mod tests { None, None, None, - None, - None, ); assert!(!has_unmet_dependencies("5_story_free")); } @@ -258,8 +246,6 @@ mod tests { None, None, None, - None, - None, ); crate::crdt_state::write_item_str( "503_story_dependent", @@ -269,8 +255,6 @@ mod tests { None, Some("[500]"), None, - None, - None, ); let archived_deps = check_archived_dependencies("503_story_dependent"); assert_eq!(archived_deps, vec![500]); @@ -288,8 +272,6 @@ mod tests { None, None, None, - None, - None, ); crate::crdt_state::write_item_str( "503_story_waiting", @@ -299,8 +281,6 @@ mod tests { None, Some("[490]"), None, - None, - None, ); let archived_deps = check_archived_dependencies("503_story_waiting"); assert!(archived_deps.is_empty()); diff --git a/server/src/agents/pool/auto_assign/watchdog/tests/limits_tests.rs b/server/src/agents/pool/auto_assign/watchdog/tests/limits_tests.rs index 308ec76f..c0447c22 100644 --- a/server/src/agents/pool/auto_assign/watchdog/tests/limits_tests.rs +++ b/server/src/agents/pool/auto_assign/watchdog/tests/limits_tests.rs @@ -251,8 +251,6 @@ max_turns = 10 None, None, None, - None, - None, ); // 12 turns in a single session exceeds the configured max of 10. @@ -381,8 +379,6 @@ max_turns = 10 None, None, None, - None, - None, ); // Prior session with 5 turns (under limit alone). @@ -460,8 +456,6 @@ max_turns = 10 None, None, None, - None, - None, ); // Session 1: exceeds limit → retry_count=1 in CRDT, NOT blocked. diff --git a/server/src/agents/pool/start/tests_selection.rs b/server/src/agents/pool/start/tests_selection.rs index 73c01684..214fac3f 100644 --- a/server/src/agents/pool/start/tests_selection.rs +++ b/server/src/agents/pool/start/tests_selection.rs @@ -294,8 +294,6 @@ stage = "coder" None, None, None, - None, - None, ); let pool = AgentPool::new_test(3011); diff --git a/server/src/agents/pool/start/validation.rs b/server/src/agents/pool/start/validation.rs index 46333f5b..a2748a6d 100644 --- a/server/src/agents/pool/start/validation.rs +++ b/server/src/agents/pool/start/validation.rs @@ -35,7 +35,7 @@ pub(super) fn validate_agent_stage( return Ok(()); }; let expected_stage = match story_stage { - Stage::Coding => PipelineStage::Coder, + Stage::Coding { .. } => PipelineStage::Coder, Stage::Qa => PipelineStage::Qa, Stage::Merge { .. } => PipelineStage::Mergemaster, _ => PipelineStage::Other, diff --git a/server/src/agents/pool/worktree.rs b/server/src/agents/pool/worktree.rs index 5787ac70..cab9bb03 100644 --- a/server/src/agents/pool/worktree.rs +++ b/server/src/agents/pool/worktree.rs @@ -30,7 +30,7 @@ pub(super) fn find_active_story_stage( if let Ok(Some(item)) = crate::pipeline_state::read_typed(story_id) && matches!( item.stage, - crate::pipeline_state::Stage::Coding + crate::pipeline_state::Stage::Coding { .. } | crate::pipeline_state::Stage::Qa | crate::pipeline_state::Stage::Merge { .. } ) @@ -56,7 +56,7 @@ mod tests { let tmp = tempfile::tempdir().unwrap(); assert!(matches!( find_active_story_stage(tmp.path(), "10_story_test"), - Some(crate::pipeline_state::Stage::Coding) + Some(crate::pipeline_state::Stage::Coding { .. }) )); } diff --git a/server/src/agents/pool/worktree_lifecycle.rs b/server/src/agents/pool/worktree_lifecycle.rs index fa4236ae..8dbe1070 100644 --- a/server/src/agents/pool/worktree_lifecycle.rs +++ b/server/src/agents/pool/worktree_lifecycle.rs @@ -22,7 +22,7 @@ pub(crate) fn spawn_worktree_create_subscriber(project_root: PathBuf, port: u16) loop { match rx.recv().await { Ok(fired) => { - if matches!(fired.after, Stage::Coding) { + if matches!(fired.after, Stage::Coding { .. }) { on_coding_transition(&project_root, port, &fired.story_id.0).await; } } diff --git a/server/src/chat/commands/backlog.rs b/server/src/chat/commands/backlog.rs index ebd89b38..5c77d997 100644 --- a/server/src/chat/commands/backlog.rs +++ b/server/src/chat/commands/backlog.rs @@ -90,7 +90,11 @@ mod tests { fn backlog_shows_only_backlog_stage_items() { let items = vec![ make_item("10_story_in_backlog", "In Backlog", Stage::Backlog), - make_item("20_story_in_progress", "In Progress", Stage::Coding), + make_item( + "20_story_in_progress", + "In Progress", + Stage::Coding { claim: None }, + ), make_item("30_story_in_qa", "In QA", Stage::Qa), ]; let output = build_backlog_from_items(&items); @@ -227,7 +231,11 @@ mod tests { #[test] fn backlog_shows_none_when_empty() { - let items = vec![make_item("1_story_done", "Done", Stage::Coding)]; + let items = vec![make_item( + "1_story_done", + "Done", + Stage::Coding { claim: None }, + )]; let output = build_backlog_from_items(&items); assert!( output.contains("*(none)*"), diff --git a/server/src/chat/commands/freeze.rs b/server/src/chat/commands/freeze.rs index 02b801be..3dca2b07 100644 --- a/server/src/chat/commands/freeze.rs +++ b/server/src/chat/commands/freeze.rs @@ -239,7 +239,7 @@ mod tests { .expect("read_typed should succeed") .expect("item should be present"); assert!( - matches!(item.stage, crate::pipeline_state::Stage::Coding), + matches!(item.stage, crate::pipeline_state::Stage::Coding { .. }), "stage should be restored to Coding: {:?}", item.stage ); diff --git a/server/src/chat/commands/status/render.rs b/server/src/chat/commands/status/render.rs index 445e85c4..05cb6ec2 100644 --- a/server/src/chat/commands/status/render.rs +++ b/server/src/chat/commands/status/render.rs @@ -16,7 +16,7 @@ use std::collections::{HashMap, HashSet}; pub(crate) fn display_section(s: &Stage) -> Option<&'static str> { match s { Stage::Upcoming | Stage::Backlog => Some("Backlog"), - Stage::Coding + Stage::Coding { .. } | Stage::Blocked { .. } | Stage::Archived { reason: ArchiveReason::Blocked { .. }, diff --git a/server/src/chat/commands/status/tests.rs b/server/src/chat/commands/status/tests.rs index 5b7632e6..04e101a9 100644 --- a/server/src/chat/commands/status/tests.rs +++ b/server/src/chat/commands/status/tests.rs @@ -137,7 +137,7 @@ fn status_does_not_show_full_filename_stem() { let items = vec![make_item( "293_story_register_all_bot_commands", "Register all bot commands", - Stage::Coding, + Stage::Coding { claim: None }, )]; let agents = AgentPool::new_test(3000); @@ -163,7 +163,7 @@ fn status_shows_cost_when_token_usage_exists() { let items = vec![make_item( "293_story_register_all_bot_commands", "Register all bot commands", - Stage::Coding, + Stage::Coding { claim: None }, )]; // Write token usage for this story. @@ -199,7 +199,7 @@ fn status_no_cost_when_no_usage() { let items = vec![make_item( "293_story_register_all_bot_commands", "Register all bot commands", - Stage::Coding, + Stage::Coding { claim: None }, )]; let agents = AgentPool::new_test(3000); @@ -219,7 +219,7 @@ fn status_aggregates_multiple_records_per_story() { let items = vec![make_item( "293_story_register_all_bot_commands", "Register all bot commands", - Stage::Coding, + Stage::Coding { claim: None }, )]; // Write two records for the same story — costs should be summed. @@ -262,7 +262,7 @@ fn status_shows_waiting_on_for_story_with_unmet_deps() { make_item_with_deps( "10_story_waiting", "Waiting Story", - Stage::Coding, + Stage::Coding { claim: None }, vec![999], ), make_item("999_story_dep", "Dep Story", Stage::Backlog), @@ -287,7 +287,7 @@ fn status_does_not_show_waiting_on_when_dep_is_done() { make_item_with_deps( "10_story_unblocked", "Unblocked Story", - Stage::Coding, + Stage::Coding { claim: None }, vec![999], ), make_item( @@ -314,7 +314,11 @@ fn status_shows_no_waiting_info_when_no_deps() { use tempfile::TempDir; let tmp = TempDir::new().unwrap(); - let items = vec![make_item("42_story_nodeps", "No Deps Story", Stage::Coding)]; + let items = vec![make_item( + "42_story_nodeps", + "No Deps Story", + Stage::Coding { claim: None }, + )]; let agents = AgentPool::new_test(3000); let output = build_status_from_items(tmp.path(), &agents, &items); @@ -377,7 +381,7 @@ fn stage_is_blocked_returns_true_for_archived_blocked() { #[test] fn stage_is_blocked_returns_false_for_coding() { assert!(!matches!( - Stage::Coding, + Stage::Coding { claim: None }, Stage::Blocked { .. } | Stage::MergeFailure { .. } | Stage::MergeFailureFinal { .. } @@ -413,7 +417,11 @@ fn status_shows_idle_dot_for_unassigned_story() { use tempfile::TempDir; let tmp = TempDir::new().unwrap(); - let items = vec![make_item("42_story_idle", "Idle Story", Stage::Coding)]; + let items = vec![make_item( + "42_story_idle", + "Idle Story", + Stage::Coding { claim: None }, + )]; let agents = AgentPool::new_test(3000); let output = build_status_from_items(tmp.path(), &agents, &items); @@ -503,6 +511,7 @@ fn merge_stage() -> Stage { Stage::Merge { feature_branch: BranchName("feature/test".to_string()), commits_ahead: std::num::NonZeroU32::new(1).unwrap(), + claim: None, } } @@ -779,7 +788,11 @@ fn in_progress_count_includes_blocked_items() { let tmp = TempDir::new().unwrap(); let items = vec![ - make_item("10_story_coding", "Coding Story", Stage::Coding), + make_item( + "10_story_coding", + "Coding Story", + Stage::Coding { claim: None }, + ), make_item( "11_story_blocked", "Blocked Story", @@ -810,7 +823,7 @@ fn frozen_coding_item_appears_in_in_progress_section() { "60_story_frozen", "Frozen Coding Story", Stage::Frozen { - resume_to: Box::new(Stage::Coding), + resume_to: Box::new(Stage::Coding { claim: None }), }, )]; @@ -868,7 +881,7 @@ fn frozen_item_shows_snowflake_indicator() { "80_story_frozen_flake", "Frozen Flake Story", Stage::Frozen { - resume_to: Box::new(Stage::Coding), + resume_to: Box::new(Stage::Coding { claim: None }), }, )]; @@ -898,7 +911,7 @@ fn frozen_and_blocked_use_distinct_indicators() { "91_story_frozen_ind", "Frozen Story", Stage::Frozen { - resume_to: Box::new(Stage::Coding), + resume_to: Box::new(Stage::Coding { claim: None }), }, ), ]; diff --git a/server/src/chat/commands/unblock.rs b/server/src/chat/commands/unblock.rs index b8940a4c..07a31c12 100644 --- a/server/src/chat/commands/unblock.rs +++ b/server/src/chat/commands/unblock.rs @@ -221,8 +221,6 @@ mod tests { Some(5), None, None, - None, - None, ); let output = unblock_cmd_with_root(tmp.path(), "9903").unwrap(); @@ -299,8 +297,6 @@ mod tests { Some(5), None, None, - None, - None, ); let output = unblock_cmd_with_root(tmp.path(), "9904").unwrap(); @@ -358,8 +354,6 @@ mod tests { Some(3), None, None, - None, - None, ); let output = unblock_cmd_with_root(tmp.path(), "9901").unwrap(); diff --git a/server/src/chat/transport/matrix/assign.rs b/server/src/chat/transport/matrix/assign.rs index 78bd635c..3d742918 100644 --- a/server/src/chat/transport/matrix/assign.rs +++ b/server/src/chat/transport/matrix/assign.rs @@ -324,8 +324,6 @@ mod tests { None, None, None, - None, - None, ); let agents = std::sync::Arc::new(AgentPool::new_test(3000)); @@ -379,8 +377,6 @@ mod tests { None, None, None, - None, - None, ); let agents = std::sync::Arc::new(AgentPool::new_test(3000)); @@ -429,8 +425,6 @@ mod tests { None, None, None, - None, - None, ); let agents = std::sync::Arc::new(AgentPool::new_test(3000)); diff --git a/server/src/chat/transport/matrix/delete.rs b/server/src/chat/transport/matrix/delete.rs index a9404dc3..d3cfb8b4 100644 --- a/server/src/chat/transport/matrix/delete.rs +++ b/server/src/chat/transport/matrix/delete.rs @@ -108,7 +108,7 @@ fn stage_display_label(stage: &crate::pipeline_state::Stage) -> &'static str { match stage { Stage::Upcoming => "upcoming", Stage::Backlog => "backlog", - Stage::Coding => "in-progress", + Stage::Coding { .. } => "in-progress", Stage::Blocked { .. } => "blocked", Stage::Qa => "QA", Stage::Merge { .. } => "merge", @@ -254,8 +254,6 @@ mod tests { None, None, None, - None, - None, ); // Seed in content store so find_story_by_number can resolve it. diff --git a/server/src/crdt_snapshot/tests.rs b/server/src/crdt_snapshot/tests.rs index 0a94a9ad..64d7637a 100644 --- a/server/src/crdt_snapshot/tests.rs +++ b/server/src/crdt_snapshot/tests.rs @@ -239,8 +239,6 @@ fn snapshot_generation_includes_manifest() { None, None, None, - None, - None, ); crate::crdt_state::write_item_str( "636_test_b", @@ -250,8 +248,6 @@ fn snapshot_generation_includes_manifest() { None, None, None, - None, - None, ); let snapshot = generate_snapshot(); @@ -282,8 +278,6 @@ fn attribution_query_by_story_id() { None, None, None, - None, - None, ); let snapshot = generate_snapshot().unwrap(); @@ -319,8 +313,6 @@ fn compaction_reduces_ops() { None, None, None, - None, - None, ); } @@ -357,8 +349,6 @@ fn latest_snapshot_available_after_compaction() { None, None, None, - None, - None, ); let snapshot = generate_snapshot().unwrap(); @@ -629,8 +619,6 @@ fn attribution_preserved_after_compaction() { None, None, None, - None, - None, ); crate::crdt_state::write_item_str( "636_archived_story", @@ -640,8 +628,6 @@ fn attribution_preserved_after_compaction() { None, None, None, - None, - None, ); crate::crdt_state::write_item_str( "636_archived_story", @@ -651,8 +637,6 @@ fn attribution_preserved_after_compaction() { None, None, None, - None, - None, ); // Generate snapshot. diff --git a/server/src/crdt_state/mod.rs b/server/src/crdt_state/mod.rs index d9d315e3..c62c1300 100644 --- a/server/src/crdt_state/mod.rs +++ b/server/src/crdt_state/mod.rs @@ -47,16 +47,16 @@ pub use read::{ }; pub use state::{init, subscribe}; pub use types::{ - ActiveAgentCrdt, ActiveAgentView, AgentThrottleCrdt, AgentThrottleView, Claim, CrdtEvent, - EpicId, GatewayConfigCrdt, GatewayProjectCrdt, GatewayProjectView, MergeJobCrdt, MergeJobView, + ActiveAgentCrdt, ActiveAgentView, AgentThrottleCrdt, AgentThrottleView, CrdtEvent, EpicId, + GatewayConfigCrdt, GatewayProjectCrdt, GatewayProjectView, MergeJobCrdt, MergeJobView, NodePresenceCrdt, NodePresenceView, PipelineDoc, PipelineItemCrdt, PipelineItemView, TestJobCrdt, TestJobView, TokenUsageCrdt, TokenUsageView, WorkItem, }; pub use write::{ bump_retry_count, migrate_legacy_stage_strings, migrate_merge_job, migrate_names_from_slugs, - migrate_story_ids_to_numeric, name_from_story_id, set_agent, set_depends_on, set_epic, - set_item_type, set_name, set_qa_mode, set_resume_to, set_resume_to_raw, set_retry_count, - write_item, + migrate_node_claims_to_agent_claims, migrate_story_ids_to_numeric, name_from_story_id, + set_agent, set_depends_on, set_epic, set_item_type, set_name, set_qa_mode, set_resume_to, + set_resume_to_raw, set_retry_count, write_item, }; #[cfg(test)] diff --git a/server/src/crdt_state/ops.rs b/server/src/crdt_state/ops.rs index 4b2c6e0f..a6669d2d 100644 --- a/server/src/crdt_state/ops.rs +++ b/server/src/crdt_state/ops.rs @@ -550,8 +550,6 @@ mod tests { None, None, None, - None, - None, ); assert!( read_item(story_id).is_none(), diff --git a/server/src/crdt_state/presence.rs b/server/src/crdt_state/presence.rs index 44c93cad..761b0c7f 100644 --- a/server/src/crdt_state/presence.rs +++ b/server/src/crdt_state/presence.rs @@ -51,7 +51,7 @@ pub fn sign_versioned_challenge(nonce: &str) -> Option<(String, String)> { /// Write a claim on a pipeline item via CRDT. /// -/// Sets `claimed_by` to this node's ID and `claimed_at` to the current time. +/// Sets `claim_agent` to this node's ID and `claim_ts` to the current time. /// The LWW register ensures deterministic conflict resolution — if two nodes /// claim the same item simultaneously, both will converge to the same winner /// after CRDT sync. @@ -76,14 +76,14 @@ pub fn write_claim(story_id: &str) -> bool { }; apply_and_persist(&mut state, |s| { - s.crdt.doc.items[idx].claimed_by.set(node_id.clone()) + s.crdt.doc.items[idx].claim_agent.set(node_id.clone()) }); - apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].claimed_at.set(now)); + apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].claim_ts.set(now)); true } -/// Release a claim on a pipeline item (clear claimed_by and claimed_at). +/// Release a claim on a pipeline item (clear claim_agent and claim_ts). pub fn release_claim(story_id: &str) { let Some(state_mutex) = get_crdt() else { return; @@ -96,9 +96,9 @@ pub fn release_claim(story_id: &str) { }; apply_and_persist(&mut state, |s| { - s.crdt.doc.items[idx].claimed_by.set(String::new()) + s.crdt.doc.items[idx].claim_agent.set(String::new()) }); - apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].claimed_at.set(0.0)); + apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].claim_ts.set(0.0)); } /// Check if this node currently holds the claim on a pipeline item. @@ -109,7 +109,12 @@ pub fn is_claimed_by_us(story_id: &str) -> bool { let Some(item) = read_item(story_id) else { return false; }; - item.claim().is_some_and(|c| c.node == node_id) + let claim = match item.stage() { + crate::pipeline_state::Stage::Coding { claim } => claim.as_ref(), + crate::pipeline_state::Stage::Merge { claim, .. } => claim.as_ref(), + _ => None, + }; + claim.is_some_and(|c| c.agent.0 == node_id) } /// Write or update a node presence entry in the CRDT. diff --git a/server/src/crdt_state/read.rs b/server/src/crdt_state/read.rs index 68daf7b1..c049da90 100644 --- a/server/src/crdt_state/read.rs +++ b/server/src/crdt_state/read.rs @@ -22,8 +22,10 @@ pub struct CrdtItemDump { pub agent: Option, pub retry_count: Option, pub depends_on: Option>, - pub claimed_by: Option, - pub claimed_at: Option, + /// Agent name holding the claim, or `None` when unclaimed. + pub claim_agent: Option, + /// Unix timestamp (seconds) when the claim was written. + pub claim_ts: Option, /// Hex-encoded OpId of the list insert op — cross-reference with `crdt_ops`. pub content_index: String, pub is_deleted: bool, @@ -139,11 +141,11 @@ pub fn dump_crdt_state(story_id_filter: Option<&str>) -> CrdtStateDump { _ => None, }; - let claimed_by = match item_crdt.claimed_by.view() { + let claim_agent = match item_crdt.claim_agent.view() { JsonValue::String(s) if !s.is_empty() => Some(s), _ => None, }; - let claimed_at = match item_crdt.claimed_at.view() { + let claim_ts = match item_crdt.claim_ts.view() { JsonValue::Number(n) if n > 0.0 => Some(n), _ => None, }; @@ -157,8 +159,8 @@ pub fn dump_crdt_state(story_id_filter: Option<&str>) -> CrdtStateDump { agent, retry_count, depends_on, - claimed_by, - claimed_at, + claim_agent, + claim_ts, content_index, is_deleted: op.is_deleted, }); @@ -326,7 +328,7 @@ pub fn evict_item(story_id: &str) -> Result<(), String> { /// string, or with no name set, are filtered out (`None`) — a nameless item /// is treated as malformed and never surfaces to callers. pub(super) fn extract_item_view(item: &PipelineItemCrdt) -> Option { - use super::types::{Claim, EpicId}; + use super::types::EpicId; use crate::io::story_metadata::{ItemType, QaMode}; let story_id = match item.story_id.view() { @@ -357,18 +359,17 @@ pub(super) fn extract_item_view(item: &PipelineItemCrdt) -> Option Vec::new(), }; - let claimed_by = match item.claimed_by.view() { + // `claim_agent`/`claim_ts` are read only to embed in Stage::Coding / + // Stage::Merge via `project_stage_for_view`; they are not stored on + // `WorkItem` directly (story 1009: readers project from the Stage variant). + let claim_agent = match item.claim_agent.view() { JsonValue::String(s) if !s.is_empty() => Some(s), _ => None, }; - let claimed_at_secs = match item.claimed_at.view() { + let claim_ts_secs = match item.claim_ts.view() { JsonValue::Number(n) if n > 0.0 => Some(n as u64), _ => None, }; - let claim = match (claimed_by, claimed_at_secs) { - (Some(node), Some(at)) => Some(Claim { node, at }), - _ => None, - }; // `merged_at` is read only to project into `Stage::Done`; it is not // stored on `WorkItem` (callers access it via `Stage::Done { merged_at }`). @@ -397,8 +398,14 @@ pub(super) fn extract_item_view(item: &PipelineItemCrdt) -> Option None, }; - let stage = - project_stage_for_view(&stage_str, &story_id, merged_at_float, resume_to.as_deref())?; + let stage = project_stage_for_view( + &stage_str, + &story_id, + merged_at_float, + resume_to.as_deref(), + claim_agent.as_deref(), + claim_ts_secs, + )?; Some(PipelineItemView { story_id, @@ -407,7 +414,6 @@ pub(super) fn extract_item_view(item: &PipelineItemCrdt) -> Option, resume_to: Option<&str>, + claim_agent: Option<&str>, + claim_ts_secs: Option, ) -> Option { - use crate::pipeline_state::{ArchiveReason, BranchName, GitSha, Stage}; - use chrono::{DateTime, Utc}; + use crate::pipeline_state::{AgentClaim, AgentName, ArchiveReason, BranchName, GitSha, Stage}; + use chrono::{DateTime, TimeZone, Utc}; use std::num::NonZeroU32; // Normalise legacy directory-style strings to their clean wire form so @@ -458,13 +466,30 @@ fn project_stage_for_view( // Story 945: resume target for `Frozen` / `ReviewHold` variants is stored // in the sibling `resume_to` register. Fall back to `Coding` when the // register is empty or holds an unrecognised value. - let resume_target = - || -> Box { Box::new(resume_to.and_then(Stage::from_dir).unwrap_or(Stage::Coding)) }; + let resume_target = || -> Box { + Box::new( + resume_to + .and_then(Stage::from_dir) + .unwrap_or(Stage::Coding { claim: None }), + ) + }; + + // Story 1009: reconstruct AgentClaim from `claim_agent`/`claim_ts` registers. + let claim = match (claim_agent, claim_ts_secs) { + (Some(agent_str), Some(ts)) => Some(AgentClaim { + agent: AgentName(agent_str.to_string()), + claimed_at: Utc + .timestamp_opt(ts as i64, 0) + .single() + .unwrap_or(DateTime::::UNIX_EPOCH), + }), + _ => None, + }; match clean { "upcoming" => Some(Stage::Upcoming), "backlog" => Some(Stage::Backlog), - "coding" => Some(Stage::Coding), + "coding" => Some(Stage::Coding { claim }), "qa" => Some(Stage::Qa), "blocked" => Some(Stage::Blocked { reason: String::new(), @@ -472,6 +497,7 @@ fn project_stage_for_view( "merge" => Some(Stage::Merge { feature_branch: BranchName(format!("feature/story-{story_id}")), commits_ahead: NonZeroU32::new(1).expect("1 is non-zero"), + claim, }), "merge_failure" => { // Story 986: read the typed kind directly from ContentKey::MergeFailureKind @@ -709,8 +735,6 @@ mod tests { None, None, None, - None, - None, ); // The story is live on this node. @@ -779,8 +803,6 @@ mod tests { None, None, None, - None, - None, ); assert!( read_item(story_id).is_none(), diff --git a/server/src/crdt_state/state/tests.rs b/server/src/crdt_state/state/tests.rs index cdef6a4a..4ef73937 100644 --- a/server/src/crdt_state/state/tests.rs +++ b/server/src/crdt_state/state/tests.rs @@ -117,8 +117,6 @@ async fn subscribe_receives_stage_transition_events() { None, None, None, - None, - None, ); let evt: CrdtEvent = rx.try_recv().expect("expected CrdtEvent on insert"); @@ -138,8 +136,6 @@ async fn subscribe_receives_stage_transition_events() { None, None, None, - None, - None, ); let evt: CrdtEvent = rx.try_recv().expect("expected CrdtEvent on stage change"); @@ -148,7 +144,10 @@ async fn subscribe_receives_stage_transition_events() { evt.from_stage, Some(crate::pipeline_state::Stage::Backlog) )); - assert!(matches!(evt.to_stage, crate::pipeline_state::Stage::Coding)); + assert!(matches!( + evt.to_stage, + crate::pipeline_state::Stage::Coding { .. } + )); } #[tokio::test] diff --git a/server/src/crdt_state/types.rs b/server/src/crdt_state/types.rs index f3c8ee82..c85cd851 100644 --- a/server/src/crdt_state/types.rs +++ b/server/src/crdt_state/types.rs @@ -65,14 +65,13 @@ pub struct PipelineItemCrdt { pub agent: LwwRegisterCrdt, pub retry_count: LwwRegisterCrdt, pub depends_on: LwwRegisterCrdt, - /// Node ID (hex-encoded Ed25519 pubkey) of the node that claimed this item. - /// Used for distributed work claiming — the LWW register resolves conflicts - /// deterministically so all nodes converge on the same claimer. - pub claimed_by: LwwRegisterCrdt, + /// Name of the agent (e.g. `"coder-1"`) that has claimed this item. + /// Empty string means the item is unclaimed. Replaces the legacy + /// `claimed_by` node-hex register (story 1009). + pub claim_agent: LwwRegisterCrdt, /// Unix timestamp (seconds) when the claim was written. - /// Used for timeout-based reclaim: if a node crashes, other nodes can - /// reclaim the item after the timeout expires. - pub claimed_at: LwwRegisterCrdt, + /// Zero means no active claim. Previously named `claimed_at`. + pub claim_ts: LwwRegisterCrdt, /// Unix timestamp (seconds) when the item was merged to master. /// Written once when the item transitions to `5_done`. Used by the /// sweep loop to determine when to promote to `6_archived`. @@ -121,18 +120,6 @@ pub struct NodePresenceCrdt { // ── Read-side view types ───────────────────────────────────────────── -/// Active claim on a pipeline item — node that owns it and when the claim was written. -/// -/// Both fields must be present for a claim to be valid; a partial claim (node -/// but no timestamp, or vice versa) is treated as absent by `extract_item_view`. -#[derive(Clone, Debug, PartialEq, Eq)] -pub struct Claim { - /// Hex-encoded Ed25519 public key of the node that holds the claim. - pub node: String, - /// Unix timestamp (seconds, integer) when the claim was written. - pub at: u64, -} - /// Numeric identifier for an epic work item. /// /// The numeric prefix of the epic's story_id (e.g. `EpicId(9990)` for the @@ -203,9 +190,6 @@ pub struct WorkItem { pub(super) retry_count: u32, /// Dependency story numbers — empty `Vec` when the register is unset. pub(super) depends_on: Vec, - /// Active claim (node + timestamp). `None` when the item is unclaimed or - /// when only one of the two companion registers is set. - pub(super) claim: Option, /// QA mode override. `None` means "use the project default". pub(super) qa_mode: Option, /// Item type. `None` means "infer from the story_id slug prefix". @@ -248,11 +232,6 @@ impl WorkItem { &self.depends_on } - /// Active claim on this item, or `None` when unclaimed. - pub fn claim(&self) -> Option<&Claim> { - self.claim.as_ref() - } - /// QA mode override, or `None` when the register is unset (use project default). pub fn qa_mode(&self) -> Option { self.qa_mode @@ -281,7 +260,6 @@ impl WorkItem { agent: Option, retry_count: u32, depends_on: Vec, - claim: Option, qa_mode: Option, item_type: Option, epic: Option, @@ -293,7 +271,6 @@ impl WorkItem { agent, retry_count, depends_on, - claim, qa_mode, item_type, epic, @@ -480,8 +457,8 @@ mod tests { "retry_count": 0.0, "blocked": false, "depends_on": "", - "claimed_by": "", - "claimed_at": 0.0, + "claim_agent": "", + "claim_ts": 0.0, }) .into(); @@ -515,8 +492,8 @@ mod tests { "retry_count": 0.0, "blocked": false, "depends_on": "", - "claimed_by": "", - "claimed_at": 0.0, + "claim_agent": "", + "claim_ts": 0.0, }) .into(); @@ -541,7 +518,7 @@ mod tests { let evt = CrdtEvent { story_id: "42_story_foo".to_string(), from_stage: Some(crate::pipeline_state::Stage::Backlog), - to_stage: crate::pipeline_state::Stage::Coding, + to_stage: crate::pipeline_state::Stage::Coding { claim: None }, name: "Foo Feature".to_string(), }; assert_eq!(evt.story_id, "42_story_foo"); @@ -549,7 +526,10 @@ mod tests { evt.from_stage, Some(crate::pipeline_state::Stage::Backlog) )); - assert!(matches!(evt.to_stage, crate::pipeline_state::Stage::Coding)); + assert!(matches!( + evt.to_stage, + crate::pipeline_state::Stage::Coding { .. } + )); assert_eq!(evt.name, "Foo Feature"); } @@ -698,7 +678,7 @@ mod tests { let evt = CrdtEvent { story_id: "70_story_broadcast".to_string(), from_stage: Some(Stage::Backlog), - to_stage: Stage::Coding, + to_stage: Stage::Coding { claim: None }, name: "Broadcast Test".to_string(), }; tx.send(evt).unwrap(); @@ -706,7 +686,7 @@ mod tests { let received = rx.try_recv().unwrap(); assert_eq!(received.story_id, "70_story_broadcast"); assert!(matches!(received.from_stage, Some(Stage::Backlog))); - assert!(matches!(received.to_stage, Stage::Coding)); + assert!(matches!(received.to_stage, Stage::Coding { .. })); assert_eq!(received.name, "Broadcast Test"); } } diff --git a/server/src/crdt_state/write/item.rs b/server/src/crdt_state/write/item.rs index 823b8357..a89e4942 100644 --- a/server/src/crdt_state/write/item.rs +++ b/server/src/crdt_state/write/item.rs @@ -11,7 +11,7 @@ use serde_json::json; use super::super::state::{apply_and_persist, emit_event, get_crdt, rebuild_index}; use super::super::types::CrdtEvent; use crate::io::story_metadata::QaMode; -use crate::pipeline_state::{Stage, stage_dir_name}; +use crate::pipeline_state::{AgentClaim, Stage, stage_dir_name}; /// Set the typed `depends_on` CRDT register for a pipeline item. /// @@ -221,7 +221,6 @@ pub fn set_qa_mode(story_id: &str, mode: Option) -> bool { /// /// `stage` is the typed pipeline state; it is serialised to the canonical /// clean wire form (story 934) via [`stage_dir_name`] at the CRDT boundary. -#[allow(clippy::too_many_arguments)] pub fn write_item( story_id: &str, stage: &Stage, @@ -229,11 +228,14 @@ pub fn write_item( agent: Option<&str>, retry_count: Option, depends_on: Option<&str>, - claimed_by: Option<&str>, - claimed_at: Option, merged_at: Option, ) { let stage_str = stage_dir_name(stage); + let claim: Option<&AgentClaim> = match stage { + Stage::Coding { claim } => claim.as_ref(), + Stage::Merge { claim, .. } => claim.as_ref(), + _ => None, + }; let Some(state_mutex) = get_crdt() else { return; }; @@ -291,14 +293,19 @@ pub fn write_item( s.crdt.doc.items[idx].depends_on.set(d.to_string()) }); } - if let Some(cb) = claimed_by { - apply_and_persist(&mut state, |s| { - s.crdt.doc.items[idx].claimed_by.set(cb.to_string()) - }); - } - if let Some(ca) = claimed_at { - apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].claimed_at.set(ca)); - } + let (claim_agent_str, claim_ts_val) = match claim { + Some(c) => ( + c.agent.0.as_str().to_string(), + c.claimed_at.timestamp() as f64, + ), + None => (String::new(), 0.0), + }; + apply_and_persist(&mut state, |s| { + s.crdt.doc.items[idx].claim_agent.set(claim_agent_str) + }); + apply_and_persist(&mut state, |s| { + s.crdt.doc.items[idx].claim_ts.set(claim_ts_val) + }); if let Some(ma) = merged_at { apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].merged_at.set(ma)); } @@ -322,6 +329,13 @@ pub fn write_item( } } else { // Insert new item. + let (insert_claim_agent, insert_claim_ts) = match claim { + Some(c) => ( + c.agent.0.as_str().to_string(), + c.claimed_at.timestamp() as f64, + ), + None => (String::new(), 0.0), + }; let item_json: JsonValue = json!({ "story_id": story_id, "stage": stage_str, @@ -329,8 +343,8 @@ pub fn write_item( "agent": agent.unwrap_or(""), "retry_count": retry_count.unwrap_or(0) as f64, "depends_on": depends_on.unwrap_or(""), - "claimed_by": claimed_by.unwrap_or(""), - "claimed_at": claimed_at.unwrap_or(0.0), + "claim_agent": insert_claim_agent, + "claim_ts": insert_claim_ts, "merged_at": merged_at.unwrap_or(0.0), "qa_mode": "", "item_type": "", @@ -357,8 +371,8 @@ pub fn write_item( item.agent.advance_seq(floor); item.retry_count.advance_seq(floor); item.depends_on.advance_seq(floor); - item.claimed_by.advance_seq(floor); - item.claimed_at.advance_seq(floor); + item.claim_agent.advance_seq(floor); + item.claim_ts.advance_seq(floor); item.merged_at.advance_seq(floor); item.qa_mode.advance_seq(floor); item.item_type.advance_seq(floor); @@ -384,7 +398,6 @@ pub fn write_item( /// Stages are normalised through [`Stage::from_dir`]: unknown strings cause /// the write to be skipped (with a log line). #[cfg(test)] -#[allow(clippy::too_many_arguments)] pub fn write_item_str( story_id: &str, stage: &str, @@ -392,8 +405,6 @@ pub fn write_item_str( agent: Option<&str>, retry_count: Option, depends_on: Option<&str>, - claimed_by: Option<&str>, - claimed_at: Option, merged_at: Option, ) { // Normalise pre-934 directory-style strings to clean wire form so @@ -423,8 +434,6 @@ pub fn write_item_str( agent, retry_count, depends_on, - claimed_by, - claimed_at, merged_at, ); } diff --git a/server/src/crdt_state/write/migrations.rs b/server/src/crdt_state/write/migrations.rs index ff774f69..b7dfbcb8 100644 --- a/server/src/crdt_state/write/migrations.rs +++ b/server/src/crdt_state/write/migrations.rs @@ -276,6 +276,57 @@ pub fn migrate_legacy_stage_strings() { ); } +/// Clear legacy node-hex claims from `claim_agent` and `claim_ts` registers. +/// +/// Pre-1009 nodes wrote the Ed25519 hex pubkey as `claimed_by`. That value +/// cannot be converted to an `AgentName`, so the safe migration is to wipe +/// any existing claim rather than carry over a semantically invalid string. +/// +/// Only clears entries where `claim_agent` looks like a legacy node hex value +/// (64 hex chars). Entries that are already empty or contain an agent-name +/// string (shorter, mixed case) are left untouched. +pub fn migrate_node_claims_to_agent_claims() { + let Some(state_mutex) = get_crdt() else { + return; + }; + + let stale_indices: Vec = { + let Ok(state) = state_mutex.lock() else { + return; + }; + state + .index + .values() + .copied() + .filter(|&idx| { + let item = &state.crdt.doc.items[idx]; + match item.claim_agent.view() { + JsonValue::String(s) => { + s.len() == 64 && s.chars().all(|c| c.is_ascii_hexdigit()) + } + _ => false, + } + }) + .collect() + }; + + if stale_indices.is_empty() { + return; + } + + let Ok(mut state) = state_mutex.lock() else { + return; + }; + let count = stale_indices.len(); + for idx in stale_indices { + apply_and_persist(&mut state, |s| { + s.crdt.doc.items[idx].claim_agent.set(String::new()) + }); + apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].claim_ts.set(0.0)); + } + slog!("[crdt] Cleared {count} legacy node-hex claim(s) from claim_agent/claim_ts"); +} + #[cfg(test)] mod stage_migration_tests { use super::super::super::state::init_for_test; @@ -299,8 +350,6 @@ mod stage_migration_tests { None, None, None, - None, - None, ); // Then overwrite the stage register with the raw legacy string, // bypassing `db::normalise_stage_str` / `write_item_str`'s mapping. @@ -318,7 +367,11 @@ mod stage_migration_tests { let cases: &[(&str, &str, Stage)] = &[ ("9501_legacy_upcoming", "0_upcoming", Stage::Upcoming), ("9502_legacy_backlog", "1_backlog", Stage::Backlog), - ("9503_legacy_coding", "2_current", Stage::Coding), + ( + "9503_legacy_coding", + "2_current", + Stage::Coding { claim: None }, + ), ( "9504_legacy_blocked", "2_blocked", @@ -333,6 +386,7 @@ mod stage_migration_tests { Stage::Merge { feature_branch: BranchName(String::new()), commits_ahead: NonZeroU32::new(1).unwrap(), + claim: None, }, ), ( @@ -398,14 +452,12 @@ mod stage_migration_tests { // Seed two items: one already in clean form, one in legacy form. write_item( "9520_already_clean", - &Stage::Coding, + &Stage::Coding { claim: None }, Some("Already Clean"), None, None, None, None, - None, - None, ); seed_with_raw_stage("9521_needs_migration", "2_current"); @@ -416,11 +468,11 @@ mod stage_migration_tests { let migrated = read_item("9521_needs_migration").unwrap(); assert!(matches!( clean.stage(), - crate::pipeline_state::Stage::Coding + crate::pipeline_state::Stage::Coding { .. } )); assert!(matches!( migrated.stage(), - crate::pipeline_state::Stage::Coding + crate::pipeline_state::Stage::Coding { .. } )); } diff --git a/server/src/crdt_state/write/mod.rs b/server/src/crdt_state/write/mod.rs index 27f11243..68f6d241 100644 --- a/server/src/crdt_state/write/mod.rs +++ b/server/src/crdt_state/write/mod.rs @@ -18,5 +18,5 @@ pub use item::{ pub use item::write_item_str; pub use migrations::{ migrate_legacy_stage_strings, migrate_merge_job, migrate_names_from_slugs, - migrate_story_ids_to_numeric, name_from_story_id, + migrate_node_claims_to_agent_claims, migrate_story_ids_to_numeric, name_from_story_id, }; diff --git a/server/src/crdt_state/write/tests.rs b/server/src/crdt_state/write/tests.rs index 4bfe713b..cb018229 100644 --- a/server/src/crdt_state/write/tests.rs +++ b/server/src/crdt_state/write/tests.rs @@ -98,8 +98,6 @@ fn migrate_story_ids_to_numeric_rewrites_slug_ids() { None, None, None, - None, - None, ); let result = migrate_story_ids_to_numeric(); @@ -130,8 +128,6 @@ fn migrate_story_ids_to_numeric_is_idempotent() { None, None, None, - None, - None, ); // First call — nothing to migrate. @@ -159,8 +155,6 @@ fn migrate_story_ids_to_numeric_skips_conflict() { None, None, None, - None, - None, ); write_item_str( "44", @@ -170,8 +164,6 @@ fn migrate_story_ids_to_numeric_skips_conflict() { None, None, None, - None, - None, ); let result = migrate_story_ids_to_numeric(); @@ -204,14 +196,15 @@ fn migrate_story_ids_to_numeric_preserves_stage_and_name() { None, None, None, - None, - None, ); migrate_story_ids_to_numeric(); let item = read_item("45").expect("item must be accessible by numeric ID"); - assert!(matches!(item.stage, crate::pipeline_state::Stage::Coding)); + assert!(matches!( + item.stage, + crate::pipeline_state::Stage::Coding { .. } + )); assert_eq!(item.name, "Crash Bug"); assert_eq!(item.agent.map(|a| a.as_str()), Some("coder-1")); } @@ -229,8 +222,6 @@ fn migrate_names_from_slugs_fills_empty_names() { None, None, None, - None, - None, ); // Before migration: nameless item is filtered by read_item (AC 5). @@ -261,8 +252,6 @@ fn migrate_names_from_slugs_leaves_existing_names_unchanged() { None, None, None, - None, - None, ); migrate_names_from_slugs(); @@ -297,8 +286,6 @@ fn set_depends_on_round_trip_and_clear() { None, None, None, - None, - None, ); // Set depends_on to [837] and verify CRDT register holds the list. @@ -352,8 +339,6 @@ fn set_agent_some_writes_name() { None, None, None, - None, - None, ); let found = set_agent( @@ -382,8 +367,6 @@ fn set_agent_none_clears_register() { None, None, None, - None, - None, ); // Confirm agent is set. @@ -430,8 +413,6 @@ fn set_qa_mode_round_trip_server_then_human() { None, None, None, - None, - None, ); // Set qa=server via typed path and assert CRDT register reflects it. @@ -485,8 +466,6 @@ fn set_qa_mode_round_trip_all_variants() { None, None, None, - None, - None, ); for mode in [QaMode::Server, QaMode::Agent, QaMode::Human] { @@ -523,8 +502,6 @@ fn bump_retry_count_increments_by_one() { None, None, None, - None, - None, ); let v1 = bump_retry_count("9001_story_bump_test"); @@ -548,8 +525,6 @@ fn set_retry_count_resets_to_zero() { Some(5), None, None, - None, - None, ); set_retry_count("9002_story_set_test", 0); @@ -666,7 +641,7 @@ async fn bug_511_rowid_replay_preserves_field_update_after_list_insert() { let idx2 = index2["511_story_target"]; let view = extract_item_view(&crdt2.doc.items[idx2]).unwrap(); assert!( - matches!(view.stage, crate::pipeline_state::Stage::Coding), + matches!(view.stage, crate::pipeline_state::Stage::Coding { .. }), "stage field update lost during replay (bug 511 regression)" ); @@ -727,8 +702,6 @@ async fn tombstone_survives_concurrent_writes() { None, None, None, - None, - None, ); assert!( read_item(story_id).is_some(), @@ -748,8 +721,6 @@ async fn tombstone_survives_concurrent_writes() { None, None, None, - None, - None, ); tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; } diff --git a/server/src/db/mod.rs b/server/src/db/mod.rs index d3c22ed6..1edf2133 100644 --- a/server/src/db/mod.rs +++ b/server/src/db/mod.rs @@ -466,8 +466,6 @@ mod tests { Some(3), None, None, - None, - None, ); write_content( ContentKey::Story(story_id), diff --git a/server/src/db/ops.rs b/server/src/db/ops.rs index 73d9f237..3f473122 100644 --- a/server/src/db/ops.rs +++ b/server/src/db/ops.rs @@ -93,8 +93,6 @@ pub fn write_item_with_content(story_id: &str, stage: &str, content: &str, meta: meta.agent.as_deref(), meta.retry_count, depends_on_json.as_deref(), - None, - None, merged_at_ts, ); @@ -148,17 +146,7 @@ pub fn move_item_stage( }; let merged_at_ts = matches!(typed_stage, crate::pipeline_state::Stage::Done { .. }) .then(|| chrono::Utc::now().timestamp() as f64); - crate::crdt_state::write_item( - story_id, - &typed_stage, - None, - None, - None, - None, - None, - None, - merged_at_ts, - ); + crate::crdt_state::write_item(story_id, &typed_stage, None, None, None, None, merged_at_ts); // Bug 780: stage transitions reset retry_count to 0. retry_count tracks // attempts at THIS stage's work (coding, merging, qa); a fresh attempt at // a new stage is conceptually distinct from prior attempts at a different diff --git a/server/src/db/recover.rs b/server/src/db/recover.rs index 2a1a3091..f06cc59c 100644 --- a/server/src/db/recover.rs +++ b/server/src/db/recover.rs @@ -350,8 +350,6 @@ mod tests { None, None, None, - None, - None, ); assert!( crdt_state::read_item(old_id).is_none(), diff --git a/server/src/http/mcp/diagnostics/mod.rs b/server/src/http/mcp/diagnostics/mod.rs index ad013c38..f98a0d14 100644 --- a/server/src/http/mcp/diagnostics/mod.rs +++ b/server/src/http/mcp/diagnostics/mod.rs @@ -93,8 +93,8 @@ pub(crate) fn tool_dump_crdt(args: &Value) -> Result { "agent": item.agent, "retry_count": item.retry_count, "depends_on": item.depends_on, - "claimed_by": item.claimed_by, - "claimed_at": item.claimed_at, + "claimed_by": item.claim_agent, + "claimed_at": item.claim_ts, "content_index": item.content_index, "is_deleted": item.is_deleted, }) diff --git a/server/src/http/mcp/merge_tools.rs b/server/src/http/mcp/merge_tools.rs index 7109050e..7264b98a 100644 --- a/server/src/http/mcp/merge_tools.rs +++ b/server/src/http/mcp/merge_tools.rs @@ -305,8 +305,6 @@ mod tests { None, None, None, - None, - None, ); let tmp = tempfile::tempdir().unwrap(); let ctx = test_ctx(tmp.path()); @@ -331,8 +329,6 @@ mod tests { None, None, None, - None, - None, ); let tmp = tempfile::tempdir().unwrap(); let ctx = test_ctx(tmp.path()); diff --git a/server/src/http/mcp/status_tools.rs b/server/src/http/mcp/status_tools.rs index d254c6bf..29b55773 100644 --- a/server/src/http/mcp/status_tools.rs +++ b/server/src/http/mcp/status_tools.rs @@ -165,7 +165,10 @@ pub(super) async fn tool_status(args: &Value, ctx: &AppContext) -> Result Result claim.as_ref(), + crate::pipeline_state::Stage::Merge { claim, .. } => claim.as_ref(), + _ => None, + }; + if let Some(claim) = stage_claim { + front_matter.insert("claimed_by".to_string(), json!(claim.agent.0.as_str())); + front_matter.insert( + "claimed_at".to_string(), + json!(claim.claimed_at.timestamp() as f64), + ); } } diff --git a/server/src/http/mod.rs b/server/src/http/mod.rs index a554bf27..edbdf6d7 100644 --- a/server/src/http/mod.rs +++ b/server/src/http/mod.rs @@ -188,8 +188,8 @@ pub fn debug_crdt_handler(req: &poem::Request) -> poem::Response { "agent": item.agent, "retry_count": item.retry_count, "depends_on": item.depends_on, - "claimed_by": item.claimed_by, - "claimed_at": item.claimed_at, + "claimed_by": item.claim_agent, + "claimed_at": item.claim_ts, "content_index": item.content_index, "is_deleted": item.is_deleted, }) diff --git a/server/src/http/workflow/pipeline.rs b/server/src/http/workflow/pipeline.rs index 6a8c1ade..49e59380 100644 --- a/server/src/http/workflow/pipeline.rs +++ b/server/src/http/workflow/pipeline.rs @@ -171,7 +171,7 @@ pub fn load_pipeline_state(ctx: &AppContext) -> Result { match &item.stage { Stage::Upcoming => state.backlog.push(story), // upcoming shown with backlog Stage::Backlog => state.backlog.push(story), - Stage::Coding => state.current.push(story), + Stage::Coding { .. } => state.current.push(story), Stage::Blocked { .. } => state.current.push(story), // blocked shown with current Stage::Qa => state.qa.push(story), Stage::Merge { .. } => state.merge.push(story), @@ -182,7 +182,7 @@ pub fn load_pipeline_state(ctx: &AppContext) -> Result { // Route to the section matching the stage that was active when // the item was frozen, so it appears in-place. match unwrap_frozen(resume_to) { - Stage::Coding | Stage::Blocked { .. } => state.current.push(story), + Stage::Coding { .. } | Stage::Blocked { .. } => state.current.push(story), Stage::Qa | Stage::ReviewHold { .. } => state.qa.push(story), Stage::Merge { .. } | Stage::MergeFailure { .. } @@ -324,7 +324,7 @@ pub fn validate_story_dirs(_root: &Path) -> Result, S let mut results = Vec::new(); for item in crate::pipeline_state::read_all_typed() { - if !matches!(item.stage, Stage::Backlog | Stage::Coding) { + if !matches!(item.stage, Stage::Backlog | Stage::Coding { .. }) { continue; } results.push(StoryValidationResult { diff --git a/server/src/io/watcher/mod.rs b/server/src/io/watcher/mod.rs index fb56d2f3..752fc532 100644 --- a/server/src/io/watcher/mod.rs +++ b/server/src/io/watcher/mod.rs @@ -54,7 +54,7 @@ pub fn stage_metadata( match stage { Stage::Upcoming => ("create", format!("huskies: triage {item_id}")), Stage::Backlog => ("create", format!("huskies: create {item_id}")), - Stage::Coding => ("start", format!("huskies: start {item_id}")), + Stage::Coding { .. } => ("start", format!("huskies: start {item_id}")), Stage::Blocked { .. } => ("block", format!("huskies: block {item_id}")), Stage::Qa => ("qa", format!("huskies: queue {item_id} for QA")), Stage::Merge { .. } => ("merge", format!("huskies: queue {item_id} for merge")), diff --git a/server/src/io/watcher/tests.rs b/server/src/io/watcher/tests.rs index e6cc4363..3d4e899d 100644 --- a/server/src/io/watcher/tests.rs +++ b/server/src/io/watcher/tests.rs @@ -54,7 +54,7 @@ fn stage_metadata_returns_correct_actions() { use crate::pipeline_state::{GitSha, Stage}; use chrono::Utc; - let (action, msg) = stage_metadata(&Stage::Coding, "42_story_foo"); + let (action, msg) = stage_metadata(&Stage::Coding { claim: None }, "42_story_foo"); assert_eq!(action, "start"); assert_eq!(msg, "huskies: start 42_story_foo"); @@ -177,8 +177,6 @@ fn sweep_uses_crdt_merged_at_not_utc_now() { None, None, None, - None, - None, Some(ten_seconds_ago), ); @@ -209,8 +207,6 @@ fn sweep_keeps_item_newer_than_retention() { None, None, None, - None, - None, Some(one_second_ago), ); diff --git a/server/src/pipeline_state/events.rs b/server/src/pipeline_state/events.rs index 4d2fec34..5b579120 100644 --- a/server/src/pipeline_state/events.rs +++ b/server/src/pipeline_state/events.rs @@ -123,7 +123,7 @@ mod tests { bus.fire(TransitionFired { story_id: StoryId("test".into()), before: Stage::Backlog, - after: Stage::Coding, + after: Stage::Coding { claim: None }, event: PipelineEvent::DepsMet, at: Utc::now(), }); @@ -142,6 +142,7 @@ mod tests { let merge = Stage::Merge { feature_branch: BranchName("feature/story-1".into()), commits_ahead: NonZeroU32::new(3).unwrap(), + claim: None, }; // Stage::Merge has exactly two fields: feature_branch and commits_ahead. // There is no way to attach an agent name to it. The type system diff --git a/server/src/pipeline_state/mod.rs b/server/src/pipeline_state/mod.rs index ad94cdb4..002f4071 100644 --- a/server/src/pipeline_state/mod.rs +++ b/server/src/pipeline_state/mod.rs @@ -40,8 +40,8 @@ mod tests; #[allow(unused_imports)] pub use types::{ - AgentName, ArchiveReason, BranchName, ExecutionState, GitSha, MergeFailureKind, NodePubkey, - PipelineItem, Stage, StoryId, TransitionError, stage_dir_name, stage_label, + AgentClaim, AgentName, ArchiveReason, BranchName, ExecutionState, GitSha, MergeFailureKind, + NodePubkey, PipelineItem, Stage, StoryId, TransitionError, stage_dir_name, stage_label, }; #[allow(unused_imports)] diff --git a/server/src/pipeline_state/projection.rs b/server/src/pipeline_state/projection.rs index d5b20dc1..4396a34b 100644 --- a/server/src/pipeline_state/projection.rs +++ b/server/src/pipeline_state/projection.rs @@ -122,7 +122,6 @@ mod tests { None, None, None, - None, ) } @@ -145,7 +144,6 @@ mod tests { None, None, None, - None, ); let item = PipelineItem::try_from(&view).unwrap(); assert_eq!(item.story_id, StoryId("42_story_test".to_string())); @@ -159,7 +157,7 @@ mod tests { fn project_current_item() { let view = PipelineItemView::for_test( "42_story_test", - Stage::Coding, + Stage::Coding { claim: None }, "Test", Some(crate::config::AgentName::Coder1), 2u32, @@ -167,10 +165,9 @@ mod tests { None, None, None, - None, ); let item = PipelineItem::try_from(&view).unwrap(); - assert!(matches!(item.stage, Stage::Coding)); + assert!(matches!(item.stage, Stage::Coding { .. })); assert_eq!(item.retry_count, 2); } @@ -181,6 +178,7 @@ mod tests { Stage::Merge { feature_branch: fb("feature/story-42_story_test"), commits_ahead: nz(1), + claim: None, }, Some("Test"), ); @@ -189,6 +187,7 @@ mod tests { if let Stage::Merge { feature_branch, commits_ahead, + .. } = &item.stage { assert_eq!(feature_branch.0, "feature/story-42_story_test"); @@ -226,7 +225,6 @@ mod tests { None, None, None, - None, ); let item = PipelineItem::try_from(&view).unwrap(); assert!(matches!( @@ -253,7 +251,6 @@ mod tests { None, None, None, - None, ); let item = PipelineItem::try_from(&view).unwrap(); assert!(matches!( @@ -270,7 +267,7 @@ mod tests { let view = make_view( "42_story_test", Stage::Frozen { - resume_to: Box::new(Stage::Coding), + resume_to: Box::new(Stage::Coding { claim: None }), }, Some("Frozen Story"), ); diff --git a/server/src/pipeline_state/tests.rs b/server/src/pipeline_state/tests.rs index 956cb359..08bb73b3 100644 --- a/server/src/pipeline_state/tests.rs +++ b/server/src/pipeline_state/tests.rs @@ -19,7 +19,7 @@ fn sid(s: &str) -> StoryId { fn happy_path_backlog_through_archived() { let s = Stage::Backlog; let s = transition(s, PipelineEvent::DepsMet).unwrap(); - assert!(matches!(s, Stage::Coding)); + assert!(matches!(s, Stage::Coding { .. })); let s = transition( s, @@ -52,7 +52,7 @@ fn happy_path_backlog_through_archived() { #[test] fn happy_path_with_qa() { - let s = Stage::Coding; + let s = Stage::Coding { claim: None }; let s = transition(s, PipelineEvent::GatesStarted).unwrap(); assert!(matches!(s, Stage::Qa)); @@ -69,7 +69,7 @@ fn happy_path_with_qa() { #[test] fn qa_retry_loop() { - let s = Stage::Coding; + let s = Stage::Coding { claim: None }; let s = transition(s, PipelineEvent::GatesStarted).unwrap(); assert!(matches!(s, Stage::Qa)); @@ -80,7 +80,7 @@ fn qa_retry_loop() { }, ) .unwrap(); - assert!(matches!(s, Stage::Coding)); + assert!(matches!(s, Stage::Coding { .. })); } // ── Bug 519: Merge with zero commits is unrepresentable ───────────── @@ -154,7 +154,7 @@ fn cannot_start_gates_from_backlog() { #[test] fn cannot_accept_from_coding() { - let result = transition(Stage::Coding, PipelineEvent::Accepted); + let result = transition(Stage::Coding { claim: None }, PipelineEvent::Accepted); assert!(matches!( result, Err(TransitionError::InvalidTransition { .. }) @@ -165,7 +165,7 @@ fn cannot_accept_from_coding() { #[test] fn block_from_any_active_stage() { - for s in [Stage::Backlog, Stage::Coding, Stage::Qa] { + for s in [Stage::Backlog, Stage::Coding { claim: None }, Stage::Qa] { let result = transition( s.clone(), PipelineEvent::Block { @@ -178,6 +178,7 @@ fn block_from_any_active_stage() { let m = Stage::Merge { feature_branch: fb("f"), commits_ahead: nz(1), + claim: None, }; let result = transition( m, @@ -194,7 +195,7 @@ fn unblock_returns_to_coding() { reason: "test".into(), }; let result = transition(s, PipelineEvent::Unblock).unwrap(); - assert!(matches!(result, Stage::Coding)); + assert!(matches!(result, Stage::Coding { .. })); } #[test] @@ -251,7 +252,7 @@ fn legacy_unblock_archived_blocked_returns_to_backlog() { fn abandon_from_any_active_or_done() { for s in [ Stage::Backlog, - Stage::Coding, + Stage::Coding { claim: None }, Stage::Qa, Stage::Done { merged_at: chrono::Utc::now(), @@ -267,7 +268,7 @@ fn abandon_from_any_active_or_done() { fn supersede_from_any_active_or_done() { for s in [ Stage::Backlog, - Stage::Coding, + Stage::Coding { claim: None }, Stage::Qa, Stage::Done { merged_at: chrono::Utc::now(), @@ -291,7 +292,7 @@ fn review_hold_from_active_stages() { // Story 945: `ReviewHold` transitions to `Stage::ReviewHold { resume_to }` // with the resume_to set to the originating stage, replacing the legacy // boolean flag. - for s in [Stage::Backlog, Stage::Coding, Stage::Qa] { + for s in [Stage::Backlog, Stage::Coding { claim: None }, Stage::Qa] { let result = transition( s.clone(), PipelineEvent::ReviewHold { @@ -316,6 +317,7 @@ fn merge_failed_final() { let s = Stage::Merge { feature_branch: fb("f"), commits_ahead: nz(1), + claim: None, }; let result = transition( s, @@ -336,7 +338,7 @@ fn merge_failed_final() { #[test] fn merge_failed_only_from_merge() { let result = transition( - Stage::Coding, + Stage::Coding { claim: None }, PipelineEvent::MergeFailedFinal { reason: "conflicts".into(), }, @@ -413,6 +415,7 @@ fn bug_502_agent_not_in_stage() { let merge = Stage::Merge { feature_branch: BranchName("feature/story-1".into()), commits_ahead: NonZeroU32::new(3).unwrap(), + claim: None, }; // Stage::Merge has exactly two fields: feature_branch and commits_ahead. // There is no way to attach an agent name to it. The type system @@ -480,7 +483,7 @@ fn cannot_deps_met_from_upcoming() { #[test] fn reject_from_active_stages() { - for s in [Stage::Backlog, Stage::Coding, Stage::Qa] { + for s in [Stage::Backlog, Stage::Coding { claim: None }, Stage::Qa] { let result = transition( s.clone(), PipelineEvent::Reject { @@ -493,6 +496,7 @@ fn reject_from_active_stages() { let m = Stage::Merge { feature_branch: fb("f"), commits_ahead: nz(1), + claim: None, }; let result = transition( m, @@ -561,7 +565,7 @@ fn freeze_transitions_to_frozen_variant_with_resume_to() { ); let item = read_typed(story_id).unwrap().unwrap(); - assert!(matches!(item.stage, Stage::Coding)); + assert!(matches!(item.stage, Stage::Coding { .. })); assert!(!matches!(item.stage, Stage::Frozen { .. })); super::apply::transition_to_frozen(story_id).expect("freeze should succeed"); @@ -569,7 +573,7 @@ fn freeze_transitions_to_frozen_variant_with_resume_to() { let item = read_typed(story_id).unwrap().unwrap(); match &item.stage { Stage::Frozen { resume_to } => assert!( - matches!(**resume_to, Stage::Coding), + matches!(**resume_to, Stage::Coding { .. }), "resume_to should preserve the previous stage; got {resume_to:?}" ), other => panic!("stage should be Stage::Frozen after freeze; got {other:?}"), @@ -583,7 +587,7 @@ fn freeze_transitions_to_frozen_variant_with_resume_to() { let item = read_typed(story_id).unwrap().unwrap(); assert!( - matches!(item.stage, Stage::Coding), + matches!(item.stage, Stage::Coding { .. }), "stage should return to Coding after unfreeze: {:?}", item.stage ); @@ -884,10 +888,11 @@ fn merge_aborted_returns_to_coding() { let s = Stage::Merge { feature_branch: fb("feature/story-73"), commits_ahead: nz(2), + claim: None, }; let result = transition(s, PipelineEvent::MergeAborted).unwrap(); assert!( - matches!(result, Stage::Coding), + matches!(result, Stage::Coding { .. }), "Merge + MergeAborted should return to Coding, got: {result:?}" ); } @@ -915,7 +920,7 @@ fn merge_aborted_moves_to_coding_via_crdt() { fired.before ); assert!( - matches!(fired.after, Stage::Coding), + matches!(fired.after, Stage::Coding { .. }), "fired.after should be Coding: {:?}", fired.after ); @@ -958,7 +963,7 @@ fn move_story_merge_to_current_succeeds() { .expect("CRDT read should succeed") .expect("item should exist"); assert!( - matches!(item.stage, Stage::Coding), + matches!(item.stage, Stage::Coding { .. }), "story should be in Coding after move_story_to_stage(merge → current): {:?}", item.stage ); @@ -974,7 +979,7 @@ fn hotfix_requested_from_done_lands_in_coding() { }; let result = transition(done, PipelineEvent::HotfixRequested).unwrap(); assert!( - matches!(result, Stage::Coding), + matches!(result, Stage::Coding { .. }), "Done + HotfixRequested must land in Coding; got: {:?}", result ); @@ -984,11 +989,12 @@ fn hotfix_requested_from_done_lands_in_coding() { fn hotfix_requested_rejected_from_non_done_stages() { for stage in [ Stage::Backlog, - Stage::Coding, + Stage::Coding { claim: None }, Stage::Qa, Stage::Merge { feature_branch: fb("feature/story-1"), commits_ahead: nz(1), + claim: None, }, ] { let result = transition(stage.clone(), PipelineEvent::HotfixRequested); diff --git a/server/src/pipeline_state/transition.rs b/server/src/pipeline_state/transition.rs index a2194291..2d9e96d3 100644 --- a/server/src/pipeline_state/transition.rs +++ b/server/src/pipeline_state/transition.rs @@ -149,10 +149,10 @@ pub fn transition(state: Stage, event: PipelineEvent) -> Result Ok(Backlog), // ── Forward path ──────────────────────────────────────────────── - (Backlog, DepsMet) => Ok(Coding), - (Coding, GatesStarted) => Ok(Qa), + (Backlog, DepsMet) => Ok(Coding { claim: None }), + (Coding { .. }, GatesStarted) => Ok(Qa), ( - Coding, + Coding { .. }, QaSkipped { feature_branch, commits_ahead, @@ -160,6 +160,7 @@ pub fn transition(state: Stage, event: PipelineEvent) -> Result Ok(Merge { feature_branch, commits_ahead, + claim: None, }), ( Qa, @@ -170,8 +171,9 @@ pub fn transition(state: Stage, event: PipelineEvent) -> Result Ok(Merge { feature_branch, commits_ahead, + claim: None, }), - (Qa, GatesFailed { .. }) => Ok(Coding), + (Qa, GatesFailed { .. }) => Ok(Coding { claim: None }), (Merge { .. }, MergeSucceeded { merge_commit }) => Ok(Done { merged_at: now, merge_commit, @@ -193,7 +195,7 @@ pub fn transition(state: Stage, event: PipelineEvent) -> Result Ok(Blocked { reason }), @@ -201,18 +203,20 @@ pub fn transition(state: Stage, event: PipelineEvent) -> Result { - Ok(Stage::ReviewHold { - resume_to: Box::new(s), - reason, - }) - } + ( + s @ (Backlog | Coding { .. } | Qa | Merge { .. }), + PipelineEvent::ReviewHold { reason }, + ) => Ok(Stage::ReviewHold { + resume_to: Box::new(s), + reason, + }), // ── MergeFailed: Merge → MergeFailure (recoverable intermediate) ── ( Merge { feature_branch, commits_ahead, + .. }, MergeFailed { kind }, ) => Ok(MergeFailure { @@ -246,14 +250,14 @@ pub fn transition(state: Stage, event: PipelineEvent) -> Result Ok(Abandoned { ts: now }), (Upcoming, Supersede { by }) | (Backlog, Supersede { by }) - | (Coding, Supersede { by }) + | (Coding { .. }, Supersede { by }) | (Qa, Supersede { by }) | (Merge { .. }, Supersede { by }) | (Done { .. }, Supersede { by }) => Ok(Superseded { @@ -263,7 +267,7 @@ pub fn transition(state: Stage, event: PipelineEvent) -> Result Ok(Rejected { ts: now, reason }), @@ -272,21 +276,24 @@ pub fn transition(state: Stage, event: PipelineEvent) -> Result { - Ok(Backlog) - } + (Coding { .. }, Demote) + | (Qa, Demote) + | (Merge { .. }, Demote) + | (Blocked { .. }, Demote) => Ok(Backlog), // ── Close: direct completion from any active stage ───────────── - (Backlog, Close) | (Coding, Close) | (Qa, Close) | (Merge { .. }, Close) => Ok(Done { - merged_at: now, - merge_commit: GitSha("closed".to_string()), - }), + (Backlog, Close) | (Coding { .. }, Close) | (Qa, Close) | (Merge { .. }, Close) => { + Ok(Done { + merged_at: now, + merge_commit: GitSha("closed".to_string()), + }) + } // ── Freeze: any non-terminal stage → Frozen { resume_to } ────── ( s @ (Upcoming | Backlog - | Coding + | Coding { .. } | Qa | Merge { .. } | Blocked { .. } @@ -305,7 +312,7 @@ pub fn transition(state: Stage, event: PipelineEvent) -> Result Ok(*resume_to), // ── FixupRequested: MergeFailure → Coding (coder fixup) ──────── - (MergeFailure { .. }, FixupRequested) => Ok(Coding), + (MergeFailure { .. }, FixupRequested) => Ok(Coding { claim: None }), // ── FixupRequested: MergeFailureFinal → Coding (operator override) // @@ -314,19 +321,19 @@ pub fn transition(state: Stage, event: PipelineEvent) -> Result Ok(Coding), + (MergeFailureFinal { .. }, FixupRequested) => Ok(Coding { claim: None }), // ── ReQueuedForQa: MergeFailure → Qa (re-review) ──────────────── (MergeFailure { .. }, ReQueuedForQa) => Ok(Qa), // ── MergeAborted: Merge → Coding (abort in-flight merge) ───────── - (Merge { .. }, MergeAborted) => Ok(Coding), + (Merge { .. }, MergeAborted) => Ok(Coding { claim: None }), // ── HotfixRequested: Done → Coding (post-merge hotfix) ─────────── // Allows reopening a completed story so a coder can apply a hotfix. // A fresh feature branch is forked from master when auto-assign spawns // the coder. - (Done { .. }, HotfixRequested) => Ok(Coding), + (Done { .. }, HotfixRequested) => Ok(Coding { claim: None }), // ── MergemasterAttempted: MergeFailure → MergeFailureFinal ───── (MergeFailure { kind, .. }, MergemasterAttempted) => Ok(MergeFailureFinal { kind }), @@ -337,7 +344,7 @@ pub fn transition(state: Stage, event: PipelineEvent) -> Result Ok(*resume_to), // ── Unblock: Blocked → Coding ───────────────────────────────── - (Blocked { .. }, Unblock) => Ok(Coding), + (Blocked { .. }, Unblock) => Ok(Coding { claim: None }), // ── Unblock MergeFailure → Merge (re-attempt) ──────────────────── // `unblock_story` on a failed merge re-queues it for merge, restoring @@ -353,6 +360,7 @@ pub fn transition(state: Stage, event: PipelineEvent) -> Result Ok(Merge { feature_branch, commits_ahead, + claim: None, }), // ── Demote MergeFailure → Backlog (manual parking) ─────────────── diff --git a/server/src/pipeline_state/types.rs b/server/src/pipeline_state/types.rs index b50ea0cb..68042aaa 100644 --- a/server/src/pipeline_state/types.rs +++ b/server/src/pipeline_state/types.rs @@ -109,6 +109,22 @@ impl MergeFailureKind { } } +// ── Agent claim payload ──────────────────────────────────────────────────── + +/// Active claim on a pipeline item by a specific agent. +/// +/// Embedded directly in [`Stage::Coding`] and [`Stage::Merge`] rather than +/// stored in separate CRDT registers. Readers access the claim via +/// `item.stage()` rather than through a separate `item.claim()` accessor +/// (story 1009). +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct AgentClaim { + /// The agent (e.g. `"coder-1"`) that has claimed this work item. + pub agent: AgentName, + /// When the claim was written. + pub claimed_at: DateTime, +} + // ── Synced pipeline stage (lives in CRDT, converges across nodes) ─────────── /// The pipeline stage for a work item. @@ -146,17 +162,26 @@ pub enum Stage { Backlog, /// Story is being actively coded somewhere in the mesh. - Coding, + /// + /// Carries an optional [`AgentClaim`] identifying which agent is currently + /// working on this item. `None` means the item is in the coding stage but + /// no agent has claimed it yet (e.g. just transitioned from Backlog and + /// waiting for an agent to pick it up). + Coding { claim: Option }, /// Coder has run; gates are running. Qa, /// Gates passed; ready to merge. + /// /// `commits_ahead: NonZeroU32` makes "Merge with nothing to merge" - /// structurally impossible (eliminates bug 519). + /// structurally impossible (eliminates bug 519). The optional + /// [`AgentClaim`] carries the mergemaster agent that owns this merge. Merge { feature_branch: BranchName, commits_ahead: NonZeroU32, + /// Agent currently running the merge, or `None` when unclaimed. + claim: Option, }, /// Mergemaster squashed to master. Always carries merge metadata. @@ -274,7 +299,7 @@ impl Stage { match s { "upcoming" => Some(Stage::Upcoming), "backlog" => Some(Stage::Backlog), - "coding" => Some(Stage::Coding), + "coding" => Some(Stage::Coding { claim: None }), "blocked" => Some(Stage::Blocked { reason: String::new(), }), @@ -282,6 +307,7 @@ impl Stage { "merge" => Some(Stage::Merge { feature_branch: BranchName(String::new()), commits_ahead: NonZeroU32::new(1).expect("1 is non-zero"), + claim: None, }), "merge_failure" => Some(Stage::MergeFailure { kind: MergeFailureKind::Other(String::new()), @@ -292,10 +318,10 @@ impl Stage { kind: MergeFailureKind::Other(String::new()), }), "frozen" => Some(Stage::Frozen { - resume_to: Box::new(Stage::Coding), + resume_to: Box::new(Stage::Coding { claim: None }), }), "review_hold" => Some(Stage::ReviewHold { - resume_to: Box::new(Stage::Coding), + resume_to: Box::new(Stage::Coding { claim: None }), reason: String::new(), }), "done" => Some(Stage::Done { @@ -391,7 +417,7 @@ pub fn stage_label(s: &Stage) -> &'static str { match s { Stage::Upcoming => "Upcoming", Stage::Backlog => "Backlog", - Stage::Coding => "Coding", + Stage::Coding { .. } => "Coding", Stage::Qa => "Qa", Stage::Merge { .. } => "Merge", Stage::MergeFailure { .. } => "MergeFailure", @@ -416,7 +442,7 @@ pub fn stage_dir_name(s: &Stage) -> &'static str { match s { Stage::Upcoming => "upcoming", Stage::Backlog => "backlog", - Stage::Coding => "coding", + Stage::Coding { .. } => "coding", Stage::Blocked { .. } => "blocked", Stage::Qa => "qa", Stage::Merge { .. } => "merge", diff --git a/server/src/service/agents/mod.rs b/server/src/service/agents/mod.rs index 0fa380a1..a44b17a9 100644 --- a/server/src/service/agents/mod.rs +++ b/server/src/service/agents/mod.rs @@ -142,7 +142,7 @@ pub fn get_work_item_content( let stages = [ ("1_backlog", Stage::Backlog), - ("2_current", Stage::Coding), + ("2_current", Stage::Coding { claim: None }), ("3_qa", Stage::Qa), ( "4_merge", @@ -317,8 +317,6 @@ max_budget_usd = 5.0 None, None, None, - None, - None, ); let item = get_work_item_content(tmp.path(), "42_story_foo").unwrap(); assert!(item.content.contains("Some content.")); diff --git a/server/src/service/notifications/format.rs b/server/src/service/notifications/format.rs index 80d647e7..aa550cc2 100644 --- a/server/src/service/notifications/format.rs +++ b/server/src/service/notifications/format.rs @@ -12,7 +12,7 @@ pub fn stage_display_name(stage: &Stage) -> &'static str { match stage { Stage::Upcoming => "Upcoming", Stage::Backlog => "Backlog", - Stage::Coding => "Current", + Stage::Coding { .. } => "Current", Stage::Blocked { .. } => "Blocked", Stage::Qa => "QA", Stage::Merge { .. } => "Merge", @@ -253,7 +253,10 @@ mod tests { #[test] fn stage_display_name_maps_all_known_stages() { assert_eq!(stage_display_name(&Stage::Backlog), "Backlog"); - assert_eq!(stage_display_name(&Stage::Coding), "Current"); + assert_eq!( + stage_display_name(&Stage::Coding { claim: None }), + "Current" + ); assert_eq!(stage_display_name(&Stage::Qa), "QA"); assert_eq!(stage_display_name(&merge_stage()), "Merge"); assert_eq!(stage_display_name(&done_stage()), "Done"); @@ -290,7 +293,7 @@ mod tests { "42_story_thing", "Some Story", &Stage::Backlog, - &Stage::Coding, + &Stage::Coding { claim: None }, ); assert!(!plain.contains("\u{1f389}")); } @@ -301,7 +304,7 @@ mod tests { "261_story_bot_notifications", "Bot notifications", &Stage::Upcoming, - &Stage::Coding, + &Stage::Coding { claim: None }, ); assert_eq!( plain, @@ -315,8 +318,12 @@ mod tests { #[test] fn format_stage_notification_without_story_name_falls_back_to_number() { - let (plain, html) = - format_stage_notification("42_bug_fix_thing", "", &Stage::Coding, &Stage::Qa); + let (plain, html) = format_stage_notification( + "42_bug_fix_thing", + "", + &Stage::Coding { claim: None }, + &Stage::Qa, + ); assert_eq!(plain, "#42 \u{2014} Current \u{2192} QA"); assert_eq!(html, "#42 \u{2014} Current \u{2192} QA"); } @@ -334,15 +341,23 @@ mod tests { #[test] fn format_stage_notification_long_name_is_preserved() { let long_name = "A".repeat(300); - let (plain, _html) = - format_stage_notification("1_story_long", &long_name, &Stage::Coding, &Stage::Qa); + let (plain, _html) = format_stage_notification( + "1_story_long", + &long_name, + &Stage::Coding { claim: None }, + &Stage::Qa, + ); assert!(plain.contains(&long_name)); } #[test] fn format_stage_notification_empty_story_name_falls_back_to_number() { - let (plain, html) = - format_stage_notification("42_story_empty", "", &Stage::Coding, &Stage::Qa); + let (plain, html) = format_stage_notification( + "42_story_empty", + "", + &Stage::Coding { claim: None }, + &Stage::Qa, + ); assert_eq!(plain, "#42 \u{2014} Current \u{2192} QA"); assert_eq!(html, "#42 \u{2014} Current \u{2192} QA"); } diff --git a/server/src/service/timer/mod.rs b/server/src/service/timer/mod.rs index 2540ebe5..f75c8983 100644 --- a/server/src/service/timer/mod.rs +++ b/server/src/service/timer/mod.rs @@ -130,7 +130,7 @@ pub async fn handle_timer_command( let in_valid_stage = if let Ok(Some(item)) = crate::pipeline_state::read_typed(&story_id) { use crate::pipeline_state::Stage; - matches!(item.stage, Stage::Backlog | Stage::Coding) + matches!(item.stage, Stage::Backlog | Stage::Coding { .. }) } else { let work_dir = project_root.join(".huskies").join("work"); work_dir diff --git a/server/src/service/work_item/assign.rs b/server/src/service/work_item/assign.rs index e60980ad..96782ff9 100644 --- a/server/src/service/work_item/assign.rs +++ b/server/src/service/work_item/assign.rs @@ -67,8 +67,6 @@ mod tests { None, None, None, - None, - None, ); let tmp = tempfile::tempdir().unwrap(); @@ -108,8 +106,6 @@ mod tests { None, None, None, - None, - None, ); } diff --git a/server/src/service/work_item/delete.rs b/server/src/service/work_item/delete.rs index fb440f06..4f83addd 100644 --- a/server/src/service/work_item/delete.rs +++ b/server/src/service/work_item/delete.rs @@ -204,8 +204,6 @@ mod tests { None, None, None, - None, - None, ); // Seed content store. diff --git a/server/src/startup/project.rs b/server/src/startup/project.rs index 0b71522b..0076ac79 100644 --- a/server/src/startup/project.rs +++ b/server/src/startup/project.rs @@ -165,6 +165,8 @@ pub(crate) async fn init_subsystems(app_state: &Arc, cwd: &Path) { } // Story 987: upgrade four-bool MergeJob entries to typed MergeResult enum. crdt_state::migrate_merge_job(db_path); + // Story 1009: drop legacy node-hex claims that can't be converted to AgentName. + crdt_state::migrate_node_claims_to_agent_claims(); } } } diff --git a/server/src/worktree/cleanup.rs b/server/src/worktree/cleanup.rs index 84645256..702637d0 100644 --- a/server/src/worktree/cleanup.rs +++ b/server/src/worktree/cleanup.rs @@ -315,7 +315,7 @@ mod tests { let config = empty_config(); let report = run_cleanup_with_lookup(&project_root, &config, true, |id| { if id == story_id { - Some(Stage::Coding) + Some(Stage::Coding { claim: None }) } else { None } diff --git a/server/src/worktree/sweep.rs b/server/src/worktree/sweep.rs index 7af28312..48976ad4 100644 --- a/server/src/worktree/sweep.rs +++ b/server/src/worktree/sweep.rs @@ -183,7 +183,9 @@ mod tests { #[test] fn should_not_sweep_coding() { - assert!(!worktree_should_be_swept(Some(&Stage::Coding))); + assert!(!worktree_should_be_swept(Some(&Stage::Coding { + claim: None + }))); } #[test] @@ -196,6 +198,7 @@ mod tests { let stage = Stage::Merge { feature_branch: crate::pipeline_state::BranchName("feature/x".to_string()), commits_ahead: NonZeroU32::new(1).unwrap(), + claim: None, }; assert!(!worktree_should_be_swept(Some(&stage))); } @@ -305,7 +308,7 @@ mod tests { let removed = sweep_with_lookup(&project_root, &config, |id| { if id == story_id { - Some(Stage::Coding) + Some(Stage::Coding { claim: None }) } else { None } @@ -374,6 +377,7 @@ mod tests { "feature/story-104_merge_story".to_string(), ), commits_ahead: NonZeroU32::new(1).unwrap(), + claim: None, }) } else { None