huskies: merge 844
This commit is contained in:
@@ -0,0 +1,305 @@
|
||||
//! CRDT write operations for individual pipeline items.
|
||||
//!
|
||||
//! Provides typed setters for agent, QA mode, retry count, and the primary
|
||||
//! `write_item` function that inserts or updates pipeline items in the CRDT.
|
||||
|
||||
use bft_json_crdt::json_crdt::{CrdtNode, JsonValue};
|
||||
use bft_json_crdt::lww_crdt::LwwRegisterCrdt;
|
||||
use bft_json_crdt::op::ROOT_ID;
|
||||
use serde_json::json;
|
||||
|
||||
use super::super::state::{apply_and_persist, emit_event, get_crdt, rebuild_index};
|
||||
use super::super::types::CrdtEvent;
|
||||
use crate::io::story_metadata::QaMode;
|
||||
|
||||
/// Set the typed `depends_on` CRDT register for a pipeline item.
|
||||
///
|
||||
/// Encodes `deps` as a compact JSON array string (e.g. `"[837]"`) and writes it
|
||||
/// to the item's `depends_on` register. An empty slice clears the register to an
|
||||
/// empty string, which means "no dependencies".
|
||||
///
|
||||
/// Returns `true` if the item was found and the op was applied, `false` otherwise.
|
||||
pub fn set_depends_on(story_id: &str, deps: &[u32]) -> bool {
|
||||
let Some(state_mutex) = get_crdt() else {
|
||||
return false;
|
||||
};
|
||||
let Ok(mut state) = state_mutex.lock() else {
|
||||
return false;
|
||||
};
|
||||
let Some(&idx) = state.index.get(story_id) else {
|
||||
return false;
|
||||
};
|
||||
let value = if deps.is_empty() {
|
||||
String::new()
|
||||
} else {
|
||||
serde_json::to_string(deps).unwrap_or_default()
|
||||
};
|
||||
apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].depends_on.set(value));
|
||||
true
|
||||
}
|
||||
|
||||
/// Set the `mergemaster_attempted` CRDT flag for a pipeline item.
|
||||
///
|
||||
/// Passing `true` records that a mergemaster session has been spawned for this
|
||||
/// item, preventing repeated auto-spawns across restarts.
|
||||
/// Passing `false` explicitly writes `false` (does not remove the register) so
|
||||
/// the cleared state is distinguishable from an unset register and survives
|
||||
/// CRDT replay correctly.
|
||||
///
|
||||
/// Returns `true` if the item was found and the op was applied, `false` otherwise.
|
||||
pub fn set_mergemaster_attempted(story_id: &str, value: bool) -> bool {
|
||||
let Some(state_mutex) = get_crdt() else {
|
||||
return false;
|
||||
};
|
||||
let Ok(mut state) = state_mutex.lock() else {
|
||||
return false;
|
||||
};
|
||||
let Some(&idx) = state.index.get(story_id) else {
|
||||
return false;
|
||||
};
|
||||
apply_and_persist(&mut state, |s| {
|
||||
s.crdt.doc.items[idx].mergemaster_attempted.set(value)
|
||||
});
|
||||
true
|
||||
}
|
||||
|
||||
/// Set the `agent` field for a pipeline item by its story ID.
|
||||
///
|
||||
/// `Some(name)` writes the agent name into the CRDT register.
|
||||
/// `None` clears the register by writing an empty string — use this
|
||||
/// to unpin an agent without touching the surrounding item.
|
||||
///
|
||||
/// This is the typed setter counterpart to [`write_item`]'s `agent` parameter.
|
||||
/// Callers that only need to update the agent (e.g. the `update_story` MCP tool
|
||||
/// and the Matrix `!assign` command) should prefer this function over
|
||||
/// passing `agent` through the full [`write_item`] call, which requires all
|
||||
/// other fields to be known.
|
||||
///
|
||||
/// Returns `true` if the item was found and the write was performed.
|
||||
pub fn set_agent(story_id: &str, agent: Option<&str>) -> bool {
|
||||
let Some(state_mutex) = get_crdt() else {
|
||||
return false;
|
||||
};
|
||||
let Ok(mut state) = state_mutex.lock() else {
|
||||
return false;
|
||||
};
|
||||
let Some(&idx) = state.index.get(story_id) else {
|
||||
return false;
|
||||
};
|
||||
let value = agent.unwrap_or("").to_string();
|
||||
apply_and_persist(&mut state, |s| {
|
||||
s.crdt.doc.items[idx].agent.set(value.clone())
|
||||
});
|
||||
true
|
||||
}
|
||||
|
||||
/// Set the typed `qa_mode` CRDT register for a pipeline item.
|
||||
///
|
||||
/// Passing `Some(mode)` writes the mode string (e.g. `"server"`, `"agent"`, `"human"`)
|
||||
/// to the item's `qa_mode` register and persists a signed op.
|
||||
/// Passing `None` clears the register to an empty string, which means
|
||||
/// "use the project default" (same as if the field was never set).
|
||||
///
|
||||
/// Returns `true` if the item was found and the op was applied, `false` otherwise.
|
||||
pub fn set_qa_mode(story_id: &str, mode: Option<QaMode>) -> bool {
|
||||
let Some(state_mutex) = get_crdt() else {
|
||||
return false;
|
||||
};
|
||||
let Ok(mut state) = state_mutex.lock() else {
|
||||
return false;
|
||||
};
|
||||
let Some(&idx) = state.index.get(story_id) else {
|
||||
return false;
|
||||
};
|
||||
let value = mode.map(|m| m.as_str().to_string()).unwrap_or_default();
|
||||
apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].qa_mode.set(value));
|
||||
true
|
||||
}
|
||||
|
||||
/// Write a pipeline item state through CRDT operations.
|
||||
///
|
||||
/// If the item exists, updates its registers. If not, inserts a new item
|
||||
/// into the list. All ops are signed and persisted to SQLite.
|
||||
///
|
||||
/// When the stage changes (or a new item is created), a [`CrdtEvent`] is
|
||||
/// broadcast so subscribers can react to the transition.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn write_item(
|
||||
story_id: &str,
|
||||
stage: &str,
|
||||
name: Option<&str>,
|
||||
agent: Option<&str>,
|
||||
retry_count: Option<i64>,
|
||||
blocked: Option<bool>,
|
||||
depends_on: Option<&str>,
|
||||
claimed_by: Option<&str>,
|
||||
claimed_at: Option<f64>,
|
||||
merged_at: Option<f64>,
|
||||
) {
|
||||
let Some(state_mutex) = get_crdt() else {
|
||||
return;
|
||||
};
|
||||
let Ok(mut state) = state_mutex.lock() else {
|
||||
return;
|
||||
};
|
||||
|
||||
if let Some(&idx) = state.index.get(story_id) {
|
||||
// Capture the old stage before updating so we can detect transitions.
|
||||
let old_stage = match state.crdt.doc.items[idx].stage.view() {
|
||||
JsonValue::String(s) => Some(s),
|
||||
_ => None,
|
||||
};
|
||||
|
||||
// Update existing item registers.
|
||||
apply_and_persist(&mut state, |s| {
|
||||
s.crdt.doc.items[idx].stage.set(stage.to_string())
|
||||
});
|
||||
|
||||
if let Some(n) = name {
|
||||
apply_and_persist(&mut state, |s| {
|
||||
s.crdt.doc.items[idx].name.set(n.to_string())
|
||||
});
|
||||
}
|
||||
if let Some(a) = agent {
|
||||
apply_and_persist(&mut state, |s| {
|
||||
s.crdt.doc.items[idx].agent.set(a.to_string())
|
||||
});
|
||||
}
|
||||
if let Some(rc) = retry_count {
|
||||
apply_and_persist(&mut state, |s| {
|
||||
s.crdt.doc.items[idx].retry_count.set(rc as f64)
|
||||
});
|
||||
}
|
||||
if let Some(b) = blocked {
|
||||
apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].blocked.set(b));
|
||||
}
|
||||
if let Some(d) = depends_on {
|
||||
apply_and_persist(&mut state, |s| {
|
||||
s.crdt.doc.items[idx].depends_on.set(d.to_string())
|
||||
});
|
||||
}
|
||||
if let Some(cb) = claimed_by {
|
||||
apply_and_persist(&mut state, |s| {
|
||||
s.crdt.doc.items[idx].claimed_by.set(cb.to_string())
|
||||
});
|
||||
}
|
||||
if let Some(ca) = claimed_at {
|
||||
apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].claimed_at.set(ca));
|
||||
}
|
||||
if let Some(ma) = merged_at {
|
||||
apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].merged_at.set(ma));
|
||||
}
|
||||
|
||||
// Broadcast a CrdtEvent if the stage actually changed.
|
||||
let stage_changed = old_stage.as_deref() != Some(stage);
|
||||
if stage_changed {
|
||||
// Read the current name from the CRDT document for the event.
|
||||
let current_name = match state.crdt.doc.items[idx].name.view() {
|
||||
JsonValue::String(s) if !s.is_empty() => Some(s),
|
||||
_ => None,
|
||||
};
|
||||
emit_event(CrdtEvent {
|
||||
story_id: story_id.to_string(),
|
||||
from_stage: old_stage,
|
||||
to_stage: stage.to_string(),
|
||||
name: current_name,
|
||||
});
|
||||
}
|
||||
} else {
|
||||
// Insert new item.
|
||||
let item_json: JsonValue = json!({
|
||||
"story_id": story_id,
|
||||
"stage": stage,
|
||||
"name": name.unwrap_or(""),
|
||||
"agent": agent.unwrap_or(""),
|
||||
"retry_count": retry_count.unwrap_or(0) as f64,
|
||||
"blocked": blocked.unwrap_or(false),
|
||||
"depends_on": depends_on.unwrap_or(""),
|
||||
"claimed_by": claimed_by.unwrap_or(""),
|
||||
"claimed_at": claimed_at.unwrap_or(0.0),
|
||||
"merged_at": merged_at.unwrap_or(0.0),
|
||||
"qa_mode": "",
|
||||
"mergemaster_attempted": false,
|
||||
})
|
||||
.into();
|
||||
|
||||
apply_and_persist(&mut state, |s| s.crdt.doc.items.insert(ROOT_ID, item_json));
|
||||
|
||||
// Rebuild index after insertion (indices may shift).
|
||||
state.index = rebuild_index(&state.crdt);
|
||||
|
||||
// Advance the inner registers of the newly-created item to the Lamport
|
||||
// floor so their first local ops don't re-emit low sequence numbers.
|
||||
let floor = state.lamport_floor;
|
||||
if floor > 0
|
||||
&& let Some(&idx) = state.index.get(story_id)
|
||||
{
|
||||
let item = &mut state.crdt.doc.items[idx];
|
||||
item.story_id.advance_seq(floor);
|
||||
item.stage.advance_seq(floor);
|
||||
item.name.advance_seq(floor);
|
||||
item.agent.advance_seq(floor);
|
||||
item.retry_count.advance_seq(floor);
|
||||
item.blocked.advance_seq(floor);
|
||||
item.depends_on.advance_seq(floor);
|
||||
item.claimed_by.advance_seq(floor);
|
||||
item.claimed_at.advance_seq(floor);
|
||||
item.merged_at.advance_seq(floor);
|
||||
item.qa_mode.advance_seq(floor);
|
||||
item.mergemaster_attempted.advance_seq(floor);
|
||||
}
|
||||
|
||||
// Broadcast a CrdtEvent for the new item.
|
||||
emit_event(CrdtEvent {
|
||||
story_id: story_id.to_string(),
|
||||
from_stage: None,
|
||||
to_stage: stage.to_string(),
|
||||
name: name.map(String::from),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/// Set `retry_count` to an explicit value for a pipeline item.
|
||||
///
|
||||
/// Pure metadata operation — the item's stage is not changed.
|
||||
/// Call `set_retry_count(story_id, 0)` to reset the counter after a
|
||||
/// stage transition or an explicit unblock.
|
||||
pub fn set_retry_count(story_id: &str, count: i64) {
|
||||
let Some(state_mutex) = get_crdt() else {
|
||||
return;
|
||||
};
|
||||
let Ok(mut state) = state_mutex.lock() else {
|
||||
return;
|
||||
};
|
||||
if let Some(&idx) = state.index.get(story_id) {
|
||||
apply_and_persist(&mut state, |s| {
|
||||
s.crdt.doc.items[idx].retry_count.set(count as f64)
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/// Increment `retry_count` by 1 and return the new value.
|
||||
///
|
||||
/// Pure metadata operation — the item's stage is not changed.
|
||||
/// Returns 0 if the item is not found in the CRDT (no-op in that case).
|
||||
/// Use the returned value to decide whether the story should be blocked.
|
||||
pub fn bump_retry_count(story_id: &str) -> i64 {
|
||||
let Some(state_mutex) = get_crdt() else {
|
||||
return 0;
|
||||
};
|
||||
let Ok(mut state) = state_mutex.lock() else {
|
||||
return 0;
|
||||
};
|
||||
let Some(&idx) = state.index.get(story_id) else {
|
||||
return 0;
|
||||
};
|
||||
let current = match state.crdt.doc.items[idx].retry_count.view() {
|
||||
JsonValue::Number(n) => n as i64,
|
||||
_ => 0,
|
||||
};
|
||||
let new_count = current + 1;
|
||||
apply_and_persist(&mut state, |s| {
|
||||
s.crdt.doc.items[idx].retry_count.set(new_count as f64)
|
||||
});
|
||||
new_count
|
||||
}
|
||||
Reference in New Issue
Block a user