story-kit: merge 118_bug_agent_pool_retains_stale_running_state_after_completion_blocking_auto_assign
This commit is contained in:
@@ -28,6 +28,53 @@ fn composite_key(story_id: &str, agent_name: &str) -> String {
|
|||||||
format!("{story_id}:{agent_name}")
|
format!("{story_id}:{agent_name}")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// RAII guard that removes a pending agent entry from the pool on drop.
|
||||||
|
///
|
||||||
|
/// Created after inserting a `Pending` entry into the agent HashMap.
|
||||||
|
/// If `start_agent` succeeds (the agent process is spawned and status
|
||||||
|
/// transitions to `Running`), call [`disarm`](Self::disarm) to prevent
|
||||||
|
/// cleanup. If any intermediate step fails and the guard is dropped
|
||||||
|
/// without being disarmed, the pending entry is removed so it cannot
|
||||||
|
/// block future auto-assign dispatches.
|
||||||
|
struct PendingGuard {
|
||||||
|
agents: Arc<Mutex<HashMap<String, StoryAgent>>>,
|
||||||
|
key: String,
|
||||||
|
armed: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PendingGuard {
|
||||||
|
fn new(agents: Arc<Mutex<HashMap<String, StoryAgent>>>, key: String) -> Self {
|
||||||
|
Self {
|
||||||
|
agents,
|
||||||
|
key,
|
||||||
|
armed: true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Prevent the guard from cleaning up the entry (call after
|
||||||
|
/// successful spawn).
|
||||||
|
fn disarm(&mut self) {
|
||||||
|
self.armed = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for PendingGuard {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
if self.armed
|
||||||
|
&& let Ok(mut agents) = self.agents.lock()
|
||||||
|
&& agents
|
||||||
|
.get(&self.key)
|
||||||
|
.is_some_and(|a| a.status == AgentStatus::Pending)
|
||||||
|
{
|
||||||
|
agents.remove(&self.key);
|
||||||
|
slog!(
|
||||||
|
"[agents] Cleaned up leaked Pending entry for '{}'",
|
||||||
|
self.key
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Events streamed from a running agent to SSE clients.
|
/// Events streamed from a running agent to SSE clients.
|
||||||
#[derive(Debug, Clone, Serialize)]
|
#[derive(Debug, Clone, Serialize)]
|
||||||
#[serde(tag = "type", rename_all = "snake_case")]
|
#[serde(tag = "type", rename_all = "snake_case")]
|
||||||
@@ -269,7 +316,9 @@ impl AgentPool {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// Register as pending
|
// Register as pending. The `PendingGuard` ensures that if any
|
||||||
|
// step below fails the entry is removed from the pool so it does
|
||||||
|
// not permanently block auto-assign (bug 118).
|
||||||
{
|
{
|
||||||
let mut agents = self.agents.lock().map_err(|e| e.to_string())?;
|
let mut agents = self.agents.lock().map_err(|e| e.to_string())?;
|
||||||
agents.insert(
|
agents.insert(
|
||||||
@@ -288,6 +337,7 @@ impl AgentPool {
|
|||||||
},
|
},
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
let mut pending_guard = PendingGuard::new(self.agents.clone(), key.clone());
|
||||||
|
|
||||||
let _ = tx.send(AgentEvent::Status {
|
let _ = tx.send(AgentEvent::Status {
|
||||||
story_id: story_id.to_string(),
|
story_id: story_id.to_string(),
|
||||||
@@ -378,6 +428,9 @@ impl AgentPool {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Agent successfully spawned — prevent the guard from removing the entry.
|
||||||
|
pending_guard.disarm();
|
||||||
|
|
||||||
Ok(AgentInfo {
|
Ok(AgentInfo {
|
||||||
story_id: story_id.to_string(),
|
story_id: story_id.to_string(),
|
||||||
agent_name: resolved_name,
|
agent_name: resolved_name,
|
||||||
@@ -3835,4 +3888,95 @@ name = "qa"
|
|||||||
assert_eq!(entries[0].event["type"], "status");
|
assert_eq!(entries[0].event["type"], "status");
|
||||||
assert_eq!(entries[0].event["status"], "running");
|
assert_eq!(entries[0].event["status"], "running");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── bug 118: pending entry cleanup on start_agent failure ────────────────
|
||||||
|
|
||||||
|
/// Regression test for bug 118: when `start_agent` fails (e.g. because
|
||||||
|
/// `create_worktree` cannot find a git repo), the Pending entry that was
|
||||||
|
/// inserted into the agent HashMap must be cleaned up so it does not
|
||||||
|
/// permanently block `find_free_agent_for_stage` / auto-assign.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn start_agent_cleans_up_pending_entry_on_failure() {
|
||||||
|
use std::fs;
|
||||||
|
|
||||||
|
let tmp = tempfile::tempdir().unwrap();
|
||||||
|
let root = tmp.path();
|
||||||
|
|
||||||
|
// Minimal project.toml with a "qa" agent.
|
||||||
|
let sk_dir = root.join(".story_kit");
|
||||||
|
fs::create_dir_all(&sk_dir).unwrap();
|
||||||
|
fs::write(
|
||||||
|
sk_dir.join("project.toml"),
|
||||||
|
"[[agent]]\nname = \"qa\"\n",
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
// Create the story in upcoming so `move_story_to_current` succeeds,
|
||||||
|
// but do NOT init a git repo — `create_worktree` will fail.
|
||||||
|
let upcoming = root.join(".story_kit/work/1_upcoming");
|
||||||
|
fs::create_dir_all(&upcoming).unwrap();
|
||||||
|
fs::write(
|
||||||
|
upcoming.join("50_story_test.md"),
|
||||||
|
"---\nname: Test\n---\n",
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let pool = AgentPool::new(3099);
|
||||||
|
|
||||||
|
let result = pool
|
||||||
|
.start_agent(root, "50_story_test", Some("qa"), None)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
// The call must fail (no git repo for worktree creation).
|
||||||
|
assert!(result.is_err(), "start_agent should fail without a git repo");
|
||||||
|
|
||||||
|
// The pool must NOT retain a Pending entry for this agent.
|
||||||
|
let agents = pool.agents.lock().unwrap();
|
||||||
|
let leaked = agents.values().any(|a| {
|
||||||
|
a.agent_name == "qa"
|
||||||
|
&& matches!(a.status, AgentStatus::Pending | AgentStatus::Running)
|
||||||
|
});
|
||||||
|
assert!(
|
||||||
|
!leaked,
|
||||||
|
"agent pool must not retain a Pending/Running entry after start_agent fails"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Verify that a successful start_agent keeps the Running entry (guard is
|
||||||
|
/// disarmed). We cannot truly spawn an agent in tests, but we verify that
|
||||||
|
/// the concurrency check still blocks a second concurrent start — which
|
||||||
|
/// proves the first entry survived the guard.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn start_agent_guard_does_not_remove_running_entry() {
|
||||||
|
use std::fs;
|
||||||
|
|
||||||
|
let tmp = tempfile::tempdir().unwrap();
|
||||||
|
let root = tmp.path();
|
||||||
|
|
||||||
|
let sk_dir = root.join(".story_kit");
|
||||||
|
fs::create_dir_all(&sk_dir).unwrap();
|
||||||
|
fs::write(
|
||||||
|
sk_dir.join("project.toml"),
|
||||||
|
"[[agent]]\nname = \"qa\"\n",
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let pool = AgentPool::new(3099);
|
||||||
|
|
||||||
|
// Manually inject a Running agent (simulates successful start).
|
||||||
|
pool.inject_test_agent("story-x", "qa", AgentStatus::Running);
|
||||||
|
|
||||||
|
// Attempting to start the same agent on a different story must be
|
||||||
|
// rejected — the Running entry must still be there.
|
||||||
|
let result = pool
|
||||||
|
.start_agent(root, "story-y", Some("qa"), None)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
assert!(result.is_err());
|
||||||
|
let err = result.unwrap_err();
|
||||||
|
assert!(
|
||||||
|
err.contains("already running") || err.contains("becomes available"),
|
||||||
|
"running entry must survive: got '{err}'"
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user