From 01169332b38f724807896f0207d35f52405c62dd Mon Sep 17 00:00:00 2001 From: dave Date: Tue, 28 Apr 2026 10:45:23 +0000 Subject: [PATCH] huskies: merge 774 --- server/src/agent_mode.rs | 1 - server/src/http/context.rs | 35 +- server/src/http/mcp/shell_tools/script.rs | 399 +++++++++++----------- server/src/main.rs | 1 - 4 files changed, 199 insertions(+), 237 deletions(-) diff --git a/server/src/agent_mode.rs b/server/src/agent_mode.rs index bc6b2d2b..db7dc1f2 100644 --- a/server/src/agent_mode.rs +++ b/server/src/agent_mode.rs @@ -651,7 +651,6 @@ fn build_agent_app_context( bot_shutdown: None, matrix_shutdown_tx: None, timer_store, - test_jobs: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())), } } diff --git a/server/src/http/context.rs b/server/src/http/context.rs index cc88ac87..593f6641 100644 --- a/server/src/http/context.rs +++ b/server/src/http/context.rs @@ -8,36 +8,9 @@ use crate::state::SessionState; use crate::store::JsonFileStore; use crate::workflow::WorkflowState; use poem::http::StatusCode; -use std::collections::HashMap; -use std::path::PathBuf; use std::sync::Arc; use tokio::sync::{broadcast, mpsc, oneshot}; -/// A running or completed test job spawned by the `run_tests` MCP tool. -pub struct TestJob { - /// The child process handle. `None` once the process has exited and results - /// have been collected. - pub child: Option, - /// Populated once the child exits. - pub result: Option, - /// When the job was started. - pub started_at: std::time::Instant, -} - -/// The result of a completed test job. -#[derive(Clone)] -pub struct TestJobResult { - pub passed: bool, - pub exit_code: i32, - pub tests_passed: u64, - pub tests_failed: u64, - pub output: String, -} - -/// Shared registry of in-flight and recently completed test jobs, keyed by -/// worktree path. -pub type TestJobRegistry = Arc>>; - /// The user's decision when responding to a permission dialog. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum PermissionDecision { @@ -101,9 +74,6 @@ pub struct AppContext { /// spawned by the bot so that cancellations take effect in-memory rather /// than only on disk. pub timer_store: Arc, - /// Registry of running/completed test jobs spawned by the `run_tests` MCP - /// tool. Keyed by worktree path so each worktree has at most one active job. - pub test_jobs: TestJobRegistry, } #[cfg(test)] @@ -127,7 +97,9 @@ impl AppContext { bot_user_id: String::new(), ambient_rooms: Arc::new(std::sync::Mutex::new(std::collections::HashSet::new())), perm_rx: Arc::new(tokio::sync::Mutex::new(perm_rx)), - pending_perm_replies: Arc::new(tokio::sync::Mutex::new(HashMap::new())), + pending_perm_replies: Arc::new(tokio::sync::Mutex::new( + std::collections::HashMap::new(), + )), permission_timeout_secs: 120, status: agents.status_broadcaster(), }); @@ -143,7 +115,6 @@ impl AppContext { bot_shutdown: None, matrix_shutdown_tx: None, timer_store, - test_jobs: Arc::new(std::sync::Mutex::new(HashMap::new())), } } } diff --git a/server/src/http/mcp/shell_tools/script.rs b/server/src/http/mcp/shell_tools/script.rs index ebd450b1..b7b823fc 100644 --- a/server/src/http/mcp/shell_tools/script.rs +++ b/server/src/http/mcp/shell_tools/script.rs @@ -1,6 +1,9 @@ //! MCP shell script tools: run_tests / get_test_result / run_build / run_lint. use serde_json::{Value, json}; +use std::collections::HashMap; +use std::path::{Path, PathBuf}; +use std::sync::{Mutex, OnceLock}; use crate::http::context::AppContext; #[allow(unused_imports)] @@ -11,6 +14,45 @@ use super::exec::validate_working_dir; const TEST_TIMEOUT_SECS: u64 = 1200; const MAX_OUTPUT_LINES: usize = 100; +// ── In-flight process registry ─────────────────────────────────────────────── +// +// Child process handles are ephemeral and cannot survive a server restart. +// Persistent state (status, output, timestamps) lives in the CRDT `test_jobs` +// collection. This module-level static tracks only the OS-level child process +// so we can kill it on restart or poll it during `get_test_result`. + +struct InFlightJob { + child: std::process::Child, +} + +static ACTIVE_JOBS: OnceLock>> = OnceLock::new(); + +fn active_jobs() -> &'static Mutex> { + ACTIVE_JOBS.get_or_init(|| Mutex::new(HashMap::new())) +} + +/// Derive the CRDT key for a test job from its working directory. +/// +/// Uses the last path component (the story ID or project directory name) so +/// that CRDT entries are human-readable and stable across path changes. +fn story_key(working_dir: &Path) -> String { + working_dir + .file_name() + .and_then(|n| n.to_str()) + .unwrap_or("root") + .to_string() +} + +/// Current time as a Unix timestamp (seconds, f64) for CRDT fields. +fn unix_now() -> f64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs_f64() +} + +// ── run_tests ──────────────────────────────────────────────────────────────── + pub(crate) async fn tool_run_tests(args: &Value, ctx: &AppContext) -> Result { let project_root = ctx.services.agents.get_project_root(&ctx.state)?; @@ -26,14 +68,14 @@ pub(crate) async fn tool_run_tests(args: &Value, ctx: &AppContext) -> Result Result Result j, None => return Err("Test job disappeared unexpectedly".to_string()), }; - if let Some(child) = job.child.as_mut() { - match child.try_wait() { - Ok(Some(status)) => { - // Done — join drain threads and collect output. - jobs.remove(&working_dir); - let stdout = stdout_handle - .take() - .and_then(|h| h.join().ok()) - .unwrap_or_default(); - let stderr = stderr_handle - .take() - .and_then(|h| h.join().ok()) - .unwrap_or_default(); - let combined = format!("{stdout}{stderr}"); - let (tests_passed, tests_failed) = parse_test_counts(&combined); - let truncated = truncate_output(&combined, MAX_OUTPUT_LINES); - let passed = status.success(); - let exit_code = status.code().unwrap_or(-1); + match job.child.try_wait() { + Ok(Some(status)) => { + // Child exited — collect output and write final CRDT state. + jobs.remove(&working_dir); + let stdout = stdout_handle + .take() + .and_then(|h| h.join().ok()) + .unwrap_or_default(); + let stderr = stderr_handle + .take() + .and_then(|h| h.join().ok()) + .unwrap_or_default(); + let combined = format!("{stdout}{stderr}"); + let (tests_passed, tests_failed) = parse_test_counts(&combined); + let truncated = truncate_output(&combined, MAX_OUTPUT_LINES); + let passed = status.success(); + let exit_code = status.code().unwrap_or(-1); + let crdt_status = if passed { "pass" } else { "fail" }; + crate::slog!( + "[run_tests] Test job for {} finished (pid {}, passed={})", + working_dir.display(), + pid, + passed + ); + + // Persist result in CRDT for post-restart visibility. + crate::crdt_state::write_test_job( + &sid, + crdt_status, + started_at_unix, + Some(unix_now()), + Some(&truncated), + ); + + // Capture positive test evidence in the DB so the pipeline + // advance salvage path (bug 645/668) can confirm the agent + // ran passing tests before it died. Only written when running + // in a story worktree (worktree_path arg provided). + if passed && args.get("worktree_path").is_some() { + crate::db::write_content(&format!("{sid}:run_tests_ok"), "1"); + } + return serde_json::to_string_pretty(&json!({ + "passed": passed, + "exit_code": exit_code, + "timed_out": false, + "tests_passed": tests_passed, + "tests_failed": tests_failed, + "output": truncated, + })) + .map_err(|e| format!("Serialization error: {e}")); + } + Ok(None) => { + // Still running — check timeout. + if start.elapsed().as_secs() > TEST_TIMEOUT_SECS { + let _ = job.child.kill(); + let _ = job.child.wait(); crate::slog!( - "[run_tests] Test job for {} finished (pid {}, passed={})", + "[run_tests] Killed test job for {} (pid {}) after {}s timeout", working_dir.display(), pid, - passed + TEST_TIMEOUT_SECS + ); + jobs.remove(&working_dir); + let timeout_msg = format!("Test suite timed out after {TEST_TIMEOUT_SECS}s"); + crate::crdt_state::write_test_job( + &sid, + "fail", + started_at_unix, + Some(unix_now()), + Some(&timeout_msg), ); - // Capture positive test evidence in the DB so the pipeline - // advance salvage path (bug 645/668) can confirm the agent - // ran passing tests before it died. Only written when running - // in a story worktree (worktree_path arg provided); extract - // the story ID from the last path component. - if passed - && args.get("worktree_path").is_some() - && let Some(story_id) = working_dir.file_name().and_then(|n| n.to_str()) - { - crate::db::write_content(&format!("{story_id}:run_tests_ok"), "1"); - } return serde_json::to_string_pretty(&json!({ - "passed": passed, - "exit_code": exit_code, - "timed_out": false, - "tests_passed": tests_passed, - "tests_failed": tests_failed, - "output": truncated, + "passed": false, + "exit_code": -1, + "timed_out": true, + "tests_passed": 0, + "tests_failed": 0, + "output": timeout_msg, })) .map_err(|e| format!("Serialization error: {e}")); } - Ok(None) => { - // Still running — check timeout. - if start.elapsed().as_secs() > TEST_TIMEOUT_SECS { - let _ = child.kill(); - let _ = child.wait(); - crate::slog!( - "[run_tests] Killed test job for {} (pid {}) after {}s timeout", - working_dir.display(), - pid, - TEST_TIMEOUT_SECS - ); - jobs.remove(&working_dir); - return serde_json::to_string_pretty(&json!({ - "passed": false, - "exit_code": -1, - "timed_out": true, - "tests_passed": 0, - "tests_failed": 0, - "output": format!("Test suite timed out after {}s", TEST_TIMEOUT_SECS), - })) - .map_err(|e| format!("Serialization error: {e}")); - } - } - Err(e) => { - jobs.remove(&working_dir); - return Err(format!("Failed to check child status: {e}")); - } + } + Err(e) => { + jobs.remove(&working_dir); + let msg = e.to_string(); + crate::crdt_state::write_test_job( + &sid, + "fail", + started_at_unix, + Some(unix_now()), + Some(&msg), + ); + return Err(format!("Failed to check child status: {e}")); } } } } +// ── get_test_result ────────────────────────────────────────────────────────── + /// How long `get_test_result` blocks server-side before returning "running". /// This prevents agents from burning turns polling every 2 seconds. const TEST_POLL_BLOCK_SECS: u64 = 20; /// Check on a running test job and return results if complete. /// -/// Blocks for up to 15 seconds, checking every second. Returns immediately -/// when the test finishes, or after 15s with `{"status": "running"}`. +/// Reads persistent state from the CRDT `test_jobs` collection. Blocks for up +/// to [`TEST_POLL_BLOCK_SECS`], checking the CRDT every second. Returns +/// immediately when the test finishes, or after the blocking period with +/// `{"status": "running"}`. +/// +/// After a server restart the CRDT entry written by `run_tests` remains +/// visible, so callers can observe the "running" status even when the original +/// child process is gone. pub(crate) async fn tool_get_test_result(args: &Value, ctx: &AppContext) -> Result { let project_root = ctx.services.agents.get_project_root(&ctx.state)?; @@ -190,128 +259,64 @@ pub(crate) async fn tool_get_test_result(args: &Value, ctx: &AppContext) -> Resu .map_err(|e| format!("Cannot canonicalize project root: {e}"))?, }; - // Block for up to TEST_POLL_BLOCK_SECS, checking once per second. - let test_jobs = ctx.test_jobs.clone(); - let wd = working_dir.clone(); + let sid = story_key(&working_dir); + + // Poll CRDT for up to TEST_POLL_BLOCK_SECS, returning as soon as the job + // transitions from "running" to a terminal state. for _ in 0..TEST_POLL_BLOCK_SECS { - { - let mut jobs = test_jobs.lock().map_err(|e| e.to_string())?; - if let Some(job) = jobs.get_mut(&wd) { - if let Some(child) = job.child.as_mut() { - match child.try_wait() { - Ok(Some(status)) => { - let result = collect_child_result(child, status); - job.child = None; - job.result = Some(result.clone()); - jobs.remove(&wd); - return format_test_result(&result); - } - Ok(None) => {} // still running, keep waiting - Err(e) => { - jobs.remove(&wd); - return Err(format!("Failed to check child status: {e}")); - } - } - } else if let Some(result) = job.result.clone() { - jobs.remove(&wd); - return format_test_result(&result); - } - } else { + match crate::crdt_state::read_test_job(&sid) { + None => { return Err( "No test job running for this worktree. Call run_tests first.".to_string(), ); } + Some(view) if view.status == "pass" || view.status == "fail" => { + return format_crdt_result(&view); + } + Some(_) => { + // Still "running" — wait one second and re-check. + } } tokio::time::sleep(std::time::Duration::from_secs(1)).await; } - // Still running after blocking period — return status. - let mut jobs = ctx.test_jobs.lock().map_err(|e| e.to_string())?; - - let job = jobs.get_mut(&working_dir).ok_or_else(|| { - "No test job running for this worktree. Call run_tests first.".to_string() - })?; - - // Check if child has finished. - if let Some(child) = job.child.as_mut() { - match child.try_wait() { - Ok(Some(status)) => { - let result = collect_child_result(child, status); - job.child = None; - job.result = Some(result.clone()); - let resp = format_test_result(&result); - jobs.remove(&working_dir); - return resp; - } - Ok(None) => { - let elapsed = job.started_at.elapsed().as_secs(); - // If exceeded our max timeout, kill it. - if elapsed > TEST_TIMEOUT_SECS { - let _ = child.kill(); - let _ = child.wait(); - crate::slog!( - "[run_tests] Killed test job for {} after {elapsed}s timeout", - working_dir.display() - ); - jobs.remove(&working_dir); - return serde_json::to_string_pretty(&json!({ - "passed": false, - "exit_code": -1, - "timed_out": true, - "tests_passed": 0, - "tests_failed": 0, - "output": format!("Test suite timed out after {elapsed}s"), - })) - .map_err(|e| format!("Serialization error: {e}")); - } - return serde_json::to_string_pretty(&json!({ - "status": "running", - "elapsed_secs": elapsed, - })) - .map_err(|e| format!("Serialization error: {e}")); - } - Err(e) => { - jobs.remove(&working_dir); - return Err(format!("Failed to check child status: {e}")); - } + // Still running after the blocking window — return status so the caller + // can decide whether to poll again or give up. + match crate::crdt_state::read_test_job(&sid) { + None => Err("No test job running for this worktree. Call run_tests first.".to_string()), + Some(view) if view.status == "pass" || view.status == "fail" => format_crdt_result(&view), + Some(view) => { + let elapsed = unix_now() - view.started_at; + serde_json::to_string_pretty(&json!({ + "status": "running", + "elapsed_secs": elapsed.max(0.0) as u64, + })) + .map_err(|e| format!("Serialization error: {e}")) } } - - // Job exists with cached result. - if let Some(result) = job.result.clone() { - jobs.remove(&working_dir); - return format_test_result(&result); - } - - Err("Test job in unexpected state".to_string()) } -fn collect_child_result( - child: &mut std::process::Child, - status: std::process::ExitStatus, -) -> crate::http::context::TestJobResult { - let mut stdout = String::new(); - let mut stderr = String::new(); - if let Some(ref mut out) = child.stdout { - use std::io::Read; - let _ = out.read_to_string(&mut stdout); - } - if let Some(ref mut err) = child.stderr { - use std::io::Read; - let _ = err.read_to_string(&mut stderr); - } - let combined = format!("{stdout}{stderr}"); - let (tests_passed, tests_failed) = parse_test_counts(&combined); - let exit_code = status.code().unwrap_or(-1); - crate::http::context::TestJobResult { - passed: status.success(), - exit_code, - tests_passed, - tests_failed, - output: truncate_output(&combined, MAX_OUTPUT_LINES), - } +/// Build a JSON result object from a completed CRDT test-job view. +/// +/// `exit_code` is approximated from status (0 = pass, 1 = fail) because the +/// CRDT schema does not store raw exit codes. +fn format_crdt_result(view: &crate::crdt_state::TestJobView) -> Result { + let passed = view.status == "pass"; + let output = view.output.clone().unwrap_or_default(); + let (tests_passed, tests_failed) = parse_test_counts(&output); + serde_json::to_string_pretty(&json!({ + "passed": passed, + "exit_code": if passed { 0 } else { 1 }, + "timed_out": false, + "tests_passed": tests_passed, + "tests_failed": tests_failed, + "output": output, + })) + .map_err(|e| format!("Serialization error: {e}")) } +// ── run_build / run_lint ───────────────────────────────────────────────────── + /// Shared implementation for run_build and run_lint: runs a named script /// (`script/`) in the working directory, captures output, and returns async fn run_script_tool( @@ -372,18 +377,6 @@ pub(crate) async fn tool_run_lint(args: &Value, ctx: &AppContext) -> Result Result { - serde_json::to_string_pretty(&json!({ - "passed": result.passed, - "exit_code": result.exit_code, - "timed_out": false, - "tests_passed": result.tests_passed, - "tests_failed": result.tests_failed, - "output": result.output, - })) - .map_err(|e| format!("Serialization error: {e}")) -} - #[cfg(test)] mod tests { use super::*; diff --git a/server/src/main.rs b/server/src/main.rs index 5f77855c..2e1b588a 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -692,7 +692,6 @@ async fn main() -> Result<(), std::io::Error> { bot_shutdown: bot_shutdown_notifier.clone(), matrix_shutdown_tx: Some(Arc::clone(&matrix_shutdown_tx)), timer_store, - test_jobs: std::sync::Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())), }; // Create the per-project event buffer and subscribe it to the watcher channel