From 13ab97a6157ebb80c395244fa99787737d2f6a3b Mon Sep 17 00:00:00 2001 From: dave Date: Thu, 14 May 2026 08:07:43 +0000 Subject: [PATCH] huskies: merge 1010 --- server/src/agent_mode/claim.rs | 5 +- server/src/agent_mode/loop_ops.rs | 4 +- .../src/agents/pool/auto_assign/pipeline.rs | 8 +- server/src/agents/pool/auto_assign/scan.rs | 5 +- .../src/agents/pool/cost_rollup_subscriber.rs | 14 ++- server/src/chat/commands/backlog.rs | 10 +- server/src/chat/commands/status/tests.rs | 65 ++++++++++--- server/src/crdt_state/mod.rs | 4 +- server/src/crdt_state/presence.rs | 2 +- server/src/crdt_state/read.rs | 21 +++- server/src/crdt_state/types.rs | 14 ++- server/src/crdt_state/write/item.rs | 28 +++++- server/src/crdt_state/write/migrations.rs | 12 ++- server/src/crdt_state/write/mod.rs | 4 +- server/src/http/mcp/status_tools.rs | 2 +- server/src/io/watcher/mod.rs | 67 +++++++++++-- server/src/io/watcher/tests.rs | 68 ++++++++++++- server/src/pipeline_state/events.rs | 7 +- server/src/pipeline_state/mod.rs | 3 +- server/src/pipeline_state/projection.rs | 74 ++++++++++++++- server/src/pipeline_state/tests.rs | 95 +++++++++++++++---- server/src/pipeline_state/transition.rs | 39 ++++++-- server/src/pipeline_state/types.rs | 65 ++++++++++++- server/src/service/agents/mod.rs | 8 +- server/src/service/notifications/format.rs | 30 ++++-- server/src/worktree/cleanup.rs | 5 +- server/src/worktree/sweep.rs | 8 +- 27 files changed, 572 insertions(+), 95 deletions(-) diff --git a/server/src/agent_mode/claim.rs b/server/src/agent_mode/claim.rs index 13450fea..1da638cd 100644 --- a/server/src/agent_mode/claim.rs +++ b/server/src/agent_mode/claim.rs @@ -100,6 +100,7 @@ mod tests { .single() .unwrap(), }), + plan: Default::default(), }, Some("Stale Claim Displacement Test"), None, @@ -111,7 +112,7 @@ mod tests { // Confirm the stale claim is in place. let before = read_item(story_id).expect("item should exist"); let before_claim = match before.stage() { - Stage::Coding { claim } => claim.as_ref(), + Stage::Coding { claim, .. } => claim.as_ref(), Stage::Merge { claim, .. } => claim.as_ref(), _ => None, }; @@ -150,7 +151,7 @@ mod tests { let our_id = our_node_id().expect("node id should be available after init_for_test"); let after = read_item(story_id).expect("item should still exist"); let after_claim = match after.stage() { - Stage::Coding { claim } => claim.as_ref(), + Stage::Coding { claim, .. } => claim.as_ref(), Stage::Merge { claim, .. } => claim.as_ref(), _ => None, }; diff --git a/server/src/agent_mode/loop_ops.rs b/server/src/agent_mode/loop_ops.rs index 273afaa1..bee86110 100644 --- a/server/src/agent_mode/loop_ops.rs +++ b/server/src/agent_mode/loop_ops.rs @@ -66,7 +66,7 @@ pub(super) async fn scan_and_claim( } let item_claim = match item.stage() { - crate::pipeline_state::Stage::Coding { claim } => claim.as_ref(), + crate::pipeline_state::Stage::Coding { claim, .. } => claim.as_ref(), crate::pipeline_state::Stage::Merge { claim, .. } => claim.as_ref(), _ => None, }; @@ -196,7 +196,7 @@ pub(super) fn reclaim_timed_out_work(_project_root: &Path) { // holder is still alive. A node actively working should refresh its // claim before the TTL window closes. let reclaim_claim = match item.stage() { - crate::pipeline_state::Stage::Coding { claim } => claim.as_ref(), + crate::pipeline_state::Stage::Coding { claim, .. } => claim.as_ref(), crate::pipeline_state::Stage::Merge { claim, .. } => claim.as_ref(), _ => None, }; diff --git a/server/src/agents/pool/auto_assign/pipeline.rs b/server/src/agents/pool/auto_assign/pipeline.rs index a78bf70b..a07b1313 100644 --- a/server/src/agents/pool/auto_assign/pipeline.rs +++ b/server/src/agents/pool/auto_assign/pipeline.rs @@ -27,7 +27,13 @@ impl AgentPool { /// here as well. pub(super) async fn assign_pipeline_stages(&self, project_root: &Path, config: &ProjectConfig) { let stages: [(Stage, PipelineStage); 2] = [ - (Stage::Coding { claim: None }, PipelineStage::Coder), + ( + Stage::Coding { + claim: None, + plan: Default::default(), + }, + PipelineStage::Coder, + ), (Stage::Qa, PipelineStage::Qa), ]; diff --git a/server/src/agents/pool/auto_assign/scan.rs b/server/src/agents/pool/auto_assign/scan.rs index b5138eae..12944228 100644 --- a/server/src/agents/pool/auto_assign/scan.rs +++ b/server/src/agents/pool/auto_assign/scan.rs @@ -218,7 +218,10 @@ mod tests { crate::db::ItemMeta::named("baz"), ); - let items = scan_stage_items(&Stage::Coding { claim: None }); + let items = scan_stage_items(&Stage::Coding { + claim: None, + plan: Default::default(), + }); // The global CRDT may contain items from other tests, so check // that our three items are present and appear in sorted order. assert!( diff --git a/server/src/agents/pool/cost_rollup_subscriber.rs b/server/src/agents/pool/cost_rollup_subscriber.rs index dca2450a..6424ebf6 100644 --- a/server/src/agents/pool/cost_rollup_subscriber.rs +++ b/server/src/agents/pool/cost_rollup_subscriber.rs @@ -100,7 +100,9 @@ mod tests { use super::*; use crate::agents::TokenUsage; use crate::agents::token_usage::TokenUsageRecord; - use crate::pipeline_state::{BranchName, PipelineEvent, Stage, StoryId, TransitionFired}; + use crate::pipeline_state::{ + BranchName, PipelineEvent, PlanState, Stage, StoryId, TransitionFired, + }; use chrono::Utc; use std::num::NonZeroU32; @@ -148,7 +150,10 @@ mod tests { fn fired_abandoned(story_id: &str) -> TransitionFired { TransitionFired { story_id: StoryId(story_id.to_string()), - before: Stage::Coding { claim: None }, + before: Stage::Coding { + claim: None, + plan: PlanState::Missing, + }, after: Stage::Abandoned { ts: Utc::now() }, event: PipelineEvent::Abandon, at: Utc::now(), @@ -159,7 +164,10 @@ mod tests { TransitionFired { story_id: StoryId(story_id.to_string()), before: Stage::Backlog, - after: Stage::Coding { claim: None }, + after: Stage::Coding { + claim: None, + plan: PlanState::Missing, + }, event: PipelineEvent::DepsMet, at: Utc::now(), } diff --git a/server/src/chat/commands/backlog.rs b/server/src/chat/commands/backlog.rs index 5c77d997..1dd4146c 100644 --- a/server/src/chat/commands/backlog.rs +++ b/server/src/chat/commands/backlog.rs @@ -93,7 +93,10 @@ mod tests { make_item( "20_story_in_progress", "In Progress", - Stage::Coding { claim: None }, + Stage::Coding { + claim: None, + plan: Default::default(), + }, ), make_item("30_story_in_qa", "In QA", Stage::Qa), ]; @@ -234,7 +237,10 @@ mod tests { let items = vec![make_item( "1_story_done", "Done", - Stage::Coding { claim: None }, + Stage::Coding { + claim: None, + plan: Default::default(), + }, )]; let output = build_backlog_from_items(&items); assert!( diff --git a/server/src/chat/commands/status/tests.rs b/server/src/chat/commands/status/tests.rs index ad8bda22..ac862f24 100644 --- a/server/src/chat/commands/status/tests.rs +++ b/server/src/chat/commands/status/tests.rs @@ -137,7 +137,10 @@ fn status_does_not_show_full_filename_stem() { let items = vec![make_item( "293_story_register_all_bot_commands", "Register all bot commands", - Stage::Coding { claim: None }, + Stage::Coding { + claim: None, + plan: Default::default(), + }, )]; let agents = AgentPool::new_test(3000); @@ -163,7 +166,10 @@ fn status_shows_cost_when_token_usage_exists() { let items = vec![make_item( "293_story_register_all_bot_commands", "Register all bot commands", - Stage::Coding { claim: None }, + Stage::Coding { + claim: None, + plan: Default::default(), + }, )]; // Write token usage for this story. @@ -200,7 +206,10 @@ fn status_no_cost_when_no_usage() { let items = vec![make_item( "293_story_register_all_bot_commands", "Register all bot commands", - Stage::Coding { claim: None }, + Stage::Coding { + claim: None, + plan: Default::default(), + }, )]; let agents = AgentPool::new_test(3000); @@ -220,7 +229,10 @@ fn status_aggregates_multiple_records_per_story() { let items = vec![make_item( "293_story_register_all_bot_commands", "Register all bot commands", - Stage::Coding { claim: None }, + Stage::Coding { + claim: None, + plan: Default::default(), + }, )]; // Write two records for the same story — costs should be summed. @@ -264,7 +276,10 @@ fn status_shows_waiting_on_for_story_with_unmet_deps() { make_item_with_deps( "10_story_waiting", "Waiting Story", - Stage::Coding { claim: None }, + Stage::Coding { + claim: None, + plan: Default::default(), + }, vec![999], ), make_item("999_story_dep", "Dep Story", Stage::Backlog), @@ -289,7 +304,10 @@ fn status_does_not_show_waiting_on_when_dep_is_done() { make_item_with_deps( "10_story_unblocked", "Unblocked Story", - Stage::Coding { claim: None }, + Stage::Coding { + claim: None, + plan: Default::default(), + }, vec![999], ), make_item( @@ -319,7 +337,10 @@ fn status_shows_no_waiting_info_when_no_deps() { let items = vec![make_item( "42_story_nodeps", "No Deps Story", - Stage::Coding { claim: None }, + Stage::Coding { + claim: None, + plan: Default::default(), + }, )]; let agents = AgentPool::new_test(3000); @@ -383,7 +404,10 @@ fn stage_is_blocked_returns_true_for_archived_blocked() { #[test] fn stage_is_blocked_returns_false_for_coding() { assert!(!matches!( - Stage::Coding { claim: None }, + Stage::Coding { + claim: None, + plan: Default::default() + }, Stage::Blocked { .. } | Stage::MergeFailure { .. } | Stage::MergeFailureFinal { .. } @@ -422,7 +446,10 @@ fn status_shows_idle_dot_for_unassigned_story() { let items = vec![make_item( "42_story_idle", "Idle Story", - Stage::Coding { claim: None }, + Stage::Coding { + claim: None, + plan: Default::default(), + }, )]; let agents = AgentPool::new_test(3000); @@ -793,7 +820,10 @@ fn in_progress_count_includes_blocked_items() { make_item( "10_story_coding", "Coding Story", - Stage::Coding { claim: None }, + Stage::Coding { + claim: None, + plan: Default::default(), + }, ), make_item( "11_story_blocked", @@ -825,7 +855,10 @@ fn frozen_coding_item_appears_in_in_progress_section() { "60_story_frozen", "Frozen Coding Story", Stage::Frozen { - resume_to: Box::new(Stage::Coding { claim: None }), + resume_to: Box::new(Stage::Coding { + claim: None, + plan: Default::default(), + }), }, )]; @@ -883,7 +916,10 @@ fn frozen_item_shows_snowflake_indicator() { "80_story_frozen_flake", "Frozen Flake Story", Stage::Frozen { - resume_to: Box::new(Stage::Coding { claim: None }), + resume_to: Box::new(Stage::Coding { + claim: None, + plan: Default::default(), + }), }, )]; @@ -913,7 +949,10 @@ fn frozen_and_blocked_use_distinct_indicators() { "91_story_frozen_ind", "Frozen Story", Stage::Frozen { - resume_to: Box::new(Stage::Coding { claim: None }), + resume_to: Box::new(Stage::Coding { + claim: None, + plan: Default::default(), + }), }, ), ]; diff --git a/server/src/crdt_state/mod.rs b/server/src/crdt_state/mod.rs index c62c1300..f0e8000e 100644 --- a/server/src/crdt_state/mod.rs +++ b/server/src/crdt_state/mod.rs @@ -55,8 +55,8 @@ pub use types::{ pub use write::{ bump_retry_count, migrate_legacy_stage_strings, migrate_merge_job, migrate_names_from_slugs, migrate_node_claims_to_agent_claims, migrate_story_ids_to_numeric, name_from_story_id, - set_agent, set_depends_on, set_epic, set_item_type, set_name, set_qa_mode, set_resume_to, - set_resume_to_raw, set_retry_count, write_item, + set_agent, set_depends_on, set_epic, set_item_type, set_name, set_plan_state, set_qa_mode, + set_resume_to, set_resume_to_raw, set_retry_count, write_item, }; #[cfg(test)] diff --git a/server/src/crdt_state/presence.rs b/server/src/crdt_state/presence.rs index 761b0c7f..9cac78d9 100644 --- a/server/src/crdt_state/presence.rs +++ b/server/src/crdt_state/presence.rs @@ -110,7 +110,7 @@ pub fn is_claimed_by_us(story_id: &str) -> bool { return false; }; let claim = match item.stage() { - crate::pipeline_state::Stage::Coding { claim } => claim.as_ref(), + crate::pipeline_state::Stage::Coding { claim, .. } => claim.as_ref(), crate::pipeline_state::Stage::Merge { claim, .. } => claim.as_ref(), _ => None, }; diff --git a/server/src/crdt_state/read.rs b/server/src/crdt_state/read.rs index c049da90..83252563 100644 --- a/server/src/crdt_state/read.rs +++ b/server/src/crdt_state/read.rs @@ -398,6 +398,11 @@ pub(super) fn extract_item_view(item: &PipelineItemCrdt) -> Option None, }; + let plan_state_str = match item.plan_state.view() { + JsonValue::String(s) if !s.is_empty() => Some(s), + _ => None, + }; + let stage = project_stage_for_view( &stage_str, &story_id, @@ -405,6 +410,7 @@ pub(super) fn extract_item_view(item: &PipelineItemCrdt) -> Option, claim_agent: Option<&str>, claim_ts_secs: Option, + plan_state_str: Option<&str>, ) -> Option { - use crate::pipeline_state::{AgentClaim, AgentName, ArchiveReason, BranchName, GitSha, Stage}; + use crate::pipeline_state::{ + AgentClaim, AgentName, ArchiveReason, BranchName, GitSha, PlanState, Stage, + }; use chrono::{DateTime, TimeZone, Utc}; use std::num::NonZeroU32; @@ -470,7 +479,10 @@ fn project_stage_for_view( Box::new( resume_to .and_then(Stage::from_dir) - .unwrap_or(Stage::Coding { claim: None }), + .unwrap_or(Stage::Coding { + claim: None, + plan: PlanState::Missing, + }), ) }; @@ -489,7 +501,10 @@ fn project_stage_for_view( match clean { "upcoming" => Some(Stage::Upcoming), "backlog" => Some(Stage::Backlog), - "coding" => Some(Stage::Coding { claim }), + "coding" => Some(Stage::Coding { + claim, + plan: PlanState::from_str(plan_state_str.unwrap_or("")), + }), "qa" => Some(Stage::Qa), "blocked" => Some(Stage::Blocked { reason: String::new(), diff --git a/server/src/crdt_state/types.rs b/server/src/crdt_state/types.rs index c85cd851..be175c06 100644 --- a/server/src/crdt_state/types.rs +++ b/server/src/crdt_state/types.rs @@ -96,6 +96,10 @@ pub struct PipelineItemCrdt { /// `"rejected"`. These stages never have a resume target, so the /// register is exclusively available for their metadata. pub resume_to: LwwRegisterCrdt, + /// Story 1010: lifecycle state of `PLAN.md` in the coding worktree. + /// Wire values: `"missing"` (default/empty), `"drafted"`, `"confirmed"`. + /// Updated by the filesystem watcher on PLAN.md create/modify/remove events. + pub plan_state: LwwRegisterCrdt, } /// CRDT node that holds a single peer's presence entry. @@ -518,7 +522,10 @@ mod tests { let evt = CrdtEvent { story_id: "42_story_foo".to_string(), from_stage: Some(crate::pipeline_state::Stage::Backlog), - to_stage: crate::pipeline_state::Stage::Coding { claim: None }, + to_stage: crate::pipeline_state::Stage::Coding { + claim: None, + plan: crate::pipeline_state::PlanState::Missing, + }, name: "Foo Feature".to_string(), }; assert_eq!(evt.story_id, "42_story_foo"); @@ -678,7 +685,10 @@ mod tests { let evt = CrdtEvent { story_id: "70_story_broadcast".to_string(), from_stage: Some(Stage::Backlog), - to_stage: Stage::Coding { claim: None }, + to_stage: Stage::Coding { + claim: None, + plan: crate::pipeline_state::PlanState::Missing, + }, name: "Broadcast Test".to_string(), }; tx.send(evt).unwrap(); diff --git a/server/src/crdt_state/write/item.rs b/server/src/crdt_state/write/item.rs index a89e4942..34291eae 100644 --- a/server/src/crdt_state/write/item.rs +++ b/server/src/crdt_state/write/item.rs @@ -211,6 +211,30 @@ pub fn set_qa_mode(story_id: &str, mode: Option) -> bool { true } +/// Set the `plan_state` CRDT register for a pipeline item (story 1010). +/// +/// Encodes the PLAN.md lifecycle as a wire string (`"missing"`, `"drafted"`, +/// `"confirmed"`). Called by the filesystem watcher when PLAN.md is created, +/// modified, or removed inside a coding worktree. +/// +/// Returns `true` if the item was found and the op was applied, `false` otherwise. +pub fn set_plan_state(story_id: &str, state: crate::pipeline_state::PlanState) -> bool { + let Some(state_mutex) = get_crdt() else { + return false; + }; + let Ok(mut crdt_state) = state_mutex.lock() else { + return false; + }; + let Some(&idx) = crdt_state.index.get(story_id) else { + return false; + }; + let value = state.as_str().to_string(); + apply_and_persist(&mut crdt_state, |s| { + s.crdt.doc.items[idx].plan_state.set(value) + }); + true +} + /// Write a pipeline item state through CRDT operations. /// /// If the item exists, updates its registers. If not, inserts a new item @@ -232,7 +256,7 @@ pub fn write_item( ) { let stage_str = stage_dir_name(stage); let claim: Option<&AgentClaim> = match stage { - Stage::Coding { claim } => claim.as_ref(), + Stage::Coding { claim, .. } => claim.as_ref(), Stage::Merge { claim, .. } => claim.as_ref(), _ => None, }; @@ -350,6 +374,7 @@ pub fn write_item( "item_type": "", "epic": "", "resume_to": "", + "plan_state": "", }) .into(); @@ -378,6 +403,7 @@ pub fn write_item( item.item_type.advance_seq(floor); item.epic.advance_seq(floor); item.resume_to.advance_seq(floor); + item.plan_state.advance_seq(floor); } // Broadcast a CrdtEvent for the new item. diff --git a/server/src/crdt_state/write/migrations.rs b/server/src/crdt_state/write/migrations.rs index b7dfbcb8..72d158a5 100644 --- a/server/src/crdt_state/write/migrations.rs +++ b/server/src/crdt_state/write/migrations.rs @@ -333,7 +333,7 @@ mod stage_migration_tests { use super::super::item::write_item; use super::*; use crate::crdt_state::read_item; - use crate::pipeline_state::{BranchName, Stage}; + use crate::pipeline_state::{BranchName, PlanState, Stage}; use std::num::NonZeroU32; /// Seed a pipeline item with a raw, possibly-legacy stage register value, @@ -370,7 +370,10 @@ mod stage_migration_tests { ( "9503_legacy_coding", "2_current", - Stage::Coding { claim: None }, + Stage::Coding { + claim: None, + plan: PlanState::Missing, + }, ), ( "9504_legacy_blocked", @@ -452,7 +455,10 @@ mod stage_migration_tests { // Seed two items: one already in clean form, one in legacy form. write_item( "9520_already_clean", - &Stage::Coding { claim: None }, + &Stage::Coding { + claim: None, + plan: PlanState::Missing, + }, Some("Already Clean"), None, None, diff --git a/server/src/crdt_state/write/mod.rs b/server/src/crdt_state/write/mod.rs index 68f6d241..361a1e59 100644 --- a/server/src/crdt_state/write/mod.rs +++ b/server/src/crdt_state/write/mod.rs @@ -10,8 +10,8 @@ mod migrations; mod tests; pub use item::{ - bump_retry_count, set_agent, set_depends_on, set_epic, set_item_type, set_name, set_qa_mode, - set_resume_to, set_resume_to_raw, set_retry_count, write_item, + bump_retry_count, set_agent, set_depends_on, set_epic, set_item_type, set_name, set_plan_state, + set_qa_mode, set_resume_to, set_resume_to_raw, set_retry_count, write_item, }; #[cfg(test)] diff --git a/server/src/http/mcp/status_tools.rs b/server/src/http/mcp/status_tools.rs index 29b55773..a90d8831 100644 --- a/server/src/http/mcp/status_tools.rs +++ b/server/src/http/mcp/status_tools.rs @@ -196,7 +196,7 @@ pub(super) async fn tool_status(args: &Value, ctx: &AppContext) -> Result claim.as_ref(), + crate::pipeline_state::Stage::Coding { claim, .. } => claim.as_ref(), crate::pipeline_state::Stage::Merge { claim, .. } => claim.as_ref(), _ => None, }; diff --git a/server/src/io/watcher/mod.rs b/server/src/io/watcher/mod.rs index 752fc532..04e241fa 100644 --- a/server/src/io/watcher/mod.rs +++ b/server/src/io/watcher/mod.rs @@ -1,15 +1,16 @@ -//! Filesystem watcher for `.huskies/project.toml` and `.huskies/agents.toml`. +//! Filesystem watcher for `.huskies/project.toml`, `.huskies/agents.toml`, +//! and `.huskies/worktrees/*/PLAN.md`. //! //! Watches config files for changes and broadcasts a [`WatcherEvent`] to all //! connected WebSocket clients so the frontend can reload the agent roster -//! without a server restart. -//! -//! Work-item pipeline events (stage transitions) are driven by CRDT state -//! changes via [`crate::crdt_state::subscribe`], not by filesystem events. +//! without a server restart. Also watches worktree PLAN.md files and updates +//! the typed [`crate::pipeline_state::PlanState`] in the CRDT whenever a +//! PLAN.md is created, modified, or removed. //! //! # Debouncing //! Config-file events are buffered for 300 ms after the last activity to avoid //! duplicate broadcasts when an editor writes multiple events in quick succession. +//! PLAN.md events are applied immediately without debouncing. //! //! # Submodules //! - [`events`] — [`WatcherEvent`] enum definition. @@ -28,6 +29,37 @@ use std::sync::mpsc; use std::time::{Duration, Instant}; use tokio::sync::broadcast; +/// Extract the story ID from a path of the form +/// `{git_root}/.huskies/worktrees/{story_id}/PLAN.md`. +/// +/// Returns `Some(story_id)` when `path` is exactly a `PLAN.md` file one level +/// inside the worktrees directory. Returns `None` for any other path. +pub fn extract_story_id_from_plan_path(path: &Path, git_root: &Path) -> Option { + if path.file_name()? != "PLAN.md" { + return None; + } + let parent = path.parent()?; + let expected_worktrees = git_root.join(".huskies").join("worktrees"); + if parent.parent()? != expected_worktrees { + return None; + } + Some(parent.file_name()?.to_str()?.to_string()) +} + +/// Determine the [`PlanState`] for a PLAN.md file at `path`. +/// +/// - File absent or unreadable → [`PlanState::Missing`] +/// - File contains `` → [`PlanState::Drafted`] +/// - File exists with no `` → [`PlanState::Confirmed`] +pub fn plan_state_for_path(path: &Path) -> crate::pipeline_state::PlanState { + use crate::pipeline_state::PlanState; + match std::fs::read_to_string(path) { + Ok(content) if content.contains("") => PlanState::Drafted, + Ok(_) => PlanState::Confirmed, + Err(_) => PlanState::Missing, + } +} + /// Return `true` if `path` is the root-level `.huskies/project.toml` or /// `.huskies/agents.toml`, i.e. `{git_root}/.huskies/{project,agents}.toml`. /// @@ -100,8 +132,7 @@ pub fn start_watcher(git_root: PathBuf, event_tx: broadcast::Sender\n- step 2").unwrap(); + assert_eq!(plan_state_for_path(tmp.path()), PlanState::Drafted); +} + +#[test] +fn plan_state_confirmed_when_file_has_no_tbd() { + use crate::pipeline_state::PlanState; + use std::io::Write; + let tmp = tempfile::NamedTempFile::new().unwrap(); + writeln!(tmp.as_file(), "# Plan\n- step 1\n- step 2").unwrap(); + assert_eq!(plan_state_for_path(tmp.path()), PlanState::Confirmed); +} + // ── is_config_file ──────────────────────────────────────────────────────── #[test] @@ -54,7 +114,13 @@ fn stage_metadata_returns_correct_actions() { use crate::pipeline_state::{GitSha, Stage}; use chrono::Utc; - let (action, msg) = stage_metadata(&Stage::Coding { claim: None }, "42_story_foo"); + let (action, msg) = stage_metadata( + &Stage::Coding { + claim: None, + plan: Default::default(), + }, + "42_story_foo", + ); assert_eq!(action, "start"); assert_eq!(msg, "huskies: start 42_story_foo"); diff --git a/server/src/pipeline_state/events.rs b/server/src/pipeline_state/events.rs index d068711a..82252353 100644 --- a/server/src/pipeline_state/events.rs +++ b/server/src/pipeline_state/events.rs @@ -112,7 +112,7 @@ impl Default for EventBus { #[cfg(test)] mod tests { - use super::super::BranchName; + use super::super::{BranchName, PlanState}; use super::*; use std::num::NonZeroU32; @@ -149,7 +149,10 @@ mod tests { bus.fire(TransitionFired { story_id: StoryId("test".into()), before: Stage::Backlog, - after: Stage::Coding { claim: None }, + after: Stage::Coding { + claim: None, + plan: PlanState::Missing, + }, event: PipelineEvent::DepsMet, at: Utc::now(), }); diff --git a/server/src/pipeline_state/mod.rs b/server/src/pipeline_state/mod.rs index e181a90e..85fc642e 100644 --- a/server/src/pipeline_state/mod.rs +++ b/server/src/pipeline_state/mod.rs @@ -41,7 +41,8 @@ mod tests; #[allow(unused_imports)] pub use types::{ AgentClaim, AgentName, ArchiveReason, BranchName, ExecutionState, GitSha, MergeFailureKind, - NodePubkey, PipelineItem, Stage, StoryId, TransitionError, stage_dir_name, stage_label, + NodePubkey, PipelineItem, PlanState, Stage, StoryId, TransitionError, stage_dir_name, + stage_label, }; #[allow(unused_imports)] diff --git a/server/src/pipeline_state/projection.rs b/server/src/pipeline_state/projection.rs index 4396a34b..71f8839b 100644 --- a/server/src/pipeline_state/projection.rs +++ b/server/src/pipeline_state/projection.rs @@ -100,7 +100,7 @@ pub fn read_typed(story_id: &str) -> Result, ProjectionErro #[cfg(test)] mod tests { use super::*; - use crate::pipeline_state::{ArchiveReason, BranchName, GitSha, Stage}; + use crate::pipeline_state::{ArchiveReason, BranchName, GitSha, PlanState, Stage}; use chrono::Utc; use std::num::NonZeroU32; @@ -157,7 +157,10 @@ mod tests { fn project_current_item() { let view = PipelineItemView::for_test( "42_story_test", - Stage::Coding { claim: None }, + Stage::Coding { + claim: None, + plan: PlanState::Missing, + }, "Test", Some(crate::config::AgentName::Coder1), 2u32, @@ -267,7 +270,10 @@ mod tests { let view = make_view( "42_story_test", Stage::Frozen { - resume_to: Box::new(Stage::Coding { claim: None }), + resume_to: Box::new(Stage::Coding { + claim: None, + plan: PlanState::Missing, + }), }, Some("Frozen Story"), ); @@ -292,4 +298,66 @@ mod tests { fn git_sha_constructs() { let _ = GitSha("abc".to_string()); } + + // ── PlanState projection ──────────────────────────────────────────── + + #[test] + fn project_coding_plan_missing() { + let view = make_view( + "42_story_test", + Stage::Coding { + claim: None, + plan: PlanState::Missing, + }, + Some("Test"), + ); + let item = PipelineItem::try_from(&view).unwrap(); + assert!(matches!( + item.stage, + Stage::Coding { + plan: PlanState::Missing, + .. + } + )); + } + + #[test] + fn project_coding_plan_drafted() { + let view = make_view( + "42_story_test", + Stage::Coding { + claim: None, + plan: PlanState::Drafted, + }, + Some("Test"), + ); + let item = PipelineItem::try_from(&view).unwrap(); + assert!(matches!( + item.stage, + Stage::Coding { + plan: PlanState::Drafted, + .. + } + )); + } + + #[test] + fn project_coding_plan_confirmed() { + let view = make_view( + "42_story_test", + Stage::Coding { + claim: None, + plan: PlanState::Confirmed, + }, + Some("Test"), + ); + let item = PipelineItem::try_from(&view).unwrap(); + assert!(matches!( + item.stage, + Stage::Coding { + plan: PlanState::Confirmed, + .. + } + )); + } } diff --git a/server/src/pipeline_state/tests.rs b/server/src/pipeline_state/tests.rs index 3cc5c731..03002c1b 100644 --- a/server/src/pipeline_state/tests.rs +++ b/server/src/pipeline_state/tests.rs @@ -52,7 +52,10 @@ fn happy_path_backlog_through_archived() { #[test] fn happy_path_with_qa() { - let s = Stage::Coding { claim: None }; + let s = Stage::Coding { + claim: None, + plan: PlanState::Missing, + }; let s = transition(s, PipelineEvent::GatesStarted).unwrap(); assert!(matches!(s, Stage::Qa)); @@ -69,7 +72,10 @@ fn happy_path_with_qa() { #[test] fn qa_retry_loop() { - let s = Stage::Coding { claim: None }; + let s = Stage::Coding { + claim: None, + plan: PlanState::Missing, + }; let s = transition(s, PipelineEvent::GatesStarted).unwrap(); assert!(matches!(s, Stage::Qa)); @@ -154,7 +160,13 @@ fn cannot_start_gates_from_backlog() { #[test] fn cannot_accept_from_coding() { - let result = transition(Stage::Coding { claim: None }, PipelineEvent::Accepted); + let result = transition( + Stage::Coding { + claim: None, + plan: PlanState::Missing, + }, + PipelineEvent::Accepted, + ); assert!(matches!( result, Err(TransitionError::InvalidTransition { .. }) @@ -165,7 +177,14 @@ fn cannot_accept_from_coding() { #[test] fn block_from_any_active_stage() { - for s in [Stage::Backlog, Stage::Coding { claim: None }, Stage::Qa] { + for s in [ + Stage::Backlog, + Stage::Coding { + claim: None, + plan: PlanState::Missing, + }, + Stage::Qa, + ] { let result = transition( s.clone(), PipelineEvent::Block { @@ -252,7 +271,10 @@ fn legacy_unblock_archived_blocked_returns_to_backlog() { fn abandon_from_any_active_or_done() { for s in [ Stage::Backlog, - Stage::Coding { claim: None }, + Stage::Coding { + claim: None, + plan: PlanState::Missing, + }, Stage::Qa, Stage::Done { merged_at: chrono::Utc::now(), @@ -268,7 +290,10 @@ fn abandon_from_any_active_or_done() { fn supersede_from_any_active_or_done() { for s in [ Stage::Backlog, - Stage::Coding { claim: None }, + Stage::Coding { + claim: None, + plan: PlanState::Missing, + }, Stage::Qa, Stage::Done { merged_at: chrono::Utc::now(), @@ -292,7 +317,14 @@ fn review_hold_from_active_stages() { // Story 945: `ReviewHold` transitions to `Stage::ReviewHold { resume_to }` // with the resume_to set to the originating stage, replacing the legacy // boolean flag. - for s in [Stage::Backlog, Stage::Coding { claim: None }, Stage::Qa] { + for s in [ + Stage::Backlog, + Stage::Coding { + claim: None, + plan: PlanState::Missing, + }, + Stage::Qa, + ] { let result = transition( s.clone(), PipelineEvent::ReviewHold { @@ -338,7 +370,10 @@ fn merge_failed_final() { #[test] fn merge_failed_only_from_merge() { let result = transition( - Stage::Coding { claim: None }, + Stage::Coding { + claim: None, + plan: PlanState::Missing, + }, PipelineEvent::MergeFailedFinal { reason: "conflicts".into(), }, @@ -483,7 +518,14 @@ fn cannot_deps_met_from_upcoming() { #[test] fn reject_from_active_stages() { - for s in [Stage::Backlog, Stage::Coding { claim: None }, Stage::Qa] { + for s in [ + Stage::Backlog, + Stage::Coding { + claim: None, + plan: PlanState::Missing, + }, + Stage::Qa, + ] { let result = transition( s.clone(), PipelineEvent::Reject { @@ -989,7 +1031,10 @@ fn hotfix_requested_from_done_lands_in_coding() { fn hotfix_requested_rejected_from_non_done_stages() { for stage in [ Stage::Backlog, - Stage::Coding { claim: None }, + Stage::Coding { + claim: None, + plan: PlanState::Missing, + }, Stage::Qa, Stage::Merge { feature_branch: fb("feature/story-1"), @@ -1016,7 +1061,10 @@ fn audit_entry_backlog_to_coding_exact_format() { let fired = TransitionFired { story_id: StoryId("1014_my_story".into()), before: Stage::Backlog, - after: Stage::Coding { claim: None }, + after: Stage::Coding { + claim: None, + plan: PlanState::Missing, + }, event: PipelineEvent::DepsMet, at, }; @@ -1116,7 +1164,10 @@ fn audit_entry_done_to_archived() { fn audit_entry_coding_to_blocked() { let fired = TransitionFired { story_id: StoryId("300_s".into()), - before: Stage::Coding { claim: None }, + before: Stage::Coding { + claim: None, + plan: PlanState::Missing, + }, after: Stage::Blocked { reason: "waiting".into(), }, @@ -1138,7 +1189,10 @@ fn audit_entry_blocked_to_coding() { before: Stage::Blocked { reason: "test".into(), }, - after: Stage::Coding { claim: None }, + after: Stage::Coding { + claim: None, + plan: PlanState::Missing, + }, event: PipelineEvent::Unblock, at: chrono::Utc::now(), }; @@ -1177,9 +1231,15 @@ fn audit_entry_merge_to_merge_failure() { fn audit_entry_coding_to_frozen() { let fired = TransitionFired { story_id: StoryId("600_s".into()), - before: Stage::Coding { claim: None }, + before: Stage::Coding { + claim: None, + plan: PlanState::Missing, + }, after: Stage::Frozen { - resume_to: Box::new(Stage::Coding { claim: None }), + resume_to: Box::new(Stage::Coding { + claim: None, + plan: PlanState::Missing, + }), }, event: PipelineEvent::Freeze, at: chrono::Utc::now(), @@ -1194,7 +1254,10 @@ fn audit_entry_coding_to_frozen() { fn audit_entry_coding_to_abandoned() { let fired = TransitionFired { story_id: StoryId("700_s".into()), - before: Stage::Coding { claim: None }, + before: Stage::Coding { + claim: None, + plan: PlanState::Missing, + }, after: Stage::Abandoned { ts: chrono::Utc::now(), }, diff --git a/server/src/pipeline_state/transition.rs b/server/src/pipeline_state/transition.rs index 2d9e96d3..9f4b299b 100644 --- a/server/src/pipeline_state/transition.rs +++ b/server/src/pipeline_state/transition.rs @@ -4,8 +4,8 @@ use chrono::Utc; use serde::{Deserialize, Serialize}; use super::{ - AgentName, ArchiveReason, BranchName, ExecutionState, GitSha, MergeFailureKind, Stage, StoryId, - TransitionError, stage_label, + AgentName, ArchiveReason, BranchName, ExecutionState, GitSha, MergeFailureKind, PlanState, + Stage, StoryId, TransitionError, stage_label, }; // ── Pipeline events ───────────────────────────────────────────────────────── @@ -149,7 +149,10 @@ pub fn transition(state: Stage, event: PipelineEvent) -> Result Ok(Backlog), // ── Forward path ──────────────────────────────────────────────── - (Backlog, DepsMet) => Ok(Coding { claim: None }), + (Backlog, DepsMet) => Ok(Coding { + claim: None, + plan: PlanState::Missing, + }), (Coding { .. }, GatesStarted) => Ok(Qa), ( Coding { .. }, @@ -173,7 +176,10 @@ pub fn transition(state: Stage, event: PipelineEvent) -> Result Ok(Coding { claim: None }), + (Qa, GatesFailed { .. }) => Ok(Coding { + claim: None, + plan: PlanState::Missing, + }), (Merge { .. }, MergeSucceeded { merge_commit }) => Ok(Done { merged_at: now, merge_commit, @@ -312,7 +318,10 @@ pub fn transition(state: Stage, event: PipelineEvent) -> Result Ok(*resume_to), // ── FixupRequested: MergeFailure → Coding (coder fixup) ──────── - (MergeFailure { .. }, FixupRequested) => Ok(Coding { claim: None }), + (MergeFailure { .. }, FixupRequested) => Ok(Coding { + claim: None, + plan: PlanState::Missing, + }), // ── FixupRequested: MergeFailureFinal → Coding (operator override) // @@ -321,19 +330,28 @@ pub fn transition(state: Stage, event: PipelineEvent) -> Result Ok(Coding { claim: None }), + (MergeFailureFinal { .. }, FixupRequested) => Ok(Coding { + claim: None, + plan: PlanState::Missing, + }), // ── ReQueuedForQa: MergeFailure → Qa (re-review) ──────────────── (MergeFailure { .. }, ReQueuedForQa) => Ok(Qa), // ── MergeAborted: Merge → Coding (abort in-flight merge) ───────── - (Merge { .. }, MergeAborted) => Ok(Coding { claim: None }), + (Merge { .. }, MergeAborted) => Ok(Coding { + claim: None, + plan: PlanState::Missing, + }), // ── HotfixRequested: Done → Coding (post-merge hotfix) ─────────── // Allows reopening a completed story so a coder can apply a hotfix. // A fresh feature branch is forked from master when auto-assign spawns // the coder. - (Done { .. }, HotfixRequested) => Ok(Coding { claim: None }), + (Done { .. }, HotfixRequested) => Ok(Coding { + claim: None, + plan: PlanState::Missing, + }), // ── MergemasterAttempted: MergeFailure → MergeFailureFinal ───── (MergeFailure { kind, .. }, MergemasterAttempted) => Ok(MergeFailureFinal { kind }), @@ -344,7 +362,10 @@ pub fn transition(state: Stage, event: PipelineEvent) -> Result Ok(*resume_to), // ── Unblock: Blocked → Coding ───────────────────────────────── - (Blocked { .. }, Unblock) => Ok(Coding { claim: None }), + (Blocked { .. }, Unblock) => Ok(Coding { + claim: None, + plan: PlanState::Missing, + }), // ── Unblock MergeFailure → Merge (re-attempt) ──────────────────── // `unblock_story` on a failed merge re-queues it for merge, restoring diff --git a/server/src/pipeline_state/types.rs b/server/src/pipeline_state/types.rs index 68042aaa..c49e482d 100644 --- a/server/src/pipeline_state/types.rs +++ b/server/src/pipeline_state/types.rs @@ -125,6 +125,48 @@ pub struct AgentClaim { pub claimed_at: DateTime, } +// ── Plan state (PLAN.md lifecycle inside Stage::Coding) ──────────────────── + +/// Lifecycle state of the `PLAN.md` file inside a coding worktree. +/// +/// Updated by the filesystem watcher whenever PLAN.md is created, modified, +/// or removed in a story's worktree. Embedded in [`Stage::Coding`] so +/// callers access it via the typed projection instead of greping the filesystem. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)] +pub enum PlanState { + /// No `PLAN.md` file exists in the worktree yet. + #[default] + Missing, + /// `PLAN.md` exists but contains `` placeholders — the plan has been + /// drafted but not yet confirmed with real file paths and descriptions. + Drafted, + /// `PLAN.md` exists and contains no `` placeholders — the plan is + /// considered confirmed. + Confirmed, +} + +impl PlanState { + /// Wire-form string stored in the `plan_state` CRDT register. + pub fn as_str(&self) -> &'static str { + match self { + PlanState::Missing => "missing", + PlanState::Drafted => "drafted", + PlanState::Confirmed => "confirmed", + } + } + + /// Parse from a `plan_state` CRDT register value. + /// + /// Unknown or empty strings default to [`PlanState::Missing`]. + pub fn from_str(s: &str) -> Self { + match s { + "drafted" => PlanState::Drafted, + "confirmed" => PlanState::Confirmed, + _ => PlanState::Missing, + } + } +} + // ── Synced pipeline stage (lives in CRDT, converges across nodes) ─────────── /// The pipeline stage for a work item. @@ -167,7 +209,13 @@ pub enum Stage { /// working on this item. `None` means the item is in the coding stage but /// no agent has claimed it yet (e.g. just transitioned from Backlog and /// waiting for an agent to pick it up). - Coding { claim: Option }, + /// + /// `plan` tracks the lifecycle of the `PLAN.md` file in the worktree, + /// updated by the filesystem watcher on create/modify/remove events. + Coding { + claim: Option, + plan: PlanState, + }, /// Coder has run; gates are running. Qa, @@ -299,7 +347,10 @@ impl Stage { match s { "upcoming" => Some(Stage::Upcoming), "backlog" => Some(Stage::Backlog), - "coding" => Some(Stage::Coding { claim: None }), + "coding" => Some(Stage::Coding { + claim: None, + plan: PlanState::Missing, + }), "blocked" => Some(Stage::Blocked { reason: String::new(), }), @@ -318,10 +369,16 @@ impl Stage { kind: MergeFailureKind::Other(String::new()), }), "frozen" => Some(Stage::Frozen { - resume_to: Box::new(Stage::Coding { claim: None }), + resume_to: Box::new(Stage::Coding { + claim: None, + plan: PlanState::Missing, + }), }), "review_hold" => Some(Stage::ReviewHold { - resume_to: Box::new(Stage::Coding { claim: None }), + resume_to: Box::new(Stage::Coding { + claim: None, + plan: PlanState::Missing, + }), reason: String::new(), }), "done" => Some(Stage::Done { diff --git a/server/src/service/agents/mod.rs b/server/src/service/agents/mod.rs index 5f8cbf03..89d9cdce 100644 --- a/server/src/service/agents/mod.rs +++ b/server/src/service/agents/mod.rs @@ -144,7 +144,13 @@ pub fn get_work_item_content( let stages = [ ("1_backlog", Stage::Backlog), - ("2_current", Stage::Coding { claim: None }), + ( + "2_current", + Stage::Coding { + claim: None, + plan: Default::default(), + }, + ), ("3_qa", Stage::Qa), ( "4_merge", diff --git a/server/src/service/notifications/format.rs b/server/src/service/notifications/format.rs index aa550cc2..e50f8641 100644 --- a/server/src/service/notifications/format.rs +++ b/server/src/service/notifications/format.rs @@ -254,7 +254,10 @@ mod tests { fn stage_display_name_maps_all_known_stages() { assert_eq!(stage_display_name(&Stage::Backlog), "Backlog"); assert_eq!( - stage_display_name(&Stage::Coding { claim: None }), + stage_display_name(&Stage::Coding { + claim: None, + plan: Default::default() + }), "Current" ); assert_eq!(stage_display_name(&Stage::Qa), "QA"); @@ -293,7 +296,10 @@ mod tests { "42_story_thing", "Some Story", &Stage::Backlog, - &Stage::Coding { claim: None }, + &Stage::Coding { + claim: None, + plan: Default::default(), + }, ); assert!(!plain.contains("\u{1f389}")); } @@ -304,7 +310,10 @@ mod tests { "261_story_bot_notifications", "Bot notifications", &Stage::Upcoming, - &Stage::Coding { claim: None }, + &Stage::Coding { + claim: None, + plan: Default::default(), + }, ); assert_eq!( plain, @@ -321,7 +330,10 @@ mod tests { let (plain, html) = format_stage_notification( "42_bug_fix_thing", "", - &Stage::Coding { claim: None }, + &Stage::Coding { + claim: None, + plan: Default::default(), + }, &Stage::Qa, ); assert_eq!(plain, "#42 \u{2014} Current \u{2192} QA"); @@ -344,7 +356,10 @@ mod tests { let (plain, _html) = format_stage_notification( "1_story_long", &long_name, - &Stage::Coding { claim: None }, + &Stage::Coding { + claim: None, + plan: Default::default(), + }, &Stage::Qa, ); assert!(plain.contains(&long_name)); @@ -355,7 +370,10 @@ mod tests { let (plain, html) = format_stage_notification( "42_story_empty", "", - &Stage::Coding { claim: None }, + &Stage::Coding { + claim: None, + plan: Default::default(), + }, &Stage::Qa, ); assert_eq!(plain, "#42 \u{2014} Current \u{2192} QA"); diff --git a/server/src/worktree/cleanup.rs b/server/src/worktree/cleanup.rs index 702637d0..4c22c7b0 100644 --- a/server/src/worktree/cleanup.rs +++ b/server/src/worktree/cleanup.rs @@ -315,7 +315,10 @@ mod tests { let config = empty_config(); let report = run_cleanup_with_lookup(&project_root, &config, true, |id| { if id == story_id { - Some(Stage::Coding { claim: None }) + Some(Stage::Coding { + claim: None, + plan: Default::default(), + }) } else { None } diff --git a/server/src/worktree/sweep.rs b/server/src/worktree/sweep.rs index 48976ad4..4a4c50e6 100644 --- a/server/src/worktree/sweep.rs +++ b/server/src/worktree/sweep.rs @@ -184,7 +184,8 @@ mod tests { #[test] fn should_not_sweep_coding() { assert!(!worktree_should_be_swept(Some(&Stage::Coding { - claim: None + claim: None, + plan: Default::default(), }))); } @@ -308,7 +309,10 @@ mod tests { let removed = sweep_with_lookup(&project_root, &config, |id| { if id == story_id { - Some(Stage::Coding { claim: None }) + Some(Stage::Coding { + claim: None, + plan: Default::default(), + }) } else { None }