2026-04-29 15:49:50 +00:00
|
|
|
//! Write operations for the pipeline — content, stage transitions, and deletions.
//!
//! Each function updates three layers atomically in order: the in-memory
//! content store, the CRDT (source of truth for metadata), and the SQLite
//! shadow table (via the background channel).
|
|
|
|
|
use super::content_store::{
|
|
|
|
|
all_content_ids, delete_content, ensure_content_store, read_content, write_content,
|
|
|
|
|
};
|
|
|
|
|
use super::shadow_write::{PIPELINE_DB, PipelineWriteMsg};
|
|
|
|
|
|
2026-04-30 22:23:21 +00:00
|
|
|
/// Explicit, typed metadata accompanying a pipeline item write.
///
/// Story 929: metadata is supplied by the caller rather than being parsed
/// out of YAML. Every field is an `Option`: `None` means "leave the existing
/// value unchanged" on update and "use the default" on insert.
#[derive(Debug, Clone, Default)]
pub struct ItemMeta {
    /// Item name to set, if any.
    pub name: Option<String>,
    /// Agent value to set, if any.
    pub agent: Option<String>,
    /// Retry counter to set, if any.
    pub retry_count: Option<i64>,
    /// Blocked flag to set, if any.
    pub blocked: Option<bool>,
    /// Dependencies as a list of `u32` values (presumably item numbers —
    /// confirm against callers).
    pub depends_on: Option<Vec<u32>>,
}

impl ItemMeta {
    /// Test-only shorthand for metadata that sets nothing but `name`; every
    /// other field keeps its `Default` value (`None`).
    #[cfg(test)]
    pub fn named(name: impl Into<String>) -> Self {
        let name = Some(name.into());
        Self {
            name,
            ..Default::default()
        }
    }
}
|
|
|
|
|
|
2026-05-12 22:31:59 +01:00
|
|
|
/// Translate a caller-supplied stage string into the clean stage vocabulary.
///
/// Pre-934 directory-style names (`"2_current"`, `"4_merge"`, ...) are mapped
/// to their clean equivalents; anything else — including strings that are
/// already in clean form — is returned unchanged. The result is meant to be
/// handed to `Stage::from_dir`, which only accepts the clean form, so the
/// public db API stays tolerant of legacy callers while the internal type
/// stays strict.
fn normalise_stage_str(stage: &str) -> &str {
    // Legacy directory-style name -> clean name.
    //
    // `7_frozen` is deliberately absent: the variant was removed in story 934
    // stage 4 and has no clean equivalent. It therefore passes through
    // unmapped, `Stage::from_dir` returns `None` for it, and the write is
    // logged and skipped — frozen items should be seeded via the `frozen`
    // flag instead.
    const LEGACY_TO_CLEAN: [(&str, &str); 9] = [
        ("0_upcoming", "upcoming"),
        ("1_backlog", "backlog"),
        ("2_current", "coding"),
        ("2_blocked", "blocked"),
        ("3_qa", "qa"),
        ("4_merge", "merge"),
        ("4_merge_failure", "merge_failure"),
        ("5_done", "done"),
        ("6_archived", "archived"),
    ];

    LEGACY_TO_CLEAN
        .iter()
        .find(|(legacy, _)| *legacy == stage)
        .map_or(stage, |&(_, clean)| clean)
}
|
|
|
|
|
|
2026-04-29 15:49:50 +00:00
|
|
|
/// Write a pipeline item from in-memory content (no filesystem access).
|
|
|
|
|
///
|
|
|
|
|
/// This is the primary write path for the DB-backed pipeline. It updates
|
|
|
|
|
/// the CRDT, the in-memory content store, and the SQLite shadow table.
|
2026-04-30 22:23:21 +00:00
|
|
|
///
|
|
|
|
|
/// The metadata in `meta` is authoritative: this function does NOT parse
|
|
|
|
|
/// `content` to extract front-matter fields. Callers must pass typed
|
|
|
|
|
/// metadata explicitly via `ItemMeta`.
|
|
|
|
|
pub fn write_item_with_content(story_id: &str, stage: &str, content: &str, meta: ItemMeta) {
|
|
|
|
|
let depends_on_json = meta
|
|
|
|
|
.depends_on
|
|
|
|
|
.as_ref()
|
|
|
|
|
.and_then(|d| serde_json::to_string(d).ok());
|
2026-04-29 15:49:50 +00:00
|
|
|
|
|
|
|
|
// Update in-memory content store.
|
|
|
|
|
ensure_content_store();
|
|
|
|
|
write_content(story_id, content);
|
|
|
|
|
|
|
|
|
|
// Primary: CRDT ops.
|
2026-05-12 22:31:59 +01:00
|
|
|
let stage = normalise_stage_str(stage);
|
|
|
|
|
let Some(typed_stage) = crate::pipeline_state::Stage::from_dir(stage) else {
|
|
|
|
|
crate::slog!(
|
|
|
|
|
"[db] write_item_with_content: unknown stage '{stage}' for {story_id}; skipping CRDT write"
|
|
|
|
|
);
|
|
|
|
|
return;
|
2026-04-29 15:49:50 +00:00
|
|
|
};
|
2026-05-12 22:31:59 +01:00
|
|
|
let merged_at_ts = matches!(typed_stage, crate::pipeline_state::Stage::Done { .. })
|
|
|
|
|
.then(|| chrono::Utc::now().timestamp() as f64);
|
2026-04-29 15:49:50 +00:00
|
|
|
crate::crdt_state::write_item(
|
|
|
|
|
story_id,
|
2026-05-12 22:31:59 +01:00
|
|
|
&typed_stage,
|
2026-04-30 22:23:21 +00:00
|
|
|
meta.name.as_deref(),
|
|
|
|
|
meta.agent.as_deref(),
|
|
|
|
|
meta.retry_count,
|
|
|
|
|
meta.blocked,
|
|
|
|
|
depends_on_json.as_deref(),
|
2026-04-29 15:49:50 +00:00
|
|
|
None,
|
|
|
|
|
None,
|
|
|
|
|
merged_at_ts,
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
// Shadow: pipeline_items table (only when DB is initialised).
|
|
|
|
|
if let Some(db) = PIPELINE_DB.get() {
|
|
|
|
|
let msg = PipelineWriteMsg {
|
|
|
|
|
story_id: story_id.to_string(),
|
|
|
|
|
stage: stage.to_string(),
|
2026-04-30 22:23:21 +00:00
|
|
|
name: meta.name,
|
|
|
|
|
agent: meta.agent,
|
|
|
|
|
retry_count: meta.retry_count,
|
|
|
|
|
blocked: meta.blocked,
|
|
|
|
|
depends_on: depends_on_json,
|
2026-04-29 15:49:50 +00:00
|
|
|
content: Some(content.to_string()),
|
|
|
|
|
};
|
|
|
|
|
let _ = db.tx.send(msg);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Update only the stage of an existing item (used by move operations).
|
|
|
|
|
///
|
|
|
|
|
/// Reads current content from the in-memory store, updates the CRDT stage,
|
|
|
|
|
/// and persists the change. Optionally modifies the content (e.g. to clear
|
|
|
|
|
/// front-matter fields).
|
|
|
|
|
pub fn move_item_stage(
|
|
|
|
|
story_id: &str,
|
|
|
|
|
new_stage: &str,
|
|
|
|
|
content_transform: Option<&dyn Fn(&str) -> String>,
|
|
|
|
|
) {
|
|
|
|
|
let current_content = read_content(story_id);
|
|
|
|
|
|
|
|
|
|
let content = match (¤t_content, content_transform) {
|
|
|
|
|
(Some(c), Some(transform)) => {
|
|
|
|
|
let new_content = transform(c);
|
|
|
|
|
write_content(story_id, &new_content);
|
|
|
|
|
Some(new_content)
|
|
|
|
|
}
|
|
|
|
|
(Some(c), None) => Some(c.clone()),
|
|
|
|
|
_ => None,
|
|
|
|
|
};
|
|
|
|
|
|
2026-05-12 20:15:08 +01:00
|
|
|
// Story 929: metadata (name/agent/blocked/depends_on) is owned by the
|
|
|
|
|
// CRDT typed registers — no need to re-derive it from the content body's
|
|
|
|
|
// YAML front matter on every stage transition. Pass `None` for those
|
|
|
|
|
// fields so write_item leaves the existing registers untouched.
|
2026-05-12 22:31:59 +01:00
|
|
|
let new_stage = normalise_stage_str(new_stage);
|
|
|
|
|
let Some(typed_stage) = crate::pipeline_state::Stage::from_dir(new_stage) else {
|
|
|
|
|
crate::slog!(
|
|
|
|
|
"[db] move_item_stage: unknown stage '{new_stage}' for {story_id}; skipping CRDT write"
|
|
|
|
|
);
|
|
|
|
|
return;
|
2026-04-29 15:49:50 +00:00
|
|
|
};
|
2026-05-12 22:31:59 +01:00
|
|
|
let merged_at_ts = matches!(typed_stage, crate::pipeline_state::Stage::Done { .. })
|
|
|
|
|
.then(|| chrono::Utc::now().timestamp() as f64);
|
2026-04-29 15:49:50 +00:00
|
|
|
crate::crdt_state::write_item(
|
|
|
|
|
story_id,
|
2026-05-12 22:31:59 +01:00
|
|
|
&typed_stage,
|
2026-04-29 15:49:50 +00:00
|
|
|
None,
|
2026-05-12 20:15:08 +01:00
|
|
|
None,
|
|
|
|
|
None,
|
|
|
|
|
None,
|
|
|
|
|
None,
|
2026-04-29 15:49:50 +00:00
|
|
|
None,
|
|
|
|
|
None,
|
|
|
|
|
merged_at_ts,
|
|
|
|
|
);
|
|
|
|
|
// Bug 780: stage transitions reset retry_count to 0. retry_count tracks
|
|
|
|
|
// attempts at THIS stage's work (coding, merging, qa); a fresh attempt at
|
|
|
|
|
// a new stage is conceptually distinct from prior attempts at a different
|
|
|
|
|
// stage. `blocked` is preserved — that's a human-set signal that survives
|
|
|
|
|
// transitions.
|
|
|
|
|
crate::crdt_state::set_retry_count(story_id, 0);
|
|
|
|
|
|
2026-05-12 20:15:08 +01:00
|
|
|
// Shadow table — read current metadata from the CRDT so the SQLite
|
|
|
|
|
// mirror stays in sync. Always reset retry_count to 0 on stage transition.
|
2026-04-29 15:49:50 +00:00
|
|
|
if let Some(db) = PIPELINE_DB.get() {
|
2026-05-12 20:15:08 +01:00
|
|
|
let view = crate::crdt_state::read_item(story_id);
|
|
|
|
|
let name = view.as_ref().and_then(|v| v.name().map(str::to_string));
|
|
|
|
|
let agent = view.as_ref().and_then(|v| v.agent().map(str::to_string));
|
|
|
|
|
let blocked = view.as_ref().map(|v| v.blocked());
|
|
|
|
|
let depends_on = view
|
|
|
|
|
.as_ref()
|
|
|
|
|
.map(|v| v.depends_on())
|
|
|
|
|
.filter(|d| !d.is_empty())
|
|
|
|
|
.and_then(|d| serde_json::to_string(d).ok());
|
2026-04-29 15:49:50 +00:00
|
|
|
let msg = PipelineWriteMsg {
|
|
|
|
|
story_id: story_id.to_string(),
|
|
|
|
|
stage: new_stage.to_string(),
|
|
|
|
|
name,
|
|
|
|
|
agent,
|
2026-05-12 20:15:08 +01:00
|
|
|
retry_count: Some(0),
|
2026-04-29 15:49:50 +00:00
|
|
|
blocked,
|
|
|
|
|
depends_on,
|
|
|
|
|
content,
|
|
|
|
|
};
|
|
|
|
|
let _ = db.tx.send(msg);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Delete a story from the shadow table (fire-and-forget).
|
|
|
|
|
pub fn delete_item(story_id: &str) {
|
|
|
|
|
delete_content(story_id);
|
|
|
|
|
|
|
|
|
|
if let Some(db) = PIPELINE_DB.get() {
|
|
|
|
|
// Reuse the channel with a special "deleted" stage marker.
|
|
|
|
|
// The background task will handle it.
|
|
|
|
|
// Actually, we send a delete message by abusing the write — we'll
|
|
|
|
|
// just remove it by setting stage to "deleted".
|
|
|
|
|
let msg = PipelineWriteMsg {
|
|
|
|
|
story_id: story_id.to_string(),
|
|
|
|
|
stage: "deleted".to_string(),
|
|
|
|
|
name: None,
|
|
|
|
|
agent: None,
|
|
|
|
|
retry_count: None,
|
|
|
|
|
blocked: None,
|
|
|
|
|
depends_on: None,
|
|
|
|
|
content: None,
|
|
|
|
|
};
|
|
|
|
|
let _ = db.tx.send(msg);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Get the next available item number by scanning both the CRDT state
|
|
|
|
|
/// and the in-memory content store for the highest existing number.
|
|
|
|
|
pub fn next_item_number() -> u32 {
|
|
|
|
|
let mut max_num: u32 = 0;
|
|
|
|
|
|
|
|
|
|
// Scan CRDT items via typed projection.
|
|
|
|
|
for item in crate::pipeline_state::read_all_typed() {
|
|
|
|
|
let num_str: String = item
|
|
|
|
|
.story_id
|
|
|
|
|
.0
|
|
|
|
|
.chars()
|
|
|
|
|
.take_while(|c| c.is_ascii_digit())
|
|
|
|
|
.collect();
|
|
|
|
|
if let Ok(n) = num_str.parse::<u32>()
|
|
|
|
|
&& n > max_num
|
|
|
|
|
{
|
|
|
|
|
max_num = n;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Also scan the content store (might have items not yet in CRDT).
|
|
|
|
|
for id in all_content_ids() {
|
|
|
|
|
let num_str: String = id.chars().take_while(|c| c.is_ascii_digit()).collect();
|
|
|
|
|
if let Ok(n) = num_str.parse::<u32>()
|
|
|
|
|
&& n > max_num
|
|
|
|
|
{
|
|
|
|
|
max_num = n;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
max_num + 1
|
|
|
|
|
}
|