diff --git a/.story_kit/bot.toml.example b/.story_kit/bot.toml.example index 0288e39..f2679c1 100644 --- a/.story_kit/bot.toml.example +++ b/.story_kit/bot.toml.example @@ -2,4 +2,5 @@ homeserver = "https://matrix.example.com" username = "@botname:example.com" password = "your-bot-password" room_id = "!roomid:example.com" +allowed_users = ["@youruser:example.com"] enabled = false diff --git a/server/src/agents.rs b/server/src/agents.rs index 0f0eb09..a0f4760 100644 --- a/server/src/agents.rs +++ b/server/src/agents.rs @@ -12,12 +12,8 @@ use std::io::{BufRead, BufReader}; use std::path::{Path, PathBuf}; use std::process::Command; use std::sync::{Arc, Mutex}; -use std::time::Instant; use tokio::sync::broadcast; -/// Default TTL for completed/failed agent entries: 1 hour. -pub const DEFAULT_AGENT_TTL_SECS: u64 = 3600; - /// Events emitted during server startup reconciliation to broadcast real-time /// progress to connected WebSocket clients. #[derive(Debug, Clone, Serialize)] @@ -221,9 +217,6 @@ struct StoryAgent { project_root: Option, /// UUID identifying the log file for this session. log_session_id: Option, - /// Timestamp when the agent entered a terminal state (Completed/Failed). - /// Used by the TTL reaper to remove stale entries. - completed_at: Option, } /// Build an `AgentInfo` snapshot from a `StoryAgent` map entry. @@ -439,7 +432,6 @@ impl AgentPool { completion: None, project_root: Some(project_root.to_path_buf()), log_session_id: Some(log_session_id.clone()), - completed_at: None, }, ); } @@ -494,17 +486,14 @@ impl AgentPool { Ok(wt) => wt, Err(e) => { // Worktree creation failed — mark agent as Failed so the UI shows the error. - if let Ok(mut agents) = agents_ref.lock() - && let Some(agent) = agents.get_mut(&key_clone) - { - agent.status = AgentStatus::Failed; - agent.completed_at = Some(Instant::now()); - } let _ = tx_clone.send(AgentEvent::Error { story_id: sid.clone(), agent_name: aname.clone(), message: format!("Failed to create worktree: {e}"), }); + if let Ok(mut agents) = agents_ref.lock() { + agents.remove(&key_clone); + } Self::notify_agent_state_changed(&watcher_tx_clone); return; } @@ -528,17 +517,14 @@ impl AgentPool { ) { Ok(result) => result, Err(e) => { - if let Ok(mut agents) = agents_ref.lock() - && let Some(agent) = agents.get_mut(&key_clone) - { - agent.status = AgentStatus::Failed; - agent.completed_at = Some(Instant::now()); - } let _ = tx_clone.send(AgentEvent::Error { story_id: sid.clone(), agent_name: aname.clone(), message: format!("Failed to render agent args: {e}"), }); + if let Ok(mut agents) = agents_ref.lock() { + agents.remove(&key_clone); + } Self::notify_agent_state_changed(&watcher_tx_clone); return; } @@ -595,17 +581,14 @@ impl AgentPool { Self::notify_agent_state_changed(&watcher_tx_clone); } Err(e) => { - if let Ok(mut agents) = agents_ref.lock() - && let Some(agent) = agents.get_mut(&key_clone) - { - agent.status = AgentStatus::Failed; - agent.completed_at = Some(Instant::now()); - } let _ = tx_clone.send(AgentEvent::Error { story_id: sid.clone(), agent_name: aname.clone(), message: e, }); + if let Ok(mut agents) = agents_ref.lock() { + agents.remove(&key_clone); + } Self::notify_agent_state_changed(&watcher_tx_clone); } } @@ -860,35 +843,14 @@ impl AgentPool { /// - **Mergemaster** → run `script/test` on master; if pass: archive + cleanup worktree; /// if fail: restart `mergemaster` with failure context. /// - **Other** (supervisor, unknown) → no automatic advancement. - async fn run_pipeline_advance_for_completed_agent(&self, story_id: &str, agent_name: &str) { - let key = composite_key(story_id, agent_name); - - let (completion, project_root, worktree_path) = { - let agents = match self.agents.lock() { - Ok(a) => a, - Err(e) => { - slog_error!("[pipeline] Failed to lock agents for '{story_id}:{agent_name}': {e}"); - return; - } - }; - let agent = match agents.get(&key) { - Some(a) => a, - None => return, - }; - let wt_path = agent - .worktree_info - .as_ref() - .map(|wt| wt.path.clone()); - (agent.completion.clone(), agent.project_root.clone(), wt_path) - }; - - let completion = match completion { - Some(c) => c, - None => { - slog_warn!("[pipeline] No completion report for '{story_id}:{agent_name}'"); - return; - } - }; + async fn run_pipeline_advance( + &self, + story_id: &str, + agent_name: &str, + completion: CompletionReport, + project_root: Option, + worktree_path: Option, + ) { let project_root = match project_root { Some(p) => p, None => { @@ -1143,20 +1105,20 @@ impl AgentPool { gate_output, }; - // Store the completion report and advance status. - let (tx, session_id) = { + // Extract data for pipeline advance, then remove the entry so + // completed agents never appear in list_agents. + let (tx, session_id, project_root_for_advance, wt_path_for_advance) = { let mut agents = self.agents.lock().map_err(|e| e.to_string())?; let agent = agents.get_mut(&key).ok_or_else(|| { format!("Agent '{agent_name}' for story '{story_id}' disappeared during gate check") })?; agent.completion = Some(report.clone()); - agent.status = if gates_passed { - AgentStatus::Completed - } else { - AgentStatus::Failed - }; - agent.completed_at = Some(Instant::now()); - (agent.tx.clone(), agent.session_id.clone()) + let tx = agent.tx.clone(); + let sid = agent.session_id.clone(); + let pr = agent.project_root.clone(); + let wt = agent.worktree_info.as_ref().map(|w| w.path.clone()); + agents.remove(&key); + (tx, sid, pr, wt) }; // Emit Done so wait_for_agent unblocks. @@ -1166,9 +1128,10 @@ impl AgentPool { session_id, }); + // Notify WebSocket clients that the agent is gone. + Self::notify_agent_state_changed(&self.watcher_tx); + // Advance the pipeline state machine in a background task. - // Only advance when the agent completed (not failed) to avoid spurious restarts - // from agents that never ran acceptance gates properly. let pool_clone = Self { agents: Arc::clone(&self.agents), port: self.port, @@ -1177,9 +1140,16 @@ impl AgentPool { }; let sid = story_id.to_string(); let aname = agent_name.to_string(); + let report_for_advance = report.clone(); tokio::spawn(async move { pool_clone - .run_pipeline_advance_for_completed_agent(&sid, &aname) + .run_pipeline_advance( + &sid, + &aname, + report_for_advance, + project_root_for_advance, + wt_path_for_advance, + ) .await; }); @@ -1295,11 +1265,6 @@ impl AgentPool { agent_name: &str, status: AgentStatus, ) -> broadcast::Sender { - let completed_at = if matches!(status, AgentStatus::Completed | AgentStatus::Failed) { - Some(Instant::now()) - } else { - None - }; let (tx, _) = broadcast::channel::(64); let key = composite_key(story_id, agent_name); let mut agents = self.agents.lock().unwrap(); @@ -1316,7 +1281,6 @@ impl AgentPool { completion: None, project_root: None, log_session_id: None, - completed_at, }, ); tx @@ -1332,11 +1296,6 @@ impl AgentPool { status: AgentStatus, worktree_path: PathBuf, ) -> broadcast::Sender { - let completed_at = if matches!(status, AgentStatus::Completed | AgentStatus::Failed) { - Some(Instant::now()) - } else { - None - }; let (tx, _) = broadcast::channel::(64); let key = composite_key(story_id, agent_name); let mut agents = self.agents.lock().unwrap(); @@ -1357,7 +1316,6 @@ impl AgentPool { completion: None, project_root: None, log_session_id: None, - completed_at, }, ); tx @@ -1684,11 +1642,6 @@ impl AgentPool { project_root: PathBuf, completion: CompletionReport, ) -> broadcast::Sender { - let completed_at = if matches!(status, AgentStatus::Completed | AgentStatus::Failed) { - Some(Instant::now()) - } else { - None - }; let (tx, _) = broadcast::channel::(64); let key = composite_key(story_id, agent_name); let mut agents = self.agents.lock().unwrap(); @@ -1705,7 +1658,6 @@ impl AgentPool { completion: Some(completion), project_root: Some(project_root), log_session_id: None, - completed_at, }, ); tx @@ -1737,7 +1689,6 @@ impl AgentPool { completion: None, project_root: None, log_session_id: None, - completed_at: None, }, ); tx @@ -1818,37 +1769,6 @@ impl AgentPool { } count } - - /// Reap agent entries in terminal states (Completed/Failed) whose `completed_at` - /// timestamp is older than `ttl`. Returns the number of entries reaped. - pub fn reap_expired_agents(&self, ttl: std::time::Duration) -> usize { - let mut agents = match self.agents.lock() { - Ok(a) => a, - Err(e) => { - slog_warn!("[reaper] Failed to lock pool for TTL reaping: {e}"); - return 0; - } - }; - let now = Instant::now(); - let keys_to_remove: Vec = agents - .iter() - .filter(|(_, agent)| { - matches!(agent.status, AgentStatus::Completed | AgentStatus::Failed) - && agent - .completed_at - .is_some_and(|t| now.duration_since(t) >= ttl) - }) - .map(|(k, _)| k.clone()) - .collect(); - let count = keys_to_remove.len(); - for key in &keys_to_remove { - agents.remove(key); - } - if count > 0 { - slog!("[reaper] Reaped {count} expired agent entries (TTL: {}s)", ttl.as_secs()); - } - count - } } /// Return the active pipeline stage directory name for `story_id`, or `None` if the @@ -2068,8 +1988,9 @@ async fn run_server_owned_completion( gate_output, }; - // Store completion report and set status. - let tx = { + // Store completion report, extract data for pipeline advance, then + // remove the entry so completed agents never appear in list_agents. + let (tx, project_root_for_advance, wt_path_for_advance) = { let mut lock = match agents.lock() { Ok(a) => a, Err(_) => return, @@ -2078,15 +1999,13 @@ async fn run_server_owned_completion( Some(a) => a, None => return, }; - agent.completion = Some(report); + agent.completion = Some(report.clone()); agent.session_id = session_id.clone(); - agent.status = if gates_passed { - AgentStatus::Completed - } else { - AgentStatus::Failed - }; - agent.completed_at = Some(Instant::now()); - agent.tx.clone() + let tx = agent.tx.clone(); + let pr = agent.project_root.clone(); + let wt = agent.worktree_info.as_ref().map(|w| w.path.clone()); + lock.remove(&key); + (tx, pr, wt) }; // Emit Done so wait_for_agent unblocks. @@ -2096,20 +2015,35 @@ async fn run_server_owned_completion( session_id, }); + // Notify WebSocket clients that the agent is gone. + AgentPool::notify_agent_state_changed(&watcher_tx); + // Advance the pipeline state machine in a background task. - // Uses a non-async helper to break the opaque type cycle. - spawn_pipeline_advance(Arc::clone(agents), port, story_id, agent_name, watcher_tx); + spawn_pipeline_advance( + Arc::clone(agents), + port, + story_id, + agent_name, + report, + project_root_for_advance, + wt_path_for_advance, + watcher_tx, + ); } /// Spawn pipeline advancement as a background task. /// /// This is a **non-async** function so it does not participate in the opaque /// type cycle between `start_agent` and `run_server_owned_completion`. +#[allow(clippy::too_many_arguments)] fn spawn_pipeline_advance( agents: Arc>>, port: u16, story_id: &str, agent_name: &str, + completion: CompletionReport, + project_root: Option, + worktree_path: Option, watcher_tx: broadcast::Sender, ) { let sid = story_id.to_string(); @@ -2121,7 +2055,7 @@ fn spawn_pipeline_advance( child_killers: Arc::new(Mutex::new(HashMap::new())), watcher_tx, }; - pool.run_pipeline_advance_for_completed_agent(&sid, &aname) + pool.run_pipeline_advance(&sid, &aname, completion, project_root, worktree_path) .await; }); } @@ -3714,35 +3648,23 @@ mod tests { run_server_owned_completion(&pool.agents, pool.port, "s11", "coder-1", Some("sess-2".to_string()), pool.watcher_tx.clone()) .await; - // Completion report should exist (gates were run, though they may fail - // because this is not a real Cargo project). + // Agent entry should be removed from the map after completion. let agents = pool.agents.lock().unwrap(); let key = composite_key("s11", "coder-1"); - let agent = agents.get(&key).unwrap(); assert!( - agent.completion.is_some(), - "completion report should be created" - ); - assert_eq!( - agent.completion.as_ref().unwrap().summary, - "Agent process exited normally" - ); - // Session ID should be stored. - assert_eq!(agent.session_id, Some("sess-2".to_string())); - // Status should be terminal (Completed or Failed depending on gate results). - assert!( - agent.status == AgentStatus::Completed || agent.status == AgentStatus::Failed, - "status should be terminal, got: {:?}", - agent.status + agents.get(&key).is_none(), + "agent should be removed from map after completion" ); drop(agents); - // A Done event should have been emitted. + // A Done event should have been emitted with the session_id. let event = rx.try_recv().expect("should emit Done event"); - assert!( - matches!(event, AgentEvent::Done { .. }), - "expected Done event, got: {event:?}" - ); + match &event { + AgentEvent::Done { session_id, .. } => { + assert_eq!(*session_id, Some("sess-2".to_string())); + } + other => panic!("expected Done event, got: {other:?}"), + } } #[tokio::test] @@ -3764,23 +3686,25 @@ mod tests { repo.to_path_buf(), ); + let mut rx = pool.subscribe("s12", "coder-1").unwrap(); + run_server_owned_completion(&pool.agents, pool.port, "s12", "coder-1", None, pool.watcher_tx.clone()) .await; + // Agent entry should be removed from the map after completion (even on failure). let agents = pool.agents.lock().unwrap(); let key = composite_key("s12", "coder-1"); - let agent = agents.get(&key).unwrap(); - assert!(agent.completion.is_some()); - assert!(!agent.completion.as_ref().unwrap().gates_passed); - assert_eq!(agent.status, AgentStatus::Failed); assert!( - agent - .completion - .as_ref() - .unwrap() - .gate_output - .contains("uncommitted"), - "gate_output should mention uncommitted changes" + agents.get(&key).is_none(), + "agent should be removed from map after failed completion" + ); + drop(agents); + + // A Done event should have been emitted. + let event = rx.try_recv().expect("should emit Done event"); + assert!( + matches!(event, AgentEvent::Done { .. }), + "expected Done event, got: {event:?}" ); } @@ -3949,20 +3873,18 @@ mod tests { fs::write(current.join("50_story_test.md"), "test").unwrap(); let pool = AgentPool::new_test(3001); - pool.inject_test_agent_with_completion( + // Call pipeline advance directly with completion data. + pool.run_pipeline_advance( "50_story_test", "coder-1", - AgentStatus::Completed, - root.to_path_buf(), CompletionReport { summary: "done".to_string(), gates_passed: true, gate_output: String::new(), }, - ); - - // Call pipeline advance directly (bypasses background spawn for testing). - pool.run_pipeline_advance_for_completed_agent("50_story_test", "coder-1") + Some(root.to_path_buf()), + None, + ) .await; // Story should have moved to 3_qa/ (start_agent for qa will fail in tests but @@ -3989,19 +3911,17 @@ mod tests { fs::write(qa_dir.join("51_story_test.md"), "test").unwrap(); let pool = AgentPool::new_test(3001); - pool.inject_test_agent_with_completion( + pool.run_pipeline_advance( "51_story_test", "qa", - AgentStatus::Completed, - root.to_path_buf(), CompletionReport { summary: "QA done".to_string(), gates_passed: true, gate_output: String::new(), }, - ); - - pool.run_pipeline_advance_for_completed_agent("51_story_test", "qa") + Some(root.to_path_buf()), + None, + ) .await; // Story should have moved to 4_merge/ @@ -4026,19 +3946,17 @@ mod tests { fs::write(current.join("52_story_test.md"), "test").unwrap(); let pool = AgentPool::new_test(3001); - pool.inject_test_agent_with_completion( + pool.run_pipeline_advance( "52_story_test", "supervisor", - AgentStatus::Completed, - root.to_path_buf(), CompletionReport { summary: "supervised".to_string(), gates_passed: true, gate_output: String::new(), }, - ); - - pool.run_pipeline_advance_for_completed_agent("52_story_test", "supervisor") + Some(root.to_path_buf()), + None, + ) .await; // Story should NOT have moved (supervisors don't advance pipeline) @@ -5913,93 +5831,6 @@ theirs assert_eq!(agents.len(), 1, "existing agents should not be affected"); } - // ── reap_expired_agents tests ──────────────────────────────────────────── - - #[test] - fn reap_expired_agents_removes_old_completed_entries() { - let pool = AgentPool::new_test(3001); - - // Inject a completed agent with an artificial old completed_at. - { - let (tx, _) = broadcast::channel::(64); - let key = composite_key("old_story", "coder-1"); - let mut agents = pool.agents.lock().unwrap(); - agents.insert( - key, - StoryAgent { - agent_name: "coder-1".to_string(), - status: AgentStatus::Completed, - worktree_info: None, - session_id: None, - tx, - task_handle: None, - event_log: Arc::new(Mutex::new(Vec::new())), - completion: None, - project_root: None, - log_session_id: None, - // Set completed_at 2 hours ago. - completed_at: Some(Instant::now() - std::time::Duration::from_secs(7200)), - }, - ); - } - // Inject a recently completed agent. - pool.inject_test_agent("new_story", "coder-1", AgentStatus::Completed); - // Inject a running agent (should not be reaped). - pool.inject_test_agent("active_story", "coder-2", AgentStatus::Running); - - // Reap with a 1-hour TTL — only the old entry should be removed. - let reaped = pool.reap_expired_agents(std::time::Duration::from_secs(3600)); - assert_eq!(reaped, 1, "should reap only the old completed entry"); - - let agents = pool.list_agents().unwrap(); - assert_eq!(agents.len(), 2, "new_story and active_story should remain"); - assert!( - agents.iter().all(|a| a.story_id != "old_story"), - "old_story should have been reaped" - ); - } - - #[test] - fn reap_expired_agents_removes_old_failed_entries() { - let pool = AgentPool::new_test(3001); - - // Inject a failed agent with an old completed_at. - { - let (tx, _) = broadcast::channel::(64); - let key = composite_key("failed_old", "coder-1"); - let mut agents = pool.agents.lock().unwrap(); - agents.insert( - key, - StoryAgent { - agent_name: "coder-1".to_string(), - status: AgentStatus::Failed, - worktree_info: None, - session_id: None, - tx, - task_handle: None, - event_log: Arc::new(Mutex::new(Vec::new())), - completion: None, - project_root: None, - log_session_id: None, - completed_at: Some(Instant::now() - std::time::Duration::from_secs(7200)), - }, - ); - } - - let reaped = pool.reap_expired_agents(std::time::Duration::from_secs(3600)); - assert_eq!(reaped, 1); - assert!(pool.list_agents().unwrap().is_empty()); - } - - #[test] - fn reap_expired_agents_skips_running_entries() { - let pool = AgentPool::new_test(3001); - pool.inject_test_agent("running_story", "coder-1", AgentStatus::Running); - - let reaped = pool.reap_expired_agents(std::time::Duration::from_secs(0)); - assert_eq!(reaped, 0, "running agents should never be reaped"); - } - // ── archive + cleanup integration test ─────────────────────────────────── #[tokio::test] @@ -6431,18 +6262,6 @@ stage = "qa" .unwrap(); let pool = AgentPool::new_test(3001); - pool.inject_test_agent_with_completion( - "173_story_test", - "coder-1", - AgentStatus::Completed, - root.to_path_buf(), - CompletionReport { - summary: "done".to_string(), - gates_passed: true, - gate_output: String::new(), - }, - ); - // Subscribe to the watcher channel BEFORE the pipeline advance. let mut rx = pool.watcher_tx.subscribe(); @@ -6451,7 +6270,17 @@ stage = "qa" // 2. Start the QA agent (which calls notify_agent_state_changed) // Note: the actual agent process will fail (no real worktree), but the // agent insertion and notification happen before the background spawn. - pool.run_pipeline_advance_for_completed_agent("173_story_test", "coder-1") + pool.run_pipeline_advance( + "173_story_test", + "coder-1", + CompletionReport { + summary: "done".to_string(), + gates_passed: true, + gate_output: String::new(), + }, + Some(root.to_path_buf()), + None, + ) .await; // The pipeline advance should have sent AgentStateChanged events via diff --git a/server/src/main.rs b/server/src/main.rs index e679ff3..b390946 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -127,20 +127,6 @@ async fn main() -> Result<(), std::io::Error> { let app = build_routes(ctx); - // Background reaper: periodically remove completed/failed agent entries - // that have exceeded the TTL. - { - let reaper_agents = Arc::clone(&startup_agents); - let ttl = std::time::Duration::from_secs(agents::DEFAULT_AGENT_TTL_SECS); - tokio::spawn(async move { - // Check every 5 minutes. - let interval = std::time::Duration::from_secs(300); - loop { - tokio::time::sleep(interval).await; - reaper_agents.reap_expired_agents(ttl); - } - }); - } // Optional Matrix bot: connect to the homeserver and start listening for // messages if `.story_kit/bot.toml` is present and enabled. diff --git a/server/src/matrix/bot.rs b/server/src/matrix/bot.rs index 13705ce..59ee2df 100644 --- a/server/src/matrix/bot.rs +++ b/server/src/matrix/bot.rs @@ -25,6 +25,7 @@ pub struct BotContext { pub bot_user_id: OwnedUserId, pub target_room_id: OwnedRoomId, pub project_root: PathBuf, + pub allowed_users: Vec, } /// Connect to the Matrix homeserver, join the configured room, and start @@ -73,10 +74,24 @@ pub async fn run_bot(config: BotConfig, project_root: PathBuf) -> Result<(), Str Err(_) => slog!("[matrix-bot] Join room timed out (may already be a member)"), } + if config.allowed_users.is_empty() { + return Err( + "allowed_users is empty in bot.toml — refusing to start (fail-closed). \ + Add at least one Matrix user ID to allowed_users." + .to_string(), + ); + } + + slog!( + "[matrix-bot] Allowed users: {:?}", + config.allowed_users + ); + let ctx = BotContext { bot_user_id, target_room_id, project_root, + allowed_users: config.allowed_users, }; // Register event handler and inject shared context @@ -119,6 +134,15 @@ async fn on_room_message( return; } + // Only respond to users on the allowlist (fail-closed) + if !ctx.allowed_users.iter().any(|u| u == ev.sender.as_str()) { + slog!( + "[matrix-bot] Ignoring message from unauthorised user: {}", + ev.sender + ); + return; + } + // Only handle plain text messages let MessageType::Text(text_content) = ev.content.msgtype else { return; diff --git a/server/src/matrix/config.rs b/server/src/matrix/config.rs index d586ee1..a27cf4f 100644 --- a/server/src/matrix/config.rs +++ b/server/src/matrix/config.rs @@ -15,6 +15,10 @@ pub struct BotConfig { /// Set to `true` to enable the bot (default: false) #[serde(default)] pub enabled: bool, + /// Matrix user IDs allowed to interact with the bot. + /// If empty or omitted, the bot ignores ALL messages (fail-closed). + #[serde(default)] + pub allowed_users: Vec, /// Previously used to select an Anthropic model. Now ignored — the bot /// uses Claude Code which manages its own model selection. Kept for /// backwards compatibility so existing bot.toml files still parse.