//! CRDT write operations for individual pipeline items. //! //! Provides typed setters for agent, QA mode, retry count, and the primary //! `write_item` function that inserts or updates pipeline items in the CRDT. use bft_json_crdt::json_crdt::{CrdtNode, JsonValue}; use bft_json_crdt::lww_crdt::LwwRegisterCrdt; use bft_json_crdt::op::ROOT_ID; use serde_json::json; use super::super::state::{apply_and_persist, emit_event, get_crdt, rebuild_index}; use super::super::types::CrdtEvent; use crate::io::story_metadata::QaMode; use crate::pipeline_state::{Stage, stage_dir_name}; /// Set the typed `depends_on` CRDT register for a pipeline item. /// /// Encodes `deps` as a compact JSON array string (e.g. `"[837]"`) and writes it /// to the item's `depends_on` register. An empty slice clears the register to an /// empty string, which means "no dependencies". /// /// Returns `true` if the item was found and the op was applied, `false` otherwise. pub fn set_depends_on(story_id: &str, deps: &[u32]) -> bool { let Some(state_mutex) = get_crdt() else { return false; }; let Ok(mut state) = state_mutex.lock() else { return false; }; let Some(&idx) = state.index.get(story_id) else { return false; }; let value = if deps.is_empty() { String::new() } else { serde_json::to_string(deps).unwrap_or_default() }; apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].depends_on.set(value)); true } /// Set the `item_type` CRDT register for a pipeline item (sub-story 933). /// /// `Some(t)` writes the canonical type string (e.g. `"story"`, `"epic"`, `"bug"`). /// `None` clears the register to an empty string, which means "use the /// id-prefix heuristic" (see `item_type_from_id`). /// /// Returns `true` if the item was found and the op was applied, `false` otherwise. 
pub fn set_item_type(
    story_id: &str,
    // NOTE(review): the generic argument of `Option` is missing in this view
    // of the file (it looks stripped) — confirm the item-type path and restore it.
    item_type: Option,
) -> bool {
    let Some(crdt_mutex) = get_crdt() else {
        return false;
    };
    let Ok(mut guard) = crdt_mutex.lock() else {
        return false;
    };
    let Some(&item_idx) = guard.index.get(story_id) else {
        return false;
    };

    // `None` is stored as the empty string.
    let type_str = match item_type {
        Some(t) => t.as_str().to_string(),
        None => String::new(),
    };
    apply_and_persist(&mut guard, |s| {
        s.crdt.doc.items[item_idx].item_type.set(type_str)
    });
    true
}

/// Write the `epic` register of a pipeline item (sub-story 933).
///
/// `Some(id)` records the parent epic using the id's string form
/// (e.g. `"9990"` for `EpicId(9990)`); `None` stores the empty string,
/// meaning the item belongs to no epic.
///
/// Returns `false` when the CRDT state is unavailable, its mutex cannot be
/// locked, or `story_id` is not in the index; returns `true` once the write
/// has been issued.
// NOTE(review): `epic_id: Option` has lost its generic argument in this view
// of the file (the docs suggest `EpicId`) — confirm the path and restore it.
pub fn set_epic(story_id: &str, epic_id: Option) -> bool {
    let Some(crdt_mutex) = get_crdt() else {
        return false;
    };
    let Ok(mut guard) = crdt_mutex.lock() else {
        return false;
    };
    let Some(&item_idx) = guard.index.get(story_id) else {
        return false;
    };

    let epic_str = match epic_id {
        Some(e) => e.to_string(),
        None => String::new(),
    };
    apply_and_persist(&mut guard, |s| {
        s.crdt.doc.items[item_idx].epic.set(epic_str)
    });
    true
}

/// Set the `resume_to` CRDT register for a pipeline item (story 945).
///
/// This sibling register stores the wire-form stage name (`"coding"`, `"qa"`,
/// etc.) that a `Stage::Frozen` or `Stage::ReviewHold` variant should resume
/// to. Empty string means "no resume target stored" (defaults to `Coding`
/// on read).
///
/// Returns `true` if the item was found and the op was applied, `false` otherwise.
pub fn set_resume_to(story_id: &str, stage: &Stage) -> bool {
    let Some(state_mutex) = get_crdt() else {
        return false;
    };
    let Ok(mut state) = state_mutex.lock() else {
        return false;
    };
    let Some(&idx) = state.index.get(story_id) else {
        return false;
    };

    // The register holds the wire-form stage name produced by `stage_dir_name`.
    let value = stage_dir_name(stage).to_string();
    apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].resume_to.set(value));
    true
}

/// Set the `name` field for a pipeline item by its story ID.
///
/// `Some(name)` writes the human-readable name into the CRDT register.
/// `None` clears the register by writing an empty string.
///
/// Returns `false` when the CRDT state is unavailable, its mutex cannot be
/// locked, or `story_id` is not in the index; returns `true` once the write
/// has been issued.
pub fn set_name(story_id: &str, name: Option<&str>) -> bool {
    let Some(state_mutex) = get_crdt() else {
        return false;
    };
    let Ok(mut state) = state_mutex.lock() else {
        return false;
    };
    let Some(&idx) = state.index.get(story_id) else {
        return false;
    };

    let value = name.unwrap_or("").to_string();
    // `value` is moved into the closure, as in the sibling setters; no clone
    // is needed because the closure runs at most once.
    apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].name.set(value));
    true
}

/// Set the `agent` field for a pipeline item by its story ID.
///
/// `Some(name)` writes the agent name into the CRDT register.
/// `None` clears the register by writing an empty string — use this
/// to unpin an agent without touching the surrounding item.
///
/// This is the typed setter counterpart to [`write_item`]'s `agent` parameter.
/// Callers that only need to update the agent (e.g. the `update_story` MCP tool
/// and the Matrix `!assign` command) should prefer this function over
/// passing `agent` through the full [`write_item`] call, which requires all
/// other fields to be known.
///
/// Returns `true` if the item was found and the write was performed.
pub fn set_agent(story_id: &str, agent: Option<&str>) -> bool {
    let Some(state_mutex) = get_crdt() else {
        return false;
    };
    let Ok(mut state) = state_mutex.lock() else {
        return false;
    };
    let Some(&idx) = state.index.get(story_id) else {
        return false;
    };

    let value = agent.unwrap_or("").to_string();
    // `value` is moved into the closure, as in the sibling setters; no clone
    // is needed because the closure runs at most once.
    apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].agent.set(value));
    true
}

/// Set the typed `qa_mode` CRDT register for a pipeline item.
///
/// Passing `Some(mode)` writes the mode string (e.g. `"server"`, `"agent"`,
/// `"human"`) to the item's `qa_mode` register.
/// Passing `None` clears the register to an empty string, which means
/// "use the project default" (same as if the field was never set).
///
/// Returns `false` when the CRDT state is unavailable, its mutex cannot be
/// locked, or `story_id` is not in the index; returns `true` once the write
/// has been issued.
pub fn set_qa_mode(story_id: &str, mode: Option<QaMode>) -> bool {
    let Some(state_mutex) = get_crdt() else {
        return false;
    };
    let Ok(mut state) = state_mutex.lock() else {
        return false;
    };
    let Some(&idx) = state.index.get(story_id) else {
        return false;
    };

    let value = mode.map(|m| m.as_str().to_string()).unwrap_or_default();
    apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].qa_mode.set(value));
    true
}

/// Write a pipeline item state through CRDT operations.
///
/// If the item exists, updates its registers. If not, inserts a new item
/// into the list. All ops are signed and persisted to SQLite.
///
/// When the stage changes (or a new item is created), a [`CrdtEvent`] is
/// broadcast so subscribers can react to the transition.
///
/// `stage` is the typed pipeline state; it is serialised to the canonical
/// clean wire form (story 934) via [`stage_dir_name`] at the CRDT boundary.
// One parameter per writable register; grouping them would change the public signature.
#[allow(clippy::too_many_arguments)]
pub fn write_item(
    story_id: &str,
    stage: &Stage,
    name: Option<&str>,
    agent: Option<&str>,
    retry_count: Option<i64>,
    depends_on: Option<&str>,
    claimed_by: Option<&str>,
    claimed_at: Option<f64>,
    merged_at: Option<f64>,
) {
    let stage_str = stage_dir_name(stage);

    let Some(state_mutex) = get_crdt() else {
        return;
    };
    let Ok(mut state) = state_mutex.lock() else {
        return;
    };

    // Reject any write (insert or update) for a tombstoned story_id.
    // This prevents a concurrent or late-arriving write from resurrecting
    // a story that was permanently deleted via evict_item.
    if state.tombstones.contains(story_id) {
        return;
    }

    if let Some(&idx) = state.index.get(story_id) {
        // Capture the old stage before updating so we can detect transitions.
        let old_stage = match state.crdt.doc.items[idx].stage.view() {
            JsonValue::String(s) => Some(s),
            _ => None,
        };

        // Update existing item registers. The stage is always written; every
        // other register is only touched when the caller supplied a value.
        apply_and_persist(&mut state, |s| {
            s.crdt.doc.items[idx].stage.set(stage_str.to_string())
        });
        if let Some(n) = name {
            apply_and_persist(&mut state, |s| {
                s.crdt.doc.items[idx].name.set(n.to_string())
            });
        }
        if let Some(a) = agent {
            apply_and_persist(&mut state, |s| {
                s.crdt.doc.items[idx].agent.set(a.to_string())
            });
        }
        if let Some(rc) = retry_count {
            apply_and_persist(&mut state, |s| {
                s.crdt.doc.items[idx].retry_count.set(rc as f64)
            });
        }
        if let Some(d) = depends_on {
            apply_and_persist(&mut state, |s| {
                s.crdt.doc.items[idx].depends_on.set(d.to_string())
            });
        }
        if let Some(cb) = claimed_by {
            apply_and_persist(&mut state, |s| {
                s.crdt.doc.items[idx].claimed_by.set(cb.to_string())
            });
        }
        if let Some(ca) = claimed_at {
            apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].claimed_at.set(ca));
        }
        if let Some(ma) = merged_at {
            apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].merged_at.set(ma));
        }

        // Broadcast a CrdtEvent if the stage actually changed.
        let stage_changed = old_stage.as_deref() != Some(stage_str);
        if stage_changed {
            // Read the current name from the CRDT document for the event;
            // an empty register is reported as "no name".
            let current_name = match state.crdt.doc.items[idx].name.view() {
                JsonValue::String(s) if !s.is_empty() => Some(s),
                _ => None,
            };
            emit_event(CrdtEvent {
                story_id: story_id.to_string(),
                from_stage: old_stage,
                to_stage: stage_str.to_string(),
                name: current_name,
            });
        }
    } else {
        // Insert new item. Registers the caller did not supply start out at
        // their "unset" value (empty string / zero).
        let item_json: JsonValue = json!({
            "story_id": story_id,
            "stage": stage_str,
            "name": name.unwrap_or(""),
            "agent": agent.unwrap_or(""),
            "retry_count": retry_count.unwrap_or(0) as f64,
            "depends_on": depends_on.unwrap_or(""),
            "claimed_by": claimed_by.unwrap_or(""),
            "claimed_at": claimed_at.unwrap_or(0.0),
            "merged_at": merged_at.unwrap_or(0.0),
            "qa_mode": "",
            "item_type": "",
            "epic": "",
            "resume_to": "",
        })
        .into();

        apply_and_persist(&mut state, |s| s.crdt.doc.items.insert(ROOT_ID, item_json));

        // Rebuild index after insertion (indices may shift).
        state.index = rebuild_index(&state.crdt);

        // Advance the inner registers of the newly-created item to the Lamport
        // floor so their first local ops don't re-emit low sequence numbers.
        let floor = state.lamport_floor;
        if floor > 0
            && let Some(&idx) = state.index.get(story_id)
        {
            let item = &mut state.crdt.doc.items[idx];
            item.story_id.advance_seq(floor);
            item.stage.advance_seq(floor);
            item.name.advance_seq(floor);
            item.agent.advance_seq(floor);
            item.retry_count.advance_seq(floor);
            item.depends_on.advance_seq(floor);
            item.claimed_by.advance_seq(floor);
            item.claimed_at.advance_seq(floor);
            item.merged_at.advance_seq(floor);
            item.qa_mode.advance_seq(floor);
            item.item_type.advance_seq(floor);
            item.epic.advance_seq(floor);
            item.resume_to.advance_seq(floor);
        }

        // Broadcast a CrdtEvent for the new item. An empty name is reported
        // as `None`, matching how the update path reports an empty register.
        emit_event(CrdtEvent {
            story_id: story_id.to_string(),
            from_stage: None,
            to_stage: stage_str.to_string(),
            name: name.filter(|n| !n.is_empty()).map(String::from),
        });
    }
}

/// Test-only convenience that parses a wire-form stage string and forwards
/// to [`write_item`]. Existing tests seed CRDT items with legacy directory
/// strings (`"2_current"`, `"4_merge"`, etc.) — this shim keeps that idiom
/// working without forcing every test to construct typed `Stage` payloads.
///
/// Stages are normalised through [`Stage::from_dir`]: unknown strings cause
/// the write to be skipped (with a log line).
#[cfg(test)]
#[allow(clippy::too_many_arguments)]
pub fn write_item_str(
    story_id: &str,
    stage: &str,
    name: Option<&str>,
    agent: Option<&str>,
    retry_count: Option<i64>,
    depends_on: Option<&str>,
    claimed_by: Option<&str>,
    claimed_at: Option<f64>,
    merged_at: Option<f64>,
) {
    // Normalise pre-934 directory-style strings to clean wire form so
    // existing test fixtures keep working after stage 6 dropped the legacy
    // aliases from `Stage::from_dir`. See `db::ops::normalise_stage_str`
    // for the user-facing equivalent on the db boundary.
    let normalised = match stage {
        "0_upcoming" => "upcoming",
        "1_backlog" => "backlog",
        "2_current" => "coding",
        "2_blocked" => "blocked",
        "3_qa" => "qa",
        "4_merge" => "merge",
        "4_merge_failure" => "merge_failure",
        "5_done" => "done",
        "6_archived" => "archived",
        other => other,
    };
    let Some(typed) = Stage::from_dir(normalised) else {
        crate::slog!("[crdt_state] write_item_str: unknown stage '{stage}' for {story_id}");
        return;
    };
    write_item(
        story_id,
        &typed,
        name,
        agent,
        retry_count,
        depends_on,
        claimed_by,
        claimed_at,
        merged_at,
    );
}

/// Set `retry_count` to an explicit value for a pipeline item.
///
/// Pure metadata operation — the item's stage is not changed.
/// Call `set_retry_count(story_id, 0)` to reset the counter after a
/// stage transition or an explicit unblock.
pub fn set_retry_count(story_id: &str, count: i64) { let Some(state_mutex) = get_crdt() else { return; }; let Ok(mut state) = state_mutex.lock() else { return; }; if let Some(&idx) = state.index.get(story_id) { apply_and_persist(&mut state, |s| { s.crdt.doc.items[idx].retry_count.set(count as f64) }); } } /// Increment `retry_count` by 1 and return the new value. /// /// Pure metadata operation — the item's stage is not changed. /// Returns 0 if the item is not found in the CRDT (no-op in that case). /// Use the returned value to decide whether the story should be blocked. pub fn bump_retry_count(story_id: &str) -> i64 { let Some(state_mutex) = get_crdt() else { return 0; }; let Ok(mut state) = state_mutex.lock() else { return 0; }; let Some(&idx) = state.index.get(story_id) else { return 0; }; let current = match state.crdt.doc.items[idx].retry_count.view() { JsonValue::Number(n) => n as i64, _ => 0, }; let new_count = current + 1; apply_and_persist(&mut state, |s| { s.crdt.doc.items[idx].retry_count.set(new_count as f64) }); new_count }