diff --git a/server/src/agents/lifecycle.rs b/server/src/agents/lifecycle.rs index 3c738389..46b2fff3 100644 --- a/server/src/agents/lifecycle.rs +++ b/server/src/agents/lifecycle.rs @@ -166,7 +166,7 @@ pub fn move_story_to_done(story_id: &str) -> Result<(), String> { "5_done", &["6_archived"], false, - &["merge_failure", "retry_count", "blocked"], + &["merge_failure", "blocked"], ) .map(|_| ()) } @@ -181,7 +181,7 @@ pub fn move_story_to_merge(story_id: &str) -> Result<(), String> { "4_merge", &["5_done", "6_archived"], false, - &["retry_count", "blocked"], + &["blocked"], ) .map(|_| ()) } @@ -196,7 +196,7 @@ pub fn move_story_to_qa(story_id: &str) -> Result<(), String> { "3_qa", &["5_done", "6_archived"], false, - &["retry_count", "blocked"], + &["blocked"], ) .map(|_| ()) } diff --git a/server/src/agents/pool/auto_assign/watchdog/tests/limits_tests.rs b/server/src/agents/pool/auto_assign/watchdog/tests/limits_tests.rs index 7583e5e6..55e776cc 100644 --- a/server/src/agents/pool/auto_assign/watchdog/tests/limits_tests.rs +++ b/server/src/agents/pool/auto_assign/watchdog/tests/limits_tests.rs @@ -218,6 +218,7 @@ max_budget_usd = 5.00 #[test] fn watchdog_marks_story_blocked_after_limit_termination() { crate::db::ensure_content_store(); + crate::crdt_state::init_for_test(); let tmp = tempfile::tempdir().unwrap(); let root = tmp.path(); @@ -239,6 +240,18 @@ max_turns = 10 let story_id = "42_story_runaway"; let initial = "---\nname: Runaway Story\n---\n# Runaway Story\n"; crate::db::write_content(story_id, initial); + crate::crdt_state::write_item( + story_id, + "2_current", + Some("Runaway Story"), + None, + None, + None, + None, + None, + None, + None, + ); // 12 turns in a single session exceeds the configured max of 10. write_fake_session_log(root, story_id, "coder-1", "sess-runaway", 12); @@ -334,6 +347,7 @@ max_turns = 10 #[test] fn per_session_counting_terminates_over_limit() { crate::db::ensure_content_store(); + crate::crdt_state::init_for_test(); let tmp = tempfile::tempdir().unwrap(); let root = tmp.path(); @@ -352,6 +366,18 @@ max_turns = 10 let story_id = "story_e_per_session"; crate::db::write_content(story_id, "---\nname: Per-Session Test\n---\n"); + crate::crdt_state::write_item( + story_id, + "2_current", + Some("Per-Session Test"), + None, + None, + None, + None, + None, + None, + None, + ); // Prior session with 5 turns (under limit alone). write_fake_session_log(root, story_id, "coder-1", "old-sess", 5); @@ -416,38 +442,55 @@ max_turns = 10 let story_id = "88_story_retry_watchdog"; let initial = "---\nname: Retry Test\n---\n"; + crate::crdt_state::init_for_test(); crate::db::write_content(story_id, initial); + crate::crdt_state::write_item( + story_id, + "2_current", + Some("Retry Test"), + None, + None, + None, + None, + None, + None, + None, + ); - // Session 1: exceeds limit → retry_count=1, NOT blocked. + // Session 1: exceeds limit → retry_count=1 in CRDT, NOT blocked. { write_fake_session_log(root, story_id, "coder-1", "session-1", 12); let pool = AgentPool::new_test(3001); pool.inject_test_agent_with_session(story_id, "coder-1", AgentStatus::Running, "session-1"); pool.run_watchdog_pass(Some(root)); - let content = crate::db::read_content(story_id).unwrap(); - assert!( - content.contains("retry_count: 1"), - "after session 1, retry_count should be 1 — got:\n{content}" + let item = crate::crdt_state::read_item(story_id).expect("story must be in CRDT"); + assert_eq!( + item.retry_count, + Some(1), + "after session 1, retry_count should be 1 in CRDT" ); + let content = crate::db::read_content(story_id).unwrap(); assert!( !content.contains("blocked: true"), "story should NOT be blocked after session 1" ); } - // Session 2: exceeds limit → retry_count=2, NOT blocked. + // Session 2: exceeds limit → retry_count=2 in CRDT, NOT blocked. { write_fake_session_log(root, story_id, "coder-1", "session-2", 12); let pool = AgentPool::new_test(3001); pool.inject_test_agent_with_session(story_id, "coder-1", AgentStatus::Running, "session-2"); pool.run_watchdog_pass(Some(root)); - let content = crate::db::read_content(story_id).unwrap(); - assert!( - content.contains("retry_count: 2"), - "after session 2, retry_count should be 2 — got:\n{content}" + let item = crate::crdt_state::read_item(story_id).expect("story must be in CRDT"); + assert_eq!( + item.retry_count, + Some(2), + "after session 2, retry_count should be 2 in CRDT" ); + let content = crate::db::read_content(story_id).unwrap(); assert!( !content.contains("blocked: true"), "story should NOT be blocked after session 2" @@ -466,5 +509,11 @@ max_turns = 10 content.contains("blocked: true"), "story must be blocked after session 3 (retry_count=3 >= max_retries=3) — got:\n{content}" ); + let item = crate::crdt_state::read_item(story_id).expect("story must be in CRDT"); + assert_eq!( + item.retry_count, + Some(3), + "retry_count should be 3 in CRDT after session 3" + ); } } diff --git a/server/src/agents/pool/pipeline/advance/helpers.rs b/server/src/agents/pool/pipeline/advance/helpers.rs index 876e14ed..9f65d223 100644 --- a/server/src/agents/pool/pipeline/advance/helpers.rs +++ b/server/src/agents/pool/pipeline/advance/helpers.rs @@ -104,41 +104,42 @@ pub(crate) fn should_block_story( max_retries: u32, stage_label: &str, ) -> Option { - use crate::io::story_metadata::{increment_retry_count_in_content, write_blocked_in_content}; + use crate::io::story_metadata::write_blocked_in_content; if max_retries == 0 { return None; } - if let Some(contents) = crate::db::read_content(story_id) { - let (updated, new_count) = increment_retry_count_in_content(&contents); - crate::db::write_content(story_id, &updated); - let stage = crate::pipeline_state::read_typed(story_id) - .ok() - .flatten() - .map(|i| i.stage.dir_name().to_string()) - .unwrap_or_else(|| "2_current".to_string()); - crate::db::write_item_with_content(story_id, &stage, &updated); + let new_count = crate::crdt_state::bump_retry_count(story_id) as u32; + if new_count == 0 { + slog_error!( + "[pipeline] Failed to bump retry_count for '{story_id}': item not found in CRDT" + ); + return None; + } - if new_count >= max_retries { - slog_warn!( - "[pipeline] Story '{story_id}' reached retry limit ({new_count}/{max_retries}) \ - at {stage_label} stage. Marking as blocked." - ); - let blocked = write_blocked_in_content(&updated); + if new_count >= max_retries { + slog_warn!( + "[pipeline] Story '{story_id}' reached retry limit ({new_count}/{max_retries}) \ + at {stage_label} stage. Marking as blocked." + ); + if let Some(contents) = crate::db::read_content(story_id) { + let blocked = write_blocked_in_content(&contents); crate::db::write_content(story_id, &blocked); + let stage = crate::pipeline_state::read_typed(story_id) + .ok() + .flatten() + .map(|i| i.stage.dir_name().to_string()) + .unwrap_or_else(|| "2_current".to_string()); crate::db::write_item_with_content(story_id, &stage, &blocked); - Some(format!( - "Retry limit exceeded ({new_count}/{max_retries}) at {stage_label} stage" - )) - } else { - slog!( - "[pipeline] Story '{story_id}' retry {new_count}/{max_retries} at {stage_label} stage." - ); - None } + Some(format!( + "Retry limit exceeded ({new_count}/{max_retries}) at {stage_label} stage" + )) } else { - slog_error!("[pipeline] Failed to read content for '{story_id}' to increment retry_count"); + slog!( + "[pipeline] Story '{story_id}' retry {new_count}/{max_retries} at {stage_label} stage." + ); None } } diff --git a/server/src/agents/pool/pipeline/advance/tests_regression.rs b/server/src/agents/pool/pipeline/advance/tests_regression.rs index a7b79121..198e97b5 100644 --- a/server/src/agents/pool/pipeline/advance/tests_regression.rs +++ b/server/src/agents/pool/pipeline/advance/tests_regression.rs @@ -467,6 +467,7 @@ async fn no_committed_work_still_retries_and_blocks() { .unwrap(); // Set up the story with max_retries=1 so it blocks immediately. + crate::crdt_state::init_for_test(); crate::db::ensure_content_store(); crate::db::write_content("9946_story_nowork", "---\nname: No Work Test\n---\n"); crate::db::write_item_with_content( @@ -806,6 +807,7 @@ stage = "coder" ) .unwrap(); + crate::crdt_state::init_for_test(); crate::db::ensure_content_store(); crate::db::write_item_with_content( "9950_story_warm_resume", @@ -848,11 +850,12 @@ stage = "coder" ); drop(agents); - // Retry counter must have been incremented (AC 3). - let content = crate::db::read_content("9950_story_warm_resume") - .expect("story must exist in content store"); + // Retry counter must have been incremented (AC 3) — checked via CRDT. + let item = + crate::crdt_state::read_item("9950_story_warm_resume").expect("story must be in CRDT"); assert!( - content.contains("retry_count"), - "retry_count must be incremented after warm-resume: {content}" + item.retry_count.is_some_and(|rc| rc > 0), + "retry_count must be incremented after warm-resume: got {:?}", + item.retry_count ); } diff --git a/server/src/chat/commands/unblock.rs b/server/src/chat/commands/unblock.rs index 5e679093..8ff9a34c 100644 --- a/server/src/chat/commands/unblock.rs +++ b/server/src/chat/commands/unblock.rs @@ -5,9 +5,7 @@ //! and returns a confirmation. use super::CommandContext; -use crate::io::story_metadata::{ - clear_front_matter_field_in_content, parse_front_matter, set_front_matter_field, -}; +use crate::io::story_metadata::{clear_front_matter_field_in_content, parse_front_matter}; use std::path::Path; /// Handle the `unblock` command. @@ -73,7 +71,8 @@ fn unblock_by_story_id(story_id: &str) -> String { if has_merge_failure { updated = clear_front_matter_field_in_content(&updated, "merge_failure"); } - updated = set_front_matter_field(&updated, "retry_count", "0"); + // retry_count lives in the CRDT; clear any stale copy from front-matter. + updated = clear_front_matter_field_in_content(&updated, "retry_count"); crate::db::write_content(story_id, &updated); let stage = crate::pipeline_state::read_typed(story_id) @@ -82,6 +81,7 @@ fn unblock_by_story_id(story_id: &str) -> String { .map(|i| i.stage.dir_name().to_string()) .unwrap_or_else(|| "2_current".to_string()); crate::db::write_item_with_content(story_id, &stage, &updated); + crate::crdt_state::set_retry_count(story_id, 0); let mut cleared = Vec::new(); if has_blocked { @@ -189,6 +189,7 @@ mod tests { #[test] fn unblock_command_clears_blocked_and_resets_retry_count() { + crate::crdt_state::init_for_test(); let tmp = tempfile::TempDir::new().unwrap(); // Use a high story number (9903) to avoid collisions with other tests in the // global content store. @@ -198,6 +199,19 @@ mod tests { "9903_story_stuck.md", "---\nname: Stuck Story\nblocked: true\nretry_count: 5\n---\n# Story\n", ); + // Seed the story in the CRDT with retry_count=5 so set_retry_count can reset it. + crate::crdt_state::write_item( + "9903_story_stuck", + "2_current", + Some("Stuck Story"), + None, + Some(5), + Some(true), + None, + None, + None, + None, + ); let output = unblock_cmd_with_root(tmp.path(), "9903").unwrap(); assert!( @@ -209,7 +223,7 @@ mod tests { "should include story_id in response: {output}" ); - // The unblock command writes back via the content store; read from there. + // The unblock command writes back via the content store; blocked field should be gone. let contents = crate::db::read_content("9903_story_stuck") .or_else(|| { std::fs::read_to_string( @@ -223,9 +237,17 @@ mod tests { !contents.contains("blocked:"), "blocked field should be removed: {contents}" ); + // retry_count is now in the CRDT, not in front-matter. assert!( - contents.contains("retry_count: 0"), - "retry_count should be reset to 0: {contents}" + !contents.contains("retry_count:"), + "retry_count should be cleared from front-matter after unblock: {contents}" + ); + let item = crate::crdt_state::read_item("9903_story_stuck") + .expect("story should be in CRDT after unblock"); + assert_eq!( + item.retry_count, + Some(0), + "retry_count should be reset to 0 in CRDT after unblock" ); } diff --git a/server/src/crdt_state/mod.rs b/server/src/crdt_state/mod.rs index 0e4c2565..270d89b8 100644 --- a/server/src/crdt_state/mod.rs +++ b/server/src/crdt_state/mod.rs @@ -52,8 +52,8 @@ pub use types::{ TestJobCrdt, TestJobView, TokenUsageCrdt, TokenUsageView, subscribe, }; pub use write::{ - migrate_names_from_slugs, migrate_story_ids_to_numeric, name_from_story_id, set_qa_mode, - write_item, + bump_retry_count, migrate_names_from_slugs, migrate_story_ids_to_numeric, name_from_story_id, + set_qa_mode, set_retry_count, write_item, }; #[cfg(test)] diff --git a/server/src/crdt_state/read.rs b/server/src/crdt_state/read.rs index 2fa9bc6a..15d34244 100644 --- a/server/src/crdt_state/read.rs +++ b/server/src/crdt_state/read.rs @@ -133,7 +133,7 @@ pub fn dump_crdt_state(story_id_filter: Option<&str>) -> CrdtStateDump { _ => None, }; let retry_count = match item_crdt.retry_count.view() { - JsonValue::Number(n) if n > 0.0 => Some(n as i64), + JsonValue::Number(n) => Some(n as i64), _ => None, }; let blocked = match item_crdt.blocked.view() { @@ -290,7 +290,7 @@ pub(super) fn extract_item_view(item: &PipelineItemCrdt) -> Option None, }; let retry_count = match item.retry_count.view() { - JsonValue::Number(n) if n > 0.0 => Some(n as i64), + JsonValue::Number(n) => Some(n as i64), _ => None, }; let blocked = match item.blocked.view() { diff --git a/server/src/crdt_state/write.rs b/server/src/crdt_state/write.rs index 4b32c2f7..b5e9f1cf 100644 --- a/server/src/crdt_state/write.rs +++ b/server/src/crdt_state/write.rs @@ -350,6 +350,51 @@ pub fn write_item( } } +/// Set `retry_count` to an explicit value for a pipeline item. +/// +/// Pure metadata operation — the item's stage is not changed. +/// Call `set_retry_count(story_id, 0)` to reset the counter after a +/// stage transition or an explicit unblock. +pub fn set_retry_count(story_id: &str, count: i64) { + let Some(state_mutex) = get_crdt() else { + return; + }; + let Ok(mut state) = state_mutex.lock() else { + return; + }; + if let Some(&idx) = state.index.get(story_id) { + apply_and_persist(&mut state, |s| { + s.crdt.doc.items[idx].retry_count.set(count as f64) + }); + } +} + +/// Increment `retry_count` by 1 and return the new value. +/// +/// Pure metadata operation — the item's stage is not changed. +/// Returns 0 if the item is not found in the CRDT (no-op in that case). +/// Use the returned value to decide whether the story should be blocked. +pub fn bump_retry_count(story_id: &str) -> i64 { + let Some(state_mutex) = get_crdt() else { + return 0; + }; + let Ok(mut state) = state_mutex.lock() else { + return 0; + }; + let Some(&idx) = state.index.get(story_id) else { + return 0; + }; + let current = match state.crdt.doc.items[idx].retry_count.view() { + JsonValue::Number(n) => n as i64, + _ => 0, + }; + let new_count = current + 1; + apply_and_persist(&mut state, |s| { + s.crdt.doc.items[idx].retry_count.set(new_count as f64) + }); + new_count +} + #[cfg(test)] mod tests { use super::super::hex; @@ -704,6 +749,71 @@ mod tests { use sqlx::SqlitePool; use sqlx::sqlite::SqliteConnectOptions; + // ── set_retry_count / bump_retry_count tests ───────────────────────────── + + #[test] + fn bump_retry_count_increments_by_one() { + init_for_test(); + write_item( + "9001_story_bump_test", + "2_current", + None, + None, + None, + None, + None, + None, + None, + None, + ); + + let v1 = bump_retry_count("9001_story_bump_test"); + assert_eq!(v1, 1, "first bump should return 1"); + + let v2 = bump_retry_count("9001_story_bump_test"); + assert_eq!(v2, 2, "second bump should return 2"); + + let item = read_item("9001_story_bump_test").expect("item must exist"); + assert_eq!( + item.retry_count, + Some(2), + "CRDT must reflect final bump value" + ); + } + + #[test] + fn set_retry_count_resets_to_zero() { + init_for_test(); + write_item( + "9002_story_set_test", + "2_current", + None, + None, + Some(5), + None, + None, + None, + None, + None, + ); + + set_retry_count("9002_story_set_test", 0); + + let item = read_item("9002_story_set_test").expect("item must exist"); + assert_eq!( + item.retry_count, + Some(0), + "set_retry_count(0) must reset to 0" + ); + } + + #[test] + fn bump_returns_zero_for_missing_item() { + init_for_test(); + let result = bump_retry_count("nonexistent_story"); + assert_eq!(result, 0, "bump on missing item should return 0 (no-op)"); + } + #[tokio::test] async fn bug_511_rowid_replay_preserves_field_update_after_list_insert() { let tmp = tempfile::tempdir().unwrap(); diff --git a/server/src/db/mod.rs b/server/src/db/mod.rs index 5815f921..7d876045 100644 --- a/server/src/db/mod.rs +++ b/server/src/db/mod.rs @@ -322,14 +322,6 @@ pub fn move_item_stage( }) .unwrap_or((None, None, None, None, None)); - // Bug 780: stage transitions reset retry_count to 0. retry_count tracks - // attempts at THIS stage's work (coding, merging, qa); a fresh attempt at - // a new stage is conceptually distinct from prior attempts at a different - // stage. `blocked` is preserved — that's a human-set signal that survives - // transitions. Note: passing None to write_item is a no-op on the CRDT - // register (the old value is kept), so we must pass Some(0) to force reset. - let retry_count: Option = Some(0); - // CRDT stage transition. let merged_at_ts = if crate::pipeline_state::Stage::from_dir(new_stage) .is_some_and(|s| matches!(s, crate::pipeline_state::Stage::Done { .. })) @@ -343,15 +335,22 @@ pub fn move_item_stage( new_stage, name.as_deref(), agent.as_deref(), - retry_count, + None, blocked, depends_on.as_deref(), None, None, merged_at_ts, ); + // Bug 780: stage transitions reset retry_count to 0. retry_count tracks + // attempts at THIS stage's work (coding, merging, qa); a fresh attempt at + // a new stage is conceptually distinct from prior attempts at a different + // stage. `blocked` is preserved — that's a human-set signal that survives + // transitions. + crate::crdt_state::set_retry_count(story_id, 0); - // Shadow table. + // Shadow table — always reset retry_count to 0 on stage transition. + let retry_count: Option = Some(0); if let Some(db) = PIPELINE_DB.get() { let msg = PipelineWriteMsg { story_id: story_id.to_string(), @@ -720,6 +719,7 @@ mod tests { /// deterministic-merge skip logic. #[test] fn move_item_stage_resets_retry_count_to_zero() { + crate::crdt_state::init_for_test(); ensure_content_store(); let story_id = "9870_story_780_retry_reset"; diff --git a/server/src/io/story_metadata/mod.rs b/server/src/io/story_metadata/mod.rs index b9f21c6f..a1575663 100644 --- a/server/src/io/story_metadata/mod.rs +++ b/server/src/io/story_metadata/mod.rs @@ -19,7 +19,7 @@ pub use fields::{ write_rejection_notes_to_content, write_review_hold, write_review_hold_in_content, }; pub use parser::{ - increment_retry_count_in_content, is_story_frozen_in_store, parse_front_matter, - parse_unchecked_todos, resolve_qa_mode, resolve_qa_mode_from_content, + is_story_frozen_in_store, parse_front_matter, parse_unchecked_todos, resolve_qa_mode, + resolve_qa_mode_from_content, }; pub use types::QaMode; diff --git a/server/src/io/story_metadata/parser.rs b/server/src/io/story_metadata/parser.rs index 5ba6c664..3c0db5f1 100644 --- a/server/src/io/story_metadata/parser.rs +++ b/server/src/io/story_metadata/parser.rs @@ -3,7 +3,6 @@ use serde::Deserialize; use std::fs; use std::path::Path; -use super::fields::set_front_matter_field; use super::types::{QaMode, StoryMetaError, StoryMetadata}; #[derive(Debug, Deserialize)] @@ -143,19 +142,6 @@ pub fn is_story_frozen_in_store(story_id: &str) -> bool { .unwrap_or(false) } -/// Increment the `retry_count` field in story content (pure function). -/// -/// Returns `(updated_content, new_count)`. -pub fn increment_retry_count_in_content(contents: &str) -> (String, u32) { - let current = parse_front_matter(contents) - .ok() - .and_then(|m| m.retry_count) - .unwrap_or(0); - let new_count = current + 1; - let updated = set_front_matter_field(contents, "retry_count", &new_count.to_string()); - (updated, new_count) -} - #[cfg(test)] mod tests { use super::*;