Compare commits

...

3 Commits

Author SHA1 Message Date
Timmy 38df9c78af test(916): use far-future reset_at in inactivity-extension regression test to avoid spawn-time race
The original 90b31fc8 test computed reset_at = now + 3s in the test thread,
then relied on the script spawning fast enough that the rate_limit_event
arrived while reset_at was still meaningfully in the future. Under
cargo-test load the spawn could take long enough that block_until - now
clamped to 0 and the inactivity timeout killed the script before its sleep
finished. Pin reset_at to 2099-01-01 (matching the existing
rate_limit_hard_block_sends_watcher_hard_block_event test) so the
extension is essentially infinite and the assertion isolates the
extension-vs-no-extension behavior from wall-clock slack.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-12 16:36:24 +01:00
dave a34c9796b5 huskies: merge 913 2026-05-12 15:30:23 +00:00
Timmy 90b31fc84f fix(916): rate-limit hard block extends inactivity deadline so the watchdog doesn't kill mid-wait
When claude-code emits a rate_limit_event with status != allowed_warning,
the subprocess waits internally for the limit to clear before retrying. No
PTY output flows during that window, so the inactivity timeout in the PTY
runner would fire and kill the agent — mergemaster especially, whose
15-minute inactivity window is shorter than typical rate-limit backoffs.

Track `block_until = Some(reset_at)` on hard-block events and add the
remaining time-until-reset to the per-iteration recv timeout. Once reset_at
passes (or an earlier emit arrives), the extension implicitly drops to 0
and the base inactivity timeout resumes. Turn/budget counts aren't affected
— they come from the session log and only advance when API calls actually
complete, so a stalled retry doesn't burn either.

Regression test in agents/pty/mod.rs spawns a script that emits a hard-block
with reset_at = now+3s, sleeps 3s, then exits, with inactivity_timeout_secs
= 1. Without the fix the runner kills the script at 1s; with the fix the
deadline is bumped past the sleep and the run completes cleanly.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-12 16:22:21 +01:00
7 changed files with 212 additions and 30 deletions
+13 -5
View File
@@ -12,8 +12,8 @@ use std::process::Command;
use crate::db::yaml_legacy::clear_front_matter_field_in_content;
use crate::pipeline_state::{
ApplyError, ArchiveReason, BranchName, GitSha, PipelineEvent, Stage, apply_transition,
stage_label,
ApplyError, ArchiveReason, BranchName, GitSha, PipelineEvent, Stage, TransitionFired,
apply_transition, stage_label,
};
use crate::slog;
@@ -248,13 +248,22 @@ pub fn transition_to_blocked(story_id: &str, reason: &str) -> Result<(), String>
.map_err(|e| e.to_string())
}
/// Transition a story from `Stage::Merge` to `Stage::MergeFailure` via the state machine.
/// Transition a story from `Stage::Merge` (or `Stage::MergeFailure`) to
/// `Stage::MergeFailure` via the state machine.
///
/// Builds a `PipelineEvent::MergeFailed { reason }`, validates the transition, writes
/// the resulting `Stage::MergeFailure` to the CRDT, and persists the reason to front
/// matter so it survives server restarts.
///
/// When the story is already in `MergeFailure`, this is a silent self-loop: the
/// returned `TransitionFired::before` will be `Stage::MergeFailure`. Callers
/// should suppress re-notification in that case to avoid duplicate chat messages.
///
/// Returns `Err` on `TransitionError` — callers must NOT fall back to direct register writes.
pub fn transition_to_merge_failure(story_id: &str, reason: &str) -> Result<(), String> {
pub fn transition_to_merge_failure(
story_id: &str,
reason: &str,
) -> Result<TransitionFired, String> {
let reason_owned = reason.to_string();
let transform: Box<dyn Fn(&str) -> String> = Box::new(move |content: &str| {
crate::db::yaml_legacy::write_merge_failure_in_content(content, &reason_owned)
@@ -266,7 +275,6 @@ pub fn transition_to_merge_failure(story_id: &str, reason: &str) -> Result<(), S
},
Some(&*transform),
)
.map(|_| ())
.map_err(|e| e.to_string())
}
+23 -12
View File
@@ -126,19 +126,30 @@ impl AgentPool {
});
} else {
// Transition through the state machine (Merge → MergeFailure).
if let Err(e) =
crate::agents::lifecycle::transition_to_merge_failure(&sid, &reason)
{
crate::slog_error!(
"[merge] Failed to transition '{sid}' to MergeFailure: {e}"
);
// Only send the notification when the stage actually changed; if the
// story was already in MergeFailure (self-loop), suppress the duplicate.
let should_notify = match crate::agents::lifecycle::transition_to_merge_failure(
&sid, &reason,
) {
Ok(fired) => !matches!(
fired.before,
crate::pipeline_state::Stage::MergeFailure { .. }
),
Err(e) => {
crate::slog_error!(
"[merge] Failed to transition '{sid}' to MergeFailure: {e}"
);
true
}
};
if should_notify {
let _ =
pool.watcher_tx
.send(crate::io::watcher::WatcherEvent::MergeFailure {
story_id: sid.clone(),
reason,
});
}
let _ = pool
.watcher_tx
.send(crate::io::watcher::WatcherEvent::MergeFailure {
story_id: sid.clone(),
reason,
});
}
}
+49
View File
@@ -204,6 +204,55 @@ mod tests {
}
}
/// Story 916 regression: while a rate-limit hard block is pending, the PTY
/// runner must not treat the resulting silence as inactivity.
///
/// The helper script prints a single `rate_limit_event` line whose status is
/// `hard_block` and whose `reset_at` lies in 2099, stays silent for 3s, and
/// then exits. The runner is started with `inactivity_timeout_secs = 1`, so
/// without the deadline extension it would give up roughly two seconds before
/// the script finishes. Using a far-future `reset_at` keeps the outcome
/// independent of how long the script takes to spawn under cargo-test load.
#[tokio::test]
async fn rate_limit_hard_block_extends_inactivity_deadline() {
    use std::os::unix::fs::PermissionsExt;

    // One hard-block event on stdout, followed by 3s of silence.
    let script_body =
        "#!/bin/sh\nprintf '%s\\n' '{\"type\":\"rate_limit_event\",\"rate_limit_info\":{\"status\":\"hard_block\",\"reset_at\":\"2099-01-01T12:00:00Z\"}}'\nsleep 3\n";

    // Materialise the script as an executable file in a scratch directory.
    let workdir = tempfile::tempdir().unwrap();
    let script_path = workdir.path().join("emit_then_wait.sh");
    std::fs::write(&script_path, script_body).unwrap();
    std::fs::set_permissions(&script_path, std::fs::Permissions::from_mode(0o755)).unwrap();

    // The receivers are bound to named (underscore-prefixed) variables so they
    // stay alive for the whole test instead of being dropped immediately.
    let (agent_tx, _agent_rx) = broadcast::channel::<AgentEvent>(64);
    let (watcher_tx, _watcher_rx) = broadcast::channel::<WatcherEvent>(16);
    let event_log = Arc::new(Mutex::new(Vec::new()));
    let child_killers = Arc::new(Mutex::new(HashMap::new()));

    let script_arg = script_path.to_string_lossy().to_string();
    let outcome = run_agent_pty_streaming(
        "916_story_rate_limit_extension",
        "mergemaster",
        "sh",
        &[script_arg],
        "--",
        "/tmp",
        &agent_tx,
        &event_log,
        None,
        1, // inactivity_timeout_secs: shorter than the script's 3s sleep on purpose
        child_killers,
        watcher_tx,
        None,
    )
    .await;

    assert!(
        outcome.is_ok(),
        "PTY run should not be killed by inactivity timeout during rate-limit block: {:?}",
        outcome.err()
    );
}
#[test]
fn test_emit_event_writes_to_log_writer() {
let tmp = tempfile::tempdir().unwrap();
+23 -2
View File
@@ -232,7 +232,7 @@ fn run_agent_pty_blocking(
slog!("[agent:{sid_for_reader}:{aname_for_reader}] Reader thread exiting");
});
let timeout_dur = if inactivity_timeout_secs > 0 {
let base_timeout = if inactivity_timeout_secs > 0 {
Some(std::time::Duration::from_secs(inactivity_timeout_secs))
} else {
None
@@ -240,9 +240,24 @@ fn run_agent_pty_blocking(
let mut session_id: Option<String> = None;
let mut token_usage: Option<TokenUsage> = None;
// When the agent receives a rate-limit hard block, claude-code waits
// internally for the limit to clear and emits no PTY output during that
// window. Extend each recv's deadline by `block_until - now` so the
// inactivity timeout doesn't fire while we're legitimately waiting on
// Anthropic. Once `block_until` passes, the extension drops back to 0 and
// the base inactivity timeout resumes (story 916).
// NOTE(review): `block_until` is only ever assigned on a hard-block event;
// output arriving before `reset_at` does not appear to clear it, so the
// extension stays in force until `reset_at` — confirm this is intended.
let mut block_until: Option<chrono::DateTime<chrono::Utc>> = None;
loop {
let recv_result = match timeout_dur {
let effective_timeout = base_timeout.map(|base| {
let extra = block_until
.and_then(|t| (t - chrono::Utc::now()).to_std().ok())
.unwrap_or(std::time::Duration::ZERO);
base + extra
});
let recv_result = match effective_timeout {
Some(dur) => line_rx.recv_timeout(dur),
None => line_rx
.recv()
@@ -351,6 +366,12 @@ fn run_agent_pty_blocking(
default
}
};
// Pause the inactivity clock until the rate limit resets
// (story 916). Without this the watchdog kills the agent
// mid-wait — mergemaster especially, whose 15-minute
// inactivity window is shorter than typical rate-limit
// backoffs.
block_until = Some(reset_at);
let _ = watcher_tx.send(WatcherEvent::RateLimitHardBlock {
story_id: story_id.to_string(),
agent_name: agent_name.to_string(),
+20 -11
View File
@@ -179,19 +179,28 @@ pub(super) fn tool_report_merge_failure(args: &Value, ctx: &AppContext) -> Resul
slog!("[mergemaster] Merge failure reported for '{story_id}': {reason}");
ctx.services.agents.set_merge_failure_reported(story_id);
// Broadcast the failure so the Matrix notification listener can post an
// error message to configured rooms without coupling this tool to the bot.
let _ = ctx
.watcher_tx
.send(crate::io::watcher::WatcherEvent::MergeFailure {
story_id: story_id.to_string(),
reason: reason.to_string(),
});
// Route the failure through the typed state machine (Merge → MergeFailure).
// This persists the reason in front matter and updates the CRDT stage.
if let Err(e) = crate::agents::lifecycle::transition_to_merge_failure(story_id, reason) {
slog_warn!("[mergemaster] Failed to transition '{story_id}' to MergeFailure: {e}");
// Only broadcast the notification when the stage actually changed; if the
// story was already in MergeFailure (self-loop), suppress the duplicate.
let should_notify =
match crate::agents::lifecycle::transition_to_merge_failure(story_id, reason) {
Ok(fired) => !matches!(
fired.before,
crate::pipeline_state::Stage::MergeFailure { .. }
),
Err(e) => {
slog_warn!("[mergemaster] Failed to transition '{story_id}' to MergeFailure: {e}");
true
}
};
if should_notify {
let _ = ctx
.watcher_tx
.send(crate::io::watcher::WatcherEvent::MergeFailure {
story_id: story_id.to_string(),
reason: reason.to_string(),
});
}
Ok(format!(
+78
View File
@@ -750,4 +750,82 @@ fn merge_failure_transition_emits_event_with_full_reason() {
);
}
// ── Story 913: MergeFailure + MergeFailed self-loop ────────────────
/// AC1: feeding `MergeFailed` to a story that is already in `MergeFailure`
/// is accepted by `transition` and lands in `MergeFailure` again, rather than
/// being rejected as an invalid transition.
#[test]
fn merge_failure_plus_merge_failed_is_self_loop() {
    let already_failed = Stage::MergeFailure {
        reason: "initial failure".into(),
    };
    let repeat_event = PipelineEvent::MergeFailed {
        reason: "second failure".into(),
    };

    let result = transition(already_failed, repeat_event);

    assert!(
        matches!(result, Ok(Stage::MergeFailure { .. })),
        "MergeFailure + MergeFailed should self-loop to MergeFailure, got: {result:?}"
    );
}
/// AC2 + AC3: `apply_transition` with `MergeFailed` on a story that already
/// sits in `4_merge_failure` returns `Ok`, and the returned `TransitionFired`
/// reports `MergeFailure` as both `before` and `after`. Callers use
/// `before` to recognise the self-loop and skip the duplicate notification.
#[test]
fn repeated_merge_failure_apply_transition_no_error_no_duplicate_notification() {
    let story_id = "99913_story_merge_failure_selfloop";
    let content = "---\nname: MergeFailure Self-loop Test\n---\n# Story\n";

    crate::crdt_state::init_for_test();
    crate::db::ensure_content_store();

    // Seed the store with a story that is already in the merge-failure stage.
    crate::db::write_item_with_content(
        story_id,
        "4_merge_failure",
        content,
        crate::db::ItemMeta::from_yaml(content),
    );

    // Report a second failure for the same story.
    let repeat_event = PipelineEvent::MergeFailed {
        reason: "duplicate failure".into(),
    };
    let fired = super::apply::apply_transition(story_id, repeat_event, None)
        .expect("MergeFailed on already-failed story should succeed without error");

    // Both ends of the fired transition are MergeFailure, i.e. a self-loop.
    assert!(
        matches!(fired.before, Stage::MergeFailure { .. }),
        "fired.before should be MergeFailure (self-loop): {:?}",
        fired.before
    );
    assert!(
        matches!(fired.after, Stage::MergeFailure { .. }),
        "fired.after should remain MergeFailure: {:?}",
        fired.after
    );

    // The persisted stage must be unchanged by the self-loop.
    let stored = read_typed(story_id)
        .expect("CRDT read should succeed")
        .expect("item should still exist");
    assert_eq!(
        stored.stage.dir_name(),
        "4_merge_failure",
        "CRDT stage should remain 4_merge_failure after self-loop"
    );

    // Mirror the de-dup rule used at the call sites: notify only when the
    // story was not already in MergeFailure before this transition.
    let should_notify = !matches!(fired.before, Stage::MergeFailure { .. });
    assert!(
        !should_notify,
        "should_notify must be false for a self-loop to prevent duplicate notification"
    );
}
// ── ProjectionError Display ─────────────────────────────────────────
+6
View File
@@ -181,6 +181,12 @@ pub fn transition(state: Stage, event: PipelineEvent) -> Result<Stage, Transitio
// ── MergeFailed: Merge → MergeFailure (recoverable intermediate) ──
(Merge { .. }, MergeFailed { reason }) => Ok(MergeFailure { reason }),
// ── MergeFailure self-loop: repeated failure is a no-op ─────────────
// When the mergemaster retries and fails again while the story is already
// in MergeFailure, treat it as a silent self-transition so callers can
// detect the no-op via `fired.before == MergeFailure` and skip re-notifying.
(MergeFailure { .. }, MergeFailed { reason }) => Ok(MergeFailure { reason }),
(Merge { .. }, MergeFailedFinal { reason }) => Ok(Archived {
archived_at: now,
reason: ArchiveReason::MergeFailed { reason },