huskies: merge 831
This commit is contained in:
@@ -0,0 +1,225 @@
|
||||
//! Merge pipeline runner — start_merge_agent_work and run_merge_pipeline.
|
||||
use crate::slog;
|
||||
use crate::worktree;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
|
||||
use super::super::super::AgentPool;
|
||||
use super::time::{
|
||||
decode_server_start_time, encode_server_start_time, server_start_time, unix_now,
|
||||
};
|
||||
|
||||
impl AgentPool {
|
||||
/// Start the merge pipeline as a background task.
|
||||
///
|
||||
/// Returns immediately so the MCP tool call doesn't time out (the full
|
||||
/// pipeline — squash merge + quality gates — takes well over 60 seconds,
|
||||
/// exceeding Claude Code's MCP tool-call timeout).
|
||||
///
|
||||
/// The mergemaster agent should poll [`get_merge_status`](Self::get_merge_status)
|
||||
/// until the job reaches a terminal state.
|
||||
pub fn start_merge_agent_work(
|
||||
self: &Arc<Self>,
|
||||
project_root: &Path,
|
||||
story_id: &str,
|
||||
) -> Result<(), String> {
|
||||
// Sweep stale Running entries left behind by dead processes before
|
||||
// applying the double-start guard. This handles the case where the
|
||||
// server crashed mid-merge: the next attempt finds a Running entry
|
||||
// whose owning process is gone and clears it automatically.
|
||||
if let Some(jobs) = crate::crdt_state::read_all_merge_jobs() {
|
||||
let current_boot = server_start_time();
|
||||
for job in jobs {
|
||||
if job.status != "running" {
|
||||
continue;
|
||||
}
|
||||
let stale = match decode_server_start_time(job.error.as_deref()) {
|
||||
Some(t) => t < current_boot,
|
||||
None => true, // Legacy (pid-encoded) or malformed: stale
|
||||
};
|
||||
if stale {
|
||||
slog!(
|
||||
"[merge] Cleared stale Running merge job for '{}' (server restarted)",
|
||||
job.story_id
|
||||
);
|
||||
crate::crdt_state::delete_merge_job(&job.story_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Guard against double-starts; clear any completed/failed entry so the
|
||||
// caller can retry without needing to call a separate cleanup step.
|
||||
if let Some(job) = crate::crdt_state::read_merge_job(story_id) {
|
||||
match job.status.as_str() {
|
||||
"running" => {
|
||||
return Err(format!(
|
||||
"Merge already in progress for '{story_id}'. \
|
||||
Use get_merge_status to poll for completion."
|
||||
));
|
||||
}
|
||||
// Completed or Failed: clear stale entry so we can start fresh.
|
||||
_ => {
|
||||
crate::crdt_state::delete_merge_job(story_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Insert Running job into CRDT.
|
||||
let started_at = unix_now();
|
||||
crate::crdt_state::write_merge_job(
|
||||
story_id,
|
||||
"running",
|
||||
started_at,
|
||||
None,
|
||||
Some(&encode_server_start_time(server_start_time())),
|
||||
);
|
||||
|
||||
let pool = Arc::clone(self);
|
||||
let root = project_root.to_path_buf();
|
||||
let sid = story_id.to_string();
|
||||
|
||||
tokio::spawn(async move {
|
||||
let report = pool.run_merge_pipeline(&root, &sid).await;
|
||||
let success = matches!(&report, Ok(r) if r.success);
|
||||
|
||||
let finished_at = unix_now();
|
||||
|
||||
// On any failure: record merge_failure in CRDT and emit notification.
|
||||
if !success {
|
||||
let reason = match &report {
|
||||
Ok(r) => {
|
||||
if r.had_conflicts {
|
||||
format!(
|
||||
"Merge conflict: {}",
|
||||
r.conflict_details
|
||||
.as_deref()
|
||||
.unwrap_or("conflicts detected")
|
||||
)
|
||||
} else {
|
||||
format!("Quality gates failed: {}", r.gate_output)
|
||||
}
|
||||
}
|
||||
Err(e) => e.clone(),
|
||||
};
|
||||
let is_no_commits = reason.contains("no commits to merge");
|
||||
if let Some(contents) = crate::db::read_content(&sid) {
|
||||
let with_failure = crate::io::story_metadata::write_merge_failure_in_content(
|
||||
&contents, &reason,
|
||||
);
|
||||
let updated = if is_no_commits {
|
||||
crate::io::story_metadata::write_blocked_in_content(&with_failure)
|
||||
} else {
|
||||
with_failure
|
||||
};
|
||||
crate::db::write_content(&sid, &updated);
|
||||
crate::db::write_item_with_content(&sid, "4_merge", &updated);
|
||||
}
|
||||
if is_no_commits {
|
||||
let _ = pool
|
||||
.watcher_tx
|
||||
.send(crate::io::watcher::WatcherEvent::StoryBlocked {
|
||||
story_id: sid.clone(),
|
||||
reason,
|
||||
});
|
||||
} else {
|
||||
let _ = pool
|
||||
.watcher_tx
|
||||
.send(crate::io::watcher::WatcherEvent::MergeFailure {
|
||||
story_id: sid.clone(),
|
||||
reason,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Update CRDT with terminal status.
|
||||
match &report {
|
||||
Ok(r) => {
|
||||
let report_json = serde_json::to_string(r).unwrap_or_else(|_| String::new());
|
||||
crate::crdt_state::write_merge_job(
|
||||
&sid,
|
||||
"completed",
|
||||
started_at,
|
||||
Some(finished_at),
|
||||
Some(&report_json),
|
||||
);
|
||||
}
|
||||
Err(e) => {
|
||||
crate::crdt_state::write_merge_job(
|
||||
&sid,
|
||||
"failed",
|
||||
started_at,
|
||||
Some(finished_at),
|
||||
Some(e),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
if !success {
|
||||
pool.auto_assign_available_work(&root).await;
|
||||
}
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// The actual merge pipeline, run inside a background task.
|
||||
async fn run_merge_pipeline(
|
||||
self: &Arc<Self>,
|
||||
project_root: &Path,
|
||||
story_id: &str,
|
||||
) -> Result<crate::agents::merge::MergeReport, String> {
|
||||
let branch = format!("feature/story-{story_id}");
|
||||
let wt_path = worktree::worktree_path(project_root, story_id);
|
||||
let root = project_root.to_path_buf();
|
||||
let sid = story_id.to_string();
|
||||
let br = branch.clone();
|
||||
|
||||
let merge_result = tokio::task::spawn_blocking(move || {
|
||||
crate::agents::merge::run_squash_merge(&root, &br, &sid)
|
||||
})
|
||||
.await
|
||||
.map_err(|e| format!("Merge task panicked: {e}"))??;
|
||||
|
||||
if !merge_result.success {
|
||||
return Ok(crate::agents::merge::MergeReport {
|
||||
story_id: story_id.to_string(),
|
||||
success: false,
|
||||
had_conflicts: merge_result.had_conflicts,
|
||||
conflicts_resolved: merge_result.conflicts_resolved,
|
||||
conflict_details: merge_result.conflict_details,
|
||||
gates_passed: merge_result.gates_passed,
|
||||
gate_output: merge_result.output,
|
||||
worktree_cleaned_up: false,
|
||||
story_archived: false,
|
||||
});
|
||||
}
|
||||
|
||||
let story_archived = crate::agents::lifecycle::move_story_to_done(story_id).is_ok();
|
||||
if story_archived {
|
||||
self.remove_agents_for_story(story_id);
|
||||
}
|
||||
|
||||
let worktree_cleaned_up = if wt_path.exists() {
|
||||
let config = crate::config::ProjectConfig::load(project_root).unwrap_or_default();
|
||||
worktree::remove_worktree_by_story_id(project_root, story_id, &config)
|
||||
.await
|
||||
.is_ok()
|
||||
} else {
|
||||
false
|
||||
};
|
||||
|
||||
self.auto_assign_available_work(project_root).await;
|
||||
|
||||
Ok(crate::agents::merge::MergeReport {
|
||||
story_id: story_id.to_string(),
|
||||
success: true,
|
||||
had_conflicts: merge_result.had_conflicts,
|
||||
conflicts_resolved: merge_result.conflicts_resolved,
|
||||
conflict_details: merge_result.conflict_details,
|
||||
gates_passed: true,
|
||||
gate_output: merge_result.output,
|
||||
worktree_cleaned_up,
|
||||
story_archived,
|
||||
})
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user