huskies: merge 1066

This commit is contained in:
dave
2026-05-14 23:39:56 +00:00
parent bf813d910b
commit bb6a6063e8
15 changed files with 361 additions and 120 deletions
+240 -17
View File
@@ -156,6 +156,17 @@ pub(crate) fn spawn_tick_loop(
{scheduled_count} scheduled timer(s)"
);
let (reconcile_interval, done_retention) = root
.as_ref()
.and_then(|r| config::ProjectConfig::load(r).ok())
.map(|c| {
(
c.watcher.reconcile_interval_secs,
std::time::Duration::from_secs(c.watcher.done_retention_secs),
)
})
.unwrap_or((30, std::time::Duration::from_secs(4 * 3600)));
tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(1));
let mut tick_count: u64 = 0;
@@ -190,6 +201,15 @@ pub(crate) fn spawn_tick_loop(
}
agents.reap_stale_merge_jobs();
}
// Periodic reconciler: converge subscriber side effects so that
// Lagged broadcast events never leave state permanently diverged.
if tick_count.is_multiple_of(reconcile_interval)
&& let Some(ref r) = root
{
crate::slog!("[reconcile] Running periodic reconcile pass.");
run_reconcile_pass(r, &agents, done_retention).await;
}
}
});
}
@@ -450,16 +470,50 @@ async fn execute_prompt_action(
}
}
/// Run one full reconcile pass by calling each subscriber's `reconcile`
/// entry point directly, in a fixed order, rather than replaying events
/// through the transition broadcast channel.
///
/// The goal is for subscriber side effects to converge on the current state
/// even if broadcast events were dropped (`Lagged`) during startup or at
/// runtime. Every step is intended to be idempotent, so the pass is safe to
/// repeat: it is run once by the startup reconciliation task and then
/// periodically from the tick loop.
///
/// * `root` – project root handed to the worktree, cost-rollup and
///   merge-failure reconcilers.
/// * `agents` – agent pool; supplies the port used for worktree creation and
///   is passed to the merge-failure reconciler.
/// * `done_retention` – retention period handed to the done-archive sweep.
pub(crate) async fn run_reconcile_pass(
    root: &std::path::Path,
    agents: &Arc<AgentPool>,
    done_retention: std::time::Duration,
) {
    // Step 1 — content GC: drop content-store entries that belong to
    // terminal/tombstoned stories.
    crate::db::gc::sweep_zombie_content_on_startup();

    // Step 2 — worktree creation: make sure each Coding story has a worktree.
    let port = agents.port();
    crate::agents::pool::worktree_lifecycle::reconcile_worktree_create(root, port).await;

    // Step 3 — worktree cleanup: remove worktrees left behind by terminal stories.
    crate::agents::pool::worktree_lifecycle::reconcile_worktree_cleanup(root).await;

    // Step 4 — done-archive: archive Done stories once `done_retention` has elapsed.
    crate::io::watcher::sweep_done_to_archived(done_retention);

    // Step 5 — cost rollup: rebuild the in-memory register from what is on disk.
    crate::agents::pool::cost_rollup_subscriber::reconcile_cost_rollup(root);

    // Step 6 — merge failure: start a mergemaster for ConflictDetected stories
    // that currently have no active agent.
    crate::agents::pool::auto_assign::reconcile_merge_failure(agents, root).await;

    // Step 7 — merge block: deliberately a no-op, because the in-memory counter
    // cannot be rebuilt from the CRDT.
    crate::agents::pool::auto_assign::reconcile_merge_failure_block();

    // Step 8 — audit log: deliberately a no-op, because replaying history would
    // write misleading entries.
    crate::pipeline_state::reconcile_audit_log();
}
/// Spawn the startup reconciliation task: run a full reconcile pass so that all
/// side-effect subscribers converge on the current CRDT state without flooding
/// the broadcast channel, then trigger a full auto-assign pass.
///
/// Replaces the former `replay_current_pipeline_state()` approach, which
/// sent one synthetic `TransitionFired` per CRDT item through the broadcast
/// channel. With >256 items that caused `Subscriber lagged` warnings and
/// left subscribers with diverged state. Direct reconcile calls bypass the
/// channel entirely and scale to any CRDT size.
pub(crate) fn spawn_startup_reconciliation(
startup_root: Option<PathBuf>,
startup_agents: Arc<AgentPool>,
@@ -467,20 +521,189 @@ pub(crate) fn spawn_startup_reconciliation(
) {
if let Some(root) = startup_root {
tokio::spawn(async move {
// Purge content-store entries for stories that reached terminal
// stages in a previous session (before the GC subscriber was active).
crate::db::gc::sweep_zombie_content_on_startup();
crate::slog!(
"[startup] Replaying current pipeline state through TransitionFired channel."
);
crate::pipeline_state::replay_current_pipeline_state();
let done_retention = crate::config::ProjectConfig::load(&root)
.map(|c| std::time::Duration::from_secs(c.watcher.done_retention_secs))
.unwrap_or_else(|_| std::time::Duration::from_secs(4 * 3600));
crate::slog!("[startup] Running per-subscriber reconcile pass.");
run_reconcile_pass(&root, &startup_agents, done_retention).await;
crate::slog!("[auto-assign] Scanning pipeline stages for unassigned work.");
startup_agents.auto_assign_available_work(&root).await;
let _ = startup_reconciliation_tx.send(ReconciliationEvent {
story_id: String::new(),
status: "done".to_string(),
message: "Startup event replay complete.".to_string(),
message: "Startup reconcile pass complete.".to_string(),
});
});
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::db::{
ContentKey, ItemMeta, ensure_content_store, write_content, write_item_with_content,
};
use crate::io::watcher::WatcherEvent;
use tokio::sync::broadcast;
/// Build a shared `AgentPool` for tests.
///
/// The pool is constructed with `3099` as its first argument (presumably the
/// port) and the sender half of a fresh watcher broadcast channel of
/// capacity 16; the channel's initial receiver is discarded immediately.
fn make_pool() -> Arc<AgentPool> {
    let watcher_tx = broadcast::channel::<WatcherEvent>(16).0;
    let pool = AgentPool::new(3099, watcher_tx);
    Arc::new(pool)
}
/// Prepare a minimal project layout inside `tmp` and return its root path.
///
/// Creates the `.huskies` directory under the temp dir and writes an empty
/// `project.toml` into it. Panics if either filesystem operation fails.
fn setup_huskies_dir(tmp: &tempfile::TempDir) -> std::path::PathBuf {
    let root = tmp.path().to_path_buf();
    let huskies_dir = root.join(".huskies");
    std::fs::create_dir_all(&huskies_dir).unwrap();
    std::fs::write(huskies_dir.join("project.toml"), "").unwrap();
    root
}
/// AC4 + AC6: seeding >256 CRDT items and running the reconcile pass must not
/// produce any "Subscriber lagged" warnings (structural guarantee — the new
/// path never broadcasts through the channel) and must purge zombie content
/// for all terminal stories after one reconcile tick.
///
/// Distribution: 300 Backlog + 200 Coding + 200 Abandoned (terminal) + 300 QA
/// = 1000 items. Each of the 200 Abandoned stories gets a content-store entry
/// seeded before the reconcile so we can assert it is cleaned up.
#[tokio::test]
async fn reconcile_pass_scales_to_1000_items_without_lagged_divergence() {
crate::crdt_state::init_for_test();
ensure_content_store();
let tmp = tempfile::tempdir().unwrap();
let root = setup_huskies_dir(&tmp);
let pool = make_pool();
// ── Seed 1000 items across several stages ──────────────────────────
for i in 0..300u32 {
let id = format!("1066_backlog_{i:04}");
write_item_with_content(
&id,
"1_backlog",
"---\nname: Backlog\n---\n",
ItemMeta::named("Backlog"),
);
}
for i in 0..200u32 {
let id = format!("1066_coding_{i:04}");
write_item_with_content(
&id,
"2_current",
"---\nname: Coding\n---\n",
ItemMeta::named("Coding"),
);
}
for i in 0..200u32 {
let id = format!("1066_abandoned_{i:04}");
write_item_with_content(
&id,
"2_current",
"---\nname: Abandoned\n---\n",
ItemMeta::named("Abandoned"),
);
// Move to terminal stage (Abandoned).
crate::agents::lifecycle::abandon_story(&id).expect("abandon must succeed");
// Seed a content-store entry to verify GC cleans it up.
write_content(ContentKey::Story(&id), "zombie content");
}
for i in 0..300u32 {
let id = format!("1066_qa_{i:04}");
write_item_with_content(&id, "3_qa", "---\nname: QA\n---\n", ItemMeta::named("QA"));
}
// ── Subscribe BEFORE the reconcile to catch any Lagged events ──────
let mut transition_rx = crate::pipeline_state::subscribe_transitions();
// ── Run one reconcile pass ─────────────────────────────────────────
// Use zero retention so any Done items (none here, but defensive) archive immediately.
run_reconcile_pass(&root, &pool, std::time::Duration::ZERO).await;
// ── Drain the transition channel; must contain zero Lagged events ──
// The reconcile path never broadcasts through TRANSITION_TX, so any
// events here are from the abandon_story calls above (all pre-reconcile).
let mut lagged_count = 0u64;
loop {
match transition_rx.try_recv() {
Ok(_) => {}
Err(tokio::sync::broadcast::error::TryRecvError::Lagged(n)) => {
lagged_count += n;
}
Err(tokio::sync::broadcast::error::TryRecvError::Empty)
| Err(tokio::sync::broadcast::error::TryRecvError::Closed) => break,
}
}
// The reconcile pass itself must not have sent anything through the channel.
// (abandon_story above may have sent some events, but those are pre-reconcile
// lifecycle transitions, not the reconcile itself.)
assert_eq!(
lagged_count, 0,
"run_reconcile_pass must not broadcast through the transition channel (no Lagged)"
);
// ── Assert: zombie content purged for all 200 Abandoned stories ────
for i in 0..200u32 {
let id = format!("1066_abandoned_{i:04}");
assert!(
crate::db::read_content(ContentKey::Story(&id)).is_none(),
"zombie content must be purged for abandoned story {id}"
);
}
}
/// AC4 regression: the subscriber channel (capacity 256) must not lag when
/// 1000 items are seeded — the reconcile path bypasses the channel entirely.
#[tokio::test]
async fn reconcile_never_floods_broadcast_channel() {
crate::crdt_state::init_for_test();
ensure_content_store();
let tmp = tempfile::tempdir().unwrap();
let root = setup_huskies_dir(&tmp);
let pool = make_pool();
// Seed 1000 Backlog items (no lifecycle transitions — clean slate).
for i in 0..1000u32 {
let id = format!("1066_flood_{i:04}");
write_item_with_content(
&id,
"1_backlog",
"---\nname: Flood\n---\n",
ItemMeta::named("Flood"),
);
}
// Subscribe after seeding and drain any pre-existing channel noise from
// concurrent tests before checking that the reconcile pass adds nothing.
let mut rx = crate::pipeline_state::subscribe_transitions();
while let Ok(_) | Err(tokio::sync::broadcast::error::TryRecvError::Lagged(_)) =
rx.try_recv()
{}
run_reconcile_pass(&root, &pool, std::time::Duration::ZERO).await;
// The channel must have received exactly zero messages from run_reconcile_pass.
let mut msg_count = 0u64;
let mut lagged = false;
loop {
match rx.try_recv() {
Ok(_) => msg_count += 1,
Err(tokio::sync::broadcast::error::TryRecvError::Lagged(_)) => {
lagged = true;
break;
}
Err(_) => break,
}
}
assert!(
!lagged,
"run_reconcile_pass must never cause Lagged on the broadcast channel"
);
assert_eq!(
msg_count, 0,
"run_reconcile_pass must not send any TransitionFired events"
);
}
}