huskies: merge 1097 bug Shadow drift: set_depends_on writes CRDT depends_on register without updating pipeline_items.depends_on
This commit is contained in:
@@ -21,21 +21,26 @@ use crate::pipeline_state::{AgentClaim, Stage, stage_dir_name};
|
||||
///
|
||||
/// Returns `true` if the item was found and the op was applied, `false` otherwise.
|
||||
pub fn set_depends_on(story_id: &str, deps: &[u32]) -> bool {
|
||||
let Some(state_mutex) = get_crdt() else {
|
||||
return false;
|
||||
};
|
||||
let Ok(mut state) = state_mutex.lock() else {
|
||||
return false;
|
||||
};
|
||||
let Some(&idx) = state.index.get(story_id) else {
|
||||
return false;
|
||||
};
|
||||
let value = if deps.is_empty() {
|
||||
String::new()
|
||||
} else {
|
||||
serde_json::to_string(deps).unwrap_or_default()
|
||||
};
|
||||
apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].depends_on.set(value));
|
||||
{
|
||||
let Some(state_mutex) = get_crdt() else {
|
||||
return false;
|
||||
};
|
||||
let Ok(mut state) = state_mutex.lock() else {
|
||||
return false;
|
||||
};
|
||||
let Some(&idx) = state.index.get(story_id) else {
|
||||
return false;
|
||||
};
|
||||
let value = if deps.is_empty() {
|
||||
String::new()
|
||||
} else {
|
||||
serde_json::to_string(deps).unwrap_or_default()
|
||||
};
|
||||
apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].depends_on.set(value));
|
||||
}
|
||||
// Drop the CRDT lock before calling sync: read_item acquires the same
|
||||
// mutex and would deadlock if the lock were still held here.
|
||||
crate::db::ops::sync_item_depends_on(story_id);
|
||||
true
|
||||
}
|
||||
|
||||
|
||||
@@ -268,6 +268,41 @@ pub fn sync_item_name(story_id: &str) {
|
||||
let _ = db.tx.send(msg);
|
||||
}
|
||||
|
||||
/// Sync the `depends_on` field of a pipeline item from the CRDT to the shadow table.
|
||||
///
|
||||
/// Called after [`crate::crdt_state::set_depends_on`] updates the CRDT register so
|
||||
/// that the SQLite shadow table stays in lock-step. Reads the full current view from
|
||||
/// the CRDT (stage, name, agent, retry_count, depends_on) and sends a
|
||||
/// [`PipelineWriteMsg`] over [`PIPELINE_DB`]`.tx`. Pattern mirrors
|
||||
/// [`move_item_stage`] lines 157-176. No-op when the CRDT is uninitialised or the
|
||||
/// story_id is not found.
|
||||
pub fn sync_item_depends_on(story_id: &str) {
|
||||
let Some(db) = PIPELINE_DB.get() else {
|
||||
return;
|
||||
};
|
||||
let Some(view) = crate::crdt_state::read_item(story_id) else {
|
||||
return;
|
||||
};
|
||||
let depends_on = {
|
||||
let d = view.depends_on();
|
||||
if d.is_empty() {
|
||||
None
|
||||
} else {
|
||||
serde_json::to_string(d).ok()
|
||||
}
|
||||
};
|
||||
let msg = PipelineWriteMsg {
|
||||
story_id: story_id.to_string(),
|
||||
stage: view.stage().dir_name().to_string(),
|
||||
name: Some(view.name().to_string()),
|
||||
agent: view.agent().map(|a| a.to_string()),
|
||||
retry_count: Some(view.retry_count() as i64),
|
||||
depends_on,
|
||||
content: None,
|
||||
};
|
||||
let _ = db.tx.send(msg);
|
||||
}
|
||||
|
||||
/// Get the next available item number by scanning the CRDT state, the
|
||||
/// in-memory content store, AND the tombstone set for the highest existing
|
||||
/// number.
|
||||
@@ -318,3 +353,56 @@ pub fn next_item_number() -> u32 {
|
||||
|
||||
max_num + 1
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::db::shadow_write;
|
||||
|
||||
/// Regression test for story 1097: `set_depends_on` must sync the shadow
|
||||
/// table. Before the fix, the CRDT register was updated but the
|
||||
/// `pipeline_items.depends_on` column was never written.
|
||||
#[tokio::test]
|
||||
async fn set_depends_on_syncs_shadow_table() {
|
||||
crate::crdt_state::init_for_test();
|
||||
ensure_content_store();
|
||||
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
let db_path = tmp.path().join("pipeline.db");
|
||||
shadow_write::init(&db_path).await.unwrap();
|
||||
|
||||
let story_id = "1097_story_depends_on_shadow_drift";
|
||||
|
||||
// Insert the story so it exists in both the CRDT and the shadow table.
|
||||
write_item_with_content(
|
||||
story_id,
|
||||
"backlog",
|
||||
"---\nname: Depends On Shadow Drift\n---\n",
|
||||
ItemMeta::named("Depends On Shadow Drift"),
|
||||
);
|
||||
|
||||
// Let the initial shadow write land.
|
||||
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
|
||||
|
||||
// This is the write under test: it must update the shadow table.
|
||||
let ok = crate::crdt_state::set_depends_on(story_id, &[1, 2]);
|
||||
assert!(ok, "set_depends_on must return true for an existing item");
|
||||
|
||||
// Let the shadow write land.
|
||||
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
|
||||
|
||||
let pool = shadow_write::get_shared_pool().expect("pool must be initialised");
|
||||
let row: (Option<String>,) =
|
||||
sqlx::query_as("SELECT depends_on FROM pipeline_items WHERE id = ?1")
|
||||
.bind(story_id)
|
||||
.fetch_one(pool)
|
||||
.await
|
||||
.expect("row must exist in shadow table");
|
||||
|
||||
assert_eq!(
|
||||
row.0.as_deref(),
|
||||
Some("[1,2]"),
|
||||
"pipeline_items.depends_on must reflect the set_depends_on call"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user