huskies: merge 479_story_build_agent_mode_with_crdt_based_work_claiming

This commit is contained in:
dave
2026-04-10 18:46:44 +00:00
parent 91be0ac47f
commit 6f7a0c7708
10 changed files with 714 additions and 9 deletions
+144
View File
@@ -91,6 +91,14 @@ pub struct PipelineItemCrdt {
pub retry_count: LwwRegisterCrdt<f64>,
pub blocked: LwwRegisterCrdt<bool>,
pub depends_on: LwwRegisterCrdt<String>,
/// Node ID (hex-encoded Ed25519 pubkey) of the node that claimed this item.
/// Used for distributed work claiming — the LWW register resolves conflicts
/// deterministically so all nodes converge on the same claimer.
pub claimed_by: LwwRegisterCrdt<String>,
/// Unix timestamp (seconds) when the claim was written.
/// Used for timeout-based reclaim: if a node crashes, other nodes can
/// reclaim the item after the timeout expires.
pub claimed_at: LwwRegisterCrdt<f64>,
}
/// CRDT node that holds a single peer's presence entry.
@@ -119,6 +127,10 @@ pub struct PipelineItemView {
pub retry_count: Option<i64>,
pub blocked: Option<bool>,
pub depends_on: Option<Vec<u32>>,
/// Node ID of the node that claimed this item (hex-encoded Ed25519 pubkey).
pub claimed_by: Option<String>,
/// Unix timestamp when the item was claimed.
pub claimed_at: Option<f64>,
}
/// A snapshot of a single node presence entry derived from the CRDT document.
@@ -366,6 +378,7 @@ where
///
/// When the stage changes (or a new item is created), a [`CrdtEvent`] is
/// broadcast so subscribers can react to the transition.
#[allow(clippy::too_many_arguments)]
pub fn write_item(
story_id: &str,
stage: &str,
@@ -374,6 +387,8 @@ pub fn write_item(
retry_count: Option<i64>,
blocked: Option<bool>,
depends_on: Option<&str>,
claimed_by: Option<&str>,
claimed_at: Option<f64>,
) {
let Some(state_mutex) = CRDT_STATE.get() else {
return;
@@ -419,6 +434,16 @@ pub fn write_item(
s.crdt.doc.items[idx].depends_on.set(d.to_string())
});
}
if let Some(cb) = claimed_by {
apply_and_persist(&mut state, |s| {
s.crdt.doc.items[idx].claimed_by.set(cb.to_string())
});
}
if let Some(ca) = claimed_at {
apply_and_persist(&mut state, |s| {
s.crdt.doc.items[idx].claimed_at.set(ca)
});
}
// Broadcast a CrdtEvent if the stage actually changed.
let stage_changed = old_stage.as_deref() != Some(stage);
@@ -445,6 +470,8 @@ pub fn write_item(
"retry_count": retry_count.unwrap_or(0) as f64,
"blocked": blocked.unwrap_or(false),
"depends_on": depends_on.unwrap_or(""),
"claimed_by": claimed_by.unwrap_or(""),
"claimed_at": claimed_at.unwrap_or(0.0),
})
.into();
@@ -573,6 +600,73 @@ pub fn our_node_id() -> Option<String> {
Some(hex::encode(&state.crdt.id))
}
/// Write a claim on a pipeline item via CRDT.
///
/// Sets `claimed_by` to this node's ID and `claimed_at` to the current time.
/// The LWW register ensures deterministic conflict resolution — if two nodes
/// claim the same item simultaneously, both will converge to the same winner
/// after CRDT sync.
///
/// Returns `true` if the claim was written, `false` if the item doesn't exist
/// or CRDT is not initialised.
pub fn write_claim(story_id: &str) -> bool {
let Some(node_id) = our_node_id() else {
return false;
};
let now = chrono::Utc::now().timestamp() as f64;
let Some(state_mutex) = CRDT_STATE.get() else {
return false;
};
let Ok(mut state) = state_mutex.lock() else {
return false;
};
let Some(&idx) = state.index.get(story_id) else {
return false;
};
apply_and_persist(&mut state, |s| {
s.crdt.doc.items[idx].claimed_by.set(node_id.clone())
});
apply_and_persist(&mut state, |s| {
s.crdt.doc.items[idx].claimed_at.set(now)
});
true
}
/// Release a claim on a pipeline item (clear claimed_by and claimed_at).
pub fn release_claim(story_id: &str) {
let Some(state_mutex) = CRDT_STATE.get() else {
return;
};
let Ok(mut state) = state_mutex.lock() else {
return;
};
let Some(&idx) = state.index.get(story_id) else {
return;
};
apply_and_persist(&mut state, |s| {
s.crdt.doc.items[idx].claimed_by.set(String::new())
});
apply_and_persist(&mut state, |s| {
s.crdt.doc.items[idx].claimed_at.set(0.0)
});
}
/// Check if this node currently holds the claim on a pipeline item.
pub fn is_claimed_by_us(story_id: &str) -> bool {
let Some(node_id) = our_node_id() else {
return false;
};
let Some(item) = read_item(story_id) else {
return false;
};
item.claimed_by.as_deref() == Some(&node_id)
}
/// Write or update a node presence entry in the CRDT.
///
/// If a node with the given `node_id` already exists, only `last_seen`,
@@ -673,6 +767,8 @@ pub struct CrdtItemDump {
pub retry_count: Option<i64>,
pub blocked: Option<bool>,
pub depends_on: Option<Vec<u32>>,
pub claimed_by: Option<String>,
pub claimed_at: Option<f64>,
/// Hex-encoded OpId of the list insert op — cross-reference with `crdt_ops`.
pub content_index: String,
pub is_deleted: bool,
@@ -793,6 +889,15 @@ pub fn dump_crdt_state(story_id_filter: Option<&str>) -> CrdtStateDump {
_ => None,
};
let claimed_by = match item_crdt.claimed_by.view() {
JsonValue::String(s) if !s.is_empty() => Some(s),
_ => None,
};
let claimed_at = match item_crdt.claimed_at.view() {
JsonValue::Number(n) if n > 0.0 => Some(n),
_ => None,
};
let content_index = op.id.iter().map(|b| format!("{b:02x}")).collect::<String>();
items.push(CrdtItemDump {
@@ -803,6 +908,8 @@ pub fn dump_crdt_state(story_id_filter: Option<&str>) -> CrdtStateDump {
retry_count,
blocked,
depends_on,
claimed_by,
claimed_at,
content_index,
is_deleted: op.is_deleted,
});
@@ -941,6 +1048,15 @@ fn extract_item_view(item: &PipelineItemCrdt) -> Option<PipelineItemView> {
_ => None,
};
let claimed_by = match item.claimed_by.view() {
JsonValue::String(s) if !s.is_empty() => Some(s),
_ => None,
};
let claimed_at = match item.claimed_at.view() {
JsonValue::Number(n) if n > 0.0 => Some(n),
_ => None,
};
Some(PipelineItemView {
story_id,
stage,
@@ -949,6 +1065,8 @@ fn extract_item_view(item: &PipelineItemCrdt) -> Option<PipelineItemView> {
retry_count,
blocked,
depends_on,
claimed_by,
claimed_at,
})
}
@@ -1049,6 +1167,8 @@ mod tests {
"retry_count": 0.0,
"blocked": false,
"depends_on": "",
"claimed_by": "",
"claimed_at": 0.0,
})
.into();
@@ -1076,6 +1196,8 @@ mod tests {
"retry_count": 0.0,
"blocked": false,
"depends_on": "",
"claimed_by": "",
"claimed_at": 0.0,
})
.into();
@@ -1106,6 +1228,8 @@ mod tests {
"retry_count": 0.0,
"blocked": false,
"depends_on": "",
"claimed_by": "",
"claimed_at": 0.0,
})
.into();
@@ -1147,6 +1271,8 @@ mod tests {
"retry_count": 2.0,
"blocked": true,
"depends_on": "[10,20]",
"claimed_by": "",
"claimed_at": 0.0,
})
.into();
@@ -1177,6 +1303,8 @@ mod tests {
"retry_count": 0.0,
"blocked": false,
"depends_on": "",
"claimed_by": "",
"claimed_at": 0.0,
})
.into();
let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp);
@@ -1213,6 +1341,8 @@ mod tests {
"retry_count": 0.0,
"blocked": false,
"depends_on": "",
"claimed_by": "",
"claimed_at": 0.0,
})
.into();
@@ -1266,6 +1396,8 @@ mod tests {
"retry_count": 0.0,
"blocked": false,
"depends_on": "",
"claimed_by": "",
"claimed_at": 0.0,
})
.into();
@@ -1385,6 +1517,8 @@ mod tests {
"retry_count": 0.0,
"blocked": false,
"depends_on": "",
"claimed_by": "",
"claimed_at": 0.0,
})
.into();
let op = crdt.doc.items.insert(bft_json_crdt::op::ROOT_ID, item).sign(&kp);
@@ -1406,6 +1540,8 @@ mod tests {
"retry_count": 1.0,
"blocked": false,
"depends_on": "[10]",
"claimed_by": "",
"claimed_at": 0.0,
})
.into();
let op = crdt.doc.items.insert(bft_json_crdt::op::ROOT_ID, item).sign(&kp);
@@ -1433,6 +1569,8 @@ mod tests {
"retry_count": 0.0,
"blocked": false,
"depends_on": "",
"claimed_by": "",
"claimed_at": 0.0,
})
.into();
let op = crdt.doc.items.insert(bft_json_crdt::op::ROOT_ID, item).sign(&kp);
@@ -1479,6 +1617,8 @@ mod tests {
"retry_count": 0.0,
"blocked": false,
"depends_on": "",
"claimed_by": "",
"claimed_at": 0.0,
})
.into();
let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp);
@@ -1495,6 +1635,8 @@ mod tests {
"retry_count": 0.0,
"blocked": false,
"depends_on": "",
"claimed_by": "",
"claimed_at": 0.0,
})
.into();
let insert_op = crdt.doc.items.insert(ROOT_ID, target_item).sign(&kp);
@@ -1608,6 +1750,8 @@ mod tests {
"retry_count": 0.0,
"blocked": false,
"depends_on": "",
"claimed_by": "",
"claimed_at": 0.0,
})
.into();