huskies: merge 967
This commit is contained in:
@@ -839,6 +839,21 @@ mod tests {
|
|||||||
"[[agent]]\nname = \"mergemaster\"\nstage = \"mergemaster\"\n",
|
"[[agent]]\nname = \"mergemaster\"\nstage = \"mergemaster\"\n",
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
// The spawn path calls `git worktree add` — the tempdir must be a real
|
||||||
|
// git repo with at least one commit or it fails with "not a git repo".
|
||||||
|
for args in [
|
||||||
|
&["init"][..],
|
||||||
|
&["config", "user.email", "test@test.com"],
|
||||||
|
&["config", "user.name", "Test"],
|
||||||
|
&["commit", "--allow-empty", "-m", "init"],
|
||||||
|
] {
|
||||||
|
std::process::Command::new("git")
|
||||||
|
.args(args)
|
||||||
|
.current_dir(&root)
|
||||||
|
.output()
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
crate::crdt_state::init_for_test();
|
crate::crdt_state::init_for_test();
|
||||||
crate::db::ensure_content_store();
|
crate::db::ensure_content_store();
|
||||||
crate::db::write_item_with_content(
|
crate::db::write_item_with_content(
|
||||||
|
|||||||
@@ -320,6 +320,13 @@ pub(super) async fn run_agent_spawn(
|
|||||||
.and_then(|a| a.runtime.as_deref())
|
.and_then(|a| a.runtime.as_deref())
|
||||||
.unwrap_or("claude-code");
|
.unwrap_or("claude-code");
|
||||||
|
|
||||||
|
// Extract model once so it can be shared across all runtime branches and
|
||||||
|
// passed to RuntimeContext for eager session recording (bug 967).
|
||||||
|
let agent_model = config_clone
|
||||||
|
.find_agent(&aname)
|
||||||
|
.and_then(|a| a.model.clone())
|
||||||
|
.unwrap_or_default();
|
||||||
|
|
||||||
let run_result = match runtime_name {
|
let run_result = match runtime_name {
|
||||||
"claude-code" => {
|
"claude-code" => {
|
||||||
let runtime =
|
let runtime =
|
||||||
@@ -335,6 +342,8 @@ pub(super) async fn run_agent_spawn(
|
|||||||
app_ctx: app_ctx.clone(),
|
app_ctx: app_ctx.clone(),
|
||||||
session_id_to_resume: session_id_to_resume_owned.clone(),
|
session_id_to_resume: session_id_to_resume_owned.clone(),
|
||||||
fresh_prompt: fresh_prompt.clone(),
|
fresh_prompt: fresh_prompt.clone(),
|
||||||
|
project_root: project_root_clone.clone(),
|
||||||
|
model: agent_model.clone(),
|
||||||
};
|
};
|
||||||
runtime
|
runtime
|
||||||
.start(ctx, tx_clone.clone(), log_clone.clone(), log_writer_clone)
|
.start(ctx, tx_clone.clone(), log_clone.clone(), log_writer_clone)
|
||||||
@@ -353,6 +362,8 @@ pub(super) async fn run_agent_spawn(
|
|||||||
app_ctx: app_ctx.clone(),
|
app_ctx: app_ctx.clone(),
|
||||||
session_id_to_resume: session_id_to_resume_owned.clone(),
|
session_id_to_resume: session_id_to_resume_owned.clone(),
|
||||||
fresh_prompt: fresh_prompt.clone(),
|
fresh_prompt: fresh_prompt.clone(),
|
||||||
|
project_root: project_root_clone.clone(),
|
||||||
|
model: agent_model.clone(),
|
||||||
};
|
};
|
||||||
runtime
|
runtime
|
||||||
.start(ctx, tx_clone.clone(), log_clone.clone(), log_writer_clone)
|
.start(ctx, tx_clone.clone(), log_clone.clone(), log_writer_clone)
|
||||||
@@ -371,6 +382,8 @@ pub(super) async fn run_agent_spawn(
|
|||||||
app_ctx: app_ctx.clone(),
|
app_ctx: app_ctx.clone(),
|
||||||
session_id_to_resume: session_id_to_resume_owned,
|
session_id_to_resume: session_id_to_resume_owned,
|
||||||
fresh_prompt,
|
fresh_prompt,
|
||||||
|
project_root: project_root_clone.clone(),
|
||||||
|
model: agent_model,
|
||||||
};
|
};
|
||||||
runtime
|
runtime
|
||||||
.start(ctx, tx_clone.clone(), log_clone.clone(), log_writer_clone)
|
.start(ctx, tx_clone.clone(), log_clone.clone(), log_writer_clone)
|
||||||
|
|||||||
@@ -59,6 +59,7 @@ mod tests {
|
|||||||
child_killers,
|
child_killers,
|
||||||
watcher_tx,
|
watcher_tx,
|
||||||
None,
|
None,
|
||||||
|
None,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
@@ -113,6 +114,7 @@ mod tests {
|
|||||||
child_killers,
|
child_killers,
|
||||||
watcher_tx,
|
watcher_tx,
|
||||||
None,
|
None,
|
||||||
|
None,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
@@ -175,6 +177,7 @@ mod tests {
|
|||||||
child_killers,
|
child_killers,
|
||||||
watcher_tx,
|
watcher_tx,
|
||||||
None,
|
None,
|
||||||
|
None,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
let after = chrono::Utc::now();
|
let after = chrono::Utc::now();
|
||||||
@@ -242,6 +245,7 @@ mod tests {
|
|||||||
child_killers,
|
child_killers,
|
||||||
watcher_tx,
|
watcher_tx,
|
||||||
None,
|
None,
|
||||||
|
None,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
@@ -373,4 +377,84 @@ mod tests {
|
|||||||
|
|
||||||
assert!(rx.try_recv().is_err());
|
assert!(rx.try_recv().is_err());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── bug 967: eager session recording survives watchdog kill + task abort ──
|
||||||
|
|
||||||
|
/// AC2 regression: simulates a watchdog kill of an agent that emitted a
|
||||||
|
/// session_id mid-run. The script emits a `"system"` JSON event and then
|
||||||
|
/// sleeps; a concurrent task kills the child after 500 ms (simulating the
|
||||||
|
/// watchdog). The eager-recording path in `run_agent_pty_blocking` must
|
||||||
|
/// have already persisted the session_id before the kill, so
|
||||||
|
/// `lookup_session` returns it (warm) rather than `None` (cold).
|
||||||
|
#[tokio::test]
|
||||||
|
async fn watchdog_kill_session_id_survives_abort() {
|
||||||
|
use std::os::unix::fs::PermissionsExt;
|
||||||
|
|
||||||
|
let tmp = tempfile::tempdir().unwrap();
|
||||||
|
let project_root = tmp.path().to_path_buf();
|
||||||
|
std::fs::create_dir_all(project_root.join(".huskies")).unwrap();
|
||||||
|
|
||||||
|
// Script emits a system event immediately, then sleeps so the process
|
||||||
|
// stays alive long enough for us to kill it (simulating the watchdog).
|
||||||
|
let script = tmp.path().join("emit_then_sleep.sh");
|
||||||
|
std::fs::write(
|
||||||
|
&script,
|
||||||
|
"#!/bin/sh\nprintf '%s\\n' '{\"type\":\"system\",\"session_id\":\"sess-967-watchdog\"}'\nsleep 60\n",
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
std::fs::set_permissions(&script, std::fs::Permissions::from_mode(0o755)).unwrap();
|
||||||
|
|
||||||
|
let (tx, _rx) = broadcast::channel::<AgentEvent>(64);
|
||||||
|
let (watcher_tx, _watcher_rx) = broadcast::channel::<WatcherEvent>(16);
|
||||||
|
let event_log = Arc::new(Mutex::new(Vec::new()));
|
||||||
|
let child_killers: Arc<
|
||||||
|
Mutex<HashMap<String, Box<dyn portable_pty::ChildKiller + Send + Sync>>>,
|
||||||
|
> = Arc::new(Mutex::new(HashMap::new()));
|
||||||
|
let child_killers_for_kill = Arc::clone(&child_killers);
|
||||||
|
|
||||||
|
// Spawn a task to kill the child after a short delay (simulating watchdog).
|
||||||
|
tokio::spawn(async move {
|
||||||
|
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
|
||||||
|
if let Ok(mut killers) = child_killers_for_kill.lock() {
|
||||||
|
for (_, killer) in killers.iter_mut() {
|
||||||
|
let _ = killer.kill();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Run the PTY directly — it returns once the child is killed.
|
||||||
|
let script_arg = script.to_string_lossy().to_string();
|
||||||
|
let _ = run_agent_pty_streaming(
|
||||||
|
"967_story_watchdog",
|
||||||
|
"coder-1",
|
||||||
|
"sh",
|
||||||
|
&[script_arg],
|
||||||
|
"--",
|
||||||
|
"/tmp",
|
||||||
|
&tx,
|
||||||
|
&event_log,
|
||||||
|
None,
|
||||||
|
0, // no inactivity timeout
|
||||||
|
child_killers,
|
||||||
|
watcher_tx,
|
||||||
|
None, // no session to resume
|
||||||
|
Some((project_root.clone(), "sonnet".to_string())),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
// The session_id must be in the store (eagerly recorded when the
|
||||||
|
// "system" event was seen, before the kill).
|
||||||
|
let recorded = crate::agents::session_store::lookup_session(
|
||||||
|
&project_root,
|
||||||
|
"967_story_watchdog",
|
||||||
|
"coder-1",
|
||||||
|
"sonnet",
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
recorded,
|
||||||
|
Some("sess-967-watchdog".to_string()),
|
||||||
|
"session_id must be recorded eagerly before the watchdog kill so \
|
||||||
|
the respawn's lookup_session returns it (warm), not None (cold)"
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -33,6 +33,16 @@ use super::types::{ChildKillerGuard, PtyResult, composite_key};
|
|||||||
/// If the agent committed valid work before crashing, the "work survived" check
|
/// If the agent committed valid work before crashing, the "work survived" check
|
||||||
/// in `pipeline::advance` detects the committed code and advances the story to
|
/// in `pipeline::advance` detects the committed code and advances the story to
|
||||||
/// QA instead of entering the retry/block path.
|
/// QA instead of entering the retry/block path.
|
||||||
|
///
|
||||||
|
/// ## `eager_record` — watchdog-kill race fix (bug 967)
|
||||||
|
///
|
||||||
|
/// When `Some((project_root, model))` is passed, the blocking thread calls
|
||||||
|
/// `session_store::record_session()` immediately when the `"system"` JSON event
|
||||||
|
/// is parsed. This runs inside the OS blocking thread, which cannot be
|
||||||
|
/// cancelled by a tokio task abort. If the watchdog later kills the PTY child
|
||||||
|
/// and aborts the spawned tokio task, the session_id is already persisted and
|
||||||
|
/// the respawn's `lookup_session()` returns it (warm start instead of cold).
|
||||||
|
/// Pass `None` when session persistence is not needed (e.g. in tests).
|
||||||
#[allow(clippy::too_many_arguments)]
|
#[allow(clippy::too_many_arguments)]
|
||||||
pub(in crate::agents) async fn run_agent_pty_streaming(
|
pub(in crate::agents) async fn run_agent_pty_streaming(
|
||||||
story_id: &str,
|
story_id: &str,
|
||||||
@@ -48,6 +58,7 @@ pub(in crate::agents) async fn run_agent_pty_streaming(
|
|||||||
child_killers: Arc<Mutex<HashMap<String, Box<dyn ChildKiller + Send + Sync>>>>,
|
child_killers: Arc<Mutex<HashMap<String, Box<dyn ChildKiller + Send + Sync>>>>,
|
||||||
watcher_tx: broadcast::Sender<WatcherEvent>,
|
watcher_tx: broadcast::Sender<WatcherEvent>,
|
||||||
session_id_to_resume: Option<&str>,
|
session_id_to_resume: Option<&str>,
|
||||||
|
eager_record: Option<(std::path::PathBuf, String)>,
|
||||||
) -> Result<PtyResult, String> {
|
) -> Result<PtyResult, String> {
|
||||||
let sid = story_id.to_string();
|
let sid = story_id.to_string();
|
||||||
let aname = agent_name.to_string();
|
let aname = agent_name.to_string();
|
||||||
@@ -74,6 +85,7 @@ pub(in crate::agents) async fn run_agent_pty_streaming(
|
|||||||
&child_killers,
|
&child_killers,
|
||||||
&watcher_tx,
|
&watcher_tx,
|
||||||
resume_sid.as_deref(),
|
resume_sid.as_deref(),
|
||||||
|
eager_record,
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
@@ -95,6 +107,7 @@ fn run_agent_pty_blocking(
|
|||||||
child_killers: &Arc<Mutex<HashMap<String, Box<dyn ChildKiller + Send + Sync>>>>,
|
child_killers: &Arc<Mutex<HashMap<String, Box<dyn ChildKiller + Send + Sync>>>>,
|
||||||
watcher_tx: &broadcast::Sender<WatcherEvent>,
|
watcher_tx: &broadcast::Sender<WatcherEvent>,
|
||||||
session_id_to_resume: Option<&str>,
|
session_id_to_resume: Option<&str>,
|
||||||
|
eager_record: Option<(std::path::PathBuf, String)>,
|
||||||
) -> Result<PtyResult, String> {
|
) -> Result<PtyResult, String> {
|
||||||
let pty_system = native_pty_system();
|
let pty_system = native_pty_system();
|
||||||
|
|
||||||
@@ -319,6 +332,15 @@ fn run_agent_pty_blocking(
|
|||||||
.get("session_id")
|
.get("session_id")
|
||||||
.and_then(|s| s.as_str())
|
.and_then(|s| s.as_str())
|
||||||
.map(|s| s.to_string());
|
.map(|s| s.to_string());
|
||||||
|
// Eagerly persist the session_id so it survives a watchdog kill
|
||||||
|
// that aborts the tokio task before run_agent_spawn's
|
||||||
|
// record_session() call (bug 967). Runs in the OS blocking
|
||||||
|
// thread — not cancellable by tokio task abort.
|
||||||
|
if let (Some(sid), Some((root, model))) = (&session_id, &eager_record) {
|
||||||
|
crate::agents::session_store::record_session(
|
||||||
|
root, story_id, agent_name, model, sid,
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// With --include-partial-messages, thinking and text arrive
|
// With --include-partial-messages, thinking and text arrive
|
||||||
// incrementally via stream_event → content_block_delta. Handle
|
// incrementally via stream_event → content_block_delta. Handle
|
||||||
|
|||||||
@@ -42,6 +42,11 @@ impl AgentRuntime for ClaudeCodeRuntime {
|
|||||||
event_log: Arc<Mutex<Vec<AgentEvent>>>,
|
event_log: Arc<Mutex<Vec<AgentEvent>>>,
|
||||||
log_writer: Option<Arc<Mutex<AgentLogWriter>>>,
|
log_writer: Option<Arc<Mutex<AgentLogWriter>>>,
|
||||||
) -> Result<RuntimeResult, String> {
|
) -> Result<RuntimeResult, String> {
|
||||||
|
let eager_record = if ctx.model.is_empty() {
|
||||||
|
None
|
||||||
|
} else {
|
||||||
|
Some((ctx.project_root.clone(), ctx.model.clone()))
|
||||||
|
};
|
||||||
let pty_result = super::super::pty::run_agent_pty_streaming(
|
let pty_result = super::super::pty::run_agent_pty_streaming(
|
||||||
&ctx.story_id,
|
&ctx.story_id,
|
||||||
&ctx.agent_name,
|
&ctx.agent_name,
|
||||||
@@ -56,6 +61,7 @@ impl AgentRuntime for ClaudeCodeRuntime {
|
|||||||
Arc::clone(&self.child_killers),
|
Arc::clone(&self.child_killers),
|
||||||
self.watcher_tx.clone(),
|
self.watcher_tx.clone(),
|
||||||
ctx.session_id_to_resume.as_deref(),
|
ctx.session_id_to_resume.as_deref(),
|
||||||
|
eager_record.clone(),
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
@@ -90,6 +96,7 @@ impl AgentRuntime for ClaudeCodeRuntime {
|
|||||||
Arc::clone(&self.child_killers),
|
Arc::clone(&self.child_killers),
|
||||||
self.watcher_tx.clone(),
|
self.watcher_tx.clone(),
|
||||||
None, // no --resume on fallback
|
None, // no --resume on fallback
|
||||||
|
eager_record,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
Ok(RuntimeResult {
|
Ok(RuntimeResult {
|
||||||
|
|||||||
@@ -117,6 +117,8 @@ mod tests {
|
|||||||
app_ctx: Some(test_app_ctx()),
|
app_ctx: Some(test_app_ctx()),
|
||||||
session_id_to_resume: None,
|
session_id_to_resume: None,
|
||||||
fresh_prompt: None,
|
fresh_prompt: None,
|
||||||
|
project_root: std::path::PathBuf::from("/tmp/project"),
|
||||||
|
model: String::new(),
|
||||||
};
|
};
|
||||||
|
|
||||||
let instruction = build_system_instruction(&ctx);
|
let instruction = build_system_instruction(&ctx);
|
||||||
@@ -136,6 +138,8 @@ mod tests {
|
|||||||
app_ctx: Some(test_app_ctx()),
|
app_ctx: Some(test_app_ctx()),
|
||||||
session_id_to_resume: None,
|
session_id_to_resume: None,
|
||||||
fresh_prompt: None,
|
fresh_prompt: None,
|
||||||
|
project_root: std::path::PathBuf::from("/tmp/project"),
|
||||||
|
model: String::new(),
|
||||||
};
|
};
|
||||||
|
|
||||||
let instruction = build_system_instruction(&ctx);
|
let instruction = build_system_instruction(&ctx);
|
||||||
|
|||||||
@@ -384,6 +384,8 @@ mod tests {
|
|||||||
app_ctx: Some(test_app_ctx()),
|
app_ctx: Some(test_app_ctx()),
|
||||||
session_id_to_resume: None,
|
session_id_to_resume: None,
|
||||||
fresh_prompt: None,
|
fresh_prompt: None,
|
||||||
|
project_root: std::path::PathBuf::from("/tmp/project"),
|
||||||
|
model: String::new(),
|
||||||
};
|
};
|
||||||
|
|
||||||
// The model extraction logic is inside start(), but we test the
|
// The model extraction logic is inside start(), but we test the
|
||||||
|
|||||||
@@ -41,6 +41,14 @@ pub struct RuntimeContext {
|
|||||||
/// resume (session expired, file missing, version mismatch), the runtime
|
/// resume (session expired, file missing, version mismatch), the runtime
|
||||||
/// retries with this full prompt and no `--resume` flag.
|
/// retries with this full prompt and no `--resume` flag.
|
||||||
pub fresh_prompt: Option<String>,
|
pub fresh_prompt: Option<String>,
|
||||||
|
/// Project root path — passed to the PTY runner so it can eagerly record
|
||||||
|
/// the session_id as soon as the `"system"` event is seen (bug 967).
|
||||||
|
/// Eager recording ensures the session survives a watchdog kill that aborts
|
||||||
|
/// the tokio task before `run_agent_spawn`'s `record_session()` call runs.
|
||||||
|
pub project_root: std::path::PathBuf,
|
||||||
|
/// Agent model name — forms part of the session store key used for eager
|
||||||
|
/// recording (bug 967). An empty string disables eager recording.
|
||||||
|
pub model: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Result returned by a runtime after the agent session completes.
|
/// Result returned by a runtime after the agent session completes.
|
||||||
@@ -125,6 +133,8 @@ mod tests {
|
|||||||
app_ctx: Some(test_app_ctx()),
|
app_ctx: Some(test_app_ctx()),
|
||||||
session_id_to_resume: None,
|
session_id_to_resume: None,
|
||||||
fresh_prompt: None,
|
fresh_prompt: None,
|
||||||
|
project_root: std::path::PathBuf::from("/tmp/project"),
|
||||||
|
model: "sonnet".to_string(),
|
||||||
};
|
};
|
||||||
assert_eq!(ctx.story_id, "42_story_foo");
|
assert_eq!(ctx.story_id, "42_story_foo");
|
||||||
assert_eq!(ctx.agent_name, "coder-1");
|
assert_eq!(ctx.agent_name, "coder-1");
|
||||||
|
|||||||
@@ -560,6 +560,8 @@ mod tests {
|
|||||||
app_ctx: Some(test_app_ctx()),
|
app_ctx: Some(test_app_ctx()),
|
||||||
session_id_to_resume: None,
|
session_id_to_resume: None,
|
||||||
fresh_prompt: None,
|
fresh_prompt: None,
|
||||||
|
project_root: std::path::PathBuf::from("/tmp/project"),
|
||||||
|
model: String::new(),
|
||||||
};
|
};
|
||||||
|
|
||||||
assert_eq!(build_system_text(&ctx), "Custom system prompt");
|
assert_eq!(build_system_text(&ctx), "Custom system prompt");
|
||||||
@@ -578,6 +580,8 @@ mod tests {
|
|||||||
app_ctx: Some(test_app_ctx()),
|
app_ctx: Some(test_app_ctx()),
|
||||||
session_id_to_resume: None,
|
session_id_to_resume: None,
|
||||||
fresh_prompt: None,
|
fresh_prompt: None,
|
||||||
|
project_root: std::path::PathBuf::from("/tmp/project"),
|
||||||
|
model: String::new(),
|
||||||
};
|
};
|
||||||
|
|
||||||
let text = build_system_text(&ctx);
|
let text = build_system_text(&ctx);
|
||||||
@@ -629,6 +633,8 @@ mod tests {
|
|||||||
app_ctx: Some(test_app_ctx()),
|
app_ctx: Some(test_app_ctx()),
|
||||||
session_id_to_resume: None,
|
session_id_to_resume: None,
|
||||||
fresh_prompt: None,
|
fresh_prompt: None,
|
||||||
|
project_root: std::path::PathBuf::from("/tmp/project"),
|
||||||
|
model: String::new(),
|
||||||
};
|
};
|
||||||
assert!(ctx.command.starts_with("gpt"));
|
assert!(ctx.command.starts_with("gpt"));
|
||||||
}
|
}
|
||||||
@@ -646,6 +652,8 @@ mod tests {
|
|||||||
app_ctx: Some(test_app_ctx()),
|
app_ctx: Some(test_app_ctx()),
|
||||||
session_id_to_resume: None,
|
session_id_to_resume: None,
|
||||||
fresh_prompt: None,
|
fresh_prompt: None,
|
||||||
|
project_root: std::path::PathBuf::from("/tmp/project"),
|
||||||
|
model: String::new(),
|
||||||
};
|
};
|
||||||
assert!(ctx.command.starts_with("o"));
|
assert!(ctx.command.starts_with("o"));
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user