huskies: merge 1010

This commit is contained in:
dave
2026-05-14 08:07:43 +00:00
parent 4520e0e6f9
commit 13ab97a615
27 changed files with 572 additions and 95 deletions
+3 -2
View File
@@ -100,6 +100,7 @@ mod tests {
.single()
.unwrap(),
}),
plan: Default::default(),
},
Some("Stale Claim Displacement Test"),
None,
@@ -111,7 +112,7 @@ mod tests {
// Confirm the stale claim is in place.
let before = read_item(story_id).expect("item should exist");
let before_claim = match before.stage() {
Stage::Coding { claim } => claim.as_ref(),
Stage::Coding { claim, .. } => claim.as_ref(),
Stage::Merge { claim, .. } => claim.as_ref(),
_ => None,
};
@@ -150,7 +151,7 @@ mod tests {
let our_id = our_node_id().expect("node id should be available after init_for_test");
let after = read_item(story_id).expect("item should still exist");
let after_claim = match after.stage() {
Stage::Coding { claim } => claim.as_ref(),
Stage::Coding { claim, .. } => claim.as_ref(),
Stage::Merge { claim, .. } => claim.as_ref(),
_ => None,
};
+2 -2
View File
@@ -66,7 +66,7 @@ pub(super) async fn scan_and_claim(
}
let item_claim = match item.stage() {
crate::pipeline_state::Stage::Coding { claim } => claim.as_ref(),
crate::pipeline_state::Stage::Coding { claim, .. } => claim.as_ref(),
crate::pipeline_state::Stage::Merge { claim, .. } => claim.as_ref(),
_ => None,
};
@@ -196,7 +196,7 @@ pub(super) fn reclaim_timed_out_work(_project_root: &Path) {
// holder is still alive. A node actively working should refresh its
// claim before the TTL window closes.
let reclaim_claim = match item.stage() {
crate::pipeline_state::Stage::Coding { claim } => claim.as_ref(),
crate::pipeline_state::Stage::Coding { claim, .. } => claim.as_ref(),
crate::pipeline_state::Stage::Merge { claim, .. } => claim.as_ref(),
_ => None,
};
@@ -27,7 +27,13 @@ impl AgentPool {
/// here as well.
pub(super) async fn assign_pipeline_stages(&self, project_root: &Path, config: &ProjectConfig) {
let stages: [(Stage, PipelineStage); 2] = [
(Stage::Coding { claim: None }, PipelineStage::Coder),
(
Stage::Coding {
claim: None,
plan: Default::default(),
},
PipelineStage::Coder,
),
(Stage::Qa, PipelineStage::Qa),
];
+4 -1
View File
@@ -218,7 +218,10 @@ mod tests {
crate::db::ItemMeta::named("baz"),
);
let items = scan_stage_items(&Stage::Coding { claim: None });
let items = scan_stage_items(&Stage::Coding {
claim: None,
plan: Default::default(),
});
// The global CRDT may contain items from other tests, so check
// that our three items are present and appear in sorted order.
assert!(
@@ -100,7 +100,9 @@ mod tests {
use super::*;
use crate::agents::TokenUsage;
use crate::agents::token_usage::TokenUsageRecord;
use crate::pipeline_state::{BranchName, PipelineEvent, Stage, StoryId, TransitionFired};
use crate::pipeline_state::{
BranchName, PipelineEvent, PlanState, Stage, StoryId, TransitionFired,
};
use chrono::Utc;
use std::num::NonZeroU32;
@@ -148,7 +150,10 @@ mod tests {
fn fired_abandoned(story_id: &str) -> TransitionFired {
TransitionFired {
story_id: StoryId(story_id.to_string()),
before: Stage::Coding { claim: None },
before: Stage::Coding {
claim: None,
plan: PlanState::Missing,
},
after: Stage::Abandoned { ts: Utc::now() },
event: PipelineEvent::Abandon,
at: Utc::now(),
@@ -159,7 +164,10 @@ mod tests {
TransitionFired {
story_id: StoryId(story_id.to_string()),
before: Stage::Backlog,
after: Stage::Coding { claim: None },
after: Stage::Coding {
claim: None,
plan: PlanState::Missing,
},
event: PipelineEvent::DepsMet,
at: Utc::now(),
}
+8 -2
View File
@@ -93,7 +93,10 @@ mod tests {
make_item(
"20_story_in_progress",
"In Progress",
Stage::Coding { claim: None },
Stage::Coding {
claim: None,
plan: Default::default(),
},
),
make_item("30_story_in_qa", "In QA", Stage::Qa),
];
@@ -234,7 +237,10 @@ mod tests {
let items = vec![make_item(
"1_story_done",
"Done",
Stage::Coding { claim: None },
Stage::Coding {
claim: None,
plan: Default::default(),
},
)];
let output = build_backlog_from_items(&items);
assert!(
+52 -13
View File
@@ -137,7 +137,10 @@ fn status_does_not_show_full_filename_stem() {
let items = vec![make_item(
"293_story_register_all_bot_commands",
"Register all bot commands",
Stage::Coding { claim: None },
Stage::Coding {
claim: None,
plan: Default::default(),
},
)];
let agents = AgentPool::new_test(3000);
@@ -163,7 +166,10 @@ fn status_shows_cost_when_token_usage_exists() {
let items = vec![make_item(
"293_story_register_all_bot_commands",
"Register all bot commands",
Stage::Coding { claim: None },
Stage::Coding {
claim: None,
plan: Default::default(),
},
)];
// Write token usage for this story.
@@ -200,7 +206,10 @@ fn status_no_cost_when_no_usage() {
let items = vec![make_item(
"293_story_register_all_bot_commands",
"Register all bot commands",
Stage::Coding { claim: None },
Stage::Coding {
claim: None,
plan: Default::default(),
},
)];
let agents = AgentPool::new_test(3000);
@@ -220,7 +229,10 @@ fn status_aggregates_multiple_records_per_story() {
let items = vec![make_item(
"293_story_register_all_bot_commands",
"Register all bot commands",
Stage::Coding { claim: None },
Stage::Coding {
claim: None,
plan: Default::default(),
},
)];
// Write two records for the same story — costs should be summed.
@@ -264,7 +276,10 @@ fn status_shows_waiting_on_for_story_with_unmet_deps() {
make_item_with_deps(
"10_story_waiting",
"Waiting Story",
Stage::Coding { claim: None },
Stage::Coding {
claim: None,
plan: Default::default(),
},
vec![999],
),
make_item("999_story_dep", "Dep Story", Stage::Backlog),
@@ -289,7 +304,10 @@ fn status_does_not_show_waiting_on_when_dep_is_done() {
make_item_with_deps(
"10_story_unblocked",
"Unblocked Story",
Stage::Coding { claim: None },
Stage::Coding {
claim: None,
plan: Default::default(),
},
vec![999],
),
make_item(
@@ -319,7 +337,10 @@ fn status_shows_no_waiting_info_when_no_deps() {
let items = vec![make_item(
"42_story_nodeps",
"No Deps Story",
Stage::Coding { claim: None },
Stage::Coding {
claim: None,
plan: Default::default(),
},
)];
let agents = AgentPool::new_test(3000);
@@ -383,7 +404,10 @@ fn stage_is_blocked_returns_true_for_archived_blocked() {
#[test]
fn stage_is_blocked_returns_false_for_coding() {
assert!(!matches!(
Stage::Coding { claim: None },
Stage::Coding {
claim: None,
plan: Default::default()
},
Stage::Blocked { .. }
| Stage::MergeFailure { .. }
| Stage::MergeFailureFinal { .. }
@@ -422,7 +446,10 @@ fn status_shows_idle_dot_for_unassigned_story() {
let items = vec![make_item(
"42_story_idle",
"Idle Story",
Stage::Coding { claim: None },
Stage::Coding {
claim: None,
plan: Default::default(),
},
)];
let agents = AgentPool::new_test(3000);
@@ -793,7 +820,10 @@ fn in_progress_count_includes_blocked_items() {
make_item(
"10_story_coding",
"Coding Story",
Stage::Coding { claim: None },
Stage::Coding {
claim: None,
plan: Default::default(),
},
),
make_item(
"11_story_blocked",
@@ -825,7 +855,10 @@ fn frozen_coding_item_appears_in_in_progress_section() {
"60_story_frozen",
"Frozen Coding Story",
Stage::Frozen {
resume_to: Box::new(Stage::Coding { claim: None }),
resume_to: Box::new(Stage::Coding {
claim: None,
plan: Default::default(),
}),
},
)];
@@ -883,7 +916,10 @@ fn frozen_item_shows_snowflake_indicator() {
"80_story_frozen_flake",
"Frozen Flake Story",
Stage::Frozen {
resume_to: Box::new(Stage::Coding { claim: None }),
resume_to: Box::new(Stage::Coding {
claim: None,
plan: Default::default(),
}),
},
)];
@@ -913,7 +949,10 @@ fn frozen_and_blocked_use_distinct_indicators() {
"91_story_frozen_ind",
"Frozen Story",
Stage::Frozen {
resume_to: Box::new(Stage::Coding { claim: None }),
resume_to: Box::new(Stage::Coding {
claim: None,
plan: Default::default(),
}),
},
),
];
+2 -2
View File
@@ -55,8 +55,8 @@ pub use types::{
pub use write::{
bump_retry_count, migrate_legacy_stage_strings, migrate_merge_job, migrate_names_from_slugs,
migrate_node_claims_to_agent_claims, migrate_story_ids_to_numeric, name_from_story_id,
set_agent, set_depends_on, set_epic, set_item_type, set_name, set_qa_mode, set_resume_to,
set_resume_to_raw, set_retry_count, write_item,
set_agent, set_depends_on, set_epic, set_item_type, set_name, set_plan_state, set_qa_mode,
set_resume_to, set_resume_to_raw, set_retry_count, write_item,
};
#[cfg(test)]
+1 -1
View File
@@ -110,7 +110,7 @@ pub fn is_claimed_by_us(story_id: &str) -> bool {
return false;
};
let claim = match item.stage() {
crate::pipeline_state::Stage::Coding { claim } => claim.as_ref(),
crate::pipeline_state::Stage::Coding { claim, .. } => claim.as_ref(),
crate::pipeline_state::Stage::Merge { claim, .. } => claim.as_ref(),
_ => None,
};
+18 -3
View File
@@ -398,6 +398,11 @@ pub(super) fn extract_item_view(item: &PipelineItemCrdt) -> Option<PipelineItemV
_ => None,
};
let plan_state_str = match item.plan_state.view() {
JsonValue::String(s) if !s.is_empty() => Some(s),
_ => None,
};
let stage = project_stage_for_view(
&stage_str,
&story_id,
@@ -405,6 +410,7 @@ pub(super) fn extract_item_view(item: &PipelineItemCrdt) -> Option<PipelineItemV
resume_to.as_deref(),
claim_agent.as_deref(),
claim_ts_secs,
plan_state_str.as_deref(),
)?;
Some(PipelineItemView {
@@ -440,8 +446,11 @@ fn project_stage_for_view(
resume_to: Option<&str>,
claim_agent: Option<&str>,
claim_ts_secs: Option<u64>,
plan_state_str: Option<&str>,
) -> Option<crate::pipeline_state::Stage> {
use crate::pipeline_state::{AgentClaim, AgentName, ArchiveReason, BranchName, GitSha, Stage};
use crate::pipeline_state::{
AgentClaim, AgentName, ArchiveReason, BranchName, GitSha, PlanState, Stage,
};
use chrono::{DateTime, TimeZone, Utc};
use std::num::NonZeroU32;
@@ -470,7 +479,10 @@ fn project_stage_for_view(
Box::new(
resume_to
.and_then(Stage::from_dir)
.unwrap_or(Stage::Coding { claim: None }),
.unwrap_or(Stage::Coding {
claim: None,
plan: PlanState::Missing,
}),
)
};
@@ -489,7 +501,10 @@ fn project_stage_for_view(
match clean {
"upcoming" => Some(Stage::Upcoming),
"backlog" => Some(Stage::Backlog),
"coding" => Some(Stage::Coding { claim }),
"coding" => Some(Stage::Coding {
claim,
plan: PlanState::from_str(plan_state_str.unwrap_or("")),
}),
"qa" => Some(Stage::Qa),
"blocked" => Some(Stage::Blocked {
reason: String::new(),
+12 -2
View File
@@ -96,6 +96,10 @@ pub struct PipelineItemCrdt {
/// `"rejected"`. These stages never have a resume target, so the
/// register is exclusively available for their metadata.
pub resume_to: LwwRegisterCrdt<String>,
/// Story 1010: lifecycle state of `PLAN.md` in the coding worktree.
/// Wire values: `"missing"` (default/empty), `"drafted"`, `"confirmed"`.
/// Updated by the filesystem watcher on PLAN.md create/modify/remove events.
pub plan_state: LwwRegisterCrdt<String>,
}
/// CRDT node that holds a single peer's presence entry.
@@ -518,7 +522,10 @@ mod tests {
let evt = CrdtEvent {
story_id: "42_story_foo".to_string(),
from_stage: Some(crate::pipeline_state::Stage::Backlog),
to_stage: crate::pipeline_state::Stage::Coding { claim: None },
to_stage: crate::pipeline_state::Stage::Coding {
claim: None,
plan: crate::pipeline_state::PlanState::Missing,
},
name: "Foo Feature".to_string(),
};
assert_eq!(evt.story_id, "42_story_foo");
@@ -678,7 +685,10 @@ mod tests {
let evt = CrdtEvent {
story_id: "70_story_broadcast".to_string(),
from_stage: Some(Stage::Backlog),
to_stage: Stage::Coding { claim: None },
to_stage: Stage::Coding {
claim: None,
plan: crate::pipeline_state::PlanState::Missing,
},
name: "Broadcast Test".to_string(),
};
tx.send(evt).unwrap();
+27 -1
View File
@@ -211,6 +211,30 @@ pub fn set_qa_mode(story_id: &str, mode: Option<QaMode>) -> bool {
true
}
/// Set the `plan_state` CRDT register for a pipeline item (story 1010).
///
/// Encodes the PLAN.md lifecycle as a wire string (`"missing"`, `"drafted"`,
/// `"confirmed"`). Called by the filesystem watcher when PLAN.md is created,
/// modified, or removed inside a coding worktree.
///
/// Returns `true` if the item was found and the op was applied, `false` otherwise.
pub fn set_plan_state(story_id: &str, state: crate::pipeline_state::PlanState) -> bool {
let Some(state_mutex) = get_crdt() else {
return false;
};
let Ok(mut crdt_state) = state_mutex.lock() else {
return false;
};
let Some(&idx) = crdt_state.index.get(story_id) else {
return false;
};
let value = state.as_str().to_string();
apply_and_persist(&mut crdt_state, |s| {
s.crdt.doc.items[idx].plan_state.set(value)
});
true
}
/// Write a pipeline item state through CRDT operations.
///
/// If the item exists, updates its registers. If not, inserts a new item
@@ -232,7 +256,7 @@ pub fn write_item(
) {
let stage_str = stage_dir_name(stage);
let claim: Option<&AgentClaim> = match stage {
Stage::Coding { claim } => claim.as_ref(),
Stage::Coding { claim, .. } => claim.as_ref(),
Stage::Merge { claim, .. } => claim.as_ref(),
_ => None,
};
@@ -350,6 +374,7 @@ pub fn write_item(
"item_type": "",
"epic": "",
"resume_to": "",
"plan_state": "",
})
.into();
@@ -378,6 +403,7 @@ pub fn write_item(
item.item_type.advance_seq(floor);
item.epic.advance_seq(floor);
item.resume_to.advance_seq(floor);
item.plan_state.advance_seq(floor);
}
// Broadcast a CrdtEvent for the new item.
+9 -3
View File
@@ -333,7 +333,7 @@ mod stage_migration_tests {
use super::super::item::write_item;
use super::*;
use crate::crdt_state::read_item;
use crate::pipeline_state::{BranchName, Stage};
use crate::pipeline_state::{BranchName, PlanState, Stage};
use std::num::NonZeroU32;
/// Seed a pipeline item with a raw, possibly-legacy stage register value,
@@ -370,7 +370,10 @@ mod stage_migration_tests {
(
"9503_legacy_coding",
"2_current",
Stage::Coding { claim: None },
Stage::Coding {
claim: None,
plan: PlanState::Missing,
},
),
(
"9504_legacy_blocked",
@@ -452,7 +455,10 @@ mod stage_migration_tests {
// Seed two items: one already in clean form, one in legacy form.
write_item(
"9520_already_clean",
&Stage::Coding { claim: None },
&Stage::Coding {
claim: None,
plan: PlanState::Missing,
},
Some("Already Clean"),
None,
None,
+2 -2
View File
@@ -10,8 +10,8 @@ mod migrations;
mod tests;
pub use item::{
bump_retry_count, set_agent, set_depends_on, set_epic, set_item_type, set_name, set_qa_mode,
set_resume_to, set_resume_to_raw, set_retry_count, write_item,
bump_retry_count, set_agent, set_depends_on, set_epic, set_item_type, set_name, set_plan_state,
set_qa_mode, set_resume_to, set_resume_to_raw, set_retry_count, write_item,
};
#[cfg(test)]
+1 -1
View File
@@ -196,7 +196,7 @@ pub(super) async fn tool_status(args: &Value, ctx: &AppContext) -> Result<String
front_matter.insert("depends_on".to_string(), json!(deps));
}
let stage_claim = match &typed_item.stage {
crate::pipeline_state::Stage::Coding { claim } => claim.as_ref(),
crate::pipeline_state::Stage::Coding { claim, .. } => claim.as_ref(),
crate::pipeline_state::Stage::Merge { claim, .. } => claim.as_ref(),
_ => None,
};
+57 -10
View File
@@ -1,15 +1,16 @@
//! Filesystem watcher for `.huskies/project.toml` and `.huskies/agents.toml`.
//! Filesystem watcher for `.huskies/project.toml`, `.huskies/agents.toml`,
//! and `.huskies/worktrees/*/PLAN.md`.
//!
//! Watches config files for changes and broadcasts a [`WatcherEvent`] to all
//! connected WebSocket clients so the frontend can reload the agent roster
//! without a server restart.
//!
//! Work-item pipeline events (stage transitions) are driven by CRDT state
//! changes via [`crate::crdt_state::subscribe`], not by filesystem events.
//! without a server restart. Also watches worktree PLAN.md files and updates
//! the typed [`crate::pipeline_state::PlanState`] in the CRDT whenever a
//! PLAN.md is created, modified, or removed.
//!
//! # Debouncing
//! Config-file events are buffered for 300 ms after the last activity to avoid
//! duplicate broadcasts when an editor writes multiple events in quick succession.
//! PLAN.md events are applied immediately without debouncing.
//!
//! # Submodules
//! - [`events`] — [`WatcherEvent`] enum definition.
@@ -28,6 +29,37 @@ use std::sync::mpsc;
use std::time::{Duration, Instant};
use tokio::sync::broadcast;
/// Extract the story ID from a path of the form
/// `{git_root}/.huskies/worktrees/{story_id}/PLAN.md`.
///
/// Returns `Some(story_id)` when `path` is exactly a `PLAN.md` file one level
/// inside the worktrees directory. Returns `None` for any other path.
pub fn extract_story_id_from_plan_path(path: &Path, git_root: &Path) -> Option<String> {
if path.file_name()? != "PLAN.md" {
return None;
}
let parent = path.parent()?;
let expected_worktrees = git_root.join(".huskies").join("worktrees");
if parent.parent()? != expected_worktrees {
return None;
}
Some(parent.file_name()?.to_str()?.to_string())
}
/// Determine the [`PlanState`] for a PLAN.md file at `path`.
///
/// - File absent or unreadable → [`PlanState::Missing`]
/// - File contains `<TBD>` → [`PlanState::Drafted`]
/// - File exists with no `<TBD>` → [`PlanState::Confirmed`]
pub fn plan_state_for_path(path: &Path) -> crate::pipeline_state::PlanState {
use crate::pipeline_state::PlanState;
match std::fs::read_to_string(path) {
Ok(content) if content.contains("<TBD>") => PlanState::Drafted,
Ok(_) => PlanState::Confirmed,
Err(_) => PlanState::Missing,
}
}
/// Return `true` if `path` is the root-level `.huskies/project.toml` or
/// `.huskies/agents.toml`, i.e. `{git_root}/.huskies/{project,agents}.toml`.
///
@@ -100,8 +132,7 @@ pub fn start_watcher(git_root: PathBuf, event_tx: broadcast::Sender<WatcherEvent
}
};
// Watch config files for hot-reload. Work-item directories are NOT
// watched — CRDT state transitions drive pipeline events now.
// Watch config files for hot-reload.
let huskies = git_root.join(".huskies");
for config_file in [huskies.join("project.toml"), huskies.join("agents.toml")] {
if config_file.exists()
@@ -114,7 +145,15 @@ pub fn start_watcher(git_root: PathBuf, event_tx: broadcast::Sender<WatcherEvent
}
}
slog!("[watcher] watching config files for hot-reload");
// Watch worktrees directory for PLAN.md changes.
let worktrees_dir = huskies.join("worktrees");
if worktrees_dir.exists()
&& let Err(e) = watcher.watch(&worktrees_dir, RecursiveMode::Recursive)
{
slog!("[watcher] failed to watch worktrees dir: {e}");
}
slog!("[watcher] watching config files and worktree PLAN.md files");
const DEBOUNCE: Duration = Duration::from_millis(300);
@@ -141,9 +180,17 @@ pub fn start_watcher(git_root: PathBuf, event_tx: broadcast::Sender<WatcherEvent
slog!("[watcher] config change detected: {}", path.display());
config_changed_pending = true;
deadline = Some(Instant::now() + DEBOUNCE);
} else if let Some(story_id) =
extract_story_id_from_plan_path(&path, &git_root)
{
let plan_state = plan_state_for_path(&path);
slog!(
"[watcher] PLAN.md changed for '{}': {:?}",
story_id,
plan_state
);
crate::crdt_state::set_plan_state(&story_id, plan_state);
}
// Work-item file changes are intentionally ignored.
// CRDT state transitions handle pipeline events.
}
}
false
+67 -1
View File
@@ -1,6 +1,66 @@
//! Tests for the filesystem config watcher.
use super::*;
// ── extract_story_id_from_plan_path ──────────────────────────────────────────
#[test]
fn extracts_story_id_from_plan_path() {
let git_root = PathBuf::from("/proj");
let plan = PathBuf::from("/proj/.huskies/worktrees/42_story_foo/PLAN.md");
assert_eq!(
extract_story_id_from_plan_path(&plan, &git_root),
Some("42_story_foo".to_string())
);
}
#[test]
fn plan_path_wrong_filename_returns_none() {
let git_root = PathBuf::from("/proj");
let other = PathBuf::from("/proj/.huskies/worktrees/42_story_foo/README.md");
assert!(extract_story_id_from_plan_path(&other, &git_root).is_none());
}
#[test]
fn plan_path_not_in_worktrees_returns_none() {
let git_root = PathBuf::from("/proj");
let nested = PathBuf::from("/proj/.huskies/worktrees/42_story_foo/sub/PLAN.md");
assert!(extract_story_id_from_plan_path(&nested, &git_root).is_none());
}
#[test]
fn plan_path_wrong_root_returns_none() {
let git_root = PathBuf::from("/proj");
let other_root = PathBuf::from("/other/.huskies/worktrees/42_story_foo/PLAN.md");
assert!(extract_story_id_from_plan_path(&other_root, &git_root).is_none());
}
// ── plan_state_for_path ──────────────────────────────────────────────────────
#[test]
fn plan_state_missing_when_file_absent() {
use crate::pipeline_state::PlanState;
let path = PathBuf::from("/nonexistent/PLAN.md");
assert_eq!(plan_state_for_path(&path), PlanState::Missing);
}
#[test]
fn plan_state_drafted_when_file_contains_tbd() {
use crate::pipeline_state::PlanState;
use std::io::Write;
let tmp = tempfile::NamedTempFile::new().unwrap();
writeln!(tmp.as_file(), "# Plan\n- step 1 <TBD>\n- step 2").unwrap();
assert_eq!(plan_state_for_path(tmp.path()), PlanState::Drafted);
}
#[test]
fn plan_state_confirmed_when_file_has_no_tbd() {
use crate::pipeline_state::PlanState;
use std::io::Write;
let tmp = tempfile::NamedTempFile::new().unwrap();
writeln!(tmp.as_file(), "# Plan\n- step 1\n- step 2").unwrap();
assert_eq!(plan_state_for_path(tmp.path()), PlanState::Confirmed);
}
// ── is_config_file ────────────────────────────────────────────────────────
#[test]
@@ -54,7 +114,13 @@ fn stage_metadata_returns_correct_actions() {
use crate::pipeline_state::{GitSha, Stage};
use chrono::Utc;
let (action, msg) = stage_metadata(&Stage::Coding { claim: None }, "42_story_foo");
let (action, msg) = stage_metadata(
&Stage::Coding {
claim: None,
plan: Default::default(),
},
"42_story_foo",
);
assert_eq!(action, "start");
assert_eq!(msg, "huskies: start 42_story_foo");
+5 -2
View File
@@ -112,7 +112,7 @@ impl Default for EventBus {
#[cfg(test)]
mod tests {
use super::super::BranchName;
use super::super::{BranchName, PlanState};
use super::*;
use std::num::NonZeroU32;
@@ -149,7 +149,10 @@ mod tests {
bus.fire(TransitionFired {
story_id: StoryId("test".into()),
before: Stage::Backlog,
after: Stage::Coding { claim: None },
after: Stage::Coding {
claim: None,
plan: PlanState::Missing,
},
event: PipelineEvent::DepsMet,
at: Utc::now(),
});
+2 -1
View File
@@ -41,7 +41,8 @@ mod tests;
#[allow(unused_imports)]
pub use types::{
AgentClaim, AgentName, ArchiveReason, BranchName, ExecutionState, GitSha, MergeFailureKind,
NodePubkey, PipelineItem, Stage, StoryId, TransitionError, stage_dir_name, stage_label,
NodePubkey, PipelineItem, PlanState, Stage, StoryId, TransitionError, stage_dir_name,
stage_label,
};
#[allow(unused_imports)]
+71 -3
View File
@@ -100,7 +100,7 @@ pub fn read_typed(story_id: &str) -> Result<Option<PipelineItem>, ProjectionErro
#[cfg(test)]
mod tests {
use super::*;
use crate::pipeline_state::{ArchiveReason, BranchName, GitSha, Stage};
use crate::pipeline_state::{ArchiveReason, BranchName, GitSha, PlanState, Stage};
use chrono::Utc;
use std::num::NonZeroU32;
@@ -157,7 +157,10 @@ mod tests {
fn project_current_item() {
let view = PipelineItemView::for_test(
"42_story_test",
Stage::Coding { claim: None },
Stage::Coding {
claim: None,
plan: PlanState::Missing,
},
"Test",
Some(crate::config::AgentName::Coder1),
2u32,
@@ -267,7 +270,10 @@ mod tests {
let view = make_view(
"42_story_test",
Stage::Frozen {
resume_to: Box::new(Stage::Coding { claim: None }),
resume_to: Box::new(Stage::Coding {
claim: None,
plan: PlanState::Missing,
}),
},
Some("Frozen Story"),
);
@@ -292,4 +298,66 @@ mod tests {
fn git_sha_constructs() {
let _ = GitSha("abc".to_string());
}
// ── PlanState projection ────────────────────────────────────────────
#[test]
fn project_coding_plan_missing() {
let view = make_view(
"42_story_test",
Stage::Coding {
claim: None,
plan: PlanState::Missing,
},
Some("Test"),
);
let item = PipelineItem::try_from(&view).unwrap();
assert!(matches!(
item.stage,
Stage::Coding {
plan: PlanState::Missing,
..
}
));
}
#[test]
fn project_coding_plan_drafted() {
let view = make_view(
"42_story_test",
Stage::Coding {
claim: None,
plan: PlanState::Drafted,
},
Some("Test"),
);
let item = PipelineItem::try_from(&view).unwrap();
assert!(matches!(
item.stage,
Stage::Coding {
plan: PlanState::Drafted,
..
}
));
}
#[test]
fn project_coding_plan_confirmed() {
let view = make_view(
"42_story_test",
Stage::Coding {
claim: None,
plan: PlanState::Confirmed,
},
Some("Test"),
);
let item = PipelineItem::try_from(&view).unwrap();
assert!(matches!(
item.stage,
Stage::Coding {
plan: PlanState::Confirmed,
..
}
));
}
}
+79 -16
View File
@@ -52,7 +52,10 @@ fn happy_path_backlog_through_archived() {
#[test]
fn happy_path_with_qa() {
let s = Stage::Coding { claim: None };
let s = Stage::Coding {
claim: None,
plan: PlanState::Missing,
};
let s = transition(s, PipelineEvent::GatesStarted).unwrap();
assert!(matches!(s, Stage::Qa));
@@ -69,7 +72,10 @@ fn happy_path_with_qa() {
#[test]
fn qa_retry_loop() {
let s = Stage::Coding { claim: None };
let s = Stage::Coding {
claim: None,
plan: PlanState::Missing,
};
let s = transition(s, PipelineEvent::GatesStarted).unwrap();
assert!(matches!(s, Stage::Qa));
@@ -154,7 +160,13 @@ fn cannot_start_gates_from_backlog() {
#[test]
fn cannot_accept_from_coding() {
let result = transition(Stage::Coding { claim: None }, PipelineEvent::Accepted);
let result = transition(
Stage::Coding {
claim: None,
plan: PlanState::Missing,
},
PipelineEvent::Accepted,
);
assert!(matches!(
result,
Err(TransitionError::InvalidTransition { .. })
@@ -165,7 +177,14 @@ fn cannot_accept_from_coding() {
#[test]
fn block_from_any_active_stage() {
for s in [Stage::Backlog, Stage::Coding { claim: None }, Stage::Qa] {
for s in [
Stage::Backlog,
Stage::Coding {
claim: None,
plan: PlanState::Missing,
},
Stage::Qa,
] {
let result = transition(
s.clone(),
PipelineEvent::Block {
@@ -252,7 +271,10 @@ fn legacy_unblock_archived_blocked_returns_to_backlog() {
fn abandon_from_any_active_or_done() {
for s in [
Stage::Backlog,
Stage::Coding { claim: None },
Stage::Coding {
claim: None,
plan: PlanState::Missing,
},
Stage::Qa,
Stage::Done {
merged_at: chrono::Utc::now(),
@@ -268,7 +290,10 @@ fn abandon_from_any_active_or_done() {
fn supersede_from_any_active_or_done() {
for s in [
Stage::Backlog,
Stage::Coding { claim: None },
Stage::Coding {
claim: None,
plan: PlanState::Missing,
},
Stage::Qa,
Stage::Done {
merged_at: chrono::Utc::now(),
@@ -292,7 +317,14 @@ fn review_hold_from_active_stages() {
// Story 945: `ReviewHold` transitions to `Stage::ReviewHold { resume_to }`
// with the resume_to set to the originating stage, replacing the legacy
// boolean flag.
for s in [Stage::Backlog, Stage::Coding { claim: None }, Stage::Qa] {
for s in [
Stage::Backlog,
Stage::Coding {
claim: None,
plan: PlanState::Missing,
},
Stage::Qa,
] {
let result = transition(
s.clone(),
PipelineEvent::ReviewHold {
@@ -338,7 +370,10 @@ fn merge_failed_final() {
#[test]
fn merge_failed_only_from_merge() {
let result = transition(
Stage::Coding { claim: None },
Stage::Coding {
claim: None,
plan: PlanState::Missing,
},
PipelineEvent::MergeFailedFinal {
reason: "conflicts".into(),
},
@@ -483,7 +518,14 @@ fn cannot_deps_met_from_upcoming() {
#[test]
fn reject_from_active_stages() {
for s in [Stage::Backlog, Stage::Coding { claim: None }, Stage::Qa] {
for s in [
Stage::Backlog,
Stage::Coding {
claim: None,
plan: PlanState::Missing,
},
Stage::Qa,
] {
let result = transition(
s.clone(),
PipelineEvent::Reject {
@@ -989,7 +1031,10 @@ fn hotfix_requested_from_done_lands_in_coding() {
fn hotfix_requested_rejected_from_non_done_stages() {
for stage in [
Stage::Backlog,
Stage::Coding { claim: None },
Stage::Coding {
claim: None,
plan: PlanState::Missing,
},
Stage::Qa,
Stage::Merge {
feature_branch: fb("feature/story-1"),
@@ -1016,7 +1061,10 @@ fn audit_entry_backlog_to_coding_exact_format() {
let fired = TransitionFired {
story_id: StoryId("1014_my_story".into()),
before: Stage::Backlog,
after: Stage::Coding { claim: None },
after: Stage::Coding {
claim: None,
plan: PlanState::Missing,
},
event: PipelineEvent::DepsMet,
at,
};
@@ -1116,7 +1164,10 @@ fn audit_entry_done_to_archived() {
fn audit_entry_coding_to_blocked() {
let fired = TransitionFired {
story_id: StoryId("300_s".into()),
before: Stage::Coding { claim: None },
before: Stage::Coding {
claim: None,
plan: PlanState::Missing,
},
after: Stage::Blocked {
reason: "waiting".into(),
},
@@ -1138,7 +1189,10 @@ fn audit_entry_blocked_to_coding() {
before: Stage::Blocked {
reason: "test".into(),
},
after: Stage::Coding { claim: None },
after: Stage::Coding {
claim: None,
plan: PlanState::Missing,
},
event: PipelineEvent::Unblock,
at: chrono::Utc::now(),
};
@@ -1177,9 +1231,15 @@ fn audit_entry_merge_to_merge_failure() {
fn audit_entry_coding_to_frozen() {
let fired = TransitionFired {
story_id: StoryId("600_s".into()),
before: Stage::Coding { claim: None },
before: Stage::Coding {
claim: None,
plan: PlanState::Missing,
},
after: Stage::Frozen {
resume_to: Box::new(Stage::Coding { claim: None }),
resume_to: Box::new(Stage::Coding {
claim: None,
plan: PlanState::Missing,
}),
},
event: PipelineEvent::Freeze,
at: chrono::Utc::now(),
@@ -1194,7 +1254,10 @@ fn audit_entry_coding_to_frozen() {
fn audit_entry_coding_to_abandoned() {
let fired = TransitionFired {
story_id: StoryId("700_s".into()),
before: Stage::Coding { claim: None },
before: Stage::Coding {
claim: None,
plan: PlanState::Missing,
},
after: Stage::Abandoned {
ts: chrono::Utc::now(),
},
+30 -9
View File
@@ -4,8 +4,8 @@ use chrono::Utc;
use serde::{Deserialize, Serialize};
use super::{
AgentName, ArchiveReason, BranchName, ExecutionState, GitSha, MergeFailureKind, Stage, StoryId,
TransitionError, stage_label,
AgentName, ArchiveReason, BranchName, ExecutionState, GitSha, MergeFailureKind, PlanState,
Stage, StoryId, TransitionError, stage_label,
};
// ── Pipeline events ─────────────────────────────────────────────────────────
@@ -149,7 +149,10 @@ pub fn transition(state: Stage, event: PipelineEvent) -> Result<Stage, Transitio
(Upcoming, Triage) => Ok(Backlog),
// ── Forward path ────────────────────────────────────────────────
(Backlog, DepsMet) => Ok(Coding { claim: None }),
(Backlog, DepsMet) => Ok(Coding {
claim: None,
plan: PlanState::Missing,
}),
(Coding { .. }, GatesStarted) => Ok(Qa),
(
Coding { .. },
@@ -173,7 +176,10 @@ pub fn transition(state: Stage, event: PipelineEvent) -> Result<Stage, Transitio
commits_ahead,
claim: None,
}),
(Qa, GatesFailed { .. }) => Ok(Coding { claim: None }),
(Qa, GatesFailed { .. }) => Ok(Coding {
claim: None,
plan: PlanState::Missing,
}),
(Merge { .. }, MergeSucceeded { merge_commit }) => Ok(Done {
merged_at: now,
merge_commit,
@@ -312,7 +318,10 @@ pub fn transition(state: Stage, event: PipelineEvent) -> Result<Stage, Transitio
(Stage::ReviewHold { resume_to, .. }, ReviewHoldCleared) => Ok(*resume_to),
// ── FixupRequested: MergeFailure → Coding (coder fixup) ────────
(MergeFailure { .. }, FixupRequested) => Ok(Coding { claim: None }),
(MergeFailure { .. }, FixupRequested) => Ok(Coding {
claim: None,
plan: PlanState::Missing,
}),
// ── FixupRequested: MergeFailureFinal → Coding (operator override)
//
@@ -321,19 +330,28 @@ pub fn transition(state: Stage, event: PipelineEvent) -> Result<Stage, Transitio
// the gate failure is fixable and send the story back for another
// coder attempt. The budget counter is a mergemaster bookkeeping
// detail, not a hard ceiling.
(MergeFailureFinal { .. }, FixupRequested) => Ok(Coding { claim: None }),
(MergeFailureFinal { .. }, FixupRequested) => Ok(Coding {
claim: None,
plan: PlanState::Missing,
}),
// ── ReQueuedForQa: MergeFailure → Qa (re-review) ────────────────
(MergeFailure { .. }, ReQueuedForQa) => Ok(Qa),
// ── MergeAborted: Merge → Coding (abort in-flight merge) ─────────
(Merge { .. }, MergeAborted) => Ok(Coding { claim: None }),
(Merge { .. }, MergeAborted) => Ok(Coding {
claim: None,
plan: PlanState::Missing,
}),
// ── HotfixRequested: Done → Coding (post-merge hotfix) ───────────
// Allows reopening a completed story so a coder can apply a hotfix.
// A fresh feature branch is forked from master when auto-assign spawns
// the coder.
(Done { .. }, HotfixRequested) => Ok(Coding { claim: None }),
(Done { .. }, HotfixRequested) => Ok(Coding {
claim: None,
plan: PlanState::Missing,
}),
// ── MergemasterAttempted: MergeFailure → MergeFailureFinal ─────
(MergeFailure { kind, .. }, MergemasterAttempted) => Ok(MergeFailureFinal { kind }),
@@ -344,7 +362,10 @@ pub fn transition(state: Stage, event: PipelineEvent) -> Result<Stage, Transitio
(Stage::ReviewHold { resume_to, .. }, Unblock) => Ok(*resume_to),
// ── Unblock: Blocked → Coding ─────────────────────────────────
(Blocked { .. }, Unblock) => Ok(Coding { claim: None }),
(Blocked { .. }, Unblock) => Ok(Coding {
claim: None,
plan: PlanState::Missing,
}),
// ── Unblock MergeFailure → Merge (re-attempt) ────────────────────
// `unblock_story` on a failed merge re-queues it for merge, restoring
+61 -4
View File
@@ -125,6 +125,48 @@ pub struct AgentClaim {
pub claimed_at: DateTime<Utc>,
}
// ── Plan state (PLAN.md lifecycle inside Stage::Coding) ────────────────────
/// Lifecycle state of the `PLAN.md` file inside a coding worktree.
///
/// Updated by the filesystem watcher whenever PLAN.md is created, modified,
/// or removed in a story's worktree. Embedded in [`Stage::Coding`] so
/// callers access it via the typed projection instead of greping the filesystem.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum PlanState {
/// No `PLAN.md` file exists in the worktree yet.
#[default]
Missing,
/// `PLAN.md` exists but contains `<TBD>` placeholders — the plan has been
/// drafted but not yet confirmed with real file paths and descriptions.
Drafted,
/// `PLAN.md` exists and contains no `<TBD>` placeholders — the plan is
/// considered confirmed.
Confirmed,
}
impl PlanState {
/// Wire-form string stored in the `plan_state` CRDT register.
pub fn as_str(&self) -> &'static str {
match self {
PlanState::Missing => "missing",
PlanState::Drafted => "drafted",
PlanState::Confirmed => "confirmed",
}
}
/// Parse from a `plan_state` CRDT register value.
///
/// Unknown or empty strings default to [`PlanState::Missing`].
pub fn from_str(s: &str) -> Self {
match s {
"drafted" => PlanState::Drafted,
"confirmed" => PlanState::Confirmed,
_ => PlanState::Missing,
}
}
}
// ── Synced pipeline stage (lives in CRDT, converges across nodes) ───────────
/// The pipeline stage for a work item.
@@ -167,7 +209,13 @@ pub enum Stage {
/// working on this item. `None` means the item is in the coding stage but
/// no agent has claimed it yet (e.g. just transitioned from Backlog and
/// waiting for an agent to pick it up).
Coding { claim: Option<AgentClaim> },
///
/// `plan` tracks the lifecycle of the `PLAN.md` file in the worktree,
/// updated by the filesystem watcher on create/modify/remove events.
Coding {
claim: Option<AgentClaim>,
plan: PlanState,
},
/// Coder has run; gates are running.
Qa,
@@ -299,7 +347,10 @@ impl Stage {
match s {
"upcoming" => Some(Stage::Upcoming),
"backlog" => Some(Stage::Backlog),
"coding" => Some(Stage::Coding { claim: None }),
"coding" => Some(Stage::Coding {
claim: None,
plan: PlanState::Missing,
}),
"blocked" => Some(Stage::Blocked {
reason: String::new(),
}),
@@ -318,10 +369,16 @@ impl Stage {
kind: MergeFailureKind::Other(String::new()),
}),
"frozen" => Some(Stage::Frozen {
resume_to: Box::new(Stage::Coding { claim: None }),
resume_to: Box::new(Stage::Coding {
claim: None,
plan: PlanState::Missing,
}),
}),
"review_hold" => Some(Stage::ReviewHold {
resume_to: Box::new(Stage::Coding { claim: None }),
resume_to: Box::new(Stage::Coding {
claim: None,
plan: PlanState::Missing,
}),
reason: String::new(),
}),
"done" => Some(Stage::Done {
+7 -1
View File
@@ -144,7 +144,13 @@ pub fn get_work_item_content(
let stages = [
("1_backlog", Stage::Backlog),
("2_current", Stage::Coding { claim: None }),
(
"2_current",
Stage::Coding {
claim: None,
plan: Default::default(),
},
),
("3_qa", Stage::Qa),
(
"4_merge",
+24 -6
View File
@@ -254,7 +254,10 @@ mod tests {
fn stage_display_name_maps_all_known_stages() {
assert_eq!(stage_display_name(&Stage::Backlog), "Backlog");
assert_eq!(
stage_display_name(&Stage::Coding { claim: None }),
stage_display_name(&Stage::Coding {
claim: None,
plan: Default::default()
}),
"Current"
);
assert_eq!(stage_display_name(&Stage::Qa), "QA");
@@ -293,7 +296,10 @@ mod tests {
"42_story_thing",
"Some Story",
&Stage::Backlog,
&Stage::Coding { claim: None },
&Stage::Coding {
claim: None,
plan: Default::default(),
},
);
assert!(!plain.contains("\u{1f389}"));
}
@@ -304,7 +310,10 @@ mod tests {
"261_story_bot_notifications",
"Bot notifications",
&Stage::Upcoming,
&Stage::Coding { claim: None },
&Stage::Coding {
claim: None,
plan: Default::default(),
},
);
assert_eq!(
plain,
@@ -321,7 +330,10 @@ mod tests {
let (plain, html) = format_stage_notification(
"42_bug_fix_thing",
"",
&Stage::Coding { claim: None },
&Stage::Coding {
claim: None,
plan: Default::default(),
},
&Stage::Qa,
);
assert_eq!(plain, "#42 \u{2014} Current \u{2192} QA");
@@ -344,7 +356,10 @@ mod tests {
let (plain, _html) = format_stage_notification(
"1_story_long",
&long_name,
&Stage::Coding { claim: None },
&Stage::Coding {
claim: None,
plan: Default::default(),
},
&Stage::Qa,
);
assert!(plain.contains(&long_name));
@@ -355,7 +370,10 @@ mod tests {
let (plain, html) = format_stage_notification(
"42_story_empty",
"",
&Stage::Coding { claim: None },
&Stage::Coding {
claim: None,
plan: Default::default(),
},
&Stage::Qa,
);
assert_eq!(plain, "#42 \u{2014} Current \u{2192} QA");
+4 -1
View File
@@ -315,7 +315,10 @@ mod tests {
let config = empty_config();
let report = run_cleanup_with_lookup(&project_root, &config, true, |id| {
if id == story_id {
Some(Stage::Coding { claim: None })
Some(Stage::Coding {
claim: None,
plan: Default::default(),
})
} else {
None
}
+6 -2
View File
@@ -184,7 +184,8 @@ mod tests {
#[test]
fn should_not_sweep_coding() {
assert!(!worktree_should_be_swept(Some(&Stage::Coding {
claim: None
claim: None,
plan: Default::default(),
})));
}
@@ -308,7 +309,10 @@ mod tests {
let removed = sweep_with_lookup(&project_root, &config, |id| {
if id == story_id {
Some(Stage::Coding { claim: None })
Some(Stage::Coding {
claim: None,
plan: Default::default(),
})
} else {
None
}