story-kit: merge 117_story_show_startup_reconciliation_progress_in_ui

This commit is contained in:
Dave
2026-02-23 22:50:57 +00:00
parent e3d9813707
commit 85fddcb71a
7 changed files with 341 additions and 8 deletions

View File

@@ -11,6 +11,18 @@ use std::process::Command;
use std::sync::{Arc, Mutex};
use tokio::sync::broadcast;
/// Events emitted during server startup reconciliation to broadcast real-time
/// progress to connected WebSocket clients.
///
/// Instances are sent over a `tokio::sync::broadcast` channel; subscribers
/// forward each event to connected clients. The "done" event is emitted once,
/// after all worktrees have been processed, with an empty `story_id`.
#[derive(Debug, Clone, Serialize)]
pub struct ReconciliationEvent {
    /// The story being reconciled, or empty string for the overall "done" event.
    pub story_id: String,
    /// Coarse status: "checking", "gates_running", "advanced", "skipped", "failed", "done"
    pub status: String,
    /// Human-readable details.
    pub message: String,
}
/// Build the composite key used to track agents in the pool.
///
/// The key is the story id and the agent name joined by a single colon,
/// e.g. `"story-42:coder"`.
fn composite_key(story_id: &str, agent_name: &str) -> String {
    // Preallocate exactly: both parts plus the ':' separator.
    let mut key = String::with_capacity(story_id.len() + agent_name.len() + 1);
    key.push_str(story_id);
    key.push(':');
    key.push_str(agent_name);
    key
}
@@ -1198,11 +1210,20 @@ impl AgentPool {
/// start a fresh agent to retry.
/// 4. Stories in `4_merge/` are left for `auto_assign_available_work` to handle via a
/// fresh mergemaster (squash-merge must be re-executed by the mergemaster agent).
pub async fn reconcile_on_startup(&self, project_root: &Path) {
pub async fn reconcile_on_startup(
&self,
project_root: &Path,
progress_tx: &broadcast::Sender<ReconciliationEvent>,
) {
let worktrees = match worktree::list_worktrees(project_root) {
Ok(wt) => wt,
Err(e) => {
eprintln!("[startup:reconcile] Failed to list worktrees: {e}");
let _ = progress_tx.send(ReconciliationEvent {
story_id: String::new(),
status: "done".to_string(),
message: format!("Reconciliation failed: {e}"),
});
return;
}
};
@@ -1222,6 +1243,12 @@ impl AgentPool {
continue;
}
let _ = progress_tx.send(ReconciliationEvent {
story_id: story_id.clone(),
status: "checking".to_string(),
message: format!("Checking for committed work in {stage_dir}/"),
});
// Check whether the worktree has commits ahead of the base branch.
let wt_path_for_check = wt_path.clone();
let has_work = tokio::task::spawn_blocking(move || {
@@ -1234,12 +1261,22 @@ impl AgentPool {
eprintln!(
"[startup:reconcile] No committed work for '{story_id}' in {stage_dir}/; skipping."
);
let _ = progress_tx.send(ReconciliationEvent {
story_id: story_id.clone(),
status: "skipped".to_string(),
message: "No committed work found; skipping.".to_string(),
});
continue;
}
eprintln!(
"[startup:reconcile] Found committed work for '{story_id}' in {stage_dir}/. Running acceptance gates."
);
let _ = progress_tx.send(ReconciliationEvent {
story_id: story_id.clone(),
status: "gates_running".to_string(),
message: "Running acceptance gates…".to_string(),
});
// Run acceptance gates on the worktree.
let wt_path_for_gates = wt_path.clone();
@@ -1253,12 +1290,22 @@ impl AgentPool {
Ok(Ok(pair)) => pair,
Ok(Err(e)) => {
eprintln!("[startup:reconcile] Gate check error for '{story_id}': {e}");
let _ = progress_tx.send(ReconciliationEvent {
story_id: story_id.clone(),
status: "failed".to_string(),
message: format!("Gate error: {e}"),
});
continue;
}
Err(e) => {
eprintln!(
"[startup:reconcile] Gate check task panicked for '{story_id}': {e}"
);
let _ = progress_tx.send(ReconciliationEvent {
story_id: story_id.clone(),
status: "failed".to_string(),
message: format!("Gate task panicked: {e}"),
});
continue;
}
};
@@ -1268,6 +1315,11 @@ impl AgentPool {
"[startup:reconcile] Gates failed for '{story_id}': {gate_output}\n\
Leaving in {stage_dir}/ for auto-assign to restart the agent."
);
let _ = progress_tx.send(ReconciliationEvent {
story_id: story_id.clone(),
status: "failed".to_string(),
message: "Gates failed; will be retried by auto-assign.".to_string(),
});
continue;
}
@@ -1279,8 +1331,18 @@ impl AgentPool {
// Coder stage → advance to QA.
if let Err(e) = move_story_to_qa(project_root, story_id) {
eprintln!("[startup:reconcile] Failed to move '{story_id}' to 3_qa/: {e}");
let _ = progress_tx.send(ReconciliationEvent {
story_id: story_id.clone(),
status: "failed".to_string(),
message: format!("Failed to advance to QA: {e}"),
});
} else {
eprintln!("[startup:reconcile] Moved '{story_id}' → 3_qa/.");
let _ = progress_tx.send(ReconciliationEvent {
story_id: story_id.clone(),
status: "advanced".to_string(),
message: "Gates passed — moved to QA.".to_string(),
});
}
} else if stage_dir == "3_qa" {
// QA stage → run coverage gate before advancing to merge.
@@ -1295,12 +1357,22 @@ impl AgentPool {
eprintln!(
"[startup:reconcile] Coverage gate error for '{story_id}': {e}"
);
let _ = progress_tx.send(ReconciliationEvent {
story_id: story_id.clone(),
status: "failed".to_string(),
message: format!("Coverage gate error: {e}"),
});
continue;
}
Err(e) => {
eprintln!(
"[startup:reconcile] Coverage gate panicked for '{story_id}': {e}"
);
let _ = progress_tx.send(ReconciliationEvent {
story_id: story_id.clone(),
status: "failed".to_string(),
message: format!("Coverage gate panicked: {e}"),
});
continue;
}
};
@@ -1310,17 +1382,39 @@ impl AgentPool {
eprintln!(
"[startup:reconcile] Failed to move '{story_id}' to 4_merge/: {e}"
);
let _ = progress_tx.send(ReconciliationEvent {
story_id: story_id.clone(),
status: "failed".to_string(),
message: format!("Failed to advance to merge: {e}"),
});
} else {
eprintln!("[startup:reconcile] Moved '{story_id}' → 4_merge/.");
let _ = progress_tx.send(ReconciliationEvent {
story_id: story_id.clone(),
status: "advanced".to_string(),
message: "Gates passed — moved to merge.".to_string(),
});
}
} else {
eprintln!(
"[startup:reconcile] Coverage gate failed for '{story_id}': {coverage_output}\n\
Leaving in 3_qa/ for auto-assign to restart the QA agent."
);
let _ = progress_tx.send(ReconciliationEvent {
story_id: story_id.clone(),
status: "failed".to_string(),
message: "Coverage gate failed; will be retried.".to_string(),
});
}
}
}
// Signal that reconciliation is complete.
let _ = progress_tx.send(ReconciliationEvent {
story_id: String::new(),
status: "done".to_string(),
message: "Startup reconciliation complete.".to_string(),
});
}
/// Test helper: inject an agent with a completion report and project_root
@@ -3566,8 +3660,28 @@ name = "qa"
async fn reconcile_on_startup_noop_when_no_worktrees() {
let tmp = tempfile::tempdir().unwrap();
let pool = AgentPool::new(3001);
let (tx, _rx) = broadcast::channel(16);
// Should not panic; no worktrees to reconcile.
pool.reconcile_on_startup(tmp.path()).await;
pool.reconcile_on_startup(tmp.path(), &tx).await;
}
#[tokio::test]
async fn reconcile_on_startup_emits_done_event() {
    // An empty temp dir yields no worktrees, so reconciliation does no real
    // work — but it must still emit the terminal "done" event so UI clients
    // can dismiss their progress indicator.
    let tmp = tempfile::tempdir().unwrap();
    let pool = AgentPool::new(3001);
    let (tx, mut rx) = broadcast::channel::<ReconciliationEvent>(16);
    pool.reconcile_on_startup(tmp.path(), &tx).await;
    // Drain all buffered events. "done" is documented as the *final* event,
    // so assert it actually terminates the stream — `any(...)` would pass
    // even if events were emitted after "done", masking an ordering bug.
    let mut events: Vec<ReconciliationEvent> = Vec::new();
    while let Ok(evt) = rx.try_recv() {
        events.push(evt);
    }
    assert_eq!(
        events.last().map(|e| e.status.as_str()),
        Some("done"),
        "reconcile_on_startup must emit a terminal 'done' event; got: {:?}",
        events.iter().map(|e| &e.status).collect::<Vec<_>>()
    );
}
#[tokio::test]
@@ -3588,7 +3702,8 @@ name = "qa"
init_git_repo(&wt_dir);
let pool = AgentPool::new(3001);
pool.reconcile_on_startup(root).await;
let (tx, _rx) = broadcast::channel(16);
pool.reconcile_on_startup(root, &tx).await;
// Story should still be in 2_current/ — nothing was reconciled.
assert!(
@@ -3671,7 +3786,8 @@ name = "qa"
);
let pool = AgentPool::new(3001);
pool.reconcile_on_startup(root).await;
let (tx, _rx) = broadcast::channel(16);
pool.reconcile_on_startup(root, &tx).await;
// In the test env, cargo clippy will fail (no Cargo.toml) so gates fail
// and the story stays in 2_current/. The important assertion is that

View File

@@ -1,4 +1,4 @@
use crate::agents::AgentPool;
use crate::agents::{AgentPool, ReconciliationEvent};
use crate::io::watcher::WatcherEvent;
use crate::state::SessionState;
use crate::store::JsonFileStore;
@@ -26,6 +26,10 @@ pub struct AppContext {
/// Broadcast channel for filesystem watcher events. WebSocket handlers
/// subscribe to this to push lifecycle notifications to connected clients.
pub watcher_tx: broadcast::Sender<WatcherEvent>,
/// Broadcast channel for startup reconciliation progress events.
/// WebSocket handlers subscribe to this to push real-time reconciliation
/// updates to connected clients.
pub reconciliation_tx: broadcast::Sender<ReconciliationEvent>,
/// Sender for permission requests originating from the MCP
/// `prompt_permission` tool. The MCP handler sends a [`PermissionForward`]
/// and awaits the oneshot response.
@@ -42,6 +46,7 @@ impl AppContext {
*state.project_root.lock().unwrap() = Some(project_root.clone());
let store_path = project_root.join(".story_kit_store.json");
let (watcher_tx, _) = broadcast::channel(64);
let (reconciliation_tx, _) = broadcast::channel(64);
let (perm_tx, perm_rx) = mpsc::unbounded_channel();
Self {
state: Arc::new(state),
@@ -49,6 +54,7 @@ impl AppContext {
workflow: Arc::new(std::sync::Mutex::new(WorkflowState::default())),
agents: Arc::new(AgentPool::new(3001)),
watcher_tx,
reconciliation_tx,
perm_tx,
perm_rx: Arc::new(tokio::sync::Mutex::new(perm_rx)),
}

View File

@@ -79,6 +79,14 @@ enum WsResponse {
ToolActivity {
tool_name: String,
},
/// Real-time progress from the server startup reconciliation pass.
/// `status` is one of: "checking", "gates_running", "advanced", "skipped",
/// "failed", "done". `story_id` is empty for the overall "done" event.
ReconciliationProgress {
story_id: String,
status: String,
message: String,
},
}
impl From<WatcherEvent> for WsResponse {
@@ -155,6 +163,30 @@ pub async fn ws_handler(ws: WebSocket, ctx: Data<&Arc<AppContext>>) -> impl poem
}
});
// Subscribe to startup reconciliation events and forward them to the client.
let tx_reconcile = tx.clone();
let mut reconcile_rx = ctx.reconciliation_tx.subscribe();
tokio::spawn(async move {
loop {
match reconcile_rx.recv().await {
Ok(evt) => {
if tx_reconcile
.send(WsResponse::ReconciliationProgress {
story_id: evt.story_id,
status: evt.status,
message: evt.message,
})
.is_err()
{
break;
}
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
}
}
});
// Map of pending permission request_id → oneshot responder.
// Permission requests arrive from the MCP `prompt_permission` tool via
// `ctx.perm_rx` and are forwarded to the client as `PermissionRequest`.

View File

@@ -66,12 +66,18 @@ async fn main() -> Result<(), std::io::Error> {
}
}
// Reconciliation progress channel: startup reconciliation → WebSocket clients.
let (reconciliation_tx, _) =
broadcast::channel::<agents::ReconciliationEvent>(64);
// Permission channel: MCP prompt_permission → WebSocket handler.
let (perm_tx, perm_rx) = tokio::sync::mpsc::unbounded_channel();
// Capture project root and agents Arc before ctx is consumed by build_routes.
// Capture project root, agents Arc, and reconciliation sender before ctx
// is consumed by build_routes.
let startup_root: Option<PathBuf> = app_state.project_root.lock().unwrap().clone();
let startup_agents = Arc::clone(&agents);
let startup_reconciliation_tx = reconciliation_tx.clone();
let ctx = AppContext {
state: app_state,
@@ -79,6 +85,7 @@ async fn main() -> Result<(), std::io::Error> {
workflow,
agents,
watcher_tx,
reconciliation_tx,
perm_tx,
perm_rx: Arc::new(tokio::sync::Mutex::new(perm_rx)),
};
@@ -94,7 +101,9 @@ async fn main() -> Result<(), std::io::Error> {
slog!(
"[startup] Reconciling completed worktrees from previous session."
);
startup_agents.reconcile_on_startup(&root).await;
startup_agents
.reconcile_on_startup(&root, &startup_reconciliation_tx)
.await;
slog!(
"[auto-assign] Scanning pipeline stages for unassigned work."
);