huskies: merge 1036

This commit is contained in:
dave
2026-05-14 15:07:57 +00:00
parent cfccc2e73c
commit ee20e54d40
17 changed files with 72 additions and 2 deletions
@@ -40,6 +40,7 @@ impl AgentPool {
commits_ahead: NonZeroU32::new(1).expect("1 is non-zero"), commits_ahead: NonZeroU32::new(1).expect("1 is non-zero"),
claim: None, claim: None,
retries: 0, retries: 0,
server_start_time: None,
}; };
let merge_items = scan_stage_items(&merge_stage); let merge_items = scan_stage_items(&merge_stage);
for story_id in &merge_items { for story_id in &merge_items {
@@ -187,6 +187,7 @@ mod tests {
commits_ahead: NonZeroU32::new(1).unwrap(), commits_ahead: NonZeroU32::new(1).unwrap(),
claim: None, claim: None,
retries: 0, retries: 0,
server_start_time: None,
}, },
after: Stage::MergeFailure { after: Stage::MergeFailure {
kind: kind.clone(), kind: kind.clone(),
@@ -162,6 +162,7 @@ mod tests {
commits_ahead: NonZeroU32::new(1).unwrap(), commits_ahead: NonZeroU32::new(1).unwrap(),
claim: None, claim: None,
retries: 0, retries: 0,
server_start_time: None,
}, },
after: crate::pipeline_state::Stage::MergeFailure { after: crate::pipeline_state::Stage::MergeFailure {
kind: kind.clone(), kind: kind.clone(),
@@ -136,6 +136,7 @@ mod tests {
commits_ahead: NonZeroU32::new(1).unwrap(), commits_ahead: NonZeroU32::new(1).unwrap(),
claim: None, claim: None,
retries: 0, retries: 0,
server_start_time: None,
}, },
after: Stage::Done { after: Stage::Done {
merged_at: Utc::now(), merged_at: Utc::now(),
+1
View File
@@ -549,6 +549,7 @@ fn merge_stage() -> Stage {
commits_ahead: std::num::NonZeroU32::new(1).unwrap(), commits_ahead: std::num::NonZeroU32::new(1).unwrap(),
claim: None, claim: None,
retries: 0, retries: 0,
server_start_time: None,
} }
} }
+8
View File
@@ -403,6 +403,11 @@ pub(super) fn extract_item_view(item: &PipelineItemCrdt) -> Option<PipelineItemV
_ => None, _ => None,
}; };
let merge_server_start = match item.merge_server_start.view() {
JsonValue::Number(n) if n > 0.0 => Some(n),
_ => None,
};
let stage = project_stage_for_view( let stage = project_stage_for_view(
&stage_str, &stage_str,
&story_id, &story_id,
@@ -412,6 +417,7 @@ pub(super) fn extract_item_view(item: &PipelineItemCrdt) -> Option<PipelineItemV
claim_ts_secs, claim_ts_secs,
plan_state_str.as_deref(), plan_state_str.as_deref(),
retry_count_register, retry_count_register,
merge_server_start,
)?; )?;
Some(PipelineItemView { Some(PipelineItemView {
@@ -449,6 +455,7 @@ fn project_stage_for_view(
claim_ts_secs: Option<u64>, claim_ts_secs: Option<u64>,
plan_state_str: Option<&str>, plan_state_str: Option<&str>,
retries: u32, retries: u32,
merge_server_start: Option<f64>,
) -> Option<crate::pipeline_state::Stage> { ) -> Option<crate::pipeline_state::Stage> {
use crate::pipeline_state::{ use crate::pipeline_state::{
AgentClaim, AgentName, ArchiveReason, BranchName, GitSha, PlanState, Stage, AgentClaim, AgentName, ArchiveReason, BranchName, GitSha, PlanState, Stage,
@@ -518,6 +525,7 @@ fn project_stage_for_view(
commits_ahead: NonZeroU32::new(1).expect("1 is non-zero"), commits_ahead: NonZeroU32::new(1).expect("1 is non-zero"),
claim, claim,
retries, retries,
server_start_time: merge_server_start,
}), }),
"merge_failure" => { "merge_failure" => {
// Story 986: read the typed kind directly from ContentKey::MergeFailureKind // Story 986: read the typed kind directly from ContentKey::MergeFailureKind
+5
View File
@@ -100,6 +100,11 @@ pub struct PipelineItemCrdt {
/// Wire values: `"missing"` (default/empty), `"drafted"`, `"confirmed"`. /// Wire values: `"missing"` (default/empty), `"drafted"`, `"confirmed"`.
/// Updated by the filesystem watcher on PLAN.md create/modify/remove events. /// Updated by the filesystem watcher on PLAN.md create/modify/remove events.
pub plan_state: LwwRegisterCrdt<String>, pub plan_state: LwwRegisterCrdt<String>,
/// Story 1036: Unix timestamp (f64 seconds) of the server process that
/// started the currently active merge task for this item. Zero / absent
/// means no merge task is in flight. Projected into `Stage::Merge {
/// server_start_time }` so callers never read this register directly.
pub merge_server_start: LwwRegisterCrdt<f64>,
} }
/// CRDT node that holds a single peer's presence entry. /// CRDT node that holds a single peer's presence entry.
+19
View File
@@ -267,6 +267,14 @@ pub fn write_item(
Stage::Merge { retries, .. } => *retries as f64, Stage::Merge { retries, .. } => *retries as f64,
_ => 0.0, _ => 0.0,
}; };
// Extract merge_server_start from Stage::Merge; 0.0 clears the register.
let merge_server_start_val: f64 = match stage {
Stage::Merge {
server_start_time: Some(t),
..
} => *t,
_ => 0.0,
};
let Some(state_mutex) = get_crdt() else { let Some(state_mutex) = get_crdt() else {
return; return;
}; };
@@ -335,6 +343,11 @@ pub fn write_item(
apply_and_persist(&mut state, |s| { apply_and_persist(&mut state, |s| {
s.crdt.doc.items[idx].claim_ts.set(claim_ts_val) s.crdt.doc.items[idx].claim_ts.set(claim_ts_val)
}); });
apply_and_persist(&mut state, |s| {
s.crdt.doc.items[idx]
.merge_server_start
.set(merge_server_start_val)
});
if let Some(ma) = merged_at { if let Some(ma) = merged_at {
apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].merged_at.set(ma)); apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].merged_at.set(ma));
} }
@@ -380,6 +393,7 @@ pub fn write_item(
"epic": "", "epic": "",
"resume_to": "", "resume_to": "",
"plan_state": "", "plan_state": "",
"merge_server_start": merge_server_start_val,
}) })
.into(); .into();
@@ -409,6 +423,7 @@ pub fn write_item(
item.epic.advance_seq(floor); item.epic.advance_seq(floor);
item.resume_to.advance_seq(floor); item.resume_to.advance_seq(floor);
item.plan_state.advance_seq(floor); item.plan_state.advance_seq(floor);
item.merge_server_start.advance_seq(floor);
} }
// Broadcast a CrdtEvent for the new item. // Broadcast a CrdtEvent for the new item.
@@ -484,11 +499,13 @@ pub fn set_retry_count(story_id: &str, count: i64) {
commits_ahead, commits_ahead,
claim, claim,
retries: _, retries: _,
server_start_time,
} => Stage::Merge { } => Stage::Merge {
feature_branch, feature_branch,
commits_ahead, commits_ahead,
claim, claim,
retries: count.max(0) as u32, retries: count.max(0) as u32,
server_start_time,
}, },
_ => return, _ => return,
}; };
@@ -525,6 +542,7 @@ pub fn bump_retry_count(story_id: &str) -> i64 {
commits_ahead, commits_ahead,
claim, claim,
retries, retries,
server_start_time,
} => { } => {
let n = retries + 1; let n = retries + 1;
( (
@@ -533,6 +551,7 @@ pub fn bump_retry_count(story_id: &str) -> i64 {
commits_ahead, commits_ahead,
claim, claim,
retries: n, retries: n,
server_start_time,
}, },
n, n,
) )
@@ -391,6 +391,7 @@ mod stage_migration_tests {
commits_ahead: NonZeroU32::new(1).unwrap(), commits_ahead: NonZeroU32::new(1).unwrap(),
claim: None, claim: None,
retries: 0, retries: 0,
server_start_time: None,
}, },
), ),
( (
+5
View File
@@ -56,6 +56,10 @@ pub enum ContentKey<'a> {
/// "completed" so the mergemaster agent exit handler in `spawn.rs` can /// "completed" so the mergemaster agent exit handler in `spawn.rs` can
/// distinguish a clean success from a transient crash (bug 1008). /// distinguish a clean success from a transient crash (bug 1008).
MergeSuccess(&'a str), MergeSuccess(&'a str),
/// JSON-serialised `MergeReport` written by the merge runner on successful
/// completion. Read by `get_merge_status` to surface gate output for the
/// "completed" state without a separate MergeJob CRDT register (story 1036).
MergeReport(&'a str),
} }
impl<'a> ContentKey<'a> { impl<'a> ContentKey<'a> {
@@ -80,6 +84,7 @@ impl<'a> ContentKey<'a> {
ContentKey::MergeFixupPending(id) => format!("{id}:merge_fixup_pending"), ContentKey::MergeFixupPending(id) => format!("{id}:merge_fixup_pending"),
ContentKey::MergeFailureKind(id) => format!("{id}:merge_failure_kind"), ContentKey::MergeFailureKind(id) => format!("{id}:merge_failure_kind"),
ContentKey::MergeSuccess(id) => format!("{id}:merge_success"), ContentKey::MergeSuccess(id) => format!("{id}:merge_success"),
ContentKey::MergeReport(id) => format!("{id}:merge_report"),
} }
} }
} }
+1
View File
@@ -30,6 +30,7 @@ pub(crate) fn purge_content_keys_for_story(story_id: &str) {
delete_content(ContentKey::CommitRecoveryPending(story_id)); delete_content(ContentKey::CommitRecoveryPending(story_id));
delete_content(ContentKey::MergeFixupPending(story_id)); delete_content(ContentKey::MergeFixupPending(story_id));
delete_content(ContentKey::MergeFailureKind(story_id)); delete_content(ContentKey::MergeFailureKind(story_id));
delete_content(ContentKey::MergeReport(story_id));
} }
/// Spawn a background task that purges content-store entries when a story reaches a terminal stage. /// Spawn a background task that purges content-store entries when a story reaches a terminal stage.
+1
View File
@@ -174,6 +174,7 @@ mod tests {
commits_ahead: NonZeroU32::new(3).unwrap(), commits_ahead: NonZeroU32::new(3).unwrap(),
claim: None, claim: None,
retries: 0, retries: 0,
server_start_time: None,
}; };
// Stage::Merge has five fields: feature_branch, commits_ahead, claim, retries and server_start_time. // Stage::Merge has five fields: feature_branch, commits_ahead, claim, retries and server_start_time.
// There is no way to attach an agent name to it. The type system // There is no way to attach an agent name to it. The type system
+1
View File
@@ -177,6 +177,7 @@ mod tests {
commits_ahead: nz(1), commits_ahead: nz(1),
claim: None, claim: None,
retries: 0, retries: 0,
server_start_time: None,
}, },
Some("Test"), Some("Test"),
); );
+9
View File
@@ -203,6 +203,7 @@ fn block_from_any_active_stage() {
commits_ahead: nz(1), commits_ahead: nz(1),
claim: None, claim: None,
retries: 0, retries: 0,
server_start_time: None,
}; };
let result = transition( let result = transition(
m, m,
@@ -359,6 +360,7 @@ fn merge_failed_final() {
commits_ahead: nz(1), commits_ahead: nz(1),
claim: None, claim: None,
retries: 0, retries: 0,
server_start_time: None,
}; };
let result = transition( let result = transition(
s, s,
@@ -462,6 +464,7 @@ fn bug_502_agent_not_in_stage() {
commits_ahead: NonZeroU32::new(3).unwrap(), commits_ahead: NonZeroU32::new(3).unwrap(),
claim: None, claim: None,
retries: 0, retries: 0,
server_start_time: None,
}; };
// Stage::Merge has five fields: feature_branch, commits_ahead, claim, retries and server_start_time. // Stage::Merge has five fields: feature_branch, commits_ahead, claim, retries and server_start_time.
// There is no way to attach an agent name to it. The type system // There is no way to attach an agent name to it. The type system
@@ -552,6 +555,7 @@ fn reject_from_active_stages() {
commits_ahead: nz(1), commits_ahead: nz(1),
claim: None, claim: None,
retries: 0, retries: 0,
server_start_time: None,
}; };
let result = transition( let result = transition(
m, m,
@@ -945,6 +949,7 @@ fn merge_aborted_returns_to_coding() {
commits_ahead: nz(2), commits_ahead: nz(2),
claim: None, claim: None,
retries: 0, retries: 0,
server_start_time: None,
}; };
let result = transition(s, PipelineEvent::MergeAborted).unwrap(); let result = transition(s, PipelineEvent::MergeAborted).unwrap();
assert!( assert!(
@@ -1056,6 +1061,7 @@ fn hotfix_requested_rejected_from_non_done_stages() {
commits_ahead: nz(1), commits_ahead: nz(1),
claim: None, claim: None,
retries: 0, retries: 0,
server_start_time: None,
}, },
] { ] {
let result = transition(stage.clone(), PipelineEvent::HotfixRequested); let result = transition(stage.clone(), PipelineEvent::HotfixRequested);
@@ -1101,6 +1107,7 @@ fn audit_entry_is_single_line_with_all_fields() {
commits_ahead: nz(3), commits_ahead: nz(3),
claim: None, claim: None,
retries: 0, retries: 0,
server_start_time: None,
}, },
event: PipelineEvent::GatesPassed { event: PipelineEvent::GatesPassed {
feature_branch: fb("feature/story-42"), feature_branch: fb("feature/story-42"),
@@ -1139,6 +1146,7 @@ fn audit_entry_merge_to_done() {
commits_ahead: nz(1), commits_ahead: nz(1),
claim: None, claim: None,
retries: 0, retries: 0,
server_start_time: None,
}, },
after: Stage::Done { after: Stage::Done {
merged_at: chrono::Utc::now(), merged_at: chrono::Utc::now(),
@@ -1232,6 +1240,7 @@ fn audit_entry_merge_to_merge_failure() {
commits_ahead: nz(1), commits_ahead: nz(1),
claim: None, claim: None,
retries: 0, retries: 0,
server_start_time: None,
}, },
after: Stage::MergeFailure { after: Stage::MergeFailure {
kind: MergeFailureKind::Other("conflicts".into()), kind: MergeFailureKind::Other("conflicts".into()),
+3
View File
@@ -166,6 +166,7 @@ pub fn transition(state: Stage, event: PipelineEvent) -> Result<Stage, Transitio
commits_ahead, commits_ahead,
claim: None, claim: None,
retries: 0, retries: 0,
server_start_time: None,
}), }),
( (
Qa, Qa,
@@ -178,6 +179,7 @@ pub fn transition(state: Stage, event: PipelineEvent) -> Result<Stage, Transitio
commits_ahead, commits_ahead,
claim: None, claim: None,
retries: 0, retries: 0,
server_start_time: None,
}), }),
(Qa, GatesFailed { .. }) => Ok(Coding { (Qa, GatesFailed { .. }) => Ok(Coding {
claim: None, claim: None,
@@ -394,6 +396,7 @@ pub fn transition(state: Stage, event: PipelineEvent) -> Result<Stage, Transitio
commits_ahead, commits_ahead,
claim: None, claim: None,
retries: 0, retries: 0,
server_start_time: None,
}), }),
// ── Demote MergeFailure → Backlog (manual parking) ─────────────── // ── Demote MergeFailure → Backlog (manual parking) ───────────────
+12 -2
View File
@@ -195,7 +195,7 @@ impl PlanState {
/// | superseded | `Superseded { .. }` | /// | superseded | `Superseded { .. }` |
/// | rejected | `Rejected { .. }` | /// | rejected | `Rejected { .. }` |
/// | abandoned | `Abandoned { .. }` | /// | abandoned | `Abandoned { .. }` |
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum Stage { pub enum Stage {
/// Story has been created but not yet triaged into the backlog. /// Story has been created but not yet triaged into the backlog.
Upcoming, Upcoming,
@@ -233,6 +233,12 @@ pub enum Stage {
/// ///
/// `retries` counts how many times the mergemaster agent has been restarted /// `retries` counts how many times the mergemaster agent has been restarted
/// for this item. Replaces the separate `retry_count` CRDT register (story 997). /// for this item. Replaces the separate `retry_count` CRDT register (story 997).
///
/// `server_start_time` is the Unix timestamp (f64 seconds) captured when
/// the current server process first called `server_start_time()`. Written
/// by the merge runner when it begins a merge task; `None` means no merge
/// task is in flight on any node right now. Used by `reap_stale_merge_jobs`
/// to detect merges left running by a previous server process (story 1036).
Merge { Merge {
feature_branch: BranchName, feature_branch: BranchName,
commits_ahead: NonZeroU32, commits_ahead: NonZeroU32,
@@ -240,6 +246,9 @@ pub enum Stage {
claim: Option<AgentClaim>, claim: Option<AgentClaim>,
/// Number of mergemaster restarts for this item. Zero on the first attempt. /// Number of mergemaster restarts for this item. Zero on the first attempt.
retries: u32, retries: u32,
/// Unix timestamp of the server process that started the active merge task.
/// `None` means no merge task is currently in flight.
server_start_time: Option<f64>,
}, },
/// Mergemaster squashed to master. Always carries merge metadata. /// Mergemaster squashed to master. Always carries merge metadata.
@@ -371,6 +380,7 @@ impl Stage {
commits_ahead: NonZeroU32::new(1).expect("1 is non-zero"), commits_ahead: NonZeroU32::new(1).expect("1 is non-zero"),
claim: None, claim: None,
retries: 0, retries: 0,
server_start_time: None,
}), }),
"merge_failure" => Some(Stage::MergeFailure { "merge_failure" => Some(Stage::MergeFailure {
kind: MergeFailureKind::Other(String::new()), kind: MergeFailureKind::Other(String::new()),
@@ -455,7 +465,7 @@ pub enum ExecutionState {
/// ///
/// The retry count is no longer a top-level field — callers read it from the /// The retry count is no longer a top-level field — callers read it from the
/// Stage variant (`Stage::Coding { retries }` / `Stage::Merge { retries }`). /// Stage variant (`Stage::Coding { retries }` / `Stage::Merge { retries }`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct PipelineItem { pub struct PipelineItem {
pub story_id: StoryId, pub story_id: StoryId,
pub name: String, pub name: String,
+2
View File
@@ -196,6 +196,7 @@ mod tests {
commits_ahead: NonZeroU32::new(1).unwrap(), commits_ahead: NonZeroU32::new(1).unwrap(),
claim: None, claim: None,
retries: 0, retries: 0,
server_start_time: None,
}; };
assert!(!worktree_should_be_swept(Some(&stage))); assert!(!worktree_should_be_swept(Some(&stage)));
} }
@@ -380,6 +381,7 @@ mod tests {
commits_ahead: NonZeroU32::new(1).unwrap(), commits_ahead: NonZeroU32::new(1).unwrap(),
claim: None, claim: None,
retries: 0, retries: 0,
server_start_time: None,
}) })
} else { } else {
None None