diff --git a/server/src/crdt_state/mod.rs b/server/src/crdt_state/mod.rs index f1e376dd..ef21ab6b 100644 --- a/server/src/crdt_state/mod.rs +++ b/server/src/crdt_state/mod.rs @@ -44,12 +44,12 @@ pub use read::{ CrdtItemDump, CrdtStateDump, check_archived_deps_crdt, check_unmet_deps_crdt, dep_is_archived_crdt, dep_is_done_crdt, dump_crdt_state, evict_item, read_all_items, read_item, }; -pub use state::init; +pub use state::{init, subscribe}; pub use types::{ ActiveAgentCrdt, ActiveAgentView, AgentThrottleCrdt, AgentThrottleView, CrdtEvent, GatewayConfigCrdt, GatewayProjectCrdt, GatewayProjectView, MergeJobCrdt, MergeJobView, NodePresenceCrdt, NodePresenceView, PipelineDoc, PipelineItemCrdt, PipelineItemView, Stage, - TestJobCrdt, TestJobView, TokenUsageCrdt, TokenUsageView, WorkItem, subscribe, + TestJobCrdt, TestJobView, TokenUsageCrdt, TokenUsageView, WorkItem, }; pub use write::{ bump_retry_count, migrate_names_from_slugs, migrate_story_ids_to_numeric, name_from_story_id, diff --git a/server/src/crdt_state/state/mod.rs b/server/src/crdt_state/state/mod.rs index 0e1a59e7..dcceb3f6 100644 --- a/server/src/crdt_state/state/mod.rs +++ b/server/src/crdt_state/state/mod.rs @@ -29,6 +29,13 @@ mod tests; pub use init::init; +/// Subscribe to CRDT state-transition events. +/// +/// Returns `None` if the CRDT layer has not been initialised yet. +pub fn subscribe() -> Option> { + statics::CRDT_EVENT_TX.get().map(|tx| tx.subscribe()) +} + pub(super) use apply::{apply_and_persist, emit_event}; pub(super) use indices::{ rebuild_active_agent_index, rebuild_agent_throttle_index, rebuild_gateway_project_index, diff --git a/server/src/crdt_state/state/tests.rs b/server/src/crdt_state/state/tests.rs index f4a056d2..f3040d2c 100644 --- a/server/src/crdt_state/state/tests.rs +++ b/server/src/crdt_state/state/tests.rs @@ -94,6 +94,59 @@ fn rebuild_index_maps_story_ids() { assert!(index.contains_key("20_story_b")); } +/// Regression for story 906: `subscribe()` and `emit_event()` must share the +/// same broadcast channel. Previously they referenced two separate +/// `CRDT_EVENT_TX` statics (one in `types.rs`, one in `state/statics.rs`), +/// so events fired into one channel and subscribers listened on the other — +/// no Matrix stage-transition notifications fired post-CRDT migration. +#[tokio::test(flavor = "current_thread")] +async fn subscribe_receives_stage_transition_events() { + use super::super::types::CrdtEvent; + use super::super::write::write_item; + + crate::crdt_state::init_for_test(); + + let mut rx = super::subscribe().expect("subscribe must return Some after init_for_test"); + + // Insert a new item — emit_event fires with from_stage=None. + write_item( + "906_story_subscribe", + "1_backlog", + Some("Subscribe Wiring"), + None, + None, + None, + None, + None, + None, + None, + ); + + let evt: CrdtEvent = rx.try_recv().expect("expected CrdtEvent on insert"); + assert_eq!(evt.story_id, "906_story_subscribe"); + assert!(evt.from_stage.is_none()); + assert_eq!(evt.to_stage, "1_backlog"); + + // Update stage — emit_event fires again with the real from_stage. + write_item( + "906_story_subscribe", + "2_current", + None, + None, + None, + None, + None, + None, + None, + None, + ); + + let evt: CrdtEvent = rx.try_recv().expect("expected CrdtEvent on stage change"); + assert_eq!(evt.story_id, "906_story_subscribe"); + assert_eq!(evt.from_stage.as_deref(), Some("1_backlog")); + assert_eq!(evt.to_stage, "2_current"); +} + #[tokio::test] async fn init_and_write_read_roundtrip() { let tmp = tempfile::tempdir().unwrap(); diff --git a/server/src/crdt_state/types.rs b/server/src/crdt_state/types.rs index b6162ef2..21c24701 100644 --- a/server/src/crdt_state/types.rs +++ b/server/src/crdt_state/types.rs @@ -4,7 +4,6 @@ use bft_json_crdt::json_crdt::*; use bft_json_crdt::list_crdt::ListCrdt; use bft_json_crdt::lww_crdt::LwwRegisterCrdt; -use std::sync::OnceLock; use tokio::sync::broadcast; /// An event emitted when a pipeline item's stage changes in the CRDT document. @@ -20,15 +19,6 @@ pub struct CrdtEvent { pub name: Option, } -/// Subscribe to CRDT state transition events. -/// -/// Returns `None` if the CRDT layer has not been initialised yet. -pub fn subscribe() -> Option> { - CRDT_EVENT_TX.get().map(|tx| tx.subscribe()) -} - -static CRDT_EVENT_TX: OnceLock> = OnceLock::new(); - // ── CRDT document types ────────────────────────────────────────────── /// CRDT sub-document holding gateway-level configuration.