Spike 3: Sub-agent infrastructure fixes for multi-agent coordination

- Fix CLAUDECODE env var blocking nested Claude Code sessions
- Add drain-based event_log for reliable get_agent_output polling
- Add non-SSE get_agent_output fallback (critical for MCP tool calls)
- Preserve worktrees on agent stop instead of destroying work
- Reap zombie processes with child.wait() after kill
- Increase broadcast buffer from 256 to 1024
- Engineer supervisor and coder prompts in project.toml
- Point .mcp.json to test port 3002

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Dave
2026-02-20 11:57:25 +00:00
parent b089d314ba
commit db2d055f60
5 changed files with 161 additions and 46 deletions

View File

@@ -82,10 +82,11 @@ struct StoryAgent {
agent_name: String,
status: AgentStatus,
worktree_info: Option<WorktreeInfo>,
config: ProjectConfig,
session_id: Option<String>,
tx: broadcast::Sender<AgentEvent>,
task_handle: Option<tokio::task::JoinHandle<()>>,
/// Accumulated events for polling via get_agent_output.
event_log: Arc<Mutex<Vec<AgentEvent>>>,
}
/// Manages concurrent story agents, each in its own worktree.
@@ -140,7 +141,9 @@ impl AgentPool {
}
}
let (tx, _) = broadcast::channel::<AgentEvent>(256);
let (tx, _) = broadcast::channel::<AgentEvent>(1024);
let event_log: Arc<Mutex<Vec<AgentEvent>>> = Arc::new(Mutex::new(Vec::new()));
// Register as pending
{
@@ -151,10 +154,10 @@ impl AgentPool {
agent_name: resolved_name.clone(),
status: AgentStatus::Pending,
worktree_info: None,
config: config.clone(),
session_id: None,
tx: tx.clone(),
task_handle: None,
event_log: event_log.clone(),
},
);
}
@@ -187,6 +190,7 @@ impl AgentPool {
let agents_ref = self.agents.clone();
let cwd = wt_path_str.clone();
let key_clone = key.clone();
let log_clone = event_log.clone();
let handle = tokio::spawn(async move {
let _ = tx_clone.send(AgentEvent::Status {
@@ -195,8 +199,10 @@ impl AgentPool {
status: "running".to_string(),
});
match run_agent_pty_streaming(&sid, &aname, &command, &args, &prompt, &cwd, &tx_clone)
.await
match run_agent_pty_streaming(
&sid, &aname, &command, &args, &prompt, &cwd, &tx_clone, &log_clone,
)
.await
{
Ok(session_id) => {
if let Ok(mut agents) = agents_ref.lock()
@@ -244,27 +250,26 @@ impl AgentPool {
})
}
/// Stop a running agent and clean up its worktree.
/// Stop a running agent. Worktree is preserved for inspection.
pub async fn stop_agent(
&self,
project_root: &Path,
_project_root: &Path,
story_id: &str,
agent_name: &str,
) -> Result<(), String> {
let key = composite_key(story_id, agent_name);
let (worktree_info, config, task_handle, tx) = {
let (worktree_info, task_handle, tx) = {
let mut agents = self.agents.lock().map_err(|e| e.to_string())?;
let agent = agents
.get_mut(&key)
.ok_or_else(|| format!("No agent '{agent_name}' for story '{story_id}'"))?;
let wt = agent.worktree_info.clone();
let cfg = agent.config.clone();
let handle = agent.task_handle.take();
let tx = agent.tx.clone();
agent.status = AgentStatus::Failed;
(wt, cfg, handle, tx)
(wt, handle, tx)
};
// Abort the task
@@ -273,11 +278,12 @@ impl AgentPool {
let _ = handle.await;
}
// Remove worktree
if let Some(ref wt) = worktree_info
&& let Err(e) = worktree::remove_worktree(project_root, wt, &config).await
{
eprintln!("[agents] Worktree cleanup warning for {story_id}:{agent_name}: {e}");
// Preserve worktree for inspection — don't destroy agent's work on stop.
if let Some(ref wt) = worktree_info {
eprintln!(
"[agents] Worktree preserved for {story_id}:{agent_name}: {}",
wt.path.display()
);
}
let _ = tx.send(AgentEvent::Status {
@@ -334,6 +340,21 @@ impl AgentPool {
Ok(agent.tx.subscribe())
}
/// Drain accumulated events for polling. Returns all events since the last drain.
pub fn drain_events(
&self,
story_id: &str,
agent_name: &str,
) -> Result<Vec<AgentEvent>, String> {
let key = composite_key(story_id, agent_name);
let agents = self.agents.lock().map_err(|e| e.to_string())?;
let agent = agents
.get(&key)
.ok_or_else(|| format!("No agent '{agent_name}' for story '{story_id}'"))?;
let mut log = agent.event_log.lock().map_err(|e| e.to_string())?;
Ok(log.drain(..).collect())
}
/// Get project root helper.
pub fn get_project_root(
&self,
@@ -344,6 +365,7 @@ impl AgentPool {
}
/// Spawn claude agent in a PTY and stream events through the broadcast channel.
#[allow(clippy::too_many_arguments)]
async fn run_agent_pty_streaming(
story_id: &str,
agent_name: &str,
@@ -352,6 +374,7 @@ async fn run_agent_pty_streaming(
prompt: &str,
cwd: &str,
tx: &broadcast::Sender<AgentEvent>,
event_log: &Arc<Mutex<Vec<AgentEvent>>>,
) -> Result<Option<String>, String> {
let sid = story_id.to_string();
let aname = agent_name.to_string();
@@ -360,14 +383,28 @@ async fn run_agent_pty_streaming(
let prompt = prompt.to_string();
let cwd = cwd.to_string();
let tx = tx.clone();
let event_log = event_log.clone();
tokio::task::spawn_blocking(move || {
run_agent_pty_blocking(&sid, &aname, &cmd, &args, &prompt, &cwd, &tx)
run_agent_pty_blocking(&sid, &aname, &cmd, &args, &prompt, &cwd, &tx, &event_log)
})
.await
.map_err(|e| format!("Agent task panicked: {e}"))?
}
/// Helper to send an event to both broadcast and event log.
fn emit_event(
event: AgentEvent,
tx: &broadcast::Sender<AgentEvent>,
event_log: &Mutex<Vec<AgentEvent>>,
) {
if let Ok(mut log) = event_log.lock() {
log.push(event.clone());
}
let _ = tx.send(event);
}
#[allow(clippy::too_many_arguments)]
fn run_agent_pty_blocking(
story_id: &str,
agent_name: &str,
@@ -376,6 +413,7 @@ fn run_agent_pty_blocking(
prompt: &str,
cwd: &str,
tx: &broadcast::Sender<AgentEvent>,
event_log: &Mutex<Vec<AgentEvent>>,
) -> Result<Option<String>, String> {
let pty_system = native_pty_system();
@@ -410,6 +448,10 @@ fn run_agent_pty_blocking(
cmd.cwd(cwd);
cmd.env("NO_COLOR", "1");
// Allow spawning Claude Code from within a Claude Code session
cmd.env_remove("CLAUDECODE");
cmd.env_remove("CLAUDE_CODE_ENTRYPOINT");
eprintln!("[agent:{story_id}:{agent_name}] Spawning {command} in {cwd} with args: {args:?}");
let mut child = pair
@@ -445,11 +487,15 @@ fn run_agent_pty_blocking(
Ok(j) => j,
Err(_) => {
// Non-JSON output (terminal escapes etc.) — send as raw output
let _ = tx.send(AgentEvent::Output {
story_id: story_id.to_string(),
agent_name: agent_name.to_string(),
text: trimmed.to_string(),
});
emit_event(
AgentEvent::Output {
story_id: story_id.to_string(),
agent_name: agent_name.to_string(),
text: trimmed.to_string(),
},
tx,
event_log,
);
continue;
}
};
@@ -469,11 +515,15 @@ fn run_agent_pty_blocking(
{
for block in content {
if let Some(text) = block.get("text").and_then(|t| t.as_str()) {
let _ = tx.send(AgentEvent::Output {
story_id: story_id.to_string(),
agent_name: agent_name.to_string(),
text: text.to_string(),
});
emit_event(
AgentEvent::Output {
story_id: story_id.to_string(),
agent_name: agent_name.to_string(),
text: text.to_string(),
},
tx,
event_log,
);
}
}
}
@@ -482,14 +532,19 @@ fn run_agent_pty_blocking(
}
// Forward all JSON events
let _ = tx.send(AgentEvent::AgentJson {
story_id: story_id.to_string(),
agent_name: agent_name.to_string(),
data: json,
});
emit_event(
AgentEvent::AgentJson {
story_id: story_id.to_string(),
agent_name: agent_name.to_string(),
data: json,
},
tx,
event_log,
);
}
let _ = child.kill();
let _ = child.wait();
eprintln!(
"[agent:{story_id}:{agent_name}] Done. Session: {:?}",