//! Read API for pipeline items, dump introspection, and dependency helpers.
|
|
|
|
#![allow(unused_imports, dead_code)]
|
|
use std::collections::HashMap;
|
|
|
|
use bft_json_crdt::json_crdt::*;
|
|
use bft_json_crdt::op::{OpId, ROOT_ID};
|
|
|
|
use super::state::{all_ops_lock, apply_and_persist, get_crdt, rebuild_index};
|
|
use super::types::{PipelineDoc, PipelineItemCrdt, PipelineItemView};
|
|
|
|
// ── Debug dump ───────────────────────────────────────────────────────
|
|
|
|
/// A raw dump of a single CRDT list entry, including deleted items.
///
/// Use `content_index` (hex of the list insert `OpId`) to cross-reference
/// with rows in the `crdt_ops` SQLite table.
///
/// Register-backed fields are `None` when the register does not hold a
/// usable value (see the per-field notes). The type derives `Debug`, `Clone`
/// and `PartialEq` so a dump can be logged, copied, and compared in tests.
#[derive(Debug, Clone, PartialEq)]
pub struct CrdtItemDump {
    /// `story_id` register; `None` when empty or not a string.
    pub story_id: Option<String>,
    /// Raw `stage` register string (not projected into a typed stage);
    /// `None` when empty or not a string.
    pub stage: Option<String>,
    /// `name` register; `None` when empty or not a string.
    pub name: Option<String>,
    /// `agent` register; `None` when empty or not a string.
    pub agent: Option<String>,
    /// `retry_count` register cast to an integer; `None` when not a number.
    pub retry_count: Option<i64>,
    /// `depends_on` register parsed as a JSON array of numbers; `None` when
    /// the register is empty or does not parse.
    pub depends_on: Option<Vec<u32>>,
    /// Agent name holding the claim, or `None` when unclaimed.
    pub claim_agent: Option<String>,
    /// Unix timestamp (seconds) when the claim was written; `None` unless
    /// the register holds a number greater than zero.
    pub claim_ts: Option<f64>,
    /// Hex-encoded OpId of the list insert op — cross-reference with `crdt_ops`.
    pub content_index: String,
    /// `true` when the list entry has been tombstoned.
    pub is_deleted: bool,
}
|
|
|
|
/// Top-level debug dump of the in-memory CRDT state.
|
|
pub struct CrdtStateDump {
|
|
pub in_memory_state_loaded: bool,
|
|
/// Count of non-deleted items with a valid story_id and stage.
|
|
pub total_items: usize,
|
|
/// Total list-level ops seen (excludes root sentinel).
|
|
pub total_ops_in_list: usize,
|
|
/// Highest Lamport sequence number seen across all list-level ops.
|
|
pub max_seq_in_list: u64,
|
|
/// Count of ops in the ALL_OPS journal (persisted ops replayed at startup).
|
|
pub persisted_ops_count: usize,
|
|
pub items: Vec<CrdtItemDump>,
|
|
}
|
|
|
|
/// Dump the raw in-memory CRDT state for debugging.
|
|
///
|
|
/// Unlike [`read_all_items`] this includes tombstoned (deleted) entries and
|
|
/// exposes internal op metadata (content_index, seq). Pass a `story_id`
|
|
/// filter to restrict the output to a single item.
|
|
///
|
|
/// **This is a debug tool.** For normal pipeline introspection use
|
|
/// [`read_all_items`] or the `get_pipeline_status` MCP tool instead.
|
|
pub fn dump_crdt_state(story_id_filter: Option<&str>) -> CrdtStateDump {
|
|
let in_memory_state_loaded = get_crdt().is_some();
|
|
|
|
let persisted_ops_count = all_ops_lock()
|
|
.and_then(|m| m.lock().ok().map(|v| v.len()))
|
|
.unwrap_or(0);
|
|
|
|
let Some(state_mutex) = get_crdt() else {
|
|
return CrdtStateDump {
|
|
in_memory_state_loaded,
|
|
total_items: 0,
|
|
total_ops_in_list: 0,
|
|
max_seq_in_list: 0,
|
|
persisted_ops_count,
|
|
items: Vec::new(),
|
|
};
|
|
};
|
|
|
|
let Ok(state) = state_mutex.lock() else {
|
|
return CrdtStateDump {
|
|
in_memory_state_loaded,
|
|
total_items: 0,
|
|
total_ops_in_list: 0,
|
|
max_seq_in_list: 0,
|
|
persisted_ops_count,
|
|
items: Vec::new(),
|
|
};
|
|
};
|
|
|
|
let total_items = state.crdt.doc.items.iter().count();
|
|
|
|
let max_seq_in_list = state
|
|
.crdt
|
|
.doc
|
|
.items
|
|
.ops
|
|
.iter()
|
|
.map(|op| op.seq)
|
|
.max()
|
|
.unwrap_or(0);
|
|
|
|
// Subtract 1 for the root sentinel.
|
|
let total_ops_in_list = state.crdt.doc.items.ops.len().saturating_sub(1);
|
|
|
|
let mut items = Vec::new();
|
|
for op in &state.crdt.doc.items.ops {
|
|
// Skip root sentinel (id == [0u8; 32]).
|
|
if op.id == ROOT_ID {
|
|
continue;
|
|
}
|
|
let Some(ref item_crdt) = op.content else {
|
|
// No content — skip (orphaned slot, should not happen in normal use).
|
|
continue;
|
|
};
|
|
|
|
let story_id = match item_crdt.story_id.view() {
|
|
JsonValue::String(s) if !s.is_empty() => Some(s),
|
|
_ => None,
|
|
};
|
|
|
|
// Apply story_id filter before doing any further work.
|
|
if let Some(filter) = story_id_filter
|
|
&& story_id.as_deref() != Some(filter)
|
|
{
|
|
continue;
|
|
}
|
|
|
|
let stage = match item_crdt.stage.view() {
|
|
JsonValue::String(s) if !s.is_empty() => Some(s),
|
|
_ => None,
|
|
};
|
|
let name = match item_crdt.name.view() {
|
|
JsonValue::String(s) if !s.is_empty() => Some(s),
|
|
_ => None,
|
|
};
|
|
let agent = match item_crdt.agent.view() {
|
|
JsonValue::String(s) if !s.is_empty() => Some(s),
|
|
_ => None,
|
|
};
|
|
let retry_count = match item_crdt.retry_count.view() {
|
|
JsonValue::Number(n) => Some(n as i64),
|
|
_ => None,
|
|
};
|
|
let depends_on = match item_crdt.depends_on.view() {
|
|
JsonValue::String(s) if !s.is_empty() => serde_json::from_str::<Vec<u32>>(&s).ok(),
|
|
_ => None,
|
|
};
|
|
|
|
let claim_agent = match item_crdt.claim_agent.view() {
|
|
JsonValue::String(s) if !s.is_empty() => Some(s),
|
|
_ => None,
|
|
};
|
|
let claim_ts = match item_crdt.claim_ts.view() {
|
|
JsonValue::Number(n) if n > 0.0 => Some(n),
|
|
_ => None,
|
|
};
|
|
|
|
let content_index = op.id.iter().map(|b| format!("{b:02x}")).collect::<String>();
|
|
|
|
items.push(CrdtItemDump {
|
|
story_id,
|
|
stage,
|
|
name,
|
|
agent,
|
|
retry_count,
|
|
depends_on,
|
|
claim_agent,
|
|
claim_ts,
|
|
content_index,
|
|
is_deleted: op.is_deleted,
|
|
});
|
|
}
|
|
|
|
CrdtStateDump {
|
|
in_memory_state_loaded,
|
|
total_items,
|
|
total_ops_in_list,
|
|
max_seq_in_list,
|
|
persisted_ops_count,
|
|
items,
|
|
}
|
|
}
|
|
|
|
// ── Read path ────────────────────────────────────────────────────────
|
|
|
|
/// Read the full pipeline state from the CRDT document.
|
|
///
|
|
/// Returns items grouped by stage, or `None` if the CRDT layer is not
|
|
/// initialised.
|
|
pub fn read_all_items() -> Option<Vec<PipelineItemView>> {
|
|
let state_mutex = get_crdt()?;
|
|
let state = state_mutex.lock().ok()?;
|
|
|
|
// Only return items that appear in the deduplicated index.
|
|
// The index maps story_id → visible_index and represents the
|
|
// latest-wins view of each story. Iterating raw CRDT entries
|
|
// would return stale duplicates from earlier stage writes.
|
|
let mut items = Vec::with_capacity(state.index.len());
|
|
for &idx in state.index.values() {
|
|
if let Some(view) = extract_item_view(&state.crdt.doc.items[idx]) {
|
|
items.push(view);
|
|
}
|
|
}
|
|
Some(items)
|
|
}
|
|
|
|
/// Read a single pipeline item by story_id.
|
|
pub fn read_item(story_id: &str) -> Option<PipelineItemView> {
|
|
let state_mutex = get_crdt()?;
|
|
let state = state_mutex.lock().ok()?;
|
|
let &idx = state.index.get(story_id)?;
|
|
extract_item_view(&state.crdt.doc.items[idx])
|
|
}
|
|
|
|
/// Return `true` if `story_id` has been tombstoned via `evict_item`.
|
|
///
|
|
/// Tombstoned IDs are invisible to `read_item` / `read_all_items` but `write_item`
|
|
/// silently rejects writes to them. Callers that allocate fresh IDs (notably
|
|
/// `db::next_item_number`) must consult this to avoid handing out a tombstoned ID,
|
|
/// which would cause a silent half-write split-brain (bug 1001).
|
|
pub fn is_tombstoned(story_id: &str) -> bool {
|
|
let Some(state_mutex) = get_crdt() else {
|
|
return false;
|
|
};
|
|
let Ok(state) = state_mutex.lock() else {
|
|
return false;
|
|
};
|
|
state.tombstones.contains(story_id)
|
|
}
|
|
|
|
/// Snapshot of all tombstoned story IDs.
|
|
///
|
|
/// Used by `db::next_item_number` to compute the highest tombstoned numeric ID
|
|
/// so the allocator can skip past it. Returns an empty vec if the CRDT layer
|
|
/// is not initialised.
|
|
pub fn tombstoned_ids() -> Vec<String> {
|
|
let Some(state_mutex) = get_crdt() else {
|
|
return Vec::new();
|
|
};
|
|
let Ok(state) = state_mutex.lock() else {
|
|
return Vec::new();
|
|
};
|
|
state.tombstones.iter().cloned().collect()
|
|
}
|
|
|
|
/// Mark a story as deleted in the in-memory CRDT and persist a tombstone op.
|
|
///
|
|
/// This is the eviction primitive for story 521 — it lets external callers
|
|
/// (e.g. the `purge_story` MCP tool, or operator scripts during incident
|
|
/// response) clear an item from the running server's in-memory state
|
|
/// without needing a full process restart.
|
|
///
|
|
/// Specifically:
|
|
/// 1. Scans the items list for ALL non-deleted ops whose `story_id` register
|
|
/// matches the target. Under concurrent CRDT ops, two nodes can each
|
|
/// insert the same `story_id` independently; this produces duplicate list
|
|
/// entries. Using the stable `OpId` from a direct scan (not a positional
|
|
/// index translation) ensures every duplicate is tombstoned, so none can
|
|
/// survive CRDT replay after the deletion.
|
|
/// 2. Constructs a delete op per matching entry via the bft-json-crdt list
|
|
/// `delete()` primitive, signs it with the local keypair, and applies it
|
|
/// to the in-memory CRDT (marking each entry `is_deleted = true`).
|
|
/// 3. Persists every signed delete op to `crdt_ops` so the evictions survive
|
|
/// a restart.
|
|
/// 4. Rebuilds the `story_id → visible_index` map.
|
|
/// 5. Adds the story_id to the tombstone set so future `write_item` calls
|
|
/// cannot resurrect it.
|
|
/// 6. Drops the in-memory content-store entry.
|
|
///
|
|
/// Returns `Ok(())` if at least one matching entry was found and tombstoned,
|
|
/// or an `Err` if the CRDT layer isn't initialised or the story_id is
|
|
/// unknown to the in-memory state.
|
|
pub fn evict_item(story_id: &str) -> Result<(), String> {
|
|
let state_mutex = get_crdt().ok_or_else(|| "CRDT layer not initialised".to_string())?;
|
|
let mut state = state_mutex
|
|
.lock()
|
|
.map_err(|e| format!("CRDT lock poisoned: {e}"))?;
|
|
|
|
if !state.index.contains_key(story_id) {
|
|
return Err(format!("Story '{story_id}' not found in in-memory CRDT"));
|
|
}
|
|
|
|
// Collect the stable OpId of every non-deleted list entry whose story_id
|
|
// register matches. Two nodes can concurrently insert the same story_id
|
|
// (each seeing an empty local index at the time of the insert), which
|
|
// leaves duplicate entries in the CRDT list. The visible index (HashMap)
|
|
// keeps only the last-seen entry; without this full scan the earlier
|
|
// concurrent insert would survive the deletion and resurface after replay.
|
|
let item_op_ids: Vec<OpId> = state
|
|
.crdt
|
|
.doc
|
|
.items
|
|
.ops
|
|
.iter()
|
|
.filter(|op| {
|
|
!op.is_deleted
|
|
&& op.content.as_ref().is_some_and(|item| {
|
|
matches!(item.story_id.view(), JsonValue::String(ref s) if s == story_id)
|
|
})
|
|
})
|
|
.map(|op| op.id)
|
|
.collect();
|
|
|
|
if item_op_ids.is_empty() {
|
|
return Err(format!("Story '{story_id}' not found in CRDT ops list"));
|
|
}
|
|
|
|
// Delete every matching op so no duplicate can survive CRDT replay.
|
|
for item_op_id in item_op_ids {
|
|
apply_and_persist(&mut state, |s| s.crdt.doc.items.delete(item_op_id));
|
|
}
|
|
|
|
// Rebuild the story_id → visible_index map; deleted items are no longer
|
|
// counted by the iter that rebuild_index uses.
|
|
state.index = rebuild_index(&state.crdt);
|
|
|
|
// Record the tombstone so that any future write_item call for this
|
|
// story_id is rejected even if the index no longer contains it.
|
|
state.tombstones.insert(story_id.to_string());
|
|
|
|
// Drop the content-store entry so the cached body doesn't outlive the
|
|
// CRDT entry. (Bug 521 follow-up: when CONTENT_STORE becomes a true
|
|
// lazy cache, this explicit eviction can go away.)
|
|
crate::db::delete_content(crate::db::ContentKey::Story(story_id));
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Extract a `PipelineItemView` from a `PipelineItemCrdt`.
|
|
///
|
|
/// Projects the loose CRDT `stage` register into a typed
|
|
/// [`crate::pipeline_state::Stage`]. Items with an unknown or missing stage
|
|
/// string, or with no name set, are filtered out (`None`) — a nameless item
|
|
/// is treated as malformed and never surfaces to callers.
|
|
pub(super) fn extract_item_view(item: &PipelineItemCrdt) -> Option<PipelineItemView> {
|
|
use super::types::EpicId;
|
|
use crate::io::story_metadata::{ItemType, QaMode};
|
|
|
|
let story_id = match item.story_id.view() {
|
|
JsonValue::String(s) if !s.is_empty() => s,
|
|
_ => return None,
|
|
};
|
|
let stage_str = match item.stage.view() {
|
|
JsonValue::String(s) if !s.is_empty() => s,
|
|
_ => return None,
|
|
};
|
|
// AC 5: nameless item = malformed; filter it out.
|
|
let name = match item.name.view() {
|
|
JsonValue::String(s) if !s.is_empty() => s,
|
|
_ => return None,
|
|
};
|
|
let agent = match item.agent.view() {
|
|
JsonValue::String(s) if !s.is_empty() => s.parse::<crate::config::AgentName>().ok(),
|
|
_ => None,
|
|
};
|
|
let retry_count_register = match item.retry_count.view() {
|
|
JsonValue::Number(n) if n >= 0.0 => n as u32,
|
|
_ => 0u32,
|
|
};
|
|
let depends_on = match item.depends_on.view() {
|
|
JsonValue::String(s) if !s.is_empty() => {
|
|
serde_json::from_str::<Vec<u32>>(&s).unwrap_or_default()
|
|
}
|
|
_ => Vec::new(),
|
|
};
|
|
|
|
// `claim_agent`/`claim_ts` are read only to embed in Stage::Coding /
|
|
// Stage::Merge via `project_stage_for_view`; they are not stored on
|
|
// `WorkItem` directly (story 1009: readers project from the Stage variant).
|
|
let claim_agent = match item.claim_agent.view() {
|
|
JsonValue::String(s) if !s.is_empty() => Some(s),
|
|
_ => None,
|
|
};
|
|
let claim_ts_secs = match item.claim_ts.view() {
|
|
JsonValue::Number(n) if n > 0.0 => Some(n as u64),
|
|
_ => None,
|
|
};
|
|
|
|
// `merged_at` is read only to project into `Stage::Done`; it is not
|
|
// stored on `WorkItem` (callers access it via `Stage::Done { merged_at }`).
|
|
let merged_at_float = match item.merged_at.view() {
|
|
JsonValue::Number(n) if n > 0.0 => Some(n),
|
|
_ => None,
|
|
};
|
|
|
|
let qa_mode = match item.qa_mode.view() {
|
|
JsonValue::String(s) if !s.is_empty() => QaMode::from_str(&s),
|
|
_ => None,
|
|
};
|
|
|
|
let item_type = match item.item_type.view() {
|
|
JsonValue::String(s) if !s.is_empty() => ItemType::from_str(&s),
|
|
_ => None,
|
|
};
|
|
|
|
let epic = match item.epic.view() {
|
|
JsonValue::String(s) if !s.is_empty() => EpicId::from_crdt_str(&s),
|
|
_ => None,
|
|
};
|
|
|
|
let resume_to = match item.resume_to.view() {
|
|
JsonValue::String(s) if !s.is_empty() => Some(s),
|
|
_ => None,
|
|
};
|
|
|
|
let plan_state_str = match item.plan_state.view() {
|
|
JsonValue::String(s) if !s.is_empty() => Some(s),
|
|
_ => None,
|
|
};
|
|
|
|
let merge_server_start = match item.merge_server_start.view() {
|
|
JsonValue::Number(n) if n > 0.0 => Some(n),
|
|
_ => None,
|
|
};
|
|
|
|
let stage = project_stage_for_view(
|
|
&stage_str,
|
|
&story_id,
|
|
merged_at_float,
|
|
resume_to.as_deref(),
|
|
claim_agent.as_deref(),
|
|
claim_ts_secs,
|
|
plan_state_str.as_deref(),
|
|
retry_count_register,
|
|
merge_server_start,
|
|
)?;
|
|
|
|
Some(PipelineItemView {
|
|
story_id,
|
|
stage,
|
|
name,
|
|
agent,
|
|
depends_on,
|
|
qa_mode,
|
|
item_type,
|
|
epic,
|
|
})
|
|
}
|
|
|
|
/// Project the loose `stage` string from the CRDT into a typed
|
|
/// [`crate::pipeline_state::Stage`].
|
|
///
|
|
/// Rich variants synthesise payload fields from sibling registers (or sane
|
|
/// defaults). Returns `None` for unknown stage strings — the read path drops
|
|
/// the entry so no caller ever sees a stage it can't pattern-match against.
|
|
///
|
|
/// Accepts BOTH the clean post-934 wire vocabulary (`"backlog"`, `"coding"`,
|
|
/// `"qa"`, etc.) and pre-934 directory-style strings (`"1_backlog"`,
|
|
/// `"2_current"`, etc.) — legacy strings are normalised to their clean form
|
|
/// before the typed projection. This keeps remote ops from older nodes (and
|
|
/// raw-CRDT test inserts that bypass `migrate_legacy_stage_strings`) from
|
|
/// silently disappearing from the typed read path.
|
|
#[allow(clippy::too_many_arguments)]
|
|
fn project_stage_for_view(
|
|
stage_str: &str,
|
|
story_id: &str,
|
|
merged_at: Option<f64>,
|
|
resume_to: Option<&str>,
|
|
claim_agent: Option<&str>,
|
|
claim_ts_secs: Option<u64>,
|
|
plan_state_str: Option<&str>,
|
|
retries: u32,
|
|
merge_server_start: Option<f64>,
|
|
) -> Option<crate::pipeline_state::Stage> {
|
|
use crate::pipeline_state::{
|
|
AgentClaim, AgentName, ArchiveReason, BranchName, GitSha, PlanState, Stage,
|
|
};
|
|
use chrono::{DateTime, TimeZone, Utc};
|
|
use std::num::NonZeroU32;
|
|
|
|
// Normalise legacy directory-style strings to their clean wire form so
|
|
// the match below stays single-shape.
|
|
let clean = match stage_str {
|
|
"0_upcoming" => "upcoming",
|
|
"1_backlog" => "backlog",
|
|
"2_current" => "coding",
|
|
"2_blocked" => "blocked",
|
|
"3_qa" => "qa",
|
|
"4_merge" => "merge",
|
|
"4_merge_failure" => "merge_failure",
|
|
"5_done" => "done",
|
|
"6_archived" => "archived",
|
|
// Pre-934 `7_frozen` collapses to backlog (the frozen flag is an
|
|
// orthogonal CRDT register since story 934 stage 4).
|
|
"7_frozen" => "backlog",
|
|
other => other,
|
|
};
|
|
|
|
// Story 945: resume target for `Frozen` / `ReviewHold` variants is stored
|
|
// in the sibling `resume_to` register. Fall back to `Coding` when the
|
|
// register is empty or holds an unrecognised value.
|
|
let resume_target = || -> Box<Stage> {
|
|
Box::new(
|
|
resume_to
|
|
.and_then(Stage::from_dir)
|
|
.unwrap_or(Stage::Coding {
|
|
claim: None,
|
|
plan: PlanState::Missing,
|
|
retries: 0,
|
|
}),
|
|
)
|
|
};
|
|
|
|
// Story 1009: reconstruct AgentClaim from `claim_agent`/`claim_ts` registers.
|
|
let claim = match (claim_agent, claim_ts_secs) {
|
|
(Some(agent_str), Some(ts)) => Some(AgentClaim {
|
|
agent: AgentName(agent_str.to_string()),
|
|
claimed_at: Utc
|
|
.timestamp_opt(ts as i64, 0)
|
|
.single()
|
|
.unwrap_or(DateTime::<Utc>::UNIX_EPOCH),
|
|
}),
|
|
_ => None,
|
|
};
|
|
|
|
match clean {
|
|
"upcoming" => Some(Stage::Upcoming),
|
|
"backlog" => Some(Stage::Backlog),
|
|
"coding" => Some(Stage::Coding {
|
|
claim,
|
|
plan: PlanState::from_str(plan_state_str.unwrap_or("")),
|
|
retries,
|
|
}),
|
|
"qa" => Some(Stage::Qa),
|
|
"blocked" => Some(Stage::Blocked {
|
|
reason: resume_to.unwrap_or("").to_string(),
|
|
}),
|
|
"merge" => Some(Stage::Merge {
|
|
feature_branch: BranchName(format!("feature/story-{story_id}")),
|
|
commits_ahead: NonZeroU32::new(1).expect("1 is non-zero"),
|
|
claim,
|
|
retries,
|
|
server_start_time: merge_server_start,
|
|
}),
|
|
"merge_failure" => {
|
|
// Story 986: read the typed kind directly from ContentKey::MergeFailureKind
|
|
// (written since 986) so no substring-scanning is needed.
|
|
// Fall back to infer_from_gate_output for data persisted pre-986.
|
|
let kind = crate::db::read_content(crate::db::ContentKey::MergeFailureKind(story_id))
|
|
.and_then(|s| {
|
|
serde_json::from_str::<crate::pipeline_state::MergeFailureKind>(&s).ok()
|
|
})
|
|
.or_else(|| {
|
|
crate::db::read_content(crate::db::ContentKey::GateOutput(story_id)).map(|s| {
|
|
crate::pipeline_state::MergeFailureKind::infer_from_gate_output(&s)
|
|
})
|
|
})
|
|
.unwrap_or(crate::pipeline_state::MergeFailureKind::Other(String::new()));
|
|
Some(Stage::MergeFailure {
|
|
kind,
|
|
feature_branch: BranchName(format!("feature/story-{story_id}")),
|
|
commits_ahead: NonZeroU32::new(1).expect("1 is non-zero"),
|
|
})
|
|
}
|
|
"merge_failure_final" => Some(Stage::MergeFailureFinal {
|
|
kind: crate::pipeline_state::MergeFailureKind::Other(String::new()),
|
|
}),
|
|
"frozen" => Some(Stage::Frozen {
|
|
resume_to: resume_target(),
|
|
}),
|
|
"review_hold" => Some(Stage::ReviewHold {
|
|
resume_to: resume_target(),
|
|
reason: String::new(),
|
|
}),
|
|
"done" => {
|
|
let merged_at = merged_at
|
|
.map(|ts| {
|
|
DateTime::from_timestamp(ts as i64, 0).unwrap_or(DateTime::<Utc>::UNIX_EPOCH)
|
|
})
|
|
.unwrap_or(DateTime::<Utc>::UNIX_EPOCH);
|
|
Some(Stage::Done {
|
|
merged_at,
|
|
merge_commit: GitSha("legacy".to_string()),
|
|
})
|
|
}
|
|
"archived" => Some(Stage::Archived {
|
|
archived_at: Utc::now(),
|
|
reason: ArchiveReason::Completed,
|
|
}),
|
|
"abandoned" => Some(Stage::Abandoned { ts: Utc::now() }),
|
|
"superseded" => Some(Stage::Superseded {
|
|
ts: Utc::now(),
|
|
superseded_by: crate::pipeline_state::StoryId(resume_to.unwrap_or("").to_string()),
|
|
}),
|
|
"rejected" => Some(Stage::Rejected {
|
|
ts: Utc::now(),
|
|
reason: resume_to.unwrap_or("").to_string(),
|
|
}),
|
|
_ => None,
|
|
}
|
|
}
|
|
|
|
/// Check whether a dependency (by numeric ID prefix) is in `5_done` or `6_archived`
|
|
/// according to CRDT state.
|
|
///
|
|
/// Returns `true` if the dependency is satisfied (item found in a done stage).
|
|
/// Matches both legacy slug-form IDs (`"664_story_foo"`) and numeric-only IDs
|
|
/// (`"664"`) so the check remains correct after the slug→numeric migration.
|
|
/// See `dep_is_archived_crdt` to distinguish archive-satisfied from cleanly-done.
|
|
pub fn dep_is_done_crdt(dep_number: u32) -> bool {
|
|
use crate::pipeline_state::{Stage, read_all_typed};
|
|
let exact = dep_number.to_string();
|
|
let prefix = format!("{dep_number}_");
|
|
read_all_typed().into_iter().any(|item| {
|
|
(item.story_id.0 == exact || item.story_id.0.starts_with(&prefix))
|
|
&& matches!(
|
|
item.stage,
|
|
Stage::Done { .. }
|
|
| Stage::Archived { .. }
|
|
| Stage::Abandoned { .. }
|
|
| Stage::Superseded { .. }
|
|
| Stage::Rejected { .. }
|
|
)
|
|
})
|
|
}
|
|
|
|
/// Check whether a dependency (by numeric ID prefix) is specifically in `6_archived`
|
|
/// according to CRDT state.
|
|
///
|
|
/// Used to detect when a dependency is satisfied via archive rather than via a clean
|
|
/// completion through `5_done`. Returns `false` when the CRDT layer is not initialised.
|
|
/// Matches both legacy slug-form IDs (`"664_story_foo"`) and numeric-only IDs (`"664"`).
|
|
pub fn dep_is_archived_crdt(dep_number: u32) -> bool {
|
|
use crate::pipeline_state::{Stage, read_all_typed};
|
|
let exact = dep_number.to_string();
|
|
let prefix = format!("{dep_number}_");
|
|
read_all_typed().into_iter().any(|item| {
|
|
(item.story_id.0 == exact || item.story_id.0.starts_with(&prefix))
|
|
&& matches!(
|
|
item.stage,
|
|
Stage::Archived { .. }
|
|
| Stage::Abandoned { .. }
|
|
| Stage::Superseded { .. }
|
|
| Stage::Rejected { .. }
|
|
)
|
|
})
|
|
}
|
|
|
|
/// Check unmet dependencies for a story by reading its `depends_on` from the
|
|
/// CRDT document and checking each dependency against CRDT state.
|
|
///
|
|
/// Returns the list of dependency numbers that are NOT in `5_done` or `6_archived`.
|
|
pub fn check_unmet_deps_crdt(story_id: &str) -> Vec<u32> {
|
|
let item = match read_item(story_id) {
|
|
Some(i) => i,
|
|
None => return Vec::new(),
|
|
};
|
|
let deps = item.depends_on().to_vec();
|
|
if deps.is_empty() {
|
|
return Vec::new();
|
|
}
|
|
deps.into_iter()
|
|
.filter(|&dep| !dep_is_done_crdt(dep))
|
|
.collect()
|
|
}
|
|
|
|
/// Return the list of dependency numbers from `story_id`'s `depends_on` that are
|
|
/// specifically in `6_archived` according to CRDT state.
|
|
///
|
|
/// Used to emit a warning when promotion fires because a dep is archived rather than
|
|
/// cleanly completed. Returns an empty `Vec` when no deps are archived.
|
|
pub fn check_archived_deps_crdt(story_id: &str) -> Vec<u32> {
|
|
let item = match read_item(story_id) {
|
|
Some(i) => i,
|
|
None => return Vec::new(),
|
|
};
|
|
let deps = item.depends_on().to_vec();
|
|
if deps.is_empty() {
|
|
return Vec::new();
|
|
}
|
|
deps.into_iter()
|
|
.filter(|&dep| dep_is_archived_crdt(dep))
|
|
.collect()
|
|
}
|
|
|
|
#[cfg(test)]
mod tests {
    use super::super::ops::apply_remote_op;
    use super::super::state::init_for_test;
    use super::super::state::rebuild_index;
    use super::super::types::PipelineItemCrdt;
    use super::super::write::write_item_str;
    use super::*;
    use bft_json_crdt::json_crdt::OpState;
    use bft_json_crdt::keypair::make_keypair;
    use bft_json_crdt::op::ROOT_ID;
    use serde_json::json;

    /// A raw CRDT insert using a legacy stage string is projected into a
    /// typed view with every register carried across.
    #[test]
    fn extract_item_view_parses_crdt_item() {
        let keypair = make_keypair();
        let mut replica = BaseCrdt::<PipelineDoc>::new(&keypair);

        let payload: JsonValue = json!({
            "story_id": "40_story_view",
            "stage": "2_current",
            "name": "View Test",
            "agent": "coder-1",
            "retry_count": 2.0,
            "blocked": true,
            "depends_on": "[10,20]",
            "claimed_by": "",
            "claimed_at": 0.0,
        })
        .into();

        let insert = replica.doc.items.insert(ROOT_ID, payload).sign(&keypair);
        replica.apply(insert);

        let view = extract_item_view(&replica.doc.items[0]).unwrap();
        assert_eq!(view.story_id, "40_story_view");
        assert!(matches!(
            view.stage,
            crate::pipeline_state::Stage::Coding { .. }
        ));
        assert_eq!(view.name, "View Test");
        assert_eq!(view.agent.map(|a| a.as_str()), Some("coder-1"));
        assert_eq!(view.retry_count(), 2u32);
        assert_eq!(view.depends_on, vec![10u32, 20u32]);
    }

    /// The global CRDT state may or may not be initialised in the test
    /// binary; either way the lookup must complete without panicking.
    #[test]
    fn dep_is_done_crdt_returns_false_when_no_crdt_state() {
        let _ = dep_is_done_crdt(9999);
    }

    /// An unknown story has no dependencies to report.
    #[test]
    fn check_unmet_deps_crdt_returns_empty_when_item_not_found() {
        let unmet = check_unmet_deps_crdt("nonexistent_story");
        assert!(unmet.is_empty());
    }

    /// Must not panic whether or not the global CRDT state is initialised.
    #[test]
    fn dep_is_archived_crdt_returns_false_when_no_crdt_state() {
        let _ = dep_is_archived_crdt(9998);
    }

    /// An unknown story has no archived dependencies to report.
    #[test]
    fn check_archived_deps_crdt_returns_empty_when_item_not_found() {
        let archived = check_archived_deps_crdt("nonexistent_story_archived");
        assert!(archived.is_empty());
    }

    /// Regression for story 917: evict_item must tombstone ALL list entries that
    /// share the target story_id, not just the one pointed to by the visible index.
    ///
    /// When two nodes concurrently insert the same story_id (each seeing an empty
    /// local index at the time), the CRDT list ends up with two entries. The
    /// visible index (HashMap) keeps only the last-written entry. The old code
    /// used `id_at(idx + 1)` which targeted only that last entry; the earlier
    /// concurrent insert survived the deletion and would reappear after CRDT
    /// replay — the "targeted item survives" bug.
    #[test]
    fn evict_item_tombstones_concurrent_duplicate_entries() {
        init_for_test();
        let story_id = "917_story_concurrent_evict";

        // Node 1: the story is inserted through the normal local write path.
        write_item_str(
            story_id,
            "1_backlog",
            Some("Node 1 insert"),
            None,
            None,
            None,
        );
        assert!(
            read_item(story_id).is_some(),
            "story must be visible after local insert"
        );

        // Node 2: a partitioned node with its own keypair and BaseCrdt
        // independently creates an entry with the same story_id.
        let remote_keypair = make_keypair();
        let mut remote_replica = BaseCrdt::<PipelineDoc>::new(&remote_keypair);
        let duplicate_payload: JsonValue = json!({
            "story_id": story_id,
            "stage": "1_backlog",
            "name": "Node 2 concurrent insert",
            "agent": "",
            "retry_count": 0.0,
            "blocked": false,
            "depends_on": "",
            "claimed_by": "",
            "claimed_at": 0.0,
            "merged_at": 0.0,
        })
        .into();
        let remote_insert = remote_replica
            .doc
            .items
            .insert(ROOT_ID, duplicate_payload)
            .sign(&remote_keypair);
        remote_replica.apply(remote_insert.clone());

        // Deliver node 2's insert here; the ops list now holds two entries
        // carrying the same story_id.
        let accepted = apply_remote_op(remote_insert);
        assert!(accepted, "remote concurrent insert must be accepted");

        // Evict the story.
        let result = evict_item(story_id);
        assert!(result.is_ok(), "evict_item must succeed: {result:?}");

        // Neither the local nor the remote entry may remain visible.
        assert!(
            read_item(story_id).is_none(),
            "story must be absent from read_item after evict"
        );

        // The raw dump must show every entry for this story_id as deleted.
        let dump_state = dump_crdt_state(Some(story_id));
        let any_surviving = dump_state
            .items
            .iter()
            .any(|entry| entry.story_id.as_deref() == Some(story_id) && !entry.is_deleted);
        assert!(
            !any_surviving,
            "no non-deleted CRDT entry must remain for story_id '{story_id}'"
        );

        // The tombstone must stop write_item from resurrecting the story.
        write_item_str(
            story_id,
            "1_backlog",
            Some("Resurrection attempt"),
            None,
            None,
            None,
        );
        assert!(
            read_item(story_id).is_none(),
            "tombstoned story must not be resurrected by write_item"
        );
    }
}
|