//! Helpers for pipeline-advance: spawning, QA-mode resolution, review-hold writing, //! block-decision logic. use std::collections::HashMap; use std::path::PathBuf; use std::sync::{Arc, Mutex}; use tokio::sync::broadcast; use super::super::super::super::CompletionReport; use super::super::super::{AgentPool, StoryAgent}; use crate::io::watcher::WatcherEvent; use crate::{slog, slog_error, slog_warn}; use std::path::Path; /// type cycle between `start_agent` and `run_server_owned_completion`. #[allow(clippy::too_many_arguments)] pub(crate) fn spawn_pipeline_advance( agents: Arc>>, port: u16, story_id: &str, agent_name: &str, completion: CompletionReport, project_root: Option, worktree_path: Option, watcher_tx: broadcast::Sender, merge_failure_reported: bool, previous_session_id: Option, ) { let sid = story_id.to_string(); let aname = agent_name.to_string(); tokio::spawn(async move { let pool = AgentPool { agents, port, child_killers: Arc::new(Mutex::new(HashMap::new())), watcher_tx, merge_jobs: Arc::new(Mutex::new(HashMap::new())), }; pool.run_pipeline_advance( &sid, &aname, completion, project_root, worktree_path, merge_failure_reported, previous_session_id, ) .await; }); } /// Resolve QA mode from the content store. pub(super) fn resolve_qa_mode_from_store( _project_root: &Path, story_id: &str, default: crate::io::story_metadata::QaMode, ) -> crate::io::story_metadata::QaMode { if let Some(contents) = crate::db::read_content(story_id) { return crate::io::story_metadata::resolve_qa_mode_from_content(&contents, default); } default } /// Write review_hold to the content store. pub(super) fn write_review_hold_to_store(story_id: &str) { if let Some(contents) = crate::db::read_content(story_id) { let updated = crate::io::story_metadata::write_review_hold_in_content(&contents); crate::db::write_content(story_id, &updated); // Also persist to SQLite via shadow write. let stage = crate::pipeline_state::read_typed(story_id) .ok() .flatten() .map(|i| i.stage.dir_name().to_string()) .unwrap_or_else(|| "3_qa".to_string()); crate::db::write_item_with_content(story_id, &stage, &updated); } else { slog_error!("[pipeline] Cannot write review_hold for '{story_id}': no content in store"); } } /// Increment retry_count and block the story if it exceeds `max_retries`. /// /// Returns `Some(reason)` if the story is now blocked (caller should NOT restart the agent). /// Returns `None` if the story may be retried. /// When `max_retries` is 0, retry limits are disabled. pub(crate) fn should_block_story( story_id: &str, max_retries: u32, stage_label: &str, ) -> Option { use crate::io::story_metadata::{increment_retry_count_in_content, write_blocked_in_content}; if max_retries == 0 { return None; } if let Some(contents) = crate::db::read_content(story_id) { let (updated, new_count) = increment_retry_count_in_content(&contents); crate::db::write_content(story_id, &updated); let stage = crate::pipeline_state::read_typed(story_id) .ok() .flatten() .map(|i| i.stage.dir_name().to_string()) .unwrap_or_else(|| "2_current".to_string()); crate::db::write_item_with_content(story_id, &stage, &updated); if new_count >= max_retries { slog_warn!( "[pipeline] Story '{story_id}' reached retry limit ({new_count}/{max_retries}) \ at {stage_label} stage. Marking as blocked." ); let blocked = write_blocked_in_content(&updated); crate::db::write_content(story_id, &blocked); crate::db::write_item_with_content(story_id, &stage, &blocked); Some(format!( "Retry limit exceeded ({new_count}/{max_retries}) at {stage_label} stage" )) } else { slog!( "[pipeline] Story '{story_id}' retry {new_count}/{max_retries} at {stage_label} stage." ); None } } else { slog_error!("[pipeline] Failed to read content for '{story_id}' to increment retry_count"); None } }