story-kit: merge 157_story_make_start_agent_non_blocking_by_deferring_worktree_creation
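Make start_agent non-blocking by deferring worktree creation. start_agent now registers the agent as Pending and returns immediately, while worktree creation and the agent launch run asynchronously in a background task, improving responsiveness of the start_agent MCP call. If worktree creation fails, the background task marks the agent as Failed and emits an error event.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>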
This commit is contained in:
@@ -376,52 +376,118 @@ impl AgentPool {
|
|||||||
// Move story from upcoming/ to current/ and auto-commit before creating the worktree.
|
// Move story from upcoming/ to current/ and auto-commit before creating the worktree.
|
||||||
move_story_to_current(project_root, story_id)?;
|
move_story_to_current(project_root, story_id)?;
|
||||||
|
|
||||||
// Create worktree
|
// Extract inactivity timeout from the agent config before cloning config.
|
||||||
let wt_info = worktree::create_worktree(project_root, story_id, &config, self.port).await?;
|
|
||||||
|
|
||||||
// Update with worktree info
|
|
||||||
{
|
|
||||||
let mut agents = self.agents.lock().map_err(|e| e.to_string())?;
|
|
||||||
if let Some(agent) = agents.get_mut(&key) {
|
|
||||||
agent.worktree_info = Some(wt_info.clone());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Spawn the agent process
|
|
||||||
let wt_path_str = wt_info.path.to_string_lossy().to_string();
|
|
||||||
let (command, args, mut prompt) =
|
|
||||||
config.render_agent_args(&wt_path_str, story_id, Some(&resolved_name), Some(&wt_info.base_branch))?;
|
|
||||||
|
|
||||||
// Append resume context if this is a restart with failure information.
|
|
||||||
if let Some(ctx) = resume_context {
|
|
||||||
prompt.push_str(ctx);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Extract inactivity timeout from the agent config before moving config.
|
|
||||||
let inactivity_timeout_secs = config
|
let inactivity_timeout_secs = config
|
||||||
.find_agent(&resolved_name)
|
.find_agent(&resolved_name)
|
||||||
.map(|a| a.inactivity_timeout_secs)
|
.map(|a| a.inactivity_timeout_secs)
|
||||||
.unwrap_or(300);
|
.unwrap_or(300);
|
||||||
|
|
||||||
|
// Clone all values needed inside the background spawn.
|
||||||
|
let project_root_clone = project_root.to_path_buf();
|
||||||
|
let config_clone = config.clone();
|
||||||
|
let resume_context_owned = resume_context.map(str::to_string);
|
||||||
let sid = story_id.to_string();
|
let sid = story_id.to_string();
|
||||||
let aname = resolved_name.clone();
|
let aname = resolved_name.clone();
|
||||||
let tx_clone = tx.clone();
|
let tx_clone = tx.clone();
|
||||||
let agents_ref = self.agents.clone();
|
let agents_ref = self.agents.clone();
|
||||||
let cwd = wt_path_str.clone();
|
|
||||||
let key_clone = key.clone();
|
let key_clone = key.clone();
|
||||||
let log_clone = event_log.clone();
|
let log_clone = event_log.clone();
|
||||||
let port_for_task = self.port;
|
let port_for_task = self.port;
|
||||||
let log_writer_clone = log_writer.clone();
|
let log_writer_clone = log_writer.clone();
|
||||||
|
|
||||||
|
// Spawn the background task. Worktree creation and agent launch happen here
|
||||||
|
// so `start_agent` returns immediately after registering the agent as
|
||||||
|
// Pending — non-blocking by design (story 157).
|
||||||
let handle = tokio::spawn(async move {
|
let handle = tokio::spawn(async move {
|
||||||
|
// Step 1: create the worktree (slow — git checkout, pnpm install, etc.)
|
||||||
|
let wt_info = match worktree::create_worktree(
|
||||||
|
&project_root_clone,
|
||||||
|
&sid,
|
||||||
|
&config_clone,
|
||||||
|
port_for_task,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(wt) => wt,
|
||||||
|
Err(e) => {
|
||||||
|
// Worktree creation failed — mark agent as Failed so the UI shows the error.
|
||||||
|
if let Ok(mut agents) = agents_ref.lock()
|
||||||
|
&& let Some(agent) = agents.get_mut(&key_clone)
|
||||||
|
{
|
||||||
|
agent.status = AgentStatus::Failed;
|
||||||
|
agent.completed_at = Some(Instant::now());
|
||||||
|
}
|
||||||
|
let _ = tx_clone.send(AgentEvent::Error {
|
||||||
|
story_id: sid.clone(),
|
||||||
|
agent_name: aname.clone(),
|
||||||
|
message: format!("Failed to create worktree: {e}"),
|
||||||
|
});
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Step 2: store worktree info and render agent command/args/prompt.
|
||||||
|
let wt_path_str = wt_info.path.to_string_lossy().to_string();
|
||||||
|
{
|
||||||
|
if let Ok(mut agents) = agents_ref.lock()
|
||||||
|
&& let Some(agent) = agents.get_mut(&key_clone)
|
||||||
|
{
|
||||||
|
agent.worktree_info = Some(wt_info.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let (command, args, mut prompt) = match config_clone.render_agent_args(
|
||||||
|
&wt_path_str,
|
||||||
|
&sid,
|
||||||
|
Some(&aname),
|
||||||
|
Some(&wt_info.base_branch),
|
||||||
|
) {
|
||||||
|
Ok(result) => result,
|
||||||
|
Err(e) => {
|
||||||
|
if let Ok(mut agents) = agents_ref.lock()
|
||||||
|
&& let Some(agent) = agents.get_mut(&key_clone)
|
||||||
|
{
|
||||||
|
agent.status = AgentStatus::Failed;
|
||||||
|
agent.completed_at = Some(Instant::now());
|
||||||
|
}
|
||||||
|
let _ = tx_clone.send(AgentEvent::Error {
|
||||||
|
story_id: sid.clone(),
|
||||||
|
agent_name: aname.clone(),
|
||||||
|
message: format!("Failed to render agent args: {e}"),
|
||||||
|
});
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Append resume context if this is a restart with failure information.
|
||||||
|
if let Some(ctx) = resume_context_owned {
|
||||||
|
prompt.push_str(&ctx);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Step 3: transition to Running now that the worktree is ready.
|
||||||
|
{
|
||||||
|
if let Ok(mut agents) = agents_ref.lock()
|
||||||
|
&& let Some(agent) = agents.get_mut(&key_clone)
|
||||||
|
{
|
||||||
|
agent.status = AgentStatus::Running;
|
||||||
|
}
|
||||||
|
}
|
||||||
let _ = tx_clone.send(AgentEvent::Status {
|
let _ = tx_clone.send(AgentEvent::Status {
|
||||||
story_id: sid.clone(),
|
story_id: sid.clone(),
|
||||||
agent_name: aname.clone(),
|
agent_name: aname.clone(),
|
||||||
status: "running".to_string(),
|
status: "running".to_string(),
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Step 4: launch the agent process.
|
||||||
match run_agent_pty_streaming(
|
match run_agent_pty_streaming(
|
||||||
&sid, &aname, &command, &args, &prompt, &cwd, &tx_clone, &log_clone,
|
&sid,
|
||||||
|
&aname,
|
||||||
|
&command,
|
||||||
|
&args,
|
||||||
|
&prompt,
|
||||||
|
&wt_path_str,
|
||||||
|
&tx_clone,
|
||||||
|
&log_clone,
|
||||||
log_writer_clone,
|
log_writer_clone,
|
||||||
inactivity_timeout_secs,
|
inactivity_timeout_secs,
|
||||||
)
|
)
|
||||||
@@ -455,11 +521,10 @@ impl AgentPool {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
// Update status to running with task handle
|
// Store the task handle while the agent is still Pending.
|
||||||
{
|
{
|
||||||
let mut agents = self.agents.lock().map_err(|e| e.to_string())?;
|
let mut agents = self.agents.lock().map_err(|e| e.to_string())?;
|
||||||
if let Some(agent) = agents.get_mut(&key) {
|
if let Some(agent) = agents.get_mut(&key) {
|
||||||
agent.status = AgentStatus::Running;
|
|
||||||
agent.task_handle = Some(handle);
|
agent.task_handle = Some(handle);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -470,10 +535,10 @@ impl AgentPool {
|
|||||||
Ok(AgentInfo {
|
Ok(AgentInfo {
|
||||||
story_id: story_id.to_string(),
|
story_id: story_id.to_string(),
|
||||||
agent_name: resolved_name,
|
agent_name: resolved_name,
|
||||||
status: AgentStatus::Running,
|
status: AgentStatus::Pending,
|
||||||
session_id: None,
|
session_id: None,
|
||||||
worktree_path: Some(wt_path_str),
|
worktree_path: None,
|
||||||
base_branch: Some(wt_info.base_branch.clone()),
|
base_branch: None,
|
||||||
completion: None,
|
completion: None,
|
||||||
log_session_id: Some(log_session_id),
|
log_session_id: Some(log_session_id),
|
||||||
})
|
})
|
||||||
@@ -1766,11 +1831,12 @@ fn check_orphaned_agents(agents: &Mutex<HashMap<String, StoryAgent>>) {
|
|||||||
Err(_) => return,
|
Err(_) => return,
|
||||||
};
|
};
|
||||||
|
|
||||||
// Collect orphaned entries: Running agents whose task handle is finished.
|
// Collect orphaned entries: Running or Pending agents whose task handle is finished.
|
||||||
let orphaned: Vec<(String, String, broadcast::Sender<AgentEvent>)> = lock
|
// Pending agents can be orphaned if worktree creation panics before setting status.
|
||||||
|
let orphaned: Vec<(String, String, broadcast::Sender<AgentEvent>, AgentStatus)> = lock
|
||||||
.iter()
|
.iter()
|
||||||
.filter_map(|(key, agent)| {
|
.filter_map(|(key, agent)| {
|
||||||
if agent.status == AgentStatus::Running
|
if matches!(agent.status, AgentStatus::Running | AgentStatus::Pending)
|
||||||
&& let Some(handle) = &agent.task_handle
|
&& let Some(handle) = &agent.task_handle
|
||||||
&& handle.is_finished()
|
&& handle.is_finished()
|
||||||
{
|
{
|
||||||
@@ -1778,17 +1844,17 @@ fn check_orphaned_agents(agents: &Mutex<HashMap<String, StoryAgent>>) {
|
|||||||
.rsplit_once(':')
|
.rsplit_once(':')
|
||||||
.map(|(s, _)| s.to_string())
|
.map(|(s, _)| s.to_string())
|
||||||
.unwrap_or_else(|| key.clone());
|
.unwrap_or_else(|| key.clone());
|
||||||
return Some((key.clone(), story_id, agent.tx.clone()));
|
return Some((key.clone(), story_id, agent.tx.clone(), agent.status.clone()));
|
||||||
}
|
}
|
||||||
None
|
None
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
for (key, story_id, tx) in orphaned {
|
for (key, story_id, tx, prev_status) in orphaned {
|
||||||
if let Some(agent) = lock.get_mut(&key) {
|
if let Some(agent) = lock.get_mut(&key) {
|
||||||
agent.status = AgentStatus::Failed;
|
agent.status = AgentStatus::Failed;
|
||||||
slog!(
|
slog!(
|
||||||
"[watchdog] Orphaned agent '{key}': task finished but status was Running. \
|
"[watchdog] Orphaned agent '{key}': task finished but status was {prev_status}. \
|
||||||
Marking Failed."
|
Marking Failed."
|
||||||
);
|
);
|
||||||
let _ = tx.send(AgentEvent::Error {
|
let _ = tx.send(AgentEvent::Error {
|
||||||
@@ -4804,10 +4870,15 @@ stage = "qa"
|
|||||||
|
|
||||||
// ── bug 118: pending entry cleanup on start_agent failure ────────────────
|
// ── bug 118: pending entry cleanup on start_agent failure ────────────────
|
||||||
|
|
||||||
/// Regression test for bug 118: when `start_agent` fails (e.g. because
|
/// Regression test for bug 118: when worktree creation fails (e.g. because
|
||||||
/// `create_worktree` cannot find a git repo), the Pending entry that was
|
/// there is no git repo), the Pending entry that was inserted into the agent
|
||||||
/// inserted into the agent HashMap must be cleaned up so it does not
|
/// HashMap must not remain Pending — it must transition to Failed. This
|
||||||
/// permanently block `find_free_agent_for_stage` / auto-assign.
|
/// prevents `find_free_agent_for_stage` / auto-assign from being permanently
|
||||||
|
/// blocked.
|
||||||
|
///
|
||||||
|
/// With story 157 the worktree creation moved into the background spawn, so
|
||||||
|
/// `start_agent` returns `Ok(Pending)` immediately. We use `wait_for_agent`
|
||||||
|
/// to block until the background task resolves.
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn start_agent_cleans_up_pending_entry_on_failure() {
|
async fn start_agent_cleans_up_pending_entry_on_failure() {
|
||||||
use std::fs;
|
use std::fs;
|
||||||
@@ -4825,7 +4896,7 @@ stage = "qa"
|
|||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
// Create the story in upcoming so `move_story_to_current` succeeds,
|
// Create the story in upcoming so `move_story_to_current` succeeds,
|
||||||
// but do NOT init a git repo — `create_worktree` will fail.
|
// but do NOT init a git repo — `create_worktree` will fail in the spawn.
|
||||||
let upcoming = root.join(".story_kit/work/1_upcoming");
|
let upcoming = root.join(".story_kit/work/1_upcoming");
|
||||||
fs::create_dir_all(&upcoming).unwrap();
|
fs::create_dir_all(&upcoming).unwrap();
|
||||||
fs::write(
|
fs::write(
|
||||||
@@ -4840,10 +4911,32 @@ stage = "qa"
|
|||||||
.start_agent(root, "50_story_test", Some("qa"), None)
|
.start_agent(root, "50_story_test", Some("qa"), None)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
// The call must fail (no git repo for worktree creation).
|
// With the non-blocking flow, start_agent returns Ok(Pending) immediately.
|
||||||
assert!(result.is_err(), "start_agent should fail without a git repo");
|
// Worktree creation failure happens asynchronously in the background.
|
||||||
|
assert!(
|
||||||
|
result.is_ok(),
|
||||||
|
"start_agent should return Ok(Pending) immediately: {:?}",
|
||||||
|
result.err()
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
result.unwrap().status,
|
||||||
|
AgentStatus::Pending,
|
||||||
|
"initial status must be Pending"
|
||||||
|
);
|
||||||
|
|
||||||
// The pool must NOT retain a Pending entry for this agent.
|
// Wait for the background task to reach a terminal state.
|
||||||
|
// It must fail (no git repo → create_worktree returns an error).
|
||||||
|
let final_info = pool
|
||||||
|
.wait_for_agent("50_story_test", "qa", 5000)
|
||||||
|
.await
|
||||||
|
.expect("wait_for_agent should not time out");
|
||||||
|
assert_eq!(
|
||||||
|
final_info.status,
|
||||||
|
AgentStatus::Failed,
|
||||||
|
"agent must transition to Failed after worktree creation error"
|
||||||
|
);
|
||||||
|
|
||||||
|
// The pool must NOT retain a Pending or Running entry for this agent.
|
||||||
let agents = pool.agents.lock().unwrap();
|
let agents = pool.agents.lock().unwrap();
|
||||||
let leaked = agents.values().any(|a| {
|
let leaked = agents.values().any(|a| {
|
||||||
a.agent_name == "qa"
|
a.agent_name == "qa"
|
||||||
@@ -4851,7 +4944,7 @@ stage = "qa"
|
|||||||
});
|
});
|
||||||
assert!(
|
assert!(
|
||||||
!leaked,
|
!leaked,
|
||||||
"agent pool must not retain a Pending/Running entry after start_agent fails"
|
"agent pool must not retain a Pending/Running entry after worktree creation fails"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user