//! CRDT write operations for individual pipeline items. //! //! Provides typed setters for agent, QA mode, retry count, and the primary //! `write_item` function that inserts or updates pipeline items in the CRDT. use bft_json_crdt::json_crdt::{CrdtNode, JsonValue}; use bft_json_crdt::lww_crdt::LwwRegisterCrdt; use bft_json_crdt::op::ROOT_ID; use serde_json::json; use super::super::state::{apply_and_persist, emit_event, get_crdt, rebuild_index}; use super::super::types::CrdtEvent; use crate::io::story_metadata::QaMode; use crate::pipeline_state::{AgentClaim, Stage, stage_dir_name}; /// Set the typed `depends_on` CRDT register for a pipeline item. /// /// Encodes `deps` as a compact JSON array string (e.g. `"[837]"`) and writes it /// to the item's `depends_on` register. An empty slice clears the register to an /// empty string, which means "no dependencies". /// /// Returns `true` if the item was found and the op was applied, `false` otherwise. pub fn set_depends_on(story_id: &str, deps: &[u32]) -> bool { let Some(state_mutex) = get_crdt() else { return false; }; let Ok(mut state) = state_mutex.lock() else { return false; }; let Some(&idx) = state.index.get(story_id) else { return false; }; let value = if deps.is_empty() { String::new() } else { serde_json::to_string(deps).unwrap_or_default() }; apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].depends_on.set(value)); true } /// Set the `item_type` CRDT register for a pipeline item (sub-story 933). /// /// `Some(t)` writes the canonical type string (e.g. `"story"`, `"epic"`, `"bug"`). /// `None` clears the register to an empty string, which means "use the /// id-prefix heuristic" (see `item_type_from_id`). /// /// Returns `true` if the item was found and the op was applied, `false` otherwise. 
pub fn set_item_type(
    story_id: &str,
    // NOTE(review): this `Option` carries no type argument in this copy of the
    // file; the generic parameter looks to have been lost. The body only needs
    // a type exposing `as_str()` — restore the real path from version control.
    item_type: Option,
) -> bool {
    let Some(crdt) = get_crdt() else {
        return false;
    };
    let Ok(mut guard) = crdt.lock() else {
        return false;
    };
    let Some(&slot) = guard.index.get(story_id) else {
        return false;
    };

    // `None` is stored as the empty string.
    let type_str = match item_type {
        Some(t) => t.as_str().to_string(),
        None => String::new(),
    };
    apply_and_persist(&mut guard, |s| {
        s.crdt.doc.items[slot].item_type.set(type_str)
    });
    true
}

/// Write the `epic` CRDT register of a pipeline item (sub-story 933).
///
/// `Some(id)` records the parent epic using the id's `to_string()` form
/// (the numeric string, e.g. `"9990"` for `EpicId(9990)`).
/// `None` stores an empty string, i.e. no epic membership.
///
/// Returns `true` if the item was found and the op was applied, `false` otherwise.
// NOTE(review): `epic_id`'s `Option` has no type argument in this copy of the
// file (presumably an `EpicId`-like type, per the doc above) — TODO confirm
// and restore from version control.
pub fn set_epic(story_id: &str, epic_id: Option) -> bool {
    let Some(crdt) = get_crdt() else {
        return false;
    };
    let Ok(mut guard) = crdt.lock() else {
        return false;
    };
    let Some(&slot) = guard.index.get(story_id) else {
        return false;
    };

    let epic_str = match epic_id {
        Some(e) => e.to_string(),
        None => String::new(),
    };
    apply_and_persist(&mut guard, |s| s.crdt.doc.items[slot].epic.set(epic_str));
    true
}

/// Set the `resume_to` CRDT register for a pipeline item (story 945).
///
/// This sibling register stores the wire-form stage name (`"coding"`, `"qa"`,
/// etc.) that a `Stage::Frozen` or `Stage::ReviewHold` variant should resume
/// to. Empty string means "no resume target stored" (defaults to `Coding`
/// on read).
///
/// Returns `true` if the item was found and the op was applied, `false` otherwise.
pub fn set_resume_to(story_id: &str, stage: &Stage) -> bool {
    let Some(state_mutex) = get_crdt() else {
        return false;
    };
    let Ok(mut state) = state_mutex.lock() else {
        return false;
    };
    let Some(&idx) = state.index.get(story_id) else {
        return false;
    };
    // The register holds the wire-form name produced by `stage_dir_name`.
    let value = stage_dir_name(stage).to_string();
    apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].resume_to.set(value));
    true
}

/// Set the `resume_to` CRDT register to an arbitrary raw string.
///
/// Story 984: reuses `resume_to` to carry metadata for `Superseded`
/// (`superseded_by` story ID) and `Rejected` (`reason` string). These
/// stages never have a resume target, so the register is exclusively
/// available for their metadata.
///
/// `value` is stored verbatim; no validation is performed here.
///
/// Returns `true` if the item was found and the op was applied, `false`
/// otherwise (including when the CRDT state is unavailable or cannot be locked).
pub fn set_resume_to_raw(story_id: &str, value: &str) -> bool {
    let Some(state_mutex) = get_crdt() else {
        return false;
    };
    let Ok(mut state) = state_mutex.lock() else {
        return false;
    };
    let Some(&idx) = state.index.get(story_id) else {
        return false;
    };
    apply_and_persist(&mut state, |s| {
        s.crdt.doc.items[idx].resume_to.set(value.to_string())
    });
    true
}

/// Set the `name` field for a pipeline item by its story ID.
///
/// `Some(name)` writes the human-readable name into the CRDT register.
/// `None` clears the register by writing an empty string.
///
/// Returns `true` if the item was found and the write was performed, `false`
/// otherwise (including when the CRDT state is unavailable or cannot be locked).
pub fn set_name(story_id: &str, name: Option<&str>) -> bool {
    let Some(state_mutex) = get_crdt() else {
        return false;
    };
    let Ok(mut state) = state_mutex.lock() else {
        return false;
    };
    let Some(&idx) = state.index.get(story_id) else {
        return false;
    };
    let value = name.unwrap_or("").to_string();
    // `value` is not used after this call, so it is moved into the closure
    // (as the sibling setters do) instead of being cloned.
    apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].name.set(value));
    true
}

/// Set the `agent` field for a pipeline item by its story ID.
///
/// `Some(name)` writes the agent name into the CRDT register.
/// `None` clears the register by writing an empty string — use this
/// to unpin an agent without touching the surrounding item.
///
/// This is the typed setter counterpart to [`write_item`]'s `agent` parameter.
/// Callers that only need to update the agent (e.g. the `update_story` MCP tool
/// and the Matrix `!assign` command) should prefer this function over
/// passing `agent` through the full [`write_item`] call, which requires all
/// other fields to be known.
///
/// Returns `true` if the item was found and the write was performed, `false`
/// otherwise (including when the CRDT state is unavailable or cannot be locked).
// NOTE(review): `agent`'s `Option` has no type argument in this copy of the
// file; the generic parameter looks to have been lost. The body only needs a
// type exposing `as_str()` — restore the real path from version control.
pub fn set_agent(story_id: &str, agent: Option) -> bool {
    let Some(state_mutex) = get_crdt() else {
        return false;
    };
    let Ok(mut state) = state_mutex.lock() else {
        return false;
    };
    let Some(&idx) = state.index.get(story_id) else {
        return false;
    };
    let value = agent.map(|a| a.as_str().to_string()).unwrap_or_default();
    // `value` is not used after this call, so it is moved into the closure
    // (as the sibling setters do) instead of being cloned.
    apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].agent.set(value));
    true
}

/// Set the typed `qa_mode` CRDT register for a pipeline item.
///
/// Passing `Some(mode)` writes the mode string (e.g. `"server"`, `"agent"`, `"human"`)
/// to the item's `qa_mode` register and persists a signed op.
/// Passing `None` clears the register to an empty string, which means
/// "use the project default" (same as if the field was never set).
///
/// Returns `true` if the item was found and the op was applied, `false` otherwise.
pub fn set_qa_mode(story_id: &str, mode: Option<QaMode>) -> bool {
    let Some(state_mutex) = get_crdt() else {
        return false;
    };
    let Ok(mut state) = state_mutex.lock() else {
        return false;
    };
    let Some(&idx) = state.index.get(story_id) else {
        return false;
    };
    let value = mode.map(|m| m.as_str().to_string()).unwrap_or_default();
    apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].qa_mode.set(value));
    true
}

/// Set the `plan_state` CRDT register for a pipeline item (story 1010).
///
/// Encodes the PLAN.md lifecycle as a wire string (`"missing"`, `"drafted"`,
/// `"confirmed"`).
Called by the filesystem watcher when PLAN.md is created, /// modified, or removed inside a coding worktree. /// /// Returns `true` if the item was found and the op was applied, `false` otherwise. pub fn set_plan_state(story_id: &str, state: crate::pipeline_state::PlanState) -> bool { let Some(state_mutex) = get_crdt() else { return false; }; let Ok(mut crdt_state) = state_mutex.lock() else { return false; }; let Some(&idx) = crdt_state.index.get(story_id) else { return false; }; let value = state.as_str().to_string(); apply_and_persist(&mut crdt_state, |s| { s.crdt.doc.items[idx].plan_state.set(value) }); true } /// Set the `origin` CRDT register for a pipeline item (story 1088). /// /// Writes a compact JSON string describing who or what created the item, e.g. /// `{"kind":"user","id":"","ts":1716768000.0}` or /// `{"kind":"agent","id":"coder-1","ts":1716768000.0}`. /// /// Passing an empty string is treated as "no origin set" (equivalent to the /// pre-1088 state for older items). Returns `true` if the item was found and /// the op was applied, `false` otherwise. pub fn set_origin(story_id: &str, origin: &str) -> bool { let Some(state_mutex) = get_crdt() else { return false; }; let Ok(mut state) = state_mutex.lock() else { return false; }; let Some(&idx) = state.index.get(story_id) else { return false; }; apply_and_persist(&mut state, |s| { s.crdt.doc.items[idx].origin.set(origin.to_string()) }); true } /// Write a pipeline item state through CRDT operations. /// /// If the item exists, updates its registers. If not, inserts a new item /// into the list. All ops are signed and persisted to SQLite. /// /// When the stage changes (or a new item is created), a [`CrdtEvent`] is /// broadcast so subscribers can react to the transition. /// /// `stage` is the typed pipeline state; it is serialised to the canonical /// clean wire form (story 934) via [`stage_dir_name`] at the CRDT boundary. 
/// The `retries` count embedded in `Stage::Coding` / `Stage::Merge` is /// automatically written to the `retry_count` CRDT register (story 997). pub fn write_item( story_id: &str, stage: &Stage, name: Option<&str>, agent: Option<&str>, depends_on: Option<&str>, merged_at: Option, ) { let stage_str = stage_dir_name(stage); // Story 1086: persist the typed Pipeline + Status projections alongside // the stage register so subscribers/display code on remote peers can route // by them without re-deriving from the stage string. let pipeline_str = stage.pipeline().as_str(); let status_str = stage.status().as_str(); let claim: Option<&AgentClaim> = match stage { Stage::Coding { claim, .. } => claim.as_ref(), Stage::Merge { claim, .. } => claim.as_ref(), _ => None, }; // Extract retries from the Stage payload; non-Coding/Merge stages store 0. let stage_retries: f64 = match stage { Stage::Coding { retries, .. } => *retries as f64, Stage::Merge { retries, .. } => *retries as f64, _ => 0.0, }; // Extract merge_server_start from Stage::Merge; 0.0 clears the register. let merge_server_start_val: f64 = match stage { Stage::Merge { server_start_time: Some(t), .. } => *t, _ => 0.0, }; let Some(state_mutex) = get_crdt() else { return; }; let Ok(mut state) = state_mutex.lock() else { return; }; // Reject any write (insert or update) for a tombstoned story_id. // This prevents a concurrent or late-arriving write from resurrecting // a story that was permanently deleted via evict_item. // // Historically this was a silent return, which caused split-brain // half-writes when an upstream allocator (e.g. db::next_item_number) // handed out a tombstoned numeric ID (bug 1001). We log a WARN here // so the failure mode is visible; the structural fix is that callers // who allocate fresh IDs must consult `is_tombstoned` first AND // verify the write via `read_item` afterwards. 
if state.tombstones.contains(story_id) { crate::slog_warn!( "[crdt_state] write_item rejected for tombstoned story_id '{story_id}' \ (stage='{stage_str}'); caller should have skipped this ID" ); return; } if let Some(&idx) = state.index.get(story_id) { // Capture the old stage before updating so we can detect transitions. let old_stage = match state.crdt.doc.items[idx].stage.view() { JsonValue::String(s) => Some(s), _ => None, }; // Update existing item registers. apply_and_persist(&mut state, |s| { s.crdt.doc.items[idx].stage.set(stage_str.to_string()) }); // Story 1086: keep `pipeline` and `status` registers in lock-step with // the stage write so subscribers/display can read them directly. apply_and_persist(&mut state, |s| { s.crdt.doc.items[idx].pipeline.set(pipeline_str.to_string()) }); apply_and_persist(&mut state, |s| { s.crdt.doc.items[idx].status.set(status_str.to_string()) }); if let Some(n) = name { apply_and_persist(&mut state, |s| { s.crdt.doc.items[idx].name.set(n.to_string()) }); } if let Some(a) = agent { apply_and_persist(&mut state, |s| { s.crdt.doc.items[idx].agent.set(a.to_string()) }); } apply_and_persist(&mut state, |s| { s.crdt.doc.items[idx].retry_count.set(stage_retries) }); if let Some(d) = depends_on { apply_and_persist(&mut state, |s| { s.crdt.doc.items[idx].depends_on.set(d.to_string()) }); } let (claim_agent_str, claim_ts_val) = match claim { Some(c) => ( c.agent.0.as_str().to_string(), c.claimed_at.timestamp() as f64, ), None => (String::new(), 0.0), }; apply_and_persist(&mut state, |s| { s.crdt.doc.items[idx].claim_agent.set(claim_agent_str) }); apply_and_persist(&mut state, |s| { s.crdt.doc.items[idx].claim_ts.set(claim_ts_val) }); apply_and_persist(&mut state, |s| { s.crdt.doc.items[idx] .merge_server_start .set(merge_server_start_val) }); if let Some(ma) = merged_at { apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].merged_at.set(ma)); } // Broadcast a CrdtEvent if the stage actually changed. 
let stage_changed = old_stage.as_deref() != Some(stage_str); if stage_changed { // Read the current name from the CRDT document for the event. let current_name = match state.crdt.doc.items[idx].name.view() { JsonValue::String(s) if !s.is_empty() => s, _ => String::new(), }; // Storage seam: convert the old raw CRDT stage string to a typed Stage. let from_stage = old_stage.and_then(|s| Stage::from_dir(&s)); emit_event(CrdtEvent { story_id: story_id.to_string(), from_stage, to_stage: stage.clone(), name: current_name, }); } } else { // Insert new item. let (insert_claim_agent, insert_claim_ts) = match claim { Some(c) => ( c.agent.0.as_str().to_string(), c.claimed_at.timestamp() as f64, ), None => (String::new(), 0.0), }; let item_json: JsonValue = json!({ "story_id": story_id, "stage": stage_str, "name": name.unwrap_or(""), "agent": agent.unwrap_or(""), "retry_count": stage_retries, "depends_on": depends_on.unwrap_or(""), "claim_agent": insert_claim_agent, "claim_ts": insert_claim_ts, "merged_at": merged_at.unwrap_or(0.0), "qa_mode": "", "item_type": "", "epic": "", "resume_to": "", "plan_state": "", "merge_server_start": merge_server_start_val, // Story 1086: typed Pipeline + Status projections written at insert. "pipeline": pipeline_str, "status": status_str, "origin": "", }) .into(); apply_and_persist(&mut state, |s| s.crdt.doc.items.insert(ROOT_ID, item_json)); // Rebuild index after insertion (indices may shift). state.index = rebuild_index(&state.crdt); // Advance the inner registers of the newly-created item to the Lamport // floor so their first local ops don't re-emit low sequence numbers. 
let floor = state.lamport_floor; if floor > 0 && let Some(&idx) = state.index.get(story_id) { let item = &mut state.crdt.doc.items[idx]; item.story_id.advance_seq(floor); item.stage.advance_seq(floor); item.name.advance_seq(floor); item.agent.advance_seq(floor); item.retry_count.advance_seq(floor); item.depends_on.advance_seq(floor); item.claim_agent.advance_seq(floor); item.claim_ts.advance_seq(floor); item.merged_at.advance_seq(floor); item.qa_mode.advance_seq(floor); item.item_type.advance_seq(floor); item.epic.advance_seq(floor); item.resume_to.advance_seq(floor); item.plan_state.advance_seq(floor); item.merge_server_start.advance_seq(floor); // Story 1086. item.pipeline.advance_seq(floor); item.status.advance_seq(floor); item.origin.advance_seq(floor); } // Broadcast a CrdtEvent for the new item. emit_event(CrdtEvent { story_id: story_id.to_string(), from_stage: None, to_stage: stage.clone(), name: name.unwrap_or("").to_string(), }); } } /// Test-only convenience that parses a wire-form stage string and forwards /// to [`write_item`]. Existing tests seed CRDT items with legacy directory /// strings (`"2_current"`, `"4_merge"`, etc.) — this shim keeps that idiom /// working without forcing every test to construct typed `Stage` payloads. /// /// Stages are normalised through [`Stage::from_dir`]: unknown strings cause /// the write to be skipped (with a log line). #[cfg(test)] pub fn write_item_str( story_id: &str, stage: &str, name: Option<&str>, agent: Option<&str>, depends_on: Option<&str>, merged_at: Option, ) { // Normalise pre-934 directory-style strings to clean wire form so // existing test fixtures keep working after stage 6 dropped the legacy // aliases from `Stage::from_dir`. See `db::ops::normalise_stage_str` // for the user-facing equivalent on the db boundary. 
let normalised = match stage { "0_upcoming" => "upcoming", "1_backlog" => "backlog", "2_current" => "coding", "2_blocked" => "blocked", "3_qa" => "qa", "4_merge" => "merge", "4_merge_failure" => "merge_failure", "5_done" => "done", "6_archived" => "archived", other => other, }; let Some(typed) = Stage::from_dir(normalised) else { crate::slog!("[crdt_state] write_item_str: unknown stage '{stage}' for {story_id}"); return; }; write_item(story_id, &typed, name, agent, depends_on, merged_at); } /// Set `retries` to an explicit value for a pipeline item via a Stage transition. /// /// Reads the current Stage from the CRDT, updates the `retries` field (only /// meaningful for `Stage::Coding` and `Stage::Merge`), and writes back via /// `write_item`. No-op for items not in a Coding or Merge stage. pub fn set_retry_count(story_id: &str, count: i64) { let Some(item) = super::super::read::read_item(story_id) else { return; }; let new_stage = match item.stage().clone() { Stage::Coding { claim, plan, retries: _, } => Stage::Coding { claim, plan, retries: count.max(0) as u32, }, Stage::Merge { feature_branch, commits_ahead, claim, retries: _, server_start_time, } => Stage::Merge { feature_branch, commits_ahead, claim, retries: count.max(0) as u32, server_start_time, }, _ => return, }; write_item(story_id, &new_stage, None, None, None, None); } /// Increment `retries` by 1 and return the new value. /// /// Reads the current Stage, increments the embedded `retries` field, and /// writes back via `write_item`. Returns `0` if the item is not found or is /// not in a Coding or Merge stage (no-op in that case). 
pub fn bump_retry_count(story_id: &str) -> i64 {
    let Some(item) = super::super::read::read_item(story_id) else {
        return 0;
    };
    // The increment saturates at `u32::MAX`: a plain `retries + 1` would panic
    // in debug builds and wrap to 0 in release builds if the stored counter
    // were ever at the maximum (e.g. from a corrupt register value).
    let (new_stage, new_retries) = match item.stage().clone() {
        Stage::Coding {
            claim,
            plan,
            retries,
        } => {
            let n = retries.saturating_add(1);
            (
                Stage::Coding {
                    claim,
                    plan,
                    retries: n,
                },
                n,
            )
        }
        Stage::Merge {
            feature_branch,
            commits_ahead,
            claim,
            retries,
            server_start_time,
        } => {
            let n = retries.saturating_add(1);
            (
                Stage::Merge {
                    feature_branch,
                    commits_ahead,
                    claim,
                    retries: n,
                    server_start_time,
                },
                n,
            )
        }
        // Any other stage carries no retry counter: nothing is written.
        _ => return 0,
    };
    write_item(story_id, &new_stage, None, None, None, None);
    // Lossless widening; every `u32` fits in an `i64`.
    i64::from(new_retries)
}