story-kit: merge 132_story_fix_toctou_race_in_agent_check_and_insert
This commit is contained in:
@@ -259,9 +259,40 @@ impl AgentPool {
|
||||
|
||||
let key = composite_key(story_id, &resolved_name);
|
||||
|
||||
// Check not already running
|
||||
// Create shared resources before the atomic check-and-insert so the
|
||||
// agents lock is held continuously from the availability check through
|
||||
// the Pending insert, eliminating the TOCTOU race (story 132).
|
||||
let (tx, _) = broadcast::channel::<AgentEvent>(1024);
|
||||
|
||||
let event_log: Arc<Mutex<Vec<AgentEvent>>> = Arc::new(Mutex::new(Vec::new()));
|
||||
|
||||
// Generate a unique session ID for the persistent log file.
|
||||
let log_session_id = uuid::Uuid::new_v4().to_string();
|
||||
|
||||
// Create persistent log writer.
|
||||
let log_writer = match AgentLogWriter::new(
|
||||
project_root,
|
||||
story_id,
|
||||
&resolved_name,
|
||||
&log_session_id,
|
||||
) {
|
||||
Ok(w) => Some(Arc::new(Mutex::new(w))),
|
||||
Err(e) => {
|
||||
eprintln!("[agents] Failed to create log writer for {story_id}:{resolved_name}: {e}");
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
// Atomically check availability and register as Pending. The lock is
|
||||
// held continuously from the duplicate check through the HashMap insert
|
||||
// so no concurrent start_agent call can slip through the check before
|
||||
// this insert completes (fixes TOCTOU race, story 132).
|
||||
//
|
||||
// The `PendingGuard` ensures that if any step below fails the entry is
|
||||
// removed from the pool so it does not permanently block auto-assign
|
||||
// (bug 118).
|
||||
{
|
||||
let agents = self.agents.lock().map_err(|e| e.to_string())?;
|
||||
let mut agents = self.agents.lock().map_err(|e| e.to_string())?;
|
||||
if let Some(agent) = agents.get(&key)
|
||||
&& (agent.status == AgentStatus::Running || agent.status == AgentStatus::Pending)
|
||||
{
|
||||
@@ -293,34 +324,6 @@ impl AgentPool {
|
||||
story '{story_id}' will be picked up when the agent becomes available"
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
let (tx, _) = broadcast::channel::<AgentEvent>(1024);
|
||||
|
||||
let event_log: Arc<Mutex<Vec<AgentEvent>>> = Arc::new(Mutex::new(Vec::new()));
|
||||
|
||||
// Generate a unique session ID for the persistent log file.
|
||||
let log_session_id = uuid::Uuid::new_v4().to_string();
|
||||
|
||||
// Create persistent log writer.
|
||||
let log_writer = match AgentLogWriter::new(
|
||||
project_root,
|
||||
story_id,
|
||||
&resolved_name,
|
||||
&log_session_id,
|
||||
) {
|
||||
Ok(w) => Some(Arc::new(Mutex::new(w))),
|
||||
Err(e) => {
|
||||
eprintln!("[agents] Failed to create log writer for {story_id}:{resolved_name}: {e}");
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
// Register as pending. The `PendingGuard` ensures that if any
|
||||
// step below fails the entry is removed from the pool so it does
|
||||
// not permanently block auto-assign (bug 118).
|
||||
{
|
||||
let mut agents = self.agents.lock().map_err(|e| e.to_string())?;
|
||||
agents.insert(
|
||||
key.clone(),
|
||||
StoryAgent {
|
||||
@@ -4309,6 +4312,174 @@ name = "qa"
|
||||
);
|
||||
}
|
||||
|
||||
// ── TOCTOU race-condition regression tests (story 132) ───────────────────
|
||||
|
||||
/// Verify that a Pending entry (not just Running) blocks a concurrent
|
||||
/// start_agent for the same agent name on a different story. This proves
|
||||
/// the check-and-insert is atomic: the Pending entry is visible to the
|
||||
/// second caller because it was inserted while the lock was still held.
|
||||
#[tokio::test]
|
||||
async fn toctou_pending_entry_blocks_same_agent_on_different_story() {
|
||||
use std::fs;
|
||||
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
let root = tmp.path();
|
||||
|
||||
let sk_dir = root.join(".story_kit");
|
||||
fs::create_dir_all(&sk_dir).unwrap();
|
||||
fs::write(sk_dir.join("project.toml"), "[[agent]]\nname = \"coder-1\"\n").unwrap();
|
||||
|
||||
let pool = AgentPool::new(3099);
|
||||
|
||||
// Simulate what the winning concurrent call would have done: insert a
|
||||
// Pending entry for coder-1 on story-86.
|
||||
pool.inject_test_agent("86_story_foo", "coder-1", AgentStatus::Pending);
|
||||
|
||||
// Now attempt to start coder-1 on a *different* story — must be rejected.
|
||||
let result = pool
|
||||
.start_agent(root, "130_story_bar", Some("coder-1"), None)
|
||||
.await;
|
||||
|
||||
assert!(result.is_err(), "second start_agent must be rejected");
|
||||
let err = result.unwrap_err();
|
||||
assert!(
|
||||
err.contains("already running") || err.contains("becomes available"),
|
||||
"expected concurrency-rejection message, got: '{err}'"
|
||||
);
|
||||
}
|
||||
|
||||
/// Concurrent start_agent calls for the same agent name on different stories
|
||||
/// must result in exactly one rejection due to the concurrency check (not
|
||||
/// due to an unrelated failure such as missing git repo).
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn toctou_concurrent_start_agent_same_agent_exactly_one_concurrency_rejection() {
|
||||
use std::fs;
|
||||
use std::sync::Arc;
|
||||
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
let root = tmp.path().to_path_buf();
|
||||
|
||||
let sk_dir = root.join(".story_kit");
|
||||
fs::create_dir_all(sk_dir.join("work/1_upcoming")).unwrap();
|
||||
fs::write(
|
||||
root.join(".story_kit/project.toml"),
|
||||
"[[agent]]\nname = \"coder-1\"\n",
|
||||
)
|
||||
.unwrap();
|
||||
// Both stories must exist in upcoming so move_story_to_current can run
|
||||
// (only the winner reaches that point, but we set both up defensively).
|
||||
fs::write(
|
||||
root.join(".story_kit/work/1_upcoming/86_story_foo.md"),
|
||||
"---\nname: Foo\n---\n",
|
||||
)
|
||||
.unwrap();
|
||||
fs::write(
|
||||
root.join(".story_kit/work/1_upcoming/130_story_bar.md"),
|
||||
"---\nname: Bar\n---\n",
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let pool = Arc::new(AgentPool::new(3099));
|
||||
|
||||
let pool1 = pool.clone();
|
||||
let root1 = root.clone();
|
||||
let t1 = tokio::spawn(async move {
|
||||
pool1
|
||||
.start_agent(&root1, "86_story_foo", Some("coder-1"), None)
|
||||
.await
|
||||
});
|
||||
|
||||
let pool2 = pool.clone();
|
||||
let root2 = root.clone();
|
||||
let t2 = tokio::spawn(async move {
|
||||
pool2
|
||||
.start_agent(&root2, "130_story_bar", Some("coder-1"), None)
|
||||
.await
|
||||
});
|
||||
|
||||
let (r1, r2) = tokio::join!(t1, t2);
|
||||
let r1 = r1.unwrap();
|
||||
let r2 = r2.unwrap();
|
||||
|
||||
// The concurrency-rejection message always contains "already running" /
|
||||
// "becomes available". Any other error (e.g., missing git repo) means
|
||||
// that call *won* the atomic check-and-insert.
|
||||
let concurrency_rejections = [&r1, &r2]
|
||||
.iter()
|
||||
.filter(|r| {
|
||||
r.as_ref().is_err_and(|e| {
|
||||
e.contains("already running") || e.contains("becomes available")
|
||||
})
|
||||
})
|
||||
.count();
|
||||
|
||||
assert_eq!(
|
||||
concurrency_rejections, 1,
|
||||
"exactly one call must be rejected by the concurrency check; \
|
||||
got r1={r1:?} r2={r2:?}"
|
||||
);
|
||||
}
|
||||
|
||||
/// Two concurrent auto_assign_available_work calls must not assign the same
|
||||
/// agent to two stories simultaneously. After both complete, at most one
|
||||
/// Pending/Running entry must exist per agent name.
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn toctou_concurrent_auto_assign_no_duplicate_agent_assignments() {
|
||||
use std::fs;
|
||||
use std::sync::Arc;
|
||||
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
let root = tmp.path().to_path_buf();
|
||||
|
||||
let sk_dir = root.join(".story_kit");
|
||||
// Two stories waiting in 2_current, one coder agent.
|
||||
fs::create_dir_all(sk_dir.join("work/2_current")).unwrap();
|
||||
fs::write(
|
||||
sk_dir.join("project.toml"),
|
||||
"[[agent]]\nname = \"coder-1\"\n",
|
||||
)
|
||||
.unwrap();
|
||||
fs::write(
|
||||
sk_dir.join("work/2_current/86_story_foo.md"),
|
||||
"---\nname: Foo\n---\n",
|
||||
)
|
||||
.unwrap();
|
||||
fs::write(
|
||||
sk_dir.join("work/2_current/130_story_bar.md"),
|
||||
"---\nname: Bar\n---\n",
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let pool = Arc::new(AgentPool::new(3099));
|
||||
|
||||
// Run two concurrent auto_assign calls.
|
||||
let pool1 = pool.clone();
|
||||
let root1 = root.clone();
|
||||
let t1 = tokio::spawn(async move { pool1.auto_assign_available_work(&root1).await });
|
||||
|
||||
let pool2 = pool.clone();
|
||||
let root2 = root.clone();
|
||||
let t2 = tokio::spawn(async move { pool2.auto_assign_available_work(&root2).await });
|
||||
|
||||
let _ = tokio::join!(t1, t2);
|
||||
|
||||
// At most one Pending/Running entry should exist for coder-1.
|
||||
let agents = pool.agents.lock().unwrap();
|
||||
let active_coder_count = agents
|
||||
.values()
|
||||
.filter(|a| {
|
||||
a.agent_name == "coder-1"
|
||||
&& matches!(a.status, AgentStatus::Pending | AgentStatus::Running)
|
||||
})
|
||||
.count();
|
||||
|
||||
assert!(
|
||||
active_coder_count <= 1,
|
||||
"coder-1 must not be assigned to more than one story simultaneously; \
|
||||
found {active_coder_count} active entries"
|
||||
);
|
||||
}
|
||||
|
||||
// ── resolve_simple_conflicts unit tests ──────────────────────────────────
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -1562,15 +1562,20 @@ fn parse_test_cases(value: Option<&Value>) -> Result<Vec<TestCaseResult>, String
|
||||
}
|
||||
|
||||
fn tool_get_server_logs(args: &Value) -> Result<String, String> {
|
||||
let lines = args
|
||||
let lines_count = args
|
||||
.get("lines")
|
||||
.and_then(|v| v.as_u64())
|
||||
.map(|n| n.min(1000) as usize)
|
||||
.unwrap_or(100);
|
||||
let filter = args.get("filter").and_then(|v| v.as_str());
|
||||
|
||||
let recent = log_buffer::global().get_recent(lines, filter);
|
||||
Ok(recent.join("\n"))
|
||||
// Fetch extra buffer entries to account for multi-line entries within each
|
||||
let fetch = lines_count.saturating_mul(4).min(4000);
|
||||
let recent = log_buffer::global().get_recent(fetch, filter);
|
||||
// Flatten buffer entries into individual lines, then take the last lines_count
|
||||
let all_lines: Vec<&str> = recent.iter().flat_map(|s| s.lines()).collect();
|
||||
let start = all_lines.len().saturating_sub(lines_count);
|
||||
Ok(all_lines[start..].join("\n"))
|
||||
}
|
||||
|
||||
/// MCP tool called by Claude Code via `--permission-prompt-tool`.
|
||||
|
||||
Reference in New Issue
Block a user