Fix: remove agent from pool immediately on completion and add Matrix bot user allowlist

This commit is contained in:
Dave
2026-02-25 14:59:20 +00:00
parent 93eaac3ab9
commit ebcd627a45
5 changed files with 140 additions and 296 deletions

View File

@@ -2,4 +2,5 @@ homeserver = "https://matrix.example.com"
username = "@botname:example.com" username = "@botname:example.com"
password = "your-bot-password" password = "your-bot-password"
room_id = "!roomid:example.com" room_id = "!roomid:example.com"
allowed_users = ["@youruser:example.com"]
enabled = false enabled = false

View File

@@ -12,12 +12,8 @@ use std::io::{BufRead, BufReader};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::process::Command; use std::process::Command;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::time::Instant;
use tokio::sync::broadcast; use tokio::sync::broadcast;
/// Default TTL for completed/failed agent entries: 1 hour.
pub const DEFAULT_AGENT_TTL_SECS: u64 = 3600;
/// Events emitted during server startup reconciliation to broadcast real-time /// Events emitted during server startup reconciliation to broadcast real-time
/// progress to connected WebSocket clients. /// progress to connected WebSocket clients.
#[derive(Debug, Clone, Serialize)] #[derive(Debug, Clone, Serialize)]
@@ -221,9 +217,6 @@ struct StoryAgent {
project_root: Option<PathBuf>, project_root: Option<PathBuf>,
/// UUID identifying the log file for this session. /// UUID identifying the log file for this session.
log_session_id: Option<String>, log_session_id: Option<String>,
/// Timestamp when the agent entered a terminal state (Completed/Failed).
/// Used by the TTL reaper to remove stale entries.
completed_at: Option<Instant>,
} }
/// Build an `AgentInfo` snapshot from a `StoryAgent` map entry. /// Build an `AgentInfo` snapshot from a `StoryAgent` map entry.
@@ -439,7 +432,6 @@ impl AgentPool {
completion: None, completion: None,
project_root: Some(project_root.to_path_buf()), project_root: Some(project_root.to_path_buf()),
log_session_id: Some(log_session_id.clone()), log_session_id: Some(log_session_id.clone()),
completed_at: None,
}, },
); );
} }
@@ -494,17 +486,14 @@ impl AgentPool {
Ok(wt) => wt, Ok(wt) => wt,
Err(e) => { Err(e) => {
// Worktree creation failed — mark agent as Failed so the UI shows the error. // Worktree creation failed — mark agent as Failed so the UI shows the error.
if let Ok(mut agents) = agents_ref.lock()
&& let Some(agent) = agents.get_mut(&key_clone)
{
agent.status = AgentStatus::Failed;
agent.completed_at = Some(Instant::now());
}
let _ = tx_clone.send(AgentEvent::Error { let _ = tx_clone.send(AgentEvent::Error {
story_id: sid.clone(), story_id: sid.clone(),
agent_name: aname.clone(), agent_name: aname.clone(),
message: format!("Failed to create worktree: {e}"), message: format!("Failed to create worktree: {e}"),
}); });
if let Ok(mut agents) = agents_ref.lock() {
agents.remove(&key_clone);
}
Self::notify_agent_state_changed(&watcher_tx_clone); Self::notify_agent_state_changed(&watcher_tx_clone);
return; return;
} }
@@ -528,17 +517,14 @@ impl AgentPool {
) { ) {
Ok(result) => result, Ok(result) => result,
Err(e) => { Err(e) => {
if let Ok(mut agents) = agents_ref.lock()
&& let Some(agent) = agents.get_mut(&key_clone)
{
agent.status = AgentStatus::Failed;
agent.completed_at = Some(Instant::now());
}
let _ = tx_clone.send(AgentEvent::Error { let _ = tx_clone.send(AgentEvent::Error {
story_id: sid.clone(), story_id: sid.clone(),
agent_name: aname.clone(), agent_name: aname.clone(),
message: format!("Failed to render agent args: {e}"), message: format!("Failed to render agent args: {e}"),
}); });
if let Ok(mut agents) = agents_ref.lock() {
agents.remove(&key_clone);
}
Self::notify_agent_state_changed(&watcher_tx_clone); Self::notify_agent_state_changed(&watcher_tx_clone);
return; return;
} }
@@ -595,17 +581,14 @@ impl AgentPool {
Self::notify_agent_state_changed(&watcher_tx_clone); Self::notify_agent_state_changed(&watcher_tx_clone);
} }
Err(e) => { Err(e) => {
if let Ok(mut agents) = agents_ref.lock()
&& let Some(agent) = agents.get_mut(&key_clone)
{
agent.status = AgentStatus::Failed;
agent.completed_at = Some(Instant::now());
}
let _ = tx_clone.send(AgentEvent::Error { let _ = tx_clone.send(AgentEvent::Error {
story_id: sid.clone(), story_id: sid.clone(),
agent_name: aname.clone(), agent_name: aname.clone(),
message: e, message: e,
}); });
if let Ok(mut agents) = agents_ref.lock() {
agents.remove(&key_clone);
}
Self::notify_agent_state_changed(&watcher_tx_clone); Self::notify_agent_state_changed(&watcher_tx_clone);
} }
} }
@@ -860,35 +843,14 @@ impl AgentPool {
/// - **Mergemaster** → run `script/test` on master; if pass: archive + cleanup worktree; /// - **Mergemaster** → run `script/test` on master; if pass: archive + cleanup worktree;
/// if fail: restart `mergemaster` with failure context. /// if fail: restart `mergemaster` with failure context.
/// - **Other** (supervisor, unknown) → no automatic advancement. /// - **Other** (supervisor, unknown) → no automatic advancement.
async fn run_pipeline_advance_for_completed_agent(&self, story_id: &str, agent_name: &str) { async fn run_pipeline_advance(
let key = composite_key(story_id, agent_name); &self,
story_id: &str,
let (completion, project_root, worktree_path) = { agent_name: &str,
let agents = match self.agents.lock() { completion: CompletionReport,
Ok(a) => a, project_root: Option<PathBuf>,
Err(e) => { worktree_path: Option<PathBuf>,
slog_error!("[pipeline] Failed to lock agents for '{story_id}:{agent_name}': {e}"); ) {
return;
}
};
let agent = match agents.get(&key) {
Some(a) => a,
None => return,
};
let wt_path = agent
.worktree_info
.as_ref()
.map(|wt| wt.path.clone());
(agent.completion.clone(), agent.project_root.clone(), wt_path)
};
let completion = match completion {
Some(c) => c,
None => {
slog_warn!("[pipeline] No completion report for '{story_id}:{agent_name}'");
return;
}
};
let project_root = match project_root { let project_root = match project_root {
Some(p) => p, Some(p) => p,
None => { None => {
@@ -1143,20 +1105,20 @@ impl AgentPool {
gate_output, gate_output,
}; };
// Store the completion report and advance status. // Extract data for pipeline advance, then remove the entry so
let (tx, session_id) = { // completed agents never appear in list_agents.
let (tx, session_id, project_root_for_advance, wt_path_for_advance) = {
let mut agents = self.agents.lock().map_err(|e| e.to_string())?; let mut agents = self.agents.lock().map_err(|e| e.to_string())?;
let agent = agents.get_mut(&key).ok_or_else(|| { let agent = agents.get_mut(&key).ok_or_else(|| {
format!("Agent '{agent_name}' for story '{story_id}' disappeared during gate check") format!("Agent '{agent_name}' for story '{story_id}' disappeared during gate check")
})?; })?;
agent.completion = Some(report.clone()); agent.completion = Some(report.clone());
agent.status = if gates_passed { let tx = agent.tx.clone();
AgentStatus::Completed let sid = agent.session_id.clone();
} else { let pr = agent.project_root.clone();
AgentStatus::Failed let wt = agent.worktree_info.as_ref().map(|w| w.path.clone());
}; agents.remove(&key);
agent.completed_at = Some(Instant::now()); (tx, sid, pr, wt)
(agent.tx.clone(), agent.session_id.clone())
}; };
// Emit Done so wait_for_agent unblocks. // Emit Done so wait_for_agent unblocks.
@@ -1166,9 +1128,10 @@ impl AgentPool {
session_id, session_id,
}); });
// Notify WebSocket clients that the agent is gone.
Self::notify_agent_state_changed(&self.watcher_tx);
// Advance the pipeline state machine in a background task. // Advance the pipeline state machine in a background task.
// Only advance when the agent completed (not failed) to avoid spurious restarts
// from agents that never ran acceptance gates properly.
let pool_clone = Self { let pool_clone = Self {
agents: Arc::clone(&self.agents), agents: Arc::clone(&self.agents),
port: self.port, port: self.port,
@@ -1177,9 +1140,16 @@ impl AgentPool {
}; };
let sid = story_id.to_string(); let sid = story_id.to_string();
let aname = agent_name.to_string(); let aname = agent_name.to_string();
let report_for_advance = report.clone();
tokio::spawn(async move { tokio::spawn(async move {
pool_clone pool_clone
.run_pipeline_advance_for_completed_agent(&sid, &aname) .run_pipeline_advance(
&sid,
&aname,
report_for_advance,
project_root_for_advance,
wt_path_for_advance,
)
.await; .await;
}); });
@@ -1295,11 +1265,6 @@ impl AgentPool {
agent_name: &str, agent_name: &str,
status: AgentStatus, status: AgentStatus,
) -> broadcast::Sender<AgentEvent> { ) -> broadcast::Sender<AgentEvent> {
let completed_at = if matches!(status, AgentStatus::Completed | AgentStatus::Failed) {
Some(Instant::now())
} else {
None
};
let (tx, _) = broadcast::channel::<AgentEvent>(64); let (tx, _) = broadcast::channel::<AgentEvent>(64);
let key = composite_key(story_id, agent_name); let key = composite_key(story_id, agent_name);
let mut agents = self.agents.lock().unwrap(); let mut agents = self.agents.lock().unwrap();
@@ -1316,7 +1281,6 @@ impl AgentPool {
completion: None, completion: None,
project_root: None, project_root: None,
log_session_id: None, log_session_id: None,
completed_at,
}, },
); );
tx tx
@@ -1332,11 +1296,6 @@ impl AgentPool {
status: AgentStatus, status: AgentStatus,
worktree_path: PathBuf, worktree_path: PathBuf,
) -> broadcast::Sender<AgentEvent> { ) -> broadcast::Sender<AgentEvent> {
let completed_at = if matches!(status, AgentStatus::Completed | AgentStatus::Failed) {
Some(Instant::now())
} else {
None
};
let (tx, _) = broadcast::channel::<AgentEvent>(64); let (tx, _) = broadcast::channel::<AgentEvent>(64);
let key = composite_key(story_id, agent_name); let key = composite_key(story_id, agent_name);
let mut agents = self.agents.lock().unwrap(); let mut agents = self.agents.lock().unwrap();
@@ -1357,7 +1316,6 @@ impl AgentPool {
completion: None, completion: None,
project_root: None, project_root: None,
log_session_id: None, log_session_id: None,
completed_at,
}, },
); );
tx tx
@@ -1684,11 +1642,6 @@ impl AgentPool {
project_root: PathBuf, project_root: PathBuf,
completion: CompletionReport, completion: CompletionReport,
) -> broadcast::Sender<AgentEvent> { ) -> broadcast::Sender<AgentEvent> {
let completed_at = if matches!(status, AgentStatus::Completed | AgentStatus::Failed) {
Some(Instant::now())
} else {
None
};
let (tx, _) = broadcast::channel::<AgentEvent>(64); let (tx, _) = broadcast::channel::<AgentEvent>(64);
let key = composite_key(story_id, agent_name); let key = composite_key(story_id, agent_name);
let mut agents = self.agents.lock().unwrap(); let mut agents = self.agents.lock().unwrap();
@@ -1705,7 +1658,6 @@ impl AgentPool {
completion: Some(completion), completion: Some(completion),
project_root: Some(project_root), project_root: Some(project_root),
log_session_id: None, log_session_id: None,
completed_at,
}, },
); );
tx tx
@@ -1737,7 +1689,6 @@ impl AgentPool {
completion: None, completion: None,
project_root: None, project_root: None,
log_session_id: None, log_session_id: None,
completed_at: None,
}, },
); );
tx tx
@@ -1818,37 +1769,6 @@ impl AgentPool {
} }
count count
} }
/// Reap agent entries in terminal states (Completed/Failed) whose `completed_at`
/// timestamp is older than `ttl`. Returns the number of entries reaped.
pub fn reap_expired_agents(&self, ttl: std::time::Duration) -> usize {
let mut agents = match self.agents.lock() {
Ok(a) => a,
Err(e) => {
slog_warn!("[reaper] Failed to lock pool for TTL reaping: {e}");
return 0;
}
};
let now = Instant::now();
let keys_to_remove: Vec<String> = agents
.iter()
.filter(|(_, agent)| {
matches!(agent.status, AgentStatus::Completed | AgentStatus::Failed)
&& agent
.completed_at
.is_some_and(|t| now.duration_since(t) >= ttl)
})
.map(|(k, _)| k.clone())
.collect();
let count = keys_to_remove.len();
for key in &keys_to_remove {
agents.remove(key);
}
if count > 0 {
slog!("[reaper] Reaped {count} expired agent entries (TTL: {}s)", ttl.as_secs());
}
count
}
} }
/// Return the active pipeline stage directory name for `story_id`, or `None` if the /// Return the active pipeline stage directory name for `story_id`, or `None` if the
@@ -2068,8 +1988,9 @@ async fn run_server_owned_completion(
gate_output, gate_output,
}; };
// Store completion report and set status. // Store completion report, extract data for pipeline advance, then
let tx = { // remove the entry so completed agents never appear in list_agents.
let (tx, project_root_for_advance, wt_path_for_advance) = {
let mut lock = match agents.lock() { let mut lock = match agents.lock() {
Ok(a) => a, Ok(a) => a,
Err(_) => return, Err(_) => return,
@@ -2078,15 +1999,13 @@ async fn run_server_owned_completion(
Some(a) => a, Some(a) => a,
None => return, None => return,
}; };
agent.completion = Some(report); agent.completion = Some(report.clone());
agent.session_id = session_id.clone(); agent.session_id = session_id.clone();
agent.status = if gates_passed { let tx = agent.tx.clone();
AgentStatus::Completed let pr = agent.project_root.clone();
} else { let wt = agent.worktree_info.as_ref().map(|w| w.path.clone());
AgentStatus::Failed lock.remove(&key);
}; (tx, pr, wt)
agent.completed_at = Some(Instant::now());
agent.tx.clone()
}; };
// Emit Done so wait_for_agent unblocks. // Emit Done so wait_for_agent unblocks.
@@ -2096,20 +2015,35 @@ async fn run_server_owned_completion(
session_id, session_id,
}); });
// Notify WebSocket clients that the agent is gone.
AgentPool::notify_agent_state_changed(&watcher_tx);
// Advance the pipeline state machine in a background task. // Advance the pipeline state machine in a background task.
// Uses a non-async helper to break the opaque type cycle. spawn_pipeline_advance(
spawn_pipeline_advance(Arc::clone(agents), port, story_id, agent_name, watcher_tx); Arc::clone(agents),
port,
story_id,
agent_name,
report,
project_root_for_advance,
wt_path_for_advance,
watcher_tx,
);
} }
/// Spawn pipeline advancement as a background task. /// Spawn pipeline advancement as a background task.
/// ///
/// This is a **non-async** function so it does not participate in the opaque /// This is a **non-async** function so it does not participate in the opaque
/// type cycle between `start_agent` and `run_server_owned_completion`. /// type cycle between `start_agent` and `run_server_owned_completion`.
#[allow(clippy::too_many_arguments)]
fn spawn_pipeline_advance( fn spawn_pipeline_advance(
agents: Arc<Mutex<HashMap<String, StoryAgent>>>, agents: Arc<Mutex<HashMap<String, StoryAgent>>>,
port: u16, port: u16,
story_id: &str, story_id: &str,
agent_name: &str, agent_name: &str,
completion: CompletionReport,
project_root: Option<PathBuf>,
worktree_path: Option<PathBuf>,
watcher_tx: broadcast::Sender<WatcherEvent>, watcher_tx: broadcast::Sender<WatcherEvent>,
) { ) {
let sid = story_id.to_string(); let sid = story_id.to_string();
@@ -2121,7 +2055,7 @@ fn spawn_pipeline_advance(
child_killers: Arc::new(Mutex::new(HashMap::new())), child_killers: Arc::new(Mutex::new(HashMap::new())),
watcher_tx, watcher_tx,
}; };
pool.run_pipeline_advance_for_completed_agent(&sid, &aname) pool.run_pipeline_advance(&sid, &aname, completion, project_root, worktree_path)
.await; .await;
}); });
} }
@@ -3714,35 +3648,23 @@ mod tests {
run_server_owned_completion(&pool.agents, pool.port, "s11", "coder-1", Some("sess-2".to_string()), pool.watcher_tx.clone()) run_server_owned_completion(&pool.agents, pool.port, "s11", "coder-1", Some("sess-2".to_string()), pool.watcher_tx.clone())
.await; .await;
// Completion report should exist (gates were run, though they may fail // Agent entry should be removed from the map after completion.
// because this is not a real Cargo project).
let agents = pool.agents.lock().unwrap(); let agents = pool.agents.lock().unwrap();
let key = composite_key("s11", "coder-1"); let key = composite_key("s11", "coder-1");
let agent = agents.get(&key).unwrap();
assert!( assert!(
agent.completion.is_some(), agents.get(&key).is_none(),
"completion report should be created" "agent should be removed from map after completion"
);
assert_eq!(
agent.completion.as_ref().unwrap().summary,
"Agent process exited normally"
);
// Session ID should be stored.
assert_eq!(agent.session_id, Some("sess-2".to_string()));
// Status should be terminal (Completed or Failed depending on gate results).
assert!(
agent.status == AgentStatus::Completed || agent.status == AgentStatus::Failed,
"status should be terminal, got: {:?}",
agent.status
); );
drop(agents); drop(agents);
// A Done event should have been emitted. // A Done event should have been emitted with the session_id.
let event = rx.try_recv().expect("should emit Done event"); let event = rx.try_recv().expect("should emit Done event");
assert!( match &event {
matches!(event, AgentEvent::Done { .. }), AgentEvent::Done { session_id, .. } => {
"expected Done event, got: {event:?}" assert_eq!(*session_id, Some("sess-2".to_string()));
); }
other => panic!("expected Done event, got: {other:?}"),
}
} }
#[tokio::test] #[tokio::test]
@@ -3764,23 +3686,25 @@ mod tests {
repo.to_path_buf(), repo.to_path_buf(),
); );
let mut rx = pool.subscribe("s12", "coder-1").unwrap();
run_server_owned_completion(&pool.agents, pool.port, "s12", "coder-1", None, pool.watcher_tx.clone()) run_server_owned_completion(&pool.agents, pool.port, "s12", "coder-1", None, pool.watcher_tx.clone())
.await; .await;
// Agent entry should be removed from the map after completion (even on failure).
let agents = pool.agents.lock().unwrap(); let agents = pool.agents.lock().unwrap();
let key = composite_key("s12", "coder-1"); let key = composite_key("s12", "coder-1");
let agent = agents.get(&key).unwrap();
assert!(agent.completion.is_some());
assert!(!agent.completion.as_ref().unwrap().gates_passed);
assert_eq!(agent.status, AgentStatus::Failed);
assert!( assert!(
agent agents.get(&key).is_none(),
.completion "agent should be removed from map after failed completion"
.as_ref() );
.unwrap() drop(agents);
.gate_output
.contains("uncommitted"), // A Done event should have been emitted.
"gate_output should mention uncommitted changes" let event = rx.try_recv().expect("should emit Done event");
assert!(
matches!(event, AgentEvent::Done { .. }),
"expected Done event, got: {event:?}"
); );
} }
@@ -3949,20 +3873,18 @@ mod tests {
fs::write(current.join("50_story_test.md"), "test").unwrap(); fs::write(current.join("50_story_test.md"), "test").unwrap();
let pool = AgentPool::new_test(3001); let pool = AgentPool::new_test(3001);
pool.inject_test_agent_with_completion( // Call pipeline advance directly with completion data.
pool.run_pipeline_advance(
"50_story_test", "50_story_test",
"coder-1", "coder-1",
AgentStatus::Completed,
root.to_path_buf(),
CompletionReport { CompletionReport {
summary: "done".to_string(), summary: "done".to_string(),
gates_passed: true, gates_passed: true,
gate_output: String::new(), gate_output: String::new(),
}, },
); Some(root.to_path_buf()),
None,
// Call pipeline advance directly (bypasses background spawn for testing). )
pool.run_pipeline_advance_for_completed_agent("50_story_test", "coder-1")
.await; .await;
// Story should have moved to 3_qa/ (start_agent for qa will fail in tests but // Story should have moved to 3_qa/ (start_agent for qa will fail in tests but
@@ -3989,19 +3911,17 @@ mod tests {
fs::write(qa_dir.join("51_story_test.md"), "test").unwrap(); fs::write(qa_dir.join("51_story_test.md"), "test").unwrap();
let pool = AgentPool::new_test(3001); let pool = AgentPool::new_test(3001);
pool.inject_test_agent_with_completion( pool.run_pipeline_advance(
"51_story_test", "51_story_test",
"qa", "qa",
AgentStatus::Completed,
root.to_path_buf(),
CompletionReport { CompletionReport {
summary: "QA done".to_string(), summary: "QA done".to_string(),
gates_passed: true, gates_passed: true,
gate_output: String::new(), gate_output: String::new(),
}, },
); Some(root.to_path_buf()),
None,
pool.run_pipeline_advance_for_completed_agent("51_story_test", "qa") )
.await; .await;
// Story should have moved to 4_merge/ // Story should have moved to 4_merge/
@@ -4026,19 +3946,17 @@ mod tests {
fs::write(current.join("52_story_test.md"), "test").unwrap(); fs::write(current.join("52_story_test.md"), "test").unwrap();
let pool = AgentPool::new_test(3001); let pool = AgentPool::new_test(3001);
pool.inject_test_agent_with_completion( pool.run_pipeline_advance(
"52_story_test", "52_story_test",
"supervisor", "supervisor",
AgentStatus::Completed,
root.to_path_buf(),
CompletionReport { CompletionReport {
summary: "supervised".to_string(), summary: "supervised".to_string(),
gates_passed: true, gates_passed: true,
gate_output: String::new(), gate_output: String::new(),
}, },
); Some(root.to_path_buf()),
None,
pool.run_pipeline_advance_for_completed_agent("52_story_test", "supervisor") )
.await; .await;
// Story should NOT have moved (supervisors don't advance pipeline) // Story should NOT have moved (supervisors don't advance pipeline)
@@ -5913,93 +5831,6 @@ theirs
assert_eq!(agents.len(), 1, "existing agents should not be affected"); assert_eq!(agents.len(), 1, "existing agents should not be affected");
} }
// ── reap_expired_agents tests ────────────────────────────────────────────
#[test]
fn reap_expired_agents_removes_old_completed_entries() {
let pool = AgentPool::new_test(3001);
// Inject a completed agent with an artificial old completed_at.
{
let (tx, _) = broadcast::channel::<AgentEvent>(64);
let key = composite_key("old_story", "coder-1");
let mut agents = pool.agents.lock().unwrap();
agents.insert(
key,
StoryAgent {
agent_name: "coder-1".to_string(),
status: AgentStatus::Completed,
worktree_info: None,
session_id: None,
tx,
task_handle: None,
event_log: Arc::new(Mutex::new(Vec::new())),
completion: None,
project_root: None,
log_session_id: None,
// Set completed_at 2 hours ago.
completed_at: Some(Instant::now() - std::time::Duration::from_secs(7200)),
},
);
}
// Inject a recently completed agent.
pool.inject_test_agent("new_story", "coder-1", AgentStatus::Completed);
// Inject a running agent (should not be reaped).
pool.inject_test_agent("active_story", "coder-2", AgentStatus::Running);
// Reap with a 1-hour TTL — only the old entry should be removed.
let reaped = pool.reap_expired_agents(std::time::Duration::from_secs(3600));
assert_eq!(reaped, 1, "should reap only the old completed entry");
let agents = pool.list_agents().unwrap();
assert_eq!(agents.len(), 2, "new_story and active_story should remain");
assert!(
agents.iter().all(|a| a.story_id != "old_story"),
"old_story should have been reaped"
);
}
#[test]
fn reap_expired_agents_removes_old_failed_entries() {
let pool = AgentPool::new_test(3001);
// Inject a failed agent with an old completed_at.
{
let (tx, _) = broadcast::channel::<AgentEvent>(64);
let key = composite_key("failed_old", "coder-1");
let mut agents = pool.agents.lock().unwrap();
agents.insert(
key,
StoryAgent {
agent_name: "coder-1".to_string(),
status: AgentStatus::Failed,
worktree_info: None,
session_id: None,
tx,
task_handle: None,
event_log: Arc::new(Mutex::new(Vec::new())),
completion: None,
project_root: None,
log_session_id: None,
completed_at: Some(Instant::now() - std::time::Duration::from_secs(7200)),
},
);
}
let reaped = pool.reap_expired_agents(std::time::Duration::from_secs(3600));
assert_eq!(reaped, 1);
assert!(pool.list_agents().unwrap().is_empty());
}
#[test]
fn reap_expired_agents_skips_running_entries() {
let pool = AgentPool::new_test(3001);
pool.inject_test_agent("running_story", "coder-1", AgentStatus::Running);
let reaped = pool.reap_expired_agents(std::time::Duration::from_secs(0));
assert_eq!(reaped, 0, "running agents should never be reaped");
}
// ── archive + cleanup integration test ─────────────────────────────────── // ── archive + cleanup integration test ───────────────────────────────────
#[tokio::test] #[tokio::test]
@@ -6431,18 +6262,6 @@ stage = "qa"
.unwrap(); .unwrap();
let pool = AgentPool::new_test(3001); let pool = AgentPool::new_test(3001);
pool.inject_test_agent_with_completion(
"173_story_test",
"coder-1",
AgentStatus::Completed,
root.to_path_buf(),
CompletionReport {
summary: "done".to_string(),
gates_passed: true,
gate_output: String::new(),
},
);
// Subscribe to the watcher channel BEFORE the pipeline advance. // Subscribe to the watcher channel BEFORE the pipeline advance.
let mut rx = pool.watcher_tx.subscribe(); let mut rx = pool.watcher_tx.subscribe();
@@ -6451,7 +6270,17 @@ stage = "qa"
// 2. Start the QA agent (which calls notify_agent_state_changed) // 2. Start the QA agent (which calls notify_agent_state_changed)
// Note: the actual agent process will fail (no real worktree), but the // Note: the actual agent process will fail (no real worktree), but the
// agent insertion and notification happen before the background spawn. // agent insertion and notification happen before the background spawn.
pool.run_pipeline_advance_for_completed_agent("173_story_test", "coder-1") pool.run_pipeline_advance(
"173_story_test",
"coder-1",
CompletionReport {
summary: "done".to_string(),
gates_passed: true,
gate_output: String::new(),
},
Some(root.to_path_buf()),
None,
)
.await; .await;
// The pipeline advance should have sent AgentStateChanged events via // The pipeline advance should have sent AgentStateChanged events via

View File

@@ -127,20 +127,6 @@ async fn main() -> Result<(), std::io::Error> {
let app = build_routes(ctx); let app = build_routes(ctx);
// Background reaper: periodically remove completed/failed agent entries
// that have exceeded the TTL.
{
let reaper_agents = Arc::clone(&startup_agents);
let ttl = std::time::Duration::from_secs(agents::DEFAULT_AGENT_TTL_SECS);
tokio::spawn(async move {
// Check every 5 minutes.
let interval = std::time::Duration::from_secs(300);
loop {
tokio::time::sleep(interval).await;
reaper_agents.reap_expired_agents(ttl);
}
});
}
// Optional Matrix bot: connect to the homeserver and start listening for // Optional Matrix bot: connect to the homeserver and start listening for
// messages if `.story_kit/bot.toml` is present and enabled. // messages if `.story_kit/bot.toml` is present and enabled.

View File

@@ -25,6 +25,7 @@ pub struct BotContext {
pub bot_user_id: OwnedUserId, pub bot_user_id: OwnedUserId,
pub target_room_id: OwnedRoomId, pub target_room_id: OwnedRoomId,
pub project_root: PathBuf, pub project_root: PathBuf,
pub allowed_users: Vec<String>,
} }
/// Connect to the Matrix homeserver, join the configured room, and start /// Connect to the Matrix homeserver, join the configured room, and start
@@ -73,10 +74,24 @@ pub async fn run_bot(config: BotConfig, project_root: PathBuf) -> Result<(), Str
Err(_) => slog!("[matrix-bot] Join room timed out (may already be a member)"), Err(_) => slog!("[matrix-bot] Join room timed out (may already be a member)"),
} }
if config.allowed_users.is_empty() {
return Err(
"allowed_users is empty in bot.toml — refusing to start (fail-closed). \
Add at least one Matrix user ID to allowed_users."
.to_string(),
);
}
slog!(
"[matrix-bot] Allowed users: {:?}",
config.allowed_users
);
let ctx = BotContext { let ctx = BotContext {
bot_user_id, bot_user_id,
target_room_id, target_room_id,
project_root, project_root,
allowed_users: config.allowed_users,
}; };
// Register event handler and inject shared context // Register event handler and inject shared context
@@ -119,6 +134,15 @@ async fn on_room_message(
return; return;
} }
// Only respond to users on the allowlist (fail-closed)
if !ctx.allowed_users.iter().any(|u| u == ev.sender.as_str()) {
slog!(
"[matrix-bot] Ignoring message from unauthorised user: {}",
ev.sender
);
return;
}
// Only handle plain text messages // Only handle plain text messages
let MessageType::Text(text_content) = ev.content.msgtype else { let MessageType::Text(text_content) = ev.content.msgtype else {
return; return;

View File

@@ -15,6 +15,10 @@ pub struct BotConfig {
/// Set to `true` to enable the bot (default: false) /// Set to `true` to enable the bot (default: false)
#[serde(default)] #[serde(default)]
pub enabled: bool, pub enabled: bool,
/// Matrix user IDs allowed to interact with the bot.
/// If empty or omitted, the bot ignores ALL messages (fail-closed).
#[serde(default)]
pub allowed_users: Vec<String>,
/// Previously used to select an Anthropic model. Now ignored — the bot /// Previously used to select an Anthropic model. Now ignored — the bot
/// uses Claude Code which manages its own model selection. Kept for /// uses Claude Code which manages its own model selection. Kept for
/// backwards compatibility so existing bot.toml files still parse. /// backwards compatibility so existing bot.toml files still parse.