Files
huskies/server/src/db/ops.rs
T

478 lines
17 KiB
Rust
Raw Normal View History

2026-04-29 15:49:50 +00:00
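//! Write operations for the pipeline — content, stage transitions, and deletions.
//!
//! Writes touch up to three layers, in this order: the in-memory content
//! store, the CRDT (source of truth for metadata), and the SQLite shadow
//! table (via the background channel). The layers are updated one after
//! another, not atomically; the `sync_item_*` helpers only refresh the shadow
//! table from the current CRDT view.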
use super::content_store::{
2026-05-13 11:22:57 +00:00
ContentKey, all_content_ids, delete_content, ensure_content_store, read_content, write_content,
2026-04-29 15:49:50 +00:00
};
use super::shadow_write::{PIPELINE_DB, PipelineWriteMsg};
2026-04-30 22:23:21 +00:00
/// Typed metadata for a pipeline item write.
///
/// Story 929: callers pass metadata explicitly — no YAML parsing. Every
/// field is `Option`-typed; `None` means "leave unchanged" on update and
/// "use the default" on insert.
#[derive(Default, Clone, Debug, PartialEq, Eq)]
pub struct ItemMeta {
    /// Item name; forwarded to the CRDT name register and the shadow table's
    /// `name` column.
    pub name: Option<String>,
    /// Assigned agent; forwarded to the CRDT agent register and the shadow
    /// table's `agent` column.
    pub agent: Option<String>,
    /// IDs of the items this one depends on (presumably the numeric item
    /// numbers handed out by `next_item_number` — confirm). Serialised as a
    /// JSON array such as `[1,2]` before it reaches the CRDT and the shadow
    /// table; `Some(vec![])` is sent as `[]`, not treated like `None`.
    pub depends_on: Option<Vec<u32>>,
}

impl ItemMeta {
    /// Convenience constructor for the common "just set a name" case; every
    /// other field is `None`. Only compiled for tests.
    #[cfg(test)]
    pub fn named(name: impl Into<String>) -> Self {
        Self {
            name: Some(name.into()),
            ..Self::default()
        }
    }
}
/// Map a stage string onto the clean post-934 vocabulary at the db boundary.
///
/// Pre-934 directory-style names (`"2_current"`, `"4_merge"`, …) are
/// translated to their clean equivalents (`"coding"`, `"merge"`, …). Clean
/// names, and any string not in the legacy table, pass through untouched.
/// The result is then handed to `Stage::from_dir`, which only accepts the
/// clean form, so the public db API stays tolerant of legacy callers while the
/// internal type stays strict.
fn normalise_stage_str(stage: &str) -> &str {
    /// Legacy directory-style stage name paired with its clean replacement.
    const LEGACY_STAGE_NAMES: [(&str, &str); 12] = [
        ("0_upcoming", "upcoming"),
        ("1_backlog", "backlog"),
        ("2_current", "coding"),
        ("2_blocked", "blocked"),
        ("3_qa", "qa"),
        ("4_merge", "merge"),
        ("4_merge_failure", "merge_failure"),
        ("4_merge_failure_final", "merge_failure_final"),
        ("5_done", "done"),
        ("6_archived", "archived"),
        ("7_frozen", "frozen"),
        ("7_review_hold", "review_hold"),
    ];

    LEGACY_STAGE_NAMES
        .iter()
        .find(|&&(legacy, _)| legacy == stage)
        .map_or(stage, |&(_, clean)| clean)
}
2026-04-29 15:49:50 +00:00
/// Write a pipeline item from in-memory content (no filesystem access).
///
/// This is the primary write path for the DB-backed pipeline. It updates
/// the CRDT, the in-memory content store, and the SQLite shadow table.
2026-04-30 22:23:21 +00:00
///
/// The metadata in `meta` is authoritative: this function does NOT parse
/// `content` to extract front-matter fields. Callers must pass typed
/// metadata explicitly via `ItemMeta`.
pub fn write_item_with_content(story_id: &str, stage: &str, content: &str, meta: ItemMeta) {
let depends_on_json = meta
.depends_on
.as_ref()
.and_then(|d| serde_json::to_string(d).ok());
2026-04-29 15:49:50 +00:00
// Update in-memory content store.
ensure_content_store();
2026-05-13 11:22:57 +00:00
write_content(ContentKey::Story(story_id), content);
2026-04-29 15:49:50 +00:00
// Primary: CRDT ops.
let stage = normalise_stage_str(stage);
let Some(typed_stage) = crate::pipeline_state::Stage::from_dir(stage) else {
crate::slog!(
"[db] write_item_with_content: unknown stage '{stage}' for {story_id}; skipping CRDT write"
);
return;
2026-04-29 15:49:50 +00:00
};
let merged_at_ts = matches!(typed_stage, crate::pipeline_state::Stage::Done { .. })
.then(|| chrono::Utc::now().timestamp() as f64);
2026-04-29 15:49:50 +00:00
crate::crdt_state::write_item(
story_id,
&typed_stage,
2026-04-30 22:23:21 +00:00
meta.name.as_deref(),
meta.agent.as_deref(),
depends_on_json.as_deref(),
2026-04-29 15:49:50 +00:00
merged_at_ts,
);
// Shadow: pipeline_items table (only when DB is initialised).
if let Some(db) = PIPELINE_DB.get() {
let msg = PipelineWriteMsg {
story_id: story_id.to_string(),
stage: stage.to_string(),
2026-04-30 22:23:21 +00:00
name: meta.name,
agent: meta.agent,
2026-05-14 11:01:06 +00:00
retry_count: None,
2026-04-30 22:23:21 +00:00
depends_on: depends_on_json,
2026-04-29 15:49:50 +00:00
content: Some(content.to_string()),
};
let _ = db.tx.send(msg);
}
}
/// Update only the stage of an existing item (used by move operations).
///
/// Reads current content from the in-memory store, updates the CRDT stage,
/// and persists the change. Optionally modifies the content (e.g. to clear
/// front-matter fields).
pub fn move_item_stage(
story_id: &str,
new_stage: &str,
content_transform: Option<&dyn Fn(&str) -> String>,
) {
2026-05-13 11:22:57 +00:00
let current_content = read_content(ContentKey::Story(story_id));
2026-04-29 15:49:50 +00:00
let content = match (&current_content, content_transform) {
(Some(c), Some(transform)) => {
let new_content = transform(c);
2026-05-13 11:22:57 +00:00
write_content(ContentKey::Story(story_id), &new_content);
2026-04-29 15:49:50 +00:00
Some(new_content)
}
(Some(c), None) => Some(c.clone()),
_ => None,
};
2026-05-13 06:05:01 +00:00
// Story 929: metadata (name/agent/depends_on) is owned by the CRDT typed
// registers — no need to re-derive it from the content body's YAML front
// matter on every stage transition. Pass `None` for those fields so
// write_item leaves the existing registers untouched.
let new_stage = normalise_stage_str(new_stage);
let Some(typed_stage) = crate::pipeline_state::Stage::from_dir(new_stage) else {
crate::slog!(
"[db] move_item_stage: unknown stage '{new_stage}' for {story_id}; skipping CRDT write"
);
return;
2026-04-29 15:49:50 +00:00
};
let merged_at_ts = matches!(typed_stage, crate::pipeline_state::Stage::Done { .. })
.then(|| chrono::Utc::now().timestamp() as f64);
2026-05-14 11:01:06 +00:00
crate::crdt_state::write_item(story_id, &typed_stage, None, None, None, merged_at_ts);
2026-04-29 15:49:50 +00:00
// Bug 780: stage transitions reset retry_count to 0. retry_count tracks
// attempts at THIS stage's work (coding, merging, qa); a fresh attempt at
// a new stage is conceptually distinct from prior attempts at a different
// stage. `blocked` is preserved — that's a human-set signal that survives
// transitions.
crate::crdt_state::set_retry_count(story_id, 0);
// Shadow table — read current metadata from the CRDT so the SQLite
// mirror stays in sync. Always reset retry_count to 0 on stage transition.
2026-04-29 15:49:50 +00:00
if let Some(db) = PIPELINE_DB.get() {
let view = crate::crdt_state::read_item(story_id);
2026-05-13 07:54:50 +00:00
let name = view.as_ref().map(|v| v.name().to_string());
2026-05-13 11:58:50 +00:00
let agent = view.as_ref().and_then(|v| v.agent().map(|a| a.to_string()));
let depends_on = view
.as_ref()
.map(|v| v.depends_on())
.filter(|d| !d.is_empty())
.and_then(|d| serde_json::to_string(d).ok());
2026-04-29 15:49:50 +00:00
let msg = PipelineWriteMsg {
story_id: story_id.to_string(),
stage: new_stage.to_string(),
name,
agent,
retry_count: Some(0),
2026-04-29 15:49:50 +00:00
depends_on,
content,
};
let _ = db.tx.send(msg);
}
}
/// Mirror the CRDT's current agent register into `pipeline_items.agent`.
///
/// Meant to run right after [`crate::crdt_state::set_agent`] has updated the
/// register. The message carries the whole live CRDT view — stage, name,
/// depends_on and retry_count — not just the agent, so the shadow upsert never
/// overwrites those columns with stale values. `content` is sent as `None`.
///
/// Does nothing when the shadow DB is not initialised or the story is not in
/// the CRDT.
pub fn sync_item_agent(story_id: &str) {
    let Some(db) = PIPELINE_DB.get() else { return };
    let Some(view) = crate::crdt_state::read_item(story_id) else { return };

    // An empty dependency list is mirrored as NULL rather than "[]".
    let deps = view.depends_on();
    let depends_on = (!deps.is_empty())
        .then(|| serde_json::to_string(deps).ok())
        .flatten();

    let _ = db.tx.send(PipelineWriteMsg {
        story_id: story_id.to_string(),
        stage: view.stage().dir_name().to_string(),
        name: Some(view.name().to_string()),
        agent: view.agent().map(|a| a.as_str().to_string()),
        retry_count: Some(i64::from(view.retry_count())),
        depends_on,
        content: None,
    });
}
2026-04-29 15:49:50 +00:00
/// Delete a story from the shadow table (fire-and-forget).
pub fn delete_item(story_id: &str) {
2026-05-13 11:22:57 +00:00
delete_content(ContentKey::Story(story_id));
2026-04-29 15:49:50 +00:00
if let Some(db) = PIPELINE_DB.get() {
// Reuse the channel with a special "deleted" stage marker.
// The background task will handle it.
// Actually, we send a delete message by abusing the write — we'll
// just remove it by setting stage to "deleted".
let msg = PipelineWriteMsg {
story_id: story_id.to_string(),
stage: "deleted".to_string(),
name: None,
agent: None,
retry_count: None,
depends_on: None,
content: None,
};
let _ = db.tx.send(msg);
}
}
/// Remove a story from the content store and from `pipeline_items`, waiting
/// for the SQLite delete to complete.
///
/// When the shared pool is available this runs `DELETE FROM pipeline_items`
/// directly and awaits it, so the row is gone once the future resolves. A
/// failed delete is logged as a warning, not returned. Without a shared pool it
/// falls back to the same fire-and-forget `"deleted"` marker that
/// [`delete_item`] sends, if the shadow DB channel exists. Prefer this over
/// [`delete_item`] in async code where the deletion must be durable before
/// continuing (e.g. story deletion, startup migration).
///
/// NOTE(review): an upsert for the same id that is still queued on the
/// background channel could presumably land after the direct DELETE — confirm
/// callers cannot race with such writes.
pub async fn delete_item_sync(story_id: &str) {
    delete_content(ContentKey::Story(story_id));

    match super::shadow_write::get_shared_pool() {
        Some(pool) => {
            let outcome = sqlx::query("DELETE FROM pipeline_items WHERE id = ?1")
                .bind(story_id)
                .execute(pool)
                .await;
            if let Err(e) = outcome {
                crate::slog_warn!(
                    "[db] Synchronous delete from pipeline_items failed for '{}': {e}",
                    story_id
                );
            }
        }
        None => {
            let Some(db) = PIPELINE_DB.get() else {
                return;
            };
            let delete_marker = PipelineWriteMsg {
                story_id: story_id.to_string(),
                stage: "deleted".to_string(),
                name: None,
                agent: None,
                retry_count: None,
                depends_on: None,
                content: None,
            };
            let _ = db.tx.send(delete_marker);
        }
    }
}
/// Sync the shadow table's `name` column after a CRDT name-register write.
///
/// Reads the current item from the CRDT (which already holds the new name after
/// `apply_and_persist`) and sends a `PipelineWriteMsg` so the SQLite mirror
/// stays in sync. All other columns (stage, agent, retry_count, depends_on)
/// are taken from the live CRDT view; an empty `depends_on` list is sent as
/// `None`. `content` is left as `None` so the UPSERT's `COALESCE` keeps the
/// existing value.
///
/// No-ops if the DB is not initialised or the item is not in the CRDT.
pub fn sync_item_name(story_id: &str) {
    let Some(db) = PIPELINE_DB.get() else {
        return;
    };
    let Some(view) = crate::crdt_state::read_item(story_id) else {
        return;
    };
    let depends_on = {
        let d = view.depends_on();
        if d.is_empty() {
            None
        } else {
            serde_json::to_string(d).ok()
        }
    };
    let msg = PipelineWriteMsg {
        story_id: story_id.to_string(),
        stage: view.stage().dir_name().to_string(),
        name: Some(view.name().to_string()),
        agent: view.agent().map(|a| a.to_string()),
        // Lossless widening (no `as` cast), matching `sync_item_agent`.
        retry_count: Some(i64::from(view.retry_count())),
        depends_on,
        content: None,
    };
    let _ = db.tx.send(msg);
}
/// Sync the `depends_on` field of a pipeline item from the CRDT to the shadow table.
///
/// Called after [`crate::crdt_state::set_depends_on`] updates the CRDT register so
/// that the SQLite shadow table stays in lock-step. Reads the full current view
/// from the CRDT (stage, name, agent, retry_count, depends_on) and sends a
/// [`PipelineWriteMsg`] over [`PIPELINE_DB`]`.tx`; an empty `depends_on` list
/// is sent as `None`, and `content` is left as `None`. The message shape
/// mirrors the shadow-write step of [`move_item_stage`] and [`sync_item_name`].
///
/// No-op when the shadow DB is not initialised, or when `read_item` returns
/// `None` for `story_id` (not in the CRDT — presumably also the case when the
/// CRDT is uninitialised).
pub fn sync_item_depends_on(story_id: &str) {
    let Some(db) = PIPELINE_DB.get() else {
        return;
    };
    let Some(view) = crate::crdt_state::read_item(story_id) else {
        return;
    };
    let depends_on = {
        let d = view.depends_on();
        if d.is_empty() {
            None
        } else {
            serde_json::to_string(d).ok()
        }
    };
    let msg = PipelineWriteMsg {
        story_id: story_id.to_string(),
        stage: view.stage().dir_name().to_string(),
        name: Some(view.name().to_string()),
        agent: view.agent().map(|a| a.to_string()),
        // Lossless widening (no `as` cast), matching `sync_item_agent`.
        retry_count: Some(i64::from(view.retry_count())),
        depends_on,
        content: None,
    };
    let _ = db.tx.send(msg);
}
/// Get the next available item number by scanning the CRDT state, the
/// in-memory content store, AND the tombstone set for the highest existing
/// number.
///
/// Tombstoned IDs are excluded from `read_all_typed` (their CRDT entry is
/// `is_deleted`) and from `all_content_ids` (their content row is cleared by
/// `evict_item`). Without consulting the tombstone set, the allocator can
/// hand out a tombstoned numeric ID; `crdt_state::write_item` would then
/// silently reject the new entry while the content store and SQLite shadow
/// happily accept it, producing a split-brain half-write (bug 1001).
2026-04-29 15:49:50 +00:00
pub fn next_item_number() -> u32 {
let mut max_num: u32 = 0;
let parse_leading_digits = |s: &str| -> Option<u32> {
let num_str: String = s.chars().take_while(|c| c.is_ascii_digit()).collect();
num_str.parse::<u32>().ok()
};
2026-04-29 15:49:50 +00:00
// Scan CRDT items via typed projection.
for item in crate::pipeline_state::read_all_typed() {
if let Some(n) = parse_leading_digits(&item.story_id.0)
2026-04-29 15:49:50 +00:00
&& n > max_num
{
max_num = n;
}
}
// Also scan the content store (might have items not yet in CRDT).
for id in all_content_ids() {
if let Some(n) = parse_leading_digits(&id)
&& n > max_num
{
max_num = n;
}
}
// Also scan tombstones — a tombstoned ID still poisons that slot because
// crdt_state::write_item rejects writes for tombstoned IDs. Without this
// pass, the next allocated ID can collide with a tombstone and produce
// a half-write (bug 1001).
for id in crate::crdt_state::tombstoned_ids() {
if let Some(n) = parse_leading_digits(&id)
2026-04-29 15:49:50 +00:00
&& n > max_num
{
max_num = n;
}
}
max_num + 1
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::shadow_write;

    /// `shadow_write::init` spawns its background task on the calling runtime.
    /// Under `#[tokio::test]` that runtime is per-test and drops when the test
    /// ends, killing the task. This OnceLock holds a multi-thread runtime that
    /// persists for the lifetime of the test binary so the write loop stays alive
    /// across all tests that share `PIPELINE_DB`.
    static SHADOW_RT: std::sync::OnceLock<tokio::runtime::Runtime> = std::sync::OnceLock::new();

    /// Initialise the shared shadow DB once per test binary, on `SHADOW_RT`.
    ///
    /// NOTE(review): the `INIT`/`INNER` guards are check-then-set, not atomic,
    /// so two tests calling this concurrently for the first time could both
    /// reach `shadow_write::init`. Harmless while only one test uses it.
    async fn ensure_shadow_db() {
        static INIT: std::sync::OnceLock<()> = std::sync::OnceLock::new();
        if INIT.get().is_some() {
            return;
        }
        let rt = SHADOW_RT.get_or_init(|| {
            tokio::runtime::Builder::new_multi_thread()
                .worker_threads(1)
                .enable_all()
                .build()
                .expect("shadow rt")
        });
        rt.spawn(async {
            static INNER: std::sync::OnceLock<()> = std::sync::OnceLock::new();
            if INNER.get().is_some() {
                return;
            }
            let tmp = tempfile::tempdir().expect("tmp");
            let db_path = tmp.path().join("pipeline.db");
            // Leak the temp dir on purpose so the DB file outlives this task.
            std::mem::forget(tmp);
            shadow_write::init(&db_path).await.expect("shadow init");
            let _ = INNER.set(());
        })
        .await
        .expect("shadow init task");
        let _ = INIT.set(());
    }

    /// Delay between polls while waiting for the background shadow writer.
    const SHADOW_POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(10);
    /// Upper bound on how long to wait for a queued shadow write to land.
    const SHADOW_POLL_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);

    /// Regression test for story 1097: `set_depends_on` must sync the shadow
    /// table. Before the fix, the CRDT register was updated but the
    /// `pipeline_items.depends_on` column was never written.
    #[tokio::test]
    async fn set_depends_on_syncs_shadow_table() {
        crate::crdt_state::init_for_test();
        ensure_content_store();
        ensure_shadow_db().await;
        let story_id = "1097_story_depends_on_shadow_drift";
        // Insert the story so it exists in both the CRDT and the shadow table.
        write_item_with_content(
            story_id,
            "backlog",
            "---\nname: Depends On Shadow Drift\n---\n",
            ItemMeta::named("Depends On Shadow Drift"),
        );
        // Give the initial shadow write a head start; the final check polls,
        // so this delay is not load-bearing.
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        // This is the write under test: it must update the shadow table.
        let ok = crate::crdt_state::set_depends_on(story_id, &[1, 2]);
        assert!(ok, "set_depends_on must return true for an existing item");

        // The shadow write is applied asynchronously by the background task.
        // Poll until it lands (or the timeout expires) instead of relying on a
        // fixed sleep, which is flaky on a loaded machine.
        let pool = shadow_write::get_shared_pool().expect("pool must be initialised");
        let deadline = std::time::Instant::now() + SHADOW_POLL_TIMEOUT;
        let depends_on = loop {
            let row: Option<(Option<String>,)> =
                sqlx::query_as("SELECT depends_on FROM pipeline_items WHERE id = ?1")
                    .bind(story_id)
                    .fetch_optional(pool)
                    .await
                    .expect("shadow table query must succeed");
            let observed = row.and_then(|(d,)| d);
            if observed.as_deref() == Some("[1,2]") || std::time::Instant::now() >= deadline {
                break observed;
            }
            tokio::time::sleep(SHADOW_POLL_INTERVAL).await;
        };
        assert_eq!(
            depends_on.as_deref(),
            Some("[1,2]"),
            "pipeline_items.depends_on must reflect the set_depends_on call"
        );
    }
}