huskies: merge 998
This commit is contained in:
@@ -509,252 +509,6 @@ mod tests {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── Story 827: auto-spawn mergemaster on content conflict ─────────────────
|
|
||||||
|
|
||||||
/// A story in 4_merge with a content-conflict merge_failure and no
|
|
||||||
/// mergemaster_attempted flag must trigger an auto-spawn of mergemaster.
|
|
||||||
#[tokio::test]
|
|
||||||
async fn auto_assign_spawns_mergemaster_for_content_conflict() {
|
|
||||||
let tmp = tempfile::tempdir().unwrap();
|
|
||||||
let sk = tmp.path().join(".huskies");
|
|
||||||
std::fs::create_dir_all(&sk).unwrap();
|
|
||||||
std::fs::write(
|
|
||||||
sk.join("project.toml"),
|
|
||||||
"[[agent]]\nname = \"mergemaster\"\nstage = \"mergemaster\"\n",
|
|
||||||
)
|
|
||||||
.unwrap();
|
|
||||||
crate::crdt_state::init_for_test();
|
|
||||||
crate::db::ensure_content_store();
|
|
||||||
crate::db::write_item_with_content(
|
|
||||||
"9860_story_conflict",
|
|
||||||
"4_merge_failure",
|
|
||||||
"CONFLICT (content): server/src/lib.rs",
|
|
||||||
crate::db::ItemMeta::named("Conflict"),
|
|
||||||
);
|
|
||||||
// After master c228ae16, has_content_conflict_failure reads from
|
|
||||||
// {story_id}:gate_output (not the story description), so seed it there.
|
|
||||||
crate::db::write_content(
|
|
||||||
crate::db::ContentKey::GateOutput("9860_story_conflict"),
|
|
||||||
"CONFLICT (content): server/src/lib.rs",
|
|
||||||
);
|
|
||||||
|
|
||||||
let pool = AgentPool::new_test(3001);
|
|
||||||
pool.auto_assign_available_work(tmp.path()).await;
|
|
||||||
|
|
||||||
let agents = pool.agents.lock().unwrap();
|
|
||||||
let mergemaster_spawned = agents.iter().any(|(key, a)| {
|
|
||||||
key.contains("9860_story_conflict")
|
|
||||||
&& a.agent_name == "mergemaster"
|
|
||||||
&& matches!(a.status, AgentStatus::Pending | AgentStatus::Running)
|
|
||||||
});
|
|
||||||
assert!(
|
|
||||||
mergemaster_spawned,
|
|
||||||
"mergemaster should be spawned for a content-conflict story"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A story with merge_failure containing only "nothing to commit" must NOT
|
|
||||||
/// auto-spawn mergemaster.
|
|
||||||
#[tokio::test]
|
|
||||||
async fn auto_assign_does_not_spawn_mergemaster_for_non_conflict_failure() {
|
|
||||||
let tmp = tempfile::tempdir().unwrap();
|
|
||||||
let sk = tmp.path().join(".huskies");
|
|
||||||
std::fs::create_dir_all(&sk).unwrap();
|
|
||||||
std::fs::write(
|
|
||||||
sk.join("project.toml"),
|
|
||||||
"[[agent]]\nname = \"mergemaster\"\nstage = \"mergemaster\"\n",
|
|
||||||
)
|
|
||||||
.unwrap();
|
|
||||||
crate::crdt_state::init_for_test();
|
|
||||||
crate::db::ensure_content_store();
|
|
||||||
crate::db::write_item_with_content(
|
|
||||||
"9861_story_nothing",
|
|
||||||
"4_merge_failure",
|
|
||||||
"nothing to commit, working tree clean",
|
|
||||||
crate::db::ItemMeta::named("Nothing"),
|
|
||||||
);
|
|
||||||
|
|
||||||
let pool = AgentPool::new_test(3001);
|
|
||||||
pool.auto_assign_available_work(tmp.path()).await;
|
|
||||||
|
|
||||||
let agents = pool.agents.lock().unwrap();
|
|
||||||
let mergemaster_spawned = agents.iter().any(|(key, a)| {
|
|
||||||
key.contains("9861_story_nothing")
|
|
||||||
&& a.agent_name == "mergemaster"
|
|
||||||
&& matches!(a.status, AgentStatus::Pending | AgentStatus::Running)
|
|
||||||
});
|
|
||||||
assert!(
|
|
||||||
!mergemaster_spawned,
|
|
||||||
"mergemaster must not be spawned for non-conflict failures"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A story in 4_merge with blocked: true must NOT auto-spawn mergemaster
|
|
||||||
/// even when it has an unresolved content-conflict merge_failure and
|
|
||||||
/// mergemaster_attempted is still false.
|
|
||||||
#[tokio::test]
|
|
||||||
async fn auto_assign_does_not_spawn_mergemaster_for_blocked_story() {
|
|
||||||
let tmp = tempfile::tempdir().unwrap();
|
|
||||||
let sk = tmp.path().join(".huskies");
|
|
||||||
std::fs::create_dir_all(&sk).unwrap();
|
|
||||||
std::fs::write(
|
|
||||||
sk.join("project.toml"),
|
|
||||||
"[[agent]]\nname = \"mergemaster\"\nstage = \"mergemaster\"\n",
|
|
||||||
)
|
|
||||||
.unwrap();
|
|
||||||
crate::db::ensure_content_store();
|
|
||||||
// Story 945: "blocked AND in 4_merge" is no longer representable as
|
|
||||||
// separate states. A blocked story lives in `Stage::Blocked` (which
|
|
||||||
// maps to wire-form "blocked"), so auto-assign won't see it in 4_merge.
|
|
||||||
crate::db::write_item_with_content(
|
|
||||||
"9863_story_blocked_conflict",
|
|
||||||
"blocked",
|
|
||||||
"CONFLICT (content): foo.rs",
|
|
||||||
crate::db::ItemMeta {
|
|
||||||
name: Some("Blocked conflict".to_string()),
|
|
||||||
..Default::default()
|
|
||||||
},
|
|
||||||
);
|
|
||||||
|
|
||||||
let pool = AgentPool::new_test(3001);
|
|
||||||
pool.auto_assign_available_work(tmp.path()).await;
|
|
||||||
|
|
||||||
let agents = pool.agents.lock().unwrap();
|
|
||||||
let mergemaster_spawned = agents.iter().any(|(key, a)| {
|
|
||||||
key.contains("9863_story_blocked_conflict")
|
|
||||||
&& a.agent_name == "mergemaster"
|
|
||||||
&& matches!(a.status, AgentStatus::Pending | AgentStatus::Running)
|
|
||||||
});
|
|
||||||
assert!(
|
|
||||||
!mergemaster_spawned,
|
|
||||||
"mergemaster must not be spawned for a blocked story"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A story with mergemaster_attempted: true must NOT auto-spawn again, even
|
|
||||||
/// if the merge_failure still contains a content conflict.
|
|
||||||
#[tokio::test]
|
|
||||||
async fn auto_assign_does_not_respawn_mergemaster_when_already_attempted() {
|
|
||||||
let tmp = tempfile::tempdir().unwrap();
|
|
||||||
let sk = tmp.path().join(".huskies");
|
|
||||||
std::fs::create_dir_all(&sk).unwrap();
|
|
||||||
std::fs::write(
|
|
||||||
sk.join("project.toml"),
|
|
||||||
"[[agent]]\nname = \"mergemaster\"\nstage = \"mergemaster\"\n",
|
|
||||||
)
|
|
||||||
.unwrap();
|
|
||||||
crate::db::ensure_content_store();
|
|
||||||
// Story 945: "mergemaster attempted" is now `Stage::MergeFailureFinal`.
|
|
||||||
crate::db::write_item_with_content(
|
|
||||||
"9862_story_attempted",
|
|
||||||
"merge_failure_final",
|
|
||||||
"CONFLICT (content): foo.rs",
|
|
||||||
crate::db::ItemMeta::named("Already tried"),
|
|
||||||
);
|
|
||||||
|
|
||||||
let pool = AgentPool::new_test(3001);
|
|
||||||
pool.auto_assign_available_work(tmp.path()).await;
|
|
||||||
|
|
||||||
let agents = pool.agents.lock().unwrap();
|
|
||||||
let mergemaster_spawned = agents.iter().any(|(key, a)| {
|
|
||||||
key.contains("9862_story_attempted")
|
|
||||||
&& a.agent_name == "mergemaster"
|
|
||||||
&& matches!(a.status, AgentStatus::Pending | AgentStatus::Running)
|
|
||||||
});
|
|
||||||
assert!(
|
|
||||||
!mergemaster_spawned,
|
|
||||||
"mergemaster must not re-spawn when mergemaster_attempted is true"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── Story 920: transient vs genuine mergemaster termination ──────────────
|
|
||||||
|
|
||||||
/// AC4 (transient): a mergemaster that was killed transiently (no
|
|
||||||
/// report_merge_failure, spawn count below cap) must be re-spawned by the
|
|
||||||
/// next auto-assign pass — `mergemaster_attempted` stays false.
|
|
||||||
#[tokio::test]
|
|
||||||
async fn transient_mergemaster_exit_allows_respawn() {
|
|
||||||
let tmp = tempfile::tempdir().unwrap();
|
|
||||||
let sk = tmp.path().join(".huskies");
|
|
||||||
std::fs::create_dir_all(&sk).unwrap();
|
|
||||||
std::fs::write(
|
|
||||||
sk.join("project.toml"),
|
|
||||||
"[[agent]]\nname = \"mergemaster\"\nstage = \"mergemaster\"\n",
|
|
||||||
)
|
|
||||||
.unwrap();
|
|
||||||
crate::crdt_state::init_for_test();
|
|
||||||
crate::db::ensure_content_store();
|
|
||||||
crate::db::write_item_with_content(
|
|
||||||
"920_story_transient",
|
|
||||||
"4_merge_failure",
|
|
||||||
"CONFLICT (content): foo.rs",
|
|
||||||
crate::db::ItemMeta::named("Transient"),
|
|
||||||
);
|
|
||||||
// After master c228ae16, has_content_conflict_failure reads from
|
|
||||||
// {story_id}:gate_output (not the story description), so seed it there.
|
|
||||||
crate::db::write_content(
|
|
||||||
crate::db::ContentKey::GateOutput("920_story_transient"),
|
|
||||||
"CONFLICT (content): foo.rs",
|
|
||||||
);
|
|
||||||
// Simulate two previous transient exits (below cap of 3) recorded in DB.
|
|
||||||
crate::db::write_content(
|
|
||||||
crate::db::ContentKey::MergeMasterSpawnCount("920_story_transient"),
|
|
||||||
"2",
|
|
||||||
);
|
|
||||||
|
|
||||||
// mergemaster_attempted must still be false (transient exits don't set it).
|
|
||||||
let pool = AgentPool::new_test(3001);
|
|
||||||
pool.auto_assign_available_work(tmp.path()).await;
|
|
||||||
|
|
||||||
let agents = pool.agents.lock().unwrap();
|
|
||||||
let respawned = agents.iter().any(|(key, a)| {
|
|
||||||
key.contains("920_story_transient")
|
|
||||||
&& a.agent_name == "mergemaster"
|
|
||||||
&& matches!(a.status, AgentStatus::Pending | AgentStatus::Running)
|
|
||||||
});
|
|
||||||
assert!(
|
|
||||||
respawned,
|
|
||||||
"mergemaster must re-spawn after transient terminations while below cap"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// AC4 (genuine): after report_merge_failure, mergemaster_attempted is set
|
|
||||||
/// to true and auto-assign must not trigger another re-spawn.
|
|
||||||
#[tokio::test]
|
|
||||||
async fn genuine_mergemaster_exit_no_respawn() {
|
|
||||||
let tmp = tempfile::tempdir().unwrap();
|
|
||||||
let sk = tmp.path().join(".huskies");
|
|
||||||
std::fs::create_dir_all(&sk).unwrap();
|
|
||||||
std::fs::write(
|
|
||||||
sk.join("project.toml"),
|
|
||||||
"[[agent]]\nname = \"mergemaster\"\nstage = \"mergemaster\"\n",
|
|
||||||
)
|
|
||||||
.unwrap();
|
|
||||||
crate::crdt_state::init_for_test();
|
|
||||||
crate::db::ensure_content_store();
|
|
||||||
// Story 945: the genuine give-up state is now `Stage::MergeFailureFinal`.
|
|
||||||
crate::db::write_item_with_content(
|
|
||||||
"920_story_genuine",
|
|
||||||
"merge_failure_final",
|
|
||||||
"CONFLICT (content): bar.rs",
|
|
||||||
crate::db::ItemMeta::named("Genuine"),
|
|
||||||
);
|
|
||||||
|
|
||||||
let pool = AgentPool::new_test(3001);
|
|
||||||
pool.auto_assign_available_work(tmp.path()).await;
|
|
||||||
|
|
||||||
let agents = pool.agents.lock().unwrap();
|
|
||||||
let spawned = agents.iter().any(|(key, a)| {
|
|
||||||
key.contains("920_story_genuine")
|
|
||||||
&& a.agent_name == "mergemaster"
|
|
||||||
&& matches!(a.status, AgentStatus::Pending | AgentStatus::Running)
|
|
||||||
});
|
|
||||||
assert!(
|
|
||||||
!spawned,
|
|
||||||
"mergemaster must not re-spawn after genuine give-up (mergemaster_attempted=true)"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Two concurrent auto_assign_available_work calls must not assign the same
|
/// Two concurrent auto_assign_available_work calls must not assign the same
|
||||||
/// agent to two stories simultaneously. After both complete, at most one
|
/// agent to two stories simultaneously. After both complete, at most one
|
||||||
/// Pending/Running entry must exist per agent name.
|
/// Pending/Running entry must exist per agent name.
|
||||||
@@ -814,159 +568,4 @@ mod tests {
|
|||||||
found {active_coder_count} active entries"
|
found {active_coder_count} active entries"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── Story 958: MergeFailure transition fires auto-assign via watcher bridge ─
|
|
||||||
|
|
||||||
/// Regression: before story 958, the auto-assign subscriber filtered events
|
|
||||||
/// with `is_active()`, which returned false for `MergeFailure`. This meant
|
|
||||||
/// a CRDT `MergeFailure` transition never triggered auto-assign, and
|
|
||||||
/// mergemaster was never auto-spawned on content conflicts.
|
|
||||||
///
|
|
||||||
/// After story 958, the subscriber fires on EVERY WorkItem event. This
|
|
||||||
/// test verifies the end-to-end path: a WorkItem event with stage
|
|
||||||
/// `merge_failure` arriving on the watcher channel causes
|
|
||||||
/// `auto_assign_available_work` to run, which then auto-spawns mergemaster.
|
|
||||||
#[tokio::test]
|
|
||||||
async fn merge_failure_watcher_event_triggers_mergemaster_spawn() {
|
|
||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
let tmp = tempfile::tempdir().unwrap();
|
|
||||||
let root = tmp.path().to_path_buf();
|
|
||||||
let sk = root.join(".huskies");
|
|
||||||
std::fs::create_dir_all(&sk).unwrap();
|
|
||||||
std::fs::write(
|
|
||||||
sk.join("project.toml"),
|
|
||||||
"[[agent]]\nname = \"mergemaster\"\nstage = \"mergemaster\"\n",
|
|
||||||
)
|
|
||||||
.unwrap();
|
|
||||||
// The spawn path calls `git worktree add` — the tempdir must be a real
|
|
||||||
// git repo with at least one commit or it fails with "not a git repo".
|
|
||||||
for args in [
|
|
||||||
&["init"][..],
|
|
||||||
&["config", "user.email", "test@test.com"],
|
|
||||||
&["config", "user.name", "Test"],
|
|
||||||
&["commit", "--allow-empty", "-m", "init"],
|
|
||||||
] {
|
|
||||||
std::process::Command::new("git")
|
|
||||||
.args(args)
|
|
||||||
.current_dir(&root)
|
|
||||||
.output()
|
|
||||||
.unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
crate::crdt_state::init_for_test();
|
|
||||||
crate::db::ensure_content_store();
|
|
||||||
crate::db::write_item_with_content(
|
|
||||||
"958_regression_conflict",
|
|
||||||
"4_merge_failure",
|
|
||||||
"CONFLICT (content): server/src/lib.rs",
|
|
||||||
crate::db::ItemMeta::named("Regression"),
|
|
||||||
);
|
|
||||||
crate::db::write_content(
|
|
||||||
crate::db::ContentKey::GateOutput("958_regression_conflict"),
|
|
||||||
"CONFLICT (content): server/src/lib.rs",
|
|
||||||
);
|
|
||||||
|
|
||||||
let (watcher_tx, _) = broadcast::channel::<crate::io::watcher::WatcherEvent>(16);
|
|
||||||
let pool = Arc::new(AgentPool::new(3102, watcher_tx.clone()));
|
|
||||||
|
|
||||||
crate::startup::tick_loop::spawn_event_bridges(
|
|
||||||
watcher_tx.clone(),
|
|
||||||
Some(root.clone()),
|
|
||||||
Arc::clone(&pool),
|
|
||||||
);
|
|
||||||
|
|
||||||
// Simulate the CRDT bridge forwarding a merge_failure stage transition.
|
|
||||||
let _ = watcher_tx.send(crate::io::watcher::WatcherEvent::WorkItem {
|
|
||||||
stage: "merge_failure".to_string(),
|
|
||||||
item_id: "958_regression_conflict".to_string(),
|
|
||||||
action: "update".to_string(),
|
|
||||||
commit_msg: "huskies: update 958_regression_conflict".to_string(),
|
|
||||||
from_stage: Some("merge".to_string()),
|
|
||||||
});
|
|
||||||
|
|
||||||
// Allow the subscriber task to run auto_assign_available_work.
|
|
||||||
tokio::task::yield_now().await;
|
|
||||||
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
|
|
||||||
|
|
||||||
let agents = pool.agents.lock().unwrap();
|
|
||||||
let mergemaster_spawned = agents.iter().any(|(key, a)| {
|
|
||||||
key.contains("958_regression_conflict")
|
|
||||||
&& a.agent_name == "mergemaster"
|
|
||||||
&& matches!(a.status, AgentStatus::Pending | AgentStatus::Running)
|
|
||||||
});
|
|
||||||
assert!(
|
|
||||||
mergemaster_spawned,
|
|
||||||
"mergemaster must be auto-spawned when a merge_failure event fires \
|
|
||||||
through the watcher bridge (story 958 regression)"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// AC5 (story 982): a merge failure with content conflicts — seeded via the
|
|
||||||
/// typed `transition_to_merge_failure(ConflictDetected)` path without any
|
|
||||||
/// direct content-store or MergeJob writes in the test — produces
|
|
||||||
/// `Stage::MergeFailure { kind: ConflictDetected(_), .. }` and
|
|
||||||
/// auto-spawn-mergemaster fires within one `auto_assign_available_work` call.
|
|
||||||
#[tokio::test]
|
|
||||||
async fn auto_spawn_mergemaster_for_conflict_detected_kind_without_content_store_writes() {
|
|
||||||
let tmp = tempfile::tempdir().unwrap();
|
|
||||||
let sk = tmp.path().join(".huskies");
|
|
||||||
std::fs::create_dir_all(&sk).unwrap();
|
|
||||||
std::fs::write(
|
|
||||||
sk.join("project.toml"),
|
|
||||||
"[[agent]]\nname = \"mergemaster\"\nstage = \"mergemaster\"\n",
|
|
||||||
)
|
|
||||||
.unwrap();
|
|
||||||
crate::crdt_state::init_for_test();
|
|
||||||
crate::db::ensure_content_store();
|
|
||||||
|
|
||||||
let story_id = "982_ac5_conflict_auto_spawn";
|
|
||||||
// Seed at Merge stage so the transition is valid.
|
|
||||||
crate::db::write_item_with_content(
|
|
||||||
story_id,
|
|
||||||
"4_merge",
|
|
||||||
"---\nname: AC5 auto-spawn test\n---\n",
|
|
||||||
crate::db::ItemMeta::named("AC5 auto-spawn test"),
|
|
||||||
);
|
|
||||||
// Transition to MergeFailure(ConflictDetected) via lifecycle — no direct
|
|
||||||
// content-store writes in this test body.
|
|
||||||
crate::agents::lifecycle::transition_to_merge_failure(
|
|
||||||
story_id,
|
|
||||||
crate::pipeline_state::MergeFailureKind::ConflictDetected(Some(
|
|
||||||
"CONFLICT (content): server/src/lib.rs".to_string(),
|
|
||||||
)),
|
|
||||||
)
|
|
||||||
.expect("transition to MergeFailure(ConflictDetected) should succeed");
|
|
||||||
|
|
||||||
// Verify the stage kind before triggering auto-assign.
|
|
||||||
let item = crate::pipeline_state::read_typed(story_id)
|
|
||||||
.unwrap()
|
|
||||||
.unwrap();
|
|
||||||
assert!(
|
|
||||||
matches!(
|
|
||||||
item.stage,
|
|
||||||
crate::pipeline_state::Stage::MergeFailure {
|
|
||||||
kind: crate::pipeline_state::MergeFailureKind::ConflictDetected(_),
|
|
||||||
..
|
|
||||||
}
|
|
||||||
),
|
|
||||||
"stage must be MergeFailure(ConflictDetected) before auto-assign: {:?}",
|
|
||||||
item.stage
|
|
||||||
);
|
|
||||||
|
|
||||||
// One auto-assign cycle should spawn mergemaster.
|
|
||||||
let pool = AgentPool::new_test(3001);
|
|
||||||
pool.auto_assign_available_work(tmp.path()).await;
|
|
||||||
|
|
||||||
let agents = pool.agents.lock().unwrap();
|
|
||||||
let mergemaster_spawned = agents.iter().any(|(key, a)| {
|
|
||||||
key.contains(story_id)
|
|
||||||
&& a.agent_name == "mergemaster"
|
|
||||||
&& matches!(a.status, AgentStatus::Pending | AgentStatus::Running)
|
|
||||||
});
|
|
||||||
assert!(
|
|
||||||
mergemaster_spawned,
|
|
||||||
"mergemaster must be auto-spawned for ConflictDetected kind in one auto-assign cycle"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
//! Merge stage dispatch: trigger server-side merges and auto-spawn mergemaster for content conflicts.
|
//! Merge stage dispatch: trigger server-side squash-merges for stories in `Stage::Merge`.
|
||||||
|
|
||||||
use std::num::NonZeroU32;
|
use std::num::NonZeroU32;
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
@@ -12,20 +12,19 @@ use crate::worktree;
|
|||||||
|
|
||||||
use super::super::super::PipelineStage;
|
use super::super::super::PipelineStage;
|
||||||
use super::super::AgentPool;
|
use super::super::AgentPool;
|
||||||
use super::scan::{find_free_agent_for_stage, is_story_assigned_for_stage, scan_stage_items};
|
use super::scan::{is_story_assigned_for_stage, scan_stage_items};
|
||||||
use super::story_checks::{
|
use super::story_checks::{
|
||||||
has_content_conflict_failure, has_mergemaster_attempted, has_review_hold,
|
has_review_hold, has_unmet_dependencies, is_story_blocked, is_story_frozen,
|
||||||
has_unmet_dependencies, is_story_blocked, is_story_frozen,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
impl AgentPool {
|
impl AgentPool {
|
||||||
/// Process stories in `4_merge/`: trigger server-side squash-merges and auto-spawn
|
/// Process stories in `4_merge/`: trigger server-side squash-merges.
|
||||||
/// a mergemaster agent when a content-conflict failure is detected.
|
|
||||||
///
|
///
|
||||||
/// Stories with a recorded merge failure may be eligible for automatic mergemaster
|
/// Each eligible story without an active merge job triggers
|
||||||
/// dispatch when the failure is a content conflict — otherwise they need human
|
/// `trigger_server_side_merge`. Mergemaster auto-spawn for
|
||||||
/// intervention. Each eligible story without an active merge job triggers
|
/// `Stage::MergeFailure` stories is handled by the
|
||||||
/// `trigger_server_side_merge`.
|
/// [`merge_failure_subscriber`][super::merge_failure_subscriber] — this
|
||||||
|
/// function no longer scans the `merge_failure` stage.
|
||||||
pub(super) async fn assign_merge_stage(&self, project_root: &Path, config: &ProjectConfig) {
|
pub(super) async fn assign_merge_stage(&self, project_root: &Path, config: &ProjectConfig) {
|
||||||
// ── 4_merge: deterministic server-side merge (no LLM agent) ──────────
|
// ── 4_merge: deterministic server-side merge (no LLM agent) ──────────
|
||||||
//
|
//
|
||||||
@@ -113,59 +112,5 @@ impl AgentPool {
|
|||||||
slog!("[auto-assign] Triggering server-side merge for '{story_id}' in 4_merge/");
|
slog!("[auto-assign] Triggering server-side merge for '{story_id}' in 4_merge/");
|
||||||
self.trigger_server_side_merge(project_root, story_id);
|
self.trigger_server_side_merge(project_root, story_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── 4_merge_failure: auto-spawn mergemaster on content conflict ───────
|
|
||||||
//
|
|
||||||
// Stories transition to 4_merge_failure when the server-side merge fails.
|
|
||||||
// Content conflicts get one automatic mergemaster attempt; other failures
|
|
||||||
// require human intervention.
|
|
||||||
let merge_failure_stage = Stage::MergeFailure {
|
|
||||||
kind: crate::pipeline_state::MergeFailureKind::Other(String::new()),
|
|
||||||
feature_branch: BranchName(String::new()),
|
|
||||||
commits_ahead: NonZeroU32::new(1).expect("1 is non-zero"),
|
|
||||||
};
|
|
||||||
let merge_failure_items = scan_stage_items(&merge_failure_stage);
|
|
||||||
for story_id in &merge_failure_items {
|
|
||||||
if has_content_conflict_failure(story_id) && !has_mergemaster_attempted(story_id) {
|
|
||||||
let mergemaster_agent = {
|
|
||||||
let agents = match self.agents.lock() {
|
|
||||||
Ok(a) => a,
|
|
||||||
Err(e) => {
|
|
||||||
slog_error!(
|
|
||||||
"[auto-assign] Failed to lock agents for mergemaster check: {e}"
|
|
||||||
);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
if is_story_assigned_for_stage(
|
|
||||||
config,
|
|
||||||
&agents,
|
|
||||||
story_id,
|
|
||||||
&PipelineStage::Mergemaster,
|
|
||||||
) {
|
|
||||||
None
|
|
||||||
} else {
|
|
||||||
find_free_agent_for_stage(config, &agents, &PipelineStage::Mergemaster)
|
|
||||||
.map(str::to_string)
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Some(agent_name) = mergemaster_agent {
|
|
||||||
slog!(
|
|
||||||
"[auto-assign] Content conflict on '{story_id}'; \
|
|
||||||
auto-spawning mergemaster '{agent_name}'."
|
|
||||||
);
|
|
||||||
if let Err(e) = self
|
|
||||||
.start_agent(project_root, story_id, Some(&agent_name), None, None)
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
slog!(
|
|
||||||
"[auto-assign] Failed to start mergemaster '{agent_name}' \
|
|
||||||
for '{story_id}': {e}"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,368 @@
|
|||||||
|
//! TransitionFired subscriber that auto-spawns mergemaster on ConflictDetected merge failures.
|
||||||
|
//!
|
||||||
|
//! Listens on the pipeline transition broadcast channel and schedules a
|
||||||
|
//! mergemaster agent whenever a story enters
|
||||||
|
//! `Stage::MergeFailure { kind: ConflictDetected(_), .. }`.
|
||||||
|
//! Other [`MergeFailureKind`] variants require human intervention and are
|
||||||
|
//! intentionally ignored here.
|
||||||
|
|
||||||
|
use std::path::{Path, PathBuf};
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use crate::pipeline_state::{MergeFailureKind, Stage};
|
||||||
|
use crate::slog;
|
||||||
|
use crate::slog_warn;
|
||||||
|
|
||||||
|
use super::super::super::PipelineStage;
|
||||||
|
use super::super::AgentPool;
|
||||||
|
use super::scan::{find_free_agent_for_stage, is_story_assigned_for_stage};
|
||||||
|
|
||||||
|
/// Spawn a background task that auto-spawns mergemaster agents on
|
||||||
|
/// `Stage::MergeFailure { kind: ConflictDetected(_) }` transitions.
|
||||||
|
///
|
||||||
|
/// The task subscribes to the pipeline transition broadcast channel and calls
|
||||||
|
/// [`AgentPool::start_agent`] with the first free mergemaster agent whenever a
|
||||||
|
/// story transitions into a recoverable conflict state. All other
|
||||||
|
/// [`MergeFailureKind`] variants are silently skipped — they need a human.
|
||||||
|
pub(crate) fn spawn_merge_failure_subscriber(pool: Arc<AgentPool>, project_root: PathBuf) {
|
||||||
|
let mut rx = crate::pipeline_state::subscribe_transitions();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
loop {
|
||||||
|
match rx.recv().await {
|
||||||
|
Ok(fired) => {
|
||||||
|
on_merge_failure_transition(&pool, &project_root, &fired).await;
|
||||||
|
}
|
||||||
|
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
|
||||||
|
slog_warn!(
|
||||||
|
"[merge-failure-sub] Subscriber lagged, skipped {n} event(s). \
|
||||||
|
ConflictDetected stories may need manual mergemaster spawn."
|
||||||
|
);
|
||||||
|
}
|
||||||
|
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn on_merge_failure_transition(
|
||||||
|
pool: &AgentPool,
|
||||||
|
project_root: &Path,
|
||||||
|
fired: &crate::pipeline_state::TransitionFired,
|
||||||
|
) {
|
||||||
|
let Stage::MergeFailure { ref kind, .. } = fired.after else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
let story_id = &fired.story_id.0;
|
||||||
|
|
||||||
|
match kind {
|
||||||
|
MergeFailureKind::ConflictDetected(_) => {
|
||||||
|
let config = match crate::config::ProjectConfig::load(project_root) {
|
||||||
|
Ok(c) => c,
|
||||||
|
Err(e) => {
|
||||||
|
slog_warn!("[merge-failure-sub] Failed to load config for '{story_id}': {e}");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let agent_name = {
|
||||||
|
let agents = match pool.agents.lock() {
|
||||||
|
Ok(a) => a,
|
||||||
|
Err(e) => {
|
||||||
|
slog_warn!(
|
||||||
|
"[merge-failure-sub] Failed to lock agent pool for '{story_id}': {e}"
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
if is_story_assigned_for_stage(
|
||||||
|
&config,
|
||||||
|
&agents,
|
||||||
|
story_id,
|
||||||
|
&PipelineStage::Mergemaster,
|
||||||
|
) {
|
||||||
|
return; // mergemaster already running for this story
|
||||||
|
}
|
||||||
|
find_free_agent_for_stage(&config, &agents, &PipelineStage::Mergemaster)
|
||||||
|
.map(str::to_string)
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some(agent) = agent_name {
|
||||||
|
slog!(
|
||||||
|
"[merge-failure-sub] ConflictDetected on '{story_id}'; \
|
||||||
|
auto-spawning mergemaster '{agent}'."
|
||||||
|
);
|
||||||
|
if let Err(e) = pool
|
||||||
|
.start_agent(project_root, story_id, Some(&agent), None, None)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
slog!("[merge-failure-sub] Failed to spawn '{agent}' for '{story_id}': {e}");
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
slog!(
|
||||||
|
"[merge-failure-sub] ConflictDetected on '{story_id}'; \
|
||||||
|
no free mergemaster agent available."
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// GatesFailed, EmptyDiff, NoCommits, Other — all require human intervention.
|
||||||
|
MergeFailureKind::GatesFailed(_)
|
||||||
|
| MergeFailureKind::EmptyDiff
|
||||||
|
| MergeFailureKind::NoCommits
|
||||||
|
| MergeFailureKind::Other(_) => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Tests ──────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use crate::agents::{AgentPool, AgentStatus};
|
||||||
|
use crate::io::watcher::WatcherEvent;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use tokio::sync::broadcast;
|
||||||
|
|
||||||
|
fn setup_project(tmp: &tempfile::TempDir) {
|
||||||
|
let sk = tmp.path().join(".huskies");
|
||||||
|
std::fs::create_dir_all(&sk).unwrap();
|
||||||
|
std::fs::write(
|
||||||
|
sk.join("project.toml"),
|
||||||
|
"[[agent]]\nname = \"mergemaster\"\nstage = \"mergemaster\"\n",
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn seed_at_merge(story_id: &str) {
|
||||||
|
crate::crdt_state::init_for_test();
|
||||||
|
crate::db::ensure_content_store();
|
||||||
|
crate::db::write_item_with_content(
|
||||||
|
story_id,
|
||||||
|
"4_merge",
|
||||||
|
"---\nname: Test\n---\n",
|
||||||
|
crate::db::ItemMeta::named("Test"),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn make_pool(port: u16) -> Arc<AgentPool> {
|
||||||
|
let (tx, _) = broadcast::channel::<WatcherEvent>(4);
|
||||||
|
Arc::new(AgentPool::new(port, tx))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn make_fired(
|
||||||
|
story_id: &str,
|
||||||
|
kind: MergeFailureKind,
|
||||||
|
) -> crate::pipeline_state::TransitionFired {
|
||||||
|
use crate::pipeline_state::{BranchName, PipelineEvent, StoryId, TransitionFired};
|
||||||
|
use std::num::NonZeroU32;
|
||||||
|
TransitionFired {
|
||||||
|
story_id: StoryId(story_id.to_string()),
|
||||||
|
before: crate::pipeline_state::Stage::Merge {
|
||||||
|
feature_branch: BranchName("feature/test".to_string()),
|
||||||
|
commits_ahead: NonZeroU32::new(1).unwrap(),
|
||||||
|
},
|
||||||
|
after: crate::pipeline_state::Stage::MergeFailure {
|
||||||
|
kind: kind.clone(),
|
||||||
|
feature_branch: BranchName("feature/test".to_string()),
|
||||||
|
commits_ahead: NonZeroU32::new(1).unwrap(),
|
||||||
|
},
|
||||||
|
event: PipelineEvent::MergeFailed { kind },
|
||||||
|
at: chrono::Utc::now(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── AC4: each MergeFailureKind variant ──────────────────────────────────
|
||||||
|
|
||||||
|
/// ConflictDetected → on_merge_failure_transition must spawn mergemaster.
|
||||||
|
///
|
||||||
|
/// Calls the handler directly (not via the broadcast subscriber) to avoid
|
||||||
|
/// cross-test channel contamination from the global TRANSITION_TX.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn conflict_detected_spawns_mergemaster_via_subscriber() {
|
||||||
|
let tmp = tempfile::tempdir().unwrap();
|
||||||
|
setup_project(&tmp);
|
||||||
|
let story_id = "998_sub_conflict";
|
||||||
|
seed_at_merge(story_id);
|
||||||
|
|
||||||
|
let pool = make_pool(3998);
|
||||||
|
let fired = make_fired(
|
||||||
|
story_id,
|
||||||
|
MergeFailureKind::ConflictDetected(Some("CONFLICT (content): src/lib.rs".to_string())),
|
||||||
|
);
|
||||||
|
on_merge_failure_transition(&pool, tmp.path(), &fired).await;
|
||||||
|
|
||||||
|
let agents = pool.agents.lock().unwrap();
|
||||||
|
assert!(
|
||||||
|
agents.iter().any(|(key, a)| {
|
||||||
|
key.contains(story_id)
|
||||||
|
&& a.agent_name == "mergemaster"
|
||||||
|
&& matches!(a.status, AgentStatus::Pending | AgentStatus::Running)
|
||||||
|
}),
|
||||||
|
"mergemaster must be spawned for ConflictDetected"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// GatesFailed → subscriber must NOT spawn mergemaster (human intervention needed).
|
||||||
|
#[tokio::test]
|
||||||
|
async fn gates_failed_does_not_spawn_mergemaster() {
|
||||||
|
let tmp = tempfile::tempdir().unwrap();
|
||||||
|
setup_project(&tmp);
|
||||||
|
let story_id = "998_sub_gates";
|
||||||
|
seed_at_merge(story_id);
|
||||||
|
|
||||||
|
let pool = make_pool(3997);
|
||||||
|
spawn_merge_failure_subscriber(Arc::clone(&pool), tmp.path().to_path_buf());
|
||||||
|
|
||||||
|
crate::agents::lifecycle::transition_to_merge_failure(
|
||||||
|
story_id,
|
||||||
|
MergeFailureKind::GatesFailed("error[E0308]: mismatched types".to_string()),
|
||||||
|
)
|
||||||
|
.expect("transition must succeed");
|
||||||
|
|
||||||
|
// Give the subscriber time to run (it should do nothing).
|
||||||
|
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
||||||
|
|
||||||
|
let agents = pool.agents.lock().unwrap();
|
||||||
|
let spawned = agents.iter().any(|(key, a)| {
|
||||||
|
key.contains(story_id)
|
||||||
|
&& a.agent_name == "mergemaster"
|
||||||
|
&& matches!(a.status, AgentStatus::Pending | AgentStatus::Running)
|
||||||
|
});
|
||||||
|
assert!(!spawned, "mergemaster must NOT be spawned for GatesFailed");
|
||||||
|
}
|
||||||
|
|
||||||
|
/// EmptyDiff → subscriber must NOT spawn mergemaster.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn empty_diff_does_not_spawn_mergemaster() {
|
||||||
|
let tmp = tempfile::tempdir().unwrap();
|
||||||
|
setup_project(&tmp);
|
||||||
|
let story_id = "998_sub_emptydiff";
|
||||||
|
seed_at_merge(story_id);
|
||||||
|
|
||||||
|
let pool = make_pool(3996);
|
||||||
|
spawn_merge_failure_subscriber(Arc::clone(&pool), tmp.path().to_path_buf());
|
||||||
|
|
||||||
|
crate::agents::lifecycle::transition_to_merge_failure(
|
||||||
|
story_id,
|
||||||
|
MergeFailureKind::EmptyDiff,
|
||||||
|
)
|
||||||
|
.expect("transition must succeed");
|
||||||
|
|
||||||
|
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
||||||
|
|
||||||
|
let agents = pool.agents.lock().unwrap();
|
||||||
|
let spawned = agents.iter().any(|(key, a)| {
|
||||||
|
key.contains(story_id)
|
||||||
|
&& a.agent_name == "mergemaster"
|
||||||
|
&& matches!(a.status, AgentStatus::Pending | AgentStatus::Running)
|
||||||
|
});
|
||||||
|
assert!(!spawned, "mergemaster must NOT be spawned for EmptyDiff");
|
||||||
|
}
|
||||||
|
|
||||||
|
/// NoCommits → subscriber must NOT spawn mergemaster.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn no_commits_does_not_spawn_mergemaster() {
|
||||||
|
let tmp = tempfile::tempdir().unwrap();
|
||||||
|
setup_project(&tmp);
|
||||||
|
let story_id = "998_sub_nocommits";
|
||||||
|
seed_at_merge(story_id);
|
||||||
|
|
||||||
|
let pool = make_pool(3995);
|
||||||
|
spawn_merge_failure_subscriber(Arc::clone(&pool), tmp.path().to_path_buf());
|
||||||
|
|
||||||
|
crate::agents::lifecycle::transition_to_merge_failure(
|
||||||
|
story_id,
|
||||||
|
MergeFailureKind::NoCommits,
|
||||||
|
)
|
||||||
|
.expect("transition must succeed");
|
||||||
|
|
||||||
|
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
||||||
|
|
||||||
|
let agents = pool.agents.lock().unwrap();
|
||||||
|
let spawned = agents.iter().any(|(key, a)| {
|
||||||
|
key.contains(story_id)
|
||||||
|
&& a.agent_name == "mergemaster"
|
||||||
|
&& matches!(a.status, AgentStatus::Pending | AgentStatus::Running)
|
||||||
|
});
|
||||||
|
assert!(!spawned, "mergemaster must NOT be spawned for NoCommits");
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Other(_) → subscriber must NOT spawn mergemaster.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn other_does_not_spawn_mergemaster() {
|
||||||
|
let tmp = tempfile::tempdir().unwrap();
|
||||||
|
setup_project(&tmp);
|
||||||
|
let story_id = "998_sub_other";
|
||||||
|
seed_at_merge(story_id);
|
||||||
|
|
||||||
|
let pool = make_pool(3994);
|
||||||
|
spawn_merge_failure_subscriber(Arc::clone(&pool), tmp.path().to_path_buf());
|
||||||
|
|
||||||
|
crate::agents::lifecycle::transition_to_merge_failure(
|
||||||
|
story_id,
|
||||||
|
MergeFailureKind::Other("unknown error".to_string()),
|
||||||
|
)
|
||||||
|
.expect("transition must succeed");
|
||||||
|
|
||||||
|
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
||||||
|
|
||||||
|
let agents = pool.agents.lock().unwrap();
|
||||||
|
let spawned = agents.iter().any(|(key, a)| {
|
||||||
|
key.contains(story_id)
|
||||||
|
&& a.agent_name == "mergemaster"
|
||||||
|
&& matches!(a.status, AgentStatus::Pending | AgentStatus::Running)
|
||||||
|
});
|
||||||
|
assert!(!spawned, "mergemaster must NOT be spawned for Other");
|
||||||
|
}
|
||||||
|
|
||||||
|
/// ConflictDetected self-loop — handler must NOT spawn a second mergemaster
|
||||||
|
/// when one is already Pending/Running for the story.
|
||||||
|
///
|
||||||
|
/// Calls the handler twice directly (no broadcast subscriber) so there is no
|
||||||
|
/// timing window: the first call sets the agent to Pending synchronously,
|
||||||
|
/// and the second call sees that Pending entry and returns early.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn conflict_detected_self_loop_does_not_double_spawn() {
|
||||||
|
let tmp = tempfile::tempdir().unwrap();
|
||||||
|
setup_project(&tmp);
|
||||||
|
let story_id = "998_sub_selfloop";
|
||||||
|
seed_at_merge(story_id);
|
||||||
|
|
||||||
|
let pool = make_pool(3993);
|
||||||
|
let fired = make_fired(
|
||||||
|
story_id,
|
||||||
|
MergeFailureKind::ConflictDetected(Some("CONFLICT".to_string())),
|
||||||
|
);
|
||||||
|
|
||||||
|
// First call — spawns mergemaster (agent enters Pending).
|
||||||
|
on_merge_failure_transition(&pool, tmp.path(), &fired).await;
|
||||||
|
{
|
||||||
|
let agents = pool.agents.lock().unwrap();
|
||||||
|
assert!(
|
||||||
|
agents.iter().any(|(key, a)| {
|
||||||
|
key.contains(story_id)
|
||||||
|
&& a.agent_name == "mergemaster"
|
||||||
|
&& matches!(a.status, AgentStatus::Pending | AgentStatus::Running)
|
||||||
|
}),
|
||||||
|
"mergemaster must be Pending after first ConflictDetected"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Second call (self-loop) — agent is still Pending; guard must prevent double-spawn.
|
||||||
|
on_merge_failure_transition(&pool, tmp.path(), &fired).await;
|
||||||
|
|
||||||
|
let agents = pool.agents.lock().unwrap();
|
||||||
|
let active_count = agents
|
||||||
|
.iter()
|
||||||
|
.filter(|(key, a)| {
|
||||||
|
key.contains(story_id)
|
||||||
|
&& a.agent_name == "mergemaster"
|
||||||
|
&& matches!(a.status, AgentStatus::Pending | AgentStatus::Running)
|
||||||
|
})
|
||||||
|
.count();
|
||||||
|
assert_eq!(
|
||||||
|
active_count, 1,
|
||||||
|
"mergemaster must not be double-spawned on ConflictDetected self-loop"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -4,6 +4,8 @@
|
|||||||
mod auto_assign;
|
mod auto_assign;
|
||||||
mod backlog;
|
mod backlog;
|
||||||
mod merge;
|
mod merge;
|
||||||
|
/// TransitionFired subscriber that auto-spawns mergemaster on ConflictDetected merge failures.
|
||||||
|
pub(crate) mod merge_failure_subscriber;
|
||||||
mod pipeline;
|
mod pipeline;
|
||||||
mod reconcile;
|
mod reconcile;
|
||||||
mod scan;
|
mod scan;
|
||||||
@@ -13,3 +15,6 @@ pub(crate) mod watchdog;
|
|||||||
// Re-export items that were pub(super) in the original monolithic auto_assign.rs
|
// Re-export items that were pub(super) in the original monolithic auto_assign.rs
|
||||||
// so that pool::lifecycle and pool::pipeline continue to access them unchanged.
|
// so that pool::lifecycle and pool::pipeline continue to access them unchanged.
|
||||||
pub(super) use scan::{find_free_agent_for_stage, is_agent_free};
|
pub(super) use scan::{find_free_agent_for_stage, is_agent_free};
|
||||||
|
|
||||||
|
/// Re-export for `startup::tick_loop`.
|
||||||
|
pub(crate) use merge_failure_subscriber::spawn_merge_failure_subscriber;
|
||||||
|
|||||||
@@ -50,46 +50,6 @@ pub(super) fn is_story_blocked(story_id: &str) -> bool {
|
|||||||
.unwrap_or(false)
|
.unwrap_or(false)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Return `true` if the story's merge failure is a git content-conflict
|
|
||||||
/// (`Stage::MergeFailure { kind: ConflictDetected(_), .. }`).
|
|
||||||
///
|
|
||||||
/// Used by the auto-assigner to decide whether to spawn mergemaster automatically.
|
|
||||||
/// The typed kind is carried by the CRDT projection layer (which reads
|
|
||||||
/// `ContentKey::GateOutput` on projection to reconstruct the kind on restart),
|
|
||||||
/// so no direct content-store access is needed here (story 982).
|
|
||||||
pub(super) fn has_content_conflict_failure(story_id: &str) -> bool {
|
|
||||||
crate::pipeline_state::read_typed(story_id)
|
|
||||||
.ok()
|
|
||||||
.flatten()
|
|
||||||
.map(|item| {
|
|
||||||
matches!(
|
|
||||||
item.stage,
|
|
||||||
crate::pipeline_state::Stage::MergeFailure {
|
|
||||||
kind: crate::pipeline_state::MergeFailureKind::ConflictDetected(_),
|
|
||||||
..
|
|
||||||
}
|
|
||||||
)
|
|
||||||
})
|
|
||||||
.unwrap_or(false)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Return `true` if the story is in `Stage::MergeFailureFinal`.
|
|
||||||
///
|
|
||||||
/// Story 945: `Stage::MergeFailureFinal` is the single source of truth —
|
|
||||||
/// the legacy `mergemaster_attempted: bool` CRDT register has been deleted.
|
|
||||||
/// Used to prevent the auto-assigner from repeatedly spawning mergemaster for
|
|
||||||
/// the same story after a failed mergemaster session.
|
|
||||||
pub(super) fn has_mergemaster_attempted(story_id: &str) -> bool {
|
|
||||||
crate::crdt_state::read_item(story_id)
|
|
||||||
.map(|view| {
|
|
||||||
matches!(
|
|
||||||
view.stage(),
|
|
||||||
crate::pipeline_state::Stage::MergeFailureFinal { .. }
|
|
||||||
)
|
|
||||||
})
|
|
||||||
.unwrap_or(false)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Return `true` if the story has any `depends_on` entries that are not yet in
|
/// Return `true` if the story has any `depends_on` entries that are not yet in
|
||||||
/// `5_done` or `6_archived`. Reads dependency state from the CRDT (story 929).
|
/// `5_done` or `6_archived`. Reads dependency state from the CRDT (story 929).
|
||||||
pub(super) fn has_unmet_dependencies(story_id: &str) -> bool {
|
pub(super) fn has_unmet_dependencies(story_id: &str) -> bool {
|
||||||
@@ -345,81 +305,4 @@ mod tests {
|
|||||||
let archived_deps = check_archived_dependencies("503_story_waiting");
|
let archived_deps = check_archived_dependencies("503_story_waiting");
|
||||||
assert!(archived_deps.is_empty());
|
assert!(archived_deps.is_empty());
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── Story 982: typed MergeFailureKind — has_content_conflict_failure ──────
|
|
||||||
|
|
||||||
/// AC2 (story 982): `has_content_conflict_failure` returns `true` when the
|
|
||||||
/// story is in `Stage::MergeFailure { kind: ConflictDetected(_), .. }`.
|
|
||||||
/// The test seeds the stage via `transition_to_merge_failure` (no direct
|
|
||||||
/// content-store or MergeJob writes in the test body).
|
|
||||||
#[test]
|
|
||||||
fn has_content_conflict_failure_true_for_conflict_detected_kind() {
|
|
||||||
crate::crdt_state::init_for_test();
|
|
||||||
crate::db::ensure_content_store();
|
|
||||||
let story_id = "982_ac2_conflict_detected";
|
|
||||||
// Seed at Merge stage so the transition is valid.
|
|
||||||
crate::db::write_item_with_content(
|
|
||||||
story_id,
|
|
||||||
"4_merge",
|
|
||||||
"---\nname: AC2 conflict test\n---\n",
|
|
||||||
crate::db::ItemMeta::named("AC2 conflict test"),
|
|
||||||
);
|
|
||||||
// Transition via the lifecycle helper — internally writes ContentKey::GateOutput
|
|
||||||
// so the CRDT projection can reconstruct the kind; no content-store writes here.
|
|
||||||
crate::agents::lifecycle::transition_to_merge_failure(
|
|
||||||
story_id,
|
|
||||||
crate::pipeline_state::MergeFailureKind::ConflictDetected(Some(
|
|
||||||
"CONFLICT (content): server/src/lib.rs".to_string(),
|
|
||||||
)),
|
|
||||||
)
|
|
||||||
.expect("transition should succeed");
|
|
||||||
|
|
||||||
// The typed match now drives the predicate — no substring scan.
|
|
||||||
assert!(
|
|
||||||
has_content_conflict_failure(story_id),
|
|
||||||
"has_content_conflict_failure must be true for ConflictDetected kind"
|
|
||||||
);
|
|
||||||
// Verify the projected stage carries the typed kind.
|
|
||||||
let item = crate::pipeline_state::read_typed(story_id)
|
|
||||||
.unwrap()
|
|
||||||
.unwrap();
|
|
||||||
assert!(
|
|
||||||
matches!(
|
|
||||||
item.stage,
|
|
||||||
crate::pipeline_state::Stage::MergeFailure {
|
|
||||||
kind: crate::pipeline_state::MergeFailureKind::ConflictDetected(_),
|
|
||||||
..
|
|
||||||
}
|
|
||||||
),
|
|
||||||
"stage must be MergeFailure(ConflictDetected): {:?}",
|
|
||||||
item.stage
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// AC2 (story 982): `has_content_conflict_failure` returns `false` when the
|
|
||||||
/// kind is `GatesFailed` — no mergemaster spawn for gate-only failures.
|
|
||||||
#[test]
|
|
||||||
fn has_content_conflict_failure_false_for_gates_failed_kind() {
|
|
||||||
crate::crdt_state::init_for_test();
|
|
||||||
crate::db::ensure_content_store();
|
|
||||||
let story_id = "982_ac2_gates_failed";
|
|
||||||
crate::db::write_item_with_content(
|
|
||||||
story_id,
|
|
||||||
"4_merge",
|
|
||||||
"---\nname: AC2 gates test\n---\n",
|
|
||||||
crate::db::ItemMeta::named("AC2 gates test"),
|
|
||||||
);
|
|
||||||
crate::agents::lifecycle::transition_to_merge_failure(
|
|
||||||
story_id,
|
|
||||||
crate::pipeline_state::MergeFailureKind::GatesFailed(
|
|
||||||
"error[clippy::unused_variable]".to_string(),
|
|
||||||
),
|
|
||||||
)
|
|
||||||
.expect("transition should succeed");
|
|
||||||
|
|
||||||
assert!(
|
|
||||||
!has_content_conflict_failure(story_id),
|
|
||||||
"has_content_conflict_failure must be false for GatesFailed kind"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -99,6 +99,7 @@ pub fn apply_transition(
|
|||||||
stage_label(&fired.after),
|
stage_label(&fired.after),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
super::events::try_broadcast(&fired);
|
||||||
Ok(fired)
|
Ok(fired)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,9 +1,40 @@
|
|||||||
//! Event bus for pipeline state transitions.
|
//! Event bus for pipeline state transitions.
|
||||||
|
|
||||||
#![allow(unused_imports, dead_code)]
|
|
||||||
use chrono::{DateTime, Utc};
|
use chrono::{DateTime, Utc};
|
||||||
|
use std::sync::OnceLock;
|
||||||
|
use tokio::sync::broadcast;
|
||||||
|
|
||||||
use super::{BranchName, PipelineEvent, Stage, StoryId};
|
use super::{PipelineEvent, Stage, StoryId};
|
||||||
|
|
||||||
|
// ── Static transition broadcast channel ─────────────────────────────────────
|
||||||
|
|
||||||
|
static TRANSITION_TX: OnceLock<broadcast::Sender<TransitionFired>> = OnceLock::new();
|
||||||
|
|
||||||
|
fn get_or_init_tx() -> &'static broadcast::Sender<TransitionFired> {
|
||||||
|
TRANSITION_TX.get_or_init(|| {
|
||||||
|
let (tx, _) = broadcast::channel(256);
|
||||||
|
tx
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Subscribe to all pipeline stage transitions.
|
||||||
|
///
|
||||||
|
/// Every call to [`apply_transition`][super::apply_transition] broadcasts the
|
||||||
|
/// resulting [`TransitionFired`] on this channel. Returns a new receiver that
|
||||||
|
/// replays events from the moment of subscription. Lagged receivers silently
|
||||||
|
/// skip missed events — callers should handle
|
||||||
|
/// [`broadcast::error::RecvError::Lagged`].
|
||||||
|
pub fn subscribe_transitions() -> broadcast::Receiver<TransitionFired> {
|
||||||
|
get_or_init_tx().subscribe()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Broadcast `fired` to all active transition subscribers.
|
||||||
|
///
|
||||||
|
/// Called from [`apply_transition`][super::apply] after writing the new stage
|
||||||
|
/// to the CRDT. No-ops safely when there are no subscribers.
|
||||||
|
pub(super) fn try_broadcast(fired: &TransitionFired) {
|
||||||
|
let _ = get_or_init_tx().send(fired.clone());
|
||||||
|
}
|
||||||
|
|
||||||
/// Fired when a pipeline stage transition completes.
|
/// Fired when a pipeline stage transition completes.
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
@@ -55,9 +86,9 @@ impl Default for EventBus {
|
|||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
|
use super::super::BranchName;
|
||||||
use super::*;
|
use super::*;
|
||||||
use std::num::NonZeroU32;
|
use std::num::NonZeroU32;
|
||||||
use std::sync::{Arc, Mutex};
|
|
||||||
|
|
||||||
fn nz(n: u32) -> NonZeroU32 {
|
fn nz(n: u32) -> NonZeroU32 {
|
||||||
NonZeroU32::new(n).unwrap()
|
NonZeroU32::new(n).unwrap()
|
||||||
|
|||||||
@@ -50,7 +50,7 @@ pub use transition::{
|
|||||||
};
|
};
|
||||||
|
|
||||||
#[allow(unused_imports)]
|
#[allow(unused_imports)]
|
||||||
pub use events::{EventBus, TransitionFired, TransitionSubscriber};
|
pub use events::{EventBus, TransitionFired, TransitionSubscriber, subscribe_transitions};
|
||||||
|
|
||||||
#[allow(unused_imports)]
|
#[allow(unused_imports)]
|
||||||
pub use projection::ProjectionError;
|
pub use projection::ProjectionError;
|
||||||
|
|||||||
@@ -66,6 +66,15 @@ pub(crate) fn spawn_event_bridges(
|
|||||||
// ensures that MergeFailure and other non-"active" stages are covered
|
// ensures that MergeFailure and other non-"active" stages are covered
|
||||||
// without any per-stage special-casing.
|
// without any per-stage special-casing.
|
||||||
if let Some(root) = project_root {
|
if let Some(root) = project_root {
|
||||||
|
// Mergemaster auto-spawn subscriber: reacts to TransitionFired events for
|
||||||
|
// Stage::MergeFailure { kind: ConflictDetected } and spawns mergemaster
|
||||||
|
// directly from the typed event, eliminating the predicate-mismatch
|
||||||
|
// failure mode of the previous scan-loop approach (story 998).
|
||||||
|
crate::agents::pool::auto_assign::spawn_merge_failure_subscriber(
|
||||||
|
Arc::clone(&agents),
|
||||||
|
root.clone(),
|
||||||
|
);
|
||||||
|
|
||||||
let watcher_auto_rx = watcher_tx.subscribe();
|
let watcher_auto_rx = watcher_tx.subscribe();
|
||||||
let watcher_auto_agents = Arc::clone(&agents);
|
let watcher_auto_agents = Arc::clone(&agents);
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
|
|||||||
Reference in New Issue
Block a user