Compare commits
3 Commits
8421104645
...
38df9c78af
| Author | SHA1 | Date | |
|---|---|---|---|
| | 38df9c78af | | |
| | a34c9796b5 | | |
| | 90b31fc84f | | |
@@ -12,8 +12,8 @@ use std::process::Command;
|
||||
|
||||
use crate::db::yaml_legacy::clear_front_matter_field_in_content;
|
||||
use crate::pipeline_state::{
|
||||
ApplyError, ArchiveReason, BranchName, GitSha, PipelineEvent, Stage, apply_transition,
|
||||
stage_label,
|
||||
ApplyError, ArchiveReason, BranchName, GitSha, PipelineEvent, Stage, TransitionFired,
|
||||
apply_transition, stage_label,
|
||||
};
|
||||
use crate::slog;
|
||||
|
||||
@@ -248,13 +248,22 @@ pub fn transition_to_blocked(story_id: &str, reason: &str) -> Result<(), String>
|
||||
.map_err(|e| e.to_string())
|
||||
}
|
||||
|
||||
/// Transition a story from `Stage::Merge` to `Stage::MergeFailure` via the state machine.
|
||||
/// Transition a story from `Stage::Merge` (or `Stage::MergeFailure`) to
|
||||
/// `Stage::MergeFailure` via the state machine.
|
||||
///
|
||||
/// Builds a `PipelineEvent::MergeFailed { reason }`, validates the transition, writes
|
||||
/// the resulting `Stage::MergeFailure` to the CRDT, and persists the reason to front
|
||||
/// matter so it survives server restarts.
|
||||
///
|
||||
/// When the story is already in `MergeFailure`, this is a silent self-loop: the
|
||||
/// returned `TransitionFired::before` will be `Stage::MergeFailure`. Callers
|
||||
/// should suppress re-notification in that case to avoid duplicate chat messages.
|
||||
///
|
||||
/// Returns `Err` on `TransitionError` — callers must NOT fall back to direct register writes.
|
||||
pub fn transition_to_merge_failure(story_id: &str, reason: &str) -> Result<(), String> {
|
||||
pub fn transition_to_merge_failure(
|
||||
story_id: &str,
|
||||
reason: &str,
|
||||
) -> Result<TransitionFired, String> {
|
||||
let reason_owned = reason.to_string();
|
||||
let transform: Box<dyn Fn(&str) -> String> = Box::new(move |content: &str| {
|
||||
crate::db::yaml_legacy::write_merge_failure_in_content(content, &reason_owned)
|
||||
@@ -266,7 +275,6 @@ pub fn transition_to_merge_failure(story_id: &str, reason: &str) -> Result<(), S
|
||||
},
|
||||
Some(&*transform),
|
||||
)
|
||||
.map(|_| ())
|
||||
.map_err(|e| e.to_string())
|
||||
}
|
||||
|
||||
|
||||
@@ -126,19 +126,30 @@ impl AgentPool {
|
||||
});
|
||||
} else {
|
||||
// Transition through the state machine (Merge → MergeFailure).
|
||||
if let Err(e) =
|
||||
crate::agents::lifecycle::transition_to_merge_failure(&sid, &reason)
|
||||
{
|
||||
crate::slog_error!(
|
||||
"[merge] Failed to transition '{sid}' to MergeFailure: {e}"
|
||||
);
|
||||
// Only send the notification when the stage actually changed; if the
|
||||
// story was already in MergeFailure (self-loop), suppress the duplicate.
|
||||
let should_notify = match crate::agents::lifecycle::transition_to_merge_failure(
|
||||
&sid, &reason,
|
||||
) {
|
||||
Ok(fired) => !matches!(
|
||||
fired.before,
|
||||
crate::pipeline_state::Stage::MergeFailure { .. }
|
||||
),
|
||||
Err(e) => {
|
||||
crate::slog_error!(
|
||||
"[merge] Failed to transition '{sid}' to MergeFailure: {e}"
|
||||
);
|
||||
true
|
||||
}
|
||||
};
|
||||
if should_notify {
|
||||
let _ =
|
||||
pool.watcher_tx
|
||||
.send(crate::io::watcher::WatcherEvent::MergeFailure {
|
||||
story_id: sid.clone(),
|
||||
reason,
|
||||
});
|
||||
}
|
||||
let _ = pool
|
||||
.watcher_tx
|
||||
.send(crate::io::watcher::WatcherEvent::MergeFailure {
|
||||
story_id: sid.clone(),
|
||||
reason,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -204,6 +204,55 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
/// Story 916: a rate-limit hard block must extend the inactivity deadline
|
||||
/// by the backoff duration so the watchdog doesn't kill the agent while
|
||||
/// it's legitimately waiting for the limit to clear.
|
||||
///
|
||||
/// Script emits a hard-block event with reset_at in the far future, then
|
||||
/// sleeps 3s, then exits. With inactivity_timeout_secs = 1, the run
|
||||
/// would normally fail at the 1s mark; with the extension the deadline
|
||||
/// is bumped past the sleep and the script completes cleanly. The
|
||||
/// far-future reset_at avoids wall-clock races under cargo-test load.
|
||||
#[tokio::test]
|
||||
async fn rate_limit_hard_block_extends_inactivity_deadline() {
|
||||
use std::os::unix::fs::PermissionsExt;
|
||||
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
let script = tmp.path().join("emit_then_wait.sh");
|
||||
let body =
|
||||
"#!/bin/sh\nprintf '%s\\n' '{\"type\":\"rate_limit_event\",\"rate_limit_info\":{\"status\":\"hard_block\",\"reset_at\":\"2099-01-01T12:00:00Z\"}}'\nsleep 3\n";
|
||||
std::fs::write(&script, body).unwrap();
|
||||
std::fs::set_permissions(&script, std::fs::Permissions::from_mode(0o755)).unwrap();
|
||||
|
||||
let (tx, _rx) = broadcast::channel::<AgentEvent>(64);
|
||||
let (watcher_tx, _watcher_rx) = broadcast::channel::<WatcherEvent>(16);
|
||||
let event_log = Arc::new(Mutex::new(Vec::new()));
|
||||
let child_killers = Arc::new(Mutex::new(HashMap::new()));
|
||||
|
||||
let result = run_agent_pty_streaming(
|
||||
"916_story_rate_limit_extension",
|
||||
"mergemaster",
|
||||
"sh",
|
||||
&[script.to_string_lossy().to_string()],
|
||||
"--",
|
||||
"/tmp",
|
||||
&tx,
|
||||
&event_log,
|
||||
None,
|
||||
1, // inactivity_timeout_secs = 1s; would expire before the 3s sleep without the extension
|
||||
child_killers,
|
||||
watcher_tx,
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
|
||||
assert!(
|
||||
result.is_ok(),
|
||||
"PTY run should not be killed by inactivity timeout during rate-limit block: {:?}",
|
||||
result.err()
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_emit_event_writes_to_log_writer() {
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
|
||||
@@ -232,7 +232,7 @@ fn run_agent_pty_blocking(
|
||||
slog!("[agent:{sid_for_reader}:{aname_for_reader}] Reader thread exiting");
|
||||
});
|
||||
|
||||
let timeout_dur = if inactivity_timeout_secs > 0 {
|
||||
let base_timeout = if inactivity_timeout_secs > 0 {
|
||||
Some(std::time::Duration::from_secs(inactivity_timeout_secs))
|
||||
} else {
|
||||
None
|
||||
@@ -240,9 +240,24 @@ fn run_agent_pty_blocking(
|
||||
|
||||
let mut session_id: Option<String> = None;
|
||||
let mut token_usage: Option<TokenUsage> = None;
|
||||
// When the agent receives a rate-limit hard block, claude-code waits
|
||||
// internally for the limit to clear and emits no PTY output during that
|
||||
// window. Extend each recv's deadline by `block_until - now` so the
|
||||
// inactivity timeout doesn't fire while we're legitimately waiting on
|
||||
// Anthropic. Once `block_until` passes (or the block is cleared by an
|
||||
// earlier-than-expected emit), the extension implicitly drops back to 0
|
||||
// and the base inactivity timeout resumes (story 916).
|
||||
let mut block_until: Option<chrono::DateTime<chrono::Utc>> = None;
|
||||
|
||||
loop {
|
||||
let recv_result = match timeout_dur {
|
||||
let effective_timeout = base_timeout.map(|base| {
|
||||
let extra = block_until
|
||||
.and_then(|t| (t - chrono::Utc::now()).to_std().ok())
|
||||
.unwrap_or(std::time::Duration::ZERO);
|
||||
base + extra
|
||||
});
|
||||
|
||||
let recv_result = match effective_timeout {
|
||||
Some(dur) => line_rx.recv_timeout(dur),
|
||||
None => line_rx
|
||||
.recv()
|
||||
@@ -351,6 +366,12 @@ fn run_agent_pty_blocking(
|
||||
default
|
||||
}
|
||||
};
|
||||
// Pause the inactivity clock until the rate limit resets
|
||||
// (story 916). Without this the watchdog kills the agent
|
||||
// mid-wait — mergemaster especially, whose 15-minute
|
||||
// inactivity window is shorter than typical rate-limit
|
||||
// backoffs.
|
||||
block_until = Some(reset_at);
|
||||
let _ = watcher_tx.send(WatcherEvent::RateLimitHardBlock {
|
||||
story_id: story_id.to_string(),
|
||||
agent_name: agent_name.to_string(),
|
||||
|
||||
@@ -179,19 +179,28 @@ pub(super) fn tool_report_merge_failure(args: &Value, ctx: &AppContext) -> Resul
|
||||
slog!("[mergemaster] Merge failure reported for '{story_id}': {reason}");
|
||||
ctx.services.agents.set_merge_failure_reported(story_id);
|
||||
|
||||
// Broadcast the failure so the Matrix notification listener can post an
|
||||
// error message to configured rooms without coupling this tool to the bot.
|
||||
let _ = ctx
|
||||
.watcher_tx
|
||||
.send(crate::io::watcher::WatcherEvent::MergeFailure {
|
||||
story_id: story_id.to_string(),
|
||||
reason: reason.to_string(),
|
||||
});
|
||||
|
||||
// Route the failure through the typed state machine (Merge → MergeFailure).
|
||||
// This persists the reason in front matter and updates the CRDT stage.
|
||||
if let Err(e) = crate::agents::lifecycle::transition_to_merge_failure(story_id, reason) {
|
||||
slog_warn!("[mergemaster] Failed to transition '{story_id}' to MergeFailure: {e}");
|
||||
// Only broadcast the notification when the stage actually changed; if the
|
||||
// story was already in MergeFailure (self-loop), suppress the duplicate.
|
||||
let should_notify =
|
||||
match crate::agents::lifecycle::transition_to_merge_failure(story_id, reason) {
|
||||
Ok(fired) => !matches!(
|
||||
fired.before,
|
||||
crate::pipeline_state::Stage::MergeFailure { .. }
|
||||
),
|
||||
Err(e) => {
|
||||
slog_warn!("[mergemaster] Failed to transition '{story_id}' to MergeFailure: {e}");
|
||||
true
|
||||
}
|
||||
};
|
||||
if should_notify {
|
||||
let _ = ctx
|
||||
.watcher_tx
|
||||
.send(crate::io::watcher::WatcherEvent::MergeFailure {
|
||||
story_id: story_id.to_string(),
|
||||
reason: reason.to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
Ok(format!(
|
||||
|
||||
@@ -750,4 +750,82 @@ fn merge_failure_transition_emits_event_with_full_reason() {
|
||||
);
|
||||
}
|
||||
|
||||
// ── Story 913: MergeFailure + MergeFailed self-loop ────────────────
|
||||
|
||||
/// AC1: `MergeFailure + MergeFailed` is a valid self-transition — no error logged.
|
||||
#[test]
|
||||
fn merge_failure_plus_merge_failed_is_self_loop() {
|
||||
let s = Stage::MergeFailure {
|
||||
reason: "initial failure".into(),
|
||||
};
|
||||
let result = transition(
|
||||
s,
|
||||
PipelineEvent::MergeFailed {
|
||||
reason: "second failure".into(),
|
||||
},
|
||||
);
|
||||
assert!(
|
||||
matches!(result, Ok(Stage::MergeFailure { .. })),
|
||||
"MergeFailure + MergeFailed should self-loop to MergeFailure, got: {result:?}"
|
||||
);
|
||||
}
|
||||
|
||||
/// AC2 + AC3: applying `MergeFailed` to a story already in `MergeFailure` succeeds and
|
||||
/// the `TransitionFired::before` is `MergeFailure`, allowing callers to suppress the
|
||||
/// duplicate notification.
|
||||
#[test]
|
||||
fn repeated_merge_failure_apply_transition_no_error_no_duplicate_notification() {
|
||||
crate::crdt_state::init_for_test();
|
||||
crate::db::ensure_content_store();
|
||||
|
||||
let story_id = "99913_story_merge_failure_selfloop";
|
||||
crate::db::write_item_with_content(
|
||||
story_id,
|
||||
"4_merge_failure",
|
||||
"---\nname: MergeFailure Self-loop Test\n---\n# Story\n",
|
||||
crate::db::ItemMeta::from_yaml("---\nname: MergeFailure Self-loop Test\n---\n# Story\n"),
|
||||
);
|
||||
|
||||
// Apply a second MergeFailed to a story already in MergeFailure.
|
||||
let fired = super::apply::apply_transition(
|
||||
story_id,
|
||||
PipelineEvent::MergeFailed {
|
||||
reason: "duplicate failure".into(),
|
||||
},
|
||||
None,
|
||||
)
|
||||
.expect("MergeFailed on already-failed story should succeed without error");
|
||||
|
||||
// The before-stage was MergeFailure: this was a self-loop.
|
||||
// Callers check this to decide whether to suppress the chat notification.
|
||||
assert!(
|
||||
matches!(fired.before, Stage::MergeFailure { .. }),
|
||||
"fired.before should be MergeFailure (self-loop): {:?}",
|
||||
fired.before
|
||||
);
|
||||
assert!(
|
||||
matches!(fired.after, Stage::MergeFailure { .. }),
|
||||
"fired.after should remain MergeFailure: {:?}",
|
||||
fired.after
|
||||
);
|
||||
|
||||
// Verify the CRDT stage is still 4_merge_failure.
|
||||
let item = read_typed(story_id)
|
||||
.expect("CRDT read should succeed")
|
||||
.expect("item should still exist");
|
||||
assert_eq!(
|
||||
item.stage.dir_name(),
|
||||
"4_merge_failure",
|
||||
"CRDT stage should remain 4_merge_failure after self-loop"
|
||||
);
|
||||
|
||||
// Simulate the caller's de-dup logic: since fired.before is already MergeFailure,
|
||||
// no notification should be dispatched.
|
||||
let should_notify = !matches!(fired.before, Stage::MergeFailure { .. });
|
||||
assert!(
|
||||
!should_notify,
|
||||
"should_notify must be false for a self-loop to prevent duplicate notification"
|
||||
);
|
||||
}
|
||||
|
||||
// ── ProjectionError Display ─────────────────────────────────────────
|
||||
|
||||
@@ -181,6 +181,12 @@ pub fn transition(state: Stage, event: PipelineEvent) -> Result<Stage, Transitio
|
||||
// ── MergeFailed: Merge → MergeFailure (recoverable intermediate) ──
|
||||
(Merge { .. }, MergeFailed { reason }) => Ok(MergeFailure { reason }),
|
||||
|
||||
// ── MergeFailure self-loop: repeated failure is a no-op ─────────────
|
||||
// When the mergemaster retries and fails again while the story is already
|
||||
// in MergeFailure, treat it as a silent self-transition so callers can
|
||||
// detect the no-op via `fired.before == MergeFailure` and skip re-notifying.
|
||||
(MergeFailure { .. }, MergeFailed { reason }) => Ok(MergeFailure { reason }),
|
||||
|
||||
(Merge { .. }, MergeFailedFinal { reason }) => Ok(Archived {
|
||||
archived_at: now,
|
||||
reason: ArchiveReason::MergeFailed { reason },
|
||||
|
||||
Reference in New Issue
Block a user