test: serialise merge-pipeline tests against each other
The 12 tests in `agents::pool::pipeline::merge::tests` share a process-wide `server_start_time` (a `OnceLock` captured the first time the merge subsystem runs) and the global merge-job CRDT log. Default cargo parallelism has caught at least one interleaving on the merge gate's Docker scheduler where `stale_running_merge_job_is_cleared_and_retry_succeeds` flakes — `delete_merge_job` from one test lands while another is mid- assertion. Couldn't reproduce locally despite many tries. Each test now acquires a poison-tolerant `std::sync::Mutex` at entry, so the 12 tests run serially relative to each other while the rest of the suite (2862 tests) stays parallel. Module-level `#![allow(clippy::await_holding_lock)]` covers the deliberate sync guard across `.await`s. Targeted isolation — not a global `--test-threads=1`. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1,11 +1,38 @@
|
|||||||
//! Tests for the merge pipeline module.
|
//! Tests for the merge pipeline module.
|
||||||
|
|
||||||
|
// `serial_test_lock` returns a `std::sync::MutexGuard<'static, ()>` that
|
||||||
|
// each test holds for its full body — including across `.await` points.
|
||||||
|
// Clippy normally warns about that because async-yielding while holding a
|
||||||
|
// sync mutex can deadlock arbitrary tasks; here it's exactly the goal:
|
||||||
|
// only one test in this module holds the guard at a time, the rest block
|
||||||
|
// on `lock()`, and the merge gate's flaky interleaving is eliminated.
|
||||||
|
#![allow(clippy::await_holding_lock)]
|
||||||
|
|
||||||
use super::super::super::AgentPool;
|
use super::super::super::AgentPool;
|
||||||
use super::time::{encode_server_start_time, server_start_time};
|
use super::time::{encode_server_start_time, server_start_time};
|
||||||
use crate::agents::merge::{MergeJob, MergeJobStatus};
|
use crate::agents::merge::{MergeJob, MergeJobStatus};
|
||||||
use std::process::Command;
|
use std::process::Command;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
/// Acquire a process-wide serial lock for the merge-pipeline tests.
|
||||||
|
///
|
||||||
|
/// These tests share a process-wide `server_start_time()` (a `OnceLock`
|
||||||
|
/// captured the first time anything in the merge subsystem calls it) and
|
||||||
|
/// touch the global merge-job CRDT log. Cargo runs them in parallel by
|
||||||
|
/// default, and the merge gate's Docker scheduler has caught at least one
|
||||||
|
/// interleaving where one test's `delete_merge_job` lands while another is
|
||||||
|
/// asserting the entry is still there.
|
||||||
|
///
|
||||||
|
/// Holding this mutex for each test serialises only the merge-pipeline
|
||||||
|
/// tests against each other — the rest of the suite stays parallel. The
|
||||||
|
/// guard is poison-tolerant so a panicking test doesn't lock out everything
|
||||||
|
/// that follows.
|
||||||
|
fn serial_test_lock() -> std::sync::MutexGuard<'static, ()> {
|
||||||
|
use std::sync::Mutex;
|
||||||
|
static LOCK: Mutex<()> = Mutex::new(());
|
||||||
|
LOCK.lock().unwrap_or_else(|p| p.into_inner())
|
||||||
|
}
|
||||||
|
|
||||||
fn init_git_repo(repo: &std::path::Path) {
|
fn init_git_repo(repo: &std::path::Path) {
|
||||||
Command::new("git")
|
Command::new("git")
|
||||||
.args(["init"])
|
.args(["init"])
|
||||||
@@ -40,6 +67,7 @@ fn init_git_repo(repo: &std::path::Path) {
|
|||||||
/// call succeeds.
|
/// call succeeds.
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn stale_running_merge_job_is_cleared_and_retry_succeeds() {
|
async fn stale_running_merge_job_is_cleared_and_retry_succeeds() {
|
||||||
|
let _serial = serial_test_lock();
|
||||||
use tempfile::tempdir;
|
use tempfile::tempdir;
|
||||||
|
|
||||||
crate::crdt_state::init_for_test();
|
crate::crdt_state::init_for_test();
|
||||||
@@ -94,6 +122,7 @@ async fn stale_running_merge_job_is_cleared_and_retry_succeeds() {
|
|||||||
/// merge attempt being triggered.
|
/// merge attempt being triggered.
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn reap_stale_merge_jobs_removes_old_running_entry_without_merge() {
|
async fn reap_stale_merge_jobs_removes_old_running_entry_without_merge() {
|
||||||
|
let _serial = serial_test_lock();
|
||||||
crate::crdt_state::init_for_test();
|
crate::crdt_state::init_for_test();
|
||||||
|
|
||||||
let pool = Arc::new(AgentPool::new_test(3001));
|
let pool = Arc::new(AgentPool::new_test(3001));
|
||||||
@@ -137,6 +166,7 @@ async fn reap_stale_merge_jobs_removes_old_running_entry_without_merge() {
|
|||||||
#[cfg(unix)]
|
#[cfg(unix)]
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn stale_merge_job_with_dead_pid_is_swept_on_new_merge_attempt() {
|
async fn stale_merge_job_with_dead_pid_is_swept_on_new_merge_attempt() {
|
||||||
|
let _serial = serial_test_lock();
|
||||||
use tempfile::tempdir;
|
use tempfile::tempdir;
|
||||||
|
|
||||||
crate::crdt_state::init_for_test();
|
crate::crdt_state::init_for_test();
|
||||||
@@ -195,6 +225,7 @@ async fn run_merge_to_completion(
|
|||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn merge_agent_work_returns_error_when_branch_not_found() {
|
async fn merge_agent_work_returns_error_when_branch_not_found() {
|
||||||
|
let _serial = serial_test_lock();
|
||||||
use tempfile::tempdir;
|
use tempfile::tempdir;
|
||||||
|
|
||||||
crate::crdt_state::init_for_test();
|
crate::crdt_state::init_for_test();
|
||||||
@@ -219,6 +250,7 @@ async fn merge_agent_work_returns_error_when_branch_not_found() {
|
|||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn merge_agent_work_succeeds_on_clean_branch() {
|
async fn merge_agent_work_succeeds_on_clean_branch() {
|
||||||
|
let _serial = serial_test_lock();
|
||||||
use std::fs;
|
use std::fs;
|
||||||
use tempfile::tempdir;
|
use tempfile::tempdir;
|
||||||
|
|
||||||
@@ -303,6 +335,7 @@ async fn merge_agent_work_succeeds_on_clean_branch() {
|
|||||||
#[cfg(unix)]
|
#[cfg(unix)]
|
||||||
#[test]
|
#[test]
|
||||||
fn quality_gates_run_before_fast_forward_to_master() {
|
fn quality_gates_run_before_fast_forward_to_master() {
|
||||||
|
let _serial = serial_test_lock();
|
||||||
use std::fs;
|
use std::fs;
|
||||||
use std::os::unix::fs::PermissionsExt;
|
use std::os::unix::fs::PermissionsExt;
|
||||||
use tempfile::tempdir;
|
use tempfile::tempdir;
|
||||||
@@ -396,6 +429,7 @@ fn quality_gates_run_before_fast_forward_to_master() {
|
|||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn merge_agent_work_conflict_does_not_break_master() {
|
async fn merge_agent_work_conflict_does_not_break_master() {
|
||||||
|
let _serial = serial_test_lock();
|
||||||
use std::fs;
|
use std::fs;
|
||||||
use tempfile::tempdir;
|
use tempfile::tempdir;
|
||||||
|
|
||||||
@@ -513,6 +547,7 @@ async fn merge_agent_work_conflict_does_not_break_master() {
|
|||||||
/// error and the story must remain in `4_merge` (not advance to `5_done`).
|
/// error and the story must remain in `4_merge` (not advance to `5_done`).
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn merge_agent_work_zero_commits_ahead_stays_in_merge_stage() {
|
async fn merge_agent_work_zero_commits_ahead_stays_in_merge_stage() {
|
||||||
|
let _serial = serial_test_lock();
|
||||||
use std::fs;
|
use std::fs;
|
||||||
use tempfile::tempdir;
|
use tempfile::tempdir;
|
||||||
|
|
||||||
@@ -596,6 +631,7 @@ async fn merge_agent_work_zero_commits_ahead_stays_in_merge_stage() {
|
|||||||
/// The merge_failure field must NOT be written.
|
/// The merge_failure field must NOT be written.
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn server_side_merge_happy_path_advances_to_done() {
|
async fn server_side_merge_happy_path_advances_to_done() {
|
||||||
|
let _serial = serial_test_lock();
|
||||||
use std::fs;
|
use std::fs;
|
||||||
use tempfile::tempdir;
|
use tempfile::tempdir;
|
||||||
|
|
||||||
@@ -707,6 +743,7 @@ async fn server_side_merge_happy_path_advances_to_done() {
|
|||||||
/// must remain in `4_merge/`.
|
/// must remain in `4_merge/`.
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn server_side_merge_conflict_sets_merge_failure() {
|
async fn server_side_merge_conflict_sets_merge_failure() {
|
||||||
|
let _serial = serial_test_lock();
|
||||||
use std::fs;
|
use std::fs;
|
||||||
use tempfile::tempdir;
|
use tempfile::tempdir;
|
||||||
|
|
||||||
@@ -826,6 +863,7 @@ async fn server_side_merge_conflict_sets_merge_failure() {
|
|||||||
#[cfg(unix)]
|
#[cfg(unix)]
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn server_side_merge_gate_failure_sets_merge_failure() {
|
async fn server_side_merge_gate_failure_sets_merge_failure() {
|
||||||
|
let _serial = serial_test_lock();
|
||||||
use std::fs;
|
use std::fs;
|
||||||
use std::os::unix::fs::PermissionsExt;
|
use std::os::unix::fs::PermissionsExt;
|
||||||
use tempfile::tempdir;
|
use tempfile::tempdir;
|
||||||
@@ -944,6 +982,7 @@ async fn server_side_merge_gate_failure_sets_merge_failure() {
|
|||||||
/// ahead of master must continue to merge successfully (happy path).
|
/// ahead of master must continue to merge successfully (happy path).
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn merge_agent_work_one_commit_ahead_merges_successfully() {
|
async fn merge_agent_work_one_commit_ahead_merges_successfully() {
|
||||||
|
let _serial = serial_test_lock();
|
||||||
use std::fs;
|
use std::fs;
|
||||||
use tempfile::tempdir;
|
use tempfile::tempdir;
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user