huskies: merge 1016
This commit is contained in:
@@ -198,16 +198,10 @@ pub async fn run(
|
|||||||
)
|
)
|
||||||
};
|
};
|
||||||
|
|
||||||
// Reconcile any committed work from a previous session.
|
// Replay current pipeline state so subscribers (worktree lifecycle, merge-failure
|
||||||
{
|
// auto-spawn) react to any stories already in active stages, then auto-assign.
|
||||||
let recon_agents = Arc::clone(&agents);
|
slog!("[agent-mode] Replaying current pipeline state.");
|
||||||
let recon_root = project_root.clone();
|
crate::pipeline_state::replay_current_pipeline_state();
|
||||||
let (recon_tx, _) = broadcast::channel(64);
|
|
||||||
slog!("[agent-mode] Reconciling completed worktrees from previous session.");
|
|
||||||
recon_agents
|
|
||||||
.reconcile_on_startup(&recon_root, &recon_tx)
|
|
||||||
.await;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Run initial auto-assign.
|
// Run initial auto-assign.
|
||||||
slog!("[agent-mode] Initial auto-assign scan.");
|
slog!("[agent-mode] Initial auto-assign scan.");
|
||||||
|
|||||||
@@ -568,4 +568,72 @@ mod tests {
|
|||||||
found {active_coder_count} active entries"
|
found {active_coder_count} active entries"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── AC4: startup event replay + pool reconstruction ──────────────────
|
||||||
|
|
||||||
|
/// AC4: Simulates a server restart by seeding the CRDT with a story in
|
||||||
|
/// Coding stage, calling `replay_current_pipeline_state` (the new startup
|
||||||
|
/// path), then `auto_assign_available_work`. Asserts the pool ends in the
|
||||||
|
/// expected state: exactly one agent assigned to the story.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn startup_replay_followed_by_auto_assign_assigns_agent_once() {
|
||||||
|
let tmp = tempfile::tempdir().unwrap();
|
||||||
|
let sk = tmp.path().join(".huskies");
|
||||||
|
std::fs::create_dir_all(&sk).unwrap();
|
||||||
|
std::fs::write(
|
||||||
|
sk.join("project.toml"),
|
||||||
|
"[[agent]]\nname = \"coder-1\"\nstage = \"coder\"\n",
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
crate::db::ensure_content_store();
|
||||||
|
let story_id = "9903_restart_replay";
|
||||||
|
crate::db::write_item_with_content(
|
||||||
|
story_id,
|
||||||
|
"2_current",
|
||||||
|
"---\nname: Restart Replay\n---\n",
|
||||||
|
crate::db::ItemMeta::named("Restart Replay"),
|
||||||
|
);
|
||||||
|
|
||||||
|
let pool = AgentPool::new_test(3001);
|
||||||
|
|
||||||
|
// Simulate startup: replay current state, then auto-assign.
|
||||||
|
crate::pipeline_state::replay_current_pipeline_state();
|
||||||
|
pool.auto_assign_available_work(tmp.path()).await;
|
||||||
|
|
||||||
|
let count_after_first = {
|
||||||
|
let agents = pool.agents.lock().unwrap();
|
||||||
|
agents
|
||||||
|
.iter()
|
||||||
|
.filter(|(key, a)| {
|
||||||
|
key.contains(story_id)
|
||||||
|
&& matches!(a.status, AgentStatus::Pending | AgentStatus::Running)
|
||||||
|
})
|
||||||
|
.count()
|
||||||
|
};
|
||||||
|
|
||||||
|
// AC3 (idempotency): replaying twice must not double-spawn agents.
|
||||||
|
crate::pipeline_state::replay_current_pipeline_state();
|
||||||
|
pool.auto_assign_available_work(tmp.path()).await;
|
||||||
|
|
||||||
|
let count_after_second = {
|
||||||
|
let agents = pool.agents.lock().unwrap();
|
||||||
|
agents
|
||||||
|
.iter()
|
||||||
|
.filter(|(key, a)| {
|
||||||
|
key.contains(story_id)
|
||||||
|
&& matches!(a.status, AgentStatus::Pending | AgentStatus::Running)
|
||||||
|
})
|
||||||
|
.count()
|
||||||
|
};
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
count_after_first <= 1,
|
||||||
|
"after first replay+assign at most one agent must be assigned to {story_id}"
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
count_after_first, count_after_second,
|
||||||
|
"second replay must not spawn additional agents (idempotency)"
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,7 +7,6 @@ mod merge;
|
|||||||
/// TransitionFired subscriber that auto-spawns mergemaster on ConflictDetected merge failures.
|
/// TransitionFired subscriber that auto-spawns mergemaster on ConflictDetected merge failures.
|
||||||
pub(crate) mod merge_failure_subscriber;
|
pub(crate) mod merge_failure_subscriber;
|
||||||
mod pipeline;
|
mod pipeline;
|
||||||
mod reconcile;
|
|
||||||
mod scan;
|
mod scan;
|
||||||
mod story_checks;
|
mod story_checks;
|
||||||
pub(crate) mod watchdog;
|
pub(crate) mod watchdog;
|
||||||
|
|||||||
@@ -1,526 +1,4 @@
|
|||||||
//! Startup reconciliation: detect stories with committed work and advance the pipeline.
|
//! Scan-based startup reconciliation deleted in story 1016.
|
||||||
|
// Server-restart pool reconstruction now uses TransitionFired event replay.
|
||||||
use std::path::Path;
|
// See: pipeline_state::replay_current_pipeline_state
|
||||||
use tokio::sync::broadcast;
|
// and: startup::tick_loop::spawn_startup_reconciliation
|
||||||
|
|
||||||
use crate::pipeline_state::Stage;
|
|
||||||
use crate::worktree;
|
|
||||||
|
|
||||||
use super::super::super::ReconciliationEvent;
|
|
||||||
use super::super::{AgentPool, find_active_story_stage};
|
|
||||||
|
|
||||||
impl AgentPool {
|
|
||||||
/// Reconcile stories whose agent work was committed while the server was offline.
|
|
||||||
///
|
|
||||||
/// On server startup the in-memory agent pool is empty, so any story that an agent
|
|
||||||
/// completed during a previous session is stuck: the worktree has committed work but
|
|
||||||
/// the pipeline never advanced. This method detects those stories, re-runs the
|
|
||||||
/// acceptance gates, and advances the pipeline stage so that `auto_assign_available_work`
|
|
||||||
/// (called immediately after) picks up the right next-stage agents.
|
|
||||||
///
|
|
||||||
/// Algorithm:
|
|
||||||
/// 1. List all worktree directories under `{project_root}/.huskies/worktrees/`.
|
|
||||||
/// 2. For each worktree, check whether its feature branch has commits ahead of the
|
|
||||||
/// base branch (`master` / `main`).
|
|
||||||
/// 3. If committed work is found AND the story is in `2_current/` or `3_qa/`:
|
|
||||||
/// - Run acceptance gates (uncommitted-change check + clippy + tests).
|
|
||||||
/// - On pass + `2_current/`: move the story to `3_qa/`.
|
|
||||||
/// - On pass + `3_qa/`: run the coverage gate; if that also passes move to `4_merge/`.
|
|
||||||
/// - On failure: leave the story where it is so `auto_assign_available_work` can
|
|
||||||
/// start a fresh agent to retry.
|
|
||||||
/// 4. Stories in `4_merge/` are left for `auto_assign_available_work` to handle via a
|
|
||||||
/// fresh mergemaster (squash-merge must be re-executed by the mergemaster agent).
|
|
||||||
pub async fn reconcile_on_startup(
|
|
||||||
&self,
|
|
||||||
project_root: &Path,
|
|
||||||
progress_tx: &broadcast::Sender<ReconciliationEvent>,
|
|
||||||
) {
|
|
||||||
let worktrees = match worktree::list_worktrees(project_root) {
|
|
||||||
Ok(wt) => wt,
|
|
||||||
Err(e) => {
|
|
||||||
eprintln!("[startup:reconcile] Failed to list worktrees: {e}");
|
|
||||||
let _ = progress_tx.send(ReconciliationEvent {
|
|
||||||
story_id: String::new(),
|
|
||||||
status: "done".to_string(),
|
|
||||||
message: format!("Reconciliation failed: {e}"),
|
|
||||||
});
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
for wt_entry in &worktrees {
|
|
||||||
let story_id = &wt_entry.story_id;
|
|
||||||
let wt_path = wt_entry.path.clone();
|
|
||||||
|
|
||||||
// Determine which active stage the story is in.
|
|
||||||
let stage = match find_active_story_stage(project_root, story_id) {
|
|
||||||
Some(s) => s,
|
|
||||||
None => continue, // Not in any active stage (backlog/archived or unknown).
|
|
||||||
};
|
|
||||||
|
|
||||||
// 4_merge/ is left for auto_assign to handle with a fresh mergemaster.
|
|
||||||
if matches!(stage, Stage::Merge { .. }) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
let _ = progress_tx.send(ReconciliationEvent {
|
|
||||||
story_id: story_id.clone(),
|
|
||||||
status: "checking".to_string(),
|
|
||||||
message: format!("Checking for committed work in {}/", stage.dir_name()),
|
|
||||||
});
|
|
||||||
|
|
||||||
// Check whether the worktree has commits ahead of the base branch.
|
|
||||||
let wt_path_for_check = wt_path.clone();
|
|
||||||
let has_work = tokio::task::spawn_blocking(move || {
|
|
||||||
crate::agents::gates::worktree_has_committed_work(&wt_path_for_check)
|
|
||||||
})
|
|
||||||
.await
|
|
||||||
.unwrap_or(false);
|
|
||||||
|
|
||||||
if !has_work {
|
|
||||||
eprintln!(
|
|
||||||
"[startup:reconcile] No committed work for '{story_id}' in {}/; skipping.",
|
|
||||||
stage.dir_name()
|
|
||||||
);
|
|
||||||
let _ = progress_tx.send(ReconciliationEvent {
|
|
||||||
story_id: story_id.clone(),
|
|
||||||
status: "skipped".to_string(),
|
|
||||||
message: "No committed work found; skipping.".to_string(),
|
|
||||||
});
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
eprintln!(
|
|
||||||
"[startup:reconcile] Found committed work for '{story_id}' in {}/. Running acceptance gates.",
|
|
||||||
stage.dir_name()
|
|
||||||
);
|
|
||||||
let _ = progress_tx.send(ReconciliationEvent {
|
|
||||||
story_id: story_id.clone(),
|
|
||||||
status: "gates_running".to_string(),
|
|
||||||
message: "Running acceptance gates…".to_string(),
|
|
||||||
});
|
|
||||||
|
|
||||||
// Run acceptance gates on the worktree.
|
|
||||||
let wt_path_for_gates = wt_path.clone();
|
|
||||||
let gates_result = tokio::task::spawn_blocking(move || {
|
|
||||||
crate::agents::gates::check_uncommitted_changes(&wt_path_for_gates)?;
|
|
||||||
crate::agents::gates::run_acceptance_gates(&wt_path_for_gates)
|
|
||||||
})
|
|
||||||
.await;
|
|
||||||
|
|
||||||
let (gates_passed, gate_output) = match gates_result {
|
|
||||||
Ok(Ok(outcome)) => (outcome.passed, outcome.output),
|
|
||||||
Ok(Err(e)) => {
|
|
||||||
eprintln!("[startup:reconcile] Gate check error for '{story_id}': {e}");
|
|
||||||
let _ = progress_tx.send(ReconciliationEvent {
|
|
||||||
story_id: story_id.clone(),
|
|
||||||
status: "failed".to_string(),
|
|
||||||
message: format!("Gate error: {e}"),
|
|
||||||
});
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
eprintln!("[startup:reconcile] Gate check task panicked for '{story_id}': {e}");
|
|
||||||
let _ = progress_tx.send(ReconciliationEvent {
|
|
||||||
story_id: story_id.clone(),
|
|
||||||
status: "failed".to_string(),
|
|
||||||
message: format!("Gate task panicked: {e}"),
|
|
||||||
});
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
if !gates_passed {
|
|
||||||
eprintln!(
|
|
||||||
"[startup:reconcile] Gates failed for '{story_id}': {gate_output}\n\
|
|
||||||
Leaving in {}/ for auto-assign to restart the agent.",
|
|
||||||
stage.dir_name()
|
|
||||||
);
|
|
||||||
let _ = progress_tx.send(ReconciliationEvent {
|
|
||||||
story_id: story_id.clone(),
|
|
||||||
status: "failed".to_string(),
|
|
||||||
message: "Gates failed; will be retried by auto-assign.".to_string(),
|
|
||||||
});
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
eprintln!(
|
|
||||||
"[startup:reconcile] Gates passed for '{story_id}' (stage: {}/).",
|
|
||||||
stage.dir_name()
|
|
||||||
);
|
|
||||||
|
|
||||||
if matches!(stage, Stage::Coding { .. }) {
|
|
||||||
// Coder stage — determine qa mode to decide next step.
|
|
||||||
let qa_mode = {
|
|
||||||
let item_type = crate::agents::lifecycle::item_type_from_id(story_id);
|
|
||||||
if item_type == "spike" {
|
|
||||||
crate::io::story_metadata::QaMode::Human
|
|
||||||
} else {
|
|
||||||
let default_qa = crate::config::ProjectConfig::load(project_root)
|
|
||||||
.unwrap_or_default()
|
|
||||||
.default_qa_mode();
|
|
||||||
crate::io::story_metadata::resolve_qa_mode(story_id, default_qa)
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
match qa_mode {
|
|
||||||
crate::io::story_metadata::QaMode::Server => {
|
|
||||||
if let Err(e) = crate::agents::move_story_to_merge(story_id) {
|
|
||||||
eprintln!(
|
|
||||||
"[startup:reconcile] Failed to move '{story_id}' to 4_merge/: {e}"
|
|
||||||
);
|
|
||||||
let _ = progress_tx.send(ReconciliationEvent {
|
|
||||||
story_id: story_id.clone(),
|
|
||||||
status: "failed".to_string(),
|
|
||||||
message: format!("Failed to advance to merge: {e}"),
|
|
||||||
});
|
|
||||||
} else {
|
|
||||||
eprintln!(
|
|
||||||
"[startup:reconcile] Moved '{story_id}' → 4_merge/ (qa: server)."
|
|
||||||
);
|
|
||||||
let _ = progress_tx.send(ReconciliationEvent {
|
|
||||||
story_id: story_id.clone(),
|
|
||||||
status: "advanced".to_string(),
|
|
||||||
message: "Gates passed — moved to merge (qa: server).".to_string(),
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
crate::io::story_metadata::QaMode::Agent => {
|
|
||||||
if let Err(e) = crate::agents::move_story_to_qa(story_id) {
|
|
||||||
eprintln!(
|
|
||||||
"[startup:reconcile] Failed to move '{story_id}' to 3_qa/: {e}"
|
|
||||||
);
|
|
||||||
let _ = progress_tx.send(ReconciliationEvent {
|
|
||||||
story_id: story_id.clone(),
|
|
||||||
status: "failed".to_string(),
|
|
||||||
message: format!("Failed to advance to QA: {e}"),
|
|
||||||
});
|
|
||||||
} else {
|
|
||||||
eprintln!("[startup:reconcile] Moved '{story_id}' → 3_qa/.");
|
|
||||||
let _ = progress_tx.send(ReconciliationEvent {
|
|
||||||
story_id: story_id.clone(),
|
|
||||||
status: "advanced".to_string(),
|
|
||||||
message: "Gates passed — moved to QA.".to_string(),
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
crate::io::story_metadata::QaMode::Human => {
|
|
||||||
if let Err(e) = crate::agents::move_story_to_qa(story_id) {
|
|
||||||
eprintln!(
|
|
||||||
"[startup:reconcile] Failed to move '{story_id}' to 3_qa/: {e}"
|
|
||||||
);
|
|
||||||
let _ = progress_tx.send(ReconciliationEvent {
|
|
||||||
story_id: story_id.clone(),
|
|
||||||
status: "failed".to_string(),
|
|
||||||
message: format!("Failed to advance to QA: {e}"),
|
|
||||||
});
|
|
||||||
} else {
|
|
||||||
// Story 945: ReviewHold is a typed Stage variant.
|
|
||||||
let _ = crate::pipeline_state::apply_transition(
|
|
||||||
story_id,
|
|
||||||
crate::pipeline_state::PipelineEvent::ReviewHold {
|
|
||||||
reason: "qa: human — gates passed, awaiting review".to_string(),
|
|
||||||
},
|
|
||||||
None,
|
|
||||||
);
|
|
||||||
eprintln!(
|
|
||||||
"[startup:reconcile] Moved '{story_id}' → review_hold (qa: human — holding for review)."
|
|
||||||
);
|
|
||||||
let _ = progress_tx.send(ReconciliationEvent {
|
|
||||||
story_id: story_id.clone(),
|
|
||||||
status: "review_hold".to_string(),
|
|
||||||
message: "Gates passed — holding for human review.".to_string(),
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else if matches!(stage, Stage::Qa) {
|
|
||||||
// QA stage → run coverage gate before advancing to merge.
|
|
||||||
let wt_path_for_cov = wt_path.clone();
|
|
||||||
let coverage_result = tokio::task::spawn_blocking(move || {
|
|
||||||
crate::agents::gates::run_coverage_gate(&wt_path_for_cov)
|
|
||||||
})
|
|
||||||
.await;
|
|
||||||
|
|
||||||
let (coverage_passed, coverage_output) = match coverage_result {
|
|
||||||
Ok(Ok(pair)) => pair,
|
|
||||||
Ok(Err(e)) => {
|
|
||||||
eprintln!("[startup:reconcile] Coverage gate error for '{story_id}': {e}");
|
|
||||||
let _ = progress_tx.send(ReconciliationEvent {
|
|
||||||
story_id: story_id.clone(),
|
|
||||||
status: "failed".to_string(),
|
|
||||||
message: format!("Coverage gate error: {e}"),
|
|
||||||
});
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
eprintln!(
|
|
||||||
"[startup:reconcile] Coverage gate panicked for '{story_id}': {e}"
|
|
||||||
);
|
|
||||||
let _ = progress_tx.send(ReconciliationEvent {
|
|
||||||
story_id: story_id.clone(),
|
|
||||||
status: "failed".to_string(),
|
|
||||||
message: format!("Coverage gate panicked: {e}"),
|
|
||||||
});
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
if coverage_passed {
|
|
||||||
// Check whether this item needs human review before merging.
|
|
||||||
let needs_human_review = {
|
|
||||||
let item_type = crate::agents::lifecycle::item_type_from_id(story_id);
|
|
||||||
if item_type == "spike" {
|
|
||||||
true
|
|
||||||
} else {
|
|
||||||
let default_qa = crate::config::ProjectConfig::load(project_root)
|
|
||||||
.unwrap_or_default()
|
|
||||||
.default_qa_mode();
|
|
||||||
matches!(
|
|
||||||
crate::io::story_metadata::resolve_qa_mode(story_id, default_qa),
|
|
||||||
crate::io::story_metadata::QaMode::Human
|
|
||||||
)
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
if needs_human_review {
|
|
||||||
// Story 945: ReviewHold is a typed Stage variant.
|
|
||||||
let _ = crate::pipeline_state::apply_transition(
|
|
||||||
story_id,
|
|
||||||
crate::pipeline_state::PipelineEvent::ReviewHold {
|
|
||||||
reason: "Passed QA — waiting for human review.".to_string(),
|
|
||||||
},
|
|
||||||
None,
|
|
||||||
);
|
|
||||||
eprintln!(
|
|
||||||
"[startup:reconcile] '{story_id}' passed QA — holding for human review."
|
|
||||||
);
|
|
||||||
let _ = progress_tx.send(ReconciliationEvent {
|
|
||||||
story_id: story_id.clone(),
|
|
||||||
status: "review_hold".to_string(),
|
|
||||||
message: "Passed QA — waiting for human review.".to_string(),
|
|
||||||
});
|
|
||||||
} else if let Err(e) = crate::agents::move_story_to_merge(story_id) {
|
|
||||||
eprintln!(
|
|
||||||
"[startup:reconcile] Failed to move '{story_id}' to 4_merge/: {e}"
|
|
||||||
);
|
|
||||||
let _ = progress_tx.send(ReconciliationEvent {
|
|
||||||
story_id: story_id.clone(),
|
|
||||||
status: "failed".to_string(),
|
|
||||||
message: format!("Failed to advance to merge: {e}"),
|
|
||||||
});
|
|
||||||
} else {
|
|
||||||
eprintln!("[startup:reconcile] Moved '{story_id}' → 4_merge/.");
|
|
||||||
let _ = progress_tx.send(ReconciliationEvent {
|
|
||||||
story_id: story_id.clone(),
|
|
||||||
status: "advanced".to_string(),
|
|
||||||
message: "Gates passed — moved to merge.".to_string(),
|
|
||||||
});
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
eprintln!(
|
|
||||||
"[startup:reconcile] Coverage gate failed for '{story_id}': {coverage_output}\n\
|
|
||||||
Leaving in 3_qa/ for auto-assign to restart the QA agent."
|
|
||||||
);
|
|
||||||
let _ = progress_tx.send(ReconciliationEvent {
|
|
||||||
story_id: story_id.clone(),
|
|
||||||
status: "failed".to_string(),
|
|
||||||
message: "Coverage gate failed; will be retried.".to_string(),
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Signal that reconciliation is complete.
|
|
||||||
let _ = progress_tx.send(ReconciliationEvent {
|
|
||||||
story_id: String::new(),
|
|
||||||
status: "done".to_string(),
|
|
||||||
message: "Startup reconciliation complete.".to_string(),
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── Tests ──────────────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use std::process::Command;
|
|
||||||
use tokio::sync::broadcast;
|
|
||||||
|
|
||||||
use super::super::super::AgentPool;
|
|
||||||
use crate::agents::ReconciliationEvent;
|
|
||||||
|
|
||||||
fn init_git_repo(repo: &std::path::Path) {
|
|
||||||
Command::new("git")
|
|
||||||
.args(["init"])
|
|
||||||
.current_dir(repo)
|
|
||||||
.output()
|
|
||||||
.unwrap();
|
|
||||||
Command::new("git")
|
|
||||||
.args(["config", "user.email", "test@test.com"])
|
|
||||||
.current_dir(repo)
|
|
||||||
.output()
|
|
||||||
.unwrap();
|
|
||||||
Command::new("git")
|
|
||||||
.args(["config", "user.name", "Test"])
|
|
||||||
.current_dir(repo)
|
|
||||||
.output()
|
|
||||||
.unwrap();
|
|
||||||
// Create initial commit so master branch exists.
|
|
||||||
std::fs::write(repo.join("README.md"), "# test\n").unwrap();
|
|
||||||
Command::new("git")
|
|
||||||
.args(["add", "."])
|
|
||||||
.current_dir(repo)
|
|
||||||
.output()
|
|
||||||
.unwrap();
|
|
||||||
Command::new("git")
|
|
||||||
.args(["commit", "-m", "initial"])
|
|
||||||
.current_dir(repo)
|
|
||||||
.output()
|
|
||||||
.unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn reconcile_on_startup_noop_when_no_worktrees() {
|
|
||||||
let tmp = tempfile::tempdir().unwrap();
|
|
||||||
let pool = AgentPool::new_test(3001);
|
|
||||||
let (tx, _rx) = broadcast::channel(16);
|
|
||||||
// Should not panic; no worktrees to reconcile.
|
|
||||||
pool.reconcile_on_startup(tmp.path(), &tx).await;
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn reconcile_on_startup_emits_done_event() {
|
|
||||||
let tmp = tempfile::tempdir().unwrap();
|
|
||||||
let pool = AgentPool::new_test(3001);
|
|
||||||
let (tx, mut rx) = broadcast::channel::<ReconciliationEvent>(16);
|
|
||||||
pool.reconcile_on_startup(tmp.path(), &tx).await;
|
|
||||||
|
|
||||||
// Collect all events; the last must be "done".
|
|
||||||
let mut events: Vec<ReconciliationEvent> = Vec::new();
|
|
||||||
while let Ok(evt) = rx.try_recv() {
|
|
||||||
events.push(evt);
|
|
||||||
}
|
|
||||||
assert!(
|
|
||||||
events.iter().any(|e| e.status == "done"),
|
|
||||||
"reconcile_on_startup must emit a 'done' event; got: {:?}",
|
|
||||||
events.iter().map(|e| &e.status).collect::<Vec<_>>()
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn reconcile_on_startup_skips_story_without_committed_work() {
|
|
||||||
use std::fs;
|
|
||||||
let tmp = tempfile::tempdir().unwrap();
|
|
||||||
let root = tmp.path();
|
|
||||||
|
|
||||||
// Set up story in 2_current/.
|
|
||||||
let current = root.join(".huskies/work/2_current");
|
|
||||||
fs::create_dir_all(¤t).unwrap();
|
|
||||||
fs::write(current.join("60_story_test.md"), "test").unwrap();
|
|
||||||
|
|
||||||
// Create a worktree directory that is a fresh git repo with no commits
|
|
||||||
// ahead of its own base branch (simulates a worktree where no work was done).
|
|
||||||
let wt_dir = root.join(".huskies/worktrees/60_story_test");
|
|
||||||
fs::create_dir_all(&wt_dir).unwrap();
|
|
||||||
init_git_repo(&wt_dir);
|
|
||||||
|
|
||||||
let pool = AgentPool::new_test(3001);
|
|
||||||
let (tx, _rx) = broadcast::channel(16);
|
|
||||||
pool.reconcile_on_startup(root, &tx).await;
|
|
||||||
|
|
||||||
// Story should still be in 2_current/ — nothing was reconciled.
|
|
||||||
assert!(
|
|
||||||
current.join("60_story_test.md").exists(),
|
|
||||||
"story should stay in 2_current/ when worktree has no committed work"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn reconcile_on_startup_runs_gates_on_worktree_with_committed_work() {
|
|
||||||
use std::fs;
|
|
||||||
let tmp = tempfile::tempdir().unwrap();
|
|
||||||
let root = tmp.path();
|
|
||||||
|
|
||||||
// Set up a git repo for the project root.
|
|
||||||
init_git_repo(root);
|
|
||||||
|
|
||||||
// Set up story in 2_current/ and commit it so the project root is clean.
|
|
||||||
let current = root.join(".huskies/work/2_current");
|
|
||||||
fs::create_dir_all(¤t).unwrap();
|
|
||||||
fs::write(current.join("61_story_test.md"), "test").unwrap();
|
|
||||||
Command::new("git")
|
|
||||||
.args(["add", "."])
|
|
||||||
.current_dir(root)
|
|
||||||
.output()
|
|
||||||
.unwrap();
|
|
||||||
Command::new("git")
|
|
||||||
.args([
|
|
||||||
"-c",
|
|
||||||
"user.email=test@test.com",
|
|
||||||
"-c",
|
|
||||||
"user.name=Test",
|
|
||||||
"commit",
|
|
||||||
"-m",
|
|
||||||
"add story",
|
|
||||||
])
|
|
||||||
.current_dir(root)
|
|
||||||
.output()
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
// Create a real git worktree for the story.
|
|
||||||
let wt_dir = root.join(".huskies/worktrees/61_story_test");
|
|
||||||
fs::create_dir_all(wt_dir.parent().unwrap()).unwrap();
|
|
||||||
Command::new("git")
|
|
||||||
.args([
|
|
||||||
"worktree",
|
|
||||||
"add",
|
|
||||||
&wt_dir.to_string_lossy(),
|
|
||||||
"-b",
|
|
||||||
"feature/story-61_story_test",
|
|
||||||
])
|
|
||||||
.current_dir(root)
|
|
||||||
.output()
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
// Add a commit to the feature branch (simulates coder completing work).
|
|
||||||
fs::write(wt_dir.join("implementation.txt"), "done").unwrap();
|
|
||||||
Command::new("git")
|
|
||||||
.args(["add", "."])
|
|
||||||
.current_dir(&wt_dir)
|
|
||||||
.output()
|
|
||||||
.unwrap();
|
|
||||||
Command::new("git")
|
|
||||||
.args([
|
|
||||||
"-c",
|
|
||||||
"user.email=test@test.com",
|
|
||||||
"-c",
|
|
||||||
"user.name=Test",
|
|
||||||
"commit",
|
|
||||||
"-m",
|
|
||||||
"implement story",
|
|
||||||
])
|
|
||||||
.current_dir(&wt_dir)
|
|
||||||
.output()
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
assert!(
|
|
||||||
crate::agents::gates::worktree_has_committed_work(&wt_dir),
|
|
||||||
"test setup: worktree should have committed work"
|
|
||||||
);
|
|
||||||
|
|
||||||
let pool = AgentPool::new_test(3001);
|
|
||||||
let (tx, _rx) = broadcast::channel(16);
|
|
||||||
pool.reconcile_on_startup(root, &tx).await;
|
|
||||||
|
|
||||||
// In the test env, cargo clippy will fail (no Cargo.toml) so gates fail
|
|
||||||
// and the story stays in 2_current/. The important assertion is that
|
|
||||||
// reconcile ran without panicking and the story is in a consistent state.
|
|
||||||
let in_current = current.join("61_story_test.md").exists();
|
|
||||||
let in_qa = root.join(".huskies/work/3_qa/61_story_test.md").exists();
|
|
||||||
assert!(
|
|
||||||
in_current || in_qa,
|
|
||||||
"story should be in 2_current/ or 3_qa/ after reconciliation"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -24,7 +24,6 @@ use tokio::sync::broadcast;
|
|||||||
// Bring pool-internal types into pool's namespace so that sub-modules
|
// Bring pool-internal types into pool's namespace so that sub-modules
|
||||||
// (auto_assign, pipeline, etc.) can access them via `use super::...`.
|
// (auto_assign, pipeline, etc.) can access them via `use super::...`.
|
||||||
use types::{StoryAgent, composite_key};
|
use types::{StoryAgent, composite_key};
|
||||||
use worktree::find_active_story_stage;
|
|
||||||
|
|
||||||
/// Manages concurrent story agents, each in its own worktree.
|
/// Manages concurrent story agents, each in its own worktree.
|
||||||
pub struct AgentPool {
|
pub struct AgentPool {
|
||||||
|
|||||||
@@ -10,5 +10,5 @@ mod deps;
|
|||||||
mod parser;
|
mod parser;
|
||||||
mod types;
|
mod types;
|
||||||
|
|
||||||
pub use parser::{is_story_frozen_in_store, parse_unchecked_todos, resolve_qa_mode};
|
pub use parser::{is_story_frozen_in_store, parse_unchecked_todos};
|
||||||
pub use types::{ItemType, QaMode};
|
pub use types::{ItemType, QaMode};
|
||||||
|
|||||||
@@ -3,8 +3,6 @@
|
|||||||
//! Story 865 stripped YAML front matter from stored content and the codebase
|
//! Story 865 stripped YAML front matter from stored content and the codebase
|
||||||
//! at large; the only remaining functions here read the CRDT or operate on
|
//! at large; the only remaining functions here read the CRDT or operate on
|
||||||
//! the markdown body directly.
|
//! the markdown body directly.
|
||||||
use super::types::QaMode;
|
|
||||||
|
|
||||||
/// Parse unchecked todo items (`- [ ] ...`) from a markdown string.
|
/// Parse unchecked todo items (`- [ ] ...`) from a markdown string.
|
||||||
pub fn parse_unchecked_todos(contents: &str) -> Vec<String> {
|
pub fn parse_unchecked_todos(contents: &str) -> Vec<String> {
|
||||||
contents
|
contents
|
||||||
@@ -16,17 +14,6 @@ pub fn parse_unchecked_todos(contents: &str) -> Vec<String> {
|
|||||||
.collect()
|
.collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Resolve the effective QA mode for a story by ID via the CRDT.
|
|
||||||
///
|
|
||||||
/// Returns `default` when the story has no entry or its `qa_mode` register is
|
|
||||||
/// unset. Spikes are **not** handled here — callers override to `Human` for
|
|
||||||
/// spikes themselves.
|
|
||||||
pub fn resolve_qa_mode(story_id: &str, default: QaMode) -> QaMode {
|
|
||||||
crate::crdt_state::read_item(story_id)
|
|
||||||
.and_then(|view| view.qa_mode())
|
|
||||||
.unwrap_or(default)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Return `true` if the story is currently in `Stage::Frozen`
|
/// Return `true` if the story is currently in `Stage::Frozen`
|
||||||
/// (story 945: frozen is a typed stage variant, not a flag).
|
/// (story 945: frozen is a typed stage variant, not a flag).
|
||||||
///
|
///
|
||||||
@@ -70,13 +57,4 @@ mod tests {
|
|||||||
let input = " - [ ] Indented item\n";
|
let input = " - [ ] Indented item\n";
|
||||||
assert_eq!(parse_unchecked_todos(input), vec!["Indented item"]);
|
assert_eq!(parse_unchecked_todos(input), vec!["Indented item"]);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn resolve_qa_mode_falls_back_to_default_when_crdt_empty() {
|
|
||||||
crate::crdt_state::init_for_test();
|
|
||||||
assert_eq!(
|
|
||||||
resolve_qa_mode("9999_no_such_story", QaMode::Server),
|
|
||||||
QaMode::Server
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -36,6 +36,32 @@ pub(super) fn try_broadcast(fired: &TransitionFired) {
|
|||||||
let _ = get_or_init_tx().send(fired.clone());
|
let _ = get_or_init_tx().send(fired.clone());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Replay the current CRDT pipeline state as a burst of synthetic
|
||||||
|
/// [`TransitionFired`] events at server startup.
|
||||||
|
///
|
||||||
|
/// Reads every item from the CRDT and broadcasts a self-transition
|
||||||
|
/// (`before == after`) for each one so that all existing subscribers
|
||||||
|
/// (worktree lifecycle, merge-failure auto-spawn, auto-assign) react
|
||||||
|
/// identically to a live event. This replaces the legacy scan-based
|
||||||
|
/// `reconcile_on_startup` path.
|
||||||
|
///
|
||||||
|
/// Idempotent: a second call produces another burst of events, but every
|
||||||
|
/// subscriber already guards against duplicate work (e.g.
|
||||||
|
/// `is_story_assigned_for_stage` returns true once an agent is running,
|
||||||
|
/// and worktree creation is a no-op when the worktree already exists).
|
||||||
|
pub fn replay_current_pipeline_state() {
|
||||||
|
for item in super::read_all_typed() {
|
||||||
|
let fired = TransitionFired {
|
||||||
|
story_id: item.story_id.clone(),
|
||||||
|
before: item.stage.clone(),
|
||||||
|
after: item.stage,
|
||||||
|
event: super::PipelineEvent::DepsMet,
|
||||||
|
at: chrono::Utc::now(),
|
||||||
|
};
|
||||||
|
try_broadcast(&fired);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Fired when a pipeline stage transition completes.
|
/// Fired when a pipeline stage transition completes.
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct TransitionFired {
|
pub struct TransitionFired {
|
||||||
@@ -151,4 +177,58 @@ mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ── TransitionError Display ─────────────────────────────────────────
|
// ── TransitionError Display ─────────────────────────────────────────
|
||||||
|
|
||||||
|
// ── replay_current_pipeline_state ──────────────────────────────────
|
||||||
|
|
||||||
|
/// AC1: replay broadcasts a synthetic event for every item in the CRDT.
|
||||||
|
#[test]
|
||||||
|
fn replay_broadcasts_event_for_crdt_item_in_coding_stage() {
|
||||||
|
crate::crdt_state::init_for_test();
|
||||||
|
crate::db::ensure_content_store();
|
||||||
|
|
||||||
|
let story_id = "9901_replay_coding";
|
||||||
|
crate::db::write_item_with_content(
|
||||||
|
story_id,
|
||||||
|
"2_current",
|
||||||
|
"---\nname: Replay Coding\n---\n",
|
||||||
|
crate::db::ItemMeta::named("Replay Coding"),
|
||||||
|
);
|
||||||
|
|
||||||
|
let mut rx = subscribe_transitions();
|
||||||
|
replay_current_pipeline_state();
|
||||||
|
|
||||||
|
let mut found = false;
|
||||||
|
while let Ok(fired) = rx.try_recv() {
|
||||||
|
if fired.story_id.0 == story_id && matches!(fired.after, Stage::Coding { .. }) {
|
||||||
|
found = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assert!(
|
||||||
|
found,
|
||||||
|
"replay must broadcast a Coding event for a story in 2_current"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// AC3: calling replay_current_pipeline_state twice fires events both times.
|
||||||
|
///
|
||||||
|
/// Pool-state idempotency (no duplicate agents) is enforced by subscribers,
|
||||||
|
/// not by the replay function itself. This test verifies that replay is safe
|
||||||
|
/// to call multiple times without panicking.
|
||||||
|
#[test]
|
||||||
|
fn replay_twice_does_not_panic() {
|
||||||
|
crate::crdt_state::init_for_test();
|
||||||
|
crate::db::ensure_content_store();
|
||||||
|
|
||||||
|
let story_id = "9902_replay_idem";
|
||||||
|
crate::db::write_item_with_content(
|
||||||
|
story_id,
|
||||||
|
"3_qa",
|
||||||
|
"---\nname: Replay QA\n---\n",
|
||||||
|
crate::db::ItemMeta::named("Replay QA"),
|
||||||
|
);
|
||||||
|
|
||||||
|
// Two successive replays must not panic.
|
||||||
|
replay_current_pipeline_state();
|
||||||
|
replay_current_pipeline_state();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -50,7 +50,10 @@ pub use transition::{
|
|||||||
};
|
};
|
||||||
|
|
||||||
#[allow(unused_imports)]
|
#[allow(unused_imports)]
|
||||||
pub use events::{EventBus, TransitionFired, TransitionSubscriber, subscribe_transitions};
|
pub use events::{
|
||||||
|
EventBus, TransitionFired, TransitionSubscriber, replay_current_pipeline_state,
|
||||||
|
subscribe_transitions,
|
||||||
|
};
|
||||||
|
|
||||||
#[allow(unused_imports)]
|
#[allow(unused_imports)]
|
||||||
pub use projection::ProjectionError;
|
pub use projection::ProjectionError;
|
||||||
|
|||||||
@@ -199,8 +199,16 @@ pub(crate) fn spawn_gateway_relay(startup_root: &Option<PathBuf>, status: Arc<St
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Spawn the startup reconciliation task: reconcile any stories whose agent
|
/// Spawn the startup reconstruction task: replay the current pipeline state
|
||||||
/// work was committed while the server was offline, then auto-assign free agents.
|
/// through the [`TransitionFired`][crate::pipeline_state::TransitionFired]
|
||||||
|
/// broadcast channel so that all existing subscribers (worktree lifecycle,
|
||||||
|
/// merge-failure auto-spawn, auto-assign) react identically to a live
|
||||||
|
/// transition, then trigger a full auto-assign pass.
|
||||||
|
///
|
||||||
|
/// Replaces the legacy scan-based `reconcile_on_startup` approach. The CRDT
|
||||||
|
/// is the durable source of truth; replaying it as synthetic self-transitions
|
||||||
|
/// is cheaper, simpler, and idempotent: a second replay produces another burst
|
||||||
|
/// of events that subscribers safely ignore for already-assigned stories.
|
||||||
pub(crate) fn spawn_startup_reconciliation(
|
pub(crate) fn spawn_startup_reconciliation(
|
||||||
startup_root: Option<PathBuf>,
|
startup_root: Option<PathBuf>,
|
||||||
startup_agents: Arc<AgentPool>,
|
startup_agents: Arc<AgentPool>,
|
||||||
@@ -211,12 +219,17 @@ pub(crate) fn spawn_startup_reconciliation(
|
|||||||
// Purge content-store entries for stories that reached terminal
|
// Purge content-store entries for stories that reached terminal
|
||||||
// stages in a previous session (before the GC subscriber was active).
|
// stages in a previous session (before the GC subscriber was active).
|
||||||
crate::db::gc::sweep_zombie_content_on_startup();
|
crate::db::gc::sweep_zombie_content_on_startup();
|
||||||
crate::slog!("[startup] Reconciling completed worktrees from previous session.");
|
crate::slog!(
|
||||||
startup_agents
|
"[startup] Replaying current pipeline state through TransitionFired channel."
|
||||||
.reconcile_on_startup(&root, &startup_reconciliation_tx)
|
);
|
||||||
.await;
|
crate::pipeline_state::replay_current_pipeline_state();
|
||||||
crate::slog!("[auto-assign] Scanning pipeline stages for unassigned work.");
|
crate::slog!("[auto-assign] Scanning pipeline stages for unassigned work.");
|
||||||
startup_agents.auto_assign_available_work(&root).await;
|
startup_agents.auto_assign_available_work(&root).await;
|
||||||
|
let _ = startup_reconciliation_tx.send(ReconciliationEvent {
|
||||||
|
story_id: String::new(),
|
||||||
|
status: "done".to_string(),
|
||||||
|
message: "Startup event replay complete.".to_string(),
|
||||||
|
});
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user