From ca72f36c788913e52137cf6fa3f4881259486631 Mon Sep 17 00:00:00 2001 From: dave Date: Sun, 26 Apr 2026 21:35:04 +0000 Subject: [PATCH] refactor: split agents/pool/pipeline/advance.rs into mod.rs + helpers.rs The 1353-line advance.rs is split into: - mod.rs: impl AgentPool with run_pipeline_advance + start_mergemaster_or_block + tests (1244 lines) - helpers.rs: spawn_pipeline_advance, resolve_qa_mode_from_store, write_review_hold_to_store, should_block_story (128 lines) Tests stay co-located with run_pipeline_advance which they exercise. No behaviour change. All 10 advance tests pass; full suite green. --- .../agents/pool/pipeline/advance/helpers.rs | 130 ++++++++++++++++++ .../pipeline/{advance.rs => advance/mod.rs} | 115 +--------------- 2 files changed, 133 insertions(+), 112 deletions(-) create mode 100644 server/src/agents/pool/pipeline/advance/helpers.rs rename server/src/agents/pool/pipeline/{advance.rs => advance/mod.rs} (92%) diff --git a/server/src/agents/pool/pipeline/advance/helpers.rs b/server/src/agents/pool/pipeline/advance/helpers.rs new file mode 100644 index 00000000..f3d66940 --- /dev/null +++ b/server/src/agents/pool/pipeline/advance/helpers.rs @@ -0,0 +1,130 @@ +//! Helpers for pipeline-advance: spawning, QA-mode resolution, review-hold writing, +//! block-decision logic. + +use std::collections::HashMap; +use std::path::PathBuf; +use std::sync::{Arc, Mutex}; + +use tokio::sync::broadcast; + +use super::super::super::super::CompletionReport; +use super::super::super::{AgentPool, StoryAgent}; +use crate::io::watcher::WatcherEvent; +use crate::{slog, slog_error, slog_warn}; +use std::path::Path; + +/// type cycle between `start_agent` and `run_server_owned_completion`. +#[allow(clippy::too_many_arguments)] +pub(crate) fn spawn_pipeline_advance( + agents: Arc>>, + port: u16, + story_id: &str, + agent_name: &str, + completion: CompletionReport, + project_root: Option, + worktree_path: Option, + watcher_tx: broadcast::Sender, + merge_failure_reported: bool, + previous_session_id: Option, +) { + let sid = story_id.to_string(); + let aname = agent_name.to_string(); + tokio::spawn(async move { + let pool = AgentPool { + agents, + port, + child_killers: Arc::new(Mutex::new(HashMap::new())), + watcher_tx, + merge_jobs: Arc::new(Mutex::new(HashMap::new())), + }; + pool.run_pipeline_advance( + &sid, + &aname, + completion, + project_root, + worktree_path, + merge_failure_reported, + previous_session_id, + ) + .await; + }); +} + +/// Resolve QA mode from the content store. +pub(super) fn resolve_qa_mode_from_store( + _project_root: &Path, + story_id: &str, + default: crate::io::story_metadata::QaMode, +) -> crate::io::story_metadata::QaMode { + if let Some(contents) = crate::db::read_content(story_id) { + return crate::io::story_metadata::resolve_qa_mode_from_content(&contents, default); + } + default +} + +/// Write review_hold to the content store. +pub(super) fn write_review_hold_to_store(story_id: &str) { + if let Some(contents) = crate::db::read_content(story_id) { + let updated = crate::io::story_metadata::write_review_hold_in_content(&contents); + crate::db::write_content(story_id, &updated); + // Also persist to SQLite via shadow write. + let stage = crate::pipeline_state::read_typed(story_id) + .ok() + .flatten() + .map(|i| i.stage.dir_name().to_string()) + .unwrap_or_else(|| "3_qa".to_string()); + crate::db::write_item_with_content(story_id, &stage, &updated); + } else { + slog_error!("[pipeline] Cannot write review_hold for '{story_id}': no content in store"); + } +} + +/// Increment retry_count and block the story if it exceeds `max_retries`. +/// +/// Returns `Some(reason)` if the story is now blocked (caller should NOT restart the agent). +/// Returns `None` if the story may be retried. +/// When `max_retries` is 0, retry limits are disabled. +pub(crate) fn should_block_story( + story_id: &str, + max_retries: u32, + stage_label: &str, +) -> Option { + use crate::io::story_metadata::{increment_retry_count_in_content, write_blocked_in_content}; + + if max_retries == 0 { + return None; + } + + if let Some(contents) = crate::db::read_content(story_id) { + let (updated, new_count) = increment_retry_count_in_content(&contents); + crate::db::write_content(story_id, &updated); + let stage = crate::pipeline_state::read_typed(story_id) + .ok() + .flatten() + .map(|i| i.stage.dir_name().to_string()) + .unwrap_or_else(|| "2_current".to_string()); + crate::db::write_item_with_content(story_id, &stage, &updated); + + if new_count >= max_retries { + slog_warn!( + "[pipeline] Story '{story_id}' reached retry limit ({new_count}/{max_retries}) \ + at {stage_label} stage. Marking as blocked." + ); + let blocked = write_blocked_in_content(&updated); + crate::db::write_content(story_id, &blocked); + crate::db::write_item_with_content(story_id, &stage, &blocked); + Some(format!( + "Retry limit exceeded ({new_count}/{max_retries}) at {stage_label} stage" + )) + } else { + slog!( + "[pipeline] Story '{story_id}' retry {new_count}/{max_retries} at {stage_label} stage." + ); + None + } + } else { + slog_error!("[pipeline] Failed to read content for '{story_id}' to increment retry_count"); + None + } +} + diff --git a/server/src/agents/pool/pipeline/advance.rs b/server/src/agents/pool/pipeline/advance/mod.rs similarity index 92% rename from server/src/agents/pool/pipeline/advance.rs rename to server/src/agents/pool/pipeline/advance/mod.rs index e9ad24cf..b37889ee 100644 --- a/server/src/agents/pool/pipeline/advance.rs +++ b/server/src/agents/pool/pipeline/advance/mod.rs @@ -483,121 +483,12 @@ impl AgentPool { /// /// This is a **non-async** function so it does not participate in the opaque /// type cycle between `start_agent` and `run_server_owned_completion`. -#[allow(clippy::too_many_arguments)] -pub(super) fn spawn_pipeline_advance( - agents: Arc>>, - port: u16, - story_id: &str, - agent_name: &str, - completion: CompletionReport, - project_root: Option, - worktree_path: Option, - watcher_tx: broadcast::Sender, - merge_failure_reported: bool, - previous_session_id: Option, -) { - let sid = story_id.to_string(); - let aname = agent_name.to_string(); - tokio::spawn(async move { - let pool = AgentPool { - agents, - port, - child_killers: Arc::new(Mutex::new(HashMap::new())), - watcher_tx, - merge_jobs: Arc::new(Mutex::new(HashMap::new())), - }; - pool.run_pipeline_advance( - &sid, - &aname, - completion, - project_root, - worktree_path, - merge_failure_reported, - previous_session_id, - ) - .await; - }); -} -/// Resolve QA mode from the content store. -fn resolve_qa_mode_from_store( - _project_root: &Path, - story_id: &str, - default: crate::io::story_metadata::QaMode, -) -> crate::io::story_metadata::QaMode { - if let Some(contents) = crate::db::read_content(story_id) { - return crate::io::story_metadata::resolve_qa_mode_from_content(&contents, default); - } - default -} +mod helpers; -/// Write review_hold to the content store. -fn write_review_hold_to_store(story_id: &str) { - if let Some(contents) = crate::db::read_content(story_id) { - let updated = crate::io::story_metadata::write_review_hold_in_content(&contents); - crate::db::write_content(story_id, &updated); - // Also persist to SQLite via shadow write. - let stage = crate::pipeline_state::read_typed(story_id) - .ok() - .flatten() - .map(|i| i.stage.dir_name().to_string()) - .unwrap_or_else(|| "3_qa".to_string()); - crate::db::write_item_with_content(story_id, &stage, &updated); - } else { - slog_error!("[pipeline] Cannot write review_hold for '{story_id}': no content in store"); - } -} +pub(crate) use helpers::{should_block_story, spawn_pipeline_advance}; +use helpers::{resolve_qa_mode_from_store, write_review_hold_to_store}; -/// Increment retry_count and block the story if it exceeds `max_retries`. -/// -/// Returns `Some(reason)` if the story is now blocked (caller should NOT restart the agent). -/// Returns `None` if the story may be retried. -/// When `max_retries` is 0, retry limits are disabled. -pub(crate) fn should_block_story( - story_id: &str, - max_retries: u32, - stage_label: &str, -) -> Option { - use crate::io::story_metadata::{increment_retry_count_in_content, write_blocked_in_content}; - - if max_retries == 0 { - return None; - } - - if let Some(contents) = crate::db::read_content(story_id) { - let (updated, new_count) = increment_retry_count_in_content(&contents); - crate::db::write_content(story_id, &updated); - let stage = crate::pipeline_state::read_typed(story_id) - .ok() - .flatten() - .map(|i| i.stage.dir_name().to_string()) - .unwrap_or_else(|| "2_current".to_string()); - crate::db::write_item_with_content(story_id, &stage, &updated); - - if new_count >= max_retries { - slog_warn!( - "[pipeline] Story '{story_id}' reached retry limit ({new_count}/{max_retries}) \ - at {stage_label} stage. Marking as blocked." - ); - let blocked = write_blocked_in_content(&updated); - crate::db::write_content(story_id, &blocked); - crate::db::write_item_with_content(story_id, &stage, &blocked); - Some(format!( - "Retry limit exceeded ({new_count}/{max_retries}) at {stage_label} stage" - )) - } else { - slog!( - "[pipeline] Story '{story_id}' retry {new_count}/{max_retries} at {stage_label} stage." - ); - None - } - } else { - slog_error!("[pipeline] Failed to read content for '{story_id}' to increment retry_count"); - None - } -} - -#[cfg(test)] mod tests { use super::super::super::AgentPool; use super::super::super::composite_key;