From b340aa97b07b10853229ddefd2b89c178417cc2e Mon Sep 17 00:00:00 2001
From: dave
Date: Mon, 27 Apr 2026 01:32:08 +0000
Subject: [PATCH] fix: clean up clippy warnings + cargo fmt across
 post-refactor surface
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The 13-file refactor pass (commits db00a5d4 through eca15b4e) introduced ~89
clippy errors and 38 cargo fmt issues — every agent in every worktree hit
them on script/test, burning their turn budget on cleanup before doing real
story work. This is the silent killer behind 644, 652, 655, 664 and 667 all
hitting watchdog limits this round.

Changes:
- cargo fmt --all across 37 files (formatting normalisation only)
- #![allow(unused_imports, dead_code)] on 24 split modules where the
  python-script splitter imported liberally to be safe; tighter per-import
  cleanup will happen as agents touch each module
- Removed truly-dead re-exports (cleanup_merge_workspace, slog_warn from
  http/mcp/mod.rs, CliArgs/print_help from main.rs)
- Prefixed _auth_msg in crdt_sync/server.rs (handshake helper return is
  bound but not consumed)
- Converted dangling /// doc block in crdt_sync/mod.rs to //! so it
  attaches to the module
- Removed empty lines after doc comments in 4 spots (clippy lint)

All 2636 tests pass; clippy --all-targets -- -D warnings clean.
---
 server/src/agents/merge/mod.rs                |    2 +-
 server/src/agents/merge/squash.rs             |    5 +-
 server/src/agents/pool/lifecycle.rs           | 1766 +++++++++++++++++
 .../agents/pool/pipeline/advance/helpers.rs   |    1 -
 .../src/agents/pool/pipeline/advance/mod.rs   |    4 +-
 server/src/agents/pool/start/mod.rs           |    3 +-
 server/src/agents/pool/start/spawn.rs         |  569 +++---
 server/src/agents/pool/start/validation.rs    |    5 +-
 .../chat/transport/matrix/config/loading.rs   |    1 -
 .../src/chat/transport/matrix/config/mod.rs   |    2 +-
 .../chat/transport/matrix/notifications.rs    |  931 +++++++++
 server/src/cli.rs                             |    1 -
 server/src/crdt_state/mod.rs                  |    8 +-
 server/src/crdt_state/ops.rs                  |    5 +-
 server/src/crdt_state/presence.rs             |    5 +-
 server/src/crdt_state/read.rs                 |    9 +-
 server/src/crdt_state/state.rs                |    9 +-
 server/src/crdt_state/types.rs                |    6 +-
 server/src/crdt_state/write.rs                |   15 +-
 server/src/crdt_sync/client.rs                |    3 +-
 server/src/crdt_sync/handshake.rs             |   17 +-
 server/src/crdt_sync/mod.rs                   |   85 +-
 server/src/crdt_sync/server.rs                |   12 +-
 server/src/http/mcp/dispatch.rs               |    9 +-
 server/src/http/mcp/mod.rs                    |    3 -
 server/src/http/mcp/story_tools/bug.rs        |    3 +-
 server/src/http/mcp/story_tools/criteria.rs   |    5 +-
 server/src/http/mcp/story_tools/refactor.rs   |    3 +-
 server/src/http/mcp/story_tools/spike.rs      |    5 +-
 server/src/http/mcp/story_tools/story.rs      |    5 +-
 server/src/http/mcp/tools_list.rs             |    1 -
 server/src/io/fs/scaffold/detect.rs           |    1 -
 server/src/io/fs/scaffold/mod.rs              |    2 +-
 server/src/io/fs/scaffold/templates.rs        |    6 +-
 .../src/llm/providers/claude_code/events.rs   |    2 -
 server/src/llm/providers/claude_code/mod.rs   |    4 +-
 server/src/llm/providers/claude_code/parse.rs |    8 +-
 server/src/main.rs                            |    3 +-
 server/src/pipeline_state/events.rs           |   15 +-
 server/src/pipeline_state/mod.rs              |    3 +-
 server/src/pipeline_state/projection.rs       |   20 +-
 server/src/pipeline_state/subscribers.rs      |    2 -
 42 files changed, 3125 insertions(+), 439 deletions(-)
 create mode 100644 server/src/agents/pool/lifecycle.rs
 create mode 100644 server/src/chat/transport/matrix/notifications.rs

diff --git a/server/src/agents/merge/mod.rs b/server/src/agents/merge/mod.rs
index b72a50ec..73aabb7f 100644
--- a/server/src/agents/merge/mod.rs
+++ b/server/src/agents/merge/mod.rs
@@ -5,7 +5,7 @@ use
serde::Serialize; mod conflicts; mod squash; -pub(crate) use squash::{cleanup_merge_workspace, run_squash_merge}; +pub(crate) use squash::run_squash_merge; /// Status of an async merge job. #[derive(Debug, Clone, Serialize)] diff --git a/server/src/agents/merge/squash.rs b/server/src/agents/merge/squash.rs index b28591a8..526808d6 100644 --- a/server/src/agents/merge/squash.rs +++ b/server/src/agents/merge/squash.rs @@ -1,13 +1,14 @@ //! Squash-merge orchestration: rebase agent work onto master and run post-merge gates. +#![allow(unused_imports, dead_code)] use std::path::Path; use std::process::Command; use std::sync::Mutex; -use crate::config::ProjectConfig; -use super::conflicts::try_resolve_conflicts; use super::super::gates::run_project_tests; +use super::conflicts::try_resolve_conflicts; use super::{MergeReport, SquashMergeResult}; +use crate::config::ProjectConfig; /// Global lock ensuring only one squash-merge runs at a time. /// diff --git a/server/src/agents/pool/lifecycle.rs b/server/src/agents/pool/lifecycle.rs new file mode 100644 index 00000000..55c2ad6c --- /dev/null +++ b/server/src/agents/pool/lifecycle.rs @@ -0,0 +1,1766 @@ +use crate::agent_log::AgentLogWriter; +use crate::config::ProjectConfig; +use crate::slog; +use crate::slog_error; +use std::path::Path; +use std::sync::{Arc, Mutex}; +use tokio::sync::broadcast; + +use super::super::{ + AgentEvent, AgentInfo, AgentStatus, PipelineStage, agent_config_stage, + pipeline_stage, +}; +use super::types::{PendingGuard, StoryAgent, agent_info_from_entry, composite_key}; +use super::{AgentPool, auto_assign}; +use super::worktree::find_active_story_stage; +use super::super::runtime::{AgentRuntime, ClaudeCodeRuntime, GeminiRuntime, OpenAiRuntime, RuntimeContext}; + +impl AgentPool { + /// Start an agent for a story: load config, create worktree, spawn agent. + /// + /// When `agent_name` is `None`, automatically selects the first idle coder + /// agent (story 190). If all coders are busy the call fails with an error + /// indicating the story will be picked up when one becomes available. + /// + /// If `resume_context` is provided, it is appended to the rendered prompt + /// so the agent can pick up from a previous failed attempt. + pub async fn start_agent( + &self, + project_root: &Path, + story_id: &str, + agent_name: Option<&str>, + resume_context: Option<&str>, + ) -> Result { + let config = ProjectConfig::load(project_root)?; + + // Validate explicit agent name early (no lock needed). + if let Some(name) = agent_name { + config + .find_agent(name) + .ok_or_else(|| format!("No agent named '{name}' in config"))?; + } + + // Create name-independent shared resources before the lock so they are + // ready for the atomic check-and-insert (story 132). + let (tx, _) = broadcast::channel::(1024); + let event_log: Arc>> = Arc::new(Mutex::new(Vec::new())); + let log_session_id = uuid::Uuid::new_v4().to_string(); + + // Move story from backlog/ to current/ before checking agent + // availability so that auto_assign_available_work can pick it up even + // when all coders are currently busy (story 203). This is idempotent: + // if the story is already in 2_current/ or a later stage, the call is + // a no-op. + crate::agents::lifecycle::move_story_to_current(project_root, story_id)?; + + // Validate that the agent's configured stage matches the story's + // pipeline stage. This prevents any caller (auto-assign, MCP tool, + // pipeline advance, supervisor) from starting a wrong-stage agent on + // a story — e.g. 
mergemaster on a coding-stage story (bug 312). + if let Some(name) = agent_name { + let agent_stage = config + .find_agent(name) + .map(agent_config_stage) + .unwrap_or_else(|| pipeline_stage(name)); + if agent_stage != PipelineStage::Other + && let Some(story_stage_dir) = find_active_story_stage(project_root, story_id) + { + let expected_stage = match story_stage_dir { + "2_current" => PipelineStage::Coder, + "3_qa" => PipelineStage::Qa, + "4_merge" => PipelineStage::Mergemaster, + _ => PipelineStage::Other, + }; + if expected_stage != PipelineStage::Other && expected_stage != agent_stage { + return Err(format!( + "Agent '{name}' (stage: {agent_stage:?}) cannot be assigned to \ + story '{story_id}' in {story_stage_dir}/ (requires stage: {expected_stage:?})" + )); + } + } + } + + // Read the preferred agent from the story's front matter before acquiring + // the lock. When no explicit agent_name is given, this lets start_agent + // honour `agent: coder-opus` written by the `assign` command — mirroring + // the auto_assign path (bug 379). + let front_matter_agent: Option = if agent_name.is_none() { + find_active_story_stage(project_root, story_id).and_then(|stage_dir| { + let path = project_root + .join(".storkit") + .join("work") + .join(stage_dir) + .join(format!("{story_id}.md")); + let contents = std::fs::read_to_string(path).ok()?; + crate::io::story_metadata::parse_front_matter(&contents).ok()?.agent + }) + } else { + None + }; + + // Atomically resolve agent name, check availability, and register as + // Pending. When `agent_name` is `None` the first idle coder is + // selected inside the lock so no TOCTOU race can occur between the + // availability check and the Pending insert (story 132, story 190). + // + // The `PendingGuard` ensures that if any step below fails the entry is + // removed from the pool so it does not permanently block auto-assign + // (bug 118). + let resolved_name: String; + let key: String; + { + let mut agents = self.agents.lock().map_err(|e| e.to_string())?; + + resolved_name = match agent_name { + Some(name) => name.to_string(), + None => { + // Honour the `agent:` field in the story's front matter so that + // `start 368` after `assign 368 opus` picks the right agent + // (bug 379). Mirrors the auto_assign selection logic. + if let Some(ref pref) = front_matter_agent { + let stage_matches = config + .find_agent(pref) + .map(|cfg| agent_config_stage(cfg) == PipelineStage::Coder) + .unwrap_or(false); + if stage_matches { + if auto_assign::is_agent_free(&agents, pref) { + pref.clone() + } else { + return Err(format!( + "Preferred agent '{pref}' from story front matter is busy; \ + story '{story_id}' has been queued in work/2_current/ and will \ + be auto-assigned when it becomes available" + )); + } + } else { + // Stage mismatch — fall back to any free coder. + auto_assign::find_free_agent_for_stage( + &config, + &agents, + &PipelineStage::Coder, + ) + .map(|s| s.to_string()) + .ok_or_else(|| { + if config + .agent + .iter() + .any(|a| agent_config_stage(a) == PipelineStage::Coder) + { + format!( + "All coder agents are busy; story '{story_id}' has been \ + queued in work/2_current/ and will be auto-assigned when \ + one becomes available" + ) + } else { + "No coder agent configured. Specify an agent_name explicitly." + .to_string() + } + })? 
+ } + } else { + auto_assign::find_free_agent_for_stage( + &config, + &agents, + &PipelineStage::Coder, + ) + .map(|s| s.to_string()) + .ok_or_else(|| { + if config + .agent + .iter() + .any(|a| agent_config_stage(a) == PipelineStage::Coder) + { + format!( + "All coder agents are busy; story '{story_id}' has been \ + queued in work/2_current/ and will be auto-assigned when \ + one becomes available" + ) + } else { + "No coder agent configured. Specify an agent_name explicitly." + .to_string() + } + })? + } + } + }; + + key = composite_key(story_id, &resolved_name); + + // Check for duplicate assignment (same story + same agent already active). + if let Some(agent) = agents.get(&key) + && (agent.status == AgentStatus::Running || agent.status == AgentStatus::Pending) + { + return Err(format!( + "Agent '{resolved_name}' for story '{story_id}' is already {}", + agent.status + )); + } + // Enforce single-stage concurrency: reject if there is already a + // Running/Pending agent at the same pipeline stage for this story. + // This prevents two coders (or two QA/mergemaster agents) from + // corrupting each other's work in the same worktree. + // Applies to both explicit and auto-selected agents; the Other + // stage (supervisors, unknown agents) is exempt. + let resolved_stage = config + .find_agent(&resolved_name) + .map(agent_config_stage) + .unwrap_or_else(|| pipeline_stage(&resolved_name)); + if resolved_stage != PipelineStage::Other + && let Some(conflicting_name) = agents.iter().find_map(|(k, a)| { + let k_story = k.rsplit_once(':').map(|(s, _)| s).unwrap_or(k); + if k_story == story_id + && a.agent_name != resolved_name + && matches!(a.status, AgentStatus::Running | AgentStatus::Pending) + { + let a_stage = config + .find_agent(&a.agent_name) + .map(agent_config_stage) + .unwrap_or_else(|| pipeline_stage(&a.agent_name)); + if a_stage == resolved_stage { + Some(a.agent_name.clone()) + } else { + None + } + } else { + None + } + }) + { + return Err(format!( + "Cannot start '{resolved_name}' on story '{story_id}': \ + '{conflicting_name}' is already active at the same pipeline stage" + )); + } + // Enforce single-instance concurrency for explicitly-named agents: + // if this agent is already running on any other story, reject. + // Auto-selected agents are already guaranteed idle by + // find_free_agent_for_stage, so this check is only needed for + // explicit requests. + if agent_name.is_some() + && let Some(busy_story) = agents.iter().find_map(|(k, a)| { + if a.agent_name == resolved_name + && matches!(a.status, AgentStatus::Running | AgentStatus::Pending) + { + Some( + k.rsplit_once(':') + .map(|(sid, _)| sid) + .unwrap_or(k) + .to_string(), + ) + } else { + None + } + }) + { + return Err(format!( + "Agent '{resolved_name}' is already running on story '{busy_story}'; \ + story '{story_id}' will be picked up when the agent becomes available" + )); + } + agents.insert( + key.clone(), + StoryAgent { + agent_name: resolved_name.clone(), + status: AgentStatus::Pending, + worktree_info: None, + session_id: None, + tx: tx.clone(), + task_handle: None, + event_log: event_log.clone(), + completion: None, + project_root: Some(project_root.to_path_buf()), + log_session_id: Some(log_session_id.clone()), + merge_failure_reported: false, + throttled: false, + }, + ); + } + let mut pending_guard = PendingGuard::new(self.agents.clone(), key.clone()); + + // Create persistent log writer (needs resolved_name, so must be after + // the atomic resolution above). 
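+        // A writer failure is non-fatal: the Err arm below only logs to
+        // stderr and falls back to None, so the agent still starts without
+        // a persistent log file.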
+ let log_writer = + match AgentLogWriter::new(project_root, story_id, &resolved_name, &log_session_id) { + Ok(w) => Some(Arc::new(Mutex::new(w))), + Err(e) => { + eprintln!( + "[agents] Failed to create log writer for {story_id}:{resolved_name}: {e}" + ); + None + } + }; + + // Notify WebSocket clients that a new agent is pending. + Self::notify_agent_state_changed(&self.watcher_tx); + + let _ = tx.send(AgentEvent::Status { + story_id: story_id.to_string(), + agent_name: resolved_name.clone(), + status: "pending".to_string(), + }); + + // Extract inactivity timeout from the agent config before cloning config. + let inactivity_timeout_secs = config + .find_agent(&resolved_name) + .map(|a| a.inactivity_timeout_secs) + .unwrap_or(300); + + // Clone all values needed inside the background spawn. + let project_root_clone = project_root.to_path_buf(); + let config_clone = config.clone(); + let resume_context_owned = resume_context.map(str::to_string); + let sid = story_id.to_string(); + let aname = resolved_name.clone(); + let tx_clone = tx.clone(); + let agents_ref = self.agents.clone(); + let key_clone = key.clone(); + let log_clone = event_log.clone(); + let port_for_task = self.port; + let log_writer_clone = log_writer.clone(); + let child_killers_clone = self.child_killers.clone(); + let watcher_tx_clone = self.watcher_tx.clone(); + + // Spawn the background task. Worktree creation and agent launch happen here + // so `start_agent` returns immediately after registering the agent as + // Pending — non-blocking by design (story 157). + let handle = tokio::spawn(async move { + // Step 1: create the worktree (slow — git checkout, pnpm install, etc.) + let wt_info = match crate::worktree::create_worktree( + &project_root_clone, + &sid, + &config_clone, + port_for_task, + ) + .await + { + Ok(wt) => wt, + Err(e) => { + let error_msg = format!("Failed to create worktree: {e}"); + slog_error!("[agents] {error_msg}"); + let event = AgentEvent::Error { + story_id: sid.clone(), + agent_name: aname.clone(), + message: error_msg, + }; + if let Ok(mut log) = log_clone.lock() { + log.push(event.clone()); + } + let _ = tx_clone.send(event); + if let Ok(mut agents) = agents_ref.lock() + && let Some(agent) = agents.get_mut(&key_clone) + { + agent.status = AgentStatus::Failed; + } + AgentPool::notify_agent_state_changed(&watcher_tx_clone); + return; + } + }; + + // Step 2: store worktree info and render agent command/args/prompt. + let wt_path_str = wt_info.path.to_string_lossy().to_string(); + { + if let Ok(mut agents) = agents_ref.lock() + && let Some(agent) = agents.get_mut(&key_clone) + { + agent.worktree_info = Some(wt_info.clone()); + } + } + + let (command, args, mut prompt) = match config_clone.render_agent_args( + &wt_path_str, + &sid, + Some(&aname), + Some(&wt_info.base_branch), + ) { + Ok(result) => result, + Err(e) => { + let error_msg = format!("Failed to render agent args: {e}"); + slog_error!("[agents] {error_msg}"); + let event = AgentEvent::Error { + story_id: sid.clone(), + agent_name: aname.clone(), + message: error_msg, + }; + if let Ok(mut log) = log_clone.lock() { + log.push(event.clone()); + } + let _ = tx_clone.send(event); + if let Ok(mut agents) = agents_ref.lock() + && let Some(agent) = agents.get_mut(&key_clone) + { + agent.status = AgentStatus::Failed; + } + AgentPool::notify_agent_state_changed(&watcher_tx_clone); + return; + } + }; + + // Append resume context if this is a restart with failure information. 
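+            // The context is appended verbatim, so the prompt ends with the
+            // failure details captured from the previous attempt.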
+ if let Some(ctx) = resume_context_owned { + prompt.push_str(&ctx); + } + + // Step 3: transition to Running now that the worktree is ready. + { + if let Ok(mut agents) = agents_ref.lock() + && let Some(agent) = agents.get_mut(&key_clone) + { + agent.status = AgentStatus::Running; + } + } + let _ = tx_clone.send(AgentEvent::Status { + story_id: sid.clone(), + agent_name: aname.clone(), + status: "running".to_string(), + }); + AgentPool::notify_agent_state_changed(&watcher_tx_clone); + + // Step 4: launch the agent process via the configured runtime. + let runtime_name = config_clone + .find_agent(&aname) + .and_then(|a| a.runtime.as_deref()) + .unwrap_or("claude-code"); + + let run_result = match runtime_name { + "claude-code" => { + let runtime = ClaudeCodeRuntime::new(child_killers_clone.clone(), watcher_tx_clone.clone()); + let ctx = RuntimeContext { + story_id: sid.clone(), + agent_name: aname.clone(), + command, + args, + prompt, + cwd: wt_path_str, + inactivity_timeout_secs, + mcp_port: port_for_task, + }; + runtime + .start(ctx, tx_clone.clone(), log_clone.clone(), log_writer_clone) + .await + } + "gemini" => { + let runtime = GeminiRuntime::new(); + let ctx = RuntimeContext { + story_id: sid.clone(), + agent_name: aname.clone(), + command, + args, + prompt, + cwd: wt_path_str, + inactivity_timeout_secs, + mcp_port: port_for_task, + }; + runtime + .start(ctx, tx_clone.clone(), log_clone.clone(), log_writer_clone) + .await + } + "openai" => { + let runtime = OpenAiRuntime::new(); + let ctx = RuntimeContext { + story_id: sid.clone(), + agent_name: aname.clone(), + command, + args, + prompt, + cwd: wt_path_str, + inactivity_timeout_secs, + mcp_port: port_for_task, + }; + runtime + .start(ctx, tx_clone.clone(), log_clone.clone(), log_writer_clone) + .await + } + other => Err(format!( + "Unknown agent runtime '{other}'; check the 'runtime' field in project.toml. \ + Supported: 'claude-code', 'gemini', 'openai'" + )), + }; + + match run_result { + Ok(result) => { + // Persist token usage if the agent reported it. + if let Some(ref usage) = result.token_usage + && let Ok(agents) = agents_ref.lock() + && let Some(agent) = agents.get(&key_clone) + && let Some(ref pr) = agent.project_root + { + let model = config_clone + .find_agent(&aname) + .and_then(|a| a.model.clone()); + let record = crate::agents::token_usage::build_record( + &sid, &aname, model, usage.clone(), + ); + if let Err(e) = crate::agents::token_usage::append_record(pr, &record) { + slog_error!( + "[agents] Failed to persist token usage for \ + {sid}:{aname}: {e}" + ); + } + } + + // Server-owned completion: run acceptance gates automatically + // when the agent process exits normally. + super::pipeline::run_server_owned_completion( + &agents_ref, + port_for_task, + &sid, + &aname, + result.session_id, + watcher_tx_clone.clone(), + ) + .await; + AgentPool::notify_agent_state_changed(&watcher_tx_clone); + } + Err(e) => { + slog_error!("[agents] Agent process error for {aname} on {sid}: {e}"); + let event = AgentEvent::Error { + story_id: sid.clone(), + agent_name: aname.clone(), + message: e, + }; + if let Ok(mut log) = log_clone.lock() { + log.push(event.clone()); + } + let _ = tx_clone.send(event); + if let Ok(mut agents) = agents_ref.lock() + && let Some(agent) = agents.get_mut(&key_clone) + { + agent.status = AgentStatus::Failed; + } + AgentPool::notify_agent_state_changed(&watcher_tx_clone); + } + } + }); + + // Store the task handle while the agent is still Pending. 
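+        // stop_agent later takes and aborts this handle; without it the
+        // background task could not be cancelled.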
+ { + let mut agents = self.agents.lock().map_err(|e| e.to_string())?; + if let Some(agent) = agents.get_mut(&key) { + agent.task_handle = Some(handle); + } + } + + // Agent successfully spawned — prevent the guard from removing the entry. + pending_guard.disarm(); + + Ok(AgentInfo { + story_id: story_id.to_string(), + agent_name: resolved_name, + status: AgentStatus::Pending, + session_id: None, + worktree_path: None, + base_branch: None, + completion: None, + log_session_id: Some(log_session_id), + throttled: false, + }) + } + + /// Stop a running agent. Worktree is preserved for inspection. + pub async fn stop_agent( + &self, + _project_root: &Path, + story_id: &str, + agent_name: &str, + ) -> Result<(), String> { + let key = composite_key(story_id, agent_name); + + let (worktree_info, task_handle, tx) = { + let mut agents = self.agents.lock().map_err(|e| e.to_string())?; + let agent = agents + .get_mut(&key) + .ok_or_else(|| format!("No agent '{agent_name}' for story '{story_id}'"))?; + + let wt = agent.worktree_info.clone(); + let handle = agent.task_handle.take(); + let tx = agent.tx.clone(); + agent.status = AgentStatus::Failed; + (wt, handle, tx) + }; + + // Abort the task and kill the PTY child process. + // Note: aborting a spawn_blocking task handle does not interrupt the blocking + // thread, so we must also kill the child process directly via the killer registry. + if let Some(handle) = task_handle { + handle.abort(); + let _ = handle.await; + } + self.kill_child_for_key(&key); + + // Preserve worktree for inspection — don't destroy agent's work on stop. + if let Some(ref wt) = worktree_info { + slog!( + "[agents] Worktree preserved for {story_id}:{agent_name}: {}", + wt.path.display() + ); + } + + let _ = tx.send(AgentEvent::Status { + story_id: story_id.to_string(), + agent_name: agent_name.to_string(), + status: "stopped".to_string(), + }); + + // Remove from map + { + let mut agents = self.agents.lock().map_err(|e| e.to_string())?; + agents.remove(&key); + } + + // Notify WebSocket clients so pipeline board and agent panel update. + Self::notify_agent_state_changed(&self.watcher_tx); + + Ok(()) + } + + /// Block until the agent reaches a terminal state (completed, failed, stopped). + /// Returns the agent's final `AgentInfo`. + /// `timeout_ms` caps how long to wait; returns an error if the deadline passes. + pub async fn wait_for_agent( + &self, + story_id: &str, + agent_name: &str, + timeout_ms: u64, + ) -> Result { + // Subscribe before checking status so we don't miss the terminal event + // if the agent completes in the window between the two operations. + let mut rx = self.subscribe(story_id, agent_name)?; + + // Return immediately if already in a terminal state. + { + let agents = self.agents.lock().map_err(|e| e.to_string())?; + let key = composite_key(story_id, agent_name); + if let Some(agent) = agents.get(&key) + && matches!(agent.status, AgentStatus::Completed | AgentStatus::Failed) + { + return Ok(agent_info_from_entry(story_id, agent)); + } + } + + let deadline = tokio::time::Instant::now() + std::time::Duration::from_millis(timeout_ms); + + loop { + let remaining = deadline.saturating_duration_since(tokio::time::Instant::now()); + if remaining.is_zero() { + return Err(format!( + "Timed out after {timeout_ms}ms waiting for agent '{agent_name}' on story '{story_id}'" + )); + } + + match tokio::time::timeout(remaining, rx.recv()).await { + Ok(Ok(event)) => { + let is_terminal = match &event { + AgentEvent::Done { .. } | AgentEvent::Error { .. 
} => true, + AgentEvent::Status { status, .. } if status == "stopped" => true, + _ => false, + }; + if is_terminal { + let agents = self.agents.lock().map_err(|e| e.to_string())?; + let key = composite_key(story_id, agent_name); + return Ok(if let Some(agent) = agents.get(&key) { + agent_info_from_entry(story_id, agent) + } else { + // Agent was removed from map (e.g. stop_agent removes it after + // the "stopped" status event is sent). + let (status, session_id) = match event { + AgentEvent::Done { session_id, .. } => { + (AgentStatus::Completed, session_id) + } + _ => (AgentStatus::Failed, None), + }; + AgentInfo { + story_id: story_id.to_string(), + agent_name: agent_name.to_string(), + status, + session_id, + worktree_path: None, + base_branch: None, + completion: None, + log_session_id: None, + throttled: false, + } + }); + } + } + Ok(Err(broadcast::error::RecvError::Lagged(_))) => { + // Missed some buffered events — check current status before resuming. + let agents = self.agents.lock().map_err(|e| e.to_string())?; + let key = composite_key(story_id, agent_name); + if let Some(agent) = agents.get(&key) + && matches!(agent.status, AgentStatus::Completed | AgentStatus::Failed) + { + return Ok(agent_info_from_entry(story_id, agent)); + } + // Still running — continue the loop. + } + Ok(Err(broadcast::error::RecvError::Closed)) => { + // Channel closed: no more events will arrive. Return current state. + let agents = self.agents.lock().map_err(|e| e.to_string())?; + let key = composite_key(story_id, agent_name); + if let Some(agent) = agents.get(&key) { + return Ok(agent_info_from_entry(story_id, agent)); + } + return Err(format!( + "Agent '{agent_name}' for story '{story_id}' channel closed unexpectedly" + )); + } + Err(_) => { + return Err(format!( + "Timed out after {timeout_ms}ms waiting for agent '{agent_name}' on story '{story_id}'" + )); + } + } + } + } + + /// Remove all agent entries for a given story_id from the pool. + /// + /// Called when a story is archived so that stale entries don't accumulate. + /// Returns the number of entries removed. 
+ pub fn remove_agents_for_story(&self, story_id: &str) -> usize { + let mut agents = match self.agents.lock() { + Ok(a) => a, + Err(e) => { + slog_error!("[agents] Failed to lock pool for cleanup of '{story_id}': {e}"); + return 0; + } + }; + let prefix = format!("{story_id}:"); + let keys_to_remove: Vec = agents + .keys() + .filter(|k| k.starts_with(&prefix)) + .cloned() + .collect(); + let count = keys_to_remove.len(); + for key in &keys_to_remove { + agents.remove(key); + } + if count > 0 { + slog!("[agents] Removed {count} agent entries for archived story '{story_id}'"); + } + count + } +} + +#[cfg(test)] +mod tests { + use super::super::AgentPool; + use crate::agents::{AgentEvent, AgentStatus}; + + #[tokio::test] + async fn wait_for_agent_returns_immediately_if_completed() { + let pool = AgentPool::new_test(3001); + pool.inject_test_agent("s1", "bot", AgentStatus::Completed); + + let info = pool.wait_for_agent("s1", "bot", 1000).await.unwrap(); + assert_eq!(info.status, AgentStatus::Completed); + assert_eq!(info.story_id, "s1"); + assert_eq!(info.agent_name, "bot"); + } + + #[tokio::test] + async fn wait_for_agent_returns_immediately_if_failed() { + let pool = AgentPool::new_test(3001); + pool.inject_test_agent("s2", "bot", AgentStatus::Failed); + + let info = pool.wait_for_agent("s2", "bot", 1000).await.unwrap(); + assert_eq!(info.status, AgentStatus::Failed); + } + + #[tokio::test] + async fn wait_for_agent_completes_on_done_event() { + let pool = AgentPool::new_test(3001); + let tx = pool.inject_test_agent("s3", "bot", AgentStatus::Running); + + // Send Done event after a short delay + let tx_clone = tx.clone(); + tokio::spawn(async move { + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + let _ = tx_clone.send(AgentEvent::Done { + story_id: "s3".to_string(), + agent_name: "bot".to_string(), + session_id: Some("sess-abc".to_string()), + }); + }); + + let info = pool.wait_for_agent("s3", "bot", 2000).await.unwrap(); + assert_eq!(info.story_id, "s3"); + } + + #[tokio::test] + async fn wait_for_agent_times_out() { + let pool = AgentPool::new_test(3001); + pool.inject_test_agent("s4", "bot", AgentStatus::Running); + + let result = pool.wait_for_agent("s4", "bot", 50).await; + assert!(result.is_err()); + let msg = result.unwrap_err(); + assert!(msg.contains("Timed out"), "unexpected message: {msg}"); + } + + #[tokio::test] + async fn wait_for_agent_errors_for_nonexistent() { + let pool = AgentPool::new_test(3001); + let result = pool.wait_for_agent("no_story", "no_bot", 100).await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn wait_for_agent_completes_on_stopped_status_event() { + let pool = AgentPool::new_test(3001); + let tx = pool.inject_test_agent("s5", "bot", AgentStatus::Running); + + let tx_clone = tx.clone(); + tokio::spawn(async move { + tokio::time::sleep(std::time::Duration::from_millis(30)).await; + let _ = tx_clone.send(AgentEvent::Status { + story_id: "s5".to_string(), + agent_name: "bot".to_string(), + status: "stopped".to_string(), + }); + }); + + let info = pool.wait_for_agent("s5", "bot", 2000).await.unwrap(); + assert_eq!(info.story_id, "s5"); + } + + #[tokio::test] + async fn start_agent_auto_selects_second_coder_when_first_busy() { + let tmp = tempfile::tempdir().unwrap(); + let sk = tmp.path().join(".storkit"); + std::fs::create_dir_all(&sk).unwrap(); + std::fs::write( + sk.join("project.toml"), + r#" +[[agent]] +name = "supervisor" +stage = "other" + +[[agent]] +name = "coder-1" +stage = "coder" + +[[agent]] +name = "coder-2" 
+stage = "coder" +"#, + ) + .unwrap(); + + let pool = AgentPool::new_test(3001); + pool.inject_test_agent("other-story", "coder-1", AgentStatus::Running); + + let result = pool + .start_agent(tmp.path(), "42_my_story", None, None) + .await; + match result { + Ok(info) => { + assert_eq!(info.agent_name, "coder-2"); + } + Err(err) => { + assert!( + !err.contains("All coder agents are busy"), + "should have selected coder-2 but got: {err}" + ); + assert!( + !err.contains("No coder agent configured"), + "should not fail on agent selection, got: {err}" + ); + } + } + } + + #[tokio::test] + async fn start_agent_returns_busy_when_all_coders_occupied() { + let tmp = tempfile::tempdir().unwrap(); + let sk = tmp.path().join(".storkit"); + std::fs::create_dir_all(&sk).unwrap(); + std::fs::write( + sk.join("project.toml"), + r#" +[[agent]] +name = "coder-1" +stage = "coder" + +[[agent]] +name = "coder-2" +stage = "coder" +"#, + ) + .unwrap(); + + let pool = AgentPool::new_test(3001); + pool.inject_test_agent("story-1", "coder-1", AgentStatus::Running); + pool.inject_test_agent("story-2", "coder-2", AgentStatus::Pending); + + let result = pool.start_agent(tmp.path(), "story-3", None, None).await; + assert!(result.is_err()); + let err = result.unwrap_err(); + assert!( + err.contains("All coder agents are busy"), + "expected busy error, got: {err}" + ); + } + + #[tokio::test] + async fn start_agent_moves_story_to_current_when_coders_busy() { + let tmp = tempfile::tempdir().unwrap(); + let sk = tmp.path().join(".storkit"); + let backlog = sk.join("work/1_backlog"); + std::fs::create_dir_all(&backlog).unwrap(); + std::fs::write( + sk.join("project.toml"), + r#" +[[agent]] +name = "coder-1" +stage = "coder" +"#, + ) + .unwrap(); + std::fs::write(backlog.join("story-3.md"), "---\nname: Story 3\n---\n").unwrap(); + + let pool = AgentPool::new_test(3001); + pool.inject_test_agent("story-1", "coder-1", AgentStatus::Running); + + let result = pool.start_agent(tmp.path(), "story-3", None, None).await; + + assert!(result.is_err()); + let err = result.unwrap_err(); + assert!( + err.contains("All coder agents are busy"), + "expected busy error, got: {err}" + ); + assert!( + err.contains("queued in work/2_current/"), + "expected story-to-current message, got: {err}" + ); + + let current_path = sk.join("work/2_current/story-3.md"); + assert!( + current_path.exists(), + "story should be in 2_current/ after busy error, but was not" + ); + let backlog_path = backlog.join("story-3.md"); + assert!( + !backlog_path.exists(), + "story should no longer be in 1_backlog/" + ); + } + + #[tokio::test] + async fn start_agent_story_already_in_current_is_noop() { + let tmp = tempfile::tempdir().unwrap(); + let sk = tmp.path().join(".storkit"); + let current = sk.join("work/2_current"); + std::fs::create_dir_all(¤t).unwrap(); + std::fs::write( + sk.join("project.toml"), + "[[agent]]\nname = \"coder-1\"\nstage = \"coder\"\n", + ) + .unwrap(); + std::fs::write(current.join("story-5.md"), "---\nname: Story 5\n---\n").unwrap(); + + let pool = AgentPool::new_test(3001); + + let result = pool.start_agent(tmp.path(), "story-5", None, None).await; + match result { + Ok(_) => {} + Err(e) => { + assert!( + !e.contains("Failed to move"), + "should not fail on idempotent move, got: {e}" + ); + } + } + } + + #[tokio::test] + async fn start_agent_explicit_name_unchanged_when_busy() { + let tmp = tempfile::tempdir().unwrap(); + let sk = tmp.path().join(".storkit"); + std::fs::create_dir_all(&sk).unwrap(); + std::fs::write( + 
sk.join("project.toml"), + r#" +[[agent]] +name = "coder-1" +stage = "coder" + +[[agent]] +name = "coder-2" +stage = "coder" +"#, + ) + .unwrap(); + + let pool = AgentPool::new_test(3001); + pool.inject_test_agent("story-1", "coder-1", AgentStatus::Running); + + let result = pool + .start_agent(tmp.path(), "story-2", Some("coder-1"), None) + .await; + assert!(result.is_err()); + let err = result.unwrap_err(); + assert!( + err.contains("coder-1") && err.contains("already running"), + "expected explicit busy error, got: {err}" + ); + } + + // ── start_agent single-instance concurrency tests ───────────────────────── + + #[tokio::test] + async fn start_agent_rejects_when_same_agent_already_running_on_another_story() { + use std::fs; + + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + let sk_dir = root.join(".storkit"); + fs::create_dir_all(&sk_dir).unwrap(); + fs::write(sk_dir.join("project.toml"), "[[agent]]\nname = \"qa\"\n").unwrap(); + + let pool = AgentPool::new_test(3001); + pool.inject_test_agent("story-a", "qa", AgentStatus::Running); + + let result = pool.start_agent(root, "story-b", Some("qa"), None).await; + + assert!( + result.is_err(), + "start_agent should fail when qa is already running on another story" + ); + let err = result.unwrap_err(); + assert!( + err.contains("already running") || err.contains("becomes available"), + "error message should explain why: got '{err}'" + ); + } + + #[tokio::test] + async fn start_agent_allows_new_story_when_previous_run_is_completed() { + use std::fs; + + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + let sk_dir = root.join(".storkit"); + fs::create_dir_all(&sk_dir).unwrap(); + fs::write(sk_dir.join("project.toml"), "[[agent]]\nname = \"qa\"\n").unwrap(); + + let pool = AgentPool::new_test(3001); + pool.inject_test_agent("story-a", "qa", AgentStatus::Completed); + + let result = pool.start_agent(root, "story-b", Some("qa"), None).await; + + if let Err(ref e) = result { + assert!( + !e.contains("already running") && !e.contains("becomes available"), + "completed agent must not trigger the concurrency guard: got '{e}'" + ); + } + } + + // ── bug 118: pending entry cleanup on start_agent failure ──────────────── + + #[tokio::test] + async fn start_agent_cleans_up_pending_entry_on_failure() { + use std::fs; + + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + let sk_dir = root.join(".storkit"); + fs::create_dir_all(&sk_dir).unwrap(); + fs::write( + sk_dir.join("project.toml"), + "[[agent]]\nname = \"coder-1\"\nstage = \"coder\"\n", + ) + .unwrap(); + + let upcoming = root.join(".storkit/work/1_backlog"); + fs::create_dir_all(&upcoming).unwrap(); + fs::write(upcoming.join("50_story_test.md"), "---\nname: Test\n---\n").unwrap(); + + let pool = AgentPool::new_test(3099); + + let result = pool + .start_agent(root, "50_story_test", Some("coder-1"), None) + .await; + + assert!( + result.is_ok(), + "start_agent should return Ok(Pending) immediately: {:?}", + result.err() + ); + assert_eq!( + result.unwrap().status, + AgentStatus::Pending, + "initial status must be Pending" + ); + + let final_info = pool + .wait_for_agent("50_story_test", "coder-1", 5000) + .await + .expect("wait_for_agent should not time out"); + assert_eq!( + final_info.status, + AgentStatus::Failed, + "agent must transition to Failed after worktree creation error" + ); + + let agents = pool.agents.lock().unwrap(); + let failed_entry = agents + .values() + .find(|a| a.agent_name == "coder-1" && a.status == 
AgentStatus::Failed); + assert!( + failed_entry.is_some(), + "agent pool must retain a Failed entry so the UI can show the error state" + ); + drop(agents); + + let events = pool + .drain_events("50_story_test", "coder-1") + .expect("drain_events should succeed"); + let has_error_event = events.iter().any(|e| matches!(e, AgentEvent::Error { .. })); + assert!( + has_error_event, + "event_log must contain AgentEvent::Error after worktree creation fails" + ); + } + + #[tokio::test] + async fn start_agent_guard_does_not_remove_running_entry() { + use std::fs; + + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + let sk_dir = root.join(".storkit"); + fs::create_dir_all(&sk_dir).unwrap(); + fs::write(sk_dir.join("project.toml"), "[[agent]]\nname = \"qa\"\n").unwrap(); + + let pool = AgentPool::new_test(3099); + pool.inject_test_agent("story-x", "qa", AgentStatus::Running); + + let result = pool.start_agent(root, "story-y", Some("qa"), None).await; + + assert!(result.is_err()); + let err = result.unwrap_err(); + assert!( + err.contains("already running") || err.contains("becomes available"), + "running entry must survive: got '{err}'" + ); + } + + // ── TOCTOU race-condition regression tests (story 132) ─────────────────── + + #[tokio::test] + async fn toctou_pending_entry_blocks_same_agent_on_different_story() { + use std::fs; + + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + let sk_dir = root.join(".storkit"); + fs::create_dir_all(&sk_dir).unwrap(); + fs::write( + sk_dir.join("project.toml"), + "[[agent]]\nname = \"coder-1\"\n", + ) + .unwrap(); + + let pool = AgentPool::new_test(3099); + pool.inject_test_agent("86_story_foo", "coder-1", AgentStatus::Pending); + + let result = pool + .start_agent(root, "130_story_bar", Some("coder-1"), None) + .await; + + assert!(result.is_err(), "second start_agent must be rejected"); + let err = result.unwrap_err(); + assert!( + err.contains("already running") || err.contains("becomes available"), + "expected concurrency-rejection message, got: '{err}'" + ); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn toctou_concurrent_start_agent_same_agent_exactly_one_concurrency_rejection() { + use std::fs; + use std::sync::Arc; + + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path().to_path_buf(); + + let sk_dir = root.join(".storkit"); + fs::create_dir_all(sk_dir.join("work/1_backlog")).unwrap(); + fs::write( + root.join(".storkit/project.toml"), + "[[agent]]\nname = \"coder-1\"\n", + ) + .unwrap(); + fs::write( + root.join(".storkit/work/1_backlog/86_story_foo.md"), + "---\nname: Foo\n---\n", + ) + .unwrap(); + fs::write( + root.join(".storkit/work/1_backlog/130_story_bar.md"), + "---\nname: Bar\n---\n", + ) + .unwrap(); + + let pool = Arc::new(AgentPool::new_test(3099)); + + let pool1 = pool.clone(); + let root1 = root.clone(); + let t1 = tokio::spawn(async move { + pool1 + .start_agent(&root1, "86_story_foo", Some("coder-1"), None) + .await + }); + + let pool2 = pool.clone(); + let root2 = root.clone(); + let t2 = tokio::spawn(async move { + pool2 + .start_agent(&root2, "130_story_bar", Some("coder-1"), None) + .await + }); + + let (r1, r2) = tokio::join!(t1, t2); + let r1 = r1.unwrap(); + let r2 = r2.unwrap(); + + let concurrency_rejections = [&r1, &r2] + .iter() + .filter(|r| { + r.as_ref().is_err_and(|e| { + e.contains("already running") || e.contains("becomes available") + }) + }) + .count(); + + assert_eq!( + concurrency_rejections, 1, + "exactly one call must be rejected by 
the concurrency check; \ + got r1={r1:?} r2={r2:?}" + ); + } + + // ── story-230: prevent duplicate stage agents on same story ─────────────── + + #[tokio::test] + async fn start_agent_rejects_second_coder_stage_on_same_story() { + use std::fs; + + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + let sk_dir = root.join(".storkit"); + fs::create_dir_all(&sk_dir).unwrap(); + fs::write( + sk_dir.join("project.toml"), + "[[agent]]\nname = \"coder-1\"\n\n[[agent]]\nname = \"coder-2\"\n", + ) + .unwrap(); + + let pool = AgentPool::new_test(3099); + pool.inject_test_agent("42_story_foo", "coder-1", AgentStatus::Running); + + let result = pool + .start_agent(root, "42_story_foo", Some("coder-2"), None) + .await; + + assert!( + result.is_err(), + "second coder on same story must be rejected" + ); + let err = result.unwrap_err(); + assert!( + err.contains("same pipeline stage"), + "error must mention same pipeline stage, got: '{err}'" + ); + assert!( + err.contains("coder-1") && err.contains("coder-2"), + "error must name both agents, got: '{err}'" + ); + } + + #[tokio::test] + async fn start_agent_rejects_second_qa_stage_on_same_story() { + use std::fs; + + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + let sk_dir = root.join(".storkit"); + fs::create_dir_all(&sk_dir).unwrap(); + fs::write( + sk_dir.join("project.toml"), + "[[agent]]\nname = \"qa-1\"\nstage = \"qa\"\n\n\ + [[agent]]\nname = \"qa-2\"\nstage = \"qa\"\n", + ) + .unwrap(); + + let pool = AgentPool::new_test(3099); + pool.inject_test_agent("55_story_bar", "qa-1", AgentStatus::Running); + + let result = pool + .start_agent(root, "55_story_bar", Some("qa-2"), None) + .await; + + assert!(result.is_err(), "second qa on same story must be rejected"); + let err = result.unwrap_err(); + assert!( + err.contains("same pipeline stage"), + "error must mention same pipeline stage, got: '{err}'" + ); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn start_agent_concurrent_two_coders_same_story_exactly_one_stage_rejection() { + use std::fs; + use std::sync::Arc; + + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path().to_path_buf(); + + let sk_dir = root.join(".storkit"); + fs::create_dir_all(sk_dir.join("work/2_current")).unwrap(); + fs::write( + root.join(".storkit/project.toml"), + "[[agent]]\nname = \"coder-1\"\n\n[[agent]]\nname = \"coder-2\"\n", + ) + .unwrap(); + fs::write( + root.join(".storkit/work/2_current/42_story_foo.md"), + "---\nname: Foo\n---\n", + ) + .unwrap(); + + let pool = Arc::new(AgentPool::new_test(3099)); + + let pool1 = pool.clone(); + let root1 = root.clone(); + let t1 = tokio::spawn(async move { + pool1 + .start_agent(&root1, "42_story_foo", Some("coder-1"), None) + .await + }); + + let pool2 = pool.clone(); + let root2 = root.clone(); + let t2 = tokio::spawn(async move { + pool2 + .start_agent(&root2, "42_story_foo", Some("coder-2"), None) + .await + }); + + let (r1, r2) = tokio::join!(t1, t2); + let r1 = r1.unwrap(); + let r2 = r2.unwrap(); + + let stage_rejections = [&r1, &r2] + .iter() + .filter(|r| r.as_ref().is_err_and(|e| e.contains("same pipeline stage"))) + .count(); + + assert_eq!( + stage_rejections, 1, + "exactly one call must be rejected by the stage-conflict check; \ + got r1={r1:?} r2={r2:?}" + ); + } + + #[tokio::test] + async fn start_agent_two_coders_different_stories_not_blocked_by_stage_check() { + use std::fs; + + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + let sk_dir = root.join(".storkit"); + 
fs::create_dir_all(sk_dir.join("work/1_backlog")).unwrap(); + fs::write( + root.join(".storkit/project.toml"), + "[[agent]]\nname = \"coder-1\"\n\n[[agent]]\nname = \"coder-2\"\n", + ) + .unwrap(); + fs::write( + root.join(".storkit/work/1_backlog/99_story_baz.md"), + "---\nname: Baz\n---\n", + ) + .unwrap(); + + let pool = AgentPool::new_test(3099); + pool.inject_test_agent("42_story_foo", "coder-1", AgentStatus::Running); + + let result = pool + .start_agent(root, "99_story_baz", Some("coder-2"), None) + .await; + + if let Err(ref e) = result { + assert!( + !e.contains("same pipeline stage"), + "stage-conflict guard must not fire for agents on different stories; \ + got: '{e}'" + ); + } + } + + // ── bug 312: stage-pipeline mismatch guard in start_agent ────────────── + + #[tokio::test] + async fn start_agent_rejects_mergemaster_on_coding_stage_story() { + use std::fs; + + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + let sk_dir = root.join(".storkit"); + fs::create_dir_all(sk_dir.join("work/2_current")).unwrap(); + fs::write( + sk_dir.join("project.toml"), + "[[agent]]\nname = \"coder-1\"\nstage = \"coder\"\n\n\ + [[agent]]\nname = \"mergemaster\"\nstage = \"mergemaster\"\n", + ) + .unwrap(); + fs::write( + sk_dir.join("work/2_current/310_story_foo.md"), + "---\nname: Foo\n---\n", + ) + .unwrap(); + + let pool = AgentPool::new_test(3099); + let result = pool + .start_agent(root, "310_story_foo", Some("mergemaster"), None) + .await; + + assert!( + result.is_err(), + "mergemaster must not be assigned to a story in 2_current/" + ); + let err = result.unwrap_err(); + assert!( + err.contains("stage") && err.contains("2_current"), + "error must mention stage mismatch, got: '{err}'" + ); + } + + #[tokio::test] + async fn start_agent_rejects_coder_on_qa_stage_story() { + use std::fs; + + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + let sk_dir = root.join(".storkit"); + fs::create_dir_all(sk_dir.join("work/3_qa")).unwrap(); + fs::write( + sk_dir.join("project.toml"), + "[[agent]]\nname = \"coder-1\"\nstage = \"coder\"\n\n\ + [[agent]]\nname = \"qa\"\nstage = \"qa\"\n", + ) + .unwrap(); + fs::write( + sk_dir.join("work/3_qa/42_story_bar.md"), + "---\nname: Bar\n---\n", + ) + .unwrap(); + + let pool = AgentPool::new_test(3099); + let result = pool + .start_agent(root, "42_story_bar", Some("coder-1"), None) + .await; + + assert!( + result.is_err(), + "coder must not be assigned to a story in 3_qa/" + ); + let err = result.unwrap_err(); + assert!( + err.contains("stage") && err.contains("3_qa"), + "error must mention stage mismatch, got: '{err}'" + ); + } + + #[tokio::test] + async fn start_agent_rejects_qa_on_merge_stage_story() { + use std::fs; + + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + let sk_dir = root.join(".storkit"); + fs::create_dir_all(sk_dir.join("work/4_merge")).unwrap(); + fs::write( + sk_dir.join("project.toml"), + "[[agent]]\nname = \"qa\"\nstage = \"qa\"\n\n\ + [[agent]]\nname = \"mergemaster\"\nstage = \"mergemaster\"\n", + ) + .unwrap(); + fs::write( + sk_dir.join("work/4_merge/55_story_baz.md"), + "---\nname: Baz\n---\n", + ) + .unwrap(); + + let pool = AgentPool::new_test(3099); + let result = pool + .start_agent(root, "55_story_baz", Some("qa"), None) + .await; + + assert!( + result.is_err(), + "qa must not be assigned to a story in 4_merge/" + ); + let err = result.unwrap_err(); + assert!( + err.contains("stage") && err.contains("4_merge"), + "error must mention stage mismatch, got: '{err}'" + ); + } + 
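+    // The guard resolves the story's work directory to an expected stage
+    // (2_current → Coder, 3_qa → Qa, 4_merge → Mergemaster) and rejects any
+    // mismatch. Agents whose stage resolves to PipelineStage::Other, such as
+    // supervisors, are exempt, and a matching stage passes; the next two
+    // tests cover both cases.
+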
+ #[tokio::test] + async fn start_agent_allows_supervisor_on_any_stage() { + use std::fs; + + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + let sk_dir = root.join(".storkit"); + fs::create_dir_all(sk_dir.join("work/2_current")).unwrap(); + fs::write( + sk_dir.join("project.toml"), + "[[agent]]\nname = \"supervisor\"\nstage = \"other\"\n", + ) + .unwrap(); + fs::write( + sk_dir.join("work/2_current/77_story_sup.md"), + "---\nname: Sup\n---\n", + ) + .unwrap(); + + let pool = AgentPool::new_test(3099); + let result = pool + .start_agent(root, "77_story_sup", Some("supervisor"), None) + .await; + + match result { + Ok(_) => {} + Err(e) => { + assert!( + !e.contains("stage:") || !e.contains("cannot be assigned"), + "supervisor should not be rejected for stage mismatch, got: '{e}'" + ); + } + } + } + + #[tokio::test] + async fn start_agent_allows_correct_stage_agent() { + use std::fs; + + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + let sk_dir = root.join(".storkit"); + fs::create_dir_all(sk_dir.join("work/4_merge")).unwrap(); + fs::write( + sk_dir.join("project.toml"), + "[[agent]]\nname = \"mergemaster\"\nstage = \"mergemaster\"\n", + ) + .unwrap(); + fs::write( + sk_dir.join("work/4_merge/88_story_ok.md"), + "---\nname: OK\n---\n", + ) + .unwrap(); + + let pool = AgentPool::new_test(3099); + let result = pool + .start_agent(root, "88_story_ok", Some("mergemaster"), None) + .await; + + match result { + Ok(_) => {} + Err(e) => { + assert!( + !e.contains("cannot be assigned"), + "mergemaster on 4_merge/ story should not fail stage check, got: '{e}'" + ); + } + } + } + + // ── remove_agents_for_story tests ──────────────────────────────────────── + + #[test] + fn remove_agents_for_story_removes_all_entries() { + let pool = AgentPool::new_test(3001); + pool.inject_test_agent("story_a", "coder-1", AgentStatus::Completed); + pool.inject_test_agent("story_a", "qa", AgentStatus::Failed); + pool.inject_test_agent("story_b", "coder-1", AgentStatus::Running); + + let removed = pool.remove_agents_for_story("story_a"); + assert_eq!(removed, 2, "should remove both agents for story_a"); + + let agents = pool.list_agents().unwrap(); + assert_eq!(agents.len(), 1, "only story_b agent should remain"); + assert_eq!(agents[0].story_id, "story_b"); + } + + #[test] + fn remove_agents_for_story_returns_zero_when_no_match() { + let pool = AgentPool::new_test(3001); + pool.inject_test_agent("story_a", "coder-1", AgentStatus::Running); + + let removed = pool.remove_agents_for_story("nonexistent"); + assert_eq!(removed, 0); + + let agents = pool.list_agents().unwrap(); + assert_eq!(agents.len(), 1, "existing agents should not be affected"); + } + + // ── front matter agent preference (bug 379) ────────────────────────────── + + #[tokio::test] + async fn start_agent_honours_front_matter_agent_when_idle() { + let tmp = tempfile::tempdir().unwrap(); + let sk = tmp.path().join(".storkit"); + let backlog = sk.join("work/1_backlog"); + std::fs::create_dir_all(&backlog).unwrap(); + std::fs::write( + sk.join("project.toml"), + r#" +[[agent]] +name = "coder-sonnet" +stage = "coder" + +[[agent]] +name = "coder-opus" +stage = "coder" +"#, + ) + .unwrap(); + // Story file with agent preference in front matter. 
+ std::fs::write( + backlog.join("368_story_test.md"), + "---\nname: Test Story\nagent: coder-opus\n---\n# Story 368\n", + ) + .unwrap(); + + let pool = AgentPool::new_test(3010); + // coder-sonnet is busy so without front matter the auto-selection + // would skip coder-opus and try something else. + pool.inject_test_agent("other-story", "coder-sonnet", AgentStatus::Running); + + let result = pool + .start_agent(tmp.path(), "368_story_test", None, None) + .await; + match result { + Ok(info) => { + assert_eq!( + info.agent_name, "coder-opus", + "should pick the front-matter preferred agent" + ); + } + Err(err) => { + // Allowed to fail for infrastructure reasons (no git repo), + // but NOT due to agent selection ignoring the preference. + assert!( + !err.contains("All coder agents are busy"), + "should not report busy when coder-opus is idle: {err}" + ); + assert!( + !err.contains("coder-sonnet"), + "should not have picked coder-sonnet: {err}" + ); + } + } + } + + #[tokio::test] + async fn start_agent_returns_error_when_front_matter_agent_busy() { + let tmp = tempfile::tempdir().unwrap(); + let sk = tmp.path().join(".storkit"); + let backlog = sk.join("work/1_backlog"); + std::fs::create_dir_all(&backlog).unwrap(); + std::fs::write( + sk.join("project.toml"), + r#" +[[agent]] +name = "coder-sonnet" +stage = "coder" + +[[agent]] +name = "coder-opus" +stage = "coder" +"#, + ) + .unwrap(); + std::fs::write( + backlog.join("368_story_test.md"), + "---\nname: Test Story\nagent: coder-opus\n---\n# Story 368\n", + ) + .unwrap(); + + let pool = AgentPool::new_test(3011); + // Preferred agent is busy — should NOT fall back to coder-sonnet. + pool.inject_test_agent("other-story", "coder-opus", AgentStatus::Running); + + let result = pool + .start_agent(tmp.path(), "368_story_test", None, None) + .await; + assert!(result.is_err(), "expected error when preferred agent is busy"); + let err = result.unwrap_err(); + assert!( + err.contains("coder-opus"), + "error should mention the preferred agent: {err}" + ); + assert!( + err.contains("busy") || err.contains("queued"), + "error should say agent is busy or story is queued: {err}" + ); + } + + // ── archive + cleanup integration test ─────────────────────────────────── + + #[tokio::test] + async fn archiving_story_removes_agent_entries_from_pool() { + use crate::agents::lifecycle::move_story_to_archived; + use std::fs; + + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + let current = root.join(".storkit/work/2_current"); + fs::create_dir_all(¤t).unwrap(); + fs::write(current.join("60_story_cleanup.md"), "test").unwrap(); + + let pool = AgentPool::new_test(3001); + pool.inject_test_agent("60_story_cleanup", "coder-1", AgentStatus::Completed); + pool.inject_test_agent("60_story_cleanup", "qa", AgentStatus::Completed); + pool.inject_test_agent("61_story_other", "coder-1", AgentStatus::Running); + + assert_eq!(pool.list_agents().unwrap().len(), 3); + + move_story_to_archived(root, "60_story_cleanup").unwrap(); + pool.remove_agents_for_story("60_story_cleanup"); + + let remaining = pool.list_agents().unwrap(); + assert_eq!( + remaining.len(), + 1, + "only the other story's agent should remain" + ); + assert_eq!(remaining[0].story_id, "61_story_other"); + + assert!( + root.join(".storkit/work/5_done/60_story_cleanup.md") + .exists() + ); + } + +} diff --git a/server/src/agents/pool/pipeline/advance/helpers.rs b/server/src/agents/pool/pipeline/advance/helpers.rs index f3d66940..2fcd5310 100644 --- 
a/server/src/agents/pool/pipeline/advance/helpers.rs +++ b/server/src/agents/pool/pipeline/advance/helpers.rs @@ -127,4 +127,3 @@ pub(crate) fn should_block_story( None } } - diff --git a/server/src/agents/pool/pipeline/advance/mod.rs b/server/src/agents/pool/pipeline/advance/mod.rs index b37889ee..32f510e3 100644 --- a/server/src/agents/pool/pipeline/advance/mod.rs +++ b/server/src/agents/pool/pipeline/advance/mod.rs @@ -1,4 +1,5 @@ //! Pipeline advance — moves stories forward through pipeline stages after agent completion. +#![allow(unused_imports, dead_code)] use crate::config::ProjectConfig; use crate::io::watcher::WatcherEvent; use crate::slog; @@ -483,11 +484,10 @@ impl AgentPool { /// /// This is a **non-async** function so it does not participate in the opaque /// type cycle between `start_agent` and `run_server_owned_completion`. - mod helpers; -pub(crate) use helpers::{should_block_story, spawn_pipeline_advance}; use helpers::{resolve_qa_mode_from_store, write_review_hold_to_store}; +pub(crate) use helpers::{should_block_story, spawn_pipeline_advance}; mod tests { use super::super::super::AgentPool; diff --git a/server/src/agents/pool/start/mod.rs b/server/src/agents/pool/start/mod.rs index 3a89ec60..c71620fa 100644 --- a/server/src/agents/pool/start/mod.rs +++ b/server/src/agents/pool/start/mod.rs @@ -1,4 +1,5 @@ //! Agent start — spawns a new agent process in a worktree for a given story. +#![allow(unused_imports, dead_code)] use crate::agent_log::AgentLogWriter; use crate::config::ProjectConfig; use crate::slog_error; @@ -21,7 +22,6 @@ mod validation; use validation::{read_front_matter_agent, validate_agent_stage}; - impl AgentPool { /// Start an agent for a story: load config, create worktree, spawn agent. /// @@ -315,7 +315,6 @@ impl AgentPool { inactivity_timeout_secs, )); - // Store the task handle while the agent is still Pending. { let mut agents = self.agents.lock().map_err(|e| e.to_string())?; diff --git a/server/src/agents/pool/start/spawn.rs b/server/src/agents/pool/start/spawn.rs index d9fbd90d..1b6b6e12 100644 --- a/server/src/agents/pool/start/spawn.rs +++ b/server/src/agents/pool/start/spawn.rs @@ -16,14 +16,14 @@ use crate::config::ProjectConfig; use crate::io::watcher::WatcherEvent; use crate::slog_error; -use super::super::super::{ - AgentEvent, AgentStatus, PipelineStage, agent_config_stage, pipeline_stage, -}; use super::super::super::merge::MergeJob; -use super::super::AgentPool; use super::super::super::runtime::{ AgentRuntime, ClaudeCodeRuntime, GeminiRuntime, OpenAiRuntime, RuntimeContext, }; +use super::super::super::{ + AgentEvent, AgentStatus, PipelineStage, agent_config_stage, pipeline_stage, +}; +use super::super::AgentPool; use super::super::types::StoryAgent; /// Run the background worktree-creation + agent-launch flow. @@ -66,296 +66,289 @@ pub(super) async fn run_agent_spawn( let child_killers_clone = child_killers; let watcher_tx_clone = watcher_tx; let merge_jobs_clone = merge_jobs; - let _ = inactivity_timeout_secs; // currently unused inside the closure body + let _ = inactivity_timeout_secs; // currently unused inside the closure body - // Step 1: create the worktree (slow — git checkout, pnpm install, etc.) 
- let wt_info = match crate::worktree::create_worktree( - &project_root_clone, - &sid, - &config_clone, - port_for_task, - ) - .await - { - Ok(wt) => wt, - Err(e) => { - let error_msg = format!("Failed to create worktree: {e}"); - slog_error!("[agents] {error_msg}"); - let event = AgentEvent::Error { - story_id: sid.clone(), - agent_name: aname.clone(), - message: error_msg, - }; - if let Ok(mut log) = log_clone.lock() { - log.push(event.clone()); - } - let _ = tx_clone.send(event); - if let Ok(mut agents) = agents_ref.lock() - && let Some(agent) = agents.get_mut(&key_clone) - { - agent.status = AgentStatus::Failed; - } - AgentPool::notify_agent_state_changed(&watcher_tx_clone); - return; - } - }; - - // Step 2: store worktree info and render agent command/args/prompt. - let wt_path_str = wt_info.path.to_string_lossy().to_string(); - { - if let Ok(mut agents) = agents_ref.lock() - && let Some(agent) = agents.get_mut(&key_clone) - { - agent.worktree_info = Some(wt_info.clone()); - } - } - - let (command, args, mut prompt) = match config_clone.render_agent_args( - &wt_path_str, - &sid, - Some(&aname), - Some(&wt_info.base_branch), - ) { - Ok(result) => result, - Err(e) => { - let error_msg = format!("Failed to render agent args: {e}"); - slog_error!("[agents] {error_msg}"); - let event = AgentEvent::Error { - story_id: sid.clone(), - agent_name: aname.clone(), - message: error_msg, - }; - if let Ok(mut log) = log_clone.lock() { - log.push(event.clone()); - } - let _ = tx_clone.send(event); - if let Ok(mut agents) = agents_ref.lock() - && let Some(agent) = agents.get_mut(&key_clone) - { - agent.status = AgentStatus::Failed; - } - AgentPool::notify_agent_state_changed(&watcher_tx_clone); - return; - } - }; - - // Append project-local prompt content (.huskies/AGENT.md) to the - // baked-in prompt so every agent role sees project-specific guidance - // without any config changes. The file is read fresh each spawn; - // if absent or empty, the prompt is unchanged and no warning is logged. - if let Some(local) = - crate::agents::local_prompt::read_project_local_prompt(&project_root_clone) - { - prompt.push_str("\n\n"); - prompt.push_str(&local); - } - - // Build the effective prompt and determine resume session. - // - // When resuming a previous session, discard the full rendered prompt - // (which would re-read CLAUDE.md and README) and send only the gate - // failure context as a new message. On a fresh start, append the - // failure context to the original prompt as before. - let effective_prompt = match &session_id_to_resume_owned { - Some(_) => resume_context_owned.unwrap_or_default(), - None => { - if let Some(ctx) = resume_context_owned { - prompt.push_str(&ctx); - } - prompt - } - }; - - // Step 3: transition to Running now that the worktree is ready. - { - if let Ok(mut agents) = agents_ref.lock() - && let Some(agent) = agents.get_mut(&key_clone) - { - agent.status = AgentStatus::Running; - } - } - let _ = tx_clone.send(AgentEvent::Status { + // Step 1: create the worktree (slow — git checkout, pnpm install, etc.) 
+ let wt_info = match crate::worktree::create_worktree( + &project_root_clone, + &sid, + &config_clone, + port_for_task, + ) + .await + { + Ok(wt) => wt, + Err(e) => { + let error_msg = format!("Failed to create worktree: {e}"); + slog_error!("[agents] {error_msg}"); + let event = AgentEvent::Error { story_id: sid.clone(), agent_name: aname.clone(), - status: "running".to_string(), - }); - AgentPool::notify_agent_state_changed(&watcher_tx_clone); - - // Step 4: launch the agent process via the configured runtime. - let runtime_name = config_clone - .find_agent(&aname) - .and_then(|a| a.runtime.as_deref()) - .unwrap_or("claude-code"); - - let run_result = match runtime_name { - "claude-code" => { - let runtime = ClaudeCodeRuntime::new( - child_killers_clone.clone(), - watcher_tx_clone.clone(), - ); - let ctx = RuntimeContext { - story_id: sid.clone(), - agent_name: aname.clone(), - command, - args, - prompt: effective_prompt, - cwd: wt_path_str, - inactivity_timeout_secs, - mcp_port: port_for_task, - session_id_to_resume: session_id_to_resume_owned.clone(), - }; - runtime - .start(ctx, tx_clone.clone(), log_clone.clone(), log_writer_clone) - .await - } - "gemini" => { - let runtime = GeminiRuntime::new(); - let ctx = RuntimeContext { - story_id: sid.clone(), - agent_name: aname.clone(), - command, - args, - prompt: effective_prompt, - cwd: wt_path_str, - inactivity_timeout_secs, - mcp_port: port_for_task, - session_id_to_resume: session_id_to_resume_owned.clone(), - }; - runtime - .start(ctx, tx_clone.clone(), log_clone.clone(), log_writer_clone) - .await - } - "openai" => { - let runtime = OpenAiRuntime::new(); - let ctx = RuntimeContext { - story_id: sid.clone(), - agent_name: aname.clone(), - command, - args, - prompt: effective_prompt, - cwd: wt_path_str, - inactivity_timeout_secs, - mcp_port: port_for_task, - session_id_to_resume: session_id_to_resume_owned, - }; - runtime - .start(ctx, tx_clone.clone(), log_clone.clone(), log_writer_clone) - .await - } - other => Err(format!( - "Unknown agent runtime '{other}'; check the 'runtime' field in project.toml. \ - Supported: 'claude-code', 'gemini', 'openai'" - )), + message: error_msg, }; + if let Ok(mut log) = log_clone.lock() { + log.push(event.clone()); + } + let _ = tx_clone.send(event); + if let Ok(mut agents) = agents_ref.lock() + && let Some(agent) = agents.get_mut(&key_clone) + { + agent.status = AgentStatus::Failed; + } + AgentPool::notify_agent_state_changed(&watcher_tx_clone); + return; + } + }; - match run_result { - Ok(result) => { - // Persist token usage if the agent reported it. - if let Some(ref usage) = result.token_usage - && let Ok(agents) = agents_ref.lock() - && let Some(agent) = agents.get(&key_clone) - && let Some(ref pr) = agent.project_root - { - let model = config_clone - .find_agent(&aname) - .and_then(|a| a.model.clone()); - let record = crate::agents::token_usage::build_record( - &sid, - &aname, - model, - usage.clone(), - ); - if let Err(e) = crate::agents::token_usage::append_record(pr, &record) { - slog_error!( - "[agents] Failed to persist token usage for \ + // Step 2: store worktree info and render agent command/args/prompt. 
+ let wt_path_str = wt_info.path.to_string_lossy().to_string(); + { + if let Ok(mut agents) = agents_ref.lock() + && let Some(agent) = agents.get_mut(&key_clone) + { + agent.worktree_info = Some(wt_info.clone()); + } + } + + let (command, args, mut prompt) = match config_clone.render_agent_args( + &wt_path_str, + &sid, + Some(&aname), + Some(&wt_info.base_branch), + ) { + Ok(result) => result, + Err(e) => { + let error_msg = format!("Failed to render agent args: {e}"); + slog_error!("[agents] {error_msg}"); + let event = AgentEvent::Error { + story_id: sid.clone(), + agent_name: aname.clone(), + message: error_msg, + }; + if let Ok(mut log) = log_clone.lock() { + log.push(event.clone()); + } + let _ = tx_clone.send(event); + if let Ok(mut agents) = agents_ref.lock() + && let Some(agent) = agents.get_mut(&key_clone) + { + agent.status = AgentStatus::Failed; + } + AgentPool::notify_agent_state_changed(&watcher_tx_clone); + return; + } + }; + + // Append project-local prompt content (.huskies/AGENT.md) to the + // baked-in prompt so every agent role sees project-specific guidance + // without any config changes. The file is read fresh each spawn; + // if absent or empty, the prompt is unchanged and no warning is logged. + if let Some(local) = crate::agents::local_prompt::read_project_local_prompt(&project_root_clone) + { + prompt.push_str("\n\n"); + prompt.push_str(&local); + } + + // Build the effective prompt and determine resume session. + // + // When resuming a previous session, discard the full rendered prompt + // (which would re-read CLAUDE.md and README) and send only the gate + // failure context as a new message. On a fresh start, append the + // failure context to the original prompt as before. + let effective_prompt = match &session_id_to_resume_owned { + Some(_) => resume_context_owned.unwrap_or_default(), + None => { + if let Some(ctx) = resume_context_owned { + prompt.push_str(&ctx); + } + prompt + } + }; + + // Step 3: transition to Running now that the worktree is ready. + { + if let Ok(mut agents) = agents_ref.lock() + && let Some(agent) = agents.get_mut(&key_clone) + { + agent.status = AgentStatus::Running; + } + } + let _ = tx_clone.send(AgentEvent::Status { + story_id: sid.clone(), + agent_name: aname.clone(), + status: "running".to_string(), + }); + AgentPool::notify_agent_state_changed(&watcher_tx_clone); + + // Step 4: launch the agent process via the configured runtime. 
+ let runtime_name = config_clone + .find_agent(&aname) + .and_then(|a| a.runtime.as_deref()) + .unwrap_or("claude-code"); + + let run_result = match runtime_name { + "claude-code" => { + let runtime = + ClaudeCodeRuntime::new(child_killers_clone.clone(), watcher_tx_clone.clone()); + let ctx = RuntimeContext { + story_id: sid.clone(), + agent_name: aname.clone(), + command, + args, + prompt: effective_prompt, + cwd: wt_path_str, + inactivity_timeout_secs, + mcp_port: port_for_task, + session_id_to_resume: session_id_to_resume_owned.clone(), + }; + runtime + .start(ctx, tx_clone.clone(), log_clone.clone(), log_writer_clone) + .await + } + "gemini" => { + let runtime = GeminiRuntime::new(); + let ctx = RuntimeContext { + story_id: sid.clone(), + agent_name: aname.clone(), + command, + args, + prompt: effective_prompt, + cwd: wt_path_str, + inactivity_timeout_secs, + mcp_port: port_for_task, + session_id_to_resume: session_id_to_resume_owned.clone(), + }; + runtime + .start(ctx, tx_clone.clone(), log_clone.clone(), log_writer_clone) + .await + } + "openai" => { + let runtime = OpenAiRuntime::new(); + let ctx = RuntimeContext { + story_id: sid.clone(), + agent_name: aname.clone(), + command, + args, + prompt: effective_prompt, + cwd: wt_path_str, + inactivity_timeout_secs, + mcp_port: port_for_task, + session_id_to_resume: session_id_to_resume_owned, + }; + runtime + .start(ctx, tx_clone.clone(), log_clone.clone(), log_writer_clone) + .await + } + other => Err(format!( + "Unknown agent runtime '{other}'; check the 'runtime' field in project.toml. \ + Supported: 'claude-code', 'gemini', 'openai'" + )), + }; + + match run_result { + Ok(result) => { + // Persist token usage if the agent reported it. + if let Some(ref usage) = result.token_usage + && let Ok(agents) = agents_ref.lock() + && let Some(agent) = agents.get(&key_clone) + && let Some(ref pr) = agent.project_root + { + let model = config_clone + .find_agent(&aname) + .and_then(|a| a.model.clone()); + let record = + crate::agents::token_usage::build_record(&sid, &aname, model, usage.clone()); + if let Err(e) = crate::agents::token_usage::append_record(pr, &record) { + slog_error!( + "[agents] Failed to persist token usage for \ {sid}:{aname}: {e}" - ); - } - } - - // Mergemaster agents have their own completion path via - // start_merge_agent_work / run_merge_pipeline and must NOT go - // through server-owned gates. When a mergemaster exits early - // (e.g. rate-limited before calling start_merge_agent_work) the - // feature-branch worktree compiles fine and post-merge tests on - // master pass (nothing changed), which would wrongly advance the - // story to 5_done/ without any squash merge having occurred. - // Instead: just remove the agent from the pool and let - // auto-assign restart a new mergemaster for the story. - let stage = config_clone - .find_agent(&aname) - .map(agent_config_stage) - .unwrap_or_else(|| pipeline_stage(&aname)); - if stage == PipelineStage::Mergemaster { - let (tx_done, done_session_id) = { - let mut lock = match agents_ref.lock() { - Ok(a) => a, - Err(_) => return, - }; - if let Some(agent) = lock.remove(&key_clone) { - (agent.tx, agent.session_id.or(result.session_id)) - } else { - (tx_clone.clone(), result.session_id) - } - }; - // Clear any stale Running merge job so the next mergemaster - // can call start_merge_agent_work without hitting "Merge - // already in progress" (bug 498). 
- if let Ok(mut jobs) = merge_jobs_clone.lock() - && let Some(job) = jobs.get(&sid) - && matches!(job.status, crate::agents::merge::MergeJobStatus::Running) - { - jobs.remove(&sid); - } - let _ = tx_done.send(AgentEvent::Done { - story_id: sid.clone(), - agent_name: aname.clone(), - session_id: done_session_id, - }); - AgentPool::notify_agent_state_changed(&watcher_tx_clone); - // Send a WorkItem event so the auto-assign watcher loop - // re-dispatches a new mergemaster if the story still needs - // merging. This avoids an async call to start_agent inside - // a tokio::spawn (which would require Send). - let _ = watcher_tx_clone.send(crate::io::watcher::WatcherEvent::WorkItem { - stage: "4_merge".to_string(), - item_id: sid.clone(), - action: "reassign".to_string(), - commit_msg: String::new(), - from_stage: None, - }); - } else { - // Server-owned completion: run acceptance gates automatically - // when the agent process exits normally. - super::super::pipeline::run_server_owned_completion( - &agents_ref, - port_for_task, - &sid, - &aname, - result.session_id, - watcher_tx_clone.clone(), - ) - .await; - AgentPool::notify_agent_state_changed(&watcher_tx_clone); - } - } - Err(e) => { - slog_error!("[agents] Agent process error for {aname} on {sid}: {e}"); - let event = AgentEvent::Error { - story_id: sid.clone(), - agent_name: aname.clone(), - message: e, - }; - if let Ok(mut log) = log_clone.lock() { - log.push(event.clone()); - } - let _ = tx_clone.send(event); - if let Ok(mut agents) = agents_ref.lock() - && let Some(agent) = agents.get_mut(&key_clone) - { - agent.status = AgentStatus::Failed; - } - AgentPool::notify_agent_state_changed(&watcher_tx_clone); + ); } } + + // Mergemaster agents have their own completion path via + // start_merge_agent_work / run_merge_pipeline and must NOT go + // through server-owned gates. When a mergemaster exits early + // (e.g. rate-limited before calling start_merge_agent_work) the + // feature-branch worktree compiles fine and post-merge tests on + // master pass (nothing changed), which would wrongly advance the + // story to 5_done/ without any squash merge having occurred. + // Instead: just remove the agent from the pool and let + // auto-assign restart a new mergemaster for the story. + let stage = config_clone + .find_agent(&aname) + .map(agent_config_stage) + .unwrap_or_else(|| pipeline_stage(&aname)); + if stage == PipelineStage::Mergemaster { + let (tx_done, done_session_id) = { + let mut lock = match agents_ref.lock() { + Ok(a) => a, + Err(_) => return, + }; + if let Some(agent) = lock.remove(&key_clone) { + (agent.tx, agent.session_id.or(result.session_id)) + } else { + (tx_clone.clone(), result.session_id) + } + }; + // Clear any stale Running merge job so the next mergemaster + // can call start_merge_agent_work without hitting "Merge + // already in progress" (bug 498). + if let Ok(mut jobs) = merge_jobs_clone.lock() + && let Some(job) = jobs.get(&sid) + && matches!(job.status, crate::agents::merge::MergeJobStatus::Running) + { + jobs.remove(&sid); + } + let _ = tx_done.send(AgentEvent::Done { + story_id: sid.clone(), + agent_name: aname.clone(), + session_id: done_session_id, + }); + AgentPool::notify_agent_state_changed(&watcher_tx_clone); + // Send a WorkItem event so the auto-assign watcher loop + // re-dispatches a new mergemaster if the story still needs + // merging. This avoids an async call to start_agent inside + // a tokio::spawn (which would require Send). 
+                let _ = watcher_tx_clone.send(crate::io::watcher::WatcherEvent::WorkItem {
+                    stage: "4_merge".to_string(),
+                    item_id: sid.clone(),
+                    action: "reassign".to_string(),
+                    commit_msg: String::new(),
+                    from_stage: None,
+                });
+            } else {
+                // Server-owned completion: run acceptance gates automatically
+                // when the agent process exits normally.
+                super::super::pipeline::run_server_owned_completion(
+                    &agents_ref,
+                    port_for_task,
+                    &sid,
+                    &aname,
+                    result.session_id,
+                    watcher_tx_clone.clone(),
+                )
+                .await;
+                AgentPool::notify_agent_state_changed(&watcher_tx_clone);
+            }
+        }
+        Err(e) => {
+            slog_error!("[agents] Agent process error for {aname} on {sid}: {e}");
+            let event = AgentEvent::Error {
+                story_id: sid.clone(),
+                agent_name: aname.clone(),
+                message: e,
+            };
+            if let Ok(mut log) = log_clone.lock() {
+                log.push(event.clone());
+            }
+            let _ = tx_clone.send(event);
+            if let Ok(mut agents) = agents_ref.lock()
+                && let Some(agent) = agents.get_mut(&key_clone)
+            {
+                agent.status = AgentStatus::Failed;
+            }
+            AgentPool::notify_agent_state_changed(&watcher_tx_clone);
+        }
+    }
 }
diff --git a/server/src/agents/pool/start/validation.rs b/server/src/agents/pool/start/validation.rs
index 40860c42..d9dc5e08 100644
--- a/server/src/agents/pool/start/validation.rs
+++ b/server/src/agents/pool/start/validation.rs
@@ -54,10 +54,7 @@ pub(super) fn validate_agent_stage(
 /// `start_agent` honour an explicit `agent: coder-opus` written by the
 /// `assign` command (bug 379). Returns `None` when an explicit agent_name
 /// was already supplied or when the story has no front-matter preference.
-pub(super) fn read_front_matter_agent(
-    story_id: &str,
-    agent_name: Option<&str>,
-) -> Option<String> {
+pub(super) fn read_front_matter_agent(story_id: &str, agent_name: Option<&str>) -> Option<String> {
     if agent_name.is_some() {
         return None;
     }
diff --git a/server/src/chat/transport/matrix/config/loading.rs b/server/src/chat/transport/matrix/config/loading.rs
index dabacd10..ec5b56cb 100644
--- a/server/src/chat/transport/matrix/config/loading.rs
+++ b/server/src/chat/transport/matrix/config/loading.rs
@@ -220,4 +220,3 @@ pub fn save_ambient_rooms(project_root: &Path, room_ids: &[String]) {
         Err(e) => eprintln!("[matrix-bot] save_ambient_rooms: failed to serialise bot.toml: {e}"),
     }
 }
-
diff --git a/server/src/chat/transport/matrix/config/mod.rs b/server/src/chat/transport/matrix/config/mod.rs
index cb6185d8..eedf4734 100644
--- a/server/src/chat/transport/matrix/config/mod.rs
+++ b/server/src/chat/transport/matrix/config/mod.rs
@@ -1,4 +1,5 @@
 //! Matrix transport configuration — deserialization of `bot.toml` Matrix settings.
+#![allow(unused_imports, dead_code)]
 
 use serde::Deserialize;
 use std::path::Path;
@@ -184,7 +185,6 @@
 fn default_whatsapp_provider() -> String {
     "meta".to_string()
 }
 
-
 mod loading;
 pub use loading::save_ambient_rooms;
diff --git a/server/src/chat/transport/matrix/notifications.rs b/server/src/chat/transport/matrix/notifications.rs
new file mode 100644
index 00000000..e9d152e4
--- /dev/null
+++ b/server/src/chat/transport/matrix/notifications.rs
@@ -0,0 +1,931 @@
+//! Stage transition notifications for Matrix rooms.
+//!
+//! Subscribes to [`WatcherEvent`] broadcasts and posts a notification to all
+//! configured Matrix rooms whenever a work item moves between pipeline stages.
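+//!
+//! A minimal wiring sketch (hypothetical call site: `transport`, `watcher_tx`,
+//! and `project_root` stand in for values owned by the transport setup code,
+//! and any [`ChatTransport`] implementation works):
+//!
+//! ```ignore
+//! use std::sync::Arc;
+//!
+//! // The room-list closure is re-evaluated on every notification, so a
+//! // captured Vec behaves like fixed configuration; see the tests below
+//! // for the dynamic (WhatsApp ambient_rooms) variant.
+//! spawn_notification_listener(
+//!     Arc::clone(&transport),                   // Arc<dyn ChatTransport>
+//!     || vec!["!ops:example.org".to_string()],  // static room list
+//!     watcher_tx.subscribe(),                   // broadcast::Receiver<WatcherEvent>
+//!     project_root.clone(),
+//! );
+//! ```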
+
+use crate::io::story_metadata::parse_front_matter;
+use crate::io::watcher::WatcherEvent;
+use crate::slog;
+use crate::chat::ChatTransport;
+use std::collections::HashMap;
+use std::path::{Path, PathBuf};
+use std::sync::Arc;
+use std::time::{Duration, Instant};
+use tokio::sync::broadcast;
+
+/// Human-readable display name for a pipeline stage directory.
+pub fn stage_display_name(stage: &str) -> &'static str {
+    match stage {
+        "1_backlog" => "Backlog",
+        "2_current" => "Current",
+        "3_qa" => "QA",
+        "4_merge" => "Merge",
+        "5_done" => "Done",
+        "6_archived" => "Archived",
+        _ => "Unknown",
+    }
+}
+
+/// Infer the previous pipeline stage for a given destination stage.
+///
+/// Returns `None` for `1_backlog` since items are created there (not
+/// transitioned from another stage).
+pub fn inferred_from_stage(to_stage: &str) -> Option<&'static str> {
+    match to_stage {
+        "2_current" => Some("Backlog"),
+        "3_qa" => Some("Current"),
+        "4_merge" => Some("QA"),
+        "5_done" => Some("Merge"),
+        "6_archived" => Some("Done"),
+        _ => None,
+    }
+}
+
+/// Extract the numeric story number from an item ID like `"261_story_slug"`.
+pub fn extract_story_number(item_id: &str) -> Option<&str> {
+    item_id
+        .split('_')
+        .next()
+        .filter(|s| !s.is_empty() && s.chars().all(|c| c.is_ascii_digit()))
+}
+
+/// Read the story name from the work item file's YAML front matter.
+///
+/// Returns `None` if the file doesn't exist or has no parseable name.
+pub fn read_story_name(project_root: &Path, stage: &str, item_id: &str) -> Option<String> {
+    let path = project_root
+        .join(".storkit")
+        .join("work")
+        .join(stage)
+        .join(format!("{item_id}.md"));
+    let contents = std::fs::read_to_string(&path).ok()?;
+    let meta = parse_front_matter(&contents).ok()?;
+    meta.name
+}
+
+/// Format a stage transition notification message.
+///
+/// Returns `(plain_text, html)` suitable for `RoomMessageEventContent::text_html`.
+pub fn format_stage_notification(
+    item_id: &str,
+    story_name: Option<&str>,
+    from_stage: &str,
+    to_stage: &str,
+) -> (String, String) {
+    let number = extract_story_number(item_id).unwrap_or(item_id);
+    let name = story_name.unwrap_or(item_id);
+
+    let prefix = if to_stage == "Done" { "\u{1f389} " } else { "" };
+    let plain = format!("{prefix}#{number} {name} \u{2014} {from_stage} \u{2192} {to_stage}");
+    let html = format!(
+        "{prefix}#{number} {name} \u{2014} {from_stage} \u{2192} {to_stage}"
+    );
+    (plain, html)
+}
+
+/// Format an error notification message for a story failure.
+///
+/// Returns `(plain_text, html)` suitable for `RoomMessageEventContent::text_html`.
+pub fn format_error_notification(
+    item_id: &str,
+    story_name: Option<&str>,
+    reason: &str,
+) -> (String, String) {
+    let number = extract_story_number(item_id).unwrap_or(item_id);
+    let name = story_name.unwrap_or(item_id);
+
+    let plain = format!("\u{274c} #{number} {name} \u{2014} {reason}");
+    let html = format!(
+        "\u{274c} #{number} {name} \u{2014} {reason}"
+    );
+    (plain, html)
+}
+
+/// Search all pipeline stages for a story name.
+///
+/// Tries each known pipeline stage directory in order and returns the first
+/// name found. Used for events (like rate-limit warnings) that arrive without
+/// a known stage.
+fn find_story_name_any_stage(project_root: &Path, item_id: &str) -> Option<String> {
+    for stage in &["2_current", "3_qa", "4_merge", "1_backlog", "5_done"] {
+        if let Some(name) = read_story_name(project_root, stage, item_id) {
+            return Some(name);
+        }
+    }
+    None
+}
+
+/// Format a blocked-story notification message.
+///
+/// Returns `(plain_text, html)` suitable for `ChatTransport::send_message`.
+pub fn format_blocked_notification(
+    item_id: &str,
+    story_name: Option<&str>,
+    reason: &str,
+) -> (String, String) {
+    let number = extract_story_number(item_id).unwrap_or(item_id);
+    let name = story_name.unwrap_or(item_id);
+
+    let plain = format!("\u{1f6ab} #{number} {name} \u{2014} BLOCKED: {reason}");
+    let html = format!(
+        "\u{1f6ab} #{number} {name} \u{2014} BLOCKED: {reason}"
+    );
+    (plain, html)
+}
+
+/// Minimum time between rate-limit notifications for the same agent.
+const RATE_LIMIT_DEBOUNCE: Duration = Duration::from_secs(60);
+
+/// Format a rate limit hard block notification message with scheduled resume time.
+///
+/// Returns `(plain_text, html)` suitable for `ChatTransport::send_message`.
+pub fn format_rate_limit_hard_block_notification(
+    item_id: &str,
+    story_name: Option<&str>,
+    agent_name: &str,
+    resume_at: chrono::DateTime<chrono::Utc>,
+) -> (String, String) {
+    let number = extract_story_number(item_id).unwrap_or(item_id);
+    let name = story_name.unwrap_or(item_id);
+    let local_time = resume_at.with_timezone(&chrono::Local);
+    let resume_str = local_time.format("%Y-%m-%d %H:%M").to_string();
+
+    let plain = format!(
+        "\u{1f6d1} #{number} {name} \u{2014} {agent_name} hit a hard rate limit; \
+         will auto-resume at {resume_str}"
+    );
+    let html = format!(
+        "\u{1f6d1} #{number} {name} \u{2014} \
+         {agent_name} hit a hard rate limit; will auto-resume at {resume_str}"
+    );
+    (plain, html)
+}
+
+/// Format a rate limit warning notification message.
+///
+/// Returns `(plain_text, html)` suitable for `ChatTransport::send_message`.
+pub fn format_rate_limit_notification(
+    item_id: &str,
+    story_name: Option<&str>,
+    agent_name: &str,
+) -> (String, String) {
+    let number = extract_story_number(item_id).unwrap_or(item_id);
+    let name = story_name.unwrap_or(item_id);
+
+    let plain = format!(
+        "\u{26a0}\u{fe0f} #{number} {name} \u{2014} {agent_name} hit an API rate limit"
+    );
+    let html = format!(
+        "\u{26a0}\u{fe0f} #{number} {name} \u{2014} \
+         {agent_name} hit an API rate limit"
+    );
+    (plain, html)
+}
+
+/// Spawn a background task that listens for watcher events and posts
+/// stage-transition notifications to all configured rooms via the
+/// [`ChatTransport`] abstraction.
+///
+/// `get_room_ids` is called on each notification to obtain the current list of
+/// destination room IDs. Pass a closure that returns a static list for Matrix
+/// and Slack, or one that reads from a runtime `Arc<Mutex<HashSet<String>>>`
+/// for WhatsApp ambient senders.
+pub fn spawn_notification_listener(
+    transport: Arc<dyn ChatTransport>,
+    get_room_ids: impl Fn() -> Vec<String> + Send + 'static,
+    watcher_rx: broadcast::Receiver<WatcherEvent>,
+    project_root: PathBuf,
+) {
+    tokio::spawn(async move {
+        let mut rx = watcher_rx;
+        // Tracks when a rate-limit notification was last sent for each
+        // "story_id:agent_name" key, to debounce repeated warnings.
+        let mut rate_limit_last_notified: HashMap<String, Instant> = HashMap::new();
+
+        loop {
+            match rx.recv().await {
+                Ok(WatcherEvent::WorkItem {
+                    ref stage,
+                    ref item_id,
+                    ..
+                }) => {
+                    // Only notify on stage transitions, not creations.
+ let Some(from_display) = inferred_from_stage(stage) else { + continue; + }; + let to_display = stage_display_name(stage); + + let story_name = read_story_name(&project_root, stage, item_id); + let (plain, html) = format_stage_notification( + item_id, + story_name.as_deref(), + from_display, + to_display, + ); + + slog!("[bot] Sending stage notification: {plain}"); + + for room_id in &get_room_ids() { + if let Err(e) = transport.send_message(room_id, &plain, &html).await { + slog!( + "[bot] Failed to send notification to {room_id}: {e}" + ); + } + } + } + Ok(WatcherEvent::MergeFailure { + ref story_id, + ref reason, + }) => { + let story_name = + read_story_name(&project_root, "4_merge", story_id); + let (plain, html) = format_error_notification( + story_id, + story_name.as_deref(), + reason, + ); + + slog!("[bot] Sending error notification: {plain}"); + + for room_id in &get_room_ids() { + if let Err(e) = transport.send_message(room_id, &plain, &html).await { + slog!( + "[bot] Failed to send error notification to {room_id}: {e}" + ); + } + } + } + Ok(WatcherEvent::RateLimitWarning { + ref story_id, + ref agent_name, + }) => { + // Debounce: skip if we sent a notification for this agent + // within the last RATE_LIMIT_DEBOUNCE seconds. + let debounce_key = format!("{story_id}:{agent_name}"); + let now = Instant::now(); + if let Some(&last) = rate_limit_last_notified.get(&debounce_key) + && now.duration_since(last) < RATE_LIMIT_DEBOUNCE + { + slog!( + "[bot] Rate-limit notification debounced for \ + {story_id}:{agent_name}" + ); + continue; + } + rate_limit_last_notified.insert(debounce_key, now); + + let story_name = find_story_name_any_stage(&project_root, story_id); + let (plain, html) = format_rate_limit_notification( + story_id, + story_name.as_deref(), + agent_name, + ); + + slog!("[bot] Sending rate-limit notification: {plain}"); + + for room_id in &get_room_ids() { + if let Err(e) = transport.send_message(room_id, &plain, &html).await { + slog!( + "[bot] Failed to send rate-limit notification \ + to {room_id}: {e}" + ); + } + } + } + Ok(WatcherEvent::StoryBlocked { + ref story_id, + ref reason, + }) => { + let story_name = find_story_name_any_stage(&project_root, story_id); + let (plain, html) = format_blocked_notification( + story_id, + story_name.as_deref(), + reason, + ); + + slog!("[bot] Sending blocked notification: {plain}"); + + for room_id in &get_room_ids() { + if let Err(e) = transport.send_message(room_id, &plain, &html).await { + slog!( + "[bot] Failed to send blocked notification to {room_id}: {e}" + ); + } + } + } + Ok(WatcherEvent::RateLimitHardBlock { + ref story_id, + ref agent_name, + reset_at, + }) => { + // Debounce: reuse the same key as RateLimitWarning so both + // types are rate-limited together for the same agent. 
+                    let debounce_key = format!("{story_id}:{agent_name}");
+                    let now = Instant::now();
+                    if let Some(&last) = rate_limit_last_notified.get(&debounce_key)
+                        && now.duration_since(last) < RATE_LIMIT_DEBOUNCE
+                    {
+                        slog!(
+                            "[bot] Rate-limit hard-block notification debounced for \
+                             {story_id}:{agent_name}"
+                        );
+                        continue;
+                    }
+                    rate_limit_last_notified.insert(debounce_key, now);
+
+                    let story_name = find_story_name_any_stage(&project_root, story_id);
+                    let (plain, html) = format_rate_limit_hard_block_notification(
+                        story_id,
+                        story_name.as_deref(),
+                        agent_name,
+                        reset_at,
+                    );
+
+                    slog!("[bot] Sending rate-limit hard-block notification: {plain}");
+
+                    for room_id in &get_room_ids() {
+                        if let Err(e) = transport.send_message(room_id, &plain, &html).await {
+                            slog!(
+                                "[bot] Failed to send rate-limit hard-block notification \
+                                 to {room_id}: {e}"
+                            );
+                        }
+                    }
+                }
+                Ok(_) => {} // Ignore non-work-item events
+                Err(broadcast::error::RecvError::Lagged(n)) => {
+                    slog!(
+                        "[bot] Notification listener lagged, skipped {n} events"
+                    );
+                }
+                Err(broadcast::error::RecvError::Closed) => {
+                    slog!(
+                        "[bot] Watcher channel closed, stopping notification listener"
+                    );
+                    break;
+                }
+            }
+        }
+    });
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use async_trait::async_trait;
+    use crate::chat::MessageId;
+
+    // ── MockTransport ───────────────────────────────────────────────────────
+
+    type CallLog = Arc<std::sync::Mutex<Vec<(String, String, String)>>>;
+
+    /// Records every `send_message` call for inspection in tests.
+    struct MockTransport {
+        calls: CallLog,
+    }
+
+    impl MockTransport {
+        fn new() -> (Arc<Self>, CallLog) {
+            let calls: CallLog = Arc::new(std::sync::Mutex::new(Vec::new()));
+            (Arc::new(Self { calls: Arc::clone(&calls) }), calls)
+        }
+    }
+
+    #[async_trait]
+    impl crate::chat::ChatTransport for MockTransport {
+        async fn send_message(&self, room_id: &str, plain: &str, html: &str) -> Result<MessageId, String> {
+            self.calls.lock().unwrap().push((room_id.to_string(), plain.to_string(), html.to_string()));
+            Ok("mock-msg-id".to_string())
+        }
+
+        async fn edit_message(&self, _room_id: &str, _id: &str, _plain: &str, _html: &str) -> Result<(), String> {
+            Ok(())
+        }
+
+        async fn send_typing(&self, _room_id: &str, _typing: bool) -> Result<(), String> {
+            Ok(())
+        }
+    }
+
+    // ── spawn_notification_listener: RateLimitWarning ───────────────────────
+
+    /// AC2 + AC3: when a RateLimitWarning event arrives, send_message is called
+    /// with a notification that names the agent and story.
+    #[tokio::test]
+    async fn rate_limit_warning_sends_notification_with_agent_and_story() {
+        let tmp = tempfile::tempdir().unwrap();
+        let stage_dir = tmp.path().join(".storkit").join("work").join("2_current");
+        std::fs::create_dir_all(&stage_dir).unwrap();
+        std::fs::write(
+            stage_dir.join("365_story_rate_limit.md"),
+            "---\nname: Rate Limit Test Story\n---\n",
+        )
+        .unwrap();
+
+        let (watcher_tx, watcher_rx) = broadcast::channel::<WatcherEvent>(16);
+        let (transport, calls) = MockTransport::new();
+
+        spawn_notification_listener(
+            transport,
+            || vec!["!room123:example.org".to_string()],
+            watcher_rx,
+            tmp.path().to_path_buf(),
+        );
+
+        watcher_tx.send(WatcherEvent::RateLimitWarning {
+            story_id: "365_story_rate_limit".to_string(),
+            agent_name: "coder-1".to_string(),
+        }).unwrap();
+
+        // Give the spawned task time to process the event.
+        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+
+        let calls = calls.lock().unwrap();
+        assert_eq!(calls.len(), 1, "Expected exactly one notification");
+        let (room_id, plain, _html) = &calls[0];
+        assert_eq!(room_id, "!room123:example.org");
+        assert!(plain.contains("365"), "plain should contain story number");
+        assert!(plain.contains("Rate Limit Test Story"), "plain should contain story name");
+        assert!(plain.contains("coder-1"), "plain should contain agent name");
+        assert!(plain.contains("rate limit"), "plain should mention rate limit");
+    }
+
+    /// AC4: a second RateLimitWarning for the same agent within the debounce
+    /// window must NOT trigger a second notification.
+    #[tokio::test]
+    async fn rate_limit_warning_is_debounced() {
+        let tmp = tempfile::tempdir().unwrap();
+
+        let (watcher_tx, watcher_rx) = broadcast::channel::<WatcherEvent>(16);
+        let (transport, calls) = MockTransport::new();
+
+        spawn_notification_listener(
+            transport,
+            || vec!["!room1:example.org".to_string()],
+            watcher_rx,
+            tmp.path().to_path_buf(),
+        );
+
+        // Send the same warning twice in rapid succession.
+        for _ in 0..2 {
+            watcher_tx.send(WatcherEvent::RateLimitWarning {
+                story_id: "42_story_debounce".to_string(),
+                agent_name: "coder-2".to_string(),
+            }).unwrap();
+        }
+
+        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+
+        let calls = calls.lock().unwrap();
+        assert_eq!(calls.len(), 1, "Debounce should suppress the second notification");
+    }
+
+    /// AC4 (corollary): warnings for different agents are NOT debounced against
+    /// each other — both should produce notifications.
+    #[tokio::test]
+    async fn rate_limit_warnings_for_different_agents_both_notify() {
+        let tmp = tempfile::tempdir().unwrap();
+
+        let (watcher_tx, watcher_rx) = broadcast::channel::<WatcherEvent>(16);
+        let (transport, calls) = MockTransport::new();
+
+        spawn_notification_listener(
+            transport,
+            || vec!["!room1:example.org".to_string()],
+            watcher_rx,
+            tmp.path().to_path_buf(),
+        );
+
+        watcher_tx.send(WatcherEvent::RateLimitWarning {
+            story_id: "42_story_foo".to_string(),
+            agent_name: "coder-1".to_string(),
+        }).unwrap();
+        watcher_tx.send(WatcherEvent::RateLimitWarning {
+            story_id: "42_story_foo".to_string(),
+            agent_name: "coder-2".to_string(),
+        }).unwrap();
+
+        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+
+        let calls = calls.lock().unwrap();
+        assert_eq!(calls.len(), 2, "Different agents should each trigger a notification");
+    }
+
+    // ── dynamic room IDs (WhatsApp ambient_rooms pattern) ───────────────────
+
+    /// Notifications are sent to the rooms returned by the closure at
+    /// notification time, not at listener-spawn time. This verifies that a
+    /// closure backed by a runtime set (e.g. WhatsApp ambient_rooms) delivers
+    /// messages to the rooms present when the event fires.
+    #[tokio::test]
+    async fn stage_notification_uses_dynamic_room_ids() {
+        let tmp = tempfile::tempdir().unwrap();
+        let stage_dir = tmp.path().join(".storkit").join("work").join("3_qa");
+        std::fs::create_dir_all(&stage_dir).unwrap();
+        std::fs::write(
+            stage_dir.join("10_story_foo.md"),
+            "---\nname: Foo Story\n---\n",
+        )
+        .unwrap();
+
+        let (watcher_tx, watcher_rx) = broadcast::channel::<WatcherEvent>(16);
+        let (transport, calls) = MockTransport::new();
+
+        let rooms: Arc<std::sync::Mutex<std::collections::HashSet<String>>> =
+            Arc::new(std::sync::Mutex::new(std::collections::HashSet::new()));
+        let rooms_for_closure = Arc::clone(&rooms);
+
+        spawn_notification_listener(
+            transport,
+            move || rooms_for_closure.lock().unwrap().iter().cloned().collect(),
+            watcher_rx,
+            tmp.path().to_path_buf(),
+        );
+
+        // Add a room after the listener is spawned (simulates a user messaging first).
+        rooms.lock().unwrap().insert("phone:+15551234567".to_string());
+
+        watcher_tx.send(WatcherEvent::WorkItem {
+            stage: "3_qa".to_string(),
+            item_id: "10_story_foo".to_string(),
+            action: "qa".to_string(),
+            commit_msg: "storkit: qa 10_story_foo".to_string(),
+            from_stage: None,
+        }).unwrap();
+
+        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+
+        let calls = calls.lock().unwrap();
+        assert_eq!(calls.len(), 1, "Should deliver to the dynamically added room");
+        assert_eq!(calls[0].0, "phone:+15551234567");
+        assert!(calls[0].1.contains("10"), "plain should contain story number");
+        assert!(calls[0].1.contains("Foo Story"), "plain should contain story name");
+    }
+
+    /// When no rooms are registered (e.g. no WhatsApp users have messaged yet),
+    /// no notifications are sent and the listener does not panic.
+    #[tokio::test]
+    async fn stage_notification_with_no_rooms_is_silent() {
+        let tmp = tempfile::tempdir().unwrap();
+
+        let (watcher_tx, watcher_rx) = broadcast::channel::<WatcherEvent>(16);
+        let (transport, calls) = MockTransport::new();
+
+        spawn_notification_listener(
+            transport,
+            Vec::new,
+            watcher_rx,
+            tmp.path().to_path_buf(),
+        );
+
+        watcher_tx.send(WatcherEvent::WorkItem {
+            stage: "3_qa".to_string(),
+            item_id: "10_story_foo".to_string(),
+            action: "qa".to_string(),
+            commit_msg: "storkit: qa 10_story_foo".to_string(),
+            from_stage: None,
+        }).unwrap();
+
+        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+
+        let calls = calls.lock().unwrap();
+        assert_eq!(calls.len(), 0, "No rooms means no notifications");
+    }
+
+    // ── stage_display_name ──────────────────────────────────────────────────
+
+    #[test]
+    fn stage_display_name_maps_all_known_stages() {
+        assert_eq!(stage_display_name("1_backlog"), "Backlog");
+        assert_eq!(stage_display_name("2_current"), "Current");
+        assert_eq!(stage_display_name("3_qa"), "QA");
+        assert_eq!(stage_display_name("4_merge"), "Merge");
+        assert_eq!(stage_display_name("5_done"), "Done");
+        assert_eq!(stage_display_name("6_archived"), "Archived");
+        assert_eq!(stage_display_name("unknown"), "Unknown");
+    }
+
+    // ── inferred_from_stage ─────────────────────────────────────────────────
+
+    #[test]
+    fn inferred_from_stage_returns_previous_stage() {
+        assert_eq!(inferred_from_stage("2_current"), Some("Backlog"));
+        assert_eq!(inferred_from_stage("3_qa"), Some("Current"));
+        assert_eq!(inferred_from_stage("4_merge"), Some("QA"));
+        assert_eq!(inferred_from_stage("5_done"), Some("Merge"));
+        assert_eq!(inferred_from_stage("6_archived"), Some("Done"));
+    }
+
+    #[test]
+    fn inferred_from_stage_returns_none_for_backlog() {
+        assert_eq!(inferred_from_stage("1_backlog"), None);
+    }
+
+    #[test]
+    fn inferred_from_stage_returns_none_for_unknown() {
+
assert_eq!(inferred_from_stage("9_unknown"), None); + } + + // ── extract_story_number ──────────────────────────────────────────────── + + #[test] + fn extract_story_number_parses_numeric_prefix() { + assert_eq!( + extract_story_number("261_story_bot_notifications"), + Some("261") + ); + assert_eq!(extract_story_number("42_bug_fix_thing"), Some("42")); + assert_eq!(extract_story_number("1_spike_research"), Some("1")); + } + + #[test] + fn extract_story_number_returns_none_for_non_numeric() { + assert_eq!(extract_story_number("abc_story_thing"), None); + assert_eq!(extract_story_number(""), None); + } + + // ── read_story_name ───────────────────────────────────────────────────── + + #[test] + fn read_story_name_reads_from_front_matter() { + let tmp = tempfile::tempdir().unwrap(); + let stage_dir = tmp + .path() + .join(".storkit") + .join("work") + .join("2_current"); + std::fs::create_dir_all(&stage_dir).unwrap(); + std::fs::write( + stage_dir.join("42_story_my_feature.md"), + "---\nname: My Cool Feature\n---\n# Story\n", + ) + .unwrap(); + + let name = read_story_name(tmp.path(), "2_current", "42_story_my_feature"); + assert_eq!(name.as_deref(), Some("My Cool Feature")); + } + + #[test] + fn read_story_name_returns_none_for_missing_file() { + let tmp = tempfile::tempdir().unwrap(); + let name = read_story_name(tmp.path(), "2_current", "99_story_missing"); + assert_eq!(name, None); + } + + #[test] + fn read_story_name_returns_none_for_missing_name_field() { + let tmp = tempfile::tempdir().unwrap(); + let stage_dir = tmp + .path() + .join(".storkit") + .join("work") + .join("2_current"); + std::fs::create_dir_all(&stage_dir).unwrap(); + std::fs::write( + stage_dir.join("42_story_no_name.md"), + "---\ncoverage_baseline: 50%\n---\n# Story\n", + ) + .unwrap(); + + let name = read_story_name(tmp.path(), "2_current", "42_story_no_name"); + assert_eq!(name, None); + } + + // ── format_error_notification ──────────────────────────────────────────── + + #[test] + fn format_error_notification_with_story_name() { + let (plain, html) = + format_error_notification("262_story_bot_errors", Some("Bot error notifications"), "merge conflict in src/main.rs"); + assert_eq!( + plain, + "\u{274c} #262 Bot error notifications \u{2014} merge conflict in src/main.rs" + ); + assert_eq!( + html, + "\u{274c} #262 Bot error notifications \u{2014} merge conflict in src/main.rs" + ); + } + + #[test] + fn format_error_notification_without_story_name_falls_back_to_item_id() { + let (plain, _html) = + format_error_notification("42_bug_fix_thing", None, "tests failed"); + assert_eq!( + plain, + "\u{274c} #42 42_bug_fix_thing \u{2014} tests failed" + ); + } + + #[test] + fn format_error_notification_non_numeric_id_uses_full_id() { + let (plain, _html) = + format_error_notification("abc_story_thing", Some("Some Story"), "clippy errors"); + assert_eq!( + plain, + "\u{274c} #abc_story_thing Some Story \u{2014} clippy errors" + ); + } + + // ── format_blocked_notification ───────────────────────────────────────── + + #[test] + fn format_blocked_notification_with_story_name() { + let (plain, html) = format_blocked_notification( + "425_story_blocking_reason", + Some("Blocking Reason Story"), + "Retry limit exceeded (3/3) at coder stage", + ); + assert_eq!( + plain, + "\u{1f6ab} #425 Blocking Reason Story \u{2014} BLOCKED: Retry limit exceeded (3/3) at coder stage" + ); + assert_eq!( + html, + "\u{1f6ab} #425 Blocking Reason Story \u{2014} BLOCKED: Retry limit exceeded (3/3) at coder stage" + ); + } + + #[test] + fn 
format_blocked_notification_falls_back_to_item_id() {
+        let (plain, _html) =
+            format_blocked_notification("42_story_thing", None, "empty diff");
+        assert_eq!(
+            plain,
+            "\u{1f6ab} #42 42_story_thing \u{2014} BLOCKED: empty diff"
+        );
+    }
+
+    // ── spawn_notification_listener: StoryBlocked ───────────────────────────
+
+    /// AC1: when a StoryBlocked event arrives, send_message is called with a
+    /// notification that includes the story number, name, and reason.
+    #[tokio::test]
+    async fn story_blocked_sends_notification_with_reason() {
+        let tmp = tempfile::tempdir().unwrap();
+        let stage_dir = tmp.path().join(".storkit").join("work").join("2_current");
+        std::fs::create_dir_all(&stage_dir).unwrap();
+        std::fs::write(
+            stage_dir.join("425_story_blocking_test.md"),
+            "---\nname: Blocking Test Story\n---\n",
+        )
+        .unwrap();
+
+        let (watcher_tx, watcher_rx) = broadcast::channel::<WatcherEvent>(16);
+        let (transport, calls) = MockTransport::new();
+
+        spawn_notification_listener(
+            transport,
+            || vec!["!room123:example.org".to_string()],
+            watcher_rx,
+            tmp.path().to_path_buf(),
+        );
+
+        watcher_tx.send(WatcherEvent::StoryBlocked {
+            story_id: "425_story_blocking_test".to_string(),
+            reason: "Retry limit exceeded (3/3) at coder stage".to_string(),
+        }).unwrap();
+
+        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+
+        let calls = calls.lock().unwrap();
+        assert_eq!(calls.len(), 1, "Expected exactly one notification");
+        let (room_id, plain, html) = &calls[0];
+        assert_eq!(room_id, "!room123:example.org");
+        assert!(plain.contains("425"), "plain should contain story number");
+        assert!(plain.contains("Blocking Test Story"), "plain should contain story name");
+        assert!(plain.contains("BLOCKED"), "plain should contain BLOCKED label");
+        assert!(plain.contains("Retry limit exceeded"), "plain should contain the reason");
+        assert!(html.contains("BLOCKED"), "html should contain BLOCKED label");
+    }
+
+    /// StoryBlocked with no room registered should not panic.
+    #[tokio::test]
+    async fn story_blocked_with_no_rooms_is_silent() {
+        let tmp = tempfile::tempdir().unwrap();
+
+        let (watcher_tx, watcher_rx) = broadcast::channel::<WatcherEvent>(16);
+        let (transport, calls) = MockTransport::new();
+
+        spawn_notification_listener(
+            transport,
+            Vec::new,
+            watcher_rx,
+            tmp.path().to_path_buf(),
+        );
+
+        watcher_tx.send(WatcherEvent::StoryBlocked {
+            story_id: "42_story_no_rooms".to_string(),
+            reason: "empty diff".to_string(),
+        }).unwrap();
+
+        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+
+        let calls = calls.lock().unwrap();
+        assert_eq!(calls.len(), 0, "No rooms means no notifications");
+    }
+
+    // ── format_rate_limit_notification ──────────────────────────────────────
+
+    #[test]
+    fn format_rate_limit_notification_includes_agent_and_story() {
+        let (plain, html) = format_rate_limit_notification(
+            "365_story_my_feature",
+            Some("My Feature"),
+            "coder-2",
+        );
+        assert_eq!(
+            plain,
+            "\u{26a0}\u{fe0f} #365 My Feature \u{2014} coder-2 hit an API rate limit"
+        );
+        assert_eq!(
+            html,
+            "\u{26a0}\u{fe0f} #365 My Feature \u{2014} coder-2 hit an API rate limit"
+        );
+    }
+
+    #[test]
+    fn format_rate_limit_notification_falls_back_to_item_id() {
+        let (plain, _html) =
+            format_rate_limit_notification("42_story_thing", None, "coder-1");
+        assert_eq!(
+            plain,
+            "\u{26a0}\u{fe0f} #42 42_story_thing \u{2014} coder-1 hit an API rate limit"
+        );
+    }
+
+    // ── format_stage_notification ───────────────────────────────────────────
+
+    #[test]
+    fn format_notification_done_stage_includes_party_emoji() {
+        let (plain, html) = format_stage_notification(
+            "353_story_done",
+            Some("Done Story"),
+            "Merge",
+            "Done",
+        );
+        assert_eq!(
+            plain,
+            "\u{1f389} #353 Done Story \u{2014} Merge \u{2192} Done"
+        );
+        assert_eq!(
+            html,
+            "\u{1f389} #353 Done Story \u{2014} Merge \u{2192} Done"
+        );
+    }
+
+    #[test]
+    fn format_notification_non_done_stage_has_no_emoji() {
+        let (plain, _html) = format_stage_notification(
+            "42_story_thing",
+            Some("Some Story"),
+            "Backlog",
+            "Current",
+        );
+        assert!(!plain.contains("\u{1f389}"));
+    }
+
+    #[test]
+    fn format_notification_with_story_name() {
+        let (plain, html) = format_stage_notification(
+            "261_story_bot_notifications",
+            Some("Bot notifications"),
+            "Upcoming",
+            "Current",
+        );
+        assert_eq!(
+            plain,
+            "#261 Bot notifications \u{2014} Upcoming \u{2192} Current"
+        );
+        assert_eq!(
+            html,
+            "#261 Bot notifications \u{2014} Upcoming \u{2192} Current"
+        );
+    }
+
+    #[test]
+    fn format_notification_without_story_name_falls_back_to_item_id() {
+        let (plain, _html) = format_stage_notification(
+            "42_bug_fix_thing",
+            None,
+            "Current",
+            "QA",
+        );
+        assert_eq!(
+            plain,
+            "#42 42_bug_fix_thing \u{2014} Current \u{2192} QA"
+        );
+    }
+
+    #[test]
+    fn format_notification_non_numeric_id_uses_full_id() {
+        let (plain, _html) = format_stage_notification(
+            "abc_story_thing",
+            Some("Some Story"),
+            "QA",
+            "Merge",
+        );
+        assert_eq!(
+            plain,
+            "#abc_story_thing Some Story \u{2014} QA \u{2192} Merge"
+        );
+    }
+}
diff --git a/server/src/cli.rs b/server/src/cli.rs
index 6b6c603e..05d44e5e 100644
--- a/server/src/cli.rs
+++ b/server/src/cli.rs
@@ -173,7 +173,6 @@ pub(crate) fn resolve_path_arg(path_str: Option<&str>, cwd: &std::path::Path) ->
     path_str.map(|s| crate::io::fs::resolve_cli_path(cwd, s))
 }
 
-
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/server/src/crdt_state/mod.rs b/server/src/crdt_state/mod.rs
index 31399a01..107a737f 100644
--- a/server/src/crdt_state/mod.rs
+++ b/server/src/crdt_state/mod.rs
@@ -9,6 +9,7
@@ //! so subscribers (auto-assign, WebSocket, notifications) can react without //! polling the filesystem. +#![allow(unused_imports, dead_code)] use std::collections::HashMap; /// A vector clock mapping node IDs (hex-encoded Ed25519 pubkeys) to the count @@ -30,13 +31,12 @@ pub use presence::{ }; pub use read::{ CrdtItemDump, CrdtStateDump, check_archived_deps_crdt, check_unmet_deps_crdt, - dep_is_archived_crdt, dep_is_done_crdt, dump_crdt_state, evict_item, read_all_items, - read_item, + dep_is_archived_crdt, dep_is_done_crdt, dump_crdt_state, evict_item, read_all_items, read_item, }; pub use state::init; pub use types::{ - CrdtEvent, NodePresenceCrdt, NodePresenceView, PipelineDoc, PipelineItemCrdt, - PipelineItemView, subscribe, + CrdtEvent, NodePresenceCrdt, NodePresenceView, PipelineDoc, PipelineItemCrdt, PipelineItemView, + subscribe, }; pub use write::write_item; diff --git a/server/src/crdt_state/ops.rs b/server/src/crdt_state/ops.rs index a57c5ab2..d79472b0 100644 --- a/server/src/crdt_state/ops.rs +++ b/server/src/crdt_state/ops.rs @@ -1,10 +1,11 @@ //! Public sync-broadcast API and remote-op ingestion. +#![allow(unused_imports, dead_code)] use std::collections::HashMap; +use super::hex; use bft_json_crdt::json_crdt::*; use bft_json_crdt::op::ROOT_ID; -use super::hex; use tokio::sync::broadcast; use super::VectorClock; @@ -157,10 +158,10 @@ pub fn apply_remote_op(op: SignedOp) -> bool { #[cfg(test)] mod tests { - use super::*; use super::super::state::init_for_test; use super::super::types::{NodePresenceCrdt, PipelineItemCrdt}; use super::super::write::write_item; + use super::*; use bft_json_crdt::json_crdt::OpState; use bft_json_crdt::keypair::make_keypair; use bft_json_crdt::op::ROOT_ID; diff --git a/server/src/crdt_state/presence.rs b/server/src/crdt_state/presence.rs index ec29de03..ac30b348 100644 --- a/server/src/crdt_state/presence.rs +++ b/server/src/crdt_state/presence.rs @@ -1,10 +1,11 @@ //! Node identity, work claiming, and node presence (heartbeat) API. -use bft_json_crdt::json_crdt::*; -use bft_json_crdt::op::ROOT_ID; +#![allow(unused_imports, dead_code)] use super::hex; use super::read::read_item; +use bft_json_crdt::json_crdt::*; use bft_json_crdt::lww_crdt::LwwRegisterCrdt; +use bft_json_crdt::op::ROOT_ID; use fastcrypto::traits::{Signer, ToFromBytes}; use serde_json::json; diff --git a/server/src/crdt_state/read.rs b/server/src/crdt_state/read.rs index db7d68d2..85ed0c8b 100644 --- a/server/src/crdt_state/read.rs +++ b/server/src/crdt_state/read.rs @@ -1,12 +1,13 @@ //! Read API for pipeline items, dump introspection, and dependency helpers. 
+#![allow(unused_imports, dead_code)] use std::collections::HashMap; use bft_json_crdt::json_crdt::*; use super::state::{ALL_OPS, apply_and_persist, get_crdt, rebuild_index}; -use bft_json_crdt::op::ROOT_ID; use super::types::{PipelineDoc, PipelineItemCrdt, PipelineItemView}; +use bft_json_crdt::op::ROOT_ID; // ── Debug dump ─────────────────────────────────────────────────────── @@ -400,14 +401,14 @@ pub fn check_archived_deps_crdt(story_id: &str) -> Vec { #[cfg(test)] mod tests { - use super::*; use super::super::state::init_for_test; + use super::super::state::rebuild_index; use super::super::types::PipelineItemCrdt; use super::super::write::write_item; - use bft_json_crdt::op::ROOT_ID; - use super::super::state::rebuild_index; + use super::*; use bft_json_crdt::json_crdt::OpState; use bft_json_crdt::keypair::make_keypair; + use bft_json_crdt::op::ROOT_ID; use serde_json::json; #[test] diff --git a/server/src/crdt_state/state.rs b/server/src/crdt_state/state.rs index 4618ebe6..bf03fcc5 100644 --- a/server/src/crdt_state/state.rs +++ b/server/src/crdt_state/state.rs @@ -1,5 +1,6 @@ //! Internal CRDT state struct, statics, initialisation, and central write primitive. +#![allow(unused_imports, dead_code)] use std::collections::HashMap; use std::path::Path; use std::sync::{Mutex, OnceLock}; @@ -95,8 +96,6 @@ pub(super) fn get_crdt() -> Option<&'static Mutex> { } } - - /// Initialise the CRDT state layer. /// /// Opens the SQLite database, loads or creates a node keypair, replays any @@ -319,14 +318,14 @@ pub(super) fn emit_event(event: CrdtEvent) { #[cfg(test)] mod tests { - use super::*; + use super::super::hex; + use super::super::read::{extract_item_view, read_item}; use super::super::types::PipelineItemCrdt; use super::super::write::write_item; - use super::super::read::{extract_item_view, read_item}; + use super::*; use bft_json_crdt::json_crdt::OpState; use bft_json_crdt::keypair::make_keypair; use bft_json_crdt::op::ROOT_ID; - use super::super::hex; use serde_json::json; #[test] diff --git a/server/src/crdt_state/types.rs b/server/src/crdt_state/types.rs index 3c48e4d5..07f8174e 100644 --- a/server/src/crdt_state/types.rs +++ b/server/src/crdt_state/types.rs @@ -1,12 +1,12 @@ //! CRDT document types, read-side view types, and CRDT-state events. +#![allow(unused_imports, dead_code)] use bft_json_crdt::json_crdt::*; use bft_json_crdt::list_crdt::ListCrdt; use bft_json_crdt::lww_crdt::LwwRegisterCrdt; use std::sync::OnceLock; use tokio::sync::broadcast; - /// An event emitted when a pipeline item's stage changes in the CRDT document. #[derive(Clone, Debug)] pub struct CrdtEvent { @@ -109,13 +109,13 @@ pub struct NodePresenceView { #[cfg(test)] mod tests { - use super::*; + use super::super::state::emit_event; use super::super::state::init_for_test; use super::super::write::write_item; + use super::*; use bft_json_crdt::json_crdt::OpState; use bft_json_crdt::keypair::make_keypair; use bft_json_crdt::op::ROOT_ID; - use super::super::state::emit_event; use serde_json::json; #[test] diff --git a/server/src/crdt_state/write.rs b/server/src/crdt_state/write.rs index 434e2ff1..487ee898 100644 --- a/server/src/crdt_state/write.rs +++ b/server/src/crdt_state/write.rs @@ -1,8 +1,9 @@ //! High-level write API for pipeline items. 
+#![allow(unused_imports, dead_code)] use bft_json_crdt::json_crdt::*; -use bft_json_crdt::op::ROOT_ID; use bft_json_crdt::lww_crdt::LwwRegisterCrdt; +use bft_json_crdt::op::ROOT_ID; use serde_json::json; use super::state::{apply_and_persist, emit_event, get_crdt, rebuild_index}; @@ -131,18 +132,18 @@ pub fn write_item( #[cfg(test)] mod tests { - use super::*; - use super::super::state::init_for_test; + use super::super::hex; + use super::super::read::extract_item_view; use super::super::read::read_item; + use super::super::state::init_for_test; + use super::super::state::rebuild_index; + use super::*; use bft_json_crdt::json_crdt::OpState; use bft_json_crdt::keypair::make_keypair; use bft_json_crdt::op::ROOT_ID; + use serde_json::json; use sqlx::SqlitePool; use sqlx::sqlite::SqliteConnectOptions; - use super::super::hex; - use super::super::state::rebuild_index; - use super::super::read::extract_item_view; - use serde_json::json; #[tokio::test] async fn bug_511_rowid_replay_preserves_field_update_after_list_insert() { diff --git a/server/src/crdt_sync/client.rs b/server/src/crdt_sync/client.rs index 8d65ce27..8cf66968 100644 --- a/server/src/crdt_sync/client.rs +++ b/server/src/crdt_sync/client.rs @@ -418,8 +418,7 @@ name = "test" #[test] fn error_threshold_is_ten() { assert_eq!( - RENDEZVOUS_ERROR_THRESHOLD, - 10, + RENDEZVOUS_ERROR_THRESHOLD, 10, "ERROR escalation threshold must be 10 consecutive failures" ); } diff --git a/server/src/crdt_sync/handshake.rs b/server/src/crdt_sync/handshake.rs index a561fa4e..9a048e0a 100644 --- a/server/src/crdt_sync/handshake.rs +++ b/server/src/crdt_sync/handshake.rs @@ -1,5 +1,6 @@ //! Auth handshake for the server-side `/crdt-sync` WebSocket. +#![allow(unused_imports, dead_code)] use futures::{SinkExt, StreamExt}; use poem::web::websocket::Message as WsMessage; @@ -63,11 +64,8 @@ pub(super) async fn perform_auth_handshake( } }; - let sig_valid = node_identity::verify_challenge( - &auth_msg.pubkey_hex, - &challenge, - &auth_msg.signature_hex, - ); + let sig_valid = + node_identity::verify_challenge(&auth_msg.pubkey_hex, &challenge, &auth_msg.signature_hex); let key_trusted = trusted_keys().iter().any(|k| k == &auth_msg.pubkey_hex); if !sig_valid || !key_trusted { @@ -86,7 +84,6 @@ pub(super) async fn perform_auth_handshake( Some(auth_msg) } - /// Close the WebSocket with a generic `auth_failed` reason. /// /// The close reason is intentionally the same for all auth failures @@ -105,11 +102,10 @@ async fn close_with_auth_failed( } /// Process an incoming text-frame sync message from a peer. 
-
 #[cfg(test)]
 mod tests {
-    use super::*;
     use super::super::server::crdt_sync_handler;
+    use super::*;
 
     #[allow(dead_code)]
     #[derive(Debug)]
@@ -121,7 +117,6 @@
         PeerClosedEarly(Option),
     }
 
-
     async fn start_auth_listener(
         trusted_keys: Vec<String>,
     ) -> (
@@ -236,7 +231,6 @@
         (addr, result_rx)
     }
 
-
     async fn close_listener_auth_failed(
         sink: &mut futures::stream::SplitSink<
             tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>,
@@ -309,7 +303,8 @@
             TMsg::Text(t) => t.to_string(),
             other => panic!("Expected bulk text frame, got {other:?}"),
         };
-        let bulk_msg: crate::crdt_sync::wire::SyncMessage = serde_json::from_str(&bulk_text).unwrap();
+        let bulk_msg: crate::crdt_sync::wire::SyncMessage =
+            serde_json::from_str(&bulk_text).unwrap();
         match bulk_msg {
             crate::crdt_sync::wire::SyncMessage::Bulk { ops } => {
                 assert!(
diff --git a/server/src/crdt_sync/mod.rs b/server/src/crdt_sync/mod.rs
index b33adf12..6c1efb0e 100644
--- a/server/src/crdt_sync/mod.rs
+++ b/server/src/crdt_sync/mod.rs
@@ -1,47 +1,46 @@
 //! CRDT sync — WebSocket-based replication of pipeline state between huskies nodes.
-/// WebSocket-based CRDT sync layer for replicating pipeline state between
-/// huskies nodes.
-///
-/// # Protocol
-///
-/// ## Version negotiation
-///
-/// After the auth handshake, both sides send their first sync message:
-///
-/// - **v2 peers** send a `clock` frame: `{"type":"clock","clock":{"<pubkey>": <count>, ...}}`
-///   containing a vector clock that maps each author's hex Ed25519 pubkey to the
-///   count of ops received from that author. Upon receiving the peer's clock,
-///   each side computes the delta via [`crdt_state::ops_since`] and sends only
-///   the missing ops as a `bulk` frame.
-///
-/// - **v1 (legacy) peers** send a `bulk` frame directly (full op dump).
-///   A v2 peer receiving a `bulk` first (instead of a `clock`) falls back to
-///   the full-dump path: applies the incoming bulk and responds with its own
-///   full bulk. This preserves backward compatibility — no code change needed
-///   on the v1 side.
-///
-/// ## Text frames
-/// A JSON object with a `"type"` field:
-/// - `{"type":"clock","clock":{...}}` — Vector clock (v2 protocol).
-/// - `{"type":"bulk","ops":[...]}` — Ops dump (full or delta).
-/// - `{"type":"ready"}` — Signals that the bulk-delta phase is complete and the
-///   sender is ready for real-time op streaming. Locally-generated ops are
-///   buffered until the peer's `ready` is received, then flushed in order.
-///
-/// ## Binary frames (real-time op broadcast)
-/// Individual `SignedOp`s encoded via [`crate::crdt_wire`] (versioned JSON
-/// envelope: `{"v":1,"op":{...}}`). Each locally-applied op is immediately
-/// broadcast as a binary frame to all connected peers.
-///
-/// Both the server endpoint and the rendezvous client use the same protocol,
-/// making the connection fully symmetric.
-///
-/// ## Backpressure
-/// Each connected peer has its own [`tokio::sync::broadcast`] receiver. If a
-/// slow peer allows the channel to fill (indicated by a `Lagged` error), the
-/// connection is dropped with a warning log. The peer can reconnect and
-/// receive a fresh bulk state dump to catch up.
-
+//! WebSocket-based CRDT sync layer for replicating pipeline state between
+//! huskies nodes.
+//!
+//! # Protocol
+//!
+//! ## Version negotiation
+//!
+//! After the auth handshake, both sides send their first sync message:
+//!
+//! - **v2 peers** send a `clock` frame: `{"type":"clock","clock":{"<pubkey>": <count>, ...}}`
+//!
containing a vector clock that maps each author's hex Ed25519 pubkey to the +//! count of ops received from that author. Upon receiving the peer's clock, +//! each side computes the delta via [`crdt_state::ops_since`] and sends only +//! the missing ops as a `bulk` frame. +//! +//! - **v1 (legacy) peers** send a `bulk` frame directly (full op dump). +//! A v2 peer receiving a `bulk` first (instead of a `clock`) falls back to +//! the full-dump path: applies the incoming bulk and responds with its own +//! full bulk. This preserves backward compatibility — no code change needed +//! on the v1 side. +//! +//! ## Text frames +//! A JSON object with a `"type"` field: +//! - `{"type":"clock","clock":{...}}` — Vector clock (v2 protocol). +//! - `{"type":"bulk","ops":[...]}` — Ops dump (full or delta). +//! - `{"type":"ready"}` — Signals that the bulk-delta phase is complete and the +//! sender is ready for real-time op streaming. Locally-generated ops are +//! buffered until the peer's `ready` is received, then flushed in order. +//! +//! ## Binary frames (real-time op broadcast) +//! Individual `SignedOp`s encoded via [`crate::crdt_wire`] (versioned JSON +//! envelope: `{"v":1,"op":{...}}`). Each locally-applied op is immediately +//! broadcast as a binary frame to all connected peers. +//! +//! Both the server endpoint and the rendezvous client use the same protocol, +//! making the connection fully symmetric. +//! +//! ## Backpressure +//! Each connected peer has its own [`tokio::sync::broadcast`] receiver. If a +//! slow peer allows the channel to fill (indicated by a `Lagged` error), the +//! connection is dropped with a warning log. The peer can reconnect and +//! receive a fresh bulk state dump to catch up. // ── Cross-cutting constants ───────────────────────────────────────── // ── Auth configuration ────────────────────────────────────────────── diff --git a/server/src/crdt_sync/server.rs b/server/src/crdt_sync/server.rs index 0ce9df91..b01be4ba 100644 --- a/server/src/crdt_sync/server.rs +++ b/server/src/crdt_sync/server.rs @@ -1,5 +1,6 @@ //! Server-side `/crdt-sync` WebSocket handler. +#![allow(unused_imports, dead_code)] use bft_json_crdt::json_crdt::SignedOp; use futures::{SinkExt, StreamExt}; use poem::handler; @@ -23,9 +24,6 @@ use super::dispatch::{handle_incoming_binary, handle_incoming_text}; use super::wire::{AuthMessage, ChallengeMessage, SyncMessage}; use super::{AUTH_TIMEOUT_SECS, PING_INTERVAL_SECS, PONG_TIMEOUT_SECS}; - - - /// Query parameters accepted on the `/crdt-sync` WebSocket upgrade request. #[derive(Deserialize)] struct SyncQueryParams { @@ -76,7 +74,7 @@ pub async fn crdt_sync_handler( slog!("[crdt-sync] Peer connected, starting auth handshake"); - let auth_msg = match super::handshake::perform_auth_handshake(&mut sink, &mut stream).await { + let _auth_msg = match super::handshake::perform_auth_handshake(&mut sink, &mut stream).await { Some(m) => m, None => return, }; @@ -296,7 +294,6 @@ pub async fn crdt_sync_handler( /// Wait for the next text-frame sync message from the peer, handling Ping/Pong /// transparently. /// - /// Wait for the next text-frame sync message from the peer, handling Ping/Pong /// transparently. 
/// @@ -321,9 +318,9 @@ async fn wait_for_sync_text( #[cfg(test)] mod tests { - use super::*; - use super::super::wire::SyncMessagePublic; use super::super::handshake::perform_auth_handshake; + use super::super::wire::SyncMessagePublic; + use super::*; #[test] fn peer_receives_op_encoded_via_wire_codec() { @@ -822,7 +819,6 @@ mod tests { ); } - #[test] fn keepalive_constants_are_correct() { assert_eq!( diff --git a/server/src/http/mcp/dispatch.rs b/server/src/http/mcp/dispatch.rs index a2f1c29d..7853b2e1 100644 --- a/server/src/http/mcp/dispatch.rs +++ b/server/src/http/mcp/dispatch.rs @@ -3,16 +3,20 @@ use serde_json::{Value, json}; use super::JsonRpcResponse; -use crate::http::context::AppContext; use super::{ agent_tools, diagnostics, git_tools, merge_tools, qa_tools, shell_tools, status_tools, story_tools, wizard_tools, }; +use crate::http::context::AppContext; use crate::slog_warn; // ── Tool dispatch ───────────────────────────────────────────────── -pub(super) async fn handle_tools_call(id: Option<Value>, params: &Value, ctx: &AppContext) -> JsonRpcResponse { +pub(super) async fn handle_tools_call( + id: Option<Value>, + params: &Value, + ctx: &AppContext, +) -> JsonRpcResponse { let tool_name = params.get("name").and_then(|v| v.as_str()).unwrap_or(""); let args = params.get("arguments").cloned().unwrap_or(json!({})); @@ -134,7 +138,6 @@ pub(super) async fn handle_tools_call(id: Option<Value>, params: &Value, ctx: &A } } - #[cfg(test)] mod tests { use super::*; diff --git a/server/src/http/mcp/mod.rs b/server/src/http/mcp/mod.rs index 72b1fa90..1800e033 100644 --- a/server/src/http/mcp/mod.rs +++ b/server/src/http/mcp/mod.rs @@ -1,7 +1,6 @@ //! HTTP MCP server module. use crate::http::context::AppContext; -use crate::slog_warn; use poem::handler; use poem::http::StatusCode; use poem::web::Data; @@ -20,7 +19,6 @@ pub mod status_tools; pub mod story_tools; pub mod wizard_tools; - mod dispatch; mod tools_list; @@ -206,7 +204,6 @@ fn handle_initialize(id: Option<Value>, params: &Value) -> JsonRpcResponse { ) } - #[cfg(test)] mod tests { use super::*; diff --git a/server/src/http/mcp/story_tools/bug.rs b/server/src/http/mcp/story_tools/bug.rs index fcb04efa..5777c7c2 100644 --- a/server/src/http/mcp/story_tools/bug.rs +++ b/server/src/http/mcp/story_tools/bug.rs @@ -1,5 +1,7 @@ //! Bug item MCP tools (create, list, close). +#![allow(unused_imports, dead_code)] +#[allow(unused_imports)] use crate::agents::{ close_bug_to_archive, feature_branch_has_unmerged_changes, move_story_to_done, }; @@ -21,7 +23,6 @@ use serde_json::{Value, json}; use std::collections::HashMap; use std::fs; - pub(crate) fn tool_create_bug(args: &Value, ctx: &AppContext) -> Result<Value, String> { let name = args .get("name") diff --git a/server/src/http/mcp/story_tools/criteria.rs b/server/src/http/mcp/story_tools/criteria.rs index 7a428430..f790c224 100644 --- a/server/src/http/mcp/story_tools/criteria.rs +++ b/server/src/http/mcp/story_tools/criteria.rs @@ -1,5 +1,7 @@ //! Acceptance-criteria MCP tools (todos, record_tests, ensure_acceptance, check/edit/add/remove).
+#![allow(unused_imports, dead_code)] +#[allow(unused_imports)] use crate::agents::{ close_bug_to_archive, feature_branch_has_unmerged_changes, move_story_to_done, }; @@ -21,7 +23,6 @@ use serde_json::{Value, json}; use std::collections::HashMap; use std::fs; - pub(crate) fn tool_get_story_todos(args: &Value, ctx: &AppContext) -> Result<Value, String> { let story_id = args .get("story_id") @@ -226,8 +227,6 @@ mod tests { .unwrap(); } - - #[test] fn parse_test_cases_empty() { let result = parse_test_cases(None).unwrap(); diff --git a/server/src/http/mcp/story_tools/refactor.rs b/server/src/http/mcp/story_tools/refactor.rs index c65f20ad..89c19385 100644 --- a/server/src/http/mcp/story_tools/refactor.rs +++ b/server/src/http/mcp/story_tools/refactor.rs @@ -1,5 +1,7 @@ //! Refactor item MCP tools. +#![allow(unused_imports, dead_code)] +#[allow(unused_imports)] use crate::agents::{ close_bug_to_archive, feature_branch_has_unmerged_changes, move_story_to_done, }; @@ -21,7 +23,6 @@ use serde_json::{Value, json}; use std::collections::HashMap; use std::fs; - pub(crate) fn tool_create_refactor(args: &Value, ctx: &AppContext) -> Result<Value, String> { let name = args .get("name") diff --git a/server/src/http/mcp/story_tools/spike.rs b/server/src/http/mcp/story_tools/spike.rs index b4685bd5..d3ee8347 100644 --- a/server/src/http/mcp/story_tools/spike.rs +++ b/server/src/http/mcp/story_tools/spike.rs @@ -1,5 +1,7 @@ //! Spike item MCP tools. +#![allow(unused_imports, dead_code)] +#[allow(unused_imports)] use crate::agents::{ close_bug_to_archive, feature_branch_has_unmerged_changes, move_story_to_done, }; @@ -21,7 +23,6 @@ use serde_json::{Value, json}; use std::collections::HashMap; use std::fs; - pub(crate) fn tool_create_spike(args: &Value, ctx: &AppContext) -> Result<Value, String> { let name = args .get("name") @@ -37,9 +38,9 @@ pub(crate) fn tool_create_spike(args: &Value, ctx: &AppContext) -> Result<Value, String> { let name = args .get("name") diff --git a/server/src/http/mcp/story_tools/story.rs b/server/src/http/mcp/story_tools/story.rs +#![allow(unused_imports, dead_code)] +#[allow(unused_imports)] use crate::agents::{ close_bug_to_archive, feature_branch_has_unmerged_changes, move_story_to_done, }; - pub(crate) fn tool_create_story(args: &Value, ctx: &AppContext) -> Result<Value, String> { let name = args .get("name") @@ -460,8 +461,6 @@ mod tests { crate::db::write_content(story_id, content); } - - #[test] fn tool_validate_stories_empty_project() { let tmp = tempfile::tempdir().unwrap(); diff --git a/server/src/http/mcp/tools_list.rs b/server/src/http/mcp/tools_list.rs index cb098532..0a7a1fe5 100644 --- a/server/src/http/mcp/tools_list.rs +++ b/server/src/http/mcp/tools_list.rs @@ -1084,7 +1084,6 @@ pub(super) fn handle_tools_list(id: Option<Value>) -> JsonRpcResponse { ) } - #[cfg(test)] mod tests { use super::*; diff --git a/server/src/io/fs/scaffold/detect.rs b/server/src/io/fs/scaffold/detect.rs index f6a3cb04..af1beba8 100644 --- a/server/src/io/fs/scaffold/detect.rs +++ b/server/src/io/fs/scaffold/detect.rs @@ -279,7 +279,6 @@ pub(crate) fn detect_script_test(root: &Path) -> String { /// /// Detects the tech stack via [`detect_components_toml`] and combines the /// resulting `[[component]]` entries with the default project settings. - #[cfg(test)] mod tests { use super::*; diff --git a/server/src/io/fs/scaffold/mod.rs b/server/src/io/fs/scaffold/mod.rs index 28d1abde..51b3d7d0 100644 --- a/server/src/io/fs/scaffold/mod.rs +++ b/server/src/io/fs/scaffold/mod.rs @@ -1,5 +1,6 @@ //! Project scaffolding — creates the `.huskies/` directory structure and default files.
+#![allow(unused_imports, dead_code)] use std::fs; use std::path::Path; @@ -152,7 +153,6 @@ pub(crate) fn scaffold_story_kit(root: &Path, port: u16) -> Result<(), String> { Ok(()) } - #[cfg(test)] mod tests { use super::*; diff --git a/server/src/io/fs/scaffold/templates.rs b/server/src/io/fs/scaffold/templates.rs index e979d9c9..90f61112 100644 --- a/server/src/io/fs/scaffold/templates.rs +++ b/server/src/io/fs/scaffold/templates.rs @@ -6,12 +6,14 @@ pub(super) const STORY_KIT_README: &str = include_str!("../../../../../.huskies/README.md"); -pub(super) const BOT_TOML_MATRIX_EXAMPLE: &str = include_str!("../../../../../.huskies/bot.toml.matrix.example"); +pub(super) const BOT_TOML_MATRIX_EXAMPLE: &str = + include_str!("../../../../../.huskies/bot.toml.matrix.example"); pub(super) const BOT_TOML_WHATSAPP_META_EXAMPLE: &str = include_str!("../../../../../.huskies/bot.toml.whatsapp-meta.example"); pub(super) const BOT_TOML_WHATSAPP_TWILIO_EXAMPLE: &str = include_str!("../../../../../.huskies/bot.toml.whatsapp-twilio.example"); -pub(super) const BOT_TOML_SLACK_EXAMPLE: &str = include_str!("../../../../../.huskies/bot.toml.slack.example"); +pub(super) const BOT_TOML_SLACK_EXAMPLE: &str = + include_str!("../../../../../.huskies/bot.toml.slack.example"); pub(super) const STORY_KIT_CONTEXT: &str = "\n\ # Project Context\n\ diff --git a/server/src/llm/providers/claude_code/events.rs b/server/src/llm/providers/claude_code/events.rs index d43258e2..303ba984 100644 --- a/server/src/llm/providers/claude_code/events.rs +++ b/server/src/llm/providers/claude_code/events.rs @@ -82,7 +82,6 @@ pub(super) fn process_json_event( /// Extracts text blocks into `content` and tool_use blocks into `tool_calls`, /// then sends a single `Message { role: Assistant }` via `msg_tx`. /// This is the authoritative source for the final message structure — streaming - pub(super) fn handle_stream_event( event: &serde_json::Value, token_tx: &tokio::sync::mpsc::UnboundedSender<String>, @@ -133,7 +132,6 @@ } } - #[cfg(test)] mod tests { use super::*; diff --git a/server/src/llm/providers/claude_code/mod.rs b/server/src/llm/providers/claude_code/mod.rs index 95f2b95d..38ae1a2b 100644 --- a/server/src/llm/providers/claude_code/mod.rs +++ b/server/src/llm/providers/claude_code/mod.rs @@ -1,4 +1,5 @@ //! Claude Code provider — runs Claude Code CLI in a PTY and parses structured output. +#![allow(unused_imports, dead_code)] use crate::slog; use portable_pty::{CommandBuilder, PtySize, native_pty_system}; use std::io::{BufRead, BufReader}; @@ -30,7 +31,6 @@ pub struct ClaudeCodeResult { /// Permissions are delegated to the MCP `prompt_permission` tool via /// `--permission-prompt-tool`, so Claude Code calls back into the server /// when a tool requires user approval. The frontend dialog handles the UX. - mod events; mod parse; @@ -179,7 +179,6 @@ impl ClaudeCodeProvider { /// Permission handling is delegated to the MCP `prompt_permission` tool /// via `--permission-prompt-tool`. Claude Code calls the MCP tool when it /// needs user approval, and the server bridges the request to the frontend. - #[allow(clippy::too_many_arguments)] fn run_pty_session( user_message: &str, @@ -384,7 +383,6 @@ fn run_pty_session( /// /// Returns `true` if a `result` event was received (signals session completion). /// Captures the session ID from the first event that carries it.
- #[cfg(test)] mod tests { use super::*; diff --git a/server/src/llm/providers/claude_code/parse.rs b/server/src/llm/providers/claude_code/parse.rs index 29460782..83f4b611 100644 --- a/server/src/llm/providers/claude_code/parse.rs +++ b/server/src/llm/providers/claude_code/parse.rs @@ -1,6 +1,7 @@ //! Claude Code message parsing — extracts text and tool-use info from assistant //! and user messages. +#![allow(unused_imports, dead_code)] use crate::llm::types::{FunctionCall, Message, Role, ToolCall}; pub(super) fn parse_assistant_message( @@ -58,8 +59,10 @@ /// Parse a `user` message containing tool_result blocks. /// /// Claude Code injects tool results into the conversation as `user` role - -pub(super) fn parse_tool_results(content: &[serde_json::Value], msg_tx: &std::sync::mpsc::Sender<Message>) { +pub(super) fn parse_tool_results( + content: &[serde_json::Value], + msg_tx: &std::sync::mpsc::Sender<Message>, +) { for block in content { if block.get("type").and_then(|t| t.as_str()) != Some("tool_result") { continue; } @@ -104,7 +107,6 @@ /// Extract text from a stream event and send to the token channel for live display. /// /// Stream events provide incremental text deltas for real-time rendering. - #[cfg(test)] mod tests { use super::*; diff --git a/server/src/main.rs b/server/src/main.rs index 3306c90c..394db8ee 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -48,7 +48,7 @@ use tokio::sync::broadcast; mod cli; -use cli::{CliArgs, parse_cli_args, print_help, resolve_path_arg}; +use cli::{parse_cli_args, resolve_path_arg}; #[tokio::main] async fn main() -> Result<(), std::io::Error> { @@ -868,7 +868,6 @@ result } - #[cfg(test)] mod tests { use super::*; diff --git a/server/src/pipeline_state/events.rs b/server/src/pipeline_state/events.rs index 90c69db2..8c627c13 100644 --- a/server/src/pipeline_state/events.rs +++ b/server/src/pipeline_state/events.rs @@ -1,5 +1,6 @@ //! Event bus for pipeline state transitions. +#![allow(unused_imports, dead_code)] use chrono::{DateTime, Utc}; use super::{BranchName, PipelineEvent, Stage, StoryId}; @@ -51,12 +52,18 @@ impl Default for EventBus { #[cfg(test)] mod tests { use super::*; - use std::sync::{Arc, Mutex}; use std::num::NonZeroU32; + use std::sync::{Arc, Mutex}; - fn nz(n: u32) -> NonZeroU32 { NonZeroU32::new(n).unwrap() } - fn fb(name: &str) -> BranchName { BranchName(name.to_string()) } - fn sid(s: &str) -> StoryId { StoryId(s.to_string()) } + fn nz(n: u32) -> NonZeroU32 { + NonZeroU32::new(n).unwrap() + } + fn fb(name: &str) -> BranchName { + BranchName(name.to_string()) + } + fn sid(s: &str) -> StoryId { + StoryId(s.to_string()) + } #[test] fn event_bus_fires_to_all_subscribers() { diff --git a/server/src/pipeline_state/mod.rs b/server/src/pipeline_state/mod.rs index 3a190d9c..b0f41b66 100644 --- a/server/src/pipeline_state/mod.rs +++ b/server/src/pipeline_state/mod.rs @@ -14,7 +14,7 @@ // Foundation module — all items are exercised by tests but not yet called from // non-test code. The dead_code lint is suppressed until consumer migration.
-#![allow(dead_code)] +#![allow(unused_imports, dead_code)] use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; @@ -460,7 +460,6 @@ pub fn execution_transition( } } - mod events; mod projection; mod subscribers; diff --git a/server/src/pipeline_state/projection.rs b/server/src/pipeline_state/projection.rs index 5bb68a19..5a523d26 100644 --- a/server/src/pipeline_state/projection.rs +++ b/server/src/pipeline_state/projection.rs @@ -1,5 +1,6 @@ //! Projection layer — converts loose CRDT views into typed `PipelineItem` enums. +#![allow(unused_imports, dead_code)] use chrono::{DateTime, Utc}; use std::fmt; use std::num::NonZeroU32; @@ -7,8 +8,7 @@ use std::num::NonZeroU32; use crate::crdt_state::{PipelineItemView, read_all_items, read_item}; use super::{ - ArchiveReason, BranchName, ExecutionState, GitSha, PipelineItem, Stage, StoryId, - stage_dir_name, + ArchiveReason, BranchName, ExecutionState, GitSha, PipelineItem, Stage, StoryId, stage_dir_name, }; /// Errors from projecting loose CRDT data into typed enums. @@ -179,10 +179,18 @@ mod tests { use chrono::TimeZone; use std::num::NonZeroU32; - fn nz(n: u32) -> NonZeroU32 { NonZeroU32::new(n).unwrap() } - fn fb(name: &str) -> BranchName { BranchName(name.to_string()) } - fn sha(s: &str) -> GitSha { GitSha(s.to_string()) } - fn sid(s: &str) -> StoryId { StoryId(s.to_string()) } + fn nz(n: u32) -> NonZeroU32 { + NonZeroU32::new(n).unwrap() + } + fn fb(name: &str) -> BranchName { + BranchName(name.to_string()) + } + fn sha(s: &str) -> GitSha { + GitSha(s.to_string()) + } + fn sid(s: &str) -> StoryId { + StoryId(s.to_string()) + } #[test] fn project_backlog_item() { diff --git a/server/src/pipeline_state/subscribers.rs b/server/src/pipeline_state/subscribers.rs index 2a753a07..bef0ac46 100644 --- a/server/src/pipeline_state/subscribers.rs +++ b/server/src/pipeline_state/subscribers.rs @@ -4,7 +4,6 @@ use super::Stage; use super::events::{TransitionFired, TransitionSubscriber}; use super::{event_label, stage_dir_name, stage_label}; - // ── Subscriber stubs (real dispatch uses these as the interface) ───────────── // // These are ready to wire into the event bus but not yet connected to the @@ -91,4 +90,3 @@ impl TransitionSubscriber for WebUiBroadcastSubscriber { ); } } -
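
For reviewers new to the sync layer: a minimal, self-contained sketch of the v2 clock/bulk negotiation described by the crdt_sync module doc converted above. `Clock`, `SignedOp`, `ops_since`, and `first_reply` here are simplified stand-ins for the real `crdt_sync::wire` and `crdt_state` items (the shipped ops are signed and CRDT-aware) — a reading aid under those assumptions, not the shipped implementation.

    // Illustrative sketch only; names are stand-ins, not the real wire types.
    use serde::{Deserialize, Serialize};
    use std::collections::HashMap;

    // Vector clock: author pubkey (hex) -> count of ops seen from that author.
    type Clock = HashMap<String, u64>;

    #[derive(Debug, Serialize, Deserialize)]
    #[serde(tag = "type", rename_all = "lowercase")]
    enum SyncMessage {
        Clock { clock: Clock },
        Bulk { ops: Vec<SignedOp> },
        Ready,
    }

    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct SignedOp {
        author: String, // hex Ed25519 pubkey
        seq: u64,       // per-author sequence number, 1-based
        payload: serde_json::Value,
    }

    // Delta computation: every local op whose per-author sequence number lies
    // beyond what the peer's clock says it has already received.
    fn ops_since(local_log: &[SignedOp], peer_clock: &Clock) -> Vec<SignedOp> {
        local_log
            .iter()
            .filter(|op| op.seq > peer_clock.get(&op.author).copied().unwrap_or(0))
            .cloned()
            .collect()
    }

    // First-message dispatch: a v2 peer opening with `clock` gets back only the
    // delta bulk; anything else (a legacy v1 `bulk`-first opener) gets the full dump.
    fn first_reply(local_log: &[SignedOp], first: &SyncMessage) -> SyncMessage {
        match first {
            SyncMessage::Clock { clock } => SyncMessage::Bulk {
                ops: ops_since(local_log, clock),
            },
            _ => SyncMessage::Bulk {
                ops: local_log.to_vec(),
            },
        }
    }

    fn main() {
        let log = vec![SignedOp {
            author: "ab12".into(),
            seq: 1,
            payload: serde_json::json!({"field": "value"}),
        }];
        // Peer clock is empty, so the whole log comes back as the delta bulk.
        let opener = SyncMessage::Clock { clock: Clock::new() };
        println!("{}", serde_json::to_string(&first_reply(&log, &opener)).unwrap());
    }

An empty peer clock makes the delta the entire log, which is the same full-dump fallback a v1 peer triggers by opening with `bulk` instead of `clock` — that symmetry is why the legacy path needs no code change on the v1 side.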