//! High-level write API for pipeline items.
|
|
|
|
#![allow(unused_imports, dead_code)]
|
|
use bft_json_crdt::json_crdt::*;
|
|
use bft_json_crdt::lww_crdt::LwwRegisterCrdt;
|
|
use bft_json_crdt::op::ROOT_ID;
|
|
use serde_json::json;
|
|
|
|
use super::state::{apply_and_persist, emit_event, get_crdt, rebuild_index};
|
|
use super::types::{CrdtEvent, PipelineDoc, PipelineItemCrdt};
|
|
use crate::slog;
|
|
|
|
// ── Name migration helpers ────────────────────────────────────────────
|
|
|
|
/// Derive a human-readable name from a story ID's slug component.
///
/// Strips the numeric prefix and item-type prefix (story/bug/spike/refactor),
/// replaces underscores with spaces, and capitalises the first letter.
///
/// Examples:
/// - `"729_story_store_story_name"` → `"Store story name"`
/// - `"4_bug_login_crash"` → `"Login crash"`
/// - `"10_spike_arch_review"` → `"Arch review"`
pub fn name_from_story_id(story_id: &str) -> String {
    // Drop the leading digit run and the underscore that follows it:
    // "729_story_..." → "story_...". If either piece is absent, keep going
    // with whatever remains.
    let rest = story_id.trim_start_matches(|c: char| c.is_ascii_digit());
    let rest = rest.strip_prefix('_').unwrap_or(rest);

    // Drop the first recognised item-type prefix; unknown prefixes are kept.
    let slug = ["story_", "bug_", "spike_", "refactor_"]
        .into_iter()
        .find_map(|prefix| rest.strip_prefix(prefix))
        .unwrap_or(rest);

    // Underscores become spaces.
    let spaced = slug.replace('_', " ");

    // Capitalise the first character (to_uppercase may yield several chars
    // for some Unicode scalars, hence collecting into a String).
    let mut chars = spaced.chars();
    chars
        .next()
        .map(|first| {
            let mut name: String = first.to_uppercase().collect();
            name.push_str(chars.as_str());
            name
        })
        .unwrap_or_default()
}
|
|
|
|
/// Backfill the `name` CRDT field for pipeline items that have an empty name.
|
|
///
|
|
/// Iterates over all items in the in-memory CRDT. For each item whose `name`
|
|
/// register is empty, derives a human-readable name from the story ID slug
|
|
/// (see [`name_from_story_id`]) and writes it via a signed CRDT op.
|
|
///
|
|
/// This is a one-time startup migration: items created before the `name` field
|
|
/// was consistently populated will gain a name on the next server start.
|
|
/// Items that already have a non-empty name are left untouched.
|
|
pub fn migrate_names_from_slugs() {
|
|
let Some(state_mutex) = get_crdt() else {
|
|
return;
|
|
};
|
|
|
|
// First pass: collect (index, derived_name) pairs for items missing a name.
|
|
let migrations: Vec<(usize, String)> = {
|
|
let Ok(state) = state_mutex.lock() else {
|
|
return;
|
|
};
|
|
state
|
|
.index
|
|
.iter()
|
|
.filter_map(|(story_id, &idx)| {
|
|
let item = &state.crdt.doc.items[idx];
|
|
// Skip items that already have a name.
|
|
let already_named =
|
|
matches!(item.name.view(), JsonValue::String(ref s) if !s.is_empty());
|
|
if already_named {
|
|
return None;
|
|
}
|
|
let name = name_from_story_id(story_id);
|
|
if name.is_empty() {
|
|
return None;
|
|
}
|
|
Some((idx, name))
|
|
})
|
|
.collect()
|
|
};
|
|
|
|
if migrations.is_empty() {
|
|
return;
|
|
}
|
|
|
|
// Second pass: apply all name writes while holding the lock.
|
|
let Ok(mut state) = state_mutex.lock() else {
|
|
return;
|
|
};
|
|
let count = migrations.len();
|
|
for (idx, name) in migrations {
|
|
apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].name.set(name.clone()));
|
|
}
|
|
slog!("[crdt] Migrated names for {count} items from story ID slugs");
|
|
}
|
|
|
|
/// Write a pipeline item state through CRDT operations.
///
/// If the item exists, updates its registers. If not, inserts a new item
/// into the list. All ops are signed and persisted to SQLite.
///
/// When the stage changes (or a new item is created), a [`CrdtEvent`] is
/// broadcast so subscribers can react to the transition.
///
/// Silently returns (no-op) if the CRDT is not initialised or its mutex is
/// poisoned — callers cannot observe failure.
#[allow(clippy::too_many_arguments)]
pub fn write_item(
    story_id: &str,
    stage: &str,
    name: Option<&str>,
    agent: Option<&str>,
    retry_count: Option<i64>,
    blocked: Option<bool>,
    depends_on: Option<&str>,
    claimed_by: Option<&str>,
    claimed_at: Option<f64>,
    merged_at: Option<f64>,
) {
    let Some(state_mutex) = get_crdt() else {
        return;
    };
    // The guard is held for the whole function, so all field ops emitted by
    // one call are applied without interleaving from other writers.
    let Ok(mut state) = state_mutex.lock() else {
        return;
    };

    if let Some(&idx) = state.index.get(story_id) {
        // Capture the old stage before updating so we can detect transitions.
        let old_stage = match state.crdt.doc.items[idx].stage.view() {
            JsonValue::String(s) => Some(s),
            _ => None,
        };

        // Update existing item registers. The stage is always written;
        // every other field is only written when the caller supplied Some,
        // leaving absent fields untouched (last-writer-wins per register).
        apply_and_persist(&mut state, |s| {
            s.crdt.doc.items[idx].stage.set(stage.to_string())
        });

        if let Some(n) = name {
            apply_and_persist(&mut state, |s| {
                s.crdt.doc.items[idx].name.set(n.to_string())
            });
        }
        if let Some(a) = agent {
            apply_and_persist(&mut state, |s| {
                s.crdt.doc.items[idx].agent.set(a.to_string())
            });
        }
        if let Some(rc) = retry_count {
            // Numeric registers hold f64 (see the json! literal in the insert
            // branch), so the i64 is widened here. NOTE(review): values above
            // 2^53 would lose precision — fine for retry counts.
            apply_and_persist(&mut state, |s| {
                s.crdt.doc.items[idx].retry_count.set(rc as f64)
            });
        }
        if let Some(b) = blocked {
            apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].blocked.set(b));
        }
        if let Some(d) = depends_on {
            apply_and_persist(&mut state, |s| {
                s.crdt.doc.items[idx].depends_on.set(d.to_string())
            });
        }
        if let Some(cb) = claimed_by {
            apply_and_persist(&mut state, |s| {
                s.crdt.doc.items[idx].claimed_by.set(cb.to_string())
            });
        }
        if let Some(ca) = claimed_at {
            apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].claimed_at.set(ca));
        }
        if let Some(ma) = merged_at {
            apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].merged_at.set(ma));
        }

        // Broadcast a CrdtEvent if the stage actually changed.
        let stage_changed = old_stage.as_deref() != Some(stage);
        if stage_changed {
            // Read the current name from the CRDT document for the event.
            // This reflects the `name` write above, if one happened.
            let current_name = match state.crdt.doc.items[idx].name.view() {
                JsonValue::String(s) if !s.is_empty() => Some(s),
                _ => None,
            };
            emit_event(CrdtEvent {
                story_id: story_id.to_string(),
                from_stage: old_stage,
                to_stage: stage.to_string(),
                name: current_name,
            });
        }
    } else {
        // Insert new item. Missing optional fields are materialised with
        // empty/zero defaults so every register exists from creation.
        let item_json: JsonValue = json!({
            "story_id": story_id,
            "stage": stage,
            "name": name.unwrap_or(""),
            "agent": agent.unwrap_or(""),
            "retry_count": retry_count.unwrap_or(0) as f64,
            "blocked": blocked.unwrap_or(false),
            "depends_on": depends_on.unwrap_or(""),
            "claimed_by": claimed_by.unwrap_or(""),
            "claimed_at": claimed_at.unwrap_or(0.0),
            "merged_at": merged_at.unwrap_or(0.0),
        })
        .into();

        apply_and_persist(&mut state, |s| s.crdt.doc.items.insert(ROOT_ID, item_json));

        // Rebuild index after insertion (indices may shift).
        state.index = rebuild_index(&state.crdt);

        // Advance the inner registers of the newly-created item to the Lamport
        // floor so their first local ops don't re-emit low sequence numbers.
        // The index lookup re-resolves the item's position post-rebuild.
        let floor = state.lamport_floor;
        if floor > 0
            && let Some(&idx) = state.index.get(story_id)
        {
            let item = &mut state.crdt.doc.items[idx];
            item.story_id.advance_seq(floor);
            item.stage.advance_seq(floor);
            item.name.advance_seq(floor);
            item.agent.advance_seq(floor);
            item.retry_count.advance_seq(floor);
            item.blocked.advance_seq(floor);
            item.depends_on.advance_seq(floor);
            item.claimed_by.advance_seq(floor);
            item.claimed_at.advance_seq(floor);
            item.merged_at.advance_seq(floor);
        }

        // Broadcast a CrdtEvent for the new item (from_stage: None marks it
        // as a creation rather than a transition).
        emit_event(CrdtEvent {
            story_id: story_id.to_string(),
            from_stage: None,
            to_stage: stage.to_string(),
            name: name.map(String::from),
        });
    }
}
|
|
|
|
#[cfg(test)]
mod tests {
    use super::super::hex;
    use super::super::read::extract_item_view;
    use super::super::read::read_item;
    use super::super::state::init_for_test;
    use super::super::state::rebuild_index;
    use super::*;

    // ── name_from_story_id tests ─────────────────────────────────────────────

    // Happy path: numeric prefix and "story_" type prefix stripped,
    // underscores become spaces, first letter capitalised.
    #[test]
    fn name_from_story_id_story_type() {
        assert_eq!(
            name_from_story_id("729_story_store_story_name_as_a_crdt_field"),
            "Store story name as a crdt field"
        );
    }

    #[test]
    fn name_from_story_id_bug_type() {
        assert_eq!(name_from_story_id("4_bug_login_crash"), "Login crash");
    }

    #[test]
    fn name_from_story_id_spike_type() {
        assert_eq!(name_from_story_id("10_spike_arch_review"), "Arch review");
    }

    #[test]
    fn name_from_story_id_refactor_type() {
        assert_eq!(
            name_from_story_id("99_refactor_decompose_server"),
            "Decompose server"
        );
    }

    #[test]
    fn name_from_story_id_single_word() {
        assert_eq!(name_from_story_id("1_story_auth"), "Auth");
    }

    #[test]
    fn name_from_story_id_unknown_type_fallback() {
        // Unknown type prefix is left as-is after stripping the number.
        assert_eq!(name_from_story_id("5_unknown_foo_bar"), "Unknown foo bar");
    }

    // ── migrate_names_from_slugs tests ───────────────────────────────────────

    // The migration must fill in a slug-derived name for items whose `name`
    // register is empty.
    #[test]
    fn migrate_names_from_slugs_fills_empty_names() {
        init_for_test();

        // Write an item without a name.
        write_item(
            "42_story_my_feature",
            "1_backlog",
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
        );

        // Before migration the name should be empty.
        let before = read_item("42_story_my_feature").unwrap();
        assert!(
            before.name.as_deref().unwrap_or("").is_empty(),
            "name should be empty before migration"
        );

        migrate_names_from_slugs();

        // After migration the name should be derived from the slug.
        let after = read_item("42_story_my_feature").unwrap();
        assert_eq!(
            after.name.as_deref(),
            Some("My feature"),
            "name should be derived from slug after migration"
        );
    }

    // The migration must never clobber a name that was set explicitly.
    #[test]
    fn migrate_names_from_slugs_leaves_existing_names_unchanged() {
        init_for_test();

        write_item(
            "43_story_named_item",
            "1_backlog",
            Some("Already Named"),
            None,
            None,
            None,
            None,
            None,
            None,
            None,
        );

        migrate_names_from_slugs();

        let after = read_item("43_story_named_item").unwrap();
        assert_eq!(
            after.name.as_deref(),
            Some("Already Named"),
            "pre-existing name must not be overwritten"
        );
    }

    #[test]
    fn migrate_names_from_slugs_noop_when_crdt_not_initialised() {
        // Should not panic when called before init.
        // In practice get_crdt() returns None in a fresh thread.
        // We call it here just to confirm no panic.
        migrate_names_from_slugs();
    }

    // Imports below serve only the async bug-511 regression test.
    use bft_json_crdt::json_crdt::OpState;
    use bft_json_crdt::keypair::make_keypair;
    use bft_json_crdt::op::ROOT_ID;
    use serde_json::json;
    use sqlx::SqlitePool;
    use sqlx::sqlite::SqliteConnectOptions;

    // Regression test for bug 511: replaying persisted ops ordered by their
    // field-level `seq` can apply a field update before the list insert that
    // created the item, silently dropping the update. Replaying by SQLite
    // rowid (i.e. persistence/causal order) must preserve it.
    #[tokio::test]
    async fn bug_511_rowid_replay_preserves_field_update_after_list_insert() {
        let tmp = tempfile::tempdir().unwrap();
        let db_path = tmp.path().join("bug511.db");

        let options = SqliteConnectOptions::new()
            .filename(&db_path)
            .create_if_missing(true);
        let pool = SqlitePool::connect_with(options).await.unwrap();
        sqlx::migrate!("./migrations").run(&pool).await.unwrap();

        let kp = make_keypair();
        let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);

        // Insert 5 dummy items to advance items.our_seq to 5.
        for i in 0..5u32 {
            let sid = format!("{}_story_warmup", i);
            let item: JsonValue = json!({
                "story_id": sid,
                "stage": "1_backlog",
                "name": "",
                "agent": "",
                "retry_count": 0.0,
                "blocked": false,
                "depends_on": "",
                "claimed_by": "",
                "claimed_at": 0.0,
            })
            .into();
            let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp);
            crdt.apply(op.clone());
            // We don't persist these to the DB — they are pre-history.
        }

        // Now insert the real item. items.our_seq was 5, so this op gets seq=6.
        let target_item: JsonValue = json!({
            "story_id": "511_story_target",
            "stage": "1_backlog",
            "name": "Bug 511 target",
            "agent": "",
            "retry_count": 0.0,
            "blocked": false,
            "depends_on": "",
            "claimed_by": "",
            "claimed_at": 0.0,
        })
        .into();
        let insert_op = crdt.doc.items.insert(ROOT_ID, target_item).sign(&kp);
        crdt.apply(insert_op.clone());
        // insert_op.inner.seq == 6

        // Now update the stage. The stage LwwRegisterCrdt for this item starts
        // at our_seq=0, so this field op gets seq=1. Crucially: seq=1 < seq=6.
        let idx = rebuild_index(&crdt)["511_story_target"];
        let stage_op = crdt.doc.items[idx]
            .stage
            .set("2_current".to_string())
            .sign(&kp);
        crdt.apply(stage_op.clone());
        // stage_op.inner.seq == 1

        // Persist BOTH ops in causal order (insert first, update second).
        // This means insert_op gets rowid < stage_op rowid.
        let now = chrono::Utc::now().to_rfc3339();
        for op in [&insert_op, &stage_op] {
            let op_json = serde_json::to_string(op).unwrap();
            let op_id = hex::encode(&op.id());
            sqlx::query(
                "INSERT INTO crdt_ops (op_id, seq, op_json, created_at) VALUES (?1, ?2, ?3, ?4)",
            )
            .bind(&op_id)
            .bind(op.inner.seq as i64)
            .bind(&op_json)
            .bind(&now)
            .execute(&pool)
            .await
            .unwrap();
        }

        // Replay by rowid ASC (the fix). The insert must come before the field
        // update regardless of their field-level seq values.
        let rows: Vec<(String,)> =
            sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY rowid ASC")
                .fetch_all(&pool)
                .await
                .unwrap();

        let mut crdt2 = BaseCrdt::<PipelineDoc>::new(&kp);
        for (json_str,) in &rows {
            let op: SignedOp = serde_json::from_str(json_str).unwrap();
            crdt2.apply(op);
        }

        // The item must be in the CRDT and must reflect the stage update.
        let index2 = rebuild_index(&crdt2);
        assert!(
            index2.contains_key("511_story_target"),
            "item not found after rowid-order replay"
        );
        let idx2 = index2["511_story_target"];
        let view = extract_item_view(&crdt2.doc.items[idx2]).unwrap();
        assert_eq!(
            view.stage, "2_current",
            "stage field update lost during replay (bug 511 regression)"
        );

        // Confirm the bug is reproducible by replaying seq ASC instead.
        // With seq ASC the stage_op (seq=1) arrives before insert_op (seq=6),
        // fails ErrPathMismatch, and the item ends up at "1_backlog".
        let rows_wrong_order: Vec<(String,)> =
            sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY seq ASC")
                .fetch_all(&pool)
                .await
                .unwrap();

        let mut crdt3 = BaseCrdt::<PipelineDoc>::new(&kp);
        for (json_str,) in &rows_wrong_order {
            let op: SignedOp = serde_json::from_str(json_str).unwrap();
            crdt3.apply(op);
        }

        let index3 = rebuild_index(&crdt3);
        // With seq ASC replay, the item is created (insert_op eventually runs)
        // but the stage update is lost (it ran before the item existed).
        if let Some(idx3) = index3.get("511_story_target") {
            let view3 = extract_item_view(&crdt3.doc.items[*idx3]).unwrap();
            // The bug: stage is still "1_backlog" because the update was dropped.
            assert_eq!(
                view3.stage, "1_backlog",
                "expected seq-ASC replay to exhibit the bug (update lost)"
            );
        }
    }
}
|