huskies: merge 652_story_pass_resume_session_id_on_agent_respawn_so_new_sessions_inherit_prior_reasoning

This commit is contained in:
dave
2026-04-27 11:23:28 +00:00
parent 144f07f412
commit ac85cfce5d
8 changed files with 319 additions and 10 deletions
+1
View File
@@ -6,6 +6,7 @@ pub mod merge;
pub(crate) mod pool; pub(crate) mod pool;
pub(crate) mod pty; pub(crate) mod pty;
pub mod runtime; pub mod runtime;
pub mod session_store;
pub mod token_usage; pub mod token_usage;
use crate::config::AgentConfig; use crate::config::AgentConfig;
+17 -1
View File
@@ -292,6 +292,22 @@ impl AgentPool {
.map(|a| a.inactivity_timeout_secs) .map(|a| a.inactivity_timeout_secs)
.unwrap_or(300); .unwrap_or(300);
// If no explicit session_id_to_resume was provided, look up from the
// persistent session store. The key includes the model so a model
// change (e.g. sonnet → opus) produces a cache miss — intentional.
let effective_session_id = session_id_to_resume.or_else(|| {
let model = config
.find_agent(&resolved_name)
.and_then(|a| a.model.clone())
.unwrap_or_default();
crate::agents::session_store::lookup_session(
project_root,
story_id,
&resolved_name,
&model,
)
});
// Clone all values needed inside the background spawn. // Clone all values needed inside the background spawn.
// Spawn the background task. Worktree creation and agent launch happen here // Spawn the background task. Worktree creation and agent launch happen here
// so `start_agent` returns immediately after registering the agent as // so `start_agent` returns immediately after registering the agent as
@@ -300,7 +316,7 @@ impl AgentPool {
project_root.to_path_buf(), project_root.to_path_buf(),
config.clone(), config.clone(),
resume_context.map(str::to_string), resume_context.map(str::to_string),
session_id_to_resume, effective_session_id,
story_id.to_string(), story_id.to_string(),
resolved_name.clone(), resolved_name.clone(),
tx.clone(), tx.clone(),
+25 -3
View File
@@ -155,13 +155,17 @@ pub(super) async fn run_agent_spawn(
// (which would re-read CLAUDE.md and README) and send only the gate // (which would re-read CLAUDE.md and README) and send only the gate
// failure context as a new message. On a fresh start, append the // failure context as a new message. On a fresh start, append the
// failure context to the original prompt as before. // failure context to the original prompt as before.
let effective_prompt = match &session_id_to_resume_owned { let (effective_prompt, fresh_prompt) = match &session_id_to_resume_owned {
Some(_) => resume_context_owned.unwrap_or_default(), Some(_) => {
// Keep the full rendered prompt as fallback if resume fails.
let fallback = prompt;
(resume_context_owned.unwrap_or_default(), Some(fallback))
}
None => { None => {
if let Some(ctx) = resume_context_owned { if let Some(ctx) = resume_context_owned {
prompt.push_str(&ctx); prompt.push_str(&ctx);
} }
prompt (prompt, None)
} }
}; };
@@ -200,6 +204,7 @@ pub(super) async fn run_agent_spawn(
inactivity_timeout_secs, inactivity_timeout_secs,
mcp_port: port_for_task, mcp_port: port_for_task,
session_id_to_resume: session_id_to_resume_owned.clone(), session_id_to_resume: session_id_to_resume_owned.clone(),
fresh_prompt: fresh_prompt.clone(),
}; };
runtime runtime
.start(ctx, tx_clone.clone(), log_clone.clone(), log_writer_clone) .start(ctx, tx_clone.clone(), log_clone.clone(), log_writer_clone)
@@ -217,6 +222,7 @@ pub(super) async fn run_agent_spawn(
inactivity_timeout_secs, inactivity_timeout_secs,
mcp_port: port_for_task, mcp_port: port_for_task,
session_id_to_resume: session_id_to_resume_owned.clone(), session_id_to_resume: session_id_to_resume_owned.clone(),
fresh_prompt: fresh_prompt.clone(),
}; };
runtime runtime
.start(ctx, tx_clone.clone(), log_clone.clone(), log_writer_clone) .start(ctx, tx_clone.clone(), log_clone.clone(), log_writer_clone)
@@ -234,6 +240,7 @@ pub(super) async fn run_agent_spawn(
inactivity_timeout_secs, inactivity_timeout_secs,
mcp_port: port_for_task, mcp_port: port_for_task,
session_id_to_resume: session_id_to_resume_owned, session_id_to_resume: session_id_to_resume_owned,
fresh_prompt,
}; };
runtime runtime
.start(ctx, tx_clone.clone(), log_clone.clone(), log_writer_clone) .start(ctx, tx_clone.clone(), log_clone.clone(), log_writer_clone)
@@ -266,6 +273,21 @@ pub(super) async fn run_agent_spawn(
} }
} }
// Persist session_id so respawns can resume prior reasoning.
if let Some(ref sess_id) = result.session_id {
let model = config_clone
.find_agent(&aname)
.and_then(|a| a.model.clone())
.unwrap_or_default();
crate::agents::session_store::record_session(
&project_root_clone,
&sid,
&aname,
&model,
sess_id,
);
}
// Mergemaster agents have their own completion path via // Mergemaster agents have their own completion path via
// start_merge_agent_work / run_merge_pipeline and must NOT go // start_merge_agent_work / run_merge_pipeline and must NOT go
// through server-owned gates. When a mergemaster exits early // through server-owned gates. When a mergemaster exits early
+38 -4
View File
@@ -7,6 +7,7 @@ use tokio::sync::broadcast;
use crate::agent_log::AgentLogWriter; use crate::agent_log::AgentLogWriter;
use crate::io::watcher::WatcherEvent; use crate::io::watcher::WatcherEvent;
use crate::slog_warn;
use super::{AgentEvent, AgentRuntime, RuntimeContext, RuntimeResult, RuntimeStatus}; use super::{AgentEvent, AgentRuntime, RuntimeContext, RuntimeResult, RuntimeStatus};
@@ -49,19 +50,52 @@ impl AgentRuntime for ClaudeCodeRuntime {
&ctx.cwd, &ctx.cwd,
&tx, &tx,
&event_log, &event_log,
log_writer, log_writer.clone(),
ctx.inactivity_timeout_secs, ctx.inactivity_timeout_secs,
Arc::clone(&self.child_killers), Arc::clone(&self.child_killers),
self.watcher_tx.clone(), self.watcher_tx.clone(),
ctx.session_id_to_resume.as_deref(), ctx.session_id_to_resume.as_deref(),
) )
.await?; .await;
match pty_result {
Ok(result) => Ok(RuntimeResult {
session_id: result.session_id,
token_usage: result.token_usage,
}),
Err(e) if ctx.session_id_to_resume.is_some() && ctx.fresh_prompt.is_some() => {
// Resume failed — fall back to a fresh session without --resume.
slog_warn!(
"[agents] Resume failed for {}:{}, retrying without --resume: {}",
ctx.story_id,
ctx.agent_name,
e
);
let fresh = ctx.fresh_prompt.unwrap();
let fallback_result = super::super::pty::run_agent_pty_streaming(
&ctx.story_id,
&ctx.agent_name,
&ctx.command,
&ctx.args,
&fresh,
&ctx.cwd,
&tx,
&event_log,
log_writer,
ctx.inactivity_timeout_secs,
Arc::clone(&self.child_killers),
self.watcher_tx.clone(),
None, // no --resume on fallback
)
.await?;
Ok(RuntimeResult { Ok(RuntimeResult {
session_id: pty_result.session_id, session_id: fallback_result.session_id,
token_usage: pty_result.token_usage, token_usage: fallback_result.token_usage,
}) })
} }
Err(e) => Err(e),
}
}
fn stop(&self) { fn stop(&self) {
// Stopping is handled externally by the pool via kill_child_for_key(). // Stopping is handled externally by the pool via kill_child_for_key().
+3
View File
@@ -705,6 +705,7 @@ mod tests {
inactivity_timeout_secs: 300, inactivity_timeout_secs: 300,
mcp_port: 3001, mcp_port: 3001,
session_id_to_resume: None, session_id_to_resume: None,
fresh_prompt: None,
}; };
let instruction = build_system_instruction(&ctx); let instruction = build_system_instruction(&ctx);
@@ -723,6 +724,7 @@ mod tests {
inactivity_timeout_secs: 300, inactivity_timeout_secs: 300,
mcp_port: 3001, mcp_port: 3001,
session_id_to_resume: None, session_id_to_resume: None,
fresh_prompt: None,
}; };
let instruction = build_system_instruction(&ctx); let instruction = build_system_instruction(&ctx);
@@ -803,6 +805,7 @@ mod tests {
inactivity_timeout_secs: 300, inactivity_timeout_secs: 300,
mcp_port: 3001, mcp_port: 3001,
session_id_to_resume: None, session_id_to_resume: None,
fresh_prompt: None,
}; };
// The model extraction logic is inside start(), but we test the // The model extraction logic is inside start(), but we test the
+8
View File
@@ -32,6 +32,13 @@ pub struct RuntimeContext {
/// than `claude -p <full_prompt>`. The agent re-enters the previous /// than `claude -p <full_prompt>`. The agent re-enters the previous
/// conversation and receives the `prompt` (if non-empty) as a new message. /// conversation and receives the `prompt` (if non-empty) as a new message.
pub session_id_to_resume: Option<String>, pub session_id_to_resume: Option<String>,
/// Full rendered prompt for a fresh session, kept as fallback if resume fails.
///
/// When `session_id_to_resume` is `Some`, `prompt` contains only the
/// resume context (e.g. gate failure output). If the CLI rejects the
/// resume (session expired, file missing, version mismatch), the runtime
/// retries with this full prompt and no `--resume` flag.
pub fresh_prompt: Option<String>,
} }
/// Result returned by a runtime after the agent session completes. /// Result returned by a runtime after the agent session completes.
@@ -101,6 +108,7 @@ mod tests {
inactivity_timeout_secs: 300, inactivity_timeout_secs: 300,
mcp_port: 3001, mcp_port: 3001,
session_id_to_resume: None, session_id_to_resume: None,
fresh_prompt: None,
}; };
assert_eq!(ctx.story_id, "42_story_foo"); assert_eq!(ctx.story_id, "42_story_foo");
assert_eq!(ctx.agent_name, "coder-1"); assert_eq!(ctx.agent_name, "coder-1");
+4
View File
@@ -617,6 +617,7 @@ mod tests {
inactivity_timeout_secs: 300, inactivity_timeout_secs: 300,
mcp_port: 3001, mcp_port: 3001,
session_id_to_resume: None, session_id_to_resume: None,
fresh_prompt: None,
}; };
assert_eq!(build_system_text(&ctx), "Custom system prompt"); assert_eq!(build_system_text(&ctx), "Custom system prompt");
@@ -634,6 +635,7 @@ mod tests {
inactivity_timeout_secs: 300, inactivity_timeout_secs: 300,
mcp_port: 3001, mcp_port: 3001,
session_id_to_resume: None, session_id_to_resume: None,
fresh_prompt: None,
}; };
let text = build_system_text(&ctx); let text = build_system_text(&ctx);
@@ -684,6 +686,7 @@ mod tests {
inactivity_timeout_secs: 300, inactivity_timeout_secs: 300,
mcp_port: 3001, mcp_port: 3001,
session_id_to_resume: None, session_id_to_resume: None,
fresh_prompt: None,
}; };
assert!(ctx.command.starts_with("gpt")); assert!(ctx.command.starts_with("gpt"));
} }
@@ -700,6 +703,7 @@ mod tests {
inactivity_timeout_secs: 300, inactivity_timeout_secs: 300,
mcp_port: 3001, mcp_port: 3001,
session_id_to_resume: None, session_id_to_resume: None,
fresh_prompt: None,
}; };
assert!(ctx.command.starts_with("o")); assert!(ctx.command.starts_with("o"));
} }
+221
View File
@@ -0,0 +1,221 @@
//! Persistent session store — tracks the last Claude Code session_id per
//! (story_id, agent_name, model) triple so respawned agents can resume prior reasoning.
//!
//! The session_id is extracted from the `Done. Session: Some(<uuid>)` log entry
//! emitted at agent shutdown. When the same (story, agent, model) triple is
//! spawned again, the orchestrator passes `--resume <session_id>` so the new
//! session inherits the prior conversation context.
//!
//! Model is part of the key intentionally: resuming across models is not
//! supported (e.g. opus should not resume a sonnet session).
use std::collections::HashMap;
use std::path::Path;
/// Composite key for the session store: `{story_id}:{agent_name}:{model}`.
fn session_key(story_id: &str, agent_name: &str, model: &str) -> String {
    // Built with explicit pushes so the single allocation is sized up front.
    let mut key = String::with_capacity(story_id.len() + agent_name.len() + model.len() + 2);
    key.push_str(story_id);
    key.push(':');
    key.push_str(agent_name);
    key.push(':');
    key.push_str(model);
    key
}
/// Path to the persistent session store file inside the project root.
fn store_path(project_root: &Path) -> std::path::PathBuf {
    // Joined component-by-component; equivalent to joining the relative
    // path ".huskies/session_store.json" in one call.
    project_root.join(".huskies").join("session_store.json")
}
/// Read the session store from disk.
///
/// Returns an empty map if the file is missing, unreadable, empty, or not
/// valid JSON — the store is best-effort, so any failure degrades to "no
/// prior sessions" rather than an error.
fn read_store(project_root: &Path) -> HashMap<String, String> {
    match std::fs::read_to_string(store_path(project_root)) {
        Ok(contents) => serde_json::from_str(&contents).unwrap_or_default(),
        Err(_) => HashMap::new(),
    }
}
/// Write the session store to disk.
///
/// Best-effort: serialization and write errors are silently ignored — a
/// missed persist only means the next spawn starts a fresh session instead
/// of resuming. Creates the parent directory (`.huskies/`) if it is missing,
/// so the very first `record_session` on a fresh checkout does not silently
/// drop the session.
fn write_store(project_root: &Path, data: &HashMap<String, String>) {
    let path = store_path(project_root);
    if let Some(parent) = path.parent() {
        // Without this, std::fs::write fails (and is swallowed) whenever
        // `.huskies/` has not been created yet.
        let _ = std::fs::create_dir_all(parent);
    }
    if let Ok(json) = serde_json::to_string_pretty(data) {
        let _ = std::fs::write(&path, json);
    }
}
/// Record the session_id from a completed agent run, persisted to disk.
///
/// Called after an agent process exits with a valid session_id. The next
/// spawn of the same (story_id, agent_name, model) triple will find this
/// session and pass `--resume <session_id>`.
pub fn record_session(
    project_root: &Path,
    story_id: &str,
    agent_name: &str,
    model: &str,
    session_id: &str,
) {
    // Read-modify-write of the whole store; an existing entry for the same
    // triple is overwritten.
    let mut store = read_store(project_root);
    store.insert(
        session_key(story_id, agent_name, model),
        session_id.to_owned(),
    );
    write_store(project_root, &store);
}
/// Look up the last session_id for a (story_id, agent_name, model) triple.
///
/// Returns `None` if no prior session exists (fresh story) or if the model
/// has changed (intentional — resuming across models is not supported).
pub fn lookup_session(
    project_root: &Path,
    story_id: &str,
    agent_name: &str,
    model: &str,
) -> Option<String> {
    // The map is a fresh local read, so moving the value out with `remove`
    // is safe and avoids cloning the String.
    let mut store = read_store(project_root);
    store.remove(&session_key(story_id, agent_name, model))
}
/// Remove all session entries for a story (called when a story reaches done/archived).
///
/// NOTE(review): this function is gated behind `#[cfg(test)]`, so it does
/// not exist in non-test builds even though the summary above says it runs
/// at done/archived. Confirm whether a production caller was intended and
/// drop the cfg if so; otherwise reword the doc to say it is test-only.
#[cfg(test)]
pub fn remove_sessions_for_story(project_root: &Path, story_id: &str) {
    let mut data = read_store(project_root);
    // Keys have the shape `{story_id}:{agent_name}:{model}`, so every entry
    // for the story shares the `{story_id}:` prefix.
    let prefix = format!("{story_id}:");
    let before = data.len();
    data.retain(|k, _| !k.starts_with(&prefix));
    // Only rewrite the file when at least one entry was actually removed.
    if data.len() < before {
        write_store(project_root, &data);
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Create a temp project root with the `.huskies/` directory the store
    /// writes into. Extracted because every test repeated this 3-line setup.
    /// Returns the TempDir guard — bind it for the test's duration, or the
    /// directory is deleted on drop.
    fn temp_project_root() -> tempfile::TempDir {
        let tmp = tempfile::tempdir().unwrap();
        std::fs::create_dir_all(tmp.path().join(".huskies")).unwrap();
        tmp
    }

    // ── AC1: record and lookup round-trip ─────────────────────────────────
    #[test]
    fn record_and_lookup_round_trip() {
        let tmp = temp_project_root();
        let root = tmp.path();
        record_session(root, "42_story_foo", "coder-1", "sonnet", "sess-abc");
        assert_eq!(
            lookup_session(root, "42_story_foo", "coder-1", "sonnet"),
            Some("sess-abc".to_string())
        );
    }

    #[test]
    fn lookup_returns_none_for_unknown_story() {
        let tmp = temp_project_root();
        assert_eq!(
            lookup_session(tmp.path(), "unknown_story", "coder-1", "sonnet"),
            None
        );
    }

    // ── AC3: model change semantics ───────────────────────────────────────
    /// When an operator escalates from sonnet to opus, the new opus spawn
    /// must NOT resume the prior sonnet session. The key includes the model,
    /// so a different model produces a cache miss.
    #[test]
    fn model_change_does_not_resume_prior_session() {
        let tmp = temp_project_root();
        let root = tmp.path();
        // Record a sonnet session.
        record_session(root, "42_story_foo", "coder-1", "sonnet", "sess-sonnet");
        // Looking up with opus model returns None — no resume.
        assert_eq!(
            lookup_session(root, "42_story_foo", "coder-1", "opus"),
            None,
            "opus must not resume a sonnet session"
        );
        // Looking up with sonnet still works.
        assert_eq!(
            lookup_session(root, "42_story_foo", "coder-1", "sonnet"),
            Some("sess-sonnet".to_string())
        );
    }

    // ── AC9: no prior session → fresh start ───────────────────────────────
    #[test]
    fn no_prior_session_returns_none() {
        let tmp = temp_project_root();
        // Empty store — every lookup is None.
        assert_eq!(
            lookup_session(tmp.path(), "99_story_new", "coder-1", "sonnet"),
            None
        );
    }

    // ── AC1: persistence across "restarts" (re-read from disk) ────────────
    #[test]
    fn session_survives_store_reload() {
        let tmp = temp_project_root();
        let root = tmp.path();
        record_session(root, "42_story_foo", "coder-1", "sonnet", "sess-persist");
        // Simulate restart: lookup_session performs a fresh read from disk.
        assert_eq!(
            lookup_session(root, "42_story_foo", "coder-1", "sonnet"),
            Some("sess-persist".to_string())
        );
    }

    #[test]
    fn record_overwrites_previous_session() {
        let tmp = temp_project_root();
        let root = tmp.path();
        record_session(root, "42_story_foo", "coder-1", "sonnet", "sess-old");
        record_session(root, "42_story_foo", "coder-1", "sonnet", "sess-new");
        assert_eq!(
            lookup_session(root, "42_story_foo", "coder-1", "sonnet"),
            Some("sess-new".to_string())
        );
    }

    #[test]
    fn remove_sessions_for_story_cleans_up() {
        let tmp = temp_project_root();
        let root = tmp.path();
        record_session(root, "42_story_foo", "coder-1", "sonnet", "sess-a");
        record_session(root, "42_story_foo", "coder-2", "opus", "sess-b");
        record_session(root, "99_story_bar", "coder-1", "sonnet", "sess-c");
        remove_sessions_for_story(root, "42_story_foo");
        assert_eq!(
            lookup_session(root, "42_story_foo", "coder-1", "sonnet"),
            None
        );
        assert_eq!(
            lookup_session(root, "42_story_foo", "coder-2", "opus"),
            None
        );
        // Other story is untouched.
        assert_eq!(
            lookup_session(root, "99_story_bar", "coder-1", "sonnet"),
            Some("sess-c".to_string())
        );
    }

    // ── AC1: corrupt/empty store file is handled gracefully ───────────────
    #[test]
    fn corrupt_store_file_returns_empty() {
        let tmp = temp_project_root();
        let root = tmp.path();
        std::fs::write(root.join(".huskies/session_store.json"), "NOT JSON").unwrap();
        assert_eq!(
            lookup_session(root, "42_story_foo", "coder-1", "sonnet"),
            None,
            "corrupt store should return None, not panic"
        );
    }
}