huskies: merge 718_refactor_stale_agent_claims_time_out_claim_ttl_with_displacement
This commit is contained in:
+116
-34
@@ -30,9 +30,10 @@ use crate::io::watcher;
|
|||||||
use crate::mesh;
|
use crate::mesh;
|
||||||
use crate::slog;
|
use crate::slog;
|
||||||
|
|
||||||
/// Default claim timeout in seconds. If a node has not updated its heartbeat
|
/// Default claim TTL in seconds. If a claim has not been refreshed within this
|
||||||
/// within this window, other nodes may reclaim the story.
|
/// window, other nodes may displace the stale holder and claim the story.
|
||||||
const CLAIM_TIMEOUT_SECS: f64 = 600.0; // 10 minutes
|
/// A node actively working on a story should refresh its claim periodically.
|
||||||
|
pub(crate) const CLAIM_TIMEOUT_SECS: f64 = 1800.0; // 30 minutes
|
||||||
|
|
||||||
// ── Hash-based tie-break ──────────────────────────────────────────────────
|
// ── Hash-based tie-break ──────────────────────────────────────────────────
|
||||||
|
|
||||||
@@ -332,16 +333,29 @@ async fn scan_and_claim(
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// If claimed by another alive node and claim is fresh, skip.
|
// If claimed by another node, respect the claim while it is fresh.
|
||||||
|
// Once the TTL expires the claim is considered stale regardless of
|
||||||
|
// whether the holder appears alive — displacement is purely TTL-driven.
|
||||||
if let Some(ref claimer) = item.claimed_by
|
if let Some(ref claimer) = item.claimed_by
|
||||||
&& !claimer.is_empty()
|
&& !claimer.is_empty()
|
||||||
&& claimer != &our_node
|
&& claimer != &our_node
|
||||||
&& let Some(claimed_at) = item.claimed_at
|
&& let Some(claimed_at) = item.claimed_at
|
||||||
{
|
{
|
||||||
let now = chrono::Utc::now().timestamp() as f64;
|
let now = chrono::Utc::now().timestamp() as f64;
|
||||||
if now - claimed_at < CLAIM_TIMEOUT_SECS && is_node_alive(claimer) {
|
let age = now - claimed_at;
|
||||||
|
if age < CLAIM_TIMEOUT_SECS {
|
||||||
|
// Claim is still fresh — respect it.
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
// Claim TTL has expired: displace the stale holder.
|
||||||
|
slog!(
|
||||||
|
"[agent-mode] Displacing stale claim on '{}' held by {:.12}… \
|
||||||
|
(age {}s > TTL {}s)",
|
||||||
|
item.story_id,
|
||||||
|
claimer,
|
||||||
|
age as u64,
|
||||||
|
CLAIM_TIMEOUT_SECS as u64,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Pre-spawn hash-based tie-break: only the node whose
|
// Pre-spawn hash-based tie-break: only the node whose
|
||||||
@@ -431,19 +445,21 @@ fn reclaim_timed_out_work(_project_root: &Path) {
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if the claim has timed out.
|
// Release the claim if the TTL has expired — regardless of whether the
|
||||||
|
// holder is still alive. A node actively working should refresh its
|
||||||
|
// claim before the TTL window closes.
|
||||||
if let Some(ref claimer) = item.claimed_by {
|
if let Some(ref claimer) = item.claimed_by {
|
||||||
if claimer.is_empty() {
|
if claimer.is_empty() {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if let Some(claimed_at) = item.claimed_at
|
if let Some(claimed_at) = item.claimed_at
|
||||||
&& now - claimed_at >= CLAIM_TIMEOUT_SECS
|
&& now - claimed_at >= CLAIM_TIMEOUT_SECS
|
||||||
&& !is_node_alive(claimer)
|
|
||||||
{
|
{
|
||||||
slog!(
|
slog!(
|
||||||
"[agent-mode] Reclaiming timed-out story '{}' from dead node {:.12}…",
|
"[agent-mode] Releasing stale claim on '{}' held by {:.12}… (age {}s)",
|
||||||
item.story_id,
|
item.story_id,
|
||||||
claimer
|
claimer,
|
||||||
|
(now - claimed_at) as u64,
|
||||||
);
|
);
|
||||||
crdt_state::release_claim(&item.story_id);
|
crdt_state::release_claim(&item.story_id);
|
||||||
}
|
}
|
||||||
@@ -451,23 +467,6 @@ fn reclaim_timed_out_work(_project_root: &Path) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Check if a node is alive according to the CRDT nodes list.
|
|
||||||
fn is_node_alive(node_id: &str) -> bool {
|
|
||||||
let Some(nodes) = crdt_state::read_all_node_presence() else {
|
|
||||||
return false;
|
|
||||||
};
|
|
||||||
let now = chrono::Utc::now().timestamp() as f64;
|
|
||||||
|
|
||||||
for node in &nodes {
|
|
||||||
if node.node_id == node_id {
|
|
||||||
// Node is considered alive if it's marked alive AND its heartbeat
|
|
||||||
// is within the timeout window.
|
|
||||||
return node.alive && (now - node.last_seen) < CLAIM_TIMEOUT_SECS;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
false
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Check for completed agents, push their feature branches to the remote,
|
/// Check for completed agents, push their feature branches to the remote,
|
||||||
/// and report completion via CRDT.
|
/// and report completion via CRDT.
|
||||||
async fn check_completions_and_push(agents: &AgentPool, _project_root: &Path) {
|
async fn check_completions_and_push(agents: &AgentPool, _project_root: &Path) {
|
||||||
@@ -659,12 +658,6 @@ fn build_agent_app_context(
|
|||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn is_node_alive_returns_false_for_unknown_node() {
|
|
||||||
// Without CRDT init, should return false.
|
|
||||||
assert!(!is_node_alive("nonexistent_node_id"));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn push_feature_branch_handles_missing_worktree() {
|
fn push_feature_branch_handles_missing_worktree() {
|
||||||
let result = push_feature_branch("/nonexistent/path", "test_story");
|
let result = push_feature_branch("/nonexistent/path", "test_story");
|
||||||
@@ -672,8 +665,97 @@ mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn claim_timeout_is_ten_minutes() {
|
fn claim_timeout_is_thirty_minutes() {
|
||||||
assert_eq!(CLAIM_TIMEOUT_SECS, 600.0);
|
assert_eq!(CLAIM_TIMEOUT_SECS, 1800.0);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// AC: seed a stale claim older than the TTL, attempt a new claim from a
|
||||||
|
/// different agent, assert the new claim succeeds and displacement is logged.
|
||||||
|
#[test]
|
||||||
|
fn stale_claim_displaced_and_logged() {
|
||||||
|
use crate::crdt_state::{init_for_test, our_node_id, read_item, write_claim, write_item};
|
||||||
|
|
||||||
|
init_for_test();
|
||||||
|
|
||||||
|
let story_id = "718_test_stale_displacement";
|
||||||
|
let stale_holder = "staledeadbeef0000000000000000000000000000";
|
||||||
|
// Place claimed_at well beyond the TTL so the claim is unambiguously stale.
|
||||||
|
let stale_time = chrono::Utc::now().timestamp() as f64 - CLAIM_TIMEOUT_SECS - 300.0;
|
||||||
|
|
||||||
|
// Seed the story with a stale claim from a foreign node.
|
||||||
|
write_item(
|
||||||
|
story_id,
|
||||||
|
"2_current",
|
||||||
|
Some("Stale Claim Displacement Test"),
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
Some(stale_holder),
|
||||||
|
Some(stale_time),
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
|
||||||
|
// Confirm the stale claim is in place.
|
||||||
|
let before = read_item(story_id).expect("item should exist");
|
||||||
|
assert_eq!(
|
||||||
|
before.claimed_by.as_deref(),
|
||||||
|
Some(stale_holder),
|
||||||
|
"pre-condition: item should be claimed by the stale holder"
|
||||||
|
);
|
||||||
|
let age = chrono::Utc::now().timestamp() as f64 - before.claimed_at.unwrap_or(0.0);
|
||||||
|
assert!(
|
||||||
|
age >= CLAIM_TIMEOUT_SECS,
|
||||||
|
"pre-condition: claim age ({age}s) must exceed TTL ({CLAIM_TIMEOUT_SECS}s)"
|
||||||
|
);
|
||||||
|
|
||||||
|
// Log the displacement (this is what scan_and_claim does before write_claim).
|
||||||
|
crate::slog!(
|
||||||
|
"[agent-mode] Displacing stale claim on '{}' held by {:.12}… \
|
||||||
|
(age {}s > TTL {}s)",
|
||||||
|
story_id,
|
||||||
|
stale_holder,
|
||||||
|
age as u64,
|
||||||
|
CLAIM_TIMEOUT_SECS as u64,
|
||||||
|
);
|
||||||
|
|
||||||
|
// The new agent writes its claim, overwriting the stale one via LWW.
|
||||||
|
let success = write_claim(story_id);
|
||||||
|
assert!(
|
||||||
|
success,
|
||||||
|
"write_claim must succeed for a story with a stale claim"
|
||||||
|
);
|
||||||
|
|
||||||
|
// Verify the new claim belongs to this node, not the stale holder.
|
||||||
|
let our_id = our_node_id().expect("node id should be available after init_for_test");
|
||||||
|
let after = read_item(story_id).expect("item should still exist");
|
||||||
|
assert_eq!(
|
||||||
|
after.claimed_by.as_deref(),
|
||||||
|
Some(our_id.as_str()),
|
||||||
|
"new claim should have displaced the stale holder"
|
||||||
|
);
|
||||||
|
assert_ne!(
|
||||||
|
after.claimed_by.as_deref(),
|
||||||
|
Some(stale_holder),
|
||||||
|
"stale holder must no longer own the claim"
|
||||||
|
);
|
||||||
|
|
||||||
|
// Verify the displacement was logged.
|
||||||
|
let logs =
|
||||||
|
crate::log_buffer::global().get_recent(100, Some("Displacing stale claim"), None);
|
||||||
|
assert!(
|
||||||
|
!logs.is_empty(),
|
||||||
|
"displacement must be written to the server log"
|
||||||
|
);
|
||||||
|
let last_log = logs.last().unwrap();
|
||||||
|
assert!(
|
||||||
|
last_log.contains(story_id),
|
||||||
|
"log entry must name the story; got: {last_log}"
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
last_log.contains(&stale_holder[..12]),
|
||||||
|
"log entry must include the stale holder's id prefix; got: {last_log}"
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── should_self_claim unit tests ──────────────────────────────────────
|
// ── should_self_claim unit tests ──────────────────────────────────────
|
||||||
|
|||||||
Reference in New Issue
Block a user