huskies: merge 515_story_add_a_debug_mcp_tool_to_dump_the_in_memory_crdt_state_for_inspection
This commit is contained in:
@@ -473,6 +473,165 @@ pub fn apply_remote_op(op: SignedOp) -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
// ── Debug dump ───────────────────────────────────────────────────────
|
||||
|
||||
/// A raw dump of a single CRDT list entry, including deleted items.
|
||||
///
|
||||
/// Use `content_index` (hex of the list insert `OpId`) to cross-reference
|
||||
/// with rows in the `crdt_ops` SQLite table.
|
||||
pub struct CrdtItemDump {
|
||||
pub story_id: Option<String>,
|
||||
pub stage: Option<String>,
|
||||
pub name: Option<String>,
|
||||
pub agent: Option<String>,
|
||||
pub retry_count: Option<i64>,
|
||||
pub blocked: Option<bool>,
|
||||
pub depends_on: Option<Vec<u32>>,
|
||||
/// Hex-encoded OpId of the list insert op — cross-reference with `crdt_ops`.
|
||||
pub content_index: String,
|
||||
pub is_deleted: bool,
|
||||
}
|
||||
|
||||
/// Top-level debug dump of the in-memory CRDT state.
|
||||
pub struct CrdtStateDump {
|
||||
pub in_memory_state_loaded: bool,
|
||||
/// Count of non-deleted items with a valid story_id and stage.
|
||||
pub total_items: usize,
|
||||
/// Total list-level ops seen (excludes root sentinel).
|
||||
pub total_ops_in_list: usize,
|
||||
/// Highest Lamport sequence number seen across all list-level ops.
|
||||
pub max_seq_in_list: u64,
|
||||
/// Count of ops in the ALL_OPS journal (persisted ops replayed at startup).
|
||||
pub persisted_ops_count: usize,
|
||||
pub items: Vec<CrdtItemDump>,
|
||||
}
|
||||
|
||||
/// Dump the raw in-memory CRDT state for debugging.
|
||||
///
|
||||
/// Unlike [`read_all_items`] this includes tombstoned (deleted) entries and
|
||||
/// exposes internal op metadata (content_index, seq). Pass a `story_id`
|
||||
/// filter to restrict the output to a single item.
|
||||
///
|
||||
/// **This is a debug tool.** For normal pipeline introspection use
|
||||
/// [`read_all_items`] or the `get_pipeline_status` MCP tool instead.
|
||||
pub fn dump_crdt_state(story_id_filter: Option<&str>) -> CrdtStateDump {
|
||||
let in_memory_state_loaded = CRDT_STATE.get().is_some();
|
||||
|
||||
let persisted_ops_count = ALL_OPS
|
||||
.get()
|
||||
.and_then(|m| m.lock().ok().map(|v| v.len()))
|
||||
.unwrap_or(0);
|
||||
|
||||
let Some(state_mutex) = CRDT_STATE.get() else {
|
||||
return CrdtStateDump {
|
||||
in_memory_state_loaded,
|
||||
total_items: 0,
|
||||
total_ops_in_list: 0,
|
||||
max_seq_in_list: 0,
|
||||
persisted_ops_count,
|
||||
items: Vec::new(),
|
||||
};
|
||||
};
|
||||
|
||||
let Ok(state) = state_mutex.lock() else {
|
||||
return CrdtStateDump {
|
||||
in_memory_state_loaded,
|
||||
total_items: 0,
|
||||
total_ops_in_list: 0,
|
||||
max_seq_in_list: 0,
|
||||
persisted_ops_count,
|
||||
items: Vec::new(),
|
||||
};
|
||||
};
|
||||
|
||||
let total_items = state.crdt.doc.items.iter().count();
|
||||
|
||||
let max_seq_in_list = state
|
||||
.crdt
|
||||
.doc
|
||||
.items
|
||||
.ops
|
||||
.iter()
|
||||
.map(|op| op.seq)
|
||||
.max()
|
||||
.unwrap_or(0);
|
||||
|
||||
// Subtract 1 for the root sentinel.
|
||||
let total_ops_in_list = state.crdt.doc.items.ops.len().saturating_sub(1);
|
||||
|
||||
let mut items = Vec::new();
|
||||
for op in &state.crdt.doc.items.ops {
|
||||
// Skip root sentinel (id == [0u8; 32]).
|
||||
if op.id == ROOT_ID {
|
||||
continue;
|
||||
}
|
||||
let Some(ref item_crdt) = op.content else {
|
||||
// No content — skip (orphaned slot, should not happen in normal use).
|
||||
continue;
|
||||
};
|
||||
|
||||
let story_id = match item_crdt.story_id.view() {
|
||||
JsonValue::String(s) if !s.is_empty() => Some(s),
|
||||
_ => None,
|
||||
};
|
||||
|
||||
// Apply story_id filter before doing any further work.
|
||||
if let Some(filter) = story_id_filter
|
||||
&& story_id.as_deref() != Some(filter)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
let stage = match item_crdt.stage.view() {
|
||||
JsonValue::String(s) if !s.is_empty() => Some(s),
|
||||
_ => None,
|
||||
};
|
||||
let name = match item_crdt.name.view() {
|
||||
JsonValue::String(s) if !s.is_empty() => Some(s),
|
||||
_ => None,
|
||||
};
|
||||
let agent = match item_crdt.agent.view() {
|
||||
JsonValue::String(s) if !s.is_empty() => Some(s),
|
||||
_ => None,
|
||||
};
|
||||
let retry_count = match item_crdt.retry_count.view() {
|
||||
JsonValue::Number(n) if n > 0.0 => Some(n as i64),
|
||||
_ => None,
|
||||
};
|
||||
let blocked = match item_crdt.blocked.view() {
|
||||
JsonValue::Bool(b) => Some(b),
|
||||
_ => None,
|
||||
};
|
||||
let depends_on = match item_crdt.depends_on.view() {
|
||||
JsonValue::String(s) if !s.is_empty() => serde_json::from_str::<Vec<u32>>(&s).ok(),
|
||||
_ => None,
|
||||
};
|
||||
|
||||
let content_index = op.id.iter().map(|b| format!("{b:02x}")).collect::<String>();
|
||||
|
||||
items.push(CrdtItemDump {
|
||||
story_id,
|
||||
stage,
|
||||
name,
|
||||
agent,
|
||||
retry_count,
|
||||
blocked,
|
||||
depends_on,
|
||||
content_index,
|
||||
is_deleted: op.is_deleted,
|
||||
});
|
||||
}
|
||||
|
||||
CrdtStateDump {
|
||||
in_memory_state_loaded,
|
||||
total_items,
|
||||
total_ops_in_list,
|
||||
max_seq_in_list,
|
||||
persisted_ops_count,
|
||||
items,
|
||||
}
|
||||
}
|
||||
|
||||
// ── Read path ────────────────────────────────────────────────────────
|
||||
|
||||
/// Read the full pipeline state from the CRDT document.
|
||||
|
||||
Reference in New Issue
Block a user