Files
huskies/server/src/crdt_state/write.rs
T

301 lines
11 KiB
Rust
Raw Normal View History

//! High-level write API for pipeline items.
#![allow(unused_imports, dead_code)]
use bft_json_crdt::json_crdt::*;
use bft_json_crdt::lww_crdt::LwwRegisterCrdt;
use bft_json_crdt::op::ROOT_ID;
use serde_json::json;
use super::state::{apply_and_persist, emit_event, get_crdt, rebuild_index};
use super::types::{CrdtEvent, PipelineDoc, PipelineItemCrdt};
use crate::slog;
/// Write a pipeline item state through CRDT operations.
///
/// If the item exists, updates its registers. If not, inserts a new item
/// into the list. All ops are signed and persisted to SQLite.
///
/// When the stage changes (or a new item is created), a [`CrdtEvent`] is
/// broadcast so subscribers can react to the transition.
///
/// # Parameters
///
/// * `story_id` – key used to look the item up in the state index.
/// * `stage` – new stage; on the update path the stage register is always
///   rewritten, even when the value is unchanged.
/// * `name`, `agent`, `depends_on`, `claimed_by` – optional string registers.
/// * `retry_count` – optional counter, stored as `f64`.
/// * `blocked` – optional flag.
/// * `claimed_at`, `merged_at` – optional `f64` registers.
///
/// On the update path a `None` argument leaves the corresponding register
/// untouched. On the insert path a `None` argument falls back to `""`, `0`,
/// `false` or `0.0` respectively.
///
/// The function returns without doing anything when `get_crdt()` yields no
/// state or when the state mutex cannot be locked.
///
/// # Events
///
/// The `name` carried by the emitted [`CrdtEvent`] is `None` whenever the
/// name is absent *or empty*, on both the update and the insert path.
#[allow(clippy::too_many_arguments)]
pub fn write_item(
    story_id: &str,
    stage: &str,
    name: Option<&str>,
    agent: Option<&str>,
    retry_count: Option<i64>,
    blocked: Option<bool>,
    depends_on: Option<&str>,
    claimed_by: Option<&str>,
    claimed_at: Option<f64>,
    merged_at: Option<f64>,
) {
    let Some(state_mutex) = get_crdt() else {
        return;
    };
    let Ok(mut state) = state_mutex.lock() else {
        return;
    };

    if let Some(&idx) = state.index.get(story_id) {
        // Capture the old stage before updating so we can detect transitions.
        let old_stage = match state.crdt.doc.items[idx].stage.view() {
            JsonValue::String(s) => Some(s),
            _ => None,
        };

        // Update existing item registers. Each register write is its own op,
        // applied and persisted in the order below.
        apply_and_persist(&mut state, |s| {
            s.crdt.doc.items[idx].stage.set(stage.to_string())
        });
        if let Some(n) = name {
            apply_and_persist(&mut state, |s| {
                s.crdt.doc.items[idx].name.set(n.to_string())
            });
        }
        if let Some(a) = agent {
            apply_and_persist(&mut state, |s| {
                s.crdt.doc.items[idx].agent.set(a.to_string())
            });
        }
        if let Some(rc) = retry_count {
            apply_and_persist(&mut state, |s| {
                s.crdt.doc.items[idx].retry_count.set(rc as f64)
            });
        }
        if let Some(b) = blocked {
            apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].blocked.set(b));
        }
        if let Some(d) = depends_on {
            apply_and_persist(&mut state, |s| {
                s.crdt.doc.items[idx].depends_on.set(d.to_string())
            });
        }
        if let Some(cb) = claimed_by {
            apply_and_persist(&mut state, |s| {
                s.crdt.doc.items[idx].claimed_by.set(cb.to_string())
            });
        }
        if let Some(ca) = claimed_at {
            apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].claimed_at.set(ca));
        }
        if let Some(ma) = merged_at {
            apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].merged_at.set(ma));
        }

        // Broadcast a CrdtEvent if the stage actually changed.
        let stage_changed = old_stage.as_deref() != Some(stage);
        if stage_changed {
            // Read the current name from the CRDT document for the event;
            // an empty name is reported as "no name".
            let current_name = match state.crdt.doc.items[idx].name.view() {
                JsonValue::String(s) if !s.is_empty() => Some(s),
                _ => None,
            };
            emit_event(CrdtEvent {
                story_id: story_id.to_string(),
                from_stage: old_stage,
                to_stage: stage.to_string(),
                name: current_name,
            });
        }
    } else {
        // Insert new item. Missing optional values fall back to empty/zero.
        let item_json: JsonValue = json!({
            "story_id": story_id,
            "stage": stage,
            "name": name.unwrap_or(""),
            "agent": agent.unwrap_or(""),
            "retry_count": retry_count.unwrap_or(0) as f64,
            "blocked": blocked.unwrap_or(false),
            "depends_on": depends_on.unwrap_or(""),
            "claimed_by": claimed_by.unwrap_or(""),
            "claimed_at": claimed_at.unwrap_or(0.0),
            "merged_at": merged_at.unwrap_or(0.0),
        })
        .into();
        apply_and_persist(&mut state, |s| s.crdt.doc.items.insert(ROOT_ID, item_json));

        // Rebuild index after insertion (indices may shift).
        state.index = rebuild_index(&state.crdt);

        // Advance the inner registers of the newly-created item to the Lamport
        // floor so their first local ops don't re-emit low sequence numbers.
        let floor = state.lamport_floor;
        if floor > 0
            && let Some(&idx) = state.index.get(story_id)
        {
            let item = &mut state.crdt.doc.items[idx];
            item.story_id.advance_seq(floor);
            item.stage.advance_seq(floor);
            item.name.advance_seq(floor);
            item.agent.advance_seq(floor);
            item.retry_count.advance_seq(floor);
            item.blocked.advance_seq(floor);
            item.depends_on.advance_seq(floor);
            item.claimed_by.advance_seq(floor);
            item.claimed_at.advance_seq(floor);
            item.merged_at.advance_seq(floor);
        }

        // Broadcast a CrdtEvent for the new item. An empty name is normalised
        // to `None`, matching what the update path reports for an empty name.
        emit_event(CrdtEvent {
            story_id: story_id.to_string(),
            from_stage: None,
            to_stage: stage.to_string(),
            name: name.filter(|n| !n.is_empty()).map(String::from),
        });
    }
}
// Regression tests for CRDT op persistence and replay ordering.
#[cfg(test)]
mod tests {
use super::super::hex;
use super::super::read::extract_item_view;
use super::super::read::read_item;
use super::super::state::init_for_test;
use super::super::state::rebuild_index;
use super::*;
use bft_json_crdt::json_crdt::OpState;
use bft_json_crdt::keypair::make_keypair;
use bft_json_crdt::op::ROOT_ID;
use serde_json::json;
use sqlx::SqlitePool;
use sqlx::sqlite::SqliteConnectOptions;
// Bug 511 regression: a list insert followed by a field update on the
// inserted item must both survive a replay of the persisted ops.
//
// The test builds a CRDT in memory, persists exactly two ops (the insert and
// the stage update) into a temporary SQLite database, and then replays them
// twice into fresh CRDTs: once ordered by `rowid` (expected to keep the
// update) and once ordered by `seq` (expected to lose it).
#[tokio::test]
async fn bug_511_rowid_replay_preserves_field_update_after_list_insert() {
// Fresh on-disk database in a temp dir, with the project migrations applied.
let tmp = tempfile::tempdir().unwrap();
let db_path = tmp.path().join("bug511.db");
let options = SqliteConnectOptions::new()
.filename(&db_path)
.create_if_missing(true);
let pool = SqlitePool::connect_with(options).await.unwrap();
sqlx::migrate!("./migrations").run(&pool).await.unwrap();
let kp = make_keypair();
let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);
// Insert 5 dummy items to advance items.our_seq to 5.
// NOTE(review): these fixtures omit the `merged_at` key that `write_item`
// includes when inserting — confirm the CRDT tolerates the missing key.
for i in 0..5u32 {
let sid = format!("{}_story_warmup", i);
let item: JsonValue = json!({
"story_id": sid,
"stage": "1_backlog",
"name": "",
"agent": "",
"retry_count": 0.0,
"blocked": false,
"depends_on": "",
"claimed_by": "",
"claimed_at": 0.0,
})
.into();
let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp);
crdt.apply(op.clone());
// We don't persist these to the DB — they are pre-history.
}
// Now insert the real item. items.our_seq was 5, so this op gets seq=6.
let target_item: JsonValue = json!({
"story_id": "511_story_target",
"stage": "1_backlog",
"name": "Bug 511 target",
"agent": "",
"retry_count": 0.0,
"blocked": false,
"depends_on": "",
"claimed_by": "",
"claimed_at": 0.0,
})
.into();
let insert_op = crdt.doc.items.insert(ROOT_ID, target_item).sign(&kp);
crdt.apply(insert_op.clone());
// insert_op.inner.seq == 6
// Now update the stage. The stage LwwRegisterCrdt for this item starts
// at our_seq=0, so this field op gets seq=1. Crucially: seq=1 < seq=6.
let idx = rebuild_index(&crdt)["511_story_target"];
let stage_op = crdt.doc.items[idx]
.stage
.set("2_current".to_string())
.sign(&kp);
crdt.apply(stage_op.clone());
// stage_op.inner.seq == 1
// Persist BOTH ops in causal order (insert first, update second).
// This means insert_op gets rowid < stage_op rowid.
// Each row stores the hex-encoded op id, the op's own seq, and its JSON form.
let now = chrono::Utc::now().to_rfc3339();
for op in [&insert_op, &stage_op] {
let op_json = serde_json::to_string(op).unwrap();
let op_id = hex::encode(&op.id());
sqlx::query(
"INSERT INTO crdt_ops (op_id, seq, op_json, created_at) VALUES (?1, ?2, ?3, ?4)",
)
.bind(&op_id)
.bind(op.inner.seq as i64)
.bind(&op_json)
.bind(&now)
.execute(&pool)
.await
.unwrap();
}
// Replay by rowid ASC (the fix). The insert must come before the field
// update regardless of their field-level seq values.
let rows: Vec<(String,)> =
sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY rowid ASC")
.fetch_all(&pool)
.await
.unwrap();
// Replay into a brand-new CRDT so nothing from `crdt` leaks into the result.
let mut crdt2 = BaseCrdt::<PipelineDoc>::new(&kp);
for (json_str,) in &rows {
let op: SignedOp = serde_json::from_str(json_str).unwrap();
crdt2.apply(op);
}
// The item must be in the CRDT and must reflect the stage update.
let index2 = rebuild_index(&crdt2);
assert!(
index2.contains_key("511_story_target"),
"item not found after rowid-order replay"
);
let idx2 = index2["511_story_target"];
let view = extract_item_view(&crdt2.doc.items[idx2]).unwrap();
assert_eq!(
view.stage, "2_current",
"stage field update lost during replay (bug 511 regression)"
);
// Confirm the bug is reproducible by replaying seq ASC instead.
// With seq ASC the stage_op (seq=1) arrives before insert_op (seq=6),
// fails ErrPathMismatch, and the item ends up at "1_backlog".
let rows_wrong_order: Vec<(String,)> =
sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY seq ASC")
.fetch_all(&pool)
.await
.unwrap();
let mut crdt3 = BaseCrdt::<PipelineDoc>::new(&kp);
for (json_str,) in &rows_wrong_order {
let op: SignedOp = serde_json::from_str(json_str).unwrap();
crdt3.apply(op);
}
let index3 = rebuild_index(&crdt3);
// With seq ASC replay, the item is created (insert_op eventually runs)
// but the stage update is lost (it ran before the item existed).
// NOTE(review): this check is conditional — if the item is missing from
// `index3` nothing is asserted and the test still passes.
if let Some(idx3) = index3.get("511_story_target") {
let view3 = extract_item_view(&crdt3.doc.items[*idx3]).unwrap();
// The bug: stage is still "1_backlog" because the update was dropped.
assert_eq!(
view3.stage, "1_backlog",
"expected seq-ASC replay to exhibit the bug (update lost)"
);
}
}
}