diff --git a/server/src/agents/mod.rs b/server/src/agents/mod.rs index 7fd5e6d5..939b5fec 100644 --- a/server/src/agents/mod.rs +++ b/server/src/agents/mod.rs @@ -6,6 +6,7 @@ pub mod merge; pub(crate) mod pool; pub(crate) mod pty; pub mod runtime; +pub mod session_store; pub mod token_usage; use crate::config::AgentConfig; diff --git a/server/src/agents/pool/start/mod.rs b/server/src/agents/pool/start/mod.rs index c71620fa..d6dd4731 100644 --- a/server/src/agents/pool/start/mod.rs +++ b/server/src/agents/pool/start/mod.rs @@ -292,6 +292,22 @@ impl AgentPool { .map(|a| a.inactivity_timeout_secs) .unwrap_or(300); + // If no explicit session_id_to_resume was provided, look up from the + // persistent session store. The key includes the model so a model + // change (e.g. sonnet → opus) produces a cache miss — intentional. + let effective_session_id = session_id_to_resume.or_else(|| { + let model = config + .find_agent(&resolved_name) + .and_then(|a| a.model.clone()) + .unwrap_or_default(); + crate::agents::session_store::lookup_session( + project_root, + story_id, + &resolved_name, + &model, + ) + }); + // Clone all values needed inside the background spawn. // Spawn the background task. Worktree creation and agent launch happen here // so `start_agent` returns immediately after registering the agent as @@ -300,7 +316,7 @@ impl AgentPool { project_root.to_path_buf(), config.clone(), resume_context.map(str::to_string), - session_id_to_resume, + effective_session_id, story_id.to_string(), resolved_name.clone(), tx.clone(), diff --git a/server/src/agents/pool/start/spawn.rs b/server/src/agents/pool/start/spawn.rs index 1b6b6e12..8eefeec5 100644 --- a/server/src/agents/pool/start/spawn.rs +++ b/server/src/agents/pool/start/spawn.rs @@ -155,13 +155,17 @@ pub(super) async fn run_agent_spawn( // (which would re-read CLAUDE.md and README) and send only the gate // failure context as a new message. 
On a fresh start, append the // failure context to the original prompt as before. - let effective_prompt = match &session_id_to_resume_owned { - Some(_) => resume_context_owned.unwrap_or_default(), + let (effective_prompt, fresh_prompt) = match &session_id_to_resume_owned { + Some(_) => { + // Keep the full rendered prompt as fallback if resume fails. + let fallback = prompt; + (resume_context_owned.unwrap_or_default(), Some(fallback)) + } None => { if let Some(ctx) = resume_context_owned { prompt.push_str(&ctx); } - prompt + (prompt, None) } }; @@ -200,6 +204,7 @@ pub(super) async fn run_agent_spawn( inactivity_timeout_secs, mcp_port: port_for_task, session_id_to_resume: session_id_to_resume_owned.clone(), + fresh_prompt: fresh_prompt.clone(), }; runtime .start(ctx, tx_clone.clone(), log_clone.clone(), log_writer_clone) @@ -217,6 +222,7 @@ pub(super) async fn run_agent_spawn( inactivity_timeout_secs, mcp_port: port_for_task, session_id_to_resume: session_id_to_resume_owned.clone(), + fresh_prompt: fresh_prompt.clone(), }; runtime .start(ctx, tx_clone.clone(), log_clone.clone(), log_writer_clone) @@ -234,6 +240,7 @@ pub(super) async fn run_agent_spawn( inactivity_timeout_secs, mcp_port: port_for_task, session_id_to_resume: session_id_to_resume_owned, + fresh_prompt, }; runtime .start(ctx, tx_clone.clone(), log_clone.clone(), log_writer_clone) @@ -266,6 +273,21 @@ pub(super) async fn run_agent_spawn( } } + // Persist session_id so respawns can resume prior reasoning. + if let Some(ref sess_id) = result.session_id { + let model = config_clone + .find_agent(&aname) + .and_then(|a| a.model.clone()) + .unwrap_or_default(); + crate::agents::session_store::record_session( + &project_root_clone, + &sid, + &aname, + &model, + sess_id, + ); + } + // Mergemaster agents have their own completion path via // start_merge_agent_work / run_merge_pipeline and must NOT go // through server-owned gates. 
When a mergemaster exits early diff --git a/server/src/agents/runtime/claude_code.rs b/server/src/agents/runtime/claude_code.rs index 8d696a97..9a43edec 100644 --- a/server/src/agents/runtime/claude_code.rs +++ b/server/src/agents/runtime/claude_code.rs @@ -7,6 +7,7 @@ use tokio::sync::broadcast; use crate::agent_log::AgentLogWriter; use crate::io::watcher::WatcherEvent; +use crate::slog_warn; use super::{AgentEvent, AgentRuntime, RuntimeContext, RuntimeResult, RuntimeStatus}; @@ -49,18 +50,51 @@ impl AgentRuntime for ClaudeCodeRuntime { &ctx.cwd, &tx, &event_log, - log_writer, + log_writer.clone(), ctx.inactivity_timeout_secs, Arc::clone(&self.child_killers), self.watcher_tx.clone(), ctx.session_id_to_resume.as_deref(), ) - .await?; + .await; - Ok(RuntimeResult { - session_id: pty_result.session_id, - token_usage: pty_result.token_usage, - }) + match pty_result { + Ok(result) => Ok(RuntimeResult { + session_id: result.session_id, + token_usage: result.token_usage, + }), + Err(e) if ctx.session_id_to_resume.is_some() && ctx.fresh_prompt.is_some() => { + // Resume failed — fall back to a fresh session without --resume. 
+ slog_warn!( + "[agents] Resume failed for {}:{}, retrying without --resume: {}", + ctx.story_id, + ctx.agent_name, + e + ); + let fresh = ctx.fresh_prompt.unwrap(); + let fallback_result = super::super::pty::run_agent_pty_streaming( + &ctx.story_id, + &ctx.agent_name, + &ctx.command, + &ctx.args, + &fresh, + &ctx.cwd, + &tx, + &event_log, + log_writer, + ctx.inactivity_timeout_secs, + Arc::clone(&self.child_killers), + self.watcher_tx.clone(), + None, // no --resume on fallback + ) + .await?; + Ok(RuntimeResult { + session_id: fallback_result.session_id, + token_usage: fallback_result.token_usage, + }) + } + Err(e) => Err(e), + } } fn stop(&self) { diff --git a/server/src/agents/runtime/gemini.rs b/server/src/agents/runtime/gemini.rs index e1c5377e..ad7385af 100644 --- a/server/src/agents/runtime/gemini.rs +++ b/server/src/agents/runtime/gemini.rs @@ -705,6 +705,7 @@ mod tests { inactivity_timeout_secs: 300, mcp_port: 3001, session_id_to_resume: None, + fresh_prompt: None, }; let instruction = build_system_instruction(&ctx); @@ -723,6 +724,7 @@ mod tests { inactivity_timeout_secs: 300, mcp_port: 3001, session_id_to_resume: None, + fresh_prompt: None, }; let instruction = build_system_instruction(&ctx); @@ -803,6 +805,7 @@ mod tests { inactivity_timeout_secs: 300, mcp_port: 3001, session_id_to_resume: None, + fresh_prompt: None, }; // The model extraction logic is inside start(), but we test the diff --git a/server/src/agents/runtime/mod.rs b/server/src/agents/runtime/mod.rs index 9ae0013b..2704f057 100644 --- a/server/src/agents/runtime/mod.rs +++ b/server/src/agents/runtime/mod.rs @@ -32,6 +32,13 @@ pub struct RuntimeContext { /// than `claude -p <prompt>`. The agent re-enters the previous /// conversation and receives the `prompt` (if non-empty) as a new message. pub session_id_to_resume: Option<String>, + /// Full rendered prompt for a fresh session, kept as fallback if resume fails.
+ /// + /// When `session_id_to_resume` is `Some`, `prompt` contains only the + /// resume context (e.g. gate failure output). If the CLI rejects the + /// resume (session expired, file missing, version mismatch), the runtime + /// retries with this full prompt and no `--resume` flag. + pub fresh_prompt: Option<String>, } /// Result returned by a runtime after the agent session completes. @@ -101,6 +108,7 @@ mod tests { inactivity_timeout_secs: 300, mcp_port: 3001, session_id_to_resume: None, + fresh_prompt: None, }; assert_eq!(ctx.story_id, "42_story_foo"); assert_eq!(ctx.agent_name, "coder-1"); diff --git a/server/src/agents/runtime/openai.rs b/server/src/agents/runtime/openai.rs index 73640d38..ea4809eb 100644 --- a/server/src/agents/runtime/openai.rs +++ b/server/src/agents/runtime/openai.rs @@ -617,6 +617,7 @@ mod tests { inactivity_timeout_secs: 300, mcp_port: 3001, session_id_to_resume: None, + fresh_prompt: None, }; assert_eq!(build_system_text(&ctx), "Custom system prompt"); @@ -634,6 +635,7 @@ mod tests { inactivity_timeout_secs: 300, mcp_port: 3001, session_id_to_resume: None, + fresh_prompt: None, }; let text = build_system_text(&ctx); @@ -684,6 +686,7 @@ mod tests { inactivity_timeout_secs: 300, mcp_port: 3001, session_id_to_resume: None, + fresh_prompt: None, }; assert!(ctx.command.starts_with("gpt")); } @@ -700,6 +703,7 @@ mod tests { inactivity_timeout_secs: 300, mcp_port: 3001, session_id_to_resume: None, + fresh_prompt: None, }; assert!(ctx.command.starts_with("o")); } diff --git a/server/src/agents/session_store.rs b/server/src/agents/session_store.rs new file mode 100644 index 00000000..0a3f2c03 --- /dev/null +++ b/server/src/agents/session_store.rs @@ -0,0 +1,221 @@ +//! Persistent session store — tracks the last Claude Code session_id per +//! (story_id, agent_name, model) triple so respawned agents can resume prior reasoning. +//! +//! The session_id is extracted from the `Done. Session: Some(<id>)` log entry
When the same (story, agent, model) triple is +//! spawned again, the orchestrator passes `--resume <session_id>` so the new +//! session inherits the prior conversation context. +//! +//! Model is part of the key intentionally: resuming across models is not +//! supported (e.g. opus should not resume a sonnet session). + +use std::collections::HashMap; +use std::path::Path; + +/// Composite key for the session store: `{story_id}:{agent_name}:{model}`. +fn session_key(story_id: &str, agent_name: &str, model: &str) -> String { + format!("{story_id}:{agent_name}:{model}") +} + +/// Path to the persistent session store file. +fn store_path(project_root: &Path) -> std::path::PathBuf { + project_root.join(".huskies/session_store.json") +} + +/// Read the session store from disk. Returns an empty map if the file is +/// missing, empty, or corrupt. +fn read_store(project_root: &Path) -> HashMap<String, String> { + let path = store_path(project_root); + std::fs::read_to_string(path) + .ok() + .and_then(|s| serde_json::from_str(&s).ok()) + .unwrap_or_default() +} + +/// Write the session store to disk. Silently ignores write errors (the store +/// is best-effort — a missed resume just means a fresh session starts). +fn write_store(project_root: &Path, data: &HashMap<String, String>) { + let path = store_path(project_root); + if let Ok(json) = serde_json::to_string_pretty(data) { + let _ = std::fs::write(path, json); + } } + +/// Record the session_id from a completed agent run, persisted to disk. +/// +/// Called after an agent process exits with a valid session_id. The next +/// spawn of the same (story_id, agent_name, model) triple will find this +/// session and pass `--resume <session_id>`.
+pub fn record_session( + project_root: &Path, + story_id: &str, + agent_name: &str, + model: &str, + session_id: &str, +) { + let key = session_key(story_id, agent_name, model); + let mut data = read_store(project_root); + data.insert(key, session_id.to_string()); + write_store(project_root, &data); +} + +/// Look up the last session_id for a (story_id, agent_name, model) triple. +/// +/// Returns `None` if no prior session exists (fresh story) or if the model +/// has changed (intentional — resuming across models is not supported). +pub fn lookup_session( + project_root: &Path, + story_id: &str, + agent_name: &str, + model: &str, +) -> Option<String> { + let key = session_key(story_id, agent_name, model); + read_store(project_root).get(&key).cloned() +} + +/// Remove all session entries for a story (called when a story reaches done/archived). +#[cfg(test)] +pub fn remove_sessions_for_story(project_root: &Path, story_id: &str) { + let mut data = read_store(project_root); + let prefix = format!("{story_id}:"); + let before = data.len(); + data.retain(|k, _| !k.starts_with(&prefix)); + if data.len() < before { + write_store(project_root, &data); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + // ── AC1: record and lookup round-trip ───────────────────────────────── + + #[test] + fn record_and_lookup_round_trip() { + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + std::fs::create_dir_all(root.join(".huskies")).unwrap(); + + record_session(root, "42_story_foo", "coder-1", "sonnet", "sess-abc"); + + let result = lookup_session(root, "42_story_foo", "coder-1", "sonnet"); + assert_eq!(result, Some("sess-abc".to_string())); + } + + #[test] + fn lookup_returns_none_for_unknown_story() { + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + std::fs::create_dir_all(root.join(".huskies")).unwrap(); + + let result = lookup_session(root, "unknown_story", "coder-1", "sonnet"); + assert_eq!(result, None); + } + + // ── AC3: model change semantics
─────────────────────────────────────── + + /// When an operator escalates from sonnet to opus, the new opus spawn + /// must NOT resume the prior sonnet session. The key includes the model, + /// so a different model produces a cache miss. + #[test] + fn model_change_does_not_resume_prior_session() { + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + std::fs::create_dir_all(root.join(".huskies")).unwrap(); + + // Record a sonnet session. + record_session(root, "42_story_foo", "coder-1", "sonnet", "sess-sonnet"); + + // Looking up with opus model returns None — no resume. + let result = lookup_session(root, "42_story_foo", "coder-1", "opus"); + assert_eq!(result, None, "opus must not resume a sonnet session"); + + // Looking up with sonnet still works. + let result = lookup_session(root, "42_story_foo", "coder-1", "sonnet"); + assert_eq!(result, Some("sess-sonnet".to_string())); + } + + // ── AC9: no prior session → fresh start ─────────────────────────────── + + #[test] + fn no_prior_session_returns_none() { + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + std::fs::create_dir_all(root.join(".huskies")).unwrap(); + + // Empty store — every lookup is None. + assert_eq!( + lookup_session(root, "99_story_new", "coder-1", "sonnet"), + None + ); + } + + // ── AC1: persistence across "restarts" (re-read from disk) ──────────── + + #[test] + fn session_survives_store_reload() { + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + std::fs::create_dir_all(root.join(".huskies")).unwrap(); + + record_session(root, "42_story_foo", "coder-1", "sonnet", "sess-persist"); + + // Simulate restart: create a fresh store read. 
+ let result = lookup_session(root, "42_story_foo", "coder-1", "sonnet"); + assert_eq!(result, Some("sess-persist".to_string())); + } + + #[test] + fn record_overwrites_previous_session() { + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + std::fs::create_dir_all(root.join(".huskies")).unwrap(); + + record_session(root, "42_story_foo", "coder-1", "sonnet", "sess-old"); + record_session(root, "42_story_foo", "coder-1", "sonnet", "sess-new"); + + let result = lookup_session(root, "42_story_foo", "coder-1", "sonnet"); + assert_eq!(result, Some("sess-new".to_string())); + } + + #[test] + fn remove_sessions_for_story_cleans_up() { + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + std::fs::create_dir_all(root.join(".huskies")).unwrap(); + + record_session(root, "42_story_foo", "coder-1", "sonnet", "sess-a"); + record_session(root, "42_story_foo", "coder-2", "opus", "sess-b"); + record_session(root, "99_story_bar", "coder-1", "sonnet", "sess-c"); + + remove_sessions_for_story(root, "42_story_foo"); + + assert_eq!( + lookup_session(root, "42_story_foo", "coder-1", "sonnet"), + None + ); + assert_eq!( + lookup_session(root, "42_story_foo", "coder-2", "opus"), + None + ); + // Other story is untouched. + assert_eq!( + lookup_session(root, "99_story_bar", "coder-1", "sonnet"), + Some("sess-c".to_string()) + ); + } + + // ── AC1: corrupt/empty store file is handled gracefully ─────────────── + + #[test] + fn corrupt_store_file_returns_empty() { + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + std::fs::create_dir_all(root.join(".huskies")).unwrap(); + std::fs::write(root.join(".huskies/session_store.json"), "NOT JSON").unwrap(); + + let result = lookup_session(root, "42_story_foo", "coder-1", "sonnet"); + assert_eq!(result, None, "corrupt store should return None, not panic"); + } +}