Compare commits
3 Commits
8421104645
...
38df9c78af
| Author | SHA1 | Date | |
|---|---|---|---|
| 38df9c78af | |||
| a34c9796b5 | |||
| 90b31fc84f |
@@ -12,8 +12,8 @@ use std::process::Command;
|
|||||||
|
|
||||||
use crate::db::yaml_legacy::clear_front_matter_field_in_content;
|
use crate::db::yaml_legacy::clear_front_matter_field_in_content;
|
||||||
use crate::pipeline_state::{
|
use crate::pipeline_state::{
|
||||||
ApplyError, ArchiveReason, BranchName, GitSha, PipelineEvent, Stage, apply_transition,
|
ApplyError, ArchiveReason, BranchName, GitSha, PipelineEvent, Stage, TransitionFired,
|
||||||
stage_label,
|
apply_transition, stage_label,
|
||||||
};
|
};
|
||||||
use crate::slog;
|
use crate::slog;
|
||||||
|
|
||||||
@@ -248,13 +248,22 @@ pub fn transition_to_blocked(story_id: &str, reason: &str) -> Result<(), String>
|
|||||||
.map_err(|e| e.to_string())
|
.map_err(|e| e.to_string())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Transition a story from `Stage::Merge` to `Stage::MergeFailure` via the state machine.
|
/// Transition a story from `Stage::Merge` (or `Stage::MergeFailure`) to
|
||||||
|
/// `Stage::MergeFailure` via the state machine.
|
||||||
///
|
///
|
||||||
/// Builds a `PipelineEvent::MergeFailed { reason }`, validates the transition, writes
|
/// Builds a `PipelineEvent::MergeFailed { reason }`, validates the transition, writes
|
||||||
/// the resulting `Stage::MergeFailure` to the CRDT, and persists the reason to front
|
/// the resulting `Stage::MergeFailure` to the CRDT, and persists the reason to front
|
||||||
/// matter so it survives server restarts.
|
/// matter so it survives server restarts.
|
||||||
|
///
|
||||||
|
/// When the story is already in `MergeFailure`, this is a silent self-loop: the
|
||||||
|
/// returned `TransitionFired::before` will be `Stage::MergeFailure`. Callers
|
||||||
|
/// should suppress re-notification in that case to avoid duplicate chat messages.
|
||||||
|
///
|
||||||
/// Returns `Err` on `TransitionError` — callers must NOT fall back to direct register writes.
|
/// Returns `Err` on `TransitionError` — callers must NOT fall back to direct register writes.
|
||||||
pub fn transition_to_merge_failure(story_id: &str, reason: &str) -> Result<(), String> {
|
pub fn transition_to_merge_failure(
|
||||||
|
story_id: &str,
|
||||||
|
reason: &str,
|
||||||
|
) -> Result<TransitionFired, String> {
|
||||||
let reason_owned = reason.to_string();
|
let reason_owned = reason.to_string();
|
||||||
let transform: Box<dyn Fn(&str) -> String> = Box::new(move |content: &str| {
|
let transform: Box<dyn Fn(&str) -> String> = Box::new(move |content: &str| {
|
||||||
crate::db::yaml_legacy::write_merge_failure_in_content(content, &reason_owned)
|
crate::db::yaml_legacy::write_merge_failure_in_content(content, &reason_owned)
|
||||||
@@ -266,7 +275,6 @@ pub fn transition_to_merge_failure(story_id: &str, reason: &str) -> Result<(), S
|
|||||||
},
|
},
|
||||||
Some(&*transform),
|
Some(&*transform),
|
||||||
)
|
)
|
||||||
.map(|_| ())
|
|
||||||
.map_err(|e| e.to_string())
|
.map_err(|e| e.to_string())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -126,19 +126,30 @@ impl AgentPool {
|
|||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
// Transition through the state machine (Merge → MergeFailure).
|
// Transition through the state machine (Merge → MergeFailure).
|
||||||
if let Err(e) =
|
// Only send the notification when the stage actually changed; if the
|
||||||
crate::agents::lifecycle::transition_to_merge_failure(&sid, &reason)
|
// story was already in MergeFailure (self-loop), suppress the duplicate.
|
||||||
{
|
let should_notify = match crate::agents::lifecycle::transition_to_merge_failure(
|
||||||
crate::slog_error!(
|
&sid, &reason,
|
||||||
"[merge] Failed to transition '{sid}' to MergeFailure: {e}"
|
) {
|
||||||
);
|
Ok(fired) => !matches!(
|
||||||
|
fired.before,
|
||||||
|
crate::pipeline_state::Stage::MergeFailure { .. }
|
||||||
|
),
|
||||||
|
Err(e) => {
|
||||||
|
crate::slog_error!(
|
||||||
|
"[merge] Failed to transition '{sid}' to MergeFailure: {e}"
|
||||||
|
);
|
||||||
|
true
|
||||||
|
}
|
||||||
|
};
|
||||||
|
if should_notify {
|
||||||
|
let _ =
|
||||||
|
pool.watcher_tx
|
||||||
|
.send(crate::io::watcher::WatcherEvent::MergeFailure {
|
||||||
|
story_id: sid.clone(),
|
||||||
|
reason,
|
||||||
|
});
|
||||||
}
|
}
|
||||||
let _ = pool
|
|
||||||
.watcher_tx
|
|
||||||
.send(crate::io::watcher::WatcherEvent::MergeFailure {
|
|
||||||
story_id: sid.clone(),
|
|
||||||
reason,
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -204,6 +204,55 @@ mod tests {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Story 916: a rate-limit hard block must extend the inactivity deadline
|
||||||
|
/// by the backoff duration so the watchdog doesn't kill the agent while
|
||||||
|
/// it's legitimately waiting for the limit to clear.
|
||||||
|
///
|
||||||
|
/// Script emits a hard-block event with reset_at in the far future, then
|
||||||
|
/// sleeps 3s, then exits. With inactivity_timeout_secs = 1, the run
|
||||||
|
/// would normally fail at the 1s mark; with the extension the deadline
|
||||||
|
/// is bumped past the sleep and the script completes cleanly. The
|
||||||
|
/// far-future reset_at avoids wall-clock races under cargo-test load.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn rate_limit_hard_block_extends_inactivity_deadline() {
|
||||||
|
use std::os::unix::fs::PermissionsExt;
|
||||||
|
|
||||||
|
let tmp = tempfile::tempdir().unwrap();
|
||||||
|
let script = tmp.path().join("emit_then_wait.sh");
|
||||||
|
let body =
|
||||||
|
"#!/bin/sh\nprintf '%s\\n' '{\"type\":\"rate_limit_event\",\"rate_limit_info\":{\"status\":\"hard_block\",\"reset_at\":\"2099-01-01T12:00:00Z\"}}'\nsleep 3\n";
|
||||||
|
std::fs::write(&script, body).unwrap();
|
||||||
|
std::fs::set_permissions(&script, std::fs::Permissions::from_mode(0o755)).unwrap();
|
||||||
|
|
||||||
|
let (tx, _rx) = broadcast::channel::<AgentEvent>(64);
|
||||||
|
let (watcher_tx, _watcher_rx) = broadcast::channel::<WatcherEvent>(16);
|
||||||
|
let event_log = Arc::new(Mutex::new(Vec::new()));
|
||||||
|
let child_killers = Arc::new(Mutex::new(HashMap::new()));
|
||||||
|
|
||||||
|
let result = run_agent_pty_streaming(
|
||||||
|
"916_story_rate_limit_extension",
|
||||||
|
"mergemaster",
|
||||||
|
"sh",
|
||||||
|
&[script.to_string_lossy().to_string()],
|
||||||
|
"--",
|
||||||
|
"/tmp",
|
||||||
|
&tx,
|
||||||
|
&event_log,
|
||||||
|
None,
|
||||||
|
1, // inactivity_timeout_secs = 1s; would expire before the 3s sleep without the extension
|
||||||
|
child_killers,
|
||||||
|
watcher_tx,
|
||||||
|
None,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
result.is_ok(),
|
||||||
|
"PTY run should not be killed by inactivity timeout during rate-limit block: {:?}",
|
||||||
|
result.err()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_emit_event_writes_to_log_writer() {
|
fn test_emit_event_writes_to_log_writer() {
|
||||||
let tmp = tempfile::tempdir().unwrap();
|
let tmp = tempfile::tempdir().unwrap();
|
||||||
|
|||||||
@@ -232,7 +232,7 @@ fn run_agent_pty_blocking(
|
|||||||
slog!("[agent:{sid_for_reader}:{aname_for_reader}] Reader thread exiting");
|
slog!("[agent:{sid_for_reader}:{aname_for_reader}] Reader thread exiting");
|
||||||
});
|
});
|
||||||
|
|
||||||
let timeout_dur = if inactivity_timeout_secs > 0 {
|
let base_timeout = if inactivity_timeout_secs > 0 {
|
||||||
Some(std::time::Duration::from_secs(inactivity_timeout_secs))
|
Some(std::time::Duration::from_secs(inactivity_timeout_secs))
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
@@ -240,9 +240,24 @@ fn run_agent_pty_blocking(
|
|||||||
|
|
||||||
let mut session_id: Option<String> = None;
|
let mut session_id: Option<String> = None;
|
||||||
let mut token_usage: Option<TokenUsage> = None;
|
let mut token_usage: Option<TokenUsage> = None;
|
||||||
|
// When the agent receives a rate-limit hard block, claude-code waits
|
||||||
|
// internally for the limit to clear and emits no PTY output during that
|
||||||
|
// window. Extend each recv's deadline by `block_until - now` so the
|
||||||
|
// inactivity timeout doesn't fire while we're legitimately waiting on
|
||||||
|
// Anthropic. Once `block_until` passes (or the block is cleared by an
|
||||||
|
// earlier-than-expected emit), the extension implicitly drops back to 0
|
||||||
|
// and the base inactivity timeout resumes (story 916).
|
||||||
|
let mut block_until: Option<chrono::DateTime<chrono::Utc>> = None;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let recv_result = match timeout_dur {
|
let effective_timeout = base_timeout.map(|base| {
|
||||||
|
let extra = block_until
|
||||||
|
.and_then(|t| (t - chrono::Utc::now()).to_std().ok())
|
||||||
|
.unwrap_or(std::time::Duration::ZERO);
|
||||||
|
base + extra
|
||||||
|
});
|
||||||
|
|
||||||
|
let recv_result = match effective_timeout {
|
||||||
Some(dur) => line_rx.recv_timeout(dur),
|
Some(dur) => line_rx.recv_timeout(dur),
|
||||||
None => line_rx
|
None => line_rx
|
||||||
.recv()
|
.recv()
|
||||||
@@ -351,6 +366,12 @@ fn run_agent_pty_blocking(
|
|||||||
default
|
default
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
// Pause the inactivity clock until the rate limit resets
|
||||||
|
// (story 916). Without this the watchdog kills the agent
|
||||||
|
// mid-wait — mergemaster especially, whose 15-minute
|
||||||
|
// inactivity window is shorter than typical rate-limit
|
||||||
|
// backoffs.
|
||||||
|
block_until = Some(reset_at);
|
||||||
let _ = watcher_tx.send(WatcherEvent::RateLimitHardBlock {
|
let _ = watcher_tx.send(WatcherEvent::RateLimitHardBlock {
|
||||||
story_id: story_id.to_string(),
|
story_id: story_id.to_string(),
|
||||||
agent_name: agent_name.to_string(),
|
agent_name: agent_name.to_string(),
|
||||||
|
|||||||
@@ -179,19 +179,28 @@ pub(super) fn tool_report_merge_failure(args: &Value, ctx: &AppContext) -> Resul
|
|||||||
slog!("[mergemaster] Merge failure reported for '{story_id}': {reason}");
|
slog!("[mergemaster] Merge failure reported for '{story_id}': {reason}");
|
||||||
ctx.services.agents.set_merge_failure_reported(story_id);
|
ctx.services.agents.set_merge_failure_reported(story_id);
|
||||||
|
|
||||||
// Broadcast the failure so the Matrix notification listener can post an
|
|
||||||
// error message to configured rooms without coupling this tool to the bot.
|
|
||||||
let _ = ctx
|
|
||||||
.watcher_tx
|
|
||||||
.send(crate::io::watcher::WatcherEvent::MergeFailure {
|
|
||||||
story_id: story_id.to_string(),
|
|
||||||
reason: reason.to_string(),
|
|
||||||
});
|
|
||||||
|
|
||||||
// Route the failure through the typed state machine (Merge → MergeFailure).
|
// Route the failure through the typed state machine (Merge → MergeFailure).
|
||||||
// This persists the reason in front matter and updates the CRDT stage.
|
// This persists the reason in front matter and updates the CRDT stage.
|
||||||
if let Err(e) = crate::agents::lifecycle::transition_to_merge_failure(story_id, reason) {
|
// Only broadcast the notification when the stage actually changed; if the
|
||||||
slog_warn!("[mergemaster] Failed to transition '{story_id}' to MergeFailure: {e}");
|
// story was already in MergeFailure (self-loop), suppress the duplicate.
|
||||||
|
let should_notify =
|
||||||
|
match crate::agents::lifecycle::transition_to_merge_failure(story_id, reason) {
|
||||||
|
Ok(fired) => !matches!(
|
||||||
|
fired.before,
|
||||||
|
crate::pipeline_state::Stage::MergeFailure { .. }
|
||||||
|
),
|
||||||
|
Err(e) => {
|
||||||
|
slog_warn!("[mergemaster] Failed to transition '{story_id}' to MergeFailure: {e}");
|
||||||
|
true
|
||||||
|
}
|
||||||
|
};
|
||||||
|
if should_notify {
|
||||||
|
let _ = ctx
|
||||||
|
.watcher_tx
|
||||||
|
.send(crate::io::watcher::WatcherEvent::MergeFailure {
|
||||||
|
story_id: story_id.to_string(),
|
||||||
|
reason: reason.to_string(),
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(format!(
|
Ok(format!(
|
||||||
|
|||||||
@@ -750,4 +750,82 @@ fn merge_failure_transition_emits_event_with_full_reason() {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── Story 913: MergeFailure + MergeFailed self-loop ────────────────
|
||||||
|
|
||||||
|
/// AC1: feeding `MergeFailed` to a story that is already in `MergeFailure`
/// is an accepted self-transition: `transition` returns `Ok` with the stage
/// still `MergeFailure` rather than a transition error.
#[test]
fn merge_failure_plus_merge_failed_is_self_loop() {
    let already_failed = Stage::MergeFailure {
        reason: "initial failure".into(),
    };
    let repeat_event = PipelineEvent::MergeFailed {
        reason: "second failure".into(),
    };

    let result = transition(already_failed, repeat_event);

    // Only the resulting variant matters here; the payload is not inspected.
    let looped_back = matches!(result, Ok(Stage::MergeFailure { .. }));
    assert!(
        looped_back,
        "MergeFailure + MergeFailed should self-loop to MergeFailure, got: {result:?}"
    );
}
|
||||||
|
|
||||||
|
/// AC2 + AC3: a second `MergeFailed` applied to a story that already sits in
/// `MergeFailure` must succeed, and the returned `TransitionFired::before`
/// must itself be `MergeFailure` so that callers can recognise the repeat and
/// skip the duplicate notification.
#[test]
fn repeated_merge_failure_apply_transition_no_error_no_duplicate_notification() {
    crate::crdt_state::init_for_test();
    crate::db::ensure_content_store();

    // Seed a story directly in the merge-failure stage.
    let story_id = "99913_story_merge_failure_selfloop";
    let content = "---\nname: MergeFailure Self-loop Test\n---\n# Story\n";
    crate::db::write_item_with_content(
        story_id,
        "4_merge_failure",
        content,
        crate::db::ItemMeta::from_yaml(content),
    );

    // Fire MergeFailed again while the story is already in MergeFailure.
    let repeat_event = PipelineEvent::MergeFailed {
        reason: "duplicate failure".into(),
    };
    let fired = super::apply::apply_transition(story_id, repeat_event, None)
        .expect("MergeFailed on already-failed story should succeed without error");

    // A self-loop reports MergeFailure on both sides of the transition; the
    // `before` side is what callers inspect to suppress the chat notification.
    let came_from_failure = matches!(fired.before, Stage::MergeFailure { .. });
    assert!(
        came_from_failure,
        "fired.before should be MergeFailure (self-loop): {:?}",
        fired.before
    );
    let stayed_in_failure = matches!(fired.after, Stage::MergeFailure { .. });
    assert!(
        stayed_in_failure,
        "fired.after should remain MergeFailure: {:?}",
        fired.after
    );

    // The persisted CRDT stage must still be 4_merge_failure.
    let Some(item) = read_typed(story_id).expect("CRDT read should succeed") else {
        panic!("item should still exist");
    };
    assert_eq!(
        item.stage.dir_name(),
        "4_merge_failure",
        "CRDT stage should remain 4_merge_failure after self-loop"
    );

    // Mirror the caller-side de-dup rule: notify only when the story was not
    // already in MergeFailure before this event.
    let should_notify = !came_from_failure;
    assert!(
        !should_notify,
        "should_notify must be false for a self-loop to prevent duplicate notification"
    );
}
|
||||||
|
|
||||||
// ── ProjectionError Display ─────────────────────────────────────────
|
// ── ProjectionError Display ─────────────────────────────────────────
|
||||||
|
|||||||
@@ -181,6 +181,12 @@ pub fn transition(state: Stage, event: PipelineEvent) -> Result<Stage, Transitio
|
|||||||
// ── MergeFailed: Merge → MergeFailure (recoverable intermediate) ──
|
// ── MergeFailed: Merge → MergeFailure (recoverable intermediate) ──
|
||||||
(Merge { .. }, MergeFailed { reason }) => Ok(MergeFailure { reason }),
|
(Merge { .. }, MergeFailed { reason }) => Ok(MergeFailure { reason }),
|
||||||
|
|
||||||
|
// ── MergeFailure self-loop: stage is unchanged, reason is replaced ──
|
||||||
|
// When the mergemaster retries and fails again while the story is already
|
||||||
|
// in MergeFailure, treat it as a silent self-transition so callers can
|
||||||
|
// detect the no-op via `fired.before == MergeFailure` and skip re-notifying.
|
||||||
|
(MergeFailure { .. }, MergeFailed { reason }) => Ok(MergeFailure { reason }),
|
||||||
|
|
||||||
(Merge { .. }, MergeFailedFinal { reason }) => Ok(Archived {
|
(Merge { .. }, MergeFailedFinal { reason }) => Ok(Archived {
|
||||||
archived_at: now,
|
archived_at: now,
|
||||||
reason: ArchiveReason::MergeFailed { reason },
|
reason: ArchiveReason::MergeFailed { reason },
|
||||||
|
|||||||
Reference in New Issue
Block a user