huskies: merge 870

This commit is contained in:
dave
2026-04-29 15:17:47 +00:00
parent db65271587
commit 2655288412
11 changed files with 251 additions and 80 deletions
+3 -3
View File
@@ -166,7 +166,7 @@ pub fn move_story_to_done(story_id: &str) -> Result<(), String> {
"5_done",
&["6_archived"],
false,
&["merge_failure", "retry_count", "blocked"],
&["merge_failure", "blocked"],
)
.map(|_| ())
}
@@ -181,7 +181,7 @@ pub fn move_story_to_merge(story_id: &str) -> Result<(), String> {
"4_merge",
&["5_done", "6_archived"],
false,
&["retry_count", "blocked"],
&["blocked"],
)
.map(|_| ())
}
@@ -196,7 +196,7 @@ pub fn move_story_to_qa(story_id: &str) -> Result<(), String> {
"3_qa",
&["5_done", "6_archived"],
false,
&["retry_count", "blocked"],
&["blocked"],
)
.map(|_| ())
}
@@ -218,6 +218,7 @@ max_budget_usd = 5.00
#[test]
fn watchdog_marks_story_blocked_after_limit_termination() {
crate::db::ensure_content_store();
crate::crdt_state::init_for_test();
let tmp = tempfile::tempdir().unwrap();
let root = tmp.path();
@@ -239,6 +240,18 @@ max_turns = 10
let story_id = "42_story_runaway";
let initial = "---\nname: Runaway Story\n---\n# Runaway Story\n";
crate::db::write_content(story_id, initial);
crate::crdt_state::write_item(
story_id,
"2_current",
Some("Runaway Story"),
None,
None,
None,
None,
None,
None,
None,
);
// 12 turns in a single session exceeds the configured max of 10.
write_fake_session_log(root, story_id, "coder-1", "sess-runaway", 12);
@@ -334,6 +347,7 @@ max_turns = 10
#[test]
fn per_session_counting_terminates_over_limit() {
crate::db::ensure_content_store();
crate::crdt_state::init_for_test();
let tmp = tempfile::tempdir().unwrap();
let root = tmp.path();
@@ -352,6 +366,18 @@ max_turns = 10
let story_id = "story_e_per_session";
crate::db::write_content(story_id, "---\nname: Per-Session Test\n---\n");
crate::crdt_state::write_item(
story_id,
"2_current",
Some("Per-Session Test"),
None,
None,
None,
None,
None,
None,
None,
);
// Prior session with 5 turns (under limit alone).
write_fake_session_log(root, story_id, "coder-1", "old-sess", 5);
@@ -416,38 +442,55 @@ max_turns = 10
let story_id = "88_story_retry_watchdog";
let initial = "---\nname: Retry Test\n---\n";
crate::crdt_state::init_for_test();
crate::db::write_content(story_id, initial);
crate::crdt_state::write_item(
story_id,
"2_current",
Some("Retry Test"),
None,
None,
None,
None,
None,
None,
None,
);
// Session 1: exceeds limit → retry_count=1, NOT blocked.
// Session 1: exceeds limit → retry_count=1 in CRDT, NOT blocked.
{
write_fake_session_log(root, story_id, "coder-1", "session-1", 12);
let pool = AgentPool::new_test(3001);
pool.inject_test_agent_with_session(story_id, "coder-1", AgentStatus::Running, "session-1");
pool.run_watchdog_pass(Some(root));
let content = crate::db::read_content(story_id).unwrap();
assert!(
content.contains("retry_count: 1"),
"after session 1, retry_count should be 1 — got:\n{content}"
let item = crate::crdt_state::read_item(story_id).expect("story must be in CRDT");
assert_eq!(
item.retry_count,
Some(1),
"after session 1, retry_count should be 1 in CRDT"
);
let content = crate::db::read_content(story_id).unwrap();
assert!(
!content.contains("blocked: true"),
"story should NOT be blocked after session 1"
);
}
// Session 2: exceeds limit → retry_count=2, NOT blocked.
// Session 2: exceeds limit → retry_count=2 in CRDT, NOT blocked.
{
write_fake_session_log(root, story_id, "coder-1", "session-2", 12);
let pool = AgentPool::new_test(3001);
pool.inject_test_agent_with_session(story_id, "coder-1", AgentStatus::Running, "session-2");
pool.run_watchdog_pass(Some(root));
let content = crate::db::read_content(story_id).unwrap();
assert!(
content.contains("retry_count: 2"),
"after session 2, retry_count should be 2 — got:\n{content}"
let item = crate::crdt_state::read_item(story_id).expect("story must be in CRDT");
assert_eq!(
item.retry_count,
Some(2),
"after session 2, retry_count should be 2 in CRDT"
);
let content = crate::db::read_content(story_id).unwrap();
assert!(
!content.contains("blocked: true"),
"story should NOT be blocked after session 2"
@@ -466,5 +509,11 @@ max_turns = 10
content.contains("blocked: true"),
"story must be blocked after session 3 (retry_count=3 >= max_retries=3) — got:\n{content}"
);
let item = crate::crdt_state::read_item(story_id).expect("story must be in CRDT");
assert_eq!(
item.retry_count,
Some(3),
"retry_count should be 3 in CRDT after session 3"
);
}
}
@@ -104,30 +104,35 @@ pub(crate) fn should_block_story(
max_retries: u32,
stage_label: &str,
) -> Option<String> {
use crate::io::story_metadata::{increment_retry_count_in_content, write_blocked_in_content};
use crate::io::story_metadata::write_blocked_in_content;
if max_retries == 0 {
return None;
}
if let Some(contents) = crate::db::read_content(story_id) {
let (updated, new_count) = increment_retry_count_in_content(&contents);
crate::db::write_content(story_id, &updated);
let stage = crate::pipeline_state::read_typed(story_id)
.ok()
.flatten()
.map(|i| i.stage.dir_name().to_string())
.unwrap_or_else(|| "2_current".to_string());
crate::db::write_item_with_content(story_id, &stage, &updated);
let new_count = crate::crdt_state::bump_retry_count(story_id) as u32;
if new_count == 0 {
slog_error!(
"[pipeline] Failed to bump retry_count for '{story_id}': item not found in CRDT"
);
return None;
}
if new_count >= max_retries {
slog_warn!(
"[pipeline] Story '{story_id}' reached retry limit ({new_count}/{max_retries}) \
at {stage_label} stage. Marking as blocked."
);
let blocked = write_blocked_in_content(&updated);
if let Some(contents) = crate::db::read_content(story_id) {
let blocked = write_blocked_in_content(&contents);
crate::db::write_content(story_id, &blocked);
let stage = crate::pipeline_state::read_typed(story_id)
.ok()
.flatten()
.map(|i| i.stage.dir_name().to_string())
.unwrap_or_else(|| "2_current".to_string());
crate::db::write_item_with_content(story_id, &stage, &blocked);
}
Some(format!(
"Retry limit exceeded ({new_count}/{max_retries}) at {stage_label} stage"
))
@@ -137,8 +142,4 @@ pub(crate) fn should_block_story(
);
None
}
} else {
slog_error!("[pipeline] Failed to read content for '{story_id}' to increment retry_count");
None
}
}
@@ -467,6 +467,7 @@ async fn no_committed_work_still_retries_and_blocks() {
.unwrap();
// Set up the story with max_retries=1 so it blocks immediately.
crate::crdt_state::init_for_test();
crate::db::ensure_content_store();
crate::db::write_content("9946_story_nowork", "---\nname: No Work Test\n---\n");
crate::db::write_item_with_content(
@@ -806,6 +807,7 @@ stage = "coder"
)
.unwrap();
crate::crdt_state::init_for_test();
crate::db::ensure_content_store();
crate::db::write_item_with_content(
"9950_story_warm_resume",
@@ -848,11 +850,12 @@ stage = "coder"
);
drop(agents);
// Retry counter must have been incremented (AC 3).
let content = crate::db::read_content("9950_story_warm_resume")
.expect("story must exist in content store");
// Retry counter must have been incremented (AC 3) — checked via CRDT.
let item =
crate::crdt_state::read_item("9950_story_warm_resume").expect("story must be in CRDT");
assert!(
content.contains("retry_count"),
"retry_count must be incremented after warm-resume: {content}"
item.retry_count.is_some_and(|rc| rc > 0),
"retry_count must be incremented after warm-resume: got {:?}",
item.retry_count
);
}
+29 -7
View File
@@ -5,9 +5,7 @@
//! and returns a confirmation.
use super::CommandContext;
use crate::io::story_metadata::{
clear_front_matter_field_in_content, parse_front_matter, set_front_matter_field,
};
use crate::io::story_metadata::{clear_front_matter_field_in_content, parse_front_matter};
use std::path::Path;
/// Handle the `unblock` command.
@@ -73,7 +71,8 @@ fn unblock_by_story_id(story_id: &str) -> String {
if has_merge_failure {
updated = clear_front_matter_field_in_content(&updated, "merge_failure");
}
updated = set_front_matter_field(&updated, "retry_count", "0");
// retry_count lives in the CRDT; clear any stale copy from front-matter.
updated = clear_front_matter_field_in_content(&updated, "retry_count");
crate::db::write_content(story_id, &updated);
let stage = crate::pipeline_state::read_typed(story_id)
@@ -82,6 +81,7 @@ fn unblock_by_story_id(story_id: &str) -> String {
.map(|i| i.stage.dir_name().to_string())
.unwrap_or_else(|| "2_current".to_string());
crate::db::write_item_with_content(story_id, &stage, &updated);
crate::crdt_state::set_retry_count(story_id, 0);
let mut cleared = Vec::new();
if has_blocked {
@@ -189,6 +189,7 @@ mod tests {
#[test]
fn unblock_command_clears_blocked_and_resets_retry_count() {
crate::crdt_state::init_for_test();
let tmp = tempfile::TempDir::new().unwrap();
// Use a high story number (9903) to avoid collisions with other tests in the
// global content store.
@@ -198,6 +199,19 @@ mod tests {
"9903_story_stuck.md",
"---\nname: Stuck Story\nblocked: true\nretry_count: 5\n---\n# Story\n",
);
// Seed the story in the CRDT with retry_count=5 so set_retry_count can reset it.
crate::crdt_state::write_item(
"9903_story_stuck",
"2_current",
Some("Stuck Story"),
None,
Some(5),
Some(true),
None,
None,
None,
None,
);
let output = unblock_cmd_with_root(tmp.path(), "9903").unwrap();
assert!(
@@ -209,7 +223,7 @@ mod tests {
"should include story_id in response: {output}"
);
// The unblock command writes back via the content store; read from there.
// The unblock command writes back via the content store; blocked field should be gone.
let contents = crate::db::read_content("9903_story_stuck")
.or_else(|| {
std::fs::read_to_string(
@@ -223,9 +237,17 @@ mod tests {
!contents.contains("blocked:"),
"blocked field should be removed: {contents}"
);
// retry_count is now in the CRDT, not in front-matter.
assert!(
contents.contains("retry_count: 0"),
"retry_count should be reset to 0: {contents}"
!contents.contains("retry_count:"),
"retry_count should be cleared from front-matter after unblock: {contents}"
);
let item = crate::crdt_state::read_item("9903_story_stuck")
.expect("story should be in CRDT after unblock");
assert_eq!(
item.retry_count,
Some(0),
"retry_count should be reset to 0 in CRDT after unblock"
);
}
+2 -2
View File
@@ -52,8 +52,8 @@ pub use types::{
TestJobCrdt, TestJobView, TokenUsageCrdt, TokenUsageView, subscribe,
};
pub use write::{
migrate_names_from_slugs, migrate_story_ids_to_numeric, name_from_story_id, set_qa_mode,
write_item,
bump_retry_count, migrate_names_from_slugs, migrate_story_ids_to_numeric, name_from_story_id,
set_qa_mode, set_retry_count, write_item,
};
#[cfg(test)]
+2 -2
View File
@@ -133,7 +133,7 @@ pub fn dump_crdt_state(story_id_filter: Option<&str>) -> CrdtStateDump {
_ => None,
};
let retry_count = match item_crdt.retry_count.view() {
JsonValue::Number(n) if n > 0.0 => Some(n as i64),
JsonValue::Number(n) => Some(n as i64),
_ => None,
};
let blocked = match item_crdt.blocked.view() {
@@ -290,7 +290,7 @@ pub(super) fn extract_item_view(item: &PipelineItemCrdt) -> Option<PipelineItemV
_ => None,
};
let retry_count = match item.retry_count.view() {
JsonValue::Number(n) if n > 0.0 => Some(n as i64),
JsonValue::Number(n) => Some(n as i64),
_ => None,
};
let blocked = match item.blocked.view() {
+110
View File
@@ -350,6 +350,51 @@ pub fn write_item(
}
}
/// Set `retry_count` to an explicit value for a pipeline item.
///
/// Pure metadata operation — the item's stage is not changed.
/// Call `set_retry_count(story_id, 0)` to reset the counter after a
/// stage transition or an explicit unblock.
pub fn set_retry_count(story_id: &str, count: i64) {
let Some(state_mutex) = get_crdt() else {
return;
};
let Ok(mut state) = state_mutex.lock() else {
return;
};
if let Some(&idx) = state.index.get(story_id) {
apply_and_persist(&mut state, |s| {
s.crdt.doc.items[idx].retry_count.set(count as f64)
});
}
}
/// Increment `retry_count` by 1 and return the new value.
///
/// Pure metadata operation — the item's stage is not changed.
/// Returns 0 if the item is not found in the CRDT (no-op in that case).
/// Use the returned value to decide whether the story should be blocked.
pub fn bump_retry_count(story_id: &str) -> i64 {
let Some(state_mutex) = get_crdt() else {
return 0;
};
let Ok(mut state) = state_mutex.lock() else {
return 0;
};
let Some(&idx) = state.index.get(story_id) else {
return 0;
};
let current = match state.crdt.doc.items[idx].retry_count.view() {
JsonValue::Number(n) => n as i64,
_ => 0,
};
let new_count = current + 1;
apply_and_persist(&mut state, |s| {
s.crdt.doc.items[idx].retry_count.set(new_count as f64)
});
new_count
}
#[cfg(test)]
mod tests {
use super::super::hex;
@@ -704,6 +749,71 @@ mod tests {
use sqlx::SqlitePool;
use sqlx::sqlite::SqliteConnectOptions;
// ── set_retry_count / bump_retry_count tests ─────────────────────────────
#[test]
fn bump_retry_count_increments_by_one() {
init_for_test();
write_item(
"9001_story_bump_test",
"2_current",
None,
None,
None,
None,
None,
None,
None,
None,
);
let v1 = bump_retry_count("9001_story_bump_test");
assert_eq!(v1, 1, "first bump should return 1");
let v2 = bump_retry_count("9001_story_bump_test");
assert_eq!(v2, 2, "second bump should return 2");
let item = read_item("9001_story_bump_test").expect("item must exist");
assert_eq!(
item.retry_count,
Some(2),
"CRDT must reflect final bump value"
);
}
#[test]
fn set_retry_count_resets_to_zero() {
init_for_test();
write_item(
"9002_story_set_test",
"2_current",
None,
None,
Some(5),
None,
None,
None,
None,
None,
);
set_retry_count("9002_story_set_test", 0);
let item = read_item("9002_story_set_test").expect("item must exist");
assert_eq!(
item.retry_count,
Some(0),
"set_retry_count(0) must reset to 0"
);
}
#[test]
fn bump_returns_zero_for_missing_item() {
init_for_test();
let result = bump_retry_count("nonexistent_story");
assert_eq!(result, 0, "bump on missing item should return 0 (no-op)");
}
#[tokio::test]
async fn bug_511_rowid_replay_preserves_field_update_after_list_insert() {
let tmp = tempfile::tempdir().unwrap();
+10 -10
View File
@@ -322,14 +322,6 @@ pub fn move_item_stage(
})
.unwrap_or((None, None, None, None, None));
// Bug 780: stage transitions reset retry_count to 0. retry_count tracks
// attempts at THIS stage's work (coding, merging, qa); a fresh attempt at
// a new stage is conceptually distinct from prior attempts at a different
// stage. `blocked` is preserved — that's a human-set signal that survives
// transitions. Note: passing None to write_item is a no-op on the CRDT
// register (the old value is kept), so we must pass Some(0) to force reset.
let retry_count: Option<i64> = Some(0);
// CRDT stage transition.
let merged_at_ts = if crate::pipeline_state::Stage::from_dir(new_stage)
.is_some_and(|s| matches!(s, crate::pipeline_state::Stage::Done { .. }))
@@ -343,15 +335,22 @@ pub fn move_item_stage(
new_stage,
name.as_deref(),
agent.as_deref(),
retry_count,
None,
blocked,
depends_on.as_deref(),
None,
None,
merged_at_ts,
);
// Bug 780: stage transitions reset retry_count to 0. retry_count tracks
// attempts at THIS stage's work (coding, merging, qa); a fresh attempt at
// a new stage is conceptually distinct from prior attempts at a different
// stage. `blocked` is preserved — that's a human-set signal that survives
// transitions.
crate::crdt_state::set_retry_count(story_id, 0);
// Shadow table.
// Shadow table — always reset retry_count to 0 on stage transition.
let retry_count: Option<i64> = Some(0);
if let Some(db) = PIPELINE_DB.get() {
let msg = PipelineWriteMsg {
story_id: story_id.to_string(),
@@ -720,6 +719,7 @@ mod tests {
/// deterministic-merge skip logic.
#[test]
fn move_item_stage_resets_retry_count_to_zero() {
crate::crdt_state::init_for_test();
ensure_content_store();
let story_id = "9870_story_780_retry_reset";
+2 -2
View File
@@ -19,7 +19,7 @@ pub use fields::{
write_rejection_notes_to_content, write_review_hold, write_review_hold_in_content,
};
pub use parser::{
increment_retry_count_in_content, is_story_frozen_in_store, parse_front_matter,
parse_unchecked_todos, resolve_qa_mode, resolve_qa_mode_from_content,
is_story_frozen_in_store, parse_front_matter, parse_unchecked_todos, resolve_qa_mode,
resolve_qa_mode_from_content,
};
pub use types::QaMode;
-14
View File
@@ -3,7 +3,6 @@ use serde::Deserialize;
use std::fs;
use std::path::Path;
use super::fields::set_front_matter_field;
use super::types::{QaMode, StoryMetaError, StoryMetadata};
#[derive(Debug, Deserialize)]
@@ -143,19 +142,6 @@ pub fn is_story_frozen_in_store(story_id: &str) -> bool {
.unwrap_or(false)
}
/// Increment the `retry_count` field in story content (pure function).
///
/// Returns `(updated_content, new_count)`.
pub fn increment_retry_count_in_content(contents: &str) -> (String, u32) {
let current = parse_front_matter(contents)
.ok()
.and_then(|m| m.retry_count)
.unwrap_or(0);
let new_count = current + 1;
let updated = set_front_matter_field(contents, "retry_count", &new_count.to_string());
(updated, new_count)
}
#[cfg(test)]
mod tests {
use super::*;