Split the monolithic agents.rs into 6 focused modules: mod.rs (shared types such as AgentEvent and AgentStatus, plus re-exports); pool.rs (the AgentPool struct, all its methods, and helper free functions); pty.rs (PTY streaming: run_agent_pty_blocking, emit_event); lifecycle.rs (story movement functions such as move_story_to_qa); gates.rs (acceptance gates: clippy, tests, coverage); merge.rs (squash-merge, conflict resolution, quality gates). All 121 original tests are preserved and distributed across the modules. Also adds clear_front_matter_field to story_metadata.rs, which strips a stale merge_failure field from the front matter when a story moves to done. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
491 lines
16 KiB
Rust
491 lines
16 KiB
Rust
use std::collections::HashMap;
|
|
use std::io::{BufRead, BufReader};
|
|
use std::sync::{Arc, Mutex};
|
|
|
|
use portable_pty::{ChildKiller, CommandBuilder, PtySize, native_pty_system};
|
|
use tokio::sync::broadcast;
|
|
|
|
use super::AgentEvent;
|
|
use crate::agent_log::AgentLogWriter;
|
|
use crate::slog;
|
|
use crate::slog_warn;
|
|
|
|
/// Registry key for a running agent's child killer: `"<story_id>:<agent_name>"`.
fn composite_key(story_id: &str, agent_name: &str) -> String {
    [story_id, agent_name].join(":")
}
|
|
|
|
struct ChildKillerGuard {
|
|
killers: Arc<Mutex<HashMap<String, Box<dyn ChildKiller + Send + Sync>>>>,
|
|
key: String,
|
|
}
|
|
|
|
impl Drop for ChildKillerGuard {
|
|
fn drop(&mut self) {
|
|
if let Ok(mut killers) = self.killers.lock() {
|
|
killers.remove(&self.key);
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Spawn claude agent in a PTY and stream events through the broadcast channel.
|
|
#[allow(clippy::too_many_arguments)]
|
|
pub(super) async fn run_agent_pty_streaming(
|
|
story_id: &str,
|
|
agent_name: &str,
|
|
command: &str,
|
|
args: &[String],
|
|
prompt: &str,
|
|
cwd: &str,
|
|
tx: &broadcast::Sender<AgentEvent>,
|
|
event_log: &Arc<Mutex<Vec<AgentEvent>>>,
|
|
log_writer: Option<Arc<Mutex<AgentLogWriter>>>,
|
|
inactivity_timeout_secs: u64,
|
|
child_killers: Arc<Mutex<HashMap<String, Box<dyn ChildKiller + Send + Sync>>>>,
|
|
) -> Result<Option<String>, String> {
|
|
let sid = story_id.to_string();
|
|
let aname = agent_name.to_string();
|
|
let cmd = command.to_string();
|
|
let args = args.to_vec();
|
|
let prompt = prompt.to_string();
|
|
let cwd = cwd.to_string();
|
|
let tx = tx.clone();
|
|
let event_log = event_log.clone();
|
|
|
|
tokio::task::spawn_blocking(move || {
|
|
run_agent_pty_blocking(
|
|
&sid,
|
|
&aname,
|
|
&cmd,
|
|
&args,
|
|
&prompt,
|
|
&cwd,
|
|
&tx,
|
|
&event_log,
|
|
log_writer.as_deref(),
|
|
inactivity_timeout_secs,
|
|
&child_killers,
|
|
)
|
|
})
|
|
.await
|
|
.map_err(|e| format!("Agent task panicked: {e}"))?
|
|
}
|
|
|
|
/// Dispatch a `stream_event` from Claude Code's `--include-partial-messages` output.
|
|
///
|
|
/// Extracts `thinking_delta` and `text_delta` from `content_block_delta` events
|
|
/// and routes them as `AgentEvent::Thinking` and `AgentEvent::Output` respectively.
|
|
/// This ensures thinking traces flow through the dedicated `ThinkingBlock` UI
|
|
/// component rather than appearing as unbounded regular output.
|
|
fn handle_agent_stream_event(
|
|
event: &serde_json::Value,
|
|
story_id: &str,
|
|
agent_name: &str,
|
|
tx: &broadcast::Sender<AgentEvent>,
|
|
event_log: &Mutex<Vec<AgentEvent>>,
|
|
log_writer: Option<&Mutex<AgentLogWriter>>,
|
|
) {
|
|
let event_type = event.get("type").and_then(|t| t.as_str()).unwrap_or("");
|
|
|
|
if event_type == "content_block_delta"
|
|
&& let Some(delta) = event.get("delta")
|
|
{
|
|
let delta_type = delta.get("type").and_then(|t| t.as_str()).unwrap_or("");
|
|
match delta_type {
|
|
"thinking_delta" => {
|
|
if let Some(thinking) = delta.get("thinking").and_then(|t| t.as_str()) {
|
|
emit_event(
|
|
AgentEvent::Thinking {
|
|
story_id: story_id.to_string(),
|
|
agent_name: agent_name.to_string(),
|
|
text: thinking.to_string(),
|
|
},
|
|
tx,
|
|
event_log,
|
|
log_writer,
|
|
);
|
|
}
|
|
}
|
|
"text_delta" => {
|
|
if let Some(text) = delta.get("text").and_then(|t| t.as_str()) {
|
|
emit_event(
|
|
AgentEvent::Output {
|
|
story_id: story_id.to_string(),
|
|
agent_name: agent_name.to_string(),
|
|
text: text.to_string(),
|
|
},
|
|
tx,
|
|
event_log,
|
|
log_writer,
|
|
);
|
|
}
|
|
}
|
|
_ => {}
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Helper to send an event to broadcast, event log, and optional persistent log file.
|
|
pub(super) fn emit_event(
|
|
event: AgentEvent,
|
|
tx: &broadcast::Sender<AgentEvent>,
|
|
event_log: &Mutex<Vec<AgentEvent>>,
|
|
log_writer: Option<&Mutex<AgentLogWriter>>,
|
|
) {
|
|
if let Ok(mut log) = event_log.lock() {
|
|
log.push(event.clone());
|
|
}
|
|
if let Some(writer) = log_writer
|
|
&& let Ok(mut w) = writer.lock()
|
|
&& let Err(e) = w.write_event(&event)
|
|
{
|
|
eprintln!("[agent_log] Failed to write event to log file: {e}");
|
|
}
|
|
let _ = tx.send(event);
|
|
}
|
|
|
|
#[allow(clippy::too_many_arguments)]
|
|
fn run_agent_pty_blocking(
|
|
story_id: &str,
|
|
agent_name: &str,
|
|
command: &str,
|
|
args: &[String],
|
|
prompt: &str,
|
|
cwd: &str,
|
|
tx: &broadcast::Sender<AgentEvent>,
|
|
event_log: &Mutex<Vec<AgentEvent>>,
|
|
log_writer: Option<&Mutex<AgentLogWriter>>,
|
|
inactivity_timeout_secs: u64,
|
|
child_killers: &Arc<Mutex<HashMap<String, Box<dyn ChildKiller + Send + Sync>>>>,
|
|
) -> Result<Option<String>, String> {
|
|
let pty_system = native_pty_system();
|
|
|
|
let pair = pty_system
|
|
.openpty(PtySize {
|
|
rows: 50,
|
|
cols: 200,
|
|
pixel_width: 0,
|
|
pixel_height: 0,
|
|
})
|
|
.map_err(|e| format!("Failed to open PTY: {e}"))?;
|
|
|
|
let mut cmd = CommandBuilder::new(command);
|
|
|
|
// -p <prompt> must come first
|
|
cmd.arg("-p");
|
|
cmd.arg(prompt);
|
|
|
|
// Add configured args (e.g., --directory /path/to/worktree, --model, etc.)
|
|
for arg in args {
|
|
cmd.arg(arg);
|
|
}
|
|
|
|
cmd.arg("--output-format");
|
|
cmd.arg("stream-json");
|
|
cmd.arg("--verbose");
|
|
// Enable partial streaming so we receive thinking_delta and text_delta
|
|
// events in real-time, rather than only complete assistant events.
|
|
// Without this, thinking traces may not appear in the structured output
|
|
// and instead leak as unstructured PTY text.
|
|
cmd.arg("--include-partial-messages");
|
|
|
|
// Supervised agents don't need interactive permission prompts
|
|
cmd.arg("--permission-mode");
|
|
cmd.arg("bypassPermissions");
|
|
|
|
cmd.cwd(cwd);
|
|
cmd.env("NO_COLOR", "1");
|
|
|
|
// Allow spawning Claude Code from within a Claude Code session
|
|
cmd.env_remove("CLAUDECODE");
|
|
cmd.env_remove("CLAUDE_CODE_ENTRYPOINT");
|
|
|
|
slog!("[agent:{story_id}:{agent_name}] Spawning {command} in {cwd} with args: {args:?}");
|
|
|
|
let mut child = pair
|
|
.slave
|
|
.spawn_command(cmd)
|
|
.map_err(|e| format!("Failed to spawn agent for {story_id}:{agent_name}: {e}"))?;
|
|
|
|
// Register the child killer so that kill_all_children() / stop_agent() can
|
|
// terminate this process on server shutdown, even if the blocking thread
|
|
// cannot be interrupted. The ChildKillerGuard deregisters on function exit.
|
|
let killer_key = composite_key(story_id, agent_name);
|
|
{
|
|
let killer = child.clone_killer();
|
|
if let Ok(mut killers) = child_killers.lock() {
|
|
killers.insert(killer_key.clone(), killer);
|
|
}
|
|
}
|
|
let _killer_guard = ChildKillerGuard {
|
|
killers: Arc::clone(child_killers),
|
|
key: killer_key,
|
|
};
|
|
|
|
drop(pair.slave);
|
|
|
|
let reader = pair
|
|
.master
|
|
.try_clone_reader()
|
|
.map_err(|e| format!("Failed to clone PTY reader: {e}"))?;
|
|
|
|
drop(pair.master);
|
|
|
|
// Spawn a reader thread to collect PTY output lines.
|
|
// We use a channel so the main thread can apply an inactivity deadline
|
|
// via recv_timeout: if no output arrives within the configured window
|
|
// the process is killed and the agent is marked Failed.
|
|
let (line_tx, line_rx) = std::sync::mpsc::channel::<std::io::Result<String>>();
|
|
std::thread::spawn(move || {
|
|
let buf_reader = BufReader::new(reader);
|
|
for line in buf_reader.lines() {
|
|
if line_tx.send(line).is_err() {
|
|
break;
|
|
}
|
|
}
|
|
});
|
|
|
|
let timeout_dur = if inactivity_timeout_secs > 0 {
|
|
Some(std::time::Duration::from_secs(inactivity_timeout_secs))
|
|
} else {
|
|
None
|
|
};
|
|
|
|
let mut session_id: Option<String> = None;
|
|
|
|
loop {
|
|
let recv_result = match timeout_dur {
|
|
Some(dur) => line_rx.recv_timeout(dur),
|
|
None => line_rx
|
|
.recv()
|
|
.map_err(|_| std::sync::mpsc::RecvTimeoutError::Disconnected),
|
|
};
|
|
|
|
let line = match recv_result {
|
|
Ok(Ok(l)) => l,
|
|
Ok(Err(_)) => {
|
|
// IO error reading from PTY — treat as EOF.
|
|
break;
|
|
}
|
|
Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
|
|
// Reader thread exited (EOF from PTY).
|
|
break;
|
|
}
|
|
Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
|
|
slog_warn!(
|
|
"[agent:{story_id}:{agent_name}] Inactivity timeout after \
|
|
{inactivity_timeout_secs}s with no output. Killing process."
|
|
);
|
|
let _ = child.kill();
|
|
let _ = child.wait();
|
|
return Err(format!(
|
|
"Agent inactivity timeout: no output received for {inactivity_timeout_secs}s"
|
|
));
|
|
}
|
|
};
|
|
|
|
let trimmed = line.trim();
|
|
if trimmed.is_empty() {
|
|
continue;
|
|
}
|
|
|
|
// Try to parse as JSON
|
|
let json: serde_json::Value = match serde_json::from_str(trimmed) {
|
|
Ok(j) => j,
|
|
Err(_) => {
|
|
// Non-JSON output (terminal escapes etc.) — send as raw output
|
|
emit_event(
|
|
AgentEvent::Output {
|
|
story_id: story_id.to_string(),
|
|
agent_name: agent_name.to_string(),
|
|
text: trimmed.to_string(),
|
|
},
|
|
tx,
|
|
event_log,
|
|
log_writer,
|
|
);
|
|
continue;
|
|
}
|
|
};
|
|
|
|
let event_type = json.get("type").and_then(|t| t.as_str()).unwrap_or("");
|
|
|
|
match event_type {
|
|
"system" => {
|
|
session_id = json
|
|
.get("session_id")
|
|
.and_then(|s| s.as_str())
|
|
.map(|s| s.to_string());
|
|
}
|
|
// With --include-partial-messages, thinking and text arrive
|
|
// incrementally via stream_event → content_block_delta. Handle
|
|
// them here for real-time streaming to the frontend.
|
|
"stream_event" => {
|
|
if let Some(event) = json.get("event") {
|
|
handle_agent_stream_event(
|
|
event,
|
|
story_id,
|
|
agent_name,
|
|
tx,
|
|
event_log,
|
|
log_writer,
|
|
);
|
|
}
|
|
}
|
|
// Complete assistant events are skipped for content extraction
|
|
// because thinking and text already arrived via stream_event.
|
|
// The raw JSON is still forwarded as AgentJson below.
|
|
"assistant" | "user" | "result" => {}
|
|
_ => {}
|
|
}
|
|
|
|
// Forward all JSON events
|
|
emit_event(
|
|
AgentEvent::AgentJson {
|
|
story_id: story_id.to_string(),
|
|
agent_name: agent_name.to_string(),
|
|
data: json,
|
|
},
|
|
tx,
|
|
event_log,
|
|
log_writer,
|
|
);
|
|
}
|
|
|
|
let _ = child.kill();
|
|
let _ = child.wait();
|
|
|
|
slog!(
|
|
"[agent:{story_id}:{agent_name}] Done. Session: {:?}",
|
|
session_id
|
|
);
|
|
|
|
Ok(session_id)
|
|
}
|
|
|
|
#[cfg(test)]
mod tests {
    use super::*;
    use crate::agents::AgentEvent;

    /// Common fixture: a broadcast sender/receiver pair and an empty
    /// in-memory event log.
    fn fresh_sinks() -> (
        broadcast::Sender<AgentEvent>,
        broadcast::Receiver<AgentEvent>,
        Mutex<Vec<AgentEvent>>,
    ) {
        let (sender, receiver) = broadcast::channel::<AgentEvent>(64);
        (sender, receiver, Mutex::new(Vec::new()))
    }

    #[test]
    fn test_emit_event_writes_to_log_writer() {
        let dir = tempfile::tempdir().unwrap();
        let base = dir.path();

        let file_log = Mutex::new(
            AgentLogWriter::new(base, "42_story_foo", "coder-1", "sess-emit").unwrap(),
        );
        let (sender, _receiver, history) = fresh_sinks();

        let status = AgentEvent::Status {
            story_id: "42_story_foo".to_string(),
            agent_name: "coder-1".to_string(),
            status: "running".to_string(),
        };
        emit_event(status, &sender, &history, Some(&file_log));

        // The in-memory history holds exactly the one event.
        assert_eq!(history.lock().unwrap().len(), 1);

        // The persistent log file holds the same single event.
        let path =
            crate::agent_log::log_file_path(base, "42_story_foo", "coder-1", "sess-emit");
        let logged = crate::agent_log::read_log(&path).unwrap();
        assert_eq!(logged.len(), 1);
        assert_eq!(logged[0].event["type"], "status");
        assert_eq!(logged[0].event["status"], "running");
    }

    // ── bug 167: handle_agent_stream_event routes thinking/text correctly ───

    #[test]
    fn stream_event_thinking_delta_emits_thinking_event() {
        let (sender, mut receiver, history) = fresh_sinks();
        let payload = serde_json::json!({
            "type": "content_block_delta",
            "delta": {"type": "thinking_delta", "thinking": "Let me analyze this..."}
        });

        handle_agent_stream_event(&payload, "s1", "coder-1", &sender, &history, None);

        match receiver.try_recv().unwrap() {
            AgentEvent::Thinking {
                story_id,
                agent_name,
                text,
            } => {
                assert_eq!(story_id, "s1");
                assert_eq!(agent_name, "coder-1");
                assert_eq!(text, "Let me analyze this...");
            }
            other => panic!("Expected Thinking event, got: {other:?}"),
        }
    }

    #[test]
    fn stream_event_text_delta_emits_output_event() {
        let (sender, mut receiver, history) = fresh_sinks();
        let payload = serde_json::json!({
            "type": "content_block_delta",
            "delta": {"type": "text_delta", "text": "Here is the result."}
        });

        handle_agent_stream_event(&payload, "s1", "coder-1", &sender, &history, None);

        match receiver.try_recv().unwrap() {
            AgentEvent::Output {
                story_id,
                agent_name,
                text,
            } => {
                assert_eq!(story_id, "s1");
                assert_eq!(agent_name, "coder-1");
                assert_eq!(text, "Here is the result.");
            }
            other => panic!("Expected Output event, got: {other:?}"),
        }
    }

    #[test]
    fn stream_event_input_json_delta_ignored() {
        let (sender, mut receiver, history) = fresh_sinks();
        let payload = serde_json::json!({
            "type": "content_block_delta",
            "delta": {"type": "input_json_delta", "partial_json": "{\"file\":"}
        });

        handle_agent_stream_event(&payload, "s1", "coder-1", &sender, &history, None);

        // Tool-argument deltas must not produce any event.
        assert!(receiver.try_recv().is_err());
    }

    #[test]
    fn stream_event_non_delta_type_ignored() {
        let (sender, mut receiver, history) = fresh_sinks();
        let payload = serde_json::json!({
            "type": "message_start",
            "message": {"role": "assistant"}
        });

        handle_agent_stream_event(&payload, "s1", "coder-1", &sender, &history, None);

        // Only content_block_delta events are routed; everything else is dropped.
        assert!(receiver.try_recv().is_err());
    }
}
|