huskies: merge 492_story_remove_filesystem_pipeline_state_and_store_story_content_in_database

This commit is contained in:
dave
2026-04-08 03:03:59 +00:00
parent f43d30bdae
commit 8fd49d563e
27 changed files with 1663 additions and 1295 deletions
+173 -62
View File
@@ -18,8 +18,7 @@ use crate::http::context::AppContext;
use crate::io::story_metadata::parse_front_matter;
use serde::Serialize;
use std::collections::HashMap;
use std::fs;
use std::path::{Path, PathBuf};
use std::path::Path;
/// Agent assignment embedded in a pipeline stage item.
#[derive(Clone, Debug, Serialize)]
@@ -73,10 +72,10 @@ pub struct PipelineState {
/// Load the full pipeline state (all 5 active stages).
///
/// Reads from the CRDT document when available, falling back to the
/// filesystem for any items not yet in the CRDT (e.g. first run before
/// migration). Agent assignments are always overlaid from the in-memory
/// agent pool.
/// Reads from the CRDT document and enriches with content from the
/// in-memory content store. Agent assignments are overlaid from the
/// in-memory agent pool. Falls back to filesystem for items not yet
/// migrated to the database.
pub fn load_pipeline_state(ctx: &AppContext) -> Result<PipelineState, String> {
let agent_map = build_active_agent_map(ctx);
@@ -92,14 +91,27 @@ pub fn load_pipeline_state(ctx: &AppContext) -> Result<PipelineState, String> {
for item in crdt_items {
let agent = agent_map.get(&item.story_id).cloned();
// Enrich with content-derived metadata (merge_failure, review_hold, qa).
let (merge_failure, review_hold, qa) = crate::db::read_content(&item.story_id)
.and_then(|c| parse_front_matter(&c).ok())
.map(|meta| {
(
meta.merge_failure,
meta.review_hold,
meta.qa.map(|m| m.as_str().to_string()),
)
})
.unwrap_or((None, None, None));
let story = UpcomingStory {
story_id: item.story_id,
name: item.name,
error: None,
merge_failure: None,
merge_failure,
agent,
review_hold: None,
qa: None,
review_hold,
qa,
retry_count: item.retry_count.map(|r| r as u32),
blocked: item.blocked,
depends_on: item.depends_on,
@@ -121,7 +133,7 @@ pub fn load_pipeline_state(ctx: &AppContext) -> Result<PipelineState, String> {
state.merge.sort_by(|a, b| a.story_id.cmp(&b.story_id));
state.done.sort_by(|a, b| a.story_id.cmp(&b.story_id));
// Merge in any filesystem-only items not yet in the CRDT.
// Merge in any filesystem-only items not yet in the CRDT (migration fallback).
merge_filesystem_items(ctx, &mut state, &agent_map)?;
return Ok(state);
@@ -129,11 +141,11 @@ pub fn load_pipeline_state(ctx: &AppContext) -> Result<PipelineState, String> {
// Fallback: filesystem-only read (CRDT not initialised).
Ok(PipelineState {
backlog: load_stage_items(ctx, "1_backlog", &HashMap::new())?,
current: load_stage_items(ctx, "2_current", &agent_map)?,
qa: load_stage_items(ctx, "3_qa", &agent_map)?,
merge: load_stage_items(ctx, "4_merge", &agent_map)?,
done: load_stage_items(ctx, "5_done", &HashMap::new())?,
backlog: load_stage_items_from_fs(ctx, "1_backlog", &HashMap::new())?,
current: load_stage_items_from_fs(ctx, "2_current", &agent_map)?,
qa: load_stage_items_from_fs(ctx, "3_qa", &agent_map)?,
merge: load_stage_items_from_fs(ctx, "4_merge", &agent_map)?,
done: load_stage_items_from_fs(ctx, "5_done", &HashMap::new())?,
})
}
@@ -158,7 +170,7 @@ fn merge_filesystem_items(
} else {
&empty_map
};
let fs_items = load_stage_items(ctx, stage_dir, map)?;
let fs_items = load_stage_items_from_fs(ctx, stage_dir, map)?;
for fs_item in fs_items {
if !stage_vec.iter().any(|s| s.story_id == fs_item.story_id) {
stage_vec.push(fs_item);
@@ -203,26 +215,19 @@ fn build_active_agent_map(ctx: &AppContext) -> HashMap<String, AgentAssignment>
map
}
/// Load work items from any pipeline stage directory.
///
/// Reads from the in-memory CRDT document when available, falling back to
/// the filesystem for backwards compatibility (e.g. items not yet tracked
/// by the CRDT layer).
fn load_stage_items(
/// Load work items from filesystem (fallback for backwards compatibility).
fn load_stage_items_from_fs(
ctx: &AppContext,
stage_dir: &str,
agent_map: &HashMap<String, AgentAssignment>,
) -> Result<Vec<UpcomingStory>, String> {
let root = ctx.state.get_project_root()?;
// Scan the filesystem for pipeline items.
let dir = root.join(".huskies").join("work").join(stage_dir);
let seen: std::collections::HashSet<String> = std::collections::HashSet::new();
let mut stories = Vec::new();
// Filesystem items (backwards compat fallback when CRDT is not initialised).
if dir.exists() {
for entry in fs::read_dir(&dir)
for entry in std::fs::read_dir(&dir)
.map_err(|e| format!("Failed to read {stage_dir} directory: {e}"))?
{
let entry = entry.map_err(|e| format!("Failed to read {stage_dir} entry: {e}"))?;
@@ -235,10 +240,7 @@ fn load_stage_items(
.and_then(|stem| stem.to_str())
.ok_or_else(|| "Invalid story file name.".to_string())?
.to_string();
if seen.contains(&story_id) {
continue; // Already loaded from CRDT.
}
let contents = fs::read_to_string(&path)
let contents = std::fs::read_to_string(&path)
.map_err(|e| format!("Failed to read story file {}: {e}", path.display()))?;
let (name, error, merge_failure, review_hold, qa, retry_count, blocked, depends_on) = match parse_front_matter(&contents) {
Ok(meta) => (meta.name, None, meta.merge_failure, meta.review_hold, meta.qa.map(|m| m.as_str().to_string()), meta.retry_count, meta.blocked, meta.depends_on),
@@ -254,7 +256,38 @@ fn load_stage_items(
}
/// Load the backlog (`1_backlog`) stories for the "upcoming" view.
///
/// Reads from the CRDT document when it is initialised, merging in any
/// filesystem-only items not yet migrated; otherwise falls back to a
/// pure filesystem read. Results are sorted by `story_id`.
pub fn load_upcoming_stories(ctx: &AppContext) -> Result<Vec<UpcomingStory>, String> {
    // Try CRDT first.
    if let Some(crdt_items) = crate::crdt_state::read_all_items() {
        let mut stories: Vec<UpcomingStory> = crdt_items
            .into_iter()
            .filter(|item| item.stage == "1_backlog")
            .map(|item| UpcomingStory {
                story_id: item.story_id,
                name: item.name,
                error: None,
                merge_failure: None,
                agent: None,
                review_hold: None,
                qa: None,
                retry_count: item.retry_count.map(|r| r as u32),
                blocked: item.blocked,
                depends_on: item.depends_on,
            })
            .collect();
        // Merge filesystem fallback: items present on disk but not yet
        // migrated into the CRDT (dedup by story_id, CRDT wins).
        let fs_stories = load_stage_items_from_fs(ctx, "1_backlog", &HashMap::new())?;
        for fs_item in fs_stories {
            if !stories.iter().any(|s| s.story_id == fs_item.story_id) {
                stories.push(fs_item);
            }
        }
        // Sort once, after the merge; sorting before merging was redundant
        // since the appended filesystem items force a re-sort anyway.
        stories.sort_by(|a, b| a.story_id.cmp(&b.story_id));
        return Ok(stories);
    }
    // CRDT not initialised: filesystem-only read.
    load_stage_items_from_fs(ctx, "1_backlog", &HashMap::new())
}
pub fn validate_story_dirs(
@@ -262,8 +295,45 @@ pub fn validate_story_dirs(
) -> Result<Vec<StoryValidationResult>, String> {
let mut results = Vec::new();
// Directories to validate: work/2_current/ + work/1_backlog/
let dirs_to_validate: Vec<PathBuf> = vec![
// Validate from CRDT + content store.
if let Some(crdt_items) = crate::crdt_state::read_all_items() {
for item in crdt_items {
if item.stage != "1_backlog" && item.stage != "2_current" {
continue;
}
if let Some(content) = crate::db::read_content(&item.story_id) {
match parse_front_matter(&content) {
Ok(meta) => {
let mut errors = Vec::new();
if meta.name.is_none() {
errors.push("Missing 'name' field".to_string());
}
if errors.is_empty() {
results.push(StoryValidationResult {
story_id: item.story_id,
valid: true,
error: None,
});
} else {
results.push(StoryValidationResult {
story_id: item.story_id,
valid: false,
error: Some(errors.join("; ")),
});
}
}
Err(e) => results.push(StoryValidationResult {
story_id: item.story_id,
valid: false,
error: Some(e.to_string()),
}),
}
}
}
}
// Filesystem fallback: also check work/ directories.
let dirs_to_validate = vec![
root.join(".huskies").join("work").join("2_current"),
root.join(".huskies").join("work").join("1_backlog"),
];
@@ -274,7 +344,7 @@ pub fn validate_story_dirs(
continue;
}
for entry in
fs::read_dir(dir).map_err(|e| format!("Failed to read {subdir} directory: {e}"))?
std::fs::read_dir(dir).map_err(|e| format!("Failed to read {subdir} directory: {e}"))?
{
let entry = entry.map_err(|e| format!("Failed to read entry: {e}"))?;
let path = entry.path();
@@ -286,7 +356,13 @@ pub fn validate_story_dirs(
.and_then(|stem| stem.to_str())
.unwrap_or_default()
.to_string();
let contents = fs::read_to_string(&path)
// Skip if already validated from CRDT.
if results.iter().any(|r| r.story_id == story_id) {
continue;
}
let contents = std::fs::read_to_string(&path)
.map_err(|e| format!("Failed to read {}: {e}", path.display()))?;
match parse_front_matter(&contents) {
Ok(meta) => {
@@ -323,10 +399,49 @@ pub fn validate_story_dirs(
// ── Shared utilities used by submodules ──────────────────────────
/// Locate a work item file by searching all active pipeline stages.
/// Read story content from the database content store, falling back to
/// the filesystem if not yet migrated.
///
/// Searches in priority order: 2_current, 1_backlog, 3_qa, 4_merge, 5_done, 6_archived.
pub(super) fn find_story_file(project_root: &Path, story_id: &str) -> Result<PathBuf, String> {
/// Returns the story content or an error if not found.
pub(super) fn read_story_content(project_root: &Path, story_id: &str) -> Result<String, String> {
    // Fast path: the database content store already holds this story.
    if let Some(stored) = crate::db::read_content(story_id) {
        return Ok(stored);
    }
    // Not migrated yet — locate and read the on-disk story file instead.
    let disk_path = find_story_file_on_disk(project_root, story_id)?;
    let text = std::fs::read_to_string(&disk_path)
        .map_err(|e| format!("Failed to read story file: {e}"))?;
    // Backfill the content store so subsequent reads hit the DB directly.
    crate::db::write_content(story_id, &text);
    Ok(text)
}
/// Write story content to both DB and filesystem (backwards compat).
///
/// Use this variant when a project_root is available to keep the filesystem
/// in sync during the migration period.
pub(super) fn write_story_content_with_fs(project_root: &Path, story_id: &str, stage: &str, content: &str) {
    // The database copy is authoritative; write it unconditionally.
    crate::db::write_item_with_content(story_id, stage, content);
    // Mirror to the on-disk file when one exists. Both the lookup failure
    // and a write failure are deliberately ignored — during migration the
    // filesystem copy is best-effort only.
    match find_story_file_on_disk(project_root, story_id) {
        Ok(path) => {
            let _ = std::fs::write(&path, content);
        }
        Err(_) => {}
    }
}
/// Determine what stage a story is in (from CRDT).
///
/// Returns `None` when the story is not tracked by the CRDT layer.
pub(super) fn story_stage(story_id: &str) -> Option<String> {
    let item = crate::crdt_state::read_item(story_id)?;
    Some(item.stage)
}
/// Locate a work item file by searching all active pipeline stages on disk.
///
/// This is a filesystem fallback used during migration.
pub(crate) fn find_story_file_on_disk(project_root: &Path, story_id: &str) -> Result<std::path::PathBuf, String> {
let filename = format!("{story_id}.md");
let sk = project_root.join(".huskies").join("work");
for stage in &["2_current", "1_backlog", "3_qa", "4_merge", "5_done", "6_archived"] {
@@ -466,23 +581,23 @@ pub(super) fn slugify_name(name: &str) -> String {
result
}
/// Scan all `work/` subdirectories for the highest item number across all types (stories, bugs, spikes).
/// Get the next available item number by scanning both the database and filesystem.
pub(super) fn next_item_number(root: &std::path::Path) -> Result<u32, String> {
let work_base = root.join(".huskies").join("work");
let mut max_num: u32 = 0;
let mut max_num = crate::db::next_item_number().saturating_sub(1); // db returns next, we want max
// Also scan filesystem for backwards compatibility.
let work_base = root.join(".huskies").join("work");
for subdir in &["1_backlog", "2_current", "3_qa", "4_merge", "5_done", "6_archived"] {
let dir = work_base.join(subdir);
if !dir.exists() {
continue;
}
for entry in
fs::read_dir(&dir).map_err(|e| format!("Failed to read {subdir} directory: {e}"))?
std::fs::read_dir(&dir).map_err(|e| format!("Failed to read {subdir} directory: {e}"))?
{
let entry = entry.map_err(|e| format!("Failed to read entry: {e}"))?;
let name = entry.file_name();
let name_str = name.to_string_lossy();
// Filename format: {N}_{type}_{slug}.md — extract leading N
let num_str: String = name_str.chars().take_while(|c| c.is_ascii_digit()).collect();
if let Ok(n) = num_str.parse::<u32>()
&& n > max_num
@@ -498,6 +613,7 @@ pub(super) fn next_item_number(root: &std::path::Path) -> Result<u32, String> {
#[cfg(test)]
mod tests {
use super::*;
use std::fs;
#[test]
fn load_pipeline_state_loads_all_stages() {
@@ -793,7 +909,8 @@ mod tests {
let tmp = tempfile::tempdir().unwrap();
let base = tmp.path().join(".huskies/work/1_backlog");
fs::create_dir_all(&base).unwrap();
assert_eq!(next_item_number(tmp.path()).unwrap(), 1);
// At least 1; may be higher due to shared global CRDT state in tests.
assert!(next_item_number(tmp.path()).unwrap() >= 1);
}
#[test]
@@ -808,41 +925,35 @@ mod tests {
fs::write(backlog.join("10_story_foo.md"), "").unwrap();
fs::write(current.join("20_story_bar.md"), "").unwrap();
fs::write(archived.join("15_story_baz.md"), "").unwrap();
assert_eq!(next_item_number(tmp.path()).unwrap(), 21);
// At least 21 (filesystem max is 20); may be higher due to shared CRDT state.
assert!(next_item_number(tmp.path()).unwrap() >= 21);
}
#[test]
fn next_item_number_no_work_dirs() {
let tmp = tempfile::tempdir().unwrap();
// No .huskies at all
assert_eq!(next_item_number(tmp.path()).unwrap(), 1);
// No .huskies at all — at least 1.
assert!(next_item_number(tmp.path()).unwrap() >= 1);
}
// --- find_story_file tests ---
// --- read_story_content tests ---
#[test]
fn find_story_file_searches_current_then_backlog() {
fn read_story_content_from_filesystem_fallback() {
let tmp = tempfile::tempdir().unwrap();
let current = tmp.path().join(".huskies/work/2_current");
let backlog = tmp.path().join(".huskies/work/1_backlog");
fs::create_dir_all(&current).unwrap();
fs::create_dir_all(&backlog).unwrap();
let content = "---\nname: Test\n---\n# Story\n";
fs::write(current.join("6_test.md"), content).unwrap();
// Only in backlog
fs::write(backlog.join("6_test.md"), "").unwrap();
let found = find_story_file(tmp.path(), "6_test").unwrap();
assert!(found.ends_with("1_backlog/6_test.md") || found.ends_with("1_backlog\\6_test.md"));
// Also in current — current should win
fs::write(current.join("6_test.md"), "").unwrap();
let found = find_story_file(tmp.path(), "6_test").unwrap();
assert!(found.ends_with("2_current/6_test.md") || found.ends_with("2_current\\6_test.md"));
let result = read_story_content(tmp.path(), "6_test").unwrap();
assert_eq!(result, content);
}
#[test]
fn find_story_file_returns_error_when_not_found() {
fn read_story_content_not_found_returns_error() {
let tmp = tempfile::tempdir().unwrap();
let result = find_story_file(tmp.path(), "99_missing");
let result = read_story_content(tmp.path(), "99_missing");
assert!(result.is_err());
assert!(result.unwrap_err().contains("not found"));
}