Files
huskies/server/src/agents/pty/mod.rs
T

377 lines
14 KiB
Rust
Raw Normal View History

2026-04-28 16:34:16 +00:00
//! PTY runner — spawns agent processes in pseudo-terminals and streams their output.
// Private submodules. `events` supplies `emit_event` (re-exported below) and
// `handle_agent_stream_event` (imported directly by the tests in this file).
mod events;
// `runner` supplies `run_agent_pty_streaming` (re-exported below).
mod runner;
// NOTE(review): nothing in the visible part of this file names an item from
// `types` — presumably it holds definitions shared by the sibling modules; confirm.
mod types;
// Re-exports, visible only inside `crate::agents`.
pub(in crate::agents) use events::emit_event;
pub(in crate::agents) use runner::run_agent_pty_streaming;
#[cfg(test)]
mod tests {
use super::events::handle_agent_stream_event;
use super::*;
use crate::agents::AgentEvent;
use crate::io::watcher::WatcherEvent;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tokio::sync::broadcast;
// ── AC1: pty detects rate_limit_event and emits RateLimitWarning ─────────
/// Soft rate-limit path: a `rate_limit_event` JSON line printed on the PTY must
/// surface as a `WatcherEvent::RateLimitWarning` carrying the story id and
/// agent name that were handed to `run_agent_pty_streaming`.
///
/// The child command is meant to be `sh -p -- <script>`; the `--` terminates
/// option parsing so the script path is treated as the operand.
#[tokio::test]
async fn rate_limit_event_json_sends_watcher_warning() {
    use std::fs;
    use std::os::unix::fs::PermissionsExt;

    // Throwaway executable script that prints a single rate-limit JSON line.
    let scratch = tempfile::tempdir().unwrap();
    let script_path = scratch.path().join("emit_rate_limit.sh");
    let script_body = "#!/bin/sh\nprintf '%s\\n' '{\"type\":\"rate_limit_event\",\"rate_limit_info\":{\"status\":\"allowed_warning\"}}'\n";
    fs::write(&script_path, script_body).unwrap();
    fs::set_permissions(&script_path, fs::Permissions::from_mode(0o755)).unwrap();

    // The agent-event receiver is bound (not discarded) so it stays alive for
    // the duration of the run.
    let (agent_tx, _agent_rx) = broadcast::channel::<AgentEvent>(64);
    let (watcher_tx, mut watcher_rx) = broadcast::channel::<WatcherEvent>(16);
    let event_log = Arc::new(Mutex::new(Vec::new()));
    let child_killers = Arc::new(Mutex::new(HashMap::new()));

    // sh -p "--" <script>: -p = privileged mode, "--" = end of options, and the
    // script path is the file operand.
    let script_args = [script_path.to_string_lossy().into_owned()];
    let outcome = run_agent_pty_streaming(
        "365_story_test",
        "coder-1",
        "sh",
        &script_args,
        "--",
        "/tmp",
        &agent_tx,
        &event_log,
        None,
        0,
        child_killers,
        watcher_tx,
        None,
    )
    .await;
    assert!(outcome.is_ok(), "PTY run should succeed: {:?}", outcome.err());

    let received = watcher_rx
        .try_recv()
        .expect("Expected a RateLimitWarning to be sent on watcher_tx");
    let (story_id, agent_name) = match received {
        WatcherEvent::RateLimitWarning {
            story_id,
            agent_name,
        } => (story_id, agent_name),
        other => panic!("Expected RateLimitWarning, got: {other:?}"),
    };
    assert_eq!(story_id, "365_story_test");
    assert_eq!(agent_name, "coder-1");
}
/// AC1: a hard block that includes `reset_at` must be reported as
/// `WatcherEvent::RateLimitHardBlock` with the matching story id, agent name,
/// and the timestamp parsed from the event.
#[tokio::test]
async fn rate_limit_hard_block_sends_watcher_hard_block_event() {
    use std::fs;
    use std::os::unix::fs::PermissionsExt;

    // Script prints one hard-block event whose reset time is fixed in 2099.
    let scratch = tempfile::tempdir().unwrap();
    let script_path = scratch.path().join("emit_hard_block.sh");
    let script_body = "#!/bin/sh\nprintf '%s\\n' '{\"type\":\"rate_limit_event\",\"rate_limit_info\":{\"status\":\"hard_block\",\"reset_at\":\"2099-01-01T12:00:00Z\"}}'\n";
    fs::write(&script_path, script_body).unwrap();
    fs::set_permissions(&script_path, fs::Permissions::from_mode(0o755)).unwrap();

    let (agent_tx, _agent_rx) = broadcast::channel::<AgentEvent>(64);
    let (watcher_tx, mut watcher_rx) = broadcast::channel::<WatcherEvent>(16);
    let event_log = Arc::new(Mutex::new(Vec::new()));
    let child_killers = Arc::new(Mutex::new(HashMap::new()));

    let script_args = [script_path.to_string_lossy().into_owned()];
    let outcome = run_agent_pty_streaming(
        "423_story_rate_limit",
        "coder-1",
        "sh",
        &script_args,
        "--",
        "/tmp",
        &agent_tx,
        &event_log,
        None,
        0,
        child_killers,
        watcher_tx,
        None,
    )
    .await;
    assert!(outcome.is_ok(), "PTY run should succeed: {:?}", outcome.err());

    let received = watcher_rx
        .try_recv()
        .expect("Expected a RateLimitHardBlock to be sent on watcher_tx");
    let (story_id, agent_name, reset_at) = match received {
        WatcherEvent::RateLimitHardBlock {
            story_id,
            agent_name,
            reset_at,
        } => (story_id, agent_name, reset_at),
        other => panic!("Expected RateLimitHardBlock, got: {other:?}"),
    };
    assert_eq!(story_id, "423_story_rate_limit");
    assert_eq!(agent_name, "coder-1");
    assert_eq!(
        reset_at.to_rfc3339(),
        "2099-01-01T12:00:00+00:00",
        "reset_at should match the parsed timestamp"
    );
}
/// Bug 496: a hard block that carries NO `reset_at` must still be reported as
/// `RateLimitHardBlock` (never `RateLimitWarning`). A default backoff of about
/// five minutes is expected so the auto-scheduler can arm a retry timer.
#[tokio::test]
async fn rate_limit_hard_block_without_reset_at_sends_hard_block_event() {
    use std::fs;
    use std::os::unix::fs::PermissionsExt;

    // Script prints one `rejected` event with no reset timestamp.
    let scratch = tempfile::tempdir().unwrap();
    let script_path = scratch.path().join("emit_hard_block_no_reset.sh");
    let script_body = "#!/bin/sh\nprintf '%s\\n' '{\"type\":\"rate_limit_event\",\"rate_limit_info\":{\"status\":\"rejected\"}}'\n";
    fs::write(&script_path, script_body).unwrap();
    fs::set_permissions(&script_path, fs::Permissions::from_mode(0o755)).unwrap();

    let (agent_tx, _agent_rx) = broadcast::channel::<AgentEvent>(64);
    let (watcher_tx, mut watcher_rx) = broadcast::channel::<WatcherEvent>(16);
    let event_log = Arc::new(Mutex::new(Vec::new()));
    let child_killers = Arc::new(Mutex::new(HashMap::new()));

    // Bracket the run with wall-clock samples so the default reset time can be
    // checked against a window instead of an exact instant.
    let script_args = [script_path.to_string_lossy().into_owned()];
    let started = chrono::Utc::now();
    let outcome = run_agent_pty_streaming(
        "496_bug_hard_rate_limit",
        "coder-1",
        "sh",
        &script_args,
        "--",
        "/tmp",
        &agent_tx,
        &event_log,
        None,
        0,
        child_killers,
        watcher_tx,
        None,
    )
    .await;
    let finished = chrono::Utc::now();
    assert!(outcome.is_ok(), "PTY run should succeed: {:?}", outcome.err());

    let received = watcher_rx
        .try_recv()
        .expect("Expected a RateLimitHardBlock to be sent on watcher_tx");
    let (story_id, agent_name, reset_at) = match received {
        WatcherEvent::RateLimitHardBlock {
            story_id,
            agent_name,
            reset_at,
        } => (story_id, agent_name, reset_at),
        other => panic!("Expected RateLimitHardBlock (with default backoff), got: {other:?}"),
    };
    assert_eq!(story_id, "496_bug_hard_rate_limit");
    assert_eq!(agent_name, "coder-1");

    // reset_at should land ~5 minutes after the event fired: accept anything
    // from 4 minutes past the start sample to 6 minutes past the end sample.
    let earliest = started + chrono::Duration::minutes(4);
    let latest = finished + chrono::Duration::minutes(6);
    let within_window = reset_at >= earliest && reset_at <= latest;
    assert!(
        within_window,
        "reset_at {reset_at} should be ~5 minutes from now"
    );
}
/// Story 916: a rate-limit hard block has to push the inactivity deadline out
/// by the backoff duration, otherwise the watchdog would kill an agent that is
/// legitimately waiting for the limit to clear.
///
/// The script prints a hard-block event whose `reset_at` is far in the future,
/// sleeps for 3s, and exits. The run uses `inactivity_timeout_secs = 1`, so
/// without the extension it would fail around the 1s mark; with it, the
/// deadline moves past the sleep and the script finishes cleanly. Using a
/// far-future `reset_at` avoids wall-clock races under cargo-test load.
#[tokio::test]
async fn rate_limit_hard_block_extends_inactivity_deadline() {
    use std::fs;
    use std::os::unix::fs::PermissionsExt;

    let scratch = tempfile::tempdir().unwrap();
    let script_path = scratch.path().join("emit_then_wait.sh");
    fs::write(
        &script_path,
        "#!/bin/sh\nprintf '%s\\n' '{\"type\":\"rate_limit_event\",\"rate_limit_info\":{\"status\":\"hard_block\",\"reset_at\":\"2099-01-01T12:00:00Z\"}}'\nsleep 3\n",
    )
    .unwrap();
    fs::set_permissions(&script_path, fs::Permissions::from_mode(0o755)).unwrap();

    // Both receivers are bound so they stay alive for the whole run, even
    // though this test never reads from them.
    let (agent_tx, _agent_rx) = broadcast::channel::<AgentEvent>(64);
    let (watcher_tx, _watcher_rx) = broadcast::channel::<WatcherEvent>(16);
    let event_log = Arc::new(Mutex::new(Vec::new()));
    let child_killers = Arc::new(Mutex::new(HashMap::new()));

    let script_args = [script_path.to_string_lossy().into_owned()];
    let outcome = run_agent_pty_streaming(
        "916_story_rate_limit_extension",
        "mergemaster",
        "sh",
        &script_args,
        "--",
        "/tmp",
        &agent_tx,
        &event_log,
        None,
        // inactivity_timeout_secs = 1s: shorter than the script's 3s sleep, so
        // the run only survives if the deadline was extended.
        1,
        child_killers,
        watcher_tx,
        None,
    )
    .await;

    assert!(
        outcome.is_ok(),
        "PTY run should not be killed by inactivity timeout during rate-limit block: {:?}",
        outcome.err()
    );
}
2026-04-28 16:34:16 +00:00
/// `emit_event` called with a log writer must record the event twice: once in
/// the in-memory event log and once in the agent log file on disk.
#[test]
fn test_emit_event_writes_to_log_writer() {
    let scratch = tempfile::tempdir().unwrap();
    let project_root = scratch.path();

    let writer = crate::agent_log::AgentLogWriter::new(
        project_root,
        "42_story_foo",
        "coder-1",
        "sess-emit",
    )
    .unwrap();
    let log_mutex = Mutex::new(writer);

    let (tx, _rx) = broadcast::channel::<AgentEvent>(64);
    let event_log = Mutex::new(Vec::<AgentEvent>::new());

    emit_event(
        AgentEvent::Status {
            story_id: String::from("42_story_foo"),
            agent_name: String::from("coder-1"),
            status: String::from("running"),
        },
        &tx,
        &event_log,
        Some(&log_mutex),
    );

    // In-memory log: exactly one event. The guard is scoped so the lock is
    // released before the file is read back.
    {
        let recorded = event_log.lock().unwrap();
        assert_eq!(recorded.len(), 1);
    }

    // On-disk log: exactly one entry, carrying the status payload.
    let log_path =
        crate::agent_log::log_file_path(project_root, "42_story_foo", "coder-1", "sess-emit");
    let entries = crate::agent_log::read_log(&log_path).unwrap();
    assert_eq!(entries.len(), 1);
    let first = &entries[0];
    assert_eq!(first.event["type"], "status");
    assert_eq!(first.event["status"], "running");
}
// ── bug 167: handle_agent_stream_event routes thinking/text correctly ───
/// A `content_block_delta` whose delta is a `thinking_delta` must be broadcast
/// as `AgentEvent::Thinking` with the story id, agent name, and thinking text.
#[test]
fn stream_event_thinking_delta_emits_thinking_event() {
    let (tx, mut rx) = broadcast::channel::<AgentEvent>(64);
    let event_log = Mutex::new(Vec::<AgentEvent>::new());
    let payload = serde_json::json!({
        "type": "content_block_delta",
        "delta": {
            "type": "thinking_delta",
            "thinking": "Let me analyze this..."
        }
    });

    handle_agent_stream_event(&payload, "s1", "coder-1", &tx, &event_log, None);

    let (story_id, agent_name, text) = match rx.try_recv().unwrap() {
        AgentEvent::Thinking {
            story_id,
            agent_name,
            text,
        } => (story_id, agent_name, text),
        other => panic!("Expected Thinking event, got: {other:?}"),
    };
    assert_eq!(story_id, "s1");
    assert_eq!(agent_name, "coder-1");
    assert_eq!(text, "Let me analyze this...");
}
/// A `content_block_delta` whose delta is a `text_delta` must be broadcast as
/// `AgentEvent::Output` with the story id, agent name, and delta text.
#[test]
fn stream_event_text_delta_emits_output_event() {
    let (tx, mut rx) = broadcast::channel::<AgentEvent>(64);
    let event_log = Mutex::new(Vec::<AgentEvent>::new());
    let payload = serde_json::json!({
        "type": "content_block_delta",
        "delta": {
            "type": "text_delta",
            "text": "Here is the result."
        }
    });

    handle_agent_stream_event(&payload, "s1", "coder-1", &tx, &event_log, None);

    let (story_id, agent_name, text) = match rx.try_recv().unwrap() {
        AgentEvent::Output {
            story_id,
            agent_name,
            text,
        } => (story_id, agent_name, text),
        other => panic!("Expected Output event, got: {other:?}"),
    };
    assert_eq!(story_id, "s1");
    assert_eq!(agent_name, "coder-1");
    assert_eq!(text, "Here is the result.");
}
/// An `input_json_delta` (a partial tool-argument fragment) must not result in
/// anything being sent on the agent event channel.
#[test]
fn stream_event_input_json_delta_ignored() {
    let (tx, mut rx) = broadcast::channel::<AgentEvent>(64);
    let event_log = Mutex::new(Vec::<AgentEvent>::new());
    let payload = serde_json::json!({
        "type": "content_block_delta",
        "delta": {
            "type": "input_json_delta",
            "partial_json": "{\"file\":"
        }
    });

    handle_agent_stream_event(&payload, "s1", "coder-1", &tx, &event_log, None);

    // Tool-argument deltas are expected to be dropped: the receiver has nothing to read.
    assert!(rx.try_recv().is_err());
}
/// A stream event whose `type` is not `content_block_delta` (here
/// `message_start`) must not result in anything being sent on the agent event
/// channel.
#[test]
fn stream_event_non_delta_type_ignored() {
    let (tx, mut rx) = broadcast::channel::<AgentEvent>(64);
    let event_log = Mutex::new(Vec::<AgentEvent>::new());
    let payload = serde_json::json!({
        "type": "message_start",
        "message": { "role": "assistant" }
    });

    handle_agent_stream_event(&payload, "s1", "coder-1", &tx, &event_log, None);

    // The receiver must have nothing to read.
    assert!(rx.try_recv().is_err());
}
}