huskies: merge 958

This commit is contained in:
dave
2026-05-13 11:47:27 +00:00
parent 8b53e20ca9
commit 28338a8e8d
7 changed files with 103 additions and 31 deletions
+13 -3
View File
@@ -41,8 +41,13 @@ pub(super) async fn scan_and_claim(
};
for item in &items {
// Only claim stories in active stages.
if !item.stage().is_active() {
// Only claim stories in execution stages (Coding, Qa, Merge).
if !matches!(
item.stage(),
crate::pipeline_state::Stage::Coding
| crate::pipeline_state::Stage::Qa
| crate::pipeline_state::Stage::Merge { .. }
) {
continue;
}
@@ -163,7 +168,12 @@ pub(super) fn reclaim_timed_out_work(_project_root: &Path) {
let now = chrono::Utc::now().timestamp() as f64;
for item in &items {
if !item.stage().is_active() {
if !matches!(
item.stage(),
crate::pipeline_state::Stage::Coding
| crate::pipeline_state::Stage::Qa
| crate::pipeline_state::Stage::Merge { .. }
) {
continue;
}
+2 -5
View File
@@ -123,7 +123,7 @@ pub async fn run(
}
}
// Subscribe to watcher events to trigger auto-assign on stage transitions.
// Subscribe to watcher events to trigger auto-assign on every stage transition.
{
let auto_rx = watcher_tx.subscribe();
let auto_agents = Arc::clone(&agents);
@@ -131,10 +131,7 @@ pub async fn run(
tokio::spawn(async move {
let mut rx = auto_rx;
while let Ok(event) = rx.recv().await {
if let watcher::WatcherEvent::WorkItem { ref stage, .. } = event
&& crate::pipeline_state::Stage::from_dir(stage.as_str())
.is_some_and(|s| s.is_active())
{
if let watcher::WatcherEvent::WorkItem { ref stage, .. } = event {
slog!("[agent-mode] CRDT transition in {stage}/; triggering auto-assign.");
auto_agents.auto_assign_available_work(&auto_root).await;
}
+2 -3
View File
@@ -643,10 +643,9 @@ mod tests {
"stage should be Stage::Merge after unblock, got: {:?}",
item.stage
);
// auto_assign checks is_active() — Merge satisfies it.
assert!(
item.stage.is_active(),
"Merge satisfies is_active() so auto_assign can pick it up: {:?}",
matches!(item.stage, Stage::Merge { .. }),
"stage should be Stage::Merge so auto_assign can pick it up: {:?}",
item.stage
);
}
@@ -814,4 +814,76 @@ mod tests {
found {active_coder_count} active entries"
);
}
// ── Story 958: MergeFailure transition fires auto-assign via watcher bridge ─
/// Regression: before story 958, the auto-assign subscriber filtered events
/// with `is_active()`, which returned false for `MergeFailure`. This meant
/// a CRDT `MergeFailure` transition never triggered auto-assign, and
/// mergemaster was never auto-spawned on content conflicts.
///
/// After story 958, the subscriber fires on EVERY WorkItem event. This
/// test verifies the end-to-end path: a WorkItem event with stage
/// `merge_failure` arriving on the watcher channel causes
/// `auto_assign_available_work` to run, which then auto-spawns mergemaster.
#[tokio::test]
async fn merge_failure_watcher_event_triggers_mergemaster_spawn() {
use std::sync::Arc;
let tmp = tempfile::tempdir().unwrap();
let root = tmp.path().to_path_buf();
let sk = root.join(".huskies");
std::fs::create_dir_all(&sk).unwrap();
std::fs::write(
sk.join("project.toml"),
"[[agent]]\nname = \"mergemaster\"\nstage = \"mergemaster\"\n",
)
.unwrap();
crate::crdt_state::init_for_test();
crate::db::ensure_content_store();
crate::db::write_item_with_content(
"958_regression_conflict",
"4_merge_failure",
"CONFLICT (content): server/src/lib.rs",
crate::db::ItemMeta::named("Regression"),
);
crate::db::write_content(
crate::db::ContentKey::GateOutput("958_regression_conflict"),
"CONFLICT (content): server/src/lib.rs",
);
let (watcher_tx, _) = broadcast::channel::<crate::io::watcher::WatcherEvent>(16);
let pool = Arc::new(AgentPool::new(3102, watcher_tx.clone()));
crate::startup::tick_loop::spawn_event_bridges(
watcher_tx.clone(),
Some(root.clone()),
Arc::clone(&pool),
);
// Simulate the CRDT bridge forwarding a merge_failure stage transition.
let _ = watcher_tx.send(crate::io::watcher::WatcherEvent::WorkItem {
stage: "merge_failure".to_string(),
item_id: "958_regression_conflict".to_string(),
action: "update".to_string(),
commit_msg: "huskies: update 958_regression_conflict".to_string(),
from_stage: Some("merge".to_string()),
});
// Allow the subscriber task to run auto_assign_available_work.
tokio::task::yield_now().await;
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
let agents = pool.agents.lock().unwrap();
let mergemaster_spawned = agents.iter().any(|(key, a)| {
key.contains("958_regression_conflict")
&& a.agent_name == "mergemaster"
&& matches!(a.status, AgentStatus::Pending | AgentStatus::Running)
});
assert!(
mergemaster_spawned,
"mergemaster must be auto-spawned when a merge_failure event fires \
through the watcher bridge (story 958 regression)"
);
}
}
+6 -1
View File
@@ -28,7 +28,12 @@ pub(super) fn find_active_story_stage(
story_id: &str,
) -> Option<crate::pipeline_state::Stage> {
if let Ok(Some(item)) = crate::pipeline_state::read_typed(story_id)
&& item.stage.is_active()
&& matches!(
item.stage,
crate::pipeline_state::Stage::Coding
| crate::pipeline_state::Stage::Qa
| crate::pipeline_state::Stage::Merge { .. }
)
{
return Some(item.stage);
}
+2 -12
View File
@@ -165,16 +165,6 @@ pub enum ArchiveReason {
// ── Stage convenience methods ──────────────────────────────────────────────
impl Stage {
/// Returns true if this stage is an "active" stage (Coding, Qa, or Merge).
pub fn is_active(&self) -> bool {
matches!(self, Stage::Coding | Stage::Qa | Stage::Merge { .. })
}
/// Returns true if this is the Upcoming variant.
pub fn is_upcoming(&self) -> bool {
matches!(self, Stage::Upcoming)
}
/// Returns the filesystem directory name for this stage.
pub fn dir_name(&self) -> &'static str {
stage_dir_name(self)
@@ -223,8 +213,8 @@ impl Stage {
/// stage-directory string (from CRDT fields or watcher events) into a
/// typed `Stage`. Rich variants (`Done`, `Archived`, `Merge`) are
/// synthesised with zero-value fields — callers should use this only for
/// stage *classification* (e.g. `is_active()`, `matches!`), not for
/// accessing the rich metadata fields.
/// stage *classification* (via `matches!`), not for accessing the rich
/// metadata fields.
pub fn from_dir(s: &str) -> Option<Self> {
match s {
"upcoming" => Some(Stage::Upcoming),
+6 -7
View File
@@ -5,7 +5,6 @@ use crate::agents::{AgentPool, ReconciliationEvent};
use crate::config;
use crate::gateway_relay;
use crate::io;
use crate::pipeline_state;
use crate::service;
use crate::service::status::StatusBroadcaster;
use std::path::PathBuf;
@@ -58,18 +57,18 @@ pub(crate) fn spawn_event_bridges(
}
}
// Auto-assign: trigger `auto_assign_available_work` whenever a work item
// enters an active pipeline stage (2_current/, 3_qa/, 4_merge/).
// Auto-assign: trigger `auto_assign_available_work` on every work-item
// CRDT state-transition event. auto_assign_available_work is idempotent
// and noops where there is nothing to do, so firing on every transition
// ensures that MergeFailure and other non-"active" stages are covered
// without any per-stage special-casing.
if let Some(root) = project_root {
let watcher_auto_rx = watcher_tx.subscribe();
let watcher_auto_agents = Arc::clone(&agents);
tokio::spawn(async move {
let mut rx = watcher_auto_rx;
while let Ok(event) = rx.recv().await {
if let io::watcher::WatcherEvent::WorkItem { ref stage, .. } = event
&& pipeline_state::Stage::from_dir(stage.as_str())
.is_some_and(|s| s.is_active())
{
if let io::watcher::WatcherEvent::WorkItem { ref stage, .. } = event {
crate::slog!(
"[auto-assign] CRDT transition detected in {stage}/; \
triggering auto-assign."