From c600b94f4e9e3d94dfcfca82f928f2b89f650b11 Mon Sep 17 00:00:00 2001 From: dave Date: Mon, 27 Apr 2026 01:32:38 +0000 Subject: [PATCH] chore: remove dangling orphan files accidentally added in b340aa97 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit server/src/agents/pool/lifecycle.rs and server/src/chat/transport/matrix/notifications.rs were untracked leftovers from an abandoned WIP stash that 'git add -A' picked up. Neither is declared as a mod anywhere — they're dangling code that doesn't get compiled but pollutes the tree. --- server/src/agents/pool/lifecycle.rs | 1766 ----------------- .../chat/transport/matrix/notifications.rs | 931 --------- test_fail.txt | 0 3 files changed, 2697 deletions(-) delete mode 100644 server/src/agents/pool/lifecycle.rs delete mode 100644 server/src/chat/transport/matrix/notifications.rs create mode 100644 test_fail.txt diff --git a/server/src/agents/pool/lifecycle.rs b/server/src/agents/pool/lifecycle.rs deleted file mode 100644 index 55c2ad6c..00000000 --- a/server/src/agents/pool/lifecycle.rs +++ /dev/null @@ -1,1766 +0,0 @@ -use crate::agent_log::AgentLogWriter; -use crate::config::ProjectConfig; -use crate::slog; -use crate::slog_error; -use std::path::Path; -use std::sync::{Arc, Mutex}; -use tokio::sync::broadcast; - -use super::super::{ - AgentEvent, AgentInfo, AgentStatus, PipelineStage, agent_config_stage, - pipeline_stage, -}; -use super::types::{PendingGuard, StoryAgent, agent_info_from_entry, composite_key}; -use super::{AgentPool, auto_assign}; -use super::worktree::find_active_story_stage; -use super::super::runtime::{AgentRuntime, ClaudeCodeRuntime, GeminiRuntime, OpenAiRuntime, RuntimeContext}; - -impl AgentPool { - /// Start an agent for a story: load config, create worktree, spawn agent. - /// - /// When `agent_name` is `None`, automatically selects the first idle coder - /// agent (story 190). If all coders are busy the call fails with an error - /// indicating the story will be picked up when one becomes available. - /// - /// If `resume_context` is provided, it is appended to the rendered prompt - /// so the agent can pick up from a previous failed attempt. - pub async fn start_agent( - &self, - project_root: &Path, - story_id: &str, - agent_name: Option<&str>, - resume_context: Option<&str>, - ) -> Result { - let config = ProjectConfig::load(project_root)?; - - // Validate explicit agent name early (no lock needed). - if let Some(name) = agent_name { - config - .find_agent(name) - .ok_or_else(|| format!("No agent named '{name}' in config"))?; - } - - // Create name-independent shared resources before the lock so they are - // ready for the atomic check-and-insert (story 132). - let (tx, _) = broadcast::channel::(1024); - let event_log: Arc>> = Arc::new(Mutex::new(Vec::new())); - let log_session_id = uuid::Uuid::new_v4().to_string(); - - // Move story from backlog/ to current/ before checking agent - // availability so that auto_assign_available_work can pick it up even - // when all coders are currently busy (story 203). This is idempotent: - // if the story is already in 2_current/ or a later stage, the call is - // a no-op. - crate::agents::lifecycle::move_story_to_current(project_root, story_id)?; - - // Validate that the agent's configured stage matches the story's - // pipeline stage. This prevents any caller (auto-assign, MCP tool, - // pipeline advance, supervisor) from starting a wrong-stage agent on - // a story — e.g. mergemaster on a coding-stage story (bug 312). 
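-        // Agents whose configured stage resolves to `PipelineStage::Other`
-        // (supervisors, unknown names) are exempt from this guard, as are
-        // stories that are not currently in any active stage directory.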
- if let Some(name) = agent_name { - let agent_stage = config - .find_agent(name) - .map(agent_config_stage) - .unwrap_or_else(|| pipeline_stage(name)); - if agent_stage != PipelineStage::Other - && let Some(story_stage_dir) = find_active_story_stage(project_root, story_id) - { - let expected_stage = match story_stage_dir { - "2_current" => PipelineStage::Coder, - "3_qa" => PipelineStage::Qa, - "4_merge" => PipelineStage::Mergemaster, - _ => PipelineStage::Other, - }; - if expected_stage != PipelineStage::Other && expected_stage != agent_stage { - return Err(format!( - "Agent '{name}' (stage: {agent_stage:?}) cannot be assigned to \ - story '{story_id}' in {story_stage_dir}/ (requires stage: {expected_stage:?})" - )); - } - } - } - - // Read the preferred agent from the story's front matter before acquiring - // the lock. When no explicit agent_name is given, this lets start_agent - // honour `agent: coder-opus` written by the `assign` command — mirroring - // the auto_assign path (bug 379). - let front_matter_agent: Option = if agent_name.is_none() { - find_active_story_stage(project_root, story_id).and_then(|stage_dir| { - let path = project_root - .join(".storkit") - .join("work") - .join(stage_dir) - .join(format!("{story_id}.md")); - let contents = std::fs::read_to_string(path).ok()?; - crate::io::story_metadata::parse_front_matter(&contents).ok()?.agent - }) - } else { - None - }; - - // Atomically resolve agent name, check availability, and register as - // Pending. When `agent_name` is `None` the first idle coder is - // selected inside the lock so no TOCTOU race can occur between the - // availability check and the Pending insert (story 132, story 190). - // - // The `PendingGuard` ensures that if any step below fails the entry is - // removed from the pool so it does not permanently block auto-assign - // (bug 118). - let resolved_name: String; - let key: String; - { - let mut agents = self.agents.lock().map_err(|e| e.to_string())?; - - resolved_name = match agent_name { - Some(name) => name.to_string(), - None => { - // Honour the `agent:` field in the story's front matter so that - // `start 368` after `assign 368 opus` picks the right agent - // (bug 379). Mirrors the auto_assign selection logic. - if let Some(ref pref) = front_matter_agent { - let stage_matches = config - .find_agent(pref) - .map(|cfg| agent_config_stage(cfg) == PipelineStage::Coder) - .unwrap_or(false); - if stage_matches { - if auto_assign::is_agent_free(&agents, pref) { - pref.clone() - } else { - return Err(format!( - "Preferred agent '{pref}' from story front matter is busy; \ - story '{story_id}' has been queued in work/2_current/ and will \ - be auto-assigned when it becomes available" - )); - } - } else { - // Stage mismatch — fall back to any free coder. - auto_assign::find_free_agent_for_stage( - &config, - &agents, - &PipelineStage::Coder, - ) - .map(|s| s.to_string()) - .ok_or_else(|| { - if config - .agent - .iter() - .any(|a| agent_config_stage(a) == PipelineStage::Coder) - { - format!( - "All coder agents are busy; story '{story_id}' has been \ - queued in work/2_current/ and will be auto-assigned when \ - one becomes available" - ) - } else { - "No coder agent configured. Specify an agent_name explicitly." - .to_string() - } - })? 
- } - } else { - auto_assign::find_free_agent_for_stage( - &config, - &agents, - &PipelineStage::Coder, - ) - .map(|s| s.to_string()) - .ok_or_else(|| { - if config - .agent - .iter() - .any(|a| agent_config_stage(a) == PipelineStage::Coder) - { - format!( - "All coder agents are busy; story '{story_id}' has been \ - queued in work/2_current/ and will be auto-assigned when \ - one becomes available" - ) - } else { - "No coder agent configured. Specify an agent_name explicitly." - .to_string() - } - })? - } - } - }; - - key = composite_key(story_id, &resolved_name); - - // Check for duplicate assignment (same story + same agent already active). - if let Some(agent) = agents.get(&key) - && (agent.status == AgentStatus::Running || agent.status == AgentStatus::Pending) - { - return Err(format!( - "Agent '{resolved_name}' for story '{story_id}' is already {}", - agent.status - )); - } - // Enforce single-stage concurrency: reject if there is already a - // Running/Pending agent at the same pipeline stage for this story. - // This prevents two coders (or two QA/mergemaster agents) from - // corrupting each other's work in the same worktree. - // Applies to both explicit and auto-selected agents; the Other - // stage (supervisors, unknown agents) is exempt. - let resolved_stage = config - .find_agent(&resolved_name) - .map(agent_config_stage) - .unwrap_or_else(|| pipeline_stage(&resolved_name)); - if resolved_stage != PipelineStage::Other - && let Some(conflicting_name) = agents.iter().find_map(|(k, a)| { - let k_story = k.rsplit_once(':').map(|(s, _)| s).unwrap_or(k); - if k_story == story_id - && a.agent_name != resolved_name - && matches!(a.status, AgentStatus::Running | AgentStatus::Pending) - { - let a_stage = config - .find_agent(&a.agent_name) - .map(agent_config_stage) - .unwrap_or_else(|| pipeline_stage(&a.agent_name)); - if a_stage == resolved_stage { - Some(a.agent_name.clone()) - } else { - None - } - } else { - None - } - }) - { - return Err(format!( - "Cannot start '{resolved_name}' on story '{story_id}': \ - '{conflicting_name}' is already active at the same pipeline stage" - )); - } - // Enforce single-instance concurrency for explicitly-named agents: - // if this agent is already running on any other story, reject. - // Auto-selected agents are already guaranteed idle by - // find_free_agent_for_stage, so this check is only needed for - // explicit requests. - if agent_name.is_some() - && let Some(busy_story) = agents.iter().find_map(|(k, a)| { - if a.agent_name == resolved_name - && matches!(a.status, AgentStatus::Running | AgentStatus::Pending) - { - Some( - k.rsplit_once(':') - .map(|(sid, _)| sid) - .unwrap_or(k) - .to_string(), - ) - } else { - None - } - }) - { - return Err(format!( - "Agent '{resolved_name}' is already running on story '{busy_story}'; \ - story '{story_id}' will be picked up when the agent becomes available" - )); - } - agents.insert( - key.clone(), - StoryAgent { - agent_name: resolved_name.clone(), - status: AgentStatus::Pending, - worktree_info: None, - session_id: None, - tx: tx.clone(), - task_handle: None, - event_log: event_log.clone(), - completion: None, - project_root: Some(project_root.to_path_buf()), - log_session_id: Some(log_session_id.clone()), - merge_failure_reported: false, - throttled: false, - }, - ); - } - let mut pending_guard = PendingGuard::new(self.agents.clone(), key.clone()); - - // Create persistent log writer (needs resolved_name, so must be after - // the atomic resolution above). 
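-        // Log-writer creation is best-effort: on failure the agent still
-        // starts, it simply runs without a persistent on-disk log.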
- let log_writer = - match AgentLogWriter::new(project_root, story_id, &resolved_name, &log_session_id) { - Ok(w) => Some(Arc::new(Mutex::new(w))), - Err(e) => { - eprintln!( - "[agents] Failed to create log writer for {story_id}:{resolved_name}: {e}" - ); - None - } - }; - - // Notify WebSocket clients that a new agent is pending. - Self::notify_agent_state_changed(&self.watcher_tx); - - let _ = tx.send(AgentEvent::Status { - story_id: story_id.to_string(), - agent_name: resolved_name.clone(), - status: "pending".to_string(), - }); - - // Extract inactivity timeout from the agent config before cloning config. - let inactivity_timeout_secs = config - .find_agent(&resolved_name) - .map(|a| a.inactivity_timeout_secs) - .unwrap_or(300); - - // Clone all values needed inside the background spawn. - let project_root_clone = project_root.to_path_buf(); - let config_clone = config.clone(); - let resume_context_owned = resume_context.map(str::to_string); - let sid = story_id.to_string(); - let aname = resolved_name.clone(); - let tx_clone = tx.clone(); - let agents_ref = self.agents.clone(); - let key_clone = key.clone(); - let log_clone = event_log.clone(); - let port_for_task = self.port; - let log_writer_clone = log_writer.clone(); - let child_killers_clone = self.child_killers.clone(); - let watcher_tx_clone = self.watcher_tx.clone(); - - // Spawn the background task. Worktree creation and agent launch happen here - // so `start_agent` returns immediately after registering the agent as - // Pending — non-blocking by design (story 157). - let handle = tokio::spawn(async move { - // Step 1: create the worktree (slow — git checkout, pnpm install, etc.) - let wt_info = match crate::worktree::create_worktree( - &project_root_clone, - &sid, - &config_clone, - port_for_task, - ) - .await - { - Ok(wt) => wt, - Err(e) => { - let error_msg = format!("Failed to create worktree: {e}"); - slog_error!("[agents] {error_msg}"); - let event = AgentEvent::Error { - story_id: sid.clone(), - agent_name: aname.clone(), - message: error_msg, - }; - if let Ok(mut log) = log_clone.lock() { - log.push(event.clone()); - } - let _ = tx_clone.send(event); - if let Ok(mut agents) = agents_ref.lock() - && let Some(agent) = agents.get_mut(&key_clone) - { - agent.status = AgentStatus::Failed; - } - AgentPool::notify_agent_state_changed(&watcher_tx_clone); - return; - } - }; - - // Step 2: store worktree info and render agent command/args/prompt. - let wt_path_str = wt_info.path.to_string_lossy().to_string(); - { - if let Ok(mut agents) = agents_ref.lock() - && let Some(agent) = agents.get_mut(&key_clone) - { - agent.worktree_info = Some(wt_info.clone()); - } - } - - let (command, args, mut prompt) = match config_clone.render_agent_args( - &wt_path_str, - &sid, - Some(&aname), - Some(&wt_info.base_branch), - ) { - Ok(result) => result, - Err(e) => { - let error_msg = format!("Failed to render agent args: {e}"); - slog_error!("[agents] {error_msg}"); - let event = AgentEvent::Error { - story_id: sid.clone(), - agent_name: aname.clone(), - message: error_msg, - }; - if let Ok(mut log) = log_clone.lock() { - log.push(event.clone()); - } - let _ = tx_clone.send(event); - if let Ok(mut agents) = agents_ref.lock() - && let Some(agent) = agents.get_mut(&key_clone) - { - agent.status = AgentStatus::Failed; - } - AgentPool::notify_agent_state_changed(&watcher_tx_clone); - return; - } - }; - - // Append resume context if this is a restart with failure information. 
- if let Some(ctx) = resume_context_owned { - prompt.push_str(&ctx); - } - - // Step 3: transition to Running now that the worktree is ready. - { - if let Ok(mut agents) = agents_ref.lock() - && let Some(agent) = agents.get_mut(&key_clone) - { - agent.status = AgentStatus::Running; - } - } - let _ = tx_clone.send(AgentEvent::Status { - story_id: sid.clone(), - agent_name: aname.clone(), - status: "running".to_string(), - }); - AgentPool::notify_agent_state_changed(&watcher_tx_clone); - - // Step 4: launch the agent process via the configured runtime. - let runtime_name = config_clone - .find_agent(&aname) - .and_then(|a| a.runtime.as_deref()) - .unwrap_or("claude-code"); - - let run_result = match runtime_name { - "claude-code" => { - let runtime = ClaudeCodeRuntime::new(child_killers_clone.clone(), watcher_tx_clone.clone()); - let ctx = RuntimeContext { - story_id: sid.clone(), - agent_name: aname.clone(), - command, - args, - prompt, - cwd: wt_path_str, - inactivity_timeout_secs, - mcp_port: port_for_task, - }; - runtime - .start(ctx, tx_clone.clone(), log_clone.clone(), log_writer_clone) - .await - } - "gemini" => { - let runtime = GeminiRuntime::new(); - let ctx = RuntimeContext { - story_id: sid.clone(), - agent_name: aname.clone(), - command, - args, - prompt, - cwd: wt_path_str, - inactivity_timeout_secs, - mcp_port: port_for_task, - }; - runtime - .start(ctx, tx_clone.clone(), log_clone.clone(), log_writer_clone) - .await - } - "openai" => { - let runtime = OpenAiRuntime::new(); - let ctx = RuntimeContext { - story_id: sid.clone(), - agent_name: aname.clone(), - command, - args, - prompt, - cwd: wt_path_str, - inactivity_timeout_secs, - mcp_port: port_for_task, - }; - runtime - .start(ctx, tx_clone.clone(), log_clone.clone(), log_writer_clone) - .await - } - other => Err(format!( - "Unknown agent runtime '{other}'; check the 'runtime' field in project.toml. \ - Supported: 'claude-code', 'gemini', 'openai'" - )), - }; - - match run_result { - Ok(result) => { - // Persist token usage if the agent reported it. - if let Some(ref usage) = result.token_usage - && let Ok(agents) = agents_ref.lock() - && let Some(agent) = agents.get(&key_clone) - && let Some(ref pr) = agent.project_root - { - let model = config_clone - .find_agent(&aname) - .and_then(|a| a.model.clone()); - let record = crate::agents::token_usage::build_record( - &sid, &aname, model, usage.clone(), - ); - if let Err(e) = crate::agents::token_usage::append_record(pr, &record) { - slog_error!( - "[agents] Failed to persist token usage for \ - {sid}:{aname}: {e}" - ); - } - } - - // Server-owned completion: run acceptance gates automatically - // when the agent process exits normally. - super::pipeline::run_server_owned_completion( - &agents_ref, - port_for_task, - &sid, - &aname, - result.session_id, - watcher_tx_clone.clone(), - ) - .await; - AgentPool::notify_agent_state_changed(&watcher_tx_clone); - } - Err(e) => { - slog_error!("[agents] Agent process error for {aname} on {sid}: {e}"); - let event = AgentEvent::Error { - story_id: sid.clone(), - agent_name: aname.clone(), - message: e, - }; - if let Ok(mut log) = log_clone.lock() { - log.push(event.clone()); - } - let _ = tx_clone.send(event); - if let Ok(mut agents) = agents_ref.lock() - && let Some(agent) = agents.get_mut(&key_clone) - { - agent.status = AgentStatus::Failed; - } - AgentPool::notify_agent_state_changed(&watcher_tx_clone); - } - } - }); - - // Store the task handle while the agent is still Pending. 
- { - let mut agents = self.agents.lock().map_err(|e| e.to_string())?; - if let Some(agent) = agents.get_mut(&key) { - agent.task_handle = Some(handle); - } - } - - // Agent successfully spawned — prevent the guard from removing the entry. - pending_guard.disarm(); - - Ok(AgentInfo { - story_id: story_id.to_string(), - agent_name: resolved_name, - status: AgentStatus::Pending, - session_id: None, - worktree_path: None, - base_branch: None, - completion: None, - log_session_id: Some(log_session_id), - throttled: false, - }) - } - - /// Stop a running agent. Worktree is preserved for inspection. - pub async fn stop_agent( - &self, - _project_root: &Path, - story_id: &str, - agent_name: &str, - ) -> Result<(), String> { - let key = composite_key(story_id, agent_name); - - let (worktree_info, task_handle, tx) = { - let mut agents = self.agents.lock().map_err(|e| e.to_string())?; - let agent = agents - .get_mut(&key) - .ok_or_else(|| format!("No agent '{agent_name}' for story '{story_id}'"))?; - - let wt = agent.worktree_info.clone(); - let handle = agent.task_handle.take(); - let tx = agent.tx.clone(); - agent.status = AgentStatus::Failed; - (wt, handle, tx) - }; - - // Abort the task and kill the PTY child process. - // Note: aborting a spawn_blocking task handle does not interrupt the blocking - // thread, so we must also kill the child process directly via the killer registry. - if let Some(handle) = task_handle { - handle.abort(); - let _ = handle.await; - } - self.kill_child_for_key(&key); - - // Preserve worktree for inspection — don't destroy agent's work on stop. - if let Some(ref wt) = worktree_info { - slog!( - "[agents] Worktree preserved for {story_id}:{agent_name}: {}", - wt.path.display() - ); - } - - let _ = tx.send(AgentEvent::Status { - story_id: story_id.to_string(), - agent_name: agent_name.to_string(), - status: "stopped".to_string(), - }); - - // Remove from map - { - let mut agents = self.agents.lock().map_err(|e| e.to_string())?; - agents.remove(&key); - } - - // Notify WebSocket clients so pipeline board and agent panel update. - Self::notify_agent_state_changed(&self.watcher_tx); - - Ok(()) - } - - /// Block until the agent reaches a terminal state (completed, failed, stopped). - /// Returns the agent's final `AgentInfo`. - /// `timeout_ms` caps how long to wait; returns an error if the deadline passes. - pub async fn wait_for_agent( - &self, - story_id: &str, - agent_name: &str, - timeout_ms: u64, - ) -> Result { - // Subscribe before checking status so we don't miss the terminal event - // if the agent completes in the window between the two operations. - let mut rx = self.subscribe(story_id, agent_name)?; - - // Return immediately if already in a terminal state. - { - let agents = self.agents.lock().map_err(|e| e.to_string())?; - let key = composite_key(story_id, agent_name); - if let Some(agent) = agents.get(&key) - && matches!(agent.status, AgentStatus::Completed | AgentStatus::Failed) - { - return Ok(agent_info_from_entry(story_id, agent)); - } - } - - let deadline = tokio::time::Instant::now() + std::time::Duration::from_millis(timeout_ms); - - loop { - let remaining = deadline.saturating_duration_since(tokio::time::Instant::now()); - if remaining.is_zero() { - return Err(format!( - "Timed out after {timeout_ms}ms waiting for agent '{agent_name}' on story '{story_id}'" - )); - } - - match tokio::time::timeout(remaining, rx.recv()).await { - Ok(Ok(event)) => { - let is_terminal = match &event { - AgentEvent::Done { .. } | AgentEvent::Error { .. 
} => true, - AgentEvent::Status { status, .. } if status == "stopped" => true, - _ => false, - }; - if is_terminal { - let agents = self.agents.lock().map_err(|e| e.to_string())?; - let key = composite_key(story_id, agent_name); - return Ok(if let Some(agent) = agents.get(&key) { - agent_info_from_entry(story_id, agent) - } else { - // Agent was removed from map (e.g. stop_agent removes it after - // the "stopped" status event is sent). - let (status, session_id) = match event { - AgentEvent::Done { session_id, .. } => { - (AgentStatus::Completed, session_id) - } - _ => (AgentStatus::Failed, None), - }; - AgentInfo { - story_id: story_id.to_string(), - agent_name: agent_name.to_string(), - status, - session_id, - worktree_path: None, - base_branch: None, - completion: None, - log_session_id: None, - throttled: false, - } - }); - } - } - Ok(Err(broadcast::error::RecvError::Lagged(_))) => { - // Missed some buffered events — check current status before resuming. - let agents = self.agents.lock().map_err(|e| e.to_string())?; - let key = composite_key(story_id, agent_name); - if let Some(agent) = agents.get(&key) - && matches!(agent.status, AgentStatus::Completed | AgentStatus::Failed) - { - return Ok(agent_info_from_entry(story_id, agent)); - } - // Still running — continue the loop. - } - Ok(Err(broadcast::error::RecvError::Closed)) => { - // Channel closed: no more events will arrive. Return current state. - let agents = self.agents.lock().map_err(|e| e.to_string())?; - let key = composite_key(story_id, agent_name); - if let Some(agent) = agents.get(&key) { - return Ok(agent_info_from_entry(story_id, agent)); - } - return Err(format!( - "Agent '{agent_name}' for story '{story_id}' channel closed unexpectedly" - )); - } - Err(_) => { - return Err(format!( - "Timed out after {timeout_ms}ms waiting for agent '{agent_name}' on story '{story_id}'" - )); - } - } - } - } - - /// Remove all agent entries for a given story_id from the pool. - /// - /// Called when a story is archived so that stale entries don't accumulate. - /// Returns the number of entries removed. 
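-    /// Entries are dropped without aborting their task handles, so this is
-    /// meant for stories whose agents have already reached a terminal state.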
- pub fn remove_agents_for_story(&self, story_id: &str) -> usize { - let mut agents = match self.agents.lock() { - Ok(a) => a, - Err(e) => { - slog_error!("[agents] Failed to lock pool for cleanup of '{story_id}': {e}"); - return 0; - } - }; - let prefix = format!("{story_id}:"); - let keys_to_remove: Vec = agents - .keys() - .filter(|k| k.starts_with(&prefix)) - .cloned() - .collect(); - let count = keys_to_remove.len(); - for key in &keys_to_remove { - agents.remove(key); - } - if count > 0 { - slog!("[agents] Removed {count} agent entries for archived story '{story_id}'"); - } - count - } -} - -#[cfg(test)] -mod tests { - use super::super::AgentPool; - use crate::agents::{AgentEvent, AgentStatus}; - - #[tokio::test] - async fn wait_for_agent_returns_immediately_if_completed() { - let pool = AgentPool::new_test(3001); - pool.inject_test_agent("s1", "bot", AgentStatus::Completed); - - let info = pool.wait_for_agent("s1", "bot", 1000).await.unwrap(); - assert_eq!(info.status, AgentStatus::Completed); - assert_eq!(info.story_id, "s1"); - assert_eq!(info.agent_name, "bot"); - } - - #[tokio::test] - async fn wait_for_agent_returns_immediately_if_failed() { - let pool = AgentPool::new_test(3001); - pool.inject_test_agent("s2", "bot", AgentStatus::Failed); - - let info = pool.wait_for_agent("s2", "bot", 1000).await.unwrap(); - assert_eq!(info.status, AgentStatus::Failed); - } - - #[tokio::test] - async fn wait_for_agent_completes_on_done_event() { - let pool = AgentPool::new_test(3001); - let tx = pool.inject_test_agent("s3", "bot", AgentStatus::Running); - - // Send Done event after a short delay - let tx_clone = tx.clone(); - tokio::spawn(async move { - tokio::time::sleep(std::time::Duration::from_millis(50)).await; - let _ = tx_clone.send(AgentEvent::Done { - story_id: "s3".to_string(), - agent_name: "bot".to_string(), - session_id: Some("sess-abc".to_string()), - }); - }); - - let info = pool.wait_for_agent("s3", "bot", 2000).await.unwrap(); - assert_eq!(info.story_id, "s3"); - } - - #[tokio::test] - async fn wait_for_agent_times_out() { - let pool = AgentPool::new_test(3001); - pool.inject_test_agent("s4", "bot", AgentStatus::Running); - - let result = pool.wait_for_agent("s4", "bot", 50).await; - assert!(result.is_err()); - let msg = result.unwrap_err(); - assert!(msg.contains("Timed out"), "unexpected message: {msg}"); - } - - #[tokio::test] - async fn wait_for_agent_errors_for_nonexistent() { - let pool = AgentPool::new_test(3001); - let result = pool.wait_for_agent("no_story", "no_bot", 100).await; - assert!(result.is_err()); - } - - #[tokio::test] - async fn wait_for_agent_completes_on_stopped_status_event() { - let pool = AgentPool::new_test(3001); - let tx = pool.inject_test_agent("s5", "bot", AgentStatus::Running); - - let tx_clone = tx.clone(); - tokio::spawn(async move { - tokio::time::sleep(std::time::Duration::from_millis(30)).await; - let _ = tx_clone.send(AgentEvent::Status { - story_id: "s5".to_string(), - agent_name: "bot".to_string(), - status: "stopped".to_string(), - }); - }); - - let info = pool.wait_for_agent("s5", "bot", 2000).await.unwrap(); - assert_eq!(info.story_id, "s5"); - } - - #[tokio::test] - async fn start_agent_auto_selects_second_coder_when_first_busy() { - let tmp = tempfile::tempdir().unwrap(); - let sk = tmp.path().join(".storkit"); - std::fs::create_dir_all(&sk).unwrap(); - std::fs::write( - sk.join("project.toml"), - r#" -[[agent]] -name = "supervisor" -stage = "other" - -[[agent]] -name = "coder-1" -stage = "coder" - -[[agent]] -name = "coder-2" 
-stage = "coder" -"#, - ) - .unwrap(); - - let pool = AgentPool::new_test(3001); - pool.inject_test_agent("other-story", "coder-1", AgentStatus::Running); - - let result = pool - .start_agent(tmp.path(), "42_my_story", None, None) - .await; - match result { - Ok(info) => { - assert_eq!(info.agent_name, "coder-2"); - } - Err(err) => { - assert!( - !err.contains("All coder agents are busy"), - "should have selected coder-2 but got: {err}" - ); - assert!( - !err.contains("No coder agent configured"), - "should not fail on agent selection, got: {err}" - ); - } - } - } - - #[tokio::test] - async fn start_agent_returns_busy_when_all_coders_occupied() { - let tmp = tempfile::tempdir().unwrap(); - let sk = tmp.path().join(".storkit"); - std::fs::create_dir_all(&sk).unwrap(); - std::fs::write( - sk.join("project.toml"), - r#" -[[agent]] -name = "coder-1" -stage = "coder" - -[[agent]] -name = "coder-2" -stage = "coder" -"#, - ) - .unwrap(); - - let pool = AgentPool::new_test(3001); - pool.inject_test_agent("story-1", "coder-1", AgentStatus::Running); - pool.inject_test_agent("story-2", "coder-2", AgentStatus::Pending); - - let result = pool.start_agent(tmp.path(), "story-3", None, None).await; - assert!(result.is_err()); - let err = result.unwrap_err(); - assert!( - err.contains("All coder agents are busy"), - "expected busy error, got: {err}" - ); - } - - #[tokio::test] - async fn start_agent_moves_story_to_current_when_coders_busy() { - let tmp = tempfile::tempdir().unwrap(); - let sk = tmp.path().join(".storkit"); - let backlog = sk.join("work/1_backlog"); - std::fs::create_dir_all(&backlog).unwrap(); - std::fs::write( - sk.join("project.toml"), - r#" -[[agent]] -name = "coder-1" -stage = "coder" -"#, - ) - .unwrap(); - std::fs::write(backlog.join("story-3.md"), "---\nname: Story 3\n---\n").unwrap(); - - let pool = AgentPool::new_test(3001); - pool.inject_test_agent("story-1", "coder-1", AgentStatus::Running); - - let result = pool.start_agent(tmp.path(), "story-3", None, None).await; - - assert!(result.is_err()); - let err = result.unwrap_err(); - assert!( - err.contains("All coder agents are busy"), - "expected busy error, got: {err}" - ); - assert!( - err.contains("queued in work/2_current/"), - "expected story-to-current message, got: {err}" - ); - - let current_path = sk.join("work/2_current/story-3.md"); - assert!( - current_path.exists(), - "story should be in 2_current/ after busy error, but was not" - ); - let backlog_path = backlog.join("story-3.md"); - assert!( - !backlog_path.exists(), - "story should no longer be in 1_backlog/" - ); - } - - #[tokio::test] - async fn start_agent_story_already_in_current_is_noop() { - let tmp = tempfile::tempdir().unwrap(); - let sk = tmp.path().join(".storkit"); - let current = sk.join("work/2_current"); - std::fs::create_dir_all(¤t).unwrap(); - std::fs::write( - sk.join("project.toml"), - "[[agent]]\nname = \"coder-1\"\nstage = \"coder\"\n", - ) - .unwrap(); - std::fs::write(current.join("story-5.md"), "---\nname: Story 5\n---\n").unwrap(); - - let pool = AgentPool::new_test(3001); - - let result = pool.start_agent(tmp.path(), "story-5", None, None).await; - match result { - Ok(_) => {} - Err(e) => { - assert!( - !e.contains("Failed to move"), - "should not fail on idempotent move, got: {e}" - ); - } - } - } - - #[tokio::test] - async fn start_agent_explicit_name_unchanged_when_busy() { - let tmp = tempfile::tempdir().unwrap(); - let sk = tmp.path().join(".storkit"); - std::fs::create_dir_all(&sk).unwrap(); - std::fs::write( - 
sk.join("project.toml"), - r#" -[[agent]] -name = "coder-1" -stage = "coder" - -[[agent]] -name = "coder-2" -stage = "coder" -"#, - ) - .unwrap(); - - let pool = AgentPool::new_test(3001); - pool.inject_test_agent("story-1", "coder-1", AgentStatus::Running); - - let result = pool - .start_agent(tmp.path(), "story-2", Some("coder-1"), None) - .await; - assert!(result.is_err()); - let err = result.unwrap_err(); - assert!( - err.contains("coder-1") && err.contains("already running"), - "expected explicit busy error, got: {err}" - ); - } - - // ── start_agent single-instance concurrency tests ───────────────────────── - - #[tokio::test] - async fn start_agent_rejects_when_same_agent_already_running_on_another_story() { - use std::fs; - - let tmp = tempfile::tempdir().unwrap(); - let root = tmp.path(); - - let sk_dir = root.join(".storkit"); - fs::create_dir_all(&sk_dir).unwrap(); - fs::write(sk_dir.join("project.toml"), "[[agent]]\nname = \"qa\"\n").unwrap(); - - let pool = AgentPool::new_test(3001); - pool.inject_test_agent("story-a", "qa", AgentStatus::Running); - - let result = pool.start_agent(root, "story-b", Some("qa"), None).await; - - assert!( - result.is_err(), - "start_agent should fail when qa is already running on another story" - ); - let err = result.unwrap_err(); - assert!( - err.contains("already running") || err.contains("becomes available"), - "error message should explain why: got '{err}'" - ); - } - - #[tokio::test] - async fn start_agent_allows_new_story_when_previous_run_is_completed() { - use std::fs; - - let tmp = tempfile::tempdir().unwrap(); - let root = tmp.path(); - - let sk_dir = root.join(".storkit"); - fs::create_dir_all(&sk_dir).unwrap(); - fs::write(sk_dir.join("project.toml"), "[[agent]]\nname = \"qa\"\n").unwrap(); - - let pool = AgentPool::new_test(3001); - pool.inject_test_agent("story-a", "qa", AgentStatus::Completed); - - let result = pool.start_agent(root, "story-b", Some("qa"), None).await; - - if let Err(ref e) = result { - assert!( - !e.contains("already running") && !e.contains("becomes available"), - "completed agent must not trigger the concurrency guard: got '{e}'" - ); - } - } - - // ── bug 118: pending entry cleanup on start_agent failure ──────────────── - - #[tokio::test] - async fn start_agent_cleans_up_pending_entry_on_failure() { - use std::fs; - - let tmp = tempfile::tempdir().unwrap(); - let root = tmp.path(); - - let sk_dir = root.join(".storkit"); - fs::create_dir_all(&sk_dir).unwrap(); - fs::write( - sk_dir.join("project.toml"), - "[[agent]]\nname = \"coder-1\"\nstage = \"coder\"\n", - ) - .unwrap(); - - let upcoming = root.join(".storkit/work/1_backlog"); - fs::create_dir_all(&upcoming).unwrap(); - fs::write(upcoming.join("50_story_test.md"), "---\nname: Test\n---\n").unwrap(); - - let pool = AgentPool::new_test(3099); - - let result = pool - .start_agent(root, "50_story_test", Some("coder-1"), None) - .await; - - assert!( - result.is_ok(), - "start_agent should return Ok(Pending) immediately: {:?}", - result.err() - ); - assert_eq!( - result.unwrap().status, - AgentStatus::Pending, - "initial status must be Pending" - ); - - let final_info = pool - .wait_for_agent("50_story_test", "coder-1", 5000) - .await - .expect("wait_for_agent should not time out"); - assert_eq!( - final_info.status, - AgentStatus::Failed, - "agent must transition to Failed after worktree creation error" - ); - - let agents = pool.agents.lock().unwrap(); - let failed_entry = agents - .values() - .find(|a| a.agent_name == "coder-1" && a.status == 
AgentStatus::Failed); - assert!( - failed_entry.is_some(), - "agent pool must retain a Failed entry so the UI can show the error state" - ); - drop(agents); - - let events = pool - .drain_events("50_story_test", "coder-1") - .expect("drain_events should succeed"); - let has_error_event = events.iter().any(|e| matches!(e, AgentEvent::Error { .. })); - assert!( - has_error_event, - "event_log must contain AgentEvent::Error after worktree creation fails" - ); - } - - #[tokio::test] - async fn start_agent_guard_does_not_remove_running_entry() { - use std::fs; - - let tmp = tempfile::tempdir().unwrap(); - let root = tmp.path(); - - let sk_dir = root.join(".storkit"); - fs::create_dir_all(&sk_dir).unwrap(); - fs::write(sk_dir.join("project.toml"), "[[agent]]\nname = \"qa\"\n").unwrap(); - - let pool = AgentPool::new_test(3099); - pool.inject_test_agent("story-x", "qa", AgentStatus::Running); - - let result = pool.start_agent(root, "story-y", Some("qa"), None).await; - - assert!(result.is_err()); - let err = result.unwrap_err(); - assert!( - err.contains("already running") || err.contains("becomes available"), - "running entry must survive: got '{err}'" - ); - } - - // ── TOCTOU race-condition regression tests (story 132) ─────────────────── - - #[tokio::test] - async fn toctou_pending_entry_blocks_same_agent_on_different_story() { - use std::fs; - - let tmp = tempfile::tempdir().unwrap(); - let root = tmp.path(); - - let sk_dir = root.join(".storkit"); - fs::create_dir_all(&sk_dir).unwrap(); - fs::write( - sk_dir.join("project.toml"), - "[[agent]]\nname = \"coder-1\"\n", - ) - .unwrap(); - - let pool = AgentPool::new_test(3099); - pool.inject_test_agent("86_story_foo", "coder-1", AgentStatus::Pending); - - let result = pool - .start_agent(root, "130_story_bar", Some("coder-1"), None) - .await; - - assert!(result.is_err(), "second start_agent must be rejected"); - let err = result.unwrap_err(); - assert!( - err.contains("already running") || err.contains("becomes available"), - "expected concurrency-rejection message, got: '{err}'" - ); - } - - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - async fn toctou_concurrent_start_agent_same_agent_exactly_one_concurrency_rejection() { - use std::fs; - use std::sync::Arc; - - let tmp = tempfile::tempdir().unwrap(); - let root = tmp.path().to_path_buf(); - - let sk_dir = root.join(".storkit"); - fs::create_dir_all(sk_dir.join("work/1_backlog")).unwrap(); - fs::write( - root.join(".storkit/project.toml"), - "[[agent]]\nname = \"coder-1\"\n", - ) - .unwrap(); - fs::write( - root.join(".storkit/work/1_backlog/86_story_foo.md"), - "---\nname: Foo\n---\n", - ) - .unwrap(); - fs::write( - root.join(".storkit/work/1_backlog/130_story_bar.md"), - "---\nname: Bar\n---\n", - ) - .unwrap(); - - let pool = Arc::new(AgentPool::new_test(3099)); - - let pool1 = pool.clone(); - let root1 = root.clone(); - let t1 = tokio::spawn(async move { - pool1 - .start_agent(&root1, "86_story_foo", Some("coder-1"), None) - .await - }); - - let pool2 = pool.clone(); - let root2 = root.clone(); - let t2 = tokio::spawn(async move { - pool2 - .start_agent(&root2, "130_story_bar", Some("coder-1"), None) - .await - }); - - let (r1, r2) = tokio::join!(t1, t2); - let r1 = r1.unwrap(); - let r2 = r2.unwrap(); - - let concurrency_rejections = [&r1, &r2] - .iter() - .filter(|r| { - r.as_ref().is_err_and(|e| { - e.contains("already running") || e.contains("becomes available") - }) - }) - .count(); - - assert_eq!( - concurrency_rejections, 1, - "exactly one call must be rejected by 
the concurrency check; \ - got r1={r1:?} r2={r2:?}" - ); - } - - // ── story-230: prevent duplicate stage agents on same story ─────────────── - - #[tokio::test] - async fn start_agent_rejects_second_coder_stage_on_same_story() { - use std::fs; - - let tmp = tempfile::tempdir().unwrap(); - let root = tmp.path(); - - let sk_dir = root.join(".storkit"); - fs::create_dir_all(&sk_dir).unwrap(); - fs::write( - sk_dir.join("project.toml"), - "[[agent]]\nname = \"coder-1\"\n\n[[agent]]\nname = \"coder-2\"\n", - ) - .unwrap(); - - let pool = AgentPool::new_test(3099); - pool.inject_test_agent("42_story_foo", "coder-1", AgentStatus::Running); - - let result = pool - .start_agent(root, "42_story_foo", Some("coder-2"), None) - .await; - - assert!( - result.is_err(), - "second coder on same story must be rejected" - ); - let err = result.unwrap_err(); - assert!( - err.contains("same pipeline stage"), - "error must mention same pipeline stage, got: '{err}'" - ); - assert!( - err.contains("coder-1") && err.contains("coder-2"), - "error must name both agents, got: '{err}'" - ); - } - - #[tokio::test] - async fn start_agent_rejects_second_qa_stage_on_same_story() { - use std::fs; - - let tmp = tempfile::tempdir().unwrap(); - let root = tmp.path(); - - let sk_dir = root.join(".storkit"); - fs::create_dir_all(&sk_dir).unwrap(); - fs::write( - sk_dir.join("project.toml"), - "[[agent]]\nname = \"qa-1\"\nstage = \"qa\"\n\n\ - [[agent]]\nname = \"qa-2\"\nstage = \"qa\"\n", - ) - .unwrap(); - - let pool = AgentPool::new_test(3099); - pool.inject_test_agent("55_story_bar", "qa-1", AgentStatus::Running); - - let result = pool - .start_agent(root, "55_story_bar", Some("qa-2"), None) - .await; - - assert!(result.is_err(), "second qa on same story must be rejected"); - let err = result.unwrap_err(); - assert!( - err.contains("same pipeline stage"), - "error must mention same pipeline stage, got: '{err}'" - ); - } - - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - async fn start_agent_concurrent_two_coders_same_story_exactly_one_stage_rejection() { - use std::fs; - use std::sync::Arc; - - let tmp = tempfile::tempdir().unwrap(); - let root = tmp.path().to_path_buf(); - - let sk_dir = root.join(".storkit"); - fs::create_dir_all(sk_dir.join("work/2_current")).unwrap(); - fs::write( - root.join(".storkit/project.toml"), - "[[agent]]\nname = \"coder-1\"\n\n[[agent]]\nname = \"coder-2\"\n", - ) - .unwrap(); - fs::write( - root.join(".storkit/work/2_current/42_story_foo.md"), - "---\nname: Foo\n---\n", - ) - .unwrap(); - - let pool = Arc::new(AgentPool::new_test(3099)); - - let pool1 = pool.clone(); - let root1 = root.clone(); - let t1 = tokio::spawn(async move { - pool1 - .start_agent(&root1, "42_story_foo", Some("coder-1"), None) - .await - }); - - let pool2 = pool.clone(); - let root2 = root.clone(); - let t2 = tokio::spawn(async move { - pool2 - .start_agent(&root2, "42_story_foo", Some("coder-2"), None) - .await - }); - - let (r1, r2) = tokio::join!(t1, t2); - let r1 = r1.unwrap(); - let r2 = r2.unwrap(); - - let stage_rejections = [&r1, &r2] - .iter() - .filter(|r| r.as_ref().is_err_and(|e| e.contains("same pipeline stage"))) - .count(); - - assert_eq!( - stage_rejections, 1, - "exactly one call must be rejected by the stage-conflict check; \ - got r1={r1:?} r2={r2:?}" - ); - } - - #[tokio::test] - async fn start_agent_two_coders_different_stories_not_blocked_by_stage_check() { - use std::fs; - - let tmp = tempfile::tempdir().unwrap(); - let root = tmp.path(); - - let sk_dir = root.join(".storkit"); - 
fs::create_dir_all(sk_dir.join("work/1_backlog")).unwrap(); - fs::write( - root.join(".storkit/project.toml"), - "[[agent]]\nname = \"coder-1\"\n\n[[agent]]\nname = \"coder-2\"\n", - ) - .unwrap(); - fs::write( - root.join(".storkit/work/1_backlog/99_story_baz.md"), - "---\nname: Baz\n---\n", - ) - .unwrap(); - - let pool = AgentPool::new_test(3099); - pool.inject_test_agent("42_story_foo", "coder-1", AgentStatus::Running); - - let result = pool - .start_agent(root, "99_story_baz", Some("coder-2"), None) - .await; - - if let Err(ref e) = result { - assert!( - !e.contains("same pipeline stage"), - "stage-conflict guard must not fire for agents on different stories; \ - got: '{e}'" - ); - } - } - - // ── bug 312: stage-pipeline mismatch guard in start_agent ────────────── - - #[tokio::test] - async fn start_agent_rejects_mergemaster_on_coding_stage_story() { - use std::fs; - - let tmp = tempfile::tempdir().unwrap(); - let root = tmp.path(); - - let sk_dir = root.join(".storkit"); - fs::create_dir_all(sk_dir.join("work/2_current")).unwrap(); - fs::write( - sk_dir.join("project.toml"), - "[[agent]]\nname = \"coder-1\"\nstage = \"coder\"\n\n\ - [[agent]]\nname = \"mergemaster\"\nstage = \"mergemaster\"\n", - ) - .unwrap(); - fs::write( - sk_dir.join("work/2_current/310_story_foo.md"), - "---\nname: Foo\n---\n", - ) - .unwrap(); - - let pool = AgentPool::new_test(3099); - let result = pool - .start_agent(root, "310_story_foo", Some("mergemaster"), None) - .await; - - assert!( - result.is_err(), - "mergemaster must not be assigned to a story in 2_current/" - ); - let err = result.unwrap_err(); - assert!( - err.contains("stage") && err.contains("2_current"), - "error must mention stage mismatch, got: '{err}'" - ); - } - - #[tokio::test] - async fn start_agent_rejects_coder_on_qa_stage_story() { - use std::fs; - - let tmp = tempfile::tempdir().unwrap(); - let root = tmp.path(); - - let sk_dir = root.join(".storkit"); - fs::create_dir_all(sk_dir.join("work/3_qa")).unwrap(); - fs::write( - sk_dir.join("project.toml"), - "[[agent]]\nname = \"coder-1\"\nstage = \"coder\"\n\n\ - [[agent]]\nname = \"qa\"\nstage = \"qa\"\n", - ) - .unwrap(); - fs::write( - sk_dir.join("work/3_qa/42_story_bar.md"), - "---\nname: Bar\n---\n", - ) - .unwrap(); - - let pool = AgentPool::new_test(3099); - let result = pool - .start_agent(root, "42_story_bar", Some("coder-1"), None) - .await; - - assert!( - result.is_err(), - "coder must not be assigned to a story in 3_qa/" - ); - let err = result.unwrap_err(); - assert!( - err.contains("stage") && err.contains("3_qa"), - "error must mention stage mismatch, got: '{err}'" - ); - } - - #[tokio::test] - async fn start_agent_rejects_qa_on_merge_stage_story() { - use std::fs; - - let tmp = tempfile::tempdir().unwrap(); - let root = tmp.path(); - - let sk_dir = root.join(".storkit"); - fs::create_dir_all(sk_dir.join("work/4_merge")).unwrap(); - fs::write( - sk_dir.join("project.toml"), - "[[agent]]\nname = \"qa\"\nstage = \"qa\"\n\n\ - [[agent]]\nname = \"mergemaster\"\nstage = \"mergemaster\"\n", - ) - .unwrap(); - fs::write( - sk_dir.join("work/4_merge/55_story_baz.md"), - "---\nname: Baz\n---\n", - ) - .unwrap(); - - let pool = AgentPool::new_test(3099); - let result = pool - .start_agent(root, "55_story_baz", Some("qa"), None) - .await; - - assert!( - result.is_err(), - "qa must not be assigned to a story in 4_merge/" - ); - let err = result.unwrap_err(); - assert!( - err.contains("stage") && err.contains("4_merge"), - "error must mention stage mismatch, got: '{err}'" - ); - } - 
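-    // A minimal positive-path sketch for the coder/2_current mapping,
-    // mirroring the fixtures above; the story id is illustrative. As with
-    // the other start_agent tests, infrastructure errors (no git repo in
-    // the tempdir) are tolerated; only the stage guard must not fire.
-    #[tokio::test]
-    async fn start_agent_allows_coder_on_coding_stage_story() {
-        use std::fs;
-
-        let tmp = tempfile::tempdir().unwrap();
-        let root = tmp.path();
-
-        let sk_dir = root.join(".storkit");
-        fs::create_dir_all(sk_dir.join("work/2_current")).unwrap();
-        fs::write(
-            sk_dir.join("project.toml"),
-            "[[agent]]\nname = \"coder-1\"\nstage = \"coder\"\n",
-        )
-        .unwrap();
-        fs::write(
-            sk_dir.join("work/2_current/311_story_ok.md"),
-            "---\nname: OK\n---\n",
-        )
-        .unwrap();
-
-        let pool = AgentPool::new_test(3099);
-        let result = pool
-            .start_agent(root, "311_story_ok", Some("coder-1"), None)
-            .await;
-
-        if let Err(e) = result {
-            assert!(
-                !e.contains("cannot be assigned"),
-                "coder on 2_current/ story should not fail stage check, got: '{e}'"
-            );
-        }
-    }
-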
- #[tokio::test] - async fn start_agent_allows_supervisor_on_any_stage() { - use std::fs; - - let tmp = tempfile::tempdir().unwrap(); - let root = tmp.path(); - - let sk_dir = root.join(".storkit"); - fs::create_dir_all(sk_dir.join("work/2_current")).unwrap(); - fs::write( - sk_dir.join("project.toml"), - "[[agent]]\nname = \"supervisor\"\nstage = \"other\"\n", - ) - .unwrap(); - fs::write( - sk_dir.join("work/2_current/77_story_sup.md"), - "---\nname: Sup\n---\n", - ) - .unwrap(); - - let pool = AgentPool::new_test(3099); - let result = pool - .start_agent(root, "77_story_sup", Some("supervisor"), None) - .await; - - match result { - Ok(_) => {} - Err(e) => { - assert!( - !e.contains("stage:") || !e.contains("cannot be assigned"), - "supervisor should not be rejected for stage mismatch, got: '{e}'" - ); - } - } - } - - #[tokio::test] - async fn start_agent_allows_correct_stage_agent() { - use std::fs; - - let tmp = tempfile::tempdir().unwrap(); - let root = tmp.path(); - - let sk_dir = root.join(".storkit"); - fs::create_dir_all(sk_dir.join("work/4_merge")).unwrap(); - fs::write( - sk_dir.join("project.toml"), - "[[agent]]\nname = \"mergemaster\"\nstage = \"mergemaster\"\n", - ) - .unwrap(); - fs::write( - sk_dir.join("work/4_merge/88_story_ok.md"), - "---\nname: OK\n---\n", - ) - .unwrap(); - - let pool = AgentPool::new_test(3099); - let result = pool - .start_agent(root, "88_story_ok", Some("mergemaster"), None) - .await; - - match result { - Ok(_) => {} - Err(e) => { - assert!( - !e.contains("cannot be assigned"), - "mergemaster on 4_merge/ story should not fail stage check, got: '{e}'" - ); - } - } - } - - // ── remove_agents_for_story tests ──────────────────────────────────────── - - #[test] - fn remove_agents_for_story_removes_all_entries() { - let pool = AgentPool::new_test(3001); - pool.inject_test_agent("story_a", "coder-1", AgentStatus::Completed); - pool.inject_test_agent("story_a", "qa", AgentStatus::Failed); - pool.inject_test_agent("story_b", "coder-1", AgentStatus::Running); - - let removed = pool.remove_agents_for_story("story_a"); - assert_eq!(removed, 2, "should remove both agents for story_a"); - - let agents = pool.list_agents().unwrap(); - assert_eq!(agents.len(), 1, "only story_b agent should remain"); - assert_eq!(agents[0].story_id, "story_b"); - } - - #[test] - fn remove_agents_for_story_returns_zero_when_no_match() { - let pool = AgentPool::new_test(3001); - pool.inject_test_agent("story_a", "coder-1", AgentStatus::Running); - - let removed = pool.remove_agents_for_story("nonexistent"); - assert_eq!(removed, 0); - - let agents = pool.list_agents().unwrap(); - assert_eq!(agents.len(), 1, "existing agents should not be affected"); - } - - // ── front matter agent preference (bug 379) ────────────────────────────── - - #[tokio::test] - async fn start_agent_honours_front_matter_agent_when_idle() { - let tmp = tempfile::tempdir().unwrap(); - let sk = tmp.path().join(".storkit"); - let backlog = sk.join("work/1_backlog"); - std::fs::create_dir_all(&backlog).unwrap(); - std::fs::write( - sk.join("project.toml"), - r#" -[[agent]] -name = "coder-sonnet" -stage = "coder" - -[[agent]] -name = "coder-opus" -stage = "coder" -"#, - ) - .unwrap(); - // Story file with agent preference in front matter. 
- std::fs::write( - backlog.join("368_story_test.md"), - "---\nname: Test Story\nagent: coder-opus\n---\n# Story 368\n", - ) - .unwrap(); - - let pool = AgentPool::new_test(3010); - // coder-sonnet is busy so without front matter the auto-selection - // would skip coder-opus and try something else. - pool.inject_test_agent("other-story", "coder-sonnet", AgentStatus::Running); - - let result = pool - .start_agent(tmp.path(), "368_story_test", None, None) - .await; - match result { - Ok(info) => { - assert_eq!( - info.agent_name, "coder-opus", - "should pick the front-matter preferred agent" - ); - } - Err(err) => { - // Allowed to fail for infrastructure reasons (no git repo), - // but NOT due to agent selection ignoring the preference. - assert!( - !err.contains("All coder agents are busy"), - "should not report busy when coder-opus is idle: {err}" - ); - assert!( - !err.contains("coder-sonnet"), - "should not have picked coder-sonnet: {err}" - ); - } - } - } - - #[tokio::test] - async fn start_agent_returns_error_when_front_matter_agent_busy() { - let tmp = tempfile::tempdir().unwrap(); - let sk = tmp.path().join(".storkit"); - let backlog = sk.join("work/1_backlog"); - std::fs::create_dir_all(&backlog).unwrap(); - std::fs::write( - sk.join("project.toml"), - r#" -[[agent]] -name = "coder-sonnet" -stage = "coder" - -[[agent]] -name = "coder-opus" -stage = "coder" -"#, - ) - .unwrap(); - std::fs::write( - backlog.join("368_story_test.md"), - "---\nname: Test Story\nagent: coder-opus\n---\n# Story 368\n", - ) - .unwrap(); - - let pool = AgentPool::new_test(3011); - // Preferred agent is busy — should NOT fall back to coder-sonnet. - pool.inject_test_agent("other-story", "coder-opus", AgentStatus::Running); - - let result = pool - .start_agent(tmp.path(), "368_story_test", None, None) - .await; - assert!(result.is_err(), "expected error when preferred agent is busy"); - let err = result.unwrap_err(); - assert!( - err.contains("coder-opus"), - "error should mention the preferred agent: {err}" - ); - assert!( - err.contains("busy") || err.contains("queued"), - "error should say agent is busy or story is queued: {err}" - ); - } - - // ── archive + cleanup integration test ─────────────────────────────────── - - #[tokio::test] - async fn archiving_story_removes_agent_entries_from_pool() { - use crate::agents::lifecycle::move_story_to_archived; - use std::fs; - - let tmp = tempfile::tempdir().unwrap(); - let root = tmp.path(); - - let current = root.join(".storkit/work/2_current"); - fs::create_dir_all(¤t).unwrap(); - fs::write(current.join("60_story_cleanup.md"), "test").unwrap(); - - let pool = AgentPool::new_test(3001); - pool.inject_test_agent("60_story_cleanup", "coder-1", AgentStatus::Completed); - pool.inject_test_agent("60_story_cleanup", "qa", AgentStatus::Completed); - pool.inject_test_agent("61_story_other", "coder-1", AgentStatus::Running); - - assert_eq!(pool.list_agents().unwrap().len(), 3); - - move_story_to_archived(root, "60_story_cleanup").unwrap(); - pool.remove_agents_for_story("60_story_cleanup"); - - let remaining = pool.list_agents().unwrap(); - assert_eq!( - remaining.len(), - 1, - "only the other story's agent should remain" - ); - assert_eq!(remaining[0].story_id, "61_story_other"); - - assert!( - root.join(".storkit/work/5_done/60_story_cleanup.md") - .exists() - ); - } - -} diff --git a/server/src/chat/transport/matrix/notifications.rs b/server/src/chat/transport/matrix/notifications.rs deleted file mode 100644 index e9d152e4..00000000 --- 
a/server/src/chat/transport/matrix/notifications.rs +++ /dev/null @@ -1,931 +0,0 @@ -//! Stage transition notifications for Matrix rooms. -//! -//! Subscribes to [`WatcherEvent`] broadcasts and posts a notification to all -//! configured Matrix rooms whenever a work item moves between pipeline stages. - -use crate::io::story_metadata::parse_front_matter; -use crate::io::watcher::WatcherEvent; -use crate::slog; -use crate::chat::ChatTransport; -use std::collections::HashMap; -use std::path::{Path, PathBuf}; -use std::sync::Arc; -use std::time::{Duration, Instant}; -use tokio::sync::broadcast; - -/// Human-readable display name for a pipeline stage directory. -pub fn stage_display_name(stage: &str) -> &'static str { - match stage { - "1_backlog" => "Backlog", - "2_current" => "Current", - "3_qa" => "QA", - "4_merge" => "Merge", - "5_done" => "Done", - "6_archived" => "Archived", - _ => "Unknown", - } -} - -/// Infer the previous pipeline stage for a given destination stage. -/// -/// Returns `None` for `1_backlog` since items are created there (not -/// transitioned from another stage). -pub fn inferred_from_stage(to_stage: &str) -> Option<&'static str> { - match to_stage { - "2_current" => Some("Backlog"), - "3_qa" => Some("Current"), - "4_merge" => Some("QA"), - "5_done" => Some("Merge"), - "6_archived" => Some("Done"), - _ => None, - } -} - -/// Extract the numeric story number from an item ID like `"261_story_slug"`. -pub fn extract_story_number(item_id: &str) -> Option<&str> { - item_id - .split('_') - .next() - .filter(|s| !s.is_empty() && s.chars().all(|c| c.is_ascii_digit())) -} - -/// Read the story name from the work item file's YAML front matter. -/// -/// Returns `None` if the file doesn't exist or has no parseable name. -pub fn read_story_name(project_root: &Path, stage: &str, item_id: &str) -> Option { - let path = project_root - .join(".storkit") - .join("work") - .join(stage) - .join(format!("{item_id}.md")); - let contents = std::fs::read_to_string(&path).ok()?; - let meta = parse_front_matter(&contents).ok()?; - meta.name -} - -/// Format a stage transition notification message. -/// -/// Returns `(plain_text, html)` suitable for `RoomMessageEventContent::text_html`. -pub fn format_stage_notification( - item_id: &str, - story_name: Option<&str>, - from_stage: &str, - to_stage: &str, -) -> (String, String) { - let number = extract_story_number(item_id).unwrap_or(item_id); - let name = story_name.unwrap_or(item_id); - - let prefix = if to_stage == "Done" { "\u{1f389} " } else { "" }; - let plain = format!("{prefix}#{number} {name} \u{2014} {from_stage} \u{2192} {to_stage}"); - let html = format!( - "{prefix}#{number} {name} \u{2014} {from_stage} \u{2192} {to_stage}" - ); - (plain, html) -} - -/// Format an error notification message for a story failure. -/// -/// Returns `(plain_text, html)` suitable for `RoomMessageEventContent::text_html`. -pub fn format_error_notification( - item_id: &str, - story_name: Option<&str>, - reason: &str, -) -> (String, String) { - let number = extract_story_number(item_id).unwrap_or(item_id); - let name = story_name.unwrap_or(item_id); - - let plain = format!("\u{274c} #{number} {name} \u{2014} {reason}"); - let html = format!( - "\u{274c} #{number} {name} \u{2014} {reason}" - ); - (plain, html) -} - -/// Search all pipeline stages for a story name. -/// -/// Tries each known pipeline stage directory in order and returns the first -/// name found. Used for events (like rate-limit warnings) that arrive without -/// a known stage. 
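-/// Note that `6_archived` is not searched; when no name is found the
-/// callers fall back to rendering the raw item ID.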
-fn find_story_name_any_stage(project_root: &Path, item_id: &str) -> Option { - for stage in &["2_current", "3_qa", "4_merge", "1_backlog", "5_done"] { - if let Some(name) = read_story_name(project_root, stage, item_id) { - return Some(name); - } - } - None -} - -/// Format a blocked-story notification message. -/// -/// Returns `(plain_text, html)` suitable for `ChatTransport::send_message`. -pub fn format_blocked_notification( - item_id: &str, - story_name: Option<&str>, - reason: &str, -) -> (String, String) { - let number = extract_story_number(item_id).unwrap_or(item_id); - let name = story_name.unwrap_or(item_id); - - let plain = format!("\u{1f6ab} #{number} {name} \u{2014} BLOCKED: {reason}"); - let html = format!( - "\u{1f6ab} #{number} {name} \u{2014} BLOCKED: {reason}" - ); - (plain, html) -} - -/// Minimum time between rate-limit notifications for the same agent. -const RATE_LIMIT_DEBOUNCE: Duration = Duration::from_secs(60); - -/// Format a rate limit hard block notification message with scheduled resume time. -/// -/// Returns `(plain_text, html)` suitable for `ChatTransport::send_message`. -pub fn format_rate_limit_hard_block_notification( - item_id: &str, - story_name: Option<&str>, - agent_name: &str, - resume_at: chrono::DateTime, -) -> (String, String) { - let number = extract_story_number(item_id).unwrap_or(item_id); - let name = story_name.unwrap_or(item_id); - let local_time = resume_at.with_timezone(&chrono::Local); - let resume_str = local_time.format("%Y-%m-%d %H:%M").to_string(); - - let plain = format!( - "\u{1f6d1} #{number} {name} \u{2014} {agent_name} hit a hard rate limit; \ - will auto-resume at {resume_str}" - ); - let html = format!( - "\u{1f6d1} #{number} {name} \u{2014} \ - {agent_name} hit a hard rate limit; will auto-resume at {resume_str}" - ); - (plain, html) -} - -/// Format a rate limit warning notification message. -/// -/// Returns `(plain_text, html)` suitable for `ChatTransport::send_message`. -pub fn format_rate_limit_notification( - item_id: &str, - story_name: Option<&str>, - agent_name: &str, -) -> (String, String) { - let number = extract_story_number(item_id).unwrap_or(item_id); - let name = story_name.unwrap_or(item_id); - - let plain = format!( - "\u{26a0}\u{fe0f} #{number} {name} \u{2014} {agent_name} hit an API rate limit" - ); - let html = format!( - "\u{26a0}\u{fe0f} #{number} {name} \u{2014} \ - {agent_name} hit an API rate limit" - ); - (plain, html) -} - -/// Spawn a background task that listens for watcher events and posts -/// stage-transition notifications to all configured rooms via the -/// [`ChatTransport`] abstraction. -/// -/// `get_room_ids` is called on each notification to obtain the current list of -/// destination room IDs. Pass a closure that returns a static list for Matrix -/// and Slack, or one that reads from a runtime `Arc>>` -/// for WhatsApp ambient senders. -pub fn spawn_notification_listener( - transport: Arc, - get_room_ids: impl Fn() -> Vec + Send + 'static, - watcher_rx: broadcast::Receiver, - project_root: PathBuf, -) { - tokio::spawn(async move { - let mut rx = watcher_rx; - // Tracks when a rate-limit notification was last sent for each - // "story_id:agent_name" key, to debounce repeated warnings. - let mut rate_limit_last_notified: HashMap = HashMap::new(); - - loop { - match rx.recv().await { - Ok(WatcherEvent::WorkItem { - ref stage, - ref item_id, - .. - }) => { - // Only notify on stage transitions, not creations. 
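-                    // inferred_from_stage returns None for 1_backlog, where
-                    // items are created rather than moved, so those events
-                    // are skipped by the let-else below.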
-                    let Some(from_display) = inferred_from_stage(stage) else {
-                        continue;
-                    };
-                    let to_display = stage_display_name(stage);
-
-                    let story_name = read_story_name(&project_root, stage, item_id);
-                    let (plain, html) = format_stage_notification(
-                        item_id,
-                        story_name.as_deref(),
-                        from_display,
-                        to_display,
-                    );
-
-                    slog!("[bot] Sending stage notification: {plain}");
-
-                    for room_id in &get_room_ids() {
-                        if let Err(e) = transport.send_message(room_id, &plain, &html).await {
-                            slog!(
-                                "[bot] Failed to send notification to {room_id}: {e}"
-                            );
-                        }
-                    }
-                }
-                Ok(WatcherEvent::MergeFailure {
-                    ref story_id,
-                    ref reason,
-                }) => {
-                    let story_name =
-                        read_story_name(&project_root, "4_merge", story_id);
-                    let (plain, html) = format_error_notification(
-                        story_id,
-                        story_name.as_deref(),
-                        reason,
-                    );
-
-                    slog!("[bot] Sending error notification: {plain}");
-
-                    for room_id in &get_room_ids() {
-                        if let Err(e) = transport.send_message(room_id, &plain, &html).await {
-                            slog!(
-                                "[bot] Failed to send error notification to {room_id}: {e}"
-                            );
-                        }
-                    }
-                }
-                Ok(WatcherEvent::RateLimitWarning {
-                    ref story_id,
-                    ref agent_name,
-                }) => {
-                    // Debounce: skip if we sent a notification for this agent
-                    // within the last RATE_LIMIT_DEBOUNCE seconds.
-                    let debounce_key = format!("{story_id}:{agent_name}");
-                    let now = Instant::now();
-                    if let Some(&last) = rate_limit_last_notified.get(&debounce_key)
-                        && now.duration_since(last) < RATE_LIMIT_DEBOUNCE
-                    {
-                        slog!(
-                            "[bot] Rate-limit notification debounced for \
-                             {story_id}:{agent_name}"
-                        );
-                        continue;
-                    }
-                    rate_limit_last_notified.insert(debounce_key, now);
-
-                    let story_name = find_story_name_any_stage(&project_root, story_id);
-                    let (plain, html) = format_rate_limit_notification(
-                        story_id,
-                        story_name.as_deref(),
-                        agent_name,
-                    );
-
-                    slog!("[bot] Sending rate-limit notification: {plain}");
-
-                    for room_id in &get_room_ids() {
-                        if let Err(e) = transport.send_message(room_id, &plain, &html).await {
-                            slog!(
-                                "[bot] Failed to send rate-limit notification \
-                                 to {room_id}: {e}"
-                            );
-                        }
-                    }
-                }
-                Ok(WatcherEvent::StoryBlocked {
-                    ref story_id,
-                    ref reason,
-                }) => {
-                    let story_name = find_story_name_any_stage(&project_root, story_id);
-                    let (plain, html) = format_blocked_notification(
-                        story_id,
-                        story_name.as_deref(),
-                        reason,
-                    );
-
-                    slog!("[bot] Sending blocked notification: {plain}");
-
-                    for room_id in &get_room_ids() {
-                        if let Err(e) = transport.send_message(room_id, &plain, &html).await {
-                            slog!(
-                                "[bot] Failed to send blocked notification to {room_id}: {e}"
-                            );
-                        }
-                    }
-                }
-                Ok(WatcherEvent::RateLimitHardBlock {
-                    ref story_id,
-                    ref agent_name,
-                    reset_at,
-                }) => {
-                    // Debounce: reuse the same key as RateLimitWarning so both
-                    // types are rate-limited together for the same agent.
-                    let debounce_key = format!("{story_id}:{agent_name}");
-                    let now = Instant::now();
-                    if let Some(&last) = rate_limit_last_notified.get(&debounce_key)
-                        && now.duration_since(last) < RATE_LIMIT_DEBOUNCE
-                    {
-                        slog!(
-                            "[bot] Rate-limit hard-block notification debounced for \
-                             {story_id}:{agent_name}"
-                        );
-                        continue;
-                    }
-                    rate_limit_last_notified.insert(debounce_key, now);
-
-                    let story_name = find_story_name_any_stage(&project_root, story_id);
-                    let (plain, html) = format_rate_limit_hard_block_notification(
-                        story_id,
-                        story_name.as_deref(),
-                        agent_name,
-                        reset_at,
-                    );
-
-                    slog!("[bot] Sending rate-limit hard-block notification: {plain}");
-
-                    for room_id in &get_room_ids() {
-                        if let Err(e) = transport.send_message(room_id, &plain, &html).await {
-                            slog!(
-                                "[bot] Failed to send rate-limit hard-block notification \
-                                 to {room_id}: {e}"
-                            );
-                        }
-                    }
-                }
-                Ok(_) => {} // Ignore non-work-item events
-                Err(broadcast::error::RecvError::Lagged(n)) => {
-                    slog!(
-                        "[bot] Notification listener lagged, skipped {n} events"
-                    );
-                }
-                Err(broadcast::error::RecvError::Closed) => {
-                    slog!(
-                        "[bot] Watcher channel closed, stopping notification listener"
-                    );
-                    break;
-                }
-            }
-        }
-    });
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use async_trait::async_trait;
-    use crate::chat::MessageId;
-
-    // ── MockTransport ───────────────────────────────────────────────────────
-
-    type CallLog = Arc<std::sync::Mutex<Vec<(String, String, String)>>>;
-
-    /// Records every `send_message` call for inspection in tests.
-    struct MockTransport {
-        calls: CallLog,
-    }
-
-    impl MockTransport {
-        fn new() -> (Arc<Self>, CallLog) {
-            let calls: CallLog = Arc::new(std::sync::Mutex::new(Vec::new()));
-            (Arc::new(Self { calls: Arc::clone(&calls) }), calls)
-        }
-    }
-
-    #[async_trait]
-    impl crate::chat::ChatTransport for MockTransport {
-        async fn send_message(&self, room_id: &str, plain: &str, html: &str) -> Result<MessageId, String> {
-            self.calls.lock().unwrap().push((room_id.to_string(), plain.to_string(), html.to_string()));
-            Ok("mock-msg-id".to_string())
-        }
-
-        async fn edit_message(&self, _room_id: &str, _id: &str, _plain: &str, _html: &str) -> Result<(), String> {
-            Ok(())
-        }
-
-        async fn send_typing(&self, _room_id: &str, _typing: bool) -> Result<(), String> {
-            Ok(())
-        }
-    }
-
-    // ── spawn_notification_listener: RateLimitWarning ───────────────────────
-
-    /// AC2 + AC3: when a RateLimitWarning event arrives, send_message is called
-    /// with a notification that names the agent and story.
-    #[tokio::test]
-    async fn rate_limit_warning_sends_notification_with_agent_and_story() {
-        let tmp = tempfile::tempdir().unwrap();
-        let stage_dir = tmp.path().join(".storkit").join("work").join("2_current");
-        std::fs::create_dir_all(&stage_dir).unwrap();
-        std::fs::write(
-            stage_dir.join("365_story_rate_limit.md"),
-            "---\nname: Rate Limit Test Story\n---\n",
-        )
-        .unwrap();
-
-        let (watcher_tx, watcher_rx) = broadcast::channel::<WatcherEvent>(16);
-        let (transport, calls) = MockTransport::new();
-
-        spawn_notification_listener(
-            transport,
-            || vec!["!room123:example.org".to_string()],
-            watcher_rx,
-            tmp.path().to_path_buf(),
-        );
-
-        watcher_tx.send(WatcherEvent::RateLimitWarning {
-            story_id: "365_story_rate_limit".to_string(),
-            agent_name: "coder-1".to_string(),
-        }).unwrap();
-
-        // Give the spawned task time to process the event.
-        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
-
-        let calls = calls.lock().unwrap();
-        assert_eq!(calls.len(), 1, "Expected exactly one notification");
-        let (room_id, plain, _html) = &calls[0];
-        assert_eq!(room_id, "!room123:example.org");
-        assert!(plain.contains("365"), "plain should contain story number");
-        assert!(plain.contains("Rate Limit Test Story"), "plain should contain story name");
-        assert!(plain.contains("coder-1"), "plain should contain agent name");
-        assert!(plain.contains("rate limit"), "plain should mention rate limit");
-    }
-
-    /// AC4: a second RateLimitWarning for the same agent within the debounce
-    /// window must NOT trigger a second notification.
-    #[tokio::test]
-    async fn rate_limit_warning_is_debounced() {
-        let tmp = tempfile::tempdir().unwrap();
-
-        let (watcher_tx, watcher_rx) = broadcast::channel::<WatcherEvent>(16);
-        let (transport, calls) = MockTransport::new();
-
-        spawn_notification_listener(
-            transport,
-            || vec!["!room1:example.org".to_string()],
-            watcher_rx,
-            tmp.path().to_path_buf(),
-        );
-
-        // Send the same warning twice in rapid succession.
-        for _ in 0..2 {
-            watcher_tx.send(WatcherEvent::RateLimitWarning {
-                story_id: "42_story_debounce".to_string(),
-                agent_name: "coder-2".to_string(),
-            }).unwrap();
-        }
-
-        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
-
-        let calls = calls.lock().unwrap();
-        assert_eq!(calls.len(), 1, "Debounce should suppress the second notification");
-    }
-
-    /// AC4 (corollary): warnings for different agents are NOT debounced against
-    /// each other — both should produce notifications.
-    #[tokio::test]
-    async fn rate_limit_warnings_for_different_agents_both_notify() {
-        let tmp = tempfile::tempdir().unwrap();
-
-        let (watcher_tx, watcher_rx) = broadcast::channel::<WatcherEvent>(16);
-        let (transport, calls) = MockTransport::new();
-
-        spawn_notification_listener(
-            transport,
-            || vec!["!room1:example.org".to_string()],
-            watcher_rx,
-            tmp.path().to_path_buf(),
-        );
-
-        watcher_tx.send(WatcherEvent::RateLimitWarning {
-            story_id: "42_story_foo".to_string(),
-            agent_name: "coder-1".to_string(),
-        }).unwrap();
-        watcher_tx.send(WatcherEvent::RateLimitWarning {
-            story_id: "42_story_foo".to_string(),
-            agent_name: "coder-2".to_string(),
-        }).unwrap();
-
-        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
-
-        let calls = calls.lock().unwrap();
-        assert_eq!(calls.len(), 2, "Different agents should each trigger a notification");
-    }
-
-    // ── dynamic room IDs (WhatsApp ambient_rooms pattern) ───────────────────
-
-    /// Notifications are sent to the rooms returned by the closure at
-    /// notification time, not at listener-spawn time. This verifies that a
-    /// closure backed by a runtime set (e.g. WhatsApp ambient_rooms) delivers
-    /// messages to the rooms present when the event fires.
-    #[tokio::test]
-    async fn stage_notification_uses_dynamic_room_ids() {
-        let tmp = tempfile::tempdir().unwrap();
-        let stage_dir = tmp.path().join(".storkit").join("work").join("3_qa");
-        std::fs::create_dir_all(&stage_dir).unwrap();
-        std::fs::write(
-            stage_dir.join("10_story_foo.md"),
-            "---\nname: Foo Story\n---\n",
-        )
-        .unwrap();
-
-        let (watcher_tx, watcher_rx) = broadcast::channel::<WatcherEvent>(16);
-        let (transport, calls) = MockTransport::new();
-
-        let rooms: Arc<std::sync::Mutex<std::collections::HashSet<String>>> =
-            Arc::new(std::sync::Mutex::new(std::collections::HashSet::new()));
-        let rooms_for_closure = Arc::clone(&rooms);
-
-        spawn_notification_listener(
-            transport,
-            move || rooms_for_closure.lock().unwrap().iter().cloned().collect(),
-            watcher_rx,
-            tmp.path().to_path_buf(),
-        );
-
-        // Add a room after the listener is spawned (simulates a user messaging first).
-        rooms.lock().unwrap().insert("phone:+15551234567".to_string());
-
-        watcher_tx.send(WatcherEvent::WorkItem {
-            stage: "3_qa".to_string(),
-            item_id: "10_story_foo".to_string(),
-            action: "qa".to_string(),
-            commit_msg: "storkit: qa 10_story_foo".to_string(),
-        }).unwrap();
-
-        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
-
-        let calls = calls.lock().unwrap();
-        assert_eq!(calls.len(), 1, "Should deliver to the dynamically added room");
-        assert_eq!(calls[0].0, "phone:+15551234567");
-        assert!(calls[0].1.contains("10"), "plain should contain story number");
-        assert!(calls[0].1.contains("Foo Story"), "plain should contain story name");
-    }
-
-    /// When no rooms are registered (e.g. no WhatsApp users have messaged yet),
-    /// no notifications are sent and the listener does not panic.
-    #[tokio::test]
-    async fn stage_notification_with_no_rooms_is_silent() {
-        let tmp = tempfile::tempdir().unwrap();
-
-        let (watcher_tx, watcher_rx) = broadcast::channel::<WatcherEvent>(16);
-        let (transport, calls) = MockTransport::new();
-
-        spawn_notification_listener(
-            transport,
-            Vec::new,
-            watcher_rx,
-            tmp.path().to_path_buf(),
-        );
-
-        watcher_tx.send(WatcherEvent::WorkItem {
-            stage: "3_qa".to_string(),
-            item_id: "10_story_foo".to_string(),
-            action: "qa".to_string(),
-            commit_msg: "storkit: qa 10_story_foo".to_string(),
-        }).unwrap();
-
-        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
-
-        let calls = calls.lock().unwrap();
-        assert_eq!(calls.len(), 0, "No rooms means no notifications");
-    }
-
-    // ── stage_display_name ──────────────────────────────────────────────────
-
-    #[test]
-    fn stage_display_name_maps_all_known_stages() {
-        assert_eq!(stage_display_name("1_backlog"), "Backlog");
-        assert_eq!(stage_display_name("2_current"), "Current");
-        assert_eq!(stage_display_name("3_qa"), "QA");
-        assert_eq!(stage_display_name("4_merge"), "Merge");
-        assert_eq!(stage_display_name("5_done"), "Done");
-        assert_eq!(stage_display_name("6_archived"), "Archived");
-        assert_eq!(stage_display_name("unknown"), "Unknown");
-    }
-
-    // ── inferred_from_stage ─────────────────────────────────────────────────
-
-    #[test]
-    fn inferred_from_stage_returns_previous_stage() {
-        assert_eq!(inferred_from_stage("2_current"), Some("Backlog"));
-        assert_eq!(inferred_from_stage("3_qa"), Some("Current"));
-        assert_eq!(inferred_from_stage("4_merge"), Some("QA"));
-        assert_eq!(inferred_from_stage("5_done"), Some("Merge"));
-        assert_eq!(inferred_from_stage("6_archived"), Some("Done"));
-    }
-
-    #[test]
-    fn inferred_from_stage_returns_none_for_backlog() {
-        assert_eq!(inferred_from_stage("1_backlog"), None);
-    }
-
-    #[test]
-    fn inferred_from_stage_returns_none_for_unknown() {
-        assert_eq!(inferred_from_stage("9_unknown"), None);
-    }
-
-    // ── extract_story_number ────────────────────────────────────────────────
-
-    #[test]
-    fn extract_story_number_parses_numeric_prefix() {
-        assert_eq!(
-            extract_story_number("261_story_bot_notifications"),
-            Some("261")
-        );
-        assert_eq!(extract_story_number("42_bug_fix_thing"), Some("42"));
-        assert_eq!(extract_story_number("1_spike_research"), Some("1"));
-    }
-
-    #[test]
-    fn extract_story_number_returns_none_for_non_numeric() {
-        assert_eq!(extract_story_number("abc_story_thing"), None);
-        assert_eq!(extract_story_number(""), None);
-    }
-
-    // ── read_story_name ─────────────────────────────────────────────────────
-
-    #[test]
-    fn read_story_name_reads_from_front_matter() {
-        let tmp = tempfile::tempdir().unwrap();
-        let stage_dir = tmp
-            .path()
-            .join(".storkit")
-            .join("work")
-            .join("2_current");
-        std::fs::create_dir_all(&stage_dir).unwrap();
-        std::fs::write(
-            stage_dir.join("42_story_my_feature.md"),
-            "---\nname: My Cool Feature\n---\n# Story\n",
-        )
-        .unwrap();
-
-        let name = read_story_name(tmp.path(), "2_current", "42_story_my_feature");
-        assert_eq!(name.as_deref(), Some("My Cool Feature"));
-    }
-
-    #[test]
-    fn read_story_name_returns_none_for_missing_file() {
-        let tmp = tempfile::tempdir().unwrap();
-        let name = read_story_name(tmp.path(), "2_current", "99_story_missing");
-        assert_eq!(name, None);
-    }
-
-    #[test]
-    fn read_story_name_returns_none_for_missing_name_field() {
-        let tmp = tempfile::tempdir().unwrap();
-        let stage_dir = tmp
-            .path()
-            .join(".storkit")
-            .join("work")
-            .join("2_current");
-        std::fs::create_dir_all(&stage_dir).unwrap();
-        std::fs::write(
-            stage_dir.join("42_story_no_name.md"),
-            "---\ncoverage_baseline: 50%\n---\n# Story\n",
-        )
-        .unwrap();
-
-        let name = read_story_name(tmp.path(), "2_current", "42_story_no_name");
-        assert_eq!(name, None);
-    }
-
-    // ── format_error_notification ───────────────────────────────────────────
-
-    #[test]
-    fn format_error_notification_with_story_name() {
-        let (plain, html) =
-            format_error_notification("262_story_bot_errors", Some("Bot error notifications"), "merge conflict in src/main.rs");
-        assert_eq!(
-            plain,
-            "\u{274c} #262 Bot error notifications \u{2014} merge conflict in src/main.rs"
-        );
-        assert_eq!(
-            html,
-            "\u{274c} #262 Bot error notifications \u{2014} merge conflict in src/main.rs"
-        );
-    }
-
-    #[test]
-    fn format_error_notification_without_story_name_falls_back_to_item_id() {
-        let (plain, _html) =
-            format_error_notification("42_bug_fix_thing", None, "tests failed");
-        assert_eq!(
-            plain,
-            "\u{274c} #42 42_bug_fix_thing \u{2014} tests failed"
-        );
-    }
-
-    #[test]
-    fn format_error_notification_non_numeric_id_uses_full_id() {
-        let (plain, _html) =
-            format_error_notification("abc_story_thing", Some("Some Story"), "clippy errors");
-        assert_eq!(
-            plain,
-            "\u{274c} #abc_story_thing Some Story \u{2014} clippy errors"
-        );
-    }
-
-    // ── format_blocked_notification ─────────────────────────────────────────
-
-    #[test]
-    fn format_blocked_notification_with_story_name() {
-        let (plain, html) = format_blocked_notification(
-            "425_story_blocking_reason",
-            Some("Blocking Reason Story"),
-            "Retry limit exceeded (3/3) at coder stage",
-        );
-        assert_eq!(
-            plain,
-            "\u{1f6ab} #425 Blocking Reason Story \u{2014} BLOCKED: Retry limit exceeded (3/3) at coder stage"
-        );
-        assert_eq!(
-            html,
-            "\u{1f6ab} #425 Blocking Reason Story \u{2014} BLOCKED: Retry limit exceeded (3/3) at coder stage"
-        );
-    }
-
-    #[test]
-    fn format_blocked_notification_falls_back_to_item_id() {
-        let (plain, _html) =
-            format_blocked_notification("42_story_thing", None, "empty diff");
-        assert_eq!(
-            plain,
-            "\u{1f6ab} #42 42_story_thing \u{2014} BLOCKED: empty diff"
-        );
-    }
-
-    // ── spawn_notification_listener: StoryBlocked ───────────────────────────
-
-    /// AC1: when a StoryBlocked event arrives, send_message is called with a
-    /// notification that includes the story number, name, and reason.
-    #[tokio::test]
-    async fn story_blocked_sends_notification_with_reason() {
-        let tmp = tempfile::tempdir().unwrap();
-        let stage_dir = tmp.path().join(".storkit").join("work").join("2_current");
-        std::fs::create_dir_all(&stage_dir).unwrap();
-        std::fs::write(
-            stage_dir.join("425_story_blocking_test.md"),
-            "---\nname: Blocking Test Story\n---\n",
-        )
-        .unwrap();
-
-        let (watcher_tx, watcher_rx) = broadcast::channel::<WatcherEvent>(16);
-        let (transport, calls) = MockTransport::new();
-
-        spawn_notification_listener(
-            transport,
-            || vec!["!room123:example.org".to_string()],
-            watcher_rx,
-            tmp.path().to_path_buf(),
-        );
-
-        watcher_tx.send(WatcherEvent::StoryBlocked {
-            story_id: "425_story_blocking_test".to_string(),
-            reason: "Retry limit exceeded (3/3) at coder stage".to_string(),
-        }).unwrap();
-
-        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
-
-        let calls = calls.lock().unwrap();
-        assert_eq!(calls.len(), 1, "Expected exactly one notification");
-        let (room_id, plain, html) = &calls[0];
-        assert_eq!(room_id, "!room123:example.org");
-        assert!(plain.contains("425"), "plain should contain story number");
-        assert!(plain.contains("Blocking Test Story"), "plain should contain story name");
-        assert!(plain.contains("BLOCKED"), "plain should contain BLOCKED label");
-        assert!(plain.contains("Retry limit exceeded"), "plain should contain the reason");
-        assert!(html.contains("BLOCKED"), "html should contain BLOCKED label");
-    }
-
-    /// StoryBlocked with no rooms registered should not panic.
-    #[tokio::test]
-    async fn story_blocked_with_no_rooms_is_silent() {
-        let tmp = tempfile::tempdir().unwrap();
-
-        let (watcher_tx, watcher_rx) = broadcast::channel::<WatcherEvent>(16);
-        let (transport, calls) = MockTransport::new();
-
-        spawn_notification_listener(
-            transport,
-            Vec::new,
-            watcher_rx,
-            tmp.path().to_path_buf(),
-        );
-
-        watcher_tx.send(WatcherEvent::StoryBlocked {
-            story_id: "42_story_no_rooms".to_string(),
-            reason: "empty diff".to_string(),
-        }).unwrap();
-
-        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
-
-        let calls = calls.lock().unwrap();
-        assert_eq!(calls.len(), 0, "No rooms means no notifications");
-    }
-
-    // ── format_rate_limit_notification ──────────────────────────────────────
-
-    #[test]
-    fn format_rate_limit_notification_includes_agent_and_story() {
-        let (plain, html) = format_rate_limit_notification(
-            "365_story_my_feature",
-            Some("My Feature"),
-            "coder-2",
-        );
-        assert_eq!(
-            plain,
-            "\u{26a0}\u{fe0f} #365 My Feature \u{2014} coder-2 hit an API rate limit"
-        );
-        assert_eq!(
-            html,
-            "\u{26a0}\u{fe0f} #365 My Feature \u{2014} coder-2 hit an API rate limit"
-        );
-    }
-
-    #[test]
-    fn format_rate_limit_notification_falls_back_to_item_id() {
-        let (plain, _html) =
-            format_rate_limit_notification("42_story_thing", None, "coder-1");
-        assert_eq!(
-            plain,
-            "\u{26a0}\u{fe0f} #42 42_story_thing \u{2014} coder-1 hit an API rate limit"
-        );
-    }
-
-    // ── format_stage_notification ───────────────────────────────────────────
-
-    #[test]
-    fn format_notification_done_stage_includes_party_emoji() {
-        let (plain, html) = format_stage_notification(
-            "353_story_done",
-            Some("Done Story"),
-            "Merge",
-            "Done",
-        );
-        assert_eq!(
-            plain,
-            "\u{1f389} #353 Done Story \u{2014} Merge \u{2192} Done"
-        );
-        assert_eq!(
-            html,
-            "\u{1f389} #353 Done Story \u{2014} Merge \u{2192} Done"
-        );
-    }
-
-    #[test]
-    fn format_notification_non_done_stage_has_no_emoji() {
-        let (plain, _html) = format_stage_notification(
-            "42_story_thing",
-            Some("Some Story"),
-            "Backlog",
-            "Current",
-        );
-        assert!(!plain.contains("\u{1f389}"));
-    }
-
-    #[test]
-    fn format_notification_with_story_name() {
-        let (plain, html) = format_stage_notification(
-            "261_story_bot_notifications",
-            Some("Bot notifications"),
-            "Upcoming",
-            "Current",
-        );
-        assert_eq!(
-            plain,
-            "#261 Bot notifications \u{2014} Upcoming \u{2192} Current"
-        );
-        assert_eq!(
-            html,
-            "#261 Bot notifications \u{2014} Upcoming \u{2192} Current"
-        );
-    }
-
-    #[test]
-    fn format_notification_without_story_name_falls_back_to_item_id() {
-        let (plain, _html) = format_stage_notification(
-            "42_bug_fix_thing",
-            None,
-            "Current",
-            "QA",
-        );
-        assert_eq!(
-            plain,
-            "#42 42_bug_fix_thing \u{2014} Current \u{2192} QA"
-        );
-    }
-
-    #[test]
-    fn format_notification_non_numeric_id_uses_full_id() {
-        let (plain, _html) = format_stage_notification(
-            "abc_story_thing",
-            Some("Some Story"),
-            "QA",
-            "Merge",
-        );
-        assert_eq!(
-            plain,
-            "#abc_story_thing Some Story \u{2014} QA \u{2192} Merge"
-        );
-    }
-}
diff --git a/test_fail.txt b/test_fail.txt
new file mode 100644
index 00000000..e69de29b
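
Note for anyone restoring this code later: wiring the removed listener back up only takes a few lines at transport startup. A minimal sketch, assuming the crate-internal `ChatTransport`, `WatcherEvent`, and `spawn_notification_listener` items exactly as they appear in the deleted file above; the `wire_up` helper and its `room_ids` argument are illustrative, not part of the codebase:

    use std::path::PathBuf;
    use std::sync::Arc;
    use tokio::sync::broadcast;

    // Hypothetical wiring helper: spawns the listener with a fixed room list.
    fn wire_up(
        transport: Arc<dyn crate::chat::ChatTransport>,
        watcher_tx: &broadcast::Sender<crate::io::watcher::WatcherEvent>,
        project_root: PathBuf,
        room_ids: Vec<String>,
    ) {
        // The closure is re-invoked on every notification, so a static list is
        // simply cloned; a dynamic set would read shared state here instead.
        spawn_notification_listener(
            transport,
            move || room_ids.clone(),
            watcher_tx.subscribe(),
            project_root,
        );
    }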
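The per-key debounce the listener used is independent of the chat code and may be worth salvaging elsewhere. A standalone sketch of the same idea in std-only Rust; the names here are mine, not taken from the deleted file:

    use std::collections::HashMap;
    use std::time::{Duration, Instant};

    const DEBOUNCE: Duration = Duration::from_secs(60);

    /// Returns true when an event for `key` should be delivered now, recording
    /// the delivery time; repeat events inside DEBOUNCE are suppressed.
    fn should_notify(last_sent: &mut HashMap<String, Instant>, key: &str) -> bool {
        let now = Instant::now();
        if let Some(&last) = last_sent.get(key) {
            if now.duration_since(last) < DEBOUNCE {
                return false;
            }
        }
        last_sent.insert(key.to_string(), now);
        true
    }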