From f015fe5a1d5cf4aae112291ec9023ff3d87b6759 Mon Sep 17 00:00:00 2001 From: dave Date: Fri, 10 Apr 2026 10:20:41 +0000 Subject: [PATCH] huskies: merge 515_story_add_a_debug_mcp_tool_to_dump_the_in_memory_crdt_state_for_inspection --- README.md | 33 ++++++ server/src/crdt_state.rs | 159 +++++++++++++++++++++++++++++ server/src/http/mcp/diagnostics.rs | 84 +++++++++++++++ server/src/http/mcp/mod.rs | 19 +++- server/src/http/mod.rs | 59 +++++++++++ 5 files changed, 353 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 6d0dbf18..9bf0486f 100644 --- a/README.md +++ b/README.md @@ -130,6 +130,39 @@ includes a count and a suggestion to check the server logs. Set `reconcile_on_startup = false` in `.huskies/project.toml` to disable the pass during the migration window if it produces noise. +## Debugging + +### Inspecting the in-memory CRDT state + +When diagnosing state issues, use the `dump_crdt` MCP tool or the `/debug/crdt` HTTP endpoint to inspect the raw in-memory CRDT state directly. These surfaces show the ground truth that the running server holds — not a summarised pipeline view and not the persisted SQLite ops. + +**MCP tool** (from Claude Code or any MCP client): + +``` +mcp__huskies__dump_crdt +# dump everything +{} + +# restrict to a single item +{"story_id": "42_story_my_feature"} +``` + +**HTTP endpoint** (browser or curl): + +```bash +# dump everything +curl http://localhost:3001/debug/crdt + +# restrict to a single item +curl "http://localhost:3001/debug/crdt?story_id=42_story_my_feature" +``` + +Both return a JSON document with: + +- **`metadata`** — `in_memory_state_loaded`, `total_items`, `total_ops_in_list`, `max_seq_in_list`, `persisted_ops_count`, `pending_persist_ops_count` +- **`items`** — one entry per CRDT list item (including tombstoned/deleted entries), each with `story_id`, `stage`, `name`, `agent`, `retry_count`, `blocked`, `depends_on`, `content_index` (hex OpId for cross-referencing with `crdt_ops`), and `is_deleted` + +> **This is a debug tool.** For normal pipeline introspection use `get_pipeline_status` or `GET /api/pipeline` instead. ## License diff --git a/server/src/crdt_state.rs b/server/src/crdt_state.rs index 2c6d115d..1e697491 100644 --- a/server/src/crdt_state.rs +++ b/server/src/crdt_state.rs @@ -473,6 +473,165 @@ pub fn apply_remote_op(op: SignedOp) -> bool { true } +// ── Debug dump ─────────────────────────────────────────────────────── + +/// A raw dump of a single CRDT list entry, including deleted items. +/// +/// Use `content_index` (hex of the list insert `OpId`) to cross-reference +/// with rows in the `crdt_ops` SQLite table. +pub struct CrdtItemDump { + pub story_id: Option, + pub stage: Option, + pub name: Option, + pub agent: Option, + pub retry_count: Option, + pub blocked: Option, + pub depends_on: Option>, + /// Hex-encoded OpId of the list insert op — cross-reference with `crdt_ops`. + pub content_index: String, + pub is_deleted: bool, +} + +/// Top-level debug dump of the in-memory CRDT state. +pub struct CrdtStateDump { + pub in_memory_state_loaded: bool, + /// Count of non-deleted items with a valid story_id and stage. + pub total_items: usize, + /// Total list-level ops seen (excludes root sentinel). + pub total_ops_in_list: usize, + /// Highest Lamport sequence number seen across all list-level ops. + pub max_seq_in_list: u64, + /// Count of ops in the ALL_OPS journal (persisted ops replayed at startup). + pub persisted_ops_count: usize, + pub items: Vec, +} + +/// Dump the raw in-memory CRDT state for debugging. +/// +/// Unlike [`read_all_items`] this includes tombstoned (deleted) entries and +/// exposes internal op metadata (content_index, seq). Pass a `story_id` +/// filter to restrict the output to a single item. +/// +/// **This is a debug tool.** For normal pipeline introspection use +/// [`read_all_items`] or the `get_pipeline_status` MCP tool instead. +pub fn dump_crdt_state(story_id_filter: Option<&str>) -> CrdtStateDump { + let in_memory_state_loaded = CRDT_STATE.get().is_some(); + + let persisted_ops_count = ALL_OPS + .get() + .and_then(|m| m.lock().ok().map(|v| v.len())) + .unwrap_or(0); + + let Some(state_mutex) = CRDT_STATE.get() else { + return CrdtStateDump { + in_memory_state_loaded, + total_items: 0, + total_ops_in_list: 0, + max_seq_in_list: 0, + persisted_ops_count, + items: Vec::new(), + }; + }; + + let Ok(state) = state_mutex.lock() else { + return CrdtStateDump { + in_memory_state_loaded, + total_items: 0, + total_ops_in_list: 0, + max_seq_in_list: 0, + persisted_ops_count, + items: Vec::new(), + }; + }; + + let total_items = state.crdt.doc.items.iter().count(); + + let max_seq_in_list = state + .crdt + .doc + .items + .ops + .iter() + .map(|op| op.seq) + .max() + .unwrap_or(0); + + // Subtract 1 for the root sentinel. + let total_ops_in_list = state.crdt.doc.items.ops.len().saturating_sub(1); + + let mut items = Vec::new(); + for op in &state.crdt.doc.items.ops { + // Skip root sentinel (id == [0u8; 32]). + if op.id == ROOT_ID { + continue; + } + let Some(ref item_crdt) = op.content else { + // No content — skip (orphaned slot, should not happen in normal use). + continue; + }; + + let story_id = match item_crdt.story_id.view() { + JsonValue::String(s) if !s.is_empty() => Some(s), + _ => None, + }; + + // Apply story_id filter before doing any further work. + if let Some(filter) = story_id_filter + && story_id.as_deref() != Some(filter) + { + continue; + } + + let stage = match item_crdt.stage.view() { + JsonValue::String(s) if !s.is_empty() => Some(s), + _ => None, + }; + let name = match item_crdt.name.view() { + JsonValue::String(s) if !s.is_empty() => Some(s), + _ => None, + }; + let agent = match item_crdt.agent.view() { + JsonValue::String(s) if !s.is_empty() => Some(s), + _ => None, + }; + let retry_count = match item_crdt.retry_count.view() { + JsonValue::Number(n) if n > 0.0 => Some(n as i64), + _ => None, + }; + let blocked = match item_crdt.blocked.view() { + JsonValue::Bool(b) => Some(b), + _ => None, + }; + let depends_on = match item_crdt.depends_on.view() { + JsonValue::String(s) if !s.is_empty() => serde_json::from_str::>(&s).ok(), + _ => None, + }; + + let content_index = op.id.iter().map(|b| format!("{b:02x}")).collect::(); + + items.push(CrdtItemDump { + story_id, + stage, + name, + agent, + retry_count, + blocked, + depends_on, + content_index, + is_deleted: op.is_deleted, + }); + } + + CrdtStateDump { + in_memory_state_loaded, + total_items, + total_ops_in_list, + max_seq_in_list, + persisted_ops_count, + items, + } +} + // ── Read path ──────────────────────────────────────────────────────── /// Read the full pipeline state from the CRDT document. diff --git a/server/src/http/mcp/diagnostics.rs b/server/src/http/mcp/diagnostics.rs index 44726f60..3c9ee51c 100644 --- a/server/src/http/mcp/diagnostics.rs +++ b/server/src/http/mcp/diagnostics.rs @@ -265,6 +265,45 @@ pub(super) fn tool_move_story(args: &Value, ctx: &AppContext) -> Result Result { + let story_id_filter = args.get("story_id").and_then(|v| v.as_str()); + let dump = crate::crdt_state::dump_crdt_state(story_id_filter); + + let items: Vec = dump + .items + .into_iter() + .map(|item| { + json!({ + "story_id": item.story_id, + "stage": item.stage, + "name": item.name, + "agent": item.agent, + "retry_count": item.retry_count, + "blocked": item.blocked, + "depends_on": item.depends_on, + "content_index": item.content_index, + "is_deleted": item.is_deleted, + }) + }) + .collect(); + + serde_json::to_string_pretty(&json!({ + "metadata": { + "in_memory_state_loaded": dump.in_memory_state_loaded, + "total_items": dump.total_items, + "total_ops_in_list": dump.total_ops_in_list, + "max_seq_in_list": dump.max_seq_in_list, + "persisted_ops_count": dump.persisted_ops_count, + "pending_persist_ops_count": null, + }, + "items": items, + })) + .map_err(|e| format!("Serialization error: {e}")) +} + /// MCP tool: count lines in a specific file relative to the project root. pub(super) fn tool_loc_file(args: &Value, ctx: &AppContext) -> Result { let file_path = args @@ -751,4 +790,49 @@ mod tests { .contains("not found in any pipeline stage") ); } + + // ── dump_crdt tool tests ────────────────────────────────────────── + + #[test] + fn tool_dump_crdt_returns_valid_json() { + let result = tool_dump_crdt(&json!({})).unwrap(); + let parsed: Value = serde_json::from_str(&result).expect("result must be valid JSON"); + assert!(parsed["metadata"].is_object(), "must have metadata object"); + assert!(parsed["items"].is_array(), "must have items array"); + } + + #[test] + fn tool_dump_crdt_metadata_has_required_fields() { + let result = tool_dump_crdt(&json!({})).unwrap(); + let parsed: Value = serde_json::from_str(&result).unwrap(); + let meta = &parsed["metadata"]; + assert!(meta["in_memory_state_loaded"].is_boolean()); + assert!(meta["total_items"].is_number()); + assert!(meta["total_ops_in_list"].is_number()); + assert!(meta["max_seq_in_list"].is_number()); + assert!(meta["persisted_ops_count"].is_number()); + } + + #[test] + fn tool_dump_crdt_with_story_id_filter_returns_valid_json() { + let result = + tool_dump_crdt(&json!({"story_id": "9999_story_nonexistent"})).unwrap(); + let parsed: Value = serde_json::from_str(&result).unwrap(); + assert!(parsed["items"].as_array().unwrap().is_empty()); + } + + #[test] + fn dump_crdt_in_tools_list() { + use super::super::handle_tools_list; + let resp = handle_tools_list(Some(json!(1))); + let tools = resp.result.unwrap()["tools"].as_array().unwrap().clone(); + let tool = tools.iter().find(|t| t["name"] == "dump_crdt"); + assert!(tool.is_some(), "dump_crdt missing from tools list"); + let t = tool.unwrap(); + assert!( + t["description"].as_str().unwrap().to_lowercase().contains("debug"), + "description must mention this is a debug tool" + ); + assert!(t["inputSchema"].is_object()); + } } diff --git a/server/src/http/mcp/mod.rs b/server/src/http/mcp/mod.rs index ed95be25..169c881e 100644 --- a/server/src/http/mcp/mod.rs +++ b/server/src/http/mcp/mod.rs @@ -1017,6 +1017,20 @@ fn handle_tools_list(id: Option) -> JsonRpcResponse { "required": ["story_id"] } }, + { + "name": "dump_crdt", + "description": "DEBUG TOOL: Dump the raw in-memory CRDT state. Returns every item the running server knows about, including tombstoned (deleted) entries, with internal op metadata (content_index, is_deleted, stage, etc.). Use this when diagnosing CRDT/state divergence — NOT for normal pipeline introspection (use get_pipeline_status for that). Optional story_id filter returns a single item.", + "inputSchema": { + "type": "object", + "properties": { + "story_id": { + "type": "string", + "description": "Optional: restrict output to this single work item identifier (filename stem, e.g. '42_story_my_feature')" + } + }, + "required": [] + } + }, { "name": "move_story", "description": "Move a work item (story, bug, spike, or refactor) to an arbitrary pipeline stage. Prefer dedicated tools when available: use accept_story to mark items done, move_story_to_merge to queue for merging, or request_qa to trigger QA review. Use move_story only for arbitrary moves that lack a dedicated tool — for example, moving a story back to backlog or recovering a ghost story by moving it back to current.", @@ -1333,6 +1347,8 @@ async fn handle_tools_call( "delete_story" => story_tools::tool_delete_story(&args, ctx).await, // Purge story (CRDT tombstone — story 521) "purge_story" => story_tools::tool_purge_story(&args, ctx), + // Debug CRDT dump (story 515) + "dump_crdt" => diagnostics::tool_dump_crdt(&args), // Arbitrary pipeline movement "move_story" => diagnostics::tool_move_story(&args, ctx), // Unblock story @@ -1471,7 +1487,8 @@ mod tests { assert!(names.contains(&"git_log")); assert!(names.contains(&"status")); assert!(names.contains(&"loc_file")); - assert_eq!(tools.len(), 58); + assert!(names.contains(&"dump_crdt")); + assert_eq!(tools.len(), 59); } #[test] diff --git a/server/src/http/mod.rs b/server/src/http/mod.rs index d3a84fe8..f2568a32 100644 --- a/server/src/http/mod.rs +++ b/server/src/http/mod.rs @@ -98,6 +98,7 @@ pub fn build_routes( "/oauth/status", get(oauth::oauth_status), ) + .at("/debug/crdt", get(debug_crdt_handler)) .at("/assets/*path", get(assets::embedded_asset)) .at("/", get(assets::embedded_index)) .at("/*path", get(assets::embedded_file)); @@ -126,6 +127,64 @@ pub fn build_routes( route.data(ctx_arc) } +/// Debug HTTP endpoint: `GET /debug/crdt[?story_id=]` +/// +/// Returns the raw in-memory CRDT state as JSON. Accepts an optional +/// `story_id` query parameter to restrict the dump to a single item. +/// +/// **This is a debug endpoint.** Use `GET /api/pipeline` or the +/// `get_pipeline_status` MCP tool for normal pipeline introspection. +#[poem::handler] +pub fn debug_crdt_handler(req: &poem::Request) -> poem::Response { + let story_id_filter = req.uri().query().and_then(|q| { + q.split('&').find_map(|pair| { + let (key, val) = pair.split_once('=')?; + if key == "story_id" { + Some(val.to_string()) + } else { + None + } + }) + }); + + let dump = crate::crdt_state::dump_crdt_state(story_id_filter.as_deref()); + + let items: Vec = dump + .items + .into_iter() + .map(|item| { + serde_json::json!({ + "story_id": item.story_id, + "stage": item.stage, + "name": item.name, + "agent": item.agent, + "retry_count": item.retry_count, + "blocked": item.blocked, + "depends_on": item.depends_on, + "content_index": item.content_index, + "is_deleted": item.is_deleted, + }) + }) + .collect(); + + let body = serde_json::json!({ + "metadata": { + "in_memory_state_loaded": dump.in_memory_state_loaded, + "total_items": dump.total_items, + "total_ops_in_list": dump.total_ops_in_list, + "max_seq_in_list": dump.max_seq_in_list, + "persisted_ops_count": dump.persisted_ops_count, + "pending_persist_ops_count": null, + }, + "items": items, + }); + + poem::Response::builder() + .status(poem::http::StatusCode::OK) + .header(poem::http::header::CONTENT_TYPE, "application/json") + .body(serde_json::to_string_pretty(&body).unwrap_or_default()) +} + type ApiTuple = ( ProjectApi, ModelApi,