refactor: split agents/pool/pipeline/advance.rs into mod.rs + helpers.rs
The 1353-line advance.rs is split into: - mod.rs: impl AgentPool with run_pipeline_advance + start_mergemaster_or_block + tests (1244 lines) - helpers.rs: spawn_pipeline_advance, resolve_qa_mode_from_store, write_review_hold_to_store, should_block_story (128 lines) Tests stay co-located with run_pipeline_advance which they exercise. No behaviour change. All 10 advance tests pass; full suite green.
This commit is contained in:
@@ -0,0 +1,130 @@
|
|||||||
|
//! Helpers for pipeline-advance: spawning, QA-mode resolution, review-hold writing,
|
||||||
|
//! block-decision logic.
|
||||||
|
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::path::PathBuf;
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
|
|
||||||
|
use tokio::sync::broadcast;
|
||||||
|
|
||||||
|
use super::super::super::super::CompletionReport;
|
||||||
|
use super::super::super::{AgentPool, StoryAgent};
|
||||||
|
use crate::io::watcher::WatcherEvent;
|
||||||
|
use crate::{slog, slog_error, slog_warn};
|
||||||
|
use std::path::Path;
|
||||||
|
|
||||||
|
/// type cycle between `start_agent` and `run_server_owned_completion`.
|
||||||
|
#[allow(clippy::too_many_arguments)]
|
||||||
|
pub(crate) fn spawn_pipeline_advance(
|
||||||
|
agents: Arc<Mutex<HashMap<String, StoryAgent>>>,
|
||||||
|
port: u16,
|
||||||
|
story_id: &str,
|
||||||
|
agent_name: &str,
|
||||||
|
completion: CompletionReport,
|
||||||
|
project_root: Option<PathBuf>,
|
||||||
|
worktree_path: Option<PathBuf>,
|
||||||
|
watcher_tx: broadcast::Sender<WatcherEvent>,
|
||||||
|
merge_failure_reported: bool,
|
||||||
|
previous_session_id: Option<String>,
|
||||||
|
) {
|
||||||
|
let sid = story_id.to_string();
|
||||||
|
let aname = agent_name.to_string();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let pool = AgentPool {
|
||||||
|
agents,
|
||||||
|
port,
|
||||||
|
child_killers: Arc::new(Mutex::new(HashMap::new())),
|
||||||
|
watcher_tx,
|
||||||
|
merge_jobs: Arc::new(Mutex::new(HashMap::new())),
|
||||||
|
};
|
||||||
|
pool.run_pipeline_advance(
|
||||||
|
&sid,
|
||||||
|
&aname,
|
||||||
|
completion,
|
||||||
|
project_root,
|
||||||
|
worktree_path,
|
||||||
|
merge_failure_reported,
|
||||||
|
previous_session_id,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Resolve QA mode from the content store.
|
||||||
|
pub(super) fn resolve_qa_mode_from_store(
|
||||||
|
_project_root: &Path,
|
||||||
|
story_id: &str,
|
||||||
|
default: crate::io::story_metadata::QaMode,
|
||||||
|
) -> crate::io::story_metadata::QaMode {
|
||||||
|
if let Some(contents) = crate::db::read_content(story_id) {
|
||||||
|
return crate::io::story_metadata::resolve_qa_mode_from_content(&contents, default);
|
||||||
|
}
|
||||||
|
default
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Write review_hold to the content store.
|
||||||
|
pub(super) fn write_review_hold_to_store(story_id: &str) {
|
||||||
|
if let Some(contents) = crate::db::read_content(story_id) {
|
||||||
|
let updated = crate::io::story_metadata::write_review_hold_in_content(&contents);
|
||||||
|
crate::db::write_content(story_id, &updated);
|
||||||
|
// Also persist to SQLite via shadow write.
|
||||||
|
let stage = crate::pipeline_state::read_typed(story_id)
|
||||||
|
.ok()
|
||||||
|
.flatten()
|
||||||
|
.map(|i| i.stage.dir_name().to_string())
|
||||||
|
.unwrap_or_else(|| "3_qa".to_string());
|
||||||
|
crate::db::write_item_with_content(story_id, &stage, &updated);
|
||||||
|
} else {
|
||||||
|
slog_error!("[pipeline] Cannot write review_hold for '{story_id}': no content in store");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Increment retry_count and block the story if it exceeds `max_retries`.
|
||||||
|
///
|
||||||
|
/// Returns `Some(reason)` if the story is now blocked (caller should NOT restart the agent).
|
||||||
|
/// Returns `None` if the story may be retried.
|
||||||
|
/// When `max_retries` is 0, retry limits are disabled.
|
||||||
|
pub(crate) fn should_block_story(
|
||||||
|
story_id: &str,
|
||||||
|
max_retries: u32,
|
||||||
|
stage_label: &str,
|
||||||
|
) -> Option<String> {
|
||||||
|
use crate::io::story_metadata::{increment_retry_count_in_content, write_blocked_in_content};
|
||||||
|
|
||||||
|
if max_retries == 0 {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(contents) = crate::db::read_content(story_id) {
|
||||||
|
let (updated, new_count) = increment_retry_count_in_content(&contents);
|
||||||
|
crate::db::write_content(story_id, &updated);
|
||||||
|
let stage = crate::pipeline_state::read_typed(story_id)
|
||||||
|
.ok()
|
||||||
|
.flatten()
|
||||||
|
.map(|i| i.stage.dir_name().to_string())
|
||||||
|
.unwrap_or_else(|| "2_current".to_string());
|
||||||
|
crate::db::write_item_with_content(story_id, &stage, &updated);
|
||||||
|
|
||||||
|
if new_count >= max_retries {
|
||||||
|
slog_warn!(
|
||||||
|
"[pipeline] Story '{story_id}' reached retry limit ({new_count}/{max_retries}) \
|
||||||
|
at {stage_label} stage. Marking as blocked."
|
||||||
|
);
|
||||||
|
let blocked = write_blocked_in_content(&updated);
|
||||||
|
crate::db::write_content(story_id, &blocked);
|
||||||
|
crate::db::write_item_with_content(story_id, &stage, &blocked);
|
||||||
|
Some(format!(
|
||||||
|
"Retry limit exceeded ({new_count}/{max_retries}) at {stage_label} stage"
|
||||||
|
))
|
||||||
|
} else {
|
||||||
|
slog!(
|
||||||
|
"[pipeline] Story '{story_id}' retry {new_count}/{max_retries} at {stage_label} stage."
|
||||||
|
);
|
||||||
|
None
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
slog_error!("[pipeline] Failed to read content for '{story_id}' to increment retry_count");
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
+3
-112
@@ -483,121 +483,12 @@ impl AgentPool {
|
|||||||
///
|
///
|
||||||
/// This is a **non-async** function so it does not participate in the opaque
|
/// This is a **non-async** function so it does not participate in the opaque
|
||||||
/// type cycle between `start_agent` and `run_server_owned_completion`.
|
/// type cycle between `start_agent` and `run_server_owned_completion`.
|
||||||
#[allow(clippy::too_many_arguments)]
|
|
||||||
pub(super) fn spawn_pipeline_advance(
|
|
||||||
agents: Arc<Mutex<HashMap<String, StoryAgent>>>,
|
|
||||||
port: u16,
|
|
||||||
story_id: &str,
|
|
||||||
agent_name: &str,
|
|
||||||
completion: CompletionReport,
|
|
||||||
project_root: Option<PathBuf>,
|
|
||||||
worktree_path: Option<PathBuf>,
|
|
||||||
watcher_tx: broadcast::Sender<WatcherEvent>,
|
|
||||||
merge_failure_reported: bool,
|
|
||||||
previous_session_id: Option<String>,
|
|
||||||
) {
|
|
||||||
let sid = story_id.to_string();
|
|
||||||
let aname = agent_name.to_string();
|
|
||||||
tokio::spawn(async move {
|
|
||||||
let pool = AgentPool {
|
|
||||||
agents,
|
|
||||||
port,
|
|
||||||
child_killers: Arc::new(Mutex::new(HashMap::new())),
|
|
||||||
watcher_tx,
|
|
||||||
merge_jobs: Arc::new(Mutex::new(HashMap::new())),
|
|
||||||
};
|
|
||||||
pool.run_pipeline_advance(
|
|
||||||
&sid,
|
|
||||||
&aname,
|
|
||||||
completion,
|
|
||||||
project_root,
|
|
||||||
worktree_path,
|
|
||||||
merge_failure_reported,
|
|
||||||
previous_session_id,
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Resolve QA mode from the content store.
|
mod helpers;
|
||||||
fn resolve_qa_mode_from_store(
|
|
||||||
_project_root: &Path,
|
|
||||||
story_id: &str,
|
|
||||||
default: crate::io::story_metadata::QaMode,
|
|
||||||
) -> crate::io::story_metadata::QaMode {
|
|
||||||
if let Some(contents) = crate::db::read_content(story_id) {
|
|
||||||
return crate::io::story_metadata::resolve_qa_mode_from_content(&contents, default);
|
|
||||||
}
|
|
||||||
default
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Write review_hold to the content store.
|
pub(crate) use helpers::{should_block_story, spawn_pipeline_advance};
|
||||||
fn write_review_hold_to_store(story_id: &str) {
|
use helpers::{resolve_qa_mode_from_store, write_review_hold_to_store};
|
||||||
if let Some(contents) = crate::db::read_content(story_id) {
|
|
||||||
let updated = crate::io::story_metadata::write_review_hold_in_content(&contents);
|
|
||||||
crate::db::write_content(story_id, &updated);
|
|
||||||
// Also persist to SQLite via shadow write.
|
|
||||||
let stage = crate::pipeline_state::read_typed(story_id)
|
|
||||||
.ok()
|
|
||||||
.flatten()
|
|
||||||
.map(|i| i.stage.dir_name().to_string())
|
|
||||||
.unwrap_or_else(|| "3_qa".to_string());
|
|
||||||
crate::db::write_item_with_content(story_id, &stage, &updated);
|
|
||||||
} else {
|
|
||||||
slog_error!("[pipeline] Cannot write review_hold for '{story_id}': no content in store");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Increment retry_count and block the story if it exceeds `max_retries`.
|
|
||||||
///
|
|
||||||
/// Returns `Some(reason)` if the story is now blocked (caller should NOT restart the agent).
|
|
||||||
/// Returns `None` if the story may be retried.
|
|
||||||
/// When `max_retries` is 0, retry limits are disabled.
|
|
||||||
pub(crate) fn should_block_story(
|
|
||||||
story_id: &str,
|
|
||||||
max_retries: u32,
|
|
||||||
stage_label: &str,
|
|
||||||
) -> Option<String> {
|
|
||||||
use crate::io::story_metadata::{increment_retry_count_in_content, write_blocked_in_content};
|
|
||||||
|
|
||||||
if max_retries == 0 {
|
|
||||||
return None;
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(contents) = crate::db::read_content(story_id) {
|
|
||||||
let (updated, new_count) = increment_retry_count_in_content(&contents);
|
|
||||||
crate::db::write_content(story_id, &updated);
|
|
||||||
let stage = crate::pipeline_state::read_typed(story_id)
|
|
||||||
.ok()
|
|
||||||
.flatten()
|
|
||||||
.map(|i| i.stage.dir_name().to_string())
|
|
||||||
.unwrap_or_else(|| "2_current".to_string());
|
|
||||||
crate::db::write_item_with_content(story_id, &stage, &updated);
|
|
||||||
|
|
||||||
if new_count >= max_retries {
|
|
||||||
slog_warn!(
|
|
||||||
"[pipeline] Story '{story_id}' reached retry limit ({new_count}/{max_retries}) \
|
|
||||||
at {stage_label} stage. Marking as blocked."
|
|
||||||
);
|
|
||||||
let blocked = write_blocked_in_content(&updated);
|
|
||||||
crate::db::write_content(story_id, &blocked);
|
|
||||||
crate::db::write_item_with_content(story_id, &stage, &blocked);
|
|
||||||
Some(format!(
|
|
||||||
"Retry limit exceeded ({new_count}/{max_retries}) at {stage_label} stage"
|
|
||||||
))
|
|
||||||
} else {
|
|
||||||
slog!(
|
|
||||||
"[pipeline] Story '{story_id}' retry {new_count}/{max_retries} at {stage_label} stage."
|
|
||||||
);
|
|
||||||
None
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
slog_error!("[pipeline] Failed to read content for '{story_id}' to increment retry_count");
|
|
||||||
None
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::super::super::AgentPool;
|
use super::super::super::AgentPool;
|
||||||
use super::super::super::composite_key;
|
use super::super::super::composite_key;
|
||||||
Reference in New Issue
Block a user